-
Notifications
You must be signed in to change notification settings - Fork 114
chore(vector): add vector pipeline to ship clickhouse events from the edge #2526
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
Deploying rivet-studio with
|
Latest commit: |
526f37e
|
Status: | ✅ Deploy successful! |
Preview URL: | https://124bf5a3.rivet-studio.pages.dev |
Branch Preview URL: | https://06-02-chore-vector-add-vecto.rivet-studio.pages.dev |
b9cc460
to
8d1f2c3
Compare
Deploying rivet with
|
Latest commit: |
526f37e
|
Status: | ✅ Deploy successful! |
Preview URL: | https://4f91f438.rivet.pages.dev |
Branch Preview URL: | https://06-02-chore-vector-add-vecto.rivet.pages.dev |
Deploying rivet-hub with
|
Latest commit: |
526f37e
|
Status: | ✅ Deploy successful! |
Preview URL: | https://04f65c11.rivet-hub-7jb.pages.dev |
Branch Preview URL: | https://06-02-chore-vector-add-vecto.rivet-hub-7jb.pages.dev |
71023be
to
58a5eca
Compare
packages/common/pools/src/pools.rs
Outdated
@@ -40,12 +42,24 @@ impl Pools { | |||
let clickhouse = crate::db::clickhouse::setup(config.clone())?; | |||
let sqlite = SqlitePoolManager::new(fdb.clone()).await?; | |||
|
|||
// Create the ClickHouse inserter if ClickHouse is enabled |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
// Create the ClickHouse inserter if ClickHouse is enabled | |
// Create the ClickHouse inserter if vector is enabled |
) -> GlobalResult<Response<Full<Bytes>>> { | ||
let _request_start = Instant::now(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Unused?
let request_id = Uuid::new_v4(); | ||
let mut request_context = | ||
RequestContext::new_with_request_id(request_id, self.state.clickhouse_inserter.clone()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Uses new_with_request_id
with a new uuid, same as new
let request_id = Uuid::new_v4(); | |
let mut request_context = | |
RequestContext::new_with_request_id(request_id, self.state.clickhouse_inserter.clone()); | |
let mut request_context = | |
RequestContext::new(self.state.clickhouse_inserter.clone()); |
network_ports_ingress: HashMap<String, ActorClickHouseRowPortIngress>, | ||
network_ports_host: HashMap<String, ActorClickHouseRowPortHost>, | ||
network_ports_proxied: HashMap<String, ActorClickHouseRowPortProxied>, | ||
client_id: Uuid, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we add runner_id now or later?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
now
packages/edge/services/pegboard/src/workflows/actor/analytics.rs
Outdated
Show resolved
Hide resolved
if let Some(actor_input) = &input.actor_input { | ||
// Get metadata | ||
let meta = ctx | ||
.activity(GetMetaInput { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Shouldn't this also be .v(2)
?
@@ -261,6 +267,12 @@ pub async fn pegboard_actor(ctx: &mut WorkflowCtx, input: &Input) -> GlobalResul | |||
}) | |||
.await?; | |||
|
|||
ctx.v(2).activity(InsertClickHouseInput { | |||
input: input.clone(), | |||
meta, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This metadata includes build data which changes if the actor's build is changed. the metadata is re-fetched on reschedule in reschedule_actor
as actor_setup.meta
43b5230
to
64194de
Compare
64194de
to
e120ace
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
PR Summary
This PR adds a Vector pipeline to ship ClickHouse events from edge services, with significant changes across the codebase. Here are the key points:
- Introduces new
clickhouse-inserter
package for batched event insertion with configurable batch sizes and intervals - Adds
http_requests
table in ClickHouse for guard analytics with 30-day TTL and comprehensive request metadata - Creates new
actor_logs2
table with 14-day TTL and materialized viewactor_logs2_with_metadata
for enhanced actor logging - Implements request context tracking in guard service with detailed metrics collection and WebSocket support
- Adds non-interactive route management capabilities to CLI with auto-create and auto-sync options
58 file(s) reviewed, 14 comment(s)
Edit PR Review Bot Settings | Greptile
dynamic_events_http: | ||
type: http_server | ||
address: 0.0.0.0:5022 | ||
encoding: ndjson |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
logic: No transform is defined for dynamic_events_http before sending to sink. Consider adding a transform to ensure consistent metadata tagging like other sources.
if events.len() >= BATCH_SIZE { | ||
if let Err(e) = self.send_events(&events).await { | ||
tracing::error!(?e, "failed to send events to Vector"); | ||
} | ||
events.clear(); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
logic: Events are cleared even if send_events fails. This could result in data loss. Consider only clearing after successful send or implementing a retry mechanism.
if events.len() >= BATCH_SIZE { | |
if let Err(e) = self.send_events(&events).await { | |
tracing::error!(?e, "failed to send events to Vector"); | |
} | |
events.clear(); | |
} | |
if events.len() >= BATCH_SIZE { | |
if let Err(e) = self.send_events(&events).await { | |
tracing::error!(?e, "failed to send events to Vector"); | |
} else { | |
events.clear(); | |
} | |
} |
use global_error::GlobalResult; | ||
use lazy_static::lazy_static; | ||
use std::{net::IpAddr, time::SystemTime}; | ||
use tracing::warn; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
style: tracing::warn is imported but never used in the code
use tracing::warn; | |
use global_error::GlobalResult; | |
use lazy_static::lazy_static; | |
use std::{net::IpAddr, time::SystemTime}; | |
use uuid::Uuid; |
let request_send_start = Instant::now(); | ||
match timeout(timeout_duration, self.client.request(proxied_req)).await { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
logic: request_send_start is captured but response_receive_time is calculated and never used. Consider either removing the unused timing or using it for metrics/logging.
ts DateTime64 (9), | ||
stream_type UInt8, -- pegboard::types::LogsStreamType | ||
message String | ||
) ENGINE = ReplicatedMergeTree () |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
logic: ReplicatedMergeTree() missing required parameters for zookeeper path and replica name
ALTER TABLE state ADD project_id BLOB DEFAULT X'00000000000000000000000000000000'; -- UUID | ||
ALTER TABLE state ADD root_user_enabled INT DEFAULT false; | ||
ALTER TABLE state ADD build_kind INT DEFAULT -1; | ||
ALTER TABLE state ADD build_compression INT DEFAULT -1; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
logic: The default values for build_kind and build_compression are set to -1, which may be invalid enum values. Consider using 0 or NULL if these are meant to represent unset states.
let state_row = sql_fetch_one!( | ||
[ctx, StateRow, &pool] | ||
" | ||
SELECT | ||
project_id, | ||
env_id, | ||
json(tags) AS tags, | ||
resources_cpu_millicores, | ||
resources_memory_mib, | ||
selected_resources_cpu_millicores, | ||
selected_resources_memory_mib, | ||
client_id, | ||
client_workflow_id, | ||
client_wan_hostname, | ||
lifecycle_kill_timeout_ms, | ||
lifecycle_durable, | ||
create_ts, | ||
start_ts, | ||
connectable_ts, | ||
finish_ts, | ||
destroy_ts, | ||
image_id, | ||
build_kind, | ||
build_compression, | ||
root_user_enabled, | ||
json(args) AS args, | ||
network_mode, | ||
json(environment) AS environment | ||
FROM state | ||
", | ||
) | ||
.await?; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
logic: SQL query lacks a WHERE clause to filter by actor_id, which could return incorrect data if multiple actors exist.
let state_row = sql_fetch_one!( | |
[ctx, StateRow, &pool] | |
" | |
SELECT | |
project_id, | |
env_id, | |
json(tags) AS tags, | |
resources_cpu_millicores, | |
resources_memory_mib, | |
selected_resources_cpu_millicores, | |
selected_resources_memory_mib, | |
client_id, | |
client_workflow_id, | |
client_wan_hostname, | |
lifecycle_kill_timeout_ms, | |
lifecycle_durable, | |
create_ts, | |
start_ts, | |
connectable_ts, | |
finish_ts, | |
destroy_ts, | |
image_id, | |
build_kind, | |
build_compression, | |
root_user_enabled, | |
json(args) AS args, | |
network_mode, | |
json(environment) AS environment | |
FROM state | |
", | |
) | |
.await?; | |
let state_row = sql_fetch_one!( | |
[ctx, StateRow, &pool] | |
" | |
SELECT | |
project_id, | |
env_id, | |
json(tags) AS tags, | |
resources_cpu_millicores, | |
resources_memory_mib, | |
selected_resources_cpu_millicores, | |
selected_resources_memory_mib, | |
client_id, | |
client_workflow_id, | |
client_wan_hostname, | |
lifecycle_kill_timeout_ms, | |
lifecycle_durable, | |
create_ts, | |
start_ts, | |
connectable_ts, | |
finish_ts, | |
destroy_ts, | |
image_id, | |
build_kind, | |
build_compression, | |
root_user_enabled, | |
json(args) AS args, | |
network_mode, | |
json(environment) AS environment | |
FROM state | |
WHERE actor_id = ? | |
", | |
input.actor_id | |
) | |
.await?; |
sql_execute!( | ||
[ctx, pool] | ||
" | ||
UPDATE state | ||
SET | ||
project_id = ?, | ||
build_kind = ?, | ||
build_compression = ?, | ||
root_user_enabled = ? | ||
", | ||
input.meta.project_id, | ||
input.meta.build_kind as i64, | ||
input.meta.build_compression as i64, | ||
input.root_user_enabled, | ||
) | ||
.await?; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
logic: SQL update lacks a WHERE clause - could update all state records unintentionally
let result = apis::routes_api::routes_update( | ||
&ctx.openapi_config_cloud, | ||
&self.name, | ||
update_route_body.clone(), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
style: Unnecessary clone() of update_route_body since it's not used after the API call
update_route_body.clone(), | |
update_route_body, |
let matching_route = routes_response | ||
.routes | ||
.iter() | ||
.find(|route| route.id == *route_id) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
logic: Route lookup uses ID equality but name field may not be an ID. Consider documenting the expected format or validating input.
b43c653
to
a32e202
Compare
9bc9e25
to
21b6b12
Compare
21b6b12
to
ec7ede8
Compare
a32e202
to
526f37e
Compare
Merge activity
|
… edge (#2526) <!-- Please make sure there is an issue that this PR is correlated to. --> ## Changes <!-- If there are frontend changes, please include screenshots. -->
Changes