[kubernetes] ignore empty endpoint changes
This commit is contained in:
parent
0624cefc10
commit
ab71dad51a
5 changed files with 707 additions and 54 deletions
|
@ -11,6 +11,7 @@ import (
|
|||
|
||||
"github.com/hashicorp/go-version"
|
||||
"github.com/traefik/traefik/v2/pkg/log"
|
||||
"github.com/traefik/traefik/v2/pkg/provider/kubernetes/k8s"
|
||||
traefikversion "github.com/traefik/traefik/v2/pkg/version"
|
||||
corev1 "k8s.io/api/core/v1"
|
||||
networkingv1 "k8s.io/api/networking/v1"
|
||||
|
@ -34,22 +35,6 @@ type marshaler interface {
|
|||
Marshal() ([]byte, error)
|
||||
}
|
||||
|
||||
type resourceEventHandler struct {
|
||||
ev chan<- interface{}
|
||||
}
|
||||
|
||||
func (reh *resourceEventHandler) OnAdd(obj interface{}) {
|
||||
eventHandlerFunc(reh.ev, obj)
|
||||
}
|
||||
|
||||
func (reh *resourceEventHandler) OnUpdate(oldObj, newObj interface{}) {
|
||||
eventHandlerFunc(reh.ev, newObj)
|
||||
}
|
||||
|
||||
func (reh *resourceEventHandler) OnDelete(obj interface{}) {
|
||||
eventHandlerFunc(reh.ev, obj)
|
||||
}
|
||||
|
||||
// Client is a client for the Provider master.
|
||||
// WatchAll starts the watch of the Provider resources and updates the stores.
|
||||
// The stores can then be accessed via the Get* functions.
|
||||
|
@ -151,7 +136,7 @@ func newClientImpl(clientset kubernetes.Interface) *clientWrapper {
|
|||
// WatchAll starts namespace-specific controllers for all relevant kinds.
|
||||
func (c *clientWrapper) WatchAll(namespaces []string, stopCh <-chan struct{}) (<-chan interface{}, error) {
|
||||
eventCh := make(chan interface{}, 1)
|
||||
eventHandler := &resourceEventHandler{eventCh}
|
||||
eventHandler := &k8s.ResourceEventHandler{Ev: eventCh}
|
||||
|
||||
if len(namespaces) == 0 {
|
||||
namespaces = []string{metav1.NamespaceAll}
|
||||
|
@ -508,16 +493,6 @@ func (c *clientWrapper) GetServerVersion() (*version.Version, error) {
|
|||
return version.NewVersion(serverVersion.GitVersion)
|
||||
}
|
||||
|
||||
// eventHandlerFunc will pass the obj on to the events channel or drop it.
|
||||
// This is so passing the events along won't block in the case of high volume.
|
||||
// The events are only used for signaling anyway so dropping a few is ok.
|
||||
func eventHandlerFunc(events chan<- interface{}, obj interface{}) {
|
||||
select {
|
||||
case events <- obj:
|
||||
default:
|
||||
}
|
||||
}
|
||||
|
||||
// translateNotFoundError will translate a "not found" error to a boolean return
|
||||
// value which indicates if the resource exists and a nil error.
|
||||
func translateNotFoundError(err error) (bool, error) {
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
package ingress
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"testing"
|
||||
"time"
|
||||
|
@ -190,6 +191,105 @@ func TestClientIgnoresHelmOwnedSecrets(t *testing.T) {
|
|||
assert.False(t, found)
|
||||
}
|
||||
|
||||
func TestClientIgnoresEmptyEndpointUpdates(t *testing.T) {
|
||||
emptyEndpoint := &corev1.Endpoints{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: "empty-endpoint",
|
||||
Namespace: "test",
|
||||
ResourceVersion: "1244",
|
||||
Annotations: map[string]string{
|
||||
"test-annotation": "_",
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
filledEndpoint := &corev1.Endpoints{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: "filled-endpoint",
|
||||
Namespace: "test",
|
||||
ResourceVersion: "1234",
|
||||
},
|
||||
Subsets: []corev1.EndpointSubset{{
|
||||
Addresses: []corev1.EndpointAddress{{
|
||||
IP: "10.13.37.1",
|
||||
}},
|
||||
Ports: []corev1.EndpointPort{{
|
||||
Name: "testing",
|
||||
Port: 1337,
|
||||
Protocol: "tcp",
|
||||
}},
|
||||
}},
|
||||
}
|
||||
|
||||
kubeClient := kubefake.NewSimpleClientset(emptyEndpoint, filledEndpoint)
|
||||
|
||||
discovery, _ := kubeClient.Discovery().(*fakediscovery.FakeDiscovery)
|
||||
discovery.FakedServerVersion = &version.Info{
|
||||
GitVersion: "v1.19",
|
||||
}
|
||||
|
||||
client := newClientImpl(kubeClient)
|
||||
|
||||
stopCh := make(chan struct{})
|
||||
|
||||
eventCh, err := client.WatchAll(nil, stopCh)
|
||||
require.NoError(t, err)
|
||||
|
||||
select {
|
||||
case event := <-eventCh:
|
||||
ep, ok := event.(*corev1.Endpoints)
|
||||
require.True(t, ok)
|
||||
|
||||
assert.True(t, ep.Name == "empty-endpoint" || ep.Name == "filled-endpoint")
|
||||
case <-time.After(50 * time.Millisecond):
|
||||
assert.Fail(t, "expected to receive event for endpoints")
|
||||
}
|
||||
|
||||
emptyEndpoint, err = kubeClient.CoreV1().Endpoints("test").Get(context.TODO(), "empty-endpoint", metav1.GetOptions{})
|
||||
assert.NoError(t, err)
|
||||
|
||||
// Update endpoint annotation and resource version (apparently not done by fake client itself)
|
||||
// to show an update that should not trigger an update event on our eventCh.
|
||||
// This reflects the behavior of kubernetes controllers which use endpoint annotations for leader election.
|
||||
emptyEndpoint.Annotations["test-annotation"] = "___"
|
||||
emptyEndpoint.ResourceVersion = "1245"
|
||||
_, err = kubeClient.CoreV1().Endpoints("test").Update(context.TODO(), emptyEndpoint, metav1.UpdateOptions{})
|
||||
require.NoError(t, err)
|
||||
|
||||
select {
|
||||
case event := <-eventCh:
|
||||
ep, ok := event.(*corev1.Endpoints)
|
||||
require.True(t, ok)
|
||||
|
||||
assert.Fail(t, "didn't expect to receive event for empty endpoint update", ep.Name)
|
||||
case <-time.After(50 * time.Millisecond):
|
||||
}
|
||||
|
||||
filledEndpoint, err = kubeClient.CoreV1().Endpoints("test").Get(context.TODO(), "filled-endpoint", metav1.GetOptions{})
|
||||
assert.NoError(t, err)
|
||||
|
||||
filledEndpoint.Subsets[0].Addresses[0].IP = "10.13.37.2"
|
||||
filledEndpoint.ResourceVersion = "1235"
|
||||
_, err = kubeClient.CoreV1().Endpoints("test").Update(context.TODO(), filledEndpoint, metav1.UpdateOptions{})
|
||||
require.NoError(t, err)
|
||||
|
||||
select {
|
||||
case event := <-eventCh:
|
||||
ep, ok := event.(*corev1.Endpoints)
|
||||
require.True(t, ok)
|
||||
|
||||
assert.Equal(t, "filled-endpoint", ep.Name)
|
||||
case <-time.After(50 * time.Millisecond):
|
||||
assert.Fail(t, "expected to receive event for filled endpoint")
|
||||
}
|
||||
|
||||
select {
|
||||
case <-eventCh:
|
||||
assert.Fail(t, "received more than one event")
|
||||
case <-time.After(50 * time.Millisecond):
|
||||
}
|
||||
}
|
||||
|
||||
func TestClientUsesCorrectServerVersion(t *testing.T) {
|
||||
ingressV1Beta := &v1beta1.Ingress{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue