Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 6 additions & 5 deletions src/blob_tree/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -125,9 +125,9 @@ impl BlobTree {
// to the tree
}

let iter = self
.index
.create_internal_range::<&[u8], RangeFull>(&.., Some(seqno), None);
let iter =
self.index
.create_internal_range::<&[u8], RangeFull>(&.., Some(seqno), None, None);

// Stores the max seqno of every blob file
let mut seqno_map = crate::HashMap::<SegmentId, SeqNo>::default();
Expand Down Expand Up @@ -330,6 +330,7 @@ impl AbstractTree for BlobTree {
data_block_size: self.index.config.data_block_size,
index_block_size: self.index.config.index_block_size,
folder: lsm_segment_folder,
prefix_extractor: self.index.prefix_extractor.clone(),
})?
.use_compression(self.index.config.compression);

Expand Down Expand Up @@ -549,7 +550,7 @@ impl AbstractTree for BlobTree {
Box::new(
self.index
.0
.create_range(&range, Some(seqno), index)
.create_range(&range, Some(seqno), index, None)
.map(move |item| resolve_value_handle(&vlog, item)),
)
}
Expand Down Expand Up @@ -577,7 +578,7 @@ impl AbstractTree for BlobTree {
Box::new(
self.index
.0
.create_range(&range, None, None)
.create_range(&range, None, None, None)
.map(move |item| resolve_value_handle(&vlog, item)),
)
}
Expand Down
6 changes: 6 additions & 0 deletions src/compaction/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use crate::{
level_manifest::LevelManifest,
level_reader::LevelReader,
merge::{BoxedIterator, Merger},
prefix_extractor::PrefixExtractor,
segment::{
block_index::{
full_index::FullBlockIndex, two_level_index::TwoLevelBlockIndex, BlockIndexImpl,
Expand Down Expand Up @@ -50,6 +51,8 @@ pub struct Options {

/// Evicts items that are older than this seqno (MVCC GC).
pub eviction_seqno: u64,

pub prefix_extractor: Option<Arc<dyn PrefixExtractor>>,
}

impl Options {
Expand All @@ -63,6 +66,7 @@ impl Options {
stop_signal: tree.stop_signal.clone(),
strategy,
eviction_seqno: 0,
prefix_extractor: tree.prefix_extractor.clone(),
}
}
}
Expand Down Expand Up @@ -143,6 +147,7 @@ fn create_compaction_stream<'a>(
&(Unbounded, Unbounded),
(Some(lo), Some(hi)),
crate::segment::value_block::CachePolicy::Read,
None,
)));

found += hi - lo + 1;
Expand Down Expand Up @@ -251,6 +256,7 @@ fn merge_segments(
segment_id: 0, // TODO: this is never used in MultiWriter
data_block_size: opts.config.data_block_size,
index_block_size: opts.config.index_block_size,
prefix_extractor: opts.prefix_extractor.clone(),
},
) else {
log::error!("Compaction failed");
Expand Down
13 changes: 13 additions & 0 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
use crate::{
descriptor_table::FileDescriptorTable,
path::absolute_path,
prefix_extractor::{self, PrefixExtractor},
segment::meta::{CompressionType, TableType},
BlobTree, BlockCache, Tree,
};
Expand Down Expand Up @@ -102,6 +103,10 @@ pub struct Config {
/// Descriptor table to use
#[doc(hidden)]
pub descriptor_table: Arc<FileDescriptorTable>,

/// Custom implementation to extract prefix from a key
/// when using read or updates in bloom filter to optimize DB scans by prefix
pub prefix_extractor: Option<Arc<dyn PrefixExtractor>>,
}

impl Default for Config {
Expand All @@ -123,6 +128,7 @@ impl Default for Config {
blob_cache: Arc::new(BlobCache::with_capacity_bytes(/* 16 MiB */ 16 * 1_024 * 1_024)),
blob_file_target_size: /* 64 MiB */ 64 * 1_024 * 1_024,
blob_file_separation_threshold: /* 4 KiB */ 4 * 1_024,
prefix_extractor: None,
}
}
}
Expand Down Expand Up @@ -304,6 +310,13 @@ impl Config {
self
}

#[must_use]
#[doc(hidden)]
pub fn prefix_extractor(mut self, prefix_extractor: Arc<dyn PrefixExtractor>) -> Self {
self.prefix_extractor = Some(prefix_extractor);
self
}

