Skip to content
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions examples/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ If something is missing, or you found a mistake in one of these examples, please
- [insert_file_stream_parquet.ts](node/insert_file_stream_parquet.ts) - (Node.js only) stream a Parquet file into ClickHouse.
- [insert_arbitrary_format_stream.ts](node/insert_arbitrary_format_stream.ts) - (Node.js only) stream in arbitrary format into ClickHouse. In this case, the input format is [AVRO](https://clickhouse.com/docs/interfaces/formats/Avro), inserting the data from an Avro data file generated ad-hoc.
- [stream_created_from_array_raw.ts](node/stream_created_from_array_raw.ts) - (Node.js only) converting the string input into a stream and sending it to ClickHouse; in this scenario, the base input is a CSV string.
- [insert_streaming_with_backpressure.ts](node/insert_streaming_with_backpressure.ts) - (Node.js only) advanced streaming INSERT example with proper backpressure handling, demonstrating how to handle high-throughput scenarios where your application pushes data to the stream.
- [insert_streaming_backpressure_simple.ts](node/insert_streaming_backpressure_simple.ts) - (Node.js only) simple streaming INSERT example with backpressure handling, showing the essential pattern for streaming data from your application to ClickHouse.
- [insert_values_and_functions.ts](insert_values_and_functions.ts) - generating an `INSERT INTO ... VALUES` statement that uses a combination of values and function calls.
- [insert_ephemeral_columns.ts](insert_ephemeral_columns.ts) - inserting data into a table that has [ephemeral columns](https://clickhouse.com/docs/en/sql-reference/statements/create/table#ephemeral).

Expand Down
137 changes: 137 additions & 0 deletions examples/node/insert_streaming_backpressure_simple.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
import { createClient } from '@clickhouse/client'
import * as Stream from 'node:stream'

// Shape of one row streamed into ClickHouse. Mirrors the demo table's
// columns created below: id UInt32, name String, value Float64.
interface DataRow {
  id: number // monotonically increasing record id (maps to UInt32 column)
  name: string // synthetic label, e.g. "Name_42"
  value: number // random payload in [0, 1000) (maps to Float64 column)
}

/**
 * Object-mode Readable that produces synthetic DataRow records on a timer
 * and demonstrates correct backpressure handling: when push() returns false
 * the timer is stopped, and production resumes on the next _read() call.
 *
 * Fixes over the previous revision:
 * - Production now starts on the first _read() demand, so the stream no
 *   longer deadlocks if start() is never called (start() is kept for
 *   backward compatibility).
 * - The progress log fired on ids 499, 999, ... ("Produced 499 records")
 *   because the counter was checked after the increment; it now fires on
 *   exact multiples of 500.
 */
class SimpleBackpressureStream extends Stream.Readable {
  // Next record id to emit (1-based).
  #currentId = 1
  // Total number of records to produce before ending the stream.
  #maxRecords = 0
  // Production timer; null while paused, stopped, or finished.
  #intervalId: NodeJS.Timeout | null = null
  // True while production is suspended due to backpressure.
  #isPaused = false

  constructor(maxRecords: number) {
    // objectMode: one push() per DataRow. A deliberately small highWaterMark
    // (5 objects) makes backpressure kick in quickly for the demo.
    super({ objectMode: true, highWaterMark: 5 })
    this.#maxRecords = maxRecords
  }

  /**
   * Called by the stream machinery whenever the consumer wants more data.
   * Resumes after a backpressure pause and starts production on first demand.
   */
  _read() {
    if (this.#isPaused) {
      console.log('Backpressure relieved - resuming data production')
      this.#isPaused = false
    }
    // Safe to call unconditionally: #startProducing is a no-op while a timer
    // is already running or once all records have been produced.
    this.#startProducing()
  }

  // Starts the production timer unless it is already running or done.
  #startProducing() {
    if (this.#intervalId || this.#currentId > this.#maxRecords) {
      return
    }

    this.#intervalId = setInterval(() => {
      if (this.#currentId > this.#maxRecords) {
        console.log('All data produced, ending stream')
        this.push(null) // End the stream
        this.#stopProducing()
        return
      }

      const id = this.#currentId++
      const data: DataRow = {
        id,
        name: `Name_${id}`,
        value: Math.random() * 1000,
      }
      const canContinue = this.push(data)

      if (!canContinue) {
        // push() returned false: the internal buffer reached highWaterMark.
        // Stop the timer and wait for the next _read() before producing more.
        this.#isPaused = true
        this.#stopProducing()
      } else if (id % 500 === 0) {
        console.log(`Produced ${id} records`)
      }
    }, 1)
  }

  // Clears the production timer if it is running.
  #stopProducing() {
    if (this.#intervalId) {
      clearInterval(this.#intervalId)
      this.#intervalId = null
    }
  }

  /**
   * Kept for backward compatibility: production now begins automatically on
   * the first _read() call, but an explicit start() still works.
   */
  start() {
    this.#startProducing()
  }

  // Clear the timer on destroy so a dangling interval cannot keep the
  // process alive or push into a destroyed stream.
  _destroy(error: Error | null, callback: (error?: Error | null) => void) {
    this.#stopProducing()
    callback(error)
  }
}

void (async () => {
  const tableName = 'simple_streaming_demo'
  const client = createClient()

  // Drop and recreate the destination table so the example is repeatable.
  await client.command({ query: `DROP TABLE IF EXISTS ${tableName}` })
  await client.command({
    query: `
      CREATE TABLE ${tableName}
      (
        id UInt32,
        name String,
        value Float64
      )
      ENGINE = MergeTree()
      ORDER BY id
    `,
  })

  const maxRecords = 10000
  console.log('Creating backpressure-aware data stream...')
  const dataStream = new SimpleBackpressureStream(maxRecords)

  try {
    console.log('Starting streaming insert with backpressure demonstration...')

    // Kick off the INSERT first; the client consumes the stream lazily.
    const insertPromise = client.insert({
      table: tableName,
      values: dataStream,
      format: 'JSONEachRow',
      clickhouse_settings: {
        // Use async inserts to handle streaming data more efficiently
        async_insert: 1,
        wait_for_async_insert: 1,
        async_insert_max_data_size: '10485760', // 10MB
        async_insert_busy_timeout_ms: 1000,
      },
    })

    // Begin producing shortly after the insert has started consuming.
    setTimeout(() => dataStream.start(), 100)

    await insertPromise
    console.log('Insert completed successfully!')

    // Verify how many rows actually landed in the table.
    const countResult = await client.query({
      query: `SELECT count() as total FROM ${tableName}`,
      format: 'JSONEachRow',
    })
    const rows = await countResult.json<{ total: string }>()
    console.log(`Total records inserted: ${rows[0].total}`)
  } catch (err) {
    console.error('Insert failed:', err)
  } finally {
    await client.close()
  }
})()
Loading