1
0
Fork 0

Added integration support for DataDog APM Tracing

This commit is contained in:
Alex Antonov 2018-06-28 11:40:04 -05:00 committed by Traefiker Bot
parent ba8c9295ac
commit 3192307d59
61 changed files with 9999 additions and 5 deletions

View file

@ -0,0 +1,42 @@
package tracer
import (
"context"
"gopkg.in/DataDog/dd-trace-go.v1/ddtrace"
"gopkg.in/DataDog/dd-trace-go.v1/ddtrace/internal"
)
type contextKey struct{}
var activeSpanKey = contextKey{}
// ContextWithSpan returns a copy of the given context which includes the span s.
func ContextWithSpan(ctx context.Context, s Span) context.Context {
return context.WithValue(ctx, activeSpanKey, s)
}
// SpanFromContext returns the span contained in the given context. A second return
// value indicates if a span was found in the context. If no span is found, a no-op
// span is returned.
func SpanFromContext(ctx context.Context) (Span, bool) {
if ctx == nil {
return &internal.NoopSpan{}, false
}
v := ctx.Value(activeSpanKey)
if s, ok := v.(ddtrace.Span); ok {
return s, true
}
return &internal.NoopSpan{}, false
}
// StartSpanFromContext returns a new span with the given operation name and options. If a span
// is found in the context, it will be used as the parent of the resulting span. If the ChildOf
// option is passed, the span from context will take precedence over it as the parent span.
func StartSpanFromContext(ctx context.Context, operationName string, opts ...StartSpanOption) (Span, context.Context) {
if s, ok := SpanFromContext(ctx); ok {
opts = append(opts, ChildOf(s.Context()))
}
s := StartSpan(operationName, opts...)
return s, ContextWithSpan(ctx, s)
}

View file

@ -0,0 +1,50 @@
// Package tracer contains Datadog's core tracing client. It is used to trace
// requests as they flow across web servers, databases and microservices, giving
// developers visibility into bottlenecks and troublesome requests. To start the
// tracer, simply call the start method along with an optional set of options.
// By default, the trace agent is considered to be found at "localhost:8126". In a
// setup where this would be different (let's say 127.0.0.1:1234), we could do:
// tracer.Start(tracer.WithAgentAddr("127.0.0.1:1234"))
// defer tracer.Stop()
//
// The tracing client can perform trace sampling. While the trace agent
// already samples traces to reduce bandwidth usage, client sampling reduces
// performance overhead. To make use of it, the package comes with a ready-to-use
// rate sampler that can be passed to the tracer. To use it and keep only 30% of the
// requests, one would do:
// s := tracer.NewRateSampler(0.3)
// tracer.Start(tracer.WithSampler(s))
//
// All spans created by the tracer contain a context hereby referred to as the span
// context. Note that this is different from Go's context. The span context is used
// to package essential information from a span, which is needed when creating child
// spans that inherit from it. Thus, a child span is created from a span's span context.
// The span context can originate from within the same process, but also a
// different process or even a different machine in the case of distributed tracing.
//
// To make use of distributed tracing, a span's context may be injected via a carrier
// into a transport (HTTP, RPC, etc.) to be extracted on the other end and used to
// create spans that are direct descendants of it. A couple of carrier interfaces
// which should cover most of the use-case scenarios are readily provided, such as
// HTTPCarrier and TextMapCarrier. Users are free to create their own, which will work
// with our propagation algorithm as long as they implement the TextMapReader and TextMapWriter
// interfaces. An example alternate implementation is the MDCarrier in our gRPC integration.
//
// As an example, injecting a span's context into an HTTP request would look like this:
// req, err := http.NewRequest("GET", "http://example.com", nil)
// // ...
// err := tracer.Inject(span.Context(), tracer.HTTPHeadersCarrier(req.Header))
// // ...
// http.DefaultClient.Do(req)
// Then, on the server side, to continue the trace one would do:
// sctx, err := tracer.Extract(tracer.HTTPHeadersCarrier(req.Header))
// // ...
// span := tracer.StartSpan("child.span", tracer.ChildOf(sctx))
// In the same manner, any means can be used as a carrier to inject a context into a transport. Go's
// context can also be used as a means to transport spans within the same process. The methods
// StartSpanFromContext, ContextWithSpan and SpanFromContext exist for this reason.
//
// Some libraries and frameworks are supported out-of-the-box by using one
// of our integrations. You can see a list of supported integrations here:
// https://godoc.org/gopkg.in/DataDog/dd-trace-go.v1/contrib
package tracer // import "gopkg.in/DataDog/dd-trace-go.v1/ddtrace/tracer"

View file

@ -0,0 +1,69 @@
package tracer
import (
"fmt"
"log"
"strconv"
)
var errorPrefix = fmt.Sprintf("Datadog Tracer Error (%s): ", tracerVersion)
type traceEncodingError struct{ context error }
func (e *traceEncodingError) Error() string {
return fmt.Sprintf("error encoding trace: %s", e.context)
}
type spanBufferFullError struct{}
func (e *spanBufferFullError) Error() string {
return fmt.Sprintf("trace span cap (%d) reached, dropping trace", traceMaxSize)
}
type dataLossError struct {
count int // number of items lost
context error // any context error, if available
}
func (e *dataLossError) Error() string {
return fmt.Sprintf("lost traces (count: %d), error: %v", e.count, e.context)
}
type errorSummary struct {
Count int
Example string
}
func aggregateErrors(errChan <-chan error) map[string]errorSummary {
errs := make(map[string]errorSummary, len(errChan))
for {
select {
case err := <-errChan:
if err == nil {
break
}
key := fmt.Sprintf("%T", err)
summary := errs[key]
summary.Count++
summary.Example = err.Error()
errs[key] = summary
default: // stop when there's no more data
return errs
}
}
}
// logErrors logs the errors, preventing log file flooding, when there
// are many messages, it caps them and shows a quick summary.
// As of today it only logs using standard golang log package, but
// later we could send those stats to agent // TODO(ufoot).
func logErrors(errChan <-chan error) {
errs := aggregateErrors(errChan)
for _, v := range errs {
var repeat string
if v.Count > 1 {
repeat = " (repeated " + strconv.Itoa(v.Count) + " times)"
}
log.Println(errorPrefix + v.Example + repeat)
}
}

View file

@ -0,0 +1,163 @@
package tracer
import (
"os"
"path/filepath"
"time"
"gopkg.in/DataDog/dd-trace-go.v1/ddtrace"
"gopkg.in/DataDog/dd-trace-go.v1/ddtrace/ext"
)
// config holds the tracer configuration.
type config struct {
// debug, when true, writes details to logs.
debug bool
// serviceName specifies the name of this application.
serviceName string
// sampler specifies the sampler that will be used for sampling traces.
sampler Sampler
// agentAddr specifies the hostname and of the agent where the traces
// are sent to.
agentAddr string
// globalTags holds a set of tags that will be automatically applied to
// all spans.
globalTags map[string]interface{}
// transport specifies the Transport interface which will be used to send data to the agent.
transport transport
// propagator propagates span context cross-process
propagator Propagator
}
// StartOption represents a function that can be provided as a parameter to Start.
type StartOption func(*config)
// defaults sets the default values for a config.
func defaults(c *config) {
c.serviceName = filepath.Base(os.Args[0])
c.sampler = NewAllSampler()
c.agentAddr = defaultAddress
}
// WithDebugMode enables debug mode on the tracer, resulting in more verbose logging.
func WithDebugMode(enabled bool) StartOption {
return func(c *config) {
c.debug = enabled
}
}
// WithPropagator sets an alternative propagator to be used by the tracer.
func WithPropagator(p Propagator) StartOption {
return func(c *config) {
c.propagator = p
}
}
// WithServiceName sets the default service name to be used with the tracer.
func WithServiceName(name string) StartOption {
return func(c *config) {
c.serviceName = name
}
}
// WithAgentAddr sets the address where the agent is located. The default is
// localhost:8126. It should contain both host and port.
func WithAgentAddr(addr string) StartOption {
return func(c *config) {
c.agentAddr = addr
}
}
// WithGlobalTag sets a key/value pair which will be set as a tag on all spans
// created by tracer. This option may be used multiple times.
func WithGlobalTag(k string, v interface{}) StartOption {
return func(c *config) {
if c.globalTags == nil {
c.globalTags = make(map[string]interface{})
}
c.globalTags[k] = v
}
}
// WithSampler sets the given sampler to be used with the tracer. By default
// an all-permissive sampler is used.
func WithSampler(s Sampler) StartOption {
return func(c *config) {
c.sampler = s
}
}
// StartSpanOption is a configuration option for StartSpan. It is aliased in order
// to help godoc group all the functions returning it together. It is considered
// more correct to refer to it as the type as the origin, ddtrace.StartSpanOption.
type StartSpanOption = ddtrace.StartSpanOption
// Tag sets the given key/value pair as a tag on the started Span.
func Tag(k string, v interface{}) StartSpanOption {
return func(cfg *ddtrace.StartSpanConfig) {
if cfg.Tags == nil {
cfg.Tags = map[string]interface{}{}
}
cfg.Tags[k] = v
}
}
// ServiceName sets the given service name on the started span. For example "http.server".
func ServiceName(name string) StartSpanOption {
return Tag(ext.ServiceName, name)
}
// ResourceName sets the given resource name on the started span. A resource could
// be an SQL query, a URL, an RPC method or something else.
func ResourceName(name string) StartSpanOption {
return Tag(ext.ResourceName, name)
}
// SpanType sets the given span type on the started span. Some examples in the case of
// the Datadog APM product could be "web", "db" or "cache".
func SpanType(name string) StartSpanOption {
return Tag(ext.SpanType, name)
}
// ChildOf tells StartSpan to use the given span context as a parent for the
// created span.
func ChildOf(ctx ddtrace.SpanContext) StartSpanOption {
return func(cfg *ddtrace.StartSpanConfig) {
cfg.Parent = ctx
}
}
// StartTime sets a custom time as the start time for the created span. By
// default a span is started using the creation time.
func StartTime(t time.Time) StartSpanOption {
return func(cfg *ddtrace.StartSpanConfig) {
cfg.StartTime = t
}
}
// FinishOption is a configuration option for FinishSpan. It is aliased in order
// to help godoc group all the functions returning it together. It is considered
// more correct to refer to it as the type as the origin, ddtrace.FinishOption.
type FinishOption = ddtrace.FinishOption
// FinishTime sets the given time as the finishing time for the span. By default,
// the current time is used.
func FinishTime(t time.Time) FinishOption {
return func(cfg *ddtrace.FinishConfig) {
cfg.FinishTime = t
}
}
// WithError adds the given error to the span before marking it as finished. If it is
// nil it will be disregarded.
func WithError(err error) FinishOption {
return func(cfg *ddtrace.FinishConfig) {
cfg.Error = err
}
}

View file

@ -0,0 +1,116 @@
package tracer
import (
"bytes"
"encoding/binary"
"io"
"github.com/tinylib/msgp/msgp"
)
// payload is a wrapper on top of the msgpack encoder which allows constructing an
// encoded array by pushing its entries sequentially, one at a time. It basically
// allows us to encode as we would with a stream, except that the contents of the stream
// can be read as a slice by the msgpack decoder at any time. It follows the guidelines
// from the msgpack array spec:
// https://github.com/msgpack/msgpack/blob/master/spec.md#array-format-family
//
// payload implements io.Reader and can be used with the decoder directly. To create
// a new payload use the newPayload method.
//
// payload is not safe for concurrent use.
//
// This structure basically allows us to push traces into the payload one at a time
// in order to always have knowledge of the payload size, but also making it possible
// for the agent to decode it as an array.
type payload struct {
// header specifies the first few bytes in the msgpack stream
// indicating the type of array (fixarray, array16 or array32)
// and the number of items contained in the stream.
header []byte
// off specifies the current read position on the header.
off int
// count specifies the number of items in the stream.
count uint64
// buf holds the sequence of msgpack-encoded items.
buf bytes.Buffer
}
var _ io.Reader = (*payload)(nil)
// newPayload returns a ready to use payload.
func newPayload() *payload {
p := &payload{
header: make([]byte, 8),
off: 8,
}
return p
}
// push pushes a new item into the stream.
func (p *payload) push(t spanList) error {
if err := msgp.Encode(&p.buf, t); err != nil {
return err
}
p.count++
p.updateHeader()
return nil
}
// itemCount returns the number of items available in the srteam.
func (p *payload) itemCount() int {
return int(p.count)
}
// size returns the payload size in bytes. After the first read the value becomes
// inaccurate by up to 8 bytes.
func (p *payload) size() int {
return p.buf.Len() + len(p.header) - p.off
}
// reset resets the internal buffer, counter and read offset.
func (p *payload) reset() {
p.off = 8
p.count = 0
p.buf.Reset()
}
// https://github.com/msgpack/msgpack/blob/master/spec.md#array-format-family
const (
msgpackArrayFix byte = 144 // up to 15 items
msgpackArray16 = 0xdc // up to 2^16-1 items, followed by size in 2 bytes
msgpackArray32 = 0xdd // up to 2^32-1 items, followed by size in 4 bytes
)
// updateHeader updates the payload header based on the number of items currently
// present in the stream.
func (p *payload) updateHeader() {
n := p.count
switch {
case n <= 15:
p.header[7] = msgpackArrayFix + byte(n)
p.off = 7
case n <= 1<<16-1:
binary.BigEndian.PutUint64(p.header, n) // writes 2 bytes
p.header[5] = msgpackArray16
p.off = 5
default: // n <= 1<<32-1
binary.BigEndian.PutUint64(p.header, n) // writes 4 bytes
p.header[3] = msgpackArray32
p.off = 3
}
}
// Read implements io.Reader. It reads from the msgpack-encoded stream.
func (p *payload) Read(b []byte) (n int, err error) {
if p.off < len(p.header) {
// reading header
n = copy(b, p.header[p.off:])
p.off += n
return n, nil
}
return p.buf.Read(b)
}

View file

@ -0,0 +1,52 @@
package tracer
import (
"errors"
"gopkg.in/DataDog/dd-trace-go.v1/ddtrace"
)
// Propagator implementations should be able to inject and extract
// SpanContexts into an implementation specific carrier.
type Propagator interface {
// Inject takes the SpanContext and injects it into the carrier.
Inject(context ddtrace.SpanContext, carrier interface{}) error
// Extract returns the SpanContext from the given carrier.
Extract(carrier interface{}) (ddtrace.SpanContext, error)
}
// TextMapWriter allows setting key/value pairs of strings on the underlying
// data structure. Carriers implementing TextMapWriter are compatible to be
// used with Datadog's TextMapPropagator.
type TextMapWriter interface {
// Set sets the given key/value pair.
Set(key, val string)
}
// TextMapReader allows iterating over sets of key/value pairs. Carriers implementing
// TextMapReader are compatible to be used with Datadog's TextMapPropagator.
type TextMapReader interface {
// ForeachKey iterates over all keys that exist in the underlying
// carrier. It takes a callback function which will be called
// using all key/value pairs as arguments. ForeachKey will return
// the first error returned by the handler.
ForeachKey(handler func(key, val string) error) error
}
var (
// ErrInvalidCarrier is returned when the carrier provided to the propagator
// does not implemented the correct interfaces.
ErrInvalidCarrier = errors.New("invalid carrier")
// ErrInvalidSpanContext is returned when the span context found in the
// carrier is not of the expected type.
ErrInvalidSpanContext = errors.New("invalid span context")
// ErrSpanContextCorrupted is returned when there was a problem parsing
// the information found in the carrier.
ErrSpanContextCorrupted = errors.New("span context corrupted")
// ErrSpanContextNotFound represents missing information in the given carrier.
ErrSpanContextNotFound = errors.New("span context not found")
)

View file

