common.lib.queue
A job queue, to divide work over the workers
"""
A job queue, to divide work over the workers
"""
import time
import json

from common.lib.job import Job
import psycopg2


class JobQueue:
    """
    A simple job queue

    Jobs are basically database records. The job has some information that a worker
    can use to do its job. The job queue is shared between workers so that nothing
    is done twice.
    """
    # database handler all queries are sent through; set in __init__
    db = None
    # logger handed over on construction; set in __init__
    log = None

    def __init__(self, logger, database):
        """
        Set up database handler

        :param logger:  Logger, stored as `self.log`
        :param database:  Database handler, stored as `self.db`. The methods
                          of this class call its `fetchone`, `fetchall`,
                          `insert` and `execute` methods.
        """
        self.log = logger

        self.db = database

    def get_job(self, jobtype, timestamp=-1, restrict_claimable=True):
        """
        Get job of a specific type

        Selects the matching record with the lowest `timestamp` and passes it
        to `Job.get_by_data()`.

        :param string jobtype:  Job type
        :param int timestamp:  Reference time for the claimability checks: a
                               job qualifies if its `timestamp_after` is
                               earlier than this value. If negative (default),
                               the current time is used.
        :param bool restrict_claimable:  Only return jobs that may be claimed
                                         according to their parameters
        :return:  The result of `Job.get_by_data()` for the selected record,
                  or `None` if no job was found
        """
        if timestamp < 0:
            timestamp = int(time.time())

        # query parameters, in the order their placeholders appear
        replacements = [jobtype]
        query = (
            "SELECT main_queue.* FROM jobs AS main_queue"
            " WHERE main_queue.jobtype = %s"
        )

        if restrict_claimable:
            # claimability is determined through various timestamps: the job
            # is unclaimed, its "claim after" time has passed and, if it has
            # an interval, that interval has passed since the last claim
            query += (
                " AND main_queue.timestamp_claimed = 0"
                " AND main_queue.timestamp_after < %s"
                " AND (main_queue.interval = 0 OR main_queue.timestamp_lastclaimed + main_queue.interval < %s)"
            )
            replacements.append(timestamp)
            replacements.append(timestamp)

        # oldest job first
        query += " ORDER BY main_queue.timestamp ASC LIMIT 1;"

        job = self.db.fetchone(query, tuple(replacements))

        return Job.get_by_data(job, database=self.db) if job else None

    def get_all_jobs(self, jobtype="*", remote_id=False, restrict_claimable=True):
        """
        Get all unclaimed (and claimable) jobs

        :param string jobtype:  Type of job, "*" for all types
        :param string remote_id:  Remote ID, takes precedence over `jobtype`.
                                  A falsy value (the default) disables this
                                  filter.
        :param bool restrict_claimable:  Only return jobs that may be claimed
                                         according to their parameters
        :return list:  One `Job.get_by_data()` result per matching record,
                       ordered by `timestamp`, oldest first
        """
        replacements = []
        if remote_id:
            where_clause = "WHERE remote_id = %s"
            replacements.append(remote_id)
        elif jobtype != "*":
            where_clause = "WHERE jobtype = %s"
            replacements.append(jobtype)
        else:
            # a WHERE clause is always emitted so that the claimability
            # conditions below can simply be appended with AND
            where_clause = "WHERE jobtype != ''"

        query = "SELECT * FROM jobs " + where_clause

        if restrict_claimable:
            query += (" AND timestamp_claimed = 0"
                      " AND timestamp_after < %s"
                      " AND (interval = 0 OR timestamp_lastclaimed + interval < %s)")

            now = int(time.time())
            replacements.append(now)
            replacements.append(now)

        query += " ORDER BY timestamp ASC"

        jobs = self.db.fetchall(query, replacements)

        return [Job.get_by_data(job, self.db) for job in jobs if job]

    def get_job_count(self, jobtype="*"):
        """
        Get total number of jobs

        :param jobtype:  Type of jobs to count. Default (`*`) counts all jobs.
        :return int:  Number of jobs
        """
        if jobtype == "*":
            count = self.db.fetchone("SELECT COUNT(*) FROM jobs;", ())
        else:
            count = self.db.fetchone("SELECT COUNT(*) FROM jobs WHERE jobtype = %s;", (jobtype,))

        return int(count["count"])

    def add_job(self, jobtype, details=None, remote_id=0, claim_after=0, interval=0):
        """
        Add a new job to the queue

        There can only be one job for any combination of job type and remote id. If a job
        already exists for the given combination, no new job is added.

        :param jobtype:  Job type
        :param details:  Job details - may be empty, will be stored as JSON
        :param remote_id:  Remote ID of object to work on. For example, a post or thread ID
        :param claim_after:  Absolute timestamp after which job may be claimed
        :param interval:  If this is not zero, the job is made a repeating job,
                          which will be repeated at most every `interval` seconds.

        :return Job:  A job that matches the input type and remote ID. This may
                      be a newly added job or an existing one that matched the same
                      combination (which is required to be unique, so no new job
                      with those parameters could be queued, and the old one is
                      just as valid).
        """
        data = {
            "jobtype": jobtype,
            "details": json.dumps(details),
            "timestamp": int(time.time()),
            "timestamp_claimed": 0,
            "timestamp_lastclaimed": 0,
            "remote_id": remote_id,
            "timestamp_after": claim_after,
            "interval": interval,
            "attempts": 0
        }

        self.db.insert("jobs", data, safe=True, constraints=("jobtype", "remote_id"))

        # NOTE(review): the returned job is built from the values assembled
        # above, not from a record read back from the database - confirm that
        # this is sufficient when a job with this combination already existed
        return Job.get_by_data(data, database=self.db)

    def release_all(self):
        """
        Release all jobs

        All claimed jobs are released. This is useful to run when the backend is restarted.
        """
        self.db.execute("UPDATE jobs SET timestamp_claimed = 0")

    def get_place_in_queue(self, job):
        """
        What is the place of this job in the queue?

        :param Job job:  Job to get place in queue for

        :return int:  Place in queue. 0 means the job is currently being
                      processed; 1+ means the job is queued, with 1 corresponding to the
                      front of the queue.
        """
        if job.data["timestamp_claimed"] > 0:
            return 0

        # NOTE(review): `get_all_jobs()` is called with its default
        # `restrict_claimable=True`, which adds `timestamp_claimed = 0` to the
        # query, so the `timestamp_claimed > 0` test below looks like it can
        # never match; the oldest queued job then appears to get 0 rather
        # than the documented 1 - confirm the intended numbering
        all_queued = self.get_all_jobs(jobtype=job.data["jobtype"])
        our_timestamp = job.data["timestamp"]
        return len(
            [queued_job for queued_job in all_queued if queued_job.data["timestamp"] < our_timestamp or queued_job.data["timestamp_claimed"] > 0])
