1
0
Fork 0

Fix route attachments to gateways

Co-authored-by: Romain <rtribotte@users.noreply.github.com>
This commit is contained in:
Kevin Pollet 2024-05-28 14:30:04 +02:00 committed by GitHub
parent 6e61fe0de1
commit e9bd2b45ac
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
12 changed files with 2243 additions and 2498 deletions

View file

@ -32,11 +32,11 @@ type resourceEventHandler struct {
ev chan<- interface{}
}
func (reh *resourceEventHandler) OnAdd(obj interface{}, isInInitialList bool) {
func (reh *resourceEventHandler) OnAdd(obj interface{}, _ bool) {
eventHandlerFunc(reh.ev, obj)
}
func (reh *resourceEventHandler) OnUpdate(oldObj, newObj interface{}) {
func (reh *resourceEventHandler) OnUpdate(_, newObj interface{}) {
eventHandlerFunc(reh.ev, newObj)
}
@ -49,19 +49,21 @@ func (reh *resourceEventHandler) OnDelete(obj interface{}) {
// The stores can then be accessed via the Get* functions.
type Client interface {
WatchAll(namespaces []string, stopCh <-chan struct{}) (<-chan interface{}, error)
GetGatewayClasses() ([]*gatev1.GatewayClass, error)
UpdateGatewayStatus(gateway *gatev1.Gateway, gatewayStatus gatev1.GatewayStatus) error
UpdateGatewayClassStatus(gatewayClass *gatev1.GatewayClass, condition metav1.Condition) error
UpdateHTTPRouteStatus(ctx context.Context, gateway *gatev1.Gateway, nsName ktypes.NamespacedName, status gatev1.HTTPRouteStatus) error
GetGateways() []*gatev1.Gateway
GetHTTPRoutes(namespaces []string) ([]*gatev1.HTTPRoute, error)
GetTCPRoutes(namespaces []string) ([]*gatev1alpha2.TCPRoute, error)
GetTLSRoutes(namespaces []string) ([]*gatev1alpha2.TLSRoute, error)
GetReferenceGrants(namespace string) ([]*gatev1beta1.ReferenceGrant, error)
UpdateHTTPRouteStatus(ctx context.Context, route ktypes.NamespacedName, status gatev1.HTTPRouteStatus) error
UpdateTCPRouteStatus(ctx context.Context, route ktypes.NamespacedName, status gatev1alpha2.TCPRouteStatus) error
UpdateTLSRouteStatus(ctx context.Context, route ktypes.NamespacedName, status gatev1alpha2.TLSRouteStatus) error
ListGatewayClasses() ([]*gatev1.GatewayClass, error)
ListGateways() []*gatev1.Gateway
ListHTTPRoutes() ([]*gatev1.HTTPRoute, error)
ListTCPRoutes() ([]*gatev1alpha2.TCPRoute, error)
ListTLSRoutes() ([]*gatev1alpha2.TLSRoute, error)
ListNamespaces(selector labels.Selector) ([]string, error)
ListReferenceGrants(namespace string) ([]*gatev1beta1.ReferenceGrant, error)
GetService(namespace, name string) (*corev1.Service, bool, error)
GetSecret(namespace, name string) (*corev1.Secret, bool, error)
GetEndpoints(namespace, name string) (*corev1.Endpoints, bool, error)
GetNamespaces(selector labels.Selector) ([]string, error)
}
type clientWrapper struct {
@ -280,7 +282,7 @@ func (c *clientWrapper) WatchAll(namespaces []string, stopCh <-chan struct{}) (<
return eventCh, nil
}
func (c *clientWrapper) GetNamespaces(selector labels.Selector) ([]string, error) {
func (c *clientWrapper) ListNamespaces(selector labels.Selector) ([]string, error) {
ns, err := c.factoryNamespace.Core().V1().Namespaces().Lister().List(selector)
if err != nil {
return nil, err
@ -297,22 +299,12 @@ func (c *clientWrapper) GetNamespaces(selector labels.Selector) ([]string, error
return namespaces, nil
}
func (c *clientWrapper) GetHTTPRoutes(namespaces []string) ([]*gatev1.HTTPRoute, error) {
func (c *clientWrapper) ListHTTPRoutes() ([]*gatev1.HTTPRoute, error) {
var httpRoutes []*gatev1.HTTPRoute
for _, namespace := range namespaces {
if !c.isWatchedNamespace(namespace) {
log.Warn().Msgf("Failed to get HTTPRoutes: %q is not within watched namespaces", namespace)
continue
}
for _, namespace := range c.watchedNamespaces {
routes, err := c.factoriesGateway[c.lookupNamespace(namespace)].Gateway().V1().HTTPRoutes().Lister().HTTPRoutes(namespace).List(labels.Everything())
if err != nil {
return nil, err
}
if len(routes) == 0 {
log.Debug().Msgf("No HTTPRoutes found in namespace %q", namespace)
continue
return nil, fmt.Errorf("listing HTTP routes in namespace %s", namespace)
}
httpRoutes = append(httpRoutes, routes...)
@ -321,53 +313,35 @@ func (c *clientWrapper) GetHTTPRoutes(namespaces []string) ([]*gatev1.HTTPRoute,
return httpRoutes, nil
}
func (c *clientWrapper) GetTCPRoutes(namespaces []string) ([]*gatev1alpha2.TCPRoute, error) {
func (c *clientWrapper) ListTCPRoutes() ([]*gatev1alpha2.TCPRoute, error) {
var tcpRoutes []*gatev1alpha2.TCPRoute
for _, namespace := range namespaces {
if !c.isWatchedNamespace(namespace) {
log.Warn().Msgf("Failed to get TCPRoutes: %q is not within watched namespaces", namespace)
continue
}
for _, namespace := range c.watchedNamespaces {
routes, err := c.factoriesGateway[c.lookupNamespace(namespace)].Gateway().V1alpha2().TCPRoutes().Lister().TCPRoutes(namespace).List(labels.Everything())
if err != nil {
return nil, err
}
if len(routes) == 0 {
log.Debug().Msgf("No TCPRoutes found in namespace %q", namespace)
continue
return nil, fmt.Errorf("listing TCP routes in namespace %s", namespace)
}
tcpRoutes = append(tcpRoutes, routes...)
}
return tcpRoutes, nil
}
func (c *clientWrapper) GetTLSRoutes(namespaces []string) ([]*gatev1alpha2.TLSRoute, error) {
func (c *clientWrapper) ListTLSRoutes() ([]*gatev1alpha2.TLSRoute, error) {
var tlsRoutes []*gatev1alpha2.TLSRoute
for _, namespace := range namespaces {
if !c.isWatchedNamespace(namespace) {
log.Warn().Msgf("Failed to get TLSRoutes: %q is not within watched namespaces", namespace)
continue
}
for _, namespace := range c.watchedNamespaces {
routes, err := c.factoriesGateway[c.lookupNamespace(namespace)].Gateway().V1alpha2().TLSRoutes().Lister().TLSRoutes(namespace).List(labels.Everything())
if err != nil {
return nil, err
}
if len(routes) == 0 {
log.Debug().Msgf("No TLSRoutes found in namespace %q", namespace)
continue
return nil, fmt.Errorf("listing TLS routes in namespace %s", namespace)
}
tlsRoutes = append(tlsRoutes, routes...)
}
return tlsRoutes, nil
}
func (c *clientWrapper) GetReferenceGrants(namespace string) ([]*gatev1beta1.ReferenceGrant, error) {
func (c *clientWrapper) ListReferenceGrants(namespace string) ([]*gatev1beta1.ReferenceGrant, error) {
if !c.isWatchedNamespace(namespace) {
log.Warn().Msgf("Failed to get ReferenceGrants: %q is not within watched namespaces", namespace)
@ -382,7 +356,7 @@ func (c *clientWrapper) GetReferenceGrants(namespace string) ([]*gatev1beta1.Ref
return referenceGrants, nil
}
func (c *clientWrapper) GetGateways() []*gatev1.Gateway {
func (c *clientWrapper) ListGateways() []*gatev1.Gateway {
var result []*gatev1.Gateway
for ns, factory := range c.factoriesGateway {
@ -397,7 +371,7 @@ func (c *clientWrapper) GetGateways() []*gatev1.Gateway {
return result
}
func (c *clientWrapper) GetGatewayClasses() ([]*gatev1.GatewayClass, error) {
func (c *clientWrapper) ListGatewayClasses() ([]*gatev1.GatewayClass, error) {
return c.factoryGatewayClass.Gateway().V1().GatewayClasses().Lister().List(labels.Everything())
}
@ -437,7 +411,7 @@ func (c *clientWrapper) UpdateGatewayStatus(gateway *gatev1.Gateway, gatewayStat
return fmt.Errorf("cannot update Gateway status %s/%s: namespace is not within watched namespaces", gateway.Namespace, gateway.Name)
}
if statusEquals(gateway.Status, gatewayStatus) {
if gatewayStatusEquals(gateway.Status, gatewayStatus) {
return nil
}
@ -455,89 +429,106 @@ func (c *clientWrapper) UpdateGatewayStatus(gateway *gatev1.Gateway, gatewayStat
return nil
}
func (c *clientWrapper) UpdateHTTPRouteStatus(ctx context.Context, gateway *gatev1.Gateway, nsName ktypes.NamespacedName, status gatev1.HTTPRouteStatus) error {
if !c.isWatchedNamespace(nsName.Namespace) {
return fmt.Errorf("updating HTTPRoute status %s/%s: namespace is not within watched namespaces", nsName.Namespace, nsName.Name)
func (c *clientWrapper) UpdateHTTPRouteStatus(ctx context.Context, route ktypes.NamespacedName, status gatev1.HTTPRouteStatus) error {
if !c.isWatchedNamespace(route.Namespace) {
return fmt.Errorf("updating HTTPRoute status %s/%s: namespace is not within watched namespaces", route.Namespace, route.Name)
}
route, err := c.factoriesGateway[c.lookupNamespace(nsName.Namespace)].Gateway().V1().HTTPRoutes().Lister().HTTPRoutes(nsName.Namespace).Get(nsName.Name)
currentRoute, err := c.factoriesGateway[c.lookupNamespace(route.Namespace)].Gateway().V1().HTTPRoutes().Lister().HTTPRoutes(route.Namespace).Get(route.Name)
if err != nil {
return fmt.Errorf("getting HTTPRoute %s/%s: %w", nsName.Namespace, nsName.Name, err)
return fmt.Errorf("getting HTTPRoute %s/%s: %w", route.Namespace, route.Name, err)
}
var statuses []gatev1.RouteParentStatus
for _, status := range route.Status.Parents {
if status.ControllerName != controllerName {
statuses = append(statuses, status)
continue
}
if status.ParentRef.Namespace != nil && string(*status.ParentRef.Namespace) != gateway.Namespace {
statuses = append(statuses, status)
continue
}
if string(status.ParentRef.Name) != gateway.Name {
statuses = append(statuses, status)
// TODO: keep statuses for gateways managed by other Traefik instances.
var parentStatuses []gatev1.RouteParentStatus
for _, currentParentStatus := range currentRoute.Status.Parents {
if currentParentStatus.ControllerName != controllerName {
parentStatuses = append(parentStatuses, currentParentStatus)
continue
}
}
statuses = append(statuses, status.Parents...)
route = route.DeepCopy()
route.Status = gatev1.HTTPRouteStatus{
parentStatuses = append(parentStatuses, status.Parents...)
currentRoute = currentRoute.DeepCopy()
currentRoute.Status = gatev1.HTTPRouteStatus{
RouteStatus: gatev1.RouteStatus{
Parents: statuses,
Parents: parentStatuses,
},
}
if _, err := c.csGateway.GatewayV1().HTTPRoutes(nsName.Namespace).UpdateStatus(ctx, route, metav1.UpdateOptions{}); err != nil {
return fmt.Errorf("updating HTTPRoute %s/%s status: %w", nsName.Namespace, nsName.Name, err)
if _, err := c.csGateway.GatewayV1().HTTPRoutes(route.Namespace).UpdateStatus(ctx, currentRoute, metav1.UpdateOptions{}); err != nil {
return fmt.Errorf("updating HTTPRoute %s/%s status: %w", route.Namespace, route.Name, err)
}
return nil
}
func statusEquals(oldStatus, newStatus gatev1.GatewayStatus) bool {
if len(oldStatus.Listeners) != len(newStatus.Listeners) {
return false
func (c *clientWrapper) UpdateTCPRouteStatus(ctx context.Context, route ktypes.NamespacedName, status gatev1alpha2.TCPRouteStatus) error {
if !c.isWatchedNamespace(route.Namespace) {
return fmt.Errorf("updating TCPRoute status %s/%s: namespace is not within watched namespaces", route.Namespace, route.Name)
}
if !conditionsEquals(oldStatus.Conditions, newStatus.Conditions) {
return false
currentRoute, err := c.factoriesGateway[c.lookupNamespace(route.Namespace)].Gateway().V1alpha2().TCPRoutes().Lister().TCPRoutes(route.Namespace).Get(route.Name)
if err != nil {
return fmt.Errorf("getting TCPRoute %s/%s: %w", route.Namespace, route.Name, err)
}
listenerMatches := 0
for _, newListener := range newStatus.Listeners {
for _, oldListener := range oldStatus.Listeners {
if newListener.Name == oldListener.Name {
if !conditionsEquals(newListener.Conditions, oldListener.Conditions) {
return false
}
listenerMatches++
}
// TODO: keep statuses for gateways managed by other Traefik instances.
var parentStatuses []gatev1alpha2.RouteParentStatus
for _, currentParentStatus := range currentRoute.Status.Parents {
if currentParentStatus.ControllerName != controllerName {
parentStatuses = append(parentStatuses, currentParentStatus)
continue
}
}
return listenerMatches == len(oldStatus.Listeners)
parentStatuses = append(parentStatuses, status.Parents...)
currentRoute = currentRoute.DeepCopy()
currentRoute.Status = gatev1alpha2.TCPRouteStatus{
RouteStatus: gatev1.RouteStatus{
Parents: parentStatuses,
},
}
if _, err := c.csGateway.GatewayV1alpha2().TCPRoutes(route.Namespace).UpdateStatus(ctx, currentRoute, metav1.UpdateOptions{}); err != nil {
return fmt.Errorf("updating TCPRoute %s/%s status: %w", route.Namespace, route.Name, err)
}
return nil
}
func conditionsEquals(conditionsA, conditionsB []metav1.Condition) bool {
if len(conditionsA) != len(conditionsB) {
return false
func (c *clientWrapper) UpdateTLSRouteStatus(ctx context.Context, route ktypes.NamespacedName, status gatev1alpha2.TLSRouteStatus) error {
if !c.isWatchedNamespace(route.Namespace) {
return fmt.Errorf("updating TLSRoute status %s/%s: namespace is not within watched namespaces", route.Namespace, route.Name)
}
conditionMatches := 0
for _, conditionA := range conditionsA {
for _, conditionB := range conditionsB {
if conditionA.Type == conditionB.Type {
if conditionA.Reason != conditionB.Reason || conditionA.Status != conditionB.Status || conditionA.Message != conditionB.Message || conditionA.ObservedGeneration != conditionB.ObservedGeneration {
return false
}
conditionMatches++
}
currentRoute, err := c.factoriesGateway[c.lookupNamespace(route.Namespace)].Gateway().V1alpha2().TLSRoutes().Lister().TLSRoutes(route.Namespace).Get(route.Name)
if err != nil {
return fmt.Errorf("getting TLSRoute %s/%s: %w", route.Namespace, route.Name, err)
}
// TODO: keep statuses for gateways managed by other Traefik instances.
var parentStatuses []gatev1alpha2.RouteParentStatus
for _, currentParentStatus := range currentRoute.Status.Parents {
if currentParentStatus.ControllerName != controllerName {
parentStatuses = append(parentStatuses, currentParentStatus)
continue
}
}
return conditionMatches == len(conditionsA)
parentStatuses = append(parentStatuses, status.Parents...)
currentRoute = currentRoute.DeepCopy()
currentRoute.Status = gatev1alpha2.TLSRouteStatus{
RouteStatus: gatev1.RouteStatus{
Parents: parentStatuses,
},
}
if _, err := c.csGateway.GatewayV1alpha2().TLSRoutes(route.Namespace).UpdateStatus(ctx, currentRoute, metav1.UpdateOptions{}); err != nil {
return fmt.Errorf("updating TLSRoute %s/%s status: %w", route.Namespace, route.Name, err)
}
return nil
}
// GetService returns the named service from the given namespace.
@ -582,11 +573,21 @@ func (c *clientWrapper) GetSecret(namespace, name string) (*corev1.Secret, bool,
// The distinction is necessary because we index all informers on the special
// identifier iff all-namespaces are requested but receive specific namespace
// identifiers from the Kubernetes API, so we have to bridge this gap.
func (c *clientWrapper) lookupNamespace(ns string) string {
func (c *clientWrapper) lookupNamespace(namespace string) string {
if c.isNamespaceAll {
return metav1.NamespaceAll
}
return ns
return namespace
}
// isWatchedNamespace checks to ensure that the namespace is being watched before we request
// it to ensure we don't panic by requesting an out-of-watch object.
func (c *clientWrapper) isWatchedNamespace(namespace string) bool {
if c.isNamespaceAll {
return true
}
return slices.Contains(c.watchedNamespaces, namespace)
}
// eventHandlerFunc will pass the obj on to the events channel or drop it.
@ -608,12 +609,51 @@ func translateNotFoundError(err error) (bool, error) {
return err == nil, err
}
// isWatchedNamespace checks to ensure that the namespace is being watched before we request
// it to ensure we don't panic by requesting an out-of-watch object.
func (c *clientWrapper) isWatchedNamespace(ns string) bool {
if c.isNamespaceAll {
return true
func gatewayStatusEquals(statusA, statusB gatev1.GatewayStatus) bool {
if len(statusA.Listeners) != len(statusB.Listeners) {
return false
}
return slices.Contains(c.watchedNamespaces, ns)
if !conditionsEquals(statusA.Conditions, statusB.Conditions) {
return false
}
listenerMatches := 0
for _, newListener := range statusB.Listeners {
for _, oldListener := range statusA.Listeners {
if newListener.Name == oldListener.Name {
if !conditionsEquals(newListener.Conditions, oldListener.Conditions) {
return false
}
if newListener.AttachedRoutes != oldListener.AttachedRoutes {
return false
}
listenerMatches++
}
}
}
return listenerMatches == len(statusA.Listeners)
}
func conditionsEquals(conditionsA, conditionsB []metav1.Condition) bool {
if len(conditionsA) != len(conditionsB) {
return false
}
conditionMatches := 0
for _, conditionA := range conditionsA {
for _, conditionB := range conditionsB {
if conditionA.Type == conditionB.Type {
if conditionA.Reason != conditionB.Reason || conditionA.Status != conditionB.Status || conditionA.Message != conditionB.Message || conditionA.ObservedGeneration != conditionB.ObservedGeneration {
return false
}
conditionMatches++
}
}
}
return conditionMatches == len(conditionsA)
}