common.lib.job
Class that represents a job in the job queue
"""
Class that represents a job in the job queue
"""

import time
import json
import math
from common.lib.exceptions import JobClaimedException, JobNotFoundException


class Job:
    """
    Job in queue

    Wraps one record of the `jobs` table and offers methods to claim,
    release and finish that record through the given database handler.
    """
    # job record; every instance gets its own value in __init__
    data = {}
    # database handler used for all queries
    db = None

    is_finished = False
    is_claimed = False

    def __init__(self, data, database=None):
        """
        Instantiate Job object

        The `remote_id` value in `data` is converted to a string in place.

        :param dict data:  Job data, should correspond to a database record
        :param database:  Database handler
        :raises KeyError:  If `data` has no `remote_id`
        :raises Exception:  If `data` has no `timestamp_claimed`
        """
        self.data = data
        self.db = database

        self.data["remote_id"] = str(self.data["remote_id"])

        try:
            # coerce to real booleans; a bare `a and b` would leave None or
            # 0 in these flags when the first operand is falsy
            self.is_finished = bool("is_finished" in self.data and self.data["is_finished"])
            claimed_at = self.data["timestamp_claimed"]
            self.is_claimed = bool(claimed_at) and claimed_at > 0
        except KeyError as e:
            raise Exception(f"Job data is missing required field {e}") from e

    @staticmethod
    def get_by_ID(id, database):
        """
        Instantiate job object by ID

        :param int id:  Job ID
        :param database:  Database handler
        :return Job:  Job object
        :raises JobNotFoundException:  If no job with this ID exists
        """
        data = database.fetchone("SELECT * FROM jobs WHERE id = %s", (id,))
        if not data:
            raise JobNotFoundException

        return Job.get_by_data(data, database)

    @staticmethod
    def get_by_data(data, database):
        """
        Instantiate job object with given data

        :param dict data:  Job data, should correspond to a database row
        :param database:  Database handler
        :return Job:  Job object
        """
        return Job(data, database)

    @staticmethod
    def get_by_remote_ID(remote_id, database, jobtype="*"):
        """
        Instantiate job object by combination of remote ID and job type

        This combination is guaranteed to be unique. If `jobtype` is `*`,
        the job is looked up by remote ID only.

        :param database:  Database handler
        :param str jobtype:  Job type
        :param str remote_id:  Job remote ID
        :return Job:  Job object
        :raises JobNotFoundException:  If no matching job exists
        """
        if jobtype != "*":
            data = database.fetchone("SELECT * FROM jobs WHERE jobtype = %s AND remote_id = %s", (jobtype, remote_id))
        else:
            data = database.fetchone("SELECT * FROM jobs WHERE remote_id = %s", (remote_id,))

        if not data:
            raise JobNotFoundException

        return Job.get_by_data(data, database=database)

    def claim(self):
        """
        Claim a job

        This marks it in the database so it cannot be claimed again.

        :raises JobClaimedException:  If no unclaimed record was updated
        """
        if self.data["interval"] == 0:
            claim_time = int(time.time())
        else:
            # the claim time should be a multiple of the interval to prevent
            # drift of the interval over time. this ensures that on average,
            # the interval remains as set
            claim_time = math.floor(int(time.time()) / self.data["interval"]) * self.data["interval"]

        updated = self.db.update("jobs", data={"timestamp_claimed": claim_time, "timestamp_lastclaimed": claim_time},
                                 where={"jobtype": self.data["jobtype"], "remote_id": self.data["remote_id"],
                                        "timestamp_claimed": 0})

        if updated == 0:
            raise JobClaimedException

        self.data["timestamp_claimed"] = claim_time
        self.data["timestamp_lastclaimed"] = claim_time

        self.is_claimed = True

    def finish(self, delete=False):
        """
        Finish job

        This deletes it from the database, or in the case of recurring jobs,
        resets the claim flags.

        :param bool delete:  Whether to force deleting the job even if it is a
        job with an interval.
        """
        if self.data["interval"] == 0 or delete:
            self.db.delete("jobs", where={"jobtype": self.data["jobtype"], "remote_id": self.data["remote_id"]})
        else:
            self.db.update("jobs", data={"timestamp_claimed": 0, "attempts": 0},
                           where={"jobtype": self.data["jobtype"], "remote_id": self.data["remote_id"]})

        self.is_finished = True

    def release(self, delay=0, claim_after=0):
        """
        Release a job so it may be claimed again

        :param int delay: Delay in seconds after which job may be reclaimed.
        :param int claim_after:  Timestamp after which job may be claimed. This
        is overridden by `delay`. Pass `None` to leave the stored value as is.
        """
        update = {"timestamp_claimed": 0, "attempts": self.data["attempts"] + 1}
        if delay > 0:
            update["timestamp_after"] = int(time.time()) + delay
        elif claim_after is not None:
            update["timestamp_after"] = claim_after

        self.db.update("jobs", data=update,
                       where={"jobtype": self.data["jobtype"], "remote_id": self.data["remote_id"]})
        self.is_claimed = False

    def is_claimable(self):
        """
        Can this job be claimed?

        :return bool:  If the job is not claimed yet and also isn't finished.
        """
        return not self.is_claimed and not self.is_finished

    def get_place_in_queue(self):
        """
        Get the place of this job in the queue

        Counts the jobs of the same type that come before this one: jobs
        without a start time that were queued earlier, and jobs whose start
        time is earlier.

        :return int: Place in queue
        """
        # a job that can be claimed immediately is ranked by the moment it
        # was queued, a job that has to wait by the moment it may start
        if self.data["timestamp_after"] == 0:
            reference = self.data["timestamp"]
        else:
            reference = self.data["timestamp_after"]

        query = (
            "SELECT COUNT(*) as queue_ahead FROM jobs WHERE jobtype = %s"
            " AND (timestamp_after = 0 AND timestamp < %s OR "
            " timestamp_after > 0 AND timestamp_after < %s) "
        )
        queue_result = self.db.fetchone(query, [self.data["jobtype"], reference, reference])
        if queue_result["queue_ahead"] is None:
            raise Exception(f"what? {queue_result}")

        return queue_result["queue_ahead"]

    @property
    def details(self):
        """
        Job details, decoded from the JSON in the `details` field

        :return:  The decoded value, or an empty dict if it is empty or the
        field cannot be decoded as JSON.
        """
        try:
            details = json.loads(self.data["details"])
        except (TypeError, json.JSONDecodeError):
            return {}

        return details if details else {}
class
Job:
class Job:
    """
    Job in queue

    Wraps one record of the `jobs` table and offers methods to claim,
    release and finish that record through the given database handler.
    """
    # job record; every instance gets its own value in __init__
    data = {}
    # database handler used for all queries
    db = None

    is_finished = False
    is_claimed = False

    def __init__(self, data, database=None):
        """
        Instantiate Job object

        The `remote_id` value in `data` is converted to a string in place.

        :param dict data:  Job data, should correspond to a database record
        :param database:  Database handler
        :raises KeyError:  If `data` has no `remote_id`
        :raises Exception:  If `data` has no `timestamp_claimed`
        """
        self.data = data
        self.db = database

        self.data["remote_id"] = str(self.data["remote_id"])

        try:
            # coerce to real booleans; a bare `a and b` would leave None or
            # 0 in these flags when the first operand is falsy
            self.is_finished = bool("is_finished" in self.data and self.data["is_finished"])
            claimed_at = self.data["timestamp_claimed"]
            self.is_claimed = bool(claimed_at) and claimed_at > 0
        except KeyError as e:
            raise Exception(f"Job data is missing required field {e}") from e

    @staticmethod
    def get_by_ID(id, database):
        """
        Instantiate job object by ID

        :param int id:  Job ID
        :param database:  Database handler
        :return Job:  Job object
        :raises JobNotFoundException:  If no job with this ID exists
        """
        data = database.fetchone("SELECT * FROM jobs WHERE id = %s", (id,))
        if not data:
            raise JobNotFoundException

        return Job.get_by_data(data, database)

    @staticmethod
    def get_by_data(data, database):
        """
        Instantiate job object with given data

        :param dict data:  Job data, should correspond to a database row
        :param database:  Database handler
        :return Job:  Job object
        """
        return Job(data, database)

    @staticmethod
    def get_by_remote_ID(remote_id, database, jobtype="*"):
        """
        Instantiate job object by combination of remote ID and job type

        This combination is guaranteed to be unique. If `jobtype` is `*`,
        the job is looked up by remote ID only.

        :param database:  Database handler
        :param str jobtype:  Job type
        :param str remote_id:  Job remote ID
        :return Job:  Job object
        :raises JobNotFoundException:  If no matching job exists
        """
        if jobtype != "*":
            data = database.fetchone("SELECT * FROM jobs WHERE jobtype = %s AND remote_id = %s", (jobtype, remote_id))
        else:
            data = database.fetchone("SELECT * FROM jobs WHERE remote_id = %s", (remote_id,))

        if not data:
            raise JobNotFoundException

        return Job.get_by_data(data, database=database)

    def claim(self):
        """
        Claim a job

        This marks it in the database so it cannot be claimed again.

        :raises JobClaimedException:  If no unclaimed record was updated
        """
        if self.data["interval"] == 0:
            claim_time = int(time.time())
        else:
            # the claim time should be a multiple of the interval to prevent
            # drift of the interval over time. this ensures that on average,
            # the interval remains as set
            claim_time = math.floor(int(time.time()) / self.data["interval"]) * self.data["interval"]

        updated = self.db.update("jobs", data={"timestamp_claimed": claim_time, "timestamp_lastclaimed": claim_time},
                                 where={"jobtype": self.data["jobtype"], "remote_id": self.data["remote_id"],
                                        "timestamp_claimed": 0})

        if updated == 0:
            raise JobClaimedException

        self.data["timestamp_claimed"] = claim_time
        self.data["timestamp_lastclaimed"] = claim_time

        self.is_claimed = True

    def finish(self, delete=False):
        """
        Finish job

        This deletes it from the database, or in the case of recurring jobs,
        resets the claim flags.

        :param bool delete:  Whether to force deleting the job even if it is a
        job with an interval.
        """
        if self.data["interval"] == 0 or delete:
            self.db.delete("jobs", where={"jobtype": self.data["jobtype"], "remote_id": self.data["remote_id"]})
        else:
            self.db.update("jobs", data={"timestamp_claimed": 0, "attempts": 0},
                           where={"jobtype": self.data["jobtype"], "remote_id": self.data["remote_id"]})

        self.is_finished = True

    def release(self, delay=0, claim_after=0):
        """
        Release a job so it may be claimed again

        :param int delay: Delay in seconds after which job may be reclaimed.
        :param int claim_after:  Timestamp after which job may be claimed. This
        is overridden by `delay`. Pass `None` to leave the stored value as is.
        """
        update = {"timestamp_claimed": 0, "attempts": self.data["attempts"] + 1}
        if delay > 0:
            update["timestamp_after"] = int(time.time()) + delay
        elif claim_after is not None:
            update["timestamp_after"] = claim_after

        self.db.update("jobs", data=update,
                       where={"jobtype": self.data["jobtype"], "remote_id": self.data["remote_id"]})
        self.is_claimed = False

    def is_claimable(self):
        """
        Can this job be claimed?

        :return bool:  If the job is not claimed yet and also isn't finished.
        """
        return not self.is_claimed and not self.is_finished

    def get_place_in_queue(self):
        """
        Get the place of this job in the queue

        Counts the jobs of the same type that come before this one: jobs
        without a start time that were queued earlier, and jobs whose start
        time is earlier.

        :return int: Place in queue
        """
        # a job that can be claimed immediately is ranked by the moment it
        # was queued, a job that has to wait by the moment it may start
        if self.data["timestamp_after"] == 0:
            reference = self.data["timestamp"]
        else:
            reference = self.data["timestamp_after"]

        query = (
            "SELECT COUNT(*) as queue_ahead FROM jobs WHERE jobtype = %s"
            " AND (timestamp_after = 0 AND timestamp < %s OR "
            " timestamp_after > 0 AND timestamp_after < %s) "
        )
        queue_result = self.db.fetchone(query, [self.data["jobtype"], reference, reference])
        if queue_result["queue_ahead"] is None:
            raise Exception(f"what? {queue_result}")

        return queue_result["queue_ahead"]

    @property
    def details(self):
        """
        Job details, decoded from the JSON in the `details` field

        :return:  The decoded value, or an empty dict if it is empty or the
        field cannot be decoded as JSON.
        """
        try:
            details = json.loads(self.data["details"])
        except (TypeError, json.JSONDecodeError):
            return {}

        return details if details else {}
Job in queue
Job(data, database=None)
21 def __init__(self, data, database=None): 22 """ 23 Instantiate Job object 24 25 :param dict data: Job data, should correspond to a database record 26 :param database: Database handler 27 """ 28 self.data = data 29 self.db = database 30 31 self.data["remote_id"] = str(self.data["remote_id"]) 32 33 try: 34 self.is_finished = "is_finished" in self.data and self.data["is_finished"] 35 self.is_claimed = self.data["timestamp_claimed"] and self.data["timestamp_claimed"] > 0 36 except KeyError: 37 raise Exception
Instantiate Job object
Parameters
- dict data: Job data, should correspond to a database record
- database: Database handler
def
get_by_ID(id, database):
39 def get_by_ID(id, database): 40 """ 41 Instantiate job object by ID 42 43 :param int id: Job ID 44 :param database: Database handler 45 :return Job: Job object 46 """ 47 data = database.fetchone("SELECT * FROM jobs WHERE id = %s", (id,)) 48 if not data: 49 raise JobNotFoundException 50 51 return Job.get_by_data(data, database)
Instantiate job object by ID
Parameters
- int id: Job ID
- database: Database handler
Returns
Job object
def
get_by_data(data, database):
53 def get_by_data(data, database): 54 """ 55 Instantiate job object with given data 56 57 :param dict data: Job data, should correspond to a database row 58 :param database: Database handler 59 :return Job: Job object 60 """ 61 return Job(data, database)
Instantiate job object with given data
Parameters
- dict data: Job data, should correspond to a database row
- database: Database handler
Returns
Job object
def
get_by_remote_ID(remote_id, database, jobtype='*'):
63 def get_by_remote_ID(remote_id, database, jobtype="*"): 64 """ 65 Instantiate job object by combination of remote ID and job type 66 67 This combination is guaranteed to be unique. 68 69 :param database: Database handler 70 :param str jobtype: Job type 71 :param str remote_id: Job remote ID 72 :return Job: Job object 73 """ 74 if jobtype != "*": 75 data = database.fetchone("SELECT * FROM jobs WHERE jobtype = %s AND remote_id = %s", (jobtype, remote_id)) 76 else: 77 data = database.fetchone("SELECT * FROM jobs WHERE remote_id = %s", (remote_id,)) 78 79 if not data: 80 raise JobNotFoundException 81 82 return Job.get_by_data(data, database=database)
Instantiate job object by combination of remote ID and job type
This combination is guaranteed to be unique.
Parameters
- database: Database handler
- str jobtype: Job type
- str remote_id: Job remote ID
Returns
Job object
def
claim(self):
def claim(self):
    """
    Claim a job

    Writes the claim time to the job's database record, but only if that
    record is still unclaimed, so a job cannot be claimed twice.

    :raises JobClaimedException:  If no unclaimed record was updated
    """
    interval = self.data["interval"]
    now = int(time.time())

    # recurring jobs are claimed on a multiple of their interval, so the
    # interval does not drift over time and is, on average, kept as set
    claim_time = now if interval == 0 else math.floor(now / interval) * interval

    stamps = {"timestamp_claimed": claim_time, "timestamp_lastclaimed": claim_time}
    unclaimed_record = {
        "jobtype": self.data["jobtype"],
        "remote_id": self.data["remote_id"],
        "timestamp_claimed": 0,
    }
    if self.db.update("jobs", data=stamps, where=unclaimed_record) == 0:
        raise JobClaimedException

    # mirror the new database state in the local copy of the record
    for field in ("timestamp_claimed", "timestamp_lastclaimed"):
        self.data[field] = claim_time

    self.is_claimed = True
