Skip to content

Commit b7db588

Browse files
committed
tests: introduce new test for compression
test are trying both algorithms (snappy, lz4), while counting the size of the frames coming into scylla for reference we also have one test doing the same without compression test main structure was borrowed from java-driver, the usage of proxy to count the packets is unique to this change.
1 parent 86efc40 commit b7db588

File tree

2 files changed

+115
-0
lines changed

2 files changed

+115
-0
lines changed
Lines changed: 114 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,114 @@
1+
use crate::utils::{test_with_3_node_cluster, unique_keyspace_name, PerformDDL};
2+
use scylla::client::session::Session;
3+
use scylla::client::session_builder::SessionBuilder;
4+
use scylla::statement::query::Query;
5+
use scylla::frame::Compression;
6+
7+
use scylla_proxy::{ProxyError, WorkerError, ShardAwareness, RequestRule, Reaction, RequestReaction, Condition, RequestOpcode};
8+
9+
use tokio::sync::mpsc;
10+
use std::sync::Arc;
11+
12+
async fn test_compression(compression: Option<Compression>, text_size: usize, expected_frame_total_size_range: std::ops::Range<usize>) {
13+
let res = test_with_3_node_cluster(ShardAwareness::QueryNode, |proxy_uris, translation_map, mut running_proxy| async move {
14+
15+
let request_rule = |tx| {
16+
RequestRule(
17+
Condition::or(Condition::RequestOpcode(RequestOpcode::Execute), Condition::RequestOpcode(RequestOpcode::Query)),
18+
RequestReaction::noop().with_feedback_when_performed(tx),
19+
)
20+
};
21+
22+
let (request_tx, mut request_rx) = mpsc::unbounded_channel();
23+
for running_node in running_proxy.running_nodes.iter_mut() {
24+
running_node.change_request_rules(Some(vec![request_rule(request_tx.clone())]));
25+
}
26+
27+
let session: Session = SessionBuilder::new()
28+
.known_node(proxy_uris[0].as_str())
29+
.address_translator(Arc::new(translation_map))
30+
.compression(compression)
31+
.build()
32+
.await
33+
.unwrap();
34+
35+
let ks = unique_keyspace_name();
36+
session.ddl(format!("CREATE KEYSPACE IF NOT EXISTS {} WITH REPLICATION = {{'class' : 'NetworkTopologyStrategy', 'replication_factor' : 3}}", ks)).await.unwrap();
37+
session.use_keyspace(ks, false).await.unwrap();
38+
session
39+
.ddl("CREATE TABLE test (k text PRIMARY KEY, t text, i int, f float)")
40+
.await
41+
.unwrap();
42+
43+
let q = Query::from("INSERT INTO test (k, t, i, f) VALUES (?, ?, ?, ?)");
44+
let large_string = "a".repeat(text_size);
45+
session.query_unpaged(q.clone(), ("key", large_string.clone(), 42_i32, 24.03_f32)).await.unwrap();
46+
47+
let result: Vec<(String, String, i32, f32)> = session
48+
.query_unpaged("SELECT k, t, i, f FROM test WHERE k = 'key'", &[])
49+
.await
50+
.unwrap()
51+
.into_rows_result()
52+
.unwrap()
53+
.rows::<(String, String, i32, f32)>()
54+
.unwrap()
55+
.collect::<Result<_, _>>()
56+
.unwrap();
57+
58+
assert_eq!(result, vec![((String::from("key"), String::from(large_string), 42_i32, 24.03_f32))]);
59+
60+
61+
let mut total_frame_size = 0;
62+
while let Ok((request_frame, _shard)) = request_rx.try_recv() {
63+
total_frame_size += request_frame.body.len();
64+
}
65+
println!("Total frame size: {}", total_frame_size);
66+
assert!(expected_frame_total_size_range.contains(&total_frame_size));
67+
68+
running_proxy
69+
70+
}).await;
71+
72+
match res {
73+
Ok(()) => (),
74+
Err(ProxyError::Worker(WorkerError::DriverDisconnected(_))) => (),
75+
Err(err) => panic!("{}", err),
76+
}
77+
78+
}
79+
80+
#[tokio::test]
81+
#[cfg(not(scylla_cloud_tests))]
82+
async fn should_execute_queries_without_compression() {
83+
test_compression(None, 1024, 4200..5000).await;
84+
}
85+
86+
#[tokio::test]
87+
#[cfg(not(scylla_cloud_tests))]
88+
async fn should_execute_queries_without_compression_10mb() {
89+
test_compression(None, 1024 * 10, 13300..15000).await;
90+
}
91+
92+
#[tokio::test]
93+
#[cfg(not(scylla_cloud_tests))]
94+
async fn should_execute_queries_with_snappy_compression() {
95+
test_compression(Some(Compression::Snappy), 1024, 3100..4000).await;
96+
}
97+
98+
#[tokio::test]
99+
#[cfg(not(scylla_cloud_tests))]
100+
async fn should_execute_queries_with_snappy_compression_10mb() {
101+
test_compression(Some(Compression::Snappy), 1024 * 10, 3300..4000).await;
102+
}
103+
104+
#[tokio::test]
105+
#[cfg(not(scylla_cloud_tests))]
106+
async fn should_execute_queries_with_lz4_compression(){
107+
test_compression(Some(Compression::Lz4), 1024, 3100..4000).await;
108+
}
109+
110+
#[tokio::test]
111+
#[cfg(not(scylla_cloud_tests))]
112+
async fn should_execute_queries_with_lz4_compression_10mb(){
113+
test_compression(Some(Compression::Lz4), 1024 * 10, 3300..4000).await;
114+
}

scylla/tests/integration/main.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,4 +18,5 @@ mod silent_prepare_batch;
1818
mod silent_prepare_query;
1919
mod skip_metadata_optimization;
2020
mod tablets;
21+
mod compression;
2122
pub(crate) mod utils;

0 commit comments

Comments
 (0)