Edit on GitHub

common.lib.queue

A job queue, to divide work over the workers

  1"""
  2A job queue, to divide work over the workers
  3"""
  4import time
  5import json
  6
  7from common.lib.job import Job
  8
  9
 10class JobQueue:
 11	"""
 12	A simple job queue
 13
 14	Jobs are basically database records. The job has some information that a worker
 15	can use to do its job. The job queue is shared between workers so that nothing
 16	is done twice.
 17	"""
 18	db = None
 19	log = None
 20
 21	def __init__(self, logger, database):
 22		"""
 23		Set up database handler
 24		"""
 25		self.log = logger
 26
 27		self.db = database
 28
 29	def get_job(self, jobtype, timestamp=-1, restrict_claimable=True):
 30		"""
 31		Get job of a specific type
 32
 33		Returns a job's data. The `details` column is parsed JSON, and can thus contain all
 34		kinds of data.
 35
 36		:param string jobtype:  Job type
 37		:param int timestamp:  Find jobs that may be claimed after this timestamp. If set to
 38							   a negative value (default), any job with a "claim after" time
 39							   earlier than the current time is selected.
 40		:param bool restrict_claimable:  Only return jobs that may be claimed
 41		according to their parameters
 42		:return dict: Job data, or `None` if no job was found
 43		"""
 44		if timestamp < 0:
 45			timestamp = int(time.time())
 46
 47		# select the number of jobs of the same type that have been queued for
 48		# longer than the job as well
 49		replacements = [jobtype]
 50		query = (
 51			"SELECT main_queue.* FROM jobs AS main_queue"
 52			"        WHERE main_queue.jobtype = %s"
 53		)
 54
 55		if restrict_claimable:
 56			# claimability is determined through various timestamps
 57			query += (
 58			"          AND main_queue.timestamp_claimed = 0"
 59			"          AND main_queue.timestamp_after < %s"
 60			"          AND (main_queue.interval = 0 OR main_queue.timestamp_lastclaimed + main_queue.interval < %s)"
 61			)
 62			replacements.append(timestamp)
 63			replacements.append(timestamp)
 64
 65		query += "    ORDER BY main_queue.timestamp ASC LIMIT 1;"
 66
 67		job = self.db.fetchone(query, tuple(replacements))
 68
 69		return Job.get_by_data(job, database=self.db) if job else None
 70
 71	def get_all_jobs(self, jobtype="*", limit=None, offset=None, remote_id=False, restrict_claimable=True):
 72		"""
 73		Get all unclaimed (and claimable) jobs
 74
 75		:param string jobtype:  Type of job, "*" for all types
 76		:param string remote_id:  Remote ID, takes precedence over `jobtype`
 77		:param bool restrict_claimable:  Only return jobs that may be claimed
 78		according to their parameters
 79		:return list:
 80		"""
 81		replacements = []
 82		if remote_id:
 83			filter = "WHERE remote_id = %s"
 84			replacements = [remote_id]
 85		elif jobtype != "*":
 86			filter = "WHERE jobtype = %s"
 87			replacements = [jobtype]
 88		else:
 89			filter = "WHERE jobtype != ''"
 90
 91		query = "SELECT * FROM jobs %s" % filter
 92
 93		if restrict_claimable:
 94			query += ("        AND timestamp_claimed = 0"
 95					  "              AND timestamp_after < %s"
 96					  "              AND (interval = 0 OR timestamp_lastclaimed + interval < %s)")
 97
 98			now = int(time.time())
 99			replacements.append(now)
