Merged
Changes from 5 commits
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -24,6 +24,7 @@
* [FEATURE] Querier: Support for configuring query optimizers and enabling XFunctions in the Thanos engine. #6873
* [FEATURE] Query Frontend: Add support /api/v1/format_query API for formatting queries. #6893
* [FEATURE] Query Frontend: Add support for /api/v1/parse_query API (experimental) to parse a PromQL expression and return it as a JSON-formatted AST (abstract syntax tree). #6978
* [ENHANCEMENT] Parquet Storage: Add support for additional sort columns during Parquet file generation. #7003
* [ENHANCEMENT] Overrides Exporter: Expose all fields that can be converted to float64. Also, the label value `max_local_series_per_metric` got renamed to `max_series_per_metric`, and `max_local_series_per_user` got renamed to `max_series_per_user`. #6979
* [ENHANCEMENT] Ingester: Add `cortex_ingester_tsdb_wal_replay_unknown_refs_total` and `cortex_ingester_tsdb_wbl_replay_unknown_refs_total` metrics to track unknown series references during wal/wbl replaying. #6945
* [ENHANCEMENT] Ruler: Emit an error message when the rule synchronization fails. #6902
5 changes: 5 additions & 0 deletions docs/configuration/config-file-reference.md
@@ -186,6 +186,11 @@ parquet_converter:
# CLI flag: -parquet-converter.file-buffer-enabled
[file_buffer_enabled: <boolean> | default = true]

# Configure the additional sort columns, in order of precedence, to improve query performance.
# These will be applied during Parquet file generation.
# CLI flag: -parquet-converter.additional-sort-columns
[additional_sort_columns: <list of string> | default = []]

# Local directory path for caching TSDB blocks during parquet conversion.
# CLI flag: -parquet-converter.data-dir
[data_dir: <string> | default = "./data"]
8 changes: 8 additions & 0 deletions docs/guides/parquet-mode.md
@@ -83,6 +83,9 @@ parquet_converter:
# Enable file buffering to reduce memory usage
file_buffer_enabled: true

# Defines additional sort columns applied during Parquet file generation.
additional_sort_columns: ["label1", "label2"]

# Ring configuration for distributed conversion
ring:
kvstore:
@@ -106,6 +109,9 @@ limits:

# Shard size for shuffle sharding (0 = disabled)
parquet_converter_tenant_shard_size: 0.8

# Defines sort columns applied during Parquet file generation; can be overridden per tenant.
parquet_converter_sort_columns: ["label1", "label2"]
```

You can also configure per-tenant settings using runtime configuration:
@@ -115,6 +121,7 @@ overrides:
tenant-1:
parquet_converter_enabled: true
parquet_converter_tenant_shard_size: 2
parquet_converter_sort_columns: ["cluster", "namespace"]
tenant-2:
parquet_converter_enabled: false
```
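
In both cases the metric name (`__name__`) remains the primary sort column; the configured labels are sorted after it, in the order listed.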
@@ -280,6 +287,7 @@ cortex_parquet_queryable_cache_misses_total
1. **Row Group Size**: Adjust `max_rows_per_row_group` based on your query patterns
2. **Cache Size**: Tune `parquet_queryable_shard_cache_size` based on available memory
3. **Concurrency**: Adjust `meta_sync_concurrency` based on object storage performance
4. **Sort Columns**: Configure `additional_sort_columns` (or the per-tenant `parquet_converter_sort_columns` override) based on your most common query filters to improve query performance, as sketched below
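
For example, a minimal sketch assuming a workload whose queries mostly filter on the `cluster` and `namespace` labels:

```yaml
parquet_converter:
  # Labels sorted after the metric name, in order of precedence.
  additional_sort_columns: ["cluster", "namespace"]

limits:
  # Tenant-level setting; can also be set per tenant via runtime overrides.
  parquet_converter_sort_columns: ["cluster", "namespace"]
```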

### Fallback Configuration

27 changes: 22 additions & 5 deletions pkg/parquetconverter/converter.go
@@ -39,6 +39,7 @@ import (
"github.com/cortexproject/cortex/pkg/tenant"
"github.com/cortexproject/cortex/pkg/util"
cortex_errors "github.com/cortexproject/cortex/pkg/util/errors"
"github.com/cortexproject/cortex/pkg/util/flagext"
util_log "github.com/cortexproject/cortex/pkg/util/log"
"github.com/cortexproject/cortex/pkg/util/services"
"github.com/cortexproject/cortex/pkg/util/validation"
@@ -54,10 +55,11 @@ const (
var RingOp = ring.NewOp([]ring.InstanceState{ring.ACTIVE}, nil)

type Config struct {
MetaSyncConcurrency int `yaml:"meta_sync_concurrency"`
ConversionInterval time.Duration `yaml:"conversion_interval"`
MaxRowsPerRowGroup int `yaml:"max_rows_per_row_group"`
FileBufferEnabled bool `yaml:"file_buffer_enabled"`
MetaSyncConcurrency int `yaml:"meta_sync_concurrency"`
ConversionInterval time.Duration `yaml:"conversion_interval"`
MaxRowsPerRowGroup int `yaml:"max_rows_per_row_group"`
FileBufferEnabled bool `yaml:"file_buffer_enabled"`
AdditionalSortColumns []string `yaml:"additional_sort_columns"`

DataDir string `yaml:"data_dir"`

@@ -109,6 +111,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
f.IntVar(&cfg.MaxRowsPerRowGroup, "parquet-converter.max-rows-per-row-group", 1e6, "Maximum number of time series per parquet row group. Larger values improve compression but may reduce performance during reads.")
f.DurationVar(&cfg.ConversionInterval, "parquet-converter.conversion-interval", time.Minute, "How often to check for new TSDB blocks to convert to parquet format.")
f.BoolVar(&cfg.FileBufferEnabled, "parquet-converter.file-buffer-enabled", true, "Enable disk-based write buffering to reduce memory consumption during parquet file generation.")
f.Var((*flagext.StringSlice)(&cfg.AdditionalSortColumns), "parquet-converter.additional-sort-columns", "Configure the additional sort columns, in order of precedence, to improve query performance. These will be applied during parquet file generation.")
}

func NewConverter(cfg Config, storageCfg cortex_tsdb.BlocksStorageConfig, blockRanges []int64, logger log.Logger, registerer prometheus.Registerer, limits *validation.Overrides) (*Converter, error) {
@@ -126,6 +129,13 @@ func NewConverter(cfg Config, storageCfg cortex_tsdb.BlocksStorageConfig, blockR
}

func newConverter(cfg Config, bkt objstore.InstrumentedBucket, storageCfg cortex_tsdb.BlocksStorageConfig, blockRanges []int64, logger log.Logger, registerer prometheus.Registerer, limits *validation.Overrides, usersScanner users.Scanner) *Converter {
// Create base sort columns with metric name as the primary sort column
sortColumns := []string{labels.MetricName}
if len(cfg.AdditionalSortColumns) > 0 {
sortColumns = append(sortColumns, cfg.AdditionalSortColumns...)
}
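// Store the full sort order (metric name first) back on the config so it reflects what the converter applies.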
cfg.AdditionalSortColumns = sortColumns

c := &Converter{
cfg: cfg,
reg: registerer,
@@ -139,7 +149,7 @@ func newConverter(cfg Config, bkt objstore.InstrumentedBucket, storageCfg cortex
metrics: newMetrics(registerer),
bkt: bkt,
baseConverterOptions: []convert.ConvertOption{
convert.WithSortBy(labels.MetricName),
convert.WithSortBy(sortColumns...),
convert.WithColDuration(time.Hour * 8),
convert.WithRowGroupSize(cfg.MaxRowsPerRowGroup),
},
@@ -430,6 +440,13 @@ func (c *Converter) convertUser(ctx context.Context, logger log.Logger, ring rin

converterOpts := append(c.baseConverterOptions, convert.WithName(b.ULID.String()))

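// If the tenant has sort columns configured, sort by them after the metric name.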
userConfiguredSortColumns := c.limits.ParquetConverterSortColumns(userID)
if len(userConfiguredSortColumns) > 0 {
sortColumns := []string{labels.MetricName}
sortColumns = append(sortColumns, userConfiguredSortColumns...)
converterOpts = append(converterOpts, convert.WithSortBy(sortColumns...))
}

if c.cfg.FileBufferEnabled {
converterOpts = append(converterOpts, convert.WithColumnPageBuffers(parquet.NewFileBufferPool(bdir, "buffers.*")))
}
73 changes: 70 additions & 3 deletions pkg/parquetconverter/converter_test.go
@@ -59,7 +59,19 @@ func TestConverter(t *testing.T) {
flagext.DefaultValues(limits)
limits.ParquetConverterEnabled = true

c, logger, _ := prepare(t, cfg, objstore.WithNoopInstr(bucketClient), limits)
userSpecificSortColumns := []string{"cluster", "namespace"}

// Create a mock tenant limits implementation
tenantLimits := &mockTenantLimits{
limits: map[string]*validation.Limits{
user: {
ParquetConverterSortColumns: userSpecificSortColumns,
ParquetConverterEnabled: true,
},
},
}

c, logger, _ := prepare(t, cfg, objstore.WithNoopInstr(bucketClient), limits, tenantLimits)

ctx := context.Background()

@@ -157,7 +169,7 @@ func prepareConfig() Config {
return cfg
}

func prepare(t *testing.T, cfg Config, bucketClient objstore.InstrumentedBucket, limits *validation.Limits) (*Converter, log.Logger, prometheus.Gatherer) {
func prepare(t *testing.T, cfg Config, bucketClient objstore.InstrumentedBucket, limits *validation.Limits, tenantLimits validation.TenantLimits) (*Converter, log.Logger, prometheus.Gatherer) {
storageCfg := cortex_tsdb.BlocksStorageConfig{}
blockRanges := cortex_tsdb.DurationList{2 * time.Hour, 12 * time.Hour, 24 * time.Hour}
flagext.DefaultValues(&storageCfg)
@@ -176,7 +188,7 @@ func prepare(t *testing.T, cfg Config, bucketClient objstore.InstrumentedBucket,
flagext.DefaultValues(limits)
}

overrides := validation.NewOverrides(*limits, nil)
overrides := validation.NewOverrides(*limits, tenantLimits)

scanner, err := users.NewScanner(cortex_tsdb.UsersScannerConfig{
Strategy: cortex_tsdb.UserScanStrategyList,
@@ -350,6 +362,45 @@ func TestConverter_ShouldNotFailOnAccessDenyError(t *testing.T) {
assert.Equal(t, 0.0, testutil.ToFloat64(converter.metrics.convertBlockFailures.WithLabelValues(userID)))
}

func TestConverter_SortColumns(t *testing.T) {
bucketClient, err := filesystem.NewBucket(t.TempDir())
require.NoError(t, err)
limits := &validation.Limits{}
flagext.DefaultValues(limits)
limits.ParquetConverterEnabled = true

testCases := []struct {
desc string
cfg Config
expectedSortColumns []string
}{
{
desc: "no additional sort columns are added",
cfg: Config{
MetaSyncConcurrency: 1,
DataDir: t.TempDir(),
},
expectedSortColumns: []string{labels.MetricName},
},
{
desc: "additional sort columns are added",
cfg: Config{
MetaSyncConcurrency: 1,
DataDir: t.TempDir(),
AdditionalSortColumns: []string{"cluster", "namespace"},
},
expectedSortColumns: []string{labels.MetricName, "cluster", "namespace"},
},
}
for _, tC := range testCases {
t.Run(tC.desc, func(t *testing.T) {
c, _, _ := prepare(t, tC.cfg, objstore.WithNoopInstr(bucketClient), limits, nil)
assert.Equal(t, tC.expectedSortColumns, c.cfg.AdditionalSortColumns,
"Converter should be created with the expected sort columns")
})
}
}

// mockBucket implements objstore.Bucket for testing
type mockBucket struct {
objstore.Bucket
@@ -384,3 +435,19 @@ func (r *RingMock) Get(key uint32, op ring.Operation, bufDescs []ring.InstanceDe
},
}, nil
}

// mockTenantLimits implements the validation.TenantLimits interface for testing
type mockTenantLimits struct {
limits map[string]*validation.Limits
}

func (m *mockTenantLimits) ByUserID(userID string) *validation.Limits {
if limits, ok := m.limits[userID]; ok {
return limits
}
return nil
}

func (m *mockTenantLimits) AllByUserID() map[string]*validation.Limits {
return m.limits
}
11 changes: 9 additions & 2 deletions pkg/util/validation/limits.go
@@ -219,8 +219,9 @@ type Limits struct {
CompactorPartitionSeriesCount int64 `yaml:"compactor_partition_series_count" json:"compactor_partition_series_count"`

// Parquet converter
ParquetConverterEnabled bool `yaml:"parquet_converter_enabled" json:"parquet_converter_enabled" doc:"hidden"`
ParquetConverterTenantShardSize float64 `yaml:"parquet_converter_tenant_shard_size" json:"parquet_converter_tenant_shard_size" doc:"hidden"`
ParquetConverterEnabled bool `yaml:"parquet_converter_enabled" json:"parquet_converter_enabled" doc:"hidden"`
ParquetConverterTenantShardSize float64 `yaml:"parquet_converter_tenant_shard_size" json:"parquet_converter_tenant_shard_size" doc:"hidden"`
ParquetConverterSortColumns []string `yaml:"parquet_converter_sort_columns" json:"parquet_converter_sort_columns" doc:"hidden"`

// This config doesn't have a CLI flag registered here because they're registered in
// their own original config struct.
@@ -324,6 +325,7 @@ func (l *Limits) RegisterFlags(f *flag.FlagSet) {

f.Float64Var(&l.ParquetConverterTenantShardSize, "parquet-converter.tenant-shard-size", 0, "The default tenant's shard size when the shuffle-sharding strategy is used by the parquet converter. When this setting is specified in the per-tenant overrides, a value of 0 disables shuffle sharding for the tenant. If the value is < 1 and > 0 the shard size will be a percentage of the total parquet converters.")
f.BoolVar(&l.ParquetConverterEnabled, "parquet-converter.enabled", false, "If set, enables the Parquet converter to create the parquet files.")
f.Var((*flagext.StringSlice)(&l.ParquetConverterSortColumns), "parquet-converter.sort-columns", "Additional label names for specific tenants to sort by after the metric name, in order of precedence. These are applied during Parquet file generation.")

// Parquet Queryable enforced limits.
f.IntVar(&l.ParquetMaxFetchedRowCount, "querier.parquet-queryable.max-fetched-row-count", 0, "The maximum number of rows that can be fetched when querying parquet storage. Each row maps to a series in a parquet file. This limit applies before materializing chunks. 0 to disable.")
@@ -904,6 +906,11 @@ func (o *Overrides) ParquetConverterEnabled(userID string) bool {
return o.GetOverridesForUser(userID).ParquetConverterEnabled
}

// ParquetConverterSortColumns returns the additional sort columns for parquet files.
func (o *Overrides) ParquetConverterSortColumns(userID string) []string {
return o.GetOverridesForUser(userID).ParquetConverterSortColumns
}

// ParquetMaxFetchedRowCount returns the maximum number of rows that can be fetched when querying parquet storage.
func (o *Overrides) ParquetMaxFetchedRowCount(userID string) int {
return o.GetOverridesForUser(userID).ParquetMaxFetchedRowCount