master
parent
a1b7b3879f
commit
ee3004130e
26 changed files with 3 additions and 1495 deletions
@ -1,12 +0,0 @@ |
||||
package rescue |
||||
|
||||
import "git.diulo.com/mogfee/kit/core/logx" |
||||
|
||||
func Recover(cleanups ...func()) { |
||||
for _, cleanup := range cleanups { |
||||
cleanup() |
||||
} |
||||
if p := recover(); p != nil { |
||||
logx.ErrorStack(p) |
||||
} |
||||
} |
@ -1,8 +0,0 @@ |
||||
package sysx |
||||
|
||||
import "go.uber.org/automaxprocs/maxprocs" |
||||
|
||||
// Automatically set GOMAXPROCS to match Linux container CPU quota.
|
||||
func init() { |
||||
maxprocs.Set(maxprocs.Logger(nil)) |
||||
} |
@ -1,22 +0,0 @@ |
||||
package sysx |
||||
|
||||
import ( |
||||
"os" |
||||
|
||||
"github.com/zeromicro/go-zero/core/stringx" |
||||
) |
||||
|
||||
var hostname string |
||||
|
||||
func init() { |
||||
var err error |
||||
hostname, err = os.Hostname() |
||||
if err != nil { |
||||
hostname = stringx.RandId() |
||||
} |
||||
} |
||||
|
||||
// Hostname returns the name of the host, if no hostname, a random id is returned.
|
||||
func Hostname() string { |
||||
return hostname |
||||
} |
@ -1,11 +0,0 @@ |
||||
package sysx |
||||
|
||||
import ( |
||||
"testing" |
||||
|
||||
"github.com/stretchr/testify/assert" |
||||
) |
||||
|
||||
func TestHostname(t *testing.T) { |
||||
assert.True(t, len(Hostname()) > 0) |
||||
} |
@ -1,29 +0,0 @@ |
||||
package threading |
||||
|
||||
import "sync" |
||||
|
||||
type RoutineGroup struct { |
||||
waitGroup sync.WaitGroup |
||||
} |
||||
|
||||
func NewRoutineGroup() *RoutineGroup { |
||||
return new(RoutineGroup) |
||||
} |
||||
func (g *RoutineGroup) Run(fn func()) { |
||||
g.waitGroup.Add(1) |
||||
go func() { |
||||
defer g.waitGroup.Done() |
||||
fn() |
||||
}() |
||||
} |
||||
|
||||
func (g *RoutineGroup) RunSafe(fn func()) { |
||||
g.waitGroup.Add(1) |
||||
GoSafe(func() { |
||||
defer g.waitGroup.Done() |
||||
fn() |
||||
}) |
||||
} |
||||
func (g *RoutineGroup) Wait() { |
||||
g.waitGroup.Wait() |
||||
} |
@ -1,24 +0,0 @@ |
||||
package threading |
||||
|
||||
import ( |
||||
"bytes" |
||||
"git.diulo.com/mogfee/kit/core/rescue" |
||||
"runtime" |
||||
"strconv" |
||||
) |
||||
|
||||
func GoSafe(fn func()) { |
||||
go RunSafe(fn) |
||||
} |
||||
func RoutineId() uint64 { |
||||
b := make([]byte, 64) |
||||
b = b[:runtime.Stack(b, false)] |
||||
b = bytes.TrimPrefix(b, []byte("goroutine ")) |
||||
b = b[:bytes.IndexByte(b, ' ')] |
||||
n, _ := strconv.ParseUint(string(b), 10, 64) |
||||
return n |
||||
} |
||||
func RunSafe(fn func()) { |
||||
defer rescue.Recover() |
||||
fn() |
||||
} |
@ -1,26 +0,0 @@ |
||||
package threading |
||||
|
||||
import ( |
||||
"git.diulo.com/mogfee/kit/core/rescue" |
||||
"git.diulo.com/mogfee/kit/lang" |
||||
) |
||||
|
||||
type TaskRunner struct { |
||||
limitChan chan lang.PlaceholderType |
||||
} |
||||
|
||||
func NewTaskRunner(concurrency int) *TaskRunner { |
||||
return &TaskRunner{ |
||||
limitChan: make(chan lang.PlaceholderType, concurrency), |
||||
} |
||||
} |
||||
|
||||
func (r *TaskRunner) Schedule(task func()) { |
||||
r.limitChan <- lang.Placeholder |
||||
go func() { |
||||
defer rescue.Recover(func() { |
||||
<-r.limitChan |
||||
}) |
||||
task() |
||||
}() |
||||
} |
@ -1,20 +0,0 @@ |
||||
package threading |
||||
|
||||
type WorkerGroup struct { |
||||
job func() |
||||
workers int |
||||
} |
||||
|
||||
func NewWorkerGroup(job func(), works int) *WorkerGroup { |
||||
return &WorkerGroup{ |
||||
job: job, |
||||
workers: works, |
||||
} |
||||
} |
||||
func (wg WorkerGroup) Start() { |
||||
group := NewRoutineGroup() |
||||
for i := 0; i < wg.workers; i++ { |
||||
group.RunSafe(wg.job) |
||||
} |
||||
group.Wait() |
||||
} |
@ -1,117 +0,0 @@ |
||||
package trace |
||||
|
||||
import ( |
||||
"context" |
||||
"fmt" |
||||
"net/url" |
||||
"sync" |
||||
|
||||
"github.com/zeromicro/go-zero/core/lang" |
||||
"github.com/zeromicro/go-zero/core/logx" |
||||
"go.opentelemetry.io/otel" |
||||
"go.opentelemetry.io/otel/exporters/jaeger" |
||||
"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc" |
||||
"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp" |
||||
"go.opentelemetry.io/otel/exporters/zipkin" |
||||
"go.opentelemetry.io/otel/sdk/resource" |
||||
sdktrace "go.opentelemetry.io/otel/sdk/trace" |
||||
semconv "go.opentelemetry.io/otel/semconv/v1.4.0" |
||||
) |
||||
|
||||
const ( |
||||
kindJaeger = "jaeger" |
||||
kindZipkin = "zipkin" |
||||
kindOtlpGrpc = "otlpgrpc" |
||||
kindOtlpHttp = "otlphttp" |
||||
) |
||||
|
||||
var ( |
||||
agents = make(map[string]lang.PlaceholderType) |
||||
lock sync.Mutex |
||||
tp *sdktrace.TracerProvider |
||||
) |
||||
|
||||
// StartAgent starts an opentelemetry agent.
|
||||
func StartAgent(c Config) { |
||||
lock.Lock() |
||||
defer lock.Unlock() |
||||
|
||||
_, ok := agents[c.Endpoint] |
||||
if ok { |
||||
return |
||||
} |
||||
|
||||
// if error happens, let later calls run.
|
||||
if err := startAgent(c); err != nil { |
||||
return |
||||
} |
||||
|
||||
agents[c.Endpoint] = lang.Placeholder |
||||
} |
||||
|
||||
// StopAgent shuts down the span processors in the order they were registered.
|
||||
func StopAgent() { |
||||
_ = tp.Shutdown(context.Background()) |
||||
} |
||||
|
||||
func createExporter(c Config) (sdktrace.SpanExporter, error) { |
||||
// Just support jaeger and zipkin now, more for later
|
||||
switch c.Batcher { |
||||
case kindJaeger: |
||||
u, _ := url.Parse(c.Endpoint) |
||||
if u.Scheme == "udp" { |
||||
return jaeger.New(jaeger.WithAgentEndpoint(jaeger.WithAgentHost(u.Hostname()), jaeger.WithAgentPort(u.Port()))) |
||||
} |
||||
return jaeger.New(jaeger.WithCollectorEndpoint(jaeger.WithEndpoint(c.Endpoint))) |
||||
case kindZipkin: |
||||
return zipkin.New(c.Endpoint) |
||||
case kindOtlpGrpc: |
||||
// Always treat trace exporter as optional component, so we use nonblock here,
|
||||
// otherwise this would slow down app start up even set a dial timeout here when
|
||||
// endpoint can not reach.
|
||||
// If the connection not dial success, the global otel ErrorHandler will catch error
|
||||
// when reporting data like other exporters.
|
||||
return otlptracegrpc.New( |
||||
context.Background(), |
||||
otlptracegrpc.WithInsecure(), |
||||
otlptracegrpc.WithEndpoint(c.Endpoint), |
||||
) |
||||
case kindOtlpHttp: |
||||
// Not support flexible configuration now.
|
||||
return otlptracehttp.New( |
||||
context.Background(), |
||||
otlptracehttp.WithInsecure(), |
||||
otlptracehttp.WithEndpoint(c.Endpoint), |
||||
) |
||||
default: |
||||
return nil, fmt.Errorf("unknown exporter: %s", c.Batcher) |
||||
} |
||||
} |
||||
|
||||
func startAgent(c Config) error { |
||||
opts := []sdktrace.TracerProviderOption{ |
||||
// Set the sampling rate based on the parent span to 100%
|
||||
sdktrace.WithSampler(sdktrace.ParentBased(sdktrace.TraceIDRatioBased(c.Sampler))), |
||||
// Record information about this application in a Resource.
|
||||
sdktrace.WithResource(resource.NewSchemaless(semconv.ServiceNameKey.String(c.Name))), |
||||
} |
||||
|
||||
if len(c.Endpoint) > 0 { |
||||
exp, err := createExporter(c) |
||||
if err != nil { |
||||
logx.Error(err) |
||||
return err |
||||
} |
||||
|
||||
// Always be sure to batch in production.
|
||||
opts = append(opts, sdktrace.WithBatcher(exp)) |
||||
} |
||||
|
||||
tp = sdktrace.NewTracerProvider(opts...) |
||||
otel.SetTracerProvider(tp) |
||||
otel.SetErrorHandler(otel.ErrorHandlerFunc(func(err error) { |
||||
logx.Errorf("[otel] error: %v", err) |
||||
})) |
||||
|
||||
return nil |
||||
} |
@ -1,76 +0,0 @@ |
||||
package trace |
||||
|
||||
import ( |
||||
"testing" |
||||
|
||||
"github.com/stretchr/testify/assert" |
||||
"github.com/zeromicro/go-zero/core/logx" |
||||
) |
||||
|
||||
func TestStartAgent(t *testing.T) { |
||||
logx.Disable() |
||||
|
||||
const ( |
||||
endpoint1 = "localhost:1234" |
||||
endpoint2 = "remotehost:1234" |
||||
endpoint3 = "localhost:1235" |
||||
endpoint4 = "localhost:1236" |
||||
endpoint5 = "udp://localhost:6831" |
||||
) |
||||
c1 := Config{ |
||||
Name: "foo", |
||||
} |
||||
c2 := Config{ |
||||
Name: "bar", |
||||
Endpoint: endpoint1, |
||||
Batcher: kindJaeger, |
||||
} |
||||
c3 := Config{ |
||||
Name: "any", |
||||
Endpoint: endpoint2, |
||||
Batcher: kindZipkin, |
||||
} |
||||
c4 := Config{ |
||||
Name: "bla", |
||||
Endpoint: endpoint3, |
||||
Batcher: "otlp", |
||||
} |
||||
c5 := Config{ |
||||
Name: "grpc", |
||||
Endpoint: endpoint3, |
||||
Batcher: kindOtlpGrpc, |
||||
} |
||||
c6 := Config{ |
||||
Name: "otlphttp", |
||||
Endpoint: endpoint4, |
||||
Batcher: kindOtlpHttp, |
||||
} |
||||
c7 := Config{ |
||||
Name: "UDP", |
||||
Endpoint: endpoint5, |
||||
Batcher: kindJaeger, |
||||
} |
||||
|
||||
StartAgent(c1) |
||||
StartAgent(c1) |
||||
StartAgent(c2) |
||||
StartAgent(c3) |
||||
StartAgent(c4) |
||||
StartAgent(c5) |
||||
StartAgent(c6) |
||||
StartAgent(c7) |
||||
|
||||
lock.Lock() |
||||
defer lock.Unlock() |
||||
|
||||
// because remotehost cannot be resolved
|
||||
assert.Equal(t, 5, len(agents)) |
||||
_, ok := agents[""] |
||||
assert.True(t, ok) |
||||
_, ok = agents[endpoint1] |
||||
assert.True(t, ok) |
||||
_, ok = agents[endpoint2] |
||||
assert.False(t, ok) |
||||
_, ok = agents[endpoint5] |
||||
assert.True(t, ok) |
||||
} |
@ -1,40 +0,0 @@ |
||||
package trace |
||||
|
||||
import ( |
||||
"go.opentelemetry.io/otel/attribute" |
||||
semconv "go.opentelemetry.io/otel/semconv/v1.4.0" |
||||
gcodes "google.golang.org/grpc/codes" |
||||
) |
||||
|
||||
const ( |
||||
// GRPCStatusCodeKey is convention for numeric status code of a gRPC request.
|
||||
GRPCStatusCodeKey = attribute.Key("rpc.grpc.status_code") |
||||
// RPCNameKey is the name of message transmitted or received.
|
||||
RPCNameKey = attribute.Key("name") |
||||
// RPCMessageTypeKey is the type of message transmitted or received.
|
||||
RPCMessageTypeKey = attribute.Key("message.type") |
||||
// RPCMessageIDKey is the identifier of message transmitted or received.
|
||||
RPCMessageIDKey = attribute.Key("message.id") |
||||
// RPCMessageCompressedSizeKey is the compressed size of the message transmitted or received in bytes.
|
||||
RPCMessageCompressedSizeKey = attribute.Key("message.compressed_size") |
||||
// RPCMessageUncompressedSizeKey is the uncompressed size of the message
|
||||
// transmitted or received in bytes.
|
||||
RPCMessageUncompressedSizeKey = attribute.Key("message.uncompressed_size") |
||||
) |
||||
|
||||
// Semantic conventions for common RPC attributes.
|
||||
var ( |
||||
// RPCSystemGRPC is the semantic convention for gRPC as the remoting system.
|
||||
RPCSystemGRPC = semconv.RPCSystemKey.String("grpc") |
||||
// RPCNameMessage is the semantic convention for a message named message.
|
||||
RPCNameMessage = RPCNameKey.String("message") |
||||
// RPCMessageTypeSent is the semantic conventions for sent RPC message types.
|
||||
RPCMessageTypeSent = RPCMessageTypeKey.String("SENT") |
||||
// RPCMessageTypeReceived is the semantic conventions for the received RPC message types.
|
||||
RPCMessageTypeReceived = RPCMessageTypeKey.String("RECEIVED") |
||||
) |
||||
|
||||
// StatusCodeAttr returns an attribute.KeyValue that represents the give c.
|
||||
func StatusCodeAttr(c gcodes.Code) attribute.KeyValue { |
||||
return GRPCStatusCodeKey.Int64(int64(c)) |
||||
} |
@ -1,12 +0,0 @@ |
||||
package trace |
||||
|
||||
import ( |
||||
"testing" |
||||
|
||||
"github.com/stretchr/testify/assert" |
||||
gcodes "google.golang.org/grpc/codes" |
||||
) |
||||
|
||||
func TestStatusCodeAttr(t *testing.T) { |
||||
assert.Equal(t, GRPCStatusCodeKey.Int(int(gcodes.DataLoss)), StatusCodeAttr(gcodes.DataLoss)) |
||||
} |
@ -1,12 +0,0 @@ |
||||
package trace |
||||
|
||||
// TraceName represents the tracing name.
|
||||
const TraceName = "go-zero" |
||||
|
||||
// A Config is an opentelemetry config.
|
||||
type Config struct { |
||||
Name string `json:",optional"` |
||||
Endpoint string `json:",optional"` |
||||
Sampler float64 `json:",default=1.0"` |
||||
Batcher string `json:",default=jaeger,options=jaeger|zipkin|otlpgrpc|otlphttp"` |
||||
} |
@ -1,38 +0,0 @@ |
||||
package trace |
||||
|
||||
import ( |
||||
"context" |
||||
|
||||
"go.opentelemetry.io/otel/attribute" |
||||
"go.opentelemetry.io/otel/trace" |
||||
"google.golang.org/protobuf/proto" |
||||
) |
||||
|
||||
const messageEvent = "message" |
||||
|
||||
var ( |
||||
// MessageSent is the type of sent messages.
|
||||
MessageSent = messageType(RPCMessageTypeSent) |
||||
// MessageReceived is the type of received messages.
|
||||
MessageReceived = messageType(RPCMessageTypeReceived) |
||||
) |
||||
|
||||
type messageType attribute.KeyValue |
||||
|
||||
// Event adds an event of the messageType to the span associated with the
|
||||
// passed context with id and size (if message is a proto message).
|
||||
func (m messageType) Event(ctx context.Context, id int, message any) { |
||||
span := trace.SpanFromContext(ctx) |
||||
if p, ok := message.(proto.Message); ok { |
||||
span.AddEvent(messageEvent, trace.WithAttributes( |
||||
attribute.KeyValue(m), |
||||
RPCMessageIDKey.Int(id), |
||||
RPCMessageUncompressedSizeKey.Int(proto.Size(p)), |
||||
)) |
||||
} else { |
||||
span.AddEvent(messageEvent, trace.WithAttributes( |
||||
attribute.KeyValue(m), |
||||
RPCMessageIDKey.Int(id), |
||||
)) |
||||
} |
||||
} |
@ -1,73 +0,0 @@ |
||||
package trace |
||||
|
||||
import ( |
||||
"context" |
||||
"testing" |
||||
|
||||
"github.com/stretchr/testify/assert" |
||||
"go.opentelemetry.io/otel/attribute" |
||||
"go.opentelemetry.io/otel/codes" |
||||
"go.opentelemetry.io/otel/trace" |
||||
"google.golang.org/protobuf/reflect/protoreflect" |
||||
"google.golang.org/protobuf/types/dynamicpb" |
||||
) |
||||
|
||||
func TestMessageType_Event(t *testing.T) { |
||||
var span mockSpan |
||||
ctx := trace.ContextWithSpan(context.Background(), &span) |
||||
MessageReceived.Event(ctx, 1, "foo") |
||||
assert.Equal(t, messageEvent, span.name) |
||||
assert.NotEmpty(t, span.options) |
||||
} |
||||
|
||||
func TestMessageType_EventProtoMessage(t *testing.T) { |
||||
var span mockSpan |
||||
var message mockMessage |
||||
ctx := trace.ContextWithSpan(context.Background(), &span) |
||||
MessageReceived.Event(ctx, 1, message) |
||||
assert.Equal(t, messageEvent, span.name) |
||||
assert.NotEmpty(t, span.options) |
||||
} |
||||
|
||||
type mockSpan struct { |
||||
name string |
||||
options []trace.EventOption |
||||
} |
||||
|
||||
func (m *mockSpan) End(options ...trace.SpanEndOption) { |
||||
} |
||||
|
||||
func (m *mockSpan) AddEvent(name string, options ...trace.EventOption) { |
||||
m.name = name |
||||
m.options = options |
||||
} |
||||
|
||||
func (m *mockSpan) IsRecording() bool { |
||||
return false |
||||
} |
||||
|
||||
func (m *mockSpan) RecordError(err error, options ...trace.EventOption) { |
||||
} |
||||
|
||||
func (m *mockSpan) SpanContext() trace.SpanContext { |
||||
panic("implement me") |
||||
} |
||||
|
||||
func (m *mockSpan) SetStatus(code codes.Code, description string) { |
||||
} |
||||
|
||||
func (m *mockSpan) SetName(name string) { |
||||
} |
||||
|
||||
func (m *mockSpan) SetAttributes(kv ...attribute.KeyValue) { |
||||
} |
||||
|
||||
func (m *mockSpan) TracerProvider() trace.TracerProvider { |
||||
return nil |
||||
} |
||||
|
||||
type mockMessage struct{} |
||||
|
||||
func (m mockMessage) ProtoReflect() protoreflect.Message { |
||||
return new(dynamicpb.Message) |
||||
} |
@ -1,11 +0,0 @@ |
||||
package trace |
||||
|
||||
import ( |
||||
"go.opentelemetry.io/otel" |
||||
"go.opentelemetry.io/otel/propagation" |
||||
) |
||||
|
||||
func init() { |
||||
otel.SetTextMapPropagator(propagation.NewCompositeTextMapPropagator( |
||||
propagation.TraceContext{}, propagation.Baggage{})) |
||||
} |
@ -1,56 +0,0 @@ |
||||
package trace |
||||
|
||||
import ( |
||||
"context" |
||||
|
||||
"go.opentelemetry.io/otel/baggage" |
||||
"go.opentelemetry.io/otel/propagation" |
||||
sdktrace "go.opentelemetry.io/otel/trace" |
||||
"google.golang.org/grpc/metadata" |
||||
) |
||||
|
||||
// assert that metadataSupplier implements the TextMapCarrier interface
|
||||
var _ propagation.TextMapCarrier = (*metadataSupplier)(nil) |
||||
|
||||
type metadataSupplier struct { |
||||
metadata *metadata.MD |
||||
} |
||||
|
||||
func (s *metadataSupplier) Get(key string) string { |
||||
values := s.metadata.Get(key) |
||||
if len(values) == 0 { |
||||
return "" |
||||
} |
||||
|
||||
return values[0] |
||||
} |
||||
|
||||
func (s *metadataSupplier) Set(key, value string) { |
||||
s.metadata.Set(key, value) |
||||
} |
||||
|
||||
func (s *metadataSupplier) Keys() []string { |
||||
out := make([]string, 0, len(*s.metadata)) |
||||
for key := range *s.metadata { |
||||
out = append(out, key) |
||||
} |
||||
|
||||
return out |
||||
} |
||||
|
||||
// Inject injects cross-cutting concerns from the ctx into the metadata.
|
||||
func Inject(ctx context.Context, p propagation.TextMapPropagator, metadata *metadata.MD) { |
||||
p.Inject(ctx, &metadataSupplier{ |
||||
metadata: metadata, |
||||
}) |
||||
} |
||||
|
||||
// Extract extracts the metadata from ctx.
|
||||
func Extract(ctx context.Context, p propagation.TextMapPropagator, metadata *metadata.MD) ( |
||||
baggage.Baggage, sdktrace.SpanContext) { |
||||
ctx = p.Extract(ctx, &metadataSupplier{ |
||||
metadata: metadata, |
||||
}) |
||||
|
||||
return baggage.FromContext(ctx), sdktrace.SpanContextFromContext(ctx) |
||||
} |
@ -1,356 +0,0 @@ |
||||
package trace |
||||
|
||||
import ( |
||||
"context" |
||||
"testing" |
||||
|
||||
"github.com/stretchr/testify/assert" |
||||
"github.com/stretchr/testify/require" |
||||
"go.opentelemetry.io/otel" |
||||
"go.opentelemetry.io/otel/propagation" |
||||
"go.opentelemetry.io/otel/trace" |
||||
"google.golang.org/grpc/metadata" |
||||
) |
||||
|
||||
const ( |
||||
traceIDStr = "4bf92f3577b34da6a3ce929d0e0e4736" |
||||
spanIDStr = "00f067aa0ba902b7" |
||||
) |
||||
|
||||
var ( |
||||
traceID = mustTraceIDFromHex(traceIDStr) |
||||
spanID = mustSpanIDFromHex(spanIDStr) |
||||
) |
||||
|
||||
func mustTraceIDFromHex(s string) (t trace.TraceID) { |
||||
var err error |
||||
t, err = trace.TraceIDFromHex(s) |
||||
if err != nil { |
||||
panic(err) |
||||
} |
||||
return |
||||
} |
||||
|
||||
func mustSpanIDFromHex(s string) (t trace.SpanID) { |
||||
var err error |
||||
t, err = trace.SpanIDFromHex(s) |
||||
if err != nil { |
||||
panic(err) |
||||
} |
||||
return |
||||
} |
||||
|
||||
func TestExtractValidTraceContext(t *testing.T) { |
||||
stateStr := "key1=value1,key2=value2" |
||||
state, err := trace.ParseTraceState(stateStr) |
||||
require.NoError(t, err) |
||||
|
||||
tests := []struct { |
||||
name string |
||||
traceparent string |
||||
tracestate string |
||||
sc trace.SpanContext |
||||
}{ |
||||
{ |
||||
name: "not sampled", |
||||
traceparent: "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-00", |
||||
sc: trace.NewSpanContext(trace.SpanContextConfig{ |
||||
TraceID: traceID, |
||||
SpanID: spanID, |
||||
Remote: true, |
||||
}), |
||||
}, |
||||
{ |
||||
name: "sampled", |
||||
traceparent: "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01", |
||||
sc: trace.NewSpanContext(trace.SpanContextConfig{ |
||||
TraceID: traceID, |
||||
SpanID: spanID, |
||||
TraceFlags: trace.FlagsSampled, |
||||
Remote: true, |
||||
}), |
||||
}, |
||||
{ |
||||
name: "valid tracestate", |
||||
traceparent: "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-00", |
||||
tracestate: stateStr, |
||||
sc: trace.NewSpanContext(trace.SpanContextConfig{ |
||||
TraceID: traceID, |
||||
SpanID: spanID, |
||||
TraceState: state, |
||||
Remote: true, |
||||
}), |
||||
}, |
||||
{ |
||||
name: "invalid tracestate perserves traceparent", |
||||
traceparent: "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-00", |
||||
tracestate: "invalid$@#=invalid", |
||||
sc: trace.NewSpanContext(trace.SpanContextConfig{ |
||||
TraceID: traceID, |
||||
SpanID: spanID, |
||||
Remote: true, |
||||
}), |
||||
}, |
||||
{ |
||||
name: "future version not sampled", |
||||
traceparent: "02-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-00", |
||||
sc: trace.NewSpanContext(trace.SpanContextConfig{ |
||||
TraceID: traceID, |
||||
SpanID: spanID, |
||||
Remote: true, |
||||
}), |
||||
}, |
||||
{ |
||||
name: "future version sampled", |
||||
traceparent: "02-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01", |
||||
sc: trace.NewSpanContext(trace.SpanContextConfig{ |
||||
TraceID: traceID, |
||||
SpanID: spanID, |
||||
TraceFlags: trace.FlagsSampled, |
||||
Remote: true, |
||||
}), |
||||
}, |
||||
{ |
||||
name: "future version sample bit set", |
||||
traceparent: "02-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-09", |
||||
sc: trace.NewSpanContext(trace.SpanContextConfig{ |
||||
TraceID: traceID, |
||||
SpanID: spanID, |
||||
TraceFlags: trace.FlagsSampled, |
||||
Remote: true, |
||||
}), |
||||
}, |
||||
{ |
||||
name: "future version sample bit not set", |
||||
traceparent: "02-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-08", |
||||
sc: trace.NewSpanContext(trace.SpanContextConfig{ |
||||
TraceID: traceID, |
||||
SpanID: spanID, |
||||
Remote: true, |
||||
}), |
||||
}, |
||||
{ |
||||
name: "future version additional data", |
||||
traceparent: "02-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-00-XYZxsf09", |
||||
sc: trace.NewSpanContext(trace.SpanContextConfig{ |
||||
TraceID: traceID, |
||||
SpanID: spanID, |
||||
Remote: true, |
||||
}), |
||||
}, |
||||
{ |
||||
name: "B3 format ending in dash", |
||||
traceparent: "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-00-", |
||||
sc: trace.NewSpanContext(trace.SpanContextConfig{ |
||||
TraceID: traceID, |
||||
SpanID: spanID, |
||||
Remote: true, |
||||
}), |
||||
}, |
||||
{ |
||||
name: "future version B3 format ending in dash", |
||||
traceparent: "03-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-00-", |
||||
sc: trace.NewSpanContext(trace.SpanContextConfig{ |
||||
TraceID: traceID, |
||||
SpanID: spanID, |
||||
Remote: true, |
||||
}), |
||||
}, |
||||
} |
||||
otel.SetTextMapPropagator(propagation.NewCompositeTextMapPropagator( |
||||
propagation.TraceContext{}, propagation.Baggage{})) |
||||
propagator := otel.GetTextMapPropagator() |
||||
|
||||
for _, tt := range tests { |
||||
t.Run(tt.name, func(t *testing.T) { |
||||
ctx := context.Background() |
||||
md := metadata.MD{} |
||||
md.Set("traceparent", tt.traceparent) |
||||
md.Set("tracestate", tt.tracestate) |
||||
_, spanCtx := Extract(ctx, propagator, &md) |
||||
assert.Equal(t, tt.sc, spanCtx) |
||||
}) |
||||
} |
||||
} |
||||
|
||||
func TestExtractInvalidTraceContext(t *testing.T) { |
||||
tests := []struct { |
||||
name string |
||||
header string |
||||
}{ |
||||
{ |
||||
name: "wrong version length", |
||||
header: "0000-00000000000000000000000000000000-0000000000000000-01", |
||||
}, |
||||
{ |
||||
name: "wrong trace ID length", |
||||
header: "00-ab00000000000000000000000000000000-cd00000000000000-01", |
||||
}, |
||||
{ |
||||
name: "wrong span ID length", |
||||
header: "00-ab000000000000000000000000000000-cd0000000000000000-01", |
||||
}, |
||||
{ |
||||
name: "wrong trace flag length", |
||||
header: "00-ab000000000000000000000000000000-cd00000000000000-0100", |
||||
}, |
||||
{ |
||||
name: "bogus version", |
||||
header: "qw-00000000000000000000000000000000-0000000000000000-01", |
||||
}, |
||||
{ |
||||
name: "bogus trace ID", |
||||
header: "00-qw000000000000000000000000000000-cd00000000000000-01", |
||||
}, |
||||
{ |
||||
name: "bogus span ID", |
||||
header: "00-ab000000000000000000000000000000-qw00000000000000-01", |
||||
}, |
||||
{ |
||||
name: "bogus trace flag", |
||||
header: "00-ab000000000000000000000000000000-cd00000000000000-qw", |
||||
}, |
||||
{ |
||||
name: "upper case version", |
||||
header: "A0-00000000000000000000000000000000-0000000000000000-01", |
||||
}, |
||||
{ |
||||
name: "upper case trace ID", |
||||
header: "00-AB000000000000000000000000000000-cd00000000000000-01", |
||||
}, |
||||
{ |
||||
name: "upper case span ID", |
||||
header: "00-ab000000000000000000000000000000-CD00000000000000-01", |
||||
}, |
||||
{ |
||||
name: "upper case trace flag", |
||||
header: "00-ab000000000000000000000000000000-cd00000000000000-A1", |
||||
}, |
||||
{ |
||||
name: "zero trace ID and span ID", |
||||
header: "00-00000000000000000000000000000000-0000000000000000-01", |
||||
}, |
||||
{ |
||||
name: "trace-flag unused bits set", |
||||
header: "00-ab000000000000000000000000000000-cd00000000000000-09", |
||||
}, |
||||
{ |
||||
name: "missing options", |
||||
header: "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7", |
||||
}, |
||||
{ |
||||
name: "empty options", |
||||
header: "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-", |
||||
}, |
||||
} |
||||
otel.SetTextMapPropagator(propagation.NewCompositeTextMapPropagator( |
||||
propagation.TraceContext{}, propagation.Baggage{})) |
||||
propagator := otel.GetTextMapPropagator() |
||||
|
||||
for _, tt := range tests { |
||||
t.Run(tt.name, func(t *testing.T) { |
||||
ctx := context.Background() |
||||
md := metadata.MD{} |
||||
md.Set("traceparent", tt.header) |
||||
_, spanCtx := Extract(ctx, propagator, &md) |
||||
assert.Equal(t, trace.SpanContext{}, spanCtx) |
||||
}) |
||||
} |
||||
} |
||||
|
||||
func TestInjectValidTraceContext(t *testing.T) { |
||||
stateStr := "key1=value1,key2=value2" |
||||
state, err := trace.ParseTraceState(stateStr) |
||||
require.NoError(t, err) |
||||
|
||||
tests := []struct { |
||||
name string |
||||
traceparent string |
||||
tracestate string |
||||
sc trace.SpanContext |
||||
}{ |
||||
{ |
||||
name: "not sampled", |
||||
traceparent: "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-00", |
||||
sc: trace.NewSpanContext(trace.SpanContextConfig{ |
||||
TraceID: traceID, |
||||
SpanID: spanID, |
||||
Remote: true, |
||||
}), |
||||
}, |
||||
{ |
||||
name: "sampled", |
||||
traceparent: "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01", |
||||
sc: trace.NewSpanContext(trace.SpanContextConfig{ |
||||
TraceID: traceID, |
||||
SpanID: spanID, |
||||
TraceFlags: trace.FlagsSampled, |
||||
Remote: true, |
||||
}), |
||||
}, |
||||
{ |
||||
name: "unsupported trace flag bits dropped", |
||||
traceparent: "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01", |
||||
sc: trace.NewSpanContext(trace.SpanContextConfig{ |
||||
TraceID: traceID, |
||||
SpanID: spanID, |
||||
TraceFlags: 0xff, |
||||
Remote: true, |
||||
}), |
||||
}, |
||||
{ |
||||
name: "with tracestate", |
||||
traceparent: "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-00", |
||||
tracestate: stateStr, |
||||
sc: trace.NewSpanContext(trace.SpanContextConfig{ |
||||
TraceID: traceID, |
||||
SpanID: spanID, |
||||
TraceState: state, |
||||
Remote: true, |
||||
}), |
||||
}, |
||||
} |
||||
otel.SetTextMapPropagator(propagation.NewCompositeTextMapPropagator( |
||||
propagation.TraceContext{}, propagation.Baggage{})) |
||||
propagator := otel.GetTextMapPropagator() |
||||
|
||||
for _, tt := range tests { |
||||
t.Run(tt.name, func(t *testing.T) { |
||||
ctx := context.Background() |
||||
ctx = trace.ContextWithRemoteSpanContext(ctx, tt.sc) |
||||
|
||||
want := metadata.MD{} |
||||
want.Set("traceparent", tt.traceparent) |
||||
if len(tt.tracestate) > 0 { |
||||
want.Set("tracestate", tt.tracestate) |
||||
} |
||||
|
||||
md := metadata.MD{} |
||||
Inject(ctx, propagator, &md) |
||||
assert.Equal(t, want, md) |
||||
|
||||
mm := &metadataSupplier{ |
||||
metadata: &md, |
||||
} |
||||
assert.NotEmpty(t, mm.Keys()) |
||||
}) |
||||
} |
||||
} |
||||
|
||||
func TestInvalidSpanContextDropped(t *testing.T) { |
||||
invalidSC := trace.SpanContext{} |
||||
require.False(t, invalidSC.IsValid()) |
||||
ctx := trace.ContextWithRemoteSpanContext(context.Background(), invalidSC) |
||||
|
||||
otel.SetTextMapPropagator(propagation.NewCompositeTextMapPropagator( |
||||
propagation.TraceContext{}, propagation.Baggage{})) |
||||
propagator := otel.GetTextMapPropagator() |
||||
|
||||
md := metadata.MD{} |
||||
Inject(ctx, propagator, &md) |
||||
mm := &metadataSupplier{ |
||||
metadata: &md, |
||||
} |
||||
assert.Empty(t, mm.Keys()) |
||||
assert.Equal(t, "", mm.Get("traceparent"), "injected invalid SpanContext") |
||||
} |
@ -1,90 +0,0 @@ |
||||
package trace |
||||
|
||||
import ( |
||||
"context" |
||||
"net" |
||||
"strings" |
||||
|
||||
ztrace "github.com/zeromicro/go-zero/internal/trace" |
||||
"go.opentelemetry.io/otel" |
||||
"go.opentelemetry.io/otel/attribute" |
||||
semconv "go.opentelemetry.io/otel/semconv/v1.4.0" |
||||
"go.opentelemetry.io/otel/trace" |
||||
"google.golang.org/grpc/peer" |
||||
) |
||||
|
||||
const localhost = "127.0.0.1" |
||||
|
||||
var ( |
||||
// SpanIDFromContext returns the span id from ctx.
|
||||
SpanIDFromContext = ztrace.SpanIDFromContext |
||||
// TraceIDFromContext returns the trace id from ctx.
|
||||
TraceIDFromContext = ztrace.TraceIDFromContext |
||||
) |
||||
|
||||
// ParseFullMethod returns the method name and attributes.
|
||||
func ParseFullMethod(fullMethod string) (string, []attribute.KeyValue) { |
||||
name := strings.TrimLeft(fullMethod, "/") |
||||
parts := strings.SplitN(name, "/", 2) |
||||
if len(parts) != 2 { |
||||
// Invalid format, does not follow `/package.service/method`.
|
||||
return name, []attribute.KeyValue(nil) |
||||
} |
||||
|
||||
var attrs []attribute.KeyValue |
||||
if service := parts[0]; service != "" { |
||||
attrs = append(attrs, semconv.RPCServiceKey.String(service)) |
||||
} |
||||
if method := parts[1]; method != "" { |
||||
attrs = append(attrs, semconv.RPCMethodKey.String(method)) |
||||
} |
||||
|
||||
return name, attrs |
||||
} |
||||
|
||||
// PeerAttr returns the peer attributes.
|
||||
func PeerAttr(addr string) []attribute.KeyValue { |
||||
host, port, err := net.SplitHostPort(addr) |
||||
if err != nil { |
||||
return nil |
||||
} |
||||
|
||||
if len(host) == 0 { |
||||
host = localhost |
||||
} |
||||
|
||||
return []attribute.KeyValue{ |
||||
semconv.NetPeerIPKey.String(host), |
||||
semconv.NetPeerPortKey.String(port), |
||||
} |
||||
} |
||||
|
||||
// PeerFromCtx returns the peer from ctx.
|
||||
func PeerFromCtx(ctx context.Context) string { |
||||
p, ok := peer.FromContext(ctx) |
||||
if !ok || p == nil { |
||||
return "" |
||||
} |
||||
|
||||
return p.Addr.String() |
||||
} |
||||
|
||||
// SpanInfo returns the span info.
|
||||
func SpanInfo(fullMethod, peerAddress string) (string, []attribute.KeyValue) { |
||||
attrs := []attribute.KeyValue{RPCSystemGRPC} |
||||
name, mAttrs := ParseFullMethod(fullMethod) |
||||
attrs = append(attrs, mAttrs...) |
||||
attrs = append(attrs, PeerAttr(peerAddress)...) |
||||
return name, attrs |
||||
} |
||||
|
||||
// TracerFromContext returns a tracer in ctx, otherwise returns a global tracer.
|
||||
func TracerFromContext(ctx context.Context) (tracer trace.Tracer) { |
||||
if span := trace.SpanFromContext(ctx); span.SpanContext().IsValid() { |
||||
tracer = span.TracerProvider().Tracer(TraceName) |
||||
} else { |
||||
tracer = otel.Tracer(TraceName) |
||||
} |
||||
|
||||
return |
||||
} |
@ -1,204 +0,0 @@ |
||||
package trace |
||||
|
||||
import ( |
||||
"context" |
||||
"net" |
||||
"testing" |
||||
|
||||
"github.com/stretchr/testify/assert" |
||||
"go.opentelemetry.io/otel" |
||||
"go.opentelemetry.io/otel/attribute" |
||||
"go.opentelemetry.io/otel/sdk/resource" |
||||
sdktrace "go.opentelemetry.io/otel/sdk/trace" |
||||
semconv "go.opentelemetry.io/otel/semconv/v1.4.0" |
||||
"go.opentelemetry.io/otel/trace" |
||||
"google.golang.org/grpc/peer" |
||||
) |
||||
|
||||
func TestPeerFromContext(t *testing.T) { |
||||
addrs, err := net.InterfaceAddrs() |
||||
assert.Nil(t, err) |
||||
assert.NotEmpty(t, addrs) |
||||
tests := []struct { |
||||
name string |
||||
ctx context.Context |
||||
empty bool |
||||
}{ |
||||
{ |
||||
name: "empty", |
||||
ctx: context.Background(), |
||||
empty: true, |
||||
}, |
||||
{ |
||||
name: "nil", |
||||
ctx: peer.NewContext(context.Background(), nil), |
||||
empty: true, |
||||
}, |
||||
{ |
||||
name: "with value", |
||||
ctx: peer.NewContext(context.Background(), &peer.Peer{ |
||||
Addr: addrs[0], |
||||
}), |
||||
}, |
||||
} |
||||
|
||||
for _, test := range tests { |
||||
test := test |
||||
t.Run(test.name, func(t *testing.T) { |
||||
t.Parallel() |
||||
addr := PeerFromCtx(test.ctx) |
||||
assert.Equal(t, test.empty, len(addr) == 0) |
||||
}) |
||||
} |
||||
} |
||||
|
||||
func TestParseFullMethod(t *testing.T) { |
||||
tests := []struct { |
||||
fullMethod string |
||||
name string |
||||
attr []attribute.KeyValue |
||||
}{ |
||||
{ |
||||
fullMethod: "/grpc.test.EchoService/Echo", |
||||
name: "grpc.test.EchoService/Echo", |
||||
attr: []attribute.KeyValue{ |
||||
semconv.RPCServiceKey.String("grpc.test.EchoService"), |
||||
semconv.RPCMethodKey.String("Echo"), |
||||
}, |
||||
}, { |
||||
fullMethod: "/com.example.ExampleRmiService/exampleMethod", |
||||
name: "com.example.ExampleRmiService/exampleMethod", |
||||
attr: []attribute.KeyValue{ |
||||
semconv.RPCServiceKey.String("com.example.ExampleRmiService"), |
||||
semconv.RPCMethodKey.String("exampleMethod"), |
||||
}, |
||||
}, { |
||||
fullMethod: "/MyCalcService.Calculator/Add", |
||||
name: "MyCalcService.Calculator/Add", |
||||
attr: []attribute.KeyValue{ |
||||
semconv.RPCServiceKey.String("MyCalcService.Calculator"), |
||||
semconv.RPCMethodKey.String("Add"), |
||||
}, |
||||
}, { |
||||
fullMethod: "/MyServiceReference.ICalculator/Add", |
||||
name: "MyServiceReference.ICalculator/Add", |
||||
attr: []attribute.KeyValue{ |
||||
semconv.RPCServiceKey.String("MyServiceReference.ICalculator"), |
||||
semconv.RPCMethodKey.String("Add"), |
||||
}, |
||||
}, { |
||||
fullMethod: "/MyServiceWithNoPackage/theMethod", |
||||
name: "MyServiceWithNoPackage/theMethod", |
||||
attr: []attribute.KeyValue{ |
||||
semconv.RPCServiceKey.String("MyServiceWithNoPackage"), |
||||
semconv.RPCMethodKey.String("theMethod"), |
||||
}, |
||||
}, { |
||||
fullMethod: "/pkg.svr", |
||||
name: "pkg.svr", |
||||
attr: []attribute.KeyValue(nil), |
||||
}, { |
||||
fullMethod: "/pkg.svr/", |
||||
name: "pkg.svr/", |
||||
attr: []attribute.KeyValue{ |
||||
semconv.RPCServiceKey.String("pkg.svr"), |
||||
}, |
||||
}, |
||||
} |
||||
|
||||
for _, test := range tests { |
||||
n, a := ParseFullMethod(test.fullMethod) |
||||
assert.Equal(t, test.name, n) |
||||
assert.Equal(t, test.attr, a) |
||||
} |
||||
} |
||||
|
||||
func TestSpanInfo(t *testing.T) { |
||||
val, kvs := SpanInfo("/fullMethod", "remote") |
||||
assert.Equal(t, "fullMethod", val) |
||||
assert.NotEmpty(t, kvs) |
||||
} |
||||
|
||||
func TestPeerAttr(t *testing.T) { |
||||
tests := []struct { |
||||
name string |
||||
addr string |
||||
expect []attribute.KeyValue |
||||
}{ |
||||
{ |
||||
name: "empty", |
||||
}, |
||||
{ |
||||
name: "port only", |
||||
addr: ":8080", |
||||
expect: []attribute.KeyValue{ |
||||
semconv.NetPeerIPKey.String(localhost), |
||||
semconv.NetPeerPortKey.String("8080"), |
||||
}, |
||||
}, |
||||
{ |
||||
name: "port only", |
||||
addr: "192.168.0.2:8080", |
||||
expect: []attribute.KeyValue{ |
||||
semconv.NetPeerIPKey.String("192.168.0.2"), |
||||
semconv.NetPeerPortKey.String("8080"), |
||||
}, |
||||
}, |
||||
} |
||||
|
||||
for _, test := range tests { |
||||
test := test |
||||
t.Run(test.name, func(t *testing.T) { |
||||
t.Parallel() |
||||
kvs := PeerAttr(test.addr) |
||||
assert.EqualValues(t, test.expect, kvs) |
||||
}) |
||||
} |
||||
} |
||||
|
||||
func TestTracerFromContext(t *testing.T) { |
||||
traceFn := func(ctx context.Context, hasTraceId bool) { |
||||
spanContext := trace.SpanContextFromContext(ctx) |
||||
assert.Equal(t, spanContext.IsValid(), hasTraceId) |
||||
parentTraceId := spanContext.TraceID().String() |
||||
|
||||
tracer := TracerFromContext(ctx) |
||||
_, span := tracer.Start(ctx, "b") |
||||
defer span.End() |
||||
|
||||
spanContext = span.SpanContext() |
||||
assert.True(t, spanContext.IsValid()) |
||||
if hasTraceId { |
||||
assert.Equal(t, parentTraceId, spanContext.TraceID().String()) |
||||
} |
||||
|
||||
} |
||||
|
||||
t.Run("context", func(t *testing.T) { |
||||
opts := []sdktrace.TracerProviderOption{ |
||||
// Set the sampling rate based on the parent span to 100%
|
||||
sdktrace.WithSampler(sdktrace.ParentBased(sdktrace.TraceIDRatioBased(1))), |
||||
// Record information about this application in a Resource.
|
||||
sdktrace.WithResource(resource.NewSchemaless(semconv.ServiceNameKey.String("test"))), |
||||
} |
||||
tp = sdktrace.NewTracerProvider(opts...) |
||||
otel.SetTracerProvider(tp) |
||||
ctx, span := tp.Tracer(TraceName).Start(context.Background(), "a") |
||||
|
||||
defer span.End() |
||||
traceFn(ctx, true) |
||||
}) |
||||
|
||||
t.Run("global", func(t *testing.T) { |
||||
opts := []sdktrace.TracerProviderOption{ |
||||
// Set the sampling rate based on the parent span to 100%
|
||||
sdktrace.WithSampler(sdktrace.ParentBased(sdktrace.TraceIDRatioBased(1))), |
||||
// Record information about this application in a Resource.
|
||||
sdktrace.WithResource(resource.NewSchemaless(semconv.ServiceNameKey.String("test"))), |
||||
} |
||||
tp = sdktrace.NewTracerProvider(opts...) |
||||
otel.SetTracerProvider(tp) |
||||
|
||||
traceFn(context.Background(), false) |
||||
}) |
||||
} |
@ -1,8 +0,0 @@ |
||||
package trace |
||||
|
||||
import "net/http" |
||||
|
||||
// TraceIdKey is the trace id header.
|
||||
// https://www.w3.org/TR/trace-context/#trace-id
|
||||
// May change it to trace-id afterwards.
|
||||
var TraceIdKey = http.CanonicalHeaderKey("x-trace-id") |
@ -1,11 +0,0 @@ |
||||
package devserver |
||||
|
||||
type Config struct { |
||||
Enabled bool `json:",default=true"` |
||||
Host string `json:",optional"` |
||||
Port int `json:",default=6470"` |
||||
MetricsPath string `json:",default=/metrics"` |
||||
HealthPath string `json:",default=/healthz"` |
||||
EnableMetrics bool `json:",default=true"` |
||||
EnablePprof bool `json:",default=true"` |
||||
} |
@ -1,73 +0,0 @@ |
||||
package devserver |
||||
|
||||
import ( |
||||
"encoding/json" |
||||
"fmt" |
||||
"git.diulo.com/mogfee/kit/core/logx" |
||||
"git.diulo.com/mogfee/kit/internal/health" |
||||
"github.com/felixge/fgprof" |
||||
"github.com/prometheus/client_golang/prometheus/promhttp" |
||||
"github.com/zeromicro/go-zero/core/threading" |
||||
"net/http" |
||||
"net/http/pprof" |
||||
"sync" |
||||
) |
||||
|
||||
var once sync.Once |
||||
|
||||
type Server struct { |
||||
config Config |
||||
server *http.ServeMux |
||||
routes []string |
||||
} |
||||
|
||||
func NewServer(cfg Config) *Server { |
||||
return &Server{ |
||||
config: cfg, |
||||
server: http.NewServeMux(), |
||||
} |
||||
} |
||||
func (s *Server) addRoutes() { |
||||
s.handleFunc("/", func(writer http.ResponseWriter, r *http.Request) { |
||||
_ = json.NewEncoder(writer).Encode(s.routes) |
||||
}) |
||||
//health
|
||||
s.handleFunc(s.config.HealthPath, health.CreateHttpHandler()) |
||||
//metrics
|
||||
if s.config.EnableMetrics { |
||||
s.handleFunc(s.config.MetricsPath, promhttp.Handler().ServeHTTP) |
||||
} |
||||
|
||||
if s.config.EnablePprof { |
||||
|
||||
s.handleFunc("/debug/fgprof", fgprof.Handler().(http.HandlerFunc)) |
||||
s.handleFunc("/debug/pprof/", pprof.Index) |
||||
s.handleFunc("/debug/pprof/cmdline", pprof.Cmdline) |
||||
s.handleFunc("/debug/pprof/profile", pprof.Profile) |
||||
s.handleFunc("/debug/pprof/symbol", pprof.Symbol) |
||||
s.handleFunc("/debug/pprof/trace", pprof.Trace) |
||||
} |
||||
} |
||||
func (s *Server) handleFunc(pattern string, handler http.HandlerFunc) { |
||||
s.server.HandleFunc(pattern, handler) |
||||
s.routes = append(s.routes, pattern) |
||||
} |
||||
|
||||
func (s *Server) StartASync() { |
||||
s.addRoutes() |
||||
threading.GoSafe(func() { |
||||
addr := fmt.Sprintf("%s:%d", s.config.Host, s.config.Port) |
||||
logx.Infof("Starting dev http server at %s", addr) |
||||
if err := http.ListenAndServe(addr, s.server); err != nil { |
||||
logx.Error(err) |
||||
} |
||||
}) |
||||
} |
||||
func StartAgent(c Config) { |
||||
once.Do(func() { |
||||
if c.Enabled { |
||||
s := NewServer(c) |
||||
s.StartASync() |
||||
} |
||||
}) |
||||
} |
@ -1,109 +0,0 @@ |
||||
package health |
||||
|
||||
import ( |
||||
"fmt" |
||||
"github.com/zeromicro/go-zero/core/syncx" |
||||
"net/http" |
||||
"strings" |
||||
"sync" |
||||
) |
||||
|
||||
var defaultHealthManager = newComboHealthManager() |
||||
|
||||
type ( |
||||
Probe interface { |
||||
MarkReady() |
||||
MarkNotReady() |
||||
IsReady() bool |
||||
Name() string |
||||
} |
||||
healthManager struct { |
||||
ready syncx.AtomicBool |
||||
name string |
||||
} |
||||
comboHealthManager struct { |
||||
mu sync.Mutex |
||||
probes []Probe |
||||
} |
||||
) |
||||
|
||||
func AddProbe(probe Probe) { |
||||
defaultHealthManager.addProbe(probe) |
||||
} |
||||
func CreateHttpHandler() http.HandlerFunc { |
||||
return func(w http.ResponseWriter, r *http.Request) { |
||||
if defaultHealthManager.IsReady() { |
||||
_, _ = w.Write([]byte("OK")) |
||||
} else { |
||||
http.Error(w, "Service Unavailable\n"+defaultHealthManager.verboseInfo(), |
||||
http.StatusServiceUnavailable) |
||||
} |
||||
} |
||||
} |
||||
func NewHealthManager(name string) Probe { |
||||
return &healthManager{ |
||||
name: name, |
||||
} |
||||
} |
||||
|
||||
func (h *healthManager) MarkReady() { |
||||
h.ready.Set(true) |
||||
} |
||||
|
||||
func (h *healthManager) MarkNotReady() { |
||||
h.ready.Set(false) |
||||
} |
||||
|
||||
func (h *healthManager) IsReady() bool { |
||||
return h.ready.True() |
||||
} |
||||
|
||||
func (h *healthManager) Name() string { |
||||
return h.name |
||||
} |
||||
func newComboHealthManager() *comboHealthManager { |
||||
return &comboHealthManager{} |
||||
} |
||||
|
||||
func (p *comboHealthManager) MarkReady() { |
||||
p.mu.Lock() |
||||
defer p.mu.Unlock() |
||||
for _, probe := range p.probes { |
||||
probe.MarkReady() |
||||
} |
||||
} |
||||
func (p *comboHealthManager) MarkNotReady() { |
||||
p.mu.Lock() |
||||
defer p.mu.Unlock() |
||||
for _, probe := range p.probes { |
||||
probe.MarkNotReady() |
||||
} |
||||
} |
||||
func (p *comboHealthManager) IsReady() bool { |
||||
p.mu.Lock() |
||||
defer p.mu.Unlock() |
||||
for _, probe := range p.probes { |
||||
if !probe.IsReady() { |
||||
return false |
||||
} |
||||
} |
||||
return true |
||||
} |
||||
func (p *comboHealthManager) verboseInfo() string { |
||||
p.mu.Lock() |
||||
defer p.mu.Unlock() |
||||
var info strings.Builder |
||||
for _, probe := range p.probes { |
||||
if probe.IsReady() { |
||||
info.WriteString(fmt.Sprintf("%s is ready\n", probe.Name())) |
||||
} else { |
||||
info.WriteString(fmt.Sprintf("%s is not ready\n", probe.Name())) |
||||
} |
||||
} |
||||
return info.String() |
||||
} |
||||
func (p *comboHealthManager) addProbe(probe Probe) { |
||||
p.mu.Lock() |
||||
defer p.mu.Unlock() |
||||
p.probes = append(p.probes, probe) |
||||
} |
Loading…
Reference in new issue