1
0
Fork 0

Merge 'v2.1' into master

This commit is contained in:
Fernandez Ludovic 2020-02-29 00:13:44 +01:00
commit e9d0a16a3b
67 changed files with 827 additions and 329 deletions

View file

@ -6,7 +6,7 @@ import (
"net/http"
"time"
"github.com/cenkalti/backoff/v3"
"github.com/cenkalti/backoff/v4"
"github.com/containous/traefik/v2/pkg/log"
"github.com/containous/traefik/v2/pkg/safe"
"github.com/go-acme/lego/v3/challenge"

View file

@ -184,15 +184,19 @@ func (p *Provider) addServerTCP(ctx context.Context, item itemData, loadBalancer
return errors.New("load-balancer is not defined")
}
var port string
if len(loadBalancer.Servers) > 0 {
port = loadBalancer.Servers[0].Port
}
if len(loadBalancer.Servers) == 0 {
loadBalancer.Servers = []dynamic.TCPServer{{}}
}
var port string
if item.Port != "" {
if item.Port != "" && port == "" {
port = item.Port
loadBalancer.Servers[0].Port = ""
}
loadBalancer.Servers[0].Port = ""
if port == "" {
return errors.New("port is missing")
@ -250,10 +254,10 @@ func (p *Provider) addServer(ctx context.Context, item itemData, loadBalancer *d
loadBalancer.Servers = []dynamic.Server{server}
}
if item.Port != "" {
if item.Port != "" && port == "" {
port = item.Port
loadBalancer.Servers[0].Port = ""
}
loadBalancer.Servers[0].Port = ""
if port == "" {
return errors.New("port is missing")

View file

@ -1479,7 +1479,7 @@ func Test_buildConfiguration(t *testing.T) {
LoadBalancer: &dynamic.ServersLoadBalancer{
Servers: []dynamic.Server{
{
URL: "h2c://127.0.0.1:80",
URL: "h2c://127.0.0.1:8080",
},
},
PassHostHeader: Bool(true),
@ -1531,7 +1531,7 @@ func Test_buildConfiguration(t *testing.T) {
LoadBalancer: &dynamic.ServersLoadBalancer{
Servers: []dynamic.Server{
{
URL: "http://127.0.0.1:80",
URL: "http://127.0.0.1:8080",
},
},
PassHostHeader: Bool(true),

View file

@ -4,14 +4,16 @@ import (
"context"
"fmt"
"strconv"
"strings"
"text/template"
"time"
"github.com/cenkalti/backoff/v3"
"github.com/cenkalti/backoff/v4"
"github.com/containous/traefik/v2/pkg/config/dynamic"
"github.com/containous/traefik/v2/pkg/job"
"github.com/containous/traefik/v2/pkg/log"
"github.com/containous/traefik/v2/pkg/provider"
"github.com/containous/traefik/v2/pkg/provider/constraints"
"github.com/containous/traefik/v2/pkg/safe"
"github.com/containous/traefik/v2/pkg/types"
"github.com/hashicorp/consul/api"
@ -151,7 +153,7 @@ func (p *Provider) getConsulServicesData(ctx context.Context) ([]itemData, error
}
var data []itemData
for name := range consulServiceNames {
for _, name := range consulServiceNames {
consulServices, healthServices, err := p.fetchService(ctx, name)
if err != nil {
return nil, err
@ -204,10 +206,55 @@ func (p *Provider) fetchService(ctx context.Context, name string) ([]*api.Catalo
return consulServices, healthServices, err
}
func (p *Provider) fetchServices(ctx context.Context) (map[string][]string, error) {
func (p *Provider) fetchServices(ctx context.Context) ([]string, error) {
// The query option "Filter" is not supported by /catalog/services.
// https://www.consul.io/api/catalog.html#list-services
opts := &api.QueryOptions{AllowStale: p.Stale, RequireConsistent: p.RequireConsistent, UseCache: p.Cache}
serviceNames, _, err := p.client.Catalog().Services(opts)
return serviceNames, err
if err != nil {
return nil, err
}
// The keys are the service names, and the array values provide all known tags for a given service.
// https://www.consul.io/api/catalog.html#list-services
var filtered []string
for svcName, tags := range serviceNames {
logger := log.FromContext(log.With(ctx, log.Str("serviceName", svcName)))
if !p.ExposedByDefault && !contains(tags, p.Prefix+".enable=true") {
logger.Debug("Filtering disabled item")
continue
}
if contains(tags, p.Prefix+".enable=false") {
logger.Debug("Filtering disabled item")
continue
}
matches, err := constraints.MatchTags(tags, p.Constraints)
if err != nil {
logger.Errorf("Error matching constraints expression: %v", err)
continue
}
if !matches {
logger.Debugf("Container pruned by constraint expression: %q", p.Constraints)
continue
}
filtered = append(filtered, svcName)
}
return filtered, err
}
func contains(values []string, val string) bool {
for _, value := range values {
if strings.EqualFold(value, val) {
return true
}
}
return false
}
func createClient(cfg *EndpointConfig) (*api.Client, error) {

View file

@ -11,7 +11,7 @@ import (
"text/template"
"time"
"github.com/cenkalti/backoff/v3"
"github.com/cenkalti/backoff/v4"
"github.com/containous/traefik/v2/pkg/config/dynamic"
"github.com/containous/traefik/v2/pkg/job"
"github.com/containous/traefik/v2/pkg/log"

View file

@ -110,6 +110,19 @@ func (p *Provider) addWatcher(pool *safe.Pool, directory string, configurationCh
case <-ctx.Done():
return
case evt := <-watcher.Events:
if evt.Op == fsnotify.Remove {
err = watcher.Remove(evt.Name)
if err != nil {
log.WithoutContext().WithField(log.ProviderName, providerName).
Errorf("Could not remove watcher for %s: %s", directory, err)
}
err = watcher.Add(directory)
if err != nil {
log.WithoutContext().WithField(log.ProviderName, providerName).
Errorf("Could not re-add watcher for %s: %s", directory, err)
}
}
if p.Directory == "" {
_, evtFileName := filepath.Split(evt.Name)
_, confFileName := filepath.Split(p.Filename)

View file

@ -169,6 +169,7 @@ func (c *clientWrapper) WatchAll(namespaces []string, stopCh <-chan struct{}) (<
factoryKube.Extensions().V1beta1().Ingresses().Informer().AddEventHandler(eventHandler)
factoryKube.Core().V1().Services().Informer().AddEventHandler(eventHandler)
factoryKube.Core().V1().Endpoints().Informer().AddEventHandler(eventHandler)
factoryKube.Core().V1().Secrets().Informer().AddEventHandler(eventHandler)
c.factoriesCrd[ns] = factoryCrd
c.factoriesKube[ns] = factoryKube
@ -193,15 +194,6 @@ func (c *clientWrapper) WatchAll(namespaces []string, stopCh <-chan struct{}) (<
}
}
// Do not wait for the Secrets store to get synced since we cannot rely on
// users having granted RBAC permissions for this object.
// https://github.com/containous/traefik/issues/1784 should improve the
// situation here in the future.
for _, ns := range namespaces {
c.factoriesKube[ns].Core().V1().Secrets().Informer().AddEventHandler(eventHandler)
c.factoriesKube[ns].Start(stopCh)
}
return eventCh, nil
}

View file

@ -12,7 +12,7 @@ import (
"strings"
"time"
"github.com/cenkalti/backoff/v3"
"github.com/cenkalti/backoff/v4"
"github.com/containous/traefik/v2/pkg/config/dynamic"
"github.com/containous/traefik/v2/pkg/job"
"github.com/containous/traefik/v2/pkg/log"

View file

@ -100,7 +100,7 @@ func (p *Provider) loadIngressRouteConfiguration(ctx context.Context, client Cli
errBuild := cb.buildServicesLB(ctx, ingressRoute.Namespace, spec, serviceName, conf.Services)
if errBuild != nil {
logger.Error(err)
logger.Error(errBuild)
continue
}
} else if len(route.Services) == 1 {
@ -307,9 +307,9 @@ func (c configBuilder) loadServers(fallbackNamespace string, svc v1alpha1.LoadBa
var servers []dynamic.Server
if service.Spec.Type == corev1.ServiceTypeExternalName {
protocol := "http"
if portSpec.Port == 443 || strings.HasPrefix(portSpec.Name, "https") {
protocol = "https"
protocol, err := parseServiceProtocol(svc.Scheme, portSpec.Name, portSpec.Port)
if err != nil {
return nil, err
}
return append(servers, dynamic.Server{
@ -341,17 +341,9 @@ func (c configBuilder) loadServers(fallbackNamespace string, svc v1alpha1.LoadBa
return nil, fmt.Errorf("cannot define a port for %s/%s", namespace, sanitizedName)
}
protocol := httpProtocol
scheme := svc.Scheme
switch scheme {
case httpProtocol, httpsProtocol, "h2c":
protocol = scheme
case "":
if portSpec.Port == 443 || strings.HasPrefix(portSpec.Name, httpsProtocol) {
protocol = httpsProtocol
}
default:
return nil, fmt.Errorf("invalid scheme %q specified", scheme)
protocol, err := parseServiceProtocol(svc.Scheme, portSpec.Name, portSpec.Port)
if err != nil {
return nil, err
}
for _, addr := range subset.Addresses {
@ -448,3 +440,19 @@ func getTLSHTTP(ctx context.Context, ingressRoute *v1alpha1.IngressRoute, k8sCli
return nil
}
// parseServiceProtocol parses the scheme, port name, and number to determine the correct protocol.
// an error is returned if the scheme provided is invalid.
func parseServiceProtocol(providedScheme string, portName string, portNumber int32) (string, error) {
switch providedScheme {
case httpProtocol, httpsProtocol, "h2c":
return providedScheme, nil
case "":
if portNumber == 443 || strings.HasPrefix(portName, httpsProtocol) {
return httpsProtocol, nil
}
return httpProtocol, nil
}
return "", fmt.Errorf("invalid scheme %q specified", providedScheme)
}

View file

@ -3186,3 +3186,72 @@ func TestLoadIngressRouteUDPs(t *testing.T) {
})
}
}
func TestParseServiceProtocol(t *testing.T) {
testCases := []struct {
desc string
scheme string
portName string
portNumber int32
expected string
expectedError bool
}{
{
desc: "Empty scheme and name",
scheme: "",
portName: "",
portNumber: 1000,
expected: "http",
},
{
desc: "h2c scheme and emptyname",
scheme: "h2c",
portName: "",
portNumber: 1000,
expected: "h2c",
},
{
desc: "invalid scheme",
scheme: "foo",
portName: "",
portNumber: 1000,
expectedError: true,
},
{
desc: "Empty scheme and https name",
scheme: "",
portName: "https-secure",
portNumber: 1000,
expected: "https",
},
{
desc: "Empty scheme and port number",
scheme: "",
portName: "",
portNumber: 443,
expected: "https",
},
{
desc: "https scheme",
scheme: "https",
portName: "",
portNumber: 1000,
expected: "https",
},
}
for _, test := range testCases {
test := test
t.Run(test.desc, func(t *testing.T) {
t.Parallel()
protocol, err := parseServiceProtocol(test.scheme, test.portName, test.portNumber)
if test.expectedError {
assert.Error(t, err)
} else {
assert.Equal(t, test.expected, protocol)
}
})
}
}

View file

@ -138,6 +138,7 @@ func (c *clientWrapper) WatchAll(namespaces []string, stopCh <-chan struct{}) (<
factory.Extensions().V1beta1().Ingresses().Informer().AddEventHandler(eventHandler)
factory.Core().V1().Services().Informer().AddEventHandler(eventHandler)
factory.Core().V1().Endpoints().Informer().AddEventHandler(eventHandler)
factory.Core().V1().Secrets().Informer().AddEventHandler(eventHandler)
c.factories[ns] = factory
}
@ -153,15 +154,6 @@ func (c *clientWrapper) WatchAll(namespaces []string, stopCh <-chan struct{}) (<
}
}
// Do not wait for the Secrets store to get synced since we cannot rely on
// users having granted RBAC permissions for this object.
// https://github.com/containous/traefik/issues/1784 should improve the
// situation here in the future.
for _, ns := range namespaces {
c.factories[ns].Core().V1().Secrets().Informer().AddEventHandler(eventHandler)
c.factories[ns].Start(stopCh)
}
return eventCh, nil
}

View file

@ -11,7 +11,7 @@ import (
"strings"
"time"
"github.com/cenkalti/backoff/v3"
"github.com/cenkalti/backoff/v4"
"github.com/containous/traefik/v2/pkg/config/dynamic"
"github.com/containous/traefik/v2/pkg/job"
"github.com/containous/traefik/v2/pkg/log"

View file

@ -13,7 +13,7 @@ import (
etcdv3 "github.com/abronan/valkeyrie/store/etcd/v3"
"github.com/abronan/valkeyrie/store/redis"
"github.com/abronan/valkeyrie/store/zookeeper"
"github.com/cenkalti/backoff/v3"
"github.com/cenkalti/backoff/v4"
"github.com/containous/traefik/v2/pkg/config/dynamic"
"github.com/containous/traefik/v2/pkg/config/kv"
"github.com/containous/traefik/v2/pkg/job"

View file

@ -9,7 +9,7 @@ import (
"text/template"
"time"
"github.com/cenkalti/backoff/v3"
"github.com/cenkalti/backoff/v4"
"github.com/containous/traefik/v2/pkg/config/dynamic"
"github.com/containous/traefik/v2/pkg/job"
"github.com/containous/traefik/v2/pkg/log"

View file

@ -6,7 +6,7 @@ import (
"text/template"
"time"
"github.com/cenkalti/backoff/v3"
"github.com/cenkalti/backoff/v4"
"github.com/containous/traefik/v2/pkg/config/dynamic"
"github.com/containous/traefik/v2/pkg/job"
"github.com/containous/traefik/v2/pkg/log"