Source code for dynamicannotationdb.annotation_client

from dynamicannotationdb.interface import DynamicAnnotationInterface
from dynamicannotationdb.errors import AnnotationInsertLimitExceeded, \
                                      NoAnnotationsFoundWithID, \
                                      UpdateAnnotationError
from dynamicannotationdb.models import AnnoMetadata
from emannotationschemas import get_flat_schema
from marshmallow import INCLUDE
from sqlalchemy.exc import ArgumentError, InvalidRequestError, OperationalError, IntegrityError
from sqlalchemy.orm.exc import NoResultFound

from typing import List
import datetime
import logging
import json


[docs]class DynamicAnnotationClient(DynamicAnnotationInterface): def __init__(self, aligned_volume, sql_base_uri): sql_uri = self.create_or_select_database(aligned_volume, sql_base_uri) super().__init__(sql_uri) self.aligned_volume = aligned_volume self._table = None self._cached_schemas = {} @property def table(self): return self._table
[docs] def load_table(self, table_name: str): """Load a table Parameters ---------- table_name : str name of table Returns ------- DeclarativeMeta the sqlalchemy table of that name """ self._table = self._cached_table(table_name) return self._table
[docs] def create_table(self, table_name: str, schema_type: str, description: str, user_id: str, reference_table: str = None, flat_segmentation_source: str = None): r"""Create a new annotation table Parameters ---------- table_name : str name of new table schema_type : str type of schema for that table description: str a string with a human readable explanation of what is in the table. Including who made it and any information that helps interpret the fields of the annotations. user_id: str user id for this table reference_table: str reference table name, if required by this schema flat_segmentation_source: str a path to a segmentation source associated with this table i.e. 'precomputed:\\gs:\\my_synapse_seg\example1' Returns ------- str description of table at creation time """ # TODO: check that schemas that are reference schemas # have a reference_table in their metadata return self.create_annotation_table(table_name, schema_type, description, user_id, reference_table=reference_table, flat_segmentation_source=flat_segmentation_source)
[docs] def delete_table(self, table_name: str) -> bool: """Marks a table for deletion, which will remove it from user visible calls and stop materialization from happening on this table only updates metadata to reflect deleted timestamp. Parameters ---------- table_name : str name of table to mark for deletion Returns ------- bool whether table was successfully deleted """ metadata = self.cached_session.query(AnnoMetadata). \ filter(AnnoMetadata.table_name == table_name).first() metadata.deleted = datetime.datetime.now() self.commit_session() return True
[docs] def drop_table(self, table_name: str) -> bool: """Drop a table, actually removes it from the database along with segmentation tables associated with it Parameters ---------- table_name : str name of table to drop Returns ------- bool whether drop was successful """ return self._drop_table(table_name)
[docs] def insert_annotations(self, table_name: str, annotations: List[dict]): """Insert some annotations. Parameters ---------- table_name : str name of target table to insert annotations annotations : list of dict a list of dicts with the annotations that meet the schema Returns ------- bool True is successfully inserted annotations Raises ------ AnnotationInsertLimitExceeded Exception raised when amount of annotations exceeds defined limit. """ insertion_limit = 10_000 if len(annotations) > insertion_limit: raise AnnotationInsertLimitExceeded(len(annotations), insertion_limit) schema_type = self.get_table_schema(table_name) AnnotationModel = self._cached_table(table_name) formatted_anno_data = [] for annotation in annotations: annotation_data, __ = self._get_flattened_schema_data(schema_type, annotation) if annotation.get('id'): annotation_data['id'] = annotation['id'] annotation_data['created'] = datetime.datetime.now() annotation_data['valid'] = True formatted_anno_data.append(annotation_data) annos = [AnnotationModel(**annotation_data) for annotation_data in formatted_anno_data] self.cached_session.add_all(annos) self.commit_session() return True
[docs] def get_annotations(self, table_name: str, annotation_ids: List[int]) -> List[dict]: """Get a set of annotations by ID Parameters ---------- table_name : str name of table annotation_ids : List[int] list of annotation ids to get Returns ------- List[dict] list of returned annotations """ AnnotationModel = self._cached_table(table_name) annotations = self.cached_session.query(AnnotationModel). \ filter(AnnotationModel.id.in_([x for x in annotation_ids])).all() schema_type = self.get_table_schema(table_name) anno_schema, __ = self._get_flattened_schema(schema_type) schema = anno_schema(unknown=INCLUDE) try: data = [] for anno in annotations: anno_data = anno.__dict__ anno_data['created'] = str(anno_data.get('created')) anno_data['deleted'] = str(anno_data.get('deleted')) anno_data.pop('_sa_instance_state', None) data.append(anno_data) return schema.load(data, many=True) except Exception as e: logging.exception(e) raise NoAnnotationsFoundWithID(annotation_ids)
[docs] def update_annotation(self, table_name: str, annotation: dict): """Update an annotation Parameters ---------- table_name : str name of targeted table to update annotations annotation : dict new data for that annotation Returns ------- [type] [description] Raises ------ TODO: make this raise an exception rather than return strings """ anno_id = annotation.get('id') if not anno_id: return "Annotation requires an 'id' to update targeted row" schema_type = self.get_table_schema(table_name) AnnotationModel = self._cached_table(table_name) new_annotation, __ = self._get_flattened_schema_data(schema_type, annotation) new_annotation['created'] = datetime.datetime.now() new_annotation['valid'] = True new_data = AnnotationModel(**new_annotation) try: old_anno = self.cached_session.query(AnnotationModel).filter(AnnotationModel.id == anno_id).one() if old_anno.superceded_id: raise UpdateAnnotationError(anno_id, old_anno.superceded_id) self.cached_session.add(new_data) self.cached_session.flush() deleted_time = datetime.datetime.now() old_anno.deleted = deleted_time old_anno.superceded_id = new_data.id old_anno.valid = False self.commit_session() return f"id {anno_id} updated" except NoResultFound as e: return f"No result found for {anno_id}. Error: {e}"
[docs] def delete_annotation(self, table_name: str, annotation_ids: List[int]): """Delete annotations by ids Parameters ---------- table_name : str name of table to delete from annotation_ids : List[int] list of ids to delete Returns ------- Raises ------ """ AnnotationModel = self._cached_table(table_name) annotations = self.cached_session.query(AnnotationModel).filter(AnnotationModel.id.in_(annotation_ids)).all() if annotations: deleted_time = datetime.datetime.now() for annotation in annotations: annotation.deleted = deleted_time annotation.valid = False self.commit_session() else: return None return True