1
0
Fork 0

Updates for jaeger tracing client.

This commit is contained in:
Tristan Colgate-McFarlane 2018-08-01 12:52:03 +01:00 committed by Traefiker Bot
parent 4809476c19
commit 6f6ebb8025
92 changed files with 6824 additions and 819 deletions

View file

@ -1,26 +1,21 @@
// Copyright (c) 2016 Uber Technologies, Inc.
// Copyright (c) 2017 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
// http://www.apache.org/licenses/LICENSE-2.0
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package jaeger
import (
"fmt"
"sync"
"sync/atomic"
"time"
@ -155,26 +150,40 @@ func (r *compositeReporter) Close() {
}
}
// ------------------------------
// ------------- REMOTE REPORTER -----------------
type reporterQueueItemType int
const (
defaultQueueSize = 100
defaultBufferFlushInterval = 10 * time.Second
defaultBufferFlushInterval = 1 * time.Second
reporterQueueItemSpan reporterQueueItemType = iota
reporterQueueItemClose
)
type remoteReporter struct {
// must be first in the struct because `sync/atomic` expects 64-bit alignment.
// Cf. https://github.com/uber/jaeger-client-go/issues/155, https://goo.gl/zW7dgq
queueLength int64
reporterOptions
sender Transport
queue chan *Span
queueDrained sync.WaitGroup
flushSignal chan *sync.WaitGroup
type reporterQueueItem struct {
itemType reporterQueueItemType
span *Span
close *sync.WaitGroup
}
// NewRemoteReporter creates a new reporter that sends spans out of process by means of Sender
type remoteReporter struct {
// These fields must be first in the struct because `sync/atomic` expects 64-bit alignment.
// Cf. https://github.com/uber/jaeger-client-go/issues/155, https://goo.gl/zW7dgq
queueLength int64
closed int64 // 0 - not closed, 1 - closed
reporterOptions
sender Transport
queue chan reporterQueueItem
}
// NewRemoteReporter creates a new reporter that sends spans out of process by means of Sender.
// Calls to Report(Span) return immediately (side effect: if internal buffer is full the span is dropped).
// Periodically the transport buffer is flushed even if it hasn't reached max packet size.
// Calls to Close() block until all spans reported prior to the call to Close are flushed.
func NewRemoteReporter(sender Transport, opts ...ReporterOption) Reporter {
options := reporterOptions{}
for _, option := range opts {
@ -195,18 +204,21 @@ func NewRemoteReporter(sender Transport, opts ...ReporterOption) Reporter {
reporter := &remoteReporter{
reporterOptions: options,
sender: sender,
flushSignal: make(chan *sync.WaitGroup),
queue: make(chan *Span, options.queueSize),
queue: make(chan reporterQueueItem, options.queueSize),
}
go reporter.processQueue()
return reporter
}
// Report implements Report() method of Reporter.
// It passes the span to a background go-routine for submission to Jaeger.
// It passes the span to a background go-routine for submission to Jaeger backend.
// If the internal queue is full, the span is dropped and metrics.ReporterDropped counter is incremented.
// If Report() is called after the reporter has been Close()-ed, the additional spans will not be
// sent to the backend, but the metrics.ReporterDropped counter may not reflect them correctly,
// because some of them may still be successfully added to the queue.
func (r *remoteReporter) Report(span *Span) {
select {
case r.queue <- span:
case r.queue <- reporterQueueItem{itemType: reporterQueueItemSpan, span: span}:
atomic.AddInt64(&r.queueLength, 1)
default:
r.metrics.ReporterDropped.Inc(1)
@ -215,53 +227,63 @@ func (r *remoteReporter) Report(span *Span) {
// Close implements Close() method of Reporter by waiting for the queue to be drained.
func (r *remoteReporter) Close() {
r.queueDrained.Add(1)
close(r.queue)
r.queueDrained.Wait()
if swapped := atomic.CompareAndSwapInt64(&r.closed, 0, 1); !swapped {
r.logger.Error("Repeated attempt to close the reporter is ignored")
return
}
r.sendCloseEvent()
r.sender.Close()
}
func (r *remoteReporter) sendCloseEvent() {
wg := &sync.WaitGroup{}
wg.Add(1)
item := reporterQueueItem{itemType: reporterQueueItemClose, close: wg}
r.queue <- item // if the queue is full we will block until there is space
atomic.AddInt64(&r.queueLength, 1)
wg.Wait()
}
// processQueue reads spans from the queue, converts them to Thrift, and stores them in an internal buffer.
// When the buffer length reaches batchSize, it is flushed by submitting the accumulated spans to Jaeger.
// Buffer also gets flushed automatically every batchFlushInterval seconds, just in case the tracer stopped
// reporting new spans.
func (r *remoteReporter) processQueue() {
// flush causes the Sender to flush its accumulated spans and clear the buffer
flush := func() {
if flushed, err := r.sender.Flush(); err != nil {
r.metrics.ReporterFailure.Inc(int64(flushed))
r.logger.Error(fmt.Sprintf("error when flushing the buffer: %s", err.Error()))
} else if flushed > 0 {
r.metrics.ReporterSuccess.Inc(int64(flushed))
}
}
timer := time.NewTicker(r.bufferFlushInterval)
for {
select {
case span, ok := <-r.queue:
if ok {
atomic.AddInt64(&r.queueLength, -1)
case <-timer.C:
flush()
case item := <-r.queue:
atomic.AddInt64(&r.queueLength, -1)
switch item.itemType {
case reporterQueueItemSpan:
span := item.span
if flushed, err := r.sender.Append(span); err != nil {
r.metrics.ReporterFailure.Inc(int64(flushed))
r.logger.Error(err.Error())
r.logger.Error(fmt.Sprintf("error reporting span %q: %s", span.OperationName(), err.Error()))
} else if flushed > 0 {
r.metrics.ReporterSuccess.Inc(int64(flushed))
// to reduce the number of gauge stats, we only emit queue length on flush
r.metrics.ReporterQueueLength.Update(atomic.LoadInt64(&r.queueLength))
}
} else {
// queue closed
case reporterQueueItemClose:
timer.Stop()
r.flush()
r.queueDrained.Done()
flush()
item.close.Done()
return
}
case <-timer.C:
r.flush()
case wg := <-r.flushSignal: // for testing
r.flush()
wg.Done()
}
}
}
// flush causes the Sender to flush its accumulated spans and clear the buffer
func (r *remoteReporter) flush() {
if flushed, err := r.sender.Flush(); err != nil {
r.metrics.ReporterFailure.Inc(int64(flushed))
r.logger.Error(err.Error())
} else if flushed > 0 {
r.metrics.ReporterSuccess.Inc(int64(flushed))
}
}