// Package exegroup 管理1组执行单元的启动和终止; // // 使用举例: // // 新建 [Group]: // // g := New() // // 新建默认 [Group] (内置信号处理): // // g := Default() // // 新增1个 [Actor], 通过 ctx 控制 [Actor] 退出: // // g.New().WithName("do nothing").WithGo(func(ctx context.Context) error { // <-ctx.Done() // return ctx.Err() // }) // // 新增1个 [Actor], 通过 stopFunc 控制 [Actor] 退出: // // server := &http.Server{Addr: ":80"} // inShutdown := &atomic.Bool{} // c := make(chan error, 1) // goFunc := func(_ context.Context) error { // err := server.ListenAndServe() // if inShutdown.Load() { // err = <-c // } // return err // } // stopFunc := func(ctx context.Context) { // inShutdown.Store(true) // c <- server.Shutdown(ctx) // } // g.New().WithName("server").WithGoStop(goFunc, stopFunc) // // 启动 [Group]: // // g.Run() package exegroup import ( "context" "fmt" "time" ) // Group 管理1组 [Actor], 每个 [Actor] 在1个goroutine 中运行; // Group 的执行过程参考 [Group.Run]; // // Group 的配置包含: // - [WithConcurrentStop] 并发执行 [Actor] 的终止函数, 默认不并发执行; // - [WithStopTimeout] 指定 [Group.Run] 从进入终止过程到返回的最长时间, 默认不限制最长时间; type Group struct { actors []*Actor cfg config } // New 创建 [Group]; func New(opts ...Option) *Group { cfg := config{} for _, opt := range opts { opt.apply(&cfg) } return &Group{ cfg: cfg, } } // Default 创建包含信号处理 [Actor] 的 [Group]; func Default(opts ...Option) *Group { g := New(opts...) g.New().WithName("signal").WithGo(HandleSignal()) return g } // New 创建并添加 [Actor], // 对 Actor 的配置通过链式调用完成; func (g *Group) New() *Actor { actor := new(Actor).WithName(fmt.Sprintf("actor-%03d", len(g.actors)+1)) g.actors = append(g.actors, actor) return actor } // Run 启动所有 [Actor]s 并等待 [Actor]s 执行完成; // 当没有 Actor 时执行 Run 会 panic; // // Run 包含两个阶段: // 1. 启动 [Actor] 并等待, Actor 的运行函数返回错误时进入下1个阶段; // 2. 执行 [Actor] 的终止函数并返回; func (g *Group) Run(ctx context.Context) error { g.validate() ctx, cancel := context.WithCancel(ctx) c := make(chan error, len(g.actors)) g.start(ctx, c) return g.wait(c, cancel) } func (g *Group) validate() { if len(g.actors) == 0 { panic("no actor") } for _, actor := range g.actors { if actor.goFunc == nil { panic(actor.name + "has nil goFunc") } } } func (g *Group) start(ctx context.Context, c chan error) { for _, actor := range g.actors { go func(actor *Actor) { var err error defer func() { if v := recover(); v != nil { err = fmt.Errorf("%v", v) } c <- err }() err = actor.goFunc(ctx) }(actor) } } func (g *Group) wait(c chan error, cancel context.CancelFunc) (err error) { err = <-c cancel() ctx := context.Background() if g.cfg.stopTimeout > 0 { ctx, cancel = context.WithTimeout(context.Background(), g.cfg.stopTimeout) defer cancel() } for _, m := range g.actors { if m.stopFunc != nil { if g.cfg.concurrentStop { go m.stopFunc(ctx) } else { m.stopFunc(ctx) } } } for i := 1; i < len(g.actors); i++ { select { case <-c: case <-ctx.Done(): return } } return } type config struct { concurrentStop bool stopTimeout time.Duration } // Option 修改 [Group] 的配置; type Option interface { apply(cfg *config) } type optionFunc func(cfg *config) func (fn optionFunc) apply(cfg *config) { fn(cfg) } // WithConcurrentStop 并发执行 [Actor] 的终止函数; func WithConcurrentStop() Option { return optionFunc(func(cfg *config) { cfg.concurrentStop = true }) } // WithStopTimeout 指定 [Group.Run] 从进入终止过程到返回的最长时间; func WithStopTimeout(d time.Duration) Option { return optionFunc(func(cfg *config) { cfg.stopTimeout = d }) }