@@ -30,6 +30,9 @@ namespace facebook::velox::connector::hive::iceberg {
3030
3131namespace {
3232
33+ constexpr std::string_view kNotClusteredRowsErrorMsg =
34+ " Incoming records violate the writer assumption that records are clustered by spec and \n by partition within each spec. Either cluster the incoming records or switch to fanout writers.\n Encountered records that belong to already closed files:\n " ;
35+
3336#define WRITER_NON_RECLAIMABLE_SECTION_GUARD (index ) \
3437 memory::NonReclaimableSectionGuard nonReclaimableGuard ( \
3538 writerInfo_[(index)]->nonReclaimableSectionHolder.get())
@@ -213,7 +216,10 @@ IcebergDataSink::IcebergDataSink(
213216 insertTableHandle->columnTransforms(),
214217 hiveConfig->isPartitionPathAsLowerCase(
215218 connectorQueryCtx->sessionProperties ()))
216- : nullptr) {
219+ : nullptr),
220+ fanoutEnabled_(
221+ hiveConfig_->fanoutEnabled (connectorQueryCtx_->sessionProperties ())),
222+ currentWriterId_(0 ) {
217223 if (isPartitioned ()) {
218224 partitionData_.resize (maxOpenWriters_);
219225 }
@@ -332,8 +338,6 @@ std::vector<std::string> IcebergDataSink::commitMessage() const {
332338}
333339
334340void IcebergDataSink::splitInputRowsAndEnsureWriters (RowVectorPtr input) {
335- VELOX_CHECK (isPartitioned ());
336-
337341 std::fill (partitionSizes_.begin (), partitionSizes_.end (), 0 );
338342
339343 const auto numRows = partitionIds_.size ();
@@ -346,26 +350,7 @@ void IcebergDataSink::splitInputRowsAndEnsureWriters(RowVectorPtr input) {
346350 if (!partitionData_[index].empty ()) {
347351 continue ;
348352 }
349-
350- std::vector<folly::dynamic> partitionValues (partitionChannels_.size ());
351- auto icebergPartitionIdGenerator =
352- dynamic_cast <const IcebergPartitionIdGenerator*>(
353- partitionIdGenerator_.get ());
354- VELOX_CHECK_NOT_NULL (icebergPartitionIdGenerator);
355- const RowVectorPtr transformedValues =
356- icebergPartitionIdGenerator->partitionValues ();
357- for (auto i = 0 ; i < partitionChannels_.size (); ++i) {
358- auto block = transformedValues->childAt (i);
359- if (block->isNullAt (index)) {
360- partitionValues[i] = nullptr ;
361- } else {
362- DecodedVector decoded (*block);
363- partitionValues[i] = VELOX_DYNAMIC_SCALAR_TYPE_DISPATCH (
364- extractPartitionValue, block->typeKind (), &decoded, index);
365- }
366- }
367-
368- partitionData_[index] = partitionValues;
353+ buildPartitionData (index);
369354 }
370355
371356 for (auto i = 0 ; i < partitionSizes_.size (); ++i) {
@@ -376,6 +361,11 @@ void IcebergDataSink::splitInputRowsAndEnsureWriters(RowVectorPtr input) {
376361 }
377362}
378363
364+ void IcebergDataSink::computePartition (const RowVectorPtr& input) {
365+ VELOX_CHECK (isPartitioned ());
366+ partitionIdGenerator_->run (input, partitionIds_);
367+ }
368+
379369void IcebergDataSink::appendData (RowVectorPtr input) {
380370 checkRunning ();
381371 if (!isPartitioned ()) {
@@ -384,22 +374,79 @@ void IcebergDataSink::appendData(RowVectorPtr input) {
384374 return ;
385375 }
386376
387- // Compute partition and bucket numbers.
388- computePartitionAndBucketIds (input);
377+ computePartition (input);
389378
390- splitInputRowsAndEnsureWriters (input);
379+ if (fanoutEnabled_) {
380+ splitInputRowsAndEnsureWriters (input);
391381
392- for (auto index = 0 ; index < writers_.size (); ++index) {
393- const vector_size_t partitionSize = partitionSizes_[index];
394- if (partitionSize == 0 ) {
395- continue ;
382+ for (auto index = 0 ; index < writers_.size (); ++index) {
383+ const vector_size_t partitionSize = partitionSizes_[index];
384+ if (partitionSize == 0 ) {
385+ continue ;
386+ }
387+
388+ const RowVectorPtr writerInput = partitionSize == input->size ()
389+ ? input
390+ : exec::wrap (partitionSize, partitionRows_[index], input);
391+ write (index, writerInput);
392+ }
393+ } else { // Clustered mode.
394+ std::fill (partitionSizes_.begin (), partitionSizes_.end (), 0 );
395+ const auto numRows = input->size ();
396+ uint32_t index = 0 ;
397+ for (auto row = 0 ; row < numRows; ++row) {
398+ auto id = getIcebergWriterId (row);
399+ index = ensureWriter (id);
400+ if (currentWriterId_ != index) {
401+ clusteredWrite (input, currentWriterId_);
402+ closeWriter (currentWriterId_);
403+ completedWriterIds_.insert (currentWriterId_);
404+ VELOX_USER_CHECK_EQ (
405+ completedWriterIds_.count (index),
406+ 0 ,
407+ " {}" ,
408+ kNotClusteredRowsErrorMsg );
409+ currentWriterId_ = index;
410+ }
411+ updatePartitionRows (index, numRows, row);
412+ buildPartitionData (index);
396413 }
414+ clusteredWrite (input, index);
415+ }
416+ }
397417
398- const RowVectorPtr writerInput = partitionSize == input->size ()
399- ? input
400- : exec::wrap (partitionSize, partitionRows_[index], input);
401- write (index, writerInput);
418+ void IcebergDataSink::buildPartitionData (int32_t index) {
419+ std::vector<folly::dynamic> partitionValues (partitionChannels_.size ());
420+ auto icebergPartitionIdGenerator =
421+ dynamic_cast <const IcebergPartitionIdGenerator*>(
422+ partitionIdGenerator_.get ());
423+ VELOX_CHECK_NOT_NULL (icebergPartitionIdGenerator);
424+ const RowVectorPtr transformedValues =
425+ icebergPartitionIdGenerator->partitionValues ();
426+ for (auto i = 0 ; i < partitionChannels_.size (); ++i) {
427+ auto block = transformedValues->childAt (i);
428+ if (block->isNullAt (index)) {
429+ partitionValues[i] = nullptr ;
430+ } else {
431+ DecodedVector decoded (*block);
432+ partitionValues[i] = VELOX_DYNAMIC_SCALAR_TYPE_DISPATCH (
433+ extractPartitionValue, block->typeKind (), &decoded, index);
434+ }
402435 }
436+ partitionData_[index] = partitionValues;
437+ }
438+
439+ void IcebergDataSink::clusteredWrite (RowVectorPtr input, int32_t writerIdx) {
440+ if (partitionSizes_[writerIdx] != 0 ) {
441+ VELOX_CHECK_NOT_NULL (partitionRows_[writerIdx]);
442+ partitionRows_[writerIdx]->setSize (
443+ partitionSizes_[writerIdx] * sizeof (vector_size_t ));
444+ }
445+ const vector_size_t partitionSize = partitionSizes_[writerIdx];
446+ const RowVectorPtr writerInput = partitionSize == input->size ()
447+ ? input
448+ : exec::wrap (partitionSize, partitionRows_[writerIdx], input);
449+ write (writerIdx, writerInput);
403450}
404451
405452HiveWriterId IcebergDataSink::getIcebergWriterId (size_t row) const {
@@ -470,9 +517,11 @@ void IcebergDataSink::closeInternal() {
470517
471518 if (state_ == State::kClosed ) {
472519 for (int i = 0 ; i < writers_.size (); ++i) {
473- WRITER_NON_RECLAIMABLE_SECTION_GUARD (i);
474- writers_[i]->close ();
475- dataFileStats_.push_back (writers_[i]->dataFileStats ());
520+ if (writers_[i]) {
521+ WRITER_NON_RECLAIMABLE_SECTION_GUARD (i);
522+ writers_[i]->close ();
523+ dataFileStats_.push_back (writers_[i]->dataFileStats ());
524+ }
476525 }
477526 } else {
478527 for (int i = 0 ; i < writers_.size (); ++i) {
@@ -482,6 +531,63 @@ void IcebergDataSink::closeInternal() {
482531 }
483532}
484533
534+ void IcebergDataSink::closeWriter (int32_t index) {
535+ common::testutil::TestValue::adjust (
536+ " facebook::velox::connector::hive::iceberg::IcebergDataSink::closeWriter" ,
537+ this );
538+
539+ if (writers_[index]) {
540+ WRITER_NON_RECLAIMABLE_SECTION_GUARD (index);
541+ if (sortWrite ()) {
542+ finishWriter (index);
543+ }
544+ writers_[index]->close ();
545+ dataFileStats_.push_back (writers_[index]->dataFileStats ());
546+ writers_[index] = nullptr ;
547+ }
548+ }
549+
550+ bool IcebergDataSink::finishWriter (int32_t index) {
551+ if (!sortWrite ()) {
552+ return true ;
553+ }
554+
555+ if (writers_[index]) {
556+ const uint64_t startTimeMs = getCurrentTimeMs ();
557+ if (!writers_[index]->finish ()) {
558+ return false ;
559+ }
560+ if (getCurrentTimeMs () - startTimeMs > sortWriterFinishTimeSliceLimitMs_) {
561+ return false ;
562+ }
563+ }
564+ return true ;
565+ }
566+
567+ bool IcebergDataSink::finish () {
568+ // Flush is reentry state.
569+ setState (State::kFinishing );
570+
571+ // As for now, only sorted writer needs flush buffered data. For non-sorted
572+ // writer, data is directly written to the underlying file writer.
573+ if (!sortWrite ()) {
574+ return true ;
575+ }
576+
577+ // TODO: we might refactor to move the data sorting logic into hive data sink.
578+ const uint64_t startTimeMs = getCurrentTimeMs ();
579+ for (auto i = 0 ; i < writers_.size (); ++i) {
580+ WRITER_NON_RECLAIMABLE_SECTION_GUARD (i);
581+ if (writers_[i] && !writers_[i]->finish ()) {
582+ return false ;
583+ }
584+ if (getCurrentTimeMs () - startTimeMs > sortWriterFinishTimeSliceLimitMs_) {
585+ return false ;
586+ }
587+ }
588+ return true ;
589+ }
590+
485591std::unique_ptr<facebook::velox::dwio::common::Writer>
486592IcebergDataSink::maybeCreateBucketSortWriter (
487593 std::unique_ptr<facebook::velox::dwio::common::Writer> writer) {
0 commit comments