diff --git a/src/turtle_kv/core/algo/compute_running_total.hpp b/src/turtle_kv/core/algo/compute_running_total.hpp index 33a0568..9ff8f73 100644 --- a/src/turtle_kv/core/algo/compute_running_total.hpp +++ b/src/turtle_kv/core/algo/compute_running_total.hpp @@ -14,10 +14,10 @@ namespace turtle_kv { //==#==========+==+=+=++=+++++++++++-+-+--+----- --- -- - - - - // -template +template inline batt::RunningTotal compute_running_total( batt::WorkerPool& worker_pool, - const MergeCompactor::ResultSet& result_set, + const MergeCompactor::ResultSet& result_set, DecayToItem decay_to_item [[maybe_unused]] = {}) { auto merged_edits = result_set.get(); diff --git a/src/turtle_kv/core/merge_compactor.cpp b/src/turtle_kv/core/merge_compactor.cpp index a50409e..a8b0b96 100644 --- a/src/turtle_kv/core/merge_compactor.cpp +++ b/src/turtle_kv/core/merge_compactor.cpp @@ -429,6 +429,12 @@ template /*static*/ auto MergeCompactor::ResultSet::concat(ResultSet&& first, ResultSet&& second) -> ResultSet { + if (first.size() > 0 && second.size() > 0) { + BATT_CHECK_LT(first.get_max_key(), second.get_min_key()) + << "All elements in the first ResultSet should be strictly less than the elements in the " + "second ResultSet!"; + } + ResultSet ans; //----- --- -- - - - - @@ -495,6 +501,8 @@ template chunk_from_second.offset += first_size; }); + ans.chunks_.back().offset = first_size + second.chunks_.back().offset; + first.clear(); second.clear(); diff --git a/src/turtle_kv/core/merge_compactor.test.cpp b/src/turtle_kv/core/merge_compactor.test.cpp index 651371a..3bf2cf1 100644 --- a/src/turtle_kv/core/merge_compactor.test.cpp +++ b/src/turtle_kv/core/merge_compactor.test.cpp @@ -5,6 +5,7 @@ #include #include +#include #include #include @@ -23,7 +24,10 @@ using namespace batt::int_types; using batt::as_seq; using batt::WorkerPool; +using llfs::StableStringStore; + using turtle_kv::CInterval; +using turtle_kv::DecayToItem; using turtle_kv::EditSlice; using turtle_kv::EditView; using turtle_kv::getenv_as; @@ -39,6 +43,8 @@ using turtle_kv::Status; using turtle_kv::StatusOr; using turtle_kv::ValueView; +using turtle_kv::testing::RandomStringGenerator; + namespace seq = turtle_kv::seq; constexpr usize kNumKeys = 16; @@ -482,4 +488,176 @@ TEST(MergeCompactor, ResultSetDropKeyRange) } } +//==#==========+==+=+=++=+++++++++++-+-+--+----- --- -- - - - - +// +class ResultSetConcatTest : public ::testing::Test +{ + public: + void generate_edits(usize num_edits, bool needs_sort = true) + { + std::unordered_set keys_set; + + std::default_random_engine rng{/*seed=*/30}; + RandomStringGenerator generate_key; + while (this->all_edits_.size() < num_edits) { + KeyView key = generate_key(rng, this->store_); + if (keys_set.contains(key)) { + continue; + } + keys_set.emplace(key); + this->all_edits_.emplace_back(key, + ValueView::from_str(this->store_.store(std::string(100, 'a')))); + } + + if (needs_sort) { + std::sort(this->all_edits_.begin(), this->all_edits_.end(), KeyOrder{}); + } else { + if (std::is_sorted(this->all_edits_.begin(), this->all_edits_.end(), KeyOrder{})) { + std::swap(this->all_edits_.front(), this->all_edits_.back()); + } + } + } + + template + MergeCompactor::ResultSet concat(std::vector&& first, + std::vector&& second, + DecayToItem decay_to_item) + { + usize first_size = first.size(); + usize second_size = second.size(); + + MergeCompactor::ResultSet first_result_set; + first_result_set.append(std::move(first)); + MergeCompactor::ResultSet second_result_set; + second_result_set.append(std::move(second)); + + EXPECT_EQ(first_result_set.size(), first_size); + EXPECT_EQ(second_result_set.size(), second_size); + + MergeCompactor::ResultSet concatenated_result_set = + MergeCompactor::ResultSet::concat(std::move(first_result_set), + std::move(second_result_set)); + + return concatenated_result_set; + } + + template + void verify_result_set(const MergeCompactor::ResultSet& result_set, + const std::vector& edits) + { + EXPECT_EQ(result_set.size(), edits.size()); + + usize i = 0; + for (const EditView& edit : result_set.get()) { + EXPECT_EQ(edit, edits[i]); + ++i; + } + } + + llfs::StableStringStore store_; + std::vector all_edits_; +}; + +//==#==========+==+=+=++=+++++++++++-+-+--+----- --- -- - - - - +// +TEST_F(ResultSetConcatTest, Concat) +{ + // Generate an edit batch of size 200. + // + usize n = 200; + this->generate_edits(n); + + // Divide the edit batch in half, and create ResultSet objects out of each half. + // + std::vector first{this->all_edits_.begin(), this->all_edits_.begin() + (n / 2)}; + std::vector second{this->all_edits_.begin() + (n / 2), this->all_edits_.end()}; + + MergeCompactor::ResultSet concatenated_result_set = + this->concat(std::move(first), std::move(second), DecayToItem{}); + + // Concatenated ResultSet should have the same size as the original edit batch, and should + // also contain the same items in the same order. + // + this->verify_result_set(concatenated_result_set, this->all_edits_); + + // Now, repeat the process qith unequal sized inputs + // + first.assign(this->all_edits_.begin(), this->all_edits_.begin() + (n / 4)); + second.assign(this->all_edits_.begin() + (n / 4), this->all_edits_.end()); + + concatenated_result_set = this->concat(std::move(first), std::move(second), DecayToItem{}); + + this->verify_result_set(concatenated_result_set, this->all_edits_); + + // Finally, test with empty input. + // + first = {}; + second.assign(this->all_edits_.begin(), this->all_edits_.begin() + (n / 4)); + + concatenated_result_set = this->concat(std::move(first), std::move(second), DecayToItem{}); + + this->verify_result_set(concatenated_result_set, + {this->all_edits_.begin(), this->all_edits_.begin() + (n / 4)}); + + first = {}; + second = {}; + concatenated_result_set = this->concat(std::move(first), std::move(second), DecayToItem{}); + EXPECT_EQ(concatenated_result_set.size(), 0); +} + +//==#==========+==+=+=++=+++++++++++-+-+--+----- --- -- - - - - +// +TEST_F(ResultSetConcatTest, FragmentedConcat) +{ + usize n = 200; + this->generate_edits(n); + + std::vector first{this->all_edits_.begin(), this->all_edits_.begin() + (n / 2)}; + std::vector second{this->all_edits_.begin() + (n / 2), this->all_edits_.end()}; + + MergeCompactor::ResultSet first_result_set; + first_result_set.append(std::move(first)); + MergeCompactor::ResultSet second_result_set; + second_result_set.append(std::move(second)); + + // Drop some keys fron the beginning of the ResultSet. + // + first_result_set.drop_before_n(n / 10); + + // Drop some keys in the middle of the ResultSet. + // + auto second_range_begin = this->all_edits_.begin() + (3 * n / 5); + auto second_range_end = this->all_edits_.begin() + (3 * n / 4); + Interval second_range{second_range_begin->key, second_range_end->key}; + second_result_set.drop_key_range_half_open(second_range); + + MergeCompactor::ResultSet concatenated_result_set = + MergeCompactor::ResultSet::concat(std::move(first_result_set), + std::move(second_result_set)); + + std::vector concat_edits{this->all_edits_.begin() + (n / 10), + this->all_edits_.begin() + (3 * n / 5)}; + concat_edits.insert(concat_edits.end(), + this->all_edits_.begin() + (3 * n / 4), + this->all_edits_.end()); + this->verify_result_set(concatenated_result_set, concat_edits); +} + +//==#==========+==+=+=++=+++++++++++-+-+--+----- --- -- - - - - +// +TEST_F(ResultSetConcatTest, ConcatDeath) +{ + usize n = 200; + this->generate_edits(n, /*needs_sort*/ false); + + std::vector first{this->all_edits_.begin(), this->all_edits_.begin() + (n / 2)}; + std::vector second{this->all_edits_.begin() + (n / 2), this->all_edits_.end()}; + + // We should panic since first and second have overlapping key ranges. + // + EXPECT_DEATH(this->concat(std::move(first), std::move(second), DecayToItem{}), + "All elements in the first ResultSet should be strictly less than the elements in " + "the second ResultSet!"); +} + } // namespace diff --git a/src/turtle_kv/core/testing/generate.hpp b/src/turtle_kv/core/testing/generate.hpp index bafa95a..8ead962 100644 --- a/src/turtle_kv/core/testing/generate.hpp +++ b/src/turtle_kv/core/testing/generate.hpp @@ -12,6 +12,7 @@ #include #include #include +#include #include namespace turtle_kv { @@ -184,8 +185,11 @@ class RandomResultSetGenerator : public MinMaxSize } template - MergeCompactor::ResultSet - operator()(DecayToItem, Rng& rng, llfs::StableStringStore& store) + MergeCompactor::ResultSet operator()( + DecayToItem, + Rng& rng, + llfs::StableStringStore& store, + const std::vector& to_delete) { using ResultSet = MergeCompactor::ResultSet; using Item = typename ResultSet::value_type; @@ -193,12 +197,23 @@ class RandomResultSetGenerator : public MinMaxSize const usize n = this->Super::pick_size(rng); std::vector items; + for (const KeyView& delete_key : to_delete) { + items.emplace_back(delete_key, ValueView::deleted()); + } + + std::unordered_set deleted_items_set{to_delete.begin(), to_delete.end()}; while (items.size() < n) { - for (usize i = items.size(); i < n; ++i) { + for (usize i = items.size(); i < n;) { char ch = '_' + (i & 31); - items.emplace_back(this->key_generator_(rng, store), + KeyView key = this->key_generator_(rng, store); + if (deleted_items_set.count(key)) { + continue; + } + items.emplace_back(key, ValueView::from_str(store.store(std::string(this->value_size_, ch)))); + ++i; } + std::sort(items.begin(), items.end(), KeyOrder{}); items.erase(std::unique(items.begin(), items.end(), diff --git a/src/turtle_kv/core/testing/generate.test.cpp b/src/turtle_kv/core/testing/generate.test.cpp index b26c2a7..5076623 100644 --- a/src/turtle_kv/core/testing/generate.test.cpp +++ b/src/turtle_kv/core/testing/generate.test.cpp @@ -7,9 +7,14 @@ namespace { +using batt::int_types::usize; + using turtle_kv::DecayToItem; using turtle_kv::ItemView; using turtle_kv::KeyOrder; +using turtle_kv::KeyView; +using turtle_kv::StatusOr; +using turtle_kv::ValueView; using turtle_kv::testing::RandomResultSetGenerator; template @@ -24,10 +29,28 @@ TEST(GenerateTest, Test) g.set_size(200); - ResultSet result_set = g(DecayToItem{}, rng, store); + std::vector to_delete; + ResultSet result_set = g(DecayToItem{}, rng, store, to_delete); EXPECT_TRUE(std::is_sorted(result_set.get().begin(), result_set.get().end(), KeyOrder{})); EXPECT_EQ(result_set.get().size(), 200u); + + auto result_set_slice = result_set.get(); + usize i = 0; + for (const ItemView& edit : result_set_slice) { + if (i % 2) { + to_delete.emplace_back(edit.key); + } + ++i; + } + + ResultSet result_set_with_deletes = g(DecayToItem{}, rng, store, to_delete); + for (const KeyView& deleted_key : to_delete) { + StatusOr deleted_value = result_set_with_deletes.find_key(deleted_key); + EXPECT_TRUE(deleted_value.ok()); + EXPECT_EQ(*deleted_value, ValueView::deleted()); + } + EXPECT_EQ(to_delete.size(), result_set_with_deletes.size() / 2); } } // namespace diff --git a/src/turtle_kv/kv_store.cpp b/src/turtle_kv/kv_store.cpp index 50f0db2..3701109 100644 --- a/src/turtle_kv/kv_store.cpp +++ b/src/turtle_kv/kv_store.cpp @@ -650,11 +650,19 @@ StatusOr KVStore::get(const KeyView& key) noexcept /*override*/ this->metrics_.mem_table_get_latency, observed_state->mem_table_->get(key)); + const auto return_memtable_value = + [](Optional mem_table_value, + FastCountMetric& get_count_metric) -> StatusOr { + get_count_metric.add(1); + if (mem_table_value->is_delete()) { + return {batt::StatusCode::kNotFound}; + } + return *mem_table_value; + }; + if (value) { if (!value->needs_combine()) { - this->metrics_.mem_table_get_count.add(1); - // VLOG(1) << "found key " << batt::c_str_literal(key) << " in active MemTable"; - return *value; + return return_memtable_value(value, this->metrics_.mem_table_get_count); } } @@ -676,13 +684,15 @@ StatusOr KVStore::get(const KeyView& key) noexcept /*override*/ if (value) { *value = combine(*value, *delta_value); if (!value->needs_combine()) { - this->metrics_.delta_log2_get_count[batt::log2_ceil(observed_deltas_size - i)].add(1); - return *value; + return return_memtable_value( + value, + this->metrics_.delta_log2_get_count[batt::log2_ceil(observed_deltas_size - i)]); } } else { if (!delta_value->needs_combine()) { - this->metrics_.delta_log2_get_count[batt::log2_ceil(observed_deltas_size - i)].add(1); - return *delta_value; + return return_memtable_value( + delta_value, + this->metrics_.delta_log2_get_count[batt::log2_ceil(observed_deltas_size - i)]); } value = delta_value; } @@ -757,7 +767,6 @@ StatusOr KVStore::scan_keys(const KeyView& min_key, this->metrics_.scan_count.add(1); KVStoreScanner scanner{*this, min_key}; - scanner.set_keys_only(true); BATT_REQUIRE_OK(scanner.start()); return scanner.read_keys(items_out); @@ -766,9 +775,7 @@ StatusOr KVStore::scan_keys(const KeyView& min_key, // Status KVStore::remove(const KeyView& key) noexcept /*override*/ { - (void)key; - - return batt::StatusCode::kUnimplemented; + return this->put(key, ValueView::deleted()); } //==#==========+==+=+=++=+++++++++++-+-+--+----- --- -- - - - - diff --git a/src/turtle_kv/kv_store_scanner.cpp b/src/turtle_kv/kv_store_scanner.cpp index 61f6d02..336b152 100644 --- a/src/turtle_kv/kv_store_scanner.cpp +++ b/src/turtle_kv/kv_store_scanner.cpp @@ -435,15 +435,37 @@ Status KVStoreScanner::set_next_item() ScanLevel* scan_level = this->heap_.first(); if (!this->next_item_) { - this->next_item_.emplace(scan_level->item(this->keys_only_)); + this->next_item_.emplace(scan_level->item()); } else if (this->next_item_->key == scan_level->key) { - if (!this->keys_only_ && this->next_item_->needs_combine()) { + // If this->next_item_->key == scan_level->key, we need to search for a terminal value for + // the item, so combine it if necessary. + // + if (this->next_item_->needs_combine()) { this->next_item_->value = combine(this->next_item_->value, scan_level->value()); } } else { - break; + // If the item stored in this->next_item_ does not have the same key as the first key in + // the current scan_level, we have reached a terminal value for this->next_item_. Now, + // we have to decide whether we want to keep this->next_item_ and break from the loop + // (returning the item to the function's caller) OR discard it, because the terminal value + // represents a deleted item. + // + if (this->next_item_->value == ValueView::deleted()) { + // The terminal value represents a deleted item, so discard it by setting this->next_item_ + // to None. Then, continue on to the next iteration of the loop, skipping the logic to + // advance the current scan_level. We do this because we now need to set the first key + // in the current scan_level to this->next_item_ to examine it next. + // + this->next_item_ = None; + if (this->needs_resume_) { + BATT_REQUIRE_OK(this->resume()); + } + continue; + } else { + break; + } } if (scan_level->advance()) { @@ -567,7 +589,7 @@ Status KVStoreScanner::set_next_item() //==#==========+==+=+=++=+++++++++++-+-+--+----- --- -- - - - - // -EditView KVStoreScanner::ScanLevel::item(bool key_only) const +EditView KVStoreScanner::ScanLevel::item() const { return batt::case_of( this->state_impl, @@ -575,58 +597,37 @@ EditView KVStoreScanner::ScanLevel::item(bool key_only) const BATT_PANIC() << "illegal state"; BATT_UNREACHABLE(); }, - [this, key_only](const MemTableScanState& state) -> EditView { + [this](const MemTableScanState& state) -> EditView { MemTableEntry entry; const bool found = state.mem_table_->hash_index().find_key(this->key, entry); BATT_CHECK(found); - if (key_only) { - return EditView{entry.key_, ValueView{}}; - } return EditView{entry.key_, entry.value_}; }, - [this, key_only](const MemTableScanState& state) -> EditView { + [this](const MemTableScanState& state) -> EditView { const MemTableEntry* entry = state.mem_table_->hash_index().unsynchronized_find_key(key); BATT_CHECK_NOT_NULLPTR(entry); - if (key_only) { - return EditView{entry->key_, ValueView{}}; - } return EditView{entry->key_, entry->value_}; }, - [key_only](const MemTableValueScanState& state) -> EditView { + [](const MemTableValueScanState& state) -> EditView { const MemTableValueEntry& entry = state.art_scanner_->get_value(); - if (key_only) { - return EditView{entry.key_view(), ValueView{}}; - } return EditView{entry.key_view(), entry.value_view()}; }, - [key_only](const MemTableValueScanState& state) -> EditView { + [](const MemTableValueScanState& state) -> EditView { const MemTableValueEntry& entry = state.art_scanner_->get_value(); - if (key_only) { - return EditView{entry.key_view(), ValueView{}}; - } return EditView{entry.key_view(), entry.value_view()}; }, [](const Slice& state) -> EditView { return state.front(); }, - [this, key_only](const TreeLevelScanState& state) -> EditView { - if (key_only) { - return EditView{this->key, ValueView{}}; - } + [this](const TreeLevelScanState& state) -> EditView { return EditView{this->key, get_value(state.kv_slice.front())}; }, - [this, key_only](const TreeLevelScanShardedState& state) -> EditView { - if (key_only) { - return EditView{this->key, ValueView{}}; - } + [this](const TreeLevelScanShardedState& state) -> EditView { return EditView{this->key, state.kv_slice.front_value()}; }, - [this, key_only](const ShardedLeafScanState& state) -> EditView { - if (key_only) { - return EditView{this->key, ValueView{}}; - } + [this](const ShardedLeafScanState& state) -> EditView { return EditView{this->key, BATT_OK_RESULT_OR_PANIC(state.leaf_scanner_->front_value())}; }); } diff --git a/src/turtle_kv/kv_store_scanner.hpp b/src/turtle_kv/kv_store_scanner.hpp index dc7d26c..83c9468 100644 --- a/src/turtle_kv/kv_store_scanner.hpp +++ b/src/turtle_kv/kv_store_scanner.hpp @@ -198,7 +198,7 @@ class KVStoreScanner /** \brief Returns the current item as an EditView. */ - EditView item(bool key_only) const; + EditView item() const; /** \brief Returns the value of the current item. */ @@ -306,11 +306,6 @@ class KVStoreScanner StatusOr read_keys(const Slice& buffer); - void set_keys_only(bool b) noexcept - { - this->keys_only_ = b; - } - //+++++++++++-+-+--+----- --- -- - - - -- private: Status validate_page_layout(i32 height, const llfs::PinnedPage& pinned_page); @@ -353,7 +348,6 @@ class KVStoreScanner boost::container::static_vector tree_scan_path_; boost::container::small_vector scan_levels_; StackMerger heap_; - bool keys_only_ = false; Optional sharded_leaf_scanner_; }; diff --git a/src/turtle_kv/tree/algo/nodes.hpp b/src/turtle_kv/tree/algo/nodes.hpp index 8bb8fe5..50c066a 100644 --- a/src/turtle_kv/tree/algo/nodes.hpp +++ b/src/turtle_kv/tree/algo/nodes.hpp @@ -179,7 +179,7 @@ struct NodeAlgorithms { BATT_REQUIRE_OK(combine_in_place(&value, subtree_result)); - if (!value) { + if (!value || value->is_delete()) { return {batt::StatusCode::kNotFound}; } diff --git a/src/turtle_kv/tree/algo/segmented_levels.hpp b/src/turtle_kv/tree/algo/segmented_levels.hpp index c9864e1..822233e 100644 --- a/src/turtle_kv/tree/algo/segmented_levels.hpp +++ b/src/turtle_kv/tree/algo/segmented_levels.hpp @@ -305,6 +305,18 @@ struct SegmentedLevelAlgorithms { return OkStatus(); } + /** \brief Merges the two given pivots, effectively erasing `right_pivot`. + */ + void merge_pivots(i32 left_pivot, i32 right_pivot) + { + const usize segment_count = this->level_.segment_count(); + + for (usize segment_i = 0; segment_i < segment_count; ++segment_i) { + SegmentT& segment = this->level_.get_segment(segment_i); + in_segment(segment).merge_pivots(left_pivot, right_pivot, this->level_); + } + } + /** \brief Invokes `fn` for each SegmentT& selected by `pivot_selector`. * * `pivot_selector` can be: diff --git a/src/turtle_kv/tree/algo/segments.hpp b/src/turtle_kv/tree/algo/segments.hpp index 54f8016..0798dfb 100644 --- a/src/turtle_kv/tree/algo/segments.hpp +++ b/src/turtle_kv/tree/algo/segments.hpp @@ -116,6 +116,22 @@ struct SegmentAlgorithms { return true; } + /** \brief Merges the two given pivots, effectively erasing `right_pivot`. + */ + template + [[nodiscard]] void merge_pivots(i32 left_pivot, i32 right_pivot, const LevelT& level) + { + BATT_CHECK(!this->segment_.is_pivot_active(left_pivot)); + + u32 new_flushed_upper_bound = this->segment_.get_flushed_item_upper_bound(level, right_pivot); + bool new_is_active = this->segment_.is_pivot_active(right_pivot); + + this->segment_.set_pivot_active(left_pivot, new_is_active); + this->segment_.set_flushed_item_upper_bound(left_pivot, new_flushed_upper_bound); + + this->segment_.remove_pivot(right_pivot); + } + /** \brief Invokes the speficied `fn` for each active pivot in the specified range, passing a * reference to the segment and the pivot index (i32). */ diff --git a/src/turtle_kv/tree/batch_update.hpp b/src/turtle_kv/tree/batch_update.hpp index 161c370..d76c0e6 100644 --- a/src/turtle_kv/tree/batch_update.hpp +++ b/src/turtle_kv/tree/batch_update.hpp @@ -29,18 +29,25 @@ struct BatchUpdateContext { /** \brief Uses the worker_pool to perform a parallel merge-compaction of the lines * produced by the passed `generator_fn`, up to and including (but stopping at) `max_key`. */ - template - StatusOr> merge_compact_edits( + template + StatusOr> merge_compact_edits( const KeyView& max_key, GeneratorFn&& generator_fn); /** \brief Computes and returns the running total (prefix sum) of the edit sizes in result_set. */ + template batt::RunningTotal compute_running_total( - const MergeCompactor::ResultSet& result_set) const + const MergeCompactor::ResultSet& result_set) const { - return ::turtle_kv::compute_running_total(this->worker_pool, result_set); + return ::turtle_kv::compute_running_total(this->worker_pool, result_set); } + + /** \brief Returns a `ResultSet` with only the edits from the batch passed into the function + * that decay to base-level items (e.g., no tombstones). + */ + MergeCompactor::ResultSet decay_batch_to_items( + MergeCompactor::ResultSet& batch); }; //=#=#==#==#===============+=+=+=+=++=++++++++++++++-++-+--+-+----+--------------- @@ -90,9 +97,10 @@ std::ostream& operator<<(std::ostream& out, const BatchUpdate::TrimResult& t); //==#==========+==+=+=++=+++++++++++-+-+--+----- --- -- - - - - // -template -inline StatusOr> -BatchUpdateContext::merge_compact_edits(const KeyView& max_key, GeneratorFn&& generator_fn) +template +inline StatusOr> BatchUpdateContext::merge_compact_edits( + const KeyView& max_key, + GeneratorFn&& generator_fn) { MergeCompactor compactor{this->worker_pool}; @@ -100,10 +108,124 @@ BatchUpdateContext::merge_compact_edits(const KeyView& max_key, GeneratorFn&& ge BATT_REQUIRE_OK(BATT_FORWARD(generator_fn)(compactor)); compactor.finish_push_levels(); - MergeCompactor::EditBuffer edit_buffer; + MergeCompactor::OutputBuffer edit_buffer; this->worker_pool.reset(); return compactor.read(edit_buffer, max_key); } +//==#==========+==+=+=++=+++++++++++-+-+--+----- --- -- - - - - +// +inline MergeCompactor::ResultSet BatchUpdateContext::decay_batch_to_items( + MergeCompactor::ResultSet& batch) +{ + const batt::TaskCount max_tasks{this->worker_pool.size() + 1}; + std::vector decayed_items; + + if (max_tasks == 1) { + for (const EditView& edit : batch.get()) { + Optional maybe_item = to_item_view(edit); + if (maybe_item) { + decayed_items.emplace_back(EditView::from_item_view(*maybe_item)); + } + } + } else if (batch.size() > 0) { + const ParallelAlgoDefaults& algo_defaults = parallel_algo_defaults(); + + auto actual_edits = batch.get(); + const auto src_begin = actual_edits.begin(); + const auto src_end = actual_edits.end(); + + const batt::WorkSlicePlan plan{batt::WorkSliceParams{ + algo_defaults.copy_decayed_items.min_task_size, + max_tasks, + }, + src_begin, + src_end}; + + BATT_CHECK_GT(plan.n_tasks, 0); + + batt::SmallVec output_size_per_shard(plan.n_tasks); + BATT_CHECK_EQ(output_size_per_shard.size(), plan.n_tasks); + + // First count the number of non-decayed items in the output for each shard. + { + batt::ScopedWorkContext work_context{this->worker_pool}; + + BATT_CHECK_OK(batt::slice_work( + work_context, + plan, + /*gen_work_fn=*/ + [&](usize task_index, isize task_offset, isize task_size) { + return [src_begin, task_index, task_offset, task_size, &output_size_per_shard] { + BATT_CHECK_LT(task_index, output_size_per_shard.size()); + + auto task_src_begin = std::next(src_begin, task_offset); + const auto task_src_end = std::next(task_src_begin, task_size); + + usize output_size = 0; + + for (; task_src_begin != task_src_end; ++task_src_begin) { + if (decays_to_item(*task_src_begin)) { + output_size += 1; + } + } + output_size_per_shard[task_index] = output_size; + }; + })) + << "worker_pool must not be closed!"; + } + + // Change to a rolling sum and do the actual copy. + // + usize output_total_size = 0; + batt::SmallVec output_shard_offset; + for (usize output_shard_size : output_size_per_shard) { + output_shard_offset.emplace_back(output_total_size); + output_total_size += output_shard_size; + } + + decayed_items.resize(output_total_size); + { + this->worker_pool.reset(); + + batt::ScopedWorkContext work_context{this->worker_pool}; + + BATT_CHECK_OK( + batt::slice_work(work_context, + plan, + /*gen_work_fn=*/ + [&](usize task_index, isize task_offset, isize task_size) { + return [src_begin, + &output_shard_offset, + &output_size_per_shard, + task_index, + task_offset, + task_size, + &decayed_items] { + auto task_src_begin = std::next(src_begin, task_offset); + const auto task_src_end = std::next(task_src_begin, task_size); + + BATT_CHECK_LT(task_index, output_shard_offset.size()); + auto task_dst_begin = + std::next(decayed_items.data(), output_shard_offset[task_index]); + + for (; task_src_begin != task_src_end; ++task_src_begin) { + Optional maybe_item = to_item_view(*task_src_begin); + if (maybe_item) { + *task_dst_begin = EditView::from_item_view(*maybe_item); + ++task_dst_begin; + } + } + }; + })) + << "worker_pool must not be closed!"; + } + } + + MergeCompactor::ResultSet output_result_set; + output_result_set.append(std::move(decayed_items)); + return output_result_set; +} + } // namespace turtle_kv diff --git a/src/turtle_kv/tree/in_memory_leaf.cpp b/src/turtle_kv/tree/in_memory_leaf.cpp index 9c02cf4..829b373 100644 --- a/src/turtle_kv/tree/in_memory_leaf.cpp +++ b/src/turtle_kv/tree/in_memory_leaf.cpp @@ -8,6 +8,74 @@ namespace turtle_kv { +//==#==========+==+=+=++=+++++++++++-+-+--+----- --- -- - - - - +// +/*static*/ std::unique_ptr InMemoryLeaf::unpack( + llfs::PinnedPage&& pinned_leaf_page, + const TreeOptions& tree_options, + const PackedLeafPage& packed_leaf, + batt::WorkerPool& worker_pool) noexcept +{ + std::unique_ptr new_leaf = + std::make_unique(batt::make_copy(pinned_leaf_page), tree_options); + + const batt::TaskCount max_tasks{worker_pool.size() + 1}; + + Slice packed_items = packed_leaf.items_slice(); + std::vector buffer; + buffer.reserve(packed_items.size()); + + if (max_tasks == 1) { + for (const PackedKeyValue& pkv : packed_items) { + buffer.emplace_back(to_edit_view(pkv)); + } + } else { + const ParallelAlgoDefaults& algo_defaults = parallel_algo_defaults(); + + const auto src_begin = packed_items.begin(); + const auto src_end = packed_items.end(); + const auto dst_begin = buffer.begin(); + + const batt::WorkSlicePlan plan{batt::WorkSliceParams{ + algo_defaults.copy_edits.min_task_size, + max_tasks, + }, + src_begin, + src_end}; + + BATT_CHECK_GT(plan.n_tasks, 0); + + { + batt::ScopedWorkContext work_context{worker_pool}; + + BATT_CHECK_OK(slice_work(work_context, + plan, + /*gen_work_fn=*/ + [&](usize /*task_index*/, isize task_offset, isize task_size) { + return [src_begin, dst_begin, task_offset, task_size] { + auto task_src_begin = std::next(src_begin, task_offset); + auto task_src_end = std::next(task_src_begin, task_size); + auto task_dst_begin = std::next(dst_begin, task_offset); + + for (; task_src_begin != task_src_end; ++task_src_begin) { + *task_dst_begin = to_edit_view(*task_src_begin); + ++task_dst_begin; + } + }; + })) + << "work_context must not be closed!"; + } + } + + MergeCompactor::ResultSet result_set; + result_set.append(std::move(buffer)); + new_leaf->result_set = std::move(result_set); + + new_leaf->set_edit_size_totals(compute_running_total(worker_pool, new_leaf->result_set)); + + return {std::move(new_leaf)}; +} + //==#==========+==+=+=++=+++++++++++-+-+--+----- --- -- - - - - // SubtreeViability InMemoryLeaf::get_viability() @@ -149,6 +217,65 @@ auto InMemoryLeaf::make_split_plan() const -> StatusOr return plan; } +//==#==========+==+=+=++=+++++++++++-+-+--+----- --- -- - - - - +// +StatusOr> InMemoryLeaf::try_merge( + BatchUpdateContext& context, + std::unique_ptr sibling) noexcept +{ + if (sibling->result_set.empty()) { + BATT_CHECK(batt::is_case(this->get_viability())); + return nullptr; + } + + if (this->result_set.empty()) { + BATT_CHECK(batt::is_case(sibling->get_viability())); + this->pinned_leaf_page_ = std::move(sibling->pinned_leaf_page_); + this->result_set = std::move(sibling->result_set); + this->shared_edit_size_totals_ = sibling->shared_edit_size_totals_; + this->edit_size_totals = std::move(sibling->edit_size_totals); + return nullptr; + } + + BATT_CHECK_LT(this->get_max_key(), sibling->get_min_key()); + + this->result_set = MergeCompactor::ResultSet::concat(std::move(this->result_set), + std::move(sibling->result_set)); + + this->set_edit_size_totals(context.compute_running_total(this->result_set)); + + return nullptr; +} + +//==#==========+==+=+=++=+++++++++++-+-+--+----- --- -- - - - - +// +Status InMemoryLeaf::apply_batch_update(BatchUpdate& update, + Optional>&& current_result_set) noexcept +{ + if (current_result_set) { + // A valid BoxedSeq was passed in. Merge compact this sequence with the incoming + // update. + // + BATT_ASSIGN_OK_RESULT(this->result_set, + update.context.merge_compact_edits( + global_max_key(), + [&](MergeCompactor& compactor) -> Status { + compactor.push_level(update.result_set.live_edit_slices()); + compactor.push_level(std::move(*current_result_set)); + return OkStatus(); + })); + } else { + // If nothing was passed in, we have a new leaf being populated for the first time (empty tree). + // + this->result_set = update.context.decay_batch_to_items(update.result_set); + } + + this->result_set.update_has_page_refs(update.result_set.has_page_refs()); + this->set_edit_size_totals(update.context.compute_running_total(this->result_set)); + + return OkStatus(); +} + //=#=#==#==#===============+=+=+=+=++=++++++++++++++-++-+--+-+----+--------------- //==#==========+==+=+=++=+++++++++++-+-+--+----- --- -- - - - - diff --git a/src/turtle_kv/tree/in_memory_leaf.hpp b/src/turtle_kv/tree/in_memory_leaf.hpp index 9d71d93..5e24c8a 100644 --- a/src/turtle_kv/tree/in_memory_leaf.hpp +++ b/src/turtle_kv/tree/in_memory_leaf.hpp @@ -1,5 +1,6 @@ #pragma once +#include #include #include #include @@ -32,13 +33,20 @@ struct InMemoryLeaf { llfs::PinnedPage pinned_leaf_page_; TreeOptions tree_options; - MergeCompactor::ResultSet result_set; + MergeCompactor::ResultSet result_set; std::shared_ptr shared_edit_size_totals_; Optional edit_size_totals; mutable std::atomic future_id_{~u64{0}}; //+++++++++++-+-+--+----- --- -- - - - - + static std::unique_ptr unpack(llfs::PinnedPage&& pinned_leaf_page, + const TreeOptions& tree_options, + const PackedLeafPage& packed_leaf, + batt::WorkerPool& worker_pool) noexcept; + + //+++++++++++-+-+--+----- --- -- - - - - + explicit InMemoryLeaf(llfs::PinnedPage&& pinned_leaf_page, const TreeOptions& tree_options_arg) noexcept : pinned_leaf_page_{std::move(pinned_leaf_page)} @@ -91,6 +99,12 @@ struct InMemoryLeaf { StatusOr make_split_plan() const; + StatusOr> try_merge(BatchUpdateContext& context, + std::unique_ptr sibling) noexcept; + + Status apply_batch_update(BatchUpdate& update, + Optional>&& current_result_set) noexcept; + Status start_serialize(TreeSerializeContext& context); StatusOr finish_serialize(TreeSerializeContext& context); diff --git a/src/turtle_kv/tree/in_memory_node.cpp b/src/turtle_kv/tree/in_memory_node.cpp index 72fc976..00c399b 100644 --- a/src/turtle_kv/tree/in_memory_node.cpp +++ b/src/turtle_kv/tree/in_memory_node.cpp @@ -216,9 +216,12 @@ Status InMemoryNode::apply_batch_update(BatchUpdate& update, // BATT_REQUIRE_OK(this->update_buffer_insert(update)); - // Check for flush. + // Check for flush. If a flush is not necessary, batt::StatusCode::kUnavailable is returned. // - BATT_REQUIRE_OK(this->flush_if_necessary(update.context)); + Status flush_status = this->flush_if_necessary(update.context); + if (flush_status != OkStatus() && flush_status != batt::StatusCode::kUnavailable) { + return flush_status; + } // We don't need to check whether _this_ node needs to be split; the caller will take care of // that! @@ -288,7 +291,7 @@ Status InMemoryNode::update_buffer_insert(BatchUpdate& update) BATT_ASSIGN_OK_RESULT( // new_merged_level.result_set, - update.context.merge_compact_edits( // + update.context.merge_compact_edits( // global_max_key(), [&](MergeCompactor& compactor) -> Status { compactor.push_level(update.result_set.live_edit_slices()); @@ -352,6 +355,10 @@ Status InMemoryNode::flush_if_necessary(BatchUpdateContext& context, bool force_ // const MaxPendingBytes max_pending = this->find_max_pending(); + if (!max_pending.byte_count) { + return {batt::StatusCode::kUnavailable}; + } + const bool flush_needed = force_flush || // (max_pending.byte_count >= this->tree_options.min_flush_size()) || // this->has_too_many_tiers(); @@ -402,7 +409,7 @@ Status InMemoryNode::compact_update_buffer_levels(BatchUpdateContext& update_con Status segment_load_status; BATT_ASSIGN_OK_RESULT(new_merged_level.result_set, - update_context.merge_compact_edits( + update_context.merge_compact_edits( global_max_key(), [&](MergeCompactor& compactor) -> Status { this->push_levels_to_merge(compactor, @@ -447,10 +454,10 @@ StatusOr InMemoryNode::collect_pivot_batch(BatchUpdateContext& upda // Merge/compact all pending edits for the specified pivot. // - BATT_ASSIGN_OK_RESULT( // - pivot_batch.result_set, // - update_context.merge_compact_edits( // - /*max_key=*/pivot_key_range.upper_bound, // + BATT_ASSIGN_OK_RESULT( // + pivot_batch.result_set, // + update_context.merge_compact_edits( // + /*max_key=*/pivot_key_range.upper_bound, // [&](MergeCompactor& compactor) -> Status { this->push_levels_to_merge(compactor, update_context.page_loader, @@ -589,8 +596,7 @@ Status InMemoryNode::make_child_viable(BatchUpdateContext& update_context, i32 p //----- --- -- - - - - [&](const NeedsMerge&) -> Status { - BATT_PANIC() << "TODO [tastolfi 2025-03-16] implement me!"; - return batt::StatusCode::kUnimplemented; + return this->merge_child(update_context, pivot_i); }); return status; @@ -691,6 +697,572 @@ Status InMemoryNode::split_child(BatchUpdateContext& update_context, i32 pivot_i return OkStatus(); } +//==#==========+==+=+=++=+++++++++++-+-+--+----- --- -- - - - - +// +Subtree InMemoryNode::try_shrink() +{ + BATT_CHECK_EQ(this->children.size(), 1); + BATT_CHECK_EQ(this->pending_bytes.size(), 1); + BATT_CHECK_EQ(this->pending_bytes[0], 0); + + return {std::move(this->children[0])}; +} + +//==#==========+==+=+=++=+++++++++++-+-+--+----- --- -- - - - - +// +Status InMemoryNode::merge_child(BatchUpdateContext& update_context, i32 pivot_i) noexcept +{ + // If there are no siblings to merge with, we must be in the middle of collapsing the tree + // (flush and shrink). + // + if (this->pivot_count() == 1) { + return OkStatus(); + } + + // Decide which sibling to merge with. Edge cases: child that needs merge is the leftmost or + // rightmost child in the node. + // + i32 sibling_i = pivot_i; + i32 right_sibling = pivot_i + 1; + i32 left_sibling = pivot_i - 1; + + bool need_update_buffer_compaction = false; + u64 active_segmented_levels = this->update_buffer.compute_active_pivots(); + if (pivot_i == 0) { + sibling_i = right_sibling; + } else if ((usize)pivot_i == this->pivot_count() - 1) { + sibling_i = left_sibling; + } else { + // If we don't have one of the edge cases, try and pick the sibling where the leftmost of + // {child, sibling} is inactive in all segmented levels. This way, the final merged pivot + // won't have on/off flushed ranges in segments. If this is not possible, pick the right + // sibling. + // + if (!get_bit(active_segmented_levels, pivot_i)) { + sibling_i = right_sibling; + } else { + if (!get_bit(active_segmented_levels, left_sibling)) { + sibling_i = left_sibling; + } else { + sibling_i = right_sibling; + } + } + } + + BATT_CHECK_NE(pivot_i, sibling_i); + if (get_bit(active_segmented_levels, std::min(pivot_i, sibling_i))) { + need_update_buffer_compaction = true; + } + + BATT_REQUIRE_OK(this->children[sibling_i].unpack_if_necessary(update_context.page_loader, + update_context.worker_pool, + this->tree_options, + this->height - 1)); + + // Erase rightmost of {child subtree, sibling} in all metadata of the parent. + // + const i32 right_pivot_i = std::max(pivot_i, sibling_i); + const i32 left_pivot_i = std::min(pivot_i, sibling_i); + const usize old_pivot_count = this->pivot_count(); + + // Call Subtree::try_merge. + // + StatusOr> status_or_merged = + this->children[left_pivot_i].try_merge(update_context, + std::move(this->children[right_pivot_i])); + if (!status_or_merged.ok()) { + LOG(ERROR) << BATT_INSPECT(status_or_merged.status()); + } + BATT_REQUIRE_OK(status_or_merged); + + if (*status_or_merged) { + // If try_merge returned a Subtree, a borrow occurred. + // + this->child_pages[left_pivot_i] = llfs::PinnedPage{}; + this->child_pages[right_pivot_i] = llfs::PinnedPage{}; + + // A borrow would have returned the updated right sibling (left sibling was updated in place), + // so overwrite what is currently in this->children. + // + this->children[right_pivot_i] = std::move(**status_or_merged); + + if ((usize)right_pivot_i == old_pivot_count - 1) { + BATT_ASSIGN_OK_RESULT( + this->max_key_, + this->children.back().get_max_key(update_context.page_loader, this->child_pages.back())); + } + + // Compute and store the new pivot key. + // + StatusOr right_child_min_key = + this->children[right_pivot_i].get_min_key(update_context.page_loader, + this->child_pages[right_pivot_i]); + BATT_REQUIRE_OK(right_child_min_key); + StatusOr left_child_max_key = + this->children[left_pivot_i].get_max_key(update_context.page_loader, + this->child_pages[left_pivot_i]); + BATT_REQUIRE_OK(left_child_max_key); + + const KeyView prefix = llfs::find_common_prefix(0, *left_child_max_key, *right_child_min_key); + const KeyView new_pivot_key = right_child_min_key->substr(0, prefix.size() + 1); + this->pivot_keys_[right_pivot_i] = new_pivot_key; + + // Compact the update buffer levels and recompute pending byte counts. + // + BATT_REQUIRE_OK(this->compact_update_buffer_levels(update_context)); + + BATT_CHECK_EQ(this->update_buffer.levels.size(), 1); + BATT_CHECK(batt::is_case(this->update_buffer.levels[0])); + MergedLevel& merged_edits = std::get(this->update_buffer.levels[0]); + + std::fill(this->pending_bytes.begin(), this->pending_bytes.end(), 0); + in_node(*this).update_pending_bytes(update_context.worker_pool, + merged_edits.result_set.get(), + PackedSizeOfEdit{}); + + return OkStatus(); + } + + this->child_pages[left_pivot_i] = llfs::PinnedPage{}; + this->child_pages.erase(this->child_pages.begin() + right_pivot_i); + + // Update the update_buffer levels. + // + if (need_update_buffer_compaction) { + BATT_REQUIRE_OK(this->compact_update_buffer_levels(update_context)); + } else { + for (Level& level : this->update_buffer.levels) { + if (batt::is_case(level)) { + SegmentedLevel& segmented_level = std::get(level); + in_segmented_level(*this, segmented_level, update_context.page_loader) + .merge_pivots(left_pivot_i, right_pivot_i); + } + } + } + + // Update this->children. + // + this->children.erase(this->children.begin() + right_pivot_i); + + // Update pending_bytes. The leftmost of {subtree, sibling} should be incremented by the removed + // subtree's pending bytes values. Erase the pending bytes of the removed subtree. + // + this->pending_bytes[left_pivot_i] += this->pending_bytes[right_pivot_i]; + this->pending_bytes.erase(this->pending_bytes.begin() + right_pivot_i); + + bool is_pending_bytes_exact = get_bit(this->pending_bytes_is_exact, left_pivot_i) & + get_bit(this->pending_bytes_is_exact, right_pivot_i); + this->pending_bytes_is_exact = + set_bit(this->pending_bytes_is_exact, left_pivot_i, is_pending_bytes_exact); + this->pending_bytes_is_exact = remove_bit(this->pending_bytes_is_exact, right_pivot_i); + + // Remove the pivot key of the removed child subtree from this->pivot_keys_. + // + this->pivot_keys_.erase(this->pivot_keys_.begin() + right_pivot_i); + + if ((usize)right_pivot_i == old_pivot_count - 1) { + BATT_ASSIGN_OK_RESULT( + this->max_key_, + this->children.back().get_max_key(update_context.page_loader, this->child_pages.back())); + } + + // Finally, split the newly merged child if needed. + // + SubtreeViability merged_viability = this->children[left_pivot_i].get_viability(); + if (batt::is_case(merged_viability)) { + BATT_REQUIRE_OK(this->make_child_viable(update_context, left_pivot_i)); + } else { + BATT_CHECK(batt::is_case(merged_viability)); + } + + return OkStatus(); +} + +//==#==========+==+=+=++=+++++++++++-+-+--+----- --- -- - - - - +// +StatusOr> InMemoryNode::try_merge( + BatchUpdateContext& context, + std::unique_ptr sibling) noexcept +{ + //----- --- -- - - - - + // If merging both full nodes will cause the merged node's pivot count to exceed the max + // possible pivot count, try a borrow. + // + if (this->pivot_count() + sibling->pivot_count() > this->max_pivot_count()) { + bool borrow_from_sibling = false; + if (batt::is_case(this->get_viability())) { + borrow_from_sibling = true; + } else { + BATT_CHECK(batt::is_case(sibling->get_viability())); + } + + Status borrow_status = borrow_from_sibling ? this->try_borrow(context, *sibling) + : sibling->try_borrow(context, *this); + BATT_REQUIRE_OK(borrow_status); + + return {std::move(sibling)}; + } + + BATT_CHECK_LT(this->get_max_key(), sibling->get_min_key()); + + //----- --- -- - - - - + // Concatenate the update buffers. + // + usize i = 0; + for (; i < this->update_buffer.levels.size(); ++i) { + Level& left_level = this->update_buffer.levels[i]; + BATT_REQUIRE_OK(batt::case_of( // + left_level, // + [&](EmptyLevel&) -> Status { + if (i < sibling->update_buffer.levels.size()) { + Level& right_level = sibling->update_buffer.levels[i]; + if (!batt::is_case(right_level)) { + this->update_buffer.levels[i] = std::move(right_level); + } + } + + return OkStatus(); + }, + [&](MergedLevel& left_merged_level) -> Status { + if (i < sibling->update_buffer.levels.size()) { + BATT_REQUIRE_OK(batt::case_of( + sibling->update_buffer.levels[i], + [](EmptyLevel&) -> Status { + return OkStatus(); + }, + [&](MergedLevel& right_merged_level) -> Status { + this->update_buffer.levels[i] = + std::move(left_merged_level.concat(right_merged_level)); + return OkStatus(); + }, + [&](SegmentedLevel& right_segmented_level) -> Status { + // When merging a MergedLevel and a SegmentedLevel, create a new MergedLevel. + // + BATT_ASSIGN_OK_RESULT( + MergedLevel new_merged_level, + UpdateBuffer::concat_segmented_and_merged_level(context, + left_merged_level, + right_segmented_level, + *sibling)); + + this->update_buffer.levels[i] = std::move(new_merged_level); + + return OkStatus(); + })); + } + + return OkStatus(); + }, + [&](SegmentedLevel& left_segmented_level) -> Status { + if (i < sibling->update_buffer.levels.size()) { + BATT_REQUIRE_OK(batt::case_of( + sibling->update_buffer.levels[i], + [](EmptyLevel&) -> Status { + return OkStatus(); + }, + [&](MergedLevel& right_merged_level) -> Status { + BATT_ASSIGN_OK_RESULT( + MergedLevel new_merged_level, + UpdateBuffer::concat_segmented_and_merged_level(context, + right_merged_level, + left_segmented_level, + *this)); + + this->update_buffer.levels[i] = std::move(new_merged_level); + + return OkStatus(); + }, + [&](SegmentedLevel& right_segmented_level) -> Status { + // First shift the right level's bitsets to the left by the number of pivots + // in the left node. + // + usize left_node_pivot_count = this->pivot_count(); + for (usize segment_i = 0; segment_i < right_segmented_level.segment_count(); + ++segment_i) { + Segment& segment = right_segmented_level.get_segment(segment_i); + segment.flushed_pivots <<= left_node_pivot_count; + segment.active_pivots <<= left_node_pivot_count; + } + + left_segmented_level.segments.insert( + left_segmented_level.segments.end(), + std::make_move_iterator(right_segmented_level.segments.begin()), + std::make_move_iterator(right_segmented_level.segments.end())); + + return OkStatus(); + })); + } + + return OkStatus(); + })); + } + + // Carry over any remaining levels from the right node's update buffer. + // + for (; i < sibling->update_buffer.levels.size(); ++i) { + batt::case_of( + sibling->update_buffer.levels[i], + [](EmptyLevel&) { + // do nothing + }, + [&](MergedLevel& right_merged_level) { + this->update_buffer.levels.emplace_back(std::move(right_merged_level)); + }, + [&](SegmentedLevel& right_segmented_level) { + usize left_node_pivot_count = this->pivot_count(); + for (usize segment_i = 0; segment_i < right_segmented_level.segment_count(); + ++segment_i) { + Segment& segment = right_segmented_level.get_segment(segment_i); + segment.flushed_pivots <<= left_node_pivot_count; + segment.active_pivots <<= left_node_pivot_count; + } + + this->update_buffer.levels.emplace_back(right_segmented_level); + }); + } + + //----- --- -- - - - - + // Then, concatenate the two nodes' metadata. + // + this->max_key_ = sibling->max_key_; + + this->pending_bytes.insert(this->pending_bytes.end(), + sibling->pending_bytes.begin(), + sibling->pending_bytes.end()); + + sibling->pending_bytes_is_exact <<= this->pivot_count(); + this->pending_bytes_is_exact |= sibling->pending_bytes_is_exact; + + this->child_pages.insert(this->child_pages.end(), + std::make_move_iterator(sibling->child_pages.begin()), + std::make_move_iterator(sibling->child_pages.end())); + + // Modify the children Subtree vector after concatenating the update buffers, inserting into the + // vector will cause the pivot count to increase. + // + this->children.insert(this->children.end(), + std::make_move_iterator(sibling->children.begin()), + std::make_move_iterator(sibling->children.end())); + + // Remove the key upper bound for `this`. + // + this->pivot_keys_.pop_back(); + this->pivot_keys_.insert(this->pivot_keys_.end(), + sibling->pivot_keys_.begin(), + sibling->pivot_keys_.end()); + + return nullptr; +} + +//==#==========+==+=+=++=+++++++++++-+-+--+----- --- -- - - - - +// +Status InMemoryNode::try_borrow(BatchUpdateContext& context, InMemoryNode& sibling) noexcept +{ + BATT_CHECK(batt::is_case(sibling.get_viability())); + + bool right_sibling = this->get_max_key() < sibling.get_min_key(); + + BATT_CHECK_LT(this->pivot_count(), 4); + u32 num_pivots_to_borrow = 4 - this->pivot_count(); + + //----- --- -- - - - - + // Borrow node metadata. Modify all metadata right now except this->children, since modifying it + // will change the pivot count. + // + if (right_sibling) { + this->pending_bytes.insert(this->pending_bytes.end(), + sibling.pending_bytes.begin(), + sibling.pending_bytes.begin() + num_pivots_to_borrow); + sibling.pending_bytes.erase(sibling.pending_bytes.begin(), + sibling.pending_bytes.begin() + num_pivots_to_borrow); + + // Update this->pending_bytes_is_exact by placing the borrowed pending bytes bits from the + // right sibling directly after the pending bytes bits for this node. + // + u64 borrowed_bits = sibling.pending_bytes_is_exact & ((u64{1} << num_pivots_to_borrow) - 1); + u64 mask = ((u64{1} << num_pivots_to_borrow) - 1) << this->pivot_count(); + this->pending_bytes_is_exact = + (this->pending_bytes_is_exact & ~mask) | (borrowed_bits << this->pivot_count()); + sibling.pending_bytes_is_exact >>= num_pivots_to_borrow; + + // Get rid of the key upper bound in this node and insert the borrowed pivot keys, including + // one past num_pivots_to_borrow, to set the new key upper bound. + // + this->pivot_keys_.pop_back(); + this->pivot_keys_.insert(this->pivot_keys_.end(), + sibling.pivot_keys_.begin(), + sibling.pivot_keys_.begin() + num_pivots_to_borrow + 1); + sibling.pivot_keys_.erase(sibling.pivot_keys_.begin(), + sibling.pivot_keys_.begin() + num_pivots_to_borrow); + + this->child_pages.insert( + this->child_pages.end(), + std::make_move_iterator(sibling.child_pages.begin()), + std::make_move_iterator(sibling.child_pages.begin() + num_pivots_to_borrow)); + sibling.child_pages.erase(sibling.child_pages.begin(), + sibling.child_pages.begin() + num_pivots_to_borrow); + } else { + this->pending_bytes.insert(this->pending_bytes.begin(), + sibling.pending_bytes.end() - num_pivots_to_borrow, + sibling.pending_bytes.end()); + sibling.pending_bytes.erase(sibling.pending_bytes.end() - num_pivots_to_borrow, + sibling.pending_bytes.end()); + + // Shift this->pending_bytes_is_exact up by num_pivots_to_borrow, and place the borrowed + // pending bytes bits at the lowest order bits. + // + u64 borrowed_bits = + (sibling.pending_bytes_is_exact >> (sibling.pivot_count() - num_pivots_to_borrow)) & + ((u64{1} << num_pivots_to_borrow) - 1); + this->pending_bytes_is_exact <<= num_pivots_to_borrow; + this->pending_bytes_is_exact |= borrowed_bits; + u64 mask = ((u64{1} << num_pivots_to_borrow) - 1) + << (sibling.pivot_count() - num_pivots_to_borrow); + sibling.pending_bytes_is_exact &= ~mask; + + sibling.pivot_keys_.pop_back(); + this->pivot_keys_.insert(this->pivot_keys_.begin(), + sibling.pivot_keys_.end() - num_pivots_to_borrow, + sibling.pivot_keys_.end()); + sibling.pivot_keys_.erase(sibling.pivot_keys_.end() - num_pivots_to_borrow + 1, + sibling.pivot_keys_.end()); + + this->child_pages.insert( + this->child_pages.begin(), + std::make_move_iterator(sibling.child_pages.end() - num_pivots_to_borrow), + std::make_move_iterator(sibling.child_pages.end())); + sibling.child_pages.erase(sibling.child_pages.end() - num_pivots_to_borrow, + sibling.child_pages.end()); + } + + //----- --- -- - - - - + // Modify the update buffers of both `this` and `sibling`. + // Calculate the pivot range to borrow from the sibling, and then extract updates from the + // sibling's update buffer that contain this range. + // + i32 borrowed_min_pivot_i = -1; + KeyView borrowed_max_pivot_key; + if (right_sibling) { + borrowed_min_pivot_i = 0; + borrowed_max_pivot_key = sibling.get_pivot_key(num_pivots_to_borrow); + } else { + borrowed_min_pivot_i = sibling.pivot_count() - num_pivots_to_borrow; + borrowed_max_pivot_key = sibling.get_pivot_key(sibling.pivot_count()); + } + Interval borrowed_pivot_range{sibling.get_pivot_key(borrowed_min_pivot_i), + borrowed_max_pivot_key}; + + BatchUpdate borrowed_pivot_batch{ + .context = context, + .result_set = {}, + .edit_size_totals = None, + }; + + Status segment_load_status; + HasPageRefs has_page_refs{false}; + + BATT_ASSIGN_OK_RESULT( // + borrowed_pivot_batch.result_set, // + context.merge_compact_edits( // + /*max_key=*/borrowed_max_pivot_key, // + [&](MergeCompactor& compactor) -> Status { + sibling.push_levels_to_merge(compactor, + context.page_loader, + segment_load_status, + has_page_refs, + as_slice(sibling.update_buffer.levels), + /*min_pivot_i=*/borrowed_min_pivot_i, + /*only_pivot=*/false); + return OkStatus(); + })); + + BATT_REQUIRE_OK(segment_load_status); + + borrowed_pivot_batch.result_set.drop_key_range_half_open(Interval{ + borrowed_max_pivot_key, + sibling.key_upper_bound(), + }); + + borrowed_pivot_batch.edit_size_totals = None; + + // Adjust the update buffer levels metadata in the sibling now that the borrowed updates have + // been extracted. + // + usize remove_pivot_i = right_sibling ? 0 : sibling.pivot_count() - num_pivots_to_borrow; + for (Level& level : sibling.update_buffer.levels) { + batt::case_of( // + level, // + [](EmptyLevel&) { + // nothing to do + }, + [&](MergedLevel& merged_level) { + merged_level.result_set.drop_key_range_half_open(borrowed_pivot_range); + }, + [&](SegmentedLevel& segmented_level) { + for (usize segment_i = 0; segment_i < segmented_level.segment_count(); ++segment_i) { + Segment& segment = segmented_level.get_segment(segment_i); + // Iterate backwards, since calling `remove_bit` will shift the bitset. + // TODO [vsilai 12-6-2025]: consider writing a `remove_bits` function to modify the bit + // sets more efficiently? This would only be for active_pivots. + // + for (usize j = remove_pivot_i + num_pivots_to_borrow - 1; j >= remove_pivot_i; --j) { + segment.remove_pivot(j); + } + } + }); + } + + // Insert the borrowed updates into the update buffer. + // + BATT_REQUIRE_OK(this->update_buffer_insert(borrowed_pivot_batch)); + + usize insert_pivot_i = right_sibling ? this->pivot_count() : 0; + for (Level& level : this->update_buffer.levels) { + batt::case_of( // + level, // + [](EmptyLevel&) { + // nothing to do + }, + [&](MergedLevel& merged_level) { + // nothing to do + }, + [&](SegmentedLevel& segmented_level) { + for (usize segment_i = 0; segment_i < segmented_level.segment_count(); ++segment_i) { + Segment& segment = segmented_level.get_segment(segment_i); + for (usize j = insert_pivot_i; j < insert_pivot_i + num_pivots_to_borrow; ++j) { + segment.insert_pivot(j, /*is_active*/ false); + } + } + }); + } + + //----- --- -- - - - - + // Finally, update the children Subtree vector for both nodes. + // + if (right_sibling) { + this->children.insert(this->children.end(), + std::make_move_iterator(sibling.children.begin()), + std::make_move_iterator(sibling.children.begin() + num_pivots_to_borrow)); + sibling.children.erase(sibling.children.begin(), + sibling.children.begin() + num_pivots_to_borrow); + + BATT_ASSIGN_OK_RESULT( + this->max_key_, + this->children.back().get_max_key(context.page_loader, this->child_pages.back())); + } else { + this->children.insert(this->children.begin(), + std::make_move_iterator(sibling.children.end() - num_pivots_to_borrow), + std::make_move_iterator(sibling.children.end())); + sibling.children.erase(sibling.children.end() - num_pivots_to_borrow, sibling.children.end()); + + BATT_ASSIGN_OK_RESULT( + sibling.max_key_, + sibling.children.back().get_max_key(context.page_loader, sibling.child_pages.back())); + } + + return OkStatus(); +} + //==#==========+==+=+=++=+++++++++++-+-+--+----- --- -- - - - - // Status InMemoryNode::set_pivot_items_flushed(llfs::PageLoader& page_loader, @@ -1588,6 +2160,43 @@ void InMemoryNode::UpdateBuffer::SegmentedLevel::check_items_sorted( } } +//==#==========+==+=+=++=+++++++++++-+-+--+----- --- -- - - - - +// +/*static*/ StatusOr InMemoryNode::UpdateBuffer::concat_segmented_and_merged_level( + BatchUpdateContext& context, + MergedLevel& merged_level, + SegmentedLevel& segmented_level, + InMemoryNode& segmented_level_node) noexcept +{ + MergedLevel new_merged_level; + HasPageRefs has_page_refs{false}; + Status segment_load_status; + + BoxedSeq segmented_level_slices = + SegmentedLevelScanner{ + segmented_level_node, + segmented_level, + context.page_loader, + llfs::PinPageToJob::kDefault, + segment_load_status, + /*min_pivot_i=*/0} // + | seq::boxed(); + + BATT_ASSIGN_OK_RESULT( // + new_merged_level.result_set, + context.merge_compact_edits( // + global_max_key(), + [&](MergeCompactor& compactor) -> Status { + compactor.push_level(merged_level.result_set.live_edit_slices()); + compactor.push_level(std::move(segmented_level_slices)); + return OkStatus(); + })); + + BATT_REQUIRE_OK(segment_load_status); + + return new_merged_level; +} + //=#=#==#==#===============+=+=+=+=++=++++++++++++++-++-+--+-+----+--------------- //==#==========+==+=+=++=+++++++++++-+-+--+----- --- -- - - - - @@ -1672,6 +2281,27 @@ void InMemoryNode::UpdateBuffer::Segment::insert_pivot(i32 pivot_i, bool is_acti this->flushed_pivots = insert_bit(this->flushed_pivots, pivot_i, false); } +//==#==========+==+=+=++=+++++++++++-+-+--+----- --- -- - - - - +// +void InMemoryNode::UpdateBuffer::Segment::remove_pivot(i32 pivot_i) +{ + this->check_invariants(__FILE__, __LINE__); + auto on_scope_exit = batt::finally([&] { + this->check_invariants(__FILE__, __LINE__); + }); + + if (get_bit(this->flushed_pivots, pivot_i)) { + const i32 index = bit_rank(this->flushed_pivots, pivot_i); + BATT_ASSERT_GE(index, 0); + BATT_ASSERT_LT(index, this->flushed_item_upper_bound_.size()); + + this->flushed_item_upper_bound_.erase(this->flushed_item_upper_bound_.begin() + index); + } + + this->active_pivots = remove_bit(this->active_pivots, pivot_i); + this->flushed_pivots = remove_bit(this->flushed_pivots, pivot_i); +} + //==#==========+==+=+=++=+++++++++++-+-+--+----- --- -- - - - - // void InMemoryNode::UpdateBuffer::Segment::pop_front_pivots(i32 count) @@ -1725,6 +2355,24 @@ SmallFn InMemoryNode::UpdateBuffer::dump() const }; } +//==#==========+==+=+=++=+++++++++++-+-+--+----- --- -- - - - - +// +u64 InMemoryNode::UpdateBuffer::compute_active_pivots() const +{ + u64 active_pivots = 0; + for (const Level& level : this->levels) { + if (batt::is_case(level)) { + const SegmentedLevel& segmented_level = std::get(level); + for (usize segment_i = 0; segment_i < segmented_level.segment_count(); ++segment_i) { + const Segment& segment = segmented_level.get_segment(segment_i); + active_pivots |= segment.get_active_pivots(); + } + } + } + + return active_pivots; +} + //==#==========+==+=+=++=+++++++++++-+-+--+----- --- -- - - - - // SmallFn InMemoryNode::UpdateBuffer::EmptyLevel::dump() const diff --git a/src/turtle_kv/tree/in_memory_node.hpp b/src/turtle_kv/tree/in_memory_node.hpp index 33ed541..15ad720 100644 --- a/src/turtle_kv/tree/in_memory_node.hpp +++ b/src/turtle_kv/tree/in_memory_node.hpp @@ -144,6 +144,11 @@ struct InMemoryNode { */ void insert_pivot(i32 pivot_i, bool is_active); + /** \brief Removes a pivot bit in this->active_pivots and this->flushed_pivots at position + * `pivot_i`. + */ + void remove_pivot(i32 pivot_i); + /** \brief Removes the specified number (`count`) pivots from the front of this segment. This * is used while splitting a node's update buffer. */ @@ -318,6 +323,14 @@ struct InMemoryNode { return estimated; } + MergedLevel concat(MergedLevel& that) + { + return MergedLevel{ + .result_set = MergeCompactor::ResultSet::concat(std::move(this->result_set), + std::move(that.result_set)), + .segment_future_ids_ = {}}; + } + /** \brief Returns the number of segment leaf page build jobs added to the context. */ StatusOr start_serialize(const InMemoryNode& node, TreeSerializeContext& context); @@ -336,8 +349,16 @@ struct InMemoryNode { //+++++++++++-+-+--+----- --- -- - - - - + static StatusOr concat_segmented_and_merged_level( + BatchUpdateContext& context, // + MergedLevel& merged_level, + SegmentedLevel& segmented_level, + InMemoryNode& segmented_level_node) noexcept; + SmallFn dump() const; + u64 compute_active_pivots() const; + usize count_non_empty_levels() const { usize count = 0; @@ -525,10 +546,32 @@ struct InMemoryNode { */ Status try_flush(BatchUpdateContext& context); + /** \brief Attempt to collapse one level of the tree. Returns the node's single pivot. + */ + Subtree try_shrink(); + + /** \brief Merge the node in place with its right sibling. + * + * Returns nullptr if `sibling` is completely consumed; otherwise, returns the modified sibling + * since a borrow occurred. + */ + StatusOr> try_merge(BatchUpdateContext& context, + std::unique_ptr sibling) noexcept; + + /** \brief Attempts to make `this` (which needs a merge) viable by borrowing data + * from one of its siblings. Note that for this function, `sibling` does not have to be the right + * sibling. Both `this` and `sibling` are modified in place. + */ + Status try_borrow(BatchUpdateContext& context, InMemoryNode& sibling) noexcept; + /** \brief Splits the specified child, inserting a new pivot immediately after `pivot_i`. */ Status split_child(BatchUpdateContext& update_context, i32 pivot_i); + /** \brief Merges the specified child with a sibling. + */ + Status merge_child(BatchUpdateContext& update_context, i32 pivot_i) noexcept; + /** \brief Returns true iff there are no MergedLevels or unserialized Subtree children in this * node. */ diff --git a/src/turtle_kv/tree/in_memory_node.test.cpp b/src/turtle_kv/tree/in_memory_node.test.cpp index 490d30d..5bae61c 100644 --- a/src/turtle_kv/tree/in_memory_node.test.cpp +++ b/src/turtle_kv/tree/in_memory_node.test.cpp @@ -56,6 +56,7 @@ using turtle_kv::KVStoreScanner; using turtle_kv::LatencyMetric; using turtle_kv::LatencyTimer; using turtle_kv::make_memory_page_cache; +using turtle_kv::NeedsMerge; using turtle_kv::NeedsSplit; using turtle_kv::None; using turtle_kv::OkStatus; @@ -74,7 +75,9 @@ using turtle_kv::TreeOptions; using turtle_kv::TreeSerializeContext; using turtle_kv::ValueView; using turtle_kv::testing::RandomResultSetGenerator; +using turtle_kv::testing::RandomStringGenerator; +using llfs::get_key; using llfs::StableStringStore; using batt::getenv_as; @@ -131,6 +134,16 @@ void verify_table_point_queries(Table& expected_table, Table& actual_table, Rng& } } +void verify_deleted_point_queries(Table& expected_table, + Table& actual_table, + const std::vector& deleted_keys) +{ + for (const KeyView& key : deleted_keys) { + EXPECT_EQ(expected_table.get(key).status(), batt::StatusCode::kNotFound); + EXPECT_EQ(actual_table.get(key).status(), batt::StatusCode::kNotFound); + } +} + void verify_range_scan(LatencyMetric* scan_latency, Table& expected_table, const Slice>& actual_read_items, @@ -365,6 +378,8 @@ void SubtreeBatchUpdateScenario::run() usize total_items = 0; + std::vector pending_deletes; + for (usize i = 0; i < max_i; ++i) { BatchUpdate update{ .context = @@ -373,7 +388,7 @@ void SubtreeBatchUpdateScenario::run() .page_loader = *page_loader, .cancel_token = batt::CancelToken{}, }, - .result_set = result_set_generator(DecayToItem{}, rng, strings), + .result_set = result_set_generator(DecayToItem{}, rng, strings, pending_deletes), .edit_size_totals = None, }; update.update_edit_size_totals(); @@ -386,6 +401,19 @@ void SubtreeBatchUpdateScenario::run() Status table_update_status = update_table(expected_table, update.result_set); ASSERT_TRUE(table_update_status.ok()) << BATT_INSPECT(table_update_status); + if (my_id == 0) { + if (!pending_deletes.empty()) { + pending_deletes.clear(); + } + + if (i % 5 == 0) { + BATT_CHECK(pending_deletes.empty()); + for (const EditView& edit : update.result_set.get()) { + pending_deletes.emplace_back(edit.key); + } + } + } + StatusOr tree_height = tree.get_height(*page_loader); ASSERT_TRUE(tree_height.ok()) << BATT_INSPECT(tree_height); @@ -409,6 +437,10 @@ void SubtreeBatchUpdateScenario::run() verify_table_point_queries(expected_table, actual_table, rng, batt::log2_ceil(i))) << BATT_INSPECT(this->seed) << BATT_INSPECT(i); + ASSERT_NO_FATAL_FAILURE( + verify_deleted_point_queries(expected_table, actual_table, pending_deletes)) + << BATT_INSPECT(this->seed) << BATT_INSPECT(i); + if (((i + 1) % chi) == 0) { if (my_id == 0) { LOG(INFO) << "taking checkpoint..."; @@ -438,6 +470,10 @@ void SubtreeBatchUpdateScenario::run() verify_table_point_queries(expected_table, actual_table, rng, batt::log2_ceil(i))) << BATT_INSPECT(this->seed) << BATT_INSPECT(i); + ASSERT_NO_FATAL_FAILURE( + verify_deleted_point_queries(expected_table, actual_table, pending_deletes)) + << BATT_INSPECT(this->seed) << BATT_INSPECT(i); + { auto root_ptr = std::make_shared(tree.clone_serialized_or_panic()); std::unique_ptr scanner_page_job = page_cache->new_job(); @@ -493,4 +529,217 @@ void SubtreeBatchUpdateScenario::run() } } +TEST(InMemoryNodeTest, SubtreeDeletions) +{ + const usize key_size = 24; + const usize value_size = 100; + const usize chi = 4; + + TreeOptions tree_options = TreeOptions::with_default_values() // + .set_leaf_size(32 * kKiB) + .set_node_size(4 * kKiB) + .set_key_size_hint(key_size) + .set_value_size_hint(value_size); + + usize items_per_leaf = tree_options.flush_size() / tree_options.expected_item_size(); + usize total_batches = 81; + + std::vector keys; + keys.reserve(total_batches * items_per_leaf); + + std::string value_str = std::string(value_size, 'a'); + ValueView value = ValueView::from_str(value_str); + + std::default_random_engine rng{/*seed=*/3}; + RandomStringGenerator generate_key; + llfs::StableStringStore store; + std::unordered_set keys_set; + while (keys.size() < total_batches * items_per_leaf) { + KeyView key = generate_key(rng, store); + if (keys_set.contains(key)) { + continue; + } + keys_set.emplace(key); + keys.emplace_back(key); + } + std::sort(keys.begin(), keys.end(), llfs::KeyOrder{}); + + std::shared_ptr page_cache = + make_memory_page_cache(batt::Runtime::instance().default_scheduler(), + tree_options, + /*byte_capacity=*/1500 * kMiB); + + Subtree tree = Subtree::make_empty(); + ASSERT_TRUE(tree.is_serialized()); + + turtle_kv::OrderedMapTable> expected_table; + SubtreeTable actual_table{*page_cache, tree_options, tree}; + + batt::WorkerPool& worker_pool = batt::WorkerPool::null_pool(); + + Optional page_loader{*page_cache}; + + const auto create_insertion_batch = [&](usize batch_number) -> std::vector { + std::vector current_batch; + current_batch.reserve(items_per_leaf); + for (usize j = 0; j < items_per_leaf; ++j) { + current_batch.emplace_back(keys[(batch_number * items_per_leaf) + j], value); + } + + return current_batch; + }; + + const auto create_deletion_batch = [&](usize batch_number) -> std::vector { + std::vector current_batch; + current_batch.reserve(items_per_leaf); + + usize per_batch = items_per_leaf / total_batches; + usize batch_remainder = items_per_leaf % total_batches; + usize total_amount_per_batch = per_batch + (batch_number < batch_remainder ? 1 : 0); + + for (usize i = 0; i < total_batches; ++i) { + usize base_i = i * items_per_leaf; + usize offset = batch_number * per_batch + std::min(batch_number, batch_remainder); + + for (usize j = 0; j < total_amount_per_batch; ++j) { + current_batch.emplace_back(keys[base_i + offset + j], ValueView::deleted()); + } + } + BATT_CHECK_LE(current_batch.size(), items_per_leaf) << BATT_INSPECT(batch_number); + + return current_batch; + }; + + const auto apply_tree_updates = [&](auto batch_creation_func, bool perform_scan) { + for (usize i = 0; i < total_batches; ++i) { + std::vector current_batch = batch_creation_func(i); + + ResultSet result; + result.append(std::move(current_batch)); + + BatchUpdate update{ + .context = + BatchUpdateContext{ + .worker_pool = worker_pool, + .page_loader = *page_loader, + .cancel_token = batt::CancelToken{}, + }, + .result_set = std::move(result), + .edit_size_totals = None, + }; + update.update_edit_size_totals(); + + Status table_update_status = update_table(expected_table, update.result_set); + ASSERT_TRUE(table_update_status.ok()) << BATT_INSPECT(table_update_status); + + StatusOr tree_height_before = tree.get_height(*page_loader); + ASSERT_TRUE(tree_height_before.ok()) << BATT_INSPECT(tree_height_before); + + Status status = // + tree.apply_batch_update(tree_options, + ParentNodeHeight{*tree_height_before + 1}, + update, + /*key_upper_bound=*/global_max_key(), + IsRoot{true}); + + ASSERT_TRUE(status.ok()) << BATT_INSPECT(status) << BATT_INSPECT(i); + + StatusOr tree_height_after = tree.get_height(*page_loader); + ASSERT_TRUE(tree_height_after.ok()) << BATT_INSPECT(tree_height_after); + + if (*tree_height_after == 0) { + ASSERT_LT(*tree_height_after, *tree_height_before); + ASSERT_TRUE(tree.is_serialized()); + break; + } else { + ASSERT_FALSE(tree.is_serialized()); + } + + ASSERT_FALSE(batt::is_case(tree.get_viability())); + + ASSERT_NO_FATAL_FAILURE( + verify_table_point_queries(expected_table, actual_table, rng, batt::log2_ceil(i))) + << BATT_INSPECT(i); + + if (((i + 1) % chi) == 0) { + std::unique_ptr page_job = page_cache->new_job(); + TreeSerializeContext context{tree_options, *page_job, worker_pool}; + + Status start_status = tree.start_serialize(context); + ASSERT_TRUE(start_status.ok()) << BATT_INSPECT(start_status); + + Status build_status = context.build_all_pages(); + ASSERT_TRUE(build_status.ok()) << BATT_INSPECT(build_status); + + StatusOr finish_status = tree.finish_serialize(context); + ASSERT_TRUE(finish_status.ok()) << BATT_INSPECT(finish_status); + + page_job->new_root(*finish_status); + Status commit_status = llfs::unsafe_commit_job(std::move(page_job)); + ASSERT_TRUE(commit_status.ok()) << BATT_INSPECT(commit_status); + + ASSERT_NO_FATAL_FAILURE( + verify_table_point_queries(expected_table, actual_table, rng, batt::log2_ceil(i))) + << BATT_INSPECT(i); + + if (perform_scan) { + auto root_ptr = std::make_shared(tree.clone_serialized_or_panic()); + std::unique_ptr scanner_page_job = page_cache->new_job(); + + const usize scan_len = 20; + std::array, kMaxScanSize> scan_items_buffer; + KeyView min_key = update.result_set.get_min_key(); + + KVStoreScanner kv_scanner{*page_loader, + root_ptr->page_id_slot_or_panic(), + BATT_OK_RESULT_OR_PANIC(root_ptr->get_height(*page_loader)), + min_key, + tree_options.trie_index_sharded_view_size(), + None}; + + usize n_read = 0; + { + BATT_CHECK_OK(kv_scanner.start()); + for (auto& kv_pair : scan_items_buffer) { + Optional item = kv_scanner.next(); + if (!item) { + break; + } + kv_pair.first = item->key; + kv_pair.second = item->value; + ++n_read; + if (n_read == scan_len) { + break; + } + } + } + ASSERT_NO_FATAL_FAILURE(verify_range_scan(nullptr, + expected_table, + as_slice(scan_items_buffer.data(), n_read), + min_key, + scan_len)) + << BATT_INSPECT(i) << BATT_INSPECT_STR(min_key) << BATT_INSPECT(scan_len); + } + + page_loader.emplace(*page_cache); + } + } + }; + + LOG(INFO) << "Inserting key/value pairs into tree..."; + apply_tree_updates(create_insertion_batch, false); + + LOG(INFO) << "Deleting key/value pairs from tree..."; + for (usize i = 0; i < total_batches; ++i) { + bool perform_scan = i == 0 ? true : false; + StatusOr tree_height = tree.get_height(*page_loader); + ASSERT_TRUE(tree_height.ok()) << BATT_INSPECT(tree_height); + if (*tree_height > 0) { + apply_tree_updates(create_deletion_batch, perform_scan); + } else { + break; + } + } +} + } // namespace diff --git a/src/turtle_kv/tree/packed_leaf_page.hpp b/src/turtle_kv/tree/packed_leaf_page.hpp index 1a2ad3e..cb1b9ea 100644 --- a/src/turtle_kv/tree/packed_leaf_page.hpp +++ b/src/turtle_kv/tree/packed_leaf_page.hpp @@ -320,6 +320,8 @@ struct PackedLeafLayoutPlan { usize page_size; usize key_count; usize trie_index_reserved_size; + usize avg_key_len; + usize drop_count; usize trie_index_begin; usize trie_index_end; @@ -361,6 +363,8 @@ struct PackedLeafLayoutPlan { } void check_valid(std::string_view label) const; + + usize compute_trie_step_size() const; }; //==#==========+==+=+=++=+++++++++++-+-+--+----- --- -- - - - - @@ -370,6 +374,8 @@ BATT_OBJECT_PRINT_IMPL((inline), (page_size, key_count, trie_index_reserved_size, + avg_key_len, + drop_count, trie_index_begin, trie_index_end, leaf_header_begin, @@ -392,6 +398,43 @@ inline void PackedLeafLayoutPlan::check_valid(std::string_view label) const BATT_CHECK(this->is_valid()) << *this << BATT_INSPECT_STR(label); } +//==#==========+==+=+=++=+++++++++++-+-+--+----- --- -- - - - - +// +inline usize PackedLeafLayoutPlan::compute_trie_step_size() const +{ + BATT_CHECK_GT(this->key_count, 0); + BATT_CHECK_GT(this->avg_key_len, 0); + + // If there are no deleted items in this leaf, return 16. + // + if (this->drop_count == 0) { + return 16; + } + + usize trie_buffer_size = this->trie_index_end - this->trie_index_begin; + BATT_CHECK_GT(trie_buffer_size, 0); + + // Determine the number of pivot keys to intialize the trie with by using the size of the trie + // buffer and the average key length across the items in the leaf. + // + usize pivot_count = trie_buffer_size / this->avg_key_len; + usize step_size = (this->key_count + pivot_count - 1) / pivot_count; + + BATT_CHECK_GT(step_size, 0); + + // If the calculated step size is already a power of 2, return it now. + // + if ((step_size & (step_size - 1)) == 0) { + return step_size; + } + + // Otherwise, calculate the nearest power of 2 less than `step_size`. + // + i32 shift = batt::log2_floor(step_size); + BATT_CHECK_GE(shift, 0); + return usize{1} << shift; +} + //=#=#==#==#===============+=+=+=+=++=++++++++++++++-++-+--+-+----+--------------- // class PackedLeafLayoutPlanBuilder @@ -423,6 +466,7 @@ class PackedLeafLayoutPlanBuilder plan.page_size = this->page_size; plan.key_count = BATT_CHECKED_CAST(u32, this->key_count); plan.trie_index_reserved_size = this->trie_index_reserved_size; + plan.avg_key_len = plan.key_count > 0 ? this->key_data_size / plan.key_count : 0; usize offset = 0; const auto append = [&offset](usize size) { @@ -513,23 +557,21 @@ struct LeafItemsSummary { struct AddLeafItemsSummary { LeafItemsSummary operator()(const LeafItemsSummary& prior, const EditView& edit) const noexcept { - if (!decays_to_item(edit.value)) { - LOG(ERROR) << "TODO [tastolfi 2025-05-27] support deletes:" << BATT_INSPECT(edit); - - return LeafItemsSummary{ - .drop_count = prior.drop_count + 1, - .key_count = prior.key_count, - .key_data_size = prior.key_data_size, - .value_data_size = prior.value_data_size, - }; - } else { - return LeafItemsSummary{ - .drop_count = prior.drop_count, - .key_count = prior.key_count + 1, - .key_data_size = prior.key_data_size + (edit.key.size() + 4), - .value_data_size = prior.value_data_size + (1 + edit.value.size()), - }; + usize drop_count = prior.drop_count; + if (!decays_to_item(edit)) { + drop_count++; } + return LeafItemsSummary{ + .drop_count = drop_count, + .key_count = prior.key_count + 1, + .key_data_size = prior.key_data_size + (edit.key.size() + 4), + .value_data_size = prior.value_data_size + (1 + edit.value.size()), + }; + } + + LeafItemsSummary operator()(const LeafItemsSummary& prior, const ItemView& edit) const noexcept + { + return AddLeafItemsSummary{}(BATT_FORWARD(prior), EditView::from_item_view(edit)); } LeafItemsSummary operator()(const LeafItemsSummary& left, @@ -556,8 +598,6 @@ template LeafItemsSummary{}, AddLeafItemsSummary{}); - BATT_CHECK_EQ(summary.drop_count, 0); - PackedLeafLayoutPlanBuilder plan_builder; plan_builder.page_size = page_size; @@ -568,6 +608,8 @@ template PackedLeafLayoutPlan plan = plan_builder.build(); + plan.drop_count = summary.drop_count; + return plan; } @@ -693,7 +735,7 @@ inline PackedLeafPage* build_leaf_page(MutableBuffer buffer, if (plan.trie_index_reserved_size > 0) { const MutableBuffer trie_buffer{(void*)advance_pointer(buffer.data(), plan.trie_index_begin), plan.trie_index_end - plan.trie_index_begin}; - usize step_size = 16; + usize step_size = plan.compute_trie_step_size(); bool retried = false; batt::SmallVec pivot_keys; for (;;) { diff --git a/src/turtle_kv/tree/subtree.cpp b/src/turtle_kv/tree/subtree.cpp index a104b05..9650952 100644 --- a/src/turtle_kv/tree/subtree.cpp +++ b/src/turtle_kv/tree/subtree.cpp @@ -139,14 +139,7 @@ Status Subtree::apply_batch_update(const TreeOptions& tree_options, auto new_leaf = std::make_unique(llfs::PinnedPage{}, tree_options); - new_leaf->result_set = update.result_set; - - if (!update.edit_size_totals) { - update.update_edit_size_totals(); - } - - new_leaf->set_edit_size_totals(std::move(*update.edit_size_totals)); - update.edit_size_totals = None; + BATT_REQUIRE_OK(new_leaf->apply_batch_update(update, /*current_result_set*/ None)); return Subtree{std::move(new_leaf)}; } @@ -175,18 +168,9 @@ Status Subtree::apply_batch_update(const TreeOptions& tree_options, auto new_leaf = std::make_unique(batt::make_copy(pinned_page), tree_options); - BATT_ASSIGN_OK_RESULT( // - new_leaf->result_set, - update.context.merge_compact_edits( // - global_max_key(), - [&](MergeCompactor& compactor) -> Status { - compactor.push_level(update.result_set.live_edit_slices()); - compactor.push_level(packed_leaf.as_edit_slice_seq()); - return OkStatus(); - })); - - new_leaf->set_edit_size_totals( - update.context.compute_running_total(new_leaf->result_set)); + BATT_REQUIRE_OK( + new_leaf->apply_batch_update(update, + /*current_result_set*/ packed_leaf.as_edit_slice_seq())); return Subtree{std::move(new_leaf)}; @@ -214,19 +198,9 @@ Status Subtree::apply_batch_update(const TreeOptions& tree_options, BATT_CHECK_EQ(parent_height, 2); - BATT_ASSIGN_OK_RESULT( - in_memory_leaf->result_set, - update.context.merge_compact_edits( - global_max_key(), - [&](MergeCompactor& compactor) -> Status { - compactor.push_level(update.result_set.live_edit_slices()); - compactor.push_level(in_memory_leaf->result_set.live_edit_slices()); - return OkStatus(); - })); - - in_memory_leaf->result_set.update_has_page_refs(update.result_set.has_page_refs()); - in_memory_leaf->set_edit_size_totals( - update.context.compute_running_total(in_memory_leaf->result_set)); + BATT_REQUIRE_OK(in_memory_leaf->apply_batch_update( + update, + /*current_result_set*/ in_memory_leaf->result_set.live_edit_slices())); return Subtree{std::move(in_memory_leaf)}; }, @@ -256,6 +230,16 @@ Status Subtree::apply_batch_update(const TreeOptions& tree_options, return OkStatus(); }, [&](NeedsSplit needs_split) { + // TODO [vsilai 12-9-2025]: revist when VLDB changes are merged in. + // + if (needs_split.too_many_segments && !needs_split.too_many_pivots && + !needs_split.keys_too_large) { + Status flush_status = new_subtree->try_flush(update.context); + if (flush_status.ok() && batt::is_case(new_subtree->get_viability())) { + return OkStatus(); + } + } + Status status = new_subtree->split_and_grow(update.context, tree_options, key_upper_bound); @@ -265,9 +249,18 @@ Status Subtree::apply_batch_update(const TreeOptions& tree_options, return status; }, [&](const NeedsMerge& needs_merge) { - BATT_CHECK(!needs_merge.single_pivot) - << "TODO [tastolfi 2025-03-26] implement flush and shrink"; - return OkStatus(); + // Only perform a flush and shrink if the root has a single pivot. + // + if (!needs_merge.single_pivot) { + return OkStatus(); + } + + Status status = new_subtree->flush_and_shrink(update.context); + + if (!status.ok()) { + LOG(INFO) << "flush_and_shrink failed;" << BATT_INSPECT(needs_merge); + } + return status; })); } @@ -306,6 +299,52 @@ Status Subtree::split_and_grow(BatchUpdateContext& context, return OkStatus(); } +//==#==========+==+=+=++=+++++++++++-+-+--+----- --- -- - - - - +// +Status Subtree::flush_and_shrink(BatchUpdateContext& context) noexcept +{ + BATT_CHECK(!this->locked_.load()); + + BATT_CHECK(!this->is_serialized()); + + while (!is_root_viable(this->get_viability())) { + // First, try flushing. If flushing makes the root viable, return immediately. + // + Status flush_status = this->try_flush(context); + if (flush_status != OkStatus() && flush_status != batt::StatusCode::kUnavailable) { + return flush_status; + } + + SubtreeViability current_viability = this->get_viability(); + if (is_root_viable(current_viability)) { + break; + } + + // Nothing was available to flush since the node's update buffer is empty. Try collapsing one + // level of the tree. + // + if (flush_status == batt::StatusCode::kUnavailable) { + // Note: At this point, we must have a node and not a leaf, since the `is_root_viable` check + // above will return Viable` for a leaf and we break out of the loop in that case. + // + BATT_REQUIRE_OK(this->try_shrink()); + } + } + + // If the root is a leaf and there are no items in the leaf, set the root to be an empty subtree. + // + if (batt::is_case>(this->impl_)) { + std::unique_ptr& root_leaf = std::get>(this->impl_); + BATT_CHECK(root_leaf); + + if (!root_leaf->get_item_count()) { + this->impl_ = llfs::PageIdSlot::from_page_id(llfs::PageId{}); + } + } + + return OkStatus(); +} + //==#==========+==+=+=++=+++++++++++-+-+--+----- --- -- - - - - // StatusOr Subtree::get_height(llfs::PageLoader& page_loader) const @@ -526,6 +565,40 @@ StatusOr> Subtree::try_split(BatchUpdateContext& context) }); } +//==#==========+==+=+=++=+++++++++++-+-+--+----- --- -- - - - - +// +StatusOr> Subtree::try_merge(BatchUpdateContext& context, + Subtree&& sibling) noexcept +{ + BATT_CHECK(!this->locked_.load()); + + return batt::case_of( + this->impl_, + + [&](const llfs::PageIdSlot& page_id_slot) -> StatusOr> { + BATT_PANIC() << "Cannot try merging a serialized subtree!"; + + return {batt::StatusCode::kUnimplemented}; + }, + + [&](auto& in_memory) -> StatusOr> { + using PtrT = std::decay_t; + + BATT_CHECK(batt::is_case(sibling.impl_)); + auto& sibling_ptr = std::get(sibling.impl_); + BATT_CHECK(sibling_ptr); + + BATT_ASSIGN_OK_RESULT(PtrT merged_subtree, + in_memory->try_merge(context, std::move(sibling_ptr))); + + if (merged_subtree == nullptr) { + return Optional{None}; + } + + return {Subtree{std::move(merged_subtree)}}; + }); +} + //==#==========+==+=+=++=+++++++++++-+-+--+----- --- -- - - - - // Status Subtree::try_flush(BatchUpdateContext& context) @@ -540,7 +613,7 @@ Status Subtree::try_flush(BatchUpdateContext& context) }, [&](const std::unique_ptr& leaf [[maybe_unused]]) -> Status { - return OkStatus(); + return {batt::StatusCode::kUnavailable}; }, [&](const std::unique_ptr& node) -> Status { @@ -548,6 +621,34 @@ Status Subtree::try_flush(BatchUpdateContext& context) }); } +//==#==========+==+=+=++=+++++++++++-+-+--+----- --- -- - - - - +// +Status Subtree::try_shrink() noexcept +{ + BATT_CHECK(!this->locked_.load()); + + StatusOr new_root = batt::case_of( + this->impl_, + + [&](const llfs::PageIdSlot& page_id_slot [[maybe_unused]]) -> StatusOr { + return {batt::StatusCode::kUnimplemented}; + }, + + [&](const std::unique_ptr& leaf [[maybe_unused]]) -> StatusOr { + return {batt::StatusCode::kUnavailable}; + }, + + [&](const std::unique_ptr& node) -> StatusOr { + return node->try_shrink(); + }); + + BATT_REQUIRE_OK(new_root); + + this->impl_ = std::move(new_root->impl_); + + return OkStatus(); +} + //==#==========+==+=+=++=+++++++++++-+-+--+----- --- -- - - - - // llfs::PackedPageId Subtree::packed_page_id_or_panic() const @@ -645,4 +746,55 @@ void Subtree::lock() this->locked_.store(true); } +//==#==========+==+=+=++=+++++++++++-+-+--+----- --- -- - - - - +// +Status Subtree::unpack_if_necessary(llfs::PageLoader& page_loader, + batt::WorkerPool& worker_pool, + const TreeOptions& tree_options, + i32 height) noexcept +{ + BATT_CHECK_GT(height, 0); + + if (this->is_serialized()) { + llfs::PageIdSlot& page_id_slot = std::get(this->impl_); + + BATT_CHECK(page_id_slot.is_valid()); + + const llfs::PageLayoutId expected_layout = Subtree::expected_layout_for_height(height); + + StatusOr status_or_pinned_page = page_id_slot.load_through( + page_loader, + llfs::PageLoadOptions{ + expected_layout, + llfs::PinPageToJob::kDefault, + llfs::OkIfNotFound{false}, + llfs::LruPriority{(height > 2) ? kNodeLruPriority : kLeafLruPriority}, + }); + + BATT_REQUIRE_OK(status_or_pinned_page) << BATT_INSPECT(height); + + llfs::PinnedPage& pinned_page = *status_or_pinned_page; + + if (height == 1) { + const PackedLeafPage& packed_leaf = PackedLeafPage::view_of(pinned_page); + + std::unique_ptr new_leaf = InMemoryLeaf::unpack(batt::make_copy(pinned_page), + tree_options, + packed_leaf, + worker_pool); + + this->impl_ = std::move(new_leaf); + } else { + const PackedNodePage& packed_node = PackedNodePage::view_of(pinned_page); + + BATT_ASSIGN_OK_RESULT( + std::unique_ptr node, + InMemoryNode::unpack(batt::make_copy(pinned_page), tree_options, packed_node)); + + this->impl_ = std::move(node); + } + } + + return OkStatus(); +} } // namespace turtle_kv diff --git a/src/turtle_kv/tree/subtree.hpp b/src/turtle_kv/tree/subtree.hpp index 1dc48b6..1e3981c 100644 --- a/src/turtle_kv/tree/subtree.hpp +++ b/src/turtle_kv/tree/subtree.hpp @@ -138,10 +138,23 @@ class Subtree */ StatusOr> try_split(BatchUpdateContext& context); - /** \brief Attempt to make the root viable by flushing a batch. + /** \brief Attempts to merge the given Subtree in place with its right sibling. + * + * If the in place merge is successful, `sibling` is completely consumed and `None` is returned. + * + * If a borrow needs to occur, `this` is modified in place and the modified sibling is returned. + */ + StatusOr> try_merge(BatchUpdateContext& context, Subtree&& sibling) noexcept; + + /** \brief Attempt to make the root viable by flushing a batch. If nothing is available to + * flush, returns batt::StatusCode::kUnavailable. */ Status try_flush(BatchUpdateContext& context); + /** \brief Attempt to collapse a level of the tree. + */ + Status try_shrink() noexcept; + /** \brief Returns true iff this Subtree has no in-memory modifications. */ bool is_serialized() const; @@ -172,12 +185,26 @@ class Subtree */ bool is_locked() const; + /** \brief Converts a serialized Subtree to its in-memory equivalent, modifying the Subtree in + * place. If the Subtree is already an in-memory type, this function does nothing. + */ + Status unpack_if_necessary(llfs::PageLoader& page_loader, + batt::WorkerPool& worker_pool, + const TreeOptions& tree_options, + i32 height) noexcept; + //+++++++++++-+-+--+----- --- -- - - - - private: Status split_and_grow(BatchUpdateContext& context, const TreeOptions& tree_options, const KeyView& key_upper_bound); + /** \brief Called when the root of the tree is a node with a single pivot. This function + * flushes the root's update buffer until its is either empty + * (causing the tree to shrink in height) or until it gains more pivots. + */ + Status flush_and_shrink(BatchUpdateContext& context) noexcept; + //+++++++++++-+-+--+----- --- -- - - - - std::variant, std::unique_ptr> diff --git a/src/turtle_kv/tree/subtree_viability.hpp b/src/turtle_kv/tree/subtree_viability.hpp index d2984e0..de00f60 100644 --- a/src/turtle_kv/tree/subtree_viability.hpp +++ b/src/turtle_kv/tree/subtree_viability.hpp @@ -108,4 +108,19 @@ inline bool compacting_levels_might_fix(const SubtreeViability& viability) }); } +inline bool is_root_viable(const SubtreeViability& viability) +{ + return batt::case_of( + viability, + [](const Viable&) { + return true; + }, + [](const NeedsSplit&) { + return false; + }, + [](const NeedsMerge& needs_merge) { + return !needs_merge.single_pivot; + }); +} + } // namespace turtle_kv diff --git a/src/turtle_kv/tree/testing/random_leaf_generator.hpp b/src/turtle_kv/tree/testing/random_leaf_generator.hpp index 7f128be..eb1b74b 100644 --- a/src/turtle_kv/tree/testing/random_leaf_generator.hpp +++ b/src/turtle_kv/tree/testing/random_leaf_generator.hpp @@ -59,7 +59,7 @@ class RandomLeafGenerator // Generate a sorted run of random key/value pairs. // - result.result_set = this->items_generator_(decay_to_items, rng, store); + result.result_set = this->items_generator_(decay_to_items, rng, store, {}); batt::WorkerPool& worker_pool = batt::WorkerPool::null_pool(); @@ -72,7 +72,7 @@ class RandomLeafGenerator // Compute a running total of packed sizes, so we can split the result set in to leaf pages. // batt::RunningTotal running_total = - compute_running_total(worker_pool, result.result_set, DecayToItem{}); + compute_running_total(worker_pool, result.result_set, DecayToItem{}); SplitParts page_parts = split_parts( // running_total, //