Skip to content

Commit da37384

Browse files
authored
smp server: batch commands (#1559)
* protocol: refactor types and encoding * clean * smp server: batch commands (#1560) * smp server: batch commands verification into one DB transaction * ghc 8.10.7 * flatten transmission tuples * diff * only use batch logic if there is more than one transmission * func * reset NTF service when adding notifier * version * Revert "smp server: use separate database pool for reading queues and creating service records (#1561)" This reverts commit 3df2425. * version * Revert "version" This reverts commit d80a6b7.
1 parent 1658048 commit da37384

File tree

24 files changed

+559
-380
lines changed

24 files changed

+559
-380
lines changed

src/Simplex/FileTransfer/Client.hs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -204,7 +204,7 @@ sendXFTPTransmission XFTPClient {config, thParams, http2Client} t chunkSpec_ = d
204204
HTTP2Response {respBody = body@HTTP2Body {bodyHead}} <- withExceptT xftpClientError . ExceptT $ sendRequest http2Client req (Just reqTimeout)
205205
when (B.length bodyHead /= xftpBlockSize) $ throwE $ PCEResponseError BLOCK
206206
-- TODO validate that the file ID is the same as in the request?
207-
(_, _, (_, _fId, respOrErr)) <- liftEither . first PCEResponseError $ xftpDecodeTransmission thParams bodyHead
207+
(_, _fId, respOrErr) <-liftEither $ first PCEResponseError $ xftpDecodeTClient thParams bodyHead
208208
case respOrErr of
209209
Right r -> case protocolError r of
210210
Just e -> throwE $ PCEProtocolError e

src/Simplex/FileTransfer/Protocol.hs

Lines changed: 27 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -44,16 +44,18 @@ import Simplex.Messaging.Protocol
4444
EntityId (..),
4545
RecipientId,
4646
SenderId,
47+
RawTransmission,
4748
SentRawTransmission,
48-
SignedTransmission,
49+
SignedTransmissionOrError,
4950
SndPublicAuthKey,
5051
Transmission,
5152
TransmissionForAuth (..),
5253
CorrId (..),
5354
encodeTransmission,
5455
encodeTransmissionForAuth,
5556
messageTagP,
56-
tDecodeParseValidate,
57+
tDecodeServer,
58+
tDecodeClient,
5759
tEncodeBatch1,
5860
tParse,
5961
)
@@ -197,7 +199,7 @@ instance FilePartyI p => ProtocolEncoding XFTPVersion XFTPErrorType (FileCommand
197199
fromProtocolError = fromProtocolError @XFTPVersion @XFTPErrorType @FileResponse
198200
{-# INLINE fromProtocolError #-}
199201

200-
checkCredentials (auth, _, EntityId fileId, _) cmd = case cmd of
202+
checkCredentials auth (EntityId fileId) cmd = case cmd of
201203
-- FNEW must not have signature and chunk ID
202204
FNEW {}
203205
| isNothing auth -> Left $ CMD NO_AUTH
@@ -231,7 +233,7 @@ instance ProtocolEncoding XFTPVersion XFTPErrorType FileCmd where
231233
fromProtocolError = fromProtocolError @XFTPVersion @XFTPErrorType @FileResponse
232234
{-# INLINE fromProtocolError #-}
233235

234-
checkCredentials t (FileCmd p c) = FileCmd p <$> checkCredentials t c
236+
checkCredentials tAuth entId (FileCmd p c) = FileCmd p <$> checkCredentials tAuth entId c
235237
{-# INLINE checkCredentials #-}
236238

237239
instance Encoding FileInfo where
@@ -310,7 +312,7 @@ instance ProtocolEncoding XFTPVersion XFTPErrorType FileResponse where
310312
PEBlock -> BLOCK
311313
{-# INLINE fromProtocolError #-}
312314

313-
checkCredentials (_, _, EntityId entId, _) cmd = case cmd of
315+
checkCredentials _ (EntityId entId) cmd = case cmd of
314316
FRSndIds {} -> noEntity
315317
-- ERR response does not always have entity ID
316318
FRErr _ -> Right cmd
@@ -335,25 +337,35 @@ checkParty' c = case testEquality (sFileParty @p) (sFileParty @p') of
335337
Just Refl -> Just c
336338
_ -> Nothing
337339

338-
xftpEncodeAuthTransmission :: ProtocolEncoding XFTPVersion e c => THandleParams XFTPVersion 'TClient -> C.APrivateAuthKey -> Transmission c -> Either TransportError ByteString
339-
xftpEncodeAuthTransmission thParams@THandleParams {thAuth} pKey (corrId, fId, msg) = do
340-
let TransmissionForAuth {tForAuth, tToSend} = encodeTransmissionForAuth thParams (corrId, fId, msg)
340+
xftpEncodeAuthTransmission :: ProtocolEncoding XFTPVersion XFTPErrorType c => THandleParams XFTPVersion 'TClient -> C.APrivateAuthKey -> Transmission c -> Either TransportError ByteString
341+
xftpEncodeAuthTransmission thParams@THandleParams {thAuth} pKey t@(corrId, _, _) = do
342+
let TransmissionForAuth {tForAuth, tToSend} = encodeTransmissionForAuth thParams t
341343
xftpEncodeBatch1 . (,tToSend) =<< authTransmission thAuth False (Just pKey) (C.cbNonce $ bs corrId) tForAuth
342344

343-
xftpEncodeTransmission :: ProtocolEncoding XFTPVersion e c => THandleParams XFTPVersion p -> Transmission c -> Either TransportError ByteString
344-
xftpEncodeTransmission thParams (corrId, fId, msg) = do
345-
let t = encodeTransmission thParams (corrId, fId, msg)
346-
xftpEncodeBatch1 (Nothing, t)
345+
xftpEncodeTransmission :: ProtocolEncoding XFTPVersion XFTPErrorType c => THandleParams XFTPVersion p -> Transmission c -> Either TransportError ByteString
346+
xftpEncodeTransmission thParams t = xftpEncodeBatch1 (Nothing, encodeTransmission thParams t)
347347

348348
-- this function uses batch syntax but puts only one transmission in the batch
349349
xftpEncodeBatch1 :: SentRawTransmission -> Either TransportError ByteString
350350
xftpEncodeBatch1 t = first (const TELargeMsg) $ C.pad (tEncodeBatch1 False t) xftpBlockSize
351351

352-
xftpDecodeTransmission :: ProtocolEncoding XFTPVersion e c => THandleParams XFTPVersion p -> ByteString -> Either XFTPErrorType (SignedTransmission e c)
353-
xftpDecodeTransmission thParams t = do
352+
xftpDecodeTServer :: THandleParams XFTPVersion 'TServer -> ByteString -> Either XFTPErrorType (SignedTransmissionOrError XFTPErrorType FileCmd)
353+
xftpDecodeTServer = xftpDecodeTransmission tDecodeServer
354+
{-# INLINE xftpDecodeTServer #-}
355+
356+
xftpDecodeTClient :: THandleParams XFTPVersion 'TClient -> ByteString -> Either XFTPErrorType (Transmission (Either XFTPErrorType FileResponse))
357+
xftpDecodeTClient = xftpDecodeTransmission tDecodeClient
358+
{-# INLINE xftpDecodeTClient #-}
359+
360+
xftpDecodeTransmission ::
361+
(THandleParams XFTPVersion p -> Either TransportError RawTransmission -> r) ->
362+
THandleParams XFTPVersion p ->
363+
ByteString ->
364+
Either XFTPErrorType r
365+
xftpDecodeTransmission tDecode thParams t = do
354366
t' <- first (const BLOCK) $ C.unPad t
355367
case tParse thParams t' of
356-
t'' :| [] -> Right $ tDecodeParseValidate thParams t''
368+
t'' :| [] -> Right $ tDecode thParams t''
357369
_ -> Left BLOCK
358370

359371
$(J.deriveJSON (enumJSON $ dropPrefix "F") ''FileParty)

src/Simplex/FileTransfer/Server.hs

Lines changed: 15 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ import qualified Simplex.Messaging.Crypto as C
5353
import qualified Simplex.Messaging.Crypto.Lazy as LC
5454
import Simplex.Messaging.Encoding
5555
import Simplex.Messaging.Encoding.String
56-
import Simplex.Messaging.Protocol (CorrId (..), BlockingInfo, EntityId (..), RcvPublicAuthKey, RcvPublicDhKey, RecipientId, TAuthorizations, pattern NoEntity)
56+
import Simplex.Messaging.Protocol (BlockingInfo, EntityId (..), RcvPublicAuthKey, RcvPublicDhKey, RecipientId, SignedTransmission, pattern NoEntity)
5757
import Simplex.Messaging.Server (dummyVerifyCmd, verifyCmdAuthorization)
5858
import Simplex.Messaging.Server.Control (CPClientRole (..))
5959
import Simplex.Messaging.Server.Expiration
@@ -317,22 +317,20 @@ data ServerFile = ServerFile
317317
processRequest :: XFTPTransportRequest -> M ()
318318
processRequest XFTPTransportRequest {thParams, reqBody = body@HTTP2Body {bodyHead}, sendResponse}
319319
| B.length bodyHead /= xftpBlockSize = sendXFTPResponse ("", NoEntity, FRErr BLOCK) Nothing
320-
| otherwise = do
321-
case xftpDecodeTransmission thParams bodyHead of
322-
Right (sig_, signed, (corrId, fId, cmdOrErr)) ->
323-
case cmdOrErr of
324-
Right cmd -> do
325-
let THandleParams {thAuth} = thParams
326-
verifyXFTPTransmission ((,C.cbNonce (bs corrId)) <$> thAuth) sig_ signed fId cmd >>= \case
327-
VRVerified req -> uncurry send =<< processXFTPRequest body req
328-
VRFailed e -> send (FRErr e) Nothing
329-
Left e -> send (FRErr e) Nothing
320+
| otherwise =
321+
case xftpDecodeTServer thParams bodyHead of
322+
Right (Right t@(_, _, (corrId, fId, _))) -> do
323+
let THandleParams {thAuth} = thParams
324+
verifyXFTPTransmission thAuth t >>= \case
325+
VRVerified req -> uncurry send =<< processXFTPRequest body req
326+
VRFailed e -> send (FRErr e) Nothing
330327
where
331328
send resp = sendXFTPResponse (corrId, fId, resp)
329+
Right (Left (corrId, fId, e)) -> sendXFTPResponse (corrId, fId, FRErr e) Nothing
332330
Left e -> sendXFTPResponse ("", NoEntity, FRErr e) Nothing
333331
where
334-
sendXFTPResponse (corrId, fId, resp) serverFile_ = do
335-
let t_ = xftpEncodeTransmission thParams (corrId, fId, resp)
332+
sendXFTPResponse t' serverFile_ = do
333+
let t_ = xftpEncodeTransmission thParams t'
336334
#ifdef slow_servers
337335
randomDelay
338336
#endif
@@ -361,8 +359,8 @@ randomDelay = do
361359

362360
data VerificationResult = VRVerified XFTPRequest | VRFailed XFTPErrorType
363361

364-
verifyXFTPTransmission :: Maybe (THandleAuth 'TServer, C.CbNonce) -> Maybe TAuthorizations -> ByteString -> XFTPFileId -> FileCmd -> M VerificationResult
365-
verifyXFTPTransmission auth_ tAuth authorized fId cmd =
362+
verifyXFTPTransmission :: Maybe (THandleAuth 'TServer) -> SignedTransmission FileCmd -> M VerificationResult
363+
verifyXFTPTransmission thAuth (tAuth, authorized, (corrId, fId, cmd)) =
366364
case cmd of
367365
FileCmd SFSender (FNEW file rcps auth') -> pure $ XFTPReqNew file rcps auth' `verifyWith` sndKey file
368366
FileCmd SFRecipient PING -> pure $ VRVerified XFTPReqPing
@@ -381,9 +379,9 @@ verifyXFTPTransmission auth_ tAuth authorized fId cmd =
381379
EntityBlocked info -> VRFailed $ BLOCKED info
382380
EntityOff -> noFileAuth
383381
Left _ -> pure noFileAuth
384-
noFileAuth = maybe False (dummyVerifyCmd Nothing authorized) tAuth `seq` VRFailed AUTH
382+
noFileAuth = dummyVerifyCmd thAuth tAuth authorized corrId `seq` VRFailed AUTH
385383
-- TODO verify with DH authorization
386-
req `verifyWith` k = if verifyCmdAuthorization auth_ tAuth authorized k then VRVerified req else VRFailed AUTH
384+
req `verifyWith` k = if verifyCmdAuthorization thAuth tAuth authorized corrId k then VRVerified req else VRFailed AUTH
387385

388386
processXFTPRequest :: HTTP2Body -> XFTPRequest -> M (FileResponse, Maybe ServerFile)
389387
processXFTPRequest HTTP2Body {bodyPart} = \case

src/Simplex/Messaging/Agent/Client.hs

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ module Simplex.Messaging.Agent.Client
3333
withConnLocks,
3434
withInvLock,
3535
withLockMap,
36+
withLocksMap,
3637
getMapLock,
3738
ipAddressProtected,
3839
closeAgentClient,
@@ -1004,16 +1005,16 @@ withInvLock' AgentClient {invLocks} = withLockMap invLocks
10041005
{-# INLINE withInvLock' #-}
10051006

10061007
withConnLocks :: AgentClient -> Set ConnId -> Text -> AM' a -> AM' a
1007-
withConnLocks AgentClient {connLocks} = withLocksMap_ connLocks
1008+
withConnLocks AgentClient {connLocks} = withLocksMap connLocks
10081009
{-# INLINE withConnLocks #-}
10091010

10101011
withLockMap :: (Ord k, MonadUnliftIO m) => TMap k Lock -> k -> Text -> m a -> m a
10111012
withLockMap = withGetLock . getMapLock
10121013
{-# INLINE withLockMap #-}
10131014

1014-
withLocksMap_ :: (Ord k, MonadUnliftIO m) => TMap k Lock -> Set k -> Text -> m a -> m a
1015-
withLocksMap_ = withGetLocks . getMapLock
1016-
{-# INLINE withLocksMap_ #-}
1015+
withLocksMap :: (Ord k, MonadUnliftIO m) => TMap k Lock -> Set k -> Text -> m a -> m a
1016+
withLocksMap = withGetLocks . getMapLock
1017+
{-# INLINE withLocksMap #-}
10171018

10181019
getMapLock :: Ord k => TMap k Lock -> k -> STM Lock
10191020
getMapLock locks key = TM.lookup key locks >>= maybe newLock pure

src/Simplex/Messaging/Client.hs

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -183,7 +183,7 @@ data PClient v err msg = PClient
183183
clientCorrId :: TVar ChaChaDRG,
184184
sentCommands :: TMap CorrId (Request err msg),
185185
sndQ :: TBQueue (Maybe (Request err msg), ByteString),
186-
rcvQ :: TBQueue (NonEmpty (SignedTransmission err msg)),
186+
rcvQ :: TBQueue (NonEmpty (Transmission (Either err msg))),
187187
msgQ :: Maybe (TBQueue (ServerTransmissionBatch v err msg))
188188
}
189189

@@ -615,7 +615,7 @@ getProtocolClient g transportSession@(_, srv, _) cfg@ProtocolClientConfig {qSize
615615

616616
receive :: Transport c => ProtocolClient v err msg -> THandle v c 'TClient -> IO ()
617617
receive ProtocolClient {client_ = PClient {rcvQ, lastReceived, timeoutErrorCount}} h = forever $ do
618-
tGet h >>= atomically . writeTBQueue rcvQ
618+
tGetClient h >>= atomically . writeTBQueue rcvQ
619619
getCurrentTime >>= atomically . writeTVar lastReceived
620620
atomically $ writeTVar timeoutErrorCount 0
621621

@@ -642,14 +642,14 @@ getProtocolClient g transportSession@(_, srv, _) cfg@ProtocolClientConfig {qSize
642642
process :: ProtocolClient v err msg -> IO ()
643643
process c = forever $ atomically (readTBQueue $ rcvQ $ client_ c) >>= processMsgs c
644644

645-
processMsgs :: ProtocolClient v err msg -> NonEmpty (SignedTransmission err msg) -> IO ()
645+
processMsgs :: ProtocolClient v err msg -> NonEmpty (Transmission (Either err msg)) -> IO ()
646646
processMsgs c ts = do
647647
ts' <- catMaybes <$> mapM (processMsg c) (L.toList ts)
648648
forM_ msgQ $ \q ->
649649
mapM_ (atomically . writeTBQueue q . serverTransmission c) (L.nonEmpty ts')
650650

651-
processMsg :: ProtocolClient v err msg -> SignedTransmission err msg -> IO (Maybe (EntityId, ServerTransmission err msg))
652-
processMsg ProtocolClient {client_ = PClient {sentCommands}} (_, _, (corrId, entId, respOrErr))
651+
processMsg :: ProtocolClient v err msg -> Transmission (Either err msg) -> IO (Maybe (EntityId, ServerTransmission err msg))
652+
processMsg ProtocolClient {client_ = PClient {sentCommands}} (corrId, entId, respOrErr)
653653
| B.null $ bs corrId = sendMsg $ STEvent clientResp
654654
| otherwise =
655655
TM.lookupIO corrId sentCommands >>= \case
@@ -767,7 +767,7 @@ createSMPQueue ::
767767
-- Maybe NewNtfCreds ->
768768
ExceptT SMPClientError IO QueueIdsKeys
769769
createSMPQueue c nonce_ (rKey, rpKey) dhKey auth subMode qrd =
770-
sendProtocolCommand_ c nonce_ Nothing (Just rpKey) NoEntity (Cmd SRecipient $ NEW $ NewQueueReq rKey dhKey auth subMode (Just qrd)) >>= \case
770+
sendProtocolCommand_ c nonce_ Nothing (Just rpKey) NoEntity (Cmd SCreator $ NEW $ NewQueueReq rKey dhKey auth subMode (Just qrd)) >>= \case
771771
IDS qik -> pure qik
772772
r -> throwE $ unexpectedResponse r
773773

@@ -848,7 +848,7 @@ nsubResponse_ = \case
848848
r' -> Left $ unexpectedResponse r'
849849
{-# INLINE nsubResponse_ #-}
850850

851-
subscribeService :: forall p. (PartyI p, SubscriberParty p) => SMPClient -> SParty p -> ExceptT SMPClientError IO Int64
851+
subscribeService :: forall p. (PartyI p, ServiceParty p) => SMPClient -> SParty p -> ExceptT SMPClientError IO Int64
852852
subscribeService c party = case smpClientService c of
853853
Just THClientService {serviceId, serviceKey} -> do
854854
liftIO $ enablePings c
@@ -858,8 +858,8 @@ subscribeService c party = case smpClientService c of
858858
where
859859
subCmd :: Command p
860860
subCmd = case party of
861-
SRecipient -> SUBS
862-
SNotifier -> NSUBS
861+
SRecipientService -> SUBS
862+
SNotifierService -> NSUBS
863863
Nothing -> throwE PCEServiceUnavailable
864864

865865
smpClientService :: SMPClient -> Maybe THClientService
@@ -1119,8 +1119,8 @@ proxySMPCommand c@ProtocolClient {thParams = proxyThParams, client_ = PClient {c
11191119
-- server interaction errors are thrown directly
11201120
t' <- liftEitherWith PCECryptoError $ C.cbDecrypt cmdSecret (C.reverseNonce nonce) er
11211121
case tParse serverThParams t' of
1122-
t'' :| [] -> case tDecodeParseValidate serverThParams t'' of
1123-
(_auth, _signed, (_c, _e, cmd)) -> case cmd of
1122+
t'' :| [] -> case tDecodeClient serverThParams t'' of
1123+
(_, _, cmd) -> case cmd of
11241124
Right (ERR e) -> throwE $ PCEProtocolError e -- this is the error from the destination relay
11251125
Right r' -> pure $ Right r'
11261126
Left e -> throwE $ PCEResponseError e

src/Simplex/Messaging/Client/Agent.hs

Lines changed: 14 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -70,9 +70,9 @@ import Simplex.Messaging.Protocol
7070
QueueId,
7171
SMPServer,
7272
SParty (..),
73-
SubscriberParty,
74-
subscriberParty,
75-
subscriberServiceRole
73+
ServiceParty,
74+
serviceParty,
75+
partyServiceRole
7676
)
7777
import Simplex.Messaging.Session
7878
import Simplex.Messaging.TMap (TMap)
@@ -331,11 +331,11 @@ reconnectClient ca@SMPClientAgent {active, agentCfg, smpSubWorkers, workerSeq} s
331331

332332
reconnectSMPClient :: forall p. SMPClientAgent p -> SMPServer -> (Maybe (ServiceId, Int64), Maybe (Map QueueId C.APrivateAuthKey)) -> ExceptT SMPClientError IO ()
333333
reconnectSMPClient ca@SMPClientAgent {agentCfg, agentParty} srv (sSub_, qSubs_) =
334-
withSMP ca srv $ \smp -> liftIO $ case subscriberParty agentParty of
334+
withSMP ca srv $ \smp -> liftIO $ case serviceParty agentParty of
335335
Just Dict -> resubscribe smp
336336
Nothing -> pure ()
337337
where
338-
resubscribe :: (PartyI p, SubscriberParty p) => SMPClient -> IO ()
338+
resubscribe :: (PartyI p, ServiceParty p) => SMPClient -> IO ()
339339
resubscribe smp = do
340340
mapM_ (smpSubscribeService ca smp srv) sSub_
341341
forM_ qSubs_ $ \qSubs -> do
@@ -394,22 +394,22 @@ withSMP ca srv action = (getSMPServerClient' ca srv >>= action) `catchE` logSMPE
394394
logInfo $ "SMP error (" <> safeDecodeUtf8 (strEncode $ host srv) <> "): " <> tshow e
395395
throwE e
396396

397-
subscribeQueuesNtfs :: SMPClientAgent 'Notifier -> SMPServer -> NonEmpty (NotifierId, NtfPrivateAuthKey) -> IO ()
397+
subscribeQueuesNtfs :: SMPClientAgent 'NotifierService -> SMPServer -> NonEmpty (NotifierId, NtfPrivateAuthKey) -> IO ()
398398
subscribeQueuesNtfs = subscribeQueues_
399399
{-# INLINE subscribeQueuesNtfs #-}
400400

401-
subscribeQueues_ :: SubscriberParty p => SMPClientAgent p -> SMPServer -> NonEmpty (QueueId, C.APrivateAuthKey) -> IO ()
401+
subscribeQueues_ :: ServiceParty p => SMPClientAgent p -> SMPServer -> NonEmpty (QueueId, C.APrivateAuthKey) -> IO ()
402402
subscribeQueues_ ca srv subs = do
403403
atomically $ addPendingSubs ca srv $ L.toList subs
404404
runExceptT (getSMPServerClient' ca srv) >>= \case
405405
Right smp -> smpSubscribeQueues ca smp srv subs
406406
Left _ -> pure () -- no call to reconnectClient - failing getSMPServerClient' does that
407407

408-
smpSubscribeQueues :: SubscriberParty p => SMPClientAgent p -> SMPClient -> SMPServer -> NonEmpty (QueueId, C.APrivateAuthKey) -> IO ()
408+
smpSubscribeQueues :: ServiceParty p => SMPClientAgent p -> SMPClient -> SMPServer -> NonEmpty (QueueId, C.APrivateAuthKey) -> IO ()
409409
smpSubscribeQueues ca smp srv subs = do
410410
rs <- case agentParty ca of
411-
SRecipient -> subscribeSMPQueues smp subs
412-
SNotifier -> subscribeSMPQueuesNtfs smp subs
411+
SRecipientService -> subscribeSMPQueues smp subs
412+
SNotifierService -> subscribeSMPQueuesNtfs smp subs
413413
rs' <-
414414
atomically $
415415
ifM
@@ -454,18 +454,18 @@ smpSubscribeQueues ca smp srv subs = do
454454
notify_ :: (SMPServer -> NonEmpty a -> SMPClientAgentEvent) -> [a] -> IO ()
455455
notify_ evt qs = mapM_ (notify ca . evt srv) $ L.nonEmpty qs
456456

457-
subscribeServiceNtfs :: SMPClientAgent 'Notifier -> SMPServer -> (ServiceId, Int64) -> IO ()
457+
subscribeServiceNtfs :: SMPClientAgent 'NotifierService -> SMPServer -> (ServiceId, Int64) -> IO ()
458458
subscribeServiceNtfs = subscribeService_
459459
{-# INLINE subscribeServiceNtfs #-}
460460

461-
subscribeService_ :: (PartyI p, SubscriberParty p) => SMPClientAgent p -> SMPServer -> (ServiceId, Int64) -> IO ()
461+
subscribeService_ :: (PartyI p, ServiceParty p) => SMPClientAgent p -> SMPServer -> (ServiceId, Int64) -> IO ()
462462
subscribeService_ ca srv serviceSub = do
463463
atomically $ setPendingServiceSub ca srv $ Just serviceSub
464464
runExceptT (getSMPServerClient' ca srv) >>= \case
465465
Right smp -> smpSubscribeService ca smp srv serviceSub
466466
Left _ -> pure () -- no call to reconnectClient - failing getSMPServerClient' does that
467467

468-
smpSubscribeService :: (PartyI p, SubscriberParty p) => SMPClientAgent p -> SMPClient -> SMPServer -> (ServiceId, Int64) -> IO ()
468+
smpSubscribeService :: (PartyI p, ServiceParty p) => SMPClientAgent p -> SMPClient -> SMPServer -> (ServiceId, Int64) -> IO ()
469469
smpSubscribeService ca smp srv serviceSub@(serviceId, _) = case smpClientService smp of
470470
Just service | serviceAvailable service -> subscribe
471471
_ -> notifyUnavailable
@@ -490,7 +490,7 @@ smpSubscribeService ca smp srv serviceSub@(serviceId, _) = case smpClientService
490490
setActiveServiceSub ca srv $ Just ((serviceId, n), sessId)
491491
setPendingServiceSub ca srv Nothing
492492
serviceAvailable THClientService {serviceRole, serviceId = serviceId'} =
493-
serviceId == serviceId' && subscriberServiceRole (agentParty ca) == serviceRole
493+
serviceId == serviceId' && partyServiceRole (agentParty ca) == serviceRole
494494
notifyUnavailable = do
495495
atomically $ setPendingServiceSub ca srv Nothing
496496
notify ca $ CAServiceUnavailable srv serviceSub -- this will resubscribe all queues directly

0 commit comments

Comments
 (0)