Skip to content

Commit 834550a

Browse files
add querier metadata
1 parent aada727 commit 834550a

File tree

12 files changed

+219
-146
lines changed

12 files changed

+219
-146
lines changed

src/analytics.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ use crate::{
3636
http::{
3737
base_path_without_preceding_slash,
3838
cluster::{self, utils::check_liveness},
39+
modal::NodeMetadata,
3940
},
4041
STREAM_NAME_HEADER_KEY,
4142
},
@@ -228,7 +229,7 @@ async fn fetch_ingestors_metrics(
228229
// send analytics for ingest servers
229230

230231
// ingestor infos should be valid here, if not some thing is wrong
231-
let ingestor_infos = cluster::get_ingestor_info().await.unwrap();
232+
let ingestor_infos: Vec<NodeMetadata> = cluster::get_node_info("ingestor").await.unwrap();
232233

233234
for im in ingestor_infos {
234235
if !check_liveness(&im.domain_name).await {

src/catalog/mod.rs

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,10 @@ use tracing::{error, info};
2828

2929
use crate::{
3030
event::DEFAULT_TIMESTAMP_KEY,
31-
handlers::{self, http::base_path_without_preceding_slash},
31+
handlers::{
32+
self,
33+
http::{base_path_without_preceding_slash, modal::NodeMetadata},
34+
},
3235
metrics::{EVENTS_INGESTED_DATE, EVENTS_INGESTED_SIZE_DATE, EVENTS_STORAGE_SIZE_DATE},
3336
option::Mode,
3437
parseable::PARSEABLE,
@@ -406,8 +409,8 @@ pub async fn get_first_event(
406409
}
407410
}
408411
Mode::Query => {
409-
let ingestor_metadata =
410-
handlers::http::cluster::get_ingestor_info()
412+
let ingestor_metadata: Vec<NodeMetadata> =
413+
handlers::http::cluster::get_node_info("ingestor")
411414
.await
412415
.map_err(|err| {
413416
error!("Fatal: failed to get ingestor info: {:?}", err);

src/cli.rs

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -328,6 +328,14 @@ pub struct Options {
328328
)]
329329
pub indexer_endpoint: String,
330330

331+
#[arg(
332+
long,
333+
env = "P_QUERIER_ENDPOINT",
334+
default_value = "",
335+
help = "URL to connect to this specific querier. Default is the address of the server"
336+
)]
337+
pub querier_endpoint: String,
338+
331339
#[command(flatten)]
332340
pub oidc: Option<OidcConfig>,
333341

@@ -470,6 +478,20 @@ impl Options {
470478
}
471479
(&self.indexer_endpoint, "P_INDEXER_ENDPOINT")
472480
}
481+
Mode::Query => {
482+
if self.querier_endpoint.is_empty() {
483+
return format!(
484+
"{}://{}",
485+
self.get_scheme(),
486+
self.address
487+
)
488+
.parse::<Url>() // if the value was improperly set, this will panic before hand
489+
.unwrap_or_else(|err| {
490+
panic!("{err}, failed to parse `{}` as Url. Please set the environment variable `P_ADDR` to `<ip address>:<port>` without the scheme (e.g., 192.168.1.1:8000). Please refer to the documentation: https://logg.ing/env for more details.", self.address)
491+
});
492+
}
493+
(&self.querier_endpoint, "P_QUERIER_ENDPOINT")
494+
}
473495
_ => panic!("Invalid mode"),
474496
};
475497

src/handlers/airplane.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,8 @@ use futures_util::{Future, TryFutureExt};
3333
use tonic::transport::{Identity, Server, ServerTlsConfig};
3434
use tonic_web::GrpcWebLayer;
3535

36-
use crate::handlers::http::cluster::get_ingestor_info;
36+
use crate::handlers::http::cluster::get_node_info;
37+
use crate::handlers::http::modal::NodeMetadata;
3738
use crate::handlers::http::query::{into_query, update_schema_when_distributed};
3839
use crate::handlers::livetail::cross_origin_config;
3940
use crate::metrics::QUERY_EXECUTE_TIME;
@@ -179,7 +180,7 @@ impl FlightService for AirServiceImpl {
179180
})
180181
.to_string();
181182

182-
let ingester_metadatas = get_ingestor_info()
183+
let ingester_metadatas: Vec<NodeMetadata> = get_node_info("ingestor")
183184
.await
184185
.map_err(|err| Status::failed_precondition(err.to_string()))?;
185186
let mut minute_result: Vec<RecordBatch> = vec![];

src/handlers/http/cluster/mod.rs

