1- use crate :: {
2- rivet_api:: { apis, models} ,
3- ToolchainCtx ,
4- } ;
1+ use crate :: { rivet_api:: apis, ToolchainCtx } ;
52use anyhow:: * ;
63use base64:: { engine:: general_purpose:: STANDARD , Engine } ;
74use chrono:: { DateTime , Utc } ;
85use clap:: ValueEnum ;
6+ use serde_json:: json;
97use std:: time:: Duration ;
108use tokio:: signal;
119use tokio:: sync:: watch;
@@ -63,18 +61,17 @@ async fn tail_streams(
6361 stdout_fetched_tx : watch:: Sender < bool > ,
6462 stderr_fetched_tx : watch:: Sender < bool > ,
6563) -> Result < ( ) > {
66- // TODO: Update ot use ActorsQueryLogStream::All
6764 tokio:: try_join!(
6865 tail_stream(
6966 ctx,
7067 & opts,
71- models :: ActorsQueryLogStream :: StdOut ,
68+ 0 , // stdout
7269 stdout_fetched_tx
7370 ) ,
7471 tail_stream(
7572 ctx,
7673 & opts,
77- models :: ActorsQueryLogStream :: StdErr ,
74+ 1 , // stderr
7875 stderr_fetched_tx
7976 ) ,
8077 )
@@ -85,7 +82,7 @@ async fn tail_streams(
8582async fn tail_stream (
8683 ctx : & ToolchainCtx ,
8784 opts : & TailOpts < ' _ > ,
88- stream : models :: ActorsQueryLogStream ,
85+ stream : i32 , // 0 = stdout, 1 = stderr
8986 log_fetched_tx : watch:: Sender < bool > ,
9087) -> Result < ( ) > {
9188 let mut watch_index: Option < String > = None ;
@@ -95,8 +92,8 @@ async fn tail_stream(
9592 // future doesn't exit.
9693 match ( & opts. stream , stream) {
9794 ( LogStream :: All , _) => { }
98- ( LogStream :: StdOut , models :: ActorsQueryLogStream :: StdOut ) => { }
99- ( LogStream :: StdErr , models :: ActorsQueryLogStream :: StdErr ) => { }
95+ ( LogStream :: StdOut , 0 ) => { }
96+ ( LogStream :: StdErr , 1 ) => { }
10097 _ => {
10198 // Notify poll_actor_state
10299 log_fetched_tx. send ( true ) . ok ( ) ;
@@ -107,15 +104,19 @@ async fn tail_stream(
107104 }
108105
109106 loop {
107+ // Build query expression for a single actor
108+ let query_expr = json ! ( {
109+ "property" : "actor_id" ,
110+ "value" : opts. actor_id. to_string( ) ,
111+ "case_sensitive" : true
112+ } ) ;
113+ let query_json = serde_json:: to_string ( & query_expr) ?;
114+
110115 let res = apis:: actors_logs_api:: actors_logs_get (
111116 & ctx. openapi_config_cloud ,
112- stream,
113- & serde_json:: to_string ( & vec ! [ opts. actor_id] ) ?,
117+ & query_json,
114118 Some ( & ctx. project . name_id ) ,
115119 Some ( opts. environment ) ,
116- None ,
117- None ,
118- None ,
119120 watch_index. as_deref ( ) ,
120121 )
121122 . await
@@ -127,7 +128,15 @@ async fn tail_stream(
127128 first_batch_fetched = true ;
128129 }
129130
130- for ( ts, line) in res. timestamps . iter ( ) . zip ( res. lines . iter ( ) ) {
131+ // Filter logs by stream type
132+ for ( i, ( ts, line) ) in res. timestamps . iter ( ) . zip ( res. lines . iter ( ) ) . enumerate ( ) {
133+ // Check if this log entry is from the stream we're interested in
134+ if let Some ( & log_stream) = res. streams . get ( i) {
135+ if log_stream != stream {
136+ continue ;
137+ }
138+ }
139+
131140 let Result :: Ok ( ts) = ts. parse :: < DateTime < Utc > > ( ) else {
132141 eprintln ! ( "Failed to parse timestamp: {ts} for line {line}" ) ;
133142 continue ;
0 commit comments