Edit on GitHub

backend.workers.api

  1import socket
  2import time
  3import json
  4
  5from backend.lib.worker import BasicWorker
  6from common.lib.job import Job
  7from common.lib.exceptions import JobNotFoundException
  8
  9
 10class InternalAPI(BasicWorker):
 11	"""
 12	Offer a local server that listens on a port for API calls and answers them
 13	"""
 14	type = "api"
 15	max_workers = 1
 16
 17	host = None
 18	port = None
 19
 20	@classmethod
 21	def ensure_job(cls, config=None):
 22		"""
 23		Ensure that the API worker is always running
 24
 25		This is used to ensure that the API worker is always running, and if it
 26		is not, it will be started by the WorkerManager.
 27
 28		:return:  Job parameters for the worker
 29		"""
 30		return {"remote_id": "localhost"}
 31
 32	def work(self):
 33		"""
 34		Listen for API requests
 35
 36		Opens a socket that continuously listens for requests, and passes a
 37		client object on to a handling method if a connection is established
 38
 39		:return:
 40		"""
 41		self.host = self.config.get('API_HOST')
 42		self.port = self.config.get('API_PORT')
 43
 44		if self.port == 0:
 45			# if configured not to listen, just loop until the backend shuts
 46			# down we can't return here immediately, since this is a worker,
 47			# and workers that end just get started again
 48			self.db.close()
 49			self.manager.log.info("Local API not available per configuration")
 50			while not self.interrupted:
 51				time.sleep(1)
 52			return
 53
 54		# set up the socket
 55		server = socket.socket()
 56		server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
 57		server.settimeout(2)  # should be plenty
 58
 59		has_time = True
 60		start_trying = int(time.time())
 61		while has_time:
 62			has_time = start_trying > time.time() - 300  # stop trying after 5 minutes
 63			try:
 64				server.bind((self.host, self.port))
 65				break
 66			except OSError as e:
 67				if has_time and not self.interrupted:
 68					self.manager.log.info("Could not open port %i yet (%s), retrying in 10 seconds" % (self.port, e))
 69					time.sleep(10.0)  # wait a few seconds before retrying
 70					continue
 71				self.manager.log.error("Port %s is already in use! Local API not available. Check if a residual 4CAT process may still be listening at the port." % self.port)
 72				return
 73			except ConnectionRefusedError:
 74				self.manager.log.error("OS refused listening at port %i! Local API not available." % self.port)
 75				return
 76
 77		server.listen()
 78		server.settimeout(2)
 79		self.manager.log.info("Local API listening for requests at %s:%s" % (self.host, self.port))
 80
 81		# continually listen for new connections
 82		while not self.interrupted:
 83			try:
 84				client, address = server.accept()
 85			except (socket.timeout, TimeoutError):
 86				if self.interrupted:
 87					break
 88				# no problemo, just listen again - this only times out so it won't hang the entire app when
 89				# trying to exit, as there's no other way to easily interrupt accept()
 90				continue
 91
 92			self.api_response(client, address)
 93			client.close()
 94
 95		self.manager.log.info("Shutting down local API server")
 96
 97	def api_response(self, client, address):
 98		"""
 99		Respond to API requests
100
101		Gets the request, parses it as JSON, and if it is indeed JSON and of
102		the proper format, a response is generated and returned as JSON via
103		the client object
104
105		:param client:  Client object representing the connection
106		:param tuple address:  (IP, port) of the request
107		:return: Response
108		"""
109		client.settimeout(2)  # should be plenty
110
111		try:
112			buffer = ""
113			while True:
114				# receive data in 1k chunks
115				try:
116					data = client.recv(1024).decode("ascii")
117				except UnicodeDecodeError:
118					raise InternalAPIException
119
120				buffer += data
121				if not data or data.strip() == "" or len(data) > 2048:
122					break
123
124				# start processing as soon as we have valid json
125				try:
126					json.loads(buffer)
127					break
128				except json.JSONDecodeError:
129					pass
130
131			if not buffer:
132				raise InternalAPIException
133		except (socket.timeout, TimeoutError, ConnectionError, InternalAPIException):
134			# this means that no valid request was sent
135			self.manager.log.info("No input on API call from %s:%s - closing" % address)
136			return False
137
138		self.manager.log.debug("Received API request from %s:%s" % address)
139
140		try:
141			payload = json.loads(buffer)
142			if "request" not in payload:
143				raise InternalAPIException
144
145			response = self.process_request(payload["request"], payload)
146			if not response:
147				raise InternalAPIException
148
149			response = json.dumps({"error": False, "response": response})
150		except (json.JSONDecodeError, InternalAPIException):
151			response = json.dumps({"error": "Invalid JSON"})
152
153		try:
154			response = client.sendall(response.encode("ascii"))
155		except (BrokenPipeError, ConnectionError, socket.timeout):
156			response = None
157
158		return response
159
160	def process_request(self, request, payload):
161		"""
162		Generate API response
163
164		Checks the type of request, and returns an appropriate response for
165		the type.
166
167		:param str request:  Request identifier
168		:param payload:  Other data sent with the request
169		:return:  API response
170		"""
171		if request == "cancel-job":
172			# cancel a running job
173			payload = payload.get("payload", {})
174			level = payload.get("level", BasicWorker.INTERRUPT_RETRY)
175			try:
176				job = Job.get_by_remote_ID(jobtype=payload.get("jobtype"), remote_id=payload.get("remote_id"), database=self.db)
177			except JobNotFoundException:
178				return {"error": "Job not found"}
179
180			self.manager.request_interrupt(job=job, interrupt_level=level)
181			return "OK"
182
183		elif request == "workers":
184			# return the number of workers, sorted by type
185			workers = {}
186			for jobtype in self.manager.worker_pool:
187				workers[jobtype] = len(self.manager.worker_pool[jobtype])
188
189			workers["total"] = sum([workers[workertype] for workertype in workers])
190
191			return workers
192
193		if request == "jobs":
194			# return queued jobs, sorted by type
195			jobs = self.db.fetchall("SELECT * FROM jobs")
196			if jobs is None:
197				return {"error": "Database unavailable"}
198
199			response = {}
200			for job in jobs:
201				if job["jobtype"] not in response:
202					response[job["jobtype"]] = 0
203				response[job["jobtype"]] += 1
204
205			response["total"] = sum([response[jobtype] for jobtype in response])
206
207			return response
208
209		if request == "datasets":
210			# datasets created per time period
211			week = 86400 * 7
212			now = int(time.time())
213
214			items = self.db.fetchall("SELECT * FROM datasets WHERE timestamp > %s ORDER BY timestamp ASC", (now - week,))
215
216			response = {
217				"1h": 0,
218				"1d": 0,
219				"1w": 0
220			}
221
222			for item in items:
223				response["1w"] += 1
224				if item["timestamp"] > now - 3600:
225					response["1h"] += 1
226				if item["timestamp"] > now - 86400:
227					response["1d"] += 1
228
229			return response
230
231		if request == "worker-status":
232			# technically more 'job status' than 'worker status', this returns
233			# all jobs plus, for those that are currently active, some worker
234			# info as well as related datasets. useful to monitor server
235			# activity and judge whether 4CAT can safely be interrupted
236			open_jobs = self.db.fetchall("SELECT jobtype, timestamp, timestamp_claimed, timestamp_lastclaimed, interval, remote_id FROM jobs ORDER BY jobtype ASC, timestamp ASC, remote_id ASC")
237			running = []
238			queue = {}
239
240			for job in open_jobs:
241				try:
242					worker = list(filter(lambda worker: worker.job.data["jobtype"] == job["jobtype"] and worker.job.data["remote_id"] == job["remote_id"], self.manager.worker_pool.get(job["jobtype"], [])))[0]
243				except IndexError:
244					worker = None
245
246				if not bool(worker):
247					if job["jobtype"] not in queue:
248						queue[job["jobtype"]] = 0
249
250					queue[job["jobtype"]] += 1
251				else:
252					if hasattr(worker, "dataset") and worker.dataset:
253						running_key = worker.dataset.key
254						running_user = worker.dataset.creator
255						running_parent = worker.dataset.top_parent().key
256					else:
257						running_key = None
258						running_user = None
259						running_parent = None
260
261					running.append({
262						"type": job["jobtype"],
263						"is_claimed": job["timestamp_claimed"] > 0,
264						"is_running": bool(worker),
265						"is_processor": hasattr(worker, "dataset"),
266						"is_recurring": (int(job["interval"]) > 0),
267						"is_maybe_crashed": job["timestamp_claimed"] > 0 and not worker,
268						"dataset_key": running_key,
269						"dataset_user": running_user,
270						"dataset_parent_key": running_parent,
271						"timestamp_queued": job["timestamp"],
272						"timestamp_claimed": job["timestamp_lastclaimed"]
273					})
274
275			return {
276				"running": running,
277				"queued": queue
278			}
279
280
281
282		# no appropriate response
283		return False
284
285
286class InternalAPIException(Exception):
287	# raised if API request could not be parsed
288	pass
class InternalAPI(backend.lib.worker.BasicWorker):
 11class InternalAPI(BasicWorker):
 12	"""
 13	Offer a local server that listens on a port for API calls and answers them
 14	"""
 15	type = "api"
 16	max_workers = 1
 17
 18	host = None
 19	port = None
 20
 21	@classmethod
 22	def ensure_job(cls, config=None):
 23		"""
 24		Ensure that the API worker is always running
 25
 26		This is used to ensure that the API worker is always running, and if it
 27		is not, it will be started by the WorkerManager.
 28
 29		:return:  Job parameters for the worker
 30		"""
 31		return {"remote_id": "localhost"}
 32
 33	def work(self):
 34		"""
 35		Listen for API requests
 36
 37		Opens a socket that continuously listens for requests, and passes a
 38		client object on to a handling method if a connection is established
 39
 40		:return:
 41		"""
 42		self.host = self.config.get('API_HOST')
 43		self.port = self.config.get('API_PORT')
 44
 45		if self.port == 0:
 46			# if configured not to listen, just loop until the backend shuts
 47			# down we can't return here immediately, since this is a worker,
 48			# and workers that end just get started again
 49			self.db.close()
 50			self.manager.log.info("Local API not available per configuration")
 51			while not self.interrupted:
 52				time.sleep(1)
 53			return
 54
 55		# set up the socket
 56		server = socket.socket()
 57		server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
 58		server.settimeout(2)  # should be plenty
 59
 60		has_time = True
 61		start_trying = int(time.time())
 62		while has_time:
 63			has_time = start_trying > time.time() - 300  # stop trying after 5 minutes
 64			try:
 65				server.bind((self.host, self.port))
 66				break
 67			except OSError as e:
 68				if has_time and not self.interrupted:
 69					self.manager.log.info("Could not open port %i yet (%s), retrying in 10 seconds" % (self.port, e))
 70					time.sleep(10.0)  # wait a few seconds before retrying
 71					continue
 72				self.manager.log.error("Port %s is already in use! Local API not available. Check if a residual 4CAT process may still be listening at the port." % self.port)
 73				return
 74			except ConnectionRefusedError:
 75				self.manager.log.error("OS refused listening at port %i! Local API not available." % self.port)
 76				return
 77
 78		server.listen()
 79		server.settimeout(2)
 80		self.manager.log.info("Local API listening for requests at %s:%s" % (self.host, self.port))
 81
 82		# continually listen for new connections
 83		while not self.interrupted:
 84			try:
 85				client, address = server.accept()
 86			except (socket.timeout, TimeoutError):
 87				if self.interrupted:
 88					break
 89				# no problemo, just listen again - this only times out so it won't hang the entire app when
 90				# trying to exit, as there's no other way to easily interrupt accept()
 91				continue
 92
 93			self.api_response(client, address)
 94			client.close()
 95
 96		self.manager.log.info("Shutting down local API server")
 97
 98	def api_response(self, client, address):
 99		"""
100		Respond to API requests
101
102		Gets the request, parses it as JSON, and if it is indeed JSON and of
103		the proper format, a response is generated and returned as JSON via
104		the client object
105
106		:param client:  Client object representing the connection
107		:param tuple address:  (IP, port) of the request
108		:return: Response
109		"""
110		client.settimeout(2)  # should be plenty
111
112		try:
113			buffer = ""
114			while True:
115				# receive data in 1k chunks
116				try:
117					data = client.recv(1024).decode("ascii")
118				except UnicodeDecodeError:
119					raise InternalAPIException
120
121				buffer += data
122				if not data or data.strip() == "" or len(data) > 2048:
123					break
124
125				# start processing as soon as we have valid json
126				try:
127					json.loads(buffer)
128					break
129				except json.JSONDecodeError:
130					pass
131
132			if not buffer:
133				raise InternalAPIException
134		except (socket.timeout, TimeoutError, ConnectionError, InternalAPIException):
135			# this means that no valid request was sent
136			self.manager.log.info("No input on API call from %s:%s - closing" % address)
137			return False
138
139		self.manager.log.debug("Received API request from %s:%s" % address)
140
141		try:
142			payload = json.loads(buffer)
143			if "request" not in payload:
144				raise InternalAPIException
145
146			response = self.process_request(payload["request"], payload)
147			if not response:
148				raise InternalAPIException
149
150			response = json.dumps({"error": False, "response": response})
151		except (json.JSONDecodeError, InternalAPIException):
152			response = json.dumps({"error": "Invalid JSON"})
153
154		try:
155			response = client.sendall(response.encode("ascii"))
156		except (BrokenPipeError, ConnectionError, socket.timeout):
157			response = None
158
159		return response
160
161	def process_request(self, request, payload):
162		"""
163		Generate API response
164
165		Checks the type of request, and returns an appropriate response for
166		the type.
167
168		:param str request:  Request identifier
169		:param payload:  Other data sent with the request
170		:return:  API response
171		"""
172		if request == "cancel-job":
173			# cancel a running job
174			payload = payload.get("payload", {})
175			level = payload.get("level", BasicWorker.INTERRUPT_RETRY)
176			try:
177				job = Job.get_by_remote_ID(jobtype=payload.get("jobtype"), remote_id=payload.get("remote_id"), database=self.db)
178			except JobNotFoundException:
179				return {"error": "Job not found"}
180
181			self.manager.request_interrupt(job=job, interrupt_level=level)
182			return "OK"
183
184		elif request == "workers":
185			# return the number of workers, sorted by type
186			workers = {}
187			for jobtype in self.manager.worker_pool:
188				workers[jobtype] = len(self.manager.worker_pool[jobtype])
189
190			workers["total"] = sum([workers[workertype] for workertype in workers])
191
192			return workers
193
194		if request == "jobs":
195			# return queued jobs, sorted by type
196			jobs = self.db.fetchall("SELECT * FROM jobs")
197			if jobs is None:
198				return {"error": "Database unavailable"}
199
200			response = {}
201			for job in jobs:
202				if job["jobtype"] not in response:
203					response[job["jobtype"]] = 0
204				response[job["jobtype"]] += 1
205
206			response["total"] = sum([response[jobtype] for jobtype in response])
207
208			return response
209
210		if request == "datasets":
211			# datasets created per time period
212			week = 86400 * 7
213			now = int(time.time())
214
215			items = self.db.fetchall("SELECT * FROM datasets WHERE timestamp > %s ORDER BY timestamp ASC", (now - week,))
216
217			response = {
218				"1h": 0,
219				"1d": 0,
220				"1w": 0
221			}
222
223			for item in items:
224				response["1w"] += 1
225				if item["timestamp"] > now - 3600:
226					response["1h"] += 1
227				if item["timestamp"] > now - 86400:
228					response["1d"] += 1
229
230			return response
231
232		if request == "worker-status":
233			# technically more 'job status' than 'worker status', this returns
234			# all jobs plus, for those that are currently active, some worker
235			# info as well as related datasets. useful to monitor server
236			# activity and judge whether 4CAT can safely be interrupted
237			open_jobs = self.db.fetchall("SELECT jobtype, timestamp, timestamp_claimed, timestamp_lastclaimed, interval, remote_id FROM jobs ORDER BY jobtype ASC, timestamp ASC, remote_id ASC")
238			running = []
239			queue = {}
240
241			for job in open_jobs:
242				try:
243					worker = list(filter(lambda worker: worker.job.data["jobtype"] == job["jobtype"] and worker.job.data["remote_id"] == job["remote_id"], self.manager.worker_pool.get(job["jobtype"], [])))[0]
244				except IndexError:
245					worker = None
246
247				if not bool(worker):
248					if job["jobtype"] not in queue:
249						queue[job["jobtype"]] = 0
250
251					queue[job["jobtype"]] += 1
252				else:
253					if hasattr(worker, "dataset") and worker.dataset:
254						running_key = worker.dataset.key
255						running_user = worker.dataset.creator
256						running_parent = worker.dataset.top_parent().key
257					else:
258						running_key = None
259						running_user = None
260						running_parent = None
261
262					running.append({
263						"type": job["jobtype"],
264						"is_claimed": job["timestamp_claimed"] > 0,
265						"is_running": bool(worker),
266						"is_processor": hasattr(worker, "dataset"),
267						"is_recurring": (int(job["interval"]) > 0),
268						"is_maybe_crashed": job["timestamp_claimed"] > 0 and not worker,
269						"dataset_key": running_key,
270						"dataset_user": running_user,
271						"dataset_parent_key": running_parent,
272						"timestamp_queued": job["timestamp"],
273						"timestamp_claimed": job["timestamp_lastclaimed"]
274					})
275
276			return {
277				"running": running,
278				"queued": queue
279			}
280
281
282
283		# no appropriate response
284		return False

