Source code for encord.objects.ontology_labels_impl

from __future__ import annotations

from collections import defaultdict
from copy import deepcopy
from dataclasses import asdict, dataclass, field
from datetime import datetime
from enum import IntEnum
from typing import (
    Any,
    Dict,
    Iterable,
    List,
    NoReturn,
    Optional,
    Sequence,
    Set,
    Tuple,
    Type,
    TypeVar,
    Union,
    overload,
)
from uuid import uuid4

from dateutil.parser import parse

from encord.client import EncordClientProject
from encord.constants.enums import DataType
from encord.exceptions import LabelRowError, OntologyError
from encord.objects.common import (
    Attribute,
    AttributeClasses,
    AttributeTypes,
    ChecklistAttribute,
    FlatOption,
    NestableOption,
    Option,
    OptionClasses,
    OptionTypes,
    RadioAttribute,
    Shape,
    TextAttribute,
    _add_attribute,
    _get_attribute_by_hash,
    _get_attributes_by_title,
    _get_option_by_hash,
    _handle_wrong_number_of_found_items,
    attribute_from_dict,
    attributes_to_list_dict,
)
from encord.objects.constants import (
    DATETIME_LONG_STRING_FORMAT,
    DEFAULT_CONFIDENCE,
    DEFAULT_MANUAL_ANNOTATION,
)
from encord.objects.coordinates import (
    ACCEPTABLE_COORDINATES_FOR_ONTOLOGY_ITEMS,
    BoundingBoxCoordinates,
    Coordinates,
    PointCoordinate,
    PolygonCoordinates,
    PolylineCoordinates,
    RotatableBoundingBoxCoordinates,
)
from encord.objects.frames import (
    Frames,
    Range,
    Ranges,
    frames_class_to_frames_list,
    frames_to_ranges,
    ranges_list_to_ranges,
)
from encord.objects.internal_helpers import (
    Answer,
    _get_static_answer_map,
    _infer_attribute_from_answer,
    _search_child_attributes,
    get_answer_from_object,
    get_default_answer_from_attribute,
    set_answer_for_object,
)
from encord.objects.utils import (
    _lower_snake_case,
    check_email,
    check_type,
    does_type_match,
    filter_by_type,
    short_uuid_str,
)
from encord.orm.formatter import Formatter
from encord.orm.label_row import AnnotationTaskStatus, LabelRowMetadata, LabelStatus


@dataclass
class Object:
    """
    This class is currently in BETA. Its API might change in future minor version releases.

    An object node of the ontology: a named, coloured shape carrying an optional list of attributes.
    """

    uid: int
    name: str
    color: str
    shape: Shape
    feature_node_hash: str
    attributes: List[Attribute] = field(default_factory=list)

    def create_instance(self) -> ObjectInstance:
        """Create a :class:`encord.objects.ObjectInstance` to be used with a label row."""
        return ObjectInstance(self)

    def get_child_by_hash(
        self,
        feature_node_hash: str,
        type_: Union[AttributeTypes, OptionTypes, None] = None,
    ) -> Union[AttributeClasses, OptionClasses]:
        """
        Look up the first child node of this object whose feature node hash matches.

        A valid ontology never has two children sharing a feature node hash, so the first match is
        the only match.

        Args:
            feature_node_hash: the feature_node_hash of the child node to search for in the ontology.
            type_: The expected type of the item. The found child is passed through `check_type` together
                with this value.

        Raises:
            OntologyError: if no child with the given hash exists.
        """
        match = _get_attribute_by_hash(feature_node_hash, self.attributes)
        if match is not None:
            check_type(match, type_)
            return match
        raise OntologyError("Item not found.")

    def get_child_by_title(
        self,
        title: str,
        type_: Union[AttributeTypes, OptionTypes, None] = None,
    ) -> Union[AttributeClasses, OptionClasses]:
        """
        Look up the single child node with the given title (and type, if specified).

        The candidates returned by :meth:`get_children_by_title` are handed to
        `_handle_wrong_number_of_found_items` before the first one is returned.

        Args:
            title: The exact title of the child node to search for in the ontology.
            type_: The expected type of the child node. Only a node that matches this type will be returned.
        """
        candidates = self.get_children_by_title(title, type_)
        _handle_wrong_number_of_found_items(candidates, title, type_)
        return candidates[0]

    def get_children_by_title(
        self,
        title: str,
        type_: Union[AttributeTypes, OptionTypes, None] = None,
    ) -> List[Union[AttributeClasses, OptionClasses]]:
        """
        Collect every child node with the given title (and type, if specified).

        Titles in ontologies do not need to be unique, however, unique titles are recommended when
        creating ontologies.

        Args:
            title: The exact title of the child node to search for in the ontology.
            type_: The expected type of the item. Only nodes that match this type will be returned.
        """
        return filter_by_type(_get_attributes_by_title(title, self.attributes), type_)  # noqa

    @classmethod
    def from_dict(cls, d: dict) -> Object:
        """
        Build an :class:`Object` from its dictionary representation.

        Reads the keys `shape`, `id`, `name`, `color`, `featureNodeHash` and, when present, `attributes`.

        Raises:
            TypeError: if `Shape.from_string` does not recognise the `shape` value.
        """
        parsed_shape = Shape.from_string(d["shape"])
        if parsed_shape is None:
            raise TypeError(f"The shape '{d['shape']}' of the object '{d}' is not recognised")

        # The "attributes" key is optional; a missing key means no attributes.
        raw_attributes = d["attributes"] if "attributes" in d else []
        parsed_attributes: List[Attribute] = [attribute_from_dict(raw) for raw in raw_attributes]

        return Object(
            uid=int(d["id"]),
            name=d["name"],
            color=d["color"],
            shape=parsed_shape,
            feature_node_hash=d["featureNodeHash"],
            attributes=parsed_attributes,
        )

    def to_dict(self) -> dict:
        """
        Serialise this object to a dictionary.

        The `attributes` key is only emitted when the serialised attribute list is non-empty.
        """
        serialised = {
            "id": str(self.uid),
            "name": self.name,
            "color": self.color,
            "shape": self.shape.value,
            "featureNodeHash": self.feature_node_hash,
        }

        serialised_attributes = attributes_to_list_dict(self.attributes)
        if serialised_attributes:
            serialised["attributes"] = serialised_attributes

        return serialised

    T = TypeVar("T", bound=Attribute)

    def add_attribute(
        self,
        cls: Type[T],
        name: str,
        local_uid: Optional[int] = None,
        feature_node_hash: Optional[str] = None,
        required: bool = False,
        dynamic: bool = False,
    ) -> T:
        """
        Adds an attribute to the object.

        Args:
            cls: attribute type, one of `RadioAttribute`, `ChecklistAttribute`, `TextAttribute`
            name: the user-visible name of the attribute
            local_uid: integer identifier of the attribute. Normally auto-generated;
                    omit this unless the aim is to create an exact clone of existing ontology
            feature_node_hash: global identifier of the attribute. Normally auto-generated;
                    omit this unless the aim is to create an exact clone of existing ontology
            required: whether the label editor would mark this attribute as 'required'
            dynamic: whether the attribute can have a different answer for the same object across different frames.

        Returns:
            the created attribute that can be further specified with Options, where appropriate

        Raises:
            ValueError: if specified `local_uid` or `feature_node_hash` violate uniqueness constraints
        """
        # The new attribute is registered under this object's uid.
        parent_uid_path = [self.uid]
        return _add_attribute(
            self.attributes, cls, name, parent_uid_path, local_uid, feature_node_hash, required, dynamic
        )
@dataclass
class Classification:
    """
    This class is currently in BETA. Its API might change in future minor version releases.

    Represents a whole-image classification as part of Ontology structure. Wraps a single Attribute that describes
    the image in general rather than an individual object.
    """

    uid: int
    feature_node_hash: str
    attributes: List[Attribute]

    def create_instance(self) -> ClassificationInstance:
        """Create a :class:`encord.objects.ClassificationInstance` to be used with a label row."""
        return ClassificationInstance(self)

    def get_child_by_hash(
        self,
        feature_node_hash: str,
        type_: Union[AttributeTypes, OptionTypes, None] = None,
    ) -> Union[AttributeClasses, OptionClasses]:
        """
        Look up the first child node of this classification whose feature node hash matches.

        A valid ontology never has two children sharing a feature node hash, so the first match is
        the only match.

        Args:
            feature_node_hash: the feature_node_hash of the child node to search for in the ontology.
            type_: The expected type of the item. The found child is passed through `check_type` together
                with this value.

        Raises:
            OntologyError: if no child with the given hash exists.
        """
        match = _get_attribute_by_hash(feature_node_hash, self.attributes)
        if match is not None:
            check_type(match, type_)
            return match
        raise OntologyError("Item not found.")

    def get_child_by_title(
        self,
        title: str,
        type_: Union[OptionTypes, AttributeTypes, None] = None,
    ) -> Union[AttributeClasses, OptionClasses]:
        """
        Look up the single child node with the given title (and type, if specified).

        The candidates returned by :meth:`get_children_by_title` are handed to
        `_handle_wrong_number_of_found_items` before the first one is returned.

        Args:
            title: The exact title of the child node to search for in the ontology.
            type_: The expected type of the child node. Only a node that matches this type will be returned.
        """
        candidates = self.get_children_by_title(title, type_)
        _handle_wrong_number_of_found_items(candidates, title, type_)
        return candidates[0]

    def get_children_by_title(
        self,
        title: str,
        type_: Union[OptionTypes, AttributeTypes, None] = None,
    ) -> List[Union[AttributeClasses, OptionClasses]]:
        """
        Collect every child node with the given title (and type, if specified).

        Titles in ontologies do not need to be unique, however, unique titles are recommended when
        creating ontologies.

        Args:
            title: The exact title of the child node to search for in the ontology.
            type_: The expected type of the item. Only nodes that match this type will be returned.
        """
        return filter_by_type(_get_attributes_by_title(title, self.attributes), type_)  # noqa

    @classmethod
    def from_dict(cls, d: dict) -> Classification:
        """
        Build a :class:`Classification` from its dictionary representation.

        Reads the keys `attributes`, `id` and `featureNodeHash`; all three are required.
        """
        parsed_attributes: List[Attribute] = [attribute_from_dict(raw) for raw in d["attributes"]]
        return Classification(
            uid=int(d["id"]),
            feature_node_hash=d["featureNodeHash"],
            attributes=parsed_attributes,
        )

    def to_dict(self) -> dict:
        """
        Serialise this classification to a dictionary.

        The `attributes` key is only emitted when the serialised attribute list is non-empty.
        """
        serialised = {
            "id": str(self.uid),
            "featureNodeHash": self.feature_node_hash,
        }

        serialised_attributes = attributes_to_list_dict(self.attributes)
        if serialised_attributes:
            serialised["attributes"] = serialised_attributes

        return serialised

    T = TypeVar("T", bound=Attribute)

    def add_attribute(
        self,
        cls: Type[T],
        name: str,
        local_uid: Optional[int] = None,
        feature_node_hash: Optional[str] = None,
        required: bool = False,
    ) -> T:
        """
        Adds an attribute to the classification.

        Args:
            cls: attribute type, one of `RadioAttribute`, `ChecklistAttribute`, `TextAttribute`
            name: the user-visible name of the attribute
            local_uid: integer identifier of the attribute. Normally auto-generated;
                    omit this unless the aim is to create an exact clone of existing ontology
            feature_node_hash: global identifier of the attribute. Normally auto-generated;
                    omit this unless the aim is to create an exact clone of existing ontology
            required: whether the label editor would mark this attribute as 'required'

        Returns:
            the created attribute that can be further specified with Options, where appropriate

        Raises:
            ValueError: if the classification already has an attribute assigned
        """
        # A classification wraps a single root attribute, so refuse to add a second one.
        if self.attributes:
            raise ValueError("Classification should have exactly one root attribute")

        parent_uid_path = [self.uid]
        return _add_attribute(self.attributes, cls, name, parent_uid_path, local_uid, feature_node_hash, required)

    def __hash__(self):
        # Hash on the feature node hash only, which lets instances be used as dictionary keys.
        return hash(self.feature_node_hash)
