Compare commits

..

8 Commits

30 changed files with 654 additions and 220 deletions

View File

@@ -3,7 +3,7 @@ module git.blauwelle.com/go/crate/bunrouterotel
go 1.21.1 go 1.21.1
require ( require (
github.com/uptrace/bunrouter v1.0.20 github.com/uptrace/bunrouter v1.0.21
go.opentelemetry.io/otel v1.21.0 go.opentelemetry.io/otel v1.21.0
go.opentelemetry.io/otel/trace v1.21.0 go.opentelemetry.io/otel/trace v1.21.0
) )

View File

@@ -1,32 +1,22 @@
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A=
github.com/go-logr/logr v1.2.4 h1:g01GSCwiDw2xSZfjJ2/T9M+S6pFdcNtFYsp+Y43HYDQ=
github.com/go-logr/logr v1.2.4/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A=
github.com/go-logr/logr v1.3.0 h1:2y3SDp0ZXuc6/cjLSZ+Q3ir+QB9T/iG5yYRXqsagWSY=
github.com/go-logr/logr v1.3.0/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY=
github.com/go-logr/logr v1.4.1 h1:pKouT5E8xu9zeFC39JXRDukb6JFQPXM5p5I91188VAQ= github.com/go-logr/logr v1.4.1 h1:pKouT5E8xu9zeFC39JXRDukb6JFQPXM5p5I91188VAQ=
github.com/go-logr/logr v1.4.1/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= github.com/go-logr/logr v1.4.1/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY=
github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag= github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag=
github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE= github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE=
github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38= github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk= github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk=
github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo=
github.com/uptrace/bunrouter v1.0.20 h1:jNvYNcJxF+lSYBQAaQjnE6I11Zs0m+3M5Ek7fq/Tp4c= github.com/uptrace/bunrouter v1.0.21 h1:HXarvX+N834sXyHpl+I/TuE11m19kLW/qG5u3YpHUag=
github.com/uptrace/bunrouter v1.0.20/go.mod h1:TwT7Bc0ztF2Z2q/ZzMuSVkcb/Ig/d3MQeP2cxn3e1hI= github.com/uptrace/bunrouter v1.0.21/go.mod h1:TwT7Bc0ztF2Z2q/ZzMuSVkcb/Ig/d3MQeP2cxn3e1hI=
go.opentelemetry.io/otel v1.19.0 h1:MuS/TNf4/j4IXsZuJegVzI1cwut7Qc00344rgH7p8bs=
go.opentelemetry.io/otel v1.19.0/go.mod h1:i0QyjOq3UPoTzff0PJB2N66fb4S0+rSbSB15/oyH9fY=
go.opentelemetry.io/otel v1.21.0 h1:hzLeKBZEL7Okw2mGzZ0cc4k/A7Fta0uoPgaJCr8fsFc= go.opentelemetry.io/otel v1.21.0 h1:hzLeKBZEL7Okw2mGzZ0cc4k/A7Fta0uoPgaJCr8fsFc=
go.opentelemetry.io/otel v1.21.0/go.mod h1:QZzNPQPm1zLX4gZK4cMi+71eaorMSGT3A4znnUvNNEo= go.opentelemetry.io/otel v1.21.0/go.mod h1:QZzNPQPm1zLX4gZK4cMi+71eaorMSGT3A4znnUvNNEo=
go.opentelemetry.io/otel/metric v1.19.0 h1:aTzpGtV0ar9wlV4Sna9sdJyII5jTVJEvKETPiOKwvpE=
go.opentelemetry.io/otel/metric v1.19.0/go.mod h1:L5rUsV9kM1IxCj1MmSdS+JQAcVm319EUrDVLrt7jqt8=
go.opentelemetry.io/otel/metric v1.21.0 h1:tlYWfeo+Bocx5kLEloTjbcDwBuELRrIFxwdQ36PlJu4= go.opentelemetry.io/otel/metric v1.21.0 h1:tlYWfeo+Bocx5kLEloTjbcDwBuELRrIFxwdQ36PlJu4=
go.opentelemetry.io/otel/metric v1.21.0/go.mod h1:o1p3CA8nNHW8j5yuQLdc1eeqEaPfzug24uvsyIEJRWM= go.opentelemetry.io/otel/metric v1.21.0/go.mod h1:o1p3CA8nNHW8j5yuQLdc1eeqEaPfzug24uvsyIEJRWM=
go.opentelemetry.io/otel/trace v1.19.0 h1:DFVQmlVbfVeOuBRrwdtaehRrWiL1JoVs9CPIQ1Dzxpg=
go.opentelemetry.io/otel/trace v1.19.0/go.mod h1:mfaSyvGyEJEI0nyV2I4qhNQnbBOUUmYZpYojqMnX2vo=
go.opentelemetry.io/otel/trace v1.21.0 h1:WD9i5gzvoUPuXIXH24ZNBudiarZDKuekPqi/E8fpfLc= go.opentelemetry.io/otel/trace v1.21.0 h1:WD9i5gzvoUPuXIXH24ZNBudiarZDKuekPqi/E8fpfLc=
go.opentelemetry.io/otel/trace v1.21.0/go.mod h1:LGbsEB0f9LGjN+OZaQQ26sohbOmiMR+BaslueVtS/qQ= go.opentelemetry.io/otel/trace v1.21.0/go.mod h1:LGbsEB0f9LGjN+OZaQQ26sohbOmiMR+BaslueVtS/qQ=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=

View File

