You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
crate/exegroup/group.go

182 lines
3.9 KiB
Go

// Package exegroup 管理1组执行单元的启动和终止;
//
// 使用举例:
//
// 新建 [Group]:
//
// g := New()
//
// 新建默认 [Group] (内置信号处理):
//
// g := Default()
//
// 新增1个 [Actor], 通过 ctx 控制 [Actor] 退出:
//
// g.New().WithName("do nothing").WithGo(func(ctx context.Context) error {
// <-ctx.Done()
// return ctx.Err()
// })
//
// 新增1个 [Actor], 通过 stopFunc 控制 [Actor] 退出:
//
// server := &http.Server{Addr: ":80"}
// inShutdown := &atomic.Bool{}
// c := make(chan error, 1)
// goFunc := func(_ context.Context) error {
// err := server.ListenAndServe()
// if inShutdown.Load() {
// err = <-c
// }
// return err
// }
// stopFunc := func(ctx context.Context) {
// inShutdown.Store(true)
// c <- server.Shutdown(ctx)
// }
// g.New().WithName("server").WithGoStop(goFunc, stopFunc)
//
// 启动 [Group]:
//
// g.Run()
package exegroup
import (
"context"
"fmt"
"time"
)
// Group 管理1组 [Actor], 每个 [Actor] 在1个goroutine 中运行;
// Group 的执行过程参考 [Group.Run];
//
// Group 的配置包含:
// - [WithConcurrentStop] 并发执行 [Actor] 的终止函数, 默认不并发执行;
// - [WithStopTimeout] 指定 [Group.Run] 从进入终止过程到返回的最长时间, 默认不限制最长时间;
type Group struct {
actors []*Actor
cfg config
}
// New 创建 [Group];
func New(opts ...Option) *Group {
cfg := config{}
for _, opt := range opts {
opt.apply(&cfg)
}
return &Group{
cfg: cfg,
}
}
// Default 创建包含信号处理 [Actor] 的 [Group];
func Default(opts ...Option) *Group {
g := New(opts...)
g.New().WithName("signal").WithGo(HandleSignal())
return g
}
// New 创建并添加 [Actor],
// 对 Actor 的配置通过链式调用完成;
func (g *Group) New() *Actor {
actor := new(Actor).WithName(fmt.Sprintf("actor-%03d", len(g.actors)+1))
g.actors = append(g.actors, actor)
return actor
}
// Run 启动所有 [Actor]s 并等待 [Actor]s 执行完成;
// 当没有 Actor 时执行 Run 会 panic;
//
// Run 包含两个阶段:
// 1. 启动 [Actor] 并等待, Actor 的运行函数返回错误时进入下1个阶段;
// 2. 执行 [Actor] 的终止函数并返回;
func (g *Group) Run(ctx context.Context) error {
g.validate()
ctx, cancel := context.WithCancel(ctx)
c := make(chan error, len(g.actors))
g.start(ctx, c)
return g.wait(c, cancel)
}
func (g *Group) validate() {
if len(g.actors) == 0 {
panic("no actor")
}
for _, actor := range g.actors {
if actor.goFunc == nil {
panic(actor.name + "has nil goFunc")
}
}
}
func (g *Group) start(ctx context.Context, c chan error) {
for _, actor := range g.actors {
go func(actor *Actor) {
var err error
defer func() {
if v := recover(); v != nil {
err = fmt.Errorf("%v", v)
}
c <- err
}()
err = actor.goFunc(ctx)
}(actor)
}
}
func (g *Group) wait(c chan error, cancel context.CancelFunc) (err error) {
err = <-c
cancel()
ctx := context.Background()
if g.cfg.stopTimeout > 0 {
ctx, cancel = context.WithTimeout(context.Background(), g.cfg.stopTimeout)
defer cancel()
}
for _, m := range g.actors {
if m.stopFunc != nil {
if g.cfg.concurrentStop {
go m.stopFunc(ctx)
} else {
m.stopFunc(ctx)
}
}
}
for i := 1; i < len(g.actors); i++ {
select {
case <-c:
case <-ctx.Done():
return
}
}
return
}
type config struct {
concurrentStop bool
stopTimeout time.Duration
}
// Option 修改 [Group] 的配置;
type Option interface {
apply(cfg *config)
}
type optionFunc func(cfg *config)
func (fn optionFunc) apply(cfg *config) {
fn(cfg)
}
// WithConcurrentStop 并发执行 [Actor] 的终止函数;
func WithConcurrentStop() Option {
return optionFunc(func(cfg *config) {
cfg.concurrentStop = true
})
}
// WithStopTimeout 指定 [Group.Run] 从进入终止过程到返回的最长时间;
func WithStopTimeout(d time.Duration) Option {
return optionFunc(func(cfg *config) {
cfg.stopTimeout = d
})
}