完善 exegroup 的文档并提供 http 服务函数;

develop
Ge Song 2 years ago
parent 4f028bc117
commit bd1fc116ec

2
.gitignore vendored

@ -0,0 +1,2 @@
.idea/
.DS_Store

@ -2,32 +2,35 @@ package exegroup
import "context"
type (
GoFunc = func(ctx context.Context) error
StopFunc = func(ctx context.Context)
)
// Actor 是 [Group] 调度的执行单元;
//
// Actor.stopFunc 可以是 nil, 这时 Actor 需要受 goFunc 的 ctx 控制退出;
type Actor struct {
goFunc GoFunc
stopFunc StopFunc
goFunc func(ctx context.Context) error
stopFunc func(ctx context.Context)
name string
}
func NewActor() *Actor {
return &Actor{}
}
// WithName 指定 [Actor] 的 name;
func (actor *Actor) WithName(name string) *Actor {
actor.name = name
return actor
}
func (actor *Actor) WithGo(goFunc GoFunc) *Actor {
// WithGo 指定 [Actor] 的 goFunc;
// 通过 WithGo 注册的 goFunc 函数应该受 ctx 控制退出;
func (actor *Actor) WithGo(goFunc func(ctx context.Context) error) *Actor {
actor.goFunc = goFunc
return actor
}
func (actor *Actor) WithGoStop(goFunc GoFunc, stopFunc StopFunc) *Actor {
// WithGoStop 指定 [Actor] 的 goFunc 和 stopFunc;
// goFunc 不受 ctx 控制退出, 而是在 stopFunc 调用后退出;
// 1. goFunc 被 [Group] 在 goroutine 中启动;
// 2. stopFunc 在被 [Group] 启动的任意 goFunc 返回后被调用;
//
// 使用 stopFunc 可以在 stopFunc 的 ctx 中指定等待期限, 让 goFunc 延迟到等待期限强制退出;
func (actor *Actor) WithGoStop(goFunc func(ctx context.Context) error, stopFunc func(ctx context.Context)) *Actor {
actor.goFunc = goFunc
actor.stopFunc = stopFunc
return actor

@ -1,3 +1,43 @@
// Package exegroup 管理1组执行单元的启动和终止;
//
// 使用举例:
//
// 新建 [Group]:
//
// g := New()
//
// 新建默认 [Group] (内置信号处理):
//
// g := Default()
//
// 新增1个 [Actor], 通过 ctx 控制 [Actor] 退出:
//
// g.New().WithName("do nothing").WithGo(func(ctx context.Context) error {
// <-ctx.Done()
// return ctx.Err()
// })
//
// 新增1个 [Actor], 通过 stopFunc 控制 [Actor] 退出:
//
// server := &http.Server{Addr: ":80"}
// inShutdown := &atomic.Bool{}
// c := make(chan error, 1)
// goFunc := func(_ context.Context) error {
// err := server.ListenAndServe()
// if inShutdown.Load() {
// err = <-c
// }
// return err
// }
// stopFunc := func(ctx context.Context) {
// inShutdown.Store(true)
// c <- server.Shutdown(ctx)
// }
// g.New().WithName("server").WithGoStop(goFunc, stopFunc)
//
// 启动 [Group]:
//
// g.Run()
package exegroup
import (
@ -6,11 +46,18 @@ import (
"time"
)
// Group 管理1组 [Actor], 每个 [Actor] 在1个goroutine 中运行;
// Group 的执行过程参考 [Group.Run];
//
// Group 的配置包含:
// - [WithConcurrentStop] 并发执行 [Actor] 的终止函数, 默认不并发执行;
// - [WithStopTimeout] 指定 [Group.Run] 从进入终止过程到返回的最长时间, 默认不限制最长时间;
type Group struct {
actors []*Actor
cfg config
}
// New 创建 [Group];
func New(opts ...Option) *Group {
cfg := config{}
for _, opt := range opts {
@ -21,18 +68,27 @@ func New(opts ...Option) *Group {
}
}
// Default 创建包含信号处理 [Actor] 的 [Group];
func Default(opts ...Option) *Group {
g := New(opts...)
g.New().WithName("signal").WithGo(HandleSignal())
return g
}
// New 创建并添加 [Actor],
// 对 Actor 的配置通过链式调用完成;
func (g *Group) New() *Actor {
actor := NewActor().WithName(fmt.Sprintf("actor-%03d", len(g.actors)+1))
actor := new(Actor).WithName(fmt.Sprintf("actor-%03d", len(g.actors)+1))
g.actors = append(g.actors, actor)
return actor
}
// Run 启动所有 [Actor]s 并等待 [Actor]s 执行完成;
// 当没有 Actor 时执行 Run 会 panic;
//
// Run 包含两个阶段:
// 1. 启动 [Actor] 并等待, Actor 的运行函数返回错误时进入下1个阶段;
// 2. 执行 [Actor] 的终止函数并返回;
func (g *Group) Run(ctx context.Context) error {
g.validate()
ctx, cancel := context.WithCancel(ctx)
@ -99,6 +155,7 @@ type config struct {
stopTimeout time.Duration
}
// Option 修改 [Group] 的配置;
type Option interface {
apply(cfg *config)
}
@ -109,12 +166,14 @@ func (fn optionFunc) apply(cfg *config) {
fn(cfg)
}
// WithConcurrentStop 并发执行 [Actor] 的终止函数;
func WithConcurrentStop() Option {
return optionFunc(func(cfg *config) {
cfg.concurrentStop = true
})
}
// WithStopTimeout 指定 [Group.Run] 从进入终止过程到返回的最长时间;
func WithStopTimeout(d time.Duration) Option {
return optionFunc(func(cfg *config) {
cfg.stopTimeout = d

@ -0,0 +1,16 @@
package exegroup_test
import (
"context"
"git.blauwelle.com/go/crate/exegroup"
"log"
)
func Example_defaultGroup() {
g := exegroup.Default()
g.New().WithName("do nothing").WithGo(func(ctx context.Context) error {
<-ctx.Done()
return ctx.Err()
})
log.Println("exit:", g.Run(context.Background()))
}

@ -0,0 +1,27 @@
package exegroup
import (
"context"
"fmt"
"net/http"
"sync/atomic"
)
// HttpListenAndServe 提供 [http.Server] 的启动和停止函数;
func HttpListenAndServe(port int, handler http.Handler) (func(ctx context.Context) error, func(ctx context.Context)) {
server := &http.Server{Addr: fmt.Sprintf(":%d", port), Handler: handler}
inShutdown := &atomic.Bool{}
c := make(chan error, 1)
goFunc := func(_ context.Context) error {
err := server.ListenAndServe()
if inShutdown.Load() {
err = <-c
}
return err
}
stopFunc := func(ctx context.Context) {
inShutdown.Store(true)
c <- server.Shutdown(ctx)
}
return goFunc, stopFunc
}

@ -8,7 +8,8 @@ import (
"syscall"
)
func HandleSignal(signals ...os.Signal) GoFunc {
// HandleSignal 提供信号处理函数;
func HandleSignal(signals ...os.Signal) func(ctx context.Context) error {
c := make(chan os.Signal, 1)
if len(signals) == 0 {
signals = []os.Signal{syscall.SIGTERM, syscall.SIGINT}

Loading…
Cancel
Save