File tree Expand file tree Collapse file tree 9 files changed +39
-11
lines changed
tx/columnshard/engines/reader/simple_reader/duplicates Expand file tree Collapse file tree 9 files changed +39
-11
lines changed Original file line number Diff line number Diff line change @@ -2260,6 +2260,29 @@ void TCompGroupedMemoryLimiterInitializer::InitializeServices(NActors::TActorSys
22602260 }
22612261}
22622262
2263+ TDeduplicationGroupedMemoryLimiterInitializer::TDeduplicationGroupedMemoryLimiterInitializer (const TKikimrRunConfig& runConfig)
2264+ : IKikimrServicesInitializer(runConfig)
2265+ {
2266+ }
2267+
2268+ void TDeduplicationGroupedMemoryLimiterInitializer::InitializeServices (NActors::TActorSystemSetup* setup, const NKikimr::TAppData* appData) {
2269+ NOlap::NGroupedMemoryManager::TConfig serviceConfig;
2270+ if (Config.GetDeduplicationGroupedMemoryLimiterConfig ().GetCountBuckets () == 0 ) {
2271+ Config.MutableDeduplicationGroupedMemoryLimiterConfig ()->SetCountBuckets (1 );
2272+ }
2273+ Y_ABORT_UNLESS (serviceConfig.DeserializeFromProto (Config.GetDeduplicationGroupedMemoryLimiterConfig ()));
2274+
2275+ if (serviceConfig.IsEnabled ()) {
2276+ TIntrusivePtr<::NMonitoring::TDynamicCounters> tabletGroup = GetServiceCounters (appData->Counters , " tablets" );
2277+ TIntrusivePtr<::NMonitoring::TDynamicCounters> countersGroup = tabletGroup->GetSubgroup (" type" , " TX_DEDU_GROUPED_MEMORY_LIMITER" );
2278+
2279+ auto service = NOlap::NGroupedMemoryManager::TDeduplicationMemoryLimiterOperator::CreateService (serviceConfig, countersGroup);
2280+
2281+ setup->LocalServices .push_back (std::make_pair (NOlap::NGroupedMemoryManager::TDeduplicationMemoryLimiterOperator::MakeServiceId (NodeId),
2282+ TActorSetupCmd (service, TMailboxType::HTSwap, appData->UserPoolId )));
2283+ }
2284+ }
2285+
22632286TCompDiskLimiterInitializer::TCompDiskLimiterInitializer (const TKikimrRunConfig& runConfig)
22642287 : IKikimrServicesInitializer(runConfig) {
22652288}
Original file line number Diff line number Diff line change @@ -415,6 +415,12 @@ class TCompGroupedMemoryLimiterInitializer: public IKikimrServicesInitializer {
415415 void InitializeServices (NActors::TActorSystemSetup* setup, const NKikimr::TAppData* appData) override ;
416416};
417417
418+ class TDeduplicationGroupedMemoryLimiterInitializer : public IKikimrServicesInitializer {
419+ public:
420+ TDeduplicationGroupedMemoryLimiterInitializer (const TKikimrRunConfig& runConfig);
421+ void InitializeServices (NActors::TActorSystemSetup* setup, const NKikimr::TAppData* appData) override ;
422+ };
423+
418424class TCompPrioritiesInitializer : public IKikimrServicesInitializer {
419425public:
420426 TCompPrioritiesInitializer (const TKikimrRunConfig& runConfig);
Original file line number Diff line number Diff line change @@ -1741,6 +1741,7 @@ TIntrusivePtr<TServiceInitializersList> TKikimrRunner::CreateServiceInitializers
17411741 if (serviceMask.EnableGroupedMemoryLimiter ) {
17421742 sil->AddServiceInitializer (new TScanGroupedMemoryLimiterInitializer (runConfig));
17431743 sil->AddServiceInitializer (new TCompGroupedMemoryLimiterInitializer (runConfig));
1744+ sil->AddServiceInitializer (new TDeduplicationGroupedMemoryLimiterInitializer (runConfig));
17441745 }
17451746
17461747 if (serviceMask.EnableCompPriorities ) {
Original file line number Diff line number Diff line change @@ -2523,6 +2523,7 @@ message TAppConfig {
25232523 optional TCompositeConveyorConfig CompositeConveyorConfig = 109 ;
25242524 optional TGeneralCacheConfig PortionsMetadataCache = 110 ;
25252525 optional TGeneralCacheConfig ColumnDataCache = 111 ;
2526+ optional TGroupedMemoryLimiterConfig DeduplicationGroupedMemoryLimiterConfig = 112 ;
25262527}
25272528
25282529message TYdbVersion {
Original file line number Diff line number Diff line change @@ -154,6 +154,7 @@ message TConfigItem {
154154
155155 CompositeConveyorConfigItem = 109 ;
156156 PortionsMetadataCacheItem = 110 ;
157+ DeduGroupedMemoryLimiterConfig = 112 ;
157158
158159 // synthetic kinds for audit purposes only
159160 DatabaseYamlConfigChangeItem = 32767 ;
Original file line number Diff line number Diff line change 44
55namespace NKikimr ::NOlap::NReader::NSimple::NDuplicateFiltering {
66
7- namespace {
8-
9- static std::shared_ptr<NOlap::NGroupedMemoryManager::TStageFeatures> DeduplicationStageFeatures =
10- NOlap::NGroupedMemoryManager::TDeduplicationMemoryLimiterOperator::BuildStageFeatures (" DEFAULT" , 1000000000 );
11-
12- }
13-
147TInternalFilterConstructor::TInternalFilterConstructor (const TEvRequestFilter::TPtr& request, TColumnDataSplitter&& splitter)
158 : OriginalRequest(request)
169 , Intervals(std::move(splitter))
17- , ProcessGuard(NGroupedMemoryManager::TDeduplicationMemoryLimiterOperator::BuildProcessGuard({ DeduplicationStageFeatures }))
10+ , ProcessGuard(NGroupedMemoryManager::TDeduplicationMemoryLimiterOperator::BuildProcessGuard({}))
1811 , ScopeGuard(ProcessGuard->BuildScopeGuard (1 ))
1912 , GroupGuard(ScopeGuard->BuildGroupGuard ())
2013{
Original file line number Diff line number Diff line change @@ -101,10 +101,10 @@ class TInternalFilterConstructor: TMoveOnly {
101101 return ProcessGuard->GetProcessId ();
102102 }
103103 ui64 GetMemoryScopeId () const {
104- return ScopeGuard->GetProcessId ();
104+ return ScopeGuard->GetScopeId ();
105105 }
106106 ui64 GetMemoryGroupId () const {
107- return GroupGuard->GetProcessId ();
107+ return GroupGuard->GetGroupId ();
108108 }
109109};
110110
Original file line number Diff line number Diff line change @@ -69,7 +69,7 @@ class TColumnDataAllocation: public NGroupedMemoryManager::IAllocation {
6969private:
7070 virtual void DoOnAllocationImpossible (const TString& errorMessage) override {
7171 AFL_VERIFY (Callback);
72- Callback->OnError (errorMessage);
72+ Callback->OnError (TStringBuilder () << " cannot allocate memory: " << errorMessage);
7373 }
7474 virtual bool DoOnAllocated (std::shared_ptr<NGroupedMemoryManager::TAllocationGuard>&& guard,
7575 const std::shared_ptr<NGroupedMemoryManager::IAllocation>& /* allocation*/ ) override {
Original file line number Diff line number Diff line change @@ -165,6 +165,7 @@ def __init__(
165165 enable_resource_pools = None ,
166166 scan_grouped_memory_limiter_config = None ,
167167 comp_grouped_memory_limiter_config = None ,
168+ deduplication_grouped_memory_limiter_config = None ,
168169 query_service_config = None ,
169170 domain_login_only = None ,
170171 use_self_management = False ,
@@ -408,6 +409,8 @@ def __init__(
408409 self .yaml_config ["scan_grouped_memory_limiter_config" ] = scan_grouped_memory_limiter_config
409410 if comp_grouped_memory_limiter_config :
410411 self .yaml_config ["comp_grouped_memory_limiter_config" ] = comp_grouped_memory_limiter_config
412+ if deduplication_grouped_memory_limiter_config :
413+ self .yaml_config ["deduplication_grouped_memory_limiter_config" ] = deduplication_grouped_memory_limiter_config
411414
412415 self .__build ()
413416
You can’t perform that action at this time.
0 commit comments