Edit on GitHub

common.lib.database

Database wrapper

  1"""
  2Database wrapper
  3"""
  4import itertools
  5import psycopg2.extras
  6import psycopg2
  7import logging
  8import time
  9
 10from psycopg2 import sql
 11from psycopg2.extras import execute_values
 12
 13from common.lib.exceptions import DatabaseQueryInterruptedException
 14
 15class Database:
 16	"""
 17	Simple database handler
 18
 19	Offers a number of abstraction methods that limit how much SQL one is
 20	required to write. Also makes the database connection mostly multithreading
 21	proof by instantiating a new cursor for each query (and closing it afterwards)
 22	"""
 23	cursor = None
 24	log = None
 25	appname=""
 26
 27	interrupted = False
 28	interruptable_timeout = 86400  # if a query takes this long, it should be cancelled. see also fetchall_interruptable()
 29	interruptable_job = None
 30
 31	def __init__(self, logger, dbname=None, user=None, password=None, host=None, port=None, appname=None):
 32		"""
 33		Set up database connection
 34
 35		:param logger:  Logger instance
 36		:param dbname:  Database name
 37		:param user:  Database username
 38		:param password:  Database password
 39		:param host:  Database server address
 40		:param port:  Database port
 41		:param appname:  App name, mostly useful to trace connections in pg_stat_activity
 42		"""
 43		self.appname = "4CAT" if not appname else "4CAT-%s" % appname
 44
 45		self.connection = psycopg2.connect(dbname=dbname, user=user, password=password, host=host, port=port, application_name=self.appname)
 46		self.cursor = self.connection.cursor(cursor_factory=psycopg2.extras.RealDictCursor)
 47		self.log = logger
 48
 49		if self.log is None:
 50			self.log = logging
 51
 52		self.commit()
 53
 54	def query(self, query, replacements=None, cursor=None):
 55		"""
 56		Execute a query
 57
 58		:param string query: Query
 59		:param args: Replacement values
 60		:param cursor: Cursor to use. Default - use common cursor
 61		:return None:
 62		"""
 63		if not cursor:
 64			cursor = self.get_cursor()
 65
 66		self.log.debug("Executing query %s" % self.cursor.mogrify(query, replacements))
 67
 68		return cursor.execute(query, replacements)
 69
 70	def execute(self, query, replacements=None):
 71		"""
 72		Execute a query, and commit afterwards
 73
 74		This is required for UPDATE/INSERT/DELETE/etc to stick
 75		:param string query:  Query
 76		:param replacements: Replacement values
 77		"""
 78		cursor = self.get_cursor()
 79
 80		self.log.debug("Executing query %s" % self.cursor.mogrify(query, replacements))
 81		cursor.execute(query, replacements)
 82		self.commit()
 83
 84		cursor.close()
 85
 86	def execute_many(self, query, commit=True, replacements=None):
 87		"""
 88		Execute a query multiple times, each time with different values
 89
 90		This makes it particularly suitable for INSERT queries, but other types
 91		of query using VALUES are possible too.
 92
 93		:param string query:  Query
 94		:param replacements: A list of replacement values
 95		:param commit:  Commit transaction after query?
 96		"""
 97		cursor = self.get_cursor()
 98		execute_values(cursor, query, replacements)
 99		cursor.close()
