Skip to content

Commit da91626

Browse files
authored
feat: implement transaction api (#418)
- Introduce `Transaction` class to manage multi-operation table updates. - Add `Table::NewTransaction()` and `Table::NewUpdateProperties()` to initiate updates. - Move `PendingUpdate` and `UpdateProperties` to `src/iceberg/update/` and refactor them to use the transaction mechanism. - Add `StagedTable` to represent tables with uncommitted changes. - Update `InMemoryCatalog` and `RestCatalog` to align with the new update flow. - Refactor `Table` to support metadata refresh and location management within transactions. - Add comprehensive tests for transactions and property updates.
1 parent 39133d0 commit da91626

32 files changed

+1123
-540
lines changed

example/demo_example.cc

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,13 @@ int main(int argc, char** argv) {
5858
}
5959

6060
auto table = std::move(load_result.value());
61-
auto scan_result = table->NewScan()->Build();
61+
auto scan_builder = table->NewScan();
62+
if (!scan_builder.has_value()) {
63+
std::cerr << "Failed to create scan builder: " << scan_builder.error().message
64+
<< std::endl;
65+
return 1;
66+
}
67+
auto scan_result = scan_builder.value()->Build();
6268
if (!scan_result.has_value()) {
6369
std::cerr << "Failed to build scan: " << scan_result.error().message << std::endl;
6470
return 1;

src/iceberg/CMakeLists.txt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,9 +72,11 @@ set(ICEBERG_SOURCES
7272
table_requirements.cc
7373
table_scan.cc
7474
table_update.cc
75+
transaction.cc
7576
transform.cc
7677
transform_function.cc
7778
type.cc
79+
update/pending_update.cc
7880
update/update_properties.cc
7981
util/bucket_util.cc
8082
util/conversions.cc

src/iceberg/catalog.h

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -110,7 +110,7 @@ class ICEBERG_EXPORT Catalog {
110110
/// \param location a location for the table; leave empty if unspecified
111111
/// \param properties a string map of table properties
112112
/// \return a Table instance or ErrorKind::kAlreadyExists if the table already exists
113-
virtual Result<std::unique_ptr<Table>> CreateTable(
113+
virtual Result<std::shared_ptr<Table>> CreateTable(
114114
const TableIdentifier& identifier, const Schema& schema, const PartitionSpec& spec,
115115
const std::string& location,
116116
const std::unordered_map<std::string, std::string>& properties) = 0;
@@ -121,7 +121,7 @@ class ICEBERG_EXPORT Catalog {
121121
/// \param requirements a list of table requirements
122122
/// \param updates a list of table updates
123123
/// \return a Table instance or ErrorKind::kAlreadyExists if the table already exists
124-
virtual Result<std::unique_ptr<Table>> UpdateTable(
124+
virtual Result<std::shared_ptr<Table>> UpdateTable(
125125
const TableIdentifier& identifier,
126126
const std::vector<std::unique_ptr<TableRequirement>>& requirements,
127127
const std::vector<std::unique_ptr<TableUpdate>>& updates) = 0;
@@ -175,7 +175,7 @@ class ICEBERG_EXPORT Catalog {
175175
/// \param identifier a table identifier
176176
/// \return instance of Table implementation referred to by identifier or
177177
/// ErrorKind::kNoSuchTable if the table does not exist
178-
virtual Result<std::unique_ptr<Table>> LoadTable(const TableIdentifier& identifier) = 0;
178+
virtual Result<std::shared_ptr<Table>> LoadTable(const TableIdentifier& identifier) = 0;
179179

180180
/// \brief Register a table with the catalog if it does not exist
181181
///

src/iceberg/catalog/memory/in_memory_catalog.cc

Lines changed: 9 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -399,15 +399,15 @@ Result<std::vector<TableIdentifier>> InMemoryCatalog::ListTables(
399399
return table_idents;
400400
}
401401

402-
Result<std::unique_ptr<Table>> InMemoryCatalog::CreateTable(
402+
Result<std::shared_ptr<Table>> InMemoryCatalog::CreateTable(
403403
const TableIdentifier& identifier, const Schema& schema, const PartitionSpec& spec,
404404
const std::string& location,
405405
const std::unordered_map<std::string, std::string>& properties) {
406406
std::unique_lock lock(mutex_);
407407
return NotImplemented("create table");
408408
}
409409

410-
Result<std::unique_ptr<Table>> InMemoryCatalog::UpdateTable(
410+
Result<std::shared_ptr<Table>> InMemoryCatalog::UpdateTable(
411411
const TableIdentifier& identifier,
412412
const std::vector<std::unique_ptr<TableRequirement>>& requirements,
413413
const std::vector<std::unique_ptr<TableUpdate>>& updates) {
@@ -434,9 +434,8 @@ Result<std::unique_ptr<Table>> InMemoryCatalog::UpdateTable(
434434
root_namespace_->UpdateTableMetadataLocation(identifier, new_metadata_location));
435435
TableMetadataUtil::DeleteRemovedMetadataFiles(*file_io_, base.get(), *updated);
436436

437-
return std::make_unique<Table>(identifier, std::move(updated),
438-
std::move(new_metadata_location), file_io_,
439-
std::static_pointer_cast<Catalog>(shared_from_this()));
437+
return Table::Make(identifier, std::move(updated), std::move(new_metadata_location),
438+
file_io_, shared_from_this());
440439
}
441440

442441
Result<std::shared_ptr<Transaction>> InMemoryCatalog::StageCreateTable(
@@ -464,7 +463,7 @@ Status InMemoryCatalog::RenameTable(const TableIdentifier& from,
464463
return NotImplemented("rename table");
465464
}
466465

467-
Result<std::unique_ptr<Table>> InMemoryCatalog::LoadTable(
466+
Result<std::shared_ptr<Table>> InMemoryCatalog::LoadTable(
468467
const TableIdentifier& identifier) {
469468
if (!file_io_) [[unlikely]] {
470469
return InvalidArgument("file_io is not set for catalog {}", catalog_name_);
@@ -479,9 +478,8 @@ Result<std::unique_ptr<Table>> InMemoryCatalog::LoadTable(
479478

480479
ICEBERG_ASSIGN_OR_RAISE(auto metadata,
481480
TableMetadataUtil::Read(*file_io_, metadata_location));
482-
return std::make_unique<Table>(identifier, std::move(metadata),
483-
std::move(metadata_location), file_io_,
484-
std::static_pointer_cast<Catalog>(shared_from_this()));
481+
return Table::Make(identifier, std::move(metadata), std::move(metadata_location),
482+
file_io_, shared_from_this());
485483
}
486484

487485
Result<std::shared_ptr<Table>> InMemoryCatalog::RegisterTable(
@@ -500,9 +498,8 @@ Result<std::shared_ptr<Table>> InMemoryCatalog::RegisterTable(
500498
if (!root_namespace_->RegisterTable(identifier, metadata_file_location)) {
501499
return UnknownError("The registry failed.");
502500
}
503-
return std::make_unique<Table>(identifier, std::move(metadata), metadata_file_location,
504-
file_io_,
505-
std::static_pointer_cast<Catalog>(shared_from_this()));
501+
return Table::Make(identifier, std::move(metadata), metadata_file_location, file_io_,
502+
shared_from_this());
506503
}
507504

508505
} // namespace iceberg

src/iceberg/catalog/memory/in_memory_catalog.h

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -70,12 +70,12 @@ class ICEBERG_EXPORT InMemoryCatalog
7070

7171
Result<std::vector<TableIdentifier>> ListTables(const Namespace& ns) const override;
7272

73-
Result<std::unique_ptr<Table>> CreateTable(
73+
Result<std::shared_ptr<Table>> CreateTable(
7474
const TableIdentifier& identifier, const Schema& schema, const PartitionSpec& spec,
7575
const std::string& location,
7676
const std::unordered_map<std::string, std::string>& properties) override;
7777

78-
Result<std::unique_ptr<Table>> UpdateTable(
78+
Result<std::shared_ptr<Table>> UpdateTable(
7979
const TableIdentifier& identifier,
8080
const std::vector<std::unique_ptr<TableRequirement>>& requirements,
8181
const std::vector<std::unique_ptr<TableUpdate>>& updates) override;
@@ -91,7 +91,7 @@ class ICEBERG_EXPORT InMemoryCatalog
9191

9292
Status RenameTable(const TableIdentifier& from, const TableIdentifier& to) override;
9393

94-
Result<std::unique_ptr<Table>> LoadTable(const TableIdentifier& identifier) override;
94+
Result<std::shared_ptr<Table>> LoadTable(const TableIdentifier& identifier) override;
9595

9696
Result<std::shared_ptr<Table>> RegisterTable(
9797
const TableIdentifier& identifier,

src/iceberg/catalog/rest/rest_catalog.cc

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -240,15 +240,15 @@ Result<std::vector<TableIdentifier>> RestCatalog::ListTables(
240240
return NotImplemented("Not implemented");
241241
}
242242

243-
Result<std::unique_ptr<Table>> RestCatalog::CreateTable(
243+
Result<std::shared_ptr<Table>> RestCatalog::CreateTable(
244244
[[maybe_unused]] const TableIdentifier& identifier,
245245
[[maybe_unused]] const Schema& schema, [[maybe_unused]] const PartitionSpec& spec,
246246
[[maybe_unused]] const std::string& location,
247247
[[maybe_unused]] const std::unordered_map<std::string, std::string>& properties) {
248248
return NotImplemented("Not implemented");
249249
}
250250

251-
Result<std::unique_ptr<Table>> RestCatalog::UpdateTable(
251+
Result<std::shared_ptr<Table>> RestCatalog::UpdateTable(
252252
[[maybe_unused]] const TableIdentifier& identifier,
253253
[[maybe_unused]] const std::vector<std::unique_ptr<TableRequirement>>& requirements,
254254
[[maybe_unused]] const std::vector<std::unique_ptr<TableUpdate>>& updates) {
@@ -278,7 +278,7 @@ Status RestCatalog::RenameTable([[maybe_unused]] const TableIdentifier& from,
278278
return NotImplemented("Not implemented");
279279
}
280280

281-
Result<std::unique_ptr<Table>> RestCatalog::LoadTable(
281+
Result<std::shared_ptr<Table>> RestCatalog::LoadTable(
282282
[[maybe_unused]] const TableIdentifier& identifier) {
283283
return NotImplemented("Not implemented");
284284
}

src/iceberg/catalog/rest/rest_catalog.h

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -71,12 +71,12 @@ class ICEBERG_REST_EXPORT RestCatalog : public Catalog {
7171

7272
Result<std::vector<TableIdentifier>> ListTables(const Namespace& ns) const override;
7373

74-
Result<std::unique_ptr<Table>> CreateTable(
74+
Result<std::shared_ptr<Table>> CreateTable(
7575
const TableIdentifier& identifier, const Schema& schema, const PartitionSpec& spec,
7676
const std::string& location,
7777
const std::unordered_map<std::string, std::string>& properties) override;
7878

79-
Result<std::unique_ptr<Table>> UpdateTable(
79+
Result<std::shared_ptr<Table>> UpdateTable(
8080
const TableIdentifier& identifier,
8181
const std::vector<std::unique_ptr<TableRequirement>>& requirements,
8282
const std::vector<std::unique_ptr<TableUpdate>>& updates) override;
@@ -92,7 +92,7 @@ class ICEBERG_REST_EXPORT RestCatalog : public Catalog {
9292

9393
Status DropTable(const TableIdentifier& identifier, bool purge) override;
9494

95-
Result<std::unique_ptr<Table>> LoadTable(const TableIdentifier& identifier) override;
95+
Result<std::shared_ptr<Table>> LoadTable(const TableIdentifier& identifier) override;
9696

9797
Result<std::shared_ptr<Table>> RegisterTable(
9898
const TableIdentifier& identifier,

src/iceberg/meson.build

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -94,9 +94,11 @@ iceberg_sources = files(
9494
'table_requirements.cc',
9595
'table_scan.cc',
9696
'table_update.cc',
97+
'transaction.cc',
9798
'transform.cc',
9899
'transform_function.cc',
99100
'type.cc',
101+
'update/pending_update.cc',
100102
'update/update_properties.cc',
101103
'util/bucket_util.cc',
102104
'util/conversions.cc',
@@ -175,7 +177,6 @@ install_headers(
175177
'name_mapping.h',
176178
'partition_field.h',
177179
'partition_spec.h',
178-
'pending_update.h',
179180
'result.h',
180181
'schema_field.h',
181182
'schema.h',
@@ -196,7 +197,6 @@ install_headers(
196197
'transform.h',
197198
'type_fwd.h',
198199
'type.h',
199-
'update/update_properties.h',
200200
],
201201
subdir: 'iceberg',
202202
)
@@ -205,6 +205,7 @@ subdir('catalog')
205205
subdir('expression')
206206
subdir('manifest')
207207
subdir('row')
208+
subdir('update')
208209
subdir('util')
209210

210211
if get_option('tests').enabled()

src/iceberg/pending_update.h

Lines changed: 0 additions & 72 deletions
This file was deleted.

0 commit comments

Comments
 (0)