Edit on GitHub

common.lib.job

Class that represents a job in the job queue

  1"""
  2Class that represents a job in the job queue
  3"""
  4
  5import time
  6import json
  7import math
  8from common.lib.exceptions import JobClaimedException, JobNotFoundException
  9
 10class Job:
 11	"""
 12	Job in queue
 13	"""
 14	data = {}
 15	db = None
 16
 17	is_finished = False
 18	is_claimed = False
 19
 20	def __init__(self, data, database=None):
 21		"""
 22		Instantiate Job object
 23
 24		:param dict data:  Job data, should correspond to a database record
 25		:param database:  Database handler
 26		"""
 27		self.data = data
 28		self.db = database
 29
 30		self.data["remote_id"] = str(self.data["remote_id"])
 31
 32		try:
 33			self.is_finished = "is_finished" in self.data and self.data["is_finished"]
 34			self.is_claimed = self.data["timestamp_claimed"] and self.data["timestamp_claimed"] > 0
 35		except KeyError:
 36			raise Exception
 37
 38	def get_by_ID(id, database):
 39		"""
 40		Instantiate job object by ID
 41
 42		:param int id: Job ID
 43		:param database:  Database handler
 44		:return Job: Job object
 45		"""
 46		data = database.fetchone("SELECT * FROM jobs WHERE id = %s", (id,))
 47		if not data:
 48			raise JobNotFoundException
 49
 50		return Job.get_by_data(data, database)
 51
 52	def get_by_data(data, database):
 53		"""
 54		Instantiate job object with given data
 55
 56		:param dict data:  Job data, should correspond to a database row
 57		:param database: Database handler
 58		:return Job: Job object
 59		"""
 60		return Job(data, database)
 61
 62	def get_by_remote_ID(remote_id, database, jobtype="*"):
 63		"""
 64		Instantiate job object by combination of remote ID and job type
 65
 66		This combination is guaranteed to be unique.
 67
 68		:param database: Database handler
 69		:param str jobtype: Job type
 70		:param str remote_id: Job remote ID
 71		:return Job: Job object
 72		"""
 73		if jobtype != "*":
 74			data = database.fetchone("SELECT * FROM jobs WHERE jobtype = %s AND remote_id = %s", (jobtype, remote_id))
 75		else:
 76			data = database.fetchone("SELECT * FROM jobs WHERE remote_id = %s", (remote_id,))
 77
 78		if not data:
 79			raise JobNotFoundException
 80
 81		return Job.get_by_data(data, database=database)
 82
 83	def claim(self):
 84		"""
 85		Claim a job
 86
 87		This marks it in the database so it cannot be claimed again.
 88		"""
 89		if self.data["interval"] == 0:
 90			claim_time = int(time.time())
 91		else:
 92			# the claim time should be a multiple of the interval to prevent
 93			# drift of the interval over time. this ensures that on average,
 94			# the interval remains as set
 95			claim_time = math.floor(int(time.time()) / self.data["interval"]) * self.data["interval"]
 96
 97		updated = self.db.update("jobs", data={"timestamp_claimed": claim_time, "timestamp_lastclaimed": claim_time},
 98								 where={"jobtype": self.data["jobtype"], "remote_id": self.data["remote_id"],
 99										"timestamp_claimed": 0})
100
101		if updated == 0:
102			raise JobClaimedException
103
104		self.data["timestamp_claimed"] = claim_time
105		self.data["timestamp_lastclaimed"] = claim_time
106
107		self.is_claimed = True
108
109	def finish(self, delete=False):
110		"""
111		Finish job
112
113		This deletes it from the database, or in the case of recurring jobs,
114		resets the claim flags.
115
116		:param bool delete: Whether to force deleting the job even if it is a
117							job with an interval.
118		"""
119		if self.data["interval"] == 0 or delete:
120			self.db.delete("jobs", where={"jobtype": self.data["jobtype"], "remote_id": self.data["remote_id"]})
121		else:
122			self.db.update("jobs", data={"timestamp_claimed": 0, "attempts": 0},
123						   where={"jobtype": self.data["jobtype"], "remote_id": self.data["remote_id"]})
124
125		self.is_finished = True
126
127	def release(self, delay=0, claim_after=0):
128		"""
129		Release a job so it may be claimed again
130
131		:param int delay: Delay in seconds after which job may be reclaimed.
132		:param int claim_after:  Timestamp after which job may be claimed. This
133		is overridden by `delay`.
134		"""
135		update = {"timestamp_claimed": 0, "attempts": self.data["attempts"] + 1}
136		if delay > 0:
137			update["timestamp_after"] = int(time.time()) + delay
138		elif claim_after is not None:
139			update["timestamp_after"] = claim_after
140
141		self.db.update("jobs", data=update,
142					   where={"jobtype": self.data["jobtype"], "remote_id": self.data["remote_id"]})
143		self.is_claimed = False
144
145	def is_claimable(self):
146		"""
147		Can this job be claimed?
148
149		:return bool: If the job is not claimed yet and also isn't finished.
150		"""
151		return not self.is_claimed and not self.is_finished
152
153	def get_place_in_queue(self):
154		"""
155		Get the place of this job in the queue
156
157		:return int: Place in queue
158		"""
159		query = "SELECT COUNT(*) as queue_ahead FROM jobs WHERE jobtype = %s"
160		replacements = [self.data["jobtype"]]
161		if self.data["timestamp_after"] == 0:
162			# Job can be claimed immediately
163			query += (
164				" AND (timestamp_after = 0 AND timestamp < %s OR "  # Other jobs that can be claimed immediately and were queued prior to this job being queued
165				" timestamp_after > 0 AND timestamp_after < %s) ")  # Other jobs that are waiting for a specific time, but prior to this job being queued
166			replacements += [self.data["timestamp"], self.data["timestamp"]]
167		else:
168			# Job must wait until timestamp_after
169			query += (
170				" AND (timestamp_after = 0 AND timestamp < %s OR "  # Other jobs that can be claimed immediately and were queued prior to this job
171				" timestamp_after > 0 AND timestamp_after < %s) ")  # Other jobs that are waiting, but prior to this job's start time
172			replacements += [self.data["timestamp_after"], self.data["timestamp_after"]]
173		queue_result = self.db.fetchone(query, replacements)
174		if queue_result["queue_ahead"] is None:
175			raise Exception(f"what? {queue_result}")
176
177		return queue_result["queue_ahead"]
178
179	@property
180	def details(self):
181		try:
182			details = json.loads(self.data["details"])
183			if details:
184				return details
185			else:
186				return {}
187		except (TypeError, json.JSONDecodeError):
188			return {}
class Job:
 11class Job:
 12	"""
 13	Job in queue
 14	"""
 15	data = {}
 16	db = None
 17
 18	is_finished = False
 19	is_claimed = False
 20
 21	def __init__(self, data, database=None):
 22		"""
 23		Instantiate Job object
 24
 25		:param dict data:  Job data, should correspond to a database record
 26		:param database:  Database handler
 27		"""
 28		self.data = data
 29		self.db = database
 30
 31		self.data["remote_id"] = str(self.data["remote_id"])
 32
 33		try:
 34			self.is_finished = "is_finished" in self.data and self.data["is_finished"]
 35			self.is_claimed = self.data["timestamp_claimed"] and self.data["timestamp_claimed"] > 0
 36		except KeyError:
 37			raise Exception
 38
 39	def get_by_ID(id, database):
 40		"""
 41		Instantiate job object by ID
 42
 43		:param int id: Job ID
 44		:param database:  Database handler
 45		:return Job: Job object
 46		"""
 47		data = database.fetchone("SELECT * FROM jobs WHERE id = %s", (id,))
 48		if not data:
 49			raise JobNotFoundException
 50
 51		return Job.get_by_data(data, database)
 52
 53	def get_by_data(data, database):
 54		"""
 55		Instantiate job object with given data
 56
 57		:param dict data:  Job data, should correspond to a database row
 58		:param database: Database handler
 59		:return Job: Job object
 60		"""
 61		return Job(data, database)
 62
 63	def get_by_remote_ID(remote_id, database, jobtype="*"):
 64		"""
 65		Instantiate job object by combination of remote ID and job type
 66
 67		This combination is guaranteed to be unique.
 68
 69		:param database: Database handler
 70		:param str jobtype: Job type
 71		:param str remote_id: Job remote ID
 72		:return Job: Job object
 73		"""
 74		if jobtype != "*":
 75			data = database.fetchone("SELECT * FROM jobs WHERE jobtype = %s AND remote_id = %s", (jobtype, remote_id))
 76		else:
 77			data = database.fetchone("SELECT * FROM jobs WHERE remote_id = %s", (remote_id,))
 78
 79		if not data:
 80			raise JobNotFoundException
 81
 82		return Job.get_by_data(data, database=database)
 83
 84	def claim(self):
 85		"""
 86		Claim a job
 87
 88		This marks it in the database so it cannot be claimed again.
 89		"""
 90		if self.data["interval"] == 0:
 91			claim_time = int(time.time())
 92		else:
 93			# the claim time should be a multiple of the interval to prevent
 94			# drift of the interval over time. this ensures that on average,
 95			# the interval remains as set
 96			claim_time = math.floor(int(time.time()) / self.data["interval"]) * self.data["interval"]
 97
 98		updated = self.db.update("jobs", data={"timestamp_claimed": claim_time, "timestamp_lastclaimed": claim_time},
 99								 where={"jobtype": self.data["jobtype"], "remote_id": self.data["remote_id"],
100										"timestamp_claimed": 0})
101
102		if updated == 0:
103			raise JobClaimedException
104
105		self.data["timestamp_claimed"] = claim_time
106		self.data["timestamp_lastclaimed"] = claim_time
107
108		self.is_claimed = True
109
110	def finish(self, delete=False):
111		"""
112		Finish job
113
114		This deletes it from the database, or in the case of recurring jobs,
115		resets the claim flags.
116
117		:param bool delete: Whether to force deleting the job even if it is a
118							job with an interval.
119		"""
120		if self.data["interval"] == 0 or delete:
121			self.db.delete("jobs", where={"jobtype": self.data["jobtype"], "remote_id": self.data["remote_id"]})
122		else:
123			self.db.update("jobs", data={"timestamp_claimed": 0, "attempts": 0},
124						   where={"jobtype": self.data["jobtype"], "remote_id": self.data["remote_id"]})
125
126		self.is_finished = True
127
128	def release(self, delay=0, claim_after=0):
129		"""
130		Release a job so it may be claimed again
131
132		:param int delay: Delay in seconds after which job may be reclaimed.
133		:param int claim_after:  Timestamp after which job may be claimed. This
134		is overridden by `delay`.
135		"""
136		update = {"timestamp_claimed": 0, "attempts": self.data["attempts"] + 1}
137		if delay > 0:
138			update["timestamp_after"] = int(time.time()) + delay
139		elif claim_after is not None:
140			update["timestamp_after"] = claim_after
141
142		self.db.update("jobs", data=update,
143					   where={"jobtype": self.data["jobtype"], "remote_id": self.data["remote_id"]})
144		self.is_claimed = False
145
146	def is_claimable(self):
147		"""
148		Can this job be claimed?
149
150		:return bool: If the job is not claimed yet and also isn't finished.
151		"""
152		return not self.is_claimed and not self.is_finished
153
154	def get_place_in_queue(self):
155		"""
156		Get the place of this job in the queue
157
158		:return int: Place in queue
159		"""
160		query = "SELECT COUNT(*) as queue_ahead FROM jobs WHERE jobtype = %s"
161		replacements = [self.data["jobtype"]]
162		if self.data["timestamp_after"] == 0:
163			# Job can be claimed immediately
164			query += (
165				" AND (timestamp_after = 0 AND timestamp < %s OR "  # Other jobs that can be claimed immediately and were queued prior to this job being queued
166				" timestamp_after > 0 AND timestamp_after < %s) ")  # Other jobs that are waiting for a specific time, but prior to this job being queued
167			replacements += [self.data["timestamp"], self.data["timestamp"]]
168		else:
169			# Job must wait until timestamp_after
170			query += (
171				" AND (timestamp_after = 0 AND timestamp < %s OR "  # Other jobs that can be claimed immediately and were queued prior to this job
172				" timestamp_after > 0 AND timestamp_after < %s) ")  # Other jobs that are waiting, but prior to this job's start time
173			replacements += [self.data["timestamp_after"], self.data["timestamp_after"]]
174		queue_result = self.db.fetchone(query, replacements)
175		if queue_result["queue_ahead"] is None:
176			raise Exception(f"what? {queue_result}")
177
178		return queue_result["queue_ahead"]
179
180	@property
181	def details(self):
182		try:
183			details = json.loads(self.data["details"])
184			if details:
185				return details
186			else:
187				return {}
188		except (TypeError, json.JSONDecodeError):
189			return {}

