common.lib.annotation
Annotation class
"""
Annotation class
"""


import time
import json

from common.lib.database import Database
from common.lib.exceptions import AnnotationException


class Annotation:
    """
    Annotation class

    Annotations are always tied to a dataset, a dataset item (e.g. a csv row),
    an annotation label, and a type ('text', 'multichoice', etc.).

    An instance mirrors one record of the `annotations` table. The record is
    kept as a dict in `data`; the fields declared below are also exposed as
    plain attributes.
    """

    # Attributes must be created here to ensure getattr and setattr work properly

    data = None
    db = None

    id = None  # Unique ID for this annotation
    item_id = None  # ID of the item that this annotation was made for, e.g. a post ID.
    field_id = None  # ID for the annotation field
    dataset = None  # Dataset key this annotation is generated from
    timestamp = None  # When this annotation was edited
    timestamp_created = None  # When this annotation was created
    label = None  # Label of annotation
    type = None  # Type of annotation (e.g. `text`)
    options = None  # Possible options
    value = None  # The actual annotation value
    author = None  # Who last edited the annotation
    author_original = None  # Who originally made the annotation
    by_processor = None  # Whether the annotation was made by a processor
    from_dataset = None  # Processor-made dataset key this annotation was generated as part of
    metadata = None  # Misc metadata

    def __init__(self, data=None, annotation_id=None, db=None):
        """
        Instantiate annotation object.

        If an ID is available (explicitly, or as `data["id"]`), the existing
        record is loaded and any differing values in `data` are applied to it.
        Otherwise the record is looked up by dataset, item ID and field ID,
        and created if it does not exist yet. New or changed annotations are
        written to the database right away.

        :param data:  Annotation data; should correspond to the annotations table record.
        :param annotation_id:  The ID of an annotation. If given, it retrieves the annotation
        from the database.
        :param db:  Database connection object
        :raises AnnotationException:  If neither a valid `data` dict nor an ID
        is given, if `db` is missing, if the ID does not exist, if a required
        field is missing, or if a field cannot be coerced to its type.
        """

        required_fields = ["field_id", "item_id", "dataset"]

        # Must have an ID or data
        if (annotation_id is None and data is None) or (data is not None and not isinstance(data, dict)):
            raise AnnotationException("Annotation() requires either a valid `data` dictionary or ID.")

        if not db:
            raise AnnotationException("Annotation() needs a `db` database object")

        self.db = db

        new_or_updated = False

        # Get the annotation data if the ID is given; if an annotation has
        # an ID, it is guaranteed to be in the database.
        # IDs can both be explicitly given or present in the data dict.
        if annotation_id is not None or "id" in data:
            if data and "id" in data:
                annotation_id = data["id"]
            self.id = annotation_id  # IDs correspond to unique serial numbers in the database.
            current = self.get_by_id(annotation_id)
            if not current:
                raise AnnotationException(
                    "Annotation() requires a valid ID for an existing annotation, %s given" % annotation_id)

        # If an ID is not given, get or create an Annotation object from its data.
        # First check if required fields are present in `data`.
        else:
            for required_field in required_fields:
                if required_field not in data or not data[required_field]:
                    raise AnnotationException("Annotation() requires a %s field" % required_field)

            # Check if this annotation already exists, based on dataset key, item id, and field id.
            current = self.get_by_field(data["dataset"], data["item_id"], data["field_id"])

        # If we were able to retrieve an annotation from the db, it already exists
        if current:
            # Check if we have to overwrite old data with new data
            if data:
                for key, value in data.items():
                    # Save unknown fields in metadata
                    if key not in current:
                        current["metadata"][key] = value
                        new_or_updated = True
                    # If values differ, update the value
                    elif current[key] != value:
                        current[key] = value
                        new_or_updated = True

            self.data = current

        # If this is a new annotation, set all the properties.
        else:
            # A label has no sensible default; fail with a clear error
            # rather than a bare KeyError.
            if "label" not in data:
                raise AnnotationException("Annotation() requires a label field")

            # Keep track of when the annotation was made
            created_timestamp = int(time.time())

            self.data = {
                "dataset": data["dataset"],
                "item_id": data["item_id"],
                "field_id": data["field_id"],
                "timestamp": created_timestamp,
                "timestamp_created": created_timestamp,
                "label": data["label"],
                "type": data.get("type", "text"),
                "options": data.get("options", ""),
                "value": data.get("value", ""),
                "author": data.get("author", ""),
                "author_original": data.get("author", ""),
                "by_processor": data.get("by_processor", False),
                "from_dataset": data.get("from_dataset", ""),
                "metadata": data.get("metadata", {}),
            }
            new_or_updated = True

        # Normalise metadata to a dict *before* copying fields to attributes,
        # so a JSON string passed by the caller does not end up as the value
        # of `self.metadata`.
        self.data["metadata"] = self._parse_metadata(self.data.get("metadata"))

        # Iterate over a snapshot: __setattr__ may write back into self.data.
        for k, v in list(self.data.items()):
            # Some type checking
            try:
                if k in ("timestamp", "timestamp_created"):
                    v = int(v)
                elif k == "by_processor":
                    v = bool(v)
            except (TypeError, ValueError) as e:
                raise AnnotationException("Annotation fields are not of the right type (%s)" % e) from e
            self.__setattr__(k, v)

        # Write to db if anything changed
        if new_or_updated:
            self.timestamp = int(time.time())
            self.write_to_db()

    @staticmethod
    def _parse_metadata(raw):
        """
        Turn a stored or user-supplied metadata value into a dict.

        :param raw:  A dict, a JSON string, or anything else.
        :return dict:  `raw` itself if it is a dict, the decoded object if it
        is a JSON string encoding an object, and an empty dict otherwise.
        """
        if isinstance(raw, dict):
            return raw

        if isinstance(raw, str):
            try:
                parsed = json.loads(raw)
            except (TypeError, json.JSONDecodeError):
                return {}
            if isinstance(parsed, dict):
                return parsed

        return {}

    @staticmethod
    def _decode_row(row):
        """
        Convert a raw `annotations` record to its in-memory form, in place.

        Checkbox values are stored as a comma-separated string and become a
        list; metadata is stored as JSON and becomes a dict.

        :param row:  Record as returned by the database object.
        :return:  The same record, decoded.
        """
        if row["type"] == "checkbox" and isinstance(row["value"], str):
            # An empty string means "nothing ticked", not one empty option.
            row["value"] = row["value"].split(",") if row["value"] else []
        row["metadata"] = Annotation._parse_metadata(row["metadata"])

        return row

    def get_by_id(self, annotation_id: int):
        """
        Get annotation by ID

        :param annotation_id:  ID of annotation; must be convertible to int.
        :return:  A dict with the annotation data, or an empty dict if the ID doesn't exist.
        :raises AnnotationException:  If the ID is not a valid integer.
        """

        try:
            numeric_id = int(annotation_id)
        except (TypeError, ValueError) as e:
            raise AnnotationException("Id '%s' is not valid" % annotation_id) from e

        # Pass the ID as a query parameter rather than formatting it into the SQL.
        data = self.db.fetchone("SELECT * FROM annotations WHERE id = %s", (numeric_id,))

        if not data:
            return {}

        return self._decode_row(data)

    def get_by_field(self, dataset_key: str, item_id: str, field_id: str) -> dict:
        """
        Get the annotation information via its dataset key, item ID, and field_id.
        This is always a unique combination.

        :param dataset_key:  The key of the dataset this annotation was made for.
        :param item_id:  The ID of the item this annotation was made for.
        :param field_id:  The field ID of the annotation.

        :return data:  A dict with data of the retrieved annotation, or an empty dict if it doesn't exist.
        """

        data = self.db.fetchone("SELECT * FROM annotations WHERE dataset = %s AND item_id = %s AND field_id = %s",
                                (dataset_key, str(item_id), field_id))
        if not data:
            return {}

        return self._decode_row(data)

    def write_to_db(self):
        """
        Write an annotation to the database.

        The record is upserted on the combination of field ID, dataset and
        item ID. `self.data` keeps its in-memory form (metadata as dict,
        checkbox values as list); only a copy is serialised, so this method
        can safely be called more than once.

        :return:  Whatever the database object's `upsert()` returns.
        """
        self.data["timestamp"] = int(time.time())

        # Serialise a shallow copy so self.data is not clobbered.
        db_data = dict(self.data)

        metadata = db_data.get("metadata")
        if not isinstance(metadata, str):
            db_data["metadata"] = json.dumps(metadata if metadata is not None else {})

        if db_data.get("type") == "checkbox" and isinstance(db_data.get("value"), (list, tuple)):
            db_data["value"] = ",".join(str(option) for option in db_data["value"])

        return self.db.upsert("annotations", data=db_data, constraints=["field_id", "dataset", "item_id"])

    def delete(self):
        """
        Deletes this annotation

        :return:  Whatever the database object's `delete()` returns.
        """
        return self.db.delete("annotations", {"id": self.id})

    @staticmethod
    def get_annotations_for_dataset(db: Database, dataset_key: str, item_id=None) -> list:
        """
        Returns all annotations for a dataset.

        :param db:  Database object.
        :param str dataset_key:  A dataset key.
        :param str item_id:  An optional item ID to only get annotations from one item (e.g. social media post).

        :return list:  List with Annotation objects; empty if there are none.
        """
        if not dataset_key:
            return []

        if item_id:
            data = db.fetchall(
                "SELECT * FROM annotations WHERE dataset = %s AND item_id = %s",
                (dataset_key, str(item_id),)
            )
        else:
            data = db.fetchall("SELECT * FROM annotations WHERE dataset = %s", (dataset_key,))

        if not data:
            return []

        return [Annotation(data=Annotation._decode_row(row), db=db) for row in data]

    @staticmethod
    def delete_many(db: Database, dataset_key=None, annotation_id=None, field_id=None):
        """
        Deletes annotations for an entire dataset or by a list of (field) IDs.

        The given criteria are combined into a single `where` dict for the
        database object's `delete()`.

        :param db:  Database object.
        :param str dataset_key:  A dataset key.
        :param annotation_id:  A list or string of unique annotation IDs.
        :param field_id:  A list or string of IDs for annotation fields.

        :return int:  The number of removed records; 0 if no criteria are given.
        """
        if not dataset_key and not annotation_id and not field_id:
            return 0

        where = {}
        if dataset_key:
            where["dataset"] = dataset_key
        if annotation_id:
            where["id"] = annotation_id
        if field_id:
            where["field_id"] = field_id

        return db.delete("annotations", where)

    @staticmethod
    def update_annotations_via_fields(dataset_key: str, old_fields: dict, new_fields: dict, db: Database) -> int:
        """
        Updates annotations in the annotations table if the input fields
        themselves have been changed, for instance if a dropdown label is renamed
        or a field is deleted.

        :param str dataset_key:  The dataset key for which fields changed.
        :param dict old_fields:  Old annotation fields.
        :param dict new_fields:  New annotation fields; this should contain not just
        the additions, but all fields, changed or otherwise.
        :param db:  Database object so we can write.

        :returns int:  How many records were affected by column updates.
        """

        text_fields = ["textarea", "text"]

        # If old and new fields are identical, do nothing.
        if old_fields == new_fields:
            return 0

        fields_to_delete = set()  # Delete all annotations with this field ID
        fields_to_update = {}  # Update values of annotations with this field ID
        old_options = {}  # field ID -> old options dict (option id -> label)

        # Loop through the old annotation fields
        for field_id, old_field in old_fields.items():

            # Delete all annotations of this type if the field is deleted.
            if field_id not in new_fields:
                fields_to_delete.add(field_id)
                continue

            new_field = new_fields[field_id]

            # If the annotation type has changed, also delete existing annotations,
            # except between text and textarea, where we can just change the type and keep the text.
            # NOTE(review): as written, annotations are also kept when only ONE
            # of the two types is a text type (e.g. text -> dropdown), which
            # does not match the comment above. Left unchanged because the
            # alternative deletes more data - confirm the intended behaviour.
            if old_field["type"] != new_field["type"]:
                if old_field["type"] not in text_fields and new_field["type"] not in text_fields:
                    fields_to_delete.add(field_id)
                    continue

            # Loop through all the key/values in the new field settings
            # and update in case it's different from the old values.
            update_data = {}
            for field_key, field_value in new_field.items():

                # Nothing to do if values match
                if field_value == old_field.get(field_key):
                    continue

                # For all non-option changes, just overwrite with new data.
                if field_key != "options":
                    update_data[field_key] = field_value
                    continue

                # Special case: option values that are removed/renamed.
                # Here we may have to change/delete values within the
                # values column.
                new_options = field_value

                # Edge case: delete annotations of this type if all option fields are deleted
                if not new_options:
                    fields_to_delete.add(field_id)
                    continue

                # Changed options values (e.g. renamed or one field deleted)
                old_options[field_id] = old_field.get("options", {})
                if old_options[field_id] and old_options[field_id] != new_options:
                    update_data[field_key] = {"options": new_options}

            # No point in updating annotations that are about to be deleted
            if update_data and field_id not in fields_to_delete:
                fields_to_update[field_id] = update_data

        # Delete annotations; scoped to this dataset, like the updates below
        if fields_to_delete:
            Annotation.delete_many(db, dataset_key=dataset_key, field_id=list(fields_to_delete))

        # Write changes to fields to database
        count = 0
        for field_id, updates in fields_to_update.items():

            for column, update_value in updates.items():

                if column == "options":
                    new_options = update_value["options"]  # Dict with option id->label as items
                    column_value = ",".join(new_options.values())
                else:
                    new_options = None
                    column_value = update_value

                # Change values of columns
                count += db.update("annotations", {column: column_value},
                                   where={"dataset": dataset_key, "field_id": field_id})

                if new_options is None:
                    continue

                # Special case: Changed option labels.
                # Here we have to also rename/remove inserted options from the `value` column.

                # Reverse lookup label -> option ID; the first ID wins if
                # two old options shared a label.
                label_to_id = {}
                for option_id, option_label in old_options[field_id].items():
                    label_to_id.setdefault(option_label, option_id)

                annotations = db.fetchall(
                    "SELECT id, options, value FROM annotations "
                    "WHERE dataset = %s AND field_id = %s AND value != ''",
                    (dataset_key, field_id)) or []

                for annotation in annotations:
                    new_values = []

                    for ann_value in annotation["value"].split(","):
                        option_id = label_to_id.get(ann_value)
                        if option_id is None:
                            # Not a known old option label; keep it untouched
                            # rather than crashing or silently dropping data.
                            new_values.append(ann_value)
                        elif option_id in new_options:
                            # Kept, possibly under a new label
                            new_values.append(new_options[option_id])
                        # else: the option was deleted, so drop the value

                    new_value = ",".join(new_values)
                    if new_value != annotation["value"]:
                        db.update("annotations", {"value": new_value}, where={"id": annotation["id"]})

        return count

    def __getattr__(self, attr):
        """
        Getter so we don't have to use .data all the time

        Python only calls this after regular attribute lookup has failed, so
        explicitly defined attributes always take precedence and need no
        special handling here.

        :param attr:  Data key to get
        :return:  Value
        :raises AttributeError:  If `attr` is not a key of `.data`, or if
        `.data` has not been loaded yet.
        """

        data = self.data
        if isinstance(data, dict) and attr in data:
            return data[attr]

        raise AttributeError("Annotation instance has no attribute %s" % attr)

    def __setattr__(self, attr, value):
        """
        Setter so we can flexibly update the database

        Also updates internal data stores (.data etc.). If the attribute is
        unknown, it is stored within the 'metadata' attribute.

        :param str attr:  Attribute to update
        :param value:  New value
        :raises AttributeError:  If an undeclared attribute is set before
        `.data` has been loaded.
        """

        # don't override behaviour for *actual* class attributes
        if attr in dir(self):
            super().__setattr__(attr, value)
            return

        if not isinstance(self.data, dict):
            raise AttributeError("Cannot set attribute %s before annotation data is loaded" % attr)

        if attr not in self.data:
            # Unknown attribute: file it under metadata and persist that instead
            metadata = dict(self.metadata) if isinstance(self.metadata, dict) else {}
            metadata[attr] = value
            attr = "metadata"
            value = metadata

        # Metadata is stored as JSON in the database, but kept as a dict in memory
        db_value = json.dumps(value) if attr == "metadata" else value
        self.db.update("annotations", where={"id": self.id}, data={attr: db_value})

        self.data[attr] = value
        if attr == "metadata":
            self.metadata = value
