common.lib.job
Class that represents a job in the job queue
1""" 2Class that represents a job in the job queue 3""" 4 5import time 6import json 7import math 8from common.lib.exceptions import JobClaimedException, JobNotFoundException 9 10 11class Job: 12 """ 13 Job in queue 14 """ 15 data = {} 16 db = None 17 18 is_finished = False 19 is_claimed = False 20 21 def __init__(self, data, database=None): 22 """ 23 Instantiate Job object 24 25 :param dict data: Job data, should correspond to a database record 26 :param database: Database handler 27 """ 28 self.data = data 29 self.db = database 30 31 self.data["remote_id"] = str(self.data["remote_id"]) 32 33 try: 34 self.is_finished = "is_finished" in self.data and self.data["is_finished"] 35 self.is_claimed = self.data["timestamp_claimed"] and self.data["timestamp_claimed"] > 0 36 except KeyError: 37 raise Exception 38 39 def get_by_ID(id, database): 40 """ 41 Instantiate job object by ID 42 43 :param int id: Job ID 44 :param database: Database handler 45 :return Job: Job object 46 """ 47 data = database.fetchone("SELECT * FROM jobs WHERE id = %s", (id,)) 48 if not data: 49 raise JobNotFoundException 50 51 return Job.get_by_data(data, database) 52 53 def get_by_data(data, database): 54 """ 55 Instantiate job object with given data 56 57 :param dict data: Job data, should correspond to a database row 58 :param database: Database handler 59 :return Job: Job object 60 """ 61 return Job(data, database) 62 63 def get_by_remote_ID(remote_id, database, jobtype="*"): 64 """ 65 Instantiate job object by combination of remote ID and job type 66 67 This combination is guaranteed to be unique. 68 69 :param database: Database handler 70 :param str jobtype: Job type 71 :param str remote_id: Job remote ID 72 :return Job: Job object 73 """ 74 if jobtype != "*": 75 data = database.fetchone("SELECT * FROM jobs WHERE jobtype = %s AND remote_id = %s", (jobtype, remote_id)) 76 else: 77 data = database.fetchone("SELECT * FROM jobs WHERE remote_id = %s", (remote_id,)) 78 79 if not data: 80 raise JobNotFoundException 81 82 return Job.get_by_data(data, database=database) 83 84 def claim(self): 85 """ 86 Claim a job 87 88 This marks it in the database so it cannot be claimed again. 89 """ 90 if self.data["interval"] == 0: 91 claim_time = int(time.time()) 92 else: 93 # the claim time should be a multiple of the interval to prevent 94 # drift of the interval over time. this ensures that on average, 95 # the interval remains as set 96 claim_time = math.floor(int(time.time()) / self.data["interval"]) * self.data["interval"] 97 98 updated = self.db.update("jobs", data={"timestamp_claimed": claim_time, "timestamp_lastclaimed": claim_time}, 99 where={"jobtype": self.data["jobtype"], "remote_id": self.data["remote_id"], 100 "timestamp_claimed": 0}) 101 102 if updated == 0: 103 raise JobClaimedException 104 105 self.data["timestamp_claimed"] = claim_time 106 self.data["timestamp_lastclaimed"] = claim_time 107 108 self.is_claimed = True 109 110 def finish(self, delete=False): 111 """ 112 Finish job 113 114 This deletes it from the database, or in the case of recurring jobs, 115 resets the claim flags. 116 117 :param bool delete: Whether to force deleting the job even if it is a 118 job with an interval. 119 """ 120 if self.data["interval"] == 0 or delete: 121 self.db.delete("jobs", where={"jobtype": self.data["jobtype"], "remote_id": self.data["remote_id"]}) 122 else: 123 self.db.update("jobs", data={"timestamp_claimed": 0, "attempts": 0}, 124 where={"jobtype": self.data["jobtype"], "remote_id": self.data["remote_id"]}) 125 126 self.is_finished = True 127 128 def release(self, delay=0, claim_after=0): 129 """ 130 Release a job so it may be claimed again 131 132 :param int delay: Delay in seconds after which job may be reclaimed. 133 :param int claim_after: Timestamp after which job may be claimed. This 134 is overridden by `delay`. 135 """ 136 update = {"timestamp_claimed": 0, "attempts": self.data["attempts"] + 1} 137 if delay > 0: 138 update["timestamp_after"] = int(time.time()) + delay 139 elif claim_after is not None: 140 update["timestamp_after"] = claim_after 141 142 self.db.update("jobs", data=update, 143 where={"jobtype": self.data["jobtype"], "remote_id": self.data["remote_id"]}) 144 self.is_claimed = False 145 146 def is_claimable(self): 147 """ 148 Can this job be claimed? 149 150 :return bool: If the job is not claimed yet and also isn't finished. 151 """ 152 return not self.is_claimed and not self.is_finished 153 154 def get_place_in_queue(self): 155 """ 156 Get the place of this job in the queue 157 158 :return int: Place in queue 159 """ 160 query = "SELECT COUNT(*) as queue_ahead FROM jobs WHERE jobtype = %s" 161 replacements = [self.data["jobtype"]] 162 if self.data["timestamp_after"] == 0: 163 # Job can be claimed immediately 164 query += ( 165 " AND (timestamp_after = 0 AND timestamp < %s OR " # Other jobs that can be claimed immediately and were queued prior to this job being queued 166 " timestamp_after > 0 AND timestamp_after < %s) ") # Other jobs that are waiting for a specific time, but prior to this job being queued 167 replacements += [self.data["timestamp"], self.data["timestamp"]] 168 else: 169 # Job must wait until timestamp_after 170 query += ( 171 " AND (timestamp_after = 0 AND timestamp < %s OR " # Other jobs that can be claimed immediately and were queued prior to this job 172 " timestamp_after > 0 AND timestamp_after < %s) ") # Other jobs that are waiting, but prior to this job's start time 173 replacements += [self.data["timestamp_after"], self.data["timestamp_after"]] 174 queue_result = self.db.fetchone(query, replacements) 175 if queue_result["queue_ahead"] is None: 176 raise Exception(f"what? {queue_result}") 177 178 return queue_result["queue_ahead"] 179 180 @property 181 def details(self): 182 try: 183 details = json.loads(self.data["details"]) 184 if details: 185 return details 186 else: 187 return {} 188 except (TypeError, json.JSONDecodeError): 189 return {}
class
Job:
12class Job: 13 """ 14 Job in queue 15 """ 16 data = {} 17 db = None 18 19 is_finished = False 20 is_claimed = False 21 22 def __init__(self, data, database=None): 23 """ 24 Instantiate Job object 25 26 :param dict data: Job data, should correspond to a database record 27 :param database: Database handler 28 """ 29 self.data = data 30 self.db = database 31 32 self.data["remote_id"] = str(self.data["remote_id"]) 33 34 try: 35 self.is_finished = "is_finished" in self.data and self.data["is_finished"] 36 self.is_claimed = self.data["timestamp_claimed"] and self.data["timestamp_claimed"] > 0 37 except KeyError: 38 raise Exception 39 40 def get_by_ID(id, database): 41 """ 42 Instantiate job object by ID 43 44 :param int id: Job ID 45 :param database: Database handler 46 :return Job: Job object 47 """ 48 data = database.fetchone("SELECT * FROM jobs WHERE id = %s", (id,)) 49 if not data: 50 raise JobNotFoundException 51 52 return Job.get_by_data(data, database) 53 54 def get_by_data(data, database): 55 """ 56 Instantiate job object with given data 57 58 :param dict data: Job data, should correspond to a database row 59 :param database: Database handler 60 :return Job: Job object 61 """ 62 return Job(data, database) 63 64 def get_by_remote_ID(remote_id, database, jobtype="*"): 65 """ 66 Instantiate job object by combination of remote ID and job type 67 68 This combination is guaranteed to be unique. 69 70 :param database: Database handler 71 :param str jobtype: Job type 72 :param str remote_id: Job remote ID 73 :return Job: Job object 74 """ 75 if jobtype != "*": 76 data = database.fetchone("SELECT * FROM jobs WHERE jobtype = %s AND remote_id = %s", (jobtype, remote_id)) 77 else: 78 data = database.fetchone("SELECT * FROM jobs WHERE remote_id = %s", (remote_id,)) 79 80 if not data: 81 raise JobNotFoundException 82 83 return Job.get_by_data(data, database=database) 84 85 def claim(self): 86 """ 87 Claim a job 88 89 This marks it in the database so it cannot be claimed again. 90 """ 91 if self.data["interval"] == 0: 92 claim_time = int(time.time()) 93 else: 94 # the claim time should be a multiple of the interval to prevent 95 # drift of the interval over time. this ensures that on average, 96 # the interval remains as set 97 claim_time = math.floor(int(time.time()) / self.data["interval"]) * self.data["interval"] 98 99 updated = self.db.update("jobs", data={"timestamp_claimed": claim_time, "timestamp_lastclaimed": claim_time}, 100 where={"jobtype": self.data["jobtype"], "remote_id": self.data["remote_id"], 101 "timestamp_claimed": 0}) 102 103 if updated == 0: 104 raise JobClaimedException 105 106 self.data["timestamp_claimed"] = claim_time 107 self.data["timestamp_lastclaimed"] = claim_time 108 109 self.is_claimed = True 110 111 def finish(self, delete=False): 112 """ 113 Finish job 114 115 This deletes it from the database, or in the case of recurring jobs, 116 resets the claim flags. 117 118 :param bool delete: Whether to force deleting the job even if it is a 119 job with an interval. 120 """ 121 if self.data["interval"] == 0 or delete: 122 self.db.delete("jobs", where={"jobtype": self.data["jobtype"], "remote_id": self.data["remote_id"]}) 123 else: 124 self.db.update("jobs", data={"timestamp_claimed": 0, "attempts": 0}, 125 where={"jobtype": self.data["jobtype"], "remote_id": self.data["remote_id"]}) 126 127 self.is_finished = True 128 129 def release(self, delay=0, claim_after=0): 130 """ 131 Release a job so it may be claimed again 132 133 :param int delay: Delay in seconds after which job may be reclaimed. 134 :param int claim_after: Timestamp after which job may be claimed. This 135 is overridden by `delay`. 136 """ 137 update = {"timestamp_claimed": 0, "attempts": self.data["attempts"] + 1} 138 if delay > 0: 139 update["timestamp_after"] = int(time.time()) + delay 140 elif claim_after is not None: 141 update["timestamp_after"] = claim_after 142 143 self.db.update("jobs", data=update, 144 where={"jobtype": self.data["jobtype"], "remote_id": self.data["remote_id"]}) 145 self.is_claimed = False 146 147 def is_claimable(self): 148 """ 149 Can this job be claimed? 150 151 :return bool: If the job is not claimed yet and also isn't finished. 152 """ 153 return not self.is_claimed and not self.is_finished 154 155 def get_place_in_queue(self): 156 """ 157 Get the place of this job in the queue 158 159 :return int: Place in queue 160 """ 161 query = "SELECT COUNT(*) as queue_ahead FROM jobs WHERE jobtype = %s" 162 replacements = [self.data["jobtype"]] 163 if self.data["timestamp_after"] == 0: 164 # Job can be claimed immediately 165 query += ( 166 " AND (timestamp_after = 0 AND timestamp < %s OR " # Other jobs that can be claimed immediately and were queued prior to this job being queued 167 " timestamp_after > 0 AND timestamp_after < %s) ") # Other jobs that are waiting for a specific time, but prior to this job being queued 168 replacements += [self.data["timestamp"], self.data["timestamp"]] 169 else: 170 # Job must wait until timestamp_after 171 query += ( 172 " AND (timestamp_after = 0 AND timestamp < %s OR " # Other jobs that can be claimed immediately and were queued prior to this job 173 " timestamp_after > 0 AND timestamp_after < %s) ") # Other jobs that are waiting, but prior to this job's start time 174 replacements += [self.data["timestamp_after"], self.data["timestamp_after"]] 175 queue_result = self.db.fetchone(query, replacements) 176 if queue_result["queue_ahead"] is None: 177 raise Exception(f"what? {queue_result}") 178 179 return queue_result["queue_ahead"] 180 181 @property 182 def details(self): 183 try: 184 details = json.loads(self.data["details"]) 185 if details: 186 return details 187 else: 188 return {} 189 except (TypeError, json.JSONDecodeError): 190 return {}
Job in queue
Job(data, database=None)
22 def __init__(self, data, database=None): 23 """ 24 Instantiate Job object 25 26 :param dict data: Job data, should correspond to a database record 27 :param database: Database handler 28 """ 29 self.data = data 30 self.db = database 31 32 self.data["remote_id"] = str(self.data["remote_id"]) 33 34 try: 35 self.is_finished = "is_finished" in self.data and self.data["is_finished"] 36 self.is_claimed = self.data["timestamp_claimed"] and self.data["timestamp_claimed"] > 0 37 except KeyError: 38 raise Exception
Instantiate Job object
Parameters
- dict data: Job data, should correspond to a database record
- database: Database handler
def
get_by_ID(id, database):
40 def get_by_ID(id, database): 41 """ 42 Instantiate job object by ID 43 44 :param int id: Job ID 45 :param database: Database handler 46 :return Job: Job object 47 """ 48 data = database.fetchone("SELECT * FROM jobs WHERE id = %s", (id,)) 49 if not data: 50 raise JobNotFoundException 51 52 return Job.get_by_data(data, database)
Instantiate job object by ID
Parameters
- int id: Job ID
- database: Database handler
Returns
Job object
def
get_by_data(data, database):
54 def get_by_data(data, database): 55 """ 56 Instantiate job object with given data 57 58 :param dict data: Job data, should correspond to a database row 59 :param database: Database handler 60 :return Job: Job object 61 """ 62 return Job(data, database)
Instantiate job object with given data
Parameters
- dict data: Job data, should correspond to a database row
- database: Database handler
Returns
Job object
def
get_by_remote_ID(remote_id, database, jobtype='*'):
64 def get_by_remote_ID(remote_id, database, jobtype="*"): 65 """ 66 Instantiate job object by combination of remote ID and job type 67 68 This combination is guaranteed to be unique. 69 70 :param database: Database handler 71 :param str jobtype: Job type 72 :param str remote_id: Job remote ID 73 :return Job: Job object 74 """ 75 if jobtype != "*": 76 data = database.fetchone("SELECT * FROM jobs WHERE jobtype = %s AND remote_id = %s", (jobtype, remote_id)) 77 else: 78 data = database.fetchone("SELECT * FROM jobs WHERE remote_id = %s", (remote_id,)) 79 80 if not data: 81 raise JobNotFoundException 82 83 return Job.get_by_data(data, database=database)
Instantiate job object by combination of remote ID and job type
This combination is guaranteed to be unique.
Parameters
- database: Database handler
- str jobtype: Job type
- str remote_id: Job remote ID
Returns
Job object
def
claim(self):
85 def claim(self): 86 """ 87 Claim a job 88 89 This marks it in the database so it cannot be claimed again. 90 """ 91 if self.data["interval"] == 0: 92 claim_time = int(time.time()) 93 else: 94 # the claim time should be a multiple of the interval to prevent 95 # drift of the interval over time. this ensures that on average, 96 # the interval remains as set 97 claim_time = math.floor(int(time.time()) / self.data["interval"]) * self.data["interval"] 98 99 updated = self.db.update("jobs", data={"timestamp_claimed": claim_time, "timestamp_lastclaimed": claim_time}, 100 where={"jobtype": self.data["jobtype"], "remote_id": self.data["remote_id"], 101 "timestamp_claimed": 0}) 102 103 if updated == 0: 104 raise JobClaimedException 105 106 self.data["timestamp_claimed"] = claim_time 107 self.data["timestamp_lastclaimed"] = claim_time 108 109 self.is_claimed = True
Claim a job
This marks it in the database so it cannot be claimed again.
def
finish(self, delete=False):
111 def finish(self, delete=False): 112 """ 113 Finish job 114 115 This deletes it from the database, or in the case of recurring jobs, 116 resets the claim flags. 117 118 :param bool delete: Whether to force deleting the job even if it is a 119 job with an interval. 120 """ 121 if self.data["interval"] == 0 or delete: 122 self.db.delete("jobs", where={"jobtype": self.data["jobtype"], "remote_id": self.data["remote_id"]}) 123 else: 124 self.db.update("jobs", data={"timestamp_claimed": 0, "attempts": 0}, 125 where={"jobtype": self.data["jobtype"], "remote_id": self.data["remote_id"]}) 126 127 self.is_finished = True
Finish job
This deletes it from the database, or in the case of recurring jobs, resets the claim flags.
Parameters
- bool delete: Whether to force deleting the job even if it is a job with an interval.
def
release(self, delay=0, claim_after=0):
129 def release(self, delay=0, claim_after=0): 130 """ 131 Release a job so it may be claimed again 132 133 :param int delay: Delay in seconds after which job may be reclaimed. 134 :param int claim_after: Timestamp after which job may be claimed. This 135 is overridden by `delay`. 136 """ 137 update = {"timestamp_claimed": 0, "attempts": self.data["attempts"] + 1} 138 if delay > 0: 139 update["timestamp_after"] = int(time.time()) + delay 140 elif claim_after is not None: 141 update["timestamp_after"] = claim_after 142 143 self.db.update("jobs", data=update, 144 where={"jobtype": self.data["jobtype"], "remote_id": self.data["remote_id"]}) 145 self.is_claimed = False
Release a job so it may be claimed again
Parameters
- int delay: Delay in seconds after which job may be reclaimed.
- int claim_after: Timestamp after which job may be claimed. This
is overridden by
delay
.
def
is_claimable(self):
147 def is_claimable(self): 148 """ 149 Can this job be claimed? 150 151 :return bool: If the job is not claimed yet and also isn't finished. 152 """ 153 return not self.is_claimed and not self.is_finished
Can this job be claimed?
Returns
If the job is not claimed yet and also isn't finished.
def
get_place_in_queue(self):
155 def get_place_in_queue(self): 156 """ 157 Get the place of this job in the queue 158 159 :return int: Place in queue 160 """ 161 query = "SELECT COUNT(*) as queue_ahead FROM jobs WHERE jobtype = %s" 162 replacements = [self.data["jobtype"]] 163 if self.data["timestamp_after"] == 0: 164 # Job can be claimed immediately 165 query += ( 166 " AND (timestamp_after = 0 AND timestamp < %s OR " # Other jobs that can be claimed immediately and were queued prior to this job being queued 167 " timestamp_after > 0 AND timestamp_after < %s) ") # Other jobs that are waiting for a specific time, but prior to this job being queued 168 replacements += [self.data["timestamp"], self.data["timestamp"]] 169 else: 170 # Job must wait until timestamp_after 171 query += ( 172 " AND (timestamp_after = 0 AND timestamp < %s OR " # Other jobs that can be claimed immediately and were queued prior to this job 173 " timestamp_after > 0 AND timestamp_after < %s) ") # Other jobs that are waiting, but prior to this job's start time 174 replacements += [self.data["timestamp_after"], self.data["timestamp_after"]] 175 queue_result = self.db.fetchone(query, replacements) 176 if queue_result["queue_ahead"] is None: 177 raise Exception(f"what? {queue_result}") 178 179 return queue_result["queue_ahead"]
Get the place of this job in the queue
Returns
Place in queue