1
0
Fork 0

Switch to golang/dep.

This commit is contained in:
Ludovic Fernandez 2018-01-11 17:46:04 +01:00 committed by Traefiker
parent 709d50836b
commit 2618aef008
246 changed files with 42564 additions and 17452 deletions

16
vendor/github.com/coreos/etcd/mvcc/doc.go generated vendored Normal file
View file

@ -0,0 +1,16 @@
// Copyright 2015 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// Package mvcc defines etcd's stable MVCC storage.
package mvcc

219
vendor/github.com/coreos/etcd/mvcc/index.go generated vendored Normal file
View file

@ -0,0 +1,219 @@
// Copyright 2015 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package mvcc
import (
"sort"
"sync"
"github.com/google/btree"
)
type index interface {
Get(key []byte, atRev int64) (rev, created revision, ver int64, err error)
Range(key, end []byte, atRev int64) ([][]byte, []revision)
Put(key []byte, rev revision)
Tombstone(key []byte, rev revision) error
RangeSince(key, end []byte, rev int64) []revision
Compact(rev int64) map[revision]struct{}
Equal(b index) bool
Insert(ki *keyIndex)
KeyIndex(ki *keyIndex) *keyIndex
}
type treeIndex struct {
sync.RWMutex
tree *btree.BTree
}
func newTreeIndex() index {
return &treeIndex{
tree: btree.New(32),
}
}
func (ti *treeIndex) Put(key []byte, rev revision) {
keyi := &keyIndex{key: key}
ti.Lock()
defer ti.Unlock()
item := ti.tree.Get(keyi)
if item == nil {
keyi.put(rev.main, rev.sub)
ti.tree.ReplaceOrInsert(keyi)
return
}
okeyi := item.(*keyIndex)
okeyi.put(rev.main, rev.sub)
}
func (ti *treeIndex) Get(key []byte, atRev int64) (modified, created revision, ver int64, err error) {
keyi := &keyIndex{key: key}
ti.RLock()
defer ti.RUnlock()
if keyi = ti.keyIndex(keyi); keyi == nil {
return revision{}, revision{}, 0, ErrRevisionNotFound
}
return keyi.get(atRev)
}
func (ti *treeIndex) KeyIndex(keyi *keyIndex) *keyIndex {
ti.RLock()
defer ti.RUnlock()
return ti.keyIndex(keyi)
}
func (ti *treeIndex) keyIndex(keyi *keyIndex) *keyIndex {
if item := ti.tree.Get(keyi); item != nil {
return item.(*keyIndex)
}
return nil
}
func (ti *treeIndex) Range(key, end []byte, atRev int64) (keys [][]byte, revs []revision) {
if end == nil {
rev, _, _, err := ti.Get(key, atRev)
if err != nil {
return nil, nil
}
return [][]byte{key}, []revision{rev}
}
keyi := &keyIndex{key: key}
endi := &keyIndex{key: end}
ti.RLock()
defer ti.RUnlock()
ti.tree.AscendGreaterOrEqual(keyi, func(item btree.Item) bool {
if len(endi.key) > 0 && !item.Less(endi) {
return false
}
curKeyi := item.(*keyIndex)
rev, _, _, err := curKeyi.get(atRev)
if err != nil {
return true
}
revs = append(revs, rev)
keys = append(keys, curKeyi.key)
return true
})
return keys, revs
}
func (ti *treeIndex) Tombstone(key []byte, rev revision) error {
keyi := &keyIndex{key: key}
ti.Lock()
defer ti.Unlock()
item := ti.tree.Get(keyi)
if item == nil {
return ErrRevisionNotFound
}
ki := item.(*keyIndex)
return ki.tombstone(rev.main, rev.sub)
}
// RangeSince returns all revisions from key(including) to end(excluding)
// at or after the given rev. The returned slice is sorted in the order
// of revision.
func (ti *treeIndex) RangeSince(key, end []byte, rev int64) []revision {
ti.RLock()
defer ti.RUnlock()
keyi := &keyIndex{key: key}
if end == nil {
item := ti.tree.Get(keyi)
if item == nil {
return nil
}
keyi = item.(*keyIndex)
return keyi.since(rev)
}
endi := &keyIndex{key: end}
var revs []revision
ti.tree.AscendGreaterOrEqual(keyi, func(item btree.Item) bool {
if len(endi.key) > 0 && !item.Less(endi) {
return false
}
curKeyi := item.(*keyIndex)
revs = append(revs, curKeyi.since(rev)...)
return true
})
sort.Sort(revisions(revs))
return revs
}
func (ti *treeIndex) Compact(rev int64) map[revision]struct{} {
available := make(map[revision]struct{})
var emptyki []*keyIndex
plog.Printf("store.index: compact %d", rev)
// TODO: do not hold the lock for long time?
// This is probably OK. Compacting 10M keys takes O(10ms).
ti.Lock()
defer ti.Unlock()
ti.tree.Ascend(compactIndex(rev, available, &emptyki))
for _, ki := range emptyki {
item := ti.tree.Delete(ki)
if item == nil {
plog.Panic("store.index: unexpected delete failure during compaction")
}
}
return available
}
func compactIndex(rev int64, available map[revision]struct{}, emptyki *[]*keyIndex) func(i btree.Item) bool {
return func(i btree.Item) bool {
keyi := i.(*keyIndex)
keyi.compact(rev, available)
if keyi.isEmpty() {
*emptyki = append(*emptyki, keyi)
}
return true
}
}
func (a *treeIndex) Equal(bi index) bool {
b := bi.(*treeIndex)
if a.tree.Len() != b.tree.Len() {
return false
}
equal := true
a.tree.Ascend(func(item btree.Item) bool {
aki := item.(*keyIndex)
bki := b.tree.Get(item).(*keyIndex)
if !aki.equal(bki) {
equal = false
return false
}
return true
})
return equal
}
func (ti *treeIndex) Insert(ki *keyIndex) {
ti.Lock()
defer ti.Unlock()
ti.tree.ReplaceOrInsert(ki)
}

332
vendor/github.com/coreos/etcd/mvcc/key_index.go generated vendored Normal file
View file

