Skip to content

Commit ebbe9f7

Browse files
committed
chore(vector): add vector pipeline to ship clickhouse events from the edge (#2526)
<!-- Please make sure there is an issue that this PR is correlated to. --> ## Changes <!-- If there are frontend changes, please include screenshots. -->
1 parent 925250a commit ebbe9f7

File tree

61 files changed

+1805
-137
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

61 files changed

+1805
-137
lines changed

Cargo.lock

Lines changed: 32 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 4 additions & 1 deletion
Large diffs are not rendered by default.

docker/dev-full/rivet-edge-server/config.jsonc

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,10 @@
6868
}
6969
}
7070
},
71+
"vector_http": {
72+
"host": "vector-client",
73+
"port": 5022
74+
},
7175
"prometheus": {
7276
"url": "http://prometheus:9090"
7377
},

docker/dev-full/rivet-guard/config.jsonc

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,10 @@
8080
}
8181
}
8282
},
83+
"vector_http": {
84+
"host": "vector-client",
85+
"port": 5022
86+
},
8387
"prometheus": {
8488
"url": "http://prometheus:9090"
8589
},

docker/dev-full/vector-client/vector.yaml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,11 @@ sources:
1111
- http://rivet-client:6090
1212
scrape_interval_secs: 15
1313

14+
dynamic_events_http:
15+
type: http_server
16+
address: 0.0.0.0:5022
17+
encoding: ndjson
18+
1419
pegboard_manager:
1520
type: file
1621
include:
@@ -92,6 +97,7 @@ sinks:
9297
type: vector
9398
inputs:
9499
- metrics_add_meta
100+
- dynamic_events_http
95101
- pegboard_manager_add_meta
96102
- pegboard_v8_isolate_runner_add_meta
97103
- pegboard_container_runner_add_meta

docker/dev-full/vector-server/vector.yaml

Lines changed: 45 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,35 @@ transforms:
5959
condition:
6060
type: vrl
6161
source: .source == "pegboard_container_runner"
62+
63+
clickhouse_dynamic_events_filter:
64+
type: filter
65+
inputs:
66+
- vector
67+
condition:
68+
type: vrl
69+
source: .source == "clickhouse"
70+
71+
clickhouse_dynamic_events_transform:
72+
type: remap
73+
inputs:
74+
- clickhouse_dynamic_events_filter
75+
source: |
76+
# Extract and store metadata
77+
__database = .database
78+
__table = .table
79+
__columns = .columns
80+
81+
# Create a new object with just the columns data
82+
. = {
83+
"__database": __database,
84+
"__table": __table,
85+
# By default insert namespace column since most tables include this
86+
"namespace": "rivet"
87+
}
88+
89+
# Merge in the column data that should be inserted
90+
. = merge!(., __columns)
6291
6392
sinks:
6493
prom_exporter:
@@ -82,7 +111,7 @@ sinks:
82111
compression: gzip
83112
database: db_pegboard_actor_log
84113
endpoint: http://clickhouse:8123
85-
table: actor_logs
114+
table: actor_logs2
86115
auth:
87116
strategy: basic
88117
user: vector
@@ -118,4 +147,19 @@ sinks:
118147
path: "/var/log/vector/pegboard_container_runner/%Y-%m-%d.log"
119148
encoding:
120149
codec: "text"
150+
151+
clickhouse_dynamic_events:
152+
type: clickhouse
153+
inputs:
154+
- clickhouse_dynamic_events_transform
155+
compression: gzip
156+
endpoint: http://clickhouse:8123
157+
database: "{{ __database }}"
158+
table: "{{ __table }}"
159+
auth:
160+
strategy: basic
161+
user: vector
162+
password: vector
163+
batch:
164+
timeout_secs: 1.0
121165

docker/monolith/vector-server/vector.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,7 @@ sinks:
8282
compression: gzip
8383
endpoint: http://clickhouse:9300
8484
database: db_pegboard_actor_log
85-
table: actor_logs
85+
table: actor_logs2
8686
auth:
8787
strategy: basic
8888
user: vector

packages/common/chirp-workflow/core/src/ctx/activity.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -238,6 +238,11 @@ impl ActivityCtx {
238238
self.conn.clickhouse().await
239239
}
240240

241+
#[tracing::instrument(skip_all)]
242+
pub async fn clickhouse_inserter(&self) -> GlobalResult<ClickHouseInserterHandle> {
243+
self.conn.clickhouse_inserter().await
244+
}
245+
241246
#[tracing::instrument(skip_all)]
242247
pub async fn fdb(&self) -> Result<FdbPool, rivet_pools::Error> {
243248
self.conn.fdb().await

packages/common/chirp-workflow/core/src/ctx/api.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -242,6 +242,11 @@ impl ApiCtx {
242242
self.conn.clickhouse().await
243243
}
244244

245+
#[tracing::instrument(skip_all)]
246+
pub async fn clickhouse_inserter(&self) -> GlobalResult<ClickHouseInserterHandle> {
247+
self.conn.clickhouse_inserter().await
248+
}
249+
245250
#[tracing::instrument(skip_all)]
246251
pub async fn fdb(&self) -> Result<FdbPool, rivet_pools::Error> {
247252
self.conn.fdb().await

packages/common/chirp-workflow/core/src/ctx/operation.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -266,6 +266,11 @@ impl OperationCtx {
266266
self.conn.clickhouse().await
267267
}
268268

269+
#[tracing::instrument(skip_all)]
270+
pub async fn clickhouse_inserter(&self) -> GlobalResult<ClickHouseInserterHandle> {
271+
self.conn.clickhouse_inserter().await
272+
}
273+
269274
#[tracing::instrument(skip_all)]
270275
pub async fn fdb(&self) -> Result<FdbPool, rivet_pools::Error> {
271276
self.conn.fdb().await

0 commit comments

Comments
 (0)