@@ -1,7 +1,9 @@
use anyhow::Result;
use clap::Args;
+use std::collections::HashSet;
+use std::time::Duration;

-use crate::api::{Api, Dataset, FetchEventsOptions};
+use crate::api::{Api, Dataset, FetchEventsOptions, LogEntry};
use crate::config::Config;
use crate::utils::formatting::Table;
@@ -26,6 +28,9 @@ const LOG_FIELDS: &[&str] = &[
    "message",
];

+/// Maximum number of log entries to keep in memory for deduplication
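+/// in live streaming mode. This bound caps memory use no matter how long a
+/// session runs; note that entries evicted from the buffer can be reported
+/// again as "new" if the server returns them in a later poll.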
+const MAX_DEDUP_BUFFER_SIZE: usize = 10_000;
+
/// Arguments for listing logs
#[derive(Args)]
pub(super) struct ListLogsArgs {
@@ -45,6 +50,14 @@ pub(super) struct ListLogsArgs {
    #[arg(long = "query", default_value = "")]
    #[arg(help = "Query to filter logs. Example: \"level:error\"")]
    query: String,
+
+    #[arg(long = "live")]
+    #[arg(help = "Enable live streaming mode to continuously poll for new logs.")]
+    live: bool,
+
+    #[arg(long = "poll-interval", default_value = "2")]
+    #[arg(help = "Polling interval in seconds for live streaming mode.")]
+    poll_interval: u64,
}

pub(super) fn execute(args: ListLogsArgs) -> Result<()> {
@@ -76,7 +89,11 @@ pub(super) fn execute(args: ListLogsArgs) -> Result<()> {
        Some(args.query.as_str())
    };

-    execute_single_fetch(&api, &org, &project, query, LOG_FIELDS, &args)
+    if args.live {
+        execute_live_streaming(&api, &org, &project, query, LOG_FIELDS, &args)
+    } else {
+        execute_single_fetch(&api, &org, &project, query, LOG_FIELDS, &args)
+    }
}

fn execute_single_fetch(
@@ -129,3 +146,257 @@ fn execute_single_fetch(

    Ok(())
}
+
+/// Manages deduplication of log entries with a bounded buffer
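+///
+/// `seen_ids` always holds exactly the `item_id`s of the entries currently in
+/// `buffer`, so duplicate checks stay O(1) while `buffer` enforces the size cap.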
+struct LogDeduplicator {
+    /// Set of seen log IDs for quick lookup
+    seen_ids: HashSet<String>,
+    /// Buffer of log entries in order (for maintaining size limit)
+    buffer: Vec<LogEntry>,
+    /// Maximum size of the buffer
+    max_size: usize,
+}
+
+impl LogDeduplicator {
+    fn new(max_size: usize) -> Self {
+        Self {
+            seen_ids: HashSet::new(),
+            buffer: Vec::new(),
+            max_size,
+        }
+    }
+
+    /// Add new logs and return only the ones that haven't been seen before
+    fn add_logs(&mut self, new_logs: Vec<LogEntry>) -> Vec<LogEntry> {
+        let mut unique_logs = Vec::new();
+
+        for log in new_logs {
+            if !self.seen_ids.contains(&log.item_id) {
+                self.seen_ids.insert(log.item_id.clone());
+                self.buffer.push(log.clone());
+                unique_logs.push(log);
+            }
+        }
+
+        // Maintain buffer size limit by removing oldest entries
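+        // (`Vec::remove(0)` shifts all remaining entries, so eviction is O(n);
+        // acceptable at this buffer size, but a `VecDeque` would make it O(1).)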
+        while self.buffer.len() > self.max_size {
+            let removed_log = self.buffer.remove(0);
+            self.seen_ids.remove(&removed_log.item_id);
+        }
+
+        unique_logs
+    }
+}
+
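+/// Continuously polls for logs matching `query`, printing only entries that
+/// have not been seen in a previous poll. This backs the `--live` flag above.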
+fn execute_live_streaming(
+    api: &Api,
+    org: &str,
+    project: &str,
+    query: Option<&str>,
+    fields: &[&str],
+    args: &ListLogsArgs,
+) -> Result<()> {
+    let mut deduplicator = LogDeduplicator::new(MAX_DEDUP_BUFFER_SIZE);
+    let poll_duration = Duration::from_secs(args.poll_interval);
+    let mut consecutive_new_only_count = 0;
+    const WARNING_THRESHOLD: usize = 3; // Show warning after 3 consecutive new-only responses
+
+    println!("Starting live log streaming...");
+    println!(
+        "Polling every {} seconds. Press Ctrl+C to stop.",
+        args.poll_interval
+    );
+
+    // Set up table with headers and print header once
+    let mut table = Table::new();
+    table
+        .title_row()
+        .add("Item ID")
+        .add("Timestamp")
+        .add("Severity")
+        .add("Message")
+        .add("Trace");
+
+    let mut header_printed = false;
+
+    loop {
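+        // Each iteration re-fetches the newest page (no cursor, sorted by
+        // -timestamp) and lets the deduplicator drop the overlap with
+        // earlier polls.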
+        let options = FetchEventsOptions {
+            dataset: Dataset::OurLogs,
+            fields,
+            project_id: Some(project),
+            cursor: None,
+            query,
+            per_page: Some(args.max_rows),
+            stats_period: Some("1h"),
+            sort: Some("-timestamp"),
+        };
+
+        match api
+            .authenticated()?
+            .fetch_organization_events(org, &options)
+        {
+            Ok(logs) => {
+                let fetched_count = logs.len();
+                let unique_logs = deduplicator.add_logs(logs);
+
+                // A batch in which every fetched log is new means consecutive
+                // polls no longer overlap, so entries may be slipping through
+                // between fetches.
+                if !unique_logs.is_empty() && unique_logs.len() == fetched_count {
+                    consecutive_new_only_count += 1;
+
+                    if consecutive_new_only_count >= WARNING_THRESHOLD && args.query.is_empty() {
+                        eprintln!(
+                            "\n⚠️  Warning: all fetched logs were new for {consecutive_new_only_count} consecutive polls."
+                        );
+                        eprintln!("   Logs may be arriving faster than they are fetched, so some entries may be missing.");
+                        eprintln!(
+                            "   Consider using --query to filter logs. Example: --query \"level:error\" or --query \"message:*error*\""
+                        );
+
+                        // Reset counter to avoid spam
+                        consecutive_new_only_count = 0;
+                    }
+                } else {
+                    consecutive_new_only_count = 0;
+                }
+
+                if !unique_logs.is_empty() {
+                    // Add new logs to table
+                    for log in unique_logs {
+                        let row = table.add_row();
+                        row.add(&log.item_id)
+                            .add(&log.timestamp)
+                            .add(log.severity.as_deref().unwrap_or(""))
+                            .add(log.message.as_deref().unwrap_or(""))
+                            .add(log.trace.as_deref().unwrap_or(""));
+                    }
+
+                    if !header_printed {
+                        // Print header with first data batch so column widths match actual data
+                        table.print_table_start();
+                        header_printed = true;
+                    } else {
+                        // Print only the rows (without table borders) for subsequent batches
+                        table.print_rows_only();
+                    }
+                    // Clear rows to free memory but keep the table structure for reuse
+                    table.clear_rows();
+                }
+            }
+            Err(e) => {
+                eprintln!("Error fetching logs: {e}");
+            }
+        }
+
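+        // Blocking sleep between polls; the loop only exits when the process
+        // is interrupted (Ctrl+C) or `api.authenticated()?` returns an error.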
+        std::thread::sleep(poll_duration);
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    fn create_test_log(id: &str, message: &str) -> LogEntry {
+        LogEntry {
+            item_id: id.to_owned(),
+            trace: None,
+            severity: Some("info".to_owned()),
+            timestamp: "2025-01-01T00:00:00Z".to_owned(),
+            message: Some(message.to_owned()),
+        }
+    }
+
+    #[test]
+    fn test_log_deduplicator_new() {
+        let deduplicator = LogDeduplicator::new(100);
+        assert_eq!(deduplicator.seen_ids.len(), 0);
+    }
+
+    #[test]
+    fn test_log_deduplicator_add_unique_logs() {
+        let mut deduplicator = LogDeduplicator::new(10);
+
+        let log1 = create_test_log("1", "test message 1");
+        let log2 = create_test_log("2", "test message 2");
+
+        let unique_logs = deduplicator.add_logs(vec![log1.clone(), log2.clone()]);
+
+        assert_eq!(unique_logs.len(), 2);
+        assert_eq!(deduplicator.seen_ids.len(), 2);
+    }
+
+    #[test]
+    fn test_log_deduplicator_deduplicate_logs() {
+        let mut deduplicator = LogDeduplicator::new(10);
+
+        let log1 = create_test_log("1", "test message 1");
+        let log2 = create_test_log("2", "test message 2");
+
+        // Add logs first time
+        let unique_logs1 = deduplicator.add_logs(vec![log1.clone(), log2.clone()]);
+        assert_eq!(unique_logs1.len(), 2);
+
+        // Add same logs again
+        let unique_logs2 = deduplicator.add_logs(vec![log1.clone(), log2.clone()]);
+        assert_eq!(unique_logs2.len(), 0); // Should be empty as logs already seen
+
+        assert_eq!(deduplicator.seen_ids.len(), 2);
+    }
+
+    #[test]
+    fn test_log_deduplicator_buffer_size_limit() {
+        let mut deduplicator = LogDeduplicator::new(3);
+
+        // Add 5 logs to a buffer with max size 3
+        let logs = vec![
+            create_test_log("1", "test message 1"),
+            create_test_log("2", "test message 2"),
+            create_test_log("3", "test message 3"),
+            create_test_log("4", "test message 4"),
+            create_test_log("5", "test message 5"),
+        ];
+
+        let unique_logs = deduplicator.add_logs(logs);
+        assert_eq!(unique_logs.len(), 5);
+
+        // After adding 5 logs to a buffer with max size 3, the oldest 2 should be
+        // evicted, so logs 1 and 2 should no longer be in the seen_ids set.
+        // Adding them again should return them as new logs.
+        let duplicate_logs = vec![
+            create_test_log("1", "test message 1"),
+            create_test_log("2", "test message 2"),
+        ];
+        let duplicate_unique_logs = deduplicator.add_logs(duplicate_logs);
+        assert_eq!(duplicate_unique_logs.len(), 2);
+
+        // Test that adding new logs still works
+        let new_logs = vec![create_test_log("6", "test message 6")];
+        let new_unique_logs = deduplicator.add_logs(new_logs);
+        assert_eq!(new_unique_logs.len(), 1);
+    }
+
+    #[test]
+    fn test_log_deduplicator_mixed_new_and_old_logs() {
+        let mut deduplicator = LogDeduplicator::new(10);
+
+        // Add initial logs
+        let initial_logs = vec![
+            create_test_log("1", "test message 1"),
+            create_test_log("2", "test message 2"),
+        ];
+        let unique_logs1 = deduplicator.add_logs(initial_logs);
+        assert_eq!(unique_logs1.len(), 2);
+
+        // Add a mix of new and old logs
+        let mixed_logs = vec![
+            create_test_log("1", "test message 1"), // old
+            create_test_log("3", "test message 3"), // new
+            create_test_log("2", "test message 2"), // old
+            create_test_log("4", "test message 4"), // new
+        ];
+        let unique_logs2 = deduplicator.add_logs(mixed_logs);
+
+        // Should only return the new logs (3 and 4)
+        assert_eq!(unique_logs2.len(), 2);
+        assert_eq!(unique_logs2[0].item_id, "3");
+        assert_eq!(unique_logs2[1].item_id, "4");
+
+        assert_eq!(deduplicator.seen_ids.len(), 4);
+        assert_eq!(deduplicator.buffer.len(), 4);
+    }
+}