@ -0,0 +1,332 @@
// Copyright 2015 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package mvcc
import (
"bytes"
"errors"
"fmt"
"github.com/google/btree"
)
var (
ErrRevisionNotFound = errors.New("mvcc: revision not found")
)
// keyIndex stores the revisions of a key in the backend.
// Each keyIndex has at least one key generation.
// Each generation might have several key versions.
// Tombstone on a key appends an tombstone version at the end
// of the current generation and creates a new empty generation.
// Each version of a key has an index pointing to the backend.
//
// For example: put(1.0);put(2.0);tombstone(3.0);put(4.0);tombstone(5.0) on key "foo"
// generate a keyIndex:
// key: "foo"
// rev: 5
// generations:
// {empty}
// {4.0, 5.0(t)}
// {1.0, 2.0, 3.0(t)}
//
// Compact a keyIndex removes the versions with smaller or equal to
// rev except the largest one. If the generation becomes empty
// during compaction, it will be removed. if all the generations get
// removed, the keyIndex should be removed.
// For example:
// compact(2) on the previous example
// generations:
// {empty}
// {4.0, 5.0(t)}
// {2.0, 3.0(t)}
//
// compact(4)
// generations:
// {empty}
// {4.0, 5.0(t)}
//
// compact(5):
// generations:
// {empty} -> key SHOULD be removed.
//
// compact(6):
// generations:
// {empty} -> key SHOULD be removed.
type keyIndex struct {
key []byte
modified revision // the main rev of the last modification
generations []generation
}
// put puts a revision to the keyIndex.
func (ki *keyIndex) put(main int64, sub int64) {
rev := revision{main: main, sub: sub}
if !rev.GreaterThan(ki.modified) {
plog.Panicf("store.keyindex: put with unexpected smaller revision [%v / %v]", rev, ki.modified)
}
if len(ki.generations) == 0 {
ki.generations = append(ki.generations, generation{})
}
g := &ki.generations[len(ki.generations)-1]
if len(g.revs) == 0 { // create a new key
keysGauge.Inc()
g.created = rev
}
g.revs = append(g.revs, rev)
g.ver++
ki.modified = rev
}
func (ki *keyIndex) restore(created, modified revision, ver int64) {
if len(ki.generations) != 0 {
plog.Panicf("store.keyindex: cannot restore non-empty keyIndex")
}
ki.modified = modified
g := generation{created: created, ver: ver, revs: []revision{modified}}
ki.generations = append(ki.generations, g)
keysGauge.Inc()
}
// tombstone puts a revision, pointing to a tombstone, to the keyIndex.
// It also creates a new empty generation in the keyIndex.
// It returns ErrRevisionNotFound when tombstone on an empty generation.
func (ki *keyIndex) tombstone(main int64, sub int64) error {
if ki.isEmpty() {
plog.Panicf("store.keyindex: unexpected tombstone on empty keyIndex %s", string(ki.key))
}
if ki.generations[len(ki.generations)-1].isEmpty() {
return ErrRevisionNotFound
}
ki.put(main, sub)
ki.generations = append(ki.generations, generation{})
keysGauge.Dec()
return nil
}
// get gets the modified, created revision and version of the key that satisfies the given atRev.
// Rev must be higher than or equal to the given atRev.
func (ki *keyIndex) get(atRev int64) (modified, created revision, ver int64, err error) {
if ki.isEmpty() {
plog.Panicf("store.keyindex: unexpected get on empty keyIndex %s", string(ki.key))
}
g := ki.findGeneration(atRev)
if g.isEmpty() {
return revision{}, revision{}, 0, ErrRevisionNotFound
}
n := g.walk(func(rev revision) bool { return rev.main > atRev })
if n != -1 {
return g.revs[n], g.created, g.ver - int64(len(g.revs)-n-1), nil
}
return revision{}, revision{}, 0, ErrRevisionNotFound
}
// since returns revisions since the given rev. Only the revision with the
// largest sub revision will be returned if multiple revisions have the same
// main revision.
func (ki *keyIndex) since(rev int64) []revision {
if ki.isEmpty() {
plog.Panicf("store.keyindex: unexpected get on empty keyIndex %s", string(ki.key))
}
since := revision{rev, 0}
var gi int
// find the generations to start checking
for gi = len(ki.generations) - 1; gi > 0; gi-- {
g := ki.generations[gi]
if g.isEmpty() {
continue
}
if since.GreaterThan(g.created) {
break
}
}
var revs []revision
var last int64
for ; gi < len(ki.generations); gi++ {
for _, r := range ki.generations[gi].revs {
if since.GreaterThan(r) {
continue
}
if r.main == last {
// replace the revision with a new one that has higher sub value,
// because the original one should not be seen by external
revs[len(revs)-1] = r
continue
}
revs = append(revs, r)
last = r.main
}
}
return revs
}
// compact compacts a keyIndex by removing the versions with smaller or equal
// revision than the given atRev except the largest one (If the largest one is
// a tombstone, it will not be kept).
// If a generation becomes empty during compaction, it will be removed.
func (ki *keyIndex) compact(atRev int64, available map[revision]struct{}) {
if ki.isEmpty() {
plog.Panicf("store.keyindex: unexpected compact on empty keyIndex %s", string(ki.key))
}
// walk until reaching the first revision that has an revision smaller or equal to
// the atRev.
// add it to the available map
f := func(rev revision) bool {
if rev.main <= atRev {
available[rev] = struct{}{}
return false
}
return true
}
i, g := 0, &ki.generations[0]
// find first generation includes atRev or created after atRev
for i < len(ki.generations)-1 {
if tomb := g.revs[len(g.revs)-1].main; tomb > atRev {
break
}
i++
g = &ki.generations[i]
}
if !g.isEmpty() {
n := g.walk(f)
// remove the previous contents.
if n != -1 {
g.revs = g.revs[n:]
}
// remove any tombstone
if len(g.revs) == 1 && i != len(ki.generations)-1 {
delete(available, g.revs[0])
i++
}
}
// remove the previous generations.
ki.generations = ki.generations[i:]
}
func (ki *keyIndex) isEmpty() bool {
return len(ki.generations) == 1 && ki.generations[0].isEmpty()
}
// findGeneration finds out the generation of the keyIndex that the
// given rev belongs to. If the given rev is at the gap of two generations,
// which means that the key does not exist at the given rev, it returns nil.
func (ki *keyIndex) findGeneration(rev int64) *generation {
lastg := len(ki.generations) - 1
cg := lastg
for cg >= 0 {
if len(ki.generations[cg].revs) == 0 {
cg--
continue
}
g := ki.generations[cg]
if cg != lastg {
if tomb := g.revs[len(g.revs)-1].main; tomb <= rev {
return nil
}
}
if g.revs[0].main <= rev {
return &ki.generations[cg]
}
cg--
}
return nil
}
func (a *keyIndex) Less(b btree.Item) bool {
return bytes.Compare(a.key, b.(*keyIndex).key) == -1
}
func (a *keyIndex) equal(b *keyIndex) bool {
if !bytes.Equal(a.key, b.key) {
return false
}
if a.modified != b.modified {
return false
}
if len(a.generations) != len(b.generations) {
return false
}
for i := range a.generations {
ag, bg := a.generations[i], b.generations[i]
if !ag.equal(bg) {
return false
}
}
return true
}
func (ki *keyIndex) String() string {
var s string
for _, g := range ki.generations {
s += g.String()
}
return s
}
// generation contains multiple revisions of a key.
type generation struct {
ver int64
created revision // when the generation is created (put in first revision).
revs []revision
}
func (g *generation) isEmpty() bool { return g == nil || len(g.revs) == 0 }
// walk walks through the revisions in the generation in descending order.
// It passes the revision to the given function.
// walk returns until: 1. it finishes walking all pairs 2. the function returns false.
// walk returns the position at where it stopped. If it stopped after
// finishing walking, -1 will be returned.
func (g *generation) walk(f func(rev revision) bool) int {
l := len(g.revs)
for i := range g.revs {
ok := f(g.revs[l-i-1])
if !ok {
return l - i - 1
}
}
return -1
}
func (g *generation) String() string {
return fmt.Sprintf("g: created[%d] ver[%d], revs %#v\n", g.created, g.ver, g.revs)
}
func (a generation) equal(b generation) bool {
if a.ver != b.ver {
return false
}
if len(a.revs) != len(b.revs) {
return false
}
for i := range a.revs {
ar, br := a.revs[i], b.revs[i]
if ar != br {
return false
}
}
return true
}

147
vendor/github.com/coreos/etcd/mvcc/kv.go generated vendored Normal file
View file

@ -0,0 +1,147 @@
// Copyright 2015 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package mvcc
import (
"github.com/coreos/etcd/lease"
"github.com/coreos/etcd/mvcc/backend"
"github.com/coreos/etcd/mvcc/mvccpb"
)
type RangeOptions struct {
Limit int64
Rev int64
Count bool
}
type RangeResult struct {
KVs []mvccpb.KeyValue
Rev int64
Count int
}
type ReadView interface {
// FirstRev returns the first KV revision at the time of opening the txn.
// After a compaction, the first revision increases to the compaction
// revision.
FirstRev() int64
// Rev returns the revision of the KV at the time of opening the txn.
Rev() int64
// Range gets the keys in the range at rangeRev.
// The returned rev is the current revision of the KV when the operation is executed.
// If rangeRev <=0, range gets the keys at currentRev.
// If `end` is nil, the request returns the key.
// If `end` is not nil and not empty, it gets the keys in range [key, range_end).
// If `end` is not nil and empty, it gets the keys greater than or equal to key.
// Limit limits the number of keys returned.
// If the required rev is compacted, ErrCompacted will be returned.
Range(key, end []byte, ro RangeOptions) (r *RangeResult, err error)
}
// TxnRead represents a read-only transaction with operations that will not
// block other read transactions.
type TxnRead interface {
ReadView
// End marks the transaction is complete and ready to commit.
End()
}
type WriteView interface {
// DeleteRange deletes the given range from the store.
// A deleteRange increases the rev of the store if any key in the range exists.
// The number of key deleted will be returned.
// The returned rev is the current revision of the KV when the operation is executed.
// It also generates one event for each key delete in the event history.
// if the `end` is nil, deleteRange deletes the key.
// if the `end` is not nil, deleteRange deletes the keys in range [key, range_end).
DeleteRange(key, end []byte) (n, rev int64)
// Put puts the given key, value into the store. Put also takes additional argument lease to
// attach a lease to a key-value pair as meta-data. KV implementation does not validate the lease
// id.
// A put also increases the rev of the store, and generates one event in the event history.
// The returned rev is the current revision of the KV when the operation is executed.
Put(key, value []byte, lease lease.LeaseID) (rev int64)
}
// TxnWrite represents a transaction that can modify the store.
type TxnWrite interface {
TxnRead
WriteView
// Changes gets the changes made since opening the write txn.
Changes() []mvccpb.KeyValue
}
// txnReadWrite coerces a read txn to a write, panicking on any write operation.
type txnReadWrite struct{ TxnRead }
func (trw *txnReadWrite) DeleteRange(key, end []byte) (n, rev int64) { panic("unexpected DeleteRange") }
func (trw *txnReadWrite) Put(key, value []byte, lease lease.LeaseID) (rev int64) {
panic("unexpected Put")
}
func (trw *txnReadWrite) Changes() []mvccpb.KeyValue { return nil }
func NewReadOnlyTxnWrite(txn TxnRead) TxnWrite { return &txnReadWrite{txn} }
type KV interface {
ReadView
WriteView
// Read creates a read transaction.
Read() TxnRead
// Write creates a write transaction.
Write() TxnWrite
// Hash retrieves the hash of KV state and revision.
// This method is designed for consistency checking purposes.
Hash() (hash uint32, revision int64, err error)
// Compact frees all superseded keys with revisions less than rev.
Compact(rev int64) (<-chan struct{}, error)
// Commit commits outstanding txns into the underlying backend.
Commit()
// Restore restores the KV store from a backend.
Restore(b backend.Backend) error
Close() error
}
// WatchableKV is a KV that can be watched.
type WatchableKV interface {
KV
Watchable
}
// Watchable is the interface that wraps the NewWatchStream function.
type Watchable interface {
// NewWatchStream returns a WatchStream that can be used to
// watch events happened or happening on the KV.
NewWatchStream() WatchStream
}
// ConsistentWatchableKV is a WatchableKV that understands the consistency
// algorithm and consistent index.
// If the consistent index of executing entry is not larger than the
// consistent index of ConsistentWatchableKV, all operations in
// this entry are skipped and return empty response.
type ConsistentWatchableKV interface {
WatchableKV
// ConsistentIndex returns the current consistent index of the KV.
ConsistentIndex() uint64
}

53
vendor/github.com/coreos/etcd/mvcc/kv_view.go generated vendored Normal file
View file

