Skip to content

Commit c98f178

Browse files
authored
Add "update_if_exists", "update_mut_if_exists", "remove_if" methods (#6)
* Add "update_if_exists", "update_mut_if_exists", "remove_if" methods * No need for Arc and Pin there, only Box * Create tests badge in readme
1 parent fd9f3e9 commit c98f178

File tree

5 files changed

+250
-32
lines changed

5 files changed

+250
-32
lines changed

README.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,6 @@
11
# cache-loader-async
2+
[![Tests](https://github.com/ZeroTwo-Bot/cache-loader-async-rs/actions/workflows/rust.yml/badge.svg?branch=master&event=push)](https://github.com/ZeroTwo-Bot/cache-loader-async-rs/actions/workflows/rust.yml)
3+
24
[crates.io](https://crates.io/crates/cache_loader_async)
35

46
The goal of this crate is to provide a thread-safe and easy way to access any data structure
@@ -84,5 +86,6 @@ pub trait CacheBacking<K, V>
8486
fn set(&mut self, key: K, value: V) -> Option<V>;
8587
fn remove(&mut self, key: &K) -> Option<V>;
8688
fn contains_key(&self, key: &K) -> bool;
89+
fn remove_if(&mut self, predicate: Box<dyn Fn((&K, &V)) -> bool + Send + 'static>);
8790
}
8891
```

src/backing.rs

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ pub trait CacheBacking<K, V>
1717
fn set(&mut self, key: K, value: V) -> Option<V>;
1818
fn remove(&mut self, key: &K) -> Option<V>;
1919
fn contains_key(&self, key: &K) -> bool;
20+
fn remove_if(&mut self, predicate: Box<dyn Fn((&K, &V)) -> bool + Send + 'static>);
2021
}
2122

2223
#[cfg(feature = "lru-cache")]
@@ -48,6 +49,22 @@ impl<
4849
fn contains_key(&self, key: &K) -> bool {
4950
self.lru.contains(&key.clone())
5051
}
52+
53+
fn remove_if(&mut self, predicate: Box<dyn Fn((&K, &V)) -> bool + Send>) {
54+
let keys = self.lru.iter()
55+
.filter_map(|(key, value)| {
56+
if predicate((key, value)) {
57+
Some(key)
58+
} else {
59+
None
60+
}
61+
})
62+
.cloned()
63+
.collect::<Vec<K>>();
64+
for key in keys.into_iter(){
65+
self.lru.pop(&key);
66+
}
67+
}
5168
}
5269

5370
#[cfg(feature = "lru-cache")]
@@ -118,6 +135,23 @@ impl<
118135
.filter(|(_, expiry)| SystemTime::now().lt(expiry))
119136
.is_some()
120137
}
138+
139+
fn remove_if(&mut self, predicate: Box<dyn Fn((&K, &V)) -> bool + Send>) {
140+
let keys = self.map.iter()
141+
.filter_map(|(key, (value, _))| {
142+
if predicate((key, value)) {
143+
Some(key)
144+
} else {
145+
None
146+
}
147+
})
148+
.cloned()
149+
.collect::<Vec<K>>();
150+
for key in keys.into_iter() {
151+
self.map.remove(&key);
152+
self.expiry_queue.retain(|(expiry_key, _)| expiry_key.ne(&key))
153+
}
154+
}
121155
}
122156

123157
#[cfg(feature = "ttl-cache")]
@@ -169,6 +203,10 @@ impl<
169203
fn contains_key(&self, key: &K) -> bool {
170204
self.map.contains_key(key)
171205
}
206+
207+
fn remove_if(&mut self, predicate: Box<dyn Fn((&K, &V)) -> bool + Send>) {
208+
self.map.retain(|k, v| !predicate((k, v)));
209+
}
172210
}
173211

174212
impl<K, V> HashMapBacking<K, V> {

src/cache_api.rs

Lines changed: 98 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -11,9 +11,10 @@ pub enum CacheLoadingError<E: Debug> {
1111
#[error(transparent)]
1212
CommunicationError(CacheCommunicationError),
1313
#[error("No data found")]
14-
NoData(), // todo better handling here? eventually return loadingerror if possible
14+
NoData(),
15+
// todo better handling here? eventually return loadingerror if possible
1516
#[error("An error occurred when loading the entity from the loader function")]
16-
LoadingError(E)
17+
LoadingError(E),
1718
}
1819

1920
#[derive(Error, Debug)]
@@ -31,7 +32,6 @@ pub enum CacheCommunicationError {
3132
}
3233

3334
impl<E: Debug> CacheLoadingError<E> {
34-
3535
pub fn as_loading_error(&self) -> Option<&E> {
3636
match self {
3737
CacheLoadingError::LoadingError(error) => Some(error),
@@ -204,6 +204,18 @@ impl<
204204
.map(|meta| meta.result)
205205
}
206206

207+
/// Retrieves or loads the value for specified key from either cache or loader function with
208+
/// meta information, i.e. if the key was loaded from cache or from the loader function
209+
///
210+
/// # Arguments
211+
///
212+
/// * `key` - The key which should be loaded
213+
///
214+
/// # Return Value
215+
///
216+
/// Returns a Result with:
217+
/// Ok - Value of type ResultMeta<V>
218+
/// Err - Error of type CacheLoadingError
207219
pub async fn get_with_meta(&self, key: K) -> Result<ResultMeta<V>, CacheLoadingError<E>> {
208220
self.send_cache_action(CacheAction::Get(key)).await
209221
.map(|opt_result| opt_result.expect("Get should always return either V or CacheLoadingError"))
@@ -277,7 +289,23 @@ impl<
277289
.map(|opt_meta| opt_meta.map(|meta| meta.result))
278290
}
279291

280-
/// Updates a key on the cache with the given update function and returns the previous value
292+
/// Removes all entries which match the specified predicate
293+
///
294+
/// # Arguments
295+
///
296+
/// * `predicate` - The predicate to test all entries against
297+
///
298+
/// # Return Value
299+
///
300+
/// Returns a Result with:
301+
/// Ok - Nothing, the removed values are discarded
302+
/// Err - Error of type CacheLoadingError -> the values were not discarded
303+
pub async fn remove_if<P: Fn((&K, Option<&V>)) -> bool + Send + Sync + 'static>(&self, predicate: P) -> Result<(), CacheLoadingError<E>> {
304+
self.send_cache_action(CacheAction::RemoveIf(Box::new(predicate))).await
305+
.map(|_| ())
306+
}
307+
308+
/// Updates a key on the cache with the given update function and returns the updated value
281309
///
282310
/// If the key is not present yet, it'll be loaded using the loader function and will be
283311
/// updated once this loader function completes.
@@ -298,18 +326,82 @@ impl<
298326
/// Err - Error of type CacheLoadingError
299327
pub async fn update<U>(&self, key: K, update_fn: U) -> Result<V, CacheLoadingError<E>>
300328
where U: FnOnce(V) -> V + Send + 'static {
301-
self.send_cache_action(CacheAction::Update(key, Box::new(update_fn))).await
329+
self.send_cache_action(CacheAction::Update(key, Box::new(update_fn), true)).await
302330
.map(|opt_result| opt_result.expect("Get should always return either V or CacheLoadingError"))
303331
.map(|meta| meta.result)
304332
}
305333

334+
/// Updates a key on the cache with the given update function and returns the updated value if
335+
/// it existed
336+
///
337+
/// If the key is not present yet, it'll be ignored. This also counts for keys in LOADING state.
338+
///
339+
/// # Arguments
340+
///
341+
/// * `key` - The key which should be updated
342+
/// * `update_fn` - A `FnOnce(V) -> V` which has the current value as parameter and should
343+
/// return the updated value
344+
///
345+
/// # Return Value
346+
///
347+
/// Returns a Result with:
348+
/// Ok - Optional value of type V which is the previously mapped value
349+
/// Err - Error of type CacheLoadingError
350+
pub async fn update_if_exists<U>(&self, key: K, update_fn: U) -> Result<Option<V>, CacheLoadingError<E>>
351+
where U: FnOnce(V) -> V + Send + 'static {
352+
self.send_cache_action(CacheAction::Update(key, Box::new(update_fn), false)).await
353+
.map(|opt| opt.map(|meta| meta.result))
354+
}
355+
356+
/// Updates a key on the cache with the given update function and returns the updated value
357+
///
358+
/// If the key is not present yet, it'll be loaded using the loader function and will be
359+
/// updated once this loader function completes.
360+
/// In case the key was manually updated via `set` during the loader function the update will
361+
/// take place on the manually updated value, so user-controlled input takes precedence over
362+
/// the loader function
363+
///
364+
/// # Arguments
365+
///
366+
/// * `key` - The key which should be updated
367+
/// * `update_fn` - A `FnMut(&mut V) -> ()` which has the current value as parameter and should
368+
/// update it accordingly
369+
///
370+
/// # Return Value
371+
///
372+
/// Returns a Result with:
373+
/// Ok - Value of type V
374+
/// Err - Error of type CacheLoadingError
306375
pub async fn update_mut<U>(&self, key: K, update_fn: U) -> Result<V, CacheLoadingError<E>>
307376
where U: FnMut(&mut V) -> () + Send + 'static {
308-
self.send_cache_action(CacheAction::UpdateMut(key, Box::new(update_fn))).await
377+
self.send_cache_action(CacheAction::UpdateMut(key, Box::new(update_fn), true)).await
309378
.map(|opt_result| opt_result.expect("Get should always return either V or CacheLoadingError"))
310379
.map(|meta| meta.result)
311380
}
312381

382+
/// Updates a key on the cache with the given update function and returns the updated value if
383+
/// it existed
384+
///
385+
/// If the key is not present yet, it'll be ignored.
386+
/// Keys in LOADING state will still be updated as they get available.
387+
///
388+
/// # Arguments
389+
///
390+
/// * `key` - The key which should be updated
391+
/// * `update_fn` - A `FnMut(&mut V) -> ()` which has the current value as parameter and should
392+
/// update it accordingly
393+
///
394+
/// # Return Value
395+
///
396+
/// Returns a Result with:
397+
/// Ok - Optional value of type V
398+
/// Err - Error of type CacheLoadingError
399+
pub async fn update_mut_if_exists<U>(&self, key: K, update_fn: U) -> Result<Option<V>, CacheLoadingError<E>>
400+
where U: FnMut(&mut V) -> () + Send + 'static {
401+
self.send_cache_action(CacheAction::UpdateMut(key, Box::new(update_fn), false)).await
402+
.map(|opt| opt.map(|meta| meta.result))
403+
}
404+
313405
async fn send_cache_action(&self, action: CacheAction<K, V>) -> Result<Option<ResultMeta<V>>, CacheLoadingError<E>> {
314406
let (tx, rx) = tokio::sync::oneshot::channel();
315407
match self.tx.send(CacheMessage {

src/internal_cache.rs

Lines changed: 57 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -9,9 +9,10 @@ pub(crate) enum CacheAction<K, V> {
99
GetIfPresent(K),
1010
Get(K),
1111
Set(K, V),
12-
Update(K, Box<dyn FnOnce(V) -> V + Send + 'static>),
13-
UpdateMut(K, Box<dyn FnMut(&mut V) -> () + Send + 'static>),
12+
Update(K, Box<dyn FnOnce(V) -> V + Send + 'static>, bool),
13+
UpdateMut(K, Box<dyn FnMut(&mut V) -> () + Send + 'static>, bool),
1414
Remove(K),
15+
RemoveIf(Box<dyn Fn((&K, Option<&V>)) -> bool + Send + Sync + 'static>),
1516
// Internal use
1617
SetAndUnblock(K, V),
1718
Unblock(K),
@@ -57,9 +58,10 @@ impl<
5758
CacheAction::GetIfPresent(key) => self.get_if_present(key),
5859
CacheAction::Get(key) => self.get(key),
5960
CacheAction::Set(key, value) => self.set(key, value, false),
60-
CacheAction::Update(key, update_fn) => self.update(key, update_fn),
61-
CacheAction::UpdateMut(key, update_mut_fn) => self.update_mut(key, update_mut_fn),
61+
CacheAction::Update(key, update_fn, load) => self.update(key, update_fn, load),
62+
CacheAction::UpdateMut(key, update_mut_fn, load) => self.update_mut(key, update_mut_fn, load),
6263
CacheAction::Remove(key) => self.remove(key),
64+
CacheAction::RemoveIf(predicate) => self.remove_if(predicate),
6365
CacheAction::SetAndUnblock(key, value) => self.set(key, value, true),
6466
CacheAction::Unblock(key) => {
6567
self.unblock(key);
@@ -95,7 +97,26 @@ impl<
9597
}
9698
}
9799

98-
fn update_mut(&mut self, key: K, mut update_mut_fn: Box<dyn FnMut(&mut V) -> () + Send + 'static>) -> CacheResult<V, E> {
100+
fn remove_if(&mut self, predicate: Box<dyn Fn((&K, Option<&V>)) -> bool + Send + Sync + 'static>) -> CacheResult<V, E> {
101+
self.data.remove_if(self.to_predicate(predicate));
102+
CacheResult::None
103+
}
104+
105+
fn to_predicate(&self, predicate: Box<dyn Fn((&K, Option<&V>)) -> bool + Send + Sync + 'static>)
106+
-> Box<dyn Fn((&K, &CacheEntry<V, E>)) -> bool + Send + Sync + 'static> {
107+
Box::new(move |(key, value)| {
108+
match value {
109+
CacheEntry::Loaded(value) => {
110+
predicate((key, Some(value)))
111+
}
112+
CacheEntry::Loading(_) => {
113+
predicate((key, None))
114+
}
115+
}
116+
})
117+
}
118+
119+
fn update_mut(&mut self, key: K, mut update_mut_fn: Box<dyn FnMut(&mut V) -> () + Send + 'static>, load: bool) -> CacheResult<V, E> {
99120
match self.data.get_mut(&key) {
100121
Some(entry) => {
101122
match entry {
@@ -110,7 +131,7 @@ impl<
110131
rx.recv().await.ok(); // result confirmed
111132
let (response_tx, response_rx) = tokio::sync::oneshot::channel();
112133
cache_tx.send(CacheMessage {
113-
action: CacheAction::UpdateMut(key, update_mut_fn),
134+
action: CacheAction::UpdateMut(key, update_mut_fn, load),
114135
response: response_tx,
115136
}).await.ok();
116137
match response_rx.await.unwrap() {
@@ -122,31 +143,41 @@ impl<
122143
}
123144
}
124145
None => {
125-
let result = self.get(key.clone());
126-
match result {
127-
CacheResult::Loading(waiter) => {
128-
let cache_tx = self.tx.clone();
129-
CacheResult::Loading(tokio::spawn(async move {
130-
waiter.await.ok(); // result confirmed
131-
let (response_tx, response_rx) = tokio::sync::oneshot::channel();
132-
cache_tx.send(CacheMessage {
133-
action: CacheAction::UpdateMut(key, update_mut_fn),
134-
response: response_tx,
135-
}).await.ok();
136-
match response_rx.await.unwrap() {
137-
CacheResult::Found(data) => Ok(data),
138-
_ => Err(CacheLoadingError::NoData())
139-
}
140-
}))
146+
if load {
147+
let result = self.get(key.clone());
148+
match result {
149+
CacheResult::Loading(waiter) => {
150+
let cache_tx = self.tx.clone();
151+
CacheResult::Loading(tokio::spawn(async move {
152+
waiter.await.ok(); // result confirmed
153+
let (response_tx, response_rx) = tokio::sync::oneshot::channel();
154+
cache_tx.send(CacheMessage {
155+
action: CacheAction::UpdateMut(key, update_mut_fn, load),
156+
response: response_tx,
157+
}).await.ok();
158+
match response_rx.await.unwrap() {
159+
CacheResult::Found(data) => Ok(data),
160+
_ => Err(CacheLoadingError::NoData())
161+
}
162+
}))
163+
}
164+
_ => CacheResult::None,
141165
}
142-
_ => CacheResult::None,
166+
} else {
167+
CacheResult::None
143168
}
144169
}
145170
}
146171
}
147172

148-
fn update(&mut self, key: K, update_fn: Box<dyn FnOnce(V) -> V + Send + 'static>) -> CacheResult<V, E> {
149-
match self.get(key.clone()) {
173+
fn update(&mut self, key: K, update_fn: Box<dyn FnOnce(V) -> V + Send + 'static>, load: bool) -> CacheResult<V, E> {
174+
let data = if load {
175+
self.get(key.clone())
176+
} else {
177+
self.get_if_present(key.clone())
178+
};
179+
180+
match data {
150181
CacheResult::Found(data) => {
151182
let updated_data = update_fn(data);
152183
self.data.set(key, CacheEntry::Loaded(updated_data.clone()));
@@ -161,7 +192,7 @@ impl<
161192
// todo: is there a possibility that this loops forever?
162193
let (response_tx, rx) = tokio::sync::oneshot::channel();
163194
tx.send(CacheMessage {
164-
action: CacheAction::Update(key, update_fn),
195+
action: CacheAction::Update(key, update_fn, load),
165196
response: response_tx,
166197
}).await.ok();
167198
match rx.await {

0 commit comments

Comments
 (0)