Skip to content

Commit 5b69d29

Browse files
committed
smp server: refactor message delivery to always respond SOK to subscriptions
1 parent d80a6b7 commit 5b69d29

File tree

6 files changed

+156
-127
lines changed

6 files changed

+156
-127
lines changed

src/Simplex/Messaging/Protocol.hs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -114,6 +114,7 @@ module Simplex.Messaging.Protocol
114114
BasicAuth (..),
115115
SrvLoc (..),
116116
CorrId (..),
117+
pattern NoCorrId,
117118
EntityId (..),
118119
pattern NoEntity,
119120
QueueId,
@@ -1370,6 +1371,9 @@ newtype CorrId = CorrId {bs :: ByteString}
13701371
deriving (Eq, Ord, Show)
13711372
deriving newtype (Encoding)
13721373

1374+
pattern NoCorrId :: CorrId
1375+
pattern NoCorrId = CorrId ""
1376+
13731377
instance IsString CorrId where
13741378
fromString = CorrId . fromString
13751379
{-# INLINE fromString #-}

src/Simplex/Messaging/Server.hs

Lines changed: 88 additions & 105 deletions
Large diffs are not rendered by default.

src/Simplex/Messaging/Server/Env/STM.hs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ module Simplex.Messaging.Server.Env.STM
4040
MsgStore (..),
4141
AStoreType (..),
4242
VerifiedTransmission,
43+
ResponseAndMessage,
4344
newEnv,
4445
mkJournalStoreConfig,
4546
msgStore,
@@ -392,8 +393,8 @@ data Client s = Client
392393
serviceSubsCount :: TVar Int64, -- only one service can be subscribed, based on its certificate, this is subscription count
393394
ntfServiceSubsCount :: TVar Int64, -- only one service can be subscribed, based on its certificate, this is subscription count
394395
rcvQ :: TBQueue (NonEmpty (VerifiedTransmission s)),
395-
sndQ :: TBQueue (NonEmpty (Transmission BrokerMsg)),
396-
msgQ :: TBQueue (NonEmpty (Transmission BrokerMsg)),
396+
sndQ :: TBQueue (NonEmpty (Transmission BrokerMsg), [(RecipientId, RcvMessage)]),
397+
msgQ :: TBQueue (NonEmpty (RecipientId, RcvMessage)),
397398
procThreads :: TVar Int,
398399
endThreads :: TVar (IntMap (Weak ThreadId)),
399400
endThreadSeq :: TVar Int,
@@ -406,6 +407,8 @@ data Client s = Client
406407

407408
type VerifiedTransmission s = (Maybe (StoreQueue s, QueueRec), Transmission Cmd)
408409

410+
type ResponseAndMessage = (Transmission BrokerMsg, Maybe (RecipientId, RcvMessage))
411+
409412
data ServerSub = ServerSub (TVar SubscriptionThread) | ProhibitSub
410413

411414
data SubscriptionThread = NoSub | SubPending | SubThread (Weak ThreadId)

src/Simplex/Messaging/Server/QueueStore/Postgres.hs

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -43,14 +43,13 @@ import qualified Data.ByteString.Builder as BB
4343
import Data.ByteString.Char8 (ByteString)
4444
import qualified Data.ByteString.Lazy as LB
4545
import Data.Bitraversable (bimapM)
46-
import Data.Either (fromRight, lefts, rights)
46+
import Data.Either (fromRight, lefts)
4747
import Data.Functor (($>))
4848
import Data.Int (Int64)
4949
import Data.List (foldl', intersperse, partition)
5050
import Data.List.NonEmpty (NonEmpty)
51-
import qualified Data.List.NonEmpty as L
5251
import qualified Data.Map.Strict as M
53-
import Data.Maybe (catMaybes, fromMaybe, mapMaybe)
52+
import Data.Maybe (catMaybes, fromMaybe)
5453
import qualified Data.Set as S
5554
import Data.Text (Text)
5655
import Data.Time.Clock.System (SystemTime (..), getSystemTime)
@@ -64,7 +63,7 @@ import Database.PostgreSQL.Simple.ToField (Action (..), ToField (..))
6463
import Database.PostgreSQL.Simple.Errors (ConstraintViolation (..), constraintViolation)
6564
import Database.PostgreSQL.Simple.SqlQQ (sql)
6665
import GHC.IO (catchAny)
67-
import Simplex.Messaging.Agent.Client (withLockMap, withLocksMap)
66+
import Simplex.Messaging.Agent.Client (withLockMap)
6867
import Simplex.Messaging.Agent.Lock (Lock)
6968
import Simplex.Messaging.Agent.Store.AgentStore ()
7069
import Simplex.Messaging.Agent.Store.Postgres (createDBStore, closeDBStore)
@@ -83,7 +82,7 @@ import Simplex.Messaging.Server.StoreLog
8382
import Simplex.Messaging.TMap (TMap)
8483
import qualified Simplex.Messaging.TMap as TM
8584
import Simplex.Messaging.Transport (SMPServiceRole (..))
86-
import Simplex.Messaging.Util (eitherToMaybe, firstRow, ifM, maybeFirstRow, tshow, (<$$>), ($>>=))
85+
import Simplex.Messaging.Util (eitherToMaybe, firstRow, ifM, maybeFirstRow, tshow, (<$$>))
8786
import System.Exit (exitFailure)
8887
import System.IO (IOMode (..), hFlush, stdout)
8988
import UnliftIO.STM

tests/CoreTests/BatchingTests.hs

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ batchingTests = do
4747
it "should batch with 135 subscriptions per batch" testClientBatchSubscriptions
4848
it "should batch with 255 ENDs per batch" testClientBatchENDs
4949
it "should batch with 80 NMSGs per batch" testClientBatchNMSGs
50+
it "should batch subscription responses with message" testBatchSubResponses
5051
it "should break on message that does not fit" testClientBatchWithMessage
5152
it "should break on large message" testClientBatchWithLargeMessage
5253

@@ -207,6 +208,20 @@ testClientBatchNMSGs = do
207208
(length rs1, length rs2, length rs3) `shouldBe` (40, 80, 80)
208209
all lenOk [s1, s2, s3] `shouldBe` True
209210

211+
-- 4 responses are used in Simplex.Messaging.Server / `send`
212+
testBatchSubResponses :: IO ()
213+
testBatchSubResponses = do
214+
client <- testClientStub
215+
soks <- replicateM 4 $ randomSOK
216+
msg <- randomMSG
217+
let msgs = map (\t -> Right (Nothing, encodeTransmission (thParams client) t)) (soks <> [msg])
218+
batches = batchTransmissions (thParams client) $ L.fromList msgs
219+
length batches `shouldBe` 1
220+
soks' <- replicateM 5 $ randomSOK
221+
let msgs' = map (\t -> Right (Nothing, encodeTransmission (thParams client) t)) (soks' <> [msg])
222+
batches' = batchTransmissions (thParams client) $ L.fromList msgs'
223+
length batches' `shouldBe` 2
224+
210225
testClientBatchWithMessageV6 :: IO ()
211226
testClientBatchWithMessageV6 = do
212227
client <- testClientStubV6
@@ -361,6 +376,22 @@ randomNMSGCmd ts = do
361376
Right encNMsgMeta <- pure $ C.cbEncrypt (C.dh' k pk) nonce (smpEncode msgMeta) 128
362377
pure (CorrId "", EntityId nId, NMSG nonce encNMsgMeta)
363378

379+
randomSOK :: IO (Transmission BrokerMsg)
380+
randomSOK = do
381+
g <- C.newRandom
382+
corrId <- atomically $ C.randomBytes 24 g
383+
rId <- atomically $ C.randomBytes 24 g
384+
pure (CorrId corrId, EntityId rId, SOK Nothing)
385+
386+
randomMSG :: IO (Transmission BrokerMsg)
387+
randomMSG = do
388+
g <- C.newRandom
389+
corrId <- atomically $ C.randomBytes 24 g
390+
rId <- atomically $ C.randomBytes 24 g
391+
msgId <- atomically $ C.randomBytes 24 g
392+
msg <- atomically $ C.randomBytes (maxMessageLength currentClientSMPRelayVersion) g
393+
pure (CorrId corrId, EntityId rId, MSG RcvMessage {msgId, msgBody = EncRcvMsgBody msg})
394+
364395
randomSENDv6 :: ByteString -> Int -> IO (Either TransportError (Maybe TAuthorizations, ByteString))
365396
randomSENDv6 = randomSEND_ C.SEd25519 minServerSMPRelayVersion
366397

tests/ServerTests.hs

Lines changed: 24 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ import Data.ByteString.Char8 (ByteString)
2929
import qualified Data.ByteString.Char8 as B
3030
import Data.Hashable (hash)
3131
import qualified Data.IntSet as IS
32+
import Data.List.NonEmpty (NonEmpty)
3233
import Data.String (IsString (..))
3334
import Data.Type.Equality
3435
import qualified Data.X509.Validation as XV
@@ -111,16 +112,25 @@ sendRecv h@THandle {params} (sgn, corrId, qId, cmd) = do
111112
tGet1 h
112113

113114
signSendRecv :: forall c p. (Transport c, PartyI p) => THandleSMP c 'TClient -> C.APrivateAuthKey -> (ByteString, EntityId, Command p) -> IO (Transmission (Either ErrorType BrokerMsg))
114-
signSendRecv h pk = signSendRecv_ h pk Nothing
115+
signSendRecv h pk t = do
116+
[r] <- signSendRecv_ h pk Nothing t
117+
pure r
118+
119+
signSendRecv2 :: forall c p. (Transport c, PartyI p) => THandleSMP c 'TClient -> C.APrivateAuthKey -> (ByteString, EntityId, Command p) -> IO (Transmission (Either ErrorType BrokerMsg), Transmission (Either ErrorType BrokerMsg))
120+
signSendRecv2 h pk t = do
121+
[r1, r2] <- signSendRecv_ h pk Nothing t
122+
pure (r1, r2)
115123

116124
serviceSignSendRecv :: forall c p. (Transport c, PartyI p) => THandleSMP c 'TClient -> C.APrivateAuthKey -> C.PrivateKeyEd25519 -> (ByteString, EntityId, Command p) -> IO (Transmission (Either ErrorType BrokerMsg))
117-
serviceSignSendRecv h pk = signSendRecv_ h pk . Just
125+
serviceSignSendRecv h pk serviceKey t = do
126+
[r] <- signSendRecv_ h pk (Just serviceKey) t
127+
pure r
118128

119-
signSendRecv_ :: forall c p. (Transport c, PartyI p) => THandleSMP c 'TClient -> C.APrivateAuthKey -> Maybe C.PrivateKeyEd25519 -> (ByteString, EntityId, Command p) -> IO (Transmission (Either ErrorType BrokerMsg))
129+
signSendRecv_ :: forall c p. (Transport c, PartyI p) => THandleSMP c 'TClient -> C.APrivateAuthKey -> Maybe C.PrivateKeyEd25519 -> (ByteString, EntityId, Command p) -> IO (NonEmpty (Transmission (Either ErrorType BrokerMsg)))
120130
signSendRecv_ h@THandle {params} (C.APrivateAuthKey a pk) serviceKey_ (corrId, qId, cmd) = do
121131
let TransmissionForAuth {tForAuth, tToSend} = encodeTransmissionForAuth params (CorrId corrId, qId, cmd)
122132
Right () <- tPut1 h (authorize tForAuth, tToSend)
123-
tGet1 h
133+
liftIO $ tGetClient h
124134
where
125135
authorize t = (,(`C.sign'` t) <$> serviceKey_) <$> case a of
126136
C.SEd25519 -> Just . TASignature . C.ASignature C.SEd25519 $ C.sign' pk t'
@@ -365,7 +375,7 @@ testCreateDelete =
365375
Resp "bcda" _ ok4 <- signSendRecv rh rKey ("bcda", rId, OFF)
366376
(ok4, OK) #== "accepts OFF when suspended"
367377

368-
Resp "cdab" _ (Msg mId2 msg2) <- signSendRecv rh rKey ("cdab", rId, SUB)
378+
(Resp "cdab" _ (SOK Nothing), Resp "" _ (Msg mId2 msg2)) <- signSendRecv2 rh rKey ("cdab", rId, SUB)
369379
(dec mId2 msg2, Right "hello") #== "accepts SUB when suspended and delivers the message again (because was not ACKed)"
370380

371381
Resp "dabc" _ err5 <- sendRecv rh (sampleSig, "dabc", rId, DEL)
@@ -404,7 +414,7 @@ stressTest =
404414
Resp "" NoEntity (Ids rId _ _) <- signSendRecv h1 rKey ("", NoEntity, New rPub dhPub)
405415
pure rId
406416
let subscribeQueues h = forM_ rIds $ \rId -> do
407-
Resp "" rId' OK <- signSendRecv h rKey ("", rId, SUB)
417+
Resp "" rId' (SOK Nothing) <- signSendRecv h rKey ("", rId, SUB)
408418
rId' `shouldBe` rId
409419
closeConnection $ connection h1
410420
subscribeQueues h2
@@ -497,7 +507,7 @@ testSwitchSub =
497507
Resp "abcd" _ (Msg mId2 msg2) <- signSendRecv rh1 rKey ("abcd", rId, ACK mId1)
498508
(dec mId2 msg2, Right "test2, no ACK") #== "test message 2 delivered, no ACK"
499509

500-
Resp "bcda" _ (Msg mId2' msg2') <- signSendRecv rh2 rKey ("bcda", rId, SUB)
510+
(Resp "bcda" _ (SOK Nothing), Resp "" _ (Msg mId2' msg2')) <- signSendRecv2 rh2 rKey ("bcda", rId, SUB)
501511
(dec mId2' msg2', Right "test2, no ACK") #== "same simplex queue via another TCP connection, tes2 delivered again (no ACK in 1st queue)"
502512
Resp "cdab" _ OK <- signSendRecv rh2 rKey ("cdab", rId, ACK mId2')
503513

@@ -684,7 +694,7 @@ testWithStoreLog =
684694
nId <- readTVarIO notifierId
685695
Resp "dabc" _ (SOK Nothing) <- signSendRecv h1 nKey ("dabc", nId, NSUB)
686696
Resp "bcda" _ OK <- signSendRecv h sKey1 ("bcda", sId1, _SEND' "hello")
687-
Resp "cdab" _ (Msg mId3 msg3) <- signSendRecv h rKey1 ("cdab", rId1, SUB)
697+
(Resp "cdab" _ (SOK Nothing), Resp "" _ (Msg mId3 msg3)) <- signSendRecv2 h rKey1 ("cdab", rId1, SUB)
688698
(decryptMsgV3 dh1 mId3 msg3, Right "hello") #== "delivered from restored queue"
689699
Resp "" _ (NMSG _ _) <- tGet1 h1
690700
-- this queue is removed - not restored
@@ -769,7 +779,7 @@ testRestoreMessages =
769779
Just rKey <- readTVarIO recipientKey
770780
Just dh <- readTVarIO dhShared
771781
let dec = decryptMsgV3 dh
772-
Resp "2" _ (Msg mId2 msg2) <- signSendRecv h rKey ("2", rId, SUB)
782+
(Resp "2" _ (SOK Nothing), Resp "" _ (Msg mId2 msg2)) <- signSendRecv2 h rKey ("2", rId, SUB)
773783
(dec mId2 msg2, Right "hello 2") #== "restored message delivered"
774784
Resp "3" _ (Msg mId3 msg3) <- signSendRecv h rKey ("3", rId, ACK mId2)
775785
(dec mId3 msg3, Right "hello 3") #== "restored message delivered"
@@ -786,7 +796,7 @@ testRestoreMessages =
786796
Just rKey <- readTVarIO recipientKey
787797
Just dh <- readTVarIO dhShared
788798
let dec = decryptMsgV3 dh
789-
Resp "4" _ (Msg mId4 msg4) <- signSendRecv h rKey ("4", rId, SUB)
799+
(Resp "4" _ (SOK Nothing), Resp "" _ (Msg mId4 msg4)) <- signSendRecv2 h rKey ("4", rId, SUB)
790800
(dec mId4 msg4, Right "hello 4") #== "restored message delivered"
791801
Resp "5" _ (Msg mId5 msg5) <- signSendRecv h rKey ("5", rId, ACK mId4)
792802
(dec mId5 msg5, Right "hello 5") #== "restored message delivered"
@@ -1131,16 +1141,15 @@ testMsgExpireOnSend =
11311141
threadDelay 2500000
11321142
Resp "2" _ OK <- signSendRecv sh sKey ("2", sId, _SEND "hello (should NOT expire)")
11331143
testSMPClient @c $ \rh -> do
1134-
Resp "3" _ (Msg mId msg) <- signSendRecv rh rKey ("3", rId, SUB)
1144+
(Resp "3" _ (SOK Nothing), Resp "" _ (Msg mId msg)) <- signSendRecv2 rh rKey ("3", rId, SUB)
11351145
(dec mId msg, Right "hello (should NOT expire)") #== "delivered"
11361146
1000 `timeout` tGetClient @SMPVersion @ErrorType @BrokerMsg rh >>= \case
11371147
Nothing -> return ()
11381148
Just _ -> error "nothing else should be delivered"
11391149

11401150
testMsgExpireOnInterval :: SpecWith (ASrvTransport, AStoreType)
11411151
testMsgExpireOnInterval =
1142-
-- fails on ubuntu
1143-
xit' "should expire messages that are not received before messageTTL after expiry interval" $ \(ATransport (t :: TProxy c 'TServer), msType) -> do
1152+
it "should expire messages that are not received before messageTTL after expiry interval" $ \(ATransport (t :: TProxy c 'TServer), msType) -> do
11441153
g <- C.newRandom
11451154
(sPub, sKey) <- atomically $ C.generateAuthKeyPair C.SEd25519 g
11461155
let cfg' = updateCfg (cfgMS msType) $ \cfg_ -> cfg_ {messageExpiration = Just ExpirationConfig {ttl = 1, checkInterval = 1}, idleQueueInterval = 1}
@@ -1151,7 +1160,7 @@ testMsgExpireOnInterval =
11511160
threadDelay 3000000
11521161
testSMPClient @c $ \rh -> do
11531162
signSendRecv rh rKey ("2", rId, SUB) >>= \case
1154-
Resp "2" _ OK -> pure ()
1163+
Resp "2" _ (SOK Nothing) -> pure ()
11551164
r -> unexpected r
11561165
1000 `timeout` tGetClient @SMPVersion @ErrorType @BrokerMsg rh >>= \case
11571166
Nothing -> return ()
@@ -1170,7 +1179,7 @@ testMsgNOTExpireOnInterval =
11701179
Resp "1" _ OK <- signSendRecv sh sKey ("1", sId, _SEND "hello (should NOT expire)")
11711180
threadDelay 2500000
11721181
testSMPClient @c $ \rh -> do
1173-
Resp "2" _ (Msg mId msg) <- signSendRecv rh rKey ("2", rId, SUB)
1182+
(Resp "2" _ (SOK Nothing), Resp "" _ (Msg mId msg)) <- signSendRecv2 rh rKey ("2", rId, SUB)
11741183
(dec mId msg, Right "hello (should NOT expire)") #== "delivered"
11751184
1000 `timeout` tGetClient @SMPVersion @ErrorType @BrokerMsg rh >>= \case
11761185
Nothing -> return ()

0 commit comments

Comments
 (0)