Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 32 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 4 additions & 1 deletion Cargo.toml

Large diffs are not rendered by default.

4 changes: 4 additions & 0 deletions docker/dev-full/rivet-edge-server/config.jsonc
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,10 @@
}
}
},
"vector_http": {
"host": "vector-client",
"port": 5022
},
"prometheus": {
"url": "http://prometheus:9090"
},
Expand Down
4 changes: 4 additions & 0 deletions docker/dev-full/rivet-guard/config.jsonc
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,10 @@
}
}
},
"vector_http": {
"host": "vector-client",
"port": 5022
},
"prometheus": {
"url": "http://prometheus:9090"
},
Expand Down
6 changes: 6 additions & 0 deletions docker/dev-full/vector-client/vector.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,11 @@ sources:
- http://rivet-client:6090
scrape_interval_secs: 15

dynamic_events_http:
type: http_server
address: 0.0.0.0:5022
encoding: ndjson
Comment on lines +14 to +17
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

logic: No transform is defined for dynamic_events_http before sending to sink. Consider adding a transform to ensure consistent metadata tagging like other sources.


pegboard_manager:
type: file
include:
Expand Down Expand Up @@ -92,6 +97,7 @@ sinks:
type: vector
inputs:
- metrics_add_meta
- dynamic_events_http
- pegboard_manager_add_meta
- pegboard_v8_isolate_runner_add_meta
- pegboard_container_runner_add_meta
Expand Down
46 changes: 45 additions & 1 deletion docker/dev-full/vector-server/vector.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,35 @@ transforms:
condition:
type: vrl
source: .source == "pegboard_container_runner"

clickhouse_dynamic_events_filter:
type: filter
inputs:
- vector
condition:
type: vrl
source: .source == "clickhouse"

clickhouse_dynamic_events_transform:
type: remap
inputs:
- clickhouse_dynamic_events_filter
source: |
# Extract and store metadata
__database = .database
__table = .table
__columns = .columns

# Create a new object with just the columns data
. = {
"__database": __database,
"__table": __table,
# By default insert namespace column since most tables include this
"namespace": "rivet"
}

# Merge in the column data that should be inserted
. = merge!(., __columns)

sinks:
prom_exporter:
Expand All @@ -82,7 +111,7 @@ sinks:
compression: gzip
database: db_pegboard_actor_log
endpoint: http://clickhouse:8123
table: actor_logs
table: actor_logs2
auth:
strategy: basic
user: vector
Expand Down Expand Up @@ -118,4 +147,19 @@ sinks:
path: "/var/log/vector/pegboard_container_runner/%Y-%m-%d.log"
encoding:
codec: "text"

clickhouse_dynamic_events:
type: clickhouse
inputs:
- clickhouse_dynamic_events_transform
compression: gzip
endpoint: http://clickhouse:8123
database: "{{ __database }}"
table: "{{ __table }}"
auth:
strategy: basic
user: vector
password: vector
batch:
timeout_secs: 1.0

2 changes: 1 addition & 1 deletion docker/monolith/vector-server/vector.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ sinks:
compression: gzip
endpoint: http://clickhouse:9300
database: db_pegboard_actor_log
table: actor_logs
table: actor_logs2
auth:
strategy: basic
user: vector
Expand Down
5 changes: 5 additions & 0 deletions packages/common/chirp-workflow/core/src/ctx/activity.rs
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,11 @@ impl ActivityCtx {
self.conn.clickhouse().await
}

#[tracing::instrument(skip_all)]
pub async fn clickhouse_inserter(&self) -> GlobalResult<ClickHouseInserterHandle> {
self.conn.clickhouse_inserter().await
}