class JobQueue:
    """
    A simple job queue

    Jobs are basically database records. The job has some information that a worker
    can use to do its job. The job queue is shared between workers so that nothing
    is done twice.
    """
    # database handler all queries are sent through; set in __init__
    db = None
    # logger handed over on construction; set in __init__
    log = None

    def __init__(self, logger, database):
        """
        Set up database handler

        :param logger:  Logger, stored as `self.log`
        :param database:  Database handler, stored as `self.db`. The methods
                          of this class call its `fetchone`, `fetchall`,
                          `insert` and `execute` methods.
        """
        self.log = logger

        self.db = database

    def get_job(self, jobtype, timestamp=-1, restrict_claimable=True):
        """
        Get job of a specific type

        Selects the matching record with the lowest `timestamp` and passes it
        to `Job.get_by_data()`.

        :param string jobtype:  Job type
        :param int timestamp:  Reference time for the claimability checks: a
                               job qualifies if its `timestamp_after` is
                               earlier than this value. If negative (default),
                               the current time is used.
        :param bool restrict_claimable:  Only return jobs that may be claimed
                                         according to their parameters
        :return:  The result of `Job.get_by_data()` for the selected record,
                  or `None` if no job was found
        """
        if timestamp < 0:
            timestamp = int(time.time())

        # query parameters, in the order their placeholders appear
        replacements = [jobtype]
        query = (
            "SELECT main_queue.* FROM jobs AS main_queue"
            " WHERE main_queue.jobtype = %s"
        )

        if restrict_claimable:
            # claimability is determined through various timestamps: the job
            # is unclaimed, its "claim after" time has passed and, if it has
            # an interval, that interval has passed since the last claim
            query += (
                " AND main_queue.timestamp_claimed = 0"
                " AND main_queue.timestamp_after < %s"
                " AND (main_queue.interval = 0 OR main_queue.timestamp_lastclaimed + main_queue.interval < %s)"
            )
            replacements.append(timestamp)
            replacements.append(timestamp)

        # oldest job first
        query += " ORDER BY main_queue.timestamp ASC LIMIT 1;"

        job = self.db.fetchone(query, tuple(replacements))

        return Job.get_by_data(job, database=self.db) if job else None

    def get_all_jobs(self, jobtype="*", remote_id=False, restrict_claimable=True):
        """
        Get all unclaimed (and claimable) jobs

        :param string jobtype:  Type of job, "*" for all types
        :param string remote_id:  Remote ID, takes precedence over `jobtype`.
                                  A falsy value (the default) disables this
                                  filter.
        :param bool restrict_claimable:  Only return jobs that may be claimed
                                         according to their parameters
        :return list:  One `Job.get_by_data()` result per matching record,
                       ordered by `timestamp`, oldest first
        """
        replacements = []
        if remote_id:
            where_clause = "WHERE remote_id = %s"
            replacements.append(remote_id)
        elif jobtype != "*":
            where_clause = "WHERE jobtype = %s"
            replacements.append(jobtype)
        else:
            # a WHERE clause is always emitted so that the claimability
            # conditions below can simply be appended with AND
            where_clause = "WHERE jobtype != ''"

        query = "SELECT * FROM jobs " + where_clause

        if restrict_claimable:
            query += (" AND timestamp_claimed = 0"
                      " AND timestamp_after < %s"
                      " AND (interval = 0 OR timestamp_lastclaimed + interval < %s)")

            now = int(time.time())
            replacements.append(now)
            replacements.append(now)

        query += " ORDER BY timestamp ASC"

        jobs = self.db.fetchall(query, replacements)

        return [Job.get_by_data(job, self.db) for job in jobs if job]

    def get_job_count(self, jobtype="*"):
        """
        Get total number of jobs

        :param jobtype:  Type of jobs to count. Default (`*`) counts all jobs.
        :return int:  Number of jobs
        """
        if jobtype == "*":
            count = self.db.fetchone("SELECT COUNT(*) FROM jobs;", ())
        else:
            count = self.db.fetchone("SELECT COUNT(*) FROM jobs WHERE jobtype = %s;", (jobtype,))

        return int(count["count"])

    def add_job(self, jobtype, details=None, remote_id=0, claim_after=0, interval=0):
        """
        Add a new job to the queue

        There can only be one job for any combination of job type and remote id. If a job
        already exists for the given combination, no new job is added.

        :param jobtype:  Job type
        :param details:  Job details - may be empty, will be stored as JSON
        :param remote_id:  Remote ID of object to work on. For example, a post or thread ID
        :param claim_after:  Absolute timestamp after which job may be claimed
        :param interval:  If this is not zero, the job is made a repeating job,
                          which will be repeated at most every `interval` seconds.

        :return Job:  A job that matches the input type and remote ID. This may
                      be a newly added job or an existing one that matched the same
                      combination (which is required to be unique, so no new job
                      with those parameters could be queued, and the old one is
                      just as valid).
        """
        data = {
            "jobtype": jobtype,
            "details": json.dumps(details),
            "timestamp": int(time.time()),
            "timestamp_claimed": 0,
            "timestamp_lastclaimed": 0,
            "remote_id": remote_id,
            "timestamp_after": claim_after,
            "interval": interval,
            "attempts": 0
        }

        self.db.insert("jobs", data, safe=True, constraints=("jobtype", "remote_id"))

        # NOTE(review): the returned job is built from the values assembled
        # above, not from a record read back from the database - confirm that
        # this is sufficient when a job with this combination already existed
        return Job.get_by_data(data, database=self.db)

    def release_all(self):
        """
        Release all jobs

        All claimed jobs are released. This is useful to run when the backend is restarted.
        """
        self.db.execute("UPDATE jobs SET timestamp_claimed = 0")

    def get_place_in_queue(self, job):
        """
        What is the place of this job in the queue?

        :param Job job:  Job to get place in queue for

        :return int:  Place in queue. 0 means the job is currently being
                      processed; 1+ means the job is queued, with 1 corresponding to the
                      front of the queue.
        """
        if job.data["timestamp_claimed"] > 0:
            return 0

        # NOTE(review): `get_all_jobs()` is called with its default
        # `restrict_claimable=True`, which adds `timestamp_claimed = 0` to the
        # query, so the `timestamp_claimed > 0` test below looks like it can
        # never match; the oldest queued job then appears to get 0 rather
        # than the documented 1 - confirm the intended numbering
        all_queued = self.get_all_jobs(jobtype=job.data["jobtype"])
        our_timestamp = job.data["timestamp"]
        return len(
            [queued_job for queued_job in all_queued if queued_job.data["timestamp"] < our_timestamp or queued_job.data["timestamp_claimed"] > 0])
A simple job queue
Jobs are basically database records. The job has some information that a worker can use to do its job. The job queue is shared between workers so that nothing is done twice.
def __init__(self, logger, database):
    """
    Store the objects the queue works with

    :param logger:  Kept as `self.log`
    :param database:  Kept as `self.db`, the handler the queue's queries
                      are sent through
    """
    self.log, self.db = logger, database
Set up database handler
def get_job(self, jobtype, timestamp=-1, restrict_claimable=True):
    """
    Fetch the oldest job of a given type

    The matching record with the lowest `timestamp` is looked up and handed
    to `Job.get_by_data()`.

    :param string jobtype:  Job type
    :param int timestamp:  Reference time for the claimability checks; a
                           negative value (default) means the current time
    :param bool restrict_claimable:  Only consider jobs that are unclaimed,
                                     whose `timestamp_after` is earlier than
                                     the reference time and whose interval,
                                     if any, has passed since the last claim
    :return:  Whatever `Job.get_by_data()` returns for the record, or `None`
              if no job was found
    """
    reference_time = int(time.time()) if timestamp < 0 else timestamp

    # conditions and their parameters are collected side by side, in order
    conditions = ["main_queue.jobtype = %s"]
    params = [jobtype]

    if restrict_claimable:
        conditions += [
            "main_queue.timestamp_claimed = 0",
            "main_queue.timestamp_after < %s",
            "(main_queue.interval = 0 OR main_queue.timestamp_lastclaimed + main_queue.interval < %s)",
        ]
        params += [reference_time, reference_time]

    sql = (
        "SELECT main_queue.* FROM jobs AS main_queue WHERE "
        + " AND ".join(conditions)
        + " ORDER BY main_queue.timestamp ASC LIMIT 1;"
    )

    record = self.db.fetchone(sql, tuple(params))
    if not record:
        return None

    return Job.get_by_data(record, database=self.db)
