
backend.lib.proxied_requests

  1from requests_futures.sessions import FuturesSession
  2from concurrent.futures import ThreadPoolExecutor
  3
  4import time
  5import urllib3
  6import ural
  7import requests
  8from collections import namedtuple
  9
 10
 11class FailedProxiedRequest:
 12    """
 13    A delegated request that has failed for whatever reason
 14
 15    The failure context (usually the exception) is stored in the `context`
 16    property.
 17    """
 18
 19    context = None
 20
 21    def __init__(self, context=None):
 22        self.context = context
 23
 24
 25class ProxyStatus:
 26    """
 27    An enum of possible statuses of a SophisticatedFuturesProxy
 28    """
 29
 30    AVAILABLE = 3
 31    CLAIMED = 4
 32    RUNNING = 5
 33    COOLING_OFF = 6
 34
 35
 36class SophisticatedFuturesProxy:
 37    """
 38    A proxy that can be used in combination with the DelegatedRequestHandler
 39
 40    This keeps track of cooloffs, etc, to ensure that any individual proxy does
 41    not request more often than it should. This is a separate class because of
 42    an additional piece of logic that allows this cooloff to be kept track of
 43    on a per-hostname basis. This is useful because rate limits are typically
 44    enforced per site, so we can have (figuratively) unlimited concurrent
 45    requests as long as each is on a separate hostname, but need to be more
 46    careful when requesting from a single host.
 47    """
 48
 49    log = None
 50    looping = True
 51
 52    COOLOFF = 0
 53    MAX_CONCURRENT_OVERALL = 0
 54    MAX_CONCURRENT_PER_HOST = 0
 55
 56    def __init__(
 57        self, url, log=None, cooloff=3, concurrent_overall=5, concurrent_host=2
 58    ):
 59        self.proxy_url = url
 60        self.hostnames = {}
 61        self.log = log
 62
 63        self.COOLOFF = cooloff
 64        self.MAX_CONCURRENT_OVERALL = concurrent_overall
 65        self.MAX_CONCURRENT_PER_HOST = concurrent_host
 66
 67    def know_hostname(self, url):
 68        """
 69        Make sure the hostname is known to this proxy
 70
 71        This means that we can now keep track of some per-hostname statistics
 72        for this hostname. If the hostname is not known yet, the statistics are
 73        re-initialised.
 74
 75        :param str url:  URL with host name to keep stats for. Case-insensitive.
 76        :return str:  The host name, as parsed for internal use
 77        """
 78        hostname = ural.get_hostname(url).lower()
 79        if hostname not in self.hostnames:
 80            self.hostnames[hostname] = namedtuple(
 81                "HostnameForProxiedRequests", ("running",)
 82            )
 83            self.hostnames[hostname].running = []
 84
 85        return hostname
 86
 87    def release_cooled_off(self):
 88        """
 89        Release proxies that have finished cooling off.
 90
 91        Proxies cool off for a certain amount of time after finishing a request.
 92        This method removes cooled off requests, so that new ones may fill
 93        their slot.
 94        """
 95        for hostname, metadata in self.hostnames.copy().items():
 96            for request in metadata.running:
 97                if (
 98                    request.status == ProxyStatus.COOLING_OFF
 99                    and request.timestamp_finished < time.time() - self.COOLOFF
100                ):
101                    self.log.debug(
102                        f"Releasing proxy {self.proxy_url} for host name {hostname}"
103                    )
104                    self.hostnames[hostname].running.remove(request)
105
106                    # get rid of hostnames with no running or cooling off
107                    # requests, else this might grow indefinitely
108                    if len(self.hostnames[hostname].running) == 0:
109                        del self.hostnames[hostname]
110
111    def claim_for(self, url):
112        """
113        Try claiming a slot in this proxy for the given URL
114
115        Whether a slot is available depends both on the overall concurrency
116        limit, and the per-hostname limit. If both are not maxed out, fill
117        the slot and return the proxy object.
118
119        :param str url:  URL to proxy a request for.
120        :return: `False` if no proxy is available, or the
121        `SophisticatedFuturesProxy` object if one is.
122        """
123        self.release_cooled_off()
124        hostname = self.know_hostname(url)
125
126        total_running = sum([len(m.running) for h, m in self.hostnames.items()])
127        if total_running >= self.MAX_CONCURRENT_OVERALL:
128            return False
129
130        if len(self.hostnames[hostname].running) < self.MAX_CONCURRENT_PER_HOST:
131            request = namedtuple(
132                "ProxiedRequest",
133                ("url", "status", "timestamp_started", "timestamp_finished"),
134            )
135            request.url = url
136            request.status = ProxyStatus.CLAIMED
137            request.timestamp_started = 0
138            request.timestamp_finished = 0
139            self.hostnames[hostname].running.append(request)
140            self.log.debug(
141                f"Claiming proxy {self.proxy_url} for host name {hostname} ({len(self.hostnames[hostname].running)} of {self.MAX_CONCURRENT_PER_HOST} for host)"
142            )
143            return self
144        else:
145            return False
146
147    def mark_request_started(self, url):
148        """
149        Mark a request for a URL as started
150
151        This updates the status for the related slot. If no matching slot
152        exists that is waiting for a request to start running, a `ValueError`
153        is raised.
154
155        :param str url:  URL of the proxied request.
156        """
157        hostname = self.know_hostname(url)
158
159        for i, metadata in enumerate(self.hostnames[hostname].running):
160            if metadata.status == ProxyStatus.CLAIMED and metadata.url == url:
161                self.hostnames[hostname].running[i].status = ProxyStatus.RUNNING
162                self.hostnames[hostname].running[i].timestamp_started = time.time()
163                return
164
165        raise ValueError(f"No proxy is waiting for a request with URL {url} to start!")
166
167    def mark_request_finished(self, url):
168        """
169        Mark a request for a URL as finished
170
171        This updates the status for the related slot. If no matching slot
172        exists that is waiting for a request to finish, a `ValueError` is
173        raised. After this, the proxy will be marked as cooling off, and is
174        released after cooling off is completed.
175
176        :param str url:  URL of the proxied request.
177        """
178        hostname = self.know_hostname(url)
179
180        for i, metadata in enumerate(self.hostnames[hostname].running):
181            if metadata.status == ProxyStatus.RUNNING and metadata.url == url:
182                self.hostnames[hostname].running[i].timestamp_finished = time.time()
183                self.hostnames[hostname].running[i].status = ProxyStatus.COOLING_OFF
184                return
185
186        raise ValueError(f"No proxy is currently running a request for URL {url}!")
187
188
189class DelegatedRequestHandler:
190    queue = {}
191    session = None
192    proxy_pool = {}
193    proxy_settings = {}
194    halted = set()
195    log = None
196    index = 0
197
198    # some magic values
199    REQUEST_STATUS_QUEUED = 0
200    REQUEST_STATUS_STARTED = 1
201    REQUEST_STATUS_WAITING_FOR_YIELD = 2
202    PROXY_LOCALHOST = "__localhost__"
203
204    def __init__(self, log, config):
205        pool = ThreadPoolExecutor()
206        self.session = FuturesSession(executor=pool)
207        self.log = log
208
209        self.refresh_settings(config)
210
211    def refresh_settings(self, config):
212        """
213        Load proxy settings
214
215        This is done on demand, so that we do not need a persistent
216        configuration reader, which could make things complicated in
217        thread-based contexts.
218
219        :param config:  Configuration reader
220        """
221
222        self.proxy_settings = {
223            k: config.get(k) for k in ("proxies.urls", "proxies.cooloff", "proxies.concurrent-overall",
224                                        "proxies.concurrent-host")
225        }
226
227    def add_urls(self, urls, queue_name="_", position=-1, **kwargs):
228        """
229        Add URLs to the request queue
230
231        :param urls:  An iterable of URLs.
232        :param queue_name:  Queue name to add to.
233        :param position:  Where in queue to insert; -1 adds to end of queue
234        :param kwargs: Other keyword arguments will be passed on to
235        `requests.get()`
236        """
237        if queue_name in self.halted or "_" in self.halted:
238            # do not add URLs while delegator is shutting down
239            return
240
241        if queue_name not in self.queue:
242            self.queue[queue_name] = []
243
244        for i, url in enumerate(urls):
245            url_metadata = namedtuple(
246                "UrlForDelegatedRequest", ("url", "args", "status", "proxied")
247            )
248            url_metadata.url = url
249            url_metadata.kwargs = kwargs
250            url_metadata.index = self.index
251            url_metadata.proxied = None
252            url_metadata.status = self.REQUEST_STATUS_QUEUED
253            self.index += 1
254
255            if position == -1:
256                self.queue[queue_name].append(url_metadata)
257            else:
258                self.queue[queue_name].insert(position + i, url_metadata)
259
260        self.manage_requests()
261
262    def get_queue_length(self, queue_name="_"):
263        """
264        Get the length of the queue
265
266        :param str queue_name:  Queue name
267        :return int: Number of URLs in the queue (regardless of status)
268        """
269        queue_length = 0
270        for queue in self.queue:
271            if queue == queue_name or queue_name == "_":
272                queue_length += len(self.queue[queue])
273
274        return queue_length
275
276    def claim_proxy(self, url):
277        """
278        Find a proxy to do the request with
279
280        Finds a `SophisticatedFuturesProxy` that has an open slot for this URL.
281
282        :param str url:  URL to proxy a request for
283        :return SophisticatedFuturesProxy or False:
284        """
285        if not self.proxy_pool:
286            # this will trigger the first time this method is called
287            # build a proxy pool with some information per available proxy
288            proxies = self.proxy_settings["proxies.urls"]
289            for proxy_url in proxies:
290                self.proxy_pool[proxy_url] = namedtuple(
291                    "ProxyEntry", ("proxy", "last_used")
292                )
293                self.proxy_pool[proxy_url].proxy = SophisticatedFuturesProxy(
294                    proxy_url,
295                    self.log,
296                    self.proxy_settings["proxies.cooloff"],
297                    self.proxy_settings["proxies.concurrent-overall"],
298                    self.proxy_settings["proxies.concurrent-host"],
299                )
300                self.proxy_pool[proxy_url].last_used = 0
301
302            self.log.debug(f"Proxy pool has {len(self.proxy_pool)} available proxies.")
303
304        # within the pool, find the least recently used proxy that is available
305        sorted_by_cooloff = sorted(
306            self.proxy_pool, key=lambda p: self.proxy_pool[p].last_used
307        )
308        for proxy_id in sorted_by_cooloff:
309            claimed_proxy = self.proxy_pool[proxy_id].proxy.claim_for(url)
310            if claimed_proxy:
311                self.proxy_pool[proxy_id].last_used = time.time()
312                return claimed_proxy
313
314        return False
315
316    def manage_requests(self):
317        """
318        Manage requests asynchronously
319
320        First, make sure proxy status is up to date; then go through the list
321        of queued URLs and see if they have been requested, and release the
322        proxy accordingly. If the URL is not being requested, and a proxy is
323        available, start the request.
324
325        Note that this method does *not* return any requested data. This is
326        done in a separate function, which calls this one before returning any
327        finished requests in the original queue order (`get_results()`).
328        """
329        # go through queue and look at the status of each URL
330        for queue_name in self.queue:
331            for i, url_metadata in enumerate(self.queue[queue_name]):
332                url = url_metadata.url
333
334                if url_metadata.status == self.REQUEST_STATUS_WAITING_FOR_YIELD:
335                    # waiting to be flushed or passed by `get_result()`
336                    continue
337
338                if url_metadata.proxied and url_metadata.proxied.request.done():
339                    # collect result and buffer it for yielding
340                    # done() here doesn't necessarily mean the request finished
341                    # successfully, just that it has returned - a timed out
342                    # request will also be done()!
343                    self.log.debug(f"Request for {url} finished, collecting result")
344                    url_metadata.proxied.proxy.mark_request_finished(url)
345                    try:
346                        response = url_metadata.proxied.request.result()
347                        url_metadata.proxied.result = response
348
349                    except (
350                        ConnectionError,
351                        requests.exceptions.RequestException,
352                        urllib3.exceptions.HTTPError,
353                    ) as e:
354                        # this is where timeouts, etc, go
355                        url_metadata.proxied.result = FailedProxiedRequest(e)
356
357                    finally:
358                        # success or fail, we can pass it on
359                        url_metadata.status = self.REQUEST_STATUS_WAITING_FOR_YIELD
360
361                else:
362                    # running - ignore for now
363                    # could do some health checks here...
364                    # logging.debug(f"Request for {url} running...")
365                    pass
366
367                if not url_metadata.proxied and not (
368                    queue_name in self.halted or "_" in self.halted
369                ):
370                    # no request running for this URL yet, try to start one
371                    proxy = self.claim_proxy(url)
372                    if not proxy:
373                        # no available proxies, try again next loop
374                        continue
375
376                    proxy_url = proxy.proxy_url
377                    proxy_definition = (
378                        {"http": proxy_url, "https": proxy_url}
379                        if proxy_url != self.PROXY_LOCALHOST
380                        else None
381                    )
382
383                    # start request for URL
384                    self.log.debug(f"Request for {url} started")
385                    request = namedtuple(
386                        "DelegatedRequest",
387                        (
388                            "request",
389                            "created",
390                            "result",
391                            "proxy",
392                            "url",
393                            "index",
394                        ),
395                    )
396                    request.created = time.time()
397                    request.request = self.session.get(
398                        **{
399                            "url": url,
400                            "timeout": 30,
401                            "proxies": proxy_definition,
402                            **url_metadata.kwargs
403                        }
404                    )
405
406                    request.proxy = proxy
407                    request.url = url
408                    request.index = (
409                        url_metadata.index
410                    )  # this is to allow for multiple requests for the same URL
411
412                    url_metadata.status = self.REQUEST_STATUS_STARTED
413                    url_metadata.proxied = request
414
415                    proxy.mark_request_started(url)
416
417                self.queue[queue_name][i] = url_metadata
418
419    def get_results(self, queue_name="_", preserve_order=True):
420        """
421        Return available results, without skipping
422
423        Loops through the queue, returning values (and updating the queue) for
424        requests that have been finished. If a request is not finished yet,
425        stop returning. This ensures that in the end, values are only ever
426        returned in the original queue order, at the cost of potential
427        buffering.
428
429        :param str queue_name:  Queue name to get results from
430        :param bool preserve_order:  Return results in the order they were
431        added to the queue. This means that other results are buffered and
432        potentially remain in the queue, which may in the worst case
433        significantly slow down data collection. For example, if the first
434        request in the queue takes a really long time while all other
435        requests are already finished, the queue will nevertheless remain
436        'full'.
437
438        :return:  Yields `(url, result)` tuples; `result` is a `requests.Response` or a `FailedProxiedRequest`.
439        """
440        self.manage_requests()
441
442        # no results, no return
443        if queue_name not in self.queue:
444            return
445
446        # use list comprehensions here to avoid having to modify the
447        # lists while iterating through them
448        for url_metadata in [u for u in self.queue[queue_name]]:
449            # for each URL in the queue...
450            if url_metadata.status == self.REQUEST_STATUS_WAITING_FOR_YIELD:
451                # see if a finished request is available...
452                self.queue[queue_name].remove(url_metadata)
453                yield url_metadata.url, url_metadata.proxied.result
454
455            elif preserve_order:
456                # ...but as soon as a URL has no finished result, return
457                # unless we don't care about the order, then continue and yield
458                # as much as possible
459                return
460
461    def _halt(self, queue_name="_"):
462        """
463        Interrupt fetching of results
464
465        Can be used when 4CAT is interrupted. Clears queue and tries to cancel
466        running requests.
467
468        Note that running requests *cannot* always be cancelled via `.cancel()`,
469        particularly when using `stream=True`. It is therefore recommended to
470        use `halt_and_wait()`, which blocks until all running requests have
471        properly terminated, instead of calling this method directly.
472
473        :param str queue_name:  Queue name to stop fetching results for. By
474        default, halt all queues.
475        """
476        self.halted.add(queue_name)
477
478        for queue in self.queue:
479            if queue_name == "_" or queue_name == queue:
480                # use a list comprehension here to avoid having to modify the
481                # list while iterating through it
482                for url_metadata in [u for u in self.queue[queue]]:
483                    if url_metadata.status != self.REQUEST_STATUS_STARTED:
484                        self.queue[queue].remove(url_metadata)
485                    else:
486                        url_metadata.proxied.request.cancel()
487
488        self.halted.remove(queue_name)
489
490    def halt_and_wait(self, queue_name="_"):
491        """
492        Cancel any queued requests and wait until ongoing ones are finished
493
494        Blocking!
495
496        :param str queue_name:  Queue name to stop fetching results for. By
497        default, halt all queues.
498        """
499        self._halt(queue_name)
500        while self.get_queue_length(queue_name) > 0:
501            # exhaust generator without doing something w/ results
502            all(self.get_results(queue_name, preserve_order=False))
503
504        if queue_name in self.queue:
505            del self.queue[queue_name]
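
To make the per-hostname slot bookkeeping of `SophisticatedFuturesProxy` concrete, here is a small sketch that drives a proxy by hand, outside of the `DelegatedRequestHandler`. It is not part of 4CAT itself: the proxy URL and host names are illustrative, and a plain `logging.Logger` stands in for the `log` argument, which only needs a `debug()` method.

import logging
import time

from backend.lib.proxied_requests import SophisticatedFuturesProxy

logging.basicConfig(level=logging.DEBUG)
proxy = SophisticatedFuturesProxy(
    "http://proxy.example:8080",  # illustrative proxy URL
    log=logging.getLogger(__name__),
    cooloff=1,
    concurrent_overall=5,
    concurrent_host=2,
)

# Up to MAX_CONCURRENT_PER_HOST slots can be claimed for one host name...
assert proxy.claim_for("https://example.com/page-1") is proxy
assert proxy.claim_for("https://example.com/page-2") is proxy
# ...after which further claims for that host are refused...
assert proxy.claim_for("https://example.com/page-3") is False
# ...while a different host name can still claim a slot.
assert proxy.claim_for("https://example.org/") is proxy

# A claimed slot is marked as running and then as finished (cooling off)
proxy.mark_request_started("https://example.com/page-1")
proxy.mark_request_finished("https://example.com/page-1")

# Once the cooloff has elapsed, the next claim_for() call (which runs
# release_cooled_off()) frees the slot, so the host can be claimed again
time.sleep(1.5)
assert proxy.claim_for("https://example.com/page-3") is proxy

In 4CAT itself these calls are made by `DelegatedRequestHandler`, which is sketched next.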
class FailedProxiedRequest:
12class FailedProxiedRequest:
13    """
14    A delegated request that has failed for whatever reason
15
16    The failure context (usually the exception) is stored in the `context`
17    property.
18    """
19
20    context = None
21
22    def __init__(self, context=None):
23        self.context = context

A delegated request that has failed for whatever reason

The failure context (usually the exception) is stored in the context property.

FailedProxiedRequest(context=None)
22    def __init__(self, context=None):
23        self.context = context
context = None
class ProxyStatus:
26class ProxyStatus:
27    """
28    An enum of possible statuses of a SophisticatedFuturesProxy
29    """
30
31    AVAILABLE = 3
32    CLAIMED = 4
33    RUNNING = 5
34    COOLING_OFF = 6

An enum of possible statuses of a SophisticatedFuturesProxy

AVAILABLE = 3
CLAIMED = 4
RUNNING = 5
COOLING_OFF = 6
class SophisticatedFuturesProxy:
 37class SophisticatedFuturesProxy:
 38    """
 39    A proxy that can be used in combination with the DelegatedRequestHandler
 40
 41    This keeps track of cooloffs, etc, to ensure that any individual proxy does
 42    not request more often than it should. This is a separate class because of
 43    an additional piece of logic that allows this cooloff to be kept track of
 44    on a per-hostname basis. This is useful because rate limits are typically
 45    enforced per site, so we can have (figuratively) unlimited concurrent
 46    request as long as each is on a separate hostname, but need to be more
 47    careful when requesting from a single host.
 48    """
 49
 50    log = None
 51    looping = True
 52
 53    COOLOFF = 0
 54    MAX_CONCURRENT_OVERALL = 0
 55    MAX_CONCURRENT_PER_HOST = 0
 56
 57    def __init__(
 58        self, url, log=None, cooloff=3, concurrent_overall=5, concurrent_host=2
 59    ):
 60        self.proxy_url = url
 61        self.hostnames = {}
 62        self.log = log
 63
 64        self.COOLOFF = cooloff
 65        self.MAX_CONCURRENT_OVERALL = concurrent_overall
 66        self.MAX_CONCURRENT_PER_HOST = concurrent_host
 67
 68    def know_hostname(self, url):
 69        """
 70        Make sure the hostname is known to this proxy
 71
 72        This means that we can now keep track of some per-hostname statistics
 73        for this hostname. If the hostname is not known yet, the statistics are
 74        re-initialised.
 75
 76        :param str url:  URL with host name to keep stats for. Case-insensitive.
 77        :param str:  The host name, as parsed for internal use
 78        """
 79        hostname = ural.get_hostname(url).lower()
 80        if hostname not in self.hostnames:
 81            self.hostnames[hostname] = namedtuple(
 82                "HostnameForProxiedRequests", ("running",)
 83            )
 84            self.hostnames[hostname].running = []
 85
 86        return hostname
 87
 88    def release_cooled_off(self):
 89        """
 90        Release proxies that have finished cooling off.
 91
 92        Proxies cool off for a certain amount of time after starting a request.
 93        This method removes cooled off requests, so that new ones may fill
 94        their slot.
 95        """
 96        for hostname, metadata in self.hostnames.copy().items():
 97            for request in metadata.running:
 98                if (
 99                    request.status == ProxyStatus.COOLING_OFF
100                    and request.timestamp_finished < time.time() - self.COOLOFF
101                ):
102                    self.log.debug(
103                        f"Releasing proxy {self.proxy_url} for host name {hostname}"
104                    )
105                    self.hostnames[hostname].running.remove(request)
106
107                    # get rid of hostnames with no running or cooling off
108                    # requests, else this might grow indefinitely
109                    if len(self.hostnames[hostname].running) == 0:
110                        del self.hostnames[hostname]
111
112    def claim_for(self, url):
113        """
114        Try claiming a slot in this proxy for the given URL
115
116        Whether a slot is available depends both on the overall concurrency
117        limit, and the per-hostname limit. If both are not maxed out, fill
118        the slot and return the proxy object.
119
120        :param str url:  URL to proxy a request for.
121        :return: `False` if no proxy is available, or the
122        `SophisticatedFuturesProxy` object if one is.
123        """
124        self.release_cooled_off()
125        hostname = self.know_hostname(url)
126
127        total_running = sum([len(m.running) for h, m in self.hostnames.items()])
128        if total_running >= self.MAX_CONCURRENT_OVERALL:
129            return False
130
131        if len(self.hostnames[hostname].running) < self.MAX_CONCURRENT_PER_HOST:
132            request = namedtuple(
133                "ProxiedRequest",
134                ("url", "status", "timestamp_started", "timestamp_finished"),
135            )
136            request.url = url
137            request.status = ProxyStatus.CLAIMED
138            request.timestamp_started = 0
139            request.timestamp_finished = 0
140            self.hostnames[hostname].running.append(request)
141            self.log.debug(
142                f"Claiming proxy {self.proxy_url} for host name {hostname} ({len(self.hostnames[hostname].running)} of {self.MAX_CONCURRENT_PER_HOST} for host)"
143            )
144            return self
145        else:
146            return False
147
148    def mark_request_started(self, url):
149        """
150        Mark a request for a URL as started
151
152        This updates the status for the related slot. If no matching slot
153        exists that is waiting for a request to start running, a `ValueError`
154        is raised.
155
156        :param str url:  URL of the proxied request.
157        """
158        hostname = self.know_hostname(url)
159
160        for i, metadata in enumerate(self.hostnames[hostname].running):
161            if metadata.status == ProxyStatus.CLAIMED and metadata.url == url:
162                self.hostnames[hostname].running[i].status = ProxyStatus.RUNNING
163                self.hostnames[hostname].running[i].timestamp_started = time.time()
164                return
165
166        raise ValueError(f"No proxy is waiting for a request with URL {url} to start!")
167
168    def mark_request_finished(self, url):
169        """
170        Mark a request for a URL as finished
171
172        This updates the status for the related slot. If no matching slot
173        exists that is waiting for a request to finish, a `ValueError` is
174        raised. After this, the proxy will be marked as cooling off, and is
175        released after cooling off is completed.
176
177        :param str url:  URL of the proxied request.
178        """
179        hostname = self.know_hostname(url)
180
181        for i, metadata in enumerate(self.hostnames[hostname].running):
182            if metadata.status == ProxyStatus.RUNNING and metadata.url == url:
183                self.hostnames[hostname].running[i].timestamp_finished = time.time()
184                self.hostnames[hostname].running[i].status = ProxyStatus.COOLING_OFF
185                return
186
187        raise ValueError(f"No proxy is currently running a request for URL {url}!")

A proxy that can be used in combination with the DelegatedRequestHandler

This keeps track of cooloffs, etc, to ensure that any individual proxy does not request more often than it should. This is a separate class because of an additional piece of logic that allows this cooloff to be kept track of on a per-hostname basis. This is useful because rate limits are typically enforced per site, so we can have (figuratively) unlimited concurrent request as long as each is on a separate hostname, but need to be more careful when requesting from a single host.

SophisticatedFuturesProxy(url, log=None, cooloff=3, concurrent_overall=5, concurrent_host=2)
57    def __init__(
58        self, url, log=None, cooloff=3, concurrent_overall=5, concurrent_host=2
59    ):
60        self.proxy_url = url
61        self.hostnames = {}
62        self.log = log
63
64        self.COOLOFF = cooloff
65        self.MAX_CONCURRENT_OVERALL = concurrent_overall
66        self.MAX_CONCURRENT_PER_HOST = concurrent_host
log = None
looping = True
COOLOFF = 0
MAX_CONCURRENT_OVERALL = 0
MAX_CONCURRENT_PER_HOST = 0
proxy_url
hostnames
def know_hostname(self, url):
68    def know_hostname(self, url):
69        """
70        Make sure the hostname is known to this proxy
71
72        This means that we can now keep track of some per-hostname statistics
73        for this hostname. If the hostname is not known yet, the statistics are
74        re-initialised.
75
76        :param str url:  URL with host name to keep stats for. Case-insensitive.
77        :param str:  The host name, as parsed for internal use
78        """
79        hostname = ural.get_hostname(url).lower()
80        if hostname not in self.hostnames:
81            self.hostnames[hostname] = namedtuple(
82                "HostnameForProxiedRequests", ("running",)
83            )
84            self.hostnames[hostname].running = []
85
86        return hostname

Make sure the hostname is known to this proxy

This means that we can now keep track of some per-hostname statistics for this hostname. If the hostname is not known yet, the statistics are re-initialised.

Parameters
  • str url: URL with host name to keep stats for. Case-insensitive.
  • str: The host name, as parsed for internal use
def release_cooled_off(self):
 88    def release_cooled_off(self):
 89        """
 90        Release proxies that have finished cooling off.
 91
 92        Proxies cool off for a certain amount of time after starting a request.
 93        This method removes cooled off requests, so that new ones may fill
 94        their slot.
 95        """
 96        for hostname, metadata in self.hostnames.copy().items():
 97            for request in metadata.running:
 98                if (
 99                    request.status == ProxyStatus.COOLING_OFF
100                    and request.timestamp_finished < time.time() - self.COOLOFF
101                ):
102                    self.log.debug(
103                        f"Releasing proxy {self.proxy_url} for host name {hostname}"
104                    )
105                    self.hostnames[hostname].running.remove(request)
106
107                    # get rid of hostnames with no running or cooling off
108                    # requests, else this might grow indefinitely
109                    if len(self.hostnames[hostname].running) == 0:
110                        del self.hostnames[hostname]

Release proxies that have finished cooling off.

Proxies cool off for a certain amount of time after starting a request. This method removes cooled off requests, so that new ones may fill their slot.

def claim_for(self, url):
112    def claim_for(self, url):
113        """
114        Try claiming a slot in this proxy for the given URL
115
116        Whether a slot is available depends both on the overall concurrency
117        limit, and the per-hostname limit. If both are not maxed out, fill
118        the slot and return the proxy object.
119
120        :param str url:  URL to proxy a request for.
121        :return: `False` if no proxy is available, or the
122        `SophisticatedFuturesProxy` object if one is.
123        """
124        self.release_cooled_off()
125        hostname = self.know_hostname(url)
126
127        total_running = sum([len(m.running) for h, m in self.hostnames.items()])
128        if total_running >= self.MAX_CONCURRENT_OVERALL:
129            return False
130
131        if len(self.hostnames[hostname].running) < self.MAX_CONCURRENT_PER_HOST:
132            request = namedtuple(
133                "ProxiedRequest",
134                ("url", "status", "timestamp_started", "timestamp_finished"),
135            )
136            request.url = url
137            request.status = ProxyStatus.CLAIMED
138            request.timestamp_started = 0
139            request.timestamp_finished = 0
140            self.hostnames[hostname].running.append(request)
141            self.log.debug(
142                f"Claiming proxy {self.proxy_url} for host name {hostname} ({len(self.hostnames[hostname].running)} of {self.MAX_CONCURRENT_PER_HOST} for host)"
143            )
144            return self
145        else:
146            return False

Try claiming a slot in this proxy for the given URL

Whether a slot is available depends both on the overall concurrency limit, and the per-hostname limit. If both are not maxed out, fill the slot and return the proxy object.

Parameters
  • str url: URL to proxy a request for.
Returns

False if no proxy is available, or the SophisticatedFuturesProxy object if one is.

def mark_request_started(self, url):
148    def mark_request_started(self, url):
149        """
150        Mark a request for a URL as started
151
152        This updates the status for the related slot. If no matching slot
153        exists that is waiting for a request to start running, a `ValueError`
154        is raised.
155
156        :param str url:  URL of the proxied request.
157        """
158        hostname = self.know_hostname(url)
159
160        for i, metadata in enumerate(self.hostnames[hostname].running):
161            if metadata.status == ProxyStatus.CLAIMED and metadata.url == url:
162                self.hostnames[hostname].running[i].status = ProxyStatus.RUNNING
163                self.hostnames[hostname].running[i].timestamp_started = time.time()
164                return
165
166        raise ValueError(f"No proxy is waiting for a request with URL {url} to start!")

Mark a request for a URL as started

This updates the status for the related slot. If no matching slot exists that is waiting for a request to start running, a ValueError is raised.

Parameters
  • str url: URL of the proxied request.
def mark_request_finished(self, url):
168    def mark_request_finished(self, url):
169        """
170        Mark a request for a URL as finished
171
172        This updates the status for the related slot. If no matching slot
173        exists that is waiting for a request to finish, a `ValueError` is
174        raised. After this, the proxy will be marked as cooling off, and is
175        released after cooling off is completed.
176
177        :param str url:  URL of the proxied request.
178        """
179        hostname = self.know_hostname(url)
180
181        for i, metadata in enumerate(self.hostnames[hostname].running):
182            if metadata.status == ProxyStatus.RUNNING and metadata.url == url:
183                self.hostnames[hostname].running[i].timestamp_finished = time.time()
184                self.hostnames[hostname].running[i].status = ProxyStatus.COOLING_OFF
185                return
186
187        raise ValueError(f"No proxy is currently running a request for URL {url}!")

Mark a request for a URL as finished

This updates the status for the related slot. If no matching slot exists that is waiting for a request to finish, a ValueError is raised. After this, the proxy will be marked as cooling off, and is released after cooling off is completed.

Parameters
  • str url: URL of the proxied request.
class DelegatedRequestHandler:
190class DelegatedRequestHandler:
191    queue = {}
192    session = None
193    proxy_pool = {}
194    proxy_settings = {}
195    halted = set()
196    log = None
197    index = 0
198
199    # some magic values
200    REQUEST_STATUS_QUEUED = 0
201    REQUEST_STATUS_STARTED = 1
202    REQUEST_STATUS_WAITING_FOR_YIELD = 2
203    PROXY_LOCALHOST = "__localhost__"
204
205    def __init__(self, log, config):
206        pool = ThreadPoolExecutor()
207        self.session = FuturesSession(executor=pool)
208        self.log = log
209
210        self.refresh_settings(config)
211
212    def refresh_settings(self, config):
213        """
214        Load proxy settings
215
216        This is done on demand, so that we do not need a persistent
217        configuration reader, which could make things complicated in
218        thread-based contexts.
219
220        :param config:  Configuration reader
221        """
222
223        self.proxy_settings = {
224            k: config.get(k) for k in ("proxies.urls", "proxies.cooloff", "proxies.concurrent-overall",
225                                            "proxies.concurrent-host", "proxies.concurrent-host")
226        }
227
228    def add_urls(self, urls, queue_name="_", position=-1, **kwargs):
229        """
230        Add URLs to the request queue
231
232        :param urls:  An iterable of URLs.
233        :param queue_name:  Queue name to add to.
234        :param position:  Where in queue to insert; -1 adds to end of queue
235        :param kwargs: Other keyword arguments will be passed on to
236        `requests.get()`
237        """
238        if queue_name in self.halted or "_" in self.halted:
239            # do not add URLs while delegator is shutting down
240            return
241
242        if queue_name not in self.queue:
243            self.queue[queue_name] = []
244
245        for i, url in enumerate(urls):
246            url_metadata = namedtuple(
247                "UrlForDelegatedRequest", ("url", "args", "status", "proxied")
248            )
249            url_metadata.url = url
250            url_metadata.kwargs = kwargs
251            url_metadata.index = self.index
252            url_metadata.proxied = None
253            url_metadata.status = self.REQUEST_STATUS_QUEUED
254            self.index += 1
255
256            if position == -1:
257                self.queue[queue_name].append(url_metadata)
258            else:
259                self.queue[queue_name].insert(position + i, url_metadata)
260
261        self.manage_requests()
262
263    def get_queue_length(self, queue_name="_"):
264        """
265        Get the length, of the queue
266
267        :param str queue_name:  Queue name
268        :return int: Amount of URLs in the queue (regardless of status)
269        """
270        queue_length = 0
271        for queue in self.queue:
272            if queue == queue_name or queue_name == "_":
273                queue_length += len(self.queue[queue_name])
274
275        return queue_length
276
277    def claim_proxy(self, url):
278        """
279        Find a proxy to do the request with
280
281        Finds a `SophisticatedFuturesProxy` that has an open slot for this URL.
282
283        :param str url:  URL to proxy a request for
284        :return SophisticatedFuturesProxy or False:
285        """
286        if not self.proxy_pool:
287            # this will trigger the first time this method is called
288            # build a proxy pool with some information per available proxy
289            proxies = self.proxy_settings["proxies.urls"]
290            for proxy_url in proxies:
291                self.proxy_pool[proxy_url] = namedtuple(
292                    "ProxyEntry", ("proxy", "last_used")
293                )
294                self.proxy_pool[proxy_url].proxy = SophisticatedFuturesProxy(
295                    proxy_url,
296                    self.log,
297                    self.proxy_settings["proxies.cooloff"],
298                    self.proxy_settings["proxies.concurrent-overall"],
299                    self.proxy_settings["proxies.concurrent-host"],
300                )
301                self.proxy_pool[proxy_url].last_used = 0
302
303            self.log.debug(f"Proxy pool has {len(self.proxy_pool)} available proxies.")
304
305        # within the pool, find the least recently used proxy that is available
306        sorted_by_cooloff = sorted(
307            self.proxy_pool, key=lambda p: self.proxy_pool[p].last_used
308        )
309        for proxy_id in sorted_by_cooloff:
310            claimed_proxy = self.proxy_pool[proxy_id].proxy.claim_for(url)
311            if claimed_proxy:
312                self.proxy_pool[proxy_id].last_used = time.time()
313                return claimed_proxy
314
315        return False
316
317    def manage_requests(self):
318        """
319        Manage requests asynchronously
320
321        First, make sure proxy status is up to date; then go through the list
322        of queued URLs and see if they have been requested, and release the
323        proxy accordingly. If the URL is not being requested, and a proxy is
324        available, start the request.
325
326        Note that this method does *not* return any requested data. This is
327        done in a separate function, which calls this one before returning any
328        finished requests in the original queue order (`get_results()`).
329        """
330        # go through queue and look at the status of each URL
331        for queue_name in self.queue:
332            for i, url_metadata in enumerate(self.queue[queue_name]):
333                url = url_metadata.url
334
335                if url_metadata.status == self.REQUEST_STATUS_WAITING_FOR_YIELD:
336                    # waiting to be flushed or passed by `get_result()`
337                    continue
338
339                if url_metadata.proxied and url_metadata.proxied.request.done():
340                    # collect result and buffer it for yielding
341                    # done() here doesn't necessarily mean the request finished
342                    # successfully, just that it has returned - a timed out
343                    # request will also be done()!
344                    self.log.debug(f"Request for {url} finished, collecting result")
345                    url_metadata.proxied.proxy.mark_request_finished(url)
346                    try:
347                        response = url_metadata.proxied.request.result()
348                        url_metadata.proxied.result = response
349
350                    except (
351                        ConnectionError,
352                        requests.exceptions.RequestException,
353                        urllib3.exceptions.HTTPError,
354                    ) as e:
355                        # this is where timeouts, etc, go
356                        url_metadata.proxied.result = FailedProxiedRequest(e)
357
358                    finally:
359                        # success or fail, we can pass it on
360                        url_metadata.status = self.REQUEST_STATUS_WAITING_FOR_YIELD
361
362                else:
363                    # running - ignore for now
364                    # could do some health checks here...
365                    # logging.debug(f"Request for {url} running...")
366                    pass
367
368                if not url_metadata.proxied and not (
369                    queue_name in self.halted or "_" in self.halted
370                ):
371                    # no request running for this URL yet, try to start one
372                    proxy = self.claim_proxy(url)
373                    if not proxy:
374                        # no available proxies, try again next loop
375                        continue
376
377                    proxy_url = proxy.proxy_url
378                    proxy_definition = (
379                        {"http": proxy_url, "https": proxy_url}
380                        if proxy_url != self.PROXY_LOCALHOST
381                        else None
382                    )
383
384                    # start request for URL
385                    self.log.debug(f"Request for {url} started")
386                    request = namedtuple(
387                        "DelegatedRequest",
388                        (
389                            "request",
390                            "created",
391                            "result",
392                            "proxy",
393                            "url",
394                            "index",
395                        ),
396                    )
397                    request.created = time.time()
398                    request.request = self.session.get(
399                        **{
400                            "url": url,
401                            "timeout": 30,
402                            "proxies": proxy_definition,
403                            **url_metadata.kwargs
404                        }
405                    )
406
407                    request.proxy = proxy
408                    request.url = url
409                    request.index = (
410                        url_metadata.index
411                    )  # this is to allow for multiple requests for the same URL
412
413                    url_metadata.status = self.REQUEST_STATUS_STARTED
414                    url_metadata.proxied = request
415
416                    proxy.mark_request_started(url)
417
418                self.queue[queue_name][i] = url_metadata
419
420    def get_results(self, queue_name="_", preserve_order=True):
421        """
422        Return available results, without skipping
423
424        Loops through the queue, returning values (and updating the queue) for
425        requests that have been finished. If a request is not finished yet,
426        stop returning. This ensures that in the end, values are only ever
427        returned in the original queue order, at the cost of potential
428        buffering.
429
430        :param str queue_name:  Queue name to get results from
431        :param bool preserve_order:  Return results in the order they were
432        added to the queue. This means that other results are buffered and
433        potentially remain in the queue, which may in the worst case
434        significantly slow down data collection. For example, if the first
435        request in the queue takes a really long time while all other
436        requests are already finished, the queue will nevertheless remain
437        'full'.
438
439        :return:
440        """
441        self.manage_requests()
442
443        # no results, no return
444        if queue_name not in self.queue:
445            return
446
447        # use list comprehensions here to avoid having to modify the
448        # lists while iterating through them
449        for url_metadata in [u for u in self.queue[queue_name]]:
450            # for each URL in the queue...
451            if url_metadata.status == self.REQUEST_STATUS_WAITING_FOR_YIELD:
452                # see if a finished request is available...
453                self.queue[queue_name].remove(url_metadata)
454                yield url_metadata.url, url_metadata.proxied.result
455
456            elif preserve_order:
457                # ...but as soon as a URL has no finished result, return
458                # unless we don't care about the order, then continue and yield
459                # as much as possible
460                return
461
462    def _halt(self, queue_name="_"):
463        """
464        Interrupt fetching of results
465
466        Can be used when 4CAT is interrupted. Clears queue and tries to cancel
467        running requests.
468
469        Note that running requests *cannot* always be cancelled via `.cancel()`
470        particularly when using `stream=True`. It is therefore recommended to
471        use `halt_and_wait()` which is blocking until all running requests have
472        properly terminated, instead of calling this method directly.
473
474        :param str queue_name:  Queue name to stop fetching results for. By
475        default, halt all queues.
476        """
477        self.halted.add(queue_name)
478
479        for queue in self.queue:
480            if queue_name == "_" or queue_name == queue:
481                # use a list comprehension here to avoid having to modify the
482                # list while iterating through it
483                for url_metadata in [u for u in self.queue[queue]]:
484                    if url_metadata.status != self.REQUEST_STATUS_STARTED:
485                        self.queue[queue].remove(url_metadata)
486                    else:
487                        url_metadata.proxied.request.cancel()
488
489        self.halted.remove(queue_name)
490
491    def halt_and_wait(self, queue_name="_"):
492        """
493        Cancel any queued requests and wait until ongoing ones are finished
494
495        Blocking!
496
497        :param str queue_name:  Queue name to stop fetching results for. By
498        default, halt all queues.
499        """
500        self._halt(queue_name)
501        while self.get_queue_length(queue_name) > 0:
502            # exhaust generator without doing something w/ results
503            all(self.get_results(queue_name, preserve_order=False))
504
505        if queue_name in self.queue:
506            del self.queue[queue_name]
DelegatedRequestHandler(log, config)
205    def __init__(self, log, config):
206        pool = ThreadPoolExecutor()
207        self.session = FuturesSession(executor=pool)
208        self.log = log
209
210        self.refresh_settings(config)
queue = {}
session = None
proxy_pool = {}
proxy_settings = {}
halted = set()
log = None
index = 0
REQUEST_STATUS_QUEUED = 0
REQUEST_STATUS_STARTED = 1
REQUEST_STATUS_WAITING_FOR_YIELD = 2
PROXY_LOCALHOST = '__localhost__'
def refresh_settings(self, config):
212    def refresh_settings(self, config):
213        """
214        Load proxy settings
215
216        This is done on demand, so that we do not need a persistent
217        configuration reader, which could make things complicated in
218        thread-based contexts.
219
220        :param config:  Configuration reader
221        """
222
223        self.proxy_settings = {
224            k: config.get(k) for k in ("proxies.urls", "proxies.cooloff", "proxies.concurrent-overall",
225                                            "proxies.concurrent-host", "proxies.concurrent-host")
226        }

Load proxy settings

This is done on demand, so that we do not need a persistent configuration reader, which could make things complicated in thread-based contexts.

Parameters
  • config: Configuration reader
def add_urls(self, urls, queue_name='_', position=-1, **kwargs):
228    def add_urls(self, urls, queue_name="_", position=-1, **kwargs):
229        """
230        Add URLs to the request queue
231
232        :param urls:  An iterable of URLs.
233        :param queue_name:  Queue name to add to.
234        :param position:  Where in queue to insert; -1 adds to end of queue
235        :param kwargs: Other keyword arguments will be passed on to
236        `requests.get()`
237        """
238        if queue_name in self.halted or "_" in self.halted:
239            # do not add URLs while delegator is shutting down
240            return
241
242        if queue_name not in self.queue:
243            self.queue[queue_name] = []
244
245        for i, url in enumerate(urls):
246            url_metadata = namedtuple(
247                "UrlForDelegatedRequest", ("url", "args", "status", "proxied")
248            )
249            url_metadata.url = url
250            url_metadata.kwargs = kwargs
251            url_metadata.index = self.index
252            url_metadata.proxied = None
253            url_metadata.status = self.REQUEST_STATUS_QUEUED
254            self.index += 1
255
256            if position == -1:
257                self.queue[queue_name].append(url_metadata)
258            else:
259                self.queue[queue_name].insert(position + i, url_metadata)
260
261        self.manage_requests()

Add URLs to the request queue

Parameters
  • urls: An iterable of URLs.
  • queue_name: Queue name to add to.
  • position: Where in queue to insert; -1 adds to end of queue
  • kwargs: Other keyword arguments will be passed on to requests.get()
def get_queue_length(self, queue_name='_'):
263    def get_queue_length(self, queue_name="_"):
264        """
265        Get the length, of the queue
266
267        :param str queue_name:  Queue name
268        :return int: Amount of URLs in the queue (regardless of status)
269        """
270        queue_length = 0
271        for queue in self.queue:
272            if queue == queue_name or queue_name == "_":
273                queue_length += len(self.queue[queue_name])
274
275        return queue_length

Get the length, of the queue

Parameters
  • str queue_name: Queue name
Returns

Amount of URLs in the queue (regardless of status)

def claim_proxy(self, url):
277    def claim_proxy(self, url):
278        """
279        Find a proxy to do the request with
280
281        Finds a `SophisticatedFuturesProxy` that has an open slot for this URL.
282
283        :param str url:  URL to proxy a request for
284        :return SophisticatedFuturesProxy or False:
285        """
286        if not self.proxy_pool:
287            # this will trigger the first time this method is called
288            # build a proxy pool with some information per available proxy
289            proxies = self.proxy_settings["proxies.urls"]
290            for proxy_url in proxies:
291                self.proxy_pool[proxy_url] = namedtuple(
292                    "ProxyEntry", ("proxy", "last_used")
293                )
294                self.proxy_pool[proxy_url].proxy = SophisticatedFuturesProxy(
295                    proxy_url,
296                    self.log,
297                    self.proxy_settings["proxies.cooloff"],
298                    self.proxy_settings["proxies.concurrent-overall"],
299                    self.proxy_settings["proxies.concurrent-host"],
300                )
301                self.proxy_pool[proxy_url].last_used = 0
302
303            self.log.debug(f"Proxy pool has {len(self.proxy_pool)} available proxies.")
304
305        # within the pool, find the least recently used proxy that is available
306        sorted_by_cooloff = sorted(
307            self.proxy_pool, key=lambda p: self.proxy_pool[p].last_used
308        )
309        for proxy_id in sorted_by_cooloff:
310            claimed_proxy = self.proxy_pool[proxy_id].proxy.claim_for(url)
311            if claimed_proxy:
312                self.proxy_pool[proxy_id].last_used = time.time()
313                return claimed_proxy
314
315        return False

Find a proxy to do the request with

Finds a SophisticatedFuturesProxy that has an open slot for this URL.

Parameters
  • str url: URL to proxy a request for
Returns
def manage_requests(self):
317    def manage_requests(self):
318        """
319        Manage requests asynchronously
320
321        First, make sure proxy status is up to date; then go through the list
322        of queued URLs and see if they have been requested, and release the
323        proxy accordingly. If the URL is not being requested, and a proxy is
324        available, start the request.
325
326        Note that this method does *not* return any requested data. This is
327        done in a separate function, which calls this one before returning any
328        finished requests in the original queue order (`get_results()`).
329        """
330        # go through queue and look at the status of each URL
331        for queue_name in self.queue:
332            for i, url_metadata in enumerate(self.queue[queue_name]):
333                url = url_metadata.url
334
335                if url_metadata.status == self.REQUEST_STATUS_WAITING_FOR_YIELD:
336                    # waiting to be flushed or passed by `get_result()`
337                    continue
338
339                if url_metadata.proxied and url_metadata.proxied.request.done():
340                    # collect result and buffer it for yielding
341                    # done() here doesn't necessarily mean the request finished
342                    # successfully, just that it has returned - a timed out
343                    # request will also be done()!
344                    self.log.debug(f"Request for {url} finished, collecting result")
345                    url_metadata.proxied.proxy.mark_request_finished(url)
346                    try:
347                        response = url_metadata.proxied.request.result()
348                        url_metadata.proxied.result = response
349
350                    except (
351                        ConnectionError,
352                        requests.exceptions.RequestException,
353                        urllib3.exceptions.HTTPError,
354                    ) as e:
355                        # this is where timeouts, etc, go
356                        url_metadata.proxied.result = FailedProxiedRequest(e)
357
358                    finally:
359                        # success or fail, we can pass it on
360                        url_metadata.status = self.REQUEST_STATUS_WAITING_FOR_YIELD
361
362                else:
363                    # running - ignore for now
364                    # could do some health checks here...
365                    # logging.debug(f"Request for {url} running...")
366                    pass
367
368                if not url_metadata.proxied and not (
369                    queue_name in self.halted or "_" in self.halted
370                ):
371                    # no request running for this URL yet, try to start one
372                    proxy = self.claim_proxy(url)
373                    if not proxy:
374                        # no available proxies, try again next loop
375                        continue
376
377                    proxy_url = proxy.proxy_url
378                    proxy_definition = (
379                        {"http": proxy_url, "https": proxy_url}
380                        if proxy_url != self.PROXY_LOCALHOST
381                        else None
382                    )
383
384                    # start request for URL
385                    self.log.debug(f"Request for {url} started")
386                    request = namedtuple(
387                        "DelegatedRequest",
388                        (
389                            "request",
390                            "created",
391                            "result",
392                            "proxy",
393                            "url",
394                            "index",
395                        ),
396                    )
397                    request.created = time.time()
398                    request.request = self.session.get(
399                        **{
400                            "url": url,
401                            "timeout": 30,
402                            "proxies": proxy_definition,
403                            **url_metadata.kwargs
404                        }
405                    )
406
407                    request.proxy = proxy
408                    request.url = url
409                    request.index = (
410                        url_metadata.index
411                    )  # this is to allow for multiple requests for the same URL
412
413                    url_metadata.status = self.REQUEST_STATUS_STARTED
414                    url_metadata.proxied = request
415
416                    proxy.mark_request_started(url)
417
418                self.queue[queue_name][i] = url_metadata

Manage requests asynchronously

First, make sure proxy status is up to date; then go through the list of queued URLs and see if they have been requested, and release the proxy accordingly. If the URL is not being requested, and a proxy is available, start the request.

Note that this method does not return any requested data. This is done in a separate function, which calls this one before returning any finished requests in the original queue order (get_results()).
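
As a rough sketch of how a caller drives this from the outside (a hypothetical helper, not part of this module): polling get_results() in a loop is enough, because each call runs manage_requests() first.

import time

def drain(handler, queue_name="_"):
    # `handler` is assumed to be a DelegatedRequestHandler whose queue has
    # already been filled elsewhere; this simply polls until it is empty.
    collected = []
    while handler.get_queue_length(queue_name) > 0:
        # each get_results() call invokes manage_requests(), so every pass both
        # starts new proxied requests and collects any that have finished
        for url, result in handler.get_results(queue_name):
            collected.append((url, result))
        time.sleep(0.1)  # brief pause to avoid a tight busy-wait
    return collected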

def get_results(self, queue_name='_', preserve_order=True):
420    def get_results(self, queue_name="_", preserve_order=True):
421        """
422        Return available results, without skipping
423
424        Loops through the queue, returning values (and updating the queue) for
425        requests that have been finished. If `preserve_order` is enabled and a
426        request is not finished yet, stop returning. This ensures that values
427        are only ever returned in the original queue order, at the cost of
428        potential buffering.
429
430        :param str queue_name:  Queue name to get results from
431        :param bool preserve_order:  Return results in the order they were
432        added to the queue. This means that other results are buffered and
433        potentially remain in the queue, which may in the worst case
434        significantly slow down data collection. For example, if the first
435        request in the queue takes a really long time while all other
436        requests are already finished, the queue will nevertheless remain
437        'full'.
438
439        :return:  Generator yielding (url, result) tuples for finished requests
440        """
441        self.manage_requests()
442
443        # no results, no return
444        if queue_name not in self.queue:
445            return
446
447        # use list comprehensions here to avoid having to modify the
448        # lists while iterating through them
449        for url_metadata in [u for u in self.queue[queue_name]]:
450            # for each URL in the queue...
451            if url_metadata.status == self.REQUEST_STATUS_WAITING_FOR_YIELD:
452                # see if a finished request is available...
453                self.queue[queue_name].remove(url_metadata)
454                yield url_metadata.url, url_metadata.proxied.result
455
456            elif preserve_order:
457                # ...but as soon as a URL has no finished result, return
458                # unless we don't care about the order, then continue and yield
459                # as much as possible
460                return

Return available results, without skipping

Loops through the queue, returning values (and updating the queue) for requests that have been finished. If preserve_order is enabled and a request is not finished yet, stop returning. This ensures that values are only ever returned in the original queue order, at the cost of potential buffering.

Parameters
  • str queue_name: Queue name to get results from
  • bool preserve_order: Return results in the order they were added to the queue. This means that other results are buffered and potentially remain in the queue, which may in the worst case significantly slow down data collection. For example, if the first request in the queue takes a really long time while all other requests are already finished, the queue will nevertheless remain 'full'.
Returns
  • (url, result) tuples for each finished request, where result is either the response or a FailedProxiedRequest
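
The result half of each yielded pair is either the proxied response or, if the request raised an exception, a FailedProxiedRequest whose context attribute holds that exception. A minimal consumption sketch, assuming handler is a DelegatedRequestHandler with URLs already queued:

from backend.lib.proxied_requests import FailedProxiedRequest  # import path assumed from this module's name

def split_results(handler, queue_name="_"):
    # hypothetical helper: separate successful responses from failed requests
    successes, failures = {}, {}
    for url, result in handler.get_results(queue_name, preserve_order=False):
        if isinstance(result, FailedProxiedRequest):
            # the original exception (timeout, connection error, ...) is stored in .context
            failures[url] = result.context
        else:
            # otherwise `result` is the response object for the proxied request
            successes[url] = result
    return successes, failures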
def halt_and_wait(self, queue_name='_'):
491    def halt_and_wait(self, queue_name="_"):
492        """
493        Cancel any queued requests and wait until ongoing ones are finished
494
495        Blocking!
496
497        :param str queue_name:  Queue name to stop fetching results for. By
498        default, halt all queues.
499        """
500        self._halt(queue_name)
501        while self.get_queue_length(queue_name) > 0:
502            # exhaust the generator without doing anything with the results
503            all(self.get_results(queue_name, preserve_order=False))
504
505        if queue_name in self.queue:
506            del self.queue[queue_name]

Cancel any queued requests and wait until ongoing ones are finished

Blocking!

Parameters
  • str queue_name: Queue name to stop fetching results for. By default, halt all queues.
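
Because this call blocks until in-flight requests have returned, it fits naturally as a cleanup step. A hedged sketch of that pattern, where handler and process() are placeholders for the caller's own objects:

try:
    for url, result in handler.get_results("_"):
        process(url, result)  # placeholder for caller-side handling of results
except KeyboardInterrupt:
    # stop starting new requests, wait for in-flight ones to return, then re-raise
    handler.halt_and_wait("_")
    raise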