@@ -165,6 +165,7 @@ function startPublisher() {
165165 if ( closeOnErr ( err ) ) return ;
166166 ch . on ( "error" , function ( err ) {
167167 logger . error ( "[AMQP] channel error" , err ) ;
168+ process . exit ( 0 ) ;
168169 } ) ;
169170 ch . on ( "close" , function ( ) {
170171 logger . debug ( "[AMQP] channel closed" ) ;
@@ -216,6 +217,7 @@ function startWorker() {
216217 if ( closeOnErr ( err ) ) return ;
217218 ch . on ( "error" , function ( err ) {
218219 logger . error ( "[AMQP] channel error" , err ) ;
220+ process . exit ( 0 ) ;
219221 } ) ;
220222 ch . on ( "close" , function ( ) {
221223 logger . debug ( "[AMQP] channel closed" ) ;
@@ -407,11 +409,21 @@ function process_outgoing(topic, message_string, callback) {
407409 } ) ;
408410 }
409411 else {
410- logger . debug ( "Group message." ) ;
411412 const group_id = recipient_id
413+ if ( outgoing_message . group ) {
414+ logger . debug ( "Inline Group message." , outgoing_message ) ;
415+ let inline_group = outgoing_message . group ;
416+ inline_group . uid = group_id ;
417+ inline_group . members [ group_id ] = 1
418+ inline_group . members [ sender_id ] = 1
419+ console . log ( "...inline_group:" , inline_group ) ;
420+ sendMessageToGroupMembers ( outgoing_message , inline_group , app_id , ( ack ) => {
421+ callback ( ack ) ;
422+ } ) ;
423+ return ;
424+ }
412425 // chatdb.getGroup(group_id, function(err, group) { // REDIS?
413426 getGroup ( group_id , function ( err , group ) {
414- // logger.debug("group found!", group)
415427 if ( ! group ) { // created only to temporary store group-messages in group-timeline
416428 // TODO: 1. create group (on-the-fly), 2. remove this code, 3. continue as if the group exists.
417429 logger . debug ( "group doesn't exist! Sending anyway to group timeline..." ) ;
@@ -421,65 +433,67 @@ function process_outgoing(topic, message_string, callback) {
421433 members : {
422434 }
423435 }
424- group . members [ me ] = 1
436+ group . members [ sender_id ] = 1
425437 }
426- // if (!group.members[me]) {
427- // logger.debug(me + " can't write to this group")
428- // callback(true)
429- // return
430- // }
431438
432439 // Adding the group as a "volatile" member, so we easily get a copy of
433440 // all the group messages in timelineOf: group.uid
434441 group . members [ group . uid ] = 1
435- // logger.debug("Writing to group:", group)
436- let count = 0 ;
437- logger . debug ( "group members" , group . members ) ;
438- let max = Object . keys ( group . members ) . length ;
439- let error_encoutered = false ;
440- for ( let [ member_id , value ] of Object . entries ( group . members ) ) {
441- const inbox_of = member_id ;
442- const convers_with = recipient_id ;
443- logger . debug ( "inbox_of: " + inbox_of ) ;
444- logger . debug ( "convers_with: " + convers_with ) ;
445- logger . debug ( "sending group outgoing message to member" , member_id ) ;
446- // if (inbox_of === outgoing_message.sender) {
447- if ( inbox_of === group . uid ) { // choosing one member, the group ("volatile" member), for the "status=SENT", used by the "message-sent" webhook
448- logger . debug ( "inbox_of === outgoing_message.sender. status=SENT system YES?" , inbox_of ) ;
449- outgoing_message . status = MessageConstants . CHAT_MESSAGE_STATUS_CODE . SENT ;
450- }
451- else if ( outgoing_message . attributes && outgoing_message . attributes . hiddenFor && outgoing_message . attributes . hiddenFor === inbox_of ) {
452- logger . debug ( 'sendGroupMessageToMembersTimeline skip message for ' + outgoing_message . attributes . hiddenFor ) ;
453- break ;
442+ sendMessageToGroupMembers ( outgoing_message , group , app_id , ( ack ) => {
443+ callback ( ack ) ;
444+ } ) ;
445+ } )
446+ }
447+ }
448+
449+ function sendMessageToGroupMembers ( outgoing_message , group , app_id , callback ) {
450+ // logger.debug("Writing to group:", group)
451+ let count = 0 ;
452+ logger . debug ( "group members" , group . members ) ;
453+ let max = Object . keys ( group . members ) . length ;
454+ let error_encoutered = false ;
455+ for ( let [ member_id , value ] of Object . entries ( group . members ) ) {
456+ const inbox_of = member_id ;
457+ const convers_with = group . uid ;
458+ logger . debug ( "inbox_of: " + inbox_of ) ;
459+ logger . debug ( "convers_with: " + convers_with ) ;
460+ logger . debug ( "sending group outgoing message to member" , member_id ) ;
461+ // if (inbox_of === outgoing_message.sender) {
462+ if ( inbox_of === group . uid ) { // choosing one member, the group ("volatile" member), for the "status=SENT", used by the "message-sent" webhook
463+ logger . debug ( "inbox_of === outgoing_message.sender. status=SENT system YES?" , inbox_of ) ;
464+ outgoing_message . status = MessageConstants . CHAT_MESSAGE_STATUS_CODE . SENT ;
465+ }
466+ else if ( outgoing_message . attributes && outgoing_message . attributes . hiddenFor && outgoing_message . attributes . hiddenFor === inbox_of ) {
467+ logger . debug ( 'sendGroupMessageToMembersTimeline skip message for ' + outgoing_message . attributes . hiddenFor ) ;
468+ break ;
469+ }
470+ else {
471+ logger . debug ( "inbox_of != outgoing_message.sender. status=DELIVERED no system, is:" , inbox_of ) ;
472+ outgoing_message . status = MessageConstants . CHAT_MESSAGE_STATUS_CODE . DELIVERED ;
473+ }
474+ console . log ( "delivering group message with status..." , outgoing_message . status , " to:" , inbox_of ) ;
475+ deliverMessage ( outgoing_message , app_id , inbox_of , convers_with , function ( ok ) {
476+ logger . debug ( "GROUP MESSAGE DELIVERED?" , ok )
477+ count ++ ;
478+ logger . debug ( "Sent Counting:" , count ) ;
479+ logger . debug ( "Max:" , max ) ;
480+ if ( ! ok ) {
481+ logger . debug ( "Error sending message to group " + group . uid ) ;
482+ error_encoutered = true
483+ }
484+ if ( count == max ) {
485+ if ( error_encoutered ) {
486+ logger . error ( "ERROR SENDING MESSAGE TO GROUP!" ) ;
487+ //callback(false)
454488 }
455489 else {
456- logger . debug ( "inbox_of != outgoing_message.sender. status=DELIVERED no system, is:" , inbox_of ) ;
457- outgoing_message . status = MessageConstants . CHAT_MESSAGE_STATUS_CODE . DELIVERED ;
490+ logger . log ( "ALL OK! MESSAGE SENT TO GROUP! ACK!" ) ;
491+ //callback(true) ;
458492 }
459- logger . debug ( "delivering group message with status..." , outgoing_message . status , " to:" , inbox_of ) ;
460- deliverMessage ( outgoing_message , app_id , inbox_of , convers_with , function ( ok ) {
461- logger . debug ( "GROUP MESSAGE DELIVERED?" , ok )
462- count ++ ;
463- logger . debug ( "Sent Counting:" , count ) ;
464- logger . debug ( "Max:" , max ) ;
465- if ( ! ok ) {
466- logger . debug ( "Error sending message to group " + group . uid ) ;
467- error_encoutered = true
468- }
469- if ( count == max ) {
470- if ( error_encoutered ) {
471- logger . error ( "ERROR SENDING MESSAGE TO GROUP!" ) ;
472- callback ( false )
473- }
474- else {
475- logger . log ( "ALL OK! MESSAGE SENT TO GROUP! ACK!" ) ;
476- callback ( true ) ;
477- }
478- }
479- } )
480- } // end for
481- } ) // end getGroup
482- }
493+ }
494+ } )
495+ } // end for
496+ callback ( true ) ;
483497}
484498
485499let groups = { } ;
@@ -755,6 +769,7 @@ function process_update(topic, message_string, callback) {
755769 // }
756770
757771 // TODO: MOVE TO A PERSIST_UPDATED TOPIC/QUEUE...
772+ // TODO, BETTER: USE _WEBHOOK WITH MESSAGE-STATUS-UPDATED TO SAVE THE MESSAGE
758773 logger . debug ( ">>> ON DISK... WITH A STATUS ON MY MESSAGE-UPDATE TOPIC" , topic , "WITH PATCH" , my_message_patch )
759774 chatdb . saveOrUpdateMessage ( my_message_patch , function ( err , msg ) {
760775 // logger.debug(">>> MESSAGE ON TOPIC", topic, "UPDATED!")
@@ -795,6 +810,7 @@ function process_update(topic, message_string, callback) {
795810 patch . conversWith = convers_with
796811 logger . debug ( ">>> ON DISK... CONVERSATION TOPIC " + topic + " WITH PATCH " + patch )
797812 logger . debug ( "Updating conversation 2." )
813+ // BETTER: ACK, THEN WEBHOOK CONVERSATION-SAVE
798814 chatdb . saveOrUpdateConversation ( patch , function ( err , doc ) {
799815 logger . debug ( ">>> CONVERSATION ON TOPIC:" , topic , "UPDATED?" )
800816 if ( err ) {
@@ -845,6 +861,7 @@ function process_archive(topic, payload, callback) {
845861 }
846862 logger . debug ( "NOTIFY VIA WEBHOOK ON SAVE TOPIC " + topic )
847863 if ( webhook_enabled ) {
864+ // BETTER: WEBHOOK CONVERSATION-SAVE WITH archived=true
848865 webhooks . WHnotifyConversationArchived ( conversation_archive_patch , topic , ( err ) => {
849866 if ( err ) {
850867 logger . error ( "Webhook notified with err:" + err )
0 commit comments