Merge pull request #1524 from jangie/update-dep-go-marathon

[Marathon] Bump go-marathon dep
This commit is contained in:
Emile Vauge 2017-05-03 13:06:36 +02:00 committed by GitHub
commit dcc4d92983
11 changed files with 268 additions and 91 deletions

8
glide.lock generated
View file

@ -1,5 +1,5 @@
hash: 0f1c54f36d88f4625c048686f733842d09d48525b1cf84dc05d4acc698cfd86f hash: 1aa32496b865dda72d76c7cba3458f1c2c467acf0b99aab4609323f109aa64f6
updated: 2017-04-11T17:02:21.540487905+02:00 updated: 2017-05-02T11:46:23.91434995-04:00
imports: imports:
- name: cloud.google.com/go - name: cloud.google.com/go
version: 2e6a95edb1071d750f6d7db777bf66cd2997af6c version: 2e6a95edb1071d750f6d7db777bf66cd2997af6c
@ -95,6 +95,8 @@ imports:
subpackages: subpackages:
- Godeps/_workspace/src/github.com/coreos/go-systemd/journal - Godeps/_workspace/src/github.com/coreos/go-systemd/journal
- Godeps/_workspace/src/github.com/coreos/pkg/capnslog - Godeps/_workspace/src/github.com/coreos/pkg/capnslog
- Godeps/_workspace/src/github.com/ugorji/go/codec
- Godeps/_workspace/src/golang.org/x/net/context
- client - client
- pkg/fileutil - pkg/fileutil
- pkg/pathutil - pkg/pathutil
@ -199,7 +201,7 @@ imports:
- name: github.com/fatih/color - name: github.com/fatih/color
version: 9131ab34cf20d2f6d83fdc67168a5430d1c7dc23 version: 9131ab34cf20d2f6d83fdc67168a5430d1c7dc23
- name: github.com/gambol99/go-marathon - name: github.com/gambol99/go-marathon
version: 6b00a5b651b1beb2c6821863f7c60df490bd46c8 version: d672c6fbb499596869d95146a26e7d0746c06c54
- name: github.com/ghodss/yaml - name: github.com/ghodss/yaml
version: 73d445a93680fa1a78ae23a5839bad48f32ba1ee version: 73d445a93680fa1a78ae23a5839bad48f32ba1ee
- name: github.com/go-ini/ini - name: github.com/go-ini/ini

View file

@ -91,7 +91,7 @@ import:
- package: k8s.io/client-go - package: k8s.io/client-go
version: v2.0.0 version: v2.0.0
- package: github.com/gambol99/go-marathon - package: github.com/gambol99/go-marathon
version: ^0.5.1 version: d672c6fbb499596869d95146a26e7d0746c06c54
- package: github.com/ArthurHlt/go-eureka-client - package: github.com/ArthurHlt/go-eureka-client
subpackages: subpackages:
- eureka - eureka

View file

