Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ sea-query = "0.32"
sea-query-binder = "0.7"
serde = { version = "1.0.215", features = ["derive", "rc"] }
serde_json = "1.0.133"
serde_bytes = "0.11.17"
setsum = "0.7"
sprs = "0.11"
tantivy = "0.22.0"
Expand Down
1 change: 1 addition & 0 deletions rust/blockstore/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ path = "src/lib.rs"

[dependencies]
serde = { workspace = true }
serde_bytes = { workspace = true }
arrow = { workspace = true }
thiserror = { workspace = true }
uuid = { workspace = true }
Expand Down
164 changes: 79 additions & 85 deletions rust/blockstore/src/arrow/block/types.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
use std::cmp::Ordering;
use std::collections::HashMap;
use std::io::SeekFrom;
use std::io::Read;
use std::ops::{Bound, RangeBounds};
use std::sync::Arc;

use crate::arrow::types::{ArrowReadableKey, ArrowReadableValue};
use arrow::array::ArrayData;
use arrow::buffer::Buffer;
use arrow::ipc::reader::read_footer_length;
use arrow::ipc::{root_as_footer, root_as_message, MessageHeader, MetadataVersion};
use arrow::ipc::convert::fb_to_schema;
use arrow::ipc::reader::{read_footer_length, FileDecoder};
use arrow::ipc::{root_as_footer, root_as_message, Footer, MessageHeader, MetadataVersion};
use arrow::util::bit_util;
use arrow::{
array::{Array, StringArray},
Expand Down Expand Up @@ -66,9 +68,8 @@ impl<'de> Deserialize<'de> for RecordBatchWrapper {
where
D: serde::Deserializer<'de>,
{
let data = Vec::<u8>::deserialize(deserializer)?;
let reader = std::io::Cursor::new(data);
let rb = Block::load_record_batch(reader, false).map_err(D::Error::custom)?;
let data: &'de [u8] = serde_bytes::deserialize(deserializer)?;
let rb = Block::load_record_batch(data, false).map_err(D::Error::custom)?;
Ok(RecordBatchWrapper(rb))
}
}
Expand Down Expand Up @@ -521,20 +522,15 @@ impl Block {

/// Load a block from bytes in Arrow IPC format with the given id.
///
/// Layout validation is skipped; use [`Self::from_bytes_with_validation`] in
/// tests that need to check the IPC buffer layout.
pub fn from_bytes(bytes: &[u8], id: Uuid) -> Result<Self, BlockLoadError> {
    Self::load_from_bytes(bytes, id, false)
}

/// Load a block from bytes in Arrow IPC format with the given id and validate the layout.
/// ### Notes
/// - This method should be used in tests to ensure that the layout of the IPC file is as expected
/// - The validation is not performant and should not be used in production code
pub fn from_bytes_with_validation(bytes: &[u8], id: Uuid) -> Result<Self, BlockLoadError> {
    Self::load_from_bytes(bytes, id, true)
}

/// Load a block from the given path with the given id and validate the layout
Expand All @@ -558,41 +554,47 @@ impl Block {
return Err(BlockLoadError::IOError(e));
}
};
let reader = std::io::BufReader::new(file);
Self::load_with_reader(reader, id, validate)
let mut reader = std::io::BufReader::new(file);
let mut target_buffer = Vec::with_capacity(reader.get_ref().metadata()?.len() as usize);
reader.read_to_end(&mut target_buffer)?;
Self::load_from_bytes(&target_buffer, id, validate)
}

fn load_with_reader<R>(reader: R, id: Uuid, validate: bool) -> Result<Self, BlockLoadError>
where
R: std::io::Read + std::io::Seek,
{
let batch = Self::load_record_batch(reader, validate)?;
// TODO: how to store / hydrate id?
/// Decode an Arrow IPC byte slice into a block carrying the given id,
/// optionally verifying the buffer layout first.
fn load_from_bytes(bytes: &[u8], id: Uuid, validate: bool) -> Result<Self, BlockLoadError> {
    Self::load_record_batch(bytes, validate).map(|batch| Self::from_record_batch(id, batch))
}

fn load_record_batch<R>(mut reader: R, validate: bool) -> Result<RecordBatch, BlockLoadError>
where
R: std::io::Read + std::io::Seek,
{
fn load_record_batch(bytes: &[u8], validate: bool) -> Result<RecordBatch, BlockLoadError> {
if validate {
verify_buffers_layout(&mut reader)
.map_err(BlockLoadError::ArrowLayoutVerificationError)?;
verify_buffers_layout(bytes).map_err(BlockLoadError::ArrowLayoutVerificationError)?;
}

let mut arrow_reader = arrow::ipc::reader::FileReader::try_new(&mut reader, None)
.map_err(BlockLoadError::ArrowError)?;

let batch = match arrow_reader.next() {
Some(Ok(batch)) => batch,
Some(Err(e)) => {
return Err(BlockLoadError::ArrowError(e));
}
None => {
return Err(BlockLoadError::NoRecordBatches);
}
};
Ok(batch)
let footer =
read_arrow_footer(bytes).map_err(BlockLoadError::ArrowLayoutVerificationError)?;
let schema = footer
.schema()
.ok_or(BlockLoadError::ArrowLayoutVerificationError(
ArrowLayoutVerificationError::RecordBatchDecodeError,
))?;
let schema = fb_to_schema(schema);
// Requiring alignment should always work for blocks since we write them with alignment
// This is just being defensive
let decoder =
FileDecoder::new(Arc::new(schema), footer.version()).with_require_alignment(true);
let (block, record_batch_offset, _, record_batch_len) = read_record_batch_range(footer)?;

// This incurs a copy of the buffer we should be able to avoid this
// but as is foyer hands a reference to the [u8]. So the end to end
// path involves up to two copies, kernel to user space copy when reading from disk cache
// and then a copy into this buffer. We could avoid the second copy by changing foyer to
// hand over ownership of the buffer, but that would be a larger change.
// This is something we can optimize later if it becomes a bottleneck.
let buffer =
Buffer::from(&bytes[record_batch_offset..record_batch_offset + record_batch_len]);
decoder
.read_record_batch(block, &buffer)?
.ok_or(BlockLoadError::NoRecordBatches)
}
}

Expand Down Expand Up @@ -737,40 +739,25 @@ impl ChromaError for ArrowLayoutVerificationError {
}
}