Job in queue

Job(data, database=None)
21	def __init__(self, data, database=None):
22		"""
23		Instantiate Job object
24
25		:param dict data:  Job data, should correspond to a database record
26		:param database:  Database handler
27		"""
28		self.data = data
29		self.db = database
30
31		self.data["remote_id"] = str(self.data["remote_id"])
32
33		try:
34			self.is_finished = "is_finished" in self.data and self.data["is_finished"]
35			self.is_claimed = self.data["timestamp_claimed"] and self.data["timestamp_claimed"] > 0
36		except KeyError:
37			raise Exception

Instantiate Job object

Parameters
  • dict data: Job data, should correspond to a database record
  • database: Database handler
data = {}
db = None
is_finished = False
is_claimed = False
def get_by_ID(id, database):
39	def get_by_ID(id, database):
40		"""
41		Instantiate job object by ID
42
43		:param int id: Job ID
44		:param database:  Database handler
45		:return Job: Job object
46		"""
47		data = database.fetchone("SELECT * FROM jobs WHERE id = %s", (id,))
48		if not data:
49			raise JobNotFoundException
50
51		return Job.get_by_data(data, database)

Instantiate job object by ID

Parameters
  • int id: Job ID
  • database: Database handler
Returns

Job object

def get_by_data(data, database):
53	def get_by_data(data, database):
54		"""
55		Instantiate job object with given data
56
57		:param dict data:  Job data, should correspond to a database row
58		:param database: Database handler
59		:return Job: Job object
60		"""
61		return Job(data, database)

