Skip to content

Commit df99d2f

Browse files
committed
feat(state-stores): Add StateStore::upsert_thread_subscriptions() method for bulk upsert
1 parent 10ff5d0 commit df99d2f

File tree

8 files changed

+332
-1
lines changed

8 files changed

+332
-1
lines changed

crates/matrix-sdk-base/CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@ All notable changes to this project will be documented in this file.
2020
([#5817](https://github.com/matrix-org/matrix-rust-sdk/pull/5817))
2121
- `ComposerDraft` can now store attachments alongside text messages.
2222
([#5794](https://github.com/matrix-org/matrix-rust-sdk/pull/5794))
23+
- Add `StateStore::upsert_thread_subscriptions()` method for bulk upserts.
24+
([#5848](https://github.com/matrix-org/matrix-rust-sdk/pull/5848))
2325

2426
## [0.14.1] - 2025-09-10
2527

crates/matrix-sdk-base/src/store/integration_tests.rs

Lines changed: 185 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -109,6 +109,8 @@ pub trait StateStoreIntegrationTests {
109109
async fn test_thread_subscriptions(&self) -> TestResult;
110110
/// Test thread subscription bumpstamp semantics.
111111
async fn test_thread_subscriptions_bumpstamps(&self) -> TestResult;
112+
/// Test thread subscriptions bulk upsert, including bumpstamp semantics.
113+
async fn test_thread_subscriptions_bulk_upsert(&self) -> TestResult;
112114
}
113115

114116
impl StateStoreIntegrationTests for DynStateStore {
@@ -1955,6 +1957,183 @@ impl StateStoreIntegrationTests for DynStateStore {
19551957

19561958
Ok(())
19571959
}
1960+
1961+
async fn test_thread_subscriptions_bulk_upsert(&self) -> TestResult {
1962+
let threads = [
1963+
event_id!("$t1"),
1964+
event_id!("$t2"),
1965+
event_id!("$t3"),
1966+
event_id!("$t4"),
1967+
event_id!("$t5"),
1968+
event_id!("$t6"),
1969+
];
1970+
// Helper for building the input for `upsert_thread_subscriptions()`,
1971+
// which is of the type: Vec<(&RoomId, &EventId, StoredThreadSubscription)>
1972+
let build_subscription_updates = |subs: &[StoredThreadSubscription]| {
1973+
threads
1974+
.iter()
1975+
.zip(subs)
1976+
.map(|(&event_id, &sub)| (room_id(), event_id, sub))
1977+
.collect::<Vec<_>>()
1978+
};
1979+
1980+
// Test bump_stamp logic
1981+
let initial_subscriptions = build_subscription_updates(&[
1982+
StoredThreadSubscription {
1983+
status: ThreadSubscriptionStatus::Unsubscribed,
1984+
bump_stamp: None,
1985+
},
1986+
StoredThreadSubscription {
1987+
status: ThreadSubscriptionStatus::Unsubscribed,
1988+
bump_stamp: Some(14),
1989+
},
1990+
StoredThreadSubscription {
1991+
status: ThreadSubscriptionStatus::Unsubscribed,
1992+
bump_stamp: None,
1993+
},
1994+
StoredThreadSubscription {
1995+
status: ThreadSubscriptionStatus::Unsubscribed,
1996+
bump_stamp: Some(210),
1997+
},
1998+
StoredThreadSubscription {
1999+
status: ThreadSubscriptionStatus::Unsubscribed,
2000+
bump_stamp: Some(5),
2001+
},
2002+
StoredThreadSubscription {
2003+
status: ThreadSubscriptionStatus::Unsubscribed,
2004+
bump_stamp: Some(100),
2005+
},
2006+
]);
2007+
2008+
let update_subscriptions = build_subscription_updates(&[
2009+
StoredThreadSubscription {
2010+
status: ThreadSubscriptionStatus::Subscribed { automatic: true },
2011+
bump_stamp: None,
2012+
},
2013+
StoredThreadSubscription {
2014+
status: ThreadSubscriptionStatus::Subscribed { automatic: true },
2015+
bump_stamp: None,
2016+
},
2017+
StoredThreadSubscription {
2018+
status: ThreadSubscriptionStatus::Subscribed { automatic: true },
2019+
bump_stamp: Some(1101),
2020+
},
2021+
StoredThreadSubscription {
2022+
status: ThreadSubscriptionStatus::Subscribed { automatic: true },
2023+
bump_stamp: Some(222),
2024+
},
2025+
StoredThreadSubscription {
2026+
status: ThreadSubscriptionStatus::Subscribed { automatic: true },
2027+
bump_stamp: Some(1),
2028+
},
2029+
StoredThreadSubscription {
2030+
status: ThreadSubscriptionStatus::Subscribed { automatic: true },
2031+
bump_stamp: Some(100),
2032+
},
2033+
]);
2034+
2035+
let expected_subscriptions = build_subscription_updates(&[
2036+
// Status should be updated, because prev and new bump_stamp are both None
2037+
StoredThreadSubscription {
2038+
status: ThreadSubscriptionStatus::Subscribed { automatic: true },
2039+
bump_stamp: None,
2040+
},
2041+
// Status should be updated, but keep initial bump_stamp (new is None)
2042+
StoredThreadSubscription {
2043+
status: ThreadSubscriptionStatus::Subscribed { automatic: true },
2044+
bump_stamp: Some(14),
2045+
},
2046+
// Status should be updated and also bump_stamp should be updated (inital was None)
2047+
StoredThreadSubscription {
2048+
status: ThreadSubscriptionStatus::Subscribed { automatic: true },
2049+
bump_stamp: Some(1101),
2050+
},
2051+
// Status should be updated and also bump_stamp should be updated (inital was lower)
2052+
StoredThreadSubscription {
2053+
status: ThreadSubscriptionStatus::Subscribed { automatic: true },
2054+
bump_stamp: Some(222),
2055+
},
2056+
// Status shouldn't change, as new bump_stamp is lower
2057+
StoredThreadSubscription {
2058+
status: ThreadSubscriptionStatus::Unsubscribed,
2059+
bump_stamp: Some(5),
2060+
},
2061+
// Status shouldn't change, as bump_stamp is equal to the previous one
2062+
StoredThreadSubscription {
2063+
status: ThreadSubscriptionStatus::Unsubscribed,
2064+
bump_stamp: Some(100),
2065+
},
2066+
]);
2067+
2068+
// Set the initial subscriptions
2069+
self.upsert_thread_subscriptions(initial_subscriptions.clone()).await?;
2070+
2071+
// Assert the subscriptions have been added
2072+
for (room_id, thread_id, expected_sub) in &initial_subscriptions {
2073+
let stored_subscription = self.load_thread_subscription(room_id, thread_id).await?;
2074+
assert_eq!(stored_subscription, Some(*expected_sub));
2075+
}
2076+
2077+
// Update subscriptions
2078+
self.upsert_thread_subscriptions(update_subscriptions).await?;
2079+
2080+
// Assert the expected subscriptions and bump_stamps
2081+
for (room_id, thread_id, expected_sub) in &expected_subscriptions {
2082+
let stored_subscription = self.load_thread_subscription(room_id, thread_id).await?;
2083+
assert_eq!(stored_subscription, Some(*expected_sub));
2084+
}
2085+
2086+
// Test just state changes, but first remove previous subscriptions
2087+
for (room_id, thread_id, _) in &expected_subscriptions {
2088+
self.remove_thread_subscription(room_id, thread_id).await?;
2089+
}
2090+
2091+
let initial_subscriptions = build_subscription_updates(&[
2092+
StoredThreadSubscription {
2093+
status: ThreadSubscriptionStatus::Unsubscribed,
2094+
bump_stamp: Some(1),
2095+
},
2096+
StoredThreadSubscription {
2097+
status: ThreadSubscriptionStatus::Subscribed { automatic: false },
2098+
bump_stamp: Some(1),
2099+
},
2100+
StoredThreadSubscription {
2101+
status: ThreadSubscriptionStatus::Subscribed { automatic: true },
2102+
bump_stamp: Some(1),
2103+
},
2104+
]);
2105+
2106+
self.upsert_thread_subscriptions(initial_subscriptions.clone()).await?;
2107+
2108+
for (room_id, thread_id, expected_sub) in &initial_subscriptions {
2109+
let stored_subscription = self.load_thread_subscription(room_id, thread_id).await?;
2110+
assert_eq!(stored_subscription, Some(*expected_sub));
2111+
}
2112+
2113+
let update_subscriptions = build_subscription_updates(&[
2114+
StoredThreadSubscription {
2115+
status: ThreadSubscriptionStatus::Subscribed { automatic: true },
2116+
bump_stamp: Some(2),
2117+
},
2118+
StoredThreadSubscription {
2119+
status: ThreadSubscriptionStatus::Unsubscribed,
2120+
bump_stamp: Some(2),
2121+
},
2122+
StoredThreadSubscription {
2123+
status: ThreadSubscriptionStatus::Subscribed { automatic: false },
2124+
bump_stamp: Some(2),
2125+
},
2126+
]);
2127+
2128+
self.upsert_thread_subscriptions(update_subscriptions.clone()).await?;
2129+
2130+
for (room_id, thread_id, expected_sub) in &update_subscriptions {
2131+
let stored_subscription = self.load_thread_subscription(room_id, thread_id).await?;
2132+
assert_eq!(stored_subscription, Some(*expected_sub));
2133+
}
2134+
2135+
Ok(())
2136+
}
19582137
}
19592138

19602139
/// Macro building to allow your StateStore implementation to run the entire
@@ -2141,6 +2320,12 @@ macro_rules! statestore_integration_tests {
21412320
let store = get_store().await?.into_state_store();
21422321
store.test_thread_subscriptions_bumpstamps().await
21432322
}
2323+
2324+
#[async_test]
2325+
async fn test_thread_subscriptions_bulk_upsert() -> TestResult {
2326+
let store = get_store().await?.into_state_store();
2327+
store.test_thread_subscriptions_bulk_upsert().await
2328+
}
21442329
}
21452330
};
21462331
}

crates/matrix-sdk-base/src/store/memory_store.rs

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1005,6 +1005,33 @@ impl StateStore for MemoryStore {
10051005
Ok(())
10061006
}
10071007

1008+
async fn upsert_thread_subscriptions(
1009+
&self,
1010+
updates: Vec<(&RoomId, &EventId, StoredThreadSubscription)>,
1011+
) -> Result<(), Self::Error> {
1012+
let mut inner = self.inner.write().unwrap();
1013+
1014+
for (room_id, thread_id, mut new) in updates {
1015+
let room_subs = inner.thread_subscriptions.entry(room_id.to_owned()).or_default();
1016+
1017+
if let Some(previous) = room_subs.get(thread_id) {
1018+
if *previous == new {
1019+
continue;
1020+
}
1021+
if !compare_thread_subscription_bump_stamps(
1022+
previous.bump_stamp,
1023+
&mut new.bump_stamp,
1024+
) {
1025+
continue;
1026+
}
1027+
}
1028+
1029+
room_subs.insert(thread_id.to_owned(), new);
1030+
}
1031+
1032+
Ok(())
1033+
}
1034+
10081035
async fn load_thread_subscription(
10091036
&self,
10101037
room: &RoomId,

crates/matrix-sdk-base/src/store/traits.rs

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -487,7 +487,7 @@ pub trait StateStore: AsyncTraitDeps {
487487
/// bumpstamp is kept.
488488
///
489489
/// If the new thread subscription has a bumpstamp that's lower than or
490-
/// equal to a previously one, the existing subscription is kept, i.e.
490+
/// equal to a previous one, the existing subscription is kept, i.e.
491491
/// this method must have no effect.
492492
async fn upsert_thread_subscription(
493493
&self,
@@ -496,6 +496,20 @@ pub trait StateStore: AsyncTraitDeps {
496496
subscription: StoredThreadSubscription,
497497
) -> Result<(), Self::Error>;
498498

499+
/// Inserts or updates multiple thread subscriptions.
500+
///
501+
/// If the new thread subscription hasn't set a bumpstamp, and there was a
502+
/// previous subscription in the database with a bumpstamp, the existing
503+
/// bumpstamp is kept.
504+
///
505+
/// If the new thread subscription has a bumpstamp that's lower than or
506+
/// equal to a previous one, the existing subscription is kept, i.e.
507+
/// this method must have no effect.
508+
async fn upsert_thread_subscriptions(
509+
&self,
510+
updates: Vec<(&RoomId, &EventId, StoredThreadSubscription)>,
511+
) -> Result<(), Self::Error>;
512+
499513
/// Remove a previous thread subscription for a given room and thread.
500514
///
501515
/// Note: removing an unknown thread subscription is a no-op.
@@ -817,6 +831,13 @@ impl<T: StateStore> StateStore for EraseStateStoreError<T> {
817831
self.0.upsert_thread_subscription(room, thread_id, subscription).await.map_err(Into::into)
818832
}
819833

834+
async fn upsert_thread_subscriptions(
835+
&self,
836+
updates: Vec<(&RoomId, &EventId, StoredThreadSubscription)>,
837+
) -> Result<(), Self::Error> {
838+
self.0.upsert_thread_subscriptions(updates).await.map_err(Into::into)
839+
}
840+
820841
async fn load_thread_subscription(
821842
&self,
822843
room: &RoomId,

crates/matrix-sdk-indexeddb/CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@ All notable changes to this project will be documented in this file.
1212
([#5819](https://github.com/matrix-org/matrix-rust-sdk/pull/5819))
1313
- [**breaking**] `IndexeddbCryptoStore::get_withheld_info` now returns `Result<Option<RoomKeyWithheldEntry>, ...>`.
1414
([#5737](https://github.com/matrix-org/matrix-rust-sdk/pull/5737))
15+
- Implement `StateStore::upsert_thread_subscriptions()` method for bulk upserts.
16+
([#5848](https://github.com/matrix-org/matrix-rust-sdk/pull/5848))
1517

1618
### Performance
1719

crates/matrix-sdk-indexeddb/src/state_store/mod.rs

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1964,6 +1964,47 @@ impl_state_store!({
19641964
Ok(())
19651965
}
19661966

1967+
async fn upsert_thread_subscriptions(
1968+
&self,
1969+
updates: Vec<(&RoomId, &EventId, StoredThreadSubscription)>,
1970+
) -> Result<()> {
1971+
let tx = self
1972+
.inner
1973+
.transaction(keys::THREAD_SUBSCRIPTIONS)
1974+
.with_mode(TransactionMode::Readwrite)
1975+
.build()?;
1976+
let obj = tx.object_store(keys::THREAD_SUBSCRIPTIONS)?;
1977+
1978+
for (room_id, thread_id, subscription) in updates {
1979+
let encoded_key = self.encode_key(keys::THREAD_SUBSCRIPTIONS, (room_id, thread_id));
1980+
let mut new = PersistedThreadSubscription::from(subscription);
1981+
1982+
// See if there's a previous subscription.
1983+
if let Some(previous_value) = obj.get(&encoded_key).await? {
1984+
let previous: PersistedThreadSubscription =
1985+
self.deserialize_value(&previous_value)?;
1986+
1987+
// If the previous status is the same as the new one, don't do anything.
1988+
if new == previous {
1989+
continue;
1990+
}
1991+
if !compare_thread_subscription_bump_stamps(
1992+
previous.bump_stamp,
1993+
&mut new.bump_stamp,
1994+
) {
1995+
continue;
1996+
}
1997+
}
1998+
1999+
let serialized_value = self.serialize_value(&new);
2000+
obj.put(&serialized_value?).with_key(encoded_key).build()?;
2001+
}
2002+
2003+
tx.commit().await?;
2004+
2005+
Ok(())
2006+
}
2007+
19672008
async fn load_thread_subscription(
19682009
&self,
19692010
room: &RoomId,

crates/matrix-sdk-sqlite/CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@ All notable changes to this project will be documented in this file.
1515

1616
- Implement a new constructor that allows to open `SqliteCryptoStore` with a cryptographic key
1717
([#5472](https://github.com/matrix-org/matrix-rust-sdk/pull/5472))
18+
- Implement `StateStore::upsert_thread_subscriptions()` method for bulk upserts.
19+
([#5848](https://github.com/matrix-org/matrix-rust-sdk/pull/5848))
1820

1921
### Refactor
2022
- [breaking] Change the logic for opening a store so as to use a `Secret` enum in the function `open_with_pool` instead of a `passphrase`

0 commit comments

Comments
 (0)