Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@
* [BUGFIX] Store Gateway: Avoid race condition by deduplicating entries in bucket stores user scan. #6863
* [BUGFIX] Runtime-config: Change to check tenant limit validation when loading runtime config only for `all`, `distributor`, `querier`, and `ruler` targets. #6880
* [BUGFIX] Frontend: Fix remote read snappy input due to request string logging when query stats enabled. #7025
* [BUGFIX] Distributor: Fix the `/distributor/all_user_stats` API so that it keeps working during ingester rolling updates. #7026

## 1.19.0 2025-02-27

Expand Down
50 changes: 50 additions & 0 deletions integration/api_endpoints_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,15 @@ import (
"net/http"
"path/filepath"
"testing"
"time"

"github.com/prometheus/prometheus/model/labels"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/thanos-io/thanos/pkg/runutil"

"github.com/cortexproject/cortex/integration/e2e"
e2edb "github.com/cortexproject/cortex/integration/e2e/db"
"github.com/cortexproject/cortex/integration/e2ecortex"
)

Expand Down Expand Up @@ -85,3 +88,50 @@ func TestConfigAPIEndpoint(t *testing.T) {
cortex2 := e2ecortex.NewSingleBinaryWithConfigFile("cortex-2", cortexConfigFile, configOverrides, "", 9009, 9095)
require.NoError(t, s.StartAndWaitReady(cortex2))
}

// Test_AllUserStats_WhenIngesterRollingUpdate starts a distributor with three
// ingesters, stops one ingester to emulate a rolling update, pushes a series
// for a single tenant and then checks that the all-user-stats API still
// answers and reports two queried ingesters for that tenant.
func Test_AllUserStats_WhenIngesterRollingUpdate(t *testing.T) {
	scenario, err := e2e.NewScenario(networkName)
	require.NoError(t, err)
	defer scenario.Close()

	// Replicate to all three ingesters and shard the tenant across all of them.
	cfg := BlocksStorageFlags()
	cfg["-distributor.replication-factor"] = "3"
	cfg["-distributor.sharding-strategy"] = "shuffle-sharding"
	cfg["-distributor.ingestion-tenant-shard-size"] = "3"
	cfg["-distributor.shard-by-all-labels"] = "true"

	// Bring up the backing services first.
	consul := e2edb.NewConsul()
	minio := e2edb.NewMinio(9000, cfg["-blocks-storage.s3.bucket-name"])
	require.NoError(t, s3AndRingReady(scenario.StartAndWaitReady(consul, minio)))

	// Bring up the Cortex components.
	ringAddr := consul.NetworkHTTPEndpoint()
	distributor := e2ecortex.NewDistributor("distributor", e2ecortex.RingStoreConsul, ringAddr, cfg, "")
	ingester1 := e2ecortex.NewIngester("ingester-1", e2ecortex.RingStoreConsul, ringAddr, cfg, "")
	ingester2 := e2ecortex.NewIngester("ingester-2", e2ecortex.RingStoreConsul, ringAddr, cfg, "")
	ingester3 := e2ecortex.NewIngester("ingester-3", e2ecortex.RingStoreConsul, ringAddr, cfg, "")
	require.NoError(t, scenario.StartAndWaitReady(distributor, ingester1, ingester2, ingester3))

	// Block until the distributor sees all three ingesters as ACTIVE in the ring.
	ingesterRing := labels.MustNewMatcher(labels.MatchEqual, "name", "ingester")
	activeState := labels.MustNewMatcher(labels.MatchEqual, "state", "ACTIVE")
	require.NoError(t, distributor.WaitSumMetricsWithOptions(
		e2e.Equals(3),
		[]string{"cortex_ring_members"},
		e2e.WithLabelMatchers(ingesterRing, activeState),
	))

	// Stop ingester1 to emulate a rolling update.
	require.NoError(t, scenario.Stop(ingester1))

	client, err := e2ecortex.NewClient(distributor.HTTPEndpoint(), "", "", "", "user-1")
	require.NoError(t, err)

	// Write one series so that the tenant shows up in the stats.
	series, _ := generateSeries("series_1", time.Now())
	pushRes, err := client.Push(series)
	require.NoError(t, err)
	require.Equal(t, http.StatusOK, pushRes.StatusCode)

	// Only two ingesters are expected to be queried because ingester1 is down.
	stats, err := client.AllUserStats()
	require.NoError(t, err)
	require.Len(t, stats, 1)
	require.Equal(t, uint64(2), stats[0].QueriedIngesters)
}

// s3AndRingReady passes the start-up error of the dependencies through
// unchanged; it only gives the call site a descriptive name.
func s3AndRingReady(err error) error {
	return err
}
35 changes: 35 additions & 0 deletions integration/e2ecortex/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"go.opentelemetry.io/collector/pdata/pmetric"
"go.opentelemetry.io/collector/pdata/pmetric/pmetricotlp"

"github.com/cortexproject/cortex/pkg/ingester"
"github.com/cortexproject/cortex/pkg/ruler"
"github.com/cortexproject/cortex/pkg/util/backoff"
)
Expand Down Expand Up @@ -115,6 +116,40 @@ func NewPromQueryClient(address string) (*Client, error) {
return c, nil
}

