diff --git a/examples/experimental/csv-streaming-parser.js b/examples/experimental/csv-streaming-parser.js
new file mode 100644
index 00000000000..4f936aac151
--- /dev/null
+++ b/examples/experimental/csv-streaming-parser.js
@@ -0,0 +1,37 @@
+import csv from 'k6/experimental/csv';
+
+export const options = {
+    scenarios: {
+        default: {
+            executor: 'shared-iterations',
+            vus: 1,
+            iterations: 100,
+        },
+    },
+};
+
+// Use StreamingParser for large CSV files to avoid running out of memory.
+// Unlike Parser, it takes a file path string instead of an fs.File object and streams the data.
+const parser = new csv.StreamingParser('data.csv', {
+    skipFirstLine: true,
+});
+
+export default async function () {
+    // The streaming parser's `next` method works the same as the regular parser's,
+    // but the file is never loaded into memory in its entirety during initialization.
+    const { done, value } = await parser.next();
+    if (done) {
+        throw new Error('No more rows to read');
+    }
+
+    // `value` is an array of strings, where each string is a field
+    // from the CSV record.
+    console.log(value);
+}
+
+// Optional: clean up resources when the test finishes.
+export async function teardown() {
+    // The streaming parser can be closed explicitly to free resources early,
+    // though it is also closed automatically when the test ends. `close` returns a promise.
+    await parser.close();
+}
diff --git a/internal/js/modules/k6/experimental/csv/module.go b/internal/js/modules/k6/experimental/csv/module.go
index 9f55c56ec51..5b40874f741 100644
--- a/internal/js/modules/k6/experimental/csv/module.go
+++ b/internal/js/modules/k6/experimental/csv/module.go
@@ -21,6 +21,7 @@ import (
 
 	"go.k6.io/k6/js/common"
 	"go.k6.io/k6/js/modules"
+	"go.k6.io/k6/lib/fsext"
 )
 
 type (
@@ -67,8 +68,9 @@ func (rm *RootModule) NewModuleInstance(vu modules.VU) modules.Instance {
 func (mi *ModuleInstance) Exports() modules.Exports {
 	return modules.Exports{
 		Named: map[string]any{
-			"parse":  mi.Parse,
-			"Parser": mi.NewParser,
+			"parse":           mi.Parse,
+			"Parser":          mi.NewParser,
+			"StreamingParser": mi.NewStreamingParser,
 		},
 	}
 }
@@ -187,6 +189,121 @@ func (mi *ModuleInstance) NewParser(call sobek.ConstructorCall) *sobek.Object {
 	return rt.ToValue(&parser).ToObject(rt)
 }
 
+// NewStreamingParser creates a new streaming CSV parser instance that doesn't load the entire file into memory.
+//
+// Unlike NewParser, this constructor takes a file path string and opens the file directly
+// from the filesystem using a streaming approach, making it suitable for large CSV files.
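+//
+// A sketch of the intended JS usage (mirroring
+// examples/experimental/csv-streaming-parser.js):
+//
+//	const parser = new csv.StreamingParser('data.csv', { skipFirstLine: true });
+//	const { done, value } = await parser.next();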
+func (mi *ModuleInstance) NewStreamingParser(call sobek.ConstructorCall) *sobek.Object {
+	rt := mi.vu.Runtime()
+
+	if mi.vu.State() != nil {
+		common.Throw(rt, errors.New("csv StreamingParser constructor must be called in the init context"))
+	}
+
+	if len(call.Arguments) < 1 || sobek.IsUndefined(call.Argument(0)) {
+		common.Throw(rt, errors.New("csv StreamingParser constructor takes at least one non-nil file path argument"))
+	}
+
+	filePathArg := call.Argument(0)
+	if common.IsNullish(filePathArg) {
+		common.Throw(rt, errors.New("csv StreamingParser constructor takes at least one non-nil file path argument"))
+	}
+
+	filePath := filePathArg.String()
+	if filePath == "" {
+		common.Throw(rt, errors.New("csv StreamingParser constructor requires a non-empty file path"))
+	}
+
+	options := newDefaultParserOptions()
+	if len(call.Arguments) == 2 && !sobek.IsUndefined(call.Argument(1)) {
+		var err error
+		options, err = newParserOptionsFrom(call.Argument(1).ToObject(rt))
+		if err != nil {
+			common.Throw(rt, fmt.Errorf("encountered an error while interpreting StreamingParser options; reason: %w", err))
+		}
+	}
+
+	// Use the OS filesystem directly instead of k6's cached filesystem, which
+	// loads entire files into memory when they are opened.
+	osFs := fsext.NewOsFs()
+
+	// Create a streaming reader that opens the file without loading it into memory.
+	streamingReader, err := NewStreamingReaderFrom(osFs, filePath, options)
+	if err != nil {
+		common.Throw(rt, fmt.Errorf("failed to create streaming parser; reason: %w", err))
+	}
+
+	parser := StreamingParser{
+		reader:  streamingReader,
+		options: options,
+		vu:      mi.vu,
+	}
+
+	return rt.ToValue(&parser).ToObject(rt)
+}
+
+// StreamingParser is a streaming CSV parser that doesn't load the entire file into memory.
+type StreamingParser struct {
+	// currentLine holds the current line number being read by the parser.
+	currentLine atomic.Int64
+
+	// reader is the streaming CSV reader that enables reading records from the file
+	// without loading the entire file into memory.
+	reader *StreamingReader
+
+	// options holds the parser's options as provided by the user.
+	options options
+
+	// vu is the VU instance that owns this module instance.
+	vu modules.VU
+}
+
+// Next returns a promise resolving to the next row in the CSV file, read using a streaming approach.
+func (p *StreamingParser) Next() *sobek.Promise {
+	promise, resolve, reject := promises.New(p.vu)
+
+	go func() {
+		record, err := p.reader.Read()
+		if err != nil {
+			if errors.Is(err, io.EOF) {
+				resolve(parseResult{Done: true, Value: []string{}})
+				return
+			}
+
+			reject(err)
+			return
+		}
+
+		p.currentLine.Add(1)
+
+		resolve(parseResult{Done: false, Value: record})
+	}()
+
+	return promise
+}
+
+// Close closes the underlying streaming reader and file.
+func (p *StreamingParser) Close() *sobek.Promise {
+	promise, resolve, reject := promises.New(p.vu)
+
+	go func() {
+		err := p.reader.Close()
+		if err != nil {
+			reject(fmt.Errorf("failed to close streaming parser; reason: %w", err))
+			return
+		}
+		resolve(sobek.Undefined())
+	}()
+
+	return promise
+}
+
 // Next returns the next row in the CSV file.
 func (p *Parser) Next() *sobek.Promise {
 	promise, resolve, reject := promises.New(p.vu)
diff --git a/internal/js/modules/k6/experimental/csv/streaming_reader.go b/internal/js/modules/k6/experimental/csv/streaming_reader.go
new file mode 100644
index 00000000000..251d2fb6deb
--- /dev/null
+++ b/internal/js/modules/k6/experimental/csv/streaming_reader.go
@@ -0,0 +1,170 @@
+package csv
+
+import (
+	"bufio"
+	"encoding/csv"
+	"errors"
+	"fmt"
+	"io"
+	"sync/atomic"
+
+	"github.com/spf13/afero"
+
+	"go.k6.io/k6/internal/js/modules/k6/data"
+	"go.k6.io/k6/lib/fsext"
+)
+
+// StreamingReader is a CSV reader that streams records without loading the entire file into memory.
+//
+// Unlike the existing Reader implementation, it opens the file directly from the filesystem and
+// uses a buffered reader to process CSV records one at a time, keeping memory usage flat for large files.
+type StreamingReader struct {
+	csvReader *csv.Reader
+	file      afero.File
+
+	// currentLine tracks the current line number.
+	currentLine atomic.Int64
+
+	// options holds the reader's options.
+	options options
+
+	// columnNames stores the column names when the asObjects option is enabled,
+	// so that each row's values can be mapped to their corresponding column.
+	columnNames []string
+
+	// initialized tracks whether the reader has been initialized (header read, lines skipped).
+	initialized bool
+}
+
+// NewStreamingReaderFrom creates a new streaming CSV reader for the file at the provided path.
+//
+// Instead of loading the entire file into memory like the existing implementation,
+// it opens the file behind a small buffer and consumes it one record at a time.
+func NewStreamingReaderFrom(fs fsext.Fs, filePath string, options options) (*StreamingReader, error) {
+	if err := validateOptions(options); err != nil {
+		return nil, err
+	}
+
+	if options.Delimiter == 0 {
+		options.Delimiter = ','
+	}
+
+	// Open the file directly from the filesystem without reading it into memory.
+	file, err := fs.Open(filePath)
+	if err != nil {
+		return nil, fmt.Errorf("failed to open file %s: %w", filePath, err)
+	}
+
+	// Wrap the file in a buffered reader so it is consumed in small chunks.
+	bufferedReader := bufio.NewReaderSize(file, 64*1024) // 64KiB buffer
+	csvReader := csv.NewReader(bufferedReader)
+	csvReader.Comma = options.Delimiter
+
+	reader := &StreamingReader{
+		csvReader: csvReader,
+		file:      file,
+		options:   options,
+	}
+
+	// Initialize the reader (read the header, skip lines, etc.).
+	if err := reader.initialize(); err != nil {
+		_ = file.Close() // Clean up on error; the initialize error takes precedence.
+		return nil, err
+	}
+
+	return reader, nil
+}
+
+// initialize performs the initial setup of the reader (reading the header, skipping lines, etc.).
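+//
+// For example, with skipFirstLine enabled the first record is consumed and
+// discarded before any call to Read, so for a file containing:
+//
+//	name,age
+//	John,30
+//
+// the first Read returns ["John", "30"].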
+func (r *StreamingReader) initialize() error {
+	if r.initialized {
+		return nil
+	}
+
+	asObjectsEnabled := r.options.AsObjects.Valid && r.options.AsObjects.Bool
+	if asObjectsEnabled {
+		header, err := r.csvReader.Read()
+		if err != nil {
+			return fmt.Errorf("failed to read the first line; reason: %w", err)
+		}
+		r.columnNames = header
+		r.currentLine.Add(1)
+	}
+
+	if r.options.SkipFirstLine && (!r.options.FromLine.Valid || r.options.FromLine.Int64 == 0) {
+		if _, err := r.csvReader.Read(); err != nil {
+			return fmt.Errorf("failed to skip the first line; reason: %w", err)
+		}
+		r.currentLine.Add(1)
+	}
+
+	if r.options.FromLine.Valid && r.options.FromLine.Int64 > 0 {
+		for r.currentLine.Load() < r.options.FromLine.Int64 {
+			if _, err := r.csvReader.Read(); err != nil {
+				return fmt.Errorf("failed to skip lines until line %d; reason: %w", r.options.FromLine.Int64, err)
+			}
+			r.currentLine.Add(1)
+		}
+	}
+
+	r.initialized = true
+	return nil
+}
+
+// Read reads the next record from the CSV file using a streaming approach.
+// With the asObjects option enabled, the record ["John", "30"] under the header
+// ["name", "age"] is returned as map[string]string{"name": "John", "age": "30"}.
+func (r *StreamingReader) Read() (any, error) {
+	toLineSet := r.options.ToLine.Valid
+
+	// If the `toLine` option was set and we have moved past it, return EOF.
+	if toLineSet && r.options.ToLine.Int64 > 0 && r.currentLine.Load() > r.options.ToLine.Int64 {
+		return nil, io.EOF
+	}
+
+	record, err := r.csvReader.Read()
+	if err != nil {
+		return nil, err
+	}
+
+	r.currentLine.Add(1)
+
+	// If the asObjects option is enabled, return the record as a map keyed by column name.
+	if r.options.AsObjects.Valid && r.options.AsObjects.Bool {
+		if r.columnNames == nil {
+			return nil, errors.New("the 'asObjects' option is enabled, but no header was found")
+		}
+
+		if len(record) != len(r.columnNames) {
+			return nil, fmt.Errorf("record length (%d) doesn't match header length (%d)", len(record), len(r.columnNames))
+		}
+
+		recordMap := make(map[string]string, len(record))
+		for i, value := range record {
+			recordMap[r.columnNames[i]] = value
+		}
+
+		return recordMap, nil
+	}
+
+	return record, nil
+}
+
+// Close closes the underlying file.
+func (r *StreamingReader) Close() error {
+	if r.file != nil {
+		return r.file.Close()
+	}
+	return nil
+}
+
+// Ensure StreamingReader implements the RecordReader interface.
+var _ data.RecordReader = (*StreamingReader)(nil)
diff --git a/internal/js/modules/k6/experimental/csv/streaming_test.go b/internal/js/modules/k6/experimental/csv/streaming_test.go
new file mode 100644
index 00000000000..49ac405b3c2
--- /dev/null
+++ b/internal/js/modules/k6/experimental/csv/streaming_test.go
@@ -0,0 +1,229 @@
+package csv
+
+import (
+	"errors"
+	"fmt"
+	"io"
+	"os"
+	"path/filepath"
+	"testing"
+
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
+
+	"go.k6.io/k6/lib/fsext"
+)
+
+func TestStreamingReaderBasic(t *testing.T) {
+	t.Parallel()
+
+	// Create a temporary CSV file
+	tmpDir := t.TempDir()
+	csvPath := filepath.Join(tmpDir, "test.csv")
+	csvContent := "name,age,city\nJohn,30,NYC\nJane,25,LA\nBob,35,Chicago"
+
+	err := os.WriteFile(csvPath, []byte(csvContent), 0o644)
+	require.NoError(t, err)
+
+	// Create filesystem
+	fs := fsext.NewOsFs()
+
+	// Test streaming reader
+	options := newDefaultParserOptions()
+	reader, err := NewStreamingReaderFrom(fs, csvPath, options)
+	require.NoError(t, err)
+	defer reader.Close()
+
+	// Read first record
+	record1, err := reader.Read()
+	require.NoError(t, err)
+	assert.Equal(t, []string{"name", "age", "city"}, record1)
+
+	// Read second record
+	record2, err := reader.Read()
+	require.NoError(t, err)
+	assert.Equal(t, []string{"John", "30", "NYC"}, record2)
+
+	// Read third record
+	record3, err := reader.Read()
+	require.NoError(t, err)
+	assert.Equal(t, []string{"Jane", "25", "LA"}, record3)
+
+	// Read fourth record
+	record4, err := reader.Read()
+	require.NoError(t, err)
+	assert.Equal(t, []string{"Bob", "35", "Chicago"}, record4)
+
+	// Should reach EOF
+	_, err = reader.Read()
+	assert.True(t, errors.Is(err, io.EOF))
+}
+
+func TestStreamingReaderWithSkipFirstLine(t *testing.T) {
+	t.Parallel()
+
+	// Create a temporary CSV file
+	tmpDir := t.TempDir()
+	csvPath := filepath.Join(tmpDir, "test.csv")
+	csvContent := "name,age,city\nJohn,30,NYC\nJane,25,LA"
+
+	err := os.WriteFile(csvPath, []byte(csvContent), 0o644)
+	require.NoError(t, err)
+
+	// Create filesystem
+	fs := fsext.NewOsFs()
+
+	// Test streaming reader with skipFirstLine
+	options := newDefaultParserOptions()
+	options.SkipFirstLine = true
+
+	reader, err := NewStreamingReaderFrom(fs, csvPath, options)
+	require.NoError(t, err)
+	defer reader.Close()
+
+	// Should read John's record first (header skipped)
+	record1, err := reader.Read()
+	require.NoError(t, err)
+	assert.Equal(t, []string{"John", "30", "NYC"}, record1)
+
+	// Should read Jane's record second
+	record2, err := reader.Read()
+	require.NoError(t, err)
+	assert.Equal(t, []string{"Jane", "25", "LA"}, record2)
+
+	// Should reach EOF
+	_, err = reader.Read()
+	assert.True(t, errors.Is(err, io.EOF))
+}
+
+func TestStreamingReaderWithAsObjects(t *testing.T) {
+	t.Parallel()
+
+	// Create a temporary CSV file
+	tmpDir := t.TempDir()
+	csvPath := filepath.Join(tmpDir, "test.csv")
+	csvContent := "name,age,city\nJohn,30,NYC\nJane,25,LA"
+
+	err := os.WriteFile(csvPath, []byte(csvContent), 0o644)
+	require.NoError(t, err)
+
+	// Create filesystem
+	fs := fsext.NewOsFs()
+
+	// Test streaming reader with asObjects
+	options := newDefaultParserOptions()
+	options.AsObjects.Valid = true
+	options.AsObjects.Bool = true
+
+	reader, err := NewStreamingReaderFrom(fs, csvPath, options)
+	require.NoError(t, err)
+	defer reader.Close()
+
+	// Should read John's record as an object
+	record1, err := reader.Read()
+	require.NoError(t, err)
+	expected1 := map[string]string{"name": "John", "age": "30", "city": "NYC"}
+	assert.Equal(t, expected1, record1)
+
+	// Should read Jane's record as an object
+	record2, err := reader.Read()
+	require.NoError(t, err)
+	expected2 := map[string]string{"name": "Jane", "age": "25", "city": "LA"}
+	assert.Equal(t, expected2, record2)
+
+	// Should reach EOF
+	_, err = reader.Read()
+	assert.True(t, errors.Is(err, io.EOF))
+}
+
+func TestStreamingReaderNonExistentFile(t *testing.T) {
+	t.Parallel()
+
+	// Create filesystem
+	fs := fsext.NewOsFs()
+
+	// Test streaming reader with a non-existent file
+	options := newDefaultParserOptions()
+
+	_, err := NewStreamingReaderFrom(fs, "non-existent-file.csv", options)
+	assert.Error(t, err)
+	assert.Contains(t, err.Error(), "failed to open file")
+}
+
+func TestStreamingReaderMemoryUsage(t *testing.T) {
+	t.Parallel()
+
+	// Create a larger CSV file to exercise the streaming path end to end
+	tmpDir := t.TempDir()
+	csvPath := filepath.Join(tmpDir, "large.csv")
+
+	// Build a CSV with a header row and 1000 data rows
+	csvContent := "id,name,value\n"
+	for i := 0; i < 1000; i++ {
+		csvContent += fmt.Sprintf("%d,user%d,value%d\n", i, i, i)
+	}
+
+	err := os.WriteFile(csvPath, []byte(csvContent), 0o644)
+	require.NoError(t, err)
+
+	// Create filesystem
+	fs := fsext.NewOsFs()
+
+	// Test streaming reader
+	options := newDefaultParserOptions()
+	reader, err := NewStreamingReaderFrom(fs, csvPath, options)
+	require.NoError(t, err)
+	defer reader.Close()
+
+	// Read all records to ensure they're processed correctly
+	recordCount := 0
+	for {
+		_, err := reader.Read()
+		if errors.Is(err, io.EOF) {
+			break
+		}
+		require.NoError(t, err)
+		recordCount++
+	}
+
+	// Should have read the header plus 1000 data rows
+	assert.Equal(t, 1001, recordCount)
+}
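+
+// A sketch of additional coverage for the fromLine/toLine options, reusing the
+// helpers above; line numbers are zero-based, matching the reader's counter.
+func TestStreamingReaderWithFromLineAndToLine(t *testing.T) {
+	t.Parallel()
+
+	tmpDir := t.TempDir()
+	csvPath := filepath.Join(tmpDir, "test.csv")
+	csvContent := "name,age\nJohn,30\nJane,25\nBob,35"
+
+	require.NoError(t, os.WriteFile(csvPath, []byte(csvContent), 0o644))
+
+	fs := fsext.NewOsFs()
+
+	// Skip the header at line 0 and stop reading after line 2.
+	options := newDefaultParserOptions()
+	options.FromLine.Valid = true
+	options.FromLine.Int64 = 1
+	options.ToLine.Valid = true
+	options.ToLine.Int64 = 2
+
+	reader, err := NewStreamingReaderFrom(fs, csvPath, options)
+	require.NoError(t, err)
+	defer reader.Close()
+
+	// John (line 1) and Jane (line 2) fall inside the window.
+	record1, err := reader.Read()
+	require.NoError(t, err)
+	assert.Equal(t, []string{"John", "30"}, record1)
+
+	record2, err := reader.Read()
+	require.NoError(t, err)
+	assert.Equal(t, []string{"Jane", "25"}, record2)
+
+	// Bob (line 3) lies past toLine, so the reader reports EOF.
+	_, err = reader.Read()
+	assert.True(t, errors.Is(err, io.EOF))
+}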