Skip to content

feat: add querier json #1288

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 42 commits into from
Apr 16, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
42 commits
Select commit Hold shift + click to select a range
2d1aa22
generic metadata for ingestor and indexer
nikhilsinhaparseable Apr 7, 2025
b8243a2
add querier metadata
nikhilsinhaparseable Apr 7, 2025
59b9867
remove writing query metadata to storage
nikhilsinhaparseable Apr 7, 2025
dcd9b1b
remove unused
nikhilsinhaparseable Apr 7, 2025
a718691
clone from
nikhilsinhaparseable Apr 7, 2025
358af90
split into smaller functions
nikhilsinhaparseable Apr 7, 2025
1c1fb33
clippy suggestions
nikhilsinhaparseable Apr 7, 2025
d39fb47
helper function
nikhilsinhaparseable Apr 7, 2025
bef7984
add Prism mode
nikhilsinhaparseable Apr 7, 2025
4145db2
prism mode to option
nikhilsinhaparseable Apr 8, 2025
8ba0f90
add serde error to QueryError
nikhilsinhaparseable Apr 8, 2025
f5c2088
add custom error
nikhilsinhaparseable Apr 8, 2025
4e4c38f
overwrite remote for Prism mode
nikhilsinhaparseable Apr 8, 2025
2599aa8
make public
nikhilsinhaparseable Apr 8, 2025
166b2cb
make mod store_metadata public
nikhilsinhaparseable Apr 8, 2025
303b7c0
active/inactive indexers and queriers in analytics
nikhilsinhaparseable Apr 8, 2025
64ce915
add error to query error
nikhilsinhaparseable Apr 8, 2025
074b44c
add prism check for various methods
nikhilsinhaparseable Apr 8, 2025
8c164ff
update logic for check or load stream
nikhilsinhaparseable Apr 9, 2025
143e3f2
update logic for load stream
nikhilsinhaparseable Apr 9, 2025
30165c3
revert restriction for enterprise to oss
nikhilsinhaparseable Apr 10, 2025
c6159ff
add comment for check or load stream
nikhilsinhaparseable Apr 10, 2025
7a3a662
self in cluster info
nikhilsinhaparseable Apr 12, 2025
22f763e
add scheme to cluster info
nikhilsinhaparseable Apr 12, 2025
1429877
self metadata for cluster info and metrics
nikhilsinhaparseable Apr 14, 2025
2232af8
update url fetch
nikhilsinhaparseable Apr 14, 2025
1e4728f
add prism metadata for node info
nikhilsinhaparseable Apr 14, 2025
48b121c
refactor url parsing
nikhilsinhaparseable Apr 14, 2025
6f0b623
correct logic to delete node from cluster
nikhilsinhaparseable Apr 14, 2025
7bd0a38
add comments, remove hard coded values for node types
nikhilsinhaparseable Apr 14, 2025
c66fb74
delete from staging for ingestor
nikhilsinhaparseable Apr 14, 2025
264ccf3
replaced to get_stream
nikhilsinhaparseable Apr 14, 2025
70c0342
remove node metadata from parseable struct, add as OnceCell
nikhilsinhaparseable Apr 15, 2025
0049e1b
ingestor_id mapping
nikhilsinhaparseable Apr 15, 2025
87f9803
prism mode in cluster info and metrics
nikhilsinhaparseable Apr 15, 2025
ccf5ef2
convert node metadata to Arc
nikhilsinhaparseable Apr 15, 2025
a717313
coderabbitai suggestions
nikhilsinhaparseable Apr 15, 2025
b043a75
ingestor analytics response check
nikhilsinhaparseable Apr 15, 2025
8af3a4c
remove redundant code
nikhilsinhaparseable Apr 15, 2025
1841a68
process live ingestors in parallel
nikhilsinhaparseable Apr 15, 2025
04aade9
Update src/handlers/http/cluster/mod.rs
nikhilsinhaparseable Apr 15, 2025
891e38c
rename queries to queriers
nikhilsinhaparseable Apr 16, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 47 additions & 6 deletions src/analytics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ use crate::{
http::{
base_path_without_preceding_slash,
cluster::{self, utils::check_liveness},
modal::{NodeMetadata, NodeType},
},
STREAM_NAME_HEADER_KEY,
},
Expand Down Expand Up @@ -74,6 +75,10 @@ pub struct Report {
commit_hash: String,
active_ingestors: u64,
inactive_ingestors: u64,
active_indexers: u64,
inactive_indexers: u64,
active_queriers: u64,
inactive_queriers: u64,
stream_count: usize,
total_events_count: u64,
total_json_bytes: u64,
Expand Down Expand Up @@ -106,7 +111,32 @@ impl Report {
mem_total = info.total_memory();
}
let ingestor_metrics = fetch_ingestors_metrics().await?;
let mut active_indexers = 0;
let mut inactive_indexers = 0;
let mut active_queriers = 0;
let mut inactive_queriers = 0;

// check liveness of indexers
// get the count of active and inactive indexers
let indexer_infos: Vec<NodeMetadata> = cluster::get_node_info(NodeType::Indexer).await?;
for indexer in indexer_infos {
if check_liveness(&indexer.domain_name).await {
active_indexers += 1;
} else {
inactive_indexers += 1;
}
}

// check liveness of queriers
// get the count of active and inactive queriers
let query_infos: Vec<NodeMetadata> = cluster::get_node_info(NodeType::Querier).await?;
for query in query_infos {
if check_liveness(&query.domain_name).await {
active_queriers += 1;
} else {
inactive_queriers += 1;
}
}
Ok(Self {
deployment_id: storage::StorageMetadata::global().deployment_id,
uptime: upt,
Expand All @@ -122,6 +152,10 @@ impl Report {
commit_hash: current().commit_hash,
active_ingestors: ingestor_metrics.0,
inactive_ingestors: ingestor_metrics.1,
active_indexers,
inactive_indexers,
active_queriers,
inactive_queriers,
stream_count: ingestor_metrics.2,
total_events_count: ingestor_metrics.3,
total_json_bytes: ingestor_metrics.4,
Expand Down Expand Up @@ -224,11 +258,14 @@ async fn fetch_ingestors_metrics(
let mut vec = vec![];
let mut active_ingestors = 0u64;
let mut offline_ingestors = 0u64;
if PARSEABLE.options.mode == Mode::Query {

// for OSS, Query mode fetches the analytics report
// for Enterprise, Prism mode fetches the analytics report
if PARSEABLE.options.mode == Mode::Query || PARSEABLE.options.mode == Mode::Prism {
// send analytics for ingest servers

// ingestor infos should be valid here, if not something is wrong
let ingestor_infos = cluster::get_ingestor_info().await.unwrap();
let ingestor_infos: Vec<NodeMetadata> = cluster::get_node_info(NodeType::Ingestor).await?;

for im in ingestor_infos {
if !check_liveness(&im.domain_name).await {
Expand All @@ -250,10 +287,14 @@ async fn fetch_ingestors_metrics(
.send()
.await
.expect("should respond");

let data = serde_json::from_slice::<NodeMetrics>(&resp.bytes().await?)?;
vec.push(data);
active_ingestors += 1;
// check if the response is valid
if let Ok(data) = serde_json::from_slice::<NodeMetrics>(&resp.bytes().await?) {
active_ingestors += 1;
vec.push(data);
} else {
offline_ingestors += 1;
continue;
}
}

node_metrics.accumulate(&mut vec);
Expand Down
22 changes: 16 additions & 6 deletions src/catalog/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,13 @@ use tracing::{error, info};

use crate::{
event::DEFAULT_TIMESTAMP_KEY,
handlers::{self, http::base_path_without_preceding_slash},
handlers::{
self,
http::{
base_path_without_preceding_slash,
modal::{NodeMetadata, NodeType},
},
},
metrics::{EVENTS_INGESTED_DATE, EVENTS_INGESTED_SIZE_DATE, EVENTS_STORAGE_SIZE_DATE},
option::Mode,
parseable::PARSEABLE,
Expand Down Expand Up @@ -335,15 +341,19 @@ pub async fn remove_manifest_from_snapshot(
meta.first_event_at = None;
storage.put_snapshot(stream_name, meta.snapshot).await?;
}

// retention is initiated from the querier
// request is forwarded to all ingestors to clean up their manifests
// no action required for the Index or Prism nodes
match PARSEABLE.options.mode {
Mode::All | Mode::Ingest => {
Ok(get_first_event(storage.clone(), stream_name, Vec::new()).await?)
}
Mode::Query => Ok(get_first_event(storage, stream_name, dates).await?),
Mode::Index => Err(ObjectStorageError::UnhandledError(Box::new(
Mode::Index | Mode::Prism => Err(ObjectStorageError::UnhandledError(Box::new(
std::io::Error::new(
std::io::ErrorKind::Unsupported,
"Can't remove manifest from within Index server",
"Can't remove manifest from within Index or Prism server",
),
))),
}
Expand All @@ -356,7 +366,6 @@ pub async fn get_first_event(
) -> Result<Option<String>, ObjectStorageError> {
let mut first_event_at: String = String::default();
match PARSEABLE.options.mode {
Mode::Index => unimplemented!(),
Mode::All | Mode::Ingest => {
// get current snapshot
let stream_first_event = PARSEABLE.get_stream(stream_name)?.get_first_event();
Expand Down Expand Up @@ -406,8 +415,8 @@ pub async fn get_first_event(
}
}
Mode::Query => {
let ingestor_metadata =
handlers::http::cluster::get_ingestor_info()
let ingestor_metadata: Vec<NodeMetadata> =
handlers::http::cluster::get_node_info(NodeType::Ingestor)
.await
.map_err(|err| {
error!("Fatal: failed to get ingestor info: {:?}", err);
Expand Down Expand Up @@ -437,6 +446,7 @@ pub async fn get_first_event(
}
first_event_at = ingestors_first_event_at.iter().min().unwrap().to_string();
}
_ => {}
}

Ok(Some(first_event_at))
Expand Down
137 changes: 75 additions & 62 deletions src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -328,6 +328,14 @@ pub struct Options {
)]
pub indexer_endpoint: String,

#[arg(
long,
env = "P_QUERIER_ENDPOINT",
default_value = "",
help = "URL to connect to this specific querier. Default is the address of the server"
)]
pub querier_endpoint: String,

#[command(flatten)]
pub oidc: Option<OidcConfig>,

Expand Down Expand Up @@ -439,83 +447,88 @@ impl Options {
}
}

/// TODO: refactor and document
/// get the address of the server
/// based on the mode
pub fn get_url(&self, mode: Mode) -> Url {
let (endpoint, env_var) = match mode {
Mode::Ingest => {
if self.ingestor_endpoint.is_empty() {
return format!(
"{}://{}",
self.get_scheme(),
self.address
)
.parse::<Url>() // if the value was improperly set, this will panic before hand
.unwrap_or_else(|err| {
panic!("{err}, failed to parse `{}` as Url. Please set the environment variable `P_ADDR` to `<ip address>:<port>` without the scheme (e.g., 192.168.1.1:8000). Please refer to the documentation: https://logg.ing/env for more details.", self.address)
});
}
(&self.ingestor_endpoint, "P_INGESTOR_ENDPOINT")
}
Mode::Index => {
if self.indexer_endpoint.is_empty() {
return format!(
"{}://{}",
self.get_scheme(),
self.address
)
.parse::<Url>() // if the value was improperly set, this will panic before hand
.unwrap_or_else(|err| {
panic!("{err}, failed to parse `{}` as Url. Please set the environment variable `P_ADDR` to `<ip address>:<port>` without the scheme (e.g., 192.168.1.1:8000). Please refer to the documentation: https://logg.ing/env for more details.", self.address)
});
}
(&self.indexer_endpoint, "P_INDEXER_ENDPOINT")
}
_ => panic!("Invalid mode"),
let endpoint = match mode {
Mode::Ingest => self.get_endpoint(&self.ingestor_endpoint, "P_INGESTOR_ENDPOINT"),
Mode::Index => self.get_endpoint(&self.indexer_endpoint, "P_INDEXER_ENDPOINT"),
Mode::Query => self.get_endpoint(&self.querier_endpoint, "P_QUERIER_ENDPOINT"),
_ => return self.build_url(&self.address),
};

if endpoint.starts_with("http") {
panic!("Invalid value `{}`, please set the environement variable `{env_var}` to `<ip address / DNS>:<port>` without the scheme (e.g., 192.168.1.1:8000 or example.com:8000). Please refer to the documentation: https://logg.ing/env for more details.", endpoint);
}

let addr_from_env = endpoint.split(':').collect::<Vec<&str>>();
self.parse_endpoint(&endpoint)
}

if addr_from_env.len() != 2 {
panic!("Invalid value `{}`, please set the environement variable `{env_var}` to `<ip address / DNS>:<port>` without the scheme (e.g., 192.168.1.1:8000 or example.com:8000). Please refer to the documentation: https://logg.ing/env for more details.", endpoint);
/// get the endpoint for the server
/// if env var is empty, use the address, else use the env var
fn get_endpoint(&self, endpoint: &str, env_var: &str) -> String {
if endpoint.is_empty() {
self.address.to_string()
} else {
if endpoint.starts_with("http") {
panic!(
"Invalid value `{}`, please set the environment variable `{}` to `<ip address / DNS>:<port>` without the scheme (e.g., 192.168.1.1:8000 or example.com:8000). Please refer to the documentation: https://logg.ing/env for more details.",
endpoint, env_var
);
}
endpoint.to_string()
}
}

let mut hostname = addr_from_env[0].to_string();
let mut port = addr_from_env[1].to_string();
/// parse the endpoint to get the address and port
/// if the address is an env var, resolve it
/// if the port is an env var, resolve it
fn parse_endpoint(&self, endpoint: &str) -> Url {
let addr_parts: Vec<&str> = endpoint.split(':').collect();

if addr_parts.len() != 2 {
panic!(
"Invalid value `{}`, please set the environment variable to `<ip address / DNS>:<port>` without the scheme (e.g., 192.168.1.1:8000 or example.com:8000). Please refer to the documentation: https://logg.ing/env for more details.",
endpoint
);
}

// if the env var value fits the pattern $VAR_NAME:$VAR_NAME
// fetch the value from the specified env vars
if hostname.starts_with('$') {
let var_hostname = hostname[1..].to_string();
hostname = env::var(&var_hostname).unwrap_or_default();
let hostname = self.resolve_env_var(addr_parts[0]);
let port = self.resolve_env_var(addr_parts[1]);

if hostname.is_empty() {
panic!("The environement variable `{}` is not set, please set as <ip address / DNS> without the scheme (e.g., 192.168.1.1 or example.com). Please refer to the documentation: https://logg.ing/env for more details.", var_hostname);
}
if hostname.starts_with("http") {
panic!("Invalid value `{}`, please set the environement variable `{}` to `<ip address / DNS>` without the scheme (e.g., 192.168.1.1 or example.com). Please refer to the documentation: https://logg.ing/env for more details.", hostname, var_hostname);
} else {
hostname = format!("{}://{}", self.get_scheme(), hostname);
}
}
self.build_url(&format!("{}:{}", hostname, port))
}

if port.starts_with('$') {
let var_port = port[1..].to_string();
port = env::var(&var_port).unwrap_or_default();
/// resolve the env var
/// if the env var is not set, panic
/// if the env var is set, return the value
fn resolve_env_var(&self, value: &str) -> String {
if let Some(env_var) = value.strip_prefix('$') {
let resolved_value = env::var(env_var).unwrap_or_else(|_| {
panic!(
"The environment variable `{}` is not set. Please set it to a valid value. Refer to the documentation: https://logg.ing/env for more details.",
env_var
);
});

if port.is_empty() {
if resolved_value.starts_with("http") {
panic!(
"Port is not set in the environement variable `{}`. Please refer to the documentation: https://logg.ing/env for more details.",
var_port
"Invalid value `{}`, please set the environment variable `{}` to `<ip address / DNS>` without the scheme (e.g., 192.168.1.1 or example.com). Please refer to the documentation: https://logg.ing/env for more details.",
resolved_value, env_var
);
}

resolved_value
} else {
value.to_string()
}
}

format!("{}://{}:{}", self.get_scheme(), hostname, port)
/// build the url from the address
fn build_url(&self, address: &str) -> Url {
format!("{}://{}", self.get_scheme(), address)
.parse::<Url>()
.expect("Valid URL")
.unwrap_or_else(|err| {
panic!(
"{err}, failed to parse `{}` as Url. Please set the environment variable `P_ADDR` to `<ip address>:<port>` without the scheme (e.g., 192.168.1.1:8000). Please refer to the documentation: https://logg.ing/env for more details.",
address
);
})
}
}
5 changes: 3 additions & 2 deletions src/handlers/airplane.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@ use futures_util::{Future, TryFutureExt};
use tonic::transport::{Identity, Server, ServerTlsConfig};
use tonic_web::GrpcWebLayer;

use crate::handlers::http::cluster::get_ingestor_info;
use crate::handlers::http::cluster::get_node_info;
use crate::handlers::http::modal::{NodeMetadata, NodeType};
use crate::handlers::http::query::{into_query, update_schema_when_distributed};
use crate::handlers::livetail::cross_origin_config;
use crate::metrics::QUERY_EXECUTE_TIME;
Expand Down Expand Up @@ -179,7 +180,7 @@ impl FlightService for AirServiceImpl {
})
.to_string();

let ingester_metadatas = get_ingestor_info()
let ingester_metadatas: Vec<NodeMetadata> = get_node_info(NodeType::Ingestor)
.await
.map_err(|err| Status::failed_precondition(err.to_string()))?;
let mut minute_result: Vec<RecordBatch> = vec![];
Expand Down
7 changes: 1 addition & 6 deletions src/handlers/http/about.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ use serde_json::{json, Value};

use crate::{
about::{self, get_latest_release},
option::Mode,
parseable::PARSEABLE,
storage::StorageMetadata,
};
Expand Down Expand Up @@ -63,11 +62,7 @@ pub async fn about() -> Json<Value> {
let commit = current_release.commit_hash;
let deployment_id = meta.deployment_id.to_string();
let mode = PARSEABLE.get_server_mode_string();
let staging = if PARSEABLE.options.mode == Mode::Query {
"".to_string()
} else {
PARSEABLE.options.staging_dir().display().to_string()
};
let staging = PARSEABLE.options.staging_dir().display().to_string();
let grpc_port = PARSEABLE.options.grpc_port;

let store_endpoint = PARSEABLE.storage.get_endpoint();
Expand Down
Loading
Loading