Edit on GitHub

common.lib.database

Database wrapper

  1"""
  2Database wrapper
  3"""
  4import itertools
  5import psycopg2.extras
  6import psycopg2
  7import logging
  8import time
  9
 10from psycopg2 import sql
 11from psycopg2.extras import execute_values
 12
 13from common.lib.exceptions import DatabaseQueryInterruptedException
 14
class Database:
	"""
	Simple database handler

	Offers a number of abstraction methods that limit how much SQL one is
	required to write. Also makes the database connection mostly multithreading
	proof by instantiating a new cursor for each query (and closing it afterwards)
	"""
	# shared cursor created in __init__; most methods use get_cursor() instead
	cursor = None
	# logger instance; falls back to the logging module itself if none is given
	log = None
	# application_name reported to PostgreSQL (visible in pg_stat_activity)
	appname=""

	interrupted = False
	interruptable_timeout = 86400  # if a query takes this long, it should be cancelled. see also fetchall_interruptable()
	# job queued to cancel a long-running query; see fetchall_interruptable()
	interruptable_job = None
 30
	def __init__(self, logger, dbname=None, user=None, password=None, host=None, port=None, appname=None):
		"""
		Set up database connection

		:param logger:  Logger instance
		:param dbname:  Database name
		:param user:  Database username
		:param password:  Database password
		:param host:  Database server address
		:param port:  Database port
		:param appname:  App name, mostly useful to trace connections in pg_stat_activity
		"""
		self.appname = "4CAT-%s" % appname if appname else "4CAT"

		self.connection = psycopg2.connect(dbname=dbname, user=user, password=password, host=host,
										   port=port, application_name=self.appname)
		self.cursor = self.connection.cursor(cursor_factory=psycopg2.extras.RealDictCursor)

		# fall back to the logging module itself if no logger was provided
		self.log = logger if logger is not None else logging
 51
 52	def reconnect(self, tries=3, wait=10):
 53		"""
 54		Reconnect to the database
 55
 56		:param int tries: Number of tries to reconnect
 57        :param int wait: Time to wait between tries (first try is immediate)
 58		"""
 59		for i in range(tries):
 60			try:
 61				self.connection = psycopg2.connect(dbname=self.connection.info.dbname,
 62												   user=self.connection.info.user,
 63												   password=self.connection.info.password,
 64												   host=self.connection.info.host,
 65												   port=self.connection.info.port,
 66												   application_name=self.appname)
 67				self.cursor = self.connection.cursor(cursor_factory=psycopg2.extras.RealDictCursor)
 68				return
 69			except (psycopg2.InterfaceError, psycopg2.OperationalError) as e:
 70				self.log.warning(f"Database connection closed. Reconnecting...\n{e}")
 71				time.sleep(wait)
 72		self.log.error("Failed to reconnect to database after %d tries" % tries)
 73
 74	def _execute_query(self, query, replacements=None, cursor=None):
 75		"""
 76		Execute a query
 77
 78		Simple wrapper to get a cursor to execute a query with. Do not call
 79		from outside this class - use `execute()` instead.
 80
 81		:param string query: Query
 82		:param args: Replacement values
 83		:param cursor: Cursor to use. Default - use common cursor
 84		:return None:
 85		"""
 86		if not cursor:
 87			cursor = self.get_cursor()
 88
 89		self.log.debug("Executing query %s" % cursor.mogrify(query, replacements))
 90		try:
 91			cursor.execute(query, replacements)
 92		except (psycopg2.InterfaceError, psycopg2.OperationalError) as e:
 93			self.log.warning(f"Database Exception: {e}\nReconnecting and retrying query...")
 94			self.reconnect()
 95			cursor = self.get_cursor()
 96			cursor.execute(query, replacements)
 97		return cursor
 98
 99	def execute(self, query, replacements=None, commit=True, cursor=None, close_cursor=True):