100		if commit:
101			self.commit()
102
103	def update(self, table, data, where=None, commit=True):
104		"""
105		Update a database record
106
107		:param string table:  Table to update
108		:param dict where:  Simple conditions, parsed as "column1 = value1 AND column2 = value2" etc
109		:param dict data:  Data to set, Column => Value
110		:param bool commit:  Whether to commit after executing the query
111
112		:return int: Number of affected rows. Note that this may be unreliable if `commit` is `False`
113		"""
114		if where is None:
115			where = {}
116
117		# build query
118		identifiers = [sql.Identifier(column) for column in data.keys()]
119		identifiers.insert(0, sql.Identifier(table))
120		replacements = list(data.values())
121
122		query = "UPDATE {} SET " + ", ".join(["{} = %s" for column in data])
123		if where:
124			query += " WHERE " + " AND ".join(["{} = %s" for column in where])
125			for column in where.keys():
126				identifiers.append(sql.Identifier(column))
127				replacements.append(where[column])
128
129		query = sql.SQL(query).format(*identifiers)
130
131		cursor = self.get_cursor()
132		self.log.debug("Executing query: %s" % cursor.mogrify(query, replacements))
133		cursor.execute(query, replacements)
134
135		if commit:
136			self.commit()
137
138		result = cursor.rowcount
139		cursor.close()
140		return result
141
142	def delete(self, table, where, commit=True):
143		"""
144		Delete a database record
145
146		:param string table:  Table to delete from
147		:param dict where:  Simple conditions, parsed as "column1 = value1 AND column2 = value2" etc
148		:param bool commit:  Whether to commit after executing the query
149
150		:return int: Number of affected rows. Note that this may be unreliable if `commit` is `False`
151		"""
152		where_sql = []
153		replacements = []
154		for column in where.keys():
155			if type(where[column]) in (set, tuple, list):
156				where_sql.append("{} IN %s")
157				replacements.append(tuple(where[column]))
158			else:
159				where_sql.append("{} = %s")
160				replacements.append(where[column])
161
162		# build query
163		identifiers = [sql.Identifier(column) for column in where.keys()]
164		identifiers.insert(0, sql.Identifier(table))
165		query = sql.SQL("DELETE FROM {} WHERE " + " AND ".join(where_sql)).format(*identifiers)
166
167		cursor = self.get_cursor()
168		self.log.debug("Executing query: %s" % cursor.mogrify(query, replacements))
169		cursor.execute(query, replacements)
170
171		if commit:
172			self.commit()
173
174		result = cursor.rowcount
175		cursor.close()
176		return result
177
178	def insert(self, table, data, commit=True, safe=False, constraints=None, return_field=""):
179		"""
180		Create database record
181
182		:param string table:  Table to insert record into
183		:param dict data:   Data to insert
184		:param bool commit: Whether to commit after executing the query
185		:param bool safe: If set to `True`, "ON CONFLICT DO NOTHING" is added to the insert query, so it does not
186						  insert the row and no error is thrown when the insert violates a unique index or other constraint
187		:param tuple constraints: If `safe` is `True`, this tuple may contain the columns that should be used as a
188								  constraint, e.g. ON CONFLICT (name, lastname) DO NOTHING
189		:param str return_field: If not empty or None, this makes the method
190		return this field of the inserted row, instead of the number of
191		affected rows, with `RETURNING`.
192		:return int: Number of affected rows. Note that this may be unreliable if `commit` is `False`
193		"""
194		if constraints is None:
195			constraints = []
196
197		# escape identifiers
198		identifiers = [sql.Identifier(column) for column in data.keys()]
199		identifiers.insert(0, sql.Identifier(table))
200
201		# construct ON NOTHING bit of query
202		if safe:
203			safe_bit = " ON CONFLICT "
204			if constraints:
205				safe_bit += "(" + ", ".join(["{}" for each in constraints]) + ")"
206				identifiers.extend([sql.Identifier(column) for column in constraints])
207			safe_bit += " DO NOTHING"
208		else:
209			safe_bit = ""
210
211		# prepare parameter replacements
212		protoquery = "INSERT INTO {} (%s) VALUES %%s" % ", ".join(["{}" for column in data.keys()]) + safe_bit
213
214		if return_field:
215			protoquery += " RETURNING {}"
216			identifiers.append(sql.Identifier(return_field))
217
218		query = sql.SQL(protoquery).format(*identifiers)
219		replacements = (tuple(data.values()),)
220
221		cursor = self.get_cursor()
222		self.log.debug("Executing query: %s" % cursor.mogrify(query, replacements))
223		cursor.execute(query, replacements)
224
225		if commit:
226			self.commit()
227
228		result = cursor.rowcount if not return_field else cursor.fetchone()[return_field]
229		cursor.close()
230		return result
231
232	def upsert(self, table, data, commit=True, constraints=None):
233		"""
234		Create or update database record
235
236		If the record could not be inserted because of a constraint, the
237		constraining record is updated instead.
238
239		:param string table:  Table to upsert record into
240		:param dict data:   Data to upsert
241		:param bool commit: Whether to commit after executing the query
242		:param tuple constraints: This tuple may contain the columns that should be used as a
243								  constraint, e.g. ON CONFLICT (name, lastname) DO UPDATE
244		:return int: Number of affected rows. Note that this may be unreliable if `commit` is `False`
245		"""
246		if constraints is None:
247			constraints = []
248
249		# escape identifiers
250		identifiers = [sql.Identifier(column) for column in data.keys()]
251		identifiers.insert(0, sql.Identifier(table))
252
253		# prepare parameter replacements
254		protoquery = "INSERT INTO {} (%s) VALUES %%s" % ", ".join(["{}" for column in data.keys()])
255		protoquery += " ON CONFLICT"
256
257		if constraints:
258			protoquery += " (" + ", ".join(["{}" for each in constraints]) + ")"
259			identifiers.extend([sql.Identifier(column) for column in constraints])
260
261		protoquery += " DO UPDATE SET "
262		protoquery += ", ".join(["%s = EXCLUDED.%s" % (column, column) for column in data.keys()])
263		identifiers.extend(list(itertools.chain.from_iterable([[column, column] for column in data.keys()])))
264
265		query = sql.SQL(protoquery).format(*identifiers)
266		replacements = (tuple(data.values()),)
267
268		cursor = self.get_cursor()
269		self.log.debug("Executing query: %s" % cursor.mogrify(query, replacements))
270		cursor.execute(query, replacements)
271
272		if commit:
273			self.commit()
274
275		result = cursor.rowcount
276		cursor.close()
277		return result
278
279	def fetchall(self, query, *args):
280		"""
281		Fetch all rows for a query
282
283		:param string query:  Query
284		:param args: Replacement values
285		:param commit:  Commit transaction after query?
286		:return list: The result rows, as a list
287		"""
288		cursor = self.get_cursor()
289		self.log.debug("Executing query: %s" % cursor.mogrify(query, *args))
290		self.query(query, cursor=cursor, *args)
291
292		try:
293			result = cursor.fetchall()
294		except AttributeError:
295			result = []
296		except psycopg2.ProgrammingError as e:
297			# there seems to be a bug with psycopg2 where it sometimes raises
298			# this for empty query results even though it shouldn't. this
299			# doesn't seem to indicate an actual problem so we catch the
300			# exception and return an empty list
301			self.rollback()
302			result = []
303			self.log.warning("Caught ProgrammingError: %s" % e)
304
305		cursor.close()
306		self.commit()
307
308		return result
309
310	def fetchone(self, query, *args):
311		"""
312		Fetch one result row
313
314		:param string query: Query
315		:param args: Replacement values
316		:param commit:  Commit transaction after query?
317		:return: The row, as a dictionary, or None if there were no rows
318		"""
319		cursor = self.get_cursor()
320		self.query(query, cursor=cursor, *args)
321
322		try:
323			result = cursor.fetchone()
324		except psycopg2.ProgrammingError as e:
325			# no results to fetch
326			self.rollback()
327			result = None
328			self.log.warning("Caught ProgrammingError: %s" % e)
329
330		cursor.close()
331		self.commit()
332
333		return result
334
335	def fetchall_interruptable(self, queue, query, *args):
336		"""
337		Fetch all rows for a query, allowing for interruption
338
339		Before running the query, a job is queued to cancel the query after a
340		set amount of time. The query is expected to complete before this
341		timeout. If the backend is interrupted, however, that job will be
342		executed immediately, to cancel the database query. If this happens, a
343		DatabaseQueryInterruptedException will be raised, but the database
344		object will otherwise remain useable.
345
346		Note that in the event that the cancellation job is run, all queries
347		for this instance of the database object will be cancelled. However,
348		there should never be more than one active query per connection within
349		4CAT.
350
351		:param JobQueue queue:  A job queue object, required to schedule the
352		query cancellation job
353		:param str query:  SQL query
354		:param list args:  Replacement variables
355		:param commit:  Commit transaction after query?
356		:return list:  A list of rows, as dictionaries
357		"""
358		# schedule a job that will cancel the query we're about to make
359		pid = self.connection.get_backend_pid()
360		self.interruptable_job = queue.add_job("cancel-pg-query", details={}, remote_id=self.appname, claim_after=time.time() + self.interruptable_timeout)
361
362		# make the query
363		cursor = self.get_cursor()
364		self.log.debug("Executing interruptable query: %s" % cursor.mogrify(query, *args))
365
366		try:
367			self.query(query, cursor=cursor, *args)
368		except psycopg2.extensions.QueryCanceledError:
369			# interrupted with cancellation worker (or manually)
370			self.log.debug("Query in connection %s was interrupted..." % self.appname)
371			self.rollback()
372			cursor.close()
373			raise DatabaseQueryInterruptedException("Interrupted while querying database")
374
375		# collect results
376		try:
377			result = cursor.fetchall()
378		except AttributeError as e:
379			result = []
380		except psycopg2.ProgrammingError as e:
381			result = []
382			self.log.warning("Caught ProgrammingError: %s" % e)
383
384		# clean up cancelling job when we have the data
385		self.interruptable_job.finish()
386		self.interruptable_job = None
387
388		cursor.close()
389		self.commit()
390
391		return result
392
393
394	def commit(self):
395		"""
396		Commit the current transaction
397
398		This is required for UPDATE etc to stick.
399		"""
400		self.connection.commit()
401
402	def rollback(self):
403		"""
404		Roll back the current transaction
405		"""
406		self.connection.rollback()
407
408	def close(self):
409		"""
410		Close connection
411
412		Running queries after this is probably a bad idea!
413		"""
414		self.connection.close()
415
416	def get_cursor(self):
417		"""
418		Get a new cursor
419
420		Re-using cursors seems to give issues when using per-thread
421		connections, so simply instantiate a new one each time
422
423		:return: Cursor
424		"""
425		return self.connection.cursor(cursor_factory=psycopg2.extras.RealDictCursor)
class Database:
 16class Database:
 17	"""
 18	Simple database handler
 19
 20	Offers a number of abstraction methods that limit how much SQL one is
 21	required to write. Also makes the database connection mostly multithreading
 22	proof by instantiating a new cursor for each query (and closing it afterwards)
 23	"""
 24	cursor = None
 25	log = None
 26	appname=""
 27
 28	interrupted = False
 29	interruptable_timeout = 86400  # if a query takes this long, it should be cancelled. see also fetchall_interruptable()
 30	interruptable_job = None
 31
 32	def __init__(self, logger, dbname=None, user=None, password=None, host=None, port=None, appname=None):
 33		"""
 34		Set up database connection
 35
 36		:param logger:  Logger instance
 37		:param dbname:  Database name
 38		:param user:  Database username
 39		:param password:  Database password
 40		:param host:  Database server address
 41		:param port:  Database port
 42		:param appname:  App name, mostly useful to trace connections in pg_stat_activity
 43		"""
 44		self.appname = "4CAT" if not appname else "4CAT-%s" % appname
 45
 46		self.connection = psycopg2.connect(dbname=dbname, user=user, password=password, host=host, port=port, application_name=self.appname)
 47		self.cursor = self.connection.cursor(cursor_factory=psycopg2.extras.RealDictCursor)
 48		self.log = logger
 49
 50		if self.log is None:
 51			self.log = logging
 52
 53		self.commit()
 54
 55	def query(self, query, replacements=None, cursor=None):
 56		"""
 57		Execute a query
 58
 59		:param string query: Query
 60		:param args: Replacement values
 61		:param cursor: Cursor to use. Default - use common cursor
 62		:return None:
 63		"""
 64		if not cursor:
 65			cursor = self.get_cursor()
 66
 67		self.log.debug("Executing query %s" % self.cursor.mogrify(query, replacements))
 68
 69		return cursor.execute(query, replacements)
 70
 71	def execute(self, query, replacements=None):
 72		"""
 73		Execute a query, and commit afterwards
 74
 75		This is required for UPDATE/INSERT/DELETE/etc to stick
 76		:param string query:  Query
 77		:param replacements: Replacement values
 78		"""
 79		cursor = self.get_cursor()
 80
 81		self.log.debug("Executing query %s" % self.cursor.mogrify(query, replacements))
 82		cursor.execute(query, replacements)
 83		self.commit()
 84
 85		cursor.close()
 86
 87	def execute_many(self, query, commit=True, replacements=None):
 88		"""
 89		Execute a query multiple times, each time with different values
 90
 91		This makes it particularly suitable for INSERT queries, but other types
 92		of query using VALUES are possible too.
 93
 94		:param string query:  Query
 95		:param replacements: A list of replacement values
 96		:param commit:  Commit transaction after query?
 97		"""
 98		cursor = self.get_cursor()
 99		execute_values(cursor, query, replacements)
