Vendor integration dependencies.

This commit is contained in:
Timo Reimann 2017-02-07 22:33:23 +01:00
parent dd5e3fba01
commit 55b57c736b
2451 changed files with 731611 additions and 0 deletions

View file

@ -0,0 +1,24 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/*
The detector package houses implementation of master detectors.
The default implementation is the zookeeper master detector.
It uses zookeeper to detect the lead Mesos master during startup/failover.
*/
package detector

View file

@ -0,0 +1,155 @@
package detector
import (
"encoding/binary"
"errors"
"fmt"
"io/ioutil"
"net"
"strconv"
"strings"
"sync"
"github.com/gogo/protobuf/proto"
log "github.com/golang/glog"
mesos "github.com/mesos/mesos-go/mesosproto"
util "github.com/mesos/mesos-go/mesosutil"
"github.com/mesos/mesos-go/upid"
)
var (
pluginLock sync.Mutex
plugins = map[string]PluginFactory{}
EmptySpecError = errors.New("empty master specification")
defaultFactory = PluginFactory(func(spec string) (Master, error) {
if len(spec) == 0 {
return nil, EmptySpecError
}
if strings.Index(spec, "@") < 0 {
spec = "master@" + spec
}
if pid, err := upid.Parse(spec); err == nil {
return NewStandalone(CreateMasterInfo(pid)), nil
} else {
return nil, err
}
})
)
type PluginFactory func(string) (Master, error)
// associates a plugin implementation with a Master specification prefix.
// packages that provide plugins are expected to invoke this func within
// their init() implementation. schedulers that wish to support plugins may
// anonymously import ("_") a package the auto-registers said plugins.
func Register(prefix string, f PluginFactory) error {
if prefix == "" {
return fmt.Errorf("illegal prefix: '%v'", prefix)
}
if f == nil {
return fmt.Errorf("nil plugin factories are not allowed")
}
pluginLock.Lock()
defer pluginLock.Unlock()
if _, found := plugins[prefix]; found {
return fmt.Errorf("detection plugin already registered for prefix '%s'", prefix)
}
plugins[prefix] = f
return nil
}
// Create a new detector given the provided specification. Examples are:
//
// - file://{path_to_local_file}
// - {ipaddress}:{port}
// - master@{ip_address}:{port}
// - master({id})@{ip_address}:{port}
//
// Support for the file:// prefix is intentionally hardcoded so that it may
// not be inadvertently overridden by a custom plugin implementation. Custom
// plugins are supported via the Register and MatchingPlugin funcs.
//
// Furthermore it is expected that master detectors returned from this func
// are not yet running and will only begin to spawn requisite background
// processing upon, or some time after, the first invocation of their Detect.
//
func New(spec string) (m Master, err error) {
if strings.HasPrefix(spec, "file://") {
var body []byte
path := spec[7:]
body, err = ioutil.ReadFile(path)
if err != nil {
log.V(1).Infof("failed to read from file at '%s'", path)
} else {
m, err = New(string(body))
}
} else if f, ok := MatchingPlugin(spec); ok {
m, err = f(spec)
} else {
m, err = defaultFactory(spec)
}
return
}
func MatchingPlugin(spec string) (PluginFactory, bool) {
pluginLock.Lock()
defer pluginLock.Unlock()
for prefix, f := range plugins {
if strings.HasPrefix(spec, prefix) {
return f, true
}
}
return nil, false
}
// Super-useful utility func that attempts to build a mesos.MasterInfo from a
// upid.UPID specification. An attempt is made to determine the IP address of
// the UPID's Host and any errors during such resolution will result in a nil
// returned result. A nil result is also returned upon errors parsing the Port
// specification of the UPID.
//
// TODO(jdef) make this a func of upid.UPID so that callers can invoke somePid.MasterInfo()?
//
func CreateMasterInfo(pid *upid.UPID) *mesos.MasterInfo {
if pid == nil {
return nil
}
port, err := strconv.Atoi(pid.Port)
if err != nil {
log.Errorf("failed to parse port: %v", err)
return nil
}
//TODO(jdef) what about (future) ipv6 support?
var ipv4 net.IP
if ipv4 = net.ParseIP(pid.Host); ipv4 != nil {
// This is needed for the people cross-compiling from macos to linux.
// The cross-compiled version of net.LookupIP() fails to handle plain IPs.
// See https://github.com/mesos/mesos-go/pull/117
} else if addrs, err := net.LookupIP(pid.Host); err == nil {
for _, ip := range addrs {
if ip = ip.To4(); ip != nil {
ipv4 = ip
break
}
}
if ipv4 == nil {
log.Errorf("host does not resolve to an IPv4 address: %v", pid.Host)
return nil
}
} else {
log.Errorf("failed to lookup IPs for host '%v': %v", pid.Host, err)
return nil
}
packedip := binary.BigEndian.Uint32(ipv4) // network byte order is big-endian
mi := util.NewMasterInfo(pid.ID, packedip, uint32(port))
mi.Pid = proto.String(pid.String())
if pid.Host != "" {
mi.Hostname = proto.String(pid.Host)
}
return mi
}

