From a81171d5f13a9c1bc6309276483f1453a444f5bc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Harasimowicz?= Date: Wed, 31 Jan 2018 15:32:04 +0100 Subject: [PATCH] Request buffering middleware --- Gopkg.lock | 9 +- autogen/gentemplates/gen.go | 80 ++++ docs/configuration/backends/consulcatalog.md | 9 +- docs/configuration/backends/docker.md | 5 + docs/configuration/backends/ecs.md | 5 + docs/configuration/backends/marathon.md | 5 + docs/configuration/backends/mesos.md | 5 + docs/configuration/backends/rancher.md | 5 + docs/configuration/commons.md | 20 + .../consulcatalog/consul_catalog_config.go | 15 + .../consul_catalog_config_test.go | 46 ++ provider/docker/config.go | 1 + provider/docker/config_container.go | 14 + .../docker/config_container_docker_test.go | 60 +++ .../docker/config_container_swarm_test.go | 12 + provider/ecs/config.go | 15 + provider/ecs/config_test.go | 59 +++ .../kubernetes/builder_configuration_test.go | 41 ++ provider/kubernetes/kubernetes.go | 15 + provider/kubernetes/kubernetes_test.go | 48 +- provider/kv/keynames.go | 6 + provider/kv/kv_config.go | 20 + provider/kv/kv_config_test.go | 53 +++ provider/label/names.go | 12 + provider/marathon/config.go | 15 + provider/marathon/config_test.go | 67 +++ provider/mesos/config.go | 15 + provider/mesos/config_test.go | 63 ++- provider/rancher/config.go | 15 + provider/rancher/config_test.go | 62 +++ server/server.go | 36 ++ templates/consul_catalog.tmpl | 10 + templates/docker.tmpl | 10 + templates/ecs.tmpl | 10 + templates/kubernetes.tmpl | 10 + templates/kv.tmpl | 10 + templates/marathon.tmpl | 10 + templates/mesos.tmpl | 10 + templates/rancher.tmpl | 10 + types/types.go | 10 + vendor/github.com/mailgun/multibuf/LICENSE | 202 +++++++++ vendor/github.com/mailgun/multibuf/buffer.go | 412 ++++++++++++++++++ .../github.com/vulcand/oxy/buffer/buffer.go | 398 +++++++++++++++++ .../vulcand/oxy/buffer/threshold.go | 225 ++++++++++ 44 files changed, 2155 insertions(+), 5 deletions(-) create mode 100644 vendor/github.com/mailgun/multibuf/LICENSE create mode 100644 vendor/github.com/mailgun/multibuf/buffer.go create mode 100644 vendor/github.com/vulcand/oxy/buffer/buffer.go create mode 100644 vendor/github.com/vulcand/oxy/buffer/threshold.go diff --git a/Gopkg.lock b/Gopkg.lock index 5b43d80d1..42d1e9ae1 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -778,6 +778,12 @@ packages = ["."] revision = "7c28d80e2ada649fc8ab1a37b86d30a2633bd47c" +[[projects]] + branch = "master" + name = "github.com/mailgun/multibuf" + packages = ["."] + revision = "565402cd71fbd9c12aa7e295324ea357e970a61e" + [[projects]] name = "github.com/mailgun/timetools" packages = ["."] @@ -1135,6 +1141,7 @@ branch = "master" name = "github.com/vulcand/oxy" packages = [ + "buffer", "cbreaker", "connlimit", "forward", @@ -1511,6 +1518,6 @@ [solve-meta] analyzer-name = "dep" analyzer-version = 1 - inputs-digest = "b9fee5807e09b19baf3763abc285ea122d64d60dba42aaf47eaf9e96774b46bf" + inputs-digest = "b2cb3354504a6350e3022513b39206a3ece7c2f1a509bcef907c45422a8afb31" solver-name = "gps-cdcl" solver-version = 1 diff --git a/autogen/gentemplates/gen.go b/autogen/gentemplates/gen.go index 9b963dea7..a4518fa2c 100644 --- a/autogen/gentemplates/gen.go +++ b/autogen/gentemplates/gen.go @@ -90,6 +90,16 @@ var _templatesConsul_catalogTmpl = []byte(`[backends] interval = "{{ $healthCheck.Interval }}" {{end}} + {{ $buffering := getBuffering $service.Attributes }} + {{if $buffering }} + [backends.backend-{{ $backendName }}.buffering] + maxRequestBodyBytes = {{ $buffering.MaxRequestBodyBytes }} + memRequestBodyBytes = {{ $buffering.MemRequestBodyBytes }} + maxResponseBodyBytes = {{ $buffering.MaxResponseBodyBytes }} + memResponseBodyBytes = {{ $buffering.MemResponseBodyBytes }} + retryExpression = "{{ $buffering.RetryExpression }}" + {{end}} + {{end}} {{range $index, $node := .Nodes}} @@ -268,6 +278,16 @@ var _templatesDockerTmpl = []byte(`{{$backendServers := .Servers}} interval = "{{ $healthCheck.Interval }}" {{end}} + {{ $buffering := getBuffering $backend }} + {{if $buffering }} + [backends.backend-{{ $backendName }}.buffering] + maxRequestBodyBytes = {{ $buffering.MaxRequestBodyBytes }} + memRequestBodyBytes = {{ $buffering.MemRequestBodyBytes }} + maxResponseBodyBytes = {{ $buffering.MaxResponseBodyBytes }} + memResponseBodyBytes = {{ $buffering.MemResponseBodyBytes }} + retryExpression = "{{ $buffering.RetryExpression }}" + {{end}} + {{ $servers := index $backendServers $backendName }} {{range $serverName, $server := $servers }} {{if hasServices $server }} @@ -578,6 +598,16 @@ var _templatesEcsTmpl = []byte(`[backends] interval = "{{ $healthCheck.Interval }}" {{end}} + {{ $buffering := getBuffering $firstInstance }} + {{if $buffering }} + [backends.backend-{{ $serviceName }}.buffering] + maxRequestBodyBytes = {{ $buffering.MaxRequestBodyBytes }} + memRequestBodyBytes = {{ $buffering.MemRequestBodyBytes }} + maxResponseBodyBytes = {{ $buffering.MaxResponseBodyBytes }} + memResponseBodyBytes = {{ $buffering.MemResponseBodyBytes }} + retryExpression = "{{ $buffering.RetryExpression }}" + {{end}} + {{range $serverName, $server := getServers $instances }} [backends.backend-{{ $serviceName }}.servers.{{ $serverName }}] url = "{{ $server.URL }}" @@ -767,6 +797,16 @@ var _templatesKubernetesTmpl = []byte(`[backends]{{range $backendName, $backend [backends."{{$backendName}}".loadbalancer.stickiness] cookieName = "{{$backend.LoadBalancer.Stickiness.CookieName}}" {{end}} + + {{if $backend.Buffering }} + [backends."{{ $backendName }}".buffering] + maxRequestBodyBytes = {{ $backend.Buffering.MaxRequestBodyBytes }} + memRequestBodyBytes = {{ $backend.Buffering.MemRequestBodyBytes }} + maxResponseBodyBytes = {{ $backend.Buffering.MaxResponseBodyBytes }} + memResponseBodyBytes = {{ $backend.Buffering.MemResponseBodyBytes }} + retryExpression = "{{ $backend.Buffering.RetryExpression }}" + {{end}} + {{range $serverName, $server := $backend.Servers}} [backends."{{$backendName}}".servers."{{$serverName}}"] url = "{{$server.URL}}" @@ -910,6 +950,16 @@ var _templatesKvTmpl = []byte(`[backends] interval = "{{ $healthCheck.Interval }}" {{end}} + {{ $buffering := getBuffering $backend }} + {{if $buffering }} + [backends.{{ $backendName }}.buffering] + maxRequestBodyBytes = {{ $buffering.MaxRequestBodyBytes }} + memRequestBodyBytes = {{ $buffering.MemRequestBodyBytes }} + maxResponseBodyBytes = {{ $buffering.MaxResponseBodyBytes }} + memResponseBodyBytes = {{ $buffering.MemResponseBodyBytes }} + retryExpression = "{{ $buffering.RetryExpression }}" + {{end}} + {{range $serverName, $server := getServers $backend}} [backends."{{ $backendName }}".servers."{{ $serverName }}"] url = "{{ $server.URL }}" @@ -1107,6 +1157,16 @@ var _templatesMarathonTmpl = []byte(`{{ $apps := .Applications }} interval = "{{ $healthCheck.Interval }}" {{end}} + {{ $buffering := getBuffering $app }} + {{if $buffering }} + [backends."{{ $backendName }}".buffering] + maxRequestBodyBytes = {{ $buffering.MaxRequestBodyBytes }} + memRequestBodyBytes = {{ $buffering.MemRequestBodyBytes }} + maxResponseBodyBytes = {{ $buffering.MaxResponseBodyBytes }} + memResponseBodyBytes = {{ $buffering.MemResponseBodyBytes }} + retryExpression = "{{ $buffering.RetryExpression }}" + {{end}} + {{range $serverName, $server := getServers $app $serviceName }} [backends."{{ $backendName }}".servers."{{ $serverName }}"] url = "{{ $server.URL }}" @@ -1290,6 +1350,16 @@ var _templatesMesosTmpl = []byte(`[backends] interval = "{{ $healthCheck.Interval }}" {{end}} + {{ $buffering := getBuffering $app }} + {{if $buffering }} + [backends.backend-{{ $backendName }}.buffering] + maxRequestBodyBytes = {{ $buffering.MaxRequestBodyBytes }} + memRequestBodyBytes = {{ $buffering.MemRequestBodyBytes }} + maxResponseBodyBytes = {{ $buffering.MaxResponseBodyBytes }} + memResponseBodyBytes = {{ $buffering.MemResponseBodyBytes }} + retryExpression = "{{ $buffering.RetryExpression }}" + {{end}} + {{range $serverName, $server := getServers $tasks }} [backends.backend-{{ $backendName }}.servers.{{ $serverName }}] url = "{{ $server.URL }}" @@ -1493,6 +1563,16 @@ var _templatesRancherTmpl = []byte(`{{ $backendServers := .Backends }} interval = "{{ $healthCheck.Interval }}" {{end}} + {{ $buffering := getBuffering $backend }} + {{if $buffering }} + [backends."backend-{{ $backendName }}".buffering] + maxRequestBodyBytes = {{ $buffering.MaxRequestBodyBytes }} + memRequestBodyBytes = {{ $buffering.MemRequestBodyBytes }} + maxResponseBodyBytes = {{ $buffering.MaxResponseBodyBytes }} + memResponseBodyBytes = {{ $buffering.MemResponseBodyBytes }} + retryExpression = "{{ $buffering.RetryExpression }}" + {{end}} + {{range $serverName, $server := getServers $backend}} [backends.backend-{{ $backendName }}.servers.{{ $serverName }}] url = "{{ $server.URL }}" diff --git a/docs/configuration/backends/consulcatalog.md b/docs/configuration/backends/consulcatalog.md index 368d7d95a..ac90741ea 100644 --- a/docs/configuration/backends/consulcatalog.md +++ b/docs/configuration/backends/consulcatalog.md @@ -67,6 +67,11 @@ Additional settings can be defined using Consul Catalog tags. | `.port=80` | Register this port. Useful when the container exposes multiples ports. | | `.protocol=https` | Override the default `http` protocol. | | `.weight=10` | Assign this weight to the container. | +| `traefik.backend.buffering.maxRequestBodyBytes=0` | See [buffering](/configuration/commons/#buffering) section. | +| `traefik.backend.buffering.maxResponseBodyBytes=0` | See [buffering](/configuration/commons/#buffering) section. | +| `traefik.backend.buffering.memRequestBodyBytes=0` | See [buffering](/configuration/commons/#buffering) section. | +| `traefik.backend.buffering.memResponseBodyBytes=0` | See [buffering](/configuration/commons/#buffering) section. | +| `traefik.backend.buffering.retryExpression=EXPR` | See [buffering](/configuration/commons/#buffering) section. | | `.backend.circuitbreaker.expression=EXPR` | Create a [circuit breaker](/basics/#backends) to be used against the backend. ex: `NetworkErrorRatio() > 0.` | | `.backend.healthcheck.path=/health` | Enable health check for the backend, hitting the container at `path`. | | `.backend.healthcheck.port=8080` | Allow to use a different port for the health check. | @@ -128,7 +133,7 @@ Additional settings can be defined using Consul Catalog tags. If you want that Træfik uses Consul tags correctly you need to defined them like that: -```json +```js traefik.enable=true traefik.tags=api traefik.tags=external @@ -136,7 +141,7 @@ traefik.tags=external If the prefix defined in Træfik configuration is `bla`, tags need to be defined like that: -```json +```js bla.enable=true bla.tags=api bla.tags=external diff --git a/docs/configuration/backends/docker.md b/docs/configuration/backends/docker.md index ab107264e..c57c432de 100644 --- a/docs/configuration/backends/docker.md +++ b/docs/configuration/backends/docker.md @@ -171,6 +171,11 @@ Labels can be used on containers to override default behaviour. | `traefik.protocol=https` | Override the default `http` protocol | | `traefik.weight=10` | Assign this weight to the container | | `traefik.backend=foo` | Give the name `foo` to the generated backend for this container. | +| `traefik.backend.buffering.maxRequestBodyBytes=0` | See [buffering](/configuration/commons/#buffering) section. | +| `traefik.backend.buffering.maxResponseBodyBytes=0` | See [buffering](/configuration/commons/#buffering) section. | +| `traefik.backend.buffering.memRequestBodyBytes=0` | See [buffering](/configuration/commons/#buffering) section. | +| `traefik.backend.buffering.memResponseBodyBytes=0` | See [buffering](/configuration/commons/#buffering) section. | +| `traefik.backend.buffering.retryExpression=EXPR` | See [buffering](/configuration/commons/#buffering) section. | | `traefik.backend.circuitbreaker.expression=EXPR` | Create a [circuit breaker](/basics/#backends) to be used against the backend | | `traefik.backend.healthcheck.path=/health` | Enable health check for the backend, hitting the container at `path`. | | `traefik.backend.healthcheck.port=8080` | Allow to use a different port for the health check. | diff --git a/docs/configuration/backends/ecs.md b/docs/configuration/backends/ecs.md index b1d09d0f4..a5513c7dc 100644 --- a/docs/configuration/backends/ecs.md +++ b/docs/configuration/backends/ecs.md @@ -131,6 +131,11 @@ Labels can be used on task containers to override default behaviour: | `traefik.protocol=https` | Override the default `http` protocol | | `traefik.weight=10` | Assign this weight to the container | | `traefik.backend=foo` | Give the name `foo` to the generated backend for this container. | +| `traefik.backend.buffering.maxRequestBodyBytes=0` | See [buffering](/configuration/commons/#buffering) section. | +| `traefik.backend.buffering.maxResponseBodyBytes=0` | See [buffering](/configuration/commons/#buffering) section. | +| `traefik.backend.buffering.memRequestBodyBytes=0` | See [buffering](/configuration/commons/#buffering) section. | +| `traefik.backend.buffering.memResponseBodyBytes=0` | See [buffering](/configuration/commons/#buffering) section. | +| `traefik.backend.buffering.retryExpression=EXPR` | See [buffering](/configuration/commons/#buffering) section. | | `traefik.backend.circuitbreaker.expression=EXPR` | Create a [circuit breaker](/basics/#backends) to be used against the backend | | `traefik.backend.healthcheck.path=/health` | Enable health check for the backend, hitting the container at `path`. | | `traefik.backend.healthcheck.port=8080` | Allow to use a different port for the health check. | diff --git a/docs/configuration/backends/marathon.md b/docs/configuration/backends/marathon.md index 4d3ba7ee8..6aac6fe17 100644 --- a/docs/configuration/backends/marathon.md +++ b/docs/configuration/backends/marathon.md @@ -168,6 +168,11 @@ The following labels can be defined on Marathon applications. They adjust the be | `traefik.protocol=https` | Override the default `http` protocol | | `traefik.weight=10` | Assign this weight to the container | | `traefik.backend=foo` | Give the name `foo` to the generated backend for this container. | +| `traefik.backend.buffering.maxRequestBodyBytes=0` | See [buffering](/configuration/commons/#buffering) section. | +| `traefik.backend.buffering.maxResponseBodyBytes=0` | See [buffering](/configuration/commons/#buffering) section. | +| `traefik.backend.buffering.memRequestBodyBytes=0` | See [buffering](/configuration/commons/#buffering) section. | +| `traefik.backend.buffering.memResponseBodyBytes=0` | See [buffering](/configuration/commons/#buffering) section. | +| `traefik.backend.buffering.retryExpression=EXPR` | See [buffering](/configuration/commons/#buffering) section. | | `traefik.backend.circuitbreaker.expression=EXPR` | Create a [circuit breaker](/basics/#backends) to be used against the backend | | `traefik.backend.healthcheck.path=/health` | Enable health check for the backend, hitting the container at `path`. | | `traefik.backend.healthcheck.port=8080` | Allow to use a different port for the health check. | diff --git a/docs/configuration/backends/mesos.md b/docs/configuration/backends/mesos.md index 2bc82a738..56a8cc45b 100644 --- a/docs/configuration/backends/mesos.md +++ b/docs/configuration/backends/mesos.md @@ -104,6 +104,11 @@ The following labels can be defined on Mesos tasks. They adjust the behaviour fo | `traefik.protocol=https` | Override the default `http` protocol | | `traefik.weight=10` | Assign this weight to the container | | `traefik.backend=foo` | Give the name `foo` to the generated backend for this container. | +| `traefik.backend.buffering.maxRequestBodyBytes=0` | See [buffering](/configuration/commons/#buffering) section. | +| `traefik.backend.buffering.maxResponseBodyBytes=0` | See [buffering](/configuration/commons/#buffering) section. | +| `traefik.backend.buffering.memRequestBodyBytes=0` | See [buffering](/configuration/commons/#buffering) section. | +| `traefik.backend.buffering.memResponseBodyBytes=0` | See [buffering](/configuration/commons/#buffering) section. | +| `traefik.backend.buffering.retryExpression=EXPR` | See [buffering](/configuration/commons/#buffering) section. | | `traefik.backend.circuitbreaker.expression=EXPR` | Create a [circuit breaker](/basics/#backends) to be used against the backend | | `traefik.backend.healthcheck.path=/health` | Enable health check for the backend, hitting the container at `path`. | | `traefik.backend.healthcheck.port=8080` | Allow to use a different port for the health check. | diff --git a/docs/configuration/backends/rancher.md b/docs/configuration/backends/rancher.md index 32e666953..79981b866 100644 --- a/docs/configuration/backends/rancher.md +++ b/docs/configuration/backends/rancher.md @@ -127,6 +127,11 @@ Labels can be used on task containers to override default behaviour: | `traefik.protocol=https` | Override the default `http` protocol | | `traefik.weight=10` | Assign this weight to the container | | `traefik.backend=foo` | Give the name `foo` to the generated backend for this container. | +| `traefik.backend.buffering.maxRequestBodyBytes=0` | See [buffering](/configuration/commons/#buffering) section. | +| `traefik.backend.buffering.maxResponseBodyBytes=0` | See [buffering](/configuration/commons/#buffering) section. | +| `traefik.backend.buffering.memRequestBodyBytes=0` | See [buffering](/configuration/commons/#buffering) section. | +| `traefik.backend.buffering.memResponseBodyBytes=0` | See [buffering](/configuration/commons/#buffering) section. | +| `traefik.backend.buffering.retryExpression=EXPR` | See [buffering](/configuration/commons/#buffering) section. | | `traefik.backend.circuitbreaker.expression=EXPR` | Create a [circuit breaker](/basics/#backends) to be used against the backend | | `traefik.backend.healthcheck.path=/health` | Enable health check for the backend, hitting the container at `path`. | | `traefik.backend.healthcheck.port=8080` | Allow to use a different port for the health check. | diff --git a/docs/configuration/commons.md b/docs/configuration/commons.md index acf87a4f6..234a3a347 100644 --- a/docs/configuration/commons.md +++ b/docs/configuration/commons.md @@ -299,6 +299,26 @@ In the above example, frontend1 is configured to limit requests by the client's An average of 5 requests every 3 seconds is allowed and an average of 100 requests every 10 seconds. These can "burst" up to 10 and 200 in each period respectively. +## Buffering + +In some cases request/buffering can be enabled for a specific backend. +By enabling this, Træfik will read the entire request into memory (possibly buffering large requests into disk) and will reject requests that are over a specified limit. +This may help services deal with large data (multipart/form-data for example) more efficiently and should minimise time spent when sending data to a backend server. + +For more information please check [oxy/buffer](http://godoc.org/github.com/vulcand/oxy/buffer) documentation. + +Example configuration: + +```toml +[backends] + [backends.backend1] + [backends.backend1.buffering] + maxRequestBodyBytes = 10485760 + memRequestBodyBytes = 2097152 + maxResponseBodyBytes = 10485760 + memResponseBodyBytes = 2097152 + retryExpression = "IsNetworkError() && Attempts() <= 2" +``` ## Retry Configuration diff --git a/provider/consulcatalog/consul_catalog_config.go b/provider/consulcatalog/consul_catalog_config.go index a096e664b..90760773b 100644 --- a/provider/consulcatalog/consul_catalog_config.go +++ b/provider/consulcatalog/consul_catalog_config.go @@ -40,6 +40,7 @@ func (p *Provider) buildConfiguration(catalog []catalogUpdate) *types.Configurat "getLoadBalancer": p.getLoadBalancer, "getMaxConn": p.getMaxConn, "getHealthCheck": p.getHealthCheck, + "getBuffering": p.getBuffering, // Frontend functions "getFrontendRule": p.getFrontendRule, @@ -296,6 +297,20 @@ func (p *Provider) getHealthCheck(tags []string) *types.HealthCheck { } } +func (p *Provider) getBuffering(tags []string) *types.Buffering { + if !p.hasAttributePrefix(label.SuffixBackendBuffering, tags) { + return nil + } + + return &types.Buffering{ + MaxRequestBodyBytes: p.getInt64Attribute(label.SuffixBackendBufferingMaxRequestBodyBytes, tags, 0), + MaxResponseBodyBytes: p.getInt64Attribute(label.SuffixBackendBufferingMaxResponseBodyBytes, tags, 0), + MemRequestBodyBytes: p.getInt64Attribute(label.SuffixBackendBufferingMemRequestBodyBytes, tags, 0), + MemResponseBodyBytes: p.getInt64Attribute(label.SuffixBackendBufferingMemResponseBodyBytes, tags, 0), + RetryExpression: p.getAttribute(label.SuffixBackendBufferingRetryExpression, tags, ""), + } +} + func (p *Provider) getRedirect(tags []string) *types.Redirect { if p.hasAttribute(label.SuffixFrontendRedirectEntryPoint, tags) { return &types.Redirect{ diff --git a/provider/consulcatalog/consul_catalog_config_test.go b/provider/consulcatalog/consul_catalog_config_test.go index 9524027ff..9afb21db9 100644 --- a/provider/consulcatalog/consul_catalog_config_test.go +++ b/provider/consulcatalog/consul_catalog_config_test.go @@ -1002,6 +1002,52 @@ func TestProviderGetHealthCheck(t *testing.T) { } } +func TestProviderGetBuffering(t *testing.T) { + p := &Provider{ + Prefix: "traefik", + } + + testCases := []struct { + desc string + tags []string + expected *types.Buffering + }{ + { + desc: "should return nil when no tags", + tags: []string{}, + expected: nil, + }, + { + desc: "should return a struct when has proper tags", + tags: []string{ + label.TraefikBackendBufferingMaxResponseBodyBytes + "=10485760", + label.TraefikBackendBufferingMemResponseBodyBytes + "=2097152", + label.TraefikBackendBufferingMaxRequestBodyBytes + "=10485760", + label.TraefikBackendBufferingMemRequestBodyBytes + "=2097152", + label.TraefikBackendBufferingRetryExpression + "=IsNetworkError() && Attempts() <= 2", + }, + expected: &types.Buffering{ + MaxResponseBodyBytes: 10485760, + MemResponseBodyBytes: 2097152, + MaxRequestBodyBytes: 10485760, + MemRequestBodyBytes: 2097152, + RetryExpression: "IsNetworkError() && Attempts() <= 2", + }, + }, + } + + for _, test := range testCases { + test := test + t.Run(test.desc, func(t *testing.T) { + t.Parallel() + + result := p.getBuffering(test.tags) + + assert.Equal(t, test.expected, result) + }) + } +} + func TestProviderGetRedirect(t *testing.T) { p := &Provider{ Prefix: "traefik", diff --git a/provider/docker/config.go b/provider/docker/config.go index cfcb35b77..1942ccc69 100644 --- a/provider/docker/config.go +++ b/provider/docker/config.go @@ -24,6 +24,7 @@ func (p *Provider) buildConfiguration(containersInspected []dockerData) *types.C "getProtocol": getFuncStringLabel(label.TraefikProtocol, label.DefaultProtocol), "getMaxConn": getMaxConn, "getHealthCheck": getHealthCheck, + "getBuffering": getBuffering, "getCircuitBreaker": getCircuitBreaker, "getLoadBalancer": getLoadBalancer, diff --git a/provider/docker/config_container.go b/provider/docker/config_container.go index 867dcbc0f..772bc7b3d 100644 --- a/provider/docker/config_container.go +++ b/provider/docker/config_container.go @@ -214,6 +214,20 @@ func getHealthCheck(container dockerData) *types.HealthCheck { } } +func getBuffering(container dockerData) *types.Buffering { + if !label.HasPrefix(container.Labels, label.TraefikBackendBuffering) { + return nil + } + + return &types.Buffering{ + MaxRequestBodyBytes: label.GetInt64Value(container.Labels, label.TraefikBackendBufferingMaxRequestBodyBytes, 0), + MaxResponseBodyBytes: label.GetInt64Value(container.Labels, label.TraefikBackendBufferingMaxResponseBodyBytes, 0), + MemRequestBodyBytes: label.GetInt64Value(container.Labels, label.TraefikBackendBufferingMemRequestBodyBytes, 0), + MemResponseBodyBytes: label.GetInt64Value(container.Labels, label.TraefikBackendBufferingMemResponseBodyBytes, 0), + RetryExpression: label.GetStringValue(container.Labels, label.TraefikBackendBufferingRetryExpression, ""), + } +} + func getRedirect(container dockerData) *types.Redirect { if label.Has(container.Labels, label.TraefikFrontendRedirectEntryPoint) { return &types.Redirect{ diff --git a/provider/docker/config_container_docker_test.go b/provider/docker/config_container_docker_test.go index bea44da54..a25a74b8b 100644 --- a/provider/docker/config_container_docker_test.go +++ b/provider/docker/config_container_docker_test.go @@ -108,6 +108,11 @@ func TestDockerBuildConfiguration(t *testing.T) { label.TraefikBackendLoadBalancerStickinessCookieName: "chocolate", label.TraefikBackendMaxConnAmount: "666", label.TraefikBackendMaxConnExtractorFunc: "client.ip", + label.TraefikBackendBufferingMaxResponseBodyBytes: "10485760", + label.TraefikBackendBufferingMemResponseBodyBytes: "2097152", + label.TraefikBackendBufferingMaxRequestBodyBytes: "10485760", + label.TraefikBackendBufferingMemRequestBodyBytes: "2097152", + label.TraefikBackendBufferingRetryExpression: "IsNetworkError() && Attempts() <= 2", label.TraefikFrontendAuthBasic: "test:$apr1$H6uskkkW$IgXLP6ewTrSuBkTrqE8wj/,test2:$apr1$d9hr9HBB$4HxwgUir3HP4EsggP/QNo0", label.TraefikFrontendEntryPoints: "http,https", @@ -284,6 +289,13 @@ func TestDockerBuildConfiguration(t *testing.T) { Port: 880, Interval: "6", }, + Buffering: &types.Buffering{ + MaxResponseBodyBytes: 10485760, + MemResponseBodyBytes: 2097152, + MaxRequestBodyBytes: 10485760, + MemRequestBodyBytes: 2097152, + RetryExpression: "IsNetworkError() && Attempts() <= 2", + }, }, }, }, @@ -1421,6 +1433,54 @@ func TestDockerGetHealthCheck(t *testing.T) { } } +func TestDockerGetBuffering(t *testing.T) { + testCases := []struct { + desc string + container docker.ContainerJSON + expected *types.Buffering + }{ + { + desc: "should return nil when no health check labels", + container: containerJSON( + name("test1"), + labels(map[string]string{})), + expected: nil, + }, + { + desc: "should return a struct when buffering labels are set", + container: containerJSON( + name("test1"), + labels(map[string]string{ + label.TraefikBackendBufferingMaxResponseBodyBytes: "10485760", + label.TraefikBackendBufferingMemResponseBodyBytes: "2097152", + label.TraefikBackendBufferingMaxRequestBodyBytes: "10485760", + label.TraefikBackendBufferingMemRequestBodyBytes: "2097152", + label.TraefikBackendBufferingRetryExpression: "IsNetworkError() && Attempts() <= 2", + })), + expected: &types.Buffering{ + MaxResponseBodyBytes: 10485760, + MemResponseBodyBytes: 2097152, + MaxRequestBodyBytes: 10485760, + MemRequestBodyBytes: 2097152, + RetryExpression: "IsNetworkError() && Attempts() <= 2", + }, + }, + } + + for _, test := range testCases { + test := test + t.Run(test.desc, func(t *testing.T) { + t.Parallel() + + dData := parseContainer(test.container) + + actual := getBuffering(dData) + + assert.Equal(t, test.expected, actual) + }) + } +} + func TestDockerGetHeaders(t *testing.T) { testCases := []struct { desc string diff --git a/provider/docker/config_container_swarm_test.go b/provider/docker/config_container_swarm_test.go index 09111673e..81594a496 100644 --- a/provider/docker/config_container_swarm_test.go +++ b/provider/docker/config_container_swarm_test.go @@ -117,6 +117,11 @@ func TestSwarmBuildConfiguration(t *testing.T) { label.TraefikBackendLoadBalancerStickinessCookieName: "chocolate", label.TraefikBackendMaxConnAmount: "666", label.TraefikBackendMaxConnExtractorFunc: "client.ip", + label.TraefikBackendBufferingMaxResponseBodyBytes: "10485760", + label.TraefikBackendBufferingMemResponseBodyBytes: "2097152", + label.TraefikBackendBufferingMaxRequestBodyBytes: "10485760", + label.TraefikBackendBufferingMemRequestBodyBytes: "2097152", + label.TraefikBackendBufferingRetryExpression: "IsNetworkError() && Attempts() <= 2", label.TraefikFrontendAuthBasic: "test:$apr1$H6uskkkW$IgXLP6ewTrSuBkTrqE8wj/,test2:$apr1$d9hr9HBB$4HxwgUir3HP4EsggP/QNo0", label.TraefikFrontendEntryPoints: "http,https", @@ -292,6 +297,13 @@ func TestSwarmBuildConfiguration(t *testing.T) { Port: 880, Interval: "6", }, + Buffering: &types.Buffering{ + MaxResponseBodyBytes: 10485760, + MemResponseBodyBytes: 2097152, + MaxRequestBodyBytes: 10485760, + MemRequestBodyBytes: 2097152, + RetryExpression: "IsNetworkError() && Attempts() <= 2", + }, }, }, networks: map[string]*docker.NetworkResource{ diff --git a/provider/ecs/config.go b/provider/ecs/config.go index fc3551ad5..b21be0fa4 100644 --- a/provider/ecs/config.go +++ b/provider/ecs/config.go @@ -25,6 +25,7 @@ func (p *Provider) buildConfiguration(services map[string][]ecsInstance) (*types "getLoadBalancer": getLoadBalancer, "getMaxConn": getMaxConn, "getHealthCheck": getHealthCheck, + "getBuffering": getBuffering, "getServers": getServers, // TODO Deprecated [breaking] @@ -174,6 +175,20 @@ func getHealthCheck(instance ecsInstance) *types.HealthCheck { } } +func getBuffering(instance ecsInstance) *types.Buffering { + if !hasPrefix(instance, label.TraefikBackendBuffering) { + return nil + } + + return &types.Buffering{ + MaxRequestBodyBytes: getInt64Value(instance, label.TraefikBackendBufferingMaxRequestBodyBytes, 0), + MaxResponseBodyBytes: getInt64Value(instance, label.TraefikBackendBufferingMaxResponseBodyBytes, 0), + MemRequestBodyBytes: getInt64Value(instance, label.TraefikBackendBufferingMemRequestBodyBytes, 0), + MemResponseBodyBytes: getInt64Value(instance, label.TraefikBackendBufferingMemResponseBodyBytes, 0), + RetryExpression: getStringValue(instance, label.TraefikBackendBufferingRetryExpression, ""), + } +} + func getServers(instances []ecsInstance) map[string]types.Server { var servers map[string]types.Server diff --git a/provider/ecs/config_test.go b/provider/ecs/config_test.go index 46b88bd3d..3bc2dc1e9 100644 --- a/provider/ecs/config_test.go +++ b/provider/ecs/config_test.go @@ -136,6 +136,11 @@ func TestBuildConfiguration(t *testing.T) { label.TraefikBackendLoadBalancerStickinessCookieName: aws.String("chocolate"), label.TraefikBackendMaxConnAmount: aws.String("666"), label.TraefikBackendMaxConnExtractorFunc: aws.String("client.ip"), + label.TraefikBackendBufferingMaxResponseBodyBytes: aws.String("10485760"), + label.TraefikBackendBufferingMemResponseBodyBytes: aws.String("2097152"), + label.TraefikBackendBufferingMaxRequestBodyBytes: aws.String("10485760"), + label.TraefikBackendBufferingMemRequestBodyBytes: aws.String("2097152"), + label.TraefikBackendBufferingRetryExpression: aws.String("IsNetworkError() && Attempts() <= 2"), label.TraefikFrontendAuthBasic: aws.String("test:$apr1$H6uskkkW$IgXLP6ewTrSuBkTrqE8wj/,test2:$apr1$d9hr9HBB$4HxwgUir3HP4EsggP/QNo0"), label.TraefikFrontendEntryPoints: aws.String("http,https"), @@ -222,6 +227,13 @@ func TestBuildConfiguration(t *testing.T) { Port: 880, Interval: "6", }, + Buffering: &types.Buffering{ + MaxResponseBodyBytes: 10485760, + MemResponseBodyBytes: 2097152, + MaxRequestBodyBytes: 10485760, + MemRequestBodyBytes: 2097152, + RetryExpression: "IsNetworkError() && Attempts() <= 2", + }, }, }, Frontends: map[string]*types.Frontend{ @@ -948,6 +960,53 @@ func TestGetHealthCheck(t *testing.T) { } } +func TestGetBuffering(t *testing.T) { + testCases := []struct { + desc string + instance ecsInstance + expected *types.Buffering + }{ + { + desc: "should return nil when no buffering labels", + instance: ecsInstance{ + containerDefinition: &ecs.ContainerDefinition{ + DockerLabels: map[string]*string{}, + }}, + expected: nil, + }, + { + desc: "should return a struct when health check labels are set", + instance: ecsInstance{ + containerDefinition: &ecs.ContainerDefinition{ + DockerLabels: map[string]*string{ + label.TraefikBackendBufferingMaxResponseBodyBytes: aws.String("10485760"), + label.TraefikBackendBufferingMemResponseBodyBytes: aws.String("2097152"), + label.TraefikBackendBufferingMaxRequestBodyBytes: aws.String("10485760"), + label.TraefikBackendBufferingMemRequestBodyBytes: aws.String("2097152"), + label.TraefikBackendBufferingRetryExpression: aws.String("IsNetworkError() && Attempts() <= 2"), + }}}, + expected: &types.Buffering{ + MaxResponseBodyBytes: 10485760, + MemResponseBodyBytes: 2097152, + MaxRequestBodyBytes: 10485760, + MemRequestBodyBytes: 2097152, + RetryExpression: "IsNetworkError() && Attempts() <= 2", + }, + }, + } + + for _, test := range testCases { + test := test + t.Run(test.desc, func(t *testing.T) { + t.Parallel() + + actual := getBuffering(test.instance) + + assert.Equal(t, test.expected, actual) + }) + } +} + func TestGetServers(t *testing.T) { testCases := []struct { desc string diff --git a/provider/kubernetes/builder_configuration_test.go b/provider/kubernetes/builder_configuration_test.go index d162571b3..89999d992 100644 --- a/provider/kubernetes/builder_configuration_test.go +++ b/provider/kubernetes/builder_configuration_test.go @@ -90,6 +90,47 @@ func circuitBreaker(exp string) func(*types.Backend) { } } +func buffering(opts ...func(*types.Buffering)) func(*types.Backend) { + return func(b *types.Backend) { + if b.Buffering == nil { + b.Buffering = &types.Buffering{} + } + for _, opt := range opts { + opt(b.Buffering) + } + } +} + +func maxRequestBodyBytes(value int64) func(*types.Buffering) { + return func(b *types.Buffering) { + b.MaxRequestBodyBytes = value + } +} + +func memRequestBodyBytes(value int64) func(*types.Buffering) { + return func(b *types.Buffering) { + b.MemRequestBodyBytes = value + } +} + +func maxResponseBodyBytes(value int64) func(*types.Buffering) { + return func(b *types.Buffering) { + b.MaxResponseBodyBytes = value + } +} + +func memResponseBodyBytes(value int64) func(*types.Buffering) { + return func(b *types.Buffering) { + b.MemResponseBodyBytes = value + } +} + +func retrying(exp string) func(*types.Buffering) { + return func(b *types.Buffering) { + b.RetryExpression = exp + } +} + // Frontend func buildFrontends(opts ...func(*types.Frontend) string) map[string]*types.Frontend { diff --git a/provider/kubernetes/kubernetes.go b/provider/kubernetes/kubernetes.go index 7c440bfc6..7c0728c7e 100644 --- a/provider/kubernetes/kubernetes.go +++ b/provider/kubernetes/kubernetes.go @@ -295,6 +295,8 @@ func (p *Provider) loadIngresses(k8sClient Client) (*types.Configuration, error) } } + templateObjects.Backends[r.Host+pa.Path].Buffering = getBuffering(service) + if service.Annotations[label.TraefikBackendLoadBalancerMethod] == "drr" { templateObjects.Backends[r.Host+pa.Path].LoadBalancer.Method = "drr" } @@ -538,3 +540,16 @@ func getFrontendRedirect(i *v1beta1.Ingress) *types.Redirect { } return nil } + +func getBuffering(service *v1.Service) *types.Buffering { + if label.HasPrefix(service.Annotations, label.TraefikBackendBuffering) { + return &types.Buffering{ + MaxRequestBodyBytes: label.GetInt64Value(service.Annotations, label.TraefikBackendBufferingMaxRequestBodyBytes, 0), + MemRequestBodyBytes: label.GetInt64Value(service.Annotations, label.TraefikBackendBufferingMemRequestBodyBytes, 0), + MaxResponseBodyBytes: label.GetInt64Value(service.Annotations, label.TraefikBackendBufferingMaxResponseBodyBytes, 0), + MemResponseBodyBytes: label.GetInt64Value(service.Annotations, label.TraefikBackendBufferingMemResponseBodyBytes, 0), + RetryExpression: label.GetStringValue(service.Annotations, label.TraefikBackendBufferingRetryExpression, ""), + } + } + return nil +} diff --git a/provider/kubernetes/kubernetes_test.go b/provider/kubernetes/kubernetes_test.go index 87058393c..a432a220b 100644 --- a/provider/kubernetes/kubernetes_test.go +++ b/provider/kubernetes/kubernetes_test.go @@ -431,6 +431,9 @@ func TestServiceAnnotations(t *testing.T) { iRule( iHost("bar"), iPaths(onePath(iBackend("service2", intstr.FromInt(802))))), + iRule( + iHost("baz"), + iPaths(onePath(iBackend("service3", intstr.FromInt(803))))), ), ), } @@ -456,6 +459,19 @@ func TestServiceAnnotations(t *testing.T) { clusterIP("10.0.0.2"), sPorts(sPort(802, ""))), ), + buildService( + sName("service3"), + sNamespace("testing"), + sUID("3"), + sAnnotation(label.TraefikBackendBufferingMaxRequestBodyBytes, "10485760"), + sAnnotation(label.TraefikBackendBufferingMemRequestBodyBytes, "2097152"), + sAnnotation(label.TraefikBackendBufferingMaxResponseBodyBytes, "10485760"), + sAnnotation(label.TraefikBackendBufferingMemResponseBodyBytes, "2097152"), + sAnnotation(label.TraefikBackendBufferingRetryExpression, "IsNetworkError() && Attempts() <= 2"), + sSpec( + clusterIP("10.0.0.3"), + sPorts(sPort(803, ""))), + ), } endpoints := []*v1.Endpoints{ @@ -481,6 +497,17 @@ func TestServiceAnnotations(t *testing.T) { eAddresses(eAddress("10.15.0.2")), ePorts(ePort(8080, "http"))), ), + buildEndpoint( + eNamespace("testing"), + eName("service3"), + eUID("3"), + subset( + eAddresses(eAddress("10.14.0.1")), + ePorts(ePort(8080, "http"))), + subset( + eAddresses(eAddress("10.12.0.1")), + ePorts(ePort(8080, "http"))), + ), } watchChan := make(chan interface{}) @@ -510,6 +537,19 @@ func TestServiceAnnotations(t *testing.T) { server("http://10.15.0.2:8080", weight(1))), lbMethod("wrr"), lbSticky(), ), + backend("baz", + servers( + server("http://10.14.0.1:8080", weight(1)), + server("http://10.12.0.1:8080", weight(1))), + lbMethod("wrr"), + buffering( + maxRequestBodyBytes(10485760), + memRequestBodyBytes(2097152), + maxResponseBodyBytes(10485760), + memResponseBodyBytes(2097152), + retrying("IsNetworkError() && Attempts() <= 2"), + ), + ), ), frontends( frontend("foo/bar", @@ -522,7 +562,13 @@ func TestServiceAnnotations(t *testing.T) { frontend("bar", headers(), passHostHeader(), - routes(route("bar", "Host:bar"))), + routes(route("bar", "Host:bar")), + ), + frontend("baz", + headers(), + passHostHeader(), + routes(route("baz", "Host:baz")), + ), ), ) diff --git a/provider/kv/keynames.go b/provider/kv/keynames.go index dd97e9c25..d0c3f91b1 100644 --- a/provider/kv/keynames.go +++ b/provider/kv/keynames.go @@ -15,6 +15,12 @@ const ( pathBackendServers = "/servers/" pathBackendServerURL = "/url" pathBackendServerWeight = "/weight" + pathBackendBuffering = "/buffering/" + pathBackendBufferingMaxResponseBodyBytes = pathBackendBuffering + "maxresponsebodybytes" + pathBackendBufferingMemResponseBodyBytes = pathBackendBuffering + "memresponsebodybytes" + pathBackendBufferingMaxRequestBodyBytes = pathBackendBuffering + "maxrequestbodybytes" + pathBackendBufferingMemRequestBodyBytes = pathBackendBuffering + "memrequestbodybytes" + pathBackendBufferingRetryExpression = pathBackendBuffering + "retryexpression" pathFrontends = "/frontends/" pathFrontendBackend = "/backend" diff --git a/provider/kv/kv_config.go b/provider/kv/kv_config.go index e3800d022..172f9f29a 100644 --- a/provider/kv/kv_config.go +++ b/provider/kv/kv_config.go @@ -60,6 +60,7 @@ func (p *Provider) buildConfiguration() *types.Configuration { "getLoadBalancer": p.getLoadBalancer, "getMaxConn": p.getMaxConn, "getHealthCheck": p.getHealthCheck, + "getBuffering": p.getBuffering, "getSticky": p.getSticky, // Deprecated [breaking] "hasStickinessLabel": p.hasStickinessLabel, // Deprecated [breaking] "getStickinessCookieName": p.getStickinessCookieName, // Deprecated [breaking] @@ -273,6 +274,25 @@ func (p *Provider) getHealthCheck(rootPath string) *types.HealthCheck { } } +func (p *Provider) getBuffering(rootPath string) *types.Buffering { + pathsBuffering := p.list(rootPath, pathBackendBuffering) + + var buffering *types.Buffering + if len(pathsBuffering) > 0 { + if buffering == nil { + buffering = &types.Buffering{} + } + + buffering.MaxRequestBodyBytes = p.getInt64(0, rootPath, pathBackendBufferingMaxRequestBodyBytes) + buffering.MaxResponseBodyBytes = p.getInt64(0, rootPath, pathBackendBufferingMaxResponseBodyBytes) + buffering.MemRequestBodyBytes = p.getInt64(0, rootPath, pathBackendBufferingMemRequestBodyBytes) + buffering.MemResponseBodyBytes = p.getInt64(0, rootPath, pathBackendBufferingMemResponseBodyBytes) + buffering.RetryExpression = p.get("", rootPath, pathBackendBufferingRetryExpression) + } + + return buffering +} + func (p *Provider) getTLSSection(prefix string) []*tls.Configuration { var tlsSection []*tls.Configuration diff --git a/provider/kv/kv_config_test.go b/provider/kv/kv_config_test.go index 0eae28959..d95d96e93 100644 --- a/provider/kv/kv_config_test.go +++ b/provider/kv/kv_config_test.go @@ -76,6 +76,11 @@ func TestProviderBuildConfiguration(t *testing.T) { withPair(pathBackendHealthCheckInterval, "30s"), withPair(pathBackendMaxConnAmount, "5"), withPair(pathBackendMaxConnExtractorFunc, "client.ip"), + withPair(pathBackendBufferingMaxResponseBodyBytes, "10485760"), + withPair(pathBackendBufferingMemResponseBodyBytes, "2097152"), + withPair(pathBackendBufferingMaxRequestBodyBytes, "10485760"), + withPair(pathBackendBufferingMemRequestBodyBytes, "2097152"), + withPair(pathBackendBufferingRetryExpression, "IsNetworkError() && Attempts() <= 2"), withPair("servers/server1/url", "http://172.17.0.2:80"), withPair("servers/server1/weight", "0"), withPair("servers/server2/weight", "0")), @@ -162,6 +167,13 @@ func TestProviderBuildConfiguration(t *testing.T) { Port: 80, Interval: "30s", }, + Buffering: &types.Buffering{ + MaxResponseBodyBytes: 10485760, + MemResponseBodyBytes: 2097152, + MaxRequestBodyBytes: 10485760, + MemRequestBodyBytes: 2097152, + RetryExpression: "IsNetworkError() && Attempts() <= 2", + }, }, }, Frontends: map[string]*types.Frontend{ @@ -1700,6 +1712,47 @@ func TestProviderGetHealthCheck(t *testing.T) { } } +func TestProviderGetBufferingReal(t *testing.T) { + testCases := []struct { + desc string + rootPath string + kvPairs []*store.KVPair + expected *types.Buffering + }{ + { + desc: "when all configuration keys defined", + rootPath: "traefik/backends/foo", + kvPairs: filler("traefik", + backend("foo", + withPair(pathBackendBufferingMaxResponseBodyBytes, "10485760"), + withPair(pathBackendBufferingMemResponseBodyBytes, "2097152"), + withPair(pathBackendBufferingMaxRequestBodyBytes, "10485760"), + withPair(pathBackendBufferingMemRequestBodyBytes, "2097152"), + withPair(pathBackendBufferingRetryExpression, "IsNetworkError() && Attempts() <= 2"))), + expected: &types.Buffering{ + MaxResponseBodyBytes: 10485760, + MemResponseBodyBytes: 2097152, + MaxRequestBodyBytes: 10485760, + MemRequestBodyBytes: 2097152, + RetryExpression: "IsNetworkError() && Attempts() <= 2", + }, + }, + } + + for _, test := range testCases { + test := test + t.Run(test.desc, func(t *testing.T) { + t.Parallel() + + p := newProviderMock(test.kvPairs) + + result := p.getBuffering(test.rootPath) + + assert.Equal(t, test.expected, result) + }) + } +} + func TestProviderGetTLSes(t *testing.T) { testCases := []struct { desc string diff --git a/provider/label/names.go b/provider/label/names.go index 2f4f7899b..c75a6dcdc 100644 --- a/provider/label/names.go +++ b/provider/label/names.go @@ -24,6 +24,12 @@ const ( SuffixBackendLoadBalancerStickinessCookieName = SuffixBackendLoadBalancer + ".stickiness.cookieName" SuffixBackendMaxConnAmount = "backend.maxconn.amount" SuffixBackendMaxConnExtractorFunc = "backend.maxconn.extractorfunc" + SuffixBackendBuffering = "backend.buffering" + SuffixBackendBufferingMaxRequestBodyBytes = SuffixBackendBuffering + ".maxRequestBodyBytes" + SuffixBackendBufferingMemRequestBodyBytes = SuffixBackendBuffering + ".memRequestBodyBytes" + SuffixBackendBufferingMaxResponseBodyBytes = SuffixBackendBuffering + ".maxResponseBodyBytes" + SuffixBackendBufferingMemResponseBodyBytes = SuffixBackendBuffering + ".memResponseBodyBytes" + SuffixBackendBufferingRetryExpression = SuffixBackendBuffering + ".retryExpression" SuffixFrontend = "frontend" SuffixFrontendAuthBasic = "frontend.auth.basic" SuffixFrontendBackend = "frontend.backend" @@ -80,6 +86,12 @@ const ( TraefikBackendLoadBalancerStickinessCookieName = Prefix + SuffixBackendLoadBalancerStickinessCookieName TraefikBackendMaxConnAmount = Prefix + SuffixBackendMaxConnAmount TraefikBackendMaxConnExtractorFunc = Prefix + SuffixBackendMaxConnExtractorFunc + TraefikBackendBuffering = Prefix + SuffixBackendBuffering + TraefikBackendBufferingMaxRequestBodyBytes = Prefix + SuffixBackendBufferingMaxRequestBodyBytes + TraefikBackendBufferingMemRequestBodyBytes = Prefix + SuffixBackendBufferingMemRequestBodyBytes + TraefikBackendBufferingMaxResponseBodyBytes = Prefix + SuffixBackendBufferingMaxResponseBodyBytes + TraefikBackendBufferingMemResponseBodyBytes = Prefix + SuffixBackendBufferingMemResponseBodyBytes + TraefikBackendBufferingRetryExpression = Prefix + SuffixBackendBufferingRetryExpression TraefikFrontend = Prefix + SuffixFrontend TraefikFrontendAuthBasic = Prefix + SuffixFrontendAuthBasic TraefikFrontendEntryPoints = Prefix + SuffixFrontendEntryPoints diff --git a/provider/marathon/config.go b/provider/marathon/config.go index 873516f8a..8deab6e20 100644 --- a/provider/marathon/config.go +++ b/provider/marathon/config.go @@ -32,6 +32,7 @@ func (p *Provider) buildConfiguration() *types.Configuration { "getLoadBalancer": getLoadBalancer, "getMaxConn": getMaxConn, "getHealthCheck": getHealthCheck, + "getBuffering": getBuffering, "getServers": p.getServers, // TODO Deprecated [breaking] @@ -455,6 +456,20 @@ func getHealthCheck(application marathon.Application) *types.HealthCheck { } } +func getBuffering(application marathon.Application) *types.Buffering { + if !label.HasPrefixP(application.Labels, label.TraefikBackendBuffering) { + return nil + } + + return &types.Buffering{ + MaxRequestBodyBytes: label.GetInt64ValueP(application.Labels, label.TraefikBackendBufferingMaxRequestBodyBytes, 0), + MaxResponseBodyBytes: label.GetInt64ValueP(application.Labels, label.TraefikBackendBufferingMaxResponseBodyBytes, 0), + MemRequestBodyBytes: label.GetInt64ValueP(application.Labels, label.TraefikBackendBufferingMemRequestBodyBytes, 0), + MemResponseBodyBytes: label.GetInt64ValueP(application.Labels, label.TraefikBackendBufferingMemResponseBodyBytes, 0), + RetryExpression: label.GetStringValueP(application.Labels, label.TraefikBackendBufferingRetryExpression, ""), + } +} + func (p *Provider) getServers(application marathon.Application, serviceName string) map[string]types.Server { var servers map[string]types.Server diff --git a/provider/marathon/config_test.go b/provider/marathon/config_test.go index c08f2ea89..dfe6df5ea 100644 --- a/provider/marathon/config_test.go +++ b/provider/marathon/config_test.go @@ -190,6 +190,11 @@ func TestBuildConfigurationNonAPIErrors(t *testing.T) { withLabel(label.TraefikBackendLoadBalancerStickinessCookieName, "chocolate"), withLabel(label.TraefikBackendMaxConnAmount, "666"), withLabel(label.TraefikBackendMaxConnExtractorFunc, "client.ip"), + withLabel(label.TraefikBackendBufferingMaxResponseBodyBytes, "10485760"), + withLabel(label.TraefikBackendBufferingMemResponseBodyBytes, "2097152"), + withLabel(label.TraefikBackendBufferingMaxRequestBodyBytes, "10485760"), + withLabel(label.TraefikBackendBufferingMemRequestBodyBytes, "2097152"), + withLabel(label.TraefikBackendBufferingRetryExpression, "IsNetworkError() && Attempts() <= 2"), withLabel(label.TraefikFrontendAuthBasic, "test:$apr1$H6uskkkW$IgXLP6ewTrSuBkTrqE8wj/,test2:$apr1$d9hr9HBB$4HxwgUir3HP4EsggP/QNo0"), withLabel(label.TraefikFrontendEntryPoints, "http,https"), @@ -367,6 +372,13 @@ func TestBuildConfigurationNonAPIErrors(t *testing.T) { Port: 880, Interval: "6", }, + Buffering: &types.Buffering{ + MaxResponseBodyBytes: 10485760, + MemResponseBodyBytes: 2097152, + MaxRequestBodyBytes: 10485760, + MemRequestBodyBytes: 2097152, + RetryExpression: "IsNetworkError() && Attempts() <= 2", + }, }, }, }, @@ -494,6 +506,11 @@ func TestBuildConfigurationServicesNonAPIErrors(t *testing.T) { withLabel(label.TraefikBackendLoadBalancerStickinessCookieName, "chocolate"), withLabel(label.TraefikBackendMaxConnAmount, "666"), withLabel(label.TraefikBackendMaxConnExtractorFunc, "client.ip"), + withLabel(label.TraefikBackendBufferingMaxResponseBodyBytes, "10485760"), + withLabel(label.TraefikBackendBufferingMemResponseBodyBytes, "2097152"), + withLabel(label.TraefikBackendBufferingMaxRequestBodyBytes, "10485760"), + withLabel(label.TraefikBackendBufferingMemRequestBodyBytes, "2097152"), + withLabel(label.TraefikBackendBufferingRetryExpression, "IsNetworkError() && Attempts() <= 2"), withServiceLabel(label.TraefikPort, "80", "containous"), withServiceLabel(label.TraefikProtocol, "https", "containous"), @@ -674,6 +691,13 @@ func TestBuildConfigurationServicesNonAPIErrors(t *testing.T) { Port: 880, Interval: "6", }, + Buffering: &types.Buffering{ + MaxResponseBodyBytes: 10485760, + MemResponseBodyBytes: 2097152, + MaxRequestBodyBytes: 10485760, + MemRequestBodyBytes: 2097152, + RetryExpression: "IsNetworkError() && Attempts() <= 2", + }, }, }, }, @@ -1535,6 +1559,49 @@ func TestGetHealthCheck(t *testing.T) { } } +func TestGetBuffering(t *testing.T) { + testCases := []struct { + desc string + application marathon.Application + expected *types.Buffering + }{ + { + desc: "should return nil when no buffering labels", + application: application(appPorts(80)), + expected: nil, + }, + { + desc: "should return a struct when buffering labels are set", + + application: application( + withLabel(label.TraefikBackendBufferingMaxResponseBodyBytes, "10485760"), + withLabel(label.TraefikBackendBufferingMemResponseBodyBytes, "2097152"), + withLabel(label.TraefikBackendBufferingMaxRequestBodyBytes, "10485760"), + withLabel(label.TraefikBackendBufferingMemRequestBodyBytes, "2097152"), + withLabel(label.TraefikBackendBufferingRetryExpression, "IsNetworkError() && Attempts() <= 2"), + ), + expected: &types.Buffering{ + MaxResponseBodyBytes: 10485760, + MemResponseBodyBytes: 2097152, + MaxRequestBodyBytes: 10485760, + MemRequestBodyBytes: 2097152, + RetryExpression: "IsNetworkError() && Attempts() <= 2", + }, + }, + } + + for _, test := range testCases { + test := test + t.Run(test.desc, func(t *testing.T) { + t.Parallel() + + actual := getBuffering(test.application) + + assert.Equal(t, test.expected, actual) + }) + } +} + func TestGetServers(t *testing.T) { testCases := []struct { desc string diff --git a/provider/mesos/config.go b/provider/mesos/config.go index e980dda5b..d641b51bc 100644 --- a/provider/mesos/config.go +++ b/provider/mesos/config.go @@ -26,6 +26,7 @@ func (p *Provider) buildConfiguration(tasks []state.Task) *types.Configuration { "getLoadBalancer": getLoadBalancer, "getMaxConn": getMaxConn, "getHealthCheck": getHealthCheck, + "getBuffering": getBuffering, "getServers": p.getServers, "getHost": p.getHost, "getServerPort": p.getServerPort, @@ -300,6 +301,20 @@ func getHealthCheck(task state.Task) *types.HealthCheck { } } +func getBuffering(task state.Task) *types.Buffering { + if !hasPrefix(task, label.TraefikBackendBuffering) { + return nil + } + + return &types.Buffering{ + MaxRequestBodyBytes: getInt64Value(task, label.TraefikBackendBufferingMaxRequestBodyBytes, 0), + MaxResponseBodyBytes: getInt64Value(task, label.TraefikBackendBufferingMaxResponseBodyBytes, 0), + MemRequestBodyBytes: getInt64Value(task, label.TraefikBackendBufferingMemRequestBodyBytes, 0), + MemResponseBodyBytes: getInt64Value(task, label.TraefikBackendBufferingMemResponseBodyBytes, 0), + RetryExpression: getStringValue(task, label.TraefikBackendBufferingRetryExpression, ""), + } +} + func (p *Provider) getServers(tasks []state.Task) map[string]types.Server { var servers map[string]types.Server diff --git a/provider/mesos/config_test.go b/provider/mesos/config_test.go index 0ef46fa67..774f86370 100644 --- a/provider/mesos/config_test.go +++ b/provider/mesos/config_test.go @@ -13,7 +13,7 @@ import ( "github.com/stretchr/testify/require" ) -func TestBuildConfigurationNew(t *testing.T) { +func TestBuildConfiguration(t *testing.T) { p := &Provider{ Domain: "docker.localhost", ExposedByDefault: true, @@ -132,6 +132,11 @@ func TestBuildConfigurationNew(t *testing.T) { withLabel(label.TraefikBackendLoadBalancerStickinessCookieName, "chocolate"), withLabel(label.TraefikBackendMaxConnAmount, "666"), withLabel(label.TraefikBackendMaxConnExtractorFunc, "client.ip"), + withLabel(label.TraefikBackendBufferingMaxResponseBodyBytes, "10485760"), + withLabel(label.TraefikBackendBufferingMemResponseBodyBytes, "2097152"), + withLabel(label.TraefikBackendBufferingMaxRequestBodyBytes, "10485760"), + withLabel(label.TraefikBackendBufferingMemRequestBodyBytes, "2097152"), + withLabel(label.TraefikBackendBufferingRetryExpression, "IsNetworkError() && Attempts() <= 2"), withLabel(label.TraefikFrontendAuthBasic, "test:$apr1$H6uskkkW$IgXLP6ewTrSuBkTrqE8wj/,test2:$apr1$d9hr9HBB$4HxwgUir3HP4EsggP/QNo0"), withLabel(label.TraefikFrontendEntryPoints, "http,https"), @@ -307,6 +312,13 @@ func TestBuildConfigurationNew(t *testing.T) { Port: 880, Interval: "6", }, + Buffering: &types.Buffering{ + MaxResponseBodyBytes: 10485760, + MemResponseBodyBytes: 2097152, + MaxRequestBodyBytes: 10485760, + MemRequestBodyBytes: 2097152, + RetryExpression: "IsNetworkError() && Attempts() <= 2", + }, }, }, }, @@ -810,6 +822,55 @@ func TestGetHealthCheck(t *testing.T) { } } +func TestGetBuffering(t *testing.T) { + testCases := []struct { + desc string + task state.Task + expected *types.Buffering + }{ + { + desc: "should return nil when no buffering labels", + task: aTask("ID1", + withIP("10.10.10.10"), + withInfo("name1", withPorts(withPort("TCP", 80, "WEB"))), + withDefaultStatus(), + ), + expected: nil, + }, + { + desc: "should return a struct when health check labels are set", + task: aTask("ID1", + withLabel(label.TraefikBackendBufferingMaxResponseBodyBytes, "10485760"), + withLabel(label.TraefikBackendBufferingMemResponseBodyBytes, "2097152"), + withLabel(label.TraefikBackendBufferingMaxRequestBodyBytes, "10485760"), + withLabel(label.TraefikBackendBufferingMemRequestBodyBytes, "2097152"), + withLabel(label.TraefikBackendBufferingRetryExpression, "IsNetworkError() && Attempts() <= 2"), + withIP("10.10.10.10"), + withInfo("name1", withPorts(withPort("TCP", 80, "WEB"))), + withDefaultStatus(), + ), + expected: &types.Buffering{ + MaxResponseBodyBytes: 10485760, + MemResponseBodyBytes: 2097152, + MaxRequestBodyBytes: 10485760, + MemRequestBodyBytes: 2097152, + RetryExpression: "IsNetworkError() && Attempts() <= 2", + }, + }, + } + + for _, test := range testCases { + test := test + t.Run(test.desc, func(t *testing.T) { + t.Parallel() + + actual := getBuffering(test.task) + + assert.Equal(t, test.expected, actual) + }) + } +} + func TestGetServers(t *testing.T) { testCases := []struct { desc string diff --git a/provider/rancher/config.go b/provider/rancher/config.go index 75410e3c3..5bf01c57e 100644 --- a/provider/rancher/config.go +++ b/provider/rancher/config.go @@ -24,6 +24,7 @@ func (p *Provider) buildConfiguration(services []rancherData) *types.Configurati "getLoadBalancer": getLoadBalancer, "getMaxConn": getMaxConn, "getHealthCheck": getHealthCheck, + "getBuffering": getBuffering, "getServers": getServers, // TODO Deprecated [breaking] @@ -235,6 +236,20 @@ func getHealthCheck(service rancherData) *types.HealthCheck { } } +func getBuffering(service rancherData) *types.Buffering { + if !label.HasPrefix(service.Labels, label.TraefikBackendBuffering) { + return nil + } + + return &types.Buffering{ + MaxRequestBodyBytes: label.GetInt64Value(service.Labels, label.TraefikBackendBufferingMaxRequestBodyBytes, 0), + MaxResponseBodyBytes: label.GetInt64Value(service.Labels, label.TraefikBackendBufferingMaxResponseBodyBytes, 0), + MemRequestBodyBytes: label.GetInt64Value(service.Labels, label.TraefikBackendBufferingMemRequestBodyBytes, 0), + MemResponseBodyBytes: label.GetInt64Value(service.Labels, label.TraefikBackendBufferingMemResponseBodyBytes, 0), + RetryExpression: label.GetStringValue(service.Labels, label.TraefikBackendBufferingRetryExpression, ""), + } +} + func getServers(service rancherData) map[string]types.Server { var servers map[string]types.Server diff --git a/provider/rancher/config_test.go b/provider/rancher/config_test.go index 1bb056303..f58a01ee3 100644 --- a/provider/rancher/config_test.go +++ b/provider/rancher/config_test.go @@ -50,6 +50,11 @@ func TestProviderBuildConfiguration(t *testing.T) { label.TraefikBackendLoadBalancerStickinessCookieName: "chocolate", label.TraefikBackendMaxConnAmount: "666", label.TraefikBackendMaxConnExtractorFunc: "client.ip", + label.TraefikBackendBufferingMaxResponseBodyBytes: "10485760", + label.TraefikBackendBufferingMemResponseBodyBytes: "2097152", + label.TraefikBackendBufferingMaxRequestBodyBytes: "10485760", + label.TraefikBackendBufferingMemRequestBodyBytes: "2097152", + label.TraefikBackendBufferingRetryExpression: "IsNetworkError() && Attempts() <= 2", label.TraefikFrontendAuthBasic: "test:$apr1$H6uskkkW$IgXLP6ewTrSuBkTrqE8wj/,test2:$apr1$d9hr9HBB$4HxwgUir3HP4EsggP/QNo0", label.TraefikFrontendEntryPoints: "http,https", @@ -228,6 +233,13 @@ func TestProviderBuildConfiguration(t *testing.T) { Port: 880, Interval: "6", }, + Buffering: &types.Buffering{ + MaxResponseBodyBytes: 10485760, + MemResponseBodyBytes: 2097152, + MaxRequestBodyBytes: 10485760, + MemRequestBodyBytes: 2097152, + RetryExpression: "IsNetworkError() && Attempts() <= 2", + }, }, }, }, @@ -856,6 +868,56 @@ func TestGetHealthCheck(t *testing.T) { } } +func TestGetBuffering(t *testing.T) { + testCases := []struct { + desc string + service rancherData + expected *types.Buffering + }{ + { + desc: "should return nil when no buffering labels", + service: rancherData{ + Labels: map[string]string{}, + Health: "healthy", + State: "active", + }, + expected: nil, + }, + { + desc: "should return a struct when buffering labels are set", + service: rancherData{ + Labels: map[string]string{ + label.TraefikBackendBufferingMaxResponseBodyBytes: "10485760", + label.TraefikBackendBufferingMemResponseBodyBytes: "2097152", + label.TraefikBackendBufferingMaxRequestBodyBytes: "10485760", + label.TraefikBackendBufferingMemRequestBodyBytes: "2097152", + label.TraefikBackendBufferingRetryExpression: "IsNetworkError() && Attempts() <= 2", + }, + Health: "healthy", + State: "active", + }, + expected: &types.Buffering{ + MaxResponseBodyBytes: 10485760, + MemResponseBodyBytes: 2097152, + MaxRequestBodyBytes: 10485760, + MemRequestBodyBytes: 2097152, + RetryExpression: "IsNetworkError() && Attempts() <= 2", + }, + }, + } + + for _, test := range testCases { + test := test + t.Run(test.desc, func(t *testing.T) { + t.Parallel() + + actual := getBuffering(test.service) + + assert.Equal(t, test.expected, actual) + }) + } +} + func TestGetServers(t *testing.T) { testCases := []struct { desc string diff --git a/server/server.go b/server/server.go index fbda4830e..ef8b69505 100644 --- a/server/server.go +++ b/server/server.go @@ -42,6 +42,7 @@ import ( "github.com/sirupsen/logrus" thoas_stats "github.com/thoas/stats" "github.com/urfave/negroni" + "github.com/vulcand/oxy/buffer" "github.com/vulcand/oxy/connlimit" "github.com/vulcand/oxy/forward" "github.com/vulcand/oxy/ratelimit" @@ -1150,6 +1151,16 @@ func (s *Server) loadConfig(configurations types.Configurations, globalConfigura n.UseFunc(secureMiddleware.HandlerFuncWithNext) } + if config.Backends[frontend.Backend].Buffering != nil { + bufferedLb, err := s.buildBufferingMiddleware(lb, config.Backends[frontend.Backend].Buffering) + + if err != nil { + log.Errorf("Error setting up buffering middleware: %s", err) + } else { + lb = bufferedLb + } + } + if config.Backends[frontend.Backend].CircuitBreaker != nil { log.Debugf("Creating circuit breaker %s", config.Backends[frontend.Backend].CircuitBreaker.Expression) expression := config.Backends[frontend.Backend].CircuitBreaker.Expression @@ -1508,3 +1519,28 @@ func (s *Server) wrapHTTPHandlerWithAccessLog(handler http.Handler, frontendName } return handler } + +func (s *Server) buildBufferingMiddleware(handler http.Handler, config *types.Buffering) (http.Handler, error) { + log.Debugf("Setting up buffering: request limits: %d (mem), %d (max), response limits: %d (mem), %d (max) with retry: '%s'", + config.MemRequestBodyBytes, config.MaxRequestBodyBytes, config.MemResponseBodyBytes, + config.MaxResponseBodyBytes, config.RetryExpression) + + if len(config.RetryExpression) > 0 { + return buffer.New( + handler, + buffer.MemRequestBodyBytes(config.MemRequestBodyBytes), + buffer.MaxRequestBodyBytes(config.MaxRequestBodyBytes), + buffer.MemResponseBodyBytes(config.MemResponseBodyBytes), + buffer.MaxResponseBodyBytes(config.MaxResponseBodyBytes), + buffer.Retry(config.RetryExpression), + ) + } + + return buffer.New( + handler, + buffer.MemRequestBodyBytes(config.MemRequestBodyBytes), + buffer.MaxRequestBodyBytes(config.MaxRequestBodyBytes), + buffer.MemResponseBodyBytes(config.MemResponseBodyBytes), + buffer.MaxResponseBodyBytes(config.MaxResponseBodyBytes), + ) +} diff --git a/templates/consul_catalog.tmpl b/templates/consul_catalog.tmpl index e0bb1cd08..f4c68a959 100644 --- a/templates/consul_catalog.tmpl +++ b/templates/consul_catalog.tmpl @@ -34,6 +34,16 @@ interval = "{{ $healthCheck.Interval }}" {{end}} + {{ $buffering := getBuffering $service.Attributes }} + {{if $buffering }} + [backends.backend-{{ $backendName }}.buffering] + maxRequestBodyBytes = {{ $buffering.MaxRequestBodyBytes }} + memRequestBodyBytes = {{ $buffering.MemRequestBodyBytes }} + maxResponseBodyBytes = {{ $buffering.MaxResponseBodyBytes }} + memResponseBodyBytes = {{ $buffering.MemResponseBodyBytes }} + retryExpression = "{{ $buffering.RetryExpression }}" + {{end}} + {{end}} {{range $index, $node := .Nodes}} diff --git a/templates/docker.tmpl b/templates/docker.tmpl index ad57bf229..44a013268 100644 --- a/templates/docker.tmpl +++ b/templates/docker.tmpl @@ -34,6 +34,16 @@ interval = "{{ $healthCheck.Interval }}" {{end}} + {{ $buffering := getBuffering $backend }} + {{if $buffering }} + [backends.backend-{{ $backendName }}.buffering] + maxRequestBodyBytes = {{ $buffering.MaxRequestBodyBytes }} + memRequestBodyBytes = {{ $buffering.MemRequestBodyBytes }} + maxResponseBodyBytes = {{ $buffering.MaxResponseBodyBytes }} + memResponseBodyBytes = {{ $buffering.MemResponseBodyBytes }} + retryExpression = "{{ $buffering.RetryExpression }}" + {{end}} + {{ $servers := index $backendServers $backendName }} {{range $serverName, $server := $servers }} {{if hasServices $server }} diff --git a/templates/ecs.tmpl b/templates/ecs.tmpl index cf341ac3d..28349bb12 100644 --- a/templates/ecs.tmpl +++ b/templates/ecs.tmpl @@ -34,6 +34,16 @@ interval = "{{ $healthCheck.Interval }}" {{end}} + {{ $buffering := getBuffering $firstInstance }} + {{if $buffering }} + [backends.backend-{{ $serviceName }}.buffering] + maxRequestBodyBytes = {{ $buffering.MaxRequestBodyBytes }} + memRequestBodyBytes = {{ $buffering.MemRequestBodyBytes }} + maxResponseBodyBytes = {{ $buffering.MaxResponseBodyBytes }} + memResponseBodyBytes = {{ $buffering.MemResponseBodyBytes }} + retryExpression = "{{ $buffering.RetryExpression }}" + {{end}} + {{range $serverName, $server := getServers $instances }} [backends.backend-{{ $serviceName }}.servers.{{ $serverName }}] url = "{{ $server.URL }}" diff --git a/templates/kubernetes.tmpl b/templates/kubernetes.tmpl index e24e89b02..45ba4b089 100644 --- a/templates/kubernetes.tmpl +++ b/templates/kubernetes.tmpl @@ -13,6 +13,16 @@ [backends."{{$backendName}}".loadbalancer.stickiness] cookieName = "{{$backend.LoadBalancer.Stickiness.CookieName}}" {{end}} + + {{if $backend.Buffering }} + [backends."{{ $backendName }}".buffering] + maxRequestBodyBytes = {{ $backend.Buffering.MaxRequestBodyBytes }} + memRequestBodyBytes = {{ $backend.Buffering.MemRequestBodyBytes }} + maxResponseBodyBytes = {{ $backend.Buffering.MaxResponseBodyBytes }} + memResponseBodyBytes = {{ $backend.Buffering.MemResponseBodyBytes }} + retryExpression = "{{ $backend.Buffering.RetryExpression }}" + {{end}} + {{range $serverName, $server := $backend.Servers}} [backends."{{$backendName}}".servers."{{$serverName}}"] url = "{{$server.URL}}" diff --git a/templates/kv.tmpl b/templates/kv.tmpl index 56765ecfd..13a380f50 100644 --- a/templates/kv.tmpl +++ b/templates/kv.tmpl @@ -34,6 +34,16 @@ interval = "{{ $healthCheck.Interval }}" {{end}} + {{ $buffering := getBuffering $backend }} + {{if $buffering }} + [backends.{{ $backendName }}.buffering] + maxRequestBodyBytes = {{ $buffering.MaxRequestBodyBytes }} + memRequestBodyBytes = {{ $buffering.MemRequestBodyBytes }} + maxResponseBodyBytes = {{ $buffering.MaxResponseBodyBytes }} + memResponseBodyBytes = {{ $buffering.MemResponseBodyBytes }} + retryExpression = "{{ $buffering.RetryExpression }}" + {{end}} + {{range $serverName, $server := getServers $backend}} [backends."{{ $backendName }}".servers."{{ $serverName }}"] url = "{{ $server.URL }}" diff --git a/templates/marathon.tmpl b/templates/marathon.tmpl index 9612e8017..c7a45d124 100644 --- a/templates/marathon.tmpl +++ b/templates/marathon.tmpl @@ -39,6 +39,16 @@ interval = "{{ $healthCheck.Interval }}" {{end}} + {{ $buffering := getBuffering $app }} + {{if $buffering }} + [backends."{{ $backendName }}".buffering] + maxRequestBodyBytes = {{ $buffering.MaxRequestBodyBytes }} + memRequestBodyBytes = {{ $buffering.MemRequestBodyBytes }} + maxResponseBodyBytes = {{ $buffering.MaxResponseBodyBytes }} + memResponseBodyBytes = {{ $buffering.MemResponseBodyBytes }} + retryExpression = "{{ $buffering.RetryExpression }}" + {{end}} + {{range $serverName, $server := getServers $app $serviceName }} [backends."{{ $backendName }}".servers."{{ $serverName }}"] url = "{{ $server.URL }}" diff --git a/templates/mesos.tmpl b/templates/mesos.tmpl index 2273c80ea..a2e909c2f 100644 --- a/templates/mesos.tmpl +++ b/templates/mesos.tmpl @@ -37,6 +37,16 @@ interval = "{{ $healthCheck.Interval }}" {{end}} + {{ $buffering := getBuffering $app }} + {{if $buffering }} + [backends.backend-{{ $backendName }}.buffering] + maxRequestBodyBytes = {{ $buffering.MaxRequestBodyBytes }} + memRequestBodyBytes = {{ $buffering.MemRequestBodyBytes }} + maxResponseBodyBytes = {{ $buffering.MaxResponseBodyBytes }} + memResponseBodyBytes = {{ $buffering.MemResponseBodyBytes }} + retryExpression = "{{ $buffering.RetryExpression }}" + {{end}} + {{range $serverName, $server := getServers $tasks }} [backends.backend-{{ $backendName }}.servers.{{ $serverName }}] url = "{{ $server.URL }}" diff --git a/templates/rancher.tmpl b/templates/rancher.tmpl index 70b9e22f2..01bd12938 100644 --- a/templates/rancher.tmpl +++ b/templates/rancher.tmpl @@ -36,6 +36,16 @@ interval = "{{ $healthCheck.Interval }}" {{end}} + {{ $buffering := getBuffering $backend }} + {{if $buffering }} + [backends."backend-{{ $backendName }}".buffering] + maxRequestBodyBytes = {{ $buffering.MaxRequestBodyBytes }} + memRequestBodyBytes = {{ $buffering.MemRequestBodyBytes }} + maxResponseBodyBytes = {{ $buffering.MaxResponseBodyBytes }} + memResponseBodyBytes = {{ $buffering.MemResponseBodyBytes }} + retryExpression = "{{ $buffering.RetryExpression }}" + {{end}} + {{range $serverName, $server := getServers $backend}} [backends.backend-{{ $backendName }}.servers.{{ $serverName }}] url = "{{ $server.URL }}" diff --git a/types/types.go b/types/types.go index 0ec8a4a10..9a22dd80e 100644 --- a/types/types.go +++ b/types/types.go @@ -25,6 +25,7 @@ type Backend struct { LoadBalancer *LoadBalancer `json:"loadBalancer,omitempty"` MaxConn *MaxConn `json:"maxConn,omitempty"` HealthCheck *HealthCheck `json:"healthCheck,omitempty"` + Buffering *Buffering `json:"buffering,omitempty"` } // MaxConn holds maximum connection configuration @@ -50,6 +51,15 @@ type CircuitBreaker struct { Expression string `json:"expression,omitempty"` } +// Buffering holds request/response buffering configuration/ +type Buffering struct { + MaxRequestBodyBytes int64 `json:"maxRequestBodyBytes,omitempty"` + MemRequestBodyBytes int64 `json:"memRequestBodyBytes,omitempty"` + MaxResponseBodyBytes int64 `json:"maxResponseBodyBytes,omitempty"` + MemResponseBodyBytes int64 `json:"memResponseBodyBytes,omitempty"` + RetryExpression string `json:"retryExpression,omitempty"` +} + // HealthCheck holds HealthCheck configuration type HealthCheck struct { Path string `json:"path,omitempty"` diff --git a/vendor/github.com/mailgun/multibuf/LICENSE b/vendor/github.com/mailgun/multibuf/LICENSE new file mode 100644 index 000000000..e06d20818 --- /dev/null +++ b/vendor/github.com/mailgun/multibuf/LICENSE @@ -0,0 +1,202 @@ +Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "{}" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + + Copyright {yyyy} {name of copyright owner} + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + diff --git a/vendor/github.com/mailgun/multibuf/buffer.go b/vendor/github.com/mailgun/multibuf/buffer.go new file mode 100644 index 000000000..67f10dac8 --- /dev/null +++ b/vendor/github.com/mailgun/multibuf/buffer.go @@ -0,0 +1,412 @@ +// package multibuf implements buffer optimized for streaming large chunks of data, +// multiple reads and optional partial buffering to disk. +package multibuf + +import ( + "bytes" + "fmt" + "io" + "io/ioutil" + "os" +) + +// MultiReader provides Read, Close, Seek and Size methods. In addition to that it supports WriterTo interface +// to provide efficient writing schemes, as functions like io.Copy use WriterTo when it's available. +type MultiReader interface { + io.Reader + io.Seeker + io.Closer + io.WriterTo + + // Size calculates and returns the total size of the reader and not the length remaining. + Size() (int64, error) +} + +// WriterOnce implements write once, read many times writer. Create a WriterOnce and write to it, once Reader() function has been +// called, the internal data is transferred to MultiReader and this instance of WriterOnce should be no longer used. +type WriterOnce interface { + // Write implements io.Writer + Write(p []byte) (int, error) + // Reader transfers all data written to this writer to MultiReader. If there was no data written it retuns an error + Reader() (MultiReader, error) + // WriterOnce owns the data before Reader has been called, so Close will close all the underlying files if Reader has not been called. + Close() error +} + +// MaxBytes, ignored if set to value >=, if request exceeds the specified limit, the reader will return error, +// by default buffer is not limited, negative values mean no limit +func MaxBytes(m int64) optionSetter { + return func(o *options) error { + o.maxBytes = m + return nil + } +} + +// MemBytes specifies the largest buffer to hold in RAM before writing to disk, default is 1MB +func MemBytes(m int64) optionSetter { + return func(o *options) error { + if m < 0 { + return fmt.Errorf("MemBytes should be >= 0") + } + o.memBytes = m + return nil + } +} + +// NewWriterOnce returns io.ReadWrite compatible object that can limit the size of the buffer and persist large buffers to disk. +// WriterOnce implements write once, read many times writer. Create a WriterOnce and write to it, once Reader() function has been +// called, the internal data is transferred to MultiReader and this instance of WriterOnce should be no longer used. +// By default NewWriterOnce returns unbound buffer that will allow to write up to 1MB in RAM and will start buffering to disk +// It supports multiple functional optional arguments: +// +// // Buffer up to 1MB in RAM and limit max buffer size to 20MB +// multibuf.NewWriterOnce(r, multibuf.MemBytes(1024 * 1024), multibuf.MaxBytes(1024 * 1024 * 20)) +// +// +func NewWriterOnce(setters ...optionSetter) (WriterOnce, error) { + o := options{ + memBytes: DefaultMemBytes, + maxBytes: DefaultMaxBytes, + } + if o.memBytes == 0 { + o.memBytes = DefaultMemBytes + } + for _, s := range setters { + if err := s(&o); err != nil { + return nil, err + } + } + return &writerOnce{o: o}, nil +} + +// New returns MultiReader that can limit the size of the buffer and persist large buffers to disk. +// By default New returns unbound buffer that will read up to 1MB in RAM and will start buffering to disk +// It supports multiple functional optional arguments: +// +// // Buffer up to 1MB in RAM and limit max buffer size to 20MB +// multibuf.New(r, multibuf.MemBytes(1024 * 1024), multibuf.MaxBytes(1024 * 1024 * 20)) +// +// +func New(input io.Reader, setters ...optionSetter) (MultiReader, error) { + o := options{ + memBytes: DefaultMemBytes, + maxBytes: DefaultMaxBytes, + } + + for _, s := range setters { + if err := s(&o); err != nil { + return nil, err + } + } + if o.memBytes == 0 { + o.memBytes = DefaultMemBytes + } + if o.maxBytes > 0 && o.maxBytes < o.memBytes { + o.memBytes = o.maxBytes + } + + memReader := &io.LimitedReader{ + R: input, // Read from this reader + N: o.memBytes, // Maximum amount of data to read + } + readers := make([]io.ReadSeeker, 0, 2) + + buffer, err := ioutil.ReadAll(memReader) + if err != nil { + return nil, err + } + readers = append(readers, bytes.NewReader(buffer)) + + var file *os.File + // This means that we have exceeded all the memory capacity and we will start buffering the body to disk. + totalBytes := int64(len(buffer)) + if memReader.N <= 0 { + file, err = ioutil.TempFile("", tempFilePrefix) + if err != nil { + return nil, err + } + os.Remove(file.Name()) + + readSrc := input + if o.maxBytes > 0 { + readSrc = &maxReader{R: input, Max: o.maxBytes - o.memBytes} + } + + writtenBytes, err := io.Copy(file, readSrc) + if err != nil { + return nil, err + } + totalBytes += writtenBytes + file.Seek(0, 0) + readers = append(readers, file) + } + + var cleanupFn cleanupFunc + if file != nil { + cleanupFn = func() error { + file.Close() + return nil + } + } + return newBuf(totalBytes, cleanupFn, readers...), nil +} + +// MaxSizeReachedError is returned when the maximum allowed buffer size is reached when reading +type MaxSizeReachedError struct { + MaxSize int64 +} + +func (e *MaxSizeReachedError) Error() string { + return fmt.Sprintf("Maximum size %d was reached", e) +} + +const ( + DefaultMemBytes = 1048576 + DefaultMaxBytes = -1 + // Equivalent of bytes.MinRead used in ioutil.ReadAll + DefaultBufferBytes = 512 +) + +// Constraints: +// - Implements io.Reader +// - Implements Seek(0, 0) +// - Designed for Write once, Read many times. +type multiReaderSeek struct { + length int64 + readers []io.ReadSeeker + mr io.Reader + cleanup cleanupFunc +} + +type cleanupFunc func() error + +func newBuf(length int64, cleanup cleanupFunc, readers ...io.ReadSeeker) *multiReaderSeek { + converted := make([]io.Reader, len(readers)) + for i, r := range readers { + // This conversion is safe as ReadSeeker includes Reader + converted[i] = r.(io.Reader) + } + + return &multiReaderSeek{ + length: length, + readers: readers, + mr: io.MultiReader(converted...), + cleanup: cleanup, + } +} + +func (mr *multiReaderSeek) Close() (err error) { + if mr.cleanup != nil { + return mr.cleanup() + } + return nil +} + +func (mr *multiReaderSeek) WriteTo(w io.Writer) (int64, error) { + b := make([]byte, DefaultBufferBytes) + var total int64 + for { + n, err := mr.mr.Read(b) + // Recommended way is to always handle non 0 reads despite the errors + if n > 0 { + nw, errw := w.Write(b[:n]) + total += int64(nw) + // Write must return a non-nil error if it returns nw < n + if nw != n || errw != nil { + return total, errw + } + } + if err != nil { + if err == io.EOF { + return total, nil + } + return total, err + } + } +} + +func (mr *multiReaderSeek) Read(p []byte) (n int, err error) { + return mr.mr.Read(p) +} + +func (mr *multiReaderSeek) Size() (int64, error) { + return mr.length, nil +} + +func (mr *multiReaderSeek) Seek(offset int64, whence int) (int64, error) { + // TODO: implement other whence + // TODO: implement real offsets + + if whence != 0 { + return 0, fmt.Errorf("multiReaderSeek: unsupported whence") + } + + if offset != 0 { + return 0, fmt.Errorf("multiReaderSeek: unsupported offset") + } + + for _, seeker := range mr.readers { + seeker.Seek(0, 0) + } + + ior := make([]io.Reader, len(mr.readers)) + for i, arg := range mr.readers { + ior[i] = arg.(io.Reader) + } + mr.mr = io.MultiReader(ior...) + + return 0, nil +} + +type options struct { + // MemBufferBytes sets up the size of the memory buffer for this request. + // If the data size exceeds the limit, the remaining request part will be saved on the file system. + memBytes int64 + + maxBytes int64 +} + +type optionSetter func(o *options) error + +// MaxReader does not allow to read more than Max bytes and returns error if this limit has been exceeded. +type maxReader struct { + R io.Reader // underlying reader + N int64 // bytes read + Max int64 // max bytes to read +} + +func (r *maxReader) Read(p []byte) (int, error) { + readBytes, err := r.R.Read(p) + if err != nil && err != io.EOF { + return readBytes, err + } + + r.N += int64(readBytes) + if r.N > r.Max { + return readBytes, &MaxSizeReachedError{MaxSize: r.Max} + } + return readBytes, err +} + +const ( + writerInit = iota + writerMem + writerFile + writerCalledRead + writerErr +) + +type writerOnce struct { + o options + err error + state int + mem *bytes.Buffer + file *os.File + total int64 + cleanupFn cleanupFunc +} + +// how many bytes we can still write to memory +func (w *writerOnce) writeToMem(p []byte) int { + left := w.o.memBytes - w.total + if left <= 0 { + return 0 + } + bufLen := len(p) + if int64(bufLen) < left { + return bufLen + } + return int(left) +} + +func (w *writerOnce) Write(p []byte) (int, error) { + out, err := w.write(p) + return out, err +} + +func (w *writerOnce) Close() error { + if w.file != nil { + return w.file.Close() + } + return nil +} + +func (w *writerOnce) write(p []byte) (int, error) { + if w.o.maxBytes > 0 && int64(len(p))+w.total > w.o.maxBytes { + return 0, fmt.Errorf("total size of %d exceeded allowed %d", int64(len(p))+w.total, w.o.maxBytes) + } + switch w.state { + case writerCalledRead: + return 0, fmt.Errorf("can not write after reader has been called") + case writerInit: + w.mem = &bytes.Buffer{} + w.state = writerMem + fallthrough + case writerMem: + writeToMem := w.writeToMem(p) + if writeToMem > 0 { + wrote, err := w.mem.Write(p[:writeToMem]) + w.total += int64(wrote) + if err != nil { + return wrote, err + } + } + left := len(p) - writeToMem + if left <= 0 { + return len(p), nil + } + // we can't write to memory any more, switch to file + if err := w.initFile(); err != nil { + return int(writeToMem), err + } + w.state = writerFile + wrote, err := w.file.Write(p[writeToMem:]) + w.total += int64(wrote) + return len(p), err + case writerFile: + wrote, err := w.file.Write(p) + w.total += int64(wrote) + return wrote, err + } + return 0, fmt.Errorf("unsupported state: %d", w.state) +} + +func (w *writerOnce) initFile() error { + file, err := ioutil.TempFile("", tempFilePrefix) + if err != nil { + return err + } + w.file = file + w.cleanupFn = func() error { + file.Close() + os.Remove(file.Name()) + return nil + } + return nil +} + +func (w *writerOnce) Reader() (MultiReader, error) { + switch w.state { + case writerInit: + return nil, fmt.Errorf("no data ready") + case writerCalledRead: + return nil, fmt.Errorf("reader has been called") + case writerMem: + w.state = writerCalledRead + return newBuf(w.total, nil, bytes.NewReader(w.mem.Bytes())), nil + case writerFile: + _, err := w.file.Seek(0, 0) + if err != nil { + return nil, err + } + // we are not responsible for file and buffer any more + w.state = writerCalledRead + br, fr := bytes.NewReader(w.mem.Bytes()), w.file + w.file = nil + w.mem = nil + return newBuf(w.total, w.cleanupFn, br, fr), nil + } + return nil, fmt.Errorf("unsupported state: %d\n", w.state) +} + +const tempFilePrefix = "temp-multibuf-" diff --git a/vendor/github.com/vulcand/oxy/buffer/buffer.go b/vendor/github.com/vulcand/oxy/buffer/buffer.go new file mode 100644 index 000000000..6d00375c6 --- /dev/null +++ b/vendor/github.com/vulcand/oxy/buffer/buffer.go @@ -0,0 +1,398 @@ +/* +Package buffer provides http.Handler middleware that solves several problems when dealing with http requests: + +Reads the entire request and response into buffer, optionally buffering it to disk for large requests. +Checks the limits for the requests and responses, rejecting in case if the limit was exceeded. +Changes request content-transfer-encoding from chunked and provides total size to the handlers. + +Examples of a buffering middleware: + + // sample HTTP handler + handler := http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { + w.Write([]byte("hello")) + }) + + // Buffer will read the body in buffer before passing the request to the handler + // calculate total size of the request and transform it from chunked encoding + // before passing to the server + buffer.New(handler) + + // This version will buffer up to 2MB in memory and will serialize any extra + // to a temporary file, if the request size exceeds 10MB it will reject the request + buffer.New(handler, + buffer.MemRequestBodyBytes(2 * 1024 * 1024), + buffer.MaxRequestBodyBytes(10 * 1024 * 1024)) + + // Will do the same as above, but with responses + buffer.New(handler, + buffer.MemResponseBodyBytes(2 * 1024 * 1024), + buffer.MaxResponseBodyBytes(10 * 1024 * 1024)) + + // Buffer will replay the request if the handler returns error at least 3 times + // before returning the response + buffer.New(handler, buffer.Retry(`IsNetworkError() && Attempts() <= 2`)) + +*/ +package buffer + +import ( + "fmt" + "io" + "io/ioutil" + "net/http" + + "bufio" + "net" + "reflect" + + "github.com/mailgun/multibuf" + log "github.com/sirupsen/logrus" + "github.com/vulcand/oxy/utils" +) + +const ( + // DefaultMemBodyBytes Store up to 1MB in RAM + DefaultMemBodyBytes = 1048576 + // DefaultMaxBodyBytes No limit by default + DefaultMaxBodyBytes = -1 + // DefaultMaxRetryAttempts Maximum retry attempts + DefaultMaxRetryAttempts = 10 +) + +var errHandler utils.ErrorHandler = &SizeErrHandler{} + +// Buffer is responsible for buffering requests and responses +// It buffers large requests and responses to disk, +type Buffer struct { + maxRequestBodyBytes int64 + memRequestBodyBytes int64 + + maxResponseBodyBytes int64 + memResponseBodyBytes int64 + + retryPredicate hpredicate + + next http.Handler + errHandler utils.ErrorHandler +} + +// New returns a new buffer middleware. New() function supports optional functional arguments +func New(next http.Handler, setters ...optSetter) (*Buffer, error) { + strm := &Buffer{ + next: next, + + maxRequestBodyBytes: DefaultMaxBodyBytes, + memRequestBodyBytes: DefaultMemBodyBytes, + + maxResponseBodyBytes: DefaultMaxBodyBytes, + memResponseBodyBytes: DefaultMemBodyBytes, + } + for _, s := range setters { + if err := s(strm); err != nil { + return nil, err + } + } + if strm.errHandler == nil { + strm.errHandler = errHandler + } + + return strm, nil +} + +type optSetter func(s *Buffer) error + +// Retry provides a predicate that allows buffer middleware to replay the request +// if it matches certain condition, e.g. returns special error code. Available functions are: +// +// Attempts() - limits the amount of retry attempts +// ResponseCode() - returns http response code +// IsNetworkError() - tests if response code is related to networking error +// +// Example of the predicate: +// +// `Attempts() <= 2 && ResponseCode() == 502` +// +func Retry(predicate string) optSetter { + return func(s *Buffer) error { + p, err := parseExpression(predicate) + if err != nil { + return err + } + s.retryPredicate = p + return nil + } +} + +// ErrorHandler sets error handler of the server +func ErrorHandler(h utils.ErrorHandler) optSetter { + return func(s *Buffer) error { + s.errHandler = h + return nil + } +} + +// MaxRequestBodyBytes sets the maximum request body size in bytes +func MaxRequestBodyBytes(m int64) optSetter { + return func(s *Buffer) error { + if m < 0 { + return fmt.Errorf("max bytes should be >= 0 got %d", m) + } + s.maxRequestBodyBytes = m + return nil + } +} + +// MaxRequestBody bytes sets the maximum request body to be stored in memory +// buffer middleware will serialize the excess to disk. +func MemRequestBodyBytes(m int64) optSetter { + return func(s *Buffer) error { + if m < 0 { + return fmt.Errorf("mem bytes should be >= 0 got %d", m) + } + s.memRequestBodyBytes = m + return nil + } +} + +// MaxResponseBodyBytes sets the maximum request body size in bytes +func MaxResponseBodyBytes(m int64) optSetter { + return func(s *Buffer) error { + if m < 0 { + return fmt.Errorf("max bytes should be >= 0 got %d", m) + } + s.maxResponseBodyBytes = m + return nil + } +} + +// MemResponseBodyBytes sets the maximum request body to be stored in memory +// buffer middleware will serialize the excess to disk. +func MemResponseBodyBytes(m int64) optSetter { + return func(s *Buffer) error { + if m < 0 { + return fmt.Errorf("mem bytes should be >= 0 got %d", m) + } + s.memResponseBodyBytes = m + return nil + } +} + +// Wrap sets the next handler to be called by buffer handler. +func (s *Buffer) Wrap(next http.Handler) error { + s.next = next + return nil +} + +func (s *Buffer) ServeHTTP(w http.ResponseWriter, req *http.Request) { + if log.GetLevel() >= log.DebugLevel { + logEntry := log.WithField("Request", utils.DumpHttpRequest(req)) + logEntry.Debug("vulcand/oxy/buffer: begin ServeHttp on request") + defer logEntry.Debug("vulcand/oxy/buffer: competed ServeHttp on request") + } + + if err := s.checkLimit(req); err != nil { + log.Errorf("vulcand/oxy/buffer: request body over limit, err: %v", err) + s.errHandler.ServeHTTP(w, req, err) + return + } + + // Read the body while keeping limits in mind. This reader controls the maximum bytes + // to read into memory and disk. This reader returns an error if the total request size exceeds the + // prefefined MaxSizeBytes. This can occur if we got chunked request, in this case ContentLength would be set to -1 + // and the reader would be unbounded bufio in the http.Server + body, err := multibuf.New(req.Body, multibuf.MaxBytes(s.maxRequestBodyBytes), multibuf.MemBytes(s.memRequestBodyBytes)) + if err != nil || body == nil { + log.Errorf("vulcand/oxy/buffer: error when reading request body, err: %v", err) + s.errHandler.ServeHTTP(w, req, err) + return + } + + // Set request body to buffered reader that can replay the read and execute Seek + // Note that we don't change the original request body as it's handled by the http server + // and we don'w want to mess with standard library + defer func() { + if body != nil { + errClose := body.Close() + if errClose != nil { + log.Errorf("vulcand/oxy/buffer: failed to close body, err: %v", errClose) + } + } + }() + + // We need to set ContentLength based on known request size. The incoming request may have been + // set without content length or using chunked TransferEncoding + totalSize, err := body.Size() + if err != nil { + log.Errorf("vulcand/oxy/buffer: failed to get request size, err: %v", err) + s.errHandler.ServeHTTP(w, req, err) + return + } + + if totalSize == 0 { + body = nil + } + + outreq := s.copyRequest(req, body, totalSize) + + attempt := 1 + for { + // We create a special writer that will limit the response size, buffer it to disk if necessary + writer, err := multibuf.NewWriterOnce(multibuf.MaxBytes(s.maxResponseBodyBytes), multibuf.MemBytes(s.memResponseBodyBytes)) + if err != nil { + log.Errorf("vulcand/oxy/buffer: failed create response writer, err: %v", err) + s.errHandler.ServeHTTP(w, req, err) + return + } + + // We are mimicking http.ResponseWriter to replace writer with our special writer + b := &bufferWriter{ + header: make(http.Header), + buffer: writer, + responseWriter: w, + } + defer b.Close() + + s.next.ServeHTTP(b, outreq) + if b.hijacked { + log.Infof("vulcand/oxy/buffer: connection was hijacked downstream. Not taking any action in buffer.") + return + } + + var reader multibuf.MultiReader + if b.expectBody(outreq) { + rdr, err := writer.Reader() + if err != nil { + log.Errorf("vulcand/oxy/buffer: failed to read response, err: %v", err) + s.errHandler.ServeHTTP(w, req, err) + return + } + defer rdr.Close() + reader = rdr + } + + if (s.retryPredicate == nil || attempt > DefaultMaxRetryAttempts) || + !s.retryPredicate(&context{r: req, attempt: attempt, responseCode: b.code}) { + utils.CopyHeaders(w.Header(), b.Header()) + w.WriteHeader(b.code) + if reader != nil { + io.Copy(w, reader) + } + return + } + + attempt += 1 + if body != nil { + if _, err := body.Seek(0, 0); err != nil { + log.Errorf("vulcand/oxy/buffer: failed to rewind response body, err: %v", err) + s.errHandler.ServeHTTP(w, req, err) + return + } + } + + outreq = s.copyRequest(req, body, totalSize) + log.Infof("vulcand/oxy/buffer: retry Request(%v %v) attempt %v", req.Method, req.URL, attempt) + } +} + +func (s *Buffer) copyRequest(req *http.Request, body io.ReadCloser, bodySize int64) *http.Request { + o := *req + o.URL = utils.CopyURL(req.URL) + o.Header = make(http.Header) + utils.CopyHeaders(o.Header, req.Header) + o.ContentLength = bodySize + // remove TransferEncoding that could have been previously set because we have transformed the request from chunked encoding + o.TransferEncoding = []string{} + // http.Transport will close the request body on any error, we are controlling the close process ourselves, so we override the closer here + if body == nil { + o.Body = nil + } else { + o.Body = ioutil.NopCloser(body.(io.Reader)) + } + return &o +} + +func (s *Buffer) checkLimit(req *http.Request) error { + if s.maxRequestBodyBytes <= 0 { + return nil + } + if req.ContentLength > s.maxRequestBodyBytes { + return &multibuf.MaxSizeReachedError{MaxSize: s.maxRequestBodyBytes} + } + return nil +} + +type bufferWriter struct { + header http.Header + code int + buffer multibuf.WriterOnce + responseWriter http.ResponseWriter + hijacked bool +} + +// RFC2616 #4.4 +func (b *bufferWriter) expectBody(r *http.Request) bool { + if r.Method == "HEAD" { + return false + } + if (b.code >= 100 && b.code < 200) || b.code == 204 || b.code == 304 { + return false + } + if b.header.Get("Content-Length") == "" && b.header.Get("Transfer-Encoding") == "" { + return false + } + if b.header.Get("Content-Length") == "0" { + return false + } + return true +} + +func (b *bufferWriter) Close() error { + return b.buffer.Close() +} + +func (b *bufferWriter) Header() http.Header { + return b.header +} + +func (b *bufferWriter) Write(buf []byte) (int, error) { + return b.buffer.Write(buf) +} + +// WriteHeader sets rw.Code. +func (b *bufferWriter) WriteHeader(code int) { + b.code = code +} + +//CloseNotifier interface - this allows downstream connections to be terminated when the client terminates. +func (b *bufferWriter) CloseNotify() <-chan bool { + if cn, ok := b.responseWriter.(http.CloseNotifier); ok { + return cn.CloseNotify() + } + log.Warningf("Upstream ResponseWriter of type %v does not implement http.CloseNotifier. Returning dummy channel.", reflect.TypeOf(b.responseWriter)) + return make(<-chan bool) +} + +//This allows connections to be hijacked for websockets for instance. +func (b *bufferWriter) Hijack() (net.Conn, *bufio.ReadWriter, error) { + if hi, ok := b.responseWriter.(http.Hijacker); ok { + conn, rw, err := hi.Hijack() + if err != nil { + b.hijacked = true + } + return conn, rw, err + } + log.Warningf("Upstream ResponseWriter of type %v does not implement http.Hijacker. Returning dummy channel.", reflect.TypeOf(b.responseWriter)) + return nil, nil, fmt.Errorf("The response writer that was wrapped in this proxy, does not implement http.Hijacker. It is of type: %v", reflect.TypeOf(b.responseWriter)) +} + +type SizeErrHandler struct { +} + +func (e *SizeErrHandler) ServeHTTP(w http.ResponseWriter, req *http.Request, err error) { + if _, ok := err.(*multibuf.MaxSizeReachedError); ok { + w.WriteHeader(http.StatusRequestEntityTooLarge) + w.Write([]byte(http.StatusText(http.StatusRequestEntityTooLarge))) + return + } + utils.DefaultHandler.ServeHTTP(w, req, err) +} diff --git a/vendor/github.com/vulcand/oxy/buffer/threshold.go b/vendor/github.com/vulcand/oxy/buffer/threshold.go new file mode 100644 index 000000000..1294bcd60 --- /dev/null +++ b/vendor/github.com/vulcand/oxy/buffer/threshold.go @@ -0,0 +1,225 @@ +package buffer + +import ( + "fmt" + "net/http" + + "github.com/vulcand/predicate" +) + +func IsValidExpression(expr string) bool { + _, err := parseExpression(expr) + return err == nil +} + +type context struct { + r *http.Request + attempt int + responseCode int +} + +type hpredicate func(*context) bool + +// Parses expression in the go language into Failover predicates +func parseExpression(in string) (hpredicate, error) { + p, err := predicate.NewParser(predicate.Def{ + Operators: predicate.Operators{ + AND: and, + OR: or, + EQ: eq, + NEQ: neq, + LT: lt, + GT: gt, + LE: le, + GE: ge, + }, + Functions: map[string]interface{}{ + "RequestMethod": requestMethod, + "IsNetworkError": isNetworkError, + "Attempts": attempts, + "ResponseCode": responseCode, + }, + }) + if err != nil { + return nil, err + } + out, err := p.Parse(in) + if err != nil { + return nil, err + } + pr, ok := out.(hpredicate) + if !ok { + return nil, fmt.Errorf("expected predicate, got %T", out) + } + return pr, nil +} + +type toString func(c *context) string +type toInt func(c *context) int + +// RequestMethod returns mapper of the request to its method e.g. POST +func requestMethod() toString { + return func(c *context) string { + return c.r.Method + } +} + +// Attempts returns mapper of the request to the number of proxy attempts +func attempts() toInt { + return func(c *context) int { + return c.attempt + } +} + +// ResponseCode returns mapper of the request to the last response code, returns 0 if there was no response code. +func responseCode() toInt { + return func(c *context) int { + return c.responseCode + } +} + +// IsNetworkError returns a predicate that returns true if last attempt ended with network error. +func isNetworkError() hpredicate { + return func(c *context) bool { + return c.responseCode == http.StatusBadGateway || c.responseCode == http.StatusGatewayTimeout + } +} + +// and returns predicate by joining the passed predicates with logical 'and' +func and(fns ...hpredicate) hpredicate { + return func(c *context) bool { + for _, fn := range fns { + if !fn(c) { + return false + } + } + return true + } +} + +// or returns predicate by joining the passed predicates with logical 'or' +func or(fns ...hpredicate) hpredicate { + return func(c *context) bool { + for _, fn := range fns { + if fn(c) { + return true + } + } + return false + } +} + +// not creates negation of the passed predicate +func not(p hpredicate) hpredicate { + return func(c *context) bool { + return !p(c) + } +} + +// eq returns predicate that tests for equality of the value of the mapper and the constant +func eq(m interface{}, value interface{}) (hpredicate, error) { + switch mapper := m.(type) { + case toString: + return stringEQ(mapper, value) + case toInt: + return intEQ(mapper, value) + } + return nil, fmt.Errorf("unsupported argument: %T", m) +} + +// neq returns predicate that tests for inequality of the value of the mapper and the constant +func neq(m interface{}, value interface{}) (hpredicate, error) { + p, err := eq(m, value) + if err != nil { + return nil, err + } + return not(p), nil +} + +// lt returns predicate that tests that value of the mapper function is less than the constant +func lt(m interface{}, value interface{}) (hpredicate, error) { + switch mapper := m.(type) { + case toInt: + return intLT(mapper, value) + } + return nil, fmt.Errorf("unsupported argument: %T", m) +} + +// le returns predicate that tests that value of the mapper function is less or equal than the constant +func le(m interface{}, value interface{}) (hpredicate, error) { + l, err := lt(m, value) + if err != nil { + return nil, err + } + e, err := eq(m, value) + if err != nil { + return nil, err + } + return func(c *context) bool { + return l(c) || e(c) + }, nil +} + +// gt returns predicate that tests that value of the mapper function is greater than the constant +func gt(m interface{}, value interface{}) (hpredicate, error) { + switch mapper := m.(type) { + case toInt: + return intGT(mapper, value) + } + return nil, fmt.Errorf("unsupported argument: %T", m) +} + +// ge returns predicate that tests that value of the mapper function is less or equal than the constant +func ge(m interface{}, value interface{}) (hpredicate, error) { + g, err := gt(m, value) + if err != nil { + return nil, err + } + e, err := eq(m, value) + if err != nil { + return nil, err + } + return func(c *context) bool { + return g(c) || e(c) + }, nil +} + +func stringEQ(m toString, val interface{}) (hpredicate, error) { + value, ok := val.(string) + if !ok { + return nil, fmt.Errorf("expected string, got %T", val) + } + return func(c *context) bool { + return m(c) == value + }, nil +} + +func intEQ(m toInt, val interface{}) (hpredicate, error) { + value, ok := val.(int) + if !ok { + return nil, fmt.Errorf("expected int, got %T", val) + } + return func(c *context) bool { + return m(c) == value + }, nil +} + +func intLT(m toInt, val interface{}) (hpredicate, error) { + value, ok := val.(int) + if !ok { + return nil, fmt.Errorf("expected int, got %T", val) + } + return func(c *context) bool { + return m(c) < value + }, nil +} + +func intGT(m toInt, val interface{}) (hpredicate, error) { + value, ok := val.(int) + if !ok { + return nil, fmt.Errorf("expected int, got %T", val) + } + return func(c *context) bool { + return m(c) > value + }, nil +}