"""
backend.workers.api
"""
import socket
import time
import json

from backend.lib.worker import BasicWorker
from common.lib.job import Job
from common.lib.exceptions import JobNotFoundException


class InternalAPI(BasicWorker):
    """
    Offer a local server that listens on a port for API calls and answers them
    """
    type = "api"
    max_workers = 1

    # listening address; populated from configuration when work() starts
    host = None
    port = None

    @classmethod
    def ensure_job(cls, config=None):
        """
        Ensure that the API worker is always running

        This is used to ensure that the API worker is always running, and if it
        is not, it will be started by the WorkerManager.

        :return: Job parameters for the worker
        """
        return {"remote_id": "localhost"}

    def work(self):
        """
        Listen for API requests

        Opens a socket that continuously listens for requests, and passes a
        client object on to a handling method if a connection is established

        :return:
        """
        self.host = self.config.get('API_HOST')
        self.port = self.config.get('API_PORT')

        if self.port == 0:
            # if configured not to listen, just loop until the backend shuts
            # down; we can't return here immediately, since this is a worker,
            # and workers that end just get started again
            self.db.close()
            self.manager.log.info("Local API not available per configuration")
            while not self.interrupted:
                time.sleep(1)
            return

        # set up the socket
        server = socket.socket()
        server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        server.settimeout(2)  # should be plenty

        has_time = True
        start_trying = int(time.time())
        while has_time:
            has_time = start_trying > time.time() - 300  # stop trying after 5 minutes
            try:
                server.bind((self.host, self.port))
                break
            except OSError as e:
                if has_time and not self.interrupted:
                    self.manager.log.info("Could not open port %i yet (%s), retrying in 10 seconds" % (self.port, e))
                    time.sleep(10.0)  # wait a few seconds before retrying
                    continue

                self.manager.log.error("Port %s is already in use! Local API not available. Check if a residual 4CAT process may still be listening at the port." % self.port)
                return
            except ConnectionRefusedError:
                self.manager.log.error("OS refused listening at port %i! Local API not available." % self.port)
                return

        server.listen()
        server.settimeout(2)
        self.manager.log.info("Local API listening for requests at %s:%s" % (self.host, self.port))

        # continually listen for new connections
        while not self.interrupted:
            try:
                client, address = server.accept()
            except (socket.timeout, TimeoutError):
                if self.interrupted:
                    break
                # no problemo, just listen again - this only times out so it
                # won't hang the entire app when trying to exit, as there's no
                # other way to easily interrupt accept()
                continue

            self.api_response(client, address)
            client.close()

        self.manager.log.info("Shutting down local API server")

    def api_response(self, client, address):
        """
        Respond to API requests

        Gets the request, parses it as JSON, and if it is indeed JSON and of
        the proper format, a response is generated and returned as JSON via
        the client object

        :param client: Client object representing the connection
        :param tuple address: (IP, port) of the request
        :return: Serialised JSON response on success, None if sending failed,
                 False if no valid request was received
        """
        client.settimeout(2)  # should be plenty

        try:
            buffer = ""
            while True:
                # receive data in 1k chunks
                try:
                    data = client.recv(1024).decode("ascii")
                except UnicodeDecodeError:
                    raise InternalAPIException

                buffer += data
                # stop on EOF, blank input, or when the accumulated request
                # exceeds 2kB; checking len(buffer) (not len(data), which a
                # recv(1024) call caps at 1024 bytes) makes the size limit
                # actually enforceable
                if not data or data.strip() == "" or len(buffer) > 2048:
                    break

                # start processing as soon as we have valid json
                try:
                    json.loads(buffer)
                    break
                except json.JSONDecodeError:
                    pass

            if not buffer:
                raise InternalAPIException
        except (socket.timeout, TimeoutError, ConnectionError, InternalAPIException):
            # this means that no valid request was sent
            self.manager.log.info("No input on API call from %s:%s - closing" % address)
            return False

        self.manager.log.debug("Received API request from %s:%s" % address)

        try:
            payload = json.loads(buffer)
            if "request" not in payload:
                raise InternalAPIException

            response = self.process_request(payload["request"], payload)
            if not response:
                raise InternalAPIException

            response = json.dumps({"error": False, "response": response})
        except (json.JSONDecodeError, InternalAPIException):
            response = json.dumps({"error": "Invalid JSON"})

        try:
            # sendall() returns None on success, so don't overwrite the
            # response with its return value; keep the serialised payload
            client.sendall(response.encode("ascii"))
        except (BrokenPipeError, ConnectionError, socket.timeout):
            response = None

        return response

    def process_request(self, request, payload):
        """
        Generate API response

        Checks the type of request, and returns an appropriate response for
        the type.

        :param str request: Request identifier
        :param payload: Other data sent with the request
        :return: API response
        """
        if request == "cancel-job":
            # cancel a running job
            payload = payload.get("payload", {})
            level = payload.get("level", BasicWorker.INTERRUPT_RETRY)
            try:
                job = Job.get_by_remote_ID(jobtype=payload.get("jobtype"), remote_id=payload.get("remote_id"), database=self.db)
            except JobNotFoundException:
                return {"error": "Job not found"}

            self.manager.request_interrupt(job=job, interrupt_level=level)
            return "OK"

        elif request == "workers":
            # return the number of workers, sorted by type
            workers = {}
            for jobtype in self.manager.worker_pool:
                workers[jobtype] = len(self.manager.worker_pool[jobtype])

            # "total" is not yet a key here, so it is excluded from the sum
            workers["total"] = sum(workers.values())

            return workers

        elif request == "jobs":
            # return queued jobs, sorted by type
            jobs = self.db.fetchall("SELECT * FROM jobs")
            if jobs is None:
                return {"error": "Database unavailable"}

            response = {}
            for job in jobs:
                if job["jobtype"] not in response:
                    response[job["jobtype"]] = 0
                response[job["jobtype"]] += 1

            response["total"] = sum(response.values())

            return response

        elif request == "datasets":
            # datasets created per time period
            week = 86400 * 7
            now = int(time.time())

            items = self.db.fetchall("SELECT * FROM datasets WHERE timestamp > %s ORDER BY timestamp ASC", (now - week,))

            response = {
                "1h": 0,
                "1d": 0,
                "1w": 0
            }

            for item in items:
                response["1w"] += 1
                if item["timestamp"] > now - 3600:
                    response["1h"] += 1
                if item["timestamp"] > now - 86400:
                    response["1d"] += 1

            return response

        elif request == "worker-status":
            # technically more 'job status' than 'worker status', this returns
            # all jobs plus, for those that are currently active, some worker
            # info as well as related datasets. useful to monitor server
            # activity and judge whether 4CAT can safely be interrupted
            open_jobs = self.db.fetchall("SELECT jobtype, timestamp, timestamp_claimed, timestamp_lastclaimed, interval, remote_id FROM jobs ORDER BY jobtype ASC, timestamp ASC, remote_id ASC")
            running = []
            queue = {}

            for job in open_jobs:
                try:
                    worker = list(filter(lambda worker: worker.job.data["jobtype"] == job["jobtype"] and worker.job.data["remote_id"] == job["remote_id"], self.manager.worker_pool.get(job["jobtype"], [])))[0]
                except IndexError:
                    worker = None

                if not bool(worker):
                    # no worker claimed this job: it is queued
                    if job["jobtype"] not in queue:
                        queue[job["jobtype"]] = 0

                    queue[job["jobtype"]] += 1
                else:
                    if hasattr(worker, "dataset") and worker.dataset:
                        running_key = worker.dataset.key
                        running_user = worker.dataset.creator
                        running_parent = worker.dataset.top_parent().key
                    else:
                        running_key = None
                        running_user = None
                        running_parent = None

                    running.append({
                        "type": job["jobtype"],
                        "is_claimed": job["timestamp_claimed"] > 0,
                        "is_running": bool(worker),
                        "is_processor": hasattr(worker, "dataset"),
                        "is_recurring": (int(job["interval"]) > 0),
                        "is_maybe_crashed": job["timestamp_claimed"] > 0 and not worker,
                        "dataset_key": running_key,
                        "dataset_user": running_user,
                        "dataset_parent_key": running_parent,
                        "timestamp_queued": job["timestamp"],
                        "timestamp_claimed": job["timestamp_lastclaimed"]
                    })

            return {
                "running": running,
                "queued": queue
            }

        # no appropriate response
        return False