14class Annotation: 15 """ 16 Annotation class 17 18 Annotations are always tied to a dataset, a dataset item (e.g. a csv row), 19 an annotation label, and a type ('text', 'multichoice', etc.). 20 21 """ 22 23 # Attributes must be created here to ensure getattr and setattr work properly 24 25 data = None 26 db = None 27 28 id = None # Unique ID for this annotation 29 item_id = None # ID of the item that this annotation was made for, e.g. a post ID. 30 field_id = None # ID for the annotation field 31 dataset = None # Dataset key this annotation is generated from 32 timestamp = None # When this annotation was edited 33 timestamp_created = None # When this annotation was created 34 label = None # Label of annotation 35 type = None # Type of annotation (e.g. `text`) 36 options = None # Possible options 37 value = None # The actual annotation value 38 author = None # Who last edited the annotation 39 author_original = None # Who originally made the annotation 40 by_processor = None # Whether the annotation was made by a processor 41 from_dataset = None # Processor-made dataset key this annotation was generated as part of 42 metadata = None # Misc metadata 43 44 def __init__(self, data=None, annotation_id=None, db=None): 45 """ 46 Instantiate annotation object. 47 48 :param data: Annotation data; should correspond to the annotations table record. 49 :param annotation_id: The ID of an annotation. If given, it retrieves the annotation 50 from the database. 
51 :param db: Database connection object 52 """ 53 54 required_fields = ["field_id", "item_id", "dataset"] 55 56 # Must have an ID or data 57 if (annotation_id is None and data is None) or (data is not None and not isinstance(data, dict)): 58 raise AnnotationException("Annotation() requires either a valid `data` dictionary or ID.") 59 60 if not db: 61 raise AnnotationException("Annotation() needs a `db` database object") 62 63 self.db = db 64 65 new_or_updated = False 66 67 # Get the annotation data if the ID is given; if an annotation has 68 # an ID, it is guaranteed to be in the database. 69 # IDs can both be explicitly given or present in the data dict. 70 if annotation_id is not None or "id" in data: 71 if data and "id" in data: 72 annotation_id = data["id"] 73 self.id = annotation_id # IDs correspond to unique serial numbers in the database. 74 current = self.get_by_id(annotation_id) 75 if not current: 76 raise AnnotationException( 77 "Annotation() requires a valid ID for an existing annotation, %s given" % id) 78 79 # If an ID is not given, get or create an Annotation object from its data. 80 # First check if required fields are present in `data`. 81 else: 82 for required_field in required_fields: 83 if required_field not in data or not data[required_field]: 84 raise AnnotationException("Annotation() requires a %s field" % required_field) 85 86 # Check if this annotation already exists, based on dataset key, item id, and label. 
87 current = self.get_by_field(data["dataset"], data["item_id"], data["field_id"]) 88 89 # If we were able to retrieve an annotation from the db, it already exists 90 if current: 91 # Check if we have to overwrite old data with new data 92 if data: 93 for key, value in data.items(): 94 # Save unknown fields in metadata 95 if key not in current: 96 current["metadata"][key] = value 97 new_or_updated = True 98 # If values differ, update the value 99 elif current[key] != value: 100 current[key] = value 101 new_or_updated = True 102 103 self.data = current 104 105 # If this is a new annotation, set all the properties. 106 else: 107 # Keep track of when the annotation was made 108 created_timestamp = int(time.time()) 109 110 new_data = { 111 "dataset": data["dataset"], 112 "item_id": data["item_id"], 113 "field_id": data["field_id"], 114 "timestamp": created_timestamp, 115 "timestamp_created": created_timestamp, 116 "label": data["label"], 117 "type": data.get("type", "text"), 118 "options": data.get("options", ""), 119 "value": data.get("value", ""), 120 "author": data.get("author", ""), 121 "author_original": data.get("author", ""), 122 "by_processor": data.get("by_processor", False), 123 "from_dataset": data.get("from_dataset", ""), 124 "metadata": data.get("metadata", {}), 125 } 126 127 self.data = new_data 128 new_or_updated = True 129 130 if isinstance(self.data["metadata"], str): 131 try: 132 self.metadata = json.loads(self.data["metadata"]) 133 except (TypeError, json.JSONDecodeError): 134 self.metadata = {} 135 136 for k, v in self.data.items(): 137 # Some type checking 138 try: 139 if k == "timestamp" or k == "timestamp_created": 140 v = int(v) 141 elif k == "by_processor": 142 v = bool(v) 143 except ValueError as e: 144 raise AnnotationException("Annotation fields are not of the right type (%s)" % e) 145 self.__setattr__(k, v) 146 147 # Write to db if anything changed 148 if new_or_updated: 149 self.timestamp = int(time.time()) 150 self.write_to_db() 151 152 
def get_by_id(self, annotation_id: int): 153 """ 154 Get annotation by ID 155 156 :param str annotation_id: ID of annotation. 157 :return: Annotation object, or an empty dict if the ID doesn't exist. 158 """ 159 160 try: 161 int(annotation_id) 162 except ValueError: 163 raise AnnotationException("Id '%s' is not valid" % annotation_id) 164 165 data = self.db.fetchone("SELECT * FROM annotations WHERE id = %s" % annotation_id) 166 167 if not data: 168 return {} 169 170 if data["type"] == "checkbox": 171 data["value"] = data["value"].split(",") 172 data["metadata"] = json.loads(data["metadata"]) 173 174 return data 175 176 def get_by_field(self, dataset_key: str, item_id: str, field_id: str) -> dict: 177 """ 178 Get the annotation information via its dataset key, item ID, and field_id. 179 This is always a unique combination. 180 181 :param dataset_key: The key of the dataset this annotation was made for. 182 :param item_id: The ID of the item this annotation was made for. 183 :param field_id: The field ID of the annotation. 184 185 :return data: A dict with data of the retrieved annotation, or an empty dict if it doesn't exist. 186 """ 187 188 data = self.db.fetchone("SELECT * FROM annotations WHERE dataset = %s AND item_id = %s AND field_id = %s", 189 (dataset_key, str(item_id), field_id)) 190 if not data: 191 return {} 192 193 if data["type"] == "checkbox": 194 data["value"] = data["value"].split(",") 195 data["metadata"] = json.loads(data["metadata"]) 196 197 return data 198 199 def write_to_db(self): 200 """ 201 Write an annotation to the database. 
202 """ 203 db_data = self.data 204 205 db_data["timestamp"] = int(time.time()) 206 m = db_data["metadata"] # To avoid circular reference error 207 db_data["metadata"] = json.dumps(m) 208 if db_data["type"] == "checkbox": 209 db_data["value"] = ",".join(db_data["value"]) 210 211 return self.db.upsert("annotations", data=db_data, constraints=["field_id", "dataset", "item_id"]) 212 213 def delete(self): 214 """ 215 Deletes this annotation 216 """ 217 return self.db.delete("annotations", {"id": self.id}) 218 219 220 @staticmethod 221 def get_annotations_for_dataset(db: Database, dataset_key: str, item_id=None) -> list: 222 """ 223 Returns all annotations for a dataset. 224 :param db: Database object. 225 :param str dataset_key: A dataset key. 226 :param str item_id: An optional item ID to only get annotations from one item (i.g. social media post). 227 228 :return list: List with annotations. 229 """ 230 if not dataset_key: 231 return [] 232 233 if item_id: 234 data = db.fetchall( 235 "SELECT * FROM annotations WHERE dataset = %s AND item_id = %s", 236 (dataset_key, str(item_id),) 237 ) 238 else: 239 data = db.fetchall("SELECT * FROM annotations WHERE dataset = %s", (dataset_key,)) 240 if not data: 241 return [] 242 243 for i in range(len(data)): 244 if data[i]["type"] == "checkbox": 245 data[i]["value"] = data[i]["value"].split(",") 246 data[i]["metadata"] = json.loads(data[i]["metadata"]) 247 248 return [Annotation(data=d, db=db) for d in data] 249 250 @staticmethod 251 def delete_many(db: Database, dataset_key=None, annotation_id=None, field_id=None): 252 """ 253 Deletes annotations for an entire dataset or by a list of (field) IDs. 254 255 :param db: Database object. 256 :param str dataset_key: A dataset key. 257 :param li annotation_id: A list or string of unique annotation IDs. 258 :param li field_id: A list or string of IDs for annotation fields. 259 260 :return int: The number of removed records. 
261 """ 262 if not dataset_key and not annotation_id and not field_id: 263 return 0 264 265 where = {} 266 if dataset_key: 267 where["dataset"] = dataset_key 268 if annotation_id: 269 where["id"] = annotation_id 270 if field_id: 271 where["field_id"] = field_id 272 273 return db.delete("annotations", where) 274 275 @staticmethod 276 def update_annotations_via_fields(dataset_key: str, old_fields: dict, new_fields: dict, db: Database) -> int: 277 """ 278 Updates annotations in the annotations table if the input fields 279 themselves have been changed, for instance if a dropdown label is renamed 280 or a field is deleted. 281 282 :param str dataset_key: The dataset key for which fields changed. 283 :param dict old_fields: Old annotation fields. 284 :param dict new_fields: New annotation fields; this should contain not just 285 the additions, but all fields, changed or otherwise. 286 :param db: Database object so we can write. 287 288 :returns int: How many records were affected. 289 """ 290 291 text_fields = ["textarea", "text"] 292 293 # If old and new fields are identical, do nothing. 294 if old_fields == new_fields: 295 return 0 296 297 fields_to_delete = set() # Delete all annotations with this field ID 298 fields_to_update = {} # Update values of annotations with this field ID 299 old_options = {} 300 301 # Loop through the old annotation fields 302 for old_field_id, old_field in old_fields.items(): 303 304 # Delete all annotations of this type if the field is deleted. 305 if old_field_id not in new_fields: 306 fields_to_delete.add(old_field_id) 307 continue 308 309 field_id = old_field_id 310 new_field = new_fields[field_id] 311 312 # If the annotation type has changed, also delete existing annotations, 313 # except between text and textarea, where we can just change the type and keep the text. 
314 if old_field["type"] != new_field["type"]: 315 if old_field["type"] not in text_fields and new_field["type"] not in text_fields: 316 fields_to_delete.add(field_id) 317 continue 318 319 # Loop through all the key/values in the new field settings 320 # and update in case it's different from the old values. 321 update_data = {} 322 for field_key, field_value in new_field.items(): 323 324 # Update if values don't match 325 if field_value != old_field.get(field_key): 326 327 # Special case: option values that are removed/renamed. 328 # Here we may have to change/delete values within the 329 # values column. 330 if field_key == "options": 331 332 new_options = field_value 333 334 # Edge case: delete annotations of this type if all option fields are deleted 335 if not new_options: 336 fields_to_delete.add(field_id) 337 continue 338 339 # Changed options values (e.g. renamed or one field deleted) 340 old_options[old_field_id] = old_field.get("options", {}) 341 options_to_update = {} 342 if old_options[old_field_id] and old_options != new_options: 343 options_to_update = new_options 344 345 if options_to_update: 346 update_data[field_key] = {"options": options_to_update} 347 348 # For all other changes, just overwrite with new data. 
349 else: 350 update_data[field_key] = field_value 351 352 if update_data: 353 fields_to_update[field_id] = update_data 354 355 # Delete annotations 356 if fields_to_delete: 357 Annotation.delete_many(db, field_id=list(fields_to_delete)) 358 359 # Write changes to fields to database 360 count = 0 361 if fields_to_update: 362 for field_id, updates in fields_to_update.items(): 363 364 # Write to db 365 for column, update_value in updates.items(): 366 367 update_value_insert = update_value 368 if column == "options": 369 update_value_insert = ",".join(list(update_value["options"].values())) 370 371 # Change values of columns 372 updates = db.update("annotations", {column: update_value_insert}, 373 where={"dataset": dataset_key, "field_id": field_id}) 374 count += updates 375 376 # Special case: Changed option labels. 377 # Here we have to also rename/remove inserted options from the `value` column. 378 if column == "options": 379 380 annotations = db.fetchall("SELECT id, options, value FROM annotations " 381 "WHERE dataset = '%s' and field_id = '%s' AND value != '';" 382 % (dataset_key, field_id)) 383 384 for annotation in annotations: 385 annotation_id = annotation["id"] 386 annotation_values = annotation["value"].split(",") 387 388 # Remove or rename options 389 new_values = [] 390 new_options = update_value["options"] # Dict with option id->label as items 391 392 for ann_value in annotation_values: 393 # Get the option ID, so we can see if it's new, deleted, or renamed. 394 # Should always be present in old options dict 395 option_id = [k for k, v in old_options[field_id].items() if v == ann_value][0] 396 # Deleted... 
397 if option_id not in new_options: 398 continue 399 # Or replaced with a new, possibly renamed value 400 else: 401 new_values.append(new_options[option_id]) 402 403 new_values = ",".join(new_values) 404 db.update("annotations", {"value": new_values}, where={"id": annotation_id}) 405 406 return count 407 408 def __getattr__(self, attr): 409 """ 410 Getter so we don't have to use .data all the time 411 412 :param attr: Data key to get 413 :return: Value 414 """ 415 416 if attr in dir(self): 417 # an explicitly defined attribute should always be called in favour 418 # of this passthrough 419 attribute = getattr(self, attr) 420 return attribute 421 elif attr in self.data: 422 return self.data[attr] 423 else: 424 raise AttributeError("Annotation instance has no attribute %s" % attr) 425 426 def __setattr__(self, attr, value): 427 """ 428 Setter so we can flexibly update the database 429 430 Also updates internal data stores (.data etc.). If the attribute is 431 unknown, it is stored within the 'metadata' attribute. 432 433 :param str attr: Attribute to update 434 :param value: New value 435 """ 436 437 # don't override behaviour for *actual* class attributes 438 if attr in dir(self): 439 super().__setattr__(attr, value) 440 return 441 442 if attr not in self.data: 443 self.metadata[attr] = value 444 attr = "metadata" 445 value = self.metadata 446 447 if attr == "metadata": 448 value = json.dumps(value) 449 450 self.db.update("annotations", where={"id": self.id}, data={attr: value}) 451 452 self.data[attr] = value 453 if attr == "metadata": 454 self.metadata = json.loads(value)
Annotation class
Annotations are always tied to a dataset, a dataset item (e.g. a csv row), an annotation label, and a type ('text', 'multichoice', etc.).
def __init__(self, data=None, annotation_id=None, db=None):
    """
    Instantiate annotation object.

    Either loads an existing annotation (by ID, or by the combination of
    dataset, item ID and field ID) and merges `data` into it, or creates a
    new annotation from `data`. Anything new or changed is written to the
    database.

    :param data:  Annotation data; should correspond to the annotations table record.
    :param annotation_id:  The ID of an annotation. If given, it retrieves the annotation
    from the database. An `id` key in `data` takes precedence over this argument.
    :param db:  Database connection object

    :raises AnnotationException:  If neither a valid `data` dict nor an ID is
    given, if `db` is missing, if the ID does not exist, if a required field
    is missing, or if a field cannot be cast to the right type.
    """

    required_fields = ["field_id", "item_id", "dataset"]

    # Must have an ID or data
    if (annotation_id is None and data is None) or (data is not None and not isinstance(data, dict)):
        raise AnnotationException("Annotation() requires either a valid `data` dictionary or ID.")

    if not db:
        raise AnnotationException("Annotation() needs a `db` database object")

    self.db = db

    new_or_updated = False

    # Get the annotation data if the ID is given; if an annotation has
    # an ID, it is guaranteed to be in the database.
    # IDs can both be explicitly given or present in the data dict.
    if annotation_id is not None or "id" in data:
        if data and "id" in data:
            annotation_id = data["id"]
        self.id = annotation_id  # IDs correspond to unique serial numbers in the database.
        current = self.get_by_id(annotation_id)
        if not current:
            # Report the ID that was actually looked up (not the `id` builtin)
            raise AnnotationException(
                "Annotation() requires a valid ID for an existing annotation, %s given" % annotation_id)

    # If an ID is not given, get or create an Annotation object from its data.
    # First check if required fields are present in `data`.
    else:
        for required_field in required_fields:
            if required_field not in data or not data[required_field]:
                raise AnnotationException("Annotation() requires a %s field" % required_field)

        # Check if this annotation already exists, based on dataset key, item id, and field id.
        current = self.get_by_field(data["dataset"], data["item_id"], data["field_id"])

    # If we were able to retrieve an annotation from the db, it already exists
    if current:
        # Check if we have to overwrite old data with new data
        if data:
            for key, value in data.items():
                # Save unknown fields in metadata
                if key not in current:
                    current["metadata"][key] = value
                    new_or_updated = True
                # If values differ, update the value
                elif current[key] != value:
                    current[key] = value
                    new_or_updated = True

        self.data = current

    # If this is a new annotation, set all the properties.
    else:
        # A label is only needed when creating a record; fail with the
        # class's own exception type instead of a bare KeyError.
        if "label" not in data:
            raise AnnotationException("Annotation() requires a label field")

        # Keep track of when the annotation was made
        created_timestamp = int(time.time())

        self.data = {
            "dataset": data["dataset"],
            "item_id": data["item_id"],
            "field_id": data["field_id"],
            "timestamp": created_timestamp,
            "timestamp_created": created_timestamp,
            "label": data["label"],
            "type": data.get("type", "text"),
            "options": data.get("options", ""),
            "value": data.get("value", ""),
            "author": data.get("author", ""),
            "author_original": data.get("author", ""),
            "by_processor": data.get("by_processor", False),
            "from_dataset": data.get("from_dataset", ""),
            "metadata": data.get("metadata", {}),
        }
        new_or_updated = True

    # Metadata may arrive as a JSON string. Decode it inside `self.data` so
    # that the attribute loop below exposes the decoded value as
    # `self.metadata`, rather than overwriting it with the raw string.
    if isinstance(self.data["metadata"], str):
        try:
            self.data["metadata"] = json.loads(self.data["metadata"])
        except (TypeError, json.JSONDecodeError):
            self.data["metadata"] = {}

    for k, v in self.data.items():
        # Some type checking
        try:
            if k == "timestamp" or k == "timestamp_created":
                v = int(v)
            elif k == "by_processor":
                v = bool(v)
        except (TypeError, ValueError) as e:
            # int(None) raises TypeError, int("abc") raises ValueError
            raise AnnotationException("Annotation fields are not of the right type (%s)" % e)
        self.__setattr__(k, v)

    # Write to db if anything changed
    if new_or_updated:
        self.timestamp = int(time.time())
        self.write_to_db()
