You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
crate/exegroup/group.go

123 lines
2.1 KiB
Go

package exegroup
import (
"context"
"fmt"
"time"
)
// Group is a set of actors that are started together by Run and shut down
// together once the first of them returns.
//
// actors holds the actors registered through the New method, in registration
// order. cfg holds the settings collected from the Options passed to the
// package-level New.
type Group struct {
actors []*Actor
cfg config
}
// New builds an empty Group. Each option is applied, in the order given, to a
// zero-valued config which then becomes the group's configuration.
func New(opts ...Option) *Group {
	var cfg config
	for i := range opts {
		opts[i].apply(&cfg)
	}

	g := new(Group)
	g.cfg = cfg
	return g
}
// Default builds a Group like New and pre-registers one actor named "signal"
// whose go function is the value returned by HandleSignal.
//
// NOTE(review): HandleSignal is defined elsewhere; presumably it makes the
// group stop on OS signals — confirm against its definition.
func Default(opts ...Option) *Group {
	group := New(opts...)

	signalActor := group.New().WithName("signal")
	signalActor.WithGo(HandleSignal())

	return group
}
// New creates an actor, registers it with the group and returns it so the
// caller can keep configuring it. The actor starts out named "actor-NNN",
// where NNN is its 1-based, zero-padded position in the group.
func (g *Group) New() *Actor {
	position := len(g.actors) + 1

	a := NewActor().WithName(fmt.Sprintf("actor-%03d", position))
	g.actors = append(g.actors, a)
	return a
}
// Run starts every registered actor and blocks until the group has shut down.
//
// It panics (via validate) if the group has no actors or if any actor lacks a
// go function. Actors receive a context derived from ctx that is cancelled as
// soon as the first actor returns. The returned error is the result of that
// first actor.
func (g *Group) Run(ctx context.Context) error {
	g.validate()

	runCtx, stop := context.WithCancel(ctx)

	// One buffer slot per actor, so no actor goroutine blocks when reporting.
	results := make(chan error, len(g.actors))
	g.start(runCtx, results)

	return g.wait(results, stop)
}
// validate panics when the group cannot be run: either no actor has been
// registered, or some registered actor has no go function set.
func (g *Group) validate() {
	if len(g.actors) == 0 {
		panic("no actor")
	}
	for _, actor := range g.actors {
		if actor.goFunc == nil {
			// The leading space keeps the actor name and the message apart
			// (previously this rendered as e.g. "actor-001has nil goFunc").
			panic(actor.name + " has nil goFunc")
		}
	}
}
// start launches one goroutine per actor, each running the actor's go function
// with ctx. Every goroutine sends exactly one value on c: the go function's
// return value, or, if the go function panicked, an error built from the
// recovered panic value.
func (g *Group) start(ctx context.Context, c chan error) {
	run := func(a *Actor) {
		var result error
		defer func() {
			// Convert a panic into an ordinary error so it is reported on c
			// instead of taking the process down.
			if p := recover(); p != nil {
				result = fmt.Errorf("%v", p)
			}
			c <- result
		}()
		result = a.goFunc(ctx)
	}

	for _, a := range g.actors {
		go run(a)
	}
}
// wait blocks until the first actor reports on c, then shuts the group down.
//
// After the first result arrives it calls cancel, invokes the stop function of
// every actor that has one (in separate goroutines when concurrentStop is set,
// otherwise one after another), and then receives the remaining results from
// c. When stopTimeout is positive, the stop functions get a context with that
// timeout and the wait for remaining results ends when it expires.
//
// The returned error is always the first result received; later results are
// discarded.
func (g *Group) wait(c chan error, cancel context.CancelFunc) (err error) {
	err = <-c
	cancel()

	stopCtx := context.Background()
	if timeout := g.cfg.stopTimeout; timeout > 0 {
		var release context.CancelFunc
		stopCtx, release = context.WithTimeout(context.Background(), timeout)
		defer release()
	}

	for _, a := range g.actors {
		switch {
		case a.stopFunc == nil:
			// Nothing to call for this actor.
		case g.cfg.concurrentStop:
			go a.stopFunc(stopCtx)
		default:
			a.stopFunc(stopCtx)
		}
	}

	// One result has already been consumed above, so len-1 remain.
	for remaining := len(g.actors) - 1; remaining > 0; remaining-- {
		select {
		case <-c:
		case <-stopCtx.Done():
			return err
		}
	}
	return err
}
// config holds the settings of a Group that Options can change.
//
// concurrentStop: when true, wait calls each actor's stop function in its own
// goroutine instead of calling them one after another.
//
// stopTimeout: when positive, wait gives the stop functions a context with
// this timeout and stops waiting for remaining actors once it expires. When
// zero or negative, no timeout is applied.
type config struct {
concurrentStop bool
stopTimeout time.Duration
}
// Option customizes a Group's configuration. Options are passed to New or
// Default, which call apply on each of them with the config being built.
// Because apply is unexported, only this package can implement Option.
type Option interface {
apply(cfg *config)
}
// optionFunc adapts a plain function to the Option interface.
type optionFunc func(cfg *config)

// apply implements Option by calling the function itself with cfg.
func (f optionFunc) apply(cfg *config) {
	f(cfg)
}
// WithConcurrentStop returns an Option that sets concurrentStop, which makes
// the group call its actors' stop functions in separate goroutines during
// shutdown rather than sequentially.
func WithConcurrentStop() Option {
	var enable optionFunc = func(c *config) {
		c.concurrentStop = true
	}
	return enable
}
// WithStopTimeout returns an Option that sets the group's stop timeout to d.
// The timeout only takes effect during shutdown when d is positive.
func WithStopTimeout(d time.Duration) Option {
	set := func(c *config) {
		c.stopTimeout = d
	}
	return optionFunc(set)
}