File tree Expand file tree Collapse file tree 6 files changed +39
-2
lines changed
services/pegboard/db/runner-log/migrations Expand file tree Collapse file tree 6 files changed +39
-2
lines changed Original file line number Diff line number Diff line change @@ -38,6 +38,7 @@ pub struct LogShipper {
38
38
pub vector_socket_addr : String ,
39
39
40
40
pub runner_id : String ,
41
+ pub actor_id : Option < String > ,
41
42
42
43
pub env_id : Uuid ,
43
44
}
@@ -94,6 +95,7 @@ impl LogShipper {
94
95
while let Result :: Ok ( message) = self . msg_rx . recv ( ) {
95
96
let vector_message = VectorMessage :: Runners {
96
97
runner_id : self . runner_id . as_str ( ) ,
98
+ actor_id : self . actor_id . as_ref ( ) . map ( |x| x. as_str ( ) ) ,
97
99
env_id : self . env_id ,
98
100
stream_type : message. stream_type as u8 ,
99
101
ts : message. ts ,
@@ -117,6 +119,7 @@ enum VectorMessage<'a> {
117
119
#[ serde( rename = "runners" ) ]
118
120
Runners {
119
121
runner_id : & ' a str ,
122
+ actor_id : Option < & ' a str > ,
120
123
env_id : Uuid ,
121
124
stream_type : u8 ,
122
125
ts : u64 ,
Original file line number Diff line number Diff line change @@ -37,6 +37,8 @@ fn main() -> Result<()> {
37
37
. transpose ( )
38
38
. context ( "failed to parse vector socket addr" ) ?;
39
39
let runner_id = var ( "RUNNER_ID" ) ?;
40
+ // Only set if this is a single allocation runner (one actor running on it)
41
+ let actor_id = var ( "ACTOR_ID" ) . ok ( ) ;
40
42
let env_id = Uuid :: parse_str ( & var ( "ENVIRONMENT_ID" ) ?) ?;
41
43
println ! ( "Starting runner_id={runner_id} env_id={env_id} vector_socket_addr={} root_user_enabled={root_user_enabled}" , vector_socket_addr. as_ref( ) . map( |x| x. as_str( ) ) . unwrap_or( "?" ) ) ;
42
44
@@ -51,6 +53,7 @@ fn main() -> Result<()> {
51
53
msg_rx,
52
54
vector_socket_addr,
53
55
runner_id,
56
+ actor_id,
54
57
env_id,
55
58
} ;
56
59
let log_shipper_thread = log_shipper. spawn ( ) ;
Original file line number Diff line number Diff line change @@ -116,9 +116,15 @@ impl Actor {
116
116
. context ( "should have runner config" ) ?
117
117
{
118
118
protocol:: ActorRunner :: New { .. } => {
119
+ let actor_id = matches ! (
120
+ self . runner. config( ) . image. allocation_type,
121
+ protocol:: ImageAllocationType :: Single
122
+ )
123
+ . then_some ( self . actor_id ) ;
124
+
119
125
// Because the runner is not already started we can get the ports here instead of reading from
120
126
// sqlite
121
- let ports = self . runner . start ( ctx) . await ?;
127
+ let ports = self . runner . start ( ctx, actor_id ) . await ?;
122
128
123
129
let pid = self . runner . pid ( ) . await ?;
124
130
Original file line number Diff line number Diff line change @@ -250,9 +250,12 @@ impl Runner {
250
250
Ok ( ( ) )
251
251
}
252
252
253
+ // `actor_id` is set if this runner has a single allocation type which means there is only one actor
254
+ // runner on it
253
255
pub async fn start (
254
256
self : & Arc < Self > ,
255
257
ctx : & Arc < Ctx > ,
258
+ actor_id : Option < Uuid > ,
256
259
) -> Result < protocol:: HashableMap < String , protocol:: ProxiedPort > > {
257
260
tracing:: info!( runner_id=?self . runner_id, "starting" ) ;
258
261
@@ -305,7 +308,7 @@ impl Runner {
305
308
let self2 = self . clone ( ) ;
306
309
let ctx2 = ctx. clone ( ) ;
307
310
tokio:: spawn ( async move {
308
- match self2. run ( & ctx2) . await {
311
+ match self2. run ( & ctx2, actor_id ) . await {
309
312
Ok ( _) => {
310
313
if let Err ( err) = self2. observe ( & ctx2, false ) . await {
311
314
tracing:: error!( runner_id=?self2. runner_id, ?err, "observe failed" ) ;
@@ -339,7 +342,12 @@ impl Runner {
339
342
. to_string( ) ,
340
343
) ,
341
344
( "RUNNER_ID" , self . runner_id. to_string( ) ) ,
345
+ (
346
+ "ENVIRONMENT_ID" ,
347
+ self . metadata. environment. env_id. to_string( ) ,
348
+ ) ,
342
349
] ;
350
+
343
351
if let Some ( vector) = & ctx. config ( ) . vector {
344
352
runner_env. push ( ( "VECTOR_SOCKET_ADDR" , vector. address . to_string ( ) ) ) ;
345
353
}
Original file line number Diff line number Diff line change
1
+
2
+ CREATE TABLE IF NOT EXISTS runner_logs (
3
+ runner_id UUID,
4
+ actor_id UUID, -- When not set will be the NIL UUID (all zeros)
5
+ stream_type UInt8, -- pegboard::types::LogsStreamType
6
+ ts DateTime64 (9 ),
7
+ message String
8
+ ) ENGINE = ReplicatedMergeTree ()
9
+ PARTITION BY
10
+ toStartOfHour (ts)
11
+ ORDER BY (
12
+ runner_id,
13
+ toUnixTimestamp (ts),
14
+ stream_type
15
+ )
16
+ TTL toDate (ts + toIntervalDay (3 ))
17
+ SETTINGS index_granularity = 8192 , ttl_only_drop_parts = 1 ;
You can’t perform that action at this time.
0 commit comments