From 719e85231fa33d9e94025f364a4a5dc62ed08844 Mon Sep 17 00:00:00 2001 From: Stephen von Takach Date: Thu, 9 Aug 2018 09:13:49 +1000 Subject: [PATCH] (reactor) improve thread pool --- lib/libuv/reactor.rb | 41 ++++++++++++++++++++++++----------------- 1 file changed, 24 insertions(+), 17 deletions(-) diff --git a/lib/libuv/reactor.rb b/lib/libuv/reactor.rb index db5d76e..55f412d 100644 --- a/lib/libuv/reactor.rb +++ b/lib/libuv/reactor.rb @@ -8,13 +8,20 @@ class Reactor extend Accessors + LIBUV_MIN_POOL = ENV['LIBUV_MIN_POOL'] || 8 + LIBUV_MAX_POOL = ENV['LIBUV_MAX_POOL'] || 40 + LIBUV_MAX_QUEUE = ENV['LIBUV_MAX_QUEUE'] || 50000 + THREAD_POOL = ::Concurrent::ThreadPoolExecutor.new( + min_threads: LIBUV_MIN_POOL, + max_threads: LIBUV_MAX_POOL, + max_queue: LIBUV_MAX_QUEUE + ) CRITICAL = ::Mutex.new - THREAD_POOL = ::Concurrent::FixedThreadPool.new(8) module ClassMethods # Get default reactor - # + # # @return [::Libuv::Reactor] def default return @default unless @default.nil? @@ -24,7 +31,7 @@ def default end # Create new Libuv reactor - # + # # @return [::Libuv::Reactor] def new(&blk) thread = create(::Libuv::Ext.loop_new) @@ -37,7 +44,7 @@ def new(&blk) end # Build a Ruby Libuv reactor from an existing reactor pointer - # + # # @return [::Libuv::Reactor] def create(pointer) allocate.tap { |i| i.send(:initialize, pointer) } @@ -234,7 +241,7 @@ def notifier self end - # Creates a deferred result object for where the result of an operation may only be returned + # Creates a deferred result object for where the result of an operation may only be returned # at some point in the future or is being processed on a different thread (thread safe) # # @return [::Libuv::Q::Deferred] @@ -283,7 +290,7 @@ def reject(reason) end # forces reactor time update, useful for getting more granular times - # + # # @return nil def update_time ::Libuv::Ext.update_time(@pointer) @@ -291,7 +298,7 @@ def update_time end # Get current time in milliseconds - # + # # @return [Integer] def now ::Libuv::Ext.now(@pointer) @@ -326,7 +333,7 @@ def sleep(msecs) end # Get a new TCP instance - # + # # @return [::Libuv::TCP] def tcp(**opts, &callback) TCP.new(@reactor, progress: callback, **opts) @@ -340,7 +347,7 @@ def udp(**opts, &callback) end # Get a new TTY instance - # + # # @param fileno [Integer] Integer file descriptor of a tty device # @param readable [true, false] Boolean indicating if TTY is readable # @return [::Libuv::TTY] @@ -351,7 +358,7 @@ def tty(fileno, readable = false) end # Get a new Pipe instance - # + # # @param ipc [true, false] indicate if a handle will be used for ipc, useful for sharing tcp socket between processes # @return [::Libuv::Pipe] def pipe(ipc = false) @@ -359,7 +366,7 @@ def pipe(ipc = false) end # Get a new timer instance - # + # # @param callback [Proc] the callback to be called on timer trigger # @return [::Libuv::Timer] def timer @@ -369,7 +376,7 @@ def timer end # Get a new Prepare handle - # + # # @return [::Libuv::Prepare] def prepare handle = Prepare.new(@reactor) @@ -378,7 +385,7 @@ def prepare end # Get a new Check handle - # + # # @return [::Libuv::Check] def check handle = Check.new(@reactor) @@ -387,7 +394,7 @@ def check end # Get a new Idle handle - # + # # @param callback [Proc] the callback to be called on idle trigger # @return [::Libuv::Idle] def idle @@ -397,7 +404,7 @@ def idle end # Get a new Async handle - # + # # @return [::Libuv::Async] def async handle = Async.new(@reactor) @@ -406,7 +413,7 @@ def async end # Get a new signal handler - # + # # @return [::Libuv::Signal] def signal(signum = nil) handle = Signal.new(@reactor) @@ -458,7 +465,7 @@ def lookup(hostname, hint = :IPv4, port = 9, wait: true) end # Get a new FSEvent instance - # + # # @param path [String] the path to the file or folder for watching # @return [::Libuv::FSEvent] # @raise [ArgumentError] if path is not a string