Instantiate job object with given data

Parameters
  • dict data: Job data, should correspond to a database row
  • database: Database handler
Returns

Job object

def get_by_remote_ID(remote_id, database, jobtype='*'):
63	def get_by_remote_ID(remote_id, database, jobtype="*"):
64		"""
65		Instantiate job object by combination of remote ID and job type
66
67		This combination is guaranteed to be unique.
68
69		:param database: Database handler
70		:param str jobtype: Job type
71		:param str remote_id: Job remote ID
72		:return Job: Job object
73		"""
74		if jobtype != "*":
75			data = database.fetchone("SELECT * FROM jobs WHERE jobtype = %s AND remote_id = %s", (jobtype, remote_id))
76		else:
77			data = database.fetchone("SELECT * FROM jobs WHERE remote_id = %s", (remote_id,))
78
79		if not data:
80			raise JobNotFoundException
81
82		return Job.get_by_data(data, database=database)

Instantiate job object by combination of remote ID and job type

This combination is guaranteed to be unique.

Parameters
  • database: Database handler
  • str jobtype: Job type
  • str remote_id: Job remote ID
Returns

Job object

def claim(self):
 84	def claim(self):
 85		"""
 86		Claim a job
 87
 88		This marks it in the database so it cannot be claimed again.
 89		"""
 90		if self.data["interval"] == 0:
 91			claim_time = int(time.time())
 92		else:
 93			# the claim time should be a multiple of the interval to prevent
 94			# drift of the interval over time. this ensures that on average,
 95			# the interval remains as set
 96			claim_time = math.floor(int(time.time()) / self.data["interval"]) * self.data["interval"]
 97
 98		updated = self.db.update("jobs", data={"timestamp_claimed": claim_time, "timestamp_lastclaimed": claim_time},
 99								 where={"jobtype": self.data["jobtype"], "remote_id": self.data["remote_id"],
100										"timestamp_claimed": 0})
101
102		if updated == 0:
103			raise JobClaimedException
104
105		self.data["timestamp_claimed"] = claim_time
106		self.data["timestamp_lastclaimed"] = claim_time
107
108		self.is_claimed = True

