backend.lib.proxied_requests
"""
Delegated HTTP requests, distributed over a pool of rate-limited proxies.
"""
from requests_futures.sessions import FuturesSession
from concurrent.futures import ThreadPoolExecutor

import time
import urllib3
import ural
import requests

from collections import namedtuple
from asyncio import CancelledError as asyncioCancelledError
from concurrent.futures import CancelledError as futureCancelledError


class FailedProxiedRequest:
    """
    A delegated request that has failed for whatever reason

    The failure context (usually the exception) is stored in the `context`
    property. We also keep track of the proxy URL that serviced the request so
    downstream consumers can make informed retry decisions.
    """

    context = None
    proxy_url = None

    def __init__(self, context=None, proxy_url=None):
        self.context = context
        self.proxy_url = proxy_url


class NoProxiesAvailableError(Exception):
    """
    Raised when all proxies are unhealthy and localhost fallback is disabled
    """
    pass


class ProxyStatus:
    """
    An enum of possible statuses of a SophisticatedFuturesProxy
    """

    AVAILABLE = 3
    CLAIMED = 4
    RUNNING = 5
    COOLING_OFF = 6


class SophisticatedFuturesProxy:
    """
    A proxy that can be used in combination with the DelegatedRequestHandler

    This keeps track of cooloffs, etc, to ensure that any individual proxy does
    not request more often than it should. This is a separate class because of
    an additional piece of logic that allows this cooloff to be kept track of
    on a per-hostname basis. This is useful because rate limits are typically
    enforced per site, so we can have (figuratively) unlimited concurrent
    requests as long as each is on a separate hostname, but need to be more
    careful when requesting from a single host.
    """

    log = None
    looping = True

    COOLOFF = 0
    MAX_CONCURRENT_OVERALL = 0
    MAX_CONCURRENT_PER_HOST = 0

    def __init__(
        self, url, log=None, cooloff=3, concurrent_overall=5, concurrent_host=2
    ):
        self.proxy_url = url
        self.hostnames = {}
        self.log = log

        self.COOLOFF = cooloff
        self.MAX_CONCURRENT_OVERALL = concurrent_overall
        self.MAX_CONCURRENT_PER_HOST = concurrent_host

    def know_hostname(self, url):
        """
        Make sure the hostname is known to this proxy

        This means that we can now keep track of some per-hostname statistics
        for this hostname. If the hostname is not known yet, the statistics are
        initialised.

        :param str url:  URL with host name to keep stats for. Case-insensitive.
        :return str:  The host name, as parsed for internal use. URLs without
        a parseable host name are all tracked under the empty string.
        """
        # get_hostname() can return None for URLs without a host; calling
        # .lower() on that would raise AttributeError
        hostname = (ural.get_hostname(url) or "").lower()
        if hostname not in self.hostnames:
            # the namedtuple *class* is used as a simple mutable namespace
            self.hostnames[hostname] = namedtuple(
                "HostnameForProxiedRequests", ("running",)
            )
            self.hostnames[hostname].running = []

        return hostname

    def has_active_requests(self):
        """
        Check if this proxy has any truly active requests.

        Active means CLAIMED or RUNNING status - not COOLING_OFF.
        COOLING_OFF requests are essentially finished and waiting to be released,
        so they shouldn't prevent proxy removal.

        :return bool: True if proxy has requests that are claimed or running
        """
        # iterate the values directly; unpacking each value into
        # (hostname, metadata) raises a TypeError
        for metadata in self.hostnames.values():
            for request in metadata.running:
                if request.status in (ProxyStatus.CLAIMED, ProxyStatus.RUNNING):
                    return True
        return False

    def release_cooled_off(self):
        """
        Release proxies that have finished cooling off.

        Proxies cool off for a certain amount of time after finishing a
        request. This method removes cooled off requests, so that new ones may
        fill their slot.
        """
        cutoff = time.time() - self.COOLOFF
        for hostname, metadata in list(self.hostnames.items()):
            # loop over a copy: removing from a list while iterating over it
            # skips the element that follows each removed one
            for request in list(metadata.running):
                if (
                    request.status == ProxyStatus.COOLING_OFF
                    and request.timestamp_finished < cutoff
                ):
                    self.log.debug(
                        f"Releasing proxy {self.proxy_url} for host name {hostname}"
                    )
                    metadata.running.remove(request)

            # get rid of hostnames with no running or cooling off
            # requests, else this might grow indefinitely
            if not metadata.running:
                del self.hostnames[hostname]

    def claim_for(self, url):
        """
        Try claiming a slot in this proxy for the given URL

        Whether a slot is available depends both on the overall concurrency
        limit, and the per-hostname limit. If both are not maxed out, fill
        the slot and return the proxy object.

        :param str url:  URL to proxy a request for.
        :return: `False` if no proxy is available, or the
        `SophisticatedFuturesProxy` object if one is.
        """
        self.release_cooled_off()
        hostname = self.know_hostname(url)

        total_running = sum(len(m.running) for m in self.hostnames.values())
        if total_running >= self.MAX_CONCURRENT_OVERALL:
            return False

        if len(self.hostnames[hostname].running) < self.MAX_CONCURRENT_PER_HOST:
            request = namedtuple(
                "ProxiedRequest",
                ("url", "status", "timestamp_started", "timestamp_finished"),
            )
            request.url = url
            request.status = ProxyStatus.CLAIMED
            request.timestamp_started = 0
            request.timestamp_finished = 0
            self.hostnames[hostname].running.append(request)
            self.log.debug(
                f"Claiming proxy {self.proxy_url} for host name {hostname} ({len(self.hostnames[hostname].running)} of {self.MAX_CONCURRENT_PER_HOST} for host)"
            )
            return self
        else:
            return False

    def mark_request_started(self, url):
        """
        Mark a request for a URL as started

        This updates the status for the related slot. If no matching slot
        exists that is waiting for a request to start running, a `ValueError`
        is raised.

        :param str url:  URL of the proxied request.
        :raises ValueError:  If no claimed slot exists for this URL.
        """
        hostname = self.know_hostname(url)

        for metadata in self.hostnames[hostname].running:
            if metadata.status == ProxyStatus.CLAIMED and metadata.url == url:
                metadata.status = ProxyStatus.RUNNING
                metadata.timestamp_started = time.time()
                return

        raise ValueError(f"No proxy is waiting for a request with URL {url} to start!")

    def mark_request_finished(self, url):
        """
        Mark a request for a URL as finished

        This updates the status for the related slot. If no matching slot
        exists that is waiting for a request to finish, a `ValueError` is
        raised. After this, the proxy will be marked as cooling off, and is
        released after cooling off is completed.

        :param str url:  URL of the proxied request.
        :raises ValueError:  If no running slot exists for this URL.
        """
        hostname = self.know_hostname(url)

        for metadata in self.hostnames[hostname].running:
            if metadata.status == ProxyStatus.RUNNING and metadata.url == url:
                metadata.timestamp_finished = time.time()
                metadata.status = ProxyStatus.COOLING_OFF
                return

        raise ValueError(f"No proxy is currently running a request for URL {url}!")