Instantiate annotation object.
Parameters
- data: Annotation data; should correspond to the annotations table record.
- annotation_id: The ID of an annotation. If given, it retrieves the annotation from the database.
- db: Database connection object
def get_by_id(self, annotation_id: int):
    """
    Get annotation by ID

    :param annotation_id:  ID of annotation; an integer or anything `int()`
    accepts. It is coerced to an integer before it is used in the query.
    :return:  A dict with the annotation's data, or an empty dict if the ID
    doesn't exist. For `checkbox` annotations `value` is split into a list;
    `metadata` is decoded from JSON.

    :raises AnnotationException:  If the ID cannot be interpreted as an integer.
    """

    try:
        annotation_id = int(annotation_id)
    except (TypeError, ValueError):
        raise AnnotationException("Id '%s' is not valid" % annotation_id)

    # Pass the ID as a query parameter (as `get_by_field` does) instead of
    # formatting it into the SQL string.
    data = self.db.fetchone("SELECT * FROM annotations WHERE id = %s", (annotation_id,))

    if not data:
        return {}

    if data["type"] == "checkbox":
        data["value"] = data["value"].split(",")
    data["metadata"] = json.loads(data["metadata"])

    return data
Get annotation by ID
Parameters
- str annotation_id: ID of annotation.
Returns
A dict with the annotation's data, or an empty dict if the ID doesn't exist.
def get_by_field(self, dataset_key: str, item_id: str, field_id: str) -> dict:
    """
    Look up one annotation through its dataset key, item ID and field ID.
    Together these three identify a single annotation.

    :param dataset_key:  Key of the dataset the annotation belongs to.
    :param item_id:  ID of the annotated item; cast to a string for the query.
    :param field_id:  ID of the annotation field.

    :return dict:  The annotation's data, or an empty dict when no record
    matches. `value` is split on commas for `checkbox` annotations and
    `metadata` is decoded from JSON.
    """

    query = "SELECT * FROM annotations WHERE dataset = %s AND item_id = %s AND field_id = %s"
    record = self.db.fetchone(query, (dataset_key, str(item_id), field_id))

    if record:
        # Checkbox values are stored as one comma-separated string
        if record["type"] == "checkbox":
            record["value"] = record["value"].split(",")
        record["metadata"] = json.loads(record["metadata"])
        return record

    return {}
