From b5fa995495099e2d9da3daf377e5a2080cbbc4d4 Mon Sep 17 00:00:00 2001 From: xufei Date: Mon, 29 Sep 2025 17:32:42 +0800 Subject: [PATCH 1/6] save work Signed-off-by: xufei --- include/pingcap/kv/LockResolver.h | 22 +++++++--------- src/kv/LockResolver.cc | 44 +++++++++++++++++++++++++++++++ 2 files changed, 53 insertions(+), 13 deletions(-) diff --git a/include/pingcap/kv/LockResolver.h b/include/pingcap/kv/LockResolver.h index 1376347..77078e5 100644 --- a/include/pingcap/kv/LockResolver.h +++ b/include/pingcap/kv/LockResolver.h @@ -9,6 +9,8 @@ #include #include +#include "kvrpcpb.pb.h" + namespace pingcap { namespace kv @@ -23,22 +25,14 @@ struct TxnStatus ::kvrpcpb::Action action; std::optional<::kvrpcpb::LockInfo> primary_lock; bool isCommitted() const { return ttl == 0 && commit_ts > 0; } + bool isRollback() const + { + return ttl == 0 && (action == kvrpcpb::Action::NoAction || action == kvrpcpb::Action::TTLExpireRollback || action == kvrpcpb::Action::LockNotExistRollback); + } bool isCacheable() const { - if (isCommitted()) - { - return true; - } - if (ttl == 0) - { - if (action == kvrpcpb::Action::NoAction || action == kvrpcpb::Action::LockNotExistRollback - || action == kvrpcpb::Action::TTLExpireRollback) - { - return true; - } - } - return false; + return isCommitted() || isRollback(); } }; @@ -225,6 +219,8 @@ class LockResolver int64_t resolveLocks(Backoffer & bo, uint64_t caller_start_ts, std::vector & locks, std::vector & pushed); + int64_t getBypassLockTs(Backoffer & bo, uint64_t caller_start_ts, std::vector & locks, std::vector & bypass_lock_ts); + int64_t resolveLocks( Backoffer & bo, uint64_t caller_start_ts, diff --git a/src/kv/LockResolver.cc b/src/kv/LockResolver.cc index 40dc6ef..ef69428 100644 --- a/src/kv/LockResolver.cc +++ b/src/kv/LockResolver.cc @@ -19,6 +19,50 @@ int64_t LockResolver::resolveLocks(Backoffer & bo, uint64_t caller_start_ts, std return resolveLocks(bo, caller_start_ts, locks, pushed, false); } +int64_t LockResolver::getBypassLockTs( + Backoffer & bo, + uint64_t caller_start_ts, + std::vector & locks, + std::vector & bypass_lock_ts) +{ + TxnExpireTime before_txn_expired; + if (locks.empty()) + return before_txn_expired.value(); + bypass_lock_ts.reserve(locks.size()); + for (auto & lock : locks) + { + TxnStatus status; + try + { + status = getTxnStatusFromLock(bo, lock, caller_start_ts, false); + } + catch (Exception & e) + { + log->warning("get txn status failed: " + e.displayText()); + // each lock is independent, so we can continue to check other locks + continue; + } + + if (status.ttl == 0) + { + if (status.isRollback() || (status.isCommitted() && status.commit_ts > caller_start_ts)) + { + bypass_lock_ts.push_back(lock->txn_id); + } + } + else // status.ttl != 0 + { + auto before_txn_expired_time = cluster->oracle->untilExpired(lock->txn_id, status.ttl); + before_txn_expired.update(before_txn_expired_time); + if (status.action == ::kvrpcpb::MinCommitTSPushed) + { + bypass_lock_ts.push_back(lock->txn_id); + } + } + } + return before_txn_expired.value(); +} + int64_t LockResolver::resolveLocks( Backoffer & bo, uint64_t caller_start_ts, From 905c028c7b03f9b7ece94bd65d87245a3b450c7a Mon Sep 17 00:00:00 2001 From: xufei Date: Mon, 13 Oct 2025 17:14:56 +0800 Subject: [PATCH 2/6] save work Signed-off-by: xufei --- include/pingcap/coprocessor/Client.h | 1 - include/pingcap/kv/Backoff.h | 1 + include/pingcap/kv/Cluster.h | 2 + include/pingcap/kv/LockResolver.h | 14 ++++- src/kv/Cluster.cc | 6 ++ src/kv/LockResolver.cc | 83 +++++++++++++++++++++++----- 6 files changed, 90 insertions(+), 17 deletions(-) diff --git a/include/pingcap/coprocessor/Client.h b/include/pingcap/coprocessor/Client.h index 945d681..ed85698 100644 --- a/include/pingcap/coprocessor/Client.h +++ b/include/pingcap/coprocessor/Client.h @@ -5,7 +5,6 @@ #include #include -#include #include #include #include diff --git a/include/pingcap/kv/Backoff.h b/include/pingcap/kv/Backoff.h index 4019767..19839df 100644 --- a/include/pingcap/kv/Backoff.h +++ b/include/pingcap/kv/Backoff.h @@ -98,6 +98,7 @@ constexpr int prewriteMaxBackoff = 20000; constexpr int commitMaxBackoff = 41000; constexpr int splitRegionBackoff = 20000; constexpr int cleanupMaxBackoff = 20000; +constexpr int bgResolveLockMaxBackoff = 20000; constexpr int copBuildTaskMaxBackoff = 5000; constexpr int copNextMaxBackoff = 60000; constexpr int pessimisticLockMaxBackoff = 20000; diff --git a/include/pingcap/kv/Cluster.h b/include/pingcap/kv/Cluster.h index ec2eac0..c23870f 100644 --- a/include/pingcap/kv/Cluster.h +++ b/include/pingcap/kv/Cluster.h @@ -67,6 +67,8 @@ struct Cluster mpp_prober->stop(); if (region_cache) region_cache->stop(); + if (lock_resolver) + lock_resolver->stopBgResolve(); thread_pool->stop(); } diff --git a/include/pingcap/kv/LockResolver.h b/include/pingcap/kv/LockResolver.h index 77078e5..e453737 100644 --- a/include/pingcap/kv/LockResolver.h +++ b/include/pingcap/kv/LockResolver.h @@ -5,9 +5,11 @@ #include #include +#include #include #include #include +#include #include "kvrpcpb.pb.h" @@ -207,6 +209,10 @@ class LockResolver cluster = cluster_; } + void backgroundResolve(); + void addPendingLocksForBgResolve(uint64_t caller_start_ts, const std::vector & locks); + void stopBgResolve(); + // resolveLocks tries to resolve Locks. The resolving process is in 3 steps: // 1) Use the `lockTTL` to pick up all expired locks. Only locks that are too // old are considered orphan locks and will be handled later. If all locks @@ -219,7 +225,7 @@ class LockResolver int64_t resolveLocks(Backoffer & bo, uint64_t caller_start_ts, std::vector & locks, std::vector & pushed); - int64_t getBypassLockTs(Backoffer & bo, uint64_t caller_start_ts, std::vector & locks, std::vector & bypass_lock_ts); + int64_t getBypassLockTs(Backoffer & bo, uint64_t caller_start_ts, const std::unordered_map> & locks, std::vector & bypass_lock_ts); int64_t resolveLocks( Backoffer & bo, @@ -292,6 +298,12 @@ class LockResolver std::unordered_map resolved; std::queue cached; + // fields for background resolve + std::mutex bg_mutex; + std::condition_variable bg_cv; + std::atomic stopped{false}; + std::vector>> pending_locks; + Logger * log; }; diff --git a/src/kv/Cluster.cc b/src/kv/Cluster.cc index 70b6c90..906b0d7 100644 --- a/src/kv/Cluster.cc +++ b/src/kv/Cluster.cc @@ -42,6 +42,12 @@ void Cluster::startBackgroundTasks() region_cache->updateCachePeriodically(); }); } + if (lock_resolver) + { + thread_pool->enqueue([this] { + lock_resolver->backgroundResolve(); + }); + } } } // namespace kv diff --git a/src/kv/LockResolver.cc b/src/kv/LockResolver.cc index ef69428..ddf725f 100644 --- a/src/kv/LockResolver.cc +++ b/src/kv/LockResolver.cc @@ -1,7 +1,13 @@ #include +#include #include #include +#include +#include +#include +#include + namespace pingcap { namespace kv @@ -22,19 +28,22 @@ int64_t LockResolver::resolveLocks(Backoffer & bo, uint64_t caller_start_ts, std int64_t LockResolver::getBypassLockTs( Backoffer & bo, uint64_t caller_start_ts, - std::vector & locks, + const std::unordered_map> & locks, std::vector & bypass_lock_ts) { TxnExpireTime before_txn_expired; if (locks.empty()) return before_txn_expired.value(); bypass_lock_ts.reserve(locks.size()); - for (auto & lock : locks) + for (const auto & lock_entry : locks) { + // should not happen, just for safety + if (lock_entry.second.empty()) + continue; TxnStatus status; try { - status = getTxnStatusFromLock(bo, lock, caller_start_ts, false); + status = getTxnStatusFromLock(bo, lock_entry.second[0], caller_start_ts, false); } catch (Exception & e) { @@ -47,16 +56,23 @@ int64_t LockResolver::getBypassLockTs( { if (status.isRollback() || (status.isCommitted() && status.commit_ts > caller_start_ts)) { - bypass_lock_ts.push_back(lock->txn_id); + bypass_lock_ts.push_back(lock_entry.first); + } + if (status.isRollback() || status.isCommitted()) + { + // resolve lock in background threads if the status is determined + // todo resolve async locks on the fly since the size of async locks are limited(less than 256), the resolve cost should be small + // once async locks is resolved, even if status.isCommmited() < caller_start_ts, it will not block tiflash's read + addPendingLocksForBgResolve(caller_start_ts, lock_entry.second); } } else // status.ttl != 0 { - auto before_txn_expired_time = cluster->oracle->untilExpired(lock->txn_id, status.ttl); + auto before_txn_expired_time = cluster->oracle->untilExpired(lock_entry.first, status.ttl); before_txn_expired.update(before_txn_expired_time); if (status.action == ::kvrpcpb::MinCommitTSPushed) { - bypass_lock_ts.push_back(lock->txn_id); + bypass_lock_ts.push_back(lock_entry.first); } } } @@ -74,7 +90,6 @@ int64_t LockResolver::resolveLocks( if (locks.empty()) return before_txn_expired.value(); std::unordered_map> clean_txns; - bool push_fail = false; if (!for_write) { pushed.reserve(locks.size()); @@ -94,7 +109,6 @@ int64_t LockResolver::resolveLocks( { log->warning("get txn status failed: " + e.displayText()); before_txn_expired.update(0); - pushed.clear(); return before_txn_expired.value(); } @@ -141,7 +155,6 @@ int64_t LockResolver::resolveLocks( { log->warning("resolve txn failed: " + e.displayText()); before_txn_expired.update(0); - pushed.clear(); return before_txn_expired.value(); } } @@ -158,7 +171,6 @@ int64_t LockResolver::resolveLocks( if (lock->lock_type != ::kvrpcpb::PessimisticLock && lock->txn_id > caller_start_ts) { log->warning("write conflict detected"); - pushed.clear(); // TODO: throw write conflict exception throw Exception("write conflict", ErrorCodes::UnknownError); } @@ -167,7 +179,6 @@ int64_t LockResolver::resolveLocks( { if (status.action != ::kvrpcpb::MinCommitTSPushed) { - push_fail = true; break; } pushed.push_back(lock->txn_id); @@ -176,10 +187,6 @@ int64_t LockResolver::resolveLocks( break; } } - if (push_fail) - { - pushed.clear(); - } return before_txn_expired.value(); } @@ -580,6 +587,52 @@ TxnStatus LockResolver::getTxnStatusFromLock(Backoffer & bo, LockPtr lock, uint6 } } +void LockResolver::backgroundResolve() +{ + while (!stopped.load()) + { + std::vector>> to_resolve; + { + std::unique_lock lk(bg_mutex); + bg_cv.wait(lk, [this] { + return !pending_locks.empty() || stopped.load(); + }); + if (stopped.load()) + { + return; + } + pending_locks.swap(to_resolve); + } + + for (auto & lock_entry : to_resolve) + { + pingcap::kv::Backoffer bo(pingcap::kv::bgResolveLockMaxBackoff); + try + { + std::vector ignored; + resolveLocks(bo, lock_entry.first, lock_entry.second, ignored); + } + catch (...) + { + // ignore all errors, and do not retry. Let the next reader to resolve it again. + } + } + } +} + +void LockResolver::addPendingLocksForBgResolve(uint64_t caller_start_ts, const std::vector & locks) +{ + std::unique_lock lk(bg_mutex); + pending_locks.push_back({caller_start_ts, locks}); + bg_cv.notify_one(); +} + +void LockResolver::stopBgResolve() +{ + std::unique_lock lk(bg_mutex); + stopped.store(true); + bg_cv.notify_all(); +} } // namespace kv } // namespace pingcap From 01176900a8b79c488e7d0cdbf64cbc5676a1de56 Mon Sep 17 00:00:00 2001 From: xufei Date: Mon, 13 Oct 2025 18:06:54 +0800 Subject: [PATCH 3/6] save work Signed-off-by: xufei --- include/pingcap/kv/LockResolver.h | 2 -- 1 file changed, 2 deletions(-) diff --git a/include/pingcap/kv/LockResolver.h b/include/pingcap/kv/LockResolver.h index e453737..6a3cb10 100644 --- a/include/pingcap/kv/LockResolver.h +++ b/include/pingcap/kv/LockResolver.h @@ -11,8 +11,6 @@ #include #include -#include "kvrpcpb.pb.h" - namespace pingcap { namespace kv From 9f18e27852e64724080ee573a38710f97e2fa229 Mon Sep 17 00:00:00 2001 From: xufei Date: Tue, 14 Oct 2025 10:09:11 +0800 Subject: [PATCH 4/6] save work Signed-off-by: xufei --- include/pingcap/kv/LockResolver.h | 3 ++- src/kv/LockResolver.cc | 12 +++++++++--- 2 files changed, 11 insertions(+), 4 deletions(-) diff --git a/include/pingcap/kv/LockResolver.h b/include/pingcap/kv/LockResolver.h index 6a3cb10..b5a2119 100644 --- a/include/pingcap/kv/LockResolver.h +++ b/include/pingcap/kv/LockResolver.h @@ -223,7 +223,8 @@ class LockResolver int64_t resolveLocks(Backoffer & bo, uint64_t caller_start_ts, std::vector & locks, std::vector & pushed); - int64_t getBypassLockTs(Backoffer & bo, uint64_t caller_start_ts, const std::unordered_map> & locks, std::vector & bypass_lock_ts); + // tryGetBypassLock checks the status of the transactions which own the locks in `locks`, and collect the txn ids which can be bypassed + int64_t tryGetBypassLock(Backoffer & bo, uint64_t caller_start_ts, const std::unordered_map> & locks, std::vector & bypass_lock_ts); int64_t resolveLocks( Backoffer & bo, diff --git a/src/kv/LockResolver.cc b/src/kv/LockResolver.cc index ddf725f..39d2497 100644 --- a/src/kv/LockResolver.cc +++ b/src/kv/LockResolver.cc @@ -25,7 +25,7 @@ int64_t LockResolver::resolveLocks(Backoffer & bo, uint64_t caller_start_ts, std return resolveLocks(bo, caller_start_ts, locks, pushed, false); } -int64_t LockResolver::getBypassLockTs( +int64_t LockResolver::tryGetBypassLock( Backoffer & bo, uint64_t caller_start_ts, const std::unordered_map> & locks, @@ -54,15 +54,21 @@ int64_t LockResolver::getBypassLockTs( if (status.ttl == 0) { + if ((status.primary_lock.has_value() && status.primary_lock->use_async_commit())) + { + // todo resolve async locks on the fly since the size of async locks are limited(less than 256), the resolve cost should be small + // once async locks is resolved, even if status.isCommmited() < caller_start_ts, it will not block tiflash's read + addPendingLocksForBgResolve(caller_start_ts, lock_entry.second); + continue; + } if (status.isRollback() || (status.isCommitted() && status.commit_ts > caller_start_ts)) { + // the lock can be bypassed if the txn is rolled back or committed after caller_start_ts bypass_lock_ts.push_back(lock_entry.first); } if (status.isRollback() || status.isCommitted()) { // resolve lock in background threads if the status is determined - // todo resolve async locks on the fly since the size of async locks are limited(less than 256), the resolve cost should be small - // once async locks is resolved, even if status.isCommmited() < caller_start_ts, it will not block tiflash's read addPendingLocksForBgResolve(caller_start_ts, lock_entry.second); } } From c984d8a309051c83b8f4d47dc0b762ccb8217a9e Mon Sep 17 00:00:00 2001 From: xufei Date: Tue, 14 Oct 2025 13:46:18 +0800 Subject: [PATCH 5/6] save work Signed-off-by: xufei --- include/pingcap/kv/LockResolver.h | 2 +- src/kv/LockResolver.cc | 11 +++-------- 2 files changed, 4 insertions(+), 9 deletions(-) diff --git a/include/pingcap/kv/LockResolver.h b/include/pingcap/kv/LockResolver.h index b5a2119..08177e9 100644 --- a/include/pingcap/kv/LockResolver.h +++ b/include/pingcap/kv/LockResolver.h @@ -224,7 +224,7 @@ class LockResolver int64_t resolveLocks(Backoffer & bo, uint64_t caller_start_ts, std::vector & locks, std::vector & pushed); // tryGetBypassLock checks the status of the transactions which own the locks in `locks`, and collect the txn ids which can be bypassed - int64_t tryGetBypassLock(Backoffer & bo, uint64_t caller_start_ts, const std::unordered_map> & locks, std::vector & bypass_lock_ts); + void tryGetBypassLock(Backoffer & bo, uint64_t caller_start_ts, const std::unordered_map> & locks, std::vector & bypass_lock_ts); int64_t resolveLocks( Backoffer & bo, diff --git a/src/kv/LockResolver.cc b/src/kv/LockResolver.cc index 39d2497..436b126 100644 --- a/src/kv/LockResolver.cc +++ b/src/kv/LockResolver.cc @@ -25,15 +25,12 @@ int64_t LockResolver::resolveLocks(Backoffer & bo, uint64_t caller_start_ts, std return resolveLocks(bo, caller_start_ts, locks, pushed, false); } -int64_t LockResolver::tryGetBypassLock( +void LockResolver::tryGetBypassLock( Backoffer & bo, uint64_t caller_start_ts, const std::unordered_map> & locks, std::vector & bypass_lock_ts) { - TxnExpireTime before_txn_expired; - if (locks.empty()) - return before_txn_expired.value(); bypass_lock_ts.reserve(locks.size()); for (const auto & lock_entry : locks) { @@ -48,7 +45,7 @@ int64_t LockResolver::tryGetBypassLock( catch (Exception & e) { log->warning("get txn status failed: " + e.displayText()); - // each lock is independent, so we can continue to check other locks + // each txn is independent, so just continue to check other txns continue; } @@ -74,15 +71,13 @@ int64_t LockResolver::tryGetBypassLock( } else // status.ttl != 0 { - auto before_txn_expired_time = cluster->oracle->untilExpired(lock_entry.first, status.ttl); - before_txn_expired.update(before_txn_expired_time); if (status.action == ::kvrpcpb::MinCommitTSPushed) { + // min_commit_ts is pushed, so the lock can be bypassed bypass_lock_ts.push_back(lock_entry.first); } } } - return before_txn_expired.value(); } int64_t LockResolver::resolveLocks( From 9f026afbb7ca4bc98c9feb81ddc5d4a85d9223c9 Mon Sep 17 00:00:00 2001 From: xufei Date: Tue, 14 Oct 2025 14:11:09 +0800 Subject: [PATCH 6/6] save work Signed-off-by: xufei --- src/kv/LockResolver.cc | 80 +++++++++++++++++++++++------------------- 1 file changed, 44 insertions(+), 36 deletions(-) diff --git a/src/kv/LockResolver.cc b/src/kv/LockResolver.cc index 436b126..e947d67 100644 --- a/src/kv/LockResolver.cc +++ b/src/kv/LockResolver.cc @@ -31,53 +31,61 @@ void LockResolver::tryGetBypassLock( const std::unordered_map> & locks, std::vector & bypass_lock_ts) { - bypass_lock_ts.reserve(locks.size()); - for (const auto & lock_entry : locks) + try { - // should not happen, just for safety - if (lock_entry.second.empty()) - continue; - TxnStatus status; - try - { - status = getTxnStatusFromLock(bo, lock_entry.second[0], caller_start_ts, false); - } - catch (Exception & e) - { - log->warning("get txn status failed: " + e.displayText()); - // each txn is independent, so just continue to check other txns - continue; - } - - if (status.ttl == 0) + bypass_lock_ts.reserve(locks.size()); + for (const auto & lock_entry : locks) { - if ((status.primary_lock.has_value() && status.primary_lock->use_async_commit())) - { - // todo resolve async locks on the fly since the size of async locks are limited(less than 256), the resolve cost should be small - // once async locks is resolved, even if status.isCommmited() < caller_start_ts, it will not block tiflash's read - addPendingLocksForBgResolve(caller_start_ts, lock_entry.second); + // should not happen, just for safety + if (lock_entry.second.empty()) continue; + TxnStatus status; + try + { + status = getTxnStatusFromLock(bo, lock_entry.second[0], caller_start_ts, false); } - if (status.isRollback() || (status.isCommitted() && status.commit_ts > caller_start_ts)) + catch (Exception & e) { - // the lock can be bypassed if the txn is rolled back or committed after caller_start_ts - bypass_lock_ts.push_back(lock_entry.first); + log->warning("get txn status failed: " + e.displayText()); + // each txn is independent, so just continue to check other txns + continue; } - if (status.isRollback() || status.isCommitted()) + + if (status.ttl == 0) { - // resolve lock in background threads if the status is determined - addPendingLocksForBgResolve(caller_start_ts, lock_entry.second); + if ((status.primary_lock.has_value() && status.primary_lock->use_async_commit())) + { + // todo resolve async locks on the fly since the size of async locks are limited(less than 256), the resolve cost should be small + // once async locks is resolved, even if status.isCommmited() < caller_start_ts, it will not block tiflash's read + addPendingLocksForBgResolve(caller_start_ts, lock_entry.second); + continue; + } + if (status.isRollback() || (status.isCommitted() && status.commit_ts > caller_start_ts)) + { + // the lock can be bypassed if the txn is rolled back or committed after caller_start_ts + bypass_lock_ts.push_back(lock_entry.first); + } + if (status.isRollback() || status.isCommitted()) + { + // resolve lock in background threads if the status is determined + addPendingLocksForBgResolve(caller_start_ts, lock_entry.second); + } } - } - else // status.ttl != 0 - { - if (status.action == ::kvrpcpb::MinCommitTSPushed) + else // status.ttl != 0 { - // min_commit_ts is pushed, so the lock can be bypassed - bypass_lock_ts.push_back(lock_entry.first); + if (status.action == ::kvrpcpb::MinCommitTSPushed) + { + // min_commit_ts is pushed, so the lock can be bypassed + bypass_lock_ts.push_back(lock_entry.first); + } } } } + catch (...) + { + // tryGetBypassLock is just an optimization, should not throw any exception even fails + log->warning("tryGetBypassLock failed"); + } } int64_t LockResolver::resolveLocks( @@ -615,7 +623,7 @@ void LockResolver::backgroundResolve() } catch (...) { - // ignore all errors, and do not retry. Let the next reader to resolve it again. + // ignore all errors, and do not retry. Let the next reader to trigger resolve again. } } }