Skip to content
Open
4 changes: 4 additions & 0 deletions ydb/core/base/blobstorage.h
Original file line number Diff line number Diff line change
Expand Up @@ -919,6 +919,10 @@ struct TEvBlobStorage {
EvNodeWardenReadMetadataResult,
EvNodeWardenWriteMetadata,
EvNodeWardenWriteMetadataResult,
EvNodeWardenUpdateCache,
EvNodeWardenQueryCache,
EvNodeWardenQueryCacheResult,
EvNodeWardenUnsubscribeFromCache,

// Other
EvRunActor = EvPut + 15 * 512,
Expand Down
10 changes: 4 additions & 6 deletions ydb/core/blobstorage/base/blobstorage_events.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,10 @@

namespace NKikimr {

TEvNodeWardenStorageConfig::TEvNodeWardenStorageConfig(const NKikimrBlobStorage::TStorageConfig& config,
const NKikimrBlobStorage::TStorageConfig *proposedConfig, bool selfManagementEnabled)
: Config(std::make_unique<NKikimrBlobStorage::TStorageConfig>(config))
, ProposedConfig(proposedConfig
? std::make_unique<NKikimrBlobStorage::TStorageConfig>(*proposedConfig)
: nullptr)
TEvNodeWardenStorageConfig::TEvNodeWardenStorageConfig(std::shared_ptr<const NKikimrBlobStorage::TStorageConfig> config,
std::shared_ptr<const NKikimrBlobStorage::TStorageConfig> proposedConfig, bool selfManagementEnabled)
: Config(std::move(config))
, ProposedConfig(std::move(proposedConfig))
, SelfManagementEnabled(selfManagementEnabled)
{}

Expand Down
8 changes: 4 additions & 4 deletions ydb/core/blobstorage/base/blobstorage_events.h
Original file line number Diff line number Diff line change
Expand Up @@ -580,12 +580,12 @@ namespace NKikimr {
struct TEvNodeWardenStorageConfig
: TEventLocal<TEvNodeWardenStorageConfig, TEvBlobStorage::EvNodeWardenStorageConfig>
{
std::unique_ptr<NKikimrBlobStorage::TStorageConfig> Config;
std::unique_ptr<NKikimrBlobStorage::TStorageConfig> ProposedConfig;
std::shared_ptr<const NKikimrBlobStorage::TStorageConfig> Config;
std::shared_ptr<const NKikimrBlobStorage::TStorageConfig> ProposedConfig;
bool SelfManagementEnabled;

TEvNodeWardenStorageConfig(const NKikimrBlobStorage::TStorageConfig& config,
const NKikimrBlobStorage::TStorageConfig *proposedConfig, bool selfManagementEnabled);
TEvNodeWardenStorageConfig(std::shared_ptr<const NKikimrBlobStorage::TStorageConfig> config,
std::shared_ptr<const NKikimrBlobStorage::TStorageConfig> proposedConfig, bool selfManagementEnabled);
~TEvNodeWardenStorageConfig();
};

Expand Down
34 changes: 16 additions & 18 deletions ydb/core/blobstorage/nodewarden/distconf.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,12 @@
namespace NKikimr::NStorage {

TDistributedConfigKeeper::TDistributedConfigKeeper(TIntrusivePtr<TNodeWardenConfig> cfg,
const NKikimrBlobStorage::TStorageConfig& baseConfig, bool isSelfStatic)
std::shared_ptr<const NKikimrBlobStorage::TStorageConfig> baseConfig, bool isSelfStatic)
: IsSelfStatic(isSelfStatic)
, Cfg(std::move(cfg))
, BaseConfig(baseConfig)
, InitialConfig(baseConfig)
{
UpdateFingerprint(&BaseConfig);
InitialConfig.SetFingerprint(BaseConfig.GetFingerprint());
}
, InitialConfig(std::move(baseConfig))
{}

void TDistributedConfigKeeper::Bootstrap() {
STLOG(PRI_DEBUG, BS_NODE, NWDC00, "Bootstrap");
Expand All @@ -34,9 +31,9 @@ namespace NKikimr::NStorage {

// generate initial drive set and query stored configuration
if (IsSelfStatic) {
if (BaseConfig.GetSelfManagementConfig().GetEnabled()) {
if (BaseConfig->GetSelfManagementConfig().GetEnabled()) {
// read this only if it is possibly enabled
EnumerateConfigDrives(InitialConfig, SelfId().NodeId(), [&](const auto& /*node*/, const auto& drive) {
EnumerateConfigDrives(*InitialConfig, SelfId().NodeId(), [&](const auto& /*node*/, const auto& drive) {
DrivesToRead.push_back(drive.GetPath());
});
std::sort(DrivesToRead.begin(), DrivesToRead.end());
Expand Down Expand Up @@ -97,11 +94,11 @@ namespace NKikimr::NStorage {
}
}

SelfManagementEnabled = (!IsSelfStatic || BaseConfig.GetSelfManagementConfig().GetEnabled()) &&
SelfManagementEnabled = (!IsSelfStatic || BaseConfig->GetSelfManagementConfig().GetEnabled()) &&
config.GetSelfManagementConfig().GetEnabled() &&
config.GetGeneration();

StorageConfig.emplace(config);
StorageConfig = std::make_shared<NKikimrBlobStorage::TStorageConfig>(config);
if (ProposedStorageConfig && ProposedStorageConfig->GetGeneration() <= StorageConfig->GetGeneration()) {
ProposedStorageConfig.reset();
}
Expand Down Expand Up @@ -232,8 +229,8 @@ namespace NKikimr::NStorage {

Y_ABORT_UNLESS(!StorageConfig || CheckFingerprint(*StorageConfig));
Y_ABORT_UNLESS(!ProposedStorageConfig || CheckFingerprint(*ProposedStorageConfig));
Y_ABORT_UNLESS(CheckFingerprint(BaseConfig));
Y_ABORT_UNLESS(!InitialConfig.GetFingerprint() || CheckFingerprint(InitialConfig));
Y_ABORT_UNLESS(CheckFingerprint(*BaseConfig));
Y_ABORT_UNLESS(!InitialConfig->GetFingerprint() || CheckFingerprint(*InitialConfig));

if (Scepter) {
Y_ABORT_UNLESS(HasQuorum());
Expand Down Expand Up @@ -302,13 +299,11 @@ namespace NKikimr::NStorage {
void TDistributedConfigKeeper::ReportStorageConfigToNodeWarden(ui64 cookie) {
Y_ABORT_UNLESS(StorageConfig);
const TActorId wardenId = MakeBlobStorageNodeWardenID(SelfId().NodeId());
const NKikimrBlobStorage::TStorageConfig *config = SelfManagementEnabled
? &StorageConfig.value()
: &BaseConfig;
const NKikimrBlobStorage::TStorageConfig *proposedConfig = ProposedStorageConfig && SelfManagementEnabled
? &ProposedStorageConfig.value()
const auto& config = SelfManagementEnabled ? StorageConfig : BaseConfig;
auto proposedConfig = ProposedStorageConfig && SelfManagementEnabled
? std::make_shared<NKikimrBlobStorage::TStorageConfig>(*ProposedStorageConfig)
: nullptr;
auto ev = std::make_unique<TEvNodeWardenStorageConfig>(*config, proposedConfig, SelfManagementEnabled);
auto ev = std::make_unique<TEvNodeWardenStorageConfig>(config, std::move(proposedConfig), SelfManagementEnabled);
Send(wardenId, ev.release(), 0, cookie);
}

Expand Down Expand Up @@ -346,6 +341,9 @@ namespace NKikimr::NStorage {
hFunc(TEvBlobStorage::TEvControllerValidateConfigResponse, Handle);
hFunc(TEvBlobStorage::TEvControllerProposeConfigResponse, Handle);
hFunc(TEvBlobStorage::TEvControllerConsoleCommitResponse, Handle);
hFunc(TEvNodeWardenUpdateCache, Handle);
hFunc(TEvNodeWardenQueryCache, Handle);
hFunc(TEvNodeWardenUnsubscribeFromCache, Handle);
)
for (ui32 nodeId : std::exchange(UnsubscribeQueue, {})) {
UnsubscribeInterconnect(nodeId);
Expand Down
32 changes: 26 additions & 6 deletions ydb/core/blobstorage/nodewarden/distconf.h
Original file line number Diff line number Diff line change
Expand Up @@ -177,18 +177,18 @@ namespace NKikimr::NStorage {
bool SelfManagementEnabled = false;

// currently active storage config
std::optional<NKikimrBlobStorage::TStorageConfig> StorageConfig;
std::shared_ptr<const NKikimrBlobStorage::TStorageConfig> StorageConfig;
TString MainConfigYaml; // the part we have to push (unless this is storage-only) to console
std::optional<ui64> MainConfigYamlVersion;
TString MainConfigFetchYaml; // the part we would get is we fetch from console
ui64 MainConfigFetchYamlHash = 0;
std::optional<TString> StorageConfigYaml; // set if dedicated storage yaml is enabled; otherwise nullopt

// base config from config file
NKikimrBlobStorage::TStorageConfig BaseConfig;
std::shared_ptr<const NKikimrBlobStorage::TStorageConfig> BaseConfig;

// initial config based on config file and stored committed configs
NKikimrBlobStorage::TStorageConfig InitialConfig;
std::shared_ptr<const NKikimrBlobStorage::TStorageConfig> InitialConfig;
std::vector<TString> DrivesToRead;

// proposed storage configuration of the cluster
Expand Down Expand Up @@ -279,15 +279,23 @@ namespace NKikimr::NStorage {
std::optional<std::tuple<ui64, ui32>> ProposedConfigHashVersion;
std::vector<std::tuple<TActorId, TString, ui64>> ConsoleConfigValidationQ;

// cache subsystem
struct TCacheItem {
ui32 Generation; // item generation
std::optional<TString> Value; // item binary value
};
THashMap<TString, TCacheItem> Cache;
std::set<std::tuple<TString, TActorId>> CacheSubscriptions;

friend void ::Out<ERootState>(IOutputStream&, ERootState);

public:
static constexpr NKikimrServices::TActivity::EType ActorActivityType() {
return NKikimrServices::TActivity::NODEWARDEN_DISTRIBUTED_CONFIG;
}

TDistributedConfigKeeper(TIntrusivePtr<TNodeWardenConfig> cfg, const NKikimrBlobStorage::TStorageConfig& baseConfig,
bool isSelfStatic);
TDistributedConfigKeeper(TIntrusivePtr<TNodeWardenConfig> cfg,
std::shared_ptr<const NKikimrBlobStorage::TStorageConfig> baseConfig, bool isSelfStatic);

void Bootstrap();
void PassAway() override;
Expand Down Expand Up @@ -430,7 +438,7 @@ namespace NKikimr::NStorage {
void ApplyConfigUpdateToDynamicNodes(bool drop);
void OnDynamicNodeDisconnected(ui32 nodeId, TActorId sessionId);
void HandleDynamicConfigSubscribe(STATEFN_SIG);
void PushConfigToDynamicNode(TActorId actorId, TActorId sessionId);
void PushConfigToDynamicNode(TActorId actorId, TActorId sessionId, bool addCache);

////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// Event delivery
Expand All @@ -445,6 +453,18 @@ namespace NKikimr::NStorage {

void Handle(NMon::TEvHttpInfo::TPtr ev);

////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// Cache update

void ApplyCacheUpdates(NKikimrBlobStorage::TCacheUpdate *cacheUpdate, ui32 senderNodeId);

void AddCacheUpdate(NKikimrBlobStorage::TCacheUpdate *cacheUpdate, THashMap<TString, TCacheItem>::const_iterator it,
bool addValue);

void Handle(TEvNodeWardenUpdateCache::TPtr ev);
void Handle(TEvNodeWardenQueryCache::TPtr ev);
void Handle(TEvNodeWardenUnsubscribeFromCache::TPtr ev);

////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// Console interaction

Expand Down
55 changes: 52 additions & 3 deletions ydb/core/blobstorage/nodewarden/distconf_binding.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,10 @@ namespace NKikimr::NStorage {
boundNode->MutableMeta()->CopyFrom(node.Configs.back());
}

for (auto it = Cache.begin(); it != Cache.end(); ++it) {
AddCacheUpdate(record.MutableCacheUpdate(), it, false);
}

SendEvent(*Binding, std::move(ev));
}

Expand Down Expand Up @@ -313,7 +317,23 @@ namespace NKikimr::NStorage {
} else if (prevRootNodeId != GetRootNodeId() || configUpdate) {
STLOG(PRI_DEBUG, BS_NODE, NWDC13, "Binding updated", (Binding, Binding), (PrevRootNodeId, prevRootNodeId),
(ConfigUpdate, configUpdate));
FanOutReversePush(configUpdate ? &StorageConfig.value() : nullptr, record.GetRecurseConfigUpdate());
FanOutReversePush(configUpdate ? StorageConfig.get() : nullptr, record.GetRecurseConfigUpdate());
}
}

// process cache updates, if needed
if (record.HasCacheUpdate()) {
auto *cacheUpdate = record.MutableCacheUpdate();
ApplyCacheUpdates(cacheUpdate, senderNodeId);

if (cacheUpdate->RequestedKeysSize()) {
auto ev = std::make_unique<TEvNodeConfigPush>();
for (const TString& key : cacheUpdate->GetRequestedKeys()) {
if (const auto it = Cache.find(key); it != Cache.end()) {
AddCacheUpdate(ev->Record.MutableCacheUpdate(), it, true);
}
}
SendEvent(*Binding, std::move(ev));
}
}
}
Expand Down Expand Up @@ -425,8 +445,32 @@ namespace NKikimr::NStorage {
const auto [it, inserted] = DirectBoundNodes.try_emplace(senderNodeId, ev->Cookie, ev->InterconnectSession);
TBoundNode& info = it->second;
if (inserted) {
SendEvent(senderNodeId, info, std::make_unique<TEvNodeConfigReversePush>(GetRootNodeId(),
StorageConfig ? &StorageConfig.value() : nullptr, false));
auto response = std::make_unique<TEvNodeConfigReversePush>(GetRootNodeId(), StorageConfig.get(), false);
if (record.GetInitial()) {
auto *cache = record.MutableCacheUpdate();

// scan existing cache keys to find the ones we need to ask and to report others
THashSet<TString> keysToReport;
for (const auto& [key, value] : Cache) {
keysToReport.insert(key);
}
for (const auto& item : cache->GetKeyValuePairs()) {
keysToReport.erase(item.GetKey());
const auto it = Cache.find(item.GetKey());
if (it == Cache.end() || it->second.Generation < item.GetGeneration()) {
response->Record.MutableCacheUpdate()->AddRequestedKeys(item.GetKey());
} else if (item.GetGeneration() < it->second.Generation) {
AddCacheUpdate(response->Record.MutableCacheUpdate(), it, true);
}
}
for (const TString& key : keysToReport) {
const auto it = Cache.find(key);
Y_ABORT_UNLESS(it != Cache.end());
AddCacheUpdate(response->Record.MutableCacheUpdate(), it, true);
}
}

SendEvent(senderNodeId, info, std::move(response));
for (auto& [cookie, task] : ScatterTasks) {
IssueScatterTaskForNode(senderNodeId, info, cookie, task);
}
Expand Down Expand Up @@ -465,6 +509,11 @@ namespace NKikimr::NStorage {
}
}

// process cache items
if (!record.GetInitial() && record.HasCacheUpdate()) {
ApplyCacheUpdates(record.MutableCacheUpdate(), senderNodeId);
}

if (pushEv && pushEv->IsUseful()) {
SendEvent(*Binding, std::move(pushEv));
}
Expand Down
85 changes: 85 additions & 0 deletions ydb/core/blobstorage/nodewarden/distconf_cache.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
#include "distconf.h"

namespace NKikimr::NStorage {

void TDistributedConfigKeeper::ApplyCacheUpdates(NKikimrBlobStorage::TCacheUpdate *cacheUpdate, ui32 senderNodeId) {
NKikimrBlobStorage::TCacheUpdate updates;

for (const auto& item : cacheUpdate->GetKeyValuePairs()) {
const auto [it, inserted] = Cache.try_emplace(item.GetKey());
TCacheItem& cacheItem = it->second;
auto newValue = item.HasValue() ? std::make_optional(item.GetValue()) : std::nullopt;
if (inserted || cacheItem.Generation < item.GetGeneration()) {
cacheItem.Generation = item.GetGeneration();
cacheItem.Value = std::move(newValue);
AddCacheUpdate(&updates, it, true);

// notify subscribers
for (auto pos = CacheSubscriptions.lower_bound(std::make_tuple(it->first, TActorId()));
pos != CacheSubscriptions.end() && std::get<0>(*pos) == it->first; ++pos) {
const auto& [key, actorId] = *pos;
Send(actorId, new TEvNodeWardenQueryCacheResult(item.GetKey(), item.HasValue()
? std::make_optional(std::make_tuple(item.GetGeneration(), item.GetValue()))
: std::nullopt));
}
} else if (cacheItem.Generation == item.GetGeneration()) {
Y_DEBUG_ABORT_UNLESS(cacheItem.Value == newValue);
}
}

if (!IsSelfStatic) {
return; // nothing to do for dynamic nodes
}

// propagate forward, if bound
if (Binding && Binding->SessionId && Binding->NodeId != senderNodeId) {
auto ev = std::make_unique<TEvNodeConfigPush>();
ev->Record.MutableCacheUpdate()->CopyFrom(updates);
SendEvent(*Binding, std::move(ev));
}
// propagate backwards
for (const auto& [nodeId, info] : DirectBoundNodes) {
auto ev = std::make_unique<TEvNodeConfigReversePush>(GetRootNodeId(), nullptr, false);
ev->Record.MutableCacheUpdate()->CopyFrom(updates);
SendEvent(nodeId, info, std::move(ev));
}
// propagate to connected dynamic nodes
for (const auto& [sessionId, actorId] : DynamicConfigSubscribers) {
auto ev = std::make_unique<TEvNodeWardenDynamicConfigPush>();
ev->Record.MutableCacheUpdate()->CopyFrom(updates);
auto handle = std::make_unique<IEventHandle>(actorId, SelfId(), ev.release());
handle->Rewrite(TEvInterconnect::EvForward, sessionId);
TActivationContext::Send(handle.release());
}
}

void TDistributedConfigKeeper::AddCacheUpdate(NKikimrBlobStorage::TCacheUpdate *cacheUpdate,
THashMap<TString, TCacheItem>::const_iterator it, bool addValue) {
auto *kvp = cacheUpdate->AddKeyValuePairs();
kvp->SetKey(it->first);
kvp->SetGeneration(it->second.Generation);
if (const auto& value = it->second.Value; value && addValue) {
kvp->SetValue(*value);
}
}

void TDistributedConfigKeeper::Handle(TEvNodeWardenUpdateCache::TPtr ev) {
ApplyCacheUpdates(&ev->Get()->CacheUpdate, 0);
}

void TDistributedConfigKeeper::Handle(TEvNodeWardenQueryCache::TPtr ev) {
auto& msg = *ev->Get();
if (msg.Subscribe) {
CacheSubscriptions.emplace(msg.Key, ev->Sender);
}
const auto it = Cache.find(msg.Key);
Send(ev->Sender, new TEvNodeWardenQueryCacheResult(msg.Key, it != Cache.end() && it->second.Value
? std::make_optional(std::make_tuple(it->second.Generation, *it->second.Value))
: std::nullopt));
}

void TDistributedConfigKeeper::Handle(TEvNodeWardenUnsubscribeFromCache::TPtr ev) {
CacheSubscriptions.erase(std::make_tuple(ev->Get()->Key, ev->Sender));
}

} // NKikimr::NStorage
2 changes: 1 addition & 1 deletion ydb/core/blobstorage/nodewarden/distconf_console.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ namespace NKikimr::NStorage {
if (!StorageConfig || !StorageConfig->HasConfigComposite() || ProposedConfigHashVersion !=
std::make_tuple(MainConfigFetchYamlHash, *MainConfigYamlVersion)) {
const char *err = "proposed config, but something has gone awfully wrong";
STLOG(PRI_CRIT, BS_NODE, NWDC69, err, (StorageConfig, StorageConfig),
STLOG(PRI_CRIT, BS_NODE, NWDC69, err, (StorageConfig, StorageConfig.get()),
(ProposedConfigHashVersion, ProposedConfigHashVersion),
(MainConfigFetchYamlHash, MainConfigFetchYamlHash),
(MainConfigYamlVersion, MainConfigYamlVersion));
Expand Down
Loading
Loading