Add support for Gateway API BackendTLSPolicies
This commit is contained in:
parent
9750bbc353
commit
1ebd12ff82
9 changed files with 657 additions and 66 deletions
|
@ -25,6 +25,7 @@ import (
|
|||
"k8s.io/client-go/util/retry"
|
||||
gatev1 "sigs.k8s.io/gateway-api/apis/v1"
|
||||
gatev1alpha2 "sigs.k8s.io/gateway-api/apis/v1alpha2"
|
||||
gatev1alpha3 "sigs.k8s.io/gateway-api/apis/v1alpha3"
|
||||
gatev1beta1 "sigs.k8s.io/gateway-api/apis/v1beta1"
|
||||
gateclientset "sigs.k8s.io/gateway-api/pkg/client/clientset/versioned"
|
||||
gateinformers "sigs.k8s.io/gateway-api/pkg/client/informers/externalversions"
|
||||
|
@ -48,30 +49,6 @@ func (reh *resourceEventHandler) OnDelete(obj interface{}) {
|
|||
eventHandlerFunc(reh.ev, obj)
|
||||
}
|
||||
|
||||
// Client is a client for the Provider master.
|
||||
// WatchAll starts the watch of the Provider resources and updates the stores.
|
||||
// The stores can then be accessed via the Get* functions.
|
||||
type Client interface {
|
||||
WatchAll(namespaces []string, stopCh <-chan struct{}) (<-chan interface{}, error)
|
||||
UpdateGatewayStatus(ctx context.Context, gateway ktypes.NamespacedName, status gatev1.GatewayStatus) error
|
||||
UpdateGatewayClassStatus(ctx context.Context, name string, status gatev1.GatewayClassStatus) error
|
||||
UpdateHTTPRouteStatus(ctx context.Context, route ktypes.NamespacedName, status gatev1.HTTPRouteStatus) error
|
||||
UpdateGRPCRouteStatus(ctx context.Context, route ktypes.NamespacedName, status gatev1.GRPCRouteStatus) error
|
||||
UpdateTCPRouteStatus(ctx context.Context, route ktypes.NamespacedName, status gatev1alpha2.TCPRouteStatus) error
|
||||
UpdateTLSRouteStatus(ctx context.Context, route ktypes.NamespacedName, status gatev1alpha2.TLSRouteStatus) error
|
||||
ListGatewayClasses() ([]*gatev1.GatewayClass, error)
|
||||
ListGateways() []*gatev1.Gateway
|
||||
ListHTTPRoutes() ([]*gatev1.HTTPRoute, error)
|
||||
ListGRPCRoutes() ([]*gatev1.GRPCRoute, error)
|
||||
ListTCPRoutes() ([]*gatev1alpha2.TCPRoute, error)
|
||||
ListTLSRoutes() ([]*gatev1alpha2.TLSRoute, error)
|
||||
ListNamespaces(selector labels.Selector) ([]string, error)
|
||||
ListReferenceGrants(namespace string) ([]*gatev1beta1.ReferenceGrant, error)
|
||||
ListEndpointSlicesForService(namespace, serviceName string) ([]*discoveryv1.EndpointSlice, error)
|
||||
GetService(namespace, name string) (*corev1.Service, bool, error)
|
||||
GetSecret(namespace, name string) (*corev1.Secret, bool, error)
|
||||
}
|
||||
|
||||
type clientWrapper struct {
|
||||
csGateway gateclientset.Interface
|
||||
csKube kclientset.Interface
|
||||
|
@ -198,6 +175,16 @@ func (c *clientWrapper) WatchAll(namespaces []string, stopCh <-chan struct{}) (<
|
|||
}
|
||||
|
||||
for _, ns := range namespaces {
|
||||
factoryKube := kinformers.NewSharedInformerFactoryWithOptions(c.csKube, resyncPeriod, kinformers.WithNamespace(ns))
|
||||
_, err = factoryKube.Core().V1().Services().Informer().AddEventHandler(eventHandler)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
_, err = factoryKube.Discovery().V1().EndpointSlices().Informer().AddEventHandler(eventHandler)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
factoryGateway := gateinformers.NewSharedInformerFactoryWithOptions(c.csGateway, resyncPeriod, gateinformers.WithNamespace(ns))
|
||||
_, err = factoryGateway.Gateway().V1().Gateways().Informer().AddEventHandler(eventHandler)
|
||||
if err != nil {
|
||||
|
@ -225,16 +212,14 @@ func (c *clientWrapper) WatchAll(namespaces []string, stopCh <-chan struct{}) (<
|
|||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
factoryKube := kinformers.NewSharedInformerFactoryWithOptions(c.csKube, resyncPeriod, kinformers.WithNamespace(ns))
|
||||
_, err = factoryKube.Core().V1().Services().Informer().AddEventHandler(eventHandler)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
_, err = factoryKube.Discovery().V1().EndpointSlices().Informer().AddEventHandler(eventHandler)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
_, err = factoryGateway.Gateway().V1alpha3().BackendTLSPolicies().Informer().AddEventHandler(eventHandler)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
_, err = factoryKube.Core().V1().ConfigMaps().Informer().AddEventHandler(eventHandler)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
factorySecret := kinformers.NewSharedInformerFactoryWithOptions(c.csKube, resyncPeriod, kinformers.WithNamespace(ns), kinformers.WithTweakListOptions(notOwnedByHelm))
|
||||
|
@ -367,8 +352,6 @@ func (c *clientWrapper) ListTLSRoutes() ([]*gatev1alpha2.TLSRoute, error) {
|
|||
|
||||
func (c *clientWrapper) ListReferenceGrants(namespace string) ([]*gatev1beta1.ReferenceGrant, error) {
|
||||
if !c.isWatchedNamespace(namespace) {
|
||||
log.Warn().Msgf("Failed to get ReferenceGrants: %q is not within watched namespaces", namespace)
|
||||
|
||||
return nil, fmt.Errorf("failed to get ReferenceGrants: namespace %s is not within watched namespaces", namespace)
|
||||
}
|
||||
|
||||
|
@ -424,7 +407,7 @@ func (c *clientWrapper) UpdateGatewayClassStatus(ctx context.Context, name strin
|
|||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to update GatewayClass %q status: %w", name, err)
|
||||
return fmt.Errorf("failed to update GatewayClass %s status: %w", name, err)
|
||||
}
|
||||
|
||||
return nil
|
||||
|
@ -459,7 +442,7 @@ func (c *clientWrapper) UpdateGatewayStatus(ctx context.Context, gateway ktypes.
|
|||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to update Gateway %q status: %w", gateway.Name, err)
|
||||
return fmt.Errorf("failed to update Gateway %s/%s status: %w", gateway.Namespace, gateway.Name, err)
|
||||
}
|
||||
|
||||
return nil
|
||||
|
@ -486,7 +469,6 @@ func (c *clientWrapper) UpdateHTTPRouteStatus(ctx context.Context, route ktypes.
|
|||
for _, parentStatus := range currentRoute.Status.Parents {
|
||||
if parentStatus.ControllerName != controllerName {
|
||||
parentStatuses = append(parentStatuses, parentStatus)
|
||||
continue
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -511,7 +493,7 @@ func (c *clientWrapper) UpdateHTTPRouteStatus(ctx context.Context, route ktypes.
|
|||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to update HTTPRoute %q status: %w", route.Name, err)
|
||||
return fmt.Errorf("failed to update HTTPRoute %s/%s status: %w", route.Namespace, route.Name, err)
|
||||
}
|
||||
|
||||
return nil
|
||||
|
@ -538,7 +520,6 @@ func (c *clientWrapper) UpdateGRPCRouteStatus(ctx context.Context, route ktypes.
|
|||
for _, parentStatus := range currentRoute.Status.Parents {
|
||||
if parentStatus.ControllerName != controllerName {
|
||||
parentStatuses = append(parentStatuses, parentStatus)
|
||||
continue
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -590,7 +571,6 @@ func (c *clientWrapper) UpdateTCPRouteStatus(ctx context.Context, route ktypes.N
|
|||
for _, parentStatus := range currentRoute.Status.Parents {
|
||||
if parentStatus.ControllerName != controllerName {
|
||||
parentStatuses = append(parentStatuses, parentStatus)
|
||||
continue
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -615,7 +595,7 @@ func (c *clientWrapper) UpdateTCPRouteStatus(ctx context.Context, route ktypes.N
|
|||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to update TCPRoute %q status: %w", route.Name, err)
|
||||
return fmt.Errorf("failed to update TCPRoute %s/%s status: %w", route.Namespace, route.Name, err)
|
||||
}
|
||||
|
||||
return nil
|
||||
|
@ -642,7 +622,6 @@ func (c *clientWrapper) UpdateTLSRouteStatus(ctx context.Context, route ktypes.N
|
|||
for _, parentStatus := range currentRoute.Status.Parents {
|
||||
if parentStatus.ControllerName != controllerName {
|
||||
parentStatuses = append(parentStatuses, parentStatus)
|
||||
continue
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -667,7 +646,69 @@ func (c *clientWrapper) UpdateTLSRouteStatus(ctx context.Context, route ktypes.N
|
|||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to update TLSRoute %q status: %w", route.Name, err)
|
||||
return fmt.Errorf("failed to update TLSRoute %s/%s status: %w", route.Namespace, route.Name, err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *clientWrapper) UpdateBackendTLSPolicyStatus(ctx context.Context, policy ktypes.NamespacedName, status gatev1alpha2.PolicyStatus) error {
|
||||
if !c.isWatchedNamespace(policy.Namespace) {
|
||||
return fmt.Errorf("updating BackendTLSPolicy status %s/%s: namespace is not within watched namespaces", policy.Namespace, policy.Name)
|
||||
}
|
||||
|
||||
err := retry.RetryOnConflict(retry.DefaultRetry, func() error {
|
||||
currentPolicy, err := c.factoriesGateway[c.lookupNamespace(policy.Namespace)].Gateway().V1alpha3().BackendTLSPolicies().Lister().BackendTLSPolicies(policy.Namespace).Get(policy.Name)
|
||||
if err != nil {
|
||||
// We have to return err itself here (not wrapped inside another error)
|
||||
// so that RetryOnConflict can identify it correctly.
|
||||
return err
|
||||
}
|
||||
|
||||
ancestorStatuses := make([]gatev1alpha2.PolicyAncestorStatus, len(status.Ancestors))
|
||||
copy(ancestorStatuses, status.Ancestors)
|
||||
|
||||
// keep statuses added by other gateway controllers,
|
||||
// and statuses for Traefik gateway controller but not for the same Gateway as the one in parameter (AncestorRef).
|
||||
for _, ancestorStatus := range currentPolicy.Status.Ancestors {
|
||||
if ancestorStatus.ControllerName != controllerName {
|
||||
ancestorStatuses = append(ancestorStatuses, ancestorStatus)
|
||||
continue
|
||||
}
|
||||
|
||||
if slices.ContainsFunc(status.Ancestors, func(status gatev1alpha2.PolicyAncestorStatus) bool {
|
||||
return reflect.DeepEqual(ancestorStatus.AncestorRef, status.AncestorRef)
|
||||
}) {
|
||||
continue
|
||||
}
|
||||
|
||||
ancestorStatuses = append(ancestorStatuses, ancestorStatus)
|
||||
}
|
||||
|
||||
if len(ancestorStatuses) > 16 {
|
||||
return fmt.Errorf("failed to update BackendTLSPolicy %s/%s status: PolicyAncestor statuses count exceeds 16", policy.Namespace, policy.Name)
|
||||
}
|
||||
|
||||
// do not update status when nothing has changed.
|
||||
if policyAncestorStatusesEqual(currentPolicy.Status.Ancestors, ancestorStatuses) {
|
||||
return nil
|
||||
}
|
||||
|
||||
currentPolicy = currentPolicy.DeepCopy()
|
||||
currentPolicy.Status = gatev1alpha2.PolicyStatus{
|
||||
Ancestors: ancestorStatuses,
|
||||
}
|
||||
|
||||
if _, err = c.csGateway.GatewayV1alpha3().BackendTLSPolicies(policy.Namespace).UpdateStatus(ctx, currentPolicy, metav1.UpdateOptions{}); err != nil {
|
||||
// We have to return err itself here (not wrapped inside another error)
|
||||
// so that RetryOnConflict can identify it correctly.
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to update BackendTLSPolicy %s/%s status: %w", policy.Namespace, policy.Name, err)
|
||||
}
|
||||
|
||||
return nil
|
||||
|
@ -701,6 +742,32 @@ func (c *clientWrapper) ListEndpointSlicesForService(namespace, serviceName stri
|
|||
return c.factoriesKube[c.lookupNamespace(namespace)].Discovery().V1().EndpointSlices().Lister().EndpointSlices(namespace).List(serviceSelector)
|
||||
}
|
||||
|
||||
// ListBackendTLSPoliciesForService returns the BackendTLSPolicy for the given service name in the given namespace.
|
||||
func (c *clientWrapper) ListBackendTLSPoliciesForService(namespace, serviceName string) ([]*gatev1alpha3.BackendTLSPolicy, error) {
|
||||
if !c.isWatchedNamespace(namespace) {
|
||||
return nil, fmt.Errorf("failed to get BackendTLSPolicies for service %s/%s: namespace is not within watched namespaces", namespace, serviceName)
|
||||
}
|
||||
|
||||
policies, err := c.factoriesGateway[c.lookupNamespace(namespace)].Gateway().V1alpha3().BackendTLSPolicies().Lister().BackendTLSPolicies(namespace).List(labels.Everything())
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to list BackendTLSPolicies in namespace %s", namespace)
|
||||
}
|
||||
|
||||
var servicePolicies []*gatev1alpha3.BackendTLSPolicy
|
||||
for _, policy := range policies {
|
||||
for _, ref := range policy.Spec.TargetRefs {
|
||||
// The policy does not target the service.
|
||||
if ref.Group != groupCore || ref.Kind != kindService || string(ref.Name) != serviceName {
|
||||
continue
|
||||
}
|
||||
|
||||
servicePolicies = append(servicePolicies, policy)
|
||||
}
|
||||
}
|
||||
|
||||
return servicePolicies, nil
|
||||
}
|
||||
|
||||
// GetSecret returns the named secret from the given namespace.
|
||||
func (c *clientWrapper) GetSecret(namespace, name string) (*corev1.Secret, bool, error) {
|
||||
if !c.isWatchedNamespace(namespace) {
|
||||
|
@ -713,6 +780,18 @@ func (c *clientWrapper) GetSecret(namespace, name string) (*corev1.Secret, bool,
|
|||
return secret, exist, err
|
||||
}
|
||||
|
||||
// GetConfigMap returns the named configMap from the given namespace.
|
||||
func (c *clientWrapper) GetConfigMap(namespace, name string) (*corev1.ConfigMap, bool, error) {
|
||||
if !c.isWatchedNamespace(namespace) {
|
||||
return nil, false, fmt.Errorf("failed to get configMap %s/%s: namespace is not within watched namespaces", namespace, name)
|
||||
}
|
||||
|
||||
configMap, err := c.factoriesKube[c.lookupNamespace(namespace)].Core().V1().ConfigMaps().Lister().ConfigMaps(namespace).Get(name)
|
||||
exist, err := translateNotFoundError(err)
|
||||
|
||||
return configMap, exist, err
|
||||
}
|
||||
|
||||
// lookupNamespace returns the lookup namespace key for the given namespace.
|
||||
// When listening on all namespaces, it returns the client-go identifier ("")
|
||||
// for all-namespaces. Otherwise, it returns the given namespace.
|
||||
|
@ -761,6 +840,36 @@ func gatewayStatusEqual(statusA, statusB gatev1.GatewayStatus) bool {
|
|||
conditionsEqual(statusA.Conditions, statusB.Conditions)
|
||||
}
|
||||
|
||||
func policyAncestorStatusesEqual(policyAncestorStatusesA, policyAncestorStatusesB []gatev1alpha2.PolicyAncestorStatus) bool {
|
||||
if len(policyAncestorStatusesA) != len(policyAncestorStatusesB) {
|
||||
return false
|
||||
}
|
||||
|
||||
for _, sA := range policyAncestorStatusesA {
|
||||
if !slices.ContainsFunc(policyAncestorStatusesB, func(sB gatev1alpha2.PolicyAncestorStatus) bool {
|
||||
return policyAncestorStatusEqual(sB, sA)
|
||||
}) {
|
||||
return false
|
||||
}
|
||||
}
|
||||
|
||||
for _, sB := range policyAncestorStatusesB {
|
||||
if !slices.ContainsFunc(policyAncestorStatusesA, func(sA gatev1alpha2.PolicyAncestorStatus) bool {
|
||||
return policyAncestorStatusEqual(sA, sB)
|
||||
}) {
|
||||
return false
|
||||
}
|
||||
}
|
||||
|
||||
return true
|
||||
}
|
||||
|
||||
func policyAncestorStatusEqual(sA, sB gatev1alpha2.PolicyAncestorStatus) bool {
|
||||
return sA.ControllerName == sB.ControllerName &&
|
||||
reflect.DeepEqual(sA.AncestorRef, sB.AncestorRef) &&
|
||||
conditionsEqual(sA.Conditions, sB.Conditions)
|
||||
}
|
||||
|
||||
func routeParentStatusesEqual(routeParentStatusesA, routeParentStatusesB []gatev1alpha2.RouteParentStatus) bool {
|
||||
if len(routeParentStatusesA) != len(routeParentStatusesB) {
|
||||
return false
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue