• 设为首页
  • 收藏本站
  • 积分充值
  • VIP赞助
  • 手机版
  • 微博
  • 微信
    微信公众号 添加方式:
    1:搜索微信号(888888
    2:扫描左侧二维码
  • 快捷导航
    福建二哥 门户 查看主题

    使用Python实现一个优雅的异步定时器

    发布者: 雪落无声 | 发布时间: 2025-6-14 12:22| 查看数: 118| 评论数: 0|帖子模式

    需求背景

    定时器的核心功能是能够周期性地触发回调函数,同时需要支持启动、停止以及状态检查等操作。在多线程或异步编程场景中,希望定时器能够:

    • 支持异步操作,避免阻塞主线程;
    • 单例化事件循环,节省资源;
    • 优雅地管理定时器的生命周期;
    • 提供简单的接口,易于使用。
    为此,设计了一个
    1. Timer
    复制代码
    类,结合
    1. asyncio
    复制代码
    1. threading
    复制代码
    ,实现了一个高效的定时器。

    代码

    完整代码
    1. import asyncio
    2. import threading
    3. import time
    4. import sys

    5. class Timer:
    6.     _loop = None
    7.     _thread = None
    8.     _lock = threading.Lock()
    9.     _running_timers = 0

    10.     @classmethod
    11.     def _ensure_loop(cls):
    12.         with cls._lock:
    13.             if cls._loop is None or not cls._thread or not cls._thread.is_alive():
    14.                 cls._loop = asyncio.new_event_loop()
    15.                 cls._thread = threading.Thread(
    16.                     target=cls._run_loop,
    17.                     args=(cls._loop,),
    18.                     daemon=True
    19.                 )
    20.                 cls._thread.start()

    21.     @classmethod
    22.     def _run_loop(cls, loop):
    23.         asyncio.set_event_loop(loop)
    24.         try:
    25.             loop.run_forever()
    26.         except Exception as e:
    27.             print(f"事件循环异常: {e}")
    28.         finally:
    29.             loop.close()

    30.     @classmethod
    31.     def _shutdown(cls):
    32.         with cls._lock:
    33.             if cls._running_timers == 0 and cls._loop is not None and cls._loop.is_running():
    34.                 cls._loop.call_soon_threadsafe(cls._loop.stop)
    35.                 # 不使用 join,因为守护线程会在主线程退出时自动结束

    36.     def __init__(self):
    37.         self.is_running = False
    38.         self._stop_event = asyncio.Event()
    39.         self._task = None

    40.     async def _timer_loop(self, interval, callback):
    41.         try:
    42.             while not self._stop_event.is_set():
    43.                 await asyncio.sleep(interval)
    44.                 if not self._stop_event.is_set():
    45.                     await asyncio.get_event_loop().run_in_executor(None, callback)
    46.         except asyncio.CancelledError:
    47.             pass  # 正常取消时忽略
    48.         except Exception as e:
    49.             print(f"定时器循环异常: {e}")
    50.         finally:
    51.             self.is_running = False
    52.             Timer._running_timers -= 1
    53.             Timer._shutdown()

    54.     def start(self, interval, callback):
    55.         if not self.is_running:
    56.             Timer._ensure_loop()
    57.             self.is_running = True
    58.             self._stop_event.clear()
    59.             self._task = asyncio.run_coroutine_threadsafe(
    60.                 self._timer_loop(interval, callback),
    61.                 Timer._loop
    62.             )
    63.             Timer._running_timers += 1
    64.             # print(f"定时器已启动,每{interval}秒执行一次")

    65.     def stop(self):
    66.         if self.is_running:
    67.             self._stop_event.set()
    68.             if self._task:
    69.                 Timer._loop.call_soon_threadsafe(self._task.cancel)
    70.             self.is_running = False
    71.             # print("定时器已停止")

    72.     def is_active(self):
    73.         return self.is_running

    74. # 使用示例
    75. def callback1():
    76.     print(f"回调1触发: {time.strftime('%H:%M:%S')}")

    77. def callback2():
    78.     print(f"回调2触发: {time.strftime('%H:%M:%S')}")

    79. if __name__ == "__main__":
    80.     timer1 = Timer()
    81.     timer2 = Timer()

    82.     timer1.start(2, callback1)
    83.     timer2.start(3, callback2)

    84.     try:
    85.         time.sleep(100)
    86.         timer1.stop()
    87.         time.sleep(2)
    88.         timer2.stop()
    89.     except KeyboardInterrupt:
    90.         timer1.stop()
    91.         timer2.stop()
    92.     finally:
    93.         # 确保在程序退出时清理
    94.         Timer._shutdown()
    复制代码
    1. 单例事件循环的实现

    为了避免每个定时器都创建一个独立的事件循环,在
    1. Timer
    复制代码
    类中使用了类变量和类方法来管理全局唯一的事件循环:
    1. class Timer:
    2.     _loop = None
    3.     _thread = None
    4.     _lock = threading.Lock()
    5.     _running_timers = 0

    6.     @classmethod
    7.     def _ensure_loop(cls):
    8.         with cls._lock:
    9.             if cls._loop is None or not cls._thread or not cls._thread.is_alive():
    10.                 cls._loop = asyncio.new_event_loop()
    11.                 cls._thread = threading.Thread(
    12.                     target=cls._run_loop,
    13.                     args=(cls._loop,),
    14.                     daemon=True
    15.                 )
    16.                 cls._thread.start()
    复制代码

      1. _loop
      复制代码
      :存储全局的
      1. asyncio
      复制代码
      事件循环。
      1. _thread
      复制代码
      :将事件循环运行在一个独立的守护线程中,避免阻塞主线程。
      1. _lock
      复制代码
      :线程锁,确保在多线程环境中创建事件循环时的线程安全。
      1. _ensure_loop
      复制代码
      :在需要时创建或重用事件循环,确保只有一个全局循环。
    守护线程(
    1. daemon=True
    复制代码
    )的设计使得程序退出时无需显式关闭线程,简化了资源清理。

    2. 事件循环的运行与关闭

    事件循环的运行逻辑封装在
    1. _run_loop
    复制代码
    中:
    1. @classmethod
    2. def _run_loop(cls, loop):
    3.     asyncio.set_event_loop(loop)
    4.     try:
    5.         loop.run_forever()
    6.     except Exception as e:
    7.         print(f"事件循环异常: {e}")
    8.     finally:
    9.         loop.close()
    复制代码

      1. run_forever
      复制代码
      :让事件循环持续运行,直到被外部停止。
    • 异常处理:捕获可能的错误并打印,便于调试。
      1. finally
      复制代码
      :确保循环关闭时资源被正确释放。
    关闭逻辑则由
    1. _shutdown
    复制代码
    方法控制:
    1. @classmethod
    2. def _shutdown(cls):
    3.     with cls._lock:
    4.         if cls._running_timers == 0 and cls._loop is not None and cls._loop.is_running():
    5.             cls._loop.call_soon_threadsafe(cls._loop.stop)
    复制代码
    当所有定时器都停止时(
    1. _running_timers == 0
    复制代码
    ),事件循环会被安全停止。

    3. 定时器核心逻辑

    每个
    1. Timer
    复制代码
    实例负责管理一个独立的定时任务:
    1. def __init__(self):
    2.     self.is_running = False
    3.     self._stop_event = asyncio.Event()
    4.     self._task = None

    5. async def _timer_loop(self, interval, callback):
    6.     try:
    7.         while not self._stop_event.is_set():
    8.             await asyncio.sleep(interval)
    9.             if not self._stop_event.is_set():
    10.                 await asyncio.get_event_loop().run_in_executor(None, callback)
    11.     except asyncio.CancelledError:
    12.         pass  # 正常取消时忽略
    13.     finally:
    14.         self.is_running = False
    15.         Timer._running_timers -= 1
    16.         Timer._shutdown()
    复制代码

      1. _stop_event
      复制代码
      :一个
      1. asyncio.Event
      复制代码
      对象,用于控制定时器的停止。
      1. _timer_loop
      复制代码
      :异步协程,每隔
      1. interval
      复制代码
      秒执行一次回调函数
      1. callback
      复制代码

      1. run_in_executor
      复制代码
      :将回调函数运行在默认的线程池中,避免阻塞事件循环。

    4. 启动与停止

    启动和停止方法是用户的主要接口:
    1. def start(self, interval, callback):
    2.     if not self.is_running:
    3.         Timer._ensure_loop()
    4.         self.is_running = True
    5.         self._stop_event.clear()
    6.         self._task = asyncio.run_coroutine_threadsafe(
    7.             self._timer_loop(interval, callback),
    8.             Timer._loop
    9.         )
    10.         Timer._running_timers += 1

    11. def stop(self):
    12.     if self.is_running:
    13.         self._stop_event.set()
    14.         if self._task:
    15.             Timer._loop.call_soon_threadsafe(self._task.cancel)
    16.         self.is_running = False
    复制代码

      1. start
      复制代码
      :启动定时器,确保事件循环可用,并记录运行中的定时器数量。
      1. stop
      复制代码
      :通过设置
      1. _stop_event
      复制代码
      并取消任务来停止定时器。

    5. 使用示例

    以下是一个简单的使用示例:
    1. def callback1():
    2.     print(f"回调1触发: {time.strftime('%H:%M:%S')}")

    3. def callback2():
    4.     print(f"回调2触发: {time.strftime('%H:%M:%S')}")

    5. timer1 = Timer()
    6. timer2 = Timer()

    7. timer1.start(2, callback1)  # 每2秒触发一次
    8. timer2.start(3, callback2)  # 每3秒触发一次

    9. time.sleep(10)  # 运行10秒
    10. timer1.stop()
    11. timer2.stop()
    复制代码
    输出可能如下:
    1. 回调1触发: 14:30:02
    2. 回调2触发: 14:30:03
    3. 回调1触发: 14:30:04
    4. 回调1触发: 14:30:06
    5. 回调2触发: 14:30:06
    6. ...
    复制代码
    设计亮点


    • 异步与多线程结合:通过
      1. asyncio
      复制代码
      1. threading
      复制代码
      ,实现了非阻塞的定时器,适合高并发场景。
    • 资源高效利用:全局唯一的事件循环避免了重复创建的开销。
    • 优雅的生命周期管理:守护线程和自动关闭机制简化了资源清理。
    • 线程安全:使用锁机制确保多线程环境下的稳定性。

    适用场景


    • 周期性任务调度,如数据刷新、状态检查。
    • 后台服务中的定时监控。
    • 游戏或实时应用中的计时器需求。

    总结

    这个异步定时器实现结合了 Python 的异步编程和多线程特性,提供了一个轻量、灵活的解决方案。无论是简单的脚本还是复杂的后台服务,它都能胜任。如果你需要一个可靠的定时器,不妨试试这个实现,或者根据需求进一步优化它!
    以上就是使用Python实现一个优雅的异步定时器的详细内容,更多关于Python异步定时器的资料请关注脚本之家其它相关文章!

    来源:https://www.jb51.net/python/339974n5e.htm
    免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!

    最新评论

    QQ Archiver 手机版 小黑屋 福建二哥 ( 闽ICP备2022004717号|闽公网安备35052402000345号 )

    Powered by Discuz! X3.5 © 2001-2023

    快速回复 返回顶部 返回列表