mirror of
https://github.com/cloudflare/cloudflared.git
synced 2025-07-27 19:29:57 +00:00
TUN-5989: Add in-memory otlp exporter
This commit is contained in:
90
tracing/client.go
Normal file
90
tracing/client.go
Normal file
@@ -0,0 +1,90 @@
|
||||
package tracing
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/base64"
|
||||
"errors"
|
||||
"sync"
|
||||
|
||||
coltracepb "go.opentelemetry.io/proto/otlp/collector/trace/v1"
|
||||
tracepb "go.opentelemetry.io/proto/otlp/trace/v1"
|
||||
"google.golang.org/protobuf/proto"
|
||||
)
|
||||
|
||||
const (
|
||||
maxTraceAmount = 20
|
||||
)
|
||||
|
||||
var (
|
||||
errNoTraces = errors.New("no traces recorded to be exported")
|
||||
)
|
||||
|
||||
type InMemoryClient interface {
|
||||
// Spans returns a copy of the list of in-memory stored spans as a base64
|
||||
// encoded otlp protobuf string.
|
||||
Spans() (string, error)
|
||||
}
|
||||
|
||||
// InMemoryOtlpClient is a client implementation for otlptrace.Client
|
||||
type InMemoryOtlpClient struct {
|
||||
mu sync.Mutex
|
||||
spans []*tracepb.ResourceSpans
|
||||
}
|
||||
|
||||
func (mc *InMemoryOtlpClient) Start(_ context.Context) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (mc *InMemoryOtlpClient) Stop(_ context.Context) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// UploadTraces adds the provided list of spans to the in-memory list.
|
||||
func (mc *InMemoryOtlpClient) UploadTraces(_ context.Context, protoSpans []*tracepb.ResourceSpans) error {
|
||||
mc.mu.Lock()
|
||||
defer mc.mu.Unlock()
|
||||
// Catch to make sure too many traces aren't being added to response header.
|
||||
// Returning nil makes sure we don't fail to send the traces we already recorded.
|
||||
if len(mc.spans)+len(protoSpans) > maxTraceAmount {
|
||||
return nil
|
||||
}
|
||||
mc.spans = append(mc.spans, protoSpans...)
|
||||
return nil
|
||||
}
|
||||
|
||||
// Spans returns the list of in-memory stored spans as a base64 encoded otlp protobuf string.
|
||||
func (mc *InMemoryOtlpClient) Spans() (string, error) {
|
||||
mc.mu.Lock()
|
||||
defer mc.mu.Unlock()
|
||||
if len(mc.spans) <= 0 {
|
||||
return "", errNoTraces
|
||||
}
|
||||
pbRequest := &coltracepb.ExportTraceServiceRequest{
|
||||
ResourceSpans: mc.spans,
|
||||
}
|
||||
data, err := proto.Marshal(pbRequest)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
return base64.StdEncoding.EncodeToString(data), nil
|
||||
}
|
||||
|
||||
// NoopOtlpClient is a client implementation for otlptrace.Client that does nothing
|
||||
type NoopOtlpClient struct{}
|
||||
|
||||
func (mc *NoopOtlpClient) Start(_ context.Context) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (mc *NoopOtlpClient) Stop(_ context.Context) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (mc *NoopOtlpClient) UploadTraces(_ context.Context, _ []*tracepb.ResourceSpans) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Spans always returns no traces error
|
||||
func (mc *NoopOtlpClient) Spans() (string, error) {
|
||||
return "", errNoTraces
|
||||
}
|
161
tracing/client_test.go
Normal file
161
tracing/client_test.go
Normal file
@@ -0,0 +1,161 @@
|
||||
package tracing
|
||||
|
||||
import (
|
||||
"context"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
"go.opentelemetry.io/otel/exporters/otlp/otlptrace"
|
||||
semconv "go.opentelemetry.io/otel/semconv/v1.7.0"
|
||||
"go.opentelemetry.io/otel/trace"
|
||||
commonpb "go.opentelemetry.io/proto/otlp/common/v1"
|
||||
resourcepb "go.opentelemetry.io/proto/otlp/resource/v1"
|
||||
tracepb "go.opentelemetry.io/proto/otlp/trace/v1"
|
||||
)
|
||||
|
||||
const (
|
||||
resourceSchemaUrl = "http://example.com/custom-resource-schema"
|
||||
instrumentSchemaUrl = semconv.SchemaURL
|
||||
)
|
||||
|
||||
var (
|
||||
traceId = []byte{0x00, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, 0x09, 0x0A, 0x0B, 0x0C, 0x0D, 0x0E, 0x0F}
|
||||
spanId = []byte{0xFF, 0xFE, 0xFD, 0xFC, 0xFB, 0xFA, 0xF9, 0xF8}
|
||||
parentSpanId = []byte{0x0F, 0x0E, 0x0D, 0x0C, 0x0B, 0x0A, 0x09, 0x08}
|
||||
startTime = time.Date(2022, 4, 4, 0, 0, 0, 0, time.UTC)
|
||||
endTime = startTime.Add(5 * time.Second)
|
||||
|
||||
traceState, _ = trace.ParseTraceState("key1=val1,key2=val2")
|
||||
instrScope = &commonpb.InstrumentationScope{Name: "go.opentelemetry.io/test/otel", Version: "v1.6.0"}
|
||||
otlpKeyValues = []*commonpb.KeyValue{
|
||||
{
|
||||
Key: "string_key",
|
||||
Value: &commonpb.AnyValue{
|
||||
Value: &commonpb.AnyValue_StringValue{
|
||||
StringValue: "string value",
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
Key: "bool_key",
|
||||
Value: &commonpb.AnyValue{
|
||||
Value: &commonpb.AnyValue_BoolValue{
|
||||
BoolValue: true,
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
otlpResource = &resourcepb.Resource{
|
||||
Attributes: []*commonpb.KeyValue{
|
||||
{
|
||||
Key: "service.name",
|
||||
Value: &commonpb.AnyValue{
|
||||
Value: &commonpb.AnyValue_StringValue{
|
||||
StringValue: "service-name",
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
)
|
||||
|
||||
var _ otlptrace.Client = (*InMemoryOtlpClient)(nil)
|
||||
var _ InMemoryClient = (*InMemoryOtlpClient)(nil)
|
||||
var _ otlptrace.Client = (*NoopOtlpClient)(nil)
|
||||
var _ InMemoryClient = (*NoopOtlpClient)(nil)
|
||||
|
||||
func TestUploadTraces(t *testing.T) {
|
||||
client := &InMemoryOtlpClient{}
|
||||
spans := createResourceSpans([]*tracepb.Span{createOtlpSpan(traceId)})
|
||||
spans2 := createResourceSpans([]*tracepb.Span{createOtlpSpan(traceId)})
|
||||
err := client.UploadTraces(context.Background(), spans)
|
||||
assert.NoError(t, err)
|
||||
err = client.UploadTraces(context.Background(), spans2)
|
||||
assert.NoError(t, err)
|
||||
assert.Len(t, client.spans, 2)
|
||||
}
|
||||
|
||||
func TestSpans(t *testing.T) {
|
||||
client := &InMemoryOtlpClient{}
|
||||
spans := createResourceSpans([]*tracepb.Span{createOtlpSpan(traceId)})
|
||||
err := client.UploadTraces(context.Background(), spans)
|
||||
assert.NoError(t, err)
|
||||
assert.Len(t, client.spans, 1)
|
||||
enc, err := client.Spans()
|
||||
assert.NoError(t, err)
|
||||
expected := "CsECCiAKHgoMc2VydmljZS5uYW1lEg4KDHNlcnZpY2UtbmFtZRLxAQonCh1nby5vcGVudGVsZW1ldHJ5LmlvL3Rlc3Qvb3RlbBIGdjEuNi4wEp0BChAAAQIDBAUGBwgJCgsMDQ4PEgj//v38+/r5+BoTa2V5MT12YWwxLGtleTI9dmFsMiIIDw4NDAsKCQgqCnRyYWNlX25hbWUwATkAANJvaYjiFkEA8teZaojiFkocCgpzdHJpbmdfa2V5Eg4KDHN0cmluZyB2YWx1ZUoOCghib29sX2tleRICEAF6EhIOc3RhdHVzIG1lc3NhZ2UYARomaHR0cHM6Ly9vcGVudGVsZW1ldHJ5LmlvL3NjaGVtYXMvMS43LjAaKWh0dHA6Ly9leGFtcGxlLmNvbS9jdXN0b20tcmVzb3VyY2Utc2NoZW1h"
|
||||
assert.Equal(t, expected, enc)
|
||||
}
|
||||
|
||||
func TestSpansEmpty(t *testing.T) {
|
||||
client := &InMemoryOtlpClient{}
|
||||
err := client.UploadTraces(context.Background(), []*tracepb.ResourceSpans{})
|
||||
assert.NoError(t, err)
|
||||
assert.Len(t, client.spans, 0)
|
||||
_, err = client.Spans()
|
||||
assert.ErrorIs(t, err, errNoTraces)
|
||||
}
|
||||
|
||||
func TestSpansNil(t *testing.T) {
|
||||
client := &InMemoryOtlpClient{}
|
||||
err := client.UploadTraces(context.Background(), nil)
|
||||
assert.NoError(t, err)
|
||||
assert.Len(t, client.spans, 0)
|
||||
_, err = client.Spans()
|
||||
assert.ErrorIs(t, err, errNoTraces)
|
||||
}
|
||||
|
||||
func TestSpansTooManySpans(t *testing.T) {
|
||||
client := &InMemoryOtlpClient{}
|
||||
for i := 0; i < maxTraceAmount+1; i++ {
|
||||
spans := createResourceSpans([]*tracepb.Span{createOtlpSpan(traceId)})
|
||||
err := client.UploadTraces(context.Background(), spans)
|
||||
assert.NoError(t, err)
|
||||
}
|
||||
assert.Len(t, client.spans, maxTraceAmount)
|
||||
_, err := client.Spans()
|
||||
assert.NoError(t, err)
|
||||
}
|
||||
|
||||
func createResourceSpans(spans []*tracepb.Span) []*tracepb.ResourceSpans {
|
||||
return []*tracepb.ResourceSpans{createResourceSpan(spans)}
|
||||
}
|
||||
|
||||
func createResourceSpan(spans []*tracepb.Span) *tracepb.ResourceSpans {
|
||||
return &tracepb.ResourceSpans{
|
||||
Resource: otlpResource,
|
||||
ScopeSpans: []*tracepb.ScopeSpans{
|
||||
{
|
||||
Scope: instrScope,
|
||||
Spans: spans,
|
||||
SchemaUrl: instrumentSchemaUrl,
|
||||
},
|
||||
},
|
||||
InstrumentationLibrarySpans: nil,
|
||||
SchemaUrl: resourceSchemaUrl,
|
||||
}
|
||||
}
|
||||
|
||||
func createOtlpSpan(tid []byte) *tracepb.Span {
|
||||
return &tracepb.Span{
|
||||
TraceId: tid,
|
||||
SpanId: spanId,
|
||||
TraceState: traceState.String(),
|
||||
ParentSpanId: parentSpanId,
|
||||
Name: "trace_name",
|
||||
Kind: tracepb.Span_SPAN_KIND_INTERNAL,
|
||||
StartTimeUnixNano: uint64(startTime.UnixNano()),
|
||||
EndTimeUnixNano: uint64(endTime.UnixNano()),
|
||||
Attributes: otlpKeyValues,
|
||||
DroppedAttributesCount: 0,
|
||||
Events: nil,
|
||||
DroppedEventsCount: 0,
|
||||
Links: nil,
|
||||
DroppedLinksCount: 0,
|
||||
Status: &tracepb.Status{
|
||||
Message: "status message",
|
||||
Code: tracepb.Status_STATUS_CODE_OK,
|
||||
},
|
||||
}
|
||||
}
|
116
tracing/tracing.go
Normal file
116
tracing/tracing.go
Normal file
@@ -0,0 +1,116 @@
|
||||
package tracing
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"net/http"
|
||||
|
||||
otelContrib "go.opentelemetry.io/contrib/propagators/Jaeger"
|
||||
"go.opentelemetry.io/otel"
|
||||
"go.opentelemetry.io/otel/attribute"
|
||||
"go.opentelemetry.io/otel/codes"
|
||||
"go.opentelemetry.io/otel/exporters/otlp/otlptrace"
|
||||
"go.opentelemetry.io/otel/propagation"
|
||||
"go.opentelemetry.io/otel/sdk/resource"
|
||||
tracesdk "go.opentelemetry.io/otel/sdk/trace"
|
||||
semconv "go.opentelemetry.io/otel/semconv/v1.7.0"
|
||||
"go.opentelemetry.io/otel/trace"
|
||||
)
|
||||
|
||||
const (
|
||||
service = "cloudflared"
|
||||
tracerInstrumentName = "origin"
|
||||
|
||||
tracerContextName = "cf-trace-id"
|
||||
tracerContextNameOverride = "uber-trace-id"
|
||||
)
|
||||
|
||||
var (
|
||||
Http2TransportAttribute = trace.WithAttributes(TransportAttributeKey.String("http2"))
|
||||
QuicTransportAttribute = trace.WithAttributes(TransportAttributeKey.String("quic"))
|
||||
|
||||
TransportAttributeKey = attribute.Key("transport")
|
||||
TrafficAttributeKey = attribute.Key("traffic")
|
||||
|
||||
errNoopTracerProvider = errors.New("noop tracer provider records no spans")
|
||||
)
|
||||
|
||||
func init() {
|
||||
// Register the jaeger propagator globally.
|
||||
otel.SetTextMapPropagator(otelContrib.Jaeger{})
|
||||
}
|
||||
|
||||
type TracedRequest struct {
|
||||
*http.Request
|
||||
trace.TracerProvider
|
||||
exporter InMemoryClient
|
||||
}
|
||||
|
||||
// NewTracedRequest creates a new tracer for the current request context.
|
||||
func NewTracedRequest(req *http.Request) *TracedRequest {
|
||||
ctx, exists := extractTrace(req)
|
||||
if !exists {
|
||||
return &TracedRequest{req, trace.NewNoopTracerProvider(), &NoopOtlpClient{}}
|
||||
}
|
||||
mc := new(InMemoryOtlpClient)
|
||||
exp, err := otlptrace.New(req.Context(), mc)
|
||||
if err != nil {
|
||||
return &TracedRequest{req, trace.NewNoopTracerProvider(), &NoopOtlpClient{}}
|
||||
}
|
||||
tp := tracesdk.NewTracerProvider(
|
||||
// We want to dump to in-memory exporter immediately
|
||||
tracesdk.WithSyncer(exp),
|
||||
// Record information about this application in a Resource.
|
||||
tracesdk.WithResource(resource.NewWithAttributes(
|
||||
semconv.SchemaURL,
|
||||
semconv.ServiceNameKey.String(service),
|
||||
)),
|
||||
)
|
||||
|
||||
return &TracedRequest{req.WithContext(ctx), tp, mc}
|
||||
}
|
||||
|
||||
func (cft *TracedRequest) Tracer() trace.Tracer {
|
||||
return cft.TracerProvider.Tracer(tracerInstrumentName)
|
||||
}
|
||||
|
||||
// Spans returns the spans as base64 encoded protobuf otlp traces.
|
||||
func (cft *TracedRequest) Spans() (string, error) {
|
||||
return cft.exporter.Spans()
|
||||
}
|
||||
|
||||
// EndWithStatus will set a status for the span and then end it.
|
||||
func EndWithStatus(span trace.Span, code codes.Code, status string) {
|
||||
if span == nil {
|
||||
return
|
||||
}
|
||||
span.SetStatus(code, status)
|
||||
span.End()
|
||||
}
|
||||
|
||||
// extractTrace attempts to check for a cf-trace-id from a request header.
|
||||
func extractTrace(req *http.Request) (context.Context, bool) {
|
||||
// Only add tracing for requests with appropriately tagged headers
|
||||
remoteTraces := req.Header.Values(tracerContextName)
|
||||
if len(remoteTraces) <= 0 {
|
||||
// Strip the cf-trace-id header
|
||||
req.Header.Del(tracerContextName)
|
||||
return nil, false
|
||||
}
|
||||
|
||||
traceHeader := make(map[string]string, 1)
|
||||
for _, t := range remoteTraces {
|
||||
// Override the 'cf-trace-id' as 'uber-trace-id' so the jaeger propagator can extract it.
|
||||
// Last entry wins if multiple provided
|
||||
traceHeader[tracerContextNameOverride] = t
|
||||
}
|
||||
|
||||
// Strip the cf-trace-id header
|
||||
req.Header.Del(tracerContextName)
|
||||
|
||||
if traceHeader[tracerContextNameOverride] == "" {
|
||||
return nil, false
|
||||
}
|
||||
remoteCtx := otel.GetTextMapPropagator().Extract(req.Context(), propagation.MapCarrier(traceHeader))
|
||||
return remoteCtx, true
|
||||
}
|
50
tracing/tracing_test.go
Normal file
50
tracing/tracing_test.go
Normal file
@@ -0,0 +1,50 @@
|
||||
package tracing
|
||||
|
||||
import (
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
tracesdk "go.opentelemetry.io/otel/sdk/trace"
|
||||
"go.opentelemetry.io/otel/trace"
|
||||
)
|
||||
|
||||
func TestNewCfTracer(t *testing.T) {
|
||||
req := httptest.NewRequest("GET", "http://localhost", nil)
|
||||
req.Header.Add(tracerContextName, "14cb070dde8e51fc5ae8514e69ba42ca:b38f1bf5eae406f3:0:1")
|
||||
tr := NewTracedRequest(req)
|
||||
assert.NotNil(t, tr)
|
||||
assert.IsType(t, tracesdk.NewTracerProvider(), tr.TracerProvider)
|
||||
assert.IsType(t, &InMemoryOtlpClient{}, tr.exporter)
|
||||
}
|
||||
|
||||
func TestNewCfTracerMultiple(t *testing.T) {
|
||||
req := httptest.NewRequest("GET", "http://localhost", nil)
|
||||
req.Header.Add(tracerContextName, "1241ce3ecdefc68854e8514e69ba42ca:b38f1bf5eae406f3:0:1")
|
||||
req.Header.Add(tracerContextName, "14cb070dde8e51fc5ae8514e69ba42ca:b38f1bf5eae406f3:0:1")
|
||||
tr := NewTracedRequest(req)
|
||||
assert.NotNil(t, tr)
|
||||
assert.IsType(t, tracesdk.NewTracerProvider(), tr.TracerProvider)
|
||||
assert.IsType(t, &InMemoryOtlpClient{}, tr.exporter)
|
||||
}
|
||||
|
||||
func TestNewCfTracerNilHeader(t *testing.T) {
|
||||
req := httptest.NewRequest("GET", "http://localhost", nil)
|
||||
req.Header[http.CanonicalHeaderKey(tracerContextName)] = nil
|
||||
tr := NewTracedRequest(req)
|
||||
assert.NotNil(t, tr)
|
||||
assert.IsType(t, trace.NewNoopTracerProvider(), tr.TracerProvider)
|
||||
assert.IsType(t, &NoopOtlpClient{}, tr.exporter)
|
||||
}
|
||||
|
||||
func TestNewCfTracerInvalidHeaders(t *testing.T) {
|
||||
req := httptest.NewRequest("GET", "http://localhost", nil)
|
||||
for _, test := range [][]string{nil, {""}} {
|
||||
req.Header[http.CanonicalHeaderKey(tracerContextName)] = test
|
||||
tr := NewTracedRequest(req)
|
||||
assert.NotNil(t, tr)
|
||||
assert.IsType(t, trace.NewNoopTracerProvider(), tr.TracerProvider)
|
||||
assert.IsType(t, &NoopOtlpClient{}, tr.exporter)
|
||||
}
|
||||
}
|
Reference in New Issue
Block a user