diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index 636560fb..5f855459 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -61,6 +61,7 @@ jobs:
       CARGO_INCREMENTAL: 0
       NEXTEST_PROFILE: ci
       TIKV_VERSION: v8.5.1
+      RUST_LOG: info
     runs-on: ubuntu-latest
     steps:
       - uses: actions/checkout@v4
diff --git a/Cargo.toml b/Cargo.toml
index 9227c5b4..7e8e45f1 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -57,7 +57,6 @@ reqwest = { version = "0.11", features = ["json", "native-tls-vendored"] }
 rstest = "0.18.2"
 serde_json = "1"
 serial_test = "0.5.0"
-simple_logger = "1"
 tempfile = "3.6"
 tokio = { version = "1", features = ["sync", "rt-multi-thread", "macros"] }
diff --git a/src/common/errors.rs b/src/common/errors.rs
index 5e7f6303..c5cd9b44 100644
--- a/src/common/errors.rs
+++ b/src/common/errors.rs
@@ -5,6 +5,7 @@ use std::result;
 use thiserror::Error;
 
 use crate::proto::kvrpcpb;
+use crate::region::RegionVerId;
 use crate::BoundRange;
 
 /// An error originating from the TiKV client or dependencies.
@@ -90,8 +91,8 @@ pub enum Error {
     #[error("Region {} is not found in the response", region_id)]
     RegionNotFoundInResponse { region_id: u64 },
     /// No leader is found for the given id.
-    #[error("Leader of region {} is not found", region_id)]
-    LeaderNotFound { region_id: u64 },
+    #[error("Leader of region {} is not found", region.id)]
+    LeaderNotFound { region: RegionVerId },
     /// Scan limit exceeds the maximum
     #[error("Limit {} exceeds max scan limit {}", limit, max_limit)]
     MaxScanLimitExceeded { limit: u32, max_limit: u32 },
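The error change above is the pivot for the rest of the patch: `LeaderNotFound` now carries a full `RegionVerId` rather than a bare `region_id`, so the retry machinery can evict exactly the cached region version that went stale. A minimal sketch of matching on the new variant (the `describe` function is invented for illustration, and it assumes `RegionVerId`'s `id` field is accessible, as the `#[error]` attribute above implies):

    use tikv_client::Error;

    fn describe(err: &Error) -> String {
        match err {
            // `region` is a RegionVerId: its `id` drives the Display message,
            // and the whole value can be handed to invalidate_region_cache
            // for a precise eviction.
            Error::LeaderNotFound { region } => {
                format!("no leader for region {}", region.id)
            }
            other => format!("{}", other),
        }
    }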
diff --git a/src/mock.rs b/src/mock.rs
index 0a3fb83c..43953ab2 100644
--- a/src/mock.rs
+++ b/src/mock.rs
@@ -216,6 +216,8 @@ impl PdClient for MockPdClient {
 
     async fn invalidate_region_cache(&self, _ver_id: crate::region::RegionVerId) {}
 
+    async fn invalidate_store_cache(&self, _store_id: crate::region::StoreId) {}
+
     async fn load_keyspace(&self, _keyspace: &str) -> Result<keyspacepb::KeyspaceMeta> {
         unimplemented!()
     }
diff --git a/src/pd/client.rs b/src/pd/client.rs
index ba36c0ef..417fd19e 100644
--- a/src/pd/client.rs
+++ b/src/pd/client.rs
@@ -20,6 +20,7 @@ use crate::proto::metapb;
 use crate::region::RegionId;
 use crate::region::RegionVerId;
 use crate::region::RegionWithLeader;
+use crate::region::StoreId;
 use crate::region_cache::RegionCache;
 use crate::store::KvConnect;
 use crate::store::RegionStore;
@@ -84,7 +85,7 @@ pub trait PdClient: Send + Sync + 'static {
     fn group_keys_by_region<K, K2>(
         self: Arc<Self>,
         keys: impl Iterator<Item = K> + Send + Sync + 'static,
-    ) -> BoxStream<'static, Result<(RegionWithLeader, Vec<K2>)>>
+    ) -> BoxStream<'static, Result<(Vec<K2>, RegionWithLeader)>>
     where
         K: AsRef<Key> + Into<K2> + Send + Sync + 'static,
         K2: Send + Sync + 'static,
@@ -102,7 +103,7 @@ pub trait PdClient: Send + Sync + 'static {
                     }
                     grouped.push(keys.next().unwrap().into());
                 }
-                Ok(Some((keys, (region, grouped))))
+                Ok(Some((keys, (grouped, region))))
             } else {
                 Ok(None)
             }
@@ -112,10 +113,10 @@ pub trait PdClient: Send + Sync + 'static {
         })
         .boxed()
     }
 
     /// Returns a Stream which iterates over the contexts for each region covered by range.
-    fn stores_for_range(
+    fn regions_for_range(
         self: Arc<Self>,
         range: BoundRange,
-    ) -> BoxStream<'static, Result<RegionStore>> {
+    ) -> BoxStream<'static, Result<RegionWithLeader>> {
         let (start_key, end_key) = range.into_keys();
         stream_fn(Some(start_key), move |start_key| {
             let end_key = end_key.clone();
@@ -128,15 +129,14 @@ pub trait PdClient: Send + Sync + 'static {
 
                 let region = this.region_for_key(&start_key).await?;
                 let region_end = region.end_key();
-                let store = this.map_region_to_store(region).await?;
                 if end_key
                     .map(|x| x <= region_end && !x.is_empty())
                     .unwrap_or(false)
                     || region_end.is_empty()
                 {
-                    return Ok(Some((None, store)));
+                    return Ok(Some((None, region)));
                 }
-                Ok(Some((Some(region_end), store)))
+                Ok(Some((Some(region_end), region)))
             }
         })
         .boxed()
@@ -146,7 +146,7 @@ pub trait PdClient: Send + Sync + 'static {
     fn group_ranges_by_region(
         self: Arc<Self>,
         mut ranges: Vec<kvrpcpb::KeyRange>,
-    ) -> BoxStream<'static, Result<(RegionWithLeader, Vec<kvrpcpb::KeyRange>)>> {
+    ) -> BoxStream<'static, Result<(Vec<kvrpcpb::KeyRange>, RegionWithLeader)>> {
         ranges.reverse();
         stream_fn(Some(ranges), move |ranges| {
             let this = self.clone();
@@ -166,7 +166,7 @@ pub trait PdClient: Send + Sync + 'static {
                     if !region_end.is_empty() && (end_key > region_end || end_key.is_empty()) {
                         grouped.push(make_key_range(start_key.into(), region_end.clone().into()));
                         ranges.push(make_key_range(region_end.into(), end_key.into()));
-                        return Ok(Some((Some(ranges), (region, grouped))));
+                        return Ok(Some((Some(ranges), (grouped, region))));
                     }
                     grouped.push(range);
 
@@ -181,11 +181,11 @@ pub trait PdClient: Send + Sync + 'static {
                             grouped
                                 .push(make_key_range(start_key.into(), region_end.clone().into()));
                             ranges.push(make_key_range(region_end.into(), end_key.into()));
-                            return Ok(Some((Some(ranges), (region, grouped))));
+                            return Ok(Some((Some(ranges), (grouped, region))));
                         }
                         grouped.push(range);
                     }
-                    Ok(Some((Some(ranges), (region, grouped))))
+                    Ok(Some((Some(ranges), (grouped, region))))
                 } else {
                     Ok(None)
                 }
@@ -205,6 +205,8 @@ pub trait PdClient: Send + Sync + 'static {
     async fn update_leader(&self, ver_id: RegionVerId, leader: metapb::Peer) -> Result<()>;
 
     async fn invalidate_region_cache(&self, ver_id: RegionVerId);
+
+    async fn invalidate_store_cache(&self, store_id: StoreId);
 }
 
 /// This client converts requests for the logical TiKV cluster into requests
@@ -271,6 +273,10 @@ impl PdClient for PdRpcClient {
         self.region_cache.invalidate_region_cache(ver_id).await
     }
 
+    async fn invalidate_store_cache(&self, store_id: StoreId) {
+        self.region_cache.invalidate_store_cache(store_id).await
+    }
+
     async fn load_keyspace(&self, keyspace: &str) -> Result<keyspacepb::KeyspaceMeta> {
         self.pd.load_keyspace(keyspace).await
     }
@@ -390,7 +396,7 @@ pub mod test {
         let stream = Arc::new(client).group_keys_by_region(tasks.into_iter());
         let mut stream = executor::block_on_stream(stream);
 
-        let result: Vec<Key> = stream.next().unwrap().unwrap().1;
+        let result: Vec<Key> = stream.next().unwrap().unwrap().0;
         assert_eq!(
             result,
             vec![
@@ -401,27 +407,27 @@ pub mod test {
             ]
         );
         assert_eq!(
-            stream.next().unwrap().unwrap().1,
+            stream.next().unwrap().unwrap().0,
             vec![vec![12].into(), vec![11, 4].into()]
         );
         assert!(stream.next().is_none());
     }
 
     #[test]
-    fn test_stores_for_range() {
+    fn test_regions_for_range() {
         let client = Arc::new(MockPdClient::default());
         let k1: Key = vec![1].into();
         let k2: Key = vec![5, 2].into();
         let k3: Key = vec![11, 4].into();
 
         let range1 = (k1, k2.clone()).into();
-        let mut stream = executor::block_on_stream(client.clone().stores_for_range(range1));
-        assert_eq!(stream.next().unwrap().unwrap().region_with_leader.id(), 1);
+        let mut stream = executor::block_on_stream(client.clone().regions_for_range(range1));
+        assert_eq!(stream.next().unwrap().unwrap().id(), 1);
         assert!(stream.next().is_none());
 
         let range2 = (k2, k3).into();
-        let mut stream = executor::block_on_stream(client.stores_for_range(range2));
-        assert_eq!(stream.next().unwrap().unwrap().region_with_leader.id(), 1);
-        assert_eq!(stream.next().unwrap().unwrap().region_with_leader.id(), 2);
+        let mut stream = executor::block_on_stream(client.regions_for_range(range2));
+        assert_eq!(stream.next().unwrap().unwrap().id(), 1);
+        assert_eq!(stream.next().unwrap().unwrap().id(), 2);
         assert!(stream.next().is_none());
     }
 
@@ -446,20 +452,20 @@ pub mod test {
         let ranges3 = stream.next().unwrap().unwrap();
         let ranges4 = stream.next().unwrap().unwrap();
 
-        assert_eq!(ranges1.0.id(), 1);
+        assert_eq!(ranges1.1.id(), 1);
         assert_eq!(
-            ranges1.1,
+            ranges1.0,
             vec![
                 make_key_range(k1.clone(), k2.clone()),
                 make_key_range(k1.clone(), k_split.clone()),
             ]
         );
-        assert_eq!(ranges2.0.id(), 2);
-        assert_eq!(ranges2.1, vec![make_key_range(k_split.clone(), k3.clone())]);
-        assert_eq!(ranges3.0.id(), 1);
-        assert_eq!(ranges3.1, vec![make_key_range(k2.clone(), k_split.clone())]);
-        assert_eq!(ranges4.0.id(), 2);
-        assert_eq!(ranges4.1, vec![make_key_range(k_split, k4.clone())]);
+        assert_eq!(ranges2.1.id(), 2);
+        assert_eq!(ranges2.0, vec![make_key_range(k_split.clone(), k3.clone())]);
+        assert_eq!(ranges3.1.id(), 1);
+        assert_eq!(ranges3.0, vec![make_key_range(k2.clone(), k_split.clone())]);
+        assert_eq!(ranges4.1.id(), 2);
+        assert_eq!(ranges4.0, vec![make_key_range(k_split, k4.clone())]);
         assert!(stream.next().is_none());
 
         let range1 = make_key_range(k1.clone(), k2.clone());
@@ -470,11 +476,11 @@ pub mod test {
         let ranges1 = stream.next().unwrap().unwrap();
         let ranges2 = stream.next().unwrap().unwrap();
         let ranges3 = stream.next().unwrap().unwrap();
-        assert_eq!(ranges1.0.id(), 1);
-        assert_eq!(ranges1.1, vec![make_key_range(k1, k2)]);
-        assert_eq!(ranges2.0.id(), 2);
-        assert_eq!(ranges2.1, vec![make_key_range(k3, k4)]);
-        assert_eq!(ranges3.0.id(), 3);
-        assert_eq!(ranges3.1, vec![make_key_range(k5, k6)]);
+        assert_eq!(ranges1.1.id(), 1);
+        assert_eq!(ranges1.0, vec![make_key_range(k1, k2)]);
+        assert_eq!(ranges2.1.id(), 2);
+        assert_eq!(ranges2.0, vec![make_key_range(k3, k4)]);
+        assert_eq!(ranges3.1.id(), 3);
+        assert_eq!(ranges3.0, vec![make_key_range(k5, k6)]);
     }
 }
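Both grouping streams now yield the shard first and the region second, which is why the tests above read `.0` for the keys or ranges and `.1.id()` for the region. A short consumer sketch under the new ordering, reusing the `MockPdClient` from these tests (the function itself is illustrative):

    use std::sync::Arc;
    use futures::executor::block_on_stream;

    fn print_grouping(client: Arc<MockPdClient>, keys: Vec<Key>) {
        let stream = client.group_keys_by_region(keys.into_iter());
        for item in block_on_stream(stream) {
            // Shard first, region second after this change.
            let (keys, region): (Vec<Key>, RegionWithLeader) = item.unwrap();
            println!("region {}: {} keys", region.id(), keys.len());
        }
    }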
diff --git a/src/raw/requests.rs b/src/raw/requests.rs
index 4422c883..f6f32f3d 100644
--- a/src/raw/requests.rs
+++ b/src/raw/requests.rs
@@ -22,8 +22,8 @@ use crate::request::SingleKey;
 use crate::shardable_key;
 use crate::shardable_keys;
 use crate::shardable_range;
-use crate::store::store_stream_for_keys;
-use crate::store::store_stream_for_ranges;
+use crate::store::region_stream_for_keys;
+use crate::store::region_stream_for_ranges;
 use crate::store::RegionStore;
 use crate::store::Request;
 use crate::transaction::HasLocks;
@@ -194,7 +194,7 @@ impl Shardable for kvrpcpb::RawBatchPutRequest {
     fn shards(
         &self,
         pd_client: &Arc<impl PdClient>,
-    ) -> BoxStream<'static, Result<(Self::Shard, RegionStore)>> {
+    ) -> BoxStream<'static, Result<(Self::Shard, RegionWithLeader)>> {
         let kvs = self.pairs.clone();
         let ttls = self.ttls.clone();
         let mut kv_ttl: Vec<KvPairTTL> = kvs
@@ -203,15 +203,17 @@ impl Shardable for kvrpcpb::RawBatchPutRequest {
             .map(|(kv, ttl)| KvPairTTL(kv, ttl))
             .collect();
         kv_ttl.sort_by(|a, b| a.0.key.cmp(&b.0.key));
-        store_stream_for_keys(kv_ttl.into_iter(), pd_client.clone())
+        region_stream_for_keys(kv_ttl.into_iter(), pd_client.clone())
     }
 
-    fn apply_shard(&mut self, shard: Self::Shard, store: &RegionStore) -> Result<()> {
+    fn apply_shard(&mut self, shard: Self::Shard) {
         let (pairs, ttls) = shard.into_iter().unzip();
-        self.set_leader(&store.region_with_leader)?;
         self.pairs = pairs;
         self.ttls = ttls;
-        Ok(())
+    }
+
+    fn apply_store(&mut self, store: &RegionStore) -> Result<()> {
+        self.set_leader(&store.region_with_leader)
     }
 }
 
@@ -344,14 +346,16 @@ impl Shardable for kvrpcpb::RawBatchScanRequest {
     fn shards(
         &self,
         pd_client: &Arc<impl PdClient>,
-    ) -> BoxStream<'static, Result<(Self::Shard, RegionStore)>> {
-        store_stream_for_ranges(self.ranges.clone(), pd_client.clone())
+    ) -> BoxStream<'static, Result<(Self::Shard, RegionWithLeader)>> {
+        region_stream_for_ranges(self.ranges.clone(), pd_client.clone())
     }
 
-    fn apply_shard(&mut self, shard: Self::Shard, store: &RegionStore) -> Result<()> {
-        self.set_leader(&store.region_with_leader)?;
+    fn apply_shard(&mut self, shard: Self::Shard) {
         self.ranges = shard;
-        Ok(())
+    }
+
+    fn apply_store(&mut self, store: &RegionStore) -> Result<()> {
+        self.set_leader(&store.region_with_leader)
     }
 }
 
@@ -470,14 +474,20 @@ impl Shardable for RawCoprocessorRequest {
     fn shards(
         &self,
         pd_client: &Arc<impl PdClient>,
-    ) -> BoxStream<'static, Result<(Self::Shard, RegionStore)>> {
-        store_stream_for_ranges(self.inner.ranges.clone(), pd_client.clone())
+    ) -> BoxStream<'static, Result<(Self::Shard, RegionWithLeader)>> {
+        region_stream_for_ranges(self.inner.ranges.clone(), pd_client.clone())
+    }
+
+    fn apply_shard(&mut self, shard: Self::Shard) {
+        self.inner.ranges = shard;
     }
 
-    fn apply_shard(&mut self, shard: Self::Shard, store: &RegionStore) -> Result<()> {
+    fn apply_store(&mut self, store: &RegionStore) -> Result<()> {
         self.set_leader(&store.region_with_leader)?;
-        self.inner.ranges.clone_from(&shard);
-        self.inner.data = (self.data_builder)(store.region_with_leader.region.clone(), shard);
+        self.inner.data = (self.data_builder)(
+            store.region_with_leader.region.clone(),
+            self.inner.ranges.clone(),
+        );
         Ok(())
     }
 }
diff --git a/src/region.rs b/src/region.rs
index 6fb20321..978b8a07 100644
--- a/src/region.rs
+++ b/src/region.rs
@@ -73,7 +73,7 @@ impl RegionWithLeader {
             .as_ref()
             .cloned()
             .ok_or_else(|| Error::LeaderNotFound {
-                region_id: self.id(),
+                region: self.ver_id(),
             })
             .map(|s| s.store_id)
     }
diff --git a/src/region_cache.rs b/src/region_cache.rs
index e56068c0..8837de38 100644
--- a/src/region_cache.rs
+++ b/src/region_cache.rs
@@ -233,6 +233,11 @@ impl RegionCache {
         }
     }
 
+    pub async fn invalidate_store_cache(&self, store_id: StoreId) {
+        let mut cache = self.store_cache.write().await;
+        cache.remove(&store_id);
+    }
+
     pub async fn read_through_all_stores(&self) -> Result<Vec<Store>> {
         let stores = self
             .inner_client
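`invalidate_store_cache` is the store-side counterpart of the existing region invalidation: it evicts one entry so the next request re-resolves that store through PD instead of reusing a possibly dead connection. A self-contained sketch of the same pattern (the struct is illustrative, not the crate's actual `RegionCache`, and it assumes `StoreId` is the usual `u64` store id):

    use std::collections::HashMap;
    use tokio::sync::RwLock;

    type StoreId = u64;

    struct StoreCache<S> {
        store_cache: RwLock<HashMap<StoreId, S>>,
    }

    impl<S> StoreCache<S> {
        // Same shape as the method added above: take the write lock, drop
        // the entry, and let the next reader repopulate it from PD.
        async fn invalidate_store_cache(&self, store_id: StoreId) {
            let mut cache = self.store_cache.write().await;
            cache.remove(&store_id);
        }
    }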
diff --git a/src/request/mod.rs b/src/request/mod.rs
index 14de8e90..a9913e74 100644
--- a/src/request/mod.rs
+++ b/src/request/mod.rs
@@ -103,7 +103,7 @@ mod test {
     use crate::proto::pdpb::Timestamp;
     use crate::proto::tikvpb::tikv_client::TikvClient;
     use crate::region::RegionWithLeader;
-    use crate::store::store_stream_for_keys;
+    use crate::store::region_stream_for_keys;
     use crate::store::HasRegionError;
     use crate::transaction::lowering::new_commit_request;
     use crate::Error;
@@ -168,22 +168,20 @@ mod test {
             pd_client: &std::sync::Arc<impl crate::pd::PdClient>,
         ) -> futures::stream::BoxStream<
             'static,
-            crate::Result<(Self::Shard, crate::store::RegionStore)>,
+            crate::Result<(Self::Shard, crate::region::RegionWithLeader)>,
         > {
             // Increases by 1 for each call.
             self.test_invoking_count
                 .fetch_add(1, std::sync::atomic::Ordering::SeqCst);
-            store_stream_for_keys(
+            region_stream_for_keys(
                 Some(Key::from("mock_key".to_owned())).into_iter(),
                 pd_client.clone(),
             )
         }
 
-        fn apply_shard(
-            &mut self,
-            _shard: Self::Shard,
-            _store: &crate::store::RegionStore,
-        ) -> crate::Result<()> {
+        fn apply_shard(&mut self, _shard: Self::Shard) {}
+
+        fn apply_store(&mut self, _store: &crate::store::RegionStore) -> crate::Result<()> {
             Ok(())
         }
     }
diff --git a/src/request/plan.rs b/src/request/plan.rs
index ffff6c24..be915f77 100644
--- a/src/request/plan.rs
+++ b/src/request/plan.rs
@@ -17,6 +17,8 @@ use crate::pd::PdClient;
 use crate::proto::errorpb;
 use crate::proto::errorpb::EpochNotMatch;
 use crate::proto::kvrpcpb;
+use crate::region::StoreId;
+use crate::region::{RegionVerId, RegionWithLeader};
 use crate::request::shard::HasNextBatch;
 use crate::request::NextBatch;
 use crate::request::Shardable;
@@ -114,15 +116,16 @@
         preserve_region_results: bool,
     ) -> Result<<Self as Plan>::Result> {
         let shards = current_plan.shards(&pd_client).collect::<Vec<_>>().await;
+        debug!("single_plan_handler, shards: {}", shards.len());
         let mut handles = Vec::new();
         for shard in shards {
-            let (shard, region_store) = shard?;
+            let (shard, region) = shard?;
             let mut clone = current_plan.clone();
-            clone.apply_shard(shard, &region_store)?;
+            clone.apply_shard(shard);
             let handle = tokio::spawn(Self::single_shard_handler(
                 pd_client.clone(),
                 clone,
-                region_store,
+                region,
                 backoff.clone(),
                 permits.clone(),
                 preserve_region_results,
@@ -153,12 +156,45 @@
     #[async_recursion]
     async fn single_shard_handler(
         pd_client: Arc<PdC>,
-        plan: P,
-        region_store: RegionStore,
+        mut plan: P,
+        region: RegionWithLeader,
         mut backoff: Backoff,
         permits: Arc<Semaphore>,
         preserve_region_results: bool,
     ) -> Result<<Self as Plan>::Result> {
+        debug!("single_shard_handler");
+        let region_store = match pd_client
+            .clone()
+            .map_region_to_store(region)
+            .await
+            .and_then(|region_store| {
+                plan.apply_store(&region_store)?;
+                Ok(region_store)
+            }) {
+            Ok(region_store) => region_store,
+            Err(Error::LeaderNotFound { region }) => {
+                debug!(
+                    "single_shard_handler::sharding: leader not found: {:?}",
+                    region
+                );
+                return Self::handle_other_error(
+                    pd_client,
+                    plan,
+                    region.clone(),
+                    None,
+                    backoff,
+                    permits,
+                    preserve_region_results,
+                    Error::LeaderNotFound { region },
+                )
+                .await;
+            }
+            Err(err) => {
+                debug!("single_shard_handler::sharding, error: {:?}", err);
+                return Err(err);
+            }
+        };
+
         // limit concurrent requests
         let permit = permits.acquire().await.unwrap();
         let res = plan.execute().await;
@@ -167,10 +203,12 @@
         let mut resp = match res {
             Ok(resp) => resp,
             Err(e) if is_grpc_error(&e) => {
-                return Self::handle_grpc_error(
+                debug!("single_shard_handler:execute: grpc error: {:?}", e);
+                return Self::handle_other_error(
                     pd_client,
                     plan,
-                    region_store,
+                    region_store.region_with_leader.ver_id(),
+                    region_store.region_with_leader.get_store_id().ok(),
                     backoff,
                     permits,
                     preserve_region_results,
@@ -178,12 +216,17 @@
                 )
                 .await;
             }
-            Err(e) => return Err(e),
+            Err(e) => {
+                debug!("single_shard_handler:execute: error: {:?}", e);
+                return Err(e);
+            }
         };
 
         if let Some(e) = resp.key_errors() {
+            debug!("single_shard_handler:execute: key errors: {:?}", e);
             Ok(vec![Err(Error::MultipleKeyErrors(e))])
         } else if let Some(e) = resp.region_error() {
+            debug!("single_shard_handler:execute: region error: {:?}", e);
             match backoff.next_delay_duration() {
                 Some(duration) => {
                     let region_error_resolved =
@@ -208,18 +251,24 @@
         }
     }
 
-    async fn handle_grpc_error(
+    #[allow(clippy::too_many_arguments)]
+    async fn handle_other_error(
         pd_client: Arc<PdC>,
         plan: P,
-        region_store: RegionStore,
+        region: RegionVerId,
+        store: Option<StoreId>,
         mut backoff: Backoff,
         permits: Arc<Semaphore>,
         preserve_region_results: bool,
         e: Error,
     ) -> Result<<Self as Plan>::Result> {
-        debug!("handle grpc error: {:?}", e);
-        let ver_id = region_store.region_with_leader.ver_id();
-        pd_client.invalidate_region_cache(ver_id).await;
+        debug!("handle_other_error: {:?}", e);
+        pd_client.invalidate_region_cache(region).await;
+        if is_grpc_error(&e) {
+            if let Some(store_id) = store {
+                pd_client.invalidate_store_cache(store_id).await;
+            }
+        }
         match backoff.next_delay_duration() {
             Some(duration) => {
                 sleep(duration).await;
@@ -246,7 +295,9 @@ pub(crate) async fn handle_region_error(
     e: errorpb::Error,
     region_store: RegionStore,
 ) -> Result<bool> {
+    debug!("handle_region_error: {:?}", e);
     let ver_id = region_store.region_with_leader.ver_id();
+    let store_id = region_store.region_with_leader.get_store_id();
     if let Some(not_leader) = e.not_leader {
         if let Some(leader) = not_leader.leader {
             match pd_client
@@ -269,6 +320,9 @@ pub(crate) async fn handle_region_error(
         }
     } else if e.store_not_match.is_some() {
         pd_client.invalidate_region_cache(ver_id).await;
+        if let Ok(store_id) = store_id {
+            pd_client.invalidate_store_cache(store_id).await;
+        }
         Ok(false)
     } else if e.epoch_not_match.is_some() {
         on_region_epoch_not_match(pd_client.clone(), region_store, e.epoch_not_match.unwrap()).await
@@ -284,6 +338,9 @@ pub(crate) async fn handle_region_error(
         // TODO: pass the logger around
         // info!("unknwon region error: {:?}", e);
         pd_client.invalidate_region_cache(ver_id).await;
+        if let Ok(store_id) = store_id {
+            pd_client.invalidate_store_cache(store_id).await;
+        }
         Ok(false)
     }
 }
@@ -881,11 +938,13 @@ mod test {
         fn shards(
             &self,
             _: &Arc<impl crate::pd::PdClient>,
-        ) -> BoxStream<'static, crate::Result<(Self::Shard, crate::store::RegionStore)>> {
+        ) -> BoxStream<'static, crate::Result<(Self::Shard, RegionWithLeader)>> {
             Box::pin(stream::iter(1..=3).map(|_| Err(Error::Unimplemented))).boxed()
         }
 
-        fn apply_shard(&mut self, _: Self::Shard, _: &crate::store::RegionStore) -> Result<()> {
+        fn apply_shard(&mut self, _: Self::Shard) {}
+
+        fn apply_store(&mut self, _: &crate::store::RegionStore) -> Result<()> {
             Ok(())
         }
     }
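The control-flow change in `single_shard_handler` is the heart of the patch: the store lookup now happens inside the per-shard handler, after the infallible `apply_shard`, so a stale leader surfaces as a retryable `LeaderNotFound` instead of failing the whole plan during sharding. The invalidation policy that `handle_other_error` implements can be summarized in a small sketch (a free-standing function, assuming the `PdClient` trait as extended above and the file's existing `is_grpc_error` helper):

    async fn invalidate_for_error(
        pd_client: &impl PdClient,
        region: RegionVerId,
        store: Option<StoreId>,
        e: &Error,
    ) {
        // The cached leader for the region is always suspect once a shard fails.
        pd_client.invalidate_region_cache(region).await;
        // The store cache is only dropped on transport-level errors, where the
        // cached gRPC channel itself may be broken.
        if is_grpc_error(e) {
            if let Some(store_id) = store {
                pd_client.invalidate_store_cache(store_id).await;
            }
        }
    }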
diff --git a/src/request/shard.rs b/src/request/shard.rs
index 1f116f76..a68f446a 100644
--- a/src/request/shard.rs
+++ b/src/request/shard.rs
@@ -6,6 +6,7 @@ use futures::stream::BoxStream;
 
 use super::plan::PreserveShard;
 use crate::pd::PdClient;
+use crate::region::RegionWithLeader;
 use crate::request::plan::CleanupLocks;
 use crate::request::Dispatch;
 use crate::request::KvRequest;
@@ -23,12 +24,16 @@ macro_rules! impl_inner_shardable {
         fn shards(
             &self,
             pd_client: &Arc<impl PdClient>,
-        ) -> BoxStream<'static, Result<(Self::Shard, RegionStore)>> {
+        ) -> BoxStream<'static, Result<(Self::Shard, RegionWithLeader)>> {
             self.inner.shards(pd_client)
         }
 
-        fn apply_shard(&mut self, shard: Self::Shard, store: &RegionStore) -> Result<()> {
-            self.inner.apply_shard(shard, store)
+        fn apply_shard(&mut self, shard: Self::Shard) {
+            self.inner.apply_shard(shard);
+        }
+
+        fn apply_store(&mut self, store: &RegionStore) -> Result<()> {
+            self.inner.apply_store(store)
         }
     };
 }
@@ -39,9 +44,11 @@ pub trait Shardable {
     fn shards(
         &self,
         pd_client: &Arc<impl PdClient>,
-    ) -> BoxStream<'static, Result<(Self::Shard, RegionStore)>>;
+    ) -> BoxStream<'static, Result<(Self::Shard, RegionWithLeader)>>;
 
-    fn apply_shard(&mut self, shard: Self::Shard, store: &RegionStore) -> Result<()>;
+    fn apply_shard(&mut self, shard: Self::Shard);
+
+    fn apply_store(&mut self, store: &RegionStore) -> Result<()>;
 }
 
 pub trait Batchable {
@@ -88,13 +95,17 @@ impl Shardable for Dispatch {
     fn shards(
         &self,
         pd_client: &Arc<impl PdClient>,
-    ) -> BoxStream<'static, Result<(Self::Shard, RegionStore)>> {
+    ) -> BoxStream<'static, Result<(Self::Shard, RegionWithLeader)>> {
        self.request.shards(pd_client)
     }
 
-    fn apply_shard(&mut self, shard: Self::Shard, store: &RegionStore) -> Result<()> {
+    fn apply_shard(&mut self, shard: Self::Shard) {
+        self.request.apply_shard(shard);
+    }
+
+    fn apply_store(&mut self, store: &RegionStore) -> Result<()> {
         self.kv_client = Some(store.client.clone());
-        self.request.apply_shard(shard, store)
+        self.request.apply_store(store)
     }
 }
 
@@ -110,13 +121,17 @@ impl Shardable for PreserveShard
 {
     fn shards(
         &self,
         pd_client: &Arc<impl PdClient>,
-    ) -> BoxStream<'static, Result<(Self::Shard, RegionStore)>> {
+    ) -> BoxStream<'static, Result<(Self::Shard, RegionWithLeader)>> {
         self.inner.shards(pd_client)
     }
 
-    fn apply_shard(&mut self, shard: Self::Shard, store: &RegionStore) -> Result<()> {
+    fn apply_shard(&mut self, shard: Self::Shard) {
         self.shard = Some(shard.clone());
-        self.inner.apply_shard(shard, store)
+        self.inner.apply_shard(shard)
+    }
+
+    fn apply_store(&mut self, store: &RegionStore) -> Result<()> {
+        self.inner.apply_store(store)
     }
 }
 
@@ -130,13 +145,17 @@ impl Shardable for CleanupLocks {
     fn shards(
         &self,
         pd_client: &Arc<impl PdClient>,
-    ) -> BoxStream<'static, Result<(Self::Shard, RegionStore)>> {
+    ) -> BoxStream<'static, Result<(Self::Shard, RegionWithLeader)>> {
         self.inner.shards(pd_client)
     }
 
-    fn apply_shard(&mut self, shard: Self::Shard, store: &RegionStore) -> Result<()> {
+    fn apply_shard(&mut self, shard: Self::Shard) {
+        self.inner.apply_shard(shard)
+    }
+
+    fn apply_store(&mut self, store: &RegionStore) -> Result<()> {
         self.store = Some(store.clone());
-        self.inner.apply_shard(shard, store)
+        self.inner.apply_store(store)
     }
 }
 
@@ -152,23 +171,21 @@ macro_rules! shardable_key {
                 pd_client: &std::sync::Arc<impl $crate::pd::PdClient>,
             ) -> futures::stream::BoxStream<
                 'static,
-                $crate::Result<(Self::Shard, $crate::store::RegionStore)>,
+                $crate::Result<(Self::Shard, $crate::region::RegionWithLeader)>,
             > {
-                $crate::store::store_stream_for_keys(
+                $crate::store::region_stream_for_keys(
                     std::iter::once(self.key.clone()),
                     pd_client.clone(),
                 )
             }
 
-            fn apply_shard(
-                &mut self,
-                mut shard: Self::Shard,
-                store: &$crate::store::RegionStore,
-            ) -> $crate::Result<()> {
-                self.set_leader(&store.region_with_leader)?;
+            fn apply_shard(&mut self, mut shard: Self::Shard) {
                 assert!(shard.len() == 1);
                 self.key = shard.pop().unwrap();
-                Ok(())
+            }
+
+            fn apply_store(&mut self, store: &$crate::store::RegionStore) -> $crate::Result<()> {
+                self.set_leader(&store.region_with_leader)
             }
         }
     };
@@ -186,21 +203,19 @@ macro_rules! shardable_keys {
                 pd_client: &std::sync::Arc<impl $crate::pd::PdClient>,
             ) -> futures::stream::BoxStream<
                 'static,
-                $crate::Result<(Self::Shard, $crate::store::RegionStore)>,
+                $crate::Result<(Self::Shard, $crate::region::RegionWithLeader)>,
             > {
                 let mut keys = self.keys.clone();
                 keys.sort();
-                $crate::store::store_stream_for_keys(keys.into_iter(), pd_client.clone())
+                $crate::store::region_stream_for_keys(keys.into_iter(), pd_client.clone())
             }
 
-            fn apply_shard(
-                &mut self,
-                shard: Self::Shard,
-                store: &$crate::store::RegionStore,
-            ) -> $crate::Result<()> {
-                self.set_leader(&store.region_with_leader)?;
+            fn apply_shard(&mut self, shard: Self::Shard) {
                 self.keys = shard.into_iter().map(Into::into).collect();
-                Ok(())
+            }
+
+            fn apply_store(&mut self, store: &$crate::store::RegionStore) -> $crate::Result<()> {
+                self.set_leader(&store.region_with_leader)
             }
         }
     };
@@ -242,7 +257,8 @@ macro_rules! shardable_range {
             fn shards(
                 &self,
                 pd_client: &Arc<impl $crate::pd::PdClient>,
-            ) -> BoxStream<'static, $crate::Result<(Self::Shard, $crate::store::RegionStore)>> {
+            ) -> BoxStream<'static, $crate::Result<(Self::Shard, $crate::region::RegionWithLeader)>>
+            {
                 let mut start_key = self.start_key.clone().into();
                 let mut end_key = self.end_key.clone().into();
                 // In a reverse range request, the range is in the meaning of [end_key, start_key), i.e. end_key <= x < start_key.
@@ -250,16 +266,10 @@ macro_rules! shardable_range {
                 if self.is_reverse() {
                     std::mem::swap(&mut start_key, &mut end_key);
                 }
-                $crate::store::store_stream_for_range((start_key, end_key), pd_client.clone())
+                $crate::store::region_stream_for_range((start_key, end_key), pd_client.clone())
             }
 
-            fn apply_shard(
-                &mut self,
-                shard: Self::Shard,
-                store: &$crate::store::RegionStore,
-            ) -> $crate::Result<()> {
-                self.set_leader(&store.region_with_leader)?;
-
+            fn apply_shard(&mut self, shard: Self::Shard) {
                 // In a reverse range request, the range is in the meaning of [end_key, start_key), i.e. end_key <= x < start_key.
                 // As a result, after obtaining start_key and end_key from PD, we need to swap their values when assigning them to the request.
                 self.start_key = shard.0;
@@ -267,7 +277,10 @@ macro_rules! shardable_range {
                 if self.is_reverse() {
                     std::mem::swap(&mut self.start_key, &mut self.end_key);
                 }
-                Ok(())
+            }
+
+            fn apply_store(&mut self, store: &$crate::store::RegionStore) -> $crate::Result<()> {
+                self.set_leader(&store.region_with_leader)
             }
         }
     };
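With the trait split above, `apply_shard` is pure data plumbing while `apply_store` owns the fallible leader wiring, and the plan layer calls them at different times. A hedged sketch of an implementation in the new style; `ToyBatchGet` is invented for illustration, the `set_leader` body mirrors the `impl_request` helper shown later in this diff, and the surrounding module's imports (`Arc`, `BoxStream`, `Key`, `Error`, `Result`, `kvrpcpb`, `RegionStore`, `RegionWithLeader`, `region_stream_for_keys`) are assumed:

    struct ToyBatchGet {
        keys: Vec<Key>,
        context: Option<kvrpcpb::Context>,
    }

    impl ToyBatchGet {
        // Stand-in for the set_leader helper the kvrpcpb requests get from
        // the impl_request macro in src/store/request.rs.
        fn set_leader(&mut self, leader: &RegionWithLeader) -> Result<()> {
            let peer = leader.leader.as_ref().ok_or(Error::LeaderNotFound {
                region: leader.ver_id(),
            })?;
            let ctx = self.context.get_or_insert(kvrpcpb::Context::default());
            ctx.region_id = leader.region.id;
            ctx.peer = Some(peer.clone());
            Ok(())
        }
    }

    impl Shardable for ToyBatchGet {
        type Shard = Vec<Key>;

        fn shards(
            &self,
            pd_client: &Arc<impl PdClient>,
        ) -> BoxStream<'static, Result<(Self::Shard, RegionWithLeader)>> {
            let mut keys = self.keys.clone();
            keys.sort();
            region_stream_for_keys(keys.into_iter(), pd_client.clone())
        }

        // Infallible: narrow the request to one region's portion of the data.
        fn apply_shard(&mut self, shard: Self::Shard) {
            self.keys = shard;
        }

        // Fallible: runs after the region is mapped to a store, and is the
        // one place that can surface Error::LeaderNotFound.
        fn apply_store(&mut self, store: &RegionStore) -> Result<()> {
            self.set_leader(&store.region_with_leader)
        }
    }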
diff --git a/src/store/mod.rs b/src/store/mod.rs
index f21373b4..fb6ed709 100644
--- a/src/store/mod.rs
+++ b/src/store/mod.rs
@@ -38,46 +38,37 @@ pub struct Store {
 }
 
 /// Maps keys to a stream of stores. `key_data` must be sorted in increasing order
-pub fn store_stream_for_keys<K, KOut, PdC>(
+pub fn region_stream_for_keys<K, KOut, PdC>(
     key_data: impl Iterator<Item = K> + Send + Sync + 'static,
     pd_client: Arc<PdC>,
-) -> BoxStream<'static, Result<(Vec<KOut>, RegionStore)>>
+) -> BoxStream<'static, Result<(Vec<KOut>, RegionWithLeader)>>
 where
     PdC: PdClient,
     K: AsRef<Key> + Into<KOut> + Send + Sync + 'static,
     KOut: Send + Sync + 'static,
 {
-    pd_client
-        .clone()
-        .group_keys_by_region(key_data)
-        .and_then(move |(region, key)| {
-            pd_client
-                .clone()
-                .map_region_to_store(region)
-                .map_ok(move |store| (key, store))
-        })
-        .boxed()
+    pd_client.clone().group_keys_by_region(key_data)
 }
 
 #[allow(clippy::type_complexity)]
-pub fn store_stream_for_range(
+pub fn region_stream_for_range(
     range: (Vec<u8>, Vec<u8>),
     pd_client: Arc<impl PdClient>,
-) -> BoxStream<'static, Result<((Vec<u8>, Vec<u8>), RegionStore)>> {
+) -> BoxStream<'static, Result<((Vec<u8>, Vec<u8>), RegionWithLeader)>> {
     let bnd_range = if range.1.is_empty() {
         BoundRange::range_from(range.0.clone().into())
     } else {
         BoundRange::from(range.clone())
     };
     pd_client
-        .stores_for_range(bnd_range)
-        .map_ok(move |store| {
-            let region_range = store.region_with_leader.range();
+        .regions_for_range(bnd_range)
+        .map_ok(move |region| {
+            let region_range = region.range();
             let result_range = range_intersection(
                 region_range,
                 (range.0.clone().into(), range.1.clone().into()),
             );
-            ((result_range.0.into(), result_range.1.into()), store)
+            ((result_range.0.into(), result_range.1.into()), region)
         })
         .boxed()
 }
@@ -95,18 +86,9 @@ fn range_intersection(region_range: (Key, Key), range: (Key, Key)) -> (Key, Key)
     (max(lower, range.0), up)
 }
 
-pub fn store_stream_for_ranges<PdC: PdClient>(
+pub fn region_stream_for_ranges<PdC: PdClient>(
     ranges: Vec<kvrpcpb::KeyRange>,
     pd_client: Arc<PdC>,
-) -> BoxStream<'static, Result<(Vec<kvrpcpb::KeyRange>, RegionStore)>> {
-    pd_client
-        .clone()
-        .group_ranges_by_region(ranges)
-        .and_then(move |(region, range)| {
-            pd_client
-                .clone()
-                .map_region_to_store(region)
-                .map_ok(move |store| (range, store))
-        })
-        .boxed()
+) -> BoxStream<'static, Result<(Vec<kvrpcpb::KeyRange>, RegionWithLeader)>> {
+    pd_client.clone().group_ranges_by_region(ranges)
 }
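`region_stream_for_range` clips the caller's range to each region before emitting the shard, so a request never asks a region for keys it does not own. Only the closing line of `range_intersection` is visible in this hunk, so the worked example below assumes the conventional min/max clipping on both bounds:

    #[test]
    fn range_intersection_clips_to_region() {
        // Region owns [b, f); the caller asked for [a, d).
        let region_range = (Key::from("b".to_owned()), Key::from("f".to_owned()));
        let requested = (Key::from("a".to_owned()), Key::from("d".to_owned()));
        // The shard sent to this region is the overlap, [b, d).
        assert_eq!(
            range_intersection(region_range, requested),
            (Key::from("b".to_owned()), Key::from("d".to_owned()))
        );
    }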
diff --git a/src/store/request.rs b/src/store/request.rs
index 9eaabe5f..df66b73a 100644
--- a/src/store/request.rs
+++ b/src/store/request.rs
@@ -56,7 +56,7 @@ macro_rules! impl_request {
             fn set_leader(&mut self, leader: &RegionWithLeader) -> Result<()> {
                 let ctx = self.context.get_or_insert(kvrpcpb::Context::default());
                 let leader_peer = leader.leader.as_ref().ok_or(Error::LeaderNotFound {
-                    region_id: leader.region.id,
+                    region: leader.ver_id(),
                 })?;
                 ctx.region_id = leader.region.id;
                 ctx.region_epoch = leader.region.region_epoch.clone();
diff --git a/src/transaction/requests.rs b/src/transaction/requests.rs
index 6a5538d9..ebbb52a8 100644
--- a/src/transaction/requests.rs
+++ b/src/transaction/requests.rs
@@ -19,6 +19,7 @@ use crate::proto::kvrpcpb::TxnHeartBeatResponse;
 use crate::proto::kvrpcpb::TxnInfo;
 use crate::proto::kvrpcpb::{self};
 use crate::proto::pdpb::Timestamp;
+use crate::region::RegionWithLeader;
 use crate::request::Collect;
 use crate::request::CollectSingle;
 use crate::request::CollectWithShard;
@@ -37,10 +38,10 @@ use crate::reversible_range_request;
 use crate::shardable_key;
 use crate::shardable_keys;
 use crate::shardable_range;
-use crate::store::store_stream_for_range;
 use crate::store::RegionStore;
 use crate::store::Request;
-use crate::store::{store_stream_for_keys, Store};
+use crate::store::Store;
+use crate::store::{region_stream_for_keys, region_stream_for_range};
 use crate::timestamp::TimestampExt;
 use crate::transaction::requests::kvrpcpb::prewrite_request::PessimisticAction;
 use crate::transaction::HasLocks;
@@ -283,26 +284,24 @@ impl Shardable for kvrpcpb::PrewriteRequest {
     fn shards(
         &self,
         pd_client: &Arc<impl PdClient>,
-    ) -> BoxStream<'static, Result<(Self::Shard, RegionStore)>> {
+    ) -> BoxStream<'static, Result<(Self::Shard, RegionWithLeader)>> {
         let mut mutations = self.mutations.clone();
         mutations.sort_by(|a, b| a.key.cmp(&b.key));
 
-        store_stream_for_keys(mutations.into_iter(), pd_client.clone())
+        region_stream_for_keys(mutations.into_iter(), pd_client.clone())
             .flat_map(|result| match result {
-                Ok((mutations, store)) => stream::iter(kvrpcpb::PrewriteRequest::batches(
+                Ok((mutations, region)) => stream::iter(kvrpcpb::PrewriteRequest::batches(
                     mutations,
                     TXN_COMMIT_BATCH_SIZE,
                 ))
-                .map(move |batch| Ok((batch, store.clone())))
+                .map(move |batch| Ok((batch, region.clone())))
                 .boxed(),
                 Err(e) => stream::iter(Err(e)).boxed(),
             })
             .boxed()
     }
 
-    fn apply_shard(&mut self, shard: Self::Shard, store: &RegionStore) -> Result<()> {
-        self.set_leader(&store.region_with_leader)?;
-
+    fn apply_shard(&mut self, shard: Self::Shard) {
         // Only need to set secondary keys if we're sending the primary key.
         if self.use_async_commit && !self.mutations.iter().any(|m| m.key == self.primary_lock) {
             self.secondaries = vec![];
@@ -314,7 +313,10 @@ impl Shardable for kvrpcpb::PrewriteRequest {
         }
 
         self.mutations = shard;
-        Ok(())
+    }
+
+    fn apply_store(&mut self, store: &RegionStore) -> Result<()> {
+        self.set_leader(&store.region_with_leader)
     }
 }
 
@@ -351,15 +353,15 @@ impl Shardable for kvrpcpb::CommitRequest {
     fn shards(
         &self,
         pd_client: &Arc<impl PdClient>,
-    ) -> BoxStream<'static, Result<(Self::Shard, RegionStore)>> {
+    ) -> BoxStream<'static, Result<(Self::Shard, RegionWithLeader)>> {
         let mut keys = self.keys.clone();
         keys.sort();
 
-        store_stream_for_keys(keys.into_iter(), pd_client.clone())
+        region_stream_for_keys(keys.into_iter(), pd_client.clone())
             .flat_map(|result| match result {
-                Ok((keys, store)) => {
+                Ok((keys, region)) => {
                     stream::iter(kvrpcpb::CommitRequest::batches(keys, TXN_COMMIT_BATCH_SIZE))
-                        .map(move |batch| Ok((batch, store.clone())))
+                        .map(move |batch| Ok((batch, region.clone())))
                         .boxed()
                 }
                 Err(e) => stream::iter(Err(e)).boxed(),
@@ -367,10 +369,12 @@ impl Shardable for kvrpcpb::CommitRequest {
             })
             .boxed()
     }
 
-    fn apply_shard(&mut self, shard: Self::Shard, store: &RegionStore) -> Result<()> {
-        self.set_leader(&store.region_with_leader)?;
+    fn apply_shard(&mut self, shard: Self::Shard) {
         self.keys = shard.into_iter().map(Into::into).collect();
-        Ok(())
+    }
+
+    fn apply_store(&mut self, store: &RegionStore) -> Result<()> {
+        self.set_leader(&store.region_with_leader)
     }
 }
 
@@ -452,16 +456,18 @@ impl Shardable for kvrpcpb::PessimisticLockRequest {
     fn shards(
         &self,
         pd_client: &Arc<impl PdClient>,
-    ) -> BoxStream<'static, Result<(Self::Shard, RegionStore)>> {
+    ) -> BoxStream<'static, Result<(Self::Shard, RegionWithLeader)>> {
         let mut mutations = self.mutations.clone();
         mutations.sort_by(|a, b| a.key.cmp(&b.key));
-        store_stream_for_keys(mutations.into_iter(), pd_client.clone())
+        region_stream_for_keys(mutations.into_iter(), pd_client.clone())
     }
 
-    fn apply_shard(&mut self, shard: Self::Shard, store: &RegionStore) -> Result<()> {
-        self.set_leader(&store.region_with_leader)?;
+    fn apply_shard(&mut self, shard: Self::Shard) {
         self.mutations = shard;
-        Ok(())
+    }
+
+    fn apply_store(&mut self, store: &RegionStore) -> Result<()> {
+        self.set_leader(&store.region_with_leader)
     }
 }
 
@@ -552,17 +558,19 @@ impl Shardable for kvrpcpb::ScanLockRequest {
     fn shards(
         &self,
         pd_client: &Arc<impl PdClient>,
-    ) -> BoxStream<'static, Result<(Self::Shard, RegionStore)>> {
-        store_stream_for_range(
+    ) -> BoxStream<'static, Result<(Self::Shard, RegionWithLeader)>> {
+        region_stream_for_range(
             (self.start_key.clone(), self.end_key.clone()),
             pd_client.clone(),
         )
     }
 
-    fn apply_shard(&mut self, shard: Self::Shard, store: &RegionStore) -> Result<()> {
-        self.set_leader(&store.region_with_leader)?;
+    fn apply_shard(&mut self, shard: Self::Shard) {
         self.start_key = shard.0;
-        Ok(())
+    }
+
+    fn apply_store(&mut self, store: &RegionStore) -> Result<()> {
+        self.set_leader(&store.region_with_leader)
     }
 }
 
@@ -616,15 +624,17 @@ impl Shardable for kvrpcpb::TxnHeartBeatRequest {
     fn shards(
         &self,
         pd_client: &Arc<impl PdClient>,
-    ) -> BoxStream<'static, Result<(Self::Shard, RegionStore)>> {
-        crate::store::store_stream_for_keys(std::iter::once(self.key().clone()), pd_client.clone())
+    ) -> BoxStream<'static, Result<(Self::Shard, RegionWithLeader)>> {
+        region_stream_for_keys(std::iter::once(self.key().clone()), pd_client.clone())
     }
 
-    fn apply_shard(&mut self, mut shard: Self::Shard, store: &RegionStore) -> Result<()> {
-        self.set_leader(&store.region_with_leader)?;
+    fn apply_shard(&mut self, mut shard: Self::Shard) {
         assert!(shard.len() == 1);
         self.primary_lock = shard.pop().unwrap();
-        Ok(())
+    }
+
+    fn apply_store(&mut self, store: &RegionStore) -> Result<()> {
+        self.set_leader(&store.region_with_leader)
     }
 }
 
@@ -674,15 +684,17 @@ impl Shardable for kvrpcpb::CheckTxnStatusRequest {
     fn shards(
         &self,
         pd_client: &Arc<impl PdClient>,
-    ) -> BoxStream<'static, Result<(Self::Shard, RegionStore)>> {
-        crate::store::store_stream_for_keys(std::iter::once(self.key().clone()), pd_client.clone())
+    ) -> BoxStream<'static, Result<(Self::Shard, RegionWithLeader)>> {
+        region_stream_for_keys(std::iter::once(self.key().clone()), pd_client.clone())
     }
 
-    fn apply_shard(&mut self, mut shard: Self::Shard, store: &RegionStore) -> Result<()> {
-        self.set_leader(&store.region_with_leader)?;
+    fn apply_shard(&mut self, mut shard: Self::Shard) {
         assert!(shard.len() == 1);
         self.primary_key = shard.pop().unwrap();
-        Ok(())
+    }
+
+    fn apply_store(&mut self, store: &RegionStore) -> Result<()> {
+        self.set_leader(&store.region_with_leader)
     }
 }
diff --git a/src/transaction/snapshot.rs b/src/transaction/snapshot.rs
index 5694614b..f9cb6b9d 100644
--- a/src/transaction/snapshot.rs
+++ b/src/transaction/snapshot.rs
@@ -1,7 +1,7 @@
 // Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0.
 
 use derive_new::new;
-use log::debug;
+use log::{debug, trace};
 
 use crate::BoundRange;
 use crate::Key;
@@ -25,7 +25,7 @@ pub struct Snapshot {
 impl Snapshot {
     /// Get the value associated with the given key.
     pub async fn get(&mut self, key: impl Into<Key>) -> Result<Option<Value>> {
-        debug!("invoking get request on snapshot");
+        trace!("invoking get request on snapshot");
         self.transaction.get(key).await
     }
 
diff --git a/src/transaction/transaction.rs b/src/transaction/transaction.rs
index cae46179..b30ef6e4 100644
--- a/src/transaction/transaction.rs
+++ b/src/transaction/transaction.rs
@@ -9,8 +9,8 @@ use std::time::Instant;
 use derive_new::new;
 use fail::fail_point;
 use futures::prelude::*;
-use log::debug;
 use log::warn;
+use log::{debug, trace};
 use tokio::time::Duration;
 
 use crate::backoff::Backoff;
@@ -132,7 +132,7 @@ impl Transaction {
     /// # });
     /// ```
     pub async fn get(&mut self, key: impl Into<Key>) -> Result<Option<Value>> {
-        debug!("invoking transactional get request");
+        trace!("invoking transactional get request");
         self.check_allow_operation().await?;
         let timestamp = self.timestamp.clone();
         let rpc = self.rpc.clone();
@@ -461,7 +461,7 @@ impl Transaction {
     /// # });
     /// ```
     pub async fn put(&mut self, key: impl Into<Key>, value: impl Into<Value>) -> Result<()> {
-        debug!("invoking transactional put request");
+        trace!("invoking transactional put request");
        self.check_allow_operation().await?;
         let key = key.into().encode_keyspace(self.keyspace, KeyMode::Txn);
         if self.is_pessimistic() {
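These log-level changes work together with the CI and test changes elsewhere in the patch: per-call `get`/`put` logging drops from `debug` to `trace` so that `RUST_LOG=info` (now set in CI) stays readable, while the integration tests switch from `simple_logger` to `env_logger`, which reads `RUST_LOG` from the environment. A minimal sketch of that wiring in a test or binary:

    fn main() {
        // try_init rather than init: harmless if another test in the same
        // process already installed a logger.
        let _ = env_logger::try_init();

        log::info!("shown under RUST_LOG=info (the new CI default)");
        log::trace!("per-request detail, visible only under RUST_LOG=trace");
    }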
diff --git a/tests/common/mod.rs b/tests/common/mod.rs
index 9a32619b..06b5c1fe 100644
--- a/tests/common/mod.rs
+++ b/tests/common/mod.rs
@@ -46,6 +46,8 @@ pub async fn clear_tikv() {
 // To test with multiple regions, prewrite some data. Tests that hope to test
 // with multiple regions should use keys in the corresponding ranges.
 pub async fn init() -> Result<()> {
+    let _ = env_logger::try_init();
+
     if env::var(ENV_ENABLE_MULIT_REGION).is_ok() {
         // 1000 keys: 0..1000
         let keys_1 = std::iter::successors(Some(0u32), |x| Some(x + 1))