/// Opens a tree using the config.
///
/// # Errors
Expand Down
89 changes: 53 additions & 36 deletions src/level_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

use crate::{
level_manifest::level::Level,
segment::{range::Range, value_block::CachePolicy},
segment::{range::Range, value_block::CachePolicy, Segment},
InternalValue, UserKey,
};
use std::{ops::Bound, sync::Arc};
Expand All @@ -17,6 +17,7 @@ pub struct LevelReader {
lo_reader: Option<Range>,
hi_reader: Option<Range>,
cache_policy: CachePolicy,
prefix_hash: Option<(u64, u64)>,
}

impl LevelReader {
Expand All @@ -25,6 +26,7 @@ impl LevelReader {
level: Arc<Level>,
range: &(Bound<UserKey>, Bound<UserKey>),
cache_policy: CachePolicy,
prefix_hash: Option<(u64, u64)>,
) -> Self {
assert!(!level.is_empty(), "level reader cannot read empty level");

Expand All @@ -39,10 +41,17 @@ impl LevelReader {
lo_reader: None,
hi_reader: None,
cache_policy,
prefix_hash,
};
};

Self::from_indexes(level, range, (Some(lo), Some(hi)), cache_policy)
Self::from_indexes(
level,
range,
(Some(lo), Some(hi)),
cache_policy,
prefix_hash,
)
}

#[must_use]
Expand All @@ -51,6 +60,7 @@ impl LevelReader {
range: &(Bound<UserKey>, Bound<UserKey>),
(lo, hi): (Option<usize>, Option<usize>),
cache_policy: CachePolicy,
prefix_hash: Option<(u64, u64)>,
) -> Self {
let lo = lo.unwrap_or_default();
let hi = hi.unwrap_or(level.len() - 1);
Expand All @@ -74,8 +84,14 @@ impl LevelReader {
lo_reader: Some(lo_reader),
hi_reader,
cache_policy,
prefix_hash,
}
}

fn may_segment_contain_hash(&self, segment: &Segment) -> bool {
self.prefix_hash
.map_or(true, |hash| segment.may_contain_hash(hash))
}
}

impl Iterator for LevelReader {
Expand All @@ -90,24 +106,20 @@ impl Iterator for LevelReader {

// NOTE: Lo reader is empty, get next one
self.lo_reader = None;
self.lo += 1;

if self.lo < self.hi {
self.lo_reader = Some(
self.segments
.get(self.lo)
.expect("should exist")
.iter()
.cache_policy(self.cache_policy),
);
}
} else if let Some(hi_reader) = &mut self.hi_reader {
} else if self.lo == self.hi {
// NOTE: We reached the hi marker, so consume from it instead
//
// If it returns nothing, it is empty, so we are done
return hi_reader.next();
return self.hi_reader.as_mut().and_then(|r| r.next());
} else {
return None;
self.lo += 1;

if self.lo < self.hi {
let segment = self.segments.get(self.lo).expect("should exist");
if self.may_segment_contain_hash(segment) {
self.lo_reader = Some(segment.iter().cache_policy(self.cache_policy));
}
}
}
}
}
Expand All @@ -121,26 +133,22 @@ impl DoubleEndedIterator for LevelReader {
return Some(item);
}

// NOTE: Hi reader is empty, get orev one
// NOTE: Hi reader is empty, get the previous one
self.hi_reader = None;
self.hi -= 1;