Get job of a specific type
Returns a job's data. The `details` column is parsed JSON, and can thus contain all kinds of data.
Parameters
- string jobtype: Job type
- int timestamp: Find jobs that may be claimed after this timestamp. If set to a negative value (default), any job with a "claim after" time earlier than the current time is selected.
- bool restrict_claimable: Only return jobs that may be claimed according to their parameters
Returns
The job, as returned by `Job.get_by_data()` for the selected record, or `None` if no job was found
def get_all_jobs(self, jobtype="*", remote_id=False, restrict_claimable=True):
    """
    Get all unclaimed (and claimable) jobs

    :param string jobtype:  Type of job, "*" for all types
    :param string remote_id:  Remote ID, takes precedence over `jobtype`. A
                              falsy value (the default) disables this filter.
    :param bool restrict_claimable:  Only return jobs that may be claimed
                                     according to their parameters
    :return list:  One `Job.get_by_data()` result per matching record,
                   ordered by `timestamp`, oldest first
    """
    replacements = []
    if remote_id:
        where_clause = "WHERE remote_id = %s"
        replacements.append(remote_id)
    elif jobtype != "*":
        where_clause = "WHERE jobtype = %s"
        replacements.append(jobtype)
    else:
        # a WHERE clause is always emitted so that the claimability
        # conditions below can simply be appended with AND
        where_clause = "WHERE jobtype != ''"

    query = "SELECT * FROM jobs " + where_clause

    if restrict_claimable:
        # unclaimed, past the "claim after" time and, for jobs with an
        # interval, past that interval since the last claim
        query += (" AND timestamp_claimed = 0"
                  " AND timestamp_after < %s"
                  " AND (interval = 0 OR timestamp_lastclaimed + interval < %s)")

        now = int(time.time())
        replacements.append(now)
        replacements.append(now)

    query += " ORDER BY timestamp ASC"

    jobs = self.db.fetchall(query, replacements)

    return [Job.get_by_data(job, self.db) for job in jobs if job]
