Edit on GitHub

common.lib.job

Class that represents a job in the job queue

  1"""
  2Class that represents a job in the job queue
  3"""
  4
  5import time
  6import json
  7import math
  8from common.lib.exceptions import JobClaimedException, JobNotFoundException
  9
 10
 11class Job:
 12	"""
 13	Job in queue
 14	"""
 15	data = {}
 16	db = None
 17
 18	is_finished = False
 19	is_claimed = False
 20
 21	def __init__(self, data, database=None):
 22		"""
 23		Instantiate Job object
 24
 25		:param dict data:  Job data, should correspond to a database record
 26		:param database:  Database handler
 27		"""
 28		self.data = data
 29		self.db = database
 30
 31		self.data["remote_id"] = str(self.data["remote_id"])
 32
 33		try:
 34			self.is_finished = "is_finished" in self.data and self.data["is_finished"]
 35			self.is_claimed = self.data["timestamp_claimed"] and self.data["timestamp_claimed"] > 0
 36		except KeyError:
 37			raise Exception
 38
 39	def get_by_ID(id, database):
 40		"""
 41		Instantiate job object by ID
 42
 43		:param int id: Job ID
 44		:param database:  Database handler
 45		:return Job: Job object
 46		"""
 47		data = database.fetchone("SELECT * FROM jobs WHERE id = %s", (id,))
 48		if not data:
 49			raise JobNotFoundException
 50
 51		return Job.get_by_data(data, database)
 52
 53	def get_by_data(data, database):
 54		"""
 55		Instantiate job object with given data
 56
 57		:param dict data:  Job data, should correspond to a database row
 58		:param database: Database handler
 59		:return Job: Job object
 60		"""
 61		return Job(data, database)
 62
 63	def get_by_remote_ID(remote_id, database, jobtype="*"):
 64		"""
 65		Instantiate job object by combination of remote ID and job type
 66
 67		This combination is guaranteed to be unique.
 68
 69		:param database: Database handler
 70		:param str jobtype: Job type
 71		:param str remote_id: Job remote ID
 72		:return Job: Job object
 73		"""
 74		if jobtype != "*":
 75			data = database.fetchone("SELECT * FROM jobs WHERE jobtype = %s AND remote_id = %s", (jobtype, remote_id))
 76		else:
 77			data = database.fetchone("SELECT * FROM jobs WHERE remote_id = %s", (remote_id,))
 78
 79		if not data:
 80			raise JobNotFoundException
 81
 82		return Job.get_by_data(data, database=database)
 83
 84	def claim(self):
 85		"""
 86		Claim a job
 87
 88		This marks it in the database so it cannot be claimed again.
 89		"""
 90		if self.data["interval"] == 0:
 91			claim_time = int(time.time())
 92		else:
 93			# the claim time should be a multiple of the interval to prevent
 94			# drift of the interval over time. this ensures that on average,
 95			# the interval remains as set
 96			claim_time = math.floor(int(time.time()) / self.data["interval"]) * self.data["interval"]
 97
 98		updated = self.db.update("jobs", data={"timestamp_claimed": claim_time, "timestamp_lastclaimed": claim_time},
 99								 where={"jobtype": self.data["jobtype"], "remote_id": self.data["remote_id"],
100										"timestamp_claimed": 0})
101
102		if updated == 0:
103			raise JobClaimedException
104
105		self.data["timestamp_claimed"] = claim_time
106		self.data["timestamp_lastclaimed"] = claim_time
107
108		self.is_claimed = True
109
110	def finish(self, delete=False):
111		"""
112		Finish job
113
114		This deletes it from the database, or in the case of recurring jobs,
115		resets the claim flags.
116
117		:param bool delete: Whether to force deleting the job even if it is a
118							job with an interval.
119		"""
120		if self.data["interval"] == 0 or delete:
121			self.db.delete("jobs", where={"jobtype": self.data["jobtype"], "remote_id": self.data["remote_id"]})
122		else:
123			self.db.update("jobs", data={"timestamp_claimed": 0, "attempts": 0},
124						   where={"jobtype": self.data["jobtype"], "remote_id": self.data["remote_id"]})
125
126		self.is_finished = True
127
128	def release(self, delay=0, claim_after=0):
129		"""
130		Release a job so it may be claimed again
131
132		:param int delay: Delay in seconds after which job may be reclaimed.
133		:param int claim_after:  Timestamp after which job may be claimed. This
134		is overridden by `delay`.
135		"""
136		update = {"timestamp_claimed": 0, "attempts": self.data["attempts"] + 1}
137		if delay > 0:
138			update["timestamp_after"] = int(time.time()) + delay
139		elif claim_after is not None:
140			update["timestamp_after"] = claim_after
141
142		self.db.update("jobs", data=update,
143					   where={"jobtype": self.data["jobtype"], "remote_id": self.data["remote_id"]})
144		self.is_claimed = False
145
146	def is_claimable(self):
147		"""
148		Can this job be claimed?
149
150		:return bool: If the job is not claimed yet and also isn't finished.
151		"""
152		return not self.is_claimed and not self.is_finished
153
154	def get_place_in_queue(self):
155		"""
156		Get the place of this job in the queue
157
158		:return int: Place in queue
159		"""
160		query = "SELECT COUNT(*) as queue_ahead FROM jobs WHERE jobtype = %s"
161		replacements = [self.data["jobtype"]]
162		if self.data["timestamp_after"] == 0:
163			# Job can be claimed immediately
164			query += (
165				" AND (timestamp_after = 0 AND timestamp < %s OR "  # Other jobs that can be claimed immediately and were queued prior to this job being queued
166				" timestamp_after > 0 AND timestamp_after < %s) ")  # Other jobs that are waiting for a specific time, but prior to this job being queued
167			replacements += [self.data["timestamp"], self.data["timestamp"]]
168		else:
169			# Job must wait until timestamp_after
170			query += (
171				" AND (timestamp_after = 0 AND timestamp < %s OR "  # Other jobs that can be claimed immediately and were queued prior to this job
172				" timestamp_after > 0 AND timestamp_after < %s) ")  # Other jobs that are waiting, but prior to this job's start time
173			replacements += [self.data["timestamp_after"], self.data["timestamp_after"]]
174		queue_result = self.db.fetchone(query, replacements)
175		if queue_result["queue_ahead"] is None:
176			raise Exception(f"what? {queue_result}")
177
178		return queue_result["queue_ahead"]
179
180	@property
181	def details(self):
182		try:
183			details = json.loads(self.data["details"])
184			if details:
185				return details
186			else:
187				return {}
188		except (TypeError, json.JSONDecodeError):
189			return {}
class Job:
 12class Job:
 13	"""
 14	Job in queue
 15	"""
 16	data = {}
 17	db = None
 18
 19	is_finished = False
 20	is_claimed = False
 21
 22	def __init__(self, data, database=None):
 23		"""
 24		Instantiate Job object
 25
 26		:param dict data:  Job data, should correspond to a database record
 27		:param database:  Database handler
 28		"""
 29		self.data = data
 30		self.db = database
 31
 32		self.data["remote_id"] = str(self.data["remote_id"])
 33
 34		try:
 35			self.is_finished = "is_finished" in self.data and self.data["is_finished"]
 36			self.is_claimed = self.data["timestamp_claimed"] and self.data["timestamp_claimed"] > 0
 37		except KeyError:
 38			raise Exception
 39
 40	def get_by_ID(id, database):
 41		"""
 42		Instantiate job object by ID
 43
 44		:param int id: Job ID
 45		:param database:  Database handler
 46		:return Job: Job object
 47		"""
 48		data = database.fetchone("SELECT * FROM jobs WHERE id = %s", (id,))
 49		if not data:
 50			raise JobNotFoundException
 51
 52		return Job.get_by_data(data, database)
 53
 54	def get_by_data(data, database):
 55		"""
 56		Instantiate job object with given data
 57
 58		:param dict data:  Job data, should correspond to a database row
 59		:param database: Database handler
 60		:return Job: Job object
 61		"""
 62		return Job(data, database)
 63
 64	def get_by_remote_ID(remote_id, database, jobtype="*"):
 65		"""
 66		Instantiate job object by combination of remote ID and job type
 67
 68		This combination is guaranteed to be unique.
 69
 70		:param database: Database handler
 71		:param str jobtype: Job type
 72		:param str remote_id: Job remote ID
 73		:return Job: Job object
 74		"""
 75		if jobtype != "*":
 76			data = database.fetchone("SELECT * FROM jobs WHERE jobtype = %s AND remote_id = %s", (jobtype, remote_id))
 77		else:
 78			data = database.fetchone("SELECT * FROM jobs WHERE remote_id = %s", (remote_id,))
 79
 80		if not data:
 81			raise JobNotFoundException
 82
 83		return Job.get_by_data(data, database=database)
 84
 85	def claim(self):
 86		"""
 87		Claim a job
 88
 89		This marks it in the database so it cannot be claimed again.
 90		"""
 91		if self.data["interval"] == 0:
 92			claim_time = int(time.time())
 93		else:
 94			# the claim time should be a multiple of the interval to prevent
 95			# drift of the interval over time. this ensures that on average,
 96			# the interval remains as set
 97			claim_time = math.floor(int(time.time()) / self.data["interval"]) * self.data["interval"]
 98
 99		updated = self.db.update("jobs", data={"timestamp_claimed": claim_time, "timestamp_lastclaimed": claim_time},
100								 where={"jobtype": self.data["jobtype"], "remote_id": self.data["remote_id"],
101										"timestamp_claimed": 0})
102
103		if updated == 0:
104			raise JobClaimedException
105
106		self.data["timestamp_claimed"] = claim_time
107		self.data["timestamp_lastclaimed"] = claim_time
108
109		self.is_claimed = True
110
111	def finish(self, delete=False):
112		"""
113		Finish job
114
115		This deletes it from the database, or in the case of recurring jobs,
116		resets the claim flags.
117
118		:param bool delete: Whether to force deleting the job even if it is a
119							job with an interval.
120		"""
121		if self.data["interval"] == 0 or delete:
122			self.db.delete("jobs", where={"jobtype": self.data["jobtype"], "remote_id": self.data["remote_id"]})
123		else:
124			self.db.update("jobs", data={"timestamp_claimed": 0, "attempts": 0},
125						   where={"jobtype": self.data["jobtype"], "remote_id": self.data["remote_id"]})
126
127		self.is_finished = True
128
129	def release(self, delay=0, claim_after=0):
130		"""
131		Release a job so it may be claimed again
132
133		:param int delay: Delay in seconds after which job may be reclaimed.
134		:param int claim_after:  Timestamp after which job may be claimed. This
135		is overridden by `delay`.
136		"""
137		update = {"timestamp_claimed": 0, "attempts": self.data["attempts"] + 1}
138		if delay > 0:
139			update["timestamp_after"] = int(time.time()) + delay
140		elif claim_after is not None:
141			update["timestamp_after"] = claim_after
142
143		self.db.update("jobs", data=update,
144					   where={"jobtype": self.data["jobtype"], "remote_id": self.data["remote_id"]})
145		self.is_claimed = False
146
147	def is_claimable(self):
148		"""
149		Can this job be claimed?
150
151		:return bool: If the job is not claimed yet and also isn't finished.
152		"""
153		return not self.is_claimed and not self.is_finished
154
155	def get_place_in_queue(self):
156		"""
157		Get the place of this job in the queue
158
159		:return int: Place in queue
160		"""
161		query = "SELECT COUNT(*) as queue_ahead FROM jobs WHERE jobtype = %s"
162		replacements = [self.data["jobtype"]]
163		if self.data["timestamp_after"] == 0:
164			# Job can be claimed immediately
165			query += (
166				" AND (timestamp_after = 0 AND timestamp < %s OR "  # Other jobs that can be claimed immediately and were queued prior to this job being queued
167				" timestamp_after > 0 AND timestamp_after < %s) ")  # Other jobs that are waiting for a specific time, but prior to this job being queued
168			replacements += [self.data["timestamp"], self.data["timestamp"]]
169		else:
170			# Job must wait until timestamp_after
171			query += (
172				" AND (timestamp_after = 0 AND timestamp < %s OR "  # Other jobs that can be claimed immediately and were queued prior to this job
173				" timestamp_after > 0 AND timestamp_after < %s) ")  # Other jobs that are waiting, but prior to this job's start time
174			replacements += [self.data["timestamp_after"], self.data["timestamp_after"]]
175		queue_result = self.db.fetchone(query, replacements)
176		if queue_result["queue_ahead"] is None:
177			raise Exception(f"what? {queue_result}")
178
179		return queue_result["queue_ahead"]
180
181	@property
182	def details(self):
183		try:
184			details = json.loads(self.data["details"])
185			if details:
186				return details
187			else:
188				return {}
189		except (TypeError, json.JSONDecodeError):
190			return {}

Job in queue

Job(data, database=None)
22	def __init__(self, data, database=None):
23		"""
24		Instantiate Job object
25
26		:param dict data:  Job data, should correspond to a database record
27		:param database:  Database handler
28		"""
29		self.data = data
30		self.db = database
31
32		self.data["remote_id"] = str(self.data["remote_id"])
33
34		try:
35			self.is_finished = "is_finished" in self.data and self.data["is_finished"]
36			self.is_claimed = self.data["timestamp_claimed"] and self.data["timestamp_claimed"] > 0
37		except KeyError:
38			raise Exception

Instantiate Job object

Parameters
  • dict data: Job data, should correspond to a database record
  • database: Database handler
data = {}
db = None
is_finished = False
is_claimed = False
def get_by_ID(id, database):
40	def get_by_ID(id, database):
41		"""
42		Instantiate job object by ID
43
44		:param int id: Job ID
45		:param database:  Database handler
46		:return Job: Job object
47		"""
48		data = database.fetchone("SELECT * FROM jobs WHERE id = %s", (id,))
49		if not data:
50			raise JobNotFoundException
51
52		return Job.get_by_data(data, database)

Instantiate job object by ID

Parameters
  • int id: Job ID
  • database: Database handler
Returns

Job object

def get_by_data(data, database):
54	def get_by_data(data, database):
55		"""
56		Instantiate job object with given data
57
58		:param dict data:  Job data, should correspond to a database row
59		:param database: Database handler
60		:return Job: Job object
61		"""
62		return Job(data, database)

Instantiate job object with given data

Parameters
  • dict data: Job data, should correspond to a database row
  • database: Database handler
Returns

Job object

def get_by_remote_ID(remote_id, database, jobtype='*'):
64	def get_by_remote_ID(remote_id, database, jobtype="*"):
65		"""
66		Instantiate job object by combination of remote ID and job type
67
68		This combination is guaranteed to be unique.
69
70		:param database: Database handler
71		:param str jobtype: Job type
72		:param str remote_id: Job remote ID
73		:return Job: Job object
74		"""
75		if jobtype != "*":
76			data = database.fetchone("SELECT * FROM jobs WHERE jobtype = %s AND remote_id = %s", (jobtype, remote_id))
77		else:
78			data = database.fetchone("SELECT * FROM jobs WHERE remote_id = %s", (remote_id,))
79
80		if not data:
81			raise JobNotFoundException
82
83		return Job.get_by_data(data, database=database)

Instantiate job object by combination of remote ID and job type

This combination is guaranteed to be unique.

Parameters
  • database: Database handler
  • str jobtype: Job type
  • str remote_id: Job remote ID
Returns

Job object

def claim(self):
 85	def claim(self):
 86		"""
 87		Claim a job
 88
 89		This marks it in the database so it cannot be claimed again.
 90		"""
 91		if self.data["interval"] == 0:
 92			claim_time = int(time.time())
 93		else:
 94			# the claim time should be a multiple of the interval to prevent
 95			# drift of the interval over time. this ensures that on average,
 96			# the interval remains as set
 97			claim_time = math.floor(int(time.time()) / self.data["interval"]) * self.data["interval"]
 98
 99		updated = self.db.update("jobs", data={"timestamp_claimed": claim_time, "timestamp_lastclaimed": claim_time},
100								 where={"jobtype": self.data["jobtype"], "remote_id": self.data["remote_id"],
101										"timestamp_claimed": 0})
102
103		if updated == 0:
104			raise JobClaimedException
105
106		self.data["timestamp_claimed"] = claim_time
107		self.data["timestamp_lastclaimed"] = claim_time
108
109		self.is_claimed = True

