Skip to content

Commit 71023be

Browse files
committed
chore(vector): add vector pipeline to ship clickhouse events from the edge
1 parent eeb9b08 commit 71023be

File tree

43 files changed

+1206
-44
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

43 files changed

+1206
-44
lines changed

Cargo.lock

Lines changed: 31 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 4 additions & 1 deletion
Large diffs are not rendered by default.

docker/dev-full/rivet-guard/config.jsonc

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,10 @@
8080
}
8181
}
8282
},
83+
"vector_http": {
84+
"host": "vector-client",
85+
"port": 5022
86+
},
8387
"prometheus": {
8488
"url": "http://prometheus:9090"
8589
},

docker/dev-full/rivet-server/config.jsonc

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,10 @@
6565
}
6666
}
6767
},
68+
"vector_http": {
69+
"host": "vector-client",
70+
"port": 5022
71+
},
6872
"prometheus": {
6973
"url": "http://prometheus:9090"
7074
},

docker/dev-full/vector-client/vector.yaml

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,12 @@ sources:
1111
- http://rivet-client:6090
1212
scrape_interval_secs: 15
1313

14+
dynamic_events_http:
15+
type: http_server
16+
address: 0.0.0.0:5022
17+
decoding:
18+
codec: json
19+
1420
pegboard_manager:
1521
type: file
1622
include:
@@ -92,6 +98,7 @@ sinks:
9298
type: vector
9399
inputs:
94100
- metrics_add_meta
101+
- dynamic_events_http
95102
- pegboard_manager_add_meta
96103
- pegboard_v8_isolate_runner_add_meta
97104
- pegboard_container_runner_add_meta

docker/dev-full/vector-server/vector.yaml

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,33 @@ transforms:
5959
condition:
6060
type: vrl
6161
source: .source == "pegboard_container_runner"
62+
63+
clickhouse_dynamic_events_filter:
64+
type: filter
65+
inputs:
66+
- vector
67+
condition:
68+
type: vrl
69+
source: .source == "clickhouse"
70+
71+
clickhouse_dynamic_events_transform:
72+
type: remap
73+
inputs:
74+
- clickhouse_dynamic_events_filter
75+
source: |
76+
# Extract and store metadata
77+
__database = .database
78+
__table = .table
79+
__columns = .columns
80+
81+
# Create a new object with just the columns data
82+
. = {
83+
"__database": __database,
84+
"__table": __table
85+
}
86+
87+
# Merge in the column data that should be inserted
88+
. = merge!(., __columns)
6289
6390
sinks:
6491
prom_exporter:
@@ -118,4 +145,19 @@ sinks:
118145
path: "/var/log/vector/pegboard_container_runner/%Y-%m-%d.log"
119146
encoding:
120147
codec: "text"
148+
149+
clickhouse_dynamic_events:
150+
type: clickhouse
151+
inputs:
152+
- clickhouse_dynamic_events_transform
153+
compression: gzip
154+
endpoint: http://clickhouse:8123
155+
database: "{{ __database }}"
156+
table: "{{ __table }}"
157+
auth:
158+
strategy: basic
159+
user: vector
160+
password: vector
161+
batch:
162+
timeout_secs: 1.0
121163

packages/common/chirp-workflow/core/src/ctx/activity.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -238,6 +238,11 @@ impl ActivityCtx {
238238
self.conn.clickhouse().await
239239
}
240240

241+
#[tracing::instrument(skip_all)]
242+
pub async fn clickhouse_inserter(&self) -> GlobalResult<ClickHouseInserterHandle> {
243+
self.conn.clickhouse_inserter().await
244+
}
245+
241246
#[tracing::instrument(skip_all)]
242247
pub async fn fdb(&self) -> Result<FdbPool, rivet_pools::Error> {
243248
self.conn.fdb().await

packages/common/chirp-workflow/core/src/ctx/api.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -242,6 +242,11 @@ impl ApiCtx {
242242
self.conn.clickhouse().await
243243
}
244244

245+
#[tracing::instrument(skip_all)]
246+
pub async fn clickhouse_inserter(&self) -> GlobalResult<ClickHouseInserterHandle> {
247+
self.conn.clickhouse_inserter().await
248+
}
249+
245250
#[tracing::instrument(skip_all)]
246251
pub async fn fdb(&self) -> Result<FdbPool, rivet_pools::Error> {
247252
self.conn.fdb().await

packages/common/chirp-workflow/core/src/ctx/operation.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -266,6 +266,11 @@ impl OperationCtx {
266266
self.conn.clickhouse().await
267267
}
268268

269+
#[tracing::instrument(skip_all)]
270+
pub async fn clickhouse_inserter(&self) -> GlobalResult<ClickHouseInserterHandle> {
271+
self.conn.clickhouse_inserter().await
272+
}
273+
269274
#[tracing::instrument(skip_all)]
270275
pub async fn fdb(&self) -> Result<FdbPool, rivet_pools::Error> {
271276
self.conn.fdb().await

packages/common/chirp-workflow/core/src/ctx/standalone.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -253,6 +253,11 @@ impl StandaloneCtx {
253253
self.conn.clickhouse().await
254254
}
255255

256+
#[tracing::instrument(skip_all)]
257+
pub async fn clickhouse_inserter(&self) -> GlobalResult<ClickHouseInserterHandle> {
258+
self.conn.clickhouse_inserter().await
259+
}
260+
256261
#[tracing::instrument(skip_all)]
257262
pub async fn fdb(&self) -> Result<FdbPool, rivet_pools::Error> {
258263
self.conn.fdb().await

0 commit comments

Comments
 (0)