@@ -1498,15 +1498,15 @@ client
1498
1498
| clntIds -> pure $ ERR AUTH -- no retry on collision if sender ID is client-supplied
1499
1499
| otherwise -> tryCreate (n - 1 )
1500
1500
Left e -> pure $ ERR e
1501
- Right q -> do
1501
+ Right _q -> do
1502
1502
stats <- asks serverStats
1503
1503
incStat $ qCreated stats
1504
1504
incStat $ qCount stats
1505
1505
-- TODO [notifications]
1506
1506
-- when (isJust ntf) $ incStat $ ntfCreated stats
1507
1507
case subMode of
1508
1508
SMOnlyCreate -> pure ()
1509
- SMSubscribe -> void $ subscribeQueue q qr -- no need to check if message is available, it's a new queue
1509
+ SMSubscribe -> subscribeNewQueue rcvId qr -- no need to check if message is available, it's a new queue
1510
1510
pure $ IDS QIK {rcvId, sndId, rcvPublicDhKey, queueMode, linkId = fst <$> queueData, serviceId = rcvServiceId} -- , serverNtfCreds = snd <$> ntf
1511
1511
(corrId,entId,) <$> tryCreate (3 :: Int )
1512
1512
@@ -1569,32 +1569,35 @@ client
1569
1569
1570
1570
-- TODO [certs rcv] if serviceId is passed, associate with the service and respond with SOK
1571
1571
subscribeQueueAndDeliver :: StoreQueue s -> QueueRec -> M s ResponseAndMessage
1572
- subscribeQueueAndDeliver q qr =
1573
- liftIO (TM. lookupIO rId $ subscriptions clnt) >>= \ case
1574
- Nothing -> subscribeQueue q qr >>= deliver True
1572
+ subscribeQueueAndDeliver q qr@ QueueRec {rcvServiceId} =
1573
+ liftIO (TM. lookupIO entId $ subscriptions clnt) >>= \ case
1574
+ Nothing ->
1575
+ sharedSubscribeQueue SRecipientService q rcvServiceId subscribers subscriptions serviceSubsCount (newSubscription NoSub ) rcvServices >>= \ case
1576
+ Left e -> pure (err e, Nothing )
1577
+ Right s -> deliver s
1575
1578
Just s@ Sub {subThread} -> do
1576
1579
stats <- asks serverStats
1577
1580
case subThread of
1578
1581
ProhibitSub -> do
1579
1582
-- cannot use SUB in the same connection where GET was used
1580
1583
incStat $ qSubProhibited stats
1581
- pure ((corrId, rId, ERR $ CMD PROHIBITED ), Nothing )
1584
+ pure (err ( CMD PROHIBITED ), Nothing )
1582
1585
_ -> do
1583
1586
incStat $ qSubDuplicate stats
1584
- atomically (tryTakeTMVar $ delivered s) >> deliver False (Just s)
1587
+ let clntServiceId = (\ THClientService {serviceId} -> serviceId) <$> service
1588
+ atomically (tryTakeTMVar $ delivered s) >> deliver (True , Just s, clntServiceId)
1585
1589
where
1586
- rId = recipientId q
1587
- deliver :: Bool -> Maybe Sub -> M s ResponseAndMessage
1588
- deliver inc sub_ = do
1590
+ deliver :: (Bool , Maybe Sub , Maybe ServiceId ) -> M s ResponseAndMessage
1591
+ deliver (hasSub, sub_, serviceId) = do
1589
1592
stats <- asks serverStats
1590
- fmap (either (\ e -> ((corrId, rId, ERR e), Nothing )) id ) $ liftIO $ runExceptT $ do
1593
+ fmap (either ((, Nothing ) . err ) id ) $ liftIO $ runExceptT $ do
1591
1594
msg_ <- tryPeekMsg ms q
1592
1595
msg' <- forM msg_ $ \ msg -> do
1593
- sub <- maybe (atomically $ getSub rId ) pure sub_
1596
+ sub <- maybe (atomically $ getSub entId ) pure sub_
1594
1597
void $ atomically $ setDelivered sub msg
1595
- when inc $ incStat $ qSub stats
1596
- pure (rId , encryptMsg qr msg)
1597
- pure ((corrId, rId , SOK Nothing ), msg')
1598
+ unless hasSub $ incStat $ qSub stats
1599
+ pure (entId , encryptMsg qr msg)
1600
+ pure ((corrId, entId , SOK serviceId ), msg')
1598
1601
1599
1602
getSub :: RecipientId -> STM Sub
1600
1603
getSub rId =
@@ -1605,14 +1608,14 @@ client
1605
1608
TM. insert rId sub $ subscriptions clnt
1606
1609
pure sub
1607
1610
1608
- subscribeQueue :: StoreQueue s -> QueueRec -> M s (Maybe Sub ) -- (Either ErrorType (Bool, Maybe Sub, Maybe ServiceId) )
1609
- subscribeQueue q QueueRec {rcvServiceId} = do
1610
- -- sharedSubscribeQueue SRecipientService q rcvServiceId subscribers subscriptions serviceSubsCount (newSubscription NoSub) rcvServices
1611
- let rId = recipientId q
1612
- sub <- atomically $ newSubscription NoSub
1613
- atomically $ TM. insert rId sub $ subscriptions clnt
1614
- atomically $ writeTQueue (subQ subscribers) ( CSClient rId rcvServiceId Nothing , clientId)
1615
- pure $ Just sub
1611
+ subscribeNewQueue :: RecipientId -> QueueRec -> M s ()
1612
+ subscribeNewQueue rId QueueRec {rcvServiceId} = do
1613
+ case rcvServiceId of
1614
+ Just _ -> atomically $ modifyTVar' (serviceSubsCount clnt) ( + 1 )
1615
+ Nothing -> do
1616
+ sub <- atomically $ newSubscription NoSub
1617
+ atomically $ TM. insert rId sub $ subscriptions clnt
1618
+ atomically $ writeTQueue (subQ subscribers) ( CSClient rId rcvServiceId rcvServiceId, clientId)
1616
1619
1617
1620
-- clients that use GET are not added to server subscribers
1618
1621
getMessage :: StoreQueue s -> QueueRec -> M s (Transmission BrokerMsg )
@@ -1867,10 +1870,13 @@ client
1867
1870
tryDeliverMessage msg =
1868
1871
-- the subscribed client var is read outside of STM to avoid transaction cost
1869
1872
-- in case no client is subscribed.
1870
- getSubscribedClient rId (queueSubscribers subscribers)
1873
+ getSubscribed
1871
1874
$>>= atomically . deliverToSub
1872
1875
>>= mapM_ forkDeliver
1873
1876
where
1877
+ getSubscribed = case rcvServiceId qr of
1878
+ Just serviceId -> getSubscribedClient serviceId $ serviceSubscribers subscribers
1879
+ Nothing -> getSubscribedClient rId $ queueSubscribers subscribers
1874
1880
rId = recipientId q
1875
1881
deliverToSub rcv =
1876
1882
-- reading client TVar in the same transaction,
@@ -1879,6 +1885,7 @@ client
1879
1885
-- the new client will receive message in response to SUB.
1880
1886
readTVar rcv
1881
1887
$>>= \ rc@ Client {subscriptions = subs, sndQ = sndQ'} -> TM. lookup rId subs
1888
+ >>= maybe (newServiceDeliverySub subs) (pure . Just )
1882
1889
$>>= \ s@ Sub {subThread, delivered} -> case subThread of
1883
1890
ProhibitSub -> pure Nothing
1884
1891
ServerSub st -> readTVar st >>= \ case
@@ -1891,6 +1898,12 @@ client
1891
1898
(writeTVar st SubPending $> Just (rc, s, st))
1892
1899
(deliver sndQ' s $> Nothing )
1893
1900
_ -> pure Nothing
1901
+ newServiceDeliverySub subs
1902
+ | isJust (rcvServiceId qr) = do
1903
+ sub <- newSubscription NoSub
1904
+ TM. insert rId sub subs
1905
+ pure $ Just sub
1906
+ | otherwise = pure Nothing
1894
1907
deliver sndQ' s = do
1895
1908
let encMsg = encryptMsg qr msg
1896
1909
writeTBQueue sndQ' ([(NoCorrId , rId, MSG encMsg)], [] )
0 commit comments