1
0
Fork 0

Add UDP in providers with labels

This commit is contained in:
Julien Salleyron 2020-02-20 22:24:05 +01:00 committed by GitHub
parent a20a5f1a44
commit bb4de11c51
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
16 changed files with 1440 additions and 68 deletions

View file

@ -49,21 +49,36 @@ func (p *Provider) buildConfiguration(ctx context.Context, applications *maratho
continue
}
var tcpOrUDP bool
if len(confFromLabel.TCP.Routers) > 0 || len(confFromLabel.TCP.Services) > 0 {
tcpOrUDP = true
err := p.buildTCPServiceConfiguration(ctxApp, app, extraConf, confFromLabel.TCP)
if err != nil {
logger.Error(err)
continue
}
provider.BuildTCPRouterConfiguration(ctxApp, confFromLabel.TCP)
if len(confFromLabel.HTTP.Routers) == 0 &&
len(confFromLabel.HTTP.Middlewares) == 0 &&
len(confFromLabel.HTTP.Services) == 0 {
configurations[app.ID] = confFromLabel
continue
}
if len(confFromLabel.UDP.Routers) > 0 || len(confFromLabel.UDP.Services) > 0 {
tcpOrUDP = true
err := p.buildUDPServiceConfiguration(ctxApp, app, extraConf, confFromLabel.UDP)
if err != nil {
logger.Error(err)
} else {
provider.BuildUDPRouterConfiguration(ctxApp, confFromLabel.UDP)
}
}
if tcpOrUDP && len(confFromLabel.HTTP.Routers) == 0 &&
len(confFromLabel.HTTP.Middlewares) == 0 &&
len(confFromLabel.HTTP.Services) == 0 {
configurations[app.ID] = confFromLabel
continue
}
err = p.buildServiceConfiguration(ctxApp, app, extraConf, confFromLabel.HTTP)
if err != nil {
logger.Error(err)
@ -116,14 +131,15 @@ func (p *Provider) buildServiceConfiguration(ctx context.Context, app marathon.A
}
for _, task := range app.Tasks {
if p.taskFilter(ctx, *task, app) {
server, err := p.getServer(app, *task, extraConf, defaultServer)
if err != nil {
log.FromContext(appCtx).Errorf("Skip task: %v", err)
continue
}
servers = append(servers, server)
if !p.taskFilter(ctx, *task, app) {
continue
}
server, err := p.getServer(app, *task, extraConf, defaultServer)
if err != nil {
log.FromContext(appCtx).Errorf("Skip task: %v", err)
continue
}
servers = append(servers, server)
}
if len(servers) == 0 {
return fmt.Errorf("no server for the service %s", serviceName)
@ -175,6 +191,47 @@ func (p *Provider) buildTCPServiceConfiguration(ctx context.Context, app maratho
return nil
}
func (p *Provider) buildUDPServiceConfiguration(ctx context.Context, app marathon.Application, extraConf configuration, conf *dynamic.UDPConfiguration) error {
appName := getServiceName(app)
appCtx := log.With(ctx, log.Str("ApplicationID", appName))
if len(conf.Services) == 0 {
conf.Services = make(map[string]*dynamic.UDPService)
lb := &dynamic.UDPServersLoadBalancer{}
conf.Services[appName] = &dynamic.UDPService{
LoadBalancer: lb,
}
}
for serviceName, service := range conf.Services {
var servers []dynamic.UDPServer
defaultServer := dynamic.UDPServer{}
if len(service.LoadBalancer.Servers) > 0 {
defaultServer = service.LoadBalancer.Servers[0]
}
for _, task := range app.Tasks {
if p.taskFilter(ctx, *task, app) {
server, err := p.getUDPServer(app, *task, extraConf, defaultServer)
if err != nil {
log.FromContext(appCtx).Errorf("Skip task: %v", err)
continue
}
servers = append(servers, server)
}
}
if len(servers) == 0 {
return fmt.Errorf("no server for the service %s", serviceName)
}
service.LoadBalancer.Servers = servers
}
return nil
}
func (p *Provider) keepApplication(ctx context.Context, extraConf configuration, labels map[string]string) bool {
logger := log.FromContext(ctx)
@ -229,6 +286,24 @@ func (p *Provider) getTCPServer(app marathon.Application, task marathon.Task, ex
return server, nil
}
func (p *Provider) getUDPServer(app marathon.Application, task marathon.Task, extraConf configuration, defaultServer dynamic.UDPServer) (dynamic.UDPServer, error) {
host, err := p.getServerHost(task, app, extraConf)
if len(host) == 0 {
return dynamic.UDPServer{}, err
}
port, err := getPort(task, app, defaultServer.Port)
if err != nil {
return dynamic.UDPServer{}, err
}
server := dynamic.UDPServer{
Address: net.JoinHostPort(host, port),
}
return server, nil
}
func (p *Provider) getServer(app marathon.Application, task marathon.Task, extraConf configuration, defaultServer dynamic.Server) (dynamic.Server, error) {
host, err := p.getServerHost(task, app, extraConf)
if len(host) == 0 {