File tree Expand file tree Collapse file tree 6 files changed +39
-2
lines changed
services/pegboard/db/runner-log/migrations Expand file tree Collapse file tree 6 files changed +39
-2
lines changed Original file line number Diff line number Diff line change @@ -38,6 +38,7 @@ pub struct LogShipper {
3838 pub vector_socket_addr : String ,
3939
4040 pub runner_id : String ,
41+ pub actor_id : Option < String > ,
4142
4243 pub env_id : Uuid ,
4344}
@@ -94,6 +95,7 @@ impl LogShipper {
9495 while let Result :: Ok ( message) = self . msg_rx . recv ( ) {
9596 let vector_message = VectorMessage :: Runners {
9697 runner_id : self . runner_id . as_str ( ) ,
98+ actor_id : self . actor_id . as_ref ( ) . map ( |x| x. as_str ( ) ) ,
9799 env_id : self . env_id ,
98100 stream_type : message. stream_type as u8 ,
99101 ts : message. ts ,
@@ -117,6 +119,7 @@ enum VectorMessage<'a> {
117119 #[ serde( rename = "runners" ) ]
118120 Runners {
119121 runner_id : & ' a str ,
122+ actor_id : Option < & ' a str > ,
120123 env_id : Uuid ,
121124 stream_type : u8 ,
122125 ts : u64 ,
Original file line number Diff line number Diff line change @@ -37,6 +37,8 @@ fn main() -> Result<()> {
3737 . transpose ( )
3838 . context ( "failed to parse vector socket addr" ) ?;
3939 let runner_id = var ( "RUNNER_ID" ) ?;
40+ // Only set if this is a single allocation runner (one actor running on it)
41+ let actor_id = var ( "ACTOR_ID" ) . ok ( ) ;
4042 let env_id = Uuid :: parse_str ( & var ( "ENVIRONMENT_ID" ) ?) ?;
4143 println ! ( "Starting runner_id={runner_id} env_id={env_id} vector_socket_addr={} root_user_enabled={root_user_enabled}" , vector_socket_addr. as_ref( ) . map( |x| x. as_str( ) ) . unwrap_or( "?" ) ) ;
4244
@@ -51,6 +53,7 @@ fn main() -> Result<()> {
5153 msg_rx,
5254 vector_socket_addr,
5355 runner_id,
56+ actor_id,
5457 env_id,
5558 } ;
5659 let log_shipper_thread = log_shipper. spawn ( ) ;
Original file line number Diff line number Diff line change @@ -116,9 +116,15 @@ impl Actor {
116116 . context ( "should have runner config" ) ?
117117 {
118118 protocol:: ActorRunner :: New { .. } => {
119+ let actor_id = matches ! (
120+ self . runner. config( ) . image. allocation_type,
121+ protocol:: ImageAllocationType :: Single
122+ )
123+ . then_some ( self . actor_id ) ;
124+
119125 // Because the runner is not already started we can get the ports here instead of reading from
120126 // sqlite
121- let ports = self . runner . start ( ctx) . await ?;
127+ let ports = self . runner . start ( ctx, actor_id ) . await ?;
122128
123129 let pid = self . runner . pid ( ) . await ?;
124130
Original file line number Diff line number Diff line change @@ -250,9 +250,12 @@ impl Runner {
250250 Ok ( ( ) )
251251 }
252252
253+ // `actor_id` is set if this runner has a single allocation type which means there is only one actor
254+ // runner on it
253255 pub async fn start (
254256 self : & Arc < Self > ,
255257 ctx : & Arc < Ctx > ,
258+ actor_id : Option < Uuid > ,
256259 ) -> Result < protocol:: HashableMap < String , protocol:: ProxiedPort > > {
257260 tracing:: info!( runner_id=?self . runner_id, "starting" ) ;
258261
@@ -305,7 +308,7 @@ impl Runner {
305308 let self2 = self . clone ( ) ;
306309 let ctx2 = ctx. clone ( ) ;
307310 tokio:: spawn ( async move {
308- match self2. run ( & ctx2) . await {
311+ match self2. run ( & ctx2, actor_id ) . await {
309312 Ok ( _) => {
310313 if let Err ( err) = self2. observe ( & ctx2, false ) . await {
311314 tracing:: error!( runner_id=?self2. runner_id, ?err, "observe failed" ) ;
@@ -339,7 +342,12 @@ impl Runner {
339342 . to_string( ) ,
340343 ) ,
341344 ( "RUNNER_ID" , self . runner_id. to_string( ) ) ,
345+ (
346+ "ENVIRONMENT_ID" ,
347+ self . metadata. environment. env_id. to_string( ) ,
348+ ) ,
342349 ] ;
350+
343351 if let Some ( vector) = & ctx. config ( ) . vector {
344352 runner_env. push ( ( "VECTOR_SOCKET_ADDR" , vector. address . to_string ( ) ) ) ;
345353 }
Original file line number Diff line number Diff line change 1+
2+ CREATE TABLE IF NOT EXISTS runner_logs (
3+ runner_id UUID,
4+ actor_id UUID, -- When not set will be the NIL UUID (all zeros)
5+ stream_type UInt8, -- pegboard::types::LogsStreamType
6+ ts DateTime64 (9 ),
7+ message String
8+ ) ENGINE = ReplicatedMergeTree ()
9+ PARTITION BY
10+ toStartOfHour (ts)
11+ ORDER BY (
12+ runner_id,
13+ toUnixTimestamp (ts),
14+ stream_type
15+ )
16+ TTL toDate (ts + toIntervalDay (3 ))
17+ SETTINGS index_granularity = 8192 , ttl_only_drop_parts = 1 ;
You can’t perform that action at this time.
0 commit comments