backend.workers.api
import socket
import time
import json

from backend.lib.worker import BasicWorker


class InternalAPI(BasicWorker):
    """
    Offer a local server that listens on a port for API calls and answers them
    """
    # worker/job type identifier
    type = "api"
    # presumably the maximum number of simultaneous workers of this type;
    # interpreted by the worker manager, not by this class
    max_workers = 1

    # presumably makes the manager keep a job for this worker queued - confirm
    # against the manager implementation
    ensure_job = {"remote_id": "localhost"}

    # address to listen at; read from the configuration in `work()`
    host = None
    port = None

    def work(self):
        """
        Listen for API requests

        Opens a socket that continuously listens for requests, and passes a
        client object on to a handling method if a connection is established.
        Binding is retried for up to five minutes. Both the listening socket
        and every client socket are closed when they are no longer needed.

        :return:  Nothing
        """
        self.host = self.config.get('API_HOST')
        self.port = self.config.get('API_PORT')

        if self.port == 0:
            # if configured not to listen, just loop until the backend shuts
            # down. we can't return here immediately, since this is a worker,
            # and workers that end just get started again
            self.db.close()
            self.manager.log.info("Local API not available per configuration")
            while not self.interrupted:
                time.sleep(1)
            return

        # the context manager closes the socket on every exit path
        with socket.socket() as server:
            server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            # accept() only times out so it won't hang the entire app when
            # trying to exit, as there's no other way to easily interrupt it
            server.settimeout(2)  # should be plenty

            start_trying = int(time.time())
            while True:
                has_time = start_trying > time.time() - 300  # stop trying after 5 minutes
                try:
                    server.bind((self.host, self.port))
                    break
                except ConnectionRefusedError:
                    # subclass of OSError, so it needs to be handled first
                    self.manager.log.error("OS refused listening at port %i! Local API not available." % self.port)
                    return
                except OSError as e:
                    if has_time and not self.interrupted:
                        self.manager.log.info("Could not open port %i yet (%s), retrying in 10 seconds" % (self.port, e))
                        time.sleep(10.0)  # wait a few seconds before retrying
                        continue
                    self.manager.log.error("Port %s is already in use! Local API not available. Check if a residual 4CAT process may still be listening at the port." % self.port)
                    return

            server.listen()
            self.manager.log.info("Local API listening for requests at %s:%s" % (self.host, self.port))

            # continually listen for new connections
            while not self.interrupted:
                try:
                    client, address = server.accept()
                except (socket.timeout, TimeoutError):
                    # no problemo, just listen again; the loop condition
                    # takes care of stopping when interrupted
                    continue

                try:
                    self.api_response(client, address)
                finally:
                    # don't leak the connection if handling it fails
                    client.close()

        self.manager.log.info("Shutting down local API server")

    def api_response(self, client, address):
        """
        Respond to API requests

        Gets the request, parses it as JSON, and if it is indeed a JSON
        object with a `request` key, a response is generated via
        `process_request()` and returned as JSON via the client object. Any
        other input is answered with a generic JSON error message.

        :param client:  Client object representing the connection
        :param tuple address:  (IP, port) of the request
        :return:  `False` if no usable request was received; otherwise
        `None`, as that is what `sendall()` returns and what is used when
        sending fails
        """
        client.settimeout(2)  # should be plenty

        try:
            buffer = ""
            while True:
                # receive data in 1k chunks
                try:
                    data = client.recv(1024).decode("ascii")
                except UnicodeDecodeError:
                    raise InternalAPIException

                buffer += data
                # NOTE(review): `data` never exceeds the 1024 bytes requested
                # from recv(), so the length check can never be true; it
                # possibly was meant to limit `buffer` - confirm before
                # changing, since that would cap the request size
                if not data or data.strip() == "" or len(data) > 2048:
                    break

                # start processing as soon as we have valid json
                try:
                    json.loads(buffer)
                    break
                except json.JSONDecodeError:
                    pass

            if not buffer:
                raise InternalAPIException
        except (socket.timeout, TimeoutError, ConnectionError, InternalAPIException):
            # this means that no valid request was sent
            self.manager.log.info("No input on API call from %s:%s - closing" % address)
            return False

        self.manager.log.debug("Received API request from %s:%s" % address)

        error_response = json.dumps({"error": "Invalid JSON"})
        try:
            payload = json.loads(buffer)

            # valid JSON is not necessarily an object: numbers, strings and
            # lists parse fine but cannot hold a request
            result = None
            if isinstance(payload, dict) and "request" in payload:
                result = self.process_request(payload["request"], payload)

            if result:
                response = json.dumps({"error": False, "response": result})
            else:
                response = error_response
        except (json.JSONDecodeError, InternalAPIException):
            response = error_response

        try:
            response = client.sendall(response.encode("ascii"))
        except (BrokenPipeError, ConnectionError, socket.timeout):
            response = None

        return response

    def process_request(self, request, payload):
        """
        Generate API response

        Checks the type of request, and returns an appropriate response for
        the type. Recognised requests are `cancel-job`, `workers`, `jobs`,
        `datasets` and `worker-status`.

        :param str request:  Request identifier
        :param payload:  Other data sent with the request. Only `cancel-job`
        reads it, via its `payload` key.
        :return:  API response; a string or dictionary, or `False` if the
        request is not recognised or malformed
        """
        if request == "cancel-job":
            # cancel a running job
            details = payload.get("payload", {})
            if not isinstance(details, dict):
                # e.g. `"payload": null`; nothing to look up the job with
                return False

            self.manager.request_interrupt(
                remote_id=details.get("remote_id"),
                jobtype=details.get("jobtype"),
                interrupt_level=details.get("level", BasicWorker.INTERRUPT_RETRY)
            )
            return "OK"

        if request == "workers":
            # return the number of workers, sorted by type
            workers = {jobtype: len(pool) for jobtype, pool in self.manager.worker_pool.items()}
            workers["total"] = sum(workers.values())
            return workers

        if request == "jobs":
            # return queued jobs, sorted by type
            jobs = self.db.fetchall("SELECT * FROM jobs")
            if jobs is None:
                return {"error": "Database unavailable"}

            response = {}
            for job in jobs:
                response[job["jobtype"]] = response.get(job["jobtype"], 0) + 1

            response["total"] = sum(response.values())
            return response

        if request == "datasets":
            # datasets created per time period
            week = 86400 * 7
            now = int(time.time())

            items = self.db.fetchall("SELECT * FROM datasets WHERE timestamp > %s ORDER BY timestamp ASC", (now - week,))
            if items is None:
                # same guard as for the `jobs` request
                return {"error": "Database unavailable"}

            response = {"1h": 0, "1d": 0, "1w": 0}
            for item in items:
                # the query already limits results to the past week
                response["1w"] += 1
                if item["timestamp"] > now - 3600:
                    response["1h"] += 1
                if item["timestamp"] > now - 86400:
                    response["1d"] += 1

            return response

        if request == "worker-status":
            # technically more 'job status' than 'worker status', this returns
            # all jobs plus, for those that are currently active, some worker
            # info as well as related datasets. useful to monitor server
            # activity and judge whether 4CAT can safely be interrupted
            open_jobs = self.db.fetchall("SELECT jobtype, timestamp, timestamp_claimed, timestamp_lastclaimed, interval, remote_id FROM jobs ORDER BY jobtype ASC, timestamp ASC, remote_id ASC")
            if open_jobs is None:
                # same guard as for the `jobs` request
                return {"error": "Database unavailable"}

            running = []
            queue = {}

            for job in open_jobs:
                # first worker in the pool that is working on this very job
                pool = self.manager.worker_pool.get(job["jobtype"], [])
                worker = next((
                    candidate for candidate in pool
                    if candidate.job.data["jobtype"] == job["jobtype"]
                    and candidate.job.data["remote_id"] == job["remote_id"]
                ), None)

                if not worker:
                    # no worker for this job, so it only counts as queued
                    queue[job["jobtype"]] = queue.get(job["jobtype"], 0) + 1
                    continue

                dataset = getattr(worker, "dataset", None)
                if dataset:
                    running_key = dataset.key
                    running_user = dataset.creator
                    running_parent = dataset.top_parent().key
                else:
                    running_key = None
                    running_user = None
                    running_parent = None

                # NOTE(review): a worker was found at this point, so
                # `is_running` is always true and `is_maybe_crashed` always
                # false here - confirm whether claimed jobs without a worker
                # were meant to be reported as well
                running.append({
                    "type": job["jobtype"],
                    "is_claimed": job["timestamp_claimed"] > 0,
                    "is_running": bool(worker),
                    "is_processor": hasattr(worker, "dataset"),
                    "is_recurring": (int(job["interval"]) > 0),
                    "is_maybe_crashed": job["timestamp_claimed"] > 0 and not worker,
                    "dataset_key": running_key,
                    "dataset_user": running_user,
                    "dataset_parent_key": running_parent,
                    "timestamp_queued": job["timestamp"],
                    "timestamp_claimed": job["timestamp_lastclaimed"]
                })

            return {
                "running": running,
                "queued": queue
            }

        # no appropriate response
        return False


