Migrate to EndpointSlices API

This commit is contained in:
Jesper Noordsij 2024-06-21 14:56:03 +02:00 committed by GitHub
parent 61defcdd66
commit a8a92eb2a5
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
88 changed files with 2177 additions and 1555 deletions

View file

@ -11,9 +11,11 @@ import (
"github.com/rs/zerolog/log"
"github.com/traefik/traefik/v3/pkg/types"
corev1 "k8s.io/api/core/v1"
discoveryv1 "k8s.io/api/discovery/v1"
kerror "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/selection"
ktypes "k8s.io/apimachinery/pkg/types"
kinformers "k8s.io/client-go/informers"
kclientset "k8s.io/client-go/kubernetes"
@ -61,9 +63,9 @@ type Client interface {
ListTLSRoutes() ([]*gatev1alpha2.TLSRoute, error)
ListNamespaces(selector labels.Selector) ([]string, error)
ListReferenceGrants(namespace string) ([]*gatev1beta1.ReferenceGrant, error)
ListEndpointSlicesForService(namespace, serviceName string) ([]*discoveryv1.EndpointSlice, error)
GetService(namespace, name string) (*corev1.Service, bool, error)
GetSecret(namespace, name string) (*corev1.Secret, bool, error)
GetEndpoints(namespace, name string) (*corev1.Endpoints, bool, error)
}
type clientWrapper struct {
@ -222,7 +224,7 @@ func (c *clientWrapper) WatchAll(namespaces []string, stopCh <-chan struct{}) (<
if err != nil {
return nil, err
}
_, err = factoryKube.Core().V1().Endpoints().Informer().AddEventHandler(eventHandler)
_, err = factoryKube.Discovery().V1().EndpointSlices().Informer().AddEventHandler(eventHandler)
if err != nil {
return nil, err
}
@ -543,16 +545,20 @@ func (c *clientWrapper) GetService(namespace, name string) (*corev1.Service, boo
return service, exist, err
}
// GetEndpoints returns the named endpoints from the given namespace.
func (c *clientWrapper) GetEndpoints(namespace, name string) (*corev1.Endpoints, bool, error) {
// ListEndpointSlicesForService returns the EndpointSlices for the given service name in the given namespace.
func (c *clientWrapper) ListEndpointSlicesForService(namespace, serviceName string) ([]*discoveryv1.EndpointSlice, error) {
if !c.isWatchedNamespace(namespace) {
return nil, false, fmt.Errorf("failed to get endpoints %s/%s: namespace is not within watched namespaces", namespace, name)
return nil, fmt.Errorf("failed to get endpointslices for service %s/%s: namespace is not within watched namespaces", namespace, serviceName)
}
endpoint, err := c.factoriesKube[c.lookupNamespace(namespace)].Core().V1().Endpoints().Lister().Endpoints(namespace).Get(name)
exist, err := translateNotFoundError(err)
serviceLabelRequirement, err := labels.NewRequirement(discoveryv1.LabelServiceName, selection.Equals, []string{serviceName})
if err != nil {
return nil, fmt.Errorf("failed to create service label selector requirement: %w", err)
}
serviceSelector := labels.NewSelector()
serviceSelector = serviceSelector.Add(*serviceLabelRequirement)
return endpoint, exist, err
return c.factoriesKube[c.lookupNamespace(namespace)].Discovery().V1().EndpointSlices().Lister().EndpointSlices(namespace).List(serviceSelector)
}
// GetSecret returns the named secret from the given namespace.

View file

@ -17,21 +17,26 @@ spec:
task: whoami
---
kind: Endpoints
apiVersion: v1
kind: EndpointSlice
apiVersion: discovery.k8s.io/v1
metadata:
name: whoami
name: whoami-abc
namespace: default
labels:
kubernetes.io/service-name: whoami
subsets:
addressType: IPv4
ports:
- name: web
port: 80
- name: web2
port: 8080
endpoints:
- addresses:
- ip: 10.10.0.1
- ip: 10.10.0.2
ports:
- name: web
port: 80
- name: web2
port: 8000
- 10.10.0.1
- 10.10.0.2
conditions:
ready: true
---
apiVersion: v1
@ -53,21 +58,26 @@ spec:
task: whoami
---
kind: Endpoints
apiVersion: v1
kind: EndpointSlice
apiVersion: discovery.k8s.io/v1
metadata:
name: whoami-bar
name: whoami-bar-abc
namespace: bar
labels:
kubernetes.io/service-name: whoami-bar
subsets:
addressType: IPv4
ports:
- name: web
port: 80
- name: web2
port: 8000
endpoints:
- addresses:
- ip: 10.10.0.11
- ip: 10.10.0.12
ports:
- name: web
port: 80
- name: web2
port: 8000
- 10.10.0.11
- 10.10.0.12
conditions:
ready: true
---
apiVersion: v1
@ -86,19 +96,24 @@ spec:
task: whoami2
---
kind: Endpoints
apiVersion: v1
kind: EndpointSlice
apiVersion: discovery.k8s.io/v1
metadata:
name: whoami2
name: whoami2-abc
namespace: default
labels:
kubernetes.io/service-name: whoami2
subsets:
addressType: IPv4
ports:
- name: web
port: 8080
endpoints:
- addresses:
- ip: 10.10.0.3
- ip: 10.10.0.4
ports:
- name: web
port: 8080
- 10.10.0.3
- 10.10.0.4
conditions:
ready: true
---
apiVersion: v1
@ -117,19 +132,24 @@ spec:
task: whoami2
---
kind: Endpoints
apiVersion: v1
kind: EndpointSlice
apiVersion: discovery.k8s.io/v1
metadata:
name: whoamitls
name: whoamitls-abc
namespace: default
labels:
kubernetes.io/service-name: whoamitls
subsets:
addressType: IPv4
ports:
- name: websecure
port: 8443
endpoints:
- addresses:
- ip: 10.10.0.5
- ip: 10.10.0.6
ports:
- name: websecure
port: 8443
- 10.10.0.5
- 10.10.0.6
conditions:
ready: true
---
apiVersion: v1
@ -148,19 +168,24 @@ spec:
task: whoami3
---
kind: Endpoints
apiVersion: v1
kind: EndpointSlice
apiVersion: discovery.k8s.io/v1
metadata:
name: whoami3
name: whoami3-abc
namespace: default
labels:
kubernetes.io/service-name: whoami3
subsets:
addressType: IPv4
ports:
- name: websecure2
port: 8443
endpoints:
- addresses:
- ip: 10.10.0.7
- ip: 10.10.0.8
ports:
- name: websecure2
port: 8443
- 10.10.0.7
- 10.10.0.8
conditions:
ready: true
---
apiVersion: v1
@ -201,23 +226,28 @@ spec:
port: 443
---
kind: Endpoints
apiVersion: v1
kind: EndpointSlice
apiVersion: discovery.k8s.io/v1
metadata:
name: whoamitcp
name: whoamitcp-abc
namespace: default
labels:
kubernetes.io/service-name: whoamitcp
subsets:
addressType: IPv4
ports:
- name: tcp-1
protocol: TCP
port: 9000
- name: tcp-2
protocol: TCP
port: 10000
endpoints:
- addresses:
- ip: 10.10.0.9
- ip: 10.10.0.10
ports:
- name: tcp-1
protocol: TCP
port: 9000
- name: tcp-2
protocol: TCP
port: 10000
- 10.10.0.9
- 10.10.0.10
conditions:
ready: true
---
apiVersion: v1
@ -236,23 +266,28 @@ spec:
name: tcp-2
---
kind: Endpoints
apiVersion: v1
kind: EndpointSlice
apiVersion: discovery.k8s.io/v1
metadata:
name: whoamitcp-bar
name: whoamitcp-bar-abc
namespace: bar
labels:
kubernetes.io/service-name: whoamitcp-bar
subsets:
addressType: IPv4
ports:
- name: tcp-1
protocol: TCP
port: 9000
- name: tcp-2
protocol: TCP
port: 10000
endpoints:
- addresses:
- ip: 10.10.0.13
- ip: 10.10.0.14
ports:
- name: tcp-1
protocol: TCP
port: 9000
- name: tcp-2
protocol: TCP
port: 10000
- 10.10.0.13
- 10.10.0.14
conditions:
ready: true
---
apiVersion: v1

View file

@ -370,6 +370,10 @@ func (p *Provider) loadHTTPRouteFilterExtensionRef(namespace string, extensionRe
}
func (p *Provider) loadHTTPServers(namespace string, backendRef gatev1.HTTPBackendRef) (*dynamic.ServersLoadBalancer, error) {
if backendRef.Port == nil {
return nil, errors.New("port is required for Kubernetes Service reference")
}
service, exists, err := p.client.GetService(namespace, string(backendRef.Name))
if err != nil {
return nil, fmt.Errorf("getting service: %w", err)
@ -378,56 +382,58 @@ func (p *Provider) loadHTTPServers(namespace string, backendRef gatev1.HTTPBacke
return nil, errors.New("service not found")
}
var portSpec corev1.ServicePort
var match bool
var svcPort *corev1.ServicePort
for _, p := range service.Spec.Ports {
if backendRef.Port == nil || p.Port == int32(*backendRef.Port) {
portSpec = p
match = true
if p.Port == int32(*backendRef.Port) {
svcPort = &p
break
}
}
if !match {
return nil, errors.New("service port not found")
if svcPort == nil {
return nil, fmt.Errorf("service port %d not found", *backendRef.Port)
}
endpoints, endpointsExists, err := p.client.GetEndpoints(namespace, string(backendRef.Name))
endpointSlices, err := p.client.ListEndpointSlicesForService(namespace, string(backendRef.Name))
if err != nil {
return nil, fmt.Errorf("getting endpoints: %w", err)
return nil, fmt.Errorf("getting endpointslices: %w", err)
}
if !endpointsExists {
return nil, errors.New("endpoints not found")
}
if len(endpoints.Subsets) == 0 {
return nil, errors.New("subset not found")
if len(endpointSlices) == 0 {
return nil, errors.New("endpointslices not found")
}
lb := &dynamic.ServersLoadBalancer{}
lb.SetDefaults()
var port int32
var portStr string
for _, subset := range endpoints.Subsets {
for _, p := range subset.Ports {
if portSpec.Name == p.Name {
port = p.Port
protocol := getProtocol(*svcPort)
addresses := map[string]struct{}{}
for _, endpointSlice := range endpointSlices {
var port int32
for _, p := range endpointSlice.Ports {
if svcPort.Name == *p.Name {
port = *p.Port
break
}
}
if port == 0 {
return nil, errors.New("cannot define a port")
continue
}
protocol := getProtocol(portSpec)
for _, endpoint := range endpointSlice.Endpoints {
if endpoint.Conditions.Ready == nil || !*endpoint.Conditions.Ready {
continue
}
portStr = strconv.FormatInt(int64(port), 10)
for _, addr := range subset.Addresses {
lb.Servers = append(lb.Servers, dynamic.Server{
URL: fmt.Sprintf("%s://%s", protocol, net.JoinHostPort(addr.IP, portStr)),
})
for _, address := range endpoint.Addresses {
if _, ok := addresses[address]; ok {
continue
}
addresses[address] = struct{}{}
lb.Servers = append(lb.Servers, dynamic.Server{
URL: fmt.Sprintf("%s://%s", protocol, net.JoinHostPort(address, strconv.Itoa(int(port)))),
})
}
}
}

View file

@ -206,82 +206,71 @@ func (p *Provider) loadTCPServices(namespace string, backendRefs []gatev1.Backen
return nil, nil, fmt.Errorf("unsupported BackendRef %s/%s/%s", *backendRef.Group, *backendRef.Kind, backendRef.Name)
}
svc := dynamic.TCPService{
LoadBalancer: &dynamic.TCPServersLoadBalancer{},
if backendRef.Port == nil {
return nil, nil, errors.New("port is required for Kubernetes Service reference")
}
service, exists, err := p.client.GetService(namespace, string(backendRef.Name))
if err != nil {
return nil, nil, err
return nil, nil, fmt.Errorf("getting service: %w", err)
}
if !exists {
return nil, nil, errors.New("service not found")
}
if len(service.Spec.Ports) > 1 && backendRef.Port == nil {
// If the port is unspecified and the backend is a Service
// object consisting of multiple port definitions, the route
// must be dropped from the Gateway. The controller should
// raise the "ResolvedRefs" condition on the Gateway with the
// "DroppedRoutes" reason. The gateway status for this route
// should be updated with a condition that describes the error
// more specifically.
log.Error().Msg("A multiple ports Kubernetes Service cannot be used if unspecified backendRef.Port")
continue
}
var portSpec corev1.ServicePort
var match bool
var svcPort *corev1.ServicePort
for _, p := range service.Spec.Ports {
if backendRef.Port == nil || p.Port == int32(*backendRef.Port) {
portSpec = p
match = true
if p.Port == int32(*backendRef.Port) {
svcPort = &p
break
}
}
if !match {
return nil, nil, errors.New("service port not found")
if svcPort == nil {
return nil, nil, fmt.Errorf("service port %d not found", *backendRef.Port)
}
endpoints, endpointsExists, endpointsErr := p.client.GetEndpoints(namespace, string(backendRef.Name))
if endpointsErr != nil {
return nil, nil, endpointsErr
endpointSlices, err := p.client.ListEndpointSlicesForService(namespace, string(backendRef.Name))
if err != nil {
return nil, nil, fmt.Errorf("getting endpointslices: %w", err)
}
if len(endpointSlices) == 0 {
return nil, nil, errors.New("endpointslices not found")
}
if !endpointsExists {
return nil, nil, errors.New("endpoints not found")
}
svc := dynamic.TCPService{LoadBalancer: &dynamic.TCPServersLoadBalancer{}}
if len(endpoints.Subsets) == 0 {
return nil, nil, errors.New("subset not found")
}
var port int32
var portStr string
for _, subset := range endpoints.Subsets {
for _, p := range subset.Ports {
if portSpec.Name == p.Name {
port = p.Port
addresses := map[string]struct{}{}
for _, endpointSlice := range endpointSlices {
var port int32
for _, p := range endpointSlice.Ports {
if svcPort.Name == *p.Name {
port = *p.Port
break
}
}
if port == 0 {
return nil, nil, errors.New("cannot define a port")
continue
}
portStr = strconv.FormatInt(int64(port), 10)
for _, addr := range subset.Addresses {
svc.LoadBalancer.Servers = append(svc.LoadBalancer.Servers, dynamic.TCPServer{
Address: net.JoinHostPort(addr.IP, portStr),
})
for _, endpoint := range endpointSlice.Endpoints {
if endpoint.Conditions.Ready == nil || !*endpoint.Conditions.Ready {
continue
}
for _, address := range endpoint.Addresses {
if _, ok := addresses[address]; ok {
continue
}
addresses[address] = struct{}{}
svc.LoadBalancer.Servers = append(svc.LoadBalancer.Servers, dynamic.TCPServer{
Address: net.JoinHostPort(address, strconv.Itoa(int(port))),
})
}
}
}
serviceName := provider.Normalize(service.Namespace + "-" + service.Name + "-" + portStr)
serviceName := provider.Normalize(service.Namespace + "-" + service.Name + "-" + strconv.Itoa(int(svcPort.Port)))
services[serviceName] = &svc
wrrSvc.Weighted.Services = append(wrrSvc.Weighted.Services, dynamic.TCPWRRService{Name: serviceName, Weight: &weight})