Skip to content

Commit 4dd056f

Browse files
authored
Merge pull request #8736 from xudong963/rename
refactor: remove `V2` postfix in cluster
2 parents 8940967 + 92f4ce8 commit 4dd056f

File tree

13 files changed

+38
-59
lines changed

13 files changed

+38
-59
lines changed

src/query/service/src/api/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ pub use rpc::MergeExchange;
3333
pub use rpc::PrecommitBlock;
3434
pub use rpc::QueryFragmentsPlanPacket;
3535
pub use rpc::ServerFlightExchange;
36-
pub use rpc::ShuffleDataExchangeV2;
36+
pub use rpc::ShuffleDataExchange;
3737
pub use rpc_service::RpcService;
3838

3939
pub mod http;

src/query/service/src/api/rpc/exchange/data_exchange.rs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -18,36 +18,36 @@ use common_sql::executor::PhysicalScalar;
1818
pub enum DataExchange {
1919
Merge(MergeExchange),
2020
Broadcast(BroadcastExchange),
21-
ShuffleDataExchangeV2(ShuffleDataExchangeV2),
21+
ShuffleDataExchange(ShuffleDataExchange),
2222
}
2323

2424
impl DataExchange {
2525
pub fn get_destinations(&self) -> Vec<String> {
2626
match self {
2727
DataExchange::Merge(exchange) => vec![exchange.destination_id.clone()],
2828
DataExchange::Broadcast(exchange) => exchange.destination_ids.clone(),
29-
DataExchange::ShuffleDataExchangeV2(exchange) => exchange.destination_ids.clone(),
29+
DataExchange::ShuffleDataExchange(exchange) => exchange.destination_ids.clone(),
3030
}
3131
}
3232

3333
pub fn from_multiple_nodes(&self) -> bool {
3434
match self {
3535
DataExchange::Merge(_) => true,
36-
DataExchange::ShuffleDataExchangeV2(_) => true,
36+
DataExchange::ShuffleDataExchange(_) => true,
3737
DataExchange::Broadcast(exchange) => exchange.from_multiple_nodes,
3838
}
3939
}
4040
}
4141

