Skip to content

Commit 62733ef

Browse files
authored
smp server: refactor subscriptions and delivery in order to always response SOK on subscription with an optional message to follow. (#1573)
* smp server: refactor subscriptions and delivery * metric for time between MSG and ACK * cleanup * refactor pattern match for ghc 8.10.7 * time buckets * split max time metric * histogram * fix
1 parent 99e59b7 commit 62733ef

File tree

10 files changed

+276
-198
lines changed

10 files changed

+276
-198
lines changed

src/Simplex/Messaging/Client.hs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1224,8 +1224,8 @@ okSMPCommands cmd c nm qs = L.map process <$> sendProtocolCommands c nm cs
12241224
Left e -> Left e
12251225

12261226
-- | Send SMP command
1227-
sendSMPCommand :: PartyI p => SMPClient -> NetworkRequestMode -> Maybe C.APrivateAuthKey -> QueueId -> Command p -> ExceptT SMPClientError IO BrokerMsg
1228-
sendSMPCommand c nm pKey qId cmd = sendProtocolCommand c nm pKey qId (Cmd sParty cmd)
1227+
sendSMPCommand :: PartyI p => SMPClient -> NetworkRequestMode -> Maybe C.APrivateAuthKey -> EntityId -> Command p -> ExceptT SMPClientError IO BrokerMsg
1228+
sendSMPCommand c nm pKey entId cmd = sendProtocolCommand c nm pKey entId (Cmd sParty cmd)
12291229
{-# INLINE sendSMPCommand #-}
12301230

12311231
type PCTransmission err msg = (Either TransportError SentRawTransmission, Request err msg)

src/Simplex/Messaging/Protocol.hs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -114,6 +114,7 @@ module Simplex.Messaging.Protocol
114114
BasicAuth (..),
115115
SrvLoc (..),
116116
CorrId (..),
117+
pattern NoCorrId,
117118
EntityId (..),
118119
pattern NoEntity,
119120
QueueId,
@@ -1370,6 +1371,9 @@ newtype CorrId = CorrId {bs :: ByteString}
13701371
deriving (Eq, Ord, Show)
13711372
deriving newtype (Encoding)
13721373

1374+
pattern NoCorrId :: CorrId
1375+
pattern NoCorrId = CorrId ""
1376+
13731377
instance IsString CorrId where
13741378
fromString = CorrId . fromString
13751379
{-# INLINE fromString #-}

src/Simplex/Messaging/Server.hs

Lines changed: 161 additions & 161 deletions
Large diffs are not rendered by default.

src/Simplex/Messaging/Server/Env/STM.hs

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ module Simplex.Messaging.Server.Env.STM
4040
MsgStore (..),
4141
AStoreType (..),
4242
VerifiedTransmission,
43+
ResponseAndMessage,
4344
newEnv,
4445
mkJournalStoreConfig,
4546
msgStore,
@@ -377,7 +378,7 @@ sameClient c cv = maybe False (sameClientId c) <$> readTVar cv
377378
data ClientSub
378379
= CSClient QueueId (Maybe ServiceId) (Maybe ServiceId) -- includes previous and new associated service IDs
379380
| CSDeleted QueueId (Maybe ServiceId) -- includes previously associated service IDs
380-
| CSService ServiceId -- only send END to idividual client subs on message delivery, not of SSUB/NSSUB
381+
| CSService ServiceId Int64 -- only send END to idividual client subs on message delivery, not of SSUB/NSSUB
381382

382383
newtype ProxyAgent = ProxyAgent
383384
{ smpAgent :: SMPClientAgent 'Sender
@@ -394,7 +395,7 @@ data Client s = Client
394395
serviceSubsCount :: TVar Int64, -- only one service can be subscribed, based on its certificate, this is subscription count
395396
ntfServiceSubsCount :: TVar Int64, -- only one service can be subscribed, based on its certificate, this is subscription count
396397
rcvQ :: TBQueue (NonEmpty (VerifiedTransmission s)),
397-
sndQ :: TBQueue (NonEmpty (Transmission BrokerMsg)),
398+
sndQ :: TBQueue (NonEmpty (Transmission BrokerMsg), [Transmission BrokerMsg]),
398399
msgQ :: TBQueue (NonEmpty (Transmission BrokerMsg)),
399400
procThreads :: TVar Int,
400401
endThreads :: TVar (IntMap (Weak ThreadId)),
@@ -408,13 +409,15 @@ data Client s = Client
408409

409410
type VerifiedTransmission s = (Maybe (StoreQueue s, QueueRec), Transmission Cmd)
410411

412+
type ResponseAndMessage = (Transmission BrokerMsg, Maybe (Transmission BrokerMsg))
413+
411414
data ServerSub = ServerSub (TVar SubscriptionThread) | ProhibitSub
412415

413416
data SubscriptionThread = NoSub | SubPending | SubThread (Weak ThreadId)
414417

415418
data Sub = Sub
416419
{ subThread :: ServerSub, -- Nothing value indicates that sub
417-
delivered :: TMVar (MsgId, RoundedSystemTime)
420+
delivered :: TVar (Maybe (MsgId, RoundedSystemTime))
418421
}
419422

420423
newServer :: IO (Server s)
@@ -497,13 +500,13 @@ newClient clientId qSize clientTHParams createdAt = do
497500

498501
newSubscription :: SubscriptionThread -> STM Sub
499502
newSubscription st = do
500-
delivered <- newEmptyTMVar
503+
delivered <- newTVar Nothing
501504
subThread <- ServerSub <$> newTVar st
502505
return Sub {subThread, delivered}
503506

504507
newProhibitedSub :: STM Sub
505508
newProhibitedSub = do
506-
delivered <- newEmptyTMVar
509+
delivered <- newTVar Nothing
507510
return Sub {subThread = ProhibitSub, delivered}
508511

509512
newEnv :: ServerConfig s -> IO (Env s)

src/Simplex/Messaging/Server/MsgStore/Journal.hs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -356,8 +356,8 @@ instance QueueStoreClass (JournalQueue s) (QStore s) where
356356
{-# INLINE setQueueService #-}
357357
getQueueNtfServices = withQS (getQueueNtfServices @(JournalQueue s))
358358
{-# INLINE getQueueNtfServices #-}
359-
getNtfServiceQueueCount = withQS (getNtfServiceQueueCount @(JournalQueue s))
360-
{-# INLINE getNtfServiceQueueCount #-}
359+
getServiceQueueCount = withQS (getServiceQueueCount @(JournalQueue s))
360+
{-# INLINE getServiceQueueCount #-}
361361

362362
makeQueue_ :: JournalMsgStore s -> RecipientId -> QueueRec -> Lock -> IO (JournalQueue s)
363363
makeQueue_ JournalMsgStore {sharedLock} rId qr queueLock = do

src/Simplex/Messaging/Server/QueueStore/Postgres.hs

Lines changed: 12 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -43,14 +43,13 @@ import qualified Data.ByteString.Builder as BB
4343
import Data.ByteString.Char8 (ByteString)
4444
import qualified Data.ByteString.Lazy as LB
4545
import Data.Bitraversable (bimapM)
46-
import Data.Either (fromRight, lefts, rights)
46+
import Data.Either (fromRight, lefts)
4747
import Data.Functor (($>))
4848
import Data.Int (Int64)
4949
import Data.List (foldl', intersperse, partition)
5050
import Data.List.NonEmpty (NonEmpty)
51-
import qualified Data.List.NonEmpty as L
5251
import qualified Data.Map.Strict as M
53-
import Data.Maybe (catMaybes, fromMaybe, mapMaybe)
52+
import Data.Maybe (catMaybes, fromMaybe)
5453
import qualified Data.Set as S
5554
import Data.Text (Text)
5655
import Data.Text.Encoding (decodeLatin1, encodeUtf8)
@@ -65,7 +64,7 @@ import Database.PostgreSQL.Simple.ToField (Action (..), ToField (..))
6564
import Database.PostgreSQL.Simple.Errors (ConstraintViolation (..), constraintViolation)
6665
import Database.PostgreSQL.Simple.SqlQQ (sql)
6766
import GHC.IO (catchAny)
68-
import Simplex.Messaging.Agent.Client (withLockMap, withLocksMap)
67+
import Simplex.Messaging.Agent.Client (withLockMap)
6968
import Simplex.Messaging.Agent.Lock (Lock)
7069
import Simplex.Messaging.Agent.Store.AgentStore ()
7170
import Simplex.Messaging.Agent.Store.Postgres (createDBStore, closeDBStore)
@@ -84,7 +83,7 @@ import Simplex.Messaging.Server.StoreLog
8483
import Simplex.Messaging.TMap (TMap)
8584
import qualified Simplex.Messaging.TMap as TM
8685
import Simplex.Messaging.Transport (SMPServiceRole (..))
87-
import Simplex.Messaging.Util (eitherToMaybe, firstRow, ifM, maybeFirstRow, tshow, (<$$>), ($>>=))
86+
import Simplex.Messaging.Util (eitherToMaybe, firstRow, ifM, maybeFirstRow, tshow, (<$$>))
8887
import System.Exit (exitFailure)
8988
import System.IO (IOMode (..), hFlush, stdout)
9089
import UnliftIO.STM
@@ -485,11 +484,15 @@ instance StoreQueueClass q => QueueStoreClass q (PostgresQueueStore q) where
485484
let (sNtfs, restNtfs) = partition (\(nId, _) -> S.member nId snIds) ntfs'
486485
in ((serviceId, sNtfs) : ssNtfs, restNtfs)
487486

488-
getNtfServiceQueueCount :: PostgresQueueStore q -> ServiceId -> IO (Either ErrorType Int64)
489-
getNtfServiceQueueCount st serviceId =
490-
E.uninterruptibleMask_ $ runExceptT $ withDB' "getNtfServiceQueueCount" st $ \db ->
487+
getServiceQueueCount :: (PartyI p, ServiceParty p) => PostgresQueueStore q -> SParty p -> ServiceId -> IO (Either ErrorType Int64)
488+
getServiceQueueCount st party serviceId =
489+
E.uninterruptibleMask_ $ runExceptT $ withDB' "getServiceQueueCount" st $ \db ->
491490
fmap (fromMaybe 0) $ maybeFirstRow fromOnly $
492-
DB.query db "SELECT count(1) FROM msg_queues WHERE ntf_service_id = ? AND deleted_at IS NULL" (Only serviceId)
491+
DB.query db query (Only serviceId)
492+
where
493+
query = case party of
494+
SRecipientService -> "SELECT count(1) FROM msg_queues WHERE rcv_service_id = ? AND deleted_at IS NULL"
495+
SNotifierService -> "SELECT count(1) FROM msg_queues WHERE ntf_service_id = ? AND deleted_at IS NULL"
493496

494497
batchInsertServices :: [STMService] -> PostgresQueueStore q -> IO Int64
495498
batchInsertServices services' toStore =

src/Simplex/Messaging/Server/QueueStore/STM.hs

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -346,10 +346,15 @@ instance StoreQueueClass q => QueueStoreClass q (STMQueueStore q) where
346346
let (sNtfs, restNtfs) = partition (\(nId, _) -> S.member nId snIds) ntfs'
347347
pure ((Just serviceId, sNtfs) : ssNtfs, restNtfs)
348348

349-
getNtfServiceQueueCount :: STMQueueStore q -> ServiceId -> IO (Either ErrorType Int64)
350-
getNtfServiceQueueCount st serviceId =
349+
getServiceQueueCount :: (PartyI p, ServiceParty p) => STMQueueStore q -> SParty p -> ServiceId -> IO (Either ErrorType Int64)
350+
getServiceQueueCount st party serviceId =
351351
TM.lookupIO serviceId (services st) >>=
352-
maybe (pure $ Left AUTH) (fmap (Right . fromIntegral . S.size) . readTVarIO . serviceNtfQueues)
352+
maybe (pure $ Left AUTH) (fmap (Right . fromIntegral . S.size) . readTVarIO . serviceSel)
353+
where
354+
serviceSel :: STMService -> TVar (Set QueueId)
355+
serviceSel = case party of
356+
SRecipientService -> serviceRcvQueues
357+
SNotifierService -> serviceNtfQueues
353358

354359
withQueueRec :: TVar (Maybe QueueRec) -> (QueueRec -> STM a) -> IO (Either ErrorType a)
355360
withQueueRec qr a = atomically $ readQueueRec qr >>= mapM a

src/Simplex/Messaging/Server/QueueStore/Types.hs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ class StoreQueueClass q => QueueStoreClass q s where
4848
getCreateService :: s -> ServiceRec -> IO (Either ErrorType ServiceId)
4949
setQueueService :: (PartyI p, ServiceParty p) => s -> q -> SParty p -> Maybe ServiceId -> IO (Either ErrorType ())
5050
getQueueNtfServices :: s -> [(NotifierId, a)] -> IO (Either ErrorType ([(Maybe ServiceId, [(NotifierId, a)])], [(NotifierId, a)]))
51-
getNtfServiceQueueCount :: s -> ServiceId -> IO (Either ErrorType Int64)
51+
getServiceQueueCount :: (PartyI p, ServiceParty p) => s -> SParty p -> ServiceId -> IO (Either ErrorType Int64)
5252

5353
data EntityCounts = EntityCounts
5454
{ queueCount :: Int,

tests/CoreTests/BatchingTests.hs

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ batchingTests = do
4747
it "should batch with 135 subscriptions per batch" testClientBatchSubscriptions
4848
it "should batch with 255 ENDs per batch" testClientBatchENDs
4949
it "should batch with 80 NMSGs per batch" testClientBatchNMSGs
50+
it "should batch subscription responses with message" testBatchSubResponses
5051
it "should break on message that does not fit" testClientBatchWithMessage
5152
it "should break on large message" testClientBatchWithLargeMessage
5253

@@ -207,6 +208,20 @@ testClientBatchNMSGs = do
207208
(length rs1, length rs2, length rs3) `shouldBe` (40, 80, 80)
208209
all lenOk [s1, s2, s3] `shouldBe` True
209210

211+
-- 4 responses are used in Simplex.Messaging.Server / `send`
212+
testBatchSubResponses :: IO ()
213+
testBatchSubResponses = do
214+
client <- testClientStub
215+
soks <- replicateM 4 $ randomSOK
216+
msg <- randomMSG
217+
let msgs = map (\t -> Right (Nothing, encodeTransmission (thParams client) t)) (soks <> [msg])
218+
batches = batchTransmissions (thParams client) $ L.fromList msgs
219+
length batches `shouldBe` 1
220+
soks' <- replicateM 5 $ randomSOK
221+
let msgs' = map (\t -> Right (Nothing, encodeTransmission (thParams client) t)) (soks' <> [msg])
222+
batches' = batchTransmissions (thParams client) $ L.fromList msgs'
223+
length batches' `shouldBe` 2
224+
210225
testClientBatchWithMessageV6 :: IO ()
211226
testClientBatchWithMessageV6 = do
212227
client <- testClientStubV6
@@ -361,6 +376,22 @@ randomNMSGCmd ts = do
361376
Right encNMsgMeta <- pure $ C.cbEncrypt (C.dh' k pk) nonce (smpEncode msgMeta) 128
362377
pure (CorrId "", EntityId nId, NMSG nonce encNMsgMeta)
363378

379+
randomSOK :: IO (Transmission BrokerMsg)
380+
randomSOK = do
381+
g <- C.newRandom
382+
corrId <- atomically $ C.randomBytes 24 g
383+
rId <- atomically $ C.randomBytes 24 g
384+
pure (CorrId corrId, EntityId rId, SOK Nothing)
385+
386+
randomMSG :: IO (Transmission BrokerMsg)
387+
randomMSG = do
388+
g <- C.newRandom
389+
corrId <- atomically $ C.randomBytes 24 g
390+
rId <- atomically $ C.randomBytes 24 g
391+
msgId <- atomically $ C.randomBytes 24 g
392+
msg <- atomically $ C.randomBytes (maxMessageLength currentClientSMPRelayVersion) g
393+
pure (CorrId corrId, EntityId rId, MSG RcvMessage {msgId, msgBody = EncRcvMsgBody msg})
394+
364395
randomSENDv6 :: ByteString -> Int -> IO (Either TransportError (Maybe TAuthorizations, ByteString))
365396
randomSENDv6 = randomSEND_ C.SEd25519 minServerSMPRelayVersion
366397

0 commit comments

Comments
 (0)