Edit on GitHub

backend.workers.api

  1import socket
  2import time
  3import json
  4
  5from backend.lib.worker import BasicWorker
  6
  7
  8class InternalAPI(BasicWorker):
  9	"""
 10	Offer a local server that listens on a port for API calls and answers them
 11	"""
 12	type = "api"
 13	max_workers = 1
 14
 15	ensure_job = {"remote_id": "localhost"}
 16
 17	host = None
 18	port = None
 19
 20	def work(self):
 21		"""
 22		Listen for API requests
 23
 24		Opens a socket that continuously listens for requests, and passes a
 25		client object on to a handling method if a connection is established
 26
 27		:return:
 28		"""
 29		self.host = self.config.get('API_HOST')
 30		self.port = self.config.get('API_PORT')
 31
 32		if self.port == 0:
 33			# if configured not to listen, just loop until the backend shuts
 34			# down we can't return here immediately, since this is a worker,
 35			# and workers that end just get started again
 36			self.db.close()
 37			self.manager.log.info("Local API not available per configuration")
 38			while not self.interrupted:
 39				time.sleep(1)
 40			return
 41
 42		# set up the socket
 43		server = socket.socket()
 44		server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
 45		server.settimeout(2)  # should be plenty
 46
 47		has_time = True
 48		start_trying = int(time.time())
 49		while has_time:
 50			has_time = start_trying > time.time() - 300  # stop trying after 5 minutes
 51			try:
 52				server.bind((self.host, self.port))
 53				break
 54			except OSError as e:
 55				if has_time and not self.interrupted:
 56					self.manager.log.info("Could not open port %i yet (%s), retrying in 10 seconds" % (self.port, e))
 57					time.sleep(10.0)  # wait a few seconds before retrying
 58					continue
 59				self.manager.log.error("Port %s is already in use! Local API not available. Check if a residual 4CAT process may still be listening at the port." % self.port)
 60				return
 61			except ConnectionRefusedError:
 62				self.manager.log.error("OS refused listening at port %i! Local API not available." % self.port)
 63				return
 64
 65		server.listen()
 66		server.settimeout(2)
 67		self.manager.log.info("Local API listening for requests at %s:%s" % (self.host, self.port))
 68
 69		# continually listen for new connections
 70		while not self.interrupted:
 71			try:
 72				client, address = server.accept()
 73			except (socket.timeout, TimeoutError):
 74				if self.interrupted:
 75					break
 76				# no problemo, just listen again - this only times out so it won't hang the entire app when
 77				# trying to exit, as there's no other way to easily interrupt accept()
 78				continue
 79
 80			self.api_response(client, address)
 81			client.close()
 82
 83		self.manager.log.info("Shutting down local API server")
 84
 85	def api_response(self, client, address):
 86		"""
 87		Respond to API requests
 88
 89		Gets the request, parses it as JSON, and if it is indeed JSON and of
 90		the proper format, a response is generated and returned as JSON via
 91		the client object
 92
 93		:param client:  Client object representing the connection
 94		:param tuple address:  (IP, port) of the request
 95		:return: Response
 96		"""
 97		client.settimeout(2)  # should be plenty
 98
 99		try:
100			buffer = ""
101			while True:
102				# receive data in 1k chunks
103				try:
104					data = client.recv(1024).decode("ascii")
105				except UnicodeDecodeError:
106					raise InternalAPIException
107
108				buffer += data
109				if not data or data.strip() == "" or len(data) > 2048:
110					break
111
112				# start processing as soon as we have valid json
113				try:
114					json.loads(buffer)
115					break
116				except json.JSONDecodeError:
117					pass
118
119			if not buffer:
120				raise InternalAPIException
121		except (socket.timeout, TimeoutError, ConnectionError, InternalAPIException):
122			# this means that no valid request was sent
123			self.manager.log.info("No input on API call from %s:%s - closing" % address)
124			return False
125
126		self.manager.log.debug("Received API request from %s:%s" % address)
127
128		try:
129			payload = json.loads(buffer)
130			if "request" not in payload:
131				raise InternalAPIException
132
133			response = self.process_request(payload["request"], payload)
134			if not response:
135				raise InternalAPIException
136
137			response = json.dumps({"error": False, "response": response})
138		except (json.JSONDecodeError, InternalAPIException):
139			response = json.dumps({"error": "Invalid JSON"})
140
141		try:
142			response = client.sendall(response.encode("ascii"))
143		except (BrokenPipeError, ConnectionError, socket.timeout):
144			response = None
145
146		return response
147
148	def process_request(self, request, payload):
149		"""
150		Generate API response
151
152		Checks the type of request, and returns an appropriate response for
153		the type.
154
155		:param str request:  Request identifier
156		:param payload:  Other data sent with the request
157		:return:  API response
158		"""
159		if request == "cancel-job":
160			# cancel a running job
161			payload = payload.get("payload", {})
162			remote_id = payload.get("remote_id")
163			jobtype = payload.get("jobtype")
164			level = payload.get("level", BasicWorker.INTERRUPT_RETRY)
165
166			self.manager.request_interrupt(remote_id=remote_id, jobtype=jobtype, interrupt_level=level)
167			return "OK"
168
169		elif request == "workers":
170			# return the number of workers, sorted by type
171			workers = {}
172			for jobtype in self.manager.worker_pool:
173				workers[jobtype] = len(self.manager.worker_pool[jobtype])
174
175			workers["total"] = sum([workers[workertype] for workertype in workers])
176
177			return workers
178
179		if request == "jobs":
180			# return queued jobs, sorted by type
181			jobs = self.db.fetchall("SELECT * FROM jobs")
182			if jobs is None:
183				return {"error": "Database unavailable"}
184
185			response = {}
186			for job in jobs:
187				if job["jobtype"] not in response:
188					response[job["jobtype"]] = 0
189				response[job["jobtype"]] += 1
190
191			response["total"] = sum([response[jobtype] for jobtype in response])
192
193			return response
194
195		if request == "datasets":
196			# datasets created per time period
197			week = 86400 * 7
198			now = int(time.time())
199
200			items = self.db.fetchall("SELECT * FROM datasets WHERE timestamp > %s ORDER BY timestamp ASC", (now - week,))
201
202			response = {
203				"1h": 0,
204				"1d": 0,
205				"1w": 0
206			}
207
208			for item in items:
209				response["1w"] += 1
210				if item["timestamp"] > now - 3600:
211					response["1h"] += 1
212				if item["timestamp"] > now - 86400:
213					response["1d"] += 1
214
215			return response
216
217		if request == "worker-status":
218			# technically more 'job status' than 'worker status', this returns
219			# all jobs plus, for those that are currently active, some worker
220			# info as well as related datasets. useful to monitor server
221			# activity and judge whether 4CAT can safely be interrupted
222			open_jobs = self.db.fetchall("SELECT jobtype, timestamp, timestamp_claimed, timestamp_lastclaimed, interval, remote_id FROM jobs ORDER BY jobtype ASC, timestamp ASC, remote_id ASC")
223			running = []
224			queue = {}
225
226			for job in open_jobs:
227				try:
228					worker = list(filter(lambda worker: worker.job.data["jobtype"] == job["jobtype"] and worker.job.data["remote_id"] == job["remote_id"], self.manager.worker_pool.get(job["jobtype"], [])))[0]
229				except IndexError:
230					worker = None
231
232				if not bool(worker):
233					if job["jobtype"] not in queue:
234						queue[job["jobtype"]] = 0
235
236					queue[job["jobtype"]] += 1
237				else:
238					if hasattr(worker, "dataset") and worker.dataset:
239						running_key = worker.dataset.key
240						running_user = worker.dataset.creator
241						running_parent = worker.dataset.top_parent().key
242					else:
243						running_key = None
244						running_user = None
245						running_parent = None
246
247					running.append({
248						"type": job["jobtype"],
249						"is_claimed": job["timestamp_claimed"] > 0,
250						"is_running": bool(worker),
251						"is_processor": hasattr(worker, "dataset"),
252						"is_recurring": (int(job["interval"]) > 0),
253						"is_maybe_crashed": job["timestamp_claimed"] > 0 and not worker,
254						"dataset_key": running_key,
255						"dataset_user": running_user,
256						"dataset_parent_key": running_parent,
257						"timestamp_queued": job["timestamp"],
258						"timestamp_claimed": job["timestamp_lastclaimed"]
259					})
260
261			return {
262				"running": running,
263				"queued": queue
264			}
265
266
267
268		# no appropriate response
269		return False
270
271
272class InternalAPIException(Exception):
273	# raised if API request could not be parsed
274	pass
class InternalAPI(backend.lib.worker.BasicWorker):
  9class InternalAPI(BasicWorker):
 10	"""
 11	Offer a local server that listens on a port for API calls and answers them
 12	"""
 13	type = "api"
 14	max_workers = 1
 15
 16	ensure_job = {"remote_id": "localhost"}
 17
 18	host = None
 19	port = None
 20
 21	def work(self):
 22		"""
 23		Listen for API requests
 24
 25		Opens a socket that continuously listens for requests, and passes a
 26		client object on to a handling method if a connection is established
 27
 28		:return:
 29		"""
 30		self.host = self.config.get('API_HOST')
 31		self.port = self.config.get('API_PORT')
 32
 33		if self.port == 0:
 34			# if configured not to listen, just loop until the backend shuts
 35			# down we can't return here immediately, since this is a worker,
 36			# and workers that end just get started again
 37			self.db.close()
 38			self.manager.log.info("Local API not available per configuration")
 39			while not self.interrupted:
 40				time.sleep(1)
 41			return
 42
 43		# set up the socket
 44		server = socket.socket()
 45		server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
 46		server.settimeout(2)  # should be plenty
 47
 48		has_time = True
 49		start_trying = int(time.time())
 50		while has_time:
 51			has_time = start_trying > time.time() - 300  # stop trying after 5 minutes
 52			try:
 53				server.bind((self.host, self.port))
 54				break
 55			except OSError as e:
 56				if has_time and not self.interrupted:
 57					self.manager.log.info("Could not open port %i yet (%s), retrying in 10 seconds" % (self.port, e))
 58					time.sleep(10.0)  # wait a few seconds before retrying
 59					continue
 60				self.manager.log.error("Port %s is already in use! Local API not available. Check if a residual 4CAT process may still be listening at the port." % self.port)
 61				return
 62			except ConnectionRefusedError:
 63				self.manager.log.error("OS refused listening at port %i! Local API not available." % self.port)
 64				return
 65
 66		server.listen()
 67		server.settimeout(2)
 68		self.manager.log.info("Local API listening for requests at %s:%s" % (self.host, self.port))
 69
 70		# continually listen for new connections
 71		while not self.interrupted:
 72			try:
 73				client, address = server.accept()
 74			except (socket.timeout, TimeoutError):
 75				if self.interrupted:
 76					break
 77				# no problemo, just listen again - this only times out so it won't hang the entire app when
 78				# trying to exit, as there's no other way to easily interrupt accept()
 79				continue
 80
 81			self.api_response(client, address)
 82			client.close()
 83
 84		self.manager.log.info("Shutting down local API server")
 85
 86	def api_response(self, client, address):
 87		"""
 88		Respond to API requests
 89
 90		Gets the request, parses it as JSON, and if it is indeed JSON and of
 91		the proper format, a response is generated and returned as JSON via
 92		the client object
 93
 94		:param client:  Client object representing the connection
 95		:param tuple address:  (IP, port) of the request
 96		:return: Response
 97		"""
 98		client.settimeout(2)  # should be plenty
 99
