Compare commits

..

5 Commits

Author SHA1 Message Date
dd36810af7 add cdata; 2024-05-07 14:34:05 +08:00
e2f234c17e add cerror; 2024-05-07 14:25:18 +08:00
8671f2826d httpdata/add stacktrace; 2024-02-23 13:14:16 +08:00
5cf4430bc8 uptracehelper/change GoStop to Bootstrap; 2024-01-17 17:24:32 +08:00
a844e9e34c exegroup/rewrite control logic, make sync actor stop serial; 2024-01-17 16:17:51 +08:00
25 changed files with 670 additions and 160 deletions

8
cdata/go.mod Normal file
View File

@@ -0,0 +1,8 @@
module git.blauwelle.com/go/crate/cdata
go 1.21.3
require (
git.blauwelle.com/go/crate/cerrors v0.1.0
git.blauwelle.com/go/crate/runtimehelper v0.2.0
)

6
cdata/go.sum Normal file
View File

@@ -0,0 +1,6 @@
git.blauwelle.com/go/crate/cerrors v0.0.0-20240507062518-e2f234c17eb2 h1:h3WnMe7tQNbh6WARbixrf8wTUrWCDNc3F6ikb1HQhj4=
git.blauwelle.com/go/crate/cerrors v0.0.0-20240507062518-e2f234c17eb2/go.mod h1:vIWWWVAEwattbg1eVOUjnz5dEG7q7v/Ve0oR1HFsblM=
git.blauwelle.com/go/crate/cerrors v0.1.0 h1:+9TdG54+AUG1NmUgrEPsAPY8n8RIUAQy9qx4drU9RtE=
git.blauwelle.com/go/crate/cerrors v0.1.0/go.mod h1:vIWWWVAEwattbg1eVOUjnz5dEG7q7v/Ve0oR1HFsblM=
git.blauwelle.com/go/crate/runtimehelper v0.2.0 h1:W19wipPCyFSGHOWzqtfouNJu7MDeJobP+iRM4bPiJpM=
git.blauwelle.com/go/crate/runtimehelper v0.2.0/go.mod h1:yVMA0GkO9AS7iuPmalHKeWyv9en0JWj25rY1vpTuHhk=

64
cdata/response.go Normal file
View File

@@ -0,0 +1,64 @@
package cdata
import (
"errors"
"git.blauwelle.com/go/crate/cerrors"
"git.blauwelle.com/go/crate/runtimehelper"
)
const maximumFrames = 16
type Response[T any] struct {
Data T `json:"data,omitempty"`
Code cerrors.Code `json:"code"`
Message string `json:"message,omitempty"`
Debug string `json:"debug,omitempty"`
Traceback []runtimehelper.Frame `json:"traceback,omitempty"`
}
type PageData[T any] struct {
List []T `json:"list"`
PageIndex int `json:"pageIndex"` // >=1
PageSize int `json:"pageSize"` // >=1
Total int `json:"total"` // maybe 0
}
func NewOkResponse[T any](data T) Response[T] {
return Response[T]{
Code: cerrors.CodeOK,
Data: data,
}
}
func NewResponseFromError(err error) Response[struct{}] {
e := &cerrors.Error{}
if !errors.As(err, &e) {
response := Response[struct{}]{
Code: cerrors.CodeUnknown,
Message: "未知错误",
}
if cerrors.FlagDebug {
response.Debug = err.Error()
response.Traceback = runtimehelper.Stack(1, maximumFrames)
}
return response
}
return Response[struct{}]{
Code: e.Code,
Message: e.Message,
Debug: e.Debug,
Traceback: e.Traceback,
}
}
func NewErrorResponse(code cerrors.Code, message string) Response[struct{}] {
response := Response[struct{}]{
Code: code,
Message: message,
}
if cerrors.FlagDebug {
response.Traceback = runtimehelper.Stack(1, maximumFrames)
}
return response
}

141
cerrors/.golangci.yaml Normal file
View File