Offer a local server that listens on a port for API calls and answers them

type = 'api'
max_workers = 1
host = None
port = None
@classmethod
def ensure_job(cls, config=None):
21	@classmethod
22	def ensure_job(cls, config=None):
23		"""
24		Ensure that the API worker is always running
25
26		This is used to ensure that the API worker is always running, and if it
27		is not, it will be started by the WorkerManager.
28
29		:return:  Job parameters for the worker
30		"""
31		return {"remote_id": "localhost"}

Ensure that the API worker is always running

This is used to ensure that the API worker is always running, and if it is not, it will be started by the WorkerManager.

Returns

Job parameters for the worker

def work(self):
33	def work(self):
34		"""
35		Listen for API requests
36
37		Opens a socket that continuously listens for requests, and passes a
38		client object on to a handling method if a connection is established
39
40		:return:
41		"""
42		self.host = self.config.get('API_HOST')
43		self.port = self.config.get('API_PORT')
44
45		if self.port == 0:
46			# if configured not to listen, just loop until the backend shuts
47			# down we can't return here immediately, since this is a worker,
48			# and workers that end just get started again
49			self.db.close()
50			self.manager.log.info("Local API not available per configuration")
51			while not self.interrupted:
52				time.sleep(1)
53			return
54
55		# set up the socket
56		server = socket.socket()
57		server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
58		server.settimeout(2)  # should be plenty
59
60		has_time = True
61		start_trying = int(time.time())
62		while has_time:
63			has_time = start_trying > time.time() - 300  # stop trying after 5 minutes
64			try:
65				server.bind((self.host, self.port))
66				break
67			except OSError as e:
68				if has_time and not self.interrupted:
69					self.manager.log.info("Could not open port %i yet (%s), retrying in 10 seconds" % (self.port, e))
70					time.sleep(10.0)  # wait a few seconds before retrying
71					continue
72				self.manager.log.error("Port %s is already in use! Local API not available. Check if a residual 4CAT process may still be listening at the port." % self.port)
73				return
74			except ConnectionRefusedError:
75				self.manager.log.error("OS refused listening at port %i! Local API not available." % self.port)
76				return
77
78		server.listen()
79		server.settimeout(2)
80		self.manager.log.info("Local API listening for requests at %s:%s" % (self.host, self.port))
81
82		# continually listen for new connections
83		while not self.interrupted:
84			try:
85				client, address = server.accept()
86			except (socket.timeout, TimeoutError):
87				if self.interrupted:
88					break
89				# no problemo, just listen again - this only times out so it won't hang the entire app when
90				# trying to exit, as there's no other way to easily interrupt accept()
91				continue
92
93			self.api_response(client, address)
94			client.close()
95
96		self.manager.log.info("Shutting down local API server")

