Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 36 additions & 23 deletions cpp/src/parquet/arrow/arrow_reader_writer_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -400,16 +400,23 @@ using ParquetDataType = PhysicalType<test_traits<T>::parquet_enum>;
template <typename T>
using ParquetWriter = TypedColumnWriter<ParquetDataType<T>>;

Result<std::shared_ptr<Buffer>> WriteTableToBuffer(
const std::shared_ptr<Table>& table, int64_t row_group_size,
const std::shared_ptr<WriterProperties>& properties = default_writer_properties(),
const std::shared_ptr<ArrowWriterProperties>& arrow_properties =
default_arrow_writer_properties()) {
auto sink = CreateOutputStream();
ARROW_RETURN_NOT_OK(WriteTable(*table, ::arrow::default_memory_pool(), sink,
row_group_size, properties, arrow_properties));
return sink->Finish();
}

void WriteTableToBuffer(const std::shared_ptr<Table>& table, int64_t row_group_size,
const std::shared_ptr<ArrowWriterProperties>& arrow_properties,
std::shared_ptr<Buffer>* out) {
auto sink = CreateOutputStream();

auto write_props = WriterProperties::Builder().write_batch_size(100)->build();

ASSERT_OK_NO_THROW(WriteTable(*table, ::arrow::default_memory_pool(), sink,
row_group_size, write_props, arrow_properties));
ASSERT_OK_AND_ASSIGN(*out, sink->Finish());
ASSERT_OK_AND_ASSIGN(
*out, WriteTableToBuffer(table, row_group_size, write_props, arrow_properties));
}