# Alias for "one of the two ontology node classes" (the class objects themselves, not instances).
OntologyTypes = Union[Type[Object], Type[Classification]]
# Alias for "an instance of one of the two ontology node classes".
OntologyClasses = Union[Object, Classification]
class ClassificationInstance:
    """
    A single instance of an ontology :class:`Classification`.

    It tracks on which frames the classification is present (together with per-frame metadata) and which
    answers have been given to the attributes of the underlying ontology classification.
    """

    def __init__(self, ontology_classification: Classification, *, classification_hash: Optional[str] = None):
        """
        Args:
            ontology_classification: The ontology classification this instance belongs to.
            classification_hash: Optional hash to use for this instance. A new short uuid is generated if omitted.
        """
        self._ontology_classification = ontology_classification
        self._parent: Optional[LabelRowV2] = None
        self._classification_hash = classification_hash or short_uuid_str()

        self._static_answer_map: Dict[str, Answer] = _get_static_answer_map(self._ontology_classification.attributes)
        # feature_node_hash of attribute to the answer.

        self._frames_to_data: Dict[int, ClassificationInstance.FrameData] = defaultdict(self.FrameData)

    @property
    def classification_hash(self) -> str:
        return self._classification_hash

    @classification_hash.setter
    def classification_hash(self, v: Any) -> NoReturn:
        raise LabelRowError("Cannot set the object hash on an instantiated label object.")

    @property
    def ontology_item(self) -> Classification:
        """A deep copy of the underlying ontology classification."""
        return deepcopy(self._ontology_classification)

    @property
    def _last_frame(self) -> Union[int, float]:
        # Without a parent label row, or for DICOM data, no upper frame bound is enforced.
        if self._parent is None or self._parent.data_type is DataType.DICOM:
            return float("inf")
        else:
            return self._parent.number_of_frames

    def is_assigned_to_label_row(self) -> bool:
        """Whether this instance has a parent label row set."""
        return self._parent is not None

    def set_for_frames(
        self,
        frames: Frames = 0,
        *,
        overwrite: bool = False,
        created_at: Optional[datetime] = None,
        created_by: Optional[str] = None,
        confidence: float = DEFAULT_CONFIDENCE,
        manual_annotation: bool = DEFAULT_MANUAL_ANNOTATION,
        last_edited_at: Optional[datetime] = None,
        last_edited_by: Optional[str] = None,
        reviews: Optional[List[dict]] = None,
    ) -> None:
        """
        Places the classification onto the specified frame. If the classification already exists on the frame and
        overwrite is set to `True`, the currently specified values will be overwritten.

        Args:
            frames:
                The frame to add the classification instance to. Defaulting to the first frame for convenience.
            overwrite:
                If `True`, overwrite existing data for the given frames. This will not reset all the
                non-specified values. If `False` and data already exists for the given frames,
                raises an error.
            created_at:
                Optionally specify the creation time of the classification instance on this frame. Defaults to
                `datetime.now()`, evaluated at call time.
            created_by:
                Optionally specify the creator of the classification instance on this frame. Defaults to the
                current SDK user.
            last_edited_at:
                Optionally specify the last edit time of the classification instance on this frame. Defaults to
                `datetime.now()`, evaluated at call time.
            last_edited_by:
                Optionally specify the last editor of the classification instance on this frame. Defaults to the
                current SDK user.
            confidence:
                Optionally specify the confidence of the classification instance on this frame. Defaults to `1.0`.
            manual_annotation:
                Optionally specify whether the classification instance on this frame was manually annotated.
                Defaults to `True`.
            reviews:
                Should only be set by internal functions.

        Raises:
            LabelRowError: if a frame is out of range, if data already exists for a frame and `overwrite` is
                `False`, or if the parent label row already has this classification on one of the frames.
        """
        # Resolve the timestamps per call. A `datetime.now()` default in the signature would be evaluated
        # only once, when the function is defined, and then reused for every call.
        if created_at is None:
            created_at = datetime.now()
        if last_edited_at is None:
            last_edited_at = datetime.now()

        frames_list = frames_class_to_frames_list(frames)

        self._check_classification_already_present(frames_list)

        for frame in frames_list:
            self._check_within_range(frame)
            self._set_frame_and_frame_data(
                frame,
                overwrite=overwrite,
                created_at=created_at,
                created_by=created_by,
                confidence=confidence,
                manual_annotation=manual_annotation,
                last_edited_at=last_edited_at,
                last_edited_by=last_edited_by,
                reviews=reviews,
            )

    def get_annotation(self, frame: Union[int, str] = 0) -> Annotation:
        """
        Args:
            frame: Either the frame number or the image hash if the data type is an image or image group.
                Defaults to the first frame.

        Raises:
            LabelRowError: if an image hash is supplied while this instance is not assigned to a label row,
                since the hash can only be resolved through the parent label row.
        """
        if isinstance(frame, str):
            if self._parent is None:
                raise LabelRowError(
                    "Cannot resolve an image hash to a frame number for a ClassificationInstance that is not "
                    "assigned to a label row. Use the frame number instead."
                )
            frame = self._parent.get_frame_number(frame)
        return self.Annotation(self, frame)

    def remove_from_frames(self, frames: Frames) -> None:
        """
        Remove this classification instance from the given frames.

        Raises:
            KeyError: if the instance is not present on one of the given frames.
        """
        frame_list = frames_class_to_frames_list(frames)
        for frame in frame_list:
            self._frames_to_data.pop(frame)

        # Keep the bookkeeping of the parent label row in sync.
        if self.is_assigned_to_label_row():
            self._parent._remove_frames_from_classification(self.ontology_item, frame_list)
            self._parent._remove_from_frame_to_hashes_map(frame_list, self.classification_hash)

    def get_annotations(self) -> List[Annotation]:
        """
        Returns:
            A list of `ClassificationInstance.Annotation` in order of available frames.
        """
        return [self.get_annotation(frame_num) for frame_num in sorted(self._frames_to_data.keys())]

    def is_valid(self) -> None:
        """
        Check that the instance is on at least one frame.

        Raises:
            LabelRowError: if the instance is not on any frame.
        """
        if not self._frames_to_data:
            raise LabelRowError("ClassificationInstance is not on any frames. Please add it to at least one frame.")

    def set_answer(
        self,
        answer: Union[str, Option, Iterable[Option]],
        attribute: Optional[Attribute] = None,
        overwrite: bool = False,
    ) -> None:
        """
        Set the answer for a given ontology Attribute. This is the equivalent of e.g. selecting a checkbox in the
        UI after adding a ClassificationInstance. There is only one answer per ClassificationInstance per Attribute.

        Args:
            answer: The answer to set.
            attribute: The ontology attribute to set the answer for. If not set, this will be attempted to be
                inferred.  For answers to :class:`encord.objects.common.RadioAttribute` or
                :class:`encord.objects.common.ChecklistAttribute`, this can be inferred automatically. For
                :class:`encord.objects.common.TextAttribute`, this will only be inferred if there is only one
                possible TextAttribute to set for the entire object instance. Otherwise, a
                :class:`encord.exceptions.LabelRowError` will be thrown.
            overwrite: If `True`, the answer will be overwritten if it already exists. If `False`, this will throw
                a LabelRowError if the answer already exists.

        Raises:
            LabelRowError: if the attribute is not a child of the classification, if it is a nested attribute
                whose parents are not all selected, or if it is already answered and `overwrite` is `False`.
        """
        if attribute is None:
            attribute = _infer_attribute_from_answer(self._ontology_classification.attributes, answer)
        elif not self._is_attribute_valid_child_of_classification(attribute):
            raise LabelRowError("The attribute is not a valid child of the classification.")
        elif not self._is_selectable_child_attribute(attribute):
            raise LabelRowError(
                "Setting a nested attribute is only possible if all parent attributes have been selected."
            )

        static_answer = self._static_answer_map[attribute.feature_node_hash]
        if static_answer.is_answered() and overwrite is False:
            raise LabelRowError(
                "The answer to this attribute was already set. Set `overwrite` to `True` if you want to "
                "overwrite an existing answer to an attribute."
            )

        set_answer_for_object(static_answer, answer)

    def set_answer_from_list(self, answers_list: List[Dict[str, Any]]) -> None:
        """
        This is a low level helper function and should not be used directly.

        Sets the answers for the classification from a list of answer dictionaries.

        Args:
            answers_list: The answer dictionaries to set the answers from. Each entry is read via its
                `featureHash` and `answers` keys.

        Raises:
            LabelRowError: if an attribute is unknown to the ontology or is not a child of the classification.
            NotImplementedError: if the attribute is of an unsupported type.
        """
        for answer_dict in answers_list:
            attribute = _get_attribute_by_hash(answer_dict["featureHash"], self._ontology_classification.attributes)
            if attribute is None:
                raise LabelRowError(
                    "One of the attributes does not exist in the ontology. Cannot create a valid LabelRow."
                )

            if not self._is_attribute_valid_child_of_classification(attribute):
                raise LabelRowError(
                    "One of the attributes set for a classification is not a valid child of the classification. "
                    "Cannot create a valid LabelRow."
                )

            if isinstance(attribute, TextAttribute):
                self._set_answer_unsafe(answer_dict["answers"], attribute)
            elif isinstance(attribute, RadioAttribute):
                # A radio attribute has exactly one selected option; only the first answer is read.
                feature_hash = answer_dict["answers"][0]["featureHash"]
                option = _get_option_by_hash(feature_hash, attribute.options)
                self._set_answer_unsafe(option, attribute)
            elif isinstance(attribute, ChecklistAttribute):
                options = [
                    _get_option_by_hash(answer["featureHash"], attribute.options)
                    for answer in answer_dict["answers"]
                ]
                self._set_answer_unsafe(options, attribute)
            else:
                raise NotImplementedError(f"The attribute type {type(attribute)} is not supported.")

    def get_answer(self, attribute: Optional[Attribute] = None) -> Union[str, Option, Iterable[Option], None]:
        """
        Get the answer set for a given ontology Attribute. Returns `None` if the attribute is not yet answered.

        For the ChecklistAttribute, it returns None if and only if
        the attribute is nested and the parent is unselected. Otherwise, if not yet answered it will return an empty
        list.

        Args:
            attribute: The ontology attribute to get the answer for. If not provided, the first level attribute
                is used.

        Raises:
            LabelRowError: if the attribute is not a child of the classification.
        """
        if attribute is None:
            attribute = self._ontology_classification.attributes[0]
        elif not self._is_attribute_valid_child_of_classification(attribute):
            raise LabelRowError("The attribute is not a valid child of the classification.")
        elif not self._is_selectable_child_attribute(attribute):
            return None

        static_answer = self._static_answer_map[attribute.feature_node_hash]

        return get_answer_from_object(static_answer)

    def delete_answer(self, attribute: Optional[Attribute] = None) -> None:
        """
        This resets the answer of an attribute as if it was never set.

        Args:
            attribute: The ontology attribute to delete the answer for. If not provided, the first level attribute
                is used.

        Raises:
            LabelRowError: if the attribute is not a child of the classification.
        """
        if attribute is None:
            attribute = self._ontology_classification.attributes[0]
        elif not self._is_attribute_valid_child_of_classification(attribute):
            raise LabelRowError("The attribute is not a valid child of the classification.")

        static_answer = self._static_answer_map[attribute.feature_node_hash]
        static_answer.unset()

    def copy(self) -> ClassificationInstance:
        """
        Creates an exact copy of this ClassificationInstance but with a new classification hash and without being
        associated to any LabelRowV2. This is useful if you want to add the semantically same
        ClassificationInstance to multiple `LabelRowV2`s.
        """
        ret = ClassificationInstance(self._ontology_classification)
        ret._static_answer_map = deepcopy(self._static_answer_map)
        ret._frames_to_data = deepcopy(self._frames_to_data)
        return ret

    def get_all_static_answers(self) -> List[Answer]:
        """A low level helper function."""
        return list(self._static_answer_map.values())

    class Annotation:
        """
        This class can be used to set or get data for a specific annotation (i.e. the ClassificationInstance for a
        given frame number).
        """

        def __init__(self, classification_instance: ClassificationInstance, frame: int):
            self._classification_instance = classification_instance
            self._frame = frame

        @property
        def frame(self) -> int:
            return self._frame

        @property
        def created_at(self) -> datetime:
            self._check_if_frame_view_valid()
            return self._get_object_frame_instance_data().created_at

        @created_at.setter
        def created_at(self, created_at: datetime) -> None:
            self._check_if_frame_view_valid()
            self._get_object_frame_instance_data().created_at = created_at

        @property
        def created_by(self) -> Optional[str]:
            self._check_if_frame_view_valid()
            return self._get_object_frame_instance_data().created_by

        @created_by.setter
        def created_by(self, created_by: Optional[str]) -> None:
            """
            Set the created_by field with a user email or None if it should default to the current user of the SDK.
            """
            self._check_if_frame_view_valid()
            if created_by is not None:
                check_email(created_by)
            self._get_object_frame_instance_data().created_by = created_by

        @property
        def last_edited_at(self) -> datetime:
            self._check_if_frame_view_valid()
            return self._get_object_frame_instance_data().last_edited_at

        @last_edited_at.setter
        def last_edited_at(self, last_edited_at: datetime) -> None:
            self._check_if_frame_view_valid()
            self._get_object_frame_instance_data().last_edited_at = last_edited_at

        @property
        def last_edited_by(self) -> Optional[str]:
            self._check_if_frame_view_valid()
            return self._get_object_frame_instance_data().last_edited_by

        @last_edited_by.setter
        def last_edited_by(self, last_edited_by: Optional[str]) -> None:
            """
            Set the last_edited_by field with a user email or None if it should default to the current user of
            the SDK.
            """
            self._check_if_frame_view_valid()
            if last_edited_by is not None:
                check_email(last_edited_by)
            self._get_object_frame_instance_data().last_edited_by = last_edited_by

        @property
        def confidence(self) -> float:
            self._check_if_frame_view_valid()
            return self._get_object_frame_instance_data().confidence

        @confidence.setter
        def confidence(self, confidence: float) -> None:
            self._check_if_frame_view_valid()
            self._get_object_frame_instance_data().confidence = confidence

        @property
        def manual_annotation(self) -> bool:
            self._check_if_frame_view_valid()
            return self._get_object_frame_instance_data().manual_annotation

        @manual_annotation.setter
        def manual_annotation(self, manual_annotation: bool) -> None:
            self._check_if_frame_view_valid()
            self._get_object_frame_instance_data().manual_annotation = manual_annotation

        @property
        def reviews(self) -> Optional[List[dict]]:
            """
            A read only property about the reviews that happened for this object on this frame.
            """
            self._check_if_frame_view_valid()
            return self._get_object_frame_instance_data().reviews

        def _check_if_frame_view_valid(self) -> None:
            # The view becomes invalid once the instance has been removed from the frame it points at.
            if self._frame not in self._classification_instance._frames_to_data:
                raise LabelRowError(
                    "Trying to use a ClassificationInstance.Annotation for a ClassificationInstance that is not "
                    "on the frame."
                )

        def _get_object_frame_instance_data(self) -> ClassificationInstance.FrameData:
            return self._classification_instance._frames_to_data[self._frame]

    @dataclass
    class FrameData:
        """Per-frame metadata of a classification instance."""

        # `default_factory` makes each new instance get the current time. A plain `datetime.now()`
        # default would be evaluated once, when the class is created, and shared by all instances.
        created_at: datetime = field(default_factory=datetime.now)
        created_by: Optional[str] = None
        confidence: float = DEFAULT_CONFIDENCE
        manual_annotation: bool = DEFAULT_MANUAL_ANNOTATION
        last_edited_at: datetime = field(default_factory=datetime.now)
        last_edited_by: Optional[str] = None
        reviews: Optional[List[dict]] = None

        @staticmethod
        def from_dict(d: dict) -> ClassificationInstance.FrameData:
            """
            Build frame data from a dictionary.

            `createdAt`, `createdBy`, `confidence` and `manualAnnotation` are required keys; `lastEditedAt`,
            `lastEditedBy` and `reviews` are optional.
            """
            # NOTE: a missing "lastEditedAt" results in `last_edited_at` being explicitly set to None.
            if "lastEditedAt" in d:
                last_edited_at = parse(d["lastEditedAt"])
            else:
                last_edited_at = None

            return ClassificationInstance.FrameData(
                created_at=parse(d["createdAt"]),
                created_by=d["createdBy"],
                confidence=d["confidence"],
                manual_annotation=d["manualAnnotation"],
                last_edited_at=last_edited_at,
                last_edited_by=d.get("lastEditedBy"),
                reviews=d.get("reviews"),
            )

        def update_from_optional_fields(
            self,
            created_at: Optional[datetime] = None,
            created_by: Optional[str] = None,
            confidence: Optional[float] = None,
            manual_annotation: Optional[bool] = None,
            last_edited_at: Optional[datetime] = None,
            last_edited_by: Optional[str] = None,
            reviews: Optional[List[dict]] = None,
        ) -> None:
            """Overwrite only those fields for which a value was supplied; the others keep their current value."""
            self.created_at = created_at or self.created_at
            if created_by is not None:
                self.created_by = created_by
            self.last_edited_at = last_edited_at or self.last_edited_at
            if last_edited_by is not None:
                self.last_edited_by = last_edited_by
            if confidence is not None:
                self.confidence = confidence
            if manual_annotation is not None:
                self.manual_annotation = manual_annotation
            if reviews is not None:
                self.reviews = reviews

    def _set_frame_and_frame_data(
        self,
        frame,
        *,
        overwrite: bool = False,
        created_at: Optional[datetime] = None,
        created_by: Optional[str] = None,
        confidence: Optional[float] = None,
        manual_annotation: Optional[bool] = None,
        last_edited_at: Optional[datetime] = None,
        last_edited_by: Optional[str] = None,
        reviews: Optional[List[dict]] = None,
    ):
        # Use `.get` so the lookup does not create an entry in the underlying defaultdict.
        existing_frame_data = self._frames_to_data.get(frame)

        if overwrite is False and existing_frame_data is not None:
            raise LabelRowError(
                f"Cannot overwrite existing data for frame `{frame}`. Set `overwrite` to `True` to overwrite."
            )

        if existing_frame_data is None:
            existing_frame_data = self.FrameData()
            self._frames_to_data[frame] = existing_frame_data

        existing_frame_data.update_from_optional_fields(
            created_at, created_by, confidence, manual_annotation, last_edited_at, last_edited_by, reviews
        )

        if self.is_assigned_to_label_row():
            self._parent.add_to_single_frame_to_hashes_map(self, frame)

    def _set_answer_unsafe(self, answer: Union[str, Option, Iterable[Option]], attribute: Attribute) -> None:
        # "Unsafe": none of the validity checks of `set_answer` are performed here.
        static_answer = self._static_answer_map[attribute.feature_node_hash]
        set_answer_for_object(static_answer, answer)

    def _is_attribute_valid_child_of_classification(self, attribute: Attribute) -> bool:
        return attribute.feature_node_hash in self._static_answer_map

    def _is_selectable_child_attribute(self, attribute: Attribute) -> bool:
        # I have the ontology classification, so I can build the tree from that. Basically do a DFS.
        ontology_classification = self._ontology_classification
        top_attribute = ontology_classification.attributes[0]
        return _search_child_attributes(attribute, top_attribute, self._static_answer_map)

    def _check_within_range(self, frame: int) -> None:
        if frame < 0 or frame >= self._last_frame:
            raise LabelRowError(
                f"The supplied frame of `{frame}` is not within the acceptable bounds of `0` to `{self._last_frame}`."
            )

    def _check_classification_already_present(self, frames: Iterable[int]) -> None:
        # Without a parent label row there is nothing to clash with.
        if self._parent is None:
            return
        already_present_frame = self._parent._is_classification_already_present(self.ontology_item, frames)
        if already_present_frame is not None:
            raise LabelRowError(
                f"The LabelRowV2, that this classification is part of, already has a classification of the same type "
                f"on frame `{already_present_frame}`. The same type of classification can only be present once per "
                f"frame per LabelRowV2."
            )

    def __repr__(self):
        return (
            f"ClassificationInstance(classification_hash={self.classification_hash}, "
            f"classification_name={self._ontology_classification.attributes[0].name}, "
            f"object_feature_hash={self._ontology_classification.feature_node_hash})"
        )

    def __lt__(self, other) -> bool:
        # Ordering is defined by the classification hash.
        return self.classification_hash < other.classification_hash
