@@ -41,23 +41,34 @@ def subscribe(self):
4141 if self .starting_position_ts :
4242 self ._lambda_client \
4343 .create_event_source_mapping (
44- EventSourceArn = self .stream_name ,
45- FunctionName = self .function_name ,
46- BatchSize = self .batch_size ,
47- StartingPosition = self .starting_position ,
48- StartingPositionTimestamp = self .starting_position_ts )
44+ EventSourceArn = self .stream_name ,
45+ FunctionName = self .function_name ,
46+ BatchSize = self .batch_size ,
47+ StartingPosition = self .starting_position ,
48+ StartingPositionTimestamp = self .starting_position_ts )
4949 else :
5050 self ._lambda_client \
5151 .create_event_source_mapping (
52- EventSourceArn = self .stream_name ,
53- FunctionName = self .function_name ,
54- BatchSize = self .batch_size ,
55- StartingPosition = self .starting_position )
52+ EventSourceArn = self .stream_name ,
53+ FunctionName = self .function_name ,
54+ BatchSize = self .batch_size ,
55+ StartingPosition = self .starting_position )
5656 LOG .debug ('Subscription created' )
5757 except botocore .exceptions .ClientError as ex :
5858 response_code = ex .response ['Error' ]['Code' ]
5959 if response_code == 'ResourceConflictException' :
60- LOG .debug ('Subscription exists' )
60+ LOG .debug ('Subscription exists. Updating ...' )
61+ resp = self ._lambda_client \
62+ .list_event_source_mappings (
63+ FunctionName = self .function_name ,
64+ EventSourceArn = self .stream_name )
65+ uuid = resp ['EventSourceMappings' ][0 ]['UUID' ]
66+ self ._lambda_client \
67+ .update_event_source_mapping (
68+ UUID = uuid ,
69+ FunctionName = self .function_name ,
70+ Enabled = True ,
71+ BatchSize = self .batch_size )
6172 else :
6273 LOG .error ('Subscription failed, error=%s' % str (ex ))
6374 raise ex
0 commit comments