Skip to content

Commit 5a107b5

Browse files
committed
refactor: remove file-handle from object store GET operations
Closes #18.
1 parent 0c3152c commit 5a107b5

File tree

7 files changed: +79 additions, -167 deletions

src/chunked.rs

Lines changed: 38 additions & 55 deletions
Original file line number | Diff line number | Diff line change
@@ -28,8 +28,8 @@ use futures::StreamExt;
2828

2929
use crate::path::Path;
3030
use crate::{
31-
GetOptions, GetResult, GetResultPayload, ListResult, MultipartUpload, ObjectMeta, ObjectStore,
32-
PutMultipartOpts, PutOptions, PutResult,
31+
GetOptions, GetResult, ListResult, MultipartUpload, ObjectMeta, ObjectStore, PutMultipartOpts,
32+
PutOptions, PutResult,
3333
};
3434
use crate::{PutPayload, Result};
3535

@@ -85,57 +85,43 @@ impl ObjectStore for ChunkedStore {
8585

8686
async fn get_opts(&self, location: &Path, options: GetOptions) -> Result<GetResult> {
8787
let r = self.inner.get_opts(location, options).await?;
88-
let stream = match r.payload {
89-
#[cfg(all(feature = "fs", not(target_arch = "wasm32")))]
90-
GetResultPayload::File(file, path) => {
91-
crate::local::chunked_stream(file, path, r.range.clone(), self.chunk_size)
92-
}
93-
GetResultPayload::Stream(stream) => {
94-
let buffer = BytesMut::new();
95-
futures::stream::unfold(
96-
(stream, buffer, false, self.chunk_size),
97-
|(mut stream, mut buffer, mut exhausted, chunk_size)| async move {
98-
// Keep accumulating bytes until we reach capacity as long as
99-
// the stream can provide them:
100-
if exhausted {
101-
return None;
88+
let buffer = BytesMut::new();
89+
let payload = futures::stream::unfold(
90+
(r.payload, buffer, false, self.chunk_size),
91+
|(mut stream, mut buffer, mut exhausted, chunk_size)| async move {
92+
// Keep accumulating bytes until we reach capacity as long as
93+
// the stream can provide them:
94+
if exhausted {
95+
return None;
96+
}
97+
while buffer.len() < chunk_size {
98+
match stream.next().await {
99+
None => {
100+
exhausted = true;
101+
let slice = buffer.split_off(0).freeze();
102+
return Some((Ok(slice), (stream, buffer, exhausted, chunk_size)));
102103
}
103-
while buffer.len() < chunk_size {
104-
match stream.next().await {
105-
None => {
106-
exhausted = true;
107-
let slice = buffer.split_off(0).freeze();
108-
return Some((
109-
Ok(slice),
110-
(stream, buffer, exhausted, chunk_size),
111-
));
112-
}
113-
Some(Ok(bytes)) => {
114-
buffer.put(bytes);
115-
}
116-
Some(Err(e)) => {
117-
return Some((
118-
Err(crate::Error::Generic {
119-
store: "ChunkedStore",
120-
source: Box::new(e),
121-
}),
122-
(stream, buffer, exhausted, chunk_size),
123-
))
124-
}
125-
};
104+
Some(Ok(bytes)) => {
105+
buffer.put(bytes);
126106
}
127-
// Return the chunked values as the next value in the stream
128-
let slice = buffer.split_to(chunk_size).freeze();
129-
Some((Ok(slice), (stream, buffer, exhausted, chunk_size)))
130-
},
131-
)
132-
.boxed()
133-
}
134-
};
135-
Ok(GetResult {
136-
payload: GetResultPayload::Stream(stream),
137-
..r
138-
})
107+
Some(Err(e)) => {
108+
return Some((
109+
Err(crate::Error::Generic {
110+
store: "ChunkedStore",
111+
source: Box::new(e),
112+
}),
113+
(stream, buffer, exhausted, chunk_size),
114+
))
115+
}
116+
};
117+
}
118+
// Return the chunked values as the next value in the stream
119+
let slice = buffer.split_to(chunk_size).freeze();
120+
Some((Ok(slice), (stream, buffer, exhausted, chunk_size)))
121+
},
122+
)
123+
.boxed();
124+
Ok(GetResult { payload, ..r })
139125
}
140126

141127
async fn get_range(&self, location: &Path, range: Range<u64>) -> Result<Bytes> {
@@ -196,10 +182,7 @@ mod tests {
196182

197183
for chunk_size in [10, 20, 31] {
198184
let store = ChunkedStore::new(Arc::clone(&store), chunk_size);
199-
let mut s = match store.get(&location).await.unwrap().payload {
200-
GetResultPayload::Stream(s) => s,
201-
_ => unreachable!(),
202-
};
185+
let mut s = store.get(&location).await.unwrap().payload;
203186

204187
let mut remaining = 1001;
205188
while let Some(next) = s.next().await {

src/client/get.rs

Lines changed: 3 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -20,7 +20,7 @@ use std::ops::Range;
2020
use crate::client::header::{header_meta, HeaderConfig};
2121
use crate::client::HttpResponse;
2222
use crate::path::Path;
23-
use crate::{Attribute, Attributes, GetOptions, GetRange, GetResult, GetResultPayload, Result};
23+
use crate::{Attribute, Attributes, GetOptions, GetRange, GetResult, Result};
2424
use async_trait::async_trait;
2525
use futures::{StreamExt, TryStreamExt};
2626
use http::header::{
@@ -240,7 +240,7 @@ fn get_result<T: GetClient>(
240240
}
241241
}
242242

243-
let stream = response
243+
let payload = response
244244
.into_body()
245245
.bytes_stream()
246246
.map_err(|source| crate::Error::Generic {
@@ -253,7 +253,7 @@ fn get_result<T: GetClient>(
253253
range,
254254
meta,
255255
attributes,
256-
payload: GetResultPayload::Stream(stream),
256+
payload,
257257
})
258258
}
259259

src/lib.rs

Lines changed: 19 additions & 73 deletions
Original file line numberDiff line numberDiff line change
@@ -111,6 +111,8 @@
111111
//!
112112
//! [`BufReader`]: buffered::BufReader
113113
//! [`BufWriter`]: buffered::BufWriter
114+
//! [`Read`]: std::io::Read
115+
//! [`Seek`]: std::io::Seek
114116
//!
115117
//! # Adapters
116118
//!
@@ -575,8 +577,6 @@ use bytes::Bytes;
575577
use chrono::{DateTime, Utc};
576578
use futures::{stream::BoxStream, StreamExt, TryStreamExt};
577579
use std::fmt::{Debug, Formatter};
578-
#[cfg(all(feature = "fs", not(target_arch = "wasm32")))]
579-
use std::io::{Read, Seek, SeekFrom};
580580
use std::ops::Range;
581581
use std::sync::Arc;
582582

@@ -1035,10 +1035,9 @@ impl GetOptions {
10351035
}
10361036

10371037
/// Result for a get request
1038-
#[derive(Debug)]
10391038
pub struct GetResult {
1040-
/// The [`GetResultPayload`]
1041-
pub payload: GetResultPayload,
1039+
/// The payload.
1040+
pub payload: BoxStream<'static, Result<Bytes>>,
10421041
/// The [`ObjectMeta`] for this object
10431042
pub meta: ObjectMeta,
10441043
/// The range of bytes returned by this request
@@ -1049,82 +1048,29 @@ pub struct GetResult {
10491048
pub attributes: Attributes,
10501049
}
10511050

1052-
/// The kind of a [`GetResult`]
1053-
///
1054-
/// This special cases the case of a local file, as some systems may
1055-
/// be able to optimise the case of a file already present on local disk
1056-
pub enum GetResultPayload {
1057-
/// The file, path
1058-
#[cfg(all(feature = "fs", not(target_arch = "wasm32")))]
1059-
File(std::fs::File, std::path::PathBuf),
1060-
/// An opaque stream of bytes
1061-
Stream(BoxStream<'static, Result<Bytes>>),
1062-
}
1063-
1064-
impl Debug for GetResultPayload {
1051+
impl std::fmt::Debug for GetResult {
10651052
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1066-
match self {
1067-
#[cfg(all(feature = "fs", not(target_arch = "wasm32")))]
1068-
Self::File(_, _) => write!(f, "GetResultPayload(File)"),
1069-
Self::Stream(_) => write!(f, "GetResultPayload(Stream)"),
1070-
}
1053+
let Self {
1054+
payload: _,
1055+
meta,
1056+
range,
1057+
attributes,
1058+
} = self;
1059+
1060+
f.debug_struct("GetResult")
1061+
.field("payload", &"<STREAM>")
1062+
.field("meta", meta)
1063+
.field("range", range)
1064+
.field("attributes", attributes)
1065+
.finish()
10711066
}
10721067
}
10731068

10741069
impl GetResult {
10751070
/// Collects the data into a [`Bytes`]
10761071
pub async fn bytes(self) -> Result<Bytes> {
10771072
let len = self.range.end - self.range.start;
1078-
match self.payload {
1079-
#[cfg(all(feature = "fs", not(target_arch = "wasm32")))]
1080-
GetResultPayload::File(mut file, path) => {
1081-
maybe_spawn_blocking(move || {
1082-
file.seek(SeekFrom::Start(self.range.start as _))
1083-
.map_err(|source| local::Error::Seek {
1084-
source,
1085-
path: path.clone(),
1086-
})?;
1087-
1088-
let mut buffer = if let Ok(len) = len.try_into() {
1089-
Vec::with_capacity(len)
1090-
} else {
1091-
Vec::new()
1092-
};
1093-
file.take(len as _)
1094-
.read_to_end(&mut buffer)
1095-
.map_err(|source| local::Error::UnableToReadBytes { source, path })?;
1096-
1097-
Ok(buffer.into())
1098-
})
1099-
.await
1100-
}
1101-
GetResultPayload::Stream(s) => collect_bytes(s, Some(len)).await,
1102-
}
1103-
}
1104-
1105-
/// Converts this into a byte stream
1106-
///
1107-
/// If the `self.kind` is [`GetResultPayload::File`] will perform chunked reads of the file,
1108-
/// otherwise will return the [`GetResultPayload::Stream`].
1109-
///
1110-
/// # Tokio Compatibility
1111-
///
1112-
/// Tokio discourages performing blocking IO on a tokio worker thread, however,
1113-
/// no major operating systems have stable async file APIs. Therefore if called from
1114-
/// a tokio context, this will use [`tokio::runtime::Handle::spawn_blocking`] to dispatch
1115-
/// IO to a blocking thread pool, much like `tokio::fs` does under-the-hood.
1116-
///
1117-
/// If not called from a tokio context, this will perform IO on the current thread with
1118-
/// no additional complexity or overheads
1119-
pub fn into_stream(self) -> BoxStream<'static, Result<Bytes>> {
1120-
match self.payload {
1121-
#[cfg(all(feature = "fs", not(target_arch = "wasm32")))]
1122-
GetResultPayload::File(file, path) => {
1123-
const CHUNK_SIZE: usize = 8 * 1024;
1124-
local::chunked_stream(file, path, self.range, CHUNK_SIZE)
1125-
}
1126-
GetResultPayload::Stream(s) => s,
1127-
}
1073+
collect_bytes(self.payload, Some(len)).await
11281074
}
11291075
}
11301076

src/limit.rs

Lines changed: 3 additions & 10 deletions
Original file line number | Diff line number | Diff line change
@@ -18,9 +18,8 @@
1818
//! An object store that limits the maximum concurrency of the wrapped implementation
1919
2020
use crate::{
21-
BoxStream, GetOptions, GetResult, GetResultPayload, ListResult, MultipartUpload, ObjectMeta,
22-
ObjectStore, Path, PutMultipartOpts, PutOptions, PutPayload, PutResult, Result, StreamExt,
23-
UploadPart,
21+
BoxStream, GetOptions, GetResult, ListResult, MultipartUpload, ObjectMeta, ObjectStore, Path,
22+
PutMultipartOpts, PutOptions, PutPayload, PutResult, Result, StreamExt, UploadPart,
2423
};
2524
use async_trait::async_trait;
2625
use bytes::Bytes;
@@ -200,13 +199,7 @@ impl<T: ObjectStore> ObjectStore for LimitStore<T> {
200199
}
201200

202201
fn permit_get_result(r: GetResult, permit: OwnedSemaphorePermit) -> GetResult {
203-
let payload = match r.payload {
204-
#[cfg(all(feature = "fs", not(target_arch = "wasm32")))]
205-
v @ GetResultPayload::File(_, _) => v,
206-
GetResultPayload::Stream(s) => {
207-
GetResultPayload::Stream(PermitWrapper::new(s, permit).boxed())
208-
}
209-
};
202+
let payload = PermitWrapper::new(r.payload, permit).boxed();
210203
GetResult { payload, ..r }
211204
}
212205

src/local.rs

Lines changed: 6 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -37,8 +37,8 @@ use crate::{
3737
maybe_spawn_blocking,
3838
path::{absolute_path_to_url, Path},
3939
util::InvalidGetRange,
40-
Attributes, GetOptions, GetResult, GetResultPayload, ListResult, MultipartUpload, ObjectMeta,
41-
ObjectStore, PutMode, PutMultipartOpts, PutOptions, PutPayload, PutResult, Result, UploadPart,
40+
Attributes, GetOptions, GetResult, ListResult, MultipartUpload, ObjectMeta, ObjectStore,
41+
PutMode, PutMultipartOpts, PutOptions, PutPayload, PutResult, Result, UploadPart,
4242
};
4343

4444
/// A specialized `Error` for filesystem object store-related errors
@@ -414,8 +414,11 @@ impl ObjectStore for LocalFileSystem {
414414
None => 0..meta.size,
415415
};
416416

417+
const CHUNK_SIZE: usize = 8 * 1024;
418+
let payload = chunked_stream(file, path, range.clone(), CHUNK_SIZE);
419+
417420
Ok(GetResult {
418-
payload: GetResultPayload::File(file, path),
421+
payload,
419422
attributes: Attributes::default(),
420423
range,
421424
meta,

src/memory.rs

Lines changed: 4 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -29,9 +29,9 @@ use parking_lot::RwLock;
2929
use crate::multipart::{MultipartStore, PartId};
3030
use crate::util::InvalidGetRange;
3131
use crate::{
32-
path::Path, Attributes, GetRange, GetResult, GetResultPayload, ListResult, MultipartId,
33-
MultipartUpload, ObjectMeta, ObjectStore, PutMode, PutMultipartOpts, PutOptions, PutResult,
34-
Result, UpdateVersion, UploadPart,
32+
path::Path, Attributes, GetRange, GetResult, ListResult, MultipartId, MultipartUpload,
33+
ObjectMeta, ObjectStore, PutMode, PutMultipartOpts, PutOptions, PutResult, Result,
34+
UpdateVersion, UploadPart,
3535
};
3636
use crate::{GetOptions, PutPayload};
3737

@@ -262,7 +262,7 @@ impl ObjectStore for InMemory {
262262
let stream = futures::stream::once(futures::future::ready(Ok(data)));
263263

264264
Ok(GetResult {
265-
payload: GetResultPayload::Stream(stream.boxed()),
265+
payload: stream.boxed(),
266266
attributes: entry.attributes,
267267
meta,
268268
range,

0 commit comments

Comments
 (0)