@ -0,0 +1,53 @@
// Copyright 2017 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package mvcc
import (
"github.com/coreos/etcd/lease"
)
type readView struct{ kv KV }
func (rv *readView) FirstRev() int64 {
tr := rv.kv.Read()
defer tr.End()
return tr.FirstRev()
}
func (rv *readView) Rev() int64 {
tr := rv.kv.Read()
defer tr.End()
return tr.Rev()
}
func (rv *readView) Range(key, end []byte, ro RangeOptions) (r *RangeResult, err error) {
tr := rv.kv.Read()
defer tr.End()
return tr.Range(key, end, ro)
}
type writeView struct{ kv KV }
func (wv *writeView) DeleteRange(key, end []byte) (n, rev int64) {
tw := wv.kv.Write()
defer tw.End()
return tw.DeleteRange(key, end)
}
func (wv *writeView) Put(key, value []byte, lease lease.LeaseID) (rev int64) {
tw := wv.kv.Write()
defer tw.End()
return tw.Put(key, value, lease)
}

459
vendor/github.com/coreos/etcd/mvcc/kvstore.go generated vendored Normal file
View file

@ -0,0 +1,459 @@
// Copyright 2015 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package mvcc
import (
"encoding/binary"
"errors"
"math"
"sync"
"time"
"github.com/coreos/etcd/lease"
"github.com/coreos/etcd/mvcc/backend"
"github.com/coreos/etcd/mvcc/mvccpb"
"github.com/coreos/etcd/pkg/schedule"
"github.com/coreos/pkg/capnslog"
"golang.org/x/net/context"
)
var (
keyBucketName = []byte("key")
metaBucketName = []byte("meta")
consistentIndexKeyName = []byte("consistent_index")
scheduledCompactKeyName = []byte("scheduledCompactRev")
finishedCompactKeyName = []byte("finishedCompactRev")
ErrCompacted = errors.New("mvcc: required revision has been compacted")
ErrFutureRev = errors.New("mvcc: required revision is a future revision")
ErrCanceled = errors.New("mvcc: watcher is canceled")
ErrClosed = errors.New("mvcc: closed")
plog = capnslog.NewPackageLogger("github.com/coreos/etcd", "mvcc")
)
const (
// markedRevBytesLen is the byte length of marked revision.
// The first `revBytesLen` bytes represents a normal revision. The last
// one byte is the mark.
markedRevBytesLen = revBytesLen + 1
markBytePosition = markedRevBytesLen - 1
markTombstone byte = 't'
)
var restoreChunkKeys = 10000 // non-const for testing
// ConsistentIndexGetter is an interface that wraps the Get method.
// Consistent index is the offset of an entry in a consistent replicated log.
type ConsistentIndexGetter interface {
// ConsistentIndex returns the consistent index of current executing entry.
ConsistentIndex() uint64
}
type store struct {
ReadView
WriteView
// mu read locks for txns and write locks for non-txn store changes.
mu sync.RWMutex
ig ConsistentIndexGetter
b backend.Backend
kvindex index
le lease.Lessor
// revMuLock protects currentRev and compactMainRev.
// Locked at end of write txn and released after write txn unlock lock.
// Locked before locking read txn and released after locking.
revMu sync.RWMutex
// currentRev is the revision of the last completed transaction.
currentRev int64
// compactMainRev is the main revision of the last compaction.
compactMainRev int64
// bytesBuf8 is a byte slice of length 8
// to avoid a repetitive allocation in saveIndex.
bytesBuf8 []byte
fifoSched schedule.Scheduler
stopc chan struct{}
}
// NewStore returns a new store. It is useful to create a store inside
// mvcc pkg. It should only be used for testing externally.
func NewStore(b backend.Backend, le lease.Lessor, ig ConsistentIndexGetter) *store {
s := &store{
b: b,
ig: ig,
kvindex: newTreeIndex(),
le: le,
currentRev: 1,
compactMainRev: -1,
bytesBuf8: make([]byte, 8),
fifoSched: schedule.NewFIFOScheduler(),
stopc: make(chan struct{}),
}
s.ReadView = &readView{s}
s.WriteView = &writeView{s}
if s.le != nil {
s.le.SetRangeDeleter(func() lease.TxnDelete { return s.Write() })
}
tx := s.b.BatchTx()
tx.Lock()
tx.UnsafeCreateBucket(keyBucketName)
tx.UnsafeCreateBucket(metaBucketName)
tx.Unlock()
s.b.ForceCommit()
if err := s.restore(); err != nil {
// TODO: return the error instead of panic here?
panic("failed to recover store from backend")
}
return s
}
func (s *store) compactBarrier(ctx context.Context, ch chan struct{}) {
if ctx == nil || ctx.Err() != nil {
s.mu.Lock()
select {
case <-s.stopc:
default:
f := func(ctx context.Context) { s.compactBarrier(ctx, ch) }
s.fifoSched.Schedule(f)
}
s.mu.Unlock()
return
}
close(ch)
}
func (s *store) Hash() (hash uint32, revision int64, err error) {
s.b.ForceCommit()
h, err := s.b.Hash(DefaultIgnores)
return h, s.currentRev, err
}
func (s *store) Compact(rev int64) (<-chan struct{}, error) {
s.mu.Lock()
defer s.mu.Unlock()
s.revMu.Lock()
defer s.revMu.Unlock()
if rev <= s.compactMainRev {
ch := make(chan struct{})
f := func(ctx context.Context) { s.compactBarrier(ctx, ch) }
s.fifoSched.Schedule(f)
return ch, ErrCompacted
}
if rev > s.currentRev {
return nil, ErrFutureRev
}
start := time.Now()
s.compactMainRev = rev
rbytes := newRevBytes()
revToBytes(revision{main: rev}, rbytes)
tx := s.b.BatchTx()
tx.Lock()
tx.UnsafePut(metaBucketName, scheduledCompactKeyName, rbytes)
tx.Unlock()
// ensure that desired compaction is persisted
s.b.ForceCommit()
keep := s.kvindex.Compact(rev)
ch := make(chan struct{})
var j = func(ctx context.Context) {
if ctx.Err() != nil {
s.compactBarrier(ctx, ch)
return
}
if !s.scheduleCompaction(rev, keep) {
s.compactBarrier(nil, ch)
return
}
close(ch)
}
s.fifoSched.Schedule(j)
indexCompactionPauseDurations.Observe(float64(time.Since(start) / time.Millisecond))
return ch, nil
}
// DefaultIgnores is a map of keys to ignore in hash checking.
var DefaultIgnores map[backend.IgnoreKey]struct{}
func init() {
DefaultIgnores = map[backend.IgnoreKey]struct{}{
// consistent index might be changed due to v2 internal sync, which
// is not controllable by the user.
{Bucket: string(metaBucketName), Key: string(consistentIndexKeyName)}: {},
}
}
func (s *store) Commit() {
s.mu.Lock()
defer s.mu.Unlock()
tx := s.b.BatchTx()
tx.Lock()
s.saveIndex(tx)
tx.Unlock()
s.b.ForceCommit()
}
func (s *store) Restore(b backend.Backend) error {
s.mu.Lock()
defer s.mu.Unlock()
close(s.stopc)
s.fifoSched.Stop()
s.b = b
s.kvindex = newTreeIndex()
s.currentRev = 1
s.compactMainRev = -1
s.fifoSched = schedule.NewFIFOScheduler()
s.stopc = make(chan struct{})
return s.restore()
}
func (s *store) restore() error {
reportDbTotalSizeInBytesMu.Lock()
b := s.b
reportDbTotalSizeInBytes = func() float64 { return float64(b.Size()) }
reportDbTotalSizeInBytesMu.Unlock()
min, max := newRevBytes(), newRevBytes()
revToBytes(revision{main: 1}, min)
revToBytes(revision{main: math.MaxInt64, sub: math.MaxInt64}, max)
keyToLease := make(map[string]lease.LeaseID)
// restore index
tx := s.b.BatchTx()
tx.Lock()
_, finishedCompactBytes := tx.UnsafeRange(metaBucketName, finishedCompactKeyName, nil, 0)
if len(finishedCompactBytes) != 0 {
s.compactMainRev = bytesToRev(finishedCompactBytes[0]).main
plog.Printf("restore compact to %d", s.compactMainRev)
}
_, scheduledCompactBytes := tx.UnsafeRange(metaBucketName, scheduledCompactKeyName, nil, 0)
scheduledCompact := int64(0)
if len(scheduledCompactBytes) != 0 {
scheduledCompact = bytesToRev(scheduledCompactBytes[0]).main
}
// index keys concurrently as they're loaded in from tx
keysGauge.Set(0)
rkvc, revc := restoreIntoIndex(s.kvindex)
for {
keys, vals := tx.UnsafeRange(keyBucketName, min, max, int64(restoreChunkKeys))
if len(keys) == 0 {
break
}
// rkvc blocks if the total pending keys exceeds the restore
// chunk size to keep keys from consuming too much memory.
restoreChunk(rkvc, keys, vals, keyToLease)
if len(keys) < restoreChunkKeys {
// partial set implies final set
break
}
// next set begins after where this one ended
newMin := bytesToRev(keys[len(keys)-1][:revBytesLen])
newMin.sub++
revToBytes(newMin, min)
}
close(rkvc)
s.currentRev = <-revc
// keys in the range [compacted revision -N, compaction] might all be deleted due to compaction.
// the correct revision should be set to compaction revision in the case, not the largest revision
// we have seen.
if s.currentRev < s.compactMainRev {
s.currentRev = s.compactMainRev
}
if scheduledCompact <= s.compactMainRev {
scheduledCompact = 0
}
for key, lid := range keyToLease {
if s.le == nil {
panic("no lessor to attach lease")
}
err := s.le.Attach(lid, []lease.LeaseItem{{Key: key}})
if err != nil {
plog.Errorf("unexpected Attach error: %v", err)
}
}
tx.Unlock()
if scheduledCompact != 0 {
s.Compact(scheduledCompact)
plog.Printf("resume scheduled compaction at %d", scheduledCompact)
}
return nil
}
type revKeyValue struct {
key []byte
kv mvccpb.KeyValue
kstr string
}
func restoreIntoIndex(idx index) (chan<- revKeyValue, <-chan int64) {
rkvc, revc := make(chan revKeyValue, restoreChunkKeys), make(chan int64, 1)
go func() {
currentRev := int64(1)
defer func() { revc <- currentRev }()
// restore the tree index from streaming the unordered index.
kiCache := make(map[string]*keyIndex, restoreChunkKeys)
for rkv := range rkvc {
ki, ok := kiCache[rkv.kstr]
// purge kiCache if many keys but still missing in the cache
if !ok && len(kiCache) >= restoreChunkKeys {
i := 10
for k := range kiCache {
delete(kiCache, k)
if i--; i == 0 {
break
}
}
}
// cache miss, fetch from tree index if there
if !ok {
ki = &keyIndex{key: rkv.kv.Key}
if idxKey := idx.KeyIndex(ki); idxKey != nil {
kiCache[rkv.kstr], ki = idxKey, idxKey
ok = true
}
}
rev := bytesToRev(rkv.key)
currentRev = rev.main
if ok {
if isTombstone(rkv.key) {
ki.tombstone(rev.main, rev.sub)
continue
}
ki.put(rev.main, rev.sub)
} else if !isTombstone(rkv.key) {
ki.restore(revision{rkv.kv.CreateRevision, 0}, rev, rkv.kv.Version)
idx.Insert(ki)
kiCache[rkv.kstr] = ki
}
}
}()
return rkvc, revc
}
func restoreChunk(kvc chan<- revKeyValue, keys, vals [][]byte, keyToLease map[string]lease.LeaseID) {
for i, key := range keys {
rkv := revKeyValue{key: key}
if err := rkv.kv.Unmarshal(vals[i]); err != nil {
plog.Fatalf("cannot unmarshal event: %v", err)
}
rkv.kstr = string(rkv.kv.Key)
if isTombstone(key) {
delete(keyToLease, rkv.kstr)
} else if lid := lease.LeaseID(rkv.kv.Lease); lid != lease.NoLease {
keyToLease[rkv.kstr] = lid
} else {
delete(keyToLease, rkv.kstr)
}
kvc <- rkv
}
}
func (s *store) Close() error {
close(s.stopc)
s.fifoSched.Stop()
return nil
}
func (a *store) Equal(b *store) bool {
if a.currentRev != b.currentRev {
return false
}
if a.compactMainRev != b.compactMainRev {
return false
}
return a.kvindex.Equal(b.kvindex)
}
func (s *store) saveIndex(tx backend.BatchTx) {
if s.ig == nil {
return
}
bs := s.bytesBuf8
binary.BigEndian.PutUint64(bs, s.ig.ConsistentIndex())
// put the index into the underlying backend
// tx has been locked in TxnBegin, so there is no need to lock it again
tx.UnsafePut(metaBucketName, consistentIndexKeyName, bs)
}
func (s *store) ConsistentIndex() uint64 {
// TODO: cache index in a uint64 field?
tx := s.b.BatchTx()
tx.Lock()
defer tx.Unlock()
_, vs := tx.UnsafeRange(metaBucketName, consistentIndexKeyName, nil, 0)
if len(vs) == 0 {
return 0
}
return binary.BigEndian.Uint64(vs[0])
}
// appendMarkTombstone appends tombstone mark to normal revision bytes.
func appendMarkTombstone(b []byte) []byte {
if len(b) != revBytesLen {
plog.Panicf("cannot append mark to non normal revision bytes")
}
return append(b, markTombstone)
}
// isTombstone checks whether the revision bytes is a tombstone.
func isTombstone(b []byte) bool {
return len(b) == markedRevBytesLen && b[markBytePosition] == markTombstone
}
// revBytesRange returns the range of revision bytes at
// the given revision.
func revBytesRange(rev revision) (start, end []byte) {
start = newRevBytes()
revToBytes(rev, start)
end = newRevBytes()
endRev := revision{main: rev.main, sub: rev.sub + 1}
revToBytes(endRev, end)
return start, end
}