100		cursor.close()
101		if commit:
102			self.commit()
103
104	def update(self, table, data, where=None, commit=True):
105		"""
106		Update a database record
107
108		:param string table:  Table to update
109		:param dict where:  Simple conditions, parsed as "column1 = value1 AND column2 = value2" etc
110		:param dict data:  Data to set, Column => Value
111		:param bool commit:  Whether to commit after executing the query
112
113		:return int: Number of affected rows. Note that this may be unreliable if `commit` is `False`
114		"""
115		if where is None:
116			where = {}
117
118		# build query
119		identifiers = [sql.Identifier(column) for column in data.keys()]
120		identifiers.insert(0, sql.Identifier(table))
121		replacements = list(data.values())
122
123		query = "UPDATE {} SET " + ", ".join(["{} = %s" for column in data])
124		if where:
125			query += " WHERE " + " AND ".join(["{} = %s" for column in where])
126			for column in where.keys():
127				identifiers.append(sql.Identifier(column))
128				replacements.append(where[column])
129
130		query = sql.SQL(query).format(*identifiers)
131
132		cursor = self.get_cursor()
133		self.log.debug("Executing query: %s" % cursor.mogrify(query, replacements))
134		cursor.execute(query, replacements)
135
136		if commit:
137			self.commit()
138
139		result = cursor.rowcount
140		cursor.close()
141		return result
142
143	def delete(self, table, where, commit=True):
144		"""
145		Delete a database record
146
147		:param string table:  Table to delete from
148		:param dict where:  Simple conditions, parsed as "column1 = value1 AND column2 = value2" etc
149		:param bool commit:  Whether to commit after executing the query
150
151		:return int: Number of affected rows. Note that this may be unreliable if `commit` is `False`
152		"""
153		where_sql = []
154		replacements = []
155		for column in where.keys():
156			if type(where[column]) in (set, tuple, list):
157				where_sql.append("{} IN %s")
158				replacements.append(tuple(where[column]))
159			else:
160				where_sql.append("{} = %s")
161				replacements.append(where[column])
162
163		# build query
164		identifiers = [sql.Identifier(column) for column in where.keys()]
165		identifiers.insert(0, sql.Identifier(table))
166		query = sql.SQL("DELETE FROM {} WHERE " + " AND ".join(where_sql)).format(*identifiers)
167
168		cursor = self.get_cursor()
169		self.log.debug("Executing query: %s" % cursor.mogrify(query, replacements))
170		cursor.execute(query, replacements)
171
172		if commit:
173			self.commit()
174
175		result = cursor.rowcount
176		cursor.close()
177		return result
178
179	def insert(self, table, data, commit=True, safe=False, constraints=None, return_field=""):
180		"""
181		Create database record
182
183		:param string table:  Table to insert record into
184		:param dict data:   Data to insert
185		:param bool commit: Whether to commit after executing the query
186		:param bool safe: If set to `True`, "ON CONFLICT DO NOTHING" is added to the insert query, so it does not
187						  insert the row and no error is thrown when the insert violates a unique index or other constraint
188		:param tuple constraints: If `safe` is `True`, this tuple may contain the columns that should be used as a
189								  constraint, e.g. ON CONFLICT (name, lastname) DO NOTHING
190		:param str return_field: If not empty or None, this makes the method
191		return this field of the inserted row, instead of the number of
192		affected rows, with `RETURNING`.
193		:return int: Number of affected rows. Note that this may be unreliable if `commit` is `False`
194		"""
195		if constraints is None:
196			constraints = []
197
198		# escape identifiers
199		identifiers = [sql.Identifier(column) for column in data.keys()]
200		identifiers.insert(0, sql.Identifier(table))
201
202		# construct ON NOTHING bit of query
203		if safe:
204			safe_bit = " ON CONFLICT "
205			if constraints:
206				safe_bit += "(" + ", ".join(["{}" for each in constraints]) + ")"
207				identifiers.extend([sql.Identifier(column) for column in constraints])
208			safe_bit += " DO NOTHING"
209		else:
210			safe_bit = ""
211
212		# prepare parameter replacements
213		protoquery = "INSERT INTO {} (%s) VALUES %%s" % ", ".join(["{}" for column in data.keys()]) + safe_bit
214
215		if return_field:
216			protoquery += " RETURNING {}"
217			identifiers.append(sql.Identifier(return_field))
218
219		query = sql.SQL(protoquery).format(*identifiers)
220		replacements = (tuple(data.values()),)
221
222		cursor = self.get_cursor()
223		self.log.debug("Executing query: %s" % cursor.mogrify(query, replacements))
224		cursor.execute(query, replacements)
225
226		if commit:
227			self.commit()
228
229		result = cursor.rowcount if not return_field else cursor.fetchone()[return_field]
230		cursor.close()
231		return result
232
233	def upsert(self, table, data, commit=True, constraints=None):
234		"""
235		Create or update database record
236
237		If the record could not be inserted because of a constraint, the
238		constraining record is updated instead.
239
240		:param string table:  Table to upsert record into
241		:param dict data:   Data to upsert
242		:param bool commit: Whether to commit after executing the query
243		:param tuple constraints: This tuple may contain the columns that should be used as a
244								  constraint, e.g. ON CONFLICT (name, lastname) DO UPDATE
245		:return int: Number of affected rows. Note that this may be unreliable if `commit` is `False`
246		"""
247		if constraints is None:
248			constraints = []
249
250		# escape identifiers
251		identifiers = [sql.Identifier(column) for column in data.keys()]
252		identifiers.insert(0, sql.Identifier(table))
253
254		# prepare parameter replacements
255		protoquery = "INSERT INTO {} (%s) VALUES %%s" % ", ".join(["{}" for column in data.keys()])
256		protoquery += " ON CONFLICT"
257
258		if constraints:
259			protoquery += " (" + ", ".join(["{}" for each in constraints]) + ")"
260			identifiers.extend([sql.Identifier(column) for column in constraints])
261
262		protoquery += " DO UPDATE SET "
263		protoquery += ", ".join(["%s = EXCLUDED.%s" % (column, column) for column in data.keys()])
264		identifiers.extend(list(itertools.chain.from_iterable([[column, column] for column in data.keys()])))
265
266		query = sql.SQL(protoquery).format(*identifiers)
267		replacements = (tuple(data.values()),)
268
269		cursor = self.get_cursor()
270		self.log.debug("Executing query: %s" % cursor.mogrify(query, replacements))
271		cursor.execute(query, replacements)
272
273		if commit:
274			self.commit()
275
276		result = cursor.rowcount
277		cursor.close()
278		return result
279
280	def fetchall(self, query, *args):
281		"""
282		Fetch all rows for a query
283
284		:param string query:  Query
285		:param args: Replacement values
286		:param commit:  Commit transaction after query?
287		:return list: The result rows, as a list
288		"""
289		cursor = self.get_cursor()
290		self.log.debug("Executing query: %s" % cursor.mogrify(query, *args))
291		self.query(query, cursor=cursor, *args)
292
293		try:
294			result = cursor.fetchall()
295		except AttributeError:
296			result = []
297		except psycopg2.ProgrammingError as e:
298			# there seems to be a bug with psycopg2 where it sometimes raises
299			# this for empty query results even though it shouldn't. this
300			# doesn't seem to indicate an actual problem so we catch the
301			# exception and return an empty list
302			self.rollback()
303			result = []
304			self.log.warning("Caught ProgrammingError: %s" % e)
305
306		cursor.close()
307		self.commit()
308
309		return result
310
311	def fetchone(self, query, *args):
312		"""
313		Fetch one result row
314
315		:param string query: Query
316		:param args: Replacement values
317		:param commit:  Commit transaction after query?
318		:return: The row, as a dictionary, or None if there were no rows
319		"""
320		cursor = self.get_cursor()
321		self.query(query, cursor=cursor, *args)
322
323		try:
324			result = cursor.fetchone()
325		except psycopg2.ProgrammingError as e:
326			# no results to fetch
327			self.rollback()
328			result = None
329			self.log.warning("Caught ProgrammingError: %s" % e)
330
331		cursor.close()
332		self.commit()
333
334		return result
335
336	def fetchall_interruptable(self, queue, query, *args):
337		"""
338		Fetch all rows for a query, allowing for interruption
339
340		Before running the query, a job is queued to cancel the query after a
341		set amount of time. The query is expected to complete before this
342		timeout. If the backend is interrupted, however, that job will be
343		executed immediately, to cancel the database query. If this happens, a
344		DatabaseQueryInterruptedException will be raised, but the database
345		object will otherwise remain useable.
346
347		Note that in the event that the cancellation job is run, all queries
348		for this instance of the database object will be cancelled. However,
349		there should never be more than one active query per connection within
350		4CAT.
351
352		:param JobQueue queue:  A job queue object, required to schedule the
353		query cancellation job
354		:param str query:  SQL query
355		:param list args:  Replacement variables
356		:param commit:  Commit transaction after query?
357		:return list:  A list of rows, as dictionaries
358		"""
359		# schedule a job that will cancel the query we're about to make
360		pid = self.connection.get_backend_pid()
361		self.interruptable_job = queue.add_job("cancel-pg-query", details={}, remote_id=self.appname, claim_after=time.time() + self.interruptable_timeout)
362
363		# make the query
364		cursor = self.get_cursor()
365		self.log.debug("Executing interruptable query: %s" % cursor.mogrify(query, *args))
366
367		try:
368			self.query(query, cursor=cursor, *args)
369		except psycopg2.extensions.QueryCanceledError:
370			# interrupted with cancellation worker (or manually)
371			self.log.debug("Query in connection %s was interrupted..." % self.appname)
372			self.rollback()
373			cursor.close()
374			raise DatabaseQueryInterruptedException("Interrupted while querying database")
375
376		# collect results
377		try:
378			result = cursor.fetchall()
379		except AttributeError as e:
380			result = []
381		except psycopg2.ProgrammingError as e:
382			result = []
383			self.log.warning("Caught ProgrammingError: %s" % e)
384
385		# clean up cancelling job when we have the data
386		self.interruptable_job.finish()
387		self.interruptable_job = None
388
389		cursor.close()
390		self.commit()
391
392		return result
393
394
395	def commit(self):
396		"""
397		Commit the current transaction
398
399		This is required for UPDATE etc to stick.
400		"""
401		self.connection.commit()
402
403	def rollback(self):
404		"""
405		Roll back the current transaction
406		"""
407		self.connection.rollback()
408
409	def close(self):
410		"""
411		Close connection
412
413		Running queries after this is probably a bad idea!
414		"""
415		self.connection.close()
416
417	def get_cursor(self):
418		"""
419		Get a new cursor
420
421		Re-using cursors seems to give issues when using per-thread
422		connections, so simply instantiate a new one each time
423
424		:return: Cursor
425		"""
426		return self.connection.cursor(cursor_factory=psycopg2.extras.RealDictCursor)

