common.lib.database
Database wrapper
1""" 2Database wrapper 3""" 4import itertools 5import psycopg2.extras 6import psycopg2 7import logging 8import time 9 10from psycopg2 import sql 11from psycopg2.extras import execute_values 12 13from common.lib.exceptions import DatabaseQueryInterruptedException 14 15class Database: 16 """ 17 Simple database handler 18 19 Offers a number of abstraction methods that limit how much SQL one is 20 required to write. Also makes the database connection mostly multithreading 21 proof by instantiating a new cursor for each query (and closing it afterwards) 22 """ 23 cursor = None 24 log = None 25 appname="" 26 27 interrupted = False 28 interruptable_timeout = 86400 # if a query takes this long, it should be cancelled. see also fetchall_interruptable() 29 interruptable_job = None 30 31 def __init__(self, logger, dbname=None, user=None, password=None, host=None, port=None, appname=None): 32 """ 33 Set up database connection 34 35 :param logger: Logger instance 36 :param dbname: Database name 37 :param user: Database username 38 :param password: Database password 39 :param host: Database server address 40 :param port: Database port 41 :param appname: App name, mostly useful to trace connections in pg_stat_activity 42 """ 43 self.appname = "4CAT" if not appname else "4CAT-%s" % appname 44 45 self.connection = psycopg2.connect(dbname=dbname, user=user, password=password, host=host, port=port, application_name=self.appname) 46 self.cursor = self.connection.cursor(cursor_factory=psycopg2.extras.RealDictCursor) 47 self.log = logger 48 49 if self.log is None: 50 self.log = logging 51 52 def reconnect(self, tries=3, wait=10): 53 """ 54 Reconnect to the database 55 56 :param int tries: Number of tries to reconnect 57 :param int wait: Time to wait between tries (first try is immediate) 58 """ 59 for i in range(tries): 60 try: 61 self.connection = psycopg2.connect(dbname=self.connection.info.dbname, 62 user=self.connection.info.user, 63 password=self.connection.info.password, 64 host=self.connection.info.host, 65 port=self.connection.info.port, 66 application_name=self.appname) 67 self.cursor = self.connection.cursor(cursor_factory=psycopg2.extras.RealDictCursor) 68 return 69 except (psycopg2.InterfaceError, psycopg2.OperationalError) as e: 70 self.log.warning(f"Database connection closed. Reconnecting...\n{e}") 71 time.sleep(wait) 72 self.log.error("Failed to reconnect to database after %d tries" % tries) 73 74 def _execute_query(self, query, replacements=None, cursor=None): 75 """ 76 Execute a query 77 78 Simple wrapper to get a cursor to execute a query with. Do not call 79 from outside this class - use `execute()` instead. 80 81 :param string query: Query 82 :param args: Replacement values 83 :param cursor: Cursor to use. Default - use common cursor 84 :return None: 85 """ 86 if not cursor: 87 cursor = self.get_cursor() 88 89 self.log.debug("Executing query %s" % cursor.mogrify(query, replacements)) 90 try: 91 cursor.execute(query, replacements) 92 except (psycopg2.InterfaceError, psycopg2.OperationalError) as e: 93 self.log.warning(f"Database Exception: {e}\nReconnecting and retrying query...") 94 self.reconnect() 95 cursor = self.get_cursor() 96 cursor.execute(query, replacements) 97 return cursor 98 99 def execute(self, query, replacements=None, commit=True, cursor=None, close_cursor=True): 100 """ 101 Execute a query, and commit afterwards 102 103 This is required for UPDATE/INSERT/DELETE/etc to stick! 104 105 :param string query: Query 106 :param cursor: Cursor to use. 
By default, use the result of 107 `get_cursor()`. 108 :param replacements: Replacement values 109 :param bool commit: Commit transaction after query? 110 :param bool close_cursor: Close cursor after query? 111 """ 112 cursor = self._execute_query(query, replacements, cursor) 113 114 if commit: 115 self.commit() 116 117 rowcount = cursor.rowcount 118 119 if close_cursor: 120 cursor.close() 121 122 return rowcount 123 124 def execute_many(self, query, commit=True, replacements=None): 125 """ 126 Execute a query multiple times, each time with different values 127 128 This makes it particularly suitable for INSERT queries, but other types 129 of query using VALUES are possible too. 130 131 :param string query: Query 132 :param replacements: A list of replacement values 133 :param commit: Commit transaction after query? 134 """ 135 cursor = self.get_cursor() 136 try: 137 execute_values(cursor, query, replacements) 138 except (psycopg2.InterfaceError, psycopg2.OperationalError) as e: 139 self.log.warning(f"Database Exception: {e}\nReconnecting and retrying query...") 140 self.reconnect() 141 cursor = self.get_cursor() 142 execute_values(cursor, query, replacements) 143 144 cursor.close() 145 if commit: 146 self.commit() 147 148 def update(self, table, data, where=None, commit=True): 149 """ 150 Update a database record 151 152 :param string table: Table to update 153 :param dict data: Data to set, Column => Value 154 :param dict where: Simple conditions, parsed as "column1 = value1 AND column2 = value2" etc 155 :param bool commit: Whether to commit after executing the query 156 157 :return int: Number of affected rows. Note that this may be unreliable if `commit` is `False` 158 """ 159 if where is None: 160 where = {} 161 162 # build query 163 identifiers = [sql.Identifier(column) for column in data.keys()] 164 identifiers.insert(0, sql.Identifier(table)) 165 replacements = list(data.values()) 166 167 query = "UPDATE {} SET " + ", ".join(["{} = %s" for column in data]) 168 if where: 169 query += " WHERE " + " AND ".join(["{} = %s" for column in where]) 170 for column in where.keys(): 171 identifiers.append(sql.Identifier(column)) 172 replacements.append(where[column]) 173 174 query = sql.SQL(query).format(*identifiers) 175 176 rowcount = self.execute(query, replacements=replacements, commit=commit) 177 return rowcount 178 179 def delete(self, table, where, commit=True): 180 """ 181 Delete a database record 182 183 :param string table: Table to delete from 184 :param dict where: Simple conditions, parsed as "column1 = value1 AND column2 = value2" etc 185 :param bool commit: Whether to commit after executing the query 186 187 :return int: Number of affected rows. 
Note that this may be unreliable if `commit` is `False` 188 """ 189 where_sql = [] 190 replacements = [] 191 for column in where.keys(): 192 if type(where[column]) in (set, tuple, list): 193 where_sql.append("{} IN %s") 194 replacements.append(tuple(where[column])) 195 else: 196 where_sql.append("{} = %s") 197 replacements.append(where[column]) 198 199 # build query 200 identifiers = [sql.Identifier(column) for column in where.keys()] 201 identifiers.insert(0, sql.Identifier(table)) 202 query = sql.SQL("DELETE FROM {} WHERE " + " AND ".join(where_sql)).format(*identifiers) 203 204 rowcount = self.execute(query, replacements=replacements, commit=commit) 205 return rowcount 206 207 def insert(self, table, data, commit=True, safe=False, constraints=None, return_field=""): 208 """ 209 Create database record 210 211 :param string table: Table to insert record into 212 :param dict data: Data to insert 213 :param bool commit: Whether to commit after executing the query 214 :param bool safe: If set to `True`, "ON CONFLICT DO NOTHING" is added to the insert query, so it does not 215 insert the row and no error is thrown when the insert violates a unique index or other constraint 216 :param tuple constraints: If `safe` is `True`, this tuple may contain the columns that should be used as a 217 constraint, e.g. ON CONFLICT (name, lastname) DO NOTHING 218 :param str return_field: If not empty or None, this makes the method 219 return this field of the inserted row, instead of the number of 220 affected rows, with `RETURNING`. 221 :return int: Number of affected rows. Note that this may be unreliable if `commit` is `False` 222 """ 223 if constraints is None: 224 constraints = [] 225 226 # escape identifiers 227 identifiers = [sql.Identifier(column) for column in data.keys()] 228 identifiers.insert(0, sql.Identifier(table)) 229 230 # construct ON NOTHING bit of query 231 if safe: 232 safe_bit = " ON CONFLICT " 233 if constraints: 234 safe_bit += "(" + ", ".join(["{}" for each in constraints]) + ")" 235 identifiers.extend([sql.Identifier(column) for column in constraints]) 236 safe_bit += " DO NOTHING" 237 else: 238 safe_bit = "" 239 240 # prepare parameter replacements 241 protoquery = "INSERT INTO {} (%s) VALUES %%s" % ", ".join(["{}" for column in data.keys()]) + safe_bit 242 243 if return_field: 244 protoquery += " RETURNING {}" 245 identifiers.append(sql.Identifier(return_field)) 246 247 query = sql.SQL(protoquery).format(*identifiers) 248 replacements = (tuple(data.values()),) 249 250 cursor = self.get_cursor() 251 rowcount = self.execute(query, replacements=replacements, cursor=cursor, commit=commit, close_cursor=False) 252 253 result = rowcount if not return_field else cursor.fetchone()[return_field] 254 cursor.close() 255 return result 256 257 def upsert(self, table, data, commit=True, constraints=None): 258 """ 259 Create or update database record 260 261 If the record could not be inserted because of a constraint, the 262 constraining record is updated instead. 263 264 :param string table: Table to upsert record into 265 :param dict data: Data to upsert 266 :param bool commit: Whether to commit after executing the query 267 :param tuple constraints: This tuple may contain the columns that should be used as a 268 constraint, e.g. ON CONFLICT (name, lastname) DO UPDATE 269 :return int: Number of affected rows. 
Note that this may be unreliable if `commit` is `False` 270 """ 271 if constraints is None: 272 constraints = [] 273 274 # escape identifiers 275 identifiers = [sql.Identifier(column) for column in data.keys()] 276 identifiers.insert(0, sql.Identifier(table)) 277 278 # prepare parameter replacements 279 protoquery = "INSERT INTO {} (%s) VALUES %%s" % ", ".join(["{}" for column in data.keys()]) 280 protoquery += " ON CONFLICT" 281 282 if constraints: 283 protoquery += " (" + ", ".join(["{}" for each in constraints]) + ")" 284 identifiers.extend([sql.Identifier(column) for column in constraints]) 285 286 protoquery += " DO UPDATE SET " 287 protoquery += ", ".join(["%s = EXCLUDED.%s" % (column, column) for column in data.keys()]) 288 identifiers.extend(list(itertools.chain.from_iterable([[column, column] for column in data.keys()]))) 289 290 query = sql.SQL(protoquery).format(*identifiers) 291 replacements = (tuple(data.values()),) 292 293 rowcount = self.execute(query, replacements=replacements, commit=commit) 294 return rowcount 295 296 def fetchall(self, query, *args): 297 """ 298 Fetch all rows for a query 299 300 :param string query: Query 301 :param args: Replacement values 302 :param commit: Commit transaction after query? 303 :return list: The result rows, as a list 304 """ 305 cursor = self._execute_query(query, *args) 306 307 try: 308 result = cursor.fetchall() 309 except AttributeError: 310 result = [] 311 except psycopg2.ProgrammingError as e: 312 # there seems to be a bug with psycopg2 where it sometimes raises 313 # this for empty query results even though it shouldn't. this 314 # doesn't seem to indicate an actual problem so we catch the 315 # exception and return an empty list 316 self.rollback() 317 result = [] 318 self.log.warning("Caught ProgrammingError: %s" % e) 319 320 cursor.close() 321 self.commit() 322 323 return result 324 325 def fetchone(self, query, *args): 326 """ 327 Fetch one result row 328 329 :param string query: Query 330 :param args: Replacement values 331 :param commit: Commit transaction after query? 332 :return: The row, as a dictionary, or None if there were no rows 333 """ 334 cursor = self._execute_query(query, *args) 335 336 try: 337 result = cursor.fetchone() 338 except psycopg2.ProgrammingError as e: 339 # no results to fetch 340 self.rollback() 341 result = None 342 self.log.warning("Caught ProgrammingError: %s" % e) 343 344 cursor.close() 345 self.commit() 346 347 return result 348 349 def fetchall_interruptable(self, queue, query, *args): 350 """ 351 Fetch all rows for a query, allowing for interruption 352 353 Before running the query, a job is queued to cancel the query after a 354 set amount of time. The query is expected to complete before this 355 timeout. If the backend is interrupted, however, that job will be 356 executed immediately, to cancel the database query. If this happens, a 357 DatabaseQueryInterruptedException will be raised, but the database 358 object will otherwise remain useable. 359 360 Note that in the event that the cancellation job is run, all queries 361 for this instance of the database object will be cancelled. However, 362 there should never be more than one active query per connection within 363 4CAT. 364 365 :param JobQueue queue: A job queue object, required to schedule the 366 query cancellation job 367 :param str query: SQL query 368 :param list args: Replacement variables 369 :param commit: Commit transaction after query? 
370 :return list: A list of rows, as dictionaries 371 """ 372 # schedule a job that will cancel the query we're about to make 373 self.interruptable_job = queue.add_job("cancel-pg-query", details={}, remote_id=self.appname, claim_after=time.time() + self.interruptable_timeout) 374 375 # run the query 376 cursor = self.get_cursor() 377 try: 378 cursor = self._execute_query(query, cursor=cursor, *args) 379 except psycopg2.extensions.QueryCanceledError: 380 # interrupted with cancellation worker (or manually) 381 self.log.debug("Query in connection %s was interrupted..." % self.appname) 382 self.rollback() 383 cursor.close() 384 raise DatabaseQueryInterruptedException("Interrupted while querying database") 385 386 # collect results 387 try: 388 result = cursor.fetchall() 389 except AttributeError: 390 result = [] 391 except psycopg2.ProgrammingError as e: 392 result = [] 393 self.log.warning("Caught ProgrammingError: %s" % e) 394 395 # clean up cancelling job when we have the data 396 self.interruptable_job.finish() 397 self.interruptable_job = None 398 399 cursor.close() 400 self.commit() 401 402 return result 403 404 405 def commit(self): 406 """ 407 Commit the current transaction 408 409 This is required for UPDATE etc to stick. 410 """ 411 self.connection.commit() 412 413 def rollback(self): 414 """ 415 Roll back the current transaction 416 """ 417 self.connection.rollback() 418 419 def close(self): 420 """ 421 Close connection 422 423 Running queries after this is probably a bad idea! 424 """ 425 self.connection.close() 426 427 def get_cursor(self): 428 """ 429 Get a new cursor 430 431 Re-using cursors seems to give issues when using per-thread 432 connections, so simply instantiate a new one each time 433 434 :return: Cursor 435 """ 436 try: 437 return self.connection.cursor(cursor_factory=psycopg2.extras.RealDictCursor) 438 except (psycopg2.InterfaceError, psycopg2.OperationalError) as e: 439 self.log.warning(f"Database Exception: {e}\nReconnecting and retrying query...") 440 self.reconnect() 441 return self.connection.cursor(cursor_factory=psycopg2.extras.RealDictCursor)
class Database
Simple database handler
Offers a number of abstraction methods that limit how much SQL one is required to write. Also makes the database connection mostly multithreading-proof by instantiating a new cursor for each query (and closing it afterwards).
`def __init__(self, logger, dbname=None, user=None, password=None, host=None, port=None, appname=None)`
Set up database connection
Parameters
- logger: Logger instance
- dbname: Database name
- user: Database username
- password: Database password
- host: Database server address
- port: Database port
- appname: App name, mostly useful to trace connections in pg_stat_activity
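A minimal construction sketch; the logger name and connection details below are placeholders for illustration, not values shipped with 4CAT:

```python
import logging

from common.lib.database import Database

logger = logging.getLogger("database-example")  # any standard logger works

# hypothetical connection parameters, shown for illustration only
db = Database(logger, dbname="fourcat", user="fourcat", password="supersecret",
              host="localhost", port=5432, appname="example-worker")
```

The examples in the sections below assume this `db` instance; all table and column names in them are likewise hypothetical.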
`def reconnect(self, tries=3, wait=10)`
Reconnect to the database
Parameters
- int tries: Number of tries to reconnect
- int wait: Time to wait between tries (first try is immediate)
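The wrapper calls this automatically when a query fails with a closed connection, but it can also be invoked directly; for example:

```python
# retry up to 5 times, waiting 10 seconds between attempts
db.reconnect(tries=5, wait=10)
```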
`def execute(self, query, replacements=None, commit=True, cursor=None, close_cursor=True)`
Execute a query, and commit afterwards
This is required for UPDATE/INSERT/DELETE/etc to stick!
Parameters
- string query: Query
- cursor: Cursor to use. By default, use the result of `get_cursor()`
- replacements: Replacement values
- bool commit: Commit transaction after query?
- bool close_cursor: Close cursor after query?
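A usage sketch with the `db` instance from the construction example above; the `jobs` table and its columns are hypothetical:

```python
# %s placeholders are filled from the replacements tuple by psycopg2
affected = db.execute("UPDATE jobs SET attempts = attempts + 1 WHERE jobtype = %s", ("example-job",))

# defer the commit to group several statements into one transaction
db.execute("DELETE FROM jobs WHERE attempts > %s", (3,), commit=False)
db.commit()
```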
`def execute_many(self, query, commit=True, replacements=None)`
Execute a query multiple times, each time with different values
This makes it particularly suitable for INSERT queries, but other types of query using VALUES are possible too.
Parameters
- string query: Query
- replacements: A list of replacement values
- commit: Commit transaction after query?
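Internally this uses psycopg2's `execute_values()`, so the query should contain a single `%s` placeholder standing in for the whole VALUES list. A sketch with a hypothetical `jobs` table:

```python
# each tuple becomes one row in the generated VALUES list
rows = [("job-1", 1614772800), ("job-2", 1614772860), ("job-3", 1614772920)]
db.execute_many("INSERT INTO jobs (remote_id, timestamp) VALUES %s", replacements=rows)
```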
`def update(self, table, data, where=None, commit=True)`
Update a database record
Parameters
- string table: Table to update
- dict data: Data to set, Column => Value
- dict where: Simple conditions, parsed as "column1 = value1 AND column2 = value2" etc
- bool commit: Whether to commit after executing the query
Returns
Number of affected rows. Note that this may be unreliable if `commit` is `False`
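A sketch, again with hypothetical table and column names:

```python
# UPDATE jobs SET status = 'finished', attempts = 0 WHERE jobtype = 'example-job'
affected = db.update("jobs", data={"status": "finished", "attempts": 0},
                     where={"jobtype": "example-job"})
```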
`def delete(self, table, where, commit=True)`
Delete a database record
Parameters
- string table: Table to delete from
- dict where: Simple conditions, parsed as "column1 = value1 AND column2 = value2" etc
- bool commit: Whether to commit after executing the query
Returns
Number of affected rows. Note that this may be unreliable if `commit` is `False`
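Scalar values in `where` are matched with `=`, while lists, tuples and sets are matched with `IN`. A sketch with a hypothetical table:

```python
# DELETE FROM jobs WHERE status = 'finished' AND jobtype IN ('example-a', 'example-b')
deleted = db.delete("jobs", where={"status": "finished", "jobtype": ("example-a", "example-b")})
```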
`def insert(self, table, data, commit=True, safe=False, constraints=None, return_field="")`
Create database record
Parameters
- string table: Table to insert record into
- dict data: Data to insert
- bool commit: Whether to commit after executing the query
- bool safe: If set to `True`, "ON CONFLICT DO NOTHING" is added to the insert query, so it does not insert the row and no error is thrown when the insert violates a unique index or other constraint
- tuple constraints: If `safe` is `True`, this tuple may contain the columns that should be used as a constraint, e.g. ON CONFLICT (name, lastname) DO NOTHING
- str return_field: If not empty or None, this makes the method return this field of the inserted row, instead of the number of affected rows, with `RETURNING`
Returns
Number of affected rows, or the value of `return_field` for the inserted row if `return_field` was set. Note that the row count may be unreliable if `commit` is `False`
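Some usage sketches with a hypothetical `users` table that has a unique `name` column:

```python
# plain insert; returns the number of inserted rows (1)
db.insert("users", data={"name": "example-user", "is_admin": False})

# do nothing, instead of raising an error, when the unique constraint on "name" is violated
db.insert("users", data={"name": "example-user", "is_admin": False},
          safe=True, constraints=("name",))

# return a column of the newly inserted row instead of the row count
new_id = db.insert("users", data={"name": "another-user", "is_admin": False}, return_field="id")
```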
`def upsert(self, table, data, commit=True, constraints=None)`
Create or update database record
If the record could not be inserted because of a constraint, the constraining record is updated instead.
Parameters
- string table: Table to upsert record into
- dict data: Data to upsert
- bool commit: Whether to commit after executing the query
- tuple constraints: This tuple may contain the columns that should be used as a constraint, e.g. ON CONFLICT (name, lastname) DO UPDATE
Returns
Number of affected rows. Note that this may be unreliable if `commit` is `False`
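A sketch with the same hypothetical `users` table as above:

```python
# insert, or overwrite the existing row that conflicts on "name"
db.upsert("users", data={"name": "example-user", "is_admin": True}, constraints=("name",))
```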
`def fetchall(self, query, *args)`
Fetch all rows for a query
Parameters
- string query: Query
- args: Replacement values
- commit: Commit transaction after query?
Returns
The result rows, as a list
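Because the wrapper uses `RealDictCursor`, each row is returned as a dictionary keyed by column name. A sketch with a hypothetical table:

```python
rows = db.fetchall("SELECT name, is_admin FROM users WHERE is_admin = %s", (True,))
for row in rows:
    print(row["name"])
```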
`def fetchone(self, query, *args)`
Fetch one result row
Parameters
- string query: Query
- args: Replacement values
- commit: Commit transaction after query?
Returns
The row, as a dictionary, or None if there were no rows
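For example, with a hypothetical `users` table:

```python
user = db.fetchone("SELECT * FROM users WHERE name = %s", ("example-user",))
if user is None:
    print("no such user")
```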
`def fetchall_interruptable(self, queue, query, *args)`
Fetch all rows for a query, allowing for interruption
Before running the query, a job is queued to cancel the query after a set amount of time. The query is expected to complete before this timeout. If the backend is interrupted, however, that job will be executed immediately, to cancel the database query. If this happens, a DatabaseQueryInterruptedException will be raised, but the database object will otherwise remain useable.
Note that in the event that the cancellation job is run, all queries for this instance of the database object will be cancelled. However, there should never be more than one active query per connection within 4CAT.
Parameters
- JobQueue queue: A job queue object, required to schedule the query cancellation job
- str query: SQL query
- list args: Replacement variables
- commit: Commit transaction after query?
Returns
A list of rows, as dictionaries
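A sketch; `queue` is assumed to be a JobQueue instance provided by the caller, and the table is hypothetical:

```python
from common.lib.exceptions import DatabaseQueryInterruptedException

try:
    posts = db.fetchall_interruptable(queue, "SELECT * FROM posts WHERE board = %s", ("example-board",))
except DatabaseQueryInterruptedException:
    posts = []  # the backend cancelled the query; the connection itself remains usable
```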
`def commit(self)`
Commit the current transaction
This is required for UPDATE etc to stick.
`def rollback(self)`
Roll back the current transaction
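Together with `execute(..., commit=False)`, these allow grouping several statements into a single transaction; a sketch with hypothetical tables:

```python
import psycopg2

try:
    db.execute("UPDATE jobs SET status = %s WHERE jobtype = %s", ("queued", "example-job"), commit=False)
    db.execute("DELETE FROM jobs WHERE status = %s", ("obsolete",), commit=False)
    db.commit()  # both statements become permanent at once
except psycopg2.Error:
    db.rollback()  # undo both statements if either failed
    raise
```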
`def close(self)`
Close connection
Running queries after this is probably a bad idea!
`def get_cursor(self)`
Get a new cursor
Re-using cursors seems to give issues when using per-thread connections, so simply instantiate a new one each time
Returns
Cursor
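For anything the helper methods do not cover, a cursor can be used directly; close it when done. A sketch with a hypothetical table:

```python
cursor = db.get_cursor()
try:
    cursor.execute("SELECT count(*) AS num FROM jobs")
    print(cursor.fetchone()["num"])  # rows are dictionaries (RealDictCursor)
finally:
    cursor.close()
```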