100			replacements.append(now)
101
102		query += "         ORDER BY timestamp ASC"
103
104		if limit is not None:
105			query += " LIMIT %s"
106			replacements.append(limit)
107		if offset is not None:
108			query += " OFFSET %s"
109			replacements.append(offset)
110
111		jobs = self.db.fetchall(query, replacements)
112
113		return [Job.get_by_data(job, self.db) for job in jobs if job]
114
115	def get_job_count(self, jobtype="*"):
116		"""
117		Get total number of jobs
118
119		:param jobtype:  Type of jobs to count. Default (`*`) counts all jobs.
120		:return int:  Number of jobs
121		"""
122		if jobtype == "*":
123			count = self.db.fetchone("SELECT COUNT(*) FROM jobs;", ())
124		else:
125			count = self.db.fetchone("SELECT COUNT(*) FROM jobs WHERE jobtype = %s;", (jobtype,))
126
127		return int(count["count"])
128
129	def add_job(self, jobtype, details=None, remote_id=0, claim_after=0, interval=0):
130		"""
131		Add a new job to the queue
132
133		There can only be one job for any combination of job type and remote id. If a job
134		already exists for the given combination, no new job is added.
135
136		:param jobtype:  Job type
137		:param details:  Job details - may be empty, will be stored as JSON
138		:param remote_id:  Remote ID of object to work on. For example, a post or thread ID
139		:param claim_after:  Absolute timestamp after which job may be claimed
140		:param interval:  If this is not zero, the job is made a repeating job,
141		                  which will be repeated at most every `interval` seconds.
142
143		:return Job: A job that matches the input type and remote ID. This may
144		             be a newly added job or an existing that matched the same
145		             combination (which is required to be unique, so no new job
146		             with those parameters could be queued, and the old one is
147		             just as valid).
148		"""
149		data = {
150			"jobtype": jobtype,
151			"details": json.dumps(details),
152			"timestamp": int(time.time()),
153			"timestamp_claimed": 0,
154			"timestamp_lastclaimed": 0,
155			"remote_id": remote_id,
156			"timestamp_after": claim_after,
157			"interval": interval,
158			"attempts": 0
159		}
160
161		self.db.insert("jobs", data, safe=True, constraints=("jobtype", "remote_id"))
162
163		return Job.get_by_data(data, database=self.db)
164
165	def release_all(self):
166		"""
167		Release all jobs
168
169		All claimed jobs are released. This is useful to run when the backend is restarted.
170		"""
171		self.db.execute("UPDATE jobs SET timestamp_claimed = 0")
172
173	def get_place_in_queue(self, job):
174		"""
175		What is the place of this job in the queue?
176
177		:param Job job:  Job to get place in queue for
178
179		:return int: Place in queue. 0 means the job is currently being
180		processed; 1+ means the job is queued, with 1 corresponding to the
181		front of the queue.
182		"""
183		if job.data["timestamp_claimed"] > 0:
184			return 0
185
186		all_queued = self.get_all_jobs(jobtype=job.data["jobtype"])
187		our_timestamp = job.data["timestamp"]
188		return len(
189			[queued_job for queued_job in all_queued if queued_job.data["timestamp"] < our_timestamp or queued_job.data["timestamp_claimed"] > 0])
class JobQueue:
 11class JobQueue:
 12	"""
 13	A simple job queue
 14
 15	Jobs are basically database records. The job has some information that a worker
 16	can use to do its job. The job queue is shared between workers so that nothing
 17	is done twice.
 18	"""
 19	db = None
 20	log = None
 21
 22	def __init__(self, logger, database):
 23		"""
 24		Set up database handler
 25		"""
 26		self.log = logger
 27
 28		self.db = database
 29
 30	def get_job(self, jobtype, timestamp=-1, restrict_claimable=True):
 31		"""
 32		Get job of a specific type
 33
 34		Returns a job's data. The `details` column is parsed JSON, and can thus contain all
 35		kinds of data.
 36
 37		:param string jobtype:  Job type
 38		:param int timestamp:  Find jobs that may be claimed after this timestamp. If set to
 39							   a negative value (default), any job with a "claim after" time
 40							   earlier than the current time is selected.
 41		:param bool restrict_claimable:  Only return jobs that may be claimed
 42		according to their parameters
 43		:return dict: Job data, or `None` if no job was found
 44		"""
 45		if timestamp < 0:
 46			timestamp = int(time.time())
 47
 48		# select the number of jobs of the same type that have been queued for
 49		# longer than the job as well
 50		replacements = [jobtype]
 51		query = (
 52			"SELECT main_queue.* FROM jobs AS main_queue"
 53			"        WHERE main_queue.jobtype = %s"
 54		)
 55
 56		if restrict_claimable:
 57			# claimability is determined through various timestamps
 58			query += (
 59			"          AND main_queue.timestamp_claimed = 0"
 60			"          AND main_queue.timestamp_after < %s"
 61			"          AND (main_queue.interval = 0 OR main_queue.timestamp_lastclaimed + main_queue.interval < %s)"
 62			)
 63			replacements.append(timestamp)
 64			replacements.append(timestamp)
 65
 66		query += "    ORDER BY main_queue.timestamp ASC LIMIT 1;"
 67
 68		job = self.db.fetchone(query, tuple(replacements))
 69
 70		return Job.get_by_data(job, database=self.db) if job else None
 71
 72	def get_all_jobs(self, jobtype="*", limit=None, offset=None, remote_id=False, restrict_claimable=True):
 73		"""
 74		Get all unclaimed (and claimable) jobs
 75
 76		:param string jobtype:  Type of job, "*" for all types
 77		:param string remote_id:  Remote ID, takes precedence over `jobtype`
 78		:param bool restrict_claimable:  Only return jobs that may be claimed
 79		according to their parameters
 80		:return list:
 81		"""
 82		replacements = []
 83		if remote_id:
 84			filter = "WHERE remote_id = %s"
 85			replacements = [remote_id]
 86		elif jobtype != "*":
 87			filter = "WHERE jobtype = %s"
 88			replacements = [jobtype]
 89		else:
 90			filter = "WHERE jobtype != ''"
 91
 92		query = "SELECT * FROM jobs %s" % filter
 93
 94		if restrict_claimable:
 95			query += ("        AND timestamp_claimed = 0"
 96					  "              AND timestamp_after < %s"
 97					  "              AND (interval = 0 OR timestamp_lastclaimed + interval < %s)")
 98
 99			now = int(time.time())