class InternalAPIException(Exception):
    """
    Raised if an API request could not be parsed
    """
    pass
class InternalAPI(BasicWorker):
    """
    Offer a local server that listens on a port for API calls and answers them
    """
    # worker/job type identifier
    type = "api"
    # presumably the maximum number of simultaneous workers of this type;
    # interpreted by the worker manager, not by this class
    max_workers = 1

    # presumably makes the manager keep a job for this worker queued - confirm
    # against the manager implementation
    ensure_job = {"remote_id": "localhost"}

    # address to listen at; read from the configuration in `work()`
    host = None
    port = None

    def work(self):
        """
        Listen for API requests

        Opens a socket that continuously listens for requests, and passes a
        client object on to a handling method if a connection is established.
        Binding is retried for up to five minutes. Both the listening socket
        and every client socket are closed when they are no longer needed.

        :return:  Nothing
        """
        self.host = self.config.get('API_HOST')
        self.port = self.config.get('API_PORT')

        if self.port == 0:
            # if configured not to listen, just loop until the backend shuts
            # down. we can't return here immediately, since this is a worker,
            # and workers that end just get started again
            self.db.close()
            self.manager.log.info("Local API not available per configuration")
            while not self.interrupted:
                time.sleep(1)
            return

        # the context manager closes the socket on every exit path
        with socket.socket() as server:
            server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            # accept() only times out so it won't hang the entire app when
            # trying to exit, as there's no other way to easily interrupt it
            server.settimeout(2)  # should be plenty

            start_trying = int(time.time())
            while True:
                has_time = start_trying > time.time() - 300  # stop trying after 5 minutes
                try:
                    server.bind((self.host, self.port))
                    break
                except ConnectionRefusedError:
                    # subclass of OSError, so it needs to be handled first
                    self.manager.log.error("OS refused listening at port %i! Local API not available." % self.port)
                    return
                except OSError as e:
                    if has_time and not self.interrupted:
                        self.manager.log.info("Could not open port %i yet (%s), retrying in 10 seconds" % (self.port, e))
                        time.sleep(10.0)  # wait a few seconds before retrying
                        continue
                    self.manager.log.error("Port %s is already in use! Local API not available. Check if a residual 4CAT process may still be listening at the port." % self.port)
                    return

            server.listen()
            self.manager.log.info("Local API listening for requests at %s:%s" % (self.host, self.port))

            # continually listen for new connections
            while not self.interrupted:
                try:
                    client, address = server.accept()
                except (socket.timeout, TimeoutError):
                    # no problemo, just listen again; the loop condition
                    # takes care of stopping when interrupted
                    continue

                try:
                    self.api_response(client, address)
                finally:
                    # don't leak the connection if handling it fails
                    client.close()

        self.manager.log.info("Shutting down local API server")

    def api_response(self, client, address):
        """
        Respond to API requests

        Gets the request, parses it as JSON, and if it is indeed a JSON
        object with a `request` key, a response is generated via
        `process_request()` and returned as JSON via the client object. Any
        other input is answered with a generic JSON error message.

        :param client:  Client object representing the connection
        :param tuple address:  (IP, port) of the request
        :return:  `False` if no usable request was received; otherwise
        `None`, as that is what `sendall()` returns and what is used when
        sending fails
        """
        client.settimeout(2)  # should be plenty

        try:
            buffer = ""
            while True:
                # receive data in 1k chunks
                try:
                    data = client.recv(1024).decode("ascii")
                except UnicodeDecodeError:
                    raise InternalAPIException

                buffer += data
                # NOTE(review): `data` never exceeds the 1024 bytes requested
                # from recv(), so the length check can never be true; it
                # possibly was meant to limit `buffer` - confirm before
                # changing, since that would cap the request size
                if not data or data.strip() == "" or len(data) > 2048:
                    break

                # start processing as soon as we have valid json
                try:
                    json.loads(buffer)
                    break
                except json.JSONDecodeError:
                    pass

            if not buffer:
                raise InternalAPIException
        except (socket.timeout, TimeoutError, ConnectionError, InternalAPIException):
            # this means that no valid request was sent
            self.manager.log.info("No input on API call from %s:%s - closing" % address)
            return False

        self.manager.log.debug("Received API request from %s:%s" % address)

        error_response = json.dumps({"error": "Invalid JSON"})
        try:
            payload = json.loads(buffer)

            # valid JSON is not necessarily an object: numbers, strings and
            # lists parse fine but cannot hold a request
            result = None
            if isinstance(payload, dict) and "request" in payload:
                result = self.process_request(payload["request"], payload)

            if result:
                response = json.dumps({"error": False, "response": result})
            else:
                response = error_response
        except (json.JSONDecodeError, InternalAPIException):
            response = error_response

        try:
            response = client.sendall(response.encode("ascii"))
        except (BrokenPipeError, ConnectionError, socket.timeout):
            response = None

        return response

    def process_request(self, request, payload):
        """
        Generate API response

        Checks the type of request, and returns an appropriate response for
        the type. Recognised requests are `cancel-job`, `workers`, `jobs`,
        `datasets` and `worker-status`.

        :param str request:  Request identifier
        :param payload:  Other data sent with the request. Only `cancel-job`
        reads it, via its `payload` key.
        :return:  API response; a string or dictionary, or `False` if the
        request is not recognised or malformed
        """
        if request == "cancel-job":
            # cancel a running job
            details = payload.get("payload", {})
            if not isinstance(details, dict):
                # e.g. `"payload": null`; nothing to look up the job with
                return False

            self.manager.request_interrupt(
                remote_id=details.get("remote_id"),
                jobtype=details.get("jobtype"),
                interrupt_level=details.get("level", BasicWorker.INTERRUPT_RETRY)
            )
            return "OK"

        if request == "workers":
            # return the number of workers, sorted by type
            workers = {jobtype: len(pool) for jobtype, pool in self.manager.worker_pool.items()}
            workers["total"] = sum(workers.values())
            return workers

        if request == "jobs":
            # return queued jobs, sorted by type
            jobs = self.db.fetchall("SELECT * FROM jobs")
            if jobs is None:
                return {"error": "Database unavailable"}

            response = {}
            for job in jobs:
                response[job["jobtype"]] = response.get(job["jobtype"], 0) + 1

            response["total"] = sum(response.values())
            return response

        if request == "datasets":
            # datasets created per time period
            week = 86400 * 7
            now = int(time.time())

            items = self.db.fetchall("SELECT * FROM datasets WHERE timestamp > %s ORDER BY timestamp ASC", (now - week,))
            if items is None:
                # same guard as for the `jobs` request
                return {"error": "Database unavailable"}

            response = {"1h": 0, "1d": 0, "1w": 0}
            for item in items:
                # the query already limits results to the past week
                response["1w"] += 1
                if item["timestamp"] > now - 3600:
                    response["1h"] += 1
                if item["timestamp"] > now - 86400:
                    response["1d"] += 1

            return response

        if request == "worker-status":
            # technically more 'job status' than 'worker status', this returns
            # all jobs plus, for those that are currently active, some worker
            # info as well as related datasets. useful to monitor server
            # activity and judge whether 4CAT can safely be interrupted
            open_jobs = self.db.fetchall("SELECT jobtype, timestamp, timestamp_claimed, timestamp_lastclaimed, interval, remote_id FROM jobs ORDER BY jobtype ASC, timestamp ASC, remote_id ASC")
            if open_jobs is None:
                # same guard as for the `jobs` request
                return {"error": "Database unavailable"}

            running = []
            queue = {}

            for job in open_jobs:
                # first worker in the pool that is working on this very job
                pool = self.manager.worker_pool.get(job["jobtype"], [])
                worker = next((
                    candidate for candidate in pool
                    if candidate.job.data["jobtype"] == job["jobtype"]
                    and candidate.job.data["remote_id"] == job["remote_id"]
                ), None)

                if not worker:
                    # no worker for this job, so it only counts as queued
                    queue[job["jobtype"]] = queue.get(job["jobtype"], 0) + 1
                    continue

                dataset = getattr(worker, "dataset", None)
                if dataset:
                    running_key = dataset.key
                    running_user = dataset.creator
                    running_parent = dataset.top_parent().key
                else:
                    running_key = None
                    running_user = None
                    running_parent = None

                # NOTE(review): a worker was found at this point, so
                # `is_running` is always true and `is_maybe_crashed` always
                # false here - confirm whether claimed jobs without a worker
                # were meant to be reported as well
                running.append({
                    "type": job["jobtype"],
                    "is_claimed": job["timestamp_claimed"] > 0,
                    "is_running": bool(worker),
                    "is_processor": hasattr(worker, "dataset"),
                    "is_recurring": (int(job["interval"]) > 0),
                    "is_maybe_crashed": job["timestamp_claimed"] > 0 and not worker,
                    "dataset_key": running_key,
                    "dataset_user": running_user,
                    "dataset_parent_key": running_parent,
                    "timestamp_queued": job["timestamp"],
                    "timestamp_claimed": job["timestamp_lastclaimed"]
                })

            return {
                "running": running,
                "queued": queue
            }

        # no appropriate response
        return False
Offer a local server that listens on a port for API calls and answers them
def
work(self):
def work(self):
    """
    Listen for API requests

    Opens a socket that continuously listens for requests, and passes a
    client object on to a handling method if a connection is established.
    Binding is retried for up to five minutes. Both the listening socket
    and every client socket are closed when they are no longer needed.

    :return:  Nothing
    """
    self.host = self.config.get('API_HOST')
    self.port = self.config.get('API_PORT')

    if self.port == 0:
        # if configured not to listen, just loop until the backend shuts
        # down. we can't return here immediately, since this is a worker,
        # and workers that end just get started again
        self.db.close()
        self.manager.log.info("Local API not available per configuration")
        while not self.interrupted:
            time.sleep(1)
        return

    # the context manager closes the socket on every exit path
    with socket.socket() as server:
        server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        # accept() only times out so it won't hang the entire app when
        # trying to exit, as there's no other way to easily interrupt it
        server.settimeout(2)  # should be plenty

        start_trying = int(time.time())
        while True:
            has_time = start_trying > time.time() - 300  # stop trying after 5 minutes
            try:
                server.bind((self.host, self.port))
                break
            except ConnectionRefusedError:
                # subclass of OSError, so it needs to be handled first
                self.manager.log.error("OS refused listening at port %i! Local API not available." % self.port)
                return
            except OSError as e:
                if has_time and not self.interrupted:
                    self.manager.log.info("Could not open port %i yet (%s), retrying in 10 seconds" % (self.port, e))
                    time.sleep(10.0)  # wait a few seconds before retrying
                    continue
                self.manager.log.error("Port %s is already in use! Local API not available. Check if a residual 4CAT process may still be listening at the port." % self.port)
                return

        server.listen()
        self.manager.log.info("Local API listening for requests at %s:%s" % (self.host, self.port))

        # continually listen for new connections
        while not self.interrupted:
            try:
                client, address = server.accept()
            except (socket.timeout, TimeoutError):
                # no problemo, just listen again; the loop condition takes
                # care of stopping when interrupted
                continue

            try:
                self.api_response(client, address)
            finally:
                # don't leak the connection if handling it fails
                client.close()

    self.manager.log.info("Shutting down local API server")
Listen for API requests
Opens a socket that continuously listens for requests, and passes a client object on to a handling method if a connection is established
Returns
def
api_response(self, client, address):
def api_response(self, client, address):
    """
    Respond to API requests

    Gets the request, parses it as JSON, and if it is indeed a JSON object
    with a `request` key, a response is generated via `process_request()`
    and returned as JSON via the client object. Any other input is answered
    with a generic JSON error message.

    :param client:  Client object representing the connection
    :param tuple address:  (IP, port) of the request
    :return:  `False` if no usable request was received; otherwise `None`,
    as that is what `sendall()` returns and what is used when sending fails
    """
    client.settimeout(2)  # should be plenty

    try:
        buffer = ""
        while True:
            # receive data in 1k chunks
            try:
                data = client.recv(1024).decode("ascii")
            except UnicodeDecodeError:
                raise InternalAPIException

            buffer += data
            # NOTE(review): `data` never exceeds the 1024 bytes requested
            # from recv(), so the length check can never be true; it
            # possibly was meant to limit `buffer` - confirm before
            # changing, since that would cap the request size
            if not data or data.strip() == "" or len(data) > 2048:
                break

            # start processing as soon as we have valid json
            try:
                json.loads(buffer)
                break
            except json.JSONDecodeError:
                pass

        if not buffer:
            raise InternalAPIException
    except (socket.timeout, TimeoutError, ConnectionError, InternalAPIException):
        # this means that no valid request was sent
        self.manager.log.info("No input on API call from %s:%s - closing" % address)
        return False

    self.manager.log.debug("Received API request from %s:%s" % address)

    error_response = json.dumps({"error": "Invalid JSON"})
    try:
        payload = json.loads(buffer)

        # valid JSON is not necessarily an object: numbers, strings and
        # lists parse fine but cannot hold a request
        result = None
        if isinstance(payload, dict) and "request" in payload:
            result = self.process_request(payload["request"], payload)

        if result:
            response = json.dumps({"error": False, "response": result})
        else:
            response = error_response
    except (json.JSONDecodeError, InternalAPIException):
        response = error_response

    try:
        response = client.sendall(response.encode("ascii"))
    except (BrokenPipeError, ConnectionError, socket.timeout):
        response = None

    return response
Respond to API requests
Gets the request, parses it as JSON, and if it is indeed JSON and of the proper format, a response is generated and returned as JSON via the client object
Parameters
- client: Client object representing the connection
- tuple address: (IP, port) of the request
Returns
Response
def
process_request(self, request, payload):
def process_request(self, request, payload):
    """
    Generate API response

    Checks the type of request, and returns an appropriate response for
    the type. Recognised requests are `cancel-job`, `workers`, `jobs`,
    `datasets` and `worker-status`.

    :param str request:  Request identifier
    :param payload:  Other data sent with the request. Only `cancel-job`
    reads it, via its `payload` key.
    :return:  API response; a string or dictionary, or `False` if the
    request is not recognised or malformed
    """
    if request == "cancel-job":
        # cancel a running job
        details = payload.get("payload", {})
        if not isinstance(details, dict):
            # e.g. `"payload": null`; nothing to look up the job with
            return False

        self.manager.request_interrupt(
            remote_id=details.get("remote_id"),
            jobtype=details.get("jobtype"),
            interrupt_level=details.get("level", BasicWorker.INTERRUPT_RETRY)
        )
        return "OK"

    if request == "workers":
        # return the number of workers, sorted by type
        workers = {jobtype: len(pool) for jobtype, pool in self.manager.worker_pool.items()}
        workers["total"] = sum(workers.values())
        return workers

    if request == "jobs":
        # return queued jobs, sorted by type
        jobs = self.db.fetchall("SELECT * FROM jobs")
        if jobs is None:
            return {"error": "Database unavailable"}

        response = {}
        for job in jobs:
            response[job["jobtype"]] = response.get(job["jobtype"], 0) + 1

        response["total"] = sum(response.values())
        return response

    if request == "datasets":
        # datasets created per time period
        week = 86400 * 7
        now = int(time.time())

        items = self.db.fetchall("SELECT * FROM datasets WHERE timestamp > %s ORDER BY timestamp ASC", (now - week,))
        if items is None:
            # same guard as for the `jobs` request
            return {"error": "Database unavailable"}

        response = {"1h": 0, "1d": 0, "1w": 0}
        for item in items:
            # the query already limits results to the past week
            response["1w"] += 1
            if item["timestamp"] > now - 3600:
                response["1h"] += 1
            if item["timestamp"] > now - 86400:
                response["1d"] += 1

        return response

    if request == "worker-status":
        # technically more 'job status' than 'worker status', this returns
        # all jobs plus, for those that are currently active, some worker
        # info as well as related datasets. useful to monitor server
        # activity and judge whether 4CAT can safely be interrupted
        open_jobs = self.db.fetchall("SELECT jobtype, timestamp, timestamp_claimed, timestamp_lastclaimed, interval, remote_id FROM jobs ORDER BY jobtype ASC, timestamp ASC, remote_id ASC")
        if open_jobs is None:
            # same guard as for the `jobs` request
            return {"error": "Database unavailable"}

        running = []
        queue = {}

        for job in open_jobs:
            # first worker in the pool that is working on this very job
            pool = self.manager.worker_pool.get(job["jobtype"], [])
            worker = next((
                candidate for candidate in pool
                if candidate.job.data["jobtype"] == job["jobtype"]
                and candidate.job.data["remote_id"] == job["remote_id"]
            ), None)

            if not worker:
                # no worker for this job, so it only counts as queued
                queue[job["jobtype"]] = queue.get(job["jobtype"], 0) + 1
                continue

            dataset = getattr(worker, "dataset", None)
            if dataset:
                running_key = dataset.key
                running_user = dataset.creator
                running_parent = dataset.top_parent().key
            else:
                running_key = None
                running_user = None
                running_parent = None

            # NOTE(review): a worker was found at this point, so
            # `is_running` is always true and `is_maybe_crashed` always
            # false here - confirm whether claimed jobs without a worker
            # were meant to be reported as well
            running.append({
                "type": job["jobtype"],
                "is_claimed": job["timestamp_claimed"] > 0,
                "is_running": bool(worker),
                "is_processor": hasattr(worker, "dataset"),
                "is_recurring": (int(job["interval"]) > 0),
                "is_maybe_crashed": job["timestamp_claimed"] > 0 and not worker,
                "dataset_key": running_key,
                "dataset_user": running_user,
                "dataset_parent_key": running_parent,
                "timestamp_queued": job["timestamp"],
                "timestamp_claimed": job["timestamp_lastclaimed"]
            })

        return {
            "running": running,
            "queued": queue
        }

    # no appropriate response
    return False
Generate API response
Checks the type of request, and returns an appropriate response for the type.
Parameters
- str request: Request identifier
- payload: Other data sent with the request
Returns
API response
class
InternalAPIException(builtins.Exception):
Common base class for all non-exit exceptions.