if self.lo < self.hi {
self.hi_reader = Some(
self.segments
.get(self.hi)
.expect("should exist")
.iter()
.cache_policy(self.cache_policy),
);
}
} else if let Some(lo_reader) = &mut self.lo_reader {
} else if self.lo == self.hi {
// NOTE: We reached the lo marker, so consume from it instead
//
// If it returns nothing, it is empty, so we are done
return lo_reader.next_back();
return self.lo_reader.as_mut().and_then(|r| r.next_back());
} else {
return None;
self.hi -= 1;

if self.lo < self.hi {
let segment = self.segments.get(self.hi).expect("should exist");
if self.may_segment_contain_hash(segment) {
self.hi_reader = Some(segment.iter().cache_policy(self.cache_policy));
}
}
}
}
}
Expand Down Expand Up @@ -190,8 +198,12 @@ mod tests {

#[allow(clippy::unwrap_used)]
{
let multi_reader =
LevelReader::new(level.clone(), &(Unbounded, Unbounded), CachePolicy::Read);
let multi_reader = LevelReader::new(
level.clone(),
&(Unbounded, Unbounded),
CachePolicy::Read,
None,
);

let mut iter = multi_reader.flatten();

Expand All @@ -211,8 +223,12 @@ mod tests {

#[allow(clippy::unwrap_used)]
{
let multi_reader =
LevelReader::new(level.clone(), &(Unbounded, Unbounded), CachePolicy::Read);
let multi_reader = LevelReader::new(
level.clone(),
&(Unbounded, Unbounded),
CachePolicy::Read,
None,
);

let mut iter = multi_reader.rev().flatten();

Expand All @@ -232,7 +248,8 @@ mod tests {

#[allow(clippy::unwrap_used)]
{
let multi_reader = LevelReader::new(level, &(Unbounded, Unbounded), CachePolicy::Read);
let multi_reader =
LevelReader::new(level, &(Unbounded, Unbounded), CachePolicy::Read, None);

let mut iter = multi_reader.flatten();

Expand Down
3 changes: 3 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,9 @@ mod block_cache;
#[cfg(feature = "bloom")]
pub mod bloom;

#[doc(hidden)]
pub mod prefix_extractor;

#[doc(hidden)]
pub mod coding;
pub mod compaction;
Expand Down
11 changes: 11 additions & 0 deletions src/prefix_extractor.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
/// A trait allowing to customize prefix extraction in operations with bloom filter.
/// It defines how prefix should be extracted from the key, when update or read
/// a bloom filter.
pub trait PrefixExtractor: Sync + Send {
/// Extracts prefix from original key
fn transform<'a>(&self, key: &'a [u8]) -> &'a [u8];
/// Checks if a key is in domain and prefix can be extracted from it.
/// For example if `PrefixExtractor` suppose to extract first 4 bytes from key,
/// `in_domain(&[0, 2, 3])` should return false
fn in_domain(&self, key: &[u8]) -> bool;
}
19 changes: 14 additions & 5 deletions src/range.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ impl DoubleEndedIterator for TreeIter {
fn collect_disjoint_tree_with_range(
level_manifest: &LevelManifest,
bounds: &(Bound<UserKey>, Bound<UserKey>),
prefix_hash: Option<(u64, u64)>,
) -> MultiReader<LevelReader> {
debug_assert!(level_manifest.is_disjoint());

Expand Down Expand Up @@ -120,7 +121,7 @@ fn collect_disjoint_tree_with_range(

let readers = levels
.into_iter()
.map(|lvl| LevelReader::new(lvl, bounds, CachePolicy::Write))
.map(|lvl| LevelReader::new(lvl, bounds, CachePolicy::Write, prefix_hash))
.collect();

MultiReader::new(readers)
Expand All @@ -134,6 +135,7 @@ impl TreeIter {
bounds: (Bound<UserKey>, Bound<UserKey>),
seqno: Option<SeqNo>,
level_manifest: ArcRwLockReadGuardian<LevelManifest>,
prefix_hash: Option<(u64, u64)>,
) -> Self {
Self::new(guard, |lock| {
let lo = match &bounds.0 {
Expand Down Expand Up @@ -185,7 +187,8 @@ impl TreeIter {

// NOTE: Optimize disjoint trees (e.g. timeseries) to only use a single MultiReader.
if level_manifest.is_disjoint() {
let reader = collect_disjoint_tree_with_range(&level_manifest, &bounds);
let reader =
collect_disjoint_tree_with_range(&level_manifest, &bounds, prefix_hash);

if let Some(seqno) = seqno {
iters.push(Box::new(reader.filter(move |item| match item {
Expand All @@ -199,8 +202,12 @@ impl TreeIter {
for level in &level_manifest.levels {
if level.is_disjoint {
if !level.is_empty() {
let reader =
LevelReader::new(level.clone(), &bounds, CachePolicy::Write);
let reader = LevelReader::new(
level.clone(),
&bounds,
CachePolicy::Write,
prefix_hash,
);

if let Some(seqno) = seqno {
iters.push(Box::new(reader.filter(move |item| match item {
Expand All @@ -213,7 +220,9 @@ impl TreeIter {
}
} else {
for segment in &level.segments {
if segment.check_key_range_overlap(&bounds) {
let may_contain_hash =
prefix_hash.map_or(true, |hash| segment.may_contain_hash(hash));
if may_contain_hash && segment.check_key_range_overlap(&bounds) {
let reader = segment.range(bounds.clone());

if let Some(seqno) = seqno {
Expand Down
Loading
Loading