Get the annotation information via its dataset key, item ID, and field_id. This is always a unique combination.
Parameters
- dataset_key: The key of the dataset this annotation was made for.
- item_id: The ID of the item this annotation was made for.
- field_id: The field ID of the annotation.
Returns
A dict with data of the retrieved annotation, or an empty dict if it doesn't exist.
def write_to_db(self):
    """
    Write an annotation to the database.

    Upserts `self.data` into the `annotations` table, using the combination
    of field ID, dataset and item ID as the constraint. The `timestamp` in
    `self.data` is refreshed. Serialisation (metadata to JSON, checkbox
    values to a comma-separated string) is applied to a copy, so `self.data`
    keeps its in-memory types and the method can be called repeatedly.

    :return:  Whatever the database object's `upsert()` returns.
    """
    self.data["timestamp"] = int(time.time())

    # Work on a shallow copy: serialising in place would leave JSON/strings
    # in `self.data`, and a second write would then double-encode the
    # metadata and join the checkbox value character by character.
    db_data = dict(self.data)

    # A string here is metadata that has already been serialised
    if not isinstance(db_data["metadata"], str):
        db_data["metadata"] = json.dumps(db_data["metadata"])

    if db_data["type"] == "checkbox" and not isinstance(db_data["value"], str):
        db_data["value"] = ",".join(db_data["value"])

    return self.db.upsert("annotations", data=db_data, constraints=["field_id", "dataset", "item_id"])