100			replacements.append(now)
101			replacements.append(now)
102
103		query += "         ORDER BY timestamp ASC"
104
105		if limit is not None:
106			query += " LIMIT %s"
107			replacements.append(limit)
108		if offset is not None:
109			query += " OFFSET %s"
110			replacements.append(offset)
111
112		jobs = self.db.fetchall(query, replacements)
113
114		return [Job.get_by_data(job, self.db) for job in jobs if job]
115
116	def get_job_count(self, jobtype="*"):
117		"""
118		Get total number of jobs
119
120		:param jobtype:  Type of jobs to count. Default (`*`) counts all jobs.
121		:return int:  Number of jobs
122		"""
123		if jobtype == "*":
124			count = self.db.fetchone("SELECT COUNT(*) FROM jobs;", ())
125		else:
126			count = self.db.fetchone("SELECT COUNT(*) FROM jobs WHERE jobtype = %s;", (jobtype,))
127
128		return int(count["count"])
129
130	def add_job(self, jobtype, details=None, remote_id=0, claim_after=0, interval=0):
131		"""
132		Add a new job to the queue
133
134		There can only be one job for any combination of job type and remote id. If a job
135		already exists for the given combination, no new job is added.
136
137		:param jobtype:  Job type
138		:param details:  Job details - may be empty, will be stored as JSON
139		:param remote_id:  Remote ID of object to work on. For example, a post or thread ID
140		:param claim_after:  Absolute timestamp after which job may be claimed
141		:param interval:  If this is not zero, the job is made a repeating job,
142		                  which will be repeated at most every `interval` seconds.
143
144		:return Job: A job that matches the input type and remote ID. This may
145		             be a newly added job or an existing that matched the same
146		             combination (which is required to be unique, so no new job
147		             with those parameters could be queued, and the old one is
148		             just as valid).
149		"""
150		data = {
151			"jobtype": jobtype,
152			"details": json.dumps(details),
153			"timestamp": int(time.time()),
154			"timestamp_claimed": 0,
155			"timestamp_lastclaimed": 0,
156			"remote_id": remote_id,
157			"timestamp_after": claim_after,
158			"interval": interval,
159			"attempts": 0
160		}
161
162		self.db.insert("jobs", data, safe=True, constraints=("jobtype", "remote_id"))
163
164		return Job.get_by_data(data, database=self.db)
165
166	def release_all(self):
167		"""
168		Release all jobs
169
170		All claimed jobs are released. This is useful to run when the backend is restarted.
171		"""
172		self.db.execute("UPDATE jobs SET timestamp_claimed = 0")
173
174	def get_place_in_queue(self, job):
175		"""
176		What is the place of this job in the queue?
177
178		:param Job job:  Job to get place in queue for
179
180		:return int: Place in queue. 0 means the job is currently being
181		processed; 1+ means the job is queued, with 1 corresponding to the
182		front of the queue.
183		"""
184		if job.data["timestamp_claimed"] > 0:
185			return 0
186
187		all_queued = self.get_all_jobs(jobtype=job.data["jobtype"])
188		our_timestamp = job.data["timestamp"]
189		return len(
190			[queued_job for queued_job in all_queued if queued_job.data["timestamp"] < our_timestamp or queued_job.data["timestamp_claimed"] > 0])

