backend.lib.proxied_requests
```python
from requests_futures.sessions import FuturesSession
from concurrent.futures import ThreadPoolExecutor

import time
import urllib3
import ural
import requests
from collections import namedtuple
```
```python
class FailedProxiedRequest:
    """
    A delegated request that has failed for whatever reason

    The failure context (usually the exception) is stored in the `context`
    property.
    """

    context = None

    def __init__(self, context=None):
        self.context = context
```
A delegated request that has failed for whatever reason
The failure context (usually the exception) is stored in the `context` property.
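As a rough sketch (the `handler` object and queue contents are placeholders), results coming out of `DelegatedRequestHandler.get_results()` can be checked against this class to see whether a request succeeded:

```python
# hypothetical handler; see DelegatedRequestHandler below
for url, result in handler.get_results():
    if isinstance(result, FailedProxiedRequest):
        # the original exception (e.g. a timeout) is stored in .context
        print(f"{url} failed: {result.context}")
    else:
        print(f"{url} -> HTTP {result.status_code}")
```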
```python
class ProxyStatus:
    """
    An enum of possible statuses of a SophisticatedFuturesProxy
    """

    AVAILABLE = 3
    CLAIMED = 4
    RUNNING = 5
    COOLING_OFF = 6
```
An enum of possible statuses of a SophisticatedFuturesProxy
```python
class SophisticatedFuturesProxy:
    """
    A proxy that can be used in combination with the DelegatedRequestHandler

    This keeps track of cooloffs, etc, to ensure that any individual proxy does
    not request more often than it should. This is a separate class because of
    an additional piece of logic that allows this cooloff to be kept track of
    on a per-hostname basis. This is useful because rate limits are typically
    enforced per site, so we can have (figuratively) unlimited concurrent
    requests as long as each is on a separate hostname, but need to be more
    careful when requesting from a single host.
    """

    log = None
    looping = True

    COOLOFF = 0
    MAX_CONCURRENT_OVERALL = 0
    MAX_CONCURRENT_PER_HOST = 0

    def __init__(
        self, url, log=None, cooloff=3, concurrent_overall=5, concurrent_host=2
    ):
        self.proxy_url = url
        self.hostnames = {}
        self.log = log

        self.COOLOFF = cooloff
        self.MAX_CONCURRENT_OVERALL = concurrent_overall
        self.MAX_CONCURRENT_PER_HOST = concurrent_host

    # ... (methods are documented individually below)
```
A proxy that can be used in combination with the DelegatedRequestHandler
This keeps track of cooloffs, etc, to ensure that any individual proxy does not request more often than it should. This is a separate class because of an additional piece of logic that allows this cooloff to be kept track of on a per-hostname basis. This is useful because rate limits are typically enforced per site, so we can have (figuratively) unlimited concurrent requests as long as each is on a separate hostname, but need to be more careful when requesting from a single host.
```python
    def know_hostname(self, url):
        """
        Make sure the hostname is known to this proxy

        This means that we can now keep track of some per-hostname statistics
        for this hostname. If the hostname is not known yet, the statistics are
        re-initialised.

        :param str url: URL with host name to keep stats for. Case-insensitive.
        :return str: The host name, as parsed for internal use
        """
        hostname = ural.get_hostname(url).lower()
        if hostname not in self.hostnames:
            self.hostnames[hostname] = namedtuple(
                "HostnameForProxiedRequests", ("running",)
            )
            self.hostnames[hostname].running = []

        return hostname
```
Make sure the hostname is known to this proxy
This means that we can now keep track of some per-hostname statistics for this hostname. If the hostname is not known yet, the statistics are re-initialised.
Parameters
- str url: URL with host name to keep stats for. Case-insensitive.
Returns
- str: The host name, as parsed for internal use
```python
    def release_cooled_off(self):
        """
        Release proxies that have finished cooling off.

        Proxies cool off for a certain amount of time after starting a request.
        This method removes cooled off requests, so that new ones may fill
        their slot.
        """
        for hostname, metadata in self.hostnames.copy().items():
            for request in metadata.running:
                if (
                    request.status == ProxyStatus.COOLING_OFF
                    and request.timestamp_finished < time.time() - self.COOLOFF
                ):
                    self.log.debug(
                        f"Releasing proxy {self.proxy_url} for host name {hostname}"
                    )
                    self.hostnames[hostname].running.remove(request)

                    # get rid of hostnames with no running or cooling off
                    # requests, else this might grow indefinitely
                    if len(self.hostnames[hostname].running) == 0:
                        del self.hostnames[hostname]
```
Release proxies that have finished cooling off.
Proxies cool off for a certain amount of time after starting a request. This method removes cooled off requests, so that new ones may fill their slot.
```python
    def claim_for(self, url):
        """
        Try claiming a slot in this proxy for the given URL

        Whether a slot is available depends both on the overall concurrency
        limit, and the per-hostname limit. If both are not maxed out, fill
        the slot and return the proxy object.

        :param str url: URL to proxy a request for.
        :return: `False` if no proxy is available, or the
          `SophisticatedFuturesProxy` object if one is.
        """
        self.release_cooled_off()
        hostname = self.know_hostname(url)

        total_running = sum([len(m.running) for h, m in self.hostnames.items()])
        if total_running >= self.MAX_CONCURRENT_OVERALL:
            return False

        if len(self.hostnames[hostname].running) < self.MAX_CONCURRENT_PER_HOST:
            request = namedtuple(
                "ProxiedRequest",
                ("url", "status", "timestamp_started", "timestamp_finished"),
            )
            request.url = url
            request.status = ProxyStatus.CLAIMED
            request.timestamp_started = 0
            request.timestamp_finished = 0
            self.hostnames[hostname].running.append(request)
            self.log.debug(
                f"Claiming proxy {self.proxy_url} for host name {hostname} ({len(self.hostnames[hostname].running)} of {self.MAX_CONCURRENT_PER_HOST} for host)"
            )
            return self
        else:
            return False
```
Try claiming a slot in this proxy for the given URL
Whether a slot is available depends both on the overall concurrency limit, and the per-hostname limit. If both are not maxed out, fill the slot and return the proxy object.
Parameters
- str url: URL to proxy a request for.
Returns
- `False` if no proxy is available, or the `SophisticatedFuturesProxy` object if one is.
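A minimal sketch of how the two limits interact, assuming the default limits from `__init__` above; the proxy URL and logger are placeholders:

```python
import logging

proxy = SophisticatedFuturesProxy(
    "http://proxy.example:8080",          # hypothetical proxy URL
    log=logging.getLogger("proxy-demo"),  # claim_for() logs when it claims a slot
    cooloff=3, concurrent_overall=5, concurrent_host=2,
)

proxy.claim_for("https://example.com/page-1")  # -> proxy object (slot 1 of 2 for example.com)
proxy.claim_for("https://example.com/page-2")  # -> proxy object (slot 2 of 2)
proxy.claim_for("https://example.com/page-3")  # -> False: per-host limit reached
proxy.claim_for("https://other-site.org/")     # -> proxy object: different host name
```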
```python
    def mark_request_started(self, url):
        """
        Mark a request for a URL as started

        This updates the status for the related slot. If no matching slot
        exists that is waiting for a request to start running, a `ValueError`
        is raised.

        :param str url: URL of the proxied request.
        """
        hostname = self.know_hostname(url)

        for i, metadata in enumerate(self.hostnames[hostname].running):
            if metadata.status == ProxyStatus.CLAIMED and metadata.url == url:
                self.hostnames[hostname].running[i].status = ProxyStatus.RUNNING
                self.hostnames[hostname].running[i].timestamp_started = time.time()
                return

        raise ValueError(f"No proxy is waiting for a request with URL {url} to start!")
```
Mark a request for a URL as started
This updates the status for the related slot. If no matching slot exists that is waiting for a request to start running, a `ValueError` is raised.
Parameters
- str url: URL of the proxied request.
```python
    def mark_request_finished(self, url):
        """
        Mark a request for a URL as finished

        This updates the status for the related slot. If no matching slot
        exists that is waiting for a request to finish, a `ValueError` is
        raised. After this, the proxy will be marked as cooling off, and is
        released after cooling off is completed.

        :param str url: URL of the proxied request.
        """
        hostname = self.know_hostname(url)

        for i, metadata in enumerate(self.hostnames[hostname].running):
            if metadata.status == ProxyStatus.RUNNING and metadata.url == url:
                self.hostnames[hostname].running[i].timestamp_finished = time.time()
                self.hostnames[hostname].running[i].status = ProxyStatus.COOLING_OFF
                return

        raise ValueError(f"No proxy is currently running a request for URL {url}!")
```
Mark a request for a URL as finished
This updates the status for the related slot. If no matching slot exists that is waiting for a request to finish, a `ValueError` is raised. After this, the proxy will be marked as cooling off, and is released after cooling off is completed.
Parameters
- str url: URL of the proxied request.
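Taken together, the lifecycle of a slot looks roughly like this (a sketch, reusing the hypothetical `proxy` from the earlier example):

```python
slot = proxy.claim_for("https://example.com/")        # status: CLAIMED
proxy.mark_request_started("https://example.com/")    # status: RUNNING
proxy.mark_request_finished("https://example.com/")   # status: COOLING_OFF
# after COOLOFF seconds, release_cooled_off() frees the slot for a new request
```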
```python
class DelegatedRequestHandler:
    queue = {}
    session = None
    proxy_pool = {}
    proxy_settings = {}
    halted = set()
    log = None
    index = 0

    # some magic values
    REQUEST_STATUS_QUEUED = 0
    REQUEST_STATUS_STARTED = 1
    REQUEST_STATUS_WAITING_FOR_YIELD = 2
    PROXY_LOCALHOST = "__localhost__"

    def __init__(self, log, config):
        pool = ThreadPoolExecutor()
        self.session = FuturesSession(executor=pool)
        self.log = log

        self.refresh_settings(config)

    def _halt(self, queue_name="_"):
        """
        Interrupt fetching of results

        Can be used when 4CAT is interrupted. Clears queue and tries to cancel
        running requests.

        Note that running requests *cannot* always be cancelled via `.cancel()`,
        particularly when using `stream=True`. It is therefore recommended to
        use `halt_and_wait()`, which blocks until all running requests have
        properly terminated, instead of calling this method directly.

        :param str queue_name: Queue name to stop fetching results for. By
          default, halt all queues.
        """
        self.halted.add(queue_name)

        for queue in self.queue:
            if queue_name == "_" or queue_name == queue:
                # use a list comprehension here to avoid having to modify the
                # list while iterating through it
                for url_metadata in [u for u in self.queue[queue]]:
                    if url_metadata.status != self.REQUEST_STATUS_STARTED:
                        self.queue[queue].remove(url_metadata)
                    else:
                        url_metadata.proxied.request.cancel()

        self.halted.remove(queue_name)

    # ... (the remaining public methods are documented individually below)
```
```python
    def refresh_settings(self, config):
        """
        Load proxy settings

        This is done on demand, so that we do not need a persistent
        configuration reader, which could make things complicated in
        thread-based contexts.

        :param config: Configuration reader
        """

        self.proxy_settings = {
            k: config.get(k)
            for k in (
                "proxies.urls",
                "proxies.cooloff",
                "proxies.concurrent-overall",
                "proxies.concurrent-host",
            )
        }
```
Load proxy settings
This is done on demand, so that we do not need a persistent configuration reader, which could make things complicated in thread-based contexts.
Parameters
- config: Configuration reader
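The configuration reader is only assumed here to expose a `get(key)` method for the keys listed in the code above; the actual reader class is not part of this module. A minimal stand-in, with illustrative values, might look like this:

```python
class StaticConfig:
    """Minimal stand-in for a configuration reader with a get() method."""

    def __init__(self, values):
        self.values = values

    def get(self, key, default=None):
        return self.values.get(key, default)

config = StaticConfig({
    "proxies.urls": ["__localhost__", "http://proxy.example:8080"],  # hypothetical proxy
    "proxies.cooloff": 3,
    "proxies.concurrent-overall": 5,
    "proxies.concurrent-host": 2,
})
```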
```python
    def add_urls(self, urls, queue_name="_", position=-1, **kwargs):
        """
        Add URLs to the request queue

        :param urls: An iterable of URLs.
        :param queue_name: Queue name to add to.
        :param position: Where in queue to insert; -1 adds to end of queue
        :param kwargs: Other keyword arguments will be passed on to
          `requests.get()`
        """
        if queue_name in self.halted or "_" in self.halted:
            # do not add URLs while delegator is shutting down
            return

        if queue_name not in self.queue:
            self.queue[queue_name] = []

        for i, url in enumerate(urls):
            url_metadata = namedtuple(
                "UrlForDelegatedRequest",
                ("url", "kwargs", "index", "status", "proxied"),
            )
            url_metadata.url = url
            url_metadata.kwargs = kwargs
            url_metadata.index = self.index
            url_metadata.proxied = None
            url_metadata.status = self.REQUEST_STATUS_QUEUED
            self.index += 1

            if position == -1:
                self.queue[queue_name].append(url_metadata)
            else:
                self.queue[queue_name].insert(position + i, url_metadata)

        self.manage_requests()
```
Add URLs to the request queue
Parameters
- urls: An iterable of URLs.
- queue_name: Queue name to add to.
- position: Where in queue to insert; -1 adds to end of queue
- kwargs: Other keyword arguments will be passed on to `requests.get()`
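Putting it together, a rough end-to-end sketch; the queue name and URLs are illustrative, and `config` is the stand-in from the `refresh_settings` section above:

```python
import logging
import time

handler = DelegatedRequestHandler(logging.getLogger("delegated"), config)
handler.add_urls(
    ["https://example.com/a", "https://example.com/b", "https://other-site.org/c"],
    queue_name="scrape",
    timeout=10,  # keyword arguments are passed on to requests.get()
)

while handler.get_queue_length("scrape") > 0:
    for url, result in handler.get_results("scrape"):
        ...  # result is a requests.Response or a FailedProxiedRequest (see above)
    time.sleep(0.1)
```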
```python
    def get_queue_length(self, queue_name="_"):
        """
        Get the length of the queue

        :param str queue_name: Queue name
        :return int: Amount of URLs in the queue (regardless of status)
        """
        queue_length = 0
        for queue in self.queue:
            if queue == queue_name or queue_name == "_":
                # "_" acts as a wildcard and counts URLs across all queues
                queue_length += len(self.queue[queue])

        return queue_length
```
Get the length of the queue
Parameters
- str queue_name: Queue name
Returns
- int: Amount of URLs in the queue (regardless of status)
```python
    def claim_proxy(self, url):
        """
        Find a proxy to do the request with

        Finds a `SophisticatedFuturesProxy` that has an open slot for this URL.

        :param str url: URL to proxy a request for
        :return SophisticatedFuturesProxy or False:
        """
        if not self.proxy_pool:
            # this will trigger the first time this method is called
            # build a proxy pool with some information per available proxy
            proxies = self.proxy_settings["proxies.urls"]
            for proxy_url in proxies:
                self.proxy_pool[proxy_url] = namedtuple(
                    "ProxyEntry", ("proxy", "last_used")
                )
                self.proxy_pool[proxy_url].proxy = SophisticatedFuturesProxy(
                    proxy_url,
                    self.log,
                    self.proxy_settings["proxies.cooloff"],
                    self.proxy_settings["proxies.concurrent-overall"],
                    self.proxy_settings["proxies.concurrent-host"],
                )
                self.proxy_pool[proxy_url].last_used = 0

            self.log.debug(f"Proxy pool has {len(self.proxy_pool)} available proxies.")

        # within the pool, find the least recently used proxy that is available
        sorted_by_cooloff = sorted(
            self.proxy_pool, key=lambda p: self.proxy_pool[p].last_used
        )
        for proxy_id in sorted_by_cooloff:
            claimed_proxy = self.proxy_pool[proxy_id].proxy.claim_for(url)
            if claimed_proxy:
                self.proxy_pool[proxy_id].last_used = time.time()
                return claimed_proxy

        return False
```
Find a proxy to do the request with
Finds a `SophisticatedFuturesProxy` that has an open slot for this URL.
Parameters
- str url: URL to proxy a request for
Returns
- `SophisticatedFuturesProxy` with an open slot, or `False` if none is available.
```python
    def manage_requests(self):
        """
        Manage requests asynchronously

        First, make sure proxy status is up to date; then go through the list
        of queued URLs and see if they have been requested, and release the
        proxy accordingly. If the URL is not being requested, and a proxy is
        available, start the request.

        Note that this method does *not* return any requested data. This is
        done in a separate function, which calls this one before returning any
        finished requests in the original queue order (`get_results()`).
        """
        # go through queue and look at the status of each URL
        for queue_name in self.queue:
            for i, url_metadata in enumerate(self.queue[queue_name]):
                url = url_metadata.url

                if url_metadata.status == self.REQUEST_STATUS_WAITING_FOR_YIELD:
                    # waiting to be flushed or passed by `get_result()`
                    continue

                if url_metadata.proxied and url_metadata.proxied.request.done():
                    # collect result and buffer it for yielding
                    # done() here doesn't necessarily mean the request finished
                    # successfully, just that it has returned - a timed out
                    # request will also be done()!
                    self.log.debug(f"Request for {url} finished, collecting result")
                    url_metadata.proxied.proxy.mark_request_finished(url)
                    try:
                        response = url_metadata.proxied.request.result()
                        url_metadata.proxied.result = response

                    except (
                        ConnectionError,
                        requests.exceptions.RequestException,
                        urllib3.exceptions.HTTPError,
                    ) as e:
                        # this is where timeouts, etc, go
                        url_metadata.proxied.result = FailedProxiedRequest(e)

                    finally:
                        # success or fail, we can pass it on
                        url_metadata.status = self.REQUEST_STATUS_WAITING_FOR_YIELD

                else:
                    # running - ignore for now
                    # could do some health checks here...
                    # logging.debug(f"Request for {url} running...")
                    pass

                if not url_metadata.proxied and not (
                    queue_name in self.halted or "_" in self.halted
                ):
                    # no request running for this URL yet, try to start one
                    proxy = self.claim_proxy(url)
                    if not proxy:
                        # no available proxies, try again next loop
                        continue

                    proxy_url = proxy.proxy_url
                    proxy_definition = (
                        {"http": proxy_url, "https": proxy_url}
                        if proxy_url != self.PROXY_LOCALHOST
                        else None
                    )

                    # start request for URL
                    self.log.debug(f"Request for {url} started")
                    request = namedtuple(
                        "DelegatedRequest",
                        (
                            "request",
                            "created",
                            "result",
                            "proxy",
                            "url",
                            "index",
                        ),
                    )
                    request.created = time.time()
                    request.request = self.session.get(
                        **{
                            "url": url,
                            "timeout": 30,
                            "proxies": proxy_definition,
                            **url_metadata.kwargs,
                        }
                    )

                    request.proxy = proxy
                    request.url = url
                    request.index = (
                        url_metadata.index
                    )  # this is to allow for multiple requests for the same URL

                    url_metadata.status = self.REQUEST_STATUS_STARTED
                    url_metadata.proxied = request

                    proxy.mark_request_started(url)

                self.queue[queue_name][i] = url_metadata
```
Manage requests asynchronously
First, make sure proxy status is up to date; then go through the list of queued URLs and see if they have been requested, and release the proxy accordingly. If the URL is not being requested, and a proxy is available, start the request.
Note that this method does *not* return any requested data. This is done in a separate function, which calls this one before returning any finished requests in the original queue order (`get_results()`).
```python
    def get_results(self, queue_name="_", preserve_order=True):
        """
        Return available results, without skipping

        Loops through the queue, returning values (and updating the queue) for
        requests that have been finished. If a request is not finished yet,
        stop returning. This ensures that in the end, values are only ever
        returned in the original queue order, at the cost of potential
        buffering.

        :param str queue_name: Queue name to get results from
        :param bool preserve_order: Return results in the order they were
          added to the queue. This means that other results are buffered and
          potentially remain in the queue, which may in the worst case
          significantly slow down data collection. For example, if the first
          request in the queue takes a really long time while all other
          requests are already finished, the queue will nevertheless remain
          'full'.

        :return: Generator yielding (url, result) tuples; the result is either
          a `requests.Response` or a `FailedProxiedRequest`.
        """
        self.manage_requests()

        # no results, no return
        if queue_name not in self.queue:
            return

        # use list comprehensions here to avoid having to modify the
        # lists while iterating through them
        for url_metadata in [u for u in self.queue[queue_name]]:
            # for each URL in the queue...
            if url_metadata.status == self.REQUEST_STATUS_WAITING_FOR_YIELD:
                # see if a finished request is available...
                self.queue[queue_name].remove(url_metadata)
                yield url_metadata.url, url_metadata.proxied.result

            elif preserve_order:
                # ...but as soon as a URL has no finished result, return
                # unless we don't care about the order, then continue and yield
                # as much as possible
                return
```
Return available results, without skipping
Loops through the queue, returning values (and updating the queue) for requests that have been finished. If a request is not finished yet, stop returning. This ensures that in the end, values are only ever returned in the original queue order, at the cost of potential buffering.
Parameters
- str queue_name: Queue name to get results from
- bool preserve_order: Return results in the order they were added to the queue. This means that other results are buffered and potentially remain in the queue, which may in the worst case significantly slow down data collection. For example, if the first request in the queue takes a really long time while all other requests are already finished, the queue will nevertheless remain 'full'.
Returns
- Generator yielding (url, result) tuples; the result is either a `requests.Response` or a `FailedProxiedRequest`.
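For instance, when order does not matter, passing `preserve_order=False` lets finished requests through even while an earlier, slower request is still running (a sketch, reusing the `handler` from the earlier example; `process()` is a placeholder):

```python
# yields whatever has finished so far, regardless of queue position
for url, result in handler.get_results("scrape", preserve_order=False):
    process(url, result)
```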
```python
    def halt_and_wait(self, queue_name="_"):
        """
        Cancel any queued requests and wait until ongoing ones are finished

        Blocking!

        :param str queue_name: Queue name to stop fetching results for. By
          default, halt all queues.
        """
        self._halt(queue_name)
        while self.get_queue_length(queue_name) > 0:
            # exhaust generator without doing something w/ results
            all(self.get_results(queue_name, preserve_order=False))

        if queue_name in self.queue:
            del self.queue[queue_name]
```
Cancel any queued requests and wait until ongoing ones are finished
Blocking!
Parameters
- str queue_name: Queue name to stop fetching results for. By default, halt all queues.
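A typical pattern, for instance when a worker is interrupted, is to make sure this always runs (a sketch; `collect_everything` stands in for the actual collection logic):

```python
try:
    collect_everything(handler)  # placeholder for the code adding URLs and reading results
finally:
    # cancel queued requests and block until running ones have terminated
    handler.halt_and_wait("scrape")
```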