Merge remote-tracking branch 'upstream/v2.2' into mrg-current-v2.2
This commit is contained in:
commit
7affeae480
12 changed files with 136 additions and 55 deletions
|
@ -132,18 +132,18 @@ func (x *XForwarded) rewrite(outreq *http.Request) {
|
|||
|
||||
xfProto := outreq.Header.Get(xForwardedProto)
|
||||
if xfProto == "" {
|
||||
if outreq.TLS != nil {
|
||||
outreq.Header.Set(xForwardedProto, "https")
|
||||
if isWebsocketRequest(outreq) {
|
||||
if outreq.TLS != nil {
|
||||
outreq.Header.Set(xForwardedProto, "wss")
|
||||
} else {
|
||||
outreq.Header.Set(xForwardedProto, "ws")
|
||||
}
|
||||
} else {
|
||||
outreq.Header.Set(xForwardedProto, "http")
|
||||
}
|
||||
}
|
||||
|
||||
if isWebsocketRequest(outreq) {
|
||||
if outreq.Header.Get(xForwardedProto) == "https" || outreq.Header.Get(xForwardedProto) == "wss" {
|
||||
outreq.Header.Set(xForwardedProto, "wss")
|
||||
} else {
|
||||
outreq.Header.Set(xForwardedProto, "ws")
|
||||
if outreq.TLS != nil {
|
||||
outreq.Header.Set(xForwardedProto, "https")
|
||||
} else {
|
||||
outreq.Header.Set(xForwardedProto, "http")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -345,7 +345,23 @@ func (p Provider) getIPAddress(ctx context.Context, container dockerData) string
|
|||
logger.Warnf("Unable to get IP address for container %s : Failed to inspect container ID %s, error: %s", container.Name, connectedContainer, err)
|
||||
return ""
|
||||
}
|
||||
return p.getIPAddress(ctx, parseContainer(containerInspected))
|
||||
|
||||
// Check connected container for traefik.docker.network, falling back to
|
||||
// the network specified on the current container.
|
||||
containerParsed := parseContainer(containerInspected)
|
||||
extraConf, err := p.getConfiguration(containerParsed)
|
||||
|
||||
if err != nil {
|
||||
logger.Warnf("Unable to get IP address for container %s : failed to get extra configuration for container %s: %s", container.Name, containerInspected.Name, err)
|
||||
return ""
|
||||
}
|
||||
|
||||
if extraConf.Docker.Network == "" {
|
||||
extraConf.Docker.Network = container.ExtraConf.Docker.Network
|
||||
}
|
||||
|
||||
containerParsed.ExtraConf = extraConf
|
||||
return p.getIPAddress(ctx, containerParsed)
|
||||
}
|
||||
|
||||
for _, network := range container.NetworkSettings.Networks {
|
||||
|
|
|
@ -128,9 +128,11 @@ func (l *Listener) Shutdown(graceTimeout time.Duration) error {
|
|||
// we find that session, and otherwise we create a new one.
|
||||
// We then send the data the session's readLoop.
|
||||
func (l *Listener) readLoop() {
|
||||
buf := make([]byte, receiveMTU)
|
||||
|
||||
for {
|
||||
// Allocating a new buffer for every read avoids
|
||||
// overwriting data in c.msgs in case the next packet is received
|
||||
// before c.msgs is emptied via Read()
|
||||
buf := make([]byte, receiveMTU)
|
||||
n, raddr, err := l.pConn.ReadFrom(buf)
|
||||
if err != nil {
|
||||
return
|
||||
|
@ -177,7 +179,7 @@ func (l *Listener) newConn(rAddr net.Addr) *Conn {
|
|||
readCh: make(chan []byte),
|
||||
sizeCh: make(chan int),
|
||||
doneCh: make(chan struct{}),
|
||||
ticker: time.NewTicker(timeoutTicker),
|
||||
timeout: timeoutTicker,
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -194,7 +196,7 @@ type Conn struct {
|
|||
muActivity sync.RWMutex
|
||||
lastActivity time.Time // the last time the session saw either read or write activity
|
||||
|
||||
ticker *time.Ticker // for timeouts
|
||||
timeout time.Duration // for timeouts
|
||||
doneOnce sync.Once
|
||||
doneCh chan struct{}
|
||||
}
|
||||
|
@ -204,12 +206,15 @@ type Conn struct {
|
|||
// that is to say it waits on readCh to receive the slice of bytes that the Read operation wants to read onto.
|
||||
// The Read operation receives the signal that the data has been written to the slice of bytes through the sizeCh.
|
||||
func (c *Conn) readLoop() {
|
||||
ticker := time.NewTicker(c.timeout)
|
||||
defer ticker.Stop()
|
||||
|
||||
for {
|
||||
if len(c.msgs) == 0 {
|
||||
select {
|
||||
case msg := <-c.receiveCh:
|
||||
c.msgs = append(c.msgs, msg)
|
||||
case <-c.ticker.C:
|
||||
case <-ticker.C:
|
||||
c.muActivity.RLock()
|
||||
deadline := c.lastActivity.Add(connTimeout)
|
||||
c.muActivity.RUnlock()
|
||||
|
@ -229,7 +234,7 @@ func (c *Conn) readLoop() {
|
|||
c.sizeCh <- n
|
||||
case msg := <-c.receiveCh:
|
||||
c.msgs = append(c.msgs, msg)
|
||||
case <-c.ticker.C:
|
||||
case <-ticker.C:
|
||||
c.muActivity.RLock()
|
||||
deadline := c.lastActivity.Add(connTimeout)
|
||||
c.muActivity.RUnlock()
|
||||
|
@ -281,6 +286,5 @@ func (c *Conn) Close() error {
|
|||
c.listener.mu.Lock()
|
||||
defer c.listener.mu.Unlock()
|
||||
delete(c.listener.conns, c.rAddr.String())
|
||||
c.ticker.Stop()
|
||||
return nil
|
||||
}
|
||||
|
|
|
@ -10,6 +10,67 @@ import (
|
|||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestConsecutiveWrites(t *testing.T) {
|
||||
addr, err := net.ResolveUDPAddr("udp", ":0")
|
||||
require.NoError(t, err)
|
||||
|
||||
ln, err := Listen("udp", addr)
|
||||
require.NoError(t, err)
|
||||
defer func() {
|
||||
err := ln.Close()
|
||||
require.NoError(t, err)
|
||||
}()
|
||||
|
||||
go func() {
|
||||
for {
|
||||
conn, err := ln.Accept()
|
||||
if err == errClosedListener {
|
||||
return
|
||||
}
|
||||
require.NoError(t, err)
|
||||
|
||||
go func() {
|
||||
b := make([]byte, 2048)
|
||||
b2 := make([]byte, 2048)
|
||||
var n int
|
||||
var n2 int
|
||||
|
||||
n, err = conn.Read(b)
|
||||
require.NoError(t, err)
|
||||
// Wait to make sure that the second packet is received
|
||||
time.Sleep(10 * time.Millisecond)
|
||||
n2, err = conn.Read(b2)
|
||||
require.NoError(t, err)
|
||||
|
||||
_, err = conn.Write(b[:n])
|
||||
require.NoError(t, err)
|
||||
_, err = conn.Write(b2[:n2])
|
||||
require.NoError(t, err)
|
||||
}()
|
||||
}
|
||||
}()
|
||||
|
||||
udpConn, err := net.Dial("udp", ln.Addr().String())
|
||||
require.NoError(t, err)
|
||||
|
||||
// Send multiple packets of different content and length consecutively
|
||||
// Read back packets afterwards and make sure that content matches
|
||||
// This checks if any buffers are overwritten while the receiver is enqueuing multiple packets
|
||||
b := make([]byte, 2048)
|
||||
var n int
|
||||
_, err = udpConn.Write([]byte("TESTLONG0"))
|
||||
require.NoError(t, err)
|
||||
_, err = udpConn.Write([]byte("1TEST"))
|
||||
require.NoError(t, err)
|
||||
|
||||
n, err = udpConn.Read(b)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, "TESTLONG0", string(b[:n]))
|
||||
n, err = udpConn.Read(b)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, "1TEST", string(b[:n]))
|
||||
}
|
||||
|
||||
func TestListenNotBlocking(t *testing.T) {
|
||||
addr, err := net.ResolveUDPAddr("udp", ":0")
|
||||
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue