@@ -61,7 +61,7 @@ mod store;
 pub use store::*;
 
 use crate::{
-    arrow::arrow_writer::{ArrowColumnChunk, ArrowColumnWriter, ArrowWriterOptions},
+    arrow::arrow_writer::ArrowWriterOptions,
     arrow::ArrowWriter,
     errors::{ParquetError, Result},
     file::{metadata::RowGroupMetaData, properties::WriterProperties},
@@ -288,42 +288,17 @@ impl<W: AsyncFileWriter> AsyncArrowWriter<W> {
 
         Ok(())
     }
-
-    /// Create a new row group writer and return its column writers.
-    pub async fn get_column_writers(&mut self) -> Result<Vec<ArrowColumnWriter>> {
-        let before = self.sync_writer.flushed_row_groups().len();
-        // TODO: should use the new API
-        #[allow(deprecated)]
-        let writers = self.sync_writer.get_column_writers()?;
-        // let (serialized_file_writer, arrow_row_group_writer_factory) =
-        //     self.sync_writer.into_serialized_writer().unwrap();
-        // let writers = row_group_writer_factory.create_column_writers(0).unwrap();
-        // let metadata = serialized_file_writer.close().unwrap();
-        if before != self.sync_writer.flushed_row_groups().len() {
-            self.do_write().await?;
-        }
-        Ok(writers)
-    }
-
-    /// Append the given column chunks to the file as a new row group.
-    pub async fn append_row_group(&mut self, chunks: Vec<ArrowColumnChunk>) -> Result<()> {
-        // TODO: should use the new API
-        #[allow(deprecated)]
-        self.sync_writer.append_row_group(chunks)?;
-        self.do_write().await
-    }
 }
 
 #[cfg(test)]
 mod tests {
+    use crate::arrow::arrow_reader::{ParquetRecordBatchReader, ParquetRecordBatchReaderBuilder};
+    use crate::arrow::arrow_writer::compute_leaves;
     use arrow::datatypes::{DataType, Field, Schema};
     use arrow_array::{ArrayRef, BinaryArray, Int32Array, Int64Array, RecordBatchReader};
     use bytes::Bytes;
     use std::sync::Arc;
 
-    use crate::arrow::arrow_reader::{ParquetRecordBatchReader, ParquetRecordBatchReaderBuilder};
-    use crate::arrow::arrow_writer::compute_leaves;
-
     use super::*;
 
     fn get_test_reader() -> ParquetRecordBatchReader {
@@ -369,7 +344,13 @@ mod tests {
         // Use classic API
         writer.write(&to_write_record).await.unwrap();
 
-        let mut writers = writer.get_column_writers().await.unwrap();
+        // Use low-level API to write an Arrow group
+        let arrow_writer = writer.sync_writer;
+        let (mut serialized_file_writer, row_group_writer_factory) =
+            arrow_writer.into_serialized_writer().unwrap();
+
+        // Get column writers
+        let mut writers = row_group_writer_factory.create_column_writers(0).unwrap();
         let col = Arc::new(Int64Array::from_iter_values([1, 2, 3])) as ArrayRef;
         let to_write_arrow_group = RecordBatch::try_from_iter([("col", col)]).unwrap();
 
@@ -384,10 +365,13 @@ mod tests {
             }
         }
 
-        let columns: Vec<_> = writers.into_iter().map(|w| w.close().unwrap()).collect();
+        let mut columns: Vec<_> = writers.into_iter().map(|w| w.close().unwrap()).collect();
         // Append the arrow group as a new row group. Flush in progress
-        writer.append_row_group(columns).await.unwrap();
-        writer.close().await.unwrap();
+        let mut row_group_writer = serialized_file_writer.next_row_group().unwrap();
+        let chunk = columns.remove(0);
+        chunk.append_to_row_group(&mut row_group_writer).unwrap();
+        row_group_writer.close().unwrap();
+        let _metadata = serialized_file_writer.close().unwrap();
 
         let buffer = Bytes::from(buffer);
         let mut reader = ParquetRecordBatchReaderBuilder::try_new(buffer)
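
For reference, the flow the updated test now exercises, pulled out of the async wrapper: build a sync `ArrowWriter`, split it with `into_serialized_writer()`, encode Arrow data through the per-column writers, and append the closed chunks as a new row group. The sketch below is assembled only from the calls visible in this diff plus `compute_leaves`; the free-standing helper, its `parquet::`-prefixed paths, and the zip of column writers against leaf columns are illustrative assumptions, not code from this change.

```rust
use std::sync::Arc;

use arrow_array::{ArrayRef, Int64Array, RecordBatch};
use parquet::arrow::arrow_writer::compute_leaves;
use parquet::arrow::ArrowWriter;

// Hypothetical helper: write one row group into `buffer` via the low-level API.
fn write_low_level(buffer: &mut Vec<u8>) -> parquet::errors::Result<()> {
    // A single flat Int64 column, as in the test.
    let col = Arc::new(Int64Array::from_iter_values([1, 2, 3])) as ArrayRef;
    let batch = RecordBatch::try_from_iter([("col", col)])?;

    // Build a sync ArrowWriter, then split it into the serialized file writer
    // and the row-group writer factory.
    let arrow_writer = ArrowWriter::try_new(&mut *buffer, batch.schema(), None)?;
    let (mut serialized_file_writer, row_group_writer_factory) =
        arrow_writer.into_serialized_writer()?;

    // One ArrowColumnWriter per leaf column of row group 0.
    let mut writers = row_group_writer_factory.create_column_writers(0)?;

    // Encode each Arrow column into its leaf writer; with a flat schema each
    // column produces exactly one leaf.
    let schema = batch.schema();
    for (writer, (field, column)) in writers
        .iter_mut()
        .zip(schema.fields().iter().zip(batch.columns()))
    {
        for leaf in compute_leaves(field, column)? {
            writer.write(&leaf)?;
        }
    }

    // Close the column writers and append their chunks as a new row group.
    let mut row_group_writer = serialized_file_writer.next_row_group()?;
    for writer in writers {
        writer.close()?.append_to_row_group(&mut row_group_writer)?;
    }
    row_group_writer.close()?;
    let _metadata = serialized_file_writer.close()?;
    Ok(())
}
```

A caller would then wrap the filled buffer in `Bytes` and read it back with `ParquetRecordBatchReaderBuilder`, as the test above does.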