TLS Support for ConsulCatalog
This commit is contained in:
parent
bad71d1a36
commit
cb54e414ed
26 changed files with 1335 additions and 131 deletions
53
vendor/github.com/hashicorp/consul/api/acl.go
generated
vendored
53
vendor/github.com/hashicorp/consul/api/acl.go
generated
vendored
|
@ -1,5 +1,9 @@
|
|||
package api
|
||||
|
||||
import (
|
||||
"time"
|
||||
)
|
||||
|
||||
const (
|
||||
// ACLCLientType is the client type token
|
||||
ACLClientType = "client"
|
||||
|
@ -18,6 +22,16 @@ type ACLEntry struct {
|
|||
Rules string
|
||||
}
|
||||
|
||||
// ACLReplicationStatus is used to represent the status of ACL replication.
|
||||
type ACLReplicationStatus struct {
|
||||
Enabled bool
|
||||
Running bool
|
||||
SourceDatacenter string
|
||||
ReplicatedIndex uint64
|
||||
LastSuccess time.Time
|
||||
LastError time.Time
|
||||
}
|
||||
|
||||
// ACL can be used to query the ACL endpoints
|
||||
type ACL struct {
|
||||
c *Client
|
||||
|
@ -28,6 +42,24 @@ func (c *Client) ACL() *ACL {
|
|||
return &ACL{c}
|
||||
}
|
||||
|
||||
// Bootstrap is used to perform a one-time ACL bootstrap operation on a cluster
|
||||
// to get the first management token.
|
||||
func (a *ACL) Bootstrap() (string, *WriteMeta, error) {
|
||||
r := a.c.newRequest("PUT", "/v1/acl/bootstrap")
|
||||
rtt, resp, err := requireOK(a.c.doRequest(r))
|
||||
if err != nil {
|
||||
return "", nil, err
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
wm := &WriteMeta{RequestTime: rtt}
|
||||
var out struct{ ID string }
|
||||
if err := decodeBody(resp, &out); err != nil {
|
||||
return "", nil, err
|
||||
}
|
||||
return out.ID, wm, nil
|
||||
}
|
||||
|
||||
// Create is used to generate a new token with the given parameters
|
||||
func (a *ACL) Create(acl *ACLEntry, q *WriteOptions) (string, *WriteMeta, error) {
|
||||
r := a.c.newRequest("PUT", "/v1/acl/create")
|
||||
|
@ -138,3 +170,24 @@ func (a *ACL) List(q *QueryOptions) ([]*ACLEntry, *QueryMeta, error) {
|
|||
}
|
||||
return entries, qm, nil
|
||||
}
|
||||
|
||||
// Replication returns the status of the ACL replication process in the datacenter
|
||||
func (a *ACL) Replication(q *QueryOptions) (*ACLReplicationStatus, *QueryMeta, error) {
|
||||
r := a.c.newRequest("GET", "/v1/acl/replication")
|
||||
r.setQueryOptions(q)
|
||||
rtt, resp, err := requireOK(a.c.doRequest(r))
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
qm := &QueryMeta{}
|
||||
parseQueryMeta(resp, qm)
|
||||
qm.RequestTime = rtt
|
||||
|
||||
var entries *ACLReplicationStatus
|
||||
if err := decodeBody(resp, &entries); err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
return entries, qm, nil
|
||||
}
|
||||
|
|
181
vendor/github.com/hashicorp/consul/api/agent.go
generated
vendored
181
vendor/github.com/hashicorp/consul/api/agent.go
generated
vendored
|
@ -15,6 +15,7 @@ type AgentCheck struct {
|
|||
Output string
|
||||
ServiceID string
|
||||
ServiceName string
|
||||
Definition HealthCheckDefinition
|
||||
}
|
||||
|
||||
// AgentService represents a service known to the agent
|
||||
|
@ -25,6 +26,8 @@ type AgentService struct {
|
|||
Port int
|
||||
Address string
|
||||
EnableTagOverride bool
|
||||
CreateIndex uint64
|
||||
ModifyIndex uint64
|
||||
}
|
||||
|
||||
// AgentMember represents a cluster member known to the agent
|
||||
|
@ -42,6 +45,19 @@ type AgentMember struct {
|
|||
DelegateCur uint8
|
||||
}
|
||||
|
||||
// AllSegments is used to select for all segments in MembersOpts.
|
||||
const AllSegments = "_all"
|
||||
|
||||
// MembersOpts is used for querying member information.
|
||||
type MembersOpts struct {
|
||||
// WAN is whether to show members from the WAN.
|
||||
WAN bool
|
||||
|
||||
// Segment is the LAN segment to show members for. Setting this to the
|
||||
// AllSegments value above will show members in all segments.
|
||||
Segment string
|
||||
}
|
||||
|
||||
// AgentServiceRegistration is used to register a new service
|
||||
type AgentServiceRegistration struct {
|
||||
ID string `json:",omitempty"`
|
||||
|
@ -65,17 +81,22 @@ type AgentCheckRegistration struct {
|
|||
|
||||
// AgentServiceCheck is used to define a node or service level check
|
||||
type AgentServiceCheck struct {
|
||||
Script string `json:",omitempty"`
|
||||
DockerContainerID string `json:",omitempty"`
|
||||
Shell string `json:",omitempty"` // Only supported for Docker.
|
||||
Interval string `json:",omitempty"`
|
||||
Timeout string `json:",omitempty"`
|
||||
TTL string `json:",omitempty"`
|
||||
HTTP string `json:",omitempty"`
|
||||
TCP string `json:",omitempty"`
|
||||
Status string `json:",omitempty"`
|
||||
Notes string `json:",omitempty"`
|
||||
TLSSkipVerify bool `json:",omitempty"`
|
||||
CheckID string `json:",omitempty"`
|
||||
Name string `json:",omitempty"`
|
||||
Args []string `json:"ScriptArgs,omitempty"`
|
||||
Script string `json:",omitempty"` // Deprecated, use Args.
|
||||
DockerContainerID string `json:",omitempty"`
|
||||
Shell string `json:",omitempty"` // Only supported for Docker.
|
||||
Interval string `json:",omitempty"`
|
||||
Timeout string `json:",omitempty"`
|
||||
TTL string `json:",omitempty"`
|
||||
HTTP string `json:",omitempty"`
|
||||
Header map[string][]string `json:",omitempty"`
|
||||
Method string `json:",omitempty"`
|
||||
TCP string `json:",omitempty"`
|
||||
Status string `json:",omitempty"`
|
||||
Notes string `json:",omitempty"`
|
||||
TLSSkipVerify bool `json:",omitempty"`
|
||||
|
||||
// In Consul 0.7 and later, checks that are associated with a service
|
||||
// may also contain this optional DeregisterCriticalServiceAfter field,
|
||||
|
@ -87,6 +108,47 @@ type AgentServiceCheck struct {
|
|||
}
|
||||
type AgentServiceChecks []*AgentServiceCheck
|
||||
|
||||
// AgentToken is used when updating ACL tokens for an agent.
|
||||
type AgentToken struct {
|
||||
Token string
|
||||
}
|
||||
|
||||
// Metrics info is used to store different types of metric values from the agent.
|
||||
type MetricsInfo struct {
|
||||
Timestamp string
|
||||
Gauges []GaugeValue
|
||||
Points []PointValue
|
||||
Counters []SampledValue
|
||||
Samples []SampledValue
|
||||
}
|
||||
|
||||
// GaugeValue stores one value that is updated as time goes on, such as
|
||||
// the amount of memory allocated.
|
||||
type GaugeValue struct {
|
||||
Name string
|
||||
Value float32
|
||||
Labels map[string]string
|
||||
}
|
||||
|
||||
// PointValue holds a series of points for a metric.
|
||||
type PointValue struct {
|
||||
Name string
|
||||
Points []float32
|
||||
}
|
||||
|
||||
// SampledValue stores info about a metric that is incremented over time,
|
||||
// such as the number of requests to an HTTP endpoint.
|
||||
type SampledValue struct {
|
||||
Name string
|
||||
Count int
|
||||
Sum float64
|
||||
Min float64
|
||||
Max float64
|
||||
Mean float64
|
||||
Stddev float64
|
||||
Labels map[string]string
|
||||
}
|
||||
|
||||
// Agent can be used to query the Agent endpoints
|
||||
type Agent struct {
|
||||
c *Client
|
||||
|
@ -117,6 +179,23 @@ func (a *Agent) Self() (map[string]map[string]interface{}, error) {
|
|||
return out, nil
|
||||
}
|
||||
|
||||
// Metrics is used to query the agent we are speaking to for
|
||||
// its current internal metric data
|
||||
func (a *Agent) Metrics() (*MetricsInfo, error) {
|
||||
r := a.c.newRequest("GET", "/v1/agent/metrics")
|
||||
_, resp, err := requireOK(a.c.doRequest(r))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
var out *MetricsInfo
|
||||
if err := decodeBody(resp, &out); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return out, nil
|
||||
}
|
||||
|
||||
// Reload triggers a configuration reload for the agent we are connected to.
|
||||
func (a *Agent) Reload() error {
|
||||
r := a.c.newRequest("PUT", "/v1/agent/reload")
|
||||
|
@ -194,6 +273,28 @@ func (a *Agent) Members(wan bool) ([]*AgentMember, error) {
|
|||
return out, nil
|
||||
}
|
||||
|
||||
// MembersOpts returns the known gossip members and can be passed
|
||||
// additional options for WAN/segment filtering.
|
||||
func (a *Agent) MembersOpts(opts MembersOpts) ([]*AgentMember, error) {
|
||||
r := a.c.newRequest("GET", "/v1/agent/members")
|
||||
r.params.Set("segment", opts.Segment)
|
||||
if opts.WAN {
|
||||
r.params.Set("wan", "1")
|
||||
}
|
||||
|
||||
_, resp, err := requireOK(a.c.doRequest(r))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
var out []*AgentMember
|
||||
if err := decodeBody(resp, &out); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return out, nil
|
||||
}
|
||||
|
||||
// ServiceRegister is used to register a new service with
|
||||
// the local agent
|
||||
func (a *Agent) ServiceRegister(service *AgentServiceRegistration) error {
|
||||
|
@ -437,8 +538,9 @@ func (a *Agent) DisableNodeMaintenance() error {
|
|||
|
||||
// Monitor returns a channel which will receive streaming logs from the agent
|
||||
// Providing a non-nil stopCh can be used to close the connection and stop the
|
||||
// log stream
|
||||
func (a *Agent) Monitor(loglevel string, stopCh chan struct{}, q *QueryOptions) (chan string, error) {
|
||||
// log stream. An empty string will be sent down the given channel when there's
|
||||
// nothing left to stream, after which the caller should close the stopCh.
|
||||
func (a *Agent) Monitor(loglevel string, stopCh <-chan struct{}, q *QueryOptions) (chan string, error) {
|
||||
r := a.c.newRequest("GET", "/v1/agent/monitor")
|
||||
r.setQueryOptions(q)
|
||||
if loglevel != "" {
|
||||
|
@ -462,10 +564,61 @@ func (a *Agent) Monitor(loglevel string, stopCh chan struct{}, q *QueryOptions)
|
|||
default:
|
||||
}
|
||||
if scanner.Scan() {
|
||||
logCh <- scanner.Text()
|
||||
// An empty string signals to the caller that
|
||||
// the scan is done, so make sure we only emit
|
||||
// that when the scanner says it's done, not if
|
||||
// we happen to ingest an empty line.
|
||||
if text := scanner.Text(); text != "" {
|
||||
logCh <- text
|
||||
} else {
|
||||
logCh <- " "
|
||||
}
|
||||
} else {
|
||||
logCh <- ""
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
return logCh, nil
|
||||
}
|
||||
|
||||
// UpdateACLToken updates the agent's "acl_token". See updateToken for more
|
||||
// details.
|
||||
func (a *Agent) UpdateACLToken(token string, q *WriteOptions) (*WriteMeta, error) {
|
||||
return a.updateToken("acl_token", token, q)
|
||||
}
|
||||
|
||||
// UpdateACLAgentToken updates the agent's "acl_agent_token". See updateToken
|
||||
// for more details.
|
||||
func (a *Agent) UpdateACLAgentToken(token string, q *WriteOptions) (*WriteMeta, error) {
|
||||
return a.updateToken("acl_agent_token", token, q)
|
||||
}
|
||||
|
||||
// UpdateACLAgentMasterToken updates the agent's "acl_agent_master_token". See
|
||||
// updateToken for more details.
|
||||
func (a *Agent) UpdateACLAgentMasterToken(token string, q *WriteOptions) (*WriteMeta, error) {
|
||||
return a.updateToken("acl_agent_master_token", token, q)
|
||||
}
|
||||
|
||||
// UpdateACLReplicationToken updates the agent's "acl_replication_token". See
|
||||
// updateToken for more details.
|
||||
func (a *Agent) UpdateACLReplicationToken(token string, q *WriteOptions) (*WriteMeta, error) {
|
||||
return a.updateToken("acl_replication_token", token, q)
|
||||
}
|
||||
|
||||
// updateToken can be used to update an agent's ACL token after the agent has
|
||||
// started. The tokens are not persisted, so will need to be updated again if
|
||||
// the agent is restarted.
|
||||
func (a *Agent) updateToken(target, token string, q *WriteOptions) (*WriteMeta, error) {
|
||||
r := a.c.newRequest("PUT", fmt.Sprintf("/v1/agent/token/%s", target))
|
||||
r.setWriteOptions(q)
|
||||
r.obj = &AgentToken{Token: token}
|
||||
rtt, resp, err := requireOK(a.c.doRequest(r))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
resp.Body.Close()
|
||||
|
||||
wm := &WriteMeta{RequestTime: rtt}
|
||||
return wm, nil
|
||||
}
|
||||
|
|
226
vendor/github.com/hashicorp/consul/api/api.go
generated
vendored
226
vendor/github.com/hashicorp/consul/api/api.go
generated
vendored
|
@ -4,7 +4,6 @@ import (
|
|||
"bytes"
|
||||
"context"
|
||||
"crypto/tls"
|
||||
"crypto/x509"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io"
|
||||
|
@ -19,6 +18,7 @@ import (
|
|||
"time"
|
||||
|
||||
"github.com/hashicorp/go-cleanhttp"
|
||||
"github.com/hashicorp/go-rootcerts"
|
||||
)
|
||||
|
||||
const (
|
||||
|
@ -38,6 +38,26 @@ const (
|
|||
// whether or not to use HTTPS.
|
||||
HTTPSSLEnvName = "CONSUL_HTTP_SSL"
|
||||
|
||||
// HTTPCAFile defines an environment variable name which sets the
|
||||
// CA file to use for talking to Consul over TLS.
|
||||
HTTPCAFile = "CONSUL_CACERT"
|
||||
|
||||
// HTTPCAPath defines an environment variable name which sets the
|
||||
// path to a directory of CA certs to use for talking to Consul over TLS.
|
||||
HTTPCAPath = "CONSUL_CAPATH"
|
||||
|
||||
// HTTPClientCert defines an environment variable name which sets the
|
||||
// client cert file to use for talking to Consul over TLS.
|
||||
HTTPClientCert = "CONSUL_CLIENT_CERT"
|
||||
|
||||
// HTTPClientKey defines an environment variable name which sets the
|
||||
// client key file to use for talking to Consul over TLS.
|
||||
HTTPClientKey = "CONSUL_CLIENT_KEY"
|
||||
|
||||
// HTTPTLSServerName defines an environment variable name which sets the
|
||||
// server name to use as the SNI host when connecting via TLS
|
||||
HTTPTLSServerName = "CONSUL_TLS_SERVER_NAME"
|
||||
|
||||
// HTTPSSLVerifyEnvName defines an environment variable name which sets
|
||||
// whether or not to disable certificate checking.
|
||||
HTTPSSLVerifyEnvName = "CONSUL_HTTP_SSL_VERIFY"
|
||||
|
@ -85,6 +105,26 @@ type QueryOptions struct {
|
|||
// relayed back to the sender through N other random nodes. Must be
|
||||
// a value from 0 to 5 (inclusive).
|
||||
RelayFactor uint8
|
||||
|
||||
// ctx is an optional context pass through to the underlying HTTP
|
||||
// request layer. Use Context() and WithContext() to manage this.
|
||||
ctx context.Context
|
||||
}
|
||||
|
||||
func (o *QueryOptions) Context() context.Context {
|
||||
if o != nil && o.ctx != nil {
|
||||
return o.ctx
|
||||
}
|
||||
return context.Background()
|
||||
}
|
||||
|
||||
func (o *QueryOptions) WithContext(ctx context.Context) *QueryOptions {
|
||||
o2 := new(QueryOptions)
|
||||
if o != nil {
|
||||
*o2 = *o
|
||||
}
|
||||
o2.ctx = ctx
|
||||
return o2
|
||||
}
|
||||
|
||||
// WriteOptions are used to parameterize a write
|
||||
|
@ -101,6 +141,26 @@ type WriteOptions struct {
|
|||
// relayed back to the sender through N other random nodes. Must be
|
||||
// a value from 0 to 5 (inclusive).
|
||||
RelayFactor uint8
|
||||
|
||||
// ctx is an optional context pass through to the underlying HTTP
|
||||
// request layer. Use Context() and WithContext() to manage this.
|
||||
ctx context.Context
|
||||
}
|
||||
|
||||
func (o *WriteOptions) Context() context.Context {
|
||||
if o != nil && o.ctx != nil {
|
||||
return o.ctx
|
||||
}
|
||||
return context.Background()
|
||||
}
|
||||
|
||||
func (o *WriteOptions) WithContext(ctx context.Context) *WriteOptions {
|
||||
o2 := new(WriteOptions)
|
||||
if o != nil {
|
||||
*o2 = *o
|
||||
}
|
||||
o2.ctx = ctx
|
||||
return o2
|
||||
}
|
||||
|
||||
// QueryMeta is used to return meta data about a query
|
||||
|
@ -149,6 +209,9 @@ type Config struct {
|
|||
// Datacenter to use. If not provided, the default agent datacenter is used.
|
||||
Datacenter string
|
||||
|
||||
// Transport is the Transport to use for the http client.
|
||||
Transport *http.Transport
|
||||
|
||||
// HttpClient is the client to use. Default will be
|
||||
// used if not provided.
|
||||
HttpClient *http.Client
|
||||
|
@ -163,6 +226,8 @@ type Config struct {
|
|||
// Token is used to provide a per-request ACL token
|
||||
// which overrides the agent's default token.
|
||||
Token string
|
||||
|
||||
TLSConfig TLSConfig
|
||||
}
|
||||
|
||||
// TLSConfig is used to generate a TLSClientConfig that's useful for talking to
|
||||
|
@ -177,6 +242,10 @@ type TLSConfig struct {
|
|||
// communication, defaults to the system bundle if not specified.
|
||||
CAFile string
|
||||
|
||||
// CAPath is the optional path to a directory of CA certificates to use for
|
||||
// Consul communication, defaults to the system bundle if not specified.
|
||||
CAPath string
|
||||
|
||||
// CertFile is the optional path to the certificate for Consul
|
||||
// communication. If this is set then you need to also set KeyFile.
|
||||
CertFile string
|
||||
|
@ -212,11 +281,9 @@ func DefaultNonPooledConfig() *Config {
|
|||
// given function to make the transport.
|
||||
func defaultConfig(transportFn func() *http.Transport) *Config {
|
||||
config := &Config{
|
||||
Address: "127.0.0.1:8500",
|
||||
Scheme: "http",
|
||||
HttpClient: &http.Client{
|
||||
Transport: transportFn(),
|
||||
},
|
||||
Address: "127.0.0.1:8500",
|
||||
Scheme: "http",
|
||||
Transport: transportFn(),
|
||||
}
|
||||
|
||||
if addr := os.Getenv(HTTPAddrEnvName); addr != "" {
|
||||
|
@ -254,27 +321,28 @@ func defaultConfig(transportFn func() *http.Transport) *Config {
|
|||
}
|
||||
}
|
||||
|
||||
if verify := os.Getenv(HTTPSSLVerifyEnvName); verify != "" {
|
||||
doVerify, err := strconv.ParseBool(verify)
|
||||
if v := os.Getenv(HTTPTLSServerName); v != "" {
|
||||
config.TLSConfig.Address = v
|
||||
}
|
||||
if v := os.Getenv(HTTPCAFile); v != "" {
|
||||
config.TLSConfig.CAFile = v
|
||||
}
|
||||
if v := os.Getenv(HTTPCAPath); v != "" {
|
||||
config.TLSConfig.CAPath = v
|
||||
}
|
||||
if v := os.Getenv(HTTPClientCert); v != "" {
|
||||
config.TLSConfig.CertFile = v
|
||||
}
|
||||
if v := os.Getenv(HTTPClientKey); v != "" {
|
||||
config.TLSConfig.KeyFile = v
|
||||
}
|
||||
if v := os.Getenv(HTTPSSLVerifyEnvName); v != "" {
|
||||
doVerify, err := strconv.ParseBool(v)
|
||||
if err != nil {
|
||||
log.Printf("[WARN] client: could not parse %s: %s", HTTPSSLVerifyEnvName, err)
|
||||
}
|
||||
|
||||
if !doVerify {
|
||||
tlsClientConfig, err := SetupTLSConfig(&TLSConfig{
|
||||
InsecureSkipVerify: true,
|
||||
})
|
||||
|
||||
// We don't expect this to fail given that we aren't
|
||||
// parsing any of the input, but we panic just in case
|
||||
// since this doesn't have an error return.
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
transport := transportFn()
|
||||
transport.TLSClientConfig = tlsClientConfig
|
||||
config.HttpClient.Transport = transport
|
||||
config.TLSConfig.InsecureSkipVerify = true
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -309,17 +377,14 @@ func SetupTLSConfig(tlsConfig *TLSConfig) (*tls.Config, error) {
|
|||
tlsClientConfig.Certificates = []tls.Certificate{tlsCert}
|
||||
}
|
||||
|
||||
if tlsConfig.CAFile != "" {
|
||||
data, err := ioutil.ReadFile(tlsConfig.CAFile)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to read CA file: %v", err)
|
||||
if tlsConfig.CAFile != "" || tlsConfig.CAPath != "" {
|
||||
rootConfig := &rootcerts.Config{
|
||||
CAFile: tlsConfig.CAFile,
|
||||
CAPath: tlsConfig.CAPath,
|
||||
}
|
||||
|
||||
caPool := x509.NewCertPool()
|
||||
if !caPool.AppendCertsFromPEM(data) {
|
||||
return nil, fmt.Errorf("failed to parse CA certificate")
|
||||
if err := rootcerts.ConfigureTLS(tlsClientConfig, rootConfig); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
tlsClientConfig.RootCAs = caPool
|
||||
}
|
||||
|
||||
return tlsClientConfig, nil
|
||||
|
@ -343,14 +408,47 @@ func NewClient(config *Config) (*Client, error) {
|
|||
config.Scheme = defConfig.Scheme
|
||||
}
|
||||
|
||||
if config.Transport == nil {
|
||||
config.Transport = defConfig.Transport
|
||||
}
|
||||
|
||||
if config.TLSConfig.Address == "" {
|
||||
config.TLSConfig.Address = defConfig.TLSConfig.Address
|
||||
}
|
||||
|
||||
if config.TLSConfig.CAFile == "" {
|
||||
config.TLSConfig.CAFile = defConfig.TLSConfig.CAFile
|
||||
}
|
||||
|
||||
if config.TLSConfig.CAPath == "" {
|
||||
config.TLSConfig.CAPath = defConfig.TLSConfig.CAPath
|
||||
}
|
||||
|
||||
if config.TLSConfig.CertFile == "" {
|
||||
config.TLSConfig.CertFile = defConfig.TLSConfig.CertFile
|
||||
}
|
||||
|
||||
if config.TLSConfig.KeyFile == "" {
|
||||
config.TLSConfig.KeyFile = defConfig.TLSConfig.KeyFile
|
||||
}
|
||||
|
||||
if !config.TLSConfig.InsecureSkipVerify {
|
||||
config.TLSConfig.InsecureSkipVerify = defConfig.TLSConfig.InsecureSkipVerify
|
||||
}
|
||||
|
||||
if config.HttpClient == nil {
|
||||
config.HttpClient = defConfig.HttpClient
|
||||
var err error
|
||||
config.HttpClient, err = NewHttpClient(config.Transport, config.TLSConfig)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
parts := strings.SplitN(config.Address, "://", 2)
|
||||
if len(parts) == 2 {
|
||||
switch parts[0] {
|
||||
case "http":
|
||||
config.Scheme = "http"
|
||||
case "https":
|
||||
config.Scheme = "https"
|
||||
case "unix":
|
||||
|
@ -367,9 +465,38 @@ func NewClient(config *Config) (*Client, error) {
|
|||
config.Address = parts[1]
|
||||
}
|
||||
|
||||
client := &Client{
|
||||
config: *config,
|
||||
if config.Token == "" {
|
||||
config.Token = defConfig.Token
|
||||
}
|
||||
|
||||
return &Client{config: *config}, nil
|
||||
}
|
||||
|
||||
// NewHttpClient returns an http client configured with the given Transport and TLS
|
||||
// config.
|
||||
func NewHttpClient(transport *http.Transport, tlsConf TLSConfig) (*http.Client, error) {
|
||||
client := &http.Client{
|
||||
Transport: transport,
|
||||
}
|
||||
|
||||
// TODO (slackpad) - Once we get some run time on the HTTP/2 support we
|
||||
// should turn it on by default if TLS is enabled. We would basically
|
||||
// just need to call http2.ConfigureTransport(transport) here. We also
|
||||
// don't want to introduce another external dependency on
|
||||
// golang.org/x/net/http2 at this time. For a complete recipe for how
|
||||
// to enable HTTP/2 support on a transport suitable for the API client
|
||||
// library see agent/http_test.go:TestHTTPServer_H2.
|
||||
|
||||
if transport.TLSClientConfig == nil {
|
||||
tlsClientConfig, err := SetupTLSConfig(&tlsConf)
|
||||
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
transport.TLSClientConfig = tlsClientConfig
|
||||
}
|
||||
|
||||
return client, nil
|
||||
}
|
||||
|
||||
|
@ -382,6 +509,7 @@ type request struct {
|
|||
body io.Reader
|
||||
header http.Header
|
||||
obj interface{}
|
||||
ctx context.Context
|
||||
}
|
||||
|
||||
// setQueryOptions is used to annotate the request with
|
||||
|
@ -419,6 +547,7 @@ func (r *request) setQueryOptions(q *QueryOptions) {
|
|||
if q.RelayFactor != 0 {
|
||||
r.params.Set("relay-factor", strconv.Itoa(int(q.RelayFactor)))
|
||||
}
|
||||
r.ctx = q.ctx
|
||||
}
|
||||
|
||||
// durToMsec converts a duration to a millisecond specified string. If the
|
||||
|
@ -436,13 +565,20 @@ func durToMsec(dur time.Duration) string {
|
|||
// serverError is a string we look for to detect 500 errors.
|
||||
const serverError = "Unexpected response code: 500"
|
||||
|
||||
// IsServerError returns true for 500 errors from the Consul servers, these are
|
||||
// usually retryable at a later time.
|
||||
func IsServerError(err error) bool {
|
||||
// IsRetryableError returns true for 500 errors from the Consul servers, and
|
||||
// network connection errors. These are usually retryable at a later time.
|
||||
// This applies to reads but NOT to writes. This may return true for errors
|
||||
// on writes that may have still gone through, so do not use this to retry
|
||||
// any write operations.
|
||||
func IsRetryableError(err error) bool {
|
||||
if err == nil {
|
||||
return false
|
||||
}
|
||||
|
||||
if _, ok := err.(net.Error); ok {
|
||||
return true
|
||||
}
|
||||
|
||||
// TODO (slackpad) - Make a real error type here instead of using
|
||||
// a string check.
|
||||
return strings.Contains(err.Error(), serverError)
|
||||
|
@ -463,6 +599,7 @@ func (r *request) setWriteOptions(q *WriteOptions) {
|
|||
if q.RelayFactor != 0 {
|
||||
r.params.Set("relay-factor", strconv.Itoa(int(q.RelayFactor)))
|
||||
}
|
||||
r.ctx = q.ctx
|
||||
}
|
||||
|
||||
// toHTTP converts the request to an HTTP request
|
||||
|
@ -472,11 +609,11 @@ func (r *request) toHTTP() (*http.Request, error) {
|
|||
|
||||
// Check if we should encode the body
|
||||
if r.body == nil && r.obj != nil {
|
||||
if b, err := encodeBody(r.obj); err != nil {
|
||||
b, err := encodeBody(r.obj)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
} else {
|
||||
r.body = b
|
||||
}
|
||||
r.body = b
|
||||
}
|
||||
|
||||
// Create the HTTP request
|
||||
|
@ -494,6 +631,9 @@ func (r *request) toHTTP() (*http.Request, error) {
|
|||
if r.config.HttpAuth != nil {
|
||||
req.SetBasicAuth(r.config.HttpAuth.Username, r.config.HttpAuth.Password)
|
||||
}
|
||||
if r.ctx != nil {
|
||||
return req.WithContext(r.ctx), nil
|
||||
}
|
||||
|
||||
return req, nil
|
||||
}
|
||||
|
@ -531,7 +671,7 @@ func (c *Client) doRequest(r *request) (time.Duration, *http.Response, error) {
|
|||
}
|
||||
start := time.Now()
|
||||
resp, err := c.config.HttpClient.Do(req)
|
||||
diff := time.Now().Sub(start)
|
||||
diff := time.Since(start)
|
||||
return diff, resp, err
|
||||
}
|
||||
|
||||
|
@ -574,6 +714,8 @@ func (c *Client) write(endpoint string, in, out interface{}, q *WriteOptions) (*
|
|||
if err := decodeBody(resp, &out); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
} else if _, err := ioutil.ReadAll(resp.Body); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return wm, nil
|
||||
}
|
||||
|
|
5
vendor/github.com/hashicorp/consul/api/catalog.go
generated
vendored
5
vendor/github.com/hashicorp/consul/api/catalog.go
generated
vendored
|
@ -4,14 +4,18 @@ type Node struct {
|
|||
ID string
|
||||
Node string
|
||||
Address string
|
||||
Datacenter string
|
||||
TaggedAddresses map[string]string
|
||||
Meta map[string]string
|
||||
CreateIndex uint64
|
||||
ModifyIndex uint64
|
||||
}
|
||||
|
||||
type CatalogService struct {
|
||||
ID string
|
||||
Node string
|
||||
Address string
|
||||
Datacenter string
|
||||
TaggedAddresses map[string]string
|
||||
NodeMeta map[string]string
|
||||
ServiceID string
|
||||
|
@ -38,6 +42,7 @@ type CatalogRegistration struct {
|
|||
Datacenter string
|
||||
Service *AgentService
|
||||
Check *AgentCheck
|
||||
SkipNodeUpdate bool
|
||||
}
|
||||
|
||||
type CatalogDeregistration struct {
|
||||
|
|
43
vendor/github.com/hashicorp/consul/api/coordinate.go
generated
vendored
43
vendor/github.com/hashicorp/consul/api/coordinate.go
generated
vendored
|
@ -6,8 +6,9 @@ import (
|
|||
|
||||
// CoordinateEntry represents a node and its associated network coordinate.
|
||||
type CoordinateEntry struct {
|
||||
Node string
|
||||
Coord *coordinate.Coordinate
|
||||
Node string
|
||||
Segment string
|
||||
Coord *coordinate.Coordinate
|
||||
}
|
||||
|
||||
// CoordinateDatacenterMap has the coordinates for servers in a given datacenter
|
||||
|
@ -65,3 +66,41 @@ func (c *Coordinate) Nodes(q *QueryOptions) ([]*CoordinateEntry, *QueryMeta, err
|
|||
}
|
||||
return out, qm, nil
|
||||
}
|
||||
|
||||
// Update inserts or updates the LAN coordinate of a node.
|
||||
func (c *Coordinate) Update(coord *CoordinateEntry, q *WriteOptions) (*WriteMeta, error) {
|
||||
r := c.c.newRequest("PUT", "/v1/coordinate/update")
|
||||
r.setWriteOptions(q)
|
||||
r.obj = coord
|
||||
rtt, resp, err := requireOK(c.c.doRequest(r))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
wm := &WriteMeta{}
|
||||
wm.RequestTime = rtt
|
||||
|
||||
return wm, nil
|
||||
}
|
||||
|
||||
// Node is used to return the coordinates of a single in the LAN pool.
|
||||
func (c *Coordinate) Node(node string, q *QueryOptions) ([]*CoordinateEntry, *QueryMeta, error) {
|
||||
r := c.c.newRequest("GET", "/v1/coordinate/node/"+node)
|
||||
r.setQueryOptions(q)
|
||||
rtt, resp, err := requireOK(c.c.doRequest(r))
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
qm := &QueryMeta{}
|
||||
parseQueryMeta(resp, qm)
|
||||
qm.RequestTime = rtt
|
||||
|
||||
var out []*CoordinateEntry
|
||||
if err := decodeBody(resp, &out); err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
return out, qm, nil
|
||||
}
|
||||
|
|
16
vendor/github.com/hashicorp/consul/api/health.go
generated
vendored
16
vendor/github.com/hashicorp/consul/api/health.go
generated
vendored
|
@ -33,6 +33,22 @@ type HealthCheck struct {
|
|||
Output string
|
||||
ServiceID string
|
||||
ServiceName string
|
||||
ServiceTags []string
|
||||
|
||||
Definition HealthCheckDefinition
|
||||
}
|
||||
|
||||
// HealthCheckDefinition is used to store the details about
|
||||
// a health check's execution.
|
||||
type HealthCheckDefinition struct {
|
||||
HTTP string
|
||||
Header map[string][]string
|
||||
Method string
|
||||
TLSSkipVerify bool
|
||||
TCP string
|
||||
Interval ReadableDuration
|
||||
Timeout ReadableDuration
|
||||
DeregisterCriticalServiceAfter ReadableDuration
|
||||
}
|
||||
|
||||
// HealthChecks is a collection of HealthCheck structs.
|
||||
|
|
53
vendor/github.com/hashicorp/consul/api/kv.go
generated
vendored
53
vendor/github.com/hashicorp/consul/api/kv.go
generated
vendored
|
@ -49,17 +49,18 @@ type KVPairs []*KVPair
|
|||
type KVOp string
|
||||
|
||||
const (
|
||||
KVSet KVOp = "set"
|
||||
KVDelete KVOp = "delete"
|
||||
KVDeleteCAS KVOp = "delete-cas"
|
||||
KVDeleteTree KVOp = "delete-tree"
|
||||
KVCAS KVOp = "cas"
|
||||
KVLock KVOp = "lock"
|
||||
KVUnlock KVOp = "unlock"
|
||||
KVGet KVOp = "get"
|
||||
KVGetTree KVOp = "get-tree"
|
||||
KVCheckSession KVOp = "check-session"
|
||||
KVCheckIndex KVOp = "check-index"
|
||||
KVSet KVOp = "set"
|
||||
KVDelete KVOp = "delete"
|
||||
KVDeleteCAS KVOp = "delete-cas"
|
||||
KVDeleteTree KVOp = "delete-tree"
|
||||
KVCAS KVOp = "cas"
|
||||
KVLock KVOp = "lock"
|
||||
KVUnlock KVOp = "unlock"
|
||||
KVGet KVOp = "get"
|
||||
KVGetTree KVOp = "get-tree"
|
||||
KVCheckSession KVOp = "check-session"
|
||||
KVCheckIndex KVOp = "check-index"
|
||||
KVCheckNotExists KVOp = "check-not-exists"
|
||||
)
|
||||
|
||||
// KVTxnOp defines a single operation inside a transaction.
|
||||
|
@ -251,7 +252,7 @@ func (k *KV) put(key string, params map[string]string, body []byte, q *WriteOpti
|
|||
if _, err := io.Copy(&buf, resp.Body); err != nil {
|
||||
return false, nil, fmt.Errorf("Failed to read response: %v", err)
|
||||
}
|
||||
res := strings.Contains(string(buf.Bytes()), "true")
|
||||
res := strings.Contains(buf.String(), "true")
|
||||
return res, qm, nil
|
||||
}
|
||||
|
||||
|
@ -295,7 +296,7 @@ func (k *KV) deleteInternal(key string, params map[string]string, q *WriteOption
|
|||
if _, err := io.Copy(&buf, resp.Body); err != nil {
|
||||
return false, nil, fmt.Errorf("Failed to read response: %v", err)
|
||||
}
|
||||
res := strings.Contains(string(buf.Bytes()), "true")
|
||||
res := strings.Contains(buf.String(), "true")
|
||||
return res, qm, nil
|
||||
}
|
||||
|
||||
|
@ -352,19 +353,19 @@ type TxnResponse struct {
|
|||
//
|
||||
// Here's an example:
|
||||
//
|
||||
// ops := KVTxnOps{
|
||||
// &KVTxnOp{
|
||||
// Verb: KVLock,
|
||||
// Key: "test/lock",
|
||||
// Session: "adf4238a-882b-9ddc-4a9d-5b6758e4159e",
|
||||
// Value: []byte("hello"),
|
||||
// },
|
||||
// &KVTxnOp{
|
||||
// Verb: KVGet,
|
||||
// Key: "another/key",
|
||||
// },
|
||||
// }
|
||||
// ok, response, _, err := kv.Txn(&ops, nil)
|
||||
// ops := KVTxnOps{
|
||||
// &KVTxnOp{
|
||||
// Verb: KVLock,
|
||||
// Key: "test/lock",
|
||||
// Session: "adf4238a-882b-9ddc-4a9d-5b6758e4159e",
|
||||
// Value: []byte("hello"),
|
||||
// },
|
||||
// &KVTxnOp{
|
||||
// Verb: KVGet,
|
||||
// Key: "another/key",
|
||||
// },
|
||||
// }
|
||||
// ok, response, _, err := kv.Txn(&ops, nil)
|
||||
//
|
||||
// If there is a problem making the transaction request then an error will be
|
||||
// returned. Otherwise, the ok value will be true if the transaction succeeded
|
||||
|
|
33
vendor/github.com/hashicorp/consul/api/lock.go
generated
vendored
33
vendor/github.com/hashicorp/consul/api/lock.go
generated
vendored
|
@ -143,22 +143,23 @@ func (l *Lock) Lock(stopCh <-chan struct{}) (<-chan struct{}, error) {
|
|||
// Check if we need to create a session first
|
||||
l.lockSession = l.opts.Session
|
||||
if l.lockSession == "" {
|
||||
if s, err := l.createSession(); err != nil {
|
||||
s, err := l.createSession()
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to create session: %v", err)
|
||||
} else {
|
||||
l.sessionRenew = make(chan struct{})
|
||||
l.lockSession = s
|
||||
session := l.c.Session()
|
||||
go session.RenewPeriodic(l.opts.SessionTTL, s, nil, l.sessionRenew)
|
||||
|
||||
// If we fail to acquire the lock, cleanup the session
|
||||
defer func() {
|
||||
if !l.isHeld {
|
||||
close(l.sessionRenew)
|
||||
l.sessionRenew = nil
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
l.sessionRenew = make(chan struct{})
|
||||
l.lockSession = s
|
||||
session := l.c.Session()
|
||||
go session.RenewPeriodic(l.opts.SessionTTL, s, nil, l.sessionRenew)
|
||||
|
||||
// If we fail to acquire the lock, cleanup the session
|
||||
defer func() {
|
||||
if !l.isHeld {
|
||||
close(l.sessionRenew)
|
||||
l.sessionRenew = nil
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
// Setup the query options
|
||||
|
@ -179,7 +180,7 @@ WAIT:
|
|||
|
||||
// Handle the one-shot mode.
|
||||
if l.opts.LockTryOnce && attempts > 0 {
|
||||
elapsed := time.Now().Sub(start)
|
||||
elapsed := time.Since(start)
|
||||
if elapsed > qOpts.WaitTime {
|
||||
return nil, nil
|
||||
}
|
||||
|
@ -369,7 +370,7 @@ RETRY:
|
|||
// by doing retries. Note that we have to attempt the retry in a non-
|
||||
// blocking fashion so that we have a clean place to reset the retry
|
||||
// counter if service is restored.
|
||||
if retries > 0 && IsServerError(err) {
|
||||
if retries > 0 && IsRetryableError(err) {
|
||||
time.Sleep(l.opts.MonitorRetryTime)
|
||||
retries--
|
||||
opts.WaitIndex = 0
|
||||
|
|
25
vendor/github.com/hashicorp/consul/api/operator_area.go
generated
vendored
25
vendor/github.com/hashicorp/consul/api/operator_area.go
generated
vendored
|
@ -25,6 +25,10 @@ type Area struct {
|
|||
// RetryJoin specifies the address of Consul servers to join to, such as
|
||||
// an IPs or hostnames with an optional port number. This is optional.
|
||||
RetryJoin []string
|
||||
|
||||
// UseTLS specifies whether gossip over this area should be encrypted with TLS
|
||||
// if possible.
|
||||
UseTLS bool
|
||||
}
|
||||
|
||||
// AreaJoinResponse is returned when a join occurs and gives the result for each
|
||||
|
@ -100,6 +104,27 @@ func (op *Operator) AreaCreate(area *Area, q *WriteOptions) (string, *WriteMeta,
|
|||
return out.ID, wm, nil
|
||||
}
|
||||
|
||||
// AreaUpdate will update the configuration of the network area with the given ID.
|
||||
func (op *Operator) AreaUpdate(areaID string, area *Area, q *WriteOptions) (string, *WriteMeta, error) {
|
||||
r := op.c.newRequest("PUT", "/v1/operator/area/"+areaID)
|
||||
r.setWriteOptions(q)
|
||||
r.obj = area
|
||||
rtt, resp, err := requireOK(op.c.doRequest(r))
|
||||
if err != nil {
|
||||
return "", nil, err
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
wm := &WriteMeta{}
|
||||
wm.RequestTime = rtt
|
||||
|
||||
var out struct{ ID string }
|
||||
if err := decodeBody(resp, &out); err != nil {
|
||||
return "", nil, err
|
||||
}
|
||||
return out.ID, wm, nil
|
||||
}
|
||||
|
||||
// AreaGet returns a single network area.
|
||||
func (op *Operator) AreaGet(areaID string, q *QueryOptions) ([]*Area, *QueryMeta, error) {
|
||||
var out []*Area
|
||||
|
|
6
vendor/github.com/hashicorp/consul/api/operator_autopilot.go
generated
vendored
6
vendor/github.com/hashicorp/consul/api/operator_autopilot.go
generated
vendored
|
@ -39,6 +39,10 @@ type AutopilotConfiguration struct {
|
|||
// cluster before promoting them to voters.
|
||||
DisableUpgradeMigration bool
|
||||
|
||||
// (Enterprise-only) UpgradeVersionTag is the node tag to use for version info when
|
||||
// performing upgrade migrations. If left blank, the Consul version will be used.
|
||||
UpgradeVersionTag string
|
||||
|
||||
// CreateIndex holds the index corresponding the creation of this configuration.
|
||||
// This is a read-only field.
|
||||
CreateIndex uint64
|
||||
|
@ -192,7 +196,7 @@ func (op *Operator) AutopilotCASConfiguration(conf *AutopilotConfiguration, q *W
|
|||
if _, err := io.Copy(&buf, resp.Body); err != nil {
|
||||
return false, fmt.Errorf("Failed to read response: %v", err)
|
||||
}
|
||||
res := strings.Contains(string(buf.Bytes()), "true")
|
||||
res := strings.Contains(buf.String(), "true")
|
||||
|
||||
return res, nil
|
||||
}
|
||||
|
|
3
vendor/github.com/hashicorp/consul/api/operator_keyring.go
generated
vendored
3
vendor/github.com/hashicorp/consul/api/operator_keyring.go
generated
vendored
|
@ -13,6 +13,9 @@ type KeyringResponse struct {
|
|||
// The datacenter name this request corresponds to
|
||||
Datacenter string
|
||||
|
||||
// Segment has the network segment this request corresponds to.
|
||||
Segment string
|
||||
|
||||
// A map of the encryption keys to the number of nodes they're installed on
|
||||
Keys map[string]int
|
||||
|
||||
|
|
5
vendor/github.com/hashicorp/consul/api/operator_raft.go
generated
vendored
5
vendor/github.com/hashicorp/consul/api/operator_raft.go
generated
vendored
|
@ -17,6 +17,9 @@ type RaftServer struct {
|
|||
// Leader is true if this server is the current cluster leader.
|
||||
Leader bool
|
||||
|
||||
// Protocol version is the raft protocol version used by the server
|
||||
ProtocolVersion string
|
||||
|
||||
// Voter is true if this server has a vote in the cluster. This might
|
||||
// be false if the server is staging and still coming online, or if
|
||||
// it's a non-voting server, which will be added in a future release of
|
||||
|
@ -24,7 +27,7 @@ type RaftServer struct {
|
|||
Voter bool
|
||||
}
|
||||
|
||||
// RaftConfigration is returned when querying for the current Raft configuration.
|
||||
// RaftConfiguration is returned when querying for the current Raft configuration.
|
||||
type RaftConfiguration struct {
|
||||
// Servers has the list of servers in the Raft configuration.
|
||||
Servers []*RaftServer
|
||||
|
|
11
vendor/github.com/hashicorp/consul/api/operator_segment.go
generated
vendored
Normal file
11
vendor/github.com/hashicorp/consul/api/operator_segment.go
generated
vendored
Normal file
|
@ -0,0 +1,11 @@
|
|||
package api
|
||||
|
||||
// SegmentList returns all the available LAN segments.
|
||||
func (op *Operator) SegmentList(q *QueryOptions) ([]string, *QueryMeta, error) {
|
||||
var out []string
|
||||
qm, err := op.c.query("/v1/operator/segment", &out, q)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
return out, qm, nil
|
||||
}
|
33
vendor/github.com/hashicorp/consul/api/semaphore.go
generated
vendored
33
vendor/github.com/hashicorp/consul/api/semaphore.go
generated
vendored
|
@ -155,22 +155,23 @@ func (s *Semaphore) Acquire(stopCh <-chan struct{}) (<-chan struct{}, error) {
|
|||
// Check if we need to create a session first
|
||||
s.lockSession = s.opts.Session
|
||||
if s.lockSession == "" {
|
||||
if sess, err := s.createSession(); err != nil {
|
||||
sess, err := s.createSession()
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to create session: %v", err)
|
||||
} else {
|
||||
s.sessionRenew = make(chan struct{})
|
||||
s.lockSession = sess
|
||||
session := s.c.Session()
|
||||
go session.RenewPeriodic(s.opts.SessionTTL, sess, nil, s.sessionRenew)
|
||||
|
||||
// If we fail to acquire the lock, cleanup the session
|
||||
defer func() {
|
||||
if !s.isHeld {
|
||||
close(s.sessionRenew)
|
||||
s.sessionRenew = nil
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
s.sessionRenew = make(chan struct{})
|
||||
s.lockSession = sess
|
||||
session := s.c.Session()
|
||||
go session.RenewPeriodic(s.opts.SessionTTL, sess, nil, s.sessionRenew)
|
||||
|
||||
// If we fail to acquire the lock, cleanup the session
|
||||
defer func() {
|
||||
if !s.isHeld {
|
||||
close(s.sessionRenew)
|
||||
s.sessionRenew = nil
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
// Create the contender entry
|
||||
|
@ -197,7 +198,7 @@ WAIT:
|
|||
|
||||
// Handle the one-shot mode.
|
||||
if s.opts.SemaphoreTryOnce && attempts > 0 {
|
||||
elapsed := time.Now().Sub(start)
|
||||
elapsed := time.Since(start)
|
||||
if elapsed > qOpts.WaitTime {
|
||||
return nil, nil
|
||||
}
|
||||
|
@ -491,7 +492,7 @@ RETRY:
|
|||
// by doing retries. Note that we have to attempt the retry in a non-
|
||||
// blocking fashion so that we have a clean place to reset the retry
|
||||
// counter if service is restored.
|
||||
if retries > 0 && IsServerError(err) {
|
||||
if retries > 0 && IsRetryableError(err) {
|
||||
time.Sleep(s.opts.MonitorRetryTime)
|
||||
retries--
|
||||
opts.WaitIndex = 0
|
||||
|
|
9
vendor/github.com/hashicorp/consul/api/session.go
generated
vendored
9
vendor/github.com/hashicorp/consul/api/session.go
generated
vendored
|
@ -145,7 +145,9 @@ func (s *Session) Renew(id string, q *WriteOptions) (*SessionEntry, *WriteMeta,
|
|||
// RenewPeriodic is used to periodically invoke Session.Renew on a
|
||||
// session until a doneCh is closed. This is meant to be used in a long running
|
||||
// goroutine to ensure a session stays valid.
|
||||
func (s *Session) RenewPeriodic(initialTTL string, id string, q *WriteOptions, doneCh chan struct{}) error {
|
||||
func (s *Session) RenewPeriodic(initialTTL string, id string, q *WriteOptions, doneCh <-chan struct{}) error {
|
||||
ctx := q.Context()
|
||||
|
||||
ttl, err := time.ParseDuration(initialTTL)
|
||||
if err != nil {
|
||||
return err
|
||||
|
@ -179,6 +181,11 @@ func (s *Session) RenewPeriodic(initialTTL string, id string, q *WriteOptions, d
|
|||
// Attempt a session destroy
|
||||
s.Destroy(id, q)
|
||||
return nil
|
||||
|
||||
case <-ctx.Done():
|
||||
// Bail immediately since attempting the destroy would
|
||||
// use the canceled context in q, which would just bail.
|
||||
return ctx.Err()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue