Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 17 additions & 13 deletions aioodbc/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ def __init__(
self._executor = executor
self._loop = asyncio.get_event_loop()
self._conn = None
# Add lock to synchronize cursor operations
self._cursor_lock = asyncio.Lock()

self._timeout = timeout
self._last_usage = self._loop.time()
Expand Down Expand Up @@ -120,10 +122,11 @@ def echo(self) -> bool:
return self._echo

async def _cursor(self):
c = await self._execute(self._conn.cursor)
self._last_usage = self._loop.time()
connection = self
return Cursor(c, connection, echo=self._echo)
async with self._cursor_lock:
c = await self._execute(self._conn.cursor)
self._last_usage = self._loop.time()
connection = self
return Cursor(c, connection, echo=self._echo)

def cursor(self) -> _ContextManager:
return _ContextManager["Cursor"](
Expand Down Expand Up @@ -162,15 +165,16 @@ async def execute(self, sql: str, *args) -> Cursor:
:param args: tuple, arguments for construction of sql statement
"""
assert self._conn is not None # mypy
try:
_cursor = await self._execute(self._conn.execute, sql, *args)
connection = self
cursor = Cursor(_cursor, connection, echo=self._echo)
return cursor
except pyodbc.Error as e:
if _is_conn_close_error(e):
await self.close()
raise
async with self._cursor_lock:
try:
_cursor = await self._execute(self._conn.execute, sql, *args)
connection = self
cursor = Cursor(_cursor, connection, echo=self._echo)
return cursor
except pyodbc.Error as e:
if _is_conn_close_error(e):
await self.close()
raise

def getinfo(self, type_):
"""Returns general information about the driver and data source
Expand Down
7 changes: 5 additions & 2 deletions aioodbc/cursor.py
Original file line number Diff line number Diff line change
Expand Up @@ -112,8 +112,11 @@ async def close(self):
"""
if self._conn is None:
return
await self._run_operation(self._impl.close)
self._conn = None

# Use the connection's cursor lock to ensure proper cleanup
async with self._conn._cursor_lock:
await self._run_operation(self._impl.close)
self._conn = None

async def execute(self, sql, *params):
"""Executes the given operation substituting any markers with
Expand Down
12 changes: 12 additions & 0 deletions aioodbc/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,18 @@ def _is_conn_close_error(e: Exception) -> bool:
return False

sqlstate, msg = e.args[0], e.args[1]

# Handle multiple HY000 error messages
if sqlstate == "HY000":
error_msg = str(msg)
return (
error_msg.startswith(
"[HY000] server closed the connection unexpectedly"
)
or "Connection is busy with results for another command"
in error_msg
)

if sqlstate not in _CONN_CLOSE_ERRORS:
return False

Expand Down
69 changes: 69 additions & 0 deletions tests/test_connection_race_condition.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
import asyncio

import pytest

import aioodbc


@pytest.mark.parametrize("db", pytest.db_list)
@pytest.mark.asyncio
async def test_cursor_race_condition(conn):
"""Test that cursor operations don't cause race conditions."""

# This test simulates the scenario described in the issue:
# 1. Execute first query and get results
# 2. Let cursor go out of scope (garbage collection)
# 3. Execute second query immediately

# First query - cursor will go out of scope
result1 = await conn.execute("SELECT 42 as value")
rows1 = await result1.fetchall()
assert rows1[0][0] == 42

# Second query - should not cause "Connection is busy" error
result2 = await conn.execute("SELECT 100 as value")
rows2 = await result2.fetchall()
assert rows2[0][0] == 100


@pytest.mark.parametrize("db", pytest.db_list)
@pytest.mark.asyncio
async def test_concurrent_cursor_operations(conn):
"""Test concurrent cursor operations on the same connection."""

async def execute_query(query_id):
cursor = await conn.cursor()
await cursor.execute(f"SELECT {query_id} as value")
result = await cursor.fetchone()
await cursor.close()
return result[0]

# Execute multiple queries concurrently
tasks = [execute_query(i) for i in range(5)]
results = await asyncio.gather(*tasks)

# All queries should complete successfully
assert results == [0, 1, 2, 3, 4]


@pytest.mark.parametrize("db", pytest.db_list)
@pytest.mark.asyncio
async def test_cursor_cleanup_under_pressure(conn):
"""Test cursor cleanup under high pressure."""

async def execute_and_forget():
cursor = await conn.cursor()
await cursor.execute("SELECT 1")
await cursor.fetchone()
# Don't explicitly close - let it be cleaned up by GC

# Execute many queries without explicit cleanup
tasks = [execute_and_forget() for _ in range(10)]
await asyncio.gather(*tasks)

# Should still be able to execute a new query
cursor = await conn.cursor()
await cursor.execute("SELECT 999")
result = await cursor.fetchone()
await cursor.close()
assert result[0] == 999