Upgrade Ingress Handling to work with networkingv1/Ingress
This commit is contained in:
parent
702e301990
commit
29908098e4
40 changed files with 1141 additions and 113 deletions
|
@ -13,11 +13,12 @@ import (
|
|||
"github.com/traefik/traefik/v2/pkg/log"
|
||||
traefikversion "github.com/traefik/traefik/v2/pkg/version"
|
||||
corev1 "k8s.io/api/core/v1"
|
||||
extensionsv1beta1 "k8s.io/api/extensions/v1beta1"
|
||||
networkingv1 "k8s.io/api/networking/v1"
|
||||
networkingv1beta1 "k8s.io/api/networking/v1beta1"
|
||||
kubeerror "k8s.io/apimachinery/pkg/api/errors"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/labels"
|
||||
"k8s.io/apimachinery/pkg/util/intstr"
|
||||
"k8s.io/client-go/informers"
|
||||
"k8s.io/client-go/kubernetes"
|
||||
"k8s.io/client-go/rest"
|
||||
|
@ -54,12 +55,12 @@ func (reh *resourceEventHandler) OnDelete(obj interface{}) {
|
|||
// The stores can then be accessed via the Get* functions.
|
||||
type Client interface {
|
||||
WatchAll(namespaces []string, stopCh <-chan struct{}) (<-chan interface{}, error)
|
||||
GetIngresses() []*networkingv1beta1.Ingress
|
||||
GetIngresses() []*networkingv1.Ingress
|
||||
GetIngressClasses() ([]*networkingv1beta1.IngressClass, error)
|
||||
GetService(namespace, name string) (*corev1.Service, bool, error)
|
||||
GetSecret(namespace, name string) (*corev1.Secret, bool, error)
|
||||
GetEndpoints(namespace, name string) (*corev1.Endpoints, bool, error)
|
||||
UpdateIngressStatus(ing *networkingv1beta1.Ingress, ingStatus []corev1.LoadBalancerIngress) error
|
||||
UpdateIngressStatus(ing *networkingv1.Ingress, ingStatus []corev1.LoadBalancerIngress) error
|
||||
GetServerVersion() (*version.Version, error)
|
||||
}
|
||||
|
||||
|
@ -167,9 +168,20 @@ func (c *clientWrapper) WatchAll(namespaces []string, stopCh <-chan struct{}) (<
|
|||
opts.LabelSelector = c.ingressLabelSelector
|
||||
}
|
||||
|
||||
serverVersion, err := c.GetServerVersion()
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to get server version: %w", err)
|
||||
}
|
||||
|
||||
for _, ns := range namespaces {
|
||||
factoryIngress := informers.NewSharedInformerFactoryWithOptions(c.clientset, resyncPeriod, informers.WithNamespace(ns), informers.WithTweakListOptions(matchesLabelSelector))
|
||||
factoryIngress.Extensions().V1beta1().Ingresses().Informer().AddEventHandler(eventHandler)
|
||||
|
||||
if supportsNetworkingV1Ingress(serverVersion) {
|
||||
factoryIngress.Networking().V1().Ingresses().Informer().AddEventHandler(eventHandler)
|
||||
} else {
|
||||
factoryIngress.Networking().V1beta1().Ingresses().Informer().AddEventHandler(eventHandler)
|
||||
}
|
||||
|
||||
c.factoriesIngress[ns] = factoryIngress
|
||||
|
||||
factoryKube := informers.NewSharedInformerFactoryWithOptions(c.clientset, resyncPeriod, informers.WithNamespace(ns))
|
||||
|
@ -208,12 +220,6 @@ func (c *clientWrapper) WatchAll(namespaces []string, stopCh <-chan struct{}) (<
|
|||
}
|
||||
}
|
||||
|
||||
serverVersion, err := c.GetServerVersion()
|
||||
if err != nil {
|
||||
log.WithoutContext().Errorf("Failed to get server version: %v", err)
|
||||
return eventCh, nil
|
||||
}
|
||||
|
||||
if supportsIngressClass(serverVersion) {
|
||||
c.clusterFactory = informers.NewSharedInformerFactoryWithOptions(c.clientset, resyncPeriod)
|
||||
c.clusterFactory.Networking().V1beta1().IngressClasses().Informer().AddEventHandler(eventHandler)
|
||||
|
@ -230,42 +236,59 @@ func (c *clientWrapper) WatchAll(namespaces []string, stopCh <-chan struct{}) (<
|
|||
}
|
||||
|
||||
// GetIngresses returns all Ingresses for observed namespaces in the cluster.
|
||||
func (c *clientWrapper) GetIngresses() []*networkingv1beta1.Ingress {
|
||||
var results []*networkingv1beta1.Ingress
|
||||
func (c *clientWrapper) GetIngresses() []*networkingv1.Ingress {
|
||||
var results []*networkingv1.Ingress
|
||||
|
||||
serverVersion, err := c.GetServerVersion()
|
||||
if err != nil {
|
||||
log.Errorf("Failed to get server version: %v", err)
|
||||
return results
|
||||
}
|
||||
|
||||
isNetworkingV1Supported := supportsNetworkingV1Ingress(serverVersion)
|
||||
|
||||
for ns, factory := range c.factoriesIngress {
|
||||
// extensions
|
||||
ings, err := factory.Extensions().V1beta1().Ingresses().Lister().List(labels.Everything())
|
||||
if err != nil {
|
||||
log.Errorf("Failed to list ingresses in namespace %s: %v", ns, err)
|
||||
}
|
||||
|
||||
for _, ing := range ings {
|
||||
n, err := extensionsToNetworking(ing)
|
||||
if isNetworkingV1Supported {
|
||||
// networking
|
||||
listNew, err := factory.Networking().V1().Ingresses().Lister().List(labels.Everything())
|
||||
if err != nil {
|
||||
log.Errorf("Failed to convert ingress %s from extensions/v1beta1 to networking/v1beta1: %v", ns, err)
|
||||
log.WithoutContext().Errorf("Failed to list ingresses in namespace %s: %v", ns, err)
|
||||
continue
|
||||
}
|
||||
results = append(results, n)
|
||||
|
||||
results = append(results, listNew...)
|
||||
continue
|
||||
}
|
||||
|
||||
// networking
|
||||
// networking beta
|
||||
list, err := factory.Networking().V1beta1().Ingresses().Lister().List(labels.Everything())
|
||||
if err != nil {
|
||||
log.Errorf("Failed to list ingresses in namespace %s: %v", ns, err)
|
||||
log.WithoutContext().Errorf("Failed to list ingresses in namespace %s: %v", ns, err)
|
||||
continue
|
||||
}
|
||||
|
||||
for _, ing := range list {
|
||||
n, err := toNetworkingV1(ing)
|
||||
if err != nil {
|
||||
log.WithoutContext().Errorf("Failed to convert ingress %s from networking/v1beta1 to networking/v1: %v", ns, err)
|
||||
continue
|
||||
}
|
||||
|
||||
addServiceFromV1Beta1(n, *ing)
|
||||
|
||||
results = append(results, n)
|
||||
}
|
||||
results = append(results, list...)
|
||||
}
|
||||
return results
|
||||
}
|
||||
|
||||
func extensionsToNetworking(ing marshaler) (*networkingv1beta1.Ingress, error) {
|
||||
func toNetworkingV1(ing marshaler) (*networkingv1.Ingress, error) {
|
||||
data, err := ing.Marshal()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
ni := &networkingv1beta1.Ingress{}
|
||||
ni := &networkingv1.Ingress{}
|
||||
err = ni.Unmarshal(data)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
@ -274,16 +297,95 @@ func extensionsToNetworking(ing marshaler) (*networkingv1beta1.Ingress, error) {
|
|||
return ni, nil
|
||||
}
|
||||
|
||||
func addServiceFromV1Beta1(ing *networkingv1.Ingress, old networkingv1beta1.Ingress) {
|
||||
if old.Spec.Backend != nil {
|
||||
port := networkingv1.ServiceBackendPort{}
|
||||
if old.Spec.Backend.ServicePort.Type == intstr.Int {
|
||||
port.Number = old.Spec.Backend.ServicePort.IntVal
|
||||
} else {
|
||||
port.Name = old.Spec.Backend.ServicePort.StrVal
|
||||
}
|
||||
|
||||
if old.Spec.Backend.ServiceName != "" {
|
||||
ing.Spec.DefaultBackend = &networkingv1.IngressBackend{
|
||||
Service: &networkingv1.IngressServiceBackend{
|
||||
Name: old.Spec.Backend.ServiceName,
|
||||
Port: port,
|
||||
},
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
for rc, rule := range ing.Spec.Rules {
|
||||
if rule.HTTP == nil {
|
||||
continue
|
||||
}
|
||||
for pc, path := range rule.HTTP.Paths {
|
||||
if path.Backend.Service == nil {
|
||||
oldBackend := old.Spec.Rules[rc].HTTP.Paths[pc].Backend
|
||||
|
||||
port := networkingv1.ServiceBackendPort{}
|
||||
if oldBackend.ServicePort.Type == intstr.Int {
|
||||
port.Number = oldBackend.ServicePort.IntVal
|
||||
} else {
|
||||
port.Name = oldBackend.ServicePort.StrVal
|
||||
}
|
||||
|
||||
svc := networkingv1.IngressServiceBackend{
|
||||
Name: oldBackend.ServiceName,
|
||||
Port: port,
|
||||
}
|
||||
|
||||
ing.Spec.Rules[rc].HTTP.Paths[pc].Backend.Service = &svc
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// UpdateIngressStatus updates an Ingress with a provided status.
|
||||
func (c *clientWrapper) UpdateIngressStatus(src *networkingv1beta1.Ingress, ingStatus []corev1.LoadBalancerIngress) error {
|
||||
func (c *clientWrapper) UpdateIngressStatus(src *networkingv1.Ingress, ingStatus []corev1.LoadBalancerIngress) error {
|
||||
if !c.isWatchedNamespace(src.Namespace) {
|
||||
return fmt.Errorf("failed to get ingress %s/%s: namespace is not within watched namespaces", src.Namespace, src.Name)
|
||||
}
|
||||
|
||||
if src.GetObjectKind().GroupVersionKind().Group != "networking.k8s.io" {
|
||||
serverVersion, err := c.GetServerVersion()
|
||||
if err != nil {
|
||||
log.WithoutContext().Errorf("Failed to get server version: %v", err)
|
||||
return err
|
||||
}
|
||||
|
||||
if !supportsNetworkingV1Ingress(serverVersion) {
|
||||
return c.updateIngressStatusOld(src, ingStatus)
|
||||
}
|
||||
|
||||
ing, err := c.factoriesIngress[c.lookupNamespace(src.Namespace)].Networking().V1().Ingresses().Lister().Ingresses(src.Namespace).Get(src.Name)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to get ingress %s/%s: %w", src.Namespace, src.Name, err)
|
||||
}
|
||||
|
||||
logger := log.WithoutContext().WithField("namespace", ing.Namespace).WithField("ingress", ing.Name)
|
||||
|
||||
if isLoadBalancerIngressEquals(ing.Status.LoadBalancer.Ingress, ingStatus) {
|
||||
logger.Debug("Skipping ingress status update")
|
||||
return nil
|
||||
}
|
||||
|
||||
ingCopy := ing.DeepCopy()
|
||||
ingCopy.Status = networkingv1.IngressStatus{LoadBalancer: corev1.LoadBalancerStatus{Ingress: ingStatus}}
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), defaultTimeout)
|
||||
defer cancel()
|
||||
|
||||
_, err = c.clientset.NetworkingV1().Ingresses(ingCopy.Namespace).UpdateStatus(ctx, ingCopy, metav1.UpdateOptions{})
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to update ingress status %s/%s: %w", src.Namespace, src.Name, err)
|
||||
}
|
||||
|
||||
logger.Info("Updated ingress status")
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *clientWrapper) updateIngressStatusOld(src *networkingv1.Ingress, ingStatus []corev1.LoadBalancerIngress) error {
|
||||
ing, err := c.factoriesIngress[c.lookupNamespace(src.Namespace)].Networking().V1beta1().Ingresses().Lister().Ingresses(src.Namespace).Get(src.Name)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to get ingress %s/%s: %w", src.Namespace, src.Name, err)
|
||||
|
@ -306,35 +408,6 @@ func (c *clientWrapper) UpdateIngressStatus(src *networkingv1beta1.Ingress, ingS
|
|||
if err != nil {
|
||||
return fmt.Errorf("failed to update ingress status %s/%s: %w", src.Namespace, src.Name, err)
|
||||
}
|
||||
|
||||
logger.Info("Updated ingress status")
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *clientWrapper) updateIngressStatusOld(src *networkingv1beta1.Ingress, ingStatus []corev1.LoadBalancerIngress) error {
|
||||
ing, err := c.factoriesIngress[c.lookupNamespace(src.Namespace)].Extensions().V1beta1().Ingresses().Lister().Ingresses(src.Namespace).Get(src.Name)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to get ingress %s/%s: %w", src.Namespace, src.Name, err)
|
||||
}
|
||||
|
||||
logger := log.WithoutContext().WithField("namespace", ing.Namespace).WithField("ingress", ing.Name)
|
||||
|
||||
if isLoadBalancerIngressEquals(ing.Status.LoadBalancer.Ingress, ingStatus) {
|
||||
logger.Debug("Skipping ingress status update")
|
||||
return nil
|
||||
}
|
||||
|
||||
ingCopy := ing.DeepCopy()
|
||||
ingCopy.Status = extensionsv1beta1.IngressStatus{LoadBalancer: corev1.LoadBalancerStatus{Ingress: ingStatus}}
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), defaultTimeout)
|
||||
defer cancel()
|
||||
|
||||
_, err = c.clientset.ExtensionsV1beta1().Ingresses(ingCopy.Namespace).UpdateStatus(ctx, ingCopy, metav1.UpdateOptions{})
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to update ingress status %s/%s: %w", src.Namespace, src.Name, err)
|
||||
}
|
||||
|
||||
logger.Info("Updated ingress status")
|
||||
return nil
|
||||
}
|
||||
|
@ -488,3 +561,11 @@ func filterIngressClassByName(ingressClassName string, ics []*networkingv1beta1.
|
|||
|
||||
return ingressClasses
|
||||
}
|
||||
|
||||
// Ingress in networking.k8s.io/v1 is supported starting 1.19.
|
||||
// thus, we query it in K8s starting 1.19.
|
||||
func supportsNetworkingV1Ingress(serverVersion *version.Version) bool {
|
||||
ingressNetworkingVersion := version.Must(version.NewVersion("1.19"))
|
||||
|
||||
return serverVersion.GreaterThanOrEqual(ingressNetworkingVersion)
|
||||
}
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue