common.lib.queue
A job queue, to divide work over the workers
"""
A job queue, to divide work over the workers
"""
import time
import json

from common.lib.job import Job


class JobQueue:
    """
    A simple job queue

    Jobs are basically database records. The job has some information that a worker
    can use to do its job. The job queue is shared between workers so that nothing
    is done twice.
    """
    db = None
    log = None

    def __init__(self, logger, database):
        """
        Set up database handler

        :param logger: Logger, stored as `self.log`
        :param database: Database handler used by all queue methods, stored
                         as `self.db`
        """
        self.log = logger

        self.db = database

    def get_job(self, jobtype, timestamp=-1, restrict_claimable=True):
        """
        Get job of a specific type

        Returns the oldest (by queue `timestamp`) job of the given type. The
        `details` column is parsed JSON, and can thus contain all kinds of data.

        :param string jobtype: Job type
        :param int timestamp: Only consider jobs that may be claimed at this
                              timestamp, i.e. whose "claim after" time is
                              earlier than it. If set to a negative value
                              (default), the current time is used.
        :param bool restrict_claimable: Only return jobs that may be claimed
                                        according to their parameters
        :return Job: The matching job, or `None` if no job was found
        """
        if timestamp < 0:
            timestamp = int(time.time())

        # select the single oldest job of this type (see ORDER BY below),
        # optionally restricted to jobs that are claimable at `timestamp`
        replacements = [jobtype]
        query = (
            "SELECT main_queue.* FROM jobs AS main_queue"
            " WHERE main_queue.jobtype = %s"
        )

        if restrict_claimable:
            # claimability is determined through various timestamps: the job
            # is not currently claimed, its "claim after" time has passed and,
            # for repeating jobs (interval != 0), more than `interval` seconds
            # have passed since it was last claimed
            query += (
                " AND main_queue.timestamp_claimed = 0"
                " AND main_queue.timestamp_after < %s"
                " AND (main_queue.interval = 0 OR main_queue.timestamp_lastclaimed + main_queue.interval < %s)"
            )
            replacements.append(timestamp)
            replacements.append(timestamp)

        query += " ORDER BY main_queue.timestamp ASC LIMIT 1;"

        job = self.db.fetchone(query, tuple(replacements))

        return Job.get_by_data(job, database=self.db) if job else None

    def get_all_jobs(self, jobtype="*", limit=None, offset=None, remote_id=False, restrict_claimable=True):
        """
        Get jobs from the queue; by default only unclaimed (and claimable) ones

        :param string jobtype: Type of job, "*" for all types
        :param int limit: Maximum number of jobs to return, `None` for no limit
        :param int offset: Number of jobs to skip, `None` to skip none
        :param string remote_id: Remote ID, takes precedence over `jobtype`
        :param bool restrict_claimable: Only return jobs that may be claimed
                                        according to their parameters
        :return list: `Job` objects, ordered by queue timestamp (oldest first)
        """
        if remote_id:
            where_clause = "WHERE remote_id = %s"
            replacements = [remote_id]
        elif jobtype != "*":
            where_clause = "WHERE jobtype = %s"
            replacements = [jobtype]
        else:
            where_clause = "WHERE jobtype != ''"
            replacements = []

        query = "SELECT * FROM jobs " + where_clause

        if restrict_claimable:
            query += (" AND timestamp_claimed = 0"
                      " AND timestamp_after < %s"
                      " AND (interval = 0 OR timestamp_lastclaimed + interval < %s)")

            # bind the timestamps only together with their two placeholders;
            # otherwise the parameter count would not match the query
            now = int(time.time())
            replacements.append(now)
            replacements.append(now)

        query += " ORDER BY timestamp ASC"

        if limit is not None:
            query += " LIMIT %s"
            replacements.append(limit)
        if offset is not None:
            query += " OFFSET %s"
            replacements.append(offset)

        jobs = self.db.fetchall(query, tuple(replacements))

        return [Job.get_by_data(job, self.db) for job in jobs if job]

    def get_job_count(self, jobtype="*"):
        """
        Get total number of jobs

        :param jobtype: Type of jobs to count. Default (`*`) counts all jobs.
        :return int: Number of jobs
        """
        if jobtype == "*":
            count = self.db.fetchone("SELECT COUNT(*) FROM jobs;", ())
        else:
            count = self.db.fetchone("SELECT COUNT(*) FROM jobs WHERE jobtype = %s;", (jobtype,))

        return int(count["count"])

    def add_job(self, jobtype, details=None, remote_id=0, claim_after=0, interval=0):
        """
        Add a new job to the queue

        There can only be one job for any combination of job type and remote id. If a job
        already exists for the given combination, no new job is added.

        :param jobtype: Job type
        :param details: Job details - may be empty, will be stored as JSON
        :param remote_id: Remote ID of object to work on. For example, a post or thread ID
        :param claim_after: Absolute timestamp after which job may be claimed
        :param interval: If this is not zero, the job is made a repeating job,
                         which will be repeated at most every `interval` seconds.

        :return Job: A job for the input type and remote ID. Note that it is
                     built from the parameters passed to this method, not read
                     back from the database: if a job for this combination
                     already existed, the returned object carries the new
                     parameters rather than the stored ones.
        """
        data = {
            "jobtype": jobtype,
            "details": json.dumps(details),
            "timestamp": int(time.time()),
            "timestamp_claimed": 0,
            "timestamp_lastclaimed": 0,
            "remote_id": remote_id,
            "timestamp_after": claim_after,
            "interval": interval,
            "attempts": 0
        }

        self.db.insert("jobs", data, safe=True, constraints=("jobtype", "remote_id"))

        return Job.get_by_data(data, database=self.db)

    def release_all(self):
        """
        Release all jobs

        All claimed jobs are released (`timestamp_claimed` is reset to 0 for
        every job). This is useful to run when the backend is restarted.
        """
        self.db.execute("UPDATE jobs SET timestamp_claimed = 0")

    def get_place_in_queue(self, job):
        """
        What is the place of this job in the queue?

        Counts the jobs of the same type that are ahead of this one: jobs
        queued earlier (by queue timestamp) plus jobs currently claimed.

        :param Job job: Job to get place in queue for

        :return int: Place in queue. 0 means the job is currently being
        processed; 1+ means the job is queued, with 1 corresponding to the
        front of the queue. An unclaimed job with nothing ahead of it also
        yields 0.
        """
        if job.data["timestamp_claimed"] > 0:
            return 0

        # claimed jobs must be part of the result set to be counted below, so
        # the default "claimable only" filter (timestamp_claimed = 0) is off
        all_queued = self.get_all_jobs(jobtype=job.data["jobtype"], restrict_claimable=False)
        our_timestamp = job.data["timestamp"]
        return len([
            queued_job for queued_job in all_queued
            if queued_job.data["timestamp"] < our_timestamp or queued_job.data["timestamp_claimed"] > 0
        ])
class JobQueue:
    """
    A simple job queue

    Jobs are basically database records. The job has some information that a worker
    can use to do its job. The job queue is shared between workers so that nothing
    is done twice.
    """
    db = None
    log = None

    def __init__(self, logger, database):
        """
        Set up database handler

        :param logger: Logger, stored as `self.log`
        :param database: Database handler used by all queue methods, stored
                         as `self.db`
        """
        self.log = logger

        self.db = database

    def get_job(self, jobtype, timestamp=-1, restrict_claimable=True):
        """
        Get job of a specific type

        Returns the oldest (by queue `timestamp`) job of the given type. The
        `details` column is parsed JSON, and can thus contain all kinds of data.

        :param string jobtype: Job type
        :param int timestamp: Only consider jobs that may be claimed at this
                              timestamp, i.e. whose "claim after" time is
                              earlier than it. If set to a negative value
                              (default), the current time is used.
        :param bool restrict_claimable: Only return jobs that may be claimed
                                        according to their parameters
        :return Job: The matching job, or `None` if no job was found
        """
        if timestamp < 0:
            timestamp = int(time.time())

        # select the single oldest job of this type (see ORDER BY below),
        # optionally restricted to jobs that are claimable at `timestamp`
        replacements = [jobtype]
        query = (
            "SELECT main_queue.* FROM jobs AS main_queue"
            " WHERE main_queue.jobtype = %s"
        )

        if restrict_claimable:
            # claimability is determined through various timestamps: the job
            # is not currently claimed, its "claim after" time has passed and,
            # for repeating jobs (interval != 0), more than `interval` seconds
            # have passed since it was last claimed
            query += (
                " AND main_queue.timestamp_claimed = 0"
                " AND main_queue.timestamp_after < %s"
                " AND (main_queue.interval = 0 OR main_queue.timestamp_lastclaimed + main_queue.interval < %s)"
            )
            replacements.append(timestamp)
            replacements.append(timestamp)

        query += " ORDER BY main_queue.timestamp ASC LIMIT 1;"

        job = self.db.fetchone(query, tuple(replacements))

        return Job.get_by_data(job, database=self.db) if job else None

    def get_all_jobs(self, jobtype="*", limit=None, offset=None, remote_id=False, restrict_claimable=True):
        """
        Get jobs from the queue; by default only unclaimed (and claimable) ones

        :param string jobtype: Type of job, "*" for all types
        :param int limit: Maximum number of jobs to return, `None` for no limit
        :param int offset: Number of jobs to skip, `None` to skip none
        :param string remote_id: Remote ID, takes precedence over `jobtype`
        :param bool restrict_claimable: Only return jobs that may be claimed
                                        according to their parameters
        :return list: `Job` objects, ordered by queue timestamp (oldest first)
        """
        if remote_id:
            where_clause = "WHERE remote_id = %s"
            replacements = [remote_id]
        elif jobtype != "*":
            where_clause = "WHERE jobtype = %s"
            replacements = [jobtype]
        else:
            where_clause = "WHERE jobtype != ''"
            replacements = []

        query = "SELECT * FROM jobs " + where_clause

        if restrict_claimable:
            query += (" AND timestamp_claimed = 0"
                      " AND timestamp_after < %s"
                      " AND (interval = 0 OR timestamp_lastclaimed + interval < %s)")

            # bind the timestamps only together with their two placeholders;
            # otherwise the parameter count would not match the query
            now = int(time.time())
            replacements.append(now)
            replacements.append(now)

        query += " ORDER BY timestamp ASC"

        if limit is not None:
            query += " LIMIT %s"
            replacements.append(limit)
        if offset is not None:
            query += " OFFSET %s"
            replacements.append(offset)

        jobs = self.db.fetchall(query, tuple(replacements))

        return [Job.get_by_data(job, self.db) for job in jobs if job]

    def get_job_count(self, jobtype="*"):
        """
        Get total number of jobs

        :param jobtype: Type of jobs to count. Default (`*`) counts all jobs.
        :return int: Number of jobs
        """
        if jobtype == "*":
            count = self.db.fetchone("SELECT COUNT(*) FROM jobs;", ())
        else:
            count = self.db.fetchone("SELECT COUNT(*) FROM jobs WHERE jobtype = %s;", (jobtype,))

        return int(count["count"])

    def add_job(self, jobtype, details=None, remote_id=0, claim_after=0, interval=0):
        """
        Add a new job to the queue

        There can only be one job for any combination of job type and remote id. If a job
        already exists for the given combination, no new job is added.

        :param jobtype: Job type
        :param details: Job details - may be empty, will be stored as JSON
        :param remote_id: Remote ID of object to work on. For example, a post or thread ID
        :param claim_after: Absolute timestamp after which job may be claimed
        :param interval: If this is not zero, the job is made a repeating job,
                         which will be repeated at most every `interval` seconds.

        :return Job: A job for the input type and remote ID. Note that it is
                     built from the parameters passed to this method, not read
                     back from the database: if a job for this combination
                     already existed, the returned object carries the new
                     parameters rather than the stored ones.
        """
        data = {
            "jobtype": jobtype,
            "details": json.dumps(details),
            "timestamp": int(time.time()),
            "timestamp_claimed": 0,
            "timestamp_lastclaimed": 0,
            "remote_id": remote_id,
            "timestamp_after": claim_after,
            "interval": interval,
            "attempts": 0
        }

        self.db.insert("jobs", data, safe=True, constraints=("jobtype", "remote_id"))

        return Job.get_by_data(data, database=self.db)

    def release_all(self):
        """
        Release all jobs

        All claimed jobs are released (`timestamp_claimed` is reset to 0 for
        every job). This is useful to run when the backend is restarted.
        """
        self.db.execute("UPDATE jobs SET timestamp_claimed = 0")

    def get_place_in_queue(self, job):
        """
        What is the place of this job in the queue?

        Counts the jobs of the same type that are ahead of this one: jobs
        queued earlier (by queue timestamp) plus jobs currently claimed.

        :param Job job: Job to get place in queue for

        :return int: Place in queue. 0 means the job is currently being
        processed; 1+ means the job is queued, with 1 corresponding to the
        front of the queue. An unclaimed job with nothing ahead of it also
        yields 0.
        """
        if job.data["timestamp_claimed"] > 0:
            return 0

        # claimed jobs must be part of the result set to be counted below, so
        # the default "claimable only" filter (timestamp_claimed = 0) is off
        all_queued = self.get_all_jobs(jobtype=job.data["jobtype"], restrict_claimable=False)
        our_timestamp = job.data["timestamp"]
        return len([
            queued_job for queued_job in all_queued
            if queued_job.data["timestamp"] < our_timestamp or queued_job.data["timestamp_claimed"] > 0
        ])
