1
0
Fork 0

Add support for readiness checks.

This commit is contained in:
Timo Reimann 2017-08-18 03:08:03 +02:00 committed by Traefiker
parent 3f76f73e8c
commit ea3510d1f3
8 changed files with 483 additions and 117 deletions

View file

@ -1,12 +1,21 @@
package marathon
import "github.com/gambol99/go-marathon"
import (
"time"
"github.com/gambol99/go-marathon"
)
const testTaskName string = "taskID"
// Functions related to building applications.
func createApplication(ops ...func(*marathon.Application)) marathon.Application {
func application(ops ...func(*marathon.Application)) marathon.Application {
app := marathon.Application{}
app.EmptyLabels()
app.Deployments = []map[string]string{}
app.ReadinessChecks = &[]marathon.ReadinessCheck{}
app.ReadinessCheckResults = &[]marathon.ReadinessCheckResult{}
for _, op := range ops {
op(&app)
@ -63,10 +72,41 @@ func ipAddrPerTask(port int) func(*marathon.Application) {
}
}
// deployments returns an application option that registers one running
// deployment per given ID on the application.
func deployments(ids ...string) func(*marathon.Application) {
	return func(app *marathon.Application) {
		for i := range ids {
			// Each deployment is represented by a map holding only its ID.
			entry := map[string]string{"ID": ids[i]}
			app.Deployments = append(app.Deployments, entry)
		}
	}
}
// readinessCheck returns an application option that replaces the application's
// readiness checks with a single check on "/ready" using the given timeout
// (truncated to whole seconds).
func readinessCheck(timeout time.Duration) func(*marathon.Application) {
	return func(app *marathon.Application) {
		check := marathon.ReadinessCheck{
			Path:           "/ready",
			TimeoutSeconds: int(timeout.Seconds()),
		}
		app.ReadinessChecks = &[]marathon.ReadinessCheck{check}
	}
}
// readinessCheckResult returns an application option that appends a readiness
// check result with the given ready state for the task identified by taskID.
func readinessCheckResult(taskID string, ready bool) func(*marathon.Application) {
	return func(app *marathon.Application) {
		// The result list is held behind a pointer, which is nil on a
		// zero-value marathon.Application. Allocate it on demand so the
		// dereference below cannot panic.
		if app.ReadinessCheckResults == nil {
			app.ReadinessCheckResults = &[]marathon.ReadinessCheckResult{}
		}
		*app.ReadinessCheckResults = append(*app.ReadinessCheckResults, marathon.ReadinessCheckResult{
			TaskID: taskID,
			Ready:  ready,
		})
	}
}
// Functions related to building tasks.
func createTask(ops ...func(*marathon.Task)) marathon.Task {
func task(ops ...func(*marathon.Task)) marathon.Task {
t := marathon.Task{
ID: testTaskName,
// The vast majority of tests expect the task state to be TASK_RUNNING.
State: string(taskStateRunning),
}
@ -78,8 +118,8 @@ func createTask(ops ...func(*marathon.Task)) marathon.Task {
return t
}
func createLocalhostTask(ops ...func(*marathon.Task)) marathon.Task {
t := createTask(
func localhostTask(ops ...func(*marathon.Task)) marathon.Task {
t := task(
host("localhost"),
ipAddresses("127.0.0.1"),
)
@ -129,3 +169,15 @@ func healthCheckResultLiveness(alive ...bool) func(*marathon.Task) {
}
}
}
// startedAt returns a task option that sets the task's start time to the
// given timestamp string verbatim (no validation is performed).
func startedAt(timestamp string) func(*marathon.Task) {
	setStart := func(tsk *marathon.Task) {
		tsk.StartedAt = timestamp
	}
	return setStart
}
// startedAtFromNow returns a task option that sets the task's start time to
// the moment lying offset before the time the option is applied, formatted
// as RFC 3339.
func startedAtFromNow(offset time.Duration) func(*marathon.Task) {
	return func(tsk *marathon.Task) {
		started := time.Now().Add(-offset)
		tsk.StartedAt = started.Format(time.RFC3339)
	}
}

View file

@ -26,6 +26,13 @@ import (
const (
traceMaxScanTokenSize = 1024 * 1024
marathonEventIDs = marathon.EventIDApplications |
marathon.EventIDAddHealthCheck |
marathon.EventIDDeploymentSuccess |
marathon.EventIDDeploymentFailed |
marathon.EventIDDeploymentInfo |
marathon.EventIDDeploymentStepSuccess |
marathon.EventIDDeploymentStepFailed
)
// TaskState denotes the Mesos state a task can have.
@ -52,6 +59,8 @@ type Provider struct {
KeepAlive flaeg.Duration `description:"Set a non-default TCP Keep Alive time in seconds"`
ForceTaskHostname bool `description:"Force to use the task's hostname."`
Basic *Basic `description:"Enable basic authentication"`
RespectReadinessChecks bool `description:"Filter out tasks with non-successful readiness checks during deployments"`
readyChecker *readinessChecker
marathonClient marathon.Marathon
}
@ -80,6 +89,13 @@ func (p *Provider) Provide(configurationChan chan<- types.ConfigMessage, pool *s
config.HTTPBasicAuthUser = p.Basic.HTTPBasicAuthUser
config.HTTPBasicPassword = p.Basic.HTTPBasicPassword
}
var rc *readinessChecker
if p.RespectReadinessChecks {
log.Debug("Enabling Marathon readiness checker")
rc = defaultReadinessChecker(p.Trace)
}
p.readyChecker = rc
if len(p.DCOSToken) > 0 {
config.DCOSToken = p.DCOSToken
}
@ -104,7 +120,7 @@ func (p *Provider) Provide(configurationChan chan<- types.ConfigMessage, pool *s
p.marathonClient = client
if p.Watch {
update, err := client.AddEventsListener(marathon.EventIDApplications)
update, err := client.AddEventsListener(marathonEventIDs)
if err != nil {
log.Errorf("Failed to register for events, %s", err)
return err
@ -116,7 +132,7 @@ func (p *Provider) Provide(configurationChan chan<- types.ConfigMessage, pool *s
case <-stop:
return
case event := <-update:
log.Debug("Provider event received", event)
log.Debugf("Received provider event %s", event)
configuration := p.loadMarathonConfig()
if configuration != nil {
configurationChan <- types.ConfigMessage{
@ -175,6 +191,8 @@ func (p *Provider) loadMarathonConfig() *types.Configuration {
v := url.Values{}
v.Add("embed", "apps.tasks")
v.Add("embed", "apps.deployments")
v.Add("embed", "apps.readiness")
applications, err := p.marathonClient.Applications(v)
if err != nil {
log.Errorf("Failed to retrieve Marathon applications: %s", err)
@ -258,6 +276,11 @@ func (p *Provider) taskFilter(task marathon.Task, application marathon.Applicati
}
}
if ready := p.readyChecker.Do(task, application); !ready {
log.Infof("Filtering unready task %s from application %s", task.ID, application.ID)
return false
}
return true
}

View file

@ -52,8 +52,8 @@ func TestMarathonLoadConfigNonAPIErrors(t *testing.T) {
}{
{
desc: "simple application",
application: createApplication(appPorts(80)),
task: createLocalhostTask(taskPorts(80)),
application: application(appPorts(80)),
task: localhostTask(taskPorts(80)),
expectedFrontends: map[string]*types.Frontend{
"frontend-app": {
Backend: "backend-app",
@ -78,8 +78,8 @@ func TestMarathonLoadConfigNonAPIErrors(t *testing.T) {
},
{
desc: "filtered task",
application: createApplication(appPorts(80)),
task: createLocalhostTask(
application: application(appPorts(80)),
task: localhostTask(
taskPorts(80),
state(taskStateStaging),
),
@ -97,12 +97,12 @@ func TestMarathonLoadConfigNonAPIErrors(t *testing.T) {
},
{
desc: "load balancer / circuit breaker labels",
application: createApplication(
application: application(
appPorts(80),
label(types.LabelBackendLoadbalancerMethod, "drr"),
label(types.LabelBackendCircuitbreakerExpression, "NetworkErrorRatio() > 0.5"),
),
task: createLocalhostTask(taskPorts(80)),
task: localhostTask(taskPorts(80)),
expectedFrontends: map[string]*types.Frontend{
"frontend-app": {
Backend: "backend-app",
@ -132,12 +132,12 @@ func TestMarathonLoadConfigNonAPIErrors(t *testing.T) {
},
{
desc: "general max connection labels",
application: createApplication(
application: application(
appPorts(80),
label(types.LabelBackendMaxconnAmount, "1000"),
label(types.LabelBackendMaxconnExtractorfunc, "client.ip"),
),
task: createLocalhostTask(taskPorts(80)),
task: localhostTask(taskPorts(80)),
expectedFrontends: map[string]*types.Frontend{
"frontend-app": {
Backend: "backend-app",
@ -165,11 +165,11 @@ func TestMarathonLoadConfigNonAPIErrors(t *testing.T) {
},
{
desc: "max connection amount label only",
application: createApplication(
application: application(
appPorts(80),
label(types.LabelBackendMaxconnAmount, "1000"),
),
task: createLocalhostTask(taskPorts(80)),
task: localhostTask(taskPorts(80)),
expectedFrontends: map[string]*types.Frontend{
"frontend-app": {
Backend: "backend-app",
@ -194,11 +194,11 @@ func TestMarathonLoadConfigNonAPIErrors(t *testing.T) {
},
{
desc: "max connection extractor function label only",
application: createApplication(
application: application(
appPorts(80),
label(types.LabelBackendMaxconnExtractorfunc, "client.ip"),
),
task: createLocalhostTask(taskPorts(80)),
task: localhostTask(taskPorts(80)),
expectedFrontends: map[string]*types.Frontend{
"frontend-app": {
Backend: "backend-app",
@ -223,12 +223,12 @@ func TestMarathonLoadConfigNonAPIErrors(t *testing.T) {
},
{
desc: "health check labels",
application: createApplication(
application: application(
appPorts(80),
label(types.LabelBackendHealthcheckPath, "/path"),
label(types.LabelBackendHealthcheckInterval, "5m"),
),
task: createTask(
task: task(
host("127.0.0.1"),
taskPorts(80),
),
@ -298,36 +298,37 @@ func TestMarathonLoadConfigNonAPIErrors(t *testing.T) {
func TestMarathonTaskFilter(t *testing.T) {
cases := []struct {
desc string
task marathon.Task
application marathon.Application
expected bool
desc string
task marathon.Task
application marathon.Application
readyChecker *readinessChecker
expected bool
}{
{
desc: "missing port",
task: createTask(),
application: createApplication(),
task: task(),
application: application(),
expected: false,
},
{
desc: "task not running",
task: createTask(
task: task(
taskPorts(80),
state(taskStateStaging),
),
application: createApplication(appPorts(80)),
application: application(appPorts(80)),
expected: false,
},
{
desc: "existing port",
task: createTask(taskPorts(80)),
application: createApplication(appPorts(80)),
task: task(taskPorts(80)),
application: application(appPorts(80)),
expected: true,
},
{
desc: "ambiguous port specification",
task: createTask(taskPorts(80, 443)),
application: createApplication(
task: task(taskPorts(80, 443)),
application: application(
appPorts(80, 443),
label(types.LabelPort, "443"),
label(types.LabelPortIndex, "1"),
@ -336,8 +337,8 @@ func TestMarathonTaskFilter(t *testing.T) {
},
{
desc: "healthcheck available",
task: createTask(taskPorts(80)),
application: createApplication(
task: task(taskPorts(80)),
application: application(
appPorts(80),
healthChecks(marathon.NewDefaultHealthCheck()),
),
@ -345,11 +346,11 @@ func TestMarathonTaskFilter(t *testing.T) {
},
{
desc: "healthcheck result false",
task: createTask(
task: task(
taskPorts(80),
healthCheckResultLiveness(false),
),
application: createApplication(
application: application(
appPorts(80),
healthChecks(marathon.NewDefaultHealthCheck()),
),
@ -357,11 +358,11 @@ func TestMarathonTaskFilter(t *testing.T) {
},
{
desc: "healthcheck results mixed",
task: createTask(
task: task(
taskPorts(80),
healthCheckResultLiveness(true, false),
),
application: createApplication(
application: application(
appPorts(80),
healthChecks(marathon.NewDefaultHealthCheck()),
),
@ -369,23 +370,35 @@ func TestMarathonTaskFilter(t *testing.T) {
},
{
desc: "healthcheck result true",
task: createTask(
task: task(
taskPorts(80),
healthCheckResultLiveness(true),
),
application: createApplication(
application: application(
appPorts(80),
healthChecks(marathon.NewDefaultHealthCheck()),
),
expected: true,
},
{
desc: "readiness check false",
task: task(taskPorts(80)),
application: application(
appPorts(80),
deployments("deploymentId"),
readinessCheck(0),
readinessCheckResult(testTaskName, false),
),
readyChecker: testReadinessChecker(),
expected: false,
},
}
for _, c := range cases {
c := c
t.Run(c.desc, func(t *testing.T) {
t.Parallel()
provider := &Provider{}
provider := &Provider{readyChecker: c.readyChecker}
actual := provider.taskFilter(c.task, c.application)
if actual != c.expected {
t.Errorf("actual %v, expected %v", actual, c.expected)
@ -403,19 +416,19 @@ func TestMarathonApplicationFilterConstraints(t *testing.T) {
}{
{
desc: "tags missing",
application: createApplication(),
application: application(),
marathonLBCompatibility: false,
expected: false,
},
{
desc: "tag matching",
application: createApplication(label(types.LabelTags, "valid")),
application: application(label(types.LabelTags, "valid")),
marathonLBCompatibility: false,
expected: true,
},
{
desc: "LB compatibility tag matching",
application: createApplication(
application: application(
label("HAPROXY_GROUP", "valid"),
label(types.LabelTags, "notvalid"),
),
@ -495,7 +508,7 @@ func TestMarathonApplicationFilterEnabled(t *testing.T) {
t.Run(c.desc, func(t *testing.T) {
t.Parallel()
provider := &Provider{ExposedByDefault: c.exposedByDefault}
app := createApplication(label(types.LabelEnable, c.enabledLabel))
app := application(label(types.LabelEnable, c.enabledLabel))
if provider.applicationFilter(app) != c.expected {
t.Errorf("got unexpected filtering = %t", !c.expected)
}
@ -514,70 +527,70 @@ func TestMarathonGetPort(t *testing.T) {
}{
{
desc: "port missing",
application: createApplication(),
task: createTask(),
application: application(),
task: task(),
expected: "",
},
{
desc: "numeric port",
application: createApplication(label(types.LabelPort, "80")),
task: createTask(),
application: application(label(types.LabelPort, "80")),
task: task(),
expected: "80",
},
{
desc: "string port",
application: createApplication(label(types.LabelPort, "foobar")),
task: createTask(taskPorts(80)),
application: application(label(types.LabelPort, "foobar")),
task: task(taskPorts(80)),
expected: "",
},
{
desc: "negative port",
application: createApplication(label(types.LabelPort, "-1")),
task: createTask(taskPorts(80)),
application: application(label(types.LabelPort, "-1")),
task: task(taskPorts(80)),
expected: "",
},
{
desc: "task port available",
application: createApplication(),
task: createTask(taskPorts(80)),
application: application(),
task: task(taskPorts(80)),
expected: "80",
},
{
desc: "port definition available",
application: createApplication(
application: application(
portDefinition(443),
),
task: createTask(),
task: task(),
expected: "443",
},
{
desc: "IP-per-task port available",
application: createApplication(ipAddrPerTask(8000)),
task: createTask(),
application: application(ipAddrPerTask(8000)),
task: task(),
expected: "8000",
},
{
desc: "multiple task ports available",
application: createApplication(),
task: createTask(taskPorts(80, 443)),
application: application(),
task: task(taskPorts(80, 443)),
expected: "80",
},
{
desc: "numeric port index specified",
application: createApplication(label(types.LabelPortIndex, "1")),
task: createTask(taskPorts(80, 443)),
application: application(label(types.LabelPortIndex, "1")),
task: task(taskPorts(80, 443)),
expected: "443",
},
{
desc: "string port index specified",
application: createApplication(label(types.LabelPortIndex, "foobar")),
task: createTask(taskPorts(80)),
application: application(label(types.LabelPortIndex, "foobar")),
task: task(taskPorts(80)),
expected: "",
},
{
desc: "task and application ports specified",
application: createApplication(appPorts(9999)),
task: createTask(taskPorts(7777)),
application: application(appPorts(9999)),
task: task(taskPorts(7777)),
expected: "7777",
},
}
@ -602,12 +615,12 @@ func TestMarathonGetWeight(t *testing.T) {
}{
{
desc: "label missing",
application: createApplication(),
application: application(),
expected: "0",
},
{
desc: "label existing",
application: createApplication(label(types.LabelWeight, "10")),
application: application(label(types.LabelWeight, "10")),
expected: "10",
},
}
@ -633,12 +646,12 @@ func TestMarathonGetDomain(t *testing.T) {
}{
{
desc: "label missing",
application: createApplication(),
application: application(),
expected: "docker.localhost",
},
{
desc: "label existing",
application: createApplication(label(types.LabelDomain, "foo.bar")),
application: application(label(types.LabelDomain, "foo.bar")),
expected: "foo.bar",
},
}
@ -666,12 +679,12 @@ func TestMarathonGetProtocol(t *testing.T) {
}{
{
desc: "label missing",
application: createApplication(),
application: application(),
expected: "http",
},
{
desc: "label existing",
application: createApplication(label(types.LabelProtocol, "https")),
application: application(label(types.LabelProtocol, "https")),
expected: "https",
},
}
@ -697,12 +710,12 @@ func TestMarathonGetSticky(t *testing.T) {
}{
{
desc: "label missing",
application: createApplication(),
application: application(),
expected: "false",
},
{
desc: "label existing",
application: createApplication(label(types.LabelBackendLoadbalancerSticky, "true")),
application: application(label(types.LabelBackendLoadbalancerSticky, "true")),
expected: "true",
},
}
@ -728,12 +741,12 @@ func TestMarathonGetPassHostHeader(t *testing.T) {
}{
{
desc: "label missing",
application: createApplication(),
application: application(),
expected: "true",
},
{
desc: "label existing",
application: createApplication(label(types.LabelFrontendPassHostHeader, "false")),
application: application(label(types.LabelFrontendPassHostHeader, "false")),
expected: "false",
},
}
@ -759,17 +772,17 @@ func TestMarathonMaxConnAmount(t *testing.T) {
}{
{
desc: "label missing",
application: createApplication(),
application: application(),
expected: math.MaxInt64,
},
{
desc: "non-integer value",
application: createApplication(label(types.LabelBackendMaxconnAmount, "foobar")),
application: application(label(types.LabelBackendMaxconnAmount, "foobar")),
expected: math.MaxInt64,
},
{
desc: "label existing",
application: createApplication(label(types.LabelBackendMaxconnAmount, "32")),
application: application(label(types.LabelBackendMaxconnAmount, "32")),
expected: 32,
},
}
@ -795,12 +808,12 @@ func TestMarathonGetMaxConnExtractorFunc(t *testing.T) {
}{
{
desc: "label missing",
application: createApplication(),
application: application(),
expected: "request.host",
},
{
desc: "label existing",
application: createApplication(label(types.LabelBackendMaxconnExtractorfunc, "client.ip")),
application: application(label(types.LabelBackendMaxconnExtractorfunc, "client.ip")),
expected: "client.ip",
},
}
@ -826,12 +839,12 @@ func TestMarathonGetLoadBalancerMethod(t *testing.T) {
}{
{
desc: "label missing",
application: createApplication(),
application: application(),
expected: "wrr",
},
{
desc: "label existing",
application: createApplication(label(types.LabelBackendLoadbalancerMethod, "drr")),
application: application(label(types.LabelBackendLoadbalancerMethod, "drr")),
expected: "drr",
},
}
@ -857,12 +870,12 @@ func TestMarathonGetCircuitBreakerExpression(t *testing.T) {
}{
{
desc: "label missing",
application: createApplication(),
application: application(),
expected: "NetworkErrorRatio() > 1",
},
{
desc: "label existing",
application: createApplication(label(types.LabelBackendCircuitbreakerExpression, "NetworkErrorRatio() > 0.5")),
application: application(label(types.LabelBackendCircuitbreakerExpression, "NetworkErrorRatio() > 0.5")),
expected: "NetworkErrorRatio() > 0.5",
},
}
@ -888,12 +901,12 @@ func TestMarathonGetEntryPoints(t *testing.T) {
}{
{
desc: "label missing",
application: createApplication(),
application: application(),
expected: []string{},
},
{
desc: "label existing",
application: createApplication(label(types.LabelFrontendEntryPoints, "http,https")),
application: application(label(types.LabelFrontendEntryPoints, "http,https")),
expected: []string{"http", "https"},
},
}
@ -920,13 +933,13 @@ func TestMarathonGetFrontendRule(t *testing.T) {
}{
{
desc: "label missing",
application: createApplication(appID("test")),
application: application(appID("test")),
marathonLBCompatibility: true,
expected: "Host:test.docker.localhost",
},
{
desc: "HAProxy vhost available and LB compat disabled",
application: createApplication(
application: application(
appID("test"),
label("HAPROXY_0_VHOST", "foo.bar"),
),
@ -935,14 +948,14 @@ func TestMarathonGetFrontendRule(t *testing.T) {
},
{
desc: "HAProxy vhost available and LB compat enabled",
application: createApplication(label("HAPROXY_0_VHOST", "foo.bar")),
application: application(label("HAPROXY_0_VHOST", "foo.bar")),
marathonLBCompatibility: true,
expected: "Host:foo.bar",
},
{
desc: "frontend rule available",
application: createApplication(
application: application(
label(types.LabelFrontendRule, "Host:foo.bar"),
label("HAPROXY_0_VHOST", "unused"),
),
@ -975,12 +988,12 @@ func TestMarathonGetBackend(t *testing.T) {
}{
{
desc: "label missing",
application: createApplication(appID("/group/app")),
application: application(appID("/group/app")),
expected: "-group-app",
},
{
desc: "label existing",
application: createApplication(label(types.LabelBackend, "bar")),
application: application(label(types.LabelBackend, "bar")),
expected: "bar",
},
}
@ -1056,7 +1069,7 @@ func TestMarathonHasHealthCheckLabels(t *testing.T) {
c := c
t.Run(c.desc, func(t *testing.T) {
t.Parallel()
app := createApplication()
app := application()
if c.value != nil {
app.AddLabel(types.LabelBackendHealthcheckPath, *c.value)
}
@ -1090,7 +1103,7 @@ func TestMarathonGetHealthCheckPath(t *testing.T) {
c := c
t.Run(c.desc, func(t *testing.T) {
t.Parallel()
app := createApplication()
app := application()
if c.value != "" {
app.AddLabel(types.LabelBackendHealthcheckPath, c.value)
}
@ -1124,7 +1137,7 @@ func TestMarathonGetHealthCheckInterval(t *testing.T) {
c := c
t.Run(c.desc, func(t *testing.T) {
t.Parallel()
app := createApplication()
app := application()
if c.value != "" {
app.AddLabel(types.LabelBackendHealthcheckInterval, c.value)
}
@ -1148,49 +1161,49 @@ func TestGetBackendServer(t *testing.T) {
}{
{
desc: "application without IP-per-task",
application: createApplication(),
application: application(),
expectedServer: host,
},
{
desc: "task hostname override",
application: createApplication(ipAddrPerTask(8000)),
application: application(ipAddrPerTask(8000)),
forceTaskHostname: true,
expectedServer: host,
},
{
desc: "task IP address missing",
application: createApplication(ipAddrPerTask(8000)),
task: createTask(),
application: application(ipAddrPerTask(8000)),
task: task(),
expectedServer: "",
},
{
desc: "single task IP address",
application: createApplication(ipAddrPerTask(8000)),
task: createTask(ipAddresses("1.1.1.1")),
application: application(ipAddrPerTask(8000)),
task: task(ipAddresses("1.1.1.1")),
expectedServer: "1.1.1.1",
},
{
desc: "multiple task IP addresses without index label",
application: createApplication(ipAddrPerTask(8000)),
task: createTask(ipAddresses("1.1.1.1", "2.2.2.2")),
application: application(ipAddrPerTask(8000)),
task: task(ipAddresses("1.1.1.1", "2.2.2.2")),
expectedServer: "",
},
{
desc: "multiple task IP addresses with invalid index label",
application: createApplication(
application: application(
label("traefik.ipAddressIdx", "invalid"),
ipAddrPerTask(8000),
),
task: createTask(ipAddresses("1.1.1.1", "2.2.2.2")),
task: task(ipAddresses("1.1.1.1", "2.2.2.2")),
expectedServer: "",
},
{
desc: "multiple task IP addresses with valid index label",
application: createApplication(
application: application(
label("traefik.ipAddressIdx", "1"),
ipAddrPerTask(8000),
),
task: createTask(ipAddresses("1.1.1.1", "2.2.2.2")),
task: task(ipAddresses("1.1.1.1", "2.2.2.2")),
expectedServer: "2.2.2.2",
},
}
@ -1275,12 +1288,12 @@ func TestMarathonGetBasicAuth(t *testing.T) {
}{
{
desc: "label missing",
application: createApplication(),
application: application(),
expected: []string{},
},
{
desc: "label existing",
application: createApplication(label(types.LabelFrontendAuthBasic, "user:password")),
application: application(label(types.LabelFrontendAuthBasic, "user:password")),
expected: []string{"user:password"},
},
}

View file

@ -0,0 +1,122 @@
package marathon
import (
"time"
"github.com/containous/traefik/log"
marathon "github.com/gambol99/go-marathon"
)
const (
// readinessCheckDefaultTimeout is the default timeout for a readiness
// check if no check timeout is specified on the application spec. This
// should really never be the case, but better be safe than sorry.
readinessCheckDefaultTimeout time.Duration = 10 * time.Second
// readinessCheckSafetyMargin is some buffer duration to account for
// small offsets in readiness check execution.
readinessCheckSafetyMargin time.Duration = 5 * time.Second
// readinessLogHeader is the prefix prepended to every message logged
// through readinessChecker.tracef.
readinessLogHeader string = "Marathon readiness check: "
)
// readinessChecker decides, via its Do method, whether a Marathon task is
// considered ready. A nil *readinessChecker is valid and means that readiness
// checking is disabled.
type readinessChecker struct {
// checkDefaultTimeout is the time window used when the application's
// readiness check carries no timeout (a value of zero).
checkDefaultTimeout time.Duration
// checkSafetyMargin is added on top of a non-zero readiness check timeout
// taken from the application.
checkSafetyMargin time.Duration
// traceLogging enables the messages emitted through tracef.
traceLogging bool
}
// defaultReadinessChecker builds a readiness checker that uses the package
// default timeout and safety margin. Trace messages are emitted only when
// isTraceLogging is true.
func defaultReadinessChecker(isTraceLogging bool) *readinessChecker {
	rc := new(readinessChecker)
	rc.checkDefaultTimeout = readinessCheckDefaultTimeout
	rc.checkSafetyMargin = readinessCheckSafetyMargin
	rc.traceLogging = isTraceLogging
	return rc
}
// Do reports whether the given task of the given application is considered
// ready.
//
// A nil receiver means the checker is disabled, in which case every task is
// ready. Otherwise a task is ready when no deployment is ongoing, when the
// application defines no readiness checks, or when a readiness check result
// for the task says so. If no result exists for the task, its age decides:
// the task is unready until the check time window has elapsed since its
// start, and ready afterwards. A task whose start time cannot be parsed is
// unready.
func (rc *readinessChecker) Do(task marathon.Task, app marathon.Application) bool {
	// Readiness checker disabled.
	if rc == nil {
		return true
	}

	// Readiness only matters while a deployment is in progress; afterwards a
	// periodic probe (i.e., Traefik health checks) can take over.
	if len(app.Deployments) == 0 {
		rc.tracef("task %s app %s: ready = true [no deployment ongoing]", task.ID, app.ID)
		return true
	}

	// Without any configured readiness check there is nothing to evaluate,
	// so the task counts as ready.
	if app.ReadinessChecks == nil || len(*app.ReadinessChecks) == 0 {
		rc.tracef("task %s app %s: ready = true [no readiness checks on app]", task.ID, app.ID)
		return true
	}

	// Prefer an explicit check result for this task if one is available.
	if results := app.ReadinessCheckResults; results != nil {
		for _, res := range *results {
			if res.TaskID != task.ID {
				continue
			}
			rc.tracef("task %s app %s: ready = %t [evaluating readiness check ready state]", task.ID, app.ID, res.Ready)
			return res.Ready
		}
	}

	// No result for this task. This happens in two situations:
	//
	//  1. The task was just launched by the current deployment and moved from
	//     TASK_STAGING to TASK_RUNNING before Marathon included a readiness
	//     check result in the API response (a short, unlucky time frame).
	//  2. The task stems from a previous deployment and is merely joined by
	//     new tasks of a subsequent one (rolling upgrade).
	//
	// Tasks of the first kind must be held back until a result shows up,
	// whereas tasks of the second kind must keep their ready state. The two
	// are told apart by task age: Marathon should deliver a result within one
	// readiness check timeout (plus a safety margin accounting for delays in a
	// distributed system). A task older than that is assumed to pre-date the
	// deployment. Erring on the side of caution is acceptable because a new
	// task normally receives its result within seconds of being created.
	window := time.Duration((*app.ReadinessChecks)[0].TimeoutSeconds) * time.Second
	if window != 0 {
		window += rc.checkSafetyMargin
	} else {
		rc.tracef("task %s app %s: readiness check timeout not set, using default value %s", task.ID, app.ID, rc.checkDefaultTimeout)
		window = rc.checkDefaultTimeout
	}

	startedAt, err := time.Parse(time.RFC3339, task.StartedAt)
	if err != nil {
		// This should never happen. If it does, taking the task out of
		// rotation is the quickest way to surface the problem.
		log.Warnf("Failed to parse start-time %s of task %s from application %s: %s (assuming unready)", task.StartedAt, task.ID, app.ID, err)
		return false
	}

	elapsed := time.Since(startedAt)
	if elapsed < window {
		rc.tracef("task %s app %s: ready = false [task with start-time %s not within assumed check timeout window of %s (elapsed time since task start: %s)]", task.ID, app.ID, startedAt.Format(time.RFC3339), window, elapsed)
		return false
	}

	// The task is old enough to be certain it is not part of the deployment
	// (i.e., it is going to transition into TASK_KILLING and/or TASK_KILLED
	// as the new tasks' readiness checks gradually turn green).
	rc.tracef("task %s app %s: ready = true [task with start-time %s not involved in deployment (elapsed time since task start: %s)]", task.ID, app.ID, startedAt.Format(time.RFC3339), elapsed)
	return true
}
// tracef logs the formatted message at debug level, prefixed with
// readinessLogHeader, but only if trace logging is enabled on the checker.
// It must not be called on a nil receiver.
func (rc *readinessChecker) tracef(format string, args ...interface{}) {
	if !rc.traceLogging {
		return
	}
	log.Debugf(readinessLogHeader+format, args...)
}

View file

@ -0,0 +1,134 @@
package marathon
import (
"testing"
"time"
"github.com/gambol99/go-marathon"
)
// testReadinessChecker returns a readiness checker with default settings and
// trace logging turned off, for use in tests.
func testReadinessChecker() *readinessChecker {
	const traceLogging = false
	return defaultReadinessChecker(traceLogging)
}
// TestDisabledReadinessChecker verifies that a nil (i.e., disabled) readiness
// checker reports a task as ready even though the application carries a
// negative readiness check result for it.
func TestDisabledReadinessChecker(t *testing.T) {
	// A nil checker represents the disabled state.
	var disabled *readinessChecker

	unreadyTask := task()
	deployingApp := application(
		deployments("deploymentId"),
		readinessCheck(0),
		readinessCheckResult(testTaskName, false),
	)

	if !disabled.Do(unreadyTask, deployingApp) {
		t.Error("expected ready = true")
	}
}
// TestEnabledReadinessChecker runs an enabled readiness checker against a
// table of task/application combinations. A case may override the checker's
// default timeout and safety margin through its rc field; fields left at zero
// keep the values provided by testReadinessChecker.
func TestEnabledReadinessChecker(t *testing.T) {
	tests := []struct {
		desc          string
		task          marathon.Task
		app           marathon.Application
		rc            readinessChecker
		expectedReady bool
	}{
		{
			desc:          "no deployment running",
			task:          task(),
			app:           application(),
			expectedReady: true,
		},
		{
			desc:          "no readiness checks defined",
			task:          task(),
			app:           application(deployments("deploymentId")),
			expectedReady: true,
		},
		{
			desc: "readiness check result negative",
			task: task(),
			app: application(
				deployments("deploymentId"),
				readinessCheck(0),
				readinessCheckResult("otherTaskID", true),
				readinessCheckResult(testTaskName, false),
			),
			expectedReady: false,
		},
		{
			desc: "readiness check result positive",
			task: task(),
			app: application(
				deployments("deploymentId"),
				readinessCheck(0),
				readinessCheckResult("otherTaskID", false),
				readinessCheckResult(testTaskName, true),
			),
			expectedReady: true,
		},
		{
			desc: "no readiness check result with default timeout",
			task: task(startedAtFromNow(3 * time.Minute)),
			app: application(
				deployments("deploymentId"),
				readinessCheck(0),
			),
			rc: readinessChecker{
				checkDefaultTimeout: 5 * time.Minute,
			},
			expectedReady: false,
		},
		{
			desc: "no readiness check result with readiness check timeout",
			task: task(startedAtFromNow(4 * time.Minute)),
			app: application(
				deployments("deploymentId"),
				readinessCheck(3*time.Minute),
			),
			rc: readinessChecker{
				checkSafetyMargin: 3 * time.Minute,
			},
			expectedReady: false,
		},
		{
			desc: "invalid task start time",
			task: task(startedAt("invalid")),
			app: application(
				deployments("deploymentId"),
				readinessCheck(0),
			),
			expectedReady: false,
		},
		{
			desc: "task not involved in deployment",
			task: task(startedAtFromNow(1 * time.Hour)),
			app: application(
				deployments("deploymentId"),
				readinessCheck(0),
			),
			rc: readinessChecker{
				checkDefaultTimeout: 10 * time.Second,
			},
			expectedReady: true,
		},
	}

	for _, test := range tests {
		// Capture the range variable for the parallel sub-test.
		test := test
		t.Run(test.desc, func(t *testing.T) {
			t.Parallel()

			// Start from the default test checker and apply only the
			// overrides the case actually specifies.
			rc := testReadinessChecker()
			if test.rc.checkDefaultTimeout > 0 {
				rc.checkDefaultTimeout = test.rc.checkDefaultTimeout
			}
			if test.rc.checkSafetyMargin > 0 {
				rc.checkSafetyMargin = test.rc.checkSafetyMargin
			}

			// Evaluate the configured checker, not the bare override struct;
			// otherwise the defaults merged in above would never be used.
			actualReady := rc.Do(test.task, test.app)
			if actualReady != test.expectedReady {
				t.Errorf("actual ready = %t, expected ready = %t", actualReady, test.expectedReady)
			}
		})
	}
}