Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

RunTimeError: got Future <Future pending> attached to a different loop when using custom loop in sync fixtures when upgrading from 0.14.2 to 0.15.0 #1315

Open
2 tasks done
sevaho opened this issue Oct 17, 2021 · 29 comments
Labels
testclient TestClient-related

Comments

@sevaho
Copy link

sevaho commented Oct 17, 2021

Checklist

  • The bug is reproducible against the latest release and/or master.
  • There are no similar issues or pull requests to fix it yet.

Describe the bug

Upgrading starlette>=0.15.0 breaks current testing strategy. The setup is mocking a nats subscription by actually using the nats server.
The code works with starlette 0.14.2, upgradign to 0.15.0 gives RunTumeError got Future <Future pending> attached to a different loop . When upgrading to starlette 0.16.0 it gives TimeOut errors. I would love to keep tests sync.

To reproduce

requirements.txt

starlette
requests
pytest
asyncio-nats-client

code

from starlette.routing import Route
from starlette.testclient import TestClient
import pytest
from starlette.applications import Starlette
from starlette.responses import PlainTextResponse
import asyncio


from nats.aio.client import Client as NATS

"""
Test work with starlette 0.14.2
Error with starlette 0.15.0: RunTimeError: got Future <Future pending> attached to a different loop
Error with starlette 0.16.0: Nats timeout

The test is that the client code makes a nats request to a mocked nats service over nats itself.

Requirement a running nats server `docker run -d -p 4222:4222 nats:latest`
"""

HOST_NATS = "localhost:4222"


# =======================================================================
# CODE
# =======================================================================


def create_app():
    async def index(request):
        r = await request.app.state.nc.request("subject1", timeout=1, payload=b"PING")
        return PlainTextResponse(content=r.data.decode())

    async def setup() -> None:
        await app.state.nc.connect(HOST_NATS)
        print("Connected to nats")

    app = Starlette(debug=True, routes=[Route('/', index)], on_startup=[setup])
    app.state.nc: NATS = NATS()

    return app


app = create_app()


# =======================================================================
# MOCKS & TESTS
# =======================================================================


class NatsServiceMock:
    def __init__(self) -> None:
        self.nc: NATS = NATS()

    async def lifespan(self) -> None:
        await self.nc.connect(HOST_NATS)
        await self.nc.subscribe("subject1", cb=self.handle)

    async def handle(self, msg):
        await self.nc.publish(msg.reply, b"PONG")

    def __enter__(self):
        loop = asyncio.get_event_loop()
        loop.run_until_complete(self.lifespan())
        return self

    def __exit__(self, *args) -> None:
        pass


@pytest.fixture(scope="session")
def nats_service() -> None:
    with NatsServiceMock() as nc:
        yield nc


@pytest.fixture(scope="session")
def test_app(nats_service) -> None:
    with TestClient(create_app()) as client:
        yield client


# =======================================================================
# TESTS
# =======================================================================


def test_index_should_give_a_succesful_response(test_app):
    r = test_app.get("/")
    assert r.status_code == 200
    assert r.text == "PONG"

Run:

pytest <file>

Expected behavior

The test to work.

Actual behavior

Test does not work.

Debugging material

output running with starlette 0.15.0:

        try:
            # wait until the future completes or the timeout
            try:
>               await waiter
E               RuntimeError: Task <Task pending name='anyio.from_thread.BlockingPortal._call_func' coro=<BlockingPortal._call_func() running at /home/sevaho/.local/share/virtualenvs/testje-KTUsWEz0/lib/python3.8/site-packages/anyio/from_thread.py:177> cb=[TaskGroup._spawn.<locals>.task_done() at /home/sevaho/.local/share/virtualenvs/testje-KTUsWEz0/lib/python3.8/site-packages/anyio/_backends/_asyncio.py:622]> got Future <Future pending> attached to a different loop

/usr/lib/python3.8/asyncio/tasks.py:481: RuntimeError

output when running with starlette 0.16.0:

        # Wait for the response or give up on timeout.
        try:
            msg = await asyncio.wait_for(future, timeout, loop=self._loop)
            return msg
        except asyncio.TimeoutError:
            del self._resp_map[token.decode()]
            future.cancel()
>           raise ErrTimeout
E           nats.aio.errors.ErrTimeout: nats: Timeout

/home/sevaho/.local/share/virtualenvs/testje-KTUsWEz0/lib/python3.8/site-packages/nats/aio/client.py:945: ErrTimeout

Environment

  • OS: Linux
  • Python version: 3.8
  • Starlette version: 0.14.2 / 0.15.0 / 0.16.0

Additional context

Important

  • We're using Polar.sh so you can upvote and help fund this issue.
  • We receive the funding once the issue is completed & confirmed by you.
  • Thank you in advance for helping prioritize & fund our backlog.
Fund with Polar
@sevaho sevaho changed the title RunTimeError: got Future <Future pending> attached to a different loop when using custom loop in sync fixtures RunTimeError: got Future <Future pending> attached to a different loop when using custom loop in sync fixtures when upgrading from 0.14.2 to 0.15.0 Oct 17, 2021
@voglster
Copy link

voglster commented Nov 3, 2021

I get the same issue with FastAPI (based on starlette) and mongo motor client. Just downgraded to 14.2 and fixed the problem as well.

@Mityuha
Copy link

Mityuha commented Nov 10, 2021

I've got similar issue in tests with FastAPI (0.68.2 --> 0.69.0 which means Starlette upgrading from 0.14.2 to 0.15.0) and databases library (based on asyncpg):

...
[stacktrace here]
...
asyncpg/protocol/protocol.pyx:323: in query
    ???
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

>   ???
E   asyncpg.exceptions._base.InterfaceError: cannot perform operation: another operation is in progress

asyncpg/protocol/protocol.pyx:707: InterfaceError

With FastAPI 0.68.2 (Starlette 0.14.2) works like a charm.

@MatthewScholefield
Copy link

MatthewScholefield commented Nov 27, 2021

I think the problem is that TestClient switched to using AnyIO and in that switch now unconditionally creates its own event loop in TestClient.__enter__ via anyio.start_blocking_portal which means you can't run asyncio.get_event_loop() until you create the test client.

In my case it poses a serious problem because within the definition of my app, I instantiate AsyncIOMotorClient and some other things, one of which seems to attach to the current event loop.

@aminalaee
Copy link
Member

Shouldn't this be moved to the anyio repo?

@MatthewScholefield
Copy link

MatthewScholefield commented Nov 27, 2021

@sevaho Can you test if this version works without the error?

@aminalaee No, I don't think it's related to anyio at all; I believe the problem is related to a change in behavior caused by Starlette's switch to anyio.

Basically, I could be wrong but my initial understanding is:

  • Starlette used to do loop = asyncio.get_event_loop() in TestClient which would use a default event loop if one didn't exist or use an existing one if it did
  • When Starlette changed to AnyIO, TestClient now uses anyio.start_blocking_portal to get the event loop which means it will always create a new event loop

This means that if you create any objects that bind to the default event loop before calling TestClient.__enter__, you will probably get strange asyncio errors.

Does this sound like an accurate description of the situation? Asking because this is how I understand it, but I'm not 100% sure I understand anyio.start_blocking_portal because from the docs it doesn't explicitly indicate that it creates a new event loop.

@sevaho
Copy link
Author

sevaho commented Nov 29, 2021

@MatthewScholefield another error:

    @pytest.fixture(scope="session")
    def test_app() -> TestClient:
>       with TestClient(create_app()) as client:

test.py:78: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
../../.cache/pypoetry/virtualenvs/centurion-X1KYOsH6-py3.9/lib/python3.9/site-packages/starlette/testclient.py:509: in __enter__
    self.portal = portal = stack.enter_context(
/usr/lib/python3.9/contextlib.py:448: in enter_context
    result = _cm_type.__enter__(cm)
/usr/lib/python3.9/contextlib.py:119: in __enter__
    return next(self.gen)
../../.cache/pypoetry/virtualenvs/centurion-X1KYOsH6-py3.9/lib/python3.9/site-packages/anyio/from_thread.py:416: in start_blocking_portal
    run_future.result()
/usr/lib/python3.9/concurrent/futures/_base.py:438: in result
    return self.__get_result()
/usr/lib/python3.9/concurrent/futures/_base.py:390: in __get_result
    raise self._exception
/usr/lib/python3.9/concurrent/futures/thread.py:52: in run
    result = self.fn(*self.args, **self.kwargs)
../../.cache/pypoetry/virtualenvs/centurion-X1KYOsH6-py3.9/lib/python3.9/site-packages/anyio/_core/_eventloop.py:56: in run
    return asynclib.run(func, *args, **backend_options)  # type: ignore
../../.cache/pypoetry/virtualenvs/centurion-X1KYOsH6-py3.9/lib/python3.9/site-packages/anyio/_backends/_asyncio.py:233: in run
    return native_run(wrapper(), debug=debug)
../../.cache/pypoetry/virtualenvs/centurion-X1KYOsH6-py3.9/lib/python3.9/site-packages/nest_asyncio.py:30: in run
    loop = asyncio.get_event_loop()
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

self = <asyncio.unix_events._UnixDefaultEventLoopPolicy object at 0x7f09c55478e0>

    def get_event_loop(self):
        """Get the event loop for the current context.
    
        Returns an instance of EventLoop or raises an exception.
        """
        if (self._local._loop is None and
                not self._local._set_called and
                threading.current_thread() is threading.main_thread()):
            self.set_event_loop(self.new_event_loop())
    
        if self._local._loop is None:
>           raise RuntimeError('There is no current event loop in thread %r.'
                               % threading.current_thread().name)
E           RuntimeError: There is no current event loop in thread 'ThreadPoolExecutor-0_0'.

/usr/lib/python3.9/asyncio/events.py:642: RuntimeError
======================================================================================================================================================================================================= short test summary info =======================================================================================================================================================================================================
ERROR test.py::test_index_should_give_a_succesful_response - RuntimeError: There is no current event loop in thread 'ThreadPoolExecutor-0_0'.

Tested with starlette 0.15.0 and 0.16.0 and 0.17.1.

@Hazzari
Copy link

Hazzari commented Dec 14, 2021

Also faced with this problem.
It seems to me that the problem is that the reference to the database is in a separate event loop, which closes before the test receives data.

Described part of my project:

python = "^3.9"
fastapi = "^0.70.1"
uvicorn = "^0.16.0"
environs = "^9.3.5"
odmantic = "^0.3.5"

pytest = "^6.2.5"
httpx = "^0.21.1"
requests = "^2.26.0"
mongomock = "^3.23.0"

main.py

from fastapi import FastAPI
from app.api.api_v1.api import router as api_router

app = FastAPI()
app.include_router(api_router)

apy/api.py

from fastapi import APIRouter
from app.api.api_v1.endpoints.employee import router as user_router

router = APIRouter()
router.include_router(user_router)

api/api_v1/endpoints/employee.py

from fastapi import APIRouter
from app.crud.employee import EmployeeCRUD

router = APIRouter()


@router.get("/employee/count/", response_model=int, tags=["Data"])
async def count_employees():
    return await EmployeeCRUD.count_employees()

crud/employee.py

from dataclasses import dataclass

from fastapi import APIRouter

from app.database.database import engine
from app.model.employee import Employee

router = APIRouter()


@dataclass
class EmployeeCRUD:
    model = Employee

    @classmethod
    async def count_employees(cls):
        return await engine.count(cls.model)

database/database.py

from motor.motor_asyncio import AsyncIOMotorClient
from odmantic import AIOEngine

from app.core.config import MONGO_DB_URL

client = AsyncIOMotorClient(MONGO_DB_URL)
engine = AIOEngine(motor_client=client)

test

from fastapi.testclient import TestClient
from app.main import app


def test_ping():
    client = TestClient(app)
    response = client.get('/api/employee/count/')
    assert response.status_code == 200
================================================= test session starts ==================================================
platform darwin -- Python 3.10.1, pytest-6.2.5, py-1.11.0, pluggy-1.0.0
rootdir: /Users/aleksan/Code/employees-app — копия
plugins: anyio-3.4.0
collected 1 item                                                                                                       

tests/test_api.py F                                                                                              [100%]

======================================================= FAILURES =======================================================
______________________________________________________ test_count ______________________________________________________

    def test_count():
        client = TestClient(app)
>       response = client.get('/api/employee/count/')

tests/test_api.py:7: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
../test-tasks-from-interviews/employees-app/.venv/lib/python3.10/site-packages/requests/sessions.py:555: in get
    return self.request('GET', url, **kwargs)
../test-tasks-from-interviews/employees-app/.venv/lib/python3.10/site-packages/starlette/testclient.py:468: in request
    return super().request(
../test-tasks-from-interviews/employees-app/.venv/lib/python3.10/site-packages/requests/sessions.py:542: in request
    resp = self.send(prep, **send_kwargs)
../test-tasks-from-interviews/employees-app/.venv/lib/python3.10/site-packages/requests/sessions.py:655: in send
    r = adapter.send(request, **kwargs)
../test-tasks-from-interviews/employees-app/.venv/lib/python3.10/site-packages/starlette/testclient.py:266: in send
    raise exc
../test-tasks-from-interviews/employees-app/.venv/lib/python3.10/site-packages/starlette/testclient.py:263: in send
    portal.call(self.app, scope, receive, send)
../test-tasks-from-interviews/employees-app/.venv/lib/python3.10/site-packages/anyio/from_thread.py:240: in call
    return cast(T_Retval, self.start_task_soon(func, *args).result())
/Library/Frameworks/Python.framework/Versions/3.10/lib/python3.10/concurrent/futures/_base.py:445: in result
    return self.__get_result()
/Library/Frameworks/Python.framework/Versions/3.10/lib/python3.10/concurrent/futures/_base.py:390: in __get_result
    raise self._exception
../test-tasks-from-interviews/employees-app/.venv/lib/python3.10/site-packages/anyio/from_thread.py:187: in _call_func
    retval = await retval
../test-tasks-from-interviews/employees-app/.venv/lib/python3.10/site-packages/fastapi/applications.py:208: in __call__
    await super().__call__(scope, receive, send)
../test-tasks-from-interviews/employees-app/.venv/lib/python3.10/site-packages/starlette/applications.py:112: in __call__
    await self.middleware_stack(scope, receive, send)
../test-tasks-from-interviews/employees-app/.venv/lib/python3.10/site-packages/starlette/middleware/errors.py:181: in __call__
    raise exc
../test-tasks-from-interviews/employees-app/.venv/lib/python3.10/site-packages/starlette/middleware/errors.py:159: in __call__
    await self.app(scope, receive, _send)
../test-tasks-from-interviews/employees-app/.venv/lib/python3.10/site-packages/starlette/middleware/cors.py:84: in __call__
    await self.app(scope, receive, send)
../test-tasks-from-interviews/employees-app/.venv/lib/python3.10/site-packages/starlette/exceptions.py:82: in __call__
    raise exc
../test-tasks-from-interviews/employees-app/.venv/lib/python3.10/site-packages/starlette/exceptions.py:71: in __call__
    await self.app(scope, receive, sender)
../test-tasks-from-interviews/employees-app/.venv/lib/python3.10/site-packages/starlette/routing.py:656: in __call__
    await route.handle(scope, receive, send)
../test-tasks-from-interviews/employees-app/.venv/lib/python3.10/site-packages/starlette/routing.py:259: in handle
    await self.app(scope, receive, send)
../test-tasks-from-interviews/employees-app/.venv/lib/python3.10/site-packages/starlette/routing.py:61: in app
    response = await func(request)
../test-tasks-from-interviews/employees-app/.venv/lib/python3.10/site-packages/fastapi/routing.py:226: in app
    raw_response = await run_endpoint_function(
../test-tasks-from-interviews/employees-app/.venv/lib/python3.10/site-packages/fastapi/routing.py:159: in run_endpoint_function
    return await dependant.call(**values)
app/api/api_v1/endpoints/employee.py:24: in count_employees
    return await EmployeeCRUD.count_employees()
app/crud/employee.py:28: in count_employees
    count = await engine.count(cls.model)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <odmantic.engine.AIOEngine object at 0x110145180>, model = <class 'app.model.employee.Employee'>, queries = ()
query = QueryExpression()
collection = AsyncIOMotorCollection(Collection(Database(MongoClient(host=['mongo:27017'], document_class=dict, tz_aware=False, connect=False, driver=DriverInfo(name='Motor', version='2.3.1', platform='asyncio')), 'test'), 'employee'))

    async def count(
        self, model: Type[ModelType], *queries: Union[QueryExpression, Dict, bool]
    ) -> int:
        """Get the count of documents matching a query
    
        Args:
            model: model to perform the operation on
            queries: query filters to apply
    
        Returns:
            number of document matching the query
    
        <!---
        #noqa: DAR401 TypeError
        -->
        """
        if not lenient_issubclass(model, Model):
            raise TypeError("Can only call count with a Model class")
        query = AIOEngine._build_query(*queries)
        collection = self.database[model.__collection__]
>       count = await collection.count_documents(query)
E       RuntimeError: Task <Task pending name='anyio.from_thread.BlockingPortal._call_func' coro=<BlockingPortal._call_func() running at /Users/aleksan/Code/test-tasks-from-interviews/employees-app/.venv/lib/python3.10/site-packages/anyio/from_thread.py:187> cb=[TaskGroup._spawn.<locals>.task_done() at /Users/aleksan/Code/test-tasks-from-interviews/employees-app/.venv/lib/python3.10/site-packages/anyio/_backends/_asyncio.py:629]> got Future <Future pending cb=[_chain_future.<locals>._call_check_cancel() at /Library/Frameworks/Python.framework/Versions/3.10/lib/python3.10/asyncio/futures.py:384]> attached to a different loop

../test-tasks-from-interviews/employees-app/.venv/lib/python3.10/site-packages/odmantic/engine.py:417: RuntimeError
=================================================== warnings summary ===================================================
../test-tasks-from-interviews/employees-app/.venv/lib/python3.10/site-packages/marshmallow/__init__.py:14
  /Users/aleksan/Code/test-tasks-from-interviews/employees-app/.venv/lib/python3.10/site-packages/marshmallow/__init__.py:14: DeprecationWarning: The distutils package is deprecated and slated for removal in Python 3.12. Use setuptools or check PEP 632 for potential alternatives
    from distutils.version import LooseVersion

../test-tasks-from-interviews/employees-app/.venv/lib/python3.10/site-packages/motor/frameworks/asyncio/__init__.py:42
  /Users/aleksan/Code/test-tasks-from-interviews/employees-app/.venv/lib/python3.10/site-packages/motor/frameworks/asyncio/__init__.py:42: DeprecationWarning: There is no current event loop
    return asyncio.get_event_loop()

-- Docs: https://docs.pytest.org/en/stable/warnings.html
=============================================== short test summary info ================================================
FAILED tests/test_api.py::test_count - RuntimeError: Task <Task pending name='anyio.from_thread.BlockingPortal._call_...
============================================ 1 failed, 2 warnings in 0.26s =============================================

@aminalaee
Copy link
Member

@Hazzari have you tried setting the loop on Motor client?

client = AsyncIOMotorClient()
client.get_io_loop = asyncio.get_event_loop
engine = AIOEngine(motor_client=client)

@Hazzari
Copy link

Hazzari commented Dec 15, 2021

@Hazzari have you tried setting the loop on Motor client?

client = AsyncIOMotorClient()
client.get_io_loop = asyncio.get_event_loop
engine = AIOEngine(motor_client=client)

Thank you so much! That solved the problem!
Asynchrony is not my strongest suit yet!

@LuisLuii
Copy link

LuisLuii commented Jan 8, 2022

I've got similar issue in tests with FastAPI (0.68.2 --> 0.69.0 which means Starlette upgrading from 0.14.2 to 0.15.0) and databases library (based on asyncpg):

...
[stacktrace here]
...
asyncpg/protocol/protocol.pyx:323: in query
    ???
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

>   ???
E   asyncpg.exceptions._base.InterfaceError: cannot perform operation: another operation is in progress

asyncpg/protocol/protocol.pyx:707: InterfaceError

With FastAPI 0.68.2 (Starlette 0.14.2) works like a charm.

I have the same issue about this with using SQLAlchemy async connect
I change the code from

engine = create_async_engine(TEST_DATABASE_URL, echo=True, future=True)

async_session = sessionmaker(
    engine, class_=AsyncSession, expire_on_commit=False
)

async def get_session() -> AsyncSession:
    async with async_session() as session:
        yield session

to

async def get_session() -> AsyncSession:
    engine = create_async_engine(TEST_DATABASE_URL, echo=True, future=True)

    async_session = sessionmaker(
        engine, class_=AsyncSession, expire_on_commit=False
    )
    async with async_session() as session:
        yield session

to fix the problem, but obviously not a solution

@DavidVentura
Copy link
Contributor

Here's a very minimal reproduction:

import pytest
import asyncio
from starlette.responses import JSONResponse
from starlette.applications import Starlette
from starlette.routing import Route

async def fn(request):
    print('in endpoint', id(asyncio.get_running_loop()))
    return JSONResponse({})

def make_app():
    app = Starlette(routes=[Route("/", endpoint=fn)])
    return app

#######################

from starlette.testclient import TestClient

pytestmark = pytest.mark.anyio

@pytest.fixture
def anyio_backend():
    return 'asyncio'

@pytest.fixture
async def tclient():
    with TestClient(app=make_app()) as c:
        yield c

@pytest.mark.anyio
async def test_bug(tclient):
    print('in test', id(asyncio.get_running_loop()))
    print(tclient.get('/'))

when running this under pytest, you will see two lines printed with event-loop ids. My expectation is that the IDs would be the same, but they are not.

This makes testing impossible for my use-case -- create a Task in a "request context" (when a request comes in, create_task and kick it to the event loop for scheduling); then asyncio.wait_for it during tests, to assert the result of execution.

@glenbot
Copy link

glenbot commented Mar 3, 2022

I'm not using Mongo so for now my only solution was to downgrade and pin fastapi to 0.68.2

@konstantin-stepanov
Copy link

How to reproduce with asyncio.Lock and ab

import asyncio
import uvicorn
from starlette.applications import Starlette
from starlette.responses import JSONResponse
from starlette.routing import Route


lock = asyncio.Lock()


async def homepage(request):
    async with lock:
        await asyncio.sleep(0.1)
    return JSONResponse({'hello': 'world'})


app = Starlette(debug=True, routes=[
    Route('/', homepage),
])

if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=8000)
$ ab -n 100 -c 100  "http://0.0.0.0:8000/"

@akhilputhiry
Copy link

akhilputhiry commented May 5, 2022

I am also facing similar issue in my test cases

@cnc8
Copy link

cnc8 commented May 21, 2022

My solution for async SQLAlchemy + FastApi + Pytest:

in 'main.py'

@app.on_event('startup')
async def startup_event():
    await db.init()
    await db.create_all()


@app.on_event('shutdown')
async def shutdown_event():
    await db.close()

in db file

Base = declarative_base()


class Database:
    def __init__(self, url):
        self._session = None
        self._engine = None
        self.url = url

    async def create_all(self):
        async with self._engine.begin() as conn:
            await conn.run_sync(Base.metadata.create_all)

    def __getattr__(self, name) -> AsyncSession:
        return getattr(self._session, name)

    async def init(self):
        # closes connections if a session is created, 
        # so as not to create repeated connections
        if self._session:
            await self._session.close()

        self._engine = create_async_engine(self.url, future=True)
        self._session = sessionmaker(
            self._engine,
            expire_on_commit=False,
            class_=AsyncSession
        )()


db: AsyncSession | Database = Database(settings.db_url())

and client object in "test.py"

@pytest.fixture(scope="module")
def client() -> TestClient:
    with TestClient(app) as client:
        yield client


def test_get_files_without_params(client: TestClient):
    response = client.get("/api/v1/file")

@EdgyEdgemond
Copy link

EdgyEdgemond commented Jun 15, 2022

I am seeing similar issues, where I have my db_session fixture that ensures unittest changes are rolled back after every test, this requires the app to use the same session to be able to see the data loaded into the database.

With the change to anyio, I am yet to find a combination of things that will allow me to have the fixture db_session on the same loop as the TestClient.

The following worked prior to anyio.

server.py

def build_app():
    app = FastAPI()

    @app.on_event("startup")
    async def app_setup():
        app.state.database_pool = await asyncpg.create_pool(
            dsn=config.postgres_dsn,
            min_size=config.postgres_pool_min_size,
            max_size=config.postgres_pool_max_size,
            server_settings={
                "application_name": "tfx_backend_v{}".format(tfx_backend.__version__),
            },
        )

    @app.on_event("shutdown")
    async def app_shutdown():
        # cleanly shutdown the connections in the pool
        await app.state.database_pool.close()

    @app.middleware("http")
    async def middleware_asyncpg(request, call_next):
        # on request, inject a database transaction
        async with request.app.state.database_pool.acquire() as database_connection:
            async with database_connection.transaction():
                request.state.database_connection = database_connection
                return await call_next(request)
    return app

tests/conftest.py

@pytest.fixture(scope="session")
def engine(settings):
    backend = get_backend(settings.postgres_dsn)

    _upgrade(backend)

    yield

    # Rollback all migrations
    _downgrade(backend, count=None)


@pytest.fixture(scope="function", autouse=True)
async def db_session(engine, settings):
    conn = await asyncpg.connect(settings.postgres_dsn)
    tr = conn.transaction()
    await tr.start()
    try:
        yield conn
    except Exception:
        pass
    finally:
        await tr.rollback()

tests/views/conftest.py

import contextlib


class MockPool:
    def __init__(self, db_session):
        self.db_session = db_session

    @contextlib.asynccontextmanager
    async def acquire(self):
        yield self.db_session

    async def close(self):
        pass

# We don't want to build the app for every test, create a session level fixture for the app
@pytest.fixture(scope="session")
def app():
    return server.build_app()


# We do want to patch the db_session for every test, create a funcion level fixture for the client.
@pytest.fixture
def client(monkeypatch, db_session, app, tmp_path):
    mock_pool = MockPool(db_session)
    monkeypatch.setattr(server.asyncpg, "create_pool", AsyncMock(return_value=mock_pool))
    with TestClient(app=app, raise_server_exceptions=False, base_url="http://localhost") as client:
        yield client

This allowed me to populate the database, then make a request on the TestClient and see the same data.

If there was a way to access the loop that TestClient started up, I could create a new db_session for the client as asyncpg will accept a loop parameter, allowing me to run them on the same loop (hopefully). As long as the TestClient doesn't spawn new loops on requests.

@graingert
Copy link
Member

graingert commented Jun 15, 2022

If there was a way to access the loop that TestClient started up,

you can use:

@contextlib.asynccontextmanager
async def some_function(app):
    async with some_resource() as v:
        app.v = v
        yield
    
with TestClient(app=app) as tc, tc.portal.wrap_async_context_manager(some_function(app)):
    ...

@EdgyEdgemond
Copy link

EdgyEdgemond commented Jun 16, 2022

I have a feeling that has worked, so I now get the same error in other fixtures that use the db_session, as its now connected to the TestClient loop.

@contextlib.asynccontextmanager
async def session_factory(app):
    conn = await asyncpg.connect(app.state.config.postgres_dsn)
    tr = conn.transaction()
    await tr.start()
    try:
        app.state.database_pool = MockPool(conn)
        yield
    except Exception:
        pass
    finally:
        await tr.rollback()


# We don't want to build the app for every test, create a session level fixture for the app
@pytest.fixture(scope="session")
def app():
    return server.build_app()


# We do want to patch the db_session for every test, create a funcion level fixture for the client.
@pytest.fixture
def client(engine, app):
    with TestClient(
        app=app,
        raise_server_exceptions=False,
        base_url="http://localhost",
    ) as client, client.portal.wrap_async_context_manager(session_factory(app)):
        yield client


@pytest.fixture
def db_session(client):
    return client.app.state.database_pool.db_session

The RuntimeError now raises from the database factory's creating users etc. Rather than from within the view when calling the TestClient.

Seems like I need to influence the loop that TestClient creates (like it used to attach to the test loop).

MRichards99 added a commit to ral-facilities/operationsgateway-api that referenced this issue Jun 20, 2022
- Using this version of FastAPI gives us Starlette 0.14.2 which fixes a bug we faced
- Bug described at encode/starlette#1315
- Downgrading FastAPI means `UploadFile` is no longer supported. I've reverted to `File()` as an alternative solution, although `UploadFile` is probably easier to use, and I'd reimplement it in that way if we could upgrade in the future
@ivanychev
Copy link

ivanychev commented Jun 26, 2022

I have faced the same issues in my unit tests. I fixed the issue by using a context manager for TestClient. I've no idea why it works though (especially because it is not used as context manager in the docs of FastAPI) 😞

Doesn't work:

@pytest.fixture(scope="session")
def client() -> TestClient:
    return TestClient(app)

Works:

@pytest.fixture(scope="session")
def client() -> TestClient:
    with TestClient(app) as c:
        yield c

@adriangb
Copy link
Member

Any time I am doing async stuff outside of my app I'll just switch to httpx so that I don't have to deal with juggling event looks. I realize it requires rewriting of tests so it may not be a great solution for existing tests that were broken, but it might be a good idea for new tests.

@Kludex
Copy link
Member

Kludex commented Jun 26, 2022

I have faced the same issues in my unit tests. I fixed the issue by using a context manager for TestClient. I've no idea why it works though (especially because it is not used as context manager in the docs of FastAPI) 😞

Doesn't work:

@pytest.fixture(scope="session")
def client() -> TestClient:
    return TestClient(app)

Works:

@pytest.fixture(scope="session")
def client() -> TestClient:
    with TestClient(app) as c:
        yield c

The latter calls the startup event, while the former doesn't.

@gitkeerthi
Copy link

Same issue. Had to address by pinning Fastapi to 0.68.2

@Kludex Kludex added the testclient TestClient-related label Jul 31, 2022
@nlyn
Copy link

nlyn commented Aug 18, 2022

I have faced the same issues in my unit tests. I fixed the issue by using a context manager for TestClient. I've no idea why it works though (especially because it is not used as context manager in the docs of FastAPI) 😞

Doesn't work:

@pytest.fixture(scope="session")
def client() -> TestClient:
    return TestClient(app)

Works:

@pytest.fixture(scope="session")
def client() -> TestClient:
    with TestClient(app) as c:
        yield c

I continue to see the error even after making this change. Could you please share an example of how you're using that client fixture, @ivanychev ?

@graingert
Copy link
Member

FastAPI needs context manager support for lifespan

@amacfie
Copy link

amacfie commented Oct 14, 2022

For testing FastAPI with SQLAlchemy, one option is overriding the engine to use a NullPool (https://docs.sqlalchemy.org/en/14/orm/extensions/asyncio.html#using-multiple-asyncio-event-loops).

@utkarshgupta137
Copy link

How to reproduce with asyncio.Lock and ab

import asyncio
import uvicorn
from starlette.applications import Starlette
from starlette.responses import JSONResponse
from starlette.routing import Route


lock = asyncio.Lock()


async def homepage(request):
    async with lock:
        await asyncio.sleep(0.1)
    return JSONResponse({'hello': 'world'})


app = Starlette(debug=True, routes=[
    Route('/', homepage),
])

if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=8000)
$ ab -n 100 -c 100  "http://0.0.0.0:8000/"

Thanks for this! This helped me figure out that the issue is creating asyncio.Lock or similar objects at the module level (so before the event loop is created). I've created a fix for the redis package: redis/redis-py#2471. I think this will have to be solved at the package level, or users should not define the connections from the offending packages at the module level.

@wwnbb
Copy link

wwnbb commented Jul 4, 2023

Still actual in 2023, if you encountered with the same issue, here is workaround that helped me:

first: set up asgi-lifespan https://pypi.org/project/asgi-lifespan/

If you have any connection initialization actions in fixtures with session level make sure you override event_loop fixture.

@pytest.fixture(scope="session")
def event_loop():
    """Overrides pytest default function scoped event loop"""
    policy = asyncio.get_event_loop_policy()
    loop = policy.new_event_loop()
    yield loop
    loop.close()

otherwise you will receive ScopeMismatch error.

Startup your test client with LifespanManager:

@pytest.fixture(scope='session')
async def client(app):
    """Async http client for FastAPI application, ASGI init signals handled by
    LifespanManager.
    """
    async with LifespanManager(app, startup_timeout=100, shutdown_timeout=100):
        base_chars = ''.join(random.choices(string.ascii_uppercase, k=4))
        async with AsyncClient(app=app, base_url=f"http://{base_chars}") as ac:
            yield ac

@Kludex
Copy link
Member

Kludex commented Dec 24, 2023

This comment shows the issue: #1315 (comment)

The event loop ID is different on both prints.

@akhilputhiry
Copy link

My 2 cents - https://docs.sqlalchemy.org/en/20/orm/extensions/asyncio.html#using-multiple-asyncio-event-loops

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
testclient TestClient-related
Projects
None yet
Development

No branches or pull requests