View file

@ -0,0 +1,66 @@
// Copyright 2015 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package mvcc
import (
"encoding/binary"
"time"
)
func (s *store) scheduleCompaction(compactMainRev int64, keep map[revision]struct{}) bool {
totalStart := time.Now()
defer dbCompactionTotalDurations.Observe(float64(time.Since(totalStart) / time.Millisecond))
end := make([]byte, 8)
binary.BigEndian.PutUint64(end, uint64(compactMainRev+1))
batchsize := int64(10000)
last := make([]byte, 8+1+8)
for {
var rev revision
start := time.Now()
tx := s.b.BatchTx()
tx.Lock()
keys, _ := tx.UnsafeRange(keyBucketName, last, end, batchsize)
for _, key := range keys {
rev = bytesToRev(key)
if _, ok := keep[rev]; !ok {
tx.UnsafeDelete(keyBucketName, key)
}
}
if len(keys) < int(batchsize) {
rbytes := make([]byte, 8+1+8)
revToBytes(revision{main: compactMainRev}, rbytes)
tx.UnsafePut(metaBucketName, finishedCompactKeyName, rbytes)
tx.Unlock()
plog.Printf("finished scheduled compaction at %d (took %v)", compactMainRev, time.Since(totalStart))
return true
}
// update last
revToBytes(revision{main: rev.main, sub: rev.sub + 1}, last)
tx.Unlock()
dbCompactionPauseDurations.Observe(float64(time.Since(start) / time.Millisecond))
select {
case <-time.After(100 * time.Millisecond):
case <-s.stopc:
return false
}
}
}

253
vendor/github.com/coreos/etcd/mvcc/kvstore_txn.go generated vendored Normal file
View file

@ -0,0 +1,253 @@
// Copyright 2017 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package mvcc
import (
"github.com/coreos/etcd/lease"
"github.com/coreos/etcd/mvcc/backend"
"github.com/coreos/etcd/mvcc/mvccpb"
)
type storeTxnRead struct {
s *store
tx backend.ReadTx
firstRev int64
rev int64
}
func (s *store) Read() TxnRead {
s.mu.RLock()
tx := s.b.ReadTx()
s.revMu.RLock()
tx.Lock()
firstRev, rev := s.compactMainRev, s.currentRev
s.revMu.RUnlock()
return newMetricsTxnRead(&storeTxnRead{s, tx, firstRev, rev})
}
func (tr *storeTxnRead) FirstRev() int64 { return tr.firstRev }
func (tr *storeTxnRead) Rev() int64 { return tr.rev }
func (tr *storeTxnRead) Range(key, end []byte, ro RangeOptions) (r *RangeResult, err error) {
return tr.rangeKeys(key, end, tr.Rev(), ro)
}
func (tr *storeTxnRead) End() {
tr.tx.Unlock()
tr.s.mu.RUnlock()
}
type storeTxnWrite struct {
*storeTxnRead
tx backend.BatchTx
// beginRev is the revision where the txn begins; it will write to the next revision.
beginRev int64
changes []mvccpb.KeyValue
}
func (s *store) Write() TxnWrite {
s.mu.RLock()
tx := s.b.BatchTx()
tx.Lock()
tw := &storeTxnWrite{
storeTxnRead: &storeTxnRead{s, tx, 0, 0},
tx: tx,
beginRev: s.currentRev,
changes: make([]mvccpb.KeyValue, 0, 4),
}
return newMetricsTxnWrite(tw)
}
func (tw *storeTxnWrite) Rev() int64 { return tw.beginRev }
func (tw *storeTxnWrite) Range(key, end []byte, ro RangeOptions) (r *RangeResult, err error) {
rev := tw.beginRev
if len(tw.changes) > 0 {
rev++
}
return tw.rangeKeys(key, end, rev, ro)
}
func (tw *storeTxnWrite) DeleteRange(key, end []byte) (int64, int64) {
if n := tw.deleteRange(key, end); n != 0 || len(tw.changes) > 0 {
return n, int64(tw.beginRev + 1)
}
return 0, int64(tw.beginRev)
}
func (tw *storeTxnWrite) Put(key, value []byte, lease lease.LeaseID) int64 {
tw.put(key, value, lease)
return int64(tw.beginRev + 1)
}
func (tw *storeTxnWrite) End() {
// only update index if the txn modifies the mvcc state.
if len(tw.changes) != 0 {
tw.s.saveIndex(tw.tx)
// hold revMu lock to prevent new read txns from opening until writeback.
tw.s.revMu.Lock()
tw.s.currentRev++
}
tw.tx.Unlock()
if len(tw.changes) != 0 {
tw.s.revMu.Unlock()
}
tw.s.mu.RUnlock()
}
func (tr *storeTxnRead) rangeKeys(key, end []byte, curRev int64, ro RangeOptions) (*RangeResult, error) {
rev := ro.Rev
if rev > curRev {
return &RangeResult{KVs: nil, Count: -1, Rev: curRev}, ErrFutureRev
}
if rev <= 0 {
rev = curRev
}
if rev < tr.s.compactMainRev {
return &RangeResult{KVs: nil, Count: -1, Rev: 0}, ErrCompacted
}
_, revpairs := tr.s.kvindex.Range(key, end, int64(rev))
if len(revpairs) == 0 {
return &RangeResult{KVs: nil, Count: 0, Rev: curRev}, nil
}
if ro.Count {
return &RangeResult{KVs: nil, Count: len(revpairs), Rev: curRev}, nil
}
var kvs []mvccpb.KeyValue
for _, revpair := range revpairs {
start, end := revBytesRange(revpair)
_, vs := tr.tx.UnsafeRange(keyBucketName, start, end, 0)
if len(vs) != 1 {
plog.Fatalf("range cannot find rev (%d,%d)", revpair.main, revpair.sub)
}
var kv mvccpb.KeyValue
if err := kv.Unmarshal(vs[0]); err != nil {
plog.Fatalf("cannot unmarshal event: %v", err)
}
kvs = append(kvs, kv)
if ro.Limit > 0 && len(kvs) >= int(ro.Limit) {
break
}
}
return &RangeResult{KVs: kvs, Count: len(revpairs), Rev: curRev}, nil
}
func (tw *storeTxnWrite) put(key, value []byte, leaseID lease.LeaseID) {
rev := tw.beginRev + 1
c := rev
oldLease := lease.NoLease
// if the key exists before, use its previous created and
// get its previous leaseID
_, created, ver, err := tw.s.kvindex.Get(key, rev)
if err == nil {
c = created.main
oldLease = tw.s.le.GetLease(lease.LeaseItem{Key: string(key)})
}
ibytes := newRevBytes()
idxRev := revision{main: rev, sub: int64(len(tw.changes))}
revToBytes(idxRev, ibytes)
ver = ver + 1
kv := mvccpb.KeyValue{
Key: key,
Value: value,
CreateRevision: c,
ModRevision: rev,
Version: ver,
Lease: int64(leaseID),
}
d, err := kv.Marshal()
if err != nil {
plog.Fatalf("cannot marshal event: %v", err)
}
tw.tx.UnsafeSeqPut(keyBucketName, ibytes, d)
tw.s.kvindex.Put(key, idxRev)
tw.changes = append(tw.changes, kv)
if oldLease != lease.NoLease {
if tw.s.le == nil {
panic("no lessor to detach lease")
}
err = tw.s.le.Detach(oldLease, []lease.LeaseItem{{Key: string(key)}})
if err != nil {
plog.Errorf("unexpected error from lease detach: %v", err)
}
}
if leaseID != lease.NoLease {
if tw.s.le == nil {
panic("no lessor to attach lease")
}
err = tw.s.le.Attach(leaseID, []lease.LeaseItem{{Key: string(key)}})
if err != nil {
panic("unexpected error from lease Attach")
}
}
}
func (tw *storeTxnWrite) deleteRange(key, end []byte) int64 {
rrev := tw.beginRev
if len(tw.changes) > 0 {
rrev += 1
}
keys, revs := tw.s.kvindex.Range(key, end, rrev)
if len(keys) == 0 {
return 0
}
for i, key := range keys {
tw.delete(key, revs[i])
}
return int64(len(keys))
}
func (tw *storeTxnWrite) delete(key []byte, rev revision) {
ibytes := newRevBytes()
idxRev := revision{main: tw.beginRev + 1, sub: int64(len(tw.changes))}
revToBytes(idxRev, ibytes)
ibytes = appendMarkTombstone(ibytes)
kv := mvccpb.KeyValue{Key: key}
d, err := kv.Marshal()
if err != nil {
plog.Fatalf("cannot marshal event: %v", err)
}
tw.tx.UnsafeSeqPut(keyBucketName, ibytes, d)
err = tw.s.kvindex.Tombstone(key, idxRev)
if err != nil {
plog.Fatalf("cannot tombstone an existing key (%s): %v", string(key), err)
}
tw.changes = append(tw.changes, kv)
item := lease.LeaseItem{Key: string(key)}
leaseID := tw.s.le.GetLease(item)
if leaseID != lease.NoLease {
err = tw.s.le.Detach(leaseID, []lease.LeaseItem{item})
if err != nil {
plog.Errorf("cannot detach %v", err)
}
}
}
func (tw *storeTxnWrite) Changes() []mvccpb.KeyValue { return tw.changes }

