Edit on GitHub

backend.lib.proxied_requests

  1from requests_futures.sessions import FuturesSession
  2from concurrent.futures import ThreadPoolExecutor
  3
  4import time
  5import urllib3
  6import ural
  7import requests
  8
  9from collections import namedtuple
 10from asyncio import CancelledError as asyncioCancelledError
 11from concurrent.futures import CancelledError as futureCancelledError
 12
 13class FailedProxiedRequest:
 14    """
 15    A delegated request that has failed for whatever reason
 16
 17    The failure context (usually the exception) is stored in the `context`
 18    property. We also keep track of the proxy URL that serviced the request so
 19    downstream consumers can make informed retry decisions.
 20    """
 21
 22    context = None
 23    proxy_url = None
 24
 25    def __init__(self, context=None, proxy_url=None):
 26        self.context = context
 27        self.proxy_url = proxy_url
 28
 29
 30class NoProxiesAvailableError(Exception):
 31    """
 32    Raised when all proxies are unhealthy and localhost fallback is disabled
 33    """
 34    pass
 35
 36
 37class ProxyStatus:
 38    """
 39    An enum of possible statuses of a SophisticatedFuturesProxy
 40    """
 41
 42    AVAILABLE = 3
 43    CLAIMED = 4
 44    RUNNING = 5
 45    COOLING_OFF = 6
 46
 47
 48class SophisticatedFuturesProxy:
 49    """
 50    A proxy that can be used in combination with the DelegatedRequestHandler
 51
 52    This keeps track of cooloffs, etc, to ensure that any individual proxy does
 53    not request more often than it should. This is a separate class because of
 54    an additional piece of logic that allows this cooloff to be kept track of
 55    on a per-hostname basis. This is useful because rate limits are typically
 56    enforced per site, so we can have (figuratively) unlimited concurrent
 57    request as long as each is on a separate hostname, but need to be more
 58    careful when requesting from a single host.
 59    """
 60
 61    log = None
 62    looping = True
 63
 64    COOLOFF = 0
 65    MAX_CONCURRENT_OVERALL = 0
 66    MAX_CONCURRENT_PER_HOST = 0
 67
 68    def __init__(
 69        self, url, log=None, cooloff=3, concurrent_overall=5, concurrent_host=2
 70    ):
 71        self.proxy_url = url
 72        self.hostnames = {}
 73        self.log = log
 74
 75        self.COOLOFF = cooloff
 76        self.MAX_CONCURRENT_OVERALL = concurrent_overall
 77        self.MAX_CONCURRENT_PER_HOST = concurrent_host
 78
 79    def know_hostname(self, url):
 80        """
 81        Make sure the hostname is known to this proxy
 82
 83        This means that we can now keep track of some per-hostname statistics
 84        for this hostname. If the hostname is not known yet, the statistics are
 85        re-initialised.
 86
 87        :param str url:  URL with host name to keep stats for. Case-insensitive.
 88        :param str:  The host name, as parsed for internal use
 89        """
 90        hostname = ural.get_hostname(url).lower()
 91        if hostname not in self.hostnames:
 92            self.hostnames[hostname] = namedtuple(
 93                "HostnameForProxiedRequests", ("running",)
 94            )
 95            self.hostnames[hostname].running = []
 96
 97        return hostname
 98
 99    def has_active_requests(self):
100        """
101        Check if this proxy has any truly active requests.
102        
103        Active means CLAIMED or RUNNING status - not COOLING_OFF.
104        COOLING_OFF requests are essentially finished and waiting to be released,
105        so they shouldn't prevent proxy removal.
106        
107        :return bool: True if proxy has requests that are claimed or running
108        """
109        for hostname, metadata in self.hostnames.values():
110            for request in metadata.running:
111                if request.status in (ProxyStatus.CLAIMED, ProxyStatus.RUNNING):
112                    return True
113        return False
114    
115    def release_cooled_off(self):
116        """
117        Release proxies that have finished cooling off.
118
119        Proxies cool off for a certain amount of time after starting a request.
120        This method removes cooled off requests, so that new ones may fill
121        their slot.
122        """
123        for hostname, metadata in self.hostnames.copy().items():
124            for request in metadata.running:
125                if (
126                    request.status == ProxyStatus.COOLING_OFF
127                    and request.timestamp_finished < time.time() - self.COOLOFF
128                ):
129                    self.log.debug(
130                        f"Releasing proxy {self.proxy_url} for host name {hostname}"
131                    )
132                    self.hostnames[hostname].running.remove(request)
133
134                    # get rid of hostnames with no running or cooling off
135                    # requests, else this might grow indefinitely
136                    if len(self.hostnames[hostname].running) == 0:
137                        del self.hostnames[hostname]
138
139    def claim_for(self, url):
140        """
141        Try claiming a slot in this proxy for the given URL
142
143        Whether a slot is available depends both on the overall concurrency
144        limit, and the per-hostname limit. If both are not maxed out, fill
145        the slot and return the proxy object.
146
147        :param str url:  URL to proxy a request for.
148        :return: `False` if no proxy is available, or the
149        `SophisticatedFuturesProxy` object if one is.
150        """
151        self.release_cooled_off()
152        hostname = self.know_hostname(url)
153
154        total_running = sum([len(m.running) for h, m in self.hostnames.items()])
155        if total_running >= self.MAX_CONCURRENT_OVERALL:
156            return False
157
158        if len(self.hostnames[hostname].running) < self.MAX_CONCURRENT_PER_HOST:
159            request = namedtuple(
160                "ProxiedRequest",
161                ("url", "status", "timestamp_started", "timestamp_finished"),
162            )
163            request.url = url
164            request.status = ProxyStatus.CLAIMED
165            request.timestamp_started = 0
166            request.timestamp_finished = 0
167            self.hostnames[hostname].running.append(request)
168            self.log.debug(
169                f"Claiming proxy {self.proxy_url} for host name {hostname} ({len(self.hostnames[hostname].running)} of {self.MAX_CONCURRENT_PER_HOST} for host)"
170            )
171            return self
172        else:
173            return False
174
175    def mark_request_started(self, url):
176        """
177        Mark a request for a URL as started
178
179        This updates the status for the related slot. If no matching slot
180        exists that is waiting for a request to start running, a `ValueError`
181        is raised.
182
183        :param str url:  URL of the proxied request.
184        """
185        hostname = self.know_hostname(url)
186
187        for i, metadata in enumerate(self.hostnames[hostname].running):
188            if metadata.status == ProxyStatus.CLAIMED and metadata.url == url:
189                self.hostnames[hostname].running[i].status = ProxyStatus.RUNNING
190                self.hostnames[hostname].running[i].timestamp_started = time.time()
191                return
192
193        raise ValueError(f"No proxy is waiting for a request with URL {url} to start!")
194
195    def mark_request_finished(self, url):
196        """
197        Mark a request for a URL as finished
198
199        This updates the status for the related slot. If no matching slot
200        exists that is waiting for a request to finish, a `ValueError` is
201        raised. After this, the proxy will be marked as cooling off, and is
202        released after cooling off is completed.
203
204        :param str url:  URL of the proxied request.
205        """
206        hostname = self.know_hostname(url)
207
208        for i, metadata in enumerate(self.hostnames[hostname].running):
209            if metadata.status == ProxyStatus.RUNNING and metadata.url == url:
210                self.hostnames[hostname].running[i].timestamp_finished = time.time()
211                self.hostnames[hostname].running[i].status = ProxyStatus.COOLING_OFF
212                return
213
214        raise ValueError(f"No proxy is currently running a request for URL {url}!")
215
216
217class DelegatedRequestHandler:
218    queue = {}
219    session = None
220    proxy_pool = {}
221    proxy_settings = {}
222    halted = set()
223    log = None
224    index = 0
225
226    # some magic values
227    REQUEST_STATUS_QUEUED = 0
228    REQUEST_STATUS_STARTED = 1
229    REQUEST_STATUS_WAITING_FOR_YIELD = 2
230    PROXY_LOCALHOST = "__localhost__"
231
232    def __init__(self, log, config):
233        pool = ThreadPoolExecutor()
234        self.session = FuturesSession(executor=pool)
235        self.log = log
236        
237        # Proxy health tracking
238        self.proxy_health = {}
239        self.proxy_warnings_logged = set()
240        self.all_proxies_failed_logged = False
241        
242        # Track proxies that should be removed but have active requests
243        self.proxies_pending_removal = set()
244
245        self.refresh_settings(config)
246
247    def refresh_settings(self, config):
248        """
249        Load proxy settings
250
251        This is done on demand, so that we do not need a persistent
252        configuration reader, which could make things complicated in
253        thread-based contexts.
254
255        Initializes proxy pool or updates if URLs have changed, preserving proxies with active requests.
256
257        :param config:  Configuration reader
258        """
259        # Load new settings
260        new_proxy_settings = {
261            k: config.get(k) for k in ("proxies.urls", "proxies.cooloff", "proxies.concurrent-overall",
262                                            "proxies.concurrent-host", "proxies.allow-localhost-fallback")
263        }
264        
265        # Normalize empty config to localhost
266        if not new_proxy_settings["proxies.urls"]:
267            self.log.warning(
268                "No proxies configured (proxies.urls is empty or None). "
269                "Using localhost (direct connection) for all requests. "
270                "Set Proxy URLs in setting to [\"__localhost__\"] to disable this warning."
271            )
272            new_proxy_settings["proxies.urls"] = [self.PROXY_LOCALHOST]
273        
274        # Check if proxy URLs have changed
275        proxy_urls_changed = (not hasattr(self, 'proxy_settings') or 
276                             self.proxy_settings.get("proxies.urls") != new_proxy_settings["proxies.urls"])
277        
278        # Update settings
279        self.proxy_settings = new_proxy_settings
280        
281        # Reset proxy health tracking when settings are refreshed
282        num_proxies_restored = len([p for p in self.proxy_health if not self.proxy_health[p]])
283        self.proxy_health = {}
284        self.proxy_warnings_logged = set()
285        self.all_proxies_failed_logged = False
286        
287        if num_proxies_restored > 0:
288            self.log.debug(f"Proxy health reset: {num_proxies_restored} proxies restored")
289        
290        # Initialize or update proxy pool
291        if not self.proxy_pool:
292            # First initialization
293            self._initialize_proxy_pool()
294        elif proxy_urls_changed:
295            # Settings changed - update pool
296            self._update_proxy_pool()
297
298
299    def add_urls(self, urls, queue_name="_", position=-1, **kwargs):
300        """
301        Add URLs to the request queue
302
303        :param urls:  An iterable of URLs.
304        :param queue_name:  Queue name to add to.
305        :param position:  Where in queue to insert; -1 adds to end of queue
306        :param kwargs: Other keyword arguments will be passed on to
307        `requests.get()`
308        """
309        if queue_name in self.halted or "_" in self.halted:
310            # do not add URLs while delegator is shutting down
311            return
312
313        if queue_name not in self.queue:
314            self.queue[queue_name] = []
315
316        for i, url in enumerate(urls):
317            url_metadata = namedtuple(
318                "UrlForDelegatedRequest", ("url", "args", "status", "proxied")
319            )
320            url_metadata.url = url
321            # Make a per-URL copy of kwargs to avoid shared mutation across entries
322            per_kwargs = {**kwargs} if kwargs else {}
323            url_metadata.index = self.index
324            url_metadata.proxied = None
325            url_metadata.status = self.REQUEST_STATUS_QUEUED
326            self.index += 1
327
328            # If a response hook is provided, wrap it to inject the original URL
329            try:
330                hooks = per_kwargs.get("hooks")
331                if hooks and "response" in hooks:
332                    # Copy hooks dict to avoid shared mutation
333                    hooks = {**hooks}
334                    original_hook = hooks["response"]
335                    # Support a single callable or a list of callables
336                    def wrap(h, original_url):
337                        def _wrapped(resp, *a, **k):
338                            # Inject FourCAT-specific original URL in kwargs once
339                            if "fourcat_original_url" not in k:
340                                k["fourcat_original_url"] = original_url
341                            return h(resp, *a, **k)
342                        return _wrapped
343
344                    if isinstance(original_hook, list):
345                        hooks["response"] = [wrap(h, url) for h in original_hook]
346                    else:
347                        hooks["response"] = wrap(original_hook, url)
348
349                    # Persist modified hooks back into per-URL kwargs
350                    per_kwargs["hooks"] = hooks
351            except Exception:
352                # If wrapping fails for any reason, proceed without modification
353                pass
354
355            # Assign the isolated kwargs to this metadata entry
356            url_metadata.kwargs = per_kwargs
357
358            if position == -1:
359                self.queue[queue_name].append(url_metadata)
360            else:
361                self.queue[queue_name].insert(position + i, url_metadata)
362
363        self.manage_requests()
364
365    def get_queue_length(self, queue_name="_"):
366        """
367        Get the length, of the queue
368
369        :param str queue_name:  Queue name
370        :return int: Amount of URLs in the queue (regardless of status)
371        """
372        queue_length = 0
373        for queue in self.queue:
374            if queue == queue_name or queue_name == "_":
375                queue_length += len(self.queue[queue_name])
376
377        return queue_length
378
379    def _update_proxy_pool(self):
380        """
381        Update proxy pool when settings change
382        
383        Adds new proxies, removes old ones, and preserves proxies with active requests.
384        Updates settings for existing proxies that remain in the configuration.
385        """
386        new_proxy_urls = set(self.proxy_settings["proxies.urls"])
387        current_proxy_urls = set(self.proxy_pool.keys())
388        
389        # Find proxies to add and remove
390        proxies_to_add = new_proxy_urls - current_proxy_urls
391        proxies_to_remove = current_proxy_urls - new_proxy_urls
392        proxies_to_keep = current_proxy_urls & new_proxy_urls
393        
394        # Remove proxies that are no longer in config
395        for proxy_url in proxies_to_remove:
396            if proxy_url in self.proxy_pool:
397                proxy_obj = self.proxy_pool[proxy_url].proxy
398                # Check if proxy has truly active requests (CLAIMED or RUNNING, not COOLING_OFF)
399                if not proxy_obj.has_active_requests():
400                    # No active requests - remove immediately
401                    del self.proxy_pool[proxy_url]
402                    if proxy_url in self.proxy_health:
403                        del self.proxy_health[proxy_url]
404                    self.proxies_pending_removal.discard(proxy_url)
405                    self.log.info(f"Removed proxy {proxy_url} (no longer in config)")
406                else:
407                    # Has active requests - mark for removal but don't delete yet
408                    self.proxies_pending_removal.add(proxy_url)
409                    self.log.warning(f"Proxy {proxy_url} marked for removal (has active requests, will not accept new requests)")
410        
411        # Update settings for existing proxies
412        for proxy_url in proxies_to_keep:
413            if proxy_url in self.proxy_pool:
414                proxy_obj = self.proxy_pool[proxy_url].proxy
415                proxy_obj.COOLOFF = self.proxy_settings["proxies.cooloff"]
416                proxy_obj.MAX_CONCURRENT_OVERALL = self.proxy_settings["proxies.concurrent-overall"]
417                proxy_obj.MAX_CONCURRENT_PER_HOST = self.proxy_settings["proxies.concurrent-host"]
418        
419        # Add new proxies
420        for proxy_url in proxies_to_add:
421            self._add_proxy_to_pool(proxy_url)
422            self.log.info(f"Added new proxy {proxy_url}")
423        
424        if proxies_to_add or proxies_to_remove:
425            self.log.info(f"Proxy pool updated: {len(self.proxy_pool)} total proxies "
426                         f"({len(proxies_to_add)} added, {len([p for p in proxies_to_remove if p not in self.proxy_pool])} removed)")
427    
428    def _add_proxy_to_pool(self, proxy_url):
429        """
430        Add a proxy to the pool
431        
432        Helper method to avoid duplicating proxy creation logic.
433        
434        :param str proxy_url: The proxy URL to add (or PROXY_LOCALHOST for direct connection)
435        """
436        self.proxy_pool[proxy_url] = namedtuple("ProxyEntry", ("proxy", "last_used"))
437        self.proxy_pool[proxy_url].proxy = SophisticatedFuturesProxy(
438            proxy_url,
439            self.log,
440            self.proxy_settings.get("proxies.cooloff", 0.1),
441            self.proxy_settings.get("proxies.concurrent-overall", 5),
442            self.proxy_settings.get("proxies.concurrent-host", 2),
443        )
444        self.proxy_pool[proxy_url].last_used = 0
445        self.proxy_health[proxy_url] = True
446    
447    def claim_proxy(self, url):
448        """
449        Find a proxy to do the request with
450
451        Finds a `SophisticatedFuturesProxy` that has an open slot for this URL.
452
453        :param str url:  URL to proxy a request for
454        :return SophisticatedFuturesProxy or None:
455            - SophisticatedFuturesProxy if a proxy is available
456            - None if all proxies are busy (retry later)
457        :raises NoProxiesAvailableError: If all proxies unhealthy and fallback disabled
458        """
459        # Get healthy proxies (excluding those pending removal)
460        healthy_proxies = [p for p in self.proxy_pool 
461                          if self.proxy_health.get(p, True)
462                          and p not in self.proxies_pending_removal]
463        
464        # If no healthy proxies, check if we should create localhost
465        if not healthy_proxies:
466            allow_localhost = self.proxy_settings.get("proxies.allow-localhost-fallback", True)
467            
468            if allow_localhost and self.PROXY_LOCALHOST not in self.proxy_pool:
469                # Create localhost as fallback
470                if not self.all_proxies_failed_logged:
471                    self.all_proxies_failed_logged = True
472                    self.log.error("All configured proxies are unhealthy. Falling back to localhost (direct connection).")
473                self._add_proxy_to_pool(self.PROXY_LOCALHOST)
474                self.log.info("Created localhost proxy for direct connections")
475                healthy_proxies = [self.PROXY_LOCALHOST]
476            elif not allow_localhost:
477                # Fallback disabled - fail
478                if not self.all_proxies_failed_logged:
479                    self.all_proxies_failed_logged = True
480                    self.log.error("All proxies are unhealthy and localhost fallback is disabled.")
481                raise NoProxiesAvailableError("All proxies unhealthy and localhost fallback disabled")
482            else:
483                # Localhost exists but unhealthy
484                raise NoProxiesAvailableError("All proxies including localhost are unhealthy")
485        
486        # within the pool, find the least recently used healthy proxy that is available
487        sorted_by_cooloff = sorted(
488            healthy_proxies, key=lambda p: self.proxy_pool[p].last_used
489        )
490        for proxy_id in sorted_by_cooloff:
491            claimed_proxy = self.proxy_pool[proxy_id].proxy.claim_for(url)
492            if claimed_proxy:
493                self.proxy_pool[proxy_id].last_used = time.time()
494                return claimed_proxy
495        
496        # All proxies busy
497        return None
498    
499    def _initialize_proxy_pool(self):
500        """
501        Initialize the proxy pool with configured proxies
502        
503        Called once during refresh_settings(). If no proxies configured,
504        adds localhost. Otherwise adds configured proxies.
505        """
506        proxies = self.proxy_settings.get("proxies.urls", [])
507        
508        # Handle empty/None proxy configuration - add localhost
509        if not proxies:
510            self.log.warning(
511                "No proxies configured (proxies.urls is empty or None). "
512                "Using localhost (direct connection) for all requests. "
513                "Set Proxy URLs in setting to [\"__localhost__\"] to disable this warning."
514            )
515            self._add_proxy_to_pool(self.PROXY_LOCALHOST)
516            return
517        
518        # Initialize configured proxies
519        for proxy_url in proxies:
520            self._add_proxy_to_pool(proxy_url)
521
522        self.log.debug(f"Proxy pool initialized with {len(self.proxy_pool)} proxies.")
523
    def manage_requests(self):
        """
        Manage requests asynchronously

        Goes through all queued URLs, in all queues. For a URL of which the
        request has returned, the proxy slot is marked as finished and the
        result (a response, or a `FailedProxiedRequest`) is buffered, after
        which the URL waits to be yielded. For a URL that has no request yet,
        a proxy is claimed and the request is started, unless the queue is
        halted or no proxy has an open slot.

        A request that fails with a `ProxyError` marks its proxy as unhealthy
        and puts the URL back in the queue to be retried via another proxy.

        Note that this method does *not* return any requested data. This is
        done in a separate function, which calls this one before returning any
        finished requests in the original queue order (`get_results()`).
        """
        # go through queue and look at the status of each URL
        for queue_name in self.queue:
            for i, url_metadata in enumerate(self.queue[queue_name]):
                url = url_metadata.url

                if url_metadata.status == self.REQUEST_STATUS_WAITING_FOR_YIELD:
                    # waiting to be flushed or passed by `get_result()`
                    continue

                if url_metadata.proxied and url_metadata.proxied.request.done():
                    # collect result and buffer it for yielding
                    # done() here doesn't necessarily mean the request finished
                    # successfully, just that it has returned - a timed out
                    # request will also be done()!
                    self.log.debug(f"Request for {url} finished, collecting result")
                    # the slot starts cooling off from here on
                    url_metadata.proxied.proxy.mark_request_finished(url)
                    
                    # Clean up proxies pending removal if they have no more active requests
                    proxy_url = url_metadata.proxied.proxy.proxy_url
                    if proxy_url in self.proxies_pending_removal:
                        proxy_obj = self.proxy_pool[proxy_url].proxy
                        # Check if proxy has truly active requests (not just cooling off)
                        if not proxy_obj.has_active_requests():
                            del self.proxy_pool[proxy_url]
                            if proxy_url in self.proxy_health:
                                del self.proxy_health[proxy_url]
                            self.proxies_pending_removal.discard(proxy_url)
                            self.log.info(f"Removed proxy {proxy_url} (completed all active requests)")
                    try:
                        # result() re-raises whatever exception ended the
                        # request, which is handled below
                        response = url_metadata.proxied.request.result()
                        # annotate the response so processors can see which
                        # proxy (if any) handled the request
                        setattr(
                            response,
                            "_4cat_proxy",
                            url_metadata.proxied.proxy.proxy_url,
                        )
                        url_metadata.proxied.result = response

                    except requests.exceptions.ProxyError as e:
                        # Proxy connection issue - mark proxy as unhealthy and requeue
                        proxy_url = url_metadata.proxied.proxy.proxy_url
                        
                        # Mark this proxy as unhealthy, so `claim_proxy()`
                        # skips it from now on
                        self.proxy_health[proxy_url] = False
                        
                        # Log warning once per proxy
                        if proxy_url not in self.proxy_warnings_logged:
                            self.proxy_warnings_logged.add(proxy_url)
                            self.log.warning(
                                f"Proxy {proxy_url} marked as unhealthy due to connection failure: {str(e)}"
                            )
                        
                        # Reset URL to queued so it will retry with a different proxy
                        url_metadata.status = self.REQUEST_STATUS_QUEUED
                        url_metadata.proxied = None
                        # Don't set to WAITING_FOR_YIELD - let it retry in the normal flow
                    
                    except (
                        ConnectionError,
                        asyncioCancelledError,
                        futureCancelledError,
                        requests.exceptions.RequestException,
                        urllib3.exceptions.HTTPError,
                    ) as e:
                        # this is where timeouts, etc, go; the failure is
                        # passed on to the consumer as the result
                        url_metadata.proxied.result = FailedProxiedRequest(
                            e, url_metadata.proxied.proxy.proxy_url
                        )

                    finally:
                        # success or fail, we can pass it on
                        # Only set to waiting if not requeued by ProxyError handler
                        # NOTE(review): an exception that is not listed above
                        # propagates to the caller after this clause has run,
                        # i.e. with the URL marked as waiting for yield but
                        # without `proxied.result` having been assigned
                        if url_metadata.status != self.REQUEST_STATUS_QUEUED:
                            url_metadata.status = self.REQUEST_STATUS_WAITING_FOR_YIELD

                else:
                    # running - ignore for now
                    # could do some health checks here...
                    # logging.debug(f"Request for {url} running...")
                    pass

                # this also covers URLs that were requeued just now by the
                # ProxyError handler, which are retried right away
                if not url_metadata.proxied and not (
                    queue_name in self.halted or "_" in self.halted
                ):
                    # no request running for this URL yet, try to start one
                    try:
                        proxy = self.claim_proxy(url)
                    except NoProxiesAvailableError as e:
                        # All proxies failed and fallback disabled - fail this request
                        # a placeholder without an actual request is stored,
                        # so the failure is yielded like any other result
                        url_metadata.proxied = namedtuple(
                            "DelegatedRequest",
                            ("request", "created", "result", "proxy", "url", "index"),
                        )
                        url_metadata.proxied.request = None
                        url_metadata.proxied.created = time.time()
                        url_metadata.proxied.result = FailedProxiedRequest(e, None)
                        url_metadata.proxied.proxy = None
                        url_metadata.proxied.url = url
                        url_metadata.proxied.index = url_metadata.index
                        url_metadata.status = self.REQUEST_STATUS_WAITING_FOR_YIELD
                        self.queue[queue_name][i] = url_metadata
                        continue
                    
                    if proxy is None:
                        # No proxy available (all busy)
                        # Try again next loop iteration
                        continue

                    # the localhost pseudo-proxy means a direct connection,
                    # i.e. no proxies are passed to the request
                    proxy_url = proxy.proxy_url
                    proxy_definition = (
                        {"http": proxy_url, "https": proxy_url}
                        if proxy_url != self.PROXY_LOCALHOST
                        else None
                    )

                    # start request for URL
                    self.log.debug(f"Request for {url} started")
                    # the namedtuple *class* itself serves as a mutable
                    # record here; note `result` is not assigned yet
                    request = namedtuple(
                        "DelegatedRequest",
                        (
                            "request",
                            "created",
                            "result",
                            "proxy",
                            "url",
                            "index",
                        ),
                    )
                    request.created = time.time()
                    # arguments passed to `add_urls()` come last, so they
                    # take precedence over the defaults set here
                    request.request = self.session.get(
                        **{
                            "url": url,
                            "timeout": 30,
                            "proxies": proxy_definition,
                            **url_metadata.kwargs
                        }
                    )

                    request.proxy = proxy
                    request.url = url
                    request.index = (
                        url_metadata.index
                    )  # this is to allow for multiple requests for the same URL

                    url_metadata.status = self.REQUEST_STATUS_STARTED
                    url_metadata.proxied = request

                    # switch the claimed slot to running
                    proxy.mark_request_started(url)

                self.queue[queue_name][i] = url_metadata
688
689    def get_results(self, queue_name="_", preserve_order=True):
690        """
691        Return available results, without skipping
692
693        Loops through the queue, returning values (and updating the queue) for
694        requests that have been finished. If a request is not finished yet,
695        stop returning. This ensures that in the end, values are only ever
696        returned in the original queue order, at the cost of potential
697        buffering.
698
699        :param str queue_name:  Queue name to get results from
700        :param bool preserve_order:  Return results in the order they were
701        added to the queue. This means that other results are buffered and
702        potentially remain in the queue, which may in the worst case
703        significantly slow down data collection. For example, if the first
704        request in the queue takes a really long time while all other
705        requests are already finished, the queue will nevertheless remain
706        'full'.
707
708        :return:
709        """
710        self.manage_requests()
711
712        # no results, no return
713        if queue_name not in self.queue:
714            return
715
716        # use list comprehensions here to avoid having to modify the
717        # lists while iterating through them
718        for url_metadata in [u for u in self.queue[queue_name]]:
719            # for each URL in the queue...
720            if url_metadata.status == self.REQUEST_STATUS_WAITING_FOR_YIELD:
721                # see if a finished request is available...
722                self.queue[queue_name].remove(url_metadata)
723                yield url_metadata.url, url_metadata.proxied.result
724
725            elif preserve_order:
726                # ...but as soon as a URL has no finished result, return
727                # unless we don't care about the order, then continue and yield
728                # as much as possible
729                return
730
731    def _halt(self, queue_name="_"):
732        """
733        Interrupt fetching of results
734
735        Can be used when 4CAT is interrupted. Clears queue and tries to cancel
736        running requests.
737
738        Note that running requests *cannot* always be cancelled via `.cancel()`
739        particularly when using `stream=True`. It is therefore recommended to
740        use `halt_and_wait()` which is blocking until all running requests have
741        properly terminated, instead of calling this method directly.
742
743        :param str queue_name:  Queue name to stop fetching results for. By
744        default, halt all queues.
745        """
746        self.halted.add(queue_name)
747
748        for queue in self.queue:
749            if queue_name == "_" or queue_name == queue:
750                # use a list comprehension here to avoid having to modify the
751                # list while iterating through it
752                for url_metadata in [u for u in self.queue[queue]]:
753                    if url_metadata.status != self.REQUEST_STATUS_STARTED:
754                        self.queue[queue].remove(url_metadata)
755                    else:
756                        url_metadata.proxied.request.cancel()
757
758        self.halted.remove(queue_name)
759
760    def halt_and_wait(self, queue_name="_"):
761        """
762        Cancel any queued requests and wait until ongoing ones are finished
763
764        Blocking!
765
766        :param str queue_name:  Queue name to stop fetching results for. By
767        default, halt all queues.
768        """
769        self._halt(queue_name)
770        while self.get_queue_length(queue_name) > 0:
771            # exhaust generator without doing something w/ results
772            all(self.get_results(queue_name, preserve_order=False))
773
774        if queue_name in self.queue:
775            del self.queue[queue_name]
class FailedProxiedRequest:
class FailedProxiedRequest:
    """
    Stand-in result for a delegated request that did not succeed

    Whatever describes the failure (usually the exception) is kept in the
    `context` property. The URL of the proxy that serviced the request is
    kept in `proxy_url`, so downstream consumers can make informed retry
    decisions.
    """

    # class-level defaults; every instance overrides both in `__init__`
    context = None
    proxy_url = None

    def __init__(self, context=None, proxy_url=None):
        """
        :param context:  Failure context, usually the exception
        :param proxy_url:  URL of the proxy that serviced the request
        """
        self.proxy_url = proxy_url
        self.context = context

A delegated request that has failed for whatever reason

The failure context (usually the exception) is stored in the context property. We also keep track of the proxy URL that serviced the request so downstream consumers can make informed retry decisions.

FailedProxiedRequest(context=None, proxy_url=None)
26    def __init__(self, context=None, proxy_url=None):
27        self.context = context
28        self.proxy_url = proxy_url
context = None
proxy_url = None
class NoProxiesAvailableError(builtins.Exception):
class NoProxiesAvailableError(Exception):
    """
    No proxy can service a request

    Raised when all proxies are unhealthy and localhost fallback is disabled,
    or when the localhost fallback itself is unhealthy as well.
    """

Raised when all proxies are unhealthy and localhost fallback is disabled

class ProxyStatus:
class ProxyStatus:
    """
    Possible statuses of a request slot in a SophisticatedFuturesProxy

    Plain integer constants rather than an `enum.Enum`.
    """

    AVAILABLE, CLAIMED, RUNNING, COOLING_OFF = 3, 4, 5, 6

An enum of possible statuses of a SophisticatedFuturesProxy

AVAILABLE = 3
CLAIMED = 4
RUNNING = 5
COOLING_OFF = 6
class SophisticatedFuturesProxy:
 49class SophisticatedFuturesProxy:
 50    """
 51    A proxy that can be used in combination with the DelegatedRequestHandler
 52
 53    This keeps track of cooloffs, etc, to ensure that any individual proxy does
 54    not request more often than it should. This is a separate class because of
 55    an additional piece of logic that allows this cooloff to be kept track of
 56    on a per-hostname basis. This is useful because rate limits are typically
 57    enforced per site, so we can have (figuratively) unlimited concurrent
 58    request as long as each is on a separate hostname, but need to be more
 59    careful when requesting from a single host.
 60    """
 61
 62    log = None
 63    looping = True
 64
 65    COOLOFF = 0
 66    MAX_CONCURRENT_OVERALL = 0
 67    MAX_CONCURRENT_PER_HOST = 0
 68
 69    def __init__(
 70        self, url, log=None, cooloff=3, concurrent_overall=5, concurrent_host=2
 71    ):
 72        self.proxy_url = url
 73        self.hostnames = {}
 74        self.log = log
 75
 76        self.COOLOFF = cooloff
 77        self.MAX_CONCURRENT_OVERALL = concurrent_overall
 78        self.MAX_CONCURRENT_PER_HOST = concurrent_host
 79
 80    def know_hostname(self, url):
 81        """
 82        Make sure the hostname is known to this proxy
 83
 84        This means that we can now keep track of some per-hostname statistics
 85        for this hostname. If the hostname is not known yet, the statistics are
 86        re-initialised.
 87
 88        :param str url:  URL with host name to keep stats for. Case-insensitive.
 89        :param str:  The host name, as parsed for internal use
 90        """
 91        hostname = ural.get_hostname(url).lower()
 92        if hostname not in self.hostnames:
 93            self.hostnames[hostname] = namedtuple(
 94                "HostnameForProxiedRequests", ("running",)
 95            )
 96            self.hostnames[hostname].running = []
 97
 98        return hostname
 99
100    def has_active_requests(self):
101        """
102        Check if this proxy has any truly active requests.
103        
104        Active means CLAIMED or RUNNING status - not COOLING_OFF.
105        COOLING_OFF requests are essentially finished and waiting to be released,
106        so they shouldn't prevent proxy removal.
107        
108        :return bool: True if proxy has requests that are claimed or running
109        """
110        for hostname, metadata in self.hostnames.values():
111            for request in metadata.running:
112                if request.status in (ProxyStatus.CLAIMED, ProxyStatus.RUNNING):
113                    return True
114        return False
115    
116    def release_cooled_off(self):
117        """
118        Release proxies that have finished cooling off.
119
120        Proxies cool off for a certain amount of time after starting a request.
121        This method removes cooled off requests, so that new ones may fill
122        their slot.
123        """
124        for hostname, metadata in self.hostnames.copy().items():
125            for request in metadata.running:
126                if (
127                    request.status == ProxyStatus.COOLING_OFF
128                    and request.timestamp_finished < time.time() - self.COOLOFF
129                ):
130                    self.log.debug(
131                        f"Releasing proxy {self.proxy_url} for host name {hostname}"
132                    )
133                    self.hostnames[hostname].running.remove(request)
134
135                    # get rid of hostnames with no running or cooling off
136                    # requests, else this might grow indefinitely
137                    if len(self.hostnames[hostname].running) == 0:
138                        del self.hostnames[hostname]
139
140    def claim_for(self, url):
141        """
142        Try claiming a slot in this proxy for the given URL
143
144        Whether a slot is available depends both on the overall concurrency
145        limit, and the per-hostname limit. If both are not maxed out, fill
146        the slot and return the proxy object.
147
148        :param str url:  URL to proxy a request for.
149        :return: `False` if no proxy is available, or the
150        `SophisticatedFuturesProxy` object if one is.
151        """
152        self.release_cooled_off()
153        hostname = self.know_hostname(url)
154
155        total_running = sum([len(m.running) for h, m in self.hostnames.items()])
156        if total_running >= self.MAX_CONCURRENT_OVERALL:
157            return False
158
159        if len(self.hostnames[hostname].running) < self.MAX_CONCURRENT_PER_HOST:
160            request = namedtuple(
161                "ProxiedRequest",
162                ("url", "status", "timestamp_started", "timestamp_finished"),
163            )
164            request.url = url
165            request.status = ProxyStatus.CLAIMED
166            request.timestamp_started = 0
167            request.timestamp_finished = 0
168            self.hostnames[hostname].running.append(request)
169            self.log.debug(
170                f"Claiming proxy {self.proxy_url} for host name {hostname} ({len(self.hostnames[hostname].running)} of {self.MAX_CONCURRENT_PER_HOST} for host)"
171            )
172            return self
173        else:
174            return False
175
176    def mark_request_started(self, url):
177        """
178        Mark a request for a URL as started
179
180        This updates the status for the related slot. If no matching slot
181        exists that is waiting for a request to start running, a `ValueError`
182        is raised.
183
184        :param str url:  URL of the proxied request.
185        """
186        hostname = self.know_hostname(url)
187
188        for i, metadata in enumerate(self.hostnames[hostname].running):
189            if metadata.status == ProxyStatus.CLAIMED and metadata.url == url:
190                self.hostnames[hostname].running[i].status = ProxyStatus.RUNNING
191                self.hostnames[hostname].running[i].timestamp_started = time.time()
192                return
193
194        raise ValueError(f"No proxy is waiting for a request with URL {url} to start!")
195
196    def mark_request_finished(self, url):
197        """
198        Mark a request for a URL as finished
199
200        This updates the status for the related slot. If no matching slot
201        exists that is waiting for a request to finish, a `ValueError` is
202        raised. After this, the proxy will be marked as cooling off, and is
203        released after cooling off is completed.
204
205        :param str url:  URL of the proxied request.
206        """
207        hostname = self.know_hostname(url)
208
209        for i, metadata in enumerate(self.hostnames[hostname].running):
210            if metadata.status == ProxyStatus.RUNNING and metadata.url == url:
211                self.hostnames[hostname].running[i].timestamp_finished = time.time()
212                self.hostnames[hostname].running[i].status = ProxyStatus.COOLING_OFF
213                return
214
215        raise ValueError(f"No proxy is currently running a request for URL {url}!")

A proxy that can be used in combination with the DelegatedRequestHandler

This keeps track of cooloffs, etc, to ensure that any individual proxy does not request more often than it should. This is a separate class because of an additional piece of logic that allows this cooloff to be kept track of on a per-hostname basis. This is useful because rate limits are typically enforced per site, so we can have (figuratively) unlimited concurrent requests as long as each is on a separate hostname, but need to be more careful when requesting from a single host.

SophisticatedFuturesProxy(url, log=None, cooloff=3, concurrent_overall=5, concurrent_host=2)
69    def __init__(
70        self, url, log=None, cooloff=3, concurrent_overall=5, concurrent_host=2
71    ):
72        self.proxy_url = url
73        self.hostnames = {}
74        self.log = log
75
76        self.COOLOFF = cooloff
77        self.MAX_CONCURRENT_OVERALL = concurrent_overall
78        self.MAX_CONCURRENT_PER_HOST = concurrent_host
log = None
looping = True
COOLOFF = 0
MAX_CONCURRENT_OVERALL = 0
MAX_CONCURRENT_PER_HOST = 0
proxy_url
hostnames
def know_hostname(self, url):
80    def know_hostname(self, url):
81        """
82        Make sure the hostname is known to this proxy
83
84        This means that we can now keep track of some per-hostname statistics
85        for this hostname. If the hostname is not known yet, the statistics are
86        re-initialised.
87
88        :param str url:  URL with host name to keep stats for. Case-insensitive.
89        :param str:  The host name, as parsed for internal use
90        """
91        hostname = ural.get_hostname(url).lower()
92        if hostname not in self.hostnames:
93            self.hostnames[hostname] = namedtuple(
94                "HostnameForProxiedRequests", ("running",)
95            )
96            self.hostnames[hostname].running = []
97
98        return hostname

Make sure the hostname is known to this proxy

This means that we can now keep track of some per-hostname statistics for this hostname. If the hostname is not known yet, the statistics are re-initialised.

Parameters
  • str url: URL with host name to keep stats for. Case-insensitive.
Returns

The host name (str), as parsed for internal use
def has_active_requests(self):
100    def has_active_requests(self):
101        """
102        Check if this proxy has any truly active requests.
103        
104        Active means CLAIMED or RUNNING status - not COOLING_OFF.
105        COOLING_OFF requests are essentially finished and waiting to be released,
106        so they shouldn't prevent proxy removal.
107        
108        :return bool: True if proxy has requests that are claimed or running
109        """
110        for hostname, metadata in self.hostnames.values():
111            for request in metadata.running:
112                if request.status in (ProxyStatus.CLAIMED, ProxyStatus.RUNNING):
113                    return True
114        return False

Check if this proxy has any truly active requests.

Active means CLAIMED or RUNNING status - not COOLING_OFF. COOLING_OFF requests are essentially finished and waiting to be released, so they shouldn't prevent proxy removal.

Returns

True if proxy has requests that are claimed or running

def release_cooled_off(self):
116    def release_cooled_off(self):
117        """
118        Release proxies that have finished cooling off.
119
120        Proxies cool off for a certain amount of time after starting a request.
121        This method removes cooled off requests, so that new ones may fill
122        their slot.
123        """
124        for hostname, metadata in self.hostnames.copy().items():
125            for request in metadata.running:
126                if (
127                    request.status == ProxyStatus.COOLING_OFF
128                    and request.timestamp_finished < time.time() - self.COOLOFF
129                ):
130                    self.log.debug(
131                        f"Releasing proxy {self.proxy_url} for host name {hostname}"
132                    )
133                    self.hostnames[hostname].running.remove(request)
134
135                    # get rid of hostnames with no running or cooling off
136                    # requests, else this might grow indefinitely
137                    if len(self.hostnames[hostname].running) == 0:
138                        del self.hostnames[hostname]

Release proxies that have finished cooling off.

Proxies cool off for a certain amount of time after finishing a request. This method removes cooled off requests, so that new ones may fill their slot.

def claim_for(self, url):
140    def claim_for(self, url):
141        """
142        Try claiming a slot in this proxy for the given URL
143
144        Whether a slot is available depends both on the overall concurrency
145        limit, and the per-hostname limit. If both are not maxed out, fill
146        the slot and return the proxy object.
147
148        :param str url:  URL to proxy a request for.
149        :return: `False` if no proxy is available, or the
150        `SophisticatedFuturesProxy` object if one is.
151        """
152        self.release_cooled_off()
153        hostname = self.know_hostname(url)
154
155        total_running = sum([len(m.running) for h, m in self.hostnames.items()])
156        if total_running >= self.MAX_CONCURRENT_OVERALL:
157            return False
158
159        if len(self.hostnames[hostname].running) < self.MAX_CONCURRENT_PER_HOST:
160            request = namedtuple(
161                "ProxiedRequest",
162                ("url", "status", "timestamp_started", "timestamp_finished"),
163            )
164            request.url = url
165            request.status = ProxyStatus.CLAIMED
166            request.timestamp_started = 0
167            request.timestamp_finished = 0
168            self.hostnames[hostname].running.append(request)
169            self.log.debug(
170                f"Claiming proxy {self.proxy_url} for host name {hostname} ({len(self.hostnames[hostname].running)} of {self.MAX_CONCURRENT_PER_HOST} for host)"
171            )
172            return self
173        else:
174            return False

Try claiming a slot in this proxy for the given URL

Whether a slot is available depends both on the overall concurrency limit, and the per-hostname limit. If both are not maxed out, fill the slot and return the proxy object.

Parameters
  • str url: URL to proxy a request for.
Returns

False if no proxy is available, or the SophisticatedFuturesProxy object if one is.

def mark_request_started(self, url):
176    def mark_request_started(self, url):
177        """
178        Mark a request for a URL as started
179
180        This updates the status for the related slot. If no matching slot
181        exists that is waiting for a request to start running, a `ValueError`
182        is raised.
183
184        :param str url:  URL of the proxied request.
185        """
186        hostname = self.know_hostname(url)
187
188        for i, metadata in enumerate(self.hostnames[hostname].running):
189            if metadata.status == ProxyStatus.CLAIMED and metadata.url == url:
190                self.hostnames[hostname].running[i].status = ProxyStatus.RUNNING
191                self.hostnames[hostname].running[i].timestamp_started = time.time()
192                return
193
194        raise ValueError(f"No proxy is waiting for a request with URL {url} to start!")

Mark a request for a URL as started

This updates the status for the related slot. If no matching slot exists that is waiting for a request to start running, a ValueError is raised.

Parameters
  • str url: URL of the proxied request.
def mark_request_finished(self, url):
196    def mark_request_finished(self, url):
197        """
198        Mark a request for a URL as finished
199
200        This updates the status for the related slot. If no matching slot
201        exists that is waiting for a request to finish, a `ValueError` is
202        raised. After this, the proxy will be marked as cooling off, and is
203        released after cooling off is completed.
204
205        :param str url:  URL of the proxied request.
206        """
207        hostname = self.know_hostname(url)
208
209        for i, metadata in enumerate(self.hostnames[hostname].running):
210            if metadata.status == ProxyStatus.RUNNING and metadata.url == url:
211                self.hostnames[hostname].running[i].timestamp_finished = time.time()
212                self.hostnames[hostname].running[i].status = ProxyStatus.COOLING_OFF
213                return
214
215        raise ValueError(f"No proxy is currently running a request for URL {url}!")

Mark a request for a URL as finished

This updates the status for the related slot. If no matching slot exists that is waiting for a request to finish, a ValueError is raised. After this, the proxy will be marked as cooling off, and is released after cooling off is completed.

Parameters
  • str url: URL of the proxied request.
class DelegatedRequestHandler:
218class DelegatedRequestHandler:
219    queue = {}
220    session = None
221    proxy_pool = {}
222    proxy_settings = {}
223    halted = set()
224    log = None
225    index = 0
226
227    # some magic values
228    REQUEST_STATUS_QUEUED = 0
229    REQUEST_STATUS_STARTED = 1
230    REQUEST_STATUS_WAITING_FOR_YIELD = 2
231    PROXY_LOCALHOST = "__localhost__"
232
233    def __init__(self, log, config):
234        pool = ThreadPoolExecutor()
235        self.session = FuturesSession(executor=pool)
236        self.log = log
237        
238        # Proxy health tracking
239        self.proxy_health = {}
240        self.proxy_warnings_logged = set()
241        self.all_proxies_failed_logged = False
242        
243        # Track proxies that should be removed but have active requests
244        self.proxies_pending_removal = set()
245
246        self.refresh_settings(config)
247
248    def refresh_settings(self, config):
249        """
250        Load proxy settings
251
252        This is done on demand, so that we do not need a persistent
253        configuration reader, which could make things complicated in
254        thread-based contexts.
255
256        Initializes proxy pool or updates if URLs have changed, preserving proxies with active requests.
257
258        :param config:  Configuration reader
259        """
260        # Load new settings
261        new_proxy_settings = {
262            k: config.get(k) for k in ("proxies.urls", "proxies.cooloff", "proxies.concurrent-overall",
263                                            "proxies.concurrent-host", "proxies.allow-localhost-fallback")
264        }
265        
266        # Normalize empty config to localhost
267        if not new_proxy_settings["proxies.urls"]:
268            self.log.warning(
269                "No proxies configured (proxies.urls is empty or None). "
270                "Using localhost (direct connection) for all requests. "
271                "Set Proxy URLs in setting to [\"__localhost__\"] to disable this warning."
272            )
273            new_proxy_settings["proxies.urls"] = [self.PROXY_LOCALHOST]
274        
275        # Check if proxy URLs have changed
276        proxy_urls_changed = (not hasattr(self, 'proxy_settings') or 
277                             self.proxy_settings.get("proxies.urls") != new_proxy_settings["proxies.urls"])
278        
279        # Update settings
280        self.proxy_settings = new_proxy_settings
281        
282        # Reset proxy health tracking when settings are refreshed
283        num_proxies_restored = len([p for p in self.proxy_health if not self.proxy_health[p]])
284        self.proxy_health = {}
285        self.proxy_warnings_logged = set()
286        self.all_proxies_failed_logged = False
287        
288        if num_proxies_restored > 0:
289            self.log.debug(f"Proxy health reset: {num_proxies_restored} proxies restored")
290        
291        # Initialize or update proxy pool
292        if not self.proxy_pool:
293            # First initialization
294            self._initialize_proxy_pool()
295        elif proxy_urls_changed:
296            # Settings changed - update pool
297            self._update_proxy_pool()
298
299
300    def add_urls(self, urls, queue_name="_", position=-1, **kwargs):
301        """
302        Add URLs to the request queue
303
304        :param urls:  An iterable of URLs.
305        :param queue_name:  Queue name to add to.
306        :param position:  Where in queue to insert; -1 adds to end of queue
307        :param kwargs: Other keyword arguments will be passed on to
308        `requests.get()`
309        """
310        if queue_name in self.halted or "_" in self.halted:
311            # do not add URLs while delegator is shutting down
312            return
313
314        if queue_name not in self.queue:
315            self.queue[queue_name] = []
316
317        for i, url in enumerate(urls):
318            url_metadata = namedtuple(
319                "UrlForDelegatedRequest", ("url", "args", "status", "proxied")
320            )
321            url_metadata.url = url
322            # Make a per-URL copy of kwargs to avoid shared mutation across entries
323            per_kwargs = {**kwargs} if kwargs else {}
324            url_metadata.index = self.index
325            url_metadata.proxied = None
326            url_metadata.status = self.REQUEST_STATUS_QUEUED
327            self.index += 1
328
329            # If a response hook is provided, wrap it to inject the original URL
330            try:
331                hooks = per_kwargs.get("hooks")
332                if hooks and "response" in hooks:
333                    # Copy hooks dict to avoid shared mutation
334                    hooks = {**hooks}
335                    original_hook = hooks["response"]
336                    # Support a single callable or a list of callables
337                    def wrap(h, original_url):
338                        def _wrapped(resp, *a, **k):
339                            # Inject FourCAT-specific original URL in kwargs once
340                            if "fourcat_original_url" not in k:
341                                k["fourcat_original_url"] = original_url
342                            return h(resp, *a, **k)
343                        return _wrapped
344
345                    if isinstance(original_hook, list):
346                        hooks["response"] = [wrap(h, url) for h in original_hook]
347                    else:
348                        hooks["response"] = wrap(original_hook, url)
349
350                    # Persist modified hooks back into per-URL kwargs
351                    per_kwargs["hooks"] = hooks
352            except Exception:
353                # If wrapping fails for any reason, proceed without modification
354                pass
355
356            # Assign the isolated kwargs to this metadata entry
357            url_metadata.kwargs = per_kwargs
358
359            if position == -1:
360                self.queue[queue_name].append(url_metadata)
361            else:
362                self.queue[queue_name].insert(position + i, url_metadata)
363
364        self.manage_requests()
365
366    def get_queue_length(self, queue_name="_"):
367        """
368        Get the length, of the queue
369
370        :param str queue_name:  Queue name
371        :return int: Amount of URLs in the queue (regardless of status)
372        """
373        queue_length = 0
374        for queue in self.queue:
375            if queue == queue_name or queue_name == "_":
376                queue_length += len(self.queue[queue_name])
377
378        return queue_length
379
380    def _update_proxy_pool(self):
381        """
382        Update proxy pool when settings change
383        
384        Adds new proxies, removes old ones, and preserves proxies with active requests.
385        Updates settings for existing proxies that remain in the configuration.
386        """
387        new_proxy_urls = set(self.proxy_settings["proxies.urls"])
388        current_proxy_urls = set(self.proxy_pool.keys())
389        
390        # Find proxies to add and remove
391        proxies_to_add = new_proxy_urls - current_proxy_urls
392        proxies_to_remove = current_proxy_urls - new_proxy_urls
393        proxies_to_keep = current_proxy_urls & new_proxy_urls
394        
395        # Remove proxies that are no longer in config
396        for proxy_url in proxies_to_remove:
397            if proxy_url in self.proxy_pool:
398                proxy_obj = self.proxy_pool[proxy_url].proxy
399                # Check if proxy has truly active requests (CLAIMED or RUNNING, not COOLING_OFF)
400                if not proxy_obj.has_active_requests():
401                    # No active requests - remove immediately
402                    del self.proxy_pool[proxy_url]
403                    if proxy_url in self.proxy_health:
404                        del self.proxy_health[proxy_url]
405                    self.proxies_pending_removal.discard(proxy_url)
406                    self.log.info(f"Removed proxy {proxy_url} (no longer in config)")
407                else:
408                    # Has active requests - mark for removal but don't delete yet
409                    self.proxies_pending_removal.add(proxy_url)
410                    self.log.warning(f"Proxy {proxy_url} marked for removal (has active requests, will not accept new requests)")
411        
412        # Update settings for existing proxies
413        for proxy_url in proxies_to_keep:
414            if proxy_url in self.proxy_pool:
415                proxy_obj = self.proxy_pool[proxy_url].proxy
416                proxy_obj.COOLOFF = self.proxy_settings["proxies.cooloff"]
417                proxy_obj.MAX_CONCURRENT_OVERALL = self.proxy_settings["proxies.concurrent-overall"]
418                proxy_obj.MAX_CONCURRENT_PER_HOST = self.proxy_settings["proxies.concurrent-host"]
419        
420        # Add new proxies
421        for proxy_url in proxies_to_add:
422            self._add_proxy_to_pool(proxy_url)
423            self.log.info(f"Added new proxy {proxy_url}")
424        
425        if proxies_to_add or proxies_to_remove:
426            self.log.info(f"Proxy pool updated: {len(self.proxy_pool)} total proxies "
427                         f"({len(proxies_to_add)} added, {len([p for p in proxies_to_remove if p not in self.proxy_pool])} removed)")
428    
429    def _add_proxy_to_pool(self, proxy_url):
430        """
431        Add a proxy to the pool
432        
433        Helper method to avoid duplicating proxy creation logic.
434        
435        :param str proxy_url: The proxy URL to add (or PROXY_LOCALHOST for direct connection)
436        """
437        self.proxy_pool[proxy_url] = namedtuple("ProxyEntry", ("proxy", "last_used"))
438        self.proxy_pool[proxy_url].proxy = SophisticatedFuturesProxy(
439            proxy_url,
440            self.log,
441            self.proxy_settings.get("proxies.cooloff", 0.1),
442            self.proxy_settings.get("proxies.concurrent-overall", 5),
443            self.proxy_settings.get("proxies.concurrent-host", 2),
444        )
445        self.proxy_pool[proxy_url].last_used = 0
446        self.proxy_health[proxy_url] = True
447    
448    def claim_proxy(self, url):
449        """
450        Find a proxy to do the request with
451
452        Finds a `SophisticatedFuturesProxy` that has an open slot for this URL.
453
454        :param str url:  URL to proxy a request for
455        :return SophisticatedFuturesProxy or None:
456            - SophisticatedFuturesProxy if a proxy is available
457            - None if all proxies are busy (retry later)
458        :raises NoProxiesAvailableError: If all proxies unhealthy and fallback disabled
459        """
460        # Get healthy proxies (excluding those pending removal)
461        healthy_proxies = [p for p in self.proxy_pool 
462                          if self.proxy_health.get(p, True)
463                          and p not in self.proxies_pending_removal]
464        
465        # If no healthy proxies, check if we should create localhost
466        if not healthy_proxies:
467            allow_localhost = self.proxy_settings.get("proxies.allow-localhost-fallback", True)
468            
469            if allow_localhost and self.PROXY_LOCALHOST not in self.proxy_pool:
470                # Create localhost as fallback
471                if not self.all_proxies_failed_logged:
472                    self.all_proxies_failed_logged = True
473                    self.log.error("All configured proxies are unhealthy. Falling back to localhost (direct connection).")
474                self._add_proxy_to_pool(self.PROXY_LOCALHOST)
475                self.log.info("Created localhost proxy for direct connections")
476                healthy_proxies = [self.PROXY_LOCALHOST]
477            elif not allow_localhost:
478                # Fallback disabled - fail
479                if not self.all_proxies_failed_logged:
480                    self.all_proxies_failed_logged = True
481                    self.log.error("All proxies are unhealthy and localhost fallback is disabled.")
482                raise NoProxiesAvailableError("All proxies unhealthy and localhost fallback disabled")
483            else:
484                # Localhost exists but unhealthy
485                raise NoProxiesAvailableError("All proxies including localhost are unhealthy")
486        
487        # within the pool, find the least recently used healthy proxy that is available
488        sorted_by_cooloff = sorted(
489            healthy_proxies, key=lambda p: self.proxy_pool[p].last_used
490        )
491        for proxy_id in sorted_by_cooloff:
492            claimed_proxy = self.proxy_pool[proxy_id].proxy.claim_for(url)
493            if claimed_proxy:
494                self.proxy_pool[proxy_id].last_used = time.time()
495                return claimed_proxy
496        
497        # All proxies busy
498        return None
499    
500    def _initialize_proxy_pool(self):
501        """
502        Initialize the proxy pool with configured proxies
503        
504        Called once during refresh_settings(). If no proxies configured,
505        adds localhost. Otherwise adds configured proxies.
506        """
507        proxies = self.proxy_settings.get("proxies.urls", [])
508        
509        # Handle empty/None proxy configuration - add localhost
510        if not proxies:
511            self.log.warning(
512                "No proxies configured (proxies.urls is empty or None). "
513                "Using localhost (direct connection) for all requests. "
514                "Set Proxy URLs in setting to [\"__localhost__\"] to disable this warning."
515            )
516            self._add_proxy_to_pool(self.PROXY_LOCALHOST)
517            return
518        
519        # Initialize configured proxies
520        for proxy_url in proxies:
521            self._add_proxy_to_pool(proxy_url)
522
523        self.log.debug(f"Proxy pool initialized with {len(self.proxy_pool)} proxies.")
524
def manage_requests(self):
    """
    Advance every queued URL by one step

    For each URL in each queue, in queue order:

    - URLs already waiting to be yielded are left alone.
    - If the URL's request has returned, the proxy is told the request has
      finished, a proxy pending removal is dropped from the pool once it
      has no active requests left, and the outcome is stored: the response
      (annotated with `_4cat_proxy`) on success, a `FailedProxiedRequest`
      on failure. A `ProxyError` instead marks the proxy unhealthy and
      puts the URL back in the queue so it is retried.
    - If the URL has no request yet and its queue is not halted, a proxy
      is claimed and the request is started. If claiming raises
      `NoProxiesAvailableError` the URL is failed right away; if no proxy
      has a free slot the URL is simply tried again on the next call.

    This method does *not* return any data; `get_results()` calls it and
    then hands out whatever has finished.
    """
    request_fields = ("request", "created", "result", "proxy", "url", "index")

    for queue_name in self.queue:
        for position, entry in enumerate(self.queue[queue_name]):
            url = entry.url

            if entry.status == self.REQUEST_STATUS_WAITING_FOR_YIELD:
                # result is buffered until `get_results()` picks it up
                continue

            if entry.proxied and entry.proxied.request.done():
                # done() only means the request has returned - it may just
                # as well have timed out or been cancelled
                delegated = entry.proxied
                used_proxy = delegated.proxy
                self.log.debug(f"Request for {url} finished, collecting result")
                used_proxy.mark_request_finished(url)

                # a proxy that was scheduled for removal can go as soon as
                # it has no active requests left (cooling off does not count)
                proxy_url = used_proxy.proxy_url
                if (
                    proxy_url in self.proxies_pending_removal
                    and not self.proxy_pool[proxy_url].proxy.has_active_requests()
                ):
                    del self.proxy_pool[proxy_url]
                    if proxy_url in self.proxy_health:
                        del self.proxy_health[proxy_url]
                    self.proxies_pending_removal.discard(proxy_url)
                    self.log.info(f"Removed proxy {proxy_url} (completed all active requests)")

                try:
                    response = delegated.request.result()
                    # let processors see which proxy (if any) did the work
                    response._4cat_proxy = proxy_url
                    delegated.result = response

                except requests.exceptions.ProxyError as e:
                    # the proxy itself is broken: flag it and retry the URL
                    # with another one
                    self.proxy_health[proxy_url] = False

                    # warn only once per proxy
                    if proxy_url not in self.proxy_warnings_logged:
                        self.proxy_warnings_logged.add(proxy_url)
                        self.log.warning(
                            f"Proxy {proxy_url} marked as unhealthy due to connection failure: {str(e)}"
                        )

                    entry.status = self.REQUEST_STATUS_QUEUED
                    entry.proxied = None

                except (
                    ConnectionError,
                    asyncioCancelledError,
                    futureCancelledError,
                    requests.exceptions.RequestException,
                    urllib3.exceptions.HTTPError,
                ) as e:
                    # timeouts, cancellations, etc
                    delegated.result = FailedProxiedRequest(e, proxy_url)

                finally:
                    # anything that was not requeued above is ready to be
                    # passed on, whether it succeeded or not
                    if entry.status != self.REQUEST_STATUS_QUEUED:
                        entry.status = self.REQUEST_STATUS_WAITING_FOR_YIELD

            if (
                not entry.proxied
                and queue_name not in self.halted
                and "_" not in self.halted
            ):
                # nothing running for this URL yet, try to start a request
                try:
                    proxy = self.claim_proxy(url)
                except NoProxiesAvailableError as e:
                    # no usable proxy at all - fail this URL right away
                    failed = namedtuple("DelegatedRequest", request_fields)
                    failed.request = None
                    failed.created = time.time()
                    failed.result = FailedProxiedRequest(e, None)
                    failed.proxy = None
                    failed.url = url
                    failed.index = entry.index
                    entry.proxied = failed
                    entry.status = self.REQUEST_STATUS_WAITING_FOR_YIELD
                    self.queue[queue_name][position] = entry
                    continue

                if proxy is None:
                    # every proxy is busy; try again on the next call
                    continue

                proxy_url = proxy.proxy_url
                if proxy_url == self.PROXY_LOCALHOST:
                    # direct connection, no proxy definition needed
                    proxy_definition = None
                else:
                    proxy_definition = {"http": proxy_url, "https": proxy_url}

                self.log.debug(f"Request for {url} started")
                delegated = namedtuple("DelegatedRequest", request_fields)
                delegated.created = time.time()

                # per-URL kwargs go last so they can override the defaults
                request_args = {"url": url, "timeout": 30, "proxies": proxy_definition}
                request_args.update(entry.kwargs)
                delegated.request = self.session.get(**request_args)

                delegated.proxy = proxy
                delegated.url = url
                # the index tells apart multiple requests for the same URL
                delegated.index = entry.index

                entry.status = self.REQUEST_STATUS_STARTED
                entry.proxied = delegated

                proxy.mark_request_started(url)

            self.queue[queue_name][position] = entry
689
def get_results(self, queue_name="_", preserve_order=True):
    """
    Yield results of finished requests and drop them from the queue

    This is a generator; once iteration starts it first calls
    `manage_requests()` and then walks the given queue from the front.
    Every URL that is waiting to be yielded is removed from the queue and
    yielded as a `(url, result)` tuple.

    :param str queue_name:  Queue name to get results from. If no such
    queue exists, nothing is yielded.
    :param bool preserve_order:  If true, stop at the first URL that has
    no finished result yet, so that results only ever come out in the
    order the URLs were queued. Finished results further back stay
    buffered in the queue, which in the worst case (a slow request at the
    front) keeps the queue 'full'. If false, unfinished URLs are skipped
    and everything that is finished is yielded.

    :return:  Generator of `(url, result)` tuples
    """
    self.manage_requests()

    if queue_name not in self.queue:
        # no queue, no results
        return

    # walk a snapshot, since finished entries are removed along the way
    for entry in list(self.queue[queue_name]):
        if entry.status != self.REQUEST_STATUS_WAITING_FOR_YIELD:
            if preserve_order:
                # cannot yield past an unfinished request
                return
            continue

        self.queue[queue_name].remove(entry)
        yield entry.url, entry.proxied.result
731
def _halt(self, queue_name="_"):
    """
    Interrupt fetching of results

    Drops every URL whose request has not been started from the targeted
    queue(s) and calls `.cancel()` on the requests that have. While this
    runs, the queue name is present in `self.halted`; it is removed again
    before returning.

    Running requests *cannot* always be cancelled via `.cancel()`,
    particularly when using `stream=True`. Prefer `halt_and_wait()`, which
    blocks until all running requests have properly terminated, over
    calling this method directly.

    :param str queue_name:  Queue name to stop fetching results for. The
    default, `"_"`, halts all queues.
    """
    self.halted.add(queue_name)
    halt_everything = queue_name == "_"

    for name in self.queue:
        if not halt_everything and name != queue_name:
            continue

        # iterate over a snapshot, entries are removed while looping
        for entry in list(self.queue[name]):
            if entry.status == self.REQUEST_STATUS_STARTED:
                entry.proxied.request.cancel()
            else:
                self.queue[name].remove(entry)

    self.halted.remove(queue_name)
760
def halt_and_wait(self, queue_name="_"):
    """
    Cancel any queued requests and wait until ongoing ones are finished

    Blocking! Calls `_halt()` and then keeps collecting (and discarding)
    results until every targeted queue is empty, after which the targeted
    queues are deleted.

    :param str queue_name:  Queue name to stop fetching results for. The
    default, `"_"`, halts all queues.
    """
    self._halt(queue_name)

    def targeted_queues():
        # "_" targets every queue, like in `_halt()`. Always take a fresh
        # snapshot so queues can be looked up safely while draining.
        return [name for name in self.queue if queue_name in ("_", name)]

    # Drain every targeted queue, not just the one literally named like
    # `queue_name`: when halting all queues, requests still running in
    # other queues would otherwise never be collected. Queue sizes are
    # checked directly so this does not depend on how queue lengths are
    # summed elsewhere.
    while any(self.queue[name] for name in targeted_queues()):
        for name in targeted_queues():
            # exhaust generator without doing something w/ results
            for _ in self.get_results(name, preserve_order=False):
                pass

    for name in targeted_queues():
        del self.queue[name]
DelegatedRequestHandler(log, config)
def __init__(self, log, config):
    """
    Set up the request session, proxy bookkeeping and proxy settings

    :param log:  Logger used for all messages of this handler
    :param config:  Configuration reader, passed on to
    `refresh_settings()`
    """
    # requests are executed in the background by a thread pool
    self.session = FuturesSession(executor=ThreadPoolExecutor())
    self.log = log

    # proxy health: proxy URL -> False once a proxy has been marked
    # unhealthy; the sets/flag make sure warnings are only logged once
    self.proxy_health = {}
    self.proxy_warnings_logged = set()
    self.all_proxies_failed_logged = False

    # proxies that are to be removed but still have active requests
    self.proxies_pending_removal = set()

    # NOTE(review): `queue`, `proxy_pool` and `halted` are not assigned
    # here, so they presumably resolve to the class-level containers -
    # confirm that sharing them between instances is intended
    self.refresh_settings(config)
queue = {}
session = None
proxy_pool = {}
proxy_settings = {}
halted = set()
log = None
index = 0
REQUEST_STATUS_QUEUED = 0
REQUEST_STATUS_STARTED = 1
REQUEST_STATUS_WAITING_FOR_YIELD = 2
PROXY_LOCALHOST = '__localhost__'
proxy_health
proxy_warnings_logged
all_proxies_failed_logged
proxies_pending_removal
def refresh_settings(self, config):
def refresh_settings(self, config):
    """
    Load proxy settings

    Settings are read on demand, so no persistent configuration reader
    is needed (which could make things complicated in thread-based
    contexts).

    An empty `proxies.urls` setting is replaced with the localhost
    pseudo-proxy, with a warning. All proxy health tracking is reset.
    The proxy pool is initialised if it is still empty; otherwise it is
    updated, but only if the list of proxy URLs differs from the
    previously loaded one.

    :param config:  Configuration reader
    """
    setting_names = (
        "proxies.urls",
        "proxies.cooloff",
        "proxies.concurrent-overall",
        "proxies.concurrent-host",
        "proxies.allow-localhost-fallback",
    )
    fresh_settings = {name: config.get(name) for name in setting_names}

    if not fresh_settings["proxies.urls"]:
        # no proxies configured means a direct connection
        self.log.warning(
            "No proxies configured (proxies.urls is empty or None). "
            "Using localhost (direct connection) for all requests. "
            "Set Proxy URLs in setting to [\"__localhost__\"] to disable this warning."
        )
        fresh_settings["proxies.urls"] = [self.PROXY_LOCALHOST]

    # compare against the old settings *before* overwriting them
    urls_changed = (
        not hasattr(self, "proxy_settings")
        or self.proxy_settings.get("proxies.urls") != fresh_settings["proxies.urls"]
    )
    self.proxy_settings = fresh_settings

    # every proxy gets a clean bill of health on refresh
    unhealthy_count = sum(
        1 for is_healthy in self.proxy_health.values() if not is_healthy
    )
    self.proxy_health = {}
    self.proxy_warnings_logged = set()
    self.all_proxies_failed_logged = False

    if unhealthy_count > 0:
        self.log.debug(f"Proxy health reset: {unhealthy_count} proxies restored")

    if not self.proxy_pool:
        # first initialisation
        self._initialize_proxy_pool()
    elif urls_changed:
        self._update_proxy_pool()

Load proxy settings

This is done on demand, so that we do not need a persistent configuration reader, which could make things complicated in thread-based contexts.

Initializes proxy pool or updates if URLs have changed, preserving proxies with active requests.

Parameters
  • config: Configuration reader
def add_urls(self, urls, queue_name='_', position=-1, **kwargs):
def add_urls(self, urls, queue_name="_", position=-1, **kwargs):
    """
    Add URLs to the request queue

    Each URL gets its own copy of the keyword arguments and a unique,
    incrementing index. If a `response` hook is passed via `hooks`, it is
    wrapped so that it receives the queued URL as the
    `fourcat_original_url` keyword argument. Afterwards
    `manage_requests()` is called. Nothing is added while the queue (or
    all queues) is halted.

    :param urls:  An iterable of URLs.
    :param queue_name:  Queue name to add to.
    :param position:  Where in queue to insert; -1 adds to end of queue
    :param kwargs: Other keyword arguments will be passed on to
    `requests.get()`
    """
    if queue_name in self.halted or "_" in self.halted:
        # do not accept new URLs while the delegator is shutting down
        return

    self.queue.setdefault(queue_name, [])

    def with_original_url(hook, original_url):
        # wrap a response hook so it is told which URL was queued
        def _wrapped(resp, *a, **k):
            if "fourcat_original_url" not in k:
                k["fourcat_original_url"] = original_url
            return hook(resp, *a, **k)

        return _wrapped

    for offset, url in enumerate(urls):
        # per-URL copy, so entries never share a mutable kwargs dict
        request_kwargs = dict(kwargs)

        entry = namedtuple(
            "UrlForDelegatedRequest", ("url", "args", "status", "proxied")
        )
        entry.url = url
        entry.index = self.index
        entry.proxied = None
        entry.status = self.REQUEST_STATUS_QUEUED
        self.index += 1

        try:
            hooks = request_kwargs.get("hooks")
            if hooks and "response" in hooks:
                # copy, so the caller's hooks dict is left untouched
                hooks = {**hooks}
                response_hook = hooks["response"]

                # the hook may be a single callable or a list of them
                if isinstance(response_hook, list):
                    hooks["response"] = [
                        with_original_url(h, url) for h in response_hook
                    ]
                else:
                    hooks["response"] = with_original_url(response_hook, url)

                request_kwargs["hooks"] = hooks
        except Exception:
            # best effort: if wrapping fails, use the hooks as given
            pass

        entry.kwargs = request_kwargs

        if position == -1:
            self.queue[queue_name].append(entry)
        else:
            self.queue[queue_name].insert(position + offset, entry)

    self.manage_requests()

Add URLs to the request queue

Parameters
  • urls: An iterable of URLs.
  • queue_name: Queue name to add to.
  • position: Where in queue to insert; -1 adds to end of queue
  • kwargs: Other keyword arguments will be passed on to requests.get()
def get_queue_length(self, queue_name='_'):
def get_queue_length(self, queue_name="_"):
    """
    Get the length of the queue

    :param str queue_name:  Queue name. `"_"` (the default) counts the
    URLs in all queues together.
    :return int: Amount of URLs in the queue (regardless of status); 0 if
    the queue does not exist
    """
    if queue_name == "_":
        # sum each queue's own length; indexing with `queue_name` here
        # would count the queue literally named "_" over and over (or
        # fail if there is no such queue)
        return sum(len(entries) for entries in self.queue.values())

    return len(self.queue.get(queue_name, ()))

Get the length of the queue

Parameters
  • str queue_name: Queue name
Returns

Amount of URLs in the queue (regardless of status)

def claim_proxy(self, url):
def claim_proxy(self, url):
    """
    Find a proxy to do the request with

    Only proxies that are not marked unhealthy and not pending removal
    are considered. They are tried from least to most recently used, and
    the first one that accepts the URL (`claim_for()`) is returned.

    If no proxy qualifies and localhost fallback is allowed, the localhost
    pseudo-proxy is added to the pool (once) and used instead.

    :param str url:  URL to proxy a request for
    :return SophisticatedFuturesProxy or None:
        - SophisticatedFuturesProxy if a proxy is available
        - None if all proxies are busy (retry later)
    :raises NoProxiesAvailableError: If no proxy qualifies and localhost
        fallback is disabled, or localhost is already in the pool
    """
    candidates = [
        proxy_id
        for proxy_id in self.proxy_pool
        if self.proxy_health.get(proxy_id, True)
        and proxy_id not in self.proxies_pending_removal
    ]

    if not candidates:
        fallback_allowed = self.proxy_settings.get(
            "proxies.allow-localhost-fallback", True
        )

        if not fallback_allowed:
            # log this only once, but fail every time
            if not self.all_proxies_failed_logged:
                self.all_proxies_failed_logged = True
                self.log.error("All proxies are unhealthy and localhost fallback is disabled.")
            raise NoProxiesAvailableError("All proxies unhealthy and localhost fallback disabled")

        if self.PROXY_LOCALHOST in self.proxy_pool:
            # localhost was already in the pool, yet did not qualify
            raise NoProxiesAvailableError("All proxies including localhost are unhealthy")

        # fall back to a direct connection
        if not self.all_proxies_failed_logged:
            self.all_proxies_failed_logged = True
            self.log.error("All configured proxies are unhealthy. Falling back to localhost (direct connection).")
        self._add_proxy_to_pool(self.PROXY_LOCALHOST)
        self.log.info("Created localhost proxy for direct connections")
        candidates = [self.PROXY_LOCALHOST]

    # least recently used first
    for proxy_id in sorted(candidates, key=lambda p: self.proxy_pool[p].last_used):
        pool_entry = self.proxy_pool[proxy_id]
        claimed = pool_entry.proxy.claim_for(url)
        if claimed:
            pool_entry.last_used = time.time()
            return claimed

    # every candidate is busy
    return None

Find a proxy to do the request with

Finds a SophisticatedFuturesProxy that has an open slot for this URL.

Parameters
  • str url: URL to proxy a request for
Returns
- SophisticatedFuturesProxy if a proxy is available
- None if all proxies are busy (retry later)
Raises
  • NoProxiesAvailableError: If all proxies are unhealthy and either localhost fallback is disabled or localhost itself is unhealthy
def manage_requests(self):
def manage_requests(self):
    """
    Advance every queued URL by one step

    For each URL in each queue, in queue order:

    - URLs already waiting to be yielded are left alone.
    - If the URL's request has returned, the proxy is told the request has
      finished, a proxy pending removal is dropped from the pool once it
      has no active requests left, and the outcome is stored: the response
      (annotated with `_4cat_proxy`) on success, a `FailedProxiedRequest`
      on failure. A `ProxyError` instead marks the proxy unhealthy and
      puts the URL back in the queue so it is retried.
    - If the URL has no request yet and its queue is not halted, a proxy
      is claimed and the request is started. If claiming raises
      `NoProxiesAvailableError` the URL is failed right away; if no proxy
      has a free slot the URL is simply tried again on the next call.

    This method does *not* return any data; `get_results()` calls it and
    then hands out whatever has finished.
    """
    request_fields = ("request", "created", "result", "proxy", "url", "index")

    for queue_name in self.queue:
        for position, entry in enumerate(self.queue[queue_name]):
            url = entry.url

            if entry.status == self.REQUEST_STATUS_WAITING_FOR_YIELD:
                # result is buffered until `get_results()` picks it up
                continue

            if entry.proxied and entry.proxied.request.done():
                # done() only means the request has returned - it may just
                # as well have timed out or been cancelled
                delegated = entry.proxied
                used_proxy = delegated.proxy
                self.log.debug(f"Request for {url} finished, collecting result")
                used_proxy.mark_request_finished(url)

                # a proxy that was scheduled for removal can go as soon as
                # it has no active requests left (cooling off does not count)
                proxy_url = used_proxy.proxy_url
                if (
                    proxy_url in self.proxies_pending_removal
                    and not self.proxy_pool[proxy_url].proxy.has_active_requests()
                ):
                    del self.proxy_pool[proxy_url]
                    if proxy_url in self.proxy_health:
                        del self.proxy_health[proxy_url]
                    self.proxies_pending_removal.discard(proxy_url)
                    self.log.info(f"Removed proxy {proxy_url} (completed all active requests)")

                try:
                    response = delegated.request.result()
                    # let processors see which proxy (if any) did the work
                    response._4cat_proxy = proxy_url
                    delegated.result = response

                except requests.exceptions.ProxyError as e:
                    # the proxy itself is broken: flag it and retry the URL
                    # with another one
                    self.proxy_health[proxy_url] = False

                    # warn only once per proxy
                    if proxy_url not in self.proxy_warnings_logged:
                        self.proxy_warnings_logged.add(proxy_url)
                        self.log.warning(
                            f"Proxy {proxy_url} marked as unhealthy due to connection failure: {str(e)}"
                        )

                    entry.status = self.REQUEST_STATUS_QUEUED
                    entry.proxied = None

                except (
                    ConnectionError,
                    asyncioCancelledError,
                    futureCancelledError,
                    requests.exceptions.RequestException,
                    urllib3.exceptions.HTTPError,
                ) as e:
                    # timeouts, cancellations, etc
                    delegated.result = FailedProxiedRequest(e, proxy_url)

                finally:
                    # anything that was not requeued above is ready to be
                    # passed on, whether it succeeded or not
                    if entry.status != self.REQUEST_STATUS_QUEUED:
                        entry.status = self.REQUEST_STATUS_WAITING_FOR_YIELD

            if (
                not entry.proxied
                and queue_name not in self.halted
                and "_" not in self.halted
            ):
                # nothing running for this URL yet, try to start a request
                try:
                    proxy = self.claim_proxy(url)
                except NoProxiesAvailableError as e:
                    # no usable proxy at all - fail this URL right away
                    failed = namedtuple("DelegatedRequest", request_fields)
                    failed.request = None
                    failed.created = time.time()
                    failed.result = FailedProxiedRequest(e, None)
                    failed.proxy = None
                    failed.url = url
                    failed.index = entry.index
                    entry.proxied = failed
                    entry.status = self.REQUEST_STATUS_WAITING_FOR_YIELD
                    self.queue[queue_name][position] = entry
                    continue

                if proxy is None:
                    # every proxy is busy; try again on the next call
                    continue

                proxy_url = proxy.proxy_url
                if proxy_url == self.PROXY_LOCALHOST:
                    # direct connection, no proxy definition needed
                    proxy_definition = None
                else:
                    proxy_definition = {"http": proxy_url, "https": proxy_url}

                self.log.debug(f"Request for {url} started")
                delegated = namedtuple("DelegatedRequest", request_fields)
                delegated.created = time.time()

                # per-URL kwargs go last so they can override the defaults
                request_args = {"url": url, "timeout": 30, "proxies": proxy_definition}
                request_args.update(entry.kwargs)
                delegated.request = self.session.get(**request_args)

                delegated.proxy = proxy
                delegated.url = url
                # the index tells apart multiple requests for the same URL
                delegated.index = entry.index

                entry.status = self.REQUEST_STATUS_STARTED
                entry.proxied = delegated

                proxy.mark_request_started(url)

            self.queue[queue_name][position] = entry

Manage requests asynchronously

First, make sure proxy status is up to date; then go through the list of queued URLs and see if they have been requested, and release the proxy accordingly. If the URL is not being requested, and a proxy is available, start the request.

Note that this method does not return any requested data. This is done in a separate function, which calls this one before returning any finished requests in the original queue order (get_results()).

def get_results(self, queue_name='_', preserve_order=True):
690    def get_results(self, queue_name="_", preserve_order=True):
691        """
692        Return available results, without skipping
693
694        Loops through the queue, returning values (and updating the queue) for
695        requests that have been finished. If a request is not finished yet,
696        stop returning. This ensures that in the end, values are only ever
697        returned in the original queue order, at the cost of potential
698        buffering.
699
700        :param str queue_name:  Queue name to get results from
701        :param bool preserve_order:  Return results in the order they were
702        added to the queue. This means that other results are buffered and
703        potentially remain in the queue, which may in the worst case
704        significantly slow down data collection. For example, if the first
705        request in the queue takes a really long time while all other
706        requests are already finished, the queue will nevertheless remain
707        'full'.
708
709        :return:
710        """
711        self.manage_requests()
712
713        # no results, no return
714        if queue_name not in self.queue:
715            return
716
717        # use list comprehensions here to avoid having to modify the
718        # lists while iterating through them
719        for url_metadata in [u for u in self.queue[queue_name]]:
720            # for each URL in the queue...
721            if url_metadata.status == self.REQUEST_STATUS_WAITING_FOR_YIELD:
722                # see if a finished request is available...
723                self.queue[queue_name].remove(url_metadata)
724                yield url_metadata.url, url_metadata.proxied.result
725
726            elif preserve_order:
727                # ...but as soon as a URL has no finished result, return
728                # unless we don't care about the order, then continue and yield
729                # as much as possible
730                return

Return available results, without skipping

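Loops through the queue, returning values (and updating the queue) for requests that have been finished. If preserve_order is enabled and a request is not finished yet, stop returning. This ensures that in the end, values are only ever returned in the original queue order, at the cost of potential buffering. If preserve_order is disabled, unfinished requests are skipped and any later finished results are still returned.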

Parameters
  • str queue_name: Queue name to get results from
  • bool preserve_order: Return results in the order they were added to the queue. This means that other results are buffered and potentially remain in the queue, which may in the worst case significantly slow down data collection. For example, if the first request in the queue takes a really long time while all other requests are already finished, the queue will nevertheless remain 'full'.
Returns
  • A generator yielding a (url, result) tuple for each finished request
def halt_and_wait(self, queue_name='_'):
761    def halt_and_wait(self, queue_name="_"):
762        """
763        Cancel any queued requests and wait until ongoing ones are finished
764
765        Blocking!
766
767        :param str queue_name:  Queue name to stop fetching results for. By
768        default, halt all queues.
769        """
770        self._halt(queue_name)
771        while self.get_queue_length(queue_name) > 0:
772            # exhaust generator without doing something w/ results
773            all(self.get_results(queue_name, preserve_order=False))
774
775        if queue_name in self.queue:
776            del self.queue[queue_name]

Cancel any queued requests and wait until ongoing ones are finished

Blocking!

Parameters
  • str queue_name: Queue name to stop fetching results for. By default, halt all queues.