• 设为首页
  • 收藏本站
  • 积分充值
  • VIP赞助
  • 手机版
  • 微博
  • 微信
    微信公众号 添加方式:
    1:搜索微信号(888888
    2:扫描左侧二维码
  • 快捷导航
    福建二哥 门户 查看主题

    Go实现一个轻量级并发任务调度器(支持限速)

    发布者: 晋3555 | 发布时间: 2025-8-14 14:40| 查看数: 8| 评论数: 0|帖子模式

    前言

    在日常开发中,我们经常会遇到这样的场景:

    • 有一堆任务要跑(比如:发请求、处理数据、爬虫等)
    • 不希望一次性全部跑完,担心打爆服务端或者被封
    • 想要设置并发数、限速,还能控制任务重试、失败记录
    那么,能不能用 Go 实现一个“轻量级的并发任务调度器”?——答案是:当然可以!
    今天我们就来用 Go 从零实现一个可配置的任务调度器,支持:

    • 最大并发数控制(worker pool)
    • 每秒请求速率限制(rate limit)
    • 简单的失败重试机制
    • 支持结果收集与错误输出

    效果展示

    你可以像这样调用我们的调度器:
    1. scheduler := NewScheduler(5, 10) // 并发 5,速率限制每秒 10 次

    2. for i := 0; i < 100; i++ {
    3.     task := NewTask(func() error {
    4.         // 模拟网络请求或业务逻辑
    5.         fmt.Println("正在处理任务:", i)
    6.         time.Sleep(300 * time.Millisecond)
    7.         return nil
    8.     })
    9.     scheduler.Submit(task)
    10. }

    11. scheduler.Wait()
    12. fmt.Println("全部任务完成")
    复制代码
    核心组件设计


    1. 任务(Task)

    我们将每个任务抽象为一个结构体:
    1. type Task struct {
    2.     fn   func() error
    3.     retry int
    4. }
    复制代码
    2. 调度器(Scheduler)

    负责维护任务队列、worker、速率限制器:
    1. type Scheduler struct {
    2.     tasks       chan *Task
    3.     wg          sync.WaitGroup
    4.     rateLimiter &lt;-chan time.Time
    5. }
    复制代码
    实现代码

    下面是完整实现(可以直接复制使用):
    1. type Task struct {
    2.     fn    func() error
    3.     retry int
    4. }

    5. func NewTask(fn func() error) *Task {
    6.     return &Task{fn: fn, retry: 3}
    7. }

    8. type Scheduler struct {
    9.     tasks       chan *Task
    10.     wg          sync.WaitGroup
    11.     rateLimiter <-chan time.Time
    12. }

    13. func NewScheduler(concurrency int, ratePerSecond int) *Scheduler {
    14.     s := &Scheduler{
    15.         tasks:       make(chan *Task, 100),
    16.         rateLimiter: time.Tick(time.Second / time.Duration(ratePerSecond)),
    17.     }

    18.     for i := 0; i < concurrency; i++ {
    19.         go s.worker()
    20.     }

    21.     return s
    22. }

    23. func (s *Scheduler) Submit(task *Task) {
    24.     s.wg.Add(1)
    25.     s.tasks <- task
    26. }

    27. func (s *Scheduler) worker() {
    28.     for task := range s.tasks {
    29.         <-s.rateLimiter // 限速

    30.         err := task.fn()
    31.         if err != nil && task.retry > 0 {
    32.             fmt.Println("任务失败,重试中...")
    33.             task.retry--
    34.             s.Submit(task)
    35.         } else if err != nil {
    36.             fmt.Println("任务最终失败:", err)
    37.         }

    38.         s.wg.Done()
    39.     }
    40. }

    41. func (s *Scheduler) Wait() {
    42.     s.wg.Wait()
    43.     close(s.tasks)
    44. }
    复制代码
    实战应用场景


    • 网络爬虫限速抓取
    • 批量发送邮件/SMS/请求,防止接口限流
    • 云服务任务调度、批量自动化操作
    • 异步数据采集和聚合

    总结

    Go 的并发模型非常适合处理“海量任务 + 控制速率 + 错误重试”的需求。本篇实现的调度器非常轻量,适合作为基础组件集成到你自己的系统中。
    如果你有更多需求,比如:

    • 增加失败回调
    • 支持超时控制
    • 任务优先级
    • 后台监控 dashboard
    到此这篇关于Go实现一个轻量级并发任务调度器(支持限速)的文章就介绍到这了,更多相关Go 并发任务调度器内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

    来源:互联网
    免责声明:如果侵犯了您的权益,请联系站长(1277306191@qq.com),我们会及时删除侵权内容,谢谢合作!

    最新评论

    QQ Archiver 手机版 小黑屋 福建二哥 ( 闽ICP备2022004717号|闽公网安备35052402000345号 )

    Powered by Discuz! X3.5 © 2001-2023

    快速回复 返回顶部 返回列表