1
0
Fork 0

Merge branch 'v1.3'

This commit is contained in:
Fernandez Ludovic 2017-06-30 00:04:04 +02:00
commit 3776e58041
9 changed files with 111 additions and 36 deletions

View file

@ -150,8 +150,6 @@ type Marathon interface {
}
var (
// ErrInvalidResponse is thrown when marathon responds with invalid or error response
ErrInvalidResponse = errors.New("invalid response from Marathon")
// ErrMarathonDown is thrown when all the marathon endpoints are down
ErrMarathonDown = errors.New("all the Marathon hosts are presently down")
// ErrTimeoutError is thrown when the operation has timed out
@ -303,8 +301,7 @@ func (r *marathonClient) apiCall(method, path string, body, result interface{})
if response.StatusCode >= 200 && response.StatusCode <= 299 {
if result != nil {
if err := json.Unmarshal(respBody, result); err != nil {
r.debugLog.Printf("apiCall(): failed to unmarshall the response from marathon, error: %s\n", err)
return ErrInvalidResponse
return fmt.Errorf("failed to unmarshal response from Marathon: %s", err)
}
}
return nil

View file

@ -27,7 +27,7 @@ type HealthCheck struct {
GracePeriodSeconds int `json:"gracePeriodSeconds,omitempty"`
IntervalSeconds int `json:"intervalSeconds,omitempty"`
TimeoutSeconds int `json:"timeoutSeconds,omitempty"`
IgnoreHTTP1xx *bool `json:"ignoreHttp1xx,ommitempty"`
IgnoreHTTP1xx *bool `json:"ignoreHttp1xx,omitempty"`
}
// SetCommand sets the given command on the health check.

View file

@ -16,12 +16,54 @@ limitations under the License.
package marathon
import (
"encoding/json"
"fmt"
)
const UnreachableStrategyAbsenceReasonDisabled = "disabled"
// UnreachableStrategy is the unreachable strategy applied to an application.
type UnreachableStrategy struct {
EnabledUnreachableStrategy
AbsenceReason string
}
// EnabledUnreachableStrategy covers parameters pertaining to present unreachable strategies.
type EnabledUnreachableStrategy struct {
InactiveAfterSeconds *float64 `json:"inactiveAfterSeconds,omitempty"`
ExpungeAfterSeconds *float64 `json:"expungeAfterSeconds,omitempty"`
}
type unreachableStrategy UnreachableStrategy
// UnmarshalJSON unmarshals the given JSON into an UnreachableStrategy. It
// populates parameters for present strategies, and otherwise only sets the
// absence reason.
func (us *UnreachableStrategy) UnmarshalJSON(b []byte) error {
var u unreachableStrategy
var errEnabledUS, errNonEnabledUS error
if errEnabledUS = json.Unmarshal(b, &u); errEnabledUS == nil {
*us = UnreachableStrategy(u)
return nil
}
if errNonEnabledUS = json.Unmarshal(b, &us.AbsenceReason); errNonEnabledUS == nil {
return nil
}
return fmt.Errorf("failed to unmarshal unreachable strategy: unmarshaling into enabled returned error '%s'; unmarshaling into non-enabled returned error '%s'", errEnabledUS, errNonEnabledUS)
}
// MarshalJSON marshals the unreachable strategy.
func (us *UnreachableStrategy) MarshalJSON() ([]byte, error) {
if us.AbsenceReason == "" {
return json.Marshal(us.EnabledUnreachableStrategy)
}
return json.Marshal(us.AbsenceReason)
}
// SetInactiveAfterSeconds sets the period after which instance will be marked as inactive.
func (us UnreachableStrategy) SetInactiveAfterSeconds(cap float64) UnreachableStrategy {
us.InactiveAfterSeconds = &cap

View file

@ -159,7 +159,9 @@ func (f *Forwarder) ServeHTTP(w http.ResponseWriter, req *http.Request) {
// serveHTTP forwards HTTP traffic using the configured transport
func (f *httpForwarder) serveHTTP(w http.ResponseWriter, req *http.Request, ctx *handlerContext) {
start := time.Now().UTC()
response, err := f.roundTripper.RoundTrip(f.copyRequest(req, req.URL))
if err != nil {
ctx.log.Errorf("Error forwarding to %v, err: %v", req.URL, err)
ctx.errHandler.ServeHTTP(w, req, err)
@ -169,6 +171,16 @@ func (f *httpForwarder) serveHTTP(w http.ResponseWriter, req *http.Request, ctx
utils.CopyHeaders(w.Header(), response.Header)
// Remove hop-by-hop headers.
utils.RemoveHeaders(w.Header(), HopHeaders...)
announcedTrailerKeyCount := len(response.Trailer)
if announcedTrailerKeyCount > 0 {
trailerKeys := make([]string, 0, announcedTrailerKeyCount)
for k := range response.Trailer {
trailerKeys = append(trailerKeys, k)
}
w.Header().Add("Trailer", strings.Join(trailerKeys, ", "))
}
w.WriteHeader(response.StatusCode)
stream := f.streamResponse
@ -179,6 +191,20 @@ func (f *httpForwarder) serveHTTP(w http.ResponseWriter, req *http.Request, ctx
}
}
written, err := io.Copy(newResponseFlusher(w, stream), response.Body)
if err != nil {
ctx.log.Errorf("Error copying upstream response body: %v", err)
ctx.errHandler.ServeHTTP(w, req, err)
return
}
defer response.Body.Close()
forceSetTrailers := len(response.Trailer) != announcedTrailerKeyCount
shallowCopyTrailers(w.Header(), response.Trailer, forceSetTrailers)
if written != 0 {
w.Header().Set(ContentLength, strconv.FormatInt(written, 10))
}
if req.TLS != nil {
ctx.log.Infof("Round trip: %v, code: %v, duration: %v tls:version: %x, tls:resume:%t, tls:csuite:%x, tls:server:%v",
@ -192,17 +218,6 @@ func (f *httpForwarder) serveHTTP(w http.ResponseWriter, req *http.Request, ctx
req.URL, response.StatusCode, time.Now().UTC().Sub(start))
}
defer response.Body.Close()
if err != nil {
ctx.log.Errorf("Error copying upstream response Body: %v", err)
ctx.errHandler.ServeHTTP(w, req, err)
return
}
if written != 0 {
w.Header().Set(ContentLength, strconv.FormatInt(written, 10))
}
}
// copyRequest makes a copy of the specified request to be sent using the configured
@ -364,3 +379,12 @@ func isWebsocketRequest(req *http.Request) bool {
}
return containsHeader(Connection, "upgrade") && containsHeader(Upgrade, "websocket")
}
func shallowCopyTrailers(dstHeader, srcTrailer http.Header, forceSetTrailers bool) {
for k, vv := range srcTrailer {
if forceSetTrailers {
k = http.TrailerPrefix + k
}
dstHeader[k] = vv
}
}