class DelegatedRequestHandler:
    """
    Queue URLs and request them asynchronously through a pool of proxies

    URLs are added with `add_urls()`, requested via a `FuturesSession` using
    whichever `SophisticatedFuturesProxy` has a free slot, and handed back
    through `get_results()`.
    """

    # NOTE(review): these mutable containers live on the class and are never
    # rebound in __init__, so they are shared by every instance of this class.
    # Left as-is because it is unclear from this file whether that sharing is
    # relied upon - confirm before making them per-instance.
    queue = {}
    session = None
    proxy_pool = {}
    proxy_settings = {}
    halted = set()
    log = None
    index = 0

    # some magic values
    REQUEST_STATUS_QUEUED = 0
    REQUEST_STATUS_STARTED = 1
    REQUEST_STATUS_WAITING_FOR_YIELD = 2
    PROXY_LOCALHOST = "__localhost__"

    def __init__(self, log, config):
        """
        Set up the request session and load proxy settings

        :param log:  Logger used for status messages
        :param config:  Configuration reader, passed to `refresh_settings()`
        """
        pool = ThreadPoolExecutor()
        self.session = FuturesSession(executor=pool)
        self.log = log

        # Proxy health tracking
        self.proxy_health = {}
        self.proxy_warnings_logged = set()
        self.all_proxies_failed_logged = False

        # Track proxies that should be removed but have active requests
        self.proxies_pending_removal = set()

        self.refresh_settings(config)

    def refresh_settings(self, config):
        """
        Load proxy settings

        This is done on demand, so that we do not need a persistent
        configuration reader, which could make things complicated in
        thread-based contexts.

        Initializes proxy pool or updates if URLs have changed, preserving proxies with active requests.

        :param config:  Configuration reader
        """
        # Load new settings
        new_proxy_settings = {
            k: config.get(k) for k in ("proxies.urls", "proxies.cooloff", "proxies.concurrent-overall",
                                       "proxies.concurrent-host", "proxies.allow-localhost-fallback")
        }

        # Normalize empty config to localhost
        if not new_proxy_settings["proxies.urls"]:
            self.log.warning(
                "No proxies configured (proxies.urls is empty or None). "
                "Using localhost (direct connection) for all requests. "
                "Set Proxy URLs in setting to [\"__localhost__\"] to disable this warning."
            )
            new_proxy_settings["proxies.urls"] = [self.PROXY_LOCALHOST]

        # Check if proxy URLs have changed
        proxy_urls_changed = (not hasattr(self, 'proxy_settings') or
                              self.proxy_settings.get("proxies.urls") != new_proxy_settings["proxies.urls"])

        # Update settings
        self.proxy_settings = new_proxy_settings

        # Reset proxy health tracking when settings are refreshed
        num_proxies_restored = len([p for p in self.proxy_health if not self.proxy_health[p]])
        self.proxy_health = {}
        self.proxy_warnings_logged = set()
        self.all_proxies_failed_logged = False

        if num_proxies_restored > 0:
            self.log.debug(f"Proxy health reset: {num_proxies_restored} proxies restored")

        # Initialize or update proxy pool
        if not self.proxy_pool:
            # First initialization
            self._initialize_proxy_pool()
        elif proxy_urls_changed:
            # Settings changed - update pool
            self._update_proxy_pool()

    @staticmethod
    def _wrap_response_hook(hook, original_url):
        """
        Wrap a response hook so it receives the originally requested URL

        :param hook:  The response hook callable to wrap
        :param str original_url:  URL as it was added to the queue
        :return:  Callable that adds a `fourcat_original_url` keyword argument
        (unless already present) before calling `hook`
        """
        def _wrapped(resp, *a, **k):
            # Inject FourCAT-specific original URL in kwargs once
            if "fourcat_original_url" not in k:
                k["fourcat_original_url"] = original_url
            return hook(resp, *a, **k)

        return _wrapped

    def add_urls(self, urls, queue_name="_", position=-1, **kwargs):
        """
        Add URLs to the request queue

        :param urls:  An iterable of URLs.
        :param queue_name:  Queue name to add to.
        :param position:  Where in queue to insert; -1 adds to end of queue
        :param kwargs: Other keyword arguments will be passed on to
        `requests.get()`
        """
        if queue_name in self.halted or "_" in self.halted:
            # do not add URLs while delegator is shutting down
            return

        if queue_name not in self.queue:
            self.queue[queue_name] = []

        for i, url in enumerate(urls):
            # the namedtuple *class* is used as a simple mutable namespace
            url_metadata = namedtuple(
                "UrlForDelegatedRequest", ("url", "args", "status", "proxied")
            )
            url_metadata.url = url
            # Make a per-URL copy of kwargs to avoid shared mutation across entries
            per_kwargs = dict(kwargs)
            url_metadata.index = self.index
            url_metadata.proxied = None
            url_metadata.status = self.REQUEST_STATUS_QUEUED
            self.index += 1

            # If a response hook is provided, wrap it to inject the original URL
            try:
                hooks = per_kwargs.get("hooks")
                if hooks and "response" in hooks:
                    # Copy hooks dict to avoid shared mutation
                    hooks = {**hooks}
                    original_hook = hooks["response"]

                    # Support a single callable or a list of callables
                    if isinstance(original_hook, list):
                        hooks["response"] = [
                            self._wrap_response_hook(h, url) for h in original_hook
                        ]
                    else:
                        hooks["response"] = self._wrap_response_hook(original_hook, url)

                    # Persist modified hooks back into per-URL kwargs
                    per_kwargs["hooks"] = hooks
            except Exception as e:
                # Wrapping is best-effort: proceed with the hooks as provided,
                # but leave a trace instead of failing silently
                self.log.warning(
                    f"Could not wrap response hook for {url}, using hooks as provided: {e}"
                )

            # Assign the isolated kwargs to this metadata entry
            url_metadata.kwargs = per_kwargs

            if position == -1:
                self.queue[queue_name].append(url_metadata)
            else:
                self.queue[queue_name].insert(position + i, url_metadata)

        self.manage_requests()

    def get_queue_length(self, queue_name="_"):
        """
        Get the length of the queue

        :param str queue_name:  Queue name; `_` counts the URLs in all queues
        :return int: Amount of URLs in the queue (regardless of status)
        """
        queue_length = 0
        for queue in self.queue:
            if queue == queue_name or queue_name == "_":
                # index with the loop variable, not the requested name, so
                # that the `_` wildcard counts each queue exactly once
                queue_length += len(self.queue[queue])

        return queue_length

    def _update_proxy_pool(self):
        """
        Update proxy pool when settings change

        Adds new proxies, removes old ones, and preserves proxies with active requests.
        Updates settings for existing proxies that remain in the configuration.
        """
        new_proxy_urls = set(self.proxy_settings["proxies.urls"])
        current_proxy_urls = set(self.proxy_pool.keys())

        # Find proxies to add and remove
        proxies_to_add = new_proxy_urls - current_proxy_urls
        proxies_to_remove = current_proxy_urls - new_proxy_urls
        proxies_to_keep = current_proxy_urls & new_proxy_urls

        # Remove proxies that are no longer in config
        for proxy_url in proxies_to_remove:
            if proxy_url in self.proxy_pool:
                proxy_obj = self.proxy_pool[proxy_url].proxy
                # Check if proxy has truly active requests (CLAIMED or RUNNING, not COOLING_OFF)
                if not proxy_obj.has_active_requests():
                    # No active requests - remove immediately
                    del self.proxy_pool[proxy_url]
                    if proxy_url in self.proxy_health:
                        del self.proxy_health[proxy_url]
                    self.proxies_pending_removal.discard(proxy_url)
                    self.log.info(f"Removed proxy {proxy_url} (no longer in config)")
                else:
                    # Has active requests - mark for removal but don't delete yet
                    self.proxies_pending_removal.add(proxy_url)
                    self.log.warning(f"Proxy {proxy_url} marked for removal (has active requests, will not accept new requests)")

        # Update settings for existing proxies
        for proxy_url in proxies_to_keep:
            if proxy_url in self.proxy_pool:
                # a proxy that was pending removal but is configured again
                # must accept requests again instead of being deleted later
                self.proxies_pending_removal.discard(proxy_url)

                proxy_obj = self.proxy_pool[proxy_url].proxy
                proxy_obj.COOLOFF = self.proxy_settings["proxies.cooloff"]
                proxy_obj.MAX_CONCURRENT_OVERALL = self.proxy_settings["proxies.concurrent-overall"]
                proxy_obj.MAX_CONCURRENT_PER_HOST = self.proxy_settings["proxies.concurrent-host"]

        # Add new proxies
        for proxy_url in proxies_to_add:
            self._add_proxy_to_pool(proxy_url)
            self.log.info(f"Added new proxy {proxy_url}")

        if proxies_to_add or proxies_to_remove:
            self.log.info(f"Proxy pool updated: {len(self.proxy_pool)} total proxies "
                          f"({len(proxies_to_add)} added, {len([p for p in proxies_to_remove if p not in self.proxy_pool])} removed)")

    def _add_proxy_to_pool(self, proxy_url):
        """
        Add a proxy to the pool

        Helper method to avoid duplicating proxy creation logic.

        :param str proxy_url: The proxy URL to add (or PROXY_LOCALHOST for direct connection)
        """
        self.proxy_pool[proxy_url] = namedtuple("ProxyEntry", ("proxy", "last_used"))
        self.proxy_pool[proxy_url].proxy = SophisticatedFuturesProxy(
            proxy_url,
            self.log,
            self.proxy_settings.get("proxies.cooloff", 0.1),
            self.proxy_settings.get("proxies.concurrent-overall", 5),
            self.proxy_settings.get("proxies.concurrent-host", 2),
        )
        self.proxy_pool[proxy_url].last_used = 0
        self.proxy_health[proxy_url] = True

    def claim_proxy(self, url):
        """
        Find a proxy to do the request with

        Finds a `SophisticatedFuturesProxy` that has an open slot for this URL.

        :param str url:  URL to proxy a request for
        :return SophisticatedFuturesProxy or None:
            - SophisticatedFuturesProxy if a proxy is available
            - None if all proxies are busy (retry later)
        :raises NoProxiesAvailableError: If all proxies unhealthy and fallback disabled
        """
        # Get healthy proxies (excluding those pending removal)
        healthy_proxies = [p for p in self.proxy_pool
                           if self.proxy_health.get(p, True)
                           and p not in self.proxies_pending_removal]

        # If no healthy proxies, check if we should create localhost
        if not healthy_proxies:
            allow_localhost = self.proxy_settings.get("proxies.allow-localhost-fallback", True)

            if allow_localhost and self.PROXY_LOCALHOST not in self.proxy_pool:
                # Create localhost as fallback
                if not self.all_proxies_failed_logged:
                    self.all_proxies_failed_logged = True
                    self.log.error("All configured proxies are unhealthy. Falling back to localhost (direct connection).")
                self._add_proxy_to_pool(self.PROXY_LOCALHOST)
                self.log.info("Created localhost proxy for direct connections")
                healthy_proxies = [self.PROXY_LOCALHOST]
            elif not allow_localhost:
                # Fallback disabled - fail
                if not self.all_proxies_failed_logged:
                    self.all_proxies_failed_logged = True
                    self.log.error("All proxies are unhealthy and localhost fallback is disabled.")
                raise NoProxiesAvailableError("All proxies unhealthy and localhost fallback disabled")
            else:
                # Localhost exists but unhealthy
                raise NoProxiesAvailableError("All proxies including localhost are unhealthy")

        # within the pool, find the least recently used healthy proxy that is available
        sorted_by_cooloff = sorted(
            healthy_proxies, key=lambda p: self.proxy_pool[p].last_used
        )
        for proxy_id in sorted_by_cooloff:
            claimed_proxy = self.proxy_pool[proxy_id].proxy.claim_for(url)
            if claimed_proxy:
                self.proxy_pool[proxy_id].last_used = time.time()
                return claimed_proxy

        # All proxies busy
        return None

    def _initialize_proxy_pool(self):
        """
        Initialize the proxy pool with configured proxies

        Called once during refresh_settings(). If no proxies configured,
        adds localhost. Otherwise adds configured proxies.
        """
        proxies = self.proxy_settings.get("proxies.urls", [])

        # Handle empty/None proxy configuration - add localhost
        if not proxies:
            self.log.warning(
                "No proxies configured (proxies.urls is empty or None). "
                "Using localhost (direct connection) for all requests. "
                "Set Proxy URLs in setting to [\"__localhost__\"] to disable this warning."
            )
            self._add_proxy_to_pool(self.PROXY_LOCALHOST)
            return

        # Initialize configured proxies
        for proxy_url in proxies:
            self._add_proxy_to_pool(proxy_url)

        self.log.debug(f"Proxy pool initialized with {len(self.proxy_pool)} proxies.")

    def manage_requests(self):
        """
        Manage requests asynchronously

        First, make sure proxy status is up to date; then go through the list
        of queued URLs and see if they have been requested, and release the
        proxy accordingly. If the URL is not being requested, and a proxy is
        available, start the request.

        Note that this method does *not* return any requested data. This is
        done in a separate function, which calls this one before returning any
        finished requests in the original queue order (`get_results()`).
        """
        # go through queue and look at the status of each URL
        for queue_name in self.queue:
            for i, url_metadata in enumerate(self.queue[queue_name]):
                url = url_metadata.url

                if url_metadata.status == self.REQUEST_STATUS_WAITING_FOR_YIELD:
                    # waiting to be flushed or passed by `get_result()`
                    continue

                if url_metadata.proxied and url_metadata.proxied.request.done():
                    # collect result and buffer it for yielding
                    # done() here doesn't necessarily mean the request finished
                    # successfully, just that it has returned - a timed out
                    # request will also be done()!
                    self.log.debug(f"Request for {url} finished, collecting result")
                    url_metadata.proxied.proxy.mark_request_finished(url)

                    # Clean up proxies pending removal if they have no more active requests
                    proxy_url = url_metadata.proxied.proxy.proxy_url
                    if proxy_url in self.proxies_pending_removal:
                        proxy_obj = self.proxy_pool[proxy_url].proxy
                        # Check if proxy has truly active requests (not just cooling off)
                        if not proxy_obj.has_active_requests():
                            del self.proxy_pool[proxy_url]
                            if proxy_url in self.proxy_health:
                                del self.proxy_health[proxy_url]
                            self.proxies_pending_removal.discard(proxy_url)
                            self.log.info(f"Removed proxy {proxy_url} (completed all active requests)")
                    try:
                        response = url_metadata.proxied.request.result()
                        # annotate the response so processors can see which
                        # proxy (if any) handled the request
                        setattr(
                            response,
                            "_4cat_proxy",
                            url_metadata.proxied.proxy.proxy_url,
                        )
                        url_metadata.proxied.result = response

                    except requests.exceptions.ProxyError as e:
                        # Proxy connection issue - mark proxy as unhealthy and requeue
                        proxy_url = url_metadata.proxied.proxy.proxy_url

                        # Mark this proxy as unhealthy
                        self.proxy_health[proxy_url] = False

                        # Log warning once per proxy
                        if proxy_url not in self.proxy_warnings_logged:
                            self.proxy_warnings_logged.add(proxy_url)
                            self.log.warning(
                                f"Proxy {proxy_url} marked as unhealthy due to connection failure: {str(e)}"
                            )

                        # Reset URL to queued so it will retry with a different proxy
                        url_metadata.status = self.REQUEST_STATUS_QUEUED
                        url_metadata.proxied = None
                        # Don't set to WAITING_FOR_YIELD - let it retry in the normal flow

                    except (
                        ConnectionError,
                        asyncioCancelledError,
                        futureCancelledError,
                        requests.exceptions.RequestException,
                        urllib3.exceptions.HTTPError,
                    ) as e:
                        # this is where timeouts, etc, go
                        url_metadata.proxied.result = FailedProxiedRequest(
                            e, url_metadata.proxied.proxy.proxy_url
                        )

                    finally:
                        # success or fail, we can pass it on
                        # Only set to waiting if not requeued by ProxyError handler
                        if url_metadata.status != self.REQUEST_STATUS_QUEUED:
                            url_metadata.status = self.REQUEST_STATUS_WAITING_FOR_YIELD

                else:
                    # running - ignore for now
                    # could do some health checks here...
                    pass

                if not url_metadata.proxied and not (
                    queue_name in self.halted or "_" in self.halted
                ):
                    # no request running for this URL yet, try to start one
                    try:
                        proxy = self.claim_proxy(url)
                    except NoProxiesAvailableError as e:
                        # All proxies failed and fallback disabled - fail this request
                        url_metadata.proxied = namedtuple(
                            "DelegatedRequest",
                            ("request", "created", "result", "proxy", "url", "index"),
                        )
                        url_metadata.proxied.request = None
                        url_metadata.proxied.created = time.time()
                        url_metadata.proxied.result = FailedProxiedRequest(e, None)
                        url_metadata.proxied.proxy = None
                        url_metadata.proxied.url = url
                        url_metadata.proxied.index = url_metadata.index
                        url_metadata.status = self.REQUEST_STATUS_WAITING_FOR_YIELD
                        self.queue[queue_name][i] = url_metadata
                        continue

                    if proxy is None:
                        # No proxy available (all busy)
                        # Try again next loop iteration
                        continue

                    proxy_url = proxy.proxy_url
                    proxy_definition = (
                        {"http": proxy_url, "https": proxy_url}
                        if proxy_url != self.PROXY_LOCALHOST
                        else None
                    )

                    # start request for URL
                    self.log.debug(f"Request for {url} started")
                    request = namedtuple(
                        "DelegatedRequest",
                        (
                            "request",
                            "created",
                            "result",
                            "proxy",
                            "url",
                            "index",
                        ),
                    )
                    request.created = time.time()
                    # per-URL kwargs come last so they can override the defaults
                    request.request = self.session.get(
                        **{
                            "url": url,
                            "timeout": 30,
                            "proxies": proxy_definition,
                            **url_metadata.kwargs
                        }
                    )

                    request.proxy = proxy
                    request.url = url
                    request.index = (
                        url_metadata.index
                    )  # this is to allow for multiple requests for the same URL

                    url_metadata.status = self.REQUEST_STATUS_STARTED
                    url_metadata.proxied = request

                    proxy.mark_request_started(url)

                    self.queue[queue_name][i] = url_metadata

    def get_results(self, queue_name="_", preserve_order=True):
        """
        Return available results, without skipping

        Loops through the queue, returning values (and updating the queue) for
        requests that have been finished. If a request is not finished yet,
        stop returning. This ensures that in the end, values are only ever
        returned in the original queue order, at the cost of potential
        buffering.

        :param str queue_name:  Queue name to get results from
        :param bool preserve_order:  Return results in the order they were
        added to the queue. This means that other results are buffered and
        potentially remain in the queue, which may in the worst case
        significantly slow down data collection. For example, if the first
        request in the queue takes a really long time while all other
        requests are already finished, the queue will nevertheless remain
        'full'.

        :return:  Generator yielding `(url, result)` tuples, where the result
        is either the response or a `FailedProxiedRequest`
        """
        self.manage_requests()

        # no results, no return
        if queue_name not in self.queue:
            return

        # iterate over a copy to avoid having to modify the list while
        # iterating through it
        for url_metadata in list(self.queue[queue_name]):
            # for each URL in the queue...
            if url_metadata.status == self.REQUEST_STATUS_WAITING_FOR_YIELD:
                # see if a finished request is available...
                self.queue[queue_name].remove(url_metadata)
                yield url_metadata.url, url_metadata.proxied.result

            elif preserve_order:
                # ...but as soon as a URL has no finished result, return
                # unless we don't care about the order, then continue and yield
                # as much as possible
                return

    def _halt(self, queue_name="_"):
        """
        Interrupt fetching of results

        Can be used when 4CAT is interrupted. Clears queue and tries to cancel
        running requests.

        Note that running requests *cannot* always be cancelled via `.cancel()`
        particularly when using `stream=True`. It is therefore recommended to
        use `halt_and_wait()` which is blocking until all running requests have
        properly terminated, instead of calling this method directly.

        :param str queue_name:  Queue name to stop fetching results for. By
        default, halt all queues.
        """
        self.halted.add(queue_name)

        try:
            for queue in self.queue:
                if queue_name == "_" or queue_name == queue:
                    # iterate over a copy to avoid having to modify the list
                    # while iterating through it
                    for url_metadata in list(self.queue[queue]):
                        if url_metadata.status != self.REQUEST_STATUS_STARTED:
                            self.queue[queue].remove(url_metadata)
                        else:
                            url_metadata.proxied.request.cancel()
        finally:
            # never leave the queue flagged as halted, even if cancelling
            # fails, or no URLs could be added to it ever again
            self.halted.discard(queue_name)

    def halt_and_wait(self, queue_name="_"):
        """
        Cancel any queued requests and wait until ongoing ones are finished

        Blocking!

        :param str queue_name:  Queue name to stop fetching results for. By
        default, halt all queues.
        """
        def matching_queues():
            # `_` addresses every queue, consistent with `_halt()`
            return [q for q in self.queue if queue_name == "_" or q == queue_name]

        self._halt(queue_name)
        while self.get_queue_length(queue_name) > 0:
            for queue in matching_queues():
                # exhaust generator without doing something w/ results
                for _ in self.get_results(queue, preserve_order=False):
                    pass

            if self.get_queue_length(queue_name) > 0:
                # requests still running; wait briefly instead of spinning
                time.sleep(0.05)

        # all matching queues have been drained at this point
        for queue in matching_queues():
            del self.queue[queue]
class FailedProxiedRequest:
    """
    Result placeholder for a delegated request that did not succeed

    `context` holds whatever describes the failure (usually the exception
    that was raised); `proxy_url` holds the URL of the proxy that serviced
    the request, so that consumers can decide how to retry.
    """

    # class-level fallbacks; both are shadowed per instance in __init__
    proxy_url = None
    context = None

    def __init__(self, context=None, proxy_url=None):
        self.context, self.proxy_url = context, proxy_url
A delegated request that has failed for whatever reason
The failure context (usually the exception) is stored in the context
property. We also keep track of the proxy URL that serviced the request so
downstream consumers can make informed retry decisions.
class NoProxiesAvailableError(Exception):
    """
    Signals that no proxy can service a request

    Raised when all proxies are unhealthy and localhost fallback is disabled.
    """
Raised when all proxies are unhealthy and localhost fallback is disabled
class ProxyStatus:
    """
    Plain-integer status constants for a slot in a SophisticatedFuturesProxy
    """

    # consecutive integers starting at 3, in this exact order
    AVAILABLE, CLAIMED, RUNNING, COOLING_OFF = range(3, 7)
An enum of possible statuses of a SophisticatedFuturesProxy
class SophisticatedFuturesProxy:
    """
    A proxy that can be used in combination with the DelegatedRequestHandler

    This keeps track of cooloffs, etc, to ensure that any individual proxy does
    not request more often than it should. This is a separate class because of
    an additional piece of logic that allows this cooloff to be kept track of
    on a per-hostname basis. This is useful because rate limits are typically
    enforced per site, so we can have (figuratively) unlimited concurrent
    requests as long as each is on a separate hostname, but need to be more
    careful when requesting from a single host.
    """

    log = None
    looping = True

    COOLOFF = 0
    MAX_CONCURRENT_OVERALL = 0
    MAX_CONCURRENT_PER_HOST = 0

    def __init__(
        self, url, log=None, cooloff=3, concurrent_overall=5, concurrent_host=2
    ):
        self.proxy_url = url
        self.hostnames = {}
        self.log = log

        self.COOLOFF = cooloff
        self.MAX_CONCURRENT_OVERALL = concurrent_overall
        self.MAX_CONCURRENT_PER_HOST = concurrent_host

    def know_hostname(self, url):
        """
        Make sure the hostname is known to this proxy

        This means that we can now keep track of some per-hostname statistics
        for this hostname. If the hostname is not known yet, the statistics are
        initialised.

        :param str url:  URL with host name to keep stats for. Case-insensitive.
        :return str:  The host name, as parsed for internal use. URLs without
        a parseable host name are all tracked under the empty string.
        """
        # get_hostname() can return None for URLs without a host; calling
        # .lower() on that would raise AttributeError
        hostname = (ural.get_hostname(url) or "").lower()
        if hostname not in self.hostnames:
            # the namedtuple *class* is used as a simple mutable namespace
            self.hostnames[hostname] = namedtuple(
                "HostnameForProxiedRequests", ("running",)
            )
            self.hostnames[hostname].running = []

        return hostname

    def has_active_requests(self):
        """
        Check if this proxy has any truly active requests.

        Active means CLAIMED or RUNNING status - not COOLING_OFF.
        COOLING_OFF requests are essentially finished and waiting to be released,
        so they shouldn't prevent proxy removal.

        :return bool: True if proxy has requests that are claimed or running
        """
        # iterate the values directly; unpacking each value into
        # (hostname, metadata) raises a TypeError
        for metadata in self.hostnames.values():
            for request in metadata.running:
                if request.status in (ProxyStatus.CLAIMED, ProxyStatus.RUNNING):
                    return True
        return False

    def release_cooled_off(self):
        """
        Release proxies that have finished cooling off.

        Proxies cool off for a certain amount of time after finishing a
        request. This method removes cooled off requests, so that new ones may
        fill their slot.
        """
        cutoff = time.time() - self.COOLOFF
        for hostname, metadata in list(self.hostnames.items()):
            # loop over a copy: removing from a list while iterating over it
            # skips the element that follows each removed one
            for request in list(metadata.running):
                if (
                    request.status == ProxyStatus.COOLING_OFF
                    and request.timestamp_finished < cutoff
                ):
                    self.log.debug(
                        f"Releasing proxy {self.proxy_url} for host name {hostname}"
                    )
                    metadata.running.remove(request)

            # get rid of hostnames with no running or cooling off
            # requests, else this might grow indefinitely
            if not metadata.running:
                del self.hostnames[hostname]

    def claim_for(self, url):
        """
        Try claiming a slot in this proxy for the given URL

        Whether a slot is available depends both on the overall concurrency
        limit, and the per-hostname limit. If both are not maxed out, fill
        the slot and return the proxy object.

        :param str url:  URL to proxy a request for.
        :return: `False` if no proxy is available, or the
        `SophisticatedFuturesProxy` object if one is.
        """
        self.release_cooled_off()
        hostname = self.know_hostname(url)

        total_running = sum(len(m.running) for m in self.hostnames.values())
        if total_running >= self.MAX_CONCURRENT_OVERALL:
            return False

        if len(self.hostnames[hostname].running) < self.MAX_CONCURRENT_PER_HOST:
            request = namedtuple(
                "ProxiedRequest",
                ("url", "status", "timestamp_started", "timestamp_finished"),
            )
            request.url = url
            request.status = ProxyStatus.CLAIMED
            request.timestamp_started = 0
            request.timestamp_finished = 0
            self.hostnames[hostname].running.append(request)
            self.log.debug(
                f"Claiming proxy {self.proxy_url} for host name {hostname} ({len(self.hostnames[hostname].running)} of {self.MAX_CONCURRENT_PER_HOST} for host)"
            )
            return self
        else:
            return False

    def mark_request_started(self, url):
        """
        Mark a request for a URL as started

        This updates the status for the related slot. If no matching slot
        exists that is waiting for a request to start running, a `ValueError`
        is raised.

        :param str url:  URL of the proxied request.
        :raises ValueError:  If no claimed slot exists for this URL.
        """
        hostname = self.know_hostname(url)

        for metadata in self.hostnames[hostname].running:
            if metadata.status == ProxyStatus.CLAIMED and metadata.url == url:
                metadata.status = ProxyStatus.RUNNING
                metadata.timestamp_started = time.time()
                return

        raise ValueError(f"No proxy is waiting for a request with URL {url} to start!")

    def mark_request_finished(self, url):
        """
        Mark a request for a URL as finished

        This updates the status for the related slot. If no matching slot
        exists that is waiting for a request to finish, a `ValueError` is
        raised. After this, the proxy will be marked as cooling off, and is
        released after cooling off is completed.

        :param str url:  URL of the proxied request.
        :raises ValueError:  If no running slot exists for this URL.
        """
        hostname = self.know_hostname(url)

        for metadata in self.hostnames[hostname].running:
            if metadata.status == ProxyStatus.RUNNING and metadata.url == url:
                metadata.timestamp_finished = time.time()
                metadata.status = ProxyStatus.COOLING_OFF
                return

        raise ValueError(f"No proxy is currently running a request for URL {url}!")