View file

@ -0,0 +1,68 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package detector
import (
mesos "github.com/mesos/mesos-go/mesosproto"
)
type MasterChanged interface {
// Invoked when the master changes
OnMasterChanged(*mesos.MasterInfo)
}
// AllMasters defines an optional interface that, if implemented by the same
// struct as implements MasterChanged, will receive an additional callbacks
// independently of leadership changes. it's possible that, as a result of a
// leadership change, both the OnMasterChanged and UpdatedMasters callbacks
// would be invoked.
//
// **NOTE:** Detector implementations are not required to support this optional
// interface. Please RTFM of the detector implementation that you want to use.
type AllMasters interface {
// UpdatedMasters is invoked upon a change in the membership of mesos
// masters, and is useful to clients that wish to know the entire set
// of Mesos masters currently running.
UpdatedMasters([]*mesos.MasterInfo)
}
// func/interface adapter
type OnMasterChanged func(*mesos.MasterInfo)
func (f OnMasterChanged) OnMasterChanged(mi *mesos.MasterInfo) {
f(mi)
}
// An abstraction of a Master detector which can be used to
// detect the leading master from a group.
type Master interface {
// Detect new master election. Every time a new master is elected, the
// detector will alert the observer. The first call to Detect is expected
// to kickstart any background detection processing (and not before then).
// If detection startup fails, or the listener cannot be added, then an
// error is returned.
Detect(MasterChanged) error
// returns a chan that, when closed, indicates the detector has terminated
Done() <-chan struct{}
// cancel the detector. it's ok to call this multiple times, or even if
// Detect() hasn't been invoked yet.
Cancel()
}

View file

