backend.workers.api
import socket
import time
import json

from common.config_manager import config
from backend.lib.worker import BasicWorker


class InternalAPI(BasicWorker):
    """
    Offer a local server that listens on a port for API calls and answers them
    """
    type = "api"
    max_workers = 1

    ensure_job = {"remote_id": "localhost"}

    host = config.get('API_HOST')
    port = config.get('API_PORT')

    def work(self):
        """
        Listen for API requests

        Opens a socket that continuously listens for requests, and passes a
        client object on to a handling method if a connection is established.
        The listening socket is closed on every exit path, and each client
        connection is closed even if handling its request raises.

        If ``port`` is 0 the API is disabled: the database connection is
        closed and the method idles until the worker is interrupted.

        :return: None
        """
        if self.port == 0:
            # if configured not to listen, just loop until the backend shuts
            # down; we can't return here immediately, since this is a worker,
            # and workers that end just get started again
            self.db.close()
            self.manager.log.info("Local API not available per configuration")
            while not self.interrupted:
                time.sleep(1)
            return

        # set up the socket
        server = socket.socket()
        server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        server.settimeout(2)  # should be plenty

        # the with-block closes the listening socket however we leave it, so
        # the port is released for the next worker instance
        with server:
            start_trying = int(time.time())
            while True:
                has_time = start_trying > time.time() - 300  # stop trying after 5 minutes
                try:
                    server.bind((self.host, self.port))
                    break
                except ConnectionRefusedError:
                    # must precede OSError (its base class), or it is unreachable
                    self.manager.log.error("OS refused listening at port %i! Local API not available." % self.port)
                    return
                except OSError as e:
                    if has_time and not self.interrupted:
                        self.manager.log.info("Could not open port %i yet (%s), retrying in 10 seconds" % (self.port, e))
                        time.sleep(10.0)  # wait a few seconds before retrying
                        continue
                    self.manager.log.error("Port %s is already in use! Local API not available. Check if a residual 4CAT process may still be listening at the port." % self.port)
                    return

            server.listen()
            self.manager.log.info("Local API listening for requests at %s:%s" % (self.host, self.port))

            # continually listen for new connections
            while not self.interrupted:
                try:
                    client, address = server.accept()
                except (socket.timeout, TimeoutError):
                    # no problemo, just listen again - accept() only times out
                    # so the loop can re-check self.interrupted and the app
                    # won't hang on exit; there's no other easy way to
                    # interrupt accept()
                    continue

                try:
                    self.api_response(client, address)
                finally:
                    # never leak the client connection, even on errors
                    client.close()

        self.manager.log.info("Shutting down local API server")

    def api_response(self, client, address):
        """
        Respond to API requests

        Reads the request (capped at a little over 2 kB), parses it as JSON,
        and if it is a JSON object with a "request" key for which a response
        can be generated, that response is returned as JSON via the client
        object. Anything else is answered with ``{"error": "Invalid JSON"}``.

        :param client: Client object representing the connection
        :param tuple address: (IP, port) of the request
        :return: The JSON string sent to the client; ``None`` if sending
        failed; ``False`` if no (ASCII-decodable) input was received
        """
        client.settimeout(2)  # should be plenty

        buffer = ""
        try:
            while True:
                # receive data in 1k chunks
                data = client.recv(1024).decode("ascii")
                buffer += data

                # stop at end of input, on a blank chunk, or once the request
                # is implausibly large so a client can't grow it without bound
                if not data or not data.strip() or len(buffer) > 2048:
                    break

                # start processing as soon as we have valid json
                try:
                    json.loads(buffer)
                    break
                except json.JSONDecodeError:
                    pass
        except (socket.timeout, TimeoutError, ConnectionError, UnicodeDecodeError):
            # timed out, dropped, or non-ASCII input: no valid request
            buffer = ""

        if not buffer:
            # this means that no valid request was sent
            self.manager.log.info("No input on API call from %s:%s - closing" % address)
            return False

        self.manager.log.debug("Received API request from %s:%s" % address)

        try:
            payload = json.loads(buffer)
        except json.JSONDecodeError:
            payload = None

        response = None
        # only JSON objects can carry a "request" key; other valid JSON
        # (numbers, lists, strings) would crash on the lookups below
        if isinstance(payload, dict) and "request" in payload:
            response = self.process_request(payload["request"], payload)

        if response:
            message = json.dumps({"error": False, "response": response})
        else:
            message = json.dumps({"error": "Invalid JSON"})

        try:
            client.sendall(message.encode("ascii"))
        except (BrokenPipeError, ConnectionError, socket.timeout):
            return None

        return message

    def process_request(self, request, payload):
        """
        Generate API response

        Checks the type of request, and returns an appropriate response for
        the type. Supported requests are "cancel-job", "workers", "jobs",
        "datasets" and "worker-status". Requests that query the database
        return ``{"error": "Database unavailable"}`` if the query yields
        ``None``.

        :param str request: Request identifier
        :param dict payload: Full request object; for "cancel-job" its
        "payload" key may hold "remote_id", "jobtype" and "level"
        :return: API response, or ``False`` for an unknown request
        """
        if request == "cancel-job":
            # cancel a running job
            job_payload = payload.get("payload", {})
            if not isinstance(job_payload, dict):
                # e.g. "payload": null - treat like a missing payload rather
                # than crashing on .get()
                job_payload = {}

            self.manager.request_interrupt(
                remote_id=job_payload.get("remote_id"),
                jobtype=job_payload.get("jobtype"),
                interrupt_level=job_payload.get("level", BasicWorker.INTERRUPT_RETRY),
            )
            return "OK"

        if request == "workers":
            # return the number of workers, sorted by type
            workers = {jobtype: len(pool) for jobtype, pool in self.manager.worker_pool.items()}
            workers["total"] = sum(workers.values())
            return workers

        if request == "jobs":
            # return queued jobs, counted per type
            jobs = self.db.fetchall("SELECT * FROM jobs")
            if jobs is None:
                return {"error": "Database unavailable"}

            response = {}
            for job in jobs:
                response[job["jobtype"]] = response.get(job["jobtype"], 0) + 1
            response["total"] = sum(response.values())
            return response

        if request == "datasets":
            # datasets created per time period
            week = 86400 * 7
            now = int(time.time())

            items = self.db.fetchall("SELECT * FROM datasets WHERE timestamp > %s ORDER BY timestamp ASC", (now - week,))
            if items is None:
                return {"error": "Database unavailable"}

            response = {"1h": 0, "1d": 0, "1w": 0}
            for item in items:
                response["1w"] += 1
                if item["timestamp"] > now - 3600:
                    response["1h"] += 1
                if item["timestamp"] > now - 86400:
                    response["1d"] += 1
            return response

        if request == "worker-status":
            # technically more 'job status' than 'worker status', this returns
            # all jobs plus, for those that are currently active, some worker
            # info as well as related datasets. useful to monitor server
            # activity and judge whether 4CAT can safely be interrupted
            open_jobs = self.db.fetchall("SELECT jobtype, timestamp, timestamp_claimed, timestamp_lastclaimed, interval, remote_id FROM jobs ORDER BY jobtype ASC, timestamp ASC, remote_id ASC")
            if open_jobs is None:
                return {"error": "Database unavailable"}

            running = []
            queue = {}
            for job in open_jobs:
                # first worker in the pool that handles exactly this job
                worker = next((
                    candidate for candidate in self.manager.worker_pool.get(job["jobtype"], [])
                    if candidate.job.data["jobtype"] == job["jobtype"]
                    and candidate.job.data["remote_id"] == job["remote_id"]
                ), None)

                if not worker:
                    # nobody is working on it, so it counts as queued
                    queue[job["jobtype"]] = queue.get(job["jobtype"], 0) + 1
                    continue

                if hasattr(worker, "dataset") and worker.dataset:
                    running_key = worker.dataset.key
                    running_user = worker.dataset.creator
                    running_parent = worker.dataset.top_parent().key
                else:
                    running_key = None
                    running_user = None
                    running_parent = None

                # NOTE(review): worker is always truthy here, so is_running is
                # always True and is_maybe_crashed always False; kept as-is to
                # preserve the response format
                running.append({
                    "type": job["jobtype"],
                    "is_claimed": job["timestamp_claimed"] > 0,
                    "is_running": bool(worker),
                    "is_processor": hasattr(worker, "dataset"),
                    "is_recurring": (int(job["interval"]) > 0),
                    "is_maybe_crashed": job["timestamp_claimed"] > 0 and not worker,
                    "dataset_key": running_key,
                    "dataset_user": running_user,
                    "dataset_parent_key": running_parent,
                    "timestamp_queued": job["timestamp"],
                    "timestamp_claimed": job["timestamp_lastclaimed"]
                })

            return {
                "running": running,
                "queued": queue
            }

        # no appropriate response
        return False


