37 changes: 37 additions & 0 deletions examples/experimental/csv-streaming-parser.js
@@ -0,0 +1,37 @@
import csv from 'k6/experimental/csv';

export const options = {
scenarios: {
default: {
executor: 'shared-iterations',
vus: 1,
iterations: 100,
}
}
};

// Use the new StreamingParser for large CSV files to avoid memory issues
// This takes a file path string instead of a fs.File object and streams the data
const parser = new csv.StreamingParser('data.csv', {
skipFirstLine: true,
});

export default async function() {
// The streaming parser's `next` method works the same way as the regular parser's,
// but the file is never loaded entirely into memory during initialization
const { done, value } = await parser.next();
if (done) {
throw new Error("No more rows to read");
}

// We expect the `value` property to be an array of strings, where each string is a field
// from the CSV record.
console.log(done, value);
}

// Optional: Clean up resources when the test finishes
export async function teardown() {
// The streaming parser can be explicitly closed to free resources,
// though it will be closed automatically when the test ends.
// `close()` returns a promise, so we await it.
await parser.close();
}
120 changes: 118 additions & 2 deletions internal/js/modules/k6/experimental/csv/module.go
@@ -21,6 +21,7 @@ import (

"go.k6.io/k6/js/common"
"go.k6.io/k6/js/modules"
"go.k6.io/k6/lib/fsext"
)

type (
@@ -67,8 +68,9 @@ func (rm *RootModule) NewModuleInstance(vu modules.VU) modules.Instance {
func (mi *ModuleInstance) Exports() modules.Exports {
return modules.Exports{
Named: map[string]any{
"parse": mi.Parse,
"Parser": mi.NewParser,
"parse": mi.Parse,
"Parser": mi.NewParser,
"StreamingParser": mi.NewStreamingParser,
},
}
}
@@ -187,6 +189,120 @@ func (mi *ModuleInstance) NewParser(call sobek.ConstructorCall) *sobek.Object {
return rt.ToValue(&parser).ToObject(rt)
}

// NewStreamingParser creates a new streaming CSV parser instance that doesn't load the entire file into memory.
//
// Unlike NewParser, this constructor takes a file path string and opens the file directly
// from the filesystem using a streaming approach, making it suitable for large CSV files.
func (mi *ModuleInstance) NewStreamingParser(call sobek.ConstructorCall) *sobek.Object {
rt := mi.vu.Runtime()

if mi.vu.State() != nil {
common.Throw(rt, errors.New("csv StreamingParser constructor must be called in the init context"))
}

if len(call.Arguments) < 1 || sobek.IsUndefined(call.Argument(0)) {
common.Throw(rt, errors.New("csv StreamingParser constructor takes at least one non-nil file path argument"))
}

filePathArg := call.Argument(0)
if common.IsNullish(filePathArg) {
common.Throw(rt, errors.New("csv StreamingParser constructor takes at least one non-nil file path argument"))
}

filePath := filePathArg.String()
if filePath == "" {
common.Throw(rt, errors.New("csv StreamingParser constructor requires a non-empty file path"))
}

options := newDefaultParserOptions()
if len(call.Arguments) == 2 && !sobek.IsUndefined(call.Argument(1)) {
var err error
options, err = newParserOptionsFrom(call.Argument(1).ToObject(rt))
if err != nil {
common.Throw(rt, fmt.Errorf("encountered an error while interpreting StreamingParser options; reason: %w", err))
}
}

// Use the OS filesystem directly instead of k6's cached filesystem; this
// bypasses k6's file caching, which loads entire files into memory.
osFs := fsext.NewOsFs()

// Create a streaming reader that opens the file directly without loading it into memory
streamingReader, err := NewStreamingReaderFrom(osFs, filePath, options)
if err != nil {
common.Throw(rt, fmt.Errorf("failed to create streaming parser; reason: %w", err))
}

// Create a new StreamingParser instance
parser := StreamingParser{
reader: streamingReader,
options: options,
vu: mi.vu,
}

return rt.ToValue(&parser).ToObject(rt)
}
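
// Illustrative usage from a script's init context (mirrors
// examples/experimental/csv-streaming-parser.js in this change):
//
//	const parser = new csv.StreamingParser('data.csv', { skipFirstLine: true });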

// StreamingParser is a streaming CSV parser that doesn't load the entire file into memory.
type StreamingParser struct {
// currentLine holds the current line number being read by the parser.
currentLine atomic.Int64

// reader is the streaming CSV reader that enables reading records from the file
// without loading the entire file into memory.
reader *StreamingReader

// options holds the parser's options as provided by the user.
options options

// vu is the VU instance that owns this module instance.
vu modules.VU
}

// Next returns the next row in the CSV file, read on demand from the underlying stream.
func (p *StreamingParser) Next() *sobek.Promise {
promise, resolve, reject := promises.New(p.vu)

go func() {
record, err := p.reader.Read()
if err != nil {
if errors.Is(err, io.EOF) {
resolve(parseResult{Done: true, Value: []string{}})
return
}

reject(err)
return
}

p.currentLine.Add(1)

resolve(parseResult{Done: false, Value: record})
}()

return promise
}
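
// From the JS side, each call to next() resolves to an iterator-style result;
// illustrative values:
//
//	{ done: false, value: ['alice', '30'] }              // default: array of fields
//	{ done: false, value: { name: 'alice', age: '30' } } // with asObjects: true
//	{ done: true, value: [] }                            // once EOF is reached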

// Close closes the underlying streaming reader and file.
func (p *StreamingParser) Close() *sobek.Promise {
promise, resolve, reject := promises.New(p.vu)

go func() {
err := p.reader.Close()
if err != nil {
reject(fmt.Errorf("failed to close streaming parser; reason: %w", err))
return
}
resolve(sobek.Undefined())
}()

return promise
}

// Next returns the next row in the CSV file.
func (p *Parser) Next() *sobek.Promise {
promise, resolve, reject := promises.New(p.vu)
158 changes: 158 additions & 0 deletions internal/js/modules/k6/experimental/csv/streaming_reader.go
@@ -0,0 +1,158 @@
package csv

import (
"bufio"
"encoding/csv"
"fmt"
"io"
"sync/atomic"

"github.com/spf13/afero"
"go.k6.io/k6/internal/js/modules/k6/data"
"go.k6.io/k6/lib/fsext"
)

// StreamingReader is a CSV reader that streams data without loading the entire file into memory.
//
// Unlike the current Reader implementation, this one opens the file directly from the filesystem
// and uses a buffered reader to process CSV records one at a time, avoiding memory issues with large files.
type StreamingReader struct {
csvReader *csv.Reader
file afero.File

// currentLine tracks the current line number.
currentLine atomic.Int64

// options holds the reader's options.
options options

// columnNames stores the column names when the asObjects option is enabled,
// so that each row's values can be mapped to their corresponding columns.
columnNames []string

// initialized tracks whether the reader has been properly initialized.
initialized bool
}

// NewStreamingReaderFrom creates a new streaming CSV reader from the provided file path.
//
// Instead of loading the entire file into memory like the current implementation,
// this opens the file with a small buffer and processes it line by line.
func NewStreamingReaderFrom(fs fsext.Fs, filePath string, options options) (*StreamingReader, error) {
if err := validateOptions(options); err != nil {
return nil, err
}

if options.Delimiter == 0 {
options.Delimiter = ','
}

// Open the file directly from the filesystem without loading it into memory
file, err := fs.Open(filePath)
if err != nil {
return nil, fmt.Errorf("failed to open file %s: %w", filePath, err)
}

// Create a buffered reader to efficiently read the file in small chunks
bufferedReader := bufio.NewReaderSize(file, 64*1024) // 64KB buffer
csvReader := csv.NewReader(bufferedReader)
csvReader.Comma = options.Delimiter

reader := &StreamingReader{
csvReader: csvReader,
file: file,
options: options,
}

// Initialize the reader (handle header, skip lines, etc.)
if err := reader.initialize(); err != nil {
file.Close() // Clean up on error
return nil, err
}

return reader, nil
}

// initialize handles the initial setup of the reader (reading headers, skipping lines, etc.)
func (r *StreamingReader) initialize() error {
if r.initialized {
return nil
}

asObjectsEnabled := r.options.AsObjects.Valid && r.options.AsObjects.Bool
if asObjectsEnabled {
header, err := r.csvReader.Read()
if err != nil {
return fmt.Errorf("failed to read the first line; reason: %w", err)
}
r.columnNames = header
r.currentLine.Add(1)
}

if r.options.SkipFirstLine && (!r.options.FromLine.Valid || r.options.FromLine.Int64 == 0) {
if _, err := r.csvReader.Read(); err != nil {
return fmt.Errorf("failed to skip the first line; reason: %w", err)
}
r.currentLine.Add(1)
}

if r.options.FromLine.Valid && r.options.FromLine.Int64 > 0 {
for r.currentLine.Load() < r.options.FromLine.Int64 {
if _, err := r.csvReader.Read(); err != nil {
return fmt.Errorf("failed to skip lines until line %d; reason: %w", r.options.FromLine.Int64, err)
}
r.currentLine.Add(1)
}
}

r.initialized = true
return nil
}
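
// For illustration (not part of this change): with { asObjects: true } the first
// input line is consumed as the header; with { skipFirstLine: true } the first
// line is read and discarded; and with a positive fromLine, records are consumed
// until the internal line counter reaches that value, so the first Read() call
// returns the record just past the skipped range.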

// Read reads the next record from the CSV file using a streaming approach.
func (r *StreamingReader) Read() (any, error) {
toLineSet := r.options.ToLine.Valid

// If the `toLine` option was set and we have read past it, return EOF.
if toLineSet && r.options.ToLine.Int64 > 0 && r.currentLine.Load() > r.options.ToLine.Int64 {
return nil, io.EOF
}

record, err := r.csvReader.Read()
if err != nil {
return nil, err
}

r.currentLine.Add(1)

// If the asObjects option is enabled, return the record as a map keyed by column name.
if r.options.AsObjects.Valid && r.options.AsObjects.Bool {
if r.columnNames == nil {
return nil, fmt.Errorf("the 'asObjects' option is enabled, but no header was found")
}

if len(record) != len(r.columnNames) {
return nil, fmt.Errorf("record length (%d) doesn't match header length (%d)", len(record), len(r.columnNames))
}

recordMap := make(map[string]string, len(record))
for i, value := range record {
recordMap[r.columnNames[i]] = value
}

return recordMap, nil
}

return record, nil
}
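
// Illustrative mapping (not part of this change): with asObjects enabled and a
// header line of "name,age", the record "alice,30" is returned as
// map[string]string{"name": "alice", "age": "30"} rather than []string{"alice", "30"}.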

// Close closes the underlying file.
func (r *StreamingReader) Close() error {
if r.file != nil {
return r.file.Close()
}
return nil
}

// Ensure StreamingReader implements the RecordReader interface
var _ data.RecordReader = (*StreamingReader)(nil)
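
A minimal sketch of how the new reader could be exercised in a unit test, using an in-memory filesystem so no disk I/O is needed. The helper names fsext.NewMemMapFs and afero.WriteFile are assumptions based on k6's existing test utilities, not part of this diff:

package csv

import (
	"errors"
	"io"
	"testing"

	"github.com/spf13/afero"

	"go.k6.io/k6/lib/fsext"
)

func TestStreamingReaderReadsUntilEOF(t *testing.T) {
	t.Parallel()

	// Write a small CSV file into an in-memory filesystem (assumed helpers).
	fs := fsext.NewMemMapFs()
	if err := afero.WriteFile(fs, "data.csv", []byte("name,age\nalice,30\nbob,25\n"), 0o644); err != nil {
		t.Fatal(err)
	}

	opts := newDefaultParserOptions()
	opts.SkipFirstLine = true

	r, err := NewStreamingReaderFrom(fs, "data.csv", opts)
	if err != nil {
		t.Fatal(err)
	}
	defer func() { _ = r.Close() }()

	// Records should stream one at a time until io.EOF.
	var rows int
	for {
		if _, err := r.Read(); err != nil {
			if errors.Is(err, io.EOF) {
				break
			}
			t.Fatal(err)
		}
		rows++
	}

	if rows != 2 {
		t.Fatalf("expected 2 data rows, got %d", rows)
	}
}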