Edit on GitHub

common.lib.queue

A job queue, to divide work over the workers

  1"""
  2A job queue, to divide work over the workers
  3"""
  4import time
  5import json
  6
  7from common.lib.job import Job
  8import psycopg2
  9
 10
 11class JobQueue:
 12	"""
 13	A simple job queue
 14
 15	Jobs are basically database records. The job has some information that a worker
 16	can use to do its job. The job queue is shared between workers so that nothing
 17	is done twice.
 18	"""
 19	db = None
 20	log = None
 21
 22	def __init__(self, logger, database):
 23		"""
 24		Set up database handler
 25		"""
 26		self.log = logger
 27
 28		self.db = database
 29
 30	def get_job(self, jobtype, timestamp=-1, restrict_claimable=True):
 31		"""
 32		Get job of a specific type
 33
 34		Returns a job's data. The `details` column is parsed JSON, and can thus contain all
 35		kinds of data.
 36
 37		:param string jobtype:  Job type
 38		:param int timestamp:  Find jobs that may be claimed after this timestamp. If set to
 39							   a negative value (default), any job with a "claim after" time
 40							   earlier than the current time is selected.
 41		:param bool restrict_claimable:  Only return jobs that may be claimed
 42		according to their parameters
 43		:return dict: Job data, or `None` if no job was found
 44		"""
 45		if timestamp < 0:
 46			timestamp = int(time.time())
 47
 48		# select the number of jobs of the same type that have been queued for
 49		# longer than the job as well
 50		replacements = [jobtype]
 51		query = (
 52			"SELECT main_queue.* FROM jobs AS main_queue"
 53			"        WHERE main_queue.jobtype = %s"
 54		)
 55
 56		if restrict_claimable:
 57			# claimability is determined through various timestamps
 58			query += (
 59			"          AND main_queue.timestamp_claimed = 0"
 60			"          AND main_queue.timestamp_after < %s"
 61			"          AND (main_queue.interval = 0 OR main_queue.timestamp_lastclaimed + main_queue.interval < %s)"
 62			)
 63			replacements.append(timestamp)
 64			replacements.append(timestamp)
 65
 66		query += "    ORDER BY main_queue.timestamp ASC LIMIT 1;"
 67
 68		job = self.db.fetchone(query, tuple(replacements))
 69
 70		return Job.get_by_data(job, database=self.db) if job else None
 71
 72	def get_all_jobs(self, jobtype="*", remote_id=False, restrict_claimable=True):
 73		"""
 74		Get all unclaimed (and claimable) jobs
 75
 76		:param string jobtype:  Type of job, "*" for all types
 77		:param string remote_id:  Remote ID, takes precedence over `jobtype`
 78		:param bool restrict_claimable:  Only return jobs that may be claimed
 79		according to their parameters
 80		:return list:
 81		"""
 82		replacements = []
 83		if remote_id:
 84			filter = "WHERE remote_id = %s"
 85			replacements = [remote_id]
 86		elif jobtype != "*":
 87			filter = "WHERE jobtype = %s"
 88			replacements = [jobtype]
 89		else:
 90			filter = "WHERE jobtype != ''"
 91
 92		query = "SELECT * FROM jobs %s" % filter
 93
 94		if restrict_claimable:
 95			query += ("        AND timestamp_claimed = 0"
 96					  "              AND timestamp_after < %s"
 97					  "              AND (interval = 0 OR timestamp_lastclaimed + interval < %s)")
 98
 99			now = int(time.time())
