Skip to content

Commit

Permalink
implement per-origin connection threshold per pool
Browse files Browse the repository at this point in the history
defaulting to unbounded, in order to preserve current behaviour; this will cap the number of connections initiated for a given origin for a pool, which if not shared, will be per-origin; this will include connections from separate option profiles

a pool timeout is defined to checkout a connection when limit is reeached
  • Loading branch information
HoneyryderChuck committed Nov 19, 2024
1 parent d77e97d commit 1f9dcfb
Show file tree
Hide file tree
Showing 11 changed files with 184 additions and 39 deletions.
12 changes: 12 additions & 0 deletions lib/httpx/errors.rb
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,18 @@ def to_connection_error
end
end

# Raise when it can't acquire a connection for a given origin.
class PoolTimeoutError < TimeoutError
attr_reader :origin

# initializes the +origin+ it refers to, and the
# +timeout+ causing the error.
def initialize(origin, timeout)
@origin = origin
super(timeout, "Timed out after #{timeout} seconds while waiting for a connection to #{origin}")
end
end

# Error raised when there was a timeout establishing the connection to a server.
# This may be raised due to timeouts during TCP and TLS (when applicable) connection
# establishment.
Expand Down
56 changes: 45 additions & 11 deletions lib/httpx/pool.rb
Original file line number Diff line number Diff line change
Expand Up @@ -7,18 +7,32 @@
module HTTPX
class Pool
using ArrayExtensions::FilterMap
using URIExtensions

POOL_TIMEOUT = 5

# Sets up the connection pool with the given +options+, which can be the following:
#
# :max_connections_per_origin :: the maximum number of connections held in the pool pointing to a given origin.
# :pool_timeout :: the number of seconds to wait for a connection to a given origin (before raising HTTPX::PoolTimeoutError)
#
def initialize(options)
@options = options
@pool_options = options.pool_options
@max_connections_per_origin = options.fetch(:max_connections_per_origin, Float::INFINITY)
@pool_timeout = options.fetch(:pool_timeout, POOL_TIMEOUT)
@resolvers = Hash.new { |hs, resolver_type| hs[resolver_type] = [] }
@resolver_mtx = Thread::Mutex.new
@connections = []
@connection_mtx = Thread::Mutex.new
@origin_counters = Hash.new(0)
@origin_conds = Hash.new { |hs, orig| hs[orig] = ConditionVariable.new }
end

def pop_connection
@connection_mtx.synchronize { @connections.shift }
@connection_mtx.synchronize do
conn = @connections.shift
@origin_conds.delete(conn.origin) if conn && ((@origin_counters[conn.origin.to_s] -= 1) == 0)
conn
end
end

# opens a connection to the IP reachable through +uri+.
Expand All @@ -29,19 +43,29 @@ def checkout_connection(uri, options)
return checkout_new_connection(uri, options) if options.io

@connection_mtx.synchronize do
conn = @connections.find do |connection|
connection.match?(uri, options)
end
@connections.delete(conn) if conn
acquire_connection(uri, options) || begin
if @origin_counters[uri.origin] == @max_connections_per_origin

conn
end || checkout_new_connection(uri, options)
@origin_conds[uri.origin].wait(@connection_mtx, @pool_timeout)

return acquire_connection(uri, options) || raise(PoolTimeoutError.new(uri.origin, @pool_timeout))
end

@origin_counters[uri.origin] += 1

checkout_new_connection(uri, options)
end
end
end

def checkin_connection(connection, delete = false)
def checkin_connection(connection)
return if connection.options.io

@connection_mtx.synchronize { @connections << connection } unless delete
@connection_mtx.synchronize do
@connections << connection

@origin_conds[connection.origin.to_s].signal
end
end

def checkout_mergeable_connection(connection)
Expand Down Expand Up @@ -89,6 +113,16 @@ def checkin_resolver(resolver)

private

def acquire_connection(uri, options)
conn = @connections.find do |connection|
connection.match?(uri, options)
end

@connections.delete(conn) if conn

conn
end

