Skip to content

Commit 6925dd6

Browse files
committed
Fix some bugs
1 parent 4bfa953 commit 6925dd6

File tree

3 files changed

+243
-87
lines changed

3 files changed

+243
-87
lines changed

src/api/blobs.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -873,7 +873,7 @@ impl ExportRangesProgress {
873873
/// range of 0..100, you will get the entire first chunk, 0..1024.
874874
///
875875
/// It is up to the caller to clip the ranges to the requested ranges.
876-
pub async fn stream(self) -> impl Stream<Item = ExportRangesItem> {
876+
pub fn stream(self) -> impl Stream<Item = ExportRangesItem> {
877877
Gen::new(|co| async move {
878878
let mut rx = match self.inner.await {
879879
Ok(rx) => rx,

src/api/blobs/reader.rs

Lines changed: 238 additions & 80 deletions
Original file line numberDiff line numberDiff line change
@@ -1,24 +1,26 @@
11
use std::{
22
io::{self, ErrorKind, SeekFrom},
33
ops::DerefMut,
4+
pin::Pin,
45
sync::{Arc, Mutex},
5-
task::Poll,
6+
task::{Context, Poll},
67
};
78

8-
use n0_future::FutureExt;
9+
use n0_future::StreamExt;
910

1011
use crate::api::{
1112
blobs::{Blobs, ReaderOptions},
12-
RequestResult,
13+
proto::ExportRangesItem,
1314
};
1415

16+
#[derive(Debug)]
1517
pub struct Reader {
1618
blobs: Blobs,
1719
options: ReaderOptions,
1820
state: Arc<Mutex<ReaderState>>,
1921
}
2022

21-
#[derive(Default)]
23+
#[derive(Default, derive_more::Debug)]
2224
enum ReaderState {
2325
Idle {
2426
position: u64,
@@ -28,7 +30,8 @@ enum ReaderState {
2830
},
2931
Reading {
3032
position: u64,
31-
op: n0_future::boxed::BoxFuture<RequestResult<Vec<u8>>>,
33+
#[debug(skip)]
34+
op: n0_future::boxed::BoxStream<ExportRangesItem>,
3235
},
3336
#[default]
3437
Poisoned,
@@ -46,79 +49,112 @@ impl Reader {
4649

4750
impl tokio::io::AsyncRead for Reader {
4851
fn poll_read(
49-
self: std::pin::Pin<&mut Self>,
50-
cx: &mut std::task::Context<'_>,
52+
self: Pin<&mut Self>,
53+
cx: &mut Context<'_>,
5154
buf: &mut tokio::io::ReadBuf<'_>,
52-
) -> std::task::Poll<std::io::Result<()>> {
55+
) -> Poll<io::Result<()>> {
5356
let this = self.get_mut();
54-
match std::mem::take(this.state.lock().unwrap().deref_mut()) {
55-
ReaderState::Idle { position } => {
56-
// todo: read until next page boundary instead of fixed size
57-
let len = buf.remaining().min(1024 * 16);
58-
let end = position.checked_add(len as u64).ok_or_else(|| {
59-
io::Error::new(ErrorKind::InvalidInput, "Position overflow when reading")
60-
})?;
61-
let hash = this.options.hash;
62-
let blobs = this.blobs.clone();
63-
let ranges = position..end;
64-
let op = async move { blobs.export_ranges(hash, ranges).concatenate().await };
65-
*this.state.lock().unwrap() = ReaderState::Reading {
66-
position,
67-
op: Box::pin(op),
68-
};
69-
}
70-
ReaderState::Reading { position, mut op } => {
71-
match op.poll(cx) {
72-
Poll::Ready(Ok(data)) => {
73-
let len = data.len();
74-
if len > buf.remaining() {
75-
return Poll::Ready(Err(io::Error::new(
76-
ErrorKind::UnexpectedEof,
77-
"Read more data than buffer can hold",
57+
let mut position1 = None;
58+
loop {
59+
let mut guard = this.state.lock().unwrap();
60+
match std::mem::take(guard.deref_mut()) {
61+
ReaderState::Idle { position } => {
62+
// todo: read until next page boundary instead of fixed size
63+
let len = buf.remaining() as u64;
64+
let end = position.checked_add(len).ok_or_else(|| {
65+
io::Error::new(ErrorKind::InvalidInput, "Position overflow when reading")
66+
})?;
67+
// start the export op for the entire size of the buffer, and convert to a stream
68+
let stream = this
69+
.blobs
70+
.export_ranges(this.options.hash, position..end)
71+
.stream();
72+
position1 = Some(position);
73+
*guard = ReaderState::Reading {
74+
position,
75+
op: Box::pin(stream),
76+
};
77+
}
78+
ReaderState::Reading { position, mut op } => {
79+
let position1 = position1.get_or_insert(position);
80+
match op.poll_next(cx) {
81+
Poll::Ready(Some(ExportRangesItem::Data(data))) => {
82+
if data.offset != *position1 {
83+
break Poll::Ready(Err(io::Error::other(
84+
"Data offset does not match expected position",
85+
)));
86+
}
87+
buf.put_slice(&data.data);
88+
// update just local position1, not the position in the state.
89+
*position1 =
90+
position1
91+
.checked_add(data.data.len() as u64)
92+
.ok_or_else(|| {
93+
io::Error::new(ErrorKind::InvalidInput, "Position overflow")
94+
})?;
95+
*guard = ReaderState::Reading { position, op };
96+
}
97+
Poll::Ready(Some(ExportRangesItem::Error(err))) => {
98+
*guard = ReaderState::Idle { position };
99+
break Poll::Ready(Err(io::Error::other(
100+
format!("Error reading data: {err}"),
78101
)));
79102
}
80-
buf.put_slice(&data);
81-
let position = position + len as u64;
82-
*this.state.lock().unwrap() = ReaderState::Idle { position };
83-
return Poll::Ready(Ok(()));
84-
}
85-
Poll::Ready(Err(e)) => {
86-
*this.state.lock().unwrap() = ReaderState::Idle { position };
87-
let e = io::Error::new(ErrorKind::Other, e.to_string());
88-
return Poll::Ready(Err(e));
89-
}
90-
Poll::Pending => {
91-
// Put back the state
92-
*this.state.lock().unwrap() = ReaderState::Reading {
93-
position,
94-
op: Box::pin(op),
95-
};
96-
return Poll::Pending;
103+
Poll::Ready(Some(ExportRangesItem::Size(_size))) => {
104+
// put back the state and continue reading
105+
*guard = ReaderState::Reading { position, op };
106+
}
107+
Poll::Ready(None) => {
108+
// done with the stream, go back to idle.
109+
*guard = ReaderState::Idle {
110+
position: *position1,
111+
};
112+
break Poll::Ready(Ok(()));
113+
}
114+
Poll::Pending => {
115+
break if position != *position1 {
116+
// we read some data so we need to abort the op.
117+
//
118+
// we can't be sure we will be called with the same buf size next time.
119+
*guard = ReaderState::Idle {
120+
position: *position1,
121+
};
122+
Poll::Ready(Ok(()))
123+
} else {
124+
// nothing was read yet, we remain in the reading state
125+
//
126+
// we make an assumption here that the next call will be with the same buf size.
127+
*guard = ReaderState::Reading {
128+
position: *position1,
129+
op,
130+
};
131+
Poll::Pending
132+
};
133+
}
97134
}
98135
}
99-
}
100-
state @ ReaderState::Seeking { .. } => {
101-
*this.state.lock().unwrap() = state;
102-
return Poll::Ready(Err(io::Error::new(
103-
ErrorKind::Other,
104-
"Can't read while seeking",
105-
)));
106-
}
107-
ReaderState::Poisoned => {
108-
return Poll::Ready(Err(io::Error::other("Reader is poisoned")));
109-
}
110-
};
111-
todo!()
136+
state @ ReaderState::Seeking { .. } => {
137+
*this.state.lock().unwrap() = state;
138+
break Poll::Ready(Err(io::Error::other(
139+
"Can't read while seeking",
140+
)));
141+
}
142+
ReaderState::Poisoned => {
143+
break Poll::Ready(Err(io::Error::other("Reader is poisoned")));
144+
}
145+
};
146+
}
112147
}
113148
}
114149

115150
impl tokio::io::AsyncSeek for Reader {
116151
fn start_seek(
117152
self: std::pin::Pin<&mut Self>,
118153
seek_from: tokio::io::SeekFrom,
119-
) -> std::io::Result<()> {
154+
) -> io::Result<()> {
120155
let this = self.get_mut();
121-
match std::mem::take(this.state.lock().unwrap().deref_mut()) {
156+
let mut guard = this.state.lock().unwrap();
157+
match std::mem::take(guard.deref_mut()) {
122158
ReaderState::Idle { position } => {
123159
let position1 = match seek_from {
124160
SeekFrom::Start(pos) => pos,
@@ -138,7 +174,7 @@ impl tokio::io::AsyncSeek for Reader {
138174
))?;
139175
}
140176
};
141-
*this.state.lock().unwrap() = ReaderState::Seeking {
177+
*guard = ReaderState::Seeking {
142178
position: position1,
143179
};
144180
Ok(())
@@ -149,22 +185,144 @@ impl tokio::io::AsyncSeek for Reader {
149185
}
150186
}
151187

152-
fn poll_complete(
153-
self: std::pin::Pin<&mut Self>,
154-
_cx: &mut std::task::Context<'_>,
155-
) -> std::task::Poll<std::io::Result<u64>> {
188+
fn poll_complete(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<u64>> {
156189
let this = self.get_mut();
157-
Poll::Ready(
158-
match std::mem::take(this.state.lock().unwrap().deref_mut()) {
159-
ReaderState::Seeking { position } => {
160-
// we only put the state back if we are in the right state
161-
*this.state.lock().unwrap() = ReaderState::Idle { position };
162-
Ok(position)
163-
}
164-
ReaderState::Idle { .. } => Err(io::Error::other("No seek operation in progress")),
165-
ReaderState::Reading { .. } => Err(io::Error::other("Can't seek while reading")),
166-
ReaderState::Poisoned => Err(io::Error::other("Reader is poisoned")),
190+
let mut guard = this.state.lock().unwrap();
191+
Poll::Ready(match std::mem::take(guard.deref_mut()) {
192+
ReaderState::Seeking { position } => {
193+
*guard = ReaderState::Idle { position };
194+
Ok(position)
195+
}
196+
ReaderState::Idle { position } => {
197+
// `seek` calls poll_complete first, just in case, to finish a pending seek operation
198+
// before starting the next one. So the call sequence is poll_complete/start_seek/poll_complete
199+
*guard = ReaderState::Idle { position };
200+
Ok(position)
201+
}
202+
ReaderState::Reading { .. } => Err(io::Error::other("Can't seek while reading")),
203+
ReaderState::Poisoned => Err(io::Error::other("Reader is poisoned")),
204+
})
205+
}
206+
}
207+
208+
#[cfg(test)]
209+
mod tests {
210+
use bao_tree::ChunkRanges;
211+
use testresult::TestResult;
212+
use tokio::io::{AsyncReadExt, AsyncSeekExt};
213+
214+
use super::*;
215+
use crate::{
216+
store::{
217+
fs::{
218+
tests::{create_n0_bao, test_data, INTERESTING_SIZES},
219+
FsStore,
167220
},
168-
)
221+
mem::MemStore,
222+
},
223+
util::ChunkRangesExt,
224+
};
225+
226+
async fn reader_smoke(blobs: &Blobs) -> TestResult<()> {
227+
for size in INTERESTING_SIZES {
228+
let data = test_data(size);
229+
let tag = blobs.add_bytes(data.clone()).await?;
230+
// read all
231+
{
232+
let mut reader = blobs.reader(tag.hash);
233+
let mut buf = Vec::new();
234+
reader.read_to_end(&mut buf).await?;
235+
assert_eq!(buf, data);
236+
let pos = reader.stream_position().await?;
237+
assert_eq!(pos, data.len() as u64);
238+
}
239+
// seek to mid and read all
240+
{
241+
let mut reader = blobs.reader(tag.hash);
242+
let mid = size / 2;
243+
reader.seek(SeekFrom::Start(mid as u64)).await?;
244+
let mut buf = Vec::new();
245+
reader.read_to_end(&mut buf).await?;
246+
assert_eq!(buf, data[mid..].to_vec());
247+
let pos = reader.stream_position().await?;
248+
assert_eq!(pos, data.len() as u64);
249+
}
250+
}
251+
Ok(())
252+
}
253+
254+
async fn reader_partial(blobs: &Blobs) -> TestResult<()> {
255+
for size in INTERESTING_SIZES {
256+
let data = test_data(size);
257+
let ranges = ChunkRanges::chunk(0);
258+
let (hash, bao) = create_n0_bao(&data, &ranges)?;
259+
println!("importing {} bytes", bao.len());
260+
blobs.import_bao_bytes(hash, ranges.clone(), bao).await?;
261+
// read the first chunk or the entire blob, whichever is smaller
262+
// this should work!
263+
{
264+
let mut reader = blobs.reader(hash);
265+
let valid = size.min(1024);
266+
let mut buf = vec![0u8; valid];
267+
reader.read_exact(&mut buf).await?;
268+
assert_eq!(buf, data[..valid]);
269+
let pos = reader.stream_position().await?;
270+
assert_eq!(pos, valid as u64);
271+
}
272+
if size > 1024 {
273+
// read the part we don't have - should immediately return an error
274+
{
275+
let mut reader = blobs.reader(hash);
276+
let mut rest = vec![0u8; size - 1024];
277+
reader.seek(SeekFrom::Start(1024)).await?;
278+
let res = reader.read_exact(&mut rest).await;
279+
assert!(res.is_err());
280+
}
281+
// read crossing the end of the blob - should return an error despite
282+
// the first bytes being valid.
283+
// A read that fails should not update the stream position.
284+
{
285+
let mut reader = blobs.reader(hash);
286+
let mut buf = vec![0u8; size];
287+
let res = reader.read(&mut buf).await;
288+
assert!(res.is_err());
289+
let pos = reader.stream_position().await?;
290+
assert_eq!(pos, 0);
291+
}
292+
}
293+
}
294+
Ok(())
295+
}
296+
297+
#[tokio::test]
298+
async fn reader_partial_fs() -> TestResult<()> {
299+
let testdir = tempfile::tempdir()?;
300+
let store = FsStore::load(testdir.path().to_owned()).await?;
301+
// reader_smoke_raw(store.blobs()).await?;
302+
reader_partial(store.blobs()).await?;
303+
Ok(())
304+
}
305+
306+
#[tokio::test]
307+
async fn reader_partial_memory() -> TestResult<()> {
308+
let store = MemStore::new();
309+
reader_partial(store.blobs()).await?;
310+
Ok(())
311+
}
312+
313+
#[tokio::test]
314+
async fn reader_smoke_fs() -> TestResult<()> {
315+
let testdir = tempfile::tempdir()?;
316+
let store = FsStore::load(testdir.path().to_owned()).await?;
317+
// reader_smoke_raw(store.blobs()).await?;
318+
reader_smoke(store.blobs()).await?;
319+
Ok(())
320+
}
321+
322+
#[tokio::test]
323+
async fn reader_smoke_memory() -> TestResult<()> {
324+
let store = MemStore::new();
325+
reader_smoke(store.blobs()).await?;
326+
Ok(())
169327
}
170328
}

0 commit comments

Comments
 (0)