100		try:
101			buffer = ""
102			while True:
103				# receive data in 1k chunks
104				try:
105					data = client.recv(1024).decode("ascii")
106				except UnicodeDecodeError:
107					raise InternalAPIException
108
109				buffer += data
110				if not data or data.strip() == "" or len(data) > 2048:
111					break
112
113				# start processing as soon as we have valid json
114				try:
115					json.loads(buffer)
116					break
117				except json.JSONDecodeError:
118					pass
119
120			if not buffer:
121				raise InternalAPIException
122		except (socket.timeout, TimeoutError, ConnectionError, InternalAPIException):
123			# this means that no valid request was sent
124			self.manager.log.info("No input on API call from %s:%s - closing" % address)
125			return False
126
127		self.manager.log.debug("Received API request from %s:%s" % address)
128
129		try:
130			payload = json.loads(buffer)
131			if "request" not in payload:
132				raise InternalAPIException
133
134			response = self.process_request(payload["request"], payload)
135			if not response:
136				raise InternalAPIException
137
138			response = json.dumps({"error": False, "response": response})
139		except (json.JSONDecodeError, InternalAPIException):
140			response = json.dumps({"error": "Invalid JSON"})
141
142		try:
143			response = client.sendall(response.encode("ascii"))
144		except (BrokenPipeError, ConnectionError, socket.timeout):
145			response = None
146
147		return response
148
149	def process_request(self, request, payload):
150		"""
151		Generate API response
152
153		Checks the type of request, and returns an appropriate response for
154		the type.
155
156		:param str request:  Request identifier
157		:param payload:  Other data sent with the request
158		:return:  API response
159		"""
160		if request == "cancel-job":
161			# cancel a running job
162			payload = payload.get("payload", {})
163			remote_id = payload.get("remote_id")
164			jobtype = payload.get("jobtype")
165			level = payload.get("level", BasicWorker.INTERRUPT_RETRY)
166
167			self.manager.request_interrupt(remote_id=remote_id, jobtype=jobtype, interrupt_level=level)
168			return "OK"
169
170		elif request == "workers":
171			# return the number of workers, sorted by type
172			workers = {}
173			for jobtype in self.manager.worker_pool:
174				workers[jobtype] = len(self.manager.worker_pool[jobtype])
175
176			workers["total"] = sum([workers[workertype] for workertype in workers])
177
178			return workers
179
180		if request == "jobs":
181			# return queued jobs, sorted by type
182			jobs = self.db.fetchall("SELECT * FROM jobs")
183			if jobs is None:
184				return {"error": "Database unavailable"}
185
186			response = {}
187			for job in jobs:
188				if job["jobtype"] not in response:
189					response[job["jobtype"]] = 0
190				response[job["jobtype"]] += 1
191
192			response["total"] = sum([response[jobtype] for jobtype in response])
193
194			return response
195
196		if request == "datasets":
197			# datasets created per time period
198			week = 86400 * 7
199			now = int(time.time())
200
201			items = self.db.fetchall("SELECT * FROM datasets WHERE timestamp > %s ORDER BY timestamp ASC", (now - week,))
202
203			response = {
204				"1h": 0,
205				"1d": 0,
206				"1w": 0
207			}
208
209			for item in items:
210				response["1w"] += 1
211				if item["timestamp"] > now - 3600:
212					response["1h"] += 1
213				if item["timestamp"] > now - 86400:
214					response["1d"] += 1
215
216			return response
217
218		if request == "worker-status":
219			# technically more 'job status' than 'worker status', this returns
220			# all jobs plus, for those that are currently active, some worker
221			# info as well as related datasets. useful to monitor server
222			# activity and judge whether 4CAT can safely be interrupted
223			open_jobs = self.db.fetchall("SELECT jobtype, timestamp, timestamp_claimed, timestamp_lastclaimed, interval, remote_id FROM jobs ORDER BY jobtype ASC, timestamp ASC, remote_id ASC")
224			running = []
225			queue = {}
226
227			for job in open_jobs:
228				try:
229					worker = list(filter(lambda worker: worker.job.data["jobtype"] == job["jobtype"] and worker.job.data["remote_id"] == job["remote_id"], self.manager.worker_pool.get(job["jobtype"], [])))[0]
230				except IndexError:
231					worker = None
232
233				if not bool(worker):
234					if job["jobtype"] not in queue:
235						queue[job["jobtype"]] = 0
236
237					queue[job["jobtype"]] += 1
238				else:
239					if hasattr(worker, "dataset") and worker.dataset:
240						running_key = worker.dataset.key
241						running_user = worker.dataset.creator
242						running_parent = worker.dataset.top_parent().key
243					else:
244						running_key = None
245						running_user = None
246						running_parent = None
247
248					running.append({
249						"type": job["jobtype"],
250						"is_claimed": job["timestamp_claimed"] > 0,
251						"is_running": bool(worker),
252						"is_processor": hasattr(worker, "dataset"),
253						"is_recurring": (int(job["interval"]) > 0),
254						"is_maybe_crashed": job["timestamp_claimed"] > 0 and not worker,
255						"dataset_key": running_key,
256						"dataset_user": running_user,
257						"dataset_parent_key": running_parent,
258						"timestamp_queued": job["timestamp"],
259						"timestamp_claimed": job["timestamp_lastclaimed"]
260					})
261
262			return {
263				"running": running,
264				"queued": queue
265			}
266
267
268
269		# no appropriate response
270		return False

Offer a local server that listens on a port for API calls and answers them

type = 'api'
max_workers = 1
ensure_job = {'remote_id': 'localhost'}
host = None
port = None
def work(self):
21	def work(self):
22		"""
23		Listen for API requests
24
25		Opens a socket that continuously listens for requests, and passes a
26		client object on to a handling method if a connection is established
27
28		:return:
29		"""
30		self.host = self.config.get('API_HOST')
31		self.port = self.config.get('API_PORT')
32
33		if self.port == 0:
34			# if configured not to listen, just loop until the backend shuts
35			# down we can't return here immediately, since this is a worker,
36			# and workers that end just get started again
37			self.db.close()
38			self.manager.log.info("Local API not available per configuration")
39			while not self.interrupted:
40				time.sleep(1)
41			return
42
43		# set up the socket
44		server = socket.socket()
45		server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
46		server.settimeout(2)  # should be plenty
47
48		has_time = True
49		start_trying = int(time.time())
50		while has_time:
51			has_time = start_trying > time.time() - 300  # stop trying after 5 minutes
52			try:
53				server.bind((self.host, self.port))
54				break
55			except OSError as e:
56				if has_time and not self.interrupted:
57					self.manager.log.info("Could not open port %i yet (%s), retrying in 10 seconds" % (self.port, e))
58					time.sleep(10.0)  # wait a few seconds before retrying
59					continue
60				self.manager.log.error("Port %s is already in use! Local API not available. Check if a residual 4CAT process may still be listening at the port." % self.port)
61				return
62			except ConnectionRefusedError:
63				self.manager.log.error("OS refused listening at port %i! Local API not available." % self.port)
64				return
65
66		server.listen()
67		server.settimeout(2)
68		self.manager.log.info("Local API listening for requests at %s:%s" % (self.host, self.port))
69
70		# continually listen for new connections
71		while not self.interrupted:
72			try:
73				client, address = server.accept()
74			except (socket.timeout, TimeoutError):
75				if self.interrupted:
76					break
77				# no problemo, just listen again - this only times out so it won't hang the entire app when
78				# trying to exit, as there's no other way to easily interrupt accept()
79				continue
80
81			self.api_response(client, address)
82			client.close()
83
84		self.manager.log.info("Shutting down local API server")

