Skip to content

Commit

Permalink
(reactor) improve thread pool
Browse files Browse the repository at this point in the history
  • Loading branch information
stakach committed Aug 8, 2018
1 parent d3523d0 commit 719e852
Showing 1 changed file with 24 additions and 17 deletions.
41 changes: 24 additions & 17 deletions lib/libuv/reactor.rb
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,20 @@ class Reactor
extend Accessors


LIBUV_MIN_POOL = ENV['LIBUV_MIN_POOL'] || 8
LIBUV_MAX_POOL = ENV['LIBUV_MAX_POOL'] || 40
LIBUV_MAX_QUEUE = ENV['LIBUV_MAX_QUEUE'] || 50000
THREAD_POOL = ::Concurrent::ThreadPoolExecutor.new(
min_threads: LIBUV_MIN_POOL,
max_threads: LIBUV_MAX_POOL,
max_queue: LIBUV_MAX_QUEUE
)
CRITICAL = ::Mutex.new
THREAD_POOL = ::Concurrent::FixedThreadPool.new(8)


module ClassMethods
# Get default reactor
#
#
# @return [::Libuv::Reactor]
def default
return @default unless @default.nil?
Expand All @@ -24,7 +31,7 @@ def default
end

# Create new Libuv reactor
#
#
# @return [::Libuv::Reactor]
def new(&blk)
thread = create(::Libuv::Ext.loop_new)
Expand All @@ -37,7 +44,7 @@ def new(&blk)
end

# Build a Ruby Libuv reactor from an existing reactor pointer
#
#
# @return [::Libuv::Reactor]
def create(pointer)
allocate.tap { |i| i.send(:initialize, pointer) }
Expand Down Expand Up @@ -234,7 +241,7 @@ def notifier
self
end

# Creates a deferred result object for where the result of an operation may only be returned
# Creates a deferred result object for where the result of an operation may only be returned
# at some point in the future or is being processed on a different thread (thread safe)
#
# @return [::Libuv::Q::Deferred]
Expand Down Expand Up @@ -283,15 +290,15 @@ def reject(reason)
end

# forces reactor time update, useful for getting more granular times
#
#
# @return nil
def update_time
::Libuv::Ext.update_time(@pointer)
self
end

# Get current time in milliseconds
#
#
# @return [Integer]
def now
::Libuv::Ext.now(@pointer)
Expand Down Expand Up @@ -326,7 +333,7 @@ def sleep(msecs)
end

# Get a new TCP instance
#
#
# @return [::Libuv::TCP]
def tcp(**opts, &callback)
TCP.new(@reactor, progress: callback, **opts)
Expand All @@ -340,7 +347,7 @@ def udp(**opts, &callback)
end

# Get a new TTY instance
#
#
# @param fileno [Integer] Integer file descriptor of a tty device
# @param readable [true, false] Boolean indicating if TTY is readable
# @return [::Libuv::TTY]
Expand All @@ -351,15 +358,15 @@ def tty(fileno, readable = false)
end

# Get a new Pipe instance
#
#
# @param ipc [true, false] indicate if a handle will be used for ipc, useful for sharing tcp socket between processes
# @return [::Libuv::Pipe]
def pipe(ipc = false)
Pipe.new(@reactor, ipc)
end

# Get a new timer instance
#
#
# @param callback [Proc] the callback to be called on timer trigger
# @return [::Libuv::Timer]
def timer
Expand All @@ -369,7 +376,7 @@ def timer
end

# Get a new Prepare handle
#
#
# @return [::Libuv::Prepare]
def prepare
handle = Prepare.new(@reactor)
Expand All @@ -378,7 +385,7 @@ def prepare
end

# Get a new Check handle
#
#
# @return [::Libuv::Check]
def check
handle = Check.new(@reactor)
Expand All @@ -387,7 +394,7 @@ def check
end

# Get a new Idle handle
#
#
# @param callback [Proc] the callback to be called on idle trigger
# @return [::Libuv::Idle]
def idle
Expand All @@ -397,7 +404,7 @@ def idle
end

# Get a new Async handle
#
#
# @return [::Libuv::Async]
def async
handle = Async.new(@reactor)
Expand All @@ -406,7 +413,7 @@ def async
end

# Get a new signal handler
#
#
# @return [::Libuv::Signal]
def signal(signum = nil)
handle = Signal.new(@reactor)
Expand Down Expand Up @@ -458,7 +465,7 @@ def lookup(hostname, hint = :IPv4, port = 9, wait: true)
end

# Get a new FSEvent instance
#
#
# @param path [String] the path to the file or folder for watching
# @return [::Libuv::FSEvent]
# @raise [ArgumentError] if path is not a string
Expand Down

0 comments on commit 719e852

Please sign in to comment.