[docs]class LabelRowV2: """ This class represents a single label row. It is corresponding to exactly one data row within a project. It holds all the labels for that data row. You can access a many metadata fields with this class directly. If you want to read or write labels you will need to call :meth:`.initialise_labels()` first. To upload your added labels call :meth:`.save()`. """ def __init__( self, label_row_metadata: LabelRowMetadata, project_client: EncordClientProject, ontology: Ontology ) -> None: self._project_client = project_client self._ontology = ontology self._label_row_read_only_data: LabelRowV2.LabelRowReadOnlyData = self._parse_label_row_metadata( label_row_metadata ) self._is_labelling_initialised = False self._frame_to_hashes: defaultdict[int, Set[str]] = defaultdict(set) # ^ frames to object and classification hashes self._classifications_to_frames: defaultdict[Classification, Set[int]] = defaultdict(set) self._objects_map: Dict[str, ObjectInstance] = dict() self._classifications_map: Dict[str, ClassificationInstance] = dict() # ^ conveniently a dict is ordered in Python. Use this to our advantage to keep the labels in order # at least at the final objects_index/classifications_index level. 
@property
def label_hash(self) -> Optional[str]:
    """The label hash. None if the label row has no labels and was not initialised for labelling."""
    return self._label_row_read_only_data.label_hash

@property
def data_hash(self) -> str:
    """The `data_hash` held in the read-only data of this label row."""
    return self._label_row_read_only_data.data_hash

@property
def dataset_hash(self) -> str:
    """The `dataset_hash` held in the read-only data of this label row."""
    return self._label_row_read_only_data.dataset_hash

@property
def dataset_title(self) -> str:
    """The `dataset_title` held in the read-only data of this label row."""
    return self._label_row_read_only_data.dataset_title

@property
def data_title(self) -> str:
    """The `data_title` held in the read-only data of this label row."""
    return self._label_row_read_only_data.data_title

@property
def data_type(self) -> DataType:
    """The data type of the data row that this label row belongs to."""
    return self._label_row_read_only_data.data_type

@property
def label_status(self) -> LabelStatus:
    """The `label_status` held in the read-only data of this label row."""
    return self._label_row_read_only_data.label_status

@property
def annotation_task_status(self) -> AnnotationTaskStatus:
    """The `annotation_task_status` held in the read-only data of this label row."""
    return self._label_row_read_only_data.annotation_task_status

@property
def is_shadow_data(self) -> bool:
    """The `is_shadow_data` flag held in the read-only data of this label row."""
    return self._label_row_read_only_data.is_shadow_data

@property
def created_at(self) -> Optional[datetime]:
    """The creation date of the label row. None if the label row was not yet created."""
    return self._label_row_read_only_data.created_at

@property
def last_edited_at(self) -> Optional[datetime]:
    """The time the label row was updated last as a whole. None if the label row was not yet created."""
    return self._label_row_read_only_data.last_edited_at

@property
def number_of_frames(self) -> int:
    """The number of frames of this label row."""
    return self._label_row_read_only_data.number_of_frames

@property
def duration(self) -> Optional[float]:
    """Only a value for Video data types."""
    return self._label_row_read_only_data.duration

@property
def fps(self) -> Optional[float]:
    """Only a value for Video data types."""
    return self._label_row_read_only_data.fps

@property
def data_link(self) -> Optional[str]:
    """
    The data link in either your cloud storage or the encord storage to the underlying object. This will be `None`
    for DICOM series or image groups that have been created without performance optimisations, as there is no
    single underlying file for these data types.
    """
    return self._label_row_read_only_data.data_link

@property
def width(self) -> Optional[int]:
    """
    This is `None` for image groups without performance optimisation, as there is no single underlying width
    for this data type.
    """
    return self._label_row_read_only_data.width

@property
def height(self) -> Optional[int]:
    """
    This is `None` for image groups without performance optimisation, as there is no single underlying height
    for this data type.
    """
    return self._label_row_read_only_data.height

@property
def ontology_structure(self) -> OntologyStructure:
    """Get the corresponding ontology structure"""
    return self._ontology.structure

@property
def is_labelling_initialised(self) -> bool:
    """
    Whether you can start labelling or not. If this is `False`, call the member `.initialise_labels()` to
    read or write specific ObjectInstances or ClassificationInstances.
    """
    return self._is_labelling_initialised
def initialise_labels(
    self,
    include_object_feature_hashes: Optional[Set[str]] = None,
    include_classification_feature_hashes: Optional[Set[str]] = None,
    include_reviews: bool = False,
    overwrite: bool = False,
) -> None:
    """
    Start reading or writing labels by fetching the labels currently stored in the Encord server.

    If the label row has no `label_hash` yet, a label row is created for its `data_hash`. Otherwise the
    existing label row is fetched, optionally restricted by the filters below. The result is loaded with
    :meth:`.from_labels_dict()`.

    If the label was not yet in progress, this will set the label status to `LabelStatus.LABEL_IN_PROGRESS`.

    You can call this function again at any point to replace the labels held in this class with the most
    up to date labels stored in the Encord servers. This only matters if someone else is working on the
    labels at the same time. To do so, set `overwrite` to `True`.

    Args:
        include_object_feature_hashes: If None all the objects will be included. Otherwise, only object
            labels whose feature_hash is in this set are included.
            WARNING: only use this filter when reading (not writing) labels. If you request a subset of
            objects and later save the label row, you effectively delete all the object instances stored
            in the Encord platform that were not part of the subset.
        include_classification_feature_hashes: If None all the classifications will be included.
            Otherwise, only classification labels whose feature_hash is in this set are included.
            WARNING: only use this filter when reading (not writing) labels. If you request a subset of
            classifications and later save the label row, you effectively delete all the classification
            instances stored in the Encord platform that were not part of the subset.
        include_reviews: Whether to request read only information about the reviews of the label row.
        overwrite: Must be `True` to re-initialise a label row that was already initialised.

    Raises:
        LabelRowError: If the label row was already initialised and `overwrite` is `False`.
    """
    if not overwrite and self.is_labelling_initialised:
        raise LabelRowError(
            "You are trying to re-initialise a label row that has already been initialised. This would overwrite "
            "current labels. If this is your intend, set the `overwrite` flag to `True`."
        )

    existing_label_hash = self.label_hash
    if existing_label_hash is not None:
        fetched = self._project_client.get_label_row(
            uid=existing_label_hash,
            get_signed_url=False,
            include_object_feature_hashes=include_object_feature_hashes,
            include_classification_feature_hashes=include_classification_feature_hashes,
            include_reviews=include_reviews,
        )
    else:
        fetched = self._project_client.create_label_row(self.data_hash)

    self.from_labels_dict(fetched)
def from_labels_dict(self, label_row_dict: dict) -> None:
    """
    Initialise the label row directly from a label row dictionary in the format the Encord servers produce.

    In most cases you should prefer the `initialise_labels` method. This function also marks the label row
    as initialised, and it resets all the labels currently stored within this class.

    Args:
        label_row_dict: The dictionary of all labels as expected by the Encord format.
    """
    self._is_labelling_initialised = True
    self._label_row_read_only_data = self._parse_label_row_dict(label_row_dict)

    # Start from empty bookkeeping before the labels are parsed in.
    self._frame_to_hashes = defaultdict(set)
    self._classifications_to_frames = defaultdict(set)
    self._objects_map = {}
    self._classifications_map = {}

    self._parse_labels_from_dict(label_row_dict)
def get_image_hash(self, frame_number: int) -> Optional[str]:
    """
    Look up the image hash that belongs to a frame number.

    Args:
        frame_number: The frame number to look up.

    Returns:
        The image hash, or `None` if the frame number is out of bounds.

    Raises:
        LabelRowError: If the label row is not of an image or image group data type.
    """
    self._check_labelling_is_initalised()

    is_image_data = self.data_type in (DataType.IMAGE, DataType.IMG_GROUP)
    if not is_image_data:
        raise LabelRowError("This function is only supported for label rows of image or image group data types.")

    frame_to_image_hash = self._label_row_read_only_data.frame_to_image_hash
    return frame_to_image_hash.get(frame_number)
def get_frame_number(self, image_hash: str) -> Optional[int]:
    """
    Look up the frame number that belongs to an image hash.

    Args:
        image_hash: The image hash to look up.

    Returns:
        The frame number, or `None` if the image hash was not found with an associated frame number.

    Raises:
        LabelRowError: If the label row is not of an image or image group data type.
    """
    self._check_labelling_is_initalised()

    if self.data_type not in (DataType.IMAGE, DataType.IMG_GROUP):
        raise LabelRowError("This function is only supported for label rows of image or image group data types.")
    # `.get` so that an unknown hash yields `None`, as documented and as `get_image_hash` does,
    # instead of leaking a `KeyError` from the lookup table.
    return self._label_row_read_only_data.image_hash_to_frame.get(image_hash)

def save(self):
    """
    Upload the created labels with the Encord server. This will overwrite any labels that someone has created
    in the platform in the meantime.
    """
    self._check_labelling_is_initalised()

    labels_payload = self.to_encord_dict()
    self._project_client.save_label_row(uid=self.label_hash, label=labels_payload)

def get_frame_view(self, frame: Union[int, str] = 0) -> FrameView:
    """
    Args:
        frame: Either the frame number or the image hash if the data type is an image or image group.
            Defaults to the first frame.

    Returns:
        A `FrameView` on the requested frame.

    Raises:
        KeyError: If `frame` is an image hash that has no associated frame number.
        LabelRowError: If `frame` is an image hash and the label row is not of an image or image group
            data type.
    """
    self._check_labelling_is_initalised()

    if isinstance(frame, str):
        frame_number = self.get_frame_number(frame)
        if frame_number is None:
            # Never build a view on frame `None`: downstream frame filters treat `None` as
            # "no filter". An unknown hash keeps surfacing as a `KeyError`.
            raise KeyError(frame)
    else:
        frame_number = frame

    return self.FrameView(self, self._label_row_read_only_data, frame_number)
def get_frame_views(self) -> List[FrameView]:
    """
    Returns:
        A list of frame views in order of available frames.
    """
    self._check_labelling_is_initalised()
    return [self.get_frame_view(frame_number) for frame_number in range(self.number_of_frames)]
def get_object_instances(
    self, filter_ontology_object: Optional[Object] = None, filter_frames: Optional[Frames] = None
) -> List[ObjectInstance]:
    """
    Args:
        filter_ontology_object: Optionally filter by a specific ontology object.
        filter_frames: Optionally filter by specific frames.

    Returns:
        All the `ObjectInstance`s that match the filter.
    """
    self._check_labelling_is_initalised()

    frames_to_match = frames_class_to_frames_list(filter_frames) if filter_frames is not None else []

    matches: List[ObjectInstance] = []
    for instance in self._objects_map.values():
        # Ontology filter: skip instances of a different ontology object.
        if filter_ontology_object is not None and not (
            instance.ontology_item.feature_node_hash == filter_ontology_object.feature_node_hash
        ):
            continue

        # Frame filter: keep the instance if it is present on at least one requested frame.
        if filter_frames is not None and not any(
            instance.object_hash in self._frame_to_hashes.get(frame, set()) for frame in frames_to_match
        ):
            continue

        matches.append(instance)

    return matches
def add_object_instance(self, object_instance: ObjectInstance, force: bool = True) -> None:
    """
    Add an object instance to the label row.

    Args:
        object_instance: The object instance to add.
        force: Decides what happens if an object instance with the same `object_hash` was already added.
            If `True`, the stored instance is replaced by `object_instance`. If `False`, an error is raised.

    Raises:
        LabelRowError: If `object_instance` is already part of a label row, or if an instance with the same
            `object_hash` was already added and `force` is `False`.
    """
    self._check_labelling_is_initalised()

    object_instance.is_valid()

    if object_instance.is_assigned_to_label_row():
        raise LabelRowError(
            "The supplied ObjectInstance is already part of a LabelRowV2. You can only add a ObjectInstance to one "
            "LabelRowV2. You can do a ObjectInstance.copy() to create an identical ObjectInstance which is not part of "
            "any LabelRowV2."
        )

    object_hash = object_instance.object_hash
    if object_hash in self._objects_map:
        if not force:
            raise LabelRowError(
                "The supplied ObjectInstance was already previously added. (the object_hash is the same)."
            )
        # Drop the old entry first, so that the new instance is inserted at the end of the ordered dict.
        del self._objects_map[object_hash]

    self._objects_map[object_hash] = object_instance
    object_instance._parent = self

    self._add_to_frame_to_hashes_map(object_instance)
def add_classification_instance(self, classification_instance: ClassificationInstance, force: bool = False) -> None:
    """
    Add a classification instance to the label row.

    Args:
        classification_instance: The classification instance to add.
        force: If `False`, an error is raised when an instance with the same `classification_hash` was
            already added, or when a classification of the same type is already present on one of the
            instance's frames. If `True`, neither check raises and an instance stored under the same
            `classification_hash` is replaced.

    Raises:
        LabelRowError: If `classification_instance` is already part of a label row, or if one of the
            checks described for `force` fails.
    """
    self._check_labelling_is_initalised()

    classification_instance.is_valid()

    if classification_instance.is_assigned_to_label_row():
        raise LabelRowError(
            "The supplied ClassificationInstance is already part of a LabelRowV2. You can only add a ClassificationInstance"
            " to one LabelRowV2. You can do a ClassificationInstance.copy() to create an identical ObjectInstance which is "
            "not part of any LabelRowV2."
        )

    classification_hash = classification_instance.classification_hash
    ontology_item = classification_instance.ontology_item
    already_present_frame = self._is_classification_already_present(
        ontology_item,
        _frame_views_to_frame_numbers(classification_instance.get_annotations()),
    )
    known_hash = classification_hash in self._classifications_map

    if not force:
        if known_hash:
            raise LabelRowError(
                "The supplied ClassificationInstance was already previously added. (the classification_hash is the same)."
            )
        if already_present_frame is not None:
            raise LabelRowError(
                f"A ClassificationInstance of the same type was already added and has overlapping frames. One "
                f"overlapping frame that was found is `{already_present_frame}`. Make sure that you only add "
                f"classifications which are on frames where the same type of classification does not yet exist."
            )
    elif known_hash:
        # Drop the old entry first, so that the new instance is inserted at the end of the ordered dict.
        del self._classifications_map[classification_hash]

    self._classifications_map[classification_hash] = classification_instance
    classification_instance._parent = self

    frames_of_this_type = self._classifications_to_frames[classification_instance.ontology_item]
    frames_of_this_type.update(set(_frame_views_to_frame_numbers(classification_instance.get_annotations())))
    self._add_to_frame_to_hashes_map(classification_instance)
def remove_classification(self, classification_instance: ClassificationInstance):
    """
    Remove a classification instance from a label row.

    The instance is dropped from the classifications map, its frames are released for its ontology
    classification, its hash is cleared from the per-frame hash index, and it is detached from this
    label row, mirroring what `remove_object` does for object instances.

    Args:
        classification_instance: The classification instance to remove.

    Raises:
        KeyError: If the instance, or one of its frames, is not registered with this label row.
    """
    self._check_labelling_is_initalised()

    classification_hash = classification_instance.classification_hash
    self._classifications_map.pop(classification_hash)

    # Materialise once: the same frames are needed for both bookkeeping maps below.
    actual_frames = list(_frame_views_to_frame_numbers(classification_instance.get_annotations()))

    all_frames = self._classifications_to_frames[classification_instance.ontology_item]
    for actual_frame in actual_frames:
        all_frames.remove(actual_frame)

    # Clear the per-frame index too; otherwise the removed hash lingers in `_frame_to_hashes`.
    # `.get` avoids creating empty default entries for frames that were never indexed.
    for actual_frame in actual_frames:
        hashes_on_frame = self._frame_to_hashes.get(actual_frame)
        if hashes_on_frame is not None:
            hashes_on_frame.discard(classification_hash)

    # Detach the instance, so that `is_assigned_to_label_row()` no longer reports this label row
    # and the instance can be added to a label row again.
    classification_instance._parent = None
def add_to_single_frame_to_hashes_map(
    self, label_item: Union[ObjectInstance, ClassificationInstance], frame: int
) -> None:
    """This is an internal function, it is not meant to be called by the SDK user."""
    self._check_labelling_is_initalised()

    # Pick the hash that identifies the item, depending on what kind of label item it is.
    if isinstance(label_item, ObjectInstance):
        item_hash = label_item.object_hash
    elif isinstance(label_item, ClassificationInstance):
        item_hash = label_item.classification_hash
    else:
        raise NotImplementedError(f"Got an unexpected label item class `{type(label_item)}`")

    self._frame_to_hashes[frame].add(item_hash)