Listen for API requests

Opens a socket that continuously listens for requests, and passes a client object on to a handling method if a connection is established

Returns
def api_response(self, client, address):
 86	def api_response(self, client, address):
 87		"""
 88		Respond to API requests
 89
 90		Gets the request, parses it as JSON, and if it is indeed JSON and of
 91		the proper format, a response is generated and returned as JSON via
 92		the client object
 93
 94		:param client:  Client object representing the connection
 95		:param tuple address:  (IP, port) of the request
 96		:return: Response
 97		"""
 98		client.settimeout(2)  # should be plenty
 99
100		try:
101			buffer = ""
102			while True:
103				# receive data in 1k chunks
104				try:
105					data = client.recv(1024).decode("ascii")
106				except UnicodeDecodeError:
107					raise InternalAPIException
108
109				buffer += data
110				if not data or data.strip() == "" or len(data) > 2048:
111					break
112
113				# start processing as soon as we have valid json
114				try:
115					json.loads(buffer)
116					break
117				except json.JSONDecodeError:
118					pass
119
120			if not buffer:
121				raise InternalAPIException
122		except (socket.timeout, TimeoutError, ConnectionError, InternalAPIException):
123			# this means that no valid request was sent
124			self.manager.log.info("No input on API call from %s:%s - closing" % address)
125			return False
126
127		self.manager.log.debug("Received API request from %s:%s" % address)
128
129		try:
130			payload = json.loads(buffer)
131			if "request" not in payload:
132				raise InternalAPIException
133
134			response = self.process_request(payload["request"], payload)
135			if not response:
136				raise InternalAPIException
137
138			response = json.dumps({"error": False, "response": response})
139		except (json.JSONDecodeError, InternalAPIException):
140			response = json.dumps({"error": "Invalid JSON"})
141
142		try:
143			response = client.sendall(response.encode("ascii"))
144		except (BrokenPipeError, ConnectionError, socket.timeout):
145			response = None
146
147		return response

Respond to API requests

Gets the request, parses it as JSON, and if it is indeed JSON and of the proper format, a response is generated and returned as JSON via the client object

Parameters
  • client: Client object representing the connection
  • tuple address: (IP, port) of the request
Returns

Response