// AllUserStats fetches the per-tenant statistics exposed by the distributor at
// /distributor/all_user_stats, asking for the JSON representation.
//
// It returns an error when the request cannot be built or executed within the
// client's timeout, when the distributor replies with a status other than
// 200 OK, or when the response body cannot be read or decoded.
func (c *Client) AllUserStats() ([]ingester.UserIDStats, error) {
	ctx, cancel := context.WithTimeout(context.Background(), c.timeout)
	defer cancel()

	url := fmt.Sprintf("http://%s/distributor/all_user_stats", c.distributorAddress)
	req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
	if err != nil {
		return nil, err
	}
	// Without this header the endpoint renders an HTML page instead of JSON.
	req.Header.Set("Accept", "application/json")

	// Execute HTTP request
	res, err := c.httpClient.Do(req)
	if err != nil {
		return nil, err
	}
	defer res.Body.Close()

	bodyBytes, err := io.ReadAll(res.Body)
	if err != nil {
		return nil, err
	}

	// err is nil at this point, so a non-200 reply must be turned into an
	// explicit error; otherwise callers would receive (nil, nil).
	if res.StatusCode != http.StatusOK {
		return nil, fmt.Errorf("unexpected status code %d from all_user_stats: %s", res.StatusCode, string(bodyBytes))
	}

	userStats := make([]ingester.UserIDStats, 0)
	if err := json.Unmarshal(bodyBytes, &userStats); err != nil {
		return nil, err
	}

	return userStats, nil
}

