1
0
Fork 0

Bump kubernetes/client-go

This commit is contained in:
Kim Min 2018-02-14 16:56:04 +08:00 committed by Traefiker Bot
parent 029fa83690
commit 83a92596c3
901 changed files with 169303 additions and 306433 deletions

View file

@ -20,9 +20,10 @@ import (
"sync"
"time"
"k8s.io/client-go/pkg/runtime"
utilruntime "k8s.io/client-go/pkg/util/runtime"
"k8s.io/client-go/pkg/util/wait"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/clock"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
)
// Config contains all the settings for a Controller.
@ -50,6 +51,11 @@ type Config struct {
// queue.
FullResyncPeriod time.Duration
// ShouldResync, if specified, is invoked when the controller's reflector determines the next
// periodic sync should occur. If this returns true, it means the reflector should proceed with
// the resync.
ShouldResync ShouldResyncFunc
// If true, when Process() returns an error, re-enqueue the object.
// TODO: add interface to let you inject a delay/backoff or drop
// the object completely if desired. Pass the object in
@ -57,26 +63,33 @@ type Config struct {
RetryOnError bool
}
// ShouldResyncFunc is a type of function that indicates if a reflector should perform a
// resync or not. It can be used by a shared informer to support multiple event handlers with custom
// resync periods.
type ShouldResyncFunc func() bool
// ProcessFunc processes a single object.
type ProcessFunc func(obj interface{}) error
// Controller is a generic controller framework.
type Controller struct {
type controller struct {
config Config
reflector *Reflector
reflectorMutex sync.RWMutex
clock clock.Clock
}
// TODO make the "Controller" private, and convert all references to use ControllerInterface instead
type ControllerInterface interface {
type Controller interface {
Run(stopCh <-chan struct{})
HasSynced() bool
LastSyncResourceVersion() string
}
// New makes a new Controller from the given Config.
func New(c *Config) *Controller {
ctlr := &Controller{
func New(c *Config) Controller {
ctlr := &controller{
config: *c,
clock: &clock.RealClock{},
}
return ctlr
}
@ -84,37 +97,43 @@ func New(c *Config) *Controller {
// Run begins processing items, and will continue until a value is sent down stopCh.
// It's an error to call Run more than once.
// Run blocks; call via go.
func (c *Controller) Run(stopCh <-chan struct{}) {
func (c *controller) Run(stopCh <-chan struct{}) {
defer utilruntime.HandleCrash()
go func() {
<-stopCh
c.config.Queue.Close()
}()
r := NewReflector(
c.config.ListerWatcher,
c.config.ObjectType,
c.config.Queue,
c.config.FullResyncPeriod,
)
r.ShouldResync = c.config.ShouldResync
r.clock = c.clock
c.reflectorMutex.Lock()
c.reflector = r
c.reflectorMutex.Unlock()
r.RunUntil(stopCh)
var wg wait.Group
defer wg.Wait()
wg.StartWithChannel(stopCh, r.Run)
wait.Until(c.processLoop, time.Second, stopCh)
}
// Returns true once this controller has completed an initial resource listing
func (c *Controller) HasSynced() bool {
func (c *controller) HasSynced() bool {
return c.config.Queue.HasSynced()
}
// Requeue adds the provided object back into the queue if it does not already exist.
func (c *Controller) Requeue(obj interface{}) error {
return c.config.Queue.AddIfNotPresent(Deltas{
Delta{
Type: Sync,
Object: obj,
},
})
func (c *controller) LastSyncResourceVersion() string {
if c.reflector == nil {
return ""
}
return c.reflector.LastSyncResourceVersion()
}
// processLoop drains the work queue.
@ -126,10 +145,13 @@ func (c *Controller) Requeue(obj interface{}) error {
// actually exit when the controller is stopped. Or just give up on this stuff
// ever being stoppable. Converting this whole package to use Context would
// also be helpful.
func (c *Controller) processLoop() {
func (c *controller) processLoop() {
for {
obj, err := c.config.Queue.Pop(PopProcessFunc(c.config.Process))
if err != nil {
if err == FIFOClosedError {
return
}
if c.config.RetryOnError {
// This is the safe way to re-enqueue.
c.config.Queue.AddIfNotPresent(obj)
@ -188,6 +210,47 @@ func (r ResourceEventHandlerFuncs) OnDelete(obj interface{}) {
}
}
// FilteringResourceEventHandler applies the provided filter to all events coming
// in, ensuring the appropriate nested handler method is invoked. An object
// that starts passing the filter after an update is considered an add, and an
// object that stops passing the filter after an update is considered a delete.
type FilteringResourceEventHandler struct {
FilterFunc func(obj interface{}) bool
Handler ResourceEventHandler
}
// OnAdd calls the nested handler only if the filter succeeds
func (r FilteringResourceEventHandler) OnAdd(obj interface{}) {
if !r.FilterFunc(obj) {
return
}
r.Handler.OnAdd(obj)
}
// OnUpdate ensures the proper handler is called depending on whether the filter matches
func (r FilteringResourceEventHandler) OnUpdate(oldObj, newObj interface{}) {
newer := r.FilterFunc(newObj)
older := r.FilterFunc(oldObj)
switch {
case newer && older:
r.Handler.OnUpdate(oldObj, newObj)
case newer && !older:
r.Handler.OnAdd(newObj)
case !newer && older:
r.Handler.OnDelete(oldObj)
default:
// do nothing
}
}
// OnDelete calls the nested handler only if the filter succeeds
func (r FilteringResourceEventHandler) OnDelete(obj interface{}) {
if !r.FilterFunc(obj) {
return
}
r.Handler.OnDelete(obj)
}
// DeletionHandlingMetaNamespaceKeyFunc checks for
// DeletedFinalStateUnknown objects before calling
// MetaNamespaceKeyFunc.
@ -218,7 +281,7 @@ func NewInformer(
objType runtime.Object,
resyncPeriod time.Duration,
h ResourceEventHandler,
) (Store, *Controller) {
) (Store, Controller) {
// This will hold the client state, as we know it.
clientState := NewStore(DeletionHandlingMetaNamespaceKeyFunc)
@ -277,6 +340,7 @@ func NewInformer(
// long as possible (until the upstream source closes the watch or times out,
// or you stop the controller).
// * h is the object you want notifications sent to.
// * indexers is the indexer for the received object type.
//
func NewIndexerInformer(
lw ListerWatcher,
@ -284,7 +348,7 @@ func NewIndexerInformer(
resyncPeriod time.Duration,
h ResourceEventHandler,
indexers Indexers,
) (Indexer, *Controller) {
) (Indexer, Controller) {
// This will hold the client state, as we know it.
clientState := NewIndexer(DeletionHandlingMetaNamespaceKeyFunc, indexers)