class InternalAPIException(Exception):
    # raised if API request could not be parsed
    pass
class InternalAPI(BasicWorker):
    """
    Offer a local server that listens on a port for API calls and answers them
    """
    type = "api"
    max_workers = 1

    # listening address; populated from configuration when work() starts
    host = None
    port = None

    @classmethod
    def ensure_job(cls, config=None):
        """
        Ensure that the API worker is always running

        This is used to ensure that the API worker is always running, and if it
        is not, it will be started by the WorkerManager.

        :return: Job parameters for the worker
        """
        return {"remote_id": "localhost"}

    def work(self):
        """
        Listen for API requests

        Opens a socket that continuously listens for requests, and passes a
        client object on to a handling method if a connection is established

        :return:
        """
        self.host = self.config.get('API_HOST')
        self.port = self.config.get('API_PORT')

        if self.port == 0:
            # if configured not to listen, just loop until the backend shuts
            # down; we can't return here immediately, since this is a worker,
            # and workers that end just get started again
            self.db.close()
            self.manager.log.info("Local API not available per configuration")
            while not self.interrupted:
                time.sleep(1)
            return

        # set up the socket
        server = socket.socket()
        server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        server.settimeout(2)  # should be plenty

        has_time = True
        start_trying = int(time.time())
        while has_time:
            has_time = start_trying > time.time() - 300  # stop trying after 5 minutes
            try:
                server.bind((self.host, self.port))
                break
            except OSError as e:
                if has_time and not self.interrupted:
                    self.manager.log.info("Could not open port %i yet (%s), retrying in 10 seconds" % (self.port, e))
                    time.sleep(10.0)  # wait a few seconds before retrying
                    continue

                self.manager.log.error("Port %s is already in use! Local API not available. Check if a residual 4CAT process may still be listening at the port." % self.port)
                return
            except ConnectionRefusedError:
                self.manager.log.error("OS refused listening at port %i! Local API not available." % self.port)
                return

        server.listen()
        server.settimeout(2)
        self.manager.log.info("Local API listening for requests at %s:%s" % (self.host, self.port))

        # continually listen for new connections
        while not self.interrupted:
            try:
                client, address = server.accept()
            except (socket.timeout, TimeoutError):
                if self.interrupted:
                    break
                # no problemo, just listen again - this only times out so it
                # won't hang the entire app when trying to exit, as there's no
                # other way to easily interrupt accept()
                continue

            self.api_response(client, address)
            client.close()

        self.manager.log.info("Shutting down local API server")

    def api_response(self, client, address):
        """
        Respond to API requests

        Gets the request, parses it as JSON, and if it is indeed JSON and of
        the proper format, a response is generated and returned as JSON via
        the client object

        :param client: Client object representing the connection
        :param tuple address: (IP, port) of the request
        :return: Serialised JSON response on success, None if sending failed,
                 False if no valid request was received
        """
        client.settimeout(2)  # should be plenty

        try:
            buffer = ""
            while True:
                # receive data in 1k chunks
                try:
                    data = client.recv(1024).decode("ascii")
                except UnicodeDecodeError:
                    raise InternalAPIException

                buffer += data
                # stop on EOF, blank input, or when the accumulated request
                # exceeds 2kB; checking len(buffer) (not len(data), which a
                # recv(1024) call caps at 1024 bytes) makes the size limit
                # actually enforceable
                if not data or data.strip() == "" or len(buffer) > 2048:
                    break

                # start processing as soon as we have valid json
                try:
                    json.loads(buffer)
                    break
                except json.JSONDecodeError:
                    pass

            if not buffer:
                raise InternalAPIException
        except (socket.timeout, TimeoutError, ConnectionError, InternalAPIException):
            # this means that no valid request was sent
            self.manager.log.info("No input on API call from %s:%s - closing" % address)
            return False

        self.manager.log.debug("Received API request from %s:%s" % address)

        try:
            payload = json.loads(buffer)
            if "request" not in payload:
                raise InternalAPIException

            response = self.process_request(payload["request"], payload)
            if not response:
                raise InternalAPIException

            response = json.dumps({"error": False, "response": response})
        except (json.JSONDecodeError, InternalAPIException):
            response = json.dumps({"error": "Invalid JSON"})

        try:
            # sendall() returns None on success, so don't overwrite the
            # response with its return value; keep the serialised payload
            client.sendall(response.encode("ascii"))
        except (BrokenPipeError, ConnectionError, socket.timeout):
            response = None

        return response

    def process_request(self, request, payload):
        """
        Generate API response

        Checks the type of request, and returns an appropriate response for
        the type.

        :param str request: Request identifier
        :param payload: Other data sent with the request
        :return: API response
        """
        if request == "cancel-job":
            # cancel a running job
            payload = payload.get("payload", {})
            level = payload.get("level", BasicWorker.INTERRUPT_RETRY)
            try:
                job = Job.get_by_remote_ID(jobtype=payload.get("jobtype"), remote_id=payload.get("remote_id"), database=self.db)
            except JobNotFoundException:
                return {"error": "Job not found"}

            self.manager.request_interrupt(job=job, interrupt_level=level)
            return "OK"

        elif request == "workers":
            # return the number of workers, sorted by type
            workers = {}
            for jobtype in self.manager.worker_pool:
                workers[jobtype] = len(self.manager.worker_pool[jobtype])

            # "total" is not yet a key here, so it is excluded from the sum
            workers["total"] = sum(workers.values())

            return workers

        elif request == "jobs":
            # return queued jobs, sorted by type
            jobs = self.db.fetchall("SELECT * FROM jobs")
            if jobs is None:
                return {"error": "Database unavailable"}

            response = {}
            for job in jobs:
                if job["jobtype"] not in response:
                    response[job["jobtype"]] = 0
                response[job["jobtype"]] += 1

            response["total"] = sum(response.values())

            return response

        elif request == "datasets":
            # datasets created per time period
            week = 86400 * 7
            now = int(time.time())

            items = self.db.fetchall("SELECT * FROM datasets WHERE timestamp > %s ORDER BY timestamp ASC", (now - week,))

            response = {
                "1h": 0,
                "1d": 0,
                "1w": 0
            }

            for item in items:
                response["1w"] += 1
                if item["timestamp"] > now - 3600:
                    response["1h"] += 1
                if item["timestamp"] > now - 86400:
                    response["1d"] += 1

            return response

        elif request == "worker-status":
            # technically more 'job status' than 'worker status', this returns
            # all jobs plus, for those that are currently active, some worker
            # info as well as related datasets. useful to monitor server
            # activity and judge whether 4CAT can safely be interrupted
            open_jobs = self.db.fetchall("SELECT jobtype, timestamp, timestamp_claimed, timestamp_lastclaimed, interval, remote_id FROM jobs ORDER BY jobtype ASC, timestamp ASC, remote_id ASC")
            running = []
            queue = {}

            for job in open_jobs:
                try:
                    worker = list(filter(lambda worker: worker.job.data["jobtype"] == job["jobtype"] and worker.job.data["remote_id"] == job["remote_id"], self.manager.worker_pool.get(job["jobtype"], [])))[0]
                except IndexError:
                    worker = None

                if not bool(worker):
                    # no worker claimed this job: it is queued
                    if job["jobtype"] not in queue:
                        queue[job["jobtype"]] = 0

                    queue[job["jobtype"]] += 1
                else:
                    if hasattr(worker, "dataset") and worker.dataset:
                        running_key = worker.dataset.key
                        running_user = worker.dataset.creator
                        running_parent = worker.dataset.top_parent().key
                    else:
                        running_key = None
                        running_user = None
                        running_parent = None

                    running.append({
                        "type": job["jobtype"],
                        "is_claimed": job["timestamp_claimed"] > 0,
                        "is_running": bool(worker),
                        "is_processor": hasattr(worker, "dataset"),
                        "is_recurring": (int(job["interval"]) > 0),
                        "is_maybe_crashed": job["timestamp_claimed"] > 0 and not worker,
                        "dataset_key": running_key,
                        "dataset_user": running_user,
                        "dataset_parent_key": running_parent,
                        "timestamp_queued": job["timestamp"],
                        "timestamp_claimed": job["timestamp_lastclaimed"]
                    })

            return {
                "running": running,
                "queued": queue
            }

        # no appropriate response
        return False
