1
0
Fork 0

Update go-marathon

This commit is contained in:
Timo Reimann 2017-12-19 16:00:09 +01:00 committed by Traefiker
parent 3142a4f4b3
commit 877770f7cf
24 changed files with 450 additions and 97 deletions

View file

@ -1,5 +1,5 @@
/*
Copyright 2014 Rohith All rights reserved.
Copyright 2014 The go-marathon Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
@ -103,8 +103,7 @@ func (r *marathonClient) registerSubscription() error {
case EventsTransportCallback:
return r.registerCallbackSubscription()
case EventsTransportSSE:
r.registerSSESubscription()
return nil
return r.registerSSESubscription()
default:
return fmt.Errorf("the events transport: %d is not supported", r.config.EventsTransport)
}
@ -167,27 +166,34 @@ func (r *marathonClient) registerCallbackSubscription() error {
// connect to the SSE stream and to process the received events. To establish
// the connection it tries the active cluster members until no more member is
// active. When this happens it will retry to get a connection every 5 seconds.
func (r *marathonClient) registerSSESubscription() {
func (r *marathonClient) registerSSESubscription() error {
if r.subscribedToSSE {
return
return nil
}
if r.config.HTTPSSEClient.Timeout != 0 {
return fmt.Errorf(
"global timeout must not be set for SSE connections (found %s) -- remove global timeout from HTTP client or provide separate SSE HTTP client without global timeout",
r.config.HTTPSSEClient.Timeout,
)
}
go func() {
for {
stream, err := r.connectToSSE()
if err != nil {
r.debugLog.Printf("Error connecting SSE subscription: %s", err)
r.debugLog("Error connecting SSE subscription: %s", err)
<-time.After(5 * time.Second)
continue
}
err = r.listenToSSE(stream)
stream.Close()
r.debugLog.Printf("Error on SSE subscription: %s", err)
r.debugLog("Error on SSE subscription: %s", err)
}
}()
r.subscribedToSSE = true
return nil
}
// connectToSSE tries to establish an *eventsource.Stream to any of the Marathon cluster members, marking the
@ -209,15 +215,15 @@ func (r *marathonClient) connectToSSE() (*eventsource.Stream, error) {
// its underlying fields for performance reasons. See note that at least the Transport
// should be reused here: https://golang.org/pkg/net/http/#Client
httpClient := &http.Client{
Transport: r.config.HTTPClient.Transport,
CheckRedirect: r.config.HTTPClient.CheckRedirect,
Jar: r.config.HTTPClient.Jar,
Timeout: r.config.HTTPClient.Timeout,
Transport: r.config.HTTPSSEClient.Transport,
CheckRedirect: r.config.HTTPSSEClient.CheckRedirect,
Jar: r.config.HTTPSSEClient.Jar,
Timeout: r.config.HTTPSSEClient.Timeout,
}
stream, err := eventsource.SubscribeWith("", httpClient, request)
if err != nil {
r.debugLog.Printf("Error subscribing to Marathon event stream: %s", err)
r.debugLog("Error subscribing to Marathon event stream: %s", err)
r.hosts.markDown(member)
continue
}
@ -231,7 +237,7 @@ func (r *marathonClient) listenToSSE(stream *eventsource.Stream) error {
select {
case ev := <-stream.Events:
if err := r.handleEvent(ev.Data()); err != nil {
r.debugLog.Printf("listenToSSE(): failed to handle event: %v", err)
r.debugLog("listenToSSE(): failed to handle event: %v", err)
}
case err := <-stream.Errors:
return err
@ -319,12 +325,12 @@ func (r *marathonClient) handleCallbackEvent(writer http.ResponseWriter, request
body, err := ioutil.ReadAll(request.Body)
if err != nil {
// TODO should this return a 500?
r.debugLog.Printf("handleCallbackEvent(): failed to read request body, error: %s\n", err)
r.debugLog("handleCallbackEvent(): failed to read request body, error: %s", err)
return
}
if err := r.handleEvent(string(body[:])); err != nil {
// TODO should this return a 500?
r.debugLog.Printf("handleCallbackEvent(): failed to handle event: %v\n", err)
r.debugLog("handleCallbackEvent(): failed to handle event: %v", err)
}
}