diff --git a/src/request/plan.rs b/src/request/plan.rs
index 78d4776d..eb6caa74 100644
--- a/src/request/plan.rs
+++ b/src/request/plan.rs
@@ -141,7 +141,7 @@ where
                 match backoff.next_delay_duration() {
                     Some(duration) => {
                         let region_error_resolved =
-                            Self::handle_region_error(pd_client.clone(), e, region_store).await?;
+                            handle_region_error(pd_client.clone(), e, region_store).await?;
                         // don't sleep if we have resolved the region error
                         if !region_error_resolved {
                             futures_timer::Delay::new(duration).await;
@@ -161,107 +161,6 @@ where
             Ok(vec![Ok(resp)])
         }
     }
-
-    // Returns
-    // 1. Ok(true): error has been resolved, retry immediately
-    // 2. Ok(false): backoff, and then retry
-    // 3. Err(Error): can't be resolved, return the error to upper level
-    async fn handle_region_error(
-        pd_client: Arc<PdC>,
-        mut e: errorpb::Error,
-        region_store: RegionStore,
-    ) -> Result<bool> {
-        let ver_id = region_store.region_with_leader.ver_id();
-        if e.has_not_leader() {
-            let not_leader = e.get_not_leader();
-            if not_leader.has_leader() {
-                match pd_client
-                    .update_leader(
-                        region_store.region_with_leader.ver_id(),
-                        not_leader.get_leader().clone(),
-                    )
-                    .await
-                {
-                    Ok(_) => Ok(true),
-                    Err(e) => {
-                        pd_client.invalidate_region_cache(ver_id).await;
-                        Err(e)
-                    }
-                }
-            } else {
-                // The peer doesn't know who is the current leader. Generally it's because
-                // the Raft group is in an election, but it's possible that the peer is
-                // isolated and removed from the Raft group. So it's necessary to reload
-                // the region from PD.
-                pd_client.invalidate_region_cache(ver_id).await;
-                Ok(false)
-            }
-        } else if e.has_store_not_match() {
-            pd_client.invalidate_region_cache(ver_id).await;
-            Ok(false)
-        } else if e.has_epoch_not_match() {
-            Self::on_region_epoch_not_match(
-                pd_client.clone(),
-                region_store,
-                e.take_epoch_not_match(),
-            )
-            .await
-        } else if e.has_stale_command() || e.has_region_not_found() {
-            pd_client.invalidate_region_cache(ver_id).await;
-            Ok(false)
-        } else if e.has_server_is_busy()
-            || e.has_raft_entry_too_large()
-            || e.has_max_timestamp_not_synced()
-        {
-            Err(Error::RegionError(Box::new(e)))
-        } else {
-            // TODO: pass the logger around
-            // info!("unknwon region error: {:?}", e);
-            pd_client.invalidate_region_cache(ver_id).await;
-            Ok(false)
-        }
-    }
-
-    // Returns
-    // 1. Ok(true): error has been resolved, retry immediately
-    // 2. Ok(false): backoff, and then retry
-    // 3. Err(Error): can't be resolved, return the error to upper level
-    async fn on_region_epoch_not_match(
-        pd_client: Arc<PdC>,
-        region_store: RegionStore,
-        error: EpochNotMatch,
-    ) -> Result<bool> {
-        let ver_id = region_store.region_with_leader.ver_id();
-        if error.get_current_regions().is_empty() {
-            pd_client.invalidate_region_cache(ver_id).await;
-            return Ok(true);
-        }
-
-        for r in error.get_current_regions() {
-            if r.get_id() == region_store.region_with_leader.id() {
-                let returned_conf_ver = r.get_region_epoch().get_conf_ver();
-                let returned_version = r.get_region_epoch().get_version();
-                let current_conf_ver = region_store
-                    .region_with_leader
-                    .region
-                    .get_region_epoch()
-                    .get_conf_ver();
-                let current_version = region_store
-                    .region_with_leader
-                    .region
-                    .get_region_epoch()
-                    .get_version();
-
-                // Find whether the current region is ahead of TiKV's. If so, backoff.
-                if returned_conf_ver < current_conf_ver || returned_version < current_version {
-                    return Ok(false);
-                }
-            }
-        }
-        // TODO: finer grained processing
-        pd_client.invalidate_region_cache(ver_id).await;
-        Ok(false)
-    }
 }
 
 impl<P: Plan, PdC: PdClient> Clone for RetryableMultiRegion<P, PdC> {
@@ -298,6 +197,102 @@ where
     }
 }
 
+// Returns
+// 1. Ok(true): error has been resolved, retry immediately
+// 2. Ok(false): back off, and then retry
+// 3. Err(Error): can't be resolved, return the error to the upper level
+pub async fn handle_region_error<PdC: PdClient>(
+    pd_client: Arc<PdC>,
+    mut e: errorpb::Error,
+    region_store: RegionStore,
+) -> Result<bool> {
+    let ver_id = region_store.region_with_leader.ver_id();
+    if e.has_not_leader() {
+        let not_leader = e.get_not_leader();
+        if not_leader.has_leader() {
+            match pd_client
+                .update_leader(
+                    region_store.region_with_leader.ver_id(),
+                    not_leader.get_leader().clone(),
+                )
+                .await
+            {
+                Ok(_) => Ok(true),
+                Err(e) => {
+                    pd_client.invalidate_region_cache(ver_id).await;
+                    Err(e)
+                }
+            }
+        } else {
+            // The peer doesn't know who the current leader is. Generally it's because
+            // the Raft group is in an election, but it's possible that the peer is
+            // isolated and removed from the Raft group. So it's necessary to reload
+            // the region from PD.
+            pd_client.invalidate_region_cache(ver_id).await;
+            Ok(false)
+        }
+    } else if e.has_store_not_match() {
+        pd_client.invalidate_region_cache(ver_id).await;
+        Ok(false)
+    } else if e.has_epoch_not_match() {
+        on_region_epoch_not_match(pd_client.clone(), region_store, e.take_epoch_not_match()).await
+    } else if e.has_stale_command() || e.has_region_not_found() {
+        pd_client.invalidate_region_cache(ver_id).await;
+        Ok(false)
+    } else if e.has_server_is_busy()
+        || e.has_raft_entry_too_large()
+        || e.has_max_timestamp_not_synced()
+    {
+        Err(Error::RegionError(Box::new(e)))
+    } else {
+        // TODO: pass the logger around
+        // info!("unknown region error: {:?}", e);
+        pd_client.invalidate_region_cache(ver_id).await;
+        Ok(false)
+    }
+}
+
+// Returns
+// 1. Ok(true): error has been resolved, retry immediately
+// 2. Ok(false): back off, and then retry
+// 3. Err(Error): can't be resolved, return the error to the upper level
+async fn on_region_epoch_not_match<PdC: PdClient>(
+    pd_client: Arc<PdC>,
+    region_store: RegionStore,
+    error: EpochNotMatch,
+) -> Result<bool> {
+    let ver_id = region_store.region_with_leader.ver_id();
+    if error.get_current_regions().is_empty() {
+        pd_client.invalidate_region_cache(ver_id).await;
+        return Ok(true);
+    }
+
+    for r in error.get_current_regions() {
+        if r.get_id() == region_store.region_with_leader.id() {
+            let returned_conf_ver = r.get_region_epoch().get_conf_ver();
+            let returned_version = r.get_region_epoch().get_version();
+            let current_conf_ver = region_store
+                .region_with_leader
+                .region
+                .get_region_epoch()
+                .get_conf_ver();
+            let current_version = region_store
+                .region_with_leader
+                .region
+                .get_region_epoch()
+                .get_version();
+
+            // Check whether the current region is ahead of TiKV's. If so, back off.
+            if returned_conf_ver < current_conf_ver || returned_version < current_version {
+                return Ok(false);
+            }
+        }
+    }
+    // TODO: finer-grained processing
+    pd_client.invalidate_region_cache(ver_id).await;
+    Ok(false)
+}
+
 /// A technique for merging responses into a single result (with type `Out`).
 pub trait Merge<In>: Sized + Clone + Send + Sync + 'static {
     type Out: Send;
diff --git a/src/transaction/client.rs b/src/transaction/client.rs
index d7ca5760..d747f960 100644
--- a/src/transaction/client.rs
+++ b/src/transaction/client.rs
@@ -5,16 +5,23 @@ use crate::{
     backoff::DEFAULT_REGION_BACKOFF,
     config::Config,
     pd::{PdClient, PdRpcClient},
-    request::{plan::CleanupLocksResult, Plan},
+    request::{
+        plan::{handle_region_error, CleanupLocksResult},
+        Plan,
+    },
+    store::RegionStore,
     timestamp::TimestampExt,
     transaction::{
-        lock::ResolveLocksOptions, ResolveLocksContext, Snapshot, Transaction, TransactionOptions,
+        lock::ResolveLocksOptions, requests, requests::new_unsafe_destroy_range_request,
+        ResolveLocksContext, Snapshot, Transaction, TransactionOptions,
     },
-    Backoff, Result,
+    Backoff, BoundRange, Result,
 };
+use futures::{future::try_join_all, StreamExt};
 use slog::{Drain, Logger};
-use std::{mem, sync::Arc};
-use tikv_client_proto::pdpb::Timestamp;
+use std::{collections::HashMap, mem, sync::Arc};
+use tikv_client_common::Error;
+use tikv_client_proto::{metapb::Region, pdpb::Timestamp};
 
 // FIXME: cargo-culted value
 const SCAN_LOCK_BATCH_SIZE: u32 = 1024;
@@ -301,4 +308,125 @@ impl Client {
         let logger = self.logger.new(o!("child" => 1));
         Transaction::new(timestamp, self.pd.clone(), options, logger)
     }
+
+    // FIXME: make the loop a plan.
+    pub async fn split_region_with_retry(
+        &self,
+        #[allow(clippy::ptr_arg)] key: &Vec<u8>,
+        split_keys: Vec<Vec<u8>>,
+        is_raw_kv: bool,
+    ) -> Result<Vec<Region>> {
+        debug!(self.logger, "invoking split region with retry");
+        let mut backoff = DEFAULT_REGION_BACKOFF;
+        let mut i = 0;
+        'retry: loop {
+            i += 1;
+            debug!(self.logger, "split region: attempt {}", i);
+            let store = self.pd.clone().store_for_key(key.into()).await?;
+            let request = requests::new_split_region_request(split_keys.clone(), is_raw_kv);
+            let plan = crate::request::PlanBuilder::new(self.pd.clone(), request)
+                .single_region_with_store(store.clone())
+                .await?
+                .extract_error()
+                .plan();
+            match plan.execute().await {
+                Ok(mut resp) => return Ok(resp.take_regions().into()),
+                Err(Error::ExtractedErrors(mut errors)) => match errors.pop() {
+                    Some(Error::RegionError(e)) => match backoff.next_delay_duration() {
+                        Some(duration) => {
+                            let region_error_resolved =
+                                handle_region_error(self.pd.clone(), *e, store).await?;
+                            if !region_error_resolved {
+                                futures_timer::Delay::new(duration).await;
+                            }
+                            continue 'retry;
+                        }
+                        None => return Err(Error::RegionError(e)),
+                    },
+                    Some(e) => return Err(e),
+                    None => unreachable!(),
+                },
+                Err(e) => return Err(e),
+            }
+        }
+    }
+
+    // FIXME: make list stores and retry a plan.
+    pub async fn unsafe_destroy_range(&self, start_key: Vec<u8>, end_key: Vec<u8>) -> Result<()> {
+        debug!(self.logger, "invoking unsafe destroy range");
+        let backoff = DEFAULT_REGION_BACKOFF;
+        let stores = self
+            .list_stores_for_unsafe_destroy(start_key.clone(), end_key.clone())
+            .await?;
+        let mut handles = Vec::with_capacity(stores.len());
+        for store in stores.into_values() {
+            let logger = self.logger.clone();
+            let start_key = start_key.clone();
+            let end_key = end_key.clone();
+            let pd = self.pd.clone();
+            let mut backoff = backoff.clone();
+            let task = async move {
+                let mut i = 0;
+                'retry: loop {
+                    i += 1;
+                    debug!(logger, "unsafe destroy range: attempt {}", i);
+                    let request =
+                        new_unsafe_destroy_range_request(start_key.clone(), end_key.clone());
+                    let plan = crate::request::PlanBuilder::new(pd.clone(), request)
+                        .single_region_with_store(store.clone())
+                        .await?
+                        .extract_error()
+                        .plan();
+                    match plan.execute().await {
+                        Ok(_) => return Ok(()),
+                        Err(e) => {
+                            warn!(logger, "unsafe destroy range error: {:?}", e);
+                            match backoff.next_delay_duration() {
+                                Some(duration) => {
+                                    futures_timer::Delay::new(duration).await;
+                                    continue 'retry;
+                                }
+                                None => return Err(e),
+                            }
+                        }
+                    }
+                }
+            };
+            handles.push(tokio::spawn(task));
+        }
+
+        let results = try_join_all(handles).await?;
+        match results.into_iter().find(|x| x.is_err()) {
+            Some(r) => r,
+            None => Ok(()),
+        }
+    }
+
+    async fn list_stores_for_unsafe_destroy(
+        &self,
+        start_key: Vec<u8>,
+        end_key: Vec<u8>,
+    ) -> Result<HashMap<u64, RegionStore>> {
+        let mut stores = HashMap::new();
+        let bnd_range = BoundRange::from((start_key, end_key));
+        self.pd
+            .clone()
+            .stores_for_range(bnd_range)
+            .map(|store| -> Result<()> {
+                let store = store?;
+                let store_id = store
+                    .region_with_leader
+                    .leader
+                    .as_ref()
+                    .unwrap()
+                    .get_store_id();
+                stores.entry(store_id).or_insert(store);
+                Ok(())
+            })
+            .collect::<Vec<_>>()
+            .await
+            .into_iter()
+            .collect::<Result<()>>()?;
+        Ok(stores)
+    }
 }
diff --git a/src/transaction/lock.rs b/src/transaction/lock.rs
index e5949cd4..44865e70 100644
--- a/src/transaction/lock.rs
+++ b/src/transaction/lock.rs
@@ -113,7 +113,6 @@ async fn resolve_lock_with_retry(
     let store = pd_client.clone().store_for_key(key.into()).await?;
     let ver_id = store.region_with_leader.ver_id();
     let request = requests::new_resolve_lock_request(start_version, commit_version);
-    // The only place where single-region is used
     let plan = crate::request::PlanBuilder::new(pd_client.clone(), request)
         .single_region_with_store(store)
         .await?
diff --git a/src/transaction/requests.rs b/src/transaction/requests.rs
index 2ed90041..d6b593ae 100644
--- a/src/transaction/requests.rs
+++ b/src/transaction/requests.rs
@@ -782,6 +782,44 @@ impl HasLocks for kvrpcpb::PrewriteResponse {
     }
 }
 
+pub fn new_split_region_request(
+    split_keys: Vec<Vec<u8>>,
+    is_raw_kv: bool,
+) -> kvrpcpb::SplitRegionRequest {
+    let mut req = kvrpcpb::SplitRegionRequest::default();
+    req.set_split_keys(split_keys.into());
+    req.set_is_raw_kv(is_raw_kv);
+    req
+}
+
+pub fn new_unsafe_destroy_range_request(
+    start_key: Vec<u8>,
+    end_key: Vec<u8>,
+) -> kvrpcpb::UnsafeDestroyRangeRequest {
+    let mut req = kvrpcpb::UnsafeDestroyRangeRequest::default();
+    req.set_start_key(start_key);
+    req.set_end_key(end_key);
+    req
+}
+
+// Note: SplitRegionRequest is sent to a single specified region rather than being
+// sharded by key range, so it's not Shardable. Its region errors are not retried
+// automatically at the Plan level; they must be handled manually by the caller.
+impl KvRequest for kvrpcpb::SplitRegionRequest {
+    type Response = kvrpcpb::SplitRegionResponse;
+}
+
+impl HasLocks for kvrpcpb::SplitRegionResponse {}
+
+// Note: UnsafeDestroyRangeRequest is sent to every store that covers the range, so
+// it's not Shardable. Its errors are not retried automatically at the Plan level;
+// they must be handled manually by the caller.
+impl KvRequest for kvrpcpb::UnsafeDestroyRangeRequest {
+    type Response = kvrpcpb::UnsafeDestroyRangeResponse;
+}
+
+impl HasLocks for kvrpcpb::UnsafeDestroyRangeResponse {}
+
 #[cfg(test)]
 #[cfg_attr(feature = "protobuf-codec", allow(clippy::useless_conversion))]
 mod tests {
diff --git a/tikv-client-store/src/errors.rs b/tikv-client-store/src/errors.rs
index f40afd0d..8512f28d 100644
--- a/tikv-client-store/src/errors.rs
+++ b/tikv-client-store/src/errors.rs
@@ -66,6 +66,8 @@ has_region_error!(kvrpcpb::RawScanResponse);
 has_region_error!(kvrpcpb::RawBatchScanResponse);
 has_region_error!(kvrpcpb::RawCasResponse);
 has_region_error!(kvrpcpb::RawCoprocessorResponse);
+has_region_error!(kvrpcpb::SplitRegionResponse);
+has_region_error!(kvrpcpb::UnsafeDestroyRangeResponse);
 
 macro_rules! has_key_error {
     ($type:ty) => {
@@ -118,6 +120,7 @@ has_str_error!(kvrpcpb::RawCasResponse);
 has_str_error!(kvrpcpb::RawCoprocessorResponse);
 has_str_error!(kvrpcpb::ImportResponse);
 has_str_error!(kvrpcpb::DeleteRangeResponse);
+has_str_error!(kvrpcpb::UnsafeDestroyRangeResponse);
 
 impl HasKeyErrors for kvrpcpb::ScanResponse {
     fn key_errors(&mut self) -> Option<Vec<Error>> {
@@ -167,6 +170,12 @@ impl HasKeyErrors for kvrpcpb::PessimisticRollbackResponse {
     }
 }
 
+impl HasKeyErrors for kvrpcpb::SplitRegionResponse {
+    fn key_errors(&mut self) -> Option<Vec<Error>> {
+        None
+    }
+}
+
 impl<T: HasKeyErrors> HasKeyErrors for Result<T> {
     fn key_errors(&mut self) -> Option<Vec<Error>> {
         match self {
diff --git a/tikv-client-store/src/request.rs b/tikv-client-store/src/request.rs
index 290f142a..963a8f9c 100644
--- a/tikv-client-store/src/request.rs
+++ b/tikv-client-store/src/request.rs
@@ -125,3 +125,9 @@ impl_request!(
     kv_delete_range_async_opt,
     "kv_delete_range"
 );
+impl_request!(SplitRegionRequest, split_region_async_opt, "split_region");
+impl_request!(
+    UnsafeDestroyRangeRequest,
+    unsafe_destroy_range_async_opt,
+    "unsafe_destroy_range"
+);
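
Usage sketch (not part of the patch): how a caller might drive the new
Client::split_region_with_retry API. Everything below is illustrative; the PD
endpoint and keys are placeholders, and the TransactionClient constructor
arguments (including the optional slog logger parameter assumed for this
branch) should be checked against the actual signature.

    use tikv_client::TransactionClient;

    #[tokio::main]
    async fn main() -> tikv_client::Result<()> {
        // Placeholder PD endpoint; `None` assumes the optional-logger parameter.
        let client = TransactionClient::new(vec!["127.0.0.1:2379"], None).await?;

        // Split the region containing `key` at the given split keys. The last
        // argument selects raw-KV mode; `false` keeps transactional semantics.
        let key = b"k1".to_vec();
        let split_keys = vec![b"k2".to_vec(), b"k3".to_vec()];
        let regions = client
            .split_region_with_retry(&key, split_keys, false)
            .await?;
        println!("regions after split: {}", regions.len());
        Ok(())
    }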
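
A companion sketch for Client::unsafe_destroy_range, which fans one request out
to every store covering the range and retries each store with its own backoff.
Again hypothetical, with the same placeholder endpoint and constructor
assumptions as above. Note that the operation bypasses the transaction layer
and permanently removes all MVCC versions in the range, so it must only target
ranges that are no longer being served.

    use tikv_client::TransactionClient;

    #[tokio::main]
    async fn main() -> tikv_client::Result<()> {
        let client = TransactionClient::new(vec!["127.0.0.1:2379"], None).await?;

        // Destroy every version of every key in ["a", "z") on all stores whose
        // regions overlap the range.
        client
            .unsafe_destroy_range(b"a".to_vec(), b"z".to_vec())
            .await?;
        Ok(())
    }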