100		"""
101		Execute a query, and commit afterwards
102
103		This is required for UPDATE/INSERT/DELETE/etc to stick!
104
105		:param string query:  Query
106		:param cursor: Cursor to use. By default, use the result of
107		`get_cursor()`.
108		:param replacements: Replacement values
109		:param bool commit:  Commit transaction after query?
110		:param bool close_cursor:  Close cursor after query?
111		"""
112		cursor = self._execute_query(query, replacements, cursor)
113
114		if commit:
115			self.commit()
116
117		rowcount = cursor.rowcount
118
119		if close_cursor:
120			cursor.close()
121
122		return rowcount
123
124	def execute_many(self, query, commit=True, replacements=None):
125		"""
126		Execute a query multiple times, each time with different values
127
128		This makes it particularly suitable for INSERT queries, but other types
129		of query using VALUES are possible too.
130
131		:param string query:  Query
132		:param replacements: A list of replacement values
133		:param commit:  Commit transaction after query?
134		"""
135		cursor = self.get_cursor()
136		try:
137			execute_values(cursor, query, replacements)
138		except (psycopg2.InterfaceError, psycopg2.OperationalError) as e:
139			self.log.warning(f"Database Exception: {e}\nReconnecting and retrying query...")
140			self.reconnect()
141			cursor = self.get_cursor()
142			execute_values(cursor, query, replacements)
143
144		cursor.close()
145		if commit:
146			self.commit()
147
148	def update(self, table, data, where=None, commit=True):
149		"""
150		Update a database record
151
152		:param string table:  Table to update
153		:param dict data:  Data to set, Column => Value
154		:param dict where:  Simple conditions, parsed as "column1 = value1 AND column2 = value2" etc
155		:param bool commit:  Whether to commit after executing the query
156
157		:return int: Number of affected rows. Note that this may be unreliable if `commit` is `False`
158		"""
159		if where is None:
160			where = {}
161
162		# build query
163		identifiers = [sql.Identifier(column) for column in data.keys()]
164		identifiers.insert(0, sql.Identifier(table))
165		replacements = list(data.values())
166
167		query = "UPDATE {} SET " + ", ".join(["{} = %s" for column in data])
168		if where:
169			query += " WHERE " + " AND ".join(["{} = %s" for column in where])
170			for column in where.keys():
171				identifiers.append(sql.Identifier(column))
172				replacements.append(where[column])
173
174		query = sql.SQL(query).format(*identifiers)
175
176		rowcount = self.execute(query, replacements=replacements, commit=commit)
177		return rowcount
178
179	def delete(self, table, where, commit=True):
180		"""
181		Delete a database record
182
183		:param string table:  Table to delete from
184		:param dict where:  Simple conditions, parsed as "column1 = value1 AND column2 = value2" etc
185		:param bool commit:  Whether to commit after executing the query
186
187		:return int: Number of affected rows. Note that this may be unreliable if `commit` is `False`
188		"""
189		where_sql = []
190		replacements = []
191		for column in where.keys():
192			if type(where[column]) in (set, tuple, list):
193				where_sql.append("{} IN %s")
194				replacements.append(tuple(where[column]))
195			else:
196				where_sql.append("{} = %s")
197				replacements.append(where[column])
198
199		# build query
200		identifiers = [sql.Identifier(column) for column in where.keys()]
201		identifiers.insert(0, sql.Identifier(table))
202		query = sql.SQL("DELETE FROM {} WHERE " + " AND ".join(where_sql)).format(*identifiers)
203
204		rowcount = self.execute(query, replacements=replacements, commit=commit)
205		return rowcount
206
207	def insert(self, table, data, commit=True, safe=False, constraints=None, return_field=""):
208		"""
209		Create database record
210
211		:param string table:  Table to insert record into
212		:param dict data:   Data to insert
213		:param bool commit: Whether to commit after executing the query
214		:param bool safe: If set to `True`, "ON CONFLICT DO NOTHING" is added to the insert query, so it does not
215						  insert the row and no error is thrown when the insert violates a unique index or other constraint
216		:param tuple constraints: If `safe` is `True`, this tuple may contain the columns that should be used as a
217								  constraint, e.g. ON CONFLICT (name, lastname) DO NOTHING
218		:param str return_field: If not empty or None, this makes the method
219		return this field of the inserted row, instead of the number of
220		affected rows, with `RETURNING`.
221		:return int: Number of affected rows. Note that this may be unreliable if `commit` is `False`
222		"""
223		if constraints is None:
224			constraints = []
225
226		# escape identifiers
227		identifiers = [sql.Identifier(column) for column in data.keys()]
228		identifiers.insert(0, sql.Identifier(table))
229
230		# construct ON NOTHING bit of query
231		if safe:
232			safe_bit = " ON CONFLICT "
233			if constraints:
234				safe_bit += "(" + ", ".join(["{}" for each in constraints]) + ")"
235				identifiers.extend([sql.Identifier(column) for column in constraints])
236			safe_bit += " DO NOTHING"
237		else:
238			safe_bit = ""
239
240		# prepare parameter replacements
241		protoquery = "INSERT INTO {} (%s) VALUES %%s" % ", ".join(["{}" for column in data.keys()]) + safe_bit
242
243		if return_field:
244			protoquery += " RETURNING {}"
245			identifiers.append(sql.Identifier(return_field))
246
247		query = sql.SQL(protoquery).format(*identifiers)
248		replacements = (tuple(data.values()),)
249
250		cursor = self.get_cursor()
251		rowcount = self.execute(query, replacements=replacements, cursor=cursor, commit=commit, close_cursor=False)
252
253		result = rowcount if not return_field else cursor.fetchone()[return_field]
254		cursor.close()
255		return result
256
257	def upsert(self, table, data, commit=True, constraints=None):
258		"""
259		Create or update database record
260
261		If the record could not be inserted because of a constraint, the
262		constraining record is updated instead.
263
264		:param string table:  Table to upsert record into
265		:param dict data:   Data to upsert
266		:param bool commit: Whether to commit after executing the query
267		:param tuple constraints: This tuple may contain the columns that should be used as a
268								  constraint, e.g. ON CONFLICT (name, lastname) DO UPDATE
269		:return int: Number of affected rows. Note that this may be unreliable if `commit` is `False`
270		"""
271		if constraints is None:
272			constraints = []
273
274		# escape identifiers
275		identifiers = [sql.Identifier(column) for column in data.keys()]
276		identifiers.insert(0, sql.Identifier(table))
277
278		# prepare parameter replacements
279		protoquery = "INSERT INTO {} (%s) VALUES %%s" % ", ".join(["{}" for column in data.keys()])
280		protoquery += " ON CONFLICT"
281
282		if constraints:
283			protoquery += " (" + ", ".join(["{}" for each in constraints]) + ")"
284			identifiers.extend([sql.Identifier(column) for column in constraints])
285
286		protoquery += " DO UPDATE SET "
287		protoquery += ", ".join(["%s = EXCLUDED.%s" % (column, column) for column in data.keys()])
288		identifiers.extend(list(itertools.chain.from_iterable([[column, column] for column in data.keys()])))
289
290		query = sql.SQL(protoquery).format(*identifiers)
291		replacements = (tuple(data.values()),)
292
293		rowcount = self.execute(query, replacements=replacements, commit=commit)
294		return rowcount
295
296	def fetchall(self, query, *args):
297		"""
298		Fetch all rows for a query
299
300		:param string query:  Query
301		:param args: Replacement values
302		:param commit:  Commit transaction after query?
303		:return list: The result rows, as a list
304		"""
305		cursor = self._execute_query(query, *args)
306
307		try:
308			result = cursor.fetchall()
309		except AttributeError:
310			result = []
311		except psycopg2.ProgrammingError as e:
312			# there seems to be a bug with psycopg2 where it sometimes raises
313			# this for empty query results even though it shouldn't. this
314			# doesn't seem to indicate an actual problem so we catch the
315			# exception and return an empty list
316			self.rollback()
317			result = []
318			self.log.warning("Caught ProgrammingError: %s" % e)
319
320		cursor.close()
321		self.commit()
322
323		return result
324
325	def fetchone(self, query, *args):
326		"""
327		Fetch one result row
328
329		:param string query: Query
330		:param args: Replacement values
331		:param commit:  Commit transaction after query?
332		:return: The row, as a dictionary, or None if there were no rows
333		"""
334		cursor = self._execute_query(query, *args)
335
336		try:
337			result = cursor.fetchone()
338		except psycopg2.ProgrammingError as e:
339			# no results to fetch
340			self.rollback()
341			result = None
342			self.log.warning("Caught ProgrammingError: %s" % e)
343
344		cursor.close()
345		self.commit()
346
347		return result
348
349	def fetchall_interruptable(self, queue, query, *args):
350		"""
351		Fetch all rows for a query, allowing for interruption
352
353		Before running the query, a job is queued to cancel the query after a
354		set amount of time. The query is expected to complete before this
355		timeout. If the backend is interrupted, however, that job will be
356		executed immediately, to cancel the database query. If this happens, a
357		DatabaseQueryInterruptedException will be raised, but the database
358		object will otherwise remain useable.
359
360		Note that in the event that the cancellation job is run, all queries
361		for this instance of the database object will be cancelled. However,
362		there should never be more than one active query per connection within
363		4CAT.
364
365		:param JobQueue queue:  A job queue object, required to schedule the
366		query cancellation job
367		:param str query:  SQL query
368		:param list args:  Replacement variables
369		:param commit:  Commit transaction after query?
370		:return list:  A list of rows, as dictionaries
371		"""
372		# schedule a job that will cancel the query we're about to make
373		self.interruptable_job = queue.add_job("cancel-pg-query", details={}, remote_id=self.appname, claim_after=time.time() + self.interruptable_timeout)
374
375		# run the query
376		cursor = self.get_cursor()
377		try:
378			cursor = self._execute_query(query, cursor=cursor, *args)
379		except psycopg2.extensions.QueryCanceledError:
380			# interrupted with cancellation worker (or manually)
381			self.log.debug("Query in connection %s was interrupted..." % self.appname)
382			self.rollback()
383			cursor.close()
384			raise DatabaseQueryInterruptedException("Interrupted while querying database")
385
386		# collect results
387		try:
388			result = cursor.fetchall()
389		except AttributeError:
390			result = []
391		except psycopg2.ProgrammingError as e:
392			result = []
393			self.log.warning("Caught ProgrammingError: %s" % e)
394
395		# clean up cancelling job when we have the data
396		self.interruptable_job.finish()
397		self.interruptable_job = None
398
399		cursor.close()
400		self.commit()
401
402		return result
403
404
405	def commit(self):
406		"""
407		Commit the current transaction
408
409		This is required for UPDATE etc to stick.
410		"""
411		self.connection.commit()
412
413	def rollback(self):
414		"""
415		Roll back the current transaction
416		"""
417		self.connection.rollback()
418
419	def close(self):
420		"""
421		Close connection
422
423		Running queries after this is probably a bad idea!
424		"""
425		self.connection.close()
426
427	def get_cursor(self):
428		"""
429		Get a new cursor
430
431		Re-using cursors seems to give issues when using per-thread
432		connections, so simply instantiate a new one each time
433
434		:return: Cursor
435		"""
436		try:
437			return self.connection.cursor(cursor_factory=psycopg2.extras.RealDictCursor)
438		except (psycopg2.InterfaceError, psycopg2.OperationalError) as e:
439			self.log.warning(f"Database Exception: {e}\nReconnecting and retrying query...")
440			self.reconnect()
441			return self.connection.cursor(cursor_factory=psycopg2.extras.RealDictCursor)
class Database:
 16class Database:
 17	"""
 18	Simple database handler
 19
 20	Offers a number of abstraction methods that limit how much SQL one is
 21	required to write. Also makes the database connection mostly multithreading
 22	proof by instantiating a new cursor for each query (and closing it afterwards)
 23	"""
 24	cursor = None
 25	log = None
 26	appname=""
 27
 28	interrupted = False
 29	interruptable_timeout = 86400  # if a query takes this long, it should be cancelled. see also fetchall_interruptable()
 30	interruptable_job = None
 31
 32	def __init__(self, logger, dbname=None, user=None, password=None, host=None, port=None, appname=None):
 33		"""
 34		Set up database connection
 35
 36		:param logger:  Logger instance
 37		:param dbname:  Database name
 38		:param user:  Database username
 39		:param password:  Database password
 40		:param host:  Database server address
 41		:param port:  Database port
 42		:param appname:  App name, mostly useful to trace connections in pg_stat_activity
 43		"""
 44		self.appname = "4CAT" if not appname else "4CAT-%s" % appname
 45
 46		self.connection = psycopg2.connect(dbname=dbname, user=user, password=password, host=host, port=port, application_name=self.appname)
 47		self.cursor = self.connection.cursor(cursor_factory=psycopg2.extras.RealDictCursor)
 48		self.log = logger
 49
 50		if self.log is None:
 51			self.log = logging
 52
 53	def reconnect(self, tries=3, wait=10):
 54		"""
 55		Reconnect to the database
 56
 57		:param int tries: Number of tries to reconnect
 58        :param int wait: Time to wait between tries (first try is immediate)
 59		"""
 60		for i in range(tries):
 61			try:
 62				self.connection = psycopg2.connect(dbname=self.connection.info.dbname,
 63												   user=self.connection.info.user,
 64												   password=self.connection.info.password,
 65												   host=self.connection.info.host,
 66												   port=self.connection.info.port,
 67												   application_name=self.appname)
 68				self.cursor = self.connection.cursor(cursor_factory=psycopg2.extras.RealDictCursor)
 69				return
 70			except (psycopg2.InterfaceError, psycopg2.OperationalError) as e:
 71				self.log.warning(f"Database connection closed. Reconnecting...\n{e}")
 72				time.sleep(wait)
 73		self.log.error("Failed to reconnect to database after %d tries" % tries)
 74
 75	def _execute_query(self, query, replacements=None, cursor=None):
 76		"""
 77		Execute a query
 78
 79		Simple wrapper to get a cursor to execute a query with. Do not call
 80		from outside this class - use `execute()` instead.
 81
 82		:param string query: Query
 83		:param args: Replacement values
 84		:param cursor: Cursor to use. Default - use common cursor
 85		:return None:
 86		"""
 87		if not cursor:
 88			cursor = self.get_cursor()
 89
 90		self.log.debug("Executing query %s" % cursor.mogrify(query, replacements))
 91		try:
 92			cursor.execute(query, replacements)
 93		except (psycopg2.InterfaceError, psycopg2.OperationalError) as e:
 94			self.log.warning(f"Database Exception: {e}\nReconnecting and retrying query...")
 95			self.reconnect()
 96			cursor = self.get_cursor()
 97			cursor.execute(query, replacements)
 98		return cursor
 99
100	def execute(self, query, replacements=None, commit=True, cursor=None, close_cursor=True):
101		"""
102		Execute a query, and commit afterwards
103
104		This is required for UPDATE/INSERT/DELETE/etc to stick!
105
106		:param string query:  Query
107		:param cursor: Cursor to use. By default, use the result of
108		`get_cursor()`.
109		:param replacements: Replacement values
110		:param bool commit:  Commit transaction after query?
111		:param bool close_cursor:  Close cursor after query?
112		"""
113		cursor = self._execute_query(query, replacements, cursor)
114
115		if commit:
116			self.commit()
117
118		rowcount = cursor.rowcount
119
120		if close_cursor:
121			cursor.close()
122
123		return rowcount
124
125	def execute_many(self, query, commit=True, replacements=None):
126		"""
127		Execute a query multiple times, each time with different values
128
129		This makes it particularly suitable for INSERT queries, but other types
130		of query using VALUES are possible too.
131
132		:param string query:  Query
133		:param replacements: A list of replacement values
134		:param commit:  Commit transaction after query?
135		"""
136		cursor = self.get_cursor()
137		try:
138			execute_values(cursor, query, replacements)
139		except (psycopg2.InterfaceError, psycopg2.OperationalError) as e:
140			self.log.warning(f"Database Exception: {e}\nReconnecting and retrying query...")
141			self.reconnect()
142			cursor = self.get_cursor()
143			execute_values(cursor, query, replacements)
144
145		cursor.close()
146		if commit:
147			self.commit()
148
149	def update(self, table, data, where=None, commit=True):
150		"""
151		Update a database record
152
153		:param string table:  Table to update
154		:param dict data:  Data to set, Column => Value
155		:param dict where:  Simple conditions, parsed as "column1 = value1 AND column2 = value2" etc
156		:param bool commit:  Whether to commit after executing the query
157
158		:return int: Number of affected rows. Note that this may be unreliable if `commit` is `False`
159		"""
160		if where is None:
161			where = {}
162
163		# build query
164		identifiers = [sql.Identifier(column) for column in data.keys()]
165		identifiers.insert(0, sql.Identifier(table))
166		replacements = list(data.values())
167
168		query = "UPDATE {} SET " + ", ".join(["{} = %s" for column in data])
169		if where:
170			query += " WHERE " + " AND ".join(["{} = %s" for column in where])
171			for column in where.keys():
172				identifiers.append(sql.Identifier(column))
173				replacements.append(where[column])
174
175		query = sql.SQL(query).format(*identifiers)
176
177		rowcount = self.execute(query, replacements=replacements, commit=commit)
178		return rowcount
179
180	def delete(self, table, where, commit=True):
181		"""
182		Delete a database record
183
184		:param string table:  Table to delete from
185		:param dict where:  Simple conditions, parsed as "column1 = value1 AND column2 = value2" etc
186		:param bool commit:  Whether to commit after executing the query
187
188		:return int: Number of affected rows. Note that this may be unreliable if `commit` is `False`
189		"""
190		where_sql = []
191		replacements = []
192		for column in where.keys():
193			if type(where[column]) in (set, tuple, list):
194				where_sql.append("{} IN %s")
195				replacements.append(tuple(where[column]))
196			else:
197				where_sql.append("{} = %s")
198				replacements.append(where[column])
199
200		# build query
201		identifiers = [sql.Identifier(column) for column in where.keys()]
202		identifiers.insert(0, sql.Identifier(table))
203		query = sql.SQL("DELETE FROM {} WHERE " + " AND ".join(where_sql)).format(*identifiers)
204
205		rowcount = self.execute(query, replacements=replacements, commit=commit)
206		return rowcount
207
208	def insert(self, table, data, commit=True, safe=False, constraints=None, return_field=""):
209		"""
210		Create database record
211
212		:param string table:  Table to insert record into
213		:param dict data:   Data to insert
214		:param bool commit: Whether to commit after executing the query
215		:param bool safe: If set to `True`, "ON CONFLICT DO NOTHING" is added to the insert query, so it does not
216						  insert the row and no error is thrown when the insert violates a unique index or other constraint
217		:param tuple constraints: If `safe` is `True`, this tuple may contain the columns that should be used as a
218								  constraint, e.g. ON CONFLICT (name, lastname) DO NOTHING
219		:param str return_field: If not empty or None, this makes the method
220		return this field of the inserted row, instead of the number of
221		affected rows, with `RETURNING`.
222		:return int: Number of affected rows. Note that this may be unreliable if `commit` is `False`
223		"""
224		if constraints is None:
225			constraints = []
226
227		# escape identifiers
228		identifiers = [sql.Identifier(column) for column in data.keys()]
229		identifiers.insert(0, sql.Identifier(table))
230
231		# construct ON NOTHING bit of query
232		if safe:
233			safe_bit = " ON CONFLICT "
234			if constraints:
235				safe_bit += "(" + ", ".join(["{}" for each in constraints]) + ")"
236				identifiers.extend([sql.Identifier(column) for column in constraints])
237			safe_bit += " DO NOTHING"
238		else:
239			safe_bit = ""
240
241		# prepare parameter replacements
242		protoquery = "INSERT INTO {} (%s) VALUES %%s" % ", ".join(["{}" for column in data.keys()]) + safe_bit
243
244		if return_field:
245			protoquery += " RETURNING {}"
246			identifiers.append(sql.Identifier(return_field))
247
248		query = sql.SQL(protoquery).format(*identifiers)
249		replacements = (tuple(data.values()),)
250
251		cursor = self.get_cursor()
252		rowcount = self.execute(query, replacements=replacements, cursor=cursor, commit=commit, close_cursor=False)
253
254		result = rowcount if not return_field else cursor.fetchone()[return_field]
255		cursor.close()
256		return result
257
258	def upsert(self, table, data, commit=True, constraints=None):
259		"""
260		Create or update database record
261
262		If the record could not be inserted because of a constraint, the
263		constraining record is updated instead.
264
265		:param string table:  Table to upsert record into
266		:param dict data:   Data to upsert
267		:param bool commit: Whether to commit after executing the query
268		:param tuple constraints: This tuple may contain the columns that should be used as a
269								  constraint, e.g. ON CONFLICT (name, lastname) DO UPDATE
270		:return int: Number of affected rows. Note that this may be unreliable if `commit` is `False`
271		"""
272		if constraints is None:
273			constraints = []
274
275		# escape identifiers
276		identifiers = [sql.Identifier(column) for column in data.keys()]
277		identifiers.insert(0, sql.Identifier(table))
278
279		# prepare parameter replacements
280		protoquery = "INSERT INTO {} (%s) VALUES %%s" % ", ".join(["{}" for column in data.keys()])
281		protoquery += " ON CONFLICT"
282
283		if constraints:
284			protoquery += " (" + ", ".join(["{}" for each in constraints]) + ")"
285			identifiers.extend([sql.Identifier(column) for column in constraints])
286
287		protoquery += " DO UPDATE SET "
288		protoquery += ", ".join(["%s = EXCLUDED.%s" % (column, column) for column in data.keys()])
289		identifiers.extend(list(itertools.chain.from_iterable([[column, column] for column in data.keys()])))
290
291		query = sql.SQL(protoquery).format(*identifiers)
292		replacements = (tuple(data.values()),)
293
294		rowcount = self.execute(query, replacements=replacements, commit=commit)
295		return rowcount
296
297	def fetchall(self, query, *args):
298		"""
299		Fetch all rows for a query
300
301		:param string query:  Query
302		:param args: Replacement values
303		:param commit:  Commit transaction after query?
304		:return list: The result rows, as a list
305		"""
306		cursor = self._execute_query(query, *args)
307
308		try:
309			result = cursor.fetchall()
310		except AttributeError:
311			result = []
312		except psycopg2.ProgrammingError as e:
313			# there seems to be a bug with psycopg2 where it sometimes raises
314			# this for empty query results even though it shouldn't. this
315			# doesn't seem to indicate an actual problem so we catch the
316			# exception and return an empty list
317			self.rollback()
318			result = []
319			self.log.warning("Caught ProgrammingError: %s" % e)
320
321		cursor.close()
322		self.commit()
323
324		return result
325
326	def fetchone(self, query, *args):
327		"""
328		Fetch one result row
329
330		:param string query: Query
331		:param args: Replacement values
332		:param commit:  Commit transaction after query?
333		:return: The row, as a dictionary, or None if there were no rows
334		"""
335		cursor = self._execute_query(query, *args)
336
337		try:
338			result = cursor.fetchone()
339		except psycopg2.ProgrammingError as e:
340			# no results to fetch
341			self.rollback()
342			result = None
343			self.log.warning("Caught ProgrammingError: %s" % e)
344
345		cursor.close()
346		self.commit()
347
348		return result
349
350	def fetchall_interruptable(self, queue, query, *args):
351		"""
352		Fetch all rows for a query, allowing for interruption
353
354		Before running the query, a job is queued to cancel the query after a
355		set amount of time. The query is expected to complete before this
356		timeout. If the backend is interrupted, however, that job will be
357		executed immediately, to cancel the database query. If this happens, a
358		DatabaseQueryInterruptedException will be raised, but the database
359		object will otherwise remain useable.
360
361		Note that in the event that the cancellation job is run, all queries
362		for this instance of the database object will be cancelled. However,
363		there should never be more than one active query per connection within
364		4CAT.
365
366		:param JobQueue queue:  A job queue object, required to schedule the
367		query cancellation job
368		:param str query:  SQL query
369		:param list args:  Replacement variables
370		:param commit:  Commit transaction after query?
371		:return list:  A list of rows, as dictionaries
372		"""
373		# schedule a job that will cancel the query we're about to make
374		self.interruptable_job = queue.add_job("cancel-pg-query", details={}, remote_id=self.appname, claim_after=time.time() + self.interruptable_timeout)
375
376		# run the query
377		cursor = self.get_cursor()
378		try:
379			cursor = self._execute_query(query, cursor=cursor, *args)
380		except psycopg2.extensions.QueryCanceledError:
381			# interrupted with cancellation worker (or manually)
382			self.log.debug("Query in connection %s was interrupted..." % self.appname)
383			self.rollback()
384			cursor.close()
385			raise DatabaseQueryInterruptedException("Interrupted while querying database")
386
387		# collect results
388		try:
389			result = cursor.fetchall()
390		except AttributeError:
391			result = []
392		except psycopg2.ProgrammingError as e:
393			result = []
394			self.log.warning("Caught ProgrammingError: %s" % e)
395
396		# clean up cancelling job when we have the data
397		self.interruptable_job.finish()
398		self.interruptable_job = None
399
400		cursor.close()
401		self.commit()
402
403		return result
404
405
406	def commit(self):
407		"""
408		Commit the current transaction
409
410		This is required for UPDATE etc to stick.
411		"""
412		self.connection.commit()
413
414	def rollback(self):
415		"""
416		Roll back the current transaction
417		"""
418		self.connection.rollback()
419
420	def close(self):
421		"""
422		Close connection
423
424		Running queries after this is probably a bad idea!
425		"""
426		self.connection.close()
427
428	def get_cursor(self):
429		"""
430		Get a new cursor
431
432		Re-using cursors seems to give issues when using per-thread
433		connections, so simply instantiate a new one each time
434
435		:return: Cursor
436		"""
437		try:
438			return self.connection.cursor(cursor_factory=psycopg2.extras.RealDictCursor)
439		except (psycopg2.InterfaceError, psycopg2.OperationalError) as e:
440			self.log.warning(f"Database Exception: {e}\nReconnecting and retrying query...")
441			self.reconnect()
442			return self.connection.cursor(cursor_factory=psycopg2.extras.RealDictCursor)

Simple database handler

Offers a number of abstraction methods that limit how much SQL one is required to write. Also makes the database connection mostly multithreading proof by instantiating a new cursor for each query (and closing it afterwards)

Database( logger, dbname=None, user=None, password=None, host=None, port=None, appname=None)
32	def __init__(self, logger, dbname=None, user=None, password=None, host=None, port=None, appname=None):
33		"""
34		Set up database connection
35
36		:param logger:  Logger instance
37		:param dbname:  Database name
38		:param user:  Database username
39		:param password:  Database password
40		:param host:  Database server address
41		:param port:  Database port
42		:param appname:  App name, mostly useful to trace connections in pg_stat_activity
43		"""
44		self.appname = "4CAT" if not appname else "4CAT-%s" % appname
45
46		self.connection = psycopg2.connect(dbname=dbname, user=user, password=password, host=host, port=port, application_name=self.appname)
47		self.cursor = self.connection.cursor(cursor_factory=psycopg2.extras.RealDictCursor)
48		self.log = logger
49
50		if self.log is None:
51			self.log = logging

Set up database connection

Parameters
  • logger: Logger instance
  • dbname: Database name
  • user: Database username
  • password: Database password
  • host: Database server address
  • port: Database port
  • appname: App name, mostly useful to trace connections in pg_stat_activity
