Skip to content

Commit 27b34ac

Browse files
solomon: working inflight restriction, retry policy tweaks & quality of life changes (ydb-platform#22358)
1 parent 2c3df8d commit 27b34ac

File tree

17 files changed

+227
-41
lines changed

17 files changed

+227
-41
lines changed

ydb/library/yql/providers/solomon/actors/dq_solomon_read_actor.cpp

Lines changed: 37 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,7 @@ class TDqSolomonReadActor : public NActors::TActorBootstrapped<TDqSolomonReadAct
8282
ui64 computeActorBatchSize,
8383
TDuration truePointsFindRange,
8484
ui64 metricsQueueConsumersCountDelta,
85+
ui64 maxInflight,
8586
NActors::TActorId metricsQueueActor,
8687
const ::NMonitoring::TDynamicCounterPtr& counters,
8788
std::shared_ptr<NYdb::ICredentialsProvider> credentialsProvider
@@ -97,6 +98,7 @@ class TDqSolomonReadActor : public NActors::TActorBootstrapped<TDqSolomonReadAct
9798
, TrueRangeFrom(TInstant::Seconds(ReadParams.Source.GetFrom()) - truePointsFindRange)
9899
, TrueRangeTo(TInstant::Seconds(ReadParams.Source.GetTo()) + truePointsFindRange)
99100
, MetricsQueueConsumersCountDelta(metricsQueueConsumersCountDelta)
101+
, MaxInflight(maxInflight)
100102
, MetricsQueueActor(metricsQueueActor)
101103
, CredentialsProvider(credentialsProvider)
102104
, SolomonClient(NSo::ISolomonAccessorClient::Make(ReadParams.Source, CredentialsProvider))
@@ -113,9 +115,9 @@ class TDqSolomonReadActor : public NActors::TActorBootstrapped<TDqSolomonReadAct
113115
}
114116
return ERetryErrorClass::NoRetry;
115117
},
116-
TDuration::MilliSeconds(25),
118+
TDuration::MilliSeconds(50),
117119
TDuration::MilliSeconds(200),
118-
TDuration::MilliSeconds(500),
120+
TDuration::MilliSeconds(1000),
119121
10
120122
);
121123

@@ -128,8 +130,14 @@ class TDqSolomonReadActor : public NActors::TActorBootstrapped<TDqSolomonReadAct
128130
}
129131

