@@ -1325,23 +1325,26 @@ subscribeQueues c qs = do
1325
1325
subscribeQueues_ :: Env -> TVar (Maybe SessionId ) -> SMPClient -> NonEmpty RcvQueue -> IO (BatchResponses SMPClientError () )
1326
1326
subscribeQueues_ env session smp qs' = do
1327
1327
rs <- sendBatch subscribeSMPQueues smp qs'
1328
- (active, up ) <-
1328
+ (active, subResults ) <-
1329
1329
atomically $
1330
1330
ifM
1331
1331
(activeClientSession c tSess sessId)
1332
1332
(writeTVar session (Just sessId) >> ((True ,) <$> processSubResults rs))
1333
1333
(pure (False , [] ))
1334
1334
if active
1335
- then rs <$ if hasTempErrors rs then resubscribe else unless (null up) (notifySub c " " $ UP srv up)
1335
+ then do
1336
+ when (any isNothing subResults) resubscribe
1337
+ let up = catMaybes $ L. toList subResults
1338
+ unless (null up) $ notifySub c " " $ UP srv up
1339
+ pure rs
1336
1340
else do
1337
1341
logWarn " subcription batch result for replaced SMP client, resubscribing"
1338
1342
resubscribe $> L. map (second $ \ _ -> Left PCENetworkError ) rs
1339
1343
where
1340
1344
tSess@ (_, srv, _) = transportSession' smp
1341
1345
sessId = sessionId $ thParams smp
1342
- hasTempErrors = any (either temporaryClientError (const False ) . snd )
1343
- processSubResults :: NonEmpty (RcvQueue , Either SMPClientError () ) -> STM [ConnId ]
1344
- processSubResults = fmap catMaybes . mapM (uncurry $ processSubResult c) . L. toList
1346
+ processSubResults :: NonEmpty (RcvQueue , Either SMPClientError () ) -> STM (NonEmpty (Maybe ConnId ))
1347
+ processSubResults = mapM (uncurry $ processSubResult c)
1345
1348
resubscribe = resubscribeSMPSession c tSess `runReaderT` env
1346
1349
1347
1350
activeClientSession :: AgentClient -> SMPTransportSession -> SessionId -> STM Bool
0 commit comments