diff --git a/Project.toml b/Project.toml index 1735553..073dd3e 100644 --- a/Project.toml +++ b/Project.toml @@ -1,7 +1,7 @@ name = "ConcurrentUtilities" uuid = "f0e56b4a-5159-44fe-b623-3e5288b988bb" authors = ["Jacob Quinn "] -version = "2.3.1" +version = "2.4.0" [deps] Serialization = "9e88b42a-f829-5b0c-bbe9-9e923198166b" diff --git a/src/ConcurrentUtilities.jl b/src/ConcurrentUtilities.jl index 6d18413..80ecad0 100644 --- a/src/ConcurrentUtilities.jl +++ b/src/ConcurrentUtilities.jl @@ -4,6 +4,14 @@ export Lockable, OrderedSynchronizer, reset!, ReadWriteLock, readlock, readunloc Workers, remote_eval, remote_fetch, Worker, terminate!, WorkerTerminatedException, Pool, acquire, release, drain!, try_with_timeout, TimeoutException +macro samethreadpool_spawn(expr) + if isdefined(Base.Threads, :threadpool) + esc(:(Threads.@spawn Threads.threadpool() $expr)) + else + esc(:(Threads.@spawn $expr)) + end +end + include("try_with_timeout.jl") include("workers.jl") using .Workers diff --git a/src/try_with_timeout.jl b/src/try_with_timeout.jl index b742399..1c4f6f8 100644 --- a/src/try_with_timeout.jl +++ b/src/try_with_timeout.jl @@ -77,7 +77,7 @@ function try_with_timeout(f, timeout, ::Type{T}=Any) where {T} ch = Channel{T}(0) x = TimedOut(ch) timer = Timer(tm -> !isready(ch) && close(ch, TimeoutException(timeout)), timeout) - Threads.@spawn begin + @samethreadpool_spawn begin try put!(ch, $f(x)::T) catch e diff --git a/test/try_with_timeout.jl b/test/try_with_timeout.jl index 083e8c9..079ccea 100644 --- a/test/try_with_timeout.jl +++ b/test/try_with_timeout.jl @@ -26,4 +26,11 @@ using ConcurrentUtilities, Test @test e isa CapturedException rethrow(e.ex) end + + # try_with_timeout should not migrate the task to a different thread pool + if isdefined(Base.Threads, :threadpool) + @test try_with_timeout(_ -> Threads.threadpool(), 1) == Threads.threadpool() + @test read(`julia -t 1,1 -E 'using ConcurrentUtilities; try_with_timeout(_ -> Threads.threadpool(), 1)'`, String) == ":interactive\n" + @test read(`julia -t 1,1 -E 'using ConcurrentUtilities; fetch(Threads.@spawn begin try_with_timeout(_ -> Threads.threadpool(), 1) end)'`, String) == ":default\n" + end end