@@ -2,6 +2,7 @@ package bunrouterotel
import ( import (
"net/http" "net/http"
"strings"
"github.com/uptrace/bunrouter" "github.com/uptrace/bunrouter"
"go.opentelemetry.io/otel" "go.opentelemetry.io/otel"
@@ -13,9 +14,11 @@ import (
const ( const (
tracerName = "git.blauwelle.com/go/crate/bunrouterotel" tracerName = "git.blauwelle.com/go/crate/bunrouterotel"
version = "0.2.0" version = "0.5.0"
) )
var tracer = otel.Tracer(tracerName, trace.WithInstrumentationVersion("semver:"+version))
type config struct { type config struct {
propagators propagation.TextMapPropagator propagators propagation.TextMapPropagator
} }
@@ -36,6 +39,15 @@ func WithPropagators(propagators propagation.TextMapPropagator) Option {
}) })
} }
func getForwardedFor(r *http.Request) []string {
h := r.Header.Get("X-Forwarded-For")
a := strings.Split(h, ",")
for i, s := range a {
a[i] = strings.Trim(s, " \t")
}
return a
}
// Middleware create a span, which record the request, // Middleware create a span, which record the request,
// HTTP status code is NOT recorded. // HTTP status code is NOT recorded.
func Middleware(serverName string, opts ...Option) bunrouter.MiddlewareFunc { func Middleware(serverName string, opts ...Option) bunrouter.MiddlewareFunc {
@@ -46,7 +58,6 @@ func Middleware(serverName string, opts ...Option) bunrouter.MiddlewareFunc {
if cfg.propagators == nil { if cfg.propagators == nil {
cfg.propagators = otel.GetTextMapPropagator() cfg.propagators = otel.GetTextMapPropagator()
} }
tracer := otel.Tracer(tracerName, trace.WithInstrumentationVersion("semver:"+version))
return func(next bunrouter.HandlerFunc) bunrouter.HandlerFunc { return func(next bunrouter.HandlerFunc) bunrouter.HandlerFunc {
return func(w http.ResponseWriter, req bunrouter.Request) error { return func(w http.ResponseWriter, req bunrouter.Request) error {
ctx := cfg.propagators.Extract(req.Context(), propagation.HeaderCarrier(req.Header)) ctx := cfg.propagators.Extract(req.Context(), propagation.HeaderCarrier(req.Header))
@@ -75,6 +86,10 @@ func Middleware(serverName string, opts ...Option) bunrouter.MiddlewareFunc {
attrs = append(attrs, attribute.String("http.route.param."+param.Key, param.Value)) attrs = append(attrs, attribute.String("http.route.param."+param.Key, param.Value))
} }
span.SetAttributes(attrs...) span.SetAttributes(attrs...)
forwardedFor := getForwardedFor(req.Request)
if forwardedFor != nil {
span.SetAttributes(attribute.StringSlice("http.forward_route", forwardedFor))
}
defer span.End() defer span.End()
return next(w, req) return next(w, req)
} }

9
exegroup/README.md Normal file
View File

@@ -0,0 +1,9 @@
# exegroup 按照顺序启动和终止任务
参考[group.go](./group.go)中的使用说明;
### `Group` / `Actor` / `Actor.startFunc` 超时时间的约定
- `>= 0` 的超时时间被用来初始化定时器, 到时间时触发超时;
- `< 0` 的超时时间表示永不超时;
- 所有超时时间初始化为负数(-1);

View File

@@ -1,14 +1,93 @@
package exegroup package exegroup
import "context" import (
"context"
"fmt"
"time"
"git.blauwelle.com/go/crate/log"
)
// Actor 是 [Group] 调度的执行单元; // Actor 是 [Group] 调度的执行单元;
// // startCtx, startCancel 需要在构造 Actor 时初始化, 在后续 start / wait 初始化会导致 data race.
// Actor.stopFunc 可以是 nil, 这时 Actor 需要受 goFunc 的 ctx 控制退出;
type Actor struct { type Actor struct {
goFunc func(ctx context.Context) error startFunc func(ctx context.Context) error
stopFunc func(ctx context.Context) group *Group
name string closeAfterStop chan struct{}
closeAfterTimeout chan struct{}
startCtx context.Context //nolint:containedctx
startCancel func()
name string
stopTimeout time.Duration // <0表示没有超时时间
isFastStop bool
}
func (actor *Actor) wrapFastStop() {
if !actor.isFastStop {
return
}
startFunc := actor.startFunc
actor.startFunc = func(ctx context.Context) error {
errChan := make(chan error, 1)
go func() {
var err error
defer func() {
if v := recover(); v != nil {
err = fmt.Errorf("recover: %s", v)
}
errChan <- err
}()
err = startFunc(ctx)
}()
select {
case <-ctx.Done():
return ctx.Err()
case err := <-errChan:
return err
}
}
}
// start 同步启动 [Actor], [Actor].startFunc panic 或返回的 error 会被写入 [Actor].errorChan.
func (actor *Actor) start() {
actor.wrapFastStop()
var err error
defer func() {
if v := recover(); v != nil {
err = fmt.Errorf("recover: %s", v)
}
actor.group.errorsChan <- actorError{
actor: actor,
err: err,
}
log.Tracef(context.Background(), "group-actor %s return", actor.name)
close(actor.closeAfterStop)
}()
err = actor.startFunc(actor.startCtx)
}
// sendStopSignal 通知 [Actor] 退出.
func (actor *Actor) sendStopSignal() {
actor.startCancel()
if actor.stopTimeout >= 0 {
go func() {
timer := time.NewTimer(actor.stopTimeout)
select {
case <-actor.closeAfterStop:
timer.Stop()
case <-timer.C:
close(actor.closeAfterTimeout)
}
}()
}
}
// wait 等待 [Actor] 停止或超时.
func (actor *Actor) wait() {
select {
case <-actor.closeAfterTimeout:
case <-actor.closeAfterStop:
}
} }
// WithName 指定 [Actor] 的 name; // WithName 指定 [Actor] 的 name;
@@ -17,21 +96,21 @@ func (actor *Actor) WithName(name string) *Actor {
return actor return actor
} }
// WithGo 指定 [Actor] 的 goFunc; // WithStartFunc 指定 [Actor] 的 startFunc;
// 通过 WithGo 注册的 goFunc 函数应该受 ctx 控制退出; // 通过 WithStartFunc 注册的 startFunc 函数应该受 ctx 控制退出;
func (actor *Actor) WithGo(goFunc func(ctx context.Context) error) *Actor { func (actor *Actor) WithStartFunc(startFunc func(ctx context.Context) error) *Actor {
actor.goFunc = goFunc actor.startFunc = startFunc
return actor return actor
} }
// WithGoStop 指定 [Actor] 的 goFunc 和 stopFunc; // WithFastStop 设置 [Actor] 接收停止信号后立即终止.
// goFunc 不受 ctx 控制退出, 而是在 stopFunc 调用后退出; func (actor *Actor) WithFastStop(fastStop bool) *Actor {
// 1. goFunc 被 [Group] 在 goroutine 中启动; actor.isFastStop = fastStop
// 2. stopFunc 在被 [Group] 启动的任意 goFunc 返回后被调用; return actor
// }
// 使用 stopFunc 可以在 stopFunc 的 ctx 中指定等待期限, 让 goFunc 延迟到等待期限强制退出;
func (actor *Actor) WithGoStop(goFunc func(ctx context.Context) error, stopFunc func(ctx context.Context)) *Actor { // WithStopTimeout 指定停止 [Actor] 的最长等待时间, 设置 <0 没有最长等待时间.
actor.goFunc = goFunc func (actor *Actor) WithStopTimeout(duration time.Duration) *Actor {
actor.stopFunc = stopFunc actor.stopTimeout = duration
return actor return actor
} }

View File

@@ -2,11 +2,13 @@ package eghttp
import ( import (
"context" "context"
"errors"
"net" "net"
"net/http" "net/http"
"strconv" "strconv"
"sync/atomic"
"time" "time"
"git.blauwelle.com/go/crate/log"
) )
const ( const (
@@ -30,6 +32,7 @@ func WithHandler(handler http.Handler) Option {
}) })
} }
// WithServer 指定要启动的服务器. 注意会替换原有的服务器, 导致早前配置服务器相关的 [Option] 失效.
func WithServer(server *http.Server) Option { func WithServer(server *http.Server) Option {
return optionFunc(func(cfg *config) { return optionFunc(func(cfg *config) {
cfg.server = server cfg.server = server
@@ -49,9 +52,17 @@ func WithServerOption(fn func(server *http.Server)) Option {
}) })
} }
// WithShutdownTimeout 设置关闭服务的超时时间. <0 没有最长等待时间.
func WithShutdownTimeout(timeout time.Duration) Option {
return optionFunc(func(cfg *config) {
cfg.shutdownTimeout = timeout
})
}
type config struct { type config struct {
server *http.Server server *http.Server
startFn func(server *http.Server) error startFn func(server *http.Server) error
shutdownTimeout time.Duration
} }
func newDefaultConfig() *config { func newDefaultConfig() *config {
@@ -63,6 +74,7 @@ func newDefaultConfig() *config {
startFn: func(server *http.Server) error { startFn: func(server *http.Server) error {
return server.ListenAndServe() return server.ListenAndServe()
}, },
shutdownTimeout: -1,
} }
} }
@@ -72,24 +84,31 @@ func (fn optionFunc) apply(cfg *config) {
fn(cfg) fn(cfg)
} }
// HTTPListenAndServe 创建 [http.Server] 并提供启动和停止函数; // ListenAndServe 构造1个函数, 启动 [http.Server] 并受 ctx 控制终止.
func HTTPListenAndServe(opts ...Option) (func(ctx context.Context) error, func(ctx context.Context)) { func ListenAndServe(opts ...Option) func(ctx context.Context) error {
cfg := newDefaultConfig() cfg := newDefaultConfig()
for _, opt := range opts { for _, opt := range opts {
opt.apply(cfg) opt.apply(cfg)
} }
inShutdown := &atomic.Bool{} errChan := make(chan error, 2) //nolint:gomnd
c := make(chan error, 1) return func(ctx context.Context) error {
goFunc := func(_ context.Context) error { go func() {
err := cfg.startFn(cfg.server) err := cfg.startFn(cfg.server)
if inShutdown.Load() { log.Tracef(ctx, "server return with %s", err)
err = <-c if !errors.Is(err, http.ErrServerClosed) {
} errChan <- err
return err }
}()
go func() {
<-ctx.Done()
shutdownCtx := context.Background()
if cfg.shutdownTimeout >= 0 {
var cancel func()
shutdownCtx, cancel = context.WithTimeout(shutdownCtx, cfg.shutdownTimeout)
defer cancel()
}
errChan <- cfg.server.Shutdown(shutdownCtx)
}()
return <-errChan
} }
stopFunc := func(ctx context.Context) {
inShutdown.Store(true)
c <- cfg.server.Shutdown(ctx)
}
return goFunc, stopFunc
} }