Claim a job

This marks it in the database so it cannot be claimed again.

def finish(self, delete=False):
111	def finish(self, delete=False):
112		"""
113		Finish job
114
115		This deletes it from the database, or in the case of recurring jobs,
116		resets the claim flags.
117
118		:param bool delete: Whether to force deleting the job even if it is a
119							job with an interval.
120		"""
121		if self.data["interval"] == 0 or delete:
122			self.db.delete("jobs", where={"jobtype": self.data["jobtype"], "remote_id": self.data["remote_id"]})
123		else:
124			self.db.update("jobs", data={"timestamp_claimed": 0, "attempts": 0},
125						   where={"jobtype": self.data["jobtype"], "remote_id": self.data["remote_id"]})
126
127		self.is_finished = True

Finish job

This deletes it from the database, or in the case of recurring jobs, resets the claim flags.

Parameters
  • bool delete: Whether to force deleting the job even if it is a job with an interval.
def release(self, delay=0, claim_after=0):
129	def release(self, delay=0, claim_after=0):
130		"""
131		Release a job so it may be claimed again
132
133		:param int delay: Delay in seconds after which job may be reclaimed.
134		:param int claim_after:  Timestamp after which job may be claimed. This
135		is overridden by `delay`.
136		"""
137		update = {"timestamp_claimed": 0, "attempts": self.data["attempts"] + 1}
138		if delay > 0:
139			update["timestamp_after"] = int(time.time()) + delay
140		elif claim_after is not None:
141			update["timestamp_after"] = claim_after
142
143		self.db.update("jobs", data=update,
144					   where={"jobtype": self.data["jobtype"], "remote_id": self.data["remote_id"]})
145		self.is_claimed = False

Release a job so it may be claimed again

Parameters
  • int delay: Delay in seconds after which job may be reclaimed.
  • int claim_after: Timestamp after which job may be claimed. This is overridden by delay.
def is_claimable(self):
147	def is_claimable(self):
148		"""
149		Can this job be claimed?
150
151		:return bool: If the job is not claimed yet and also isn't finished.
152		"""
153		return not self.is_claimed and not self.is_finished

Can this job be claimed?

Returns

If the job is not claimed yet and also isn't finished.

def get_place_in_queue(self):
155	def get_place_in_queue(self):
156		"""
157		Get the place of this job in the queue
158
159		:return int: Place in queue
160		"""
161		query = "SELECT COUNT(*) as queue_ahead FROM jobs WHERE jobtype = %s"
162		replacements = [self.data["jobtype"]]
163		if self.data["timestamp_after"] == 0:
164			# Job can be claimed immediately
165			query += (
166				" AND (timestamp_after = 0 AND timestamp < %s OR "  # Other jobs that can be claimed immediately and were queued prior to this job being queued
167				" timestamp_after > 0 AND timestamp_after < %s) ")  # Other jobs that are waiting for a specific time, but prior to this job being queued
168			replacements += [self.data["timestamp"], self.data["timestamp"]]
169		else:
170			# Job must wait until timestamp_after
171			query += (
172				" AND (timestamp_after = 0 AND timestamp < %s OR "  # Other jobs that can be claimed immediately and were queued prior to this job
173				" timestamp_after > 0 AND timestamp_after < %s) ")  # Other jobs that are waiting, but prior to this job's start time
174			replacements += [self.data["timestamp_after"], self.data["timestamp_after"]]
175		queue_result = self.db.fetchone(query, replacements)
176		if queue_result["queue_ahead"] is None:
177			raise Exception(f"what? {queue_result}")
178
179		return queue_result["queue_ahead"]

Get the place of this job in the queue

Returns

Place in queue

details
181	@property
182	def details(self):
183		try:
184			details = json.loads(self.data["details"])
185			if details:
186				return details
187			else:
188				return {}
189		except (TypeError, json.JSONDecodeError):
190			return {}