130132
void FillSystemColumnPositionIndex() {
133+
YQL_ENSURE(ReadParams.Source.GetLabelNameAliases().size() == ReadParams.Source.GetLabelNames().size());
134+
135+
for (int i = 0; i < ReadParams.Source.GetLabelNameAliases().size(); ++i) {
136+
AliasIndex[ReadParams.Source.GetLabelNameAliases()[i]] = ReadParams.Source.GetLabelNames()[i];
137+
}
138+
131139
std::vector<TString> names(ReadParams.Source.GetSystemColumns().begin(), ReadParams.Source.GetSystemColumns().end());
132-
names.insert(names.end(), ReadParams.Source.GetLabelNames().begin(), ReadParams.Source.GetLabelNames().end());
140+
names.insert(names.end(), ReadParams.Source.GetLabelNameAliases().begin(), ReadParams.Source.GetLabelNameAliases().end());
133141
std::sort(names.begin(), names.end());
134142
size_t index = 0;
135143
for (auto& n : names) {
@@ -244,7 +252,7 @@ class TDqSolomonReadActor : public NActors::TActorBootstrapped<TDqSolomonReadAct
244252
ParsePointsCount(metric, pointsCount);
245253
CompletedMetricsCount++;
246254

247-
TryRequestData();
255+
while (TryRequestData()) {}
248256
}
249257

250258
void HandleNewDataBatch(TEvSolomonProvider::TEvNewDataBatch::TPtr& newDataBatch) {
@@ -362,9 +370,9 @@ class TDqSolomonReadActor : public NActors::TActorBootstrapped<TDqSolomonReadAct
362370
items[it->second] = dictValue;
363371
}
364372

365-
for (const auto& c : ReadParams.Source.GetLabelNames()) {
373+
for (const auto& c : ReadParams.Source.GetLabelNameAliases()) {
366374
auto& v = items[Index[c]];
367-
auto it = labels.find(c);
375+
auto it = labels.find(AliasIndex[c]);
368376
if (it != labels.end()) {
369377
v = NKikimr::NMiniKQL::MakeString(it->second);
370378
} else {
@@ -432,6 +440,7 @@ class TDqSolomonReadActor : public NActors::TActorBootstrapped<TDqSolomonReadAct
432440

433441
bool TryRequestPointsCount() {
434442
TryRequestMetrics();
443+
435444
if (ListedMetrics.empty()) {
436445
return false;
437446
}
@@ -457,12 +466,19 @@ class TDqSolomonReadActor : public NActors::TActorBootstrapped<TDqSolomonReadAct
457466
});
458467
}
459468

460-
void TryRequestData() {
469+
bool TryRequestData() {
461470
TryRequestPointsCount();
462-
while (!MetricsWithTimeRange.empty()) {
463-
RequestData();
464-
TryRequestPointsCount();
471+
472+
if (MetricsWithTimeRange.empty()) {
473+
return false;
465474
}
475+
476+
if (CurrentInflight >= MaxInflight) {
477+
return false;
478+
}
479+
480+
RequestData();
481+
return true;
466482
}
467483

468484
void RequestData() {
@@ -471,6 +487,7 @@ class TDqSolomonReadActor : public NActors::TActorBootstrapped<TDqSolomonReadAct
471487

472488
auto request = MetricsWithTimeRange.back();
473489
MetricsWithTimeRange.pop_back();
490+
CurrentInflight++;
474491

475492
if (UseMetricsQueue) {
476493
dataRequestFuture = SolomonClient->GetData(request.Selectors, request.From, request.To);
@@ -535,6 +552,7 @@ class TDqSolomonReadActor : public NActors::TActorBootstrapped<TDqSolomonReadAct
535552
}
536553

537554
PendingDataRequests_.erase(request);
555+
CurrentInflight--;
538556

539557
if (batch.Response.Status != NSo::EStatus::STATUS_OK) {
540558
TIssues issues { TIssue(batch.Response.Error) };
@@ -566,6 +584,7 @@ class TDqSolomonReadActor : public NActors::TActorBootstrapped<TDqSolomonReadAct
566584
const TInstant TrueRangeFrom;
567585
const TInstant TrueRangeTo;
568586
const ui64 MetricsQueueConsumersCountDelta;
587+
const ui64 MaxInflight;
569588
IRetryPolicy<NSo::TGetDataResponse>::TPtr RetryPolicy;
570589

571590
bool UseMetricsQueue;
@@ -583,6 +602,7 @@ class TDqSolomonReadActor : public NActors::TActorBootstrapped<TDqSolomonReadAct
583602
ui64 CompletedMetricsCount = 0;
584603
ui64 ListedTimeRanges = 0;
585604
ui64 CompletedTimeRanges = 0;
605+
ui64 CurrentInflight = 0;
586606
const ui64 MaxPointsPerOneRequest = 10000;
587607

588608
TString SourceId;
@@ -591,6 +611,7 @@ class TDqSolomonReadActor : public NActors::TActorBootstrapped<TDqSolomonReadAct
591611
TType* DictType = nullptr;
592612
std::vector<size_t> SystemColumnPositionIndex;
593613
THashMap<TString, size_t> Index;
614+
THashMap<TString, TString> AliasIndex;
594615
};
595616

596617

@@ -636,6 +657,11 @@ std::pair<NYql::NDq::IDqComputeActorAsyncInput*, NActors::IActor*> CreateDqSolom
636657
truePointsFindRange = FromString<ui64>(it->second);
637658
}
638659

660+
ui64 maxInflight = 40;
661+
if (auto it = settings.find("maxApiInflight"); it != settings.end()) {
662+
maxInflight = FromString<ui64>(it->second);
663+
}
664+
639665
auto credentialsProviderFactory = CreateCredentialsProviderFactoryForStructuredToken(credentialsFactory, token);
640666
auto credentialsProvider = credentialsProviderFactory->CreateProvider();
641667

@@ -650,6 +676,7 @@ std::pair<NYql::NDq::IDqComputeActorAsyncInput*, NActors::IActor*> CreateDqSolom
650676
computeActorBatchSize,
651677
TDuration::Seconds(truePointsFindRange),
652678
metricsQueueConsumersCountDelta,
679+
maxInflight,
653680
metricsQueueActor,
654681
counters,
655682
credentialsProvider);

ydb/library/yql/providers/solomon/common/util.cpp

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ THolder<re2::RE2> CompileRE2WithCheck(const std::string& pattern) {
1717
}
1818

1919
const TString LABEL_NAME_PATTERN = R"( *[a-zA-Z0-9-._/]{1,50} *)";
20-
const TString LABEL_VALUE_PATTERN = R"( *"[ -!#-&(-)+->@-_a-{}-~]{1,200}" *)";
20+
const TString LABEL_VALUE_PATTERN = R"( *"[ -!#-&(-)+->@-_a-{}-~*|-]{1,200}" *)";
2121

2222
const TString SENSOR_NAME_PATTERN = "(" + LABEL_VALUE_PATTERN + ")?({.*})";
2323
THolder<re2::RE2> SENSOR_NAME_RE = CompileRE2WithCheck(SENSOR_NAME_PATTERN);
@@ -28,6 +28,9 @@ THolder<re2::RE2> SELECTOR_RE = CompileRE2WithCheck(SELECTOR_PATTERN);
2828
const TString SELECTORS_FULL_PATTERN = "{((" + SELECTOR_PATTERN + ",)*" + SELECTOR_PATTERN + ")?}";
2929
THolder<re2::RE2> SELECTORS_FULL_RE = CompileRE2WithCheck(SELECTORS_FULL_PATTERN);
3030

31+
const TString USER_LABELS_PATTERN = "(" + LABEL_NAME_PATTERN + ")(?: (?i:as) (" + LABEL_NAME_PATTERN + "))?";
32+
THolder<re2::RE2> USER_LABELS_RE = CompileRE2WithCheck(USER_LABELS_PATTERN);
33+
3134
TMaybe<TString> InsertOrCheck(std::map<TString, TString>& selectors, const TString& name, const TString& value) {
3235
auto [it, inserted] = selectors.emplace(name, value);
3336
if (!inserted && it->second != value) {
@@ -101,6 +104,26 @@ TMaybe<TString> BuildSelectorValues(const NSo::NProto::TDqSolomonSource& source,
101104
return {};
102105
}
103106

107+
TMaybe<TString> ParseLabelNames(const TString& labelNames, TVector<TString>& names, TVector<TString>& aliases) {
108+
auto labels = StringSplitter(labelNames).Split(',').SkipEmpty().ToList<TString>();
109+
names.reserve(labels.size());
110+
aliases.reserve(labels.size());
111+
112+
for (TString& label : labels) {
113+
TString name;
114+
std::optional<TString> alias;
115+
116+
if (!RE2::FullMatch(label, *USER_LABELS_RE, &name, &alias)) {
117+
return "Label names should be specified in \"label1 [as alias1], label2 [as alias2], ...\" format";
118+
}
119+
120+
names.push_back(StripString(name));
121+
aliases.push_back(StripString(alias ? *alias : name));
122+
}
123+
124+
return {};
125+
}
126+
104127
NSo::NProto::ESolomonClusterType MapClusterType(TSolomonClusterConfig::ESolomonClusterType clusterType) {
105128
switch (clusterType) {
106129
case TSolomonClusterConfig::SCT_SOLOMON:

ydb/library/yql/providers/solomon/common/util.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@ namespace NYql::NSo {
77

88
TMaybe<TString> ParseSelectorValues(const TString& selectors, std::map<TString, TString>& result);
99
TMaybe<TString> BuildSelectorValues(const NSo::NProto::TDqSolomonSource& source, const TString& selectors, std::map<TString, TString>& result);
10+
11+
TMaybe<TString> ParseLabelNames(const TString& labelNames, TVector<TString>& names, TVector<TString>& aliases);
1012

1113
NSo::NProto::ESolomonClusterType MapClusterType(TSolomonClusterConfig::ESolomonClusterType clusterType);
1214

ydb/library/yql/providers/solomon/common/util_ut.cpp

Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -90,4 +90,98 @@ Y_UNIT_TEST_SUITE(TestSolomonParseSelectors) {
9090
}
9191
}
9292

93+
Y_UNIT_TEST_SUITE(TestSolomonParseLabelNames) {
94+
Y_UNIT_TEST(Basic) {
95+
TString labelNames = "label1, label2";
96+
TVector<TString> names;
97+
TVector<TString> aliases;
98+
99+
TVector<TString> expectedNames = {
100+
"label1", "label2"
101+
};
102+
TVector<TString> expectedAliases = {
103+
"label1", "label2"
104+
};
105+
106+
UNIT_ASSERT_EQUAL(ParseLabelNames(labelNames, names, aliases), TMaybe<TString>{});
107+
UNIT_ASSERT_EQUAL(names, expectedNames);
108+
UNIT_ASSERT_EQUAL(aliases, expectedAliases);
109+
}
110+
111+
Y_UNIT_TEST(WithAliases) {
112+
TString labelNames = "label1 as alias1, label2 as alias2";
113+
TVector<TString> names;
114+
TVector<TString> aliases;
115+
116+
TVector<TString> expectedNames = {
117+
"label1", "label2"
118+
};
119+
TVector<TString> expectedAliases = {
120+
"alias1", "alias2"
121+
};
122+
123+
UNIT_ASSERT_EQUAL(ParseLabelNames(labelNames, names, aliases), TMaybe<TString>{});
124+
UNIT_ASSERT_EQUAL(names, expectedNames);
125+
UNIT_ASSERT_EQUAL(aliases, expectedAliases);
126+
}
127+
128+
Y_UNIT_TEST(OneAlias) {
129+
TString labelNames = "label1, label2 as alias2, label3";
130+
TVector<TString> names;
131+
TVector<TString> aliases;
132+
133+
TVector<TString> expectedNames = {
134+
"label1", "label2", "label3"
135+
};
136+
TVector<TString> expectedAliases = {
137+
"label1", "alias2", "label3"
138+
};
139+
140+
UNIT_ASSERT_EQUAL(ParseLabelNames(labelNames, names, aliases), TMaybe<TString>{});
141+
UNIT_ASSERT_EQUAL(names, expectedNames);
142+
UNIT_ASSERT_EQUAL(aliases, expectedAliases);
143+
}
144+
145+
Y_UNIT_TEST(CaseSensitivity) {
146+
TString labelNames = "label1, label2 AS alias2, label3";
147+
TVector<TString> names;
148+
TVector<TString> aliases;
149+
150+
TVector<TString> expectedNames = {
151+
"label1", "label2", "label3"
152+
};
153+
TVector<TString> expectedAliases = {
154+
"label1", "alias2", "label3"
155+
};
156+
157+
UNIT_ASSERT_EQUAL(ParseLabelNames(labelNames, names, aliases), TMaybe<TString>{});
158+
UNIT_ASSERT_EQUAL(names, expectedNames);
159+
UNIT_ASSERT_EQUAL(aliases, expectedAliases);
160+
}
161+
162+
Y_UNIT_TEST(InvalidLabelName) {
163+
TString labelNames = "{}, {}";
164+
TVector<TString> names;
165+
TVector<TString> aliases;
166+
167+
UNIT_ASSERT_EQUAL(ParseLabelNames(labelNames, names, aliases), "Label names should be specified in \"label1 [as alias1], label2 [as alias2], ...\" format");
168+
}
169+
170+
Y_UNIT_TEST(NoAs) {
171+
TString labelNames = "label1 alias1";
172+
TVector<TString> names;
173+
TVector<TString> aliases;
174+
175+
UNIT_ASSERT_EQUAL(ParseLabelNames(labelNames, names, aliases), "Label names should be specified in \"label1 [as alias1], label2 [as alias2], ...\" format");
176+
}
177+
178+
Y_UNIT_TEST(EmptyAlias) {
179+
TString labelNames = "label1 as, label2";
180+
TVector<TString> names;
181+
TVector<TString> aliases;
182+
183+
UNIT_ASSERT_EQUAL(ParseLabelNames(labelNames, names, aliases), "Label names should be specified in \"label1 [as alias1], label2 [as alias2], ...\" format");
184+
}
185+
}
186+
93187
} // namespace NYql::NSo

ydb/library/yql/providers/solomon/expr_nodes/yql_solomon_expr_nodes.json

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,8 @@
6363
{"Index": 12, "Name": "DownsamplingFill", "Type": "TCoAtom"},
6464
{"Index": 13, "Name": "DownsamplingGridSec", "Type": "TCoUint32"},
6565
{"Index": 14, "Name": "RequiredLabelNames", "Type": "TCoAtomList"},
66-
{"Index": 15, "Name": "TotalMetricsCount", "Type": "TCoAtom"}
66+
{"Index": 15, "Name": "TotalMetricsCount", "Type": "TCoAtom"},
67+
{"Index": 16, "Name": "LabelNameAliases", "Type": "TCoAtomList"}
6768
]
6869
},
6970
{
@@ -85,10 +86,11 @@
8586
{"Index": 2, "Name": "Object", "Type": "TSoObject"},
8687
{"Index": 3, "Name": "SystemColumns", "Type": "TCoAtomList"},
8788
{"Index": 4, "Name": "LabelNames", "Type": "TCoAtomList"},
88-
{"Index": 5, "Name": "RequiredLabelNames", "Type": "TCoAtomList"},
89-
{"Index": 6, "Name": "TotalMetricsCount", "Type": "TCoAtom"},
90-
{"Index": 7, "Name": "RowType", "Type": "TExprBase"},
91-
{"Index": 8, "Name": "ColumnOrder", "Type": "TExprBase", "Optional": true}
89+
{"Index": 5, "Name": "LabelNameAliases", "Type": "TCoAtomList"},
90+
{"Index": 6, "Name": "RequiredLabelNames", "Type": "TCoAtomList"},
91+
{"Index": 7, "Name": "TotalMetricsCount", "Type": "TCoAtom"},
92+
{"Index": 8, "Name": "RowType", "Type": "TExprBase"},
93+
{"Index": 9, "Name": "ColumnOrder", "Type": "TExprBase", "Optional": true}
9294
]
9395
},
9496
{

ydb/library/yql/providers/solomon/proto/dq_solomon_shard.proto

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,4 +69,5 @@ message TDqSolomonSource {
6969
string GrpcEndpoint = 17;
7070
optional string Cluster = 18;
7171
optional string Service = 19;
72+
repeated string LabelNameAliases = 20;
7273
}

ydb/library/yql/providers/solomon/provider/yql_solomon_datasource_type_ann.cpp

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ class TSolomonDataSourceTypeAnnotationTransformer : public TVisitorTransformerBa
3838
}
3939

4040
TStatus HandleSoSourceSettings(const TExprNode::TPtr& input, TExprContext& ctx) {
41-
if (!EnsureArgsCount(*input, 16, ctx)) {
41+
if (!EnsureArgsCount(*input, 17, ctx)) {
4242
return TStatus::Error;
4343
}
4444

@@ -71,6 +71,11 @@ class TSolomonDataSourceTypeAnnotationTransformer : public TVisitorTransformerBa
7171
return TStatus::Error;
7272
}
7373

74+
auto& labelNameAliases = *input->Child(TSoSourceSettings::idx_LabelNameAliases);
75+
if (!EnsureTupleOfAtoms(labelNameAliases, ctx)) {
76+
return TStatus::Error;
77+
}
78+
7479
auto& requiredLabelNames = *input->Child(TSoSourceSettings::idx_RequiredLabelNames);
7580
if (!EnsureTupleOfAtoms(requiredLabelNames, ctx)) {
7681
return TStatus::Error;
@@ -155,7 +160,7 @@ class TSolomonDataSourceTypeAnnotationTransformer : public TVisitorTransformerBa
155160
}
156161

157162
TStatus HandleRead(const TExprNode::TPtr& input, TExprContext& ctx) {
158-
if (!EnsureMinMaxArgsCount(*input, 8U, 9U, ctx)) {
163+
if (!EnsureMinMaxArgsCount(*input, 9U, 10U, ctx)) {
159164
return TStatus::Error;
160165
}
161166

@@ -177,6 +182,11 @@ class TSolomonDataSourceTypeAnnotationTransformer : public TVisitorTransformerBa
177182
return TStatus::Error;
178183
}
179184

185+
auto& labelNameAliases = *input->Child(TSoReadObject::idx_LabelNameAliases);
186+
if (!EnsureTupleOfAtoms(labelNameAliases, ctx)) {
187+
return TStatus::Error;
188+
}
189+
180190
auto& requiredLabelNames = *input->Child(TSoReadObject::idx_RequiredLabelNames);
181191
if (!EnsureTupleOfAtoms(requiredLabelNames, ctx)) {
182192
return TStatus::Error;

0 commit comments

Comments
 (0)