Claim a job
This marks it in the database so it cannot be claimed again.
def
finish(self, delete=False):
def finish(self, delete=False):
    """
    Finish job

    A job without an interval is deleted from the database. A recurring
    job has its claim timestamp and attempt counter reset instead, unless
    deletion is forced.

    :param bool delete:  Whether to force deleting the job even if it is a
    job with an interval.
    """
    remove = self.data["interval"] == 0 or delete
    record = {"jobtype": self.data["jobtype"], "remote_id": self.data["remote_id"]}

    if remove:
        self.db.delete("jobs", where=record)
    else:
        self.db.update("jobs", data={"timestamp_claimed": 0, "attempts": 0}, where=record)

    self.is_finished = True
Finish job
This deletes it from the database, or in the case of recurring jobs, resets the claim flags.
Parameters
- bool delete: Whether to force deleting the job even if it is a job with an interval.
def
release(self, delay=0, claim_after=0):
def release(self, delay=0, claim_after=0):
    """
    Release a job so it may be claimed again

    Resets the claim timestamp in the database and raises the attempt
    counter by one.

    :param int delay: Delay in seconds after which job may be reclaimed.
    :param int claim_after:  Timestamp after which job may be claimed. This
    is overridden by `delay`. With `None`, the stored value is not changed.
    """
    changes = {"timestamp_claimed": 0, "attempts": self.data["attempts"] + 1}

    # a positive delay takes precedence over an explicit timestamp
    if delay > 0:
        available_from = int(time.time()) + delay
    else:
        available_from = claim_after

    if available_from is not None:
        changes["timestamp_after"] = available_from

    record = {"jobtype": self.data["jobtype"], "remote_id": self.data["remote_id"]}
    self.db.update("jobs", data=changes, where=record)
    self.is_claimed = False
