Introduce k8s informer factory

This commit is contained in:
Kim Min 2018-03-22 17:14:04 +08:00 committed by Traefiker Bot
parent 007a1fc7f2
commit f94fa78565
214 changed files with 16048 additions and 114 deletions

View file

@ -4,17 +4,14 @@ import (
"errors"
"fmt"
"io/ioutil"
"sync"
"time"
"github.com/containous/traefik/safe"
"github.com/containous/traefik/log"
corev1 "k8s.io/api/core/v1"
extensionsv1beta1 "k8s.io/api/extensions/v1beta1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache"
@ -22,13 +19,6 @@ import (
const resyncPeriod = 10 * time.Minute
const (
kindIngresses = "ingresses"
kindServices = "services"
kindEndpoints = "endpoints"
kindSecrets = "secrets"
)
type resourceEventHandler struct {
ev chan<- interface{}
}
@ -45,18 +35,6 @@ func (reh *resourceEventHandler) OnDelete(obj interface{}) {
eventHandlerFunc(reh.ev, obj)
}
type informerManager struct {
informers []cache.SharedInformer
syncFuncs []cache.InformerSynced
}
func (im *informerManager) extend(informer cache.SharedInformer, withSyncFunc bool) {
im.informers = append(im.informers, informer)
if withSyncFunc {
im.syncFuncs = append(im.syncFuncs, informer.HasSynced)
}
}
// Client is a client for the Provider master.
// WatchAll starts the watch of the Provider resources and updates the stores.
// The stores can then be accessed via the Get* functions.
@ -70,20 +48,14 @@ type Client interface {
type clientImpl struct {
clientset *kubernetes.Clientset
ingStores []cache.Store
svcStores map[string]cache.Store
epStores map[string]cache.Store
secStores map[string]cache.Store
factories map[string]informers.SharedInformerFactory
isNamespaceAll bool
}
func newClientImpl(clientset *kubernetes.Clientset) Client {
return &clientImpl{
clientset: clientset,
ingStores: []cache.Store{},
svcStores: map[string]cache.Store{},
epStores: map[string]cache.Store{},
secStores: map[string]cache.Store{},
factories: make(map[string]informers.SharedInformerFactory),
}
}
@ -140,7 +112,7 @@ func createClientFromConfig(c *rest.Config) (Client, error) {
func (c *clientImpl) WatchAll(namespaces Namespaces, labelSelector string, stopCh <-chan struct{}) (<-chan interface{}, error) {
eventCh := make(chan interface{}, 1)
kubeLabelSelector, err := labels.Parse(labelSelector)
_, err := labels.Parse(labelSelector)
if err != nil {
return nil, err
}
@ -150,129 +122,83 @@ func (c *clientImpl) WatchAll(namespaces Namespaces, labelSelector string, stopC
c.isNamespaceAll = true
}
var informManager informerManager
eventHandler := newResourceEventHandler(eventCh)
for _, ns := range namespaces {
ns := ns
informManager.extend(c.WatchIngresses(ns, kubeLabelSelector, eventCh), true)
informManager.extend(c.WatchObjects(ns, kindServices, &corev1.Service{}, c.svcStores, eventCh), true)
informManager.extend(c.WatchObjects(ns, kindEndpoints, &corev1.Endpoints{}, c.epStores, eventCh), true)
// Do not wait for the Secrets store to get synced since we cannot rely on
// users having granted RBAC permissions for this object.
// https://github.com/containous/traefik/issues/1784 should improve the
// situation here in the future.
informManager.extend(c.WatchObjects(ns, kindSecrets, &corev1.Secret{}, c.secStores, eventCh), false)
}
var wg sync.WaitGroup
for _, informer := range informManager.informers {
informer := informer
safe.Go(func() {
wg.Add(1)
informer.Run(stopCh)
wg.Done()
factory := informers.NewFilteredSharedInformerFactory(c.clientset, resyncPeriod, ns, func(opts *metav1.ListOptions) {
opts.LabelSelector = labelSelector
})
factory.Extensions().V1beta1().Ingresses().Informer().AddEventHandler(eventHandler)
factory.Core().V1().Services().Informer().AddEventHandler(eventHandler)
factory.Core().V1().Endpoints().Informer().AddEventHandler(eventHandler)
c.factories[ns] = factory
}
if !cache.WaitForCacheSync(stopCh, informManager.syncFuncs...) {
return nil, fmt.Errorf("timed out waiting for controller caches to sync")
for _, ns := range namespaces {
c.factories[ns].Start(stopCh)
}
safe.Go(func() {
<-stopCh
wg.Wait()
close(eventCh)
})
for _, ns := range namespaces {
for t, ok := range c.factories[ns].WaitForCacheSync(stopCh) {
if !ok {
return nil, fmt.Errorf("timed out waiting for controller caches to sync %s in namespace %q", t.String(), ns)
}
}
}
// Do not wait for the Secrets store to get synced since we cannot rely on
// users having granted RBAC permissions for this object.
// https://github.com/containous/traefik/issues/1784 should improve the
// situation here in the future.
for _, ns := range namespaces {
c.factories[ns].Core().V1().Secrets().Informer().AddEventHandler(eventHandler)
c.factories[ns].Start(stopCh)
}
return eventCh, nil
}
// WatchIngresses sets up a watch on Ingress objects and returns a corresponding shared informer.
func (c *clientImpl) WatchIngresses(namespace string, labelSelector labels.Selector, watchCh chan<- interface{}) cache.SharedInformer {
listOptions := metav1.ListOptions{
LabelSelector: labelSelector.String(),
FieldSelector: fields.Everything().String(),
}
informer := loadInformer(&cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
return c.clientset.ExtensionsV1beta1().Ingresses(namespace).List(listOptions)
},
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
return c.clientset.ExtensionsV1beta1().Ingresses(namespace).Watch(listOptions)
},
}, &extensionsv1beta1.Ingress{}, watchCh)
c.ingStores = append(c.ingStores, informer.GetStore())
return informer
}
// WatchObjects sets up a watch on objects and returns a corresponding shared informer.
func (c *clientImpl) WatchObjects(namespace, kind string, object runtime.Object, storeMap map[string]cache.Store, watchCh chan<- interface{}) cache.SharedInformer {
listWatch := cache.NewListWatchFromClient(
c.clientset.CoreV1().RESTClient(),
kind,
namespace,
fields.Everything())
informer := loadInformer(listWatch, object, watchCh)
storeMap[namespace] = informer.GetStore()
return informer
}
func loadInformer(listWatch cache.ListerWatcher, object runtime.Object, watchCh chan<- interface{}) cache.SharedInformer {
informer := cache.NewSharedInformer(
listWatch,
object,
resyncPeriod,
)
informer.AddEventHandler(newResourceEventHandler(watchCh))
return informer
}
// GetIngresses returns all Ingresses for observed namespaces in the cluster.
func (c *clientImpl) GetIngresses() []*extensionsv1beta1.Ingress {
var result []*extensionsv1beta1.Ingress
for _, store := range c.ingStores {
for _, obj := range store.List() {
ing := obj.(*extensionsv1beta1.Ingress)
for ns, factory := range c.factories {
ings, err := factory.Extensions().V1beta1().Ingresses().Lister().List(labels.Everything())
if err != nil {
log.Errorf("Failed to list ingresses in namespace %s: %s", ns, err)
}
for _, ing := range ings {
result = append(result, ing)
}
}
return result
}
// GetService returns the named service from the given namespace.
func (c *clientImpl) GetService(namespace, name string) (*corev1.Service, bool, error) {
var service *corev1.Service
item, exists, err := c.svcStores[c.lookupNamespace(namespace)].GetByKey(namespace + "/" + name)
item, exists, err := c.factories[c.lookupNamespace(namespace)].Core().V1().Services().Informer().GetStore().GetByKey(namespace + "/" + name)
if item != nil {
service = item.(*corev1.Service)
}
return service, exists, err
}
// GetEndpoints returns the named endpoints from the given namespace.
func (c *clientImpl) GetEndpoints(namespace, name string) (*corev1.Endpoints, bool, error) {
var endpoint *corev1.Endpoints
item, exists, err := c.epStores[c.lookupNamespace(namespace)].GetByKey(namespace + "/" + name)
item, exists, err := c.factories[c.lookupNamespace(namespace)].Core().V1().Endpoints().Informer().GetStore().GetByKey(namespace + "/" + name)
if item != nil {
endpoint = item.(*corev1.Endpoints)
}
return endpoint, exists, err
}
// GetSecret returns the named secret from the given namespace.
func (c *clientImpl) GetSecret(namespace, name string) (*corev1.Secret, bool, error) {
var secret *corev1.Secret
item, exists, err := c.secStores[c.lookupNamespace(namespace)].GetByKey(namespace + "/" + name)
item, exists, err := c.factories[c.lookupNamespace(namespace)].Core().V1().Secrets().Informer().GetStore().GetByKey(namespace + "/" + name)
if err == nil && item != nil {
secret = item.(*corev1.Secret)
}
return secret, exists, err
}