Skip to content

Commit dfc3254

Browse files
committed
fix(pegboard): include namespace in actor log query
1 parent 3a57fb1 commit dfc3254

File tree

21 files changed

+562
-435
lines changed

21 files changed

+562
-435
lines changed

Cargo.lock

Lines changed: 276 additions & 276 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

docker/dev-full/vector-server/vector.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -111,7 +111,7 @@ sinks:
111111
compression: gzip
112112
database: db_pegboard_actor_log
113113
endpoint: http://clickhouse:8123
114-
table: actor_logs2
114+
table: actor_logs3
115115
auth:
116116
strategy: basic
117117
user: vector

docker/monolith/vector-server/vector.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,7 @@ sinks:
8282
compression: gzip
8383
endpoint: http://clickhouse:9300
8484
database: db_pegboard_actor_log
85-
table: actor_logs2
85+
table: actor_logs3
8686
auth:
8787
strategy: basic
8888
user: vector

packages/core/api/actor/src/route/logs.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,7 @@ pub async fn get_logs(
9696
// frequently and should not return a significant amount of logs.
9797
let logs_res = ctx
9898
.op(pegboard::ops::actor::log::read::Input {
99+
env_id,
99100
actor_ids: actor_ids_clone.clone(),
100101
stream_types: stream_types_clone.clone(),
101102
count: 64,
@@ -136,6 +137,7 @@ pub async fn get_logs(
136137
// Read most recent logs
137138

138139
ctx.op(pegboard::ops::actor::log::read::Input {
140+
env_id,
139141
actor_ids: actor_ids.clone(),
140142
stream_types: stream_types.clone(),
141143
count: 256,

packages/edge/infra/client/container-runner/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,8 @@ rivet-logs.workspace = true
1616
serde = { version = "1.0.195", features = ["derive"] }
1717
serde_json = "1.0.111"
1818
signal-hook = "0.3.17"
19+
uuid = { version = "1.6.1", features = ["v4"] }
1920

2021
[dev-dependencies]
2122
portpicker = "0.1.1"
2223
tempfile = "3.9.0"
23-
uuid = { version = "1.6.1", features = ["v4"] }

packages/edge/infra/client/container-runner/src/log_shipper.rs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ use std::{io::Write, net::TcpStream, sync::mpsc, thread::JoinHandle};
33
use anyhow::*;
44
use serde::Serialize;
55
use serde_json;
6+
use uuid::Uuid;
67

78
#[derive(Copy, Clone, Debug)]
89
#[repr(u8)]
@@ -37,6 +38,8 @@ pub struct LogShipper {
3738
pub vector_socket_addr: String,
3839

3940
pub actor_id: String,
41+
42+
pub env_id: Uuid,
4043
}
4144

4245
impl LogShipper {
@@ -91,7 +94,7 @@ impl LogShipper {
9194
while let Result::Ok(message) = self.msg_rx.recv() {
9295
let vector_message = VectorMessage::Actors {
9396
actor_id: self.actor_id.as_str(),
94-
task: "main", // Backwards compatibility with logs
97+
env_id: self.env_id,
9598
stream_type: message.stream_type as u8,
9699
ts: message.ts,
97100
message: message.message.as_str(),
@@ -114,7 +117,7 @@ enum VectorMessage<'a> {
114117
#[serde(rename = "actors")]
115118
Actors {
116119
actor_id: &'a str,
117-
task: &'a str,
120+
env_id: Uuid,
118121
stream_type: u8,
119122
ts: u64,
120123
message: &'a str,

packages/edge/infra/client/container-runner/src/main.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ use std::{fs, path::Path, sync::mpsc, time::Duration};
22

33
use anyhow::*;
44
use utils::var;
5+
use uuid::Uuid;
56

67
mod container;
78
mod log_shipper;
@@ -36,6 +37,7 @@ fn main() -> Result<()> {
3637
.transpose()
3738
.context("failed to parse vector socket addr")?;
3839
let actor_id = var("ACTOR_ID")?;
40+
let env_id = Uuid::parse_str(&var("ENVIRONMENT_ID")?)?;
3941

4042
let (shutdown_tx, shutdown_rx) = mpsc::sync_channel(1);
4143

@@ -48,6 +50,7 @@ fn main() -> Result<()> {
4850
msg_rx,
4951
vector_socket_addr,
5052
actor_id,
53+
env_id,
5154
};
5255
let log_shipper_thread = log_shipper.spawn();
5356
(Some(msg_tx), Some(log_shipper_thread))

packages/edge/infra/client/manager/src/actor/mod.rs

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,17 +29,24 @@ pub struct Actor {
2929
actor_id: Uuid,
3030
generation: u32,
3131
config: protocol::ActorConfig,
32+
metadata: protocol::ActorMetadata,
3233

3334
runner: Mutex<Option<runner::Handle>>,
3435
exited: Mutex<bool>,
3536
}
3637

3738
impl Actor {
38-
pub fn new(actor_id: Uuid, generation: u32, config: protocol::ActorConfig) -> Arc<Self> {
39+
pub fn new(
40+
actor_id: Uuid,
41+
generation: u32,
42+
config: protocol::ActorConfig,
43+
metadata: protocol::ActorMetadata,
44+
) -> Arc<Self> {
3945
Arc::new(Actor {
4046
actor_id,
4147
generation,
4248
config,
49+
metadata,
4350

4451
runner: Mutex::new(None),
4552
exited: Mutex::new(false),
@@ -50,12 +57,14 @@ impl Actor {
5057
actor_id: Uuid,
5158
generation: u32,
5259
config: protocol::ActorConfig,
60+
metadata: protocol::ActorMetadata,
5361
runner: runner::Handle,
5462
) -> Arc<Self> {
5563
Arc::new(Actor {
5664
actor_id,
5765
generation,
5866
config,
67+
metadata,
5968

6069
runner: Mutex::new(Some(runner)),
6170
exited: Mutex::new(false),
@@ -209,6 +218,10 @@ impl Actor {
209218
.to_string(),
210219
),
211220
("ACTOR_ID", self.actor_id.to_string()),
221+
(
222+
"ENVIRONMENT_ID",
223+
self.metadata.environment.env_id.to_string(),
224+
),
212225
];
213226
if let Some(vector) = &ctx.config().vector {
214227
runner_env.push(("VECTOR_SOCKET_ADDR", vector.address.to_string()));

packages/edge/infra/client/manager/src/ctx.rs

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -421,6 +421,8 @@ impl Ctx {
421421
generation,
422422
config,
423423
} => {
424+
let metadata = config.metadata.deserialize()?;
425+
424426
let mut actors = self.actors.write().await;
425427

426428
if actors.contains_key(&(actor_id, generation)) {
@@ -430,7 +432,7 @@ impl Ctx {
430432
"actor with this actor id + generation already exists, ignoring start command",
431433
);
432434
} else {
433-
let actor = Actor::new(actor_id, generation, *config);
435+
let actor = Actor::new(actor_id, generation, *config, metadata);
434436

435437
// Insert actor
436438
actors.insert((actor_id, generation), actor);
@@ -718,6 +720,7 @@ impl Ctx {
718720

719721
let config = serde_json::from_slice::<protocol::ActorConfig>(&row.config)?;
720722
let generation = row.generation.try_into()?;
723+
let metadata = config.metadata.deserialize()?;
721724

722725
match &isolate_runner {
723726
Some(isolate_runner) if pid == isolate_runner.pid().as_raw() => {}
@@ -736,7 +739,7 @@ impl Ctx {
736739
}
737740

738741
// Clean up actor. We run `cleanup_setup` instead of `cleanup` because `cleanup` publishes events.
739-
let actor = Actor::new(row.actor_id, generation, config);
742+
let actor = Actor::new(row.actor_id, generation, config, metadata);
740743
actor.cleanup_setup(self).await;
741744
}
742745

@@ -878,6 +881,7 @@ impl Ctx {
878881

879882
let config = serde_json::from_slice::<protocol::ActorConfig>(&row.config)?;
880883
let generation = row.generation.try_into()?;
884+
let metadata = config.metadata.deserialize()?;
881885

882886
let runner = match &isolate_runner {
883887
// We have to clone the existing isolate runner handle instead of creating a new one so it
@@ -901,7 +905,7 @@ impl Ctx {
901905
},
902906
};
903907

904-
let actor = Actor::with_runner(row.actor_id, generation, config, runner);
908+
let actor = Actor::with_runner(row.actor_id, generation, config, metadata, runner);
905909
let actor = actors_guard
906910
.entry((row.actor_id, generation))
907911
.or_insert(actor);

packages/edge/services/pegboard/db/actor-log/migrations/20250703191728_drop_actor_logs_metadata.down.sql

Whitespace-only changes.

0 commit comments

Comments
 (0)