Skip to content

Commit b57b709

Browse files
committed
rfc about drift detection, and SALL to mark end of message delivery
1 parent c4f6240 commit b57b709

File tree

9 files changed

+173
-34
lines changed

9 files changed

+173
-34
lines changed
Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,101 @@
1+
# Detecting and fixing state with service subscriptions
2+
3+
## Problem
4+
5+
While service certificates and subscriptions hugely decrease startup time and delivery delays on server restarts, they introduce the risk of losing subscriptions in case of state drifts. They also do not provide efficient mechanism for validating that the list of subscribed queues is in sync.
6+
7+
How can the state drift happen?
8+
9+
There are several possibilities:
10+
- lost broker response would make the broker consider that the queue is associated, but the client won't know it, and will have to re-associate. While in itself it is not a problem, as it'll be resolved, it would make drift detected more frequently (regardless of the detection logic used). That service certificates are used on clients with good connection would make it less likely though.
11+
- server state restored from the backup, in case of some failure. Nothing can be done to recover lost queues, but we may restore lost service associations.
12+
- queue blocking or removal by server operator because of policy violation.
13+
- server downgrade (when it loses all service associations) with subsequent upgrade - the client would think queues are associated, while they are not, and won't receive any messages at all in this scenario.
14+
- any other server-side error or logic error.
15+
16+
In addition to the possibility of the drift, we simply need to have confidence that service subscriptions work as intended, without skipping queues. We ignored this consideration for notifications, as the tolerance to lost notifications is higher, but we can't ignore it for messages.
17+
18+
## Solution
19+
20+
Previously considered approach of sending NIL to all queues without messages is very expensive for traffic (most queues don't have messages), and it is also very expensive to detect and validate drift in the client because of asynchronous / concurrent events.
21+
22+
We cannot read all queues into memory, and we cannot aggregate all responses in memory, and we cannot create database writes on every single service subscription to say 1m queues (a realistic number), as it simply won't work well even at the current scale.
23+
24+
An approach of having an efficient way to detect drift, but load the full list of IDs when drift is detected, also won't work well, as drifts may be common, so we need both efficient way to detect there is diff and also to reconcile it.
25+
26+
### Drift detection
27+
28+
Both client and server would maintain the number of associated queues and the "symmetric" hash over the set of queue IDs. The requirements for this hash algorithm are:
29+
- not cryptographically strong, to be fast.
30+
- 128 bits to minimize collisions over the large set of millions of queues.
31+
- symmetric - the result should not depend on ID order.
32+
- allows fast additions and removals.
33+
34+
In this way, every time association is added or removed (including queue marked as deleted), both peers would recompute this hash in the same transaction.
35+
36+
The client would suspend sending and processing any other commands on the server and the queues of this server until SOKS response is received from this server, to prevent drift. It can be achieved with per-server semaphores/locks in memory. UI clients need to become responsive sooner than these responses are received, but we do not service certificates on UI clients, and chat relays may prevent operations on server queues until SOKS response is received.
37+
38+
SOKS response would include both the count of associated queues (as now) and the hash over all associated queue IDs (to be added). If both count and hash match, the client will not do anything. If either does not match the client would perform full sync (see below).
39+
40+
There is a value from doing the same in notification server as well to detect and "fix" drifts.
41+
42+
The algorithm to compute hashes can be the following.
43+
44+
1. Compute hash of each queue ID using xxHash3_128 ([xxhash-ffi](https://hackage.haskell.org/package/xxhash-ffi) library). They don't need to be stored or loaded at once, initially, it can be done with streaming if it is detected on start that there is no pre-computed hash.
45+
2. Combine hashes using XOR. XOR is both commutative and associative, so it would produce the same aggregate hash irrespective of the ID order.
46+
3. Adding queue ID to pre-computed hash requires a single XOR with ID hash: `new_aggregate = aggregate XOR hash(queue_id)`.
47+
4. Removing queue ID from pre-computed hash also requires the same XOR (XOR is involutory, it undoes itself): `new_aggregate = aggregate XOR hash(queue_id)`.
48+
49+
These hashes need to be computed per user/server in the client and per service certificate in the server - on startup both have to validate and compute them once if necessary.
50+
51+
There can be also a start-up option to recompute hashe(s) to detect and fix any errors.
52+
53+
This is all rather simple and would help detecting drifts.
54+
55+
### Synchronization when drift is detected
56+
57+
The assumption here is that in most cases drifts are rare, and isolated to few IDs (e.g., this is the case with notification server).
58+
59+
But the algorithm should be resilient to losing all associations, and it should not be substantially worse than simply restoring all associations or loading all IDs.
60+
61+
We have `c_n` and `c_hash` for client-side count and hash of queue IDs and `s_n` and `s_hash` for server-side, which are returned in SOKS response to SUBS command.
62+
63+
1. If `c_n /= s_n || c_hash /= s_hash`, the client must perform sync.
64+
65+
2. If `abs(c_n - s_n) / max(c_n, s_n) > 0.5`, the client will request the full list of queues (more than half of the queues are different), and will perform diff with the queues it has. While performing the diff the client will continue block operations with this user/server.
66+
67+
3. Otherwise would perform some algorithm for determining the difference between queue IDs between client and server. This algorithm can be made efficient (`O(log N)`) by relying on efficient sorting of IDs and database loading of ranges, via computing and communicating hashes of ranges, and performing a binary search on ranges, with batching to optimize network traffic.
68+
69+
This algorithm is similar to Merkle tree reconcilliation, but it is optimized for database reading of ordered ranges, and for our 16kb block size to minimize network requests.
70+
71+
The algorithm:
72+
1. The client would request all ranges from the server.
73+
2. The server would compute hashes for N ranges of IDs and send them to the client. Each range would include start_id, optional end_id (for single ID ranges) and XOR-hash of the range. N is determined based on the block size and the range size.
74+
3. The client would perform the same computation for the same ranges, and compare them with the returned ranges from the server, while detecting any gaps between ranges and missing range boundaries.
75+
4. If more than half of the ranges don't match, the client would request the full list. Otherwise it would repeat the same algorithm for each mismatched range and for gaps.
76+
77+
It can be further optimized by merging adjacent ranges and by batching all range requests, it is quite simple.
78+
79+
Once the client determines the list of missing and extra queues it can:
80+
- create associations (via SUB) for missing queues,
81+
- request removal of association (a new command, e.g. BUS) for extra queues on the server.
82+
83+
The pseudocode for the algorightm:
84+
85+
For the server to return all ranges or subranges of requested range:
86+
87+
```haskell
88+
getSubRanges :: Maybe (RecipientId, RecipientId) -> [(RecipientId, Maybe RecipientId, Hash)]
89+
getSubRanges range_ = do
90+
((min_id, max_id), s_n) <- case range_ of
91+
Nothing -> getAssociatedQueueRange -- with the certificate in the client session.
92+
Just range -> (range,) <$> getAssociatedQueueCount range
93+
if
94+
| s_n <= max_N -> reply_with_single_queue_ranges
95+
| otherwise -> do
96+
let range_size = s_n `div` max_N
97+
read_all_ranges -- in a recursive loop, with max_id, range_hash and next_min_id in each step
98+
reply_ranges
99+
```
100+
101+
We don't need to implement this synchronization logic right now, so not including client logic here, it's sufficient to implement drift detection, and the action to fix the drift would be to disable and to re-enable certificates via some command-line parameter of CLI.

src/Simplex/Messaging/Agent.hs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -200,6 +200,7 @@ import Simplex.Messaging.Protocol
200200
ErrorType (AUTH),
201201
MsgBody,
202202
MsgFlags (..),
203+
IdsHash,
203204
NtfServer,
204205
ProtoServerWithAuth (..),
205206
ProtocolServer (..),
@@ -465,7 +466,7 @@ resubscribeConnections :: AgentClient -> [ConnId] -> AE (Map ConnId (Either Agen
465466
resubscribeConnections c = withAgentEnv c . resubscribeConnections' c
466467
{-# INLINE resubscribeConnections #-}
467468

468-
subscribeClientServices :: AgentClient -> UserId -> AE (Map SMPServer (Either AgentErrorType Int64))
469+
subscribeClientServices :: AgentClient -> UserId -> AE (Map SMPServer (Either AgentErrorType (Int64, IdsHash)))
469470
subscribeClientServices c = withAgentEnv c . subscribeClientServices' c
470471
{-# INLINE subscribeClientServices #-}
471472

@@ -1361,7 +1362,8 @@ resubscribeConnections' c connIds = do
13611362
-- union is left-biased, so results returned by subscribeConnections' take precedence
13621363
(`M.union` r) <$> subscribeConnections' c connIds'
13631364

1364-
subscribeClientServices' :: AgentClient -> UserId -> AM (Map SMPServer (Either AgentErrorType Int64))
1365+
-- TODO [serts rcv] compare hash with lock
1366+
subscribeClientServices' :: AgentClient -> UserId -> AM (Map SMPServer (Either AgentErrorType (Int64, IdsHash)))
13651367
subscribeClientServices' c userId =
13661368
ifM useService subscribe $ throwError $ CMD PROHIBITED "no user service allowed"
13671369
where

src/Simplex/Messaging/Agent/Client.hs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -254,6 +254,7 @@ import Simplex.Messaging.Protocol
254254
ErrorType,
255255
MsgFlags (..),
256256
MsgId,
257+
IdsHash,
257258
NtfServer,
258259
NtfServerWithAuth,
259260
ProtoServer,
@@ -1424,6 +1425,7 @@ newRcvQueue_ c nm userId connId (ProtoServerWithAuth srv auth) vRange cqrd enabl
14241425
(ntfKeys, ntfCreds) <- liftIO $ mkNtfCreds a g smp
14251426
(thParams smp,ntfKeys,) <$> createSMPQueue smp nm nonce_ rKeys dhKey auth subMode (queueReqData cqrd) ntfCreds
14261427
-- TODO [certs rcv] validate that serviceId is the same as in the client session, fail otherwise
1428+
-- possibly, it should allow returning Nothing - it would indicate incorrect old version
14271429
liftIO . logServer "<--" c srv NoEntity $ B.unwords ["IDS", logSecret rcvId, logSecret sndId]
14281430
shortLink <- mkShortLinkCreds thParams' qik
14291431
let rq =
@@ -1575,7 +1577,7 @@ subscribeQueues c qs = do
15751577
processSubResults = mapM_ $ uncurry $ processSubResult c sessId
15761578
resubscribe = resubscribeSMPSession c tSess `runReaderT` env
15771579

1578-
subscribeClientService :: AgentClient -> UserId -> SMPServer -> AM Int64
1580+
subscribeClientService :: AgentClient -> UserId -> SMPServer -> AM (Int64, IdsHash)
15791581
subscribeClientService c userId srv =
15801582
withLogClient c NRMBackground (userId, srv, Nothing) B.empty "SUBS" $
15811583
(`subscribeService` SMP.SRecipientService) . connectedClient

src/Simplex/Messaging/Client.hs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -884,12 +884,12 @@ nsubResponse_ = \case
884884
{-# INLINE nsubResponse_ #-}
885885

886886
-- This command is always sent in background request mode
887-
subscribeService :: forall p. (PartyI p, ServiceParty p) => SMPClient -> SParty p -> ExceptT SMPClientError IO Int64
887+
subscribeService :: forall p. (PartyI p, ServiceParty p) => SMPClient -> SParty p -> ExceptT SMPClientError IO (Int64, IdsHash)
888888
subscribeService c party = case smpClientService c of
889889
Just THClientService {serviceId, serviceKey} -> do
890890
liftIO $ enablePings c
891891
sendSMPCommand c NRMBackground (Just (C.APrivateAuthKey C.SEd25519 serviceKey)) serviceId subCmd >>= \case
892-
SOKS n -> pure n
892+
SOKS n idsHash -> pure (n, idsHash)
893893
r -> throwE $ unexpectedResponse r
894894
where
895895
subCmd :: Command p

src/Simplex/Messaging/Client/Agent.hs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -479,14 +479,14 @@ smpSubscribeService ca smp srv serviceSub@(serviceId, _) = case smpClientService
479479
(True <$ processSubscription r)
480480
(pure False)
481481
if ok
482-
then case r of
483-
Right n -> notify ca $ CAServiceSubscribed srv serviceSub n
482+
then case r of -- TODO [certs rcv] compare hash
483+
Right (n, _idsHash) -> notify ca $ CAServiceSubscribed srv serviceSub n
484484
Left e
485485
| smpClientServiceError e -> notifyUnavailable
486486
| temporaryClientError e -> reconnectClient ca srv
487487
| otherwise -> notify ca $ CAServiceSubError srv serviceSub e
488488
else reconnectClient ca srv
489-
processSubscription = mapM_ $ \n -> do
489+
processSubscription = mapM_ $ \(n, _idsHash) -> do -- TODO [certs rcv] validate hash here?
490490
setActiveServiceSub ca srv $ Just ((serviceId, n), sessId)
491491
setPendingServiceSub ca srv Nothing
492492
serviceAvailable THClientService {serviceRole, serviceId = serviceId'} =

src/Simplex/Messaging/Protocol.hs

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -139,6 +139,7 @@ module Simplex.Messaging.Protocol
139139
RcvMessage (..),
140140
MsgId,
141141
MsgBody,
142+
IdsHash,
142143
MaxMessageLen,
143144
MaxRcvMessageLen,
144145
EncRcvMsgBody (..),
@@ -692,11 +693,13 @@ data BrokerMsg where
692693
-- | Service subscription success - confirms when queue was associated with the service
693694
SOK :: Maybe ServiceId -> BrokerMsg
694695
-- | The number of queues subscribed with SUBS command
695-
SOKS :: Int64 -> BrokerMsg
696+
SOKS :: Int64 -> IdsHash -> BrokerMsg
696697
-- MSG v1/2 has to be supported for encoding/decoding
697698
-- v1: MSG :: MsgId -> SystemTime -> MsgBody -> BrokerMsg
698699
-- v2: MsgId -> SystemTime -> MsgFlags -> MsgBody -> BrokerMsg
699700
MSG :: RcvMessage -> BrokerMsg
701+
-- sent once delivering messages to SUBS command is complete
702+
SALL :: BrokerMsg
700703
NID :: NotifierId -> RcvNtfPublicDhKey -> BrokerMsg
701704
NMSG :: C.CbNonce -> EncNMsgMeta -> BrokerMsg
702705
-- Should include certificate chain
@@ -933,6 +936,7 @@ data BrokerMsgTag
933936
| SOK_
934937
| SOKS_
935938
| MSG_
939+
| SALL_
936940
| NID_
937941
| NMSG_
938942
| PKEY_
@@ -1025,6 +1029,7 @@ instance Encoding BrokerMsgTag where
10251029
SOK_ -> "SOK"
10261030
SOKS_ -> "SOKS"
10271031
MSG_ -> "MSG"
1032+
SALL_ -> "SALL"
10281033
NID_ -> "NID"
10291034
NMSG_ -> "NMSG"
10301035
PKEY_ -> "PKEY"
@@ -1046,6 +1051,7 @@ instance ProtocolMsgTag BrokerMsgTag where
10461051
"SOK" -> Just SOK_
10471052
"SOKS" -> Just SOKS_
10481053
"MSG" -> Just MSG_
1054+
"SALL" -> Just SALL_
10491055
"NID" -> Just NID_
10501056
"NMSG" -> Just NMSG_
10511057
"PKEY" -> Just PKEY_
@@ -1448,6 +1454,8 @@ type MsgId = ByteString
14481454
-- | SMP message body.
14491455
type MsgBody = ByteString
14501456

1457+
type IdsHash = ByteString
1458+
14511459
data ProtocolErrorType = PECmdSyntax | PECmdUnknown | PESession | PEBlock
14521460

14531461
-- | Type for protocol errors.
@@ -1807,9 +1815,12 @@ instance ProtocolEncoding SMPVersion ErrorType BrokerMsg where
18071815
SOK serviceId_
18081816
| v >= serviceCertsSMPVersion -> e (SOK_, ' ', serviceId_)
18091817
| otherwise -> e OK_ -- won't happen, the association with the service requires v >= serviceCertsSMPVersion
1810-
SOKS n -> e (SOKS_, ' ', n)
1818+
SOKS n idsHash
1819+
| v >= rcvServiceSMPVersion -> e (SOKS_, ' ', n, idsHash)
1820+
| otherwise -> e (SOKS_, ' ', n)
18111821
MSG RcvMessage {msgId, msgBody = EncRcvMsgBody body} ->
18121822
e (MSG_, ' ', msgId, Tail body)
1823+
SALL -> e SALL_
18131824
NID nId srvNtfDh -> e (NID_, ' ', nId, srvNtfDh)
18141825
NMSG nmsgNonce encNMsgMeta -> e (NMSG_, ' ', nmsgNonce, encNMsgMeta)
18151826
PKEY sid vr certKey -> e (PKEY_, ' ', sid, vr, certKey)
@@ -1836,6 +1847,7 @@ instance ProtocolEncoding SMPVersion ErrorType BrokerMsg where
18361847
MSG . RcvMessage msgId <$> bodyP
18371848
where
18381849
bodyP = EncRcvMsgBody . unTail <$> smpP
1850+
SALL_ -> pure SALL
18391851
IDS_
18401852
| v >= newNtfCredsSMPVersion -> ids smpP smpP smpP smpP
18411853
| v >= serviceCertsSMPVersion -> ids smpP smpP smpP nothing
@@ -1856,7 +1868,9 @@ instance ProtocolEncoding SMPVersion ErrorType BrokerMsg where
18561868
pure $ IDS QIK {rcvId, sndId, rcvPublicDhKey, queueMode, linkId, serviceId, serverNtfCreds}
18571869
LNK_ -> LNK <$> _smpP <*> smpP
18581870
SOK_ -> SOK <$> _smpP
1859-
SOKS_ -> SOKS <$> _smpP
1871+
SOKS_
1872+
| v >= rcvServiceSMPVersion -> SOKS <$> _smpP <*> smpP
1873+
| otherwise -> SOKS <$> _smpP <*> pure B.empty
18601874
NID_ -> NID <$> _smpP <*> smpP
18611875
NMSG_ -> NMSG <$> _smpP <*> smpP
18621876
PKEY_ -> PKEY <$> _smpP <*> smpP <*> smpP
@@ -1886,6 +1900,7 @@ instance ProtocolEncoding SMPVersion ErrorType BrokerMsg where
18861900
PONG -> noEntityMsg
18871901
PKEY {} -> noEntityMsg
18881902
RRES _ -> noEntityMsg
1903+
SALL -> noEntityMsg
18891904
-- other broker responses must have queue ID
18901905
_
18911906
| B.null entId -> Left $ CMD NO_ENTITY

src/Simplex/Messaging/Server.hs

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1790,17 +1790,18 @@ client
17901790
subscribeServiceMessages serviceId =
17911791
sharedSubscribeService SRecipientService serviceId subscribers serviceSubscribed serviceSubsCount >>= \case
17921792
Left e -> pure $ ERR e
1793-
Right (hasSub, count) -> do
1793+
Right (hasSub, (count, idsHash)) -> do
17941794
unless hasSub $ forkClient clnt "deliverServiceMessages" $ liftIO $ deliverServiceMessages count
1795-
pure $ SOKS count
1795+
pure $ SOKS count idsHash
17961796
where
17971797
deliverServiceMessages expectedCnt = do
17981798
(qCnt, _msgCnt, _dupCnt, _errCnt) <- foldRcvServiceMessages ms serviceId deliverQueueMsg (0, 0, 0, 0)
1799+
atomically $ writeTBQueue msgQ [(NoCorrId, NoEntity, SALL)]
17991800
-- TODO [cert rcv] compare with expected
18001801
logNote $ "Service subscriptions for " <> tshow serviceId <> " (" <> tshow qCnt <> " queues)"
18011802
deliverQueueMsg :: (Int, Int, Int, Int) -> RecipientId -> Either ErrorType (Maybe (QueueRec, Message)) -> IO (Int, Int, Int, Int)
18021803
deliverQueueMsg (!qCnt, !msgCnt, !dupCnt, !errCnt) rId = \case
1803-
Left e -> pure (qCnt + 1, msgCnt, dupCnt, errCnt + 1) -- TODO deliver subscription error
1804+
Left e -> pure (qCnt + 1, msgCnt, dupCnt, errCnt + 1) -- TODO [certs rcv] deliver subscription error
18041805
Right qMsg_ -> case qMsg_ of
18051806
Nothing -> pure (qCnt + 1, msgCnt, dupCnt, errCnt)
18061807
Just (qr, msg) ->
@@ -1823,23 +1824,23 @@ client
18231824

18241825
subscribeServiceNotifications :: ServiceId -> M s BrokerMsg
18251826
subscribeServiceNotifications serviceId =
1826-
either ERR (SOKS . snd) <$> sharedSubscribeService SNotifierService serviceId ntfSubscribers ntfServiceSubscribed ntfServiceSubsCount
1827+
either ERR (uncurry SOKS . snd) <$> sharedSubscribeService SNotifierService serviceId ntfSubscribers ntfServiceSubscribed ntfServiceSubsCount
18271828

1828-
sharedSubscribeService :: (PartyI p, ServiceParty p) => SParty p -> ServiceId -> ServerSubscribers s -> (Client s -> TVar Bool) -> (Client s -> TVar Int64) -> M s (Either ErrorType (Bool, Int64))
1829+
sharedSubscribeService :: (PartyI p, ServiceParty p) => SParty p -> ServiceId -> ServerSubscribers s -> (Client s -> TVar Bool) -> (Client s -> TVar Int64) -> M s (Either ErrorType (Bool, (Int64, IdsHash)))
18291830
sharedSubscribeService party serviceId srvSubscribers clientServiceSubscribed clientServiceSubs = do
18301831
subscribed <- readTVarIO $ clientServiceSubscribed clnt
18311832
liftIO $ runExceptT $
18321833
(subscribed,)
18331834
<$> if subscribed
1834-
then readTVarIO (clientServiceSubs clnt)
1835+
then (,B.empty) <$> readTVarIO (clientServiceSubs clnt) -- TODO [certs rcv] get IDs hash
18351836
else do
18361837
count' <- ExceptT $ getServiceQueueCount @(StoreQueue s) (queueStore ms) party serviceId
18371838
incCount <- atomically $ do
18381839
writeTVar (clientServiceSubscribed clnt) True
18391840
count <- swapTVar (clientServiceSubs clnt) count'
18401841
pure $ count' - count
18411842
atomically $ writeTQueue (subQ srvSubscribers) (CSService serviceId incCount, clientId)
1842-
pure count'
1843+
pure (count', B.empty) -- TODO [certs rcv] get IDs hash
18431844

18441845
acknowledgeMsg :: MsgId -> StoreQueue s -> QueueRec -> M s (Transmission BrokerMsg)
18451846
acknowledgeMsg msgId q qr =

0 commit comments

Comments
 (0)