Skip to content

Commit f54d281

Browse files
committed
feat(pb): add runner logs
1 parent 0074232 commit f54d281

File tree

6 files changed

+35
-4
lines changed

6 files changed

+35
-4
lines changed

packages/edge/infra/client/container-runner/src/log_shipper.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ pub struct LogShipper {
3737
pub vector_socket_addr: String,
3838

3939
pub runner_id: String,
40+
pub actor_id: Option<String>,
4041
}
4142

4243
impl LogShipper {
@@ -91,7 +92,7 @@ impl LogShipper {
9192
while let Result::Ok(message) = self.msg_rx.recv() {
9293
let vector_message = VectorMessage::Runners {
9394
runner_id: self.runner_id.as_str(),
94-
task: "main", // Backwards compatibility with logs
95+
actor_id: self.actor_id.as_ref().map(|x| x.as_str()),
9596
stream_type: message.stream_type as u8,
9697
ts: message.ts,
9798
message: message.message.as_str(),
@@ -114,7 +115,7 @@ enum VectorMessage<'a> {
114115
#[serde(rename = "runners")]
115116
Runners {
116117
runner_id: &'a str,
117-
task: &'a str,
118+
actor_id: Option<&'a str>,
118119
stream_type: u8,
119120
ts: u64,
120121
message: &'a str,

packages/edge/infra/client/container-runner/src/main.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,8 @@ fn main() -> Result<()> {
3636
.transpose()
3737
.context("failed to parse vector socket addr")?;
3838
let runner_id = var("RUNNER_ID")?;
39+
// Only set if this is a single allocation runner (one actor running on it)
40+
let actor_id = var("ACTOR_ID").ok();
3941

4042
let (shutdown_tx, shutdown_rx) = mpsc::sync_channel(1);
4143

@@ -48,6 +50,7 @@ fn main() -> Result<()> {
4850
msg_rx,
4951
vector_socket_addr,
5052
runner_id,
53+
actor_id,
5154
};
5255
let log_shipper_thread = log_shipper.spawn();
5356
(Some(msg_tx), Some(log_shipper_thread))

packages/edge/infra/client/manager/src/actor/mod.rs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -116,9 +116,15 @@ impl Actor {
116116
.context("should have runner config")?
117117
{
118118
protocol::ActorRunner::New { .. } => {
119+
let actor_id = matches!(
120+
self.runner.config().image.allocation_type,
121+
protocol::ImageAllocationType::Single
122+
)
123+
.then_some(self.actor_id);
124+
119125
// Because the runner is not already started we can get the ports here instead of reading from
120126
// sqlite
121-
let ports = self.runner.start(ctx).await?;
127+
let ports = self.runner.start(ctx, actor_id).await?;
122128

123129
let pid = self.runner.pid().await?;
124130

packages/edge/infra/client/manager/src/runner/mod.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -250,9 +250,12 @@ impl Runner {
250250
Ok(())
251251
}
252252

253+
// `actor_id` is set if this runner has a single allocation type which means there is only one actor
254+
// runner on it
253255
pub async fn start(
254256
self: &Arc<Self>,
255257
ctx: &Arc<Ctx>,
258+
actor_id: Option<Uuid>,
256259
) -> Result<protocol::HashableMap<String, protocol::ProxiedPort>> {
257260
tracing::info!(runner_id=?self.runner_id, "starting");
258261

@@ -305,7 +308,7 @@ impl Runner {
305308
let self2 = self.clone();
306309
let ctx2 = ctx.clone();
307310
tokio::spawn(async move {
308-
match self2.run(&ctx2).await {
311+
match self2.run(&ctx2, actor_id).await {
309312
Ok(_) => {
310313
if let Err(err) = self2.observe(&ctx2, false).await {
311314
tracing::error!(runner_id=?self2.runner_id, ?err, "observe failed");
@@ -340,6 +343,7 @@ impl Runner {
340343
),
341344
("RUNNER_ID", self.runner_id.to_string()),
342345
];
346+
343347
if let Some(vector) = &ctx.config().vector {
344348
runner_env.push(("VECTOR_SOCKET_ADDR", vector.address.to_string()));
345349
}

packages/edge/services/pegboard/db/runner-log/migrations/20200101000000_init.down.sql

Whitespace-only changes.
Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
2+
CREATE TABLE IF NOT EXISTS runner_logs (
3+
runner_id UUID,
4+
actor_id UUID, -- When not set will be the NIL UUID (all zeros)
5+
stream_type UInt8, -- pegboard::types::LogsStreamType
6+
ts DateTime64 (9),
7+
message String
8+
) ENGINE = ReplicatedMergeTree ()
9+
PARTITION BY
10+
toStartOfHour (ts)
11+
ORDER BY (
12+
runner_id,
13+
toUnixTimestamp (ts),
14+
stream_type
15+
)
16+
TTL toDate (ts + toIntervalDay (3))
17+
SETTINGS index_granularity = 8192, ttl_only_drop_parts = 1;

0 commit comments

Comments
 (0)