A simple job queue
Jobs are basically database records. The job has some information that a worker can use to do its job. The job queue is shared between workers so that nothing is done twice.
22 def __init__(self, logger, database): 23 """ 24 Set up database handler 25 """ 26 self.log = logger 27 28 self.db = database
Set up database handler
def get_job(self, jobtype, timestamp=-1, restrict_claimable=True):
    """
    Get job of a specific type

    Returns the oldest (by queue `timestamp`) job of the given type. The
    `details` column is parsed JSON, and can thus contain all kinds of data.

    :param string jobtype: Job type
    :param int timestamp: Only consider jobs that may be claimed at this
                          timestamp, i.e. whose "claim after" time is earlier
                          than it. If set to a negative value (default), the
                          current time is used.
    :param bool restrict_claimable: Only return jobs that may be claimed
                                    according to their parameters
    :return Job: The matching job, or `None` if no job was found
    """
    if timestamp < 0:
        timestamp = int(time.time())

    # select the single oldest job of this type (see ORDER BY below),
    # optionally restricted to jobs that are claimable at `timestamp`
    replacements = [jobtype]
    query = (
        "SELECT main_queue.* FROM jobs AS main_queue"
        " WHERE main_queue.jobtype = %s"
    )

    if restrict_claimable:
        # claimability is determined through various timestamps: the job is
        # not currently claimed, its "claim after" time has passed and, for
        # repeating jobs (interval != 0), more than `interval` seconds have
        # passed since it was last claimed
        query += (
            " AND main_queue.timestamp_claimed = 0"
            " AND main_queue.timestamp_after < %s"
            " AND (main_queue.interval = 0 OR main_queue.timestamp_lastclaimed + main_queue.interval < %s)"
        )
        replacements.append(timestamp)
        replacements.append(timestamp)

    query += " ORDER BY main_queue.timestamp ASC LIMIT 1;"

    job = self.db.fetchone(query, tuple(replacements))

    return Job.get_by_data(job, database=self.db) if job else None