cursor = None
log = None
appname = ''
interrupted = False
interruptable_timeout = 86400
interruptable_job = None
connection
def reconnect(self, tries=3, wait=10):
53	def reconnect(self, tries=3, wait=10):
54		"""
55		Reconnect to the database
56
57		:param int tries: Number of tries to reconnect
58        :param int wait: Time to wait between tries (first try is immediate)
59		"""
60		for i in range(tries):
61			try:
62				self.connection = psycopg2.connect(dbname=self.connection.info.dbname,
63												   user=self.connection.info.user,
64												   password=self.connection.info.password,
65												   host=self.connection.info.host,
66												   port=self.connection.info.port,
67												   application_name=self.appname)
68				self.cursor = self.connection.cursor(cursor_factory=psycopg2.extras.RealDictCursor)
69				return
70			except (psycopg2.InterfaceError, psycopg2.OperationalError) as e:
71				self.log.warning(f"Database connection closed. Reconnecting...\n{e}")
72				time.sleep(wait)
73		self.log.error("Failed to reconnect to database after %d tries" % tries)

Reconnect to the database

Parameters
  • int tries: Number of tries to reconnect
  • int wait: Time to wait between tries (first try is immediate)
def execute( self, query, replacements=None, commit=True, cursor=None, close_cursor=True):
100	def execute(self, query, replacements=None, commit=True, cursor=None, close_cursor=True):
101		"""
102		Execute a query, and commit afterwards
103
104		This is required for UPDATE/INSERT/DELETE/etc to stick!
105
106		:param string query:  Query
107		:param cursor: Cursor to use. By default, use the result of
108		`get_cursor()`.
109		:param replacements: Replacement values
110		:param bool commit:  Commit transaction after query?
111		:param bool close_cursor:  Close cursor after query?
112		"""
113		cursor = self._execute_query(query, replacements, cursor)
114
115		if commit:
116			self.commit()
117
118		rowcount = cursor.rowcount
119
120		if close_cursor:
121			cursor.close()
122
123		return rowcount