A simple job queue

Jobs are basically database records. The job has some information that a worker can use to do its job. The job queue is shared between workers so that nothing is done twice.

JobQueue(logger, database)
22	def __init__(self, logger, database):
23		"""
24		Set up database handler
25		"""
26		self.log = logger
27
28		self.db = database

Set up database handler

db = None
log = None
def get_job(self, jobtype, timestamp=-1, restrict_claimable=True):
30	def get_job(self, jobtype, timestamp=-1, restrict_claimable=True):
31		"""
32		Get job of a specific type
33
34		Returns a job's data. The `details` column is parsed JSON, and can thus contain all
35		kinds of data.
36
37		:param string jobtype:  Job type
38		:param int timestamp:  Find jobs that may be claimed after this timestamp. If set to
39							   a negative value (default), any job with a "claim after" time
40							   earlier than the current time is selected.
41		:param bool restrict_claimable:  Only return jobs that may be claimed
42		according to their parameters
43		:return dict: Job data, or `None` if no job was found
44		"""
45		if timestamp < 0:
46			timestamp = int(time.time())
47
48		# select the number of jobs of the same type that have been queued for
49		# longer than the job as well
50		replacements = [jobtype]
51		query = (
52			"SELECT main_queue.* FROM jobs AS main_queue"
53			"        WHERE main_queue.jobtype = %s"
54		)
55
56		if restrict_claimable:
57			# claimability is determined through various timestamps
58			query += (
59			"          AND main_queue.timestamp_claimed = 0"
60			"          AND main_queue.timestamp_after < %s"
61			"          AND (main_queue.interval = 0 OR main_queue.timestamp_lastclaimed + main_queue.interval < %s)"
62			)
63			replacements.append(timestamp)
64			replacements.append(timestamp)
65
66		query += "    ORDER BY main_queue.timestamp ASC LIMIT 1;"
67
68		job = self.db.fetchone(query, tuple(replacements))
69
70		return Job.get_by_data(job, database=self.db) if job else None

Get job of a specific type

Returns a job's data. The details column is parsed JSON, and can thus contain all kinds of data.

Parameters
  • string jobtype: Job type
  • int timestamp: Find jobs that may be claimed after this timestamp. If set to a negative value (default), any job with a "claim after" time earlier than the current time is selected.
  • bool restrict_claimable: Only return jobs that may be claimed according to their parameters
Returns

Job data, or None if no job was found

def get_all_jobs( self, jobtype='*', limit=None, offset=None, remote_id=False, restrict_claimable=True):
 72	def get_all_jobs(self, jobtype="*", limit=None, offset=None, remote_id=False, restrict_claimable=True):
 73		"""
 74		Get all unclaimed (and claimable) jobs
 75
 76		:param string jobtype:  Type of job, "*" for all types
 77		:param string remote_id:  Remote ID, takes precedence over `jobtype`
 78		:param bool restrict_claimable:  Only return jobs that may be claimed
 79		according to their parameters
 80		:return list:
 81		"""
 82		replacements = []
 83		if remote_id:
 84			filter = "WHERE remote_id = %s"
 85			replacements = [remote_id]
 86		elif jobtype != "*":
 87			filter = "WHERE jobtype = %s"
 88			replacements = [jobtype]
 89		else:
 90			filter = "WHERE jobtype != ''"
 91
 92		query = "SELECT * FROM jobs %s" % filter
 93
 94		if restrict_claimable:
 95			query += ("        AND timestamp_claimed = 0"
 96					  "              AND timestamp_after < %s"
 97					  "              AND (interval = 0 OR timestamp_lastclaimed + interval < %s)")
 98
 99			now = int(time.time())
100			replacements.append(now)
101			replacements.append(now)
102
103		query += "         ORDER BY timestamp ASC"
104
105		if limit is not None:
106			query += " LIMIT %s"
107			replacements.append(limit)
108		if offset is not None:
109			query += " OFFSET %s"
110			replacements.append(offset)
111
112		jobs = self.db.fetchall(query, replacements)
113
114		return [Job.get_by_data(job, self.db) for job in jobs if job]

