diff --git a/src/aws/mod.rs b/src/aws/mod.rs index 8dac2bd7..5116ded1 100644 --- a/src/aws/mod.rs +++ b/src/aws/mod.rs @@ -499,6 +499,7 @@ mod tests { use crate::integration::*; use crate::tests::*; use crate::ClientOptions; + use crate::ObjectStoreExt; use base64::prelude::BASE64_STANDARD; use base64::Engine; use http::HeaderMap; diff --git a/src/azure/mod.rs b/src/azure/mod.rs index f65bf9f3..506666a2 100644 --- a/src/azure/mod.rs +++ b/src/azure/mod.rs @@ -309,6 +309,7 @@ mod tests { use super::*; use crate::integration::*; use crate::tests::*; + use crate::ObjectStoreExt; use bytes::Bytes; #[tokio::test] diff --git a/src/buffered.rs b/src/buffered.rs index f189c534..dddcebc7 100644 --- a/src/buffered.rs +++ b/src/buffered.rs @@ -210,12 +210,12 @@ impl AsyncBufRead for BufReader { /// An async buffered writer compatible with the tokio IO traits /// -/// This writer adaptively uses [`ObjectStore::put`] or +/// This writer adaptively uses [`ObjectStore::put_opts`] or /// [`ObjectStore::put_multipart`] depending on the amount of data that has /// been written. /// /// Up to `capacity` bytes will be buffered in memory, and flushed on shutdown -/// using [`ObjectStore::put`]. If `capacity` is exceeded, data will instead be +/// using [`ObjectStore::put_opts`]. If `capacity` is exceeded, data will instead be /// streamed using [`ObjectStore::put_multipart`] pub struct BufWriter { capacity: usize, @@ -242,7 +242,7 @@ enum BufWriterState { Prepare(BoxFuture<'static, crate::Result>), /// Write to a multipart upload Write(Option), - /// [`ObjectStore::put`] + /// [`ObjectStore::put_opts`] Flush(BoxFuture<'static, crate::Result<()>>), } @@ -489,7 +489,7 @@ mod tests { use super::*; use crate::memory::InMemory; use crate::path::Path; - use crate::{Attribute, GetOptions}; + use crate::{Attribute, GetOptions, ObjectStoreExt}; use itertools::Itertools; use tokio::io::{AsyncBufReadExt, AsyncReadExt, AsyncSeekExt, AsyncWriteExt}; diff --git a/src/chunked.rs b/src/chunked.rs index 8af3b2c4..4412085c 100644 --- a/src/chunked.rs +++ b/src/chunked.rs @@ -185,6 +185,7 @@ mod tests { use crate::local::LocalFileSystem; use crate::memory::InMemory; use crate::path::Path; + use crate::ObjectStoreExt; use super::*; diff --git a/src/gcp/mod.rs b/src/gcp/mod.rs index 442b24fe..bbe6a21e 100644 --- a/src/gcp/mod.rs +++ b/src/gcp/mod.rs @@ -287,6 +287,7 @@ mod test { use crate::integration::*; use crate::tests::*; + use crate::ObjectStoreExt; use super::*; diff --git a/src/integration.rs b/src/integration.rs index 49b7be57..0e38b984 100644 --- a/src/integration.rs +++ b/src/integration.rs @@ -29,7 +29,7 @@ use crate::multipart::MultipartStore; use crate::path::Path; use crate::{ Attribute, Attributes, DynObjectStore, Error, GetOptions, GetRange, MultipartUpload, - ObjectStore, PutMode, PutPayload, UpdateVersion, WriteMultipart, + ObjectStore, ObjectStoreExt, PutMode, PutPayload, UpdateVersion, WriteMultipart, }; use bytes::Bytes; use futures::stream::FuturesUnordered; diff --git a/src/lib.rs b/src/lib.rs index 06edd33c..7a9b899c 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -252,11 +252,11 @@ //! //! # Put Object //! -//! Use the [`ObjectStore::put`] method to atomically write data. +//! Use the [`ObjectStoreExt::put`] method to atomically write data. //! //! ``` //! # use object_store::local::LocalFileSystem; -//! # use object_store::{ObjectStore, PutPayload}; +//! # use object_store::{ObjectStore, ObjectStoreExt, PutPayload}; //! # use std::sync::Arc; //! # use object_store::path::Path; //! # fn get_object_store() -> Arc { @@ -338,7 +338,7 @@ //! //! ``` //! # use object_store::local::LocalFileSystem; -//! # use object_store::{ObjectStore, PutPayloadMut}; +//! # use object_store::{ObjectStore, ObjectStoreExt, PutPayloadMut}; //! # use std::sync::Arc; //! # use bytes::Bytes; //! # use tokio::io::AsyncWriteExt; @@ -587,19 +587,22 @@ pub type DynObjectStore = dyn ObjectStore; pub type MultipartId = String; /// Universal API to multiple object store services. +/// +/// For more convience methods, check [`ObjectStoreExt`]. +/// +/// # Contract +/// This trait is meant as a contract between object store implementations +/// (e.g. providers, wrappers) and the `object_store` crate itself. +/// +/// The [`ObjectStoreExt`] acts as an API/contract between `object_store` +/// and the store users. #[async_trait] pub trait ObjectStore: std::fmt::Display + Send + Sync + Debug + 'static { - /// Save the provided bytes to the specified location + /// Save the provided `payload` to `location` with the given options /// /// The operation is guaranteed to be atomic, it will either successfully /// write the entirety of `payload` to `location`, or fail. No clients /// should be able to observe a partially written object - async fn put(&self, location: &Path, payload: PutPayload) -> Result { - self.put_opts(location, payload, PutOptions::default()) - .await - } - - /// Save the provided `payload` to `location` with the given options async fn put_opts( &self, location: &Path, @@ -609,7 +612,7 @@ pub trait ObjectStore: std::fmt::Display + Send + Sync + Debug + 'static { /// Perform a multipart upload /// - /// Client should prefer [`ObjectStore::put`] for small payloads, as streaming uploads + /// Client should prefer [`ObjectStoreExt::put`] for small payloads, as streaming uploads /// typically require multiple separate requests. See [`MultipartUpload`] for more information /// /// For more advanced multipart uploads see [`MultipartStore`](multipart::MultipartStore) @@ -620,7 +623,7 @@ pub trait ObjectStore: std::fmt::Display + Send + Sync + Debug + 'static { /// Perform a multipart upload with options /// - /// Client should prefer [`ObjectStore::put`] for small payloads, as streaming uploads + /// Client should prefer [`ObjectStore::put_opts`] for small payloads, as streaming uploads /// typically require multiple separate requests. See [`MultipartUpload`] for more information /// /// For more advanced multipart uploads see [`MultipartStore`](multipart::MultipartStore) @@ -696,7 +699,7 @@ pub trait ObjectStore: std::fmt::Display + Send + Sync + Debug + 'static { /// # async fn example() -> Result<(), Box> { /// # let root = tempfile::TempDir::new().unwrap(); /// # let store = LocalFileSystem::new_with_prefix(root.path()).unwrap(); - /// # use object_store::{ObjectStore, ObjectMeta}; + /// # use object_store::{ObjectStore, ObjectStoreExt, ObjectMeta}; /// # use object_store::path::Path; /// # use futures::{StreamExt, TryStreamExt}; /// # @@ -803,10 +806,6 @@ macro_rules! as_ref_impl { ($type:ty) => { #[async_trait] impl ObjectStore for $type { - async fn put(&self, location: &Path, payload: PutPayload) -> Result { - self.as_ref().put(location, payload).await - } - async fn put_opts( &self, location: &Path, @@ -901,6 +900,40 @@ macro_rules! as_ref_impl { as_ref_impl!(Arc); as_ref_impl!(Box); +/// Helper module to [seal traits](https://predr.ag/blog/definitive-guide-to-sealed-traits-in-rust/). +mod private { + pub trait Sealed {} + + impl Sealed for T where T: super::ObjectStore + ?Sized {} +} + +/// Extension trait for [`ObjectStore`] with convinience functions. +/// +/// See "contract" section within the [`ObjectStore`] documentation for more reasoning. +/// +/// # Implementation +/// You MUST NOT implement this trait yourself. It is automatically implemented for all [`ObjectStore`] implementations. +#[async_trait] +pub trait ObjectStoreExt: private::Sealed { + /// Save the provided bytes to the specified location + /// + /// The operation is guaranteed to be atomic, it will either successfully + /// write the entirety of `payload` to `location`, or fail. No clients + /// should be able to observe a partially written object + async fn put(&self, location: &Path, payload: PutPayload) -> Result; +} + +#[async_trait] +impl ObjectStoreExt for T +where + T: ObjectStore + private::Sealed + ?Sized, +{ + async fn put(&self, location: &Path, payload: PutPayload) -> Result { + self.put_opts(location, payload, PutOptions::default()) + .await + } +} + /// Result of a list call that includes objects, prefixes (directories) and a /// token for the next set of results. Individual result sets may be limited to /// 1,000 objects based on the underlying object storage's limitations. diff --git a/src/limit.rs b/src/limit.rs index 85714967..abb09c54 100644 --- a/src/limit.rs +++ b/src/limit.rs @@ -71,11 +71,6 @@ impl std::fmt::Display for LimitStore { #[async_trait] impl ObjectStore for LimitStore { - async fn put(&self, location: &Path, payload: PutPayload) -> Result { - let _permit = self.semaphore.acquire().await.unwrap(); - self.inner.put(location, payload).await - } - async fn put_opts( &self, location: &Path, diff --git a/src/local.rs b/src/local.rs index dc2e2d10..c165c604 100644 --- a/src/local.rs +++ b/src/local.rs @@ -1090,7 +1090,7 @@ mod tests { #[cfg(target_family = "unix")] use tempfile::NamedTempFile; - use crate::integration::*; + use crate::{integration::*, ObjectStoreExt}; use super::*; diff --git a/src/memory.rs b/src/memory.rs index e15c2465..c8c2bed9 100644 --- a/src/memory.rs +++ b/src/memory.rs @@ -536,7 +536,7 @@ impl MultipartUpload for InMemoryUpload { #[cfg(test)] mod tests { - use crate::integration::*; + use crate::{integration::*, ObjectStoreExt}; use super::*; diff --git a/src/prefix.rs b/src/prefix.rs index e5a917aa..aaa0aeb0 100644 --- a/src/prefix.rs +++ b/src/prefix.rs @@ -98,11 +98,6 @@ fn strip_meta(prefix: &Path, meta: ObjectMeta) -> ObjectMeta { } #[async_trait::async_trait] impl ObjectStore for PrefixStore { - async fn put(&self, location: &Path, payload: PutPayload) -> Result { - let full_path = self.full_path(location); - self.inner.put(&full_path, payload).await - } - async fn put_opts( &self, location: &Path, @@ -225,8 +220,8 @@ impl ObjectStore for PrefixStore { #[cfg(test)] mod tests { use super::*; - use crate::integration::*; use crate::local::LocalFileSystem; + use crate::{integration::*, ObjectStoreExt}; use tempfile::TempDir; diff --git a/src/throttle.rs b/src/throttle.rs index 8f40871c..f51743f6 100644 --- a/src/throttle.rs +++ b/src/throttle.rs @@ -90,7 +90,7 @@ pub struct ThrottleConfig { /// [`wait_list_with_delimiter_per_call`](Self::wait_list_with_delimiter_per_call). pub wait_list_with_delimiter_per_entry: Duration, - /// Sleep duration for every call to [`put`](ThrottledStore::put). + /// Sleep duration for every call to [`put_opts`](ThrottledStore::put_opts). /// /// Sleeping is done before the underlying store is called and independently of the success of /// the operation. @@ -148,11 +148,6 @@ impl std::fmt::Display for ThrottledStore { #[async_trait] impl ObjectStore for ThrottledStore { - async fn put(&self, location: &Path, payload: PutPayload) -> Result { - sleep(self.config().wait_put_per_call).await; - self.inner.put(location, payload).await - } - async fn put_opts( &self, location: &Path, @@ -404,6 +399,7 @@ impl MultipartUpload for ThrottledUpload { #[cfg(test)] mod tests { use super::*; + use crate::ObjectStoreExt; use crate::{integration::*, memory::InMemory, GetResultPayload}; use futures::TryStreamExt; use tokio::time::Duration;