100			replacements.append(now)
101			replacements.append(now)
102
103		query += "         ORDER BY timestamp ASC"
104
105		jobs = self.db.fetchall(query, replacements)
106
107		return [Job.get_by_data(job, self.db) for job in jobs if job]
108
109	def get_job_count(self, jobtype="*"):
110		"""
111		Get total number of jobs
112
113		:param jobtype:  Type of jobs to count. Default (`*`) counts all jobs.
114		:return int:  Number of jobs
115		"""
116		if jobtype == "*":
117			count = self.db.fetchone("SELECT COUNT(*) FROM jobs;", ())
118		else:
119			count = self.db.fetchone("SELECT COUNT(*) FROM jobs WHERE jobtype = %s;", (jobtype,))
120
121		return int(count["count"])
122
123	def add_job(self, jobtype, details=None, remote_id=0, claim_after=0, interval=0):
124		"""
125		Add a new job to the queue
126
127		There can only be one job for any combination of job type and remote id. If a job
128		already exists for the given combination, no new job is added.
129
130		:param jobtype:  Job type
131		:param details:  Job details - may be empty, will be stored as JSON
132		:param remote_id:  Remote ID of object to work on. For example, a post or thread ID
133		:param claim_after:  Absolute timestamp after which job may be claimed
134		:param interval:  If this is not zero, the job is made a repeating job,
135		                  which will be repeated at most every `interval` seconds.
136
137		:return Job: A job that matches the input type and remote ID. This may
138		             be a newly added job or an existing that matched the same
139		             combination (which is required to be unique, so no new job
140		             with those parameters could be queued, and the old one is
141		             just as valid).
142		"""
143		data = {
144			"jobtype": jobtype,
145			"details": json.dumps(details),
146			"timestamp": int(time.time()),
147			"timestamp_claimed": 0,
148			"timestamp_lastclaimed": 0,
149			"remote_id": remote_id,
150			"timestamp_after": claim_after,
151			"interval": interval,
152			"attempts": 0
153		}
154
155		self.db.insert("jobs", data, safe=True, constraints=("jobtype", "remote_id"))
156
157		return Job.get_by_data(data, database=self.db)
158
159	def release_all(self):
160		"""
161		Release all jobs
162
163		All claimed jobs are released. This is useful to run when the backend is restarted.
164		"""
165		self.db.execute("UPDATE jobs SET timestamp_claimed = 0")
166
167	def get_place_in_queue(self, job):
168		"""
169		What is the place of this job in the queue?
170
171		:param Job job:  Job to get place in queue for
172
173		:return int: Place in queue. 0 means the job is currently being
174		processed; 1+ means the job is queued, with 1 corresponding to the
175		front of the queue.
176		"""
177		if job.data["timestamp_claimed"] > 0:
178			return 0
179
180		all_queued = self.get_all_jobs(jobtype=job.data["jobtype"])
181		our_timestamp = job.data["timestamp"]
182		return len(
183			[queued_job for queued_job in all_queued if queued_job.data["timestamp"] < our_timestamp or queued_job.data["timestamp_claimed"] > 0])
class JobQueue:
 12class JobQueue:
 13	"""
 14	A simple job queue
 15
 16	Jobs are basically database records. The job has some information that a worker
 17	can use to do its job. The job queue is shared between workers so that nothing
 18	is done twice.
 19	"""
 20	db = None
 21	log = None
 22
 23	def __init__(self, logger, database):
 24		"""
 25		Set up database handler
 26		"""
 27		self.log = logger
 28
 29		self.db = database
 30
 31	def get_job(self, jobtype, timestamp=-1, restrict_claimable=True):
 32		"""
 33		Get job of a specific type
 34
 35		Returns a job's data. The `details` column is parsed JSON, and can thus contain all
 36		kinds of data.
 37
 38		:param string jobtype:  Job type
 39		:param int timestamp:  Find jobs that may be claimed after this timestamp. If set to
 40							   a negative value (default), any job with a "claim after" time
 41							   earlier than the current time is selected.
 42		:param bool restrict_claimable:  Only return jobs that may be claimed
 43		according to their parameters
 44		:return dict: Job data, or `None` if no job was found
 45		"""
 46		if timestamp < 0:
 47			timestamp = int(time.time())
 48
 49		# select the number of jobs of the same type that have been queued for
 50		# longer than the job as well
 51		replacements = [jobtype]
 52		query = (
 53			"SELECT main_queue.* FROM jobs AS main_queue"
 54			"        WHERE main_queue.jobtype = %s"
 55		)
 56
 57		if restrict_claimable:
 58			# claimability is determined through various timestamps
 59			query += (
 60			"          AND main_queue.timestamp_claimed = 0"
 61			"          AND main_queue.timestamp_after < %s"
 62			"          AND (main_queue.interval = 0 OR main_queue.timestamp_lastclaimed + main_queue.interval < %s)"
 63			)
 64			replacements.append(timestamp)
 65			replacements.append(timestamp)
 66
 67		query += "    ORDER BY main_queue.timestamp ASC LIMIT 1;"
 68
 69		job = self.db.fetchone(query, tuple(replacements))
 70
 71		return Job.get_by_data(job, database=self.db) if job else None
 72
 73	def get_all_jobs(self, jobtype="*", remote_id=False, restrict_claimable=True):
 74		"""
 75		Get all unclaimed (and claimable) jobs
 76
 77		:param string jobtype:  Type of job, "*" for all types
 78		:param string remote_id:  Remote ID, takes precedence over `jobtype`
 79		:param bool restrict_claimable:  Only return jobs that may be claimed
 80		according to their parameters
 81		:return list:
 82		"""
 83		replacements = []
 84		if remote_id:
 85			filter = "WHERE remote_id = %s"
 86			replacements = [remote_id]
 87		elif jobtype != "*":
 88			filter = "WHERE jobtype = %s"
 89			replacements = [jobtype]
 90		else:
 91			filter = "WHERE jobtype != ''"
 92
 93		query = "SELECT * FROM jobs %s" % filter
 94
 95		if restrict_claimable:
 96			query += ("        AND timestamp_claimed = 0"
 97					  "              AND timestamp_after < %s"
 98					  "              AND (interval = 0 OR timestamp_lastclaimed + interval < %s)")
 99
