1- // rewrite by Jaylin EMQ X for MQTT usage
2-
1+ //
2+ // Copyright 2021 NanoMQ Team, Inc. <[email protected] > 3+ //
4+ // This software is supplied under the terms of the MIT License, a
5+ // copy of which should be located in the distribution where this
6+ // file was obtained (LICENSE.txt). A copy of the license may also be
7+ // found online at https://opensource.org/licenses/MIT.
8+ //
39#include <conf.h>
410#include <mqtt_db.h>
511#include <poll.h>
@@ -39,10 +45,9 @@ struct tcptran_pipe {
3945 nni_aio * txaio ;
4046 nni_aio * rxaio ;
4147 nni_aio * qsaio ;
42- nni_aio * rsaio ;
4348 nni_aio * rpaio ;
4449 nni_aio * negoaio ;
45- nni_lmq rslmq ;
50+ nni_lmq rslmq ;
4651 nni_msg * rxmsg , * cnmsg ;
4752 nni_mtx mtx ;
4853 conn_param * tcp_cparam ;
@@ -123,7 +128,6 @@ tcptran_pipe_close(void *arg)
123128 nni_aio_close (p -> rxaio );
124129 nni_aio_close (p -> rpaio );
125130 nni_aio_close (p -> txaio );
126- nni_aio_close (p -> rsaio );
127131 nni_aio_close (p -> qsaio );
128132 nni_aio_close (p -> negoaio );
129133
@@ -137,7 +141,6 @@ tcptran_pipe_stop(void *arg)
137141 tcptran_pipe * p = arg ;
138142
139143 nni_aio_stop (p -> qsaio );
140- nni_aio_stop (p -> rsaio );
141144 nni_aio_stop (p -> rpaio );
142145 nni_aio_stop (p -> rxaio );
143146 nni_aio_stop (p -> txaio );
@@ -179,7 +182,6 @@ tcptran_pipe_fini(void *arg)
179182 nng_free (p -> qos_buf , 16 + NNI_NANO_MAX_PACKET_SIZE );
180183 nni_aio_free (p -> qsaio );
181184 nni_aio_free (p -> rpaio );
182- nni_aio_free (p -> rsaio );
183185 nni_aio_free (p -> rxaio );
184186 nni_aio_free (p -> txaio );
185187 nni_aio_free (p -> negoaio );
@@ -214,7 +216,6 @@ tcptran_pipe_alloc(tcptran_pipe **pipep)
214216 if (((rv = nni_aio_alloc (& p -> txaio , nmq_tcptran_pipe_send_cb , p )) != 0 ) ||
215217 ((rv = nni_aio_alloc (& p -> qsaio , nmq_tcptran_pipe_qos_send_cb , p )) != 0 ) ||
216218 ((rv = nni_aio_alloc (& p -> rpaio , NULL , p )) != 0 ) ||
217- ((rv = nni_aio_alloc (& p -> rsaio , NULL , p )) != 0 ) ||
218219 ((rv = nni_aio_alloc (& p -> rxaio , tcptran_pipe_recv_cb , p )) != 0 ) ||
219220 ((rv = nni_aio_alloc (& p -> negoaio , tcptran_pipe_nego_cb , p )) !=
220221 0 )) {
@@ -537,14 +538,15 @@ tcptran_pipe_recv_cb(void *arg)
537538 return ;
538539 } else if (len == 0 && n == 2 ) {
539540 if ((p -> rxlen [0 ] & 0XFF ) == CMD_PINGREQ ) {
540- nng_aio_wait (p -> qsaio );
541+ // TODO set timeout in case it never finish
542+ nng_aio_wait (p -> rpaio );
541543 p -> txlen [0 ] = CMD_PINGRESP ;
542544 p -> txlen [1 ] = 0x00 ;
543545 iov .iov_len = 2 ;
544546 iov .iov_buf = & p -> txlen ;
545- // send it down...
547+ // send CMD_PINGRESP down...
546548 nni_aio_set_iov (p -> qsaio , 1 , & iov );
547- nng_stream_send (p -> conn , p -> qsaio );
549+ nng_stream_send (p -> conn , p -> rpaio );
548550 goto notify ;
549551 }
550552 }
@@ -623,7 +625,6 @@ tcptran_pipe_recv_cb(void *arg)
623625 goto recv_error ;
624626 }
625627 }
626- // nng_aio_wait(p->rsaio);
627628 if (qos_pac == 1 ) {
628629 p -> txlen [0 ] = CMD_PUBACK ;
629630 } else if (qos_pac == 2 ) {
@@ -633,34 +634,17 @@ tcptran_pipe_recv_cb(void *arg)
633634 pid = nni_msg_get_pub_pid (msg );
634635 NNI_PUT16 (p -> txlen + 2 , pid );
635636 ack = true;
636- // iov.iov_len = 4;
637- // iov.iov_buf = &p->txlen;
638- // // send it down...
639- // nni_aio_set_iov(p->rsaio, 1, &iov);
640- // nng_stream_send(p->conn, p->rsaio);
641637 }
642638 } else if (type == CMD_PUBREC ) {
643- // nng_aio_wait(p->rpaio);
644639 p -> txlen [0 ] = 0X62 ;
645640 p -> txlen [1 ] = 0x02 ;
646641 memcpy (p -> txlen + 2 , nni_msg_body (msg ), 2 );
647642 ack = true;
648- // iov.iov_len = 4;
649- // iov.iov_buf = &p->txlen;
650- // // send it down...
651- // nni_aio_set_iov(p->rpaio, 1, &iov);
652- // nng_stream_send(p->conn, p->rpaio);
653643 } else if (type == CMD_PUBREL ) {
654- // nng_aio_wait(p->qsaio);
655644 p -> txlen [0 ] = CMD_PUBCOMP ;
656645 p -> txlen [1 ] = 0x02 ;
657646 memcpy (p -> txlen + 2 , nni_msg_body (msg ), 2 );
658647 ack = true;
659- // iov.iov_len = 4;
660- // iov.iov_buf = &p->txlen;
661- // // send it down...
662- // nni_aio_set_iov(p->qsaio, 1, &iov);
663- // nng_stream_send(p->conn, p->qsaio);
664648 } else if (type == CMD_PUBACK || type == CMD_PUBCOMP ) {
665649 // MQTT V5 flow control
666650 if (p -> tcp_cparam -> pro_ver == 5 ) {
0 commit comments