1
0
Fork 0

Create Global Backend Ingress

This commit is contained in:
Daniel Tomcej 2018-07-03 10:58:03 -06:00 committed by Traefiker Bot
parent 41eb4f1c70
commit 461ebf6d88
5 changed files with 298 additions and 4 deletions

View file

@ -42,6 +42,33 @@ func iRules(opts ...func(*extensionsv1beta1.IngressSpec)) func(*extensionsv1beta
}
}
func iSpecBackends(opts ...func(*extensionsv1beta1.IngressSpec)) func(*extensionsv1beta1.Ingress) {
return func(i *extensionsv1beta1.Ingress) {
s := &extensionsv1beta1.IngressSpec{}
for _, opt := range opts {
opt(s)
}
i.Spec = *s
}
}
func iSpecBackend(opts ...func(*extensionsv1beta1.IngressBackend)) func(*extensionsv1beta1.IngressSpec) {
return func(s *extensionsv1beta1.IngressSpec) {
p := &extensionsv1beta1.IngressBackend{}
for _, opt := range opts {
opt(p)
}
s.Backend = p
}
}
func iIngressBackend(name string, port intstr.IntOrString) func(*extensionsv1beta1.IngressBackend) {
return func(p *extensionsv1beta1.IngressBackend) {
p.ServiceName = name
p.ServicePort = port
}
}
func iRule(opts ...func(*extensionsv1beta1.IngressRule)) func(*extensionsv1beta1.IngressSpec) {
return func(spec *extensionsv1beta1.IngressSpec) {
r := &extensionsv1beta1.IngressRule{}

View file

@ -36,6 +36,8 @@ const (
ruleTypeReplacePath = "ReplacePath"
traefikDefaultRealm = "traefik"
traefikDefaultIngressClass = "traefik"
defaultBackendName = "global-default-backend"
defaultFrontendName = "global-default-frontend"
)
// IngressEndpoint holds the endpoint information for the Kubernetes provider
@ -164,7 +166,7 @@ func (p *Provider) Provide(configurationChan chan<- types.ConfigMessage, pool *s
func (p *Provider) loadIngresses(k8sClient Client) (*types.Configuration, error) {
ingresses := k8sClient.GetIngresses()
templateObjects := types.Configuration{
templateObjects := &types.Configuration{
Backends: map[string]*types.Backend{},
Frontends: map[string]*types.Frontend{},
}
@ -184,6 +186,14 @@ func (p *Provider) loadIngresses(k8sClient Client) (*types.Configuration, error)
}
templateObjects.TLS = append(templateObjects.TLS, tlsSection...)
if i.Spec.Backend != nil {
err := p.addGlobalBackend(k8sClient, i, templateObjects)
if err != nil {
log.Errorf("Error creating global backend for ingress %s/%s: %v", i.Namespace, i.Name, err)
continue
}
}
var weightAllocator weightAllocator = &defaultWeightAllocator{}
annotationPercentageWeights := getAnnotationName(i.Annotations, annotationKubernetesServiceWeights)
if _, ok := i.Annotations[annotationPercentageWeights]; ok {
@ -351,7 +361,7 @@ func (p *Provider) loadIngresses(k8sClient Client) (*types.Configuration, error)
log.Errorf("Cannot update Ingress %s/%s due to error: %v", i.Namespace, i.Name, err)
}
}
return &templateObjects, nil
return templateObjects, nil
}
func (p *Provider) updateIngressStatus(i *extensionsv1beta1.Ingress, k8sClient Client) error {
@ -401,6 +411,100 @@ func (p *Provider) loadConfig(templateObjects types.Configuration) *types.Config
return configuration
}
func (p *Provider) addGlobalBackend(cl Client, i *extensionsv1beta1.Ingress, templateObjects *types.Configuration) error {
// Ensure that we are not duplicating the frontend
if _, exists := templateObjects.Frontends[defaultFrontendName]; exists {
return errors.New("duplicate frontend: " + defaultFrontendName)
}
// Ensure we are not duplicating the backend
if _, exists := templateObjects.Backends[defaultBackendName]; exists {
return errors.New("duplicate backend: " + defaultBackendName)
}
templateObjects.Backends[defaultBackendName] = &types.Backend{
Servers: make(map[string]types.Server),
LoadBalancer: &types.LoadBalancer{
Method: "wrr",
},
}
service, exists, err := cl.GetService(i.Namespace, i.Spec.Backend.ServiceName)
if err != nil {
return fmt.Errorf("error while retrieving service information from k8s API %s/%s: %v", i.Namespace, i.Spec.Backend.ServiceName, err)
}
if !exists {
return fmt.Errorf("service not found for %s/%s", i.Namespace, i.Spec.Backend.ServiceName)
}
templateObjects.Backends[defaultBackendName].CircuitBreaker = getCircuitBreaker(service)
templateObjects.Backends[defaultBackendName].LoadBalancer = getLoadBalancer(service)
templateObjects.Backends[defaultBackendName].MaxConn = getMaxConn(service)
templateObjects.Backends[defaultBackendName].Buffering = getBuffering(service)
endpoints, exists, err := cl.GetEndpoints(service.Namespace, service.Name)
if err != nil {
return fmt.Errorf("error retrieving endpoint information from k8s API %s/%s: %v", service.Namespace, service.Name, err)
}
if !exists {
return fmt.Errorf("endpoints not found for %s/%s", service.Namespace, service.Name)
}
if len(endpoints.Subsets) == 0 {
return fmt.Errorf("endpoints not available for %s/%s", service.Namespace, service.Name)
}
for _, subset := range endpoints.Subsets {
endpointPort := endpointPortNumber(corev1.ServicePort{Protocol: "TCP", Port: int32(i.Spec.Backend.ServicePort.IntValue())}, subset.Ports)
if endpointPort == 0 {
// endpoint port does not match service.
continue
}
protocol := "http"
for _, address := range subset.Addresses {
if endpointPort == 443 || strings.HasPrefix(i.Spec.Backend.ServicePort.String(), "https") {
protocol = "https"
}
url := fmt.Sprintf("%s://%s", protocol, net.JoinHostPort(address.IP, strconv.FormatInt(int64(endpointPort), 10)))
name := url
if address.TargetRef != nil && address.TargetRef.Name != "" {
name = address.TargetRef.Name
}
templateObjects.Backends[defaultBackendName].Servers[name] = types.Server{
URL: url,
Weight: label.DefaultWeight,
}
}
}
passHostHeader := getBoolValue(i.Annotations, annotationKubernetesPreserveHost, !p.DisablePassHostHeaders)
passTLSCert := getBoolValue(i.Annotations, annotationKubernetesPassTLSCert, p.EnablePassTLSCert)
priority := getIntValue(i.Annotations, annotationKubernetesPriority, 0)
entryPoints := getSliceStringValue(i.Annotations, annotationKubernetesFrontendEntryPoints)
templateObjects.Frontends[defaultFrontendName] = &types.Frontend{
Backend: defaultBackendName,
PassHostHeader: passHostHeader,
PassTLSCert: passTLSCert,
Routes: make(map[string]types.Route),
Priority: priority,
WhiteList: getWhiteList(i),
Redirect: getFrontendRedirect(i),
EntryPoints: entryPoints,
Headers: getHeader(i),
Errors: getErrorPages(i),
RateLimit: getRateLimit(i),
}
templateObjects.Frontends[defaultFrontendName].Routes["/"] = types.Route{
Rule: "PathPrefix:/",
}
return nil
}
func getRuleForPath(pa extensionsv1beta1.HTTPIngressPath, i *extensionsv1beta1.Ingress) (string, error) {
if len(pa.Path) == 0 {
return "", nil

View file

@ -221,6 +221,134 @@ func TestLoadIngresses(t *testing.T) {
assert.Equal(t, expected, actual)
}
func TestLoadGlobalIngressWithPortNumbers(t *testing.T) {
ingresses := []*extensionsv1beta1.Ingress{
buildIngress(
iNamespace("testing"),
iSpecBackends(iSpecBackend(iIngressBackend("service1", intstr.FromInt(80)))),
),
}
services := []*corev1.Service{
buildService(
sName("service1"),
sNamespace("testing"),
sUID("1"),
sSpec(
clusterIP("10.0.0.1"),
sPorts(sPort(80, ""))),
),
}
endpoints := []*corev1.Endpoints{
buildEndpoint(
eNamespace("testing"),
eName("service1"),
eUID("1"),
subset(
eAddresses(eAddress("10.10.0.1")),
ePorts(ePort(8080, ""))),
),
}
watchChan := make(chan interface{})
client := clientMock{
ingresses: ingresses,
services: services,
endpoints: endpoints,
watchChan: watchChan,
}
provider := Provider{}
actual, err := provider.loadIngresses(client)
require.NoError(t, err, "error loading ingresses")
expected := buildConfiguration(
backends(
backend("global-default-backend",
lbMethod("wrr"),
servers(
server("http://10.10.0.1:8080", weight(1)),
),
),
),
frontends(
frontend("global-default-backend",
frontendName("global-default-frontend"),
passHostHeader(),
routes(
route("/", "PathPrefix:/"),
),
),
),
)
assert.Equal(t, expected, actual)
}
func TestLoadGlobalIngressWithHttpsPortNames(t *testing.T) {
ingresses := []*extensionsv1beta1.Ingress{
buildIngress(
iNamespace("testing"),
iSpecBackends(iSpecBackend(iIngressBackend("service1", intstr.FromString("https-global")))),
),
}
services := []*corev1.Service{
buildService(
sName("service1"),
sNamespace("testing"),
sUID("1"),
sSpec(
clusterIP("10.0.0.1"),
sPorts(sPort(8443, "https-global"))),
),
}
endpoints := []*corev1.Endpoints{
buildEndpoint(
eNamespace("testing"),
eName("service1"),
eUID("1"),
subset(
eAddresses(eAddress("10.10.0.1")),
ePorts(ePort(8080, ""))),
),
}
watchChan := make(chan interface{})
client := clientMock{
ingresses: ingresses,
services: services,
endpoints: endpoints,
watchChan: watchChan,
}
provider := Provider{}
actual, err := provider.loadIngresses(client)
require.NoError(t, err, "error loading ingresses")
expected := buildConfiguration(
backends(
backend("global-default-backend",
lbMethod("wrr"),
servers(
server("https://10.10.0.1:8080", weight(1)),
),
),
),
frontends(
frontend("global-default-backend",
frontendName("global-default-frontend"),
passHostHeader(),
routes(
route("/", "PathPrefix:/"),
),
),
),
)
assert.Equal(t, expected, actual)
}
func TestRuleType(t *testing.T) {
tests := []struct {
desc string
@ -1557,8 +1685,13 @@ func TestKubeAPIErrors(t *testing.T) {
provider := Provider{}
if _, err := provider.loadIngresses(client); err != apiErr {
t.Errorf("Got error %v, wanted error %v", err, apiErr)
if _, err := provider.loadIngresses(client); err != nil {
if client.apiServiceError != nil {
assert.EqualError(t, err, "failed kube api call")
}
if client.apiEndpointsError != nil {
assert.EqualError(t, err, "failed kube api call")
}
}
})
}