Skip to content

Commit 46ec0a5

Browse files
authored
allow queriers to execute from logical query plans (#6929)
Signed-off-by: rubywtl <[email protected]>
1 parent 47efa80 commit 46ec0a5

File tree

14 files changed

+534
-48
lines changed

14 files changed

+534
-48
lines changed

integration/querier_test.go

Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1375,3 +1375,78 @@ func TestQuerierEngineConfigs(t *testing.T) {
13751375
}
13761376

13771377
}
1378+
1379+
func TestQuerierDistributedExecution(t *testing.T) {
1380+
// e2e test setup
1381+
s, err := e2e.NewScenario(networkName)
1382+
require.NoError(t, err)
1383+
defer s.Close()
1384+
1385+
// initialize the flags
1386+
flags := mergeFlags(
1387+
BlocksStorageFlags(),
1388+
map[string]string{
1389+
"-blocks-storage.tsdb.block-ranges-period": (5 * time.Second).String(),
1390+
"-blocks-storage.tsdb.ship-interval": "1s",
1391+
"-blocks-storage.tsdb.retention-period": ((5 * time.Second * 2) - 1).String(),
1392+
"-querier.thanos-engine": "true",
1393+
// enable distributed execution (logical plan execution)
1394+
"-querier.distributed-exec-enabled": "true",
1395+
},
1396+
)
1397+
1398+
minio := e2edb.NewMinio(9000, flags["-blocks-storage.s3.bucket-name"])
1399+
consul := e2edb.NewConsul()
1400+
require.NoError(t, s.StartAndWaitReady(consul, minio))
1401+
1402+
// start services
1403+
distributor := e2ecortex.NewDistributor("distributor", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), flags, "")
1404+
ingester := e2ecortex.NewIngester("ingester", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), flags, "")
1405+
queryScheduler := e2ecortex.NewQueryScheduler("query-scheduler", flags, "")
1406+
storeGateway := e2ecortex.NewStoreGateway("store-gateway", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), flags, "")
1407+
require.NoError(t, s.StartAndWaitReady(queryScheduler, distributor, ingester, storeGateway))
1408+
flags = mergeFlags(flags, map[string]string{
1409+
"-querier.store-gateway-addresses": strings.Join([]string{storeGateway.NetworkGRPCEndpoint()}, ","),
1410+
})
1411+
1412+
queryFrontend := e2ecortex.NewQueryFrontend("query-frontend", mergeFlags(flags, map[string]string{
1413+
"-frontend.scheduler-address": queryScheduler.NetworkGRPCEndpoint(),
1414+
}), "")
1415+
require.NoError(t, s.Start(queryFrontend))
1416+
1417+
querier := e2ecortex.NewQuerier("querier", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), mergeFlags(flags, map[string]string{
1418+
"-querier.scheduler-address": queryScheduler.NetworkGRPCEndpoint(),
1419+
}), "")
1420+
require.NoError(t, s.StartAndWaitReady(querier))
1421+
1422+
// wait until the distributor and querier has updated the ring.
1423+
require.NoError(t, distributor.WaitSumMetrics(e2e.Equals(512), "cortex_ring_tokens_total"))
1424+
require.NoError(t, querier.WaitSumMetrics(e2e.Equals(2*512), "cortex_ring_tokens_total"))
1425+
1426+
c, err := e2ecortex.NewClient(distributor.HTTPEndpoint(), queryFrontend.HTTPEndpoint(), "", "", "user-1")
1427+
require.NoError(t, err)
1428+
1429+
series1Timestamp := time.Now()
1430+
series2Timestamp := series1Timestamp.Add(time.Minute * 1)
1431+
series1, expectedVector1 := generateSeries("series_1", series1Timestamp, prompb.Label{Name: "series_1", Value: "series_1"})
1432+
series2, expectedVector2 := generateSeries("series_2", series2Timestamp, prompb.Label{Name: "series_2", Value: "series_2"})
1433+
1434+
res, err := c.Push(series1)
1435+
require.NoError(t, err)
1436+
require.Equal(t, 200, res.StatusCode)
1437+
1438+
res, err = c.Push(series2)
1439+
require.NoError(t, err)
1440+
require.Equal(t, 200, res.StatusCode)
1441+
1442+
// main tests
1443+
// - make sure queries are still executable with distributed execution enabled
1444+
var val model.Value
1445+
val, err = c.Query("series_1", series1Timestamp)
1446+
require.NoError(t, err)
1447+
require.Equal(t, expectedVector1, val.(model.Vector))
1448+
1449+
val, err = c.Query("series_2", series2Timestamp)
1450+
require.NoError(t, err)
1451+
require.Equal(t, expectedVector2, val.(model.Vector))
1452+
}

pkg/api/handlers.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,13 +19,13 @@ import (
1919
"github.com/prometheus/common/route"
2020
"github.com/prometheus/common/version"
2121
"github.com/prometheus/prometheus/config"
22-
"github.com/prometheus/prometheus/promql"
2322
"github.com/prometheus/prometheus/storage"
2423
v1 "github.com/prometheus/prometheus/web/api/v1"
2524
"github.com/weaveworks/common/instrument"
2625
"github.com/weaveworks/common/middleware"
2726

2827
"github.com/cortexproject/cortex/pkg/api/queryapi"
28+
"github.com/cortexproject/cortex/pkg/engine"
2929
"github.com/cortexproject/cortex/pkg/querier"
3030
"github.com/cortexproject/cortex/pkg/querier/codec"
3131
"github.com/cortexproject/cortex/pkg/querier/stats"
@@ -164,7 +164,7 @@ func NewQuerierHandler(
164164
querierCfg querier.Config,
165165
queryable storage.SampleAndChunkQueryable,
166166
exemplarQueryable storage.ExemplarQueryable,
167-
engine promql.QueryEngine,
167+
engine engine.QueryEngine,
168168
metadataQuerier querier.MetadataQuerier,
169169
reg prometheus.Registerer,
170170
logger log.Logger,

pkg/api/queryapi/query_api.go

Lines changed: 44 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ import (
1616
"github.com/prometheus/prometheus/util/annotations"
1717
"github.com/prometheus/prometheus/util/httputil"
1818
v1 "github.com/prometheus/prometheus/web/api/v1"
19+
"github.com/thanos-io/promql-engine/logicalplan"
1920
"github.com/weaveworks/common/httpgrpc"
2021

2122
"github.com/cortexproject/cortex/pkg/engine"
@@ -26,7 +27,7 @@ import (
2627

2728
type QueryAPI struct {
2829
queryable storage.SampleAndChunkQueryable
29-
queryEngine promql.QueryEngine
30+
queryEngine engine.QueryEngine
3031
now func() time.Time
3132
statsRenderer v1.StatsRenderer
3233
logger log.Logger
@@ -35,7 +36,7 @@ type QueryAPI struct {
3536
}
3637

3738
func NewQueryAPI(
38-
qe promql.QueryEngine,
39+
qe engine.QueryEngine,
3940
q storage.SampleAndChunkQueryable,
4041
statsRenderer v1.StatsRenderer,
4142
logger log.Logger,
@@ -101,10 +102,29 @@ func (q *QueryAPI) RangeQueryHandler(r *http.Request) (result apiFuncResult) {
101102

102103
ctx = engine.AddEngineTypeToContext(ctx, r)
103104
ctx = querier.AddBlockStoreTypeToContext(ctx, r.Header.Get(querier.BlockStoreTypeHeader))
104-
qry, err := q.queryEngine.NewRangeQuery(ctx, q.queryable, opts, r.FormValue("query"), convertMsToTime(start), convertMsToTime(end), convertMsToDuration(step))
105-
if err != nil {
106-
return invalidParamError(httpgrpc.Errorf(http.StatusBadRequest, "%s", err.Error()), "query")
105+
106+
var qry promql.Query
107+
startTime := convertMsToTime(start)
108+
endTime := convertMsToTime(end)
109+
stepDuration := convertMsToDuration(step)
110+
111+
byteLP := []byte(r.PostFormValue("plan"))
112+
if len(byteLP) != 0 {
113+
logicalPlan, err := logicalplan.Unmarshal(byteLP)
114+
if err != nil {
115+
return apiFuncResult{nil, &apiError{errorInternal, fmt.Errorf("invalid logical plan: %v", err)}, nil, nil}
116+
}
117+
qry, err = q.queryEngine.MakeRangeQueryFromPlan(ctx, q.queryable, opts, logicalPlan, startTime, endTime, stepDuration, r.FormValue("query"))
118+
if err != nil {
119+
return apiFuncResult{nil, &apiError{errorInternal, fmt.Errorf("failed to create range query from logical plan: %v", err)}, nil, nil}
120+
}
121+
} else { // if there is logical plan field is empty, fall back
122+
qry, err = q.queryEngine.NewRangeQuery(ctx, q.queryable, opts, r.FormValue("query"), startTime, endTime, stepDuration)
123+
if err != nil {
124+
return invalidParamError(httpgrpc.Errorf(http.StatusBadRequest, "%s", err.Error()), "query")
125+
}
107126
}
127+
108128
// From now on, we must only return with a finalizer in the result (to
109129
// be called by the caller) or call qry.Close ourselves (which is
110130
// required in the case of a panic).
@@ -157,9 +177,25 @@ func (q *QueryAPI) InstantQueryHandler(r *http.Request) (result apiFuncResult) {
157177

158178
ctx = engine.AddEngineTypeToContext(ctx, r)
159179
ctx = querier.AddBlockStoreTypeToContext(ctx, r.Header.Get(querier.BlockStoreTypeHeader))
160-
qry, err := q.queryEngine.NewInstantQuery(ctx, q.queryable, opts, r.FormValue("query"), convertMsToTime(ts))
161-
if err != nil {
162-
return invalidParamError(httpgrpc.Errorf(http.StatusBadRequest, "%s", err.Error()), "query")
180+
181+
var qry promql.Query
182+
tsTime := convertMsToTime(ts)
183+
184+
byteLP := []byte(r.PostFormValue("plan"))
185+
if len(byteLP) != 0 {
186+
logicalPlan, err := logicalplan.Unmarshal(byteLP)
187+
if err != nil {
188+
return apiFuncResult{nil, &apiError{errorInternal, fmt.Errorf("invalid logical plan: %v", err)}, nil, nil}
189+
}
190+
qry, err = q.queryEngine.MakeInstantQueryFromPlan(ctx, q.queryable, opts, logicalPlan, tsTime, r.FormValue("query"))
191+
if err != nil {
192+
return apiFuncResult{nil, &apiError{errorInternal, fmt.Errorf("failed to create instant query from logical plan: %v", err)}, nil, nil}
193+
}
194+
} else { // if there is logical plan field is empty, fall back
195+
qry, err = q.queryEngine.NewInstantQuery(ctx, q.queryable, opts, r.FormValue("query"), tsTime)
196+
if err != nil {
197+
return invalidParamError(httpgrpc.Errorf(http.StatusBadRequest, "%s", err.Error()), "query")
198+
}
163199
}
164200

165201
// From now on, we must only return with a finalizer in the result (to

0 commit comments

Comments
 (0)