Commit c20ed29

PingLiuPing authored and meta-codesync[bot] committed
feat: Add basic support for writing to an Iceberg table (facebookincubator#14723)
Summary: As per the review request on facebookincubator#10996, we should split facebookincubator#10996 into multiple smaller PRs. This is the first of them. It adds support for basic insertion without partition transforms or data file statistics. The implementation supports both primitive data types and nested types, including struct, map, and array. A series of follow-up PRs will extend this work to make Iceberg write fully compatible with the Iceberg specification:

1. Iceberg partition spec (new files).
2. Add a new option for Timestamp::tmToStringView to meet the Iceberg spec (completely standalone code).
3. Customise Iceberg writer options.
4. Iceberg file name generator.
5. Iceberg partition ID generator (new files).
6. Identity partition transform.

Pull Request resolved: facebookincubator#14723
Reviewed By: xiaoxmeng
Differential Revision: D83667667
Pulled By: kgpai
fbshipit-source-id: a3df1aeac432d6dde6610b9a7057e170a706de9e
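For orientation, the following is a minimal sketch of how a caller might drive the new write path end to end. It is not code from this commit: makeColumnHandles() and makeLocationHandle() are hypothetical helpers standing in for the usual Hive column/location handle setup, and hiveConnector, connectorQueryCtx, rowType, and inputVectors are assumed to already exist in the caller's scope.

    // Sketch only; the helper functions below are hypothetical.
    using namespace facebook::velox;
    using namespace facebook::velox::connector;

    // Describe the insert target with the new Iceberg handle.
    auto handle =
        std::make_shared<const hive::iceberg::IcebergInsertTableHandle>(
            makeColumnHandles(rowType),               // one HiveColumnHandlePtr per column
            makeLocationHandle("/tmp/iceberg_table"), // target directory
            dwio::common::FileFormat::PARQUET);

    // HiveConnector::createDataSink() now checks for an IcebergInsertTableHandle
    // first and returns an iceberg::IcebergDataSink when it sees one.
    auto sink = hiveConnector->createDataSink(
        rowType, handle, connectorQueryCtx, CommitStrategy::kNoCommit);

    for (const auto& vector : inputVectors) {
      sink->appendData(vector); // write rows through the underlying file writers
    }
    sink->finish();                   // flush writers
    auto commitTasks = sink->close(); // one JSON commit message per written file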
1 parent 561d175 commit c20ed29

File tree

11 files changed: +584 −2 lines


velox/connectors/hive/HiveConnector.cpp

Lines changed: 11 additions & 0 deletions

@@ -20,6 +20,7 @@
 #include "velox/connectors/hive/HiveDataSink.h"
 #include "velox/connectors/hive/HiveDataSource.h"
 #include "velox/connectors/hive/HivePartitionFunction.h"
+#include "velox/connectors/hive/iceberg/IcebergDataSink.h"

 #include <boost/lexical_cast.hpp>
 #include <memory>

@@ -73,6 +74,16 @@ std::unique_ptr<DataSink> HiveConnector::createDataSink(
     ConnectorInsertTableHandlePtr connectorInsertTableHandle,
     ConnectorQueryCtx* connectorQueryCtx,
     CommitStrategy commitStrategy) {
+  if (auto icebergInsertHandle =
+          std::dynamic_pointer_cast<const iceberg::IcebergInsertTableHandle>(
+              connectorInsertTableHandle)) {
+    return std::make_unique<iceberg::IcebergDataSink>(
+        inputType,
+        icebergInsertHandle,
+        connectorQueryCtx,
+        commitStrategy,
+        hiveConfig_);
+  }
   auto hiveInsertHandle =
       std::dynamic_pointer_cast<const HiveInsertTableHandle>(
           connectorInsertTableHandle);

velox/connectors/hive/HiveDataSink.cpp

Lines changed: 3 additions & 0 deletions

@@ -682,7 +682,10 @@ bool HiveDataSink::finish() {
 std::vector<std::string> HiveDataSink::close() {
   setState(State::kClosed);
   closeInternal();
+  return commitMessage();
+}

+std::vector<std::string> HiveDataSink::commitMessage() const {
   std::vector<std::string> partitionUpdates;
   partitionUpdates.reserve(writerInfo_.size());
   for (int i = 0; i < writerInfo_.size(); ++i) {

velox/connectors/hive/HiveDataSink.h

Lines changed: 12 additions & 2 deletions

@@ -337,9 +337,11 @@ class HiveInsertTableHandle : public ConnectorInsertTableHandle {

   std::string toString() const override;

- private:
+ protected:
   const std::vector<std::shared_ptr<const HiveColumnHandle>> inputColumns_;
   const std::shared_ptr<const LocationHandle> locationHandle_;
+
+ private:
   const dwio::common::FileFormat storageFormat_;
   const std::shared_ptr<const HiveBucketProperty> bucketProperty_;
   const std::optional<common::CompressionKind> compressionKind_;

@@ -544,11 +546,19 @@ class HiveDataSink : public DataSink {

   bool canReclaim() const;

- private:
+ protected:
   // Validates the state transition from 'oldState' to 'newState'.
   void checkStateTransition(State oldState, State newState);
   void setState(State newState);

+  // Generates commit messages for all writers containing metadata about
+  // written files. Creates a JSON object for each writer with partition name,
+  // file paths, file names, data sizes, and row counts. This metadata is used
+  // by the coordinator to commit the transaction and update the metastore.
+  //
+  // @return Vector of JSON strings, one per writer.
+  virtual std::vector<std::string> commitMessage() const;
+
   class WriterReclaimer : public exec::MemoryReclaimer {
    public:
     static std::unique_ptr<memory::MemoryReclaimer> create(

velox/connectors/hive/iceberg/CMakeLists.txt

Lines changed: 1 addition & 0 deletions

@@ -14,6 +14,7 @@

 velox_add_library(
   velox_hive_iceberg_splitreader
+  IcebergDataSink.cpp
   IcebergSplitReader.cpp
   IcebergSplit.cpp
   PositionalDeleteFileReader.cpp

velox/connectors/hive/iceberg/IcebergDataSink.cpp

Lines changed: 88 additions & 0 deletions

@@ -0,0 +1,88 @@
+/*
+ * Copyright (c) Facebook, Inc. and its affiliates.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "velox/connectors/hive/iceberg/IcebergDataSink.h"
+#include "velox/common/base/Fs.h"
+
+namespace facebook::velox::connector::hive::iceberg {
+
+IcebergInsertTableHandle::IcebergInsertTableHandle(
+    std::vector<HiveColumnHandlePtr> inputColumns,
+    LocationHandlePtr locationHandle,
+    dwio::common::FileFormat tableStorageFormat,
+    std::optional<common::CompressionKind> compressionKind,
+    const std::unordered_map<std::string, std::string>& serdeParameters)
+    : HiveInsertTableHandle(
+          std::move(inputColumns),
+          std::move(locationHandle),
+          tableStorageFormat,
+          nullptr,
+          compressionKind,
+          serdeParameters,
+          nullptr,
+          false,
+          std::make_shared<const HiveInsertFileNameGenerator>()) {
+  VELOX_USER_CHECK(
+      !inputColumns_.empty(),
+      "Input columns cannot be empty for Iceberg tables.");
+  VELOX_USER_CHECK_NOT_NULL(
+      locationHandle_, "Location handle is required for Iceberg tables.");
+}
+
+IcebergDataSink::IcebergDataSink(
+    RowTypePtr inputType,
+    IcebergInsertTableHandlePtr insertTableHandle,
+    const ConnectorQueryCtx* connectorQueryCtx,
+    CommitStrategy commitStrategy,
+    const std::shared_ptr<const HiveConfig>& hiveConfig)
+    : HiveDataSink(
+          std::move(inputType),
+          insertTableHandle,
+          connectorQueryCtx,
+          commitStrategy,
+          hiveConfig,
+          0,
+          nullptr) {}
+
+std::vector<std::string> IcebergDataSink::commitMessage() const {
+  std::vector<std::string> commitTasks;
+  commitTasks.reserve(writerInfo_.size());
+
+  for (auto i = 0; i < writerInfo_.size(); ++i) {
+    const auto& info = writerInfo_.at(i);
+    VELOX_CHECK_NOT_NULL(info);
+    // The following metadata (JSON format) is consumed by Presto
+    // CommitTaskData. It contains the minimal subset of metadata.
+    // TODO: Complete metrics are missing for now, which could lead to a
+    // suboptimal query plan; full Iceberg metrics will follow in a later PR.
+    // clang-format off
+    folly::dynamic commitData = folly::dynamic::object(
+        "path", (fs::path(info->writerParameters.writeDirectory()) /
+            info->writerParameters.writeFileName()).string())
+        ("fileSizeInBytes", ioStats_.at(i)->rawBytesWritten())
+        ("metrics",
+            folly::dynamic::object("recordCount", info->numWrittenRows))
+        ("partitionSpecJson", 0)
+        ("fileFormat", "PARQUET")
+        ("content", "DATA");
+    // clang-format on
+    auto commitDataJson = folly::toJson(commitData);
+    commitTasks.push_back(commitDataJson);
+  }
+  return commitTasks;
+}
+
+} // namespace facebook::velox::connector::hive::iceberg
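For illustration, one element of the commitTasks vector returned above would look roughly like the following (pretty-printed here; folly::toJson emits a single unformatted line, key order may differ, and all values are made up):

    {
      "path": "/warehouse/db/tbl/data/20251001_000000_00000_abcde_0.parquet",
      "fileSizeInBytes": 1048576,
      "metrics": { "recordCount": 50000 },
      "partitionSpecJson": 0,
      "fileFormat": "PARQUET",
      "content": "DATA"
    }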

velox/connectors/hive/iceberg/IcebergDataSink.h

Lines changed: 81 additions & 0 deletions

@@ -0,0 +1,81 @@
+/*
+ * Copyright (c) Facebook, Inc. and its affiliates.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include "velox/connectors/hive/HiveDataSink.h"
+
+namespace facebook::velox::connector::hive::iceberg {
+
+/// Represents a request for an Iceberg write.
+class IcebergInsertTableHandle final : public HiveInsertTableHandle {
+ public:
+  /// @param inputColumns Columns from the table schema to write.
+  /// The input RowVector must have the same number of columns and matching
+  /// types in the same order. Column names in the RowVector may differ from
+  /// those in inputColumns; only position and type must align. All columns
+  /// present in the input data must be included; mismatches can lead to
+  /// write failures.
+  /// @param locationHandle Contains the target location information including:
+  /// - Base directory path where data files will be written.
+  /// - File naming scheme and temporary directory paths.
+  /// @param compressionKind Optional compression to apply to data files.
+  /// @param serdeParameters Additional serialization/deserialization
+  /// parameters for the file format.
+  IcebergInsertTableHandle(
+      std::vector<HiveColumnHandlePtr> inputColumns,
+      LocationHandlePtr locationHandle,
+      dwio::common::FileFormat tableStorageFormat,
+      std::optional<common::CompressionKind> compressionKind = {},
+      const std::unordered_map<std::string, std::string>& serdeParameters = {});
+};
+
+using IcebergInsertTableHandlePtr =
+    std::shared_ptr<const IcebergInsertTableHandle>;
+
+class IcebergDataSink : public HiveDataSink {
+ public:
+  IcebergDataSink(
+      RowTypePtr inputType,
+      IcebergInsertTableHandlePtr insertTableHandle,
+      const ConnectorQueryCtx* connectorQueryCtx,
+      CommitStrategy commitStrategy,
+      const std::shared_ptr<const HiveConfig>& hiveConfig);
+
+  /// Generates Iceberg-specific commit messages for all writers containing
+  /// metadata about written files. Creates a JSON object for each writer
+  /// in the format expected by Presto and Spark for Iceberg tables.
+  ///
+  /// Each commit message contains:
+  /// - path: full file path where data was written.
+  /// - fileSizeInBytes: raw bytes written to disk.
+  /// - metrics: object with recordCount (number of rows written).
+  /// - partitionSpecJson: partition specification.
+  /// - fileFormat: storage format (e.g., "PARQUET").
+  /// - content: file content type ("DATA" for data files).
+  ///
+  /// See
+  /// https://github.com/prestodb/presto/blob/master/presto-iceberg/src/main/java/com/facebook/presto/iceberg/CommitTaskData.java
+  ///
+  /// Note: Complete Iceberg metrics are not yet implemented, which results in
+  /// incomplete manifest files that may lead to suboptimal query planning.
+  ///
+  /// @return Vector of JSON strings, one per writer, formatted according to
+  /// the Presto and Spark Iceberg commit protocol.
+  std::vector<std::string> commitMessage() const override;
+};
+
+} // namespace facebook::velox::connector::hive::iceberg

velox/connectors/hive/iceberg/tests/CMakeLists.txt

Lines changed: 20 additions & 0 deletions

@@ -56,9 +56,29 @@ if(NOT VELOX_DISABLE_GOOGLETEST)
     GTest::gtest
     GTest::gtest_main
   )
+
+  add_executable(velox_hive_iceberg_insert_test IcebergInsertTest.cpp IcebergTestBase.cpp Main.cpp)
+
+  add_test(velox_hive_iceberg_insert_test velox_hive_iceberg_insert_test)
+
+  target_link_libraries(
+    velox_hive_iceberg_insert_test
+    velox_exec_test_lib
+    velox_hive_connector
+    velox_hive_iceberg_splitreader
+    velox_vector_fuzzer
+    GTest::gtest
+  )
+
   if(VELOX_ENABLE_PARQUET)
     target_link_libraries(velox_hive_iceberg_test velox_dwio_parquet_reader)

     file(COPY ${CMAKE_CURRENT_SOURCE_DIR}/examples DESTINATION ${CMAKE_CURRENT_BINARY_DIR})
+
+    target_link_libraries(
+      velox_hive_iceberg_insert_test
+      velox_dwio_parquet_reader
+      velox_dwio_parquet_writer
+    )
   endif()
 endif()

velox/connectors/hive/iceberg/tests/IcebergInsertTest.cpp

Lines changed: 81 additions & 0 deletions

@@ -0,0 +1,81 @@
+/*
+ * Copyright (c) Facebook, Inc. and its affiliates.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "velox/connectors/hive/iceberg/tests/IcebergTestBase.h"
+#include "velox/exec/tests/utils/PlanBuilder.h"
+
+namespace facebook::velox::connector::hive::iceberg {
+namespace {
+
+class IcebergInsertTest : public test::IcebergTestBase {
+ protected:
+  void test(const RowTypePtr& rowType, double nullRatio = 0.0) {
+    const auto outputDirectory = exec::test::TempDirectoryPath::create();
+    const auto dataPath = fmt::format("{}", outputDirectory->getPath());
+    constexpr int32_t numBatches = 10;
+    constexpr int32_t vectorSize = 5'000;
+    const auto vectors =
+        createTestData(rowType, numBatches, vectorSize, nullRatio);
+    auto dataSink =
+        createIcebergDataSink(rowType, outputDirectory->getPath(), {});
+
+    for (const auto& vector : vectors) {
+      dataSink->appendData(vector);
+    }
+
+    ASSERT_TRUE(dataSink->finish());
+    const auto commitTasks = dataSink->close();
+    createDuckDbTable(vectors);
+    auto splits = createSplitsForDirectory(dataPath);
+    ASSERT_EQ(splits.size(), commitTasks.size());
+    auto plan = exec::test::PlanBuilder().tableScan(rowType).planNode();
+    assertQuery(plan, splits, "SELECT * FROM tmp");
+  }
+};
+
+TEST_F(IcebergInsertTest, basic) {
+  auto rowType =
+      ROW({"c1", "c2", "c3", "c4", "c5", "c6", "c7", "c8", "c9", "c10", "c11"},
+          {BIGINT(),
+           INTEGER(),
+           SMALLINT(),
+           BOOLEAN(),
+           REAL(),
+           DECIMAL(18, 5),
+           VARCHAR(),
+           VARBINARY(),
+           DATE(),
+           TIMESTAMP(),
+           ROW({"id", "name"}, {INTEGER(), VARCHAR()})});
+  test(rowType, 0.2);
+}
+
+TEST_F(IcebergInsertTest, mapAndArray) {
+  auto rowType =
+      ROW({"c1", "c2"}, {MAP(INTEGER(), VARCHAR()), ARRAY(VARCHAR())});
+  test(rowType);
+}
+
+#ifdef VELOX_ENABLE_PARQUET
+TEST_F(IcebergInsertTest, bigDecimal) {
+  auto rowType = ROW({"c1"}, {DECIMAL(38, 5)});
+  fileFormat_ = dwio::common::FileFormat::PARQUET;
+  test(rowType);
+}
+#endif
+
+} // namespace
+} // namespace facebook::velox::connector::hive::iceberg
