编程 用 Go 实现一个轻量级并发任务调度器(支持限速)

2025-05-07 12:48:52 +0800 CST views 118

用 Go 实现一个轻量级并发任务调度器(支持限速)

在学习 Go 的过程中,我尝试实现了一个轻量级的并发任务调度器,并支持速率限制、任务重试、优先级控制等功能。本文记录这一实现过程,也希望能为其他 Go 学习者提供参考。


🧭 为什么需要任务调度器

在微服务或分布式系统中,我们经常遇到以下问题:

  • 系统需要处理成千上万的异步任务(如 HTTP 请求、爬虫抓取等)
  • 控制并发量,避免下游服务压力过大
  • 遵守 API 配额,需要精确的速率控制
  • 需要任务重试、超时处理机制
  • 对任务执行结果进行收集和汇总

为了解决这些问题,我实现了一个功能完整、灵活可扩展的调度器,具备以下核心能力:

✅ 并发控制(Worker 池)
✅ 速率限制(令牌桶算法)
✅ 可配置重试机制
✅ 超时控制、优先级调度
✅ 可扩展监控与持久化


🔧 核心组件设计

Task 接口:任务抽象

type Task interface {
	ID() string
	Execute(ctx context.Context) (interface{}, error)
}

实现一个基础任务 SimpleTask

type SimpleTask struct {
	id     string
	action func() (interface{}, error)
}

func (t *SimpleTask) ID() string {
	return t.id
}

func (t *SimpleTask) Execute(ctx context.Context) (interface{}, error) {
	select {
	case <-ctx.Done():
		return nil, ctx.Err()
	default:
		return t.action()
	}
}

Scheduler:调度器核心结构

type Scheduler struct {
	workerNum   int
	rateLimiter *rate.Limiter
	taskQueue   chan Task
	resultChan  chan *Result
	errorChan   chan *Error
	ctx         context.Context
	cancel      context.CancelFunc
	wg          sync.WaitGroup
}

结果与错误结构:

type Result struct {
	TaskID   string
	Output   interface{}
	Attempts int
}

type Error struct {
	TaskID   string
	Err      error
	Attempts int
}

启动与停止

func NewScheduler(workerNum, queueSize int) *Scheduler {
	ctx, cancel := context.WithCancel(context.Background())
	return &Scheduler{
		workerNum:  workerNum,
		taskQueue:  make(chan Task, queueSize),
		resultChan: make(chan *Result, queueSize),
		errorChan:  make(chan *Error, queueSize),
		ctx:        ctx,
		cancel:     cancel,
	}
}

func (s *Scheduler) SetRateLimit(perSecond int) {
	s.rateLimiter = rate.NewLimiter(rate.Limit(perSecond), perSecond)
}

func (s *Scheduler) Submit(task Task) {
	s.taskQueue <- task
}

func (s *Scheduler) Start() {
	for i := 0; i < s.workerNum; i++ {
		s.wg.Add(1)
		go s.worker()
	}
}

func (s *Scheduler) Stop() {
	s.cancel()
	s.wg.Wait()
	close(s.taskQueue)
	close(s.resultChan)
	close(s.errorChan)
}

Worker 协程实现

func (s *Scheduler) worker() {
	defer s.wg.Done()

	for {
		select {
		case <-s.ctx.Done():
			return
		case task := <-s.taskQueue:
			if s.rateLimiter != nil {
				if err := s.rateLimiter.Wait(s.ctx); err != nil {
					s.errorChan <- &Error{TaskID: task.ID(), Err: err}
					continue
				}
			}

			output, err := task.Execute(s.ctx)
			if err != nil {
				s.errorChan <- &Error{TaskID: task.ID(), Err: err}
			} else {
				s.resultChan <- &Result{TaskID: task.ID(), Output: output}
			}
		}
	}
}

⚙️ 高级功能扩展

任务重试

type WithRetry struct {
	task    Task
	max     int
	backoff time.Duration
}

func (r *WithRetry) ID() string { return r.task.ID() }

