commit 4f028bc117a7e13e8716b1e0067770dcd182532b Author: Ge Song Date: Sat Mar 25 23:29:06 2023 +0800 新增 exegroup 库; diff --git a/exegroup/actor.go b/exegroup/actor.go new file mode 100644 index 0000000..7a0941b --- /dev/null +++ b/exegroup/actor.go @@ -0,0 +1,34 @@ +package exegroup + +import "context" + +type ( + GoFunc = func(ctx context.Context) error + StopFunc = func(ctx context.Context) +) + +type Actor struct { + goFunc GoFunc + stopFunc StopFunc + name string +} + +func NewActor() *Actor { + return &Actor{} +} + +func (actor *Actor) WithName(name string) *Actor { + actor.name = name + return actor +} + +func (actor *Actor) WithGo(goFunc GoFunc) *Actor { + actor.goFunc = goFunc + return actor +} + +func (actor *Actor) WithGoStop(goFunc GoFunc, stopFunc StopFunc) *Actor { + actor.goFunc = goFunc + actor.stopFunc = stopFunc + return actor +} diff --git a/exegroup/go.mod b/exegroup/go.mod new file mode 100644 index 0000000..08ee572 --- /dev/null +++ b/exegroup/go.mod @@ -0,0 +1,3 @@ +module git.blauwelle.com/go/crate/exegroup + +go 1.20 diff --git a/exegroup/group.go b/exegroup/group.go new file mode 100644 index 0000000..c58c697 --- /dev/null +++ b/exegroup/group.go @@ -0,0 +1,122 @@ +package exegroup + +import ( + "context" + "fmt" + "time" +) + +type Group struct { + actors []*Actor + cfg config +} + +func New(opts ...Option) *Group { + cfg := config{} + for _, opt := range opts { + opt.apply(&cfg) + } + return &Group{ + cfg: cfg, + } +} + +func Default(opts ...Option) *Group { + g := New(opts...) + g.New().WithName("signal").WithGo(HandleSignal()) + return g +} + +func (g *Group) New() *Actor { + actor := NewActor().WithName(fmt.Sprintf("actor-%03d", len(g.actors)+1)) + g.actors = append(g.actors, actor) + return actor +} + +func (g *Group) Run(ctx context.Context) error { + g.validate() + ctx, cancel := context.WithCancel(ctx) + c := make(chan error, len(g.actors)) + g.start(ctx, c) + return g.wait(c, cancel) +} + +func (g *Group) validate() { + if len(g.actors) == 0 { + panic("no actor") + } + for _, actor := range g.actors { + if actor.goFunc == nil { + panic(actor.name + "has nil goFunc") + } + } +} + +func (g *Group) start(ctx context.Context, c chan error) { + for _, actor := range g.actors { + go func(actor *Actor) { + var err error + defer func() { + if v := recover(); v != nil { + err = fmt.Errorf("%v", v) + } + c <- err + }() + err = actor.goFunc(ctx) + }(actor) + } +} + +func (g *Group) wait(c chan error, cancel context.CancelFunc) (err error) { + err = <-c + cancel() + ctx := context.Background() + if g.cfg.stopTimeout > 0 { + ctx, cancel = context.WithTimeout(context.Background(), g.cfg.stopTimeout) + defer cancel() + } + for _, m := range g.actors { + if m.stopFunc != nil { + if g.cfg.concurrentStop { + go m.stopFunc(ctx) + } else { + m.stopFunc(ctx) + } + } + } + for i := 1; i < len(g.actors); i++ { + select { + case <-c: + case <-ctx.Done(): + return + } + } + return +} + +type config struct { + concurrentStop bool + stopTimeout time.Duration +} + +type Option interface { + apply(cfg *config) +} + +type optionFunc func(cfg *config) + +func (fn optionFunc) apply(cfg *config) { + fn(cfg) +} + +func WithConcurrentStop() Option { + return optionFunc(func(cfg *config) { + cfg.concurrentStop = true + }) +} + +func WithStopTimeout(d time.Duration) Option { + return optionFunc(func(cfg *config) { + cfg.stopTimeout = d + }) +} diff --git a/exegroup/signal.go b/exegroup/signal.go new file mode 100644 index 0000000..90518e0 --- /dev/null +++ b/exegroup/signal.go @@ -0,0 +1,26 @@ +package exegroup + +import ( + "context" + "fmt" + "os" + "os/signal" + "syscall" +) + +func HandleSignal(signals ...os.Signal) GoFunc { + c := make(chan os.Signal, 1) + if len(signals) == 0 { + signals = []os.Signal{syscall.SIGTERM, syscall.SIGINT} + } + signal.Notify(c, signals...) + return func(ctx context.Context) error { + defer signal.Stop(c) + select { + case sig := <-c: + return fmt.Errorf("signal %s", sig) + case <-ctx.Done(): + return ctx.Err() + } + } +}