Skip to content

linkerlin/GoExecutors

Repository files navigation

GoExecutors

Go Version License Go Report Card

一个高性能、工业级的 Go 语言并发执行器库,灵感来自 Java 的 ExecutorService。提供了线程池管理、任务调度、Future 模式等功能。

🚀 特性

  • 高性能线程池:基于 goroutine 池的高效任务执行
  • 灵活配置:支持自定义线程池大小、队列容量、超时策略等
  • Future 模式:支持异步任务执行和结果获取
  • 错误处理:完善的错误处理和 panic 恢复机制
  • 性能监控:内置性能指标收集和监控
  • 优雅关闭:支持优雅关闭和强制关闭
  • 多种拒绝策略:支持 abort、caller-runs、discard 等拒绝策略
  • 日志系统:内置日志系统,支持多种日志级别
  • 上下文支持:完整的 context.Context 支持,便于取消和超时控制
  • 兼容性:保持与旧版本 API 的兼容性

📦 安装

go get github.com/linkerlin/GoExecutors

🎯 快速开始

基础用法

package main import ( "context" "fmt" "time" "github.com/linkerlin/GoExecutors/config" "github.com/linkerlin/GoExecutors/executors" ) func main() { // 创建配置 cfg := config.DefaultConfig() cfg.CorePoolSize = 4 cfg.MaxPoolSize = 8 cfg.QueueSize = 100 // 创建执行器 executor := executors.NewThreadPoolExecutor(cfg) defer executor.Shutdown() // 提交任务 task := executors.Callable(func(ctx context.Context) (interface{}, error) { fmt.Println("Hello, GoExecutors!") return "任务完成", nil }) future, err := executor.Submit(task) if err != nil { panic(err) } // 获取结果 result, err := future.Get() if err != nil { panic(err) } fmt.Printf("结果: %v\n", result) }

兼容性用法

package main import ( "fmt" "time" "github.com/linkerlin/GoExecutors/executors" ) func main() { // 使用兼容的 API es := executors.NewExecutors() defer es.Stop() // 提交任务 callable := func() (interface{}, error) { return "Hello, World!", nil } future := es.Submit(callable) // 获取结果 ret, timeoutErr, err, exception := future.GetResult(1 * time.Second) if err != nil { fmt.Printf("错误: %v\n", err) } else if timeoutErr != nil { fmt.Printf("超时: %v\n", timeoutErr) } else if exception != nil { fmt.Printf("异常: %v\n", exception) } else { fmt.Printf("结果: %v\n", ret) } }

📚 详细文档

配置选项

cfg := &config.Config{ CorePoolSize: 4, // 核心线程数 MaxPoolSize: 8, // 最大线程数 QueueSize: 100, // 队列大小 KeepAliveTime: 60 * time.Second, // 线程空闲时间 AllowCoreThreadTimeOut: false, // 是否允许核心线程超时 RejectPolicy: "abort", // 拒绝策略 ThreadNamePrefix: "goexecutor", // 线程名称前缀 EnableMetrics: true, // 启用性能监控 MetricsInterval: 10 * time.Second, // 指标收集间隔 EnableLogging: true, // 启用日志 LogLevel: "info", // 日志级别 }

环境变量配置

# 设置环境变量 export GO_EXECUTOR_CORE_POOL_SIZE=8 export GO_EXECUTOR_MAX_POOL_SIZE=16 export GO_EXECUTOR_QUEUE_SIZE=200 export GO_EXECUTOR_KEEP_ALIVE_TIME=30s export GO_EXECUTOR_REJECT_POLICY=discard export GO_EXECUTOR_ENABLE_METRICS=true export GO_EXECUTOR_ENABLE_LOGGING=true export GO_EXECUTOR_LOG_LEVEL=debug

任务类型

1. Callable 函数

task := executors.Callable(func(ctx context.Context) (interface{}, error) { // 执行任务逻辑 return "结果", nil })

2. 自定义 Task

type MyTask struct { Data string } func (t *MyTask) Execute(ctx context.Context) (interface{}, error) { // 执行任务逻辑 return t.Data + " 处理完成", nil } // 使用 task := &MyTask{Data: "测试数据"} future, err := executor.Submit(task)

Future 操作

// 提交任务 future, err := executor.Submit(task) // 阻塞获取结果 result, err := future.Get() // 带超时获取结果 result, err := future.GetWithTimeout(5 * time.Second) // 检查是否完成 if future.IsDone() { fmt.Println("任务已完成") } // 取消任务 future.Cancel()

性能监控

// 启用性能监控 cfg.EnableMetrics = true // 获取性能指标 metrics := executor.GetMetrics() fmt.Printf("提交任务数: %d\n", metrics.TasksSubmitted) fmt.Printf("完成任务数: %d\n", metrics.TasksCompleted) fmt.Printf("失败任务数: %d\n", metrics.TasksFailed) fmt.Printf("活跃线程数: %d\n", metrics.ActiveThreads) fmt.Printf("平均执行时间: %v\n", metrics.AvgExecutionTime()) fmt.Printf("任务吞吐量: %.2f 任务/秒\n", metrics.TaskThroughput()) fmt.Printf("成功率: %.2f%%\n", metrics.SuccessRate()*100)

错误处理

// 1. 正常错误 task := executors.Callable(func(ctx context.Context) (interface{}, error) { return nil, errors.New("业务错误") }) // 2. Panic 恢复 task := executors.Callable(func(ctx context.Context) (interface{}, error) { panic("发生恐慌") // 会被自动恢复 }) // 3. 上下文取消 task := executors.Callable(func(ctx context.Context) (interface{}, error) { select { case <-ctx.Done(): return nil, ctx.Err() default: // 执行任务 return "完成", nil } })

拒绝策略

策略 描述
abort 抛出异常(默认)
caller_runs 在调用者线程中运行
discard 直接丢弃任务

优雅关闭

// 启动优雅关闭 executor.Shutdown() // 等待任务完成(带超时) if executor.AwaitTermination(30 * time.Second) { fmt.Println("执行器已优雅关闭") } else { fmt.Println("关闭超时,强制关闭") executor.ShutdownNow() }

🔧 高级用法

批量任务处理

// 批量提交任务 tasks := []executors.Task{ executors.Callable(func(ctx context.Context) (interface{}, error) { return "任务1", nil }), executors.Callable(func(ctx context.Context) (interface{}, error) { return "任务2", nil }), // ... 更多任务 } futures := make([]*executors.Future, len(tasks)) for i, task := range tasks { future, err := executor.Submit(task) if err != nil { fmt.Printf("提交任务 %d 失败: %v\n", i, err) continue } futures[i] = future } // 等待所有任务完成 for i, future := range futures { if future == nil { continue } result, err := future.Get() if err != nil { fmt.Printf("任务 %d 失败: %v\n", i, err) } else { fmt.Printf("任务 %d 结果: %v\n", i, result) } }

自定义日志

import "github.com/linkerlin/GoExecutors/logger" // 创建自定义日志器 customLogger := logger.NewSimpleLogger("debug") // 设置为全局日志器 logger.SetDefaultLogger(customLogger) // 或者在配置中启用 cfg.EnableLogging = true cfg.LogLevel = "debug"

🧪 测试

# 运行所有测试 go test -v ./... # 运行基准测试 go test -v -bench=. ./... # 运行覆盖率测试 go test -v -coverprofile=coverage.out ./... go tool cover -html=coverage.out

📊 性能基准

在 MacBook Pro (M1, 16GB) 上的基准测试结果:

BenchmarkThreadPoolExecutor_Submit-8 1000000 1203 ns/op BenchmarkThreadPoolExecutor_SubmitLight-8 2000000 856 ns/op BenchmarkThreadPoolExecutor_Concurrent-8 500000 2456 ns/op BenchmarkFuture_Get-8 5000000 234 ns/op BenchmarkFuture_GetWithTimeout-8 3000000 456 ns/op 

🤝 贡献

欢迎提交 Issue 和 Pull Request!

  1. Fork 本仓库
  2. 创建特性分支 (git checkout -b feature/AmazingFeature)
  3. 提交更改 (git commit -m 'Add some AmazingFeature')
  4. 推送到分支 (git push origin feature/AmazingFeature)
  5. 创建 Pull Request

📜 许可证

本项目采用 MIT 许可证。详情请参阅 LICENSE 文件。

🔗 相关链接

🙏 致谢

  • 感谢 Java 的 ExecutorService 提供的设计灵感
  • 感谢 Go 社区的优秀工具和库

如果这个项目对你有帮助,请给个 ⭐️ 支持一下!

About

一个Java ExecutorService风格的Go语言并发库。

Topics

Resources

Stars

Watchers

Forks

Releases

No releases published

Packages

 
 
 

Contributors