
Move "fit" ProcessPool out of module top-level #647

Open
tncowart wants to merge 5 commits into main

Conversation


@tncowart commented Oct 6, 2023

Some background: I'm using pystan in some multi-process code. PyStan works, but my script was hanging at the end. I found that httpstan was creating a bunch of worker processes and never terminating them, which kept the pystan processes from exiting, which in turn kept my main process from exiting. I traced the issue to the code in services_stub.py that creates a ProcessPool but never shuts the pool down.

Creating the ProcessPoolExecutor at the top level of services_stub.py is a problem because the pool can never be shut down properly. This can break code that uses the httpstan library: since the worker processes are never shut down, the calling process may hang on exit while waiting for all of its child processes to exit.

I think this change solves the issue that creating the pool at the module top-level was meant to solve, but does so in a way that allows the process pool to complete its lifecycle, thus playing well with others.

If the pool is never created, we don't want to create one just to shut it down.
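
Roughly, the shape of the change looks like the sketch below. This is a simplified illustration, not the exact diff in this PR; get_executor and shutdown_executor are placeholder names.

import concurrent.futures
from typing import Optional

import aiohttp.web

_executor: Optional[concurrent.futures.ProcessPoolExecutor] = None


def get_executor() -> concurrent.futures.ProcessPoolExecutor:
    # Create the pool on first use instead of at module import time.
    global _executor
    if _executor is None:
        _executor = concurrent.futures.ProcessPoolExecutor()
    return _executor


async def shutdown_executor(app: aiohttp.web.Application) -> None:
    # Only shut down a pool that was actually created; if it was never
    # created, there is nothing to do.
    global _executor
    if _executor is not None:
        _executor.shutdown(wait=True)
        _executor = None


def make_app() -> aiohttp.web.Application:
    app = aiohttp.web.Application()
    # Shutting the pool down on app cleanup lets it complete its lifecycle,
    # so no worker processes outlive the server.
    app.on_cleanup.append(shutdown_executor)
    return app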
@tncowart marked this pull request as ready for review October 6, 2023 19:21
@riddell-stan
Contributor

Thanks for the note and the code. This is an interesting change.

I'm cautious about changing anything involving multiprocessing because it's a platform-compatibility minefield and we know the current setup works (see e4b2a09 for one non-obvious bugfix involving the code you're changing).

I'd like to understand the issue better before committing to a change. In my experience with pystan on Linux, you could always abort the main process quickly with a keyboard interrupt; if other processes were still running, they didn't interfere with aborting the main one. Could you give me a fuller description of the problem, or help me understand why Python isn't cleaning up properly?

@tncowart
Author

tncowart commented Oct 6, 2023

What I'm trying to do in the application is build a Stan model and then draw many sets of samples, each with different "data" parameters for the model. Since each sample set is independent, this can be parallelized across multiple processes.

The code below, adapted from the pystan documentation, illustrates what I'm doing. With httpstan 4.10.1 (the version pystan currently uses; I don't think anything in httpstan 4.11 would fix this either) the script just hangs after printing "done!". You can ctrl-c a bunch to get out of it, but even then there will be dozens of "zombie" processes left that you have to kill somehow. These are the processes the httpstan process pool creates but never shuts down.

My changes shut down the pools properly when the httpstan aiohttp app shuts down, leaving no zombie processes, allowing the parent pystan processes to exit, and therefore allowing the main Python process to exit.

import asyncio
import concurrent.futures
import multiprocessing as mp
from random import randint

import httpstan
import stan

schools_code = """
data {
  int<lower=0> J;         // number of schools
  array[J] real y;              // estimated treatment effects
  array[J] real<lower=0> sigma; // standard error of effect estimates
}
parameters {
  real mu;                // population treatment effect
  real<lower=0> tau;      // standard deviation in treatment effects
  vector[J] eta;          // unscaled deviation from mu by school
}
transformed parameters {
  vector[J] theta = mu + tau * eta;        // school treatment effects
}
model {
  target += normal_lpdf(eta | 0, 1);       // prior log-density
  target += normal_lpdf(y | theta, sigma); // log-likelihood
}
"""


async def build_model():
    model_name = httpstan.models.calculate_model_name(schools_code)
    async with stan.common.HttpstanClient() as client:
        response = await client.get("/models")
        model_names = {model["name"] for model in response.json()["models"]}

        if model_name in model_names:
            print("Model is cached")
        else:
            # Pre-build the model so it gets cached and doesn't get built repeatedly in each subprocess
            print("Building Model...")
            await client.post("/models", json={"program_code": schools_code})


def gen_data():
    data = []
    for _ in range(20):
        j = randint(1, 10)
        y = [randint(-10, 10) for _ in range(j)]
        sigma = [randint(1, 10) for _ in range(j)]
        data.append((j, y, sigma))

    return data


def run_stan(stan_args):
    J, y, sigma = stan_args
    schools_data = {"J": J, "y": y, "sigma": sigma}
    posterior = stan.build(schools_code, data=schools_data)
    fit = posterior.sample(num_chains=4, num_samples=1000)
    return fit


async def run():
    await build_model()
    stan_params = gen_data()
    results = []
    executor = concurrent.futures.ProcessPoolExecutor(max_workers=mp.cpu_count())
    for samples in executor.map(run_stan, stan_params):
        results.append(samples)
    executor.shutdown(wait=False, cancel_futures=True)


if __name__ == "__main__":
    asyncio.run(run())

    print("done!")

@riddell-stan
Contributor

Thanks for the illustration. I'm still not entirely sure I understand why this isn't a problem with Python itself. Surely at some point Python is calling executor.shutdown(wait=False, cancel_futures=True), right?

Second quick query -- can't one just use a simpler change, something roughly like:

async def shutdown():
    httpstan.services_stub.executor.shutdown(...)

app.on_cleanup.append(shutdown)

@tncowart
Author

Thanks for the illustration. I'm still not entirely sure I understand why this isn't a problem with Python itself. Surely at some point Python is calling executor.shutdown(wait=False, cancel_futures=True), right?

It's not. Python doesn't know when you're done with an executor unless you use it as a context manager, which isn't the case here, so the executors have to be shut down explicitly.
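
To illustrate, here's a minimal standalone example (not httpstan code):

import concurrent.futures


def square(x: int) -> int:
    return x * x


if __name__ == "__main__":
    # Without a context manager, nothing shuts the pool down for you;
    # the worker processes stay alive until shutdown() is called explicitly.
    executor = concurrent.futures.ProcessPoolExecutor(max_workers=2)
    print(list(executor.map(square, range(4))))
    executor.shutdown(wait=True)

    # With a context manager, __exit__ calls shutdown(wait=True) automatically.
    with concurrent.futures.ProcessPoolExecutor(max_workers=2) as pool:
        print(list(pool.map(square, range(4))))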

Second quick query -- can't one just use a simpler change, something roughly like:

async def shutdown():
    httpstan.services_stub.executor.shutdown(...)

app.on_cleanup.append(shutdown)

I just tried making only the following change to app.py, using the example script from above:

diff --git a/httpstan/app.py b/httpstan/app.py
index 65bdff4e..05cc84a5 100644
--- a/httpstan/app.py
+++ b/httpstan/app.py
@@ -8,6 +8,7 @@ import logging
 import aiohttp.web

 import httpstan.routes
+import httpstan.services_stub

 try:
     from uvloop import EventLoopPolicy
@@ -29,6 +30,10 @@ async def _warn_unfinished_operations(app: aiohttp.web.Application) -> None:
             logger.critical(f"Operation `{name}` cancelled before finishing.")


+async def shutdown_pool(app: aiohttp.web.Application):
+    httpstan.services_stub.executor.shutdown()
+
+
 def make_app() -> aiohttp.web.Application:
     """Assemble aiohttp Application.

@@ -42,4 +47,5 @@ def make_app() -> aiohttp.web.Application:
     # startup and shutdown tasks
     app["operations"] = {}
     app.on_cleanup.append(_warn_unfinished_operations)
+    app.on_cleanup.append(shutdown_pool)
     return app

And it doesn't work: the script no longer hangs, but the sampling never happens and there are a bunch of crash logs. As far as I can tell, the module-level executor is shared by every httpstan app created in the same process, so once one app's cleanup shuts it down, later requests fail with "cannot schedule new futures after shutdown":

concurrent.futures.process._RemoteTraceback:
"""
Traceback (most recent call last):
  File "/Library/Frameworks/Python.framework/Versions/3.11/lib/python3.11/concurrent/futures/process.py", line 262, in _process_worker
    r = call_item.fn(*call_item.args, **call_item.kwargs)
        ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Library/Frameworks/Python.framework/Versions/3.11/lib/python3.11/concurrent/futures/process.py", line 214, in _process_chunk
    return [fn(*args) for args in chunk]
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Library/Frameworks/Python.framework/Versions/3.11/lib/python3.11/concurrent/futures/process.py", line 214, in <listcomp>
    return [fn(*args) for args in chunk]
            ^^^^^^^^^
  File "/Users/tc325/Documents/CLEANSER1.0/httpstan_test.py", line 60, in run_stan
    fit = posterior.sample(num_chains=4, num_samples=1000)
          ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/tc325/Documents/CLEANSER1.0/venv/lib/python3.11/site-packages/stan/model.py", line 89, in sample
    return self.hmc_nuts_diag_e_adapt(num_chains=num_chains, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/tc325/Documents/CLEANSER1.0/venv/lib/python3.11/site-packages/stan/model.py", line 108, in hmc_nuts_diag_e_adapt
    return self._create_fit(function=function, num_chains=num_chains, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/tc325/Documents/CLEANSER1.0/venv/lib/python3.11/site-packages/stan/model.py", line 313, in _create_fit
    return asyncio.run(go())
           ^^^^^^^^^^^^^^^^^
  File "/Library/Frameworks/Python.framework/Versions/3.11/lib/python3.11/asyncio/runners.py", line 190, in run
    return runner.run(main)
           ^^^^^^^^^^^^^^^^
  File "/Library/Frameworks/Python.framework/Versions/3.11/lib/python3.11/asyncio/runners.py", line 118, in run
    return self._loop.run_until_complete(task)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Library/Frameworks/Python.framework/Versions/3.11/lib/python3.11/asyncio/base_events.py", line 650, in run_until_complete
    return future.result()
           ^^^^^^^^^^^^^^^
  File "/Users/tc325/Documents/CLEANSER1.0/venv/lib/python3.11/site-packages/stan/model.py", line 236, in go
    raise RuntimeError(message)
RuntimeError: Exception during call to services function: `RuntimeError('cannot schedule new futures after shutdown')`, traceback: `['  File "/Users/tc325/Documents/httpstan/httpstan/services_stub.py", line 112, in call\n    future = asyncio.get_running_loop().run_in_executor(executor, lazy_function_wrapper_partial)  # type: ignore\n             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^\n', '  File "/Library/Frameworks/Python.framework/Versions/3.11/lib/python3.11/asyncio/base_events.py", line 826, in run_in_executor\n    executor.submit(func, *args), loop=self)\n    ^^^^^^^^^^^^^^^^^^^^^^^^^^^^\n', '  File "/Library/Frameworks/Python.framework/Versions/3.11/lib/python3.11/concurrent/futures/process.py", line 754, in submit\n    raise RuntimeError("cannot schedule new futures after shutdown")\n']`
"""

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/Users/tc325/Documents/CLEANSER1.0/httpstan_test.py", line 78, in <module>
    asyncio.run(run())
  File "/Library/Frameworks/Python.framework/Versions/3.11/lib/python3.11/asyncio/runners.py", line 190, in run
    return runner.run(main)
           ^^^^^^^^^^^^^^^^
  File "/Library/Frameworks/Python.framework/Versions/3.11/lib/python3.11/asyncio/runners.py", line 118, in run
    return self._loop.run_until_complete(task)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Library/Frameworks/Python.framework/Versions/3.11/lib/python3.11/asyncio/base_events.py", line 650, in run_until_complete
    return future.result()
           ^^^^^^^^^^^^^^^
  File "/Users/tc325/Documents/CLEANSER1.0/httpstan_test.py", line 72, in run
    for samples in executor.map(run_stan, stan_params):
  File "/Library/Frameworks/Python.framework/Versions/3.11/lib/python3.11/concurrent/futures/process.py", line 588, in _chain_from_iterable_of_lists
    for element in iterable:
  File "/Library/Frameworks/Python.framework/Versions/3.11/lib/python3.11/concurrent/futures/_base.py", line 618, in result_iterator
    yield _result_or_cancel(fs.pop())
          ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Library/Frameworks/Python.framework/Versions/3.11/lib/python3.11/concurrent/futures/_base.py", line 318, in _result_or_cancel
    return fut.result(timeout)
           ^^^^^^^^^^^^^^^^^^^
  File "/Library/Frameworks/Python.framework/Versions/3.11/lib/python3.11/concurrent/futures/_base.py", line 456, in result
    return self.__get_result()
           ^^^^^^^^^^^^^^^^^^^
  File "/Library/Frameworks/Python.framework/Versions/3.11/lib/python3.11/concurrent/futures/_base.py", line 401, in __get_result
    raise self._exception
RuntimeError: Exception during call to services function: `RuntimeError('cannot schedule new futures after shutdown')`, traceback: `['  File "/Users/tc325/Documents/httpstan/httpstan/services_stub.py", line 112, in call\n    future = asyncio.get_running_loop().run_in_executor(executor, lazy_function_wrapper_partial)  # type: ignore\n             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^\n', '  File "/Library/Frameworks/Python.framework/Versions/3.11/lib/python3.11/asyncio/base_events.py", line 826, in run_in_executor\n    executor.submit(func, *args), loop=self)\n    ^^^^^^^^^^^^^^^^^^^^^^^^^^^^\n', '  File "/Library/Frameworks/Python.framework/Versions/3.11/lib/python3.11/concurrent/futures/process.py", line 754, in submit\n    raise RuntimeError("cannot schedule new futures after shutdown")\n']`

@riddell-stan
Contributor

I'd like to give this attention when I have some bandwidth available. I'm wary of adding anything to services_stub -- it's already a bit of a pain to understand what's going on.

I'd like to return to this once I figure out whether we can build Apple Silicon wheels with the new GitHub runners. There are lots of people clamoring for them.

@riddell-stan
Contributor

Thanks again for submitting such a careful patch. Sorry for the delay in reviewing this.