Execute a query, and commit afterwards

This is required for UPDATE/INSERT/DELETE/etc to stick!

Parameters
  • string query: Query
  • cursor: Cursor to use. By default, use the result of get_cursor().
  • replacements: Replacement values
  • bool commit: Commit transaction after query?
  • bool close_cursor: Close cursor after query?
def execute_many(self, query, commit=True, replacements=None):
125	def execute_many(self, query, commit=True, replacements=None):
126		"""
127		Execute a query multiple times, each time with different values
128
129		This makes it particularly suitable for INSERT queries, but other types
130		of query using VALUES are possible too.
131
132		:param string query:  Query
133		:param replacements: A list of replacement values
134		:param commit:  Commit transaction after query?
135		"""
136		cursor = self.get_cursor()
137		try:
138			execute_values(cursor, query, replacements)
139		except (psycopg2.InterfaceError, psycopg2.OperationalError) as e:
140			self.log.warning(f"Database Exception: {e}\nReconnecting and retrying query...")
141			self.reconnect()
142			cursor = self.get_cursor()
143			execute_values(cursor, query, replacements)
144
145		cursor.close()
146		if commit:
147			self.commit()

Execute a query multiple times, each time with different values

This makes it particularly suitable for INSERT queries, but other types of query using VALUES are possible too.

Parameters
  • string query: Query
  • replacements: A list of replacement values
  • commit: Commit transaction after query?
def update(self, table, data, where=None, commit=True):
149	def update(self, table, data, where=None, commit=True):
150		"""
151		Update a database record
152
153		:param string table:  Table to update
154		:param dict data:  Data to set, Column => Value
155		:param dict where:  Simple conditions, parsed as "column1 = value1 AND column2 = value2" etc
156		:param bool commit:  Whether to commit after executing the query
157
158		:return int: Number of affected rows. Note that this may be unreliable if `commit` is `False`
159		"""
160		if where is None:
161			where = {}
162
163		# build query
164		identifiers = [sql.Identifier(column) for column in data.keys()]
165		identifiers.insert(0, sql.Identifier(table))
166		replacements = list(data.values())
167
168		query = "UPDATE {} SET " + ", ".join(["{} = %s" for column in data])
169		if where:
170			query += " WHERE " + " AND ".join(["{} = %s" for column in where])
171			for column in where.keys():
172				identifiers.append(sql.Identifier(column))
173				replacements.append(where[column])
174
175		query = sql.SQL(query).format(*identifiers)
176
177		rowcount = self.execute(query, replacements=replacements, commit=commit)
178		return rowcount

Update a database record

Parameters
  • string table: Table to update
  • dict data: Data to set, Column => Value
  • dict where: Simple conditions, parsed as "column1 = value1 AND column2 = value2" etc
  • bool commit: Whether to commit after executing the query
Returns

Number of affected rows. Note that this may be unreliable if commit is False

