Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ pub struct LogShipper {
pub vector_socket_addr: String,

pub runner_id: String,
pub actor_id: Option<String>,

pub env_id: Uuid,
}
Expand Down Expand Up @@ -94,6 +95,7 @@ impl LogShipper {
while let Result::Ok(message) = self.msg_rx.recv() {
let vector_message = VectorMessage::Runners {
runner_id: self.runner_id.as_str(),
actor_id: self.actor_id.as_ref().map(|x| x.as_str()),
env_id: self.env_id,
stream_type: message.stream_type as u8,
ts: message.ts,
Expand All @@ -117,6 +119,7 @@ enum VectorMessage<'a> {
#[serde(rename = "runners")]
Runners {
runner_id: &'a str,
actor_id: Option<&'a str>,
env_id: Uuid,
stream_type: u8,
ts: u64,
Expand Down
3 changes: 3 additions & 0 deletions packages/edge/infra/client/container-runner/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ fn main() -> Result<()> {
.transpose()
.context("failed to parse vector socket addr")?;
let runner_id = var("RUNNER_ID")?;
// Only set if this is a single allocation runner (one actor running on it)
let actor_id = var("ACTOR_ID").ok();
let env_id = Uuid::parse_str(&var("ENVIRONMENT_ID")?)?;
println!("Starting runner_id={runner_id} env_id={env_id} vector_socket_addr={} root_user_enabled={root_user_enabled}", vector_socket_addr.as_ref().map(|x| x.as_str()).unwrap_or("?"));

Expand All @@ -51,6 +53,7 @@ fn main() -> Result<()> {
msg_rx,
vector_socket_addr,
runner_id,
actor_id,
env_id,
};
let log_shipper_thread = log_shipper.spawn();
Expand Down
8 changes: 7 additions & 1 deletion packages/edge/infra/client/manager/src/actor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,9 +116,15 @@ impl Actor {
.context("should have runner config")?
{
protocol::ActorRunner::New { .. } => {
let actor_id = matches!(
self.runner.config().image.allocation_type,
protocol::ImageAllocationType::Single
)
.then_some(self.actor_id);

// Because the runner is not already started we can get the ports here instead of reading from
// sqlite
let ports = self.runner.start(ctx).await?;
let ports = self.runner.start(ctx, actor_id).await?;

let pid = self.runner.pid().await?;

Expand Down
10 changes: 9 additions & 1 deletion packages/edge/infra/client/manager/src/runner/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -250,9 +250,12 @@ impl Runner {
Ok(())
}

// `actor_id` is set if this runner has a single allocation type which means there is only one actor
// runner on it
pub async fn start(
self: &Arc<Self>,
ctx: &Arc<Ctx>,
actor_id: Option<Uuid>,
) -> Result<protocol::HashableMap<String, protocol::ProxiedPort>> {
tracing::info!(runner_id=?self.runner_id, "starting");

Expand Down Expand Up @@ -305,7 +308,7 @@ impl Runner {
let self2 = self.clone();
let ctx2 = ctx.clone();
tokio::spawn(async move {
match self2.run(&ctx2).await {
match self2.run(&ctx2, actor_id).await {
Ok(_) => {
if let Err(err) = self2.observe(&ctx2, false).await {
tracing::error!(runner_id=?self2.runner_id, ?err, "observe failed");
Expand Down Expand Up @@ -339,7 +342,12 @@ impl Runner {
.to_string(),
),
("RUNNER_ID", self.runner_id.to_string()),
(
"ENVIRONMENT_ID",
self.metadata.environment.env_id.to_string(),
),
];

if let Some(vector) = &ctx.config().vector {
runner_env.push(("VECTOR_SOCKET_ADDR", vector.address.to_string()));
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@

CREATE TABLE IF NOT EXISTS runner_logs (
runner_id UUID,
actor_id UUID, -- When not set will be the NIL UUID (all zeros)
stream_type UInt8, -- pegboard::types::LogsStreamType
ts DateTime64 (9),
message String
) ENGINE = ReplicatedMergeTree ()
PARTITION BY
toStartOfHour (ts)
ORDER BY (
runner_id,
toUnixTimestamp (ts),
stream_type
)
TTL toDate (ts + toIntervalDay (3))
SETTINGS index_granularity = 8192, ttl_only_drop_parts = 1;
Loading