@@ -0,0 +1,141 @@
## 基于 golangci-lint@v1.52.2
run:
timeout: 1m
build-tags: [ ]
skip-dirs: [ ]
skip-files: [ ]
linters:
disable-all: true
enable:
- errcheck
- gosimple
- govet
- ineffassign
- staticcheck
- typecheck
- unused
- asasalint
- asciicheck
- bidichk
- bodyclose
- containedctx
- cyclop
- dupl
- durationcheck
- errname
- errorlint
- exhaustive
- exportloopref
- funlen
- gocheckcompilerdirectives
- gochecknoinits
- goconst
- gocritic
- gocyclo
- goimports
- gomnd
- goprintffuncname
- gosec
- lll
- loggercheck
- makezero
- nakedret
- nestif
- nilnil
- noctx
- nolintlint
- prealloc
- predeclared
- promlinter
- reassign
- revive
- rowserrcheck
- stylecheck
- tenv
- testableexamples
- testpackage
- tparallel
- unconvert
- unparam
- usestdlibvars
- wastedassign
- whitespace
linters-settings:
errcheck:
check-type-assertions: true
exclude-functions: [ ]
govet:
enable-all: true
disable: [ ]
cyclop:
max-complexity: 10
package-average: 0.0
dupl:
threshold: 150
exhaustive:
check:
- switch
- map
funlen:
lines: 100
statements: 60
gocritic:
disabled-checks:
- commentFormatting
settings:
captLocal:
paramsOnly: false
underef:
skipRecvDeref: false
gocyclo:
min-complexity: 20
gomnd:
ignored-functions:
- os.Chmod
- os.Mkdir
- os.MkdirAll
- os.OpenFile
- os.WriteFile
- prometheus.ExponentialBuckets
- prometheus.ExponentialBucketsRange
- prometheus.LinearBuckets
lll:
line-length: 240
nakedret:
max-func-lines: 10
nestif:
min-complexity: 5
predeclared:
ignore: ""
q: false
reassign:
patterns:
- ".*"
rowserrcheck:
packages:
- github.com/jmoiron/sqlx
tenv:
all: true
usestdlibvars:
time-month: true
time-layout: true
crypto-hash: true
default-rpc-path: true
os-dev-null: true
sql-isolation-level: true
tls-signature-scheme: true
constant-kind: true
syslog-priority: true
issues:
max-same-issues: 10
exclude-rules:
- source: "//noinspection"
linters: [ gocritic ]
- path: "_test\\.go"
linters:
- bodyclose
- dupl
- funlen
- goconst
- gosec
- noctx

14
cerrors/code.go Normal file
View File

@@ -0,0 +1,14 @@
package cerrors
type Code string
const (
CodeOK Code = "ok"
CodeError Code = "error"
CodeBadRequest Code = "bad_request"
CodeInternal Code = "internal"
CodeInternalUpstream Code = "internal.upstream"
CodeUnexpect Code = "unexpect"
CodeUnexpectPanic Code = "unexpect.panic"
CodeUnknown Code = "unknown"
)

47
cerrors/error.go Normal file
View File

@@ -0,0 +1,47 @@
package cerrors
import (
"git.blauwelle.com/go/crate/runtimehelper"
)
const maximumFrames = 16
type Error struct {
Code Code
Message string
Debug string
Traceback []runtimehelper.Frame
}
func (err *Error) Error() string {
if err.Message == "" {
return string(err.Code)
}
return string(err.Code) + ": " + err.Message
}
func New(code Code, message string) error {
e := &Error{
Code: code,
Message: message,
}
if FlagDebug {
e.Traceback = runtimehelper.Stack(1, maximumFrames)
}
return e
}
func Wrap(err error, code Code, message string) error {
e2 := &Error{
Code: code,
Message: message,
}
if e1, ok := err.(*Error); ok { //nolint:errorlint
e2.Debug = e1.Debug
e2.Traceback = e1.Traceback
} else if FlagDebug {
e2.Debug = err.Error()
e2.Traceback = runtimehelper.Stack(1, maximumFrames)
}
return e2
}

7
cerrors/flag.go Normal file
View File

@@ -0,0 +1,7 @@
package cerrors
import (
"os"
)
var FlagDebug = os.Getenv("DEBUG") != ""

5
cerrors/go.mod Normal file
View File