Get job of a specific type
Returns a job's data. The `details` column is parsed JSON, and can thus contain all kinds of data.
Parameters
- string jobtype: Job type
- int timestamp: Only consider jobs that may be claimed at this timestamp, i.e. whose "claim after" time is earlier than it. If set to a negative value (default), the current time is used.
- bool restrict_claimable: Only return jobs that may be claimed according to their parameters
Returns
The matching `Job`, or `None` if no job was found
def get_all_jobs(self, jobtype="*", limit=None, offset=None, remote_id=False, restrict_claimable=True):
    """
    Get jobs from the queue; by default only unclaimed (and claimable) ones

    :param string jobtype: Type of job, "*" for all types
    :param int limit: Maximum number of jobs to return, `None` for no limit
    :param int offset: Number of jobs to skip, `None` to skip none
    :param string remote_id: Remote ID, takes precedence over `jobtype`
    :param bool restrict_claimable: Only return jobs that may be claimed
                                    according to their parameters
    :return list: `Job` objects, ordered by queue timestamp (oldest first)
    """
    if remote_id:
        where_clause = "WHERE remote_id = %s"
        replacements = [remote_id]
    elif jobtype != "*":
        where_clause = "WHERE jobtype = %s"
        replacements = [jobtype]
    else:
        where_clause = "WHERE jobtype != ''"
        replacements = []

    query = "SELECT * FROM jobs " + where_clause

    if restrict_claimable:
        query += (" AND timestamp_claimed = 0"
                  " AND timestamp_after < %s"
                  " AND (interval = 0 OR timestamp_lastclaimed + interval < %s)")

        # bind the timestamps only together with their two placeholders;
        # otherwise the parameter count would not match the query
        now = int(time.time())
        replacements.append(now)
        replacements.append(now)

    query += " ORDER BY timestamp ASC"

    if limit is not None:
        query += " LIMIT %s"
        replacements.append(limit)
    if offset is not None:
        query += " OFFSET %s"
        replacements.append(offset)

    jobs = self.db.fetchall(query, tuple(replacements))

    return [Job.get_by_data(job, self.db) for job in jobs if job]
