|
|
|
// Package exegroup 管理1组执行单元的启动和终止;
|
|
|
|
//
|
|
|
|
// 使用举例:
|
|
|
|
//
|
|
|
|
// 新建 [Group]:
|
|
|
|
//
|
|
|
|
// g := New()
|
|
|
|
//
|
|
|
|
// 新建默认 [Group] (内置信号处理):
|
|
|
|
//
|
|
|
|
// g := Default()
|
|
|
|
//
|
|
|
|
// 新增1个 [Actor], 通过 ctx 控制 [Actor] 退出:
|
|
|
|
//
|
|
|
|
// g.New().WithName("do nothing").WithGo(func(ctx context.Context) error {
|
|
|
|
// <-ctx.Done()
|
|
|
|
// return ctx.Err()
|
|
|
|
// })
|
|
|
|
//
|
|
|
|
// 新增1个 [Actor], 通过 stopFunc 控制 [Actor] 退出:
|
|
|
|
//
|
|
|
|
// server := &http.Server{Addr: ":80"}
|
|
|
|
// inShutdown := &atomic.Bool{}
|
|
|
|
// c := make(chan error, 1)
|
|
|
|
// goFunc := func(_ context.Context) error {
|
|
|
|
// err := server.ListenAndServe()
|
|
|
|
// if inShutdown.Load() {
|
|
|
|
// err = <-c
|
|
|
|
// }
|
|
|
|
// return err
|
|
|
|
// }
|
|
|
|
// stopFunc := func(ctx context.Context) {
|
|
|
|
// inShutdown.Store(true)
|
|
|
|
// c <- server.Shutdown(ctx)
|
|
|
|
// }
|
|
|
|
// g.New().WithName("server").WithGoStop(goFunc, stopFunc)
|
|
|
|
//
|
|
|
|
// 启动 [Group]:
|
|
|
|
//
|
|
|
|
// g.Run()
|
|
|
|
package exegroup
|
|
|
|
|
|
|
|
import (
|
|
|
|
"context"
|
|
|
|
"fmt"
|
|
|
|
"time"
|
|
|
|
)
|
|
|
|
|
|
|
|
// Group 管理1组 [Actor], 每个 [Actor] 在1个goroutine 中运行;
|
|
|
|
// Group 的执行过程参考 [Group.Run];
|
|
|
|
//
|
|
|
|
// Group 的配置包含:
|
|
|
|
// - [WithConcurrentStop] 并发执行 [Actor] 的终止函数, 默认不并发执行;
|
|
|
|
// - [WithStopTimeout] 指定 [Group.Run] 从进入终止过程到返回的最长时间, 默认不限制最长时间;
|
|
|
|
type Group struct {
|
|
|
|
actors []*Actor
|
|
|
|
cfg config
|
|
|
|
}
|
|
|
|
|
|
|
|
// New 创建 [Group];
|
|
|
|
func New(opts ...Option) *Group {
|
|
|
|
cfg := config{}
|
|
|
|
for _, opt := range opts {
|
|
|
|
opt.apply(&cfg)
|
|
|
|
}
|
|
|
|
return &Group{
|
|
|
|
cfg: cfg,
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
// Default 创建包含信号处理 [Actor] 的 [Group];
|
|
|
|
func Default(opts ...Option) *Group {
|
|
|
|
g := New(opts...)
|
|
|
|
g.New().WithName("signal").WithGo(HandleSignal())
|
|
|
|
return g
|
|
|
|
}
|
|
|
|
|
|
|
|
// New 创建并添加 [Actor],
|
|
|
|
// 对 Actor 的配置通过链式调用完成;
|
|
|
|
func (g *Group) New() *Actor {
|
|
|
|
actor := new(Actor).WithName(fmt.Sprintf("actor-%03d", len(g.actors)+1))
|
|
|
|
g.actors = append(g.actors, actor)
|
|
|
|
return actor
|
|
|
|
}
|
|
|
|
|
|
|
|
// Run 启动所有 [Actor]s 并等待 [Actor]s 执行完成;
|
|
|
|
// 当没有 Actor 时执行 Run 会 panic;
|
|
|
|
//
|
|
|
|
// Run 包含两个阶段:
|
|
|
|
// 1. 启动 [Actor] 并等待, Actor 的运行函数返回错误时进入下1个阶段;
|
|
|
|
// 2. 执行 [Actor] 的终止函数并返回;
|
|
|
|
func (g *Group) Run(ctx context.Context) error {
|
|
|
|
g.validate()
|
|
|
|
ctx, cancel := context.WithCancel(ctx)
|
|
|
|
c := make(chan error, len(g.actors))
|
|
|
|
g.start(ctx, c)
|
|
|
|
return g.wait(c, cancel)
|
|
|
|
}
|
|
|
|
|
|
|
|
func (g *Group) validate() {
|
|
|
|
if len(g.actors) == 0 {
|
|
|
|
panic("no actor")
|
|
|
|
}
|
|
|
|
for _, actor := range g.actors {
|
|
|
|
if actor.goFunc == nil {
|
|
|
|
panic(actor.name + " has nil goFunc")
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
func (g *Group) start(ctx context.Context, c chan error) {
|
|
|
|
for _, actor := range g.actors {
|
|
|
|
go func(actor *Actor) {
|
|
|
|
var err error
|
|
|
|
defer func() {
|
|
|
|
if v := recover(); v != nil {
|
|
|
|
err = fmt.Errorf("%v", v)
|
|
|
|
}
|
|
|
|
c <- err
|
|
|
|
}()
|
|
|
|
err = actor.goFunc(ctx)
|
|
|
|
}(actor)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
func (g *Group) wait(c chan error, cancel context.CancelFunc) error {
|
|
|
|
err := <-c
|
|
|
|
cancel()
|
|
|
|
ctx := context.Background()
|
|
|
|
if g.cfg.stopTimeout > 0 {
|
|
|
|
ctx, cancel = context.WithTimeout(context.Background(), g.cfg.stopTimeout)
|
|
|
|
defer cancel()
|
|
|
|
}
|
|
|
|
for i := len(g.actors) - 1; i >= 0; i-- {
|
|
|
|
if g.actors[i].stopFunc != nil {
|
|
|
|
if g.cfg.concurrentStop {
|
|
|
|
go g.actors[i].stopFunc(ctx)
|
|
|
|
} else {
|
|
|
|
g.actors[i].stopFunc(ctx)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
for i := 1; i < len(g.actors); i++ {
|
|
|
|
select {
|
|
|
|
case <-c:
|
|
|
|
case <-ctx.Done():
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
}
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
|
|
|
type config struct {
|
|
|
|
concurrentStop bool
|
|
|
|
stopTimeout time.Duration
|
|
|
|
}
|
|
|
|
|
|
|
|
// Option 修改 [Group] 的配置;
|
|
|
|
type Option interface {
|
|
|
|
apply(cfg *config)
|
|
|
|
}
|
|
|
|
|
|
|
|
type optionFunc func(cfg *config)
|
|
|
|
|
|
|
|
func (fn optionFunc) apply(cfg *config) {
|
|
|
|
fn(cfg)
|
|
|
|
}
|
|
|
|
|
|
|
|
// WithConcurrentStop 并发执行 [Actor] 的终止函数;
|
|
|
|
func WithConcurrentStop() Option {
|
|
|
|
return optionFunc(func(cfg *config) {
|
|
|
|
cfg.concurrentStop = true
|
|
|
|
})
|
|
|
|
}
|
|
|
|
|
|
|
|
// WithStopTimeout 指定 [Group.Run] 从进入终止过程到返回的最长时间;
|
|
|
|
func WithStopTimeout(d time.Duration) Option {
|
|
|
|
return optionFunc(func(cfg *config) {
|
|
|
|
cfg.stopTimeout = d
|
|
|
|
})
|
|
|
|
}
|