From 0bcf1a5175ba06d364fa83e249ee7a13260c1c22 Mon Sep 17 00:00:00 2001 From: Vladislav Date: Thu, 4 Jun 2020 12:57:19 +0300 Subject: [PATCH] Add using next token --- lib/logstash/inputs/cloudwatch_logs.rb | 27 ++++++++++++++++---------- 1 file changed, 17 insertions(+), 10 deletions(-) diff --git a/lib/logstash/inputs/cloudwatch_logs.rb b/lib/logstash/inputs/cloudwatch_logs.rb index 38d3cc6..0f0047f 100644 --- a/lib/logstash/inputs/cloudwatch_logs.rb +++ b/lib/logstash/inputs/cloudwatch_logs.rb @@ -179,15 +179,21 @@ def determine_start_position(groups, sincedb) private def process_group(group) next_token = nil + new_start_position = 0 + if !@sincedb.member?(group) + @sincedb[group] = 0 + end + initial_start_time = @sincedb[group] + initial_end_time = DateTime.now.strftime('%Q') + logger.info("Read logs use time: #{parse_time(initial_start_time)} for group:#{group}") loop do - if !@sincedb.member?(group) - @sincedb[group] = 0 - end + logger.info("Read logs use token since for group:#{group}") if !next_token.nil? params = { - :log_group_name => group, - :start_time => @sincedb[group], - :interleaved => true, - :next_token => next_token + :log_group_name => group, + :start_time => initial_start_time, + :end_time => initial_end_time, + :interleaved => true, + :next_token => next_token } resp = @cloudwatch.filter_log_events(params) @@ -195,11 +201,10 @@ def process_group(group) process_log(event, group) end - _sincedb_write - next_token = resp.next_token break if next_token.nil? end + _sincedb_write @priority.delete(group) @priority << group end #def process_group @@ -217,7 +222,9 @@ def process_log(log, group) decorate(event) @queue << event - @sincedb[group] = log.timestamp + 1 + if @sincedb[group] < log.timestamp + @sincedb[group] = log.timestamp + 1 + end end end # def process_log