174
vendor/github.com/coreos/etcd/mvcc/metrics.go generated vendored Normal file
View file

@ -0,0 +1,174 @@
// Copyright 2015 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package mvcc
import (
"sync"
"github.com/prometheus/client_golang/prometheus"
)
var (
rangeCounter = prometheus.NewCounter(
prometheus.CounterOpts{
Namespace: "etcd_debugging",
Subsystem: "mvcc",
Name: "range_total",
Help: "Total number of ranges seen by this member.",
})
putCounter = prometheus.NewCounter(
prometheus.CounterOpts{
Namespace: "etcd_debugging",
Subsystem: "mvcc",
Name: "put_total",
Help: "Total number of puts seen by this member.",
})
deleteCounter = prometheus.NewCounter(
prometheus.CounterOpts{
Namespace: "etcd_debugging",
Subsystem: "mvcc",
Name: "delete_total",
Help: "Total number of deletes seen by this member.",
})
txnCounter = prometheus.NewCounter(
prometheus.CounterOpts{
Namespace: "etcd_debugging",
Subsystem: "mvcc",
Name: "txn_total",
Help: "Total number of txns seen by this member.",
})
keysGauge = prometheus.NewGauge(
prometheus.GaugeOpts{
Namespace: "etcd_debugging",
Subsystem: "mvcc",
Name: "keys_total",
Help: "Total number of keys.",
})
watchStreamGauge = prometheus.NewGauge(
prometheus.GaugeOpts{
Namespace: "etcd_debugging",
Subsystem: "mvcc",
Name: "watch_stream_total",
Help: "Total number of watch streams.",
})
watcherGauge = prometheus.NewGauge(
prometheus.GaugeOpts{
Namespace: "etcd_debugging",
Subsystem: "mvcc",
Name: "watcher_total",
Help: "Total number of watchers.",
})
slowWatcherGauge = prometheus.NewGauge(
prometheus.GaugeOpts{
Namespace: "etcd_debugging",
Subsystem: "mvcc",
Name: "slow_watcher_total",
Help: "Total number of unsynced slow watchers.",
})
totalEventsCounter = prometheus.NewCounter(
prometheus.CounterOpts{
Namespace: "etcd_debugging",
Subsystem: "mvcc",
Name: "events_total",
Help: "Total number of events sent by this member.",
})
pendingEventsGauge = prometheus.NewGauge(
prometheus.GaugeOpts{
Namespace: "etcd_debugging",
Subsystem: "mvcc",
Name: "pending_events_total",
Help: "Total number of pending events to be sent.",
})
indexCompactionPauseDurations = prometheus.NewHistogram(
prometheus.HistogramOpts{
Namespace: "etcd_debugging",
Subsystem: "mvcc",
Name: "index_compaction_pause_duration_milliseconds",
Help: "Bucketed histogram of index compaction pause duration.",
// 0.5ms -> 1second
Buckets: prometheus.ExponentialBuckets(0.5, 2, 12),
})
dbCompactionPauseDurations = prometheus.NewHistogram(
prometheus.HistogramOpts{
Namespace: "etcd_debugging",
Subsystem: "mvcc",
Name: "db_compaction_pause_duration_milliseconds",
Help: "Bucketed histogram of db compaction pause duration.",
// 1ms -> 4second
Buckets: prometheus.ExponentialBuckets(1, 2, 13),
})
dbCompactionTotalDurations = prometheus.NewHistogram(
prometheus.HistogramOpts{
Namespace: "etcd_debugging",
Subsystem: "mvcc",
Name: "db_compaction_total_duration_milliseconds",
Help: "Bucketed histogram of db compaction total duration.",
// 100ms -> 800second
Buckets: prometheus.ExponentialBuckets(100, 2, 14),
})
dbTotalSize = prometheus.NewGaugeFunc(prometheus.GaugeOpts{
Namespace: "etcd_debugging",
Subsystem: "mvcc",
Name: "db_total_size_in_bytes",
Help: "Total size of the underlying database in bytes.",
},
func() float64 {
reportDbTotalSizeInBytesMu.RLock()
defer reportDbTotalSizeInBytesMu.RUnlock()
return reportDbTotalSizeInBytes()
},
)
// overridden by mvcc initialization
reportDbTotalSizeInBytesMu sync.RWMutex
reportDbTotalSizeInBytes func() float64 = func() float64 { return 0 }
)
func init() {
prometheus.MustRegister(rangeCounter)
prometheus.MustRegister(putCounter)
prometheus.MustRegister(deleteCounter)
prometheus.MustRegister(txnCounter)
prometheus.MustRegister(keysGauge)
prometheus.MustRegister(watchStreamGauge)
prometheus.MustRegister(watcherGauge)
prometheus.MustRegister(slowWatcherGauge)
prometheus.MustRegister(totalEventsCounter)
prometheus.MustRegister(pendingEventsGauge)
prometheus.MustRegister(indexCompactionPauseDurations)
prometheus.MustRegister(dbCompactionPauseDurations)
prometheus.MustRegister(dbCompactionTotalDurations)
prometheus.MustRegister(dbTotalSize)
}
// ReportEventReceived reports that an event is received.
// This function should be called when the external systems received an
// event from mvcc.Watcher.
func ReportEventReceived(n int) {
pendingEventsGauge.Sub(float64(n))
totalEventsCounter.Add(float64(n))
}

67
vendor/github.com/coreos/etcd/mvcc/metrics_txn.go generated vendored Normal file
View file

@ -0,0 +1,67 @@
// Copyright 2017 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package mvcc
import (
"github.com/coreos/etcd/lease"
)
type metricsTxnWrite struct {
TxnWrite
ranges uint
puts uint
deletes uint
}
func newMetricsTxnRead(tr TxnRead) TxnRead {
return &metricsTxnWrite{&txnReadWrite{tr}, 0, 0, 0}
}
func newMetricsTxnWrite(tw TxnWrite) TxnWrite {
return &metricsTxnWrite{tw, 0, 0, 0}
}
func (tw *metricsTxnWrite) Range(key, end []byte, ro RangeOptions) (*RangeResult, error) {
tw.ranges++
return tw.TxnWrite.Range(key, end, ro)
}
func (tw *metricsTxnWrite) DeleteRange(key, end []byte) (n, rev int64) {
tw.deletes++
return tw.TxnWrite.DeleteRange(key, end)
}
func (tw *metricsTxnWrite) Put(key, value []byte, lease lease.LeaseID) (rev int64) {
tw.puts++
return tw.TxnWrite.Put(key, value, lease)
}
func (tw *metricsTxnWrite) End() {
defer tw.TxnWrite.End()
if sum := tw.ranges + tw.puts + tw.deletes; sum != 1 {
if sum > 1 {
txnCounter.Inc()
}
return
}
switch {
case tw.ranges == 1:
rangeCounter.Inc()
case tw.puts == 1:
putCounter.Inc()
case tw.deletes == 1:
deleteCounter.Inc()
}
}

67
vendor/github.com/coreos/etcd/mvcc/revision.go generated vendored Normal file
View file

@ -0,0 +1,67 @@
// Copyright 2015 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package mvcc
import "encoding/binary"
// revBytesLen is the byte length of a normal revision.
// First 8 bytes is the revision.main in big-endian format. The 9th byte
// is a '_'. The last 8 bytes is the revision.sub in big-endian format.
const revBytesLen = 8 + 1 + 8
// A revision indicates modification of the key-value space.
// The set of changes that share same main revision changes the key-value space atomically.
type revision struct {
// main is the main revision of a set of changes that happen atomically.
main int64
// sub is the the sub revision of a change in a set of changes that happen
// atomically. Each change has different increasing sub revision in that
// set.
sub int64
}
func (a revision) GreaterThan(b revision) bool {
if a.main > b.main {
return true
}
if a.main < b.main {
return false
}
return a.sub > b.sub
}
func newRevBytes() []byte {
return make([]byte, revBytesLen, markedRevBytesLen)
}
func revToBytes(rev revision, bytes []byte) {
binary.BigEndian.PutUint64(bytes, uint64(rev.main))
bytes[8] = '_'
binary.BigEndian.PutUint64(bytes[9:], uint64(rev.sub))
}
func bytesToRev(bytes []byte) revision {
return revision{
main: int64(binary.BigEndian.Uint64(bytes[0:8])),
sub: int64(binary.BigEndian.Uint64(bytes[9:])),
}
}
type revisions []revision
func (a revisions) Len() int { return len(a) }
func (a revisions) Less(i, j int) bool { return a[j].GreaterThan(a[i]) }
func (a revisions) Swap(i, j int) { a[i], a[j] = a[j], a[i] }

56
vendor/github.com/coreos/etcd/mvcc/util.go generated vendored Normal file
View file

@ -0,0 +1,56 @@
// Copyright 2016 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package mvcc
import (
"encoding/binary"
"github.com/coreos/etcd/mvcc/backend"
"github.com/coreos/etcd/mvcc/mvccpb"
)
func UpdateConsistentIndex(be backend.Backend, index uint64) {
tx := be.BatchTx()
tx.Lock()
defer tx.Unlock()
var oldi uint64
_, vs := tx.UnsafeRange(metaBucketName, consistentIndexKeyName, nil, 0)
if len(vs) != 0 {
oldi = binary.BigEndian.Uint64(vs[0])
}
if index <= oldi {
return
}
bs := make([]byte, 8)
binary.BigEndian.PutUint64(bs, index)
tx.UnsafePut(metaBucketName, consistentIndexKeyName, bs)
}
func WriteKV(be backend.Backend, kv mvccpb.KeyValue) {
ibytes := newRevBytes()
revToBytes(revision{main: kv.ModRevision}, ibytes)
d, err := kv.Marshal()
if err != nil {
plog.Fatalf("cannot marshal event: %v", err)
}
be.BatchTx().Lock()
be.BatchTx().UnsafePut(keyBucketName, ibytes, d)
be.BatchTx().Unlock()
}

522
vendor/github.com/coreos/etcd/mvcc/watchable_store.go generated vendored Normal file
View file

@ -0,0 +1,522 @@
// Copyright 2015 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package mvcc
import (
"sync"
"time"
"github.com/coreos/etcd/lease"
"github.com/coreos/etcd/mvcc/backend"
"github.com/coreos/etcd/mvcc/mvccpb"
)
const (
// chanBufLen is the length of the buffered chan
// for sending out watched events.
// TODO: find a good buf value. 1024 is just a random one that
// seems to be reasonable.
chanBufLen = 1024
// maxWatchersPerSync is the number of watchers to sync in a single batch
maxWatchersPerSync = 512
)
type watchable interface {
watch(key, end []byte, startRev int64, id WatchID, ch chan<- WatchResponse, fcs ...FilterFunc) (*watcher, cancelFunc)
progress(w *watcher)
rev() int64
}
type watchableStore struct {
*store
// mu protects watcher groups and batches. It should never be locked
// before locking store.mu to avoid deadlock.
mu sync.RWMutex
// victims are watcher batches that were blocked on the watch channel
victims []watcherBatch
victimc chan struct{}
// contains all unsynced watchers that needs to sync with events that have happened
unsynced watcherGroup
// contains all synced watchers that are in sync with the progress of the store.
// The key of the map is the key that the watcher watches on.
synced watcherGroup
stopc chan struct{}
wg sync.WaitGroup
}
// cancelFunc updates unsynced and synced maps when running
// cancel operations.
type cancelFunc func()
func New(b backend.Backend, le lease.Lessor, ig ConsistentIndexGetter) ConsistentWatchableKV {
return newWatchableStore(b, le, ig)
}
func newWatchableStore(b backend.Backend, le lease.Lessor, ig ConsistentIndexGetter) *watchableStore {
s := &watchableStore{
store: NewStore(b, le, ig),
victimc: make(chan struct{}, 1),
unsynced: newWatcherGroup(),
synced: newWatcherGroup(),
stopc: make(chan struct{}),
}
s.store.ReadView = &readView{s}
s.store.WriteView = &writeView{s}
if s.le != nil {
// use this store as the deleter so revokes trigger watch events
s.le.SetRangeDeleter(func() lease.TxnDelete { return s.Write() })
}
s.wg.Add(2)
go s.syncWatchersLoop()
go s.syncVictimsLoop()
return s
}
func (s *watchableStore) Close() error {
close(s.stopc)
s.wg.Wait()
return s.store.Close()
}
func (s *watchableStore) NewWatchStream() WatchStream {
watchStreamGauge.Inc()
return &watchStream{
watchable: s,
ch: make(chan WatchResponse, chanBufLen),
cancels: make(map[WatchID]cancelFunc),
watchers: make(map[WatchID]*watcher),
}
}
func (s *watchableStore) watch(key, end []byte, startRev int64, id WatchID, ch chan<- WatchResponse, fcs ...FilterFunc) (*watcher, cancelFunc) {
wa := &watcher{
key: key,
end: end,
minRev: startRev,
id: id,
ch: ch,
fcs: fcs,
}
s.mu.Lock()
s.revMu.RLock()
synced := startRev > s.store.currentRev || startRev == 0
if synced {
wa.minRev = s.store.currentRev + 1
if startRev > wa.minRev {
wa.minRev = startRev
}
}
if synced {
s.synced.add(wa)
} else {
slowWatcherGauge.Inc()
s.unsynced.add(wa)
}
s.revMu.RUnlock()
s.mu.Unlock()
watcherGauge.Inc()
return wa, func() { s.cancelWatcher(wa) }
}
// cancelWatcher removes references of the watcher from the watchableStore
func (s *watchableStore) cancelWatcher(wa *watcher) {
for {
s.mu.Lock()
if s.unsynced.delete(wa) {
slowWatcherGauge.Dec()
break
} else if s.synced.delete(wa) {
break
} else if wa.compacted {
break
}
if !wa.victim {
panic("watcher not victim but not in watch groups")
}
var victimBatch watcherBatch
for _, wb := range s.victims {
if wb[wa] != nil {
victimBatch = wb
break
}
}
if victimBatch != nil {
slowWatcherGauge.Dec()
delete(victimBatch, wa)
break
}
// victim being processed so not accessible; retry
s.mu.Unlock()
time.Sleep(time.Millisecond)
}
watcherGauge.Dec()
s.mu.Unlock()
}
func (s *watchableStore) Restore(b backend.Backend) error {
s.mu.Lock()
defer s.mu.Unlock()
err := s.store.Restore(b)
if err != nil {
return err
}
for wa := range s.synced.watchers {
s.unsynced.watchers.add(wa)
}
s.synced = newWatcherGroup()
return nil
}
// syncWatchersLoop syncs the watcher in the unsynced map every 100ms.
func (s *watchableStore) syncWatchersLoop() {
defer s.wg.Done()
for {
s.mu.RLock()
st := time.Now()
lastUnsyncedWatchers := s.unsynced.size()
s.mu.RUnlock()
unsyncedWatchers := 0
if lastUnsyncedWatchers > 0 {
unsyncedWatchers = s.syncWatchers()
}
syncDuration := time.Since(st)
waitDuration := 100 * time.Millisecond
// more work pending?
if unsyncedWatchers != 0 && lastUnsyncedWatchers > unsyncedWatchers {
// be fair to other store operations by yielding time taken
waitDuration = syncDuration
}
select {
case <-time.After(waitDuration):
case <-s.stopc:
return
}
}
}
// syncVictimsLoop tries to write precomputed watcher responses to
// watchers that had a blocked watcher channel
func (s *watchableStore) syncVictimsLoop() {
defer s.wg.Done()
for {
for s.moveVictims() != 0 {
// try to update all victim watchers
}
s.mu.RLock()
isEmpty := len(s.victims) == 0
s.mu.RUnlock()
var tickc <-chan time.Time
if !isEmpty {
tickc = time.After(10 * time.Millisecond)
}
select {
case <-tickc:
case <-s.victimc:
case <-s.stopc:
return
}
}
}
// moveVictims tries to update watches with already pending event data
func (s *watchableStore) moveVictims() (moved int) {
s.mu.Lock()
victims := s.victims
s.victims = nil
s.mu.Unlock()
var newVictim watcherBatch
for _, wb := range victims {
// try to send responses again
for w, eb := range wb {
// watcher has observed the store up to, but not including, w.minRev
rev := w.minRev - 1
if w.send(WatchResponse{WatchID: w.id, Events: eb.evs, Revision: rev}) {
pendingEventsGauge.Add(float64(len(eb.evs)))
} else {
if newVictim == nil {
newVictim = make(watcherBatch)
}
newVictim[w] = eb
continue
}
moved++
}
// assign completed victim watchers to unsync/sync
s.mu.Lock()
s.store.revMu.RLock()
curRev := s.store.currentRev
for w, eb := range wb {
if newVictim != nil && newVictim[w] != nil {
// couldn't send watch response; stays victim
continue
}
w.victim = false
if eb.moreRev != 0 {
w.minRev = eb.moreRev
}
if w.minRev <= curRev {
s.unsynced.add(w)
} else {
slowWatcherGauge.Dec()
s.synced.add(w)
}
}
s.store.revMu.RUnlock()
s.mu.Unlock()
}
if len(newVictim) > 0 {
s.mu.Lock()
s.victims = append(s.victims, newVictim)
s.mu.Unlock()
}
return moved
}
// syncWatchers syncs unsynced watchers by:
// 1. choose a set of watchers from the unsynced watcher group
// 2. iterate over the set to get the minimum revision and remove compacted watchers
// 3. use minimum revision to get all key-value pairs and send those events to watchers
// 4. remove synced watchers in set from unsynced group and move to synced group
func (s *watchableStore) syncWatchers() int {
s.mu.Lock()
defer s.mu.Unlock()
if s.unsynced.size() == 0 {
return 0
}
s.store.revMu.RLock()
defer s.store.revMu.RUnlock()
// in order to find key-value pairs from unsynced watchers, we need to
// find min revision index, and these revisions can be used to
// query the backend store of key-value pairs
curRev := s.store.currentRev
compactionRev := s.store.compactMainRev
wg, minRev := s.unsynced.choose(maxWatchersPerSync, curRev, compactionRev)
minBytes, maxBytes := newRevBytes(), newRevBytes()
revToBytes(revision{main: minRev}, minBytes)
revToBytes(revision{main: curRev + 1}, maxBytes)
// UnsafeRange returns keys and values. And in boltdb, keys are revisions.
// values are actual key-value pairs in backend.
tx := s.store.b.ReadTx()
tx.Lock()
revs, vs := tx.UnsafeRange(keyBucketName, minBytes, maxBytes, 0)
evs := kvsToEvents(wg, revs, vs)
tx.Unlock()
var victims watcherBatch
wb := newWatcherBatch(wg, evs)
for w := range wg.watchers {
w.minRev = curRev + 1
eb, ok := wb[w]
if !ok {
// bring un-notified watcher to synced
s.synced.add(w)
s.unsynced.delete(w)
continue
}
if eb.moreRev != 0 {
w.minRev = eb.moreRev
}
if w.send(WatchResponse{WatchID: w.id, Events: eb.evs, Revision: curRev}) {
pendingEventsGauge.Add(float64(len(eb.evs)))
} else {
if victims == nil {
victims = make(watcherBatch)
}
w.victim = true
}
if w.victim {
victims[w] = eb
} else {
if eb.moreRev != 0 {
// stay unsynced; more to read
continue
}
s.synced.add(w)
}
s.unsynced.delete(w)
}
s.addVictim(victims)
vsz := 0
for _, v := range s.victims {
vsz += len(v)
}
slowWatcherGauge.Set(float64(s.unsynced.size() + vsz))
return s.unsynced.size()
}
// kvsToEvents gets all events for the watchers from all key-value pairs
func kvsToEvents(wg *watcherGroup, revs, vals [][]byte) (evs []mvccpb.Event) {
for i, v := range vals {
var kv mvccpb.KeyValue
if err := kv.Unmarshal(v); err != nil {
plog.Panicf("cannot unmarshal event: %v", err)
}
if !wg.contains(string(kv.Key)) {
continue
}
ty := mvccpb.PUT
if isTombstone(revs[i]) {
ty = mvccpb.DELETE
// patch in mod revision so watchers won't skip
kv.ModRevision = bytesToRev(revs[i]).main
}
evs = append(evs, mvccpb.Event{Kv: &kv, Type: ty})
}
return evs
}
// notify notifies the fact that given event at the given rev just happened to
// watchers that watch on the key of the event.
func (s *watchableStore) notify(rev int64, evs []mvccpb.Event) {
var victim watcherBatch
for w, eb := range newWatcherBatch(&s.synced, evs) {
if eb.revs != 1 {
plog.Panicf("unexpected multiple revisions in notification")
}
if w.send(WatchResponse{WatchID: w.id, Events: eb.evs, Revision: rev}) {
pendingEventsGauge.Add(float64(len(eb.evs)))
} else {
// move slow watcher to victims
w.minRev = rev + 1
if victim == nil {
victim = make(watcherBatch)
}
w.victim = true
victim[w] = eb
s.synced.delete(w)
slowWatcherGauge.Inc()
}
}
s.addVictim(victim)
}
func (s *watchableStore) addVictim(victim watcherBatch) {
if victim == nil {
return
}
s.victims = append(s.victims, victim)
select {
case s.victimc <- struct{}{}:
default:
}
}
func (s *watchableStore) rev() int64 { return s.store.Rev() }
func (s *watchableStore) progress(w *watcher) {
s.mu.RLock()
defer s.mu.RUnlock()
if _, ok := s.synced.watchers[w]; ok {
w.send(WatchResponse{WatchID: w.id, Revision: s.rev()})
// If the ch is full, this watcher is receiving events.
// We do not need to send progress at all.
}
}
type watcher struct {
// the watcher key
key []byte
// end indicates the end of the range to watch.
// If end is set, the watcher is on a range.
end []byte
// victim is set when ch is blocked and undergoing victim processing
victim bool
// compacted is set when the watcher is removed because of compaction
compacted bool
// minRev is the minimum revision update the watcher will accept
minRev int64
id WatchID
fcs []FilterFunc
// a chan to send out the watch response.
// The chan might be shared with other watchers.
ch chan<- WatchResponse
}
func (w *watcher) send(wr WatchResponse) bool {
progressEvent := len(wr.Events) == 0
if len(w.fcs) != 0 {
ne := make([]mvccpb.Event, 0, len(wr.Events))
for i := range wr.Events {
filtered := false
for _, filter := range w.fcs {
if filter(wr.Events[i]) {
filtered = true
break
}
}
if !filtered {
ne = append(ne, wr.Events[i])
}
}
wr.Events = ne
}
// if all events are filtered out, we should send nothing.
if !progressEvent && len(wr.Events) == 0 {
return true
}
select {
case w.ch <- wr:
return true
default:
return false
}
}

View file

@ -0,0 +1,53 @@
// Copyright 2017 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package mvcc
import (
"github.com/coreos/etcd/mvcc/mvccpb"
)
func (tw *watchableStoreTxnWrite) End() {
changes := tw.Changes()
if len(changes) == 0 {
tw.TxnWrite.End()
return
}
rev := tw.Rev() + 1
evs := make([]mvccpb.Event, len(changes))
for i, change := range changes {
evs[i].Kv = &changes[i]
if change.CreateRevision == 0 {
evs[i].Type = mvccpb.DELETE
evs[i].Kv.ModRevision = rev
} else {
evs[i].Type = mvccpb.PUT
}
}
// end write txn under watchable store lock so the updates are visible
// when asynchronous event posting checks the current store revision
tw.s.mu.Lock()
tw.s.notify(rev, evs)
tw.TxnWrite.End()
tw.s.mu.Unlock()
}
type watchableStoreTxnWrite struct {
TxnWrite
s *watchableStore
}
func (s *watchableStore) Write() TxnWrite { return &watchableStoreTxnWrite{s.store.Write(), s} }

171
vendor/github.com/coreos/etcd/mvcc/watcher.go generated vendored Normal file
View file

@ -0,0 +1,171 @@
// Copyright 2015 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package mvcc
import (
"bytes"
"errors"
"sync"
"github.com/coreos/etcd/mvcc/mvccpb"
)
var (
ErrWatcherNotExist = errors.New("mvcc: watcher does not exist")
)
type WatchID int64
// FilterFunc returns true if the given event should be filtered out.
type FilterFunc func(e mvccpb.Event) bool
type WatchStream interface {
// Watch creates a watcher. The watcher watches the events happening or
// happened on the given key or range [key, end) from the given startRev.
//
// The whole event history can be watched unless compacted.
// If `startRev` <=0, watch observes events after currentRev.
//
// The returned `id` is the ID of this watcher. It appears as WatchID
// in events that are sent to the created watcher through stream channel.
//
Watch(key, end []byte, startRev int64, fcs ...FilterFunc) WatchID
// Chan returns a chan. All watch response will be sent to the returned chan.
Chan() <-chan WatchResponse
// RequestProgress requests the progress of the watcher with given ID. The response
// will only be sent if the watcher is currently synced.
// The responses will be sent through the WatchRespone Chan attached
// with this stream to ensure correct ordering.
// The responses contains no events. The revision in the response is the progress
// of the watchers since the watcher is currently synced.
RequestProgress(id WatchID)
// Cancel cancels a watcher by giving its ID. If watcher does not exist, an error will be
// returned.
Cancel(id WatchID) error
// Close closes Chan and release all related resources.
Close()
// Rev returns the current revision of the KV the stream watches on.
Rev() int64
}
type WatchResponse struct {
// WatchID is the WatchID of the watcher this response sent to.
WatchID WatchID
// Events contains all the events that needs to send.
Events []mvccpb.Event
// Revision is the revision of the KV when the watchResponse is created.
// For a normal response, the revision should be the same as the last
// modified revision inside Events. For a delayed response to a unsynced
// watcher, the revision is greater than the last modified revision
// inside Events.
Revision int64
// CompactRevision is set when the watcher is cancelled due to compaction.
CompactRevision int64
}
// watchStream contains a collection of watchers that share
// one streaming chan to send out watched events and other control events.
type watchStream struct {
watchable watchable
ch chan WatchResponse
mu sync.Mutex // guards fields below it
// nextID is the ID pre-allocated for next new watcher in this stream
nextID WatchID
closed bool
cancels map[WatchID]cancelFunc
watchers map[WatchID]*watcher
}
// Watch creates a new watcher in the stream and returns its WatchID.
// TODO: return error if ws is closed?
func (ws *watchStream) Watch(key, end []byte, startRev int64, fcs ...FilterFunc) WatchID {
// prevent wrong range where key >= end lexicographically
// watch request with 'WithFromKey' has empty-byte range end
if len(end) != 0 && bytes.Compare(key, end) != -1 {
return -1
}
ws.mu.Lock()
defer ws.mu.Unlock()
if ws.closed {
return -1
}
id := ws.nextID
ws.nextID++
w, c := ws.watchable.watch(key, end, startRev, id, ws.ch, fcs...)
ws.cancels[id] = c
ws.watchers[id] = w
return id
}
func (ws *watchStream) Chan() <-chan WatchResponse {
return ws.ch
}
func (ws *watchStream) Cancel(id WatchID) error {
ws.mu.Lock()
cancel, ok := ws.cancels[id]
ok = ok && !ws.closed
if ok {
delete(ws.cancels, id)
delete(ws.watchers, id)
}
ws.mu.Unlock()
if !ok {
return ErrWatcherNotExist
}
cancel()
return nil
}
func (ws *watchStream) Close() {
ws.mu.Lock()
defer ws.mu.Unlock()
for _, cancel := range ws.cancels {
cancel()
}
ws.closed = true
close(ws.ch)
watchStreamGauge.Dec()
}
func (ws *watchStream) Rev() int64 {
ws.mu.Lock()
defer ws.mu.Unlock()
return ws.watchable.rev()
}
func (ws *watchStream) RequestProgress(id WatchID) {
ws.mu.Lock()
w, ok := ws.watchers[id]
ws.mu.Unlock()
if !ok {
return
}
ws.watchable.progress(w)
}

283
vendor/github.com/coreos/etcd/mvcc/watcher_group.go generated vendored Normal file
View file

@ -0,0 +1,283 @@
// Copyright 2016 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package mvcc
import (
"math"
"github.com/coreos/etcd/mvcc/mvccpb"
"github.com/coreos/etcd/pkg/adt"
)
var (
// watchBatchMaxRevs is the maximum distinct revisions that
// may be sent to an unsynced watcher at a time. Declared as
// var instead of const for testing purposes.
watchBatchMaxRevs = 1000
)
type eventBatch struct {
// evs is a batch of revision-ordered events
evs []mvccpb.Event
// revs is the minimum unique revisions observed for this batch
revs int
// moreRev is first revision with more events following this batch
moreRev int64
}
func (eb *eventBatch) add(ev mvccpb.Event) {
if eb.revs > watchBatchMaxRevs {
// maxed out batch size
return
}
if len(eb.evs) == 0 {
// base case
eb.revs = 1
eb.evs = append(eb.evs, ev)
return
}
// revision accounting
ebRev := eb.evs[len(eb.evs)-1].Kv.ModRevision
evRev := ev.Kv.ModRevision
if evRev > ebRev {
eb.revs++
if eb.revs > watchBatchMaxRevs {
eb.moreRev = evRev
return
}
}
eb.evs = append(eb.evs, ev)
}
type watcherBatch map[*watcher]*eventBatch
func (wb watcherBatch) add(w *watcher, ev mvccpb.Event) {
eb := wb[w]
if eb == nil {
eb = &eventBatch{}
wb[w] = eb
}
eb.add(ev)
}
// newWatcherBatch maps watchers to their matched events. It enables quick
// events look up by watcher.
func newWatcherBatch(wg *watcherGroup, evs []mvccpb.Event) watcherBatch {
if len(wg.watchers) == 0 {
return nil
}
wb := make(watcherBatch)
for _, ev := range evs {
for w := range wg.watcherSetByKey(string(ev.Kv.Key)) {
if ev.Kv.ModRevision >= w.minRev {
// don't double notify
wb.add(w, ev)
}
}
}
return wb
}
type watcherSet map[*watcher]struct{}
func (w watcherSet) add(wa *watcher) {
if _, ok := w[wa]; ok {
panic("add watcher twice!")
}
w[wa] = struct{}{}
}
func (w watcherSet) union(ws watcherSet) {
for wa := range ws {
w.add(wa)
}
}
func (w watcherSet) delete(wa *watcher) {
if _, ok := w[wa]; !ok {
panic("removing missing watcher!")
}
delete(w, wa)
}
type watcherSetByKey map[string]watcherSet
func (w watcherSetByKey) add(wa *watcher) {
set := w[string(wa.key)]
if set == nil {
set = make(watcherSet)
w[string(wa.key)] = set
}
set.add(wa)
}
func (w watcherSetByKey) delete(wa *watcher) bool {
k := string(wa.key)
if v, ok := w[k]; ok {
if _, ok := v[wa]; ok {
delete(v, wa)
if len(v) == 0 {
// remove the set; nothing left
delete(w, k)
}
return true
}
}
return false
}
// watcherGroup is a collection of watchers organized by their ranges
type watcherGroup struct {
// keyWatchers has the watchers that watch on a single key
keyWatchers watcherSetByKey
// ranges has the watchers that watch a range; it is sorted by interval
ranges adt.IntervalTree
// watchers is the set of all watchers
watchers watcherSet
}
func newWatcherGroup() watcherGroup {
return watcherGroup{
keyWatchers: make(watcherSetByKey),
watchers: make(watcherSet),
}
}
// add puts a watcher in the group.
func (wg *watcherGroup) add(wa *watcher) {
wg.watchers.add(wa)
if wa.end == nil {
wg.keyWatchers.add(wa)
return
}
// interval already registered?
ivl := adt.NewStringAffineInterval(string(wa.key), string(wa.end))
if iv := wg.ranges.Find(ivl); iv != nil {
iv.Val.(watcherSet).add(wa)
return
}
// not registered, put in interval tree
ws := make(watcherSet)
ws.add(wa)
wg.ranges.Insert(ivl, ws)
}
// contains is whether the given key has a watcher in the group.
func (wg *watcherGroup) contains(key string) bool {
_, ok := wg.keyWatchers[key]
return ok || wg.ranges.Intersects(adt.NewStringAffinePoint(key))
}
// size gives the number of unique watchers in the group.
func (wg *watcherGroup) size() int { return len(wg.watchers) }
// delete removes a watcher from the group.
func (wg *watcherGroup) delete(wa *watcher) bool {
if _, ok := wg.watchers[wa]; !ok {
return false
}
wg.watchers.delete(wa)
if wa.end == nil {
wg.keyWatchers.delete(wa)
return true
}
ivl := adt.NewStringAffineInterval(string(wa.key), string(wa.end))
iv := wg.ranges.Find(ivl)
if iv == nil {
return false
}
ws := iv.Val.(watcherSet)
delete(ws, wa)
if len(ws) == 0 {
// remove interval missing watchers
if ok := wg.ranges.Delete(ivl); !ok {
panic("could not remove watcher from interval tree")
}
}
return true
}
// choose selects watchers from the watcher group to update
func (wg *watcherGroup) choose(maxWatchers int, curRev, compactRev int64) (*watcherGroup, int64) {
if len(wg.watchers) < maxWatchers {
return wg, wg.chooseAll(curRev, compactRev)
}
ret := newWatcherGroup()
for w := range wg.watchers {
if maxWatchers <= 0 {
break
}
maxWatchers--
ret.add(w)
}
return &ret, ret.chooseAll(curRev, compactRev)
}
func (wg *watcherGroup) chooseAll(curRev, compactRev int64) int64 {
minRev := int64(math.MaxInt64)
for w := range wg.watchers {
if w.minRev > curRev {
panic("watcher current revision should not exceed current revision")
}
if w.minRev < compactRev {
select {
case w.ch <- WatchResponse{WatchID: w.id, CompactRevision: compactRev}:
w.compacted = true
wg.delete(w)
default:
// retry next time
}
continue
}
if minRev > w.minRev {
minRev = w.minRev
}
}
return minRev
}
// watcherSetByKey gets the set of watchers that receive events on the given key.
func (wg *watcherGroup) watcherSetByKey(key string) watcherSet {
wkeys := wg.keyWatchers[key]
wranges := wg.ranges.Stab(adt.NewStringAffinePoint(key))
// zero-copy cases
switch {
case len(wranges) == 0:
// no need to merge ranges or copy; reuse single-key set
return wkeys
case len(wranges) == 0 && len(wkeys) == 0:
return nil
case len(wranges) == 1 && len(wkeys) == 0:
return wranges[0].Val.(watcherSet)
}
// copy case
ret := make(watcherSet)
ret.union(wg.keyWatchers[key])
for _, item := range wranges {
ret.union(item.Val.(watcherSet))
}
return ret
}