Skip to content

Commit 18c2c1f

Browse files
authored
fix: delay accept new connections if out of fds (#311)
1 parent 4ae6c36 commit 18c2c1f

File tree

4 files changed

+166
-23
lines changed

4 files changed

+166
-23
lines changed

net_listener.go

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,15 @@ func (ln *listener) Accept() (net.Conn, error) {
9191
// tcp
9292
var fd, sa, err = syscall.Accept(ln.fd)
9393
if err != nil {
94-
if err == syscall.EAGAIN {
94+
/* https://man7.org/linux/man-pages/man2/accept.2.html
95+
EAGAIN or EWOULDBLOCK
96+
The socket is marked nonblocking and no connections are
97+
present to be accepted. POSIX.1-2001 and POSIX.1-2008
98+
allow either error to be returned for this case, and do
99+
not require these constants to have the same value, so a
100+
portable application should check for both possibilities.
101+
*/
102+
if err == syscall.EAGAIN || err == syscall.EWOULDBLOCK {
95103
return nil, nil
96104
}
97105
return nil, err

netpoll_server.go

Lines changed: 70 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import (
2222
"errors"
2323
"strings"
2424
"sync"
25+
"syscall"
2526
"time"
2627
)
2728

@@ -92,39 +93,86 @@ func (s *server) Close(ctx context.Context) error {
9293
func (s *server) OnRead(p Poll) error {
9394
// accept socket
9495
conn, err := s.ln.Accept()
95-
if err != nil {
96-
// shut down
97-
if strings.Contains(err.Error(), "closed") {
98-
s.operator.Control(PollDetach)
99-
s.onQuit(err)
96+
if err == nil {
97+
if conn != nil {
98+
s.onAccept(conn.(Conn))
99+
}
100+
// EAGAIN | EWOULDBLOCK if conn and err both nil
101+
return nil
102+
}
103+
logger.Printf("NETPOLL: accept conn failed: %v", err)
104+
105+
// delay accept when too many open files
106+
if isOutOfFdErr(err) {
107+
// since we use Epoll LT, we have to detach listener fd from epoll first
108+
// and re-register it once Accept reports no pending connection (conn and err both nil)
109+
cerr := s.operator.Control(PollDetach)
110+
if cerr != nil {
111+
logger.Printf("NETPOLL: detach listener fd failed: %v", cerr)
100112
return err
101113
}
102-
logger.Println("NETPOLL: accept conn failed:", err.Error())
103-
return err
114+
go func() {
115+
retryTimes := []time.Duration{0, 10, 50, 100, 200, 500, 1000} // ms
116+
retryTimeIndex := 0
117+
for {
118+
if retryTimeIndex > 0 {
119+
time.Sleep(retryTimes[retryTimeIndex] * time.Millisecond)
120+
}
121+
conn, err := s.ln.Accept()
122+
if err == nil {
123+
if conn == nil {
124+
// recover the accept poll loop
125+
s.operator.Control(PollReadable)
126+
return
127+
}
128+
s.onAccept(conn.(Conn))
129+
logger.Println("NETPOLL: re-accept conn success:", conn.RemoteAddr())
130+
retryTimeIndex = 0
131+
continue
132+
}
133+
if retryTimeIndex+1 < len(retryTimes) {
134+
retryTimeIndex++
135+
}
136+
logger.Printf("NETPOLL: re-accept conn failed, err=[%s] and next retrytime=%dms", err.Error(), retryTimes[retryTimeIndex])
137+
}
138+
}()
104139
}
105-
if conn == nil {
106-
return nil
140+
141+
// shut down
142+
if strings.Contains(err.Error(), "closed") {
143+
s.operator.Control(PollDetach)
144+
s.onQuit(err)
145+
return err
107146
}
147+
148+
return err
149+
}
150+
151+
// OnHup implements FDOperator.
152+
func (s *server) OnHup(p Poll) error {
153+
s.onQuit(errors.New("listener close"))
154+
return nil
155+
}
156+
157+
func (s *server) onAccept(conn Conn) {
108158
// store & register connection
109-
var connection = &connection{}
110-
connection.init(conn.(Conn), s.opts)
111-
if !connection.IsActive() {
112-
return nil
159+
var nconn = new(connection)
160+
nconn.init(conn, s.opts)
161+
if !nconn.IsActive() {
162+
return
113163
}
114-
var fd = conn.(Conn).Fd()
115-
connection.AddCloseCallback(func(connection Connection) error {
164+
var fd = conn.Fd()
165+
nconn.AddCloseCallback(func(connection Connection) error {
116166
s.connections.Delete(fd)
117167
return nil
118168
})
119-
s.connections.Store(fd, connection)
169+
s.connections.Store(fd, nconn)
120170

121171
// trigger onConnect asynchronously
122-
connection.onConnect()
123-
return nil
172+
nconn.onConnect()
124173
}
125174

126-
// OnHup implements FDOperator.
127-
func (s *server) OnHup(p Poll) error {
128-
s.onQuit(errors.New("listener close"))
129-
return nil
175+
func isOutOfFdErr(err error) bool {
176+
se, ok := err.(syscall.Errno)
177+
return ok && (se == syscall.EMFILE || se == syscall.ENFILE)
130178
}

netpoll_test.go

Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,9 +21,11 @@ import (
2121
"context"
2222
"errors"
2323
"math/rand"
24+
"os"
2425
"runtime"
2526
"sync"
2627
"sync/atomic"
28+
"syscall"
2729
"testing"
2830
"time"
2931
)
@@ -507,6 +509,78 @@ func TestClientWriteAndClose(t *testing.T) {
507509
MustNil(t, err)
508510
}
509511

512+
func TestServerAcceptWhenTooManyOpenFiles(t *testing.T) {
513+
if os.Getenv("N_LOCAL") == "" {
514+
t.Skip("Only test for debug purpose")
515+
return
516+
}
517+
518+
var originalRlimit syscall.Rlimit
519+
err := syscall.Getrlimit(syscall.RLIMIT_NOFILE, &originalRlimit)
520+
MustNil(t, err)
521+
t.Logf("Original RLimit: %v", originalRlimit)
522+
523+
rlimit := syscall.Rlimit{Cur: 32, Max: originalRlimit.Max}
524+
err = syscall.Setrlimit(syscall.RLIMIT_NOFILE, &rlimit)
525+
MustNil(t, err)
526+
err = syscall.Getrlimit(syscall.RLIMIT_NOFILE, &rlimit)
527+
MustNil(t, err)
528+
t.Logf("New RLimit: %v", rlimit)
529+
defer func() { // reset
530+
err = syscall.Setrlimit(syscall.RLIMIT_NOFILE, &originalRlimit)
531+
MustNil(t, err)
532+
}()
533+
534+
var network, address = "tcp", ":18888"
535+
var connected int32
536+
var loop = newTestEventLoop(network, address,
537+
func(ctx context.Context, connection Connection) error {
538+
buf, err := connection.Reader().Next(connection.Reader().Len())
539+
connection.Writer().WriteBinary(buf)
540+
connection.Writer().Flush()
541+
return err
542+
},
543+
WithOnConnect(func(ctx context.Context, connection Connection) context.Context {
544+
atomic.AddInt32(&connected, 1)
545+
t.Logf("Conn[%s] accpeted", connection.RemoteAddr())
546+
return ctx
547+
}),
548+
WithOnDisconnect(func(ctx context.Context, connection Connection) {
549+
t.Logf("Conn[%s] disconnected", connection.RemoteAddr())
550+
}),
551+
)
552+
time.Sleep(time.Millisecond * 10)
553+
554+
// out of fds
555+
files := make([]*os.File, 0)
556+
for {
557+
f, err := os.Open("/dev/null")
558+
if err != nil {
559+
Assert(t, isOutOfFdErr(errors.Unwrap(err)), err)
560+
break
561+
}
562+
files = append(files, f)
563+
}
564+
go func() {
565+
time.Sleep(time.Second * 10)
566+
t.Logf("close all files")
567+
for _, f := range files {
568+
f.Close()
569+
}
570+
}()
571+
572+
// nothing here dials the server: open a connection manually (e.g. with telnet) while the test waits
573+
var connections = 1
574+
for atomic.LoadInt32(&connected) < int32(connections) {
575+
t.Logf("connected=%d", atomic.LoadInt32(&connected))
576+
time.Sleep(time.Second)
577+
}
578+
time.Sleep(time.Second * 10)
579+
580+
err = loop.Shutdown(context.Background())
581+
MustNil(t, err)
582+
}
583+
510584
func createTestListener(network, address string) (Listener, error) {
511585
for {
512586
ln, err := CreateListener(network, address)

test_conns.sh

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
#!/usr/bin/env bash
2+
3+
ip="$1"
4+
port="$2"
5+
conns="$3"
6+
timeout="$4"
7+
8+
for i in $(seq 1 $conns);
9+
do
10+
nc -v -w $timeout $ip $port < /dev/null &
11+
done
12+
13+
wait

0 commit comments

Comments
 (0)