class InternalAPIException(Exception):
    """
    Raised if an API request could not be parsed.

    Retained for backward compatibility; ``InternalAPI.api_response`` now
    answers unparseable requests without raising it.
    """
class InternalAPI(BasicWorker):
    """
    Offer a local server that listens on a port for API calls and answers them
    """
    type = "api"
    max_workers = 1

    ensure_job = {"remote_id": "localhost"}

    host = config.get('API_HOST')
    port = config.get('API_PORT')

    def work(self):
        """
        Listen for API requests

        Opens a socket that continuously listens for requests, and passes a
        client object on to a handling method if a connection is established.
        The listening socket is closed on every exit path, and each client
        connection is closed even if handling its request raises.

        If ``port`` is 0 the API is disabled: the database connection is
        closed and the method idles until the worker is interrupted.

        :return: None
        """
        if self.port == 0:
            # if configured not to listen, just loop until the backend shuts
            # down; we can't return here immediately, since this is a worker,
            # and workers that end just get started again
            self.db.close()
            self.manager.log.info("Local API not available per configuration")
            while not self.interrupted:
                time.sleep(1)
            return

        # set up the socket
        server = socket.socket()
        server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        server.settimeout(2)  # should be plenty

        # the with-block closes the listening socket however we leave it, so
        # the port is released for the next worker instance
        with server:
            start_trying = int(time.time())
            while True:
                has_time = start_trying > time.time() - 300  # stop trying after 5 minutes
                try:
                    server.bind((self.host, self.port))
                    break
                except ConnectionRefusedError:
                    # must precede OSError (its base class), or it is unreachable
                    self.manager.log.error("OS refused listening at port %i! Local API not available." % self.port)
                    return
                except OSError as e:
                    if has_time and not self.interrupted:
                        self.manager.log.info("Could not open port %i yet (%s), retrying in 10 seconds" % (self.port, e))
                        time.sleep(10.0)  # wait a few seconds before retrying
                        continue
                    self.manager.log.error("Port %s is already in use! Local API not available. Check if a residual 4CAT process may still be listening at the port." % self.port)
                    return

            server.listen()
            self.manager.log.info("Local API listening for requests at %s:%s" % (self.host, self.port))

            # continually listen for new connections
            while not self.interrupted:
                try:
                    client, address = server.accept()
                except (socket.timeout, TimeoutError):
                    # no problemo, just listen again - accept() only times out
                    # so the loop can re-check self.interrupted and the app
                    # won't hang on exit; there's no other easy way to
                    # interrupt accept()
                    continue

                try:
                    self.api_response(client, address)
                finally:
                    # never leak the client connection, even on errors
                    client.close()

        self.manager.log.info("Shutting down local API server")

    def api_response(self, client, address):
        """
        Respond to API requests

        Reads the request (capped at a little over 2 kB), parses it as JSON,
        and if it is a JSON object with a "request" key for which a response
        can be generated, that response is returned as JSON via the client
        object. Anything else is answered with ``{"error": "Invalid JSON"}``.

        :param client: Client object representing the connection
        :param tuple address: (IP, port) of the request
        :return: The JSON string sent to the client; ``None`` if sending
        failed; ``False`` if no (ASCII-decodable) input was received
        """
        client.settimeout(2)  # should be plenty

        buffer = ""
        try:
            while True:
                # receive data in 1k chunks
                data = client.recv(1024).decode("ascii")
                buffer += data

                # stop at end of input, on a blank chunk, or once the request
                # is implausibly large so a client can't grow it without bound
                if not data or not data.strip() or len(buffer) > 2048:
                    break

                # start processing as soon as we have valid json
                try:
                    json.loads(buffer)
                    break
                except json.JSONDecodeError:
                    pass
        except (socket.timeout, TimeoutError, ConnectionError, UnicodeDecodeError):
            # timed out, dropped, or non-ASCII input: no valid request
            buffer = ""

        if not buffer:
            # this means that no valid request was sent
            self.manager.log.info("No input on API call from %s:%s - closing" % address)
            return False

        self.manager.log.debug("Received API request from %s:%s" % address)

        try:
            payload = json.loads(buffer)
        except json.JSONDecodeError:
            payload = None

        response = None
        # only JSON objects can carry a "request" key; other valid JSON
        # (numbers, lists, strings) would crash on the lookups below
        if isinstance(payload, dict) and "request" in payload:
            response = self.process_request(payload["request"], payload)

        if response:
            message = json.dumps({"error": False, "response": response})
        else:
            message = json.dumps({"error": "Invalid JSON"})

        try:
            client.sendall(message.encode("ascii"))
        except (BrokenPipeError, ConnectionError, socket.timeout):
            return None

        return message

    def process_request(self, request, payload):
        """
        Generate API response

        Checks the type of request, and returns an appropriate response for
        the type. Supported requests are "cancel-job", "workers", "jobs",
        "datasets" and "worker-status". Requests that query the database
        return ``{"error": "Database unavailable"}`` if the query yields
        ``None``.

        :param str request: Request identifier
        :param dict payload: Full request object; for "cancel-job" its
        "payload" key may hold "remote_id", "jobtype" and "level"
        :return: API response, or ``False`` for an unknown request
        """
        if request == "cancel-job":
            # cancel a running job
            job_payload = payload.get("payload", {})
            if not isinstance(job_payload, dict):
                # e.g. "payload": null - treat like a missing payload rather
                # than crashing on .get()
                job_payload = {}

            self.manager.request_interrupt(
                remote_id=job_payload.get("remote_id"),
                jobtype=job_payload.get("jobtype"),
                interrupt_level=job_payload.get("level", BasicWorker.INTERRUPT_RETRY),
            )
            return "OK"

        if request == "workers":
            # return the number of workers, sorted by type
            workers = {jobtype: len(pool) for jobtype, pool in self.manager.worker_pool.items()}
            workers["total"] = sum(workers.values())
            return workers

        if request == "jobs":
            # return queued jobs, counted per type
            jobs = self.db.fetchall("SELECT * FROM jobs")
            if jobs is None:
                return {"error": "Database unavailable"}

            response = {}
            for job in jobs:
                response[job["jobtype"]] = response.get(job["jobtype"], 0) + 1
            response["total"] = sum(response.values())
            return response

        if request == "datasets":
            # datasets created per time period
            week = 86400 * 7
            now = int(time.time())

            items = self.db.fetchall("SELECT * FROM datasets WHERE timestamp > %s ORDER BY timestamp ASC", (now - week,))
            if items is None:
                return {"error": "Database unavailable"}

            response = {"1h": 0, "1d": 0, "1w": 0}
            for item in items:
                response["1w"] += 1
                if item["timestamp"] > now - 3600:
                    response["1h"] += 1
                if item["timestamp"] > now - 86400:
                    response["1d"] += 1
            return response

        if request == "worker-status":
            # technically more 'job status' than 'worker status', this returns
            # all jobs plus, for those that are currently active, some worker
            # info as well as related datasets. useful to monitor server
            # activity and judge whether 4CAT can safely be interrupted
            open_jobs = self.db.fetchall("SELECT jobtype, timestamp, timestamp_claimed, timestamp_lastclaimed, interval, remote_id FROM jobs ORDER BY jobtype ASC, timestamp ASC, remote_id ASC")
            if open_jobs is None:
                return {"error": "Database unavailable"}

            running = []
            queue = {}
            for job in open_jobs:
                # first worker in the pool that handles exactly this job
                worker = next((
                    candidate for candidate in self.manager.worker_pool.get(job["jobtype"], [])
                    if candidate.job.data["jobtype"] == job["jobtype"]
                    and candidate.job.data["remote_id"] == job["remote_id"]
                ), None)

                if not worker:
                    # nobody is working on it, so it counts as queued
                    queue[job["jobtype"]] = queue.get(job["jobtype"], 0) + 1
                    continue

                if hasattr(worker, "dataset") and worker.dataset:
                    running_key = worker.dataset.key
                    running_user = worker.dataset.creator
                    running_parent = worker.dataset.top_parent().key
                else:
                    running_key = None
                    running_user = None
                    running_parent = None

                # NOTE(review): worker is always truthy here, so is_running is
                # always True and is_maybe_crashed always False; kept as-is to
                # preserve the response format
                running.append({
                    "type": job["jobtype"],
                    "is_claimed": job["timestamp_claimed"] > 0,
                    "is_running": bool(worker),
                    "is_processor": hasattr(worker, "dataset"),
                    "is_recurring": (int(job["interval"]) > 0),
                    "is_maybe_crashed": job["timestamp_claimed"] > 0 and not worker,
                    "dataset_key": running_key,
                    "dataset_user": running_user,
                    "dataset_parent_key": running_parent,
                    "timestamp_queued": job["timestamp"],
                    "timestamp_claimed": job["timestamp_lastclaimed"]
                })

            return {
                "running": running,
                "queued": queue
            }

        # no appropriate response
        return False
