From 21798d3f2ca87253f3204775b9ff612de40c3222 Mon Sep 17 00:00:00 2001 From: Andy Lok Date: Fri, 30 Jun 2023 14:28:00 +0800 Subject: [PATCH 1/5] Fix publish issue Signed-off-by: Andy Lok --- Cargo.toml | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 06a8367a..9cb13ccf 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -19,7 +19,6 @@ protobuf-codec = [ "tikv-client-common/protobuf-codec", "tikv-client-pd/protobuf-codec", "tikv-client-store/protobuf-codec", - "mock-tikv/protobuf-codec", ] prost-codec = [ "grpcio/prost-codec", @@ -27,7 +26,6 @@ prost-codec = [ "tikv-client-common/prost-codec", "tikv-client-pd/prost-codec", "tikv-client-store/prost-codec", - "mock-tikv/prost-codec", ] [lib] @@ -64,7 +62,7 @@ tikv-client-store = { version = "0.2.0", path = "tikv-client-store" } [dev-dependencies] clap = "2" fail = { version = "0.4", features = [ "failpoints" ] } -mock-tikv = {path = "mock-tikv"} +mock-tikv = { path = "mock-tikv", features = ["protobuf-codec", "prost-codec"]} proptest = "1" proptest-derive = "0.3" serial_test = "0.5.0" From bf4d98882f79de5f01343a53387a5390ce365652 Mon Sep 17 00:00:00 2001 From: Andy Lok Date: Sat, 8 Jul 2023 02:00:03 +0800 Subject: [PATCH 2/5] migrate to tonic Signed-off-by: Andy Lok --- Cargo.toml | 56 ++-- mock-tikv/Cargo.toml | 15 - mock-tikv/src/lib.rs | 25 -- mock-tikv/src/pd.rs | 345 ------------------- mock-tikv/src/server.rs | 509 ----------------------------- mock-tikv/src/store.rs | 73 ----- src/kv/bound_range.rs | 4 +- src/kv/kvpair.rs | 8 +- src/lib.rs | 1 + src/mock.rs | 34 +- src/pd/client.rs | 47 +-- src/pd/retry.rs | 24 +- src/raw/requests.rs | 103 +++--- src/region.rs | 32 +- src/region_cache.rs | 20 +- src/request/mod.rs | 9 +- src/request/plan.rs | 62 ++-- src/request/shard.rs | 14 +- src/store.rs | 10 +- src/transaction/buffer.rs | 16 +- src/transaction/lock.rs | 22 +- src/transaction/lowering.rs | 6 +- src/transaction/requests.rs | 177 +++++----- tests/mock_tikv_tests.rs | 84 ----- tikv-client-common/Cargo.toml | 17 +- tikv-client-common/src/errors.rs | 11 +- tikv-client-common/src/security.rs | 39 ++- tikv-client-pd/Cargo.toml | 16 +- tikv-client-pd/src/cluster.rs | 155 +++++---- tikv-client-pd/src/timestamp.rs | 133 ++++---- tikv-client-proto/Cargo.toml | 16 +- tikv-client-proto/build.rs | 17 +- tikv-client-proto/src/lib.rs | 8 +- tikv-client-store/Cargo.toml | 12 +- tikv-client-store/src/client.rs | 25 +- tikv-client-store/src/errors.rs | 22 +- tikv-client-store/src/request.rs | 115 +++---- 37 files changed, 602 insertions(+), 1680 deletions(-) delete mode 100644 mock-tikv/Cargo.toml delete mode 100644 mock-tikv/src/lib.rs delete mode 100644 mock-tikv/src/pd.rs delete mode 100644 mock-tikv/src/server.rs delete mode 100644 mock-tikv/src/store.rs delete mode 100644 tests/mock_tikv_tests.rs diff --git a/Cargo.toml b/Cargo.toml index 9cb13ccf..84ce0d68 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -6,27 +6,13 @@ license = "Apache-2.0" authors = ["The TiKV Project Authors"] repository = "https://github.com/tikv/client-rust" description = "The Rust language implementation of TiKV client." -edition = "2018" +edition = "2021" [features] -default = [ "prost-codec", "prometheus/process" ] +default = ["prometheus/process"] # Enable integration tests with a running TiKV and PD instance. # Use $PD_ADDRS, comma separated, to set the addresses the tests use. 
integration-tests = [] -protobuf-codec = [ - "grpcio/protobuf-codec", - "tikv-client-proto/protobuf-codec", - "tikv-client-common/protobuf-codec", - "tikv-client-pd/protobuf-codec", - "tikv-client-store/protobuf-codec", -] -prost-codec = [ - "grpcio/prost-codec", - "tikv-client-proto/prost-codec", - "tikv-client-common/prost-codec", - "tikv-client-pd/prost-codec", - "tikv-client-store/prost-codec", -] [lib] name = "tikv_client" @@ -36,23 +22,24 @@ async-trait = "0.1" derive-new = "0.5" either = "1.6" fail = "0.4" -futures = { version = "0.3", features = ["async-await", "thread-pool"] } -futures-timer = "3.0" -grpcio = { version = "0.10", features = [ "openssl-vendored" ], default-features = false } +futures = { version = "0.3" } lazy_static = "1" log = "0.4" -prometheus = { version = "0.13", features = [ "push" ], default-features = false } +prometheus = { version = "0.13", features = ["push"], default-features = false } rand = "0.8" regex = "1" semver = "1.0" serde = "1.0" serde_derive = "1.0" -slog = { version = "2.3", features = ["max_level_trace", "release_max_level_debug"] } +slog = { version = "2.3", features = [ + "max_level_trace", + "release_max_level_debug", +] } slog-term = { version = "2.4" } thiserror = "1" -tokio = { version = "1", features = [ "sync", "rt-multi-thread", "macros" ] } +tokio = { version = "1", features = ["sync", "rt-multi-thread", "macros"] } async-recursion = "0.3" - +tonic = "0.9" tikv-client-common = { version = "0.2.0", path = "tikv-client-common" } tikv-client-pd = { version = "0.2.0", path = "tikv-client-pd" } tikv-client-proto = { version = "0.2.0", path = "tikv-client-proto" } @@ -61,31 +48,26 @@ tikv-client-store = { version = "0.2.0", path = "tikv-client-store" } [dev-dependencies] clap = "2" -fail = { version = "0.4", features = [ "failpoints" ] } -mock-tikv = { path = "mock-tikv", features = ["protobuf-codec", "prost-codec"]} +fail = { version = "0.4", features = ["failpoints"] } proptest = "1" proptest-derive = "0.3" serial_test = "0.5.0" simple_logger = "1" -tokio = { version = "1", features = [ "sync", "rt-multi-thread", "macros" ] } -reqwest = {version = "0.11", default-features = false, features = ["native-tls-vendored"]} +tokio = { version = "1", features = ["sync", "rt-multi-thread", "macros"] } +reqwest = { version = "0.11", default-features = false, features = [ + "native-tls-vendored", +] } serde_json = "1" [workspace] members = [ - "tikv-client-common", - "tikv-client-pd", - "tikv-client-proto", - "tikv-client-store", - "mock-tikv" + "tikv-client-common", + "tikv-client-pd", + "tikv-client-proto", + "tikv-client-store", ] [[test]] name = "failpoint_tests" path = "tests/failpoint_tests.rs" required-features = ["fail/failpoints"] - -[patch.crates-io] -raft-proto = { git = "https://github.com/tikv/raft-rs", rev="95c532612ee6a83591fce9a8b51d6afe87b58835"} -protobuf-codegen = { git = "https://github.com/pingcap/rust-protobuf", rev="82b49fea7e696fd647b5aca0a6c6ec944eab3189" } -protobuf = { git = "https://github.com/pingcap/rust-protobuf", rev="82b49fea7e696fd647b5aca0a6c6ec944eab3189" } diff --git a/mock-tikv/Cargo.toml b/mock-tikv/Cargo.toml deleted file mode 100644 index d503e68f..00000000 --- a/mock-tikv/Cargo.toml +++ /dev/null @@ -1,15 +0,0 @@ -[package] -name = "mock-tikv" -version = "0.0.0" -edition = "2018" - -[features] -protobuf-codec = ["grpcio/protobuf-codec"] -prost-codec = ["grpcio/prost-codec"] - -[dependencies] -derive-new = "0.5" -futures = "0.3" -grpcio = { version = "0.10", default-features = false } -log = "0.4" 
-tikv-client-proto = { path = "../tikv-client-proto"} diff --git a/mock-tikv/src/lib.rs b/mock-tikv/src/lib.rs deleted file mode 100644 index 49aae1a3..00000000 --- a/mock-tikv/src/lib.rs +++ /dev/null @@ -1,25 +0,0 @@ -// Copyright 2020 TiKV Project Authors. Licensed under Apache-2.0. - -// To support both prost & rust-protobuf. -#![cfg_attr(feature = "prost-codec", allow(clippy::useless_conversion))] - -mod pd; -mod server; -mod store; - -pub use pd::{start_mock_pd_server, MockPd, MOCK_PD_PORT}; -pub use server::{start_mock_tikv_server, MockTikv, MOCK_TIKV_PORT}; -pub use store::KvStore; - -/// A common pattern for implementing an unary RPC call. -/// Successfully returns the result. -#[macro_export] -macro_rules! spawn_unary_success { - ($ctx:ident, $req:ident, $resp:ident, $sink:ident) => { - let f = $sink - .success($resp) - .map_err(move |e| panic!("failed to reply {:?}: {:?}", $req, e)) - .map(|_| ()); - $ctx.spawn(f); - }; -} diff --git a/mock-tikv/src/pd.rs b/mock-tikv/src/pd.rs deleted file mode 100644 index 115d2179..00000000 --- a/mock-tikv/src/pd.rs +++ /dev/null @@ -1,345 +0,0 @@ -// Copyright 2020 TiKV Project Authors. Licensed under Apache-2.0. - -use crate::{spawn_unary_success, MOCK_TIKV_PORT}; -use futures::{FutureExt, StreamExt, TryFutureExt}; -use grpcio::{Environment, Server, ServerBuilder, WriteFlags}; -use std::sync::Arc; -use tikv_client_proto::pdpb::*; - -pub const MOCK_PD_PORT: u16 = 50021; -/// This is mock pd server, used with mock tikv server. -#[derive(Debug, Clone)] -pub struct MockPd {} - -impl MockPd { - fn new() -> MockPd { - MockPd {} - } - - fn region() -> tikv_client_proto::metapb::Region { - tikv_client_proto::metapb::Region { - start_key: vec![], - end_key: vec![], - peers: vec![Self::leader()].into(), - ..Default::default() - } - } - - fn leader() -> tikv_client_proto::metapb::Peer { - tikv_client_proto::metapb::Peer::default() - } - - fn store() -> tikv_client_proto::metapb::Store { - // TODO: start_timestamp? 
- tikv_client_proto::metapb::Store { - address: format!("localhost:{MOCK_TIKV_PORT}"), - ..Default::default() - } - } -} - -pub fn start_mock_pd_server() -> Server { - let env = Arc::new(Environment::new(1)); - let mut server = ServerBuilder::new(env) - .register_service(create_pd(MockPd::new())) - .bind("localhost", MOCK_PD_PORT) - .build() - .unwrap(); - server.start(); - server -} - -impl Pd for MockPd { - fn get_members( - &mut self, - ctx: ::grpcio::RpcContext, - req: GetMembersRequest, - sink: ::grpcio::UnarySink, - ) { - let member = Member { - name: "mock tikv".to_owned(), - client_urls: vec![format!("localhost:{MOCK_PD_PORT}")].into(), - ..Default::default() - }; - let resp = GetMembersResponse { - members: vec![member.clone()].into(), - leader: Some(member).into(), - ..Default::default() - }; - spawn_unary_success!(ctx, req, resp, sink); - } - - fn tso( - &mut self, - ctx: ::grpcio::RpcContext, - stream: ::grpcio::RequestStream, - sink: ::grpcio::DuplexSink, - ) { - let f = stream - .map(|_| { - let resp = TsoResponse::default(); - // TODO: make ts monotonic - Ok((resp, WriteFlags::default())) - }) - .forward(sink) - .map(|_| ()); - ctx.spawn(f); - } - - fn bootstrap( - &mut self, - _ctx: ::grpcio::RpcContext, - _req: BootstrapRequest, - _sink: ::grpcio::UnarySink, - ) { - todo!() - } - - fn is_bootstrapped( - &mut self, - _ctx: ::grpcio::RpcContext, - _req: IsBootstrappedRequest, - _sink: ::grpcio::UnarySink, - ) { - todo!() - } - - fn alloc_id( - &mut self, - _ctx: ::grpcio::RpcContext, - _req: AllocIdRequest, - _sink: ::grpcio::UnarySink, - ) { - todo!() - } - - fn get_store( - &mut self, - ctx: ::grpcio::RpcContext, - req: GetStoreRequest, - sink: ::grpcio::UnarySink, - ) { - let resp = GetStoreResponse { - store: Some(Self::store()).into(), - ..Default::default() - }; - spawn_unary_success!(ctx, req, resp, sink); - } - - fn put_store( - &mut self, - _ctx: ::grpcio::RpcContext, - _req: PutStoreRequest, - _sink: ::grpcio::UnarySink, - ) { - todo!() - } - - fn get_all_stores( - &mut self, - _ctx: ::grpcio::RpcContext, - _req: GetAllStoresRequest, - _sink: ::grpcio::UnarySink, - ) { - todo!() - } - - fn store_heartbeat( - &mut self, - _ctx: ::grpcio::RpcContext, - _req: StoreHeartbeatRequest, - _sink: ::grpcio::UnarySink, - ) { - todo!() - } - - fn region_heartbeat( - &mut self, - _ctx: ::grpcio::RpcContext, - _stream: ::grpcio::RequestStream, - _sink: ::grpcio::DuplexSink, - ) { - todo!() - } - - fn get_region( - &mut self, - ctx: ::grpcio::RpcContext, - req: GetRegionRequest, - sink: ::grpcio::UnarySink, - ) { - let resp = GetRegionResponse { - region: Some(Self::region()).into(), - leader: Some(Self::leader()).into(), - ..Default::default() - }; - spawn_unary_success!(ctx, req, resp, sink); - } - - fn get_prev_region( - &mut self, - _ctx: ::grpcio::RpcContext, - _req: GetRegionRequest, - _sink: ::grpcio::UnarySink, - ) { - todo!() - } - - fn get_region_by_id( - &mut self, - ctx: ::grpcio::RpcContext, - req: GetRegionByIdRequest, - sink: ::grpcio::UnarySink, - ) { - let resp = GetRegionResponse { - region: Some(Self::region()).into(), - leader: Some(Self::leader()).into(), - ..Default::default() - }; - spawn_unary_success!(ctx, req, resp, sink); - } - - fn scan_regions( - &mut self, - _ctx: ::grpcio::RpcContext, - _req: ScanRegionsRequest, - _sink: ::grpcio::UnarySink, - ) { - todo!() - } - - fn ask_split( - &mut self, - _ctx: ::grpcio::RpcContext, - _req: AskSplitRequest, - _sink: ::grpcio::UnarySink, - ) { - todo!() - } - - fn report_split( - &mut self, - _ctx: 
::grpcio::RpcContext, - _req: ReportSplitRequest, - _sink: ::grpcio::UnarySink, - ) { - todo!() - } - - fn ask_batch_split( - &mut self, - _ctx: ::grpcio::RpcContext, - _req: AskBatchSplitRequest, - _sink: ::grpcio::UnarySink, - ) { - todo!() - } - - fn report_batch_split( - &mut self, - _ctx: ::grpcio::RpcContext, - _req: ReportBatchSplitRequest, - _sink: ::grpcio::UnarySink, - ) { - todo!() - } - - fn get_cluster_config( - &mut self, - _ctx: ::grpcio::RpcContext, - _req: GetClusterConfigRequest, - _sink: ::grpcio::UnarySink, - ) { - todo!() - } - - fn put_cluster_config( - &mut self, - _ctx: ::grpcio::RpcContext, - _req: PutClusterConfigRequest, - _sink: ::grpcio::UnarySink, - ) { - todo!() - } - - fn scatter_region( - &mut self, - _ctx: ::grpcio::RpcContext, - _req: ScatterRegionRequest, - _sink: ::grpcio::UnarySink, - ) { - todo!() - } - - fn get_gc_safe_point( - &mut self, - _ctx: ::grpcio::RpcContext, - _req: GetGcSafePointRequest, - _sink: ::grpcio::UnarySink, - ) { - todo!() - } - - fn update_gc_safe_point( - &mut self, - _ctx: ::grpcio::RpcContext, - _req: UpdateGcSafePointRequest, - _sink: ::grpcio::UnarySink, - ) { - todo!() - } - - fn update_service_gc_safe_point( - &mut self, - _ctx: ::grpcio::RpcContext, - _req: UpdateServiceGcSafePointRequest, - _sink: ::grpcio::UnarySink, - ) { - todo!() - } - - fn sync_regions( - &mut self, - _ctx: ::grpcio::RpcContext, - _stream: ::grpcio::RequestStream, - _sink: ::grpcio::DuplexSink, - ) { - todo!() - } - - fn get_operator( - &mut self, - _ctx: ::grpcio::RpcContext, - _req: GetOperatorRequest, - _sink: ::grpcio::UnarySink, - ) { - todo!() - } - - fn sync_max_ts( - &mut self, - _ctx: ::grpcio::RpcContext, - _req: SyncMaxTsRequest, - _sink: ::grpcio::UnarySink, - ) { - todo!() - } - - fn split_regions( - &mut self, - _: ::grpcio::RpcContext<'_>, - _: SplitRegionsRequest, - _: ::grpcio::UnarySink, - ) { - todo!() - } - - fn get_dc_location_info( - &mut self, - _: grpcio::RpcContext, - _: GetDcLocationInfoRequest, - _: grpcio::UnarySink, - ) { - todo!() - } -} diff --git a/mock-tikv/src/server.rs b/mock-tikv/src/server.rs deleted file mode 100644 index acaf66af..00000000 --- a/mock-tikv/src/server.rs +++ /dev/null @@ -1,509 +0,0 @@ -// Copyright 2020 TiKV Project Authors. Licensed under Apache-2.0. - -#![allow(clippy::useless_conversion)] // To support both prost & rust-protobuf. 
- -use crate::{spawn_unary_success, KvStore}; -use derive_new::new; -use futures::{FutureExt, TryFutureExt}; -use grpcio::{Environment, Server, ServerBuilder}; -use std::sync::Arc; -use tikv_client_proto::{kvrpcpb::*, tikvpb::*}; - -pub const MOCK_TIKV_PORT: u16 = 50019; - -pub fn start_mock_tikv_server() -> Server { - let env = Arc::new(Environment::new(1)); - let mut server = ServerBuilder::new(env) - .register_service(create_tikv(MockTikv::new(KvStore::new()))) - .bind("localhost", MOCK_TIKV_PORT) - .build() - .unwrap(); - server.start(); - server -} - -#[derive(Debug, Clone, new)] -pub struct MockTikv { - inner: KvStore, -} - -impl Tikv for MockTikv { - fn kv_get( - &mut self, - _ctx: grpcio::RpcContext, - _req: tikv_client_proto::kvrpcpb::GetRequest, - _sink: grpcio::UnarySink, - ) { - todo!() - } - - fn kv_scan( - &mut self, - _ctx: grpcio::RpcContext, - _req: tikv_client_proto::kvrpcpb::ScanRequest, - _sink: grpcio::UnarySink, - ) { - todo!() - } - - fn kv_prewrite( - &mut self, - _ctx: grpcio::RpcContext, - _req: tikv_client_proto::kvrpcpb::PrewriteRequest, - _sink: grpcio::UnarySink, - ) { - todo!() - } - - fn kv_pessimistic_lock( - &mut self, - _ctx: grpcio::RpcContext, - _req: tikv_client_proto::kvrpcpb::PessimisticLockRequest, - _sink: grpcio::UnarySink, - ) { - todo!() - } - - fn kv_pessimistic_rollback( - &mut self, - _ctx: grpcio::RpcContext, - _req: tikv_client_proto::kvrpcpb::PessimisticRollbackRequest, - _sink: grpcio::UnarySink, - ) { - todo!() - } - - fn kv_txn_heart_beat( - &mut self, - _ctx: grpcio::RpcContext, - _req: tikv_client_proto::kvrpcpb::TxnHeartBeatRequest, - _sink: grpcio::UnarySink, - ) { - todo!() - } - - fn kv_check_txn_status( - &mut self, - _ctx: grpcio::RpcContext, - _req: tikv_client_proto::kvrpcpb::CheckTxnStatusRequest, - _sink: grpcio::UnarySink, - ) { - todo!() - } - - fn kv_commit( - &mut self, - _ctx: grpcio::RpcContext, - _req: tikv_client_proto::kvrpcpb::CommitRequest, - _sink: grpcio::UnarySink, - ) { - todo!() - } - - fn kv_import( - &mut self, - _ctx: grpcio::RpcContext, - _req: tikv_client_proto::kvrpcpb::ImportRequest, - _sink: grpcio::UnarySink, - ) { - todo!() - } - - fn kv_cleanup( - &mut self, - _ctx: grpcio::RpcContext, - _req: tikv_client_proto::kvrpcpb::CleanupRequest, - _sink: grpcio::UnarySink, - ) { - todo!() - } - - fn kv_batch_get( - &mut self, - _ctx: grpcio::RpcContext, - _req: tikv_client_proto::kvrpcpb::BatchGetRequest, - _sink: grpcio::UnarySink, - ) { - todo!() - } - - fn kv_batch_rollback( - &mut self, - _ctx: grpcio::RpcContext, - _req: tikv_client_proto::kvrpcpb::BatchRollbackRequest, - _sink: grpcio::UnarySink, - ) { - todo!() - } - - fn kv_scan_lock( - &mut self, - _ctx: grpcio::RpcContext, - _req: tikv_client_proto::kvrpcpb::ScanLockRequest, - _sink: grpcio::UnarySink, - ) { - todo!() - } - - fn kv_resolve_lock( - &mut self, - _ctx: grpcio::RpcContext, - _req: tikv_client_proto::kvrpcpb::ResolveLockRequest, - _sink: grpcio::UnarySink, - ) { - todo!() - } - - fn kv_gc( - &mut self, - _ctx: grpcio::RpcContext, - _req: tikv_client_proto::kvrpcpb::GcRequest, - _sink: grpcio::UnarySink, - ) { - todo!() - } - - fn kv_delete_range( - &mut self, - _ctx: grpcio::RpcContext, - _req: tikv_client_proto::kvrpcpb::DeleteRangeRequest, - _sink: grpcio::UnarySink, - ) { - todo!() - } - - fn raw_get( - &mut self, - ctx: grpcio::RpcContext, - req: tikv_client_proto::kvrpcpb::RawGetRequest, - sink: grpcio::UnarySink, - ) { - let mut resp = RawGetResponse::default(); - if let Some(v) = self.inner.raw_get(req.get_key()) { - 
resp.set_value(v); - } else { - resp.set_not_found(true); - } - spawn_unary_success!(ctx, req, resp, sink); - } - - fn raw_batch_get( - &mut self, - ctx: grpcio::RpcContext, - mut req: tikv_client_proto::kvrpcpb::RawBatchGetRequest, - sink: grpcio::UnarySink, - ) { - let mut resp = tikv_client_proto::kvrpcpb::RawBatchGetResponse::default(); - resp.set_pairs(self.inner.raw_batch_get(req.take_keys().into()).into()); - spawn_unary_success!(ctx, req, resp, sink); - } - - fn raw_put( - &mut self, - ctx: grpcio::RpcContext, - req: tikv_client_proto::kvrpcpb::RawPutRequest, - sink: grpcio::UnarySink, - ) { - self.inner - .raw_put(req.get_key().to_vec(), req.get_value().to_vec()); - let resp = RawPutResponse::default(); - spawn_unary_success!(ctx, req, resp, sink); - } - - fn raw_batch_put( - &mut self, - ctx: grpcio::RpcContext, - mut req: tikv_client_proto::kvrpcpb::RawBatchPutRequest, - sink: grpcio::UnarySink, - ) { - let pairs = req.take_pairs().into(); - self.inner.raw_batch_put(pairs); - let resp = RawBatchPutResponse::default(); - spawn_unary_success!(ctx, req, resp, sink); - } - - fn raw_delete( - &mut self, - ctx: grpcio::RpcContext, - req: tikv_client_proto::kvrpcpb::RawDeleteRequest, - sink: grpcio::UnarySink, - ) { - let key = req.get_key(); - self.inner.raw_delete(key); - let resp = RawDeleteResponse::default(); - spawn_unary_success!(ctx, req, resp, sink); - } - - fn raw_batch_delete( - &mut self, - ctx: grpcio::RpcContext, - mut req: tikv_client_proto::kvrpcpb::RawBatchDeleteRequest, - sink: grpcio::UnarySink, - ) { - let keys = req.take_keys().into(); - self.inner.raw_batch_delete(keys); - let resp = RawBatchDeleteResponse::default(); - spawn_unary_success!(ctx, req, resp, sink); - } - - fn raw_scan( - &mut self, - _ctx: grpcio::RpcContext, - _req: tikv_client_proto::kvrpcpb::RawScanRequest, - _sink: grpcio::UnarySink, - ) { - todo!() - } - - fn raw_delete_range( - &mut self, - _ctx: grpcio::RpcContext, - _req: tikv_client_proto::kvrpcpb::RawDeleteRangeRequest, - _sink: grpcio::UnarySink, - ) { - todo!() - } - - fn raw_batch_scan( - &mut self, - _ctx: grpcio::RpcContext, - _req: tikv_client_proto::kvrpcpb::RawBatchScanRequest, - _sink: grpcio::UnarySink, - ) { - todo!() - } - - fn unsafe_destroy_range( - &mut self, - _ctx: grpcio::RpcContext, - _req: tikv_client_proto::kvrpcpb::UnsafeDestroyRangeRequest, - _sink: grpcio::UnarySink, - ) { - todo!() - } - - fn register_lock_observer( - &mut self, - _ctx: grpcio::RpcContext, - _req: tikv_client_proto::kvrpcpb::RegisterLockObserverRequest, - _sink: grpcio::UnarySink, - ) { - todo!() - } - - fn check_lock_observer( - &mut self, - _ctx: grpcio::RpcContext, - _req: tikv_client_proto::kvrpcpb::CheckLockObserverRequest, - _sink: grpcio::UnarySink, - ) { - todo!() - } - - fn remove_lock_observer( - &mut self, - _ctx: grpcio::RpcContext, - _req: tikv_client_proto::kvrpcpb::RemoveLockObserverRequest, - _sink: grpcio::UnarySink, - ) { - todo!() - } - - fn physical_scan_lock( - &mut self, - _ctx: grpcio::RpcContext, - _req: tikv_client_proto::kvrpcpb::PhysicalScanLockRequest, - _sink: grpcio::UnarySink, - ) { - todo!() - } - - fn coprocessor( - &mut self, - _ctx: grpcio::RpcContext, - _req: tikv_client_proto::coprocessor::Request, - _sink: grpcio::UnarySink, - ) { - todo!() - } - - fn coprocessor_stream( - &mut self, - _ctx: grpcio::RpcContext, - _req: tikv_client_proto::coprocessor::Request, - _sink: grpcio::ServerStreamingSink, - ) { - todo!() - } - - fn batch_coprocessor( - &mut self, - _ctx: grpcio::RpcContext, - _req: 
tikv_client_proto::coprocessor::BatchRequest, - _sink: grpcio::ServerStreamingSink, - ) { - todo!() - } - - fn raft( - &mut self, - _ctx: grpcio::RpcContext, - _stream: grpcio::RequestStream, - _sink: grpcio::ClientStreamingSink, - ) { - todo!() - } - - fn batch_raft( - &mut self, - _ctx: grpcio::RpcContext, - _stream: grpcio::RequestStream, - _sink: grpcio::ClientStreamingSink, - ) { - todo!() - } - - fn snapshot( - &mut self, - _ctx: grpcio::RpcContext, - _stream: grpcio::RequestStream, - _sink: grpcio::ClientStreamingSink, - ) { - todo!() - } - - fn split_region( - &mut self, - _ctx: grpcio::RpcContext, - _req: tikv_client_proto::kvrpcpb::SplitRegionRequest, - _sink: grpcio::UnarySink, - ) { - todo!() - } - - fn read_index( - &mut self, - _ctx: grpcio::RpcContext, - _req: tikv_client_proto::kvrpcpb::ReadIndexRequest, - _sink: grpcio::UnarySink, - ) { - todo!() - } - - fn mvcc_get_by_key( - &mut self, - _ctx: grpcio::RpcContext, - _req: tikv_client_proto::kvrpcpb::MvccGetByKeyRequest, - _sink: grpcio::UnarySink, - ) { - todo!() - } - - fn mvcc_get_by_start_ts( - &mut self, - _ctx: grpcio::RpcContext, - _req: tikv_client_proto::kvrpcpb::MvccGetByStartTsRequest, - _sink: grpcio::UnarySink, - ) { - todo!() - } - - fn batch_commands( - &mut self, - _ctx: grpcio::RpcContext, - _stream: grpcio::RequestStream, - _sink: grpcio::DuplexSink, - ) { - todo!() - } - - fn kv_check_secondary_locks( - &mut self, - _: grpcio::RpcContext<'_>, - _: tikv_client_proto::kvrpcpb::CheckSecondaryLocksRequest, - _: grpcio::UnarySink, - ) { - todo!() - } - - fn dispatch_mpp_task( - &mut self, - _: grpcio::RpcContext<'_>, - _: tikv_client_proto::mpp::DispatchTaskRequest, - _: grpcio::UnarySink, - ) { - todo!() - } - - fn cancel_mpp_task( - &mut self, - _: grpcio::RpcContext<'_>, - _: tikv_client_proto::mpp::CancelTaskRequest, - _: grpcio::UnarySink, - ) { - todo!() - } - - fn establish_mpp_connection( - &mut self, - _: grpcio::RpcContext<'_>, - _: tikv_client_proto::mpp::EstablishMppConnectionRequest, - _: grpcio::ServerStreamingSink, - ) { - todo!() - } - - fn check_leader( - &mut self, - _: grpcio::RpcContext<'_>, - _: tikv_client_proto::kvrpcpb::CheckLeaderRequest, - _: grpcio::UnarySink, - ) { - todo!() - } - - fn raw_get_key_ttl( - &mut self, - _: grpcio::RpcContext, - _: RawGetKeyTtlRequest, - _: grpcio::UnarySink, - ) { - todo!() - } - - fn raw_compare_and_swap( - &mut self, - _: grpcio::RpcContext, - _: RawCasRequest, - _: grpcio::UnarySink, - ) { - todo!() - } - - fn raw_coprocessor( - &mut self, - _: grpcio::RpcContext, - _: RawCoprocessorRequest, - _: grpcio::UnarySink, - ) { - todo!() - } - - fn get_store_safe_ts( - &mut self, - _: grpcio::RpcContext, - _: StoreSafeTsRequest, - _: grpcio::UnarySink, - ) { - todo!() - } - - fn get_lock_wait_info( - &mut self, - _: grpcio::RpcContext, - _: GetLockWaitInfoRequest, - _: grpcio::UnarySink, - ) { - todo!() - } -} diff --git a/mock-tikv/src/store.rs b/mock-tikv/src/store.rs deleted file mode 100644 index a69c3afc..00000000 --- a/mock-tikv/src/store.rs +++ /dev/null @@ -1,73 +0,0 @@ -// Copyright 2020 TiKV Project Authors. Licensed under Apache-2.0. 
- -use std::{ - collections::HashMap, - sync::{Arc, RwLock}, -}; -use tikv_client_proto::kvrpcpb::KvPair; - -#[derive(Debug, Clone)] -pub struct KvStore { - data: Arc, Vec>>>, -} - -impl Default for KvStore { - fn default() -> Self { - Self::new() - } -} - -impl KvStore { - pub fn new() -> KvStore { - KvStore { - data: Arc::new(RwLock::new(HashMap::new())), - } - } - - pub fn raw_get(&self, key: &[u8]) -> Option> { - let data = self.data.read().unwrap(); - data.get(key).map(|v| v.to_vec()) - } - - pub fn raw_batch_get(&self, keys: Vec>) -> Vec { - let data = self.data.read().unwrap(); - keys.into_iter() - .filter_map(|key| { - if data.contains_key(&key) { - let mut pair = KvPair::default(); - pair.set_value(data.get(&key).unwrap().to_vec()); - pair.set_key(key); - Some(pair) - } else { - None - } - }) - .collect() - } - - pub fn raw_put(&self, key: Vec, value: Vec) { - let mut data = self.data.write().unwrap(); - data.insert(key, value); - } - - pub fn raw_batch_put(&self, pairs: Vec) { - let mut data = self.data.write().unwrap(); - data.extend( - pairs - .into_iter() - .map(|mut pair| (pair.take_key(), pair.take_value())), - ); - } - - pub fn raw_delete(&self, key: &[u8]) { - let mut data = self.data.write().unwrap(); - data.remove(key); - } - - pub fn raw_batch_delete(&self, keys: Vec>) { - let mut data = self.data.write().unwrap(); - keys.iter().for_each(|k| { - data.remove(k); - }); - } -} diff --git a/src/kv/bound_range.rs b/src/kv/bound_range.rs index 4ef7fb6f..f46599c3 100644 --- a/src/kv/bound_range.rs +++ b/src/kv/bound_range.rs @@ -263,8 +263,8 @@ impl From for kvrpcpb::KeyRange { fn from(bound_range: BoundRange) -> Self { let (start, end) = bound_range.into_keys(); let mut range = kvrpcpb::KeyRange::default(); - range.set_start_key(start.into()); - range.set_end_key(end.unwrap_or_default().into()); + range.start_key = start.into(); + range.end_key = end.unwrap_or_default().into(); range } } diff --git a/src/kv/kvpair.rs b/src/kv/kvpair.rs index c4ce4443..9e1ea857 100644 --- a/src/kv/kvpair.rs +++ b/src/kv/kvpair.rs @@ -103,8 +103,8 @@ impl From for Key { } impl From for KvPair { - fn from(mut pair: kvrpcpb::KvPair) -> Self { - KvPair(Key::from(pair.take_key()), pair.take_value()) + fn from(pair: kvrpcpb::KvPair) -> Self { + KvPair(Key::from(pair.key), pair.value) } } @@ -112,8 +112,8 @@ impl From for kvrpcpb::KvPair { fn from(pair: KvPair) -> Self { let mut result = kvrpcpb::KvPair::default(); let (key, value) = pair.into(); - result.set_key(key.into()); - result.set_value(value); + result.key = key.into(); + result.value = value; result } } diff --git a/src/lib.rs b/src/lib.rs index d40624cb..4d9fd002 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -92,6 +92,7 @@ // To support both prost & rust-protobuf. 
#![cfg_attr(feature = "prost-codec", allow(clippy::useless_conversion))] +#![allow(clippy::field_reassign_with_default)] #[macro_use] pub mod request; diff --git a/src/mock.rs b/src/mock.rs index 02a0bef9..6adea181 100644 --- a/src/mock.rs +++ b/src/mock.rs @@ -15,7 +15,7 @@ use async_trait::async_trait; use derive_new::new; use slog::{Drain, Logger}; use std::{any::Any, sync::Arc}; -use tikv_client_proto::metapb; +use tikv_client_proto::metapb::{self, RegionEpoch}; use tikv_client_store::{KvClient, KvConnect, Request}; /// Create a `PdRpcClient` with it's internals replaced with mocks so that the @@ -32,10 +32,9 @@ pub async fn pd_rpc_client() -> PdRpcClient { ); PdRpcClient::new( config.clone(), - |_, _| MockKvConnect, - |e, sm| { + |_| MockKvConnect, + |sm| { futures::future::ok(RetryClient::new_with_cluster( - e, sm, config.timeout, MockCluster, @@ -86,10 +85,11 @@ impl KvClient for MockKvClient { } } +#[async_trait] impl KvConnect for MockKvConnect { type KvClient = MockKvClient; - fn connect(&self, address: &str) -> Result { + async fn connect(&self, address: &str) -> Result { Ok(MockKvClient { addr: address.to_owned(), dispatch: None, @@ -107,8 +107,12 @@ impl MockPdClient { pub fn region1() -> RegionWithLeader { let mut region = RegionWithLeader::default(); region.region.id = 1; - region.region.set_start_key(vec![]); - region.region.set_end_key(vec![10]); + region.region.start_key = vec![]; + region.region.end_key = vec![10]; + region.region.region_epoch = Some(RegionEpoch { + conf_ver: 0, + version: 0, + }); let leader = metapb::Peer { store_id: 41, @@ -122,8 +126,12 @@ impl MockPdClient { pub fn region2() -> RegionWithLeader { let mut region = RegionWithLeader::default(); region.region.id = 2; - region.region.set_start_key(vec![10]); - region.region.set_end_key(vec![250, 250]); + region.region.start_key = vec![10]; + region.region.end_key = vec![250, 250]; + region.region.region_epoch = Some(RegionEpoch { + conf_ver: 0, + version: 0, + }); let leader = metapb::Peer { store_id: 42, @@ -137,8 +145,12 @@ impl MockPdClient { pub fn region3() -> RegionWithLeader { let mut region = RegionWithLeader::default(); region.region.id = 3; - region.region.set_start_key(vec![250, 250]); - region.region.set_end_key(vec![]); + region.region.start_key = vec![250, 250]; + region.region.end_key = vec![]; + region.region.region_epoch = Some(RegionEpoch { + conf_ver: 0, + version: 0, + }); let leader = metapb::Peer { store_id: 43, diff --git a/src/pd/client.rs b/src/pd/client.rs index 87cf7cda..440dfb4d 100644 --- a/src/pd/client.rs +++ b/src/pd/client.rs @@ -11,17 +11,13 @@ use crate::{ }; use async_trait::async_trait; use futures::{prelude::*, stream::BoxStream}; -use grpcio::{EnvBuilder, Environment}; use slog::Logger; -use std::{collections::HashMap, sync::Arc, thread}; +use std::{collections::HashMap, sync::Arc}; use tikv_client_pd::Cluster; use tikv_client_proto::{kvrpcpb, metapb}; use tikv_client_store::{KvClient, KvConnect, TikvConnect}; use tokio::sync::RwLock; -const CQ_COUNT: usize = 1; -const CLIENT_PREFIX: &str = "tikv-client"; - /// The PdClient handles all the encoding stuff. /// /// Raw APIs does not require encoding/decoding at all. 
@@ -182,8 +178,8 @@ pub trait PdClient: Send + Sync + 'static { fn decode_region(mut region: RegionWithLeader, enable_codec: bool) -> Result { if enable_codec { - codec::decode_bytes_in_place(region.region.mut_start_key(), false)?; - codec::decode_bytes_in_place(region.region.mut_end_key(), false)?; + codec::decode_bytes_in_place(&mut region.region.start_key, false)?; + codec::decode_bytes_in_place(&mut region.region.end_key, false)?; } Ok(region) } @@ -211,7 +207,7 @@ impl PdClient for PdRpcClient { async fn map_region_to_store(self: Arc, region: RegionWithLeader) -> Result { let store_id = region.get_store_id()?; let store = self.region_cache.get_store_by_id(store_id).await?; - let kv_client = self.kv_client(store.get_address()).await?; + let kv_client = self.kv_client(&store.address).await?; Ok(RegionStore::new(region, Arc::new(kv_client))) } @@ -258,10 +254,8 @@ impl PdRpcClient { ) -> Result { PdRpcClient::new( config.clone(), - |env, security_mgr| TikvConnect::new(env, security_mgr, config.timeout), - |env, security_mgr| { - RetryClient::connect(env, pd_endpoints, security_mgr, config.timeout) - }, + |security_mgr| TikvConnect::new(security_mgr, config.timeout), + |security_mgr| RetryClient::connect(pd_endpoints, security_mgr, config.timeout), enable_codec, logger, ) @@ -269,15 +263,6 @@ impl PdRpcClient { } } -/// make a thread name with additional tag inheriting from current thread. -fn thread_name(prefix: &str) -> String { - thread::current() - .name() - .and_then(|name| name.split("::").skip(1).last()) - .map(|tag| format!("{prefix}::{tag}")) - .unwrap_or_else(|| prefix.to_owned()) -} - impl PdRpcClient { pub async fn new( config: Config, @@ -288,15 +273,9 @@ impl PdRpcClient { ) -> Result> where PdFut: Future>>, - MakeKvC: FnOnce(Arc, Arc) -> KvC, - MakePd: FnOnce(Arc, Arc) -> PdFut, + MakeKvC: FnOnce(Arc) -> KvC, + MakePd: FnOnce(Arc) -> PdFut, { - let env = Arc::new( - EnvBuilder::new() - .cq_count(CQ_COUNT) - .name_prefix(thread_name(CLIENT_PREFIX)) - .build(), - ); let security_mgr = Arc::new( if let (Some(ca_path), Some(cert_path), Some(key_path)) = (&config.ca_path, &config.cert_path, &config.key_path) @@ -307,12 +286,12 @@ impl PdRpcClient { }, ); - let pd = Arc::new(pd(env.clone(), security_mgr.clone()).await?); + let pd = Arc::new(pd(security_mgr.clone()).await?); let kv_client_cache = Default::default(); Ok(PdRpcClient { pd: pd.clone(), kv_client_cache, - kv_connect: kv_connect(env, security_mgr), + kv_connect: kv_connect(security_mgr), enable_codec, region_cache: RegionCache::new(pd), logger, @@ -324,7 +303,7 @@ impl PdRpcClient { return Ok(client.clone()); }; info!(self.logger, "connect to tikv endpoint: {:?}", address); - match self.kv_connect.connect(address) { + match self.kv_connect.connect(address).await { Ok(client) => { self.kv_client_cache .write() @@ -339,8 +318,8 @@ impl PdRpcClient { fn make_key_range(start_key: Vec, end_key: Vec) -> kvrpcpb::KeyRange { let mut key_range = kvrpcpb::KeyRange::default(); - key_range.set_start_key(start_key); - key_range.set_end_key(end_key); + key_range.start_key = start_key; + key_range.end_key = end_key; key_range } diff --git a/src/pd/retry.rs b/src/pd/retry.rs index 6607a4b7..3734bce1 100644 --- a/src/pd/retry.rs +++ b/src/pd/retry.rs @@ -8,8 +8,6 @@ use crate::{ Error, Result, SecurityManager, }; use async_trait::async_trait; -use futures_timer::Delay; -use grpcio::Environment; use std::{ fmt, sync::Arc, @@ -20,7 +18,7 @@ use tikv_client_proto::{ metapb, pdpb::{self, Timestamp}, }; -use tokio::sync::RwLock; +use 
tokio::{sync::RwLock, time::sleep}; // FIXME: these numbers and how they are used are all just cargo-culted in, there // may be more optimal values. @@ -55,12 +53,11 @@ pub struct RetryClient { #[cfg(test)] impl RetryClient { pub fn new_with_cluster( - env: Arc, security_mgr: Arc, timeout: Duration, cluster: Cl, ) -> RetryClient { - let connection = Connection::new(env, security_mgr); + let connection = Connection::new(security_mgr); RetryClient { cluster: RwLock::new((cluster, Instant::now())), connection, @@ -77,7 +74,7 @@ macro_rules! retry { // use the block here to drop the guard of the read lock, // otherwise `reconnect` will try to acquire the write lock and results in a deadlock let res = { - let $cluster = &$self.cluster.read().await.0; + let $cluster = &mut $self.cluster.write().await.0; let res = $call.await; res }; @@ -93,7 +90,7 @@ macro_rules! retry { if reconnect_count == 0 { return Err(e); } - Delay::new(Duration::from_secs(RECONNECT_INTERVAL_SEC)).await; + sleep(Duration::from_secs(RECONNECT_INTERVAL_SEC)).await; } } @@ -104,12 +101,11 @@ macro_rules! retry { impl RetryClient { pub async fn connect( - env: Arc, endpoints: &[String], security_mgr: Arc, timeout: Duration, ) -> Result { - let connection = Connection::new(env, security_mgr); + let connection = Connection::new(security_mgr); let cluster = RwLock::new(( connection.connect_cluster(endpoints, timeout).await?, Instant::now(), @@ -156,7 +152,7 @@ impl RetryClientTrait for RetryClient { cluster .get_store(id, self.timeout) .await - .map(|mut resp| resp.take_store()) + .map(|resp| resp.store.unwrap()) }) } @@ -166,7 +162,7 @@ impl RetryClientTrait for RetryClient { cluster .get_all_stores(self.timeout) .await - .map(|mut resp| resp.take_stores().into_iter().map(Into::into).collect()) + .map(|resp| resp.stores.into_iter().map(Into::into).collect()) }) } @@ -179,7 +175,7 @@ impl RetryClientTrait for RetryClient { cluster .update_safepoint(safepoint, self.timeout) .await - .map(|resp| resp.get_new_safe_point() == safepoint) + .map(|resp| resp.new_safe_point == safepoint) }) } } @@ -236,8 +232,8 @@ mod test { }; use tikv_client_common::internal_err; - #[test] - fn test_reconnect() { + #[tokio::test(flavor = "multi_thread")] + async fn test_reconnect() { struct MockClient { reconnect_count: AtomicUsize, cluster: RwLock<((), Instant)>, diff --git a/src/raw/requests.rs b/src/raw/requests.rs index 400bf581..9842d36d 100644 --- a/src/raw/requests.rs +++ b/src/raw/requests.rs @@ -1,12 +1,12 @@ // Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0. 
-use std::{any::Any, ops::Range, sync::Arc}; +use std::{any::Any, ops::Range, sync::Arc, time::Duration}; use async_trait::async_trait; use futures::stream::BoxStream; -use grpcio::CallOption; -use tikv_client_proto::{kvrpcpb, metapb, tikvpb::TikvClient}; +use tikv_client_proto::{kvrpcpb, metapb, tikvpb::tikv_client::TikvClient}; use tikv_client_store::Request; +use tonic::transport::Channel; use super::RawRpcRequest; use crate::{ @@ -24,7 +24,7 @@ use crate::{ pub fn new_raw_get_request(key: Vec, cf: Option) -> kvrpcpb::RawGetRequest { let mut req = kvrpcpb::RawGetRequest::default(); - req.set_key(key); + req.key = key; req.maybe_set_cf(cf); req @@ -47,11 +47,11 @@ impl Process for DefaultProcessor { type Out = Option; fn process(&self, input: Result) -> Result { - let mut input = input?; + let input = input?; Ok(if input.not_found { None } else { - Some(input.take_value()) + Some(input.value) }) } } @@ -61,7 +61,7 @@ pub fn new_raw_batch_get_request( cf: Option, ) -> kvrpcpb::RawBatchGetRequest { let mut req = kvrpcpb::RawBatchGetRequest::default(); - req.set_keys(keys.into()); + req.keys = keys; req.maybe_set_cf(cf); req @@ -79,7 +79,7 @@ impl Merge for Collect { fn merge(&self, input: Vec>) -> Result { input .into_iter() - .flat_map_ok(|mut resp| resp.take_pairs().into_iter().map(Into::into)) + .flat_map_ok(|resp| resp.pairs.into_iter().map(Into::into)) .collect() } } @@ -91,10 +91,10 @@ pub fn new_raw_put_request( atomic: bool, ) -> kvrpcpb::RawPutRequest { let mut req = kvrpcpb::RawPutRequest::default(); - req.set_key(key); - req.set_value(value); + req.key = key; + req.value = value; req.maybe_set_cf(cf); - req.set_for_cas(atomic); + req.for_cas = atomic; req } @@ -117,9 +117,9 @@ pub fn new_raw_batch_put_request( atomic: bool, ) -> kvrpcpb::RawBatchPutRequest { let mut req = kvrpcpb::RawBatchPutRequest::default(); - req.set_pairs(pairs.into()); + req.pairs = pairs; req.maybe_set_cf(cf); - req.set_for_cas(atomic); + req.for_cas = atomic; req } @@ -144,8 +144,8 @@ impl Shardable for kvrpcpb::RawBatchPutRequest { } fn apply_shard(&mut self, shard: Self::Shard, store: &RegionStore) -> Result<()> { - self.set_context(store.region_with_leader.context()?); - self.set_pairs(shard.into()); + self.context = Some(store.region_with_leader.context()?); + self.pairs = shard; Ok(()) } } @@ -156,9 +156,9 @@ pub fn new_raw_delete_request( atomic: bool, ) -> kvrpcpb::RawDeleteRequest { let mut req = kvrpcpb::RawDeleteRequest::default(); - req.set_key(key); + req.key = key; req.maybe_set_cf(cf); - req.set_for_cas(atomic); + req.for_cas = atomic; req } @@ -180,7 +180,7 @@ pub fn new_raw_batch_delete_request( cf: Option, ) -> kvrpcpb::RawBatchDeleteRequest { let mut req = kvrpcpb::RawBatchDeleteRequest::default(); - req.set_keys(keys.into()); + req.keys = keys; req.maybe_set_cf(cf); req @@ -198,8 +198,8 @@ pub fn new_raw_delete_range_request( cf: Option, ) -> kvrpcpb::RawDeleteRangeRequest { let mut req = kvrpcpb::RawDeleteRangeRequest::default(); - req.set_start_key(start_key); - req.set_end_key(end_key); + req.start_key = start_key; + req.end_key = end_key; req.maybe_set_cf(cf); req @@ -219,10 +219,10 @@ pub fn new_raw_scan_request( cf: Option, ) -> kvrpcpb::RawScanRequest { let mut req = kvrpcpb::RawScanRequest::default(); - req.set_start_key(start_key); - req.set_end_key(end_key); - req.set_limit(limit); - req.set_key_only(key_only); + req.start_key = start_key; + req.end_key = end_key; + req.limit = limit; + req.key_only = key_only; req.maybe_set_cf(cf); req @@ -240,7 +240,7 @@ impl Merge for 
Collect { fn merge(&self, input: Vec>) -> Result { input .into_iter() - .flat_map_ok(|mut resp| resp.take_kvs().into_iter().map(Into::into)) + .flat_map_ok(|resp| resp.kvs.into_iter().map(Into::into)) .collect() } } @@ -252,9 +252,9 @@ pub fn new_raw_batch_scan_request( cf: Option, ) -> kvrpcpb::RawBatchScanRequest { let mut req = kvrpcpb::RawBatchScanRequest::default(); - req.set_ranges(ranges.into()); - req.set_each_limit(each_limit); - req.set_key_only(key_only); + req.ranges = ranges; + req.each_limit = each_limit; + req.key_only = key_only; req.maybe_set_cf(cf); req @@ -271,12 +271,12 @@ impl Shardable for kvrpcpb::RawBatchScanRequest { &self, pd_client: &Arc, ) -> BoxStream<'static, Result<(Self::Shard, RegionStore)>> { - store_stream_for_ranges(self.ranges.clone().into(), pd_client.clone()) + store_stream_for_ranges(self.ranges.clone(), pd_client.clone()) } fn apply_shard(&mut self, shard: Self::Shard, store: &RegionStore) -> Result<()> { - self.set_context(store.region_with_leader.context()?); - self.set_ranges(shard.into()); + self.context = Some(store.region_with_leader.context()?); + self.ranges = shard; Ok(()) } } @@ -287,7 +287,7 @@ impl Merge for Collect { fn merge(&self, input: Vec>) -> Result { input .into_iter() - .flat_map_ok(|mut resp| resp.take_kvs().into_iter().map(Into::into)) + .flat_map_ok(|resp| resp.kvs.into_iter().map(Into::into)) .collect() } } @@ -299,11 +299,11 @@ pub fn new_cas_request( cf: Option, ) -> kvrpcpb::RawCasRequest { let mut req = kvrpcpb::RawCasRequest::default(); - req.set_key(key); - req.set_value(value); + req.key = key; + req.value = value; match previous_value { - Some(v) => req.set_previous_value(v), - None => req.set_previous_not_exist(true), + Some(v) => req.previous_value = v, + None => req.previous_not_exist = true, } req.maybe_set_cf(cf); req @@ -344,9 +344,9 @@ pub fn new_raw_coprocessor_request( data_builder: RawCoprocessorRequestDataBuilder, ) -> RawCoprocessorRequest { let mut inner = kvrpcpb::RawCoprocessorRequest::default(); - inner.set_copr_name(copr_name); - inner.set_copr_version_req(copr_version_req); - inner.set_ranges(ranges.into()); + inner.copr_name = copr_name; + inner.copr_version_req = copr_version_req; + inner.ranges = ranges; RawCoprocessorRequest { inner, data_builder, @@ -361,8 +361,12 @@ pub struct RawCoprocessorRequest { #[async_trait] impl Request for RawCoprocessorRequest { - async fn dispatch(&self, client: &TikvClient, options: CallOption) -> Result> { - self.inner.dispatch(client, options).await + async fn dispatch( + &self, + client: &TikvClient, + timeout: Duration, + ) -> Result> { + self.inner.dispatch(client, timeout).await } fn label(&self) -> &'static str { @@ -389,16 +393,13 @@ impl Shardable for RawCoprocessorRequest { &self, pd_client: &Arc, ) -> BoxStream<'static, Result<(Self::Shard, RegionStore)>> { - store_stream_for_ranges(self.inner.ranges.clone().into(), pd_client.clone()) + store_stream_for_ranges(self.inner.ranges.clone(), pd_client.clone()) } fn apply_shard(&mut self, shard: Self::Shard, store: &RegionStore) -> Result<()> { - self.inner.set_context(store.region_with_leader.context()?); - self.inner.set_ranges(shard.clone().into()); - self.inner.set_data((self.data_builder)( - store.region_with_leader.region.clone(), - shard, - )); + self.inner.context = Some(store.region_with_leader.context()?); + self.inner.ranges = shard.clone(); + self.inner.data = (self.data_builder)(store.region_with_leader.region.clone(), shard); Ok(()) } } @@ -419,9 +420,9 @@ impl input? 
.into_iter() .map(|shard_resp| { - shard_resp.map(|ResponseWithShard(mut resp, ranges)| { + shard_resp.map(|ResponseWithShard(resp, ranges)| { ( - resp.take_data(), + resp.data, ranges .into_iter() .map(|range| range.start_key.into()..range.end_key.into()) @@ -437,7 +438,7 @@ macro_rules! impl_raw_rpc_request { ($name: ident) => { impl RawRpcRequest for kvrpcpb::$name { fn set_cf(&mut self, cf: String) { - self.set_cf(cf); + self.cf = cf; } } }; diff --git a/src/region.rs b/src/region.rs index b15086b9..e5bd7762 100644 --- a/src/region.rs +++ b/src/region.rs @@ -34,32 +34,32 @@ impl Eq for RegionWithLeader {} impl RegionWithLeader { pub fn contains(&self, key: &Key) -> bool { let key: &[u8] = key.into(); - let start_key = self.region.get_start_key(); - let end_key = self.region.get_end_key(); - key >= start_key && (key < end_key || end_key.is_empty()) + let start_key = &self.region.start_key; + let end_key = &self.region.end_key; + key >= start_key.as_slice() && (key < end_key.as_slice() || end_key.is_empty()) } pub fn context(&self) -> Result { self.leader .as_ref() - .ok_or_else(|| Error::LeaderNotFound { - region_id: self.region.get_id(), + .ok_or(Error::LeaderNotFound { + region_id: self.region.id, }) .map(|l| { let mut ctx = kvrpcpb::Context::default(); - ctx.set_region_id(self.region.get_id()); - ctx.set_region_epoch(Clone::clone(self.region.get_region_epoch())); - ctx.set_peer(Clone::clone(l)); + ctx.region_id = self.region.id; + ctx.region_epoch = self.region.region_epoch.clone(); + ctx.peer = Some(l.clone()); ctx }) } pub fn start_key(&self) -> Key { - self.region.get_start_key().to_vec().into() + self.region.start_key.to_vec().into() } pub fn end_key(&self) -> Key { - self.region.get_end_key().to_vec().into() + self.region.end_key.to_vec().into() } pub fn range(&self) -> (Key, Key) { @@ -68,16 +68,16 @@ impl RegionWithLeader { pub fn ver_id(&self) -> RegionVerId { let region = &self.region; - let epoch = region.get_region_epoch(); + let epoch = region.region_epoch.as_ref().unwrap(); RegionVerId { - id: region.get_id(), - conf_ver: epoch.get_conf_ver(), - ver: epoch.get_version(), + id: region.id, + conf_ver: epoch.conf_ver, + ver: epoch.version, } } pub fn id(&self) -> RegionId { - self.region.get_id() + self.region.id } pub fn get_store_id(&self) -> Result { @@ -87,6 +87,6 @@ impl RegionWithLeader { .ok_or_else(|| Error::LeaderNotFound { region_id: self.id(), }) - .map(|s| s.get_store_id()) + .map(|s| s.store_id) } } diff --git a/src/region_cache.rs b/src/region_cache.rs index 0371ac5b..99c941da 100644 --- a/src/region_cache.rs +++ b/src/region_cache.rs @@ -244,7 +244,7 @@ mod test { }, }; use tikv_client_common::Error; - use tikv_client_proto::metapb; + use tikv_client_proto::metapb::{self, RegionEpoch}; use tokio::sync::Mutex; #[derive(Default)] @@ -316,6 +316,10 @@ mod test { id: 1, start_key: vec![], end_key: vec![100], + region_epoch: Some(RegionEpoch { + conf_ver: 0, + version: 0, + }), ..Default::default() }, leader: Some(metapb::Peer { @@ -331,6 +335,10 @@ mod test { id: 2, start_key: vec![101], end_key: vec![], + region_epoch: Some(RegionEpoch { + conf_ver: 0, + version: 0, + }), ..Default::default() }, leader: Some(metapb::Peer { @@ -484,9 +492,13 @@ mod test { fn region(id: RegionId, start_key: Vec, end_key: Vec) -> RegionWithLeader { let mut region = RegionWithLeader::default(); - region.region.set_id(id); - region.region.set_start_key(start_key); - region.region.set_end_key(end_key); + region.region.id = id; + region.region.start_key = start_key; + 
region.region.end_key = end_key; + region.region.region_epoch = Some(RegionEpoch { + conf_ver: 0, + version: 0, + }); // We don't care about other fields here region diff --git a/src/request/mod.rs b/src/request/mod.rs index d141d70e..bd1d49cc 100644 --- a/src/request/mod.rs +++ b/src/request/mod.rs @@ -70,14 +70,15 @@ mod test { transaction::lowering::new_commit_request, Error, Key, Result, }; - use grpcio::CallOption; use std::{ any::Any, iter, sync::{atomic::AtomicUsize, Arc}, + time::Duration, }; - use tikv_client_proto::{kvrpcpb, pdpb::Timestamp, tikvpb::TikvClient}; + use tikv_client_proto::{kvrpcpb, pdpb::Timestamp, tikvpb::tikv_client::TikvClient}; use tikv_client_store::HasRegionError; + use tonic::transport::Channel; #[tokio::test] async fn test_region_retry() { @@ -105,7 +106,7 @@ mod test { #[async_trait] impl Request for MockKvRequest { - async fn dispatch(&self, _: &TikvClient, _: CallOption) -> Result> { + async fn dispatch(&self, _: &TikvClient, _: Duration) -> Result> { Ok(Box::new(MockRpcResponse {})) } @@ -181,7 +182,7 @@ mod test { let pd_client = Arc::new(MockPdClient::new(MockKvClient::with_dispatch_hook( |_: &dyn Any| { Ok(Box::new(kvrpcpb::CommitResponse { - error: Some(kvrpcpb::KeyError::default()).into(), + error: Some(kvrpcpb::KeyError::default()), ..Default::default() }) as Box) }, diff --git a/src/request/plan.rs b/src/request/plan.rs index 3760ab39..55f06f4c 100644 --- a/src/request/plan.rs +++ b/src/request/plan.rs @@ -7,7 +7,7 @@ use async_trait::async_trait; use futures::{future::try_join_all, prelude::*}; use tikv_client_proto::{errorpb, errorpb::EpochNotMatch, kvrpcpb}; use tikv_client_store::{HasKeyErrors, HasRegionError, HasRegionErrors, KvClient}; -use tokio::sync::Semaphore; +use tokio::{sync::Semaphore, time::sleep}; use crate::{ backoff::Backoff, @@ -144,7 +144,7 @@ where Self::handle_region_error(pd_client.clone(), e, region_store).await?; // don't sleep if we have resolved the region error if !region_error_resolved { - futures_timer::Delay::new(duration).await; + sleep(duration).await; } Self::single_plan_handler( pd_client, @@ -168,18 +168,14 @@ where // 3. 
Err(Error): can't be resolved, return the error to upper level async fn handle_region_error( pd_client: Arc, - mut e: errorpb::Error, + e: errorpb::Error, region_store: RegionStore, ) -> Result { let ver_id = region_store.region_with_leader.ver_id(); - if e.has_not_leader() { - let not_leader = e.get_not_leader(); - if not_leader.has_leader() { + if let Some(not_leader) = e.not_leader { + if let Some(leader) = not_leader.leader { match pd_client - .update_leader( - region_store.region_with_leader.ver_id(), - not_leader.get_leader().clone(), - ) + .update_leader(region_store.region_with_leader.ver_id(), leader) .await { Ok(_) => Ok(true), @@ -196,22 +192,22 @@ where pd_client.invalidate_region_cache(ver_id).await; Ok(false) } - } else if e.has_store_not_match() { + } else if e.store_not_match.is_some() { pd_client.invalidate_region_cache(ver_id).await; Ok(false) - } else if e.has_epoch_not_match() { + } else if e.epoch_not_match.is_some() { Self::on_region_epoch_not_match( pd_client.clone(), region_store, - e.take_epoch_not_match(), + e.epoch_not_match.unwrap(), ) .await - } else if e.has_stale_command() || e.has_region_not_found() { + } else if e.stale_command.is_some() || e.region_not_found.is_some() { pd_client.invalidate_region_cache(ver_id).await; Ok(false) - } else if e.has_server_is_busy() - || e.has_raft_entry_too_large() - || e.has_max_timestamp_not_synced() + } else if e.server_is_busy.is_some() + || e.raft_entry_too_large.is_some() + || e.max_timestamp_not_synced.is_some() { Err(Error::RegionError(Box::new(e))) } else { @@ -232,25 +228,24 @@ where error: EpochNotMatch, ) -> Result { let ver_id = region_store.region_with_leader.ver_id(); - if error.get_current_regions().is_empty() { + if error.current_regions.is_empty() { pd_client.invalidate_region_cache(ver_id).await; return Ok(true); } - for r in error.get_current_regions() { - if r.get_id() == region_store.region_with_leader.id() { - let returned_conf_ver = r.get_region_epoch().get_conf_ver(); - let returned_version = r.get_region_epoch().get_version(); - let current_conf_ver = region_store - .region_with_leader - .region - .get_region_epoch() - .get_conf_ver(); - let current_version = region_store + for r in error.current_regions { + if r.id == region_store.region_with_leader.id() { + let region_epoch = r.region_epoch.unwrap(); + let returned_conf_ver = region_epoch.conf_ver; + let returned_version = region_epoch.version; + let current_region_epoch = region_store .region_with_leader .region - .get_region_epoch() - .get_version(); + .region_epoch + .clone() + .unwrap(); + let current_conf_ver = current_region_epoch.conf_ver; + let current_version = current_region_epoch.version; // Find whether the current region is ahead of TiKV's. If so, backoff. 
if returned_conf_ver < current_conf_ver || returned_version < current_version { @@ -433,7 +428,7 @@ where match clone.backoff.next_delay_duration() { None => return Err(Error::ResolveLockError), Some(delay_duration) => { - futures_timer::Delay::new(delay_duration).await; + sleep(delay_duration).await; result = clone.inner.execute().await?; } } @@ -538,8 +533,7 @@ where } else if let Some(e) = scan_lock_resp.region_error() { info!( self.logger, - "CleanupLocks::execute, inner region error:{}", - e.get_message() + "CleanupLocks::execute, inner region error:{}", e.message ); result.region_error = Some(e); return Ok(result); @@ -565,7 +559,7 @@ where if self.options.async_commit_only { locks = locks .into_iter() - .filter(|l| l.get_use_async_commit()) + .filter(|l| l.use_async_commit) .collect::>(); } debug!( diff --git a/src/request/shard.rs b/src/request/shard.rs index 16c2e7fd..9a225217 100644 --- a/src/request/shard.rs +++ b/src/request/shard.rs @@ -158,9 +158,9 @@ macro_rules! shardable_key { mut shard: Self::Shard, store: &$crate::store::RegionStore, ) -> $crate::Result<()> { - self.set_context(store.region_with_leader.context()?); + self.context = Some(store.region_with_leader.context()?); assert!(shard.len() == 1); - self.set_key(shard.pop().unwrap()); + self.key = shard.pop().unwrap(); Ok(()) } } @@ -190,8 +190,8 @@ macro_rules! shardable_keys { shard: Self::Shard, store: &$crate::store::RegionStore, ) -> $crate::Result<()> { - self.set_context(store.region_with_leader.context()?); - self.set_keys(shard.into_iter().map(Into::into).collect()); + self.context = Some(store.region_with_leader.context()?); + self.keys = shard.into_iter().map(Into::into).collect(); Ok(()) } } @@ -218,10 +218,10 @@ macro_rules! shardable_range { shard: Self::Shard, store: &$crate::store::RegionStore, ) -> $crate::Result<()> { - self.set_context(store.region_with_leader.context()?); + self.context = Some(store.region_with_leader.context()?); - self.set_start_key(shard.0.into()); - self.set_end_key(shard.1.into()); + self.start_key = shard.0.into(); + self.end_key = shard.1.into(); Ok(()) } } diff --git a/src/store.rs b/src/store.rs index 7e6eabdc..ce717003 100644 --- a/src/store.rs +++ b/src/store.rs @@ -1,6 +1,7 @@ // Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0. 
use crate::{pd::PdClient, region::RegionWithLeader, BoundRange, Key, Result}; +use async_trait::async_trait; use derive_new::new; use futures::{prelude::*, stream::BoxStream}; use std::{ @@ -16,10 +17,15 @@ pub struct RegionStore { pub client: Arc, } +#[async_trait] pub trait KvConnectStore: KvConnect { - fn connect_to_store(&self, region: RegionWithLeader, address: String) -> Result { + async fn connect_to_store( + &self, + region: RegionWithLeader, + address: String, + ) -> Result { log::info!("connect to tikv endpoint: {:?}", &address); - let client = self.connect(address.as_str())?; + let client = self.connect(address.as_str()).await?; Ok(RegionStore::new(region, Arc::new(client))) } } diff --git a/src/transaction/buffer.rs b/src/transaction/buffer.rs index 8b19a5ba..611f5630 100644 --- a/src/transaction/buffer.rs +++ b/src/transaction/buffer.rs @@ -343,18 +343,18 @@ impl BufferEntry { match self { BufferEntry::Cached(_) => return None, BufferEntry::Put(v) => { - pb.set_op(kvrpcpb::Op::Put); - pb.set_value(v.clone()); + pb.op = kvrpcpb::Op::Put.into(); + pb.value = v.clone(); } - BufferEntry::Del => pb.set_op(kvrpcpb::Op::Del), - BufferEntry::Locked(_) => pb.set_op(kvrpcpb::Op::Lock), + BufferEntry::Del => pb.op = kvrpcpb::Op::Del.into(), + BufferEntry::Locked(_) => pb.op = kvrpcpb::Op::Lock.into(), BufferEntry::Insert(v) => { - pb.set_op(kvrpcpb::Op::Insert); - pb.set_value(v.clone()); + pb.op = kvrpcpb::Op::Insert.into(); + pb.value = v.clone(); } - BufferEntry::CheckNotExist => pb.set_op(kvrpcpb::Op::CheckNotExists), + BufferEntry::CheckNotExist => pb.op = kvrpcpb::Op::CheckNotExists.into(), }; - pb.set_key(key.clone().into()); + pb.key = key.clone().into(); Some(pb) } diff --git a/src/transaction/lock.rs b/src/transaction/lock.rs index e5949cd4..cb287223 100644 --- a/src/transaction/lock.rs +++ b/src/transaction/lock.rs @@ -221,7 +221,7 @@ impl LockResolver { let mut txn_infos = HashMap::new(); for l in locks { - let txn_id = l.get_lock_version(); + let txn_id = l.lock_version; if txn_infos.contains_key(&txn_id) || self.ctx.is_region_cleaned(txn_id, ®ion).await { continue; @@ -232,12 +232,12 @@ impl LockResolver { .check_txn_status( pd_client.clone(), txn_id, - l.get_primary_lock().to_vec(), + l.primary_lock.clone(), 0, u64::MAX, true, false, - l.get_lock_type() == kvrpcpb::Op::PessimisticLock, + l.lock_type == kvrpcpb::Op::PessimisticLock as i32, ) .await?; @@ -245,11 +245,7 @@ impl LockResolver { // Then we need to check the secondary locks to determine the final status of the transaction. 
if let TransactionStatusKind::Locked(_, lock_info) = &status.kind { let secondary_status = self - .check_all_secondaries( - pd_client.clone(), - lock_info.get_secondaries().to_vec(), - txn_id, - ) + .check_all_secondaries(pd_client.clone(), lock_info.secondaries.clone(), txn_id) .await?; slog_debug!( self.logger, @@ -270,12 +266,12 @@ impl LockResolver { .check_txn_status( pd_client.clone(), txn_id, - l.get_primary_lock().to_vec(), + l.primary_lock, 0, u64::MAX, true, true, - l.get_lock_type() == kvrpcpb::Op::PessimisticLock, + l.lock_type == kvrpcpb::Op::PessimisticLock as i32, ) .await?; } else { @@ -314,8 +310,8 @@ impl LockResolver { for (txn_id, commit_ts) in txn_infos.into_iter() { txn_ids.push(txn_id); let mut txn_info = TxnInfo::default(); - txn_info.set_txn(txn_id); - txn_info.set_status(commit_ts); + txn_info.txn = txn_id; + txn_info.status = commit_ts; txn_info_vec.push(txn_info); } let cleaned_region = self @@ -435,7 +431,7 @@ mod tests { |_: &dyn Any| { fail::fail_point!("region-error", |_| { let resp = kvrpcpb::ResolveLockResponse { - region_error: Some(errorpb::Error::default()).into(), + region_error: Some(errorpb::Error::default()), ..Default::default() }; Ok(Box::new(resp) as Box) diff --git a/src/transaction/lowering.rs b/src/transaction/lowering.rs index e20477c7..6c942a15 100644 --- a/src/transaction/lowering.rs +++ b/src/transaction/lowering.rs @@ -146,9 +146,9 @@ pub fn new_pessimistic_lock_request( locks .map(|pl| { let mut mutation = kvrpcpb::Mutation::default(); - mutation.set_op(kvrpcpb::Op::PessimisticLock); - mutation.set_assertion(pl.assertion()); - mutation.set_key(pl.key().into()); + mutation.op = kvrpcpb::Op::PessimisticLock.into(); + mutation.assertion = pl.assertion().into(); + mutation.key = pl.key().into(); mutation }) .collect(), diff --git a/src/transaction/requests.rs b/src/transaction/requests.rs index d5059f5f..b44cffe0 100644 --- a/src/transaction/requests.rs +++ b/src/transaction/requests.rs @@ -21,7 +21,7 @@ use futures::{ use std::{cmp, iter, sync::Arc}; use tikv_client_common::Error::PessimisticLockError; use tikv_client_proto::{ - kvrpcpb::{self, LockInfo, TxnHeartBeatResponse, TxnInfo}, + kvrpcpb::{self, Action, LockInfo, TxnHeartBeatResponse, TxnInfo}, pdpb::Timestamp, }; @@ -70,8 +70,8 @@ macro_rules! 
error_locks { pub fn new_get_request(key: Vec, timestamp: u64) -> kvrpcpb::GetRequest { let mut req = kvrpcpb::GetRequest::default(); - req.set_key(key); - req.set_version(timestamp); + req.key = key; + req.version = timestamp; req } @@ -91,19 +91,19 @@ impl Process for DefaultProcessor { type Out = Option; fn process(&self, input: Result) -> Result { - let mut input = input?; + let input = input?; Ok(if input.not_found { None } else { - Some(input.take_value()) + Some(input.value) }) } } pub fn new_batch_get_request(keys: Vec>, timestamp: u64) -> kvrpcpb::BatchGetRequest { let mut req = kvrpcpb::BatchGetRequest::default(); - req.set_keys(keys.into()); - req.set_version(timestamp); + req.keys = keys; + req.version = timestamp; req } @@ -119,7 +119,7 @@ impl Merge for Collect { fn merge(&self, input: Vec>) -> Result { input .into_iter() - .flat_map_ok(|mut resp| resp.take_pairs().into_iter().map(Into::into)) + .flat_map_ok(|resp| resp.pairs.into_iter().map(Into::into)) .collect() } } @@ -134,16 +134,16 @@ pub fn new_scan_request( ) -> kvrpcpb::ScanRequest { let mut req = kvrpcpb::ScanRequest::default(); if !reverse { - req.set_start_key(start_key); - req.set_end_key(end_key); + req.start_key = start_key; + req.end_key = end_key; } else { - req.set_start_key(end_key); - req.set_end_key(start_key); + req.start_key = end_key; + req.end_key = start_key; } - req.set_limit(limit); - req.set_key_only(key_only); - req.set_version(timestamp); - req.set_reverse(reverse); + req.limit = limit; + req.key_only = key_only; + req.version = timestamp; + req.reverse = reverse; req } @@ -159,7 +159,7 @@ impl Merge for Collect { fn merge(&self, input: Vec>) -> Result { input .into_iter() - .flat_map_ok(|mut resp| resp.take_pairs().into_iter().map(Into::into)) + .flat_map_ok(|resp| resp.pairs.into_iter().map(Into::into)) .collect() } } @@ -169,15 +169,15 @@ pub fn new_resolve_lock_request( commit_version: u64, ) -> kvrpcpb::ResolveLockRequest { let mut req = kvrpcpb::ResolveLockRequest::default(); - req.set_start_version(start_version); - req.set_commit_version(commit_version); + req.start_version = start_version; + req.commit_version = commit_version; req } pub fn new_batch_resolve_lock_request(txn_infos: Vec) -> kvrpcpb::ResolveLockRequest { let mut req = kvrpcpb::ResolveLockRequest::default(); - req.set_txn_infos(txn_infos.into()); + req.txn_infos = txn_infos; req } @@ -191,8 +191,8 @@ impl KvRequest for kvrpcpb::ResolveLockRequest { pub fn new_cleanup_request(key: Vec, start_version: u64) -> kvrpcpb::CleanupRequest { let mut req = kvrpcpb::CleanupRequest::default(); - req.set_key(key); - req.set_start_version(start_version); + req.key = key; + req.start_version = start_version; req } @@ -224,12 +224,12 @@ pub fn new_prewrite_request( lock_ttl: u64, ) -> kvrpcpb::PrewriteRequest { let mut req = kvrpcpb::PrewriteRequest::default(); - req.set_mutations(mutations.into()); - req.set_primary_lock(primary_lock); - req.set_start_version(start_version); - req.set_lock_ttl(lock_ttl); + req.mutations = mutations; + req.primary_lock = primary_lock; + req.start_version = start_version; + req.lock_ttl = lock_ttl; // FIXME: Lite resolve lock is currently disabled - req.set_txn_size(std::u64::MAX); + req.txn_size = std::u64::MAX; req } @@ -243,8 +243,8 @@ pub fn new_pessimistic_prewrite_request( ) -> kvrpcpb::PrewriteRequest { let len = mutations.len(); let mut req = new_prewrite_request(mutations, primary_lock, start_version, lock_ttl); - req.set_for_update_ts(for_update_ts); - 
req.set_is_pessimistic_lock(iter::repeat(true).take(len).collect()); + req.for_update_ts = for_update_ts; + req.is_pessimistic_lock = iter::repeat(true).take(len).collect(); req } @@ -276,19 +276,19 @@ impl Shardable for kvrpcpb::PrewriteRequest { } fn apply_shard(&mut self, shard: Self::Shard, store: &RegionStore) -> Result<()> { - self.set_context(store.region_with_leader.context()?); + self.context = Some(store.region_with_leader.context()?); // Only need to set secondary keys if we're sending the primary key. if self.use_async_commit && !self.mutations.iter().any(|m| m.key == self.primary_lock) { - self.set_secondaries(vec![].into()); + self.secondaries = vec![]; } // Only if there is only one request to send if self.try_one_pc && shard.len() != self.secondaries.len() + 1 { - self.set_try_one_pc(false); + self.try_one_pc = false; } - self.set_mutations(shard.into()); + self.mutations = shard; Ok(()) } } @@ -297,8 +297,8 @@ impl Batchable for kvrpcpb::PrewriteRequest { type Item = kvrpcpb::Mutation; fn item_size(item: &Self::Item) -> u64 { - let mut size = item.get_key().len() as u64; - size += item.get_value().len() as u64; + let mut size = item.key.len() as u64; + size += item.value.len() as u64; size } } @@ -309,9 +309,9 @@ pub fn new_commit_request( commit_version: u64, ) -> kvrpcpb::CommitRequest { let mut req = kvrpcpb::CommitRequest::default(); - req.set_keys(keys.into()); - req.set_start_version(start_version); - req.set_commit_version(commit_version); + req.keys = keys; + req.start_version = start_version; + req.commit_version = commit_version; req } @@ -343,8 +343,8 @@ impl Shardable for kvrpcpb::CommitRequest { } fn apply_shard(&mut self, shard: Self::Shard, store: &RegionStore) -> Result<()> { - self.set_context(store.region_with_leader.context()?); - self.set_keys(shard.into_iter().map(Into::into).collect()); + self.context = Some(store.region_with_leader.context()?); + self.keys = shard.into_iter().map(Into::into).collect(); Ok(()) } } @@ -362,8 +362,8 @@ pub fn new_batch_rollback_request( start_version: u64, ) -> kvrpcpb::BatchRollbackRequest { let mut req = kvrpcpb::BatchRollbackRequest::default(); - req.set_keys(keys.into()); - req.set_start_version(start_version); + req.keys = keys; + req.start_version = start_version; req } @@ -380,9 +380,9 @@ pub fn new_pessimistic_rollback_request( for_update_ts: u64, ) -> kvrpcpb::PessimisticRollbackRequest { let mut req = kvrpcpb::PessimisticRollbackRequest::default(); - req.set_keys(keys.into()); - req.set_start_version(start_version); - req.set_for_update_ts(for_update_ts); + req.keys = keys; + req.start_version = start_version; + req.for_update_ts = for_update_ts; req } @@ -402,18 +402,17 @@ pub fn new_pessimistic_lock_request( need_value: bool, ) -> kvrpcpb::PessimisticLockRequest { let mut req = kvrpcpb::PessimisticLockRequest::default(); - req.set_mutations(mutations.into()); - req.set_primary_lock(primary_lock); - req.set_start_version(start_version); - req.set_lock_ttl(lock_ttl); - req.set_for_update_ts(for_update_ts); + req.mutations = mutations; + req.primary_lock = primary_lock; + req.start_version = start_version; + req.lock_ttl = lock_ttl; + req.for_update_ts = for_update_ts; // FIXME: make them configurable - req.set_is_first_lock(false); - req.set_wait_timeout(0); - req.set_force(false); - req.set_return_values(need_value); + req.is_first_lock = false; + req.wait_timeout = 0; + req.return_values = need_value; // FIXME: support large transaction - req.set_min_commit_ts(0); + req.min_commit_ts = 0; req } @@ -435,8 
+434,8 @@ impl Shardable for kvrpcpb::PessimisticLockRequest { } fn apply_shard(&mut self, shard: Self::Shard, store: &RegionStore) -> Result<()> { - self.set_context(store.region_with_leader.context()?); - self.set_mutations(shard.into()); + self.context = Some(store.region_with_leader.context()?); + self.mutations = shard; Ok(()) } } @@ -473,10 +472,10 @@ impl Merge> = resp.take_values().into(); + .flat_map(|ResponseWithShard(resp, mutations)| { + let values: Vec> = resp.values; let values_len = values.len(); - let not_founds = resp.take_not_founds(); + let not_founds = resp.not_founds; let kvpairs = mutations .into_iter() .map(|m| m.key) @@ -511,10 +510,10 @@ pub fn new_scan_lock_request( limit: u32, ) -> kvrpcpb::ScanLockRequest { let mut req = kvrpcpb::ScanLockRequest::default(); - req.set_start_key(start_key); - req.set_end_key(end_key); - req.set_max_version(safepoint); - req.set_limit(limit); + req.start_key = start_key; + req.end_key = end_key; + req.max_version = safepoint; + req.limit = limit; req } @@ -536,17 +535,17 @@ impl Shardable for kvrpcpb::ScanLockRequest { } fn apply_shard(&mut self, shard: Self::Shard, store: &RegionStore) -> Result<()> { - self.set_context(store.region_with_leader.context()?); - self.set_start_key(shard.0); + self.context = Some(store.region_with_leader.context()?); + self.start_key = shard.0; Ok(()) } } impl HasNextBatch for kvrpcpb::ScanLockResponse { fn has_next_batch(&self) -> Option<(Vec, Vec)> { - self.get_locks().last().map(|lock| { + self.locks.last().map(|lock| { // TODO: if last key is larger or equal than ScanLockRequest.end_key, return None. - let mut start_key: Vec = lock.get_key().to_vec(); + let mut start_key: Vec = lock.key.clone(); start_key.push(0); (start_key, vec![]) }) @@ -555,7 +554,7 @@ impl HasNextBatch for kvrpcpb::ScanLockResponse { impl NextBatch for kvrpcpb::ScanLockRequest { fn next_batch(&mut self, range: (Vec, Vec)) { - self.set_start_key(range.0); + self.start_key = range.0; } } @@ -576,9 +575,9 @@ pub fn new_heart_beat_request( ttl: u64, ) -> kvrpcpb::TxnHeartBeatRequest { let mut req = kvrpcpb::TxnHeartBeatRequest::default(); - req.set_start_version(start_ts); - req.set_primary_lock(primary_lock); - req.set_advise_lock_ttl(ttl); + req.start_version = start_ts; + req.primary_lock = primary_lock; + req.advise_lock_ttl = ttl; req } @@ -597,7 +596,7 @@ impl Shardable for kvrpcpb::TxnHeartBeatRequest { } fn apply_shard(&mut self, mut shard: Self::Shard, store: &RegionStore) -> Result<()> { - self.set_context(store.region_with_leader.context()?); + self.context = Some(store.region_with_leader.context()?); assert!(shard.len() == 1); self.primary_lock = shard.pop().unwrap(); Ok(()) @@ -630,13 +629,13 @@ pub fn new_check_txn_status_request( resolving_pessimistic_lock: bool, ) -> kvrpcpb::CheckTxnStatusRequest { let mut req = kvrpcpb::CheckTxnStatusRequest::default(); - req.set_primary_key(primary_key); - req.set_lock_ts(lock_ts); - req.set_caller_start_ts(caller_start_ts); - req.set_current_ts(current_ts); - req.set_rollback_if_not_exist(rollback_if_not_exist); - req.set_force_sync_commit(force_sync_commit); - req.set_resolving_pessimistic_lock(resolving_pessimistic_lock); + req.primary_key = primary_key; + req.lock_ts = lock_ts; + req.caller_start_ts = caller_start_ts; + req.current_ts = current_ts; + req.rollback_if_not_exist = rollback_if_not_exist; + req.force_sync_commit = force_sync_commit; + req.resolving_pessimistic_lock = resolving_pessimistic_lock; req } @@ -655,9 +654,9 @@ impl Shardable for 
kvrpcpb::CheckTxnStatusRequest { } fn apply_shard(&mut self, mut shard: Self::Shard, store: &RegionStore) -> Result<()> { - self.set_context(store.region_with_leader.context()?); + self.context = Some(store.region_with_leader.context()?); assert!(shard.len() == 1); - self.set_primary_key(shard.pop().unwrap()); + self.primary_key = shard.pop().unwrap(); Ok(()) } } @@ -688,7 +687,7 @@ pub struct TransactionStatus { impl From for TransactionStatus { fn from(mut resp: kvrpcpb::CheckTxnStatusResponse) -> TransactionStatus { TransactionStatus { - action: resp.get_action(), + action: Action::from_i32(resp.action).unwrap(), kind: (resp.commit_version, resp.lock_ttl, resp.lock_info.take()).into(), is_expired: false, } @@ -752,8 +751,8 @@ pub fn new_check_secondary_locks_request( start_version: u64, ) -> kvrpcpb::CheckSecondaryLocksRequest { let mut req = kvrpcpb::CheckSecondaryLocksRequest::default(); - req.set_keys(keys.into()); - req.set_start_version(start_version); + req.keys = keys; + req.start_version = start_version; req } @@ -818,7 +817,7 @@ impl HasLocks for kvrpcpb::CleanupResponse {} impl HasLocks for kvrpcpb::ScanLockResponse { fn take_locks(&mut self) -> Vec { - self.take_locks().into() + std::mem::take(&mut self.locks) } } @@ -867,7 +866,7 @@ mod tests { let resp1 = ResponseWithShard( kvrpcpb::PessimisticLockResponse { - values: vec![value1.to_vec()].into(), + values: vec![value1.to_vec()], ..Default::default() }, vec![kvrpcpb::Mutation { @@ -879,7 +878,7 @@ mod tests { let resp_empty_value = ResponseWithShard( kvrpcpb::PessimisticLockResponse { - values: vec![value_empty.to_vec()].into(), + values: vec![value_empty.to_vec()], ..Default::default() }, vec![kvrpcpb::Mutation { @@ -891,7 +890,7 @@ mod tests { let resp_not_found = ResponseWithShard( kvrpcpb::PessimisticLockResponse { - values: vec![value_empty.to_vec(), value4.to_vec()].into(), + values: vec![value_empty.to_vec(), value4.to_vec()], not_founds: vec![true, false], ..Default::default() }, diff --git a/tests/mock_tikv_tests.rs b/tests/mock_tikv_tests.rs deleted file mode 100644 index 85cc64c6..00000000 --- a/tests/mock_tikv_tests.rs +++ /dev/null @@ -1,84 +0,0 @@ -#[cfg(test)] -mod test { - use grpcio::redirect_log; - use log::debug; - use mock_tikv::{start_mock_pd_server, start_mock_tikv_server, MOCK_PD_PORT}; - use simple_logger::SimpleLogger; - use tikv_client::{KvPair, RawClient}; - - #[tokio::test] - #[ignore] - async fn test_raw_put_get() { - SimpleLogger::new().init().unwrap(); - redirect_log(); - - let mut tikv_server = start_mock_tikv_server(); - let _pd_server = start_mock_pd_server(); - - let client = RawClient::new(vec![format!("localhost:{MOCK_PD_PORT}")], None) - .await - .unwrap(); - - // empty; get non-existent key - let res = client.get("k1".to_owned()).await; - assert_eq!(res.unwrap(), None); - - // empty; put then batch_get - client.put("k1".to_owned(), "v1".to_owned()).await.unwrap(); - client.put("k2".to_owned(), "v2".to_owned()).await.unwrap(); - - let res = client - .batch_get(vec!["k1".to_owned(), "k2".to_owned(), "k3".to_owned()]) - .await - .unwrap(); - assert_eq!(res.len(), 2); - assert_eq!(res[0].1, "v1".as_bytes()); - assert_eq!(res[1].1, "v2".as_bytes()); - - // k1,k2; batch_put then batch_get - client - .batch_put(vec![ - KvPair::new("k3".to_owned(), "v3".to_owned()), - KvPair::new("k4".to_owned(), "v4".to_owned()), - ]) - .await - .unwrap(); - - let res = client - .batch_get(vec!["k4".to_owned(), "k3".to_owned()]) - .await - .unwrap(); - assert_eq!(res[0].1, "v3".as_bytes()); - 
assert_eq!(res[1].1, "v4".as_bytes()); - - // k1,k2,k3,k4; delete then get - let res = client.delete("k3".to_owned()).await; - assert!(res.is_ok()); - - let res = client.get("k3".to_owned()).await; - assert_eq!(res.unwrap(), None); - - // k1,k2,k4; batch_delete then batch_get - let res = client - .batch_delete(vec!["k1".to_owned(), "k2".to_owned(), "k4".to_owned()]) - .await; - assert!(res.is_ok()); - - let res = client - .batch_get(vec![ - "k1".to_owned(), - "k2".to_owned(), - "k3".to_owned(), - "k4".to_owned(), - ]) - .await - .unwrap(); - assert_eq!(res.len(), 0); - - debug!("Pass all tests"); - - let _ = tikv_server.shutdown().await; - // FIXME: shutdown PD server - // let _ = pd_server.shutdown().await; - } -} diff --git a/tikv-client-common/Cargo.toml b/tikv-client-common/Cargo.toml index 2d1fb8b2..9210ddb1 100644 --- a/tikv-client-common/Cargo.toml +++ b/tikv-client-common/Cargo.toml @@ -1,31 +1,30 @@ [package] name = "tikv-client-common" version = "0.2.0" -edition = "2018" +edition = "2021" license = "Apache-2.0" authors = ["The TiKV Project Authors"] repository = "https://github.com/tikv/client-rust" description = "Common components of the TiKV Rust client" -[features] -protobuf-codec = ["grpcio/protobuf-codec"] -prost-codec = ["grpcio/prost-codec"] - [dependencies] thiserror = "1" -futures = { version = "0.3", features = ["compat", "async-await", "thread-pool"] } -grpcio = { version = "0.10", default-features = false } +futures = { version = "0.3", features = [ + "compat", + "async-await", + "thread-pool", +] } lazy_static = "1" log = "0.4" regex = "1" semver = "1" tikv-client-proto = { version = "0.2.0", path = "../tikv-client-proto" } tokio = "1" +tonic = { version = "0.9", features = ["tls"] } [dev-dependencies] clap = "2" -fail = { version = "0.4", features = [ "failpoints" ] } +fail = { version = "0.4", features = ["failpoints"] } proptest = "1" proptest-derive = "0.3" tempfile = "3" -tokio = "1" diff --git a/tikv-client-common/src/errors.rs b/tikv-client-common/src/errors.rs index 3becd529..2e44c41a 100644 --- a/tikv-client-common/src/errors.rs +++ b/tikv-client-common/src/errors.rs @@ -40,9 +40,18 @@ pub enum Error { /// Wraps a `std::io::Error`. #[error("IO error: {0}")] Io(#[from] std::io::Error), + /// Wraps a `std::io::Error`. + #[error("tokio channel error: {0}")] + Channel(#[from] tokio::sync::oneshot::error::RecvError), /// Wraps a `grpcio::Error`. #[error("gRPC error: {0}")] - Grpc(#[from] grpcio::Error), + Grpc(#[from] tonic::transport::Error), + /// Wraps a `grpcio::Error`. + #[error("gRPC api error: {0}")] + GrpcAPI(#[from] tonic::Status), + /// Wraps a `grpcio::Error`. + #[error("url error: {0}")] + Url(#[from] tonic::codegen::http::uri::InvalidUri), /// Represents that a futures oneshot channel was cancelled. #[error("A futures oneshot channel was canceled. {0}")] Canceled(#[from] futures::channel::oneshot::Canceled), diff --git a/tikv-client-common/src/security.rs b/tikv-client-common/src/security.rs index a89b2fca..d030314f 100644 --- a/tikv-client-common/src/security.rs +++ b/tikv-client-common/src/security.rs @@ -1,15 +1,15 @@ // Copyright 2018 TiKV Project Authors. Licensed under Apache-2.0. 
use crate::Result; -use grpcio::{Channel, ChannelBuilder, ChannelCredentialsBuilder, Environment}; +// use grpcio::{Channel, ChannelBuilder, ChannelCredentialsBuilder, Environment}; use regex::Regex; use std::{ fs::File, io::Read, path::{Path, PathBuf}, - sync::Arc, time::Duration, }; +use tonic::transport::{Certificate, Channel, ClientTlsConfig, Identity}; lazy_static::lazy_static! { static ref SCHEME_REG: Regex = Regex::new(r"^\s*(https?://)").unwrap(); @@ -63,9 +63,9 @@ impl SecurityManager { } /// Connect to gRPC server using TLS connection. If TLS is not configured, use normal connection. - pub fn connect( + pub async fn connect( &self, - env: Arc, + // env: Arc, addr: &str, factory: Factory, ) -> Result @@ -74,24 +74,27 @@ impl SecurityManager { { info!("connect to rpc server at endpoint: {:?}", addr); - let addr = SCHEME_REG.replace(addr, ""); + let addr = SCHEME_REG.replace(addr, "").into_owned(); - let cb = ChannelBuilder::new(env) - .keepalive_time(Duration::from_secs(10)) - .keepalive_timeout(Duration::from_secs(3)) - .use_local_subchannel_pool(true); - - let channel = if self.ca.is_empty() { - cb.connect(&addr) + let tls = if self.ca.is_empty() { + ClientTlsConfig::default() } else { - let cred = ChannelCredentialsBuilder::new() - .root_cert(self.ca.clone()) - .cert(self.cert.clone(), load_pem_file("private key", &self.key)?) - .build(); - cb.secure_connect(&addr, cred) + ClientTlsConfig::new() + .ca_certificate(Certificate::from_pem(&self.ca)) + .identity(Identity::from_pem( + &self.cert, + load_pem_file("private key", &self.key)?, + )) }; - Ok(factory(channel)) + let ch = Channel::from_shared(addr)? + .tcp_keepalive(Some(Duration::from_secs(10))) + .keep_alive_timeout(Duration::from_secs(3)) + .tls_config(tls)? + .connect() + .await?; + + Ok(factory(ch)) } } diff --git a/tikv-client-pd/Cargo.toml b/tikv-client-pd/Cargo.toml index b263167a..c1654750 100644 --- a/tikv-client-pd/Cargo.toml +++ b/tikv-client-pd/Cargo.toml @@ -1,26 +1,26 @@ [package] name = "tikv-client-pd" version = "0.2.0" -edition = "2018" +edition = "2021" license = "Apache-2.0" authors = ["The TiKV Project Authors"] repository = "https://github.com/tikv/client-rust" description = "Low level PD components for the TiKV Rust client" -[features] -protobuf-codec = ["grpcio/protobuf-codec"] -prost-codec = ["grpcio/prost-codec"] - [dependencies] async-trait = "0.1" -futures = { version = "0.3", features = ["compat", "async-await", "thread-pool"] } -grpcio = { version = "0.10", default-features = false } +futures = "0.3" +# futures = { version = "0.3", features = ["compat", "async-await", "thread-pool"] } log = "0.4" tikv-client-common = { version = "0.2.0", path = "../tikv-client-common" } tikv-client-proto = { version = "0.2.0", path = "../tikv-client-proto" } +tonic = "0.9" +pin-project = "1" +tokio = { version = "1", features = ["sync"] } + [dev-dependencies] clap = "2" -fail = { version = "0.4", features = [ "failpoints" ] } +fail = { version = "0.4", features = ["failpoints"] } proptest = "1" proptest-derive = "0.3" diff --git a/tikv-client-pd/src/cluster.rs b/tikv-client-pd/src/cluster.rs index 063532e9..048d35f5 100644 --- a/tikv-client-pd/src/cluster.rs +++ b/tikv-client-pd/src/cluster.rs @@ -1,8 +1,7 @@ // Copyright 2018 TiKV Project Authors. Licensed under Apache-2.0. 
-use crate::{timestamp::TimestampOracle, Error, Result, SecurityManager}; +use crate::{timestamp::TimestampOracle, Result, SecurityManager}; use async_trait::async_trait; -use grpcio::{CallOption, Environment}; use std::{ collections::HashSet, sync::Arc, @@ -10,11 +9,12 @@ use std::{ }; use tikv_client_common::internal_err; use tikv_client_proto::pdpb::{self, Timestamp}; +use tonic::{transport::Channel, IntoRequest, Request}; /// A PD cluster. pub struct Cluster { id: u64, - client: pdpb::PdClient, + client: pdpb::pd_client::PdClient, members: pdpb::GetMembersResponse, tso: TimestampOracle, } @@ -23,8 +23,8 @@ macro_rules! pd_request { ($cluster_id:expr, $type:ty) => {{ let mut request = <$type>::default(); let mut header = ::tikv_client_proto::pdpb::RequestHeader::default(); - header.set_cluster_id($cluster_id); - request.set_header(header); + header.cluster_id = $cluster_id; + request.header = Some(header); request }}; } @@ -32,34 +32,41 @@ macro_rules! pd_request { // These methods make a single attempt to make a request. impl Cluster { pub async fn get_region( - &self, + &mut self, key: Vec, timeout: Duration, ) -> Result { let mut req = pd_request!(self.id, pdpb::GetRegionRequest); - req.set_region_key(key.clone()); - req.send(&self.client, timeout).await + req.region_key = key.clone(); + req.send(&mut self.client, timeout).await } pub async fn get_region_by_id( - &self, + &mut self, id: u64, timeout: Duration, ) -> Result { let mut req = pd_request!(self.id, pdpb::GetRegionByIdRequest); - req.set_region_id(id); - req.send(&self.client, timeout).await + req.region_id = id; + req.send(&mut self.client, timeout).await } - pub async fn get_store(&self, id: u64, timeout: Duration) -> Result { + pub async fn get_store( + &mut self, + id: u64, + timeout: Duration, + ) -> Result { let mut req = pd_request!(self.id, pdpb::GetStoreRequest); - req.set_store_id(id); - req.send(&self.client, timeout).await + req.store_id = id; + req.send(&mut self.client, timeout).await } - pub async fn get_all_stores(&self, timeout: Duration) -> Result { + pub async fn get_all_stores( + &mut self, + timeout: Duration, + ) -> Result { let req = pd_request!(self.id, pdpb::GetAllStoresRequest); - req.send(&self.client, timeout).await + req.send(&mut self.client, timeout).await } pub async fn get_timestamp(&self) -> Result { @@ -67,25 +74,24 @@ impl Cluster { } pub async fn update_safepoint( - &self, + &mut self, safepoint: u64, timeout: Duration, ) -> Result { let mut req = pd_request!(self.id, pdpb::UpdateGcSafePointRequest); - req.set_safe_point(safepoint); - req.send(&self.client, timeout).await + req.safe_point = safepoint; + req.send(&mut self.client, timeout).await } } /// An object for connecting and reconnecting to a PD cluster. pub struct Connection { - env: Arc, security_mgr: Arc, } impl Connection { - pub fn new(env: Arc, security_mgr: Arc) -> Connection { - Connection { env, security_mgr } + pub fn new(security_mgr: Arc) -> Connection { + Connection { security_mgr } } pub async fn connect_cluster( @@ -95,7 +101,7 @@ impl Connection { ) -> Result { let members = self.validate_endpoints(endpoints, timeout).await?; let (client, members) = self.try_connect_leader(&members, timeout).await?; - let id = members.get_header().get_cluster_id(); + let id = members.header.as_ref().unwrap().cluster_id; let tso = TimestampOracle::new(id, &client)?; let cluster = Cluster { id, @@ -147,7 +153,7 @@ impl Connection { }; // Check cluster ID. 
- let cid = resp.get_header().get_cluster_id(); + let cid = resp.header.as_ref().unwrap().cluster_id; if let Some(sample) = cluster_id { if sample != cid { return Err(internal_err!( @@ -178,16 +184,16 @@ impl Connection { async fn connect( &self, addr: &str, - timeout: Duration, - ) -> Result<(pdpb::PdClient, pdpb::GetMembersResponse)> { - let client = self + _timeout: Duration, + ) -> Result<(pdpb::pd_client::PdClient, pdpb::GetMembersResponse)> { + let mut client = self .security_mgr - .connect(self.env.clone(), addr, pdpb::PdClient::new)?; - let option = CallOption::default().timeout(timeout); - let resp = client - .get_members_async_opt(&pdpb::GetMembersRequest::default(), option) - .map_err(Error::from)? + .connect(addr, pdpb::pd_client::PdClient::::new) .await?; + let resp: pdpb::GetMembersResponse = client + .get_members(pdpb::GetMembersRequest::default()) + .await? + .into_inner(); Ok((client, resp)) } @@ -196,7 +202,7 @@ impl Connection { addr: &str, cluster_id: u64, timeout: Duration, - ) -> Result<(pdpb::PdClient, pdpb::GetMembersResponse)> { + ) -> Result<(pdpb::pd_client::PdClient, pdpb::GetMembersResponse)> { let (client, r) = self.connect(addr, timeout).await?; Connection::validate_cluster_id(addr, &r, cluster_id)?; Ok((client, r)) @@ -207,7 +213,7 @@ impl Connection { members: &pdpb::GetMembersResponse, cluster_id: u64, ) -> Result<()> { - let new_cluster_id = members.get_header().get_cluster_id(); + let new_cluster_id = members.header.as_ref().unwrap().cluster_id; if new_cluster_id != cluster_id { Err(internal_err!( "{} no longer belongs to cluster {}, it is in {}", @@ -224,10 +230,10 @@ impl Connection { &self, previous: &pdpb::GetMembersResponse, timeout: Duration, - ) -> Result<(pdpb::PdClient, pdpb::GetMembersResponse)> { - let previous_leader = previous.get_leader(); - let members = previous.get_members(); - let cluster_id = previous.get_header().get_cluster_id(); + ) -> Result<(pdpb::pd_client::PdClient, pdpb::GetMembersResponse)> { + let previous_leader = previous.leader.as_ref().unwrap(); + let members = &previous.members; + let cluster_id = previous.header.as_ref().unwrap().cluster_id; let mut resp = None; // Try to connect to other members, then the previous leader. @@ -236,7 +242,7 @@ impl Connection { .filter(|m| *m != previous_leader) .chain(Some(previous_leader)) { - for ep in m.get_client_urls() { + for ep in &m.client_urls { match self.try_connect(ep.as_str(), cluster_id, timeout).await { Ok((_, r)) => { resp = Some(r); @@ -252,8 +258,8 @@ impl Connection { // Then try to connect the PD cluster leader. 
if let Some(resp) = resp { - let leader = resp.get_leader(); - for ep in leader.get_client_urls() { + let leader = resp.leader.as_ref().unwrap(); + for ep in &leader.client_urls { let r = self.try_connect(ep.as_str(), cluster_id, timeout).await; if r.is_ok() { return r; @@ -265,20 +271,28 @@ impl Connection { } } -type GrpcResult = std::result::Result; +type GrpcResult = std::result::Result; #[async_trait] -trait PdMessage { +trait PdMessage: Sized { type Response: PdResponse; - async fn rpc(&self, client: &pdpb::PdClient, opt: CallOption) -> GrpcResult; + async fn rpc( + req: Request, + client: &mut pdpb::pd_client::PdClient, + ) -> GrpcResult; - async fn send(&self, client: &pdpb::PdClient, timeout: Duration) -> Result { - let option = CallOption::default().timeout(timeout); - let response = self.rpc(client, option).await?; + async fn send( + self, + client: &mut pdpb::pd_client::PdClient, + timeout: Duration, + ) -> Result { + let mut req = self.into_request(); + req.set_timeout(timeout); + let response = Self::rpc(req, client).await?; - if response.header().has_error() { - Err(internal_err!(response.header().get_error().get_message())) + if let Some(err) = &response.header().error { + Err(internal_err!(err.message)) } else { Ok(response) } @@ -289,8 +303,11 @@ trait PdMessage { impl PdMessage for pdpb::GetRegionRequest { type Response = pdpb::GetRegionResponse; - async fn rpc(&self, client: &pdpb::PdClient, opt: CallOption) -> GrpcResult { - client.get_region_async_opt(self, opt)?.await + async fn rpc( + req: Request, + client: &mut pdpb::pd_client::PdClient, + ) -> GrpcResult { + Ok(client.get_region(req).await?.into_inner()) } } @@ -298,8 +315,11 @@ impl PdMessage for pdpb::GetRegionRequest { impl PdMessage for pdpb::GetRegionByIdRequest { type Response = pdpb::GetRegionResponse; - async fn rpc(&self, client: &pdpb::PdClient, opt: CallOption) -> GrpcResult { - client.get_region_by_id_async_opt(self, opt)?.await + async fn rpc( + req: Request, + client: &mut pdpb::pd_client::PdClient, + ) -> GrpcResult { + Ok(client.get_region_by_id(req).await?.into_inner()) } } @@ -307,8 +327,11 @@ impl PdMessage for pdpb::GetRegionByIdRequest { impl PdMessage for pdpb::GetStoreRequest { type Response = pdpb::GetStoreResponse; - async fn rpc(&self, client: &pdpb::PdClient, opt: CallOption) -> GrpcResult { - client.get_store_async_opt(self, opt)?.await + async fn rpc( + req: Request, + client: &mut pdpb::pd_client::PdClient, + ) -> GrpcResult { + Ok(client.get_store(req).await?.into_inner()) } } @@ -316,8 +339,11 @@ impl PdMessage for pdpb::GetStoreRequest { impl PdMessage for pdpb::GetAllStoresRequest { type Response = pdpb::GetAllStoresResponse; - async fn rpc(&self, client: &pdpb::PdClient, opt: CallOption) -> GrpcResult { - client.get_all_stores_async_opt(self, opt)?.await + async fn rpc( + req: Request, + client: &mut pdpb::pd_client::PdClient, + ) -> GrpcResult { + Ok(client.get_all_stores(req).await?.into_inner()) } } @@ -325,8 +351,11 @@ impl PdMessage for pdpb::GetAllStoresRequest { impl PdMessage for pdpb::UpdateGcSafePointRequest { type Response = pdpb::UpdateGcSafePointResponse; - async fn rpc(&self, client: &pdpb::PdClient, opt: CallOption) -> GrpcResult { - client.update_gc_safe_point_async_opt(self, opt)?.await + async fn rpc( + req: Request, + client: &mut pdpb::pd_client::PdClient, + ) -> GrpcResult { + Ok(client.update_gc_safe_point(req).await?.into_inner()) } } @@ -336,24 +365,24 @@ trait PdResponse { impl PdResponse for pdpb::GetStoreResponse { fn header(&self) -> 
&pdpb::ResponseHeader { - self.get_header() + self.header.as_ref().unwrap() } } impl PdResponse for pdpb::GetRegionResponse { fn header(&self) -> &pdpb::ResponseHeader { - self.get_header() + self.header.as_ref().unwrap() } } impl PdResponse for pdpb::GetAllStoresResponse { fn header(&self) -> &pdpb::ResponseHeader { - self.get_header() + self.header.as_ref().unwrap() } } impl PdResponse for pdpb::UpdateGcSafePointResponse { fn header(&self) -> &pdpb::ResponseHeader { - self.get_header() + self.header.as_ref().unwrap() } } diff --git a/tikv-client-pd/src/timestamp.rs b/tikv-client-pd/src/timestamp.rs index 92968645..4770643c 100644 --- a/tikv-client-pd/src/timestamp.rs +++ b/tikv-client-pd/src/timestamp.rs @@ -11,19 +11,19 @@ //! single `TsoRequest` to the PD server. The other future receives `TsoResponse`s from the PD //! server and allocates timestamps for the requests. -use crate::{Error, Result}; +use crate::Result; use futures::{ - channel::{mpsc, oneshot}, - executor::block_on, - join, pin_mut, + pin_mut, prelude::*, task::{AtomicWaker, Context, Poll}, }; -use grpcio::WriteFlags; use log::debug; -use std::{cell::RefCell, collections::VecDeque, pin::Pin, rc::Rc, thread}; +use pin_project::pin_project; +use std::{collections::VecDeque, pin::Pin, sync::Arc}; use tikv_client_common::internal_err; -use tikv_client_proto::pdpb::*; +use tikv_client_proto::pdpb::{pd_client::PdClient, *}; +use tokio::sync::{mpsc, oneshot, Mutex}; +use tonic::transport::Channel; /// It is an empirical value. const MAX_BATCH_SIZE: usize = 64; @@ -45,25 +45,17 @@ pub(crate) struct TimestampOracle { } impl TimestampOracle { - pub(crate) fn new(cluster_id: u64, pd_client: &PdClient) -> Result { + pub(crate) fn new(cluster_id: u64, pd_client: &PdClient) -> Result { + let pd_client = pd_client.clone(); let (request_tx, request_rx) = mpsc::channel(MAX_BATCH_SIZE); - // FIXME: use tso_opt - let (rpc_sender, rpc_receiver) = pd_client.tso()?; // Start a background thread to handle TSO requests and responses - thread::spawn(move || { - block_on(run_tso( - cluster_id, - rpc_sender.sink_err_into(), - rpc_receiver.err_into(), - request_rx, - )) - }); + tokio::spawn(run_tso(cluster_id, pd_client, request_rx)); Ok(TimestampOracle { request_tx }) } - pub(crate) async fn get_timestamp(mut self) -> Result { + pub(crate) async fn get_timestamp(self) -> Result { debug!("getting current timestamp"); let (request, response) = oneshot::channel(); self.request_tx @@ -76,49 +68,41 @@ impl TimestampOracle { async fn run_tso( cluster_id: u64, - mut rpc_sender: impl Sink<(TsoRequest, WriteFlags), Error = Error> + Unpin, - mut rpc_receiver: impl Stream> + Unpin, + mut pd_client: PdClient, request_rx: mpsc::Receiver, -) { +) -> Result<()> { // The `TimestampRequest`s which are waiting for the responses from the PD server - let pending_requests = Rc::new(RefCell::new(VecDeque::with_capacity(MAX_PENDING_COUNT))); + let pending_requests = Arc::new(Mutex::new(VecDeque::with_capacity(MAX_PENDING_COUNT))); // When there are too many pending requests, the `send_request` future will refuse to fetch // more requests from the bounded channel. This waker is used to wake up the sending future // if the queue containing pending requests is no longer full. 
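Before the rewritten send/receive loop below, it may help to see the tonic call shape it relies on: a streaming RPC method on the generated client accepts any `Stream` of request messages (via `IntoStreamingRequest`) and returns a `tonic::Streaming` of responses, so the separate grpcio sink/receiver pair is no longer needed. A minimal, simplified sketch under that assumption, with the generated `pdpb` module in scope; this is not the patch's actual loop and error handling is trimmed:

async fn tso_roundtrip(
    mut pd_client: pdpb::pd_client::PdClient<tonic::transport::Channel>,
    requests: impl futures::Stream<Item = pdpb::TsoRequest> + Send + 'static,
) -> Result<(), tonic::Status> {
    // The generated client takes the request stream directly...
    let mut responses = pd_client.tso(requests).await?.into_inner();
    // ...and responses arrive as an ordinary async stream of TsoResponse.
    while let Some(resp) = responses.message().await? {
        // allocate timestamps to pending requests using `resp`
        let _ = resp;
    }
    Ok(())
}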
- let sending_future_waker = Rc::new(AtomicWaker::new()); + let sending_future_waker = Arc::new(AtomicWaker::new()); - pin_mut!(request_rx); - let mut request_stream = TsoRequestStream { + let request_stream = TsoRequestStream { cluster_id, request_rx, pending_requests: pending_requests.clone(), self_waker: sending_future_waker.clone(), - } - .map(Ok); - - let send_requests = rpc_sender.send_all(&mut request_stream); + }; - let receive_and_handle_responses = async move { - while let Some(Ok(resp)) = rpc_receiver.next().await { - let mut pending_requests = pending_requests.borrow_mut(); + // let send_requests = rpc_sender.send_all(&mut request_stream); + let mut responses = pd_client.tso(request_stream).await?.into_inner(); - // Wake up the sending future blocked by too many pending requests as we are consuming - // some of them here. - if pending_requests.len() == MAX_PENDING_COUNT { - sending_future_waker.wake(); - } + while let Some(Ok(resp)) = responses.next().await { + let mut pending_requests = pending_requests.lock().await; - allocate_timestamps(&resp, &mut pending_requests)?; + // Wake up the sending future blocked by too many pending requests as we are consuming + // some of them here. + if pending_requests.len() == MAX_PENDING_COUNT { + sending_future_waker.wake(); } - // TODO: distinguish between unexpected stream termination and expected end of test - info!("TSO stream terminated"); - Ok(()) - }; - let (send_res, recv_res): (_, Result<()>) = join!(send_requests, receive_and_handle_responses); - info!("TSO send termination: {:?}", send_res); - info!("TSO receive termination: {:?}", recv_res); + allocate_timestamps(&resp, &mut pending_requests)?; + } + // TODO: distinguish between unexpected stream termination and expected end of test + info!("TSO stream terminated"); + Ok(()) } struct RequestGroup { @@ -126,23 +110,33 @@ struct RequestGroup { requests: Vec, } -struct TsoRequestStream<'a> { +#[pin_project] +struct TsoRequestStream { cluster_id: u64, - request_rx: Pin<&'a mut mpsc::Receiver>>, - pending_requests: Rc>>, - self_waker: Rc, + #[pin] + request_rx: mpsc::Receiver>, + pending_requests: Arc>>, + self_waker: Arc, } -impl<'a> Stream for TsoRequestStream<'a> { - type Item = (TsoRequest, WriteFlags); +impl Stream for TsoRequestStream { + type Item = TsoRequest; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + let mut this = self.project(); - fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { - let pending_requests = self.pending_requests.clone(); - let mut pending_requests = pending_requests.borrow_mut(); + let pending_requests = this.pending_requests.lock(); + pin_mut!(pending_requests); + let mut pending_requests = if let Poll::Ready(pending_requests) = pending_requests.poll(cx) + { + pending_requests + } else { + return Poll::Pending; + }; let mut requests = Vec::new(); while requests.len() < MAX_BATCH_SIZE && pending_requests.len() < MAX_PENDING_COUNT { - match self.request_rx.as_mut().poll_next(cx) { + match this.request_rx.poll_recv(cx) { Poll::Ready(Some(sender)) => { requests.push(sender); } @@ -152,11 +146,14 @@ impl<'a> Stream for TsoRequestStream<'a> { } if !requests.is_empty() { - let mut req = TsoRequest::default(); - req.mut_header().set_cluster_id(self.cluster_id); - req.mut_header().set_sender_id(0); - req.set_count(requests.len() as u32); - req.set_dc_location(String::new()); + let req = TsoRequest { + header: Some(RequestHeader { + cluster_id: *this.cluster_id, + sender_id: 0, + }), + count: requests.len() as u32, + 
dc_location: String::new(), + }; let request_group = RequestGroup { tso_request: req.clone(), @@ -164,12 +161,11 @@ impl<'a> Stream for TsoRequestStream<'a> { }; pending_requests.push_back(request_group); - let write_flags = WriteFlags::default().buffer_hint(false); - Poll::Ready(Some((req, write_flags))) + Poll::Ready(Some(req)) } else { // Set the waker to the context, then the stream can be waked up after the pending queue // is no longer full. - self.self_waker.register(cx.waker()); + this.self_waker.register(cx.waker()); Poll::Pending } } @@ -201,10 +197,11 @@ fn allocate_timestamps( for request in requests { offset -= 1; - let mut ts = Timestamp::default(); - ts.set_physical(tail_ts.physical); - ts.set_logical(tail_ts.logical - offset as i64); - ts.set_suffix_bits(tail_ts.get_suffix_bits()); + let ts = Timestamp { + physical: tail_ts.physical, + logical: tail_ts.logical - offset as i64, + suffix_bits: tail_ts.suffix_bits, + }; let _ = request.send(ts); } } else { diff --git a/tikv-client-proto/Cargo.toml b/tikv-client-proto/Cargo.toml index 2a3474b1..c4ae3def 100644 --- a/tikv-client-proto/Cargo.toml +++ b/tikv-client-proto/Cargo.toml @@ -1,25 +1,19 @@ [package] name = "tikv-client-proto" version = "0.2.0" -edition = "2018" +edition = "2021" license = "Apache-2.0" authors = ["The TiKV Project Authors"] repository = "https://github.com/tikv/client-rust" description = "Protobuf specs for the TiKV Rust client" build = "build.rs" -[features] -protobuf-codec = ["protobuf-build/grpcio-protobuf-codec", "raft-proto/protobuf-codec", "grpcio/protobuf-codec"] -prost-codec = ["prost", "prost-derive", "protobuf-build/grpcio-prost-codec", "grpcio/prost-codec", "raft-proto/prost-codec"] - [build-dependencies] -protobuf-build = { version = "0.13", default-features = false } +tonic-build = "0.9" +glob = "0.3.1" [dependencies] -protobuf = "=2.8.0" -prost = { version = "0.9", optional = true } -prost-derive = { version = "0.9", optional = true } futures = "0.3" -grpcio = { version = "0.10", default-features = false } lazy_static = { version = "1" } -raft-proto = { version = "0.7.0", default-features = false } +tonic = "0.9" +prost = "0.11" diff --git a/tikv-client-proto/build.rs b/tikv-client-proto/build.rs index 808e1cab..9862aa29 100644 --- a/tikv-client-proto/build.rs +++ b/tikv-client-proto/build.rs @@ -1,10 +1,15 @@ // Copyright 2020 TiKV Project Authors. Licensed under Apache-2.0. -use protobuf_build::Builder; - fn main() { - Builder::new() - .search_dir_for_protos("proto") - .append_to_black_list("eraftpb") - .generate() + tonic_build::configure() + .build_server(false) + .include_file("mod.rs") + .compile( + &glob::glob("proto/*.proto") + .unwrap() + .collect::, _>>() + .unwrap(), + &["include", "proto"], + ) + .unwrap(); } diff --git a/tikv-client-proto/src/lib.rs b/tikv-client-proto/src/lib.rs index 7587d789..8486d51d 100644 --- a/tikv-client-proto/src/lib.rs +++ b/tikv-client-proto/src/lib.rs @@ -1,11 +1,9 @@ // Copyright 2020 TiKV Project Authors. Licensed under Apache-2.0. 
+#![allow(clippy::large_enum_variant)] + pub use protos::*; -#[allow(dead_code)] -#[allow(clippy::all)] mod protos { - include!(concat!(env!("OUT_DIR"), "/protos/mod.rs")); - - use raft_proto::eraftpb; + include!(concat!(env!("OUT_DIR"), "/mod.rs")); } diff --git a/tikv-client-store/Cargo.toml b/tikv-client-store/Cargo.toml index 2c0c1d60..90d3fa4a 100644 --- a/tikv-client-store/Cargo.toml +++ b/tikv-client-store/Cargo.toml @@ -7,15 +7,15 @@ authors = ["The TiKV Project Authors"] repository = "https://github.com/tikv/client-rust" description = "Low level TiKV node components of the TiKV Rust client" -[features] -protobuf-codec = ["grpcio/protobuf-codec"] -prost-codec = ["grpcio/prost-codec"] - [dependencies] async-trait = "0.1" derive-new = "0.5" -futures = { version = "0.3", features = ["compat", "async-await", "thread-pool"] } -grpcio = { version = "0.10", default-features = false } +futures = { version = "0.3", features = [ + "compat", + "async-await", + "thread-pool", +] } log = "0.4" tikv-client-common = { version = "0.2.0", path = "../tikv-client-common" } tikv-client-proto = { version = "0.2.0", path = "../tikv-client-proto" } +tonic = "0.9" diff --git a/tikv-client-store/src/client.rs b/tikv-client-store/src/client.rs index 5e3534c5..81d16090 100644 --- a/tikv-client-store/src/client.rs +++ b/tikv-client-store/src/client.rs @@ -3,31 +3,33 @@ use crate::{request::Request, Result, SecurityManager}; use async_trait::async_trait; use derive_new::new; -use grpcio::{CallOption, Environment}; use std::{any::Any, sync::Arc, time::Duration}; -use tikv_client_proto::tikvpb::TikvClient; +use tikv_client_proto::tikvpb::tikv_client::TikvClient; +use tonic::transport::Channel; /// A trait for connecting to TiKV stores. +#[async_trait] pub trait KvConnect: Sized + Send + Sync + 'static { type KvClient: KvClient + Clone + Send + Sync + 'static; - fn connect(&self, address: &str) -> Result; + async fn connect(&self, address: &str) -> Result; } #[derive(new, Clone)] pub struct TikvConnect { - env: Arc, security_mgr: Arc, timeout: Duration, } +#[async_trait] impl KvConnect for TikvConnect { type KvClient = KvRpcClient; - fn connect(&self, address: &str) -> Result { + async fn connect(&self, address: &str) -> Result { self.security_mgr - .connect(self.env.clone(), address, TikvClient::new) - .map(|c| KvRpcClient::new(Arc::new(c), self.timeout)) + .connect(address, TikvClient::new) + .await + .map(|c| KvRpcClient::new(c, self.timeout)) } } @@ -40,18 +42,13 @@ pub trait KvClient { /// types and abstractions of the client program into the grpc data types. #[derive(new, Clone)] pub struct KvRpcClient { - rpc_client: Arc, + rpc_client: TikvClient, timeout: Duration, } #[async_trait] impl KvClient for KvRpcClient { async fn dispatch(&self, request: &dyn Request) -> Result> { - request - .dispatch( - &self.rpc_client, - CallOption::default().timeout(self.timeout), - ) - .await + request.dispatch(&self.rpc_client, self.timeout).await } } diff --git a/tikv-client-store/src/errors.rs b/tikv-client-store/src/errors.rs index f40afd0d..20877549 100644 --- a/tikv-client-store/src/errors.rs +++ b/tikv-client-store/src/errors.rs @@ -28,11 +28,7 @@ macro_rules! has_region_error { ($type:ty) => { impl HasRegionError for $type { fn region_error(&mut self) -> Option { - if self.has_region_error() { - Some(self.take_region_error().into()) - } else { - None - } + self.region_error.take().map(|e| e.into()) } } }; @@ -71,11 +67,7 @@ macro_rules! 
has_key_error { ($type:ty) => { impl HasKeyErrors for $type { fn key_errors(&mut self) -> Option> { - if self.has_error() { - Some(vec![self.take_error().into()]) - } else { - None - } + self.error.take().map(|e| vec![e.into()]) } } }; @@ -96,11 +88,11 @@ macro_rules! has_str_error { ($type:ty) => { impl HasKeyErrors for $type { fn key_errors(&mut self) -> Option> { - if self.get_error().is_empty() { + if self.error.is_empty() { None } else { Some(vec![Error::KvError { - message: self.take_error(), + message: std::mem::take(&mut self.error), }]) } } @@ -151,19 +143,19 @@ impl HasKeyErrors for kvrpcpb::RawBatchScanResponse { impl HasKeyErrors for kvrpcpb::PrewriteResponse { fn key_errors(&mut self) -> Option> { - extract_errors(self.take_errors().into_iter().map(Some)) + extract_errors(std::mem::take(&mut self.errors).into_iter().map(Some)) } } impl HasKeyErrors for kvrpcpb::PessimisticLockResponse { fn key_errors(&mut self) -> Option> { - extract_errors(self.take_errors().into_iter().map(Some)) + extract_errors(std::mem::take(&mut self.errors).into_iter().map(Some)) } } impl HasKeyErrors for kvrpcpb::PessimisticRollbackResponse { fn key_errors(&mut self) -> Option> { - extract_errors(self.take_errors().into_iter().map(Some)) + extract_errors(std::mem::take(&mut self.errors).into_iter().map(Some)) } } diff --git a/tikv-client-store/src/request.rs b/tikv-client-store/src/request.rs index 290f142a..e808e2b4 100644 --- a/tikv-client-store/src/request.rs +++ b/tikv-client-store/src/request.rs @@ -2,13 +2,17 @@ use crate::{Error, Result}; use async_trait::async_trait; -use grpcio::CallOption; -use std::any::Any; -use tikv_client_proto::{kvrpcpb, tikvpb::TikvClient}; +use std::{any::Any, time::Duration}; +use tikv_client_proto::{kvrpcpb, tikvpb::tikv_client::TikvClient}; +use tonic::{transport::Channel, IntoRequest}; #[async_trait] pub trait Request: Any + Sync + Send + 'static { - async fn dispatch(&self, client: &TikvClient, options: CallOption) -> Result>; + async fn dispatch( + &self, + client: &TikvClient, + timeout: Duration, + ) -> Result>; fn label(&self) -> &'static str; fn as_any(&self) -> &dyn Any; fn set_context(&mut self, context: kvrpcpb::Context); @@ -20,14 +24,17 @@ macro_rules! impl_request { impl Request for kvrpcpb::$name { async fn dispatch( &self, - client: &TikvClient, - options: CallOption, + client: &TikvClient, + timeout: Duration, ) -> Result> { + let mut req = self.clone().into_request(); + req.set_timeout(timeout); client - .$fun(self, options)? + .clone() + .$fun(req) .await .map(|r| Box::new(r) as Box) - .map_err(Error::Grpc) + .map_err(Error::GrpcAPI) } fn label(&self) -> &'static str { @@ -39,89 +46,53 @@ macro_rules! 
impl_request { } fn set_context(&mut self, context: kvrpcpb::Context) { - kvrpcpb::$name::set_context(self, context) + self.context = Some(context); } } }; } -impl_request!(RawGetRequest, raw_get_async_opt, "raw_get"); -impl_request!(RawBatchGetRequest, raw_batch_get_async_opt, "raw_batch_get"); -impl_request!(RawPutRequest, raw_put_async_opt, "raw_put"); -impl_request!(RawBatchPutRequest, raw_batch_put_async_opt, "raw_batch_put"); -impl_request!(RawDeleteRequest, raw_delete_async_opt, "raw_delete"); -impl_request!( - RawBatchDeleteRequest, - raw_batch_delete_async_opt, - "raw_batch_delete" -); -impl_request!(RawScanRequest, raw_scan_async_opt, "raw_scan"); -impl_request!( - RawBatchScanRequest, - raw_batch_scan_async_opt, - "raw_batch_scan" -); -impl_request!( - RawDeleteRangeRequest, - raw_delete_range_async_opt, - "raw_delete_range" -); -impl_request!( - RawCasRequest, - raw_compare_and_swap_async_opt, - "raw_compare_and_swap" -); -impl_request!( - RawCoprocessorRequest, - raw_coprocessor_async_opt, - "raw_coprocessor" -); +impl_request!(RawGetRequest, raw_get, "raw_get"); +impl_request!(RawBatchGetRequest, raw_batch_get, "raw_batch_get"); +impl_request!(RawPutRequest, raw_put, "raw_put"); +impl_request!(RawBatchPutRequest, raw_batch_put, "raw_batch_put"); +impl_request!(RawDeleteRequest, raw_delete, "raw_delete"); +impl_request!(RawBatchDeleteRequest, raw_batch_delete, "raw_batch_delete"); +impl_request!(RawScanRequest, raw_scan, "raw_scan"); +impl_request!(RawBatchScanRequest, raw_batch_scan, "raw_batch_scan"); +impl_request!(RawDeleteRangeRequest, raw_delete_range, "raw_delete_range"); +impl_request!(RawCasRequest, raw_compare_and_swap, "raw_compare_and_swap"); +impl_request!(RawCoprocessorRequest, raw_coprocessor, "raw_coprocessor"); -impl_request!(GetRequest, kv_get_async_opt, "kv_get"); -impl_request!(ScanRequest, kv_scan_async_opt, "kv_scan"); -impl_request!(PrewriteRequest, kv_prewrite_async_opt, "kv_prewrite"); -impl_request!(CommitRequest, kv_commit_async_opt, "kv_commit"); -impl_request!(CleanupRequest, kv_cleanup_async_opt, "kv_cleanup"); -impl_request!(BatchGetRequest, kv_batch_get_async_opt, "kv_batch_get"); -impl_request!( - BatchRollbackRequest, - kv_batch_rollback_async_opt, - "kv_batch_rollback" -); +impl_request!(GetRequest, kv_get, "kv_get"); +impl_request!(ScanRequest, kv_scan, "kv_scan"); +impl_request!(PrewriteRequest, kv_prewrite, "kv_prewrite"); +impl_request!(CommitRequest, kv_commit, "kv_commit"); +impl_request!(CleanupRequest, kv_cleanup, "kv_cleanup"); +impl_request!(BatchGetRequest, kv_batch_get, "kv_batch_get"); +impl_request!(BatchRollbackRequest, kv_batch_rollback, "kv_batch_rollback"); impl_request!( PessimisticRollbackRequest, - kv_pessimistic_rollback_async_opt, + kv_pessimistic_rollback, "kv_pessimistic_rollback" ); -impl_request!( - ResolveLockRequest, - kv_resolve_lock_async_opt, - "kv_resolve_lock" -); -impl_request!(ScanLockRequest, kv_scan_lock_async_opt, "kv_scan_lock"); +impl_request!(ResolveLockRequest, kv_resolve_lock, "kv_resolve_lock"); +impl_request!(ScanLockRequest, kv_scan_lock, "kv_scan_lock"); impl_request!( PessimisticLockRequest, - kv_pessimistic_lock_async_opt, + kv_pessimistic_lock, "kv_pessimistic_lock" ); -impl_request!( - TxnHeartBeatRequest, - kv_txn_heart_beat_async_opt, - "kv_txn_heart_beat" -); +impl_request!(TxnHeartBeatRequest, kv_txn_heart_beat, "kv_txn_heart_beat"); impl_request!( CheckTxnStatusRequest, - kv_check_txn_status_async_opt, + kv_check_txn_status, "kv_check_txn_status" ); impl_request!( 
CheckSecondaryLocksRequest, - kv_check_secondary_locks_async_opt, + kv_check_secondary_locks, "kv_check_secondary_locks_request" ); -impl_request!(GcRequest, kv_gc_async_opt, "kv_gc"); -impl_request!( - DeleteRangeRequest, - kv_delete_range_async_opt, - "kv_delete_range" -); +impl_request!(GcRequest, kv_gc, "kv_gc"); +impl_request!(DeleteRangeRequest, kv_delete_range, "kv_delete_range"); From 2974818393ba57d0b39cc1fdb0a821aa77f5ea17 Mon Sep 17 00:00:00 2001 From: Andy Lok Date: Sat, 8 Jul 2023 02:58:16 +0800 Subject: [PATCH 3/5] fix Signed-off-by: Andy Lok --- .github/workflows/ci.yml | 21 +++------------------ Cargo.toml | 1 + Makefile | 14 +++----------- examples/pessimistic.rs | 2 ++ examples/raw.rs | 3 ++- examples/transaction.rs | 2 ++ tests/common/mod.rs | 4 ++-- tests/failpoint_tests.rs | 3 +-- tikv-client-common/src/security.rs | 22 ++++++++++------------ tikv-client-proto/Cargo.toml | 3 +++ tikv-client-store/src/request.rs | 2 +- 11 files changed, 30 insertions(+), 47 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 2b40e4ab..78d15155 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -10,9 +10,6 @@ jobs: check: name: check runs-on: ubuntu-latest - strategy: - matrix: - env: [ 'RUST_PROTOBUF=0', 'RUST_PROTOBUF=1' ] steps: - uses: actions/checkout@v2 - uses: actions-rs/toolchain@v1 @@ -28,19 +25,14 @@ jobs: repo-token: ${{ secrets.GITHUB_TOKEN }} - name: Rust Cache uses: Swatinem/rust-cache@v1.4.0 - with: - key: ${{ matrix.env }} - name: make check - run: ${{ matrix.env }} make check + run: make check unit-test: name: unit test env: CARGO_INCREMENTAL: 0 runs-on: ubuntu-latest - strategy: - matrix: - env: [ 'RUST_PROTOBUF=0', 'RUST_PROTOBUF=1' ] steps: - uses: actions/checkout@v2 - uses: actions-rs/toolchain@v1 @@ -55,19 +47,14 @@ jobs: repo-token: ${{ secrets.GITHUB_TOKEN }} - name: Rust Cache uses: Swatinem/rust-cache@v1.4.0 - with: - key: ${{ matrix.env }} - name: unit test - run: ${{ matrix.env }} make unit-test + run: make unit-test integration-test: name: integration test env: CARGO_INCREMENTAL: 0 runs-on: ubuntu-latest - strategy: - matrix: - env: [ 'RUST_PROTOBUF=0', 'RUST_PROTOBUF=1' ] steps: - uses: actions/checkout@v2 - uses: actions-rs/toolchain@v1 @@ -82,8 +69,6 @@ jobs: repo-token: ${{ secrets.GITHUB_TOKEN }} - name: Rust Cache uses: Swatinem/rust-cache@v1.4.0 - with: - key: ${{ matrix.env }} - name: install tiup run: curl --proto '=https' --tlsv1.2 -sSf https://tiup-mirrors.pingcap.com/install.sh | sh - name: start tiup playground @@ -97,4 +82,4 @@ jobs: sleep 1 done - name: integration test - run: ${{ matrix.env }} MULTI_REGION=1 make integration-test + run: MULTI_REGION=1 make integration-test diff --git a/Cargo.toml b/Cargo.toml index 84ce0d68..68ac7b58 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -58,6 +58,7 @@ reqwest = { version = "0.11", default-features = false, features = [ "native-tls-vendored", ] } serde_json = "1" +env_logger = "0.10" [workspace] members = [ diff --git a/Makefile b/Makefile index da470eb0..adadacdf 100644 --- a/Makefile +++ b/Makefile @@ -2,20 +2,12 @@ export RUSTFLAGS=-Dwarnings .PHONY: default check unit-test integration-tests test doc docker-pd docker-kv docker all -ENABLE_FEATURES ?= PD_ADDRS ?= "127.0.0.1:2379" MULTI_REGION ?= 1 -# Use Rust-protobuf instead of Prost to encode and decode protocol buffers. 
-ifeq ($(RUST_PROTOBUF),1) -ENABLE_FEATURES += protobuf-codec -else -ENABLE_FEATURES += prost-codec -endif +ALL_FEATURES := integration-tests -ALL_FEATURES := ${ENABLE_FEATURES} integration-tests - -INTEGRATION_TEST_ARGS := --no-default-features --features "${ENABLE_FEATURES} integration-tests" +INTEGRATION_TEST_ARGS := --no-default-features --features "integration-tests" default: check @@ -25,7 +17,7 @@ check: cargo clippy --all-targets --no-default-features --features "${ALL_FEATURES}" -- -D clippy::all unit-test: - cargo test --all --no-default-features --features "${ENABLE_FEATURES}" + cargo test --all --no-default-features integration-test: cargo test txn_ --all ${INTEGRATION_TEST_ARGS} -- --nocapture diff --git a/examples/pessimistic.rs b/examples/pessimistic.rs index bfdb9e27..f61cce96 100644 --- a/examples/pessimistic.rs +++ b/examples/pessimistic.rs @@ -7,6 +7,8 @@ use tikv_client::{Config, Key, TransactionClient as Client, TransactionOptions, #[tokio::main] async fn main() { + env_logger::init(); + // You can try running this example by passing your pd endpoints // (and SSL options if necessary) through command line arguments. let args = parse_args("txn"); diff --git a/examples/raw.rs b/examples/raw.rs index 05f8b0a9..f5224f64 100644 --- a/examples/raw.rs +++ b/examples/raw.rs @@ -12,6 +12,8 @@ const VALUE: &str = "Rust"; #[tokio::main] async fn main() -> Result<()> { + env_logger::init(); + // You can try running this example by passing your pd endpoints // (and SSL options if necessary) through command line arguments. let args = parse_args("raw"); @@ -27,7 +29,6 @@ async fn main() -> Result<()> { // When we first create a client we receive a `Connect` structure which must be resolved before // the client is actually connected and usable. let client = Client::new_with_config(args.pd, config, None).await?; - let client = client.clone(); // Requests are created from the connected client. These calls return structures which // implement `Future`. This means the `Future` must be resolved before the action ever takes diff --git a/examples/transaction.rs b/examples/transaction.rs index a16dbd92..3c4092d2 100644 --- a/examples/transaction.rs +++ b/examples/transaction.rs @@ -69,6 +69,8 @@ async fn dels(client: &Client, keys: impl IntoIterator) { #[tokio::main] async fn main() { + env_logger::init(); + // You can try running this example by passing your pd endpoints // (and SSL options if necessary) through command line arguments. 
let args = parse_args("txn"); diff --git a/tests/common/mod.rs b/tests/common/mod.rs index 21f454ab..bbb5cc02 100644 --- a/tests/common/mod.rs +++ b/tests/common/mod.rs @@ -2,12 +2,12 @@ mod ctl; -use futures_timer::Delay; use log::{info, warn}; use rand::Rng; use slog::Drain; use std::{collections::HashSet, convert::TryInto, env, time::Duration}; use tikv_client::{ColumnFamily, Key, RawClient, Result, Transaction, TransactionClient}; +use tokio::time::sleep; const ENV_PD_ADDRS: &str = "PD_ADDRS"; const ENV_ENABLE_MULIT_REGION: &str = "MULTI_REGION"; @@ -88,7 +88,7 @@ async fn ensure_region_split( warn!("Stop splitting regions: time limit exceeded"); break; } - Delay::new(Duration::from_millis(200)).await; + sleep(Duration::from_millis(200)).await; } Ok(()) diff --git a/tests/failpoint_tests.rs b/tests/failpoint_tests.rs index 8b914c85..477a609f 100644 --- a/tests/failpoint_tests.rs +++ b/tests/failpoint_tests.rs @@ -348,8 +348,7 @@ async fn count_locks(client: &TransactionClient) -> Result { let ts = client.current_timestamp().await.unwrap(); let locks = client.scan_locks(&ts, vec![].., 1024).await?; // De-duplicated as `scan_locks` will return duplicated locks due to retry on region changes. - let locks_set: HashSet> = - HashSet::from_iter(locks.into_iter().map(|mut l| l.take_key())); + let locks_set: HashSet> = HashSet::from_iter(locks.into_iter().map(|l| l.key)); Ok(locks_set.len()) } diff --git a/tikv-client-common/src/security.rs b/tikv-client-common/src/security.rs index d030314f..77a45206 100644 --- a/tikv-client-common/src/security.rs +++ b/tikv-client-common/src/security.rs @@ -72,27 +72,25 @@ impl SecurityManager { where Factory: FnOnce(Channel) -> Client, { + let addr = "http://".to_string() + &SCHEME_REG.replace(addr, ""); + info!("connect to rpc server at endpoint: {:?}", addr); - let addr = SCHEME_REG.replace(addr, "").into_owned(); + let mut builder = Channel::from_shared(addr)? + .tcp_keepalive(Some(Duration::from_secs(10))) + .keep_alive_timeout(Duration::from_secs(3)); - let tls = if self.ca.is_empty() { - ClientTlsConfig::default() - } else { - ClientTlsConfig::new() + if !self.ca.is_empty() { + let tls = ClientTlsConfig::new() .ca_certificate(Certificate::from_pem(&self.ca)) .identity(Identity::from_pem( &self.cert, load_pem_file("private key", &self.key)?, - )) + )); + builder = builder.tls_config(tls)?; }; - let ch = Channel::from_shared(addr)? - .tcp_keepalive(Some(Duration::from_secs(10))) - .keep_alive_timeout(Duration::from_secs(3)) - .tls_config(tls)? - .connect() - .await?; + let ch = builder.connect().await?; Ok(factory(ch)) } diff --git a/tikv-client-proto/Cargo.toml b/tikv-client-proto/Cargo.toml index c4ae3def..adb4b576 100644 --- a/tikv-client-proto/Cargo.toml +++ b/tikv-client-proto/Cargo.toml @@ -17,3 +17,6 @@ futures = "0.3" lazy_static = { version = "1" } tonic = "0.9" prost = "0.11" + +[lib] +doctest = false diff --git a/tikv-client-store/src/request.rs b/tikv-client-store/src/request.rs index e808e2b4..eb684b00 100644 --- a/tikv-client-store/src/request.rs +++ b/tikv-client-store/src/request.rs @@ -33,7 +33,7 @@ macro_rules! 
             .clone()
             .$fun(req)
             .await
-            .map(|r| Box::new(r) as Box<dyn Any>)
+            .map(|r| Box::new(r.into_inner()) as Box<dyn Any>)
             .map_err(Error::GrpcAPI)
         }

From bfba85692415c2168a6562ee9c17ffe805c01d40 Mon Sep 17 00:00:00 2001
From: Andy Lok
Date: Sat, 8 Jul 2023 03:16:28 +0800
Subject: [PATCH 4/5] fix

Signed-off-by: Andy Lok
---
 examples/pessimistic.rs   | 2 +-
 src/lib.rs                | 1 +
 src/request/plan.rs       | 4 ++--
 src/transaction/buffer.rs | 2 +-
 tests/failpoint_tests.rs  | 2 +-
 5 files changed, 6 insertions(+), 5 deletions(-)

diff --git a/examples/pessimistic.rs b/examples/pessimistic.rs
index f61cce96..e7455c57 100644
--- a/examples/pessimistic.rs
+++ b/examples/pessimistic.rs
@@ -34,7 +34,7 @@ async fn main() {
         .begin_optimistic()
         .await
         .expect("Could not begin a transaction");
-    for (key, value) in vec![(key1.clone(), value1), (key2, value2)] {
+    for (key, value) in [(key1.clone(), value1), (key2, value2)] {
         txn0.put(key, value).await.expect("Could not set key value");
     }
     txn0.commit().await.expect("Could not commit");
diff --git a/src/lib.rs b/src/lib.rs
index 4d9fd002..f075904b 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -93,6 +93,7 @@
 // To support both prost & rust-protobuf.
 #![cfg_attr(feature = "prost-codec", allow(clippy::useless_conversion))]
 #![allow(clippy::field_reassign_with_default)]
+#![allow(clippy::arc_with_non_send_sync)]

 #[macro_use]
 pub mod request;
diff --git a/src/request/plan.rs b/src/request/plan.rs
index 55f06f4c..50a490e0 100644
--- a/src/request/plan.rs
+++ b/src/request/plan.rs
@@ -471,9 +471,9 @@ impl Merge for Collect {
     fn merge(&self, input: Vec>) -> Result {
         input
             .into_iter()
-            .fold(Ok(CleanupLocksResult::default()), |acc, x| {
+            .try_fold(CleanupLocksResult::default(), |acc, x| {
                 Ok(CleanupLocksResult {
-                    resolved_locks: acc?.resolved_locks + x?.resolved_locks,
+                    resolved_locks: acc.resolved_locks + x?.resolved_locks,
                     ..Default::default()
                 })
             })
diff --git a/src/transaction/buffer.rs b/src/transaction/buffer.rs
index 611f5630..41feb201 100644
--- a/src/transaction/buffer.rs
+++ b/src/transaction/buffer.rs
@@ -101,7 +101,7 @@ impl Buffer {
             self.update_cache(key, value);
         }

-        let results = cached_results.chain(fetched_results.into_iter());
+        let results = cached_results.chain(fetched_results);
         Ok(results)
     }
diff --git a/tests/failpoint_tests.rs b/tests/failpoint_tests.rs
index 477a609f..35b078f9 100644
--- a/tests/failpoint_tests.rs
+++ b/tests/failpoint_tests.rs
@@ -229,7 +229,7 @@ async fn txn_cleanup_range_async_commit_locks() -> Result<()> {
     assert_eq!(count_locks(&client).await?, keys.len());
     info!(logger, "total keys' count {}", keys.len());

-    let mut sorted_keys: Vec<Vec<u8>> = Vec::from_iter(keys.clone().into_iter());
+    let mut sorted_keys: Vec<Vec<u8>> = Vec::from_iter(keys.clone());
     sorted_keys.sort();
     let start_key = sorted_keys[1].clone();
     let end_key = sorted_keys[sorted_keys.len() - 2].clone();

From e15824fe1ad8f59faec55a38ce4e9cb6521c81a2 Mon Sep 17 00:00:00 2001
From: Andy Lok
Date: Sat, 8 Jul 2023 13:32:04 +0800
Subject: [PATCH 5/5] address comment

Signed-off-by: Andy Lok
---
 Cargo.toml                    | 29 ++++++++++++++--------------
 taplo.toml                    | 36 +++++++++++++++++++++++++++++++++++
 tikv-client-common/Cargo.toml |  8 ++++----
 tikv-client-pd/Cargo.toml     |  6 ++----
 tikv-client-proto/Cargo.toml  |  4 ++--
 tikv-client-store/Cargo.toml  |  6 +++---
 6 files changed, 61 insertions(+), 28 deletions(-)
 create mode 100644 taplo.toml

diff --git a/Cargo.toml b/Cargo.toml
index 68ac7b58..73325b91 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -18,6 +18,7 @@ integration-tests = []
 [lib]
 name = "tikv_client"

 [dependencies]
+async-recursion = "0.3"
 async-trait = "0.1"
 derive-new = "0.5"
 either = "1.6"
@@ -32,40 +33,38 @@ semver = "1.0"
 serde = "1.0"
 serde_derive = "1.0"
 slog = { version = "2.3", features = [
-    "max_level_trace",
-    "release_max_level_debug",
+  "max_level_trace",
+  "release_max_level_debug",
 ] }
 slog-term = { version = "2.4" }
 thiserror = "1"
-tokio = { version = "1", features = ["sync", "rt-multi-thread", "macros"] }
-async-recursion = "0.3"
-tonic = "0.9"
 tikv-client-common = { version = "0.2.0", path = "tikv-client-common" }
 tikv-client-pd = { version = "0.2.0", path = "tikv-client-pd" }
 tikv-client-proto = { version = "0.2.0", path = "tikv-client-proto" }
 tikv-client-store = { version = "0.2.0", path = "tikv-client-store" }
-
+tokio = { version = "1", features = ["sync", "rt-multi-thread", "macros"] }
+tonic = "0.9"

 [dev-dependencies]
 clap = "2"
+env_logger = "0.10"
 fail = { version = "0.4", features = ["failpoints"] }
 proptest = "1"
 proptest-derive = "0.3"
-serial_test = "0.5.0"
-simple_logger = "1"
-tokio = { version = "1", features = ["sync", "rt-multi-thread", "macros"] }
 reqwest = { version = "0.11", default-features = false, features = [
-    "native-tls-vendored",
+  "native-tls-vendored",
 ] }
 serde_json = "1"
+serial_test = "0.5.0"
+simple_logger = "1"
+tokio = { version = "1", features = ["sync", "rt-multi-thread", "macros"] }

 [workspace]
 members = [
-    "tikv-client-common",
-    "tikv-client-pd",
-    "tikv-client-proto",
-    "tikv-client-store",
+  "tikv-client-common",
+  "tikv-client-pd",
+  "tikv-client-proto",
+  "tikv-client-store",
 ]

 [[test]]
diff --git a/taplo.toml b/taplo.toml
new file mode 100644
index 00000000..b4640945
--- /dev/null
+++ b/taplo.toml
@@ -0,0 +1,36 @@
+## https://taplo.tamasfe.dev/configuration/file.html
+
+include = ["**/Cargo.toml"]
+
+[formatting]
+# Align consecutive entries vertically.
+align_entries = false
+# Append trailing commas for multi-line arrays.
+array_trailing_comma = true
+# Expand arrays to multiple lines that exceed the maximum column width.
+array_auto_expand = true
+# Collapse arrays that don't exceed the maximum column width and don't contain comments.
+array_auto_collapse = false
+# Omit white space padding from single-line arrays
+compact_arrays = true
+# Omit white space padding from the start and end of inline tables.
+compact_inline_tables = false
+# Maximum column width in characters, affects array expansion and collapse, this doesn't take whitespace into account.
+# Note that this is not set in stone, and works on a best-effort basis.
+column_width = 120
+# Indent based on tables and arrays of tables and their subtables, subtables out of order are not indented.
+indent_tables = false
+# The substring that is used for indentation, should be tabs or spaces (but technically can be anything).
+indent_string = ' '
+# Add trailing newline at the end of the file if not present.
+trailing_newline = true
+# Alphabetically reorder keys that are not separated by empty lines.
+reorder_keys = false
+# Maximum amount of allowed consecutive blank lines. This does not affect the whitespace at the end of the document, as it is always stripped.
+allowed_blank_lines = 1
+# Use CRLF for line endings.
+crlf = false
+
+[[rule]]
+keys = ["dependencies", "dev-dependencies", "build-dependencies"]
+formatting = { reorder_keys = true }
diff --git a/tikv-client-common/Cargo.toml b/tikv-client-common/Cargo.toml
index 9210ddb1..9e9dcf5a 100644
--- a/tikv-client-common/Cargo.toml
+++ b/tikv-client-common/Cargo.toml
@@ -8,16 +8,16 @@ repository = "https://github.com/tikv/client-rust"
 description = "Common components of the TiKV Rust client"

 [dependencies]
-thiserror = "1"
 futures = { version = "0.3", features = [
-    "compat",
-    "async-await",
-    "thread-pool",
+  "compat",
+  "async-await",
+  "thread-pool",
 ] }
 lazy_static = "1"
 log = "0.4"
 regex = "1"
 semver = "1"
+thiserror = "1"
 tikv-client-proto = { version = "0.2.0", path = "../tikv-client-proto" }
 tokio = "1"
 tonic = { version = "0.9", features = ["tls"] }
diff --git a/tikv-client-pd/Cargo.toml b/tikv-client-pd/Cargo.toml
index c1654750..c66b88ed 100644
--- a/tikv-client-pd/Cargo.toml
+++ b/tikv-client-pd/Cargo.toml
@@ -10,14 +10,12 @@ description = "Low level PD components for the TiKV Rust client"
 [dependencies]
 async-trait = "0.1"
 futures = "0.3"
-# futures = { version = "0.3", features = ["compat", "async-await", "thread-pool"] }
 log = "0.4"
+pin-project = "1"
 tikv-client-common = { version = "0.2.0", path = "../tikv-client-common" }
 tikv-client-proto = { version = "0.2.0", path = "../tikv-client-proto" }
-tonic = "0.9"
-pin-project = "1"
 tokio = { version = "1", features = ["sync"] }
-
+tonic = "0.9"

 [dev-dependencies]
 clap = "2"
diff --git a/tikv-client-proto/Cargo.toml b/tikv-client-proto/Cargo.toml
index adb4b576..482cc5d3 100644
--- a/tikv-client-proto/Cargo.toml
+++ b/tikv-client-proto/Cargo.toml
@@ -9,14 +9,14 @@ description = "Protobuf specs for the TiKV Rust client"
 build = "build.rs"

 [build-dependencies]
-tonic-build = "0.9"
 glob = "0.3.1"
+tonic-build = "0.9"

 [dependencies]
 futures = "0.3"
 lazy_static = { version = "1" }
-tonic = "0.9"
 prost = "0.11"
+tonic = "0.9"

 [lib]
 doctest = false
diff --git a/tikv-client-store/Cargo.toml b/tikv-client-store/Cargo.toml
index 90d3fa4a..1ab71824 100644
--- a/tikv-client-store/Cargo.toml
+++ b/tikv-client-store/Cargo.toml
@@ -11,9 +11,9 @@ description = "Low level TiKV node components of the TiKV Rust client"
 async-trait = "0.1"
 derive-new = "0.5"
 futures = { version = "0.3", features = [
-    "compat",
-    "async-await",
-    "thread-pool",
+  "compat",
+  "async-await",
+  "thread-pool",
 ] }
 log = "0.4"
 tikv-client-common = { version = "0.2.0", path = "../tikv-client-common" }