@ -56,43 +56,47 @@ type Port struct {
// Application is the definition for an application in marathon // Application is the definition for an application in marathon
type Application struct { type Application struct {
ID string `json:"id,omitempty"` ID string `json:"id,omitempty"`
Cmd *string `json:"cmd,omitempty"` Cmd *string `json:"cmd,omitempty"`
Args *[]string `json:"args,omitempty"` Args *[]string `json:"args,omitempty"`
Constraints *[][]string `json:"constraints,omitempty"` Constraints *[][]string `json:"constraints,omitempty"`
Container *Container `json:"container,omitempty"` Container *Container `json:"container,omitempty"`
CPUs float64 `json:"cpus,omitempty"` CPUs float64 `json:"cpus,omitempty"`
Disk *float64 `json:"disk,omitempty"` GPUs *float64 `json:"gpus,omitempty"`
Env *map[string]string `json:"env,omitempty"` Disk *float64 `json:"disk,omitempty"`
Executor *string `json:"executor,omitempty"` Env *map[string]string `json:"env,omitempty"`
HealthChecks *[]HealthCheck `json:"healthChecks,omitempty"` Executor *string `json:"executor,omitempty"`
Instances *int `json:"instances,omitempty"` HealthChecks *[]HealthCheck `json:"healthChecks,omitempty"`
Mem *float64 `json:"mem,omitempty"` ReadinessChecks *[]ReadinessCheck `json:"readinessChecks,omitempty"`
Tasks []*Task `json:"tasks,omitempty"` Instances *int `json:"instances,omitempty"`
Ports []int `json:"ports"` Mem *float64 `json:"mem,omitempty"`
PortDefinitions *[]PortDefinition `json:"portDefinitions,omitempty"` Tasks []*Task `json:"tasks,omitempty"`
RequirePorts *bool `json:"requirePorts,omitempty"` Ports []int `json:"ports"`
BackoffSeconds *float64 `json:"backoffSeconds,omitempty"` PortDefinitions *[]PortDefinition `json:"portDefinitions,omitempty"`
BackoffFactor *float64 `json:"backoffFactor,omitempty"` RequirePorts *bool `json:"requirePorts,omitempty"`
MaxLaunchDelaySeconds *float64 `json:"maxLaunchDelaySeconds,omitempty"` BackoffSeconds *float64 `json:"backoffSeconds,omitempty"`
TaskKillGracePeriodSeconds *float64 `json:"taskKillGracePeriodSeconds,omitempty"` BackoffFactor *float64 `json:"backoffFactor,omitempty"`
Deployments []map[string]string `json:"deployments,omitempty"` MaxLaunchDelaySeconds *float64 `json:"maxLaunchDelaySeconds,omitempty"`
Dependencies []string `json:"dependencies"` TaskKillGracePeriodSeconds *float64 `json:"taskKillGracePeriodSeconds,omitempty"`
TasksRunning int `json:"tasksRunning,omitempty"` Deployments []map[string]string `json:"deployments,omitempty"`
TasksStaged int `json:"tasksStaged,omitempty"` // Available when embedding readiness information through query parameter.
TasksHealthy int `json:"tasksHealthy,omitempty"` ReadinessCheckResults *[]ReadinessCheckResult `json:"readinessCheckResults,omitempty"`
TasksUnhealthy int `json:"tasksUnhealthy,omitempty"` Dependencies []string `json:"dependencies"`
TaskStats map[string]TaskStats `json:"taskStats,omitempty"` TasksRunning int `json:"tasksRunning,omitempty"`
User string `json:"user,omitempty"` TasksStaged int `json:"tasksStaged,omitempty"`
UpgradeStrategy *UpgradeStrategy `json:"upgradeStrategy,omitempty"` TasksHealthy int `json:"tasksHealthy,omitempty"`
Uris *[]string `json:"uris,omitempty"` TasksUnhealthy int `json:"tasksUnhealthy,omitempty"`
Version string `json:"version,omitempty"` TaskStats map[string]TaskStats `json:"taskStats,omitempty"`
VersionInfo *VersionInfo `json:"versionInfo,omitempty"` User string `json:"user,omitempty"`
Labels *map[string]string `json:"labels,omitempty"` UpgradeStrategy *UpgradeStrategy `json:"upgradeStrategy,omitempty"`
AcceptedResourceRoles []string `json:"acceptedResourceRoles,omitempty"` Uris *[]string `json:"uris,omitempty"`
LastTaskFailure *LastTaskFailure `json:"lastTaskFailure,omitempty"` Version string `json:"version,omitempty"`
Fetch *[]Fetch `json:"fetch,omitempty"` VersionInfo *VersionInfo `json:"versionInfo,omitempty"`
IPAddressPerTask *IPAddressPerTask `json:"ipAddress,omitempty"` Labels *map[string]string `json:"labels,omitempty"`
AcceptedResourceRoles []string `json:"acceptedResourceRoles,omitempty"`
LastTaskFailure *LastTaskFailure `json:"lastTaskFailure,omitempty"`
Fetch *[]Fetch `json:"fetch,omitempty"`
IPAddressPerTask *IPAddressPerTask `json:"ipAddress,omitempty"`
} }
// ApplicationVersions is a collection of application versions for a specific app in marathon // ApplicationVersions is a collection of application versions for a specific app in marathon
@ -180,6 +184,22 @@ func (r *Application) CPU(cpu float64) *Application {
return r return r
} }
// SetGPUs set the amount of GPU per instance which is assigned to the application
// gpu: the GPU (check MESOS docs) per instance
func (r *Application) SetGPUs(gpu float64) *Application {
r.GPUs = &gpu
return r
}
// EmptyGPUs explicitly empties GPUs -- use this if you need to empty
// gpus of an application that already has gpus set (setting port definitions to nil will
// keep the current value)
func (r *Application) EmptyGPUs() *Application {
g := 0.0
r.GPUs = &g
return r
}
// Storage sets the amount of disk space the application is assigned, which for docker // Storage sets the amount of disk space the application is assigned, which for docker
// application I don't believe is relevant // application I don't believe is relevant
// disk: the disk space in MB // disk: the disk space in MB
@ -388,6 +408,26 @@ func (r *Application) HasHealthChecks() bool {
return r.HealthChecks != nil && len(*r.HealthChecks) > 0 return r.HealthChecks != nil && len(*r.HealthChecks) > 0
} }
// AddReadinessCheck adds a readiness check.
func (r *Application) AddReadinessCheck(readinessCheck ReadinessCheck) *Application {
if r.ReadinessChecks == nil {
r.EmptyReadinessChecks()
}
readinessChecks := *r.ReadinessChecks
readinessChecks = append(readinessChecks, readinessCheck)
r.ReadinessChecks = &readinessChecks
return r
}
// EmptyReadinessChecks empties the readiness checks.
func (r *Application) EmptyReadinessChecks() *Application {
r.ReadinessChecks = &[]ReadinessCheck{}
return r
}
// DeploymentIDs retrieves the application deployments IDs // DeploymentIDs retrieves the application deployments IDs
func (r *Application) DeploymentIDs() []*DeploymentID { func (r *Application) DeploymentIDs() []*DeploymentID {
var deployments []*DeploymentID var deployments []*DeploymentID

View file

@ -174,14 +174,19 @@ type marathonClient struct {
ipAddress string ipAddress string
// the http server // the http server
eventsHTTP *http.Server eventsHTTP *http.Server
// the http client use for making requests
httpClient *http.Client
// the marathon hosts // the marathon hosts
hosts *cluster hosts *cluster
// a map of service you wish to listen to // a map of service you wish to listen to
listeners map[EventsChannel]EventsChannelContext listeners map[EventsChannel]EventsChannelContext
// a custom logger for debug log messages // a custom logger for debug log messages
debugLog *log.Logger debugLog *log.Logger
// the marathon HTTP client to ensure consistency in requests
client *httpClient
}
type httpClient struct {
// the configuration for the marathon HTTP client
config Config
} }
// NewClient creates a new marathon client // NewClient creates a new marathon client
@ -197,8 +202,11 @@ func NewClient(config Config) (Marathon, error) {
config.PollingWaitTime = defaultPollingWaitTime config.PollingWaitTime = defaultPollingWaitTime
} }
// step: setup shared client
client := &httpClient{config: config}
// step: create a new cluster // step: create a new cluster
hosts, err := newCluster(config.HTTPClient, config.URL) hosts, err := newCluster(client, config.URL, config.DCOSToken != "")
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -209,11 +217,11 @@ func NewClient(config Config) (Marathon, error) {
} }
return &marathonClient{ return &marathonClient{
config: config, config: config,
listeners: make(map[EventsChannel]EventsChannelContext), listeners: make(map[EventsChannel]EventsChannelContext),
hosts: hosts, hosts: hosts,
httpClient: config.HTTPClient, debugLog: log.New(debugLogOutput, "", 0),
debugLog: log.New(debugLogOutput, "", 0), client: client,
}, nil }, nil
} }
@ -246,34 +254,25 @@ func (r *marathonClient) apiDelete(uri string, post, result interface{}) error {
return r.apiCall("DELETE", uri, post, result) return r.apiCall("DELETE", uri, post, result)
} }
func (r *marathonClient) apiCall(method, uri string, body, result interface{}) error { func (r *marathonClient) apiCall(method, url string, body, result interface{}) error {
for { for {
// step: grab a member from the cluster and attempt to perform the request
member, err := r.hosts.getMember()
if err != nil {
return ErrMarathonDown
}
// step: Create the endpoint url
url := fmt.Sprintf("%s/%s", member, uri)
if r.config.DCOSToken != "" {
url = fmt.Sprintf("%s/%s", member+"/marathon", uri)
}
// step: marshall the request to json // step: marshall the request to json
var requestBody []byte var requestBody []byte
var err error
if body != nil { if body != nil {
if requestBody, err = json.Marshal(body); err != nil { if requestBody, err = json.Marshal(body); err != nil {
return err return err
} }
} }
// step: create the api request // step: create the API request
request, err := r.buildAPIRequest(method, url, bytes.NewReader(requestBody)) request, member, err := r.buildAPIRequest(method, url, bytes.NewReader(requestBody))
if err != nil { if err != nil {
return err return err
} }
response, err := r.httpClient.Do(request)
// step: perform the API request
response, err := r.client.Do(request)
if err != nil { if err != nil {
r.hosts.markDown(member) r.hosts.markDown(member)
// step: attempt the request on another member // step: attempt the request on another member
@ -318,20 +317,38 @@ func (r *marathonClient) apiCall(method, uri string, body, result interface{}) e
} }
// buildAPIRequest creates a default API request // buildAPIRequest creates a default API request
func (r *marathonClient) buildAPIRequest(method, url string, reader io.Reader) (*http.Request, error) { func (r *marathonClient) buildAPIRequest(method, uri string, reader io.Reader) (request *http.Request, member string, err error) {
// Make the http request to Marathon // Grab a member from the cluster
request, err := http.NewRequest(method, url, reader) member, err = r.hosts.getMember()
if err != nil {
return nil, "", ErrMarathonDown
}
// Build the HTTP request to Marathon
request, err = r.client.buildMarathonRequest(method, member, uri, reader)
if err != nil {
return nil, member, err
}
return request, member, nil
}
func (rc *httpClient) buildMarathonRequest(method string, member string, uri string, reader io.Reader) (request *http.Request, err error) {
// Create the endpoint URL
url := fmt.Sprintf("%s/%s", member, uri)
// Instantiate an HTTP request
request, err = http.NewRequest(method, url, reader)
if err != nil { if err != nil {
return nil, err return nil, err
} }
// Add any basic auth and the content headers // Add any basic auth and the content headers
if r.config.HTTPBasicAuthUser != "" && r.config.HTTPBasicPassword != "" { if rc.config.HTTPBasicAuthUser != "" && rc.config.HTTPBasicPassword != "" {
request.SetBasicAuth(r.config.HTTPBasicAuthUser, r.config.HTTPBasicPassword) request.SetBasicAuth(rc.config.HTTPBasicAuthUser, rc.config.HTTPBasicPassword)
} }
if r.config.DCOSToken != "" { if rc.config.DCOSToken != "" {
request.Header.Add("Authorization", "token="+r.config.DCOSToken) request.Header.Add("Authorization", "token="+rc.config.DCOSToken)
} }
request.Header.Add("Content-Type", "application/json") request.Header.Add("Content-Type", "application/json")
@ -340,6 +357,10 @@ func (r *marathonClient) buildAPIRequest(method, url string, reader io.Reader) (
return request, nil return request, nil
} }
func (rc *httpClient) Do(request *http.Request) (response *http.Response, err error) {
return rc.config.HTTPClient.Do(request)
}
var oneLogLineRegex = regexp.MustCompile(`(?m)^\s*`) var oneLogLineRegex = regexp.MustCompile(`(?m)^\s*`)
// oneLogLine removes indentation at the beginning of each line and // oneLogLine removes indentation at the beginning of each line and

View file

@ -18,7 +18,6 @@ package marathon
import ( import (
"fmt" "fmt"
"net/http"
"net/url" "net/url"
"strings" "strings"
"sync" "sync"
@ -38,8 +37,8 @@ type cluster struct {
sync.RWMutex sync.RWMutex
// a collection of nodes // a collection of nodes
members []*member members []*member
// the http client // the marathon HTTP client to ensure consistency in requests
client *http.Client client *httpClient
} }
// member represents an individual endpoint // member represents an individual endpoint
@ -51,7 +50,7 @@ type member struct {
} }
// newCluster returns a new marathon cluster // newCluster returns a new marathon cluster
func newCluster(client *http.Client, marathonURL string) (*cluster, error) { func newCluster(client *httpClient, marathonURL string, isDCOS bool) (*cluster, error) {
// step: extract and basic validate the endpoints // step: extract and basic validate the endpoints
var members []*member var members []*member
var defaultProto string var defaultProto string
@ -83,6 +82,13 @@ func newCluster(client *http.Client, marathonURL string) (*cluster, error) {
return nil, newInvalidEndpointError("endpoint: %s must have a host", endpoint) return nil, newInvalidEndpointError("endpoint: %s must have a host", endpoint)
} }
// step: if DCOS is set and no path is given, set the default DCOS path.
// done in order to maintain compatibility with automatic addition of the
// default DCOS path.
if isDCOS && strings.TrimLeft(u.Path, "/") == "" {
u.Path = defaultDCOSPath
}
// step: create a new node for this endpoint // step: create a new node for this endpoint
members = append(members, &member{endpoint: u.String()}) members = append(members, &member{endpoint: u.String()})
} }
@ -125,9 +131,12 @@ func (c *cluster) markDown(endpoint string) {
func (c *cluster) healthCheckNode(node *member) { func (c *cluster) healthCheckNode(node *member) {
// step: wait for the node to become active ... we are assuming a /ping is enough here // step: wait for the node to become active ... we are assuming a /ping is enough here
for { for {
res, err := c.client.Get(fmt.Sprintf("%s/ping", node.endpoint)) req, err := c.client.buildMarathonRequest("GET", node.endpoint, "/ping", nil)
if err == nil && res.StatusCode == 200 { if err == nil {
break res, err := c.client.Do(req)
if err == nil && res.StatusCode == 200 {
break
}
} }
<-time.After(time.Duration(5 * time.Second)) <-time.After(time.Duration(5 * time.Second))
} }

View file

@ -25,6 +25,8 @@ import (
const defaultPollingWaitTime = 500 * time.Millisecond const defaultPollingWaitTime = 500 * time.Millisecond
const defaultDCOSPath = "marathon"
// EventsTransport describes which transport should be used to deliver Marathon events // EventsTransport describes which transport should be used to deliver Marathon events
type EventsTransport int type EventsTransport int

View file

@ -42,8 +42,9 @@ type DeploymentID struct {
// DeploymentStep is a step in the application deployment plan // DeploymentStep is a step in the application deployment plan
type DeploymentStep struct { type DeploymentStep struct {
Action string `json:"action"` Action string `json:"action"`
App string `json:"app"` App string `json:"app"`
ReadinessCheckResults *[]ReadinessCheckResult `json:"readinessCheckResults,omitempty"`
} }
// StepActions is a series of deployment steps // StepActions is a series of deployment steps

View file

@ -273,9 +273,10 @@ type EventGroupChangeFailed struct {
// EventDeploymentSuccess describes a 'deployment_success' event. // EventDeploymentSuccess describes a 'deployment_success' event.
type EventDeploymentSuccess struct { type EventDeploymentSuccess struct {
ID string `json:"id"` ID string `json:"id"`
EventType string `json:"eventType"` EventType string `json:"eventType"`
Timestamp string `json:"timestamp"` Timestamp string `json:"timestamp"`
Plan *DeploymentPlan `json:"plan"`
} }
// EventDeploymentFailed describes a 'deployment_failed' event. // EventDeploymentFailed describes a 'deployment_failed' event.

View file

@ -27,6 +27,7 @@ type HealthCheck struct {
GracePeriodSeconds int `json:"gracePeriodSeconds,omitempty"` GracePeriodSeconds int `json:"gracePeriodSeconds,omitempty"`
IntervalSeconds int `json:"intervalSeconds,omitempty"` IntervalSeconds int `json:"intervalSeconds,omitempty"`
TimeoutSeconds int `json:"timeoutSeconds,omitempty"` TimeoutSeconds int `json:"timeoutSeconds,omitempty"`
IgnoreHTTP1xx *bool `json:"ignoreHttp1xx,ommitempty"`
} }
// SetCommand sets the given command on the health check. // SetCommand sets the given command on the health check.
@ -59,6 +60,12 @@ func (h HealthCheck) SetMaxConsecutiveFailures(i int) HealthCheck {
return h return h
} }
// SetIgnoreHTTP1xx sets ignore http 1xx on the health check.
func (h HealthCheck) SetIgnoreHTTP1xx(ignore bool) HealthCheck {
h.IgnoreHTTP1xx = &ignore
return h
}
// NewDefaultHealthCheck creates a default application health check // NewDefaultHealthCheck creates a default application health check
func NewDefaultHealthCheck() *HealthCheck { func NewDefaultHealthCheck() *HealthCheck {
portIndex := 0 portIndex := 0

99
vendor/github.com/gambol99/go-marathon/readiness.go generated vendored Normal file
View file

@ -0,0 +1,99 @@
/*
Copyright 2017 Rohith All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package marathon
import "time"
// ReadinessCheck represents a readiness check.
type ReadinessCheck struct {
Name *string `json:"name,omitempty"`
Protocol string `json:"protocol,omitempty"`
Path string `json:"path,omitempty"`
PortName string `json:"portName,omitempty"`
IntervalSeconds int `json:"intervalSeconds,omitempty"`
TimeoutSeconds int `json:"timeoutSeconds,omitempty"`
HTTPStatusCodesForReady *[]int `json:"httpStatusCodesForReady,omitempty"`
PreserveLastResponse *bool `json:"preserveLastResponse,omitempty"`
}
// SetName sets the name on the readiness check.
func (rc *ReadinessCheck) SetName(name string) *ReadinessCheck {
rc.Name = &name
return rc
}
// SetProtocol sets the protocol on the readiness check.
func (rc *ReadinessCheck) SetProtocol(proto string) *ReadinessCheck {
rc.Protocol = proto
return rc
}
// SetPath sets the path on the readiness check.
func (rc *ReadinessCheck) SetPath(p string) *ReadinessCheck {
rc.Path = p
return rc
}
// SetPortName sets the port name on the readiness check.
func (rc *ReadinessCheck) SetPortName(name string) *ReadinessCheck {
rc.PortName = name
return rc
}
// SetInterval sets the interval on the readiness check.
func (rc *ReadinessCheck) SetInterval(interval time.Duration) *ReadinessCheck {
secs := int(interval.Seconds())
rc.IntervalSeconds = secs
return rc
}
// SetTimeout sets the timeout on the readiness check.
func (rc *ReadinessCheck) SetTimeout(timeout time.Duration) *ReadinessCheck {
secs := int(timeout.Seconds())
rc.TimeoutSeconds = secs
return rc
}
// SetHTTPStatusCodesForReady sets the HTTP status codes for ready on the
// readiness check.
func (rc *ReadinessCheck) SetHTTPStatusCodesForReady(codes []int) *ReadinessCheck {
rc.HTTPStatusCodesForReady = &codes
return rc
}
// SetPreserveLastResponse sets the preserve last response flag on the
// readiness check.
func (rc *ReadinessCheck) SetPreserveLastResponse(preserve bool) *ReadinessCheck {
rc.PreserveLastResponse = &preserve
return rc
}
// ReadinessLastResponse holds the result of the last response embedded in a
// readiness check result.
type ReadinessLastResponse struct {
Body string `json:"body"`
ContentType string `json:"contentType"`
Status int `json:"status"`
}
// ReadinessCheckResult is the result of a readiness check.
type ReadinessCheckResult struct {
Name string `json:"name"`
TaskID string `json:"taskId"`
Ready bool `json:"ready"`
LastResponse ReadinessLastResponse `json:"lastResponse,omitempty"`
}

View file

@ -167,19 +167,14 @@ func (r *marathonClient) registerSSESubscription() error {
if r.subscribedToSSE { if r.subscribedToSSE {
return nil return nil
} }
// Get a member from the cluster
marathon, err := r.hosts.getMember()
if err != nil {
return err
}
request, err := r.buildAPIRequest("GET", fmt.Sprintf("%s/%s", marathon, marathonAPIEventStream), nil) request, _, err := r.buildAPIRequest("GET", marathonAPIEventStream, nil)
if err != nil { if err != nil {
return err return err
} }
// Try to connect to stream, reusing the http client settings // Try to connect to stream, reusing the http client settings
stream, err := eventsource.SubscribeWith("", r.httpClient, request) stream, err := eventsource.SubscribeWith("", r.config.HTTPClient, request)
if err != nil { if err != nil {
return err return err
} }