diff --git a/crates/iceberg/src/arrow/record_batch_transformer.rs b/crates/iceberg/src/arrow/record_batch_transformer.rs index 07ec43918f..a20adb6a5a 100644 --- a/crates/iceberg/src/arrow/record_batch_transformer.rs +++ b/crates/iceberg/src/arrow/record_batch_transformer.rs @@ -792,7 +792,8 @@ mod test { let projected_iceberg_field_ids = [1, 2, 3]; let mut transformer = - RecordBatchTransformer::build(snapshot_schema, &projected_iceberg_field_ids); + RecordBatchTransformerBuilder::new(snapshot_schema, &projected_iceberg_field_ids) + .build(); let file_schema = Arc::new(ArrowSchema::new(vec![ simple_field("id", DataType::Int32, false, "1"), diff --git a/crates/iceberg/src/io/file_io.rs b/crates/iceberg/src/io/file_io.rs index 6e2d152ed7..d6bb0b06f3 100644 --- a/crates/iceberg/src/io/file_io.rs +++ b/crates/iceberg/src/io/file_io.rs @@ -27,6 +27,19 @@ use url::Url; use super::storage::Storage; use crate::{Error, ErrorKind, Result}; +/// Write mode for output files. +/// +/// This controls whether to use append mode when writing to storage. +/// Note: This is different from OpenDAL's internal write strategy +#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)] +pub enum WriteMode { + /// Standard write mode (default).. + #[default] + Standard, + /// Append mode (required for some storage backends like AZDLS). + Append, +} + /// FileIO implementation, used to manipulate files in underlying storage. /// /// # Note @@ -144,6 +157,7 @@ impl FileIO { let (op, relative_path) = self.inner.create_operator(&path)?; let path = path.as_ref().to_string(); let relative_path_pos = path.len() - relative_path.len(); + Ok(InputFile { op, path, @@ -160,10 +174,22 @@ impl FileIO { let (op, relative_path) = self.inner.create_operator(&path)?; let path = path.as_ref().to_string(); let relative_path_pos = path.len() - relative_path.len(); + + // ADLS requires append mode for writes + #[cfg(feature = "storage-azdls")] + let write_mode = if matches!(self.inner.as_ref(), Storage::Azdls { .. }) { + WriteMode::Append + } else { + WriteMode::Standard + }; + #[cfg(not(feature = "storage-azdls"))] + let write_mode = WriteMode::Standard; + Ok(OutputFile { op, path, relative_path_pos, + write_mode, }) } } @@ -409,6 +435,8 @@ pub struct OutputFile { path: String, // Relative path of file to uri, starts at [`relative_path_pos`] relative_path_pos: usize, + // Write mode for the file (required for some storage backends like AZDLS) + write_mode: WriteMode, } impl OutputFile { @@ -456,9 +484,13 @@ impl OutputFile { /// /// For one-time writing, use [`Self::write`] instead. pub async fn writer(&self) -> crate::Result> { - Ok(Box::new( - self.op.writer(&self.path[self.relative_path_pos..]).await?, - )) + let writer = self + .op + .writer_with(&self.path[self.relative_path_pos..]) + .append(self.write_mode == WriteMode::Append) + .await?; + + Ok(Box::new(writer)) } }