def delete(self, table, where, commit=True):
180	def delete(self, table, where, commit=True):
181		"""
182		Delete a database record
183
184		:param string table:  Table to delete from
185		:param dict where:  Simple conditions, parsed as "column1 = value1 AND column2 = value2" etc
186		:param bool commit:  Whether to commit after executing the query
187
188		:return int: Number of affected rows. Note that this may be unreliable if `commit` is `False`
189		"""
190		where_sql = []
191		replacements = []
192		for column in where.keys():
193			if type(where[column]) in (set, tuple, list):
194				where_sql.append("{} IN %s")
195				replacements.append(tuple(where[column]))
196			else:
197				where_sql.append("{} = %s")
198				replacements.append(where[column])
199
200		# build query
201		identifiers = [sql.Identifier(column) for column in where.keys()]
202		identifiers.insert(0, sql.Identifier(table))
203		query = sql.SQL("DELETE FROM {} WHERE " + " AND ".join(where_sql)).format(*identifiers)
204
205		rowcount = self.execute(query, replacements=replacements, commit=commit)
206		return rowcount

Delete a database record

Parameters
  • string table: Table to delete from
  • dict where: Simple conditions, parsed as "column1 = value1 AND column2 = value2" etc
  • bool commit: Whether to commit after executing the query
Returns

Number of affected rows. Note that this may be unreliable if commit is False

def insert( self, table, data, commit=True, safe=False, constraints=None, return_field=''):
208	def insert(self, table, data, commit=True, safe=False, constraints=None, return_field=""):
209		"""
210		Create database record
211
212		:param string table:  Table to insert record into
213		:param dict data:   Data to insert
214		:param bool commit: Whether to commit after executing the query
215		:param bool safe: If set to `True`, "ON CONFLICT DO NOTHING" is added to the insert query, so it does not
216						  insert the row and no error is thrown when the insert violates a unique index or other constraint
217		:param tuple constraints: If `safe` is `True`, this tuple may contain the columns that should be used as a
218								  constraint, e.g. ON CONFLICT (name, lastname) DO NOTHING
219		:param str return_field: If not empty or None, this makes the method
220		return this field of the inserted row, instead of the number of
221		affected rows, with `RETURNING`.
222		:return int: Number of affected rows. Note that this may be unreliable if `commit` is `False`
223		"""
224		if constraints is None:
225			constraints = []
226
227		# escape identifiers
228		identifiers = [sql.Identifier(column) for column in data.keys()]
229		identifiers.insert(0, sql.Identifier(table))
230
231		# construct ON NOTHING bit of query
232		if safe:
233			safe_bit = " ON CONFLICT "
234			if constraints:
235				safe_bit += "(" + ", ".join(["{}" for each in constraints]) + ")"
236				identifiers.extend([sql.Identifier(column) for column in constraints])
237			safe_bit += " DO NOTHING"
238		else:
239			safe_bit = ""
240
241		# prepare parameter replacements
242		protoquery = "INSERT INTO {} (%s) VALUES %%s" % ", ".join(["{}" for column in data.keys()]) + safe_bit
243
244		if return_field:
245			protoquery += " RETURNING {}"
246			identifiers.append(sql.Identifier(return_field))
247
248		query = sql.SQL(protoquery).format(*identifiers)
249		replacements = (tuple(data.values()),)
250
251		cursor = self.get_cursor()
252		rowcount = self.execute(query, replacements=replacements, cursor=cursor, commit=commit, close_cursor=False)
253
254		result = rowcount if not return_field else cursor.fetchone()[return_field]
255		cursor.close()
256		return result

Create database record

Parameters
  • string table: Table to insert record into
  • dict data: Data to insert
  • bool commit: Whether to commit after executing the query
  • bool safe: If set to True, "ON CONFLICT DO NOTHING" is added to the insert query, so it does not insert the row and no error is thrown when the insert violates a unique index or other constraint
  • tuple constraints: If safe is True, this tuple may contain the columns that should be used as a constraint, e.g. ON CONFLICT (name, lastname) DO NOTHING
  • str return_field: If not empty or None, this makes the method return this field of the inserted row, instead of the number of affected rows, with RETURNING.
Returns

Number of affected rows. Note that this may be unreliable if commit is False

def upsert(self, table, data, commit=True, constraints=None):
258	def upsert(self, table, data, commit=True, constraints=None):
259		"""
260		Create or update database record
261
262		If the record could not be inserted because of a constraint, the
263		constraining record is updated instead.
264
265		:param string table:  Table to upsert record into
266		:param dict data:   Data to upsert
267		:param bool commit: Whether to commit after executing the query
268		:param tuple constraints: This tuple may contain the columns that should be used as a
269								  constraint, e.g. ON CONFLICT (name, lastname) DO UPDATE
270		:return int: Number of affected rows. Note that this may be unreliable if `commit` is `False`
271		"""
272		if constraints is None:
273			constraints = []
274
275		# escape identifiers
276		identifiers = [sql.Identifier(column) for column in data.keys()]
277		identifiers.insert(0, sql.Identifier(table))
278
279		# prepare parameter replacements
280		protoquery = "INSERT INTO {} (%s) VALUES %%s" % ", ".join(["{}" for column in data.keys()])
281		protoquery += " ON CONFLICT"
282
283		if constraints:
284			protoquery += " (" + ", ".join(["{}" for each in constraints]) + ")"
285			identifiers.extend([sql.Identifier(column) for column in constraints])
286
287		protoquery += " DO UPDATE SET "
288		protoquery += ", ".join(["%s = EXCLUDED.%s" % (column, column) for column in data.keys()])
289		identifiers.extend(list(itertools.chain.from_iterable([[column, column] for column in data.keys()])))
290
291		query = sql.SQL(protoquery).format(*identifiers)
292		replacements = (tuple(data.values()),)
293
294		rowcount = self.execute(query, replacements=replacements, commit=commit)
295		return rowcount

Create or update database record

If the record could not be inserted because of a constraint, the constraining record is updated instead.

Parameters
  • string table: Table to upsert record into
  • dict data: Data to upsert
  • bool commit: Whether to commit after executing the query
  • tuple constraints: This tuple may contain the columns that should be used as a constraint, e.g. ON CONFLICT (name, lastname) DO UPDATE
