1
0
Fork 0

Merge branch 'v2.0' into v2.1

This commit is contained in:
Fernandez Ludovic 2019-12-02 18:20:29 +01:00
commit 89919dbe36
52 changed files with 677 additions and 336 deletions

View file

@ -25,14 +25,20 @@ const (
var singleton *HealthCheck
var once sync.Once
// BalancerHandler includes functionality for load-balancing management.
type BalancerHandler interface {
ServeHTTP(w http.ResponseWriter, req *http.Request)
// Balancer is the set of operations required to manage the list of servers in a
// load-balancer.
type Balancer interface {
Servers() []*url.URL
RemoveServer(u *url.URL) error
UpsertServer(u *url.URL, options ...roundrobin.ServerOption) error
}
// BalancerHandler includes functionality for load-balancing management.
type BalancerHandler interface {
ServeHTTP(w http.ResponseWriter, req *http.Request)
Balancer
}
// metricsRegistry is a local interface in the health check package, exposing only the required metrics
// necessary for the health check package. This makes it easier for the tests.
type metricsRegistry interface {
@ -49,7 +55,7 @@ type Options struct {
Transport http.RoundTripper
Interval time.Duration
Timeout time.Duration
LB BalancerHandler
LB Balancer
}
func (opt Options) String() string {
@ -146,18 +152,18 @@ func (hc *HealthCheck) checkBackend(ctx context.Context, backend *BackendConfig)
enabledURLs := backend.LB.Servers()
var newDisabledURLs []backendURL
// FIXME re enable metrics
for _, disableURL := range backend.disabledURLs {
for _, disabledURL := range backend.disabledURLs {
// FIXME serverUpMetricValue := float64(0)
if err := checkHealth(disableURL.url, backend); err == nil {
if err := checkHealth(disabledURL.url, backend); err == nil {
logger.Warnf("Health check up: Returning to server list. Backend: %q URL: %q Weight: %d",
backend.name, disableURL.url.String(), disableURL.weight)
if err = backend.LB.UpsertServer(disableURL.url, roundrobin.Weight(disableURL.weight)); err != nil {
backend.name, disabledURL.url.String(), disabledURL.weight)
if err = backend.LB.UpsertServer(disabledURL.url, roundrobin.Weight(disabledURL.weight)); err != nil {
logger.Error(err)
}
// FIXME serverUpMetricValue = 1
} else {
logger.Warnf("Health check still failing. Backend: %q URL: %q Reason: %s", backend.name, disableURL.url.String(), err)
newDisabledURLs = append(newDisabledURLs, disableURL)
logger.Warnf("Health check still failing. Backend: %q URL: %q Reason: %s", backend.name, disabledURL.url.String(), err)
newDisabledURLs = append(newDisabledURLs, disabledURL)
}
// FIXME labelValues := []string{"backend", backend.name, "url", backendurl.url.String()}
// FIXME hc.metrics.BackendServerUpGauge().With(labelValues...).Set(serverUpMetricValue)
@ -177,7 +183,7 @@ func (hc *HealthCheck) checkBackend(ctx context.Context, backend *BackendConfig)
weight = 1
}
}
logger.Warnf("Health check failed: Remove from server list. Backend: %q URL: %q Weight: %d Reason: %s", backend.name, enableURL.String(), weight, err)
logger.Warnf("Health check failed, removing from server list. Backend: %q URL: %q Weight: %d Reason: %s", backend.name, enableURL.String(), weight, err)
if err := backend.LB.RemoveServer(enableURL); err != nil {
logger.Error(err)
}
@ -281,3 +287,38 @@ func (lb *LbStatusUpdater) UpsertServer(u *url.URL, options ...roundrobin.Server
}
return err
}
// Balancers is a list of Balancers(s) that implements the Balancer interface.
type Balancers []Balancer
// Servers returns the servers url from all the BalancerHandler
func (b Balancers) Servers() []*url.URL {
var servers []*url.URL
for _, lb := range b {
servers = append(servers, lb.Servers()...)
}
return servers
}
// RemoveServer removes the given server from all the BalancerHandler,
// and updates the status of the server to "DOWN".
func (b Balancers) RemoveServer(u *url.URL) error {
for _, lb := range b {
if err := lb.RemoveServer(u); err != nil {
return err
}
}
return nil
}
// UpsertServer adds the given server to all the BalancerHandler,
// and updates the status of the server to "UP".
func (b Balancers) UpsertServer(u *url.URL, options ...roundrobin.ServerOption) error {
for _, lb := range b {
if err := lb.UpsertServer(u, options...); err != nil {
return err
}
}
return nil
}

View file

@ -116,7 +116,18 @@ type CoreLogData map[string]interface{}
// LogData is the data captured by the middleware so that it can be logged.
type LogData struct {
Core CoreLogData
Request http.Header
Request request
OriginResponse http.Header
DownstreamResponse http.Header
DownstreamResponse downstreamResponse
}
type downstreamResponse struct {
headers http.Header
status int
size int64
}
type request struct {
headers http.Header
count int64
}

View file

@ -47,8 +47,6 @@ func (n noopCloser) Close() error {
type handlerParams struct {
logDataTable *LogData
crr *captureRequestReader
crw *captureResponseWriter
}
// Handler will write each request and its response to the access log.
@ -122,7 +120,7 @@ func NewHandler(config *types.AccessLog) (*Handler, error) {
go func() {
defer logHandler.wg.Done()
for handlerParams := range logHandler.logHandlerChan {
logHandler.logTheRoundTrip(handlerParams.logDataTable, handlerParams.crr, handlerParams.crw)
logHandler.logTheRoundTrip(handlerParams.logDataTable)
}
}()
}
@ -162,7 +160,12 @@ func (h *Handler) ServeHTTP(rw http.ResponseWriter, req *http.Request, next http
StartLocal: now.Local(),
}
logDataTable := &LogData{Core: core, Request: req.Header}
logDataTable := &LogData{
Core: core,
Request: request{
headers: req.Header,
},
}
reqWithDataTable := req.WithContext(context.WithValue(req.Context(), DataTableKey, logDataTable))
@ -205,16 +208,21 @@ func (h *Handler) ServeHTTP(rw http.ResponseWriter, req *http.Request, next http
core[ClientUsername] = usernameIfPresent(reqWithDataTable.URL)
}
logDataTable.DownstreamResponse = crw.Header()
logDataTable.DownstreamResponse = downstreamResponse{
headers: crw.Header().Clone(),
status: crw.Status(),
size: crw.Size(),
}
if crr != nil {
logDataTable.Request.count = crr.count
}
if h.config.BufferingSize > 0 {
h.logHandlerChan <- handlerParams{
logDataTable: logDataTable,
crr: crr,
crw: crw,
}
} else {
h.logTheRoundTrip(logDataTable, crr, crw)
h.logTheRoundTrip(logDataTable)
}
}
@ -264,7 +272,7 @@ func usernameIfPresent(theURL *url.URL) string {
}
// Logging handler to log frontend name, backend name, and elapsed time.
func (h *Handler) logTheRoundTrip(logDataTable *LogData, crr *captureRequestReader, crw *captureResponseWriter) {
func (h *Handler) logTheRoundTrip(logDataTable *LogData) {
core := logDataTable.Core
retryAttempts, ok := core[RetryAttempts].(int)
@ -272,23 +280,22 @@ func (h *Handler) logTheRoundTrip(logDataTable *LogData, crr *captureRequestRead
retryAttempts = 0
}
core[RetryAttempts] = retryAttempts
core[RequestContentSize] = logDataTable.Request.count
if crr != nil {
core[RequestContentSize] = crr.count
}
core[DownstreamStatus] = crw.Status()
status := logDataTable.DownstreamResponse.status
core[DownstreamStatus] = status
// n.b. take care to perform time arithmetic using UTC to avoid errors at DST boundaries.
totalDuration := time.Now().UTC().Sub(core[StartUTC].(time.Time))
core[Duration] = totalDuration
if h.keepAccessLog(crw.Status(), retryAttempts, totalDuration) {
core[DownstreamContentSize] = crw.Size()
if h.keepAccessLog(status, retryAttempts, totalDuration) {
size := logDataTable.DownstreamResponse.size
core[DownstreamContentSize] = size
if original, ok := core[OriginContentSize]; ok {
o64 := original.(int64)
if crw.Size() != o64 && crw.Size() != 0 {
core[GzipRatio] = float64(o64) / float64(crw.Size())
if size != o64 && size != 0 {
core[GzipRatio] = float64(o64) / float64(size)
}
}
@ -305,9 +312,9 @@ func (h *Handler) logTheRoundTrip(logDataTable *LogData, crr *captureRequestRead
}
}
h.redactHeaders(logDataTable.Request, fields, "request_")
h.redactHeaders(logDataTable.Request.headers, fields, "request_")
h.redactHeaders(logDataTable.OriginResponse, fields, "origin_")
h.redactHeaders(logDataTable.DownstreamResponse, fields, "downstream_")
h.redactHeaders(logDataTable.DownstreamResponse.headers, fields, "downstream_")
h.mu.Lock()
defer h.mu.Unlock()

View file

@ -192,6 +192,7 @@ func TestLoggerJSON(t *testing.T) {
Format: JSONFormat,
},
expected: map[string]func(t *testing.T, value interface{}){
RequestContentSize: assertFloat64(0),
RequestHost: assertString(testHostname),
RequestAddr: assertString(testHostname),
RequestMethod: assertString(testMethod),

View file

@ -221,13 +221,11 @@ func (s *Header) processCorsHeaders(rw http.ResponseWriter, req *http.Request) b
}
reqAcMethod := req.Header.Get("Access-Control-Request-Method")
reqAcHeaders := req.Header.Get("Access-Control-Request-Headers")
originHeader := req.Header.Get("Origin")
if reqAcMethod != "" && reqAcHeaders != "" && originHeader != "" && req.Method == http.MethodOptions {
if reqAcMethod != "" && originHeader != "" && req.Method == http.MethodOptions {
// If the request is an OPTIONS request with an Access-Control-Request-Method header,
// and Access-Control-Request-Headers headers, and Origin headers,
// then it is a CORS preflight request,
// and Origin headers, then it is a CORS preflight request,
// and we need to build a custom response: https://www.w3.org/TR/cors/#preflight-request
if s.headers.AccessControlAllowCredentials {
rw.Header().Set("Access-Control-Allow-Credentials", "true")

View file

@ -275,6 +275,25 @@ func TestCORSPreflights(t *testing.T) {
"Access-Control-Allow-Headers": {"origin,X-Forwarded-For"},
},
},
{
desc: "No Request Headers Preflight",
header: NewHeader(emptyHandler, dynamic.Headers{
AccessControlAllowMethods: []string{"GET", "OPTIONS", "PUT"},
AccessControlAllowOrigin: "*",
AccessControlAllowHeaders: []string{"origin", "X-Forwarded-For"},
AccessControlMaxAge: 600,
}),
requestHeaders: map[string][]string{
"Access-Control-Request-Method": {"GET", "OPTIONS"},
"Origin": {"https://foo.bar.org"},
},
expected: map[string][]string{
"Access-Control-Allow-Origin": {"*"},
"Access-Control-Max-Age": {"600"},
"Access-Control-Allow-Methods": {"GET,OPTIONS,PUT"},
"Access-Control-Allow-Headers": {"origin,X-Forwarded-For"},
},
},
}
for _, test := range testCases {

View file

@ -34,7 +34,11 @@ type entryPointMiddleware struct {
}
func (e *entryPointMiddleware) ServeHTTP(rw http.ResponseWriter, req *http.Request) {
spanCtx, _ := e.Extract(opentracing.HTTPHeaders, tracing.HTTPHeadersCarrier(req.Header))
spanCtx, err := e.Extract(opentracing.HTTPHeaders, opentracing.HTTPHeadersCarrier(req.Header))
if err != nil {
log.FromContext(middlewares.GetLoggerCtx(req.Context(), "tracing", entryPointTypeName)).
Debugf("Failed to extract the context: %v", err)
}
span, req, finish := e.StartSpanf(req, ext.SpanKindRPCServerEnum, "EntryPoint", []string{e.entryPoint, req.Host}, " ", ext.RPCServerOption(spanCtx))
defer finish()

View file

@ -193,7 +193,7 @@ func (p *Provider) parseMetadataSourcedRancherData(ctx context.Context, stacks [
}
service := rancherData{
Name: service.Name + "/" + stack.Name,
Name: service.Name + "_" + stack.Name,
State: service.State,
Labels: service.Labels,
Port: servicePort,

View file

@ -40,7 +40,7 @@ func NewManager(configs map[string]*runtime.ServiceInfo, defaultRoundTripper htt
metricsRegistry: metricsRegistry,
bufferPool: newBufferPool(),
defaultRoundTripper: defaultRoundTripper,
balancers: make(map[string][]healthcheck.BalancerHandler),
balancers: make(map[string]healthcheck.Balancers),
configs: configs,
}
}
@ -51,8 +51,12 @@ type Manager struct {
metricsRegistry metrics.Registry
bufferPool httputil.BufferPool
defaultRoundTripper http.RoundTripper
balancers map[string][]healthcheck.BalancerHandler
configs map[string]*runtime.ServiceInfo
// balancers is the map of all Balancers, keyed by service name.
// There is one Balancer per service handler, and there is one service handler per reference to a service
// (e.g. if 2 routers refer to the same service name, 2 service handlers are created),
// which is why there is not just one Balancer per service name.
balancers map[string]healthcheck.Balancers
configs map[string]*runtime.ServiceInfo
}
// BuildHTTP Creates a http.Handler for a service configuration.
@ -92,14 +96,14 @@ func (m *Manager) BuildHTTP(rootCtx context.Context, serviceName string, respons
}
case conf.Weighted != nil:
var err error
lb, err = m.getLoadBalancerWRRServiceHandler(ctx, serviceName, conf.Weighted, responseModifier)
lb, err = m.getWRRServiceHandler(ctx, serviceName, conf.Weighted, responseModifier)
if err != nil {
conf.AddError(err, true)
return nil, err
}
case conf.Mirroring != nil:
var err error
lb, err = m.getLoadBalancerMirrorServiceHandler(ctx, serviceName, conf.Mirroring, responseModifier)
lb, err = m.getMirrorServiceHandler(ctx, conf.Mirroring, responseModifier)
if err != nil {
conf.AddError(err, true)
return nil, err
@ -113,7 +117,7 @@ func (m *Manager) BuildHTTP(rootCtx context.Context, serviceName string, respons
return lb, nil
}
func (m *Manager) getLoadBalancerMirrorServiceHandler(ctx context.Context, serviceName string, config *dynamic.Mirroring, responseModifier func(*http.Response) error) (http.Handler, error) {
func (m *Manager) getMirrorServiceHandler(ctx context.Context, config *dynamic.Mirroring, responseModifier func(*http.Response) error) (http.Handler, error) {
serviceHandler, err := m.BuildHTTP(ctx, config.Service, responseModifier)
if err != nil {
return nil, err
@ -134,7 +138,7 @@ func (m *Manager) getLoadBalancerMirrorServiceHandler(ctx context.Context, servi
return handler, nil
}
func (m *Manager) getLoadBalancerWRRServiceHandler(ctx context.Context, serviceName string, config *dynamic.WeightedRoundRobin, responseModifier func(*http.Response) error) (http.Handler, error) {
func (m *Manager) getWRRServiceHandler(ctx context.Context, serviceName string, config *dynamic.WeightedRoundRobin, responseModifier func(*http.Response) error) (http.Handler, error) {
// TODO Handle accesslog and metrics with multiple service name
if config.Sticky != nil && config.Sticky.Cookie != nil {
config.Sticky.Cookie.Name = cookie.GetName(config.Sticky.Cookie.Name, serviceName)
@ -200,15 +204,12 @@ func (m *Manager) LaunchHealthCheck() {
for serviceName, balancers := range m.balancers {
ctx := log.With(context.Background(), log.Str(log.ServiceName, serviceName))
// TODO aggregate
balancer := balancers[0]
// TODO Should all the services handle healthcheck? Handle different types
service := m.configs[serviceName].LoadBalancer
// Health Check
var backendHealthCheck *healthcheck.BackendConfig
if hcOpts := buildHealthCheckOptions(ctx, balancer, serviceName, service.HealthCheck); hcOpts != nil {
if hcOpts := buildHealthCheckOptions(ctx, balancers, serviceName, service.HealthCheck); hcOpts != nil {
log.FromContext(ctx).Debugf("Setting up healthcheck for service %s with %s", serviceName, *hcOpts)
hcOpts.Transport = m.defaultRoundTripper
@ -224,7 +225,7 @@ func (m *Manager) LaunchHealthCheck() {
healthcheck.GetHealthCheck().SetBackendsConfiguration(context.Background(), backendConfigs)
}
func buildHealthCheckOptions(ctx context.Context, lb healthcheck.BalancerHandler, backend string, hc *dynamic.HealthCheck) *healthcheck.Options {
func buildHealthCheckOptions(ctx context.Context, lb healthcheck.Balancer, backend string, hc *dynamic.HealthCheck) *healthcheck.Options {
if hc == nil || hc.Path == "" {
return nil
}

View file

@ -1,25 +0,0 @@
package tracing
import "net/http"
// HTTPHeadersCarrier custom implementation to fix duplicated headers
// It has been fixed in https://github.com/opentracing/opentracing-go/pull/191
type HTTPHeadersCarrier http.Header
// Set conforms to the TextMapWriter interface.
func (c HTTPHeadersCarrier) Set(key, val string) {
h := http.Header(c)
h.Set(key, val)
}
// ForeachKey conforms to the TextMapReader interface.
func (c HTTPHeadersCarrier) ForeachKey(handler func(key, val string) error) error {
for k, vals := range c {
for _, v := range vals {
if err := handler(k, v); err != nil {
return err
}
}
}
return nil
}

View file

@ -134,7 +134,7 @@ func InjectRequestHeaders(r *http.Request) {
err := opentracing.GlobalTracer().Inject(
span.Context(),
opentracing.HTTPHeaders,
HTTPHeadersCarrier(r.Header))
opentracing.HTTPHeadersCarrier(r.Header))
if err != nil {
log.FromContext(r.Context()).Error(err)
}