-
Notifications
You must be signed in to change notification settings - Fork 583
Adding two-level hashing in metrics hashmap #1564
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 11 commits
98db4f3
1c440e0
75c853e
fd858f6
1f05dcf
d191cf7
fd94caa
da817b5
498f088
2930fe1
ebe4a38
d8cbc4c
cad1391
61ae262
2b8549f
44efee7
d8c56da
7b0bac5
61c8b0d
267e305
078c994
cc12da1
90bbb2d
f89c3ea
603305e
21b0b3e
f33729c
8b48564
04c68c2
fdb5020
d973c4d
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,22 +1,36 @@ | ||
use std::sync::atomic::{AtomicBool, Ordering}; | ||
use std::{ | ||
collections::{hash_map::Entry, HashMap}, | ||
sync::Mutex, | ||
sync::{Arc, Mutex}, | ||
time::SystemTime, | ||
}; | ||
|
||
use crate::attributes::AttributeSet; | ||
use crate::metrics::data::{self, Aggregation, DataPoint, Temporality}; | ||
use opentelemetry::{global, metrics::MetricsError}; | ||
use std::hash::{Hash, Hasher}; | ||
|
||
#[cfg(feature = "use_hashbrown")] | ||
use ahash::AHasher; | ||
#[cfg(feature = "use_hashbrown")] | ||
use hashbrown::{hash_map::Entry, HashMap}; | ||
|
||
#[cfg(not(feature = "use_hashbrown"))] | ||
use std::collections::{ | ||
hash_map::{DefaultHasher, Entry}, | ||
HashMap, | ||
}; | ||
|
||
use super::{ | ||
aggregate::{is_under_cardinality_limit, STREAM_OVERFLOW_ATTRIBUTE_SET}, | ||
AtomicTracker, Number, | ||
}; | ||
|
||
const BUCKET_COUNT: usize = 256; | ||
type BucketValue<T> = Mutex<Option<HashMap<AttributeSet, T>>>; | ||
type Buckets<T> = Arc<[BucketValue<T>; BUCKET_COUNT]>; | ||
/// The storage for sums. | ||
struct ValueMap<T: Number<T>> { | ||
values: Mutex<HashMap<AttributeSet, T>>, | ||
buckets: Buckets<T>, | ||
has_no_value_attribute_value: AtomicBool, | ||
no_attribute_value: T::AtomicTracker, | ||
} | ||
|
@@ -29,12 +43,41 @@ impl<T: Number<T>> Default for ValueMap<T> { | |
|
||
impl<T: Number<T>> ValueMap<T> { | ||
fn new() -> Self { | ||
let buckets = std::iter::repeat_with(|| Mutex::new(None)) | ||
.take(BUCKET_COUNT) | ||
.collect::<Vec<_>>() | ||
.try_into() | ||
.unwrap_or_else(|_| panic!("Incorrect length")); | ||
lalitb marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
|
||
ValueMap { | ||
values: Mutex::new(HashMap::new()), | ||
buckets: Arc::new(buckets), | ||
has_no_value_attribute_value: AtomicBool::new(false), | ||
no_attribute_value: T::new_atomic_tracker(), | ||
} | ||
} | ||
|
||
// Hash function to determine the bucket | ||
fn hash_to_bucket(key: &AttributeSet) -> u8 { | ||
#[cfg(not(feature = "use_hashbrown"))] | ||
let mut hasher = DefaultHasher::new(); | ||
#[cfg(feature = "use_hashbrown")] | ||
let mut hasher = AHasher::default(); | ||
|
||
key.hash(&mut hasher); | ||
// Use the 8 least significant bits directly, avoiding the modulus operation. | ||
hasher.finish() as u8 | ||
cijothomas marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
} | ||
|
||
// Count the total number of data points across all buckets. | ||
fn total_data_points_count(&self) -> usize { | ||
cijothomas marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
self.buckets | ||
.iter() | ||
.map(|bucket_mutex| { | ||
let locked_bucket = bucket_mutex.lock().unwrap(); | ||
locked_bucket.as_ref().map_or(0, |bucket| bucket.len()) | ||
}) | ||
.sum::<usize>() | ||
} | ||
} | ||
|
||
impl<T: Number<T>> ValueMap<T> { | ||
|
@@ -43,22 +86,33 @@ impl<T: Number<T>> ValueMap<T> { | |
self.no_attribute_value.add(measurement); | ||
self.has_no_value_attribute_value | ||
.store(true, Ordering::Release); | ||
} else if let Ok(mut values) = self.values.lock() { | ||
let size = values.len(); | ||
match values.entry(attrs) { | ||
Entry::Occupied(mut occupied_entry) => { | ||
let sum = occupied_entry.get_mut(); | ||
*sum += measurement; | ||
} | ||
Entry::Vacant(vacant_entry) => { | ||
if is_under_cardinality_limit(size) { | ||
vacant_entry.insert(measurement); | ||
} else { | ||
values | ||
.entry(STREAM_OVERFLOW_ATTRIBUTE_SET.clone()) | ||
.and_modify(|val| *val += measurement) | ||
.or_insert(measurement); | ||
global::handle_error(MetricsError::Other("Warning: Maximum data points for metric stream exceeded. Entry added to overflow.".into())); | ||
} else { | ||
let bucket_index = Self::hash_to_bucket(&attrs) as usize; // Ensure index is usize for array indexing | ||
let bucket_mutex = &self.buckets[bucket_index]; | ||
let mut bucket_guard = bucket_mutex.lock().unwrap(); | ||
|
||
if bucket_guard.is_none() { | ||
*bucket_guard = Some(HashMap::new()); // Initialize the bucket if it's None | ||
} | ||
|
||
if let Some(ref mut values) = *bucket_guard { | ||
let size = values.len(); | ||
cijothomas marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
match values.entry(attrs) { | ||
Entry::Occupied(mut occupied_entry) => { | ||
let sum = occupied_entry.get_mut(); | ||
*sum += measurement; | ||
} | ||
Entry::Vacant(vacant_entry) => { | ||
if is_under_cardinality_limit(size) { | ||
vacant_entry.insert(measurement); | ||
} else { | ||
// TODO: decide whether total_count needs to be updated when an entry goes to overflow. | ||
values | ||
.entry(STREAM_OVERFLOW_ATTRIBUTE_SET.clone()) | ||
|
||
.and_modify(|val| *val += measurement) | ||
.or_insert(measurement); | ||
global::handle_error(MetricsError::Other("Warning: Maximum data points for metric stream exceeded. Entry added to overflow.".into())); | ||
} | ||
} | ||
} | ||
} | ||
|
@@ -112,16 +166,10 @@ impl<T: Number<T>> Sum<T> { | |
s_data.is_monotonic = self.monotonic; | ||
s_data.data_points.clear(); | ||
|
||
let mut values = match self.value_map.values.lock() { | ||
Ok(v) => v, | ||
Err(_) => return (0, None), | ||
}; | ||
|
||
let n = values.len() + 1; | ||
if n > s_data.data_points.capacity() { | ||
s_data | ||
.data_points | ||
.reserve_exact(n - s_data.data_points.capacity()); | ||
let total_len: usize = self.value_map.total_data_points_count() + 1; | ||
if total_len > s_data.data_points.capacity() { | ||
let additional_space_needed = total_len - s_data.data_points.capacity(); | ||
s_data.data_points.reserve_exact(additional_space_needed); | ||
} | ||
|
||
let prev_start = self.start.lock().map(|start| *start).unwrap_or(t); | ||
|
@@ -139,14 +187,19 @@ impl<T: Number<T>> Sum<T> { | |
}); | ||
} | ||
|
||
for (attrs, value) in values.drain() { | ||
s_data.data_points.push(DataPoint { | ||
attributes: attrs, | ||
start_time: Some(prev_start), | ||
time: Some(t), | ||
value, | ||
exemplars: vec![], | ||
}); | ||
for bucket_mutex in self.value_map.buckets.iter() { | ||
if let Some(ref mut locked_bucket) = *bucket_mutex.lock().unwrap() { | ||
for (attrs, value) in locked_bucket.drain() { | ||
s_data.data_points.push(DataPoint { | ||
attributes: attrs, | ||
start_time: Some(*self.start.lock().unwrap()), | ||
time: Some(t), | ||
value, | ||
exemplars: vec![], | ||
}); | ||
} | ||
// The bucket is automatically cleared by the .drain() method | ||
} | ||
} | ||
|
||
// The delta collection cycle resets. | ||
|
@@ -181,16 +234,10 @@ impl<T: Number<T>> Sum<T> { | |
s_data.is_monotonic = self.monotonic; | ||
s_data.data_points.clear(); | ||
|
||
let values = match self.value_map.values.lock() { | ||
Ok(v) => v, | ||
Err(_) => return (0, None), | ||
}; | ||
|
||
let n = values.len() + 1; | ||
if n > s_data.data_points.capacity() { | ||
s_data | ||
.data_points | ||
.reserve_exact(n - s_data.data_points.capacity()); | ||
let total_len: usize = self.value_map.total_data_points_count() + 1; | ||
if total_len > s_data.data_points.capacity() { | ||
let additional_space_needed = total_len - s_data.data_points.capacity(); | ||
s_data.data_points.reserve_exact(additional_space_needed); | ||
} | ||
|
||
let prev_start = self.start.lock().map(|start| *start).unwrap_or(t); | ||
|
@@ -213,14 +260,18 @@ impl<T: Number<T>> Sum<T> { | |
// are unbounded number of attribute sets being aggregated. Attribute | ||
// sets that become "stale" need to be forgotten so this will not | ||
// overload the system. | ||
for (attrs, value) in values.iter() { | ||
s_data.data_points.push(DataPoint { | ||
attributes: attrs.clone(), | ||
start_time: Some(prev_start), | ||
time: Some(t), | ||
value: *value, | ||
exemplars: vec![], | ||
}); | ||
for bucket_mutex in self.value_map.buckets.iter() { | ||
if let Some(ref locked_bucket) = *bucket_mutex.lock().unwrap() { | ||
for (attrs, value) in locked_bucket.iter() { | ||
s_data.data_points.push(DataPoint { | ||
attributes: attrs.clone(), | ||
start_time: Some(*self.start.lock().unwrap()), // Consider last reset time | ||
time: Some(t), | ||
value: *value, | ||
exemplars: vec![], | ||
}); | ||
} | ||
} | ||
} | ||
|
||
( | ||
|
@@ -274,18 +325,13 @@ impl<T: Number<T>> PrecomputedSum<T> { | |
s_data.temporality = Temporality::Delta; | ||
s_data.is_monotonic = self.monotonic; | ||
|
||
let mut values = match self.value_map.values.lock() { | ||
Ok(v) => v, | ||
Err(_) => return (0, None), | ||
}; | ||
|
||
let n = values.len() + 1; | ||
if n > s_data.data_points.capacity() { | ||
s_data | ||
.data_points | ||
.reserve_exact(n - s_data.data_points.capacity()); | ||
let total_len: usize = self.value_map.total_data_points_count() + 1; | ||
if total_len > s_data.data_points.capacity() { | ||
let additional_space_needed = total_len - s_data.data_points.capacity(); | ||
s_data.data_points.reserve_exact(additional_space_needed); | ||
} | ||
let mut new_reported = HashMap::with_capacity(n); | ||
|
||
let mut new_reported = HashMap::with_capacity(total_len); | ||
let mut reported = match self.reported.lock() { | ||
Ok(r) => r, | ||
Err(_) => return (0, None), | ||
|
@@ -305,19 +351,23 @@ impl<T: Number<T>> PrecomputedSum<T> { | |
}); | ||
} | ||
|
||
let default = T::default(); | ||
for (attrs, value) in values.drain() { | ||
let delta = value - *reported.get(&attrs).unwrap_or(&default); | ||
if delta != default { | ||
new_reported.insert(attrs.clone(), value); | ||
for bucket_mutex in self.value_map.buckets.iter() { | ||
if let Some(ref mut locked_bucket) = *bucket_mutex.lock().unwrap() { | ||
let default = T::default(); | ||
for (attrs, value) in locked_bucket.drain() { | ||
let delta = value - *reported.get(&attrs).unwrap_or(&default); | ||
if delta != default { | ||
new_reported.insert(attrs.clone(), value); | ||
} | ||
s_data.data_points.push(DataPoint { | ||
attributes: attrs.clone(), | ||
start_time: Some(prev_start), | ||
time: Some(t), | ||
value: delta, | ||
exemplars: vec![], | ||
}); | ||
} | ||
} | ||
s_data.data_points.push(DataPoint { | ||
attributes: attrs.clone(), | ||
start_time: Some(prev_start), | ||
time: Some(t), | ||
value: delta, | ||
exemplars: vec![], | ||
}); | ||
} | ||
|
||
// The delta collection cycle resets. | ||
|
@@ -356,18 +406,13 @@ impl<T: Number<T>> PrecomputedSum<T> { | |
s_data.temporality = Temporality::Cumulative; | ||
s_data.is_monotonic = self.monotonic; | ||
|
||
let values = match self.value_map.values.lock() { | ||
Ok(v) => v, | ||
Err(_) => return (0, None), | ||
}; | ||
|
||
let n = values.len() + 1; | ||
if n > s_data.data_points.capacity() { | ||
s_data | ||
.data_points | ||
.reserve_exact(n - s_data.data_points.capacity()); | ||
let total_len: usize = self.value_map.total_data_points_count() + 1; | ||
if total_len > s_data.data_points.capacity() { | ||
let additional_space_needed = total_len - s_data.data_points.capacity(); | ||
s_data.data_points.reserve_exact(additional_space_needed); | ||
} | ||
let mut new_reported = HashMap::with_capacity(n); | ||
|
||
let mut new_reported = HashMap::with_capacity(total_len); | ||
let mut reported = match self.reported.lock() { | ||
Ok(r) => r, | ||
Err(_) => return (0, None), | ||
|
@@ -388,18 +433,22 @@ impl<T: Number<T>> PrecomputedSum<T> { | |
} | ||
|
||
let default = T::default(); | ||
for (attrs, value) in values.iter() { | ||
let delta = *value - *reported.get(attrs).unwrap_or(&default); | ||
if delta != default { | ||
new_reported.insert(attrs.clone(), *value); | ||
for bucket_mutex in self.value_map.buckets.iter() { | ||
if let Some(ref locked_bucket) = *bucket_mutex.lock().unwrap() { | ||
for (attrs, value) in locked_bucket.iter() { | ||
let delta = *value - *reported.get(attrs).unwrap_or(&default); | ||
if delta != default { | ||
new_reported.insert(attrs.clone(), *value); | ||
} | ||
s_data.data_points.push(DataPoint { | ||
attributes: attrs.clone(), | ||
start_time: Some(prev_start), | ||
time: Some(t), | ||
value: *value, // For cumulative, we use the value directly without calculating delta | ||
exemplars: vec![], | ||
}); | ||
} | ||
} | ||
s_data.data_points.push(DataPoint { | ||
attributes: attrs.clone(), | ||
start_time: Some(prev_start), | ||
time: Some(t), | ||
value: delta, | ||
exemplars: vec![], | ||
}); | ||
} | ||
|
||
*reported = new_reported; | ||
|
Uh oh!
There was an error while loading. Please reload this page.