A proxy that can be used in combination with the DelegatedRequestHandler
This keeps track of cooloffs, etc, to ensure that any individual proxy does not request more often than it should. This is a separate class because of an additional piece of logic that allows this cooloff to be kept track of on a per-hostname basis. This is useful because rate limits are typically enforced per site, so we can have (figuratively) unlimited concurrent requests as long as each is on a separate hostname, but need to be more careful when requesting from a single host.
def know_hostname(self, url):
    """
    Register the host name of a URL with this proxy

    Ensures a per-hostname record exists, so that requests for that host
    can be tracked. A host name without a record gets a fresh one with an
    empty list of running requests; an existing record is left untouched.

    :param str url:  URL whose host name should be tracked. The host name
    is lower-cased before use, so matching is case-insensitive.
    :return str:  The lower-cased host name, as used for the internal key
    """
    hostname = ural.get_hostname(url).lower()
    if hostname in self.hostnames:
        return hostname

    # the namedtuple *class* itself is used as a mutable record here: the
    # `running` list is set as an attribute on the class, not an instance
    record = namedtuple("HostnameForProxiedRequests", ("running",))
    record.running = []
    self.hostnames[hostname] = record

    return hostname
Make sure the hostname is known to this proxy
This means that we can now keep track of some per-hostname statistics for this hostname. If the hostname is not known yet, its statistics are initialised.
Parameters
- str url: URL with host name to keep stats for. Case-insensitive.
- Returns str: The host name, as parsed for internal use
def has_active_requests(self):
    """
    Check if this proxy has any truly active requests.

    Active means CLAIMED or RUNNING status - not COOLING_OFF.
    COOLING_OFF requests are essentially finished and waiting to be released,
    so they shouldn't prevent proxy removal.

    :return bool: True if proxy has requests that are claimed or running
    """
    # iterate over the per-hostname records only; `values()` yields single
    # objects, so unpacking each one as a (hostname, metadata) pair would
    # raise a TypeError as soon as any hostname is being tracked
    for metadata in self.hostnames.values():
        for request in metadata.running:
            if request.status in (ProxyStatus.CLAIMED, ProxyStatus.RUNNING):
                return True

    return False
Check if this proxy has any truly active requests.
Active means CLAIMED or RUNNING status - not COOLING_OFF. COOLING_OFF requests are essentially finished and waiting to be released, so they shouldn't prevent proxy removal.
Returns
True if proxy has requests that are claimed or running
def release_cooled_off(self):
    """
    Release request slots that have finished cooling off.

    A slot enters the COOLING_OFF state when its request finishes, and
    stays occupied until `COOLOFF` seconds have passed since then. This
    method removes slots whose cool-off has expired, so that new requests
    may fill them, and forgets host names that have no slots left.
    """
    # iterate over a copy of the mapping, since host names may be deleted
    for hostname, metadata in self.hostnames.copy().items():
        # iterate over a snapshot of the list as well: removing items from
        # the list that is being iterated makes the loop skip the element
        # following each removed one, leaving expired slots occupied
        for request in list(metadata.running):
            if (
                request.status == ProxyStatus.COOLING_OFF
                and request.timestamp_finished < time.time() - self.COOLOFF
            ):
                self.log.debug(
                    f"Releasing proxy {self.proxy_url} for host name {hostname}"
                )
                metadata.running.remove(request)

        # get rid of hostnames with no running or cooling off
        # requests, else this might grow indefinitely
        if not metadata.running:
            del self.hostnames[hostname]
Release proxies that have finished cooling off.
Proxies cool off for a certain amount of time after finishing a request. This method removes cooled off requests, so that new ones may fill their slot.
def claim_for(self, url):
    """
    Attempt to reserve a request slot on this proxy for a URL

    Expired cool-offs are released first. A slot is then handed out only
    if the proxy is below its overall concurrency limit *and* below the
    limit for the URL's host name; slots that are still cooling off count
    towards both limits.

    :param str url:  URL to proxy a request for.
    :return:  `False` if no slot is available, or this
    `SophisticatedFuturesProxy` object if a slot was claimed.
    """
    self.release_cooled_off()
    hostname = self.know_hostname(url)

    slots_in_use = 0
    for record in self.hostnames.values():
        slots_in_use += len(record.running)

    if slots_in_use >= self.MAX_CONCURRENT_OVERALL:
        return False

    host_slots = self.hostnames[hostname].running
    if len(host_slots) >= self.MAX_CONCURRENT_PER_HOST:
        return False

    # as elsewhere in this class, the namedtuple class itself is used as a
    # mutable record and its attributes are set on the class
    claim = namedtuple(
        "ProxiedRequest",
        ("url", "status", "timestamp_started", "timestamp_finished"),
    )
    claim.url = url
    claim.status = ProxyStatus.CLAIMED
    claim.timestamp_started = 0
    claim.timestamp_finished = 0
    host_slots.append(claim)

    self.log.debug(
        f"Claiming proxy {self.proxy_url} for host name {hostname} ({len(self.hostnames[hostname].running)} of {self.MAX_CONCURRENT_PER_HOST} for host)"
    )
    return self
Try claiming a slot in this proxy for the given URL
Whether a slot is available depends both on the overall concurrency limit, and the per-hostname limit. If both are not maxed out, fill the slot and return the proxy object.
Parameters
- str url: URL to proxy a request for.
Returns
`False` if no proxy is available, or the `SophisticatedFuturesProxy` object if one is.
def mark_request_started(self, url):
    """
    Flag the claimed slot for a URL as running

    Looks for the first slot on the URL's host name that is in the CLAIMED
    state for exactly this URL, switches it to RUNNING and records the
    start time.

    :param str url:  URL of the proxied request.
    :raises ValueError:  If no claimed slot exists for this URL.
    """
    hostname = self.know_hostname(url)

    for slot in self.hostnames[hostname].running:
        if slot.status != ProxyStatus.CLAIMED or slot.url != url:
            continue

        slot.status = ProxyStatus.RUNNING
        slot.timestamp_started = time.time()
        return

    raise ValueError(f"No proxy is waiting for a request with URL {url} to start!")
