diff --git a/Cargo.lock b/Cargo.lock
index fdd8f0a44dc..51e4d4669c0 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1472,6 +1472,7 @@ dependencies = [
  "rand 0.8.5",
  "roaring",
  "serde",
+ "serde_bytes",
  "shuttle",
  "tempfile",
  "thiserror 1.0.69",
@@ -7340,6 +7341,15 @@ dependencies = [
  "serde",
 ]
 
+[[package]]
+name = "serde_bytes"
+version = "0.11.17"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "8437fd221bde2d4ca316d61b90e337e9e702b3820b87d63caa9ba6c02bd06d96"
+dependencies = [
+ "serde",
+]
+
 [[package]]
 name = "serde_derive"
 version = "1.0.215"
diff --git a/Cargo.toml b/Cargo.toml
index a78f0808408..fb9e26515a2 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -37,6 +37,7 @@ sea-query = "0.32"
 sea-query-binder = "0.7"
 serde = { version = "1.0.215", features = ["derive", "rc"] }
 serde_json = "1.0.133"
+serde_bytes = "0.11.17"
 setsum = "0.7"
 sprs = "0.11"
 tantivy = "0.22.0"
diff --git a/rust/blockstore/Cargo.toml b/rust/blockstore/Cargo.toml
index a1609989bcf..9d7311e0cac 100644
--- a/rust/blockstore/Cargo.toml
+++ b/rust/blockstore/Cargo.toml
@@ -8,6 +8,7 @@ path = "src/lib.rs"
 
 [dependencies]
 serde = { workspace = true }
+serde_bytes = { workspace = true }
 arrow = { workspace = true }
 thiserror = { workspace = true }
 uuid = { workspace = true }
diff --git a/rust/blockstore/src/arrow/block/types.rs b/rust/blockstore/src/arrow/block/types.rs
index 53d2ec302b6..9d65555a255 100644
--- a/rust/blockstore/src/arrow/block/types.rs
+++ b/rust/blockstore/src/arrow/block/types.rs
@@ -1,13 +1,15 @@
 use std::cmp::Ordering;
 use std::collections::HashMap;
-use std::io::SeekFrom;
+use std::io::Read;
 use std::ops::{Bound, RangeBounds};
+use std::sync::Arc;
 
 use crate::arrow::types::{ArrowReadableKey, ArrowReadableValue};
 use arrow::array::ArrayData;
 use arrow::buffer::Buffer;
-use arrow::ipc::reader::read_footer_length;
-use arrow::ipc::{root_as_footer, root_as_message, MessageHeader, MetadataVersion};
+use arrow::ipc::convert::fb_to_schema;
+use arrow::ipc::reader::{read_footer_length, FileDecoder};
+use arrow::ipc::{root_as_footer, root_as_message, Footer, MessageHeader, MetadataVersion};
 use arrow::util::bit_util;
 use arrow::{
     array::{Array, StringArray},
@@ -66,9 +68,8 @@ impl<'de> Deserialize<'de> for RecordBatchWrapper {
     where
         D: serde::Deserializer<'de>,
     {
-        let data = Vec::<u8>::deserialize(deserializer)?;
-        let reader = std::io::Cursor::new(data);
-        let rb = Block::load_record_batch(reader, false).map_err(D::Error::custom)?;
+        let data: &'de [u8] = serde_bytes::deserialize(deserializer)?;
+        let rb = Block::load_record_batch(data, false).map_err(D::Error::custom)?;
         Ok(RecordBatchWrapper(rb))
     }
 }
@@ -521,7 +522,7 @@ impl Block {
 
     /// Load a block from bytes in Arrow IPC format with the given id
     pub fn from_bytes(bytes: &[u8], id: Uuid) -> Result<Self, BlockLoadError> {
-        Self::from_bytes_internal(bytes, id, false)
+        Self::load_from_bytes(bytes, id, false)
     }
 
     /// Load a block from bytes in Arrow IPC format with the given id and validate the layout
@@ -529,12 +530,7 @@ impl Block {
     /// - This method should be used in tests to ensure that the layout of the IPC file is as expected
     /// - The validation is not performant and should not be used in production code
     pub fn from_bytes_with_validation(bytes: &[u8], id: Uuid) -> Result<Self, BlockLoadError> {
-        Self::from_bytes_internal(bytes, id, true)
-    }
-
-    fn from_bytes_internal(bytes: &[u8], id: Uuid, validate: bool) -> Result<Self, BlockLoadError> {
-        let cursor = std::io::Cursor::new(bytes);
-        Self::load_with_reader(cursor, id, validate)
+        Self::load_from_bytes(bytes, id, true)
     }
 
     /// Load a block from the given path with the given id and validate the layout
@@ -558,41 +554,57 @@ impl Block {
                 return Err(BlockLoadError::IOError(e));
             }
         };
-        let reader = std::io::BufReader::new(file);
-        Self::load_with_reader(reader, id, validate)
+        let mut reader = std::io::BufReader::new(file);
+        let mut target_buffer = Vec::with_capacity(reader.get_ref().metadata()?.len() as usize);
+        reader.read_to_end(&mut target_buffer)?;
+        Self::load_from_bytes(&target_buffer, id, validate)
     }
 
-    fn load_with_reader<R>(reader: R, id: Uuid, validate: bool) -> Result<Self, BlockLoadError>
-    where
-        R: std::io::Read + std::io::Seek,
-    {
-        let batch = Self::load_record_batch(reader, validate)?;
-        // TODO: how to store / hydrate id?
+    /// Build a block with the given id from in-memory Arrow IPC file bytes
+    fn load_from_bytes(bytes: &[u8], id: Uuid, validate: bool) -> Result<Self, BlockLoadError> {
+        let batch = Self::load_record_batch(bytes, validate)?;
         Ok(Self::from_record_batch(id, batch))
     }
 
-    fn load_record_batch<R>(mut reader: R, validate: bool) -> Result<RecordBatch, BlockLoadError>
-    where
-        R: std::io::Read + std::io::Seek,
-    {
+    /// Decode the first record batch of the Arrow IPC file held in `bytes`,
+    /// verifying the buffer layout first when `validate` is set
+    fn load_record_batch(bytes: &[u8], validate: bool) -> Result<RecordBatch, BlockLoadError> {
         if validate {
-            verify_buffers_layout(&mut reader)
-                .map_err(BlockLoadError::ArrowLayoutVerificationError)?;
+            verify_buffers_layout(bytes).map_err(BlockLoadError::ArrowLayoutVerificationError)?;
         }
 
-        let mut arrow_reader = arrow::ipc::reader::FileReader::try_new(&mut reader, None)
-            .map_err(BlockLoadError::ArrowError)?;
-
-        let batch = match arrow_reader.next() {
-            Some(Ok(batch)) => batch,
-            Some(Err(e)) => {
-                return Err(BlockLoadError::ArrowError(e));
-            }
-            None => {
-                return Err(BlockLoadError::NoRecordBatches);
-            }
-        };
-        Ok(batch)
+        let footer =
+            read_arrow_footer(bytes).map_err(BlockLoadError::ArrowLayoutVerificationError)?;
+        let schema = footer
+            .schema()
+            .ok_or(BlockLoadError::ArrowLayoutVerificationError(
+                ArrowLayoutVerificationError::RecordBatchDecodeError,
+            ))?;
+        let schema = fb_to_schema(schema);
+        // Requiring alignment should always work for blocks since we write them with alignment
+        // This is just being defensive
+        let decoder =
+            FileDecoder::new(Arc::new(schema), footer.version()).with_require_alignment(true);
+        let (block, record_batch_offset, _, record_batch_len) = read_record_batch_range(footer)?;
+
+        // This incurs a copy of the buffer that we should be able to avoid, but as is foyer
+        // hands us a reference to the [u8]. So the end to end path involves up to two copies:
+        // a kernel to user space copy when reading from the disk cache, and then a copy into
+        // this buffer. We could avoid the second copy by changing foyer to hand over ownership
+        // of the buffer, but that would be a larger change.
+        // This is something we can optimize later if it becomes a bottleneck.
+        // The offset and length come from the footer, so a corrupt or truncated file can point
+        // outside of `bytes`; return an error instead of panicking on the slice.
+        let record_batch_bytes = record_batch_offset
+            .checked_add(record_batch_len)
+            .and_then(|record_batch_end| bytes.get(record_batch_offset..record_batch_end))
+            .ok_or_else(|| {
+                BlockLoadError::IOError(std::io::Error::from(std::io::ErrorKind::UnexpectedEof))
+            })?;
+        let buffer = Buffer::from(record_batch_bytes);
+        decoder
+            .read_record_batch(block, &buffer)?
+            .ok_or(BlockLoadError::NoRecordBatches)
     }
 }
 
@@ -737,40 +749,46 @@ impl ChromaError for ArrowLayoutVerificationError {
     }
 }
 
-/// Verifies that the buffers in the IPC file are 64 byte aligned
-/// and stored in Arrow in the way we expect.
-/// All non-benchmark test code should use this by loading the block
-/// with verification enabled.
-fn verify_buffers_layout<R>(mut reader: R) -> Result<(), ArrowLayoutVerificationError>
-where
-    R: std::io::Read + std::io::Seek,
-{
-    // Read the IPC file and verify that the buffers are 64 byte aligned
-    // by inspecting the offsets, this is required since our
-    // size calculation assumes that the buffers are 64 byte aligned
+/// Parses the Arrow IPC file footer stored at the end of `bytes`.
+///
+/// Returns an `IOError` (unexpected EOF) instead of panicking when `bytes` is too short to hold
+/// the 10 byte trailer or the footer length that the trailer declares.
+fn read_arrow_footer(bytes: &[u8]) -> Result<Footer<'_>, ArrowLayoutVerificationError> {
+    let unexpected_eof = || {
+        ArrowLayoutVerificationError::IOError(std::io::Error::from(
+            std::io::ErrorKind::UnexpectedEof,
+        ))
+    };
+
     // Space for ARROW_MAGIC (6 bytes) and length (4 bytes)
     let mut footer_buffer = [0; 10];
-    reader
-        .seek(SeekFrom::End(-10))
-        .map_err(ArrowLayoutVerificationError::IOError)?;
-    reader
-        .read_exact(&mut footer_buffer)
-        .map_err(ArrowLayoutVerificationError::IOError)?;
-
-    let footer_len = read_footer_length(footer_buffer);
-    let footer_len = footer_len.map_err(ArrowLayoutVerificationError::ArrowError)?;
+    let trailer_start = bytes
+        .len()
+        .checked_sub(footer_buffer.len())
+        .ok_or_else(unexpected_eof)?;
+    footer_buffer.copy_from_slice(&bytes[trailer_start..]);
+    let footer_len =
+        read_footer_length(footer_buffer).map_err(ArrowLayoutVerificationError::ArrowError)?;
 
     // read footer
-    let mut footer_data = vec![0; footer_len];
-    reader
-        .seek(SeekFrom::End(-10 - footer_len as i64))
-        .map_err(ArrowLayoutVerificationError::IOError)?;
-    reader
-        .read_exact(&mut footer_data)
-        .map_err(ArrowLayoutVerificationError::IOError)?;
+    // The declared length comes from the file, so it may exceed the bytes we actually have
+    let footer_start = trailer_start
+        .checked_sub(footer_len)
+        .ok_or_else(unexpected_eof)?;
+    let footer_data = &bytes[footer_start..trailer_start];
     let footer =
-        root_as_footer(&footer_data).map_err(ArrowLayoutVerificationError::InvalidFlatbuffer)?;
+        root_as_footer(footer_data).map_err(ArrowLayoutVerificationError::InvalidFlatbuffer)?;
 
+    Ok(footer)
+}
+
+/// Locates the first record batch listed in `footer`.
+///
+/// Returns `(block, offset, body_len, total_len)` where `total_len` is the metadata length plus
+/// the body length, all as recorded in the footer.
+fn read_record_batch_range(
+    footer: Footer<'_>,
+) -> Result<(&arrow::ipc::Block, usize, usize, usize), ArrowLayoutVerificationError> {
     // Read the record batch
     let record_batch_definitions = match footer.recordBatches() {
         Some(record_batch_definitions) => record_batch_definitions,
@@ -785,25 +803,32 @@ where
     }
 
     let record_batch_definition = record_batch_definitions.get(0);
-    let record_batch_len = record_batch_definition.bodyLength() as usize
-        + record_batch_definition.metaDataLength() as usize;
+    let record_batch_offset = record_batch_definition.offset() as usize;
     let record_batch_body_len = record_batch_definition.bodyLength() as usize;
+    let record_batch_len =
+        record_batch_body_len + record_batch_definition.metaDataLength() as usize;
+
+    Ok((
+        record_batch_definition,
+        record_batch_offset,
+        record_batch_body_len,
+        record_batch_len,
+    ))
+}
 
-    // Read the actual record batch
-    let mut file_buffer = vec![0; record_batch_len];
-    match reader.seek(SeekFrom::Start(record_batch_definition.offset() as u64)) {
-        Ok(_) => {}
-        Err(e) => {
-            return Err(ArrowLayoutVerificationError::IOError(e));
-        }
-    }
-    match reader.read_exact(&mut file_buffer) {
-        Ok(_) => {}
-        Err(e) => {
-            return Err(ArrowLayoutVerificationError::IOError(e));
-        }
-    }
-    let buffer = Buffer::from(file_buffer);
+/// Verifies that the buffers in the IPC file are 64 byte aligned
+/// and stored in Arrow in the way we expect.
+/// All non-benchmark test code should use this by loading the block
+/// with verification enabled.
+fn verify_buffers_layout(bytes: &[u8]) -> Result<(), ArrowLayoutVerificationError> { + // Read the IPC file and verify that the buffers are 64 byte aligned + // by inspecting the offsets, this is required since our + // size calculation assumes that the buffers are 64 byte aligned + // Space for ARROW_MAGIC (6 bytes) and length (4 bytes) + let footer = read_arrow_footer(bytes)?; + let (_, record_batch_offset, record_batch_body_len, record_batch_len) = + read_record_batch_range(footer)?; + let buffer = Buffer::from(&bytes[record_batch_offset..record_batch_offset + record_batch_len]); // This is borrowed from arrow-ipc parse_message.rs // https://arrow.apache.org/docs/format/Columnar.html#encapsulated-message-format