@ -0,0 +1,244 @@
package detector
import (
"encoding/binary"
"encoding/json"
"fmt"
"io/ioutil"
"net"
"net/http"
"strconv"
"sync"
"time"
log "github.com/golang/glog"
mesos "github.com/mesos/mesos-go/mesosproto"
"github.com/mesos/mesos-go/upid"
"golang.org/x/net/context"
)
const (
defaultMesosHttpClientTimeout = 10 * time.Second //TODO(jdef) configurable via fiag?
defaultMesosLeaderSyncInterval = 30 * time.Second //TODO(jdef) configurable via fiag?
defaultMesosMasterPort = 5050
)
// enables easier unit testing
type fetcherFunc func(ctx context.Context, address string) (*upid.UPID, error)
type Standalone struct {
ch chan *mesos.MasterInfo
client *http.Client
tr *http.Transport
pollOnce sync.Once
initial *mesos.MasterInfo
done chan struct{}
cancelOnce sync.Once
leaderSyncInterval time.Duration
httpClientTimeout time.Duration
assumedMasterPort int
poller func(pf fetcherFunc)
fetchPid fetcherFunc
}
// Create a new stand alone master detector.
func NewStandalone(mi *mesos.MasterInfo) *Standalone {
log.V(2).Infof("creating new standalone detector for %+v", mi)
stand := &Standalone{
ch: make(chan *mesos.MasterInfo),
tr: &http.Transport{},
initial: mi,
done: make(chan struct{}),
leaderSyncInterval: defaultMesosLeaderSyncInterval,
httpClientTimeout: defaultMesosHttpClientTimeout,
assumedMasterPort: defaultMesosMasterPort,
}
stand.poller = stand._poller
stand.fetchPid = stand._fetchPid
return stand
}
func (s *Standalone) String() string {
return fmt.Sprintf("{initial: %+v}", s.initial)
}
// Detecting the new master.
func (s *Standalone) Detect(o MasterChanged) error {
log.V(2).Info("Detect()")
s.pollOnce.Do(func() {
log.V(1).Info("spinning up asyc master detector poller")
// delayed initialization allows unit tests to modify timeouts before detection starts
s.client = &http.Client{
Transport: s.tr,
Timeout: s.httpClientTimeout,
}
go s.poller(s.fetchPid)
})
if o != nil {
log.V(1).Info("spawning asyc master detector listener")
go func() {
log.V(2).Infof("waiting for polled to send updates")
pollWaiter:
for {
select {
case mi, ok := <-s.ch:
if !ok {
break pollWaiter
}
log.V(1).Infof("detected master change: %+v", mi)
o.OnMasterChanged(mi)
case <-s.done:
return
}
}
o.OnMasterChanged(nil)
}()
} else {
log.Warningf("detect called with a nil master change listener")
}
return nil
}
func (s *Standalone) Done() <-chan struct{} {
return s.done
}
func (s *Standalone) Cancel() {
s.cancelOnce.Do(func() { close(s.done) })
}
// poll for changes to master leadership via current leader's /state.json endpoint.
// we poll the `initial` leader, aborting if none was specified.
//
// TODO(jdef) follow the leader: change who we poll based on the prior leader
// TODO(jdef) somehow determine all masters in cluster from the state.json?
//
func (s *Standalone) _poller(pf fetcherFunc) {
defer func() {
defer s.Cancel()
log.Warning("shutting down standalone master detection")
}()
if s.initial == nil {
log.Errorf("aborting master poller since initial master info is nil")
return
}
addr := s.initial.GetHostname()
if len(addr) == 0 {
if s.initial.GetIp() == 0 {
log.Warningf("aborted mater poller since initial master info has no host")
return
}
ip := make([]byte, 4)
binary.BigEndian.PutUint32(ip, s.initial.GetIp())
addr = net.IP(ip).To4().String()
}
port := uint32(s.assumedMasterPort)
if s.initial.Port != nil && *s.initial.Port != 0 {
port = *s.initial.Port
}
addr = net.JoinHostPort(addr, strconv.Itoa(int(port)))
log.V(1).Infof("polling for master leadership at '%v'", addr)
var lastpid *upid.UPID
for {
startedAt := time.Now()
ctx, cancel := context.WithTimeout(context.Background(), s.leaderSyncInterval)
if pid, err := pf(ctx, addr); err == nil {
if !pid.Equal(lastpid) {
log.V(2).Infof("detected leadership change from '%v' to '%v'", lastpid, pid)
lastpid = pid
elapsed := time.Now().Sub(startedAt)
mi := CreateMasterInfo(pid)
select {
case s.ch <- mi: // noop
case <-time.After(s.leaderSyncInterval - elapsed):
// no one heard the master change, oh well - poll again
goto continuePolling
case <-s.done:
cancel()
return
}
} else {
log.V(2).Infof("no change to master leadership: '%v'", lastpid)
}
} else if err == context.DeadlineExceeded {
if lastpid != nil {
lastpid = nil
select {
case s.ch <- nil: // lost master
case <-s.done: // no need to cancel ctx
return
}
}
goto continuePolling
} else {
select {
case <-s.done:
cancel()
return
default:
if err != context.Canceled {
log.Error(err)
}
}
}
if remaining := s.leaderSyncInterval - time.Now().Sub(startedAt); remaining > 0 {
log.V(3).Infof("master leader poller sleeping for %v", remaining)
time.Sleep(remaining)
}
continuePolling:
cancel()
}
}
// assumes that address is in host:port format
func (s *Standalone) _fetchPid(ctx context.Context, address string) (*upid.UPID, error) {
//TODO(jdef) need SSL support
uri := fmt.Sprintf("http://%s/state.json", address)
req, err := http.NewRequest("GET", uri, nil)
if err != nil {
return nil, err
}
var pid *upid.UPID
err = s.httpDo(ctx, req, func(res *http.Response, err error) error {
if err != nil {
return err
}
defer res.Body.Close()
if res.StatusCode != 200 {
return fmt.Errorf("HTTP request failed with code %d: %v", res.StatusCode, res.Status)
}
blob, err1 := ioutil.ReadAll(res.Body)
if err1 != nil {
return err1
}
log.V(3).Infof("Got mesos state, content length %v", len(blob))
type State struct {
Leader string `json:"leader"` // ex: master(1)@10.22.211.18:5050
}
state := &State{}
err = json.Unmarshal(blob, state)
if err != nil {
return err
}
pid, err = upid.Parse(state.Leader)
return err
})
return pid, err
}
type responseHandler func(*http.Response, error) error
// hacked from https://blog.golang.org/context
func (s *Standalone) httpDo(ctx context.Context, req *http.Request, f responseHandler) error {
// Run the HTTP request in a goroutine and pass the response to f.
ch := make(chan error, 1)
go func() { ch <- f(s.client.Do(req)) }()
select {
case <-ctx.Done():
s.tr.CancelRequest(req)
<-ch // Wait for f to return.
return ctx.Err()
case err := <-ch:
return err
}
}

