需求背景
定时器的核心功能是能够周期性地触发回调函数,同时需要支持启动、停止以及状态检查等操作。在多线程或异步编程场景中,希望定时器能够:
- 支持异步操作,避免阻塞主线程;
- 单例化事件循环,节省资源;
- 优雅地管理定时器的生命周期;
- 提供简单的接口,易于使用。
为此,设计了一个类,结合和,实现了一个高效的定时器。
代码
完整代码- import asyncio
- import threading
- import time
- import sys
- class Timer:
- _loop = None
- _thread = None
- _lock = threading.Lock()
- _running_timers = 0
- @classmethod
- def _ensure_loop(cls):
- with cls._lock:
- if cls._loop is None or not cls._thread or not cls._thread.is_alive():
- cls._loop = asyncio.new_event_loop()
- cls._thread = threading.Thread(
- target=cls._run_loop,
- args=(cls._loop,),
- daemon=True
- )
- cls._thread.start()
- @classmethod
- def _run_loop(cls, loop):
- asyncio.set_event_loop(loop)
- try:
- loop.run_forever()
- except Exception as e:
- print(f"事件循环异常: {e}")
- finally:
- loop.close()
- @classmethod
- def _shutdown(cls):
- with cls._lock:
- if cls._running_timers == 0 and cls._loop is not None and cls._loop.is_running():
- cls._loop.call_soon_threadsafe(cls._loop.stop)
- # 不使用 join,因为守护线程会在主线程退出时自动结束
- def __init__(self):
- self.is_running = False
- self._stop_event = asyncio.Event()
- self._task = None
- async def _timer_loop(self, interval, callback):
- try:
- while not self._stop_event.is_set():
- await asyncio.sleep(interval)
- if not self._stop_event.is_set():
- await asyncio.get_event_loop().run_in_executor(None, callback)
- except asyncio.CancelledError:
- pass # 正常取消时忽略
- except Exception as e:
- print(f"定时器循环异常: {e}")
- finally:
- self.is_running = False
- Timer._running_timers -= 1
- Timer._shutdown()
- def start(self, interval, callback):
- if not self.is_running:
- Timer._ensure_loop()
- self.is_running = True
- self._stop_event.clear()
- self._task = asyncio.run_coroutine_threadsafe(
- self._timer_loop(interval, callback),
- Timer._loop
- )
- Timer._running_timers += 1
- # print(f"定时器已启动,每{interval}秒执行一次")
- def stop(self):
- if self.is_running:
- self._stop_event.set()
- if self._task:
- Timer._loop.call_soon_threadsafe(self._task.cancel)
- self.is_running = False
- # print("定时器已停止")
- def is_active(self):
- return self.is_running
- # 使用示例
- def callback1():
- print(f"回调1触发: {time.strftime('%H:%M:%S')}")
- def callback2():
- print(f"回调2触发: {time.strftime('%H:%M:%S')}")
- if __name__ == "__main__":
- timer1 = Timer()
- timer2 = Timer()
- timer1.start(2, callback1)
- timer2.start(3, callback2)
- try:
- time.sleep(100)
- timer1.stop()
- time.sleep(2)
- timer2.stop()
- except KeyboardInterrupt:
- timer1.stop()
- timer2.stop()
- finally:
- # 确保在程序退出时清理
- Timer._shutdown()
复制代码 1. 单例事件循环的实现
为了避免每个定时器都创建一个独立的事件循环,在类中使用了类变量和类方法来管理全局唯一的事件循环:- class Timer:
- _loop = None
- _thread = None
- _lock = threading.Lock()
- _running_timers = 0
- @classmethod
- def _ensure_loop(cls):
- with cls._lock:
- if cls._loop is None or not cls._thread or not cls._thread.is_alive():
- cls._loop = asyncio.new_event_loop()
- cls._thread = threading.Thread(
- target=cls._run_loop,
- args=(cls._loop,),
- daemon=True
- )
- cls._thread.start()
复制代码
- :存储全局的事件循环。
- :将事件循环运行在一个独立的守护线程中,避免阻塞主线程。
- :线程锁,确保在多线程环境中创建事件循环时的线程安全。
- :在需要时创建或重用事件循环,确保只有一个全局循环。
守护线程()的设计使得程序退出时无需显式关闭线程,简化了资源清理。
2. 事件循环的运行与关闭
事件循环的运行逻辑封装在中:- @classmethod
- def _run_loop(cls, loop):
- asyncio.set_event_loop(loop)
- try:
- loop.run_forever()
- except Exception as e:
- print(f"事件循环异常: {e}")
- finally:
- loop.close()
复制代码
- :让事件循环持续运行,直到被外部停止。
- 异常处理:捕获可能的错误并打印,便于调试。
- :确保循环关闭时资源被正确释放。
关闭逻辑则由方法控制:- @classmethod
- def _shutdown(cls):
- with cls._lock:
- if cls._running_timers == 0 and cls._loop is not None and cls._loop.is_running():
- cls._loop.call_soon_threadsafe(cls._loop.stop)
复制代码 当所有定时器都停止时(),事件循环会被安全停止。
3. 定时器核心逻辑
每个实例负责管理一个独立的定时任务:- def __init__(self):
- self.is_running = False
- self._stop_event = asyncio.Event()
- self._task = None
- async def _timer_loop(self, interval, callback):
- try:
- while not self._stop_event.is_set():
- await asyncio.sleep(interval)
- if not self._stop_event.is_set():
- await asyncio.get_event_loop().run_in_executor(None, callback)
- except asyncio.CancelledError:
- pass # 正常取消时忽略
- finally:
- self.is_running = False
- Timer._running_timers -= 1
- Timer._shutdown()
复制代码
- :一个对象,用于控制定时器的停止。
- :异步协程,每隔秒执行一次回调函数。
- :将回调函数运行在默认的线程池中,避免阻塞事件循环。
4. 启动与停止
启动和停止方法是用户的主要接口:- def start(self, interval, callback):
- if not self.is_running:
- Timer._ensure_loop()
- self.is_running = True
- self._stop_event.clear()
- self._task = asyncio.run_coroutine_threadsafe(
- self._timer_loop(interval, callback),
- Timer._loop
- )
- Timer._running_timers += 1
- def stop(self):
- if self.is_running:
- self._stop_event.set()
- if self._task:
- Timer._loop.call_soon_threadsafe(self._task.cancel)
- self.is_running = False
复制代码
- :启动定时器,确保事件循环可用,并记录运行中的定时器数量。
- :通过设置并取消任务来停止定时器。
5. 使用示例
以下是一个简单的使用示例:- def callback1():
- print(f"回调1触发: {time.strftime('%H:%M:%S')}")
- def callback2():
- print(f"回调2触发: {time.strftime('%H:%M:%S')}")
- timer1 = Timer()
- timer2 = Timer()
- timer1.start(2, callback1) # 每2秒触发一次
- timer2.start(3, callback2) # 每3秒触发一次
- time.sleep(10) # 运行10秒
- timer1.stop()
- timer2.stop()
复制代码 输出可能如下:- 回调1触发: 14:30:02
- 回调2触发: 14:30:03
- 回调1触发: 14:30:04
- 回调1触发: 14:30:06
- 回调2触发: 14:30:06
- ...
复制代码 设计亮点
- 异步与多线程结合:通过和,实现了非阻塞的定时器,适合高并发场景。
- 资源高效利用:全局唯一的事件循环避免了重复创建的开销。
- 优雅的生命周期管理:守护线程和自动关闭机制简化了资源清理。
- 线程安全:使用锁机制确保多线程环境下的稳定性。
适用场景
- 周期性任务调度,如数据刷新、状态检查。
- 后台服务中的定时监控。
- 游戏或实时应用中的计时器需求。
总结
这个异步定时器实现结合了 Python 的异步编程和多线程特性,提供了一个轻量、灵活的解决方案。无论是简单的脚本还是复杂的后台服务,它都能胜任。如果你需要一个可靠的定时器,不妨试试这个实现,或者根据需求进一步优化它!
以上就是使用Python实现一个优雅的异步定时器的详细内容,更多关于Python异步定时器的资料请关注脚本之家其它相关文章!
来源:https://www.jb51.net/python/339974n5e.htm
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作! |
|