def get_classification_instances(
    self, filter_ontology_classification: Optional[Classification] = None, filter_frames: Optional[Frames] = None
) -> List[ClassificationInstance]:
    """
    Args:
        filter_ontology_classification: Optionally filter by a specific ontology classification.
        filter_frames: Optionally filter by specific frames.

    Returns:
        All the `ClassificationInstance`s that match the filter.
    """
    self._check_labelling_is_initalised()

    frames_to_match = frames_class_to_frames_list(filter_frames) if filter_frames is not None else []

    matches: List[ClassificationInstance] = []
    for instance in self._classifications_map.values():
        # Ontology filter: skip instances of a different ontology classification.
        if filter_ontology_classification is not None and not (
            instance.ontology_item.feature_node_hash == filter_ontology_classification.feature_node_hash
        ):
            continue

        # Frame filter: keep the instance if it is present on at least one requested frame.
        if filter_frames is not None and not any(
            instance.classification_hash in self._frame_to_hashes.get(frame, set()) for frame in frames_to_match
        ):
            continue

        matches.append(instance)

    return matches
def remove_object(self, object_instance: ObjectInstance):
    """
    Remove an object instance from a label row.

    The instance is dropped from the objects map, its hash is removed from the per-frame hash index for
    all frames it is annotated on, and it is detached from this label row.
    """
    self._check_labelling_is_initalised()

    self._objects_map.pop(object_instance.object_hash)

    annotated_frames = _frame_views_to_frame_numbers(object_instance.get_annotations())
    self._remove_from_frame_to_hashes_map(annotated_frames, object_instance.object_hash)

    object_instance._parent = None
def to_encord_dict(self) -> dict:
    """
    This is an internal helper function. Likely this should not be used by a user. To upload labels use the
    :meth:`.save()` function.

    Returns:
        The label row as a dictionary: the read-only metadata plus the serialised answers, actions and
        data units.
    """
    self._check_labelling_is_initalised()

    meta = self._label_row_read_only_data

    # The literal keeps the key order, and the order of the helper calls, of the serialised label row.
    return {
        "label_hash": meta.label_hash,
        "created_at": meta.created_at,
        "last_edited_at": meta.last_edited_at,
        "data_hash": meta.data_hash,
        "dataset_hash": meta.dataset_hash,
        "dataset_title": meta.dataset_title,
        "data_title": meta.data_title,
        "data_type": meta.data_type.value,
        "annotation_task_status": meta.annotation_task_status,
        "is_shadow_data": meta.is_shadow_data,
        "object_answers": self._to_object_answers(),
        "classification_answers": self._to_classification_answers(),
        "object_actions": self._to_object_actions(),
        "label_status": meta.label_status.value,
        "data_units": self._to_encord_data_units(),
    }
class FrameView:
    """
    This class can be used to inspect what object/classification instances are on a given frame or
    what metadata, such as an image file size, is on a given frame.
    """

    def __init__(
        self, label_row: LabelRowV2, label_row_read_only_data: LabelRowV2.LabelRowReadOnlyData, frame: int
    ):
        """
        Args:
            label_row: The label row that this view looks into.
            label_row_read_only_data: The read-only data of that label row.
            frame: The frame number that this view is bound to.
        """
        self._label_row = label_row
        self._label_row_read_only_data = label_row_read_only_data
        self._frame = frame

    def _require_image_data_type(self, what: str) -> None:
        """Raise a `LabelRowError` naming `what` unless the label row is an image or an image group."""
        if self._label_row.data_type not in (DataType.IMAGE, DataType.IMG_GROUP):
            raise LabelRowError(f"{what} can only be retrieved for DataType.IMAGE or DataType.IMG_GROUP")

    @property
    def image_hash(self) -> str:
        """The image hash of this frame. Only available for images and image groups."""
        self._require_image_data_type("Image hash")
        return self._frame_level_data().image_hash

    @property
    def image_title(self) -> str:
        """The image title of this frame. Only available for images and image groups."""
        self._require_image_data_type("Image title")
        return self._frame_level_data().image_title

    @property
    def file_type(self) -> str:
        """The file type of this frame. Only available for images and image groups."""
        self._require_image_data_type("File type")
        return self._frame_level_data().file_type

    @property
    def frame(self) -> int:
        """The frame number that this view is bound to."""
        return self._frame

    @property
    def width(self) -> int:
        """The frame-level width for image groups, otherwise the width of the label row."""
        if self._label_row.data_type in [DataType.IMG_GROUP]:
            return self._frame_level_data().width
        else:
            return self._label_row_read_only_data.width

    @property
    def height(self) -> int:
        """The frame-level height for image groups, otherwise the height of the label row."""
        if self._label_row.data_type in [DataType.IMG_GROUP]:
            return self._frame_level_data().height
        else:
            return self._label_row_read_only_data.height

    @property
    def data_link(self) -> Optional[str]:
        """The data link of this frame. Only available for images and image groups."""
        self._require_image_data_type("Data link")
        return self._frame_level_data().data_link

    def add_object_instance(
        self,
        object_instance: ObjectInstance,
        coordinates: Coordinates,
        *,
        overwrite: bool = False,
        created_at: Optional[datetime] = None,
        created_by: Optional[str] = None,
        last_edited_at: Optional[datetime] = None,
        last_edited_by: Optional[str] = None,
        confidence: Optional[float] = None,
        manual_annotation: Optional[bool] = None,
    ) -> None:
        """
        Place `object_instance` on this frame and, if it is not part of a label row yet, add it to the
        label row of this view.

        Args:
            object_instance: The object instance to place on this frame.
            coordinates: The coordinates of the object instance on this frame.
            overwrite: Forwarded to `ObjectInstance.set_for_frames`.
            created_at: Forwarded to `ObjectInstance.set_for_frames`.
            created_by: Forwarded to `ObjectInstance.set_for_frames`.
            last_edited_at: Forwarded to `ObjectInstance.set_for_frames`.
            last_edited_by: Forwarded to `ObjectInstance.set_for_frames`.
            confidence: Forwarded to `ObjectInstance.set_for_frames`.
            manual_annotation: Forwarded to `ObjectInstance.set_for_frames`.

        Raises:
            LabelRowError: If the object instance is already assigned to a different label row.
        """
        label_row = object_instance.is_assigned_to_label_row()
        if label_row and self._label_row != label_row:
            raise LabelRowError(
                "This object instance is already assigned to a different label row. It can not be "
                "added to multiple label rows at once."
            )

        object_instance.set_for_frames(
            coordinates,
            self._frame,
            overwrite=overwrite,
            created_at=created_at,
            created_by=created_by,
            last_edited_at=last_edited_at,
            last_edited_by=last_edited_by,
            confidence=confidence,
            manual_annotation=manual_annotation,
        )

        if not label_row:
            self._label_row.add_object_instance(object_instance)

    def add_classification_instance(
        self,
        classification_instance: ClassificationInstance,
        *,
        overwrite: bool = False,
        created_at: Optional[datetime] = None,
        created_by: Optional[str] = None,
        confidence: int = DEFAULT_CONFIDENCE,
        manual_annotation: bool = DEFAULT_MANUAL_ANNOTATION,
        last_edited_at: Optional[datetime] = None,
        last_edited_by: Optional[str] = None,
    ) -> None:
        """
        Place `classification_instance` on this frame and, if it is not part of a label row yet, add it to
        the label row of this view.

        Args:
            classification_instance: The classification instance to place on this frame.
            overwrite: Forwarded to `ClassificationInstance.set_for_frames`.
            created_at: Creation time of the annotation. Defaults to the time of this call.
            created_by: Forwarded to `ClassificationInstance.set_for_frames`.
            confidence: Forwarded to `ClassificationInstance.set_for_frames`.
            manual_annotation: Forwarded to `ClassificationInstance.set_for_frames`.
            last_edited_at: Last edit time of the annotation. Defaults to the time of this call.
            last_edited_by: Forwarded to `ClassificationInstance.set_for_frames`.

        Raises:
            LabelRowError: If the classification instance is already assigned to a different label row.
        """
        # Resolve the timestamps per call. A `datetime.now()` default in the signature is evaluated
        # only once, when the function is defined, and would stamp every annotation with that time.
        now = datetime.now()
        if created_at is None:
            created_at = now
        if last_edited_at is None:
            last_edited_at = now

        label_row = classification_instance.is_assigned_to_label_row()
        if label_row and self._label_row != label_row:
            raise LabelRowError(
                "This classification instance is already assigned to a different label row. It can not be "
                "added to multiple label rows at once."
            )

        classification_instance.set_for_frames(
            self._frame,
            overwrite=overwrite,
            created_at=created_at,
            created_by=created_by,
            confidence=confidence,
            manual_annotation=manual_annotation,
            last_edited_at=last_edited_at,
            last_edited_by=last_edited_by,
        )

        if not label_row:
            self._label_row.add_classification_instance(classification_instance)

    def get_object_instances(self, filter_ontology_object: Optional[Object] = None) -> List[ObjectInstance]:
        """
        Args:
            filter_ontology_object: Optionally filter by a specific ontology object.

        Returns:
            All the `ObjectInstance`s on this frame that match the filter.
        """
        return self._label_row.get_object_instances(
            filter_ontology_object=filter_ontology_object, filter_frames=self._frame
        )

    def get_classification_instances(
        self, filter_ontology_classification: Optional[Classification] = None
    ) -> List[ClassificationInstance]:
        """
        Args:
            filter_ontology_classification: Optionally filter by a specific ontology classification.

        Returns:
            All the `ClassificationInstance`s on this frame that match the filter.
        """
        return self._label_row.get_classification_instances(
            filter_ontology_classification=filter_ontology_classification, filter_frames=self._frame
        )

    def _frame_level_data(self) -> LabelRowV2.FrameLevelImageGroupData:
        """Return the frame-level data stored for the frame of this view."""
        return self._label_row_read_only_data.frame_level_data[self._frame]

    def __repr__(self):
        return f"FrameView(label_row={self._label_row}, frame={self._frame})"
@dataclass(frozen=True)
class FrameLevelImageGroupData:
    """This is an internal helper class. A user should not directly interact with it."""

    # Identification of the image behind the frame.
    image_hash: str
    image_title: str
    file_type: str
    # Position of the frame within the label row.
    frame_number: int
    # Dimensions of this individual frame.
    width: int
    height: int
    # Optional link to the underlying file of this frame.
    data_link: Optional[str] = None
@dataclass(frozen=True)
class LabelRowReadOnlyData:
    """This is an internal helper class. A user should not directly interact with it."""

    label_hash: Optional[str]
    """This is None if the label row does not have any labels and was not initialised for labelling."""
    created_at: Optional[datetime]
    """This is None if the label row does not have any labels and was not initialised for labelling."""
    last_edited_at: Optional[datetime]
    """This is None if the label row does not have any labels and was not initialised for labelling."""

    # Identity and status of the label row.
    data_hash: str
    data_type: DataType
    label_status: LabelStatus
    annotation_task_status: AnnotationTaskStatus
    is_shadow_data: bool

    # Extent of the underlying data.
    number_of_frames: int
    duration: Optional[float]
    fps: Optional[float]

    # Dataset and data naming.
    dataset_hash: str
    dataset_title: str
    data_title: str

    # Properties of the underlying file, where a single one exists.
    width: Optional[int]
    height: Optional[int]
    data_link: Optional[str]

    # Per-frame lookups. Each instance gets its own empty dicts by default.
    frame_level_data: Dict[int, LabelRowV2.FrameLevelImageGroupData] = field(default_factory=dict)
    image_hash_to_frame: Dict[str, int] = field(default_factory=dict)
    frame_to_image_hash: Dict[int, str] = field(default_factory=dict)
def _to_object_answers(self) -> dict:
    """
    Serialise the static answers of every object instance.

    Returns:
        A dict keyed by object hash. Each value holds the object's static answers (in reversed order)
        under "classifications" and the hash itself under "objectHash".
    """
    ret = {}
    for obj in self._objects_map.values():
        all_static_answers = self._get_all_static_answers(obj)
        ret[obj.object_hash] = {
            "classifications": list(reversed(all_static_answers)),
            "objectHash": obj.object_hash,
        }
    return ret

def _to_object_actions(self) -> dict:
    """
    Serialise the dynamic answers of every object instance.

    Object instances without any serialisable dynamic answer are left out of the result.

    Returns:
        A dict keyed by object hash. Each value holds the dynamic answers (in reversed order)
        under "actions" and the hash itself under "objectHash".
    """
    ret = {}
    for obj in self._objects_map.values():
        # NOTE: despite its name, this variable holds the *dynamic* answers of the object.
        all_static_answers = self._dynamic_answers_to_encord_dict(obj)
        if len(all_static_answers) == 0:
            continue
        ret[obj.object_hash] = {
            "actions": list(reversed(all_static_answers)),
            "objectHash": obj.object_hash,
        }
    return ret

def _to_classification_answers(self) -> dict:
    """
    Serialise the answered static answers of every classification instance.

    Unlike `_to_object_actions`, an entry is written for every classification instance, even if none
    of its answers is answered.

    Returns:
        A dict keyed by classification hash with the keys "classifications" and "classificationHash".
    """
    ret = {}
    for classification in self._classifications_map.values():
        classifications = []
        all_static_answers = classification.get_all_static_answers()
        for answer in all_static_answers:
            # Only answers that have actually been set are serialised.
            if answer.is_answered():
                classifications.append(answer.to_encord_dict())
        ret[classification.classification_hash] = {
            "classifications": list(reversed(classifications)),
            "classificationHash": classification.classification_hash,
        }
    return ret

@staticmethod
def _get_all_static_answers(object_instance: ObjectInstance) -> List[dict]:
    """Essentially convert to the JSON format of all the static answers."""
    ret = []
    for answer in object_instance._get_all_static_answers():
        d_opt = answer.to_encord_dict()
        # Answers whose serialisation is None are skipped.
        if d_opt is not None:
            ret.append(d_opt)
    return ret

@staticmethod
def _dynamic_answers_to_encord_dict(object_instance: ObjectInstance) -> List[dict]:
    """Serialise all dynamic answers of the object instance together with their frame ranges."""
    ret = []
    for answer, ranges in object_instance._get_all_dynamic_answers():
        d_opt = answer.to_encord_dict(ranges)
        # Answers whose serialisation is None are skipped.
        if d_opt is not None:
            ret.append(d_opt)
    return ret

def _to_encord_data_units(self) -> dict:
    """Serialise every frame-level data entry into a data unit dict, keyed by its image hash."""
    ret = {}
    frame_level_data = self._label_row_read_only_data.frame_level_data
    for value in frame_level_data.values():
        ret[value.image_hash] = self._to_encord_data_unit(value)
    return ret

def _to_encord_data_unit(self, frame_level_data: FrameLevelImageGroupData) -> dict:
    """
    Serialise a single frame-level data entry, including its labels.

    Raises:
        NotImplementedError: If the data type of the label row is not one of image group, video,
            DICOM or image.
    """
    ret = {}
    data_type = self._label_row_read_only_data.data_type
    # The sequence number is written as a string for image groups and as the raw frame number otherwise.
    if data_type == DataType.IMG_GROUP:
        data_sequence = str(frame_level_data.frame_number)
    elif data_type in (DataType.VIDEO, DataType.DICOM, DataType.IMAGE):
        data_sequence = frame_level_data.frame_number
    else:
        raise NotImplementedError(f"The data type {data_type} is not implemented yet.")
    ret["data_hash"] = frame_level_data.image_hash
    ret["data_title"] = frame_level_data.image_title
    # No data link is written for DICOM.
    if data_type != DataType.DICOM:
        ret["data_link"] = frame_level_data.data_link
    ret["data_type"] = frame_level_data.file_type
    ret["data_sequence"] = data_sequence
    ret["width"] = frame_level_data.width
    ret["height"] = frame_level_data.height
    ret["labels"] = self._to_encord_labels(frame_level_data)
    # Duration and fps are only written when they are known.
    if self._label_row_read_only_data.duration is not None:
        ret["data_duration"] = self._label_row_read_only_data.duration
    if self._label_row_read_only_data.fps is not None:
        ret["data_fps"] = self._label_row_read_only_data.fps
    return ret

