Skip to content

Commit

Permalink
add an extra lock for insert and delete on the cache database
Browse files Browse the repository at this point in the history
  • Loading branch information
tebben committed Feb 28, 2024
1 parent f996084 commit 20c15ef
Showing 1 changed file with 18 additions and 15 deletions.
33 changes: 18 additions & 15 deletions ctod/core/factory/factory_cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ def __init__(self, cache_path: str, db_name: str = "factory_cache.db", in_memory
self.batch_processing = False
self.batch_rerun = False
self.lock = asyncio.Lock()
self.db_lock = asyncio.Lock()

async def initialize(self):
await self._create_table()
Expand Down Expand Up @@ -108,16 +109,17 @@ async def _add_batched(self) -> None:

if batch_copy:
try:
async with aiosqlite.connect(self.db_name) as db:
for key, value in batch_copy.items():
await db.execute(
"""
INSERT OR REPLACE INTO cache (key, value, timestamp)
VALUES (?, ?, ?)
""",
(key, pickle.dumps(value), time.time()),
)
await db.commit()
async with self.db_lock:
async with aiosqlite.connect(self.db_name) as db:
for key, value in batch_copy.items():
await db.execute(
"""
INSERT OR REPLACE INTO cache (key, value, timestamp)
VALUES (?, ?, ?)
""",
(key, pickle.dumps(value), time.time()),
)
await db.commit()
except aiosqlite.Error as e:
logging.error(f"Error adding to factory cache: {e}")

Expand Down Expand Up @@ -163,11 +165,12 @@ async def get(self, keys: List[str]) -> Dict[str, Any]:

async def clear_expired(self, keys_to_keep: List[str]) -> None:
try:
async with aiosqlite.connect(self.db_name) as db:
placeholders = ', '.join('?' for _ in keys_to_keep)
query = f"DELETE FROM cache WHERE (strftime('%s', 'now') - timestamp) > ? AND key NOT IN ({placeholders})"
await db.execute(query, (self.ttl, *keys_to_keep))
await db.commit()
async with self.db_lock:
async with aiosqlite.connect(self.db_name) as db:
placeholders = ', '.join('?' for _ in keys_to_keep)
query = f"DELETE FROM cache WHERE (strftime('%s', 'now') - timestamp) > ? AND key NOT IN ({placeholders})"
await db.execute(query, (self.ttl, *keys_to_keep))
await db.commit()

except aiosqlite.Error as e:
logging.error(f"Error clearing expired items from factory cache: {e}")
Expand Down

0 comments on commit 20c15ef

Please sign in to comment.