diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..f32e31a --- /dev/null +++ b/.gitignore @@ -0,0 +1,2 @@ +.idea/ +.DS_Store diff --git a/exegroup/actor.go b/exegroup/actor.go index 7a0941b..5be7515 100644 --- a/exegroup/actor.go +++ b/exegroup/actor.go @@ -2,32 +2,35 @@ package exegroup import "context" -type ( - GoFunc = func(ctx context.Context) error - StopFunc = func(ctx context.Context) -) - +// Actor 是 [Group] 调度的执行单元; +// +// Actor.stopFunc 可以是 nil, 这时 Actor 需要受 goFunc 的 ctx 控制退出; type Actor struct { - goFunc GoFunc - stopFunc StopFunc + goFunc func(ctx context.Context) error + stopFunc func(ctx context.Context) name string } -func NewActor() *Actor { - return &Actor{} -} - +// WithName 指定 [Actor] 的 name; func (actor *Actor) WithName(name string) *Actor { actor.name = name return actor } -func (actor *Actor) WithGo(goFunc GoFunc) *Actor { +// WithGo 指定 [Actor] 的 goFunc; +// 通过 WithGo 注册的 goFunc 函数应该受 ctx 控制退出; +func (actor *Actor) WithGo(goFunc func(ctx context.Context) error) *Actor { actor.goFunc = goFunc return actor } -func (actor *Actor) WithGoStop(goFunc GoFunc, stopFunc StopFunc) *Actor { +// WithGoStop 指定 [Actor] 的 goFunc 和 stopFunc; +// goFunc 不受 ctx 控制退出, 而是在 stopFunc 调用后退出; +// 1. goFunc 被 [Group] 在 goroutine 中启动; +// 2. stopFunc 在被 [Group] 启动的任意 goFunc 返回后被调用; +// +// 使用 stopFunc 可以在 stopFunc 的 ctx 中指定等待期限, 让 goFunc 延迟到等待期限强制退出; +func (actor *Actor) WithGoStop(goFunc func(ctx context.Context) error, stopFunc func(ctx context.Context)) *Actor { actor.goFunc = goFunc actor.stopFunc = stopFunc return actor diff --git a/exegroup/group.go b/exegroup/group.go index c58c697..1a5ef3f 100644 --- a/exegroup/group.go +++ b/exegroup/group.go @@ -1,3 +1,43 @@ +// Package exegroup 管理1组执行单元的启动和终止; +// +// 使用举例: +// +// 新建 [Group]: +// +// g := New() +// +// 新建默认 [Group] (内置信号处理): +// +// g := Default() +// +// 新增1个 [Actor], 通过 ctx 控制 [Actor] 退出: +// +// g.New().WithName("do nothing").WithGo(func(ctx context.Context) error { +// <-ctx.Done() +// return ctx.Err() +// }) +// +// 新增1个 [Actor], 通过 stopFunc 控制 [Actor] 退出: +// +// server := &http.Server{Addr: ":80"} +// inShutdown := &atomic.Bool{} +// c := make(chan error, 1) +// goFunc := func(_ context.Context) error { +// err := server.ListenAndServe() +// if inShutdown.Load() { +// err = <-c +// } +// return err +// } +// stopFunc := func(ctx context.Context) { +// inShutdown.Store(true) +// c <- server.Shutdown(ctx) +// } +// g.New().WithName("server").WithGoStop(goFunc, stopFunc) +// +// 启动 [Group]: +// +// g.Run() package exegroup import ( @@ -6,11 +46,18 @@ import ( "time" ) +// Group 管理1组 [Actor], 每个 [Actor] 在1个goroutine 中运行; +// Group 的执行过程参考 [Group.Run]; +// +// Group 的配置包含: +// - [WithConcurrentStop] 并发执行 [Actor] 的终止函数, 默认不并发执行; +// - [WithStopTimeout] 指定 [Group.Run] 从进入终止过程到返回的最长时间, 默认不限制最长时间; type Group struct { actors []*Actor cfg config } +// New 创建 [Group]; func New(opts ...Option) *Group { cfg := config{} for _, opt := range opts { @@ -21,18 +68,27 @@ func New(opts ...Option) *Group { } } +// Default 创建包含信号处理 [Actor] 的 [Group]; func Default(opts ...Option) *Group { g := New(opts...) g.New().WithName("signal").WithGo(HandleSignal()) return g } +// New 创建并添加 [Actor], +// 对 Actor 的配置通过链式调用完成; func (g *Group) New() *Actor { - actor := NewActor().WithName(fmt.Sprintf("actor-%03d", len(g.actors)+1)) + actor := new(Actor).WithName(fmt.Sprintf("actor-%03d", len(g.actors)+1)) g.actors = append(g.actors, actor) return actor } +// Run 启动所有 [Actor]s 并等待 [Actor]s 执行完成; +// 当没有 Actor 时执行 Run 会 panic; +// +// Run 包含两个阶段: +// 1. 启动 [Actor] 并等待, Actor 的运行函数返回错误时进入下1个阶段; +// 2. 执行 [Actor] 的终止函数并返回; func (g *Group) Run(ctx context.Context) error { g.validate() ctx, cancel := context.WithCancel(ctx) @@ -99,6 +155,7 @@ type config struct { stopTimeout time.Duration } +// Option 修改 [Group] 的配置; type Option interface { apply(cfg *config) } @@ -109,12 +166,14 @@ func (fn optionFunc) apply(cfg *config) { fn(cfg) } +// WithConcurrentStop 并发执行 [Actor] 的终止函数; func WithConcurrentStop() Option { return optionFunc(func(cfg *config) { cfg.concurrentStop = true }) } +// WithStopTimeout 指定 [Group.Run] 从进入终止过程到返回的最长时间; func WithStopTimeout(d time.Duration) Option { return optionFunc(func(cfg *config) { cfg.stopTimeout = d diff --git a/exegroup/group_test.go b/exegroup/group_test.go new file mode 100644 index 0000000..c99b45b --- /dev/null +++ b/exegroup/group_test.go @@ -0,0 +1,16 @@ +package exegroup_test + +import ( + "context" + "git.blauwelle.com/go/crate/exegroup" + "log" +) + +func Example_defaultGroup() { + g := exegroup.Default() + g.New().WithName("do nothing").WithGo(func(ctx context.Context) error { + <-ctx.Done() + return ctx.Err() + }) + log.Println("exit:", g.Run(context.Background())) +} diff --git a/exegroup/httpserver.go b/exegroup/httpserver.go new file mode 100644 index 0000000..72f6c4a --- /dev/null +++ b/exegroup/httpserver.go @@ -0,0 +1,27 @@ +package exegroup + +import ( + "context" + "fmt" + "net/http" + "sync/atomic" +) + +// HttpListenAndServe 提供 [http.Server] 的启动和停止函数; +func HttpListenAndServe(port int, handler http.Handler) (func(ctx context.Context) error, func(ctx context.Context)) { + server := &http.Server{Addr: fmt.Sprintf(":%d", port), Handler: handler} + inShutdown := &atomic.Bool{} + c := make(chan error, 1) + goFunc := func(_ context.Context) error { + err := server.ListenAndServe() + if inShutdown.Load() { + err = <-c + } + return err + } + stopFunc := func(ctx context.Context) { + inShutdown.Store(true) + c <- server.Shutdown(ctx) + } + return goFunc, stopFunc +} diff --git a/exegroup/signal.go b/exegroup/signal.go index 90518e0..a2040d6 100644 --- a/exegroup/signal.go +++ b/exegroup/signal.go @@ -8,7 +8,8 @@ import ( "syscall" ) -func HandleSignal(signals ...os.Signal) GoFunc { +// HandleSignal 提供信号处理函数; +func HandleSignal(signals ...os.Signal) func(ctx context.Context) error { c := make(chan os.Signal, 1) if len(signals) == 0 { signals = []os.Signal{syscall.SIGTERM, syscall.SIGINT}