Listen for API requests

Opens a socket that continuously listens for requests, and passes a client object on to a handling method if a connection is established

Returns
def api_response(self, client, address):
 98	def api_response(self, client, address):
 99		"""
100		Respond to API requests
101
102		Gets the request, parses it as JSON, and if it is indeed JSON and of
103		the proper format, a response is generated and returned as JSON via
104		the client object
105
106		:param client:  Client object representing the connection
107		:param tuple address:  (IP, port) of the request
108		:return: Response
109		"""
110		client.settimeout(2)  # should be plenty
111
112		try:
113			buffer = ""
114			while True:
115				# receive data in 1k chunks
116				try:
117					data = client.recv(1024).decode("ascii")
118				except UnicodeDecodeError:
119					raise InternalAPIException
120
121				buffer += data
122				if not data or data.strip() == "" or len(data) > 2048:
123					break
124
125				# start processing as soon as we have valid json
126				try:
127					json.loads(buffer)
128					break
129				except json.JSONDecodeError:
130					pass
131
132			if not buffer:
133				raise InternalAPIException
134		except (socket.timeout, TimeoutError, ConnectionError, InternalAPIException):
135			# this means that no valid request was sent
136			self.manager.log.info("No input on API call from %s:%s - closing" % address)
137			return False
138
139		self.manager.log.debug("Received API request from %s:%s" % address)
140
141		try:
142			payload = json.loads(buffer)
143			if "request" not in payload:
144				raise InternalAPIException
145
146			response = self.process_request(payload["request"], payload)
147			if not response:
148				raise InternalAPIException
149
150			response = json.dumps({"error": False, "response": response})
151		except (json.JSONDecodeError, InternalAPIException):
152			response = json.dumps({"error": "Invalid JSON"})
153
154		try:
155			response = client.sendall(response.encode("ascii"))
156		except (BrokenPipeError, ConnectionError, socket.timeout):
157			response = None
158
159		return response

