Skip to content

Commit 5f8f07a

Browse files
committed
Add a bad integration test for zero-downtime patches
I think it's best to test this issue through integration tests, but there seems to be no way to clear the server-side prepared statements without access to the raw packet stream, as they're not named and thus cannot be deleted via DEALLOCATE PREPARED. To make the test work, additional interfaces have to be made available, which should not be shipped as part of the library for regular use.
1 parent c703015 commit 5f8f07a

File tree

4 files changed

+38
-1
lines changed

4 files changed

+38
-1
lines changed

sqlx-core/src/common/statement_cache.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,4 +72,8 @@ impl<T> StatementCache<T> {
7272
pub fn is_enabled(&self) -> bool {
7373
self.capacity() > 0
7474
}
75+
76+
pub fn iter(&self) -> impl Iterator<Item=&T> {
77+
self.inner.iter().map(|(_, v)| v)
78+
}
7579
}

sqlx-mysql/src/connection/executor.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -183,7 +183,7 @@ impl MySqlConnection {
183183
loop {
184184
// query response is a meta-packet which may be one of:
185185
// Ok, Err, ResultSet, or (unhandled) LocalInfileRequest
186-
let mut packet = self.inner.stream.recv_packet().await.inspect_err(|_| {
186+
let mut packet = self.inner.stream.recv_packet().await.inspect_err(|e| {
187187
// if a prepared statement vanished on the server side, we get an error here
188188
// clear the statement cache in case the connection got reset to cause re-preparing
189189
self.inner.cache_statement.clear();

sqlx-mysql/src/connection/mod.rs

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,19 @@ impl MySqlConnection {
5757
.status_flags
5858
.intersects(Status::SERVER_STATUS_IN_TRANS)
5959
}
60+
61+
pub async fn nuke_cached_statements(&mut self) -> Result<(), Error> {
62+
for (statement_id, _) in self.inner.cache_statement.iter() {
63+
self.inner
64+
.stream
65+
.send_packet(StmtClose {
66+
statement: *statement_id,
67+
})
68+
.await?;
69+
}
70+
71+
Ok(())
72+
}
6073
}
6174

6275
impl Debug for MySqlConnection {

tests/mysql/mysql.rs

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,26 @@ async fn it_maths() -> anyhow::Result<()> {
5454
Ok(())
5555
}
5656

57+
#[sqlx_macros::test]
58+
async fn it_clears_statement_cache_on_error() -> anyhow::Result<()> {
59+
setup_if_needed();
60+
61+
let query = "SELECT 1";
62+
63+
let mut conn = new::<MySql>().await?;
64+
let _ = sqlx::query(query).fetch_one(&mut conn).await?;
65+
assert_eq!(1, conn.cached_statements_size());
66+
67+
// clear cached statements only on the server side
68+
conn.nuke_cached_statements().await?;
69+
assert_eq!(1, conn.cached_statements_size());
70+
71+
assert!(sqlx::query(query).fetch_one(&mut conn).await.is_err());
72+
assert_eq!(0, conn.cached_statements_size());
73+
74+
Ok(())
75+
}
76+
5777
#[sqlx_macros::test]
5878
async fn it_can_fail_at_querying() -> anyhow::Result<()> {
5979
let mut conn = new::<MySql>().await?;

0 commit comments

Comments
 (0)