Simple database handler

Offers a number of abstraction methods that limit how much SQL one is required to write. It also makes the database connection mostly thread-safe by instantiating a new cursor for each query (and closing it afterwards).

Database( logger, dbname=None, user=None, password=None, host=None, port=None, appname=None)
def __init__(self, logger, dbname=None, user=None, password=None, host=None, port=None, appname=None):
	"""
	Open a connection to the database

	The connection is tagged with an application name ("4CAT", or
	"4CAT-<appname>" when an app name is given) so it can be traced in
	pg_stat_activity.

	:param logger:  Logger instance; the `logging` module is used if `None`
	:param dbname:  Database name
	:param user:  Database username
	:param password:  Database password
	:param host:  Database server address
	:param port:  Database port
	:param appname:  App name, mostly useful to trace connections in pg_stat_activity
	"""
	if appname:
		self.appname = "4CAT-%s" % appname
	else:
		self.appname = "4CAT"

	self.connection = psycopg2.connect(
		dbname=dbname,
		user=user,
		password=password,
		host=host,
		port=port,
		application_name=self.appname,
	)
	self.cursor = self.connection.cursor(cursor_factory=psycopg2.extras.RealDictCursor)
	self.log = logging if logger is None else logger

	self.commit()

Set up database connection

Parameters
  • logger: Logger instance
  • dbname: Database name
  • user: Database username
  • password: Database password
  • host: Database server address
  • port: Database port
  • appname: App name, mostly useful to trace connections in pg_stat_activity
cursor = None
log = None
appname = ''
interrupted = False
interruptable_timeout = 86400
interruptable_job = None
connection
def query(self, query, replacements=None, cursor=None):
def query(self, query, replacements=None, cursor=None):
	"""
	Execute a query

	:param string query: Query
	:param replacements: Replacement values
	:param cursor: Cursor to use. If omitted, a new cursor is created and
	closed again once the query has run, so no results can be fetched
	:return None:
	"""
	if not cursor:
		# private cursor, closed when leaving the with block
		with self.get_cursor() as own_cursor:
			return self.query(query, replacements, cursor=own_cursor)

	# mogrify on the cursor actually used, not on the shared self.cursor
	self.log.debug("Executing query %s" % cursor.mogrify(query, replacements))
	return cursor.execute(query, replacements)

Execute a query

Parameters
  • string query: Query
  • replacements: Replacement values
  • cursor: Cursor to use. If omitted, a new cursor is created for this query
Returns
  None
def execute(self, query, replacements=None):
def execute(self, query, replacements=None):
	"""
	Execute a query, and commit afterwards

	Committing is required for UPDATE/INSERT/DELETE/etc to stick. The cursor
	is closed even if the query raises.

	:param string query:  Query
	:param replacements: Replacement values
	"""
	with self.get_cursor() as cursor:
		# mogrify on the cursor actually used, not on the shared self.cursor
		self.log.debug("Executing query %s" % cursor.mogrify(query, replacements))
		cursor.execute(query, replacements)
		self.commit()