@@ -0,0 +1,5 @@
module git.blauwelle.com/go/crate/cerrors
go 1.21.1
require git.blauwelle.com/go/crate/runtimehelper v0.2.0

2
cerrors/go.sum Normal file
View File

@@ -0,0 +1,2 @@
git.blauwelle.com/go/crate/runtimehelper v0.2.0 h1:W19wipPCyFSGHOWzqtfouNJu7MDeJobP+iRM4bPiJpM=
git.blauwelle.com/go/crate/runtimehelper v0.2.0/go.mod h1:yVMA0GkO9AS7iuPmalHKeWyv9en0JWj25rY1vpTuHhk=

8
cerrors/std.go Normal file
View File

@@ -0,0 +1,8 @@
package cerrors
import (
"errors"
)
var Is = errors.Is
var As = errors.As

9
exegroup/README.md Normal file
View File

@@ -0,0 +1,9 @@
# exegroup 按照顺序启动和终止任务
参考[group.go](./group.go)中的使用说明;
### `Group` / `Actor` / `Actor.startFunc` 超时时间的约定
- `>= 0` 的超时时间被用来初始化定时器, 到时间时触发超时;
- `< 0` 的超时时间表示永不超时;
- 所有超时时间初始化为负数(-1);

View File

@@ -1,14 +1,93 @@
package exegroup
import "context"
import (
"context"
"fmt"
"time"
"git.blauwelle.com/go/crate/log"
)
// Actor 是 [Group] 调度的执行单元;
//
// Actor.stopFunc 可以是 nil, 这时 Actor 需要受 goFunc 的 ctx 控制退出;
// startCtx, startCancel 需要在构造 Actor 时初始化, 在后续 start / wait 初始化会导致 data race.
type Actor struct {
goFunc func(ctx context.Context) error
stopFunc func(ctx context.Context)
startFunc func(ctx context.Context) error
group *Group
closeAfterStop chan struct{}
closeAfterTimeout chan struct{}
startCtx context.Context //nolint:containedctx
startCancel func()
name string
stopTimeout time.Duration // <0表示没有超时时间
isFastStop bool
}
func (actor *Actor) wrapFastStop() {
if !actor.isFastStop {
return
}
startFunc := actor.startFunc
actor.startFunc = func(ctx context.Context) error {
errChan := make(chan error, 1)
go func() {
var err error
defer func() {
if v := recover(); v != nil {
err = fmt.Errorf("recover: %s", v)
}
errChan <- err
}()
err = startFunc(ctx)
}()
select {
case <-ctx.Done():
return ctx.Err()
case err := <-errChan:
return err
}
}
}
// start 同步启动 [Actor], [Actor].startFunc panic 或返回的 error 会被写入 [Actor].errorChan.
func (actor *Actor) start() {
actor.wrapFastStop()
var err error
defer func() {
if v := recover(); v != nil {
err = fmt.Errorf("recover: %s", v)
}
actor.group.errorsChan <- actorError{
actor: actor,
err: err,
}
log.Tracef(context.Background(), "group-actor %s return", actor.name)
close(actor.closeAfterStop)
}()
err = actor.startFunc(actor.startCtx)
}
// sendStopSignal 通知 [Actor] 退出.
func (actor *Actor) sendStopSignal() {
actor.startCancel()
if actor.stopTimeout >= 0 {
go func() {
timer := time.NewTimer(actor.stopTimeout)
select {
case <-actor.closeAfterStop:
timer.Stop()
case <-timer.C:
close(actor.closeAfterTimeout)
}
}()
}
}
// wait 等待 [Actor] 停止或超时.
func (actor *Actor) wait() {
select {
case <-actor.closeAfterTimeout:
case <-actor.closeAfterStop:
}
}
// WithName 指定 [Actor] 的 name;
@@ -17,21 +96,21 @@ func (actor *Actor) WithName(name string) *Actor {
return actor
}
// WithGo 指定 [Actor] 的 goFunc;
// 通过 WithGo 注册的 goFunc 函数应该受 ctx 控制退出;
func (actor *Actor) WithGo(goFunc func(ctx context.Context) error) *Actor {
actor.goFunc = goFunc
// WithStartFunc 指定 [Actor] 的 startFunc;
// 通过 WithStartFunc 注册的 startFunc 函数应该受 ctx 控制退出;
func (actor *Actor) WithStartFunc(startFunc func(ctx context.Context) error) *Actor {
actor.startFunc = startFunc
return actor
}
// WithGoStop 指定 [Actor] 的 goFunc 和 stopFunc;
// goFunc 不受 ctx 控制退出, 而是在 stopFunc 调用后退出;
// 1. goFunc 被 [Group] 在 goroutine 中启动;
// 2. stopFunc 在被 [Group] 启动的任意 goFunc 返回后被调用;
//
// 使用 stopFunc 可以在 stopFunc 的 ctx 中指定等待期限, 让 goFunc 延迟到等待期限强制退出;
func (actor *Actor) WithGoStop(goFunc func(ctx context.Context) error, stopFunc func(ctx context.Context)) *Actor {
actor.goFunc = goFunc
actor.stopFunc = stopFunc
// WithFastStop 设置 [Actor] 接收停止信号后立即终止.
func (actor *Actor) WithFastStop(fastStop bool) *Actor {
actor.isFastStop = fastStop
return actor
}
// WithStopTimeout 指定停止 [Actor] 的最长等待时间, 设置 <0 没有最长等待时间.
func (actor *Actor) WithStopTimeout(duration time.Duration) *Actor {
actor.stopTimeout = duration
return actor
}

View File

@@ -2,11 +2,13 @@ package eghttp
import (
"context"
"errors"
"net"
"net/http"
"strconv"
"sync/atomic"
"time"
"git.blauwelle.com/go/crate/log"
)
const (
@@ -30,6 +32,7 @@ func WithHandler(handler http.Handler) Option {
})
}
// WithServer 指定要启动的服务器. 注意会替换原有的服务器, 导致早前配置服务器相关的 [Option] 失效.
func WithServer(server *http.Server) Option {
return optionFunc(func(cfg *config) {
cfg.server = server
@@ -49,9 +52,17 @@ func WithServerOption(fn func(server *http.Server)) Option {
})
}
// WithShutdownTimeout 设置关闭服务的超时时间. <0 没有最长等待时间.
func WithShutdownTimeout(timeout time.Duration) Option {
return optionFunc(func(cfg *config) {
cfg.shutdownTimeout = timeout
})
}
type config struct {
server *http.Server
startFn func(server *http.Server) error
shutdownTimeout time.Duration
}
func newDefaultConfig() *config {
@@ -63,6 +74,7 @@ func newDefaultConfig() *config {
startFn: func(server *http.Server) error {
return server.ListenAndServe()
},
shutdownTimeout: -1,
}
}
@@ -72,24 +84,31 @@ func (fn optionFunc) apply(cfg *config) {
fn(cfg)
}
// HTTPListenAndServe 创建 [http.Server] 并提供启动和停止函数;
func HTTPListenAndServe(opts ...Option) (func(ctx context.Context) error, func(ctx context.Context)) {
// ListenAndServe 构造1个函数, 启动 [http.Server] 并受 ctx 控制终止.
func ListenAndServe(opts ...Option) func(ctx context.Context) error {
cfg := newDefaultConfig()
for _, opt := range opts {
opt.apply(cfg)
}
inShutdown := &atomic.Bool{}
c := make(chan error, 1)
goFunc := func(_ context.Context) error {
errChan := make(chan error, 2) //nolint:gomnd
return func(ctx context.Context) error {
go func() {
err := cfg.startFn(cfg.server)
if inShutdown.Load() {
err = <-c
log.Tracef(ctx, "server return with %s", err)
if !errors.Is(err, http.ErrServerClosed) {
errChan <- err
}
return err
}()
go func() {
<-ctx.Done()
shutdownCtx := context.Background()
if cfg.shutdownTimeout >= 0 {
var cancel func()
shutdownCtx, cancel = context.WithTimeout(shutdownCtx, cfg.shutdownTimeout)
defer cancel()
}
stopFunc := func(ctx context.Context) {
inShutdown.Store(true)
c <- cfg.server.Shutdown(ctx)
errChan <- cfg.server.Shutdown(shutdownCtx)
}()
return <-errChan
}
return goFunc, stopFunc
}

View File

@@ -1,3 +1,5 @@
module git.blauwelle.com/go/crate/exegroup
go 1.20
require git.blauwelle.com/go/crate/log v0.15.0

4
exegroup/go.sum Normal file
View File

@@ -0,0 +1,4 @@
git.blauwelle.com/go/crate/log v0.14.0 h1:y7hJXP+ZPY/wD+wlEzKgakpki8/l0LwZWqxtJ92Wy58=
git.blauwelle.com/go/crate/log v0.14.0/go.mod h1:jfVfpRODZTA70A8IkApVeGsS1zfLk1D77sLWZM/w+L0=
git.blauwelle.com/go/crate/log v0.15.0 h1:nOPCB5a2F9fCvhiSkymxQRO639hoaOlDU95aMSPWf80=
git.blauwelle.com/go/crate/log v0.15.0/go.mod h1:jfVfpRODZTA70A8IkApVeGsS1zfLk1D77sLWZM/w+L0=

View File

@@ -1,6 +1,6 @@
// Package exegroup 管理1组执行单元的启动和终止;
// Package exegroup 按照顺序启动和终止任务;
//
// 使用举例:
// Examples:
//
// 新建 [Group]:
//
@@ -12,54 +12,48 @@
//
// 新增1个 [Actor], 通过 ctx 控制 [Actor] 退出:
//
// g.New().WithName("do nothing").WithGo(func(ctx context.Context) error {
// g.New().WithName("do nothing").WithStartFunc(func(ctx context.Context) error {
// <-ctx.Done()
// return ctx.Err()
// })
//
// 新增1个 [Actor], 通过 stopFunc 控制 [Actor] 退出:
//
// server := &http.Server{Addr: ":80"}
// inShutdown := &atomic.Bool{}
// c := make(chan error, 1)
// goFunc := func(_ context.Context) error {
// err := server.ListenAndServe()
// if inShutdown.Load() {
// err = <-c
// }
// return err
// }
// stopFunc := func(ctx context.Context) {
// inShutdown.Store(true)
// c <- server.Shutdown(ctx)
// }
// g.New().WithName("server").WithGoStop(goFunc, stopFunc)
//
// 启动 [Group]:
//
// g.Run()
// g.Run(ctx)
package exegroup
import (
"context"
"errors"
"fmt"
"time"
"git.blauwelle.com/go/crate/log"
)
// Group 管理1组 [Actor], 每个 [Actor] 在1个goroutine 中运行;
// Group 的执行过程参考 [Group.Run];
//
// Group 的配置包含:
// - [WithConcurrentStop] 并发执行 [Actor] 的终止函数, 默认不并发执行;
// Group 的配置:
// - [WithStopTimeout] 指定 [Group.Run] 从进入终止过程到返回的最长时间, 默认不限制最长时间;
type Group struct {
errorsChan chan actorError
actors []*Actor
syncActors []*Actor
asyncActors []*Actor
cfg config
}
type actorError struct {
actor *Actor
err error
}
// New 创建 [Group];
func New(opts ...Option) *Group {
cfg := config{}
cfg := config{
stopTimeout: -1,
}
for _, opt := range opts {
opt.apply(&cfg)
}
@@ -71,88 +65,139 @@ func New(opts ...Option) *Group {
// Default 创建包含信号处理 [Actor] 的 [Group];
func Default(opts ...Option) *Group {
g := New(opts...)
g.New().WithName("signal").WithGo(HandleSignal())
g.NewAsync().WithName("signal").WithStartFunc(HandleSignal())
return g
}
// New 创建并添加 [Actor],
// 对 Actor 的配置通过链式调用完成;
func (g *Group) New() *Actor {
actor := new(Actor).WithName(fmt.Sprintf("actor-%03d", len(g.actors)+1))
func (g *Group) newActor() *Actor {
actor := &Actor{
group: g,
closeAfterStop: make(chan struct{}),
closeAfterTimeout: make(chan struct{}),
name: fmt.Sprintf("actor-%d", len(g.actors)),
stopTimeout: -1,
}
actor.startCtx, actor.startCancel = context.WithCancel(context.Background())
g.actors = append(g.actors, actor)
return actor
}
// Run 启动所有 [Actor]s 并等待 [Actor]s 执行完成;
// NewAsync 创建并添加异步退出的 [Actor], 对 Actor 的配置通过链式调用完成;
func (g *Group) NewAsync() *Actor {
actor := g.newActor()
g.asyncActors = append(g.asyncActors, actor)
return actor
}
// NewSync 创建并添加同步退出的 [Actor], 对 Actor 的配置通过链式调用完成;
func (g *Group) NewSync() *Actor {
actor := g.newActor()
g.syncActors = append(g.syncActors, actor)
return actor
}
// Run 启动所有 [Actor] 并等待 [Actor] 执行完成;
// 当没有 Actor 时执行 Run 会 panic;
//
// Run 包含两个阶段:
// 1. 启动 [Actor] 并等待, Actor 的运行函数返回错误时进入下1个阶段;
// 2. 执行 [Actor] 的终止函数并返回;
// 1. 启动所有 [Actor];
// 2. 等待 ctx 或第1个Actor返回, 终止所有Actor.
func (g *Group) Run(ctx context.Context) error {
g.validate()
ctx, cancel := context.WithCancel(ctx)
c := make(chan error, len(g.actors))
g.start(ctx, c)
return g.wait(c, cancel)
if err := g.validate(ctx); err != nil {
return err
}
g.errorsChan = make(chan actorError, len(g.actors))
g.start(ctx)
return g.stop(ctx, g.wait(ctx))
}
func (g *Group) validate() {
func (g *Group) validate(ctx context.Context) error {
if len(g.actors) == 0 {
panic("no actor")
err := errors.New("no actor")
log.Error(ctx, err.Error())
return err
}
for _, actor := range g.actors {
if actor.goFunc == nil {
panic(actor.name + " has nil goFunc")
}
}
}
func (g *Group) start(ctx context.Context, c chan error) {
for _, actor := range g.actors {
go func(actor *Actor) {
var err error
defer func() {
if v := recover(); v != nil {
err = fmt.Errorf("%v", v)
}
c <- err
}()
err = actor.goFunc(ctx)
}(actor)
}
}
func (g *Group) wait(c chan error, cancel context.CancelFunc) error {
err := <-c
cancel()
ctx := context.Background()
if g.cfg.stopTimeout > 0 {
ctx, cancel = context.WithTimeout(context.Background(), g.cfg.stopTimeout)
defer cancel()
}
for i := len(g.actors) - 1; i >= 0; i-- {
if g.actors[i].stopFunc != nil {
if g.cfg.concurrentStop {
go g.actors[i].stopFunc(ctx)
} else {
g.actors[i].stopFunc(ctx)
}
}
}
for i := 1; i < len(g.actors); i++ {
select {
case <-c:
case <-ctx.Done():
for i, actor := range g.actors {
if actor.startFunc == nil {
err := fmt.Errorf("group-actor %d: %s has nil startFunc", i, actor.name)
log.Error(ctx, err.Error())
return err
}
}
return nil
}
// start 启动所有的 [Actor] 后返回.
func (g *Group) start(ctx context.Context) {
for _, actor := range g.actors {
go actor.start()
log.Tracef(ctx, "group is starting group-actor %s", actor.name)
}
}
// wait 等待 ctx 或 [Actor] 返回
func (g *Group) wait(ctx context.Context) error {
log.Tracef(ctx, "group is waiting")
select {
case ae := <-g.errorsChan:
log.Tracef(ctx, "group is waiting: group-actor %s return", ae.actor.name)
if ae.err != nil {
return fmt.Errorf("group-actor %s return with %w", ae.actor.name, ae.err)
}
return nil
case <-ctx.Done():
log.Tracef(ctx, "group is waiting: %s", ctx.Err().Error())
return ctx.Err()
}
}
// stop 等待 [Actor] 停止
func (g *Group) stop(ctx context.Context, err error) error {
stopCtx := context.Background()
if g.cfg.stopTimeout >= 0 {
log.Tracef(ctx, "set group stop timeout to %s", g.cfg.stopTimeout)
var cancel func()
stopCtx, cancel = context.WithTimeout(stopCtx, g.cfg.stopTimeout)
defer cancel()
}
go func() {
// 终止异步 [Actor]
for _, actor := range g.asyncActors {
log.Tracef(ctx, "group is stopping group-actor %s", actor.name)
actor.sendStopSignal()
}
// 终止同步 [Actor] 并等待 [Actor] 退出
for i := len(g.syncActors) - 1; i >= 0; i-- {
actor := g.syncActors[i]
log.Tracef(ctx, "group is stopping group-actor %s", actor.name)
actor.sendStopSignal()
actor.wait()
}
}()
// 等待所有 [Actor] 退出, 超时立即退出
for _, actor := range g.actors {
select {
case <-actor.closeAfterStop:
log.Tracef(ctx, "group is stopping group-actor %s: stopped", actor.name)
case <-actor.closeAfterTimeout:
log.Errorf(ctx, "group is stopping group-actor %s: timeout", actor.name)
case <-stopCtx.Done():
if err == nil {
err = stopCtx.Err()
} else {
err = fmt.Errorf("group is stopping group-actor %s: global timeout: %w", actor.name, err)
}
log.Error(ctx, err.Error())
return err
}
}
log.Tracef(ctx, "group return: %v", err)
return err
}
type config struct {
concurrentStop bool
stopTimeout time.Duration
stopTimeout time.Duration // <0表示没有超时时间
}
// Option 修改 [Group] 的配置;
@@ -166,14 +211,8 @@ func (fn optionFunc) apply(cfg *config) {
fn(cfg)
}
// WithConcurrentStop 并发执行 [Actor] 的终止函数;
func WithConcurrentStop() Option {
return optionFunc(func(cfg *config) {
cfg.concurrentStop = true
})
}
// WithStopTimeout 指定 [Group.Run] 从进入终止过程到返回的最长时间;
// WithStopTimeout 指定 [Group.Run] 从进入终止过程到返回的最长时间.
// <0表示没有超时时间.
func WithStopTimeout(d time.Duration) Option {
return optionFunc(func(cfg *config) {
cfg.stopTimeout = d

View File

@@ -9,7 +9,7 @@ import (
func Example_defaultGroup() {
g := exegroup.Default()
g.New().WithName("do nothing").WithGo(func(ctx context.Context) error {
g.NewSync().WithName("do nothing").WithStartFunc(func(ctx context.Context) error {
<-ctx.Done()
return ctx.Err()
})

View File

@@ -6,6 +6,8 @@ import (
"os"
"os/signal"
"syscall"
"git.blauwelle.com/go/crate/log"
)
// HandleSignal 提供信号处理函数;
@@ -16,10 +18,13 @@ func HandleSignal(signals ...os.Signal) func(ctx context.Context) error {
}
signal.Notify(c, signals...)
return func(ctx context.Context) error {
log.Tracef(ctx, "notify signal %v", signals)
defer signal.Stop(c)
select {
case sig := <-c:
return fmt.Errorf("signal %s", sig)
err := fmt.Errorf("signal received: %s", sig)
log.Trace(ctx, err.Error())
return err
case <-ctx.Done():
return ctx.Err()
}

View File

@@ -11,4 +11,5 @@ const (
CodeUnexpect Code = "unexpect"
CodeUnexpectPanic Code = "unexpect.panic"
CodeUnknown Code = "unknown"
CodeUnauthorized Code = "unauthorized"
)

View File

@@ -1,16 +1,24 @@
package httpdata
import (
"errors"
"git.blauwelle.com/go/crate/runtimehelper"
)
type Response struct {
Data any `json:"data,omitempty"`
Code Code `json:"code"`
Message string `json:"message,omitempty"`
Debug string `json:"debug,omitempty"`
Traceback []runtimehelper.Frame `json:"traceback,omitempty"`
}
type PageData struct {
List []any `json:"list"`
PageIndex int `json:"pageIndex"` // >=1
PageSize int `json:"pageSize"` // >=1
Total int `json:"total"` // maybe 0
List any `json:"list"`
PageIndex int64 `json:"pageIndex"` // >=1
PageSize int64 `json:"pageSize"` // >=1
Total int64 `json:"total"` // maybe 0
}
func NewOkResponse(data any) Response {
@@ -20,3 +28,24 @@ func NewOkResponse(data any) Response {
Data: data,
}
}
func NewErrorResponse(err error) Response {
ue := UniverseError{}
if !errors.As(err, &ue) {
response := Response{
Code: CodeUnknown,
Message: "未知错误",
}
if FlagDebug {
response.Debug = err.Error()
response.Traceback = runtimehelper.Stack(1, maximumFrames)
}
return response
}
return Response{
Code: ue.Code,
Message: ue.Message,
Debug: ue.Debug,
Traceback: ue.Traceback,
}
}

View File

@@ -2,11 +2,24 @@ package httpdata
import (
"errors"
"git.blauwelle.com/go/crate/runtimehelper"
)
const maximumFrames = 8
type UniverseError struct {
Code Code
Message string
Debug string
Traceback []runtimehelper.Frame
}
func (err UniverseError) Error() string {
if err.Message == "" {
return string(err.Code)
}
return string(err.Code) + ": " + err.Message
}
func NewUniverseError(code Code, message string) error {
@@ -23,13 +36,6 @@ func NewBadRequestError(message string) error {
}
}
func (err UniverseError) Error() string {
if err.Message == "" {
return string(err.Code)
}
return string(err.Code) + ": " + err.Message
}
func IsUniverseError(err error) bool {
return errors.As(err, &UniverseError{})
}

7
httpdata/flag.go Normal file
View File

@@ -0,0 +1,7 @@
package httpdata
import (
"os"
)
var FlagDebug = os.Getenv("DEBUG") == "true"

View File

@@ -1,3 +1,5 @@
module git.blauwelle.com/go/crate/httpdata
go 1.21.1
go 1.21
require git.blauwelle.com/go/crate/runtimehelper v0.2.0

2
httpdata/go.sum Normal file
View File

@@ -0,0 +1,2 @@
git.blauwelle.com/go/crate/runtimehelper v0.2.0 h1:W19wipPCyFSGHOWzqtfouNJu7MDeJobP+iRM4bPiJpM=
git.blauwelle.com/go/crate/runtimehelper v0.2.0/go.mod h1:yVMA0GkO9AS7iuPmalHKeWyv9en0JWj25rY1vpTuHhk=

View File

@@ -2,6 +2,7 @@ package uptracehelper
import (
"context"
"time"
"github.com/uptrace/uptrace-go/uptrace"
)
@@ -10,6 +11,7 @@ type Config struct {
ServiceName string
ServiceVersion string
DeploymentEnvironment string
ShutdownTimeout time.Duration
}
func Setup(cfg Config) {
@@ -23,19 +25,21 @@ func Setup(cfg Config) {
uptrace.ConfigureOpentelemetry(opts...)
}
// GoStop 初始化并等待 uptrace
// Bootstrap 初始化并等待 uptrace
// 配合 [git.blauwelle.com/go/crate/exegroup] 使用
// env
// - UPTRACE_DISABLED 存在就不再初始化
// - UPTRACE_DSN 服务端地址
func GoStop(cfg Config) (func(ctx context.Context) error, func(ctx context.Context)) {
func Bootstrap(cfg Config) func(ctx context.Context) error {
Setup(cfg)
shutdownErr := make(chan error, 1)
goFunc := func(context.Context) error {
return <-shutdownErr
return func(ctx context.Context) error {
<-ctx.Done()
shutdownCtx := context.Background()
if cfg.ShutdownTimeout >= 0 {
var cancel func()
shutdownCtx, cancel = context.WithTimeout(shutdownCtx, cfg.ShutdownTimeout)
defer cancel()
}
stopFunc := func(ctx context.Context) {
shutdownErr <- uptrace.Shutdown(ctx)
return uptrace.Shutdown(shutdownCtx)
}
return goFunc, stopFunc
}