You cannot select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
221 lines
5.3 KiB
Go
221 lines
5.3 KiB
Go
// Package exegroup 按照顺序启动和终止任务;
|
|
//
|
|
// Examples:
|
|
//
|
|
// 新建 [Group]:
|
|
//
|
|
// g := New()
|
|
//
|
|
// 新建默认 [Group] (内置信号处理):
|
|
//
|
|
// g := Default()
|
|
//
|
|
// 新增1个 [Actor], 通过 ctx 控制 [Actor] 退出:
|
|
//
|
|
// g.New().WithName("do nothing").WithStartFunc(func(ctx context.Context) error {
|
|
// <-ctx.Done()
|
|
// return ctx.Err()
|
|
// })
|
|
//
|
|
// 启动 [Group]:
|
|
//
|
|
// g.Run(ctx)
|
|
package exegroup
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
"time"
|
|
|
|
"git.blauwelle.com/go/crate/log"
|
|
)
|
|
|
|
// Group 管理1组 [Actor], 每个 [Actor] 在1个goroutine 中运行;
|
|
// Group 的执行过程参考 [Group.Run];
|
|
//
|
|
// Group 的配置:
|
|
// - [WithStopTimeout] 指定 [Group.Run] 从进入终止过程到返回的最长时间, 默认不限制最长时间;
|
|
type Group struct {
|
|
errorsChan chan actorError
|
|
actors []*Actor
|
|
syncActors []*Actor
|
|
asyncActors []*Actor
|
|
cfg config
|
|
}
|
|
|
|
type actorError struct {
|
|
actor *Actor
|
|
err error
|
|
}
|
|
|
|
// New 创建 [Group];
|
|
func New(opts ...Option) *Group {
|
|
cfg := config{
|
|
stopTimeout: -1,
|
|
}
|
|
for _, opt := range opts {
|
|
opt.apply(&cfg)
|
|
}
|
|
return &Group{
|
|
cfg: cfg,
|
|
}
|
|
}
|
|
|
|
// Default 创建包含信号处理 [Actor] 的 [Group];
|
|
func Default(opts ...Option) *Group {
|
|
g := New(opts...)
|
|
g.NewAsync().WithName("signal").WithStartFunc(HandleSignal())
|
|
return g
|
|
}
|
|
|
|
func (g *Group) newActor() *Actor {
|
|
actor := &Actor{
|
|
group: g,
|
|
closeAfterStop: make(chan struct{}),
|
|
closeAfterTimeout: make(chan struct{}),
|
|
name: fmt.Sprintf("actor-%d", len(g.actors)),
|
|
stopTimeout: -1,
|
|
}
|
|
actor.startCtx, actor.startCancel = context.WithCancel(context.Background())
|
|
g.actors = append(g.actors, actor)
|
|
return actor
|
|
}
|
|
|
|
// NewAsync 创建并添加异步退出的 [Actor], 对 Actor 的配置通过链式调用完成;
|
|
func (g *Group) NewAsync() *Actor {
|
|
actor := g.newActor()
|
|
g.asyncActors = append(g.asyncActors, actor)
|
|
return actor
|
|
}
|
|
|
|
// NewSync 创建并添加同步退出的 [Actor], 对 Actor 的配置通过链式调用完成;
|
|
func (g *Group) NewSync() *Actor {
|
|
actor := g.newActor()
|
|
g.syncActors = append(g.syncActors, actor)
|
|
return actor
|
|
}
|
|
|
|
// Run 启动所有 [Actor] 并等待 [Actor] 执行完成;
|
|
// 当没有 Actor 时执行 Run 会 panic;
|
|
//
|
|
// Run 包含两个阶段:
|
|
// 1. 启动所有 [Actor];
|
|
// 2. 等待 ctx 或第1个Actor返回, 终止所有Actor.
|
|
func (g *Group) Run(ctx context.Context) error {
|
|
if err := g.validate(ctx); err != nil {
|
|
return err
|
|
}
|
|
g.errorsChan = make(chan actorError, len(g.actors))
|
|
g.start(ctx)
|
|
|
|
return g.stop(ctx, g.wait(ctx))
|
|
}
|
|
|
|
func (g *Group) validate(ctx context.Context) error {
|
|
if len(g.actors) == 0 {
|
|
err := errors.New("no actor")
|
|
log.Error(ctx, err.Error())
|
|
return err
|
|
}
|
|
for i, actor := range g.actors {
|
|
if actor.startFunc == nil {
|
|
err := fmt.Errorf("group-actor %d: %s has nil startFunc", i, actor.name)
|
|
log.Error(ctx, err.Error())
|
|
return err
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// start 启动所有的 [Actor] 后返回.
|
|
func (g *Group) start(ctx context.Context) {
|
|
for _, actor := range g.actors {
|
|
go actor.start()
|
|
log.Tracef(ctx, "group is starting group-actor %s", actor.name)
|
|
}
|
|
}
|
|
|
|
// wait 等待 ctx 或 [Actor] 返回
|
|
func (g *Group) wait(ctx context.Context) error {
|
|
log.Tracef(ctx, "group is waiting")
|
|
select {
|
|
case ae := <-g.errorsChan:
|
|
log.Tracef(ctx, "group is waiting: group-actor %s return", ae.actor.name)
|
|
if ae.err != nil {
|
|
return fmt.Errorf("group-actor %s return with %w", ae.actor.name, ae.err)
|
|
}
|
|
return nil
|
|
case <-ctx.Done():
|
|
log.Tracef(ctx, "group is waiting: %s", ctx.Err().Error())
|
|
return ctx.Err()
|
|
}
|
|
}
|
|
|
|
// stop 等待 [Actor] 停止
|
|
func (g *Group) stop(ctx context.Context, err error) error {
|
|
stopCtx := context.Background()
|
|
if g.cfg.stopTimeout >= 0 {
|
|
log.Tracef(ctx, "set group stop timeout to %s", g.cfg.stopTimeout)
|
|
var cancel func()
|
|
stopCtx, cancel = context.WithTimeout(stopCtx, g.cfg.stopTimeout)
|
|
defer cancel()
|
|
}
|
|
go func() {
|
|
// 终止异步 [Actor]
|
|
for _, actor := range g.asyncActors {
|
|
log.Tracef(ctx, "group is stopping group-actor %s", actor.name)
|
|
actor.sendStopSignal()
|
|
}
|
|
// 终止同步 [Actor] 并等待 [Actor] 退出
|
|
for i := len(g.syncActors) - 1; i >= 0; i-- {
|
|
actor := g.syncActors[i]
|
|
log.Tracef(ctx, "group is stopping group-actor %s", actor.name)
|
|
actor.sendStopSignal()
|
|
actor.wait()
|
|
}
|
|
}()
|
|
// 等待所有 [Actor] 退出, 超时立即退出
|
|
for _, actor := range g.actors {
|
|
select {
|
|
case <-actor.closeAfterStop:
|
|
log.Tracef(ctx, "group is stopping group-actor %s: stopped", actor.name)
|
|
case <-actor.closeAfterTimeout:
|
|
log.Errorf(ctx, "group is stopping group-actor %s: timeout", actor.name)
|
|
case <-stopCtx.Done():
|
|
if err == nil {
|
|
err = stopCtx.Err()
|
|
} else {
|
|
err = fmt.Errorf("group is stopping group-actor %s: global timeout: %w", actor.name, err)
|
|
}
|
|
log.Error(ctx, err.Error())
|
|
return err
|
|
}
|
|
}
|
|
log.Tracef(ctx, "group return: %v", err)
|
|
return err
|
|
}
|
|
|
|
type config struct {
|
|
stopTimeout time.Duration // <0表示没有超时时间
|
|
}
|
|
|
|
// Option 修改 [Group] 的配置;
|
|
type Option interface {
|
|
apply(cfg *config)
|
|
}
|
|
|
|
type optionFunc func(cfg *config)
|
|
|
|
func (fn optionFunc) apply(cfg *config) {
|
|
fn(cfg)
|
|
}
|
|
|
|
// WithStopTimeout 指定 [Group.Run] 从进入终止过程到返回的最长时间.
|
|
// <0表示没有超时时间.
|
|
func WithStopTimeout(d time.Duration) Option {
|
|
return optionFunc(func(cfg *config) {
|
|
cfg.stopTimeout = d
|
|
})
|
|
}
|