Commit a14f77c

mapleFU and alamb authored
Fix: ViewType gc on huge batch would produce bad output (#8694)
# Which issue does this PR close?

- Closes #8681.

# Rationale for this change

Previously, `gc()` produced a single output buffer. For total data sizes greater than 2 GiB this was buggy, since a view's buffer offset is a 4-byte signed integer.

# What changes are included in this PR?

Add a `GcCopyGroup` type and perform gc group by group, so no output buffer's offsets can overflow.

# Are these changes tested?

Yes

# Are there any user-facing changes?

`gc` may now produce more than one output buffer.

---------

Co-authored-by: Andrew Lamb <[email protected]>
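To make the rationale concrete, here is a minimal standalone sketch of the grouping idea, under the assumption that only values longer than 12 bytes spill into data buffers (shorter ones are inlined in the view). `plan_groups` and its tuple output are illustrative names only, not arrow-rs API; the PR's actual implementation is the `GcCopyGroup` code in the diff below.

```rust
/// Sketch: pack non-inline payload bytes into chunks so that every offset
/// later written into a view stays representable as a 4-byte signed integer.
const MAX_INLINE_VIEW_LEN: u32 = 12; // values up to 12 bytes live inline in the view

/// Returns one (payload_bytes, element_count) entry per planned output buffer.
fn plan_groups(view_lengths: &[u32]) -> Vec<(usize, usize)> {
    let mut groups = Vec::new();
    let (mut bytes, mut count) = (0u64, 0usize);
    for &len in view_lengths {
        if len > MAX_INLINE_VIEW_LEN {
            // Close the current group before any offset could exceed i32::MAX.
            if bytes + len as u64 > i32::MAX as u64 {
                groups.push((bytes as usize, count));
                bytes = 0;
                count = 0;
            }
            bytes += len as u64;
            count += 1;
        }
    }
    if count != 0 {
        groups.push((bytes as usize, count));
    }
    groups
}

fn main() {
    // Three 1.5 GiB values cannot share one buffer: the second value would
    // start at 1.5 GiB and end past i32::MAX, so each gets its own group.
    let len_1_5_gib: u32 = 3 << 29; // 1.5 * 2^30 bytes
    assert_eq!(plan_groups(&[len_1_5_gib; 3]).len(), 3);
}
```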
1 parent 4d6b93a commit a14f77c

File tree

1 file changed (+134, -12 lines)

arrow-array/src/array/byte_view_array.rs

Lines changed: 134 additions & 12 deletions
@@ -512,18 +512,85 @@ impl<T: ByteViewType + ?Sized> GenericByteViewArray<T> {
             };
         }

-        // 3) Allocate exactly capacity for all non-inline data
-        let mut data_buf = Vec::with_capacity(total_large);
+        let (views_buf, data_blocks) = if total_large < i32::MAX as usize {
+            // fast path, the entire data fits in a single buffer
+            // 3) Allocate exactly capacity for all non-inline data
+            let mut data_buf = Vec::with_capacity(total_large);
+
+            // 4) Iterate over views and process each inline/non-inline view
+            let views_buf: Vec<u128> = (0..len)
+                .map(|i| unsafe { self.copy_view_to_buffer(i, 0, &mut data_buf) })
+                .collect();
+            let data_block = Buffer::from_vec(data_buf);
+            let data_blocks = vec![data_block];
+            (views_buf, data_blocks)
+        } else {
+            // slow path, need to split into multiple buffers
+
+            struct GcCopyGroup {
+                total_buffer_bytes: usize,
+                total_len: usize,
+            }
+
+            impl GcCopyGroup {
+                fn new(total_buffer_bytes: u32, total_len: usize) -> Self {
+                    Self {
+                        total_buffer_bytes: total_buffer_bytes as usize,
+                        total_len,
+                    }
+                }
+            }

-        // 4) Iterate over views and process each inline/non-inline view
-        let views_buf: Vec<u128> = (0..len)
-            .map(|i| unsafe { self.copy_view_to_buffer(i, &mut data_buf) })
-            .collect();
+            let mut groups = Vec::new();
+            let mut current_length = 0;
+            let mut current_elements = 0;
+
+            for view in self.views() {
+                let len = *view as u32;
+                if len > MAX_INLINE_VIEW_LEN {
+                    if current_length + len > i32::MAX as u32 {
+                        // Start a new group
+                        groups.push(GcCopyGroup::new(current_length, current_elements));
+                        current_length = 0;
+                        current_elements = 0;
+                    }
+                    current_length += len;
+                    current_elements += 1;
+                }
+            }
+            if current_elements != 0 {
+                groups.push(GcCopyGroup::new(current_length, current_elements));
+            }
+            debug_assert!(groups.len() <= i32::MAX as usize);
+
+            // 3) Copy the buffers group by group
+            let mut views_buf = Vec::with_capacity(len);
+            let mut data_blocks = Vec::with_capacity(groups.len());
+
+            let mut current_view_idx = 0;
+
+            for (group_idx, gc_copy_group) in groups.iter().enumerate() {
+                let mut data_buf = Vec::with_capacity(gc_copy_group.total_buffer_bytes);
+
+                // Directly push views to avoid intermediate Vec allocation
+                let new_views = (current_view_idx..current_view_idx + gc_copy_group.total_len).map(
+                    |view_idx| {
+                        // safety: the view index came from iterating over valid range
+                        unsafe {
+                            self.copy_view_to_buffer(view_idx, group_idx as i32, &mut data_buf)
+                        }
+                    },
+                );
+                views_buf.extend(new_views);
+
+                data_blocks.push(Buffer::from_vec(data_buf));
+                current_view_idx += gc_copy_group.total_len;
+            }
+            (views_buf, data_blocks)
+        };

-        // 5) Wrap up buffers
-        let data_block = Buffer::from_vec(data_buf);
+        // 5) Wrap up views buffer
         let views_scalar = ScalarBuffer::from(views_buf);
-        let data_blocks = vec![data_block];

         // SAFETY: views_scalar, data_blocks, and nulls are correctly aligned and sized
         unsafe { GenericByteViewArray::new_unchecked(views_scalar, data_blocks, nulls) }

@@ -538,10 +605,15 @@ impl<T: ByteViewType + ?Sized> GenericByteViewArray<T> {
     /// inside one of `self.buffers`.
     /// - `data_buf` must be ready to have additional bytes appended.
     /// - After this call, the returned view will have its
-    ///   `buffer_index` reset to `0` and its `offset` updated so that it points
+    ///   `buffer_index` reset to `buffer_idx` and its `offset` updated so that it points
     ///   into the bytes just appended at the end of `data_buf`.
     #[inline(always)]
-    unsafe fn copy_view_to_buffer(&self, i: usize, data_buf: &mut Vec<u8>) -> u128 {
+    unsafe fn copy_view_to_buffer(
+        &self,
+        i: usize,
+        buffer_idx: i32,
+        data_buf: &mut Vec<u8>,
+    ) -> u128 {
         // SAFETY: `i < self.len()` ensures this is in-bounds.
         let raw_view = unsafe { *self.views().get_unchecked(i) };
         let mut bv = ByteView::from(raw_view);

@@ -561,7 +633,7 @@ impl<T: ByteViewType + ?Sized> GenericByteViewArray<T> {
         let new_offset = data_buf.len() as u32;
         data_buf.extend_from_slice(slice);

-        bv.buffer_index = 0;
+        bv.buffer_index = buffer_idx as u32;
         bv.offset = new_offset;
         bv.into()
     }

@@ -1404,6 +1476,56 @@ mod tests {
         }
     }

+    #[test]
+    #[cfg_attr(miri, ignore)] // Takes too long
+    fn test_gc_huge_array() {
+        // Construct multiple 128 MiB BinaryView entries so total > 4 GiB
+        let block_len: usize = 128 * 1024 * 1024; // 128 MiB per view
+        let num_views: usize = 36;
+
+        // Create a single 128 MiB data block with a simple byte pattern
+        let buffer = Buffer::from_vec(vec![0xAB; block_len]);
+        let buffer2 = Buffer::from_vec(vec![0xFF; block_len]);
+
+        // Append this block and then add many views pointing to it
+        let mut builder = BinaryViewBuilder::new();
+        let block_id = builder.append_block(buffer);
+        for _ in 0..num_views / 2 {
+            builder
+                .try_append_view(block_id, 0, block_len as u32)
+                .expect("append view into 128MiB block");
+        }
+        let block_id2 = builder.append_block(buffer2);
+        for _ in 0..num_views / 2 {
+            builder
+                .try_append_view(block_id2, 0, block_len as u32)
+                .expect("append view into 128MiB block");
+        }
+
+        let array = builder.finish();
+        let total = array.total_buffer_bytes_used();
+        assert!(
+            total > u32::MAX as usize,
+            "Expected total non-inline bytes to exceed 4 GiB, got {}",
+            total
+        );
+
+        // Run gc and verify correctness
+        let gced = array.gc();
+        assert_eq!(gced.len(), num_views, "Length mismatch after gc");
+        assert_eq!(gced.null_count(), 0, "Null count mismatch after gc");
+        assert_ne!(
+            gced.data_buffers().len(),
+            1,
+            "gc with huge buffer should not consolidate data into a single buffer"
+        );
+
+        // Element-wise equality check across the entire array
+        array.iter().zip(gced.iter()).for_each(|(orig, got)| {
+            assert_eq!(orig, got, "Value mismatch after gc on huge array");
+        });
+    }
+
     #[test]
     fn test_eq() {
         let test_data = [
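
For context on why offsets cap near 2 GiB (an aside, not part of the commit): each non-inline view packs four 32-bit fields into a `u128`. The sketch below mirrors the field layout of `arrow_data::ByteView` but is illustrative only; per the Arrow format spec the offset is a signed 32-bit integer, which is the limit the grouping above respects.

```rust
/// Illustrative layout of a non-inline view in BinaryView/Utf8View arrays
/// (field names follow arrow_data::ByteView; not the actual definition).
#[repr(C)]
struct NonInlineView {
    length: u32,       // total byte length of the value
    prefix: u32,       // first four bytes of the value, for fast comparisons
    buffer_index: u32, // which data buffer holds the value's bytes
    offset: u32,       // byte offset into that buffer; the Arrow spec bounds
                       // this by i32::MAX, hence output buffers stay < 2 GiB
}
```

Because `offset` cannot exceed `i32::MAX`, every buffer that `gc()` emits must stay under 2 GiB, which is exactly what splitting the copy into `GcCopyGroup`s guarantees.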