Mark a request for a URL as started
This updates the status for the related slot. If no matching slot
exists that is waiting for a request to start running, a ValueError
is raised.
Parameters
- str url: URL of the proxied request.
def mark_request_finished(self, url):
    """
    Flag the running slot for a URL as finished

    Looks for the first slot on the URL's host name that is in the RUNNING
    state for exactly this URL, records the finish time and switches the
    slot to COOLING_OFF. The slot stays occupied until it is released once
    its cool-off has passed.

    :param str url:  URL of the proxied request.
    :raises ValueError:  If no running slot exists for this URL.
    """
    hostname = self.know_hostname(url)

    for slot in self.hostnames[hostname].running:
        if slot.status != ProxyStatus.RUNNING or slot.url != url:
            continue

        slot.timestamp_finished = time.time()
        slot.status = ProxyStatus.COOLING_OFF
        return

    raise ValueError(f"No proxy is currently running a request for URL {url}!")
Mark a request for a URL as finished
This updates the status for the related slot. If no matching slot
exists that is waiting for a request to finish, a ValueError is
raised. After this, the proxy will be marked as cooling off, and is
released after cooling off is completed.
Parameters
- str url: URL of the proxied request.
class DelegatedRequestHandler:
    """
    Queue URLs and fetch them asynchronously through a pool of proxies

    URLs are added to named queues with `add_urls()`. `manage_requests()`
    starts a request for each queued URL as soon as a proxy has a free slot
    for it, and collects finished requests; `get_results()` yields the
    collected results. The queue name `"_"` doubles as a wildcard meaning
    "all queues" in the halting and queue-length methods.
    """

    # NOTE: `queue`, `proxy_pool` and `halted` are mutable class attributes
    # that `__init__` does not rebind, so they are shared by all instances
    # of this class.
    queue = {}
    session = None
    proxy_pool = {}
    proxy_settings = {}
    halted = set()
    log = None
    index = 0

    # some magic values
    REQUEST_STATUS_QUEUED = 0
    REQUEST_STATUS_STARTED = 1
    REQUEST_STATUS_WAITING_FOR_YIELD = 2
    PROXY_LOCALHOST = "__localhost__"

    def __init__(self, log, config):
        """
        Set up the request session and load proxy settings

        :param log:  Logger; its `debug`, `info`, `warning` and `error`
        methods are used.
        :param config:  Configuration reader, passed to `refresh_settings()`
        """
        pool = ThreadPoolExecutor()
        self.session = FuturesSession(executor=pool)
        self.log = log

        # Proxy health tracking
        self.proxy_health = {}
        self.proxy_warnings_logged = set()
        self.all_proxies_failed_logged = False

        # Track proxies that should be removed but have active requests
        self.proxies_pending_removal = set()

        self.refresh_settings(config)

    def refresh_settings(self, config):
        """
        Load proxy settings

        This is done on demand, so that we do not need a persistent
        configuration reader, which could make things complicated in
        thread-based contexts.

        Initializes proxy pool or updates if URLs have changed, preserving
        proxies with active requests. Proxy health tracking is reset on
        every call.

        :param config:  Configuration reader
        """
        # Load new settings
        new_proxy_settings = {
            k: config.get(k)
            for k in (
                "proxies.urls",
                "proxies.cooloff",
                "proxies.concurrent-overall",
                "proxies.concurrent-host",
                "proxies.allow-localhost-fallback",
            )
        }

        # Normalize empty config to localhost
        if not new_proxy_settings["proxies.urls"]:
            self.log.warning(
                "No proxies configured (proxies.urls is empty or None). "
                "Using localhost (direct connection) for all requests. "
                "Set Proxy URLs in setting to [\"__localhost__\"] to disable this warning."
            )
            new_proxy_settings["proxies.urls"] = [self.PROXY_LOCALHOST]

        # Check if proxy URLs have changed
        proxy_urls_changed = (
            not hasattr(self, "proxy_settings")
            or self.proxy_settings.get("proxies.urls") != new_proxy_settings["proxies.urls"]
        )

        # Update settings
        self.proxy_settings = new_proxy_settings

        # Reset proxy health tracking when settings are refreshed
        num_proxies_restored = len([p for p in self.proxy_health if not self.proxy_health[p]])
        self.proxy_health = {}
        self.proxy_warnings_logged = set()
        self.all_proxies_failed_logged = False

        if num_proxies_restored > 0:
            self.log.debug(f"Proxy health reset: {num_proxies_restored} proxies restored")

        # Initialize or update proxy pool
        if not self.proxy_pool:
            # First initialization
            self._initialize_proxy_pool()
        elif proxy_urls_changed:
            # Settings changed - update pool
            self._update_proxy_pool()

    def add_urls(self, urls, queue_name="_", position=-1, **kwargs):
        """
        Add URLs to the request queue

        :param urls:  An iterable of URLs.
        :param queue_name:  Queue name to add to.
        :param position:  Where in queue to insert; -1 adds to end of queue
        :param kwargs:  Other keyword arguments will be passed on to
        `requests.get()`
        """
        if queue_name in self.halted or "_" in self.halted:
            # do not add URLs while delegator is shutting down
            return

        if queue_name not in self.queue:
            self.queue[queue_name] = []

        for i, url in enumerate(urls):
            # the namedtuple class itself is used as a mutable record;
            # attributes (also ones not in the field list, like `index` and
            # `kwargs`) are set on the class
            url_metadata = namedtuple(
                "UrlForDelegatedRequest", ("url", "args", "status", "proxied")
            )
            url_metadata.url = url
            # Make a per-URL copy of kwargs to avoid shared mutation across entries
            per_kwargs = {**kwargs} if kwargs else {}
            url_metadata.index = self.index
            url_metadata.proxied = None
            url_metadata.status = self.REQUEST_STATUS_QUEUED
            self.index += 1

            # If a response hook is provided, wrap it to inject the original URL
            try:
                hooks = per_kwargs.get("hooks")
                if hooks and "response" in hooks:
                    # Copy hooks dict to avoid shared mutation
                    hooks = {**hooks}
                    original_hook = hooks["response"]

                    # Support a single callable or a list of callables
                    def wrap(h, original_url):
                        def _wrapped(resp, *a, **k):
                            # Inject FourCAT-specific original URL in kwargs once
                            if "fourcat_original_url" not in k:
                                k["fourcat_original_url"] = original_url
                            return h(resp, *a, **k)

                        return _wrapped

                    if isinstance(original_hook, list):
                        hooks["response"] = [wrap(h, url) for h in original_hook]
                    else:
                        hooks["response"] = wrap(original_hook, url)

                    # Persist modified hooks back into per-URL kwargs
                    per_kwargs["hooks"] = hooks
            except Exception:
                # If wrapping fails for any reason, proceed without modification
                pass

            # Assign the isolated kwargs to this metadata entry
            url_metadata.kwargs = per_kwargs

            if position == -1:
                self.queue[queue_name].append(url_metadata)
            else:
                self.queue[queue_name].insert(position + i, url_metadata)

        self.manage_requests()

    def get_queue_length(self, queue_name="_"):
        """
        Get the length of the queue

        :param str queue_name:  Queue name. The default, `"_"`, is a
        wildcard that counts the URLs in all queues.
        :return int:  Amount of URLs in the queue (regardless of status);
        0 for a queue name that does not exist
        """
        queue_length = 0
        for queue in self.queue:
            if queue == queue_name or queue_name == "_":
                # count the queue being iterated over, not the one named by
                # the parameter - with the wildcard these are not the same
                queue_length += len(self.queue[queue])

        return queue_length

    def _update_proxy_pool(self):
        """
        Update proxy pool when settings change

        Adds new proxies, removes old ones, and preserves proxies with active requests.
        Updates settings for existing proxies that remain in the configuration.
        """
        new_proxy_urls = set(self.proxy_settings["proxies.urls"])
        current_proxy_urls = set(self.proxy_pool.keys())

        # Find proxies to add and remove
        proxies_to_add = new_proxy_urls - current_proxy_urls
        proxies_to_remove = current_proxy_urls - new_proxy_urls
        proxies_to_keep = current_proxy_urls & new_proxy_urls

        # Remove proxies that are no longer in config
        for proxy_url in proxies_to_remove:
            if proxy_url in self.proxy_pool:
                proxy_obj = self.proxy_pool[proxy_url].proxy
                # Check if proxy has truly active requests (CLAIMED or RUNNING, not COOLING_OFF)
                if not proxy_obj.has_active_requests():
                    # No active requests - remove immediately
                    del self.proxy_pool[proxy_url]
                    if proxy_url in self.proxy_health:
                        del self.proxy_health[proxy_url]
                    self.proxies_pending_removal.discard(proxy_url)
                    self.log.info(f"Removed proxy {proxy_url} (no longer in config)")
                else:
                    # Has active requests - mark for removal but don't delete yet
                    self.proxies_pending_removal.add(proxy_url)
                    self.log.warning(f"Proxy {proxy_url} marked for removal (has active requests, will not accept new requests)")

        # Update settings for existing proxies
        for proxy_url in proxies_to_keep:
            if proxy_url in self.proxy_pool:
                proxy_obj = self.proxy_pool[proxy_url].proxy
                proxy_obj.COOLOFF = self.proxy_settings["proxies.cooloff"]
                proxy_obj.MAX_CONCURRENT_OVERALL = self.proxy_settings["proxies.concurrent-overall"]
                proxy_obj.MAX_CONCURRENT_PER_HOST = self.proxy_settings["proxies.concurrent-host"]

        # Add new proxies
        for proxy_url in proxies_to_add:
            self._add_proxy_to_pool(proxy_url)
            self.log.info(f"Added new proxy {proxy_url}")

        if proxies_to_add or proxies_to_remove:
            self.log.info(f"Proxy pool updated: {len(self.proxy_pool)} total proxies "
                          f"({len(proxies_to_add)} added, {len([p for p in proxies_to_remove if p not in self.proxy_pool])} removed)")

    def _add_proxy_to_pool(self, proxy_url):
        """
        Add a proxy to the pool

        Helper method to avoid duplicating proxy creation logic. The new
        proxy is marked healthy and as never used.

        :param str proxy_url:  The proxy URL to add (or PROXY_LOCALHOST for direct connection)
        """
        self.proxy_pool[proxy_url] = namedtuple("ProxyEntry", ("proxy", "last_used"))
        self.proxy_pool[proxy_url].proxy = SophisticatedFuturesProxy(
            proxy_url,
            self.log,
            self.proxy_settings.get("proxies.cooloff", 0.1),
            self.proxy_settings.get("proxies.concurrent-overall", 5),
            self.proxy_settings.get("proxies.concurrent-host", 2),
        )
        self.proxy_pool[proxy_url].last_used = 0
        self.proxy_health[proxy_url] = True

    def claim_proxy(self, url):
        """
        Find a proxy to do the request with

        Finds a `SophisticatedFuturesProxy` that has an open slot for this URL.

        :param str url:  URL to proxy a request for
        :return SophisticatedFuturesProxy or None:
            - SophisticatedFuturesProxy if a proxy is available
            - None if all proxies are busy (retry later)
        :raises NoProxiesAvailableError:  If all proxies are unhealthy and
        localhost fallback is disabled, or if the localhost proxy itself is
        among the unusable proxies
        """
        # Get healthy proxies (excluding those pending removal)
        healthy_proxies = [p for p in self.proxy_pool
                           if self.proxy_health.get(p, True)
                           and p not in self.proxies_pending_removal]

        # If no healthy proxies, check if we should create localhost
        if not healthy_proxies:
            allow_localhost = self.proxy_settings.get("proxies.allow-localhost-fallback", True)

            if allow_localhost and self.PROXY_LOCALHOST not in self.proxy_pool:
                # Create localhost as fallback
                if not self.all_proxies_failed_logged:
                    self.all_proxies_failed_logged = True
                    self.log.error("All configured proxies are unhealthy. Falling back to localhost (direct connection).")
                self._add_proxy_to_pool(self.PROXY_LOCALHOST)
                self.log.info("Created localhost proxy for direct connections")
                healthy_proxies = [self.PROXY_LOCALHOST]
            elif not allow_localhost:
                # Fallback disabled - fail
                if not self.all_proxies_failed_logged:
                    self.all_proxies_failed_logged = True
                    self.log.error("All proxies are unhealthy and localhost fallback is disabled.")
                raise NoProxiesAvailableError("All proxies unhealthy and localhost fallback disabled")
            else:
                # Localhost exists but unhealthy
                raise NoProxiesAvailableError("All proxies including localhost are unhealthy")

        # within the pool, find the least recently used healthy proxy that is available
        sorted_by_cooloff = sorted(
            healthy_proxies, key=lambda p: self.proxy_pool[p].last_used
        )
        for proxy_id in sorted_by_cooloff:
            claimed_proxy = self.proxy_pool[proxy_id].proxy.claim_for(url)
            if claimed_proxy:
                self.proxy_pool[proxy_id].last_used = time.time()
                return claimed_proxy

        # All proxies busy
        return None

    def _initialize_proxy_pool(self):
        """
        Initialize the proxy pool with configured proxies

        Called from refresh_settings() while the pool is empty. If no
        proxies are configured, adds localhost. Otherwise adds the
        configured proxies.
        """
        proxies = self.proxy_settings.get("proxies.urls", [])

        # Handle empty/None proxy configuration - add localhost
        if not proxies:
            self.log.warning(
                "No proxies configured (proxies.urls is empty or None). "
                "Using localhost (direct connection) for all requests. "
                "Set Proxy URLs in setting to [\"__localhost__\"] to disable this warning."
            )
            self._add_proxy_to_pool(self.PROXY_LOCALHOST)
            return

        # Initialize configured proxies
        for proxy_url in proxies:
            self._add_proxy_to_pool(proxy_url)

        self.log.debug(f"Proxy pool initialized with {len(self.proxy_pool)} proxies.")

    def manage_requests(self):
        """
        Manage requests asynchronously

        First, make sure proxy status is up to date; then go through the list
        of queued URLs and see if they have been requested, and release the
        proxy accordingly. If the URL is not being requested, and a proxy is
        available, start the request.

        Note that this method does *not* return any requested data. This is
        done in a separate function, which calls this one before returning any
        finished requests in the original queue order (`get_results()`).
        """
        # go through queue and look at the status of each URL
        for queue_name in self.queue:
            for i, url_metadata in enumerate(self.queue[queue_name]):
                url = url_metadata.url

                if url_metadata.status == self.REQUEST_STATUS_WAITING_FOR_YIELD:
                    # waiting to be flushed or passed by `get_result()`
                    continue

                if url_metadata.proxied and url_metadata.proxied.request.done():
                    # collect result and buffer it for yielding
                    # done() here doesn't necessarily mean the request finished
                    # successfully, just that it has returned - a timed out
                    # request will also be done()!
                    self.log.debug(f"Request for {url} finished, collecting result")
                    url_metadata.proxied.proxy.mark_request_finished(url)

                    # Clean up proxies pending removal if they have no more active requests
                    proxy_url = url_metadata.proxied.proxy.proxy_url
                    if proxy_url in self.proxies_pending_removal:
                        proxy_obj = self.proxy_pool[proxy_url].proxy
                        # Check if proxy has truly active requests (not just cooling off)
                        if not proxy_obj.has_active_requests():
                            del self.proxy_pool[proxy_url]
                            if proxy_url in self.proxy_health:
                                del self.proxy_health[proxy_url]
                            self.proxies_pending_removal.discard(proxy_url)
                            self.log.info(f"Removed proxy {proxy_url} (completed all active requests)")
                    try:
                        response = url_metadata.proxied.request.result()
                        # annotate the response so processors can see which
                        # proxy (if any) handled the request
                        setattr(
                            response,
                            "_4cat_proxy",
                            url_metadata.proxied.proxy.proxy_url,
                        )
                        url_metadata.proxied.result = response

                    except requests.exceptions.ProxyError as e:
                        # Proxy connection issue - mark proxy as unhealthy and requeue
                        proxy_url = url_metadata.proxied.proxy.proxy_url

                        # Mark this proxy as unhealthy
                        self.proxy_health[proxy_url] = False

                        # Log warning once per proxy
                        if proxy_url not in self.proxy_warnings_logged:
                            self.proxy_warnings_logged.add(proxy_url)
                            self.log.warning(
                                f"Proxy {proxy_url} marked as unhealthy due to connection failure: {str(e)}"
                            )

                        # Reset URL to queued so it will retry with a different proxy
                        url_metadata.status = self.REQUEST_STATUS_QUEUED
                        url_metadata.proxied = None
                        # Don't set to WAITING_FOR_YIELD - let it retry in the normal flow

                    except (
                        ConnectionError,
                        asyncioCancelledError,
                        futureCancelledError,
                        requests.exceptions.RequestException,
                        urllib3.exceptions.HTTPError,
                    ) as e:
                        # this is where timeouts, etc, go
                        url_metadata.proxied.result = FailedProxiedRequest(
                            e, url_metadata.proxied.proxy.proxy_url
                        )

                    finally:
                        # success or fail, we can pass it on
                        # Only set to waiting if not requeued by ProxyError handler
                        if url_metadata.status != self.REQUEST_STATUS_QUEUED:
                            url_metadata.status = self.REQUEST_STATUS_WAITING_FOR_YIELD

                else:
                    # running - ignore for now
                    # could do some health checks here...
                    # logging.debug(f"Request for {url} running...")
                    pass

                if not url_metadata.proxied and not (
                    queue_name in self.halted or "_" in self.halted
                ):
                    # no request running for this URL yet, try to start one
                    try:
                        proxy = self.claim_proxy(url)
                    except NoProxiesAvailableError as e:
                        # All proxies failed and fallback disabled - fail this request
                        url_metadata.proxied = namedtuple(
                            "DelegatedRequest",
                            ("request", "created", "result", "proxy", "url", "index"),
                        )
                        url_metadata.proxied.request = None
                        url_metadata.proxied.created = time.time()
                        url_metadata.proxied.result = FailedProxiedRequest(e, None)
                        url_metadata.proxied.proxy = None
                        url_metadata.proxied.url = url
                        url_metadata.proxied.index = url_metadata.index
                        url_metadata.status = self.REQUEST_STATUS_WAITING_FOR_YIELD
                        self.queue[queue_name][i] = url_metadata
                        continue

                    if proxy is None:
                        # No proxy available (all busy)
                        # Try again next loop iteration
                        continue

                    proxy_url = proxy.proxy_url
                    proxy_definition = (
                        {"http": proxy_url, "https": proxy_url}
                        if proxy_url != self.PROXY_LOCALHOST
                        else None
                    )

                    # start request for URL
                    self.log.debug(f"Request for {url} started")
                    request = namedtuple(
                        "DelegatedRequest",
                        (
                            "request",
                            "created",
                            "result",
                            "proxy",
                            "url",
                            "index",
                        ),
                    )
                    request.created = time.time()
                    # per-URL kwargs come last, so they can override the
                    # default timeout and proxy definition
                    request.request = self.session.get(
                        **{
                            "url": url,
                            "timeout": 30,
                            "proxies": proxy_definition,
                            **url_metadata.kwargs
                        }
                    )

                    request.proxy = proxy
                    request.url = url
                    request.index = (
                        url_metadata.index
                    )  # this is to allow for multiple requests for the same URL

                    url_metadata.status = self.REQUEST_STATUS_STARTED
                    url_metadata.proxied = request

                    proxy.mark_request_started(url)

                self.queue[queue_name][i] = url_metadata

    def get_results(self, queue_name="_", preserve_order=True):
        """
        Return available results, without skipping

        Loops through the queue, returning values (and updating the queue) for
        requests that have been finished. If a request is not finished yet,
        stop returning. This ensures that in the end, values are only ever
        returned in the original queue order, at the cost of potential
        buffering.

        :param str queue_name:  Queue name to get results from. Only the
        queue with exactly this name is read.
        :param bool preserve_order:  Return results in the order they were
        added to the queue. This means that other results are buffered and
        potentially remain in the queue, which may in the worst case
        significantly slow down data collection. For example, if the first
        request in the queue takes a really long time while all other
        requests are already finished, the queue will nevertheless remain
        'full'.

        :return:  A generator of `(url, result)` tuples
        """
        self.manage_requests()

        # no results, no return
        if queue_name not in self.queue:
            return

        # use list comprehensions here to avoid having to modify the
        # lists while iterating through them
        for url_metadata in [u for u in self.queue[queue_name]]:
            # for each URL in the queue...
            if url_metadata.status == self.REQUEST_STATUS_WAITING_FOR_YIELD:
                # see if a finished request is available...
                self.queue[queue_name].remove(url_metadata)
                yield url_metadata.url, url_metadata.proxied.result

            elif preserve_order:
                # ...but as soon as a URL has no finished result, return
                # unless we don't care about the order, then continue and yield
                # as much as possible
                return

    def _halt(self, queue_name="_"):
        """
        Interrupt fetching of results

        Can be used when 4CAT is interrupted. Clears queue and tries to cancel
        running requests.

        Note that running requests *cannot* always be cancelled via `.cancel()`
        particularly when using `stream=True`. It is therefore recommended to
        use `halt_and_wait()` which is blocking until all running requests have
        properly terminated, instead of calling this method directly.

        :param str queue_name:  Queue name to stop fetching results for. By
        default, halt all queues.
        """
        self.halted.add(queue_name)

        for queue in self.queue:
            if queue_name == "_" or queue_name == queue:
                # use a list comprehension here to avoid having to modify the
                # list while iterating through it
                for url_metadata in [u for u in self.queue[queue]]:
                    if url_metadata.status != self.REQUEST_STATUS_STARTED:
                        self.queue[queue].remove(url_metadata)
                    else:
                        url_metadata.proxied.request.cancel()

        self.halted.remove(queue_name)

    def halt_and_wait(self, queue_name="_"):
        """
        Cancel any queued requests and wait until ongoing ones are finished

        Blocking!

        :param str queue_name:  Queue name to stop fetching results for. By
        default, halt all queues.
        """
        self._halt(queue_name)
        while self.get_queue_length(queue_name) > 0:
            # `get_results()` only reads the queue with exactly the given
            # name, so for the wildcard every queue is drained in turn; else
            # the length counted above could never reach zero
            names = list(self.queue) if queue_name == "_" else [queue_name]
            for name in names:
                # exhaust generator without doing something w/ results
                all(self.get_results(name, preserve_order=False))

        if queue_name in self.queue:
            del self.queue[queue_name]
