@@ -170,7 +170,7 @@ import Data.List.NonEmpty (NonEmpty (..), (<|))
170
170
import qualified Data.List.NonEmpty as L
171
171
import Data.Map.Strict (Map )
172
172
import qualified Data.Map.Strict as M
173
- import Data.Maybe (isJust , isNothing , listToMaybe )
173
+ import Data.Maybe (catMaybes , isJust , isNothing , listToMaybe )
174
174
import Data.Set (Set )
175
175
import qualified Data.Set as S
176
176
import Data.Text (Text )
@@ -678,15 +678,12 @@ smpClientDisconnected c@AgentClient {active, smpClients, smpProxiedRelays} tSess
678
678
serverDown :: ([RcvQueue ], [ConnId ]) -> IO ()
679
679
serverDown (qs, conns) = whenM (readTVarIO active) $ do
680
680
incClientStat' c userId client " DISCONNECT" " "
681
- notifySub " " $ hostEvent' DISCONNECT client
682
- unless (null conns) $ notifySub " " $ DOWN srv conns
681
+ notifySub c " " $ hostEvent' DISCONNECT client
682
+ unless (null conns) $ notifySub c " " $ DOWN srv conns
683
683
unless (null qs) $ do
684
684
atomically $ mapM_ (releaseGetLock c) qs
685
685
runReaderT (resubscribeSMPSession c tSess) env
686
686
687
- notifySub :: forall e . AEntityI e => ConnId -> ACommand 'Agent e -> IO ()
688
- notifySub connId cmd = atomically $ writeTBQueue (subQ c) (" " , connId, APC (sAEntity @ e ) cmd)
689
-
690
687
resubscribeSMPSession :: AgentClient -> SMPTransportSession -> AM' ()
691
688
resubscribeSMPSession c@ AgentClient {smpSubWorkers, workerSeq} tSess =
692
689
atomically getWorkerVar >>= mapM_ (either newSubWorker (\ _ -> pure () ))
@@ -721,21 +718,22 @@ reconnectSMPClient c tSess@(_, srv, _) qs = handleNotify $ do
721
718
(rs, sessId_) <- subscribeQueues c $ L. toList qs
722
719
let (errs, okConns) = partitionEithers $ map (\ (RcvQueue {connId}, r) -> bimap (connId,) (const connId) r) rs
723
720
conns = filter (`M.notMember` cs) okConns
724
- unless (null conns) $ notifySub " " $ UP srv conns
721
+ unless (null conns) $ notifySub c " " $ UP srv conns
725
722
let (tempErrs, finalErrs) = partition (temporaryAgentError . snd ) errs
726
- mapM_ (\ (connId, e) -> notifySub connId $ ERR e) finalErrs
723
+ mapM_ (\ (connId, e) -> notifySub c connId $ ERR e) finalErrs
727
724
forM_ (listToMaybe tempErrs) $ \ (connId, e) -> do
728
725
when (null okConns && M. null cs && null finalErrs) . liftIO $
729
726
forM_ sessId_ $ \ sessId -> do
730
727
-- We only close the client session that was used to subscribe.
731
728
v_ <- atomically $ ifM (activeClientSession c tSess sessId) (TM. lookupDelete tSess $ smpClients c) (pure Nothing )
732
729
mapM_ (closeClient_ c) v_
733
- notifySub connId $ ERR e
730
+ notifySub c connId $ ERR e
734
731
where
735
732
handleNotify :: AM' () -> AM' ()
736
- handleNotify = E. handleAny $ notifySub " " . ERR . INTERNAL . show
737
- notifySub :: forall e . AEntityI e => ConnId -> ACommand 'Agent e -> AM' ()
738
- notifySub connId cmd = atomically $ writeTBQueue (subQ c) (" " , connId, APC (sAEntity @ e ) cmd)
733
+ handleNotify = E. handleAny $ notifySub c " " . ERR . INTERNAL . show
734
+
735
+ notifySub :: forall e m . (AEntityI e , MonadIO m ) => AgentClient -> ConnId -> ACommand 'Agent e -> m ()
736
+ notifySub c connId cmd = atomically $ writeTBQueue (subQ c) (" " , connId, APC (sAEntity @ e ) cmd)
739
737
740
738
getNtfServerClient :: AgentClient -> NtfTransportSession -> AM NtfClient
741
739
getNtfServerClient c@ AgentClient {active, ntfClients, workerSeq} tSess@ (userId, srv, _) = do
@@ -1274,14 +1272,10 @@ newRcvQueue c userId connId (ProtoServerWithAuth srv auth) vRange subMode = do
1274
1272
qUri = SMPQueueUri vRange $ SMPQueueAddress srv sndId e2eDhKey
1275
1273
pure (rq, qUri, tSess, sessId)
1276
1274
1277
- processSubResult :: AgentClient -> RcvQueue -> Either SMPClientError () -> STM ()
1275
+ processSubResult :: AgentClient -> RcvQueue -> Either SMPClientError () -> STM (Maybe ConnId )
1278
1276
processSubResult c rq@ RcvQueue {connId} = \ case
1279
- Left e ->
1280
- unless (temporaryClientError e) $
1281
- failSubscription c rq e
1282
- Right () ->
1283
- whenM (hasPendingSubscription c connId) $
1284
- addSubscription c rq
1277
+ Left e -> Nothing <$ unless (temporaryClientError e) (failSubscription c rq e)
1278
+ Right () -> ifM (hasPendingSubscription c connId) (Just connId <$ addSubscription c rq) (pure Nothing )
1285
1279
1286
1280
temporaryAgentError :: AgentErrorType -> Bool
1287
1281
temporaryAgentError = \ case
@@ -1331,23 +1325,23 @@ subscribeQueues c qs = do
1331
1325
subscribeQueues_ :: Env -> TVar (Maybe SessionId ) -> SMPClient -> NonEmpty RcvQueue -> IO (BatchResponses SMPClientError () )
1332
1326
subscribeQueues_ env session smp qs' = do
1333
1327
rs <- sendBatch subscribeSMPQueues smp qs'
1334
- active <-
1328
+ ( active, up) <-
1335
1329
atomically $
1336
1330
ifM
1337
1331
(activeClientSession c tSess sessId)
1338
- (writeTVar session (Just sessId) >> processSubResults rs $> True )
1339
- (pure False )
1332
+ (writeTVar session (Just sessId) >> (( True ,) < $> processSubResults rs) )
1333
+ (pure ( False , [] ) )
1340
1334
if active
1341
- then when ( hasTempErrors rs) resubscribe $> rs
1335
+ then rs <$ if hasTempErrors rs then resubscribe else unless ( null up) (notifySub c " " $ UP srv up)
1342
1336
else do
1343
1337
logWarn " subcription batch result for replaced SMP client, resubscribing"
1344
1338
resubscribe $> L. map (second $ \ _ -> Left PCENetworkError ) rs
1345
1339
where
1346
- tSess = transportSession' smp
1340
+ tSess@ (_, srv, _) = transportSession' smp
1347
1341
sessId = sessionId $ thParams smp
1348
1342
hasTempErrors = any (either temporaryClientError (const False ) . snd )
1349
- processSubResults :: NonEmpty (RcvQueue , Either SMPClientError () ) -> STM ()
1350
- processSubResults = mapM_ $ uncurry $ processSubResult c
1343
+ processSubResults :: NonEmpty (RcvQueue , Either SMPClientError () ) -> STM [ ConnId ]
1344
+ processSubResults = fmap catMaybes . mapM ( uncurry $ processSubResult c) . L. toList
1351
1345
resubscribe = resubscribeSMPSession c tSess `runReaderT` env
1352
1346
1353
1347
activeClientSession :: AgentClient -> SMPTransportSession -> SessionId -> STM Bool
0 commit comments