100			now = int(time.time())
101			replacements.append(now)
102			replacements.append(now)
103
104		query += "         ORDER BY timestamp ASC"
105
106		jobs = self.db.fetchall(query, replacements)
107
108		return [Job.get_by_data(job, self.db) for job in jobs if job]
109
110	def get_job_count(self, jobtype="*"):
111		"""
112		Get total number of jobs
113
114		:param jobtype:  Type of jobs to count. Default (`*`) counts all jobs.
115		:return int:  Number of jobs
116		"""
117		if jobtype == "*":
118			count = self.db.fetchone("SELECT COUNT(*) FROM jobs;", ())
119		else:
120			count = self.db.fetchone("SELECT COUNT(*) FROM jobs WHERE jobtype = %s;", (jobtype,))
121
122		return int(count["count"])
123
124	def add_job(self, jobtype, details=None, remote_id=0, claim_after=0, interval=0):
125		"""
126		Add a new job to the queue
127
128		There can only be one job for any combination of job type and remote id. If a job
129		already exists for the given combination, no new job is added.
130
131		:param jobtype:  Job type
132		:param details:  Job details - may be empty, will be stored as JSON
133		:param remote_id:  Remote ID of object to work on. For example, a post or thread ID
134		:param claim_after:  Absolute timestamp after which job may be claimed
135		:param interval:  If this is not zero, the job is made a repeating job,
136		                  which will be repeated at most every `interval` seconds.
137
138		:return Job: A job that matches the input type and remote ID. This may
139		             be a newly added job or an existing that matched the same
140		             combination (which is required to be unique, so no new job
141		             with those parameters could be queued, and the old one is
142		             just as valid).
143		"""
144		data = {
145			"jobtype": jobtype,
146			"details": json.dumps(details),
147			"timestamp": int(time.time()),
148			"timestamp_claimed": 0,
149			"timestamp_lastclaimed": 0,
150			"remote_id": remote_id,
151			"timestamp_after": claim_after,
152			"interval": interval,
153			"attempts": 0
154		}
155
156		self.db.insert("jobs", data, safe=True, constraints=("jobtype", "remote_id"))
157
158		return Job.get_by_data(data, database=self.db)
159
160	def release_all(self):
161		"""
162		Release all jobs
163
164		All claimed jobs are released. This is useful to run when the backend is restarted.
165		"""
166		self.db.execute("UPDATE jobs SET timestamp_claimed = 0")
167
168	def get_place_in_queue(self, job):
169		"""
170		What is the place of this job in the queue?
171
172		:param Job job:  Job to get place in queue for
173
174		:return int: Place in queue. 0 means the job is currently being
175		processed; 1+ means the job is queued, with 1 corresponding to the
176		front of the queue.
177		"""
178		if job.data["timestamp_claimed"] > 0:
179			return 0
180
181		all_queued = self.get_all_jobs(jobtype=job.data["jobtype"])
182		our_timestamp = job.data["timestamp"]
183		return len(
184			[queued_job for queued_job in all_queued if queued_job.data["timestamp"] < our_timestamp or queued_job.data["timestamp_claimed"] > 0])

