File tree Expand file tree Collapse file tree 6 files changed +35
-4
lines changed
services/pegboard/db/runner-log/migrations Expand file tree Collapse file tree 6 files changed +35
-4
lines changed Original file line number Diff line number Diff line change @@ -37,6 +37,7 @@ pub struct LogShipper {
3737 pub vector_socket_addr : String ,
3838
3939 pub runner_id : String ,
40+ pub actor_id : Option < String > ,
4041}
4142
4243impl LogShipper {
@@ -91,7 +92,7 @@ impl LogShipper {
9192 while let Result :: Ok ( message) = self . msg_rx . recv ( ) {
9293 let vector_message = VectorMessage :: Runners {
9394 runner_id : self . runner_id . as_str ( ) ,
94- task : "main" , // Backwards compatibility with logs
95+ actor_id : self . actor_id . as_ref ( ) . map ( |x| x . as_str ( ) ) ,
9596 stream_type : message. stream_type as u8 ,
9697 ts : message. ts ,
9798 message : message. message . as_str ( ) ,
@@ -114,7 +115,7 @@ enum VectorMessage<'a> {
114115 #[ serde( rename = "runners" ) ]
115116 Runners {
116117 runner_id : & ' a str ,
117- task : & ' a str ,
118+ actor_id : Option < & ' a str > ,
118119 stream_type : u8 ,
119120 ts : u64 ,
120121 message : & ' a str ,
Original file line number Diff line number Diff line change @@ -36,6 +36,8 @@ fn main() -> Result<()> {
3636 . transpose ( )
3737 . context ( "failed to parse vector socket addr" ) ?;
3838 let runner_id = var ( "RUNNER_ID" ) ?;
39+ // Only set if this is a single allocation runner (one actor running on it)
40+ let actor_id = var ( "ACTOR_ID" ) . ok ( ) ;
3941
4042 let ( shutdown_tx, shutdown_rx) = mpsc:: sync_channel ( 1 ) ;
4143
@@ -48,6 +50,7 @@ fn main() -> Result<()> {
4850 msg_rx,
4951 vector_socket_addr,
5052 runner_id,
53+ actor_id,
5154 } ;
5255 let log_shipper_thread = log_shipper. spawn ( ) ;
5356 ( Some ( msg_tx) , Some ( log_shipper_thread) )
Original file line number Diff line number Diff line change @@ -116,9 +116,15 @@ impl Actor {
116116 . context ( "should have runner config" ) ?
117117 {
118118 protocol:: ActorRunner :: New { .. } => {
119+ let actor_id = matches ! (
120+ self . runner. config( ) . image. allocation_type,
121+ protocol:: ImageAllocationType :: Single
122+ )
123+ . then_some ( self . actor_id ) ;
124+
119125 // Because the runner is not already started we can get the ports here instead of reading from
120126 // sqlite
121- let ports = self . runner . start ( ctx) . await ?;
127+ let ports = self . runner . start ( ctx, actor_id ) . await ?;
122128
123129 let pid = self . runner . pid ( ) . await ?;
124130
Original file line number Diff line number Diff line change @@ -250,9 +250,12 @@ impl Runner {
250250 Ok ( ( ) )
251251 }
252252
253+ // `actor_id` is set if this runner has a single allocation type which means there is only one actor
254+ // runner on it
253255 pub async fn start (
254256 self : & Arc < Self > ,
255257 ctx : & Arc < Ctx > ,
258+ actor_id : Option < Uuid > ,
256259 ) -> Result < protocol:: HashableMap < String , protocol:: ProxiedPort > > {
257260 tracing:: info!( runner_id=?self . runner_id, "starting" ) ;
258261
@@ -305,7 +308,7 @@ impl Runner {
305308 let self2 = self . clone ( ) ;
306309 let ctx2 = ctx. clone ( ) ;
307310 tokio:: spawn ( async move {
308- match self2. run ( & ctx2) . await {
311+ match self2. run ( & ctx2, actor_id ) . await {
309312 Ok ( _) => {
310313 if let Err ( err) = self2. observe ( & ctx2, false ) . await {
311314 tracing:: error!( runner_id=?self2. runner_id, ?err, "observe failed" ) ;
@@ -340,6 +343,7 @@ impl Runner {
340343 ) ,
341344 ( "RUNNER_ID" , self . runner_id. to_string( ) ) ,
342345 ] ;
346+
343347 if let Some ( vector) = & ctx. config ( ) . vector {
344348 runner_env. push ( ( "VECTOR_SOCKET_ADDR" , vector. address . to_string ( ) ) ) ;
345349 }
Original file line number Diff line number Diff line change 1+
2+ CREATE TABLE IF NOT EXISTS runner_logs (
3+ runner_id UUID,
4+ actor_id UUID, -- When not set will be the NIL UUID (all zeros)
5+ stream_type UInt8, -- pegboard::types::LogsStreamType
6+ ts DateTime64 (9 ),
7+ message String
8+ ) ENGINE = ReplicatedMergeTree ()
9+ PARTITION BY
10+ toStartOfHour (ts)
11+ ORDER BY (
12+ runner_id,
13+ toUnixTimestamp (ts),
14+ stream_type
15+ )
16+ TTL toDate (ts + toIntervalDay (3 ))
17+ SETTINGS index_granularity = 8192 , ttl_only_drop_parts = 1 ;
You can’t perform that action at this time.
0 commit comments