def _to_encord_labels(self, frame_level_data: FrameLevelImageGroupData) -> dict:
    """
    Serialise the labels belonging to one data unit.

    For images and image groups the labels of the single frame are returned directly. For videos and
    DICOM the result is keyed by the stringified frame number of every frame in `_frame_to_hashes`.
    Any other data type yields an empty dict.
    """
    ret = {}
    data_type = self._label_row_read_only_data.data_type
    if data_type in [DataType.IMAGE, DataType.IMG_GROUP]:
        frame = frame_level_data.frame_number
        ret.update(self._to_encord_label(frame))
    elif data_type in [DataType.VIDEO, DataType.DICOM]:
        for frame in self._frame_to_hashes.keys():
            ret[str(frame)] = self._to_encord_label(frame)
    return ret

def _to_encord_label(self, frame: int) -> dict:
    """Serialise the objects and classifications of a single frame."""
    ret = {}
    ret["objects"] = self._to_encord_objects_list(frame)
    ret["classifications"] = self._to_encord_classifications_list(frame)
    return ret

def _to_encord_objects_list(self, frame: int) -> list:
    """Serialise all object instances that are present on the given frame."""
    # Get objects for frame
    ret: List[dict] = []
    objects = self.get_object_instances(filter_frames=frame)
    for object_ in objects:
        encord_object = self._to_encord_object(object_, frame)
        ret.append(encord_object)
    return ret

def _to_encord_object(
    self,
    object_: ObjectInstance,
    frame: int,
) -> dict:
    """
    Serialise one object instance on one frame.

    The name, color, shape and feature hash are read from the ontology object that is looked up by
    the feature node hash of the instance; the remaining fields come from the frame annotation.
    """
    ret = {}
    object_instance_annotation = object_.get_annotation(frame)
    coordinates = object_instance_annotation.coordinates
    ontology_hash = object_.ontology_item.feature_node_hash
    ontology_object = self._ontology.structure.get_child_by_hash(ontology_hash)
    ret["name"] = ontology_object.name
    ret["color"] = ontology_object.color
    ret["shape"] = ontology_object.shape.value
    ret["value"] = _lower_snake_case(ontology_object.name)
    ret["createdAt"] = object_instance_annotation.created_at.strftime(DATETIME_LONG_STRING_FORMAT)
    ret["createdBy"] = object_instance_annotation.created_by
    ret["confidence"] = object_instance_annotation.confidence
    ret["objectHash"] = object_.object_hash
    ret["featureHash"] = ontology_object.feature_node_hash
    ret["manualAnnotation"] = object_instance_annotation.manual_annotation
    # The following keys are optional and only written when a value is set.
    if object_instance_annotation.last_edited_at is not None:
        ret["lastEditedAt"] = object_instance_annotation.last_edited_at.strftime(DATETIME_LONG_STRING_FORMAT)
    if object_instance_annotation.last_edited_by is not None:
        ret["lastEditedBy"] = object_instance_annotation.last_edited_by
    if object_instance_annotation.is_deleted is not None:
        ret["isDeleted"] = object_instance_annotation.is_deleted
    self._add_coordinates_to_encord_object(coordinates, ret)
    return ret

def _add_coordinates_to_encord_object(self, coordinates: Coordinates, encord_object: dict) -> None:
    """
    Write the coordinates into `encord_object` (mutated in place) under the key matching their type.

    Raises:
        NotImplementedError: If the coordinates are of a type that is not handled here.
    """
    if isinstance(coordinates, BoundingBoxCoordinates):
        encord_object["boundingBox"] = coordinates.to_dict()
    elif isinstance(coordinates, RotatableBoundingBoxCoordinates):
        encord_object["rotatableBoundingBox"] = coordinates.to_dict()
    elif isinstance(coordinates, PolygonCoordinates):
        encord_object["polygon"] = coordinates.to_dict()
    elif isinstance(coordinates, PolylineCoordinates):
        encord_object["polyline"] = coordinates.to_dict()
    elif isinstance(coordinates, PointCoordinate):
        encord_object["point"] = coordinates.to_dict()
    else:
        raise NotImplementedError(f"adding coordinatees for this type not yet implemented {type(coordinates)}")

def _to_encord_classifications_list(self, frame: int) -> list:
    """Serialise all classification instances that are present on the given frame."""
    ret: List[dict] = []
    classifications = self.get_classification_instances(filter_frames=frame)
    for classification in classifications:
        encord_classification = self._to_encord_classification(classification, frame)
        ret.append(encord_classification)
    return ret

def _to_encord_classification(self, classification: ClassificationInstance, frame: int) -> dict:
    """
    Serialise one classification instance on one frame.

    The "name" and "value" entries are derived from the first attribute of the ontology
    classification, while "featureHash" is the hash of the ontology classification itself.
    """
    ret = {}
    annotation = classification.get_annotation(frame)
    classification_feature_hash = classification.ontology_item.feature_node_hash
    ontology_classification = self._ontology.structure.get_child_by_hash(classification_feature_hash)
    # Only the first attribute of the classification is used for the name.
    attribute_hash = classification.ontology_item.attributes[0].feature_node_hash
    ontology_attribute = self._ontology.structure.get_child_by_hash(attribute_hash)
    ret["name"] = ontology_attribute.name
    ret["value"] = _lower_snake_case(ontology_attribute.name)
    ret["createdAt"] = annotation.created_at.strftime(DATETIME_LONG_STRING_FORMAT)
    ret["createdBy"] = annotation.created_by
    ret["confidence"] = annotation.confidence
    ret["featureHash"] = ontology_classification.feature_node_hash
    ret["classificationHash"] = classification.classification_hash
    ret["manualAnnotation"] = annotation.manual_annotation
    # The following keys are optional and only written when a value is set.
    if annotation.last_edited_at is not None:
        ret["lastEditedAt"] = annotation.last_edited_at.strftime(DATETIME_LONG_STRING_FORMAT)
    if annotation.last_edited_by is not None:
        ret["lastEditedBy"] = annotation.last_edited_by
    return ret

def _is_classification_already_present(
    self, classification: Classification, frames: Iterable[int]
) -> Optional[int]:
    """
    Return the first of `frames` on which the ontology classification is already recorded in
    `_classifications_to_frames`, or None if there is no such frame.
    """
    present_frames = self._classifications_to_frames.get(classification, set())
    for frame in frames:
        if frame in present_frames:
            return frame
    return None

def _remove_frames_from_classification(self, classification: Classification, frames: Iterable[int]) -> None:
    """
    Remove `frames` from the frames recorded for the ontology classification.

    NOTE(review): `set.remove` raises a KeyError for a frame that is not recorded (including the case
    where the classification has no entry at all) - confirm that callers only pass recorded frames.
    """
    present_frames = self._classifications_to_frames.get(classification, set())
    for frame in frames:
        present_frames.remove(frame)

def _add_to_frame_to_hashes_map(self, label_item: Union[ObjectInstance, ClassificationInstance]) -> None:
    """This can be called by the ObjectInstance."""
    # Register the item for every frame it currently has an annotation on.
    for frame_view in label_item.get_annotations():
        self.add_to_single_frame_to_hashes_map(label_item, frame_view.frame)

def _remove_from_frame_to_hashes_map(self, frames: Iterable[int], item_hash: str):
    """Remove `item_hash` from the `_frame_to_hashes` entry of each of the given frames."""
    for frame in frames:
        self._frame_to_hashes[frame].remove(item_hash)

def _parse_label_row_metadata(self, label_row_metadata: LabelRowMetadata) -> LabelRowV2.LabelRowReadOnlyData:
    """
    Build the read-only data from label row metadata only.

    The frame-level mappings are not passed here, so they keep their empty defaults.
    """
    data_type = DataType.from_upper_case_string(label_row_metadata.data_type)
    return LabelRowV2.LabelRowReadOnlyData(
        label_hash=label_row_metadata.label_hash,
        data_hash=label_row_metadata.data_hash,
        data_title=label_row_metadata.data_title,
        dataset_hash=label_row_metadata.dataset_hash,
        dataset_title=label_row_metadata.dataset_title,
        data_type=data_type,
        data_link=label_row_metadata.data_link,
        label_status=label_row_metadata.label_status,
        annotation_task_status=label_row_metadata.annotation_task_status,
        is_shadow_data=label_row_metadata.is_shadow_data,
        created_at=label_row_metadata.created_at,
        last_edited_at=label_row_metadata.last_edited_at,
        duration=label_row_metadata.duration,
        fps=label_row_metadata.frames_per_second,
        number_of_frames=label_row_metadata.number_of_frames,
        width=label_row_metadata.width,
        height=label_row_metadata.height,
    )

def _parse_label_row_dict(self, label_row_dict: dict) -> LabelRowReadOnlyData:
    """
    Build the read-only data from a full label row dict.

    Values that are not read from the dict (`is_shadow_data`, `duration`, `fps`, `number_of_frames`)
    are carried over from the current state of this label row.

    Raises:
        NotImplementedError: If the data type of the dict is not video, DICOM, image or image group.
    """
    frame_level_data = self._parse_image_group_frame_level_data(label_row_dict["data_units"])
    image_hash_to_frame = {item.image_hash: item.frame_number for item in frame_level_data.values()}
    frame_to_image_hash = {item.frame_number: item.image_hash for item in frame_level_data.values()}
    data_type = DataType(label_row_dict["data_type"])
    # For video, DICOM and image the link and dimensions are read from the first data unit.
    if data_type == DataType.VIDEO:
        video_dict = list(label_row_dict["data_units"].values())[0]
        data_link = video_dict["data_link"]
        height = video_dict["height"]
        width = video_dict["width"]
    elif data_type == DataType.DICOM:
        dicom_dict = list(label_row_dict["data_units"].values())[0]
        data_link = None
        height = dicom_dict["height"]
        width = dicom_dict["width"]
    elif data_type == DataType.IMAGE:
        image_dict = list(label_row_dict["data_units"].values())[0]
        data_link = image_dict["data_link"]
        height = image_dict["height"]
        width = image_dict["width"]
    elif data_type == DataType.IMG_GROUP:
        # No label-row-level link or dimensions are set for image groups.
        data_link = None
        height = None
        width = None
    else:
        raise NotImplementedError(f"The data type {data_type} is not implemented yet.")
    return self.LabelRowReadOnlyData(
        label_hash=label_row_dict["label_hash"],
        dataset_hash=label_row_dict["dataset_hash"],
        dataset_title=label_row_dict["dataset_title"],
        data_title=label_row_dict["data_title"],
        data_hash=label_row_dict["data_hash"],
        data_type=data_type,
        label_status=LabelStatus(label_row_dict["label_status"]),
        annotation_task_status=label_row_dict["annotation_task_status"],
        is_shadow_data=self.is_shadow_data,
        created_at=label_row_dict["created_at"],
        last_edited_at=label_row_dict["last_edited_at"],
        frame_level_data=frame_level_data,
        image_hash_to_frame=image_hash_to_frame,
        frame_to_image_hash=frame_to_image_hash,
        duration=self.duration,
        fps=self.fps,
        number_of_frames=self.number_of_frames,
        data_link=data_link,
        height=height,
        width=width,
    )

def _parse_labels_from_dict(self, label_row_dict: dict):
    """
    Create the object and classification instances described by a label row dict and then apply the
    object answers and object actions (dynamic answers) from the same dict.

    Raises:
        NotImplementedError: If the data type of the dict is not image group, image, video or DICOM.
    """
    classification_answers = label_row_dict["classification_answers"]
    for data_unit in label_row_dict["data_units"].values():
        data_type = label_row_dict["data_type"]
        if data_type in {DataType.IMG_GROUP.value, DataType.IMAGE.value}:
            # One data unit corresponds to one frame; the labels are stored directly on the data unit.
            frame = int(data_unit["data_sequence"])
            self._add_object_instances_from_objects(data_unit["labels"].get("objects", []), frame)
            self._add_classification_instances_from_classifications(
                data_unit["labels"].get("classifications", []), classification_answers, int(frame)
            )
        elif data_type in {DataType.VIDEO.value, DataType.DICOM.value}:
            # The labels are keyed by the frame number.
            for frame, frame_data in data_unit["labels"].items():
                self._add_object_instances_from_objects(frame_data["objects"], int(frame))
                self._add_classification_instances_from_classifications(
                    frame_data["classifications"], classification_answers, int(frame)
                )
        else:
            raise NotImplementedError(f"Got an unexpected data type `{data_type}`")
    # The answers look up instances in `_objects_map`, so they are applied after all instances exist.
    self._add_objects_answers(label_row_dict)
    self._add_action_answers(label_row_dict)

def _add_object_instances_from_objects(
    self,
    objects_list: List[dict],
    frame: int,
) -> None:
    """
    Add the serialised objects of one frame: an object hash that is not yet known creates a new
    object instance, a known one only gets the frame added to the existing instance.
    """
    for frame_object_label in objects_list:
        object_hash = frame_object_label["objectHash"]
        if object_hash not in self._objects_map:
            object_instance = self._create_new_object_instance(frame_object_label, frame)
            self.add_object_instance(object_instance)
        else:
            self._add_coordinates_to_object_instance(frame_object_label, frame)

def _add_objects_answers(self, label_row_dict: dict):
    """Apply the answers stored under "object_answers" to the matching object instances."""
    for answer in label_row_dict["object_answers"].values():
        object_hash = answer["objectHash"]
        object_instance = self._objects_map[object_hash]
        answer_list = answer["classifications"]
        object_instance.set_answer_from_list(answer_list)

def _add_action_answers(self, label_row_dict: dict):
    """Apply the answers stored under "object_actions" to the matching object instances."""
    for answer in label_row_dict["object_actions"].values():
        object_hash = answer["objectHash"]
        object_instance = self._objects_map[object_hash]
        answer_list = answer["actions"]
        object_instance.set_answer_from_list(answer_list)

def _create_new_object_instance(self, frame_object_label: dict, frame: int) -> ObjectInstance:
    """Create an object instance from a serialised object label and place it on the given frame."""
    ontology = self._ontology.structure
    feature_hash = frame_object_label["featureHash"]
    object_hash = frame_object_label["objectHash"]
    label_class = ontology.get_child_by_hash(feature_hash)
    object_instance = ObjectInstance(label_class, object_hash=object_hash)
    coordinates = self._get_coordinates(frame_object_label)
    object_frame_instance_info = ObjectInstance.FrameInfo.from_dict(frame_object_label)
    object_instance.set_for_frames(coordinates=coordinates, frames=frame, **asdict(object_frame_instance_info))
    return object_instance

def _add_coordinates_to_object_instance(
    self,
    frame_object_label: dict,
    frame: int = 0,
) -> None:
    """Place an already known object instance (looked up by its object hash) on the given frame."""
    object_hash = frame_object_label["objectHash"]
    object_instance = self._objects_map[object_hash]
    coordinates = self._get_coordinates(frame_object_label)
    object_frame_instance_info = ObjectInstance.FrameInfo.from_dict(frame_object_label)
    object_instance.set_for_frames(coordinates=coordinates, frames=frame, **asdict(object_frame_instance_info))

def _get_coordinates(self, frame_object_label: dict) -> Coordinates:
    """
    Parse the coordinates of a serialised object label, choosing the coordinate class by the shape
    key that is present in the dict.

    Raises:
        NotImplementedError: For skeleton objects and for dicts without any of the known shape keys.
    """
    if "boundingBox" in frame_object_label:
        return BoundingBoxCoordinates.from_dict(frame_object_label)
    if "rotatableBoundingBox" in frame_object_label:
        return RotatableBoundingBoxCoordinates.from_dict(frame_object_label)
    elif "polygon" in frame_object_label:
        return PolygonCoordinates.from_dict(frame_object_label)
    elif "point" in frame_object_label:
        return PointCoordinate.from_dict(frame_object_label)
    elif "polyline" in frame_object_label:
        return PolylineCoordinates.from_dict(frame_object_label)
    elif "skeleton" in frame_object_label:
        raise NotImplementedError(f"Got a skeleton object, which is not supported yet")
    else:
        raise NotImplementedError(f"Getting coordinates for `{frame_object_label}` is not supported yet.")

