Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion pkg/engine/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/grafana/loki/v3/pkg/logqlmodel"
"github.com/grafana/loki/v3/pkg/logqlmodel/metadata"
"github.com/grafana/loki/v3/pkg/logqlmodel/stats"
"github.com/grafana/loki/v3/pkg/storage/bucket"
"github.com/grafana/loki/v3/pkg/util/httpreq"
util_log "github.com/grafana/loki/v3/pkg/util/log"
"github.com/grafana/loki/v3/pkg/util/rangeio"
Expand Down Expand Up @@ -122,7 +123,7 @@ func New(params Params) (*Engine, error) {
rangeConfig: params.Config.RangeConfig,

scheduler: params.Scheduler,
bucket: params.Bucket,
bucket: bucket.NewXCapBucket(params.Bucket),
limits: params.Limits,
}

Expand Down
3 changes: 2 additions & 1 deletion pkg/engine/internal/worker/thread.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/grafana/loki/v3/pkg/engine/internal/scheduler/wire"
"github.com/grafana/loki/v3/pkg/engine/internal/workflow"
"github.com/grafana/loki/v3/pkg/logqlmodel/stats"
"github.com/grafana/loki/v3/pkg/storage/bucket"
utillog "github.com/grafana/loki/v3/pkg/util/log"
"github.com/grafana/loki/v3/pkg/xcap"
)
Expand Down Expand Up @@ -112,7 +113,7 @@ func (t *thread) runJob(ctx context.Context, job *threadJob) {

cfg := executor.Config{
BatchSize: t.BatchSize,
Bucket: t.Bucket,
Bucket: bucket.NewXCapBucket(t.Bucket),

GetExternalInputs: func(_ context.Context, node physical.Node) []executor.Pipeline {
streams := job.Task.Sources[node]
Expand Down
123 changes: 123 additions & 0 deletions pkg/storage/bucket/xcap_bucket.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
package bucket

import (
"context"
"io"

"github.com/thanos-io/objstore"

"github.com/grafana/loki/v3/pkg/xcap"
)

// Statistics for tracking bucket operation counts.
var (
statBucketGet = xcap.NewStatisticInt64("bucket.get", xcap.AggregationTypeSum)
statBucketGetRange = xcap.NewStatisticInt64("bucket.getrange", xcap.AggregationTypeSum)
statBucketIter = xcap.NewStatisticInt64("bucket.iter", xcap.AggregationTypeSum)
statBucketAttributes = xcap.NewStatisticInt64("bucket.attributes", xcap.AggregationTypeSum)
)

// XCapBucket wraps an objstore.Bucket and records request counts to the xcap
// Region found in the context. If no Region is present in the context, the
// wrapper simply delegates to the underlying bucket without recording.
type XCapBucket struct {
bkt objstore.Bucket
}

// NewXCapBucket creates a new XcapBucket that wraps the given bucket and records
// request counts to xcap regions found in the context.
func NewXCapBucket(bkt objstore.Bucket) *XCapBucket {
return &XCapBucket{bkt: bkt}
}

// recordOp records a single operation to the xcap region if present in the context.
func recordOp(ctx context.Context, stat *xcap.StatisticInt64) {
region := xcap.RegionFromContext(ctx)
if region == nil {
return
}
region.Record(stat.Observe(1))
}

// Provider returns the underlying bucket provider.
func (b *XCapBucket) Provider() objstore.ObjProvider {
return b.bkt.Provider()
}

// Close closes the underlying bucket.
func (b *XCapBucket) Close() error {
return b.bkt.Close()
}

// Iter calls f for each entry in the given directory (not recursive.).
func (b *XCapBucket) Iter(ctx context.Context, dir string, f func(string) error, options ...objstore.IterOption) error {
recordOp(ctx, statBucketIter)
return b.bkt.Iter(ctx, dir, f, options...)
}

// IterWithAttributes calls f for each entry in the given directory similar to Iter.
func (b *XCapBucket) IterWithAttributes(ctx context.Context, dir string, f func(objstore.IterObjectAttributes) error, options ...objstore.IterOption) error {
recordOp(ctx, statBucketIter)
return b.bkt.IterWithAttributes(ctx, dir, f, options...)
}

// SupportedIterOptions returns a list of supported IterOptions by the underlying provider.
func (b *XCapBucket) SupportedIterOptions() []objstore.IterOptionType {
return b.bkt.SupportedIterOptions()
}

// Get returns a reader for the given object name.
func (b *XCapBucket) Get(ctx context.Context, name string) (io.ReadCloser, error) {
recordOp(ctx, statBucketGet)
return b.bkt.Get(ctx, name)
}

// GetRange returns a new range reader for the given object name and range.
func (b *XCapBucket) GetRange(ctx context.Context, name string, off, length int64) (io.ReadCloser, error) {
recordOp(ctx, statBucketGetRange)
return b.bkt.GetRange(ctx, name, off, length)
}

// GetAndReplace an existing object with a new object.
func (b *XCapBucket) GetAndReplace(ctx context.Context, name string, f func(io.ReadCloser) (io.ReadCloser, error)) error {
return b.bkt.GetAndReplace(ctx, name, f)
}

// Exists checks if the given object exists in the bucket.
func (b *XCapBucket) Exists(ctx context.Context, name string) (bool, error) {
return b.bkt.Exists(ctx, name)
}

// IsObjNotFoundErr returns true if error means that object is not found.
func (b *XCapBucket) IsObjNotFoundErr(err error) bool {
return b.bkt.IsObjNotFoundErr(err)
}

// IsAccessDeniedErr returns true if access to object is denied.
func (b *XCapBucket) IsAccessDeniedErr(err error) bool {
return b.bkt.IsAccessDeniedErr(err)
}

// Attributes returns information about the specified object.
func (b *XCapBucket) Attributes(ctx context.Context, name string) (objstore.ObjectAttributes, error) {
recordOp(ctx, statBucketAttributes)
return b.bkt.Attributes(ctx, name)
}

// Upload uploads the contents of the reader as an object into the bucket.
func (b *XCapBucket) Upload(ctx context.Context, name string, r io.Reader) error {
return b.bkt.Upload(ctx, name, r)
}

// Delete removes the object with the given name.
func (b *XCapBucket) Delete(ctx context.Context, name string) error {
return b.bkt.Delete(ctx, name)
}

// Name returns the bucket name for the provider.
func (b *XCapBucket) Name() string {
return b.bkt.Name()
}

// Ensure XcapBucket implements objstore.Bucket interface.
var _ objstore.Bucket = (*XCapBucket)(nil)
4 changes: 2 additions & 2 deletions pkg/xcap/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,9 @@ func contextWithCapture(ctx context.Context, capture *Capture) context.Context {
return context.WithValue(ctx, captureKey, capture)
}

// regionFromContext returns the current Region from the context, or nil if no Region
// RegionFromContext returns the current Region from the context, or nil if no Region
// is present.
func regionFromContext(ctx context.Context) *Region {
func RegionFromContext(ctx context.Context) *Region {
v, ok := ctx.Value(regionKey).(*Region)
if !ok {
return nil
Expand Down
2 changes: 1 addition & 1 deletion pkg/xcap/region.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ func StartRegion(ctx context.Context, name string, opts ...RegionOption) (contex
}

// extract parentID from context
if pr := regionFromContext(ctx); pr != nil {
if pr := RegionFromContext(ctx); pr != nil {
r.parentID = pr.id
}

Expand Down
Loading