Custom resource definition
Co-authored-by: Mathieu Lonjaret <mathieu.lonjaret@gmail.com>
parent cfaf47c8a2
commit 4c060a78cc
1348 changed files with 92364 additions and 55766 deletions
vendor/k8s.io/client-go/tools/cache/controller.go | 4 (generated, vendored)
@@ -288,7 +288,7 @@ func NewInformer(
 	// This will hold incoming changes. Note how we pass clientState in as a
 	// KeyLister, that way resync operations will result in the correct set
 	// of update/delete deltas.
-	fifo := NewDeltaFIFO(MetaNamespaceKeyFunc, nil, clientState)
+	fifo := NewDeltaFIFO(MetaNamespaceKeyFunc, clientState)

 	cfg := &Config{
 		Queue: fifo,
@@ -355,7 +355,7 @@ func NewIndexerInformer(
 	// This will hold incoming changes. Note how we pass clientState in as a
 	// KeyLister, that way resync operations will result in the correct set
 	// of update/delete deltas.
-	fifo := NewDeltaFIFO(MetaNamespaceKeyFunc, nil, clientState)
+	fifo := NewDeltaFIFO(MetaNamespaceKeyFunc, clientState)

 	cfg := &Config{
 		Queue: fifo,
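These two hunks show the shape of the API break this vendor bump absorbs: NewDeltaFIFO loses its middle DeltaCompressor parameter, so callers simply drop the nil they were passing. A minimal runnable sketch of a migrated call site, assuming only the post-commit client-go cache package (the clientState name is illustrative):

package main

import (
	"fmt"

	"k8s.io/client-go/tools/cache"
)

func main() {
	// clientState plays the role the informer's store plays in the hunks
	// above; any Store satisfies the KeyListerGetter the FIFO needs.
	clientState := cache.NewStore(cache.MetaNamespaceKeyFunc)

	// Pre-commit form (for comparison):
	//	fifo := cache.NewDeltaFIFO(cache.MetaNamespaceKeyFunc, nil, clientState)
	// Post-commit form, as used by NewInformer and NewIndexerInformer:
	fifo := cache.NewDeltaFIFO(cache.MetaNamespaceKeyFunc, clientState)

	fmt.Println(fifo.HasSynced()) // false until the first batch is popped
}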
vendor/k8s.io/client-go/tools/cache/delta_fifo.go | 84 (generated, vendored)
@@ -31,11 +31,6 @@ import (
 // keyFunc is used to figure out what key an object should have. (It's
 // exposed in the returned DeltaFIFO's KeyOf() method, with bonus features.)
 //
-// 'compressor' may compress as many or as few items as it wants
-// (including returning an empty slice), but it should do what it
-// does quickly since it is called while the queue is locked.
-// 'compressor' may be nil if you don't want any delta compression.
-//
 // 'keyLister' is expected to return a list of keys that the consumer of
 // this queue "knows about". It is used to decide which items are missing
 // when Replace() is called; 'Deleted' deltas are produced for these items.
@@ -43,18 +38,30 @@ import (
 // TODO: consider merging keyLister with this object, tracking a list of
 //       "known" keys when Pop() is called. Have to think about how that
 //       affects error retrying.
-// TODO(lavalamp): I believe there is a possible race only when using an
-//                 external known object source that the above TODO would
-//                 fix.
+// NOTE: It is possible to misuse this and cause a race when using an
+//       external known object source.
+//       Whether there is a potential race depends on how the comsumer
+//       modifies knownObjects. In Pop(), process function is called under
+//       lock, so it is safe to update data structures in it that need to be
+//       in sync with the queue (e.g. knownObjects).
+//
+//       Example:
+//       In case of sharedIndexInformer being a consumer
+//       (https://github.com/kubernetes/kubernetes/blob/0cdd940f/staging/
+//       src/k8s.io/client-go/tools/cache/shared_informer.go#L192),
+//       there is no race as knownObjects (s.indexer) is modified safely
+//       under DeltaFIFO's lock. The only exceptions are GetStore() and
+//       GetIndexer() methods, which expose ways to modify the underlying
+//       storage. Currently these two methods are used for creating Lister
+//       and internal tests.
 //
 // Also see the comment on DeltaFIFO.
-func NewDeltaFIFO(keyFunc KeyFunc, compressor DeltaCompressor, knownObjects KeyListerGetter) *DeltaFIFO {
+func NewDeltaFIFO(keyFunc KeyFunc, knownObjects KeyListerGetter) *DeltaFIFO {
 	f := &DeltaFIFO{
-		items:           map[string]Deltas{},
-		queue:           []string{},
-		keyFunc:         keyFunc,
-		deltaCompressor: compressor,
-		knownObjects:    knownObjects,
+		items:        map[string]Deltas{},
+		queue:        []string{},
+		keyFunc:      keyFunc,
+		knownObjects: knownObjects,
 	}
 	f.cond.L = &f.lock
 	return f
@@ -86,9 +93,6 @@ func NewDeltaFIFO(keyFunc KeyFunc, compressor DeltaCompressor, knownObjects KeyL
 // items have been deleted when Replace() or Delete() are called. The deleted
 // object will be included in the DeleteFinalStateUnknown markers. These objects
 // could be stale.
-//
-// You may provide a function to compress deltas (e.g., represent a
-// series of Updates as a single Update).
 type DeltaFIFO struct {
 	// lock/cond protects access to 'items' and 'queue'.
 	lock sync.RWMutex
@@ -110,10 +114,6 @@ type DeltaFIFO struct {
 	// insertion and retrieval, and should be deterministic.
 	keyFunc KeyFunc

-	// deltaCompressor tells us how to combine two or more
-	// deltas. It may be nil.
-	deltaCompressor DeltaCompressor
-
 	// knownObjects list keys that are "known", for the
 	// purpose of figuring out which items have been deleted
 	// when Replace() or Delete() is called.
@@ -133,7 +133,6 @@ var (
 var (
 	// ErrZeroLengthDeltasObject is returned in a KeyError if a Deltas
 	// object with zero length is encountered (should be impossible,
-	// even if such an object is accidentally produced by a DeltaCompressor--
 	// but included for completeness).
 	ErrZeroLengthDeltasObject = errors.New("0 length Deltas object; can't get key")
 )
@@ -213,8 +212,6 @@ func (f *DeltaFIFO) Delete(obj interface{}) error {
 		if err == nil && !exists && !itemsExist {
 			// Presumably, this was deleted when a relist happened.
 			// Don't provide a second report of the same deletion.
-			// TODO(lavalamp): This may be racy-- we aren't properly locked
-			// with knownObjects.
 			return nil
 		}
 	}
@@ -305,8 +302,8 @@ func (f *DeltaFIFO) willObjectBeDeletedLocked(id string) bool {
 	return len(deltas) > 0 && deltas[len(deltas)-1].Type == Deleted
 }

-// queueActionLocked appends to the delta list for the object, calling
-// f.deltaCompressor if needed. Caller must lock first.
+// queueActionLocked appends to the delta list for the object.
+// Caller must lock first.
 func (f *DeltaFIFO) queueActionLocked(actionType DeltaType, obj interface{}) error {
 	id, err := f.KeyOf(obj)
 	if err != nil {
@@ -322,9 +319,6 @@ func (f *DeltaFIFO) queueActionLocked(actionType DeltaType, obj interface{}) err

 	newDeltas := append(f.items[id], Delta{actionType, obj})
 	newDeltas = dedupDeltas(newDeltas)
-	if f.deltaCompressor != nil {
-		newDeltas = f.deltaCompressor.Compress(newDeltas)
-	}

 	_, exists := f.items[id]
 	if len(newDeltas) > 0 {
@@ -334,8 +328,7 @@ func (f *DeltaFIFO) queueActionLocked(actionType DeltaType, obj interface{}) err
 		f.items[id] = newDeltas
 		f.cond.Broadcast()
 	} else if exists {
-		// The compression step removed all deltas, so
-		// we need to remove this from our map (extra items
+		// We need to remove this from our map (extra items
 		// in the queue are ignored if they are not in the
 		// map).
 		delete(f.items, id)
@@ -355,8 +348,8 @@ func (f *DeltaFIFO) List() []interface{} {
 func (f *DeltaFIFO) listLocked() []interface{} {
 	list := make([]interface{}, 0, len(f.items))
 	for _, item := range f.items {
-		// Copy item's slice so operations on this slice (delta
-		// compression) won't interfere with the object we return.
+		// Copy item's slice so operations on this slice
+		// won't interfere with the object we return.
 		item = copyDeltas(item)
 		list = append(list, item.Newest().Object)
 	}
@@ -394,8 +387,8 @@ func (f *DeltaFIFO) GetByKey(key string) (item interface{}, exists bool, err err
 	defer f.lock.RUnlock()
 	d, exists := f.items[key]
 	if exists {
-		// Copy item's slice so operations on this slice (delta
-		// compression) won't interfere with the object we return.
+		// Copy item's slice so operations on this slice
+		// won't interfere with the object we return.
 		d = copyDeltas(d)
 	}
 	return d, exists, nil
@@ -503,8 +496,6 @@ func (f *DeltaFIFO) Replace(list []interface{}, resourceVersion string) error {
 	}

 	// Detect deletions not already in the queue.
-	// TODO(lavalamp): This may be racy-- we aren't properly locked
-	// with knownObjects. Unproven.
 	knownKeys := f.knownObjects.ListKeys()
 	queuedDeletions := 0
 	for _, k := range knownKeys {
@@ -603,23 +594,6 @@ type KeyGetter interface {
 	GetByKey(key string) (interface{}, bool, error)
 }

-// DeltaCompressor is an algorithm that removes redundant changes.
-type DeltaCompressor interface {
-	Compress(Deltas) Deltas
-}
-
-// DeltaCompressorFunc should remove redundant changes; but changes that
-// are redundant depend on one's desired semantics, so this is an
-// injectable function.
-//
-// DeltaCompressorFunc adapts a raw function to be a DeltaCompressor.
-type DeltaCompressorFunc func(Deltas) Deltas
-
-// Compress just calls dc.
-func (dc DeltaCompressorFunc) Compress(d Deltas) Deltas {
-	return dc(d)
-}
-
 // DeltaType is the type of a change (addition, deletion, etc)
 type DeltaType string

@@ -668,7 +642,7 @@ func (d Deltas) Newest() *Delta {

 // copyDeltas returns a shallow copy of d; that is, it copies the slice but not
 // the objects in the slice. This allows Get/List to return an object that we
-// know won't be clobbered by a subsequent call to a delta compressor.
+// know won't be clobbered by a subsequent modifications.
 func copyDeltas(d Deltas) Deltas {
 	d2 := make(Deltas, len(d))
 	copy(d2, d)
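With the compressor gone, the only collapsing queueActionLocked still performs is dedupDeltas, and the new NOTE pins down when mutating knownObjects is safe: inside Pop's process function, which runs under the FIFO's lock. A runnable sketch of that pattern under the post-commit API (the item type, key function, and known store are illustrative, not from this commit):

package main

import (
	"fmt"

	"k8s.io/client-go/tools/cache"
)

// item is an illustrative stand-in for a real API object.
type item struct{ name, value string }

// key is an illustrative KeyFunc for item values.
func key(obj interface{}) (string, error) { return obj.(item).name, nil }

func main() {
	known := cache.NewStore(key)           // the consumer's "known objects"
	fifo := cache.NewDeltaFIFO(key, known) // post-commit two-argument form

	fifo.Add(item{"a", "v1"})
	fifo.Update(item{"a", "v2"})

	// Pop runs the process function under the FIFO's lock, which is why the
	// NOTE above says it is safe to keep knownObjects in sync from inside it.
	fifo.Pop(func(obj interface{}) error {
		for _, d := range obj.(cache.Deltas) {
			fmt.Println(d.Type, d.Object) // Added {a v1}, then Updated {a v2}
			known.Add(d.Object)           // illustrative knownObjects update
		}
		return nil
	})
}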
vendor/k8s.io/client-go/tools/cache/fifo.go | 2 (generated, vendored)
@@ -59,7 +59,7 @@ type Queue interface {
 	// has since been added.
 	AddIfNotPresent(interface{}) error

-	// Return true if the first batch of items has been popped
+	// HasSynced returns true if the first batch of items has been popped
 	HasSynced() bool

 	// Close queue
vendor/k8s.io/client-go/tools/cache/listwatch.go | 17 (generated, vendored)
@@ -17,10 +17,9 @@ limitations under the License.
 package cache

 import (
+	"context"
 	"time"

-	"golang.org/x/net/context"
-
 	"k8s.io/apimachinery/pkg/api/meta"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/fields"
@@ -63,8 +62,18 @@ type Getter interface {

 // NewListWatchFromClient creates a new ListWatch from the specified client, resource, namespace and field selector.
 func NewListWatchFromClient(c Getter, resource string, namespace string, fieldSelector fields.Selector) *ListWatch {
-	listFunc := func(options metav1.ListOptions) (runtime.Object, error) {
-		options.FieldSelector = fieldSelector.String()
+	optionsModifier := func(options *metav1.ListOptions) {
+		options.FieldSelector = fieldSelector.String()
+	}
+	return NewFilteredListWatchFromClient(c, resource, namespace, optionsModifier)
+}
+
+// NewFilteredListWatchFromClient creates a new ListWatch from the specified client, resource, namespace, and option modifier.
+// Option modifier is a function takes a ListOptions and modifies the consumed ListOptions. Provide customized modifier function
+// to apply modification to ListOptions with a field selector, a label selector, or any other desired options.
+func NewFilteredListWatchFromClient(c Getter, resource string, namespace string, optionsModifier func(options *metav1.ListOptions)) *ListWatch {
+	listFunc := func(options metav1.ListOptions) (runtime.Object, error) {
+		optionsModifier(&options)
 		return c.Get().
 			Namespace(namespace).
 			Resource(resource).
@@ -74,7 +83,7 @@ func NewListWatchFromClient(c Getter, resource string, namespace string, fieldSe
 	}
 	watchFunc := func(options metav1.ListOptions) (watch.Interface, error) {
 		options.Watch = true
-		options.FieldSelector = fieldSelector.String()
+		optionsModifier(&options)
 		return c.Get().
 			Namespace(namespace).
 			Resource(resource).
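NewListWatchFromClient keeps its old signature and becomes a thin wrapper, so existing callers compile unchanged, while the new NewFilteredListWatchFromClient lets a caller tweak anything ListOptions supports, not just a field selector. A hedged sketch of the new variant with a label selector (the kubeconfig path and label value are illustrative, not from this commit):

package main

import (
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/cache"
	"k8s.io/client-go/tools/clientcmd"
)

func main() {
	// Illustrative client wiring; error handling elided for brevity.
	config, _ := clientcmd.BuildConfigFromFlags("", "/path/to/kubeconfig")
	clientset, _ := kubernetes.NewForConfig(config)

	// Filter on a label by mutating the ListOptions that both the list
	// and watch calls will consume.
	lw := cache.NewFilteredListWatchFromClient(
		clientset.CoreV1().RESTClient(),
		"pods",
		metav1.NamespaceAll,
		func(options *metav1.ListOptions) {
			options.LabelSelector = "app=example"
		},
	)
	_ = lw // hand lw to NewInformer or a Reflector as usual
}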
vendor/k8s.io/client-go/tools/cache/mutation_detector.go | 3 (generated, vendored)
@@ -24,6 +24,8 @@ import (
 	"sync"
 	"time"

+	"github.com/golang/glog"
+
 	"k8s.io/apimachinery/pkg/runtime"
 	"k8s.io/apimachinery/pkg/util/diff"
 )
@@ -43,6 +45,7 @@ func NewCacheMutationDetector(name string) CacheMutationDetector {
 	if !mutationDetectionEnabled {
 		return dummyMutationDetector{}
 	}
	glog.Warningln("Mutation detector is enabled, this will result in memory leakage.")
 	return &defaultCacheMutationDetector{name: name, period: 1 * time.Second}
 }
vendor/k8s.io/client-go/tools/cache/reflector.go | 10 (generated, vendored)
@@ -108,8 +108,8 @@ func NewNamedReflector(name string, lw ListerWatcher, expectedType interface{},
 	reflectorSuffix := atomic.AddInt64(&reflectorDisambiguator, 1)
 	r := &Reflector{
 		name: name,
-		// we need this to be unique per process (some names are still the same)but obvious who it belongs to
-		metrics:       newReflectorMetrics(makeValidPromethusMetricLabel(fmt.Sprintf("reflector_"+name+"_%d", reflectorSuffix))),
+		// we need this to be unique per process (some names are still the same) but obvious who it belongs to
+		metrics:       newReflectorMetrics(makeValidPrometheusMetricLabel(fmt.Sprintf("reflector_"+name+"_%d", reflectorSuffix))),
 		listerWatcher: lw,
 		store:         store,
 		expectedType:  reflect.TypeOf(expectedType),
@@ -120,7 +120,7 @@
 	return r
 }

-func makeValidPromethusMetricLabel(in string) string {
+func makeValidPrometheusMetricLabel(in string) string {
 	// this isn't perfect, but it removes our common characters
 	return strings.NewReplacer("/", "_", ".", "_", "-", "_", ":", "_").Replace(in)
 }
@@ -302,12 +302,12 @@ func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
 		default:
 		}

-		timemoutseconds := int64(minWatchTimeout.Seconds() * (rand.Float64() + 1.0))
+		timeoutSeconds := int64(minWatchTimeout.Seconds() * (rand.Float64() + 1.0))
 		options = metav1.ListOptions{
 			ResourceVersion: resourceVersion,
 			// We want to avoid situations of hanging watchers. Stop any wachers that do not
 			// receive any events within the timeout window.
-			TimeoutSeconds: &timemoutseconds,
+			TimeoutSeconds: &timeoutSeconds,
 		}

 		r.metrics.numberOfWatches.Inc()
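Beyond the jittered watch timeout rename, these hunks fix the spelling of a tiny helper whose whole job is making reflector names safe to embed in a Prometheus metric name. A runnable sketch that copies the replacer from the hunk above verbatim (the sample input is made up):

package main

import (
	"fmt"
	"strings"
)

// makeValidPrometheusMetricLabel mirrors the renamed helper: replace the
// common characters that are invalid in a metric name with underscores.
func makeValidPrometheusMetricLabel(in string) string {
	return strings.NewReplacer("/", "_", ".", "_", "-", "_", ":", "_").Replace(in)
}

func main() {
	fmt.Println(makeValidPrometheusMetricLabel("reflector_k8s.io/api-pods:v1_42"))
	// Output: reflector_k8s_io_api_pods_v1_42
}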
vendor/k8s.io/client-go/tools/cache/shared_informer.go | 59 (generated, vendored)
@@ -26,6 +26,7 @@ import (
 	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
 	"k8s.io/apimachinery/pkg/util/wait"
 	"k8s.io/client-go/util/buffer"
+	"k8s.io/client-go/util/retry"

 	"github.com/golang/glog"
 )
@@ -188,7 +189,7 @@ type deleteNotification struct {
 func (s *sharedIndexInformer) Run(stopCh <-chan struct{}) {
 	defer utilruntime.HandleCrash()

-	fifo := NewDeltaFIFO(MetaNamespaceKeyFunc, nil, s.indexer)
+	fifo := NewDeltaFIFO(MetaNamespaceKeyFunc, s.indexer)

 	cfg := &Config{
 		Queue: fifo,
@@ -334,7 +335,7 @@ func (s *sharedIndexInformer) AddEventHandlerWithResyncPeriod(handler ResourceEv
 	s.blockDeltas.Lock()
 	defer s.blockDeltas.Unlock()

-	s.processor.addAndStartListener(listener)
+	s.processor.addListener(listener)
 	for _, item := range s.indexer.List() {
 		listener.add(addNotification{newObj: item})
 	}
@@ -372,6 +373,7 @@ func (s *sharedIndexInformer) HandleDeltas(obj interface{}) error {
 }

 type sharedProcessor struct {
+	listenersStarted bool
 	listenersLock    sync.RWMutex
 	listeners        []*processorListener
 	syncingListeners []*processorListener
@@ -379,20 +381,15 @@ type sharedProcessor struct {
 	wg wait.Group
 }

-func (p *sharedProcessor) addAndStartListener(listener *processorListener) {
-	p.listenersLock.Lock()
-	defer p.listenersLock.Unlock()
-
-	p.addListenerLocked(listener)
-	p.wg.Start(listener.run)
-	p.wg.Start(listener.pop)
-}
-
 func (p *sharedProcessor) addListener(listener *processorListener) {
 	p.listenersLock.Lock()
 	defer p.listenersLock.Unlock()

 	p.addListenerLocked(listener)
+	if p.listenersStarted {
+		p.wg.Start(listener.run)
+		p.wg.Start(listener.pop)
+	}
 }

 func (p *sharedProcessor) addListenerLocked(listener *processorListener) {
@@ -423,6 +420,7 @@ func (p *sharedProcessor) run(stopCh <-chan struct{}) {
 			p.wg.Start(listener.run)
 			p.wg.Start(listener.pop)
 		}
+		p.listenersStarted = true
 	}()
 	<-stopCh
 	p.listenersLock.RLock()
@@ -540,20 +538,35 @@ func (p *processorListener) pop() {
 }

 func (p *processorListener) run() {
-	defer utilruntime.HandleCrash()
-
-	for next := range p.nextCh {
-		switch notification := next.(type) {
-		case updateNotification:
-			p.handler.OnUpdate(notification.oldObj, notification.newObj)
-		case addNotification:
-			p.handler.OnAdd(notification.newObj)
-		case deleteNotification:
-			p.handler.OnDelete(notification.oldObj)
-		default:
-			utilruntime.HandleError(fmt.Errorf("unrecognized notification: %#v", next))
+	// this call blocks until the channel is closed. When a panic happens during the notification
+	// we will catch it, **the offending item will be skipped!**, and after a short delay (one second)
+	// the next notification will be attempted. This is usually better than the alternative of never
+	// delivering again.
+	stopCh := make(chan struct{})
+	wait.Until(func() {
+		// this gives us a few quick retries before a long pause and then a few more quick retries
+		err := wait.ExponentialBackoff(retry.DefaultRetry, func() (bool, error) {
+			for next := range p.nextCh {
+				switch notification := next.(type) {
+				case updateNotification:
+					p.handler.OnUpdate(notification.oldObj, notification.newObj)
+				case addNotification:
+					p.handler.OnAdd(notification.newObj)
+				case deleteNotification:
+					p.handler.OnDelete(notification.oldObj)
+				default:
+					utilruntime.HandleError(fmt.Errorf("unrecognized notification: %#v", next))
+				}
+			}
+			// the only way to get here is if the p.nextCh is empty and closed
+			return true, nil
+		})
+
+		// the only way to get here is if the p.nextCh is empty and closed
+		if err == nil {
+			close(stopCh)
 		}
-	}
+	}, 1*time.Minute, stopCh)
 }

 // shouldResync deterimines if the listener needs a resync. If the listener's resyncPeriod is 0,
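Two behavioral changes meet in this file: processorListener.run now retries after a handler panic under wait.Until and wait.ExponentialBackoff instead of crashing out, and sharedProcessor tracks listenersStarted so that a listener added while the informer is already running gets its run/pop goroutines started immediately (listeners added before Run are started exactly once inside run). A hedged sketch of late handler registration under those semantics (kubeconfig path, resource choice, and sleep are illustrative, not from this commit):

package main

import (
	"fmt"
	"time"

	"k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/fields"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/cache"
	"k8s.io/client-go/tools/clientcmd"
)

func main() {
	// Illustrative client wiring; error handling elided for brevity.
	config, _ := clientcmd.BuildConfigFromFlags("", "/path/to/kubeconfig")
	clientset, _ := kubernetes.NewForConfig(config)

	lw := cache.NewListWatchFromClient(
		clientset.CoreV1().RESTClient(), "pods", metav1.NamespaceAll, fields.Everything())
	informer := cache.NewSharedIndexInformer(lw, &v1.Pod{}, 0, cache.Indexers{})

	stopCh := make(chan struct{})
	defer close(stopCh)
	go informer.Run(stopCh)
	cache.WaitForCacheSync(stopCh, informer.HasSynced)

	// Late registration: run() has set listenersStarted, so addListener
	// starts this listener's run/pop goroutines on the spot, and the handler
	// is primed with an addNotification for every object already cached.
	informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc: func(obj interface{}) {
			fmt.Println("saw:", obj.(*v1.Pod).Name)
		},
	})

	time.Sleep(30 * time.Second) // illustrative: let events flow
}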