Get all unclaimed (and claimable) jobs
Parameters
- string jobtype: Type of job, "*" for all types
- string remote_id: Remote ID, takes precedence over `jobtype`
- bool restrict_claimable: Only return jobs that may be claimed according to their parameters
Returns
A list with one job per matching record, ordered by timestamp (oldest first)
def get_job_count(self, jobtype="*"):
    """
    Count the jobs in the queue

    :param jobtype:  Only count jobs of this type; the default (`*`) counts
                     jobs of every type
    :return int:  Number of jobs, read from the `count` column of the result
    """
    counting_all = jobtype == "*"
    query = "SELECT COUNT(*) FROM jobs;" if counting_all else "SELECT COUNT(*) FROM jobs WHERE jobtype = %s;"
    args = () if counting_all else (jobtype,)

    result = self.db.fetchone(query, args)
    return int(result["count"])
Get total number of jobs
Parameters
- jobtype: Type of jobs to count. Default (`*`) counts all jobs.
Returns
Number of jobs
def add_job(self, jobtype, details=None, remote_id=0, claim_after=0, interval=0):
    """
    Queue a new job

    The combination of job type and remote ID is passed to the insert as a
    constraint; if a job already exists for that combination, no new job is
    added.

    :param jobtype:  Job type
    :param details:  Job details - may be empty, will be stored as JSON
    :param remote_id:  Remote ID of object to work on. For example, a post or thread ID
    :param claim_after:  Absolute timestamp after which job may be claimed
    :param interval:  If this is not zero, the job is made a repeating job,
                      which will be repeated at most every `interval` seconds.

    :return Job:  A job for the given type and remote ID, built from the
                  values assembled here. An already existing job for the same
                  combination is considered just as valid, as the combination
                  is required to be unique.
    """
    encoded_details = json.dumps(details)
    queued_at = int(time.time())

    # a new job starts out unclaimed and without any attempts
    record = dict(
        jobtype=jobtype,
        details=encoded_details,
        timestamp=queued_at,
        timestamp_claimed=0,
        timestamp_lastclaimed=0,
        remote_id=remote_id,
        timestamp_after=claim_after,
        interval=interval,
        attempts=0,
    )

    self.db.insert("jobs", record, safe=True, constraints=("jobtype", "remote_id"))

    return Job.get_by_data(record, database=self.db)
Add a new job to the queue
There can only be one job for any combination of job type and remote id. If a job already exists for the given combination, no new job is added.
Parameters
- jobtype: Job type
- details: Job details - may be empty, will be stored as JSON
- remote_id: Remote ID of object to work on. For example, a post or thread ID
- claim_after: Absolute timestamp after which job may be claimed
- interval: If this is not zero, the job is made a repeating job, which will be repeated at most every `interval` seconds.
Returns
A job that matches the input type and remote ID. This may be a newly added job or an existing one that matched the same combination (which is required to be unique, so no new job with those parameters could be queued, and the old one is just as valid).
def release_all(self):
    """
    Mark every job as unclaimed

    Resets `timestamp_claimed` to 0 for all jobs. This is useful to run when
    the backend is restarted.
    """
    reset_claims = "UPDATE jobs SET timestamp_claimed = 0"
    self.db.execute(reset_claims)
Release all jobs
All claimed jobs are released. This is useful to run when the backend is restarted.
def get_place_in_queue(self, job):
    """
    Determine how far along the queue a job is

    :param Job job:  Job to get place in queue for

    :return int:  0 if the job has been claimed. Otherwise the number of jobs
                  returned by `get_all_jobs()` for this job's type that were
                  queued earlier than this job or that are themselves claimed.
    """
    # a claimed job is being worked on, so it is no longer waiting
    if job.data["timestamp_claimed"] > 0:
        return 0

    # NOTE(review): `get_all_jobs()` is called with its default
    # `restrict_claimable=True`, which adds `timestamp_claimed = 0` to the
    # query, so the claimed-job test below looks like it can never match;
    # the oldest queued job then appears to get 0 - confirm whether the
    # front of the queue is meant to be 1
    candidates = self.get_all_jobs(jobtype=job.data["jobtype"])
    own_timestamp = job.data["timestamp"]

    ahead = 0
    for candidate in candidates:
        queued_earlier = candidate.data["timestamp"] < own_timestamp
        if queued_earlier or candidate.data["timestamp_claimed"] > 0:
            ahead += 1

    return ahead
What is the place of this job in the queue?
Parameters
- Job job: Job to get place in queue for
Returns
Place in queue. 0 means the job is currently being processed; 1+ means the job is queued, with 1 corresponding to the front of the queue.