Claim a job

This marks it in the database so it cannot be claimed again.

def finish(self, delete=False):
110	def finish(self, delete=False):
111		"""
112		Finish job
113
114		This deletes it from the database, or in the case of recurring jobs,
115		resets the claim flags.
116
117		:param bool delete: Whether to force deleting the job even if it is a
118							job with an interval.
119		"""
120		if self.data["interval"] == 0 or delete:
121			self.db.delete("jobs", where={"jobtype": self.data["jobtype"], "remote_id": self.data["remote_id"]})
122		else:
123			self.db.update("jobs", data={"timestamp_claimed": 0, "attempts": 0},
124						   where={"jobtype": self.data["jobtype"], "remote_id": self.data["remote_id"]})
125
126		self.is_finished = True

Finish job

This deletes it from the database, or in the case of recurring jobs, resets the claim flags.

Parameters
  • bool delete: Whether to force deleting the job even if it is a job with an interval.
def release(self, delay=0, claim_after=0):
128	def release(self, delay=0, claim_after=0):
129		"""
130		Release a job so it may be claimed again
131
132		:param int delay: Delay in seconds after which job may be reclaimed.
133		:param int claim_after:  Timestamp after which job may be claimed. This
134		is overridden by `delay`.
135		"""
136		update = {"timestamp_claimed": 0, "attempts": self.data["attempts"] + 1}
137		if delay > 0:
138			update["timestamp_after"] = int(time.time()) + delay
139		elif claim_after is not None:
140			update["timestamp_after"] = claim_after
141
142		self.db.update("jobs", data=update,
143					   where={"jobtype": self.data["jobtype"], "remote_id": self.data["remote_id"]})
144		self.is_claimed = False

Release a job so it may be claimed again

Parameters
  • int delay: Delay in seconds after which job may be reclaimed.
  • int claim_after: Timestamp after which job may be claimed. This is overridden by delay.
def is_claimable(self):
146	def is_claimable(self):
147		"""
148		Can this job be claimed?
149
150		:return bool: If the job is not claimed yet and also isn't finished.
151		"""
152		return not self.is_claimed and not self.is_finished

Can this job be claimed?

Returns

If the job is not claimed yet and also isn't finished.

def get_place_in_queue(self):
154	def get_place_in_queue(self):
155		"""
156		Get the place of this job in the queue
157
158		:return int: Place in queue
159		"""
160		query = "SELECT COUNT(*) as queue_ahead FROM jobs WHERE jobtype = %s"
161		replacements = [self.data["jobtype"]]
162		if self.data["timestamp_after"] == 0:
163			# Job can be claimed immediately
164			query += (
165				" AND (timestamp_after = 0 AND timestamp < %s OR "  # Other jobs that can be claimed immediately and were queued prior to this job being queued
166				" timestamp_after > 0 AND timestamp_after < %s) ")  # Other jobs that are waiting for a specific time, but prior to this job being queued
167			replacements += [self.data["timestamp"], self.data["timestamp"]]
168		else:
169			# Job must wait until timestamp_after
170			query += (
171				" AND (timestamp_after = 0 AND timestamp < %s OR "  # Other jobs that can be claimed immediately and were queued prior to this job
172				" timestamp_after > 0 AND timestamp_after < %s) ")  # Other jobs that are waiting, but prior to this job's start time
173			replacements += [self.data["timestamp_after"], self.data["timestamp_after"]]
174		queue_result = self.db.fetchone(query, replacements)
175		if queue_result["queue_ahead"] is None:
176			raise Exception(f"what? {queue_result}")
177
178		return queue_result["queue_ahead"]

Get the place of this job in the queue

Returns

Place in queue

details
180	@property
181	def details(self):
182		try:
183			details = json.loads(self.data["details"])
184			if details:
185				return details
186			else:
187				return {}
188		except (TypeError, json.JSONDecodeError):
189			return {}