Skip to content

Commit 8d1f2c3

Browse files
committed
chore(vector): add vector pipeline to ship clickhouse events from the edge
1 parent fe5fde3 commit 8d1f2c3

File tree

29 files changed

+746
-34
lines changed

29 files changed

+746
-34
lines changed

Cargo.lock

Lines changed: 30 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 4 additions & 1 deletion
Large diffs are not rendered by default.

docker/dev-full/vector-client/vector.yaml

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,12 @@ sources:
1111
- http://rivet-client:6090
1212
scrape_interval_secs: 15
1313

14+
http_events:
15+
type: http_server
16+
address: 0.0.0.0:5022
17+
decoding:
18+
codec: json
19+
1420
pegboard_manager:
1521
type: file
1622
include:
@@ -92,6 +98,7 @@ sinks:
9298
type: vector
9399
inputs:
94100
- metrics_add_meta
101+
- http_events
95102
- pegboard_manager_add_meta
96103
- pegboard_v8_isolate_runner_add_meta
97104
- pegboard_container_runner_add_meta

docker/dev-full/vector-server/vector.yaml

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,14 @@ sources:
1919
decoding:
2020
codec: json
2121

22+
http_json:
23+
type: http_server
24+
address: 0.0.0.0:6200
25+
decoding:
26+
codec: json
27+
path: ""
28+
strict_path: false
29+
2230
vector_metrics:
2331
type: internal_metrics
2432

@@ -59,6 +67,32 @@ transforms:
5967
condition:
6068
type: vrl
6169
source: .source == "pegboard_container_runner"
70+
71+
clickhouse_events:
72+
type: filter
73+
inputs:
74+
- http_json
75+
condition:
76+
type: vrl
77+
source: .source == "clickhouse"
78+
79+
clickhouse_events_transform:
80+
type: remap
81+
inputs:
82+
- clickhouse_events
83+
source: |
84+
# Extract and store metadata
85+
__database = .database ?? "custom_events"
86+
__table = .table
87+
88+
# Create a new object with just the columns data
89+
. = {
90+
"__database": __database,
91+
"__table": __table
92+
}
93+
94+
# Merge in the column data that should be inserted
95+
. = merge(., .columns)
6296
6397
sinks:
6498
prom_exporter:
@@ -118,4 +152,19 @@ sinks:
118152
path: "/var/log/vector/pegboard_container_runner/%Y-%m-%d.log"
119153
encoding:
120154
codec: "text"
155+
156+
clickhouse_dynamic_events:
157+
type: clickhouse
158+
inputs:
159+
- clickhouse_events_transform
160+
compression: gzip
161+
endpoint: http://clickhouse:8123
162+
database: "{{ __database }}"
163+
table: "{{ __table }}"
164+
auth:
165+
strategy: basic
166+
user: vector
167+
password: vector
168+
batch:
169+
timeout_secs: 1.0
121170

packages/common/chirp-workflow/core/src/ctx/activity.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -238,6 +238,11 @@ impl ActivityCtx {
238238
self.conn.clickhouse().await
239239
}
240240

241+
#[tracing::instrument(skip_all)]
242+
pub async fn clickhouse_inserter(&self) -> GlobalResult<ClickHouseInserterHandle> {
243+
self.conn.clickhouse_inserter().await
244+
}
245+
241246
#[tracing::instrument(skip_all)]
242247
pub async fn fdb(&self) -> Result<FdbPool, rivet_pools::Error> {
243248
self.conn.fdb().await

packages/common/chirp-workflow/core/src/ctx/api.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -242,6 +242,11 @@ impl ApiCtx {
242242
self.conn.clickhouse().await
243243
}
244244

245+
#[tracing::instrument(skip_all)]
246+
pub async fn clickhouse_inserter(&self) -> GlobalResult<ClickHouseInserterHandle> {
247+
self.conn.clickhouse_inserter().await
248+
}
249+
245250
#[tracing::instrument(skip_all)]
246251
pub async fn fdb(&self) -> Result<FdbPool, rivet_pools::Error> {
247252
self.conn.fdb().await

packages/common/chirp-workflow/core/src/ctx/operation.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -266,6 +266,11 @@ impl OperationCtx {
266266
self.conn.clickhouse().await
267267
}
268268

269+
#[tracing::instrument(skip_all)]
270+
pub async fn clickhouse_inserter(&self) -> GlobalResult<ClickHouseInserterHandle> {
271+
self.conn.clickhouse_inserter().await
272+
}
273+
269274
#[tracing::instrument(skip_all)]
270275
pub async fn fdb(&self) -> Result<FdbPool, rivet_pools::Error> {
271276
self.conn.fdb().await

packages/common/chirp-workflow/core/src/ctx/standalone.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -253,6 +253,11 @@ impl StandaloneCtx {
253253
self.conn.clickhouse().await
254254
}
255255

256+
#[tracing::instrument(skip_all)]
257+
pub async fn clickhouse_inserter(&self) -> GlobalResult<ClickHouseInserterHandle> {
258+
self.conn.clickhouse_inserter().await
259+
}
260+
256261
#[tracing::instrument(skip_all)]
257262
pub async fn fdb(&self) -> Result<FdbPool, rivet_pools::Error> {
258263
self.conn.fdb().await

packages/common/chirp-workflow/core/src/ctx/test.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -296,6 +296,11 @@ impl TestCtx {
296296
self.conn.clickhouse().await
297297
}
298298

299+
#[tracing::instrument(skip_all)]
300+
pub async fn clickhouse_inserter(&self) -> GlobalResult<ClickHouseInserterHandle> {
301+
self.conn.clickhouse_inserter().await
302+
}
303+
299304
#[tracing::instrument(skip_all, fields(%workflow_id))]
300305
pub async fn sqlite_for_workflow(&self, workflow_id: Uuid) -> GlobalResult<SqlitePool> {
301306
common::sqlite_for_workflow(&self.db, &self.conn, workflow_id, true)

packages/common/chirp/worker/src/test.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -84,4 +84,8 @@ impl TestCtx {
8484
pub async fn clickhouse(&self) -> GlobalResult<ClickHousePool> {
8585
self.op_ctx.clickhouse().await
8686
}
87+
88+
pub async fn clickhouse_inserter(&self) -> GlobalResult<ClickHouseInserterHandle> {
89+
self.op_ctx.clickhouse_inserter().await
90+
}
8791
}

0 commit comments

Comments
 (0)