Skip to content

Replace String with Ulids to store ids #1345

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@

use clap::Parser;
use std::{env, fs, path::PathBuf};

use ulid::Ulid;
use url::Url;

#[cfg(feature = "kafka")]
Expand Down Expand Up @@ -399,7 +399,7 @@ pub struct OidcConfig {
required = false,
help = "Client id for OIDC provider"
)]
pub client_id: String,
pub client_id: Ulid,

#[arg(
long = "oidc-client-secret",
Expand Down
7 changes: 4 additions & 3 deletions src/connectors/kafka/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use rdkafka::{ClientConfig, Offset};
use serde::{Deserialize, Serialize};
use std::path::PathBuf;
use std::time::Duration;
use ulid::Ulid;

#[derive(Debug, Clone, Parser)]
pub struct KafkaConfig {
Expand All @@ -24,7 +25,7 @@ pub struct KafkaConfig {
value_name = "client_id",
help = "Client ID for Kafka connection"
)]
pub client_id: String,
pub client_id: UsernameValidationError,

#[arg(
long = "partition-listener-concurrency",
Expand Down Expand Up @@ -76,7 +77,7 @@ pub struct ConsumerConfig {
default_value_t = String::from("parseable-connect-cg"),
help = "Consumer group ID"
)]
pub group_id: String,
pub group_id: Ulid,

// uses per partition stream micro-batch buffer size
#[arg(
Expand Down Expand Up @@ -108,7 +109,7 @@ pub struct ConsumerConfig {
default_value_t = format!("parseable-connect-cg-ii-{}", rand::random::<u8>()).to_string(),
help = "Group instance ID for static membership"
)]
pub group_instance_id: String,
pub group_instance_id: Ulid,

#[arg(
long = "consumer-partition-strategy",
Expand Down
5 changes: 3 additions & 2 deletions src/correlation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
*/

use std::collections::{HashMap, HashSet};
use ulid::Ulid;

use actix_web::{http::header::ContentType, Error};
use chrono::Utc;
Expand Down Expand Up @@ -207,8 +208,8 @@ pub enum CorrelationVersion {
V1,
}

type CorrelationId = String;
type UserId = String;
type CorrelationId = Ulid;
type UserId = Ulid;

#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
Expand Down
3 changes: 2 additions & 1 deletion src/handlers/http/kinesis.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,15 @@ use base64::{engine::general_purpose::STANDARD, Engine as _};
use serde::{Deserialize, Serialize};
use serde_json::{Map, Value};
use std::str;
use ulid::Ulid;

use crate::utils::json::flatten::{generic_flattening, has_more_than_max_allowed_levels};

#[derive(Serialize, Deserialize, Debug)]
#[serde(rename_all = "camelCase")]
pub struct Message {
records: Vec<Data>,
request_id: String,
request_id: Ulid,
timestamp: u64,
}
#[derive(Serialize, Deserialize, Debug)]
Expand Down
5 changes: 3 additions & 2 deletions src/handlers/http/modal/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ use serde_json::{Map, Value};
use ssl_acceptor::get_ssl_acceptor;
use tokio::sync::oneshot;
use tracing::{error, info, warn};
use ulid::Ulid;

use crate::{
alerts::ALERTS,
Expand Down Expand Up @@ -246,7 +247,7 @@ pub struct NodeMetadata {
pub domain_name: String,
pub bucket_name: String,
pub token: String,
pub node_id: String,
pub node_id: Ulid,
pub flight_port: String,
pub node_type: NodeType,
}
Expand All @@ -259,7 +260,7 @@ impl NodeMetadata {
bucket_name: String,
username: &str,
password: &str,
node_id: String,
node_id: lid,
flight_port: String,
node_type: NodeType,
) -> Self {
Expand Down
3 changes: 2 additions & 1 deletion src/handlers/http/rbac.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ use actix_web::{
use http::StatusCode;
use itertools::Itertools;
use tokio::sync::Mutex;
use ulid::Ulid;

use super::modal::utils::rbac_utils::{get_metadata, put_metadata};

Expand All @@ -39,7 +40,7 @@ static UPDATE_LOCK: Mutex<()> = Mutex::const_new(());

#[derive(serde::Serialize)]
struct User {
id: String,
id: Ulid,
method: String,
}

Expand Down
10 changes: 5 additions & 5 deletions src/livetail.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,13 @@ use std::{
task::Poll,
};

use arrow_array::RecordBatch;
use futures_util::Stream;
use once_cell::sync::Lazy;
use tokio::sync::mpsc::{
self, error::TrySendError, Receiver, Sender, UnboundedReceiver, UnboundedSender,
};

use arrow_array::RecordBatch;
use once_cell::sync::Lazy;
use ulid::Ulid;

pub static LIVETAIL: Lazy<LiveTail> = Lazy::new(LiveTail::default);

Expand All @@ -39,7 +39,7 @@ pub struct LiveTail {
}

impl LiveTail {
pub fn new_pipe(&self, id: String, stream: String) -> ReceiverPipe {
pub fn new_pipe(&self, id: ulid, stream: String) -> ReceiverPipe {
let (sender, revc) = channel(id, stream.clone(), Arc::downgrade(&self.pipes));
self.pipes
.write()
Expand Down Expand Up @@ -106,7 +106,7 @@ pub struct ReceiverPipe {
}

fn channel(
id: String,
id: ulid,
stream: String,
weak_ptr: Weak<LiveTailRegistry>,
) -> (SenderPipe, ReceiverPipe) {
Expand Down
6 changes: 3 additions & 3 deletions src/oidc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@
*
*/

use std::collections::HashMap;

use openid::{Client, CompactJson, CustomClaims, Discovered, StandardClaims};
use std::collections::HashMap;
use ulid::Ulid;
use url::Url;

pub type DiscoveredClient = Client<Discovered, Claims>;
Expand All @@ -37,7 +37,7 @@ pub enum Origin {
#[derive(Debug, Clone)]
pub struct OpenidConfig {
/// Client id
pub id: String,
pub id: Ulid,
/// Client Secret
pub secret: String,
/// OP host address over which discovery can be done
Expand Down
9 changes: 5 additions & 4 deletions src/parseable/streams.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ use std::{
sync::{Arc, Mutex, RwLock},
time::{Instant, SystemTime, UNIX_EPOCH},
};
use ulid::Ulid;

use arrow_array::RecordBatch;
use arrow_schema::{Field, Fields, Schema};
Expand Down Expand Up @@ -100,15 +101,15 @@ pub struct Stream {
pub data_path: PathBuf,
pub options: Arc<Options>,
pub writer: Mutex<Writer>,
pub ingestor_id: Option<String>,
pub ingestor_id: Option<Ulid>,
}

impl Stream {
pub fn new(
options: Arc<Options>,
stream_name: impl Into<String>,
metadata: LogStreamMetadata,
ingestor_id: Option<String>,
ingestor_id: Option<Ulid>,
) -> StreamRef {
let stream_name = stream_name.into();
let data_path = options.local_stream_data_path(&stream_name);
Expand Down Expand Up @@ -763,7 +764,7 @@ impl Streams {
options: Arc<Options>,
stream_name: String,
metadata: LogStreamMetadata,
ingestor_id: Option<String>,
ingestor_id: Option<Ulid>,
) -> StreamRef {
let mut guard = self.write().expect(LOCK_EXPECT);
if let Some(stream) = guard.get(&stream_name) {
Expand Down Expand Up @@ -1332,7 +1333,7 @@ mod tests {
let options = Arc::new(Options::default());
let stream_name = String::from("concurrent_stream");
let metadata = LogStreamMetadata::default();
let ingestor_id = Some(String::from("concurrent_ingestor"));
let ingestor_id = Some(Ulid::from_string("concurrent_ingestor").unwrap());

// Barrier to synchronize threads
let barrier = Arc::new(Barrier::new(2));
Expand Down
5 changes: 3 additions & 2 deletions src/prism/home/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ use itertools::Itertools;
use relative_path::RelativePathBuf;
use serde::Serialize;
use tracing::error;
use ulid::Ulid;

use crate::{
alerts::{get_alerts_info, AlertError, AlertsInfo, ALERTS},
Expand Down Expand Up @@ -78,7 +79,7 @@ pub enum ResourceType {

#[derive(Debug, Serialize)]
pub struct Resource {
id: String,
id: Ulid,
name: String,
resource_type: ResourceType,
}
Expand Down Expand Up @@ -277,7 +278,7 @@ pub async fn generate_home_search_response(
for title in stream_titles {
if title.to_lowercase().contains(query_value) {
resources.push(Resource {
id: title.clone(),
id: Ulid::from_string(&title).unwrap_or_else(|_| Ulid::new()),
name: title,
resource_type: ResourceType::DataSet,
});
Expand Down
10 changes: 5 additions & 5 deletions src/rbac/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,13 @@ use role::model::DefaultPrivilege;
use serde::Serialize;
use url::Url;

use crate::rbac::map::{mut_sessions, mut_users, sessions, users};
use crate::rbac::role::Action;
use crate::rbac::user::User;

use self::map::SessionKey;
use self::role::{Permission, RoleBuilder};
use self::user::UserType;
use crate::rbac::map::{mut_sessions, mut_users, sessions, users};
use crate::rbac::role::Action;
use crate::rbac::user::User;
use ulid::Ulid;

#[derive(PartialEq)]
pub enum Response {
Expand Down Expand Up @@ -176,7 +176,7 @@ impl Users {
#[derive(Debug, Serialize, Clone)]
pub struct UsersPrism {
// username
pub id: String,
pub id: Ulid,
// oaith or native
pub method: String,
// email only if method is oauth
Expand Down
6 changes: 3 additions & 3 deletions src/rbac/user.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,12 @@
*
*/

use std::collections::HashSet;

use argon2::{
password_hash::{rand_core::OsRng, PasswordHasher, SaltString},
Argon2, PasswordHash, PasswordVerifier,
};
use std::collections::HashSet;
use ulid::Ulid;

use rand::distributions::{Alphanumeric, DistString};

Expand Down Expand Up @@ -152,7 +152,7 @@ pub fn get_admin_user() -> User {

#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub struct OAuth {
pub userid: String,
pub userid: Ulid,
pub user_info: UserInfo,
}

Expand Down
10 changes: 5 additions & 5 deletions src/storage/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@ use crate::{
};

use chrono::Utc;

use std::fmt::Debug;
use ulid::Ulid;

mod azure_blob;
mod localfs;
Expand Down Expand Up @@ -174,25 +174,25 @@ impl std::fmt::Display for StreamType {

#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub struct Owner {
pub id: String,
pub id: Ulid,
pub group: String,
}

impl Owner {
pub fn new(id: String, group: String) -> Self {
pub fn new(id: Ulid, group: String) -> Self {
Self { id, group }
}
}

#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub struct Permisssion {
pub id: String,
pub id: Ulid,
pub group: String,
pub access: Vec<String>,
}

impl Permisssion {
pub fn new(id: String) -> Self {
pub fn new(id: Ulid) -> Self {
Self {
id: id.clone(),
group: id,
Expand Down
7 changes: 4 additions & 3 deletions src/users/filters.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use once_cell::sync::Lazy;
use serde::{Deserialize, Serialize};
use serde_json::Value;
use tokio::sync::RwLock;
use ulid::Ulid;

use super::TimeFilter;
use crate::{
Expand Down Expand Up @@ -73,21 +74,21 @@ impl FilterType {

#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
pub struct FilterBuilder {
pub id: String,
pub id: Ulid,
pub combinator: String,
pub rules: Vec<FilterRules>,
}

#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
pub struct FilterRules {
pub id: String,
pub id: Ulid,
pub combinator: String,
pub rules: Vec<Rules>,
}

#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
pub struct Rules {
pub id: String,
pub id: Ulid,
pub field: String,
pub value: String,
pub operator: String,
Expand Down
Loading