def process_request(self, request, payload):
149	def process_request(self, request, payload):
150		"""
151		Generate API response
152
153		Checks the type of request, and returns an appropriate response for
154		the type.
155
156		:param str request:  Request identifier
157		:param payload:  Other data sent with the request
158		:return:  API response
159		"""
160		if request == "cancel-job":
161			# cancel a running job
162			payload = payload.get("payload", {})
163			remote_id = payload.get("remote_id")
164			jobtype = payload.get("jobtype")
165			level = payload.get("level", BasicWorker.INTERRUPT_RETRY)
166
167			self.manager.request_interrupt(remote_id=remote_id, jobtype=jobtype, interrupt_level=level)
168			return "OK"
169
170		elif request == "workers":
171			# return the number of workers, sorted by type
172			workers = {}
173			for jobtype in self.manager.worker_pool:
174				workers[jobtype] = len(self.manager.worker_pool[jobtype])
175
176			workers["total"] = sum([workers[workertype] for workertype in workers])
177
178			return workers
179
180		if request == "jobs":
181			# return queued jobs, sorted by type
182			jobs = self.db.fetchall("SELECT * FROM jobs")
183			if jobs is None:
184				return {"error": "Database unavailable"}
185
186			response = {}
187			for job in jobs:
188				if job["jobtype"] not in response:
189					response[job["jobtype"]] = 0
190				response[job["jobtype"]] += 1
191
192			response["total"] = sum([response[jobtype] for jobtype in response])
193
194			return response
195
196		if request == "datasets":
197			# datasets created per time period
198			week = 86400 * 7
199			now = int(time.time())
200
201			items = self.db.fetchall("SELECT * FROM datasets WHERE timestamp > %s ORDER BY timestamp ASC", (now - week,))
202
203			response = {
204				"1h": 0,
205				"1d": 0,
206				"1w": 0
207			}
208
209			for item in items:
210				response["1w"] += 1
211				if item["timestamp"] > now - 3600:
212					response["1h"] += 1
213				if item["timestamp"] > now - 86400:
214					response["1d"] += 1
215
216			return response
217
218		if request == "worker-status":
219			# technically more 'job status' than 'worker status', this returns
220			# all jobs plus, for those that are currently active, some worker
221			# info as well as related datasets. useful to monitor server
222			# activity and judge whether 4CAT can safely be interrupted
223			open_jobs = self.db.fetchall("SELECT jobtype, timestamp, timestamp_claimed, timestamp_lastclaimed, interval, remote_id FROM jobs ORDER BY jobtype ASC, timestamp ASC, remote_id ASC")
224			running = []
225			queue = {}
226
227			for job in open_jobs:
228				try:
229					worker = list(filter(lambda worker: worker.job.data["jobtype"] == job["jobtype"] and worker.job.data["remote_id"] == job["remote_id"], self.manager.worker_pool.get(job["jobtype"], [])))[0]
230				except IndexError:
231					worker = None
232
233				if not bool(worker):
234					if job["jobtype"] not in queue:
235						queue[job["jobtype"]] = 0
236
237					queue[job["jobtype"]] += 1
238				else:
239					if hasattr(worker, "dataset") and worker.dataset:
240						running_key = worker.dataset.key
241						running_user = worker.dataset.creator
242						running_parent = worker.dataset.top_parent().key
243					else:
244						running_key = None
245						running_user = None
246						running_parent = None
247
248					running.append({
249						"type": job["jobtype"],
250						"is_claimed": job["timestamp_claimed"] > 0,
251						"is_running": bool(worker),
252						"is_processor": hasattr(worker, "dataset"),
253						"is_recurring": (int(job["interval"]) > 0),
254						"is_maybe_crashed": job["timestamp_claimed"] > 0 and not worker,
255						"dataset_key": running_key,
256						"dataset_user": running_user,
257						"dataset_parent_key": running_parent,
258						"timestamp_queued": job["timestamp"],
259						"timestamp_claimed": job["timestamp_lastclaimed"]
260					})
261
262			return {
263				"running": running,
264				"queued": queue
265			}
266
267
268
269		# no appropriate response
270		return False

Generate API response

Checks the type of request, and returns an appropriate response for the type.

Parameters
  • str request: Request identifier
  • payload: Other data sent with the request
Returns

API response

class InternalAPIException(builtins.Exception):
273class InternalAPIException(Exception):
274	# raised if API request could not be parsed
275	pass

Common base class for all non-exit exceptions.