
pyodbc not releasing GIL #1404

Open · jasperboekel opened this issue Jan 31, 2025 · 0 comments

jasperboekel commented Jan 31, 2025

I came across this issue while building a FastAPI application that uses aioodbc to fetch data from a SQL Server database, but I think it is actually a fundamental pyodbc problem.

From my understanding, blocking I/O operations should release the GIL, so parallelizing queries across threads should yield speed gains. The code snippets below show otherwise, however.
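For a baseline of what a GIL-releasing blocking call looks like, here is a minimal sketch (not using pyodbc): time.sleep does release the GIL, so five one-second waits spread over five threads finish in roughly one second of wall time, not five.

import concurrent.futures
import datetime as dt
import time


def blocking_call() -> None:
    time.sleep(1)  # sleep releases the GIL while waiting


start = dt.datetime.now()
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
    for _ in range(5):
        executor.submit(blocking_call)
# The executor's context manager waits for all threads before exiting.
print(f"5 blocking calls in {dt.datetime.now() - start}")  # ~0:00:01

This is the kind of scaling I expected from the pyodbc version below.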

Python 3.11
pyodbc 5.2.0
unixODBC 2.3.12
ODBC Driver 17 for SQL Server

import concurrent.futures
import datetime as dt
from typing import Dict, List

import pyodbc
import turbodbc

SQL_HOST = ""
SQL_USER = ""
SQL_PASSWORD = ""
SQL_DATABASE = ""


def execute_query(con_params: Dict[str, str], query: str) -> List[tuple]:
    """
    Executes a query on the database using pyodbc.
    Returns the results as a list of tuples.
    """
    connection_string = ";".join([f"{key}={value}" for key, value in con_params.items()])

    try:
        with pyodbc.connect(connection_string) as conn:
            with conn.cursor() as cursor:
                print("connection established")
                start = dt.datetime.now()
                cursor.execute(query)
                rows = cursor.fetchall()
                print(f"Query executed in {dt.datetime.now() - start}")
                return rows
    except Exception as e:
        print(f"Error executing query: {e}")
        return []


def fetch_trades(query):
    """Fetch trade data efficiently using `turbodbc`."""
    DATABASE_CONFIG = {
        "driver": "ODBC Driver 18 for SQL Server",
        "server": SQL_HOST,
        "database": SQL_DATABASE,
        "uid": rf"{{{SQL_USER}}}",
        "pwd": rf"{{{SQL_PASSWORD}}}",
        "TrustServerCertificate": "yes",
    }

    with turbodbc.connect(**DATABASE_CONFIG, use_async_io=True) as conn:
        cursor = conn.cursor()
        start = dt.datetime.now()
        cursor.execute(query)
        rows = cursor.fetchallnumpy()
        print(f"Time taken: {dt.datetime.now() - start}")
    return rows


def run_parallel_queries(con_params: Dict[str, str], query: str, num_queries: int = 5) -> List[List[tuple]]:
    """
    Executes multiple instances of the same query in parallel using ThreadPoolExecutor.
    Returns a list containing results from each query execution.
    """
    results = []

    with concurrent.futures.ThreadPoolExecutor(max_workers=num_queries) as executor:
        future_to_query = {executor.submit(execute_query, con_params, query): i for i in range(num_queries)}
        # future_to_query = {executor.submit(fetch_trades, query): i for i in range(num_queries)}

        for future in concurrent.futures.as_completed(future_to_query):
            try:
                results.append(future.result())
            except Exception as e:
                print(f"Error fetching result: {e}")

    return results


if __name__ == "__main__":
    con_params: Dict[str, str] = {
        "DRIVER": "{ODBC Driver 18 for SQL Server}",
        "Server": SQL_HOST,
        "Database": SQL_DATABASE,
        "UID": rf"{{{SQL_USER}}}",
        "PWD": rf"{{{SQL_PASSWORD}}}",
        "UseFMTONLY": "Yes",
    }
    con_params.update(
        {
            "Authentication": "ActiveDirectoryPassword",
            "TrustServerCertificate": "Yes",
            "ColumnEncryption": "Enabled",
            "Encrypt": "Yes",
            "ApplicationIntent": "ReadOnly",  # "ReadWrite"
        }
    )

    query = """
            DECLARE @num_rows INT = 10000; -- Change this to your desired number of rows

            WITH Numbers AS (
                SELECT 1 AS id
                UNION ALL
                SELECT id + 1 FROM Numbers WHERE id < @num_rows
            )
            SELECT
                id,
                DATEADD(DAY, id, '2025-01-01') AS delivery,
                CAST(ROUND(RAND(CHECKSUM(NEWID())) * 100, 2) AS DECIMAL(10,2)) AS value
            FROM Numbers
            OPTION (MAXRECURSION 10000); -- Prevents SQL Server's default recursion limit (100)
    """

    start = dt.datetime.now()
    results = run_parallel_queries(con_params, query, num_queries=5)
    for i, res in enumerate(results):
        print(f"Result set {i+1}: {len(res)} rows")

    print(f" total time: {dt.datetime.now() - start}")

The above snippet uses multithreading. Running it with num_queries = 5 gives the following:

connection established
connection established
connection established
connection established
connection established
Query executed in 0:00:02.210422
Query executed in 0:00:02.210795
Query executed in 0:00:02.258706
Query executed in 0:00:02.263452
Query executed in 0:00:02.240482
Result set 1: 10000 rows
Result set 2: 10000 rows
Result set 3: 10000 rows
Result set 4: 10000 rows
Result set 5: 10000 rows
total time: 0:00:03.798965

Running the same code with num_queries = 1, by contrast, gives:

connection established
Query executed in 0:00:00.138177
Result set 1: 10000 rows
total time: 0:00:01.669335

With 5 concurrent queries each query takes about 2.2 s, roughly 16 times the 0.14 s a single query needs on its own, so the threads appear to serialize on the GIL instead of running in parallel.
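As a rough way to check whether the GIL is actually released during the query, one can run a pure-Python counter in a background thread while the main thread executes a server-side wait: if pyodbc releases the GIL during execute, the counter keeps climbing; if not, it barely moves. This is only a diagnostic sketch; CONNECTION_STRING is a placeholder for the connection string built above, and WAITFOR DELAY is used as a query that is pure waiting on the server.

import threading

import pyodbc

CONNECTION_STRING = ""  # placeholder: fill in the same connection string as above

stop = threading.Event()
ticks = 0


def spin() -> None:
    # Pure-Python busy loop; it can only make progress while the GIL is free.
    global ticks
    while not stop.is_set():
        ticks += 1


spinner = threading.Thread(target=spin)
spinner.start()
with pyodbc.connect(CONNECTION_STRING) as conn:
    cursor = conn.cursor()
    cursor.execute("WAITFOR DELAY '00:00:02'")  # two seconds of server-side waiting
stop.set()
spinner.join()
print(f"Counter advanced {ticks:,} times during the query")

If the GIL is released across the ODBC call, I would expect the counter in the millions; a near-zero count would point at the GIL being held for the duration of the query.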

For comparison, the following code snippet uses multiprocessing instead of multithreading:

import datetime as dt
import multiprocessing as mp
import os
from typing import Dict, List

import pyodbc

SQL_HOST = ""
SQL_USER = ""
SQL_PASSWORD = ""
SQL_DATABASE = ""


def execute_query(con_params: Dict[str, str], query: str) -> List[tuple]:
    """
    Executes a query on the database using pyodbc in a separate process.
    Returns the results as a list of tuples.
    """
    connection_string = ";".join([f"{key}={value}" for key, value in con_params.items()])

    try:
        with pyodbc.connect(connection_string) as conn:
            with conn.cursor() as cursor:
                print(f"Process {os.getpid()}: Connection established")
                start = dt.datetime.now()
                cursor.execute(query)
                rows = cursor.fetchall()
                elapsed = dt.datetime.now() - start
                print(f"Process {os.getpid()}: Query executed in {elapsed.total_seconds()} seconds")
                return rows
    except Exception as e:
        print(f"Process {os.getpid()}: Error executing query: {e}")
        return []


def run_parallel_queries(con_params: Dict[str, str], query: str, num_queries: int = 5) -> List[List[tuple]]:
    """
    Executes multiple instances of the same query in parallel using multiprocessing.
    Returns a list containing results from each query execution.
    """
    start_time = dt.datetime.now()

    # Use multiprocessing Pool to spawn parallel processes
    with mp.Pool(processes=num_queries) as pool:
        results = pool.starmap(execute_query, [(con_params, query)] * num_queries)

    total_time = dt.datetime.now() - start_time
    print(f"Total execution time: {total_time.total_seconds()} seconds")

    return results


# Example usage
if __name__ == "__main__":
    con_params: Dict[str, str] = {
        "DRIVER": "{ODBC Driver 18 for SQL Server}",
        "Server": SQL_HOST,
        "Database": SQL_DATABASE,
        "UID": rf"{{{SQL_USER}}}",
        "PWD": rf"{{{SQL_PASSWORD}}}",
        "UseFMTONLY": "Yes",
    }
    con_params.update(
        {
            "Authentication": "ActiveDirectoryPassword",
            "TrustServerCertificate": "Yes",
            "ApplicationIntent": "ReadOnly",  # "ReadWrite"
        }
    )

    query = """
            DECLARE @num_rows INT = 10000; -- Change this to your desired number of rows

            WITH Numbers AS (
                SELECT 1 AS id
                UNION ALL
                SELECT id + 1 FROM Numbers WHERE id < @num_rows
            )
            SELECT
                id,
                DATEADD(DAY, id, '2025-01-01') AS delivery,
                CAST(ROUND(RAND(CHECKSUM(NEWID())) * 100, 2) AS DECIMAL(10,2)) AS value
            FROM Numbers
            OPTION (MAXRECURSION 10000); -- Prevents SQL Server's default recursion limit (100)
    """

    results = run_parallel_queries(con_params, query, num_queries=5)

    for i, res in enumerate(results):
        print(f"Result set {i+1}: {len(res)} rows")

The above snippet prints this:

Process 595651: Connection established
Process 595653: Connection established
Process 595654: Connection established
Process 595655: Connection established
Process 595652: Connection established
Process 595654: Query executed in 0.171187 seconds
Process 595651: Query executed in 0.222585 seconds
Process 595652: Query executed in 0.163341 seconds
Process 595653: Query executed in 0.228181 seconds
Process 595655: Query executed in 0.189986 seconds
Total execution time: 1.703746 seconds
Result set 1: 10000 rows
Result set 2: 10000 rows
Result set 3: 10000 rows
Result set 4: 10000 rows
Result set 5: 10000 rows

This is the output I would expect from the multithreading version as well: each query runs in about 0.2 s and the five of them overlap, keeping the total well under two seconds.

EDIT:
All of the above was executed on WSL. When I run the code on Windows, however, I do get the parallelism I would expect.
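Since the difference shows up between WSL and Windows, the ODBC driver manager (unixODBC vs. the Windows driver manager) seems like the main variable. A small snippet to capture the relevant environment details on both sides, using only pyodbc's documented module attributes:

import sys

import pyodbc

print(sys.platform)      # 'linux' under WSL, 'win32' on native Windows
print(pyodbc.version)    # pyodbc build, e.g. '5.2.0'
print(pyodbc.drivers())  # ODBC drivers registered with the driver manager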