def _add_classification_instances_from_classifications(
    self, classifications_list: List[dict], classification_answers: dict, frame: int
):
    """
    Add the serialised classifications of one frame: a classification hash that is not yet known
    creates a new classification instance, a known one only gets the frame added.
    """
    for frame_classification_label in classifications_list:
        classification_hash = frame_classification_label["classificationHash"]
        if classification_hash not in self._classifications_map:
            classification_instance = self._create_new_classification_instance(
                frame_classification_label, frame, classification_answers
            )
            self.add_classification_instance(classification_instance)
        else:
            self._add_frames_to_classification_instance(frame_classification_label, frame)

def _parse_image_group_frame_level_data(self, label_row_data_units: dict) -> Dict[int, FrameLevelImageGroupData]:
    """Parse the data units into frame-level data, keyed by the integer "data_sequence" of each unit."""
    frame_level_data: Dict[int, LabelRowV2.FrameLevelImageGroupData] = dict()
    for _, payload in label_row_data_units.items():
        frame_number = int(payload["data_sequence"])
        frame_level_image_group_data = self.FrameLevelImageGroupData(
            image_hash=payload["data_hash"],
            image_title=payload["data_title"],
            # The link is optional in the payload.
            data_link=payload.get("data_link"),
            file_type=payload["data_type"],
            frame_number=frame_number,
            width=payload["width"],
            height=payload["height"],
        )
        frame_level_data[frame_number] = frame_level_image_group_data
    return frame_level_data

def _create_new_classification_instance(
    self, frame_classification_label: dict, frame: int, classification_answers: dict
) -> ClassificationInstance:
    """
    Create a classification instance from a serialised classification label, place it on the given
    frame and set its answers from the matching entry of `classification_answers`.
    """
    feature_hash = frame_classification_label["featureHash"]
    classification_hash = frame_classification_label["classificationHash"]
    label_class = self._ontology.structure.get_child_by_hash(feature_hash)
    classification_instance = ClassificationInstance(label_class, classification_hash=classification_hash)
    frame_view = ClassificationInstance.FrameData.from_dict(frame_classification_label)
    classification_instance.set_for_frames(frame, **asdict(frame_view))
    answers_dict = classification_answers[classification_hash]["classifications"]
    self._add_static_answers_from_dict(classification_instance, answers_dict)
    return classification_instance

def _add_static_answers_from_dict(
    self, classification_instance: ClassificationInstance, answers_list: List[dict]
) -> None:
    """Set the answers of the classification instance from a list of serialised answers."""
    classification_instance.set_answer_from_list(answers_list)

def _add_frames_to_classification_instance(self, frame_classification_label: dict, frame: int) -> None:
    """Place an already known classification instance (looked up by its hash) on the given frame."""
    # NOTE: despite its name, `object_hash` holds the classification hash.
    object_hash = frame_classification_label["classificationHash"]
    classification_instance = self._classifications_map[object_hash]
    frame_view = ClassificationInstance.FrameData.from_dict(frame_classification_label)
    classification_instance.set_for_frames(frame, **asdict(frame_view))

def _check_labelling_is_initalised(self):
    """
    Guard for operations that need initialised labels.

    Raises:
        LabelRowError: If `is_labelling_initialised` is falsy.
    """
    if not self.is_labelling_initialised:
        raise LabelRowError(
            "For this operation you will need to initialise labelling first. Call the `.initialise_labels()` "
            "to do so first."
        )

def __repr__(self) -> str:
    """Return a short debugging representation with the label hash, data hash and data title."""
    return f"LabelRowV2(label_hash={self.label_hash}, data_hash={self.data_hash}, data_title={self.data_title})"
@dataclass
class AnswerForFrames:
    # Pairs one answer value with the frame ranges on which that answer is set.
    answer: Union[str, Option, Iterable[Option]]
    ranges: Ranges
    """
    The ranges are essentially a run length encoding of the frames where the unique answer is set.
    They are sorted in ascending order.
    """
# Type alias: a list of answers, each paired with the frame ranges it is set on.
AnswersForFrames = List[AnswerForFrames]
class ObjectInstance:
    """
    An object instance is an object that has coordinates and can be placed on one or multiple frames in a label row.
    """

    def __init__(self, ontology_object: Object, *, object_hash: Optional[str] = None):
        """
        Args:
            ontology_object: The ontology object that this instance is an instance of.
            object_hash: Optionally the hash to use for this instance. If it is not set (or empty), a new
                short uuid string is generated.
        """
        self._ontology_object = ontology_object
        self._frames_to_instance_data: Dict[int, ObjectInstance.FrameData] = {}
        self._object_hash = object_hash if object_hash else short_uuid_str()

        # This member should only be manipulated by a LabelRowV2.
        self._parent: Optional[LabelRowV2] = None

        # Maps the feature_node_hash of an attribute to its answer.
        self._static_answer_map: Dict[str, Answer] = _get_static_answer_map(ontology_object.attributes)

        # Created last, as the manager receives this (by then otherwise initialised) instance.
        self._dynamic_answer_manager = DynamicAnswerManager(self)
def is_assigned_to_label_row(self) -> Optional[LabelRowV2]:
    """
    Returns:
        The `LabelRowV2` stored as the parent of this instance, or `None` if no parent is set.
    """
    parent = self._parent
    return parent
@property
def object_hash(self) -> str:
    """A unique identifier for the object instance."""
    return self._object_hash

@property
def ontology_item(self) -> Any:
    """A deep copy of the ontology object of this instance, so changes to it do not affect the instance."""
    snapshot = deepcopy(self._ontology_object)
    return snapshot

@property
def _last_frame(self) -> Union[int, float]:
    """
    The exclusive upper bound for valid frame numbers: the number of frames of the parent label row,
    or infinity if there is no parent or the parent is of the DICOM data type.
    """
    parent = self._parent
    if parent is not None and parent.data_type is not DataType.DICOM:
        return parent.number_of_frames
    return float("inf")
def get_answer(
    self,
    attribute: Attribute,
    filter_answer: Union[str, Option, Iterable[Option], None] = None,
    filter_frame: Optional[int] = None,
    is_dynamic: Optional[bool] = None,
) -> Union[str, Option, Iterable[Option], AnswersForFrames, None]:
    """
    Get the answer set for a given ontology Attribute. Returns `None` if the attribute is not yet answered.

    For the ChecklistAttribute, it returns None if and only if
    the attribute is nested and the parent is unselected. Otherwise, if not yet answered it will return an empty
    list.

    Args:
        attribute: The ontology attribute to get the answer for. If `None` is passed, the first attribute of
            the ontology object is used.
        filter_answer: A filter for a specific answer value. Only applies to dynamic attributes.
        filter_frame: A filter for a specific frame. Only applies to dynamic attributes.
        is_dynamic: Optionally specify whether a dynamic answer is expected or not. This will throw if it is
            set incorrectly according to the attribute. Set this to narrow down the return type.

    Returns:
        If the attribute is static, then the answer value is returned, assuming an answer value has already been
        set. If the attribute is dynamic, the AnswersForFrames object is returned.

    Raises:
        LabelRowError: If the attribute is not a valid child of this object instance, or if `is_dynamic`
            contradicts the attribute.
    """
    if attribute is None:
        attribute = self._ontology_object.attributes[0]
    elif not self._is_attribute_valid_child_of_object_instance(attribute):
        # The message names the object (not a classification), consistent with `set_answer`.
        raise LabelRowError("The attribute is not a valid child of the object.")
    elif not self._is_selectable_child_attribute(attribute):
        # A nested attribute whose parent is not selected has no answer.
        return None

    if is_dynamic is not None and is_dynamic is not attribute.dynamic:
        raise LabelRowError(
            f"The attribute is {'dynamic' if attribute.dynamic else 'static'}, but is_dynamic is set to "
            f"{is_dynamic}."
        )

    if attribute.dynamic:
        return self._dynamic_answer_manager.get_answer(attribute, filter_answer, filter_frame)

    static_answer = self._static_answer_map[attribute.feature_node_hash]
    return get_answer_from_object(static_answer)
def set_answer(
    self,
    answer: Union[str, Option, Sequence[Option]],
    attribute: Optional[Attribute] = None,
    frames: Optional[Frames] = None,
    overwrite: bool = False,
) -> None:
    """
    Set the answer for a given ontology Attribute. This is the equivalent of e.g. selecting a checkbox in the
    UI after drawing the ObjectInstance. There is only one answer per ObjectInstance per Attribute, unless
    the attribute is dynamic (check the args list for more instructions on how to set dynamic answers).

    Args:
        answer: The answer to set.
        attribute: The ontology attribute to set the answer for. If not set, this will be attempted to be
            inferred. For answers to :class:`encord.objects.common.RadioAttribute` or
            :class:`encord.objects.common.ChecklistAttribute`, this can be inferred automatically. For
            :class:`encord.objects.common.TextAttribute`, this will only be inferred if there is only one
            possible TextAttribute to set for the entire object instance. Otherwise, a
            :class:`encord.exceptions.LabelRowError` will be thrown.
        frames: Only relevant for dynamic attributes. The frames to set the answer for. If `None`, the
            answer is set for all frames that this object currently has set coordinates for (also overwriting
            current answers). This will not automatically propagate the answer to new frames that are added in
            the future.
            If this is anything but `None` for non-dynamic attributes, this will throw a LabelRowError.
        overwrite: If `True`, the answer will be overwritten if it already exists. If `False`, this will throw
            a LabelRowError if the answer already exists. This argument is ignored for dynamic attributes.

    Raises:
        LabelRowError: If the attribute is not a valid child of the object, if a parent attribute of a nested
            attribute is not selected, if `frames` is set for a static attribute, or if a static answer
            already exists and `overwrite` is `False`.
    """
    if attribute is None:
        attribute = _infer_attribute_from_answer(self._ontology_object.attributes, answer)

    if not self._is_attribute_valid_child_of_object_instance(attribute):
        raise LabelRowError("The attribute is not a valid child of the object.")
    elif not self._is_selectable_child_attribute(attribute):
        raise LabelRowError(
            "Setting a nested attribute is only possible if all parent attributes have been selected."
        )
    elif frames is not None and attribute.dynamic is False:
        raise LabelRowError("Setting frames is only possible for dynamic attributes.")

    if attribute.dynamic:
        # Dynamic answers are tracked per frame by the dynamic answer manager.
        self._dynamic_answer_manager.set_answer(answer, attribute, frames)
        return

    static_answer = self._static_answer_map[attribute.feature_node_hash]
    if static_answer.is_answered() and overwrite is False:
        # The two literals are concatenated, so the first one must end with a space.
        raise LabelRowError(
            "The answer to this attribute was already set. Set `overwrite` to `True` if you want to "
            "overwrite an existing answer to an attribute."
        )

    set_answer_for_object(static_answer, answer)
def set_answer_from_list(self, answers_list: List[Dict[str, Any]]) -> None:
    """
    This is a low level helper function and should usually not be used directly.

    Sets the answers of this object instance from a list of serialised answer dictionaries.

    Args:
        answers_list: The list of answer dictionaries to set the answers from. Each dictionary needs a
            "featureHash" entry identifying the attribute it belongs to.

    Raises:
        LabelRowError: If an attribute cannot be found in the ontology object or is not a valid child of
            this object instance.
    """
    for serialised_answer in answers_list:
        feature_hash = serialised_answer["featureHash"]
        matched_attribute = _get_attribute_by_hash(feature_hash, self._ontology_object.attributes)

        if matched_attribute is None:
            raise LabelRowError(
                "One of the attributes does not exist in the ontology. Cannot create a valid LabelRow."
            )

        is_valid_child = self._is_attribute_valid_child_of_object_instance(matched_attribute)
        if not is_valid_child:
            raise LabelRowError(
                "One of the attributes set for a classification is not a valid child of the classification. "
                "Cannot create a valid LabelRow."
            )

        self._set_answer_from_dict(serialised_answer, matched_attribute)
def delete_answer(
    self,
    attribute: Attribute,
    filter_answer: Union[str, Option, Iterable[Option]] = None,
    filter_frame: Optional[int] = None,
) -> None:
    """
    This resets the answer of an attribute as if it was never set.

    Args:
        attribute: The attribute to delete the answer for.
        filter_answer: A filter for a specific answer value. Delete only answers with the provided value.
            Only applies to dynamic attributes.
        filter_frame: A filter for a specific frame. Only applies to dynamic attributes.
    """
    if not attribute.dynamic:
        # Static attributes have exactly one answer object, which is simply unset.
        self._static_answer_map[attribute.feature_node_hash].unset()
        return

    # Note the argument order expected by the manager: frame filter first, then answer filter.
    self._dynamic_answer_manager.delete_answer(attribute, filter_frame, filter_answer)
def check_within_range(self, frame: int) -> None:
    """
    Check that the frame lies in the half-open interval from `0` to `self._last_frame`.

    Raises:
        LabelRowError: If the frame is negative, or greater than or equal to `self._last_frame`.
    """
    if 0 <= frame < self._last_frame:
        return

    raise LabelRowError(
        f"The supplied frame of `{frame}` is not within the acceptable bounds of `0` to `{self._last_frame}`."
    )
def set_for_frames(
    self,
    coordinates: Coordinates,
    frames: Frames = 0,
    *,
    overwrite: bool = False,
    created_at: Optional[datetime] = None,
    created_by: Optional[str] = None,
    last_edited_at: Optional[datetime] = None,
    last_edited_by: Optional[str] = None,
    confidence: Optional[float] = None,
    manual_annotation: Optional[bool] = None,
    reviews: Optional[List[dict]] = None,
    is_deleted: Optional[bool] = None,
) -> None:
    """
    Places the object onto the specified frame(s). If the object already exists on a frame and `overwrite`
    is set to `True`, the currently specified values will be overwritten.

    The frames are processed one after the other, so if an error is raised for one frame, the frames
    before it have already been set.

    Args:
        coordinates: The coordinates of the object in the frame. This will throw an error if the type of
            the coordinates does not match the type of the attribute in the object instance.
        frames: The frames to add the object instance to. Defaulting to the first frame for convenience.
        overwrite: If `True`, overwrite existing data for the given frames. This will not reset all the
            non-specified values. If `False` and data already exists for the given frames,
            raises an error.
        created_at: Optionally specify the creation time of the object instance on this frame. Defaults to
            `datetime.now()`.
        created_by: Optionally specify the creator of the object instance on this frame. Defaults to the
            current SDK user.
        last_edited_at: Optionally specify the last edit time of the object instance on this frame. Defaults
            to `datetime.now()`.
        last_edited_by: Optionally specify the last editor of the object instance on this frame. Defaults to
            the current SDK user.
        confidence: Optionally specify the confidence of the object instance on this frame. Defaults to `1.0`.
        manual_annotation: Optionally specify whether the object instance on this frame was manually
            annotated. Defaults to `True`.
        reviews: Should only be set by internal functions.
        is_deleted: Should only be set by internal functions.

    Raises:
        LabelRowError: If data already exists for a frame and `overwrite` is `False`, or if a frame is
            outside of the acceptable range.
    """
    for frame_number in frames_class_to_frames_list(frames):
        frame_data = self._frames_to_instance_data.get(frame_number)
        if frame_data is not None and overwrite is False:
            raise LabelRowError(
                "Cannot overwrite existing data for a frame. Set `overwrite` to `True` to overwrite."
            )

        check_coordinate_type(coordinates, self._ontology_object)
        self.check_within_range(frame_number)

        if frame_data is None:
            # First time on this frame: start from default frame info.
            frame_data = ObjectInstance.FrameData(
                coordinates=coordinates, object_frame_instance_info=ObjectInstance.FrameInfo()
            )
            self._frames_to_instance_data[frame_number] = frame_data

        frame_info = frame_data.object_frame_instance_info
        frame_info.update_from_optional_fields(
            created_at=created_at,
            created_by=created_by,
            last_edited_at=last_edited_at,
            last_edited_by=last_edited_by,
            confidence=confidence,
            manual_annotation=manual_annotation,
            reviews=reviews,
            is_deleted=is_deleted,
        )
        frame_data.coordinates = coordinates

        # Keep the frame index of the owning label row (if any) in sync.
        if self._parent:
            self._parent.add_to_single_frame_to_hashes_map(self, frame_number)
