Create init method on provider interface
This commit is contained in:
parent
b2a57ca1f3
commit
027093a5a5
52 changed files with 2760 additions and 131 deletions
408
vendor/github.com/Microsoft/ApplicationInsights-Go/appinsights/inmemorychannel.go
generated
vendored
Normal file
408
vendor/github.com/Microsoft/ApplicationInsights-Go/appinsights/inmemorychannel.go
generated
vendored
Normal file
|
@ -0,0 +1,408 @@
|
|||
package appinsights
|
||||
|
||||
import (
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"code.cloudfoundry.org/clock"
|
||||
)
|
||||
|
||||
var (
|
||||
submit_retries = []time.Duration{time.Duration(10 * time.Second), time.Duration(30 * time.Second), time.Duration(60 * time.Second)}
|
||||
)
|
||||
|
||||
type TelemetryBufferItems []Telemetry
|
||||
|
||||
type InMemoryChannel struct {
|
||||
endpointAddress string
|
||||
isDeveloperMode bool
|
||||
collectChan chan Telemetry
|
||||
controlChan chan *inMemoryChannelControl
|
||||
batchSize int
|
||||
batchInterval time.Duration
|
||||
waitgroup sync.WaitGroup
|
||||
throttle *throttleManager
|
||||
transmitter transmitter
|
||||
}
|
||||
|
||||
type inMemoryChannelControl struct {
|
||||
// If true, flush the buffer.
|
||||
flush bool
|
||||
|
||||
// If true, stop listening on the channel. (Flush is required if any events are to be sent)
|
||||
stop bool
|
||||
|
||||
// If stopping and flushing, this specifies whether to retry submissions on error.
|
||||
retry bool
|
||||
|
||||
// If retrying, what is the max time to wait before finishing up?
|
||||
timeout time.Duration
|
||||
|
||||
// If specified, a message will be sent on this channel when all pending telemetry items have been submitted
|
||||
callback chan struct{}
|
||||
}
|
||||
|
||||
func NewInMemoryChannel(config *TelemetryConfiguration) *InMemoryChannel {
|
||||
channel := &InMemoryChannel{
|
||||
endpointAddress: config.EndpointUrl,
|
||||
collectChan: make(chan Telemetry),
|
||||
controlChan: make(chan *inMemoryChannelControl),
|
||||
batchSize: config.MaxBatchSize,
|
||||
batchInterval: config.MaxBatchInterval,
|
||||
throttle: newThrottleManager(),
|
||||
transmitter: newTransmitter(config.EndpointUrl),
|
||||
}
|
||||
|
||||
go channel.acceptLoop()
|
||||
|
||||
return channel
|
||||
}
|
||||
|
||||
func (channel *InMemoryChannel) EndpointAddress() string {
|
||||
return channel.endpointAddress
|
||||
}
|
||||
|
||||
func (channel *InMemoryChannel) Send(item Telemetry) {
|
||||
if item != nil && channel.collectChan != nil {
|
||||
channel.collectChan <- item
|
||||
}
|
||||
}
|
||||
|
||||
func (channel *InMemoryChannel) Flush() {
|
||||
if channel.controlChan != nil {
|
||||
channel.controlChan <- &inMemoryChannelControl{
|
||||
flush: true,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (channel *InMemoryChannel) Stop() {
|
||||
if channel.controlChan != nil {
|
||||
channel.controlChan <- &inMemoryChannelControl{
|
||||
stop: true,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (channel *InMemoryChannel) IsThrottled() bool {
|
||||
return channel.throttle != nil && channel.throttle.IsThrottled()
|
||||
}
|
||||
|
||||
func (channel *InMemoryChannel) Close(timeout ...time.Duration) <-chan struct{} {
|
||||
if channel.controlChan != nil {
|
||||
callback := make(chan struct{})
|
||||
|
||||
ctl := &inMemoryChannelControl{
|
||||
stop: true,
|
||||
flush: true,
|
||||
retry: false,
|
||||
callback: callback,
|
||||
}
|
||||
|
||||
if len(timeout) > 0 {
|
||||
ctl.retry = true
|
||||
ctl.timeout = timeout[0]
|
||||
}
|
||||
|
||||
channel.controlChan <- ctl
|
||||
|
||||
return callback
|
||||
} else {
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
func (channel *InMemoryChannel) acceptLoop() {
|
||||
channelState := newInMemoryChannelState(channel)
|
||||
|
||||
for !channelState.stopping {
|
||||
channelState.start()
|
||||
}
|
||||
|
||||
channelState.stop()
|
||||
}
|
||||
|
||||
// Data shared between parts of a channel
|
||||
type inMemoryChannelState struct {
|
||||
channel *InMemoryChannel
|
||||
stopping bool
|
||||
buffer TelemetryBufferItems
|
||||
retry bool
|
||||
retryTimeout time.Duration
|
||||
callback chan struct{}
|
||||
timer clock.Timer
|
||||
}
|
||||
|
||||
func newInMemoryChannelState(channel *InMemoryChannel) *inMemoryChannelState {
|
||||
return &inMemoryChannelState{
|
||||
channel: channel,
|
||||
buffer: make(TelemetryBufferItems, 0, 16),
|
||||
stopping: false,
|
||||
timer: currentClock.NewTimer(channel.batchInterval),
|
||||
}
|
||||
}
|
||||
|
||||
// Part of channel accept loop: Initialize buffer and accept first message, handle controls.
|
||||
func (state *inMemoryChannelState) start() bool {
|
||||
if len(state.buffer) > 16 {
|
||||
// Start out with the size of the previous buffer
|
||||
state.buffer = make(TelemetryBufferItems, 0, cap(state.buffer))
|
||||
} else if len(state.buffer) > 0 {
|
||||
// Start out with at least 16 slots
|
||||
state.buffer = make(TelemetryBufferItems, 0, 16)
|
||||
}
|
||||
|
||||
// Wait for an event
|
||||
select {
|
||||
case event := <-state.channel.collectChan:
|
||||
if event == nil {
|
||||
// Channel closed? Not intercepted by Send()?
|
||||
panic("Received nil event")
|
||||
}
|
||||
|
||||
state.buffer = append(state.buffer, event)
|
||||
|
||||
case ctl := <-state.channel.controlChan:
|
||||
// The buffer is empty, so there would be no point in flushing
|
||||
state.channel.signalWhenDone(ctl.callback)
|
||||
|
||||
if ctl.stop {
|
||||
state.stopping = true
|
||||
return false
|
||||
}
|
||||
}
|
||||
|
||||
if len(state.buffer) == 0 {
|
||||
return true
|
||||
}
|
||||
|
||||
return state.waitToSend()
|
||||
}
|
||||
|
||||
// Part of channel accept loop: Wait for buffer to fill, timeout to expire, or flush
|
||||
func (state *inMemoryChannelState) waitToSend() bool {
|
||||
// Things that are used by the sender if we receive a control message
|
||||
state.retryTimeout = 0
|
||||
state.retry = true
|
||||
state.callback = nil
|
||||
|
||||
// Delay until timeout passes or buffer fills up
|
||||
state.timer.Reset(state.channel.batchInterval)
|
||||
for {
|
||||
select {
|
||||
case event := <-state.channel.collectChan:
|
||||
if event == nil {
|
||||
// Channel closed? Not intercepted by Send()?
|
||||
panic("Received nil event")
|
||||
}
|
||||
|
||||
state.buffer = append(state.buffer, event)
|
||||
if len(state.buffer) >= state.channel.batchSize {
|
||||
return state.send()
|
||||
}
|
||||
|
||||
case ctl := <-state.channel.controlChan:
|
||||
if ctl.stop {
|
||||
state.stopping = true
|
||||
state.retry = ctl.retry
|
||||
if !ctl.flush {
|
||||
// No flush? Just exit.
|
||||
state.channel.signalWhenDone(ctl.callback)
|
||||
return false
|
||||
}
|
||||
}
|
||||
|
||||
if ctl.flush {
|
||||
state.retryTimeout = ctl.timeout
|
||||
state.callback = ctl.callback
|
||||
return state.send()
|
||||
}
|
||||
|
||||
case _ = <-state.timer.C():
|
||||
// Timeout expired
|
||||
return state.send()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Part of channel accept loop: Check and wait on throttle, submit pending telemetry
|
||||
func (state *inMemoryChannelState) send() bool {
|
||||
// Hold up transmission if we're being throttled
|
||||
if !state.stopping && state.channel.throttle.IsThrottled() {
|
||||
if !state.waitThrottle() {
|
||||
// Stopped
|
||||
return false
|
||||
}
|
||||
}
|
||||
|
||||
// Send
|
||||
if len(state.buffer) > 0 {
|
||||
state.channel.waitgroup.Add(1)
|
||||
|
||||
// If we have a callback, wait on the waitgroup now that it's
|
||||
// incremented.
|
||||
state.channel.signalWhenDone(state.callback)
|
||||
|
||||
go func(buffer TelemetryBufferItems, retry bool, retryTimeout time.Duration) {
|
||||
defer state.channel.waitgroup.Done()
|
||||
state.channel.transmitRetry(buffer, retry, retryTimeout)
|
||||
}(state.buffer, state.retry, state.retryTimeout)
|
||||
} else if state.callback != nil {
|
||||
state.channel.signalWhenDone(state.callback)
|
||||
}
|
||||
|
||||
return true
|
||||
}
|
||||
|
||||
// Part of channel accept loop: Wait for throttle to expire while dropping messages
|
||||
func (state *inMemoryChannelState) waitThrottle() bool {
|
||||
// Channel is currently throttled. Once the buffer fills, messages will
|
||||
// be lost... If we're exiting, then we'll just try to submit anyway. That
|
||||
// request may be throttled and transmitRetry will perform the backoff correctly.
|
||||
|
||||
diagnosticsWriter.Write("Channel is throttled, events may be dropped.")
|
||||
throttleDone := state.channel.throttle.NotifyWhenReady()
|
||||
dropped := 0
|
||||
|
||||
defer diagnosticsWriter.Printf("Channel dropped %d events while throttled", dropped)
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-throttleDone:
|
||||
close(throttleDone)
|
||||
return true
|
||||
|
||||
case event := <-state.channel.collectChan:
|
||||
// If there's still room in the buffer, then go ahead and add it.
|
||||
if len(state.buffer) < state.channel.batchSize {
|
||||
state.buffer = append(state.buffer, event)
|
||||
} else {
|
||||
if dropped == 0 {
|
||||
diagnosticsWriter.Write("Buffer is full, dropping further events.")
|
||||
}
|
||||
|
||||
dropped++
|
||||
}
|
||||
|
||||
case ctl := <-state.channel.controlChan:
|
||||
if ctl.stop {
|
||||
state.stopping = true
|
||||
state.retry = ctl.retry
|
||||
if !ctl.flush {
|
||||
state.channel.signalWhenDone(ctl.callback)
|
||||
return false
|
||||
} else {
|
||||
// Make an exception when stopping
|
||||
return true
|
||||
}
|
||||
}
|
||||
|
||||
// Cannot flush
|
||||
// TODO: Figure out what to do about callback?
|
||||
if ctl.flush {
|
||||
state.channel.signalWhenDone(ctl.callback)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Part of channel accept loop: Clean up and close telemetry channel
|
||||
func (state *inMemoryChannelState) stop() {
|
||||
close(state.channel.collectChan)
|
||||
close(state.channel.controlChan)
|
||||
|
||||
state.channel.collectChan = nil
|
||||
state.channel.controlChan = nil
|
||||
|
||||
// Throttle can't close until transmitters are done using it.
|
||||
state.channel.waitgroup.Wait()
|
||||
state.channel.throttle.Stop()
|
||||
|
||||
state.channel.throttle = nil
|
||||
}
|
||||
|
||||
func (channel *InMemoryChannel) transmitRetry(items TelemetryBufferItems, retry bool, retryTimeout time.Duration) {
|
||||
payload := items.serialize()
|
||||
retryTimeRemaining := retryTimeout
|
||||
|
||||
for _, wait := range submit_retries {
|
||||
result, err := channel.transmitter.Transmit(payload, items)
|
||||
if err == nil && result != nil && result.IsSuccess() {
|
||||
return
|
||||
}
|
||||
|
||||
if !retry {
|
||||
diagnosticsWriter.Write("Refusing to retry telemetry submission (retry==false)")
|
||||
return
|
||||
}
|
||||
|
||||
// Check for success, determine if we need to retry anything
|
||||
if result != nil {
|
||||
if result.CanRetry() {
|
||||
// Filter down to failed items
|
||||
payload, items = result.GetRetryItems(payload, items)
|
||||
if len(payload) == 0 || len(items) == 0 {
|
||||
return
|
||||
}
|
||||
} else {
|
||||
diagnosticsWriter.Write("Cannot retry telemetry submission")
|
||||
return
|
||||
}
|
||||
|
||||
// Check for throttling
|
||||
if result.IsThrottled() {
|
||||
if result.retryAfter != nil {
|
||||
diagnosticsWriter.Printf("Channel is throttled until %s", *result.retryAfter)
|
||||
channel.throttle.RetryAfter(*result.retryAfter)
|
||||
} else {
|
||||
// TODO: Pick a time
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if retryTimeout > 0 {
|
||||
// We're on a time schedule here. Make sure we don't try longer
|
||||
// than we have been allowed.
|
||||
if retryTimeRemaining < wait {
|
||||
// One more chance left -- we'll wait the max time we can
|
||||
// and then retry on the way out.
|
||||
currentClock.Sleep(retryTimeRemaining)
|
||||
break
|
||||
} else {
|
||||
// Still have time left to go through the rest of the regular
|
||||
// retry schedule
|
||||
retryTimeRemaining -= wait
|
||||
}
|
||||
}
|
||||
|
||||
diagnosticsWriter.Printf("Waiting %s to retry submission", wait)
|
||||
currentClock.Sleep(wait)
|
||||
|
||||
// Wait if the channel is throttled and we're not on a schedule
|
||||
if channel.IsThrottled() && retryTimeout == 0 {
|
||||
diagnosticsWriter.Printf("Channel is throttled; extending wait time.")
|
||||
ch := channel.throttle.NotifyWhenReady()
|
||||
result := <-ch
|
||||
close(ch)
|
||||
|
||||
if !result {
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// One final try
|
||||
_, err := channel.transmitter.Transmit(payload, items)
|
||||
if err != nil {
|
||||
diagnosticsWriter.Write("Gave up transmitting payload; exhausted retries")
|
||||
}
|
||||
}
|
||||
|
||||
func (channel *InMemoryChannel) signalWhenDone(callback chan struct{}) {
|
||||
if callback != nil {
|
||||
go func() {
|
||||
channel.waitgroup.Wait()
|
||||
close(callback)
|
||||
}()
|
||||
}
|
||||
}
|
Loading…
Add table
Add a link
Reference in a new issue