datasources.dmi-tcat.search_tcat
Twitter search within a DMI-TCAT bin; connect via TCAT frontend
1""" 2Twitter search within a DMI-TCAT bin; connect via TCAT frontend 3""" 4import requests 5import datetime 6import csv 7import json 8import re 9import io 10 11from backend.lib.search import Search 12from common.lib.exceptions import QueryParametersException 13from common.lib.user_input import UserInput 14from common.lib.helpers import sniff_encoding 15from common.lib.item_mapping import MappedItem 16 17from datasources.twitterv2.search_twitter import SearchWithTwitterAPIv2 18 19 20class SearchWithinTCATBins(Search): 21 """ 22 Get Tweets via DMI-TCAT 23 24 This allows subsetting an existing query bin, similar to the 'Data 25 Selection' panel in the DMI-TCAT analysis interface 26 """ 27 type = "dmi-tcat-search" # job ID 28 extension = "ndjson" 29 title = "TCAT Search (HTTP)" 30 31 # TCAT has a few fields that do not exist in APIv2 32 additional_TCAT_fields = ["to_user_name", "filter_level", "favorite_count", "truncated", "from_user_favourites_count", "from_user_lang", "from_user_utcoffset", 33 "from_user_timezone"] 34 35 options = { 36 "intro-1": { 37 "type": UserInput.OPTION_INFO, 38 "help": "This data source interfaces with a DMI-TCAT instance to allow subsetting of tweets from a tweet " 39 "bin in that instance." 40 }, 41 "divider-1": { 42 "type": UserInput.OPTION_DIVIDER 43 }, 44 "bin": { 45 "type": UserInput.OPTION_INFO, 46 "help": "Query bin" 47 }, 48 "query": { 49 "type": UserInput.OPTION_TEXT, 50 "help": "Query text", 51 "tooltip": "Match all tweets containing this text." 52 }, 53 "query-exclude": { 54 "type": UserInput.OPTION_TEXT, 55 "help": "Exclude text", 56 "tooltip": "Match all tweets that do NOT contain this text." 57 }, 58 "user-name": { 59 "type": UserInput.OPTION_TEXT, 60 "help": "From user", 61 "tooltip": "Match all tweets from this username." 62 }, 63 "user-exclude": { 64 "type": UserInput.OPTION_TEXT, 65 "help": "Exclude user", 66 "tooltip": "Match all tweets NOT from this username." 67 }, 68 "exclude-replies": { 69 "type": UserInput.OPTION_CHOICE, 70 "options": { 71 "exclude": "Exclude replies", 72 "include": "Include replies" 73 }, 74 "help": "Reply tweets", 75 "default": "include", 76 "tooltip": "Choose to exclude or include tweets that are replies from the data" 77 }, 78 "daterange": { 79 "type": UserInput.OPTION_DATERANGE, 80 "help": "Date range" 81 }, 82 # Advanced Options Section 83 "divider-2": { 84 "type": UserInput.OPTION_DIVIDER 85 }, 86 "advanced_options_info": { 87 "type": UserInput.OPTION_INFO, 88 "help": "Advanced Query Options can further refine your query" 89 }, 90 "user-bio": { 91 "type": UserInput.OPTION_TEXT, 92 "help": "User bio text", 93 "tooltip": "Match all tweets from users with biographies containing this text." 94 }, 95 "user-language": { 96 "type": UserInput.OPTION_TEXT, 97 "help": "User language", 98 "tooltip": "Match all tweets from users using this language (as detected by Twitter)." 99 }, 100 "tweet-language": { 101 "type": UserInput.OPTION_TEXT, 102 "help": "Tweet language", 103 "tooltip": "Match all tweets from users with this language (as detected by Twitter)." 104 }, 105 "tweet-client": { 106 "type": UserInput.OPTION_TEXT, 107 "help": "Twitter client URL/descr", 108 "tooltip": "Match all tweets from clients that match this text." 109 }, 110 "url": { 111 "type": UserInput.OPTION_TEXT, 112 "help": "(Part of) URL", 113 "tooltip": "Match all tweets containing this (partial) URL." 
114 }, 115 "url-media": { 116 "type": UserInput.OPTION_TEXT, 117 "help": "(Part of) media URL", 118 "tooltip": "Match all tweets containing this (partial) media URL." 119 }, 120 } 121 122 config = { 123 "dmi-tcat-search.instances": { 124 "type": UserInput.OPTION_TEXT_JSON, 125 "help": "DMI-TCAT instances", 126 "tooltip": 'List of DMI-TCAT instance URLs, e.g. ["http://username:password@tcat.instance.webpage.net"]. ' 127 'This needs to be formatted as a JSON list of strings.', 128 "default": {} 129 } 130 } 131 132 bin_data = { 133 "all_bins": {}, 134 "last_collected": {}, 135 } 136 137 @classmethod 138 def collect_all_bins(cls, config, force_update=False): 139 """ 140 Requests bin information from TCAT instances 141 """ 142 instances = config.get("dmi-tcat-search.instances", []) 143 for instance in instances: 144 # query each configured TCAT instance for a list of bins that can 145 # be subsetted 146 instance = instance.rstrip("/") 147 api_url = instance + "/api/bin-stats.php" 148 149 if force_update or instance not in cls.bin_data["last_collected"] or datetime.datetime.now()-datetime.timedelta(days=1) >= cls.bin_data["last_collected"][instance]: 150 # Collect Instance data 151 try: 152 api_request = requests.get(api_url, timeout=5) 153 instance_bins = json.loads(api_request.content) 154 cls.bin_data["all_bins"][instance] = {k: instance_bins[k] for k in sorted(instance_bins)} 155 cls.bin_data["last_collected"][instance] = datetime.datetime.now() 156 except (requests.RequestException, json.JSONDecodeError): 157 cls.bin_data["all_bins"][instance] = {"failed": True} 158 # TODO: No logger here as nothing has been initialized 159 # print(f"WARNING, unable to collect TCAT bins from instance {instance}") 160 pass 161 162 @classmethod 163 def get_options(cls, parent_dataset=None, config=None): 164 """ 165 Get data source options 166 167 This method takes the pre-defined options, but fills the 'bins' options 168 with bins currently available from the configured TCAT instances. 169 170 :param config: 171 :param DataSet parent_dataset: An object representing the dataset that 172 the processor would be run on 173can 174 be used to show some options only to privileges users. 175 """ 176 options = cls.options 177 178 cls.collect_all_bins(config) 179 if all([data.get("failed", False) for instance, data in cls.bin_data["all_bins"].items()]): 180 options["bin"] = { 181 "type": UserInput.OPTION_INFO, 182 "help": "Could not connect to DMI-TCAT instance(s)." 183 } 184 return options 185 186 options["bin"] = { 187 "type": UserInput.OPTION_CHOICE, 188 "options": {}, 189 "help": "Query bin" 190 } 191 192 for instance, bins in cls.bin_data["all_bins"].items(): 193 # make the host somewhat human-readable 194 # also strip out embedded HTTP auths 195 host = re.sub(r"^https?://", "", instance).split("@").pop() 196 for bin_name, bin in bins.items(): 197 bin_key = "%s@%s" % (bin_name, host) 198 display_text = f"{bin_name}: {bin.get('tweets_approximate')} tweets from {bin.get('range').get('first_tweet')} to {bin.get('range').get('last_tweet')}" 199 options["bin"]["options"][bin_key] = display_text 200 201 return options 202 203 def get_items(self, query): 204 """ 205 Use the DMI-TCAT tweet export to retrieve tweets 206 207 :param query: 208 :return: 209 """ 210 bin = self.parameters.get("bin") 211 bin_name = bin.split("@")[0] 212 bin_host = bin.split("@").pop() 213 214 # we cannot store the full instance URL as a parameter, because it may 215 # contain sensitive information (e.g. 
HTTP auth) - so we find the full 216 # instance URL again here 217 # while the parameter could be marked 'sensitive', the values would 218 # still show up in e.g. the HTML of the 'create dataset' form 219 available_instances = self.config.get("dmi-tcat-search.instances", []) 220 instance_url = "" 221 instance = None 222 for available_instance in available_instances: 223 hostname = re.sub(r"https?://", "", available_instance).split("@").pop().rstrip("/") 224 if hostname == bin_host: 225 instance_url = available_instance 226 instance = available_instance.rstrip("/") 227 break 228 229 if not instance_url: 230 return self.dataset.finish_with_error("Invalid DMI-TCAT instance name '%s'" % bin_host) 231 232 # Collect the bins again (ensure we have updated info in case bin is still active) 233 self.collect_all_bins(self.config, force_update=True) 234 # Add metadata to parameters 235 try: 236 current_bin = self.bin_data["all_bins"][instance][bin_name] 237 except KeyError: 238 return self.dataset.finish_with_error(f"Lost connection to TCAT instance {bin_host}") 239 # Add TCAT metadata to dataset 240 self.dataset.tcat_bin_data = current_bin 241 if current_bin.get("type") in ["follow", "track", "timeline", "geotrack"] and ("phrase_times" not in current_bin or not "user_times" not in current_bin): 242 self.dataset.update_status("Warning: TCAT not updated to send phrase and user time ranges; consider updating if you would like to retain this BIN metadata.") 243 244 # now get the parameters... 245 request_url = instance_url.rstrip("/") + "/analysis/mod.export_tweets.php" 246 247 # Allow for blank dates 248 if self.parameters.get("min_date"): 249 start_date = datetime.datetime.fromtimestamp(self.parameters.get("min_date")).strftime("%Y-%m-%d") 250 else: 251 first_tweet_timestamp = current_bin.get('range').get('first_tweet') 252 start_date = datetime.datetime.strptime(first_tweet_timestamp, "%Y-%m-%d %H:%M:%S").strftime("%Y-%m-%d") 253 254 end_date = datetime.datetime.fromtimestamp(self.parameters.get("max_date")).strftime("%Y-%m-%d") if self.parameters.get("max_date") else (datetime.datetime.now() + datetime.timedelta(days=1)).strftime("%Y-%m-%d") 255 parameters = { 256 "dataset": bin_name, 257 "query": self.parameters.get("query"), 258 "url_query": self.parameters.get("url"), 259 "media_url_query": self.parameters.get("url-media"), 260 "exclude": self.parameters.get("query-exclude"), 261 "from_user_name": self.parameters.get("user-name"), 262 "from_user_lang": self.parameters.get("user-language"), 263 "lang": self.parameters.get("tweet-language"), 264 "exclude_from_user_name": self.parameters.get("user-exclude"), 265 "from_source": re.sub(r"<[^>]+>", "", self.parameters.get("tweet-client")), 266 "startdate": start_date, 267 "enddate": end_date, 268 "replyto": "yes" if self.parameters.get("exclude-replies") == "exclude" else "no", 269 "whattodo": "", 270 "exportSettings": "urls,mentions,hashtags,media,", 271 "graph_resolution": "day", 272 "outputformat": "csv" 273 } 274 275 # for now we simply request the full CSV export of the bin with the 276 # given parameters, letting TCAT handle the full text search and so 277 # on 278 self.dataset.update_status("Searching for tweets on %s" % bin_host) 279 response = requests.get(request_url, params=parameters, stream=True) 280 if response.status_code != 200: 281 return self.dataset.finish_with_error("Query bin not available: received HTTP Error %i" % response.status_code) 282 283 # process the file in 1kB chunks, buffer as we go 284 # If a newline is encountered, 
the buffer is processed as a row of csv 285 # data. This works as long as there are no newlines in the csv itself, 286 # which is the case for TCAT exports. Processing as a stream is needed 287 # to avoid having to load the full file in memory 288 buffer = bytearray() 289 fieldnames = None 290 items = 0 291 encoding = None 292 api_map_errors = 0 293 mapping_errors = 0 294 for chunk in response.iter_content(chunk_size=1024): 295 # see if this chunk contains a newline, in which case we have a 296 # full line to process (e.g. as a tweet) 297 lines = [] 298 buffer += bytearray(chunk) 299 300 if not encoding and len(buffer) > 3: 301 # response.encoding is not correct sometimes, since it does not 302 # indicate that the file uses a BOM, so sniff it instead once 303 # we have some bytes 304 encoding = sniff_encoding(buffer) 305 306 # split buffer by newlines and process each full line 307 # the last line is always carried over, since it may be incomplete 308 if b"\n" in buffer: 309 buffered_lines = buffer.split(b"\n") 310 lines = buffered_lines[:-1] 311 buffer = buffered_lines.pop() 312 elif not chunk: 313 # eof, process left-over data 314 lines = buffer.split(b"\n") 315 316 # and finally we can process the data 317 for line in lines: 318 # use a dummy csv reader to abstract away the annoying csv parsing 319 # this is quite a bit of overhead, but beats implementing csv parsing 320 # manually, and it's still reasonably fast (about 10k/second) 321 dummy_file = io.TextIOWrapper(io.BytesIO(line.replace(b"\0", b"")), encoding=encoding) 322 reader = csv.reader(dummy_file, 323 delimiter=",", 324 quotechar='"', 325 doublequote=True, 326 quoting=csv.QUOTE_MINIMAL) 327 row_data = next(reader) 328 329 if row_data and not fieldnames: 330 # first line in file 331 fieldnames = row_data.copy() 332 333 elif row_data: 334 tweet = dict(zip(fieldnames, row_data)) 335 items += 1 336 337 if items % 250 == 0: 338 self.dataset.update_status("Loaded %i tweets from bin %s@%s" % (items, bin_name, bin_host)) 339 340 try: 341 formatted_tweet = self.tcat_to_APIv2(tweet) 342 except (KeyError, IndexError) as e: 343 self.dataset.log(f"Error converting TCAT tweet ({items}) to APIv2 format: {e}") 344 api_map_errors += 1 345 continue 346 347 # Check mapping errors 348 try: 349 SearchWithTwitterAPIv2.map_item(formatted_tweet) 350 except (KeyError, IndexError) as e: 351 # these tweets will not be usable by 4CAT processors, but we can still yield and they are availalbe for download as NDJSON 352 self.dataset.log(f"Error mapping TCAT tweet ({items}) to 4CAT Twitter format: {e}") 353 mapping_errors += 1 354 355 # yield formatted_tweet which contains some TCAT specific fields; mapping to X/Twitter APIv2 format is done later 356 yield formatted_tweet 357 358 if not chunk: 359 # end of file 360 break 361 362 if mapping_errors or api_map_errors: 363 error_message = "" 364 if mapping_errors: 365 error_message += f"{mapping_errors} tweets were unable to be imported from TCAT. " 366 if api_map_errors: 367 error_message += f"{api_map_errors} tweets were unable to be formmated corrected and 4CAT will not be able to analyse them." 368 self.log.warning(f"SearchWithinTCATBins: import mapping issue detected ({self.dataset.key})") 369 self.dataset.update_status(error_message, is_final=True) 370 371 @ staticmethod 372 def tcat_to_4cat_time(tcat_time): 373 """ 374 Twitter APIv2 time is in format "%Y-%m-%dT%H:%M:%S.000Z" while TCAT uses "%Y-%m-%d %H:%M:%S" and a timestamp. 
375 376 :return datetime: 377 """ 378 try: 379 tcat_time = int(tcat_time) 380 return datetime.datetime.fromtimestamp(tcat_time).strftime("%Y-%m-%dT%H:%M:%S.000Z") 381 except ValueError: 382 return datetime.datetime.strptime(tcat_time, "%Y-%m-%d %H:%M:%S").strftime("%Y-%m-%dT%H:%M:%S.000Z") 383 384 @staticmethod 385 def tcat_to_APIv2(tcat_tweet): 386 """ 387 Attempt to construct a 4CAT tweet gathered from APIv2 to allow for use of Twitter specific processors! 388 389 A great deal of information is missing so there may result in some issues. Notes are kept for the expected 390 type and, if the data is missing in TCAT, None is used. Therefor it should be possible to refactor processors 391 to handle None if necessary. 392 """ 393 # We're missing lots of data here... 394 395 urls = [url.strip() for url in (tcat_tweet["urls_expanded"].split(";") if tcat_tweet["urls_expanded"] else tcat_tweet["urls_followed"].split(";") if tcat_tweet["urls_followed"] else tcat_tweet["urls_followed"].split(";")) if url] 396 # TCAT media_id: 7 = video, 3 = photo, 16 = animated_gif 397 media_type = "video" if tcat_tweet["media_id"] == "7" else "photo" if tcat_tweet["media_id"] == "3" else "animated_gif" if tcat_tweet["media_id"] == "16" else tcat_tweet["media_id"] 398 399 # 4CAT Twitter APIv2 result data structure 400 APIv2_tweet = { 401 "lang": tcat_tweet["lang"], # str 402 "source": tcat_tweet["source"], # REMOVED FROM TWITTER API v2 403 "possibly_sensitive": True if tcat_tweet["possibly_sensitive"] == 1 else False if tcat_tweet["possibly_sensitive"] == 0 else None, # bool 404 "text": tcat_tweet["text"], # str 405 "edit_history_tweet_ids": None, # list; Missing in TCAT data 406 "public_metrics": { 407 "retweet_count": tcat_tweet["retweet_count"], # int 408 "reply_count": None, # int; Missing in TCAT data 409 "like_count": tcat_tweet["favorite_count"], # int 410 "quote_count": None, # int; Missing in TCAT data 411 "impression_count": None, # int; Missing in TCAT data 412 # TCAT has also favorite_count 413 }, 414 "entities": { 415 "mentions": [{ 416 "id": None, # str; Missing in TCAT data 417 "username": mention.strip(), # str 418 # Twitter v2 API has additional user fields 419 } for mention in tcat_tweet["mentions"].split(";") if mention], 420 "annotations": None, # list; Missing in TCAT data 421 "urls": [{ 422 "url": url, # str 423 "expanded_url": url, # str 424 # Twitter v2 API has additional URL fields 425 } for url in urls], 426 "hashtags": [{ 427 "tag": hashtag.strip(), # str 428 "start": None, # int; Missing in TCAT data 429 "end": None, # int; Missing in TCAT data 430 } for hashtag in tcat_tweet["hashtags"].split(";") if hashtag], 431 "cashtags": None, # list; Missing in TCAT data 432 }, 433 "created_at": SearchWithinTCATBins.tcat_to_4cat_time(tcat_tweet["time"]), # str 434 "id": tcat_tweet["id"], # str 435 "author_id": tcat_tweet["from_user_id"], # str 436 "context_annotations": None, # list; Missing in TCAT data 437 "reply_settings": None, # str; Missing in TCAT data 438 "conversation_id": None, # str; TCAT has a in_reply_to_status_id but this is not necessarily the original Tweet that started the conversation 439 "author_user": { 440 "protected": None, # bool; Missing in TCAT data 441 "verified": True if tcat_tweet["from_user_verified"] == 1 else False if tcat_tweet["from_user_verified"] == 0 else None, # bool 442 "created_at": SearchWithinTCATBins.tcat_to_4cat_time(tcat_tweet["from_user_created_at"]) if tcat_tweet["from_user_created_at"] else "", # str; may be Missing in TCAT data 443 "name": 
tcat_tweet["from_user_realname"], # str 444 "entities": { 445 "description": None, # dict; contains entities from author description such as mentions, URLs, etc.; Missing in TCAT data 446 "url": None, # dict; containers entities from author url e.g. URL data; Missing in TCAT data 447 }, 448 "description": tcat_tweet["from_user_description"], # str 449 "pinned_tweet_id": None, # str; Missing in TCAT data 450 "profile_image_url": tcat_tweet["from_user_profile_image_url"], # str 451 "url": tcat_tweet["from_user_url"], # str 452 "username": tcat_tweet["from_user_name"], # str 453 "id": tcat_tweet["from_user_id"], # str 454 "location": None, # str; Missing in TCAT data 455 "public_metrics": { 456 "followers_count": tcat_tweet["from_user_followercount"], # int 457 "following_count": tcat_tweet["from_user_friendcount"], # int 458 "tweet_count": tcat_tweet["from_user_tweetcount"], # int 459 "listed_count": tcat_tweet["from_user_listed"], # int 460 # TCAT has also from_user_favourites_count 461 }, 462 "withheld": { 463 "country_codes": tcat_tweet["from_user_withheld_scope"].split(";"), # list; TODO TCAT has column, but have not seen it populated in testing... This is guess 464 }, 465 # TCAT has also from_user_lang, from_user_utcoffset, from_user_timezone 466 }, 467 "attachments": { 468 # TCAT has some media data, but not the URLs listed 469 "media_keys": [{ 470 "type": media_type, 471 "url": ",".join([url for url in urls if (url.split("/")[-2] if len(url.split("/")) > 1 else "") in ["photo"]]), # str; TCAT does not have the URL though it may be in the list of URLs 472 "variants": [{"url": ",".join([url for url in urls if (url.split("/")[-2] if len(url.split("/")) > 1 else "") in ["video"]]), "bit_rate":0}] # list; This is not the expected direct link to video, but it is a URL to the video 473 # Twitter API v2 has additional data 474 }], # list; TCAT seems to only have one type of media per tweet 475 "poll_ids": None, # list; Missing from TCAT data 476 }, 477 "geo": { 478 "place_id": None, # str; Missing from TCAT data 479 "place": { 480 "country": None, # str; Missing from TCAT data 481 "id": None, # str; Missing from TCAT data 482 "geo": { 483 484 }, 485 "country_code": None, # str; Missing from TCAT data 486 "name": tcat_tweet["location"], # str 487 "place_type": None, # str; Missing from TCAT data 488 "full_name": tcat_tweet["location"], # str 489 }, 490 "coordindates": { 491 "type": None, # str; Missing from TCAT data 492 "coordinates": [tcat_tweet["lng"], tcat_tweet["lat"]], # list i.e. [longitude, latitude] 493 }, 494 }, 495 "withheld": { 496 "copyright": True if tcat_tweet["withheld_copyright"] == 1 else False if tcat_tweet["withheld_copyright"] == 0 else None, # bool; TODO TCAT has column, but have not seen it populated in testing... This is guess 497 "country_codes": tcat_tweet["withheld_scope"].split(";"), # list; TODO TCAT has column, but have not seen it populated in testing... 
This is guess 498 }, 499 } 500 501 # Referenced Tweets; Twitter API v2 has entire tweet data here which we will be missing 502 referenced_tweets = [] 503 if tcat_tweet["text"][:4] == "RT @": 504 # Retweet 505 referenced_tweets.append({ 506 "type": "retweeted", 507 "id": None, # str; Missing in TCAT data 508 }) 509 if tcat_tweet["quoted_status_id"]: 510 # Quote 511 referenced_tweets.append({ 512 "type": "quoted", 513 "id": tcat_tweet["quoted_status_id"], # str; Missing in TCAT data 514 }) 515 if tcat_tweet["in_reply_to_status_id"]: 516 # Reply 517 referenced_tweets.append({ 518 "type": "replied_to", 519 "id": tcat_tweet["in_reply_to_status_id"], # str; Missing in TCAT data 520 }) 521 # These should NOT be None in case a processor/user attempts to identify a reply using these 522 APIv2_tweet["in_reply_to_user_id"] = "UNKNOWN" # str; Missing from TCAT data 523 APIv2_tweet["in_reply_to_user"] = {"username": "UNKNOWN"} # dict; Missing from TCAT data 524 525 APIv2_tweet["referenced_tweets"] = referenced_tweets # list 526 527 # Append any extra TCAT data 528 additional_TCAT_data = {} 529 for field in SearchWithinTCATBins.additional_TCAT_fields: 530 additional_TCAT_data["TCAT_"+field] = tcat_tweet[field] 531 APIv2_tweet.update(additional_TCAT_data) 532 533 return APIv2_tweet 534 535 @staticmethod 536 def validate_query(query, request, config): 537 """ 538 Validate DMI-TCAT query input 539 540 :param dict query: Query parameters, from client-side. 541 :param request: Flask request 542 :param ConfigManager|None config: Configuration reader (context-aware) 543 :return dict: Safe query parameters 544 """ 545 # no query 4 u 546 if not query.get("bin", "").strip(): 547 raise QueryParametersException("You must choose a query bin to get tweets from.") 548 549 # Dates need to make sense as a range to search within 550 after, before = query.get("daterange") 551 if (after and before) and before <= after: 552 raise QueryParametersException("A date range must start before it ends") 553 554 query["min_date"], query["max_date"] = query.get("daterange") 555 del query["daterange"] 556 557 # simple! 558 return query 559 560 @staticmethod 561 def map_item(item): 562 """ 563 Use Twitter APIv2 map_item 564 """ 565 mapped_tweet = SearchWithTwitterAPIv2.map_item(item) 566 567 # Add TCAT extra data 568 data = mapped_tweet.get_item_data() 569 message = mapped_tweet.get_message() 570 for field in SearchWithinTCATBins.additional_TCAT_fields: 571 data["TCAT_" + field] = item.get("TCAT_" + field) 572 573 return MappedItem(data, message)
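Example: the "dmi-tcat-search.instances" setting above expects a JSON list of instance URLs, optionally with HTTP credentials embedded in the URL. A hypothetical value (the hostnames are made up):

# value for the "dmi-tcat-search.instances" setting; hostnames are examples.
# credentials embedded in a URL never reach the user-facing options: bins are
# keyed as "binname@hostname", with the auth part stripped
["https://tcat.example.org", "http://user:secret@tcat.example.net"]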
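Example: collect_all_bins() relies on only a few keys of the bin-stats.php response. A sketch of the shape this module expects, inferred from the fields read elsewhere in the file; the values are illustrative and the real endpoint may send more keys:

# inferred shape of json.loads(api_request.content); values are made up
instance_bins = {
    "mybin": {  # one entry per query bin
        "type": "track",  # compared against follow/track/timeline/geotrack in get_items()
        "tweets_approximate": 12345,  # shown in the bin chooser
        "range": {
            "first_tweet": "2023-01-01 00:00:00",
            "last_tweet": "2023-06-30 23:59:59",
        },
        # newer TCAT versions may also send "phrase_times" and "user_times"
    },
}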
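Example: get_options() keys each bin choice as "binname@hostname" (with any HTTP auth stripped from the host), and get_items() later splits that key apart again. A minimal round trip with made-up names:

import re

instance = "http://user:secret@tcat.example.net"
host = re.sub(r"^https?://", "", instance).split("@").pop()  # "tcat.example.net"
bin_key = "%s@%s" % ("mybin", host)                          # "mybin@tcat.example.net"

bin_name = bin_key.split("@")[0]     # "mybin"
bin_host = bin_key.split("@").pop()  # "tcat.example.net"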
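Example: the streaming buffer logic in get_items() can be exercised on its own. A minimal sketch with an in-memory byte stream standing in for response.iter_content(); the sample rows are made up:

import csv
import io

# CSV bytes arriving in arbitrary chunks; the final empty chunk signals EOF,
# as it does for response.iter_content() in get_items()
chunks = [b'id,text\n1,"hello', b' world"\n2,"second', b' row"', b""]

buffer = bytearray()
fieldnames = None
for chunk in chunks:
    buffer += bytearray(chunk)
    lines = []
    if b"\n" in buffer:
        buffered_lines = buffer.split(b"\n")
        lines = buffered_lines[:-1]
        buffer = buffered_lines.pop()  # carry over the (possibly partial) last line
    elif not chunk:
        lines = buffer.split(b"\n")  # EOF: flush whatever is left

    for line in lines:
        if not line.strip():
            continue
        row_data = next(csv.reader(io.TextIOWrapper(io.BytesIO(line), encoding="utf-8")))
        if not fieldnames:
            fieldnames = row_data
        else:
            print(dict(zip(fieldnames, row_data)))
            # {'id': '1', 'text': 'hello world'}, then {'id': '2', 'text': 'second row'}

    if not chunk:
        break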
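Example: tcat_to_4cat_time() accepts either TCAT form. Note that fromtimestamp() applies the machine's local timezone to numeric input, so that conversion is environment-dependent:

SearchWithinTCATBins.tcat_to_4cat_time("2023-06-01 12:30:00")
# -> "2023-06-01T12:30:00.000Z"

SearchWithinTCATBins.tcat_to_4cat_time("1685622600")
# -> "2023-06-01T12:30:00.000Z" on a UTC machine; other timezones shift the result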
@staticmethod
def tcat_to_APIv2(tcat_tweet):
    """
    Attempt to construct a tweet formatted as though it was gathered by 4CAT
    from the Twitter APIv2, to allow the use of Twitter-specific processors.

    A great deal of information is missing, so some issues may result. Notes
    are kept for the expected type and, where the data is missing in TCAT,
    None is used. It should therefore be possible to refactor processors to
    handle None where necessary.
    """
    # We're missing lots of data here...

    # Prefer expanded URLs; fall back on followed URLs
    urls = [url.strip() for url in (tcat_tweet["urls_expanded"].split(";") if tcat_tweet["urls_expanded"] else tcat_tweet["urls_followed"].split(";")) if url]
    # TCAT media_id: 7 = video, 3 = photo, 16 = animated_gif
    media_type = "video" if tcat_tweet["media_id"] == "7" else "photo" if tcat_tweet["media_id"] == "3" else "animated_gif" if tcat_tweet["media_id"] == "16" else tcat_tweet["media_id"]

    # 4CAT Twitter APIv2 result data structure
    APIv2_tweet = {
        "lang": tcat_tweet["lang"],  # str
        "source": tcat_tweet["source"],  # REMOVED FROM TWITTER API v2
        "possibly_sensitive": True if tcat_tweet["possibly_sensitive"] == 1 else False if tcat_tweet["possibly_sensitive"] == 0 else None,  # bool
        "text": tcat_tweet["text"],  # str
        "edit_history_tweet_ids": None,  # list; Missing in TCAT data
        "public_metrics": {
            "retweet_count": tcat_tweet["retweet_count"],  # int
            "reply_count": None,  # int; Missing in TCAT data
            "like_count": tcat_tweet["favorite_count"],  # int
            "quote_count": None,  # int; Missing in TCAT data
            "impression_count": None,  # int; Missing in TCAT data
            # TCAT also has favorite_count
        },
        "entities": {
            "mentions": [{
                "id": None,  # str; Missing in TCAT data
                "username": mention.strip(),  # str
                # Twitter v2 API has additional user fields
            } for mention in tcat_tweet["mentions"].split(";") if mention],
            "annotations": None,  # list; Missing in TCAT data
            "urls": [{
                "url": url,  # str
                "expanded_url": url,  # str
                # Twitter v2 API has additional URL fields
            } for url in urls],
            "hashtags": [{
                "tag": hashtag.strip(),  # str
                "start": None,  # int; Missing in TCAT data
                "end": None,  # int; Missing in TCAT data
            } for hashtag in tcat_tweet["hashtags"].split(";") if hashtag],
            "cashtags": None,  # list; Missing in TCAT data
        },
        "created_at": SearchWithinTCATBins.tcat_to_4cat_time(tcat_tweet["time"]),  # str
        "id": tcat_tweet["id"],  # str
        "author_id": tcat_tweet["from_user_id"],  # str
        "context_annotations": None,  # list; Missing in TCAT data
        "reply_settings": None,  # str; Missing in TCAT data
        "conversation_id": None,  # str; TCAT has an in_reply_to_status_id, but this is not necessarily the original tweet that started the conversation
        "author_user": {
            "protected": None,  # bool; Missing in TCAT data
            "verified": True if tcat_tweet["from_user_verified"] == 1 else False if tcat_tweet["from_user_verified"] == 0 else None,  # bool
            "created_at": SearchWithinTCATBins.tcat_to_4cat_time(tcat_tweet["from_user_created_at"]) if tcat_tweet["from_user_created_at"] else "",  # str; may be missing in TCAT data
            "name": tcat_tweet["from_user_realname"],  # str
            "entities": {
                "description": None,  # dict; contains entities from the author description such as mentions, URLs, etc.; Missing in TCAT data
                "url": None,  # dict; contains entities from the author URL; Missing in TCAT data
            },
            "description": tcat_tweet["from_user_description"],  # str
            "pinned_tweet_id": None,  # str; Missing in TCAT data
            "profile_image_url": tcat_tweet["from_user_profile_image_url"],  # str
            "url": tcat_tweet["from_user_url"],  # str
            "username": tcat_tweet["from_user_name"],  # str
            "id": tcat_tweet["from_user_id"],  # str
            "location": None,  # str; Missing in TCAT data
            "public_metrics": {
                "followers_count": tcat_tweet["from_user_followercount"],  # int
                "following_count": tcat_tweet["from_user_friendcount"],  # int
                "tweet_count": tcat_tweet["from_user_tweetcount"],  # int
                "listed_count": tcat_tweet["from_user_listed"],  # int
                # TCAT also has from_user_favourites_count
            },
            "withheld": {
                "country_codes": tcat_tweet["from_user_withheld_scope"].split(";"),  # list; TODO: TCAT has this column, but it was never seen populated in testing, so this is a guess
            },
            # TCAT also has from_user_lang, from_user_utcoffset, from_user_timezone
        },
        "attachments": {
            # TCAT has some media data, but not the URLs listed
            "media_keys": [{
                "type": media_type,
                "url": ",".join([url for url in urls if (url.split("/")[-2] if len(url.split("/")) > 1 else "") in ["photo"]]),  # str; TCAT does not have the URL, though it may be in the list of URLs
                "variants": [{"url": ",".join([url for url in urls if (url.split("/")[-2] if len(url.split("/")) > 1 else "") in ["video"]]), "bit_rate": 0}],  # list; not the expected direct link to the video, but a URL to the video
                # Twitter API v2 has additional data
            }],  # list; TCAT seems to only have one type of media per tweet
            "poll_ids": None,  # list; Missing from TCAT data
        },
        "geo": {
            "place_id": None,  # str; Missing from TCAT data
            "place": {
                "country": None,  # str; Missing from TCAT data
                "id": None,  # str; Missing from TCAT data
                "geo": {},
                "country_code": None,  # str; Missing from TCAT data
                "name": tcat_tweet["location"],  # str
                "place_type": None,  # str; Missing from TCAT data
                "full_name": tcat_tweet["location"],  # str
            },
            "coordinates": {
                "type": None,  # str; Missing from TCAT data
                "coordinates": [tcat_tweet["lng"], tcat_tweet["lat"]],  # list, i.e. [longitude, latitude]
            },
        },
        "withheld": {
            "copyright": True if tcat_tweet["withheld_copyright"] == 1 else False if tcat_tweet["withheld_copyright"] == 0 else None,  # bool; TODO: TCAT has this column, but it was never seen populated in testing, so this is a guess
            "country_codes": tcat_tweet["withheld_scope"].split(";"),  # list; TODO: TCAT has this column, but it was never seen populated in testing, so this is a guess
        },
    }

    # Referenced tweets; the Twitter API v2 includes the full data of the
    # referenced tweet here, which we are missing
    referenced_tweets = []
    if tcat_tweet["text"][:4] == "RT @":
        # Retweet
        referenced_tweets.append({
            "type": "retweeted",
            "id": None,  # str; Missing in TCAT data
        })
    if tcat_tweet["quoted_status_id"]:
        # Quote
        referenced_tweets.append({
            "type": "quoted",
            "id": tcat_tweet["quoted_status_id"],  # str
        })
    if tcat_tweet["in_reply_to_status_id"]:
        # Reply
        referenced_tweets.append({
            "type": "replied_to",
            "id": tcat_tweet["in_reply_to_status_id"],  # str
        })
        # These should NOT be None in case a processor/user attempts to
        # identify a reply using these fields
        APIv2_tweet["in_reply_to_user_id"] = "UNKNOWN"  # str; Missing from TCAT data
        APIv2_tweet["in_reply_to_user"] = {"username": "UNKNOWN"}  # dict; Missing from TCAT data

    APIv2_tweet["referenced_tweets"] = referenced_tweets  # list

    # Append any extra TCAT data
    additional_TCAT_data = {}
    for field in SearchWithinTCATBins.additional_TCAT_fields:
        additional_TCAT_data["TCAT_" + field] = tcat_tweet[field]
    APIv2_tweet.update(additional_TCAT_data)

    return APIv2_tweet
Attempt to construct a tweet formatted as though it was gathered by 4CAT from the Twitter APIv2, to allow the use of Twitter-specific processors.
A great deal of information is missing, so some issues may result. Notes are kept for the expected type and, where the data is missing in TCAT, None is used. It should therefore be possible to refactor processors to handle None where necessary.
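Processors consuming these reconstructed tweets should expect None where a real APIv2 response would have a list or dict. A minimal sketch of such a guard (count_annotations is a hypothetical helper, not part of this module):

def count_annotations(apiv2_tweet):
    # "annotations" is None (rather than an empty list) when the tweet was
    # reconstructed from TCAT data, so fall back on an empty list
    annotations = apiv2_tweet.get("entities", {}).get("annotations") or []
    return len(annotations)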
@staticmethod
def validate_query(query, request, config):
    """
    Validate DMI-TCAT query input

    :param dict query:  Query parameters, from client-side.
    :param request:  Flask request
    :param ConfigManager|None config:  Configuration reader (context-aware)
    :return dict:  Safe query parameters
    """
    # no query 4 u
    if not query.get("bin", "").strip():
        raise QueryParametersException("You must choose a query bin to get tweets from.")

    # Dates need to make sense as a range to search within
    after, before = query.get("daterange")
    if (after and before) and before <= after:
        raise QueryParametersException("A date range must start before it ends")

    query["min_date"], query["max_date"] = query.get("daterange")
    del query["daterange"]

    # simple!
    return query
Validate DMI-TCAT query input
Parameters
- dict query: Query parameters, from client-side.
- request: Flask request
- ConfigManager|None config: Configuration reader (context-aware)
Returns
Safe query parameters
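As an illustration, a call with hand-written parameters might look like the sketch below. The bin name and dates are made up, and the daterange values are assumed to arrive as (after, before) Unix timestamps from the 4CAT frontend; request and config are unused by this particular validator, so None suffices here:

query = {
    "bin": "examplebin@tcat.example.net",  # hypothetical bin
    "daterange": (1609459200, 1612137600),  # assumed (after, before) Unix timestamps
}
safe = SearchWithinTCATBins.validate_query(query, request=None, config=None)
# the date range is split into separate min/max parameters
assert safe["min_date"] == 1609459200
assert "daterange" not in safe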
@staticmethod
def map_item(item):
    """
    Use Twitter APIv2 map_item
    """
    mapped_tweet = SearchWithTwitterAPIv2.map_item(item)

    # Add TCAT extra data
    data = mapped_tweet.get_item_data()
    message = mapped_tweet.get_message()
    for field in SearchWithinTCATBins.additional_TCAT_fields:
        data["TCAT_" + field] = item.get("TCAT_" + field)

    return MappedItem(data, message)
Use Twitter APIv2 map_item
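For context, mapping collected items back to flat rows could look like this sketch; the filename is hypothetical, and each line of the dataset's NDJSON file holds one tweet as produced by tcat_to_APIv2:

import json

with open("dataset.ndjson") as infile:  # hypothetical dataset file
    for line in infile:
        item = json.loads(line)
        mapped = SearchWithinTCATBins.map_item(item)
        row = mapped.get_item_data()  # APIv2-mapped columns plus the TCAT_* extras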
Inherited Members
- backend.lib.worker.BasicWorker
- BasicWorker
- INTERRUPT_NONE
- INTERRUPT_RETRY
- INTERRUPT_CANCEL
- queue
- log
- manager
- interrupted
- modules
- init_time
- name
- run
- clean_up
- request_interrupt
- is_4cat_class
- backend.lib.search.Search
- max_workers
- prefix
- return_cols
- import_error_count
- import_warning_count
- process
- search
- import_from_file
- items_to_csv
- items_to_ndjson
- items_to_archive
- backend.lib.processor.BasicProcessor
- db
- job
- dataset
- owner
- source_dataset
- source_file
- description
- category
- is_running_in_preset
- filepath
- work
- after_process
- remove_files
- abort
- iterate_proxied_requests
- push_proxied_request
- flush_proxied_requests
- iterate_archive_contents
- unpack_archive_contents
- extract_archived_file_by_name
- write_csv_items_and_finish
- write_archive_and_finish
- create_standalone
- save_annotations
- map_item_method_available
- get_mapped_item
- is_filter
- get_status
- is_top_dataset
- is_from_collector
- get_extension
- is_rankable
- exclude_followup_processors
- is_4cat_processor