Release a job so it may be claimed again
Parameters
- int delay: Delay in seconds after which job may be reclaimed.
- int claim_after: Timestamp after which job may be claimed. This is overridden by `delay`.
def
is_claimable(self):
def is_claimable(self):
    """
    Can this job be claimed?

    :return bool:  `True` only if the job is neither claimed nor finished.
    """
    return not (self.is_claimed or self.is_finished)
Can this job be claimed?
Returns
If the job is not claimed yet and also isn't finished.
def
get_place_in_queue(self):
def get_place_in_queue(self):
    """
    Get the place of this job in the queue

    Counts the jobs of the same type that come before this one: jobs
    without a start time that were queued earlier, and jobs whose start
    time is earlier.

    :return int: Place in queue
    """
    jobtype = self.data["jobtype"]
    start_after = self.data["timestamp_after"]

    # a job that can be claimed immediately is ranked by the moment it was
    # queued, a job that has to wait by the moment it may start; the query
    # itself is the same in both cases
    reference = self.data["timestamp"] if start_after == 0 else start_after

    query = (
        "SELECT COUNT(*) as queue_ahead FROM jobs WHERE jobtype = %s"
        " AND (timestamp_after = 0 AND timestamp < %s OR "  # claimable right away, queued earlier
        " timestamp_after > 0 AND timestamp_after < %s) "  # waiting, but with an earlier start time
    )
    row = self.db.fetchone(query, [jobtype, reference, reference])

    ahead = row["queue_ahead"]
    if ahead is None:
        raise Exception(f"what? {row}")

    return ahead
Get the place of this job in the queue
Returns
Place in queue