4242
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
43-
pub struct ShuffleDataExchangeV2 {
43+
pub struct ShuffleDataExchange {
4444
pub destination_ids: Vec<String>,
4545
pub shuffle_keys: Vec<PhysicalScalar>,
4646
}
4747

48-
impl ShuffleDataExchangeV2 {
48+
impl ShuffleDataExchange {
4949
pub fn create(destination_ids: Vec<String>, shuffle_keys: Vec<PhysicalScalar>) -> DataExchange {
50-
DataExchange::ShuffleDataExchangeV2(ShuffleDataExchangeV2 {
50+
DataExchange::ShuffleDataExchange(ShuffleDataExchange {
5151
destination_ids,
5252
shuffle_keys,
5353
})

src/query/service/src/api/rpc/exchange/exchange_manager.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ use crate::api::rpc::exchange::statistics_receiver::StatisticsReceiver;
4141
use crate::api::rpc::exchange::statistics_sender::StatisticsSender;
4242
use crate::api::rpc::flight_client::FlightExchange;
4343
use crate::api::rpc::flight_scatter_broadcast::BroadcastFlightScatter;
44-
use crate::api::rpc::flight_scatter_hash_v2::HashFlightScatterV2;
44+
use crate::api::rpc::flight_scatter_hash::HashFlightScatter;
4545
use crate::api::rpc::Packet;
4646
use crate::api::DataExchange;
4747
use crate::api::FlightClient;
@@ -636,14 +636,14 @@ impl FragmentCoordinator {
636636
)?)),
637637
}))
638638
}
639-
Some(DataExchange::ShuffleDataExchangeV2(exchange)) => {
639+
Some(DataExchange::ShuffleDataExchange(exchange)) => {
640640
Ok(ExchangeParams::ShuffleExchange(ShuffleExchangeParams {
641641
schema: self.payload.schema()?,
642642
fragment_id: self.fragment_id,
643643
query_id: info.query_id.to_string(),
644644
executor_id: info.current_executor.to_string(),
645645
destination_ids: exchange.destination_ids.to_owned(),
646-
shuffle_scatter: Arc::new(HashFlightScatterV2::try_create(
646+
shuffle_scatter: Arc::new(HashFlightScatter::try_create(
647647
info.query_ctx.try_get_function_context()?,
648648
exchange.shuffle_keys.clone(),
649649
exchange.destination_ids.len(),
@@ -658,7 +658,7 @@ impl FragmentCoordinator {
658658
self.initialized = true;
659659

660660
match &self.payload {
661-
FragmentPayload::PlanV2(plan) => {
661+
FragmentPayload::Plan(plan) => {
662662
let pipeline_ctx = QueryContext::create_from(ctx);
663663
let pipeline_builder = PipelineBuilderV2::create(pipeline_ctx);
664664
self.pipeline_build_res = Some(pipeline_builder.finalize(plan)?);

src/query/service/src/api/rpc/exchange/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,5 +26,5 @@ mod statistics_sender;
2626
pub use data_exchange::BroadcastExchange;
2727
pub use data_exchange::DataExchange;
2828
pub use data_exchange::MergeExchange;
29-
pub use data_exchange::ShuffleDataExchangeV2;
29+
pub use data_exchange::ShuffleDataExchange;
3030
pub use exchange_manager::DataExchangeManager;

src/query/service/src/api/rpc/flight_scatter_hash_v2.rs renamed to src/query/service/src/api/rpc/flight_scatter_hash.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -30,14 +30,14 @@ use crate::sql::evaluator::Evaluator;
3030
use crate::sql::evaluator::TypedVector;
3131

3232
#[derive(Clone)]
33-
pub struct HashFlightScatterV2 {
33+
pub struct HashFlightScatter {
3434
func_ctx: FunctionContext,
3535
hash_keys: Vec<EvalNode>,
3636
hash_functions: Vec<Box<dyn Function>>,
3737
scatter_size: usize,
3838
}
3939

40-
impl HashFlightScatterV2 {
40+
impl HashFlightScatter {
4141
pub fn try_create(
4242
func_ctx: FunctionContext,
4343
scalars: Vec<PhysicalScalar>,
@@ -187,7 +187,7 @@ impl FlightScatter for OneHashKeyFlightScatter {
187187
}
188188
}
189189

190-
impl FlightScatter for HashFlightScatterV2 {
190+
impl FlightScatter for HashFlightScatter {
191191
fn execute(&self, data_block: &DataBlock, _num: usize) -> Result<Vec<DataBlock>> {
192192
let hash_keys = self
193193
.hash_keys

src/query/service/src/api/rpc/mod.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ mod flight_actions;
2121
mod flight_client;
2222
mod flight_scatter;
2323
mod flight_scatter_broadcast;
24-
mod flight_scatter_hash_v2;
24+
mod flight_scatter_hash;
2525
mod flight_service;
2626
mod packets;
2727
mod request_builder;
@@ -30,7 +30,7 @@ pub use exchange::BroadcastExchange;
3030
pub use exchange::DataExchange;
3131
pub use exchange::DataExchangeManager;
3232
pub use exchange::MergeExchange;
33-
pub use exchange::ShuffleDataExchangeV2;
33+
pub use exchange::ShuffleDataExchange;
3434
pub use flight_client::ClientFlightExchange;
3535
pub use flight_client::ServerFlightExchange;
3636
pub use packets::ConnectionInfo;

src/query/service/src/api/rpc/packets/packet_fragment.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -26,21 +26,21 @@ use crate::sql::executor::PhysicalPlan;
2626
#[allow(clippy::large_enum_variant)]
2727
#[derive(Clone, serde::Serialize, serde::Deserialize)]
2828
pub enum FragmentPayload {
29-
PlanV2(PhysicalPlan),
29+
Plan(PhysicalPlan),
3030
}
3131

3232
impl Debug for FragmentPayload {
3333
fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
3434
match self {
35-
FragmentPayload::PlanV2(plan) => write!(f, "PhysicalPlan({:?})", plan),
35+
FragmentPayload::Plan(plan) => write!(f, "PhysicalPlan({:?})", plan),
3636
}
3737
}
3838
}
3939

4040
impl FragmentPayload {
4141
pub fn schema(&self) -> Result<DataSchemaRef> {
4242
match self {
43-
FragmentPayload::PlanV2(plan) => plan.output_schema(),
43+
FragmentPayload::Plan(plan) => plan.output_schema(),
4444
}
4545
}
4646
}

src/query/service/src/interpreters/fragments/v2/fragmenter.rs renamed to src/query/service/src/interpreters/fragments/fragmenter.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,13 +18,13 @@ use common_catalog::table_context::TableContext;
1818
use common_exception::Result;
1919
use common_sql::executor::FragmentKind;
2020

21-
use super::FragmentType;
22-
use super::PlanFragment;
2321
use crate::api::BroadcastExchange;
2422
use crate::api::DataExchange;
2523
use crate::api::MergeExchange;
26-
use crate::api::ShuffleDataExchangeV2;
24+
use crate::api::ShuffleDataExchange;
2725
use crate::clusters::ClusterHelper;
26+
use crate::interpreters::fragments::plan_fragment::FragmentType;
27+
use crate::interpreters::fragments::PlanFragment;
2828
use crate::sessions::QueryContext;
2929
use crate::sql::executor::Exchange;
3030
use crate::sql::executor::ExchangeSink;
@@ -76,7 +76,7 @@ impl Fragmenter {
7676
) -> Result<Option<DataExchange>> {
7777
match plan {
7878
PhysicalPlan::ExchangeSink(plan) => match plan.kind {
79-
FragmentKind::Normal => Ok(Some(ShuffleDataExchangeV2::create(
79+
FragmentKind::Normal => Ok(Some(ShuffleDataExchange::create(
8080
Self::get_executors(ctx),
8181
plan.keys.clone(),
8282
))),

src/query/service/src/interpreters/fragments/mod.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,12 +12,13 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15+
mod fragmenter;
16+
mod plan_fragment;
1517
mod query_fragment_actions;
1618
mod query_fragment_actions_display;
17-
mod v2;
1819

20+
pub use fragmenter::Fragmenter;
21+
pub use plan_fragment::PlanFragment;
1922
pub use query_fragment_actions::QueryFragmentAction;
2023
pub use query_fragment_actions::QueryFragmentActions;
2124
pub use query_fragment_actions::QueryFragmentsActions;
22-
pub use v2::Fragmenter;
23-
pub use v2::PlanFragment;

src/query/service/src/interpreters/fragments/v2/plan_fragment.rs renamed to src/query/service/src/interpreters/fragments/plan_fragment.rs

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,8 @@ use common_catalog::plan::DataSourcePlan;
1818
use common_exception::ErrorCode;
1919
use common_exception::Result;
2020

21-
use super::Fragmenter;
2221
use crate::api::DataExchange;
22+
use crate::interpreters::fragments::Fragmenter;
2323
use crate::interpreters::QueryFragmentAction;
2424
use crate::interpreters::QueryFragmentActions;
2525
use crate::interpreters::QueryFragmentsActions;
@@ -69,7 +69,7 @@ impl PlanFragment {
6969

7070
match &self.fragment_type {
7171
FragmentType::Root => {
72-
let action = QueryFragmentAction::create_v2(
72+
let action = QueryFragmentAction::create(
7373
Fragmenter::get_local_executor(ctx),
7474
self.plan.clone(),
7575
);
@@ -87,15 +87,15 @@ impl PlanFragment {
8787
{
8888
// If this is a intermediate fragment with merge input,
8989
// we will only send it to coordinator node.
90-
let action = QueryFragmentAction::create_v2(
90+
let action = QueryFragmentAction::create(
9191
Fragmenter::get_local_executor(ctx),
9292
self.plan.clone(),
9393
);
9494
fragment_actions.add_action(action);
9595
} else {
9696
// Otherwise distribute the fragment to all the executors.
9797
for executor in Fragmenter::get_executors(ctx) {
98-
let action = QueryFragmentAction::create_v2(executor, self.plan.clone());
98+
let action = QueryFragmentAction::create(executor, self.plan.clone());
9999
fragment_actions.add_action(action);
100100
}
101101
}
@@ -155,10 +155,8 @@ impl PlanFragment {
155155
};
156156
plan = replace_read_source.replace(&plan)?;
157157

158-
fragment_actions.add_action(QueryFragmentAction::create_v2(
159-
executor.clone(),
160-
plan.clone(),
161-
));
158+
fragment_actions
159+
.add_action(QueryFragmentAction::create(executor.clone(), plan.clone()));
162160
}
163161

164162
Ok(fragment_actions)

0 commit comments

Comments
 (0)