Execute a query, and commit afterwards

Committing is required for UPDATE/INSERT/DELETE/etc. queries to take effect.

Parameters
  • string query: Query
  • replacements: Replacement values
def execute_many(self, query, commit=True, replacements=None):
	"""
	Execute a query multiple times, each time with different values

	The query and values are passed to psycopg2's `execute_values()`, which
	expands a single `%s` placeholder in `query` into the list of value
	tuples. This makes it particularly suitable for INSERT queries, but
	other types of query using VALUES are possible too.

	:param string query:  Query
	:param bool commit:  Commit transaction after query?
	:param replacements: A list of replacement value tuples. Although it
	defaults to None, a list must be given for the query to run.
	"""
	cursor = self.get_cursor()
	try:
		execute_values(cursor, query, replacements)
	finally:
		# close the cursor even if execute_values() raised
		cursor.close()

	if commit:
		self.commit()

Execute a query multiple times, each time with different values

This makes it particularly suitable for INSERT queries, but other types of query using VALUES are possible too.

Parameters
  • string query: Query
  • replacements: A list of replacement values
  • commit: Commit transaction after query?
def update(self, table, data, where=None, commit=True):
	"""
	Update database records that match simple equality conditions

	Table and column names are escaped as SQL identifiers; values are
	passed as query parameters.

	:param string table:  Table to update
	:param dict data:  Data to set, Column => Value
	:param dict where:  Conditions, combined as "column1 = value1 AND
	column2 = value2" etc. When empty or None, no WHERE clause is added,
	so every row of the table is updated.
	:param bool commit:  Whether to commit after executing the query

	:return int: Number of affected rows (the cursor's rowcount). Note that this may be unreliable if `commit` is `False`
	"""
	conditions = where if where is not None else {}

	# SET part: one "{} = %s" per column, identifiers filled in below
	statement_text = "UPDATE {} SET " + ", ".join(["{} = %s" for _ in data])
	names = [sql.Identifier(table)] + [sql.Identifier(col) for col in data.keys()]
	values = list(data.values())

	# optional WHERE part, appended in the same order as the placeholders
	if conditions:
		statement_text += " WHERE " + " AND ".join(["{} = %s" for _ in conditions])
		for col, wanted in conditions.items():
			names.append(sql.Identifier(col))
			values.append(wanted)

	statement = sql.SQL(statement_text).format(*names)

	cur = self.get_cursor()
	self.log.debug("Executing query: %s" % cur.mogrify(statement, values))
	cur.execute(statement, values)

	if commit:
		self.commit()

	affected = cur.rowcount
	cur.close()
	return affected

Update a database record

Parameters
  • string table: Table to update
  • dict where: Simple conditions, parsed as "column1 = value1 AND column2 = value2" etc
  • dict data: Data to set, Column => Value
  • bool commit: Whether to commit after executing the query
Returns

Number of affected rows. Note that this may be unreliable if commit is False

def delete(self, table, where, commit=True):
	"""
	Delete database records that match simple conditions

	A condition whose value is a set, tuple or list becomes "column IN
	(...)"; any other value becomes "column = value". All conditions are
	combined with AND. An empty `where` leaves the WHERE clause incomplete,
	so the query is rejected instead of deleting every row.

	:param string table:  Table to delete from
	:param dict where:  Conditions, Column => Value (or collection of values)
	:param bool commit:  Whether to commit after executing the query

	:return int: Number of affected rows (the cursor's rowcount). Note that this may be unreliable if `commit` is `False`
	"""
	clauses = []
	values = []
	for col, wanted in where.items():
		if type(wanted) in (set, tuple, list):
			# psycopg2 adapts a tuple to a parenthesised list for IN
			clauses.append("{} IN %s")
			values.append(tuple(wanted))
		else:
			clauses.append("{} = %s")
			values.append(wanted)

	names = [sql.Identifier(table)] + [sql.Identifier(col) for col in where.keys()]
	statement = sql.SQL("DELETE FROM {} WHERE " + " AND ".join(clauses)).format(*names)

	cur = self.get_cursor()
	self.log.debug("Executing query: %s" % cur.mogrify(statement, values))
	cur.execute(statement, values)

	if commit:
		self.commit()

	affected = cur.rowcount
	cur.close()
	return affected

Delete a database record

Parameters
  • string table: Table to delete from
  • dict where: Simple conditions, parsed as "column1 = value1 AND column2 = value2" etc
  • bool commit: Whether to commit after executing the query
Returns

Number of affected rows. Note that this may be unreliable if commit is False