void DoRoundtrip(const std::shared_ptr<Table>& table, int64_t row_group_size,
Expand Down Expand Up @@ -3101,27 +3108,33 @@ TEST(ArrowReadWrite, DecimalStats) {
using ::arrow::Decimal128;
using ::arrow::field;

auto type = ::arrow::decimal128(/*precision=*/8, /*scale=*/0);

const char* json = R"(["255", "128", null, "0", "1", "-127", "-128", "-129", "-255"])";
auto array = ::arrow::ArrayFromJSON(type, json);
auto table = ::arrow::Table::Make(::arrow::schema({field("root", type)}), {array});
// Try various precisions to trigger encoding as different physical types:
// - precision 8 should use INT32
// - precision 18 should use INT64
// - precision 35 should use FIXED_LEN_BYTE_ARRAY
for (const int precision : {8, 18, 35}) {
auto type = ::arrow::decimal128(precision, /*scale=*/0);

std::shared_ptr<Buffer> buffer;
ASSERT_NO_FATAL_FAILURE(WriteTableToBuffer(table, /*row_group_size=*/100,
default_arrow_writer_properties(), &buffer));
const char* json =
R"(["255", "128", null, "0", "1", "-127", "-128", "-129", "-255"])";
auto array = ::arrow::ArrayFromJSON(type, json);
auto table = ::arrow::Table::Make(::arrow::schema({field("root", type)}), {array});

ASSERT_OK_AND_ASSIGN(auto reader, OpenFile(std::make_shared<BufferReader>(buffer),
::arrow::default_memory_pool()));
auto props = WriterProperties::Builder().enable_store_decimal_as_integer()->build();
ASSERT_OK_AND_ASSIGN(auto buffer,
WriteTableToBuffer(table, /*row_group_size=*/100, props));
ASSERT_OK_AND_ASSIGN(auto reader, OpenFile(std::make_shared<BufferReader>(buffer),
::arrow::default_memory_pool()));

std::shared_ptr<Scalar> min, max;
ReadSingleColumnFileStatistics(std::move(reader), &min, &max);
std::shared_ptr<Scalar> min, max;
ReadSingleColumnFileStatistics(std::move(reader), &min, &max);

std::shared_ptr<Scalar> expected_min, expected_max;
ASSERT_OK_AND_ASSIGN(expected_min, array->GetScalar(array->length() - 1));
ASSERT_OK_AND_ASSIGN(expected_max, array->GetScalar(0));
::arrow::AssertScalarsEqual(*expected_min, *min, /*verbose=*/true);
::arrow::AssertScalarsEqual(*expected_max, *max, /*verbose=*/true);
std::shared_ptr<Scalar> expected_min, expected_max;
ASSERT_OK_AND_ASSIGN(expected_min, array->GetScalar(array->length() - 1));
ASSERT_OK_AND_ASSIGN(expected_max, array->GetScalar(0));
::arrow::AssertScalarsEqual(*expected_min, *min, /*verbose=*/true);
::arrow::AssertScalarsEqual(*expected_max, *max, /*verbose=*/true);
}
}

TEST(ArrowReadWrite, NestedNullableField) {
Expand Down
117 changes: 69 additions & 48 deletions cpp/src/parquet/arrow/reader_internal.cc
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ using arrow::Table;
using arrow::TimestampArray;

using ::arrow::bit_util::FromBigEndian;
using ::arrow::bit_util::ToBigEndian;
using ::arrow::internal::checked_cast;
using ::arrow::internal::checked_pointer_cast;
using ::arrow::internal::SafeLeftShift;
Expand All @@ -108,6 +109,62 @@ namespace {
template <typename ArrowType>
using ArrayType = typename ::arrow::TypeTraits<ArrowType>::ArrayType;

template <typename DecimalType>
Result<std::shared_ptr<::arrow::Scalar>> DecimalScalarFromBigEndianBytes(
std::string_view data, std::shared_ptr<DataType> arrow_type) {
ARROW_ASSIGN_OR_RAISE(
DecimalType decimal,
DecimalType::FromBigEndian(reinterpret_cast<const uint8_t*>(data.data()),
static_cast<int32_t>(data.size())));
return ::arrow::MakeScalar(std::move(arrow_type), decimal);
}

// Extract Min and Max scalars from big-endian representation of Decimals.
Status ExtractDecimalMinMaxFromBytes(std::string_view min_bytes,
std::string_view max_bytes,
const LogicalType& logical_type,
std::shared_ptr<::arrow::Scalar>* min,
std::shared_ptr<::arrow::Scalar>* max) {
const DecimalLogicalType& decimal_type =
checked_cast<const DecimalLogicalType&>(logical_type);

Result<std::shared_ptr<DataType>> maybe_type =
Decimal128Type::Make(decimal_type.precision(), decimal_type.scale());
std::shared_ptr<DataType> arrow_type;
if (maybe_type.ok()) {
arrow_type = maybe_type.ValueOrDie();
ARROW_ASSIGN_OR_RAISE(
*min, DecimalScalarFromBigEndianBytes<Decimal128>(min_bytes, arrow_type));
ARROW_ASSIGN_OR_RAISE(*max, DecimalScalarFromBigEndianBytes<Decimal128>(
max_bytes, std::move(arrow_type)));
return Status::OK();
}
// Fallback to see if Decimal256 can represent the type.
ARROW_ASSIGN_OR_RAISE(
arrow_type, Decimal256Type::Make(decimal_type.precision(), decimal_type.scale()));
ARROW_ASSIGN_OR_RAISE(
*min, DecimalScalarFromBigEndianBytes<Decimal256>(min_bytes, arrow_type));
ARROW_ASSIGN_OR_RAISE(*max, DecimalScalarFromBigEndianBytes<Decimal256>(
max_bytes, std::move(arrow_type)));

return Status::OK();
}

template <typename Int>
Status ExtractDecimalMinMaxFromInteger(Int min_value, Int max_value,
const LogicalType& logical_type,
std::shared_ptr<::arrow::Scalar>* min,
std::shared_ptr<::arrow::Scalar>* max) {
static_assert(std::is_integral_v<Int>);
const Int min_be = ToBigEndian(min_value);
const Int max_be = ToBigEndian(max_value);
const auto min_bytes =
std::string_view(reinterpret_cast<const char*>(&min_be), sizeof(min_be));
const auto max_bytes =
std::string_view(reinterpret_cast<const char*>(&max_be), sizeof(max_be));
return ExtractDecimalMinMaxFromBytes(min_bytes, max_bytes, logical_type, min, max);
}

template <typename CType, typename StatisticsType>
Status MakeMinMaxScalar(const StatisticsType& statistics,
std::shared_ptr<::arrow::Scalar>* min,
Expand Down Expand Up @@ -165,17 +222,19 @@ static Status FromInt32Statistics(const Int32Statistics& statistics,
switch (logical_type.type()) {
case LogicalType::Type::INT:
return MakeMinMaxIntegralScalar(statistics, *type, min, max);
break;
case LogicalType::Type::DATE:
case LogicalType::Type::TIME:
case LogicalType::Type::NONE:
return MakeMinMaxTypedScalar<int32_t>(statistics, type, min, max);
break;
case LogicalType::Type::DECIMAL:
return ExtractDecimalMinMaxFromInteger(statistics.min(), statistics.max(),
logical_type, min, max);
default:
break;
}

return Status::NotImplemented("Cannot extract statistics for type ");
return Status::NotImplemented("Cannot extract statistics for INT32 with logical type ",
logical_type.ToString());
}

static Status FromInt64Statistics(const Int64Statistics& statistics,
Expand All @@ -188,66 +247,28 @@ static Status FromInt64Statistics(const Int64Statistics& statistics,
switch (logical_type.type()) {
case LogicalType::Type::INT:
return MakeMinMaxIntegralScalar(statistics, *type, min, max);
break;
case LogicalType::Type::TIME:
case LogicalType::Type::TIMESTAMP:
case LogicalType::Type::NONE:
return MakeMinMaxTypedScalar<int64_t>(statistics, type, min, max);
break;
case LogicalType::Type::DECIMAL:
return ExtractDecimalMinMaxFromInteger(statistics.min(), statistics.max(),
logical_type, min, max);
default:
break;
}

return Status::NotImplemented("Cannot extract statistics for type ");
}

template <typename DecimalType>
Result<std::shared_ptr<::arrow::Scalar>> FromBigEndianString(
const std::string& data, std::shared_ptr<DataType> arrow_type) {
ARROW_ASSIGN_OR_RAISE(
DecimalType decimal,
DecimalType::FromBigEndian(reinterpret_cast<const uint8_t*>(data.data()),
static_cast<int32_t>(data.size())));
return ::arrow::MakeScalar(std::move(arrow_type), decimal);
}

// Extracts Min and Max scalar from bytes like types (i.e. types where
// decimal is encoded as little endian.
Status ExtractDecimalMinMaxFromBytesType(const Statistics& statistics,
const LogicalType& logical_type,
std::shared_ptr<::arrow::Scalar>* min,
std::shared_ptr<::arrow::Scalar>* max) {
const DecimalLogicalType& decimal_type =
checked_cast<const DecimalLogicalType&>(logical_type);

Result<std::shared_ptr<DataType>> maybe_type =
Decimal128Type::Make(decimal_type.precision(), decimal_type.scale());
std::shared_ptr<DataType> arrow_type;
if (maybe_type.ok()) {
arrow_type = maybe_type.ValueOrDie();
ARROW_ASSIGN_OR_RAISE(
*min, FromBigEndianString<Decimal128>(statistics.EncodeMin(), arrow_type));
ARROW_ASSIGN_OR_RAISE(*max, FromBigEndianString<Decimal128>(statistics.EncodeMax(),
std::move(arrow_type)));
return Status::OK();
}
// Fallback to see if Decimal256 can represent the type.
ARROW_ASSIGN_OR_RAISE(
arrow_type, Decimal256Type::Make(decimal_type.precision(), decimal_type.scale()));
ARROW_ASSIGN_OR_RAISE(
*min, FromBigEndianString<Decimal256>(statistics.EncodeMin(), arrow_type));
ARROW_ASSIGN_OR_RAISE(*max, FromBigEndianString<Decimal256>(statistics.EncodeMax(),
std::move(arrow_type)));

return Status::OK();
return Status::NotImplemented("Cannot extract statistics for INT64 with logical type ",
logical_type.ToString());
}

Status ByteArrayStatisticsAsScalars(const Statistics& statistics,
std::shared_ptr<::arrow::Scalar>* min,
std::shared_ptr<::arrow::Scalar>* max) {
auto logical_type = statistics.descr()->logical_type();
if (logical_type->type() == LogicalType::Type::DECIMAL) {
return ExtractDecimalMinMaxFromBytesType(statistics, *logical_type, min, max);
return ExtractDecimalMinMaxFromBytes(statistics.EncodeMin(), statistics.EncodeMax(),
*logical_type, min, max);
}
std::shared_ptr<::arrow::DataType> type;
if (statistics.descr()->physical_type() == Type::FIXED_LEN_BYTE_ARRAY) {
Expand Down
Loading