用 Go 实现一个轻量级并发任务调度器(支持限速)
在学习 Go 的过程中,我尝试实现了一个轻量级的并发任务调度器,并支持速率限制、任务重试、优先级控制等功能。本文记录这一实现过程,也希望能为其他 Go 学习者提供参考。
🧭 为什么需要任务调度器
在微服务或分布式系统中,我们经常遇到以下问题:
- 系统需要处理成千上万的异步任务(如 HTTP 请求、爬虫抓取等)
- 控制并发量,避免下游服务压力过大
- 遵守 API 配额,需要精确的速率控制
- 需要任务重试、超时处理机制
- 对任务执行结果进行收集和汇总
为了解决这些问题,我实现了一个功能完整、灵活可扩展的调度器,具备以下核心能力:
✅ 并发控制(Worker 池)
✅ 速率限制(令牌桶算法)
✅ 可配置重试机制
✅ 超时控制、优先级调度
✅ 可扩展监控与持久化
🔧 核心组件设计
Task 接口:任务抽象
type Task interface {
ID() string
Execute(ctx context.Context) (interface{}, error)
}
实现一个基础任务 SimpleTask
:
type SimpleTask struct {
id string
action func() (interface{}, error)
}
func (t *SimpleTask) ID() string {
return t.id
}
func (t *SimpleTask) Execute(ctx context.Context) (interface{}, error) {
select {
case <-ctx.Done():
return nil, ctx.Err()
default:
return t.action()
}
}
Scheduler:调度器核心结构
type Scheduler struct {
workerNum int
rateLimiter *rate.Limiter
taskQueue chan Task
resultChan chan *Result
errorChan chan *Error
ctx context.Context
cancel context.CancelFunc
wg sync.WaitGroup
}
结果与错误结构:
type Result struct {
TaskID string
Output interface{}
Attempts int
}
type Error struct {
TaskID string
Err error
Attempts int
}
启动与停止
func NewScheduler(workerNum, queueSize int) *Scheduler {
ctx, cancel := context.WithCancel(context.Background())
return &Scheduler{
workerNum: workerNum,
taskQueue: make(chan Task, queueSize),
resultChan: make(chan *Result, queueSize),
errorChan: make(chan *Error, queueSize),
ctx: ctx,
cancel: cancel,
}
}
func (s *Scheduler) SetRateLimit(perSecond int) {
s.rateLimiter = rate.NewLimiter(rate.Limit(perSecond), perSecond)
}
func (s *Scheduler) Submit(task Task) {
s.taskQueue <- task
}
func (s *Scheduler) Start() {
for i := 0; i < s.workerNum; i++ {
s.wg.Add(1)
go s.worker()
}
}
func (s *Scheduler) Stop() {
s.cancel()
s.wg.Wait()
close(s.taskQueue)
close(s.resultChan)
close(s.errorChan)
}
Worker 协程实现
func (s *Scheduler) worker() {
defer s.wg.Done()
for {
select {
case <-s.ctx.Done():
return
case task := <-s.taskQueue:
if s.rateLimiter != nil {
if err := s.rateLimiter.Wait(s.ctx); err != nil {
s.errorChan <- &Error{TaskID: task.ID(), Err: err}
continue
}
}
output, err := task.Execute(s.ctx)
if err != nil {
s.errorChan <- &Error{TaskID: task.ID(), Err: err}
} else {
s.resultChan <- &Result{TaskID: task.ID(), Output: output}
}
}
}
}
⚙️ 高级功能扩展
任务重试
type WithRetry struct {
task Task
max int
backoff time.Duration
}
func (r *WithRetry) ID() string { return r.task.ID() }
func (r *WithRetry) Execute(ctx context.Context) (interface{}, error) {
var lastErr error
for i := 0; i < r.max; i++ {
if i > 0 {
select {
case <-time.After(r.backoff):
case <-ctx.Done():
return nil, ctx.Err()
}
}
output, err := r.task.Execute(ctx)
if err == nil {
return output, nil
}
lastErr = err
}
return nil, fmt.Errorf("after %d attempts: %w", r.max, lastErr)
}
任务超时控制
type WithTimeout struct {
task Task
timeout time.Duration
}
func (t *WithTimeout) ID() string { return t.task.ID() }
func (t *WithTimeout) Execute(ctx context.Context) (interface{}, error) {
ctx, cancel := context.WithTimeout(ctx, t.timeout)
defer cancel()
return t.task.Execute(ctx)
}
🚀 使用示例
func main() {
s := NewScheduler(3, 100)
s.SetRateLimit(5)
s.Start()
// 收集结果
go func() {
for r := range s.resultChan {
fmt.Printf("[SUCCESS] Task %s -> %v\n", r.TaskID, r.Output)
}
}()
// 收集错误
go func() {
for e := range s.errorChan {
fmt.Printf("[ERROR] Task %s -> %v\n", e.TaskID, e.Err)
}
}()
for i := 0; i < 20; i++ {
taskID := fmt.Sprintf("task-%d", i)
task := &SimpleTask{
id: taskID,
action: func() (interface{}, error) {
time.Sleep(100 * time.Millisecond)
if rand.Intn(10) == 0 {
return nil, fmt.Errorf("random error")
}
return fmt.Sprintf("result of %s", taskID), nil
},
}
retry := &WithRetry{task: task, max: 3, backoff: 200 * time.Millisecond}
s.Submit(retry)
}
time.Sleep(5 * time.Second)
s.Stop()
}
📈 性能优化建议
- 动态调整 worker 数量
- 合并小任务为批处理任务
- 使用任务亲和性减少上下文切换
- 接入 Prometheus 实现实时监控
- 使用 Kafka/Redis 实现任务持久化
- 使用 etcd 实现分布式任务调度
- 实现优雅停机处理(SIGTERM 捕获)
✅ 总结
本文实现了一个 Go 编写的轻量级任务调度器,具备高并发控制、速率限制、错误重试等能力,并通过组合模式支持了各种任务扩展。
适用场景包括:
- HTTP 接口调用调度
- 后台数据处理任务
- 爬虫任务调度系统
未来你还可以将其升级为分布式任务调度器,或者对接可视化面板用于企业级任务管理。