diff --git a/src/cli.rs b/src/cli.rs index 26cab2e95..b7880a8db 100644 --- a/src/cli.rs +++ b/src/cli.rs @@ -18,7 +18,7 @@ use clap::Parser; use std::{env, fs, path::PathBuf}; - +use ulid::Ulid; use url::Url; #[cfg(feature = "kafka")] @@ -399,7 +399,7 @@ pub struct OidcConfig { required = false, help = "Client id for OIDC provider" )] - pub client_id: String, + pub client_id: Ulid, #[arg( long = "oidc-client-secret", diff --git a/src/connectors/kafka/config.rs b/src/connectors/kafka/config.rs index ee7e77446..733d47745 100644 --- a/src/connectors/kafka/config.rs +++ b/src/connectors/kafka/config.rs @@ -4,6 +4,7 @@ use rdkafka::{ClientConfig, Offset}; use serde::{Deserialize, Serialize}; use std::path::PathBuf; use std::time::Duration; +use ulid::Ulid; #[derive(Debug, Clone, Parser)] pub struct KafkaConfig { @@ -24,7 +25,7 @@ pub struct KafkaConfig { value_name = "client_id", help = "Client ID for Kafka connection" )] - pub client_id: String, + pub client_id: UsernameValidationError, #[arg( long = "partition-listener-concurrency", @@ -76,7 +77,7 @@ pub struct ConsumerConfig { default_value_t = String::from("parseable-connect-cg"), help = "Consumer group ID" )] - pub group_id: String, + pub group_id: Ulid, // uses per partition stream micro-batch buffer size #[arg( @@ -108,7 +109,7 @@ pub struct ConsumerConfig { default_value_t = format!("parseable-connect-cg-ii-{}", rand::random::()).to_string(), help = "Group instance ID for static membership" )] - pub group_instance_id: String, + pub group_instance_id: Ulid, #[arg( long = "consumer-partition-strategy", diff --git a/src/correlation.rs b/src/correlation.rs index c5f4eb2d8..48ebceddf 100644 --- a/src/correlation.rs +++ b/src/correlation.rs @@ -17,6 +17,7 @@ */ use std::collections::{HashMap, HashSet}; +use ulid::Ulid; use actix_web::{http::header::ContentType, Error}; use chrono::Utc; @@ -207,8 +208,8 @@ pub enum CorrelationVersion { V1, } -type CorrelationId = String; -type UserId = String; +type CorrelationId = Ulid; +type UserId = Ulid; #[derive(Debug, Clone, Serialize, Deserialize)] #[serde(rename_all = "camelCase")] diff --git a/src/handlers/http/kinesis.rs b/src/handlers/http/kinesis.rs index c714729c7..abdabd4d9 100644 --- a/src/handlers/http/kinesis.rs +++ b/src/handlers/http/kinesis.rs @@ -20,6 +20,7 @@ use base64::{engine::general_purpose::STANDARD, Engine as _}; use serde::{Deserialize, Serialize}; use serde_json::{Map, Value}; use std::str; +use ulid::Ulid; use crate::utils::json::flatten::{generic_flattening, has_more_than_max_allowed_levels}; @@ -27,7 +28,7 @@ use crate::utils::json::flatten::{generic_flattening, has_more_than_max_allowed_ #[serde(rename_all = "camelCase")] pub struct Message { records: Vec, - request_id: String, + request_id: Ulid, timestamp: u64, } #[derive(Serialize, Deserialize, Debug)] diff --git a/src/handlers/http/modal/mod.rs b/src/handlers/http/modal/mod.rs index 3504ef0fa..4f8599cff 100644 --- a/src/handlers/http/modal/mod.rs +++ b/src/handlers/http/modal/mod.rs @@ -32,6 +32,7 @@ use serde_json::{Map, Value}; use ssl_acceptor::get_ssl_acceptor; use tokio::sync::oneshot; use tracing::{error, info, warn}; +use ulid::Ulid; use crate::{ alerts::ALERTS, @@ -246,7 +247,7 @@ pub struct NodeMetadata { pub domain_name: String, pub bucket_name: String, pub token: String, - pub node_id: String, + pub node_id: Ulid, pub flight_port: String, pub node_type: NodeType, } @@ -259,7 +260,7 @@ impl NodeMetadata { bucket_name: String, username: &str, password: &str, - node_id: String, + node_id: lid, flight_port: String, node_type: NodeType, ) -> Self { diff --git a/src/handlers/http/rbac.rs b/src/handlers/http/rbac.rs index 3870c88a9..2f920841f 100644 --- a/src/handlers/http/rbac.rs +++ b/src/handlers/http/rbac.rs @@ -31,6 +31,7 @@ use actix_web::{ use http::StatusCode; use itertools::Itertools; use tokio::sync::Mutex; +use ulid::Ulid; use super::modal::utils::rbac_utils::{get_metadata, put_metadata}; @@ -39,7 +40,7 @@ static UPDATE_LOCK: Mutex<()> = Mutex::const_new(()); #[derive(serde::Serialize)] struct User { - id: String, + id: Ulid, method: String, } diff --git a/src/livetail.rs b/src/livetail.rs index eca90eceb..5fc16f908 100644 --- a/src/livetail.rs +++ b/src/livetail.rs @@ -22,13 +22,13 @@ use std::{ task::Poll, }; +use arrow_array::RecordBatch; use futures_util::Stream; +use once_cell::sync::Lazy; use tokio::sync::mpsc::{ self, error::TrySendError, Receiver, Sender, UnboundedReceiver, UnboundedSender, }; - -use arrow_array::RecordBatch; -use once_cell::sync::Lazy; +use ulid::Ulid; pub static LIVETAIL: Lazy = Lazy::new(LiveTail::default); @@ -39,7 +39,7 @@ pub struct LiveTail { } impl LiveTail { - pub fn new_pipe(&self, id: String, stream: String) -> ReceiverPipe { + pub fn new_pipe(&self, id: ulid, stream: String) -> ReceiverPipe { let (sender, revc) = channel(id, stream.clone(), Arc::downgrade(&self.pipes)); self.pipes .write() @@ -106,7 +106,7 @@ pub struct ReceiverPipe { } fn channel( - id: String, + id: ulid, stream: String, weak_ptr: Weak, ) -> (SenderPipe, ReceiverPipe) { diff --git a/src/oidc.rs b/src/oidc.rs index 1650fd929..103fd6ce9 100644 --- a/src/oidc.rs +++ b/src/oidc.rs @@ -16,9 +16,9 @@ * */ -use std::collections::HashMap; - use openid::{Client, CompactJson, CustomClaims, Discovered, StandardClaims}; +use std::collections::HashMap; +use ulid::Ulid; use url::Url; pub type DiscoveredClient = Client; @@ -37,7 +37,7 @@ pub enum Origin { #[derive(Debug, Clone)] pub struct OpenidConfig { /// Client id - pub id: String, + pub id: Ulid, /// Client Secret pub secret: String, /// OP host address over which discovery can be done diff --git a/src/parseable/streams.rs b/src/parseable/streams.rs index c67c60043..f8794dfa9 100644 --- a/src/parseable/streams.rs +++ b/src/parseable/streams.rs @@ -26,6 +26,7 @@ use std::{ sync::{Arc, Mutex, RwLock}, time::{Instant, SystemTime, UNIX_EPOCH}, }; +use ulid::Ulid; use arrow_array::RecordBatch; use arrow_schema::{Field, Fields, Schema}; @@ -100,7 +101,7 @@ pub struct Stream { pub data_path: PathBuf, pub options: Arc, pub writer: Mutex, - pub ingestor_id: Option, + pub ingestor_id: Option, } impl Stream { @@ -108,7 +109,7 @@ impl Stream { options: Arc, stream_name: impl Into, metadata: LogStreamMetadata, - ingestor_id: Option, + ingestor_id: Option, ) -> StreamRef { let stream_name = stream_name.into(); let data_path = options.local_stream_data_path(&stream_name); @@ -763,7 +764,7 @@ impl Streams { options: Arc, stream_name: String, metadata: LogStreamMetadata, - ingestor_id: Option, + ingestor_id: Option, ) -> StreamRef { let mut guard = self.write().expect(LOCK_EXPECT); if let Some(stream) = guard.get(&stream_name) { @@ -1332,7 +1333,7 @@ mod tests { let options = Arc::new(Options::default()); let stream_name = String::from("concurrent_stream"); let metadata = LogStreamMetadata::default(); - let ingestor_id = Some(String::from("concurrent_ingestor")); + let ingestor_id = Some(Ulid::from_string("concurrent_ingestor").unwrap()); // Barrier to synchronize threads let barrier = Arc::new(Barrier::new(2)); diff --git a/src/prism/home/mod.rs b/src/prism/home/mod.rs index 656e50cae..a655af816 100644 --- a/src/prism/home/mod.rs +++ b/src/prism/home/mod.rs @@ -25,6 +25,7 @@ use itertools::Itertools; use relative_path::RelativePathBuf; use serde::Serialize; use tracing::error; +use ulid::Ulid; use crate::{ alerts::{get_alerts_info, AlertError, AlertsInfo, ALERTS}, @@ -78,7 +79,7 @@ pub enum ResourceType { #[derive(Debug, Serialize)] pub struct Resource { - id: String, + id: Ulid, name: String, resource_type: ResourceType, } @@ -277,7 +278,7 @@ pub async fn generate_home_search_response( for title in stream_titles { if title.to_lowercase().contains(query_value) { resources.push(Resource { - id: title.clone(), + id: Ulid::from_string(&title).unwrap_or_else(|_| Ulid::new()), name: title, resource_type: ResourceType::DataSet, }); diff --git a/src/rbac/mod.rs b/src/rbac/mod.rs index 28ead768b..8da34dee4 100644 --- a/src/rbac/mod.rs +++ b/src/rbac/mod.rs @@ -29,13 +29,13 @@ use role::model::DefaultPrivilege; use serde::Serialize; use url::Url; -use crate::rbac::map::{mut_sessions, mut_users, sessions, users}; -use crate::rbac::role::Action; -use crate::rbac::user::User; - use self::map::SessionKey; use self::role::{Permission, RoleBuilder}; use self::user::UserType; +use crate::rbac::map::{mut_sessions, mut_users, sessions, users}; +use crate::rbac::role::Action; +use crate::rbac::user::User; +use ulid::Ulid; #[derive(PartialEq)] pub enum Response { @@ -176,7 +176,7 @@ impl Users { #[derive(Debug, Serialize, Clone)] pub struct UsersPrism { // username - pub id: String, + pub id: Ulid, // oaith or native pub method: String, // email only if method is oauth diff --git a/src/rbac/user.rs b/src/rbac/user.rs index 45fc50d79..f53c6ea4e 100644 --- a/src/rbac/user.rs +++ b/src/rbac/user.rs @@ -16,12 +16,12 @@ * */ -use std::collections::HashSet; - use argon2::{ password_hash::{rand_core::OsRng, PasswordHasher, SaltString}, Argon2, PasswordHash, PasswordVerifier, }; +use std::collections::HashSet; +use ulid::Ulid; use rand::distributions::{Alphanumeric, DistString}; @@ -152,7 +152,7 @@ pub fn get_admin_user() -> User { #[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)] pub struct OAuth { - pub userid: String, + pub userid: Ulid, pub user_info: UserInfo, } diff --git a/src/storage/mod.rs b/src/storage/mod.rs index 63e3803bd..7305ffe0c 100644 --- a/src/storage/mod.rs +++ b/src/storage/mod.rs @@ -32,8 +32,8 @@ use crate::{ }; use chrono::Utc; - use std::fmt::Debug; +use ulid::Ulid; mod azure_blob; mod localfs; @@ -174,25 +174,25 @@ impl std::fmt::Display for StreamType { #[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)] pub struct Owner { - pub id: String, + pub id: Ulid, pub group: String, } impl Owner { - pub fn new(id: String, group: String) -> Self { + pub fn new(id: Ulid, group: String) -> Self { Self { id, group } } } #[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)] pub struct Permisssion { - pub id: String, + pub id: Ulid, pub group: String, pub access: Vec, } impl Permisssion { - pub fn new(id: String) -> Self { + pub fn new(id: Ulid) -> Self { Self { id: id.clone(), group: id, diff --git a/src/users/filters.rs b/src/users/filters.rs index d6ade5344..fcd0f632f 100644 --- a/src/users/filters.rs +++ b/src/users/filters.rs @@ -20,6 +20,7 @@ use once_cell::sync::Lazy; use serde::{Deserialize, Serialize}; use serde_json::Value; use tokio::sync::RwLock; +use ulid::Ulid; use super::TimeFilter; use crate::{ @@ -73,21 +74,21 @@ impl FilterType { #[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)] pub struct FilterBuilder { - pub id: String, + pub id: Ulid, pub combinator: String, pub rules: Vec, } #[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)] pub struct FilterRules { - pub id: String, + pub id: Ulid, pub combinator: String, pub rules: Vec, } #[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)] pub struct Rules { - pub id: String, + pub id: Ulid, pub field: String, pub value: String, pub operator: String,