@ -0,0 +1,50 @@
package tracer
import (
cryptorand "crypto/rand"
"log"
"math"
"math/big"
"math/rand"
"sync"
"time"
)
// random holds a thread-safe source of random numbers.
var random *rand.Rand
func init() {
var seed int64
n, err := cryptorand.Int(cryptorand.Reader, big.NewInt(math.MaxInt64))
if err == nil {
seed = n.Int64()
} else {
log.Printf("%scannot generate random seed: %v; using current time\n", errorPrefix, err)
seed = time.Now().UnixNano()
}
random = rand.New(&safeSource{
source: rand.NewSource(seed),
})
}
// safeSource holds a thread-safe implementation of rand.Source64.
type safeSource struct {
source rand.Source
sync.Mutex
}
func (rs *safeSource) Int63() int64 {
rs.Lock()
n := rs.source.Int63()
rs.Unlock()
return n
}
func (rs *safeSource) Uint64() uint64 { return uint64(rs.Int63()) }
func (rs *safeSource) Seed(seed int64) {
rs.Lock()
rs.Seed(seed)
rs.Unlock()
}

View file

@ -0,0 +1,72 @@
package tracer
import (
"math"
"sync"
"gopkg.in/DataDog/dd-trace-go.v1/ddtrace"
)
// Sampler is the generic interface of any sampler. It must be safe for concurrent use.
type Sampler interface {
// Sample returns true if the given span should be sampled.
Sample(span Span) bool
}
// RateSampler is a sampler implementation which randomly selects spans using a
// provided rate. For example, a rate of 0.75 will permit 75% of the spans.
// RateSampler implementations should be safe for concurrent use.
type RateSampler interface {
Sampler
// Rate returns the current sample rate.
Rate() float64
// SetRate sets a new sample rate.
SetRate(rate float64)
}
// rateSampler samples from a sample rate.
type rateSampler struct {
sync.RWMutex
rate float64
}
// NewAllSampler is a short-hand for NewRateSampler(1). It is all-permissive.
func NewAllSampler() RateSampler { return NewRateSampler(1) }
// NewRateSampler returns an initialized RateSampler with a given sample rate.
func NewRateSampler(rate float64) RateSampler {
return &rateSampler{rate: rate}
}
// Rate returns the current rate of the sampler.
func (r *rateSampler) Rate() float64 {
r.RLock()
defer r.RUnlock()
return r.rate
}
// SetRate sets a new sampling rate.
func (r *rateSampler) SetRate(rate float64) {
r.Lock()
r.rate = rate
r.Unlock()
}
// constants used for the Knuth hashing, same as agent.
const knuthFactor = uint64(1111111111111111111)
// Sample returns true if the given span should be sampled.
func (r *rateSampler) Sample(spn ddtrace.Span) bool {
s, ok := spn.(*span)
if !ok {
return false
}
r.RLock()
defer r.RUnlock()
if r.rate < 1 {
return s.TraceID*knuthFactor < uint64(r.rate*math.MaxUint64)
}
return true
}

View file

@ -0,0 +1,231 @@
//go:generate msgp -unexported -marshal=false -o=span_msgp.go -tests=false
package tracer
import (
"fmt"
"reflect"
"runtime/debug"
"strings"
"sync"
"time"
"github.com/tinylib/msgp/msgp"
"gopkg.in/DataDog/dd-trace-go.v1/ddtrace"
"gopkg.in/DataDog/dd-trace-go.v1/ddtrace/ext"
)
type (
// spanList implements msgp.Encodable on top of a slice of spans.
spanList []*span
// spanLists implements msgp.Decodable on top of a slice of spanList.
// This type is only used in tests.
spanLists []spanList
)
var (
_ ddtrace.Span = (*span)(nil)
_ msgp.Encodable = (*spanList)(nil)
_ msgp.Decodable = (*spanLists)(nil)
)
// span represents a computation. Callers must call Finish when a span is
// complete to ensure it's submitted.
type span struct {
sync.RWMutex `msg:"-"`
Name string `msg:"name"` // operation name
Service string `msg:"service"` // service name (i.e. "grpc.server", "http.request")
Resource string `msg:"resource"` // resource name (i.e. "/user?id=123", "SELECT * FROM users")
Type string `msg:"type"` // protocol associated with the span (i.e. "web", "db", "cache")
Start int64 `msg:"start"` // span start time expressed in nanoseconds since epoch
Duration int64 `msg:"duration"` // duration of the span expressed in nanoseconds
Meta map[string]string `msg:"meta,omitempty"` // arbitrary map of metadata
Metrics map[string]float64 `msg:"metrics,omitempty"` // arbitrary map of numeric metrics
SpanID uint64 `msg:"span_id"` // identifier of this span
TraceID uint64 `msg:"trace_id"` // identifier of the root span
ParentID uint64 `msg:"parent_id"` // identifier of the span's direct parent
Error int32 `msg:"error"` // error status of the span; 0 means no errors
finished bool `msg:"-"` // true if the span has been submitted to a tracer.
context *spanContext `msg:"-"` // span propagation context
}
// Context yields the SpanContext for this Span. Note that the return
// value of Context() is still valid after a call to Finish(). This is
// called the span context and it is different from Go's context.
func (s *span) Context() ddtrace.SpanContext { return s.context }
// SetBaggageItem sets a key/value pair as baggage on the span. Baggage items
// are propagated down to descendant spans and injected cross-process. Use with
// care as it adds extra load onto your tracing layer.
func (s *span) SetBaggageItem(key, val string) {
s.context.setBaggageItem(key, val)
}
// BaggageItem gets the value for a baggage item given its key. Returns the
// empty string if the value isn't found in this Span.
func (s *span) BaggageItem(key string) string {
return s.context.baggageItem(key)
}
// SetTag adds a set of key/value metadata to the span.
func (s *span) SetTag(key string, value interface{}) {
s.Lock()
defer s.Unlock()
// We don't lock spans when flushing, so we could have a data race when
// modifying a span as it's being flushed. This protects us against that
// race, since spans are marked `finished` before we flush them.
if s.finished {
return
}
if key == ext.Error {
s.setTagError(value)
return
}
if v, ok := value.(string); ok {
s.setTagString(key, v)
return
}
if v, ok := toFloat64(value); ok {
s.setTagNumeric(key, v)
return
}
// not numeric, not a string and not an error, the likelihood of this
// happening is close to zero, but we should nevertheless account for it.
s.Meta[key] = fmt.Sprint(value)
}
// setTagError sets the error tag. It accounts for various valid scenarios.
// This method is not safe for concurrent use.
func (s *span) setTagError(value interface{}) {
switch v := value.(type) {
case bool:
// bool value as per Opentracing spec.
if !v {
s.Error = 0
} else {
s.Error = 1
}
case error:
// if anyone sets an error value as the tag, be nice here
// and provide all the benefits.
s.Error = 1
s.Meta[ext.ErrorMsg] = v.Error()
s.Meta[ext.ErrorType] = reflect.TypeOf(v).String()
s.Meta[ext.ErrorStack] = string(debug.Stack())
case nil:
// no error
s.Error = 0
default:
// in all other cases, let's assume that setting this tag
// is the result of an error.
s.Error = 1
}
}
// setTagString sets a string tag. This method is not safe for concurrent use.
func (s *span) setTagString(key, v string) {
switch key {
case ext.ServiceName:
s.Service = v
case ext.ResourceName:
s.Resource = v
case ext.SpanType:
s.Type = v
default:
s.Meta[key] = v
}
}
// setTagNumeric sets a numeric tag, in our case called a metric. This method
// is not safe for concurrent use.
func (s *span) setTagNumeric(key string, v float64) {
switch key {
case ext.SamplingPriority:
// setting sampling priority per spec
s.Metrics[samplingPriorityKey] = v
s.context.setSamplingPriority(int(v))
default:
s.Metrics[key] = v
}
}
// Finish closes this Span (but not its children) providing the duration
// of its part of the tracing session.
func (s *span) Finish(opts ...ddtrace.FinishOption) {
var cfg ddtrace.FinishConfig
for _, fn := range opts {
fn(&cfg)
}
var t int64
if cfg.FinishTime.IsZero() {
t = now()
} else {
t = cfg.FinishTime.UnixNano()
}
if cfg.Error != nil {
s.SetTag(ext.Error, cfg.Error)
}
s.finish(t)
}
// SetOperationName sets or changes the operation name.
func (s *span) SetOperationName(operationName string) {
s.Lock()
defer s.Unlock()
s.Name = operationName
}
func (s *span) finish(finishTime int64) {
s.Lock()
defer s.Unlock()
// We don't lock spans when flushing, so we could have a data race when
// modifying a span as it's being flushed. This protects us against that
// race, since spans are marked `finished` before we flush them.
if s.finished {
// already finished
return
}
if s.Duration == 0 {
s.Duration = finishTime - s.Start
}
s.finished = true
if !s.context.sampled {
// not sampled
return
}
s.context.finish()
}
// String returns a human readable representation of the span. Not for
// production, just debugging.
func (s *span) String() string {
lines := []string{
fmt.Sprintf("Name: %s", s.Name),
fmt.Sprintf("Service: %s", s.Service),
fmt.Sprintf("Resource: %s", s.Resource),
fmt.Sprintf("TraceID: %d", s.TraceID),
fmt.Sprintf("SpanID: %d", s.SpanID),
fmt.Sprintf("ParentID: %d", s.ParentID),
fmt.Sprintf("Start: %s", time.Unix(0, s.Start)),
fmt.Sprintf("Duration: %s", time.Duration(s.Duration)),
fmt.Sprintf("Error: %d", s.Error),
fmt.Sprintf("Type: %s", s.Type),
"Tags:",
}
s.RLock()
for key, val := range s.Meta {
lines = append(lines, fmt.Sprintf("\t%s:%s", key, val))
}
for key, val := range s.Metrics {
lines = append(lines, fmt.Sprintf("\t%s:%f", key, val))
}
s.RUnlock()
return strings.Join(lines, "\n")
}
const samplingPriorityKey = "_sampling_priority_v1"

