From a844e9e34c3a2ab77eb3bba8114f33caae27993a Mon Sep 17 00:00:00 2001 From: Ge Song Date: Wed, 17 Jan 2024 16:17:51 +0800 Subject: [PATCH] exegroup/rewrite control logic, make sync actor stop serial; --- exegroup/README.md | 9 ++ exegroup/actor.go | 117 ++++++++++++++++++---- exegroup/eghttp/actor.go | 55 +++++++---- exegroup/go.mod | 2 + exegroup/go.sum | 4 + exegroup/group.go | 205 +++++++++++++++++++++++---------------- exegroup/group_test.go | 2 +- exegroup/signal.go | 7 +- 8 files changed, 279 insertions(+), 122 deletions(-) create mode 100644 exegroup/README.md create mode 100644 exegroup/go.sum diff --git a/exegroup/README.md b/exegroup/README.md new file mode 100644 index 0000000..d20c1d1 --- /dev/null +++ b/exegroup/README.md @@ -0,0 +1,9 @@ +# exegroup 按照顺序启动和终止任务 + +参考[group.go](./group.go)中的使用说明; + +### `Group` / `Actor` / `Actor.startFunc` 超时时间的约定 + +- `>= 0` 的超时时间被用来初始化定时器, 到时间时触发超时; +- `< 0` 的超时时间表示永不超时; +- 所有超时时间初始化为负数(-1); diff --git a/exegroup/actor.go b/exegroup/actor.go index 5be7515..52b5e2c 100644 --- a/exegroup/actor.go +++ b/exegroup/actor.go @@ -1,14 +1,93 @@ package exegroup -import "context" +import ( + "context" + "fmt" + "time" + + "git.blauwelle.com/go/crate/log" +) // Actor 是 [Group] 调度的执行单元; -// -// Actor.stopFunc 可以是 nil, 这时 Actor 需要受 goFunc 的 ctx 控制退出; +// startCtx, startCancel 需要在构造 Actor 时初始化, 在后续 start / wait 初始化会导致 data race. type Actor struct { - goFunc func(ctx context.Context) error - stopFunc func(ctx context.Context) - name string + startFunc func(ctx context.Context) error + group *Group + closeAfterStop chan struct{} + closeAfterTimeout chan struct{} + startCtx context.Context //nolint:containedctx + startCancel func() + name string + stopTimeout time.Duration // <0表示没有超时时间 + isFastStop bool +} + +func (actor *Actor) wrapFastStop() { + if !actor.isFastStop { + return + } + startFunc := actor.startFunc + actor.startFunc = func(ctx context.Context) error { + errChan := make(chan error, 1) + go func() { + var err error + defer func() { + if v := recover(); v != nil { + err = fmt.Errorf("recover: %s", v) + } + errChan <- err + }() + err = startFunc(ctx) + }() + select { + case <-ctx.Done(): + return ctx.Err() + case err := <-errChan: + return err + } + } +} + +// start 同步启动 [Actor], [Actor].startFunc panic 或返回的 error 会被写入 [Actor].errorChan. +func (actor *Actor) start() { + actor.wrapFastStop() + var err error + defer func() { + if v := recover(); v != nil { + err = fmt.Errorf("recover: %s", v) + } + actor.group.errorsChan <- actorError{ + actor: actor, + err: err, + } + log.Tracef(context.Background(), "group-actor %s return", actor.name) + close(actor.closeAfterStop) + }() + err = actor.startFunc(actor.startCtx) +} + +// sendStopSignal 通知 [Actor] 退出. +func (actor *Actor) sendStopSignal() { + actor.startCancel() + if actor.stopTimeout >= 0 { + go func() { + timer := time.NewTimer(actor.stopTimeout) + select { + case <-actor.closeAfterStop: + timer.Stop() + case <-timer.C: + close(actor.closeAfterTimeout) + } + }() + } +} + +// wait 等待 [Actor] 停止或超时. +func (actor *Actor) wait() { + select { + case <-actor.closeAfterTimeout: + case <-actor.closeAfterStop: + } } // WithName 指定 [Actor] 的 name; @@ -17,21 +96,21 @@ func (actor *Actor) WithName(name string) *Actor { return actor } -// WithGo 指定 [Actor] 的 goFunc; -// 通过 WithGo 注册的 goFunc 函数应该受 ctx 控制退出; -func (actor *Actor) WithGo(goFunc func(ctx context.Context) error) *Actor { - actor.goFunc = goFunc +// WithStartFunc 指定 [Actor] 的 startFunc; +// 通过 WithStartFunc 注册的 startFunc 函数应该受 ctx 控制退出; +func (actor *Actor) WithStartFunc(startFunc func(ctx context.Context) error) *Actor { + actor.startFunc = startFunc + return actor +} + +// WithFastStop 设置 [Actor] 接收停止信号后立即终止. +func (actor *Actor) WithFastStop(fastStop bool) *Actor { + actor.isFastStop = fastStop return actor } -// WithGoStop 指定 [Actor] 的 goFunc 和 stopFunc; -// goFunc 不受 ctx 控制退出, 而是在 stopFunc 调用后退出; -// 1. goFunc 被 [Group] 在 goroutine 中启动; -// 2. stopFunc 在被 [Group] 启动的任意 goFunc 返回后被调用; -// -// 使用 stopFunc 可以在 stopFunc 的 ctx 中指定等待期限, 让 goFunc 延迟到等待期限强制退出; -func (actor *Actor) WithGoStop(goFunc func(ctx context.Context) error, stopFunc func(ctx context.Context)) *Actor { - actor.goFunc = goFunc - actor.stopFunc = stopFunc +// WithStopTimeout 指定停止 [Actor] 的最长等待时间, 设置 <0 没有最长等待时间. +func (actor *Actor) WithStopTimeout(duration time.Duration) *Actor { + actor.stopTimeout = duration return actor } diff --git a/exegroup/eghttp/actor.go b/exegroup/eghttp/actor.go index ead2ee2..f4d385f 100644 --- a/exegroup/eghttp/actor.go +++ b/exegroup/eghttp/actor.go @@ -2,11 +2,13 @@ package eghttp import ( "context" + "errors" "net" "net/http" "strconv" - "sync/atomic" "time" + + "git.blauwelle.com/go/crate/log" ) const ( @@ -30,6 +32,7 @@ func WithHandler(handler http.Handler) Option { }) } +// WithServer 指定要启动的服务器. 注意会替换原有的服务器, 导致早前配置服务器相关的 [Option] 失效. func WithServer(server *http.Server) Option { return optionFunc(func(cfg *config) { cfg.server = server @@ -49,9 +52,17 @@ func WithServerOption(fn func(server *http.Server)) Option { }) } +// WithShutdownTimeout 设置关闭服务的超时时间. <0 没有最长等待时间. +func WithShutdownTimeout(timeout time.Duration) Option { + return optionFunc(func(cfg *config) { + cfg.shutdownTimeout = timeout + }) +} + type config struct { - server *http.Server - startFn func(server *http.Server) error + server *http.Server + startFn func(server *http.Server) error + shutdownTimeout time.Duration } func newDefaultConfig() *config { @@ -63,6 +74,7 @@ func newDefaultConfig() *config { startFn: func(server *http.Server) error { return server.ListenAndServe() }, + shutdownTimeout: -1, } } @@ -72,24 +84,31 @@ func (fn optionFunc) apply(cfg *config) { fn(cfg) } -// HTTPListenAndServe 创建 [http.Server] 并提供启动和停止函数; -func HTTPListenAndServe(opts ...Option) (func(ctx context.Context) error, func(ctx context.Context)) { +// ListenAndServe 构造1个函数, 启动 [http.Server] 并受 ctx 控制终止. +func ListenAndServe(opts ...Option) func(ctx context.Context) error { cfg := newDefaultConfig() for _, opt := range opts { opt.apply(cfg) } - inShutdown := &atomic.Bool{} - c := make(chan error, 1) - goFunc := func(_ context.Context) error { - err := cfg.startFn(cfg.server) - if inShutdown.Load() { - err = <-c - } - return err - } - stopFunc := func(ctx context.Context) { - inShutdown.Store(true) - c <- cfg.server.Shutdown(ctx) + errChan := make(chan error, 2) //nolint:gomnd + return func(ctx context.Context) error { + go func() { + err := cfg.startFn(cfg.server) + log.Tracef(ctx, "server return with %s", err) + if !errors.Is(err, http.ErrServerClosed) { + errChan <- err + } + }() + go func() { + <-ctx.Done() + shutdownCtx := context.Background() + if cfg.shutdownTimeout >= 0 { + var cancel func() + shutdownCtx, cancel = context.WithTimeout(shutdownCtx, cfg.shutdownTimeout) + defer cancel() + } + errChan <- cfg.server.Shutdown(shutdownCtx) + }() + return <-errChan } - return goFunc, stopFunc } diff --git a/exegroup/go.mod b/exegroup/go.mod index 08ee572..9758e8e 100644 --- a/exegroup/go.mod +++ b/exegroup/go.mod @@ -1,3 +1,5 @@ module git.blauwelle.com/go/crate/exegroup go 1.20 + +require git.blauwelle.com/go/crate/log v0.15.0 diff --git a/exegroup/go.sum b/exegroup/go.sum new file mode 100644 index 0000000..cb3c0f1 --- /dev/null +++ b/exegroup/go.sum @@ -0,0 +1,4 @@ +git.blauwelle.com/go/crate/log v0.14.0 h1:y7hJXP+ZPY/wD+wlEzKgakpki8/l0LwZWqxtJ92Wy58= +git.blauwelle.com/go/crate/log v0.14.0/go.mod h1:jfVfpRODZTA70A8IkApVeGsS1zfLk1D77sLWZM/w+L0= +git.blauwelle.com/go/crate/log v0.15.0 h1:nOPCB5a2F9fCvhiSkymxQRO639hoaOlDU95aMSPWf80= +git.blauwelle.com/go/crate/log v0.15.0/go.mod h1:jfVfpRODZTA70A8IkApVeGsS1zfLk1D77sLWZM/w+L0= diff --git a/exegroup/group.go b/exegroup/group.go index eee1f61..b9c7dcd 100644 --- a/exegroup/group.go +++ b/exegroup/group.go @@ -1,6 +1,6 @@ -// Package exegroup 管理1组执行单元的启动和终止; +// Package exegroup 按照顺序启动和终止任务; // -// 使用举例: +// Examples: // // 新建 [Group]: // @@ -12,54 +12,48 @@ // // 新增1个 [Actor], 通过 ctx 控制 [Actor] 退出: // -// g.New().WithName("do nothing").WithGo(func(ctx context.Context) error { +// g.New().WithName("do nothing").WithStartFunc(func(ctx context.Context) error { // <-ctx.Done() // return ctx.Err() // }) // -// 新增1个 [Actor], 通过 stopFunc 控制 [Actor] 退出: -// -// server := &http.Server{Addr: ":80"} -// inShutdown := &atomic.Bool{} -// c := make(chan error, 1) -// goFunc := func(_ context.Context) error { -// err := server.ListenAndServe() -// if inShutdown.Load() { -// err = <-c -// } -// return err -// } -// stopFunc := func(ctx context.Context) { -// inShutdown.Store(true) -// c <- server.Shutdown(ctx) -// } -// g.New().WithName("server").WithGoStop(goFunc, stopFunc) -// // 启动 [Group]: // -// g.Run() +// g.Run(ctx) package exegroup import ( "context" + "errors" "fmt" "time" + + "git.blauwelle.com/go/crate/log" ) // Group 管理1组 [Actor], 每个 [Actor] 在1个goroutine 中运行; // Group 的执行过程参考 [Group.Run]; // -// Group 的配置包含: -// - [WithConcurrentStop] 并发执行 [Actor] 的终止函数, 默认不并发执行; +// Group 的配置: // - [WithStopTimeout] 指定 [Group.Run] 从进入终止过程到返回的最长时间, 默认不限制最长时间; type Group struct { - actors []*Actor - cfg config + errorsChan chan actorError + actors []*Actor + syncActors []*Actor + asyncActors []*Actor + cfg config +} + +type actorError struct { + actor *Actor + err error } // New 创建 [Group]; func New(opts ...Option) *Group { - cfg := config{} + cfg := config{ + stopTimeout: -1, + } for _, opt := range opts { opt.apply(&cfg) } @@ -71,88 +65,139 @@ func New(opts ...Option) *Group { // Default 创建包含信号处理 [Actor] 的 [Group]; func Default(opts ...Option) *Group { g := New(opts...) - g.New().WithName("signal").WithGo(HandleSignal()) + g.NewAsync().WithName("signal").WithStartFunc(HandleSignal()) return g } -// New 创建并添加 [Actor], -// 对 Actor 的配置通过链式调用完成; -func (g *Group) New() *Actor { - actor := new(Actor).WithName(fmt.Sprintf("actor-%03d", len(g.actors)+1)) +func (g *Group) newActor() *Actor { + actor := &Actor{ + group: g, + closeAfterStop: make(chan struct{}), + closeAfterTimeout: make(chan struct{}), + name: fmt.Sprintf("actor-%d", len(g.actors)), + stopTimeout: -1, + } + actor.startCtx, actor.startCancel = context.WithCancel(context.Background()) g.actors = append(g.actors, actor) return actor } -// Run 启动所有 [Actor]s 并等待 [Actor]s 执行完成; +// NewAsync 创建并添加异步退出的 [Actor], 对 Actor 的配置通过链式调用完成; +func (g *Group) NewAsync() *Actor { + actor := g.newActor() + g.asyncActors = append(g.asyncActors, actor) + return actor +} + +// NewSync 创建并添加同步退出的 [Actor], 对 Actor 的配置通过链式调用完成; +func (g *Group) NewSync() *Actor { + actor := g.newActor() + g.syncActors = append(g.syncActors, actor) + return actor +} + +// Run 启动所有 [Actor] 并等待 [Actor] 执行完成; // 当没有 Actor 时执行 Run 会 panic; // // Run 包含两个阶段: -// 1. 启动 [Actor] 并等待, Actor 的运行函数返回错误时进入下1个阶段; -// 2. 执行 [Actor] 的终止函数并返回; +// 1. 启动所有 [Actor]; +// 2. 等待 ctx 或第1个Actor返回, 终止所有Actor. func (g *Group) Run(ctx context.Context) error { - g.validate() - ctx, cancel := context.WithCancel(ctx) - c := make(chan error, len(g.actors)) - g.start(ctx, c) - return g.wait(c, cancel) + if err := g.validate(ctx); err != nil { + return err + } + g.errorsChan = make(chan actorError, len(g.actors)) + g.start(ctx) + + return g.stop(ctx, g.wait(ctx)) } -func (g *Group) validate() { +func (g *Group) validate(ctx context.Context) error { if len(g.actors) == 0 { - panic("no actor") + err := errors.New("no actor") + log.Error(ctx, err.Error()) + return err } - for _, actor := range g.actors { - if actor.goFunc == nil { - panic(actor.name + " has nil goFunc") + for i, actor := range g.actors { + if actor.startFunc == nil { + err := fmt.Errorf("group-actor %d: %s has nil startFunc", i, actor.name) + log.Error(ctx, err.Error()) + return err } } + return nil } -func (g *Group) start(ctx context.Context, c chan error) { +// start 启动所有的 [Actor] 后返回. +func (g *Group) start(ctx context.Context) { for _, actor := range g.actors { - go func(actor *Actor) { - var err error - defer func() { - if v := recover(); v != nil { - err = fmt.Errorf("%v", v) - } - c <- err - }() - err = actor.goFunc(ctx) - }(actor) + go actor.start() + log.Tracef(ctx, "group is starting group-actor %s", actor.name) + } +} + +// wait 等待 ctx 或 [Actor] 返回 +func (g *Group) wait(ctx context.Context) error { + log.Tracef(ctx, "group is waiting") + select { + case ae := <-g.errorsChan: + log.Tracef(ctx, "group is waiting: group-actor %s return", ae.actor.name) + if ae.err != nil { + return fmt.Errorf("group-actor %s return with %w", ae.actor.name, ae.err) + } + return nil + case <-ctx.Done(): + log.Tracef(ctx, "group is waiting: %s", ctx.Err().Error()) + return ctx.Err() } } -func (g *Group) wait(c chan error, cancel context.CancelFunc) error { - err := <-c - cancel() - ctx := context.Background() - if g.cfg.stopTimeout > 0 { - ctx, cancel = context.WithTimeout(context.Background(), g.cfg.stopTimeout) +// stop 等待 [Actor] 停止 +func (g *Group) stop(ctx context.Context, err error) error { + stopCtx := context.Background() + if g.cfg.stopTimeout >= 0 { + log.Tracef(ctx, "set group stop timeout to %s", g.cfg.stopTimeout) + var cancel func() + stopCtx, cancel = context.WithTimeout(stopCtx, g.cfg.stopTimeout) defer cancel() } - for i := len(g.actors) - 1; i >= 0; i-- { - if g.actors[i].stopFunc != nil { - if g.cfg.concurrentStop { - go g.actors[i].stopFunc(ctx) - } else { - g.actors[i].stopFunc(ctx) - } + go func() { + // 终止异步 [Actor] + for _, actor := range g.asyncActors { + log.Tracef(ctx, "group is stopping group-actor %s", actor.name) + actor.sendStopSignal() } - } - for i := 1; i < len(g.actors); i++ { + // 终止同步 [Actor] 并等待 [Actor] 退出 + for i := len(g.syncActors) - 1; i >= 0; i-- { + actor := g.syncActors[i] + log.Tracef(ctx, "group is stopping group-actor %s", actor.name) + actor.sendStopSignal() + actor.wait() + } + }() + // 等待所有 [Actor] 退出, 超时立即退出 + for _, actor := range g.actors { select { - case <-c: - case <-ctx.Done(): + case <-actor.closeAfterStop: + log.Tracef(ctx, "group is stopping group-actor %s: stopped", actor.name) + case <-actor.closeAfterTimeout: + log.Errorf(ctx, "group is stopping group-actor %s: timeout", actor.name) + case <-stopCtx.Done(): + if err == nil { + err = stopCtx.Err() + } else { + err = fmt.Errorf("group is stopping group-actor %s: global timeout: %w", actor.name, err) + } + log.Error(ctx, err.Error()) return err } } + log.Tracef(ctx, "group return: %v", err) return err } type config struct { - concurrentStop bool - stopTimeout time.Duration + stopTimeout time.Duration // <0表示没有超时时间 } // Option 修改 [Group] 的配置; @@ -166,14 +211,8 @@ func (fn optionFunc) apply(cfg *config) { fn(cfg) } -// WithConcurrentStop 并发执行 [Actor] 的终止函数; -func WithConcurrentStop() Option { - return optionFunc(func(cfg *config) { - cfg.concurrentStop = true - }) -} - -// WithStopTimeout 指定 [Group.Run] 从进入终止过程到返回的最长时间; +// WithStopTimeout 指定 [Group.Run] 从进入终止过程到返回的最长时间. +// <0表示没有超时时间. func WithStopTimeout(d time.Duration) Option { return optionFunc(func(cfg *config) { cfg.stopTimeout = d diff --git a/exegroup/group_test.go b/exegroup/group_test.go index 14b3ab9..b6a68b8 100644 --- a/exegroup/group_test.go +++ b/exegroup/group_test.go @@ -9,7 +9,7 @@ import ( func Example_defaultGroup() { g := exegroup.Default() - g.New().WithName("do nothing").WithGo(func(ctx context.Context) error { + g.NewSync().WithName("do nothing").WithStartFunc(func(ctx context.Context) error { <-ctx.Done() return ctx.Err() }) diff --git a/exegroup/signal.go b/exegroup/signal.go index a2040d6..d5df478 100644 --- a/exegroup/signal.go +++ b/exegroup/signal.go @@ -6,6 +6,8 @@ import ( "os" "os/signal" "syscall" + + "git.blauwelle.com/go/crate/log" ) // HandleSignal 提供信号处理函数; @@ -16,10 +18,13 @@ func HandleSignal(signals ...os.Signal) func(ctx context.Context) error { } signal.Notify(c, signals...) return func(ctx context.Context) error { + log.Tracef(ctx, "notify signal %v", signals) defer signal.Stop(c) select { case sig := <-c: - return fmt.Errorf("signal %s", sig) + err := fmt.Errorf("signal received: %s", sig) + log.Trace(ctx, err.Error()) + return err case <-ctx.Done(): return ctx.Err() }