1
0
Fork 0

Allow to use internal node IPs for NodePort services

This commit is contained in:
Joris Vergeer 2024-02-27 10:54:04 +01:00 committed by GitHub
parent 73769af0fe
commit c1ef742977
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
31 changed files with 813 additions and 51 deletions

View file

@ -46,6 +46,7 @@ type ServiceIng struct {
PassHostHeader *bool `json:"passHostHeader"`
Sticky *dynamic.Sticky `json:"sticky,omitempty" label:"allowEmpty"`
NativeLB bool `json:"nativeLB,omitempty"`
NodePortLB bool `json:"nodePortLB,omitempty"`
}
// SetDefaults sets the default values.

View file

@ -39,12 +39,14 @@ type Client interface {
GetIngressClasses() ([]*netv1.IngressClass, error)
GetService(namespace, name string) (*corev1.Service, bool, error)
GetSecret(namespace, name string) (*corev1.Secret, bool, error)
GetNodes() ([]*corev1.Node, bool, error)
GetEndpoints(namespace, name string) (*corev1.Endpoints, bool, error)
UpdateIngressStatus(ing *netv1.Ingress, ingStatus []netv1.IngressLoadBalancerIngress) error
}
type clientWrapper struct {
clientset kclientset.Interface
factoryClusterScope kinformers.SharedInformerFactory
factoriesKube map[string]kinformers.SharedInformerFactory
factoriesSecret map[string]kinformers.SharedInformerFactory
factoriesIngress map[string]kinformers.SharedInformerFactory
@ -196,11 +198,18 @@ func (c *clientWrapper) WatchAll(namespaces []string, stopCh <-chan struct{}) (<
c.factoriesSecret[ns] = factorySecret
}
c.factoryClusterScope = kinformers.NewSharedInformerFactory(c.clientset, resyncPeriod)
_, err = c.factoryClusterScope.Core().V1().Nodes().Informer().AddEventHandler(eventHandler)
if err != nil {
return nil, err
}
for _, ns := range namespaces {
c.factoriesIngress[ns].Start(stopCh)
c.factoriesKube[ns].Start(stopCh)
c.factoriesSecret[ns].Start(stopCh)
}
c.factoryClusterScope.Start(stopCh)
for _, ns := range namespaces {
for typ, ok := range c.factoriesIngress[ns].WaitForCacheSync(stopCh) {
@ -222,6 +231,12 @@ func (c *clientWrapper) WatchAll(namespaces []string, stopCh <-chan struct{}) (<
}
}
for t, ok := range c.factoryClusterScope.WaitForCacheSync(stopCh) {
if !ok {
return nil, fmt.Errorf("timed out waiting for controller caches to sync %s", t.String())
}
}
if !c.disableIngressClassInformer {
c.clusterFactory = kinformers.NewSharedInformerFactoryWithOptions(c.clientset, resyncPeriod)
@ -346,6 +361,12 @@ func (c *clientWrapper) GetSecret(namespace, name string) (*corev1.Secret, bool,
return secret, exist, err
}
func (c *clientWrapper) GetNodes() ([]*corev1.Node, bool, error) {
nodes, err := c.factoryClusterScope.Core().V1().Nodes().Lister().List(labels.Everything())
exist, err := translateNotFoundError(err)
return nodes, exist, err
}
func (c *clientWrapper) GetIngressClasses() ([]*netv1.IngressClass, error) {
if c.clusterFactory == nil {
return nil, errors.New("cluster factory not loaded")

View file

@ -16,11 +16,13 @@ type clientMock struct {
services []*corev1.Service
secrets []*corev1.Secret
endpoints []*corev1.Endpoints
nodes []*corev1.Node
ingressClasses []*netv1.IngressClass
apiServiceError error
apiSecretError error
apiEndpointsError error
apiNodesError error
apiIngressStatusError error
watchChan chan interface{}
@ -43,6 +45,8 @@ func newClientMock(path string) clientMock {
c.secrets = append(c.secrets, o)
case *corev1.Endpoints:
c.endpoints = append(c.endpoints, o)
case *corev1.Node:
c.nodes = append(c.nodes, o)
case *netv1.Ingress:
c.ingresses = append(c.ingresses, o)
case *netv1.IngressClass:
@ -86,6 +90,14 @@ func (c clientMock) GetEndpoints(namespace, name string) (*corev1.Endpoints, boo
return &corev1.Endpoints{}, false, nil
}
func (c clientMock) GetNodes() ([]*corev1.Node, bool, error) {
if c.apiNodesError != nil {
return nil, false, c.apiNodesError
}
return c.nodes, true, nil
}
func (c clientMock) GetSecret(namespace, name string) (*corev1.Secret, bool, error) {
if c.apiSecretError != nil {
return nil, false, c.apiSecretError

View file

@ -0,0 +1,45 @@
kind: Ingress
apiVersion: networking.k8s.io/v1
metadata:
name: ""
namespace: testing
spec:
rules:
- host: traefik.tchouk
http:
paths:
- path: /bar
backend:
service:
name: service1
port:
number: 8080
pathType: Prefix
---
kind: Service
apiVersion: v1
metadata:
name: service1
namespace: testing
annotations:
traefik.ingress.kubernetes.io/service.nodeportlb: "true"
spec:
ports:
- port: 8080
nodePort: 32456
clusterIP: 10.0.0.1
type: NodePort
externalName: traefik.wtf
---
kind: Node
apiVersion: v1
metadata:
name: traefik-node
status:
addresses:
- type: InternalIP
address: 172.16.4.4

View file

@ -599,6 +599,41 @@ func (p *Provider) loadService(client Client, namespace string, backend netv1.In
return svc, nil
}
if svcConfig.Service.NodePortLB && service.Spec.Type == corev1.ServiceTypeNodePort {
nodes, nodesExists, nodesErr := client.GetNodes()
if nodesErr != nil {
return nil, nodesErr
}
if !nodesExists || len(nodes) == 0 {
return nil, fmt.Errorf("nodes not found in namespace %s", namespace)
}
protocol := getProtocol(portSpec, portSpec.Name, svcConfig)
var servers []dynamic.Server
for _, node := range nodes {
for _, addr := range node.Status.Addresses {
if addr.Type == corev1.NodeInternalIP {
hostPort := net.JoinHostPort(addr.Address, strconv.Itoa(int(portSpec.NodePort)))
servers = append(servers, dynamic.Server{
URL: fmt.Sprintf("%s://%s", protocol, hostPort),
})
}
}
}
if len(servers) == 0 {
return nil, fmt.Errorf("no servers were generated for service %s in namespace", backend.Service.Name)
}
svc.LoadBalancer.Servers = servers
return svc, nil
}
}
if service.Spec.Type == corev1.ServiceTypeExternalName {

View file

@ -1694,6 +1694,58 @@ func TestLoadConfigurationFromIngressesWithNativeLB(t *testing.T) {
}
}
func TestLoadConfigurationFromIngressesWithNodePortLB(t *testing.T) {
testCases := []struct {
desc string
ingressClass string
expected *dynamic.Configuration
}{
{
desc: "Ingress with node port lb",
expected: &dynamic.Configuration{
TCP: &dynamic.TCPConfiguration{},
HTTP: &dynamic.HTTPConfiguration{
Middlewares: map[string]*dynamic.Middleware{},
Routers: map[string]*dynamic.Router{
"testing-traefik-tchouk-bar": {
Rule: "Host(`traefik.tchouk`) && PathPrefix(`/bar`)",
Service: "testing-service1-8080",
},
},
Services: map[string]*dynamic.Service{
"testing-service1-8080": {
LoadBalancer: &dynamic.ServersLoadBalancer{
ResponseForwarding: &dynamic.ResponseForwarding{FlushInterval: dynamic.DefaultFlushInterval},
PassHostHeader: Bool(true),
Servers: []dynamic.Server{
{
URL: "http://172.16.4.4:32456",
},
},
},
},
},
},
},
},
}
for _, test := range testCases {
test := test
t.Run(test.desc, func(t *testing.T) {
t.Parallel()
clientMock := newClientMock(generateTestFilename(test.desc))
p := Provider{IngressClass: test.ingressClass}
conf := p.loadConfigurationFromIngresses(context.Background(), clientMock)
assert.Equal(t, test.expected, conf)
})
}
}
func generateTestFilename(desc string) string {
return filepath.Join("fixtures", strings.ReplaceAll(desc, " ", "-")+".yml")
}