def checkout_new_connection(uri, options)
options.connection_class.new(uri, options)
end
Expand Down
12 changes: 8 additions & 4 deletions lib/httpx/session.rb
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ def initialize(options = EMPTY_HASH, &blk)
@options = self.class.default_options.merge(options)
@responses = {}
@persistent = @options.persistent
@pool = @options.pool_class.new(@options)
@pool = @options.pool_class.new(@options.pool_options)
@wrapped = false
@closing = false
wrap(&blk) if blk
Expand Down Expand Up @@ -242,9 +242,13 @@ def fetch_response(request, _selector, _options)

# sends the +request+ to the corresponding HTTPX::Connection
def send_request(request, selector, options = request.options)
error = catch(:resolve_error) do
connection = find_connection(request.uri, selector, options)
connection.send(request)
error = begin
catch(:resolve_error) do
connection = find_connection(request.uri, selector, options)
connection.send(request)
end
rescue StandardError => e
e
end
return unless error.is_a?(Error)

Expand Down
6 changes: 6 additions & 0 deletions sig/errors.rbs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,12 @@ module HTTPX
def initialize: (Numeric timeout, String message) -> untyped
end

class PoolTimeoutError < TimeoutError
attr_reader origin: String

def initialize: (String origin, Numeric timeout) -> void
end

class ConnectTimeoutError < TimeoutError
end

Expand Down
2 changes: 1 addition & 1 deletion sig/options.rbs
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ module HTTPX
attr_reader resolver_options: Hash[Symbol, untyped]

# resolver_options
attr_reader pool_options: Hash[Symbol, untyped]
attr_reader pool_options: pool_options

# ip_families
attr_reader ip_families: Array[ip_family]
Expand Down
17 changes: 14 additions & 3 deletions sig/pool.rbs
Original file line number Diff line number Diff line change
@@ -1,18 +1,27 @@
module HTTPX
type pool_options = {
max_connections_per_origin: Integer?,
pool_timeout: Numeric?
}

class Pool
type resolver_manager = Resolver::Multi | Resolver::System

@max_connections_per_origin: Integer
@pool_timeout: Numeric
@options: Options
@resolvers: Hash[Class, Array[resolver_manager]]
@resolver_mtx: Thread::Mutex
@connections: Array[Connection]
@connections: Hash[String, Array[Connection]]
@connection_mtx: Thread::Mutex
@origin_counters: Hash[String, Integer]
@origin_conds: Hash[String, ConditionVariable]

def pop_connection: () -> Connection?

def checkout_connection: (http_uri uri, Options options) -> Connection

def checkin_connection: (Connection connection, ?boolish delete) -> void
def checkin_connection: (Connection connection) -> void

def checkout_mergeable_connection: (Connection connection) -> Connection?

Expand All @@ -24,7 +33,9 @@ module HTTPX

private

def initialize: (Options options) -> void
def initialize: (pool_options options) -> void

def acquire_connection: (http_uri, Options options) -> Connection?

def checkout_new_connection: (http_uri uri, Options options) -> Connection

Expand Down
82 changes: 81 additions & 1 deletion test/pool_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,88 @@

class PoolTest < Minitest::Test
include HTTPHelpers
include HTTPX

# TODO: add connection pool tests
using URIExtensions

def test_pool_max_connections_per_origin
uri = URI(build_uri("/"))
responses = []
q = Queue.new
mtx = Thread::Mutex.new

pool = Pool.new(max_connections_per_origin: 2)
def pool.connections
@connections
end

def pool.origin_counters
@origin_counters
end
ths = 3.times.map do |_i|
Thread.start do
HTTPX.with(pool_options: { max_connections_per_origin: 2, pool_timeout: 30 }) do |http|
http.instance_variable_set(:@pool, pool)
response = http.get(uri)
mtx.synchronize { responses << response }
q.pop
end
end
end

not_after = Process.clock_gettime(Process::CLOCK_MONOTONIC) + 3
until (now = Process.clock_gettime(Process::CLOCK_MONOTONIC)) > not_after || q.num_waiting == 2
ths.first(&:alive?).join(not_after - now)
end

