11use std:: cmp:: Ordering ;
22use std:: collections:: HashMap ;
3- use std:: io:: SeekFrom ;
3+ use std:: io:: Read ;
44use std:: ops:: { Bound , RangeBounds } ;
5+ use std:: sync:: Arc ;
56
67use crate :: arrow:: types:: { ArrowReadableKey , ArrowReadableValue } ;
78use arrow:: array:: ArrayData ;
89use arrow:: buffer:: Buffer ;
9- use arrow:: ipc:: reader:: read_footer_length;
10- use arrow:: ipc:: { root_as_footer, root_as_message, MessageHeader , MetadataVersion } ;
10+ use arrow:: ipc:: convert:: fb_to_schema;
11+ use arrow:: ipc:: reader:: { read_footer_length, FileDecoder } ;
12+ use arrow:: ipc:: { root_as_footer, root_as_message, Footer , MessageHeader , MetadataVersion } ;
1113use arrow:: util:: bit_util;
1214use arrow:: {
1315 array:: { Array , StringArray } ,
@@ -66,9 +68,8 @@ impl<'de> Deserialize<'de> for RecordBatchWrapper {
6668 where
6769 D : serde:: Deserializer < ' de > ,
6870 {
69- let data = Vec :: < u8 > :: deserialize ( deserializer) ?;
70- let reader = std:: io:: Cursor :: new ( data) ;
71- let rb = Block :: load_record_batch ( reader, false ) . map_err ( D :: Error :: custom) ?;
71+ let data: & ' de [ u8 ] = serde_bytes:: deserialize ( deserializer) ?;
72+ let rb = Block :: load_record_batch ( data, false ) . map_err ( D :: Error :: custom) ?;
7273 Ok ( RecordBatchWrapper ( rb) )
7374 }
7475}
@@ -521,20 +522,15 @@ impl Block {
521522
522523 /// Load a block from bytes in Arrow IPC format with the given id
523524 pub fn from_bytes ( bytes : & [ u8 ] , id : Uuid ) -> Result < Self , BlockLoadError > {
524- Self :: from_bytes_internal ( bytes, id, false )
525+ Self :: load_from_bytes ( bytes, id, false )
525526 }
526527
527528 /// Load a block from bytes in Arrow IPC format with the given id and validate the layout
528529 /// ### Notes
529530 /// - This method should be used in tests to ensure that the layout of the IPC file is as expected
530531 /// - The validation is not performant and should not be used in production code
531532 pub fn from_bytes_with_validation ( bytes : & [ u8 ] , id : Uuid ) -> Result < Self , BlockLoadError > {
532- Self :: from_bytes_internal ( bytes, id, true )
533- }
534-
535- fn from_bytes_internal ( bytes : & [ u8 ] , id : Uuid , validate : bool ) -> Result < Self , BlockLoadError > {
536- let cursor = std:: io:: Cursor :: new ( bytes) ;
537- Self :: load_with_reader ( cursor, id, validate)
533+ Self :: load_from_bytes ( bytes, id, true )
538534 }
539535
540536 /// Load a block from the given path with the given id and validate the layout
@@ -558,41 +554,47 @@ impl Block {
558554 return Err ( BlockLoadError :: IOError ( e) ) ;
559555 }
560556 } ;
561- let reader = std:: io:: BufReader :: new ( file) ;
562- Self :: load_with_reader ( reader, id, validate)
557+ let mut reader = std:: io:: BufReader :: new ( file) ;
558+ let mut target_buffer = Vec :: with_capacity ( reader. get_ref ( ) . metadata ( ) ?. len ( ) as usize ) ;
559+ reader. read_to_end ( & mut target_buffer) ?;
560+ Self :: load_from_bytes ( & target_buffer, id, validate)
563561 }
564562
565- fn load_with_reader < R > ( reader : R , id : Uuid , validate : bool ) -> Result < Self , BlockLoadError >
566- where
567- R : std:: io:: Read + std:: io:: Seek ,
568- {
569- let batch = Self :: load_record_batch ( reader, validate) ?;
570- // TODO: how to store / hydrate id?
563+ fn load_from_bytes ( bytes : & [ u8 ] , id : Uuid , validate : bool ) -> Result < Self , BlockLoadError > {
564+ let batch = Self :: load_record_batch ( bytes, validate) ?;
571565 Ok ( Self :: from_record_batch ( id, batch) )
572566 }
573567
574- fn load_record_batch < R > ( mut reader : R , validate : bool ) -> Result < RecordBatch , BlockLoadError >
575- where
576- R : std:: io:: Read + std:: io:: Seek ,
577- {
568+ fn load_record_batch ( bytes : & [ u8 ] , validate : bool ) -> Result < RecordBatch , BlockLoadError > {
578569 if validate {
579- verify_buffers_layout ( & mut reader)
580- . map_err ( BlockLoadError :: ArrowLayoutVerificationError ) ?;
570+ verify_buffers_layout ( bytes) . map_err ( BlockLoadError :: ArrowLayoutVerificationError ) ?;
581571 }
582572
583- let mut arrow_reader = arrow:: ipc:: reader:: FileReader :: try_new ( & mut reader, None )
584- . map_err ( BlockLoadError :: ArrowError ) ?;
585-
586- let batch = match arrow_reader. next ( ) {
587- Some ( Ok ( batch) ) => batch,
588- Some ( Err ( e) ) => {
589- return Err ( BlockLoadError :: ArrowError ( e) ) ;
590- }
591- None => {
592- return Err ( BlockLoadError :: NoRecordBatches ) ;
593- }
594- } ;
595- Ok ( batch)
573+ let footer =
574+ read_arrow_footer ( bytes) . map_err ( BlockLoadError :: ArrowLayoutVerificationError ) ?;
575+ let schema = footer
576+ . schema ( )
577+ . ok_or ( BlockLoadError :: ArrowLayoutVerificationError (
578+ ArrowLayoutVerificationError :: RecordBatchDecodeError ,
579+ ) ) ?;
580+ let schema = fb_to_schema ( schema) ;
581+ // Requiring alignment should always work for blocks since we write them with alignment
582+ // This is just being defensive
583+ let decoder =
584+ FileDecoder :: new ( Arc :: new ( schema) , footer. version ( ) ) . with_require_alignment ( true ) ;
585+ let ( block, record_batch_offset, _, record_batch_len) = read_record_batch_range ( footer) ?;
586+
587+ // This incurs a copy of the buffer we should be able to avoid this
588+ // but as is foyer hands a reference to the [u8]. So the end to end
589+ // path involves up to two copies, kernel to user space copy when reading from disk cache
590+ // and then a copy into this buffer. We could avoid the second copy by changing foyer to
591+ // hand over ownership of the buffer, but that would be a larger change.
592+ // This is something we can optimize later if it becomes a bottleneck.
593+ let buffer =
594+ Buffer :: from ( & bytes[ record_batch_offset..record_batch_offset + record_batch_len] ) ;
595+ decoder
596+ . read_record_batch ( block, & buffer) ?
597+ . ok_or ( BlockLoadError :: NoRecordBatches )
596598 }
597599}
598600
@@ -737,40 +739,25 @@ impl ChromaError for ArrowLayoutVerificationError {
737739 }
738740}
739741
740- /// Verifies that the buffers in the IPC file are 64 byte aligned
741- /// and stored in Arrow in the way we expect.
742- /// All non-benchmark test code should use this by loading the block
743- /// with verification enabled.
744- fn verify_buffers_layout < R > ( mut reader : R ) -> Result < ( ) , ArrowLayoutVerificationError >
745- where
746- R : std:: io:: Read + std:: io:: Seek ,
747- {
748- // Read the IPC file and verify that the buffers are 64 byte aligned
749- // by inspecting the offsets, this is required since our
750- // size calculation assumes that the buffers are 64 byte aligned
742+ fn read_arrow_footer ( bytes : & [ u8 ] ) -> Result < Footer < ' _ > , ArrowLayoutVerificationError > {
751743 // Space for ARROW_MAGIC (6 bytes) and length (4 bytes)
752744 let mut footer_buffer = [ 0 ; 10 ] ;
753- reader
754- . seek ( SeekFrom :: End ( -10 ) )
755- . map_err ( ArrowLayoutVerificationError :: IOError ) ?;
756- reader
757- . read_exact ( & mut footer_buffer)
758- . map_err ( ArrowLayoutVerificationError :: IOError ) ?;
759-
760- let footer_len = read_footer_length ( footer_buffer) ;
761- let footer_len = footer_len. map_err ( ArrowLayoutVerificationError :: ArrowError ) ?;
745+ let trailer_start = bytes. len ( ) - 10 ;
746+ footer_buffer. copy_from_slice ( & bytes[ trailer_start..] ) ;
747+ let footer_len =
748+ read_footer_length ( footer_buffer) . map_err ( ArrowLayoutVerificationError :: ArrowError ) ?;
762749
763750 // read footer
764- let mut footer_data = vec ! [ 0 ; footer_len] ;
765- reader
766- . seek ( SeekFrom :: End ( -10 - footer_len as i64 ) )
767- . map_err ( ArrowLayoutVerificationError :: IOError ) ?;
768- reader
769- . read_exact ( & mut footer_data)
770- . map_err ( ArrowLayoutVerificationError :: IOError ) ?;
751+ let footer_data = & bytes[ trailer_start - footer_len..trailer_start] ;
771752 let footer =
772- root_as_footer ( & footer_data) . map_err ( ArrowLayoutVerificationError :: InvalidFlatbuffer ) ?;
753+ root_as_footer ( footer_data) . map_err ( ArrowLayoutVerificationError :: InvalidFlatbuffer ) ?;
773754
755+ Ok ( footer)
756+ }
757+
758+ fn read_record_batch_range (
759+ footer : Footer < ' _ > ,
760+ ) -> Result < ( & arrow:: ipc:: Block , usize , usize , usize ) , ArrowLayoutVerificationError > {
774761 // Read the record batch
775762 let record_batch_definitions = match footer. recordBatches ( ) {
776763 Some ( record_batch_definitions) => record_batch_definitions,
@@ -785,25 +772,32 @@ where
785772 }
786773
787774 let record_batch_definition = record_batch_definitions. get ( 0 ) ;
788- let record_batch_len = record_batch_definition. bodyLength ( ) as usize
789- + record_batch_definition. metaDataLength ( ) as usize ;
775+ let record_batch_offset = record_batch_definition. offset ( ) as usize ;
790776 let record_batch_body_len = record_batch_definition. bodyLength ( ) as usize ;
777+ let record_batch_len =
778+ record_batch_body_len + record_batch_definition. metaDataLength ( ) as usize ;
779+
780+ Ok ( (
781+ record_batch_definition,
782+ record_batch_offset,
783+ record_batch_body_len,
784+ record_batch_len,
785+ ) )
786+ }
791787
792- // Read the actual record batch
793- let mut file_buffer = vec ! [ 0 ; record_batch_len] ;
794- match reader. seek ( SeekFrom :: Start ( record_batch_definition. offset ( ) as u64 ) ) {
795- Ok ( _) => { }
796- Err ( e) => {
797- return Err ( ArrowLayoutVerificationError :: IOError ( e) ) ;
798- }
799- }
800- match reader. read_exact ( & mut file_buffer) {
801- Ok ( _) => { }
802- Err ( e) => {
803- return Err ( ArrowLayoutVerificationError :: IOError ( e) ) ;
804- }
805- }
806- let buffer = Buffer :: from ( file_buffer) ;
788+ /// Verifies that the buffers in the IPC file are 64 byte aligned
789+ /// and stored in Arrow in the way we expect.
790+ /// All non-benchmark test code should use this by loading the block
791+ /// with verification enabled.
792+ fn verify_buffers_layout ( bytes : & [ u8 ] ) -> Result < ( ) , ArrowLayoutVerificationError > {
793+ // Read the IPC file and verify that the buffers are 64 byte aligned
794+ // by inspecting the offsets, this is required since our
795+ // size calculation assumes that the buffers are 64 byte aligned
796+ // Space for ARROW_MAGIC (6 bytes) and length (4 bytes)
797+ let footer = read_arrow_footer ( bytes) ?;
798+ let ( _, record_batch_offset, record_batch_body_len, record_batch_len) =
799+ read_record_batch_range ( footer) ?;
800+ let buffer = Buffer :: from ( & bytes[ record_batch_offset..record_batch_offset + record_batch_len] ) ;
807801
808802 // This is borrowed from arrow-ipc parse_message.rs
809803 // https://arrow.apache.org/docs/format/Columnar.html#encapsulated-message-format
0 commit comments