def __init__(self, log, config):
    """
    Set up the request session and load proxy settings

    :param log:  Logger used by this handler
    :param config:  Configuration reader, passed to `refresh_settings()`
    """
    self.log = log
    self.session = FuturesSession(executor=ThreadPoolExecutor())

    # proxies that are to be removed, but still have active requests
    self.proxies_pending_removal = set()

    # per-proxy health flags and bookkeeping for what was logged already
    self.all_proxies_failed_logged = False
    self.proxy_warnings_logged = set()
    self.proxy_health = {}

    # needs the attributes above to be in place
    self.refresh_settings(config)
def refresh_settings(self, config):
    """
    (Re)load the proxy settings from the configuration

    Settings are read on demand rather than through a persistent
    configuration reader, which could complicate things in thread-based
    contexts. An empty proxy URL list is replaced with the localhost
    pseudo-proxy. Health tracking is reset on every call. The proxy pool is
    initialised if it is empty, and updated if the list of proxy URLs
    differs from the previously loaded one.

    :param config:  Configuration reader
    """
    setting_names = (
        "proxies.urls",
        "proxies.cooloff",
        "proxies.concurrent-overall",
        "proxies.concurrent-host",
        "proxies.allow-localhost-fallback",
    )
    new_proxy_settings = {}
    for name in setting_names:
        new_proxy_settings[name] = config.get(name)

    # no proxies configured: use a direct connection instead
    if not new_proxy_settings["proxies.urls"]:
        self.log.warning(
            "No proxies configured (proxies.urls is empty or None). "
            "Using localhost (direct connection) for all requests. "
            "Set Proxy URLs in setting to [\"__localhost__\"] to disable this warning."
        )
        new_proxy_settings["proxies.urls"] = [self.PROXY_LOCALHOST]

    # compare against the previous settings before overwriting them
    if hasattr(self, "proxy_settings"):
        previous_urls = self.proxy_settings.get("proxies.urls")
        proxy_urls_changed = previous_urls != new_proxy_settings["proxies.urls"]
    else:
        proxy_urls_changed = True

    self.proxy_settings = new_proxy_settings

    # forget about earlier failures, so all proxies get a new chance
    num_proxies_restored = sum(
        1 for healthy in self.proxy_health.values() if not healthy
    )
    self.proxy_health = {}
    self.proxy_warnings_logged = set()
    self.all_proxies_failed_logged = False

    if num_proxies_restored > 0:
        self.log.debug(f"Proxy health reset: {num_proxies_restored} proxies restored")

    if not self.proxy_pool:
        # nothing in the pool yet
        self._initialize_proxy_pool()
    elif proxy_urls_changed:
        # pool exists, but no longer matches the configuration
        self._update_proxy_pool()
Load proxy settings
This is done on demand, so that we do not need a persistent configuration reader, which could make things complicated in thread-based contexts.
Initializes proxy pool or updates if URLs have changed, preserving proxies with active requests.
Parameters
- config: Configuration reader
def add_urls(self, urls, queue_name="_", position=-1, **kwargs):
    """
    Queue URLs to be requested

    Each URL gets its own metadata record and its own copy of the request
    keyword arguments. If a `response` hook is passed via `hooks`, it is
    wrapped so that the hook additionally receives the URL as it was queued
    in the `fourcat_original_url` keyword argument. Nothing is queued while
    this queue, or the handler as a whole, is being halted.

    :param urls:  An iterable of URLs.
    :param queue_name:  Name of the queue to add the URLs to.
    :param position:  Index in the queue to insert at; -1 appends to the
    end of the queue
    :param kwargs:  Other keyword arguments will be passed on to
    `requests.get()`
    """
    if "_" in self.halted or queue_name in self.halted:
        # shutting down, do not accept new URLs
        return

    def wrap(h, original_url):
        # wraps a hook `h`, so it is also told the URL as originally queued
        def _wrapped(resp, *a, **k):
            if "fourcat_original_url" not in k:
                k["fourcat_original_url"] = original_url
            return h(resp, *a, **k)

        return _wrapped

    target_queue = self.queue.setdefault(queue_name, [])

    for offset, url in enumerate(urls):
        # the namedtuple class itself is used as a mutable record
        entry = namedtuple(
            "UrlForDelegatedRequest", ("url", "args", "status", "proxied")
        )
        entry.url = url
        entry.index = self.index
        entry.proxied = None
        entry.status = self.REQUEST_STATUS_QUEUED
        self.index += 1

        # every entry gets its own kwargs (and hooks) dictionary, so that
        # entries never share mutable state
        request_kwargs = dict(kwargs)
        try:
            hooks = request_kwargs.get("hooks")
            if hooks and "response" in hooks:
                hooks = {**hooks}
                response_hook = hooks["response"]

                # the hook may be a single callable or a list of them
                if isinstance(response_hook, list):
                    hooks["response"] = [wrap(h, url) for h in response_hook]
                else:
                    hooks["response"] = wrap(response_hook, url)

                request_kwargs["hooks"] = hooks
        except Exception:
            # wrapping is best-effort; on failure the hooks are used as-is
            pass

        entry.kwargs = request_kwargs

        if position == -1:
            target_queue.append(entry)
        else:
            target_queue.insert(position + offset, entry)

    self.manage_requests()
Add URLs to the request queue
Parameters
- urls: An iterable of URLs.
- queue_name: Queue name to add to.
- position: Where in queue to insert; -1 adds to end of queue
- kwargs: Other keyword arguments will be passed on to
requests.get()
def get_queue_length(self, queue_name="_"):
    """
    Get the length of the queue

    The name `"_"` acts as a wildcard here: it returns the combined length
    of all queues. Any other name returns the length of that queue only,
    or 0 if no such queue exists.

    :param str queue_name:  Queue name
    :return int:  Amount of URLs in the queue (regardless of status)
    """
    if queue_name == "_":
        # count each queue by its own length
        return sum(len(queued) for queued in self.queue.values())

    return len(self.queue.get(queue_name, ()))
Get the length of the queue
Parameters
- str queue_name: Queue name
Returns
Amount of URLs in the queue (regardless of status)
def claim_proxy(self, url):
    """
    Pick a proxy that can service a request for the given URL

    Only proxies in the pool that are not marked unhealthy and not pending
    removal are considered. If none qualify, a direct ("localhost")
    connection is added to the pool as a fallback, provided the
    `proxies.allow-localhost-fallback` setting permits it (the default)
    and localhost is not in the pool yet. Candidates are then tried from
    least to most recently used; the first one that accepts a claim for
    the URL is returned.

    :param str url:  URL to proxy a request for
    :return SophisticatedFuturesProxy or None:
      - SophisticatedFuturesProxy if a proxy is available
      - None if all proxies are busy (retry later)
    :raises NoProxiesAvailableError:  If no usable proxy is left and the
      localhost fallback is disabled, or localhost is already in the pool
      but not usable either
    """
    # collect proxies that are neither unhealthy nor scheduled for removal
    candidates = []
    for proxy_id in self.proxy_pool:
        if not self.proxy_health.get(proxy_id, True):
            continue
        if proxy_id in self.proxies_pending_removal:
            continue
        candidates.append(proxy_id)

    if not candidates:
        fallback_allowed = self.proxy_settings.get(
            "proxies.allow-localhost-fallback", True
        )

        if not fallback_allowed:
            # nothing to fall back on; only log this the first time
            if not self.all_proxies_failed_logged:
                self.all_proxies_failed_logged = True
                self.log.error("All proxies are unhealthy and localhost fallback is disabled.")
            raise NoProxiesAvailableError("All proxies unhealthy and localhost fallback disabled")

        if self.PROXY_LOCALHOST in self.proxy_pool:
            # the fallback already exists, yet it was filtered out above
            raise NoProxiesAvailableError("All proxies including localhost are unhealthy")

        # create the direct connection as a last resort
        if not self.all_proxies_failed_logged:
            self.all_proxies_failed_logged = True
            self.log.error("All configured proxies are unhealthy. Falling back to localhost (direct connection).")
        self._add_proxy_to_pool(self.PROXY_LOCALHOST)
        self.log.info("Created localhost proxy for direct connections")
        candidates = [self.PROXY_LOCALHOST]

    def last_used(proxy_id):
        return self.proxy_pool[proxy_id].last_used

    # least recently used first, to spread load across the pool
    for proxy_id in sorted(candidates, key=last_used):
        claim = self.proxy_pool[proxy_id].proxy.claim_for(url)
        if not claim:
            continue

        self.proxy_pool[proxy_id].last_used = time.time()
        return claim

    # every candidate declined the claim, i.e. all are busy
    return None
