1
0
Fork 0

Merge current v2.5 into master

This commit is contained in:
Tom Moulard 2021-12-06 17:19:29 +01:00
commit 89cd9e8ddd
No known key found for this signature in database
GPG key ID: 521ABE0C1A0DEAF6
102 changed files with 2402 additions and 1429 deletions

View file

@ -1,3 +1,5 @@
# This file is not meant as a usable Traefik configuration.
# It is only used in tests as a sample for serialization.
[global]
checkNewVersion = true
sendAnonymousUsage = true

View file

@ -4,6 +4,7 @@ import (
"context"
"fmt"
"net/http"
"os"
"github.com/traefik/yaegi/interp"
"github.com/traefik/yaegi/stdlib"
@ -46,7 +47,7 @@ func NewBuilder(client *Client, plugins map[string]Descriptor, localPlugins map[
return nil, fmt.Errorf("%s: failed to read manifest: %w", desc.ModuleName, err)
}
i := interp.New(interp.Options{GoPath: client.GoPath()})
i := interp.New(interp.Options{GoPath: client.GoPath(), Env: os.Environ()})
err = i.Use(stdlib.Symbols)
if err != nil {
@ -89,7 +90,7 @@ func NewBuilder(client *Client, plugins map[string]Descriptor, localPlugins map[
return nil, fmt.Errorf("%s: failed to read manifest: %w", desc.ModuleName, err)
}
i := interp.New(interp.Options{GoPath: localGoPath})
i := interp.New(interp.Options{GoPath: localGoPath, Env: os.Environ()})
err = i.Use(stdlib.Symbols)
if err != nil {

View file

@ -9,6 +9,24 @@ spec:
prefixes:
- /tobestripped
---
apiVersion: traefik.containo.us/v1alpha1
kind: Middleware
metadata:
name: ratelimit
namespace: default
spec:
rateLimit:
period: 1m
average: 6
burst: 12
sourceCriterion:
ipStrategy:
excludedIPs:
- 127.0.0.1/32
- 192.168.1.7
---
apiVersion: traefik.containo.us/v1alpha1
kind: Middleware
@ -40,5 +58,6 @@ spec:
port: 80
middlewares:
- name: stripprefix
- name: ratelimit
- name: addprefix
namespace: foo

View file

@ -443,6 +443,10 @@ func createRateLimitMiddleware(rateLimit *v1alpha1.RateLimit) (*dynamic.RateLimi
}
}
if rateLimit.SourceCriterion != nil {
rl.SourceCriterion = rateLimit.SourceCriterion
}
return rl, nil
}

View file

@ -1472,10 +1472,22 @@ func TestLoadIngressRoutes(t *testing.T) {
Service: "default-test2-route-23c7f4c450289ee29016",
Rule: "Host(`foo.com`) && PathPrefix(`/tobestripped`)",
Priority: 12,
Middlewares: []string{"default-stripprefix", "foo-addprefix"},
Middlewares: []string{"default-stripprefix", "default-ratelimit", "foo-addprefix"},
},
},
Middlewares: map[string]*dynamic.Middleware{
"default-ratelimit": {
RateLimit: &dynamic.RateLimit{
Average: 6,
Burst: 12,
Period: ptypes.Duration(60 * time.Second),
SourceCriterion: &dynamic.SourceCriterion{
IPStrategy: &dynamic.IPStrategy{
ExcludedIPs: []string{"127.0.0.1/32", "192.168.1.7"},
},
},
},
},
"default-stripprefix": {
StripPrefix: &dynamic.StripPrefix{
Prefixes: []string{"/tobestripped"},

View file

@ -8,7 +8,8 @@ import (
"time"
)
const receiveMTU = 8192
// maxDatagramSize is the maximum size of a UDP datagram.
const maxDatagramSize = 65535
const closeRetryInterval = 500 * time.Millisecond
@ -135,7 +136,8 @@ func (l *Listener) readLoop() {
// Allocating a new buffer for every read avoids
// overwriting data in c.msgs in case the next packet is received
// before c.msgs is emptied via Read()
buf := make([]byte, receiveMTU)
buf := make([]byte, maxDatagramSize)
n, raddr, err := l.pConn.ReadFrom(buf)
if err != nil {
return
@ -144,6 +146,7 @@ func (l *Listener) readLoop() {
if err != nil {
continue
}
select {
case conn.receiveCh <- buf[:n]:
case <-conn.doneCh:
@ -249,7 +252,9 @@ func (c *Conn) readLoop() {
}
}
// Read implements io.Reader for a Conn.
// Read reads up to len(p) bytes into p from the connection.
// Each call corresponds to at most one datagram.
// If p is smaller than the datagram, the extra bytes will be discarded.
func (c *Conn) Read(p []byte) (int, error) {
select {
case c.readCh <- p:
@ -258,22 +263,21 @@ func (c *Conn) Read(p []byte) (int, error) {
c.lastActivity = time.Now()
c.muActivity.Unlock()
return n, nil
case <-c.doneCh:
return 0, io.EOF
}
}
// Write implements io.Writer for a Conn.
// Write writes len(p) bytes from p to the underlying connection.
// Each call sends at most one datagram.
// It is an error to send a message larger than the system's max UDP datagram size.
func (c *Conn) Write(p []byte) (n int, err error) {
l := c.listener
if l == nil {
return 0, io.EOF
}
c.muActivity.Lock()
c.lastActivity = time.Now()
c.muActivity.Unlock()
return l.pConn.WriteTo(p, c.rAddr)
return c.listener.pConn.WriteTo(p, c.rAddr)
}
func (c *Conn) close() {

View file

@ -1,9 +1,11 @@
package udp
import (
"crypto/rand"
"errors"
"io"
"net"
"runtime"
"testing"
"time"
@ -317,6 +319,61 @@ func TestShutdown(t *testing.T) {
}
}
func TestReadLoopMaxDataSize(t *testing.T) {
if runtime.GOOS == "darwin" {
// sudo sysctl -w net.inet.udp.maxdgram=65507
t.Skip("Skip test on darwin as the maximum dgram size is set to 9216 bytes by default")
}
// Theoretical maximum size of data in a UDP datagram.
// 65535 8 (UDP header) 20 (IP header).
dataSize := 65507
doneCh := make(chan struct{})
addr, err := net.ResolveUDPAddr("udp", ":0")
require.NoError(t, err)
l, err := Listen("udp", addr, 3*time.Second)
require.NoError(t, err)
defer func() {
err := l.Close()
require.NoError(t, err)
}()
go func() {
defer close(doneCh)
conn, err := l.Accept()
require.NoError(t, err)
buffer := make([]byte, dataSize)
n, err := conn.Read(buffer)
require.NoError(t, err)
assert.Equal(t, dataSize, n)
}()
c, err := net.Dial("udp", l.Addr().String())
require.NoError(t, err)
data := make([]byte, dataSize)
_, err = rand.Read(data)
require.NoError(t, err)
_, err = c.Write(data)
require.NoError(t, err)
select {
case <-doneCh:
case <-time.Tick(5 * time.Second):
t.Fatal("Timeout waiting for datagram read")
}
}
// requireEcho tests that the conn session is live and functional,
// by writing data through it, and expecting the same data as a response when reading on it.
// It fatals if the read blocks longer than timeout,

View file

@ -20,14 +20,14 @@ func NewProxy(address string) (*Proxy, error) {
// ServeUDP implements the Handler interface.
func (p *Proxy) ServeUDP(conn *Conn) {
log.Debugf("Handling connection from %s", conn.rAddr)
log.WithoutContext().Debugf("Handling connection from %s", conn.rAddr)
// needed because of e.g. server.trackedConnection
defer conn.Close()
connBackend, err := net.Dial("udp", p.target)
if err != nil {
log.Errorf("Error while connecting to backend: %v", err)
log.WithoutContext().Errorf("Error while connecting to backend: %v", err)
return
}
@ -35,8 +35,8 @@ func (p *Proxy) ServeUDP(conn *Conn) {
defer connBackend.Close()
errChan := make(chan error)
go p.connCopy(conn, connBackend, errChan)
go p.connCopy(connBackend, conn, errChan)
go connCopy(conn, connBackend, errChan)
go connCopy(connBackend, conn, errChan)
err = <-errChan
if err != nil {
@ -46,8 +46,12 @@ func (p *Proxy) ServeUDP(conn *Conn) {
<-errChan
}
func (p Proxy) connCopy(dst io.WriteCloser, src io.Reader, errCh chan error) {
_, err := io.Copy(dst, src)
func connCopy(dst io.WriteCloser, src io.Reader, errCh chan error) {
// The buffer is initialized to the maximum UDP datagram size,
// to make sure that the whole UDP datagram is read or written atomically (no data is discarded).
buffer := make([]byte, maxDatagramSize)
_, err := io.CopyBuffer(dst, src, buffer)
errCh <- err
if err := dst.Close(); err != nil {

View file

@ -1,7 +1,9 @@
package udp
import (
"crypto/rand"
"net"
"runtime"
"testing"
"time"
@ -9,13 +11,14 @@ import (
"github.com/stretchr/testify/require"
)
func TestUDPProxy(t *testing.T) {
func TestProxy_ServeUDP(t *testing.T) {
backendAddr := ":8081"
go newServer(t, ":8081", HandlerFunc(func(conn *Conn) {
go newServer(t, backendAddr, HandlerFunc(func(conn *Conn) {
for {
b := make([]byte, 1024*1024)
n, err := conn.Read(b)
require.NoError(t, err)
_, err = conn.Write(b[:n])
require.NoError(t, err)
}
@ -28,6 +31,7 @@ func TestUDPProxy(t *testing.T) {
go newServer(t, proxyAddr, proxy)
time.Sleep(time.Second)
udpConn, err := net.Dial("udp", proxyAddr)
require.NoError(t, err)
@ -37,9 +41,58 @@ func TestUDPProxy(t *testing.T) {
b := make([]byte, 1024*1024)
n, err := udpConn.Read(b)
require.NoError(t, err)
assert.Equal(t, "DATAWRITE", string(b[:n]))
}
func TestProxy_ServeUDP_MaxDataSize(t *testing.T) {
if runtime.GOOS == "darwin" {
// sudo sysctl -w net.inet.udp.maxdgram=65507
t.Skip("Skip test on darwin as the maximum dgram size is set to 9216 bytes by default")
}
// Theoretical maximum size of data in a UDP datagram.
// 65535 8 (UDP header) 20 (IP header).
dataSize := 65507
backendAddr := ":8083"
go newServer(t, backendAddr, HandlerFunc(func(conn *Conn) {
buffer := make([]byte, dataSize)
n, err := conn.Read(buffer)
require.NoError(t, err)
_, err = conn.Write(buffer[:n])
require.NoError(t, err)
}))
proxy, err := NewProxy(backendAddr)
require.NoError(t, err)
proxyAddr := ":8082"
go newServer(t, proxyAddr, proxy)
time.Sleep(time.Second)
udpConn, err := net.Dial("udp", proxyAddr)
require.NoError(t, err)
want := make([]byte, dataSize)
_, err = rand.Read(want)
require.NoError(t, err)
_, err = udpConn.Write(want)
require.NoError(t, err)
got := make([]byte, dataSize)
_, err = udpConn.Read(got)
require.NoError(t, err)
assert.Equal(t, want, got)
}
func newServer(t *testing.T, addr string, handler Handler) {
t.Helper()
@ -52,6 +105,7 @@ func newServer(t *testing.T, addr string, handler Handler) {
for {
conn, err := listener.Accept()
require.NoError(t, err)
go handler.ServeUDP(conn)
}
}