Skip to content

Commit 59aa7cb

Browse files
committed
fix: race condition
1 parent 5995a5b commit 59aa7cb

8 files changed

+133
-120
lines changed

connection_impl.go

Lines changed: 5 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -503,17 +503,10 @@ func (c *connection) flush() error {
503503
if c.outputBuffer.IsEmpty() {
504504
return nil
505505
}
506-
if c.operator.getMode() == ophup {
507-
// triggered read throttled, so here shouldn't trigger read event again
508-
err = c.operator.Control(PollHup2W)
509-
} else {
510-
err = c.operator.Control(PollR2RW)
511-
}
512-
c.operator.done()
513-
if err != nil {
514-
return Exception(err, "when flush")
515-
}
516506

507+
// no need to check if resume write successfully
508+
// if resume failed, the connection will be triggered triggerWrite(err), and waitFlush will return err
509+
c.resumeWrite()
517510
return c.waitFlush()
518511
}
519512

@@ -546,8 +539,8 @@ func (c *connection) waitFlush() (err error) {
546539
default:
547540
}
548541
// if timeout, remove write event from poller
549-
// we cannot flush it again, since we don't if the poller is still process outputBuffer
550-
c.operator.Control(PollRW2R)
542+
// we cannot flush it again, since we don't know if the poller is still processing outputBuffer
543+
c.pauseWrite()
551544
return Exception(ErrWriteTimeout, c.remoteAddr.String())
552545
}
553546
}

connection_reactor.go

