12 changes: 9 additions & 3 deletions parquet/src/arrow/arrow_writer/mod.rs
@@ -632,6 +632,9 @@ impl PageWriter for ArrowPageWriter {
pub struct ArrowLeafColumn(ArrayLevels);

/// Computes the [`ArrowLeafColumn`] for a potentially nested [`ArrayRef`]
+///
+/// This function can be used along with [`get_column_writers`] to encode
+/// individual columns in parallel. See the example on [`ArrowColumnWriter`].
pub fn compute_leaves(field: &Field, array: &ArrayRef) -> Result<Vec<ArrowLeafColumn>> {
let levels = calculate_array_levels(array, field)?;
Ok(levels.into_iter().map(ArrowLeafColumn).collect())
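
For illustration, a hedged sketch of that parallel-encoding workflow, shown single-threaded for brevity (in practice each `ArrowColumnWriter` would run on its own thread). It assumes the `arrow-array`/`arrow-schema` crates plus a recent `parquet` release providing `ArrowSchemaConverter` and `ArrowColumnChunk::append_to_row_group`; the one-column schema is illustrative:

```rust
use std::sync::Arc;

use arrow_array::{ArrayRef, Int32Array};
use arrow_schema::{DataType, Field, Schema};
use parquet::arrow::arrow_writer::{compute_leaves, get_column_writers};
use parquet::arrow::ArrowSchemaConverter;
use parquet::errors::Result;
use parquet::file::properties::WriterProperties;
use parquet::file::writer::SerializedFileWriter;

fn encode_columns() -> Result<Vec<u8>> {
    let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, false)]));
    let parquet_schema = ArrowSchemaConverter::new().convert(&schema)?;
    let props = Arc::new(WriterProperties::default());

    // One ArrowColumnWriter per leaf column; each could encode on its own thread
    let mut col_writers = get_column_writers(&parquet_schema, &props, &schema)?;

    // Compute the leaves for each (field, array) pair and feed them to the
    // matching writer (a flat schema has exactly one leaf per field)
    let arrays: Vec<ArrayRef> = vec![Arc::new(Int32Array::from(vec![1, 2, 3]))];
    for ((writer, field), array) in col_writers.iter_mut().zip(schema.fields()).zip(&arrays) {
        for leaf in compute_leaves(field, array)? {
            writer.write(&leaf)?;
        }
    }

    // Assemble the file by appending each encoded chunk to a row group
    let root = parquet_schema.root_schema_ptr();
    let mut file_writer = SerializedFileWriter::new(Vec::<u8>::new(), root, props)?;
    let mut row_group = file_writer.next_row_group()?;
    for col_writer in col_writers {
        col_writer.close()?.append_to_row_group(&mut row_group)?;
    }
    row_group.close()?;
    // into_inner writes the footer and returns the underlying buffer
    file_writer.into_inner()
}
```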
@@ -926,7 +929,7 @@ impl ArrowRowGroupWriterFactory {
}
}

-/// Returns the [`ArrowColumnWriter`] for a given schema
+/// Returns [`ArrowColumnWriter`]s for each column in a given schema
pub fn get_column_writers(
parquet: &SchemaDescriptor,
props: &WriterPropertiesPtr,
@@ -970,7 +973,7 @@ fn get_column_writers_with_encryptor(
Ok(writers)
}

-/// Gets [`ArrowColumnWriter`] instances for different data types
+/// Creates [`ArrowColumnWriter`] instances
struct ArrowColumnWriterFactory {
#[cfg(feature = "encryption")]
row_group_index: usize,
@@ -1026,14 +1029,16 @@ impl ArrowColumnWriterFactory {
Ok(Box::<ArrowPageWriter>::default())
}

-/// Gets the [`ArrowColumnWriter`] for the given `data_type`
+/// Gets an [`ArrowColumnWriter`] for the given `data_type`, consuming the
+/// corresponding [`ColumnDescPtr`]s from `leaves` and appending the writers to `out`
fn get_arrow_column_writer(
&self,
data_type: &ArrowDataType,
props: &WriterPropertiesPtr,
leaves: &mut Iter<'_, ColumnDescPtr>,
out: &mut Vec<ArrowColumnWriter>,
) -> Result<()> {
+// Instantiate writers for normal columns
let col = |desc: &ColumnDescPtr| -> Result<ArrowColumnWriter> {
let page_writer = self.create_page_writer(desc, out.len())?;
let chunk = page_writer.buffer.clone();
@@ -1044,6 +1049,7 @@ impl ArrowColumnWriterFactory {
})
};

+// Instantiate writers for byte arrays (e.g. Utf8, Binary, etc)
let bytes = |desc: &ColumnDescPtr| -> Result<ArrowColumnWriter> {
let page_writer = self.create_page_writer(desc, out.len())?;
let chunk = page_writer.buffer.clone();
12 changes: 8 additions & 4 deletions parquet/src/column/writer/mod.rs
@@ -64,6 +64,8 @@ macro_rules! downcast_writer {
}

/// Column writer for a Parquet type.
+///
+/// See [`get_column_writer`] to create instances of this type
pub enum ColumnWriter<'a> {
/// Column writer for boolean type
BoolColumnWriter(ColumnWriterImpl<'a, BoolType>),
@@ -96,13 +98,13 @@ impl ColumnWriter<'_> {
downcast_writer!(self, typed, typed.get_estimated_total_bytes())
}

-/// Close this [`ColumnWriter`]
+/// Close this [`ColumnWriter`], returning the metadata for the column chunk.
pub fn close(self) -> Result<ColumnCloseResult> {
downcast_writer!(self, typed, typed.close())
}
}

-/// Gets a specific column writer corresponding to column descriptor `descr`.
+/// Creates a specific column writer corresponding to column descriptor `descr`.
pub fn get_column_writer<'a>(
descr: ColumnDescPtr,
props: WriterPropertiesPtr,
@@ -173,7 +175,9 @@ pub fn get_typed_column_writer_mut<'a, 'b: 'a, T: DataType>(
})
}

-/// Metadata returned by [`GenericColumnWriter::close`]
+/// Metadata for a column chunk of a Parquet file.
+///
+/// Note this structure is returned by [`ColumnWriter::close`].
#[derive(Debug, Clone)]
pub struct ColumnCloseResult {
/// The total number of bytes written
@@ -316,7 +320,7 @@ impl<T: Default> ColumnMetrics<T> {
/// Typed column writer for a primitive column.
pub type ColumnWriterImpl<'a, T> = GenericColumnWriter<'a, ColumnValueEncoderImpl<T>>;

-/// Generic column writer for a primitive column.
+/// Generic column writer for a primitive Parquet column.
pub struct GenericColumnWriter<'a, E: ColumnValueEncoder> {
// Column writer properties
descr: ColumnDescPtr,
45 changes: 31 additions & 14 deletions parquet/src/file/writer.rs
@@ -15,8 +15,7 @@
// specific language governing permissions and limitations
// under the License.

-//! Contains file writer API, and provides methods to write row groups and columns by
-//! using row group writers and column writers respectively.
+//! [`SerializedFileWriter`]: Low level Parquet writer API

use crate::bloom_filter::Sbbf;
use crate::format as parquet;
@@ -139,7 +138,14 @@ pub type OnCloseRowGroup<'a, W> = Box<
// Serialized impl for file & row group writers

/// Parquet file writer API.
-/// Provides methods to write row groups sequentially.
+///
+/// This is a low level API for writing Parquet files directly; it handles
+/// tracking the location of file structures such as row groups and column
+/// chunks, and writing the metadata and file footer.
+///
+/// Data is written to row groups using [`SerializedRowGroupWriter`] and to
+/// columns using [`SerializedColumnWriter`]; the `SerializedFileWriter`
+/// tracks where the data is written and assembles the final file metadata.
///
/// The main workflow should be as follows:
/// - Create file writer, this will open a new file and potentially write some metadata.
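
A minimal sketch of this workflow, writing one INT32 column (the file name and message-type schema are illustrative):

```rust
use std::{fs::File, sync::Arc};

use parquet::data_type::Int32Type;
use parquet::file::{properties::WriterProperties, writer::SerializedFileWriter};
use parquet::schema::parser::parse_message_type;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let schema = Arc::new(parse_message_type("message schema { REQUIRED INT32 id; }")?);
    let props = Arc::new(WriterProperties::default());
    let file = File::create("data.parquet")?;

    let mut writer = SerializedFileWriter::new(file, schema, props)?;
    // Row groups are written one at a time...
    let mut row_group = writer.next_row_group()?;
    // ...visiting the columns in schema (leaf) order
    while let Some(mut column) = row_group.next_column()? {
        column.typed::<Int32Type>().write_batch(&[1, 2, 3], None, None)?;
        column.close()?;
    }
    row_group.close()?;
    // Writes the footer and file metadata
    writer.close()?;
    Ok(())
}
```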
@@ -221,11 +227,13 @@ impl<W: Write + Send> SerializedFileWriter<W> {
}

/// Creates new row group from this file writer.
-/// In case of IO error or Thrift error, returns `Err`.
///
-/// There can be at most 2^15 row groups in a file; and row groups have
-/// to be written sequentially. Every time the next row group is requested, the
-/// previous row group must be finalised and closed using `RowGroupWriter::close` method.
+/// Note: Parquet files are limited to at most 2^15 row groups, and row
+/// groups must be written sequentially.
+///
+/// Every time the next row group is requested, the previous row group must
+/// be finalised and closed using the [`SerializedRowGroupWriter::close`]
+/// method or an error will be returned.
pub fn next_row_group(&mut self) -> Result<SerializedRowGroupWriter<'_, W>> {
self.assert_previous_writer_closed()?;
let ordinal = self.row_group_index;
@@ -396,8 +404,8 @@ impl<W: Write + Send> SerializedFileWriter<W> {

/// Writes the given buf bytes to the internal buffer.
///
-/// This can be used to write raw data to an in-progress parquet file, for
-/// example, custom index structures or other payloads. Other parquet readers
+/// This can be used to write raw data to an in-progress Parquet file, for
+/// example, custom index structures or other payloads. Other Parquet readers
/// will skip this data when reading the files.
///
/// It's safe to use this method to write data to the underlying writer,
@@ -409,7 +417,7 @@ impl<W: Write + Send> SerializedFileWriter<W> {
/// Returns a mutable reference to the underlying writer.
///
/// **Warning**: if you write directly to this writer, you will skip
-/// the `TrackedWrite` buffering and byte‐counting layers. That’ll cause
+/// the `TrackedWrite` buffering and byte‐counting layers, which can cause
/// the file footer’s recorded offsets and sizes to diverge from reality,
/// resulting in an unreadable or corrupted Parquet file.
///
@@ -478,6 +486,7 @@ fn write_bloom_filters<W: Write + Send>(
}

/// Parquet row group writer API.
+///
/// Provides methods to access column writers in an iterator-like fashion, order is
/// guaranteed to match the order of schema leaves (column descriptors).
///
@@ -645,12 +654,20 @@ impl<'a, W: Write + Send> SerializedRowGroupWriter<'a, W> {
})
}

-/// Append an encoded column chunk from another source without decoding it
+/// Append an encoded column chunk from `reader` directly to the underlying
+/// writer.
///
-/// This can be used for efficiently concatenating or projecting parquet data,
-/// or encoding parquet data to temporary in-memory buffers
+/// This method can be used for efficiently concatenating or projecting
+/// Parquet data, or encoding Parquet data to temporary in-memory buffers.
+///
+/// Arguments:
+/// - `reader`: a [`ChunkReader`] containing the encoded column data
+/// - `close`: the [`ColumnCloseResult`] metadata returned from closing
+///   the column writer that wrote the data in `reader`.
///
-/// See [`Self::next_column`] for writing data that isn't already encoded
+/// See Also:
+/// 1. [`get_column_writer`] for creating writers that can encode data.
+/// 2. [`Self::next_column`] for writing data that isn't already encoded.
pub fn append_column<R: ChunkReader>(
&mut self,
reader: &R,
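
To make the append path concrete, a hedged sketch that copies every column chunk of an already-encoded file into a new writer without decoding any pages. It assumes a recent `parquet` release with `ParquetMetaDataReader`, the `bytes` crate, and that the input file's schema matches the output writer's:

```rust
use std::io::Write;

use bytes::Bytes;
use parquet::column::writer::ColumnCloseResult;
use parquet::errors::Result;
use parquet::file::metadata::ParquetMetaDataReader;
use parquet::file::writer::SerializedFileWriter;

/// Copy every row group of the encoded Parquet file in `input` into
/// `output` without decoding any pages
fn concat_into<W: Write + Send>(
    input: Bytes,
    output: &mut SerializedFileWriter<W>,
) -> Result<()> {
    let metadata = ParquetMetaDataReader::new().parse_and_finish(&input)?;
    for rg in metadata.row_groups() {
        let mut rg_writer = output.next_row_group()?;
        for column in rg.columns() {
            // Rebuild the ColumnCloseResult the original writer would
            // have produced for this chunk
            let close = ColumnCloseResult {
                bytes_written: column.compressed_size() as u64,
                rows_written: rg.num_rows() as u64,
                metadata: column.clone(),
                bloom_filter: None,
                column_index: None,
                offset_index: None,
            };
            // append_column copies the chunk's bytes from `input` using
            // the offsets recorded in `close.metadata`
            rg_writer.append_column(&input, close)?;
        }
        rg_writer.close()?;
    }
    Ok(())
}
```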