
Task hangs when another exception is raised after options.waiter is cancelled #634

Open
grvstick opened this issue Feb 23, 2024 · 3 comments

Comments

@grvstick

AsyncSSH 2.14.1
Linux Mint 21.3 (Ubuntu Jammy)
Python 3.12.0

The issue can be reproduced with the following code:

import asyncio
from traceback import print_exc
from typing import TypedDict

import asyncssh



ops_config = {
    # config dict
}


class SshCred(TypedDict):
    host: str
    username: str
    password: str

class Config(TypedDict):
    ssh: SshCred

QUERY = "Some Query"


class ErrorCase:
    def __init__(
        self, config: Config
    ) -> None:
        cfg: Config = config

        self._ssh_cred: SshCred = cfg["ssh"]
        self.task_q: asyncio.Queue[tuple[str, asyncio.Queue]] = asyncio.Queue()
        self.terminate = asyncio.Event()

    async def query_task(self):
        listener: None | asyncssh.SSHListener
        try:
            async with asyncssh.connect(**self._ssh_cred) as conn:
                listener = await conn.forward_local_port("", 3306, "localhost", 3306)
                raise ZeroDivisionError
        finally:
            if listener is not None:
                listener.close()
                await listener.wait_closed()

    async def query(self, query: str):
        loop = asyncio.get_running_loop()
        resp_q = asyncio.Queue()

        await self.task_q.put((query, resp_q))

        try:
            async with asyncio.timeout(5) as cm:
                await self.task_q.join()
                cm.reschedule(loop.time() + 1)
                return await resp_q.get()
        except TimeoutError:
            return

    async def main_task(self):
        while not self.terminate.is_set():
            try:
                await self.query_task()
            except ZeroDivisionError:
                pass
            except Exception:
                print_exc()
            finally:
                await asyncio.sleep(3)


async def main():
    db = ErrorCase(ops_config)
    task = asyncio.create_task(db.main_task())
    for _ in range(3):
        print(await db.query(QUERY))

    # db.terminate.set()
    task.cancel()
    await task

asyncio.run(main())

Running this code produces the traceback below. The traceback itself is not surprising, but the program then hangs indefinitely.

None
None
None
Traceback (most recent call last):
  File "/home/spencer/repos/local/python/ops_db/error_reproduce.py", line 42, in query_task
    async with asyncssh.connect(**self._ssh_cred) as conn:
  File "/home/spencer/.pyenv/versions/3.12.0/envs/test-12/lib/python3.12/site-packages/asyncssh/misc.py", line 274, in __aenter__
    self._coro_result = await self._coro
                        ^^^^^^^^^^^^^^^^
  File "/home/spencer/.pyenv/versions/3.12.0/envs/test-12/lib/python3.12/site-packages/asyncssh/connection.py", line 8231, in connect
    return await asyncio.wait_for(
           ^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/spencer/.pyenv/versions/3.12.0/lib/python3.12/asyncio/tasks.py", line 510, in wait_for
    return await fut
           ^^^^^^^^^
  File "/home/spencer/.pyenv/versions/3.12.0/envs/test-12/lib/python3.12/site-packages/asyncssh/connection.py", line 441, in _connect
    await options.waiter
asyncio.exceptions.CancelledError

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/spencer/repos/local/python/ops_db/error_reproduce.py", line 67, in main_task
    await self.query_task()
  File "/home/spencer/repos/local/python/ops_db/error_reproduce.py", line 46, in query_task
    if listener is not None:
       ^^^^^^^^
UnboundLocalError: cannot access local variable 'listener' where it is not associated with a value
^CTraceback (most recent call last):
  File "/home/spencer/repos/local/python/ops_db/error_reproduce.py", line 42, in query_task
    async with asyncssh.connect(**self._ssh_cred) as conn:
  File "/home/spencer/.pyenv/versions/3.12.0/envs/test-12/lib/python3.12/site-packages/asyncssh/misc.py", line 274, in __aenter__
    self._coro_result = await self._coro
                        ^^^^^^^^^^^^^^^^
  File "/home/spencer/.pyenv/versions/3.12.0/envs/test-12/lib/python3.12/site-packages/asyncssh/connection.py", line 8231, in connect
    return await asyncio.wait_for(
           ^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/spencer/.pyenv/versions/3.12.0/lib/python3.12/asyncio/tasks.py", line 510, in wait_for
    return await fut
           ^^^^^^^^^
  File "/home/spencer/.pyenv/versions/3.12.0/envs/test-12/lib/python3.12/site-packages/asyncssh/connection.py", line 441, in _connect
    await options.waiter
asyncio.exceptions.CancelledError

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/spencer/repos/local/python/ops_db/error_reproduce.py", line 67, in main_task
    await self.query_task()
  File "/home/spencer/repos/local/python/ops_db/error_reproduce.py", line 46, in query_task
    if listener is not None:
       ^^^^^^^^
UnboundLocalError: cannot access local variable 'listener' where it is not associated with a value

When I ran it under the debugger, main_task had not been cancelled and was still running its loop.

@ronf
Owner

ronf commented Feb 23, 2024

I'm not sure this is an AsyncSSH problem.

Even though you are canceling main_task(), you have a catch-all "except Exception" in there, which I think will end up capturing the CancelledError from your task.cancel(), and that will prevent the task from being cancelled. Instead, it will go back up and check if self.terminate is set, but it won't be since you have that commented out.

As for the secondary error, that's because you are successfully cancelling the asyncssh.connect() call in query_task, and that causes it to attempt to run the "finally" block which references listener, but listener is only assigned in the case where the asyncssh.connect() succeeds. You should probably move the "try" to be after listener is assigned if you want to refer to it in the "finally" block, or you should assign its initial value to be None prior to the "try".
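A minimal sketch of the second suggestion (binding listener to None before the "try"), written against plain asyncio rather than AsyncSSH: fake_connect() and FakeListener are hypothetical stand-ins for asyncssh.connect() plus forward_local_port(). Because listener is always bound, the "finally" block no longer raises when the connection attempt itself is cancelled, so the CancelledError propagates unchanged and the task ends as cancelled.

```python
import asyncio


class FakeListener:
    """Hypothetical stand-in for the listener returned by forward_local_port()."""

    def __init__(self) -> None:
        self.closed = False

    def close(self) -> None:
        self.closed = True

    async def wait_closed(self) -> None:
        await asyncio.sleep(0)


async def fake_connect(delay: float) -> FakeListener:
    """Hypothetical stand-in for a slow asyncssh.connect() + port forward."""
    await asyncio.sleep(delay)
    return FakeListener()


async def query_task(delay: float) -> None:
    listener: FakeListener | None = None  # bound before the try, so finally can always read it
    try:
        listener = await fake_connect(delay)
        raise ZeroDivisionError
    finally:
        if listener is not None:
            listener.close()
            await listener.wait_closed()


async def demo() -> bool:
    task = asyncio.create_task(query_task(delay=10))
    await asyncio.sleep(0.1)  # let the task start "connecting"
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass
    return task.cancelled()


print(asyncio.run(demo()))  # True
```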

I haven't tested it, but adding an except CancelledError clause to main_task that re-raises the error, so that the except Exception clause doesn't capture it, might fix the problem. Alternatively, add a raise after print_exc() in the except Exception block, or explicitly break out of the loop when you see a CancelledError.

@grvstick
Author

grvstick commented Feb 23, 2024

Thanks for the swift reply. Here are a few comments:

> Even though you are canceling main_task(), you have a catch-all "except Exception" in there, which I think will end up capturing the CancelledError from your task.cancel(), and that will prevent the task from being cancelled. Instead, it will go back up and check if self.terminate is set, but it won't be since you have that commented out.

As far as I know, that was the behavior in earlier Python versions; since Python 3.8, asyncio.CancelledError is a subclass of BaseException rather than Exception. Moreover, changing main_task as follows does not change the behavior:

    async def main_task(self):
        while not self.terminate.is_set():
            try:
                await self.query_task()
            except ZeroDivisionError:
                pass
            except asyncio.CancelledError:
                raise
            except Exception:
                print_exc()
            finally:
                await asyncio.sleep(3)
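The hierarchy change is easy to confirm. A small self-contained check (worker() and demo() are hypothetical helpers, not part of the report):

```python
import asyncio

# Since Python 3.8, CancelledError is a BaseException but not an Exception.
print(issubclass(asyncio.CancelledError, BaseException))  # True
print(issubclass(asyncio.CancelledError, Exception))      # False


async def worker(log: list[str]) -> None:
    try:
        await asyncio.sleep(10)
    except Exception:
        log.append("caught by except Exception")  # not reached on cancellation
        raise


async def demo() -> tuple[bool, list[str]]:
    log: list[str] = []
    task = asyncio.create_task(worker(log))
    await asyncio.sleep(0.1)
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass
    return task.cancelled(), log


print(asyncio.run(demo()))  # (True, [])
```

Since a clean cancellation already bypasses except Exception, the extra "except asyncio.CancelledError: raise" clause is redundant, which is consistent with it changing nothing; whatever main_task catches after task.cancel() is presumably not a CancelledError.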

> As for the secondary error, that's because you are successfully cancelling the asyncssh.connect() call in query_task, and that causes it to attempt to run the "finally" block which references listener, but listener is only assigned in the case where the asyncssh.connect() succeeds. You should probably move the "try" to be after listener is assigned if you want to refer to it in the "finally" block, or you should assign its initial value to be None prior to the "try".

Omitting the listener initialization was my mistake from the beginning, and I have fixed it in the code I actually use. I left it in the example because it reproduces the problem. Even with the unbound-local-variable mistake, I would expect the task to be cancelled when cancellation is requested.
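A plausible explanation for why the task keeps running, sketched with plain asyncio and hypothetical stand-ins rather than AsyncSSH: when query_task() is cancelled inside the connect call, its "finally" block raises UnboundLocalError. An exception raised while another one is propagating replaces it, and the CancelledError survives only as __context__ (the "During handling of the above exception" line in the traceback). main_task therefore sees an ordinary UnboundLocalError, its except Exception clause handles it, and because task.cancel() delivers a single CancelledError, the loop simply continues.

```python
import asyncio


async def query_task() -> None:
    """Hypothetical stand-in that mirrors the reported bug without AsyncSSH."""
    try:
        await asyncio.sleep(10)  # stands in for asyncssh.connect()
        listener = object()
    finally:
        # Raises UnboundLocalError if the sleep was cancelled before listener was bound.
        if listener is not None:
            pass


async def main_task(stop: asyncio.Event, seen: list[BaseException]) -> None:
    while not stop.is_set():  # like "while not self.terminate.is_set()"
        try:
            await query_task()
        except Exception as exc:
            seen.append(exc)  # the replaced cancellation lands here


async def demo() -> tuple[bool, str, bool]:
    stop = asyncio.Event()
    seen: list[BaseException] = []
    task = asyncio.create_task(main_task(stop, seen))
    await asyncio.sleep(0.1)
    task.cancel()
    await asyncio.sleep(0.1)
    still_running = not task.done()  # the first cancel() was swallowed
    stop.set()  # the equivalent of db.terminate.set()
    task.cancel()  # interrupt the next sleep so the loop can see the stop flag
    await task
    first = seen[0]
    return (
        still_running,
        type(first).__name__,
        isinstance(first.__context__, asyncio.CancelledError),
    )


print(asyncio.run(demo()))  # (True, 'UnboundLocalError', True)
```

In this sketch the first task.cancel() leaves the task running; only setting the stop flag (the equivalent of the commented-out db.terminate.set()) lets a second cancel end it. Binding listener before the "try" removes the replacement at its source.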

@ronf
Owner

ronf commented Jul 3, 2024

Sorry, it looks like I missed your last post here. Were you able to resolve this issue?
