Skip to content

Commit f0cc51d

Browse files
authored
feat: Add generic progress emitter utility (#141)
This adds a generic utility which can be used to track progress of data transfers by emitting progress via a tokio broadcast channel. This is extracted from the deltachat PR: chatmail/core#4007
1 parent 1e64f1e commit f0cc51d

File tree

4 files changed

+210
-1
lines changed

4 files changed

+210
-1
lines changed

Cargo.lock

Lines changed: 8 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ ed25519-dalek = { version = "1.0.1", features = ["serde"] }
2424
futures = "0.3.25"
2525
indicatif = { version = "0.17", features = ["tokio"], optional = true }
2626
multibase = { version = "0.9.1", optional = true }
27+
portable-atomic = "1"
2728
postcard = { version = "1", default-features = false, features = ["alloc", "use-std", "experimental-derive"] }
2829
quinn = "0.9.3"
2930
rand = "0.7"

src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
#![deny(rustdoc::broken_intra_doc_links)]
44
pub mod blobs;
55
pub mod get;
6+
pub mod progress;
67
pub mod protocol;
78
pub mod provider;
89

src/progress.rs

Lines changed: 200 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,200 @@
1+
//! Generic utilities to track progress of data transfers.
2+
//!
3+
//! This is not especially specific to sendme but can be helpful together with it. The
4+
//! [`ProgressEmitter`] has a [`ProgressEmitter::wrap_async_read`] method which can make it
5+
//! easy to track process of transfers.
6+
//!
7+
//! However based on your environment there might also be better choices for this, e.g. very
8+
//! similar and more advanced functionality is available in the `indicatif` crate for
9+
//! terminal applications.
10+
11+
use std::pin::Pin;
12+
use std::sync::atomic::Ordering;
13+
use std::sync::Arc;
14+
use std::task::Poll;
15+
16+
use portable_atomic::{AtomicU16, AtomicU64};
17+
use tokio::io::{self, AsyncRead};
18+
use tokio::sync::broadcast;
19+
20+
/// A generic progress event emitter.
21+
///
22+
/// It is created with a total value to reach and at which increments progress should be
23+
/// emitted. E.g. when downloading a file of any size but you want percentage increments
24+
/// you would create `ProgressEmitter::new(file_size_in_bytes, 100)` and
25+
/// [`ProgressEmitter::subscribe`] will yield numbers `1..100` only.
26+
///
27+
/// Progress is made by calling [`ProgressEmitter::inc`], which can be implicitly done by
28+
/// [`ProgressEmitter::wrap_async_read`].
29+
#[derive(Debug, Clone)]
30+
pub struct ProgressEmitter {
31+
inner: Arc<InnerProgressEmitter>,
32+
}
33+
34+
impl ProgressEmitter {
35+
/// Creates a new emitter.
36+
///
37+
/// The emitter expects to see *total* being added via [`ProgressEmitter::inc`] and will
38+
/// emit *steps* updates.
39+
pub fn new(total: u64, steps: u16) -> Self {
40+
let (tx, _rx) = broadcast::channel(16);
41+
Self {
42+
inner: Arc::new(InnerProgressEmitter {
43+
total: AtomicU64::new(total),
44+
count: AtomicU64::new(0),
45+
steps,
46+
last_step: AtomicU16::new(0u16),
47+
tx,
48+
}),
49+
}
50+
}
51+
52+
/// Sets a new total in case you did not now the total up front.
53+
pub fn set_total(&self, value: u64) {
54+
self.inner.set_total(value)
55+
}
56+
57+
/// Returns a receiver that gets incremental values.
58+
///
59+
/// The values yielded depend on *steps* passed to [`ProgressEmitter::new`]: it will go
60+
/// from `1..steps`.
61+
pub fn subscribe(&self) -> broadcast::Receiver<u16> {
62+
self.inner.subscribe()
63+
}
64+
65+
/// Increments the progress by *amount*.
66+
pub fn inc(&self, amount: u64) {
67+
self.inner.inc(amount);
68+
}
69+
70+
/// Wraps an [`AsyncRead`] which implicitly calls [`ProgressEmitter::inc`].
71+
pub fn wrap_async_read<R: AsyncRead + Unpin>(&self, read: R) -> ProgressAsyncReader<R> {
72+
ProgressAsyncReader {
73+
emitter: self.clone(),
74+
inner: read,
75+
}
76+
}
77+
}
78+
79+
/// The actual implementation.
80+
///
81+
/// This exists so it can be Arc'd into [`ProgressEmitter`] and we can easily have multiple
82+
/// `Send + Sync` copies of it. This is used by the
83+
/// [`ProgressEmitter::ProgressAsyncReader`] to update the progress without intertwining
84+
/// lifetimes.
85+
#[derive(Debug)]
86+
struct InnerProgressEmitter {
87+
total: AtomicU64,
88+
count: AtomicU64,
89+
steps: u16,
90+
last_step: AtomicU16,
91+
tx: broadcast::Sender<u16>,
92+
}
93+
94+
impl InnerProgressEmitter {
95+
fn inc(&self, amount: u64) {
96+
let prev_count = self.count.fetch_add(amount, Ordering::Relaxed);
97+
let count = prev_count + amount;
98+
let total = self.total.load(Ordering::Relaxed);
99+
let step = (std::cmp::min(count, total) * u64::from(self.steps) / total) as u16;
100+
let last_step = self.last_step.swap(step, Ordering::Relaxed);
101+
if step > last_step {
102+
self.tx.send(step).ok();
103+
}
104+
}
105+
106+
fn set_total(&self, value: u64) {
107+
self.total.store(value, Ordering::Relaxed);
108+
}
109+
110+
fn subscribe(&self) -> broadcast::Receiver<u16> {
111+
self.tx.subscribe()
112+
}
113+
}
114+
115+
/// A wrapper around [`AsyncRead`] which increments a [`ProgressEmitter`].
116+
///
117+
/// This can be used just like the underlying [`AsyncRead`] but increments progress for each
118+
/// byte read. Create this using [`ProgressEmitter::wrap_async_read`].
119+
#[derive(Debug)]
120+
pub struct ProgressAsyncReader<R: AsyncRead + Unpin> {
121+
emitter: ProgressEmitter,
122+
inner: R,
123+
}
124+
125+
impl<R> AsyncRead for ProgressAsyncReader<R>
126+
where
127+
R: AsyncRead + Unpin,
128+
{
129+
fn poll_read(
130+
mut self: Pin<&mut Self>,
131+
cx: &mut std::task::Context<'_>,
132+
buf: &mut io::ReadBuf<'_>,
133+
) -> Poll<std::io::Result<()>> {
134+
let prev_len = buf.filled().len() as u64;
135+
match Pin::new(&mut self.inner).poll_read(cx, buf) {
136+
Poll::Ready(val) => {
137+
let new_len = buf.filled().len() as u64;
138+
self.emitter.inc(new_len - prev_len);
139+
Poll::Ready(val)
140+
}
141+
Poll::Pending => Poll::Pending,
142+
}
143+
}
144+
}
145+
146+
#[cfg(test)]
147+
mod tests {
148+
use tokio::sync::broadcast::error::TryRecvError;
149+
150+
use super::*;
151+
152+
#[test]
153+
fn test_inc() {
154+
let progress = ProgressEmitter::new(160, 16);
155+
let mut rx = progress.subscribe();
156+
157+
progress.inc(1);
158+
assert_eq!(progress.inner.count.load(Ordering::Relaxed), 1);
159+
let res = rx.try_recv();
160+
assert!(matches!(res, Err(TryRecvError::Empty)));
161+
162+
progress.inc(9);
163+
assert_eq!(progress.inner.count.load(Ordering::Relaxed), 10);
164+
let res = rx.try_recv();
165+
assert!(matches!(res, Ok(1)));
166+
167+
progress.inc(30);
168+
assert_eq!(progress.inner.count.load(Ordering::Relaxed), 40);
169+
let res = rx.try_recv();
170+
assert!(matches!(res, Ok(4)));
171+
172+
progress.inc(120);
173+
assert_eq!(progress.inner.count.load(Ordering::Relaxed), 160);
174+
let res = rx.try_recv();
175+
assert!(matches!(res, Ok(16)));
176+
}
177+
178+
#[tokio::test]
179+
async fn test_async_reader() {
180+
// Note that the broadcast::Receiver has 16 slots, pushing more into them without
181+
// consuming will result in a (Try)RecvError::Lagged.
182+
let progress = ProgressEmitter::new(160, 16);
183+
let mut rx = progress.subscribe();
184+
185+
let data = [1u8; 100];
186+
let mut wrapped_reader = progress.wrap_async_read(&data[..]);
187+
io::copy(&mut wrapped_reader, &mut io::sink())
188+
.await
189+
.unwrap();
190+
191+
// Most likely this test will invoke a single AsyncRead::poll_read and thus only a
192+
// single event will be emitted. But we can not really rely on this and can only
193+
// check the last value.
194+
let mut current = 0;
195+
while let Ok(val) = rx.try_recv() {
196+
current = val;
197+
}
198+
assert_eq!(current, 10);
199+
}
200+
}

0 commit comments

Comments
 (0)