Returns

Number of affected rows. Note that this may be unreliable if commit is False

def fetchall(self, query, *args):
297	def fetchall(self, query, *args):
298		"""
299		Fetch all rows for a query
300
301		:param string query:  Query
302		:param args: Replacement values
303		:param commit:  Commit transaction after query?
304		:return list: The result rows, as a list
305		"""
306		cursor = self._execute_query(query, *args)
307
308		try:
309			result = cursor.fetchall()
310		except AttributeError:
311			result = []
312		except psycopg2.ProgrammingError as e:
313			# there seems to be a bug with psycopg2 where it sometimes raises
314			# this for empty query results even though it shouldn't. this
315			# doesn't seem to indicate an actual problem so we catch the
316			# exception and return an empty list
317			self.rollback()
318			result = []
319			self.log.warning("Caught ProgrammingError: %s" % e)
320
321		cursor.close()
322		self.commit()
323
324		return result

Fetch all rows for a query

Parameters
  • string query: Query
  • args: Replacement values
  • commit: Commit transaction after query?
Returns

The result rows, as a list

def fetchone(self, query, *args):
326	def fetchone(self, query, *args):
327		"""
328		Fetch one result row
329
330		:param string query: Query
331		:param args: Replacement values
332		:param commit:  Commit transaction after query?
333		:return: The row, as a dictionary, or None if there were no rows
334		"""
335		cursor = self._execute_query(query, *args)
336
337		try:
338			result = cursor.fetchone()
339		except psycopg2.ProgrammingError as e:
340			# no results to fetch
341			self.rollback()
342			result = None
343			self.log.warning("Caught ProgrammingError: %s" % e)
344
345		cursor.close()
346		self.commit()
347
348		return result

Fetch one result row

Parameters
  • string query: Query
  • args: Replacement values
  • commit: Commit transaction after query?
Returns

The row, as a dictionary, or None if there were no rows

def fetchall_interruptable(self, queue, query, *args):
350	def fetchall_interruptable(self, queue, query, *args):
351		"""
352		Fetch all rows for a query, allowing for interruption
353
354		Before running the query, a job is queued to cancel the query after a
355		set amount of time. The query is expected to complete before this
356		timeout. If the backend is interrupted, however, that job will be
357		executed immediately, to cancel the database query. If this happens, a
358		DatabaseQueryInterruptedException will be raised, but the database
359		object will otherwise remain useable.
360
361		Note that in the event that the cancellation job is run, all queries
362		for this instance of the database object will be cancelled. However,
363		there should never be more than one active query per connection within
364		4CAT.
365
366		:param JobQueue queue:  A job queue object, required to schedule the
367		query cancellation job
368		:param str query:  SQL query
369		:param list args:  Replacement variables
370		:param commit:  Commit transaction after query?
371		:return list:  A list of rows, as dictionaries
372		"""
373		# schedule a job that will cancel the query we're about to make
374		self.interruptable_job = queue.add_job("cancel-pg-query", details={}, remote_id=self.appname, claim_after=time.time() + self.interruptable_timeout)
375
376		# run the query
377		cursor = self.get_cursor()
378		try:
379			cursor = self._execute_query(query, cursor=cursor, *args)
380		except psycopg2.extensions.QueryCanceledError:
381			# interrupted with cancellation worker (or manually)
382			self.log.debug("Query in connection %s was interrupted..." % self.appname)
383			self.rollback()
384			cursor.close()
385			raise DatabaseQueryInterruptedException("Interrupted while querying database")
386
387		# collect results
388		try:
389			result = cursor.fetchall()
390		except AttributeError:
391			result = []
392		except psycopg2.ProgrammingError as e:
393			result = []
394			self.log.warning("Caught ProgrammingError: %s" % e)
395
396		# clean up cancelling job when we have the data
397		self.interruptable_job.finish()
398		self.interruptable_job = None
399
400		cursor.close()
401		self.commit()
402
403		return result

Fetch all rows for a query, allowing for interruption

Before running the query, a job is queued to cancel the query after a set amount of time. The query is expected to complete before this timeout. If the backend is interrupted, however, that job will be executed immediately, to cancel the database query. If this happens, a DatabaseQueryInterruptedException will be raised, but the database object will otherwise remain useable.

Note that in the event that the cancellation job is run, all queries for this instance of the database object will be cancelled. However, there should never be more than one active query per connection within 4CAT.

Parameters
  • JobQueue queue: A job queue object, required to schedule the query cancellation job
  • str query: SQL query
  • list args: Replacement variables
  • commit: Commit transaction after query?
Returns

A list of rows, as dictionaries

def commit(self):
	def commit(self):
		"""
		Commit the current transaction

		This is required for UPDATE etc to stick.
		"""
		# delegate directly to the underlying psycopg2 connection
		self.connection.commit()

Commit the current transaction

This is required for UPDATE etc to stick.

def rollback(self):
	def rollback(self):
		"""
		Roll back the current transaction

		Discards any uncommitted changes and clears an aborted transaction
		state on the connection.
		"""
		self.connection.rollback()

Roll back the current transaction

def close(self):
	def close(self):
		"""
		Close connection

		Running queries after this is probably a bad idea!
		"""
		# note: does not commit; any pending transaction is discarded
		self.connection.close()

Close connection

Running queries after this is probably a bad idea!

def get_cursor(self):
428	def get_cursor(self):
429		"""
430		Get a new cursor
431
432		Re-using cursors seems to give issues when using per-thread
433		connections, so simply instantiate a new one each time
434
435		:return: Cursor
436		"""
437		try:
438			return self.connection.cursor(cursor_factory=psycopg2.extras.RealDictCursor)
439		except (psycopg2.InterfaceError, psycopg2.OperationalError) as e:
440			self.log.warning(f"Database Exception: {e}\nReconnecting and retrying query...")
441			self.reconnect()
442			return self.connection.cursor(cursor_factory=psycopg2.extras.RealDictCursor)

Get a new cursor

Re-using cursors seems to give issues when using per-thread connections, so simply instantiate a new one each time

Returns

Cursor