You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
crate/exegroup/group.go

221 lines
5.3 KiB
Go

// Package exegroup 按照顺序启动和终止任务;
//
// Examples:
//
// 新建 [Group]:
//
// g := New()
//
// 新建默认 [Group] (内置信号处理):
//
// g := Default()
//
// 新增1个 [Actor], 通过 ctx 控制 [Actor] 退出:
//
// g.New().WithName("do nothing").WithStartFunc(func(ctx context.Context) error {
// <-ctx.Done()
// return ctx.Err()
// })
//
// 启动 [Group]:
//
// g.Run(ctx)
package exegroup
import (
"context"
"errors"
"fmt"
"time"
"git.blauwelle.com/go/crate/log"
)
// Group 管理1组 [Actor], 每个 [Actor] 在1个goroutine 中运行;
// Group 的执行过程参考 [Group.Run];
//
// Group 的配置:
// - [WithStopTimeout] 指定 [Group.Run] 从进入终止过程到返回的最长时间, 默认不限制最长时间;
type Group struct {
errorsChan chan actorError
actors []*Actor
syncActors []*Actor
asyncActors []*Actor
cfg config
}
type actorError struct {
actor *Actor
err error
}
// New 创建 [Group];
func New(opts ...Option) *Group {
cfg := config{
stopTimeout: -1,
}
for _, opt := range opts {
opt.apply(&cfg)
}
return &Group{
cfg: cfg,
}
}
// Default 创建包含信号处理 [Actor] 的 [Group];
func Default(opts ...Option) *Group {
g := New(opts...)
g.NewAsync().WithName("signal").WithStartFunc(HandleSignal())
return g
}
func (g *Group) newActor() *Actor {
actor := &Actor{
group: g,
closeAfterStop: make(chan struct{}),
closeAfterTimeout: make(chan struct{}),
name: fmt.Sprintf("actor-%d", len(g.actors)),
stopTimeout: -1,
}
actor.startCtx, actor.startCancel = context.WithCancel(context.Background())
g.actors = append(g.actors, actor)
return actor
}
// NewAsync 创建并添加异步退出的 [Actor], 对 Actor 的配置通过链式调用完成;
func (g *Group) NewAsync() *Actor {
actor := g.newActor()
g.asyncActors = append(g.asyncActors, actor)
return actor
}
// NewSync 创建并添加同步退出的 [Actor], 对 Actor 的配置通过链式调用完成;
func (g *Group) NewSync() *Actor {
actor := g.newActor()
g.syncActors = append(g.syncActors, actor)
return actor
}
// Run 启动所有 [Actor] 并等待 [Actor] 执行完成;
// 当没有 Actor 时执行 Run 会 panic;
//
// Run 包含两个阶段:
// 1. 启动所有 [Actor];
// 2. 等待 ctx 或第1个Actor返回, 终止所有Actor.
func (g *Group) Run(ctx context.Context) error {
if err := g.validate(ctx); err != nil {
return err
}
g.errorsChan = make(chan actorError, len(g.actors))
g.start(ctx)
return g.stop(ctx, g.wait(ctx))
}
func (g *Group) validate(ctx context.Context) error {
if len(g.actors) == 0 {
err := errors.New("no actor")
log.Error(ctx, err.Error())
return err
}
for i, actor := range g.actors {
if actor.startFunc == nil {
err := fmt.Errorf("group-actor %d: %s has nil startFunc", i, actor.name)
log.Error(ctx, err.Error())
return err
}
}
return nil
}
// start 启动所有的 [Actor] 后返回.
func (g *Group) start(ctx context.Context) {
for _, actor := range g.actors {
go actor.start()
log.Tracef(ctx, "group is starting group-actor %s", actor.name)
}
}
// wait 等待 ctx 或 [Actor] 返回
func (g *Group) wait(ctx context.Context) error {
log.Tracef(ctx, "group is waiting")
select {
case ae := <-g.errorsChan:
log.Tracef(ctx, "group is waiting: group-actor %s return", ae.actor.name)
if ae.err != nil {
return fmt.Errorf("group-actor %s return with %w", ae.actor.name, ae.err)
}
return nil
case <-ctx.Done():
log.Tracef(ctx, "group is waiting: %s", ctx.Err().Error())
return ctx.Err()
}
}
// stop 等待 [Actor] 停止
func (g *Group) stop(ctx context.Context, err error) error {
stopCtx := context.Background()
if g.cfg.stopTimeout >= 0 {
log.Tracef(ctx, "set group stop timeout to %s", g.cfg.stopTimeout)
var cancel func()
stopCtx, cancel = context.WithTimeout(stopCtx, g.cfg.stopTimeout)
defer cancel()
}
go func() {
// 终止异步 [Actor]
for _, actor := range g.asyncActors {
log.Tracef(ctx, "group is stopping group-actor %s", actor.name)
actor.sendStopSignal()
}
// 终止同步 [Actor] 并等待 [Actor] 退出
for i := len(g.syncActors) - 1; i >= 0; i-- {
actor := g.syncActors[i]
log.Tracef(ctx, "group is stopping group-actor %s", actor.name)
actor.sendStopSignal()
actor.wait()
}
}()
// 等待所有 [Actor] 退出, 超时立即退出
for _, actor := range g.actors {
select {
case <-actor.closeAfterStop:
log.Tracef(ctx, "group is stopping group-actor %s: stopped", actor.name)
case <-actor.closeAfterTimeout:
log.Errorf(ctx, "group is stopping group-actor %s: timeout", actor.name)
case <-stopCtx.Done():
if err == nil {
err = stopCtx.Err()
} else {
err = fmt.Errorf("group is stopping group-actor %s: global timeout: %w", actor.name, err)
}
log.Error(ctx, err.Error())
return err
}
}
log.Tracef(ctx, "group return: %v", err)
return err
}
type config struct {
stopTimeout time.Duration // <0表示没有超时时间
}
// Option 修改 [Group] 的配置;
type Option interface {
apply(cfg *config)
}
type optionFunc func(cfg *config)
func (fn optionFunc) apply(cfg *config) {
fn(cfg)
}
// WithStopTimeout 指定 [Group.Run] 从进入终止过程到返回的最长时间.
// <0表示没有超时时间.
func WithStopTimeout(d time.Duration) Option {
return optionFunc(func(cfg *config) {
cfg.stopTimeout = d
})
}