
[WIP] *: trace scan requests #427


Open

wants to merge 33 commits into base: master

Changes from all commits (33 commits)
bec213a
trace scan requests
pingyu Oct 26, 2023
e35f479
wip
pingyu Oct 26, 2023
547eabb
wip
pingyu Oct 26, 2023
e35a71b
wip
pingyu Oct 27, 2023
942dbfc
wip
pingyu Oct 27, 2023
56e8f4d
fix blocking_write
pingyu Oct 30, 2023
75ecb2c
comment out block_on
pingyu Oct 30, 2023
83ae139
use AtomicU8 for status
pingyu Oct 31, 2023
34a7936
simplify
pingyu Oct 31, 2023
fdec181
fix check
pingyu Oct 31, 2023
a1afcc1
skip exchange on equal
pingyu Oct 31, 2023
1f9eba3
Merge branch 'status-as-atomic-u8' into trace-scan
pingyu Oct 31, 2023
74df9f3
fix check
pingyu Oct 31, 2023
63790aa
Merge remote-tracking branch 'upstream/master' into trace-scan
pingyu Nov 1, 2023
bab9d01
polish
pingyu Nov 1, 2023
7d8b777
trace tso
pingyu Nov 2, 2023
aabe8e5
fix get tso hang
pingyu Nov 3, 2023
7892d75
Merge remote-tracking branch 'upstream/master' into trace-scan
pingyu Nov 3, 2023
55887f4
change all log to tracing
pingyu Nov 3, 2023
a0bd83f
Merge remote-tracking branch 'origin/fix-get-tso-hang' into trace-scan
pingyu Nov 3, 2023
1d3d074
more trace for tso
pingyu Nov 4, 2023
7ba5f55
wake
pingyu Nov 6, 2023
81058ee
Merge branch 'fix-get-tso-hang' into trace-scan
pingyu Nov 6, 2023
e08fa53
Merge remote-tracking branch 'upstream/master' into trace-scan
pingyu Nov 13, 2023
faf135b
print locks
pingyu Nov 13, 2023
5591a9a
Merge branch 'master' into trace-scan
pingyu Nov 22, 2023
c2325e2
tracing for gc
pingyu Nov 22, 2023
e067c34
do not trace single shard
pingyu Nov 22, 2023
36be2d0
gc with range
pingyu Nov 22, 2023
20f51be
polish trace
pingyu Nov 22, 2023
2fb5c22
trace handle_region_error
pingyu Nov 22, 2023
87f5c5c
no trace ResolveLock::execute
pingyu Nov 22, 2023
5695e2e
migrate to tikv/minitrace-rust
andylokandy Dec 18, 2023
3 changes: 2 additions & 1 deletion Cargo.toml
@@ -32,7 +32,8 @@ either = "1.6"
fail = "0.4"
futures = { version = "0.3" }
lazy_static = "1"
log = "0.4"
log = { version = "0.4", features = ["kv_unstable"] }
minitrace = "0.6.2"
pin-project = "1"
prometheus = { version = "0.13", default-features = false }
prost = "0.12"
2 changes: 1 addition & 1 deletion src/kv/mod.rs
@@ -14,7 +14,7 @@ pub use key::Key;
pub use kvpair::KvPair;
pub use value::Value;

struct HexRepr<'a>(pub &'a [u8]);
pub struct HexRepr<'a>(pub &'a [u8]);

impl<'a> fmt::Display for HexRepr<'a> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
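
Making HexRepr public lets other modules format raw key bytes as hex in log output (transaction/lock.rs below relies on this). A crate-internal sketch of the intended use (the function name is illustrative):

use crate::kv::HexRepr;
use log::debug;

// HexRepr's Display impl renders the byte slice as hex, keeping keys readable in
// log lines instead of printing raw byte arrays.
fn log_primary_lock(primary: &[u8]) {
    debug!("resolving primary lock {}", HexRepr(primary));
}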
2 changes: 2 additions & 0 deletions src/pd/cluster.rs
@@ -103,6 +103,7 @@ impl Connection {
Connection { security_mgr }
}

#[minitrace::trace]
pub async fn connect_cluster(
&self,
endpoints: &[String],
@@ -122,6 +123,7 @@ impl Connection {
}

// Re-establish connection with PD leader in asynchronous fashion.
#[minitrace::trace]
pub async fn reconnect(&self, cluster: &mut Cluster, timeout: Duration) -> Result<()> {
warn!("updating pd client");
let start = Instant::now();
17 changes: 14 additions & 3 deletions src/pd/retry.rs
@@ -8,6 +8,8 @@ use std::time::Duration;
use std::time::Instant;

use async_trait::async_trait;
use log::debug;
use minitrace::prelude::*;
use tokio::sync::RwLock;
use tokio::time::sleep;

@@ -74,14 +76,17 @@ macro_rules! retry_core {
($self: ident, $tag: literal, $call: expr) => {{
let stats = pd_stats($tag);
let mut last_err = Ok(());
for _ in 0..LEADER_CHANGE_RETRY {
for retry in 0..LEADER_CHANGE_RETRY {
let _span = LocalSpan::enter_with_local_parent("RetryClient::retry");

let res = $call;

match stats.done(res) {
Ok(r) => return Ok(r),
Err(e) => last_err = Err(e),
}

debug!("retry {} on last_err: {:?}", retry, last_err);
let mut reconnect_count = MAX_REQUEST_COUNT;
while let Err(e) = $self.reconnect(RECONNECT_INTERVAL_SEC).await {
reconnect_count -= 1;
@@ -142,6 +147,7 @@ impl RetryClient<Cluster> {
impl RetryClientTrait for RetryClient<Cluster> {
// These get_* functions will try multiple times to make a request, reconnecting as necessary.
// It does not know about encoding. Caller should take care of it.
#[minitrace::trace]
async fn get_region(self: Arc<Self>, key: Vec<u8>) -> Result<RegionWithLeader> {
retry_mut!(self, "get_region", |cluster| {
let key = key.clone();
@@ -156,6 +162,7 @@ impl RetryClientTrait for RetryClient<Cluster> {
})
}

#[minitrace::trace]
async fn get_region_by_id(self: Arc<Self>, region_id: RegionId) -> Result<RegionWithLeader> {
retry_mut!(self, "get_region_by_id", |cluster| async {
cluster
@@ -167,6 +174,7 @@ impl RetryClientTrait for RetryClient<Cluster> {
})
}

#[minitrace::trace]
async fn get_store(self: Arc<Self>, id: StoreId) -> Result<metapb::Store> {
retry_mut!(self, "get_store", |cluster| async {
cluster
@@ -176,6 +184,7 @@ impl RetryClientTrait for RetryClient<Cluster> {
})
}

#[minitrace::trace]
async fn get_all_stores(self: Arc<Self>) -> Result<Vec<metapb::Store>> {
retry_mut!(self, "get_all_stores", |cluster| async {
cluster
@@ -185,10 +194,12 @@ impl RetryClientTrait for RetryClient<Cluster> {
})
}

#[minitrace::trace]
async fn get_timestamp(self: Arc<Self>) -> Result<Timestamp> {
retry!(self, "get_timestamp", |cluster| cluster.get_timestamp())
}

#[minitrace::trace]
async fn update_safepoint(self: Arc<Self>, safepoint: u64) -> Result<bool> {
retry_mut!(self, "update_gc_safepoint", |cluster| async {
cluster
@@ -277,7 +288,7 @@ mod test {
}

async fn retry_ok(client: Arc<MockClient>) -> Result<()> {
retry!(client, "test", |_c| ready(Ok::<_, Error>(())))
retry_mut!(client, "test", |_c| ready(Ok::<_, Error>(())))
}

executor::block_on(async {
@@ -342,7 +353,7 @@ mod test {
client: Arc<MockClient>,
max_retries: Arc<AtomicUsize>,
) -> Result<()> {
retry!(client, "test", |c| {
retry_mut!(client, "test", |c| {
c.fetch_add(1, std::sync::atomic::Ordering::SeqCst);

let max_retries = max_retries.fetch_sub(1, Ordering::SeqCst) - 1;
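
The retry_core change pairs the #[minitrace::trace] attribute (one span per call) with a LocalSpan guard per loop iteration, so every retry appears as its own child span tagged with its attempt number. A minimal sketch of that pattern, assuming the minitrace 0.6 API (the function names are illustrative):

use minitrace::prelude::*;

// The attribute covers the call as a whole; each retry gets its own child span.
#[minitrace::trace]
async fn call_with_retry(max_retries: usize) -> Result<(), ()> {
    for attempt in 0..max_retries {
        // The guard closes the span when it drops at the end of the iteration.
        let _span = LocalSpan::enter_with_local_parent("retry")
            .with_property(|| ("attempt", attempt.to_string()));

        if do_call().await.is_ok() {
            return Ok(());
        }
    }
    Err(())
}

async fn do_call() -> Result<(), ()> {
    Ok(())
}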
19 changes: 19 additions & 0 deletions src/pd/timestamp.rs
@@ -22,6 +22,7 @@ use futures::task::Context;
use futures::task::Poll;
use log::debug;
use log::info;
use minitrace::prelude::*;
use pin_project::pin_project;
use tokio::sync::mpsc;
use tokio::sync::oneshot;
@@ -63,6 +64,7 @@ impl TimestampOracle {
Ok(TimestampOracle { request_tx })
}

#[minitrace::trace]
pub(crate) async fn get_timestamp(self) -> Result<Timestamp> {
debug!("getting current timestamp");
let (request, response) = oneshot::channel();
@@ -74,6 +76,7 @@ impl TimestampOracle {
}
}

#[minitrace::trace]
async fn run_tso(
cluster_id: u64,
mut pd_client: PdClient<Channel>,
@@ -98,6 +101,9 @@ async fn run_tso(
let mut responses = pd_client.tso(request_stream).await?.into_inner();

while let Some(Ok(resp)) = responses.next().await {
let _span = LocalSpan::enter_with_local_parent("handle_response");
debug!("got response: {:?}", resp);

{
let mut pending_requests = pending_requests.lock().await;
allocate_timestamps(&resp, &mut pending_requests)?;
@@ -128,6 +134,7 @@ struct TsoRequestStream {
impl Stream for TsoRequestStream {
type Item = TsoRequest;

#[minitrace::trace]
fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
let mut this = self.project();

@@ -152,6 +159,12 @@ impl Stream for TsoRequestStream {
}
}

debug!(
"got requests: len {}, pending_requests {}",
requests.len(),
pending_requests.len()
);

if !requests.is_empty() {
let req = TsoRequest {
header: Some(RequestHeader {
@@ -168,6 +181,12 @@ impl Stream for TsoRequestStream {
};
pending_requests.push_back(request_group);

debug!(
"sending request to PD: {:?}, pending_requests {}",
req,
pending_requests.len()
);

Poll::Ready(Some(req))
} else {
// Set the waker to the context, then the stream can be waked up after the pending queue
9 changes: 9 additions & 0 deletions src/raw/requests.rs
@@ -1,6 +1,7 @@
// Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0.

use std::any::Any;
use std::fmt::Formatter;
use std::ops::Range;
use std::sync::Arc;
use std::time::Duration;
@@ -404,6 +405,14 @@ impl Request for RawCoprocessorRequest {
}
}

impl std::fmt::Debug for RawCoprocessorRequest {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
f.debug_struct("RawCoprocessorRequest")
.field("inner", &self.inner)
.finish()
}
}

impl KvRequest for RawCoprocessorRequest {
type Response = kvrpcpb::RawCoprocessorResponse;
}
2 changes: 1 addition & 1 deletion src/request/mod.rs
@@ -129,7 +129,7 @@ mod test {

impl HasLocks for MockRpcResponse {}

#[derive(Clone)]
#[derive(Debug, Clone)]
struct MockKvRequest {
test_invoking_count: Arc<AtomicUsize>,
}
55 changes: 45 additions & 10 deletions src/request/plan.rs
@@ -9,6 +9,8 @@ use futures::future::try_join_all;
use futures::prelude::*;
use log::debug;
use log::info;
use minitrace::future::FutureExt;
use minitrace::prelude::*;
use tokio::sync::Semaphore;
use tokio::time::sleep;

@@ -57,6 +59,7 @@ pub struct Dispatch<Req: KvRequest> {
impl<Req: KvRequest> Plan for Dispatch<Req> {
type Result = Req::Response;

#[minitrace::trace]
async fn execute(&self) -> Result<Self::Result> {
let stats = tikv_stats(self.request.label());
let result = self
@@ -104,6 +107,7 @@ where
{
// A plan may involve multiple shards
#[async_recursion]
#[minitrace::trace]
async fn single_plan_handler(
pd_client: Arc<PdC>,
current_plan: P,
@@ -117,14 +121,17 @@ where
let (shard, region_store) = shard?;
let mut clone = current_plan.clone();
clone.apply_shard(shard, &region_store)?;
let handle = tokio::spawn(Self::single_shard_handler(
pd_client.clone(),
clone,
region_store,
backoff.clone(),
permits.clone(),
preserve_region_results,
));
let handle = tokio::spawn(
Self::single_shard_handler(
pd_client.clone(),
clone,
region_store,
backoff.clone(),
permits.clone(),
preserve_region_results,
)
.in_span(Span::enter_with_local_parent("single_shard_handler")),
);
handles.push(handle);
}

@@ -149,6 +156,7 @@ where
}

#[async_recursion]
#[minitrace::trace]
async fn single_shard_handler(
pd_client: Arc<PdC>,
plan: P,
@@ -210,11 +218,17 @@ where
// 1. Ok(true): error has been resolved, retry immediately
// 2. Ok(false): backoff, and then retry
// 3. Err(Error): can't be resolved, return the error to upper level
#[minitrace::trace]
async fn handle_region_error(
pd_client: Arc<PdC>,
e: errorpb::Error,
region_store: RegionStore,
) -> Result<bool> {
debug!(
"handle_region_error, error:{:?}, region_store:{:?}",
e, region_store
);

let ver_id = region_store.region_with_leader.ver_id();
if let Some(not_leader) = e.not_leader {
if let Some(leader) = not_leader.leader {
@@ -266,6 +280,7 @@ where
// 1. Ok(true): error has been resolved, retry immediately
// 2. Ok(false): backoff, and then retry
// 3. Err(Error): can't be resolved, return the error to upper level
#[minitrace::trace]
async fn on_region_epoch_not_match(
pd_client: Arc<PdC>,
region_store: RegionStore,
@@ -302,6 +317,7 @@ where
Ok(false)
}

#[minitrace::trace]
async fn handle_grpc_error(
pd_client: Arc<PdC>,
plan: P,
@@ -349,6 +365,7 @@ where
{
type Result = Vec<Result<P::Result>>;

#[minitrace::trace]
async fn execute(&self) -> Result<Self::Result> {
// Limit the maximum concurrency of multi-region request. If there are
// too many concurrent requests, TiKV is more likely to return a "TiKV
@@ -469,6 +486,7 @@ impl<In: Clone + Send + Sync + 'static, P: Plan<Result = Vec<Result<In>>>, M: Me
{
type Result = M::Out;

#[minitrace::trace]
async fn execute(&self) -> Result<Self::Result> {
self.merge.merge(self.inner.execute().await?)
}
@@ -565,27 +583,43 @@ where
{
type Result = P::Result;

#[minitrace::trace]
async fn execute(&self) -> Result<Self::Result> {
let mut result = self.inner.execute().await?;
let mut clone = self.clone();
let mut retry_cnt = 0;
loop {
retry_cnt += 1;
let _span = LocalSpan::enter_with_local_parent("ResolveLock::execute::retry")
.with_property(|| ("retry_count", retry_cnt.to_string()));

let locks = result.take_locks();
if locks.is_empty() {
debug!("ResolveLock::execute ok");
return Ok(result);
}

if self.backoff.is_none() {
debug!("ResolveLock::execute lock error");
return Err(Error::ResolveLockError(locks));
}

let pd_client = self.pd_client.clone();
let live_locks = resolve_locks(locks, pd_client.clone()).await?;
if live_locks.is_empty() {
debug!("ResolveLock::execute lock error retry (resolved)",);
result = self.inner.execute().await?;
} else {
match clone.backoff.next_delay_duration() {
None => return Err(Error::ResolveLockError(live_locks)),
None => {
debug!("ResolveLock::execute lock error");
return Err(Error::ResolveLockError(live_locks));
}
Some(delay_duration) => {
debug!(
"ResolveLock::execute lock error retry (delay {:?})",
delay_duration
);
sleep(delay_duration).await;
result = clone.inner.execute().await?;
}
@@ -595,7 +629,7 @@ where
}
}

#[derive(Default)]
#[derive(Default, Debug)]
pub struct CleanupLocksResult {
pub region_error: Option<errorpb::Error>,
pub key_error: Option<Vec<Error>>,
@@ -667,6 +701,7 @@ where
{
type Result = CleanupLocksResult;

#[minitrace::trace]
async fn execute(&self) -> Result<Self::Result> {
let mut result = CleanupLocksResult::default();
let mut inner = self.inner.clone();
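
The reshaped tokio::spawn call is the main point of that hunk: a spawned future runs on another task, so minitrace's thread-local parent does not follow it and the span must be attached explicitly with in_span. A sketch of the pattern in isolation, assuming tokio and minitrace 0.6 (fan_out and the shard type are illustrative):

use minitrace::future::FutureExt;
use minitrace::prelude::*;

// The thread-local span parent does not cross tokio::spawn, so the spawned work is
// wrapped with `in_span` to keep it attached to the caller's trace.
async fn fan_out(shards: Vec<u64>) {
    let mut handles = Vec::new();
    for shard in shards {
        let task = async move {
            // Per-shard work; #[minitrace::trace] functions called here become
            // children of the "single_shard_handler" span.
            let _ = shard;
        }
        .in_span(Span::enter_with_local_parent("single_shard_handler"));
        handles.push(tokio::spawn(task));
    }
    for handle in handles {
        let _ = handle.await;
    }
}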
1 change: 1 addition & 0 deletions src/request/plan_builder.rs
@@ -159,6 +159,7 @@ where

/// Preserve all results, even some of them are Err.
/// To pass all responses to merge, and handle partial successful results correctly.
#[minitrace::trace]
pub fn retry_multi_region_preserve_results(
self,
backoff: Backoff,
28 changes: 28 additions & 0 deletions src/store/mod.rs
@@ -6,6 +6,7 @@ mod request;

use std::cmp::max;
use std::cmp::min;
use std::fmt;
use std::sync::Arc;

use async_trait::async_trait;
@@ -33,6 +34,33 @@ pub struct RegionStore {
pub client: Arc<dyn KvClient + Send + Sync>,
}

impl fmt::Debug for RegionStore {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("RegionStore")
.field("region_id", &self.region_with_leader.region.id)
.field(
"region_version",
&self
.region_with_leader
.region
.region_epoch
.as_ref()
.map(|e| e.version)
.unwrap_or_default(),
)
.field(
"leader_store_id",
&self
.region_with_leader
.leader
.as_ref()
.map(|l| l.store_id)
.unwrap_or_default(),
)
.finish()
}
}

#[derive(new, Clone)]
pub struct Store {
pub client: Arc<dyn KvClient + Send + Sync>,
3 changes: 2 additions & 1 deletion src/store/request.rs
@@ -1,6 +1,7 @@
// Copyright 2020 TiKV Project Authors. Licensed under Apache-2.0.

use std::any::Any;
use std::fmt::Debug;
use std::time::Duration;

use async_trait::async_trait;
@@ -13,7 +14,7 @@ use crate::Error;
use crate::Result;

#[async_trait]
pub trait Request: Any + Sync + Send + 'static {
pub trait Request: Any + Sync + Send + Debug + 'static {
async fn dispatch(
&self,
client: &TikvClient<Channel>,
16 changes: 11 additions & 5 deletions src/transaction/client.rs
@@ -2,6 +2,7 @@

use std::sync::Arc;

use log::as_debug;
use log::debug;
use log::info;

@@ -248,14 +249,17 @@ impl<Cod: Codec> Client<Cod> {
///
/// This is a simplified version of [GC in TiDB](https://docs.pingcap.com/tidb/stable/garbage-collection-overview).
/// We skip the second step "delete ranges" which is an optimization for TiDB.
pub async fn gc(&self, safepoint: Timestamp) -> Result<bool> {
debug!("invoking transactional gc request");
#[minitrace::trace]
pub async fn gc(&self, range: impl Into<BoundRange>, safepoint: Timestamp) -> Result<bool> {
let range = range.into();
debug!(range = as_debug!(range); "invoking transactional gc request");

let options = ResolveLocksOptions {
batch_size: SCAN_LOCK_BATCH_SIZE,
..Default::default()
};
self.cleanup_locks(.., &safepoint, options).await?;

self.cleanup_locks(range, &safepoint, options).await?;

// update safepoint to PD
let res: bool = self
@@ -269,17 +273,19 @@ impl<Cod: Codec> Client<Cod> {
Ok(res)
}

#[minitrace::trace]
pub async fn cleanup_locks(
&self,
range: impl Into<BoundRange>,
safepoint: &Timestamp,
options: ResolveLocksOptions,
) -> Result<CleanupLocksResult> {
debug!("invoking cleanup async commit locks");
let range = range.into();
debug!(range = as_debug!(range); "invoking transactional gc request");
// scan all locks with ts <= safepoint
let ctx = ResolveLocksContext::default();
let backoff = Backoff::equal_jitter_backoff(100, 10000, 50);
let req = new_scan_lock_request(range.into(), safepoint, options.batch_size);
let req = new_scan_lock_request(range, safepoint, options.batch_size);
let encoded_req = EncodedRequest::new(req, self.pd.get_codec());
let plan = crate::request::PlanBuilder::new(self.pd.clone(), encoded_req)
.cleanup_locks(ctx.clone(), options, backoff)
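
With the changed signature, gc takes a range alongside the safepoint, so callers can collect the whole keyspace or only part of it. A hedged usage sketch (the PD address and key range are placeholders, and it assumes the crate's usual Result and Into<BoundRange> conversions):

use tikv_client::TransactionClient;

async fn run_gc() -> tikv_client::Result<()> {
    let client = TransactionClient::new(vec!["127.0.0.1:2379"]).await?;

    // Full-range GC, equivalent to the previous gc(safepoint):
    let safepoint = client.current_timestamp().await?;
    let _all_cleared = client.gc(.., safepoint).await?;

    // GC restricted to a key range:
    let safepoint = client.current_timestamp().await?;
    let _range_cleared = client.gc(b"a".to_vec()..b"z".to_vec(), safepoint).await?;

    Ok(())
}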
42 changes: 40 additions & 2 deletions src/transaction/lock.rs
@@ -5,13 +5,17 @@ use std::collections::HashSet;
use std::sync::Arc;

use fail::fail_point;
use log::as_display;
use log::debug;
use log::error;
use log::info;
use minitrace::prelude::*;
use tokio::sync::RwLock;

use crate::backoff::Backoff;
use crate::backoff::DEFAULT_REGION_BACKOFF;
use crate::backoff::OPTIMISTIC_BACKOFF;
use crate::kv::HexRepr;
use crate::pd::PdClient;
use crate::proto::kvrpcpb;
use crate::proto::kvrpcpb::TxnInfo;
@@ -41,6 +45,7 @@ const RESOLVE_LOCK_RETRY_LIMIT: usize = 10;
/// the key. We first use `CleanupRequest` to let the status of the primary lock converge and get
/// its status (committed or rolled back). Then, we use the status of its primary lock to determine
/// the status of the other keys in the same transaction.
#[minitrace::trace]
pub async fn resolve_locks(
locks: Vec<kvrpcpb::LockInfo>,
pd_client: Arc<impl PdClient>,
@@ -54,11 +59,23 @@ pub async fn resolve_locks(
ts.physical - Timestamp::from_version(lock.lock_version).physical
>= lock.lock_ttl as i64
});
debug!(
"resolving locks: expired_locks {:?}, live_locks {:?}",
expired_locks, live_locks
);

// records the commit version of each primary lock (representing the status of the transaction)
let mut commit_versions: HashMap<u64, u64> = HashMap::new();
let mut clean_regions: HashMap<u64, HashSet<RegionVerId>> = HashMap::new();
for lock in expired_locks {
let _span =
LocalSpan::enter_with_local_parent("cleanup_expired_lock").with_properties(|| {
[
("lock_version", lock.lock_version.to_string()),
("primary_lock", HexRepr(&lock.primary_lock).to_string()),
]
});

let region_ver_id = pd_client
.region_for_key(&lock.primary_lock.clone().into())
.await?
@@ -75,6 +92,7 @@ pub async fn resolve_locks(
let commit_version = match commit_versions.get(&lock.lock_version) {
Some(&commit_version) => commit_version,
None => {
debug!("cleanup lock");
let request = requests::new_cleanup_request(lock.primary_lock, lock.lock_version);
let encoded_req = EncodedRequest::new(request, pd_client.get_codec());
let plan = crate::request::PlanBuilder::new(pd_client.clone(), encoded_req)
@@ -84,6 +102,7 @@ pub async fn resolve_locks(
.post_process_default()
.plan();
let commit_version = plan.execute().await?;
debug!("cleanup lock done: commit_version {}", commit_version);
commit_versions.insert(lock.lock_version, commit_version);
commit_version
}
@@ -104,6 +123,7 @@ pub async fn resolve_locks(
Ok(live_locks)
}

#[minitrace::trace]
async fn resolve_lock_with_retry(
#[allow(clippy::ptr_arg)] key: &Vec<u8>,
start_version: u64,
@@ -134,6 +154,7 @@ async fn resolve_lock_with_retry(
// ResolveLockResponse can have at most 1 error
match errors.pop() {
e @ Some(Error::RegionError(_)) => {
pd_client.invalidate_region_cache(ver_id).await;
error = e;
continue;
}
@@ -209,6 +230,7 @@ impl LockResolver {
/// _Cleanup_ the given locks. Returns whether all the given locks are resolved.
///
/// Note: Will rollback RUNNING transactions. ONLY use in GC.
#[minitrace::trace]
pub async fn cleanup_locks(
&mut self,
store: RegionStore,
@@ -244,6 +266,12 @@ impl LockResolver {
l.lock_type == kvrpcpb::Op::PessimisticLock as i32,
)
.await?;
debug!(
"cleanup_locks: txn_id:{}, primary:{}, status:{:?}",
txn_id,
HexRepr(&l.primary_lock),
status
);

// If the transaction uses async commit, check_txn_status will reject rolling back the primary lock.
// Then we need to check the secondary locks to determine the final status of the transaction.
@@ -290,7 +318,7 @@ impl LockResolver {
match &status.kind {
TransactionStatusKind::Locked(_, lock_info) => {
error!(
"cleanup_locks fail to clean locks, this result is not expected. txn_id:{}",
"cleanup_locks: fail to clean locks, this result is not expected. txn_id:{}",
txn_id
);
return Err(Error::ResolveLockError(vec![lock_info.clone()]));
@@ -301,7 +329,7 @@ impl LockResolver {
}

debug!(
"batch resolve locks, region:{:?}, txn:{:?}",
"cleanup_locks: batch resolve locks, region:{:?}, txn:{:?}",
store.region_with_leader.ver_id(),
txn_infos
);
@@ -317,6 +345,10 @@ impl LockResolver {
let cleaned_region = self
.batch_resolve_locks(pd_client.clone(), store.clone(), txn_info_vec)
.await?;
debug!(
"cleanup_locks: batch resolve locks, cleaned_region:{:?}",
cleaned_region
);
for txn_id in txn_ids {
self.ctx
.save_cleaned_region(txn_id, cleaned_region.clone())
@@ -327,6 +359,7 @@ impl LockResolver {
}

#[allow(clippy::too_many_arguments)]
#[minitrace::trace]
pub async fn check_txn_status(
&mut self,
pd_client: Arc<impl PdClient>,
@@ -338,6 +371,8 @@ impl LockResolver {
force_sync_commit: bool,
resolving_pessimistic_lock: bool,
) -> Result<Arc<TransactionStatus>> {
info!("primary" = as_display!(HexRepr(&primary)); "check_txn_status");

if let Some(txn_status) = self.ctx.get_resolved(txn_id).await {
return Ok(txn_status);
}
@@ -370,13 +405,15 @@ impl LockResolver {

let current = pd_client.clone().get_timestamp().await?;
res.check_ttl(current);
debug!("check_txn_status: status:{:?}", res);
let res = Arc::new(res);
if res.is_cacheable() {
self.ctx.save_resolved(txn_id, res.clone()).await;
}
Ok(res)
}

#[minitrace::trace]
async fn check_all_secondaries(
&mut self,
pd_client: Arc<impl PdClient>,
@@ -393,6 +430,7 @@ impl LockResolver {
plan.execute().await
}

#[minitrace::trace]
async fn batch_resolve_locks(
&mut self,
pd_client: Arc<impl PdClient>,
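
resolve_locks gives each expired lock its own LocalSpan carrying the lock version and hex-encoded primary key as properties. A minimal sketch of that pattern, assuming the minitrace 0.6 API (record_lock_cleanup is an illustrative name):

use minitrace::prelude::*;

// Each expired lock is handled under its own local span, with the lock version and
// the hex-encoded primary key attached as properties.
fn record_lock_cleanup(lock_version: u64, primary_key_hex: String) {
    let _span = LocalSpan::enter_with_local_parent("cleanup_expired_lock").with_properties(|| {
        [
            ("lock_version", lock_version.to_string()),
            ("primary_lock", primary_key_hex),
        ]
    });

    // ... issue the cleanup / resolve-lock requests while the span guard is live ...
}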
9 changes: 9 additions & 0 deletions src/transaction/requests.rs
@@ -8,6 +8,7 @@ use either::Either;
use futures::stream::BoxStream;
use futures::stream::{self};
use futures::StreamExt;
use log::debug;

use super::transaction::TXN_COMMIT_BATCH_SIZE;
use crate::collect_first;
@@ -175,7 +176,14 @@ shardable_range!(kvrpcpb::ScanRequest);
impl Merge<kvrpcpb::ScanResponse> for Collect {
type Out = Vec<KvPair>;

#[minitrace::trace]
fn merge(&self, input: Vec<Result<kvrpcpb::ScanResponse>>) -> Result<Self::Out> {
let length: usize = input
.iter()
.map(|r| r.as_ref().map(|r| r.pairs.len()).unwrap_or_default())
.sum();
debug!("Collect<ScanResponse>::merge: result length {}", length);

input
.into_iter()
.flat_map_ok(|resp| resp.pairs.into_iter().map(Into::into))
@@ -816,6 +824,7 @@ impl Merge<kvrpcpb::CheckSecondaryLocksResponse> for Collect {
}
}

#[derive(Debug)]
pub struct SecondaryLocksStatus {
pub commit_ts: Option<Timestamp>,
pub min_commit_ts: u64,
1 change: 1 addition & 0 deletions src/transaction/snapshot.rs
@@ -50,6 +50,7 @@ impl<Cod: Codec, PdC: PdClient<Codec = Cod>> Snapshot<Cod, PdC> {
}

/// Scan a range, return at most `limit` key-value pairs that lying in the range.
#[minitrace::trace]
pub async fn scan(
&mut self,
range: impl Into<BoundRange>,
27 changes: 25 additions & 2 deletions src/transaction/transaction.rs
@@ -1,5 +1,6 @@
// Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0.

use std::fmt;
use std::iter;
use std::marker::PhantomData;
use std::sync::atomic;
@@ -10,9 +11,11 @@ use std::time::Instant;
use derive_new::new;
use fail::fail_point;
use futures::prelude::*;
use log::as_debug;
use log::debug;
use log::info;
use log::warn;
use tokio::time::Duration;
use std::time::Duration;

use crate::backoff::Backoff;
use crate::backoff::DEFAULT_REGION_BACKOFF;
@@ -88,6 +91,17 @@ pub struct Transaction<Cod: Codec = ApiV1TxnCodec, PdC: PdClient<Codec = Cod> =
phantom: PhantomData<Cod>,
}

impl<Cod: Codec, PdC: PdClient<Codec = Cod>> fmt::Debug for Transaction<Cod, PdC> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("Transaction")
.field("timestamp", &self.timestamp)
.field("options", &self.options)
.field("is_heartbeat_started", &self.is_heartbeat_started)
.field("start_instant", &self.start_instant)
.finish()
}
}

impl<Cod: Codec, PdC: PdClient<Codec = Cod>> Transaction<Cod, PdC> {
pub(crate) fn new(
timestamp: Timestamp,
@@ -354,6 +368,7 @@ impl<Cod: Codec, PdC: PdClient<Codec = Cod>> Transaction<Cod, PdC> {
/// txn.commit().await.unwrap();
/// # });
/// ```
#[minitrace::trace]
pub async fn scan(
&mut self,
range: impl Into<BoundRange>,
@@ -390,6 +405,7 @@ impl<Cod: Codec, PdC: PdClient<Codec = Cod>> Transaction<Cod, PdC> {
/// txn.commit().await.unwrap();
/// # });
/// ```
#[minitrace::trace]
pub async fn scan_keys(
&mut self,
range: impl Into<BoundRange>,
@@ -405,6 +421,7 @@ impl<Cod: Codec, PdC: PdClient<Codec = Cod>> Transaction<Cod, PdC> {
/// Create a 'scan_reverse' request.
///
/// Similar to [`scan`](Transaction::scan), but scans in the reverse direction.
#[minitrace::trace]
pub async fn scan_reverse(
&mut self,
range: impl Into<BoundRange>,
@@ -417,6 +434,7 @@ impl<Cod: Codec, PdC: PdClient<Codec = Cod>> Transaction<Cod, PdC> {
/// Create a 'scan_keys_reverse' request.
///
/// Similar to [`scan`](Transaction::scan_keys), but scans in the reverse direction.
#[minitrace::trace]
pub async fn scan_keys_reverse(
&mut self,
range: impl Into<BoundRange>,
@@ -755,6 +773,7 @@ impl<Cod: Codec, PdC: PdClient<Codec = Cod>> Transaction<Cod, PdC> {
plan.execute().await
}

#[minitrace::trace]
async fn scan_inner(
&mut self,
range: impl Into<BoundRange>,
@@ -767,9 +786,12 @@ impl<Cod: Codec, PdC: PdClient<Codec = Cod>> Transaction<Cod, PdC> {
let rpc = self.rpc.clone();
let retry_options = self.options.retry_options.clone();

let range = range.into();
info!(range = as_debug!(&range); "scanning range");

self.buffer
.scan_and_fetch(
range.into(),
range,
limit,
!key_only,
reverse,
@@ -1006,6 +1028,7 @@ impl<Cod: Codec, PdC: PdClient<Codec = Cod>> Transaction<Cod, PdC> {
}

impl<Cod: Codec, PdC: PdClient<Codec = Cod>> Drop for Transaction<Cod, PdC> {
#[minitrace::trace]
fn drop(&mut self) {
debug!("dropping transaction");
if std::thread::panicking() {
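
The info!(range = as_debug!(&range); "scanning range") call above uses the structured key-value syntax that the kv_unstable feature in Cargo.toml enables. A small sketch of that syntax, assuming log 0.4 with kv_unstable (log_scan is an illustrative name):

use log::{as_debug, info};

// Key-value pairs go before the semicolon, the log message after it.
fn log_scan(range: &std::ops::Range<Vec<u8>>, limit: u32) {
    info!(range = as_debug!(range), limit = limit; "scanning range");
}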
2 changes: 1 addition & 1 deletion tests/integration_tests.rs
@@ -554,7 +554,7 @@ async fn raw_req() -> Result<()> {
async fn txn_update_safepoint() -> Result<()> {
init().await?;
let client = TransactionClient::new(pd_addrs()).await?;
let res = client.gc(client.current_timestamp().await?).await?;
let res = client.gc(.., client.current_timestamp().await?).await?;
assert!(res);
Ok(())
}