@@ -279,7 +279,7 @@ def process_window(
279
279
# build a complete list otherwise expired windows could be deleted
280
280
# in state.delete_windows() and never be fetched.
281
281
expired_windows = list (
282
- self ._expired_windows (state , max_expired_window_start , collect )
282
+ self ._expired_windows (key , state , max_expired_window_start , collect )
283
283
)
284
284
285
285
state .delete_windows (
@@ -289,14 +289,14 @@ def process_window(
289
289
290
290
return reversed (updated_windows ), expired_windows
291
291
292
- def _expired_windows (self , state , max_expired_window_start , collect ):
292
+ def _expired_windows (self , key , state , max_expired_window_start , collect ):
293
293
for window in state .expire_windows (
294
294
max_start_time = max_expired_window_start ,
295
295
delete = False ,
296
296
collect = collect ,
297
297
end_inclusive = True ,
298
298
):
299
- (start , end ), (max_timestamp , aggregated ), collected , key = window
299
+ (start , end ), (max_timestamp , aggregated ), collected , _ = window
300
300
if end == max_timestamp :
301
301
yield key , self ._results (aggregated , collected , start , end )
302
302
0 commit comments