Skip to content

Commit fe1a884

Browse files
authored
Merge pull request #918 from nats-io/fix_event_loop_attach_detach
[FIXED] EventLoop: Handling of possible failure on initial attach
2 parents de0ec66 + 655c8ea commit fe1a884

File tree

4 files changed

+72
-32
lines changed

4 files changed

+72
-32
lines changed

src/adapters/libevent.h

Lines changed: 31 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,32 @@ keepAliveCb(evutil_socket_t fd, short flags, void * arg)
8383
// do nothing...
8484
}
8585

86+
static void
87+
natsLibeventEvents_free(natsLibeventEvents *nle, bool processDetachedEvents)
88+
{
89+
if (nle->read != NULL)
90+
event_free(nle->read);
91+
if (nle->write != NULL)
92+
event_free(nle->write);
93+
if (nle->keepActive != NULL)
94+
{
95+
event_active(nle->keepActive, 0, 0);
96+
event_free(nle->keepActive);
97+
}
98+
if (processDetachedEvents)
99+
natsConnection_ProcessDetachedEvent(nle->nc);
100+
free(nle);
101+
}
102+
103+
// This callback is invoked from the event loop thread and will free the
104+
// `natsLibeventEvents` object.
105+
static void
106+
_freeCb(evutil_socket_t ignoredSocket, short ignoredEvent, void *arg)
107+
{
108+
natsLibeventEvents *nle = (natsLibeventEvents*) arg;
109+
natsLibeventEvents_free(nle, true);
110+
}
111+
86112
/** \brief Attach a connection to the given event loop.
87113
*
88114
* This callback is invoked after `NATS` library has connected, or reconnected.
@@ -101,6 +127,7 @@ natsLibevent_Attach(void **userData, void *loop, natsConnection *nc, natsSock so
101127
struct event_base *libeventLoop = (struct event_base*) loop;
102128
natsLibeventEvents *nle = (natsLibeventEvents*) (*userData);
103129
natsStatus s = NATS_OK;
130+
bool created = false;
104131

105132
// This is the first attach (when reconnecting, nle will be non-NULL).
106133
if (nle == NULL)
@@ -109,6 +136,8 @@ natsLibevent_Attach(void **userData, void *loop, natsConnection *nc, natsSock so
109136
if (nle == NULL)
110137
return NATS_NO_MEMORY;
111138

139+
// Indicate that we have created the object here (in case we get a failure).
140+
created = true;
112141
nle->nc = nc;
113142
nle->loop = libeventLoop;
114143

@@ -153,8 +182,8 @@ natsLibevent_Attach(void **userData, void *loop, natsConnection *nc, natsSock so
153182

154183
if (s == NATS_OK)
155184
*userData = (void*) nle;
156-
else
157-
natsLibevent_Detach((void*) nle);
185+
else if (created)
186+
natsLibeventEvents_free(nle, false);
158187

159188
return s;
160189
}
@@ -224,27 +253,6 @@ natsLibevent_Write(void *userData, bool add)
224253
return (res == 0 ? NATS_OK : NATS_ERR);
225254
}
226255

227-
// This callback is invoked from the event loop thread and will free the
228-
// `natsLibeventEvents` object.
229-
static void
230-
_freeCb(evutil_socket_t ignoredSocket, short ignoredEvent, void *arg)
231-
{
232-
natsLibeventEvents *nle = (natsLibeventEvents*) arg;
233-
234-
if (nle->read != NULL)
235-
event_free(nle->read);
236-
if (nle->write != NULL)
237-
event_free(nle->write);
238-
if (nle->keepActive != NULL)
239-
{
240-
event_active(nle->keepActive, 0, 0);
241-
event_free(nle->keepActive);
242-
}
243-
// This will release the connection that is retained by the library on the first attach.
244-
natsConnection_Destroy(nle->nc);
245-
free(nle);
246-
}
247-
248256
/** \brief The connection is closed, it can be safely detached.
249257
*
250258
* When a connection is closed (not disconnected, pending a reconnect), this

src/adapters/libuv.h

Lines changed: 22 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -237,24 +237,33 @@ uvAsyncAttach(natsLibuvEvents *nle)
237237
}
238238

239239
static void
240-
uvFinalCloseCb(uv_handle_t* handle)
240+
natsLibuvEvents_free(natsLibuvEvents *nle, bool processDetachedEvent)
241241
{
242-
natsLibuvEvents *nle = (natsLibuvEvents*) handle->data;
243-
natsLibuvEvent *event;
242+
natsLibuvEvent *event;
244243

245244
while ((event = nle->head) != NULL)
246245
{
247246
nle->head = event->next;
248247
free(event);
249248
}
250249
free(nle->scheduler);
251-
uv_mutex_destroy(nle->lock);
252-
free(nle->lock);
253-
// This will release the connection that is retained by the library on the first attach.
254-
natsConnection_Destroy(nle->nc);
250+
if (nle->lock != NULL)
251+
{
252+
uv_mutex_destroy(nle->lock);
253+
free(nle->lock);
254+
}
255+
if (processDetachedEvent)
256+
natsConnection_ProcessDetachedEvent(nle->nc);
255257
free(nle);
256258
}
257259

260+
static void
261+
uvFinalCloseCb(uv_handle_t* handle)
262+
{
263+
natsLibuvEvents *nle = (natsLibuvEvents*) handle->data;
264+
natsLibuvEvents_free(nle, true);
265+
}
266+
258267
static void
259268
uvAsyncDetach(natsLibuvEvents *nle)
260269
{
@@ -348,6 +357,7 @@ natsLibuv_Attach(void **userData, void *loop, natsConnection *nc, natsSock socke
348357
bool sched = false;
349358
natsLibuvEvents *nle = (natsLibuvEvents*) (*userData);
350359
natsStatus s = NATS_OK;
360+
bool created = false;
351361

352362
sched = ((uv_key_get(&uvLoopThreadKey) != loop) ? true : false);
353363

@@ -362,6 +372,9 @@ natsLibuv_Attach(void **userData, void *loop, natsConnection *nc, natsSock socke
362372
if (nle == NULL)
363373
return NATS_NO_MEMORY;
364374

375+
// Indicate that we have created the object here (in case we get a failure).
376+
created = true;
377+
365378
nle->lock = (uv_mutex_t*) malloc(sizeof(uv_mutex_t));
366379
if (nle->lock == NULL)
367380
s = NATS_NO_MEMORY;
@@ -402,8 +415,8 @@ natsLibuv_Attach(void **userData, void *loop, natsConnection *nc, natsSock socke
402415

403416
if (s == NATS_OK)
404417
*userData = (void*) nle;
405-
else
406-
natsLibuv_Detach((void*) nle);
418+
else if (created)
419+
natsLibuvEvents_free(nle, false);
407420

408421
return s;
409422
}

src/conn.c

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4367,6 +4367,12 @@ natsConnection_ProcessCloseEvent(natsSock *socket)
43674367
*socket = NATS_SOCK_INVALID;
43684368
}
43694369

4370+
void
4371+
natsConnection_ProcessDetachedEvent(natsConnection *nc)
4372+
{
4373+
natsConn_release(nc);
4374+
}
4375+
43704376
natsStatus
43714377
natsConnection_GetClientID(natsConnection *nc, uint64_t *cid)
43724378
{

src/nats.h

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5085,6 +5085,19 @@ natsConnection_ProcessCloseEvent(natsSock *socket);
50855085
NATS_EXTERN void
50865086
natsConnection_ProcessWriteEvent(natsConnection *nc);
50875087

5088+
/** \brief Process a detach event when using external event loop.
5089+
*
5090+
* When a connection is closed, the library will invoke the adapter's
5091+
* #natsEvLoop_Detach callback. But the code in the adapter may run
5092+
* asynchronously. The adapter will invoke this function when the
5093+
* adapter has fully detached the NATS connection from the event loop,
5094+
* so that resources held by the library for this connection can be released.
5095+
*
5096+
* @param nc the pointer to the #natsConnection object.
5097+
*/
5098+
NATS_EXTERN void
5099+
natsConnection_ProcessDetachedEvent(natsConnection *nc);
5100+
50885101
/** \brief Connects to a `NATS Server` using any of the URL from the given list.
50895102
*
50905103
* Attempts to connect to a `NATS Server`.

0 commit comments

Comments
 (0)