View File

@@ -1,3 +1,5 @@
module git.blauwelle.com/go/crate/exegroup module git.blauwelle.com/go/crate/exegroup
go 1.20 go 1.20
require git.blauwelle.com/go/crate/log v0.15.0

4
exegroup/go.sum Normal file
View File

@@ -0,0 +1,4 @@
git.blauwelle.com/go/crate/log v0.14.0 h1:y7hJXP+ZPY/wD+wlEzKgakpki8/l0LwZWqxtJ92Wy58=
git.blauwelle.com/go/crate/log v0.14.0/go.mod h1:jfVfpRODZTA70A8IkApVeGsS1zfLk1D77sLWZM/w+L0=
git.blauwelle.com/go/crate/log v0.15.0 h1:nOPCB5a2F9fCvhiSkymxQRO639hoaOlDU95aMSPWf80=
git.blauwelle.com/go/crate/log v0.15.0/go.mod h1:jfVfpRODZTA70A8IkApVeGsS1zfLk1D77sLWZM/w+L0=

View File

@@ -1,6 +1,6 @@
// Package exegroup 管理1组执行单元的启动和终止; // Package exegroup 按照顺序启动和终止任务;
// //
// 使用举例: // Examples:
// //
// 新建 [Group]: // 新建 [Group]:
// //
@@ -12,54 +12,48 @@
// //
// 新增1个 [Actor], 通过 ctx 控制 [Actor] 退出: // 新增1个 [Actor], 通过 ctx 控制 [Actor] 退出:
// //
// g.New().WithName("do nothing").WithGo(func(ctx context.Context) error { // g.New().WithName("do nothing").WithStartFunc(func(ctx context.Context) error {
// <-ctx.Done() // <-ctx.Done()
// return ctx.Err() // return ctx.Err()
// }) // })
// //
// 新增1个 [Actor], 通过 stopFunc 控制 [Actor] 退出:
//
// server := &http.Server{Addr: ":80"}
// inShutdown := &atomic.Bool{}
// c := make(chan error, 1)
// goFunc := func(_ context.Context) error {
// err := server.ListenAndServe()
// if inShutdown.Load() {
// err = <-c
// }
// return err
// }
// stopFunc := func(ctx context.Context) {
// inShutdown.Store(true)
// c <- server.Shutdown(ctx)
// }
// g.New().WithName("server").WithGoStop(goFunc, stopFunc)
//
// 启动 [Group]: // 启动 [Group]:
// //
// g.Run() // g.Run(ctx)
package exegroup package exegroup
import ( import (
"context" "context"
"errors"
"fmt" "fmt"
"time" "time"
"git.blauwelle.com/go/crate/log"
) )
// Group 管理1组 [Actor], 每个 [Actor] 在1个goroutine 中运行; // Group 管理1组 [Actor], 每个 [Actor] 在1个goroutine 中运行;
// Group 的执行过程参考 [Group.Run]; // Group 的执行过程参考 [Group.Run];
// //
// Group 的配置包含: // Group 的配置:
// - [WithConcurrentStop] 并发执行 [Actor] 的终止函数, 默认不并发执行;
// - [WithStopTimeout] 指定 [Group.Run] 从进入终止过程到返回的最长时间, 默认不限制最长时间; // - [WithStopTimeout] 指定 [Group.Run] 从进入终止过程到返回的最长时间, 默认不限制最长时间;
type Group struct { type Group struct {
actors []*Actor errorsChan chan actorError
cfg config actors []*Actor
syncActors []*Actor
asyncActors []*Actor
cfg config
}
type actorError struct {
actor *Actor
err error
} }
// New 创建 [Group]; // New 创建 [Group];
func New(opts ...Option) *Group { func New(opts ...Option) *Group {
cfg := config{} cfg := config{
stopTimeout: -1,
}
for _, opt := range opts { for _, opt := range opts {
opt.apply(&cfg) opt.apply(&cfg)
} }
@@ -71,88 +65,139 @@ func New(opts ...Option) *Group {
// Default 创建包含信号处理 [Actor] 的 [Group]; // Default 创建包含信号处理 [Actor] 的 [Group];
func Default(opts ...Option) *Group { func Default(opts ...Option) *Group {
g := New(opts...) g := New(opts...)
g.New().WithName("signal").WithGo(HandleSignal()) g.NewAsync().WithName("signal").WithStartFunc(HandleSignal())
return g return g
} }
// New 创建并添加 [Actor], func (g *Group) newActor() *Actor {
// 对 Actor 的配置通过链式调用完成; actor := &Actor{
func (g *Group) New() *Actor { group: g,
actor := new(Actor).WithName(fmt.Sprintf("actor-%03d", len(g.actors)+1)) closeAfterStop: make(chan struct{}),
closeAfterTimeout: make(chan struct{}),
name: fmt.Sprintf("actor-%d", len(g.actors)),
stopTimeout: -1,
}
actor.startCtx, actor.startCancel = context.WithCancel(context.Background())
g.actors = append(g.actors, actor) g.actors = append(g.actors, actor)
return actor return actor
} }
// Run 启动所有 [Actor]s 并等待 [Actor]s 执行完成; // NewAsync 创建并添加异步退出的 [Actor], 对 Actor 的配置通过链式调用完成;
func (g *Group) NewAsync() *Actor {
actor := g.newActor()
g.asyncActors = append(g.asyncActors, actor)
return actor
}
// NewSync 创建并添加同步退出的 [Actor], 对 Actor 的配置通过链式调用完成;
func (g *Group) NewSync() *Actor {
actor := g.newActor()
g.syncActors = append(g.syncActors, actor)
return actor
}
// Run 启动所有 [Actor] 并等待 [Actor] 执行完成;
// 当没有 Actor 时执行 Run 会 panic; // 当没有 Actor 时执行 Run 会 panic;
// //
// Run 包含两个阶段: // Run 包含两个阶段:
// 1. 启动 [Actor] 并等待, Actor 的运行函数返回错误时进入下1个阶段; // 1. 启动所有 [Actor];
// 2. 执行 [Actor] 的终止函数并返回; // 2. 等待 ctx 或第1个Actor返回, 终止所有Actor.
func (g *Group) Run(ctx context.Context) error { func (g *Group) Run(ctx context.Context) error {
g.validate() if err := g.validate(ctx); err != nil {
ctx, cancel := context.WithCancel(ctx) return err
c := make(chan error, len(g.actors)) }
g.start(ctx, c) g.errorsChan = make(chan actorError, len(g.actors))
return g.wait(c, cancel) g.start(ctx)
return g.stop(ctx, g.wait(ctx))
} }
func (g *Group) validate() { func (g *Group) validate(ctx context.Context) error {
if len(g.actors) == 0 { if len(g.actors) == 0 {
panic("no actor") err := errors.New("no actor")
log.Error(ctx, err.Error())
return err
} }
for _, actor := range g.actors { for i, actor := range g.actors {
if actor.goFunc == nil { if actor.startFunc == nil {
panic(actor.name + " has nil goFunc") err := fmt.Errorf("group-actor %d: %s has nil startFunc", i, actor.name)
} log.Error(ctx, err.Error())
}
}
func (g *Group) start(ctx context.Context, c chan error) {
for _, actor := range g.actors {
go func(actor *Actor) {
var err error
defer func() {
if v := recover(); v != nil {
err = fmt.Errorf("%v", v)
}
c <- err
}()
err = actor.goFunc(ctx)
}(actor)
}
}
func (g *Group) wait(c chan error, cancel context.CancelFunc) error {
err := <-c
cancel()
ctx := context.Background()
if g.cfg.stopTimeout > 0 {
ctx, cancel = context.WithTimeout(context.Background(), g.cfg.stopTimeout)
defer cancel()
}
for i := len(g.actors) - 1; i >= 0; i-- {
if g.actors[i].stopFunc != nil {
if g.cfg.concurrentStop {
go g.actors[i].stopFunc(ctx)
} else {
g.actors[i].stopFunc(ctx)
}
}
}
for i := 1; i < len(g.actors); i++ {
select {
case <-c:
case <-ctx.Done():
return err return err
} }
} }
return nil
}
// start 启动所有的 [Actor] 后返回.
func (g *Group) start(ctx context.Context) {
for _, actor := range g.actors {
go actor.start()
log.Tracef(ctx, "group is starting group-actor %s", actor.name)
}
}
// wait 等待 ctx 或 [Actor] 返回
func (g *Group) wait(ctx context.Context) error {
log.Tracef(ctx, "group is waiting")
select {
case ae := <-g.errorsChan:
log.Tracef(ctx, "group is waiting: group-actor %s return", ae.actor.name)
if ae.err != nil {
return fmt.Errorf("group-actor %s return with %w", ae.actor.name, ae.err)
}
return nil
case <-ctx.Done():
log.Tracef(ctx, "group is waiting: %s", ctx.Err().Error())
return ctx.Err()
}
}
// stop 等待 [Actor] 停止
func (g *Group) stop(ctx context.Context, err error) error {
stopCtx := context.Background()
if g.cfg.stopTimeout >= 0 {
log.Tracef(ctx, "set group stop timeout to %s", g.cfg.stopTimeout)
var cancel func()
stopCtx, cancel = context.WithTimeout(stopCtx, g.cfg.stopTimeout)
defer cancel()
}
go func() {
// 终止异步 [Actor]
for _, actor := range g.asyncActors {
log.Tracef(ctx, "group is stopping group-actor %s", actor.name)
actor.sendStopSignal()
}
// 终止同步 [Actor] 并等待 [Actor] 退出
for i := len(g.syncActors) - 1; i >= 0; i-- {
actor := g.syncActors[i]
log.Tracef(ctx, "group is stopping group-actor %s", actor.name)
actor.sendStopSignal()
actor.wait()
}
}()
// 等待所有 [Actor] 退出, 超时立即退出
for _, actor := range g.actors {
select {
case <-actor.closeAfterStop:
log.Tracef(ctx, "group is stopping group-actor %s: stopped", actor.name)
case <-actor.closeAfterTimeout:
log.Errorf(ctx, "group is stopping group-actor %s: timeout", actor.name)
case <-stopCtx.Done():
if err == nil {
err = stopCtx.Err()
} else {
err = fmt.Errorf("group is stopping group-actor %s: global timeout: %w", actor.name, err)
}
log.Error(ctx, err.Error())
return err
}
}
log.Tracef(ctx, "group return: %v", err)
return err return err
} }
type config struct { type config struct {
concurrentStop bool stopTimeout time.Duration // <0表示没有超时时间
stopTimeout time.Duration
} }
// Option 修改 [Group] 的配置; // Option 修改 [Group] 的配置;
@@ -166,14 +211,8 @@ func (fn optionFunc) apply(cfg *config) {
fn(cfg) fn(cfg)
} }
// WithConcurrentStop 并发执行 [Actor] 的终止函数; // WithStopTimeout 指定 [Group.Run] 从进入终止过程到返回的最长时间.
func WithConcurrentStop() Option { // <0表示没有超时时间.
return optionFunc(func(cfg *config) {
cfg.concurrentStop = true
})
}
// WithStopTimeout 指定 [Group.Run] 从进入终止过程到返回的最长时间;
func WithStopTimeout(d time.Duration) Option { func WithStopTimeout(d time.Duration) Option {
return optionFunc(func(cfg *config) { return optionFunc(func(cfg *config) {
cfg.stopTimeout = d cfg.stopTimeout = d

View File

@@ -9,7 +9,7 @@ import (
func Example_defaultGroup() { func Example_defaultGroup() {
g := exegroup.Default() g := exegroup.Default()
g.New().WithName("do nothing").WithGo(func(ctx context.Context) error { g.NewSync().WithName("do nothing").WithStartFunc(func(ctx context.Context) error {
<-ctx.Done() <-ctx.Done()
return ctx.Err() return ctx.Err()
}) })

View File

@@ -6,6 +6,8 @@ import (
"os" "os"
"os/signal" "os/signal"
"syscall" "syscall"
"git.blauwelle.com/go/crate/log"
) )
// HandleSignal 提供信号处理函数; // HandleSignal 提供信号处理函数;
@@ -16,10 +18,13 @@ func HandleSignal(signals ...os.Signal) func(ctx context.Context) error {
} }
signal.Notify(c, signals...) signal.Notify(c, signals...)
return func(ctx context.Context) error { return func(ctx context.Context) error {
log.Tracef(ctx, "notify signal %v", signals)
defer signal.Stop(c) defer signal.Stop(c)
select { select {
case sig := <-c: case sig := <-c:
return fmt.Errorf("signal %s", sig) err := fmt.Errorf("signal received: %s", sig)
log.Trace(ctx, err.Error())
return err
case <-ctx.Done(): case <-ctx.Done():
return ctx.Err() return ctx.Err()
} }

View File

@@ -11,4 +11,5 @@ const (
CodeUnexpect Code = "unexpect" CodeUnexpect Code = "unexpect"
CodeUnexpectPanic Code = "unexpect.panic" CodeUnexpectPanic Code = "unexpect.panic"
CodeUnknown Code = "unknown" CodeUnknown Code = "unknown"
CodeUnauthorized Code = "unauthorized"
) )

View File

@@ -1,16 +1,24 @@
package httpdata package httpdata
import (
"errors"
"git.blauwelle.com/go/crate/runtimehelper"
)
type Response struct { type Response struct {
Data any `json:"data,omitempty"` Data any `json:"data,omitempty"`
Code Code `json:"code"` Code Code `json:"code"`
Message string `json:"message,omitempty"` Message string `json:"message,omitempty"`
Debug string `json:"debug,omitempty"`
Traceback []runtimehelper.Frame `json:"traceback,omitempty"`
} }
type PageData struct { type PageData struct {
List []any `json:"list"` List any `json:"list"`
PageIndex int `json:"pageIndex"` // >=1 PageIndex int64 `json:"pageIndex"` // >=1
PageSize int `json:"pageSize"` // >=1 PageSize int64 `json:"pageSize"` // >=1
Total int `json:"total"` // maybe 0 Total int64 `json:"total"` // maybe 0
} }
func NewOkResponse(data any) Response { func NewOkResponse(data any) Response {
@@ -20,3 +28,24 @@ func NewOkResponse(data any) Response {
Data: data, Data: data,
} }
} }
func NewErrorResponse(err error) Response {
ue := UniverseError{}
if !errors.As(err, &ue) {
response := Response{
Code: CodeUnknown,
Message: "未知错误",
}
if FlagDebug {
response.Debug = err.Error()
response.Traceback = runtimehelper.Stack(1, maximumFrames)
}
return response
}
return Response{
Code: ue.Code,
Message: ue.Message,
Debug: ue.Debug,
Traceback: ue.Traceback,
}
}

View File

@@ -2,11 +2,24 @@ package httpdata
import ( import (
"errors" "errors"
"git.blauwelle.com/go/crate/runtimehelper"
) )
const maximumFrames = 8
type UniverseError struct { type UniverseError struct {
Code Code Code Code
Message string Message string
Debug string
Traceback []runtimehelper.Frame
}
func (err UniverseError) Error() string {
if err.Message == "" {
return string(err.Code)
}
return string(err.Code) + ": " + err.Message
} }
func NewUniverseError(code Code, message string) error { func NewUniverseError(code Code, message string) error {
@@ -23,13 +36,6 @@ func NewBadRequestError(message string) error {
} }
} }
func (err UniverseError) Error() string {
if err.Message == "" {
return string(err.Code)
}
return string(err.Code) + ": " + err.Message
}
func IsUniverseError(err error) bool { func IsUniverseError(err error) bool {
return errors.As(err, &UniverseError{}) return errors.As(err, &UniverseError{})
} }

7
httpdata/flag.go Normal file
View File

@@ -0,0 +1,7 @@
package httpdata
import (
"os"
)
var FlagDebug = os.Getenv("DEBUG") == "true"

View File

@@ -1,3 +1,5 @@
module git.blauwelle.com/go/crate/httpdata module git.blauwelle.com/go/crate/httpdata
go 1.21.1 go 1.21
require git.blauwelle.com/go/crate/runtimehelper v0.2.0

2
httpdata/go.sum Normal file
View File

@@ -0,0 +1,2 @@
git.blauwelle.com/go/crate/runtimehelper v0.2.0 h1:W19wipPCyFSGHOWzqtfouNJu7MDeJobP+iRM4bPiJpM=
git.blauwelle.com/go/crate/runtimehelper v0.2.0/go.mod h1:yVMA0GkO9AS7iuPmalHKeWyv9en0JWj25rY1vpTuHhk=

View File

@@ -21,11 +21,12 @@ func main() {
} }
``` ```
`log` 模块包含日志处理的代码, 由 3 个包组成: `log` 模块包含日志处理的代码, 由 4 个包组成:
1. [logsdk](./logsdk): 日志实现; 1. [logsdk](./logsdk): 日志实现;
2. [logjson](./logsdk/logjson): 控制台 JSON 日志处理器(`Processor`); 2. [logjson](./logsdk/logjson): 控制台 JSON 日志处理器(`Processor`);
3. [log](.): 根目录, 提供全局 `Logger`, 把 `Logger` / `Entry` 相关的方法封装成函数. 3. [logtext](./logsdk/logtext): 控制台文本日志处理器(`Processor`);
4. [log](.): 根目录, 提供全局 `Logger`, 把 `Logger` / `Entry` 相关的方法封装成函数.
## 基本概念 ## 基本概念
@@ -88,7 +89,7 @@ log.Logger()
关闭强制生成调用栈, `log.Logger().SetReportStackLevel(logsdk.LevelDisabled)`; 关闭强制生成调用栈, `log.Logger().SetReportStackLevel(logsdk.LevelDisabled)`;
记录 panic, 在 `defer v = recover()` 后执行 `log.Z(context.Background(), v)`, 其中 `Z` 需要是附加调用栈的等级; 记录 panic, 在 `defer v = recover()` 后执行 `log.Z(context.Background(), v)`, 其中 `Z` 需要是附加调用栈的等级(例如: `Error`);
mock, 实现 mock 日志处理器对生成的日志进行处理; mock, 实现 mock 日志处理器对生成的日志进行处理;
@@ -96,5 +97,6 @@ mock, 实现 mock 日志处理器对生成的日志进行处理;
## 日志处理器 ## 日志处理器
- [logsdk/logjson](logsdk/logjson) 控制台 JSON 日志 - [logsdk/logjson](logsdk/logjson) 控制台 JSON 日志.
- [go get git.blauwelle.com/go/crate/logotel](../logotel) OpenTelemetry 日志 - [logtext](./logsdk/logtext): 控制台文本日志.
- [go get git.blauwelle.com/go/crate/logotel](../logotel) OpenTelemetry 日志.

View File

@@ -2,7 +2,6 @@ package logjson
import ( import (
"io" "io"
"os"
"time" "time"
) )
@@ -60,10 +59,10 @@ func newConfig(opts ...Option) *config {
opt.apply(cfg) opt.apply(cfg)
} }
if !cfg.hasPool { if !cfg.hasPool {
cfg.bytesBufferPool = NewBytesBufferPool(bytesBufferInitialSize, bytesBufferMaximumSize) cfg.bytesBufferPool = DefaultBytesBufferPool
} }
if cfg.output == nil { if cfg.output == nil {
cfg.output = NewSyncWriter(os.Stderr) cfg.output = DefaultStderrSyncWriter
} }
if cfg.timestampFormat == "" { if cfg.timestampFormat == "" {
cfg.timestampFormat = time.RFC3339Nano cfg.timestampFormat = time.RFC3339Nano

View File

@@ -10,6 +10,8 @@ const (
bytesBufferMaximumSize = 4096 bytesBufferMaximumSize = 4096
) )
var DefaultBytesBufferPool = NewBytesBufferPool(bytesBufferInitialSize, bytesBufferMaximumSize)
type BytesBufferPool interface { type BytesBufferPool interface {
Get() *bytes.Buffer Get() *bytes.Buffer
Put(buffer *bytes.Buffer) Put(buffer *bytes.Buffer)

View File

@@ -5,7 +5,6 @@ import (
"encoding/json" "encoding/json"
"fmt" "fmt"
"io" "io"
"os"
"git.blauwelle.com/go/crate/log/logsdk" "git.blauwelle.com/go/crate/log/logsdk"
) )
@@ -69,11 +68,11 @@ func (processor *Processor) Process(_ context.Context, entry logsdk.ReadonlyEntr
// Encode 2次分配 // Encode 2次分配
if err := encoder.Encode(m); err != nil { if err := encoder.Encode(m); err != nil {
_, _ = fmt.Fprintf(os.Stderr, "JSON processor cannot encode log %#v: %s\n", m, err.Error()) _, _ = fmt.Fprintf(processor.output, "JSON processor cannot encode log %#v: %s\n", m, err.Error())
} }
if _, err := buf.WriteTo(processor.output); err != nil { if _, err := buf.WriteTo(processor.output); err != nil {
_, _ = fmt.Fprintf(os.Stderr, "JSON processor cannot write log: %s\n", err.Error()) _, _ = fmt.Fprintf(processor.output, "JSON processor cannot write log: %s\n", err.Error())
} }
} }

View File

@@ -2,9 +2,14 @@ package logjson
import ( import (
"io" "io"
"os"
"sync" "sync"
) )
var (
DefaultStderrSyncWriter = NewSyncWriter(os.Stderr)
)
// NewSyncWriter 返回写互斥的 io.Writer // NewSyncWriter 返回写互斥的 io.Writer
func NewSyncWriter(writer io.Writer) io.Writer { func NewSyncWriter(writer io.Writer) io.Writer {
return &syncWriter{ return &syncWriter{

View File

@@ -0,0 +1,69 @@
package logtext
import (
"io"
"time"
"git.blauwelle.com/go/crate/log/logsdk/logjson"
)
type Option interface {
apply(cfg *config)
}
func WithBufferPool(pool logjson.BytesBufferPool) Option {
return optionFunc(func(cfg *config) {
cfg.bytesBufferPool = pool
})
}
func WithOutput(w io.Writer) Option {
return optionFunc(func(cfg *config) {
cfg.output = w
})
}
func WithTimeFormat(format string) Option {
return optionFunc(func(cfg *config) {
cfg.timeFormat = format
cfg.timePadding = len(format)
})
}
func WithDisableTime(disable bool) Option {
return optionFunc(func(cfg *config) {
cfg.disableTime = disable
})
}
type config struct {
bytesBufferPool logjson.BytesBufferPool
output io.Writer
timeFormat string
timePadding int
disableTime bool
}
func newConfig(opts ...Option) *config {
cfg := new(config)
for _, opt := range opts {
opt.apply(cfg)
}
if cfg.bytesBufferPool == nil {
cfg.bytesBufferPool = logjson.DefaultBytesBufferPool
}
if cfg.output == nil {
cfg.output = logjson.DefaultStderrSyncWriter
}
if cfg.timeFormat == "" {
cfg.timeFormat = time.RFC3339Nano
cfg.timePadding = len(cfg.timeFormat)
}
return cfg
}
type optionFunc func(cfg *config)
func (fn optionFunc) apply(cfg *config) {
fn(cfg)
}

View File

@@ -0,0 +1,146 @@
package logtext
import (
"bytes"
"context"
"fmt"
"io"
"strconv"
"git.blauwelle.com/go/crate/log/logsdk"
"git.blauwelle.com/go/crate/log/logsdk/logjson"
)
var _ logsdk.EntryProcessor = &Processor{}
func New(opts ...Option) *Processor {
cfg := newConfig(opts...)
return &Processor{
bytesBufferPool: cfg.bytesBufferPool,
output: cfg.output,
timeFormat: cfg.timeFormat,
timePadding: cfg.timePadding,
disableTime: cfg.disableTime,
}
}
type Processor struct {
bytesBufferPool logjson.BytesBufferPool
output io.Writer
timeFormat string
timePadding int
disableTime bool
}
func (processor *Processor) Process(_ context.Context, entry logsdk.ReadonlyEntry) {
buf := processor.bytesBufferPool.Get()
buf.Reset()
defer processor.bytesBufferPool.Put(buf)
buf.Write(formatLevel(entry.Level))
if !processor.disableTime {
buf.WriteByte(' ')
timeValue := entry.Time.Format(processor.timeFormat)
buf.WriteString(timeValue)
writeSpace(buf, processor.timePadding-len(timeValue))
}
for _, field := range entry.Fields {
buf.WriteByte(' ')
buf.WriteString(field.Key)
buf.WriteByte('=')
writeValue(buf, field.Value)
}
if entry.Message != "" {
buf.WriteByte(' ')
buf.WriteString(entry.Message)
}
buf.WriteByte('\n')
if entry.Caller.IsValid() && len(entry.Stack) == 0 {
writeFrame(buf, entry.Caller)
}
if len(entry.Stack) > 0 {
for _, frame := range entry.Stack {
writeFrame(buf, frame)
}
}
if _, err := buf.WriteTo(processor.output); err != nil {
_, _ = fmt.Fprintf(processor.output, "TEXT processor cannot write log: %s\n", err.Error())
}
}
func writeSpace(buf *bytes.Buffer, n int) {
for i := 0; i < n; i++ {
buf.WriteByte(' ')
}
}
func writeFrame(buf *bytes.Buffer, frame logsdk.Frame) {
buf.WriteByte('|')
buf.WriteByte(' ')
buf.WriteString(frame.Function)
buf.WriteByte('\n')
buf.WriteByte('|')
buf.WriteByte(' ')
buf.WriteByte('\t')
buf.WriteString(frame.File)
buf.WriteByte(':')
buf.WriteString(strconv.Itoa(frame.Line))
buf.WriteByte('\n')
}
func writeValue(buf *bytes.Buffer, value any) {
switch value := value.(type) {
case nil:
buf.WriteString("<nil>")
case string:
buf.WriteString(value)
case error:
buf.WriteString(value.Error())
case fmt.Stringer:
buf.WriteString(value.String())
default:
_, _ = fmt.Fprintf(buf, "%v", value)
}
}
func formatLevel(level logsdk.Level) []byte {
switch level {
case logsdk.LevelPanic:
return []byte(LevelPanicValue)
case logsdk.LevelFatal:
return []byte(LevelFatalValue)
case logsdk.LevelError:
return []byte(LevelErrorValue)
case logsdk.LevelWarn:
return []byte(LevelWarnValue)
case logsdk.LevelInfo:
return []byte(LevelInfoValue)
case logsdk.LevelDebug:
return []byte(LevelDebugValue)
case logsdk.LevelTrace:
return []byte(LevelTraceValue)
case logsdk.LevelDisabled:
fallthrough
default:
return []byte(levelUnknownValue)
}
}
const (
LevelPanicValue = "PNC"
LevelFatalValue = "FTL"
LevelErrorValue = "ERR"
LevelWarnValue = "WRN"
LevelInfoValue = "INF"
LevelDebugValue = "DBG"
LevelTraceValue = "TRC"
levelUnknownValue = "UNK"
)

View File

@@ -3,7 +3,7 @@ module git.blauwelle.com/go/crate/logotel
go 1.20 go 1.20
require ( require (
git.blauwelle.com/go/crate/log v0.14.0 git.blauwelle.com/go/crate/log v0.15.0
go.opentelemetry.io/otel v1.21.0 go.opentelemetry.io/otel v1.21.0
go.opentelemetry.io/otel/trace v1.21.0 go.opentelemetry.io/otel/trace v1.21.0
) )

View File

@@ -1,28 +1,18 @@
git.blauwelle.com/go/crate/log v0.9.0 h1:H01AQIKcYybeCZGdReBzMoWhkXPQJAoY1t+K0J1asEk= git.blauwelle.com/go/crate/log v0.15.0 h1:nOPCB5a2F9fCvhiSkymxQRO639hoaOlDU95aMSPWf80=
git.blauwelle.com/go/crate/log v0.9.0/go.mod h1:jfVfpRODZTA70A8IkApVeGsS1zfLk1D77sLWZM/w+L0= git.blauwelle.com/go/crate/log v0.15.0/go.mod h1:jfVfpRODZTA70A8IkApVeGsS1zfLk1D77sLWZM/w+L0=
git.blauwelle.com/go/crate/log v0.14.0 h1:y7hJXP+ZPY/wD+wlEzKgakpki8/l0LwZWqxtJ92Wy58=
git.blauwelle.com/go/crate/log v0.14.0/go.mod h1:jfVfpRODZTA70A8IkApVeGsS1zfLk1D77sLWZM/w+L0=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A=
github.com/go-logr/logr v1.2.3 h1:2DntVwHkVopvECVRSlL5PSo9eG+cAkDCuckLubN+rq0=
github.com/go-logr/logr v1.2.3/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A=
github.com/go-logr/logr v1.3.0 h1:2y3SDp0ZXuc6/cjLSZ+Q3ir+QB9T/iG5yYRXqsagWSY=
github.com/go-logr/logr v1.3.0/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY=
github.com/go-logr/logr v1.4.1 h1:pKouT5E8xu9zeFC39JXRDukb6JFQPXM5p5I91188VAQ= github.com/go-logr/logr v1.4.1 h1:pKouT5E8xu9zeFC39JXRDukb6JFQPXM5p5I91188VAQ=
github.com/go-logr/logr v1.4.1/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= github.com/go-logr/logr v1.4.1/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY=
github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag= github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag=
github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE= github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE=
github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38= github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk= github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk=
go.opentelemetry.io/otel v1.13.0 h1:1ZAKnNQKwBBxFtww/GwxNUyTf0AxkZzrukO8MeXqe4Y=
go.opentelemetry.io/otel v1.13.0/go.mod h1:FH3RtdZCzRkJYFTCsAKDy9l/XYjMdNv6QrkFFB8DvVg=
go.opentelemetry.io/otel v1.21.0 h1:hzLeKBZEL7Okw2mGzZ0cc4k/A7Fta0uoPgaJCr8fsFc= go.opentelemetry.io/otel v1.21.0 h1:hzLeKBZEL7Okw2mGzZ0cc4k/A7Fta0uoPgaJCr8fsFc=
go.opentelemetry.io/otel v1.21.0/go.mod h1:QZzNPQPm1zLX4gZK4cMi+71eaorMSGT3A4znnUvNNEo= go.opentelemetry.io/otel v1.21.0/go.mod h1:QZzNPQPm1zLX4gZK4cMi+71eaorMSGT3A4znnUvNNEo=
go.opentelemetry.io/otel/metric v1.21.0 h1:tlYWfeo+Bocx5kLEloTjbcDwBuELRrIFxwdQ36PlJu4= go.opentelemetry.io/otel/metric v1.21.0 h1:tlYWfeo+Bocx5kLEloTjbcDwBuELRrIFxwdQ36PlJu4=
go.opentelemetry.io/otel/metric v1.21.0/go.mod h1:o1p3CA8nNHW8j5yuQLdc1eeqEaPfzug24uvsyIEJRWM= go.opentelemetry.io/otel/metric v1.21.0/go.mod h1:o1p3CA8nNHW8j5yuQLdc1eeqEaPfzug24uvsyIEJRWM=
go.opentelemetry.io/otel/trace v1.13.0 h1:CBgRZ6ntv+Amuj1jDsMhZtlAPT6gbyIRdaIzFhfBSdY=
go.opentelemetry.io/otel/trace v1.13.0/go.mod h1:muCvmmO9KKpvuXSf3KKAXXB2ygNYHQ+ZfI5X08d3tds=
go.opentelemetry.io/otel/trace v1.21.0 h1:WD9i5gzvoUPuXIXH24ZNBudiarZDKuekPqi/E8fpfLc= go.opentelemetry.io/otel/trace v1.21.0 h1:WD9i5gzvoUPuXIXH24ZNBudiarZDKuekPqi/E8fpfLc=
go.opentelemetry.io/otel/trace v1.21.0/go.mod h1:LGbsEB0f9LGjN+OZaQQ26sohbOmiMR+BaslueVtS/qQ= go.opentelemetry.io/otel/trace v1.21.0/go.mod h1:LGbsEB0f9LGjN+OZaQQ26sohbOmiMR+BaslueVtS/qQ=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=

View File

@@ -4,18 +4,13 @@ import (
"git.blauwelle.com/go/crate/log/logsdk/logjson" "git.blauwelle.com/go/crate/log/logsdk/logjson"
) )
const (
bytesBufferInitialSize = 512
bytesBufferMaximumSize = 4096
)
func newConfig(opts ...Option) *config { func newConfig(opts ...Option) *config {
cfg := defaultConfig() cfg := defaultConfig()
for _, opt := range opts { for _, opt := range opts {
opt.apply(cfg) opt.apply(cfg)
} }
if !cfg.hasPool { if !cfg.hasPool {
cfg.bytesBufferPool = logjson.NewBytesBufferPool(bytesBufferInitialSize, bytesBufferMaximumSize) cfg.bytesBufferPool = logjson.DefaultBytesBufferPool
} }
return cfg return cfg
} }

View File

@@ -16,6 +16,14 @@ import (
"go.opentelemetry.io/otel/trace" "go.opentelemetry.io/otel/trace"
) )
const (
fieldPrefix = "log.field."
)
var (
tracer = otel.Tracer(tracerName, trace.WithInstrumentationVersion("semver:"+version))
)
// New 创建 log/opentelemetry 处理器 // New 创建 log/opentelemetry 处理器
func New(opts ...Option) *Processor { func New(opts ...Option) *Processor {
cfg := newConfig(opts...) cfg := newConfig(opts...)
@@ -41,7 +49,7 @@ func (processor *Processor) Process(ctx context.Context, entry logsdk.ReadonlyEn
if name == "" { if name == "" {
name = "default" name = "default"
} }
ctx, span = otel.Tracer("git.blauwelle.com/go/crate/logotel").Start(ctx, name) //nolint:ineffassign,staticcheck,wastedassign ctx, span = tracer.Start(ctx, name) //nolint:ineffassign,staticcheck,wastedassign
defer span.End() defer span.End()
} }
} }
@@ -70,7 +78,7 @@ func (processor *Processor) Process(ctx context.Context, entry logsdk.ReadonlyEn
buf.WriteByte('\n') buf.WriteByte('\n')
} }
processor.bufferPool.Put(buf) processor.bufferPool.Put(buf)
attrs = append(attrs, attribute.String("zz.stack", buf.String())) attrs = append(attrs, attribute.String("log.stack", buf.String()))
} }
for _, field := range entry.Fields { for _, field := range entry.Fields {
attrs = append(attrs, fieldToKV(field)) attrs = append(attrs, fieldToKV(field))
@@ -84,22 +92,22 @@ func (processor *Processor) Process(ctx context.Context, entry logsdk.ReadonlyEn
func fieldToKV(field logsdk.KV) attribute.KeyValue { func fieldToKV(field logsdk.KV) attribute.KeyValue {
switch value := field.Value.(type) { switch value := field.Value.(type) {
case nil: case nil:
return attribute.String(field.Key, "<nil>") return attribute.String(fieldPrefix+field.Key, "<nil>")
case string: case string:
return attribute.String(field.Key, value) return attribute.String(fieldPrefix+field.Key, value)
case int: case int:
return attribute.Int(field.Key, value) return attribute.Int(fieldPrefix+field.Key, value)
case int64: case int64:
return attribute.Int64(field.Key, value) return attribute.Int64(fieldPrefix+field.Key, value)
case float64: case float64:
return attribute.Float64(field.Key, value) return attribute.Float64(fieldPrefix+field.Key, value)
case bool: case bool:
return attribute.Bool(field.Key, value) return attribute.Bool(fieldPrefix+field.Key, value)
case error: case error:
return attribute.String(field.Key, value.Error()) return attribute.String(fieldPrefix+field.Key, value.Error())
case fmt.Stringer: case fmt.Stringer:
return attribute.String(field.Key, value.String()) return attribute.String(fieldPrefix+field.Key, value.String())
} }
return attribute.String(field.Key, fmt.Sprint(field.Value)) return attribute.String(fieldPrefix+field.Key, fmt.Sprint(field.Value))
} }

6
logotel/version.go Normal file
View File

@@ -0,0 +1,6 @@
package logotel
const (
tracerName = "git.blauwelle.com/go/crate/logotel"
version = "0.15.0"
)

View File

@@ -2,6 +2,7 @@ package uptracehelper
import ( import (
"context" "context"
"time"
"github.com/uptrace/uptrace-go/uptrace" "github.com/uptrace/uptrace-go/uptrace"
) )
@@ -10,6 +11,7 @@ type Config struct {
ServiceName string ServiceName string
ServiceVersion string ServiceVersion string
DeploymentEnvironment string DeploymentEnvironment string
ShutdownTimeout time.Duration
} }
func Setup(cfg Config) { func Setup(cfg Config) {
@@ -23,19 +25,21 @@ func Setup(cfg Config) {
uptrace.ConfigureOpentelemetry(opts...) uptrace.ConfigureOpentelemetry(opts...)
} }
// GoStop 初始化并等待 uptrace // Bootstrap 初始化并等待 uptrace
// 配合 [git.blauwelle.com/go/crate/exegroup] 使用 // 配合 [git.blauwelle.com/go/crate/exegroup] 使用
// env // env
// - UPTRACE_DISABLED 存在就不再初始化 // - UPTRACE_DISABLED 存在就不再初始化
// - UPTRACE_DSN 服务端地址 // - UPTRACE_DSN 服务端地址
func GoStop(cfg Config) (func(ctx context.Context) error, func(ctx context.Context)) { func Bootstrap(cfg Config) func(ctx context.Context) error {
Setup(cfg) Setup(cfg)
shutdownErr := make(chan error, 1) return func(ctx context.Context) error {
goFunc := func(context.Context) error { <-ctx.Done()
return <-shutdownErr shutdownCtx := context.Background()
if cfg.ShutdownTimeout >= 0 {
var cancel func()
shutdownCtx, cancel = context.WithTimeout(shutdownCtx, cfg.ShutdownTimeout)
defer cancel()
}
return uptrace.Shutdown(shutdownCtx)
} }
stopFunc := func(ctx context.Context) {
shutdownErr <- uptrace.Shutdown(ctx)
}
return goFunc, stopFunc
} }