// Push the input timeseries to the remote endpoint
func (c *Client) Push(timeseries []prompb.TimeSeries, metadata ...prompb.MetricMetadata) (*http.Response, error) {
// Create write request
Expand Down
23 changes: 15 additions & 8 deletions pkg/distributor/distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -1603,26 +1603,31 @@ func (d *Distributor) UserStats(ctx context.Context) (*ingester.UserStats, error

// AllUserStats returns statistics about all users.
// Note it does not divide by the ReplicationFactor like UserStats()
func (d *Distributor) AllUserStats(ctx context.Context) ([]ingester.UserIDStats, error) {
func (d *Distributor) AllUserStats(ctx context.Context) ([]ingester.UserIDStats, int, error) {
// Add up by user, across all responses from ingesters
perUserTotals := make(map[string]ingester.UserStats)
queriedIngesterNum := 0

req := &ingester_client.UserStatsRequest{}
ctx = user.InjectOrgID(ctx, "1") // fake: ingester insists on having an org ID
// Not using d.ForReplicationSet(), so we can fail after first error.
replicationSet, err := d.ingestersRing.GetAllHealthy(ring.Read)
if err != nil {
return nil, err
return nil, 0, err
}
for _, ingester := range replicationSet.Instances {
client, err := d.ingesterPool.GetClientFor(ingester.Addr)
if err != nil {
return nil, err
return nil, 0, err
}
resp, err := client.(ingester_client.IngesterClient).AllUserStats(ctx, req)
if err != nil {
return nil, err
// During an ingester rolling update, an ingester might temporarily be in
// the stopping or starting state. Returning an error here would make the
// API fail for the whole duration of the update. Since this error is
// expected in that scenario, skip the ingester and keep going so that the
// API stays available.
continue
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't like that we are returning incomplete stats after this change.
I think I prefer the current behavior.

can you clarify why incomplete stats are ok?

Copy link
Member Author

@SungJin1212 SungJin1212 Sep 19, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Personally, an internal batch job in my system calls the /distributor/all_user_stats API. In my workloads, since ingester deployments take over 6 hours, all jobs are failing during the update.

What do you think about adding a dedicated column that shows the replication factor actually reflected in each tenant's stats?
During a deployment, some tenants may be reflected with a replication factor of 2, but with that column the stats would not be silently incomplete.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for explaining the context. 🙏

Hmm. I went through the code again, and the stats come only from healthy ingesters. So my understanding is that the health check is probably outdated because of the way the ring works with memberlist.

Also my understanding is this API is very flaky in large clusters. There is always an ingester stopping somewhere. We should definitely fix that.

I would be fine with this change if we also add another field to the stats that expresses how many ingesters have been queried, perfect if that number shows up also in

const tpl = `

@SungJin1212 wdyt?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yup, it would be good if we add a line explaining how many ingesters have been queried.
If not all ingesters are reflected, the stats results could be unstable. Would that be acceptable?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added a dedicated column # Queried Ingesters and added a line expressing how many ingesters are reflected in total.
[Screenshot, 2025-09-22 3:43:00 PM]

}
queriedIngesterNum++
for _, u := range resp.Stats {
s := perUserTotals[u.UserId]
s.IngestionRate += u.Data.IngestionRate
Expand All @@ -1631,6 +1636,7 @@ func (d *Distributor) AllUserStats(ctx context.Context) ([]ingester.UserIDStats,
s.NumSeries += u.Data.NumSeries
s.ActiveSeries += u.Data.ActiveSeries
s.LoadedBlocks += u.Data.LoadedBlocks
s.QueriedIngesters += 1
perUserTotals[u.UserId] = s
}
}
Expand All @@ -1647,22 +1653,23 @@ func (d *Distributor) AllUserStats(ctx context.Context) ([]ingester.UserIDStats,
NumSeries: stats.NumSeries,
ActiveSeries: stats.ActiveSeries,
LoadedBlocks: stats.LoadedBlocks,
QueriedIngesters: stats.QueriedIngesters,
},
})
}

return response, nil
return response, queriedIngesterNum, nil
}

// AllUserStatsHandler shows stats for all users.
func (d *Distributor) AllUserStatsHandler(w http.ResponseWriter, r *http.Request) {
stats, err := d.AllUserStats(r.Context())
stats, queriedIngesterNum, err := d.AllUserStats(r.Context())
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}

ingester.AllUserStatsRender(w, r, stats, d.ingestersRing.ReplicationFactor())
ingester.AllUserStatsRender(w, r, stats, d.ingestersRing.ReplicationFactor(), queriedIngesterNum)
}

func (d *Distributor) ServeHTTP(w http.ResponseWriter, req *http.Request) {
Expand Down
21 changes: 14 additions & 7 deletions pkg/ingester/http_admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ const tpl = `
{{if (gt .ReplicationFactor 0)}}
<p><b>NB stats do not account for replication factor, which is currently set to {{ .ReplicationFactor }}</b></p>
{{end}}
<p><b> These stats were aggregated from {{ .QueriedIngesterNum }} ingesters.</b></p>

<form action="" method="POST">
<input type="hidden" name="csrf_token" value="$__CSRF_TOKEN_PLACEHOLDER__">
<table border="1">
Expand All @@ -37,6 +39,7 @@ const tpl = `
<th>Total Ingest Rate</th>
<th>API Ingest Rate</th>
<th>Rule Ingest Rate</th>
<th># Queried Ingesters</th>
</tr>
</thead>
<tbody>
Expand All @@ -49,6 +52,7 @@ const tpl = `
<td align='right'>{{ printf "%.2f" .UserStats.IngestionRate }}</td>
<td align='right'>{{ printf "%.2f" .UserStats.APIIngestionRate }}</td>
<td align='right'>{{ printf "%.2f" .UserStats.RuleIngestionRate }}</td>
<td align='right'>{{ .UserStats.QueriedIngesters }}</td>
</tr>
{{ end }}
</tbody>
Expand Down Expand Up @@ -87,10 +91,11 @@ type UserStats struct {
RuleIngestionRate float64 `json:"RuleIngestionRate"`
ActiveSeries uint64 `json:"activeSeries"`
LoadedBlocks uint64 `json:"loadedBlocks"`
QueriedIngesters uint64 `json:"queriedIngesters"`
}

// AllUserStatsRender render data for all users or return in json format.
func AllUserStatsRender(w http.ResponseWriter, r *http.Request, stats []UserIDStats, rf int) {
func AllUserStatsRender(w http.ResponseWriter, r *http.Request, stats []UserIDStats, rf, queriedIngesterNum int) {
sort.Sort(UserStatsByTimeseries(stats))

if encodings, found := r.Header["Accept"]; found &&
Expand All @@ -102,12 +107,14 @@ func AllUserStatsRender(w http.ResponseWriter, r *http.Request, stats []UserIDSt
}

util.RenderHTTPResponse(w, struct {
Now time.Time `json:"now"`
Stats []UserIDStats `json:"stats"`
ReplicationFactor int `json:"replicationFactor"`
Now time.Time `json:"now"`
Stats []UserIDStats `json:"stats"`
ReplicationFactor int `json:"replicationFactor"`
QueriedIngesterNum int `json:"queriedIngesterNum"`
}{
Now: time.Now(),
Stats: stats,
ReplicationFactor: rf,
Now: time.Now(),
Stats: stats,
ReplicationFactor: rf,
QueriedIngesterNum: queriedIngesterNum,
}, UserStatsTmpl, r)
}
2 changes: 1 addition & 1 deletion pkg/ingester/http_admin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ func TestUserStatsPageRendered(t *testing.T) {
},
},
}
AllUserStatsRender(res, req, userStats, 3)
AllUserStatsRender(res, req, userStats, 3, 3)
assert.Equal(t, http.StatusOK, res.Code)
body := res.Body.String()
assert.Regexp(t, "<td.+123.+/td>", body)
Expand Down
2 changes: 1 addition & 1 deletion pkg/ingester/ingester.go
Original file line number Diff line number Diff line change
Expand Up @@ -2133,7 +2133,7 @@ func (i *Ingester) userStats() []UserIDStats {
func (i *Ingester) AllUserStatsHandler(w http.ResponseWriter, r *http.Request) {
stats := i.userStats()

AllUserStatsRender(w, r, stats, 0)
AllUserStatsRender(w, r, stats, 0, 0)
}

// AllUserStats returns ingestion statistics for all users known to this ingester.
Expand Down
Loading