View file

@ -0,0 +1,448 @@
package tracer
// NOTE: THIS FILE WAS PRODUCED BY THE
// MSGP CODE GENERATION TOOL (github.com/tinylib/msgp)
// DO NOT EDIT
import (
"github.com/tinylib/msgp/msgp"
)
// DecodeMsg implements msgp.Decodable
func (z *span) DecodeMsg(dc *msgp.Reader) (err error) {
var field []byte
_ = field
var zb0001 uint32
zb0001, err = dc.ReadMapHeader()
if err != nil {
return
}
for zb0001 > 0 {
zb0001--
field, err = dc.ReadMapKeyPtr()
if err != nil {
return
}
switch msgp.UnsafeString(field) {
case "name":
z.Name, err = dc.ReadString()
if err != nil {
return
}
case "service":
z.Service, err = dc.ReadString()
if err != nil {
return
}
case "resource":
z.Resource, err = dc.ReadString()
if err != nil {
return
}
case "type":
z.Type, err = dc.ReadString()
if err != nil {
return
}
case "start":
z.Start, err = dc.ReadInt64()
if err != nil {
return
}
case "duration":
z.Duration, err = dc.ReadInt64()
if err != nil {
return
}
case "meta":
var zb0002 uint32
zb0002, err = dc.ReadMapHeader()
if err != nil {
return
}
if z.Meta == nil && zb0002 > 0 {
z.Meta = make(map[string]string, zb0002)
} else if len(z.Meta) > 0 {
for key, _ := range z.Meta {
delete(z.Meta, key)
}
}
for zb0002 > 0 {
zb0002--
var za0001 string
var za0002 string
za0001, err = dc.ReadString()
if err != nil {
return
}
za0002, err = dc.ReadString()
if err != nil {
return
}
z.Meta[za0001] = za0002
}
case "metrics":
var zb0003 uint32
zb0003, err = dc.ReadMapHeader()
if err != nil {
return
}
if z.Metrics == nil && zb0003 > 0 {
z.Metrics = make(map[string]float64, zb0003)
} else if len(z.Metrics) > 0 {
for key, _ := range z.Metrics {
delete(z.Metrics, key)
}
}
for zb0003 > 0 {
zb0003--
var za0003 string
var za0004 float64
za0003, err = dc.ReadString()
if err != nil {
return
}
za0004, err = dc.ReadFloat64()
if err != nil {
return
}
z.Metrics[za0003] = za0004
}
case "span_id":
z.SpanID, err = dc.ReadUint64()
if err != nil {
return
}
case "trace_id":
z.TraceID, err = dc.ReadUint64()
if err != nil {
return
}
case "parent_id":
z.ParentID, err = dc.ReadUint64()
if err != nil {
return
}
case "error":
z.Error, err = dc.ReadInt32()
if err != nil {
return
}
default:
err = dc.Skip()
if err != nil {
return
}
}
}
return
}
// EncodeMsg implements msgp.Encodable
func (z *span) EncodeMsg(en *msgp.Writer) (err error) {
// map header, size 12
// write "name"
err = en.Append(0x8c, 0xa4, 0x6e, 0x61, 0x6d, 0x65)
if err != nil {
return
}
err = en.WriteString(z.Name)
if err != nil {
return
}
// write "service"
err = en.Append(0xa7, 0x73, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65)
if err != nil {
return
}
err = en.WriteString(z.Service)
if err != nil {
return
}
// write "resource"
err = en.Append(0xa8, 0x72, 0x65, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65)
if err != nil {
return
}
err = en.WriteString(z.Resource)
if err != nil {
return
}
// write "type"
err = en.Append(0xa4, 0x74, 0x79, 0x70, 0x65)
if err != nil {
return
}
err = en.WriteString(z.Type)
if err != nil {
return
}
// write "start"
err = en.Append(0xa5, 0x73, 0x74, 0x61, 0x72, 0x74)
if err != nil {
return
}
err = en.WriteInt64(z.Start)
if err != nil {
return
}
// write "duration"
err = en.Append(0xa8, 0x64, 0x75, 0x72, 0x61, 0x74, 0x69, 0x6f, 0x6e)
if err != nil {
return
}
err = en.WriteInt64(z.Duration)
if err != nil {
return
}
// write "meta"
err = en.Append(0xa4, 0x6d, 0x65, 0x74, 0x61)
if err != nil {
return
}
err = en.WriteMapHeader(uint32(len(z.Meta)))
if err != nil {
return
}
for za0001, za0002 := range z.Meta {
err = en.WriteString(za0001)
if err != nil {
return
}
err = en.WriteString(za0002)
if err != nil {
return
}
}
// write "metrics"
err = en.Append(0xa7, 0x6d, 0x65, 0x74, 0x72, 0x69, 0x63, 0x73)
if err != nil {
return
}
err = en.WriteMapHeader(uint32(len(z.Metrics)))
if err != nil {
return
}
for za0003, za0004 := range z.Metrics {
err = en.WriteString(za0003)
if err != nil {
return
}
err = en.WriteFloat64(za0004)
if err != nil {
return
}
}
// write "span_id"
err = en.Append(0xa7, 0x73, 0x70, 0x61, 0x6e, 0x5f, 0x69, 0x64)
if err != nil {
return
}
err = en.WriteUint64(z.SpanID)
if err != nil {
return
}
// write "trace_id"
err = en.Append(0xa8, 0x74, 0x72, 0x61, 0x63, 0x65, 0x5f, 0x69, 0x64)
if err != nil {
return
}
err = en.WriteUint64(z.TraceID)
if err != nil {
return
}
// write "parent_id"
err = en.Append(0xa9, 0x70, 0x61, 0x72, 0x65, 0x6e, 0x74, 0x5f, 0x69, 0x64)
if err != nil {
return
}
err = en.WriteUint64(z.ParentID)
if err != nil {
return
}
// write "error"
err = en.Append(0xa5, 0x65, 0x72, 0x72, 0x6f, 0x72)
if err != nil {
return
}
err = en.WriteInt32(z.Error)
if err != nil {
return
}
return
}
// Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message
func (z *span) Msgsize() (s int) {
s = 1 + 5 + msgp.StringPrefixSize + len(z.Name) + 8 + msgp.StringPrefixSize + len(z.Service) + 9 + msgp.StringPrefixSize + len(z.Resource) + 5 + msgp.StringPrefixSize + len(z.Type) + 6 + msgp.Int64Size + 9 + msgp.Int64Size + 5 + msgp.MapHeaderSize
if z.Meta != nil {
for za0001, za0002 := range z.Meta {
_ = za0002
s += msgp.StringPrefixSize + len(za0001) + msgp.StringPrefixSize + len(za0002)
}
}
s += 8 + msgp.MapHeaderSize
if z.Metrics != nil {
for za0003, za0004 := range z.Metrics {
_ = za0004
s += msgp.StringPrefixSize + len(za0003) + msgp.Float64Size
}
}
s += 8 + msgp.Uint64Size + 9 + msgp.Uint64Size + 10 + msgp.Uint64Size + 6 + msgp.Int32Size
return
}
// DecodeMsg implements msgp.Decodable
func (z *spanList) DecodeMsg(dc *msgp.Reader) (err error) {
var zb0002 uint32
zb0002, err = dc.ReadArrayHeader()
if err != nil {
return
}
if cap((*z)) >= int(zb0002) {
(*z) = (*z)[:zb0002]
} else {
(*z) = make(spanList, zb0002)
}
for zb0001 := range *z {
if dc.IsNil() {
err = dc.ReadNil()
if err != nil {
return
}
(*z)[zb0001] = nil
} else {
if (*z)[zb0001] == nil {
(*z)[zb0001] = new(span)
}
err = (*z)[zb0001].DecodeMsg(dc)
if err != nil {
return
}
}
}
return
}
// EncodeMsg implements msgp.Encodable
func (z spanList) EncodeMsg(en *msgp.Writer) (err error) {
err = en.WriteArrayHeader(uint32(len(z)))
if err != nil {
return
}
for zb0003 := range z {
if z[zb0003] == nil {
err = en.WriteNil()
if err != nil {
return
}
} else {
err = z[zb0003].EncodeMsg(en)
if err != nil {
return
}
}
}
return
}
// Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message
func (z spanList) Msgsize() (s int) {
s = msgp.ArrayHeaderSize
for zb0003 := range z {
if z[zb0003] == nil {
s += msgp.NilSize
} else {
s += z[zb0003].Msgsize()
}
}
return
}
// DecodeMsg implements msgp.Decodable
func (z *spanLists) DecodeMsg(dc *msgp.Reader) (err error) {
var zb0003 uint32
zb0003, err = dc.ReadArrayHeader()
if err != nil {
return
}
if cap((*z)) >= int(zb0003) {
(*z) = (*z)[:zb0003]
} else {
(*z) = make(spanLists, zb0003)
}
for zb0001 := range *z {
var zb0004 uint32
zb0004, err = dc.ReadArrayHeader()
if err != nil {
return
}
if cap((*z)[zb0001]) >= int(zb0004) {
(*z)[zb0001] = ((*z)[zb0001])[:zb0004]
} else {
(*z)[zb0001] = make(spanList, zb0004)
}
for zb0002 := range (*z)[zb0001] {
if dc.IsNil() {
err = dc.ReadNil()
if err != nil {
return
}
(*z)[zb0001][zb0002] = nil
} else {
if (*z)[zb0001][zb0002] == nil {
(*z)[zb0001][zb0002] = new(span)
}
err = (*z)[zb0001][zb0002].DecodeMsg(dc)
if err != nil {
return
}
}
}
}
return
}
// EncodeMsg implements msgp.Encodable
func (z spanLists) EncodeMsg(en *msgp.Writer) (err error) {
err = en.WriteArrayHeader(uint32(len(z)))
if err != nil {
return
}
for zb0005 := range z {
err = en.WriteArrayHeader(uint32(len(z[zb0005])))
if err != nil {
return
}
for zb0006 := range z[zb0005] {
if z[zb0005][zb0006] == nil {
err = en.WriteNil()
if err != nil {
return
}
} else {
err = z[zb0005][zb0006].EncodeMsg(en)
if err != nil {
return
}
}
}
}
return
}
// Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message
func (z spanLists) Msgsize() (s int) {
s = msgp.ArrayHeaderSize
for zb0005 := range z {
s += msgp.ArrayHeaderSize
for zb0006 := range z[zb0005] {
if z[zb0005][zb0006] == nil {
s += msgp.NilSize
} else {
s += z[zb0005][zb0006].Msgsize()
}
}
}
return
}

