@@ -80,7 +80,9 @@ class TDqSolomonReadActor : public NActors::TActorBootstrapped<TDqSolomonReadAct
8080 NKikimr::NMiniKQL::TProgramBuilder& programBuilder,
8181 TDqSolomonReadParams&& readParams,
8282 ui64 computeActorBatchSize,
83+ TDuration truePointsFindRange,
8384 ui64 metricsQueueConsumersCountDelta,
85+ ui64 maxInflight,
8486 NActors::TActorId metricsQueueActor,
8587 const ::NMonitoring::TDynamicCounterPtr& counters,
8688 std::shared_ptr<NYdb::ICredentialsProvider> credentialsProvider
@@ -93,7 +95,10 @@ class TDqSolomonReadActor : public NActors::TActorBootstrapped<TDqSolomonReadAct
9395 , LogPrefix(TStringBuilder() << " TxId: " << TxId << " , TDqSolomonReadActor: " )
9496 , ReadParams(std::move(readParams))
9597 , ComputeActorBatchSize(computeActorBatchSize)
98+ , TrueRangeFrom(TInstant::Seconds(ReadParams.Source.GetFrom()) - truePointsFindRange)
99+ , TrueRangeTo(TInstant::Seconds(ReadParams.Source.GetTo()) + truePointsFindRange)
96100 , MetricsQueueConsumersCountDelta(metricsQueueConsumersCountDelta)
101+ , MaxInflight(maxInflight)
97102 , MetricsQueueActor(metricsQueueActor)
98103 , CredentialsProvider(credentialsProvider)
99104 , SolomonClient(NSo::ISolomonAccessorClient::Make(ReadParams.Source, CredentialsProvider))
@@ -110,9 +115,9 @@ class TDqSolomonReadActor : public NActors::TActorBootstrapped<TDqSolomonReadAct
110115 }
111116 return ERetryErrorClass::NoRetry;
112117 },
113- TDuration::MilliSeconds (25 ),
118+ TDuration::MilliSeconds (50 ),
114119 TDuration::MilliSeconds (200 ),
115- TDuration::MilliSeconds (500 ),
120+ TDuration::MilliSeconds (1000 ),
116121 10
117122 );
118123
@@ -125,8 +130,14 @@ class TDqSolomonReadActor : public NActors::TActorBootstrapped<TDqSolomonReadAct
125130 }
126131
127132 void FillSystemColumnPositionIndex () {
133+ YQL_ENSURE (ReadParams.Source .GetLabelNameAliases ().size () == ReadParams.Source .GetLabelNames ().size ());
134+
135+ for (int i = 0 ; i < ReadParams.Source .GetLabelNameAliases ().size (); ++i) {
136+ AliasIndex[ReadParams.Source .GetLabelNameAliases ()[i]] = ReadParams.Source .GetLabelNames ()[i];
137+ }
138+
128139 std::vector<TString> names (ReadParams.Source .GetSystemColumns ().begin (), ReadParams.Source .GetSystemColumns ().end ());
129- names.insert (names.end (), ReadParams.Source .GetLabelNames ().begin (), ReadParams.Source .GetLabelNames ().end ());
140+ names.insert (names.end (), ReadParams.Source .GetLabelNameAliases ().begin (), ReadParams.Source .GetLabelNameAliases ().end ());
130141 std::sort (names.begin (), names.end ());
131142 size_t index = 0 ;
132143 for (auto & n : names) {
@@ -241,7 +252,7 @@ class TDqSolomonReadActor : public NActors::TActorBootstrapped<TDqSolomonReadAct
241252 ParsePointsCount (metric, pointsCount);
242253 CompletedMetricsCount++;
243254
244- TryRequestData ();
255+ while ( TryRequestData ()) {}
245256 }
246257
247258 void HandleNewDataBatch (TEvSolomonProvider::TEvNewDataBatch::TPtr& newDataBatch) {
@@ -317,6 +328,9 @@ class TDqSolomonReadActor : public NActors::TActorBootstrapped<TDqSolomonReadAct
317328 YQL_ENSURE (!buffer.IsWide (), " Wide stream is not supported" );
318329 SOURCE_LOG_D (" GetAsyncInputData sending " << MetricsData.size () << " metrics, finished = " << LastMetricProcessed ());
319330
331+ TInstant from = TInstant::Seconds (ReadParams.Source .GetFrom ());
332+ TInstant to = TInstant::Seconds (ReadParams.Source .GetTo ());
333+
320334 for (const auto & data : MetricsData) {
321335 auto & labels = data.Metric .Labels ;
322336
@@ -331,6 +345,11 @@ class TDqSolomonReadActor : public NActors::TActorBootstrapped<TDqSolomonReadAct
331345 auto & type = data.Metric .Type ;
332346
333347 for (size_t i = 0 ; i < timestamps.size (); ++i){
348+ TInstant timestamp = TInstant::MilliSeconds (timestamps[i]);
349+ if (timestamp < from || timestamp > to) {
350+ continue ;
351+ }
352+
334353 NUdf::TUnboxedValue* items = nullptr ;
335354 auto value = HolderFactory.CreateDirectArrayHolder (ReadParams.Source .GetSystemColumns ().size () + ReadParams.Source .GetLabelNames ().size (), items);
336355
@@ -351,9 +370,9 @@ class TDqSolomonReadActor : public NActors::TActorBootstrapped<TDqSolomonReadAct
351370 items[it->second ] = dictValue;
352371 }
353372
354- for (const auto & c : ReadParams.Source .GetLabelNames ()) {
373+ for (const auto & c : ReadParams.Source .GetLabelNameAliases ()) {
355374 auto & v = items[Index[c]];
356- auto it = labels.find (c );
375+ auto it = labels.find (AliasIndex[c] );
357376 if (it != labels.end ()) {
358377 v = NKikimr::NMiniKQL::MakeString (it->second );
359378 } else {
@@ -386,7 +405,7 @@ class TDqSolomonReadActor : public NActors::TActorBootstrapped<TDqSolomonReadAct
386405private:
387406 // IActor & IDqComputeActorAsyncInput
388407 void PassAway () override { // Is called from Compute Actor
389- SOURCE_LOG_I (" PassAway, processed " << CompletedMetricsCount << " metrics." );
408+ SOURCE_LOG_I (" PassAway, processed " << CompletedMetricsCount << " metrics, " << CompletedTimeRanges << " time ranges ." );
390409 if (UseMetricsQueue) {
391410 MetricsQueueEvents.Unsubscribe ();
392411 }
@@ -421,6 +440,7 @@ class TDqSolomonReadActor : public NActors::TActorBootstrapped<TDqSolomonReadAct
421440
422441 bool TryRequestPointsCount () {
423442 TryRequestMetrics ();
443+
424444 if (ListedMetrics.empty ()) {
425445 return false ;
426446 }
@@ -433,7 +453,7 @@ class TDqSolomonReadActor : public NActors::TActorBootstrapped<TDqSolomonReadAct
433453 NSo::TMetric requestMetric = ListedMetrics.back ();
434454 ListedMetrics.pop_back ();
435455
436- auto getPointsCountFuture = SolomonClient->GetPointsCount (requestMetric.Labels );
456+ auto getPointsCountFuture = SolomonClient->GetPointsCount (requestMetric.Labels , TrueRangeFrom, TrueRangeTo );
437457
438458 NActors::TActorSystem* actorSystem = NActors::TActivationContext::ActorSystem ();
439459 getPointsCountFuture.Subscribe ([actorSystem, metric = std::move (requestMetric), selfId = SelfId ()](
@@ -446,12 +466,19 @@ class TDqSolomonReadActor : public NActors::TActorBootstrapped<TDqSolomonReadAct
446466 });
447467 }
448468
449- void TryRequestData () {
469+ bool TryRequestData () {
450470 TryRequestPointsCount ();
451- while (!MetricsWithTimeRange. empty ()) {
452- RequestData ();
453- TryRequestPointsCount () ;
471+
472+ if (MetricsWithTimeRange. empty ()) {
473+ return false ;
454474 }
475+
476+ if (CurrentInflight >= MaxInflight) {
477+ return false ;
478+ }
479+
480+ RequestData ();
481+ return true ;
455482 }
456483
457484 void RequestData () {
@@ -460,6 +487,7 @@ class TDqSolomonReadActor : public NActors::TActorBootstrapped<TDqSolomonReadAct
460487
461488 auto request = MetricsWithTimeRange.back ();
462489 MetricsWithTimeRange.pop_back ();
490+ CurrentInflight++;
463491
464492 if (UseMetricsQueue) {
465493 dataRequestFuture = SolomonClient->GetData (request.Selectors , request.From , request.To );
@@ -480,18 +508,18 @@ class TDqSolomonReadActor : public NActors::TActorBootstrapped<TDqSolomonReadAct
480508 }
481509
482510 void ParsePointsCount (const NSo::TMetric& metric, ui64 pointsCount) {
483- TInstant from = TInstant::Seconds (ReadParams.Source .GetFrom ());
484- TInstant to = TInstant::Seconds (ReadParams.Source .GetTo ());
485-
486- auto ranges = SplitTimeIntervalIntoRanges (from, to, pointsCount);
511+ auto ranges = SplitTimeIntervalIntoRanges (pointsCount);
487512
488513 for (const auto & [fromRange, toRange] : ranges) {
489514 MetricsWithTimeRange.emplace_back (metric.Labels , " " , fromRange, toRange);
490515 }
491516 ListedTimeRanges += ranges.size ();
492517 }
493518
494- std::vector<std::pair<TInstant, TInstant>> SplitTimeIntervalIntoRanges (TInstant from, TInstant to, ui64 pointsCount) const {
519+ std::vector<std::pair<TInstant, TInstant>> SplitTimeIntervalIntoRanges (ui64 pointsCount) const {
520+ TInstant from = TrueRangeFrom;
521+ TInstant to = TrueRangeTo;
522+
495523 std::vector<std::pair<TInstant, TInstant>> result;
496524 if (pointsCount == 0 ) {
497525 return result;
@@ -524,6 +552,7 @@ class TDqSolomonReadActor : public NActors::TActorBootstrapped<TDqSolomonReadAct
524552 }
525553
526554 PendingDataRequests_.erase (request);
555+ CurrentInflight--;
527556
528557 if (batch.Response .Status != NSo::EStatus::STATUS_OK) {
529558 TIssues issues { TIssue (batch.Response .Error ) };
@@ -552,7 +581,10 @@ class TDqSolomonReadActor : public NActors::TActorBootstrapped<TDqSolomonReadAct
552581 const TString LogPrefix;
553582 const TDqSolomonReadParams ReadParams;
554583 const ui64 ComputeActorBatchSize;
584+ const TInstant TrueRangeFrom;
585+ const TInstant TrueRangeTo;
555586 const ui64 MetricsQueueConsumersCountDelta;
587+ const ui64 MaxInflight;
556588 IRetryPolicy<NSo::TGetDataResponse>::TPtr RetryPolicy;
557589
558590 bool UseMetricsQueue;
@@ -570,6 +602,7 @@ class TDqSolomonReadActor : public NActors::TActorBootstrapped<TDqSolomonReadAct
570602 ui64 CompletedMetricsCount = 0 ;
571603 ui64 ListedTimeRanges = 0 ;
572604 ui64 CompletedTimeRanges = 0 ;
605+ ui64 CurrentInflight = 0 ;
573606 const ui64 MaxPointsPerOneRequest = 10000 ;
574607
575608 TString SourceId;
@@ -578,6 +611,7 @@ class TDqSolomonReadActor : public NActors::TActorBootstrapped<TDqSolomonReadAct
578611 TType* DictType = nullptr ;
579612 std::vector<size_t > SystemColumnPositionIndex;
580613 THashMap<TString, size_t > Index;
614+ THashMap<TString, TString> AliasIndex;
581615};
582616
583617
@@ -618,6 +652,16 @@ std::pair<NYql::NDq::IDqComputeActorAsyncInput*, NActors::IActor*> CreateDqSolom
618652 computeActorBatchSize = FromString<ui64>(it->second );
619653 }
620654
655+ ui64 truePointsFindRange = 301 ;
656+ if (auto it = settings.find (" truePointsFindRange" ); it != settings.end ()) {
657+ truePointsFindRange = FromString<ui64>(it->second );
658+ }
659+
660+ ui64 maxInflight = 40 ;
661+ if (auto it = settings.find (" maxApiInflight" ); it != settings.end ()) {
662+ maxInflight = FromString<ui64>(it->second );
663+ }
664+
621665 auto credentialsProviderFactory = CreateCredentialsProviderFactoryForStructuredToken (credentialsFactory, token);
622666 auto credentialsProvider = credentialsProviderFactory->CreateProvider ();
623667
@@ -630,7 +674,9 @@ std::pair<NYql::NDq::IDqComputeActorAsyncInput*, NActors::IActor*> CreateDqSolom
630674 programBuilder,
631675 std::move (params),
632676 computeActorBatchSize,
677+ TDuration::Seconds (truePointsFindRange),
633678 metricsQueueConsumersCountDelta,
679+ maxInflight,
634680 metricsQueueActor,
635681 counters,
636682 credentialsProvider);
0 commit comments