assert pool.connections.empty?, "thread sessions should still be holding to the connections"
assert pool.origin_counters[uri.origin] <= 2

3.times { q << :done }
ths.each(&:join)

assert responses.size == 3
responses.each do |res|
verify_status(res, 200)
end
end

def test_pool_pool_timeout
uri = URI(build_uri("/"))
q = Queue.new
Thread::Mutex.new

pool = Pool.new(max_connections_per_origin: 2, pool_timeout: 1)

ths = 3.times.map do |_i|
Thread.start do
res = nil
HTTPX.with(pool_options: { max_connections_per_origin: 2, pool_timeout: 1 }) do |http|
begin
http.instance_variable_set(:@pool, pool)
res = http.get(uri).tap { q.pop }
rescue StandardError => e
res = e
end
end
res
end
end

not_after = Process.clock_gettime(Process::CLOCK_MONOTONIC) + 3
until (now = Process.clock_gettime(Process::CLOCK_MONOTONIC)) > not_after || q.num_waiting == 2
ths.first(&:alive?).join(not_after - now)
end
sleep 1
3.times { q << :done }
ths.each(&:join)

results = ths.map(&:value)

assert(results.one?(ErrorResponse))
err_res = results.find { |r| r.is_a?(ErrorResponse) }
verify_error_response(err_res, PoolTimeoutError)
end

private

Expand Down
8 changes: 4 additions & 4 deletions test/proxy_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
require "httpx/plugins/proxy"

class ProxyTest < Minitest::Test
include HTTPHelpers
include HTTPX

def test_parameters_equality
Expand All @@ -29,10 +30,9 @@ def test_parameters_equality
end

def test_proxy_unsupported_scheme
ex = assert_raises(HTTPX::HTTPProxyError) do
HTTPX.plugin(:proxy).with_proxy(uri: "https://proxy:123").get("http://smth.com")
end
assert ex.message == "https: unsupported proxy protocol"
res = HTTPX.plugin(:proxy).with_proxy(uri: "https://proxy:123").get("http://smth.com")
verify_error_response(res, HTTPX::HTTPProxyError)
verify_error_response(res, "https: unsupported proxy protocol")
end

private
Expand Down
3 changes: 2 additions & 1 deletion test/support/requests/plugins/proxy.rb
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@ module Proxy
def test_plugin_no_proxy_defined
http = HTTPX.plugin(:proxy)
uri = build_uri("/get")
assert_raises(HTTPX::HTTPProxyError) { http.with_proxy(uri: []).get(uri) }
res = http.with_proxy(uri: []).get(uri)
verify_error_response(res, HTTPX::HTTPProxyError)
end

def test_plugin_http_http_proxy
Expand Down
2 changes: 1 addition & 1 deletion test/support/requests/resolvers.rb
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ def decode_response_body(_response)
response = session.get(uri, resolver_class: resolver_type, resolver_options: options.merge(resolver_opts))
verify_status(response, 200)

resolver = session.pool.resolver
resolver = session.resolver
assert resolver.instance_variable_get(:@ns_index) == 1
end
end
Expand Down
23 changes: 10 additions & 13 deletions test/support/session_with_pool.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,20 +2,7 @@

module SessionWithPool
module PoolMethods
include HTTPX

attr_reader :resolvers

def resolver
resolver_type = @options.resolver_class
resolver_type = Resolver.resolver_for(resolver_type)

resolver = @resolvers[resolver_type].first

resolver = resolver.resolvers[0] if resolver.is_a?(Resolver::Multi)

resolver
end
end

module InstanceMethods
Expand All @@ -29,6 +16,16 @@ def initialize(*)
super
end

def resolver
resolver_type = HTTPX::Resolver.resolver_for(@options.resolver_class)

resolver = @pool.resolvers[resolver_type].first

resolver = resolver.resolvers[0] if resolver.is_a?(HTTPX::Resolver::Multi)

resolver
end

private

def do_init_connection(connection, *)
Expand Down

0 comments on commit 1f9dcfb

Please sign in to comment.