11use std:: cmp:: Ordering ;
22use std:: collections:: HashMap ;
3- use std:: io:: Read ;
3+ use std:: io:: SeekFrom ;
44use std:: ops:: { Bound , RangeBounds } ;
5- use std:: sync:: Arc ;
65
76use crate :: arrow:: types:: { ArrowReadableKey , ArrowReadableValue } ;
87use arrow:: array:: ArrayData ;
98use arrow:: buffer:: Buffer ;
10- use arrow:: ipc:: convert:: fb_to_schema;
11- use arrow:: ipc:: reader:: { read_footer_length, FileDecoder } ;
12- use arrow:: ipc:: { root_as_footer, root_as_message, Footer , MessageHeader , MetadataVersion } ;
9+ use arrow:: ipc:: reader:: read_footer_length;
10+ use arrow:: ipc:: { root_as_footer, root_as_message, MessageHeader , MetadataVersion } ;
1311use arrow:: util:: bit_util;
1412use arrow:: {
1513 array:: { Array , StringArray } ,
@@ -68,8 +66,9 @@ impl<'de> Deserialize<'de> for RecordBatchWrapper {
6866 where
6967 D : serde:: Deserializer < ' de > ,
7068 {
71- let data: & ' de [ u8 ] = serde_bytes:: deserialize ( deserializer) ?;
72- let rb = Block :: load_record_batch ( data, false ) . map_err ( D :: Error :: custom) ?;
69+ let data = Vec :: < u8 > :: deserialize ( deserializer) ?;
70+ let reader = std:: io:: Cursor :: new ( data) ;
71+ let rb = Block :: load_record_batch ( reader, false ) . map_err ( D :: Error :: custom) ?;
7372 Ok ( RecordBatchWrapper ( rb) )
7473 }
7574}
@@ -522,15 +521,20 @@ impl Block {
522521
523522 /// Load a block from bytes in Arrow IPC format with the given id
524523 pub fn from_bytes ( bytes : & [ u8 ] , id : Uuid ) -> Result < Self , BlockLoadError > {
525- Self :: load_from_bytes ( bytes, id, false )
524+ Self :: from_bytes_internal ( bytes, id, false )
526525 }
527526
528527 /// Load a block from bytes in Arrow IPC format with the given id and validate the layout
529528 /// ### Notes
530529 /// - This method should be used in tests to ensure that the layout of the IPC file is as expected
531530 /// - The validation is not performant and should not be used in production code
532531 pub fn from_bytes_with_validation ( bytes : & [ u8 ] , id : Uuid ) -> Result < Self , BlockLoadError > {
533- Self :: load_from_bytes ( bytes, id, true )
532+ Self :: from_bytes_internal ( bytes, id, true )
533+ }
534+
535+ fn from_bytes_internal ( bytes : & [ u8 ] , id : Uuid , validate : bool ) -> Result < Self , BlockLoadError > {
536+ let cursor = std:: io:: Cursor :: new ( bytes) ;
537+ Self :: load_with_reader ( cursor, id, validate)
534538 }
535539
536540 /// Load a block from the given path with the given id and validate the layout
@@ -554,47 +558,41 @@ impl Block {
554558 return Err ( BlockLoadError :: IOError ( e) ) ;
555559 }
556560 } ;
557- let mut reader = std:: io:: BufReader :: new ( file) ;
558- let mut target_buffer = Vec :: with_capacity ( reader. get_ref ( ) . metadata ( ) ?. len ( ) as usize ) ;
559- reader. read_to_end ( & mut target_buffer) ?;
560- Self :: load_from_bytes ( & target_buffer, id, validate)
561+ let reader = std:: io:: BufReader :: new ( file) ;
562+ Self :: load_with_reader ( reader, id, validate)
561563 }
562564
563- fn load_from_bytes ( bytes : & [ u8 ] , id : Uuid , validate : bool ) -> Result < Self , BlockLoadError > {
564- let batch = Self :: load_record_batch ( bytes, validate) ?;
565+ fn load_with_reader < R > ( reader : R , id : Uuid , validate : bool ) -> Result < Self , BlockLoadError >
566+ where
567+ R : std:: io:: Read + std:: io:: Seek ,
568+ {
569+ let batch = Self :: load_record_batch ( reader, validate) ?;
570+ // TODO: how to store / hydrate id?
565571 Ok ( Self :: from_record_batch ( id, batch) )
566572 }
567573
568- fn load_record_batch ( bytes : & [ u8 ] , validate : bool ) -> Result < RecordBatch , BlockLoadError > {
574+ fn load_record_batch < R > ( mut reader : R , validate : bool ) -> Result < RecordBatch , BlockLoadError >
575+ where
576+ R : std:: io:: Read + std:: io:: Seek ,
577+ {
569578 if validate {
570- verify_buffers_layout ( bytes) . map_err ( BlockLoadError :: ArrowLayoutVerificationError ) ?;
579+ verify_buffers_layout ( & mut reader)
580+ . map_err ( BlockLoadError :: ArrowLayoutVerificationError ) ?;
571581 }
572582
573- let footer =
574- read_arrow_footer ( bytes) . map_err ( BlockLoadError :: ArrowLayoutVerificationError ) ?;
575- let schema = footer
576- . schema ( )
577- . ok_or ( BlockLoadError :: ArrowLayoutVerificationError (
578- ArrowLayoutVerificationError :: RecordBatchDecodeError ,
579- ) ) ?;
580- let schema = fb_to_schema ( schema) ;
581- // Requiring alignment should always work for blocks since we write them with alignment
582- // This is just being defensive
583- let decoder =
584- FileDecoder :: new ( Arc :: new ( schema) , footer. version ( ) ) . with_require_alignment ( true ) ;
585- let ( block, record_batch_offset, _, record_batch_len) = read_record_batch_range ( footer) ?;
586-
587- // This incurs a copy of the buffer we should be able to avoid this
588- // but as is foyer hands a reference to the [u8]. So the end to end
589- // path involves up to two copies, kernel to user space copy when reading from disk cache
590- // and then a copy into this buffer. We could avoid the second copy by changing foyer to
591- // hand over ownership of the buffer, but that would be a larger change.
592- // This is something we can optimize later if it becomes a bottleneck.
593- let buffer =
594- Buffer :: from ( & bytes[ record_batch_offset..record_batch_offset + record_batch_len] ) ;
595- decoder
596- . read_record_batch ( block, & buffer) ?
597- . ok_or ( BlockLoadError :: NoRecordBatches )
583+ let mut arrow_reader = arrow:: ipc:: reader:: FileReader :: try_new ( & mut reader, None )
584+ . map_err ( BlockLoadError :: ArrowError ) ?;
585+
586+ let batch = match arrow_reader. next ( ) {
587+ Some ( Ok ( batch) ) => batch,
588+ Some ( Err ( e) ) => {
589+ return Err ( BlockLoadError :: ArrowError ( e) ) ;
590+ }
591+ None => {
592+ return Err ( BlockLoadError :: NoRecordBatches ) ;
593+ }
594+ } ;
595+ Ok ( batch)
598596 }
599597}
600598
@@ -739,25 +737,40 @@ impl ChromaError for ArrowLayoutVerificationError {
739737 }
740738}
741739
742- fn read_arrow_footer ( bytes : & [ u8 ] ) -> Result < Footer < ' _ > , ArrowLayoutVerificationError > {
740+ /// Verifies that the buffers in the IPC file are 64 byte aligned
741+ /// and stored in Arrow in the way we expect.
742+ /// All non-benchmark test code should use this by loading the block
743+ /// with verification enabled.
744+ fn verify_buffers_layout < R > ( mut reader : R ) -> Result < ( ) , ArrowLayoutVerificationError >
745+ where
746+ R : std:: io:: Read + std:: io:: Seek ,
747+ {
748+ // Read the IPC file and verify that the buffers are 64 byte aligned
749+ // by inspecting the offsets, this is required since our
750+ // size calculation assumes that the buffers are 64 byte aligned
743751 // Space for ARROW_MAGIC (6 bytes) and length (4 bytes)
744752 let mut footer_buffer = [ 0 ; 10 ] ;
745- let trailer_start = bytes. len ( ) - 10 ;
746- footer_buffer. copy_from_slice ( & bytes[ trailer_start..] ) ;
747- let footer_len =
748- read_footer_length ( footer_buffer) . map_err ( ArrowLayoutVerificationError :: ArrowError ) ?;
753+ reader
754+ . seek ( SeekFrom :: End ( -10 ) )
755+ . map_err ( ArrowLayoutVerificationError :: IOError ) ?;
756+ reader
757+ . read_exact ( & mut footer_buffer)
758+ . map_err ( ArrowLayoutVerificationError :: IOError ) ?;
759+
760+ let footer_len = read_footer_length ( footer_buffer) ;
761+ let footer_len = footer_len. map_err ( ArrowLayoutVerificationError :: ArrowError ) ?;
749762
750763 // read footer
751- let footer_data = & bytes[ trailer_start - footer_len..trailer_start] ;
764+ let mut footer_data = vec ! [ 0 ; footer_len] ;
765+ reader
766+ . seek ( SeekFrom :: End ( -10 - footer_len as i64 ) )
767+ . map_err ( ArrowLayoutVerificationError :: IOError ) ?;
768+ reader
769+ . read_exact ( & mut footer_data)
770+ . map_err ( ArrowLayoutVerificationError :: IOError ) ?;
752771 let footer =
753- root_as_footer ( footer_data) . map_err ( ArrowLayoutVerificationError :: InvalidFlatbuffer ) ?;
772+ root_as_footer ( & footer_data) . map_err ( ArrowLayoutVerificationError :: InvalidFlatbuffer ) ?;
754773
755- Ok ( footer)
756- }
757-
758- fn read_record_batch_range (
759- footer : Footer < ' _ > ,
760- ) -> Result < ( & arrow:: ipc:: Block , usize , usize , usize ) , ArrowLayoutVerificationError > {
761774 // Read the record batch
762775 let record_batch_definitions = match footer. recordBatches ( ) {
763776 Some ( record_batch_definitions) => record_batch_definitions,
@@ -772,32 +785,25 @@ fn read_record_batch_range(
772785 }
773786
774787 let record_batch_definition = record_batch_definitions. get ( 0 ) ;
775- let record_batch_offset = record_batch_definition. offset ( ) as usize ;
788+ let record_batch_len = record_batch_definition. bodyLength ( ) as usize
789+ + record_batch_definition. metaDataLength ( ) as usize ;
776790 let record_batch_body_len = record_batch_definition. bodyLength ( ) as usize ;
777- let record_batch_len =
778- record_batch_body_len + record_batch_definition. metaDataLength ( ) as usize ;
779-
780- Ok ( (
781- record_batch_definition,
782- record_batch_offset,
783- record_batch_body_len,
784- record_batch_len,
785- ) )
786- }
787791
788- /// Verifies that the buffers in the IPC file are 64 byte aligned
789- /// and stored in Arrow in the way we expect.
790- /// All non-benchmark test code should use this by loading the block
791- /// with verification enabled.
792- fn verify_buffers_layout ( bytes : & [ u8 ] ) -> Result < ( ) , ArrowLayoutVerificationError > {
793- // Read the IPC file and verify that the buffers are 64 byte aligned
794- // by inspecting the offsets, this is required since our
795- // size calculation assumes that the buffers are 64 byte aligned
796- // Space for ARROW_MAGIC (6 bytes) and length (4 bytes)
797- let footer = read_arrow_footer ( bytes) ?;
798- let ( _, record_batch_offset, record_batch_body_len, record_batch_len) =
799- read_record_batch_range ( footer) ?;
800- let buffer = Buffer :: from ( & bytes[ record_batch_offset..record_batch_offset + record_batch_len] ) ;
792+ // Read the actual record batch
793+ let mut file_buffer = vec ! [ 0 ; record_batch_len] ;
794+ match reader. seek ( SeekFrom :: Start ( record_batch_definition. offset ( ) as u64 ) ) {
795+ Ok ( _) => { }
796+ Err ( e) => {
797+ return Err ( ArrowLayoutVerificationError :: IOError ( e) ) ;
798+ }
799+ }
800+ match reader. read_exact ( & mut file_buffer) {
801+ Ok ( _) => { }
802+ Err ( e) => {
803+ return Err ( ArrowLayoutVerificationError :: IOError ( e) ) ;
804+ }
805+ }
806+ let buffer = Buffer :: from ( file_buffer) ;
801807
802808 // This is borrowed from arrow-ipc parse_message.rs
803809 // https://arrow.apache.org/docs/format/Columnar.html#encapsulated-message-format
0 commit comments