Compare commits
10 Commits
bunroutero
...
httpdata/v
| Author | SHA1 | Date | |
|---|---|---|---|
| 8671f2826d | |||
| 5cf4430bc8 | |||
| a844e9e34c | |||
| 1100fbb1d4 | |||
| 8d24ba105d | |||
| bef1a8cdc0 | |||
| 5a5fe14ea5 | |||
| 7a6bbdc082 | |||
| b1698ca966 | |||
| 3f9d189806 |
@@ -3,13 +3,13 @@ module git.blauwelle.com/go/crate/bunrouterotel
|
||||
go 1.21.1
|
||||
|
||||
require (
|
||||
github.com/uptrace/bunrouter v1.0.20
|
||||
go.opentelemetry.io/otel v1.19.0
|
||||
go.opentelemetry.io/otel/trace v1.19.0
|
||||
github.com/uptrace/bunrouter v1.0.21
|
||||
go.opentelemetry.io/otel v1.21.0
|
||||
go.opentelemetry.io/otel/trace v1.21.0
|
||||
)
|
||||
|
||||
require (
|
||||
github.com/go-logr/logr v1.2.4 // indirect
|
||||
github.com/go-logr/logr v1.4.1 // indirect
|
||||
github.com/go-logr/stdr v1.2.2 // indirect
|
||||
go.opentelemetry.io/otel/metric v1.19.0 // indirect
|
||||
go.opentelemetry.io/otel/metric v1.21.0 // indirect
|
||||
)
|
||||
|
||||
@@ -1,23 +1,23 @@
|
||||
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
|
||||
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
|
||||
github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A=
|
||||
github.com/go-logr/logr v1.2.4 h1:g01GSCwiDw2xSZfjJ2/T9M+S6pFdcNtFYsp+Y43HYDQ=
|
||||
github.com/go-logr/logr v1.2.4/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A=
|
||||
github.com/go-logr/logr v1.4.1 h1:pKouT5E8xu9zeFC39JXRDukb6JFQPXM5p5I91188VAQ=
|
||||
github.com/go-logr/logr v1.4.1/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY=
|
||||
github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag=
|
||||
github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE=
|
||||
github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38=
|
||||
github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
|
||||
github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
|
||||
github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
|
||||
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
|
||||
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
|
||||
github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk=
|
||||
github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo=
|
||||
github.com/uptrace/bunrouter v1.0.20 h1:jNvYNcJxF+lSYBQAaQjnE6I11Zs0m+3M5Ek7fq/Tp4c=
|
||||
github.com/uptrace/bunrouter v1.0.20/go.mod h1:TwT7Bc0ztF2Z2q/ZzMuSVkcb/Ig/d3MQeP2cxn3e1hI=
|
||||
go.opentelemetry.io/otel v1.19.0 h1:MuS/TNf4/j4IXsZuJegVzI1cwut7Qc00344rgH7p8bs=
|
||||
go.opentelemetry.io/otel v1.19.0/go.mod h1:i0QyjOq3UPoTzff0PJB2N66fb4S0+rSbSB15/oyH9fY=
|
||||
go.opentelemetry.io/otel/metric v1.19.0 h1:aTzpGtV0ar9wlV4Sna9sdJyII5jTVJEvKETPiOKwvpE=
|
||||
go.opentelemetry.io/otel/metric v1.19.0/go.mod h1:L5rUsV9kM1IxCj1MmSdS+JQAcVm319EUrDVLrt7jqt8=
|
||||
go.opentelemetry.io/otel/trace v1.19.0 h1:DFVQmlVbfVeOuBRrwdtaehRrWiL1JoVs9CPIQ1Dzxpg=
|
||||
go.opentelemetry.io/otel/trace v1.19.0/go.mod h1:mfaSyvGyEJEI0nyV2I4qhNQnbBOUUmYZpYojqMnX2vo=
|
||||
github.com/uptrace/bunrouter v1.0.21 h1:HXarvX+N834sXyHpl+I/TuE11m19kLW/qG5u3YpHUag=
|
||||
github.com/uptrace/bunrouter v1.0.21/go.mod h1:TwT7Bc0ztF2Z2q/ZzMuSVkcb/Ig/d3MQeP2cxn3e1hI=
|
||||
go.opentelemetry.io/otel v1.21.0 h1:hzLeKBZEL7Okw2mGzZ0cc4k/A7Fta0uoPgaJCr8fsFc=
|
||||
go.opentelemetry.io/otel v1.21.0/go.mod h1:QZzNPQPm1zLX4gZK4cMi+71eaorMSGT3A4znnUvNNEo=
|
||||
go.opentelemetry.io/otel/metric v1.21.0 h1:tlYWfeo+Bocx5kLEloTjbcDwBuELRrIFxwdQ36PlJu4=
|
||||
go.opentelemetry.io/otel/metric v1.21.0/go.mod h1:o1p3CA8nNHW8j5yuQLdc1eeqEaPfzug24uvsyIEJRWM=
|
||||
go.opentelemetry.io/otel/trace v1.21.0 h1:WD9i5gzvoUPuXIXH24ZNBudiarZDKuekPqi/E8fpfLc=
|
||||
go.opentelemetry.io/otel/trace v1.21.0/go.mod h1:LGbsEB0f9LGjN+OZaQQ26sohbOmiMR+BaslueVtS/qQ=
|
||||
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
|
||||
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
|
||||
|
||||
@@ -2,6 +2,7 @@ package bunrouterotel
|
||||
|
||||
import (
|
||||
"net/http"
|
||||
"strings"
|
||||
|
||||
"github.com/uptrace/bunrouter"
|
||||
"go.opentelemetry.io/otel"
|
||||
@@ -13,9 +14,11 @@ import (
|
||||
|
||||
const (
|
||||
tracerName = "git.blauwelle.com/go/crate/bunrouterotel"
|
||||
version = "0.2.0"
|
||||
version = "0.5.0"
|
||||
)
|
||||
|
||||
var tracer = otel.Tracer(tracerName, trace.WithInstrumentationVersion("semver:"+version))
|
||||
|
||||
type config struct {
|
||||
propagators propagation.TextMapPropagator
|
||||
}
|
||||
@@ -36,6 +39,15 @@ func WithPropagators(propagators propagation.TextMapPropagator) Option {
|
||||
})
|
||||
}
|
||||
|
||||
func getForwardedFor(r *http.Request) []string {
|
||||
h := r.Header.Get("X-Forwarded-For")
|
||||
a := strings.Split(h, ",")
|
||||
for i, s := range a {
|
||||
a[i] = strings.Trim(s, " \t")
|
||||
}
|
||||
return a
|
||||
}
|
||||
|
||||
// Middleware create a span, which record the request,
|
||||
// HTTP status code is NOT recorded.
|
||||
func Middleware(serverName string, opts ...Option) bunrouter.MiddlewareFunc {
|
||||
@@ -46,7 +58,6 @@ func Middleware(serverName string, opts ...Option) bunrouter.MiddlewareFunc {
|
||||
if cfg.propagators == nil {
|
||||
cfg.propagators = otel.GetTextMapPropagator()
|
||||
}
|
||||
tracer := otel.Tracer(tracerName, trace.WithInstrumentationVersion("semver:"+version))
|
||||
return func(next bunrouter.HandlerFunc) bunrouter.HandlerFunc {
|
||||
return func(w http.ResponseWriter, req bunrouter.Request) error {
|
||||
ctx := cfg.propagators.Extract(req.Context(), propagation.HeaderCarrier(req.Header))
|
||||
@@ -75,6 +86,10 @@ func Middleware(serverName string, opts ...Option) bunrouter.MiddlewareFunc {
|
||||
attrs = append(attrs, attribute.String("http.route.param."+param.Key, param.Value))
|
||||
}
|
||||
span.SetAttributes(attrs...)
|
||||
forwardedFor := getForwardedFor(req.Request)
|
||||
if forwardedFor != nil {
|
||||
span.SetAttributes(attribute.StringSlice("http.forward_route", forwardedFor))
|
||||
}
|
||||
defer span.End()
|
||||
return next(w, req)
|
||||
}
|
||||
|
||||
9
exegroup/README.md
Normal file
9
exegroup/README.md
Normal file
@@ -0,0 +1,9 @@
|
||||
# exegroup 按照顺序启动和终止任务
|
||||
|
||||
参考[group.go](./group.go)中的使用说明;
|
||||
|
||||
### `Group` / `Actor` / `Actor.startFunc` 超时时间的约定
|
||||
|
||||
- `>= 0` 的超时时间被用来初始化定时器, 到时间时触发超时;
|
||||
- `< 0` 的超时时间表示永不超时;
|
||||
- 所有超时时间初始化为负数(-1);
|
||||
@@ -1,14 +1,93 @@
|
||||
package exegroup
|
||||
|
||||
import "context"
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"git.blauwelle.com/go/crate/log"
|
||||
)
|
||||
|
||||
// Actor 是 [Group] 调度的执行单元;
|
||||
//
|
||||
// Actor.stopFunc 可以是 nil, 这时 Actor 需要受 goFunc 的 ctx 控制退出;
|
||||
// startCtx, startCancel 需要在构造 Actor 时初始化, 在后续 start / wait 初始化会导致 data race.
|
||||
type Actor struct {
|
||||
goFunc func(ctx context.Context) error
|
||||
stopFunc func(ctx context.Context)
|
||||
name string
|
||||
startFunc func(ctx context.Context) error
|
||||
group *Group
|
||||
closeAfterStop chan struct{}
|
||||
closeAfterTimeout chan struct{}
|
||||
startCtx context.Context //nolint:containedctx
|
||||
startCancel func()
|
||||
name string
|
||||
stopTimeout time.Duration // <0表示没有超时时间
|
||||
isFastStop bool
|
||||
}
|
||||
|
||||
func (actor *Actor) wrapFastStop() {
|
||||
if !actor.isFastStop {
|
||||
return
|
||||
}
|
||||
startFunc := actor.startFunc
|
||||
actor.startFunc = func(ctx context.Context) error {
|
||||
errChan := make(chan error, 1)
|
||||
go func() {
|
||||
var err error
|
||||
defer func() {
|
||||
if v := recover(); v != nil {
|
||||
err = fmt.Errorf("recover: %s", v)
|
||||
}
|
||||
errChan <- err
|
||||
}()
|
||||
err = startFunc(ctx)
|
||||
}()
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
case err := <-errChan:
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// start 同步启动 [Actor], [Actor].startFunc panic 或返回的 error 会被写入 [Actor].errorChan.
|
||||
func (actor *Actor) start() {
|
||||
actor.wrapFastStop()
|
||||
var err error
|
||||
defer func() {
|
||||
if v := recover(); v != nil {
|
||||
err = fmt.Errorf("recover: %s", v)
|
||||
}
|
||||
actor.group.errorsChan <- actorError{
|
||||
actor: actor,
|
||||
err: err,
|
||||
}
|
||||
log.Tracef(context.Background(), "group-actor %s return", actor.name)
|
||||
close(actor.closeAfterStop)
|
||||
}()
|
||||
err = actor.startFunc(actor.startCtx)
|
||||
}
|
||||
|
||||
// sendStopSignal 通知 [Actor] 退出.
|
||||
func (actor *Actor) sendStopSignal() {
|
||||
actor.startCancel()
|
||||
if actor.stopTimeout >= 0 {
|
||||
go func() {
|
||||
timer := time.NewTimer(actor.stopTimeout)
|
||||
select {
|
||||
case <-actor.closeAfterStop:
|
||||
timer.Stop()
|
||||
case <-timer.C:
|
||||
close(actor.closeAfterTimeout)
|
||||
}
|
||||
}()
|
||||
}
|
||||
}
|
||||
|
||||
// wait 等待 [Actor] 停止或超时.
|
||||
func (actor *Actor) wait() {
|
||||
select {
|
||||
case <-actor.closeAfterTimeout:
|
||||
case <-actor.closeAfterStop:
|
||||
}
|
||||
}
|
||||
|
||||
// WithName 指定 [Actor] 的 name;
|
||||
@@ -17,21 +96,21 @@ func (actor *Actor) WithName(name string) *Actor {
|
||||
return actor
|
||||
}
|
||||
|
||||
// WithGo 指定 [Actor] 的 goFunc;
|
||||
// 通过 WithGo 注册的 goFunc 函数应该受 ctx 控制退出;
|
||||
func (actor *Actor) WithGo(goFunc func(ctx context.Context) error) *Actor {
|
||||
actor.goFunc = goFunc
|
||||
// WithStartFunc 指定 [Actor] 的 startFunc;
|
||||
// 通过 WithStartFunc 注册的 startFunc 函数应该受 ctx 控制退出;
|
||||
func (actor *Actor) WithStartFunc(startFunc func(ctx context.Context) error) *Actor {
|
||||
actor.startFunc = startFunc
|
||||
return actor
|
||||
}
|
||||
|
||||
// WithGoStop 指定 [Actor] 的 goFunc 和 stopFunc;
|
||||
// goFunc 不受 ctx 控制退出, 而是在 stopFunc 调用后退出;
|
||||
// 1. goFunc 被 [Group] 在 goroutine 中启动;
|
||||
// 2. stopFunc 在被 [Group] 启动的任意 goFunc 返回后被调用;
|
||||
//
|
||||
// 使用 stopFunc 可以在 stopFunc 的 ctx 中指定等待期限, 让 goFunc 延迟到等待期限强制退出;
|
||||
func (actor *Actor) WithGoStop(goFunc func(ctx context.Context) error, stopFunc func(ctx context.Context)) *Actor {
|
||||
actor.goFunc = goFunc
|
||||
actor.stopFunc = stopFunc
|
||||
// WithFastStop 设置 [Actor] 接收停止信号后立即终止.
|
||||
func (actor *Actor) WithFastStop(fastStop bool) *Actor {
|
||||
actor.isFastStop = fastStop
|
||||
return actor
|
||||
}
|
||||
|
||||
// WithStopTimeout 指定停止 [Actor] 的最长等待时间, 设置 <0 没有最长等待时间.
|
||||
func (actor *Actor) WithStopTimeout(duration time.Duration) *Actor {
|
||||
actor.stopTimeout = duration
|
||||
return actor
|
||||
}
|
||||
|
||||
@@ -2,11 +2,13 @@ package eghttp
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"net"
|
||||
"net/http"
|
||||
"strconv"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"git.blauwelle.com/go/crate/log"
|
||||
)
|
||||
|
||||
const (
|
||||
@@ -30,6 +32,7 @@ func WithHandler(handler http.Handler) Option {
|
||||
})
|
||||
}
|
||||
|
||||
// WithServer 指定要启动的服务器. 注意会替换原有的服务器, 导致早前配置服务器相关的 [Option] 失效.
|
||||
func WithServer(server *http.Server) Option {
|
||||
return optionFunc(func(cfg *config) {
|
||||
cfg.server = server
|
||||
@@ -49,9 +52,17 @@ func WithServerOption(fn func(server *http.Server)) Option {
|
||||
})
|
||||
}
|
||||
|
||||
// WithShutdownTimeout 设置关闭服务的超时时间. <0 没有最长等待时间.
|
||||
func WithShutdownTimeout(timeout time.Duration) Option {
|
||||
return optionFunc(func(cfg *config) {
|
||||
cfg.shutdownTimeout = timeout
|
||||
})
|
||||
}
|
||||
|
||||
type config struct {
|
||||
server *http.Server
|
||||
startFn func(server *http.Server) error
|
||||
server *http.Server
|
||||
startFn func(server *http.Server) error
|
||||
shutdownTimeout time.Duration
|
||||
}
|
||||
|
||||
func newDefaultConfig() *config {
|
||||
@@ -63,6 +74,7 @@ func newDefaultConfig() *config {
|
||||
startFn: func(server *http.Server) error {
|
||||
return server.ListenAndServe()
|
||||
},
|
||||
shutdownTimeout: -1,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -72,24 +84,31 @@ func (fn optionFunc) apply(cfg *config) {
|
||||
fn(cfg)
|
||||
}
|
||||
|
||||
// HTTPListenAndServe 创建 [http.Server] 并提供启动和停止函数;
|
||||
func HTTPListenAndServe(opts ...Option) (func(ctx context.Context) error, func(ctx context.Context)) {
|
||||
// ListenAndServe 构造1个函数, 启动 [http.Server] 并受 ctx 控制终止.
|
||||
func ListenAndServe(opts ...Option) func(ctx context.Context) error {
|
||||
cfg := newDefaultConfig()
|
||||
for _, opt := range opts {
|
||||
opt.apply(cfg)
|
||||
}
|
||||
inShutdown := &atomic.Bool{}
|
||||
c := make(chan error, 1)
|
||||
goFunc := func(_ context.Context) error {
|
||||
err := cfg.startFn(cfg.server)
|
||||
if inShutdown.Load() {
|
||||
err = <-c
|
||||
}
|
||||
return err
|
||||
errChan := make(chan error, 2) //nolint:gomnd
|
||||
return func(ctx context.Context) error {
|
||||
go func() {
|
||||
err := cfg.startFn(cfg.server)
|
||||
log.Tracef(ctx, "server return with %s", err)
|
||||
if !errors.Is(err, http.ErrServerClosed) {
|
||||
errChan <- err
|
||||
}
|
||||
}()
|
||||
go func() {
|
||||
<-ctx.Done()
|
||||
shutdownCtx := context.Background()
|
||||
if cfg.shutdownTimeout >= 0 {
|
||||
var cancel func()
|
||||
shutdownCtx, cancel = context.WithTimeout(shutdownCtx, cfg.shutdownTimeout)
|
||||
defer cancel()
|
||||
}
|
||||
errChan <- cfg.server.Shutdown(shutdownCtx)
|
||||
}()
|
||||
return <-errChan
|
||||
}
|
||||
stopFunc := func(ctx context.Context) {
|
||||
inShutdown.Store(true)
|
||||
c <- cfg.server.Shutdown(ctx)
|
||||
}
|
||||
return goFunc, stopFunc
|
||||
}
|
||||
|
||||
@@ -1,3 +1,5 @@
|
||||
module git.blauwelle.com/go/crate/exegroup
|
||||
|
||||
go 1.20
|
||||
|
||||
require git.blauwelle.com/go/crate/log v0.15.0
|
||||
|
||||
4
exegroup/go.sum
Normal file
4
exegroup/go.sum
Normal file
@@ -0,0 +1,4 @@
|
||||
git.blauwelle.com/go/crate/log v0.14.0 h1:y7hJXP+ZPY/wD+wlEzKgakpki8/l0LwZWqxtJ92Wy58=
|
||||
git.blauwelle.com/go/crate/log v0.14.0/go.mod h1:jfVfpRODZTA70A8IkApVeGsS1zfLk1D77sLWZM/w+L0=
|
||||
git.blauwelle.com/go/crate/log v0.15.0 h1:nOPCB5a2F9fCvhiSkymxQRO639hoaOlDU95aMSPWf80=
|
||||
git.blauwelle.com/go/crate/log v0.15.0/go.mod h1:jfVfpRODZTA70A8IkApVeGsS1zfLk1D77sLWZM/w+L0=
|
||||
@@ -1,6 +1,6 @@
|
||||
// Package exegroup 管理1组执行单元的启动和终止;
|
||||
// Package exegroup 按照顺序启动和终止任务;
|
||||
//
|
||||
// 使用举例:
|
||||
// Examples:
|
||||
//
|
||||
// 新建 [Group]:
|
||||
//
|
||||
@@ -12,54 +12,48 @@
|
||||
//
|
||||
// 新增1个 [Actor], 通过 ctx 控制 [Actor] 退出:
|
||||
//
|
||||
// g.New().WithName("do nothing").WithGo(func(ctx context.Context) error {
|
||||
// g.New().WithName("do nothing").WithStartFunc(func(ctx context.Context) error {
|
||||
// <-ctx.Done()
|
||||
// return ctx.Err()
|
||||
// })
|
||||
//
|
||||
// 新增1个 [Actor], 通过 stopFunc 控制 [Actor] 退出:
|
||||
//
|
||||
// server := &http.Server{Addr: ":80"}
|
||||
// inShutdown := &atomic.Bool{}
|
||||
// c := make(chan error, 1)
|
||||
// goFunc := func(_ context.Context) error {
|
||||
// err := server.ListenAndServe()
|
||||
// if inShutdown.Load() {
|
||||
// err = <-c
|
||||
// }
|
||||
// return err
|
||||
// }
|
||||
// stopFunc := func(ctx context.Context) {
|
||||
// inShutdown.Store(true)
|
||||
// c <- server.Shutdown(ctx)
|
||||
// }
|
||||
// g.New().WithName("server").WithGoStop(goFunc, stopFunc)
|
||||
//
|
||||
// 启动 [Group]:
|
||||
//
|
||||
// g.Run()
|
||||
// g.Run(ctx)
|
||||
package exegroup
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"git.blauwelle.com/go/crate/log"
|
||||
)
|
||||
|
||||
// Group 管理1组 [Actor], 每个 [Actor] 在1个goroutine 中运行;
|
||||
// Group 的执行过程参考 [Group.Run];
|
||||
//
|
||||
// Group 的配置包含:
|
||||
// - [WithConcurrentStop] 并发执行 [Actor] 的终止函数, 默认不并发执行;
|
||||
// Group 的配置:
|
||||
// - [WithStopTimeout] 指定 [Group.Run] 从进入终止过程到返回的最长时间, 默认不限制最长时间;
|
||||
type Group struct {
|
||||
actors []*Actor
|
||||
cfg config
|
||||
errorsChan chan actorError
|
||||
actors []*Actor
|
||||
syncActors []*Actor
|
||||
asyncActors []*Actor
|
||||
cfg config
|
||||
}
|
||||
|
||||
type actorError struct {
|
||||
actor *Actor
|
||||
err error
|
||||
}
|
||||
|
||||
// New 创建 [Group];
|
||||
func New(opts ...Option) *Group {
|
||||
cfg := config{}
|
||||
cfg := config{
|
||||
stopTimeout: -1,
|
||||
}
|
||||
for _, opt := range opts {
|
||||
opt.apply(&cfg)
|
||||
}
|
||||
@@ -71,88 +65,139 @@ func New(opts ...Option) *Group {
|
||||
// Default 创建包含信号处理 [Actor] 的 [Group];
|
||||
func Default(opts ...Option) *Group {
|
||||
g := New(opts...)
|
||||
g.New().WithName("signal").WithGo(HandleSignal())
|
||||
g.NewAsync().WithName("signal").WithStartFunc(HandleSignal())
|
||||
return g
|
||||
}
|
||||
|
||||
// New 创建并添加 [Actor],
|
||||
// 对 Actor 的配置通过链式调用完成;
|
||||
func (g *Group) New() *Actor {
|
||||
actor := new(Actor).WithName(fmt.Sprintf("actor-%03d", len(g.actors)+1))
|
||||
func (g *Group) newActor() *Actor {
|
||||
actor := &Actor{
|
||||
group: g,
|
||||
closeAfterStop: make(chan struct{}),
|
||||
closeAfterTimeout: make(chan struct{}),
|
||||
name: fmt.Sprintf("actor-%d", len(g.actors)),
|
||||
stopTimeout: -1,
|
||||
}
|
||||
actor.startCtx, actor.startCancel = context.WithCancel(context.Background())
|
||||
g.actors = append(g.actors, actor)
|
||||
return actor
|
||||
}
|
||||
|
||||
// Run 启动所有 [Actor]s 并等待 [Actor]s 执行完成;
|
||||
// NewAsync 创建并添加异步退出的 [Actor], 对 Actor 的配置通过链式调用完成;
|
||||
func (g *Group) NewAsync() *Actor {
|
||||
actor := g.newActor()
|
||||
g.asyncActors = append(g.asyncActors, actor)
|
||||
return actor
|
||||
}
|
||||
|
||||
// NewSync 创建并添加同步退出的 [Actor], 对 Actor 的配置通过链式调用完成;
|
||||
func (g *Group) NewSync() *Actor {
|
||||
actor := g.newActor()
|
||||
g.syncActors = append(g.syncActors, actor)
|
||||
return actor
|
||||
}
|
||||
|
||||
// Run 启动所有 [Actor] 并等待 [Actor] 执行完成;
|
||||
// 当没有 Actor 时执行 Run 会 panic;
|
||||
//
|
||||
// Run 包含两个阶段:
|
||||
// 1. 启动 [Actor] 并等待, Actor 的运行函数返回错误时进入下1个阶段;
|
||||
// 2. 执行 [Actor] 的终止函数并返回;
|
||||
// 1. 启动所有 [Actor];
|
||||
// 2. 等待 ctx 或第1个Actor返回, 终止所有Actor.
|
||||
func (g *Group) Run(ctx context.Context) error {
|
||||
g.validate()
|
||||
ctx, cancel := context.WithCancel(ctx)
|
||||
c := make(chan error, len(g.actors))
|
||||
g.start(ctx, c)
|
||||
return g.wait(c, cancel)
|
||||
if err := g.validate(ctx); err != nil {
|
||||
return err
|
||||
}
|
||||
g.errorsChan = make(chan actorError, len(g.actors))
|
||||
g.start(ctx)
|
||||
|
||||
return g.stop(ctx, g.wait(ctx))
|
||||
}
|
||||
|
||||
func (g *Group) validate() {
|
||||
func (g *Group) validate(ctx context.Context) error {
|
||||
if len(g.actors) == 0 {
|
||||
panic("no actor")
|
||||
err := errors.New("no actor")
|
||||
log.Error(ctx, err.Error())
|
||||
return err
|
||||
}
|
||||
for _, actor := range g.actors {
|
||||
if actor.goFunc == nil {
|
||||
panic(actor.name + " has nil goFunc")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (g *Group) start(ctx context.Context, c chan error) {
|
||||
for _, actor := range g.actors {
|
||||
go func(actor *Actor) {
|
||||
var err error
|
||||
defer func() {
|
||||
if v := recover(); v != nil {
|
||||
err = fmt.Errorf("%v", v)
|
||||
}
|
||||
c <- err
|
||||
}()
|
||||
err = actor.goFunc(ctx)
|
||||
}(actor)
|
||||
}
|
||||
}
|
||||
|
||||
func (g *Group) wait(c chan error, cancel context.CancelFunc) error {
|
||||
err := <-c
|
||||
cancel()
|
||||
ctx := context.Background()
|
||||
if g.cfg.stopTimeout > 0 {
|
||||
ctx, cancel = context.WithTimeout(context.Background(), g.cfg.stopTimeout)
|
||||
defer cancel()
|
||||
}
|
||||
for i := len(g.actors) - 1; i >= 0; i-- {
|
||||
if g.actors[i].stopFunc != nil {
|
||||
if g.cfg.concurrentStop {
|
||||
go g.actors[i].stopFunc(ctx)
|
||||
} else {
|
||||
g.actors[i].stopFunc(ctx)
|
||||
}
|
||||
}
|
||||
}
|
||||
for i := 1; i < len(g.actors); i++ {
|
||||
select {
|
||||
case <-c:
|
||||
case <-ctx.Done():
|
||||
for i, actor := range g.actors {
|
||||
if actor.startFunc == nil {
|
||||
err := fmt.Errorf("group-actor %d: %s has nil startFunc", i, actor.name)
|
||||
log.Error(ctx, err.Error())
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// start 启动所有的 [Actor] 后返回.
|
||||
func (g *Group) start(ctx context.Context) {
|
||||
for _, actor := range g.actors {
|
||||
go actor.start()
|
||||
log.Tracef(ctx, "group is starting group-actor %s", actor.name)
|
||||
}
|
||||
}
|
||||
|
||||
// wait 等待 ctx 或 [Actor] 返回
|
||||
func (g *Group) wait(ctx context.Context) error {
|
||||
log.Tracef(ctx, "group is waiting")
|
||||
select {
|
||||
case ae := <-g.errorsChan:
|
||||
log.Tracef(ctx, "group is waiting: group-actor %s return", ae.actor.name)
|
||||
if ae.err != nil {
|
||||
return fmt.Errorf("group-actor %s return with %w", ae.actor.name, ae.err)
|
||||
}
|
||||
return nil
|
||||
case <-ctx.Done():
|
||||
log.Tracef(ctx, "group is waiting: %s", ctx.Err().Error())
|
||||
return ctx.Err()
|
||||
}
|
||||
}
|
||||
|
||||
// stop 等待 [Actor] 停止
|
||||
func (g *Group) stop(ctx context.Context, err error) error {
|
||||
stopCtx := context.Background()
|
||||
if g.cfg.stopTimeout >= 0 {
|
||||
log.Tracef(ctx, "set group stop timeout to %s", g.cfg.stopTimeout)
|
||||
var cancel func()
|
||||
stopCtx, cancel = context.WithTimeout(stopCtx, g.cfg.stopTimeout)
|
||||
defer cancel()
|
||||
}
|
||||
go func() {
|
||||
// 终止异步 [Actor]
|
||||
for _, actor := range g.asyncActors {
|
||||
log.Tracef(ctx, "group is stopping group-actor %s", actor.name)
|
||||
actor.sendStopSignal()
|
||||
}
|
||||
// 终止同步 [Actor] 并等待 [Actor] 退出
|
||||
for i := len(g.syncActors) - 1; i >= 0; i-- {
|
||||
actor := g.syncActors[i]
|
||||
log.Tracef(ctx, "group is stopping group-actor %s", actor.name)
|
||||
actor.sendStopSignal()
|
||||
actor.wait()
|
||||
}
|
||||
}()
|
||||
// 等待所有 [Actor] 退出, 超时立即退出
|
||||
for _, actor := range g.actors {
|
||||
select {
|
||||
case <-actor.closeAfterStop:
|
||||
log.Tracef(ctx, "group is stopping group-actor %s: stopped", actor.name)
|
||||
case <-actor.closeAfterTimeout:
|
||||
log.Errorf(ctx, "group is stopping group-actor %s: timeout", actor.name)
|
||||
case <-stopCtx.Done():
|
||||
if err == nil {
|
||||
err = stopCtx.Err()
|
||||
} else {
|
||||
err = fmt.Errorf("group is stopping group-actor %s: global timeout: %w", actor.name, err)
|
||||
}
|
||||
log.Error(ctx, err.Error())
|
||||
return err
|
||||
}
|
||||
}
|
||||
log.Tracef(ctx, "group return: %v", err)
|
||||
return err
|
||||
}
|
||||
|
||||
type config struct {
|
||||
concurrentStop bool
|
||||
stopTimeout time.Duration
|
||||
stopTimeout time.Duration // <0表示没有超时时间
|
||||
}
|
||||
|
||||
// Option 修改 [Group] 的配置;
|
||||
@@ -166,14 +211,8 @@ func (fn optionFunc) apply(cfg *config) {
|
||||
fn(cfg)
|
||||
}
|
||||
|
||||
// WithConcurrentStop 并发执行 [Actor] 的终止函数;
|
||||
func WithConcurrentStop() Option {
|
||||
return optionFunc(func(cfg *config) {
|
||||
cfg.concurrentStop = true
|
||||
})
|
||||
}
|
||||
|
||||
// WithStopTimeout 指定 [Group.Run] 从进入终止过程到返回的最长时间;
|
||||
// WithStopTimeout 指定 [Group.Run] 从进入终止过程到返回的最长时间.
|
||||
// <0表示没有超时时间.
|
||||
func WithStopTimeout(d time.Duration) Option {
|
||||
return optionFunc(func(cfg *config) {
|
||||
cfg.stopTimeout = d
|
||||
|
||||
@@ -9,7 +9,7 @@ import (
|
||||
|
||||
func Example_defaultGroup() {
|
||||
g := exegroup.Default()
|
||||
g.New().WithName("do nothing").WithGo(func(ctx context.Context) error {
|
||||
g.NewSync().WithName("do nothing").WithStartFunc(func(ctx context.Context) error {
|
||||
<-ctx.Done()
|
||||
return ctx.Err()
|
||||
})
|
||||
|
||||
@@ -6,6 +6,8 @@ import (
|
||||
"os"
|
||||
"os/signal"
|
||||
"syscall"
|
||||
|
||||
"git.blauwelle.com/go/crate/log"
|
||||
)
|
||||
|
||||
// HandleSignal 提供信号处理函数;
|
||||
@@ -16,10 +18,13 @@ func HandleSignal(signals ...os.Signal) func(ctx context.Context) error {
|
||||
}
|
||||
signal.Notify(c, signals...)
|
||||
return func(ctx context.Context) error {
|
||||
log.Tracef(ctx, "notify signal %v", signals)
|
||||
defer signal.Stop(c)
|
||||
select {
|
||||
case sig := <-c:
|
||||
return fmt.Errorf("signal %s", sig)
|
||||
err := fmt.Errorf("signal received: %s", sig)
|
||||
log.Trace(ctx, err.Error())
|
||||
return err
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
}
|
||||
|
||||
@@ -11,4 +11,5 @@ const (
|
||||
CodeUnexpect Code = "unexpect"
|
||||
CodeUnexpectPanic Code = "unexpect.panic"
|
||||
CodeUnknown Code = "unknown"
|
||||
CodeUnauthorized Code = "unauthorized"
|
||||
)
|
||||
|
||||
@@ -1,16 +1,24 @@
|
||||
package httpdata
|
||||
|
||||
import (
|
||||
"errors"
|
||||
|
||||
"git.blauwelle.com/go/crate/runtimehelper"
|
||||
)
|
||||
|
||||
type Response struct {
|
||||
Data any `json:"data,omitempty"`
|
||||
Code Code `json:"code"`
|
||||
Message string `json:"message,omitempty"`
|
||||
Data any `json:"data,omitempty"`
|
||||
Code Code `json:"code"`
|
||||
Message string `json:"message,omitempty"`
|
||||
Debug string `json:"debug,omitempty"`
|
||||
Traceback []runtimehelper.Frame `json:"traceback,omitempty"`
|
||||
}
|
||||
|
||||
type PageData struct {
|
||||
List []any `json:"list"`
|
||||
PageIndex int `json:"pageIndex"` // >=1
|
||||
PageSize int `json:"pageSize"` // >=1
|
||||
Total int `json:"total"` // maybe 0
|
||||
List any `json:"list"`
|
||||
PageIndex int64 `json:"pageIndex"` // >=1
|
||||
PageSize int64 `json:"pageSize"` // >=1
|
||||
Total int64 `json:"total"` // maybe 0
|
||||
}
|
||||
|
||||
func NewOkResponse(data any) Response {
|
||||
@@ -20,3 +28,24 @@ func NewOkResponse(data any) Response {
|
||||
Data: data,
|
||||
}
|
||||
}
|
||||
|
||||
func NewErrorResponse(err error) Response {
|
||||
ue := UniverseError{}
|
||||
if !errors.As(err, &ue) {
|
||||
response := Response{
|
||||
Code: CodeUnknown,
|
||||
Message: "未知错误",
|
||||
}
|
||||
if FlagDebug {
|
||||
response.Debug = err.Error()
|
||||
response.Traceback = runtimehelper.Stack(1, maximumFrames)
|
||||
}
|
||||
return response
|
||||
}
|
||||
return Response{
|
||||
Code: ue.Code,
|
||||
Message: ue.Message,
|
||||
Debug: ue.Debug,
|
||||
Traceback: ue.Traceback,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -2,11 +2,24 @@ package httpdata
|
||||
|
||||
import (
|
||||
"errors"
|
||||
|
||||
"git.blauwelle.com/go/crate/runtimehelper"
|
||||
)
|
||||
|
||||
const maximumFrames = 8
|
||||
|
||||
type UniverseError struct {
|
||||
Code Code
|
||||
Message string
|
||||
Code Code
|
||||
Message string
|
||||
Debug string
|
||||
Traceback []runtimehelper.Frame
|
||||
}
|
||||
|
||||
func (err UniverseError) Error() string {
|
||||
if err.Message == "" {
|
||||
return string(err.Code)
|
||||
}
|
||||
return string(err.Code) + ": " + err.Message
|
||||
}
|
||||
|
||||
func NewUniverseError(code Code, message string) error {
|
||||
@@ -23,13 +36,6 @@ func NewBadRequestError(message string) error {
|
||||
}
|
||||
}
|
||||
|
||||
func (err UniverseError) Error() string {
|
||||
if err.Message == "" {
|
||||
return string(err.Code)
|
||||
}
|
||||
return string(err.Code) + ": " + err.Message
|
||||
}
|
||||
|
||||
func IsUniverseError(err error) bool {
|
||||
return errors.As(err, &UniverseError{})
|
||||
}
|
||||
|
||||
7
httpdata/flag.go
Normal file
7
httpdata/flag.go
Normal file
@@ -0,0 +1,7 @@
|
||||
package httpdata
|
||||
|
||||
import (
|
||||
"os"
|
||||
)
|
||||
|
||||
var FlagDebug = os.Getenv("DEBUG") == "true"
|
||||
@@ -1,3 +1,5 @@
|
||||
module git.blauwelle.com/go/crate/httpdata
|
||||
|
||||
go 1.21.1
|
||||
go 1.21
|
||||
|
||||
require git.blauwelle.com/go/crate/runtimehelper v0.2.0
|
||||
|
||||
2
httpdata/go.sum
Normal file
2
httpdata/go.sum
Normal file
@@ -0,0 +1,2 @@
|
||||
git.blauwelle.com/go/crate/runtimehelper v0.2.0 h1:W19wipPCyFSGHOWzqtfouNJu7MDeJobP+iRM4bPiJpM=
|
||||
git.blauwelle.com/go/crate/runtimehelper v0.2.0/go.mod h1:yVMA0GkO9AS7iuPmalHKeWyv9en0JWj25rY1vpTuHhk=
|
||||
@@ -21,11 +21,12 @@ func main() {
|
||||
}
|
||||
```
|
||||
|
||||
`log` 模块包含日志处理的代码, 由 3 个包组成:
|
||||
`log` 模块包含日志处理的代码, 由 4 个包组成:
|
||||
|
||||
1. [logsdk](./logsdk): 日志实现;
|
||||
2. [logjson](./logsdk/logjson): 控制台 JSON 日志处理器(`Processor`);
|
||||
3. [log](.): 根目录, 提供全局 `Logger`, 把 `Logger` / `Entry` 相关的方法封装成函数.
|
||||
3. [logtext](./logsdk/logtext): 控制台文本日志处理器(`Processor`);
|
||||
4. [log](.): 根目录, 提供全局 `Logger`, 把 `Logger` / `Entry` 相关的方法封装成函数.
|
||||
|
||||
## 基本概念
|
||||
|
||||
@@ -88,7 +89,7 @@ log.Logger()
|
||||
|
||||
关闭强制生成调用栈, `log.Logger().SetReportStackLevel(logsdk.LevelDisabled)`;
|
||||
|
||||
记录 panic, 在 `defer v = recover()` 后执行 `log.Z(context.Background(), v)`, 其中 `Z` 需要是附加调用栈的等级;
|
||||
记录 panic, 在 `defer v = recover()` 后执行 `log.Z(context.Background(), v)`, 其中 `Z` 需要是附加调用栈的等级(例如: `Error`);
|
||||
|
||||
mock, 实现 mock 日志处理器对生成的日志进行处理;
|
||||
|
||||
@@ -96,5 +97,6 @@ mock, 实现 mock 日志处理器对生成的日志进行处理;
|
||||
|
||||
## 日志处理器
|
||||
|
||||
- [logsdk/logjson](logsdk/logjson) 控制台 JSON 日志
|
||||
- [go get git.blauwelle.com/go/crate/logotel](../logotel) OpenTelemetry 日志
|
||||
- [logsdk/logjson](logsdk/logjson) 控制台 JSON 日志.
|
||||
- [logtext](./logsdk/logtext): 控制台文本日志.
|
||||
- [go get git.blauwelle.com/go/crate/logotel](../logotel) OpenTelemetry 日志.
|
||||
|
||||
@@ -2,7 +2,6 @@ package logjson
|
||||
|
||||
import (
|
||||
"io"
|
||||
"os"
|
||||
"time"
|
||||
)
|
||||
|
||||
@@ -60,10 +59,10 @@ func newConfig(opts ...Option) *config {
|
||||
opt.apply(cfg)
|
||||
}
|
||||
if !cfg.hasPool {
|
||||
cfg.bytesBufferPool = NewBytesBufferPool(bytesBufferInitialSize, bytesBufferMaximumSize)
|
||||
cfg.bytesBufferPool = DefaultBytesBufferPool
|
||||
}
|
||||
if cfg.output == nil {
|
||||
cfg.output = NewSyncWriter(os.Stderr)
|
||||
cfg.output = DefaultStderrSyncWriter
|
||||
}
|
||||
if cfg.timestampFormat == "" {
|
||||
cfg.timestampFormat = time.RFC3339Nano
|
||||
|
||||
@@ -10,6 +10,8 @@ const (
|
||||
bytesBufferMaximumSize = 4096
|
||||
)
|
||||
|
||||
var DefaultBytesBufferPool = NewBytesBufferPool(bytesBufferInitialSize, bytesBufferMaximumSize)
|
||||
|
||||
type BytesBufferPool interface {
|
||||
Get() *bytes.Buffer
|
||||
Put(buffer *bytes.Buffer)
|
||||
|
||||
@@ -5,7 +5,6 @@ import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io"
|
||||
"os"
|
||||
|
||||
"git.blauwelle.com/go/crate/log/logsdk"
|
||||
)
|
||||
@@ -69,11 +68,11 @@ func (processor *Processor) Process(_ context.Context, entry logsdk.ReadonlyEntr
|
||||
|
||||
// Encode 2次分配
|
||||
if err := encoder.Encode(m); err != nil {
|
||||
_, _ = fmt.Fprintf(os.Stderr, "JSON processor cannot encode log %#v: %s\n", m, err.Error())
|
||||
_, _ = fmt.Fprintf(processor.output, "JSON processor cannot encode log %#v: %s\n", m, err.Error())
|
||||
}
|
||||
|
||||
if _, err := buf.WriteTo(processor.output); err != nil {
|
||||
_, _ = fmt.Fprintf(os.Stderr, "JSON processor cannot write log: %s\n", err.Error())
|
||||
_, _ = fmt.Fprintf(processor.output, "JSON processor cannot write log: %s\n", err.Error())
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -2,9 +2,14 @@ package logjson
|
||||
|
||||
import (
|
||||
"io"
|
||||
"os"
|
||||
"sync"
|
||||
)
|
||||
|
||||
var (
|
||||
DefaultStderrSyncWriter = NewSyncWriter(os.Stderr)
|
||||
)
|
||||
|
||||
// NewSyncWriter 返回写互斥的 io.Writer
|
||||
func NewSyncWriter(writer io.Writer) io.Writer {
|
||||
return &syncWriter{
|
||||
|
||||
69
log/logsdk/logtext/option.go
Normal file
69
log/logsdk/logtext/option.go
Normal file
@@ -0,0 +1,69 @@
|
||||
package logtext
|
||||
|
||||
import (
|
||||
"io"
|
||||
"time"
|
||||
|
||||
"git.blauwelle.com/go/crate/log/logsdk/logjson"
|
||||
)
|
||||
|
||||
type Option interface {
|
||||
apply(cfg *config)
|
||||
}
|
||||
|
||||
func WithBufferPool(pool logjson.BytesBufferPool) Option {
|
||||
return optionFunc(func(cfg *config) {
|
||||
cfg.bytesBufferPool = pool
|
||||
})
|
||||
}
|
||||
|
||||
func WithOutput(w io.Writer) Option {
|
||||
return optionFunc(func(cfg *config) {
|
||||
cfg.output = w
|
||||
})
|
||||
}
|
||||
|
||||
func WithTimeFormat(format string) Option {
|
||||
return optionFunc(func(cfg *config) {
|
||||
cfg.timeFormat = format
|
||||
cfg.timePadding = len(format)
|
||||
})
|
||||
}
|
||||
|
||||
func WithDisableTime(disable bool) Option {
|
||||
return optionFunc(func(cfg *config) {
|
||||
cfg.disableTime = disable
|
||||
})
|
||||
}
|
||||
|
||||
type config struct {
|
||||
bytesBufferPool logjson.BytesBufferPool
|
||||
output io.Writer
|
||||
timeFormat string
|
||||
timePadding int
|
||||
disableTime bool
|
||||
}
|
||||
|
||||
func newConfig(opts ...Option) *config {
|
||||
cfg := new(config)
|
||||
for _, opt := range opts {
|
||||
opt.apply(cfg)
|
||||
}
|
||||
if cfg.bytesBufferPool == nil {
|
||||
cfg.bytesBufferPool = logjson.DefaultBytesBufferPool
|
||||
}
|
||||
if cfg.output == nil {
|
||||
cfg.output = logjson.DefaultStderrSyncWriter
|
||||
}
|
||||
if cfg.timeFormat == "" {
|
||||
cfg.timeFormat = time.RFC3339Nano
|
||||
cfg.timePadding = len(cfg.timeFormat)
|
||||
}
|
||||
return cfg
|
||||
}
|
||||
|
||||
type optionFunc func(cfg *config)
|
||||
|
||||
func (fn optionFunc) apply(cfg *config) {
|
||||
fn(cfg)
|
||||
}
|
||||
146
log/logsdk/logtext/processor.go
Normal file
146
log/logsdk/logtext/processor.go
Normal file
@@ -0,0 +1,146 @@
|
||||
package logtext
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"fmt"
|
||||
"io"
|
||||
"strconv"
|
||||
|
||||
"git.blauwelle.com/go/crate/log/logsdk"
|
||||
"git.blauwelle.com/go/crate/log/logsdk/logjson"
|
||||
)
|
||||
|
||||
var _ logsdk.EntryProcessor = &Processor{}
|
||||
|
||||
func New(opts ...Option) *Processor {
|
||||
cfg := newConfig(opts...)
|
||||
return &Processor{
|
||||
bytesBufferPool: cfg.bytesBufferPool,
|
||||
output: cfg.output,
|
||||
timeFormat: cfg.timeFormat,
|
||||
timePadding: cfg.timePadding,
|
||||
disableTime: cfg.disableTime,
|
||||
}
|
||||
}
|
||||
|
||||
type Processor struct {
|
||||
bytesBufferPool logjson.BytesBufferPool
|
||||
output io.Writer
|
||||
timeFormat string
|
||||
timePadding int
|
||||
disableTime bool
|
||||
}
|
||||
|
||||
func (processor *Processor) Process(_ context.Context, entry logsdk.ReadonlyEntry) {
|
||||
buf := processor.bytesBufferPool.Get()
|
||||
buf.Reset()
|
||||
defer processor.bytesBufferPool.Put(buf)
|
||||
|
||||
buf.Write(formatLevel(entry.Level))
|
||||
|
||||
if !processor.disableTime {
|
||||
buf.WriteByte(' ')
|
||||
timeValue := entry.Time.Format(processor.timeFormat)
|
||||
buf.WriteString(timeValue)
|
||||
writeSpace(buf, processor.timePadding-len(timeValue))
|
||||
}
|
||||
|
||||
for _, field := range entry.Fields {
|
||||
buf.WriteByte(' ')
|
||||
buf.WriteString(field.Key)
|
||||
buf.WriteByte('=')
|
||||
writeValue(buf, field.Value)
|
||||
}
|
||||
|
||||
if entry.Message != "" {
|
||||
buf.WriteByte(' ')
|
||||
buf.WriteString(entry.Message)
|
||||
}
|
||||
|
||||
buf.WriteByte('\n')
|
||||
|
||||
if entry.Caller.IsValid() && len(entry.Stack) == 0 {
|
||||
writeFrame(buf, entry.Caller)
|
||||
}
|
||||
|
||||
if len(entry.Stack) > 0 {
|
||||
for _, frame := range entry.Stack {
|
||||
writeFrame(buf, frame)
|
||||
}
|
||||
}
|
||||
|
||||
if _, err := buf.WriteTo(processor.output); err != nil {
|
||||
_, _ = fmt.Fprintf(processor.output, "TEXT processor cannot write log: %s\n", err.Error())
|
||||
}
|
||||
}
|
||||
|
||||
func writeSpace(buf *bytes.Buffer, n int) {
|
||||
for i := 0; i < n; i++ {
|
||||
buf.WriteByte(' ')
|
||||
}
|
||||
}
|
||||
|
||||
func writeFrame(buf *bytes.Buffer, frame logsdk.Frame) {
|
||||
buf.WriteByte('|')
|
||||
buf.WriteByte(' ')
|
||||
buf.WriteString(frame.Function)
|
||||
buf.WriteByte('\n')
|
||||
buf.WriteByte('|')
|
||||
buf.WriteByte(' ')
|
||||
buf.WriteByte('\t')
|
||||
buf.WriteString(frame.File)
|
||||
buf.WriteByte(':')
|
||||
buf.WriteString(strconv.Itoa(frame.Line))
|
||||
buf.WriteByte('\n')
|
||||
}
|
||||
|
||||
func writeValue(buf *bytes.Buffer, value any) {
|
||||
switch value := value.(type) {
|
||||
case nil:
|
||||
buf.WriteString("<nil>")
|
||||
case string:
|
||||
buf.WriteString(value)
|
||||
case error:
|
||||
buf.WriteString(value.Error())
|
||||
case fmt.Stringer:
|
||||
buf.WriteString(value.String())
|
||||
default:
|
||||
_, _ = fmt.Fprintf(buf, "%v", value)
|
||||
}
|
||||
}
|
||||
|
||||
func formatLevel(level logsdk.Level) []byte {
|
||||
switch level {
|
||||
case logsdk.LevelPanic:
|
||||
return []byte(LevelPanicValue)
|
||||
case logsdk.LevelFatal:
|
||||
return []byte(LevelFatalValue)
|
||||
case logsdk.LevelError:
|
||||
return []byte(LevelErrorValue)
|
||||
case logsdk.LevelWarn:
|
||||
return []byte(LevelWarnValue)
|
||||
case logsdk.LevelInfo:
|
||||
return []byte(LevelInfoValue)
|
||||
case logsdk.LevelDebug:
|
||||
return []byte(LevelDebugValue)
|
||||
case logsdk.LevelTrace:
|
||||
return []byte(LevelTraceValue)
|
||||
case logsdk.LevelDisabled:
|
||||
fallthrough
|
||||
default:
|
||||
return []byte(levelUnknownValue)
|
||||
}
|
||||
}
|
||||
|
||||
const (
|
||||
LevelPanicValue = "PNC"
|
||||
LevelFatalValue = "FTL"
|
||||
LevelErrorValue = "ERR"
|
||||
LevelWarnValue = "WRN"
|
||||
LevelInfoValue = "INF"
|
||||
LevelDebugValue = "DBG"
|
||||
LevelTraceValue = "TRC"
|
||||
|
||||
levelUnknownValue = "UNK"
|
||||
)
|
||||
@@ -3,7 +3,13 @@ module git.blauwelle.com/go/crate/logotel
|
||||
go 1.20
|
||||
|
||||
require (
|
||||
git.blauwelle.com/go/crate/log v0.9.0
|
||||
go.opentelemetry.io/otel v1.13.0
|
||||
go.opentelemetry.io/otel/trace v1.13.0
|
||||
git.blauwelle.com/go/crate/log v0.15.0
|
||||
go.opentelemetry.io/otel v1.21.0
|
||||
go.opentelemetry.io/otel/trace v1.21.0
|
||||
)
|
||||
|
||||
require (
|
||||
github.com/go-logr/logr v1.4.1 // indirect
|
||||
github.com/go-logr/stdr v1.2.2 // indirect
|
||||
go.opentelemetry.io/otel/metric v1.21.0 // indirect
|
||||
)
|
||||
|
||||
@@ -1,11 +1,18 @@
|
||||
git.blauwelle.com/go/crate/log v0.9.0 h1:H01AQIKcYybeCZGdReBzMoWhkXPQJAoY1t+K0J1asEk=
|
||||
git.blauwelle.com/go/crate/log v0.9.0/go.mod h1:jfVfpRODZTA70A8IkApVeGsS1zfLk1D77sLWZM/w+L0=
|
||||
git.blauwelle.com/go/crate/log v0.15.0 h1:nOPCB5a2F9fCvhiSkymxQRO639hoaOlDU95aMSPWf80=
|
||||
git.blauwelle.com/go/crate/log v0.15.0/go.mod h1:jfVfpRODZTA70A8IkApVeGsS1zfLk1D77sLWZM/w+L0=
|
||||
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
|
||||
github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38=
|
||||
github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A=
|
||||
github.com/go-logr/logr v1.4.1 h1:pKouT5E8xu9zeFC39JXRDukb6JFQPXM5p5I91188VAQ=
|
||||
github.com/go-logr/logr v1.4.1/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY=
|
||||
github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag=
|
||||
github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE=
|
||||
github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
|
||||
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
|
||||
github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk=
|
||||
go.opentelemetry.io/otel v1.13.0 h1:1ZAKnNQKwBBxFtww/GwxNUyTf0AxkZzrukO8MeXqe4Y=
|
||||
go.opentelemetry.io/otel v1.13.0/go.mod h1:FH3RtdZCzRkJYFTCsAKDy9l/XYjMdNv6QrkFFB8DvVg=
|
||||
go.opentelemetry.io/otel/trace v1.13.0 h1:CBgRZ6ntv+Amuj1jDsMhZtlAPT6gbyIRdaIzFhfBSdY=
|
||||
go.opentelemetry.io/otel/trace v1.13.0/go.mod h1:muCvmmO9KKpvuXSf3KKAXXB2ygNYHQ+ZfI5X08d3tds=
|
||||
github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk=
|
||||
go.opentelemetry.io/otel v1.21.0 h1:hzLeKBZEL7Okw2mGzZ0cc4k/A7Fta0uoPgaJCr8fsFc=
|
||||
go.opentelemetry.io/otel v1.21.0/go.mod h1:QZzNPQPm1zLX4gZK4cMi+71eaorMSGT3A4znnUvNNEo=
|
||||
go.opentelemetry.io/otel/metric v1.21.0 h1:tlYWfeo+Bocx5kLEloTjbcDwBuELRrIFxwdQ36PlJu4=
|
||||
go.opentelemetry.io/otel/metric v1.21.0/go.mod h1:o1p3CA8nNHW8j5yuQLdc1eeqEaPfzug24uvsyIEJRWM=
|
||||
go.opentelemetry.io/otel/trace v1.21.0 h1:WD9i5gzvoUPuXIXH24ZNBudiarZDKuekPqi/E8fpfLc=
|
||||
go.opentelemetry.io/otel/trace v1.21.0/go.mod h1:LGbsEB0f9LGjN+OZaQQ26sohbOmiMR+BaslueVtS/qQ=
|
||||
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
|
||||
|
||||
@@ -4,18 +4,13 @@ import (
|
||||
"git.blauwelle.com/go/crate/log/logsdk/logjson"
|
||||
)
|
||||
|
||||
const (
|
||||
bytesBufferInitialSize = 512
|
||||
bytesBufferMaximumSize = 4096
|
||||
)
|
||||
|
||||
func newConfig(opts ...Option) *config {
|
||||
cfg := defaultConfig()
|
||||
for _, opt := range opts {
|
||||
opt.apply(cfg)
|
||||
}
|
||||
if !cfg.hasPool {
|
||||
cfg.bytesBufferPool = logjson.NewBytesBufferPool(bytesBufferInitialSize, bytesBufferMaximumSize)
|
||||
cfg.bytesBufferPool = logjson.DefaultBytesBufferPool
|
||||
}
|
||||
return cfg
|
||||
}
|
||||
|
||||
@@ -16,6 +16,14 @@ import (
|
||||
"go.opentelemetry.io/otel/trace"
|
||||
)
|
||||
|
||||
const (
|
||||
fieldPrefix = "log.field."
|
||||
)
|
||||
|
||||
var (
|
||||
tracer = otel.Tracer(tracerName, trace.WithInstrumentationVersion("semver:"+version))
|
||||
)
|
||||
|
||||
// New 创建 log/opentelemetry 处理器
|
||||
func New(opts ...Option) *Processor {
|
||||
cfg := newConfig(opts...)
|
||||
@@ -37,7 +45,11 @@ func (processor *Processor) Process(ctx context.Context, entry logsdk.ReadonlyEn
|
||||
span := trace.SpanFromContext(ctx)
|
||||
if !span.IsRecording() {
|
||||
if processor.defaultSpan {
|
||||
ctx, span = otel.Tracer("git.blauwelle.com/go/crate/logotel").Start(ctx, "default") //nolint:ineffassign,staticcheck,wastedassign
|
||||
name := entry.Message
|
||||
if name == "" {
|
||||
name = "default"
|
||||
}
|
||||
ctx, span = tracer.Start(ctx, name) //nolint:ineffassign,staticcheck,wastedassign
|
||||
defer span.End()
|
||||
}
|
||||
}
|
||||
@@ -66,7 +78,7 @@ func (processor *Processor) Process(ctx context.Context, entry logsdk.ReadonlyEn
|
||||
buf.WriteByte('\n')
|
||||
}
|
||||
processor.bufferPool.Put(buf)
|
||||
attrs = append(attrs, attribute.String("zz.stack", buf.String()))
|
||||
attrs = append(attrs, attribute.String("log.stack", buf.String()))
|
||||
}
|
||||
for _, field := range entry.Fields {
|
||||
attrs = append(attrs, fieldToKV(field))
|
||||
@@ -80,22 +92,22 @@ func (processor *Processor) Process(ctx context.Context, entry logsdk.ReadonlyEn
|
||||
func fieldToKV(field logsdk.KV) attribute.KeyValue {
|
||||
switch value := field.Value.(type) {
|
||||
case nil:
|
||||
return attribute.String(field.Key, "<nil>")
|
||||
return attribute.String(fieldPrefix+field.Key, "<nil>")
|
||||
case string:
|
||||
return attribute.String(field.Key, value)
|
||||
return attribute.String(fieldPrefix+field.Key, value)
|
||||
case int:
|
||||
return attribute.Int(field.Key, value)
|
||||
return attribute.Int(fieldPrefix+field.Key, value)
|
||||
case int64:
|
||||
return attribute.Int64(field.Key, value)
|
||||
return attribute.Int64(fieldPrefix+field.Key, value)
|
||||
case float64:
|
||||
return attribute.Float64(field.Key, value)
|
||||
return attribute.Float64(fieldPrefix+field.Key, value)
|
||||
case bool:
|
||||
return attribute.Bool(field.Key, value)
|
||||
return attribute.Bool(fieldPrefix+field.Key, value)
|
||||
case error:
|
||||
return attribute.String(field.Key, value.Error())
|
||||
return attribute.String(fieldPrefix+field.Key, value.Error())
|
||||
case fmt.Stringer:
|
||||
return attribute.String(field.Key, value.String())
|
||||
return attribute.String(fieldPrefix+field.Key, value.String())
|
||||
}
|
||||
|
||||
return attribute.String(field.Key, fmt.Sprint(field.Value))
|
||||
return attribute.String(fieldPrefix+field.Key, fmt.Sprint(field.Value))
|
||||
}
|
||||
|
||||
6
logotel/version.go
Normal file
6
logotel/version.go
Normal file
@@ -0,0 +1,6 @@
|
||||
package logotel
|
||||
|
||||
const (
|
||||
tracerName = "git.blauwelle.com/go/crate/logotel"
|
||||
version = "0.15.0"
|
||||
)
|
||||
@@ -2,31 +2,30 @@ module git.blauwelle.com/go/crate/uptracehelper
|
||||
|
||||
go 1.20
|
||||
|
||||
require github.com/uptrace/uptrace-go v1.20.2
|
||||
require github.com/uptrace/uptrace-go v1.21.0
|
||||
|
||||
require (
|
||||
github.com/cenkalti/backoff/v4 v4.2.1 // indirect
|
||||
github.com/go-logr/logr v1.3.0 // indirect
|
||||
github.com/go-logr/stdr v1.2.2 // indirect
|
||||
github.com/golang/protobuf v1.5.3 // indirect
|
||||
github.com/grpc-ecosystem/grpc-gateway/v2 v2.18.0 // indirect
|
||||
go.opentelemetry.io/contrib/instrumentation/runtime v0.45.0 // indirect
|
||||
go.opentelemetry.io/otel v1.19.0 // indirect
|
||||
go.opentelemetry.io/otel/exporters/otlp/otlpmetric v0.42.0 // indirect
|
||||
go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc v0.42.0 // indirect
|
||||
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.19.0 // indirect
|
||||
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.19.0 // indirect
|
||||
go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.19.0 // indirect
|
||||
go.opentelemetry.io/otel/metric v1.19.0 // indirect
|
||||
go.opentelemetry.io/otel/sdk v1.19.0 // indirect
|
||||
go.opentelemetry.io/otel/sdk/metric v1.19.0 // indirect
|
||||
go.opentelemetry.io/otel/trace v1.19.0 // indirect
|
||||
github.com/grpc-ecosystem/grpc-gateway/v2 v2.18.1 // indirect
|
||||
go.opentelemetry.io/contrib/instrumentation/runtime v0.46.1 // indirect
|
||||
go.opentelemetry.io/otel v1.21.0 // indirect
|
||||
go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc v0.44.0 // indirect
|
||||
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.21.0 // indirect
|
||||
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.21.0 // indirect
|
||||
go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.21.0 // indirect
|
||||
go.opentelemetry.io/otel/metric v1.21.0 // indirect
|
||||
go.opentelemetry.io/otel/sdk v1.21.0 // indirect
|
||||
go.opentelemetry.io/otel/sdk/metric v1.21.0 // indirect
|
||||
go.opentelemetry.io/otel/trace v1.21.0 // indirect
|
||||
go.opentelemetry.io/proto/otlp v1.0.0 // indirect
|
||||
golang.org/x/net v0.17.0 // indirect
|
||||
golang.org/x/net v0.18.0 // indirect
|
||||
golang.org/x/sys v0.14.0 // indirect
|
||||
golang.org/x/text v0.14.0 // indirect
|
||||
google.golang.org/genproto/googleapis/api v0.0.0-20231030173426-d783a09b4405 // indirect
|
||||
google.golang.org/genproto/googleapis/rpc v0.0.0-20231030173426-d783a09b4405 // indirect
|
||||
google.golang.org/genproto/googleapis/api v0.0.0-20231106174013-bbf56f31fb17 // indirect
|
||||
google.golang.org/genproto/googleapis/rpc v0.0.0-20231106174013-bbf56f31fb17 // indirect
|
||||
google.golang.org/grpc v1.59.0 // indirect
|
||||
google.golang.org/protobuf v1.31.0 // indirect
|
||||
)
|
||||
|
||||
@@ -6,55 +6,52 @@ github.com/go-logr/logr v1.3.0 h1:2y3SDp0ZXuc6/cjLSZ+Q3ir+QB9T/iG5yYRXqsagWSY=
|
||||
github.com/go-logr/logr v1.3.0/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY=
|
||||
github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag=
|
||||
github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE=
|
||||
github.com/golang/glog v1.1.2 h1:DVjP2PbBOzHyzA+dn3WhHIq4NdVu3Q+pvivFICf/7fo=
|
||||
github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk=
|
||||
github.com/golang/protobuf v1.5.3 h1:KhyjKVUg7Usr/dYsdSqoFveMYd5ko72D+zANwlG1mmg=
|
||||
github.com/golang/protobuf v1.5.3/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY=
|
||||
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
|
||||
github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38=
|
||||
github.com/grpc-ecosystem/grpc-gateway/v2 v2.18.0 h1:RtRsiaGvWxcwd8y3BiRZxsylPT8hLWZ5SPcfI+3IDNk=
|
||||
github.com/grpc-ecosystem/grpc-gateway/v2 v2.18.0/go.mod h1:TzP6duP4Py2pHLVPPQp42aoYI92+PCrVotyR5e8Vqlk=
|
||||
github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
|
||||
github.com/grpc-ecosystem/grpc-gateway/v2 v2.18.1 h1:6UKoz5ujsI55KNpsJH3UwCq3T8kKbZwNZBNPuTTje8U=
|
||||
github.com/grpc-ecosystem/grpc-gateway/v2 v2.18.1/go.mod h1:YvJ2f6MplWDhfxiUC3KpyTy76kYUZA4W3pTv/wdKQ9Y=
|
||||
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
|
||||
github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk=
|
||||
github.com/uptrace/uptrace-go v1.20.2 h1:+wfqrWqDpVi5lyjyqsj5ylkoQkWVRd61tGFe3szGxoc=
|
||||
github.com/uptrace/uptrace-go v1.20.2/go.mod h1:StKbo3wpWwl1pFM/Uj9HXKOHAkEqiQ38Y8OVGvmSLz4=
|
||||
go.opentelemetry.io/contrib/instrumentation/runtime v0.45.0 h1:2JydY5UiDpqvj2p7sO9bgHuhTy4hgTZ0ymehdq/Ob0Q=
|
||||
go.opentelemetry.io/contrib/instrumentation/runtime v0.45.0/go.mod h1:ch3a5QxOqVWxas4CzjCFFOOQe+7HgAXC/N1oVxS9DK4=
|
||||
go.opentelemetry.io/otel v1.19.0 h1:MuS/TNf4/j4IXsZuJegVzI1cwut7Qc00344rgH7p8bs=
|
||||
go.opentelemetry.io/otel v1.19.0/go.mod h1:i0QyjOq3UPoTzff0PJB2N66fb4S0+rSbSB15/oyH9fY=
|
||||
go.opentelemetry.io/otel/exporters/otlp/otlpmetric v0.42.0 h1:ZtfnDL+tUrs1F0Pzfwbg2d59Gru9NCH3bgSHBM6LDwU=
|
||||
go.opentelemetry.io/otel/exporters/otlp/otlpmetric v0.42.0/go.mod h1:hG4Fj/y8TR/tlEDREo8tWstl9fO9gcFkn4xrx0Io8xU=
|
||||
go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc v0.42.0 h1:NmnYCiR0qNufkldjVvyQfZTHSdzeHoZ41zggMsdMcLM=
|
||||
go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc v0.42.0/go.mod h1:UVAO61+umUsHLtYb8KXXRoHtxUkdOPkYidzW3gipRLQ=
|
||||
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.19.0 h1:Mne5On7VWdx7omSrSSZvM4Kw7cS7NQkOOmLcgscI51U=
|
||||
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.19.0/go.mod h1:IPtUMKL4O3tH5y+iXVyAXqpAwMuzC1IrxVS81rummfE=
|
||||
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.19.0 h1:3d+S281UTjM+AbF31XSOYn1qXn3BgIdWl8HNEpx08Jk=
|
||||
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.19.0/go.mod h1:0+KuTDyKL4gjKCF75pHOX4wuzYDUZYfAQdSu43o+Z2I=
|
||||
go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.19.0 h1:Nw7Dv4lwvGrI68+wULbcq7su9K2cebeCUrDjVrUJHxM=
|
||||
go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.19.0/go.mod h1:1MsF6Y7gTqosgoZvHlzcaaM8DIMNZgJh87ykokoNH7Y=
|
||||
go.opentelemetry.io/otel/metric v1.19.0 h1:aTzpGtV0ar9wlV4Sna9sdJyII5jTVJEvKETPiOKwvpE=
|
||||
go.opentelemetry.io/otel/metric v1.19.0/go.mod h1:L5rUsV9kM1IxCj1MmSdS+JQAcVm319EUrDVLrt7jqt8=
|
||||
go.opentelemetry.io/otel/sdk v1.19.0 h1:6USY6zH+L8uMH8L3t1enZPR3WFEmSTADlqldyHtJi3o=
|
||||
go.opentelemetry.io/otel/sdk v1.19.0/go.mod h1:NedEbbS4w3C6zElbLdPJKOpJQOrGUJ+GfzpjUvI0v1A=
|
||||
go.opentelemetry.io/otel/sdk/metric v1.19.0 h1:EJoTO5qysMsYCa+w4UghwFV/ptQgqSL/8Ni+hx+8i1k=
|
||||
go.opentelemetry.io/otel/sdk/metric v1.19.0/go.mod h1:XjG0jQyFJrv2PbMvwND7LwCEhsJzCzV5210euduKcKY=
|
||||
go.opentelemetry.io/otel/trace v1.19.0 h1:DFVQmlVbfVeOuBRrwdtaehRrWiL1JoVs9CPIQ1Dzxpg=
|
||||
go.opentelemetry.io/otel/trace v1.19.0/go.mod h1:mfaSyvGyEJEI0nyV2I4qhNQnbBOUUmYZpYojqMnX2vo=
|
||||
github.com/uptrace/uptrace-go v1.21.0 h1:oJoUjhiVT7aiuoG6B3ClVHtJozLn3cK9hQt8U5dQO1M=
|
||||
github.com/uptrace/uptrace-go v1.21.0/go.mod h1:/aXAFGKOqeAFBqWa1xtzLnGX2xJm1GScqz9NJ0TJjLM=
|
||||
go.opentelemetry.io/contrib/instrumentation/runtime v0.46.1 h1:m9ReioVPIffxjJlGNRd0d5poy+9oTro3D+YbiEzUDOc=
|
||||
go.opentelemetry.io/contrib/instrumentation/runtime v0.46.1/go.mod h1:CANkrsXNzqOKXfOomu2zhOmc1/J5UZK9SGjrat6ZCG0=
|
||||
go.opentelemetry.io/otel v1.21.0 h1:hzLeKBZEL7Okw2mGzZ0cc4k/A7Fta0uoPgaJCr8fsFc=
|
||||
go.opentelemetry.io/otel v1.21.0/go.mod h1:QZzNPQPm1zLX4gZK4cMi+71eaorMSGT3A4znnUvNNEo=
|
||||
go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc v0.44.0 h1:jd0+5t/YynESZqsSyPz+7PAFdEop0dlN0+PkyHYo8oI=
|
||||
go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc v0.44.0/go.mod h1:U707O40ee1FpQGyhvqnzmCJm1Wh6OX6GGBVn0E6Uyyk=
|
||||
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.21.0 h1:cl5P5/GIfFh4t6xyruOgJP5QiA1pw4fYYdv6nc6CBWw=
|
||||
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.21.0/go.mod h1:zgBdWWAu7oEEMC06MMKc5NLbA/1YDXV1sMpSqEeLQLg=
|
||||
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.21.0 h1:tIqheXEFWAZ7O8A7m+J0aPTmpJN3YQ7qetUAdkkkKpk=
|
||||
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.21.0/go.mod h1:nUeKExfxAQVbiVFn32YXpXZZHZ61Cc3s3Rn1pDBGAb0=
|
||||
go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.21.0 h1:VhlEQAPp9R1ktYfrPk5SOryw1e9LDDTZCbIPFrho0ec=
|
||||
go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.21.0/go.mod h1:kB3ufRbfU+CQ4MlUcqtW8Z7YEOBeK2DJ6CmR5rYYF3E=
|
||||
go.opentelemetry.io/otel/metric v1.21.0 h1:tlYWfeo+Bocx5kLEloTjbcDwBuELRrIFxwdQ36PlJu4=
|
||||
go.opentelemetry.io/otel/metric v1.21.0/go.mod h1:o1p3CA8nNHW8j5yuQLdc1eeqEaPfzug24uvsyIEJRWM=
|
||||
go.opentelemetry.io/otel/sdk v1.21.0 h1:FTt8qirL1EysG6sTQRZ5TokkU8d0ugCj8htOgThZXQ8=
|
||||
go.opentelemetry.io/otel/sdk v1.21.0/go.mod h1:Nna6Yv7PWTdgJHVRD9hIYywQBRx7pbox6nwBnZIxl/E=
|
||||
go.opentelemetry.io/otel/sdk/metric v1.21.0 h1:smhI5oD714d6jHE6Tie36fPx4WDFIg+Y6RfAY4ICcR0=
|
||||
go.opentelemetry.io/otel/sdk/metric v1.21.0/go.mod h1:FJ8RAsoPGv/wYMgBdUJXOm+6pzFY3YdljnXtv1SBE8Q=
|
||||
go.opentelemetry.io/otel/trace v1.21.0 h1:WD9i5gzvoUPuXIXH24ZNBudiarZDKuekPqi/E8fpfLc=
|
||||
go.opentelemetry.io/otel/trace v1.21.0/go.mod h1:LGbsEB0f9LGjN+OZaQQ26sohbOmiMR+BaslueVtS/qQ=
|
||||
go.opentelemetry.io/proto/otlp v1.0.0 h1:T0TX0tmXU8a3CbNXzEKGeU5mIVOdf0oykP+u2lIVU/I=
|
||||
go.opentelemetry.io/proto/otlp v1.0.0/go.mod h1:Sy6pihPLfYHkr3NkUbEhGHFhINUSI/v80hjKIs5JXpM=
|
||||
go.uber.org/goleak v1.2.1 h1:NBol2c7O1ZokfZ0LEU9K6Whx/KnwvepVetCUhtKja4A=
|
||||
golang.org/x/net v0.17.0 h1:pVaXccu2ozPjCXewfr1S7xza/zcXTity9cCdXQYSjIM=
|
||||
golang.org/x/net v0.17.0/go.mod h1:NxSsAGuq816PNPmqtQdLE42eU2Fs7NoRIZrHJAlaCOE=
|
||||
go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=
|
||||
golang.org/x/net v0.18.0 h1:mIYleuAkSbHh0tCv7RvjL3F6ZVbLjq4+R7zbOn3Kokg=
|
||||
golang.org/x/net v0.18.0/go.mod h1:/czyP5RqHAH4odGYxBJ1qz0+CE5WZ+2j1YgoEo8F2jQ=
|
||||
golang.org/x/sys v0.14.0 h1:Vz7Qs629MkJkGyHxUlRHizWJRG2j8fbQKjELVSNhy7Q=
|
||||
golang.org/x/sys v0.14.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
|
||||
golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ=
|
||||
golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU=
|
||||
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
|
||||
google.golang.org/genproto v0.0.0-20231016165738-49dd2c1f3d0b h1:+YaDE2r2OG8t/z5qmsh7Y+XXwCbvadxxZ0YY6mTdrVA=
|
||||
google.golang.org/genproto/googleapis/api v0.0.0-20231030173426-d783a09b4405 h1:HJMDndgxest5n2y77fnErkM62iUsptE/H8p0dC2Huo4=
|
||||
google.golang.org/genproto/googleapis/api v0.0.0-20231030173426-d783a09b4405/go.mod h1:oT32Z4o8Zv2xPQTg0pbVaPr0MPOH6f14RgXt7zfIpwg=
|
||||
google.golang.org/genproto/googleapis/rpc v0.0.0-20231030173426-d783a09b4405 h1:AB/lmRny7e2pLhFEYIbl5qkDAUt2h0ZRO4wGPhZf+ik=
|
||||
google.golang.org/genproto/googleapis/rpc v0.0.0-20231030173426-d783a09b4405/go.mod h1:67X1fPuzjcrkymZzZV1vvkFeTn2Rvc6lYF9MYFGCcwE=
|
||||
google.golang.org/genproto v0.0.0-20231030173426-d783a09b4405 h1:I6WNifs6pF9tNdSob2W24JtyxIYjzFB9qDlpUC76q+U=
|
||||
google.golang.org/genproto/googleapis/api v0.0.0-20231106174013-bbf56f31fb17 h1:JpwMPBpFN3uKhdaekDpiNlImDdkUAyiJ6ez/uxGaUSo=
|
||||
google.golang.org/genproto/googleapis/api v0.0.0-20231106174013-bbf56f31fb17/go.mod h1:0xJLfVdJqpAPl8tDg1ujOCGzx6LFLttXT5NhllGOXY4=
|
||||
google.golang.org/genproto/googleapis/rpc v0.0.0-20231106174013-bbf56f31fb17 h1:Jyp0Hsi0bmHXG6k9eATXoYtjd6e2UzZ1SCn/wIupY14=
|
||||
google.golang.org/genproto/googleapis/rpc v0.0.0-20231106174013-bbf56f31fb17/go.mod h1:oQ5rr10WTTMvP4A36n8JpR1OrO1BEiV4f78CneXZxkA=
|
||||
google.golang.org/grpc v1.59.0 h1:Z5Iec2pjwb+LEOqzpB2MR12/eKFhDPhuqW91O+4bwUk=
|
||||
google.golang.org/grpc v1.59.0/go.mod h1:aUPDwccQo6OTjy7Hct4AfBPD1GptF4fyUjIkQ9YtF98=
|
||||
google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw=
|
||||
|
||||
@@ -2,6 +2,7 @@ package uptracehelper
|
||||
|
||||
import (
|
||||
"context"
|
||||
"time"
|
||||
|
||||
"github.com/uptrace/uptrace-go/uptrace"
|
||||
)
|
||||
@@ -10,6 +11,7 @@ type Config struct {
|
||||
ServiceName string
|
||||
ServiceVersion string
|
||||
DeploymentEnvironment string
|
||||
ShutdownTimeout time.Duration
|
||||
}
|
||||
|
||||
func Setup(cfg Config) {
|
||||
@@ -23,19 +25,21 @@ func Setup(cfg Config) {
|
||||
uptrace.ConfigureOpentelemetry(opts...)
|
||||
}
|
||||
|
||||
// GoStop 初始化并等待 uptrace
|
||||
// Bootstrap 初始化并等待 uptrace
|
||||
// 配合 [git.blauwelle.com/go/crate/exegroup] 使用
|
||||
// env
|
||||
// - UPTRACE_DISABLED 存在就不再初始化
|
||||
// - UPTRACE_DSN 服务端地址
|
||||
func GoStop(cfg Config) (func(ctx context.Context) error, func(ctx context.Context)) {
|
||||
func Bootstrap(cfg Config) func(ctx context.Context) error {
|
||||
Setup(cfg)
|
||||
shutdownErr := make(chan error, 1)
|
||||
goFunc := func(context.Context) error {
|
||||
return <-shutdownErr
|
||||
return func(ctx context.Context) error {
|
||||
<-ctx.Done()
|
||||
shutdownCtx := context.Background()
|
||||
if cfg.ShutdownTimeout >= 0 {
|
||||
var cancel func()
|
||||
shutdownCtx, cancel = context.WithTimeout(shutdownCtx, cfg.ShutdownTimeout)
|
||||
defer cancel()
|
||||
}
|
||||
return uptrace.Shutdown(shutdownCtx)
|
||||
}
|
||||
stopFunc := func(ctx context.Context) {
|
||||
shutdownErr <- uptrace.Shutdown(ctx)
|
||||
}
|
||||
return goFunc, stopFunc
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user