datasources.dmi-tcat.search_tcat

Twitter search within a DMI-TCAT bin; connect via TCAT frontend

  1"""
  2Twitter search within a DMI-TCAT bin; connect via TCAT frontend
  3"""
  4import requests
  5import datetime
  6import csv
  7import json
  8import re
  9import io
 10
 11from backend.lib.search import Search
 12from common.lib.exceptions import QueryParametersException
 13from common.lib.user_input import UserInput
 14from common.lib.helpers import sniff_encoding
 15from common.lib.item_mapping import MappedItem
 16
 17from datasources.twitterv2.search_twitter import SearchWithTwitterAPIv2
 18
 19
 20class SearchWithinTCATBins(Search):
 21    """
 22    Get Tweets via DMI-TCAT
 23
 24    This allows subsetting an existing query bin, similar to the 'Data
 25    Selection' panel in the DMI-TCAT analysis interface
 26    """
 27    type = "dmi-tcat-search"  # job ID
 28    extension = "ndjson"
 29    title = "TCAT Search (HTTP)"
 30
 31    # TCAT has a few fields that do not exist in APIv2
 32    additional_TCAT_fields = ["to_user_name", "filter_level", "favorite_count", "truncated", "from_user_favourites_count", "from_user_lang", "from_user_utcoffset",
 33                              "from_user_timezone"]
 34
 35    options = {
 36        "intro-1": {
 37            "type": UserInput.OPTION_INFO,
 38            "help": "This data source interfaces with a DMI-TCAT instance to allow subsetting of tweets from a tweet "
 39                    "bin in that instance."
 40        },
 41        "divider-1": {
 42            "type": UserInput.OPTION_DIVIDER
 43        },
 44        "bin": {
 45            "type": UserInput.OPTION_INFO,
 46            "help": "Query bin"
 47        },
 48        "query": {
 49            "type": UserInput.OPTION_TEXT,
 50            "help": "Query text",
 51            "tooltip": "Match all tweets containing this text."
 52        },
 53        "query-exclude": {
 54            "type": UserInput.OPTION_TEXT,
 55            "help": "Exclude text",
 56            "tooltip": "Match all tweets that do NOT contain this text."
 57        },
 58        "user-name": {
 59            "type": UserInput.OPTION_TEXT,
 60            "help": "From user",
 61            "tooltip": "Match all tweets from this username."
 62        },
 63        "user-exclude": {
 64            "type": UserInput.OPTION_TEXT,
 65            "help": "Exclude user",
 66            "tooltip": "Match all tweets NOT from this username."
 67        },
 68        "exclude-replies": {
 69            "type": UserInput.OPTION_CHOICE,
 70            "options": {
 71                "exclude": "Exclude replies",
 72                "include": "Include replies"
 73            },
 74            "help": "Reply tweets",
 75            "default": "include",
 76            "tooltip": "Choose to exclude or include tweets that are replies from the data"
 77        },
 78        "daterange": {
 79            "type": UserInput.OPTION_DATERANGE,
 80            "help": "Date range"
 81        },
 82        # Advanced Options Section
 83        "divider-2": {
 84            "type": UserInput.OPTION_DIVIDER
 85        },
 86        "advanced_options_info": {
 87            "type": UserInput.OPTION_INFO,
 88            "help": "Advanced Query Options can further refine your query"
 89        },
 90        "user-bio": {
 91            "type": UserInput.OPTION_TEXT,
 92            "help": "User bio text",
 93            "tooltip": "Match all tweets from users with biographies containing this text."
 94        },
 95        "user-language": {
 96            "type": UserInput.OPTION_TEXT,
 97            "help": "User language",
 98            "tooltip": "Match all tweets from users using this language (as detected by Twitter)."
 99        },
100        "tweet-language": {
101            "type": UserInput.OPTION_TEXT,
102            "help": "Tweet language",
103            "tooltip": "Match all tweets from users with this language (as detected by Twitter)."
104        },
105        "tweet-client": {
106            "type": UserInput.OPTION_TEXT,
107            "help": "Twitter client URL/descr",
108            "tooltip": "Match all tweets from clients that match this text."
109        },
110        "url": {
111            "type": UserInput.OPTION_TEXT,
112            "help": "(Part of) URL",
113            "tooltip": "Match all tweets containing this (partial) URL."
114        },
115        "url-media": {
116            "type": UserInput.OPTION_TEXT,
117            "help": "(Part of) media URL",
118            "tooltip": "Match all tweets containing this (partial) media URL."
119        },
120    }
121
    config = {
        "dmi-tcat-search.instances": {
            "type": UserInput.OPTION_TEXT_JSON,
            "help": "DMI-TCAT instances",
            "tooltip": 'List of DMI-TCAT instance URLs, e.g. ["http://username:password@tcat.instance.webpage.net"]. '
                       'This needs to be formatted as a JSON list of strings.',
            "default": {}
        }
    }
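
    # For illustration: the "dmi-tcat-search.instances" setting is a JSON list
    # of base URLs, with HTTP auth optionally embedded (hypothetical hosts):
    #   ["https://tcat.example.net", "http://user:pass@tcat.example.org"]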

    bin_data = {
        "all_bins": {},
        "last_collected": {},
    }
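
    # Class-level cache: "all_bins" maps each instance URL to that instance's
    # {bin_name: bin_stats} dict, while "last_collected" records when each
    # instance was last queried, so bin lists can be reused for up to a day
    # (see collect_all_bins() below)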

    @classmethod
    def collect_all_bins(cls, config, force_update=False):
        """
        Requests bin information from TCAT instances
        """
        instances = config.get("dmi-tcat-search.instances", [])
        for instance in instances:
            # query each configured TCAT instance for a list of bins that can
            # be subsetted
            instance = instance.rstrip("/")
            api_url = instance + "/api/bin-stats.php"

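            # refresh the cached bin list at most once a day per instance,
            # unless force_update is set (e.g. right before a query runs)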
            if (force_update or instance not in cls.bin_data["last_collected"]
                    or datetime.datetime.now() - datetime.timedelta(days=1) >= cls.bin_data["last_collected"][instance]):
                # Collect Instance data
                try:
                    api_request = requests.get(api_url, timeout=5)
                    instance_bins = json.loads(api_request.content)
                    cls.bin_data["all_bins"][instance] = {k: instance_bins[k] for k in sorted(instance_bins)}
                    cls.bin_data["last_collected"][instance] = datetime.datetime.now()
                except (requests.RequestException, json.JSONDecodeError):
                    cls.bin_data["all_bins"][instance] = {"failed": True}
                    # TODO: No logger here as nothing has been initialized
                    # print(f"WARNING, unable to collect TCAT bins from instance {instance}")

    @classmethod
    def get_options(cls, parent_dataset=None, config=None):
        """
        Get data source options

        This method takes the pre-defined options, but fills the 'bin' option
        with bins currently available from the configured TCAT instances.

        :param config:  Configuration reader (context-aware); can be used to
        show some options only to privileged users.
        :param DataSet parent_dataset:  An object representing the dataset that
        the processor would be run on
        """
        options = cls.options

        cls.collect_all_bins(config)
        if all(data.get("failed", False) for data in cls.bin_data["all_bins"].values()):
            options["bin"] = {
                "type": UserInput.OPTION_INFO,
                "help": "Could not connect to DMI-TCAT instance(s)."
            }
            return options

        options["bin"] = {
            "type": UserInput.OPTION_CHOICE,
            "options": {},
            "help": "Query bin"
        }

        for instance, bins in cls.bin_data["all_bins"].items():
            # make the host somewhat human-readable
            # also strip out embedded HTTP auths
            host = re.sub(r"^https?://", "", instance).split("@").pop()
            for bin_name, bin_stats in bins.items():
                bin_key = "%s@%s" % (bin_name, host)
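                # bin_key is e.g. "mybin@tcat.example.net" (hypothetical bin
                # name and host); the same format is split apart again in
                # get_items()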
                display_text = (f"{bin_name}: {bin_stats.get('tweets_approximate')} tweets from "
                                f"{bin_stats.get('range').get('first_tweet')} to {bin_stats.get('range').get('last_tweet')}")
                options["bin"]["options"][bin_key] = display_text

        return options

    def get_items(self, query):
        """
        Use the DMI-TCAT tweet export to retrieve tweets

        :param query:
        :return:
        """
        bin_key = self.parameters.get("bin")
        bin_name = bin_key.split("@")[0]
        bin_host = bin_key.split("@").pop()
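        # the chosen "bin" parameter packs bin name and host together as
        # "name@host" (see get_options() above), e.g. "mybin@tcat.example.net"
        # with hypothetical values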

        # we cannot store the full instance URL as a parameter, because it may
        # contain sensitive information (e.g. HTTP auth) - so we find the full
        # instance URL again here
        # while the parameter could be marked 'sensitive', the values would
        # still show up in e.g. the HTML of the 'create dataset' form
        available_instances = self.config.get("dmi-tcat-search.instances", [])
        instance_url = ""
        instance = None
        for available_instance in available_instances:
            hostname = re.sub(r"^https?://", "", available_instance).split("@").pop().rstrip("/")
            if hostname == bin_host:
                instance_url = available_instance
                instance = available_instance.rstrip("/")
                break

        if not instance_url:
            return self.dataset.finish_with_error("Invalid DMI-TCAT instance name '%s'" % bin_host)

        # Collect the bins again (ensure we have updated info in case bin is still active)
        self.collect_all_bins(self.config, force_update=True)
        # Add metadata to parameters
        try:
            current_bin = self.bin_data["all_bins"][instance][bin_name]
        except KeyError:
            return self.dataset.finish_with_error(f"Lost connection to TCAT instance {bin_host}")
        # Add TCAT metadata to dataset
        self.dataset.tcat_bin_data = current_bin
        if current_bin.get("type") in ["follow", "track", "timeline", "geotrack"] and (
                "phrase_times" not in current_bin or "user_times" not in current_bin):
            self.dataset.update_status("Warning: TCAT not updated to send phrase and user time ranges; consider updating if you would like to retain this bin metadata.")

        # now get the parameters...
        request_url = instance_url.rstrip("/") + "/analysis/mod.export_tweets.php"

        # Allow for blank dates
        if self.parameters.get("min_date"):
            start_date = datetime.datetime.fromtimestamp(self.parameters.get("min_date")).strftime("%Y-%m-%d")
        else:
            first_tweet_timestamp = current_bin.get('range').get('first_tweet')
            start_date = datetime.datetime.strptime(first_tweet_timestamp, "%Y-%m-%d %H:%M:%S").strftime("%Y-%m-%d")

        if self.parameters.get("max_date"):
            end_date = datetime.datetime.fromtimestamp(self.parameters.get("max_date")).strftime("%Y-%m-%d")
        else:
            end_date = (datetime.datetime.now() + datetime.timedelta(days=1)).strftime("%Y-%m-%d")
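        # with no end date given, "tomorrow" is used so that tweets from today
        # are still included in the export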
        parameters = {
            "dataset": bin_name,
            "query": self.parameters.get("query"),
            "url_query": self.parameters.get("url"),
            "media_url_query": self.parameters.get("url-media"),
            "exclude": self.parameters.get("query-exclude"),
            "from_user_name": self.parameters.get("user-name"),
            "from_user_lang": self.parameters.get("user-language"),
            "lang": self.parameters.get("tweet-language"),
            "exclude_from_user_name": self.parameters.get("user-exclude"),
            "from_source": re.sub(r"<[^>]+>", "", self.parameters.get("tweet-client")),
            "startdate": start_date,
            "enddate": end_date,
            "replyto": "yes" if self.parameters.get("exclude-replies") == "exclude" else "no",
            "whattodo": "",
            "exportSettings": "urls,mentions,hashtags,media,",
            "graph_resolution": "day",
            "outputformat": "csv"
        }
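        # this builds an export request along the lines of (hypothetical host
        # and values):
        #   https://tcat.example.net/analysis/mod.export_tweets.php?dataset=mybin&startdate=2023-01-01&enddate=2023-02-01&outputformat=csv&...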

        # for now we simply request the full CSV export of the bin with the
        # given parameters, letting TCAT handle the full text search and so
        # on
        self.dataset.update_status("Searching for tweets on %s" % bin_host)
        response = requests.get(request_url, params=parameters, stream=True)
        if response.status_code != 200:
            return self.dataset.finish_with_error("Query bin not available: received HTTP Error %i" % response.status_code)

        # process the file in 1kB chunks, buffer as we go
        # If a newline is encountered, the buffer is processed as a row of csv
        # data. This works as long as there are no newlines in the csv itself,
        # which is the case for TCAT exports. Processing as a stream is needed
        # to avoid having to load the full file in memory
        buffer = bytearray()
        fieldnames = None
        items = 0
        encoding = None
        api_map_errors = 0
        mapping_errors = 0
        for chunk in response.iter_content(chunk_size=1024):
            # see if this chunk contains a newline, in which case we have a
            # full line to process (e.g. as a tweet)
            lines = []
            buffer += bytearray(chunk)

            if not encoding and len(buffer) > 3:
                # response.encoding is not correct sometimes, since it does not
                # indicate that the file uses a BOM, so sniff it instead once
                # we have some bytes
                encoding = sniff_encoding(buffer)

            # split buffer by newlines and process each full line
            # the last line is always carried over, since it may be incomplete
            if b"\n" in buffer:
                buffered_lines = buffer.split(b"\n")
                lines = buffered_lines[:-1]
                buffer = buffered_lines.pop()
            elif not chunk:
                # eof, process left-over data
                lines = buffer.split(b"\n")

            # and finally we can process the data
            for line in lines:
                # use a dummy csv reader to abstract away the annoying csv parsing
                # this is quite a bit of overhead, but beats implementing csv parsing
                # manually, and it's still reasonably fast (about 10k/second)
                dummy_file = io.TextIOWrapper(io.BytesIO(line.replace(b"\0", b"")), encoding=encoding)
                reader = csv.reader(dummy_file,
                                    delimiter=",",
                                    quotechar='"',
                                    doublequote=True,
                                    quoting=csv.QUOTE_MINIMAL)
                # default of None guards against empty lines, which would
                # otherwise raise StopIteration inside this generator
                row_data = next(reader, None)

                if row_data and not fieldnames:
                    # first line in file
                    fieldnames = row_data.copy()

                elif row_data:
                    tweet = dict(zip(fieldnames, row_data))
                    items += 1

                    if items % 250 == 0:
                        self.dataset.update_status("Loaded %i tweets from bin %s@%s" % (items, bin_name, bin_host))

                    try:
                        formatted_tweet = self.tcat_to_APIv2(tweet)
                    except (KeyError, IndexError) as e:
                        self.dataset.log(f"Error converting TCAT tweet ({items}) to APIv2 format: {e}")
                        api_map_errors += 1
                        continue

                    # Check for mapping errors
                    try:
                        SearchWithTwitterAPIv2.map_item(formatted_tweet)
                    except (KeyError, IndexError) as e:
                        # these tweets will not be usable by 4CAT processors, but we can still yield them and they remain available for download as NDJSON
                        self.dataset.log(f"Error mapping TCAT tweet ({items}) to 4CAT Twitter format: {e}")
                        mapping_errors += 1

                    # yield formatted_tweet, which contains some TCAT-specific fields; mapping to the X/Twitter APIv2 format is done later
                    yield formatted_tweet

            if not chunk:
                # end of file
                break

        if mapping_errors or api_map_errors:
            error_message = ""
            if mapping_errors:
                error_message += f"{mapping_errors} tweets could not be mapped to the 4CAT format; they were imported, but 4CAT processors will not be able to analyse them. "
            if api_map_errors:
                error_message += f"{api_map_errors} tweets could not be converted to APIv2 format and were not imported."
            self.log.warning(f"SearchWithinTCATBins: import mapping issue detected ({self.dataset.key})")
            self.dataset.update_status(error_message, is_final=True)

    @staticmethod
    def tcat_to_4cat_time(tcat_time):
        """
        Twitter APIv2 time is in format "%Y-%m-%dT%H:%M:%S.000Z" while TCAT
        uses "%Y-%m-%d %H:%M:%S" or a numeric timestamp.

        :return str:  Timestamp formatted as Twitter APIv2 time
        """
        try:
            tcat_time = int(tcat_time)
            return datetime.datetime.fromtimestamp(tcat_time).strftime("%Y-%m-%dT%H:%M:%S.000Z")
        except ValueError:
            return datetime.datetime.strptime(tcat_time, "%Y-%m-%d %H:%M:%S").strftime("%Y-%m-%dT%H:%M:%S.000Z")

    @staticmethod
    def tcat_to_APIv2(tcat_tweet):
        """
        Attempt to construct a 4CAT tweet as if gathered from APIv2, to allow the use of Twitter-specific processors!

        A great deal of information is missing, so this may cause some issues. Notes are kept of the expected
        type and, if the data is missing in TCAT, None is used. It should therefore be possible to refactor processors
        to handle None if necessary.
        """
        # We're missing lots of data here...

        # prefer expanded URLs, fall back to followed URLs; splitting an empty
        # string produces [""], which the filter below discards
        urls = [url.strip() for url in (tcat_tweet["urls_expanded"] or tcat_tweet["urls_followed"]).split(";") if url]
        # TCAT media_id: 7 = video, 3 = photo, 16 = animated_gif
        media_type = {"7": "video", "3": "photo", "16": "animated_gif"}.get(tcat_tweet["media_id"], tcat_tweet["media_id"])

        # 4CAT Twitter APIv2 result data structure
        APIv2_tweet = {
            "lang": tcat_tweet["lang"],  # str
            "source": tcat_tweet["source"],  # REMOVED FROM TWITTER API v2
            # note: CSV values are strings, so compare against "1"/"0" rather
            # than the integers 1/0
            "possibly_sensitive": True if tcat_tweet["possibly_sensitive"] == "1" else False if tcat_tweet["possibly_sensitive"] == "0" else None,  # bool
            "text": tcat_tweet["text"],  # str
            "edit_history_tweet_ids": None,  # list; Missing in TCAT data
            "public_metrics": {
                "retweet_count": tcat_tweet["retweet_count"],  # int
                "reply_count": None,  # int; Missing in TCAT data
                "like_count": tcat_tweet["favorite_count"],  # int
                "quote_count": None,  # int; Missing in TCAT data
                "impression_count": None,  # int; Missing in TCAT data
                # TCAT also has favorite_count
            },
414            "entities": {
415                "mentions": [{
416                    "id": None,  # str; Missing in TCAT data
417                    "username": mention.strip(),  # str
418                    # Twitter v2 API has additional user fields
419                } for mention in tcat_tweet["mentions"].split(";") if mention],
420                "annotations": None,  # list; Missing in TCAT data
421                "urls": [{
422                    "url": url,  # str
423                    "expanded_url": url,  # str
424                    # Twitter v2 API has additional URL fields
425                } for url in urls],
426                "hashtags": [{
427                    "tag": hashtag.strip(),  # str
428                    "start": None,  # int; Missing in TCAT data
429                    "end": None,  # int; Missing in TCAT data
430                } for hashtag in tcat_tweet["hashtags"].split(";") if hashtag],
431                "cashtags": None,  # list; Missing in TCAT data
432            },
433            "created_at": SearchWithinTCATBins.tcat_to_4cat_time(tcat_tweet["time"]),  # str
434            "id": tcat_tweet["id"],  # str
435            "author_id": tcat_tweet["from_user_id"],  # str
436            "context_annotations": None,  # list; Missing in TCAT data
437            "reply_settings": None,  # str; Missing in TCAT data
438            "conversation_id": None,  # str; TCAT has a in_reply_to_status_id but this is not necessarily the original Tweet that started the conversation
439            "author_user": {
440                "protected": None,  # bool; Missing in TCAT data
441                "verified": True if tcat_tweet["from_user_verified"] == 1 else False if tcat_tweet["from_user_verified"] == 0 else None,  # bool
442                "created_at": SearchWithinTCATBins.tcat_to_4cat_time(tcat_tweet["from_user_created_at"]) if tcat_tweet["from_user_created_at"] else "",  # str; may be Missing in TCAT data
443                "name": tcat_tweet["from_user_realname"],  # str
444                "entities": {
445                    "description": None,  # dict; contains entities from author description such as mentions, URLs, etc.; Missing in TCAT data
446                    "url": None,  # dict; containers entities from author url e.g. URL data; Missing in TCAT data
447                },
448                "description": tcat_tweet["from_user_description"],  # str
449                "pinned_tweet_id": None,  # str; Missing in TCAT data
450                "profile_image_url": tcat_tweet["from_user_profile_image_url"],  # str
451                "url": tcat_tweet["from_user_url"],  # str
452                "username": tcat_tweet["from_user_name"],  # str
453                "id": tcat_tweet["from_user_id"],  # str
454                "location": None,  # str; Missing in TCAT data
455                "public_metrics": {
456                    "followers_count": tcat_tweet["from_user_followercount"],  # int
457                    "following_count": tcat_tweet["from_user_friendcount"],  # int
458                    "tweet_count": tcat_tweet["from_user_tweetcount"],  # int
459                    "listed_count": tcat_tweet["from_user_listed"],  # int
460                    # TCAT has also from_user_favourites_count
461                },
462                "withheld": {
463                    "country_codes": tcat_tweet["from_user_withheld_scope"].split(";"),  # list; TODO TCAT has column, but have not seen it populated in testing... This is guess
464                },
465                # TCAT has also from_user_lang, from_user_utcoffset, from_user_timezone
466            },
467            "attachments": {
468                # TCAT has some media data, but not the URLs listed
469                "media_keys": [{
470                    "type": media_type,
471                    "url": ",".join([url for url in urls if (url.split("/")[-2] if len(url.split("/")) > 1 else "") in ["photo"]]),  # str; TCAT does not have the URL though it may be in the list of URLs
472                    "variants": [{"url": ",".join([url for url in urls if (url.split("/")[-2] if len(url.split("/")) > 1 else "") in ["video"]]), "bit_rate":0}]  # list; This is not the expected direct link to video, but it is a URL to the video
473                    # Twitter API v2 has additional data
474                }],  # list; TCAT seems to only have one type of media per tweet
475                "poll_ids": None,  # list; Missing from TCAT data
476            },
477            "geo": {
478                "place_id": None,  # str; Missing from TCAT data
479                "place": {
480                    "country": None,  # str; Missing from TCAT data
481                    "id": None,  # str; Missing from TCAT data
482                    "geo": {
483
484                    },
485                    "country_code": None,  # str; Missing from TCAT data
486                    "name": tcat_tweet["location"],  # str
487                    "place_type": None,  # str; Missing from TCAT data
488                    "full_name": tcat_tweet["location"],  # str
489                },
490                "coordindates": {
491                    "type": None,  # str; Missing from TCAT data
492                    "coordinates": [tcat_tweet["lng"], tcat_tweet["lat"]],  # list i.e. [longitude, latitude]
493                },
494            },
495            "withheld": {
496                "copyright": True if tcat_tweet["withheld_copyright"] == 1 else False if tcat_tweet["withheld_copyright"] == 0 else None,  # bool; TODO TCAT has column, but have not seen it populated in testing... This is guess
497                "country_codes": tcat_tweet["withheld_scope"].split(";"),  # list; TODO TCAT has column, but have not seen it populated in testing... This is guess
498            },
499        }

        # Referenced tweets; the Twitter API v2 contains the full tweet data here, which we are missing
        referenced_tweets = []
        if tcat_tweet["text"].startswith("RT @"):
            # Retweet
            referenced_tweets.append({
                "type": "retweeted",
                "id": None,  # str; Missing in TCAT data
            })
        if tcat_tweet["quoted_status_id"]:
            # Quote
            referenced_tweets.append({
                "type": "quoted",
                "id": tcat_tweet["quoted_status_id"],  # str
            })
        if tcat_tweet["in_reply_to_status_id"]:
            # Reply
            referenced_tweets.append({
                "type": "replied_to",
                "id": tcat_tweet["in_reply_to_status_id"],  # str
            })
            # These should NOT be None in case a processor/user attempts to identify a reply using these
            APIv2_tweet["in_reply_to_user_id"] = "UNKNOWN"  # str; Missing from TCAT data
            APIv2_tweet["in_reply_to_user"] = {"username": "UNKNOWN"}  # dict; Missing from TCAT data

        APIv2_tweet["referenced_tweets"] = referenced_tweets  # list

        # Append any extra TCAT data
        additional_TCAT_data = {}
        for field in SearchWithinTCATBins.additional_TCAT_fields:
            additional_TCAT_data["TCAT_" + field] = tcat_tweet[field]
        APIv2_tweet.update(additional_TCAT_data)
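
        # Illustratively (hypothetical values, UTC shown; the actual timestamp
        # uses the server's local timezone): a TCAT row with
        #   id="123", from_user_name="someuser", time="1600000000"
        # comes out as an APIv2-style dict along the lines of
        #   {"id": "123", "created_at": "2020-09-13T12:26:40.000Z",
        #    "author_user": {"username": "someuser", ...}, ...}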

        return APIv2_tweet

    @staticmethod
    def validate_query(query, request, config):
        """
        Validate DMI-TCAT query input

        :param dict query:  Query parameters, from client-side.
        :param request:  Flask request
        :param ConfigManager|None config:  Configuration reader (context-aware)
        :return dict:  Safe query parameters
        """
        # no query 4 u
        if not query.get("bin", "").strip():
            raise QueryParametersException("You must choose a query bin to get tweets from.")

        # Dates need to make sense as a range to search within
        after, before = query.get("daterange")
        if (after and before) and before <= after:
            raise QueryParametersException("A date range must start before it ends")

        query["min_date"], query["max_date"] = query.get("daterange")
        del query["daterange"]

        # simple!
        return query

    @staticmethod
    def map_item(item):
        """
        Use Twitter APIv2 map_item
        """
        mapped_tweet = SearchWithTwitterAPIv2.map_item(item)

        # Add TCAT extra data
        data = mapped_tweet.get_item_data()
        message = mapped_tweet.get_message()
        for field in SearchWithinTCATBins.additional_TCAT_fields:
            data["TCAT_" + field] = item.get("TCAT_" + field)

        return MappedItem(data, message)
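
As a minimal usage sketch: each line of the NDJSON file produced by this data
source holds one APIv2-style tweet, which map_item() flattens for use by 4CAT
processors. The file name below is hypothetical, and the import is
illustrative (the module lives in the hyphenated directory
datasources/dmi-tcat/, so 4CAT normally loads it through its data source
loader rather than via a plain import):

import json

from search_tcat import SearchWithinTCATBins

# hypothetical NDJSON export produced by this data source
with open("results.ndjson") as infile:
    for line in infile:
        tweet = json.loads(line)
        # map_item() wraps SearchWithTwitterAPIv2.map_item() and appends
        # the TCAT_* extra fields to the flattened row
        mapped = SearchWithinTCATBins.map_item(tweet)
        row = mapped.get_item_data()
        print(row.get("id"), row.get("TCAT_filter_level"))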
class SearchWithinTCATBins(backend.lib.search.Search):
 21class SearchWithinTCATBins(Search):
 22    """
 23    Get Tweets via DMI-TCAT
 24
 25    This allows subsetting an existing query bin, similar to the 'Data
 26    Selection' panel in the DMI-TCAT analysis interface
 27    """
 28    type = "dmi-tcat-search"  # job ID
 29    extension = "ndjson"
 30    title = "TCAT Search (HTTP)"
 31
 32    # TCAT has a few fields that do not exist in APIv2
 33    additional_TCAT_fields = ["to_user_name", "filter_level", "favorite_count", "truncated", "from_user_favourites_count", "from_user_lang", "from_user_utcoffset",
 34                              "from_user_timezone"]
 35
 36    options = {
 37        "intro-1": {
 38            "type": UserInput.OPTION_INFO,
 39            "help": "This data source interfaces with a DMI-TCAT instance to allow subsetting of tweets from a tweet "
 40                    "bin in that instance."
 41        },
 42        "divider-1": {
 43            "type": UserInput.OPTION_DIVIDER
 44        },
 45        "bin": {
 46            "type": UserInput.OPTION_INFO,
 47            "help": "Query bin"
 48        },
 49        "query": {
 50            "type": UserInput.OPTION_TEXT,
 51            "help": "Query text",
 52            "tooltip": "Match all tweets containing this text."
 53        },
 54        "query-exclude": {
 55            "type": UserInput.OPTION_TEXT,
 56            "help": "Exclude text",
 57            "tooltip": "Match all tweets that do NOT contain this text."
 58        },
 59        "user-name": {
 60            "type": UserInput.OPTION_TEXT,
 61            "help": "From user",
 62            "tooltip": "Match all tweets from this username."
 63        },
 64        "user-exclude": {
 65            "type": UserInput.OPTION_TEXT,
 66            "help": "Exclude user",
 67            "tooltip": "Match all tweets NOT from this username."
 68        },
 69        "exclude-replies": {
 70            "type": UserInput.OPTION_CHOICE,
 71            "options": {
 72                "exclude": "Exclude replies",
 73                "include": "Include replies"
 74            },
 75            "help": "Reply tweets",
 76            "default": "include",
 77            "tooltip": "Choose to exclude or include tweets that are replies from the data"
 78        },
 79        "daterange": {
 80            "type": UserInput.OPTION_DATERANGE,
 81            "help": "Date range"
 82        },
 83        # Advanced Options Section
 84        "divider-2": {
 85            "type": UserInput.OPTION_DIVIDER
 86        },
 87        "advanced_options_info": {
 88            "type": UserInput.OPTION_INFO,
 89            "help": "Advanced Query Options can further refine your query"
 90        },
 91        "user-bio": {
 92            "type": UserInput.OPTION_TEXT,
 93            "help": "User bio text",
 94            "tooltip": "Match all tweets from users with biographies containing this text."
 95        },
 96        "user-language": {
 97            "type": UserInput.OPTION_TEXT,
 98            "help": "User language",
 99            "tooltip": "Match all tweets from users using this language (as detected by Twitter)."
100        },
101        "tweet-language": {
102            "type": UserInput.OPTION_TEXT,
103            "help": "Tweet language",
104            "tooltip": "Match all tweets from users with this language (as detected by Twitter)."
105        },
106        "tweet-client": {
107            "type": UserInput.OPTION_TEXT,
108            "help": "Twitter client URL/descr",
109            "tooltip": "Match all tweets from clients that match this text."
110        },
111        "url": {
112            "type": UserInput.OPTION_TEXT,
113            "help": "(Part of) URL",
114            "tooltip": "Match all tweets containing this (partial) URL."
115        },
116        "url-media": {
117            "type": UserInput.OPTION_TEXT,
118            "help": "(Part of) media URL",
119            "tooltip": "Match all tweets containing this (partial) media URL."
120        },
121    }
122
123    config = {
124        "dmi-tcat-search.instances": {
125            "type": UserInput.OPTION_TEXT_JSON,
126            "help": "DMI-TCAT instances",
127            "tooltip": 'List of DMI-TCAT instance URLs, e.g. ["http://username:password@tcat.instance.webpage.net"]. '
128                       'This  needs to be formatted as a JSON list of strings.',
129            "default": {}
130        }
131    }
132
133    bin_data = {
134        "all_bins": {},
135        "last_collected": {},
136    }
137
138    @classmethod
139    def collect_all_bins(cls, config, force_update=False):
140        """
141        Requests bin information from TCAT instances
142        """
143        instances = config.get("dmi-tcat-search.instances", [])
144        for instance in instances:
145            # query each configured TCAT instance for a list of bins that can
146            # be subsetted
147            instance = instance.rstrip("/")
148            api_url = instance + "/api/bin-stats.php"
149
150            if force_update or instance not in cls.bin_data["last_collected"] or datetime.datetime.now()-datetime.timedelta(days=1) >= cls.bin_data["last_collected"][instance]:
151                # Collect Instance data
152                try:
153                    api_request = requests.get(api_url, timeout=5)
154                    instance_bins = json.loads(api_request.content)
155                    cls.bin_data["all_bins"][instance] = {k: instance_bins[k] for k in sorted(instance_bins)}
156                    cls.bin_data["last_collected"][instance] = datetime.datetime.now()
157                except (requests.RequestException, json.JSONDecodeError):
158                    cls.bin_data["all_bins"][instance] = {"failed": True}
159                    # TODO: No logger here as nothing has been initialized
160                    # print(f"WARNING, unable to collect TCAT bins from instance {instance}")
161                    pass
162
163    @classmethod
164    def get_options(cls, parent_dataset=None, config=None):
165        """
166        Get data source options
167
168        This method takes the pre-defined options, but fills the 'bins' options
169        with bins currently available from the configured TCAT instances.
170
171        :param config:
172        :param DataSet parent_dataset:  An object representing the dataset that
173        the processor would be run on
174can
175        be used to show some options only to privileges users.
176        """
177        options = cls.options
178
179        cls.collect_all_bins(config)
180        if all([data.get("failed", False) for instance, data in cls.bin_data["all_bins"].items()]):
181            options["bin"] = {
182                "type": UserInput.OPTION_INFO,
183                "help": "Could not connect to DMI-TCAT instance(s)."
184            }
185            return options
186
187        options["bin"] = {
188            "type": UserInput.OPTION_CHOICE,
189            "options": {},
190            "help": "Query bin"
191        }
192
193        for instance, bins in cls.bin_data["all_bins"].items():
194            # make the host somewhat human-readable
195            # also strip out embedded HTTP auths
196            host = re.sub(r"^https?://", "", instance).split("@").pop()
197            for bin_name, bin in bins.items():
198                bin_key = "%s@%s" % (bin_name, host)
199                display_text = f"{bin_name}: {bin.get('tweets_approximate')} tweets from {bin.get('range').get('first_tweet')} to {bin.get('range').get('last_tweet')}"
200                options["bin"]["options"][bin_key] = display_text
201
202        return options
203
204    def get_items(self, query):
205        """
206        Use the DMI-TCAT tweet export to retrieve tweets
207
208        :param query:
209        :return:
210        """
211        bin = self.parameters.get("bin")
212        bin_name = bin.split("@")[0]
213        bin_host = bin.split("@").pop()
214
215        # we cannot store the full instance URL as a parameter, because it may
216        # contain sensitive information (e.g. HTTP auth) - so we find the full
217        # instance URL again here
218        # while the parameter could be marked 'sensitive', the values would
219        # still show up in e.g. the HTML of the 'create dataset' form
220        available_instances = self.config.get("dmi-tcat-search.instances", [])
221        instance_url = ""
222        instance = None
223        for available_instance in available_instances:
224            hostname = re.sub(r"https?://", "", available_instance).split("@").pop().rstrip("/")
225            if hostname == bin_host:
226                instance_url = available_instance
227                instance = available_instance.rstrip("/")
228                break
229
230        if not instance_url:
231            return self.dataset.finish_with_error("Invalid DMI-TCAT instance name '%s'" % bin_host)
232
233        # Collect the bins again (ensure we have updated info in case bin is still active)
234        self.collect_all_bins(self.config, force_update=True)
235        # Add metadata to parameters
236        try:
237            current_bin = self.bin_data["all_bins"][instance][bin_name]
238        except KeyError:
239            return self.dataset.finish_with_error(f"Lost connection to TCAT instance {bin_host}")
240        # Add TCAT metadata to dataset
241        self.dataset.tcat_bin_data = current_bin
242        if current_bin.get("type") in ["follow", "track", "timeline", "geotrack"] and ("phrase_times" not in current_bin or not "user_times" not in current_bin):
243            self.dataset.update_status("Warning: TCAT not updated to send phrase and user time ranges; consider updating if you would like to retain this BIN metadata.")
244
245        # now get the parameters...
246        request_url = instance_url.rstrip("/") + "/analysis/mod.export_tweets.php"
247
248        # Allow for blank dates
249        if self.parameters.get("min_date"):
250            start_date = datetime.datetime.fromtimestamp(self.parameters.get("min_date")).strftime("%Y-%m-%d")
251        else:
252            first_tweet_timestamp = current_bin.get('range').get('first_tweet')
253            start_date = datetime.datetime.strptime(first_tweet_timestamp, "%Y-%m-%d %H:%M:%S").strftime("%Y-%m-%d")
254
255        end_date = datetime.datetime.fromtimestamp(self.parameters.get("max_date")).strftime("%Y-%m-%d") if self.parameters.get("max_date") else (datetime.datetime.now() + datetime.timedelta(days=1)).strftime("%Y-%m-%d")
256        parameters = {
257            "dataset": bin_name,
258            "query": self.parameters.get("query"),
259            "url_query": self.parameters.get("url"),
260            "media_url_query": self.parameters.get("url-media"),
261            "exclude": self.parameters.get("query-exclude"),
262            "from_user_name": self.parameters.get("user-name"),
263            "from_user_lang": self.parameters.get("user-language"),
264            "lang": self.parameters.get("tweet-language"),
265            "exclude_from_user_name": self.parameters.get("user-exclude"),
266            "from_source": re.sub(r"<[^>]+>", "", self.parameters.get("tweet-client")),
267            "startdate": start_date,
268            "enddate": end_date,
269            "replyto": "yes" if self.parameters.get("exclude-replies") == "exclude" else "no",
270            "whattodo": "",
271            "exportSettings": "urls,mentions,hashtags,media,",
272            "graph_resolution": "day",
273            "outputformat": "csv"
274        }
275
276        # for now we simply request the full CSV export of the bin with the
277        # given parameters, letting TCAT handle the full text search and so
278        # on
279        self.dataset.update_status("Searching for tweets on %s" % bin_host)
280        response = requests.get(request_url, params=parameters, stream=True)
281        if response.status_code != 200:
282            return self.dataset.finish_with_error("Query bin not available: received HTTP Error %i" % response.status_code)
283
284        # process the file in 1kB chunks, buffer as we go
285        # If a newline is encountered, the buffer is processed as a row of csv
286        # data. This works as long as there are no newlines in the csv itself,
287        # which is the case for TCAT exports. Processing as a stream is needed
288        # to avoid having to load the full file in memory
289        buffer = bytearray()
290        fieldnames = None
291        items = 0
292        encoding = None
293        api_map_errors = 0
294        mapping_errors = 0
295        for chunk in response.iter_content(chunk_size=1024):
296            # see if this chunk contains a newline, in which case we have a
297            # full line to process (e.g. as a tweet)
298            lines = []
299            buffer += bytearray(chunk)
300
301            if not encoding and len(buffer) > 3:
302                # response.encoding is not correct sometimes, since it does not
303                # indicate that the file uses a BOM, so sniff it instead once
304                # we have some bytes
305                encoding = sniff_encoding(buffer)
306
307            # split buffer by newlines and process each full line
308            # the last line is always carried over, since it may be incomplete
309            if b"\n" in buffer:
310                buffered_lines = buffer.split(b"\n")
311                lines = buffered_lines[:-1]
312                buffer = buffered_lines.pop()
313            elif not chunk:
314                # eof, process left-over data
315                lines = buffer.split(b"\n")
316
317            # and finally we can process the data
318            for line in lines:
319                # use a dummy csv reader to abstract away the annoying csv parsing
320                # this is quite a bit of overhead, but beats implementing csv parsing
321                # manually, and it's still reasonably fast (about 10k/second)
322                dummy_file = io.TextIOWrapper(io.BytesIO(line.replace(b"\0", b"")), encoding=encoding)
323                reader = csv.reader(dummy_file,
324                                    delimiter=",",
325                                    quotechar='"',
326                                    doublequote=True,
327                                    quoting=csv.QUOTE_MINIMAL)
328                row_data = next(reader)
329
330                if row_data and not fieldnames:
331                    # first line in file
332                    fieldnames = row_data.copy()
333
334                elif row_data:
335                    tweet = dict(zip(fieldnames, row_data))
336                    items += 1
337
338                    if items % 250 == 0:
339                        self.dataset.update_status("Loaded %i tweets from bin %s@%s" % (items, bin_name, bin_host))
340
341                    try:
342                        formatted_tweet = self.tcat_to_APIv2(tweet)
343                    except (KeyError, IndexError) as e:
344                        self.dataset.log(f"Error converting TCAT tweet ({items}) to APIv2 format: {e}")
345                        api_map_errors += 1
346                        continue
347                    
348                    # Check mapping errors
349                    try:
350                        SearchWithTwitterAPIv2.map_item(formatted_tweet)
351                    except (KeyError, IndexError) as e:
352                        # these tweets will not be usable by 4CAT processors, but we can still yield and they are availalbe for download as NDJSON
353                        self.dataset.log(f"Error mapping TCAT tweet ({items}) to 4CAT Twitter format: {e}")
354                        mapping_errors += 1
355
356                    # yield formatted_tweet which contains some TCAT specific fields; mapping to X/Twitter APIv2 format is done later
357                    yield formatted_tweet
358
359            if not chunk:
360                # end of file
361                break
362
363        if mapping_errors or api_map_errors:
364            error_message = ""
365            if mapping_errors:
366                error_message += f"{mapping_errors} tweets were unable to be imported from TCAT. "
367            if api_map_errors:
368                error_message += f"{api_map_errors} tweets were unable to be formmated corrected and 4CAT will not be able to analyse them."
369            self.log.warning(f"SearchWithinTCATBins: import mapping issue detected ({self.dataset.key})")
370            self.dataset.update_status(error_message, is_final=True)
371
372    @ staticmethod
373    def tcat_to_4cat_time(tcat_time):
374        """
375        Twitter APIv2 time is in format "%Y-%m-%dT%H:%M:%S.000Z" while TCAT uses "%Y-%m-%d %H:%M:%S" and a timestamp.
376
377        :return datetime:
378        """
379        try:
380            tcat_time = int(tcat_time)
381            return datetime.datetime.fromtimestamp(tcat_time).strftime("%Y-%m-%dT%H:%M:%S.000Z")
382        except ValueError:
383            return datetime.datetime.strptime(tcat_time, "%Y-%m-%d %H:%M:%S").strftime("%Y-%m-%dT%H:%M:%S.000Z")
384
385    @staticmethod
386    def tcat_to_APIv2(tcat_tweet):
387        """
388        Attempt to construct a 4CAT tweet gathered from APIv2 to allow for use of Twitter specific processors!
389
390        A great deal of information is missing so there may result in some issues. Notes are kept for the expected
391        type and, if the data is missing in TCAT, None is used. Therefor it should be possible to refactor processors
392        to handle None if necessary.
393        """
394        # We're missing lots of data here...
395
396        urls = [url.strip() for url in (tcat_tweet["urls_expanded"].split(";") if tcat_tweet["urls_expanded"] else tcat_tweet["urls_followed"].split(";") if tcat_tweet["urls_followed"] else tcat_tweet["urls_followed"].split(";")) if url]
397        # TCAT media_id: 7 = video, 3 = photo, 16 = animated_gif
398        media_type = "video" if tcat_tweet["media_id"] == "7" else "photo" if tcat_tweet["media_id"] == "3" else "animated_gif" if tcat_tweet["media_id"] == "16" else tcat_tweet["media_id"]
399
400        # 4CAT Twitter APIv2 result data structure
401        APIv2_tweet = {
402            "lang": tcat_tweet["lang"],  # str
403            "source": tcat_tweet["source"],  # REMOVED FROM TWITTER API v2
404            "possibly_sensitive": True if tcat_tweet["possibly_sensitive"] == 1 else False if tcat_tweet["possibly_sensitive"] == 0 else None,  # bool
405            "text": tcat_tweet["text"],  # str
406            "edit_history_tweet_ids": None,  # list; Missing in TCAT data
407            "public_metrics": {
408                "retweet_count": tcat_tweet["retweet_count"],  # int
409                "reply_count": None,  # int; Missing in TCAT data
410                "like_count": tcat_tweet["favorite_count"],  # int
411                "quote_count": None,  # int; Missing in TCAT data
412                "impression_count": None,  # int; Missing in TCAT data
413                # TCAT has also favorite_count
414            },
415            "entities": {
416                "mentions": [{
417                    "id": None,  # str; Missing in TCAT data
418                    "username": mention.strip(),  # str
419                    # Twitter v2 API has additional user fields
420                } for mention in tcat_tweet["mentions"].split(";") if mention],
421                "annotations": None,  # list; Missing in TCAT data
422                "urls": [{
423                    "url": url,  # str
424                    "expanded_url": url,  # str
425                    # Twitter v2 API has additional URL fields
426                } for url in urls],
427                "hashtags": [{
428                    "tag": hashtag.strip(),  # str
429                    "start": None,  # int; Missing in TCAT data
430                    "end": None,  # int; Missing in TCAT data
431                } for hashtag in tcat_tweet["hashtags"].split(";") if hashtag],
432                "cashtags": None,  # list; Missing in TCAT data
433            },
434            "created_at": SearchWithinTCATBins.tcat_to_4cat_time(tcat_tweet["time"]),  # str
435            "id": tcat_tweet["id"],  # str
436            "author_id": tcat_tweet["from_user_id"],  # str
437            "context_annotations": None,  # list; Missing in TCAT data
438            "reply_settings": None,  # str; Missing in TCAT data
439            "conversation_id": None,  # str; TCAT has a in_reply_to_status_id but this is not necessarily the original Tweet that started the conversation
440            "author_user": {
441                "protected": None,  # bool; Missing in TCAT data
442                "verified": True if tcat_tweet["from_user_verified"] == 1 else False if tcat_tweet["from_user_verified"] == 0 else None,  # bool
443                "created_at": SearchWithinTCATBins.tcat_to_4cat_time(tcat_tweet["from_user_created_at"]) if tcat_tweet["from_user_created_at"] else "",  # str; may be Missing in TCAT data
444                "name": tcat_tweet["from_user_realname"],  # str
445                "entities": {
446                    "description": None,  # dict; contains entities from author description such as mentions, URLs, etc.; Missing in TCAT data
447                    "url": None,  # dict; containers entities from author url e.g. URL data; Missing in TCAT data
448                },
449                "description": tcat_tweet["from_user_description"],  # str
450                "pinned_tweet_id": None,  # str; Missing in TCAT data
451                "profile_image_url": tcat_tweet["from_user_profile_image_url"],  # str
452                "url": tcat_tweet["from_user_url"],  # str
453                "username": tcat_tweet["from_user_name"],  # str
454                "id": tcat_tweet["from_user_id"],  # str
455                "location": None,  # str; Missing in TCAT data
456                "public_metrics": {
457                    "followers_count": tcat_tweet["from_user_followercount"],  # int
458                    "following_count": tcat_tweet["from_user_friendcount"],  # int
459                    "tweet_count": tcat_tweet["from_user_tweetcount"],  # int
460                    "listed_count": tcat_tweet["from_user_listed"],  # int
461                    # TCAT has also from_user_favourites_count
462                },
463                "withheld": {
464                    "country_codes": tcat_tweet["from_user_withheld_scope"].split(";"),  # list; TODO TCAT has column, but have not seen it populated in testing... This is guess
465                },
466                # TCAT has also from_user_lang, from_user_utcoffset, from_user_timezone
467            },
468            "attachments": {
469                # TCAT has some media data, but not the URLs listed
470                "media_keys": [{
471                    "type": media_type,
472                    "url": ",".join([url for url in urls if (url.split("/")[-2] if len(url.split("/")) > 1 else "") in ["photo"]]),  # str; TCAT does not have the URL though it may be in the list of URLs
473                    "variants": [{"url": ",".join([url for url in urls if (url.split("/")[-2] if len(url.split("/")) > 1 else "") in ["video"]]), "bit_rate":0}]  # list; This is not the expected direct link to video, but it is a URL to the video
474                    # Twitter API v2 has additional data
475                }],  # list; TCAT seems to only have one type of media per tweet
476                "poll_ids": None,  # list; Missing from TCAT data
477            },
478            "geo": {
479                "place_id": None,  # str; Missing from TCAT data
480                "place": {
481                    "country": None,  # str; Missing from TCAT data
482                    "id": None,  # str; Missing from TCAT data
483                    "geo": {
484
485                    },
486                    "country_code": None,  # str; Missing from TCAT data
487                    "name": tcat_tweet["location"],  # str
488                    "place_type": None,  # str; Missing from TCAT data
489                    "full_name": tcat_tweet["location"],  # str
490                },
491                "coordindates": {
492                    "type": None,  # str; Missing from TCAT data
493                    "coordinates": [tcat_tweet["lng"], tcat_tweet["lat"]],  # list i.e. [longitude, latitude]
494                },
495            },
496            "withheld": {
497                "copyright": True if tcat_tweet["withheld_copyright"] == 1 else False if tcat_tweet["withheld_copyright"] == 0 else None,  # bool; TODO TCAT has column, but have not seen it populated in testing... This is guess
498                "country_codes": tcat_tweet["withheld_scope"].split(";"),  # list; TODO TCAT has column, but have not seen it populated in testing... This is guess
499            },
500        }
501
502        # Referenced Tweets; Twitter API v2 has entire tweet data here which we will be missing
503        referenced_tweets = []
504        if tcat_tweet["text"][:4] == "RT @":
505            # Retweet
506            referenced_tweets.append({
507                "type": "retweeted",
508                "id": None,  # str; Missing in TCAT data
509            })
510        if tcat_tweet["quoted_status_id"]:
511            # Quote
512            referenced_tweets.append({
513                "type": "quoted",
514                "id": tcat_tweet["quoted_status_id"],  # str; Missing in TCAT data
515            })
516        if tcat_tweet["in_reply_to_status_id"]:
517            # Reply
518            referenced_tweets.append({
519                "type": "replied_to",
520                "id": tcat_tweet["in_reply_to_status_id"],  # str; Missing in TCAT data
521            })
522            # These should NOT be None in case a processor/user attempts to identify a reply using these
523            APIv2_tweet["in_reply_to_user_id"] = "UNKNOWN"  # str; Missing from TCAT data
524            APIv2_tweet["in_reply_to_user"] = {"username": "UNKNOWN"}  # dict; Missing from TCAT data
525
526        APIv2_tweet["referenced_tweets"] = referenced_tweets  # list
527
528        # Append any extra TCAT data
529        additional_TCAT_data = {}
530        for field in SearchWithinTCATBins.additional_TCAT_fields:
531            additional_TCAT_data["TCAT_"+field] = tcat_tweet[field]
532        APIv2_tweet.update(additional_TCAT_data)
533
534        return APIv2_tweet
535
536    @staticmethod
537    def validate_query(query, request, config):
538        """
539        Validate DMI-TCAT query input
540
541        :param dict query:  Query parameters, from client-side.
542        :param request:  Flask request
543        :param ConfigManager|None config:  Configuration reader (context-aware)
544        :return dict:  Safe query parameters
545        """
546        # no query 4 u
547        if not query.get("bin", "").strip():
548            raise QueryParametersException("You must choose a query bin to get tweets from.")
549
550        # Dates need to make sense as a range to search within
551        after, before = query.get("daterange")
552        if (after and before) and before <= after:
553            raise QueryParametersException("A date range must start before it ends")
554
555        query["min_date"], query["max_date"] = query.get("daterange")
556        del query["daterange"]
557
558        # simple!
559        return query
560
561    @staticmethod
562    def map_item(item):
563        """
564        Use Twitter APIv2 map_item
565        """
566        mapped_tweet = SearchWithTwitterAPIv2.map_item(item)
567
568        # Add TCAT extra data
569        data = mapped_tweet.get_item_data()
570        message = mapped_tweet.get_message()
571        for field in SearchWithinTCATBins.additional_TCAT_fields:
572            data["TCAT_" + field] = item.get("TCAT_" + field)
573
574        return MappedItem(data, message)

Get Tweets via DMI-TCAT

This allows subsetting an existing query bin, similar to the 'Data Selection' panel in the DMI-TCAT analysis interface

type = 'dmi-tcat-search'
extension = 'ndjson'
title = 'TCAT Search (HTTP)'
additional_TCAT_fields = ['to_user_name', 'filter_level', 'favorite_count', 'truncated', 'from_user_favourites_count', 'from_user_lang', 'from_user_utcoffset', 'from_user_timezone']
options = {
    'intro-1': {'type': 'info', 'help': 'This data source interfaces with a DMI-TCAT instance to allow subsetting of tweets from a tweet bin in that instance.'},
    'divider-1': {'type': 'divider'},
    'bin': {'type': 'info', 'help': 'Query bin'},
    'query': {'type': 'string', 'help': 'Query text', 'tooltip': 'Match all tweets containing this text.'},
    'query-exclude': {'type': 'string', 'help': 'Exclude text', 'tooltip': 'Match all tweets that do NOT contain this text.'},
    'user-name': {'type': 'string', 'help': 'From user', 'tooltip': 'Match all tweets from this username.'},
    'user-exclude': {'type': 'string', 'help': 'Exclude user', 'tooltip': 'Match all tweets NOT from this username.'},
    'exclude-replies': {'type': 'choice', 'options': {'exclude': 'Exclude replies', 'include': 'Include replies'}, 'help': 'Reply tweets', 'default': 'include', 'tooltip': 'Choose to exclude or include tweets that are replies from the data'},
    'daterange': {'type': 'daterange', 'help': 'Date range'},
    'divider-2': {'type': 'divider'},
    'advanced_options_info': {'type': 'info', 'help': 'Advanced Query Options can further refine your query'},
    'user-bio': {'type': 'string', 'help': 'User bio text', 'tooltip': 'Match all tweets from users with biographies containing this text.'},
    'user-language': {'type': 'string', 'help': 'User language', 'tooltip': 'Match all tweets from users using this language (as detected by Twitter).'},
    'tweet-language': {'type': 'string', 'help': 'Tweet language', 'tooltip': 'Match all tweets in this language (as detected by Twitter).'},
    'tweet-client': {'type': 'string', 'help': 'Twitter client URL/descr', 'tooltip': 'Match all tweets from clients that match this text.'},
    'url': {'type': 'string', 'help': '(Part of) URL', 'tooltip': 'Match all tweets containing this (partial) URL.'},
    'url-media': {'type': 'string', 'help': '(Part of) media URL', 'tooltip': 'Match all tweets containing this (partial) media URL.'}
}
config = {
    'dmi-tcat-search.instances': {
        'type': 'json',
        'help': 'DMI-TCAT instances',
        'tooltip': 'List of DMI-TCAT instance URLs, e.g. ["http://username:password@tcat.instance.webpage.net"]. This needs to be formatted as a JSON list of strings.',
        'default': {}
    }
}
bin_data = {'all_bins': {}, 'last_collected': {}}
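For reference, the dmi-tcat-search.instances setting follows the format described in its tooltip: a JSON list of instance URLs, optionally with HTTP auth embedded. A minimal sketch (the second host is a placeholder):

    # sketch of a value for the "dmi-tcat-search.instances" setting:
    # a JSON list of instance URLs, optionally with embedded HTTP auth
    instances = [
        "http://username:password@tcat.instance.webpage.net",
        "https://tcat.example.org",  # hypothetical host
    ]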
@classmethod
def collect_all_bins(cls, config, force_update=False):
138    @classmethod
139    def collect_all_bins(cls, config, force_update=False):
140        """
141        Requests bin information from TCAT instances
142        """
143        instances = config.get("dmi-tcat-search.instances", [])
144        for instance in instances:
145            # query each configured TCAT instance for a list of bins that can
146            # be subsetted
147            instance = instance.rstrip("/")
148            api_url = instance + "/api/bin-stats.php"
149
150            if force_update or instance not in cls.bin_data["last_collected"] or datetime.datetime.now()-datetime.timedelta(days=1) >= cls.bin_data["last_collected"][instance]:
151                # Collect Instance data
152                try:
153                    api_request = requests.get(api_url, timeout=5)
154                    instance_bins = json.loads(api_request.content)
155                    cls.bin_data["all_bins"][instance] = {k: instance_bins[k] for k in sorted(instance_bins)}
156                    cls.bin_data["last_collected"][instance] = datetime.datetime.now()
157                except (requests.RequestException, json.JSONDecodeError):
158                    cls.bin_data["all_bins"][instance] = {"failed": True}
159                    # TODO: no logger is available here, as nothing has been initialized yet,
160                    # so a warning about the unreachable instance cannot be logged

Requests bin information from TCAT instances
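Bin information is cached per instance in the class-level bin_data dictionary and refreshed at most once a day unless force_update is set. A hypothetical usage sketch (config is any ConfigManager exposing the dmi-tcat-search.instances setting):

    # hypothetical usage; bins are cached for 24 hours per instance
    SearchWithinTCATBins.collect_all_bins(config)
    for instance, bins in SearchWithinTCATBins.bin_data["all_bins"].items():
        if bins.get("failed"):
            print(f"could not reach {instance}")
        else:
            print(f"{instance}: {len(bins)} bin(s)")

    # bypass the 24-hour cache and query the instances again
    SearchWithinTCATBins.collect_all_bins(config, force_update=True)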

@classmethod
def get_options(cls, parent_dataset=None, config=None):
163    @classmethod
164    def get_options(cls, parent_dataset=None, config=None):
165        """
166        Get data source options
167
168        This method takes the pre-defined options, but fills the 'bins' options
169        with bins currently available from the configured TCAT instances.
170
171        :param config:  Configuration reader. This can be used to show
172        some options only to privileged users.
173        :param DataSet parent_dataset:  An object representing the
174        dataset that the processor would be run on
176        """
177        options = cls.options
178
179        cls.collect_all_bins(config)
180        if all([data.get("failed", False) for instance, data in cls.bin_data["all_bins"].items()]):
181            options["bin"] = {
182                "type": UserInput.OPTION_INFO,
183                "help": "Could not connect to DMI-TCAT instance(s)."
184            }
185            return options
186
187        options["bin"] = {
188            "type": UserInput.OPTION_CHOICE,
189            "options": {},
190            "help": "Query bin"
191        }
192
193        for instance, bins in cls.bin_data["all_bins"].items():
194            # make the host somewhat human-readable
195            # also strip out embedded HTTP auths
196            host = re.sub(r"^https?://", "", instance).split("@").pop()
197            for bin_name, bin in bins.items():
198                bin_key = "%s@%s" % (bin_name, host)
199                display_text = f"{bin_name}: {bin.get('tweets_approximate')} tweets from {bin.get('range').get('first_tweet')} to {bin.get('range').get('last_tweet')}"
200                options["bin"]["options"][bin_key] = display_text
201
202        return options

Get data source options

    This method takes the pre-defined options, but fills the 'bins' options
    with bins currently available from the configured TCAT instances.

Parameters
  • config: Configuration reader. This can be used to show some options only to privileged users.
  • DataSet parent_dataset: An object representing the dataset that the processor would be run on.
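The generated bin options use a bin_name@host key, which get_items later splits apart to locate the instance again. A hypothetical example of one generated entry:

    # hypothetical entry in options["bin"]["options"]:
    # key is "<bin name>@<host, stripped of scheme and HTTP auth>",
    # value is a summary built from the bin statistics
    {
        "climate@tcat.example.org":
            "climate: 120000 tweets from 2021-01-01 00:00:00 to 2021-06-30 23:59:59"
    }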

def get_items(self, query):
204    def get_items(self, query):
205        """
206        Use the DMI-TCAT tweet export to retrieve tweets
207
208        :param query:
209        :return:
210        """
211        bin = self.parameters.get("bin")
212        bin_name = bin.split("@")[0]
213        bin_host = bin.split("@").pop()
214
215        # we cannot store the full instance URL as a parameter, because it may
216        # contain sensitive information (e.g. HTTP auth) - so we find the full
217        # instance URL again here
218        # while the parameter could be marked 'sensitive', the values would
219        # still show up in e.g. the HTML of the 'create dataset' form
220        available_instances = self.config.get("dmi-tcat-search.instances", [])
221        instance_url = ""
222        instance = None
223        for available_instance in available_instances:
224            hostname = re.sub(r"https?://", "", available_instance).split("@").pop().rstrip("/")
225            if hostname == bin_host:
226                instance_url = available_instance
227                instance = available_instance.rstrip("/")
228                break
229
230        if not instance_url:
231            return self.dataset.finish_with_error("Invalid DMI-TCAT instance name '%s'" % bin_host)
232
233        # Collect the bins again (ensure we have updated info in case bin is still active)
234        self.collect_all_bins(self.config, force_update=True)
235        # Add metadata to parameters
236        try:
237            current_bin = self.bin_data["all_bins"][instance][bin_name]
238        except KeyError:
239            return self.dataset.finish_with_error(f"Lost connection to TCAT instance {bin_host}")
240        # Add TCAT metadata to dataset
241        self.dataset.tcat_bin_data = current_bin
242        if current_bin.get("type") in ["follow", "track", "timeline", "geotrack"] and ("phrase_times" not in current_bin or "user_times" not in current_bin):
243            self.dataset.update_status("Warning: TCAT not updated to send phrase and user time ranges; consider updating if you would like to retain this BIN metadata.")
244
245        # now get the parameters...
246        request_url = instance_url.rstrip("/") + "/analysis/mod.export_tweets.php"
247
248        # Allow for blank dates
249        if self.parameters.get("min_date"):
250            start_date = datetime.datetime.fromtimestamp(self.parameters.get("min_date")).strftime("%Y-%m-%d")
251        else:
252            first_tweet_timestamp = current_bin.get('range').get('first_tweet')
253            start_date = datetime.datetime.strptime(first_tweet_timestamp, "%Y-%m-%d %H:%M:%S").strftime("%Y-%m-%d")
254
255        end_date = datetime.datetime.fromtimestamp(self.parameters.get("max_date")).strftime("%Y-%m-%d") if self.parameters.get("max_date") else (datetime.datetime.now() + datetime.timedelta(days=1)).strftime("%Y-%m-%d")
256        parameters = {
257            "dataset": bin_name,
258            "query": self.parameters.get("query"),
259            "url_query": self.parameters.get("url"),
260            "media_url_query": self.parameters.get("url-media"),
261            "exclude": self.parameters.get("query-exclude"),
262            "from_user_name": self.parameters.get("user-name"),
263            "from_user_lang": self.parameters.get("user-language"),
264            "lang": self.parameters.get("tweet-language"),
265            "exclude_from_user_name": self.parameters.get("user-exclude"),
266            "from_source": re.sub(r"<[^>]+>", "", self.parameters.get("tweet-client", "")),  # strip HTML tags
267            "startdate": start_date,
268            "enddate": end_date,
269            "replyto": "yes" if self.parameters.get("exclude-replies") == "exclude" else "no",
270            "whattodo": "",
271            "exportSettings": "urls,mentions,hashtags,media,",
272            "graph_resolution": "day",
273            "outputformat": "csv"
274        }
275
276        # for now we simply request the full CSV export of the bin with the
277        # given parameters, letting TCAT handle the full text search and so
278        # on
279        self.dataset.update_status("Searching for tweets on %s" % bin_host)
280        response = requests.get(request_url, params=parameters, stream=True)
281        if response.status_code != 200:
282            return self.dataset.finish_with_error("Query bin not available: received HTTP Error %i" % response.status_code)
283
284        # process the file in 1kB chunks, buffer as we go
285        # If a newline is encountered, the buffer is processed as a row of csv
286        # data. This works as long as there are no newlines in the csv itself,
287        # which is the case for TCAT exports. Processing as a stream is needed
288        # to avoid having to load the full file in memory
289        buffer = bytearray()
290        fieldnames = None
291        items = 0
292        encoding = None
293        api_map_errors = 0
294        mapping_errors = 0
295        for chunk in response.iter_content(chunk_size=1024):
296            # see if this chunk contains a newline, in which case we have a
297            # full line to process (e.g. as a tweet)
298            lines = []
299            buffer += bytearray(chunk)
300
301            if not encoding and len(buffer) > 3:
302                # response.encoding is not correct sometimes, since it does not
303                # indicate that the file uses a BOM, so sniff it instead once
304                # we have some bytes
305                encoding = sniff_encoding(buffer)
306
307            # split buffer by newlines and process each full line
308            # the last line is always carried over, since it may be incomplete
309            if b"\n" in buffer:
310                buffered_lines = buffer.split(b"\n")
311                lines = buffered_lines[:-1]
312                buffer = buffered_lines.pop()
313            elif not chunk:
314                # eof, process left-over data
315                lines = buffer.split(b"\n")
316
317            # and finally we can process the data
318            for line in lines:
319                # use a dummy csv reader to abstract away the annoying csv parsing
320                # this is quite a bit of overhead, but beats implementing csv parsing
321                # manually, and it's still reasonably fast (about 10k/second)
322                dummy_file = io.TextIOWrapper(io.BytesIO(line.replace(b"\0", b"")), encoding=encoding)
323                reader = csv.reader(dummy_file,
324                                    delimiter=",",
325                                    quotechar='"',
326                                    doublequote=True,
327                                    quoting=csv.QUOTE_MINIMAL)
328                row_data = next(reader)
329
330                if row_data and not fieldnames:
331                    # first line in file
332                    fieldnames = row_data.copy()
333
334                elif row_data:
335                    tweet = dict(zip(fieldnames, row_data))
336                    items += 1
337
338                    if items % 250 == 0:
339                        self.dataset.update_status("Loaded %i tweets from bin %s@%s" % (items, bin_name, bin_host))
340
341                    try:
342                        formatted_tweet = self.tcat_to_APIv2(tweet)
343                    except (KeyError, IndexError) as e:
344                        self.dataset.log(f"Error converting TCAT tweet ({items}) to APIv2 format: {e}")
345                        api_map_errors += 1
346                        continue
347                    
348                    # Check mapping errors
349                    try:
350                        SearchWithTwitterAPIv2.map_item(formatted_tweet)
351                    except (KeyError, IndexError) as e:
352                        # these tweets will not be usable by 4CAT processors, but we can still yield them; they remain available for download as NDJSON
353                        self.dataset.log(f"Error mapping TCAT tweet ({items}) to 4CAT Twitter format: {e}")
354                        mapping_errors += 1
355
356                    # yield formatted_tweet which contains some TCAT specific fields; mapping to X/Twitter APIv2 format is done later
357                    yield formatted_tweet
358
359            if not chunk:
360                # end of file
361                break
362
363        if mapping_errors or api_map_errors:
364            error_message = ""
365            if api_map_errors:
366                error_message += f"{api_map_errors} tweets could not be converted to the APIv2 format and were not imported. "
367            if mapping_errors:
368                error_message += f"{mapping_errors} tweets could not be mapped to the 4CAT format; they were imported, but 4CAT will not be able to analyse them."
369            self.log.warning(f"SearchWithinTCATBins: import mapping issue detected ({self.dataset.key})")
370            self.dataset.update_status(error_message, is_final=True)

Use the DMI-TCAT tweet export to retrieve tweets

Parameters
  • query:
Returns
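The export is consumed as a byte stream: chunks are buffered until a newline appears, every complete line is parsed as one CSV row, and the partial tail is carried over to the next chunk. A minimal standalone sketch of that buffering pattern (simplified: it omits the BOM sniffing, NUL stripping and error counting that the method above also does):

    import csv
    import io

    import requests

    def stream_csv_rows(url, params):
        """Yield one parsed CSV row per line of a streamed HTTP response."""
        response = requests.get(url, params=params, stream=True)
        buffer = bytearray()
        for chunk in response.iter_content(chunk_size=1024):
            buffer += chunk
            # emit each complete line; keep the partial tail for the next chunk
            while b"\n" in buffer:
                line, _, buffer = buffer.partition(b"\n")
                if line.strip():
                    yield next(csv.reader(io.StringIO(line.decode("utf-8"))))
        if buffer.strip():
            # end of stream: flush whatever is left in the buffer
            yield next(csv.reader(io.StringIO(buffer.decode("utf-8"))))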
@staticmethod
def tcat_to_4cat_time(tcat_time):
372    @staticmethod
373    def tcat_to_4cat_time(tcat_time):
374        """
375        Convert a TCAT time to the Twitter APIv2 format: APIv2 uses "%Y-%m-%dT%H:%M:%S.000Z", while TCAT uses either "%Y-%m-%d %H:%M:%S" or a Unix timestamp.
376
377        :return str:  The time formatted as an APIv2-style string
378        """
379        try:
380            tcat_time = int(tcat_time)
381            return datetime.datetime.fromtimestamp(tcat_time).strftime("%Y-%m-%dT%H:%M:%S.000Z")
382        except ValueError:
383            return datetime.datetime.strptime(tcat_time, "%Y-%m-%d %H:%M:%S").strftime("%Y-%m-%dT%H:%M:%S.000Z")

Convert a TCAT time to the Twitter APIv2 format: APIv2 uses "%Y-%m-%dT%H:%M:%S.000Z", while TCAT uses either "%Y-%m-%d %H:%M:%S" or a Unix timestamp.

Returns

The time formatted as an APIv2-style string
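For illustration, both input shapes normalise to the same output format (the timestamp branch goes through datetime.datetime.fromtimestamp, so its rendering depends on the local timezone):

    SearchWithinTCATBins.tcat_to_4cat_time("2021-06-01 12:30:00")
    # -> "2021-06-01T12:30:00.000Z"
    SearchWithinTCATBins.tcat_to_4cat_time(1622550600)
    # -> the same string on a UTC machine; fromtimestamp uses local time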
@staticmethod
def tcat_to_APIv2(tcat_tweet):
385    @staticmethod
386    def tcat_to_APIv2(tcat_tweet):
387        """
388        Attempt to construct a tweet in the format 4CAT uses for APIv2 data, to allow use of Twitter-specific processors!
389
390        A great deal of information is missing, so some issues may result. Notes are kept for the expected
391        type and, if the data is missing in TCAT, None is used. It should therefore be possible to refactor
392        processors to handle None if necessary.
393        """
394        # We're missing lots of data here...
395
396        urls = [url.strip() for url in (tcat_tweet["urls_expanded"].split(";") if tcat_tweet["urls_expanded"] else tcat_tweet["urls_followed"].split(";")) if url]
397        # TCAT media_id: 7 = video, 3 = photo, 16 = animated_gif
398        media_type = "video" if tcat_tweet["media_id"] == "7" else "photo" if tcat_tweet["media_id"] == "3" else "animated_gif" if tcat_tweet["media_id"] == "16" else tcat_tweet["media_id"]
399
400        # 4CAT Twitter APIv2 result data structure
401        APIv2_tweet = {
402            "lang": tcat_tweet["lang"],  # str
403            "source": tcat_tweet["source"],  # REMOVED FROM TWITTER API v2
404            "possibly_sensitive": True if tcat_tweet["possibly_sensitive"] == 1 else False if tcat_tweet["possibly_sensitive"] == 0 else None,  # bool
405            "text": tcat_tweet["text"],  # str
406            "edit_history_tweet_ids": None,  # list; Missing in TCAT data
407            "public_metrics": {
408                "retweet_count": tcat_tweet["retweet_count"],  # int
409                "reply_count": None,  # int; Missing in TCAT data
410                "like_count": tcat_tweet["favorite_count"],  # int
411                "quote_count": None,  # int; Missing in TCAT data
412                "impression_count": None,  # int; Missing in TCAT data
413                # TCAT has also favorite_count
414            },
415            "entities": {
416                "mentions": [{
417                    "id": None,  # str; Missing in TCAT data
418                    "username": mention.strip(),  # str
419                    # Twitter v2 API has additional user fields
420                } for mention in tcat_tweet["mentions"].split(";") if mention],
421                "annotations": None,  # list; Missing in TCAT data
422                "urls": [{
423                    "url": url,  # str
424                    "expanded_url": url,  # str
425                    # Twitter v2 API has additional URL fields
426                } for url in urls],
427                "hashtags": [{
428                    "tag": hashtag.strip(),  # str
429                    "start": None,  # int; Missing in TCAT data
430                    "end": None,  # int; Missing in TCAT data
431                } for hashtag in tcat_tweet["hashtags"].split(";") if hashtag],
432                "cashtags": None,  # list; Missing in TCAT data
433            },
434            "created_at": SearchWithinTCATBins.tcat_to_4cat_time(tcat_tweet["time"]),  # str
435            "id": tcat_tweet["id"],  # str
436            "author_id": tcat_tweet["from_user_id"],  # str
437            "context_annotations": None,  # list; Missing in TCAT data
438            "reply_settings": None,  # str; Missing in TCAT data
439            "conversation_id": None,  # str; TCAT has a in_reply_to_status_id but this is not necessarily the original Tweet that started the conversation
440            "author_user": {
441                "protected": None,  # bool; Missing in TCAT data
442                "verified": True if tcat_tweet["from_user_verified"] == 1 else False if tcat_tweet["from_user_verified"] == 0 else None,  # bool
443                "created_at": SearchWithinTCATBins.tcat_to_4cat_time(tcat_tweet["from_user_created_at"]) if tcat_tweet["from_user_created_at"] else "",  # str; may be missing in TCAT data
444                "name": tcat_tweet["from_user_realname"],  # str
445                "entities": {
446                    "description": None,  # dict; contains entities from author description such as mentions, URLs, etc.; Missing in TCAT data
447                    "url": None,  # dict; contains entities from author url, e.g. URL data; Missing in TCAT data
448                },
449                "description": tcat_tweet["from_user_description"],  # str
450                "pinned_tweet_id": None,  # str; Missing in TCAT data
451                "profile_image_url": tcat_tweet["from_user_profile_image_url"],  # str
452                "url": tcat_tweet["from_user_url"],  # str
453                "username": tcat_tweet["from_user_name"],  # str
454                "id": tcat_tweet["from_user_id"],  # str
455                "location": None,  # str; Missing in TCAT data
456                "public_metrics": {
457                    "followers_count": tcat_tweet["from_user_followercount"],  # int
458                    "following_count": tcat_tweet["from_user_friendcount"],  # int
459                    "tweet_count": tcat_tweet["from_user_tweetcount"],  # int
460                    "listed_count": tcat_tweet["from_user_listed"],  # int
461                    # TCAT has also from_user_favourites_count
462                },
463                "withheld": {
464                    "country_codes": tcat_tweet["from_user_withheld_scope"].split(";"),  # list; TODO TCAT has this column, but it has not been seen populated in testing... this is a guess
465                },
466                # TCAT has also from_user_lang, from_user_utcoffset, from_user_timezone
467            },
468            "attachments": {
469                # TCAT has some media data, but not the URLs listed
470                "media_keys": [{
471                    "type": media_type,
472                    "url": ",".join([url for url in urls if (url.split("/")[-2] if len(url.split("/")) > 1 else "") in ["photo"]]),  # str; TCAT does not have the URL though it may be in the list of URLs
473                    "variants": [{"url": ",".join([url for url in urls if (url.split("/")[-2] if len(url.split("/")) > 1 else "") in ["video"]]), "bit_rate":0}]  # list; This is not the expected direct link to video, but it is a URL to the video
474                    # Twitter API v2 has additional data
475                }],  # list; TCAT seems to only have one type of media per tweet
476                "poll_ids": None,  # list; Missing from TCAT data
477            },
478            "geo": {
479                "place_id": None,  # str; Missing from TCAT data
480                "place": {
481                    "country": None,  # str; Missing from TCAT data
482                    "id": None,  # str; Missing from TCAT data
483                    "geo": {
484
485                    },
486                    "country_code": None,  # str; Missing from TCAT data
487                    "name": tcat_tweet["location"],  # str
488                    "place_type": None,  # str; Missing from TCAT data
489                    "full_name": tcat_tweet["location"],  # str
490                },
491                "coordinates": {
492                    "type": None,  # str; Missing from TCAT data
493                    "coordinates": [tcat_tweet["lng"], tcat_tweet["lat"]],  # list i.e. [longitude, latitude]
494                },
495            },
496            "withheld": {
497            "copyright": True if tcat_tweet["withheld_copyright"] == 1 else False if tcat_tweet["withheld_copyright"] == 0 else None,  # bool; TODO TCAT has this column, but it has not been seen populated in testing... this is a guess
498            "country_codes": tcat_tweet["withheld_scope"].split(";"),  # list; TODO TCAT has this column, but it has not been seen populated in testing... this is a guess
499            },
500        }
501
502        # Referenced Tweets; Twitter API v2 has entire tweet data here which we will be missing
503        referenced_tweets = []
504        if tcat_tweet["text"][:4] == "RT @":
505            # Retweet
506            referenced_tweets.append({
507                "type": "retweeted",
508                "id": None,  # str; Missing in TCAT data
509            })
510        if tcat_tweet["quoted_status_id"]:
511            # Quote
512            referenced_tweets.append({
513                "type": "quoted",
514                "id": tcat_tweet["quoted_status_id"],  # str
515            })
516        if tcat_tweet["in_reply_to_status_id"]:
517            # Reply
518            referenced_tweets.append({
519                "type": "replied_to",
520                "id": tcat_tweet["in_reply_to_status_id"],  # str
521            })
522            # These should NOT be None in case a processor/user attempts to identify a reply using these
523            APIv2_tweet["in_reply_to_user_id"] = "UNKNOWN"  # str; Missing from TCAT data
524            APIv2_tweet["in_reply_to_user"] = {"username": "UNKNOWN"}  # dict; Missing from TCAT data
525
526        APIv2_tweet["referenced_tweets"] = referenced_tweets  # list
527
528        # Append any extra TCAT data
529        additional_TCAT_data = {}
530        for field in SearchWithinTCATBins.additional_TCAT_fields:
531            additional_TCAT_data["TCAT_"+field] = tcat_tweet[field]
532        APIv2_tweet.update(additional_TCAT_data)
533
534        return APIv2_tweet

Attempt to construct a tweet in the format 4CAT uses for APIv2 data, to allow use of Twitter-specific processors!

A great deal of information is missing, so some issues may result. Notes are kept for the expected type and, if the data is missing in TCAT, None is used. It should therefore be possible to refactor processors to handle None if necessary.
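The referenced-tweet types are inferred from the data rather than read from API metadata: a text starting with "RT @" marks a retweet, a non-empty quoted_status_id marks a quote, and a non-empty in_reply_to_status_id marks a reply. A toy illustration with hypothetical field values:

    # hypothetical, minimal TCAT row: only the fields driving the inference
    row = {
        "text": "RT @someone: hello",
        "quoted_status_id": "",
        "in_reply_to_status_id": "12345",
    }
    # tcat_to_APIv2 would derive:
    # referenced_tweets == [{"type": "retweeted", "id": None},
    #                       {"type": "replied_to", "id": "12345"}]
    # and, because the row is a reply, in_reply_to_user_id is set to "UNKNOWN"
    # rather than None, so processors can still recognise it as a reply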

@staticmethod
def validate_query(query, request, config):
536    @staticmethod
537    def validate_query(query, request, config):
538        """
539        Validate DMI-TCAT query input
540
541        :param dict query:  Query parameters, from client-side.
542        :param request:  Flask request
543        :param ConfigManager|None config:  Configuration reader (context-aware)
544        :return dict:  Safe query parameters
545        """
546        # no query 4 u
547        if not query.get("bin", "").strip():
548            raise QueryParametersException("You must choose a query bin to get tweets from.")
549
550        # Dates need to make sense as a range to search within
551        after, before = query.get("daterange")
552        if (after and before) and before <= after:
553            raise QueryParametersException("A date range must start before it ends")
554
555        query["min_date"], query["max_date"] = query.get("daterange")
556        del query["daterange"]
557
558        # simple!
559        return query

Validate DMI-TCAT query input

Parameters
  • dict query: Query parameters, from client-side.
  • request: Flask request
  • ConfigManager|None config: Configuration reader (context-aware)
Returns

Safe query parameters
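A hypothetical round trip through the validator (request and config are not used by it, so None stands in here):

    query = {
        "bin": "climate@tcat.example.org",  # hypothetical bin key
        "daterange": (1609459200, 1617235200),
    }
    safe = SearchWithinTCATBins.validate_query(query, None, None)
    # the daterange tuple is split into separate bounds:
    # safe["min_date"] == 1609459200 and safe["max_date"] == 1617235200
    # an empty "bin", or a range that ends on or before its start, raises
    # QueryParametersException instead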

@staticmethod
def map_item(item):
561    @staticmethod
562    def map_item(item):
563        """
564        Use Twitter APIv2 map_item
565        """
566        mapped_tweet = SearchWithTwitterAPIv2.map_item(item)
567
568        # Add TCAT extra data
569        data = mapped_tweet.get_item_data()
570        message = mapped_tweet.get_message()
571        for field in SearchWithinTCATBins.additional_TCAT_fields:
572            data["TCAT_" + field] = item.get("TCAT_" + field)
573
574        return MappedItem(data, message)

Use Twitter APIv2 map_item
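A sketch of how a record flows through this method; item here is hypothetical: one NDJSON record produced by this data source, i.e. a tcat_to_APIv2 dict including the TCAT_-prefixed extras:

    mapped = SearchWithinTCATBins.map_item(item)
    row = mapped.get_item_data()
    # row holds the usual Twitter APIv2 columns plus the TCAT extras, e.g.
    # row["TCAT_favorite_count"], row["TCAT_from_user_lang"], ...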