Write an annotation to the database.
def delete(self):
    """
    Remove this annotation's record from the `annotations` table.

    :return:  Whatever the database object's `delete()` returns.
    """
    criteria = {"id": self.id}
    return self.db.delete("annotations", criteria)
Deletes this annotation
220 @staticmethod 221 def get_annotations_for_dataset(db: Database, dataset_key: str, item_id=None) -> list: 222 """ 223 Returns all annotations for a dataset. 224 :param db: Database object. 225 :param str dataset_key: A dataset key. 226 :param str item_id: An optional item ID to only get annotations from one item (i.g. social media post). 227 228 :return list: List with annotations. 229 """ 230 if not dataset_key: 231 return [] 232 233 if item_id: 234 data = db.fetchall( 235 "SELECT * FROM annotations WHERE dataset = %s AND item_id = %s", 236 (dataset_key, str(item_id),) 237 ) 238 else: 239 data = db.fetchall("SELECT * FROM annotations WHERE dataset = %s", (dataset_key,)) 240 if not data: 241 return [] 242 243 for i in range(len(data)): 244 if data[i]["type"] == "checkbox": 245 data[i]["value"] = data[i]["value"].split(",") 246 data[i]["metadata"] = json.loads(data[i]["metadata"]) 247 248 return [Annotation(data=d, db=db) for d in data]
Returns all annotations for a dataset.
Parameters
- db: Database object.
- str dataset_key: A dataset key.
- str item_id: An optional item ID to only get annotations for one item (e.g. a social media post).
Returns
List with annotations.
250 @staticmethod 251 def delete_many(db: Database, dataset_key=None, annotation_id=None, field_id=None): 252 """ 253 Deletes annotations for an entire dataset or by a list of (field) IDs. 254 255 :param db: Database object. 256 :param str dataset_key: A dataset key. 257 :param li annotation_id: A list or string of unique annotation IDs. 258 :param li field_id: A list or string of IDs for annotation fields. 259 260 :return int: The number of removed records. 261 """ 262 if not dataset_key and not annotation_id and not field_id: 263 return 0 264 265 where = {} 266 if dataset_key: 267 where["dataset"] = dataset_key 268 if annotation_id: 269 where["id"] = annotation_id 270 if field_id: 271 where["field_id"] = field_id 272 273 return db.delete("annotations", where)
Deletes annotations for an entire dataset or by a list of (field) IDs.
Parameters
- db: Database object.
- str dataset_key: A dataset key.
- list annotation_id: A list or string of unique annotation IDs.
- list field_id: A list or string of IDs for annotation fields.
Returns
The number of removed records.
275 @staticmethod 276 def update_annotations_via_fields(dataset_key: str, old_fields: dict, new_fields: dict, db: Database) -> int: 277 """ 278 Updates annotations in the annotations table if the input fields 279 themselves have been changed, for instance if a dropdown label is renamed 280 or a field is deleted. 281 282 :param str dataset_key: The dataset key for which fields changed. 283 :param dict old_fields: Old annotation fields. 284 :param dict new_fields: New annotation fields; this should contain not just 285 the additions, but all fields, changed or otherwise. 286 :param db: Database object so we can write. 287 288 :returns int: How many records were affected. 289 """ 290 291 text_fields = ["textarea", "text"] 292 293 # If old and new fields are identical, do nothing. 294 if old_fields == new_fields: 295 return 0 296 297 fields_to_delete = set() # Delete all annotations with this field ID 298 fields_to_update = {} # Update values of annotations with this field ID 299 old_options = {} 300 301 # Loop through the old annotation fields 302 for old_field_id, old_field in old_fields.items(): 303 304 # Delete all annotations of this type if the field is deleted. 305 if old_field_id not in new_fields: 306 fields_to_delete.add(old_field_id) 307 continue 308 309 field_id = old_field_id 310 new_field = new_fields[field_id] 311 312 # If the annotation type has changed, also delete existing annotations, 313 # except between text and textarea, where we can just change the type and keep the text. 314 if old_field["type"] != new_field["type"]: 315 if old_field["type"] not in text_fields and new_field["type"] not in text_fields: 316 fields_to_delete.add(field_id) 317 continue 318 319 # Loop through all the key/values in the new field settings 320 # and update in case it's different from the old values. 
321 update_data = {} 322 for field_key, field_value in new_field.items(): 323 324 # Update if values don't match 325 if field_value != old_field.get(field_key): 326 327 # Special case: option values that are removed/renamed. 328 # Here we may have to change/delete values within the 329 # values column. 330 if field_key == "options": 331 332 new_options = field_value 333 334 # Edge case: delete annotations of this type if all option fields are deleted 335 if not new_options: 336 fields_to_delete.add(field_id) 337 continue 338 339 # Changed options values (e.g. renamed or one field deleted) 340 old_options[old_field_id] = old_field.get("options", {}) 341 options_to_update = {} 342 if old_options[old_field_id] and old_options != new_options: 343 options_to_update = new_options 344 345 if options_to_update: 346 update_data[field_key] = {"options": options_to_update} 347 348 # For all other changes, just overwrite with new data. 349 else: 350 update_data[field_key] = field_value 351 352 if update_data: 353 fields_to_update[field_id] = update_data 354 355 # Delete annotations 356 if fields_to_delete: 357 Annotation.delete_many(db, field_id=list(fields_to_delete)) 358 359 # Write changes to fields to database 360 count = 0 361 if fields_to_update: 362 for field_id, updates in fields_to_update.items(): 363 364 # Write to db 365 for column, update_value in updates.items(): 366 367 update_value_insert = update_value 368 if column == "options": 369 update_value_insert = ",".join(list(update_value["options"].values())) 370 371 # Change values of columns 372 updates = db.update("annotations", {column: update_value_insert}, 373 where={"dataset": dataset_key, "field_id": field_id}) 374 count += updates 375 376 # Special case: Changed option labels. 377 # Here we have to also rename/remove inserted options from the `value` column. 
378 if column == "options": 379 380 annotations = db.fetchall("SELECT id, options, value FROM annotations " 381 "WHERE dataset = '%s' and field_id = '%s' AND value != '';" 382 % (dataset_key, field_id)) 383 384 for annotation in annotations: 385 annotation_id = annotation["id"] 386 annotation_values = annotation["value"].split(",") 387 388 # Remove or rename options 389 new_values = [] 390 new_options = update_value["options"] # Dict with option id->label as items 391 392 for ann_value in annotation_values: 393 # Get the option ID, so we can see if it's new, deleted, or renamed. 394 # Should always be present in old options dict 395 option_id = [k for k, v in old_options[field_id].items() if v == ann_value][0] 396 # Deleted... 397 if option_id not in new_options: 398 continue 399 # Or replaced with a new, possibly renamed value 400 else: 401 new_values.append(new_options[option_id]) 402 403 new_values = ",".join(new_values) 404 db.update("annotations", {"value": new_values}, where={"id": annotation_id}) 405 406 return count
Updates annotations in the annotations table if the input fields themselves have been changed, for instance if a dropdown label is renamed or a field is deleted.
Parameters
- str dataset_key: The dataset key for which fields changed.
- dict old_fields: Old annotation fields.
- dict new_fields: New annotation fields; this should contain not just the additions, but all fields, changed or otherwise.
- db: Database object so we can write.
Returns
How many records were affected.