Offer a local server that listens on a port for API calls and answers them.
def
work(self):
def work(self):
    """
    Listen for API requests

    Opens a socket that continuously listens for requests, and passes a
    client object on to a handling method if a connection is established.
    The listening socket is closed on every exit path, and each client
    connection is closed even if handling its request raises.

    If ``port`` is 0 the API is disabled: the database connection is closed
    and the method idles until the worker is interrupted.

    :return: None
    """
    if self.port == 0:
        # if configured not to listen, just loop until the backend shuts
        # down; we can't return here immediately, since this is a worker,
        # and workers that end just get started again
        self.db.close()
        self.manager.log.info("Local API not available per configuration")
        while not self.interrupted:
            time.sleep(1)
        return

    # set up the socket
    server = socket.socket()
    server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    server.settimeout(2)  # should be plenty

    # the with-block closes the listening socket however we leave it, so the
    # port is released for the next worker instance
    with server:
        start_trying = int(time.time())
        while True:
            has_time = start_trying > time.time() - 300  # stop trying after 5 minutes
            try:
                server.bind((self.host, self.port))
                break
            except ConnectionRefusedError:
                # must precede OSError (its base class), or it is unreachable
                self.manager.log.error("OS refused listening at port %i! Local API not available." % self.port)
                return
            except OSError as e:
                if has_time and not self.interrupted:
                    self.manager.log.info("Could not open port %i yet (%s), retrying in 10 seconds" % (self.port, e))
                    time.sleep(10.0)  # wait a few seconds before retrying
                    continue
                self.manager.log.error("Port %s is already in use! Local API not available. Check if a residual 4CAT process may still be listening at the port." % self.port)
                return

        server.listen()
        self.manager.log.info("Local API listening for requests at %s:%s" % (self.host, self.port))

        # continually listen for new connections
        while not self.interrupted:
            try:
                client, address = server.accept()
            except (socket.timeout, TimeoutError):
                # no problemo, just listen again - accept() only times out so
                # the loop can re-check self.interrupted and the app won't
                # hang on exit; there's no other easy way to interrupt accept()
                continue

            try:
                self.api_response(client, address)
            finally:
                # never leak the client connection, even on errors
                client.close()

    self.manager.log.info("Shutting down local API server")
Listen for API requests
Opens a socket that continuously listens for requests, and passes a client object on to a handling method if a connection is established.
Returns
def
api_response(self, client, address):
def api_response(self, client, address):
    """
    Respond to API requests

    Reads the request (capped at a little over 2 kB), parses it as JSON, and
    if it is a JSON object with a "request" key for which a response can be
    generated, that response is returned as JSON via the client object.
    Anything else is answered with ``{"error": "Invalid JSON"}``.

    :param client: Client object representing the connection
    :param tuple address: (IP, port) of the request
    :return: The JSON string sent to the client; ``None`` if sending failed;
    ``False`` if no (ASCII-decodable) input was received
    """
    client.settimeout(2)  # should be plenty

    buffer = ""
    try:
        while True:
            # receive data in 1k chunks
            data = client.recv(1024).decode("ascii")
            buffer += data

            # stop at end of input, on a blank chunk, or once the request is
            # implausibly large so a client can't grow it without bound
            if not data or not data.strip() or len(buffer) > 2048:
                break

            # start processing as soon as we have valid json
            try:
                json.loads(buffer)
                break
            except json.JSONDecodeError:
                pass
    except (socket.timeout, TimeoutError, ConnectionError, UnicodeDecodeError):
        # timed out, dropped, or non-ASCII input: no valid request
        buffer = ""

    if not buffer:
        # this means that no valid request was sent
        self.manager.log.info("No input on API call from %s:%s - closing" % address)
        return False

    self.manager.log.debug("Received API request from %s:%s" % address)

    try:
        payload = json.loads(buffer)
    except json.JSONDecodeError:
        payload = None

    response = None
    # only JSON objects can carry a "request" key; other valid JSON (numbers,
    # lists, strings) would crash on the lookups below
    if isinstance(payload, dict) and "request" in payload:
        response = self.process_request(payload["request"], payload)

    if response:
        message = json.dumps({"error": False, "response": response})
    else:
        message = json.dumps({"error": "Invalid JSON"})

    try:
        client.sendall(message.encode("ascii"))
    except (BrokenPipeError, ConnectionError, socket.timeout):
        return None

    return message
Respond to API requests
Gets the request, parses it as JSON, and if it is indeed JSON and of the proper format, a response is generated and returned as JSON via the client object.
Parameters
- client: Client object representing the connection
- tuple address: (IP, port) of the request
Returns
Response
def
process_request(self, request, payload):
def process_request(self, request, payload):
    """
    Generate API response

    Checks the type of request, and returns an appropriate response for the
    type. Supported requests are "cancel-job", "workers", "jobs", "datasets"
    and "worker-status". Requests that query the database return
    ``{"error": "Database unavailable"}`` if the query yields ``None``.

    :param str request: Request identifier
    :param dict payload: Full request object; for "cancel-job" its "payload"
    key may hold "remote_id", "jobtype" and "level"
    :return: API response, or ``False`` for an unknown request
    """
    if request == "cancel-job":
        # cancel a running job
        job_payload = payload.get("payload", {})
        if not isinstance(job_payload, dict):
            # e.g. "payload": null - treat like a missing payload rather than
            # crashing on .get()
            job_payload = {}

        self.manager.request_interrupt(
            remote_id=job_payload.get("remote_id"),
            jobtype=job_payload.get("jobtype"),
            interrupt_level=job_payload.get("level", BasicWorker.INTERRUPT_RETRY),
        )
        return "OK"

    if request == "workers":
        # return the number of workers, sorted by type
        workers = {jobtype: len(pool) for jobtype, pool in self.manager.worker_pool.items()}
        workers["total"] = sum(workers.values())
        return workers

    if request == "jobs":
        # return queued jobs, counted per type
        jobs = self.db.fetchall("SELECT * FROM jobs")
        if jobs is None:
            return {"error": "Database unavailable"}

        response = {}
        for job in jobs:
            response[job["jobtype"]] = response.get(job["jobtype"], 0) + 1
        response["total"] = sum(response.values())
        return response

    if request == "datasets":
        # datasets created per time period
        week = 86400 * 7
        now = int(time.time())

        items = self.db.fetchall("SELECT * FROM datasets WHERE timestamp > %s ORDER BY timestamp ASC", (now - week,))
        if items is None:
            return {"error": "Database unavailable"}

        response = {"1h": 0, "1d": 0, "1w": 0}
        for item in items:
            response["1w"] += 1
            if item["timestamp"] > now - 3600:
                response["1h"] += 1
            if item["timestamp"] > now - 86400:
                response["1d"] += 1
        return response

    if request == "worker-status":
        # technically more 'job status' than 'worker status', this returns
        # all jobs plus, for those that are currently active, some worker
        # info as well as related datasets. useful to monitor server
        # activity and judge whether 4CAT can safely be interrupted
        open_jobs = self.db.fetchall("SELECT jobtype, timestamp, timestamp_claimed, timestamp_lastclaimed, interval, remote_id FROM jobs ORDER BY jobtype ASC, timestamp ASC, remote_id ASC")
        if open_jobs is None:
            return {"error": "Database unavailable"}

        running = []
        queue = {}
        for job in open_jobs:
            # first worker in the pool that handles exactly this job
            worker = next((
                candidate for candidate in self.manager.worker_pool.get(job["jobtype"], [])
                if candidate.job.data["jobtype"] == job["jobtype"]
                and candidate.job.data["remote_id"] == job["remote_id"]
            ), None)

            if not worker:
                # nobody is working on it, so it counts as queued
                queue[job["jobtype"]] = queue.get(job["jobtype"], 0) + 1
                continue

            if hasattr(worker, "dataset") and worker.dataset:
                running_key = worker.dataset.key
                running_user = worker.dataset.creator
                running_parent = worker.dataset.top_parent().key
            else:
                running_key = None
                running_user = None
                running_parent = None

            # NOTE(review): worker is always truthy here, so is_running is
            # always True and is_maybe_crashed always False; kept as-is to
            # preserve the response format
            running.append({
                "type": job["jobtype"],
                "is_claimed": job["timestamp_claimed"] > 0,
                "is_running": bool(worker),
                "is_processor": hasattr(worker, "dataset"),
                "is_recurring": (int(job["interval"]) > 0),
                "is_maybe_crashed": job["timestamp_claimed"] > 0 and not worker,
                "dataset_key": running_key,
                "dataset_user": running_user,
                "dataset_parent_key": running_parent,
                "timestamp_queued": job["timestamp"],
                "timestamp_claimed": job["timestamp_lastclaimed"]
            })

        return {
            "running": running,
            "queued": queue
        }

    # no appropriate response
    return False
Generate API response
Checks the type of request, and returns an appropriate response for the type.
Parameters
- str request: Request identifier
- payload: Other data sent with the request
Returns
API response
class
InternalAPIException(builtins.Exception):
Raised if an API request could not be parsed.