common.lib.database
Database wrapper
"""
Database wrapper
"""
import itertools
import psycopg2.extras
import psycopg2
import logging
import time

from psycopg2 import sql
from psycopg2.extras import execute_values

from common.lib.exceptions import DatabaseQueryInterruptedException


class Database:
    """
    Simple database handler

    Offers a number of abstraction methods that limit how much SQL one is
    required to write. Also makes the database connection mostly multithreading
    proof by instantiating a new cursor for each query (and closing it afterwards)
    """
    cursor = None  # shared cursor set up in __init__; the helpers below use get_cursor() instead
    log = None
    appname = ""

    interrupted = False
    interruptable_timeout = 86400  # seconds; if a query takes this long, it should be cancelled. see also fetchall_interruptable()
    interruptable_job = None  # cancellation job pending while fetchall_interruptable() runs

    def __init__(self, logger, dbname=None, user=None, password=None, host=None, port=None, appname=None):
        """
        Set up database connection

        :param logger: Logger instance; if None, the `logging` module is used
        :param dbname: Database name
        :param user: Database username
        :param password: Database password
        :param host: Database server address
        :param port: Database port
        :param appname: App name, mostly useful to trace connections in
          pg_stat_activity. Stored as "4CAT-<appname>", or "4CAT" if empty
        """
        self.appname = "4CAT" if not appname else "4CAT-%s" % appname

        self.connection = psycopg2.connect(dbname=dbname, user=user, password=password, host=host, port=port, application_name=self.appname)
        self.cursor = self.connection.cursor(cursor_factory=psycopg2.extras.RealDictCursor)
        self.log = logger

        if self.log is None:
            self.log = logging

        self.commit()

    def query(self, query, replacements=None, cursor=None):
        """
        Execute a query

        Does not commit and does not fetch; pass a cursor to read results.

        :param string query: Query
        :param replacements: Replacement values
        :param cursor: Cursor to use. If omitted, a new cursor is created and
          closed again after the query has run, so no results can be fetched
        :return None:
        """
        created_cursor = not cursor
        if created_cursor:
            cursor = self.get_cursor()

        try:
            # mogrify with the cursor that runs the query, not the shared
            # self.cursor, in line with the cursor-per-query approach
            self.log.debug("Executing query %s" % cursor.mogrify(query, replacements))
            return cursor.execute(query, replacements)
        finally:
            # a cursor created here is unreachable for the caller, so close it
            # rather than leaking it
            if created_cursor:
                cursor.close()

    def execute(self, query, replacements=None):
        """
        Execute a query, and commit afterwards

        This is required for UPDATE/INSERT/DELETE/etc to stick

        :param string query: Query
        :param replacements: Replacement values
        """
        # the with-block closes the cursor even if execute() or commit() raises
        with self.get_cursor() as cursor:
            self.log.debug("Executing query %s" % cursor.mogrify(query, replacements))
            cursor.execute(query, replacements)
            self.commit()

    def execute_many(self, query, commit=True, replacements=None):
        """
        Execute a query multiple times, each time with different values

        This makes it particularly suitable for INSERT queries, but other types
        of query using VALUES are possible too.

        :param string query: Query, with a single %s placeholder that
          psycopg2's execute_values() replaces with the VALUES list
        :param replacements: A list of replacement values, one item per row
        :param commit: Commit transaction after query?
        """
        with self.get_cursor() as cursor:
            execute_values(cursor, query, replacements)

        if commit:
            self.commit()

    def update(self, table, data, where=None, commit=True):
        """
        Update a database record

        :param string table: Table to update
        :param dict where: Simple conditions, parsed as "column1 = value1 AND column2 = value2" etc
        :param dict data: Data to set, Column => Value
        :param bool commit: Whether to commit after executing the query

        :return int: Number of affected rows. Note that this may be unreliable if `commit` is `False`
        """
        if where is None:
            where = {}

        # {} placeholders are filled with escaped identifiers, %s with values
        identifiers = [sql.Identifier(table)] + [sql.Identifier(column) for column in data]
        replacements = list(data.values())

        query = "UPDATE {} SET " + ", ".join(["{} = %s"] * len(data))
        if where:
            query += " WHERE " + " AND ".join(["{} = %s"] * len(where))
            identifiers.extend(sql.Identifier(column) for column in where)
            replacements.extend(where.values())

        query = sql.SQL(query).format(*identifiers)

        with self.get_cursor() as cursor:
            self.log.debug("Executing query: %s" % cursor.mogrify(query, replacements))
            cursor.execute(query, replacements)

            if commit:
                self.commit()

            return cursor.rowcount

    def delete(self, table, where, commit=True):
        """
        Delete a database record

        :param string table: Table to delete from
        :param dict where: Simple conditions, parsed as "column1 = value1 AND
          column2 = value2" etc. A set, tuple or list value becomes
          "column IN (...)". An empty dict yields invalid SQL (a dangling
          WHERE), so the query fails instead of deleting every row
        :param bool commit: Whether to commit after executing the query

        :return int: Number of affected rows. Note that this may be unreliable if `commit` is `False`
        """
        where_sql = []
        replacements = []
        for column, value in where.items():
            if isinstance(value, (set, tuple, list)):
                # psycopg2 renders a tuple as a parenthesised list for IN
                where_sql.append("{} IN %s")
                replacements.append(tuple(value))
            else:
                where_sql.append("{} = %s")
                replacements.append(value)

        # build query
        identifiers = [sql.Identifier(table)] + [sql.Identifier(column) for column in where]
        query = sql.SQL("DELETE FROM {} WHERE " + " AND ".join(where_sql)).format(*identifiers)

        with self.get_cursor() as cursor:
            self.log.debug("Executing query: %s" % cursor.mogrify(query, replacements))
            cursor.execute(query, replacements)

            if commit:
                self.commit()

            return cursor.rowcount

    def insert(self, table, data, commit=True, safe=False, constraints=None, return_field=""):
        """
        Create database record

        :param string table: Table to insert record into
        :param dict data: Data to insert
        :param bool commit: Whether to commit after executing the query
        :param bool safe: If set to `True`, "ON CONFLICT DO NOTHING" is added to the insert query, so it does not
        insert the row and no error is thrown when the insert violates a unique index or other constraint
        :param tuple constraints: If `safe` is `True`, this tuple may contain the columns that should be used as a
        constraint, e.g. ON CONFLICT (name, lastname) DO NOTHING
        :param str return_field: If not empty or None, this makes the method
        return this field of the inserted row, instead of the number of
        affected rows, with `RETURNING`.
        :return int: Number of affected rows. Note that this may be unreliable if `commit` is `False`.
          With `return_field`, the value of that field instead, or None if no
          row was inserted (e.g. skipped because of a conflict with `safe`)
        """
        if constraints is None:
            constraints = []

        # escape identifiers
        identifiers = [sql.Identifier(table)] + [sql.Identifier(column) for column in data]

        # construct the optional "ON CONFLICT [(columns)] DO NOTHING" clause
        if safe:
            safe_bit = " ON CONFLICT "
            if constraints:
                safe_bit += "(" + ", ".join(["{}"] * len(constraints)) + ")"
                identifiers.extend(sql.Identifier(column) for column in constraints)
            safe_bit += " DO NOTHING"
        else:
            safe_bit = ""

        # the row is passed as a single tuple, which psycopg2 renders as (v1, v2, ...)
        protoquery = "INSERT INTO {} (%s) VALUES %%s" % ", ".join(["{}"] * len(data)) + safe_bit

        if return_field:
            protoquery += " RETURNING {}"
            identifiers.append(sql.Identifier(return_field))

        query = sql.SQL(protoquery).format(*identifiers)
        replacements = (tuple(data.values()),)

        with self.get_cursor() as cursor:
            self.log.debug("Executing query: %s" % cursor.mogrify(query, replacements))
            cursor.execute(query, replacements)

            if commit:
                self.commit()

            if not return_field:
                return cursor.rowcount

            # RETURNING yields no row when the insert was skipped
            row = cursor.fetchone()
            return row[return_field] if row is not None else None

    def upsert(self, table, data, commit=True, constraints=None):
        """
        Create or update database record

        If the record could not be inserted because of a constraint, the
        constraining record is updated instead.

        :param string table: Table to upsert record into
        :param dict data: Data to upsert
        :param bool commit: Whether to commit after executing the query
        :param tuple constraints: This tuple may contain the columns that should be used as a
        constraint, e.g. ON CONFLICT (name, lastname) DO UPDATE. PostgreSQL
        requires a conflict target for DO UPDATE, so in practice this is needed
        :return int: Number of affected rows. Note that this may be unreliable if `commit` is `False`
        """
        if constraints is None:
            constraints = []

        # escape identifiers
        identifiers = [sql.Identifier(table)] + [sql.Identifier(column) for column in data]

        protoquery = "INSERT INTO {} (%s) VALUES %%s" % ", ".join(["{}"] * len(data))
        protoquery += " ON CONFLICT"

        if constraints:
            protoquery += " (" + ", ".join(["{}"] * len(constraints)) + ")"
            identifiers.extend(sql.Identifier(column) for column in constraints)

        # escape the SET-clause columns too (each appears twice: "col = EXCLUDED.col"),
        # consistent with the INSERT column list, instead of pasting raw names into SQL
        protoquery += " DO UPDATE SET " + ", ".join(["{} = EXCLUDED.{}"] * len(data))
        identifiers.extend(itertools.chain.from_iterable(
            (sql.Identifier(column), sql.Identifier(column)) for column in data
        ))

        query = sql.SQL(protoquery).format(*identifiers)
        replacements = (tuple(data.values()),)

        with self.get_cursor() as cursor:
            self.log.debug("Executing query: %s" % cursor.mogrify(query, replacements))
            cursor.execute(query, replacements)

            if commit:
                self.commit()

            return cursor.rowcount

    def fetchall(self, query, *args):
        """
        Fetch all rows for a query

        Commits the transaction afterwards.

        :param string query: Query
        :param args: Replacement values (at most one item, passed on to query())
        :return list: The result rows, as a list
        """
        with self.get_cursor() as cursor:
            # query() already logs the statement
            self.query(query, *args, cursor=cursor)

            try:
                result = cursor.fetchall()
            except AttributeError:
                result = []
            except psycopg2.ProgrammingError as e:
                # there seems to be a bug with psycopg2 where it sometimes raises
                # this for empty query results even though it shouldn't. this
                # doesn't seem to indicate an actual problem so we catch the
                # exception and return an empty list
                self.rollback()
                result = []
                self.log.warning("Caught ProgrammingError: %s" % e)

        self.commit()

        return result

    def fetchone(self, query, *args):
        """
        Fetch one result row

        Commits the transaction afterwards.

        :param string query: Query
        :param args: Replacement values (at most one item, passed on to query())
        :return: The row, as a dictionary, or None if there were no rows
        """
        with self.get_cursor() as cursor:
            self.query(query, *args, cursor=cursor)

            try:
                result = cursor.fetchone()
            except psycopg2.ProgrammingError as e:
                # no results to fetch
                self.rollback()
                result = None
                self.log.warning("Caught ProgrammingError: %s" % e)

        self.commit()

        return result

    def fetchall_interruptable(self, queue, query, *args):
        """
        Fetch all rows for a query, allowing for interruption

        Before running the query, a job is queued to cancel the query after a
        set amount of time. The query is expected to complete before this
        timeout. If the backend is interrupted, however, that job will be
        executed immediately, to cancel the database query. If this happens, a
        DatabaseQueryInterruptedException will be raised, but the database
        object will otherwise remain useable.

        Note that in the event that the cancellation job is run, all queries
        for this instance of the database object will be cancelled. However,
        there should never be more than one active query per connection within
        4CAT.

        :param JobQueue queue: A job queue object, required to schedule the
        query cancellation job
        :param str query: SQL query
        :param list args: Replacement variables (at most one item)
        :return list: A list of rows, as dictionaries
        :raises DatabaseQueryInterruptedException: if the query was cancelled
        """
        # schedule a job that will cancel the query we're about to make
        self.interruptable_job = queue.add_job("cancel-pg-query", details={}, remote_id=self.appname, claim_after=time.time() + self.interruptable_timeout)

        cursor = self.get_cursor()
        try:
            self.log.debug("Executing interruptable query: %s" % cursor.mogrify(query, *args))

            # NOTE(review): if the query fails with anything other than a
            # cancellation, the job scheduled above stays queued - confirm
            # whether it should be finished on that path too
            try:
                self.query(query, *args, cursor=cursor)
            except psycopg2.extensions.QueryCanceledError as e:
                # interrupted with cancellation worker (or manually)
                self.log.debug("Query in connection %s was interrupted..." % self.appname)
                self.rollback()
                raise DatabaseQueryInterruptedException("Interrupted while querying database") from e

            # collect results
            try:
                result = cursor.fetchall()
            except AttributeError:
                result = []
            except psycopg2.ProgrammingError as e:
                result = []
                self.log.warning("Caught ProgrammingError: %s" % e)
        finally:
            # close the cursor on every path, not only on cancellation
            cursor.close()

        # clean up cancelling job when we have the data
        self.interruptable_job.finish()
        self.interruptable_job = None

        self.commit()

        return result

    def commit(self):
        """
        Commit the current transaction

        This is required for UPDATE etc to stick.
        """
        self.connection.commit()

    def rollback(self):
        """
        Roll back the current transaction
        """
        self.connection.rollback()

    def close(self):
        """
        Close connection

        Running queries after this is probably a bad idea!
        """
        self.connection.close()

    def get_cursor(self):
        """
        Get a new cursor

        Re-using cursors seems to give issues when using per-thread
        connections, so simply instantiate a new one each time

        :return: Cursor returning rows as dictionaries (RealDictCursor)
        """
        return self.connection.cursor(cursor_factory=psycopg2.extras.RealDictCursor)
class Database:
    """
    Simple database handler

    Offers a number of abstraction methods that limit how much SQL one is
    required to write. Also makes the database connection mostly multithreading
    proof by instantiating a new cursor for each query (and closing it afterwards)
    """
    cursor = None  # shared cursor set up in __init__; the helpers below use get_cursor() instead
    log = None
    appname = ""

    interrupted = False
    interruptable_timeout = 86400  # seconds; if a query takes this long, it should be cancelled. see also fetchall_interruptable()
    interruptable_job = None  # cancellation job pending while fetchall_interruptable() runs

    def __init__(self, logger, dbname=None, user=None, password=None, host=None, port=None, appname=None):
        """
        Set up database connection

        :param logger: Logger instance; if None, the `logging` module is used
        :param dbname: Database name
        :param user: Database username
        :param password: Database password
        :param host: Database server address
        :param port: Database port
        :param appname: App name, mostly useful to trace connections in
          pg_stat_activity. Stored as "4CAT-<appname>", or "4CAT" if empty
        """
        self.appname = "4CAT" if not appname else "4CAT-%s" % appname

        self.connection = psycopg2.connect(dbname=dbname, user=user, password=password, host=host, port=port, application_name=self.appname)
        self.cursor = self.connection.cursor(cursor_factory=psycopg2.extras.RealDictCursor)
        self.log = logger

        if self.log is None:
            self.log = logging

        self.commit()

    def query(self, query, replacements=None, cursor=None):
        """
        Execute a query

        Does not commit and does not fetch; pass a cursor to read results.

        :param string query: Query
        :param replacements: Replacement values
        :param cursor: Cursor to use. If omitted, a new cursor is created and
          closed again after the query has run, so no results can be fetched
        :return None:
        """
        created_cursor = not cursor
        if created_cursor:
            cursor = self.get_cursor()

        try:
            # mogrify with the cursor that runs the query, not the shared
            # self.cursor, in line with the cursor-per-query approach
            self.log.debug("Executing query %s" % cursor.mogrify(query, replacements))
            return cursor.execute(query, replacements)
        finally:
            # a cursor created here is unreachable for the caller, so close it
            # rather than leaking it
            if created_cursor:
                cursor.close()

    def execute(self, query, replacements=None):
        """
        Execute a query, and commit afterwards

        This is required for UPDATE/INSERT/DELETE/etc to stick

        :param string query: Query
        :param replacements: Replacement values
        """
        # the with-block closes the cursor even if execute() or commit() raises
        with self.get_cursor() as cursor:
            self.log.debug("Executing query %s" % cursor.mogrify(query, replacements))
            cursor.execute(query, replacements)
            self.commit()

    def execute_many(self, query, commit=True, replacements=None):
        """
        Execute a query multiple times, each time with different values

        This makes it particularly suitable for INSERT queries, but other types
        of query using VALUES are possible too.

        :param string query: Query, with a single %s placeholder that
          psycopg2's execute_values() replaces with the VALUES list
        :param replacements: A list of replacement values, one item per row
        :param commit: Commit transaction after query?
        """
        with self.get_cursor() as cursor:
            execute_values(cursor, query, replacements)

        if commit:
            self.commit()

    def update(self, table, data, where=None, commit=True):
        """
        Update a database record

        :param string table: Table to update
        :param dict where: Simple conditions, parsed as "column1 = value1 AND column2 = value2" etc
        :param dict data: Data to set, Column => Value
        :param bool commit: Whether to commit after executing the query

        :return int: Number of affected rows. Note that this may be unreliable if `commit` is `False`
        """
        if where is None:
            where = {}

        # {} placeholders are filled with escaped identifiers, %s with values
        identifiers = [sql.Identifier(table)] + [sql.Identifier(column) for column in data]
        replacements = list(data.values())

        query = "UPDATE {} SET " + ", ".join(["{} = %s"] * len(data))
        if where:
            query += " WHERE " + " AND ".join(["{} = %s"] * len(where))
            identifiers.extend(sql.Identifier(column) for column in where)
            replacements.extend(where.values())

        query = sql.SQL(query).format(*identifiers)

        with self.get_cursor() as cursor:
            self.log.debug("Executing query: %s" % cursor.mogrify(query, replacements))
            cursor.execute(query, replacements)

            if commit:
                self.commit()

            return cursor.rowcount

    def delete(self, table, where, commit=True):
        """
        Delete a database record

        :param string table: Table to delete from
        :param dict where: Simple conditions, parsed as "column1 = value1 AND
          column2 = value2" etc. A set, tuple or list value becomes
          "column IN (...)". An empty dict yields invalid SQL (a dangling
          WHERE), so the query fails instead of deleting every row
        :param bool commit: Whether to commit after executing the query

        :return int: Number of affected rows. Note that this may be unreliable if `commit` is `False`
        """
        where_sql = []
        replacements = []
        for column, value in where.items():
            if isinstance(value, (set, tuple, list)):
                # psycopg2 renders a tuple as a parenthesised list for IN
                where_sql.append("{} IN %s")
                replacements.append(tuple(value))
            else:
                where_sql.append("{} = %s")
                replacements.append(value)

        # build query
        identifiers = [sql.Identifier(table)] + [sql.Identifier(column) for column in where]
        query = sql.SQL("DELETE FROM {} WHERE " + " AND ".join(where_sql)).format(*identifiers)

        with self.get_cursor() as cursor:
            self.log.debug("Executing query: %s" % cursor.mogrify(query, replacements))
            cursor.execute(query, replacements)

            if commit:
                self.commit()

            return cursor.rowcount

    def insert(self, table, data, commit=True, safe=False, constraints=None, return_field=""):
        """
        Create database record

        :param string table: Table to insert record into
        :param dict data: Data to insert
        :param bool commit: Whether to commit after executing the query
        :param bool safe: If set to `True`, "ON CONFLICT DO NOTHING" is added to the insert query, so it does not
        insert the row and no error is thrown when the insert violates a unique index or other constraint
        :param tuple constraints: If `safe` is `True`, this tuple may contain the columns that should be used as a
        constraint, e.g. ON CONFLICT (name, lastname) DO NOTHING
        :param str return_field: If not empty or None, this makes the method
        return this field of the inserted row, instead of the number of
        affected rows, with `RETURNING`.
        :return int: Number of affected rows. Note that this may be unreliable if `commit` is `False`.
          With `return_field`, the value of that field instead, or None if no
          row was inserted (e.g. skipped because of a conflict with `safe`)
        """
        if constraints is None:
            constraints = []

        # escape identifiers
        identifiers = [sql.Identifier(table)] + [sql.Identifier(column) for column in data]

        # construct the optional "ON CONFLICT [(columns)] DO NOTHING" clause
        if safe:
            safe_bit = " ON CONFLICT "
            if constraints:
                safe_bit += "(" + ", ".join(["{}"] * len(constraints)) + ")"
                identifiers.extend(sql.Identifier(column) for column in constraints)
            safe_bit += " DO NOTHING"
        else:
            safe_bit = ""

        # the row is passed as a single tuple, which psycopg2 renders as (v1, v2, ...)
        protoquery = "INSERT INTO {} (%s) VALUES %%s" % ", ".join(["{}"] * len(data)) + safe_bit

        if return_field:
            protoquery += " RETURNING {}"
            identifiers.append(sql.Identifier(return_field))

        query = sql.SQL(protoquery).format(*identifiers)
        replacements = (tuple(data.values()),)

        with self.get_cursor() as cursor:
            self.log.debug("Executing query: %s" % cursor.mogrify(query, replacements))
            cursor.execute(query, replacements)

            if commit:
                self.commit()

            if not return_field:
                return cursor.rowcount

            # RETURNING yields no row when the insert was skipped
            row = cursor.fetchone()
            return row[return_field] if row is not None else None

    def upsert(self, table, data, commit=True, constraints=None):
        """
        Create or update database record

        If the record could not be inserted because of a constraint, the
        constraining record is updated instead.

        :param string table: Table to upsert record into
        :param dict data: Data to upsert
        :param bool commit: Whether to commit after executing the query
        :param tuple constraints: This tuple may contain the columns that should be used as a
        constraint, e.g. ON CONFLICT (name, lastname) DO UPDATE. PostgreSQL
        requires a conflict target for DO UPDATE, so in practice this is needed
        :return int: Number of affected rows. Note that this may be unreliable if `commit` is `False`
        """
        if constraints is None:
            constraints = []

        # escape identifiers
        identifiers = [sql.Identifier(table)] + [sql.Identifier(column) for column in data]

        protoquery = "INSERT INTO {} (%s) VALUES %%s" % ", ".join(["{}"] * len(data))
        protoquery += " ON CONFLICT"

        if constraints:
            protoquery += " (" + ", ".join(["{}"] * len(constraints)) + ")"
            identifiers.extend(sql.Identifier(column) for column in constraints)

        # escape the SET-clause columns too (each appears twice: "col = EXCLUDED.col"),
        # consistent with the INSERT column list, instead of pasting raw names into SQL
        protoquery += " DO UPDATE SET " + ", ".join(["{} = EXCLUDED.{}"] * len(data))
        identifiers.extend(itertools.chain.from_iterable(
            (sql.Identifier(column), sql.Identifier(column)) for column in data
        ))

        query = sql.SQL(protoquery).format(*identifiers)
        replacements = (tuple(data.values()),)

        with self.get_cursor() as cursor:
            self.log.debug("Executing query: %s" % cursor.mogrify(query, replacements))
            cursor.execute(query, replacements)

            if commit:
                self.commit()

            return cursor.rowcount

    def fetchall(self, query, *args):
        """
        Fetch all rows for a query

        Commits the transaction afterwards.

        :param string query: Query
        :param args: Replacement values (at most one item, passed on to query())
        :return list: The result rows, as a list
        """
        with self.get_cursor() as cursor:
            # query() already logs the statement
            self.query(query, *args, cursor=cursor)

            try:
                result = cursor.fetchall()
            except AttributeError:
                result = []
            except psycopg2.ProgrammingError as e:
                # there seems to be a bug with psycopg2 where it sometimes raises
                # this for empty query results even though it shouldn't. this
                # doesn't seem to indicate an actual problem so we catch the
                # exception and return an empty list
                self.rollback()
                result = []
                self.log.warning("Caught ProgrammingError: %s" % e)

        self.commit()

        return result

    def fetchone(self, query, *args):
        """
        Fetch one result row

        Commits the transaction afterwards.

        :param string query: Query
        :param args: Replacement values (at most one item, passed on to query())
        :return: The row, as a dictionary, or None if there were no rows
        """
        with self.get_cursor() as cursor:
            self.query(query, *args, cursor=cursor)

            try:
                result = cursor.fetchone()
            except psycopg2.ProgrammingError as e:
                # no results to fetch
                self.rollback()
                result = None
                self.log.warning("Caught ProgrammingError: %s" % e)

        self.commit()

        return result

    def fetchall_interruptable(self, queue, query, *args):
        """
        Fetch all rows for a query, allowing for interruption

        Before running the query, a job is queued to cancel the query after a
        set amount of time. The query is expected to complete before this
        timeout. If the backend is interrupted, however, that job will be
        executed immediately, to cancel the database query. If this happens, a
        DatabaseQueryInterruptedException will be raised, but the database
        object will otherwise remain useable.

        Note that in the event that the cancellation job is run, all queries
        for this instance of the database object will be cancelled. However,
        there should never be more than one active query per connection within
        4CAT.

        :param JobQueue queue: A job queue object, required to schedule the
        query cancellation job
        :param str query: SQL query
        :param list args: Replacement variables (at most one item)
        :return list: A list of rows, as dictionaries
        :raises DatabaseQueryInterruptedException: if the query was cancelled
        """
        # schedule a job that will cancel the query we're about to make
        self.interruptable_job = queue.add_job("cancel-pg-query", details={}, remote_id=self.appname, claim_after=time.time() + self.interruptable_timeout)

        cursor = self.get_cursor()
        try:
            self.log.debug("Executing interruptable query: %s" % cursor.mogrify(query, *args))

            # NOTE(review): if the query fails with anything other than a
            # cancellation, the job scheduled above stays queued - confirm
            # whether it should be finished on that path too
            try:
                self.query(query, *args, cursor=cursor)
            except psycopg2.extensions.QueryCanceledError as e:
                # interrupted with cancellation worker (or manually)
                self.log.debug("Query in connection %s was interrupted..." % self.appname)
                self.rollback()
                raise DatabaseQueryInterruptedException("Interrupted while querying database") from e

            # collect results
            try:
                result = cursor.fetchall()
            except AttributeError:
                result = []
            except psycopg2.ProgrammingError as e:
                result = []
                self.log.warning("Caught ProgrammingError: %s" % e)
        finally:
            # close the cursor on every path, not only on cancellation
            cursor.close()

        # clean up cancelling job when we have the data
        self.interruptable_job.finish()
        self.interruptable_job = None

        self.commit()

        return result

    def commit(self):
        """
        Commit the current transaction

        This is required for UPDATE etc to stick.
        """
        self.connection.commit()

    def rollback(self):
        """
        Roll back the current transaction
        """
        self.connection.rollback()

    def close(self):
        """
        Close connection

        Running queries after this is probably a bad idea!
        """
        self.connection.close()

    def get_cursor(self):
        """
        Get a new cursor

        Re-using cursors seems to give issues when using per-thread
        connections, so simply instantiate a new one each time

        :return: Cursor returning rows as dictionaries (RealDictCursor)
        """
        return self.connection.cursor(cursor_factory=psycopg2.extras.RealDictCursor)
Simple database handler
Offers a number of abstraction methods that limit how much SQL one is required to write. Also makes the database connection mostly multithreading proof by instantiating a new cursor for each query (and closing it afterwards)
def __init__(self, logger, dbname=None, user=None, password=None, host=None, port=None, appname=None):
    """
    Open the database connection and prepare a shared dictionary cursor.

    :param logger: Logger instance. When None, the standard `logging`
      module is used instead
    :param dbname: Database name
    :param user: Database username
    :param password: Database password
    :param host: Database server address
    :param port: Database port
    :param appname: App name, mostly useful to trace connections in
      pg_stat_activity; the connection is labelled "4CAT-<appname>", or just
      "4CAT" when no name is given
    """
    # label the connection so it can be told apart in pg_stat_activity
    self.appname = "4CAT-%s" % appname if appname else "4CAT"

    self.connection = psycopg2.connect(
        dbname=dbname,
        user=user,
        password=password,
        host=host,
        port=port,
        application_name=self.appname,
    )
    self.cursor = self.connection.cursor(cursor_factory=psycopg2.extras.RealDictCursor)
    self.log = logging if logger is None else logger

    self.commit()
Set up database connection
Parameters
- logger: Logger instance
- dbname: Database name
- user: Database username
- password: Database password
- host: Database server address
- port: Database port
- appname: App name, mostly useful to trace connections in pg_stat_activity
def query(self, query, replacements=None, cursor=None):
    """
    Execute a query

    Does not commit and does not fetch; pass a cursor to read results.

    :param string query: Query
    :param replacements: Replacement values
    :param cursor: Cursor to use. If omitted, a new cursor is created and
      closed again after the query has run, so no results can be fetched
    :return None:
    """
    created_cursor = not cursor
    if created_cursor:
        cursor = self.get_cursor()

    try:
        # mogrify with the cursor that runs the query, not the shared
        # self.cursor, in line with the cursor-per-query approach
        self.log.debug("Executing query %s" % cursor.mogrify(query, replacements))
        return cursor.execute(query, replacements)
    finally:
        # a cursor created here is unreachable for the caller, so close it
        # rather than leaking it
        if created_cursor:
            cursor.close()
Execute a query
Parameters
- string query: Query
- replacements: Replacement values
- cursor: Cursor to use. If omitted, a new cursor is created via get_cursor()
Returns: None
def execute(self, query, replacements=None):
    """
    Execute a query, and commit afterwards

    This is required for UPDATE/INSERT/DELETE/etc to stick

    :param string query: Query
    :param replacements: Replacement values
    """
    # the with-block closes the cursor even if execute() or commit() raises;
    # logging uses this cursor rather than the shared self.cursor
    with self.get_cursor() as cursor:
        self.log.debug("Executing query %s" % cursor.mogrify(query, replacements))
        cursor.execute(query, replacements)
        self.commit()
Execute a query, and commit afterwards
This is required for UPDATE/INSERT/DELETE/etc to stick
Parameters
- string query: Query
- replacements: Replacement values
def execute_many(self, query, commit=True, replacements=None):
    """
    Execute a query multiple times, each time with different values

    This makes it particularly suitable for INSERT queries, but other types
    of query using VALUES are possible too.

    :param string query: Query, with a single %s placeholder that psycopg2's
      execute_values() replaces with the VALUES list
    :param replacements: A list of replacement values, one item per row
    :param commit: Commit transaction after query?
    """
    # the with-block closes the cursor even if execute_values() raises
    with self.get_cursor() as cursor:
        execute_values(cursor, query, replacements)

    if commit:
        self.commit()
Execute a query multiple times, each time with different values
This makes it particularly suitable for INSERT queries, but other types of query using VALUES are possible too.
Parameters
- string query: Query
- replacements: A list of replacement values
- commit: Commit transaction after query?
def update(self, table, data, where=None, commit=True):
    """
    Update a database record

    :param string table: Table to update
    :param dict where: Simple conditions, parsed as "column1 = value1 AND column2 = value2" etc
    :param dict data: Data to set, Column => Value
    :param bool commit: Whether to commit after executing the query

    :return int: Number of affected rows. Note that this may be unreliable if `commit` is `False`
    """
    if where is None:
        where = {}

    # {} placeholders are filled with escaped identifiers, %s with values
    identifiers = [sql.Identifier(table)] + [sql.Identifier(column) for column in data]
    replacements = list(data.values())

    query = "UPDATE {} SET " + ", ".join(["{} = %s"] * len(data))
    if where:
        query += " WHERE " + " AND ".join(["{} = %s"] * len(where))
        identifiers.extend(sql.Identifier(column) for column in where)
        replacements.extend(where.values())

    query = sql.SQL(query).format(*identifiers)

    # the with-block closes the cursor even if execute() or commit() raises
    with self.get_cursor() as cursor:
        self.log.debug("Executing query: %s" % cursor.mogrify(query, replacements))
        cursor.execute(query, replacements)

        if commit:
            self.commit()

        return cursor.rowcount
Update a database record
Parameters
- string table: Table to update
- dict where: Simple conditions, parsed as "column1 = value1 AND column2 = value2" etc
- dict data: Data to set, Column => Value
- bool commit: Whether to commit after executing the query
Returns
Number of affected rows. Note that this may be unreliable if `commit` is `False`.
def delete(self, table, where, commit=True):
    """
    Delete a database record

    :param string table: Table to delete from
    :param dict where: Simple conditions, parsed as "column1 = value1 AND
    column2 = value2" etc. A set, tuple or list value is matched with
    "column IN (...)"; an empty one matches no rows. `where` itself must
    not be empty: without conditions the generated query is invalid SQL
    and fails.
    :param bool commit: Whether to commit after executing the query

    :return int: Number of affected rows. Note that this may be unreliable
    if `commit` is `False`
    """
    # identifiers must line up with the "{}" placeholders, so they are
    # collected in the same loop that builds the conditions
    identifiers = [sql.Identifier(table)]
    where_sql = []
    replacements = []
    for column, value in where.items():
        if type(value) in (set, tuple, list):
            if not value:
                # SQL does not accept an empty "IN ()" list; nothing can
                # match an empty collection, so use a never-true condition
                where_sql.append("FALSE")
                continue
            where_sql.append("{} IN %s")
            replacements.append(tuple(value))
        else:
            where_sql.append("{} = %s")
            replacements.append(value)
        identifiers.append(sql.Identifier(column))

    query = sql.SQL("DELETE FROM {} WHERE " + " AND ".join(where_sql)).format(*identifiers)

    cursor = self.get_cursor()
    try:
        self.log.debug("Executing query: %s" % cursor.mogrify(query, replacements))
        cursor.execute(query, replacements)

        if commit:
            self.commit()

        return cursor.rowcount
    finally:
        # close the cursor even if the query fails
        cursor.close()
Delete a database record
Parameters
- string table: Table to delete from
- dict where: Simple conditions, parsed as "column1 = value1 AND column2 = value2" etc
- bool commit: Whether to commit after executing the query
Returns
Number of affected rows. Note that this may be unreliable if `commit` is `False`.
def insert(self, table, data, commit=True, safe=False, constraints=None, return_field=""):
    """
    Create database record

    :param string table: Table to insert record into
    :param dict data: Data to insert, Column => Value
    :param bool commit: Whether to commit after executing the query
    :param bool safe: If set to `True`, "ON CONFLICT DO NOTHING" is added to
    the insert query, so it does not insert the row and no error is thrown
    when the insert violates a unique index or other constraint
    :param tuple constraints: If `safe` is `True`, this tuple may contain the
    columns that should be used as a constraint, e.g. ON CONFLICT (name,
    lastname) DO NOTHING
    :param str return_field: If not empty or None, this makes the method
    return this field of the inserted row, instead of the number of affected
    rows, with `RETURNING`. If no row was inserted (because `safe` is `True`
    and a conflict occurred), `None` is returned.

    :return: Number of affected rows (note that this may be unreliable if
    `commit` is `False`), or the value of `return_field` for the inserted
    row if `return_field` was given
    """
    if constraints is None:
        constraints = []

    # escape identifiers: table first, then the columns in `data` order
    identifiers = [sql.Identifier(table)]
    identifiers.extend(sql.Identifier(column) for column in data)

    # construct the ON CONFLICT ... DO NOTHING bit of the query
    if safe:
        safe_bit = " ON CONFLICT "
        if constraints:
            safe_bit += "(" + ", ".join("{}" for _ in constraints) + ")"
            identifiers.extend(sql.Identifier(column) for column in constraints)
        safe_bit += " DO NOTHING"
    else:
        safe_bit = ""

    # the row's values are passed as one tuple parameter for "VALUES %s";
    # "%%s" survives the column-list interpolation as a literal "%s"
    protoquery = "INSERT INTO {} (%s) VALUES %%s" % ", ".join(["{}"] * len(data)) + safe_bit

    if return_field:
        protoquery += " RETURNING {}"
        identifiers.append(sql.Identifier(return_field))

    query = sql.SQL(protoquery).format(*identifiers)
    replacements = (tuple(data.values()),)

    cursor = self.get_cursor()
    try:
        self.log.debug("Executing query: %s" % cursor.mogrify(query, replacements))
        cursor.execute(query, replacements)

        if commit:
            self.commit()

        if not return_field:
            return cursor.rowcount

        # with ON CONFLICT DO NOTHING, a skipped insert returns no row at all
        row = cursor.fetchone()
        return row[return_field] if row is not None else None
    finally:
        # close the cursor even if the query fails
        cursor.close()
Create database record
Parameters
- string table: Table to insert record into
- dict data: Data to insert
- bool commit: Whether to commit after executing the query
- bool safe: If set to `True`, "ON CONFLICT DO NOTHING" is added to the insert query, so it does not insert the row and no error is thrown when the insert violates a unique index or other constraint.
- tuple constraints: If `safe` is `True`, this tuple may contain the columns that should be used as a constraint, e.g. ON CONFLICT (name, lastname) DO NOTHING.
- str return_field: If not empty or None, this makes the method return this field of the inserted row, instead of the number of affected rows, with `RETURNING`.
Returns
Number of affected rows. Note that this may be unreliable if `commit` is `False`.
def upsert(self, table, data, commit=True, constraints=None):
    """
    Create or update database record

    If the record could not be inserted because of a constraint, the
    constraining record is updated instead: every column in `data` is set
    to the value that was attempted to be inserted (`EXCLUDED.column`).

    :param string table: Table to upsert record into
    :param dict data: Data to upsert, Column => Value
    :param bool commit: Whether to commit after executing the query
    :param tuple constraints: This tuple may contain the columns that should
    be used as a constraint, e.g. ON CONFLICT (name, lastname) DO UPDATE.
    NOTE: PostgreSQL requires a conflict target for ON CONFLICT DO UPDATE,
    so leaving this empty will make the query fail.
    :return int: Number of affected rows. Note that this may be unreliable
    if `commit` is `False`
    """
    if constraints is None:
        constraints = []

    columns = list(data)

    # escape identifiers: table first, then the inserted columns; the order
    # must match the "{}" placeholders added below
    identifiers = [sql.Identifier(table)]
    identifiers.extend(sql.Identifier(column) for column in columns)

    # "%%s" survives the column-list interpolation as a literal "%s"
    protoquery = "INSERT INTO {} (%s) VALUES %%s" % ", ".join(["{}"] * len(columns))
    protoquery += " ON CONFLICT"

    if constraints:
        protoquery += " (" + ", ".join("{}" for _ in constraints) + ")"
        identifiers.extend(sql.Identifier(column) for column in constraints)

    # the SET clause needs escaped identifiers as well, so reserved words
    # and mixed-case column names work and column names are never pasted
    # into the SQL verbatim
    protoquery += " DO UPDATE SET "
    protoquery += ", ".join(["{} = EXCLUDED.{}"] * len(columns))
    for column in columns:
        identifiers.extend((sql.Identifier(column), sql.Identifier(column)))

    query = sql.SQL(protoquery).format(*identifiers)
    replacements = (tuple(data.values()),)

    cursor = self.get_cursor()
    try:
        self.log.debug("Executing query: %s" % cursor.mogrify(query, replacements))
        cursor.execute(query, replacements)

        if commit:
            self.commit()

        return cursor.rowcount
    finally:
        # close the cursor even if the query fails
        cursor.close()
Create or update database record
If the record could not be inserted because of a constraint, the constraining record is updated instead.
Parameters
- string table: Table to upsert record into
- dict data: Data to upsert
- bool commit: Whether to commit after executing the query
- tuple constraints: This tuple may contain the columns that should be used as a constraint, e.g. ON CONFLICT (name, lastname) DO UPDATE
Returns
Number of affected rows. Note that this may be unreliable if `commit` is `False`.
def fetchall(self, query, *args):
    """
    Fetch all rows for a query

    The transaction is committed after the rows have been fetched.

    :param string query: Query
    :param args: Replacement values, passed on to `query()`
    :return list: The result rows, as a list (empty if there were none)
    """
    cursor = self.get_cursor()
    try:
        # query() already logs the mogrified query, so it is not logged here
        self.query(query, cursor=cursor, *args)

        try:
            result = cursor.fetchall()
        except AttributeError:
            result = []
        except psycopg2.ProgrammingError as e:
            # there seems to be a bug with psycopg2 where it sometimes raises
            # this for empty query results even though it shouldn't. this
            # doesn't seem to indicate an actual problem so we catch the
            # exception and return an empty list
            self.rollback()
            result = []
            self.log.warning("Caught ProgrammingError: %s" % e)
    finally:
        # close the cursor even if the query itself fails
        cursor.close()

    self.commit()

    return result
Fetch all rows for a query
Parameters
- string query: Query
- args: Replacement values
- (there is no `commit` parameter: the transaction is always committed after the rows have been fetched)
Returns
The result rows, as a list
def fetchone(self, query, *args):
    """
    Fetch one result row

    The transaction is committed after the row has been fetched.

    :param string query: Query
    :param args: Replacement values, passed on to `query()`
    :return: The row, as a dictionary, or None if there were no rows
    """
    cursor = self.get_cursor()
    try:
        self.query(query, cursor=cursor, *args)

        try:
            result = cursor.fetchone()
        except psycopg2.ProgrammingError as e:
            # no results to fetch
            self.rollback()
            result = None
            self.log.warning("Caught ProgrammingError: %s" % e)
    finally:
        # close the cursor even if the query itself fails
        cursor.close()

    self.commit()

    return result
Fetch one result row
Parameters
- string query: Query
- args: Replacement values
- (there is no `commit` parameter: the transaction is always committed after the row has been fetched)
Returns
The row, as a dictionary, or None if there were no rows
def fetchall_interruptable(self, queue, query, *args):
    """
    Fetch all rows for a query, allowing for interruption

    Before running the query, a job is queued to cancel the query after a
    set amount of time. The query is expected to complete before this
    timeout. If the backend is interrupted, however, that job will be
    executed immediately, to cancel the database query. If this happens, a
    DatabaseQueryInterruptedException will be raised, but the database
    object will otherwise remain usable.

    Note that in the event that the cancellation job is run, all queries
    for this instance of the database object will be cancelled. However,
    there should never be more than one active query per connection within
    4CAT.

    The transaction is committed after the rows have been fetched.

    :param JobQueue queue: A job queue object, required to schedule the
    query cancellation job
    :param str query: SQL query
    :param list args: Replacement variables, passed on to `query()`
    :return list: A list of rows, as dictionaries
    :raises DatabaseQueryInterruptedException: if the query was cancelled
    """
    # schedule a job that will cancel the query we're about to make
    self.interruptable_job = queue.add_job("cancel-pg-query", details={}, remote_id=self.appname,
                                           claim_after=time.time() + self.interruptable_timeout)

    # make the query
    cursor = self.get_cursor()
    self.log.debug("Executing interruptable query: %s" % cursor.mogrify(query, *args))

    try:
        self.query(query, cursor=cursor, *args)
    except psycopg2.extensions.QueryCanceledError:
        # interrupted with cancellation worker (or manually)
        self.log.debug("Query in connection %s was interrupted..." % self.appname)
        self.rollback()
        cursor.close()
        raise DatabaseQueryInterruptedException("Interrupted while querying database")
    except Exception:
        # any other failure: the cancellation job is no longer needed; left
        # queued, it could later cancel an unrelated query on this connection
        self.interruptable_job.finish()
        self.interruptable_job = None
        cursor.close()
        raise

    # collect results
    try:
        result = cursor.fetchall()
    except AttributeError:
        result = []
    except psycopg2.ProgrammingError as e:
        result = []
        self.log.warning("Caught ProgrammingError: %s" % e)

    # clean up cancelling job when we have the data
    self.interruptable_job.finish()
    self.interruptable_job = None

    cursor.close()
    self.commit()

    return result
Fetch all rows for a query, allowing for interruption
Before running the query, a job is queued to cancel the query after a set amount of time. The query is expected to complete before this timeout. If the backend is interrupted, however, that job will be executed immediately, to cancel the database query. If this happens, a DatabaseQueryInterruptedException will be raised, but the database object will otherwise remain usable.
Note that in the event that the cancellation job is run, all queries for this instance of the database object will be cancelled. However, there should never be more than one active query per connection within 4CAT.
Parameters
- JobQueue queue: A job queue object, required to schedule the query cancellation job
- str query: SQL query
- list args: Replacement variables
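- (there is no `commit` parameter: the transaction is committed after the rows have been fetched, or rolled back if the query is interrupted)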
Returns
A list of rows, as dictionaries
def commit(self):
    """
    Commit the transaction that is currently open on this object's connection

    Changes made by UPDATE, INSERT, DELETE and similar statements only
    persist once this has been called.
    """
    connection = self.connection
    connection.commit()
Commit the current transaction
This is required for UPDATE etc to stick.
def rollback(self):
    """
    Abort the transaction that is currently open on this object's connection
    """
    connection = self.connection
    connection.rollback()
Roll back the current transaction
def close(self):
    """
    Close this object's database connection

    Queries made through this object afterwards will most likely fail, so
    only call this when the object is no longer needed.
    """
    connection = self.connection
    connection.close()
Close connection
Running queries after this is probably a bad idea!
def get_cursor(self):
    """
    Create and return a brand-new cursor on this object's connection

    Cursors are deliberately not re-used: sharing one caused issues with
    per-thread connections. Every cursor is created with the
    RealDictCursor factory, so rows come back as dictionaries.

    :return: Cursor
    """
    factory = psycopg2.extras.RealDictCursor
    return self.connection.cursor(cursor_factory=factory)
Get a new cursor
Re-using cursors seems to give issues when using per-thread connections, so simply instantiate a new one each time
Returns
Cursor