Find a proxy to do the request with
Finds a SophisticatedFuturesProxy that has an open slot for this URL.
Parameters
- str url: URL to proxy a request for
Returns
- SophisticatedFuturesProxy if a proxy is available
- None if all proxies are busy (retry later)
Raises
- NoProxiesAvailableError: If all proxies are unhealthy and the localhost fallback is disabled or itself unavailable
def manage_requests(self):
    """
    Manage requests asynchronously

    Walks through every queue and, for each queued URL:

    - skips it if its result is already buffered and waiting to be yielded;
    - if its request has returned, releases the proxy slot, removes the
      proxy from the pool if it was pending removal and has no active
      requests left, and buffers the result (a response annotated with
      `_4cat_proxy`, or a `FailedProxiedRequest` on failure). A
      `requests.exceptions.ProxyError` instead marks the proxy as unhealthy
      and puts the URL back in the queued state so it is retried;
    - if no request exists for it yet and the queue is not halted, tries to
      claim a proxy and start the request. When no proxy can ever be
      claimed (`NoProxiesAvailableError`) the URL is failed immediately;
      when all proxies are merely busy it is left for a later call.

    Note that this method does *not* return any requested data. This is
    done in a separate function, which calls this one before returning any
    finished requests in the original queue order (`get_results()`).
    """
    # go through queue and look at the status of each URL
    for queue_name in self.queue:
        for i, url_metadata in enumerate(self.queue[queue_name]):
            url = url_metadata.url

            if url_metadata.status == self.REQUEST_STATUS_WAITING_FOR_YIELD:
                # result is buffered; waiting to be flushed or passed on by
                # `get_results()`
                continue

            if url_metadata.proxied and url_metadata.proxied.request.done():
                # collect result and buffer it for yielding
                # done() here doesn't necessarily mean the request finished
                # successfully, just that it has returned - a timed out
                # request will also be done()!
                self.log.debug(f"Request for {url} finished, collecting result")
                url_metadata.proxied.proxy.mark_request_finished(url)

                # Clean up proxies pending removal if they have no more active requests
                proxy_url = url_metadata.proxied.proxy.proxy_url
                if proxy_url in self.proxies_pending_removal:
                    proxy_obj = self.proxy_pool[proxy_url].proxy
                    # Check if proxy has truly active requests (not just cooling off)
                    if not proxy_obj.has_active_requests():
                        del self.proxy_pool[proxy_url]
                        if proxy_url in self.proxy_health:
                            del self.proxy_health[proxy_url]
                        self.proxies_pending_removal.discard(proxy_url)
                        self.log.info(f"Removed proxy {proxy_url} (completed all active requests)")
                try:
                    # result() re-raises whatever exception the request
                    # ended with, which the handlers below sort out
                    response = url_metadata.proxied.request.result()
                    # annotate the response so processors can see which
                    # proxy (if any) handled the request
                    setattr(
                        response,
                        "_4cat_proxy",
                        url_metadata.proxied.proxy.proxy_url,
                    )
                    url_metadata.proxied.result = response

                except requests.exceptions.ProxyError as e:
                    # Proxy connection issue - mark proxy as unhealthy and requeue
                    # (this handler must come before the generic
                    # RequestException handler below, which would otherwise
                    # catch ProxyError as well)
                    proxy_url = url_metadata.proxied.proxy.proxy_url

                    # Mark this proxy as unhealthy
                    self.proxy_health[proxy_url] = False

                    # Log warning once per proxy
                    if proxy_url not in self.proxy_warnings_logged:
                        self.proxy_warnings_logged.add(proxy_url)
                        self.log.warning(
                            f"Proxy {proxy_url} marked as unhealthy due to connection failure: {str(e)}"
                        )

                    # Reset URL to queued so it will retry with a different proxy
                    # NOTE(review): if this queue is halted, the URL is not
                    # restarted below and never reaches WAITING_FOR_YIELD in
                    # this method - confirm that `_halt()` or the caller
                    # deals with such entries
                    url_metadata.status = self.REQUEST_STATUS_QUEUED
                    url_metadata.proxied = None
                    # Don't set to WAITING_FOR_YIELD - let it retry in the normal flow

                except (
                    ConnectionError,
                    asyncioCancelledError,
                    futureCancelledError,
                    requests.exceptions.RequestException,
                    urllib3.exceptions.HTTPError,
                ) as e:
                    # this is where timeouts, etc, go
                    url_metadata.proxied.result = FailedProxiedRequest(
                        e, url_metadata.proxied.proxy.proxy_url
                    )

                finally:
                    # success or fail, we can pass it on
                    # Only set to waiting if not requeued by ProxyError handler
                    if url_metadata.status != self.REQUEST_STATUS_QUEUED:
                        url_metadata.status = self.REQUEST_STATUS_WAITING_FOR_YIELD

            else:
                # running - ignore for now
                # could do some health checks here...
                # logging.debug(f"Request for {url} running...")
                pass

            if not url_metadata.proxied and not (
                queue_name in self.halted or "_" in self.halted
            ):
                # no request running for this URL yet, try to start one
                # (this includes URLs that were just requeued above)
                try:
                    proxy = self.claim_proxy(url)
                except NoProxiesAvailableError as e:
                    # All proxies failed and fallback disabled - fail this request
                    # by attaching a request record that only carries the
                    # failure, so it is yielded like any other result
                    url_metadata.proxied = namedtuple(
                        "DelegatedRequest",
                        ("request", "created", "result", "proxy", "url", "index"),
                    )
                    url_metadata.proxied.request = None
                    url_metadata.proxied.created = time.time()
                    url_metadata.proxied.result = FailedProxiedRequest(e, None)
                    url_metadata.proxied.proxy = None
                    url_metadata.proxied.url = url
                    url_metadata.proxied.index = url_metadata.index
                    url_metadata.status = self.REQUEST_STATUS_WAITING_FOR_YIELD
                    self.queue[queue_name][i] = url_metadata
                    continue

                if proxy is None:
                    # No proxy available (all busy)
                    # Try again next loop iteration
                    continue

                proxy_url = proxy.proxy_url
                # the localhost pseudo-proxy means a direct connection, i.e.
                # no proxy definition is passed to the session
                proxy_definition = (
                    {"http": proxy_url, "https": proxy_url}
                    if proxy_url != self.PROXY_LOCALHOST
                    else None
                )

                # start request for URL
                self.log.debug(f"Request for {url} started")
                # as elsewhere in this method, the namedtuple class itself is
                # used as a mutable record
                request = namedtuple(
                    "DelegatedRequest",
                    (
                        "request",
                        "created",
                        "result",
                        "proxy",
                        "url",
                        "index",
                    ),
                )
                request.created = time.time()
                # per-URL kwargs are unpacked last, so they override the
                # default url/timeout/proxies values given here
                request.request = self.session.get(
                    **{
                        "url": url,
                        "timeout": 30,
                        "proxies": proxy_definition,
                        **url_metadata.kwargs
                    }
                )

                request.proxy = proxy
                request.url = url
                request.index = (
                    url_metadata.index
                )  # this is to allow for multiple requests for the same URL

                url_metadata.status = self.REQUEST_STATUS_STARTED
                url_metadata.proxied = request

                proxy.mark_request_started(url)

            # store the (possibly updated) record back at its queue position
            self.queue[queue_name][i] = url_metadata
Manage requests asynchronously
First, make sure proxy status is up to date; then go through the list of queued URLs and see if they have been requested, and release the proxy accordingly. If the URL is not being requested, and a proxy is available, start the request.
Note that this method does not return any requested data. This is
done in a separate function, which calls this one before returning any
finished requests in the original queue order (get_results()).
def get_results(self, queue_name="_", preserve_order=True):
    """
    Yield the results that are ready, in queue order

    Calls `manage_requests()` first, then walks a snapshot of the queue.
    Every entry whose result is buffered is removed from the queue and
    yielded. With `preserve_order` enabled, the walk stops at the first
    entry that is not ready yet, so results only ever come out in the
    order in which their URLs were queued - at the cost of buffering
    results that finished early.

    :param str queue_name:  Queue name to get results from
    :param bool preserve_order:  Return results in the order they were
    added to the queue. This means that other results are buffered and
    potentially remain in the queue, which may in the worst case
    significantly slow down data collection. For example, if the first
    request in the queue takes a really long time while all other
    requests are already finished, the queue will nevertheless remain
    'full'. If disabled, unfinished entries are skipped and every
    available result is yielded.

    :return:  Generator of `(url, result)` tuples
    """
    self.manage_requests()

    # unknown queue: nothing to yield
    if queue_name not in self.queue:
        return

    # walk a snapshot, since entries are removed from the live queue as
    # they are yielded
    for entry in tuple(self.queue[queue_name]):
        if entry.status != self.REQUEST_STATUS_WAITING_FOR_YIELD:
            if preserve_order:
                # an unfinished entry blocks everything queued after it
                return
            # order is irrelevant, so look at the next entry
            continue

        self.queue[queue_name].remove(entry)
        yield entry.url, entry.proxied.result
Return available results, without skipping
Loops through the queue, returning values (and updating the queue) for requests that have been finished. If a request is not finished yet, stop returning. This ensures that in the end, values are only ever returned in the original queue order, at the cost of potential buffering.
Parameters
- str queue_name: Queue name to get results from
- bool preserve_order: Return results in the order they were added to the queue. This means that other results are buffered and potentially remain in the queue, which may in the worst case significantly slow down data collection. For example, if the first request in the queue takes a really long time while all other requests are already finished, the queue will nevertheless remain 'full'.
Returns
A generator of (url, result) tuples for the requests that have finished
def halt_and_wait(self, queue_name="_"):
    """
    Cancel any queued requests and wait until ongoing ones are finished

    Blocking! Calls `_halt()` for the given queue and then keeps collecting
    (and discarding) results until the addressed queues are empty. When
    called with the default `"_"`, every queue is drained, not only the
    queue literally named `"_"`. Afterwards the queue named `queue_name`
    is deleted, if it exists.

    :param str queue_name:  Queue name to stop fetching results for. By
    default, halt all queues.
    """
    self._halt(queue_name)

    while True:
        # "_" addresses all queues; re-evaluated on every pass since the
        # queues shrink while results are collected
        names = list(self.queue) if queue_name == "_" else [queue_name]
        pending = [name for name in names if self.queue.get(name)]
        if not pending:
            break

        collected = 0
        for name in pending:
            # exhaust generator without doing something w/ results
            for _ in self.get_results(name, preserve_order=False):
                collected += 1

        if not collected:
            # nothing finished yet; yield the CPU to the request threads
            # instead of spinning
            time.sleep(0.05)

    if queue_name in self.queue:
        del self.queue[queue_name]
Cancel any queued requests and wait until ongoing ones are finished
Blocking!
Parameters
- str queue_name: Queue name to stop fetching results for. By default, halt all queues.