Lines changed: 62 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -54,14 +54,10 @@ use crate::HTTP_CLIENT;
5454
use super::base_path_without_preceding_slash;
5555
use super::ingest::PostError;
5656
use super::logstream::error::StreamError;
57-
use super::modal::{IndexerMetadata, IngestorMetadata, Metadata};
57+
use super::modal::{IndexerMetadata, IngestorMetadata, Metadata, NodeMetadata, QuerierMetadata};
5858
use super::rbac::RBACError;
5959
use super::role::RoleError;
6060

61-
type IngestorMetadataArr = Vec<IngestorMetadata>;
62-
63-
type IndexerMetadataArr = Vec<IndexerMetadata>;
64-
6561
pub const INTERNAL_STREAM_NAME: &str = "pmeta";
6662

6763
const CLUSTER_METRICS_INTERVAL_SECONDS: Interval = clokwerk::Interval::Minutes(1);
@@ -77,7 +73,7 @@ pub async fn sync_streams_with_ingestors(
7773
for (key, value) in headers.iter() {
7874
reqwest_headers.insert(key.clone(), value.clone());
7975
}
80-
let ingestor_infos = get_ingestor_info().await.map_err(|err| {
76+
let ingestor_infos: Vec<NodeMetadata> = get_node_info("ingestor").await.map_err(|err| {
8177
error!("Fatal: failed to get ingestor info: {:?}", err);
8278
StreamError::Anyhow(err)
8379
})?;
@@ -125,7 +121,7 @@ pub async fn sync_users_with_roles_with_ingestors(
125121
username: &String,
126122
role: &HashSet<String>,
127123
) -> Result<(), RBACError> {
128-
let ingestor_infos = get_ingestor_info().await.map_err(|err| {
124+
let ingestor_infos: Vec<NodeMetadata> = get_node_info("ingestor").await.map_err(|err| {
129125
error!("Fatal: failed to get ingestor info: {:?}", err);
130126
RBACError::Anyhow(err)
131127
})?;
@@ -175,7 +171,7 @@ pub async fn sync_users_with_roles_with_ingestors(
175171

176172
// forward the delete user request to all ingestors to keep them in sync
177173
pub async fn sync_user_deletion_with_ingestors(username: &String) -> Result<(), RBACError> {
178-
let ingestor_infos = get_ingestor_info().await.map_err(|err| {
174+
let ingestor_infos: Vec<NodeMetadata> = get_node_info("ingestor").await.map_err(|err| {
179175
error!("Fatal: failed to get ingestor info: {:?}", err);
180176
RBACError::Anyhow(err)
181177
})?;
@@ -222,7 +218,7 @@ pub async fn sync_user_creation_with_ingestors(
222218
user: User,
223219
role: &Option<HashSet<String>>,
224220
) -> Result<(), RBACError> {
225-
let ingestor_infos = get_ingestor_info().await.map_err(|err| {
221+
let ingestor_infos: Vec<NodeMetadata> = get_node_info("ingestor").await.map_err(|err| {
226222
error!("Fatal: failed to get ingestor info: {:?}", err);
227223
RBACError::Anyhow(err)
228224
})?;
@@ -280,7 +276,7 @@ pub async fn sync_user_creation_with_ingestors(
280276

281277
// forward the password reset request to all ingestors to keep them in sync
282278
pub async fn sync_password_reset_with_ingestors(username: &String) -> Result<(), RBACError> {
283-
let ingestor_infos = get_ingestor_info().await.map_err(|err| {
279+
let ingestor_infos: Vec<NodeMetadata> = get_node_info("ingestor").await.map_err(|err| {
284280
error!("Fatal: failed to get ingestor info: {:?}", err);
285281
RBACError::Anyhow(err)
286282
})?;
@@ -328,7 +324,7 @@ pub async fn sync_role_update_with_ingestors(
328324
name: String,
329325
privileges: Vec<DefaultPrivilege>,
330326
) -> Result<(), RoleError> {
331-
let ingestor_infos = get_ingestor_info().await.map_err(|err| {
327+
let ingestor_infos: Vec<NodeMetadata> = get_node_info("ingestor").await.map_err(|err| {
332328
error!("Fatal: failed to get ingestor info: {:?}", err);
333329
RoleError::Anyhow(err)
334330
})?;
@@ -545,34 +541,48 @@ pub async fn send_retention_cleanup_request(
545541

546542
pub async fn get_cluster_info() -> Result<impl Responder, StreamError> {
547543
// Get ingestor and indexer metadata concurrently
548-
let (ingestor_result, indexer_result) =
549-
future::join(get_ingestor_info(), get_indexer_info()).await;
544+
let (querier_result, ingestor_result, indexer_result) = future::join3(
545+
get_node_info("querier"),
546+
get_node_info("ingestor"),
547+
get_node_info("indexer"),
548+
)
549+
.await;
550+
551+
// Handle querier metadata result
552+
let querier_metadata: Vec<NodeMetadata> = querier_result
553+
.map_err(|err| {
554+
error!("Fatal: failed to get querier info: {:?}", err);
555+
PostError::Invalid(err)
556+
})
557+
.map_err(|err| StreamError::Anyhow(err.into()))?;
550558

551559
// Handle ingestor metadata result
552-
let ingestor_metadata = ingestor_result
560+
let ingestor_metadata: Vec<NodeMetadata> = ingestor_result
553561
.map_err(|err| {
554562
error!("Fatal: failed to get ingestor info: {:?}", err);
555563
PostError::Invalid(err)
556564
})
557565
.map_err(|err| StreamError::Anyhow(err.into()))?;
558566

559567
// Handle indexer metadata result
560-
let indexer_metadata = indexer_result
568+
let indexer_metadata: Vec<NodeMetadata> = indexer_result
561569
.map_err(|err| {
562570
error!("Fatal: failed to get indexer info: {:?}", err);
563571
PostError::Invalid(err)
564572
})
565573
.map_err(|err| StreamError::Anyhow(err.into()))?;
566574

567575
// Fetch info for both node types concurrently
568-
let (ingestor_infos, indexer_infos) = future::join(
576+
let (querier_infos, ingestor_infos, indexer_infos) = future::join3(
577+
fetch_nodes_info(querier_metadata),
569578
fetch_nodes_info(ingestor_metadata),
570579
fetch_nodes_info(indexer_metadata),
571580
)
572581
.await;
573582

574583
// Combine results from both node types
575584
let mut infos = Vec::new();
585+
infos.extend(querier_infos?);
576586
infos.extend(ingestor_infos?);
577587
infos.extend(indexer_infos?);
578588

@@ -671,40 +681,22 @@ pub async fn get_cluster_metrics() -> Result<impl Responder, PostError> {
671681
Ok(actix_web::HttpResponse::Ok().json(dresses))
672682
}
673683

674-
pub async fn get_ingestor_info() -> anyhow::Result<IngestorMetadataArr> {
684+
pub async fn get_node_info<T: Metadata + DeserializeOwned>(prefix: &str) -> anyhow::Result<Vec<T>> {
675685
let store = PARSEABLE.storage.get_object_store();
676-
677686
let root_path = RelativePathBuf::from(PARSEABLE_ROOT_DIRECTORY);
678-
let arr = store
679-
.get_objects(
680-
Some(&root_path),
681-
Box::new(|file_name| file_name.starts_with("ingestor")),
682-
)
683-
.await?
684-
.iter()
685-
// this unwrap will most definateley shoot me in the foot later
686-
.map(|x| serde_json::from_slice::<IngestorMetadata>(x).unwrap_or_default())
687-
.collect_vec();
688-
689-
Ok(arr)
690-
}
687+
let prefix_owned = prefix.to_string(); // Create an owned copy of the prefix
691688

692-
pub async fn get_indexer_info() -> anyhow::Result<IndexerMetadataArr> {
693-
let store = PARSEABLE.storage.get_object_store();
694-
695-
let root_path = RelativePathBuf::from(PARSEABLE_ROOT_DIRECTORY);
696-
let arr = store
689+
let metadata = store
697690
.get_objects(
698691
Some(&root_path),
699-
Box::new(|file_name| file_name.starts_with("indexer")),
692+
Box::new(move |file_name| file_name.starts_with(&prefix_owned)), // Use the owned copy
700693
)
701694
.await?
702695
.iter()
703-
// this unwrap will most definateley shoot me in the foot later
704-
.map(|x| serde_json::from_slice::<IndexerMetadata>(x).unwrap_or_default())
705-
.collect_vec();
696+
.filter_map(|x| serde_json::from_slice::<T>(x).ok())
697+
.collect();
706698

707-
Ok(arr)
699+
Ok(metadata)
708700
}
709701

710702
pub async fn remove_node(node_url: Path<String>) -> Result<impl Responder, PostError> {
@@ -725,7 +717,11 @@ pub async fn remove_node(node_url: Path<String>) -> Result<impl Responder, PostE
725717
let removed_indexer =
726718
remove_node_metadata::<IndexerMetadata>(&object_store, &domain_name).await?;
727719

728-
let msg = if removed_ingestor || removed_indexer {
720+
// Delete querier metadata
721+
let removed_querier =
722+
remove_node_metadata::<QuerierMetadata>(&object_store, &domain_name).await?;
723+
724+
let msg = if removed_ingestor || removed_indexer || removed_querier {
729725
format!("node {} removed successfully", domain_name)
730726
} else {
731727
format!("node {} is not found", domain_name)
@@ -836,6 +832,9 @@ where
836832
T: Metadata + Send + Sync + 'static,
837833
{
838834
let nodes_len = nodes.len();
835+
if nodes_len == 0 {
836+
return Ok(vec![]);
837+
}
839838
let results = stream::iter(nodes)
840839
.map(|node| async move { fetch_node_metrics(&node).await })
841840
.buffer_unordered(nodes_len) // No concurrency limit
@@ -858,23 +857,31 @@ where
858857
/// Main function to fetch all cluster metrics, parallelized and refactored
859858
async fn fetch_cluster_metrics() -> Result<Vec<Metrics>, PostError> {
860859
// Get ingestor and indexer metadata concurrently
861-
let (ingestor_result, indexer_result) =
862-
future::join(get_ingestor_info(), get_indexer_info()).await;
860+
let (querier_result, ingestor_result, indexer_result) = future::join3(
861+
get_node_info("querier"),
862+
get_node_info("ingestor"),
863+
get_node_info("indexer"),
864+
)
865+
.await;
863866

867+
// Handle querier metadata result
868+
let querier_metadata: Vec<NodeMetadata> = querier_result.map_err(|err| {
869+
error!("Fatal: failed to get querier info: {:?}", err);
870+
PostError::Invalid(err)
871+
})?;
864872
// Handle ingestor metadata result
865-
let ingestor_metadata = ingestor_result.map_err(|err| {
873+
let ingestor_metadata: Vec<NodeMetadata> = ingestor_result.map_err(|err| {
866874
error!("Fatal: failed to get ingestor info: {:?}", err);
867875
PostError::Invalid(err)
868876
})?;
869-
870877
// Handle indexer metadata result
871-
let indexer_metadata = indexer_result.map_err(|err| {
878+
let indexer_metadata: Vec<NodeMetadata> = indexer_result.map_err(|err| {
872879
error!("Fatal: failed to get indexer info: {:?}", err);
873880
PostError::Invalid(err)
874881
})?;
875-
876882
// Fetch metrics from ingestors and indexers concurrently
877-
let (ingestor_metrics, indexer_metrics) = future::join(
883+
let (querier_metrics, ingestor_metrics, indexer_metrics) = future::join3(
884+
fetch_nodes_metrics(querier_metadata),
878885
fetch_nodes_metrics(ingestor_metadata),
879886
fetch_nodes_metrics(indexer_metadata),
880887
)
@@ -883,6 +890,12 @@ async fn fetch_cluster_metrics() -> Result<Vec<Metrics>, PostError> {
883890
// Combine all metrics
884891
let mut all_metrics = Vec::new();
885892

893+
// Add querier metrics
894+
match querier_metrics {
895+
Ok(metrics) => all_metrics.extend(metrics),
896+
Err(err) => return Err(err),
897+
}
898+
886899
// Add ingestor metrics
887900
match ingestor_metrics {
888901
Ok(metrics) => all_metrics.extend(metrics),

src/handlers/http/cluster/utils.rs

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,10 @@
1616
*
1717
*/
1818

19-
use crate::{handlers::http::base_path_without_preceding_slash, HTTP_CLIENT};
19+
use crate::{
20+
handlers::http::{base_path_without_preceding_slash, modal::NodeType},
21+
HTTP_CLIENT,
22+
};
2023
use actix_web::http::header;
2124
use chrono::{DateTime, Utc};
2225
use serde::{Deserialize, Serialize};
@@ -55,7 +58,7 @@ pub struct ClusterInfo {
5558
storage_path: String,
5659
error: Option<String>, // error message if the ingestor is not reachable
5760
status: Option<String>, // status message if the ingestor is reachable
58-
node_type: String,
61+
node_type: NodeType,
5962
}
6063

6164
impl ClusterInfo {
@@ -66,7 +69,7 @@ impl ClusterInfo {
6669
storage_path: String,
6770
error: Option<String>,
6871
status: Option<String>,
69-
node_type: &str,
72+
node_type: &NodeType,
7073
) -> Self {
7174
Self {
7275
domain_name: domain_name.to_string(),
@@ -75,7 +78,7 @@ impl ClusterInfo {
7578
storage_path,
7679
error,
7780
status,
78-
node_type: node_type.to_string(),
81+
node_type: node_type.clone(),
7982
}
8083
}
8184
}

0 commit comments

Comments
 (0)