Merge v1.2.1-master

Signed-off-by: Emile Vauge <emile@vauge.com>
Emile Vauge 2017-04-11 17:10:46 +02:00
parent a590155b0b
commit aeb17182b4
GPG key ID: D808B4C167352E59
396 changed files with 27271 additions and 9969 deletions

@@ -1,133 +0,0 @@
package main
import (
"github.com/hashicorp/serf/command"
"github.com/hashicorp/serf/command/agent"
"github.com/mitchellh/cli"
"os"
"os/signal"
)
// Commands is the mapping of all the available Serf commands.
var Commands map[string]cli.CommandFactory
func init() {
ui := &cli.BasicUi{Writer: os.Stdout}
Commands = map[string]cli.CommandFactory{
"agent": func() (cli.Command, error) {
return &agent.Command{
Ui: ui,
ShutdownCh: make(chan struct{}),
}, nil
},
"event": func() (cli.Command, error) {
return &command.EventCommand{
Ui: ui,
}, nil
},
"query": func() (cli.Command, error) {
return &command.QueryCommand{
ShutdownCh: makeShutdownCh(),
Ui: ui,
}, nil
},
"force-leave": func() (cli.Command, error) {
return &command.ForceLeaveCommand{
Ui: ui,
}, nil
},
"join": func() (cli.Command, error) {
return &command.JoinCommand{
Ui: ui,
}, nil
},
"keygen": func() (cli.Command, error) {
return &command.KeygenCommand{
Ui: ui,
}, nil
},
"keys": func() (cli.Command, error) {
return &command.KeysCommand{
Ui: ui,
}, nil
},
"leave": func() (cli.Command, error) {
return &command.LeaveCommand{
Ui: ui,
}, nil
},
"members": func() (cli.Command, error) {
return &command.MembersCommand{
Ui: ui,
}, nil
},
"monitor": func() (cli.Command, error) {
return &command.MonitorCommand{
ShutdownCh: makeShutdownCh(),
Ui: ui,
}, nil
},
"tags": func() (cli.Command, error) {
return &command.TagsCommand{
Ui: ui,
}, nil
},
"reachability": func() (cli.Command, error) {
return &command.ReachabilityCommand{
ShutdownCh: makeShutdownCh(),
Ui: ui,
}, nil
},
"rtt": func() (cli.Command, error) {
return &command.RTTCommand{
Ui: ui,
}, nil
},
"info": func() (cli.Command, error) {
return &command.InfoCommand{
Ui: ui,
}, nil
},
"version": func() (cli.Command, error) {
return &command.VersionCommand{
Revision: GitCommit,
Version: Version,
VersionPrerelease: VersionPrerelease,
Ui: ui,
}, nil
},
}
}
// makeShutdownCh returns a channel that can be used for shutdown
// notifications for commands. This channel will send a message for every
// interrupt received.
func makeShutdownCh() <-chan struct{} {
resultCh := make(chan struct{})
signalCh := make(chan os.Signal, 4)
signal.Notify(signalCh, os.Interrupt)
go func() {
for {
<-signalCh
resultCh <- struct{}{}
}
}()
return resultCh
}
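The shutdown channel above is a common Go pattern: a goroutine relays OS interrupts onto an internal channel that commands can select on. A minimal standalone sketch of the same pattern (the program below is illustrative, not part of Serf):

package main

import (
	"fmt"
	"os"
	"os/signal"
)

// makeShutdownCh mirrors the helper above: it relays every os.Interrupt
// as an empty struct on the returned channel.
func makeShutdownCh() <-chan struct{} {
	resultCh := make(chan struct{})
	signalCh := make(chan os.Signal, 4)
	signal.Notify(signalCh, os.Interrupt)
	go func() {
		for {
			<-signalCh
			resultCh <- struct{}{}
		}
	}()
	return resultCh
}

func main() {
	shutdownCh := makeShutdownCh()
	fmt.Println("waiting for interrupt...")
	<-shutdownCh // blocks until the first Ctrl-C
	fmt.Println("interrupt received, shutting down")
}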

@@ -1,44 +0,0 @@
package main
import (
"fmt"
"github.com/mitchellh/cli"
"io/ioutil"
"log"
"os"
)
func main() {
os.Exit(realMain())
}
func realMain() int {
log.SetOutput(ioutil.Discard)
// Get the command line args. We shortcut "--version" and "-v" to
// just show the version.
args := os.Args[1:]
for _, arg := range args {
if arg == "-v" || arg == "--version" {
newArgs := make([]string, len(args)+1)
newArgs[0] = "version"
copy(newArgs[1:], args)
args = newArgs
break
}
}
cli := &cli.CLI{
Args: args,
Commands: Commands,
HelpFunc: cli.BasicHelpFunc("serf"),
}
exitCode, err := cli.Run()
if err != nil {
fmt.Fprintf(os.Stderr, "Error executing CLI: %s\n", err.Error())
return 1
}
return exitCode
}
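For context, mitchellh/cli dispatches on the first argument using the Commands map built in the previous file. A minimal sketch of the same wiring with a single toy command ("demo" and "echo" are illustrative names, not Serf's):

package main

import (
	"fmt"
	"os"

	"github.com/mitchellh/cli"
)

// echoCommand implements cli.Command.
type echoCommand struct{}

func (c *echoCommand) Help() string     { return "Usage: demo echo [args]" }
func (c *echoCommand) Synopsis() string { return "Print the arguments" }
func (c *echoCommand) Run(args []string) int {
	fmt.Println(args)
	return 0 // exit code
}

func main() {
	c := &cli.CLI{
		Args: os.Args[1:],
		Commands: map[string]cli.CommandFactory{
			"echo": func() (cli.Command, error) { return &echoCommand{}, nil },
		},
		HelpFunc: cli.BasicHelpFunc("demo"),
	}
	exitCode, err := c.Run()
	if err != nil {
		fmt.Fprintln(os.Stderr, err)
	}
	os.Exit(exitCode)
}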

@@ -1,27 +0,0 @@
package serf
import (
"github.com/hashicorp/memberlist"
)
// broadcast is an implementation of memberlist.Broadcast and is used
// to manage broadcasts across the memberlist channel that are related
// only to Serf.
type broadcast struct {
msg []byte
notify chan<- struct{}
}
func (b *broadcast) Invalidates(other memberlist.Broadcast) bool {
return false
}
func (b *broadcast) Message() []byte {
return b.msg
}
func (b *broadcast) Finished() {
if b.notify != nil {
close(b.notify)
}
}
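The notify channel is what lets Serf block until a message has actually gone out (for example, leave messages bounded by BroadcastTimeout). Below is a sketch of that queue-and-wait pattern with a local copy of the unexported broadcast type; it assumes memberlist's TransmitLimitedQueue calls Finished once a broadcast reaches its retransmit limit:

package main

import (
	"fmt"
	"time"

	"github.com/hashicorp/memberlist"
)

// broadcast is a local copy of the unexported type above.
type broadcast struct {
	msg    []byte
	notify chan<- struct{}
}

func (b *broadcast) Invalidates(other memberlist.Broadcast) bool { return false }
func (b *broadcast) Message() []byte                             { return b.msg }
func (b *broadcast) Finished() {
	if b.notify != nil {
		close(b.notify)
	}
}

func main() {
	q := &memberlist.TransmitLimitedQueue{
		NumNodes:       func() int { return 3 },
		RetransmitMult: 1,
	}
	notifyCh := make(chan struct{})
	q.QueueBroadcast(&broadcast{msg: []byte("leave"), notify: notifyCh})

	// GetBroadcasts is normally driven by memberlist's gossip loop; calling
	// it here stands in for that until the retransmit limit is reached.
	for i := 0; i < 8; i++ {
		q.GetBroadcasts(0, 1024)
	}

	select {
	case <-notifyCh:
		fmt.Println("broadcast transmitted")
	case <-time.After(5 * time.Second):
		fmt.Println("timed out waiting for broadcast")
	}
}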

@@ -1,80 +0,0 @@
package serf
import (
"time"
)
// coalescer is a simple interface that must be implemented to be
// used inside of a coalesceLoop
type coalescer interface {
// Can the coalescer handle this event? If not, it is
// passed directly through to the destination channel
Handle(Event) bool
// Invoked to coalesce the given event
Coalesce(Event)
// Invoked to flush the coalesced events
Flush(outChan chan<- Event)
}
// coalescedEventCh returns an event channel where the events are coalesced
// using the given coalescer.
func coalescedEventCh(outCh chan<- Event, shutdownCh <-chan struct{},
cPeriod time.Duration, qPeriod time.Duration, c coalescer) chan<- Event {
inCh := make(chan Event, 1024)
go coalesceLoop(inCh, outCh, shutdownCh, cPeriod, qPeriod, c)
return inCh
}
// coalesceLoop is a simple long-running routine that manages the high-level
// flow of coalescing based on quiescence and a maximum quantum period.
func coalesceLoop(inCh <-chan Event, outCh chan<- Event, shutdownCh <-chan struct{},
coalescePeriod time.Duration, quiescentPeriod time.Duration, c coalescer) {
var quiescent <-chan time.Time
var quantum <-chan time.Time
shutdown := false
INGEST:
// Reset the timers
quantum = nil
quiescent = nil
for {
select {
case e := <-inCh:
// Pass through any events the coalescer cannot handle
if !c.Handle(e) {
outCh <- e
continue
}
// Start a new quantum if we need to
// and restart the quiescent timer
if quantum == nil {
quantum = time.After(coalescePeriod)
}
quiescent = time.After(quiescentPeriod)
// Coalesce the event
c.Coalesce(e)
case <-quantum:
goto FLUSH
case <-quiescent:
goto FLUSH
case <-shutdownCh:
shutdown = true
goto FLUSH
}
}
FLUSH:
// Flush the coalesced events
c.Flush(outCh)
// Restart ingestion if we are not done
if !shutdown {
goto INGEST
}
}
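The loop above trades latency for batching: a quiescent timer resets on every event, while a quantum timer bounds the total delay. A standalone toy that batches strings with the same two-timer structure (Serf's coalescer types are not used here):

package main

import (
	"fmt"
	"time"
)

// batchLoop collects strings until either quiescentPeriod passes with no
// new input or coalescePeriod elapses in total, then flushes the batch.
func batchLoop(in <-chan string, out chan<- []string,
	coalescePeriod, quiescentPeriod time.Duration) {
	for {
		var quantum, quiescent <-chan time.Time
		var batch []string
	INGEST:
		for {
			select {
			case s := <-in:
				if quantum == nil {
					quantum = time.After(coalescePeriod) // hard upper bound
				}
				quiescent = time.After(quiescentPeriod) // resets on each event
				batch = append(batch, s)
			case <-quantum:
				break INGEST
			case <-quiescent:
				break INGEST
			}
		}
		out <- batch
	}
}

func main() {
	in := make(chan string)
	out := make(chan []string)
	go batchLoop(in, out, 10*time.Second, 200*time.Millisecond)
	for _, s := range []string{"a", "b", "c"} {
		in <- s
	}
	fmt.Println(<-out) // [a b c], flushed 200ms after the last send
}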

@@ -1,68 +0,0 @@
package serf
type coalesceEvent struct {
Type EventType
Member *Member
}
type memberEventCoalescer struct {
lastEvents map[string]EventType
latestEvents map[string]coalesceEvent
}
func (c *memberEventCoalescer) Handle(e Event) bool {
switch e.EventType() {
case EventMemberJoin:
return true
case EventMemberLeave:
return true
case EventMemberFailed:
return true
case EventMemberUpdate:
return true
case EventMemberReap:
return true
default:
return false
}
}
func (c *memberEventCoalescer) Coalesce(raw Event) {
e := raw.(MemberEvent)
for _, m := range e.Members {
c.latestEvents[m.Name] = coalesceEvent{
Type: e.Type,
Member: &m,
}
}
}
func (c *memberEventCoalescer) Flush(outCh chan<- Event) {
// Coalesce the various events we got into a single set of events.
events := make(map[EventType]*MemberEvent)
for name, cevent := range c.latestEvents {
previous, ok := c.lastEvents[name]
// If we sent the same event before, then ignore
// unless it is a MemberUpdate
if ok && previous == cevent.Type && cevent.Type != EventMemberUpdate {
continue
}
// Update our last event
c.lastEvents[name] = cevent.Type
// Add it to our event
newEvent, ok := events[cevent.Type]
if !ok {
newEvent = &MemberEvent{Type: cevent.Type}
events[cevent.Type] = newEvent
}
newEvent.Members = append(newEvent.Members, *cevent.Member)
}
// Send out those events
for _, event := range events {
outCh <- *event
}
}

@@ -1,52 +0,0 @@
package serf
type latestUserEvents struct {
LTime LamportTime
Events []Event
}
type userEventCoalescer struct {
// Maps an event name into the latest versions
events map[string]*latestUserEvents
}
func (c *userEventCoalescer) Handle(e Event) bool {
// Only handle EventUser messages
if e.EventType() != EventUser {
return false
}
// Check if coalescing is enabled
user := e.(UserEvent)
return user.Coalesce
}
func (c *userEventCoalescer) Coalesce(e Event) {
user := e.(UserEvent)
latest, ok := c.events[user.Name]
// Create a new entry if there are none, or
// if this message has the newest LTime
if !ok || latest.LTime < user.LTime {
latest = &latestUserEvents{
LTime: user.LTime,
Events: []Event{e},
}
c.events[user.Name] = latest
return
}
// If it is the same age, save it
if latest.LTime == user.LTime {
latest.Events = append(latest.Events, e)
}
}
func (c *userEventCoalescer) Flush(outChan chan<- Event) {
for _, latest := range c.events {
for _, e := range latest.Events {
outChan <- e
}
}
c.events = make(map[string]*latestUserEvents)
}

@@ -1,259 +0,0 @@
package serf
import (
"io"
"os"
"time"
"github.com/hashicorp/memberlist"
)
// ProtocolVersionMap is the mapping of Serf delegate protocol versions
// to memberlist protocol versions. We mask the memberlist protocols using
// our own protocol version.
var ProtocolVersionMap map[uint8]uint8
func init() {
ProtocolVersionMap = map[uint8]uint8{
4: 2,
3: 2,
2: 2,
}
}
// Config is the configuration for creating a Serf instance.
type Config struct {
// The name of this node. This must be unique in the cluster. If this
// is not set, Serf will set it to the hostname of the running machine.
NodeName string
// The tags for this role, if any. This is used to provide arbitrary
// key/value metadata per-node. For example, a "role" tag may be used to
// differentiate "load-balancer" from a "web" role as parts of the same cluster.
// Tags deprecate the 'Role' field, which now acts as a special key in this
// map.
Tags map[string]string
// EventCh is a channel that receives all the Serf events. The events
// are sent on this channel in proper ordering. Care must be taken that
// this channel doesn't block, either by processing the events quickly
// enough or buffering the channel, otherwise it can block state updates
// within Serf itself. If no EventCh is specified, no events will be fired,
// but point-in-time snapshots of members can still be retrieved by
// calling Members on Serf.
EventCh chan<- Event
// ProtocolVersion is the protocol version to speak. This must be between
// ProtocolVersionMin and ProtocolVersionMax.
ProtocolVersion uint8
// BroadcastTimeout is the amount of time to wait for a broadcast
// message to be sent to the cluster. Broadcast messages are used for
// things like leave messages and force remove messages. If this is not
// set, a timeout of 5 seconds will be set.
BroadcastTimeout time.Duration
// The settings below relate to Serf's event coalescence feature. Serf
// is able to coalesce multiple events into single events in order to
// reduce the amount of noise that is sent along the EventCh. For example
// if five nodes quickly join, the EventCh will be sent one EventMemberJoin
// containing the five nodes rather than five individual EventMemberJoin
// events. Coalescence can mitigate potential flapping behavior.
//
// Coalescence is disabled by default and can be enabled by setting
// CoalescePeriod.
//
// CoalescePeriod specifies the time duration to coalesce events.
// For example, if this is set to 5 seconds, then all events received
// within 5 seconds that can be coalesced will be.
//
// QuiescentPeriod specifies the duration of inactivity after which
// coalescence immediately happens. For example, if
// CoalescePeriod is set to 10 seconds but QuiescentPeriod is set to 2
// seconds, then the events will be coalesced and dispatched if no
// new events are received within 2 seconds of the last event. Otherwise,
// every event will always be delayed by at least 10 seconds.
CoalescePeriod time.Duration
QuiescentPeriod time.Duration
// The settings below relate to Serf's user event coalescing feature.
// The settings operate like above but only affect user messages and
// not the Member* messages that Serf generates.
UserCoalescePeriod time.Duration
UserQuiescentPeriod time.Duration
// The settings below relate to Serf keeping track of recently
// failed/left nodes and attempting reconnects.
//
// ReapInterval is the interval when the reaper runs. If this is not
// set (it is zero), it will be set to a reasonable default.
//
// ReconnectInterval is the interval when we attempt to reconnect
// to failed nodes. If this is not set (it is zero), it will be set
// to a reasonable default.
//
// ReconnectTimeout is the amount of time to attempt to reconnect to
// a failed node before giving up and considering it completely gone.
//
// TombstoneTimeout is the amount of time to keep around nodes
// that gracefully left as tombstones for syncing state with other
// Serf nodes.
ReapInterval time.Duration
ReconnectInterval time.Duration
ReconnectTimeout time.Duration
TombstoneTimeout time.Duration
// FlapTimeout is the amount of time below which a node that fails
// and then rejoins is considered to be flapping, for telemetry purposes.
// This should be set less than a typical reboot time, but large enough
// to see actual events, given our expected detection times for a failed
// node.
FlapTimeout time.Duration
// QueueDepthWarning is used to generate a warning message if the
// number of queued messages to broadcast exceeds this number. This
// is to provide feedback to the user if events are being triggered
// faster than they can be disseminated.
QueueDepthWarning int
// MaxQueueDepth is used to start dropping messages if the number
// of queued messages to broadcast exceeds this number. This is to
// prevent unbounded growth of memory utilization.
MaxQueueDepth int
// RecentIntentBuffer is used to set the size of recent join and leave intent
// messages that will be buffered. This is used to guard against
// the case where Serf broadcasts an intent that arrives before the
// Memberlist event. It is important that this not be too small to avoid
// continuous rebroadcasting of dead events.
RecentIntentBuffer int
// EventBuffer is used to control how many events are buffered.
// This is used to prevent re-delivery of events to a client. The buffer
// must be large enough to handle all "recent" events, since Serf will
// not deliver messages that are older than the oldest entry in the buffer.
// Thus if a client is generating too many events, it's possible that the
// buffer gets overrun and messages are not delivered.
EventBuffer int
// QueryBuffer is used to control how many queries are buffered.
// This is used to prevent re-delivery of queries to a client. The buffer
// must be large enough to handle all "recent" events, since Serf will not
// deliver queries older than the oldest entry in the buffer.
// Thus if a client is generating too many queries, it's possible that the
// buffer gets overrun and messages are not delivered.
QueryBuffer int
// QueryTimeoutMult configures the default timeout multiplier for a query to run if no
// specific value is provided. Queries are real-time by nature, where the
// reply is time sensitive. As a result, results are collected in an async
// fashion, however the query must have a bounded duration. We want the timeout
// to be long enough that all nodes have time to receive the message, run a handler,
// and generate a reply. Once the timeout is exceeded, any further replies are ignored.
// The default value is
//
// Timeout = GossipInterval * QueryTimeoutMult * log(N+1)
//
QueryTimeoutMult int
// QueryResponseSizeLimit and QuerySizeLimit limit the inbound and
// outbound payload sizes for queries, respectively. These must fit
// in a UDP packet with some additional overhead, so tuning these
// past the default values of 1024 will depend on your network
// configuration.
QueryResponseSizeLimit int
QuerySizeLimit int
// MemberlistConfig is the memberlist configuration that Serf will
// use to do the underlying membership management and gossip. Some
// fields in the MemberlistConfig will be overwritten by Serf no
// matter what:
//
// * Name - This will always be set to the same as the NodeName
// in this configuration.
//
// * Events - Serf uses a custom event delegate.
//
// * Delegate - Serf uses a custom delegate.
//
MemberlistConfig *memberlist.Config
// LogOutput is the location to write logs to. If this is not set,
// logs will go to stderr.
LogOutput io.Writer
// SnapshotPath if provided is used to snapshot live nodes as well
// as lamport clock values. When Serf is started with a snapshot,
// it will attempt to join all the previously known nodes until one
// succeeds and will also avoid replaying old user events.
SnapshotPath string
// RejoinAfterLeave controls our interaction with the snapshot file.
// When set to false (default), a leave causes a Serf to not rejoin
// the cluster until an explicit join is received. If this is set to
// true, we ignore the leave, and rejoin the cluster on start.
RejoinAfterLeave bool
// EnableNameConflictResolution controls if Serf will actively attempt
// to resolve a name conflict. Since each Serf member must have a unique
// name, a cluster can run into issues if multiple nodes claim the same
// name. Without automatic resolution, Serf merely logs some warnings, but
// otherwise does not take any action. Automatic resolution detects the
// conflict and issues a special query which asks the cluster for the
// Name -> IP:Port mapping. If there is a simple majority of votes, that
// node stays while the other node will leave the cluster and exit.
EnableNameConflictResolution bool
// DisableCoordinates controls if Serf will maintain an estimate of this
// node's network coordinate internally. A network coordinate is useful
// for estimating the network distance (i.e. round trip time) between
// two nodes. Coordinate tracking adds some overhead to ping messages.
DisableCoordinates bool
// KeyringFile provides the location of a writable file where Serf can
// persist changes to the encryption keyring.
KeyringFile string
// Merge can be optionally provided to intercept a cluster merge
// and conditionally abort the merge.
Merge MergeDelegate
}
// Init allocates the subdata structures
func (c *Config) Init() {
if c.Tags == nil {
c.Tags = make(map[string]string)
}
}
// DefaultConfig returns a Config struct that contains reasonable defaults
// for most of the configurations.
func DefaultConfig() *Config {
hostname, err := os.Hostname()
if err != nil {
panic(err)
}
return &Config{
NodeName: hostname,
BroadcastTimeout: 5 * time.Second,
EventBuffer: 512,
QueryBuffer: 512,
LogOutput: os.Stderr,
ProtocolVersion: ProtocolVersionMax,
ReapInterval: 15 * time.Second,
RecentIntentBuffer: 128,
ReconnectInterval: 30 * time.Second,
ReconnectTimeout: 24 * time.Hour,
QueueDepthWarning: 128,
MaxQueueDepth: 4096,
TombstoneTimeout: 24 * time.Hour,
FlapTimeout: 60 * time.Second,
MemberlistConfig: memberlist.DefaultLANConfig(),
QueryTimeoutMult: 16,
QueryResponseSizeLimit: 1024,
QuerySizeLimit: 1024,
EnableNameConflictResolution: true,
DisableCoordinates: false,
}
}
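For reference, a minimal sketch of standing up a node with this configuration, using the package's Create constructor (the node name and bind address are placeholders):

package main

import (
	"log"

	"github.com/hashicorp/serf/serf"
)

func main() {
	eventCh := make(chan serf.Event, 256)

	conf := serf.DefaultConfig()
	conf.NodeName = "node-1" // placeholder
	conf.EventCh = eventCh   // opt in to event delivery
	conf.MemberlistConfig.BindAddr = "127.0.0.1" // placeholder

	s, err := serf.Create(conf)
	if err != nil {
		log.Fatalf("failed to create serf: %v", err)
	}
	defer s.Shutdown()

	// A real agent would call s.Join(existing, false) here, then
	// consume eventCh as shown in event.go below.
	for e := range eventCh {
		log.Printf("event: %s", e)
	}
}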

@@ -1,13 +0,0 @@
package serf
import (
"github.com/hashicorp/memberlist"
)
type conflictDelegate struct {
serf *Serf
}
func (c *conflictDelegate) NotifyConflict(existing, other *memberlist.Node) {
c.serf.handleNodeConflict(existing, other)
}

@@ -1,254 +0,0 @@
package serf
import (
"fmt"
"github.com/armon/go-metrics"
)
// delegate is the memberlist.Delegate implementation that Serf uses.
type delegate struct {
serf *Serf
}
func (d *delegate) NodeMeta(limit int) []byte {
roleBytes := d.serf.encodeTags(d.serf.config.Tags)
if len(roleBytes) > limit {
panic(fmt.Errorf("Node tags '%v' exceeds length limit of %d bytes", d.serf.config.Tags, limit))
}
return roleBytes
}
func (d *delegate) NotifyMsg(buf []byte) {
// If we didn't actually receive any data, then ignore it.
if len(buf) == 0 {
return
}
metrics.AddSample([]string{"serf", "msgs", "received"}, float32(len(buf)))
rebroadcast := false
rebroadcastQueue := d.serf.broadcasts
t := messageType(buf[0])
switch t {
case messageLeaveType:
var leave messageLeave
if err := decodeMessage(buf[1:], &leave); err != nil {
d.serf.logger.Printf("[ERR] serf: Error decoding leave message: %s", err)
break
}
d.serf.logger.Printf("[DEBUG] serf: messageLeaveType: %s", leave.Node)
rebroadcast = d.serf.handleNodeLeaveIntent(&leave)
case messageJoinType:
var join messageJoin
if err := decodeMessage(buf[1:], &join); err != nil {
d.serf.logger.Printf("[ERR] serf: Error decoding join message: %s", err)
break
}
d.serf.logger.Printf("[DEBUG] serf: messageJoinType: %s", join.Node)
rebroadcast = d.serf.handleNodeJoinIntent(&join)
case messageUserEventType:
var event messageUserEvent
if err := decodeMessage(buf[1:], &event); err != nil {
d.serf.logger.Printf("[ERR] serf: Error decoding user event message: %s", err)
break
}
d.serf.logger.Printf("[DEBUG] serf: messageUserEventType: %s", event.Name)
rebroadcast = d.serf.handleUserEvent(&event)
rebroadcastQueue = d.serf.eventBroadcasts
case messageQueryType:
var query messageQuery
if err := decodeMessage(buf[1:], &query); err != nil {
d.serf.logger.Printf("[ERR] serf: Error decoding query message: %s", err)
break
}
d.serf.logger.Printf("[DEBUG] serf: messageQueryType: %s", query.Name)
rebroadcast = d.serf.handleQuery(&query)
rebroadcastQueue = d.serf.queryBroadcasts
case messageQueryResponseType:
var resp messageQueryResponse
if err := decodeMessage(buf[1:], &resp); err != nil {
d.serf.logger.Printf("[ERR] serf: Error decoding query response message: %s", err)
break
}
d.serf.logger.Printf("[DEBUG] serf: messageQueryResponseType: %v", resp.From)
d.serf.handleQueryResponse(&resp)
default:
d.serf.logger.Printf("[WARN] serf: Received message of unknown type: %d", t)
}
if rebroadcast {
// Copy the buffer since we cannot rely on the slice not changing
newBuf := make([]byte, len(buf))
copy(newBuf, buf)
rebroadcastQueue.QueueBroadcast(&broadcast{
msg: newBuf,
notify: nil,
})
}
}
func (d *delegate) GetBroadcasts(overhead, limit int) [][]byte {
msgs := d.serf.broadcasts.GetBroadcasts(overhead, limit)
// Determine the bytes used already
bytesUsed := 0
for _, msg := range msgs {
lm := len(msg)
bytesUsed += lm + overhead
metrics.AddSample([]string{"serf", "msgs", "sent"}, float32(lm))
}
// Get any additional query broadcasts
queryMsgs := d.serf.queryBroadcasts.GetBroadcasts(overhead, limit-bytesUsed)
if queryMsgs != nil {
for _, m := range queryMsgs {
lm := len(m)
bytesUsed += lm + overhead
metrics.AddSample([]string{"serf", "msgs", "sent"}, float32(lm))
}
msgs = append(msgs, queryMsgs...)
}
// Get any additional event broadcasts
eventMsgs := d.serf.eventBroadcasts.GetBroadcasts(overhead, limit-bytesUsed)
if eventMsgs != nil {
for _, m := range eventMsgs {
lm := len(m)
bytesUsed += lm + overhead
metrics.AddSample([]string{"serf", "msgs", "sent"}, float32(lm))
}
msgs = append(msgs, eventMsgs...)
}
return msgs
}
func (d *delegate) LocalState(join bool) []byte {
d.serf.memberLock.RLock()
defer d.serf.memberLock.RUnlock()
d.serf.eventLock.RLock()
defer d.serf.eventLock.RUnlock()
// Create the message to send
pp := messagePushPull{
LTime: d.serf.clock.Time(),
StatusLTimes: make(map[string]LamportTime, len(d.serf.members)),
LeftMembers: make([]string, 0, len(d.serf.leftMembers)),
EventLTime: d.serf.eventClock.Time(),
Events: d.serf.eventBuffer,
QueryLTime: d.serf.queryClock.Time(),
}
// Add all the join LTimes
for name, member := range d.serf.members {
pp.StatusLTimes[name] = member.statusLTime
}
// Add all the left nodes
for _, member := range d.serf.leftMembers {
pp.LeftMembers = append(pp.LeftMembers, member.Name)
}
// Encode the push pull state
buf, err := encodeMessage(messagePushPullType, &pp)
if err != nil {
d.serf.logger.Printf("[ERR] serf: Failed to encode local state: %v", err)
return nil
}
return buf
}
func (d *delegate) MergeRemoteState(buf []byte, isJoin bool) {
// Ensure we have a message
if len(buf) == 0 {
d.serf.logger.Printf("[ERR] serf: Remote state is zero bytes")
return
}
// Check the message type
if messageType(buf[0]) != messagePushPullType {
d.serf.logger.Printf("[ERR] serf: Remote state has bad type prefix: %v", buf[0])
return
}
// Attempt a decode
pp := messagePushPull{}
if err := decodeMessage(buf[1:], &pp); err != nil {
d.serf.logger.Printf("[ERR] serf: Failed to decode remote state: %v", err)
return
}
// Witness the Lamport clocks first.
// We subtract 1 since no message with that clock has been sent yet
if pp.LTime > 0 {
d.serf.clock.Witness(pp.LTime - 1)
}
if pp.EventLTime > 0 {
d.serf.eventClock.Witness(pp.EventLTime - 1)
}
if pp.QueryLTime > 0 {
d.serf.queryClock.Witness(pp.QueryLTime - 1)
}
// Process the left nodes first to avoid the LTimes being incremented
// in the wrong order
leftMap := make(map[string]struct{}, len(pp.LeftMembers))
leave := messageLeave{}
for _, name := range pp.LeftMembers {
leftMap[name] = struct{}{}
leave.LTime = pp.StatusLTimes[name]
leave.Node = name
d.serf.handleNodeLeaveIntent(&leave)
}
// Update any other LTimes
join := messageJoin{}
for name, statusLTime := range pp.StatusLTimes {
// Skip the left nodes
if _, ok := leftMap[name]; ok {
continue
}
// Create an artificial join message
join.LTime = statusLTime
join.Node = name
d.serf.handleNodeJoinIntent(&join)
}
// If we are doing a join, and eventJoinIgnore is set
// then we set the eventMinTime to the EventLTime. This
// prevents any of the incoming events from being processed
if isJoin && d.serf.eventJoinIgnore {
d.serf.eventLock.Lock()
if pp.EventLTime > d.serf.eventMinTime {
d.serf.eventMinTime = pp.EventLTime
}
d.serf.eventLock.Unlock()
}
// Process all the events
userEvent := messageUserEvent{}
for _, events := range pp.Events {
if events == nil {
continue
}
userEvent.LTime = events.LTime
for _, e := range events.Events {
userEvent.Name = e.Name
userEvent.Payload = e.Payload
d.serf.handleUserEvent(&userEvent)
}
}
}

@@ -1,168 +0,0 @@
package serf
import (
"fmt"
"net"
"sync"
"time"
)
// EventType are all the types of events that may occur and be sent
// along the Serf channel.
type EventType int
const (
EventMemberJoin EventType = iota
EventMemberLeave
EventMemberFailed
EventMemberUpdate
EventMemberReap
EventUser
EventQuery
)
func (t EventType) String() string {
switch t {
case EventMemberJoin:
return "member-join"
case EventMemberLeave:
return "member-leave"
case EventMemberFailed:
return "member-failed"
case EventMemberUpdate:
return "member-update"
case EventMemberReap:
return "member-reap"
case EventUser:
return "user"
case EventQuery:
return "query"
default:
panic(fmt.Sprintf("unknown event type: %d", t))
}
}
// Event is a generic interface for exposing Serf events.
// Clients will usually need to use a type switch to get
// to a more useful type
type Event interface {
EventType() EventType
String() string
}
// MemberEvent is the struct used for member related events
// Because Serf coalesces events, an event may contain multiple members.
type MemberEvent struct {
Type EventType
Members []Member
}
func (m MemberEvent) EventType() EventType {
return m.Type
}
func (m MemberEvent) String() string {
switch m.Type {
case EventMemberJoin:
return "member-join"
case EventMemberLeave:
return "member-leave"
case EventMemberFailed:
return "member-failed"
case EventMemberUpdate:
return "member-update"
case EventMemberReap:
return "member-reap"
default:
panic(fmt.Sprintf("unknown event type: %d", m.Type))
}
}
// UserEvent is the struct used for events that are triggered
// by the user and are not related to members
type UserEvent struct {
LTime LamportTime
Name string
Payload []byte
Coalesce bool
}
func (u UserEvent) EventType() EventType {
return EventUser
}
func (u UserEvent) String() string {
return fmt.Sprintf("user-event: %s", u.Name)
}
// Query is the struct used for EventQuery type events
type Query struct {
LTime LamportTime
Name string
Payload []byte
serf *Serf
id uint32 // ID is not exported, since it may change
addr []byte // Address to respond to
port uint16 // Port to respond to
deadline time.Time // Must respond by this deadline
respLock sync.Mutex
}
func (q *Query) EventType() EventType {
return EventQuery
}
func (q *Query) String() string {
return fmt.Sprintf("query: %s", q.Name)
}
// Deadline returns the time by which a response must be sent
func (q *Query) Deadline() time.Time {
return q.deadline
}
// Respond is used to send a response to the user query
func (q *Query) Respond(buf []byte) error {
q.respLock.Lock()
defer q.respLock.Unlock()
// Check if we've already responded
if q.deadline.IsZero() {
return fmt.Errorf("Response already sent")
}
// Ensure we aren't past our response deadline
if time.Now().After(q.deadline) {
return fmt.Errorf("Response is past the deadline")
}
// Create response
resp := messageQueryResponse{
LTime: q.LTime,
ID: q.id,
From: q.serf.config.NodeName,
Payload: buf,
}
// Format the response
raw, err := encodeMessage(messageQueryResponseType, &resp)
if err != nil {
return fmt.Errorf("Failed to format response: %v", err)
}
// Check the size limit
if len(raw) > q.serf.config.QueryResponseSizeLimit {
return fmt.Errorf("response exceeds limit of %d bytes", q.serf.config.QueryResponseSizeLimit)
}
// Send the response
addr := net.UDPAddr{IP: q.addr, Port: int(q.port)}
if err := q.serf.memberlist.SendTo(&addr, raw); err != nil {
return err
}
// Clear the deadline, response sent
q.deadline = time.Time{}
return nil
}
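As the Event comment above suggests, consumers type-switch on the concrete event types, and *Query carries a Respond method for a direct reply. A sketch of such a consumer loop (the "pong" payload is illustrative):

package main

import (
	"log"

	"github.com/hashicorp/serf/serf"
)

// handleEvents drains the channel configured as Config.EventCh,
// replying to queries before their deadline.
func handleEvents(eventCh <-chan serf.Event) {
	for e := range eventCh {
		switch ev := e.(type) {
		case serf.MemberEvent:
			for _, m := range ev.Members {
				log.Printf("%s: %s", ev, m.Name)
			}
		case serf.UserEvent:
			log.Printf("user event %s: %s", ev.Name, ev.Payload)
		case *serf.Query:
			if err := ev.Respond([]byte("pong")); err != nil {
				log.Printf("respond failed: %v", err)
			}
		}
	}
}

func main() {
	eventCh := make(chan serf.Event, 256)
	conf := serf.DefaultConfig()
	conf.EventCh = eventCh
	s, err := serf.Create(conf)
	if err != nil {
		log.Fatal(err)
	}
	defer s.Shutdown()
	handleEvents(eventCh)
}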

@@ -1,21 +0,0 @@
package serf
import (
"github.com/hashicorp/memberlist"
)
type eventDelegate struct {
serf *Serf
}
func (e *eventDelegate) NotifyJoin(n *memberlist.Node) {
e.serf.handleNodeJoin(n)
}
func (e *eventDelegate) NotifyLeave(n *memberlist.Node) {
e.serf.handleNodeLeave(n)
}
func (e *eventDelegate) NotifyUpdate(n *memberlist.Node) {
e.serf.handleNodeUpdate(n)
}

@@ -1,312 +0,0 @@
package serf
import (
"encoding/base64"
"log"
"strings"
)
const (
// This is the prefix we use for queries that are internal to Serf.
// They are handled internally, and not forwarded to a client.
InternalQueryPrefix = "_serf_"
// pingQuery is run to check for reachability
pingQuery = "ping"
// conflictQuery is run to resolve a name conflict
conflictQuery = "conflict"
// installKeyQuery is used to install a new key
installKeyQuery = "install-key"
// useKeyQuery is used to change the primary encryption key
useKeyQuery = "use-key"
// removeKeyQuery is used to remove a key from the keyring
removeKeyQuery = "remove-key"
// listKeysQuery is used to list all known keys in the cluster
listKeysQuery = "list-keys"
)
// internalQueryName is used to generate a query name for an internal query
func internalQueryName(name string) string {
return InternalQueryPrefix + name
}
// serfQueries is used to listen for queries that start with
// _serf and respond to them as appropriate.
type serfQueries struct {
inCh chan Event
logger *log.Logger
outCh chan<- Event
serf *Serf
shutdownCh <-chan struct{}
}
// nodeKeyResponse is used to store the result from an individual node while
// replying to key modification queries
type nodeKeyResponse struct {
// Result indicates whether or not there were errors
Result bool
// Message contains error messages or other information
Message string
// Keys is used in listing queries to relay a list of installed keys
Keys []string
}
// newSerfQueries is used to create a new serfQueries. We return an event
// channel that is ingested and forwarded to an outCh. Any Queries that
// have the InternalQueryPrefix are handled instead of forwarded.
func newSerfQueries(serf *Serf, logger *log.Logger, outCh chan<- Event, shutdownCh <-chan struct{}) (chan<- Event, error) {
inCh := make(chan Event, 1024)
q := &serfQueries{
inCh: inCh,
logger: logger,
outCh: outCh,
serf: serf,
shutdownCh: shutdownCh,
}
go q.stream()
return inCh, nil
}
// stream is a long running routine to ingest the event stream
func (s *serfQueries) stream() {
for {
select {
case e := <-s.inCh:
// Check if this is a query we should process
if q, ok := e.(*Query); ok && strings.HasPrefix(q.Name, InternalQueryPrefix) {
go s.handleQuery(q)
} else if s.outCh != nil {
s.outCh <- e
}
case <-s.shutdownCh:
return
}
}
}
// handleQuery is invoked when we get an internal query
func (s *serfQueries) handleQuery(q *Query) {
// Get the queryName after the initial prefix
queryName := q.Name[len(InternalQueryPrefix):]
switch queryName {
case pingQuery:
// Nothing to do, we will ack the query
case conflictQuery:
s.handleConflict(q)
case installKeyQuery:
s.handleInstallKey(q)
case useKeyQuery:
s.handleUseKey(q)
case removeKeyQuery:
s.handleRemoveKey(q)
case listKeysQuery:
s.handleListKeys(q)
default:
s.logger.Printf("[WARN] serf: Unhandled internal query '%s'", queryName)
}
}
// handleConflict is invoked when we get a query that is attempting to
// disambiguate a name conflict. The payload is a node name, and the response
// should be the address we believe that node is at, if any.
func (s *serfQueries) handleConflict(q *Query) {
// The target node name is the payload
node := string(q.Payload)
// Do not respond to the query if it is about us
if node == s.serf.config.NodeName {
return
}
s.logger.Printf("[DEBUG] serf: Got conflict resolution query for '%s'", node)
// Look for the member info
var out *Member
s.serf.memberLock.Lock()
if member, ok := s.serf.members[node]; ok {
out = &member.Member
}
s.serf.memberLock.Unlock()
// Encode the response
buf, err := encodeMessage(messageConflictResponseType, out)
if err != nil {
s.logger.Printf("[ERR] serf: Failed to encode conflict query response: %v", err)
return
}
// Send our answer
if err := q.Respond(buf); err != nil {
s.logger.Printf("[ERR] serf: Failed to respond to conflict query: %v", err)
}
}
// sendKeyResponse handles responding to key-related queries.
func (s *serfQueries) sendKeyResponse(q *Query, resp *nodeKeyResponse) {
buf, err := encodeMessage(messageKeyResponseType, resp)
if err != nil {
s.logger.Printf("[ERR] serf: Failed to encode key response: %v", err)
return
}
if err := q.Respond(buf); err != nil {
s.logger.Printf("[ERR] serf: Failed to respond to key query: %v", err)
return
}
}
// handleInstallKey is invoked whenever a new encryption key is received from
// another member in the cluster, and handles the process of installing it onto
// the memberlist keyring. This type of query may fail if the provided key does
// not fit the constraints that memberlist enforces. If the query fails, the
// response will contain the error message so that it may be relayed.
func (s *serfQueries) handleInstallKey(q *Query) {
response := nodeKeyResponse{Result: false}
keyring := s.serf.config.MemberlistConfig.Keyring
req := keyRequest{}
err := decodeMessage(q.Payload[1:], &req)
if err != nil {
s.logger.Printf("[ERR] serf: Failed to decode key request: %v", err)
goto SEND
}
if !s.serf.EncryptionEnabled() {
response.Message = "No keyring to modify (encryption not enabled)"
s.logger.Printf("[ERR] serf: No keyring to modify (encryption not enabled)")
goto SEND
}
s.logger.Printf("[INFO] serf: Received install-key query")
if err := keyring.AddKey(req.Key); err != nil {
response.Message = err.Error()
s.logger.Printf("[ERR] serf: Failed to install key: %s", err)
goto SEND
}
if err := s.serf.writeKeyringFile(); err != nil {
response.Message = err.Error()
s.logger.Printf("[ERR] serf: Failed to write keyring file: %s", err)
goto SEND
}
response.Result = true
SEND:
s.sendKeyResponse(q, &response)
}
// handleUseKey is invoked whenever a query is received to mark a different key
// in the internal keyring as the primary key. This type of query may fail due
// to operator error (requested key not in ring), and thus sends error messages
// back in the response.
func (s *serfQueries) handleUseKey(q *Query) {
response := nodeKeyResponse{Result: false}
keyring := s.serf.config.MemberlistConfig.Keyring
req := keyRequest{}
err := decodeMessage(q.Payload[1:], &req)
if err != nil {
s.logger.Printf("[ERR] serf: Failed to decode key request: %v", err)
goto SEND
}
if !s.serf.EncryptionEnabled() {
response.Message = "No keyring to modify (encryption not enabled)"
s.logger.Printf("[ERR] serf: No keyring to modify (encryption not enabled)")
goto SEND
}
s.logger.Printf("[INFO] serf: Received use-key query")
if err := keyring.UseKey(req.Key); err != nil {
response.Message = err.Error()
s.logger.Printf("[ERR] serf: Failed to change primary key: %s", err)
goto SEND
}
if err := s.serf.writeKeyringFile(); err != nil {
response.Message = err.Error()
s.logger.Printf("[ERR] serf: Failed to write keyring file: %s", err)
goto SEND
}
response.Result = true
SEND:
s.sendKeyResponse(q, &response)
}
// handleRemoveKey is invoked when a query is received to remove a particular
// key from the keyring. This type of query can fail if the key requested for
// deletion is currently the primary key in the keyring, so therefore it will
// reply to the query with any relevant errors from the operation.
func (s *serfQueries) handleRemoveKey(q *Query) {
response := nodeKeyResponse{Result: false}
keyring := s.serf.config.MemberlistConfig.Keyring
req := keyRequest{}
err := decodeMessage(q.Payload[1:], &req)
if err != nil {
s.logger.Printf("[ERR] serf: Failed to decode key request: %v", err)
goto SEND
}
if !s.serf.EncryptionEnabled() {
response.Message = "No keyring to modify (encryption not enabled)"
s.logger.Printf("[ERR] serf: No keyring to modify (encryption not enabled)")
goto SEND
}
s.logger.Printf("[INFO] serf: Received remove-key query")
if err := keyring.RemoveKey(req.Key); err != nil {
response.Message = err.Error()
s.logger.Printf("[ERR] serf: Failed to remove key: %s", err)
goto SEND
}
if err := s.serf.writeKeyringFile(); err != nil {
response.Message = err.Error()
s.logger.Printf("[ERR] serf: Failed to write keyring file: %s", err)
goto SEND
}
response.Result = true
SEND:
s.sendKeyResponse(q, &response)
}
// handleListKeys is invoked when a query is received to return a list of all
// installed keys the Serf instance knows of. For performance, the keys are
// encoded to base64 on each of the members to remove this burden from the
// node asking for the results.
func (s *serfQueries) handleListKeys(q *Query) {
response := nodeKeyResponse{Result: false}
keyring := s.serf.config.MemberlistConfig.Keyring
if !s.serf.EncryptionEnabled() {
response.Message = "Keyring is empty (encryption not enabled)"
s.logger.Printf("[ERR] serf: Keyring is empty (encryption not enabled)")
goto SEND
}
s.logger.Printf("[INFO] serf: Received list-keys query")
for _, keyBytes := range keyring.GetKeys() {
// Encode the keys before sending the response. This should help take
// some of the burden of doing this off of the asking member.
key := base64.StdEncoding.EncodeToString(keyBytes)
response.Keys = append(response.Keys, key)
}
response.Result = true
SEND:
s.sendKeyResponse(q, &response)
}

@@ -1,166 +0,0 @@
package serf
import (
"encoding/base64"
"fmt"
"sync"
)
// KeyManager encapsulates all functionality within Serf for handling
// encryption keyring changes across a cluster.
type KeyManager struct {
serf *Serf
// Lock to protect read and write operations
l sync.RWMutex
}
// keyRequest is used to contain input parameters which get broadcasted to all
// nodes as part of a key query operation.
type keyRequest struct {
Key []byte
}
// KeyResponse is used to relay a query for a list of all keys in use.
type KeyResponse struct {
Messages map[string]string // Map of node name to response message
NumNodes int // Total nodes memberlist knows of
NumResp int // Total responses received
NumErr int // Total errors from request
// Keys is a mapping of the base64-encoded value of the key bytes to the
// number of nodes that have the key installed.
Keys map[string]int
}
// streamKeyResp takes care of reading responses from a channel and composing
// them into a KeyResponse. It will update a KeyResponse *in place* and
// therefore has nothing to return.
func (k *KeyManager) streamKeyResp(resp *KeyResponse, ch <-chan NodeResponse) {
for r := range ch {
var nodeResponse nodeKeyResponse
resp.NumResp++
// Decode the response
if len(r.Payload) < 1 || messageType(r.Payload[0]) != messageKeyResponseType {
resp.Messages[r.From] = fmt.Sprintf(
"Invalid key query response type: %v", r.Payload)
resp.NumErr++
goto NEXT
}
if err := decodeMessage(r.Payload[1:], &nodeResponse); err != nil {
resp.Messages[r.From] = fmt.Sprintf(
"Failed to decode key query response: %v", r.Payload)
resp.NumErr++
goto NEXT
}
if !nodeResponse.Result {
resp.Messages[r.From] = nodeResponse.Message
resp.NumErr++
}
// Currently only used for key list queries, this adds keys to a counter
// and increments them for each node response which contains them.
for _, key := range nodeResponse.Keys {
if _, ok := resp.Keys[key]; !ok {
resp.Keys[key] = 1
} else {
resp.Keys[key]++
}
}
NEXT:
// Return early if all nodes have responded. This allows us to avoid
// waiting for the full timeout when there is nothing left to do.
if resp.NumResp == resp.NumNodes {
return
}
}
}
// handleKeyRequest performs query broadcasting to all members for any type of
// key operation and manages gathering responses and packing them up into a
// KeyResponse for uniform response handling.
func (k *KeyManager) handleKeyRequest(key, query string) (*KeyResponse, error) {
resp := &KeyResponse{
Messages: make(map[string]string),
Keys: make(map[string]int),
}
qName := internalQueryName(query)
// Decode the new key into raw bytes
rawKey, err := base64.StdEncoding.DecodeString(key)
if err != nil {
return resp, err
}
// Encode the query request
req, err := encodeMessage(messageKeyRequestType, keyRequest{Key: rawKey})
if err != nil {
return resp, err
}
qParam := k.serf.DefaultQueryParams()
queryResp, err := k.serf.Query(qName, req, qParam)
if err != nil {
return resp, err
}
// Handle the response stream and populate the KeyResponse
resp.NumNodes = k.serf.memberlist.NumMembers()
k.streamKeyResp(resp, queryResp.respCh)
// Check the response for any reported failure conditions
if resp.NumErr != 0 {
return resp, fmt.Errorf("%d/%d nodes reported failure", resp.NumErr, resp.NumNodes)
}
if resp.NumResp != resp.NumNodes {
return resp, fmt.Errorf("%d/%d nodes reported success", resp.NumResp, resp.NumNodes)
}
return resp, nil
}
// InstallKey handles broadcasting a query to all members and gathering
// responses from each of them, returning a list of messages from each node
// and any applicable error conditions.
func (k *KeyManager) InstallKey(key string) (*KeyResponse, error) {
k.l.Lock()
defer k.l.Unlock()
return k.handleKeyRequest(key, installKeyQuery)
}
// UseKey handles broadcasting a primary key change to all members in the
// cluster, and gathering any response messages. If successful, there should
// be an empty KeyResponse returned.
func (k *KeyManager) UseKey(key string) (*KeyResponse, error) {
k.l.Lock()
defer k.l.Unlock()
return k.handleKeyRequest(key, useKeyQuery)
}
// RemoveKey handles broadcasting a key to the cluster for removal. Each member
// will receive this event, and if they have the key in their keyring, remove
// it. If any errors are encountered, RemoveKey will collect and relay them.
func (k *KeyManager) RemoveKey(key string) (*KeyResponse, error) {
k.l.Lock()
defer k.l.Unlock()
return k.handleKeyRequest(key, removeKeyQuery)
}
// ListKeys is used to collect installed keys from members in a Serf cluster
// and return an aggregated list of all installed keys. This is useful to
// operators to ensure that there are no lingering keys installed on any agents.
// Since having multiple keys installed can cause performance penalties in some
// cases, it's important to verify this information and remove unneeded keys.
func (k *KeyManager) ListKeys() (*KeyResponse, error) {
k.l.RLock()
defer k.l.RUnlock()
return k.handleKeyRequest("", listKeysQuery)
}
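A sketch of driving these methods from an operator tool, assuming s is a running *serf.Serf on an encrypted cluster and that Serf exposes this manager via a KeyManager() accessor (the base64 key below is a made-up 32-byte value):

km := s.KeyManager()

// Install a new key everywhere, then promote it to primary.
newKey := "T9jncgl9mbLus+baTTa7q7nPSUrXwbDi2dhbtqir37s="
if _, err := km.InstallKey(newKey); err != nil {
	log.Fatal(err)
}
if _, err := km.UseKey(newKey); err != nil {
	log.Fatal(err)
}

// Audit: every key should be installed on every node.
list, err := km.ListKeys()
if err != nil {
	log.Fatal(err)
}
for key, count := range list.Keys {
	log.Printf("%s installed on %d/%d nodes", key, count, list.NumNodes)
}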

@@ -1,45 +0,0 @@
package serf
import (
"sync/atomic"
)
// LamportClock is a thread safe implementation of a lamport clock. It
// uses efficient atomic operations for all of its functions, falling back
// to a heavy lock only if there are enough CAS failures.
type LamportClock struct {
counter uint64
}
// LamportTime is the value of a LamportClock.
type LamportTime uint64
// Time is used to return the current value of the lamport clock
func (l *LamportClock) Time() LamportTime {
return LamportTime(atomic.LoadUint64(&l.counter))
}
// Increment is used to increment and return the value of the lamport clock
func (l *LamportClock) Increment() LamportTime {
return LamportTime(atomic.AddUint64(&l.counter, 1))
}
// Witness is called to update our local clock if necessary after
// witnessing a clock value received from another process
func (l *LamportClock) Witness(v LamportTime) {
WITNESS:
// If the other value is old, we do not need to do anything
cur := atomic.LoadUint64(&l.counter)
other := uint64(v)
if other < cur {
return
}
// Ensure that our local clock is at least one ahead.
if !atomic.CompareAndSwapUint64(&l.counter, cur, other+1) {
// The CAS failed, so we just retry. Eventually our CAS should
// succeed or a future witness will pass us by and our witness
// will end.
goto WITNESS
}
}
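Since LamportClock and LamportTime are exported, usage is direct: senders tag outgoing messages with Increment, and receivers Witness any time they observe:

package main

import (
	"fmt"

	"github.com/hashicorp/serf/serf"
)

func main() {
	var clock serf.LamportClock

	// Tag an outgoing message with our time.
	send := clock.Increment() // 1

	// A peer reports a later time; witnessing jumps us one past it.
	clock.Witness(serf.LamportTime(41))

	fmt.Println(send, clock.Time()) // 1 42
}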

@@ -1,44 +0,0 @@
package serf
import (
"net"
"github.com/hashicorp/memberlist"
)
type MergeDelegate interface {
NotifyMerge([]*Member) error
}
type mergeDelegate struct {
serf *Serf
}
func (m *mergeDelegate) NotifyMerge(nodes []*memberlist.Node) error {
members := make([]*Member, len(nodes))
for idx, n := range nodes {
members[idx] = m.nodeToMember(n)
}
return m.serf.config.Merge.NotifyMerge(members)
}
func (m *mergeDelegate) NotifyAlive(peer *memberlist.Node) error {
member := m.nodeToMember(peer)
return m.serf.config.Merge.NotifyMerge([]*Member{member})
}
func (m *mergeDelegate) nodeToMember(n *memberlist.Node) *Member {
return &Member{
Name: n.Name,
Addr: net.IP(n.Addr),
Port: n.Port,
Tags: m.serf.decodeTags(n.Meta),
Status: StatusNone,
ProtocolMin: n.PMin,
ProtocolMax: n.PMax,
ProtocolCur: n.PCur,
DelegateMin: n.DMin,
DelegateMax: n.DMax,
DelegateCur: n.DCur,
}
}
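Config.Merge (see the config file above) lets an application veto a merge before it happens. A sketch of a delegate that refuses members from a different datacenter; the "dc" tag is an application convention, not a Serf builtin:

package main

import (
	"fmt"

	"github.com/hashicorp/serf/serf"
)

// dcGuard rejects merges that include members from another datacenter.
type dcGuard struct {
	dc string
}

func (g *dcGuard) NotifyMerge(members []*serf.Member) error {
	for _, m := range members {
		if m.Tags["dc"] != g.dc {
			return fmt.Errorf("member %s is in dc %q, want %q",
				m.Name, m.Tags["dc"], g.dc)
		}
	}
	return nil // allow the merge
}

func main() {
	conf := serf.DefaultConfig()
	conf.Merge = &dcGuard{dc: "us-east-1"}
	// serf.Create(conf) would now consult dcGuard on every cluster merge.
	_ = conf
}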

@@ -1,147 +0,0 @@
package serf
import (
"bytes"
"github.com/hashicorp/go-msgpack/codec"
"time"
)
// messageType are the types of gossip messages Serf will send along
// memberlist.
type messageType uint8
const (
messageLeaveType messageType = iota
messageJoinType
messagePushPullType
messageUserEventType
messageQueryType
messageQueryResponseType
messageConflictResponseType
messageKeyRequestType
messageKeyResponseType
)
const (
// Ack flag is used to force receiver to send an ack back
queryFlagAck uint32 = 1 << iota
// NoBroadcast is used to prevent re-broadcast of a query.
// This can be used to selectively send queries to individual members
queryFlagNoBroadcast
)
// filterType is used with a queryFilter to specify the type of
// filter we are sending
type filterType uint8
const (
filterNodeType filterType = iota
filterTagType
)
// messageJoin is the message broadcasted after we join to
// associate the node with a lamport clock
type messageJoin struct {
LTime LamportTime
Node string
}
// messageLeave is the message broadcasted to signal the intent to
// leave.
type messageLeave struct {
LTime LamportTime
Node string
}
// messagePushPullType is used when doing a state exchange. This
// is a relatively large message, but is sent infrequently
type messagePushPull struct {
LTime LamportTime // Current node lamport time
StatusLTimes map[string]LamportTime // Maps the node to its status time
LeftMembers []string // List of left nodes
EventLTime LamportTime // Lamport time for event clock
Events []*userEvents // Recent events
QueryLTime LamportTime // Lamport time for query clock
}
// messageUserEvent is used for user-generated events
type messageUserEvent struct {
LTime LamportTime
Name string
Payload []byte
CC bool // "Can Coalesce". Zero value is compatible with Serf 0.1
}
// messageQuery is used for query events
type messageQuery struct {
LTime LamportTime // Event lamport time
ID uint32 // Query ID, randomly generated
Addr []byte // Source address, used for a direct reply
Port uint16 // Source port, used for a direct reply
Filters [][]byte // Potential query filters
Flags uint32 // Used to provide various flags
Timeout time.Duration // Maximum time between delivery and response
Name string // Query name
Payload []byte // Query payload
}
// Ack checks if the ack flag is set
func (m *messageQuery) Ack() bool {
return (m.Flags & queryFlagAck) != 0
}
// NoBroadcast checks if the no broadcast flag is set
func (m *messageQuery) NoBroadcast() bool {
return (m.Flags & queryFlagNoBroadcast) != 0
}
// filterNode is used with the filterNodeType, and is a list
// of node names
type filterNode []string
// filterTag is used with the filterTagType and is a regular
// expression to apply to a tag
type filterTag struct {
Tag string
Expr string
}
// messageQueryResponse is used to respond to a query
type messageQueryResponse struct {
LTime LamportTime // Event lamport time
ID uint32 // Query ID
From string // Node name
Flags uint32 // Used to provide various flags
Payload []byte // Optional response payload
}
// Ack checks if the ack flag is set
func (m *messageQueryResponse) Ack() bool {
return (m.Flags & queryFlagAck) != 0
}
func decodeMessage(buf []byte, out interface{}) error {
var handle codec.MsgpackHandle
return codec.NewDecoder(bytes.NewReader(buf), &handle).Decode(out)
}
func encodeMessage(t messageType, msg interface{}) ([]byte, error) {
buf := bytes.NewBuffer(nil)
buf.WriteByte(uint8(t))
handle := codec.MsgpackHandle{}
encoder := codec.NewEncoder(buf, &handle)
err := encoder.Encode(msg)
return buf.Bytes(), err
}
func encodeFilter(f filterType, filt interface{}) ([]byte, error) {
buf := bytes.NewBuffer(nil)
buf.WriteByte(uint8(f))
handle := codec.MsgpackHandle{}
encoder := codec.NewEncoder(buf, &handle)
err := encoder.Encode(filt)
return buf.Bytes(), err
}
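Every wire message is a one-byte type header followed by a msgpack body. A standalone round trip through the same scheme (the helpers above are unexported, so the types here are local stand-ins):

package main

import (
	"bytes"
	"fmt"

	"github.com/hashicorp/go-msgpack/codec"
)

// leave is a stand-in for messageLeave; uint64 stands in for LamportTime.
type leave struct {
	LTime uint64
	Node  string
}

func main() {
	// Encode: the type byte, then the msgpack-encoded struct.
	buf := bytes.NewBuffer(nil)
	buf.WriteByte(0) // messageLeaveType is first in the iota block above
	handle := codec.MsgpackHandle{}
	if err := codec.NewEncoder(buf, &handle).Encode(leave{LTime: 7, Node: "node-1"}); err != nil {
		panic(err)
	}
	raw := buf.Bytes()

	// Decode: dispatch on raw[0], then decode the remainder.
	fmt.Println("type:", raw[0])
	var out leave
	if err := codec.NewDecoder(bytes.NewReader(raw[1:]), &handle).Decode(&out); err != nil {
		panic(err)
	}
	fmt.Printf("%+v\n", out)
}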

@@ -1,89 +0,0 @@
package serf
import (
"bytes"
"log"
"time"
"github.com/armon/go-metrics"
"github.com/hashicorp/go-msgpack/codec"
"github.com/hashicorp/memberlist"
"github.com/hashicorp/serf/coordinate"
)
// pingDelegate is notified when memberlist successfully completes a direct ping
// of a peer node. We use this to update our estimated network coordinate, as
// well as cache the coordinate of the peer.
type pingDelegate struct {
serf *Serf
}
const (
// PingVersion is an internal version for the ping message, above the normal
// versioning we get from the protocol version. This enables small updates
// to the ping message without a full protocol bump.
PingVersion = 1
)
// AckPayload is called to produce a payload to send back in response to a ping
// request.
func (p *pingDelegate) AckPayload() []byte {
var buf bytes.Buffer
// The first byte is the version number, forming a simple header.
version := []byte{PingVersion}
buf.Write(version)
// The rest of the message is the serialized coordinate.
enc := codec.NewEncoder(&buf, &codec.MsgpackHandle{})
if err := enc.Encode(p.serf.coordClient.GetCoordinate()); err != nil {
log.Printf("[ERR] serf: Failed to encode coordinate: %v\n", err)
}
return buf.Bytes()
}
// NotifyPingComplete is called when this node successfully completes a direct ping
// of a peer node.
func (p *pingDelegate) NotifyPingComplete(other *memberlist.Node, rtt time.Duration, payload []byte) {
if len(payload) == 0 {
return
}
// Verify ping version in the header.
version := payload[0]
if version != PingVersion {
log.Printf("[ERR] serf: Unsupported ping version: %v", version)
return
}
// Process the remainder of the message as a coordinate.
r := bytes.NewReader(payload[1:])
dec := codec.NewDecoder(r, &codec.MsgpackHandle{})
var coord coordinate.Coordinate
if err := dec.Decode(&coord); err != nil {
log.Printf("[ERR] serf: Failed to decode coordinate from ping: %v", err)
}
// Apply the update. Since this is a coordinate coming from some place
// else we harden this and look for dimensionality problems proactively.
before := p.serf.coordClient.GetCoordinate()
if before.IsCompatibleWith(&coord) {
after := p.serf.coordClient.Update(other.Name, &coord, rtt)
// Publish some metrics to give us an idea of how much we are
// adjusting each time we update.
d := float32(before.DistanceTo(after).Seconds() * 1.0e3)
metrics.AddSample([]string{"serf", "coordinate", "adjustment-ms"}, d)
// Cache the coordinate for the other node, and add our own
// to the cache as well since it just got updated. This lets
// users call GetCachedCoordinate with our node name, which is
// more friendly.
p.serf.coordCacheLock.Lock()
p.serf.coordCache[other.Name] = &coord
p.serf.coordCache[p.serf.config.NodeName] = p.serf.coordClient.GetCoordinate()
p.serf.coordCacheLock.Unlock()
} else {
log.Printf("[ERR] serf: Rejected bad coordinate: %v\n", coord)
}
}
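The cached coordinates let clients estimate RTT without extra network traffic. A sketch, assuming Serf exposes coordinate accessors named GetCoordinate and GetCachedCoordinate and that s is a running *serf.Serf; "node-2" is a placeholder:

self, err := s.GetCoordinate()
if err != nil {
	log.Fatal(err)
}
if other, ok := s.GetCachedCoordinate("node-2"); ok {
	// DistanceTo converts the coordinate delta into an estimated RTT.
	log.Printf("estimated rtt to node-2: %s", self.DistanceTo(other))
}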

@@ -1,210 +0,0 @@
package serf
import (
"math"
"regexp"
"sync"
"time"
)
// QueryParam is provided to Query() to configure the parameters of the
// query. If not provided, sane defaults will be used.
type QueryParam struct {
// If provided, we restrict the nodes that should respond to those
// with names in this list
FilterNodes []string
// FilterTags maps a tag name to a regular expression that is applied
// to restrict the nodes that should respond
FilterTags map[string]string
// If true, we are requesting a delivery acknowledgement from
// every node that meets the filter requirement. This means nodes
// that receive the message but do not pass the filters will not
// send an ack.
RequestAck bool
// The timeout limits how long the query is left open. If not provided,
// then a default timeout is used based on the configuration of Serf
Timeout time.Duration
}
// DefaultQueryTimeout returns the default timeout value for a query
// Computed as GossipInterval * QueryTimeoutMult * log(N+1)
func (s *Serf) DefaultQueryTimeout() time.Duration {
n := s.memberlist.NumMembers()
timeout := s.config.MemberlistConfig.GossipInterval
timeout *= time.Duration(s.config.QueryTimeoutMult)
timeout *= time.Duration(math.Ceil(math.Log10(float64(n + 1))))
return timeout
}
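Concretely, assuming memberlist's LAN default GossipInterval of 200ms and the QueryTimeoutMult default of 16 from the config above, a 99-node cluster yields 200ms * 16 * ceil(log10(100)) = 200ms * 16 * 2 = 6.4s.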
// DefaultQueryParams is used to return the default query parameters
func (s *Serf) DefaultQueryParams() *QueryParam {
return &QueryParam{
FilterNodes: nil,
FilterTags: nil,
RequestAck: false,
Timeout: s.DefaultQueryTimeout(),
}
}
// encodeFilters is used to convert the filters into the wire format
func (q *QueryParam) encodeFilters() ([][]byte, error) {
var filters [][]byte
// Add the node filter
if len(q.FilterNodes) > 0 {
if buf, err := encodeFilter(filterNodeType, q.FilterNodes); err != nil {
return nil, err
} else {
filters = append(filters, buf)
}
}
// Add the tag filters
for tag, expr := range q.FilterTags {
filt := filterTag{tag, expr}
if buf, err := encodeFilter(filterTagType, &filt); err != nil {
return nil, err
} else {
filters = append(filters, buf)
}
}
return filters, nil
}
// QueryResponse is returned for each new Query. It is used to collect
// acks as well as responses and to provide those back to a client.
type QueryResponse struct {
// ackCh is used to send the name of a node for which we've received an ack
ackCh chan string
// deadline is the query end time (start + query timeout)
deadline time.Time
// Query ID
id uint32
// Stores the LTime of the query
lTime LamportTime
// respCh is used to send a response from a node
respCh chan NodeResponse
closed bool
closeLock sync.Mutex
}
// newQueryResponse is used to construct a new query response
func newQueryResponse(n int, q *messageQuery) *QueryResponse {
resp := &QueryResponse{
deadline: time.Now().Add(q.Timeout),
id: q.ID,
lTime: q.LTime,
respCh: make(chan NodeResponse, n),
}
if q.Ack() {
resp.ackCh = make(chan string, n)
}
return resp
}
// Close is used to close the query, which will close the underlying
// channels and prevent further deliveries
func (r *QueryResponse) Close() {
r.closeLock.Lock()
defer r.closeLock.Unlock()
if r.closed {
return
}
r.closed = true
if r.ackCh != nil {
close(r.ackCh)
}
if r.respCh != nil {
close(r.respCh)
}
}
// Deadline returns the ending deadline of the query
func (r *QueryResponse) Deadline() time.Time {
return r.deadline
}
// Finished returns if the query is finished running
func (r *QueryResponse) Finished() bool {
return r.closed || time.Now().After(r.deadline)
}
// AckCh returns a channel that can be used to listen for acks.
// The channel will be closed when the query is finished. This is nil
// if the query did not specify RequestAck.
func (r *QueryResponse) AckCh() <-chan string {
return r.ackCh
}
// ResponseCh returns a channel that can be used to listen for responses.
// Channel will be closed when the query is finished.
func (r *QueryResponse) ResponseCh() <-chan NodeResponse {
return r.respCh
}
// NodeResponse is used to represent a single response from a node
type NodeResponse struct {
From string
Payload []byte
}
// shouldProcessQuery checks if a query should be processed given
// a set of filters.
func (s *Serf) shouldProcessQuery(filters [][]byte) bool {
for _, filter := range filters {
switch filterType(filter[0]) {
case filterNodeType:
// Decode the filter
var nodes filterNode
if err := decodeMessage(filter[1:], &nodes); err != nil {
s.logger.Printf("[WARN] serf: failed to decode filterNodeType: %v", err)
return false
}
// Check if we are being targeted
found := false
for _, n := range nodes {
if n == s.config.NodeName {
found = true
break
}
}
if !found {
return false
}
case filterTagType:
// Decode the filter
var filt filterTag
if err := decodeMessage(filter[1:], &filt); err != nil {
s.logger.Printf("[WARN] serf: failed to decode filterTagType: %v", err)
return false
}
// Check if we match this regex
tags := s.config.Tags
matched, err := regexp.MatchString(filt.Expr, tags[filt.Tag])
if err != nil {
s.logger.Printf("[WARN] serf: failed to compile filter regex (%s): %v", filt.Expr, err)
return false
}
if !matched {
return false
}
default:
s.logger.Printf("[WARN] serf: query has unrecognized filter type: %d", filter[0])
return false
}
}
return true
}
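// For example, a hypothetical query carrying the tag filter
// {"role": "^web"} is only processed on nodes whose configured
// Tags["role"] matches that expression:
//
//	regexp.MatchString("^web", "webserver") // true:  query is handled
//	regexp.MatchString("^web", "db")        // false: query is dropped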

File diff suppressed because it is too large

View file

@ -1,560 +0,0 @@
package serf
import (
"bufio"
"encoding/json"
"fmt"
"log"
"math/rand"
"net"
"os"
"strconv"
"strings"
"time"
"github.com/armon/go-metrics"
"github.com/hashicorp/serf/coordinate"
)
/*
Serf supports using a "snapshot" file that contains various
transactional data that is used to help Serf recover quickly
and gracefully from a failure. We append member events, as well
as the latest clock values to the file during normal operation,
and periodically checkpoint and roll over the file. During a restore,
we can replay the various member events to recall a list of known
nodes to re-join, as well as restore our clock values to avoid replaying
old events.
*/
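// An illustrative snapshot excerpt (names and values are invented; the
// line formats are the ones written and replayed below):
//
//	alive: node-a 10.0.0.1:7946
//	alive: node-b 10.0.0.2:7946
//	not-alive: node-b
//	clock: 123
//	event-clock: 45
//	query-clock: 7
//	coordinate: {...json-encoded coordinate...}
//	leave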
const flushInterval = 500 * time.Millisecond
const clockUpdateInterval = 500 * time.Millisecond
const coordinateUpdateInterval = 60 * time.Second
const tmpExt = ".compact"
// Snapshotter is responsible for ingesting events and persisting
// them to disk, and providing a recovery mechanism at start time.
type Snapshotter struct {
aliveNodes map[string]string
clock *LamportClock
coordClient *coordinate.Client
fh *os.File
buffered *bufio.Writer
inCh <-chan Event
lastFlush time.Time
lastClock LamportTime
lastEventClock LamportTime
lastQueryClock LamportTime
leaveCh chan struct{}
leaving bool
logger *log.Logger
maxSize int64
path string
offset int64
outCh chan<- Event
rejoinAfterLeave bool
shutdownCh <-chan struct{}
waitCh chan struct{}
}
// PreviousNode is used to represent the previously known alive nodes
type PreviousNode struct {
Name string
Addr string
}
func (p PreviousNode) String() string {
return fmt.Sprintf("%s: %s", p.Name, p.Addr)
}
// NewSnapshotter creates a new Snapshotter that records events up to a
// max byte size before rotating the file. It can also be used to
// recover old state. Snapshotter works by reading an event channel it returns,
// passing through to an output channel, and persisting relevant events to disk.
// Setting rejoinAfterLeave makes leave not clear the state, and can be used
// if you intend to rejoin the same cluster after a leave.
func NewSnapshotter(path string,
maxSize int,
rejoinAfterLeave bool,
logger *log.Logger,
clock *LamportClock,
coordClient *coordinate.Client,
outCh chan<- Event,
shutdownCh <-chan struct{}) (chan<- Event, *Snapshotter, error) {
inCh := make(chan Event, 1024)
// Try to open the file
fh, err := os.OpenFile(path, os.O_RDWR|os.O_APPEND|os.O_CREATE, 0755)
if err != nil {
return nil, nil, fmt.Errorf("failed to open snapshot: %v", err)
}
// Determine the offset
info, err := fh.Stat()
if err != nil {
fh.Close()
return nil, nil, fmt.Errorf("failed to stat snapshot: %v", err)
}
offset := info.Size()
// Create the snapshotter
snap := &Snapshotter{
aliveNodes: make(map[string]string),
clock: clock,
coordClient: coordClient,
fh: fh,
buffered: bufio.NewWriter(fh),
inCh: inCh,
lastClock: 0,
lastEventClock: 0,
lastQueryClock: 0,
leaveCh: make(chan struct{}),
logger: logger,
maxSize: int64(maxSize),
path: path,
offset: offset,
outCh: outCh,
rejoinAfterLeave: rejoinAfterLeave,
shutdownCh: shutdownCh,
waitCh: make(chan struct{}),
}
// Recover the last known state
if err := snap.replay(); err != nil {
fh.Close()
return nil, nil, err
}
// Start handling new commands
go snap.stream()
return inCh, snap, nil
}
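// A minimal wiring sketch (assumed usage; inside Serf this is normally
// driven by configuration rather than called directly, and the path and
// sizes here are invented for the example):
//
//	outCh := make(chan Event, 64)
//	shutdownCh := make(chan struct{})
//	eventCh, snap, err := NewSnapshotter("/tmp/serf.snapshot", 128*1024,
//		false, logger, clock, coordClient, outCh, shutdownCh)
//	if err != nil {
//		log.Fatal(err)
//	}
//	// Feed Serf events into eventCh; read passed-through events from outCh.
//	// On shutdown: close(shutdownCh), then snap.Wait().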
// LastClock returns the last known clock time
func (s *Snapshotter) LastClock() LamportTime {
return s.lastClock
}
// LastEventClock returns the last known event clock time
func (s *Snapshotter) LastEventClock() LamportTime {
return s.lastEventClock
}
// LastQueryClock returns the last known query clock time
func (s *Snapshotter) LastQueryClock() LamportTime {
return s.lastQueryClock
}
// AliveNodes returns the last known alive nodes
func (s *Snapshotter) AliveNodes() []*PreviousNode {
// Copy the previously known nodes
previous := make([]*PreviousNode, 0, len(s.aliveNodes))
for name, addr := range s.aliveNodes {
previous = append(previous, &PreviousNode{name, addr})
}
// Randomize the order to prevent hot shards
for i := range previous {
j := rand.Intn(i + 1)
previous[i], previous[j] = previous[j], previous[i]
}
return previous
}
// Wait is used to wait until the snapshotter finishes shutting down
func (s *Snapshotter) Wait() {
<-s.waitCh
}
// Leave is used to remove known nodes to prevent a restart from
// causing a join. Otherwise nodes will re-join after leaving!
func (s *Snapshotter) Leave() {
select {
case s.leaveCh <- struct{}{}:
case <-s.shutdownCh:
}
}
// stream is a long-running routine that is used to handle events
func (s *Snapshotter) stream() {
clockTicker := time.NewTicker(clockUpdateInterval)
defer clockTicker.Stop()
coordinateTicker := time.NewTicker(coordinateUpdateInterval)
defer coordinateTicker.Stop()
for {
select {
case <-s.leaveCh:
s.leaving = true
// If we plan to re-join, keep our state
if !s.rejoinAfterLeave {
s.aliveNodes = make(map[string]string)
}
s.tryAppend("leave\n")
if err := s.buffered.Flush(); err != nil {
s.logger.Printf("[ERR] serf: failed to flush leave to snapshot: %v", err)
}
if err := s.fh.Sync(); err != nil {
s.logger.Printf("[ERR] serf: failed to sync leave to snapshot: %v", err)
}
case e := <-s.inCh:
// Forward the event immediately
if s.outCh != nil {
s.outCh <- e
}
// Stop recording events after a leave is issued
if s.leaving {
continue
}
switch typed := e.(type) {
case MemberEvent:
s.processMemberEvent(typed)
case UserEvent:
s.processUserEvent(typed)
case *Query:
s.processQuery(typed)
default:
s.logger.Printf("[ERR] serf: Unknown event to snapshot: %#v", e)
}
case <-clockTicker.C:
s.updateClock()
case <-coordinateTicker.C:
s.updateCoordinate()
case <-s.shutdownCh:
if err := s.buffered.Flush(); err != nil {
s.logger.Printf("[ERR] serf: failed to flush snapshot: %v", err)
}
if err := s.fh.Sync(); err != nil {
s.logger.Printf("[ERR] serf: failed to sync snapshot: %v", err)
}
s.fh.Close()
close(s.waitCh)
return
}
}
}
// processMemberEvent is used to handle a single member event
func (s *Snapshotter) processMemberEvent(e MemberEvent) {
switch e.Type {
case EventMemberJoin:
for _, mem := range e.Members {
addr := net.TCPAddr{IP: mem.Addr, Port: int(mem.Port)}
s.aliveNodes[mem.Name] = addr.String()
s.tryAppend(fmt.Sprintf("alive: %s %s\n", mem.Name, addr.String()))
}
case EventMemberLeave:
fallthrough
case EventMemberFailed:
for _, mem := range e.Members {
delete(s.aliveNodes, mem.Name)
s.tryAppend(fmt.Sprintf("not-alive: %s\n", mem.Name))
}
}
s.updateClock()
}
// updateClock is called periodically to check if we should update our
// clock value. This is done after member events but should also be done
// periodically due to race conditions with join and leave intents
func (s *Snapshotter) updateClock() {
lastSeen := s.clock.Time() - 1
if lastSeen > s.lastClock {
s.lastClock = lastSeen
s.tryAppend(fmt.Sprintf("clock: %d\n", s.lastClock))
}
}
// updateCoordinate is called periodically to write out the current local
// coordinate. It's safe to call this if coordinates aren't enabled (nil
// client) and it will be a no-op.
func (s *Snapshotter) updateCoordinate() {
if s.coordClient != nil {
encoded, err := json.Marshal(s.coordClient.GetCoordinate())
if err != nil {
s.logger.Printf("[ERR] serf: Failed to encode coordinate: %v", err)
} else {
s.tryAppend(fmt.Sprintf("coordinate: %s\n", encoded))
}
}
}
// processUserEvent is used to handle a single user event
func (s *Snapshotter) processUserEvent(e UserEvent) {
// Ignore old clocks
if e.LTime <= s.lastEventClock {
return
}
s.lastEventClock = e.LTime
s.tryAppend(fmt.Sprintf("event-clock: %d\n", e.LTime))
}
// processQuery is used to handle a single query event
func (s *Snapshotter) processQuery(q *Query) {
// Ignore old clocks
if q.LTime <= s.lastQueryClock {
return
}
s.lastQueryClock = q.LTime
s.tryAppend(fmt.Sprintf("query-clock: %d\n", q.LTime))
}
// tryAppend will invoke appendLine but will not return an error
func (s *Snapshotter) tryAppend(l string) {
if err := s.appendLine(l); err != nil {
s.logger.Printf("[ERR] serf: Failed to update snapshot: %v", err)
}
}
// appendLine is used to append a line to the existing log
func (s *Snapshotter) appendLine(l string) error {
defer metrics.MeasureSince([]string{"serf", "snapshot", "appendLine"}, time.Now())
n, err := s.buffered.WriteString(l)
if err != nil {
return err
}
// Check if we should flush
now := time.Now()
if now.Sub(s.lastFlush) > flushInterval {
s.lastFlush = now
if err := s.buffered.Flush(); err != nil {
return err
}
}
// Check if a compaction is necessary
s.offset += int64(n)
if s.offset > s.maxSize {
return s.compact()
}
return nil
}
// compact is used to compact the snapshot once it is too large
func (s *Snapshotter) compact() error {
defer metrics.MeasureSince([]string{"serf", "snapshot", "compact"}, time.Now())
// Try to open a new file for the compacted snapshot
newPath := s.path + tmpExt
fh, err := os.OpenFile(newPath, os.O_RDWR|os.O_TRUNC|os.O_CREATE, 0755)
if err != nil {
return fmt.Errorf("failed to open new snapshot: %v", err)
}
// Create a buffered writer
buf := bufio.NewWriter(fh)
// Write out the live nodes
var offset int64
for name, addr := range s.aliveNodes {
line := fmt.Sprintf("alive: %s %s\n", name, addr)
n, err := buf.WriteString(line)
if err != nil {
fh.Close()
return err
}
offset += int64(n)
}
// Write out the clocks
line := fmt.Sprintf("clock: %d\n", s.lastClock)
n, err := buf.WriteString(line)
if err != nil {
fh.Close()
return err
}
offset += int64(n)
line = fmt.Sprintf("event-clock: %d\n", s.lastEventClock)
n, err = buf.WriteString(line)
if err != nil {
fh.Close()
return err
}
offset += int64(n)
line = fmt.Sprintf("query-clock: %d\n", s.lastQueryClock)
n, err = buf.WriteString(line)
if err != nil {
fh.Close()
return err
}
offset += int64(n)
// Write out the coordinate.
if s.coordClient != nil {
encoded, err := json.Marshal(s.coordClient.GetCoordinate())
if err != nil {
fh.Close()
return err
}
line = fmt.Sprintf("coordinate: %s\n", encoded)
n, err = buf.WriteString(line)
if err != nil {
fh.Close()
return err
}
offset += int64(n)
}
// Flush the new snapshot
err = buf.Flush()
fh.Close()
if err != nil {
return fmt.Errorf("failed to flush new snapshot: %v", err)
}
// We now need to swap the old snapshot file with the new snapshot.
// Turns out, Windows won't let us rename the files if we have
// open handles to them or if the destination already exists. This
// means we are forced to close the existing handles, delete the
// old file, move the new one in place, and then re-open the file
// handles.
// Flush the existing snapshot, ignoring errors since we will
// delete it momentarily.
s.buffered.Flush()
s.buffered = nil
// Close the file handle to the old snapshot
s.fh.Close()
s.fh = nil
// Delete the old file
if err := os.Remove(s.path); err != nil {
return fmt.Errorf("failed to remove old snapshot: %v", err)
}
// Move the new file into place
if err := os.Rename(newPath, s.path); err != nil {
return fmt.Errorf("failed to install new snapshot: %v", err)
}
// Open the new snapshot
fh, err = os.OpenFile(s.path, os.O_RDWR|os.O_APPEND|os.O_CREATE, 0755)
if err != nil {
return fmt.Errorf("failed to open snapshot: %v", err)
}
buf = bufio.NewWriter(fh)
// Rotate our handles
s.fh = fh
s.buffered = buf
s.offset = offset
s.lastFlush = time.Now()
return nil
}
// replay is used to reset our internal state by replaying the
// snapshot file. It is used at initialization time to read old
// state.
func (s *Snapshotter) replay() error {
// Seek to the beginning
if _, err := s.fh.Seek(0, os.SEEK_SET); err != nil {
return err
}
// Read each line
reader := bufio.NewReader(s.fh)
for {
line, err := reader.ReadString('\n')
if err != nil {
break
}
// Skip the newline
line = line[:len(line)-1]
// Switch on the prefix
if strings.HasPrefix(line, "alive: ") {
info := strings.TrimPrefix(line, "alive: ")
addrIdx := strings.LastIndex(info, " ")
if addrIdx == -1 {
s.logger.Printf("[WARN] serf: Failed to parse address: %v", line)
continue
}
addr := info[addrIdx+1:]
name := info[:addrIdx]
s.aliveNodes[name] = addr
} else if strings.HasPrefix(line, "not-alive: ") {
name := strings.TrimPrefix(line, "not-alive: ")
delete(s.aliveNodes, name)
} else if strings.HasPrefix(line, "clock: ") {
timeStr := strings.TrimPrefix(line, "clock: ")
timeInt, err := strconv.ParseUint(timeStr, 10, 64)
if err != nil {
s.logger.Printf("[WARN] serf: Failed to convert clock time: %v", err)
continue
}
s.lastClock = LamportTime(timeInt)
} else if strings.HasPrefix(line, "event-clock: ") {
timeStr := strings.TrimPrefix(line, "event-clock: ")
timeInt, err := strconv.ParseUint(timeStr, 10, 64)
if err != nil {
s.logger.Printf("[WARN] serf: Failed to convert event clock time: %v", err)
continue
}
s.lastEventClock = LamportTime(timeInt)
} else if strings.HasPrefix(line, "query-clock: ") {
timeStr := strings.TrimPrefix(line, "query-clock: ")
timeInt, err := strconv.ParseUint(timeStr, 10, 64)
if err != nil {
s.logger.Printf("[WARN] serf: Failed to convert query clock time: %v", err)
continue
}
s.lastQueryClock = LamportTime(timeInt)
} else if strings.HasPrefix(line, "coordinate: ") {
if s.coordClient == nil {
s.logger.Printf("[WARN] serf: Ignoring snapshot coordinates since they are disabled")
continue
}
coordStr := strings.TrimPrefix(line, "coordinate: ")
var coord coordinate.Coordinate
err := json.Unmarshal([]byte(coordStr), &coord)
if err != nil {
s.logger.Printf("[WARN] serf: Failed to decode coordinate: %v", err)
continue
}
s.coordClient.SetCoordinate(&coord)
} else if line == "leave" {
// Ignore a leave if we plan on re-joining
if s.rejoinAfterLeave {
s.logger.Printf("[INFO] serf: Ignoring previous leave in snapshot")
continue
}
s.aliveNodes = make(map[string]string)
s.lastClock = 0
s.lastEventClock = 0
s.lastQueryClock = 0
} else if strings.HasPrefix(line, "#") {
// Skip comment lines
} else {
s.logger.Printf("[WARN] serf: Unrecognized snapshot line: %v", line)
}
}
// Seek to the end
if _, err := s.fh.Seek(0, os.SEEK_END); err != nil {
return err
}
return nil
}

View file

@ -1,12 +0,0 @@
package main
// The git commit that was compiled. This will be filled in by the linker
// at build time.
var GitCommit string
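// For example, release builds would typically inject this via the
// linker (an assumed invocation, not part of this file):
//
//	go build -ldflags "-X main.GitCommit=$(git rev-parse --short HEAD)"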
// The main version number that is being run at the moment.
const Version = "0.7.0"
// A pre-release marker for the version. If this is "" (empty string)
// then it means that it is a final release. Otherwise, this is a pre-release
// such as "dev" (in development), "beta", "rc1", etc.
const VersionPrerelease = ""