#[tracing::instrument(skip_all)]
pub async fn fdb(&self) -> Result<FdbPool, rivet_pools::Error> {
self.conn.fdb().await
Expand Down
5 changes: 5 additions & 0 deletions packages/common/chirp-workflow/core/src/ctx/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,11 @@ impl ApiCtx {
self.conn.clickhouse().await
}

#[tracing::instrument(skip_all)]
pub async fn clickhouse_inserter(&self) -> GlobalResult<ClickHouseInserterHandle> {
self.conn.clickhouse_inserter().await
}

#[tracing::instrument(skip_all)]
pub async fn fdb(&self) -> Result<FdbPool, rivet_pools::Error> {
self.conn.fdb().await
Expand Down
5 changes: 5 additions & 0 deletions packages/common/chirp-workflow/core/src/ctx/operation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -266,6 +266,11 @@ impl OperationCtx {
self.conn.clickhouse().await
}

#[tracing::instrument(skip_all)]
pub async fn clickhouse_inserter(&self) -> GlobalResult<ClickHouseInserterHandle> {
self.conn.clickhouse_inserter().await
}

#[tracing::instrument(skip_all)]
pub async fn fdb(&self) -> Result<FdbPool, rivet_pools::Error> {
self.conn.fdb().await
Expand Down
5 changes: 5 additions & 0 deletions packages/common/chirp-workflow/core/src/ctx/standalone.rs
Original file line number Diff line number Diff line change
Expand Up @@ -253,6 +253,11 @@ impl StandaloneCtx {
self.conn.clickhouse().await
}

#[tracing::instrument(skip_all)]
pub async fn clickhouse_inserter(&self) -> GlobalResult<ClickHouseInserterHandle> {
self.conn.clickhouse_inserter().await
}

#[tracing::instrument(skip_all)]
pub async fn fdb(&self) -> Result<FdbPool, rivet_pools::Error> {
self.conn.fdb().await
Expand Down
5 changes: 5 additions & 0 deletions packages/common/chirp-workflow/core/src/ctx/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -296,6 +296,11 @@ impl TestCtx {
self.conn.clickhouse().await
}

#[tracing::instrument(skip_all)]
pub async fn clickhouse_inserter(&self) -> GlobalResult<ClickHouseInserterHandle> {
self.conn.clickhouse_inserter().await
}

#[tracing::instrument(skip_all, fields(%workflow_id))]
pub async fn sqlite_for_workflow(&self, workflow_id: Uuid) -> GlobalResult<SqlitePool> {
common::sqlite_for_workflow(&self.db, &self.conn, workflow_id, true)
Expand Down
4 changes: 4 additions & 0 deletions packages/common/chirp/worker/src/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,4 +84,8 @@ impl TestCtx {
pub async fn clickhouse(&self) -> GlobalResult<ClickHousePool> {
self.op_ctx.clickhouse().await
}

pub async fn clickhouse_inserter(&self) -> GlobalResult<ClickHouseInserterHandle> {
self.op_ctx.clickhouse_inserter().await
}
}
19 changes: 19 additions & 0 deletions packages/common/clickhouse-inserter/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
[package]
name = "clickhouse-inserter"
version.workspace = true
authors.workspace = true
license.workspace = true
edition.workspace = true

[dependencies]
global-error.workspace = true
tokio.workspace = true
reqwest = { version = "0.11", features = ["json"] }
tracing.workspace = true
serde = { version = "1.0", features = ["derive"] }
serde_json = { version = "1.0", features = ["raw_value"] }
thiserror = "1.0"
tokio-util = "0.7"
futures = "0.3"
async-channel = "2.1.1"
anyhow.workspace = true
16 changes: 16 additions & 0 deletions packages/common/clickhouse-inserter/src/error.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
use thiserror::Error;

#[derive(Debug, Error)]
pub enum Error {
#[error("failed to send event to ClickHouse inserter")]
ChannelSendError,

#[error("serialization error: {0}")]
SerializationError(#[source] serde_json::Error),

#[error("failed to build reqwest client: {0}")]
ReqwestBuildError(#[source] reqwest::Error),

#[error("failed to spawn background task")]
TaskSpawnError,
}
Loading
Loading