-
Notifications
You must be signed in to change notification settings - Fork 14
feat: Read and seek #119
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: Read and seek #119
Changes from 4 commits
4bfa953
6925dd6
2d388a2
b2b0216
b2658b6
709b14e
5fc308e
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -29,8 +29,11 @@ use n0_future::{future, stream, Stream, StreamExt}; | |
use quinn::SendStream; | ||
use range_collections::{range_set::RangeSetRange, RangeSet2}; | ||
use ref_cast::RefCast; | ||
use serde::{Deserialize, Serialize}; | ||
use tokio::io::AsyncWriteExt; | ||
use tracing::trace; | ||
mod reader; | ||
pub use reader::BlobReader; | ||
|
||
// Public reexports from the proto module. | ||
// | ||
|
@@ -102,6 +105,14 @@ impl Blobs { | |
}) | ||
} | ||
|
||
/// Create a [`BlobReader`] for the blob identified by `hash`. | ||
/// | ||
/// Convenience wrapper around `reader_with_opts` that builds a | ||
/// [`ReaderOptions`] containing only the hash. | ||
pub fn reader(&self, hash: impl Into<Hash>) -> BlobReader { | ||
self.reader_with_opts(ReaderOptions { hash: hash.into() }) | ||
} | ||
|
||
/// Create a [`BlobReader`] configured by the given [`ReaderOptions`]. | ||
/// | ||
/// A clone of this handle is passed to `BlobReader::new` together with `options`. | ||
pub fn reader_with_opts(&self, options: ReaderOptions) -> BlobReader { | ||
BlobReader::new(self.clone(), options) | ||
} | ||
|
||
/// Delete a blob. | ||
/// | ||
/// This function is not public, because it does not work as expected when called manually, | ||
|
@@ -647,6 +658,12 @@ impl<'a> AddProgress<'a> { | |
} | ||
} | ||
|
||
/// Options for an async reader for blobs that supports AsyncRead and AsyncSeek. | ||
/// | ||
/// Currently the only option is the hash of the blob to read. | ||
#[derive(Debug, Clone, Serialize, Deserialize)] | ||
pub struct ReaderOptions { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. You might think that this options object is a bit much, but who knows, maybe we want to add more options in the future. One option would be the "magic wait until download" option, e.g. you seek to somewhere where the blob isn't downloaded yet, it waits instead of failing. Or even, you seek to somewhere where the blob isn't downloaded yet, it triggers a download and waits instead of failing. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. maybe mark it as non_exhaustive in this case? |
||
/// Hash of the blob to read. | ||
pub hash: Hash, | ||
} | ||
|
||
/// An observe result. Awaiting this will return the current state. | ||
/// | ||
/// Calling [`ObserveProgress::stream`] will return a stream of updates, where | ||
|
@@ -856,7 +873,7 @@ impl ExportRangesProgress { | |
/// range of 0..100, you will get the entire first chunk, 0..1024. | ||
/// | ||
/// It is up to the caller to clip the ranges to the requested ranges. | ||
pub async fn stream(self) -> impl Stream<Item = ExportRangesItem> { | ||
pub fn stream(self) -> impl Stream<Item = ExportRangesItem> { | ||
Gen::new(|co| async move { | ||
let mut rx = match self.inner.await { | ||
Ok(rx) => rx, | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,328 @@ | ||
use std::{ | ||
io::{self, ErrorKind, SeekFrom}, | ||
pin::Pin, | ||
task::{Context, Poll}, | ||
}; | ||
|
||
use n0_future::StreamExt; | ||
|
||
use crate::api::{ | ||
blobs::{Blobs, ReaderOptions}, | ||
proto::ExportRangesItem, | ||
}; | ||
|
||
/// Reader over a single blob that implements `tokio::io::AsyncRead` and | ||
/// `tokio::io::AsyncSeek`. | ||
/// | ||
/// Each read is served by starting an `export_ranges` operation on `blobs` for the | ||
/// hash stored in `options`; `state` tracks the current position and any operation | ||
/// that is in flight. | ||
#[derive(Debug)] | ||
pub struct BlobReader { | ||
// handle used to start export operations | ||
blobs: Blobs, | ||
// options the reader was created with (the blob hash) | ||
options: ReaderOptions, | ||
// state machine shared by the read and seek implementations | ||
state: ReaderState, | ||
} | ||
|
||
/// Internal state machine of a `BlobReader`. | ||
/// | ||
/// The read and seek implementations move the state out with `std::mem::take`, | ||
/// which leaves the default variant (`Poisoned`) in place until a new state is | ||
/// stored back. | ||
#[derive(Default, derive_more::Debug)] | ||
enum ReaderState { | ||
/// No operation in flight; `position` is the current stream position. | ||
Idle { | ||
position: u64, | ||
}, | ||
/// `start_seek` has recorded a target `position` that `poll_complete` has not | ||
/// yet turned back into `Idle`. | ||
Seeking { | ||
position: u64, | ||
}, | ||
/// A read is in flight. `position` is the stream position at which the read | ||
/// started; `op` is the stream of export items being polled. | ||
Reading { | ||
position: u64, | ||
#[debug(skip)] | ||
op: n0_future::boxed::BoxStream<ExportRangesItem>, | ||
}, | ||
/// Placeholder left behind by `std::mem::take`. If it is observed at the start | ||
/// of a call, an earlier call returned without storing a state back. | ||
#[default] | ||
Poisoned, | ||
} | ||
|
||
impl BlobReader { | ||
rklaehn marked this conversation as resolved.
Show resolved
Hide resolved
|
||
/// Create a reader for the blob named in `options`, starting in the `Idle` | ||
/// state at position 0. | ||
pub fn new(blobs: Blobs, options: ReaderOptions) -> Self { | ||
rklaehn marked this conversation as resolved.
Show resolved
Hide resolved
|
||
Self { | ||
blobs, | ||
options, | ||
state: ReaderState::Idle { position: 0 }, | ||
} | ||
} | ||
} | ||
|
||
impl tokio::io::AsyncRead for BlobReader { | ||
/// Fill `buf` from the blob, starting at the current position. | ||
/// | ||
/// From `Idle`, an `export_ranges` operation covering `buf.remaining()` bytes is | ||
/// started and then polled; from `Reading`, the stored operation is resumed. | ||
/// Data items are copied into `buf` until the stream ends or would block. | ||
/// | ||
/// Returns an error if the export reports an error, if a data item does not start | ||
/// at the expected offset, if the position would overflow `u64`, or if the reader | ||
/// is in the `Seeking` or `Poisoned` state. | ||
fn poll_read( | ||
self: Pin<&mut Self>, | ||
cx: &mut Context<'_>, | ||
buf: &mut tokio::io::ReadBuf<'_>, | ||
) -> Poll<io::Result<()>> { | ||
let this = self.get_mut(); | ||
// running position for this call; it advances as data is copied into `buf`, | ||
// while the position stored in the state stays at the start of the read | ||
let mut position1 = None; | ||
loop { | ||
let guard = &mut this.state; | ||
// moving the state out leaves `Poisoned` behind; every arm that should | ||
// keep the reader usable has to store a state back into `guard` | ||
match std::mem::take(guard) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. fyi, I had this pattern show up being extremly slow in another project :'( with the rust compiler allocating over and over again, making reads/writes you can see how I fixed/worked around it here: https://github.com/rpgp/rpgp/pull/579/files#diff-70038cda95dcb4bc54877b1a3eb9f111ed226310dad30909b6d3cb88af6cd2f5 not saying this is currently slow, just warning you this can be very slow There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. That is unfortunate. I like this pattern. I think it should be fine as of now. The state is just 32 bytes, so it is very unlikely to matter compared with all the async stuff going on. I could rewrite it to just match on the state in place, but then there is always the chance that you get in some weird state instead of poisoned if you forget to handle an odd error. |
||
ReaderState::Idle { position } => { | ||
// todo: read until next page boundary instead of fixed size | ||
let len = buf.remaining() as u64; | ||
let end = position.checked_add(len).ok_or_else(|| { | ||
io::Error::new(ErrorKind::InvalidInput, "Position overflow when reading") | ||
})?; | ||
// NOTE(review): the `?` above returns with the state already taken, so an | ||
// overflow here leaves the reader `Poisoned` - confirm this is intended | ||
// start the export op for the entire size of the buffer, and convert to a stream | ||
let stream = this | ||
.blobs | ||
.export_ranges(this.options.hash, position..end) | ||
.stream(); | ||
position1 = Some(position); | ||
// no break: the next loop iteration polls the freshly created op | ||
*guard = ReaderState::Reading { | ||
position, | ||
op: Box::pin(stream), | ||
}; | ||
} | ||
ReaderState::Reading { position, mut op } => { | ||
// when resuming an op stored by an earlier call, start counting from | ||
// the position recorded in the state | ||
let position1 = position1.get_or_insert(position); | ||
match op.poll_next(cx) { | ||
// size items are ignored; store the state back and keep polling | ||
Poll::Ready(Some(ExportRangesItem::Size(_))) => { | ||
*guard = ReaderState::Reading { position, op }; | ||
} | ||
Poll::Ready(Some(ExportRangesItem::Data(data))) => { | ||
// data must arrive contiguously, starting at the running position | ||
if data.offset != *position1 { | ||
// NOTE(review): no state is stored back on this path, so the | ||
// reader stays `Poisoned` after this error | ||
break Poll::Ready(Err(io::Error::other( | ||
"Data offset does not match expected position", | ||
))); | ||
} | ||
// NOTE(review): tokio documents that `ReadBuf::put_slice` panics when | ||
// the slice is longer than the remaining capacity; presumably the | ||
// export never yields more than the requested range - TODO confirm | ||
buf.put_slice(&data.data); | ||
// update just local position1, not the position in the state. | ||
*position1 = | ||
position1 | ||
.checked_add(data.data.len() as u64) | ||
.ok_or_else(|| { | ||
io::Error::new(ErrorKind::InvalidInput, "Position overflow") | ||
})?; | ||
*guard = ReaderState::Reading { position, op }; | ||
} | ||
Poll::Ready(Some(ExportRangesItem::Error(err))) => { | ||
// failed read: go back to idle at the position the read started | ||
// from, so the stream position is not advanced | ||
*guard = ReaderState::Idle { position }; | ||
break Poll::Ready(Err(io::Error::other(format!( | ||
"Error reading data: {err}" | ||
)))); | ||
} | ||
Poll::Ready(None) => { | ||
// done with the stream, go back in idle. | ||
*guard = ReaderState::Idle { | ||
position: *position1, | ||
}; | ||
break Poll::Ready(Ok(())); | ||
rklaehn marked this conversation as resolved.
Show resolved
Hide resolved
|
||
} | ||
Poll::Pending => { | ||
break if position != *position1 { | ||
rklaehn marked this conversation as resolved.
Show resolved
Hide resolved
|
||
// we read some data so we need to abort the op. | ||
// | ||
// we can't be sure we won't be called with the same buf size next time. | ||
*guard = ReaderState::Idle { | ||
position: *position1, | ||
}; | ||
Poll::Ready(Ok(())) | ||
} else { | ||
// nothing was read yet, we remain in the reading state | ||
// | ||
// we make an assumption here that the next call will be with the same buf size. | ||
*guard = ReaderState::Reading { | ||
position: *position1, | ||
op, | ||
}; | ||
Poll::Pending | ||
}; | ||
} | ||
} | ||
} | ||
state @ ReaderState::Seeking { .. } => { | ||
// should I try to recover from this or just keep it poisoned? | ||
// (as written, the seeking state is put back unchanged) | ||
this.state = state; | ||
break Poll::Ready(Err(io::Error::other("Can't read while seeking"))); | ||
} | ||
ReaderState::Poisoned => { | ||
break Poll::Ready(Err(io::Error::other("Reader is poisoned"))); | ||
} | ||
}; | ||
} | ||
} | ||
} | ||
|
||
impl tokio::io::AsyncSeek for BlobReader { | ||
/// Record the target position of a seek; the seek is finished by `poll_complete`. | ||
/// | ||
/// Only succeeds from the `Idle` state. `SeekFrom::Start` and `SeekFrom::Current` | ||
/// are supported; `SeekFrom::End` returns an `InvalidInput` error, as does a | ||
/// relative seek whose result does not fit in a `u64`. | ||
/// | ||
/// NOTE(review): the state is moved out before matching and only the success path | ||
/// stores a new one, so every error returned from this function leaves the reader | ||
/// `Poisoned` - confirm that is intended for the recoverable `InvalidInput` cases. | ||
fn start_seek( | ||
self: std::pin::Pin<&mut Self>, | ||
seek_from: tokio::io::SeekFrom, | ||
) -> io::Result<()> { | ||
let this = self.get_mut(); | ||
let guard = &mut this.state; | ||
match std::mem::take(guard) { | ||
ReaderState::Idle { position } => { | ||
let position1 = match seek_from { | ||
SeekFrom::Start(pos) => pos, | ||
SeekFrom::Current(offset) => { | ||
position.checked_add_signed(offset).ok_or_else(|| { | ||
io::Error::new( | ||
ErrorKind::InvalidInput, | ||
"Position overflow when seeking", | ||
) | ||
})? | ||
} | ||
SeekFrom::End(_offset) => { | ||
// todo: support seeking from end if we know the size | ||
// NOTE(review): the trailing `?` on this `return Err(..)` is redundant | ||
return Err(io::Error::new( | ||
ErrorKind::InvalidInput, | ||
"Seeking from end is not supported yet", | ||
))?; | ||
} | ||
}; | ||
*guard = ReaderState::Seeking { | ||
position: position1, | ||
}; | ||
Ok(()) | ||
} | ||
// NOTE(review): the two arms below drop the taken state (including any in-flight | ||
// read op) instead of storing it back, unlike `poll_complete` - confirm | ||
ReaderState::Reading { .. } => Err(io::Error::other("Can't seek while reading")), | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. this is a bit unfortunate, not necessary for the PR but I have had instances in the past where I wanted to say read the first 100 bytes, seek 200 bytes, and then continue reading There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. You can do that, as long as you don't drop futures. read(...).await? is fine. the only thing that would break it would be something like this: read(...).timeout(Duration::from_millis(1)).await?; // read future is dropped. I mean, I would love to support this, but the docs is uniquely unhelpful about what the behaviour should be. "The position returned by calling this method can only be relied on right after start_seek. If you have changed the position by e.g. reading or writing since calling start_seek, then it is unspecified whether the returned position takes that position change into account. Similarly, if start_seek has never been called, then it is unspecified whether poll_complete returns the actual position or some other placeholder value (such as 0)." I guess you could make it so a seek from an absolute position (start and later end) recovers the state you get into when a read future is dropped, since at this point the internal state is deterministic again... 🤷 There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. ah right, I missunderstood
uff... |
||
ReaderState::Seeking { .. } => Err(io::Error::other("Already seeking")), | ||
ReaderState::Poisoned => Err(io::Error::other("Reader is poisoned")), | ||
} | ||
} | ||
|
||
/// Finish a pending seek and report the stream position. | ||
/// | ||
/// Always returns `Poll::Ready`. From `Seeking`, the reader becomes `Idle` at the | ||
/// target position; from `Idle`, the current position is returned unchanged. In the | ||
/// `Reading` state the state is kept and an error is returned; a `Poisoned` reader | ||
/// returns an error. | ||
fn poll_complete(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<u64>> { | ||
let this = self.get_mut(); | ||
let guard = &mut this.state; | ||
Poll::Ready(match std::mem::take(guard) { | ||
ReaderState::Seeking { position } => { | ||
*guard = ReaderState::Idle { position }; | ||
Ok(position) | ||
} | ||
ReaderState::Idle { position } => { | ||
// seek calls poll_complete just in case, to finish a pending seek operation | ||
// before the next seek operation. So it is poll_complete/start_seek/poll_complete | ||
*guard = ReaderState::Idle { position }; | ||
Ok(position) | ||
} | ||
state @ ReaderState::Reading { .. } => { | ||
// should I try to recover from this or just keep it poisoned? | ||
// (as written, the reading state is put back unchanged) | ||
*guard = state; | ||
Err(io::Error::other("Can't seek while reading")) | ||
} | ||
ReaderState::Poisoned => Err(io::Error::other("Reader is poisoned")), | ||
}) | ||
} | ||
} | ||
|
||
#[cfg(test)] | ||
mod tests { | ||
use bao_tree::ChunkRanges; | ||
use testresult::TestResult; | ||
use tokio::io::{AsyncReadExt, AsyncSeekExt}; | ||
|
||
use super::*; | ||
use crate::{ | ||
store::{ | ||
fs::{ | ||
tests::{create_n0_bao, test_data, INTERESTING_SIZES}, | ||
FsStore, | ||
}, | ||
mem::MemStore, | ||
}, | ||
util::ChunkRangesExt, | ||
}; | ||
|
||
/// Smoke test for fully available blobs. | ||
/// | ||
/// For every size in `INTERESTING_SIZES`, adds test data to `blobs`, then checks | ||
/// that reading to the end returns the data and leaves the position at the data | ||
/// length, both from the start and after seeking to the midpoint. | ||
async fn reader_smoke(blobs: &Blobs) -> TestResult<()> { | ||
for size in INTERESTING_SIZES { | ||
let data = test_data(size); | ||
let tag = blobs.add_bytes(data.clone()).await?; | ||
// read all | ||
{ | ||
let mut reader = blobs.reader(tag.hash); | ||
let mut buf = Vec::new(); | ||
reader.read_to_end(&mut buf).await?; | ||
assert_eq!(buf, data); | ||
let pos = reader.stream_position().await?; | ||
assert_eq!(pos, data.len() as u64); | ||
} | ||
// seek to mid and read all | ||
{ | ||
let mut reader = blobs.reader(tag.hash); | ||
let mid = size / 2; | ||
reader.seek(SeekFrom::Start(mid as u64)).await?; | ||
let mut buf = Vec::new(); | ||
reader.read_to_end(&mut buf).await?; | ||
assert_eq!(buf, data[mid..].to_vec()); | ||
let pos = reader.stream_position().await?; | ||
assert_eq!(pos, data.len() as u64); | ||
} | ||
} | ||
Ok(()) | ||
} | ||
|
||
/// Test reading blobs of which only chunk 0 has been imported. | ||
/// | ||
/// For every size in `INTERESTING_SIZES`, imports the bao encoding of the first | ||
/// chunk only, checks that the first `min(size, 1024)` bytes can be read, and for | ||
/// larger blobs checks that reads touching the missing part fail. | ||
async fn reader_partial(blobs: &Blobs) -> TestResult<()> { | ||
for size in INTERESTING_SIZES { | ||
let data = test_data(size); | ||
let ranges = ChunkRanges::chunk(0); | ||
let (hash, bao) = create_n0_bao(&data, &ranges)?; | ||
println!("importing {} bytes", bao.len()); | ||
blobs.import_bao_bytes(hash, ranges.clone(), bao).await?; | ||
// read the first chunk or the entire blob, whatever is smaller | ||
// this should work! | ||
{ | ||
let mut reader = blobs.reader(hash); | ||
let valid = size.min(1024); | ||
let mut buf = vec![0u8; valid]; | ||
reader.read_exact(&mut buf).await?; | ||
assert_eq!(buf, data[..valid]); | ||
let pos = reader.stream_position().await?; | ||
assert_eq!(pos, valid as u64); | ||
} | ||
if size > 1024 { | ||
// read the part we don't have - should immediately return an error | ||
{ | ||
let mut reader = blobs.reader(hash); | ||
let mut rest = vec![0u8; size - 1024]; | ||
reader.seek(SeekFrom::Start(1024)).await?; | ||
let res = reader.read_exact(&mut rest).await; | ||
assert!(res.is_err()); | ||
} | ||
// read crossing the end of the imported range - should return an error despite | ||
// the first bytes being valid. | ||
// A read that fails should not update the stream position. | ||
{ | ||
let mut reader = blobs.reader(hash); | ||
let mut buf = vec![0u8; size]; | ||
let res = reader.read(&mut buf).await; | ||
assert!(res.is_err()); | ||
let pos = reader.stream_position().await?; | ||
assert_eq!(pos, 0); | ||
} | ||
} | ||
} | ||
Ok(()) | ||
} | ||
|
||
/// Runs `reader_partial` against an `FsStore` loaded from a fresh temporary directory. | ||
#[tokio::test] | ||
async fn reader_partial_fs() -> TestResult<()> { | ||
let testdir = tempfile::tempdir()?; | ||
let store = FsStore::load(testdir.path().to_owned()).await?; | ||
// reader_smoke_raw(store.blobs()).await?; | ||
rklaehn marked this conversation as resolved.
Show resolved
Hide resolved
|
||
reader_partial(store.blobs()).await?; | ||
Ok(()) | ||
} | ||
|
||
/// Runs `reader_partial` against a new `MemStore`. | ||
#[tokio::test] | ||
async fn reader_partial_memory() -> TestResult<()> { | ||
let store = MemStore::new(); | ||
reader_partial(store.blobs()).await?; | ||
Ok(()) | ||
} | ||
|
||
/// Runs `reader_smoke` against an `FsStore` loaded from a fresh temporary directory. | ||
#[tokio::test] | ||
async fn reader_smoke_fs() -> TestResult<()> { | ||
let testdir = tempfile::tempdir()?; | ||
let store = FsStore::load(testdir.path().to_owned()).await?; | ||
// reader_smoke_raw(store.blobs()).await?; | ||
reader_smoke(store.blobs()).await?; | ||
Ok(()) | ||
} | ||
|
||
/// Runs `reader_smoke` against a new `MemStore`. | ||
#[tokio::test] | ||
async fn reader_smoke_memory() -> TestResult<()> { | ||
let store = MemStore::new(); | ||
reader_smoke(store.blobs()).await?; | ||
Ok(()) | ||
} | ||
} |
Uh oh!
There was an error while loading. Please reload this page.