Get all unclaimed (and claimable) jobs
Parameters
- string jobtype: Type of job, "*" for all types
- string remote_id: Remote ID, takes precedence over `jobtype`
- bool restrict_claimable: Only return jobs that may be claimed according to their parameters
Returns
List of `Job` objects, ordered by queue timestamp (oldest first)
def get_job_count(self, jobtype="*"):
    """
    Count the jobs in the queue

    :param jobtype: Only count jobs of this type; the default, `*`, counts
                    jobs of every type.
    :return int: Number of matching jobs
    """
    if jobtype != "*":
        result = self.db.fetchone("SELECT COUNT(*) FROM jobs WHERE jobtype = %s;", (jobtype,))
    else:
        result = self.db.fetchone("SELECT COUNT(*) FROM jobs;", ())
    return int(result["count"])
Get total number of jobs
Parameters
- jobtype: Type of jobs to count. Default (`*`) counts all jobs.
Returns
Number of jobs
def add_job(self, jobtype, details=None, remote_id=0, claim_after=0, interval=0):
    """
    Add a new job to the queue

    There can only be one job for any combination of job type and remote id. If a job
    already exists for the given combination, no new job is added.

    :param jobtype: Job type
    :param details: Job details - may be empty, will be stored as JSON
    :param remote_id: Remote ID of object to work on. For example, a post or thread ID
    :param claim_after: Absolute timestamp after which job may be claimed
    :param interval: If this is not zero, the job is made a repeating job,
                     which will be repeated at most every `interval` seconds.

    :return Job: A job for the input type and remote ID. Note that it is
                 built from the parameters passed to this method, not read
                 back from the database: if a job for this combination
                 already existed, the returned object carries the new
                 parameters rather than the stored ones.
    """
    data = {
        "jobtype": jobtype,
        "details": json.dumps(details),
        "timestamp": int(time.time()),
        "timestamp_claimed": 0,
        "timestamp_lastclaimed": 0,
        "remote_id": remote_id,
        "timestamp_after": claim_after,
        "interval": interval,
        "attempts": 0
    }

    # presumably `safe`/`constraints` make the wrapper skip the insert on a
    # (jobtype, remote_id) conflict - behaviour lives in the database wrapper
    self.db.insert("jobs", data, safe=True, constraints=("jobtype", "remote_id"))

    return Job.get_by_data(data, database=self.db)
Add a new job to the queue
There can only be one job for any combination of job type and remote id. If a job already exists for the given combination, no new job is added.
Parameters
- jobtype: Job type
- details: Job details - may be empty, will be stored as JSON
- remote_id: Remote ID of object to work on. For example, a post or thread ID
- claim_after: Absolute timestamp after which job may be claimed
- interval: If this is not zero, the job is made a repeating job, which will be repeated at most every `interval` seconds.
Returns
A job that matches the input type and remote ID. This may be a newly added job or an existing job that matched the same combination (which is required to be unique, so no new job with those parameters could be queued, and the old one is just as valid). Note that the returned object is built from the parameters passed to this method, not read back from the database.
def release_all(self):
    """
    Release every claimed job

    Resets `timestamp_claimed` to 0 for all rows of the `jobs` table so the
    jobs can be claimed again. This is useful to run when the backend is
    restarted.
    """
    reset_query = "UPDATE jobs SET timestamp_claimed = 0"
    self.db.execute(reset_query)
Release all jobs
All claimed jobs are released. This is useful to run when the backend is restarted.
def get_place_in_queue(self, job):
    """
    What is the place of this job in the queue?

    Counts the jobs of the same type that are ahead of this one: jobs queued
    earlier (by queue timestamp) plus jobs currently claimed.

    :param Job job: Job to get place in queue for

    :return int: Place in queue. 0 means the job is currently being
    processed; 1+ means the job is queued, with 1 corresponding to the
    front of the queue. An unclaimed job with nothing ahead of it also
    yields 0.
    """
    if job.data["timestamp_claimed"] > 0:
        return 0

    # claimed jobs must be part of the result set to be counted below, so the
    # default "claimable only" filter (timestamp_claimed = 0) is switched off
    all_queued = self.get_all_jobs(jobtype=job.data["jobtype"], restrict_claimable=False)
    our_timestamp = job.data["timestamp"]
    return len([
        queued_job for queued_job in all_queued
        if queued_job.data["timestamp"] < our_timestamp or queued_job.data["timestamp_claimed"] > 0
    ])
What is the place of this job in the queue?
Parameters
- Job job: Job to get place in queue for
Returns
Place in queue. 0 means the job is currently being processed; 1+ means the job is queued, with 1 corresponding to the front of the queue.