diff --git a/src/analytics.rs b/src/analytics.rs
index 42af72d36..329a20632 100644
--- a/src/analytics.rs
+++ b/src/analytics.rs
@@ -36,6 +36,7 @@ use crate::{
         http::{
             base_path_without_preceding_slash,
             cluster::{self, utils::check_liveness},
+            modal::{NodeMetadata, NodeType},
         },
         STREAM_NAME_HEADER_KEY,
     },
@@ -74,6 +75,10 @@ pub struct Report {
     commit_hash: String,
     active_ingestors: u64,
     inactive_ingestors: u64,
+    active_indexers: u64,
+    inactive_indexers: u64,
+    active_queriers: u64,
+    inactive_queriers: u64,
     stream_count: usize,
     total_events_count: u64,
     total_json_bytes: u64,
@@ -106,7 +111,32 @@ impl Report {
             mem_total = info.total_memory();
         }
         let ingestor_metrics = fetch_ingestors_metrics().await?;
+        let mut active_indexers = 0;
+        let mut inactive_indexers = 0;
+        let mut active_queriers = 0;
+        let mut inactive_queriers = 0;
+
+        // check the liveness of indexers and
+        // get the count of active and inactive indexers
+        let indexer_infos: Vec<NodeMetadata> = cluster::get_node_info(NodeType::Indexer).await?;
+        for indexer in indexer_infos {
+            if check_liveness(&indexer.domain_name).await {
+                active_indexers += 1;
+            } else {
+                inactive_indexers += 1;
+            }
+        }
+        // check the liveness of queriers and
+        // get the count of active and inactive queriers
+        let query_infos: Vec<NodeMetadata> = cluster::get_node_info(NodeType::Querier).await?;
+        for query in query_infos {
+            if check_liveness(&query.domain_name).await {
+                active_queriers += 1;
+            } else {
+                inactive_queriers += 1;
+            }
+        }
         Ok(Self {
             deployment_id: storage::StorageMetadata::global().deployment_id,
             uptime: upt,
@@ -122,6 +152,10 @@ impl Report {
             commit_hash: current().commit_hash,
             active_ingestors: ingestor_metrics.0,
             inactive_ingestors: ingestor_metrics.1,
+            active_indexers,
+            inactive_indexers,
+            active_queriers,
+            inactive_queriers,
             stream_count: ingestor_metrics.2,
             total_events_count: ingestor_metrics.3,
             total_json_bytes: ingestor_metrics.4,
@@ -224,11 +258,14 @@ async fn fetch_ingestors_metrics(
     let mut vec = vec![];
     let mut active_ingestors = 0u64;
     let mut offline_ingestors = 0u64;
-    if PARSEABLE.options.mode == Mode::Query {
+
+    // for OSS, Query mode fetches the analytics report
+    // for Enterprise, Prism mode fetches the analytics report
+    if PARSEABLE.options.mode == Mode::Query || PARSEABLE.options.mode == Mode::Prism {
         // send analytics for ingest servers
         // ingestor infos should be valid here, if not some thing is wrong
-        let ingestor_infos = cluster::get_ingestor_info().await.unwrap();
+        let ingestor_infos: Vec<NodeMetadata> = cluster::get_node_info(NodeType::Ingestor).await?;

         for im in ingestor_infos {
             if !check_liveness(&im.domain_name).await {
@@ -250,10 +287,14 @@ async fn fetch_ingestors_metrics(
             .send()
             .await
             .expect("should respond");
-
-            let data = serde_json::from_slice::(&resp.bytes().await?)?;
-            vec.push(data);
-            active_ingestors += 1;
+            // check if the response is valid before counting the ingestor as active
+            if let Ok(data) = serde_json::from_slice::(&resp.bytes().await?) {
+                active_ingestors += 1;
+                vec.push(data);
+            } else {
+                offline_ingestors += 1;
+                continue;
+            }
         }

         node_metrics.accumulate(&mut vec);
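Reviewer note: the active/inactive tallies above follow a plain probe-and-count pattern. As a minimal sketch of the shape — assuming `check_liveness` returns `bool` and `NodeMetadata` exposes `domain_name`, as in this diff; `count_liveness` itself is a hypothetical helper, not part of the PR — the two loops could be folded into one:

    // Hypothetical helper: count (active, inactive) nodes by probing
    // each node's liveness endpoint.
    async fn count_liveness(nodes: Vec<NodeMetadata>) -> (u64, u64) {
        let mut active = 0;
        let mut inactive = 0;
        for node in nodes {
            if check_liveness(&node.domain_name).await {
                active += 1;
            } else {
                inactive += 1;
            }
        }
        (active, inactive)
    }

The explicit loops in the PR read fine as-is; a helper like this would only pay off if more node types join the report later.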
diff --git a/src/catalog/mod.rs b/src/catalog/mod.rs
index d44ac877b..8db1a49b4 100644
--- a/src/catalog/mod.rs
+++ b/src/catalog/mod.rs
@@ -28,7 +28,13 @@ use tracing::{error, info};

 use crate::{
     event::DEFAULT_TIMESTAMP_KEY,
-    handlers::{self, http::base_path_without_preceding_slash},
+    handlers::{
+        self,
+        http::{
+            base_path_without_preceding_slash,
+            modal::{NodeMetadata, NodeType},
+        },
+    },
     metrics::{EVENTS_INGESTED_DATE, EVENTS_INGESTED_SIZE_DATE, EVENTS_STORAGE_SIZE_DATE},
     option::Mode,
     parseable::PARSEABLE,
@@ -335,15 +341,19 @@ pub async fn remove_manifest_from_snapshot(
         meta.first_event_at = None;
         storage.put_snapshot(stream_name, meta.snapshot).await?;
     }
+
+    // retention is initiated from the querier;
+    // the request is forwarded to all ingestors so they clean up their manifests.
+    // no action is required on Index or Prism nodes
     match PARSEABLE.options.mode {
         Mode::All | Mode::Ingest => {
             Ok(get_first_event(storage.clone(), stream_name, Vec::new()).await?)
         }
         Mode::Query => Ok(get_first_event(storage, stream_name, dates).await?),
-        Mode::Index => Err(ObjectStorageError::UnhandledError(Box::new(
+        Mode::Index | Mode::Prism => Err(ObjectStorageError::UnhandledError(Box::new(
             std::io::Error::new(
                 std::io::ErrorKind::Unsupported,
-                "Can't remove manifest from within Index server",
+                "Can't remove manifest from within Index or Prism server",
             ),
         ))),
     }
@@ -356,7 +366,6 @@ pub async fn get_first_event(
 ) -> Result<Option<String>, ObjectStorageError> {
     let mut first_event_at: String = String::default();
     match PARSEABLE.options.mode {
-        Mode::Index => unimplemented!(),
         Mode::All | Mode::Ingest => {
             // get current snapshot
             let stream_first_event = PARSEABLE.get_stream(stream_name)?.get_first_event();
@@ -406,8 +415,8 @@ pub async fn get_first_event(
             }
         }
         Mode::Query => {
-            let ingestor_metadata =
-                handlers::http::cluster::get_ingestor_info()
+            let ingestor_metadata: Vec<NodeMetadata> =
+                handlers::http::cluster::get_node_info(NodeType::Ingestor)
                     .await
                     .map_err(|err| {
                         error!("Fatal: failed to get ingestor info: {:?}", err);
@@ -437,6 +446,7 @@ pub async fn get_first_event(
             }
             first_event_at = ingestors_first_event_at.iter().min().unwrap().to_string();
         }
+        _ => {}
     }

     Ok(Some(first_event_at))
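Reviewer note: in Query mode the querier collects each ingestor's first-event timestamp and keeps the minimum. Because these are RFC3339 strings, the lexicographic minimum coincides with the chronological minimum, so the plain `.iter().min()` above is sound. Illustrative only:

    // RFC3339 timestamps sort lexicographically, so the string minimum
    // is also the chronological minimum.
    let ingestors_first_event_at = vec![
        "2024-05-02T10:00:00Z".to_string(),
        "2024-05-01T09:30:00Z".to_string(),
    ];
    let first = ingestors_first_event_at.iter().min().unwrap();
    assert_eq!(first.as_str(), "2024-05-01T09:30:00Z");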
diff --git a/src/cli.rs b/src/cli.rs
index ad2db244d..976e96969 100644
--- a/src/cli.rs
+++ b/src/cli.rs
@@ -328,6 +328,14 @@ pub struct Options {
     )]
     pub indexer_endpoint: String,

+    #[arg(
+        long,
+        env = "P_QUERIER_ENDPOINT",
+        default_value = "",
+        help = "URL to connect to this specific querier. Default is the address of the server"
+    )]
+    pub querier_endpoint: String,
+
     #[command(flatten)]
     pub oidc: Option<OidcConfig>,

@@ -439,83 +447,88 @@ impl Options {
         }
     }

-    /// TODO: refactor and document
+    /// get the address of the server
+    /// based on the mode
     pub fn get_url(&self, mode: Mode) -> Url {
-        let (endpoint, env_var) = match mode {
-            Mode::Ingest => {
-                if self.ingestor_endpoint.is_empty() {
-                    return format!(
-                        "{}://{}",
-                        self.get_scheme(),
-                        self.address
-                    )
-                    .parse::<Url>() // if the value was improperly set, this will panic before hand
-                    .unwrap_or_else(|err| {
-                        panic!("{err}, failed to parse `{}` as Url. Please set the environment variable `P_ADDR` to `<ip address>:<port>` without the scheme (e.g., 192.168.1.1:8000). Please refer to the documentation: https://logg.ing/env for more details.", self.address)
-                    });
-                }
-                (&self.ingestor_endpoint, "P_INGESTOR_ENDPOINT")
-            }
-            Mode::Index => {
-                if self.indexer_endpoint.is_empty() {
-                    return format!(
-                        "{}://{}",
-                        self.get_scheme(),
-                        self.address
-                    )
-                    .parse::<Url>() // if the value was improperly set, this will panic before hand
-                    .unwrap_or_else(|err| {
-                        panic!("{err}, failed to parse `{}` as Url. Please set the environment variable `P_ADDR` to `<ip address>:<port>` without the scheme (e.g., 192.168.1.1:8000). Please refer to the documentation: https://logg.ing/env for more details.", self.address)
-                    });
-                }
-                (&self.indexer_endpoint, "P_INDEXER_ENDPOINT")
-            }
-            _ => panic!("Invalid mode"),
+        let endpoint = match mode {
+            Mode::Ingest => self.get_endpoint(&self.ingestor_endpoint, "P_INGESTOR_ENDPOINT"),
+            Mode::Index => self.get_endpoint(&self.indexer_endpoint, "P_INDEXER_ENDPOINT"),
+            Mode::Query => self.get_endpoint(&self.querier_endpoint, "P_QUERIER_ENDPOINT"),
+            _ => return self.build_url(&self.address),
         };

-        if endpoint.starts_with("http") {
-            panic!("Invalid value `{}`, please set the environement variable `{env_var}` to `<ip address>:<port>` without the scheme (e.g., 192.168.1.1:8000 or example.com:8000). Please refer to the documentation: https://logg.ing/env for more details.", endpoint);
-        }
-
-        let addr_from_env = endpoint.split(':').collect::<Vec<&str>>();
+        self.parse_endpoint(&endpoint)
+    }

-        if addr_from_env.len() != 2 {
-            panic!("Invalid value `{}`, please set the environement variable `{env_var}` to `<ip address>:<port>` without the scheme (e.g., 192.168.1.1:8000 or example.com:8000). Please refer to the documentation: https://logg.ing/env for more details.", endpoint);
+    /// get the endpoint for the server:
+    /// if the env var is empty, use the address, else use the env var
+    fn get_endpoint(&self, endpoint: &str, env_var: &str) -> String {
+        if endpoint.is_empty() {
+            self.address.to_string()
+        } else {
+            if endpoint.starts_with("http") {
+                panic!(
+                    "Invalid value `{}`, please set the environment variable `{}` to `<ip address>:<port>` without the scheme (e.g., 192.168.1.1:8000 or example.com:8000). Please refer to the documentation: https://logg.ing/env for more details.",
+                    endpoint, env_var
+                );
+            }
+            endpoint.to_string()
         }
+    }

-        let mut hostname = addr_from_env[0].to_string();
-        let mut port = addr_from_env[1].to_string();
+    /// parse the endpoint to get the address and port;
+    /// if the address or the port is given as an env var reference, resolve it
+    fn parse_endpoint(&self, endpoint: &str) -> Url {
+        let addr_parts: Vec<&str> = endpoint.split(':').collect();
+
+        if addr_parts.len() != 2 {
+            panic!(
+                "Invalid value `{}`, please set the environment variable to `<ip address>:<port>` without the scheme (e.g., 192.168.1.1:8000 or example.com:8000). Please refer to the documentation: https://logg.ing/env for more details.",
+                endpoint
+            );
+        }

-        // if the env var value fits the pattern $VAR_NAME:$VAR_NAME
-        // fetch the value from the specified env vars
-        if hostname.starts_with('$') {
-            let var_hostname = hostname[1..].to_string();
-            hostname = env::var(&var_hostname).unwrap_or_default();
+        let hostname = self.resolve_env_var(addr_parts[0]);
+        let port = self.resolve_env_var(addr_parts[1]);

-            if hostname.is_empty() {
-                panic!("The environement variable `{}` is not set, please set as <ip address / domain name> without the scheme (e.g., 192.168.1.1 or example.com). Please refer to the documentation: https://logg.ing/env for more details.", var_hostname);
-            }
-            if hostname.starts_with("http") {
-                panic!("Invalid value `{}`, please set the environement variable `{}` to `<ip address / domain name>` without the scheme (e.g., 192.168.1.1 or example.com). Please refer to the documentation: https://logg.ing/env for more details.", hostname, var_hostname);
-            } else {
-                hostname = format!("{}://{}", self.get_scheme(), hostname);
-            }
-        }
+        self.build_url(&format!("{}:{}", hostname, port))
+    }

-        if port.starts_with('$') {
-            let var_port = port[1..].to_string();
-            port = env::var(&var_port).unwrap_or_default();
+    /// resolve a `$VAR` reference to its value;
+    /// panics if the env var is not set or still carries a scheme
+    fn resolve_env_var(&self, value: &str) -> String {
+        if let Some(env_var) = value.strip_prefix('$') {
+            let resolved_value = env::var(env_var).unwrap_or_else(|_| {
+                panic!(
+                    "The environment variable `{}` is not set. Please set it to a valid value. Refer to the documentation: https://logg.ing/env for more details.",
+                    env_var
+                );
+            });

-            if port.is_empty() {
+            if resolved_value.starts_with("http") {
                 panic!(
-                    "Port is not set in the environement variable `{}`. Please refer to the documentation: https://logg.ing/env for more details.",
-                    var_port
+                    "Invalid value `{}`, please set the environment variable `{}` to `<ip address / domain name>` without the scheme (e.g., 192.168.1.1 or example.com). Please refer to the documentation: https://logg.ing/env for more details.",
+                    resolved_value, env_var
                 );
             }
+
+            resolved_value
+        } else {
+            value.to_string()
         }
+    }

-        format!("{}://{}:{}", self.get_scheme(), hostname, port)
+    /// build a Url from the address, prefixing the configured scheme
+    fn build_url(&self, address: &str) -> Url {
+        format!("{}://{}", self.get_scheme(), address)
             .parse::<Url>()
-            .expect("Valid URL")
+            .unwrap_or_else(|err| {
+                panic!(
+                    "{err}, failed to parse `{}` as Url. Please set the environment variable `P_ADDR` to `<ip address>:<port>` without the scheme (e.g., 192.168.1.1:8000). Please refer to the documentation: https://logg.ing/env for more details.",
+                    address
+                );
+            })
     }
 }
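Reviewer note: the refactor splits the old monolithic `get_url` into four small steps, and `$VAR` references inside an endpoint are now resolved per component. A hedged walk-through of the expected pipeline (the env var names here are example values, not defaults):

    // P_INGESTOR_ENDPOINT="$HOSTNAME:$PORT", HOSTNAME=ingest-0.local, PORT=8000
    //
    // get_endpoint    -> "$HOSTNAME:$PORT"   (non-empty, no scheme, passes checks)
    // parse_endpoint  -> ["$HOSTNAME", "$PORT"]
    // resolve_env_var -> "ingest-0.local", "8000"
    // build_url       -> "http://ingest-0.local:8000" (under the default scheme)

One behavioral difference worth flagging: the old code silently fell back to an empty string when a referenced env var was unset and only panicked later; the new `resolve_env_var` panics immediately with the variable's name, which is a clearer failure.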
diff --git a/src/handlers/airplane.rs b/src/handlers/airplane.rs
index 0b03dc9e2..8a5551c2f 100644
--- a/src/handlers/airplane.rs
+++ b/src/handlers/airplane.rs
@@ -33,7 +33,8 @@ use futures_util::{Future, TryFutureExt};
 use tonic::transport::{Identity, Server, ServerTlsConfig};
 use tonic_web::GrpcWebLayer;

-use crate::handlers::http::cluster::get_ingestor_info;
+use crate::handlers::http::cluster::get_node_info;
+use crate::handlers::http::modal::{NodeMetadata, NodeType};
 use crate::handlers::http::query::{into_query, update_schema_when_distributed};
 use crate::handlers::livetail::cross_origin_config;
 use crate::metrics::QUERY_EXECUTE_TIME;
@@ -179,7 +180,7 @@ impl FlightService for AirServiceImpl {
             })
             .to_string();

-        let ingester_metadatas = get_ingestor_info()
+        let ingester_metadatas: Vec<NodeMetadata> = get_node_info(NodeType::Ingestor)
             .await
             .map_err(|err| Status::failed_precondition(err.to_string()))?;
         let mut minute_result: Vec<RecordBatch> = vec![];
diff --git a/src/handlers/http/about.rs b/src/handlers/http/about.rs
index 675d0182e..57bf0197a 100644
--- a/src/handlers/http/about.rs
+++ b/src/handlers/http/about.rs
@@ -21,7 +21,6 @@ use serde_json::{json, Value};

 use crate::{
     about::{self, get_latest_release},
-    option::Mode,
     parseable::PARSEABLE,
     storage::StorageMetadata,
 };
@@ -63,11 +62,7 @@ pub async fn about() -> Json<Value> {
     let commit = current_release.commit_hash;
     let deployment_id = meta.deployment_id.to_string();
     let mode = PARSEABLE.get_server_mode_string();
-    let staging = if PARSEABLE.options.mode == Mode::Query {
-        "".to_string()
-    } else {
-        PARSEABLE.options.staging_dir().display().to_string()
-    };
+    let staging = PARSEABLE.options.staging_dir().display().to_string();
     let grpc_port = PARSEABLE.options.grpc_port;

     let store_endpoint = PARSEABLE.storage.get_endpoint();
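Reviewer note: `get_node_info` now appears to be generic over the metadata type (see its reconstructed signature later in this diff), which is why call sites like the one above pin the element type with an annotation. A turbofish works equally well; both forms are sketched here under that assumption:

    // Equivalent ways to pin the element type, assuming a generic
    // get_node_info<T: Metadata + DeserializeOwned>:
    let a: Vec<NodeMetadata> = get_node_info(NodeType::Ingestor).await?;
    let b = get_node_info::<NodeMetadata>(NodeType::Ingestor).await?;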
diff --git a/src/handlers/http/cluster/mod.rs b/src/handlers/http/cluster/mod.rs
index a452dedca..038a1654b 100644
--- a/src/handlers/http/cluster/mod.rs
+++ b/src/handlers/http/cluster/mod.rs
@@ -20,6 +20,7 @@ pub mod utils;

 use futures::{future, stream, StreamExt};
 use std::collections::HashSet;
+use std::future::Future;
 use std::sync::Arc;
 use std::time::Duration;

@@ -54,18 +55,52 @@ use crate::HTTP_CLIENT;
 use super::base_path_without_preceding_slash;
 use super::ingest::PostError;
 use super::logstream::error::StreamError;
-use super::modal::{IndexerMetadata, IngestorMetadata, Metadata};
+use super::modal::{
+    IndexerMetadata, IngestorMetadata, Metadata, NodeMetadata, NodeType, QuerierMetadata,
+};
 use super::rbac::RBACError;
 use super::role::RoleError;

-type IngestorMetadataArr = Vec<IngestorMetadata>;
-
-type IndexerMetadataArr = Vec<IndexerMetadata>;
-
 pub const INTERNAL_STREAM_NAME: &str = "pmeta";

 const CLUSTER_METRICS_INTERVAL_SECONDS: Interval = clokwerk::Interval::Minutes(1);

+pub async fn for_each_live_ingestor<F, Fut, E>(api_fn: F) -> Result<(), E>
+where
+    F: Fn(NodeMetadata) -> Fut + Clone + Send + Sync + 'static,
+    Fut: Future<Output = Result<(), E>> + Send,
+    E: From<anyhow::Error> + Send + Sync + 'static,
+{
+    let ingestor_infos: Vec<NodeMetadata> =
+        get_node_info(NodeType::Ingestor).await.map_err(|err| {
+            error!("Fatal: failed to get ingestor info: {:?}", err);
+            E::from(err)
+        })?;
+
+    let mut live_ingestors = Vec::new();
+    for ingestor in ingestor_infos {
+        if utils::check_liveness(&ingestor.domain_name).await {
+            live_ingestors.push(ingestor);
+        } else {
+            warn!("Ingestor {} is not live", ingestor.domain_name);
+        }
+    }
+
+    // Process all live ingestors in parallel
+    let results = futures::future::join_all(live_ingestors.into_iter().map(|ingestor| {
+        let api_fn = api_fn.clone();
+        async move { api_fn(ingestor).await }
+    }))
+    .await;
+
+    // collect the results, propagating the first error
+    for result in results {
+        result?;
+    }
+
+    Ok(())
+}
+
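Reviewer note: `for_each_live_ingestor` takes a cloneable closure returning a future; each invocation receives its own owned `NodeMetadata`, which is what makes the parallel `join_all` above work without shared references. A minimal hypothetical usage sketch — `PostError` stands in for any error type implementing `From<anyhow::Error>`, and the `/liveness` path is illustrative:

    // Ping every live ingestor; errors funnel through PostError.
    for_each_live_ingestor(|ingestor| async move {
        let url = format!("{}liveness", ingestor.domain_name);
        HTTP_CLIENT
            .get(url)
            .header(header::AUTHORIZATION, &ingestor.token)
            .send()
            .await
            .map_err(|e| PostError::Invalid(e.into()))?; // hypothetical error mapping
        Ok::<(), PostError>(())
    })
    .await?;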
 // forward the create/update stream request to all ingestors to keep them in sync
 pub async fn sync_streams_with_ingestors(
     headers: HeaderMap,
@@ -77,68 +112,63 @@ pub async fn sync_streams_with_ingestors(
     for (key, value) in headers.iter() {
         reqwest_headers.insert(key.clone(), value.clone());
     }
-    let ingestor_infos = get_ingestor_info().await.map_err(|err| {
-        error!("Fatal: failed to get ingestor info: {:?}", err);
-        StreamError::Anyhow(err)
-    })?;

-    for ingestor in ingestor_infos {
-        if !utils::check_liveness(&ingestor.domain_name).await {
-            warn!("Ingestor {} is not live", ingestor.domain_name);
-            continue;
-        }
-        let url = format!(
-            "{}{}/logstream/{}/sync",
-            ingestor.domain_name,
-            base_path_without_preceding_slash(),
-            stream_name
-        );
-        let res = HTTP_CLIENT
-            .put(url)
-            .headers(reqwest_headers.clone())
-            .header(header::AUTHORIZATION, &ingestor.token)
-            .body(body.clone())
-            .send()
-            .await
-            .map_err(|err| {
-                error!(
-                    "Fatal: failed to forward upsert stream request to ingestor: {}\n Error: {:?}",
-                    ingestor.domain_name, err
-                );
-                StreamError::Network(err)
-            })?;
+    let body_clone = body.clone();
+    let stream_name = stream_name.to_string();
+    let reqwest_headers_clone = reqwest_headers.clone();

-        if !res.status().is_success() {
-            error!(
-                "failed to forward upsert stream request to ingestor: {}\nResponse Returned: {:?}",
+    for_each_live_ingestor(
+        move |ingestor| {
+            let url = format!(
+                "{}{}/logstream/{}/sync",
                 ingestor.domain_name,
-                res.text().await
+                base_path_without_preceding_slash(),
+                stream_name
             );
+            let headers = reqwest_headers_clone.clone();
+            let body = body_clone.clone();
+            async move {
+                let res = HTTP_CLIENT
+                    .put(url)
+                    .headers(headers)
+                    .header(header::AUTHORIZATION, &ingestor.token)
+                    .body(body)
+                    .send()
+                    .await
+                    .map_err(|err| {
+                        error!(
+                            "Fatal: failed to forward upsert stream request to ingestor: {}\n Error: {:?}",
+                            ingestor.domain_name, err
+                        );
+                        StreamError::Network(err)
+                    })?;
+
+                if !res.status().is_success() {
+                    error!(
+                        "failed to forward upsert stream request to ingestor: {}\nResponse Returned: {:?}",
+                        ingestor.domain_name,
+                        res.text().await
+                    );
+                }
+                Ok(())
+            }
         }
-    }
-
-    Ok(())
+    ).await
 }
 // forward the role update request to all ingestors to keep them in sync
 pub async fn sync_users_with_roles_with_ingestors(
-    username: &String,
+    username: &str,
     role: &HashSet<String>,
 ) -> Result<(), RBACError> {
-    let ingestor_infos = get_ingestor_info().await.map_err(|err| {
-        error!("Fatal: failed to get ingestor info: {:?}", err);
-        RBACError::Anyhow(err)
-    })?;
-
-    let role = to_vec(&role.clone()).map_err(|err| {
+    let role_data = to_vec(&role.clone()).map_err(|err| {
         error!("Fatal: failed to serialize role: {:?}", err);
         RBACError::SerdeError(err)
     })?;
-    for ingestor in ingestor_infos.iter() {
-        if !utils::check_liveness(&ingestor.domain_name).await {
-            warn!("Ingestor {} is not live", ingestor.domain_name);
-            continue;
-        }
+
+    let username = username.to_owned();
+
+    for_each_live_ingestor(move |ingestor| {
         let url = format!(
             "{}{}/user/{}/role/sync",
             ingestor.domain_name,
             base_path_without_preceding_slash(),
             username
         );

-        let res = HTTP_CLIENT
-            .put(url)
-            .header(header::AUTHORIZATION, &ingestor.token)
-            .header(header::CONTENT_TYPE, "application/json")
-            .body(role.clone())
-            .send()
-            .await
-            .map_err(|err| {
-                error!(
-                    "Fatal: failed to forward request to ingestor: {}\n Error: {:?}",
-                    ingestor.domain_name, err
-                );
-                RBACError::Network(err)
-            })?;
+        let role_data = role_data.clone();

-        if !res.status().is_success() {
-            error!(
-                "failed to forward request to ingestor: {}\nResponse Returned: {:?}",
-                ingestor.domain_name,
-                res.text().await
-            );
+        async move {
+            let res = HTTP_CLIENT
+                .put(url)
+                .header(header::AUTHORIZATION, &ingestor.token)
+                .header(header::CONTENT_TYPE, "application/json")
+                .body(role_data)
+                .send()
+                .await
+                .map_err(|err| {
+                    error!(
+                        "Fatal: failed to forward request to ingestor: {}\n Error: {:?}",
+                        ingestor.domain_name, err
+                    );
+                    RBACError::Network(err)
+                })?;
+
+            if !res.status().is_success() {
+                error!(
+                    "failed to forward request to ingestor: {}\nResponse Returned: {:?}",
+                    ingestor.domain_name,
+                    res.text().await
+                );
+            }
+
+            Ok(())
         }
-    }
-
-    Ok(())
+    })
+    .await
 }

 // forward the delete user request to all ingestors to keep them in sync
-pub async fn sync_user_deletion_with_ingestors(username: &String) -> Result<(), RBACError> {
-    let ingestor_infos = get_ingestor_info().await.map_err(|err| {
-        error!("Fatal: failed to get ingestor info: {:?}", err);
-        RBACError::Anyhow(err)
-    })?;
+pub async fn sync_user_deletion_with_ingestors(username: &str) -> Result<(), RBACError> {
+    let username = username.to_owned();

-    for ingestor in ingestor_infos.iter() {
-        if !utils::check_liveness(&ingestor.domain_name).await {
-            warn!("Ingestor {} is not live", ingestor.domain_name);
-            continue;
-        }
+    for_each_live_ingestor(move |ingestor| {
         let url = format!(
             "{}{}/user/{}/sync",
             ingestor.domain_name,
@@ -192,29 +220,32 @@ pub async fn sync_user_deletion_with_ingestors(username: &str) -> Result<(),
             username
         );

-        let res = HTTP_CLIENT
-            .delete(url)
-            .header(header::AUTHORIZATION, &ingestor.token)
-            .send()
-            .await
-            .map_err(|err| {
-                error!(
-                    "Fatal: failed to forward request to ingestor: {}\n Error: {:?}",
-                    ingestor.domain_name, err
-                );
-                RBACError::Network(err)
-            })?;
+        async move {
+            let res = HTTP_CLIENT
+                .delete(url)
+                .header(header::AUTHORIZATION, &ingestor.token)
+                .send()
+                .await
+                .map_err(|err| {
+                    error!(
+                        "Fatal: failed to forward request to ingestor: {}\n Error: {:?}",
+                        ingestor.domain_name, err
+                    );
+                    RBACError::Network(err)
+                })?;

-        if !res.status().is_success() {
-            error!(
-                "failed to forward request to ingestor: {}\nResponse Returned: {:?}",
-                ingestor.domain_name,
-                res.text().await
-            );
+            if !res.status().is_success() {
+                error!(
+                    "failed to forward request to ingestor: {}\nResponse Returned: {:?}",
+                    ingestor.domain_name,
+                    res.text().await
+                );
+            }
+
+            Ok(())
         }
-    }
-
-    Ok(())
+    })
+    .await
 }
 // forward the create user request to all ingestors to keep them in sync
 pub async fn sync_user_creation_with_ingestors(
     user: User,
     role: &Option<HashSet<String>>,
 ) -> Result<(), RBACError> {
-    let ingestor_infos = get_ingestor_info().await.map_err(|err| {
-        error!("Fatal: failed to get ingestor info: {:?}", err);
-        RBACError::Anyhow(err)
-    })?;
-
     let mut user = user.clone();

     if let Some(role) = role {
@@ -234,16 +260,14 @@ pub async fn sync_user_creation_with_ingestors(
     }

     let username = user.username();
-    let user = to_vec(&user).map_err(|err| {
+    let user_data = to_vec(&user).map_err(|err| {
         error!("Fatal: failed to serialize user: {:?}", err);
         RBACError::SerdeError(err)
     })?;

-    for ingestor in ingestor_infos.iter() {
-        if !utils::check_liveness(&ingestor.domain_name).await {
-            warn!("Ingestor {} is not live", ingestor.domain_name);
-            continue;
-        }
+    let username = username.to_string();
+
+    for_each_live_ingestor(move |ingestor| {
         let url = format!(
             "{}{}/user/{}/sync",
             ingestor.domain_name,
@@ -251,45 +275,43 @@ pub async fn sync_user_creation_with_ingestors(
             username
         );

-        let res = HTTP_CLIENT
-            .post(url)
-            .header(header::AUTHORIZATION, &ingestor.token)
-            .header(header::CONTENT_TYPE, "application/json")
-            .body(user.clone())
-            .send()
-            .await
-            .map_err(|err| {
-                error!(
-                    "Fatal: failed to forward request to ingestor: {}\n Error: {:?}",
-                    ingestor.domain_name, err
-                );
-                RBACError::Network(err)
-            })?;
+        let user_data = user_data.clone();

-        if !res.status().is_success() {
-            error!(
-                "failed to forward request to ingestor: {}\nResponse Returned: {:?}",
-                ingestor.domain_name,
-                res.text().await
-            );
+        async move {
+            let res = HTTP_CLIENT
+                .post(url)
+                .header(header::AUTHORIZATION, &ingestor.token)
+                .header(header::CONTENT_TYPE, "application/json")
+                .body(user_data)
+                .send()
+                .await
+                .map_err(|err| {
+                    error!(
+                        "Fatal: failed to forward request to ingestor: {}\n Error: {:?}",
+                        ingestor.domain_name, err
+                    );
+                    RBACError::Network(err)
+                })?;
+
+            if !res.status().is_success() {
+                error!(
+                    "failed to forward request to ingestor: {}\nResponse Returned: {:?}",
+                    ingestor.domain_name,
+                    res.text().await
+                );
+            }
+
+            Ok(())
         }
-    }
-
-    Ok(())
+    })
+    .await
 }

 // forward the password reset request to all ingestors to keep them in sync
-pub async fn sync_password_reset_with_ingestors(username: &String) -> Result<(), RBACError> {
-    let ingestor_infos = get_ingestor_info().await.map_err(|err| {
-        error!("Fatal: failed to get ingestor info: {:?}", err);
-        RBACError::Anyhow(err)
-    })?;
+pub async fn sync_password_reset_with_ingestors(username: &str) -> Result<(), RBACError> {
+    let username = username.to_owned();

-    for ingestor in ingestor_infos.iter() {
-        if !utils::check_liveness(&ingestor.domain_name).await {
-            warn!("Ingestor {} is not live", ingestor.domain_name);
-            continue;
-        }
+    for_each_live_ingestor(move |ingestor| {
         let url = format!(
             "{}{}/user/{}/generate-new-password/sync",
             ingestor.domain_name,
@@ -297,30 +319,33 @@ pub async fn sync_password_reset_with_ingestors(username: &str) -> Result<(),
             username
         );

-        let res = HTTP_CLIENT
-            .post(url)
-            .header(header::AUTHORIZATION, &ingestor.token)
-            .header(header::CONTENT_TYPE, "application/json")
-            .send()
-            .await
-            .map_err(|err| {
-                error!(
-                    "Fatal: failed to forward request to ingestor: {}\n Error: {:?}",
-                    ingestor.domain_name, err
-                );
-                RBACError::Network(err)
-            })?;
+        async move {
+            let res = HTTP_CLIENT
+                .post(url)
+                .header(header::AUTHORIZATION, &ingestor.token)
+                .header(header::CONTENT_TYPE, "application/json")
+                .send()
+                .await
+                .map_err(|err| {
+                    error!(
+                        "Fatal: failed to forward request to ingestor: {}\n Error: {:?}",
+                        ingestor.domain_name, err
+                    );
+                    RBACError::Network(err)
+                })?;

-        if !res.status().is_success() {
-            error!(
-                "failed to forward request to ingestor: {}\nResponse Returned: {:?}",
-                ingestor.domain_name,
-                res.text().await
-            );
+            if !res.status().is_success() {
+                error!(
+                    "failed to forward request to ingestor: {}\nResponse Returned: {:?}",
+                    ingestor.domain_name,
+                    res.text().await
+                );
+            }
+
+            Ok(())
         }
-    }
-
-    Ok(())
+    })
+    .await
 }
 // forward the put role request to all ingestors to keep them in sync
 pub async fn sync_role_update_with_ingestors(
     name: String,
     privileges: Vec<DefaultPrivilege>,
 ) -> Result<(), RoleError> {
-    let ingestor_infos = get_ingestor_info().await.map_err(|err| {
-        error!("Fatal: failed to get ingestor info: {:?}", err);
-        RoleError::Anyhow(err)
-    })?;
-
-    for ingestor in ingestor_infos.iter() {
-        if !utils::check_liveness(&ingestor.domain_name).await {
-            warn!("Ingestor {} is not live", ingestor.domain_name);
-            continue;
-        }
+    for_each_live_ingestor(move |ingestor| {
         let url = format!(
             "{}{}/role/{}/sync",
             ingestor.domain_name,
@@ -345,31 +361,36 @@ pub async fn sync_role_update_with_ingestors(
             name
         );

-        let res = HTTP_CLIENT
-            .put(url)
-            .header(header::AUTHORIZATION, &ingestor.token)
-            .header(header::CONTENT_TYPE, "application/json")
-            .json(&privileges)
-            .send()
-            .await
-            .map_err(|err| {
-                error!(
-                    "Fatal: failed to forward request to ingestor: {}\n Error: {:?}",
-                    ingestor.domain_name, err
-                );
-                RoleError::Network(err)
-            })?;
+        let privileges = privileges.clone();

-        if !res.status().is_success() {
-            error!(
-                "failed to forward request to ingestor: {}\nResponse Returned: {:?}",
-                ingestor.domain_name,
-                res.text().await
-            );
+        async move {
+            let res = HTTP_CLIENT
+                .put(url)
+                .header(header::AUTHORIZATION, &ingestor.token)
+                .header(header::CONTENT_TYPE, "application/json")
+                .json(&privileges)
+                .send()
+                .await
+                .map_err(|err| {
+                    error!(
+                        "Fatal: failed to forward request to ingestor: {}\n Error: {:?}",
+                        ingestor.domain_name, err
+                    );
+                    RoleError::Network(err)
+                })?;
+
+            if !res.status().is_success() {
+                error!(
+                    "failed to forward request to ingestor: {}\nResponse Returned: {:?}",
+                    ingestor.domain_name,
+                    res.text().await
+                );
+            }
+
+            Ok(())
         }
-    }
-
-    Ok(())
+    })
+    .await
 }

 pub fn fetch_daily_stats_from_ingestors(
@@ -543,13 +564,35 @@ pub async fn send_retention_cleanup_request(
     Ok(first_event_at)
 }

+/// Fetches cluster information for all nodes (ingestor, indexer, querier and prism)
 pub async fn get_cluster_info() -> Result<impl Responder, StreamError> {
-    // Get ingestor and indexer metadata concurrently
-    let (ingestor_result, indexer_result) =
-        future::join(get_ingestor_info(), get_indexer_info()).await;
+    // Get prism, querier, ingestor and indexer metadata concurrently
+    let (prism_result, querier_result, ingestor_result, indexer_result) = future::join4(
+        get_node_info(NodeType::Prism),
+        get_node_info(NodeType::Querier),
+        get_node_info(NodeType::Ingestor),
+        get_node_info(NodeType::Indexer),
+    )
+    .await;
+
+    // Handle prism metadata result
+    let prism_metadata: Vec<NodeMetadata> = prism_result
+        .map_err(|err| {
+            error!("Fatal: failed to get prism info: {:?}", err);
+            PostError::Invalid(err)
+        })
+        .map_err(|err| StreamError::Anyhow(err.into()))?;
+
+    // Handle querier metadata result
+    let querier_metadata: Vec<NodeMetadata> = querier_result
+        .map_err(|err| {
+            error!("Fatal: failed to get querier info: {:?}", err);
+            PostError::Invalid(err)
+        })
+        .map_err(|err| StreamError::Anyhow(err.into()))?;

     // Handle ingestor metadata result
-    let ingestor_metadata = ingestor_result
+    let ingestor_metadata: Vec<NodeMetadata> = ingestor_result
         .map_err(|err| {
             error!("Fatal: failed to get ingestor info: {:?}", err);
             PostError::Invalid(err)
@@ -557,29 +600,34 @@ pub async fn get_cluster_info() -> Result<impl Responder, StreamError> {
         .map_err(|err| StreamError::Anyhow(err.into()))?;

     // Handle indexer metadata result
-    let indexer_metadata = indexer_result
+    let indexer_metadata: Vec<NodeMetadata> = indexer_result
         .map_err(|err| {
             error!("Fatal: failed to get indexer info: {:?}", err);
             PostError::Invalid(err)
         })
         .map_err(|err| StreamError::Anyhow(err.into()))?;

-    // Fetch info for both node types concurrently
-    let (ingestor_infos, indexer_infos) = future::join(
+    // Fetch info for all node types concurrently
+    let (prism_infos, querier_infos, ingestor_infos, indexer_infos) = future::join4(
+        fetch_nodes_info(prism_metadata),
+        fetch_nodes_info(querier_metadata),
         fetch_nodes_info(ingestor_metadata),
         fetch_nodes_info(indexer_metadata),
     )
     .await;

-    // Combine results from both node types
+    // Combine results from all node types
     let mut infos = Vec::new();
+    infos.extend(prism_infos?);
+    infos.extend(querier_infos?);
     infos.extend(ingestor_infos?);
     infos.extend(indexer_infos?);

     Ok(actix_web::HttpResponse::Ok().json(infos))
 }
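Reviewer note: each metadata result above is wrapped twice — first into `PostError::Invalid`, then into `StreamError::Anyhow`. Since `StreamError::Anyhow` is already used directly elsewhere in this file, a single-step version would behave the same; a sketch, assuming no caller depends on the intermediate `PostError`:

    // One map_err instead of two (same logging, same final error type):
    let prism_metadata: Vec<NodeMetadata> = prism_result.map_err(|err| {
        error!("Fatal: failed to get prism info: {:?}", err);
        StreamError::Anyhow(err)
    })?;

Not a blocker, just a simplification to consider for all four branches.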
-/// Fetches info for a single node (ingestor or indexer)
+/// Fetches info for a single node:
+/// calls the about endpoint of the node,
+/// then constructs and returns a ClusterInfo
 async fn fetch_node_info<T: Metadata>(node: &T) -> Result<utils::ClusterInfo, StreamError> {
     let uri = Url::parse(&format!(
         "{}{}/about",
@@ -671,42 +719,41 @@ pub async fn get_cluster_metrics() -> Result<impl Responder, PostError> {
     Ok(actix_web::HttpResponse::Ok().json(dresses))
 }

-pub async fn get_ingestor_info() -> anyhow::Result<IngestorMetadataArr> {
+/// get the node info for a specific node type;
+/// this is used for ingestor, indexer, querier and prism nodes,
+/// and returns the metadata for all nodes of that type
+pub async fn get_node_info<T: Metadata + DeserializeOwned>(
+    node_type: NodeType,
+) -> anyhow::Result<Vec<T>> {
     let store = PARSEABLE.storage.get_object_store();
     let root_path = RelativePathBuf::from(PARSEABLE_ROOT_DIRECTORY);
+    let prefix_owned = node_type.to_string();

-    let arr = store
+    let metadata = store
         .get_objects(
             Some(&root_path),
-            Box::new(|file_name| file_name.starts_with("ingestor")),
+            Box::new(move |file_name| file_name.starts_with(&prefix_owned)), // use the owned copy
         )
         .await?
         .iter()
-        // this unwrap will most definateley shoot me in the foot later
-        .map(|x| serde_json::from_slice::<IngestorMetadata>(x).unwrap_or_default())
-        .collect_vec();
-
-    Ok(arr)
-}
-
-pub async fn get_indexer_info() -> anyhow::Result<IndexerMetadataArr> {
-    let store = PARSEABLE.storage.get_object_store();
-
-    let root_path = RelativePathBuf::from(PARSEABLE_ROOT_DIRECTORY);
-    let arr = store
-        .get_objects(
-            Some(&root_path),
-            Box::new(|file_name| file_name.starts_with("indexer")),
-        )
-        .await?
-        .iter()
-        // this unwrap will most definateley shoot me in the foot later
-        .map(|x| serde_json::from_slice::<IndexerMetadata>(x).unwrap_or_default())
-        .collect_vec();
+        .filter_map(|x| match serde_json::from_slice::<T>(x) {
+            Ok(val) => Some(val),
+            Err(e) => {
+                error!("Failed to parse node metadata: {:?}", e);
+                None
+            }
+        })
+        .collect();

-    Ok(arr)
+    Ok(metadata)
 }
+/// remove a node from the cluster:
+/// check the liveness of the node first;
+/// if the node is live, return an error,
+/// otherwise remove its metadata from the object store
 pub async fn remove_node(node_url: Path<String>) -> Result<impl Responder, PostError> {
     let domain_name = to_url_string(node_url.into_inner());

@@ -719,33 +766,46 @@ pub async fn remove_node(node_url: Path<String>) -> Result<impl Responder, PostError> {
     // Delete ingestor metadata
     let removed_ingestor =
-        remove_node_metadata::<IngestorMetadata>(&object_store, &domain_name).await?;
+        remove_node_metadata::<IngestorMetadata>(&object_store, &domain_name, NodeType::Ingestor)
+            .await?;

     // Delete indexer metadata
     let removed_indexer =
-        remove_node_metadata::<IndexerMetadata>(&object_store, &domain_name).await?;
-
-    let msg = if removed_ingestor || removed_indexer {
-        format!("node {} removed successfully", domain_name)
-    } else {
-        format!("node {} is not found", domain_name)
-    };
-
-    info!("{}", &msg);
-    Ok((msg, StatusCode::OK))
+        remove_node_metadata::<IndexerMetadata>(&object_store, &domain_name, NodeType::Indexer)
+            .await?;
+
+    // Delete querier metadata
+    let removed_querier =
+        remove_node_metadata::<QuerierMetadata>(&object_store, &domain_name, NodeType::Querier)
+            .await?;
+
+    // Delete prism metadata
+    let removed_prism =
+        remove_node_metadata::<NodeMetadata>(&object_store, &domain_name, NodeType::Prism).await?;
+
+    if removed_ingestor || removed_indexer || removed_querier || removed_prism {
+        return Ok((
+            format!("node {} removed successfully", domain_name),
+            StatusCode::OK,
+        ));
+    }
+    Err(PostError::Invalid(anyhow::anyhow!(
+        "node {} not found",
+        domain_name
+    )))
 }

-// Helper function to remove a specific type of node metadata
+/// Removes node metadata from the object store.
+/// Returns true if the metadata was removed, false if it was not found
 async fn remove_node_metadata<T: Metadata + DeserializeOwned>(
     object_store: &Arc<dyn ObjectStorage>,
     domain_name: &str,
+    node_type: NodeType,
 ) -> Result<bool, PostError> {
-    let node_type = T::default().node_type().to_string();
-
     let metadatas = object_store
         .get_objects(
             Some(&RelativePathBuf::from(PARSEABLE_ROOT_DIRECTORY)),
-            Box::new(move |file_name| file_name.starts_with(&node_type)),
+            Box::new(move |file_name| file_name.starts_with(&node_type.to_string())),
         )
         .await?;

@@ -774,7 +834,10 @@ async fn remove_node_metadata(
     }
 }

-/// Fetches metrics from a node (ingestor or indexer)
+/// Fetches metrics for a single node:
+/// checks whether the node is live and then fetches its metrics;
+/// if the node is not live, it returns None
 async fn fetch_node_metrics<T>(node: &T) -> Result<Option<Metrics>, PostError>
 where
     T: Metadata + Send + Sync + 'static,
@@ -836,6 +899,9 @@ where
     T: Metadata + Send + Sync + 'static,
 {
     let nodes_len = nodes.len();
+    if nodes_len == 0 {
+        return Ok(vec![]);
+    }
     let results = stream::iter(nodes)
         .map(|node| async move { fetch_node_metrics(&node).await })
         .buffer_unordered(nodes_len) // No concurrency limit
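Reviewer note: `fetch_node_metrics` returning `Ok(None)` for nodes that fail the liveness check is what lets the collection step skip dead nodes without failing the whole batch. The consuming pattern, shown here illustratively, is the usual three-way match:

    // Option<Metrics> results let dead nodes drop out silently.
    let mut metrics = Vec::new();
    for result in results {
        match result {
            Ok(Some(m)) => metrics.push(m), // live node, metrics collected
            Ok(None) => {}                  // node not live; skip it
            Err(err) => return Err(err),    // real failure; bubble up
        }
    }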
-/// Main function to fetch all cluster metrics, parallelized and refactored
+/// Main function to fetch cluster metrics:
+/// fetches node info for all nodes,
+/// fetches metrics for all of them,
+/// and combines all metrics into a single vector
 async fn fetch_cluster_metrics() -> Result<Vec<Metrics>, PostError> {
-    // Get ingestor and indexer metadata concurrently
-    let (ingestor_result, indexer_result) =
-        future::join(get_ingestor_info(), get_indexer_info()).await;
+    // Get prism, querier, ingestor and indexer metadata concurrently
+    let (prism_result, querier_result, ingestor_result, indexer_result) = future::join4(
+        get_node_info(NodeType::Prism),
+        get_node_info(NodeType::Querier),
+        get_node_info(NodeType::Ingestor),
+        get_node_info(NodeType::Indexer),
+    )
+    .await;
+
+    // Handle prism metadata result
+    let prism_metadata: Vec<NodeMetadata> = prism_result.map_err(|err| {
+        error!("Fatal: failed to get prism info: {:?}", err);
+        PostError::Invalid(err)
+    })?;
+
+    // Handle querier metadata result
+    let querier_metadata: Vec<NodeMetadata> = querier_result.map_err(|err| {
+        error!("Fatal: failed to get querier info: {:?}", err);
+        PostError::Invalid(err)
+    })?;

     // Handle ingestor metadata result
-    let ingestor_metadata = ingestor_result.map_err(|err| {
+    let ingestor_metadata: Vec<NodeMetadata> = ingestor_result.map_err(|err| {
         error!("Fatal: failed to get ingestor info: {:?}", err);
         PostError::Invalid(err)
     })?;

-    // Handle indexer metadata result
-    let indexer_metadata = indexer_result.map_err(|err| {
+    let indexer_metadata: Vec<NodeMetadata> = indexer_result.map_err(|err| {
         error!("Fatal: failed to get indexer info: {:?}", err);
         PostError::Invalid(err)
     })?;

-    // Fetch metrics from ingestors and indexers concurrently
-    let (ingestor_metrics, indexer_metrics) = future::join(
+    let (prism_metrics, querier_metrics, ingestor_metrics, indexer_metrics) = future::join4(
+        fetch_nodes_metrics(prism_metadata),
+        fetch_nodes_metrics(querier_metadata),
         fetch_nodes_metrics(ingestor_metadata),
         fetch_nodes_metrics(indexer_metadata),
     )
@@ -883,6 +968,18 @@ async fn fetch_cluster_metrics() -> Result<Vec<Metrics>, PostError> {
     // Combine all metrics
     let mut all_metrics = Vec::new();

+    // Add prism metrics
+    match prism_metrics {
+        Ok(metrics) => all_metrics.extend(metrics),
+        Err(err) => return Err(err),
+    }
+
+    // Add querier metrics
+    match querier_metrics {
+        Ok(metrics) => all_metrics.extend(metrics),
+        Err(err) => return Err(err),
+    }
+
     // Add ingestor metrics
     match ingestor_metrics {
         Ok(metrics) => all_metrics.extend(metrics),
diff --git a/src/handlers/http/cluster/utils.rs b/src/handlers/http/cluster/utils.rs
index 54d451cb7..a4038aa5c 100644
--- a/src/handlers/http/cluster/utils.rs
+++ b/src/handlers/http/cluster/utils.rs
@@ -16,7 +16,10 @@
  *
  */

-use crate::{handlers::http::base_path_without_preceding_slash, HTTP_CLIENT};
+use crate::{
+    handlers::http::{base_path_without_preceding_slash, modal::NodeType},
+    HTTP_CLIENT,
+};
 use actix_web::http::header;
 use chrono::{DateTime, Utc};
 use serde::{Deserialize, Serialize};
@@ -55,7 +58,7 @@ pub struct ClusterInfo {
     storage_path: String,
     error: Option<String>,  // error message if the node is not reachable
     status: Option<String>, // status message if the node is reachable
-    node_type: String,
+    node_type: NodeType,
 }

 impl ClusterInfo {
@@ -66,7 +69,7 @@ impl ClusterInfo {
         storage_path: String,
         error: Option<String>,
         status: Option<String>,
-        node_type: &str,
+        node_type: &NodeType,
     ) -> Self {
         Self {
             domain_name: domain_name.to_string(),
@@ -75,7 +78,7 @@ impl ClusterInfo {
             storage_path,
             error,
             status,
-            node_type: node_type.to_string(),
+            node_type: node_type.clone(),
         }
     }
 }
diff --git a/src/handlers/http/logstream.rs b/src/handlers/http/logstream.rs
index 6d095d7b0..d4949b61b 100644
--- a/src/handlers/http/logstream.rs
+++ b/src/handlers/http/logstream.rs
@@ -47,7 +47,7 @@ use tracing::warn;
 pub async fn delete(stream_name: Path<String>) -> Result<impl Responder, StreamError> {
     let stream_name = stream_name.into_inner();
     // Error out if stream doesn't exist in memory, or in the case of query node, in storage as well
-    if PARSEABLE.check_or_load_stream(&stream_name).await {
+    if !PARSEABLE.check_or_load_stream(&stream_name).await {
         return Err(StreamNotFound(stream_name).into());
     }

@@ -124,7 +124,7 @@ pub async fn get_schema(stream_name: Path<String>) -> Result<impl Responder, StreamError> {
-    if PARSEABLE.check_or_load_stream(&stream_name).await {
+    if !PARSEABLE.check_or_load_stream(&stream_name).await {
         return Err(StreamNotFound(stream_name.clone()).into());
     }
diff --git a/src/handlers/http/middleware.rs b/src/handlers/http/middleware.rs
--- a/src/handlers/http/middleware.rs
+++ b/src/handlers/http/middleware.rs
@@ -321,7 +321,7 @@
                 match PARSEABLE.options.mode {
-                    Mode::Index => {
+                    Mode::Index | Mode::Prism => {
                         let fut = self.service.call(req);

                         Box::pin(async move {
diff --git a/src/handlers/http/mod.rs b/src/handlers/http/mod.rs
index aa2e0a02d..3d674d20b 100644
--- a/src/handlers/http/mod.rs
+++ b/src/handlers/http/mod.rs
@@ -19,13 +19,15 @@
 use actix_cors::Cors;
 use actix_web::Responder;
 use arrow_schema::Schema;
+use cluster::get_node_info;
 use http::StatusCode;
 use itertools::Itertools;
+use modal::{NodeMetadata, NodeType};
 use serde_json::Value;

 use crate::{parseable::PARSEABLE, storage::STREAM_ROOT_DIRECTORY, HTTP_CLIENT};

-use self::{cluster::get_ingestor_info, query::Query};
+use self::query::Query;

 pub mod about;
 pub mod alerts;
@@ -108,7 +110,7 @@ pub async fn fetch_schema(stream_name: &str) -> anyhow::Result<Schema> {
 pub async fn send_query_request_to_ingestor(query: &Query) -> anyhow::Result<Vec<Value>> {
     // send the query request to the ingestor
     let mut res = vec![];
-    let ima = get_ingestor_info().await?;
+    let ima: Vec<NodeMetadata> = get_node_info(NodeType::Ingestor).await?;

     for im in ima.iter() {
         let uri = format!(
diff --git a/src/handlers/http/modal/ingest/ingestor_logstream.rs b/src/handlers/http/modal/ingest/ingestor_logstream.rs
index 4872a6476..b5ab42a40 100644
--- a/src/handlers/http/modal/ingest/ingestor_logstream.rs
+++ b/src/handlers/http/modal/ingest/ingestor_logstream.rs
@@ -16,6 +16,8 @@
  *
  */

+use std::fs;
+
 use actix_web::{
     web::{Json, Path},
     HttpRequest, Responder,
 };
@@ -69,6 +71,16 @@ pub async fn delete(stream_name: Path<String>) -> Result<impl Responder, StreamError> {
diff --git a/src/handlers/http/modal/ingest_server.rs b/src/handlers/http/modal/ingest_server.rs
--- a/src/handlers/http/modal/ingest_server.rs
+++ b/src/handlers/http/modal/ingest_server.rs
+pub static INGESTOR_META: OnceCell<Arc<IngestorMetadata>> = OnceCell::const_new();
+
 pub struct IngestServer;

 #[async_trait]
@@ -98,6 +101,15 @@ impl ParseableServer for IngestServer {
         prometheus: &PrometheusMetrics,
         shutdown_rx: oneshot::Receiver<()>,
     ) -> anyhow::Result<()> {
+        // write the ingestor metadata to storage
+        INGESTOR_META
+            .get_or_init(|| async {
+                IngestorMetadata::load_node_metadata(NodeType::Ingestor)
+                    .await
+                    .expect("Ingestor Metadata should be set in ingestor mode")
+            })
+            .await;
+
         PARSEABLE.storage.register_store_metrics(prometheus);

         migration::run_migration(&PARSEABLE).await?;
@@ -108,9 +120,6 @@ impl ParseableServer for IngestServer {

         tokio::spawn(airplane::server());

-        // write the ingestor metadata to storage
-        PARSEABLE.store_metadata(Mode::Ingest).await?;
-
         // Ingestors shouldn't have to deal with OpenId auth flow
         let result = self.start(shutdown_rx, prometheus.clone(), None).await;
         // Cancel sync jobs
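Reviewer note: the ingest server now initializes `INGESTOR_META` exactly once at startup via `get_or_init`; later readers can fetch it synchronously with `INGESTOR_META.get()`. A minimal self-contained sketch of this pattern, assuming tokio's `OnceCell` as in the static above (the `String` payload is a stand-in for `IngestorMetadata`):

    use std::sync::Arc;
    use tokio::sync::OnceCell;

    static META: OnceCell<Arc<String>> = OnceCell::const_new();

    // stand-in for IngestorMetadata::load_node_metadata(...)
    async fn init() -> Arc<String> {
        Arc::new("metadata".to_string())
    }

    async fn demo() {
        let meta = META.get_or_init(init).await;      // first call initializes
        let again = META.get().expect("initialized"); // later lookups are sync
        assert!(Arc::ptr_eq(meta, again));
    }

This also fixes an ordering issue: metadata is now guaranteed to exist before the airplane server and sync jobs spawn, instead of being written after them.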
"/generated.rs")); @@ -199,26 +199,69 @@ pub async fn load_on_init() -> anyhow::Result<()> { Ok(()) } +/// NodeType represents the type of node in the cluster +#[derive(Debug, Serialize, Deserialize, Clone, Eq, PartialEq, Default)] +#[serde(rename_all = "lowercase")] +pub enum NodeType { + #[default] + Ingestor, + Indexer, + Querier, + Prism, + All, +} + +impl NodeType { + fn as_str(&self) -> &'static str { + match self { + NodeType::Ingestor => "ingestor", + NodeType::Indexer => "indexer", + NodeType::Querier => "querier", + NodeType::Prism => "prism", + NodeType::All => "all", + } + } + + fn to_mode(&self) -> Mode { + match self { + NodeType::Ingestor => Mode::Ingest, + NodeType::Indexer => Mode::Index, + NodeType::Querier => Mode::Query, + NodeType::Prism => Mode::Prism, + NodeType::All => Mode::All, + } + } +} + +impl fmt::Display for NodeType { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "{}", self.as_str()) + } +} + #[derive(Debug, Serialize, Deserialize, Default, Clone, Eq, PartialEq)] -pub struct IngestorMetadata { +pub struct NodeMetadata { pub version: String, pub port: String, pub domain_name: String, pub bucket_name: String, pub token: String, - pub ingestor_id: String, + pub node_id: String, pub flight_port: String, + pub node_type: NodeType, } -impl IngestorMetadata { +impl NodeMetadata { + #[allow(clippy::too_many_arguments)] pub fn new( port: String, domain_name: String, bucket_name: String, username: &str, password: &str, - ingestor_id: String, + node_id: String, flight_port: String, + node_type: NodeType, ) -> Self { let token = base64::prelude::BASE64_STANDARD.encode(format!("{username}:{password}")); @@ -228,319 +271,274 @@ impl IngestorMetadata { version: DEFAULT_VERSION.to_string(), bucket_name, token: format!("Basic {token}"), - ingestor_id, + node_id, flight_port, + node_type, } } - /// Capture metadata information by either loading it from staging or starting fresh - pub fn load(options: &Options, storage: &dyn ObjectStorageProvider) -> Arc { - // all the files should be in the staging directory root - let entries = options - .staging_dir() - .read_dir() - .expect("Couldn't read from file"); - let url = options.get_url(Mode::Ingest); + pub async fn load_node_metadata(node_type: NodeType) -> anyhow::Result> { + let staging_path = PARSEABLE.options.staging_dir(); + let node_type_str = node_type.as_str(); + + // Attempt to load metadata from staging + if let Some(meta) = Self::load_from_staging(staging_path, node_type_str, &PARSEABLE.options) + { + return Self::process_and_store_metadata(meta, staging_path, node_type).await; + } + + // Attempt to load metadata from storage + let storage_metas = Self::load_from_storage(node_type_str.to_string()).await; + let url = PARSEABLE.options.get_url(node_type.to_mode()); let port = url.port().unwrap_or(80).to_string(); let url = url.to_string(); - let Options { - username, password, .. 
 #[derive(Debug, Serialize, Deserialize, Default, Clone, Eq, PartialEq)]
-pub struct IngestorMetadata {
+pub struct NodeMetadata {
     pub version: String,
     pub port: String,
     pub domain_name: String,
     pub bucket_name: String,
     pub token: String,
-    pub ingestor_id: String,
+    pub node_id: String,
     pub flight_port: String,
+    pub node_type: NodeType,
 }

-impl IngestorMetadata {
+impl NodeMetadata {
+    #[allow(clippy::too_many_arguments)]
     pub fn new(
         port: String,
         domain_name: String,
         bucket_name: String,
         username: &str,
         password: &str,
-        ingestor_id: String,
+        node_id: String,
         flight_port: String,
+        node_type: NodeType,
     ) -> Self {
         let token = base64::prelude::BASE64_STANDARD.encode(format!("{username}:{password}"));

@@ -228,319 +271,274 @@ impl NodeMetadata {
             version: DEFAULT_VERSION.to_string(),
             bucket_name,
             token: format!("Basic {token}"),
-            ingestor_id,
+            node_id,
             flight_port,
+            node_type,
         }
     }

-    /// Capture metadata information by either loading it from staging or starting fresh
-    pub fn load(options: &Options, storage: &dyn ObjectStorageProvider) -> Arc<Self> {
-        // all the files should be in the staging directory root
-        let entries = options
-            .staging_dir()
-            .read_dir()
-            .expect("Couldn't read from file");
-        let url = options.get_url(Mode::Ingest);
-        let port = url.port().unwrap_or(80).to_string();
-        let url = url.to_string();
-        let Options {
-            username, password, ..
-        } = options;
-        let staging_path = options.staging_dir();
-        let flight_port = options.flight_port.to_string();
-
-        for entry in entries {
-            // cause the staging directory will have only one file with ingestor in the name
-            // so the JSON Parse should not error unless the file is corrupted
-            let path = entry.expect("Should be a directory entry").path();
-            if !path
-                .file_name()
-                .and_then(|s| s.to_str())
-                .is_some_and(|s| s.contains("ingestor"))
-            {
-                continue;
-            }
-
-            // get the ingestor metadata from staging
-            let bytes = std::fs::read(path).expect("File should be present");
-            let mut meta =
-                Self::from_bytes(&bytes, options.flight_port).expect("Extracted ingestor metadata");
-
-            // compare url endpoint and port, update
-            if meta.domain_name != url {
-                info!(
-                    "Domain Name was Updated. Old: {} New: {}",
-                    meta.domain_name, url
-                );
-                meta.domain_name = url;
-            }
-
-            if meta.port != port {
-                info!("Port was Updated. Old: {} New: {}", meta.port, port);
-                meta.port = port;
-            }
-
-            let token = format!(
-                "Basic {}",
-                BASE64_STANDARD.encode(format!("{username}:{password}"))
-            );
-            if meta.token != token {
-                // TODO: Update the message to be more informative with username and password
-                warn!(
-                    "Credentials were Updated. Tokens updated; Old: {} New: {}",
-                    meta.token, token
-                );
-                meta.token = token;
-            }
-            meta.put_on_disk(staging_path)
-                .expect("Couldn't write to disk");
-
-            return Arc::new(meta);
-        }
-
-        let storage = storage.get_object_store();
-        let meta = Self::new(
-            port,
-            url,
-            storage.get_bucket_name(),
-            username,
-            password,
-            get_ingestor_id(),
-            flight_port,
-        );
-
-        meta.put_on_disk(staging_path)
-            .expect("Should Be valid Json");
-        Arc::new(meta)
-    }
+    pub async fn load_node_metadata(node_type: NodeType) -> anyhow::Result<Arc<Self>> {
+        let staging_path = PARSEABLE.options.staging_dir();
+        let node_type_str = node_type.as_str();
+
+        // Attempt to load metadata from staging
+        if let Some(meta) =
+            Self::load_from_staging(staging_path, node_type_str, &PARSEABLE.options)
+        {
+            return Self::process_and_store_metadata(meta, staging_path, node_type).await;
+        }
+
+        // Attempt to load metadata from storage
+        let storage_metas = Self::load_from_storage(node_type_str.to_string()).await;
+        let url = PARSEABLE.options.get_url(node_type.to_mode());
+        let port = url.port().unwrap_or(80).to_string();
+        let url = url.to_string();
+
+        for storage_meta in storage_metas {
+            if storage_meta.domain_name == url && storage_meta.port == port {
+                return Self::process_and_store_metadata(storage_meta, staging_path, node_type)
+                    .await;
+            }
+        }
+
+        // If no metadata is found, create a new one
+        let meta = Self::create_new_metadata(&PARSEABLE.options, &*PARSEABLE.storage, node_type);
+        Self::store_new_metadata(meta, staging_path).await
+    }
+    /// Update the metadata, write it to staging and object storage, and return a shared copy
+    async fn process_and_store_metadata(
+        mut meta: Self,
+        staging_path: &Path,
+        node_type: NodeType,
+    ) -> anyhow::Result<Arc<Self>> {
+        Self::update_metadata(&mut meta, &PARSEABLE.options, node_type);
+        meta.put_on_disk(staging_path)
+            .expect("Couldn't write updated metadata to disk");
+
+        let path = meta.file_path();
+        let resource = serde_json::to_vec(&meta)?.into();
+        let store = PARSEABLE.storage.get_object_store();
+        store.put_object(&path, resource).await?;
+
+        Ok(Arc::new(meta))
+    }
+
+    /// Store newly created metadata in staging and object storage
+    async fn store_new_metadata(meta: Self, staging_path: &Path) -> anyhow::Result<Arc<Self>> {
+        meta.put_on_disk(staging_path)
+            .expect("Couldn't write new metadata to disk");
+
+        let path = meta.file_path();
+        let resource = serde_json::to_vec(&meta)?.into();
+        let store = PARSEABLE.storage.get_object_store();
+        store.put_object(&path, resource).await?;
+
+        Ok(Arc::new(meta))
+    }
+
+    async fn load_from_storage(node_type: String) -> Vec<NodeMetadata> {
+        let path = RelativePathBuf::from(PARSEABLE_ROOT_DIRECTORY);
+        let glob_storage = PARSEABLE.storage.get_object_store();
+        let obs = glob_storage
+            .get_objects(
+                Some(&path),
+                Box::new({
+                    let node_type = node_type.clone();
+                    move |file_name| file_name.contains(&node_type)
+                }),
+            )
+            .await;
+
+        let mut metadata = vec![];
+        if let Ok(obs) = obs {
+            for object in obs {
+                // convert the bytes to NodeMetadata
+                match serde_json::from_slice::<NodeMetadata>(&object) {
+                    Ok(node_metadata) => metadata.push(node_metadata),
+                    Err(e) => error!("Failed to deserialize NodeMetadata: {:?}", e),
+                }
+            }
+        } else {
+            error!("Couldn't read from storage");
+        }
+        // Return the metadata
+        metadata
+    }
+
+    /// Load metadata from the staging directory
+    fn load_from_staging(
+        staging_path: &Path,
+        node_type_str: &str,
+        options: &Options,
+    ) -> Option<Self> {
+        let entries = match staging_path.read_dir() {
+            Ok(entries) => entries,
+            Err(e) => {
+                error!("Couldn't read from staging directory: {}", e);
+                return None;
+            }
+        };
+
+        for entry in entries {
+            let path = match entry {
+                Ok(entry) => entry.path(),
+                Err(e) => {
+                    error!("Error reading directory entry: {}", e);
+                    continue;
+                }
+            };
+            if !Self::is_valid_metadata_file(&path, node_type_str) {
+                continue;
+            }
+
+            let bytes = std::fs::read(&path).expect("File should be present");
+            match Self::from_bytes(&bytes, options.flight_port) {
+                Ok(meta) => return Some(meta),
+                Err(e) => {
+                    error!("Failed to extract {} metadata: {}", node_type_str, e);
+                    return None;
+                }
+            }
+        }
+
+        None
+    }

-    pub async fn migrate(&self) -> anyhow::Result<Option<Self>> {
-        let imp = self.file_path();
-        let bytes = match PARSEABLE.storage.get_object_store().get_object(&imp).await {
-            Ok(bytes) => bytes,
-            Err(_) => {
-                return Ok(None);
-            }
-        };
-
-        let resource = Self::from_bytes(&bytes, PARSEABLE.options.flight_port)?;
-        let bytes = Bytes::from(serde_json::to_vec(&resource)?);
-
-        resource.put_on_disk(PARSEABLE.options.staging_dir())?;
-
-        PARSEABLE
-            .storage
-            .get_object_store()
-            .put_object(&imp, bytes)
-            .await?;
-
-        Ok(Some(resource))
-    }
-
-    /// Puts the ingestor info into the staging.
-    ///
-    /// This function takes the ingestor info as a parameter and stores it in staging.
-    /// # Parameters
-    ///
-    /// * `staging_path`: Staging root directory.
-    pub fn put_on_disk(&self, staging_path: &Path) -> anyhow::Result<()> {
-        let file_name = format!("ingestor.{}.json", self.ingestor_id);
-        let file_path = staging_path.join(file_name);
-
-        std::fs::write(file_path, serde_json::to_vec(&self)?)?;
-
-        Ok(())
-    }
-}
-
-#[derive(Debug, Serialize, Deserialize, Default, Clone, Eq, PartialEq)]
-pub struct IndexerMetadata {
-    pub version: String,
-    pub port: String,
-    pub domain_name: String,
-    pub bucket_name: String,
-    pub token: String,
-    pub indexer_id: String,
-    pub flight_port: String,
-}
-
-impl IndexerMetadata {
-    pub fn new(
-        port: String,
-        domain_name: String,
-        bucket_name: String,
-        username: &str,
-        password: &str,
-        indexer_id: String,
-        flight_port: String,
-    ) -> Self {
-        let token = base64::prelude::BASE64_STANDARD.encode(format!("{username}:{password}"));
-
-        Self {
-            port,
-            domain_name,
-            version: DEFAULT_VERSION.to_string(),
-            bucket_name,
-            token: format!("Basic {token}"),
-            indexer_id,
-            flight_port,
-        }
-    }
-
-    /// Capture metadata information by either loading it from staging or starting fresh
-    pub fn load(options: &Options, storage: &dyn ObjectStorageProvider) -> Arc<Self> {
-        // all the files should be in the staging directory root
-        let entries = options
-            .staging_dir()
-            .read_dir()
-            .expect("Couldn't read from file");
-        let url = options.get_url(Mode::Index);
-        let port = url.port().unwrap_or(80).to_string();
-        let url = url.to_string();
-        let Options {
-            username, password, ..
-        } = options;
-        let staging_path = options.staging_dir();
-        let flight_port = options.flight_port.to_string();
-
-        for entry in entries {
-            // cause the staging directory will have only one file with indexer in the name
-            // so the JSON Parse should not error unless the file is corrupted
-            let path = entry.expect("Should be a directory entry").path();
-            if !path
-                .file_name()
-                .and_then(|s| s.to_str())
-                .is_some_and(|s| s.contains("indexer"))
-            {
-                continue;
-            }
-
-            // get the indexer metadata from staging
-            let bytes = std::fs::read(path).expect("File should be present");
-            let mut meta =
-                Self::from_bytes(&bytes, options.flight_port).expect("Extracted indexer metadata");
-
-            // compare url endpoint and port, update
-            if meta.domain_name != url {
-                info!(
-                    "Domain Name was Updated. Old: {} New: {}",
-                    meta.domain_name, url
-                );
-                meta.domain_name = url;
-            }
-
-            if meta.port != port {
-                info!("Port was Updated. Old: {} New: {}", meta.port, port);
-                meta.port = port;
-            }
-
-            let token = format!(
-                "Basic {}",
-                BASE64_STANDARD.encode(format!("{username}:{password}"))
-            );
-            if meta.token != token {
-                // TODO: Update the message to be more informative with username and password
-                warn!(
-                    "Credentials were Updated. Tokens updated; Old: {} New: {}",
-                    meta.token, token
-                );
-                meta.token = token;
-            }
-            meta.put_on_disk(staging_path)
-                .expect("Couldn't write to disk");
-
-            return Arc::new(meta);
-        }
-
-        let storage = storage.get_object_store();
-        let meta = Self::new(
-            port,
-            url,
-            storage.get_bucket_name(),
-            username,
-            password,
-            get_indexer_id(),
-            flight_port,
-        );
-
-        meta.put_on_disk(staging_path)
-            .expect("Should Be valid Json");
-        Arc::new(meta)
-    }
+    /// Check if a file is a valid metadata file for the given node type
+    fn is_valid_metadata_file(path: &Path, node_type_str: &str) -> bool {
+        path.file_name()
+            .and_then(|s| s.to_str())
+            .is_some_and(|s| s.contains(node_type_str))
+    }
+
+    /// Update metadata fields if they differ from the current configuration
+    fn update_metadata(meta: &mut Self, options: &Options, node_type: NodeType) {
+        let url = options.get_url(node_type.to_mode());
+        let port = url.port().unwrap_or(80).to_string();
+        let url = url.to_string();
+
+        if meta.domain_name != url {
+            info!(
+                "Domain Name was Updated. Old: {} New: {}",
+                meta.domain_name, url
+            );
+            meta.domain_name = url;
+        }
+
+        if meta.port != port {
+            info!("Port was Updated. Old: {} New: {}", meta.port, port);
+            meta.port = port;
+        }
+        let token = Self::generate_token(&options.username, &options.password);
+        if meta.token != token {
+            warn!(
+                "Credentials were Updated. Tokens updated; Old: {} New: {}",
+                meta.token, token
+            );
+            meta.token = token;
+        }
+
+        meta.node_type = node_type;
+    }
+
+    /// Create a new metadata instance from the current configuration
+    fn create_new_metadata(
+        options: &Options,
+        storage: &dyn ObjectStorageProvider,
+        node_type: NodeType,
+    ) -> Self {
+        let url = options.get_url(node_type.to_mode());
+        let port = url.port().unwrap_or(80).to_string();
+        let url = url.to_string();
+
+        Self::new(
+            port,
+            url,
+            storage.get_object_store().get_bucket_name(),
+            &options.username,
+            &options.password,
+            get_node_id(),
+            options.flight_port.to_string(),
+            node_type,
+        )
+    }
+
+    /// Generate a Basic auth token from the username and password
+    fn generate_token(username: &str, password: &str) -> String {
+        format!(
+            "Basic {}",
+            BASE64_STANDARD.encode(format!("{username}:{password}"))
+        )
+    }

-    pub fn get_indexer_id(&self) -> String {
-        self.indexer_id.clone()
+    pub fn get_node_id(&self) -> String {
+        self.node_id.clone()
     }

     #[inline(always)]
     pub fn file_path(&self) -> RelativePathBuf {
         RelativePathBuf::from_iter([
             PARSEABLE_ROOT_DIRECTORY,
-            &format!("indexer.{}.json", self.get_indexer_id()),
+            &format!("{}.{}.json", self.node_type.as_str(), self.get_node_id()),
         ])
     }

     /// Updates json with `flight_port` field if not already present
     fn from_bytes(bytes: &[u8], flight_port: u16) -> anyhow::Result<Self> {
         let mut json: Map<String, Value> = serde_json::from_slice(bytes)?;
-        json.entry("flight_port")
-            .or_insert_with(|| Value::String(flight_port.to_string()));

-        Ok(serde_json::from_value(Value::Object(json))?)
+        // Check the version; v3 metadata still carries a legacy id field
+        let version = json.get("version").and_then(|version| version.as_str());
+
+        if version == Some("v3") {
+            // Migrate a legacy `ingestor_id`/`indexer_id` into `node_id`,
+            // bump the version and stamp the node type
+            fn migrate_legacy_id(
+                json: &mut Map<String, Value>,
+                legacy_id_key: &str,
+                node_type_str: &str,
+            ) -> bool {
+                if json.contains_key(legacy_id_key) {
+                    if let Some(id) = json.remove(legacy_id_key) {
+                        json.insert("node_id".to_string(), id);
+                        json.insert(
+                            "version".to_string(),
+                            Value::String(DEFAULT_VERSION.to_string()),
+                        );
+                    }
+                    json.insert(
+                        "node_type".to_string(),
+                        Value::String(node_type_str.to_string()),
+                    );
+                    true
+                } else {
+                    false
+                }
+            }
+
+            if !migrate_legacy_id(&mut json, "ingestor_id", "ingestor") {
+                migrate_legacy_id(&mut json, "indexer_id", "indexer");
+            }
+        }
+
+        // Add flight_port if missing
+        json.entry("flight_port")
+            .or_insert_with(|| Value::String(flight_port.to_string()));
+
+        // Parse the JSON into this struct
+        let metadata: Self = serde_json::from_value(Value::Object(json))?;
+
+        Ok(metadata)
+    }
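Reviewer note: `from_bytes` now upgrades v3 metadata in place — the legacy id key becomes `node_id`, the version is bumped, and `node_type` is stamped from whichever legacy key was found. A hedged before/after sample (field values are illustrative; the `token` is elided):

    // Before (v3 ingestor metadata, as found on disk):
    // {"version":"v3","port":"8000","domain_name":"https://localhost:8000",
    //  "bucket_name":"somebucket","token":"Basic ...","ingestor_id":"abc"}
    //
    // After Self::from_bytes(bytes, 8002):
    // {"version":"v4","port":"8000","domain_name":"https://localhost:8000",
    //  "bucket_name":"somebucket","token":"Basic ...","node_id":"abc",
    //  "node_type":"ingestor","flight_port":"8002"}

Note the `flight_port` back-fill still applies to both legacy and current metadata, unchanged from the old behavior.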
-    pub async fn migrate(&self) -> anyhow::Result<Option<Self>> {
-        let imp = self.file_path();
-        let bytes = match PARSEABLE.storage.get_object_store().get_object(&imp).await {
-            Ok(bytes) => bytes,
-            Err(_) => {
-                return Ok(None);
-            }
-        };
-
-        let resource = Self::from_bytes(&bytes, PARSEABLE.options.flight_port)?;
-        let bytes = Bytes::from(serde_json::to_vec(&resource)?);
-
-        resource.put_on_disk(PARSEABLE.options.staging_dir())?;
-
-        PARSEABLE
-            .storage
-            .get_object_store()
-            .put_object(&imp, bytes)
-            .await?;
-
-        Ok(Some(resource))
-    }
-
-    /// Puts the indexer info into the staging.
+    /// Puts the node info into the staging.
     ///
-    /// This function takes the indexer info as a parameter and stores it in staging.
+    /// This function takes the node info as a parameter and stores it in staging.
     /// # Parameters
     ///
     /// * `staging_path`: Staging root directory.
    pub fn put_on_disk(&self, staging_path: &Path) -> anyhow::Result<()> {
-        let file_name = format!("indexer.{}.json", self.indexer_id);
+        let file_name = format!("{}.{}.json", self.node_type.as_str(), self.node_id);
         let file_path = staging_path.join(file_name);
         std::fs::write(file_path, serde_json::to_vec(&self)?)?;
@@ -552,11 +550,11 @@ impl IndexerMetadata {
 pub trait Metadata {
     fn domain_name(&self) -> &str;
     fn token(&self) -> &str;
-    fn node_type(&self) -> &str;
+    fn node_type(&self) -> &NodeType;
     fn file_path(&self) -> RelativePathBuf;
 }
 
-impl Metadata for IngestorMetadata {
+impl Metadata for NodeMetadata {
     fn domain_name(&self) -> &str {
         &self.domain_name
     }
@@ -564,35 +562,30 @@ impl Metadata for IngestorMetadata {
     fn token(&self) -> &str {
         &self.token
     }
-    fn node_type(&self) -> &str {
-        "ingestor"
-    }
-    fn file_path(&self) -> RelativePathBuf {
-        self.file_path()
-    }
-}
 
-impl Metadata for IndexerMetadata {
-    fn domain_name(&self) -> &str {
-        &self.domain_name
+    fn node_type(&self) -> &NodeType {
+        &self.node_type
     }
 
-    fn token(&self) -> &str {
-        &self.token
-    }
-    fn node_type(&self) -> &str {
-        "indexer"
-    }
     fn file_path(&self) -> RelativePathBuf {
         self.file_path()
     }
 }
+
+// Aliases for different node types
+pub type IngestorMetadata = NodeMetadata;
+pub type IndexerMetadata = NodeMetadata;
+pub type QuerierMetadata = NodeMetadata;
+pub type PrismMetadata = NodeMetadata;
+
 #[cfg(test)]
 mod test {
     use actix_web::body::MessageBody;
     use bytes::Bytes;
     use rstest::rstest;
 
+    use crate::handlers::http::modal::NodeType;
+
     use super::IngestorMetadata;
 
     #[rstest]
@@ -605,9 +598,10 @@ mod test {
             "admin",
             "ingestor_id".to_owned(),
             "8002".to_string(),
+            NodeType::Ingestor,
         );
 
-        let rhs = serde_json::from_slice::<IngestorMetadata>(br#"{"version":"v3","port":"8000","domain_name":"https://localhost:8000","bucket_name":"somebucket","token":"Basic YWRtaW46YWRtaW4=", "ingestor_id": "ingestor_id","flight_port": "8002"}"#).unwrap();
+        let rhs = serde_json::from_slice::<IngestorMetadata>(br#"{"version":"v4","port":"8000","domain_name":"https://localhost:8000","bucket_name":"somebucket","token":"Basic YWRtaW46YWRtaW4=","node_id": "ingestor_id","flight_port": "8002","node_type":"ingestor"}"#).unwrap();
 
         assert_eq!(rhs, lhs);
     }
@@ -636,10 +630,11 @@ mod test {
             "admin",
             "ingestor_id".to_owned(),
             "8002".to_string(),
+            NodeType::Ingestor,
         );
 
         let lhs = Bytes::from(serde_json::to_vec(&im).unwrap());
-        let rhs = br#"{"version":"v3","port":"8000","domain_name":"https://localhost:8000","bucket_name":"somebucket","token":"Basic YWRtaW46YWRtaW4=","ingestor_id":"ingestor_id","flight_port":"8002"}"#
+        let rhs = br#"{"version":"v4","port":"8000","domain_name":"https://localhost:8000","bucket_name":"somebucket","token":"Basic YWRtaW46YWRtaW4=","node_id":"ingestor_id","flight_port":"8002","node_type":"ingestor"}"#
             .try_into_bytes()
             .unwrap();
 
diff --git a/src/handlers/http/modal/query/querier_logstream.rs b/src/handlers/http/modal/query/querier_logstream.rs
index 76d282f76..ebffbfb43 100644
--- a/src/handlers/http/modal/query/querier_logstream.rs
+++ b/src/handlers/http/modal/query/querier_logstream.rs
@@ -41,6 +41,7 @@ use crate::{
             utils::{merge_quried_stats, IngestionStats, QueriedStats, StorageStats},
         },
         logstream::{error::StreamError, get_stats_date},
+        modal::{NodeMetadata, NodeType},
     },
     hottier::HotTierManager,
     parseable::{StreamNotFound, PARSEABLE},
@@ -81,10 +82,12 @@ pub async fn delete(stream_name: Path<String>) -> Result<impl Responder, StreamError> {
-    let ingestor_metadata = cluster::get_ingestor_info().await.map_err(|err| {
-        error!("Fatal: failed to get ingestor info: {:?}", err);
-        err
-    })?;
+    let ingestor_metadata: Vec<NodeMetadata> = cluster::get_node_info(NodeType::Ingestor)
+        .await
+        .map_err(|err| {
+            error!("Fatal: failed to get ingestor info: {:?}", err);
+            err
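
`IngestorMetadata`, `IndexerMetadata`, `QuerierMetadata`, and `PrismMetadata` are plain type aliases, so they are interchangeable at the type level; only the `node_type` field (and hence `file_path`) distinguishes the roles at runtime. A compressed illustration with stand-in types (fields trimmed to the essentials):

// One concrete struct, several role-specific names.
#[derive(Debug, Clone)]
enum NodeType {
    Ingestor,
    Querier,
}

#[derive(Debug, Clone)]
struct NodeMetadata {
    node_id: String,
    node_type: NodeType,
}

impl NodeMetadata {
    // Mirrors the diff: the on-disk file name is derived from the node type.
    fn file_name(&self) -> String {
        let type_str = match self.node_type {
            NodeType::Ingestor => "ingestor",
            NodeType::Querier => "querier",
        };
        format!("{}.{}.json", type_str, self.node_id)
    }
}

type IngestorMetadata = NodeMetadata;
type QuerierMetadata = NodeMetadata;

fn main() {
    let q: QuerierMetadata = NodeMetadata {
        node_id: "q1".into(),
        node_type: NodeType::Querier,
    };
    // Aliases are the same type, so this compiles; only node_type differs at runtime.
    let via_other_alias: IngestorMetadata = q.clone();
    assert_eq!(via_other_alias.file_name(), "querier.q1.json");
}

This is also why the tests above can keep constructing `IngestorMetadata` while actually exercising `NodeMetadata`.
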
+        })?;
 
     for ingestor in ingestor_metadata {
         let url = format!(
diff --git a/src/handlers/http/modal/query_server.rs b/src/handlers/http/modal/query_server.rs
index d4ce26f72..b138a292c 100644
--- a/src/handlers/http/modal/query_server.rs
+++ b/src/handlers/http/modal/query_server.rs
@@ -16,6 +16,7 @@
  *
  */
 
+use std::sync::Arc;
 use std::thread;
 
 use crate::handlers::airplane;
@@ -32,17 +33,17 @@ use actix_web::{web, Scope};
 use actix_web_prometheus::PrometheusMetrics;
 use async_trait::async_trait;
 use bytes::Bytes;
-use tokio::sync::oneshot;
+use tokio::sync::{oneshot, OnceCell};
 use tracing::info;
 
 use crate::parseable::PARSEABLE;
 use crate::Server;
 
 use super::query::{querier_ingest, querier_logstream, querier_rbac, querier_role};
-use super::{load_on_init, OpenIdClient, ParseableServer};
+use super::{load_on_init, NodeType, OpenIdClient, ParseableServer, QuerierMetadata};
 
 pub struct QueryServer;
-
+pub static QUERIER_META: OnceCell<Arc<QuerierMetadata>> = OnceCell::const_new();
 #[async_trait]
 impl ParseableServer for QueryServer {
     // configure the api routes
@@ -99,7 +100,14 @@ impl ParseableServer for QueryServer {
         shutdown_rx: oneshot::Receiver<()>,
     ) -> anyhow::Result<()> {
         PARSEABLE.storage.register_store_metrics(prometheus);
-
+        // load the querier metadata and write it to storage
+        QUERIER_META
+            .get_or_init(|| async {
+                QuerierMetadata::load_node_metadata(NodeType::Querier)
+                    .await
+                    .expect("Querier metadata should be set in querier mode")
+            })
+            .await;
         migration::run_migration(&PARSEABLE).await?;
 
         //create internal stream at server start
diff --git a/src/handlers/http/query.rs b/src/handlers/http/query.rs
index 3b00268b2..368fe8b9c 100644
--- a/src/handlers/http/query.rs
+++ b/src/handlers/http/query.rs
@@ -45,7 +45,6 @@ use crate::query::{execute, CountsRequest, CountsResponse, Query as LogicalQuery
 use crate::query::{TableScanVisitor, QUERY_SESSION};
 use crate::rbac::Users;
 use crate::response::{QueryResponse, TIME_ELAPSED_HEADER};
-use crate::storage::object_storage::commit_schema_to_storage;
 use crate::storage::ObjectStorageError;
 use crate::utils::actix::extract_session_key_from_req;
 use crate::utils::time::{TimeParseError, TimeRange};
@@ -173,12 +172,12 @@ pub async fn get_counts(
 }
 
 pub async fn update_schema_when_distributed(tables: &Vec<String>) -> Result<(), EventError> {
-    if PARSEABLE.options.mode == Mode::Query {
+    // if the mode is query or prism, we need to update the schema in memory;
+    // there is no need to commit the schema to storage,
+    // as the schema is read from memory every time
+    if PARSEABLE.options.mode == Mode::Query || PARSEABLE.options.mode == Mode::Prism {
         for table in tables {
             if let Ok(new_schema) = fetch_schema(table).await {
-                // commit schema merges the schema internally and updates the schema in storage.
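
`QUERIER_META` follows the async-singleton pattern: a `tokio::sync::OnceCell` initialized once during server init and read everywhere else. A minimal sketch with a stand-in metadata type (`load` here is a placeholder for `load_node_metadata`, which in the diff reads or creates the persisted record):

use std::sync::Arc;
use tokio::sync::OnceCell;

struct Metadata {
    node_id: String,
}

static QUERIER_META: OnceCell<Arc<Metadata>> = OnceCell::const_new();

async fn load() -> Arc<Metadata> {
    Arc::new(Metadata { node_id: "q-0001".into() })
}

#[tokio::main]
async fn main() {
    // The first caller runs the initializer; later callers get the cached Arc.
    let meta = QUERIER_META.get_or_init(load).await;
    println!("querier initialized: {}", meta.node_id);

    // Synchronous code can peek without awaiting once the cell is set.
    assert!(QUERIER_META.get().is_some());
}
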
-                commit_schema_to_storage(table, new_schema.clone()).await?;
-
                 commit_schema(table, Arc::new(new_schema))?;
             }
         }
@@ -325,6 +324,12 @@ Description: {0}"#
     Anyhow(#[from] anyhow::Error),
     #[error("Error: {0}")]
     StreamNotFound(#[from] StreamNotFound),
+    #[error("SerdeJsonError: {0}")]
+    SerdeJsonError(#[from] serde_json::Error),
+    #[error("CustomError: {0}")]
+    CustomError(String),
+    #[error("No available queriers found")]
+    NoAvailableQuerier,
 }
 
 impl actix_web::ResponseError for QueryError {
diff --git a/src/logstream/mod.rs b/src/logstream/mod.rs
index 9dc29e06b..844885fc6 100644
--- a/src/logstream/mod.rs
+++ b/src/logstream/mod.rs
@@ -27,7 +27,7 @@ use crate::{handlers::http::{logstream::error::StreamError, query::update_schema
 
 pub async fn get_stream_schema_helper(stream_name: &str) -> Result<Arc<Schema>, StreamError> {
     // Ensure parseable is aware of stream in distributed mode
-    if PARSEABLE.check_or_load_stream(&stream_name).await {
+    if !PARSEABLE.check_or_load_stream(&stream_name).await {
         return Err(StreamNotFound(stream_name.to_owned()).into());
     }
@@ -48,7 +48,7 @@ pub async fn get_stream_info_helper(stream_name: &str) -> Result<StreamInfo, StreamError> {
     // Ensure parseable is aware of stream in distributed mode
-    if PARSEABLE.check_or_load_stream(stream_name).await {
+    if !PARSEABLE.check_or_load_stream(stream_name).await {
         return Err(StreamNotFound(stream_name.to_owned()).into());
     }
diff --git a/src/main.rs b/src/main.rs
--- a/src/main.rs
+++ b/src/main.rs
@@ ... @@ async fn main() -> anyhow::Result<()> {
         Mode::Index => {
             println!("Indexing is an enterprise feature. Check out https://www.parseable.com/pricing to know more!");
             exit(0)
         }
+        Mode::Prism => {
+            println!("Prism is an enterprise feature. Check out https://www.parseable.com/pricing to know more!");
+            exit(0)
+        }
         Mode::All => Box::new(Server),
     };
diff --git a/src/option.rs b/src/option.rs
index 0e659905b..26b5f4664 100644
--- a/src/option.rs
+++ b/src/option.rs
@@ -18,11 +18,14 @@
 use parquet::basic::{BrotliLevel, GzipLevel, ZstdLevel};
 use serde::{Deserialize, Serialize};
 
+use crate::handlers::http::modal::NodeType;
+
 #[derive(Debug, Default, Eq, PartialEq, Clone, Copy, Serialize, Deserialize)]
 pub enum Mode {
     Query,
     Ingest,
     Index,
+    Prism,
     #[default]
     All,
 }
@@ -40,6 +43,16 @@ impl Mode {
 
         Ok(())
     }
+
+    pub fn to_node_type(&self) -> NodeType {
+        match self {
+            Mode::Ingest => NodeType::Ingestor,
+            Mode::Index => NodeType::Indexer,
+            Mode::Query => NodeType::Querier,
+            Mode::Prism => NodeType::Prism,
+            Mode::All => NodeType::All,
+        }
+    }
 }
 
 #[derive(Debug, Clone, Copy, PartialEq, Eq, Default, Deserialize)]
@@ -128,6 +141,7 @@ pub mod validation {
         match s {
             "query" => Ok(Mode::Query),
             "ingest" => Ok(Mode::Ingest),
+            "prism" => Ok(Mode::Prism),
             "all" => Ok(Mode::All),
             "index" => Ok(Mode::Index),
             _ => Err("Invalid MODE provided".to_string()),
diff --git a/src/parseable/mod.rs b/src/parseable/mod.rs
index ed6354eb8..83197e42e 100644
--- a/src/parseable/mod.rs
+++ b/src/parseable/mod.rs
@@ -47,7 +47,7 @@ use crate::{
         cluster::{sync_streams_with_ingestors, INTERNAL_STREAM_NAME},
         ingest::PostError,
         logstream::error::{CreateStreamError, StreamError},
-        modal::{utils::logstream_utils::PutStreamHeaders, IndexerMetadata, IngestorMetadata},
+        modal::{ingest_server::INGESTOR_META, utils::logstream_utils::PutStreamHeaders},
         },
         STREAM_TYPE_KEY,
     },
@@ -127,10 +127,6 @@ pub struct Parseable {
     /// Metadata and staging relating to each logstream
     /// A globally shared mapping of `Streams` that parseable is aware of.
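
The new `Mode::to_node_type` and the "prism" arm in mode validation are a pure enum-to-enum mapping plus string parsing. A standalone sketch (both enums redeclared locally for illustration; in the tree they live in src/option.rs and src/handlers/http/modal/):

#[derive(Debug, Clone, Copy, PartialEq)]
enum Mode {
    Query,
    Ingest,
    Index,
    Prism,
    All,
}

#[derive(Debug, Clone, Copy, PartialEq)]
enum NodeType {
    Querier,
    Ingestor,
    Indexer,
    Prism,
    All,
}

fn to_node_type(mode: Mode) -> NodeType {
    match mode {
        Mode::Ingest => NodeType::Ingestor,
        Mode::Index => NodeType::Indexer,
        Mode::Query => NodeType::Querier,
        Mode::Prism => NodeType::Prism,
        Mode::All => NodeType::All,
    }
}

fn parse_mode(s: &str) -> Result<Mode, String> {
    match s {
        "query" => Ok(Mode::Query),
        "ingest" => Ok(Mode::Ingest),
        "prism" => Ok(Mode::Prism),
        "index" => Ok(Mode::Index),
        "all" => Ok(Mode::All),
        _ => Err("Invalid MODE provided".to_string()),
    }
}

fn main() {
    let mode = parse_mode("prism").unwrap();
    assert_eq!(to_node_type(mode), NodeType::Prism);
    println!("{mode:?} -> {:?}", to_node_type(mode));
}
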
     pub streams: Streams,
-    /// Metadata associated only with an ingestor
-    pub ingestor_metadata: Option<Arc<IngestorMetadata>>,
-    /// Metadata associated only with an indexer
-    pub indexer_metadata: Option<Arc<IndexerMetadata>>,
     /// Used to configure the kafka connector
     #[cfg(feature = "kafka")]
     pub kafka_config: KafkaConfig,
@@ -142,25 +138,14 @@ impl Parseable {
         #[cfg(feature = "kafka")] kafka_config: KafkaConfig,
         storage: Arc<dyn ObjectStorageProvider>,
     ) -> Self {
-        let ingestor_metadata = match &options.mode {
-            Mode::Ingest => Some(IngestorMetadata::load(&options, storage.as_ref())),
-            _ => None,
-        };
-        let indexer_metadata = match &options.mode {
-            Mode::Index => Some(IndexerMetadata::load(&options, storage.as_ref())),
-            _ => None,
-        };
         Parseable {
             options: Arc::new(options),
             storage,
             streams: Streams::default(),
-            ingestor_metadata,
-            indexer_metadata,
             #[cfg(feature = "kafka")]
             kafka_config,
         }
     }
-
     /// Try to get the handle of a stream in staging, if it doesn't exist return `None`.
     pub fn get_stream(&self, stream_name: &str) -> Result<StreamRef, StreamNotFound> {
         self.streams
@@ -177,25 +162,31 @@ impl Parseable {
             return staging;
         }
 
+        let ingestor_id = INGESTOR_META
+            .get()
+            .map(|ingestor_metadata| ingestor_metadata.get_node_id());
+
+        // Gets write privileges only for creating the stream when it doesn't already exist.
         self.streams.get_or_create(
             self.options.clone(),
             stream_name.to_owned(),
             LogStreamMetadata::default(),
-            self.ingestor_metadata
-                .as_ref()
-                .map(|meta| meta.get_ingestor_id()),
+            ingestor_id,
         )
     }
 
     /// Checks for the stream in memory, or loads it from storage when in distributed mode
+    /// return true if stream exists in memory or loaded from storage
+    /// return false if stream doesn't exist in memory and not loaded from storage
     pub async fn check_or_load_stream(&self, stream_name: &str) -> bool {
-        !self.streams.contains(stream_name)
-            && (self.options.mode != Mode::Query
-                || !self
-                    .create_stream_and_schema_from_storage(stream_name)
-                    .await
-                    .unwrap_or_default())
+        if self.streams.contains(stream_name) {
+            return true;
+        }
+        (self.options.mode == Mode::Query || self.options.mode == Mode::Prism)
+            && self
+                .create_stream_and_schema_from_storage(stream_name)
+                .await
+                .unwrap_or_default()
     }
 
     // validate the storage, if the proper path for staging directory is provided
@@ -260,75 +251,11 @@ impl Parseable {
             Mode::Query => "Distributed (Query)",
             Mode::Ingest => "Distributed (Ingest)",
             Mode::Index => "Distributed (Index)",
+            Mode::Prism => "Distributed (Prism)",
             Mode::All => "Standalone",
         }
     }
 
-    // create the ingestor metadata and put the .ingestor.json file in the object store
-    pub async fn store_metadata(&self, mode: Mode) -> anyhow::Result<()> {
-        match mode {
-            Mode::Ingest => {
-                let Some(meta) = self.ingestor_metadata.as_ref() else {
-                    return Ok(());
-                };
-                let storage_ingestor_metadata = meta.migrate().await?;
-                let store = self.storage.get_object_store();
-
-                // use the id that was generated/found in the staging and
-                // generate the path for the object store
-                let path = meta.file_path();
-
-                // we are considering that we can always get from object store
-                if let Some(mut store_data) = storage_ingestor_metadata {
-                    if store_data.domain_name != meta.domain_name {
-                        store_data.domain_name.clone_from(&meta.domain_name);
-                        store_data.port.clone_from(&meta.port);
-
-                        let resource = Bytes::from(serde_json::to_vec(&store_data)?);
-
-                        // if pushing to object store fails propagate the error
-                        store.put_object(&path, resource).await?;
-                    }
-                } else {
-                    let resource = serde_json::to_vec(&meta)?.into();
-
-                    store.put_object(&path, resource).await?;
-                }
-                Ok(())
-            }
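
Note the inverted contract: `check_or_load_stream` now answers "does the stream exist (or can it be loaded)?", which is why call sites in the `logstream` and `prism` hunks guard with `!`. A runnable sketch of the new logic with the storage lookup stubbed out:

use std::collections::HashSet;

#[derive(PartialEq)]
#[allow(dead_code)]
enum Mode {
    Query,
    Prism,
    Ingest,
}

struct Parseable {
    mode: Mode,
    streams: HashSet<String>,
}

impl Parseable {
    // Stub for the storage-backed load; pretend remote storage has nothing.
    async fn create_stream_and_schema_from_storage(&self, _stream: &str) -> bool {
        false
    }

    // New contract: true when the stream is in memory or could be loaded.
    async fn check_or_load_stream(&self, stream_name: &str) -> bool {
        if self.streams.contains(stream_name) {
            return true;
        }
        (self.mode == Mode::Query || self.mode == Mode::Prism)
            && self.create_stream_and_schema_from_storage(stream_name).await
    }
}

#[tokio::main]
async fn main() {
    let p = Parseable {
        mode: Mode::Query,
        streams: HashSet::from(["app".to_string()]),
    };
    // Call-site pattern from the diff: bail out when the stream is NOT found.
    if !p.check_or_load_stream("missing").await {
        eprintln!("stream not found: missing");
    }
    assert!(p.check_or_load_stream("app").await);
}
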
- Mode::Index => { - let Some(meta) = self.indexer_metadata.as_ref() else { - return Ok(()); - }; - let storage_indexer_metadata = meta.migrate().await?; - let store = self.storage.get_object_store(); - - // use the id that was generated/found in the staging and - // generate the path for the object store - let path = meta.file_path(); - - // we are considering that we can always get from object store - if let Some(mut store_data) = storage_indexer_metadata { - if store_data.domain_name != meta.domain_name { - store_data.domain_name.clone_from(&meta.domain_name); - store_data.port.clone_from(&meta.port); - - let resource = Bytes::from(serde_json::to_vec(&store_data)?); - - // if pushing to object store fails propagate the error - store.put_object(&path, resource).await?; - } - } else { - let resource = serde_json::to_vec(&meta)?.into(); - - store.put_object(&path, resource).await?; - } - Ok(()) - } - _ => Err(anyhow::anyhow!("Invalid mode")), - } - } - /// list all streams from storage /// if stream exists in storage, create stream and schema from storage /// and add it to the memory map @@ -382,13 +309,16 @@ impl Parseable { schema_version, log_source, ); + let ingestor_id = INGESTOR_META + .get() + .map(|ingestor_metadata| ingestor_metadata.get_node_id()); + + // Gets write privileges only for creating the stream when it doesn't already exist. self.streams.get_or_create( self.options.clone(), - stream_name.to_string(), + stream_name.to_owned(), metadata, - self.ingestor_metadata - .as_ref() - .map(|meta| meta.get_ingestor_id()), + ingestor_id, ); Ok(true) @@ -525,8 +455,11 @@ impl Parseable { let stream_in_memory_dont_update = self.streams.contains(stream_name) && !update_stream_flag; - let stream_in_storage_only_for_query_node = !self.streams.contains(stream_name) // check if stream in storage only if not in memory - && self.options.mode == Mode::Query // and running in query mode + // check if stream in storage only if not in memory + // for Parseable OSS, create_update_stream is called only from query node + // for Parseable Enterprise, create_update_stream is called from prism node + let stream_in_storage_only_for_query_node = !self.streams.contains(stream_name) + && (self.options.mode == Mode::Query || self.options.mode == Mode::Prism) && self .create_stream_and_schema_from_storage(stream_name) .await?; @@ -687,13 +620,16 @@ impl Parseable { SchemaVersion::V1, // New stream log_source, ); + let ingestor_id = INGESTOR_META + .get() + .map(|ingestor_metadata| ingestor_metadata.get_node_id()); + + // Gets write privileges only for creating the stream when it doesn't already exist. 
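
With the per-instance `ingestor_metadata` field gone, call sites derive the optional ingestor id from the `INGESTOR_META` global; `OnceCell::get()` returns `None` on nodes where the cell was never initialized (querier, prism, standalone). A minimal sketch of that access pattern:

use std::sync::Arc;
use tokio::sync::OnceCell;

struct IngestorMetadata {
    node_id: String,
}

static INGESTOR_META: OnceCell<Arc<IngestorMetadata>> = OnceCell::const_new();

// Same shape as the call sites in this file: None on non-ingestor nodes.
fn current_ingestor_id() -> Option<String> {
    INGESTOR_META.get().map(|meta| meta.node_id.clone())
}

fn main() {
    // The cell was never initialized here, e.g. on a querier node.
    assert_eq!(current_ingestor_id(), None);
}
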
         self.streams.get_or_create(
             self.options.clone(),
-            stream_name.to_string(),
+            stream_name.to_owned(),
             metadata,
-            self.ingestor_metadata
-                .as_ref()
-                .map(|meta| meta.get_ingestor_id()),
+            ingestor_id,
         );
     }
     Err(err) => {
diff --git a/src/prism/logstream/mod.rs b/src/prism/logstream/mod.rs
index b03de0454..c8e1015c4 100644
--- a/src/prism/logstream/mod.rs
+++ b/src/prism/logstream/mod.rs
@@ -86,7 +86,7 @@ pub async fn get_prism_logstream_info(
 
 async fn get_stream_schema_helper(stream_name: &str) -> Result<Arc<Schema>, StreamError> {
     // Ensure parseable is aware of stream in distributed mode
-    if PARSEABLE.check_or_load_stream(stream_name).await {
+    if !PARSEABLE.check_or_load_stream(stream_name).await {
         return Err(StreamNotFound(stream_name.to_owned()).into());
     }
@@ -152,7 +152,7 @@ async fn get_stream_info_helper(stream_name: &str) -> Result<StreamInfo, StreamError> {
     // Ensure parseable is aware of stream in distributed mode
-    if PARSEABLE.check_or_load_stream(stream_name).await {
+    if !PARSEABLE.check_or_load_stream(stream_name).await {
         return Err(StreamNotFound(stream_name.to_owned()).into());
     }
@@ ... @@
-    async fn ...(stream: &str) -> bool {
-        if PARSEABLE.check_or_load_stream(stream).await {
-            warn!("Stream not found: {stream}");
-            false
-        } else {
-            true
-        }
-    }
-
     async fn build_dataset_response(
         &self,
         stream: String,
diff --git a/src/query/mod.rs b/src/query/mod.rs
index 6486b4836..0608a9459 100644
--- a/src/query/mod.rs
+++ b/src/query/mod.rs
@@ -467,7 +467,7 @@ pub async fn get_manifest_list(
     let mut merged_snapshot: Snapshot = Snapshot::default();
 
     // get a list of manifests
-    if PARSEABLE.options.mode == Mode::Query {
+    if PARSEABLE.options.mode == Mode::Query || PARSEABLE.options.mode == Mode::Prism {
         let path = RelativePathBuf::from_iter([stream_name, STREAM_ROOT_DIRECTORY]);
         let obs = glob_storage
             .get_objects(
diff --git a/src/query/stream_schema_provider.rs b/src/query/stream_schema_provider.rs
index 6b6219f91..d015bc71d 100644
--- a/src/query/stream_schema_provider.rs
+++ b/src/query/stream_schema_provider.rs
@@ -503,7 +503,7 @@ impl TableProvider for StandardTableProvider {
             .await?;
         };
         let mut merged_snapshot = Snapshot::default();
-        if PARSEABLE.options.mode == Mode::Query {
+        if PARSEABLE.options.mode == Mode::Query || PARSEABLE.options.mode == Mode::Prism {
             let path = RelativePathBuf::from_iter([&self.stream, STREAM_ROOT_DIRECTORY]);
             let obs = glob_storage
                 .get_objects(
diff --git a/src/storage/mod.rs b/src/storage/mod.rs
index b6bc9bb25..63e3803bd 100644
--- a/src/storage/mod.rs
+++ b/src/storage/mod.rs
@@ -41,7 +41,7 @@ mod metrics_layer;
 pub mod object_storage;
 pub mod retention;
 mod s3;
-mod store_metadata;
+pub mod store_metadata;
 
 use self::retention::Retention;
 pub use azure_blob::AzureBlobConfig;
diff --git a/src/storage/object_storage.rs b/src/storage/object_storage.rs
index a40072f62..49da6b8fd 100644
--- a/src/storage/object_storage.rs
+++ b/src/storage/object_storage.rs
@@ -48,6 +48,7 @@ use crate::correlation::{CorrelationConfig, CorrelationError};
 use crate::event::format::LogSource;
 use crate::event::format::LogSourceEntry;
 use crate::handlers::http::modal::ingest_server::INGESTOR_EXPECT;
+use crate::handlers::http::modal::ingest_server::INGESTOR_META;
 use crate::handlers::http::users::CORRELATION_DIR;
 use crate::handlers::http::users::{DASHBOARDS_DIR, FILTER_DIR, USERS_ROOT_DIR};
 use crate::metrics::storage::StorageMetrics;
@@ -895,40 +896,34 @@ pub fn to_bytes(any: &(impl ?Sized + serde::Serialize)) -> Bytes {
 }
 
 pub fn schema_path(stream_name: &str) -> RelativePathBuf {
-    match &PARSEABLE.options.mode {
-        Mode::Ingest => {
-            let id = PARSEABLE
-                .ingestor_metadata
-                .as_ref()
-                .expect(INGESTOR_EXPECT)
-                .get_ingestor_id();
-            let file_name = format!(".ingestor.{id}{SCHEMA_FILE_NAME}");
-
-            RelativePathBuf::from_iter([stream_name, STREAM_ROOT_DIRECTORY, &file_name])
-        }
-        Mode::All | Mode::Query | Mode::Index => {
-            RelativePathBuf::from_iter([stream_name, STREAM_ROOT_DIRECTORY, SCHEMA_FILE_NAME])
-        }
+    if PARSEABLE.options.mode == Mode::Ingest {
+        let id = INGESTOR_META
+            .get()
+            .unwrap_or_else(|| panic!("{}", INGESTOR_EXPECT))
+            .get_node_id();
+        let file_name = format!(".ingestor.{id}{SCHEMA_FILE_NAME}");
+
+        RelativePathBuf::from_iter([stream_name, STREAM_ROOT_DIRECTORY, &file_name])
+    } else {
+        RelativePathBuf::from_iter([stream_name, STREAM_ROOT_DIRECTORY, SCHEMA_FILE_NAME])
     }
 }
 
 #[inline(always)]
 pub fn stream_json_path(stream_name: &str) -> RelativePathBuf {
-    match &PARSEABLE.options.mode {
-        Mode::Ingest => {
-            let id = PARSEABLE
-                .ingestor_metadata
-                .as_ref()
-                .expect(INGESTOR_EXPECT)
-                .get_ingestor_id();
-            let file_name = format!(".ingestor.{id}{STREAM_METADATA_FILE_NAME}",);
-            RelativePathBuf::from_iter([stream_name, STREAM_ROOT_DIRECTORY, &file_name])
-        }
-        Mode::Query | Mode::All | Mode::Index => RelativePathBuf::from_iter([
+    if PARSEABLE.options.mode == Mode::Ingest {
+        let id = INGESTOR_META
+            .get()
+            .unwrap_or_else(|| panic!("{}", INGESTOR_EXPECT))
+            .get_node_id();
+        let file_name = format!(".ingestor.{id}{STREAM_METADATA_FILE_NAME}",);
+        RelativePathBuf::from_iter([stream_name, STREAM_ROOT_DIRECTORY, &file_name])
+    } else {
+        RelativePathBuf::from_iter([
             stream_name,
             STREAM_ROOT_DIRECTORY,
             STREAM_METADATA_FILE_NAME,
-        ]),
+        ])
     }
 }
@@ -966,11 +961,10 @@ pub fn alert_json_path(alert_id: Ulid) -> RelativePathBuf {
 pub fn manifest_path(prefix: &str) -> RelativePathBuf {
     match &PARSEABLE.options.mode {
         Mode::Ingest => {
-            let id = PARSEABLE
-                .ingestor_metadata
-                .as_ref()
-                .expect(INGESTOR_EXPECT)
-                .get_ingestor_id();
+            let id = INGESTOR_META
+                .get()
+                .unwrap_or_else(|| panic!("{}", INGESTOR_EXPECT))
+                .get_node_id();
             let manifest_file_name = format!("ingestor.{id}.{MANIFEST_FILE}");
             RelativePathBuf::from_iter([prefix, &manifest_file_name])
         }
diff --git a/src/storage/store_metadata.rs b/src/storage/store_metadata.rs
index 89f638161..2152f3d07 100644
--- a/src/storage/store_metadata.rs
+++ b/src/storage/store_metadata.rs
@@ -129,6 +129,7 @@ pub async fn resolve_parseable_metadata(
             Err("Could not start the server because staging directory indicates stale data from previous deployment, please choose an empty staging directory and restart the server")
         }
         EnvChange::NewStaging(mut metadata) => {
+            // if the server is started in ingest mode, we need to make sure that query mode has been started
             // i.e the metadata is updated to reflect the server mode = Query
             if metadata.server_mode== Mode::All && PARSEABLE.options.mode == Mode::Ingest {
@@ -148,7 +149,7 @@ pub async fn resolve_parseable_metadata(
                 })?;
                 overwrite_remote = true;
             },
-            Mode::Query => {
+            Mode::Query | Mode::Prism => {
                 overwrite_remote = true;
                 metadata.server_mode = PARSEABLE.options.mode;
                 metadata.staging = PARSEABLE.options.staging_dir().to_path_buf();
@@ -175,7 +176,7 @@ pub async fn resolve_parseable_metadata(
             // new metadata needs to be set
             // if mode is query or all then both staging and remote
             match PARSEABLE.options.mode {
-                Mode::All | Mode::Query => overwrite_remote = true,
+                Mode::All | Mode::Query | Mode::Prism => overwrite_remote = true,
                 _ => (),
             }
             // else only staging
@@ -202,7 +203,7 @@ pub async fn resolve_parseable_metadata(
     Ok(metadata)
 }
 
-fn determine_environment(
+pub fn determine_environment(
     staging_metadata: Option<StorageMetadata>,
     remote_metadata: Option<StorageMetadata>,
 ) -> EnvChange {
diff --git a/src/utils/arrow/flight.rs b/src/utils/arrow/flight.rs
index c8d2dacf2..180d352d5 100644
--- a/src/utils/arrow/flight.rs
+++ b/src/utils/arrow/flight.rs
@@ -133,7 +133,8 @@ pub fn send_to_ingester(start: i64, end: i64) -> bool {
     );
     let time_filters =
         extract_primary_filter(&[Expr::BinaryExpr(ex1), Expr::BinaryExpr(ex2)], &None);
-    PARSEABLE.options.mode == Mode::Query && is_within_staging_window(&time_filters)
+    (PARSEABLE.options.mode == Mode::Query || PARSEABLE.options.mode == Mode::Prism)
+        && is_within_staging_window(&time_filters)
 }
 
 fn lit_timestamp_milli(time: i64) -> Expr {
diff --git a/src/utils/mod.rs b/src/utils/mod.rs
index d2ba24b54..f2e2685d6 100644
--- a/src/utils/mod.rs
+++ b/src/utils/mod.rs
@@ -36,19 +36,10 @@ use chrono::{NaiveDate, NaiveDateTime, NaiveTime, Utc};
 use datafusion::common::tree_node::TreeNode;
 use regex::Regex;
 use sha2::{Digest, Sha256};
-use tracing::debug;
 
-pub fn get_ingestor_id() -> String {
+pub fn get_node_id() -> String {
     let now = Utc::now().to_rfc3339();
     let id = get_hash(&now).to_string().split_at(15).0.to_string();
-    debug!("Ingestor ID: {id}");
-    id
-}
-
-pub fn get_indexer_id() -> String {
-    let now = Utc::now().to_rfc3339();
-    let id = get_hash(&now).to_string().split_at(15).0.to_string();
-    debug!("Indexer ID: {id}");
     id
 }
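
Both removed id helpers were byte-for-byte identical apart from the log line, which is why they collapse into one `get_node_id`. A standalone approximation (sha2's `Sha256` stands in for the crate's `get_hash`; the 15-character prefix length is taken from the diff):

use chrono::Utc;
use sha2::{Digest, Sha256};

// Hash the current timestamp and keep the first 15 hex characters.
fn get_node_id() -> String {
    let now = Utc::now().to_rfc3339();
    let digest = Sha256::digest(now.as_bytes());
    let hex: String = digest.iter().map(|b| format!("{b:02x}")).collect();
    hex.split_at(15).0.to_string()
}

fn main() {
    // Ids are time-derived, so two calls differ; the length is stable.
    let id = get_node_id();
    assert_eq!(id.len(), 15);
    println!("node id: {id}");
}
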