From c82eab1d2aadbc4b552949716d2bd79ca351d795 Mon Sep 17 00:00:00 2001 From: Mandar U Jog Date: Fri, 8 Aug 2014 02:47:51 +0000 Subject: [PATCH 1/6] add support for generic selector based on i. --- lib/synapse/service_watcher/ec2tag.rb | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/lib/synapse/service_watcher/ec2tag.rb b/lib/synapse/service_watcher/ec2tag.rb index a3319c26..cc5b7137 100644 --- a/lib/synapse/service_watcher/ec2tag.rb +++ b/lib/synapse/service_watcher/ec2tag.rb @@ -88,14 +88,16 @@ def sleep_until_next_check(start_time) def discover_instances AWS.memoize do instances = instances_with_tags(@discovery['tag_name'], @discovery['tag_value']) - + if @discovery['selector'] + instances = eval("instances.select { |i| #{@discovery['selector']}}") + end new_backends = [] # choice of private_dns_name, dns_name, private_ip_address or # ip_address, for now, just stick with the private fields. instances.each do |instance| new_backends << { - 'name' => instance.private_dns_name, + 'name' => instance.tags["Name"], 'host' => instance.private_ip_address, 'port' => @haproxy['server_port_override'], } From 8309b70e503ecdb9d8845df199077102ce33b902 Mon Sep 17 00:00:00 2001 From: Mandar U Jog Date: Fri, 22 Aug 2014 16:07:17 +0000 Subject: [PATCH 2/6] add max_checks and check_inteval options default for check_interval is 15 sec default for max_checks is 0 (unlimited) --- lib/synapse.rb | 16 +++++++++++++++- 1 file changed, 15 insertions(+), 1 deletion(-) diff --git a/lib/synapse.rb b/lib/synapse.rb index 0b18fd00..4584f7a8 100644 --- a/lib/synapse.rb +++ b/lib/synapse.rb @@ -27,6 +27,16 @@ def initialize(opts={}) # Any exceptions in the watcher threads should wake the main thread so # that we can fail fast. Thread.abort_on_exception = true + if opts['check_interval'] + @check_interval = opts['check_interval'].to_i + else + @check_interval = 15 + end + if opts['max_checks'] + @max_checks = opts['max_checks'].to_i + else + @max_checks = 0 + end log.debug "synapse: completed init" end @@ -50,10 +60,14 @@ def run log.info "synapse: regenerating haproxy config" @haproxy.update_config(@service_watchers) else - sleep 1 + sleep @check_interval end loops += 1 + if @max_checks != 0 and loops > @max_checks + log.info "synapse: exiting after #{loops} loops" + break + end log.debug "synapse: still running at #{Time.now}" if (loops % 60) == 0 end From db7259db8c8f165bbe50e929f96ea9cc8b58d64e Mon Sep 17 00:00:00 2001 From: Mandar U Jog Date: Wed, 27 Aug 2014 19:05:46 +0000 Subject: [PATCH 3/6] add InstanceCache to reduce AWS API Calls AWS.memoize does not do the job as it only does it per thread Added an InstanceCache singleton to manage instances metadata with timeouts --- lib/synapse/service_watcher/ec2tag.rb | 76 ++++++++++++++++++++------- 1 file changed, 56 insertions(+), 20 deletions(-) diff --git a/lib/synapse/service_watcher/ec2tag.rb b/lib/synapse/service_watcher/ec2tag.rb index cc5b7137..a5f6bd20 100644 --- a/lib/synapse/service_watcher/ec2tag.rb +++ b/lib/synapse/service_watcher/ec2tag.rb @@ -1,8 +1,34 @@ require 'synapse/service_watcher/base' require 'aws-sdk' +require 'ostruct' + +InstanceCache = Object.new +class << InstanceCache + def __init__ + @i_time = Time.now + @instances = nil + @cacheTimeout = 60 + @mutex = Mutex.new + end + def set(instances) + @instances = instances + @i_time = Time.now + end + def get() + if Time.now - @i_time > (@cacheTimeout + rand(10)) + @instances = nil + end + @instances + end + def get_mutex() + @mutex + end +end +InstanceCache.__init__() module Synapse class EC2Watcher < BaseWatcher + attr_reader :check_interval @@ -18,9 +44,10 @@ def start @check_interval = @discovery['check_interval'] || 15.0 log.info "synapse: ec2tag watcher looking for instances " + - "tagged with #{@discovery['tag_name']}=#{@discovery['tag_value']}" + "tagged with #{@discovery['tag_name']}=#{@discovery['tag_value']} #{@discovery['selector']} " @watcher = Thread.new { watch } + instances = instances_with_tags(@discovery['tag_name'], @discovery['tag_value']) end private @@ -86,32 +113,41 @@ def sleep_until_next_check(start_time) end def discover_instances - AWS.memoize do instances = instances_with_tags(@discovery['tag_name'], @discovery['tag_value']) if @discovery['selector'] instances = eval("instances.select { |i| #{@discovery['selector']}}") end - new_backends = [] - - # choice of private_dns_name, dns_name, private_ip_address or - # ip_address, for now, just stick with the private fields. - instances.each do |instance| - new_backends << { - 'name' => instance.tags["Name"], - 'host' => instance.private_ip_address, - 'port' => @haproxy['server_port_override'], - } - end - - new_backends - end + # do not want to update the cached objects + inst = instances.clone() + # add server port + inst.each { | i | i['port'] = @haproxy['server_port_override'] } + # sort so that the back end are generated in the same way + inst.sort_by! { |i| i['name'] } + inst end def instances_with_tags(tag_name, tag_value) - @ec2.instances - .tagged(tag_name) - .tagged_values(tag_value) - .select { |i| i.status == :running } + InstanceCache.get_mutex().synchronize do + inst = InstanceCache.get() + if inst.nil? + AWS.memoize do + log.info ("AWS API Call for #{tag_name}, #{tag_value}") + instances = @ec2.instances + .tagged(tag_name) + .tagged_values(tag_value) + .select { |i| i.status == :running } + inst = [] + instances.each { | i | + inst << OpenStruct.new({'tags' => i.tags.to_h, + 'host' => i.private_ip_address, + 'name' => i.tags["Name"]}) + } + + end + InstanceCache.set(inst) + end + return inst + end end def configure_backends(new_backends) From 301b37c3f60cfa79aee8e5c71b96222102156ba3 Mon Sep 17 00:00:00 2001 From: Mandar U Jog Date: Wed, 27 Aug 2014 20:11:46 +0000 Subject: [PATCH 4/6] fixed issues with port variable being shared incorrectly --- lib/synapse/service_watcher/ec2tag.rb | 32 ++++++++++++++++++--------- 1 file changed, 22 insertions(+), 10 deletions(-) diff --git a/lib/synapse/service_watcher/ec2tag.rb b/lib/synapse/service_watcher/ec2tag.rb index a5f6bd20..44c1bedb 100644 --- a/lib/synapse/service_watcher/ec2tag.rb +++ b/lib/synapse/service_watcher/ec2tag.rb @@ -88,11 +88,11 @@ def watch current_backends = discover_instances if last_backends != current_backends - log.info "synapse: ec2tag watcher backends have changed." + log.info "synapse: ec2tag watcher #{@name} backends have changed." last_backends = current_backends configure_backends(current_backends) else - log.info "synapse: ec2tag watcher backends are unchanged." + log.info "synapse: ec2tag watcher #{@name} backends are unchanged." end sleep_until_next_check(start) @@ -118,9 +118,13 @@ def discover_instances instances = eval("instances.select { |i| #{@discovery['selector']}}") end # do not want to update the cached objects - inst = instances.clone() + inst = [] # add server port - inst.each { | i | i['port'] = @haproxy['server_port_override'] } + instances.each { | i | + iclone = OpenStruct.new(i.to_h) + iclone['port'] = @haproxy['server_port_override'] + inst << iclone + } # sort so that the back end are generated in the same way inst.sort_by! { |i| i['name'] } inst @@ -132,17 +136,25 @@ def instances_with_tags(tag_name, tag_value) if inst.nil? AWS.memoize do log.info ("AWS API Call for #{tag_name}, #{tag_value}") - instances = @ec2.instances - .tagged(tag_name) - .tagged_values(tag_value) - .select { |i| i.status == :running } + begin + instances = @ec2.instances + .tagged(tag_name) + .tagged_values(tag_value) + .select { |i| i.status == :running } + rescue Exception => e + puts e.backtrace.inspect + puts e.message + raise e + end inst = [] instances.each { | i | - inst << OpenStruct.new({'tags' => i.tags.to_h, + inst_info = OpenStruct.new({'tags' => i.tags.to_h, 'host' => i.private_ip_address, 'name' => i.tags["Name"]}) + inst_info.freeze + inst << inst_info } - + inst.freeze end InstanceCache.set(inst) end From eeb4dfc2de3c77b988a0173626d92894a8730078 Mon Sep 17 00:00:00 2001 From: Mandar U Jog Date: Thu, 28 Aug 2014 01:11:02 +0000 Subject: [PATCH 5/6] Add retries if "Request limit exceeds" "Request limit exceeded" is handled by a random back off --- lib/synapse/service_watcher/ec2tag.rb | 28 ++++++++++++++++++--------- 1 file changed, 19 insertions(+), 9 deletions(-) diff --git a/lib/synapse/service_watcher/ec2tag.rb b/lib/synapse/service_watcher/ec2tag.rb index 44c1bedb..a31dc7cb 100644 --- a/lib/synapse/service_watcher/ec2tag.rb +++ b/lib/synapse/service_watcher/ec2tag.rb @@ -134,18 +134,28 @@ def instances_with_tags(tag_name, tag_value) InstanceCache.get_mutex().synchronize do inst = InstanceCache.get() if inst.nil? + tries = 0 + ntries = 4 AWS.memoize do - log.info ("AWS API Call for #{tag_name}, #{tag_value}") - begin - instances = @ec2.instances - .tagged(tag_name) - .tagged_values(tag_value) - .select { |i| i.status == :running } + while tries < ntries do + log.info ("AWS API Call for #{tag_name}, #{tag_value}") + begin + instances = @ec2.instances + .tagged(tag_name) + .tagged_values(tag_value) + .select { |i| i.status == :running } + break rescue Exception => e - puts e.backtrace.inspect - puts e.message - raise e + if e.message.include?("Request limit exceeded") + sleeping = rand(6) + 4 + log.warn ("#{e.message} retry #{tries} after #{sleeping} sec") + tries += 1 + sleep(sleeping) + else + raise e + end end + end inst = [] instances.each { | i | inst_info = OpenStruct.new({'tags' => i.tags.to_h, From 17e953fade09337544acf7a9c9815f9eb701715f Mon Sep 17 00:00:00 2001 From: Mandar U Jog Date: Thu, 28 Aug 2014 03:22:50 +0000 Subject: [PATCH 6/6] use AWS API logging, for API access watch --- lib/synapse/service_watcher/ec2tag.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/synapse/service_watcher/ec2tag.rb b/lib/synapse/service_watcher/ec2tag.rb index a31dc7cb..0d309f7b 100644 --- a/lib/synapse/service_watcher/ec2tag.rb +++ b/lib/synapse/service_watcher/ec2tag.rb @@ -35,7 +35,7 @@ class EC2Watcher < BaseWatcher def start region = @discovery['aws_region'] || ENV['AWS_REGION'] log.info "Connecting to EC2 region: #{region}" - + AWS.config(:logger => log) @ec2 = AWS::EC2.new( region: region, access_key_id: @discovery['aws_access_key_id'] || ENV['AWS_ACCESS_KEY_ID'],