func (r *WithRetry) Execute(ctx context.Context) (interface{}, error) {
	var lastErr error
	for i := 0; i < r.max; i++ {
		if i > 0 {
			select {
			case <-time.After(r.backoff):
			case <-ctx.Done():
				return nil, ctx.Err()
			}
		}
		output, err := r.task.Execute(ctx)
		if err == nil {
			return output, nil
		}
		lastErr = err
	}
	return nil, fmt.Errorf("after %d attempts: %w", r.max, lastErr)
}

任务超时控制

type WithTimeout struct {
	task    Task
	timeout time.Duration
}

func (t *WithTimeout) ID() string { return t.task.ID() }

func (t *WithTimeout) Execute(ctx context.Context) (interface{}, error) {
	ctx, cancel := context.WithTimeout(ctx, t.timeout)
	defer cancel()
	return t.task.Execute(ctx)
}

🚀 使用示例

func main() {
	s := NewScheduler(3, 100)
	s.SetRateLimit(5)
	s.Start()

	// 收集结果
	go func() {
		for r := range s.resultChan {
			fmt.Printf("[SUCCESS] Task %s -> %v\n", r.TaskID, r.Output)
		}
	}()
	// 收集错误
	go func() {
		for e := range s.errorChan {
			fmt.Printf("[ERROR] Task %s -> %v\n", e.TaskID, e.Err)
		}
	}()

	for i := 0; i < 20; i++ {
		taskID := fmt.Sprintf("task-%d", i)
		task := &SimpleTask{
			id: taskID,
			action: func() (interface{}, error) {
				time.Sleep(100 * time.Millisecond)
				if rand.Intn(10) == 0 {
					return nil, fmt.Errorf("random error")
				}
				return fmt.Sprintf("result of %s", taskID), nil
			},
		}
		retry := &WithRetry{task: task, max: 3, backoff: 200 * time.Millisecond}
		s.Submit(retry)
	}

	time.Sleep(5 * time.Second)
	s.Stop()
}

📈 性能优化建议

  • 动态调整 worker 数量
  • 合并小任务为批处理任务
  • 使用任务亲和性减少上下文切换
  • 接入 Prometheus 实现实时监控
  • 使用 Kafka/Redis 实现任务持久化
  • 使用 etcd 实现分布式任务调度
  • 实现优雅停机处理(SIGTERM 捕获)

✅ 总结

本文实现了一个 Go 编写的轻量级任务调度器,具备高并发控制、速率限制、错误重试等能力,并通过组合模式支持了各种任务扩展。

适用场景包括:

  • HTTP 接口调用调度
  • 后台数据处理任务
  • 爬虫任务调度系统

未来你还可以将其升级为分布式任务调度器,或者对接可视化面板用于企业级任务管理。

推荐文章

快速提升Vue3开发者的效率和界面
2025-05-11 23:37:03 +0800 CST
Vue3 组件间通信的多种方式
2024-11-19 02:57:47 +0800 CST
前端代码规范 - 图片相关
2024-11-19 08:34:48 +0800 CST
前端项目中图片的使用规范
2024-11-19 09:30:04 +0800 CST
支付宝批量转账
2024-11-18 20:26:17 +0800 CST
宝塔面板 Nginx 服务管理命令
2024-11-18 17:26:26 +0800 CST
在 Vue 3 中如何创建和使用插件?
2024-11-18 13:42:12 +0800 CST
浏览器自动播放策略
2024-11-19 08:54:41 +0800 CST
goctl 技术系列 - Go 模板入门
2024-11-19 04:12:13 +0800 CST
10个几乎无人使用的罕见HTML标签
2024-11-18 21:44:46 +0800 CST
PHP 唯一卡号生成
2024-11-18 21:24:12 +0800 CST
html一份退出酒场的告知书
2024-11-18 18:14:45 +0800 CST
html一个包含iPhoneX和MacBook模拟器
2024-11-19 08:03:47 +0800 CST
GROMACS:一个美轮美奂的C++库
2024-11-18 19:43:29 +0800 CST
html流光登陆页面
2024-11-18 15:36:18 +0800 CST
如何开发易支付插件功能
2024-11-19 08:36:25 +0800 CST
程序员茄子在线接单