A simple job queue

Jobs are basically database records. The job has some information that a worker can use to do its job. The job queue is shared between workers so that nothing is done twice.

JobQueue(logger, database)
23	def __init__(self, logger, database):
24		"""
25		Set up database handler
26		"""
27		self.log = logger
28
29		self.db = database

Set up database handler

db = None
log = None
def get_job(self, jobtype, timestamp=-1, restrict_claimable=True):
31	def get_job(self, jobtype, timestamp=-1, restrict_claimable=True):
32		"""
33		Get job of a specific type
34
35		Returns a job's data. The `details` column is parsed JSON, and can thus contain all
36		kinds of data.
37
38		:param string jobtype:  Job type
39		:param int timestamp:  Find jobs that may be claimed after this timestamp. If set to
40							   a negative value (default), any job with a "claim after" time
41							   earlier than the current time is selected.
42		:param bool restrict_claimable:  Only return jobs that may be claimed
43		according to their parameters
44		:return dict: Job data, or `None` if no job was found
45		"""
46		if timestamp < 0:
47			timestamp = int(time.time())
48
49		# select the number of jobs of the same type that have been queued for
50		# longer than the job as well
51		replacements = [jobtype]
52		query = (
53			"SELECT main_queue.* FROM jobs AS main_queue"
54			"        WHERE main_queue.jobtype = %s"
55		)
56
57		if restrict_claimable:
58			# claimability is determined through various timestamps
59			query += (
60			"          AND main_queue.timestamp_claimed = 0"
61			"          AND main_queue.timestamp_after < %s"
62			"          AND (main_queue.interval = 0 OR main_queue.timestamp_lastclaimed + main_queue.interval < %s)"
63			)
64			replacements.append(timestamp)
65			replacements.append(timestamp)
66
67		query += "    ORDER BY main_queue.timestamp ASC LIMIT 1;"
68
69		job = self.db.fetchone(query, tuple(replacements))
70
71		return Job.get_by_data(job, database=self.db) if job else None

Get job of a specific type

Returns a job's data. The details column is parsed JSON, and can thus contain all kinds of data.

Parameters
  • string jobtype: Job type
  • int timestamp: Reference time for the claimability check: only jobs whose "claim after" time is earlier than this timestamp are considered. If set to a negative value (default), the current time is used.
  • bool restrict_claimable: Only return jobs that may be claimed according to their parameters
Returns

Job data, or None if no job was found

def get_all_jobs(self, jobtype='*', remote_id=False, restrict_claimable=True):
 73	def get_all_jobs(self, jobtype="*", remote_id=False, restrict_claimable=True):
 74		"""
 75		Get all unclaimed (and claimable) jobs
 76
 77		:param string jobtype:  Type of job, "*" for all types
 78		:param string remote_id:  Remote ID, takes precedence over `jobtype`
 79		:param bool restrict_claimable:  Only return jobs that may be claimed
 80		according to their parameters
 81		:return list:
 82		"""
 83		replacements = []
 84		if remote_id:
 85			filter = "WHERE remote_id = %s"
 86			replacements = [remote_id]
 87		elif jobtype != "*":
 88			filter = "WHERE jobtype = %s"
 89			replacements = [jobtype]
 90		else:
 91			filter = "WHERE jobtype != ''"
 92
 93		query = "SELECT * FROM jobs %s" % filter
 94
 95		if restrict_claimable:
 96			query += ("        AND timestamp_claimed = 0"
 97					  "              AND timestamp_after < %s"
 98					  "              AND (interval = 0 OR timestamp_lastclaimed + interval < %s)")
 99
100			now = int(time.time())
101			replacements.append(now)
102			replacements.append(now)
103
104		query += "         ORDER BY timestamp ASC"
105
106		jobs = self.db.fetchall(query, replacements)
107
108		return [Job.get_by_data(job, self.db) for job in jobs if job]

Get all unclaimed (and claimable) jobs

Parameters
  • string jobtype: Type of job, "*" for all types
  • string remote_id: Remote ID, takes precedence over jobtype
  • bool restrict_claimable: Only return jobs that may be claimed according to their parameters
