1
1
local socket = require " bee.socket"
2
+ local select = require " bee.select"
3
+ local selector = select .create ()
4
+ local SELECT_READ <const> = select .SELECT_READ
5
+ local SELECT_WRITE <const> = select .SELECT_WRITE
2
6
3
- local readfds = {}
4
- local writefds = {}
5
- local map = {}
6
-
7
- local function FD_SET (set , fd )
8
- for i = 1 , # set do
9
- if fd == set [i ] then
10
- return
11
- end
7
+ local function fd_set_read (s )
8
+ if s ._flags & SELECT_READ ~= 0 then
9
+ return
12
10
end
13
- set [# set + 1 ] = fd
11
+ s ._flags = s ._flags | SELECT_READ
12
+ selector :event_mod (s ._fd , s ._flags )
14
13
end
15
14
16
- local function FD_CLR (set , fd )
17
- for i = 1 , # set do
18
- if fd == set [i ] then
19
- set [i ] = set [# set ]
20
- set [# set ] = nil
21
- return
22
- end
15
+ local function fd_clr_read (s )
16
+ if s ._flags & SELECT_READ == 0 then
17
+ return
23
18
end
19
+ s ._flags = s ._flags & (~SELECT_READ )
20
+ selector :event_mod (s ._fd , s ._flags )
24
21
end
25
22
26
- local function fd_set_read (fd )
27
- FD_SET (readfds , fd )
28
- end
29
-
30
- local function fd_clr_read (fd )
31
- FD_CLR (readfds , fd )
32
- end
33
-
34
- local function fd_set_write (fd )
35
- FD_SET (writefds , fd )
23
+ local function fd_set_write (s )
24
+ if s ._flags & SELECT_WRITE ~= 0 then
25
+ return
26
+ end
27
+ s ._flags = s ._flags | SELECT_WRITE
28
+ selector :event_mod (s ._fd , s ._flags )
36
29
end
37
30
38
- local function fd_clr_write (fd )
39
- FD_CLR (writefds , fd )
31
+ local function fd_clr_write (s )
32
+ if s ._flags & SELECT_WRITE == 0 then
33
+ return
34
+ end
35
+ s ._flags = s ._flags & (~SELECT_WRITE )
36
+ selector :event_mod (s ._fd , s ._flags )
40
37
end
41
38
42
39
local function on_event (self , name , ...)
49
46
local function close (self )
50
47
local fd = self ._fd
51
48
on_event (self , " close" )
49
+ selector :event_del (fd )
52
50
fd :close ()
53
- map [fd ] = nil
54
51
end
55
52
56
53
local stream_mt = {}
@@ -69,7 +66,7 @@ function stream:write(data)
69
66
return
70
67
end
71
68
if self ._writebuf == " " then
72
- fd_set_write (self . _fd )
69
+ fd_set_write (self )
73
70
end
74
71
self ._writebuf = self ._writebuf .. data
75
72
end
79
76
function stream :close ()
80
77
if not self .shutdown_r then
81
78
self .shutdown_r = true
82
- fd_clr_read (self . _fd )
79
+ fd_clr_read (self )
83
80
end
84
81
if self .shutdown_w or self ._writebuf == " " then
85
82
self .shutdown_w = true
86
- fd_clr_write (self . _fd )
83
+ fd_clr_write (self )
87
84
close (self )
88
85
end
89
86
end
90
- function stream :update (timeout )
91
- local fd = self ._fd
92
- local r = {fd }
93
- local w = r
94
- if self ._writebuf == " " then
95
- w = nil
96
- end
97
- local rd , wr = socket .select (r , w , timeout or 0 )
98
- if rd then
99
- if # rd > 0 then
100
- self :select_r ()
101
- end
102
- if # wr > 0 then
103
- self :select_w ()
104
- end
105
- end
106
- end
107
87
local function close_write (self )
108
- fd_clr_write (self . _fd )
88
+ fd_clr_write (self )
109
89
if self .shutdown_r then
110
- fd_clr_read (self ._fd )
111
90
close (self )
112
91
end
113
92
end
@@ -133,26 +112,43 @@ function stream:select_w()
133
112
end
134
113
end
135
114
end
115
+ local function update_stream (s , event )
116
+ if event & SELECT_READ ~= 0 then
117
+ s :select_r ()
118
+ end
119
+ if event & SELECT_WRITE ~= 0 then
120
+ s :select_w ()
121
+ end
122
+ end
136
123
137
124
local function accept_stream (fd )
138
- local self = setmetatable ({
125
+ local s = setmetatable ({
139
126
_fd = fd ,
127
+ _flags = SELECT_READ ,
140
128
_event = {},
141
129
_writebuf = " " ,
142
130
shutdown_r = false ,
143
131
shutdown_w = false ,
144
132
}, stream_mt )
145
- map [fd ] = self
146
- fd_set_read (fd )
147
- return self
148
- end
149
- local function connect_stream (self )
150
- setmetatable (self , stream_mt )
151
- fd_set_read (self ._fd )
152
- if self ._writebuf ~= " " then
153
- self :select_w ()
133
+ selector :event_add (fd , SELECT_READ , function (event )
134
+ update_stream (s , event )
135
+ end )
136
+ return s
137
+ end
138
+ local function connect_stream (s )
139
+ setmetatable (s , stream_mt )
140
+ selector :event_del (s ._fd )
141
+ if s ._writebuf ~= " " then
142
+ s ._flags = SELECT_READ | SELECT_WRITE
143
+ selector :event_add (s ._fd , SELECT_READ | SELECT_WRITE , function (event )
144
+ update_stream (s , event )
145
+ end )
146
+ s :select_w ()
154
147
else
155
- fd_clr_write (self ._fd )
148
+ s ._flags = SELECT_READ
149
+ selector :event_add (s ._fd , SELECT_READ , function (event )
150
+ update_stream (s , event )
151
+ end )
156
152
end
157
153
end
158
154
@@ -170,35 +166,32 @@ function listen:is_closed()
170
166
end
171
167
function listen :close ()
172
168
self .shutdown_r = true
173
- fd_clr_read (self ._fd )
174
169
close (self )
175
170
end
176
- function listen :update (timeout )
177
- local fd = self ._fd
178
- local r = {fd }
179
- local rd = socket .select (r , nil , timeout or 0 )
180
- if rd then
181
- if # rd > 0 then
182
- self :select_r ()
183
- end
184
- end
185
- end
186
- function listen :select_r ()
187
- local newfd = self ._fd :accept ()
188
- if newfd :status () then
189
- local news = accept_stream (newfd )
190
- on_event (self , " accept" , news )
191
- end
192
- end
193
171
local function new_listen (fd )
194
172
local s = {
195
173
_fd = fd ,
174
+ _flags = SELECT_READ ,
196
175
_event = {},
197
176
shutdown_r = false ,
198
177
shutdown_w = true ,
199
178
}
200
- map [fd ] = s
201
- fd_set_read (fd )
179
+ selector :event_add (fd , SELECT_READ , function ()
180
+ local newfd , err = fd :accept ()
181
+ if not newfd then
182
+ on_event (s , " error" , err )
183
+ return
184
+ end
185
+ local ok , err = newfd :status ()
186
+ if not ok then
187
+ on_event (s , " error" , err )
188
+ return
189
+ end
190
+ if newfd :status () then
191
+ local news = accept_stream (newfd )
192
+ on_event (s , " accept" , news )
193
+ end
194
+ end )
202
195
return setmetatable (s , listen_mt )
203
196
end
204
197
@@ -221,39 +214,27 @@ function connect:is_closed()
221
214
end
222
215
function connect :close ()
223
216
self .shutdown_w = true
224
- fd_clr_write (self ._fd )
225
217
close (self )
226
218
end
227
- function connect :update (timeout )
228
- local fd = self ._fd
229
- local w = {fd }
230
- local rd , wr = socket .select (nil , w , timeout or 0 )
231
- if rd then
232
- if # wr > 0 then
233
- self :select_w ()
234
- end
235
- end
236
- end
237
- function connect :select_w ()
238
- local ok , err = self ._fd :status ()
239
- if ok then
240
- connect_stream (self )
241
- on_event (self , " connect" )
242
- else
243
- on_event (self , " error" , err )
244
- self :close ()
245
- end
246
- end
247
219
local function new_connect (fd )
248
220
local s = {
249
221
_fd = fd ,
222
+ _flags = SELECT_WRITE ,
250
223
_event = {},
251
224
_writebuf = " " ,
252
225
shutdown_r = false ,
253
226
shutdown_w = false ,
254
227
}
255
- map [fd ] = s
256
- fd_set_write (fd )
228
+ selector :event_add (fd , SELECT_WRITE , function ()
229
+ local ok , err = fd :status ()
230
+ if ok then
231
+ connect_stream (s )
232
+ on_event (s , " connect" )
233
+ else
234
+ on_event (s , " error" , err )
235
+ s :close ()
236
+ end
237
+ end )
257
238
return setmetatable (s , connect_mt )
258
239
end
259
240
@@ -293,18 +274,8 @@ function m.connect(protocol, ...)
293
274
end
294
275
295
276
function m .update (timeout )
296
- local rd , wr = socket .select (readfds , writefds , timeout or 0 )
297
- if rd then
298
- for i = 1 , # rd do
299
- local fd = rd [i ]
300
- local s = map [fd ]
301
- s :select_r ()
302
- end
303
- for i = 1 , # wr do
304
- local fd = wr [i ]
305
- local s = map [fd ]
306
- s :select_w ()
307
- end
277
+ for func , event in selector :wait (timeout or 0 ) do
278
+ func (event )
308
279
end
309
280
end
310
281
0 commit comments