From 6b34ce7edc3ac15406bfe726805a1cb096e37a06 Mon Sep 17 00:00:00 2001 From: lance6716 Date: Mon, 25 Aug 2025 16:28:05 +0800 Subject: [PATCH 1/8] split batches for RawKV requests Signed-off-by: lance6716 --- examples/bench_batch_put.rs | 91 +++++++++++++++++++++++++++++++++++++ src/raw/requests.rs | 34 +++++++++++++- src/request/mod.rs | 3 +- src/request/plan.rs | 5 +- src/request/shard.rs | 17 +++++++ 5 files changed, 145 insertions(+), 5 deletions(-) create mode 100644 examples/bench_batch_put.rs diff --git a/examples/bench_batch_put.rs b/examples/bench_batch_put.rs new file mode 100644 index 00000000..223afa61 --- /dev/null +++ b/examples/bench_batch_put.rs @@ -0,0 +1,91 @@ +// Copyright 2018 TiKV Project Authors. Licensed under Apache-2.0. + +#![type_length_limit = "8165158"] + +mod common; + +use std::time::Instant; +use tikv_client::Config; +use tikv_client::KvPair; +use tikv_client::RawClient as Client; +use tikv_client::Result; + +use crate::common::parse_args; + +const TARGET_SIZE_MB: usize = 40; +const KEY_SIZE: usize = 32; +const VALUE_SIZE: usize = 1024; // 1KB per value + +#[tokio::main] +async fn main() -> Result<()> { + env_logger::init(); + + // Parse command line arguments + let args = parse_args("raw"); + + // Create a configuration to use for the example + let config = if let (Some(ca), Some(cert), Some(key)) = (args.ca, args.cert, args.key) { + Config::default().with_security(ca, cert, key) + } else { + Config::default() + }; + + // Create the client + let client = Client::new_with_config(args.pd, config).await?; + + // Calculate how many key-value pairs we need to reach 100MB + let pair_size = KEY_SIZE + VALUE_SIZE; + let target_size_bytes = TARGET_SIZE_MB * 1024 * 1024; + let num_pairs = target_size_bytes / pair_size; + + println!("Preparing to create {} key-value pairs", num_pairs); + println!("Key size: {} bytes, Value size: {} bytes", KEY_SIZE, VALUE_SIZE); + println!("Total data size: ~{} MB", (num_pairs * pair_size) / (1024 * 1024)); + + // Generate key-value pairs + println!("Generating key-value pairs..."); + let generation_start = Instant::now(); + + let mut pairs = Vec::with_capacity(num_pairs); + for i in 0..num_pairs { + // Generate key: "bench_key_" + zero-padded number + let key = format!("bench_key_{:010}", i); + + // Generate value: repeat pattern to reach VALUE_SIZE + let pattern = format!("value_{}", i % 1000); + let mut value = String::new(); + while value.len() < VALUE_SIZE { + value.push_str(&pattern); + } + value.truncate(VALUE_SIZE); + + pairs.push(KvPair::from((key, value))); + } + + let generation_duration = generation_start.elapsed(); + println!("Generated {} pairs in {:?}", pairs.len(), generation_duration); + + // Perform batch_put and measure timing + println!("Starting batch_put operation..."); + let batch_put_start = Instant::now(); + + client.batch_put(pairs).await.expect("Failed to perform batch_put"); + + let batch_put_duration = batch_put_start.elapsed(); + + // Calculate statistics + let total_bytes = num_pairs * pair_size; + let throughput_mb_per_sec = (total_bytes as f64 / (1024.0 * 1024.0)) / batch_put_duration.as_secs_f64(); + let ops_per_sec = num_pairs as f64 / batch_put_duration.as_secs_f64(); + + // Print results + println!("\n=== Batch Put Benchmark Results ==="); + println!("Total key-value pairs: {}", num_pairs); + println!("Total data size: {:.2} MB", total_bytes as f64 / (1024.0 * 1024.0)); + println!("Batch put duration: {:?}", batch_put_duration); + println!("Throughput: {:.2} MB/s", throughput_mb_per_sec); + 
println!("Operations per second: {:.2} ops/s", ops_per_sec); + println!("Average latency per operation: {:.2} μs", batch_put_duration.as_micros() as f64 / num_pairs as f64); + + Ok(()) +} diff --git a/src/raw/requests.rs b/src/raw/requests.rs index f6f32f3d..4de8df79 100644 --- a/src/raw/requests.rs +++ b/src/raw/requests.rs @@ -10,7 +10,7 @@ use crate::proto::tikvpb::tikv_client::TikvClient; use crate::range_request; use crate::region::RegionWithLeader; use crate::request::plan::ResponseWithShard; -use crate::request::Collect; +use crate::request::{Batchable, Collect}; use crate::request::CollectSingle; use crate::request::DefaultProcessor; use crate::request::KvRequest; @@ -39,8 +39,11 @@ use std::any::Any; use std::ops::Range; use std::sync::Arc; use std::time::Duration; +use futures::{stream, StreamExt}; use tonic::transport::Channel; +const RAW_KV_REQUEST_BATCH_SIZE: u64 = 16 * 1024; // 16 KB + pub fn new_raw_get_request(key: Vec, cf: Option) -> kvrpcpb::RawGetRequest { let mut req = kvrpcpb::RawGetRequest::default(); req.key = key; @@ -188,6 +191,14 @@ impl KvRequest for kvrpcpb::RawBatchPutRequest { type Response = kvrpcpb::RawBatchPutResponse; } +impl Batchable for kvrpcpb::RawBatchPutRequest { + type Item = (kvrpcpb::KvPair, u64); + + fn item_size(item: &Self::Item) -> u64 { + (item.0.key.len() + item.0.value.len()) as u64 + } +} + impl Shardable for kvrpcpb::RawBatchPutRequest { type Shard = Vec<(kvrpcpb::KvPair, u64)>; @@ -204,6 +215,15 @@ impl Shardable for kvrpcpb::RawBatchPutRequest { .collect(); kv_ttl.sort_by(|a, b| a.0.key.cmp(&b.0.key)); region_stream_for_keys(kv_ttl.into_iter(), pd_client.clone()) + .flat_map(|result| match result { + Ok((keys, region)) => { + stream::iter(kvrpcpb::RawBatchPutRequest::batches(keys, RAW_KV_REQUEST_BATCH_SIZE)) + .map(move |batch| Ok((batch, region.clone()))) + .boxed() + } + Err(e) => stream::iter(Err(e)).boxed(), + }) + .boxed() } fn apply_shard(&mut self, shard: Self::Shard) { @@ -212,6 +232,18 @@ impl Shardable for kvrpcpb::RawBatchPutRequest { self.ttls = ttls; } + fn clone_then_apply_shard(&self, shard: Self::Shard) -> Self + where + Self: Sized + Clone, + { + let mut cloned = Self::default(); + cloned.context = self.context.clone(); + cloned.cf = self.cf.clone(); + cloned.for_cas = self.for_cas; + cloned.apply_shard(shard); + cloned + } + fn apply_store(&mut self, store: &RegionStore) -> Result<()> { self.set_leader(&store.region_with_leader) } diff --git a/src/request/mod.rs b/src/request/mod.rs index a9913e74..558fdd43 100644 --- a/src/request/mod.rs +++ b/src/request/mod.rs @@ -130,7 +130,8 @@ mod test { impl HasLocks for MockRpcResponse {} #[derive(Clone)] - struct MockKvRequest { + #[derive(Default)] +struct MockKvRequest { test_invoking_count: Arc, } diff --git a/src/request/plan.rs b/src/request/plan.rs index be915f77..c4a1a368 100644 --- a/src/request/plan.rs +++ b/src/request/plan.rs @@ -117,11 +117,10 @@ where ) -> Result<::Result> { let shards = current_plan.shards(&pd_client).collect::>().await; debug!("single_plan_handler, shards: {}", shards.len()); - let mut handles = Vec::new(); + let mut handles = Vec::with_capacity(shards.len()); for shard in shards { let (shard, region) = shard?; - let mut clone = current_plan.clone(); - clone.apply_shard(shard); + let clone = current_plan.clone_then_apply_shard(shard); let handle = tokio::spawn(Self::single_shard_handler( pd_client.clone(), clone, diff --git a/src/request/shard.rs b/src/request/shard.rs index a68f446a..2c03c10b 100644 --- a/src/request/shard.rs +++ 
b/src/request/shard.rs @@ -48,6 +48,16 @@ pub trait Shardable { fn apply_shard(&mut self, shard: Self::Shard); + /// Implementation can skip unnecessary fields clone if fields will be overwritten by `apply_shard`. + fn clone_then_apply_shard(&self, shard: Self::Shard) -> Self + where + Self: Sized + Clone, + { + let mut cloned = self.clone(); + cloned.apply_shard(shard); + cloned + } + fn apply_store(&mut self, store: &RegionStore) -> Result<()>; } @@ -103,6 +113,13 @@ impl Shardable for Dispatch { self.request.apply_shard(shard); } + fn clone_then_apply_shard(&self, shard: Self::Shard) -> Self + where + Self: Sized + Clone, + { + Dispatch{ request: self.request.clone_then_apply_shard(shard), kv_client: self.kv_client.clone() } + } + fn apply_store(&mut self, store: &RegionStore) -> Result<()> { self.kv_client = Some(store.client.clone()); self.request.apply_store(store) From 3b0102e33470b85c87b2f0e70506fed3fbf43901 Mon Sep 17 00:00:00 2001 From: lance6716 Date: Mon, 25 Aug 2025 16:55:34 +0800 Subject: [PATCH 2/8] add test Signed-off-by: lance6716 --- examples/bench_batch_put.rs | 91 ------------------------------------- tests/integration_tests.rs | 34 ++++++++++++++ 2 files changed, 34 insertions(+), 91 deletions(-) delete mode 100644 examples/bench_batch_put.rs diff --git a/examples/bench_batch_put.rs b/examples/bench_batch_put.rs deleted file mode 100644 index 223afa61..00000000 --- a/examples/bench_batch_put.rs +++ /dev/null @@ -1,91 +0,0 @@ -// Copyright 2018 TiKV Project Authors. Licensed under Apache-2.0. - -#![type_length_limit = "8165158"] - -mod common; - -use std::time::Instant; -use tikv_client::Config; -use tikv_client::KvPair; -use tikv_client::RawClient as Client; -use tikv_client::Result; - -use crate::common::parse_args; - -const TARGET_SIZE_MB: usize = 40; -const KEY_SIZE: usize = 32; -const VALUE_SIZE: usize = 1024; // 1KB per value - -#[tokio::main] -async fn main() -> Result<()> { - env_logger::init(); - - // Parse command line arguments - let args = parse_args("raw"); - - // Create a configuration to use for the example - let config = if let (Some(ca), Some(cert), Some(key)) = (args.ca, args.cert, args.key) { - Config::default().with_security(ca, cert, key) - } else { - Config::default() - }; - - // Create the client - let client = Client::new_with_config(args.pd, config).await?; - - // Calculate how many key-value pairs we need to reach 100MB - let pair_size = KEY_SIZE + VALUE_SIZE; - let target_size_bytes = TARGET_SIZE_MB * 1024 * 1024; - let num_pairs = target_size_bytes / pair_size; - - println!("Preparing to create {} key-value pairs", num_pairs); - println!("Key size: {} bytes, Value size: {} bytes", KEY_SIZE, VALUE_SIZE); - println!("Total data size: ~{} MB", (num_pairs * pair_size) / (1024 * 1024)); - - // Generate key-value pairs - println!("Generating key-value pairs..."); - let generation_start = Instant::now(); - - let mut pairs = Vec::with_capacity(num_pairs); - for i in 0..num_pairs { - // Generate key: "bench_key_" + zero-padded number - let key = format!("bench_key_{:010}", i); - - // Generate value: repeat pattern to reach VALUE_SIZE - let pattern = format!("value_{}", i % 1000); - let mut value = String::new(); - while value.len() < VALUE_SIZE { - value.push_str(&pattern); - } - value.truncate(VALUE_SIZE); - - pairs.push(KvPair::from((key, value))); - } - - let generation_duration = generation_start.elapsed(); - println!("Generated {} pairs in {:?}", pairs.len(), generation_duration); - - // Perform batch_put and measure timing - println!("Starting 
batch_put operation..."); - let batch_put_start = Instant::now(); - - client.batch_put(pairs).await.expect("Failed to perform batch_put"); - - let batch_put_duration = batch_put_start.elapsed(); - - // Calculate statistics - let total_bytes = num_pairs * pair_size; - let throughput_mb_per_sec = (total_bytes as f64 / (1024.0 * 1024.0)) / batch_put_duration.as_secs_f64(); - let ops_per_sec = num_pairs as f64 / batch_put_duration.as_secs_f64(); - - // Print results - println!("\n=== Batch Put Benchmark Results ==="); - println!("Total key-value pairs: {}", num_pairs); - println!("Total data size: {:.2} MB", total_bytes as f64 / (1024.0 * 1024.0)); - println!("Batch put duration: {:?}", batch_put_duration); - println!("Throughput: {:.2} MB/s", throughput_mb_per_sec); - println!("Operations per second: {:.2} ops/s", ops_per_sec); - println!("Average latency per operation: {:.2} μs", batch_put_duration.as_micros() as f64 / num_pairs as f64); - - Ok(()) -} diff --git a/tests/integration_tests.rs b/tests/integration_tests.rs index 73b43459..2a1646f0 100644 --- a/tests/integration_tests.rs +++ b/tests/integration_tests.rs @@ -871,6 +871,40 @@ async fn raw_write_million() -> Result<()> { Ok(()) } +/// Tests raw batch put has a large payload. +#[tokio::test] +#[serial] +async fn raw_large_batch_put() -> Result<()> { + const TARGET_SIZE_MB: usize = 100; + const KEY_SIZE: usize = 32; + const VALUE_SIZE: usize = 1024; + + let pair_size = KEY_SIZE + VALUE_SIZE; + let target_size_bytes = TARGET_SIZE_MB * 1024 * 1024; + let num_pairs = target_size_bytes / pair_size; + let mut pairs = Vec::with_capacity(num_pairs); + for i in 0..num_pairs { + // Generate key: "bench_key_" + zero-padded number + let key = format!("bench_key_{:010}", i); + + // Generate value: repeat pattern to reach VALUE_SIZE + let pattern = format!("value_{}", i % 1000); + let mut value = String::new(); + while value.len() < VALUE_SIZE { + value.push_str(&pattern); + } + value.truncate(VALUE_SIZE); + + pairs.push(KvPair::from((key, value))); + } + + init().await?; + let client = + RawClient::new_with_config(pd_addrs(), Config::default().with_default_keyspace()).await?; + + client.batch_put(pairs).await?; +} + /// Tests raw ttl API. #[tokio::test] #[serial] From e78d803aba4499984c54975580a3727325ed7246 Mon Sep 17 00:00:00 2001 From: lance6716 Date: Mon, 25 Aug 2025 17:00:40 +0800 Subject: [PATCH 3/8] format Signed-off-by: lance6716 --- tests/integration_tests.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/integration_tests.rs b/tests/integration_tests.rs index 2a1646f0..f69edf15 100644 --- a/tests/integration_tests.rs +++ b/tests/integration_tests.rs @@ -903,6 +903,7 @@ async fn raw_large_batch_put() -> Result<()> { RawClient::new_with_config(pd_addrs(), Config::default().with_default_keyspace()).await?; client.batch_put(pairs).await?; + Ok(()) } /// Tests raw ttl API. 
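Note on the batching in patch 1: `kvrpcpb::RawBatchPutRequest` now implements `Batchable` and each region's sorted pairs are run through `batches(keys, RAW_KV_REQUEST_BATCH_SIZE)`, but the `batches` helper itself lives elsewhere in the crate and is not shown in this series. The standalone sketch below illustrates the size-based splitting the patch relies on; the greedy accumulate-until-the-limit policy and the exact boundary handling are assumptions about that helper, not its actual implementation.

// Standalone sketch of size-based batch splitting, assuming a greedy
// accumulate-until-the-limit policy similar to what `Batchable::batches`
// is expected to do; this is an illustration, not the crate's code.

/// One raw KV entry bound for the same region: (key, value, ttl).
type RawEntry = (Vec<u8>, Vec<u8>, u64);

/// Size of one entry as counted by the patch's `item_size` for
/// RawBatchPutRequest: key bytes + value bytes (the ttl is not counted).
fn entry_size(e: &RawEntry) -> u64 {
    (e.0.len() + e.1.len()) as u64
}

/// Split one region's entries into sub-batches whose accumulated size stays
/// within `limit` bytes (16 KiB in the patch). A single oversized entry
/// still forms a batch of its own.
fn split_into_batches(entries: Vec<RawEntry>, limit: u64) -> Vec<Vec<RawEntry>> {
    let mut batches = Vec::new();
    let mut current: Vec<RawEntry> = Vec::new();
    let mut current_size = 0u64;
    for e in entries {
        let sz = entry_size(&e);
        if !current.is_empty() && current_size + sz > limit {
            batches.push(std::mem::take(&mut current));
            current_size = 0;
        }
        current_size += sz;
        current.push(e);
    }
    if !current.is_empty() {
        batches.push(current);
    }
    batches
}

fn main() {
    // 100 entries of ~1 KiB each against a 16 KiB limit: expect ~15 per batch.
    let entries: Vec<RawEntry> = (0..100)
        .map(|i| (format!("bench_key_{i:010}").into_bytes(), vec![0u8; 1024], 0))
        .collect();
    for (i, batch) in split_into_batches(entries, 16 * 1024).iter().enumerate() {
        let bytes: u64 = batch.iter().map(entry_size).sum();
        println!("batch {i}: {} entries, {} bytes", batch.len(), bytes);
    }
}

With ~1 KiB values, a 16 KiB limit works out to roughly 15 pairs per RawBatchPut RPC, so one region's share of a large payload fans out into many small requests instead of a single oversized gRPC message.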
From d4fcc7af1bf193514f934917e197b78384de6e6b Mon Sep 17 00:00:00 2001 From: lance6716 Date: Mon, 25 Aug 2025 17:06:34 +0800 Subject: [PATCH 4/8] cargo fmt Signed-off-by: lance6716 --- src/raw/requests.rs | 19 ++++++++++--------- src/request/mod.rs | 5 ++--- src/request/shard.rs | 5 ++++- 3 files changed, 16 insertions(+), 13 deletions(-) diff --git a/src/raw/requests.rs b/src/raw/requests.rs index 4de8df79..f9da7420 100644 --- a/src/raw/requests.rs +++ b/src/raw/requests.rs @@ -10,7 +10,6 @@ use crate::proto::tikvpb::tikv_client::TikvClient; use crate::range_request; use crate::region::RegionWithLeader; use crate::request::plan::ResponseWithShard; -use crate::request::{Batchable, Collect}; use crate::request::CollectSingle; use crate::request::DefaultProcessor; use crate::request::KvRequest; @@ -19,6 +18,7 @@ use crate::request::Process; use crate::request::RangeRequest; use crate::request::Shardable; use crate::request::SingleKey; +use crate::request::{Batchable, Collect}; use crate::shardable_key; use crate::shardable_keys; use crate::shardable_range; @@ -35,11 +35,11 @@ use crate::Result; use crate::Value; use async_trait::async_trait; use futures::stream::BoxStream; +use futures::{stream, StreamExt}; use std::any::Any; use std::ops::Range; use std::sync::Arc; use std::time::Duration; -use futures::{stream, StreamExt}; use tonic::transport::Channel; const RAW_KV_REQUEST_BATCH_SIZE: u64 = 16 * 1024; // 16 KB @@ -216,13 +216,14 @@ impl Shardable for kvrpcpb::RawBatchPutRequest { kv_ttl.sort_by(|a, b| a.0.key.cmp(&b.0.key)); region_stream_for_keys(kv_ttl.into_iter(), pd_client.clone()) .flat_map(|result| match result { - Ok((keys, region)) => { - stream::iter(kvrpcpb::RawBatchPutRequest::batches(keys, RAW_KV_REQUEST_BATCH_SIZE)) - .map(move |batch| Ok((batch, region.clone()))) - .boxed() - } - Err(e) => stream::iter(Err(e)).boxed(), - }) + Ok((keys, region)) => stream::iter(kvrpcpb::RawBatchPutRequest::batches( + keys, + RAW_KV_REQUEST_BATCH_SIZE, + )) + .map(move |batch| Ok((batch, region.clone()))) + .boxed(), + Err(e) => stream::iter(Err(e)).boxed(), + }) .boxed() } diff --git a/src/request/mod.rs b/src/request/mod.rs index 558fdd43..f7c81a7c 100644 --- a/src/request/mod.rs +++ b/src/request/mod.rs @@ -129,9 +129,8 @@ mod test { impl HasLocks for MockRpcResponse {} - #[derive(Clone)] - #[derive(Default)] -struct MockKvRequest { + #[derive(Clone, Default)] + struct MockKvRequest { test_invoking_count: Arc, } diff --git a/src/request/shard.rs b/src/request/shard.rs index 2c03c10b..1bac69e4 100644 --- a/src/request/shard.rs +++ b/src/request/shard.rs @@ -117,7 +117,10 @@ impl Shardable for Dispatch { where Self: Sized + Clone, { - Dispatch{ request: self.request.clone_then_apply_shard(shard), kv_client: self.kv_client.clone() } + Dispatch { + request: self.request.clone_then_apply_shard(shard), + kv_client: self.kv_client.clone(), + } } fn apply_store(&mut self, store: &RegionStore) -> Result<()> { From 669774e9ff2eec6c1ec0f5899106bc26eb7a0b7a Mon Sep 17 00:00:00 2001 From: lance6716 Date: Mon, 25 Aug 2025 17:09:46 +0800 Subject: [PATCH 5/8] remove unused change Signed-off-by: lance6716 --- src/request/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/request/mod.rs b/src/request/mod.rs index f7c81a7c..a9913e74 100644 --- a/src/request/mod.rs +++ b/src/request/mod.rs @@ -129,7 +129,7 @@ mod test { impl HasLocks for MockRpcResponse {} - #[derive(Clone, Default)] + #[derive(Clone)] struct MockKvRequest { test_invoking_count: Arc, } From 
c546874f648d20d9e94050b4d252a777c55b3d00 Mon Sep 17 00:00:00 2001 From: lance6716 Date: Mon, 25 Aug 2025 20:59:28 +0800 Subject: [PATCH 6/8] also split batch delete Signed-off-by: lance6716 --- src/raw/requests.rs | 51 +++++++++++++++++++++++++++++++++++++- tests/integration_tests.rs | 22 +++++++++++----- 2 files changed, 66 insertions(+), 7 deletions(-) diff --git a/src/raw/requests.rs b/src/raw/requests.rs index f9da7420..e7825b36 100644 --- a/src/raw/requests.rs +++ b/src/raw/requests.rs @@ -290,7 +290,56 @@ impl KvRequest for kvrpcpb::RawBatchDeleteRequest { type Response = kvrpcpb::RawBatchDeleteResponse; } -shardable_keys!(kvrpcpb::RawBatchDeleteRequest); +impl Batchable for kvrpcpb::RawBatchDeleteRequest { + type Item = Vec; + + fn item_size(item: &Self::Item) -> u64 { + item.len() as u64 + } +} + +impl Shardable for kvrpcpb::RawBatchDeleteRequest { + type Shard = Vec>; + + fn shards( + &self, + pd_client: &Arc, + ) -> BoxStream<'static, Result<(Self::Shard, RegionWithLeader)>> { + let mut keys = self.keys.clone(); + keys.sort(); + region_stream_for_keys(keys.into_iter(), pd_client.clone()) + .flat_map(|result| match result { + Ok((keys, region)) => stream::iter(kvrpcpb::RawBatchDeleteRequest::batches( + keys, + RAW_KV_REQUEST_BATCH_SIZE, + )) + .map(move |batch| Ok((batch, region.clone()))) + .boxed(), + Err(e) => stream::iter(Err(e)).boxed(), + }) + .boxed() + } + + fn apply_shard(&mut self, shard: Self::Shard) { + self.keys = shard; + } + + fn clone_then_apply_shard(&self, shard: Self::Shard) -> Self + where + Self: Sized + Clone, + { + let mut cloned = Self::default(); + cloned.context = self.context.clone(); + cloned.cf = self.cf.clone(); + cloned.for_cas = self.for_cas; + cloned.apply_shard(shard); + cloned + } + + fn apply_store(&mut self, store: &RegionStore) -> Result<()> { + self.set_leader(&store.region_with_leader) + } +} pub fn new_raw_delete_range_request( start_key: Vec, diff --git a/tests/integration_tests.rs b/tests/integration_tests.rs index f69edf15..7b44cc1a 100644 --- a/tests/integration_tests.rs +++ b/tests/integration_tests.rs @@ -889,11 +889,8 @@ async fn raw_large_batch_put() -> Result<()> { // Generate value: repeat pattern to reach VALUE_SIZE let pattern = format!("value_{}", i % 1000); - let mut value = String::new(); - while value.len() < VALUE_SIZE { - value.push_str(&pattern); - } - value.truncate(VALUE_SIZE); + let repeat_count = (VALUE_SIZE + pattern.len() - 1) / pattern.len(); + let value = pattern.repeat(repeat_count); pairs.push(KvPair::from((key, value))); } @@ -902,7 +899,20 @@ async fn raw_large_batch_put() -> Result<()> { let client = RawClient::new_with_config(pd_addrs(), Config::default().with_default_keyspace()).await?; - client.batch_put(pairs).await?; + client.batch_put(pairs.clone()).await?; + + let keys = pairs.iter().map(|pair| pair.0.clone()).collect::>(); + // split into multiple batch_get to avoid response too large error + const BATCH_SIZE: usize = 1000; + let mut got = Vec::with_capacity(num_pairs); + for chunk in keys.chunks(BATCH_SIZE) { + let mut partial = client.batch_get(chunk.to_vec()).await?; + got.append(&mut partial); + } + assert_eq!(got, pairs); + + client.batch_delete(keys).await?; + Ok(()) } From 5600349716e01833601690e376b75814d2ec61f8 Mon Sep 17 00:00:00 2001 From: lance6716 Date: Mon, 25 Aug 2025 21:02:18 +0800 Subject: [PATCH 7/8] fix lint Signed-off-by: lance6716 --- tests/integration_tests.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/integration_tests.rs b/tests/integration_tests.rs 
index 7b44cc1a..56ac952c 100644 --- a/tests/integration_tests.rs +++ b/tests/integration_tests.rs @@ -889,7 +889,7 @@ async fn raw_large_batch_put() -> Result<()> { // Generate value: repeat pattern to reach VALUE_SIZE let pattern = format!("value_{}", i % 1000); - let repeat_count = (VALUE_SIZE + pattern.len() - 1) / pattern.len(); + let repeat_count = VALUE_SIZE.div_ceil(pattern.len()); let value = pattern.repeat(repeat_count); pairs.push(KvPair::from((key, value))); From dffa44edaf0f24c1c2662504bb8add40ebb86d0e Mon Sep 17 00:00:00 2001 From: lance6716 Date: Mon, 25 Aug 2025 21:30:34 +0800 Subject: [PATCH 8/8] check batch delete really clean up data Signed-off-by: lance6716 --- tests/integration_tests.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/tests/integration_tests.rs b/tests/integration_tests.rs index 56ac952c..411e8810 100644 --- a/tests/integration_tests.rs +++ b/tests/integration_tests.rs @@ -911,7 +911,9 @@ async fn raw_large_batch_put() -> Result<()> { } assert_eq!(got, pairs); - client.batch_delete(keys).await?; + client.batch_delete(keys.clone()).await?; + let res = client.batch_get(keys).await?; + assert!(res.is_empty()); Ok(()) }
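Note on the other half of the series, the new `clone_then_apply_shard` hook: `single_plan_handler` previously cloned the whole plan and then called `apply_shard`, which for a large `RawBatchPutRequest` copies every key/value pair once per shard only to overwrite the copies immediately. Below is a minimal, self-contained sketch of that default/override split with stand-in types; `ShardableLike` and `BatchPutReq` are illustrative names, not the crate's `Shardable` trait or protobuf request types.

// Minimal sketch of the default/override split behind `clone_then_apply_shard`,
// using stand-in types; this illustrates the design, not the crate's code.

#[derive(Clone, Default)]
struct BatchPutReq {
    cf: Option<String>,             // small metadata that must be copied
    pairs: Vec<(Vec<u8>, Vec<u8>)>, // large payload that apply_shard overwrites
}

trait ShardableLike: Clone {
    type Shard;

    fn apply_shard(&mut self, shard: Self::Shard);

    /// Default behaviour (what plan.rs used to do inline): clone everything,
    /// then overwrite the sharded fields.
    fn clone_then_apply_shard(&self, shard: Self::Shard) -> Self {
        let mut cloned = self.clone();
        cloned.apply_shard(shard);
        cloned
    }
}

impl ShardableLike for BatchPutReq {
    type Shard = Vec<(Vec<u8>, Vec<u8>)>;

    fn apply_shard(&mut self, shard: Self::Shard) {
        self.pairs = shard;
    }

    /// Override: start from an empty request and copy only the cheap fields,
    /// so the full `pairs` payload is never cloned once per shard.
    fn clone_then_apply_shard(&self, shard: Self::Shard) -> Self {
        let mut cloned = Self::default();
        cloned.cf = self.cf.clone();
        cloned.apply_shard(shard);
        cloned
    }
}

fn main() {
    let req = BatchPutReq {
        cf: Some("default".to_string()),
        pairs: vec![(b"k".to_vec(), vec![0u8; 1024]); 100_000], // ~100 MB of values
    };
    // Building a per-shard request never copies the full payload.
    let shard = req.pairs[..10].to_vec();
    let per_shard = req.clone_then_apply_shard(shard);
    assert_eq!(per_shard.pairs.len(), 10);
    assert_eq!(per_shard.cf, req.cf);
}

The forwarding impl on `Dispatch` in src/request/shard.rs is what lets a request's own override take effect when the plan is built from nested layers.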