def insert(self, table, data, commit=True, safe=False, constraints=None, return_field=""):
	"""
	Create database record

	:param string table:  Table to insert record into
	:param dict data:   Data to insert, Column => Value
	:param bool commit: Whether to commit after executing the query
	:param bool safe: If set to `True`, "ON CONFLICT DO NOTHING" is added to the insert query, so it does not
	insert the row and no error is thrown when the insert violates a unique index or other constraint
	:param tuple constraints: If `safe` is `True`, this tuple may contain the columns that should be used as a
	constraint, e.g. ON CONFLICT (name, lastname) DO NOTHING
	:param str return_field: If not empty or None, this makes the method
	return this field of the inserted row, instead of the number of
	affected rows, with `RETURNING`.
	:return: Number of affected rows or, if `return_field` is set, the value
	of that field for the inserted row. In the latter case `None` is
	returned when no row was inserted (e.g. `safe` is `True` and the insert
	conflicted). Note that the row count may be unreliable if `commit` is
	`False`
	"""
	if constraints is None:
		constraints = []

	# escape identifiers; their order must match the {} placeholders below:
	# table, data columns, conflict columns, returning field
	identifiers = [sql.Identifier(column) for column in data.keys()]
	identifiers.insert(0, sql.Identifier(table))

	# construct ON CONFLICT ... DO NOTHING bit of query
	if safe:
		safe_bit = " ON CONFLICT "
		if constraints:
			safe_bit += "(" + ", ".join(["{}" for each in constraints]) + ")"
			identifiers.extend([sql.Identifier(column) for column in constraints])
		safe_bit += " DO NOTHING"
	else:
		safe_bit = ""

	# the single %s after VALUES receives the tuple of values
	protoquery = "INSERT INTO {} (%s) VALUES %%s" % ", ".join(["{}" for column in data.keys()]) + safe_bit

	if return_field:
		protoquery += " RETURNING {}"
		identifiers.append(sql.Identifier(return_field))

	query = sql.SQL(protoquery).format(*identifiers)
	replacements = (tuple(data.values()),)

	cursor = self.get_cursor()
	self.log.debug("Executing query: %s" % cursor.mogrify(query, replacements))
	cursor.execute(query, replacements)

	if commit:
		self.commit()

	if return_field:
		# with ON CONFLICT DO NOTHING a skipped insert returns no row at all,
		# in which case fetchone() gives None
		row = cursor.fetchone()
		result = row[return_field] if row is not None else None
	else:
		result = cursor.rowcount

	cursor.close()
	return result

Create database record

Parameters
  • string table: Table to insert record into
  • dict data: Data to insert
  • bool commit: Whether to commit after executing the query
  • bool safe: If set to True, "ON CONFLICT DO NOTHING" is added to the insert query, so it does not insert the row and no error is thrown when the insert violates a unique index or other constraint
  • tuple constraints: If safe is True, this tuple may contain the columns that should be used as a constraint, e.g. ON CONFLICT (name, lastname) DO NOTHING
  • str return_field: If not empty or None, this makes the method return this field of the inserted row, instead of the number of affected rows, with RETURNING.
Returns

Number of affected rows. Note that this may be unreliable if commit is False

def upsert(self, table, data, commit=True, constraints=None):
	"""
	Create or update database record

	If the record could not be inserted because of a constraint, the
	constraining record is updated instead: every column in `data` is set
	to the value that was to be inserted.

	:param string table:  Table to upsert record into
	:param dict data:   Data to upsert, Column => Value
	:param bool commit: Whether to commit after executing the query
	:param tuple constraints: This tuple may contain the columns that should be used as a
	constraint, e.g. ON CONFLICT (name, lastname) DO UPDATE. PostgreSQL requires
	a conflict target for DO UPDATE, so this should normally be given.
	:return int: Number of affected rows. Note that this may be unreliable if `commit` is `False`
	"""
	if constraints is None:
		constraints = []

	# escape identifiers; their order must match the {} placeholders below:
	# table, data columns, conflict columns, then two per SET assignment
	identifiers = [sql.Identifier(column) for column in data.keys()]
	identifiers.insert(0, sql.Identifier(table))

	# the single %s after VALUES receives the tuple of values
	protoquery = "INSERT INTO {} (%s) VALUES %%s" % ", ".join(["{}" for column in data.keys()])
	protoquery += " ON CONFLICT"

	if constraints:
		protoquery += " (" + ", ".join(["{}" for each in constraints]) + ")"
		identifiers.extend([sql.Identifier(column) for column in constraints])

	# column = EXCLUDED.column for every column, with the column names passed
	# as escaped identifiers rather than pasted into the SQL string
	protoquery += " DO UPDATE SET "
	protoquery += ", ".join(["{} = EXCLUDED.{}" for column in data.keys()])
	for column in data.keys():
		identifiers.extend([sql.Identifier(column), sql.Identifier(column)])

	query = sql.SQL(protoquery).format(*identifiers)
	replacements = (tuple(data.values()),)

	cursor = self.get_cursor()
	self.log.debug("Executing query: %s" % cursor.mogrify(query, replacements))
	cursor.execute(query, replacements)

	if commit:
		self.commit()

	result = cursor.rowcount
	cursor.close()
	return result

Create or update database record

If the record could not be inserted because of a constraint, the constraining record is updated instead.

Parameters
  • string table: Table to upsert record into
  • dict data: Data to upsert
  • bool commit: Whether to commit after executing the query
  • tuple constraints: This tuple may contain the columns that should be used as a constraint, e.g. ON CONFLICT (name, lastname) DO UPDATE
Returns

Number of affected rows. Note that this may be unreliable if commit is False

def fetchall(self, query, *args):
	"""
	Fetch all rows for a query

	The query runs on a fresh cursor, which is always closed afterwards.
	The transaction is committed once the rows have been fetched.

	:param string query:  Query
	:param args: Replacement values, passed on to `query()` as its
	`replacements` argument (so at most one value is supported)
	:return list: The result rows, as a list
	"""
	cursor = self.get_cursor()
	try:
		# query() already logs the mogrified query, so it is not logged here
		self.query(query, cursor=cursor, *args)

		try:
			result = cursor.fetchall()
		except AttributeError:
			# NOTE(review): unclear what raises this here; kept as-is
			result = []
		except psycopg2.ProgrammingError as e:
			# there seems to be a bug with psycopg2 where it sometimes raises
			# this for empty query results even though it shouldn't. this
			# doesn't seem to indicate an actual problem so we catch the
			# exception and return an empty list
			self.rollback()
			result = []
			self.log.warning("Caught ProgrammingError: %s" % e)
	finally:
		# close the cursor even if the query itself raised
		cursor.close()

	self.commit()

	return result