Lines changed: 29 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,12 @@ func (c *connection) closeBuffer() {
8080

8181
// inputs implements FDOperator.
8282
func (c *connection) inputs(vs [][]byte) (rs [][]byte) {
83+
// trigger throttle
84+
if c.readBufferThreshold > 0 && int64(c.inputBuffer.Len()) >= c.readBufferThreshold {
85+
c.pauseRead()
86+
return
87+
}
88+
8389
vs[0] = c.inputBuffer.book(c.bookSize, c.maxSize)
8490
return vs[:1]
8591
}
@@ -123,6 +129,7 @@ func (c *connection) inputAck(n int) (err error) {
123129
func (c *connection) outputs(vs [][]byte) (rs [][]byte, supportZeroCopy bool) {
124130
if c.outputBuffer.IsEmpty() {
125131
c.pauseWrite()
132+
c.triggerWrite(nil)
126133
return rs, c.supportZeroCopy
127134
}
128135
rs = c.outputBuffer.GetBytes(vs)
@@ -137,50 +144,43 @@ func (c *connection) outputAck(n int) (err error) {
137144
}
138145
if c.outputBuffer.IsEmpty() {
139146
c.pauseWrite()
147+
c.triggerWrite(nil)
140148
}
141149
return nil
142150
}
143151

152+
/* The race description of operator event monitoring
153+
- Pause operation will remove old event monitor of operator
154+
- Resume operation will add new event monitor of operator
155+
- Only poller could use Pause to remove event monitor, and poller already hold the op.do() locker
156+
- Only user could use Resume, and user's operation maybe compete with poller's operation
157+
- If competition happen, because of all resume operation will monitor all events, it's safe to do that with a race condition.
158+
* If resume first and pause latter, poller will monitor the accurate events it needs.
159+
* If pause first and resume latter, poller will monitor the duplicate events which will be removed after next poller triggered.
160+
And poller will ensure to remove the duplicate events.
161+
- If there is no readBufferThreshold option, the code path will be more simple and efficient.
162+
*/
163+
144164
// pauseWrite removed the monitoring of write events.
145165
// pauseWrite used in poller
146166
func (c *connection) pauseWrite() {
147-
switch c.operator.getMode() {
148-
case opreadwrite:
149-
c.operator.Control(PollRW2R)
150-
case opwrite:
151-
c.operator.Control(PollW2Hup)
152-
}
153-
c.triggerWrite(nil)
167+
c.operator.Control(PollRW2R)
168+
}
169+
170+
// resumeWrite add the monitoring of write events.
171+
// resumeWrite used by users
172+
func (c *connection) resumeWrite() {
173+
c.operator.Control(PollR2RW)
154174
}
155175

156176
// pauseRead removed the monitoring of read events.
157177
// pauseRead used in poller
158178
func (c *connection) pauseRead() {
159-
// Note that the poller ensure that every fd should read all left data in socket buffer before detach it.
160-
// So the operator mode should never be ophup.
161-
var changeTo PollEvent
162-
switch c.operator.getMode() {
163-
case opread:
164-
changeTo = PollR2Hup
165-
case opreadwrite:
166-
changeTo = PollRW2W
167-
}
168-
if changeTo > 0 && atomic.CompareAndSwapInt32(&c.operator.throttled, 0, 1) {
169-
c.operator.Control(changeTo)
170-
}
179+
c.operator.Control(PollRW2W)
171180
}
172181

173182
// resumeRead add the monitoring of read events.
174183
// resumeRead used by users
175184
func (c *connection) resumeRead() {
176-
var changeTo PollEvent
177-
switch c.operator.getMode() {
178-
case ophup:
179-
changeTo = PollHup2R
180-
case opwrite:
181-
changeTo = PollW2RW
182-
}
183-
if changeTo > 0 && atomic.CompareAndSwapInt32(&c.operator.throttled, 1, 0) {
184-
c.operator.Control(changeTo)
185-
}
185+
c.operator.Control(PollW2RW)
186186
}

connection_test.go

Lines changed: 1 addition & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -784,18 +784,10 @@ func TestConnectionReadThresholdWithClosed(t *testing.T) {
784784
MustNil(t, err)
785785
t.Logf("read non-throttled data")
786786

787-
// continue read throttled data
787+
// continue read throttled data with EOF
788788
buf, err = connection.Reader().Next(5)
789-
MustNil(t, err)
790-
t.Logf("read throttled data: [%s]", buf)
791789
Equal(t, len(buf), 5)
792790
MustNil(t, err)
793-
err = connection.Reader().Release()
794-
MustNil(t, err)
795-
Equal(t, connection.Reader().Len(), 0)
796-
797-
_, err = connection.Reader().Next(1)
798-
Assert(t, errors.Is(err, ErrEOF))
799791
trigger <- struct{}{}
800792
return nil
801793
}

fd_operator.go

Lines changed: 6 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@ const (
2525
opread int32 = 1
2626
opwrite int32 = 2
2727
opreadwrite int32 = 3
28-
ophup int32 = 4
2928
)
3029

3130
// FDOperator is a collection of operations on file descriptors.
@@ -51,8 +50,8 @@ type FDOperator struct {
5150
// poll is the registered location of the file descriptor.
5251
poll Poll
5352

54-
mode int32
55-
throttled int32
53+
// protect only detach once
54+
detached int32
5655

5756
// private, used by operatorCache
5857
next *FDOperator
@@ -61,21 +60,16 @@ type FDOperator struct {
6160
}
6261

6362
func (op *FDOperator) Control(event PollEvent) error {
63+
if event == PollDetach && atomic.AddInt32(&op.detached, 1) > 1 {
64+
return nil
65+
}
6466
return op.poll.Control(op, event)
6567
}
6668

6769
func (op *FDOperator) Free() {
6870
op.poll.Free(op)
6971
}
7072

71-
func (op *FDOperator) getMode() int32 {
72-
return atomic.LoadInt32(&op.mode)
73-
}
74-
75-
func (op *FDOperator) setMode(mode int32) {
76-
atomic.StoreInt32(&op.mode, mode)
77-
}
78-
7973
func (op *FDOperator) do() (can bool) {
8074
return atomic.CompareAndSwapInt32(&op.state, 1, 2)
8175
}
@@ -112,6 +106,5 @@ func (op *FDOperator) reset() {
112106
op.Inputs, op.InputAck = nil, nil
113107
op.Outputs, op.OutputAck = nil, nil
114108
op.poll = nil
115-
op.mode = 0
116-
op.throttled = 0
109+
op.detached = 0
117110
}

poll.go

Lines changed: 7 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -48,31 +48,23 @@ type PollEvent int
4848
const (
4949
// PollReadable is used to monitor whether the FDOperator registered by
5050
// listener and connection is readable or closed.
51-
PollReadable PollEvent = 0x1
51+
PollReadable PollEvent = iota + 1
5252

5353
// PollWritable is used to monitor whether the FDOperator created by the dialer is writable or closed.
5454
// ET mode must be used (still need to poll hup after being writable)
55-
PollWritable PollEvent = 0x2
55+
PollWritable
5656

5757
// PollDetach is used to remove the FDOperator from poll.
58-
PollDetach PollEvent = 0x3
58+
PollDetach
5959

6060
// PollR2RW is used to monitor writable for FDOperator,
6161
// which is only called when the socket write buffer is full.
62-
PollR2RW PollEvent = 0x4
62+
PollR2RW
6363
// PollRW2R is used to remove the writable monitor of FDOperator, generally used with PollR2RW.
64-
PollRW2R PollEvent = 0x5
64+
PollRW2R
6565

6666
// PollRW2W is used to remove the readable monitor of FDOperator.
67-
PollRW2W PollEvent = 0x6
67+
PollRW2W
6868
// PollW2RW is used to add the readable monitor of FDOperator, generally used with PollRW2W.
69-
PollW2RW PollEvent = 0x7
70-
PollW2Hup PollEvent = 0x8
71-
72-
// PollR2Hup is used to remove the readable monitor of FDOperator.
73-
PollR2Hup PollEvent = 0x9
74-
// PollHup2R is used to add the readable monitor of FDOperator, generally used with PollR2Hup.
75-
PollHup2R PollEvent = 0xA
76-
// PollHup2W is used to add the writeable monitor of FDOperator.
77-
PollHup2W PollEvent = 0xB
69+
PollW2RW
7870
)

poll_default_bsd.go

Lines changed: 1 addition & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -116,7 +116,7 @@ func (p *defaultPoll) Wait() error {
116116
}
117117
if triggerHup {
118118
// if peer closed with throttled state, we should ensure we read all left data to avoid data loss
119-
if (triggerRead || atomic.LoadInt32(&operator.throttled) > 0) && operator.Inputs != nil {
119+
if triggerRead && operator.Inputs != nil {
120120
var leftRead int
121121
// read all left data if peer send and close
122122
if leftRead, err = readall(operator, barriers[i]); err != nil && !errors.Is(err, ErrEOF) {
@@ -183,44 +183,25 @@ func (p *defaultPoll) Control(operator *FDOperator, event PollEvent) error {
183183
switch event {
184184
case PollReadable:
185185
operator.inuse()
186-
operator.setMode(opread)
187186
evs[0].Filter, evs[0].Flags = syscall.EVFILT_READ, syscall.EV_ADD|syscall.EV_ENABLE
188187
case PollWritable:
189188
operator.inuse()
190-
operator.setMode(opwrite)
191189
evs[0].Filter, evs[0].Flags = syscall.EVFILT_WRITE, syscall.EV_ADD|syscall.EV_ENABLE
192190
case PollDetach:
193-
operator.setMode(ophup)
194191
if operator.OnWrite != nil { // means WaitWrite finished
195192
evs[0].Filter, evs[0].Flags = syscall.EVFILT_WRITE, syscall.EV_DELETE
196193
} else {
197194
evs[0].Filter, evs[0].Flags = syscall.EVFILT_READ, syscall.EV_DELETE
198195
}
199196
p.delOperator(operator)
200197
case PollR2RW:
201-
operator.setMode(opreadwrite)
202198
evs[0].Filter, evs[0].Flags = syscall.EVFILT_WRITE, syscall.EV_ADD|syscall.EV_ENABLE
203199
case PollRW2R:
204-
operator.setMode(opread)
205200
evs[0].Filter, evs[0].Flags = syscall.EVFILT_WRITE, syscall.EV_DELETE
206201
case PollRW2W:
207-
operator.setMode(opwrite)
208202
evs[0].Filter, evs[0].Flags = syscall.EVFILT_READ, syscall.EV_DELETE
209203
case PollW2RW:
210-
operator.setMode(opreadwrite)
211204
evs[0].Filter, evs[0].Flags = syscall.EVFILT_READ, syscall.EV_ADD|syscall.EV_ENABLE
212-
case PollR2Hup:
213-
operator.setMode(ophup)
214-
evs[0].Filter, evs[0].Flags = syscall.EVFILT_READ, syscall.EV_DELETE
215-
case PollW2Hup:
216-
operator.setMode(ophup)
217-
evs[0].Filter, evs[0].Flags = syscall.EVFILT_WRITE, syscall.EV_DELETE
218-
case PollHup2R:
219-
operator.setMode(opread)
220-
evs[0].Filter, evs[0].Flags = syscall.EVFILT_READ, syscall.EV_ADD|syscall.EV_ENABLE
221-
case PollHup2W:
222-
operator.setMode(opwrite)
223-
evs[0].Filter, evs[0].Flags = syscall.EVFILT_WRITE, syscall.EV_ADD|syscall.EV_ENABLE
224205
}
225206
_, err := syscall.Kevent(p.fd, evs, nil, nil)
226207
return err

poll_default_bsd_test.go

Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,83 @@
1+
// Copyright 2024 CloudWeGo Authors
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
//go:build darwin
16+
// +build darwin
17+
18+
package netpoll
19+
20+
import (
21+
"syscall"
22+
"testing"
23+
)
24+
25+
func TestKqueueEvent(t *testing.T) {
26+
kqfd, err := syscall.Kqueue()
27+
defer syscall.Close(kqfd)
28+
_, err = syscall.Kevent(kqfd, []syscall.Kevent_t{{
29+
Ident: 0,
30+
Filter: syscall.EVFILT_USER,
31+
Flags: syscall.EV_ADD | syscall.EV_CLEAR,
32+
}}, nil, nil)
33+
MustNil(t, err)
34+
35+
rfd, wfd := GetSysFdPairs()
36+
defer syscall.Close(rfd)
37+
defer syscall.Close(wfd)
38+
39+
// add read event
40+
changes := make([]syscall.Kevent_t, 1)
41+
changes[0].Ident = uint64(rfd)
42+
changes[0].Filter = syscall.EVFILT_READ
43+
changes[0].Flags = syscall.EV_ADD
44+
_, err = syscall.Kevent(kqfd, changes, nil, nil)
45+
MustNil(t, err)
46+
47+
// write
48+
send := []byte("hello")
49+
recv := make([]byte, 5)
50+
_, err = syscall.Write(wfd, send)
51+
MustNil(t, err)
52+
53+
// check readable
54+
events := make([]syscall.Kevent_t, 128)
55+
n, err := syscall.Kevent(kqfd, nil, events, nil)
56+
MustNil(t, err)
57+
Equal(t, n, 1)
58+
Assert(t, events[0].Filter == syscall.EVFILT_READ)
59+
// read
60+
_, err = syscall.Read(rfd, recv)
61+
MustNil(t, err)
62+
Equal(t, string(recv), string(send))
63+
64+
// delete read
65+
changes[0].Ident = uint64(rfd)
66+
changes[0].Filter = syscall.EVFILT_READ
67+
changes[0].Flags = syscall.EV_DELETE
68+
_, err = syscall.Kevent(kqfd, changes, nil, nil)
69+
MustNil(t, err)
70+
71+
// write
72+
_, err = syscall.Write(wfd, send)
73+
MustNil(t, err)
74+
75+
// check readable
76+
n, err = syscall.Kevent(kqfd, nil, events, &syscall.Timespec{Sec: 1})
77+
MustNil(t, err)
78+
Equal(t, n, 0)
79+
// read
80+
_, err = syscall.Read(rfd, recv)
81+
MustNil(t, err)
82+
Equal(t, string(recv), string(send))
83+
}

0 commit comments

Comments
 (0)