View file

@ -0,0 +1,88 @@
package zoo
import (
"sync"
"time"
"github.com/samuel/go-zookeeper/zk"
)
const (
defaultSessionTimeout = 60 * time.Second
currentPath = "."
)
var zkSessionTimeout = defaultSessionTimeout
type client2 struct {
*zk.Conn
path string
done chan struct{} // signal chan, closes when the underlying connection terminates
stopOnce sync.Once
}
func connect2(hosts []string, path string) (*client2, error) {
c, ev, err := zk.Connect(hosts, zkSessionTimeout)
if err != nil {
return nil, err
}
done := make(chan struct{})
go func() {
// close the 'done' chan when the zk event chan closes (signals termination of zk connection)
defer close(done)
for {
if _, ok := <-ev; !ok {
return
}
}
}()
return &client2{
Conn: c,
path: path,
done: done,
}, nil
}
func (c *client2) stopped() <-chan struct{} {
return c.done
}
func (c *client2) stop() {
c.stopOnce.Do(c.Close)
}
func (c *client2) data(path string) (data []byte, err error) {
data, _, err = c.Get(path)
return
}
func (c *client2) watchChildren(path string) (string, <-chan []string, <-chan error) {
errCh := make(chan error, 1)
snap := make(chan []string)
watchPath := c.path
if path != "" && path != currentPath {
watchPath = watchPath + path
}
go func() {
defer close(errCh)
for {
children, _, ev, err := c.ChildrenW(watchPath)
if err != nil {
errCh <- err
return
}
select {
case snap <- children:
case <-c.done:
return
}
e := <-ev // wait for the next watch-related event
if e.Err != nil {
errCh <- e.Err
return
}
}
}()
return watchPath, snap, errCh
}

View file

@ -0,0 +1,369 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package zoo
import (
"encoding/json"
"fmt"
"math"
"net/url"
"strconv"
"strings"
"sync"
"sync/atomic"
"time"
"github.com/gogo/protobuf/proto"
log "github.com/golang/glog"
"github.com/mesos/mesos-go/detector"
mesos "github.com/mesos/mesos-go/mesosproto"
)
const (
// prefix for nodes listed at the ZK URL path
nodePrefix = "info_"
nodeJSONPrefix = "json.info_"
defaultMinDetectorCyclePeriod = 1 * time.Second
)
// reasonable default for a noop change listener
var ignoreChanged = detector.OnMasterChanged(func(*mesos.MasterInfo) {})
type zkInterface interface {
stopped() <-chan struct{}
stop()
data(string) ([]byte, error)
watchChildren(string) (string, <-chan []string, <-chan error)
}
type infoCodec func(path, node string) (*mesos.MasterInfo, error)
// Detector uses ZooKeeper to detect new leading master.
type MasterDetector struct {
client zkInterface
leaderNode string
bootstrapLock sync.RWMutex // guard against concurrent invocations of bootstrapFunc
bootstrapFunc func() error // for one-time zk client initiation
// latch: only install, at most, one ignoreChanged listener; see MasterDetector.Detect
ignoreInstalled int32
// detection should not signal master change listeners more frequently than this
minDetectorCyclePeriod time.Duration
done chan struct{}
cancel func()
}
// Internal constructor function
func NewMasterDetector(zkurls string) (*MasterDetector, error) {
zkHosts, zkPath, err := parseZk(zkurls)
if err != nil {
log.Fatalln("Failed to parse url", err)
return nil, err
}
detector := &MasterDetector{
minDetectorCyclePeriod: defaultMinDetectorCyclePeriod,
done: make(chan struct{}),
cancel: func() {},
}
detector.bootstrapFunc = func() (err error) {
if detector.client == nil {
detector.client, err = connect2(zkHosts, zkPath)
}
return
}
log.V(2).Infoln("Created new detector to watch", zkHosts, zkPath)
return detector, nil
}
func parseZk(zkurls string) ([]string, string, error) {
u, err := url.Parse(zkurls)
if err != nil {
log.V(1).Infof("failed to parse url: %v", err)
return nil, "", err
}
if u.Scheme != "zk" {
return nil, "", fmt.Errorf("invalid url scheme for zk url: '%v'", u.Scheme)
}
return strings.Split(u.Host, ","), u.Path, nil
}
// returns a chan that, when closed, indicates termination of the detector
func (md *MasterDetector) Done() <-chan struct{} {
return md.done
}
func (md *MasterDetector) Cancel() {
md.bootstrapLock.RLock()
defer md.bootstrapLock.RUnlock()
md.cancel()
}
func (md *MasterDetector) childrenChanged(path string, list []string, obs detector.MasterChanged) {
md.notifyMasterChanged(path, list, obs)
md.notifyAllMasters(path, list, obs)
}
func (md *MasterDetector) notifyMasterChanged(path string, list []string, obs detector.MasterChanged) {
// mesos v0.24 writes JSON only, v0.23 writes json and protobuf, v0.22 and prior only write protobuf
topNode, codec := md.selectTopNode(list)
if md.leaderNode == topNode {
log.V(2).Infof("ignoring children-changed event, leader has not changed: %v", path)
return
}
log.V(2).Infof("changing leader node from %q -> %q", md.leaderNode, topNode)
md.leaderNode = topNode
var masterInfo *mesos.MasterInfo
if md.leaderNode != "" {
var err error
if masterInfo, err = codec(path, topNode); err != nil {
log.Errorln(err.Error())
}
}
log.V(2).Infof("detected master info: %+v", masterInfo)
logPanic(func() { obs.OnMasterChanged(masterInfo) })
}
// logPanic safely executes the given func, recovering from and logging a panic if one occurs.
func logPanic(f func()) {
defer func() {
if r := recover(); r != nil {
log.Errorf("recovered from client panic: %v", r)
}
}()
f()
}
func (md *MasterDetector) pullMasterInfo(path, node string) (*mesos.MasterInfo, error) {
data, err := md.client.data(fmt.Sprintf("%s/%s", path, node))
if err != nil {
return nil, fmt.Errorf("failed to retrieve leader data: %v", err)
}
masterInfo := &mesos.MasterInfo{}
err = proto.Unmarshal(data, masterInfo)
if err != nil {
return nil, fmt.Errorf("failed to unmarshal protobuf MasterInfo data from zookeeper: %v", err)
}
return masterInfo, nil
}
func (md *MasterDetector) pullMasterJsonInfo(path, node string) (*mesos.MasterInfo, error) {
data, err := md.client.data(fmt.Sprintf("%s/%s", path, node))
if err != nil {
return nil, fmt.Errorf("failed to retrieve leader data: %v", err)
}
masterInfo := &mesos.MasterInfo{}
err = json.Unmarshal(data, masterInfo)
if err != nil {
return nil, fmt.Errorf("failed to unmarshal json MasterInfo data from zookeeper: %v", err)
}
return masterInfo, nil
}
func (md *MasterDetector) notifyAllMasters(path string, list []string, obs detector.MasterChanged) {
all, ok := obs.(detector.AllMasters)
if !ok {
// not interested in entire master list
return
}
// mesos v0.24 writes JSON only, v0.23 writes json and protobuf, v0.22 and prior only write protobuf
masters := map[string]*mesos.MasterInfo{}
tryStore := func(node string, codec infoCodec) {
info, err := codec(path, node)
if err != nil {
log.Errorln(err.Error())
} else {
masters[info.GetId()] = info
}
}
for _, node := range list {
// compare https://github.com/apache/mesos/blob/0.23.0/src/master/detector.cpp#L437
if strings.HasPrefix(node, nodePrefix) {
tryStore(node, md.pullMasterInfo)
} else if strings.HasPrefix(node, nodeJSONPrefix) {
tryStore(node, md.pullMasterJsonInfo)
} else {
continue
}
}
masterList := make([]*mesos.MasterInfo, 0, len(masters))
for _, v := range masters {
masterList = append(masterList, v)
}
log.V(2).Infof("notifying of master membership change: %+v", masterList)
logPanic(func() { all.UpdatedMasters(masterList) })
}
func (md *MasterDetector) callBootstrap() (e error) {
log.V(2).Infoln("invoking detector boostrap")
md.bootstrapLock.Lock()
defer md.bootstrapLock.Unlock()
clientConfigured := md.client != nil
if e = md.bootstrapFunc(); e == nil && !clientConfigured && md.client != nil {
// chain the lifetime of this detector to that of the newly created client impl
client := md.client
md.cancel = client.stop
go func() {
defer close(md.done)
<-client.stopped()
}()
}
return
}
// the first call to Detect will kickstart a connection to zookeeper. a nil change listener may
// be spec'd, result of which is a detector that will still listen for master changes and record
// leaderhip changes internally but no listener would be notified. Detect may be called more than
// once, and each time the spec'd listener will be added to the list of those receiving notifications.
func (md *MasterDetector) Detect(f detector.MasterChanged) (err error) {
// kickstart zk client connectivity
if err := md.callBootstrap(); err != nil {
log.V(3).Infoln("failed to execute bootstrap function", err.Error())
return err
}
if f == nil {
// only ever install, at most, one ignoreChanged listener. multiple instances of it
// just consume resources and generate misleading log messages.
if !atomic.CompareAndSwapInt32(&md.ignoreInstalled, 0, 1) {
log.V(3).Infoln("ignoreChanged listener already installed")
return
}
f = ignoreChanged
}
log.V(3).Infoln("spawning detect()")
go md.detect(f)
return nil
}
func (md *MasterDetector) detect(f detector.MasterChanged) {
log.V(3).Infoln("detecting children at", currentPath)
detectLoop:
for {
select {
case <-md.Done():
return
default:
}
log.V(3).Infoln("watching children at", currentPath)
path, childrenCh, errCh := md.client.watchChildren(currentPath)
rewatch := false
for {
started := time.Now()
select {
case children := <-childrenCh:
md.childrenChanged(path, children, f)
case err, ok := <-errCh:
// check for a tie first (required for predictability (tests)); the downside of
// doing this is that a listener might get two callbacks back-to-back ("new leader",
// followed by "no leader").
select {
case children := <-childrenCh:
md.childrenChanged(path, children, f)
default:
}
if ok {
log.V(1).Infoln("child watch ended with error, master lost; error was:", err.Error())
} else {
// detector shutdown likely...
log.V(1).Infoln("child watch ended, master lost")
}
select {
case <-md.Done():
return
default:
if md.leaderNode != "" {
log.V(2).Infof("changing leader node from %q -> \"\"", md.leaderNode)
md.leaderNode = ""
f.OnMasterChanged(nil)
}
}
rewatch = true
}
// rate-limit master changes
if elapsed := time.Now().Sub(started); elapsed > 0 {
log.V(2).Infoln("resting before next detection cycle")
select {
case <-md.Done():
return
case <-time.After(md.minDetectorCyclePeriod - elapsed): // noop
}
}
if rewatch {
continue detectLoop
}
}
}
}
func (md *MasterDetector) selectTopNode(list []string) (topNode string, codec infoCodec) {
// mesos v0.24 writes JSON only, v0.23 writes json and protobuf, v0.22 and prior only write protobuf
topNode = selectTopNodePrefix(list, nodeJSONPrefix)
codec = md.pullMasterJsonInfo
if topNode == "" {
topNode = selectTopNodePrefix(list, nodePrefix)
codec = md.pullMasterInfo
if topNode != "" {
log.Warningf("Leading master is using a Protobuf binary format when registering "+
"with Zookeeper (%s): this will be deprecated as of Mesos 0.24 (see MESOS-2340).",
topNode)
}
}
return
}
func selectTopNodePrefix(list []string, pre string) (node string) {
var leaderSeq uint64 = math.MaxUint64
for _, v := range list {
if !strings.HasPrefix(v, pre) {
continue // only care about participants
}
seqStr := strings.TrimPrefix(v, pre)
seq, err := strconv.ParseUint(seqStr, 10, 64)
if err != nil {
log.Warningf("unexpected zk node format '%s': %v", seqStr, err)
continue
}
if seq < leaderSeq {
leaderSeq = seq
node = v
}
}
if node == "" {
log.V(3).Infoln("No top node found.")
} else {
log.V(3).Infof("Top node selected: '%s'", node)
}
return node
}