Respond to API requests

Gets the request, parses it as JSON, and if it is indeed JSON and of the proper format, a response is generated and returned as JSON via the client object

Parameters
  • client: Client object representing the connection
  • tuple address: (IP, port) of the request
Returns

Response

def process_request(self, request, payload):
161	def process_request(self, request, payload):
162		"""
163		Generate API response
164
165		Checks the type of request, and returns an appropriate response for
166		the type.
167
168		:param str request:  Request identifier
169		:param payload:  Other data sent with the request
170		:return:  API response
171		"""
172		if request == "cancel-job":
173			# cancel a running job
174			payload = payload.get("payload", {})
175			level = payload.get("level", BasicWorker.INTERRUPT_RETRY)
176			try:
177				job = Job.get_by_remote_ID(jobtype=payload.get("jobtype"), remote_id=payload.get("remote_id"), database=self.db)
178			except JobNotFoundException:
179				return {"error": "Job not found"}
180
181			self.manager.request_interrupt(job=job, interrupt_level=level)
182			return "OK"
183
184		elif request == "workers":
185			# return the number of workers, sorted by type
186			workers = {}
187			for jobtype in self.manager.worker_pool:
188				workers[jobtype] = len(self.manager.worker_pool[jobtype])
189
190			workers["total"] = sum([workers[workertype] for workertype in workers])
191
192			return workers
193
194		if request == "jobs":
195			# return queued jobs, sorted by type
196			jobs = self.db.fetchall("SELECT * FROM jobs")
197			if jobs is None:
198				return {"error": "Database unavailable"}
199
200			response = {}
201			for job in jobs:
202				if job["jobtype"] not in response:
203					response[job["jobtype"]] = 0
204				response[job["jobtype"]] += 1
205
206			response["total"] = sum([response[jobtype] for jobtype in response])
207
208			return response
209
210		if request == "datasets":
211			# datasets created per time period
212			week = 86400 * 7
213			now = int(time.time())
214
215			items = self.db.fetchall("SELECT * FROM datasets WHERE timestamp > %s ORDER BY timestamp ASC", (now - week,))
216
217			response = {
218				"1h": 0,
219				"1d": 0,
220				"1w": 0
221			}
222
223			for item in items:
224				response["1w"] += 1
225				if item["timestamp"] > now - 3600:
226					response["1h"] += 1
227				if item["timestamp"] > now - 86400:
228					response["1d"] += 1
229
230			return response
231
232		if request == "worker-status":
233			# technically more 'job status' than 'worker status', this returns
234			# all jobs plus, for those that are currently active, some worker
235			# info as well as related datasets. useful to monitor server
236			# activity and judge whether 4CAT can safely be interrupted
237			open_jobs = self.db.fetchall("SELECT jobtype, timestamp, timestamp_claimed, timestamp_lastclaimed, interval, remote_id FROM jobs ORDER BY jobtype ASC, timestamp ASC, remote_id ASC")
238			running = []
239			queue = {}
240
241			for job in open_jobs:
242				try:
243					worker = list(filter(lambda worker: worker.job.data["jobtype"] == job["jobtype"] and worker.job.data["remote_id"] == job["remote_id"], self.manager.worker_pool.get(job["jobtype"], [])))[0]
244				except IndexError:
245					worker = None
246
247				if not bool(worker):
248					if job["jobtype"] not in queue:
249						queue[job["jobtype"]] = 0
250
251					queue[job["jobtype"]] += 1
252				else:
253					if hasattr(worker, "dataset") and worker.dataset:
254						running_key = worker.dataset.key
255						running_user = worker.dataset.creator
256						running_parent = worker.dataset.top_parent().key
257					else:
258						running_key = None
259						running_user = None
260						running_parent = None
261
262					running.append({
263						"type": job["jobtype"],
264						"is_claimed": job["timestamp_claimed"] > 0,
265						"is_running": bool(worker),
266						"is_processor": hasattr(worker, "dataset"),
267						"is_recurring": (int(job["interval"]) > 0),
268						"is_maybe_crashed": job["timestamp_claimed"] > 0 and not worker,
269						"dataset_key": running_key,
270						"dataset_user": running_user,
271						"dataset_parent_key": running_parent,
272						"timestamp_queued": job["timestamp"],
273						"timestamp_claimed": job["timestamp_lastclaimed"]
274					})
275
276			return {
277				"running": running,
278				"queued": queue
279			}
280
281
282
283		# no appropriate response
284		return False

Generate API response

Checks the type of request, and returns an appropriate response for the type.

Parameters
  • str request: Request identifier
  • payload: Other data sent with the request
Returns

API response

class InternalAPIException(builtins.Exception):
287class InternalAPIException(Exception):
288	# raised if API request could not be parsed
289	pass

Common base class for all non-exit exceptions.