View file

@ -0,0 +1,193 @@
package tracer
import (
"sync"
"gopkg.in/DataDog/dd-trace-go.v1/ddtrace"
"gopkg.in/DataDog/dd-trace-go.v1/ddtrace/internal"
)
var _ ddtrace.SpanContext = (*spanContext)(nil)
// SpanContext represents a span state that can propagate to descendant spans
// and across process boundaries. It contains all the information needed to
// spawn a direct descendant of the span that it belongs to. It can be used
// to create distributed tracing by propagating it using the provided interfaces.
type spanContext struct {
// the below group should propagate only locally
trace *trace // reference to the trace that this span belongs too
span *span // reference to the span that hosts this context
sampled bool // whether this span will be sampled or not
// the below group should propagate cross-process
traceID uint64
spanID uint64
mu sync.RWMutex // guards below fields
baggage map[string]string
priority int
hasPriority bool
}
// newSpanContext creates a new SpanContext to serve as context for the given
// span. If the provided parent is not nil, the context will inherit the trace,
// baggage and other values from it. This method also pushes the span into the
// new context's trace and as a result, it should not be called multiple times
// for the same span.
func newSpanContext(span *span, parent *spanContext) *spanContext {
context := &spanContext{
traceID: span.TraceID,
spanID: span.SpanID,
sampled: true,
span: span,
}
if v, ok := span.Metrics[samplingPriorityKey]; ok {
context.hasPriority = true
context.priority = int(v)
}
if parent != nil {
context.trace = parent.trace
context.sampled = parent.sampled
context.hasPriority = parent.hasSamplingPriority()
context.priority = parent.samplingPriority()
parent.ForeachBaggageItem(func(k, v string) bool {
context.setBaggageItem(k, v)
return true
})
}
if context.trace == nil {
context.trace = newTrace()
}
// put span in context's trace
context.trace.push(span)
return context
}
// SpanID implements ddtrace.SpanContext.
func (c *spanContext) SpanID() uint64 { return c.spanID }
// TraceID implements ddtrace.SpanContext.
func (c *spanContext) TraceID() uint64 { return c.traceID }
// ForeachBaggageItem implements ddtrace.SpanContext.
func (c *spanContext) ForeachBaggageItem(handler func(k, v string) bool) {
c.mu.RLock()
defer c.mu.RUnlock()
for k, v := range c.baggage {
if !handler(k, v) {
break
}
}
}
func (c *spanContext) setSamplingPriority(p int) {
c.mu.Lock()
defer c.mu.Unlock()
c.priority = p
c.hasPriority = true
}
func (c *spanContext) samplingPriority() int {
c.mu.RLock()
defer c.mu.RUnlock()
return c.priority
}
func (c *spanContext) hasSamplingPriority() bool {
c.mu.RLock()
defer c.mu.RUnlock()
return c.hasPriority
}
func (c *spanContext) setBaggageItem(key, val string) {
c.mu.Lock()
defer c.mu.Unlock()
if c.baggage == nil {
c.baggage = make(map[string]string, 1)
}
c.baggage[key] = val
}
func (c *spanContext) baggageItem(key string) string {
c.mu.RLock()
defer c.mu.RUnlock()
return c.baggage[key]
}
// finish marks this span as finished in the trace.
func (c *spanContext) finish() { c.trace.ackFinish() }
// trace holds information about a specific trace. This structure is shared
// between all spans in a trace.
type trace struct {
mu sync.RWMutex // guards below fields
spans []*span // all the spans that are part of this trace
finished int // the number of finished spans
full bool // signifies that the span buffer is full
}
var (
// traceStartSize is the initial size of our trace buffer,
// by default we allocate for a handful of spans within the trace,
// reasonable as span is actually way bigger, and avoids re-allocating
// over and over. Could be fine-tuned at runtime.
traceStartSize = 10
// traceMaxSize is the maximum number of spans we keep in memory.
// This is to avoid memory leaks, if above that value, spans are randomly
// dropped and ignore, resulting in corrupted tracing data, but ensuring
// original program continues to work as expected.
traceMaxSize = int(1e5)
)
// newTrace creates a new trace using the given callback which will be called
// upon completion of the trace.
func newTrace() *trace {
return &trace{spans: make([]*span, 0, traceStartSize)}
}
// push pushes a new span into the trace. If the buffer is full, it returns
// a errBufferFull error.
func (t *trace) push(sp *span) {
t.mu.Lock()
defer t.mu.Unlock()
if t.full {
return
}
if len(t.spans) >= traceMaxSize {
// capacity is reached, we will not be able to complete this trace.
t.full = true
t.spans = nil // GC
if tr, ok := internal.GetGlobalTracer().(*tracer); ok {
// we have a tracer we can submit errors too.
tr.pushError(&spanBufferFullError{})
}
return
}
t.spans = append(t.spans, sp)
}
// ackFinish aknowledges that another span in the trace has finished, and checks
// if the trace is complete, in which case it calls the onFinish function.
func (t *trace) ackFinish() {
t.mu.Lock()
defer t.mu.Unlock()
if t.full {
// capacity has been reached, the buffer is no longer tracking
// all the spans in the trace, so the below conditions will not
// be accurate and would trigger a pre-mature flush, exposing us
// to a race condition where spans can be modified while flushing.
return
}
t.finished++
if len(t.spans) != t.finished {
return
}
if tr, ok := internal.GetGlobalTracer().(*tracer); ok {
// we have a tracer that can receive completed traces.
tr.pushTrace(t.spans)
}
t.spans = nil
t.finished = 0 // important, because a buffer can be used for several flushes
}