Returns

List of matching jobs, ordered by queue timestamp (oldest first)

def get_job_count(self, jobtype='*'):
110	def get_job_count(self, jobtype="*"):
111		"""
112		Get total number of jobs
113
114		:param jobtype:  Type of jobs to count. Default (`*`) counts all jobs.
115		:return int:  Number of jobs
116		"""
117		if jobtype == "*":
118			count = self.db.fetchone("SELECT COUNT(*) FROM jobs;", ())
119		else:
120			count = self.db.fetchone("SELECT COUNT(*) FROM jobs WHERE jobtype = %s;", (jobtype,))
121
122		return int(count["count"])

Get total number of jobs

Parameters
  • jobtype: Type of jobs to count. Default (*) counts all jobs.
Returns

Number of jobs

def add_job(self, jobtype, details=None, remote_id=0, claim_after=0, interval=0):
124	def add_job(self, jobtype, details=None, remote_id=0, claim_after=0, interval=0):
125		"""
126		Add a new job to the queue
127
128		There can only be one job for any combination of job type and remote id. If a job
129		already exists for the given combination, no new job is added.
130
131		:param jobtype:  Job type
132		:param details:  Job details - may be empty, will be stored as JSON
133		:param remote_id:  Remote ID of object to work on. For example, a post or thread ID
134		:param claim_after:  Absolute timestamp after which job may be claimed
135		:param interval:  If this is not zero, the job is made a repeating job,
136		                  which will be repeated at most every `interval` seconds.
137
138		:return Job: A job that matches the input type and remote ID. This may
139		             be a newly added job or an existing that matched the same
140		             combination (which is required to be unique, so no new job
141		             with those parameters could be queued, and the old one is
142		             just as valid).
143		"""
144		data = {
145			"jobtype": jobtype,
146			"details": json.dumps(details),
147			"timestamp": int(time.time()),
148			"timestamp_claimed": 0,
149			"timestamp_lastclaimed": 0,
150			"remote_id": remote_id,
151			"timestamp_after": claim_after,
152			"interval": interval,
153			"attempts": 0
154		}
155
156		self.db.insert("jobs", data, safe=True, constraints=("jobtype", "remote_id"))
157
158		return Job.get_by_data(data, database=self.db)

Add a new job to the queue

There can only be one job for any combination of job type and remote id. If a job already exists for the given combination, no new job is added.

Parameters
  • jobtype: Job type
  • details: Job details - may be empty, will be stored as JSON
  • remote_id: Remote ID of object to work on. For example, a post or thread ID
  • claim_after: Absolute timestamp after which job may be claimed
  • interval: If this is not zero, the job is made a repeating job, which will be repeated at most every interval seconds.
Returns

A job that matches the input type and remote ID. This may be a newly added job or an existing one that matched the same combination (which is required to be unique, so no new job with those parameters could be queued, and the old one is just as valid).

def release_all(self):
160	def release_all(self):
161		"""
162		Release all jobs
163
164		All claimed jobs are released. This is useful to run when the backend is restarted.
165		"""
166		self.db.execute("UPDATE jobs SET timestamp_claimed = 0")

Release all jobs

All claimed jobs are released. This is useful to run when the backend is restarted.

def get_place_in_queue(self, job):
168	def get_place_in_queue(self, job):
169		"""
170		What is the place of this job in the queue?
171
172		:param Job job:  Job to get place in queue for
173
174		:return int: Place in queue. 0 means the job is currently being
175		processed; 1+ means the job is queued, with 1 corresponding to the
176		front of the queue.
177		"""
178		if job.data["timestamp_claimed"] > 0:
179			return 0
180
181		all_queued = self.get_all_jobs(jobtype=job.data["jobtype"])
182		our_timestamp = job.data["timestamp"]
183		return len(
184			[queued_job for queued_job in all_queued if queued_job.data["timestamp"] < our_timestamp or queued_job.data["timestamp_claimed"] > 0])

What is the place of this job in the queue?

Parameters
  • Job job: Job to get place in queue for
Returns

Place in queue. 0 means the job is currently being processed; 1+ means the job is queued, with 1 corresponding to the front of the queue.