View file

@ -0,0 +1,3 @@
// Zookeeper-based mesos-master leaderhip detection.
// Implements support for optional detector.AllMasters interface.
package zoo

View file

@ -0,0 +1,71 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package zoo
import (
"github.com/samuel/go-zookeeper/zk"
"github.com/stretchr/testify/mock"
)
// Impersontates a zk.Connection
// It implements interface Connector
type MockConnector struct {
mock.Mock
}
func NewMockConnector() *MockConnector {
return new(MockConnector)
}
func (conn *MockConnector) Close() {
conn.Called()
}
func (conn *MockConnector) ChildrenW(path string) ([]string, *zk.Stat, <-chan zk.Event, error) {
args := conn.Called(path)
var (
arg0 []string
arg1 *zk.Stat
arg2 <-chan zk.Event
)
if args.Get(0) != nil {
arg0 = args.Get(0).([]string)
}
if args.Get(1) != nil {
arg1 = args.Get(1).(*zk.Stat)
}
if args.Get(2) != nil {
arg2 = args.Get(2).(<-chan zk.Event)
}
return arg0, arg1, arg2, args.Error(3)
}
func (conn *MockConnector) Children(path string) ([]string, *zk.Stat, error) {
args := conn.Called(path)
return args.Get(0).([]string),
args.Get(1).(*zk.Stat),
args.Error(2)
}
func (conn *MockConnector) Get(path string) ([]byte, *zk.Stat, error) {
args := conn.Called(path)
return args.Get(0).([]byte),
args.Get(1).(*zk.Stat),
args.Error(2)
}

View file

@ -0,0 +1,11 @@
package zoo
import (
"github.com/mesos/mesos-go/detector"
)
func init() {
detector.Register("zk://", detector.PluginFactory(func(spec string) (detector.Master, error) {
return NewMasterDetector(spec)
}))
}

View file

@ -0,0 +1,27 @@
package zoo
import (
"github.com/samuel/go-zookeeper/zk"
)
// Connector Interface to facade zk.Conn type
// since github.com/samuel/go-zookeeper/zk does not provide an interface
// for the zk.Conn object, this allows for mocking and easier testing.
type Connector interface {
Close()
Children(string) ([]string, *zk.Stat, error)
ChildrenW(string) ([]string, *zk.Stat, <-chan zk.Event, error)
Get(string) ([]byte, *zk.Stat, error)
}
//Factory is an adapter to trap the creation of zk.Conn instances
//since the official zk API does not expose an interface for zk.Conn.
type Factory interface {
create() (Connector, <-chan zk.Event, error)
}
type asFactory func() (Connector, <-chan zk.Event, error)
func (f asFactory) create() (Connector, <-chan zk.Event, error) {
return f()
}