View file

@ -0,0 +1,198 @@
package tracer
import (
"net/http"
"strconv"
"strings"
"gopkg.in/DataDog/dd-trace-go.v1/ddtrace"
)
// HTTPHeadersCarrier wraps an http.Header as a TextMapWriter and TextMapReader, allowing
// it to be used using the provided Propagator implementation.
type HTTPHeadersCarrier http.Header
var _ TextMapWriter = (*HTTPHeadersCarrier)(nil)
var _ TextMapReader = (*HTTPHeadersCarrier)(nil)
// Set implements TextMapWriter.
func (c HTTPHeadersCarrier) Set(key, val string) {
h := http.Header(c)
h.Add(key, val)
}
// ForeachKey implements TextMapReader.
func (c HTTPHeadersCarrier) ForeachKey(handler func(key, val string) error) error {
for k, vals := range c {
for _, v := range vals {
if err := handler(k, v); err != nil {
return err
}
}
}
return nil
}
// TextMapCarrier allows the use of a regular map[string]string as both TextMapWriter
// and TextMapReader, making it compatible with the provided Propagator.
type TextMapCarrier map[string]string
var _ TextMapWriter = (*TextMapCarrier)(nil)
var _ TextMapReader = (*TextMapCarrier)(nil)
// Set implements TextMapWriter.
func (c TextMapCarrier) Set(key, val string) {
c[key] = val
}
// ForeachKey conforms to the TextMapReader interface.
func (c TextMapCarrier) ForeachKey(handler func(key, val string) error) error {
for k, v := range c {
if err := handler(k, v); err != nil {
return err
}
}
return nil
}
const (
// DefaultBaggageHeaderPrefix specifies the prefix that will be used in
// HTTP headers or text maps to prefix baggage keys.
DefaultBaggageHeaderPrefix = "ot-baggage-"
// DefaultTraceIDHeader specifies the key that will be used in HTTP headers
// or text maps to store the trace ID.
DefaultTraceIDHeader = "x-datadog-trace-id"
// DefaultParentIDHeader specifies the key that will be used in HTTP headers
// or text maps to store the parent ID.
DefaultParentIDHeader = "x-datadog-parent-id"
// DefaultPriorityHeader specifies the key that will be used in HTTP headers
// or text maps to store the sampling priority value.
DefaultPriorityHeader = "x-datadog-sampling-priority"
)
// PropagatorConfig defines the configuration for initializing a propagator.
type PropagatorConfig struct {
// BaggagePrefix specifies the prefix that will be used to store baggage
// items in a map. It defaults to DefaultBaggageHeaderPrefix.
BaggagePrefix string
// TraceHeader specifies the map key that will be used to store the trace ID.
// It defaults to DefaultTraceIDHeader.
TraceHeader string
// ParentHeader specifies the map key that will be used to store the parent ID.
// It defaults to DefaultParentIDHeader.
ParentHeader string
// PriorityHeader specifies the map key that will be used to store the sampling priority.
// It deafults to DefaultPriorityHeader.
PriorityHeader string
}
// NewPropagator returns a new propagator which uses TextMap to inject
// and extract values. It propagates trace and span IDs and baggage.
// To use the defaults, nil may be provided in place of the config.
func NewPropagator(cfg *PropagatorConfig) Propagator {
if cfg == nil {
cfg = new(PropagatorConfig)
}
if cfg.BaggagePrefix == "" {
cfg.BaggagePrefix = DefaultBaggageHeaderPrefix
}
if cfg.TraceHeader == "" {
cfg.TraceHeader = DefaultTraceIDHeader
}
if cfg.ParentHeader == "" {
cfg.ParentHeader = DefaultParentIDHeader
}
if cfg.PriorityHeader == "" {
cfg.PriorityHeader = DefaultPriorityHeader
}
return &propagator{cfg}
}
// propagator implements a propagator which uses TextMap internally.
// It propagates the trace and span IDs, as well as the baggage from the
// context.
type propagator struct{ cfg *PropagatorConfig }
// Inject defines the Propagator to propagate SpanContext data
// out of the current process. The implementation propagates the
// TraceID and the current active SpanID, as well as the Span baggage.
func (p *propagator) Inject(spanCtx ddtrace.SpanContext, carrier interface{}) error {
switch v := carrier.(type) {
case TextMapWriter:
return p.injectTextMap(spanCtx, v)
default:
return ErrInvalidCarrier
}
}
func (p *propagator) injectTextMap(spanCtx ddtrace.SpanContext, writer TextMapWriter) error {
ctx, ok := spanCtx.(*spanContext)
if !ok || ctx.traceID == 0 || ctx.spanID == 0 {
return ErrInvalidSpanContext
}
// propagate the TraceID and the current active SpanID
writer.Set(p.cfg.TraceHeader, strconv.FormatUint(ctx.traceID, 10))
writer.Set(p.cfg.ParentHeader, strconv.FormatUint(ctx.spanID, 10))
if ctx.hasSamplingPriority() {
writer.Set(p.cfg.PriorityHeader, strconv.Itoa(ctx.samplingPriority()))
}
// propagate OpenTracing baggage
for k, v := range ctx.baggage {
writer.Set(p.cfg.BaggagePrefix+k, v)
}
return nil
}
// Extract implements Propagator.
func (p *propagator) Extract(carrier interface{}) (ddtrace.SpanContext, error) {
switch v := carrier.(type) {
case TextMapReader:
return p.extractTextMap(v)
default:
return nil, ErrInvalidCarrier
}
}
func (p *propagator) extractTextMap(reader TextMapReader) (ddtrace.SpanContext, error) {
var ctx spanContext
err := reader.ForeachKey(func(k, v string) error {
var err error
key := strings.ToLower(k)
switch key {
case p.cfg.TraceHeader:
ctx.traceID, err = strconv.ParseUint(v, 10, 64)
if err != nil {
return ErrSpanContextCorrupted
}
case p.cfg.ParentHeader:
ctx.spanID, err = strconv.ParseUint(v, 10, 64)
if err != nil {
return ErrSpanContextCorrupted
}
case p.cfg.PriorityHeader:
ctx.priority, err = strconv.Atoi(v)
if err != nil {
return ErrSpanContextCorrupted
}
ctx.hasPriority = true
default:
if strings.HasPrefix(key, p.cfg.BaggagePrefix) {
ctx.setBaggageItem(strings.TrimPrefix(key, p.cfg.BaggagePrefix), v)
}
}
return nil
})
if err != nil {
return nil, err
}
if ctx.traceID == 0 || ctx.spanID == 0 {
return nil, ErrSpanContextNotFound
}
return &ctx, nil
}

