Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add check and reconnect method to Database class #466

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
126 changes: 69 additions & 57 deletions common/lib/database.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,27 @@ def __init__(self, logger, dbname=None, user=None, password=None, host=None, por
if self.log is None:
self.log = logging

self.commit()
def reconnect(self, tries=3, wait=10):
"""
Reconnect to the database

:param int tries: Number of tries to reconnect
:param int wait: Time to wait between tries (first try is immediate)
"""
for i in range(tries):
try:
self.connection = psycopg2.connect(dbname=self.connection.info.dbname,
user=self.connection.info.user,
password=self.connection.info.password,
host=self.connection.info.host,
port=self.connection.info.port,
application_name=self.appname)
self.cursor = self.connection.cursor(cursor_factory=psycopg2.extras.RealDictCursor)
return
except (psycopg2.InterfaceError, psycopg2.OperationalError) as e:
self.log.warning(f"Database connection closed. Reconnecting...\n{e}")
time.sleep(wait)
self.log.error("Failed to reconnect to database after %d tries" % tries)

def query(self, query, replacements=None, cursor=None):
"""
Expand All @@ -63,25 +83,38 @@ def query(self, query, replacements=None, cursor=None):
if not cursor:
cursor = self.get_cursor()

self.log.debug("Executing query %s" % self.cursor.mogrify(query, replacements))

return cursor.execute(query, replacements)
self.log.debug("Executing query %s" % cursor.mogrify(query, replacements))
try:
cursor.execute(query, replacements)
except (psycopg2.InterfaceError, psycopg2.OperationalError) as e:
self.log.warning(f"Database Exception: {e}\nReconnecting and retrying query...")
self.reconnect()
cursor = self.get_cursor()
cursor.execute(query, replacements)
return cursor

def execute(self, query, replacements=None):
def execute(self, query, replacements=None, commit=True, cursor=None, close_cursor=True):
"""
Execute a query, and commit afterwards

This is required for UPDATE/INSERT/DELETE/etc to stick
:param string query: Query
:param cursor: Cursor to use. Default - use get_cursor method for common cursor
:param replacements: Replacement values
:param bool commit: Commit transaction after query?
:param bool close_cursor: Close cursor after query?
"""
cursor = self.get_cursor()
cursor = self.query(query, replacements, cursor)

self.log.debug("Executing query %s" % self.cursor.mogrify(query, replacements))
cursor.execute(query, replacements)
self.commit()
if commit:
self.commit()

cursor.close()
rowcount = cursor.rowcount

if close_cursor:
cursor.close()

return rowcount

def execute_many(self, query, commit=True, replacements=None):
"""
Expand All @@ -95,7 +128,14 @@ def execute_many(self, query, commit=True, replacements=None):
:param commit: Commit transaction after query?
"""
cursor = self.get_cursor()
execute_values(cursor, query, replacements)
try:
execute_values(cursor, query, replacements)
except (psycopg2.InterfaceError, psycopg2.OperationalError) as e:
self.log.warning(f"Database Exception: {e}\nReconnecting and retrying query...")
self.reconnect()
cursor = self.get_cursor()
execute_values(cursor, query, replacements)

cursor.close()
if commit:
self.commit()
Expand Down Expand Up @@ -128,16 +168,8 @@ def update(self, table, data, where=None, commit=True):

query = sql.SQL(query).format(*identifiers)

cursor = self.get_cursor()
self.log.debug("Executing query: %s" % cursor.mogrify(query, replacements))
cursor.execute(query, replacements)

if commit:
self.commit()

result = cursor.rowcount
cursor.close()
return result
rowcount = self.execute(query, replacements=replacements, commit=commit)
return rowcount

def delete(self, table, where, commit=True):
"""
Expand All @@ -164,16 +196,8 @@ def delete(self, table, where, commit=True):
identifiers.insert(0, sql.Identifier(table))
query = sql.SQL("DELETE FROM {} WHERE " + " AND ".join(where_sql)).format(*identifiers)

cursor = self.get_cursor()
self.log.debug("Executing query: %s" % cursor.mogrify(query, replacements))
cursor.execute(query, replacements)

if commit:
self.commit()

result = cursor.rowcount
cursor.close()
return result
rowcount = self.execute(query, replacements=replacements, commit=commit)
return rowcount

def insert(self, table, data, commit=True, safe=False, constraints=None, return_field=""):
"""
Expand Down Expand Up @@ -219,13 +243,9 @@ def insert(self, table, data, commit=True, safe=False, constraints=None, return_
replacements = (tuple(data.values()),)

cursor = self.get_cursor()
self.log.debug("Executing query: %s" % cursor.mogrify(query, replacements))
cursor.execute(query, replacements)

if commit:
self.commit()
rowcount = self.execute(query, replacements=replacements, cursor=cursor, commit=commit, close_cursor=False)

result = cursor.rowcount if not return_field else cursor.fetchone()[return_field]
result = rowcount if not return_field else cursor.fetchone()[return_field]
cursor.close()
return result

Expand Down Expand Up @@ -265,16 +285,8 @@ def upsert(self, table, data, commit=True, constraints=None):
query = sql.SQL(protoquery).format(*identifiers)
replacements = (tuple(data.values()),)

cursor = self.get_cursor()
self.log.debug("Executing query: %s" % cursor.mogrify(query, replacements))
cursor.execute(query, replacements)

if commit:
self.commit()

result = cursor.rowcount
cursor.close()
return result
rowcount = self.execute(query, replacements=replacements, commit=commit)
return rowcount

def fetchall(self, query, *args):
"""
Expand All @@ -285,9 +297,7 @@ def fetchall(self, query, *args):
:param commit: Commit transaction after query?
:return list: The result rows, as a list
"""
cursor = self.get_cursor()
self.log.debug("Executing query: %s" % cursor.mogrify(query, *args))
self.query(query, cursor=cursor, *args)
cursor = self.query(query, *args)

try:
result = cursor.fetchall()
Expand Down Expand Up @@ -316,8 +326,7 @@ def fetchone(self, query, *args):
:param commit: Commit transaction after query?
:return: The row, as a dictionary, or None if there were no rows
"""
cursor = self.get_cursor()
self.query(query, cursor=cursor, *args)
cursor = self.query(query, *args)

try:
result = cursor.fetchone()
Expand Down Expand Up @@ -359,12 +368,10 @@ def fetchall_interruptable(self, queue, query, *args):
pid = self.connection.get_backend_pid()
self.interruptable_job = queue.add_job("cancel-pg-query", details={}, remote_id=self.appname, claim_after=time.time() + self.interruptable_timeout)

# make the query
# run the query
cursor = self.get_cursor()
self.log.debug("Executing interruptable query: %s" % cursor.mogrify(query, *args))

try:
self.query(query, cursor=cursor, *args)
cursor = self.query(query, cursor=cursor, *args)
except psycopg2.extensions.QueryCanceledError:
# interrupted with cancellation worker (or manually)
self.log.debug("Query in connection %s was interrupted..." % self.appname)
Expand Down Expand Up @@ -422,4 +429,9 @@ def get_cursor(self):

:return: Cursor
"""
return self.connection.cursor(cursor_factory=psycopg2.extras.RealDictCursor)
try:
return self.connection.cursor(cursor_factory=psycopg2.extras.RealDictCursor)
except (psycopg2.InterfaceError, psycopg2.OperationalError) as e:
self.log.warning(f"Database Exception: {e}\nReconnecting and retrying query...")
self.reconnect()
return self.connection.cursor(cursor_factory=psycopg2.extras.RealDictCursor)
Loading