Skip to content

Commit 291f81a

Browse files
committed
Reduce calls to CloudFormation APIs to avoid throttling
Signed-off-by: Francesco De Martino <[email protected]>
1 parent 4574613 commit 291f81a

File tree

3 files changed

+49
-31
lines changed

3 files changed

+49
-31
lines changed

jobwatcher/jobwatcher.py

Lines changed: 18 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,12 @@
2121
from configparser import ConfigParser
2222
from retrying import retry
2323

24+
from common.time_utils import seconds
2425
from common.utils import get_asg_name, get_asg_settings, get_compute_instance_type, get_instance_properties, load_module
2526

27+
LOOP_TIME = 60
28+
UPDATE_INSTANCE_PROPERTIES_INTERVAL = 180
29+
2630
log = logging.getLogger(__name__)
2731

2832

@@ -76,14 +80,20 @@ def _poll_scheduler_status(config, asg_name, scheduler_module):
7680
:param scheduler_module: scheduler module
7781
"""
7882
instance_type = None
83+
instance_properties = None
84+
update_instance_properties_timer = 0
7985
while True:
8086
# Get instance properties
81-
new_instance_type = get_compute_instance_type(
82-
config.region, config.proxy_config, config.stack_name, fallback=instance_type
83-
)
84-
if new_instance_type != instance_type:
85-
instance_type = new_instance_type
86-
instance_properties = get_instance_properties(config.region, config.proxy_config, instance_type)
87+
if not instance_properties or update_instance_properties_timer >= UPDATE_INSTANCE_PROPERTIES_INTERVAL:
88+
logging.info("Refreshing compute instance properties")
89+
update_instance_properties_timer = 0
90+
new_instance_type = get_compute_instance_type(
91+
config.region, config.proxy_config, config.stack_name, fallback=instance_type
92+
)
93+
if new_instance_type != instance_type:
94+
instance_type = new_instance_type
95+
instance_properties = get_instance_properties(config.region, config.proxy_config, instance_type)
96+
update_instance_properties_timer += LOOP_TIME
8797

8898
# get current limits
8999
_, current_desired, max_size = get_asg_settings(config.region, config.proxy_config, asg_name)
@@ -123,10 +133,10 @@ def _poll_scheduler_status(config, asg_name, scheduler_module):
123133
asg_client = boto3.client("autoscaling", region_name=config.region, config=config.proxy_config)
124134
asg_client.update_auto_scaling_group(AutoScalingGroupName=asg_name, DesiredCapacity=requested)
125135

126-
time.sleep(60)
136+
time.sleep(LOOP_TIME)
127137

128138

129-
@retry(wait_fixed=60000)
139+
@retry(wait_fixed=seconds(LOOP_TIME))
130140
def main():
131141
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s [%(module)s:%(funcName)s] %(message)s")
132142
log.info("jobwatcher startup")

nodewatcher/nodewatcher.py

Lines changed: 6 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -29,14 +29,7 @@
2929
from retrying import RetryError, retry
3030

3131
from common.time_utils import minutes, seconds
32-
from common.utils import (
33-
CriticalError,
34-
get_asg_name,
35-
get_asg_settings,
36-
get_compute_instance_type,
37-
get_instance_properties,
38-
load_module,
39-
)
32+
from common.utils import CriticalError, get_asg_name, get_asg_settings, get_instance_properties, load_module
4033

4134
log = logging.getLogger(__name__)
4235

@@ -281,7 +274,7 @@ def _init_idletime():
281274
return idletime
282275

283276

284-
def _poll_instance_status(config, scheduler_module, asg_name, hostname, instance_id):
277+
def _poll_instance_status(config, scheduler_module, asg_name, hostname, instance_id, instance_type):
285278
"""
286279
Verify instance/scheduler status and self-terminate the instance.
287280
@@ -291,12 +284,12 @@ def _poll_instance_status(config, scheduler_module, asg_name, hostname, instance
291284
:param asg_name: ASG name
292285
:param hostname: current hostname
293286
:param instance_id: current instance id
287+
:param instance_type: current instance type
294288
"""
295289
_wait_for_stack_ready(config.stack_name, config.region, config.proxy_config)
296290
_terminate_if_down(scheduler_module, config, asg_name, instance_id, INITIAL_TERMINATE_TIMEOUT)
297291

298292
idletime = _init_idletime()
299-
instance_type = get_compute_instance_type(config.region, config.proxy_config, config.stack_name)
300293
instance_properties = get_instance_properties(config.region, config.proxy_config, instance_type)
301294
while True:
302295
time.sleep(60)
@@ -356,10 +349,11 @@ def main():
356349

357350
instance_id = _get_metadata("instance-id")
358351
hostname = _get_metadata("local-hostname")
359-
log.info("Instance id is %s, hostname is %s", instance_id, hostname)
352+
instance_type = _get_metadata("instance-type")
353+
log.info("Instance id is %s, hostname is %s, instance type is %s", instance_id, hostname, instance_type)
360354
asg_name = get_asg_name(config.stack_name, config.region, config.proxy_config)
361355

362-
_poll_instance_status(config, scheduler_module, asg_name, hostname, instance_id)
356+
_poll_instance_status(config, scheduler_module, asg_name, hostname, instance_id, instance_type)
363357
except Exception as e:
364358
log.critical("An unexpected error occurred: %s", e)
365359
raise

sqswatcher/sqswatcher.py

Lines changed: 25 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
from configparser import ConfigParser
2727
from retrying import retry
2828

29+
from common.time_utils import seconds
2930
from common.utils import (
3031
CriticalError,
3132
get_asg_name,
@@ -35,6 +36,9 @@
3536
load_module,
3637
)
3738

39+
LOOP_TIME = 30
40+
CLUSTER_PROPERTIES_REFRESH_INTERVAL = 180
41+
3842

3943
class QueryConfigError(Exception):
4044
pass
@@ -336,17 +340,27 @@ def _poll_queue(sqs_config, queue, table, asg_name):
336340

337341
max_cluster_size = None
338342
instance_type = None
343+
cluster_properties_refresh_timer = 0
339344
while True:
345+
force_cluster_update = False
340346
# dynamically retrieve max_cluster_size and compute_instance_type
341-
new_max_cluster_size = _retrieve_max_cluster_size(sqs_config, asg_name, fallback=max_cluster_size)
342-
new_instance_type = get_compute_instance_type(
343-
sqs_config.region, sqs_config.proxy_config, sqs_config.stack_name, fallback=instance_type
344-
)
345-
force_cluster_update = new_max_cluster_size != max_cluster_size or new_instance_type != instance_type
346-
if new_instance_type != instance_type:
347-
instance_type = new_instance_type
348-
instance_properties = get_instance_properties(sqs_config.region, sqs_config.proxy_config, instance_type)
349-
max_cluster_size = new_max_cluster_size
347+
if (
348+
not max_cluster_size
349+
or not instance_type
350+
or cluster_properties_refresh_timer >= CLUSTER_PROPERTIES_REFRESH_INTERVAL
351+
):
352+
cluster_properties_refresh_timer = 0
353+
logging.info("Refreshing cluster properties")
354+
new_max_cluster_size = _retrieve_max_cluster_size(sqs_config, asg_name, fallback=max_cluster_size)
355+
new_instance_type = get_compute_instance_type(
356+
sqs_config.region, sqs_config.proxy_config, sqs_config.stack_name, fallback=instance_type
357+
)
358+
force_cluster_update = new_max_cluster_size != max_cluster_size or new_instance_type != instance_type
359+
if new_instance_type != instance_type:
360+
instance_type = new_instance_type
361+
instance_properties = get_instance_properties(sqs_config.region, sqs_config.proxy_config, instance_type)
362+
max_cluster_size = new_max_cluster_size
363+
cluster_properties_refresh_timer += LOOP_TIME
350364

351365
messages = _retrieve_all_sqs_messages(queue)
352366
update_events = _parse_sqs_messages(messages, table)
@@ -360,10 +374,10 @@ def _poll_queue(sqs_config, queue, table, asg_name):
360374
instance_properties,
361375
force_cluster_update,
362376
)
363-
time.sleep(30)
377+
time.sleep(LOOP_TIME)
364378

365379

366-
@retry(wait_fixed=30000)
380+
@retry(wait_fixed=seconds(LOOP_TIME))
367381
def main():
368382
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s [%(module)s:%(funcName)s] %(message)s")
369383
log.info("sqswatcher startup")

0 commit comments

Comments
 (0)