Skip to content

Commit 9249e7e

Browse files
committed
Merge pull request #328 from mattbrictson/connection-pool-rewrite
Rewrite ConnectionPool for efficiency
2 parents 211a168 + 6de8614 commit 9249e7e

File tree

7 files changed

+272
-176
lines changed

7 files changed

+272
-176
lines changed

CHANGELOG.md

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,13 +4,11 @@ This file is written in reverse chronological order, newer releases will
44
appear at the top.
55

66
## `master` (Unreleased)
7+
* Add your entries below here, remember to credit yourself however you want
8+
to be credited!
79
* Do not auto-include DSL in context of `require`. Load DSL with Gem and allow to include it.
810
[PR #219](https://github.com/capistrano/sshkit/pull/219)
911
@beatrichartz
10-
* `SSHKit::Backend::Netssh.pool.idle_timeout = 0` doesn't disable connection pooling anymore,
11-
only connection expiration. To disable connection polling use `SSHKit::Backend::Netssh.pool.enabled = false`
12-
* Add your entries below here, remember to credit yourself however you want
13-
to be credited!
1412
* make sure working directory for commands is properly cleared after `within` blocks
1513
[PR #307](https://github.com/capistrano/sshkit/pull/307)
1614
@steved
@@ -30,6 +28,10 @@ appear at the top.
3028
* Fix a race condition experienced in JRuby that could cause multi-server
3129
deploys to fail. [PR #322](https://github.com/capistrano/sshkit/pull/322)
3230
@mattbrictson
31+
* The ConnectionPool has been rewritten in this release to be more efficient
32+
and have a cleaner internal API. You can still completely disable the pool
33+
by setting `SSHKit::Backend::Netssh.pool.idle_timeout = 0`.
34+
@mattbrictson @byroot [PR #328](https://github.com/capistrano/sshkit/pull/328)
3335

3436
## 1.8.1
3537

README.md

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -500,18 +500,13 @@ seconds. This timeout can be changed as follows:
500500
SSHKit::Backend::Netssh.pool.idle_timeout = 60 # seconds
501501
```
502502

503-
If you suspect the connection pruning is causing problems, you can disable the
504-
expiration behaviour entirely by setting the idle_timeout to zero:
503+
If you suspect the connection pooling is causing problems, you can disable the
504+
pooling behaviour entirely by setting the idle_timeout to zero:
505505

506506
```ruby
507507
SSHKit::Backend::Netssh.pool.idle_timeout = 0 # disabled
508508
```
509509

510-
Or even disable the pooling entirely, but it will severely reduce performance:
511-
```
512-
SSHKit::Backend::Netssh.pool.enabled = false # disabled
513-
```
514-
515510
## Tunneling and other related SSH themes
516511

517512
In order to do special gymnasitcs with SSH, tunneling, aliasing, complex options, etc with SSHKit it is possible to use [the underlying Net::SSH API](https://github.com/capistrano/sshkit/blob/master/EXAMPLES.md#setting-global-ssh-options) however in many cases it is preferred to use the system SSH configuration file at [`~/.ssh/config`](http://man.cx/ssh_config). This allows you to have personal configuration tied to your machine that does not have to be committed with the repository. If this is not suitable (everyone on the team needs a proxy command, or some special aliasing) a file in the same format can be placed in the project directory at `~/yourproject/.ssh/config`, this will be merged with the system settings in `~/.ssh/config`, and with any configuration specified in [`SSHKit::Backend::Netssh.config.ssh_options`](https://github.com/capistrano/sshkit/blob/master/lib/sshkit/backends/netssh.rb#L133).
Lines changed: 134 additions & 101 deletions
Original file line numberDiff line numberDiff line change
@@ -1,126 +1,159 @@
1+
require "monitor"
12
require "thread"
23

3-
# Since we call to_s on new_connection_args and use that as a hash
4-
# We need to make sure the memory address of the object is not used as part of the key
5-
# Otherwise identical objects with different memory address won't get a hash hit.
6-
# In the case of proxy commands, this can lead to proxy processes leaking
7-
# And in severe cases can cause deploys to fail due to default file descriptor limits
8-
# An alternate solution would be to use a different means of generating hash keys
4+
# Since we call to_s on new connection arguments and use that as a cache key, we
5+
# need to make sure the memory address of the object is not used as part of the
6+
# key. Otherwise identical objects with different memory address won't reuse the
7+
# cache.
8+
#
9+
# In the case of proxy commands, this can lead to proxy processes leaking, and
10+
# in severe cases can cause deploys to fail due to default file descriptor
11+
# limits. An alternate solution would be to use a different means of generating
12+
# hash keys.
13+
#
914
require "net/ssh/proxy/command"
1015
class Net::SSH::Proxy::Command
16+
# Ensure a stable string value is used, rather than memory address.
1117
def inspect
1218
@command_line_template
1319
end
1420
end
1521

16-
module SSHKit
17-
18-
module Backend
19-
20-
class ConnectionPool
21-
22-
attr_accessor :idle_timeout, :enabled
23-
24-
def initialize
25-
self.idle_timeout = 30
26-
self.enabled = true
27-
@mutex = Mutex.new
28-
@pool = {}
29-
end
22+
# The ConnectionPool caches connections and allows them to be reused, so long as
23+
# the reuse happens within the `idle_timeout` period. Timed out connections are
24+
# closed, forcing a new connection to be used in that case.
25+
#
26+
# Additionally, a background thread is started to check for abandoned
27+
# connections that have timed out without any attempt at being reused. These
28+
# are eventually closed as well and removed from the cache.
29+
#
30+
# If `idle_timeout` set to `false`, `0`, or `nil`, no caching is performed, and
31+
# a new connection is created and then immediately closed each time. The default
32+
# timeout is 30 (seconds).
33+
#
34+
# There is a single public method: `with`. Example usage:
35+
#
36+
# pool = SSHKit::Backend::ConnectionPool.new
37+
# pool.with(Net::SSH.method(:start), "host", "username") do |connection|
38+
# # do stuff with connection
39+
# end
40+
#
41+
class SSHKit::Backend::ConnectionPool
42+
attr_accessor :idle_timeout
43+
44+
def initialize(idle_timeout=30)
45+
@idle_timeout = idle_timeout
46+
@caches = {}
47+
@caches.extend(MonitorMixin)
48+
@timed_out_connections = Queue.new
49+
Thread.new { run_eviction_loop }
50+
end
3051

31-
def prune_expired?
32-
idle_timeout && idle_timeout != 0
33-
end
52+
# Creates a new connection or reuses a cached connection (if possible) and
53+
# yields the connection to the given block. Connections are created by
54+
# invoking the `connection_factory` proc with the given `args`. The arguments
55+
# are used to construct a key used for caching.
56+
def with(connection_factory, *args)
57+
cache = find_cache(args)
58+
conn = cache.pop || begin
59+
connection_factory.call(*args)
60+
end
61+
yield(conn)
62+
ensure
63+
cache.push(conn) unless conn.nil?
64+
end
3465

35-
def checkout(*new_connection_args, &block)
36-
entry = nil
37-
key = new_connection_args.to_s
38-
if enabled
39-
prune_expired if prune_expired?
40-
entry = find_live_entry(key)
41-
end
42-
entry || create_new_entry(new_connection_args, key, &block)
43-
end
66+
# Immediately remove all cached connections, without closing them. This only
67+
# exists for unit test purposes.
68+
def flush_connections
69+
caches.synchronize { caches.clear }
70+
end
4471

45-
def checkin(entry)
46-
if enabled
47-
if prune_expired?
48-
entry.expires_at = Time.now + idle_timeout
49-
prune_expired
50-
end
51-
@mutex.synchronize do
52-
@pool[entry.key] ||= []
53-
@pool[entry.key] << entry
54-
end
55-
end
56-
end
72+
# Immediately close all cached connections and empty the pool.
73+
def close_connections
74+
caches.synchronize do
75+
caches.values.each(&:clear)
76+
caches.clear
77+
process_deferred_close
78+
end
79+
end
5780

58-
def close_connections
59-
@mutex.synchronize do
60-
@pool.values.flatten.map(&:connection).uniq.each do |conn|
61-
if conn.respond_to?(:closed?) && conn.respond_to?(:close)
62-
conn.close unless conn.closed?
63-
end
64-
end
65-
@pool.clear
66-
end
67-
end
81+
private
6882

69-
def flush_connections
70-
@mutex.synchronize { @pool.clear }
71-
end
83+
attr_reader :caches, :timed_out_connections
7284

73-
private
74-
75-
def prune_expired
76-
@mutex.synchronize do
77-
@pool.each_value do |entries|
78-
entries.collect! do |entry|
79-
if entry.expired?
80-
entry.close unless entry.closed?
81-
nil
82-
else
83-
entry
84-
end
85-
end.compact!
86-
end
87-
end
88-
end
85+
def cache_enabled?
86+
idle_timeout && idle_timeout > 0
87+
end
8988

90-
def find_live_entry(key)
91-
@mutex.synchronize do
92-
return nil unless @pool.key?(key)
93-
while (entry = @pool[key].shift)
94-
return entry if entry.live?
95-
end
96-
end
97-
nil
98-
end
89+
# Look up a Cache that matches the given connection arguments.
90+
def find_cache(args)
91+
if cache_enabled?
92+
key = args.to_s
93+
caches[key] || thread_safe_find_or_create_cache(key)
94+
else
95+
NilCache.new(method(:silently_close_connection))
96+
end
97+
end
9998

100-
def create_new_entry(args, key, &block)
101-
Entry.new block.call(*args), key
99+
# Cache creation needs to happen in a mutex, because otherwise a race
100+
# condition might cause two identical caches to be created for the same key.
101+
def thread_safe_find_or_create_cache(key)
102+
caches.synchronize do
103+
caches[key] ||= begin
104+
Cache.new(idle_timeout, method(:silently_close_connection_later))
102105
end
106+
end
107+
end
103108

104-
Entry = Struct.new(:connection, :key) do
105-
attr_accessor :expires_at
106-
107-
def live?
108-
!expired? && !closed?
109-
end
109+
# Loops indefinitely to close connections and to find abandoned connections
110+
# that need to be closed.
111+
def run_eviction_loop
112+
loop do
113+
process_deferred_close
110114

111-
def expired?
112-
expires_at && Time.now > expires_at
113-
end
115+
# Periodically sweep all Caches to evict stale connections
116+
sleep([idle_timeout, 5].min)
117+
caches.values.each(&:evict)
118+
end
119+
end
114120

115-
def close
116-
connection.respond_to?(:close) && connection.close
117-
end
121+
# Immediately close any connections that are pending closure.
122+
# rubocop:disable Lint/HandleExceptions
123+
def process_deferred_close
124+
until timed_out_connections.empty?
125+
connection = timed_out_connections.pop(true)
126+
silently_close_connection(connection)
127+
end
128+
rescue ThreadError
129+
# Queue#pop(true) raises ThreadError if the queue is empty.
130+
# This could only happen if `close_connections` is called at the same time
131+
# the background eviction thread has woken up to close connections. In any
132+
# case, it is not something we need to care about, since an empty queue is
133+
# perfectly OK.
134+
end
135+
# rubocop:enable Lint/HandleExceptions
118136

119-
def closed?
120-
connection.respond_to?(:closed?) && connection.closed?
121-
end
122-
end
137+
# Adds the connection to a queue that is processed asynchronously by a
138+
# background thread. The connection will eventually be closed.
139+
def silently_close_connection_later(connection)
140+
timed_out_connections << connection
141+
end
123142

124-
end
143+
# Close the given `connection` immediately, assuming it responds to a `close`
144+
# method. If it doesn't, or if `nil` is provided, it is silently ignored. Any
145+
# `StandardError` is also silently ignored. Returns `true` if the connection
146+
# was closed; `false` if it was already closed or could not be closed due to
147+
# an error.
148+
def silently_close_connection(connection)
149+
return false unless connection.respond_to?(:close)
150+
return false if connection.respond_to?(:closed?) && connection.closed?
151+
connection.close
152+
true
153+
rescue StandardError
154+
false
125155
end
126156
end
157+
158+
require "sshkit/backends/connection_pool/cache"
159+
require "sshkit/backends/connection_pool/nil_cache"
Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
# A Cache holds connections for a given key. Each connection is stored along
2+
# with an expiration time so that its idle duration can be measured.
3+
class SSHKit::Backend::ConnectionPool::Cache
4+
def initialize(idle_timeout, closer)
5+
@connections = []
6+
@connections.extend(MonitorMixin)
7+
@idle_timeout = idle_timeout
8+
@closer = closer
9+
end
10+
11+
# Remove and return a fresh connection from this Cache. Returns `nil` if
12+
# the Cache is empty or if all existing connections have gone stale.
13+
def pop
14+
connections.synchronize do
15+
evict
16+
_, connection = connections.pop
17+
connection
18+
end
19+
end
20+
21+
# Return a connection to this Cache.
22+
def push(conn)
23+
# No need to cache if the connection has already been closed.
24+
return if closed?(conn)
25+
26+
connections.synchronize do
27+
connections.push([Time.now + idle_timeout, conn])
28+
end
29+
end
30+
31+
# Close and remove any connections in this Cache that have been idle for
32+
# too long.
33+
def evict
34+
# Peek at the first connection to see if it is still fresh. If so, we can
35+
# return right away without needing to use `synchronize`.
36+
first_expires_at, _connection = connections.first
37+
return if first_expires_at.nil? || fresh?(first_expires_at)
38+
39+
connections.synchronize do
40+
fresh, stale = connections.partition do |expires_at, _|
41+
fresh?(expires_at)
42+
end
43+
connections.replace(fresh)
44+
stale.each { |_, conn| closer.call(conn) }
45+
end
46+
end
47+
48+
# Close all connections and completely clear the cache.
49+
def clear
50+
connections.synchronize do
51+
connections.map(&:last).each(&closer)
52+
connections.clear
53+
end
54+
end
55+
56+
private
57+
58+
attr_reader :connections, :idle_timeout, :closer
59+
60+
def fresh?(expires_at)
61+
expires_at > Time.now
62+
end
63+
64+
def closed?(conn)
65+
conn.respond_to?(:closed?) && conn.closed?
66+
end
67+
end
Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
# A cache that holds no connections. Any connection provided to this cache
2+
# is simply closed.
3+
SSHKit::Backend::ConnectionPool::NilCache = Struct.new(:closer) do
4+
def pop
5+
nil
6+
end
7+
8+
def push(conn)
9+
closer.call(conn)
10+
end
11+
end

0 commit comments

Comments
 (0)