diff --git a/cpp/src/parquet/arrow/arrow_reader_writer_test.cc b/cpp/src/parquet/arrow/arrow_reader_writer_test.cc index e081b428e24..cd69b2f9469 100644 --- a/cpp/src/parquet/arrow/arrow_reader_writer_test.cc +++ b/cpp/src/parquet/arrow/arrow_reader_writer_test.cc @@ -400,16 +400,23 @@ using ParquetDataType = PhysicalType::parquet_enum>; template using ParquetWriter = TypedColumnWriter>; +Result> WriteTableToBuffer( + const std::shared_ptr& table, int64_t row_group_size, + const std::shared_ptr& properties = default_writer_properties(), + const std::shared_ptr& arrow_properties = + default_arrow_writer_properties()) { + auto sink = CreateOutputStream(); + ARROW_RETURN_NOT_OK(WriteTable(*table, ::arrow::default_memory_pool(), sink, + row_group_size, properties, arrow_properties)); + return sink->Finish(); +} + void WriteTableToBuffer(const std::shared_ptr
& table, int64_t row_group_size, const std::shared_ptr& arrow_properties, std::shared_ptr* out) { - auto sink = CreateOutputStream(); - auto write_props = WriterProperties::Builder().write_batch_size(100)->build(); - - ASSERT_OK_NO_THROW(WriteTable(*table, ::arrow::default_memory_pool(), sink, - row_group_size, write_props, arrow_properties)); - ASSERT_OK_AND_ASSIGN(*out, sink->Finish()); + ASSERT_OK_AND_ASSIGN( + *out, WriteTableToBuffer(table, row_group_size, write_props, arrow_properties)); } void DoRoundtrip(const std::shared_ptr
& table, int64_t row_group_size, @@ -3101,27 +3108,33 @@ TEST(ArrowReadWrite, DecimalStats) { using ::arrow::Decimal128; using ::arrow::field; - auto type = ::arrow::decimal128(/*precision=*/8, /*scale=*/0); - - const char* json = R"(["255", "128", null, "0", "1", "-127", "-128", "-129", "-255"])"; - auto array = ::arrow::ArrayFromJSON(type, json); - auto table = ::arrow::Table::Make(::arrow::schema({field("root", type)}), {array}); + // Try various precisions to trigger encoding as different physical types: + // - precision 8 should use INT32 + // - precision 18 should use INT64 + // - precision 35 should use FIXED_LEN_BYTE_ARRAY + for (const int precision : {8, 18, 35}) { + auto type = ::arrow::decimal128(precision, /*scale=*/0); - std::shared_ptr buffer; - ASSERT_NO_FATAL_FAILURE(WriteTableToBuffer(table, /*row_group_size=*/100, - default_arrow_writer_properties(), &buffer)); + const char* json = + R"(["255", "128", null, "0", "1", "-127", "-128", "-129", "-255"])"; + auto array = ::arrow::ArrayFromJSON(type, json); + auto table = ::arrow::Table::Make(::arrow::schema({field("root", type)}), {array}); - ASSERT_OK_AND_ASSIGN(auto reader, OpenFile(std::make_shared(buffer), - ::arrow::default_memory_pool())); + auto props = WriterProperties::Builder().enable_store_decimal_as_integer()->build(); + ASSERT_OK_AND_ASSIGN(auto buffer, + WriteTableToBuffer(table, /*row_group_size=*/100, props)); + ASSERT_OK_AND_ASSIGN(auto reader, OpenFile(std::make_shared(buffer), + ::arrow::default_memory_pool())); - std::shared_ptr min, max; - ReadSingleColumnFileStatistics(std::move(reader), &min, &max); + std::shared_ptr min, max; + ReadSingleColumnFileStatistics(std::move(reader), &min, &max); - std::shared_ptr expected_min, expected_max; - ASSERT_OK_AND_ASSIGN(expected_min, array->GetScalar(array->length() - 1)); - ASSERT_OK_AND_ASSIGN(expected_max, array->GetScalar(0)); - ::arrow::AssertScalarsEqual(*expected_min, *min, /*verbose=*/true); - ::arrow::AssertScalarsEqual(*expected_max, *max, /*verbose=*/true); + std::shared_ptr expected_min, expected_max; + ASSERT_OK_AND_ASSIGN(expected_min, array->GetScalar(array->length() - 1)); + ASSERT_OK_AND_ASSIGN(expected_max, array->GetScalar(0)); + ::arrow::AssertScalarsEqual(*expected_min, *min, /*verbose=*/true); + ::arrow::AssertScalarsEqual(*expected_max, *max, /*verbose=*/true); + } } TEST(ArrowReadWrite, NestedNullableField) { diff --git a/cpp/src/parquet/arrow/reader_internal.cc b/cpp/src/parquet/arrow/reader_internal.cc index b622e93e072..12f36fe39cf 100644 --- a/cpp/src/parquet/arrow/reader_internal.cc +++ b/cpp/src/parquet/arrow/reader_internal.cc @@ -86,6 +86,7 @@ using arrow::Table; using arrow::TimestampArray; using ::arrow::bit_util::FromBigEndian; +using ::arrow::bit_util::ToBigEndian; using ::arrow::internal::checked_cast; using ::arrow::internal::checked_pointer_cast; using ::arrow::internal::SafeLeftShift; @@ -108,6 +109,62 @@ namespace { template using ArrayType = typename ::arrow::TypeTraits::ArrayType; +template +Result> DecimalScalarFromBigEndianBytes( + std::string_view data, std::shared_ptr arrow_type) { + ARROW_ASSIGN_OR_RAISE( + DecimalType decimal, + DecimalType::FromBigEndian(reinterpret_cast(data.data()), + static_cast(data.size()))); + return ::arrow::MakeScalar(std::move(arrow_type), decimal); +} + +// Extract Min and Max scalars from big-endian representation of Decimals. +Status ExtractDecimalMinMaxFromBytes(std::string_view min_bytes, + std::string_view max_bytes, + const LogicalType& logical_type, + std::shared_ptr<::arrow::Scalar>* min, + std::shared_ptr<::arrow::Scalar>* max) { + const DecimalLogicalType& decimal_type = + checked_cast(logical_type); + + Result> maybe_type = + Decimal128Type::Make(decimal_type.precision(), decimal_type.scale()); + std::shared_ptr arrow_type; + if (maybe_type.ok()) { + arrow_type = maybe_type.ValueOrDie(); + ARROW_ASSIGN_OR_RAISE( + *min, DecimalScalarFromBigEndianBytes(min_bytes, arrow_type)); + ARROW_ASSIGN_OR_RAISE(*max, DecimalScalarFromBigEndianBytes( + max_bytes, std::move(arrow_type))); + return Status::OK(); + } + // Fallback to see if Decimal256 can represent the type. + ARROW_ASSIGN_OR_RAISE( + arrow_type, Decimal256Type::Make(decimal_type.precision(), decimal_type.scale())); + ARROW_ASSIGN_OR_RAISE( + *min, DecimalScalarFromBigEndianBytes(min_bytes, arrow_type)); + ARROW_ASSIGN_OR_RAISE(*max, DecimalScalarFromBigEndianBytes( + max_bytes, std::move(arrow_type))); + + return Status::OK(); +} + +template +Status ExtractDecimalMinMaxFromInteger(Int min_value, Int max_value, + const LogicalType& logical_type, + std::shared_ptr<::arrow::Scalar>* min, + std::shared_ptr<::arrow::Scalar>* max) { + static_assert(std::is_integral_v); + const Int min_be = ToBigEndian(min_value); + const Int max_be = ToBigEndian(max_value); + const auto min_bytes = + std::string_view(reinterpret_cast(&min_be), sizeof(min_be)); + const auto max_bytes = + std::string_view(reinterpret_cast(&max_be), sizeof(max_be)); + return ExtractDecimalMinMaxFromBytes(min_bytes, max_bytes, logical_type, min, max); +} + template Status MakeMinMaxScalar(const StatisticsType& statistics, std::shared_ptr<::arrow::Scalar>* min, @@ -165,17 +222,19 @@ static Status FromInt32Statistics(const Int32Statistics& statistics, switch (logical_type.type()) { case LogicalType::Type::INT: return MakeMinMaxIntegralScalar(statistics, *type, min, max); - break; case LogicalType::Type::DATE: case LogicalType::Type::TIME: case LogicalType::Type::NONE: return MakeMinMaxTypedScalar(statistics, type, min, max); - break; + case LogicalType::Type::DECIMAL: + return ExtractDecimalMinMaxFromInteger(statistics.min(), statistics.max(), + logical_type, min, max); default: break; } - return Status::NotImplemented("Cannot extract statistics for type "); + return Status::NotImplemented("Cannot extract statistics for INT32 with logical type ", + logical_type.ToString()); } static Status FromInt64Statistics(const Int64Statistics& statistics, @@ -188,58 +247,19 @@ static Status FromInt64Statistics(const Int64Statistics& statistics, switch (logical_type.type()) { case LogicalType::Type::INT: return MakeMinMaxIntegralScalar(statistics, *type, min, max); - break; case LogicalType::Type::TIME: case LogicalType::Type::TIMESTAMP: case LogicalType::Type::NONE: return MakeMinMaxTypedScalar(statistics, type, min, max); - break; + case LogicalType::Type::DECIMAL: + return ExtractDecimalMinMaxFromInteger(statistics.min(), statistics.max(), + logical_type, min, max); default: break; } - return Status::NotImplemented("Cannot extract statistics for type "); -} - -template -Result> FromBigEndianString( - const std::string& data, std::shared_ptr arrow_type) { - ARROW_ASSIGN_OR_RAISE( - DecimalType decimal, - DecimalType::FromBigEndian(reinterpret_cast(data.data()), - static_cast(data.size()))); - return ::arrow::MakeScalar(std::move(arrow_type), decimal); -} - -// Extracts Min and Max scalar from bytes like types (i.e. types where -// decimal is encoded as little endian. -Status ExtractDecimalMinMaxFromBytesType(const Statistics& statistics, - const LogicalType& logical_type, - std::shared_ptr<::arrow::Scalar>* min, - std::shared_ptr<::arrow::Scalar>* max) { - const DecimalLogicalType& decimal_type = - checked_cast(logical_type); - - Result> maybe_type = - Decimal128Type::Make(decimal_type.precision(), decimal_type.scale()); - std::shared_ptr arrow_type; - if (maybe_type.ok()) { - arrow_type = maybe_type.ValueOrDie(); - ARROW_ASSIGN_OR_RAISE( - *min, FromBigEndianString(statistics.EncodeMin(), arrow_type)); - ARROW_ASSIGN_OR_RAISE(*max, FromBigEndianString(statistics.EncodeMax(), - std::move(arrow_type))); - return Status::OK(); - } - // Fallback to see if Decimal256 can represent the type. - ARROW_ASSIGN_OR_RAISE( - arrow_type, Decimal256Type::Make(decimal_type.precision(), decimal_type.scale())); - ARROW_ASSIGN_OR_RAISE( - *min, FromBigEndianString(statistics.EncodeMin(), arrow_type)); - ARROW_ASSIGN_OR_RAISE(*max, FromBigEndianString(statistics.EncodeMax(), - std::move(arrow_type))); - - return Status::OK(); + return Status::NotImplemented("Cannot extract statistics for INT64 with logical type ", + logical_type.ToString()); } Status ByteArrayStatisticsAsScalars(const Statistics& statistics, @@ -247,7 +267,8 @@ Status ByteArrayStatisticsAsScalars(const Statistics& statistics, std::shared_ptr<::arrow::Scalar>* max) { auto logical_type = statistics.descr()->logical_type(); if (logical_type->type() == LogicalType::Type::DECIMAL) { - return ExtractDecimalMinMaxFromBytesType(statistics, *logical_type, min, max); + return ExtractDecimalMinMaxFromBytes(statistics.EncodeMin(), statistics.EncodeMax(), + *logical_type, min, max); } std::shared_ptr<::arrow::DataType> type; if (statistics.descr()->physical_type() == Type::FIXED_LEN_BYTE_ARRAY) {