Get all unclaimed (and claimable) jobs

Parameters
  • string jobtype: Type of job, "*" for all types
  • string remote_id: Remote ID, takes precedence over jobtype
  • bool restrict_claimable: Only return jobs that may be claimed according to their parameters
Returns
def get_job_count(self, jobtype='*'):
116	def get_job_count(self, jobtype="*"):
117		"""
118		Get total number of jobs
119
120		:param jobtype:  Type of jobs to count. Default (`*`) counts all jobs.
121		:return int:  Number of jobs
122		"""
123		if jobtype == "*":
124			count = self.db.fetchone("SELECT COUNT(*) FROM jobs;", ())
125		else:
126			count = self.db.fetchone("SELECT COUNT(*) FROM jobs WHERE jobtype = %s;", (jobtype,))
127
128		return int(count["count"])

Get total number of jobs

Parameters
  • jobtype: Type of jobs to count. Default (*) counts all jobs.
Returns

Number of jobs

def add_job(self, jobtype, details=None, remote_id=0, claim_after=0, interval=0):
130	def add_job(self, jobtype, details=None, remote_id=0, claim_after=0, interval=0):
131		"""
132		Add a new job to the queue
133
134		There can only be one job for any combination of job type and remote id. If a job
135		already exists for the given combination, no new job is added.
136
137		:param jobtype:  Job type
138		:param details:  Job details - may be empty, will be stored as JSON
139		:param remote_id:  Remote ID of object to work on. For example, a post or thread ID
140		:param claim_after:  Absolute timestamp after which job may be claimed
141		:param interval:  If this is not zero, the job is made a repeating job,
142		                  which will be repeated at most every `interval` seconds.
143
144		:return Job: A job that matches the input type and remote ID. This may
145		             be a newly added job or an existing that matched the same
146		             combination (which is required to be unique, so no new job
147		             with those parameters could be queued, and the old one is
148		             just as valid).
149		"""
150		data = {
151			"jobtype": jobtype,
152			"details": json.dumps(details),
153			"timestamp": int(time.time()),
154			"timestamp_claimed": 0,
155			"timestamp_lastclaimed": 0,
156			"remote_id": remote_id,
157			"timestamp_after": claim_after,
158			"interval": interval,
159			"attempts": 0
160		}
161
162		self.db.insert("jobs", data, safe=True, constraints=("jobtype", "remote_id"))
163
164		return Job.get_by_data(data, database=self.db)

Add a new job to the queue

There can only be one job for any combination of job type and remote id. If a job already exists for the given combination, no new job is added.

Parameters
  • jobtype: Job type
  • details: Job details - may be empty, will be stored as JSON
  • remote_id: Remote ID of object to work on. For example, a post or thread ID
  • claim_after: Absolute timestamp after which job may be claimed
  • interval: If this is not zero, the job is made a repeating job, which will be repeated at most every interval seconds.
Returns

A job that matches the input type and remote ID. This may be a newly added job or an existing one that matched the same combination (which is required to be unique, so no new job with those parameters could be queued, and the old one is just as valid).

def release_all(self):
166	def release_all(self):
167		"""
168		Release all jobs
169
170		All claimed jobs are released. This is useful to run when the backend is restarted.
171		"""
172		self.db.execute("UPDATE jobs SET timestamp_claimed = 0")

Release all jobs

All claimed jobs are released. This is useful to run when the backend is restarted.

def get_place_in_queue(self, job):
174	def get_place_in_queue(self, job):
175		"""
176		What is the place of this job in the queue?
177
178		:param Job job:  Job to get place in queue for
179
180		:return int: Place in queue. 0 means the job is currently being
181		processed; 1+ means the job is queued, with 1 corresponding to the
182		front of the queue.
183		"""
184		if job.data["timestamp_claimed"] > 0:
185			return 0
186
187		all_queued = self.get_all_jobs(jobtype=job.data["jobtype"])
188		our_timestamp = job.data["timestamp"]
189		return len(
190			[queued_job for queued_job in all_queued if queued_job.data["timestamp"] < our_timestamp or queued_job.data["timestamp_claimed"] > 0])

What is the place of this job in the queue?

Parameters
  • Job job: Job to get place in queue for
Returns

Place in queue. 0 means the job is currently being processed; 1+ means the job is queued, with 1 corresponding to the front of the queue.