Feature: add udp timeout configuration
This commit is contained in:
parent
e5a01c7cc8
commit
fc7ec17905
13 changed files with 98 additions and 22 deletions
|
@ -12,12 +12,6 @@ const receiveMTU = 8192
|
|||
|
||||
const closeRetryInterval = 500 * time.Millisecond
|
||||
|
||||
// connTimeout determines how long to wait on an idle session,
|
||||
// before releasing all resources related to that session.
|
||||
const connTimeout = 3 * time.Second
|
||||
|
||||
var timeoutTicker = connTimeout / 10
|
||||
|
||||
var errClosedListener = errors.New("udp: listener closed")
|
||||
|
||||
// Listener augments a session-oriented Listener over a UDP PacketConn.
|
||||
|
@ -31,10 +25,18 @@ type Listener struct {
|
|||
accepting bool
|
||||
|
||||
acceptCh chan *Conn // no need for a Once, already indirectly guarded by accepting.
|
||||
|
||||
// timeout defines how long to wait on an idle session,
|
||||
// before releasing its related resources.
|
||||
timeout time.Duration
|
||||
}
|
||||
|
||||
// Listen creates a new listener.
|
||||
func Listen(network string, laddr *net.UDPAddr) (*Listener, error) {
|
||||
func Listen(network string, laddr *net.UDPAddr, timeout time.Duration) (*Listener, error) {
|
||||
if timeout <= 0 {
|
||||
return nil, errors.New("timeout should be greater than zero")
|
||||
}
|
||||
|
||||
conn, err := net.ListenUDP(network, laddr)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
@ -45,6 +47,7 @@ func Listen(network string, laddr *net.UDPAddr) (*Listener, error) {
|
|||
acceptCh: make(chan *Conn),
|
||||
conns: make(map[string]*Conn),
|
||||
accepting: true,
|
||||
timeout: timeout,
|
||||
}
|
||||
|
||||
go l.readLoop()
|
||||
|
@ -179,7 +182,7 @@ func (l *Listener) newConn(rAddr net.Addr) *Conn {
|
|||
readCh: make(chan []byte),
|
||||
sizeCh: make(chan int),
|
||||
doneCh: make(chan struct{}),
|
||||
timeout: timeoutTicker,
|
||||
timeout: l.timeout,
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -206,7 +209,7 @@ type Conn struct {
|
|||
// that is to say it waits on readCh to receive the slice of bytes that the Read operation wants to read onto.
|
||||
// The Read operation receives the signal that the data has been written to the slice of bytes through the sizeCh.
|
||||
func (c *Conn) readLoop() {
|
||||
ticker := time.NewTicker(c.timeout)
|
||||
ticker := time.NewTicker(c.timeout / 10)
|
||||
defer ticker.Stop()
|
||||
|
||||
for {
|
||||
|
@ -216,7 +219,7 @@ func (c *Conn) readLoop() {
|
|||
c.msgs = append(c.msgs, msg)
|
||||
case <-ticker.C:
|
||||
c.muActivity.RLock()
|
||||
deadline := c.lastActivity.Add(connTimeout)
|
||||
deadline := c.lastActivity.Add(c.timeout)
|
||||
c.muActivity.RUnlock()
|
||||
if time.Now().After(deadline) {
|
||||
c.Close()
|
||||
|
@ -236,7 +239,7 @@ func (c *Conn) readLoop() {
|
|||
c.msgs = append(c.msgs, msg)
|
||||
case <-ticker.C:
|
||||
c.muActivity.RLock()
|
||||
deadline := c.lastActivity.Add(connTimeout)
|
||||
deadline := c.lastActivity.Add(c.timeout)
|
||||
c.muActivity.RUnlock()
|
||||
if time.Now().After(deadline) {
|
||||
c.Close()
|
||||
|
|
|
@ -15,7 +15,7 @@ func TestConsecutiveWrites(t *testing.T) {
|
|||
addr, err := net.ResolveUDPAddr("udp", ":0")
|
||||
require.NoError(t, err)
|
||||
|
||||
ln, err := Listen("udp", addr)
|
||||
ln, err := Listen("udp", addr, 3*time.Second)
|
||||
require.NoError(t, err)
|
||||
defer func() {
|
||||
err := ln.Close()
|
||||
|
@ -77,7 +77,7 @@ func TestListenNotBlocking(t *testing.T) {
|
|||
|
||||
require.NoError(t, err)
|
||||
|
||||
ln, err := Listen("udp", addr)
|
||||
ln, err := Listen("udp", addr, 3*time.Second)
|
||||
require.NoError(t, err)
|
||||
defer func() {
|
||||
err := ln.Close()
|
||||
|
@ -162,6 +162,14 @@ func TestListenNotBlocking(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
func TestListenWithZeroTimeout(t *testing.T) {
|
||||
addr, err := net.ResolveUDPAddr("udp", ":0")
|
||||
require.NoError(t, err)
|
||||
|
||||
_, err = Listen("udp", addr, 0)
|
||||
assert.Error(t, err)
|
||||
}
|
||||
|
||||
func TestTimeoutWithRead(t *testing.T) {
|
||||
testTimeout(t, true)
|
||||
}
|
||||
|
@ -176,7 +184,7 @@ func testTimeout(t *testing.T, withRead bool) {
|
|||
addr, err := net.ResolveUDPAddr("udp", ":0")
|
||||
require.NoError(t, err)
|
||||
|
||||
ln, err := Listen("udp", addr)
|
||||
ln, err := Listen("udp", addr, 3*time.Second)
|
||||
require.NoError(t, err)
|
||||
defer func() {
|
||||
err := ln.Close()
|
||||
|
@ -212,7 +220,7 @@ func testTimeout(t *testing.T, withRead bool) {
|
|||
|
||||
assert.Equal(t, 10, len(ln.conns))
|
||||
|
||||
time.Sleep(4 * time.Second)
|
||||
time.Sleep(ln.timeout + time.Second)
|
||||
assert.Equal(t, 0, len(ln.conns))
|
||||
}
|
||||
|
||||
|
@ -220,7 +228,7 @@ func TestShutdown(t *testing.T) {
|
|||
addr, err := net.ResolveUDPAddr("udp", ":0")
|
||||
require.NoError(t, err)
|
||||
|
||||
l, err := Listen("udp", addr)
|
||||
l, err := Listen("udp", addr, 3*time.Second)
|
||||
require.NoError(t, err)
|
||||
|
||||
go func() {
|
||||
|
|
|
@ -46,7 +46,7 @@ func newServer(t *testing.T, addr string, handler Handler) {
|
|||
addrL, err := net.ResolveUDPAddr("udp", addr)
|
||||
require.NoError(t, err)
|
||||
|
||||
listener, err := Listen("udp", addrL)
|
||||
listener, err := Listen("udp", addrL, 3*time.Second)
|
||||
require.NoError(t, err)
|
||||
|
||||
for {
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue