// Package exegroup 按照顺序启动和终止任务; // // Examples: // // 新建 [Group]: // // g := New() // // 新建默认 [Group] (内置信号处理): // // g := Default() // // 新增1个 [Actor], 通过 ctx 控制 [Actor] 退出: // // g.New().WithName("do nothing").WithStartFunc(func(ctx context.Context) error { // <-ctx.Done() // return ctx.Err() // }) // // 启动 [Group]: // // g.Run(ctx) package exegroup import ( "context" "errors" "fmt" "time" "git.blauwelle.com/go/crate/log" ) // Group 管理1组 [Actor], 每个 [Actor] 在1个goroutine 中运行; // Group 的执行过程参考 [Group.Run]; // // Group 的配置: // - [WithStopTimeout] 指定 [Group.Run] 从进入终止过程到返回的最长时间, 默认不限制最长时间; type Group struct { errorsChan chan actorError actors []*Actor syncActors []*Actor asyncActors []*Actor cfg config } type actorError struct { actor *Actor err error } // New 创建 [Group]; func New(opts ...Option) *Group { cfg := config{ stopTimeout: -1, } for _, opt := range opts { opt.apply(&cfg) } return &Group{ cfg: cfg, } } // Default 创建包含信号处理 [Actor] 的 [Group]; func Default(opts ...Option) *Group { g := New(opts...) g.NewAsync().WithName("signal").WithStartFunc(HandleSignal()) return g } func (g *Group) newActor() *Actor { actor := &Actor{ group: g, closeAfterStop: make(chan struct{}), closeAfterTimeout: make(chan struct{}), name: fmt.Sprintf("actor-%d", len(g.actors)), stopTimeout: -1, } actor.startCtx, actor.startCancel = context.WithCancel(context.Background()) g.actors = append(g.actors, actor) return actor } // NewAsync 创建并添加异步退出的 [Actor], 对 Actor 的配置通过链式调用完成; func (g *Group) NewAsync() *Actor { actor := g.newActor() g.asyncActors = append(g.asyncActors, actor) return actor } // NewSync 创建并添加同步退出的 [Actor], 对 Actor 的配置通过链式调用完成; func (g *Group) NewSync() *Actor { actor := g.newActor() g.syncActors = append(g.syncActors, actor) return actor } // Run 启动所有 [Actor] 并等待 [Actor] 执行完成; // 当没有 Actor 时执行 Run 会 panic; // // Run 包含两个阶段: // 1. 启动所有 [Actor]; // 2. 等待 ctx 或第1个Actor返回, 终止所有Actor. func (g *Group) Run(ctx context.Context) error { if err := g.validate(ctx); err != nil { return err } g.errorsChan = make(chan actorError, len(g.actors)) g.start(ctx) return g.stop(ctx, g.wait(ctx)) } func (g *Group) validate(ctx context.Context) error { if len(g.actors) == 0 { err := errors.New("no actor") log.Error(ctx, err.Error()) return err } for i, actor := range g.actors { if actor.startFunc == nil { err := fmt.Errorf("group-actor %d: %s has nil startFunc", i, actor.name) log.Error(ctx, err.Error()) return err } } return nil } // start 启动所有的 [Actor] 后返回. func (g *Group) start(ctx context.Context) { for _, actor := range g.actors { go actor.start() log.Tracef(ctx, "group is starting group-actor %s", actor.name) } } // wait 等待 ctx 或 [Actor] 返回 func (g *Group) wait(ctx context.Context) error { log.Tracef(ctx, "group is waiting") select { case ae := <-g.errorsChan: log.Tracef(ctx, "group is waiting: group-actor %s return", ae.actor.name) if ae.err != nil { return fmt.Errorf("group-actor %s return with %w", ae.actor.name, ae.err) } return nil case <-ctx.Done(): log.Tracef(ctx, "group is waiting: %s", ctx.Err().Error()) return ctx.Err() } } // stop 等待 [Actor] 停止 func (g *Group) stop(ctx context.Context, err error) error { stopCtx := context.Background() if g.cfg.stopTimeout >= 0 { log.Tracef(ctx, "set group stop timeout to %s", g.cfg.stopTimeout) var cancel func() stopCtx, cancel = context.WithTimeout(stopCtx, g.cfg.stopTimeout) defer cancel() } go func() { // 终止异步 [Actor] for _, actor := range g.asyncActors { log.Tracef(ctx, "group is stopping group-actor %s", actor.name) actor.sendStopSignal() } // 终止同步 [Actor] 并等待 [Actor] 退出 for i := len(g.syncActors) - 1; i >= 0; i-- { actor := g.syncActors[i] log.Tracef(ctx, "group is stopping group-actor %s", actor.name) actor.sendStopSignal() actor.wait() } }() // 等待所有 [Actor] 退出, 超时立即退出 for _, actor := range g.actors { select { case <-actor.closeAfterStop: log.Tracef(ctx, "group is stopping group-actor %s: stopped", actor.name) case <-actor.closeAfterTimeout: log.Errorf(ctx, "group is stopping group-actor %s: timeout", actor.name) case <-stopCtx.Done(): if err == nil { err = stopCtx.Err() } else { err = fmt.Errorf("group is stopping group-actor %s: global timeout: %w", actor.name, err) } log.Error(ctx, err.Error()) return err } } log.Tracef(ctx, "group return: %v", err) return err } type config struct { stopTimeout time.Duration // <0表示没有超时时间 } // Option 修改 [Group] 的配置; type Option interface { apply(cfg *config) } type optionFunc func(cfg *config) func (fn optionFunc) apply(cfg *config) { fn(cfg) } // WithStopTimeout 指定 [Group.Run] 从进入终止过程到返回的最长时间. // <0表示没有超时时间. func WithStopTimeout(d time.Duration) Option { return optionFunc(func(cfg *config) { cfg.stopTimeout = d }) }