@@ -2705,6 +2705,21 @@ _dummyTokenHandler(void *closure)
27052705 return "token";
27062706}
27072707
2708+ static natsStatus
2709+ _dummyProxyConnHandler(natsSock *fd, char *host, int port, void *closure)
2710+ {
2711+ struct threadArg *args = (struct threadArg*) closure;
2712+
2713+ if (closure != NULL)
2714+ {
2715+ natsMutex_Lock(args->m);
2716+ args->sum++;
2717+ natsMutex_Unlock(args->m);
2718+ }
2719+
2720+ return NATS_ERR;
2721+ }
2722+
27082723static void
27092724_dummyErrHandler(natsConnection *nc, natsSubscription *sub, natsStatus err,
27102725 void *closure)
@@ -3000,6 +3015,16 @@ void test_natsOptions(void)
30003015 s = natsOptions_SetMaxPendingBytes(opts, 1000000);
30013016 testCond((s == NATS_OK) && (opts->maxPendingBytes == 1000000))
30023017
3018+ test("Set Proxy Connection Handler: ")
3019+ s = natsOptions_SetProxyConnHandler(opts, _dummyProxyConnHandler, (void*) 1);
3020+ testCond((s == NATS_OK) && (opts->proxyConnectCb == _dummyProxyConnHandler)
3021+ && (opts->proxyConnectClosure == (void*) 1));
3022+
3023+ test("Remove Proxy Connection Handler: ")
3024+ s = natsOptions_SetProxyConnHandler(opts, NULL, NULL);
3025+ testCond((s == NATS_OK) && (opts->proxyConnectCb == NULL)
3026+ && (opts->proxyConnectClosure == NULL));
3027+
30033028 test("Set Error Handler: ");
30043029 s = natsOptions_SetErrorHandler(opts, _dummyErrHandler, NULL);
30053030 testCond((s == NATS_OK) && (opts->asyncErrCb == _dummyErrHandler));
@@ -5952,6 +5977,88 @@ void test_ReconnectServerStats(void)
59525977 _destroyDefaultThreadArgs(&args);
59535978}
59545979
5980+
5981+ static natsStatus
5982+ _proxyConnectCb(natsSock *fd, char *host, int port, void *closure)
5983+ {
5984+ struct threadArg *arg = (struct threadArg*) closure;
5985+ natsStatus s = NATS_OK;
5986+ natsSockCtx ctx;
5987+
5988+ natsMutex_Lock(arg->m);
5989+ arg->sum++;
5990+ natsMutex_Unlock(arg->m);
5991+
5992+ s = natsSock_Init(&ctx);
5993+ IFOK(s, natsSock_ConnectTcp(&ctx, host, port))
5994+
5995+ if (s == NATS_OK)
5996+ *fd = ctx.fd;
5997+
5998+ return s;
5999+ }
6000+
6001+ void test_ProxyConnectCb(void)
6002+ {
6003+ natsStatus s;
6004+ natsConnection *nc = NULL;
6005+ natsOptions *opts = NULL;
6006+ natsPid serverPid = NATS_INVALID_PID;
6007+ const char *servers[] = { "nats://127.0.0.1:4222", "nats://127.0.0.1:4223" };
6008+ struct threadArg args;
6009+
6010+ s = _createDefaultThreadArgsForCbTests(&args);
6011+ if (s != NATS_OK)
6012+ FAIL("Unable to setup test")
6013+
6014+ serverPid = _startServer("nats://127.0.0.1:4222", NULL, true);
6015+ CHECK_SERVER_STARTED(serverPid)
6016+
6017+ s = natsOptions_Create(&opts);
6018+ IFOK(s, natsOptions_SetServers(opts, servers, 2));
6019+ IFOK(s, natsOptions_SetNoRandomize(opts, true));
6020+ if (s != NATS_OK)
6021+ FAIL("Unable to create options")
6022+
6023+ test("Set connectCb that returns failure: ");
6024+ s = natsOptions_SetProxyConnHandler(opts, _dummyProxyConnHandler, (void*) &args);
6025+ testCond(s == NATS_OK);
6026+
6027+ test("Connect with connectCb returning failure: ");
6028+ s = natsConnection_Connect(&nc, opts);
6029+ testCond(s == NATS_NO_SERVER);
6030+ nats_clearLastError();
6031+
6032+ test("Check invoked properly: ");
6033+ natsMutex_Lock(args.m);
6034+ s = (args.sum == 2 ? NATS_OK : NATS_ERR);
6035+ args.sum = 0;
6036+ natsMutex_Unlock(args.m);
6037+ testCond(s == NATS_OK);
6038+
6039+ test("Set connectCb that returns success: ");
6040+ s = natsOptions_SetProxyConnHandler(opts, _proxyConnectCb, (void*) &args);
6041+ testCond(s == NATS_OK);
6042+
6043+ test("Connect with connectCb returning success: ");
6044+ s = natsConnection_Connect(&nc, opts);
6045+ testCond(s == NATS_OK);
6046+
6047+ test("Check invoked properly: ");
6048+ natsMutex_Lock(args.m);
6049+ s = (args.sum == 1 ? NATS_OK : NATS_ERR);
6050+ args.sum = 0;
6051+ natsMutex_Unlock(args.m);
6052+ testCond(s == NATS_OK);
6053+
6054+ natsConnection_Destroy(nc);
6055+ natsOptions_Destroy(opts);
6056+
6057+ _stopServer(serverPid);
6058+
6059+ _destroyDefaultThreadArgs(&args);
6060+ }
6061+
59556062static void
59566063_disconnectedCb(natsConnection *nc, void *closure)
59576064{
0 commit comments