@@ -43,13 +43,26 @@ func (wsc *wsMinioClient) objectManager(session *models.Principal) {
43
43
writeChannel := make (chan WSResponse )
44
44
done := make (chan interface {})
45
45
46
+ sendWSResponse := func (r WSResponse ) {
47
+ select {
48
+ case writeChannel <- r :
49
+ case <- done :
50
+ }
51
+ }
52
+
46
53
// Read goroutine
47
54
go func () {
55
+ defer close (writeChannel )
48
56
for {
57
+ select {
58
+ case <- done :
59
+ return
60
+ default :
61
+ }
62
+
49
63
mType , message , err := wsc .conn .readMessage ()
50
64
if err != nil {
51
65
LogInfo ("Error while reading objectManager message" , err )
52
- close (done )
53
66
return
54
67
}
55
68
@@ -60,8 +73,6 @@ func (wsc *wsMinioClient) objectManager(session *models.Principal) {
60
73
err := json .Unmarshal (message , & messageRequest )
61
74
if err != nil {
62
75
LogInfo ("Error on message request unmarshal" )
63
-
64
- close (done )
65
76
return
66
77
}
67
78
@@ -74,7 +85,6 @@ func (wsc *wsMinioClient) objectManager(session *models.Principal) {
74
85
const itemsPerBatch = 1000
75
86
switch messageRequest .Mode {
76
87
case "close" :
77
- close (done )
78
88
return
79
89
case "cancel" :
80
90
// if we have that request id, cancel it
@@ -97,12 +107,12 @@ func (wsc *wsMinioClient) objectManager(session *models.Principal) {
97
107
if err != nil {
98
108
LogInfo (fmt .Sprintf ("Error during Objects OptionsParse %s" , err .Error ()))
99
109
100
- writeChannel <- WSResponse {
110
+ sendWSResponse ( WSResponse {
101
111
RequestID : messageRequest .RequestID ,
102
112
Error : ErrorWithContext (ctx , err ),
103
113
Prefix : messageRequest .Prefix ,
104
114
BucketName : messageRequest .BucketName ,
105
- }
115
+ })
106
116
107
117
return
108
118
}
@@ -112,12 +122,12 @@ func (wsc *wsMinioClient) objectManager(session *models.Principal) {
112
122
return
113
123
}
114
124
if lsObj .Err != nil {
115
- writeChannel <- WSResponse {
125
+ sendWSResponse ( WSResponse {
116
126
RequestID : messageRequest .RequestID ,
117
127
Error : ErrorWithContext (ctx , lsObj .Err ),
118
128
Prefix : messageRequest .Prefix ,
119
129
BucketName : messageRequest .BucketName ,
120
- }
130
+ })
121
131
122
132
continue
123
133
}
@@ -132,24 +142,24 @@ func (wsc *wsMinioClient) objectManager(session *models.Principal) {
132
142
buffer = append (buffer , objItem )
133
143
134
144
if len (buffer ) >= itemsPerBatch {
135
- writeChannel <- WSResponse {
145
+ sendWSResponse ( WSResponse {
136
146
RequestID : messageRequest .RequestID ,
137
147
Data : buffer ,
138
- }
148
+ })
139
149
buffer = nil
140
150
}
141
151
}
142
152
if len (buffer ) > 0 {
143
- writeChannel <- WSResponse {
153
+ sendWSResponse ( WSResponse {
144
154
RequestID : messageRequest .RequestID ,
145
155
Data : buffer ,
146
- }
156
+ })
147
157
}
148
158
149
- writeChannel <- WSResponse {
159
+ sendWSResponse ( WSResponse {
150
160
RequestID : messageRequest .RequestID ,
151
161
RequestEnd : true ,
152
- }
162
+ })
153
163
154
164
// remove the cancellation context
155
165
delete (cancelContexts , messageRequest .RequestID )
@@ -168,12 +178,12 @@ func (wsc *wsMinioClient) objectManager(session *models.Principal) {
168
178
objectRqConfigs , err := getObjectsOptionsFromReq (messageRequest )
169
179
if err != nil {
170
180
LogInfo (fmt .Sprintf ("Error during Objects OptionsParse %s" , err .Error ()))
171
- writeChannel <- WSResponse {
181
+ sendWSResponse ( WSResponse {
172
182
RequestID : messageRequest .RequestID ,
173
183
Error : ErrorWithContext (ctx , err ),
174
184
Prefix : messageRequest .Prefix ,
175
185
BucketName : messageRequest .BucketName ,
176
- }
186
+ })
177
187
178
188
return
179
189
}
@@ -182,12 +192,12 @@ func (wsc *wsMinioClient) objectManager(session *models.Principal) {
182
192
183
193
s3Client , err := newS3BucketClient (session , objectRqConfigs .BucketName , objectRqConfigs .Prefix , clientIP )
184
194
if err != nil {
185
- writeChannel <- WSResponse {
195
+ sendWSResponse ( WSResponse {
186
196
RequestID : messageRequest .RequestID ,
187
197
Error : ErrorWithContext (ctx , err ),
188
198
Prefix : messageRequest .Prefix ,
189
199
BucketName : messageRequest .BucketName ,
190
- }
200
+ })
191
201
192
202
cancel ()
193
203
return
@@ -199,12 +209,12 @@ func (wsc *wsMinioClient) objectManager(session *models.Principal) {
199
209
200
210
for lsObj := range startRewindListing (ctx , mcS3C , objectRqConfigs ) {
201
211
if lsObj .Err != nil {
202
- writeChannel <- WSResponse {
212
+ sendWSResponse ( WSResponse {
203
213
RequestID : messageRequest .RequestID ,
204
214
Error : ErrorWithContext (ctx , lsObj .Err .ToGoError ()),
205
215
Prefix : messageRequest .Prefix ,
206
216
BucketName : messageRequest .BucketName ,
207
- }
217
+ })
208
218
209
219
continue
210
220
}
@@ -222,25 +232,25 @@ func (wsc *wsMinioClient) objectManager(session *models.Principal) {
222
232
buffer = append (buffer , objItem )
223
233
224
234
if len (buffer ) >= itemsPerBatch {
225
- writeChannel <- WSResponse {
235
+ sendWSResponse ( WSResponse {
226
236
RequestID : messageRequest .RequestID ,
227
237
Data : buffer ,
228
- }
238
+ })
229
239
buffer = nil
230
240
}
231
241
232
242
}
233
243
if len (buffer ) > 0 {
234
- writeChannel <- WSResponse {
244
+ sendWSResponse ( WSResponse {
235
245
RequestID : messageRequest .RequestID ,
236
246
Data : buffer ,
237
- }
247
+ })
238
248
}
239
249
240
- writeChannel <- WSResponse {
250
+ sendWSResponse ( WSResponse {
241
251
RequestID : messageRequest .RequestID ,
242
252
RequestEnd : true ,
243
- }
253
+ })
244
254
245
255
// remove the cancellation context
246
256
delete (cancelContexts , messageRequest .RequestID )
@@ -250,27 +260,19 @@ func (wsc *wsMinioClient) objectManager(session *models.Principal) {
250
260
}
251
261
}()
252
262
253
- // Write goroutine
254
- go func () {
255
- for {
256
- select {
257
- case <- done :
258
- return
259
- case writeM := <- writeChannel :
260
- jsonData , err := json .Marshal (writeM )
261
- if err != nil {
262
- LogInfo ("Error while marshaling the response" , err )
263
- return
264
- }
263
+ defer close (done )
265
264
266
- err = wsc .conn .writeMessage (websocket .TextMessage , jsonData )
267
- if err != nil {
268
- LogInfo ("Error while writing the message" , err )
269
- return
270
- }
271
- }
265
+ for writeM := range writeChannel {
266
+ jsonData , err := json .Marshal (writeM )
267
+ if err != nil {
268
+ LogInfo ("Error while marshaling the response" , err )
269
+ return
272
270
}
273
- }()
274
271
275
- <- done
272
+ err = wsc .conn .writeMessage (websocket .TextMessage , jsonData )
273
+ if err != nil {
274
+ LogInfo ("Error while writing the message" , err )
275
+ return
276
+ }
277
+ }
276
278
}
0 commit comments