def get_annotation(self, frame: Union[int, str] = 0) -> Annotation:
    """
    Get the annotation for the object instance on the specified frame.

    Args:
        frame: Either the frame number or the image hash if the data type is an image or image group.
            Defaults to the first frame.

    Returns:
        An `ObjectInstance.Annotation` view for this instance on the resolved frame number.

    Raises:
        LabelRowError: If an image hash is passed while this instance is not assigned to a label row,
            as the hash can only be resolved to a frame number through the label row.
    """
    if isinstance(frame, str):
        # Resolving an image hash needs the parent label row; fail with a clear error instead of an
        # AttributeError on `None`.
        if self._parent is None:
            raise LabelRowError(
                "Cannot get the annotation by image hash for an ObjectInstance that is not assigned to a "
                "label row. Use the frame number instead."
            )
        frame = self._parent.get_frame_number(frame)

    return self.Annotation(self, frame)
def copy(self) -> ObjectInstance:
    """
    Creates an exact copy of this ObjectInstance but with a new object hash and without being associated to any
    LabelRowV2. This is useful if you want to add the semantically same ObjectInstance to multiple
    `LabelRowV2`s.

    Returns:
        The new, unattached ObjectInstance.
    """
    # Not passing an object hash makes the constructor generate a fresh one.
    duplicate = ObjectInstance(self._ontology_object)

    # The per-frame data and the answers are copied, so that the two instances can diverge afterwards.
    duplicate._frames_to_instance_data = deepcopy(self._frames_to_instance_data)
    duplicate._static_answer_map = deepcopy(self._static_answer_map)
    duplicate._dynamic_answer_manager = self._dynamic_answer_manager.copy()
    return duplicate
def get_annotations(self) -> List[Annotation]:
    """
    Get all annotations for the object instance on all frames it has been placed to.

    Returns:
        A list of `ObjectInstance.Annotation` in ascending order of the frame numbers.
    """
    ordered_frames = sorted(self._frames_to_instance_data.keys())
    return [self.get_annotation(frame_number) for frame_number in ordered_frames]
def remove_from_frames(self, frames: Frames):
    """
    Remove the object instance from the given frames.

    If this instance is assigned to a label row, the frame index of the label row is updated as well.

    Args:
        frames: The frames to remove the object instance from.

    Raises:
        KeyError: If the instance has no data on one of the frames. Frames before the failing one have
            already been removed at that point.
    """
    frame_numbers = frames_class_to_frames_list(frames)
    for frame_number in frame_numbers:
        del self._frames_to_instance_data[frame_number]

    parent = self._parent
    if parent:
        parent._remove_from_frame_to_hashes_map(frame_numbers, self.object_hash)
def is_valid(self) -> None:
    """
    Check that the object instance is valid. Nothing is returned; an invalid instance raises.

    Raises:
        LabelRowError: If the instance is not on any frame, or if the dynamic answer check fails.
    """
    if not self._frames_to_instance_data:
        raise LabelRowError("ObjectInstance is not on any frames. Please add it to at least one frame.")

    self.are_dynamic_answers_valid()
def are_dynamic_answers_valid(self) -> None:
    """
    Check that there are no dynamic answers on frames that have no coordinates. Nothing is returned.

    Raises:
        LabelRowError: If at least one frame has a dynamic answer but no coordinates.
    """
    frames_with_dynamic_answers = set(self._dynamic_answer_manager.frames())
    frames_with_coordinates = set(_frame_views_to_frame_numbers(self.get_annotations()))

    orphaned_frames = frames_with_dynamic_answers - frames_with_coordinates
    if orphaned_frames:
        raise LabelRowError(
            "There are some dynamic answers on frames that have no coordinates. "
            "Please ensure that all the dynamic answers are only on frames where coordinates "
            "have been set previously."
        )
class Annotation:
    """
    This class can be used to set or get data for a specific annotation (i.e. the ObjectInstance for a given
    frame number).

    Every accessor except `frame` first checks that the owning ObjectInstance still holds data for
    this frame and raises a `LabelRowError` if it does not.
    """

    def __init__(self, object_instance: ObjectInstance, frame: int):
        self._object_instance = object_instance
        self._frame = frame

    @property
    def frame(self) -> int:
        """The frame number this annotation refers to."""
        return self._frame

    @property
    def coordinates(self) -> Coordinates:
        """The coordinates of the object instance on this frame."""
        self._check_if_frame_view_is_valid()
        return self._get_object_frame_instance_data().coordinates

    @coordinates.setter
    def coordinates(self, coordinates: Coordinates) -> None:
        self._check_if_frame_view_is_valid()
        # Delegates to the owning instance, overwriting what is stored for this frame.
        self._object_instance.set_for_frames(coordinates, self._frame, overwrite=True)

    @property
    def created_at(self) -> datetime:
        return self._get_checked_frame_info().created_at

    @created_at.setter
    def created_at(self, created_at: datetime) -> None:
        self._get_checked_frame_info().created_at = created_at

    @property
    def created_by(self) -> Optional[str]:
        return self._get_checked_frame_info().created_by

    @created_by.setter
    def created_by(self, created_by: Optional[str]) -> None:
        """
        Set the created_by field with a user email or None if it should default to the current user of the SDK.
        """
        # Validate the frame before the email so the error precedence stays the same.
        frame_info = self._get_checked_frame_info()
        if created_by is not None:
            check_email(created_by)
        frame_info.created_by = created_by

    @property
    def last_edited_at(self) -> datetime:
        return self._get_checked_frame_info().last_edited_at

    @last_edited_at.setter
    def last_edited_at(self, last_edited_at: datetime) -> None:
        self._get_checked_frame_info().last_edited_at = last_edited_at

    @property
    def last_edited_by(self) -> Optional[str]:
        return self._get_checked_frame_info().last_edited_by

    @last_edited_by.setter
    def last_edited_by(self, last_edited_by: Optional[str]) -> None:
        """
        Set the last_edited_by field with a user email or None if it should default to the current user of the SDK.
        """
        frame_info = self._get_checked_frame_info()
        if last_edited_by is not None:
            check_email(last_edited_by)
        frame_info.last_edited_by = last_edited_by

    @property
    def confidence(self) -> float:
        return self._get_checked_frame_info().confidence

    @confidence.setter
    def confidence(self, confidence: float) -> None:
        self._get_checked_frame_info().confidence = confidence

    @property
    def manual_annotation(self) -> bool:
        return self._get_checked_frame_info().manual_annotation

    @manual_annotation.setter
    def manual_annotation(self, manual_annotation: bool) -> None:
        self._get_checked_frame_info().manual_annotation = manual_annotation

    @property
    def reviews(self) -> List[dict]:
        """
        A read only property about the reviews that happened for this object on this frame.
        """
        return self._get_checked_frame_info().reviews

    @property
    def is_deleted(self) -> bool:
        """This property is only relevant for internal use."""
        return self._get_checked_frame_info().is_deleted

    def _get_object_frame_instance_data(self) -> ObjectInstance.FrameData:
        """Return the data the owning instance stores for this frame (no validity check)."""
        return self._object_instance._frames_to_instance_data[self._frame]

    def _get_checked_frame_info(self) -> ObjectInstance.FrameInfo:
        """Validate the frame, then return the per-frame metadata object."""
        self._check_if_frame_view_is_valid()
        return self._get_object_frame_instance_data().object_frame_instance_info

    def _check_if_frame_view_is_valid(self) -> None:
        """Raise a `LabelRowError` if the owning instance has no data for this frame."""
        if self._frame not in self._object_instance._frames_to_instance_data:
            raise LabelRowError(
                "Trying to use an ObjectInstance.Annotation for an ObjectInstance that is not on the frame."
            )
@dataclass
class FrameInfo:
    """
    Per-frame metadata of an object instance: who created/edited it and when, the confidence,
    whether it was manually annotated, its reviews and its deletion flag.
    """

    # `default_factory` makes the timestamp be taken when each instance is created. A plain
    # `datetime.now()` default would be evaluated once, when the class is defined, and would then
    # be shared by every instance.
    created_at: datetime = field(default_factory=datetime.now)
    created_by: Optional[str] = None
    """None defaults to the user of the SDK once uploaded to the server."""
    last_edited_at: datetime = field(default_factory=datetime.now)
    last_edited_by: Optional[str] = None
    """None defaults to the user of the SDK once uploaded to the server."""
    confidence: float = DEFAULT_CONFIDENCE
    manual_annotation: bool = DEFAULT_MANUAL_ANNOTATION
    reviews: Optional[List[dict]] = None
    is_deleted: Optional[bool] = None

    @staticmethod
    def from_dict(d: dict):
        """
        Build a `FrameInfo` from a camelCase dictionary.

        Args:
            d: Must contain "createdAt", "createdBy", "confidence" and "manualAnnotation".
                "lastEditedAt", "lastEditedBy", "reviews" and "isDeleted" are optional.

        Raises:
            KeyError: If one of the required keys is missing.
        """
        # NOTE(review): a missing "lastEditedAt" yields `last_edited_at=None` although the field is
        # annotated as `datetime` - confirm that downstream consumers cope with None.
        last_edited_at = parse(d["lastEditedAt"]) if "lastEditedAt" in d else None

        return ObjectInstance.FrameInfo(
            created_at=parse(d["createdAt"]),
            created_by=d["createdBy"],
            last_edited_at=last_edited_at,
            last_edited_by=d.get("lastEditedBy"),
            confidence=d["confidence"],
            manual_annotation=d["manualAnnotation"],
            reviews=d.get("reviews"),
            is_deleted=d.get("isDeleted"),
        )

    def update_from_optional_fields(
        self,
        created_at: Optional[datetime] = None,
        created_by: Optional[str] = None,
        last_edited_at: Optional[datetime] = None,
        last_edited_by: Optional[str] = None,
        confidence: Optional[float] = None,
        manual_annotation: Optional[bool] = None,
        reviews: Optional[List[dict]] = None,
        is_deleted: Optional[bool] = None,
    ) -> None:
        """
        Update this instance in place with every argument that is not None.

        Arguments left as None keep the current value of the corresponding field. Nothing is returned.
        """
        if created_at is not None:
            self.created_at = created_at
        if created_by is not None:
            self.created_by = created_by
        if last_edited_at is not None:
            self.last_edited_at = last_edited_at
        if last_edited_by is not None:
            self.last_edited_by = last_edited_by
        if confidence is not None:
            self.confidence = confidence
        if manual_annotation is not None:
            self.manual_annotation = manual_annotation
        if reviews is not None:
            self.reviews = reviews
        if is_deleted is not None:
            self.is_deleted = is_deleted
@dataclass
class FrameData:
    """Pairs the coordinates of an object instance with its frame metadata."""

    # The coordinates themselves.
    coordinates: Coordinates
    # The accompanying metadata record (an `ObjectInstance.FrameInfo`).
    object_frame_instance_info: ObjectInstance.FrameInfo
# Probably the above can be flattened out into this class.
def _set_answer_unsafe(
    self, answer: Union[str, Option, Iterable[Option]], attribute: Attribute, track_hash: str, ranges: Ranges
) -> None:
    """
    Store `answer` for `attribute` without any validation.

    Dynamic attributes are forwarded to the dynamic answer manager for the given `ranges`;
    static attributes are written into the matching entry of the static answer map.
    `track_hash` is accepted but not used here.
    """
    if not attribute.dynamic:
        set_answer_for_object(self._static_answer_map[attribute.feature_node_hash], answer)
        return

    self._dynamic_answer_manager.set_answer(answer, attribute, frames=ranges)


def _set_answer_from_dict(self, answer_dict: Dict[str, Any], attribute: Attribute) -> None:
    """
    Parse a serialised answer dictionary and store the answer for `attribute`.

    Raises:
        NotImplementedError: If the attribute is neither a text, radio nor checklist attribute.
    """
    # Track hash and ranges are only read for dynamic attributes.
    track_hash = None
    ranges = None
    if attribute.dynamic:
        track_hash = answer_dict["trackHash"]
        ranges = ranges_list_to_ranges(answer_dict["range"])

    if isinstance(attribute, TextAttribute):
        parsed_answer = answer_dict["answers"]
    elif isinstance(attribute, RadioAttribute):
        # Only the first entry of "answers" is used for a radio attribute.
        selected_hash = answer_dict["answers"][0]["featureHash"]
        parsed_answer = _get_option_by_hash(selected_hash, attribute.options)
    elif isinstance(attribute, ChecklistAttribute):
        parsed_answer = [
            _get_option_by_hash(entry["featureHash"], attribute.options) for entry in answer_dict["answers"]
        ]
    else:
        raise NotImplementedError(f"The attribute type {type(attribute)} is not supported.")

    self._set_answer_unsafe(parsed_answer, attribute, track_hash, ranges)


def _is_attribute_valid_child_of_object_instance(self, attribute: Attribute) -> bool:
    """Whether `attribute` is known to this instance as a static or as a dynamic attribute."""
    known_as_static = attribute.feature_node_hash in self._static_answer_map
    known_as_dynamic = self._dynamic_answer_manager.is_valid_dynamic_attribute(attribute)
    return known_as_static or known_as_dynamic


def _is_selectable_child_attribute(self, attribute: Attribute) -> bool:
    """
    Whether `_search_child_attributes` finds `attribute` under any top-level attribute of the
    ontology object, given the current static answers.
    """
    # I have the ontology classification, so I can build the tree from that. Basically do a DFS.
    return any(
        _search_child_attributes(attribute, top_level_attribute, self._static_answer_map)
        for top_level_attribute in self._ontology_object.attributes
    )


def _get_all_static_answers(self) -> List[Answer]:
    """All answer objects of the static answer map, as a new list."""
    return [*self._static_answer_map.values()]


def _get_all_dynamic_answers(self) -> List[Tuple[Answer, Ranges]]:
    """All dynamic answers together with their ranges, as reported by the dynamic answer manager."""
    return self._dynamic_answer_manager.get_all_answers()


def __repr__(self):
    ontology_object = self._ontology_object
    return (
        f"ObjectInstance(object_hash={self._object_hash}, object_name={ontology_object.name}, "
        f"object_feature_hash={ontology_object.feature_node_hash})"
    )


def __hash__(self) -> int:
    # Identity based: two distinct instances never hash as the same object.
    return hash(id(self))


def __lt__(self, other: ObjectInstance) -> bool:
    # Ordering is defined by the object hash.
    return self._object_hash < other._object_hash