/// Verifies that the buffers in the IPC file are 64 byte aligned
/// and stored in Arrow in the way we expect.
/// All non-benchmark test code should use this by loading the block
/// with verification enabled.
fn verify_buffers_layout<R>(mut reader: R) -> Result<(), ArrowLayoutVerificationError>
where
R: std::io::Read + std::io::Seek,
{
// Read the IPC file and verify that the buffers are 64 byte aligned
// by inspecting the offsets, this is required since our
// size calculation assumes that the buffers are 64 byte aligned
fn read_arrow_footer(bytes: &[u8]) -> Result<Footer<'_>, ArrowLayoutVerificationError> {
// Space for ARROW_MAGIC (6 bytes) and length (4 bytes)
let mut footer_buffer = [0; 10];
reader
.seek(SeekFrom::End(-10))
.map_err(ArrowLayoutVerificationError::IOError)?;
reader
.read_exact(&mut footer_buffer)
.map_err(ArrowLayoutVerificationError::IOError)?;

let footer_len = read_footer_length(footer_buffer);
let footer_len = footer_len.map_err(ArrowLayoutVerificationError::ArrowError)?;
let trailer_start = bytes.len() - 10;
footer_buffer.copy_from_slice(&bytes[trailer_start..]);
let footer_len =
read_footer_length(footer_buffer).map_err(ArrowLayoutVerificationError::ArrowError)?;

// read footer
let mut footer_data = vec![0; footer_len];
reader
.seek(SeekFrom::End(-10 - footer_len as i64))
.map_err(ArrowLayoutVerificationError::IOError)?;
reader
.read_exact(&mut footer_data)
.map_err(ArrowLayoutVerificationError::IOError)?;
let footer_data = &bytes[trailer_start - footer_len..trailer_start];
let footer =
root_as_footer(&footer_data).map_err(ArrowLayoutVerificationError::InvalidFlatbuffer)?;
root_as_footer(footer_data).map_err(ArrowLayoutVerificationError::InvalidFlatbuffer)?;

Ok(footer)
}

fn read_record_batch_range(
footer: Footer<'_>,
) -> Result<(&arrow::ipc::Block, usize, usize, usize), ArrowLayoutVerificationError> {
// Read the record batch
let record_batch_definitions = match footer.recordBatches() {
Some(record_batch_definitions) => record_batch_definitions,
Expand All @@ -785,25 +772,32 @@ where
}

let record_batch_definition = record_batch_definitions.get(0);
let record_batch_len = record_batch_definition.bodyLength() as usize
+ record_batch_definition.metaDataLength() as usize;
let record_batch_offset = record_batch_definition.offset() as usize;
let record_batch_body_len = record_batch_definition.bodyLength() as usize;
let record_batch_len =
record_batch_body_len + record_batch_definition.metaDataLength() as usize;

Ok((
record_batch_definition,
record_batch_offset,
record_batch_body_len,
record_batch_len,
))
}

// Read the actual record batch
let mut file_buffer = vec![0; record_batch_len];
match reader.seek(SeekFrom::Start(record_batch_definition.offset() as u64)) {
Ok(_) => {}
Err(e) => {
return Err(ArrowLayoutVerificationError::IOError(e));
}
}
match reader.read_exact(&mut file_buffer) {
Ok(_) => {}
Err(e) => {
return Err(ArrowLayoutVerificationError::IOError(e));
}
}
let buffer = Buffer::from(file_buffer);
/// Verifies that the buffers in the IPC file are 64 byte aligned
/// and stored in Arrow in the way we expect.
/// All non-benchmark test code should use this by loading the block
/// with verification enabled.
fn verify_buffers_layout(bytes: &[u8]) -> Result<(), ArrowLayoutVerificationError> {
// Read the IPC file and verify that the buffers are 64 byte aligned
// by inspecting the offsets, this is required since our
// size calculation assumes that the buffers are 64 byte aligned
// Space for ARROW_MAGIC (6 bytes) and length (4 bytes)
let footer = read_arrow_footer(bytes)?;
let (_, record_batch_offset, record_batch_body_len, record_batch_len) =
read_record_batch_range(footer)?;
let buffer = Buffer::from(&bytes[record_batch_offset..record_batch_offset + record_batch_len]);

// This is borrowed from arrow-ipc parse_message.rs
// https://arrow.apache.org/docs/format/Columnar.html#encapsulated-message-format
Expand Down
Loading