1
- use crate :: {
2
- rivet_api:: { apis, models} ,
3
- ToolchainCtx ,
4
- } ;
1
+ use crate :: { rivet_api:: apis, ToolchainCtx } ;
5
2
use anyhow:: * ;
6
3
use base64:: { engine:: general_purpose:: STANDARD , Engine } ;
7
4
use chrono:: { DateTime , Utc } ;
8
5
use clap:: ValueEnum ;
6
+ use serde_json:: json;
9
7
use std:: time:: Duration ;
10
8
use tokio:: signal;
11
9
use tokio:: sync:: watch;
@@ -63,18 +61,17 @@ async fn tail_streams(
63
61
stdout_fetched_tx : watch:: Sender < bool > ,
64
62
stderr_fetched_tx : watch:: Sender < bool > ,
65
63
) -> Result < ( ) > {
66
- // TODO: Update ot use ActorsQueryLogStream::All
67
64
tokio:: try_join!(
68
65
tail_stream(
69
66
ctx,
70
67
& opts,
71
- models :: ActorsQueryLogStream :: StdOut ,
68
+ 0 , // stdout
72
69
stdout_fetched_tx
73
70
) ,
74
71
tail_stream(
75
72
ctx,
76
73
& opts,
77
- models :: ActorsQueryLogStream :: StdErr ,
74
+ 1 , // stderr
78
75
stderr_fetched_tx
79
76
) ,
80
77
)
@@ -85,7 +82,7 @@ async fn tail_streams(
85
82
async fn tail_stream (
86
83
ctx : & ToolchainCtx ,
87
84
opts : & TailOpts < ' _ > ,
88
- stream : models :: ActorsQueryLogStream ,
85
+ stream : i32 , // 0 = stdout, 1 = stderr
89
86
log_fetched_tx : watch:: Sender < bool > ,
90
87
) -> Result < ( ) > {
91
88
let mut watch_index: Option < String > = None ;
@@ -95,8 +92,8 @@ async fn tail_stream(
95
92
// future doesn't exit.
96
93
match ( & opts. stream , stream) {
97
94
( LogStream :: All , _) => { }
98
- ( LogStream :: StdOut , models :: ActorsQueryLogStream :: StdOut ) => { }
99
- ( LogStream :: StdErr , models :: ActorsQueryLogStream :: StdErr ) => { }
95
+ ( LogStream :: StdOut , 0 ) => { }
96
+ ( LogStream :: StdErr , 1 ) => { }
100
97
_ => {
101
98
// Notify poll_actor_state
102
99
log_fetched_tx. send ( true ) . ok ( ) ;
@@ -107,15 +104,19 @@ async fn tail_stream(
107
104
}
108
105
109
106
loop {
107
+ // Build query expression for a single actor
108
+ let query_expr = json ! ( {
109
+ "property" : "actor_id" ,
110
+ "value" : opts. actor_id. to_string( ) ,
111
+ "case_sensitive" : true
112
+ } ) ;
113
+ let query_json = serde_json:: to_string ( & query_expr) ?;
114
+
110
115
let res = apis:: actors_logs_api:: actors_logs_get (
111
116
& ctx. openapi_config_cloud ,
112
- stream,
113
- & serde_json:: to_string ( & vec ! [ opts. actor_id] ) ?,
117
+ & query_json,
114
118
Some ( & ctx. project . name_id ) ,
115
119
Some ( opts. environment ) ,
116
- None ,
117
- None ,
118
- None ,
119
120
watch_index. as_deref ( ) ,
120
121
)
121
122
. await
@@ -127,7 +128,15 @@ async fn tail_stream(
127
128
first_batch_fetched = true ;
128
129
}
129
130
130
- for ( ts, line) in res. timestamps . iter ( ) . zip ( res. lines . iter ( ) ) {
131
+ // Filter logs by stream type
132
+ for ( i, ( ts, line) ) in res. timestamps . iter ( ) . zip ( res. lines . iter ( ) ) . enumerate ( ) {
133
+ // Check if this log entry is from the stream we're interested in
134
+ if let Some ( & log_stream) = res. streams . get ( i) {
135
+ if log_stream != stream {
136
+ continue ;
137
+ }
138
+ }
139
+
131
140
let Result :: Ok ( ts) = ts. parse :: < DateTime < Utc > > ( ) else {
132
141
eprintln ! ( "Failed to parse timestamp: {ts} for line {line}" ) ;
133
142
continue ;
0 commit comments