class DynamicAnswerManager:
    """
    This class is an internal helper class. The user should not interact with it directly.

    Manages the answers that are set for different frames. This can be part of the ObjectInstance class.

    Two mirrored indexes are maintained: frame -> answers on that frame, and answer -> frames it is on.
    Neither index keeps an entry whose set is empty, so `frames()` only reports frames that
    really carry an answer.
    """

    def __init__(self, object_instance: ObjectInstance):
        self._object_instance = object_instance
        self._frames_to_answers: Dict[int, Set[Answer]] = defaultdict(set)
        self._answers_to_frames: Dict[Answer, Set[int]] = defaultdict(set)

        self._dynamic_uninitialised_answer_options: Set[Answer] = self._get_dynamic_answers()
        # ^ these are like the static answers. Everything that is possibly an answer. However,
        # don't forget also nested-ness. In this case nested-ness should be ignored.
        # ^ I might not need this object but only need the _get_dynamic_answers object.

    def is_valid_dynamic_attribute(self, attribute: Attribute) -> bool:
        """Whether `attribute` is one of the dynamic attributes of the owning object's ontology item."""
        feature_node_hash = attribute.feature_node_hash
        return any(
            answer.ontology_attribute.feature_node_hash == feature_node_hash
            for answer in self._dynamic_uninitialised_answer_options
        )

    def delete_answer(
        self,
        attribute: Attribute,
        frames: Optional[Frames] = None,
        filter_answer: Union[str, Option, Iterable[Option], None] = None,
    ) -> None:
        """
        Remove the answer of `attribute` from the given frames.

        Args:
            attribute: The attribute whose answer is removed.
            frames: The frames to remove the answer from. None means every frame that has an answer.
            filter_answer: If given, only an answer whose value equals it is removed.
        """
        if frames is None:
            # Materialised up front because the index is mutated in the loop below.
            frames = [Range(i, i) for i in self._frames_to_answers.keys()]

        for frame in frames_class_to_frames_list(frames):
            # `.get` instead of indexing: indexing a defaultdict would insert an empty set for
            # every frame that is merely looked at, making `frames()` report phantom frames.
            answers_on_frame = self._frames_to_answers.get(frame)
            if not answers_on_frame:
                continue

            to_remove_answer = None
            for answer_object in answers_on_frame:
                if filter_answer is not None:
                    answer_value = get_answer_from_object(answer_object)
                    if answer_value != filter_answer:
                        continue
                # Linear scan over the answers of the frame; these sets are not expected to be large.
                if answer_object.ontology_attribute == attribute:
                    to_remove_answer = answer_object
                    break

            if to_remove_answer is None:
                continue

            # Keep both indexes in sync and drop entries that became empty.
            answers_on_frame.remove(to_remove_answer)
            if not answers_on_frame:
                del self._frames_to_answers[frame]

            frames_of_answer = self._answers_to_frames[to_remove_answer]
            frames_of_answer.remove(frame)
            if not frames_of_answer:
                del self._answers_to_frames[to_remove_answer]

    def set_answer(
        self, answer: Union[str, Option, Iterable[Option]], attribute: Attribute, frames: Optional[Frames] = None
    ) -> None:
        """
        Set `answer` for `attribute`.

        Args:
            answer: The answer value to store.
            attribute: The attribute the answer belongs to.
            frames: The frames to set the answer on. None means every frame on which the owning
                object instance currently has an annotation (each handled separately).
        """
        if frames is None:
            for available_frame_view in self._object_instance.get_annotations():
                self._set_answer(answer, attribute, available_frame_view.frame)
            return
        self._set_answer(answer, attribute, frames)

    def _set_answer(self, answer: Union[str, Option, Iterable[Option]], attribute: Attribute, frames: Frames) -> None:
        """Replace the answer of `attribute` on all given frames with one shared answer object."""
        frame_list = frames_class_to_frames_list(frames)

        # Validate every frame before anything is modified.
        for frame in frame_list:
            self._object_instance.check_within_range(frame)

        self.delete_answer(attribute, frames)

        default_answer = get_default_answer_from_attribute(attribute)
        set_answer_for_object(default_answer, answer)

        for frame in frame_list:
            self._frames_to_answers[frame].add(default_answer)
            self._answers_to_frames[default_answer].add(frame)

    def get_answer(
        self,
        attribute: Attribute,
        filter_answer: Union[str, Option, Iterable[Option], None] = None,
        filter_frames: Optional[Frames] = None,
    ) -> AnswersForFrames:
        """For a given attribute, return all the answers and frames given the filters."""
        ret = []
        filter_frames_set = None if filter_frames is None else set(frames_class_to_frames_list(filter_frames))

        for answer, actual_frames in self._answers_to_frames.items():
            if answer.ontology_attribute != attribute:
                continue
            if not (filter_answer is None or filter_answer == get_answer_from_object(answer)):
                continue
            # An answer passes the frame filter as soon as it overlaps it; the ranges reported
            # below are still all ranges of the answer, not only the overlapping part.
            if not (filter_frames_set is None or len(actual_frames & filter_frames_set) > 0):
                continue

            ranges = frames_to_ranges(actual_frames)
            ret.append(AnswerForFrames(answer=get_answer_from_object(answer), ranges=ranges))
        return ret

    def frames(self) -> Iterable[int]:
        """Returns all frames that have answers set."""
        return self._frames_to_answers.keys()

    def get_all_answers(self) -> List[Tuple[Answer, Ranges]]:
        """Returns all answers that are set."""
        return [(answer, frames_to_ranges(frames)) for answer, frames in self._answers_to_frames.items()]

    def copy(self) -> DynamicAnswerManager:
        """Return a manager for the same object instance with deep copies of both indexes."""
        ret = DynamicAnswerManager(self._object_instance)
        ret._frames_to_answers = deepcopy(self._frames_to_answers)
        ret._answers_to_frames = deepcopy(self._answers_to_frames)
        return ret

    def _get_dynamic_answers(self) -> Set[Answer]:
        """Create one default answer for every dynamic top-level attribute of the ontology item."""
        return {
            get_default_answer_from_attribute(attribute)
            for attribute in self._object_instance.ontology_item.attributes
            if attribute.dynamic
        }

    def __eq__(self, other: object) -> bool:
        if not isinstance(other, DynamicAnswerManager):
            return False
        return (
            self._frames_to_answers == other._frames_to_answers and self._answers_to_frames == other._answers_to_frames
        )

    def __hash__(self) -> int:
        # Identity based, deliberately independent of `__eq__`.
        return hash(id(self))


def check_coordinate_type(coordinates: Coordinates, ontology_object: Object) -> None:
    """
    Ensure that `coordinates` has exactly the type registered for the shape of `ontology_object`.

    Raises:
        LabelRowError: If the type of the coordinates is not the expected one.
    """
    expected_coordinate_type = ACCEPTABLE_COORDINATES_FOR_ONTOLOGY_ITEMS[ontology_object.shape]
    # Exact type match on purpose (not `isinstance`): subclasses are rejected as before.
    if type(coordinates) is not expected_coordinate_type:
        raise LabelRowError(
            f"Expected a coordinate of type `{expected_coordinate_type}`, but got type `{type(coordinates)}`."
        )


# Colors that are assigned in turn to objects that are added without an explicit color.
AVAILABLE_COLORS = (
    "#D33115",
    "#E27300",
    "#16406C",
    "#FE9200",
    "#FCDC00",
    "#DBDF00",
    "#A4DD00",
    "#68CCCA",
    "#73D8FF",
    "#AEA1FF",
    "#FCC400",
    "#B0BC00",
    "#68BC00",
    "#16A5A5",
    "#009CE0",
    "#7B64FF",
    "#FA28FF",
    "#B3B3B3",
    "#9F0500",
    "#C45100",
    "#FB9E00",
    "#808900",
    "#194D33",
    "#0C797D",
    "#0062B1",
    "#653294",
    "#AB149E",
)
@dataclass
class OntologyStructure:
    """
    This class is currently in BETA. Its API might change in future minor version releases.
    """

    objects: List[Object] = field(default_factory=list)
    classifications: List[Classification] = field(default_factory=list)

    def get_child_by_hash(
        self,
        feature_node_hash: str,
        type_: Union[OntologyTypes, AttributeTypes, OptionTypes, None] = None,
    ) -> Union[OntologyClasses, AttributeClasses, OptionClasses]:
        """
        Returns the first child node of this ontology tree node with the matching feature node hash. If there is
        more than one child with the same feature node hash in the ontology tree node, then the ontology would be in
        an invalid state. Throws if nothing is found or if the type is not matched.

        Args:
            feature_node_hash: the feature_node_hash of the child node to search for in the ontology.
            type_: The expected type of the item. If the found child does not match the type, an error will be thrown.

        Raises:
            OntologyError: If no item with the given hash exists.
        """
        # Objects are searched before classifications; each node is checked itself before its attributes.
        for node in [*self.objects, *self.classifications]:
            if node.feature_node_hash == feature_node_hash:
                check_type(node, type_)
                return node

            found_item = _get_attribute_by_hash(feature_node_hash, node.attributes)
            if found_item is not None:
                check_type(found_item, type_)
                return found_item

        raise OntologyError("Item not found.")

    def get_child_by_title(
        self,
        title: str,
        type_: Union[OntologyTypes, AttributeTypes, OptionTypes, None] = None,
    ) -> Union[OntologyClasses, AttributeClasses, OptionClasses]:
        """
        Returns a child node of this ontology tree node with the matching title and matching type if specified. If more
        than one child in this Object have the same title, then an error will be thrown. If no item is found, an error
        will be thrown as well.

        Args:
            title: The exact title of the child node to search for in the ontology.
            type_: The expected type of the child node. Only a node that matches this type will be returned.
        """
        found_items = self.get_children_by_title(title, type_)
        _handle_wrong_number_of_found_items(found_items, title, type_)
        return found_items[0]

    def get_children_by_title(
        self,
        title: str,
        type_: Union[OntologyTypes, AttributeTypes, OptionTypes, None] = None,
    ) -> List[Union[OntologyClasses, AttributeClasses, OptionClasses]]:
        """
        Returns all the child nodes of this ontology tree node with the matching title and matching type if specified.
        Title in ontologies do not need to be unique, however, we recommend unique titles when creating ontologies.

        Args:
            title: The exact title of the child node to search for in the ontology.
            type_: The expected type of the item. Only nodes that match this type will be returned.
        """
        ret = []
        for object_ in self.objects:
            if object_.name == title and does_type_match(object_, type_):
                ret.append(object_)
            ret.extend(filter_by_type(_get_attributes_by_title(title, object_.attributes), type_))

        for classification in self.classifications:
            attributes = classification.attributes
            if not attributes:
                # A classification created by `add_classification` has no attribute yet. It has
                # neither a title (taken from its first attribute) nor children, so skip it
                # instead of failing on `attributes[0]`.
                continue
            if attributes[0].name == title and does_type_match(classification, type_):
                ret.append(classification)
            ret.extend(filter_by_type(_get_attributes_by_title(title, attributes), type_))

        return ret

    @classmethod
    def from_dict(cls, d: dict) -> OntologyStructure:
        """
        Args:
            d: a JSON blob of an "ontology structure" (e.g. from Encord web app)

        Raises:
            KeyError: If the dict is missing a required field.
        """
        objects_ret: List[Object] = [Object.from_dict(object_dict) for object_dict in d["objects"]]
        classifications_ret: List[Classification] = [
            Classification.from_dict(classification_dict) for classification_dict in d["classifications"]
        ]
        return OntologyStructure(objects=objects_ret, classifications=classifications_ret)

    def to_dict(self) -> dict:
        """
        Returns:
            The dict equivalent to the ontology, with the keys "objects" and "classifications", each
            holding the list of `to_dict()` results of the respective items.
        """
        return {
            "objects": [ontology_object.to_dict() for ontology_object in self.objects],
            "classifications": [
                ontology_classification.to_dict() for ontology_classification in self.classifications
            ],
        }

    def add_object(
        self,
        name: str,
        shape: Shape,
        uid: Optional[int] = None,
        color: Optional[str] = None,
        feature_node_hash: Optional[str] = None,
    ) -> Object:
        """
        Adds an object class definition to the structure.

        .. code::

            structure = ontology_structure.OntologyStructure()

            eye = structure.add_object(
                name="Eye",
                shape=eye_shape,  # a member of encord.objects.common.Shape
            )
            nose = structure.add_object(
                name="Nose",
                shape=nose_shape,  # a member of encord.objects.common.Shape
            )
            nose_detail = nose.add_attribute(
                encord.objects.common.ChecklistAttribute,
            )
            nose_detail.add_option(feature_node_hash="2bc17c88", label="Is it a cute nose?")
            nose_detail.add_option(feature_node_hash="86eaa4f2", label="Is it a wet nose? ")

        Args:
            name: the user-visible name of the object
            shape: the kind of object (bounding box, polygon, etc). See :py:class:`encord.objects.common.Shape` enum
                for possible values
            uid: integer identifier of the object. Normally auto-generated;
                omit this unless the aim is to create an exact clone of existing structure
            color: the color of the object in the label editor. Normally auto-assigned, should be in '#1A2B3F' syntax.
            feature_node_hash: global identifier of the object. Normally auto-generated;
                omit this unless the aim is to create an exact clone of existing structure

        Returns:
            the created object class that can be further customised with attributes.

        Raises:
            ValueError: If `uid` or `feature_node_hash` is already used by another object.
        """
        if uid is None:
            uid = max(obj.uid for obj in self.objects) + 1 if self.objects else 1
        elif any(obj.uid == uid for obj in self.objects):
            raise ValueError(f"Duplicate uid '{uid}'")

        if color is None:
            color_index = 0
            if self.objects:
                try:
                    # Continue with the color after the one of the most recently added object,
                    # wrapping around at the end of the palette.
                    color_index = (AVAILABLE_COLORS.index(self.objects[-1].color) + 1) % len(AVAILABLE_COLORS)
                except ValueError:
                    # The previous object uses a color outside the palette: start from the beginning.
                    pass
            color = AVAILABLE_COLORS[color_index]

        if feature_node_hash is None:
            feature_node_hash = str(uuid4())[:8]

        # Checked for generated hashes too, as a truncated uuid may collide.
        if any(obj.feature_node_hash == feature_node_hash for obj in self.objects):
            raise ValueError(f"Duplicate feature_node_hash '{feature_node_hash}'")

        obj = Object(uid, name, color, shape, feature_node_hash)
        self.objects.append(obj)
        return obj

    def add_classification(
        self,
        uid: Optional[int] = None,
        feature_node_hash: Optional[str] = None,
    ) -> Classification:
        """
        Adds an classification definition to the ontology.

        .. code::

            structure = ontology_structure.OntologyStructure()

            cls = structure.add_classification(feature_node_hash="a39d81c0")
            cat_standing = cls.add_attribute(
                encord.objects.common.RadioAttribute,
                feature_node_hash="a6136d14",
                name="Is the cat standing?",
                required=True,
            )
            cat_standing.add_option(feature_node_hash="a3aeb48d", label="Yes")
            cat_standing.add_option(feature_node_hash="d0a4b373", label="No")

        Args:
            uid: integer identifier of the object. Normally auto-generated;
                omit this unless the aim is to create an exact clone of existing structure
            feature_node_hash: global identifier of the object. Normally auto-generated;
                omit this unless the aim is to create an exact clone of existing structure

        Returns:
            the created classification node. Note that classification attribute should be further specified
            by calling its `add_attribute()` method.

        Raises:
            ValueError: If `uid` or `feature_node_hash` is already used by another classification.
        """
        existing = self.classifications

        if uid is None:
            uid = max(item.uid for item in existing) + 1 if existing else 1
        elif any(item.uid == uid for item in existing):
            raise ValueError(f"Duplicate uid '{uid}'")

        if feature_node_hash is None:
            feature_node_hash = str(uuid4())[:8]

        if any(item.feature_node_hash == feature_node_hash for item in existing):
            raise ValueError(f"Duplicate feature_node_hash '{feature_node_hash}'")

        # Created without attributes; the caller is expected to add one afterwards.
        classification = Classification(uid, feature_node_hash, [])
        self.classifications.append(classification)
        return classification
DATETIME_STRING_FORMAT = "%Y-%m-%d %H:%M:%S"


class OntologyUserRole(IntEnum):
    """Integer codes of the ontology user roles."""

    ADMIN = 0
    USER = 1


class Ontology(dict, Formatter):
    def __init__(
        self,
        title: str,
        structure: OntologyStructure,
        ontology_hash: str,
        description: Optional[str] = None,
    ):
        """
        DEPRECATED - prefer using the :class:`encord.ontology.Ontology` class instead.

        This class has dict-style accessors for backwards compatibility.
        Clients who are using this class for the first time are encouraged to use the property accessors and setters
        instead of the underlying dictionary.
        The mixed use of the `dict` style member functions and the property accessors and setters is discouraged.

        WARNING: Do NOT use the `.data` member of this class. Its usage could corrupt the correctness of the
        datastructure.
        """
        # The properties below are thin views onto these four dictionary entries.
        initial_entries = {
            "ontology_hash": ontology_hash,
            "title": title,
            "description": description,
            "structure": structure,
        }
        super().__init__(initial_entries)

    @property
    def ontology_hash(self) -> str:
        # Read only: there is no setter for the hash.
        return self["ontology_hash"]

    @property
    def title(self) -> str:
        return self["title"]

    @title.setter
    def title(self, value: str) -> None:
        self["title"] = value

    @property
    def description(self) -> str:
        return self["description"]

    @description.setter
    def description(self, value: str) -> None:
        self["description"] = value

    @property
    def structure(self) -> OntologyStructure:
        return self["structure"]

    @structure.setter
    def structure(self, value: OntologyStructure) -> None:
        self["structure"] = value

    @classmethod
    def from_dict(cls, json_dict: Dict) -> Ontology:
        """
        Build an `Ontology` from a dictionary with the keys "title", "description",
        "ontology_hash" and "editor" (the latter holding the serialised structure).
        """
        title = json_dict["title"]
        description = json_dict["description"]
        ontology_hash = json_dict["ontology_hash"]
        structure = OntologyStructure.from_dict(json_dict["editor"])
        return Ontology(
            title=title,
            description=description,
            ontology_hash=ontology_hash,
            structure=structure,
        )


def _frame_views_to_frame_numbers(
    frame_views: Sequence[Union[ObjectInstance.Annotation, ClassificationInstance.Annotation, LabelRowV2.FrameView]]
) -> List[int]:
    """Collect the `frame` attribute of every given frame view, preserving the order."""
    frame_numbers: List[int] = []
    for view in frame_views:
        frame_numbers.append(view.frame)
    return frame_numbers