Fetch all rows for a query

Parameters
  • string query: Query
  • args: Replacement values
  • (there is no commit parameter: the transaction is always committed after the rows are fetched)
Returns

The result rows, as a list

def fetchone(self, query, *args):
	"""
	Fetch a single result row

	Runs the query on a fresh cursor, takes the first row, then closes the
	cursor and commits the transaction.

	:param string query: Query
	:param args: Replacement values, passed on to `query()` as its
	`replacements` argument
	:return: The row, as a dictionary, or None if there were no rows
	"""
	cur = self.get_cursor()
	self.query(query, cursor=cur, *args)

	row = None
	try:
		row = cur.fetchone()
	except psycopg2.ProgrammingError as error:
		# nothing to fetch; reset the transaction and report an empty result
		self.rollback()
		self.log.warning("Caught ProgrammingError: %s" % error)

	cur.close()
	self.commit()

	return row

Fetch one result row

Parameters
  • string query: Query
  • args: Replacement values
  • (there is no commit parameter: the transaction is always committed after the row is fetched)
Returns

The row, as a dictionary, or None if there were no rows

def fetchall_interruptable(self, queue, query, *args):
	"""
	Fetch all rows for a query, allowing for interruption

	Before running the query, a job is queued to cancel the query after a
	set amount of time (`self.interruptable_timeout` seconds). The query is
	expected to complete before this timeout. If the backend is interrupted,
	however, that job will be executed immediately, to cancel the database
	query. If this happens, a DatabaseQueryInterruptedException will be
	raised, but the database object will otherwise remain usable.

	If the query fails for any other reason, the cancellation job is
	finished before the exception is re-raised, so that it cannot run later
	and cancel an unrelated query.

	Note that in the event that the cancellation job is run, all queries
	for this instance of the database object will be cancelled. However,
	there should never be more than one active query per connection within
	4CAT.

	:param JobQueue queue:  A job queue object, required to schedule the
	query cancellation job
	:param str query:  SQL query
	:param args:  Replacement values, passed on to `query()` as its
	`replacements` argument
	:return list:  A list of rows, as dictionaries
	:raises DatabaseQueryInterruptedException:  If the query was cancelled
	"""
	# get the cursor first, so no cancellation job is queued if this fails
	cursor = self.get_cursor()

	# schedule a job that will cancel the query we're about to make
	self.interruptable_job = queue.add_job("cancel-pg-query", details={}, remote_id=self.appname, claim_after=time.time() + self.interruptable_timeout)

	# make the query
	try:
		self.log.debug("Executing interruptable query: %s" % cursor.mogrify(query, *args))
		self.query(query, cursor=cursor, *args)
	except psycopg2.extensions.QueryCanceledError:
		# interrupted with cancellation worker (or manually)
		self.log.debug("Query in connection %s was interrupted..." % self.appname)
		self.rollback()
		cursor.close()
		raise DatabaseQueryInterruptedException("Interrupted while querying database")
	except Exception:
		# any other failure: finish the cancellation job now, otherwise it
		# stays queued and, once claimable, would cancel whatever query runs
		# on this connection (presumably matched via the app name) at that time
		self.interruptable_job.finish()
		self.interruptable_job = None
		cursor.close()
		raise

	# collect results
	try:
		result = cursor.fetchall()
	except AttributeError:
		result = []
	except psycopg2.ProgrammingError as e:
		result = []
		self.log.warning("Caught ProgrammingError: %s" % e)

	# clean up cancelling job when we have the data
	self.interruptable_job.finish()
	self.interruptable_job = None

	cursor.close()
	self.commit()

	return result

Fetch all rows for a query, allowing for interruption

Before running the query, a job is queued to cancel the query after a set amount of time. The query is expected to complete before this timeout. If the backend is interrupted, however, that job will be executed immediately, to cancel the database query. If this happens, a DatabaseQueryInterruptedException will be raised, but the database object will otherwise remain usable.

Note that in the event that the cancellation job is run, all queries for this instance of the database object will be cancelled. However, there should never be more than one active query per connection within 4CAT.

Parameters
  • JobQueue queue: A job queue object, required to schedule the query cancellation job
  • str query: SQL query
  • list args: Replacement variables
  • (there is no commit parameter: the transaction is committed after the rows are fetched, or rolled back if the query is interrupted)
Returns

A list of rows, as dictionaries

def commit(self):
	"""
	Commit the transaction currently open on this connection

	Changes made with UPDATE, INSERT, DELETE etc. only persist once the
	transaction is committed.
	"""
	connection = self.connection
	connection.commit()

Commit the current transaction

This is required for UPDATE etc to stick.

def rollback(self):
	"""
	Discard the transaction currently open on this connection
	"""
	connection = self.connection
	connection.rollback()

Roll back the current transaction

def close(self):
	"""
	Close the database connection

	No further queries should be run through this object afterwards.
	"""
	connection = self.connection
	connection.close()

Close connection

Running queries after this is probably a bad idea!

def get_cursor(self):
	"""
	Create and return a new cursor

	A fresh cursor is made on every call, since re-using cursors caused
	issues with per-thread connections. Rows fetched through it are
	dictionaries (RealDictCursor).

	:return: Cursor
	"""
	dict_cursor = psycopg2.extras.RealDictCursor
	return self.connection.cursor(cursor_factory=dict_cursor)

Get a new cursor

Re-using cursors seems to give issues when using per-thread connections, so simply instantiate a new one each time

Returns

Cursor