update go-marathon to 441a03a

in order to get the latest fixes regarding SSE subscription failover.
This commit is contained in:
Marco Jantke 2017-06-13 10:18:10 +02:00 committed by Ludovic Fernandez
parent 885b9f371c
commit 2ddae2e856
7 changed files with 162 additions and 43 deletions

View file

@ -4,6 +4,7 @@ import (
"log"
"net/http"
"strings"
"sync"
)
type subscription struct {
@ -32,6 +33,8 @@ type Server struct {
subs chan *subscription
unregister chan *subscription
quit chan bool
isClosed bool
isClosedMutex sync.RWMutex
}
// Create a new Server ready for handler creation and publishing events
@ -51,6 +54,7 @@ func NewServer() *Server {
// Stop handling publishing
func (srv *Server) Close() {
srv.quit <- true
srv.markServerClosed()
}
// Create a new handler for serving a specified channel
@ -69,6 +73,12 @@ func (srv *Server) Handler(channel string) http.HandlerFunc {
}
w.WriteHeader(http.StatusOK)
// If the Handler is still active even though the server is closed, stop here.
// Otherwise the Handler will block while publishing to srv.subs indefinitely.
if srv.isServerClosed() {
return
}
sub := &subscription{
channel: channel,
lastEventId: req.Header.Get("Last-Event-ID"),
@ -165,3 +175,15 @@ func (srv *Server) run() {
}
}
}
func (srv *Server) isServerClosed() bool {
srv.isClosedMutex.RLock()
defer srv.isClosedMutex.RUnlock()
return srv.isClosed
}
func (srv *Server) markServerClosed() {
srv.isClosedMutex.Lock()
defer srv.isClosedMutex.Unlock()
srv.isClosed = true
}

View file

@ -7,6 +7,7 @@ import (
"io/ioutil"
"log"
"net/http"
"sync"
"time"
)
@ -27,6 +28,10 @@ type Stream struct {
Errors chan error
// Logger is a logger that, when set, will be used for logging debug messages
Logger *log.Logger
// isClosed is a marker that the stream is/should be closed
isClosed bool
// isClosedMutex is a mutex protecting concurrent read/write access of isClosed
isClosedMutex sync.RWMutex
}
type SubscriptionError struct {
@ -61,7 +66,7 @@ func SubscribeWith(lastEventId string, client *http.Client, request *http.Reques
c: client,
req: request,
lastEventId: lastEventId,
retry: (time.Millisecond * 3000),
retry: time.Millisecond * 3000,
Events: make(chan Event),
Errors: make(chan error),
}
@ -75,6 +80,29 @@ func SubscribeWith(lastEventId string, client *http.Client, request *http.Reques
return stream, nil
}
// Close will close the stream. It is safe for concurrent access and can be called multiple times.
func (stream *Stream) Close() {
if stream.isStreamClosed() {
return
}
stream.markStreamClosed()
close(stream.Errors)
close(stream.Events)
}
func (stream *Stream) isStreamClosed() bool {
stream.isClosedMutex.RLock()
defer stream.isClosedMutex.RUnlock()
return stream.isClosed
}
func (stream *Stream) markStreamClosed() {
stream.isClosedMutex.Lock()
defer stream.isClosedMutex.Unlock()
stream.isClosed = true
}
// Go's http package doesn't copy headers across when it encounters
// redirects so we need to do that manually.
func checkRedirect(req *http.Request, via []*http.Request) error {
@ -112,15 +140,27 @@ func (stream *Stream) connect() (r io.ReadCloser, err error) {
func (stream *Stream) stream(r io.ReadCloser) {
defer r.Close()
// receives events until an error is encountered
stream.receiveEvents(r)
// tries to reconnect and start the stream again
stream.retryRestartStream()
}
func (stream *Stream) receiveEvents(r io.ReadCloser) {
dec := NewDecoder(r)
for {
ev, err := dec.Decode()
if stream.isStreamClosed() {
return
}
if err != nil {
stream.Errors <- err
// respond to all errors by reconnecting and trying again
break
return
}
pub := ev.(*publication)
if pub.Retry() > 0 {
stream.retry = time.Duration(pub.Retry()) * time.Millisecond
@ -130,20 +170,25 @@ func (stream *Stream) stream(r io.ReadCloser) {
}
stream.Events <- ev
}
}
func (stream *Stream) retryRestartStream() {
backoff := stream.retry
for {
time.Sleep(backoff)
if stream.Logger != nil {
stream.Logger.Printf("Reconnecting in %0.4f secs\n", backoff.Seconds())
}
time.Sleep(backoff)
if stream.isStreamClosed() {
return
}
// NOTE: because of the defer we're opening the new connection
// before closing the old one. Shouldn't be a problem in practice,
// but something to be aware of.
next, err := stream.connect()
r, err := stream.connect()
if err == nil {
go stream.stream(next)
break
go stream.stream(r)
return
}
stream.Errors <- err
backoff *= 2