Add plugin's support for provider
Co-authored-by: Julien Salleyron <julien@traefik.io>
This commit is contained in:
parent
de2437cfec
commit
63ef0f1cee
24 changed files with 928 additions and 116 deletions
|
@ -4,11 +4,7 @@ import (
|
|||
"context"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"path"
|
||||
"reflect"
|
||||
"strings"
|
||||
|
||||
"github.com/mitchellh/mapstructure"
|
||||
"github.com/traefik/yaegi/interp"
|
||||
"github.com/traefik/yaegi/stdlib"
|
||||
)
|
||||
|
@ -34,13 +30,15 @@ type pluginContext struct {
|
|||
|
||||
// Builder is a plugin builder.
|
||||
type Builder struct {
|
||||
descriptors map[string]pluginContext
|
||||
middlewareDescriptors map[string]pluginContext
|
||||
providerDescriptors map[string]pluginContext
|
||||
}
|
||||
|
||||
// NewBuilder creates a new Builder.
|
||||
func NewBuilder(client *Client, plugins map[string]Descriptor, devPlugin *DevPlugin) (*Builder, error) {
|
||||
pb := &Builder{
|
||||
descriptors: map[string]pluginContext{},
|
||||
middlewareDescriptors: map[string]pluginContext{},
|
||||
providerDescriptors: map[string]pluginContext{},
|
||||
}
|
||||
|
||||
for pName, desc := range plugins {
|
||||
|
@ -52,17 +50,30 @@ func NewBuilder(client *Client, plugins map[string]Descriptor, devPlugin *DevPlu
|
|||
|
||||
i := interp.New(interp.Options{GoPath: client.GoPath()})
|
||||
i.Use(stdlib.Symbols)
|
||||
i.Use(ppSymbols())
|
||||
|
||||
_, err = i.Eval(fmt.Sprintf(`import "%s"`, manifest.Import))
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("%s: failed to import plugin code %q: %w", desc.ModuleName, manifest.Import, err)
|
||||
}
|
||||
|
||||
pb.descriptors[pName] = pluginContext{
|
||||
interpreter: i,
|
||||
GoPath: client.GoPath(),
|
||||
Import: manifest.Import,
|
||||
BasePkg: manifest.BasePkg,
|
||||
switch manifest.Type {
|
||||
case "middleware":
|
||||
pb.middlewareDescriptors[pName] = pluginContext{
|
||||
interpreter: i,
|
||||
GoPath: client.GoPath(),
|
||||
Import: manifest.Import,
|
||||
BasePkg: manifest.BasePkg,
|
||||
}
|
||||
case "provider":
|
||||
pb.providerDescriptors[pName] = pluginContext{
|
||||
interpreter: i,
|
||||
GoPath: client.GoPath(),
|
||||
Import: manifest.Import,
|
||||
BasePkg: manifest.BasePkg,
|
||||
}
|
||||
default:
|
||||
return nil, fmt.Errorf("unknow plugin type: %s", manifest.Type)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -74,101 +85,32 @@ func NewBuilder(client *Client, plugins map[string]Descriptor, devPlugin *DevPlu
|
|||
|
||||
i := interp.New(interp.Options{GoPath: devPlugin.GoPath})
|
||||
i.Use(stdlib.Symbols)
|
||||
i.Use(ppSymbols())
|
||||
|
||||
_, err = i.Eval(fmt.Sprintf(`import "%s"`, manifest.Import))
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("%s: failed to import plugin code %q: %w", devPlugin.ModuleName, manifest.Import, err)
|
||||
}
|
||||
|
||||
pb.descriptors[devPluginName] = pluginContext{
|
||||
interpreter: i,
|
||||
GoPath: devPlugin.GoPath,
|
||||
Import: manifest.Import,
|
||||
BasePkg: manifest.BasePkg,
|
||||
switch manifest.Type {
|
||||
case "middleware":
|
||||
pb.middlewareDescriptors[devPluginName] = pluginContext{
|
||||
interpreter: i,
|
||||
GoPath: devPlugin.GoPath,
|
||||
Import: manifest.Import,
|
||||
BasePkg: manifest.BasePkg,
|
||||
}
|
||||
case "provider":
|
||||
pb.providerDescriptors[devPluginName] = pluginContext{
|
||||
interpreter: i,
|
||||
GoPath: devPlugin.GoPath,
|
||||
Import: manifest.Import,
|
||||
BasePkg: manifest.BasePkg,
|
||||
}
|
||||
default:
|
||||
return nil, fmt.Errorf("unknow plugin type: %s", manifest.Type)
|
||||
}
|
||||
}
|
||||
|
||||
return pb, nil
|
||||
}
|
||||
|
||||
// Build builds a plugin.
|
||||
func (b Builder) Build(pName string, config map[string]interface{}, middlewareName string) (Constructor, error) {
|
||||
if b.descriptors == nil {
|
||||
return nil, fmt.Errorf("plugin: no plugin definition in the static configuration: %s", pName)
|
||||
}
|
||||
|
||||
descriptor, ok := b.descriptors[pName]
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("plugin: unknown plugin type: %s", pName)
|
||||
}
|
||||
|
||||
m, err := newMiddleware(descriptor, config, middlewareName)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return m.NewHandler, err
|
||||
}
|
||||
|
||||
// Middleware is a HTTP handler plugin wrapper.
|
||||
type Middleware struct {
|
||||
middlewareName string
|
||||
fnNew reflect.Value
|
||||
config reflect.Value
|
||||
}
|
||||
|
||||
func newMiddleware(descriptor pluginContext, config map[string]interface{}, middlewareName string) (*Middleware, error) {
|
||||
basePkg := descriptor.BasePkg
|
||||
if basePkg == "" {
|
||||
basePkg = strings.ReplaceAll(path.Base(descriptor.Import), "-", "_")
|
||||
}
|
||||
|
||||
vConfig, err := descriptor.interpreter.Eval(basePkg + `.CreateConfig()`)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("plugin: failed to eval CreateConfig: %w", err)
|
||||
}
|
||||
|
||||
cfg := &mapstructure.DecoderConfig{
|
||||
DecodeHook: mapstructure.StringToSliceHookFunc(","),
|
||||
WeaklyTypedInput: true,
|
||||
Result: vConfig.Interface(),
|
||||
}
|
||||
|
||||
decoder, err := mapstructure.NewDecoder(cfg)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("plugin: failed to create configuration decoder: %w", err)
|
||||
}
|
||||
|
||||
err = decoder.Decode(config)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("plugin: failed to decode configuration: %w", err)
|
||||
}
|
||||
|
||||
fnNew, err := descriptor.interpreter.Eval(basePkg + `.New`)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("plugin: failed to eval New: %w", err)
|
||||
}
|
||||
|
||||
return &Middleware{
|
||||
middlewareName: middlewareName,
|
||||
fnNew: fnNew,
|
||||
config: vConfig,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// NewHandler creates a new HTTP handler.
|
||||
func (m *Middleware) NewHandler(ctx context.Context, next http.Handler) (http.Handler, error) {
|
||||
args := []reflect.Value{reflect.ValueOf(ctx), reflect.ValueOf(next), m.config, reflect.ValueOf(m.middlewareName)}
|
||||
results := m.fnNew.Call(args)
|
||||
|
||||
if len(results) > 1 && results[1].Interface() != nil {
|
||||
return nil, results[1].Interface().(error)
|
||||
}
|
||||
|
||||
handler, ok := results[0].Interface().(http.Handler)
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("plugin: invalid handler type: %T", results[0].Interface())
|
||||
}
|
||||
|
||||
return handler, nil
|
||||
}
|
||||
|
|
94
pkg/plugins/middlewares.go
Normal file
94
pkg/plugins/middlewares.go
Normal file
|
@ -0,0 +1,94 @@
|
|||
package plugins
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"path"
|
||||
"reflect"
|
||||
"strings"
|
||||
|
||||
"github.com/mitchellh/mapstructure"
|
||||
)
|
||||
|
||||
// Build builds a middleware plugin.
|
||||
func (b Builder) Build(pName string, config map[string]interface{}, middlewareName string) (Constructor, error) {
|
||||
if b.middlewareDescriptors == nil {
|
||||
return nil, fmt.Errorf("no plugin definition in the static configuration: %s", pName)
|
||||
}
|
||||
|
||||
descriptor, ok := b.middlewareDescriptors[pName]
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("unknown plugin type: %s", pName)
|
||||
}
|
||||
|
||||
m, err := newMiddleware(descriptor, config, middlewareName)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return m.NewHandler, err
|
||||
}
|
||||
|
||||
// Middleware is a HTTP handler plugin wrapper.
|
||||
type Middleware struct {
|
||||
middlewareName string
|
||||
fnNew reflect.Value
|
||||
config reflect.Value
|
||||
}
|
||||
|
||||
func newMiddleware(descriptor pluginContext, config map[string]interface{}, middlewareName string) (*Middleware, error) {
|
||||
basePkg := descriptor.BasePkg
|
||||
if basePkg == "" {
|
||||
basePkg = strings.ReplaceAll(path.Base(descriptor.Import), "-", "_")
|
||||
}
|
||||
|
||||
vConfig, err := descriptor.interpreter.Eval(basePkg + `.CreateConfig()`)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to eval CreateConfig: %w", err)
|
||||
}
|
||||
|
||||
cfg := &mapstructure.DecoderConfig{
|
||||
DecodeHook: mapstructure.StringToSliceHookFunc(","),
|
||||
WeaklyTypedInput: true,
|
||||
Result: vConfig.Interface(),
|
||||
}
|
||||
|
||||
decoder, err := mapstructure.NewDecoder(cfg)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to create configuration decoder: %w", err)
|
||||
}
|
||||
|
||||
err = decoder.Decode(config)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to decode configuration: %w", err)
|
||||
}
|
||||
|
||||
fnNew, err := descriptor.interpreter.Eval(basePkg + `.New`)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to eval New: %w", err)
|
||||
}
|
||||
|
||||
return &Middleware{
|
||||
middlewareName: middlewareName,
|
||||
fnNew: fnNew,
|
||||
config: vConfig,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// NewHandler creates a new HTTP handler.
|
||||
func (m *Middleware) NewHandler(ctx context.Context, next http.Handler) (http.Handler, error) {
|
||||
args := []reflect.Value{reflect.ValueOf(ctx), reflect.ValueOf(next), m.config, reflect.ValueOf(m.middlewareName)}
|
||||
results := m.fnNew.Call(args)
|
||||
|
||||
if len(results) > 1 && results[1].Interface() != nil {
|
||||
return nil, results[1].Interface().(error)
|
||||
}
|
||||
|
||||
handler, ok := results[0].Interface().(http.Handler)
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("invalid handler type: %T", results[0].Interface())
|
||||
}
|
||||
|
||||
return handler, nil
|
||||
}
|
|
@ -81,7 +81,10 @@ func checkDevPluginConfiguration(plugin *DevPlugin) error {
|
|||
return err
|
||||
}
|
||||
|
||||
if m.Type != "middleware" {
|
||||
switch m.Type {
|
||||
case "middleware", "provider":
|
||||
// noop
|
||||
default:
|
||||
return errors.New("unsupported type")
|
||||
}
|
||||
|
||||
|
|
196
pkg/plugins/providers.go
Normal file
196
pkg/plugins/providers.go
Normal file
|
@ -0,0 +1,196 @@
|
|||
package plugins
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"path"
|
||||
"reflect"
|
||||
"strings"
|
||||
|
||||
"github.com/mitchellh/mapstructure"
|
||||
"github.com/traefik/traefik/v2/pkg/config/dynamic"
|
||||
"github.com/traefik/traefik/v2/pkg/log"
|
||||
"github.com/traefik/traefik/v2/pkg/provider"
|
||||
"github.com/traefik/traefik/v2/pkg/safe"
|
||||
)
|
||||
|
||||
// PP the interface of a plugin's provider.
|
||||
type PP interface {
|
||||
Init() error
|
||||
Provide(cfgChan chan<- json.Marshaler) error
|
||||
Stop() error
|
||||
}
|
||||
|
||||
type _PP struct {
|
||||
WInit func() error
|
||||
WProvide func(cfgChan chan<- json.Marshaler) error
|
||||
WStop func() error
|
||||
}
|
||||
|
||||
func (p _PP) Init() error {
|
||||
return p.WInit()
|
||||
}
|
||||
|
||||
func (p _PP) Provide(cfgChan chan<- json.Marshaler) error {
|
||||
return p.WProvide(cfgChan)
|
||||
}
|
||||
|
||||
func (p _PP) Stop() error {
|
||||
return p.WStop()
|
||||
}
|
||||
|
||||
func ppSymbols() map[string]map[string]reflect.Value {
|
||||
return map[string]map[string]reflect.Value{
|
||||
"github.com/traefik/traefik/v2/pkg/plugins": {
|
||||
"PP": reflect.ValueOf((*PP)(nil)),
|
||||
"_PP": reflect.ValueOf((*_PP)(nil)),
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
// BuildProvider builds a plugin's provider.
|
||||
func (b Builder) BuildProvider(pName string, config map[string]interface{}) (provider.Provider, error) {
|
||||
if b.providerDescriptors == nil {
|
||||
return nil, fmt.Errorf("no plugin definition in the static configuration: %s", pName)
|
||||
}
|
||||
|
||||
descriptor, ok := b.providerDescriptors[pName]
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("unknown plugin type: %s", pName)
|
||||
}
|
||||
|
||||
return newProvider(descriptor, config, "plugin-"+pName)
|
||||
}
|
||||
|
||||
// Provider is a plugin's provider wrapper.
|
||||
type Provider struct {
|
||||
name string
|
||||
pp PP
|
||||
}
|
||||
|
||||
func newProvider(descriptor pluginContext, config map[string]interface{}, providerName string) (*Provider, error) {
|
||||
basePkg := descriptor.BasePkg
|
||||
if basePkg == "" {
|
||||
basePkg = strings.ReplaceAll(path.Base(descriptor.Import), "-", "_")
|
||||
}
|
||||
|
||||
vConfig, err := descriptor.interpreter.Eval(basePkg + `.CreateConfig()`)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to eval CreateConfig: %w", err)
|
||||
}
|
||||
|
||||
cfg := &mapstructure.DecoderConfig{
|
||||
DecodeHook: mapstructure.StringToSliceHookFunc(","),
|
||||
WeaklyTypedInput: true,
|
||||
Result: vConfig.Interface(),
|
||||
}
|
||||
|
||||
decoder, err := mapstructure.NewDecoder(cfg)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to create configuration decoder: %w", err)
|
||||
}
|
||||
|
||||
err = decoder.Decode(config)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to decode configuration: %w", err)
|
||||
}
|
||||
|
||||
_, err = descriptor.interpreter.Eval(`package wrapper
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
` + basePkg + ` "` + descriptor.Import + `"
|
||||
"github.com/traefik/traefik/v2/pkg/plugins"
|
||||
)
|
||||
|
||||
func NewWrapper(ctx context.Context, config *` + basePkg + `.Config, name string) (plugins.PP, error) {
|
||||
p, err := ` + basePkg + `.New(ctx, config, name)
|
||||
var pv plugins.PP = p
|
||||
return pv, err
|
||||
}
|
||||
`)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to eval wrapper: %w", err)
|
||||
}
|
||||
|
||||
fnNew, err := descriptor.interpreter.Eval("wrapper.NewWrapper")
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to eval New: %w", err)
|
||||
}
|
||||
|
||||
ctx := context.Background()
|
||||
|
||||
args := []reflect.Value{reflect.ValueOf(ctx), vConfig, reflect.ValueOf(providerName)}
|
||||
results := fnNew.Call(args)
|
||||
|
||||
if len(results) > 1 && results[1].Interface() != nil {
|
||||
return nil, results[1].Interface().(error)
|
||||
}
|
||||
|
||||
prov, ok := results[0].Interface().(PP)
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("invalid provider type: %T", results[0].Interface())
|
||||
}
|
||||
|
||||
return &Provider{name: providerName, pp: prov}, nil
|
||||
}
|
||||
|
||||
// Init wraps the Init method of a plugin.
|
||||
func (p *Provider) Init() error {
|
||||
return p.pp.Init()
|
||||
}
|
||||
|
||||
// Provide wraps the Provide method of a plugin.
|
||||
func (p *Provider) Provide(configurationChan chan<- dynamic.Message, pool *safe.Pool) error {
|
||||
defer func() {
|
||||
if err := recover(); err != nil {
|
||||
log.WithoutContext().WithField(log.ProviderName, p.name).Errorf("panic inside the plugin %v", err)
|
||||
}
|
||||
}()
|
||||
|
||||
cfgChan := make(chan json.Marshaler)
|
||||
|
||||
err := p.pp.Provide(cfgChan)
|
||||
if err != nil {
|
||||
return fmt.Errorf("error from %s: %w", p.name, err)
|
||||
}
|
||||
|
||||
pool.GoCtx(func(ctx context.Context) {
|
||||
logger := log.FromContext(log.With(ctx, log.Str(log.ProviderName, p.name)))
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
err := p.pp.Stop()
|
||||
if err != nil {
|
||||
logger.Errorf("failed to stop the provider: %v", err)
|
||||
}
|
||||
|
||||
return
|
||||
|
||||
case cfgPg := <-cfgChan:
|
||||
marshalJSON, err := cfgPg.MarshalJSON()
|
||||
if err != nil {
|
||||
logger.Errorf("failed to marshal configuration: %v", err)
|
||||
continue
|
||||
}
|
||||
|
||||
cfg := &dynamic.Configuration{}
|
||||
err = json.Unmarshal(marshalJSON, cfg)
|
||||
if err != nil {
|
||||
logger.Errorf("failed to unmarshal configuration: %v", err)
|
||||
continue
|
||||
}
|
||||
|
||||
configurationChan <- dynamic.Message{
|
||||
ProviderName: p.name,
|
||||
Configuration: cfg,
|
||||
}
|
||||
}
|
||||
}
|
||||
})
|
||||
|
||||
return nil
|
||||
}
|
Loading…
Add table
Add a link
Reference in a new issue