View file

@ -0,0 +1,10 @@
// +build !windows
package tracer
import "time"
// now returns current UTC time in nanos.
func now() int64 {
return time.Now().UTC().UnixNano()
}

View file

@ -0,0 +1,35 @@
package tracer
import (
"log"
"time"
"golang.org/x/sys/windows"
)
// This method is more precise than the go1.8 time.Now on Windows
// See https://msdn.microsoft.com/en-us/library/windows/desktop/hh706895(v=vs.85).aspx
// It is however ~10x slower and requires Windows 8+.
func highPrecisionNow() int64 {
var ft windows.Filetime
windows.GetSystemTimePreciseAsFileTime(&ft)
return ft.Nanoseconds()
}
func lowPrecisionNow() int64 {
return time.Now().UTC().UnixNano()
}
var now func() int64
// If GetSystemTimePreciseAsFileTime is not available we default to the less
// precise implementation based on time.Now()
func init() {
if err := windows.LoadGetSystemTimePreciseAsFileTime(); err != nil {
log.Printf("Unable to load high precison timer, defaulting to time.Now()")
now = lowPrecisionNow
} else {
log.Printf("Using high precision timer")
now = highPrecisionNow
}
}

View file

@ -0,0 +1,378 @@
package tracer
import (
"errors"
"log"
"os"
"strconv"
"time"
"gopkg.in/DataDog/dd-trace-go.v1/ddtrace"
"gopkg.in/DataDog/dd-trace-go.v1/ddtrace/ext"
"gopkg.in/DataDog/dd-trace-go.v1/ddtrace/internal"
)
var _ ddtrace.Tracer = (*tracer)(nil)
// tracer creates, buffers and submits Spans which are used to time blocks of
// computation. They are accumulated and streamed into an internal payload,
// which is flushed to the agent whenever its size exceeds a specific threshold
// or when a certain interval of time has passed, whichever happens first.
//
// tracer operates based on a worker loop which responds to various request
// channels. It additionally holds two buffers which accumulates error and trace
// queues to be processed by the payload encoder.
type tracer struct {
*config
*payload
flushAllReq chan chan<- struct{}
flushTracesReq chan struct{}
flushErrorsReq chan struct{}
exitReq chan struct{}
payloadQueue chan []*span
errorBuffer chan error
// stopped is a channel that will be closed when the worker has exited.
stopped chan struct{}
// syncPush is used for testing. When non-nil, it causes pushTrace to become
// a synchronous (blocking) operation, meaning that it will only return after
// the trace has been fully processed and added onto the payload.
syncPush chan struct{}
}
const (
// flushInterval is the interval at which the payload contents will be flushed
// to the transport.
flushInterval = 2 * time.Second
// payloadMaxLimit is the maximum payload size allowed and should indicate the
// maximum size of the package that the agent can receive.
payloadMaxLimit = 9.5 * 1024 * 1024 // 9.5 MB
// payloadSizeLimit specifies the maximum allowed size of the payload before
// it will trigger a flush to the transport.
payloadSizeLimit = payloadMaxLimit / 2
)
// Start starts the tracer with the given set of options. It will stop and replace
// any running tracer, meaning that calling it several times will result in a restart
// of the tracer by replacing the current instance with a new one.
func Start(opts ...StartOption) {
if internal.Testing {
return // mock tracer active
}
t := internal.GetGlobalTracer()
internal.SetGlobalTracer(newTracer(opts...))
t.Stop()
}
// Stop stops the started tracer. Subsequent calls are valid but become no-op.
func Stop() {
internal.SetGlobalTracer(&internal.NoopTracer{})
}
// Span is an alias for ddtrace.Span. It is here to allow godoc to group methods returning
// ddtrace.Span. It is recommended and is considered more correct to refer to this type as
// ddtrace.Span instead.
type Span = ddtrace.Span
// StartSpan starts a new span with the given operation name and set of options.
// If the tracer is not started, calling this function is a no-op.
func StartSpan(operationName string, opts ...StartSpanOption) Span {
return internal.GetGlobalTracer().StartSpan(operationName, opts...)
}
// Extract extracts a SpanContext from the carrier. The carrier is expected
// to implement TextMapReader, otherwise an error is returned.
// If the tracer is not started, calling this function is a no-op.
func Extract(carrier interface{}) (ddtrace.SpanContext, error) {
return internal.GetGlobalTracer().Extract(carrier)
}
// Inject injects the given SpanContext into the carrier. The carrier is
// expected to implement TextMapWriter, otherwise an error is returned.
// If the tracer is not started, calling this function is a no-op.
func Inject(ctx ddtrace.SpanContext, carrier interface{}) error {
return internal.GetGlobalTracer().Inject(ctx, carrier)
}
const (
// payloadQueueSize is the buffer size of the trace channel.
payloadQueueSize = 1000
// errorBufferSize is the buffer size of the error channel.
errorBufferSize = 200
)
func newTracer(opts ...StartOption) *tracer {
c := new(config)
defaults(c)
for _, fn := range opts {
fn(c)
}
if c.transport == nil {
c.transport = newTransport(c.agentAddr)
}
if c.propagator == nil {
c.propagator = NewPropagator(nil)
}
t := &tracer{
config: c,
payload: newPayload(),
flushAllReq: make(chan chan<- struct{}),
flushTracesReq: make(chan struct{}, 1),
flushErrorsReq: make(chan struct{}, 1),
exitReq: make(chan struct{}),
payloadQueue: make(chan []*span, payloadQueueSize),
errorBuffer: make(chan error, errorBufferSize),
stopped: make(chan struct{}),
}
go t.worker()
return t
}
// worker receives finished traces to be added into the payload, as well
// as periodically flushes traces to the transport.
func (t *tracer) worker() {
defer close(t.stopped)
ticker := time.NewTicker(flushInterval)
defer ticker.Stop()
for {
select {
case trace := <-t.payloadQueue:
t.pushPayload(trace)
case <-ticker.C:
t.flush()
case done := <-t.flushAllReq:
t.flush()
done <- struct{}{}
case <-t.flushTracesReq:
t.flushTraces()
case <-t.flushErrorsReq:
t.flushErrors()
case <-t.exitReq:
t.flush()
return
}
}
}
func (t *tracer) pushTrace(trace []*span) {
select {
case <-t.stopped:
return
default:
}
select {
case t.payloadQueue <- trace:
default:
t.pushError(&dataLossError{
context: errors.New("payload queue full, dropping trace"),
count: len(trace),
})
}
if t.syncPush != nil {
// only in tests
<-t.syncPush
}
}
func (t *tracer) pushError(err error) {
select {
case <-t.stopped:
return
default:
}
if len(t.errorBuffer) >= cap(t.errorBuffer)/2 { // starts being full, anticipate, try and flush soon
select {
case t.flushErrorsReq <- struct{}{}:
default: // a flush was already requested, skip
}
}
select {
case t.errorBuffer <- err:
default:
// OK, if we get this, our error error buffer is full,
// we can assume it is filled with meaningful messages which
// are going to be logged and hopefully read, nothing better
// we can do, blocking would make things worse.
}
}
// StartSpan creates, starts, and returns a new Span with the given `operationName`.
func (t *tracer) StartSpan(operationName string, options ...ddtrace.StartSpanOption) ddtrace.Span {
var opts ddtrace.StartSpanConfig
for _, fn := range options {
fn(&opts)
}
var startTime int64
if opts.StartTime.IsZero() {
startTime = now()
} else {
startTime = opts.StartTime.UnixNano()
}
var context *spanContext
if opts.Parent != nil {
if ctx, ok := opts.Parent.(*spanContext); ok {
context = ctx
}
}
id := random.Uint64()
// span defaults
span := &span{
Name: operationName,
Service: t.config.serviceName,
Resource: operationName,
Meta: map[string]string{},
Metrics: map[string]float64{},
SpanID: id,
TraceID: id,
ParentID: 0,
Start: startTime,
}
if context != nil {
// this is a child span
span.TraceID = context.traceID
span.ParentID = context.spanID
if context.hasSamplingPriority() {
span.Metrics[samplingPriorityKey] = float64(context.samplingPriority())
}
if context.span != nil {
context.span.RLock()
span.Service = context.span.Service
context.span.RUnlock()
}
}
span.context = newSpanContext(span, context)
if context == nil || context.span == nil {
// this is either a global root span or a process-level root span
span.SetTag(ext.Pid, strconv.Itoa(os.Getpid()))
t.sample(span)
}
// add tags from options
for k, v := range opts.Tags {
span.SetTag(k, v)
}
// add global tags
for k, v := range t.config.globalTags {
span.SetTag(k, v)
}
return span
}
// Stop stops the tracer.
func (t *tracer) Stop() {
select {
case <-t.stopped:
return
default:
t.exitReq <- struct{}{}
<-t.stopped
}
}
// Inject uses the configured or default TextMap Propagator.
func (t *tracer) Inject(ctx ddtrace.SpanContext, carrier interface{}) error {
return t.config.propagator.Inject(ctx, carrier)
}
// Extract uses the configured or default TextMap Propagator.
func (t *tracer) Extract(carrier interface{}) (ddtrace.SpanContext, error) {
return t.config.propagator.Extract(carrier)
}
// flushTraces will push any currently buffered traces to the server.
func (t *tracer) flushTraces() {
if t.payload.itemCount() == 0 {
return
}
size, count := t.payload.size(), t.payload.itemCount()
if t.config.debug {
log.Printf("Sending payload: size: %d traces: %d\n", size, count)
}
err := t.config.transport.send(t.payload)
if err != nil && size > payloadMaxLimit {
// we couldn't send the payload and it is getting too big to be
// accepted by the agent, we have to drop it.
t.payload.reset()
t.pushError(&dataLossError{context: err, count: count})
}
if err == nil {
// send succeeded
t.payload.reset()
}
}
// flushErrors will process log messages that were queued
func (t *tracer) flushErrors() {
logErrors(t.errorBuffer)
}
func (t *tracer) flush() {
t.flushTraces()
t.flushErrors()
}
// forceFlush forces a flush of data (traces and services) to the agent.
// Flushes are done by a background task on a regular basis, so you never
// need to call this manually, mostly useful for testing and debugging.
func (t *tracer) forceFlush() {
done := make(chan struct{})
t.flushAllReq <- done
<-done
}
// pushPayload pushes the trace onto the payload. If the payload becomes
// larger than the threshold as a result, it sends a flush request.
func (t *tracer) pushPayload(trace []*span) {
if err := t.payload.push(trace); err != nil {
t.pushError(&traceEncodingError{context: err})
}
if t.payload.size() > payloadSizeLimit {
// getting large
select {
case t.flushTracesReq <- struct{}{}:
default:
// flush already queued
}
}
if t.syncPush != nil {
// only in tests
t.syncPush <- struct{}{}
}
}
// sampleRateMetricKey is the metric key holding the applied sample rate. Has to be the same as the Agent.
const sampleRateMetricKey = "_sample_rate"
// Sample samples a span with the internal sampler.
func (t *tracer) sample(span *span) {
sampler := t.config.sampler
sampled := sampler.Sample(span)
span.context.sampled = sampled
if !sampled {
return
}
if rs, ok := sampler.(RateSampler); ok && rs.Rate() < 1 {
// the span was sampled using a rate sampler which wasn't all permissive,
// so we make note of the sampling rate.
span.Lock()
defer span.Unlock()
if span.finished {
// we don't touch finished span as they might be flushing
return
}
span.Metrics[sampleRateMetricKey] = rs.Rate()
}
}

View file

@ -0,0 +1,120 @@
package tracer
import (
"fmt"
"net"
"net/http"
"runtime"
"strconv"
"strings"
"time"
)
var tracerVersion = "v1.0"
const (
defaultHostname = "localhost"
defaultPort = "8126"
defaultAddress = defaultHostname + ":" + defaultPort
defaultHTTPTimeout = time.Second // defines the current timeout before giving up with the send process
traceCountHeader = "X-Datadog-Trace-Count" // header containing the number of traces in the payload
)
// Transport is an interface for span submission to the agent.
type transport interface {
send(p *payload) error
}
// newTransport returns a new Transport implementation that sends traces to a
// trace agent running on the given hostname and port. If the zero values for
// hostname and port are provided, the default values will be used ("localhost"
// for hostname, and "8126" for port).
//
// In general, using this method is only necessary if you have a trace agent
// running on a non-default port or if it's located on another machine.
func newTransport(addr string) transport {
return newHTTPTransport(addr)
}
// newDefaultTransport return a default transport for this tracing client
func newDefaultTransport() transport {
return newHTTPTransport(defaultAddress)
}
type httpTransport struct {
traceURL string // the delivery URL for traces
client *http.Client // the HTTP client used in the POST
headers map[string]string // the Transport headers
}
// newHTTPTransport returns an httpTransport for the given endpoint
func newHTTPTransport(addr string) *httpTransport {
// initialize the default EncoderPool with Encoder headers
defaultHeaders := map[string]string{
"Datadog-Meta-Lang": "go",
"Datadog-Meta-Lang-Version": strings.TrimPrefix(runtime.Version(), "go"),
"Datadog-Meta-Lang-Interpreter": runtime.Compiler + "-" + runtime.GOARCH + "-" + runtime.GOOS,
"Datadog-Meta-Tracer-Version": tracerVersion,
"Content-Type": "application/msgpack",
}
host, port, _ := net.SplitHostPort(addr)
if host == "" {
host = defaultHostname
}
if port == "" {
port = defaultPort
}
addr = fmt.Sprintf("%s:%s", host, port)
return &httpTransport{
traceURL: fmt.Sprintf("http://%s/v0.3/traces", addr),
client: &http.Client{
// We copy the transport to avoid using the default one, as it might be
// augmented with tracing and we don't want these calls to be recorded.
// See https://golang.org/pkg/net/http/#DefaultTransport .
Transport: &http.Transport{
Proxy: http.ProxyFromEnvironment,
DialContext: (&net.Dialer{
Timeout: 30 * time.Second,
KeepAlive: 30 * time.Second,
DualStack: true,
}).DialContext,
MaxIdleConns: 100,
IdleConnTimeout: 90 * time.Second,
TLSHandshakeTimeout: 10 * time.Second,
ExpectContinueTimeout: 1 * time.Second,
},
Timeout: defaultHTTPTimeout,
},
headers: defaultHeaders,
}
}
func (t *httpTransport) send(p *payload) error {
// prepare the client and send the payload
req, err := http.NewRequest("POST", t.traceURL, p)
if err != nil {
return fmt.Errorf("cannot create http request: %v", err)
}
for header, value := range t.headers {
req.Header.Set(header, value)
}
req.Header.Set(traceCountHeader, strconv.Itoa(p.itemCount()))
req.Header.Set("Content-Length", strconv.Itoa(p.size()))
response, err := t.client.Do(req)
if err != nil {
return err
}
defer response.Body.Close()
if code := response.StatusCode; code >= 400 {
// error, check the body for context information and
// return a nice error.
msg := make([]byte, 1000)
n, _ := response.Body.Read(msg)
txt := http.StatusText(code)
if n > 0 {
return fmt.Errorf("%s (Status: %s)", msg[:n], txt)
}
return fmt.Errorf("%s", txt)
}
return nil
}

View file

@ -0,0 +1,32 @@
package tracer
// toFloat64 attempts to convert value into a float64. If it succeeds it returns
// the value and true, otherwise 0 and false.
func toFloat64(value interface{}) (f float64, ok bool) {
switch i := value.(type) {
case byte:
return float64(i), true
case float32:
return float64(i), true
case float64:
return i, true
case int:
return float64(i), true
case int16:
return float64(i), true
case int32:
return float64(i), true
case int64:
return float64(i), true
case uint:
return float64(i), true
case uint16:
return float64(i), true
case uint32:
return float64(i), true
case uint64:
return float64(i), true
default:
return 0, false
}
}