Offer a local server that listens on a port for API calls and answers them
21 @classmethod 22 def ensure_job(cls, config=None): 23 """ 24 Ensure that the API worker is always running 25 26 This is used to ensure that the API worker is always running, and if it 27 is not, it will be started by the WorkerManager. 28 29 :return: Job parameters for the worker 30 """ 31 return {"remote_id": "localhost"}
Ensure that the API worker is always running
This is used to ensure that the API worker is always running, and if it is not, it will be started by the WorkerManager.
Returns
Job parameters for the worker
33 def work(self): 34 """ 35 Listen for API requests 36 37 Opens a socket that continuously listens for requests, and passes a 38 client object on to a handling method if a connection is established 39 40 :return: 41 """ 42 self.host = self.config.get('API_HOST') 43 self.port = self.config.get('API_PORT') 44 45 if self.port == 0: 46 # if configured not to listen, just loop until the backend shuts 47 # down we can't return here immediately, since this is a worker, 48 # and workers that end just get started again 49 self.db.close() 50 self.manager.log.info("Local API not available per configuration") 51 while not self.interrupted: 52 time.sleep(1) 53 return 54 55 # set up the socket 56 server = socket.socket() 57 server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) 58 server.settimeout(2) # should be plenty 59 60 has_time = True 61 start_trying = int(time.time()) 62 while has_time: 63 has_time = start_trying > time.time() - 300 # stop trying after 5 minutes 64 try: 65 server.bind((self.host, self.port)) 66 break 67 except OSError as e: 68 if has_time and not self.interrupted: 69 self.manager.log.info("Could not open port %i yet (%s), retrying in 10 seconds" % (self.port, e)) 70 time.sleep(10.0) # wait a few seconds before retrying 71 continue 72 self.manager.log.error("Port %s is already in use! Local API not available. Check if a residual 4CAT process may still be listening at the port." % self.port) 73 return 74 except ConnectionRefusedError: 75 self.manager.log.error("OS refused listening at port %i! Local API not available." 
% self.port) 76 return 77 78 server.listen() 79 server.settimeout(2) 80 self.manager.log.info("Local API listening for requests at %s:%s" % (self.host, self.port)) 81 82 # continually listen for new connections 83 while not self.interrupted: 84 try: 85 client, address = server.accept() 86 except (socket.timeout, TimeoutError): 87 if self.interrupted: 88 break 89 # no problemo, just listen again - this only times out so it won't hang the entire app when 90 # trying to exit, as there's no other way to easily interrupt accept() 91 continue 92 93 self.api_response(client, address) 94 client.close() 95 96 self.manager.log.info("Shutting down local API server")
Listen for API requests
Opens a socket that continuously listens for requests, and passes a client object on to a handling method if a connection is established
Returns
98 def api_response(self, client, address): 99 """ 100 Respond to API requests 101 102 Gets the request, parses it as JSON, and if it is indeed JSON and of 103 the proper format, a response is generated and returned as JSON via 104 the client object 105 106 :param client: Client object representing the connection 107 :param tuple address: (IP, port) of the request 108 :return: Response 109 """ 110 client.settimeout(2) # should be plenty 111 112 try: 113 buffer = "" 114 while True: 115 # receive data in 1k chunks 116 try: 117 data = client.recv(1024).decode("ascii") 118 except UnicodeDecodeError: 119 raise InternalAPIException 120 121 buffer += data 122 if not data or data.strip() == "" or len(data) > 2048: 123 break 124 125 # start processing as soon as we have valid json 126 try: 127 json.loads(buffer) 128 break 129 except json.JSONDecodeError: 130 pass 131 132 if not buffer: 133 raise InternalAPIException 134 except (socket.timeout, TimeoutError, ConnectionError, InternalAPIException): 135 # this means that no valid request was sent 136 self.manager.log.info("No input on API call from %s:%s - closing" % address) 137 return False 138 139 self.manager.log.debug("Received API request from %s:%s" % address) 140 141 try: 142 payload = json.loads(buffer) 143 if "request" not in payload: 144 raise InternalAPIException 145 146 response = self.process_request(payload["request"], payload) 147 if not response: 148 raise InternalAPIException 149 150 response = json.dumps({"error": False, "response": response}) 151 except (json.JSONDecodeError, InternalAPIException): 152 response = json.dumps({"error": "Invalid JSON"}) 153 154 try: 155 response = client.sendall(response.encode("ascii")) 156 except (BrokenPipeError, ConnectionError, socket.timeout): 157 response = None 158 159 return response
Respond to API requests
Gets the request, parses it as JSON, and if it is indeed JSON and of the proper format, a response is generated and returned as JSON via the client object
Parameters
- client: Client object representing the connection
- tuple address: (IP, port) of the request
Returns
Response
161 def process_request(self, request, payload): 162 """ 163 Generate API response 164 165 Checks the type of request, and returns an appropriate response for 166 the type. 167 168 :param str request: Request identifier 169 :param payload: Other data sent with the request 170 :return: API response 171 """ 172 if request == "cancel-job": 173 # cancel a running job 174 payload = payload.get("payload", {}) 175 level = payload.get("level", BasicWorker.INTERRUPT_RETRY) 176 try: 177 job = Job.get_by_remote_ID(jobtype=payload.get("jobtype"), remote_id=payload.get("remote_id"), database=self.db) 178 except JobNotFoundException: 179 return {"error": "Job not found"} 180 181 self.manager.request_interrupt(job=job, interrupt_level=level) 182 return "OK" 183 184 elif request == "workers": 185 # return the number of workers, sorted by type 186 workers = {} 187 for jobtype in self.manager.worker_pool: 188 workers[jobtype] = len(self.manager.worker_pool[jobtype]) 189 190 workers["total"] = sum([workers[workertype] for workertype in workers]) 191 192 return workers 193 194 if request == "jobs": 195 # return queued jobs, sorted by type 196 jobs = self.db.fetchall("SELECT * FROM jobs") 197 if jobs is None: 198 return {"error": "Database unavailable"} 199 200 response = {} 201 for job in jobs: 202 if job["jobtype"] not in response: 203 response[job["jobtype"]] = 0 204 response[job["jobtype"]] += 1 205 206 response["total"] = sum([response[jobtype] for jobtype in response]) 207 208 return response 209 210 if request == "datasets": 211 # datasets created per time period 212 week = 86400 * 7 213 now = int(time.time()) 214 215 items = self.db.fetchall("SELECT * FROM datasets WHERE timestamp > %s ORDER BY timestamp ASC", (now - week,)) 216 217 response = { 218 "1h": 0, 219 "1d": 0, 220 "1w": 0 221 } 222 223 for item in items: 224 response["1w"] += 1 225 if item["timestamp"] > now - 3600: 226 response["1h"] += 1 227 if item["timestamp"] > now - 86400: 228 response["1d"] += 1 229 230 return 
response 231 232 if request == "worker-status": 233 # technically more 'job status' than 'worker status', this returns 234 # all jobs plus, for those that are currently active, some worker 235 # info as well as related datasets. useful to monitor server 236 # activity and judge whether 4CAT can safely be interrupted 237 open_jobs = self.db.fetchall("SELECT jobtype, timestamp, timestamp_claimed, timestamp_lastclaimed, interval, remote_id FROM jobs ORDER BY jobtype ASC, timestamp ASC, remote_id ASC") 238 running = [] 239 queue = {} 240 241 for job in open_jobs: 242 try: 243 worker = list(filter(lambda worker: worker.job.data["jobtype"] == job["jobtype"] and worker.job.data["remote_id"] == job["remote_id"], self.manager.worker_pool.get(job["jobtype"], [])))[0] 244 except IndexError: 245 worker = None 246 247 if not bool(worker): 248 if job["jobtype"] not in queue: 249 queue[job["jobtype"]] = 0 250 251 queue[job["jobtype"]] += 1 252 else: 253 if hasattr(worker, "dataset") and worker.dataset: 254 running_key = worker.dataset.key 255 running_user = worker.dataset.creator 256 running_parent = worker.dataset.top_parent().key 257 else: 258 running_key = None 259 running_user = None 260 running_parent = None 261 262 running.append({ 263 "type": job["jobtype"], 264 "is_claimed": job["timestamp_claimed"] > 0, 265 "is_running": bool(worker), 266 "is_processor": hasattr(worker, "dataset"), 267 "is_recurring": (int(job["interval"]) > 0), 268 "is_maybe_crashed": job["timestamp_claimed"] > 0 and not worker, 269 "dataset_key": running_key, 270 "dataset_user": running_user, 271 "dataset_parent_key": running_parent, 272 "timestamp_queued": job["timestamp"], 273 "timestamp_claimed": job["timestamp_lastclaimed"] 274 }) 275 276 return { 277 "running": running, 278 "queued": queue 279 } 280 281 282 283 # no appropriate response 284 return False
Generate API response
Checks the type of request, and returns an appropriate response for the type.
Parameters
- str request: Request identifier
- payload: Other data sent with the request
Returns
API response
Raised when an API request could not be parsed or contained no valid input.