from __future__ import annotations
import logging
from collections import defaultdict
from dataclasses import dataclass, field
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, Iterable, List, Optional, Sequence, Set, Type, Union
from encord.client import EncordClientProject
from encord.client import LabelRow as OrmLabelRow
from encord.constants.enums import DataType
from encord.exceptions import LabelRowError, WrongProjectTypeError
from encord.http.bundle import Bundle, BundleResultHandler, BundleResultMapper
from encord.http.limits import (
LABEL_ROW_BUNDLE_CREATE_LIMIT,
LABEL_ROW_BUNDLE_GET_LIMIT,
LABEL_ROW_BUNDLE_SAVE_LIMIT,
)
from encord.objects.attributes import Attribute
from encord.objects.classification import Classification
from encord.objects.classification_instance import ClassificationInstance
from encord.objects.constants import ( # pylint: disable=unused-import # for backward compatibility
AVAILABLE_COLORS,
DATETIME_LONG_STRING_FORMAT,
DEFAULT_CONFIDENCE,
DEFAULT_MANUAL_ANNOTATION,
)
from encord.objects.coordinates import (
BitmaskCoordinates,
BoundingBoxCoordinates,
Coordinates,
PointCoordinate,
PolygonCoordinates,
PolylineCoordinates,
RotatableBoundingBoxCoordinates,
)
from encord.objects.frames import Frames, frames_class_to_frames_list
from encord.objects.metadata import DICOMSeriesMetadata, DICOMSliceMetadata
from encord.objects.ontology_object import Object
from encord.objects.ontology_object_instance import ObjectInstance
from encord.objects.ontology_structure import OntologyStructure
from encord.objects.utils import _lower_snake_case
from encord.ontology import Ontology
from encord.orm.label_row import (
AnnotationTaskStatus,
LabelRowMetadata,
LabelStatus,
WorkflowGraphNode,
)
from encord.orm.ontology import ( # pylint: disable=unused-import # for backward compatibility
OntologyUserRole,
)
from encord.orm.project import TaskPriorityParams
log = logging.getLogger(__name__)
OntologyTypes = Union[Type[Object], Type[Classification]]
OntologyClasses = Union[Object, Classification]
@dataclass
class BundledGetRowsPayload:
uids: List[str]
get_signed_url: bool
include_object_feature_hashes: Optional[Set[str]]
include_classification_feature_hashes: Optional[Set[str]]
include_reviews: bool
@dataclass
class BundledCreateRowsPayload:
uids: List[str]
@dataclass
class BundledSaveRowsPayload:
uids: List[str]
payload: List[Dict]
[docs]class LabelRowV2:
"""
This class represents a single label row. It is corresponding to exactly one data row within a project. It holds all
the labels for that data row.
You can access a many metadata fields with this class directly. If you want to read or write labels you will need to
call :meth:`.initialise_labels()` first. To upload your added labels call :meth:`.save()`.
"""
def __init__(
self,
label_row_metadata: LabelRowMetadata,
project_client: EncordClientProject,
ontology: Ontology,
) -> None:
self._project_client = project_client
self._ontology = ontology
self._label_row_read_only_data: LabelRowV2.LabelRowReadOnlyData = self._parse_label_row_metadata(
label_row_metadata
)
self._is_labelling_initialised = False
self._frame_to_hashes: defaultdict[int, Set[str]] = defaultdict(set)
# ^ frames to object and classification hashes
self._metadata: Optional[DICOMSeriesMetadata] = None
self._frame_metadata: defaultdict[int, Optional[DICOMSliceMetadata]] = defaultdict(lambda: None)
self._classifications_to_frames: defaultdict[Classification, Set[int]] = defaultdict(set)
self._objects_map: Dict[str, ObjectInstance] = dict()
self._classifications_map: Dict[str, ClassificationInstance] = dict()
# ^ conveniently a dict is ordered in Python. Use this to our advantage to keep the labels in order
# at least at the final objects_index/classifications_index level.
@property
def label_hash(self) -> Optional[str]:
return self._label_row_read_only_data.label_hash
@property
def data_hash(self) -> str:
return self._label_row_read_only_data.data_hash
@property
def dataset_hash(self) -> str:
return self._label_row_read_only_data.dataset_hash
@property
def dataset_title(self) -> str:
return self._label_row_read_only_data.dataset_title
@property
def data_title(self) -> str:
return self._label_row_read_only_data.data_title
@property
def data_type(self) -> DataType:
return self._label_row_read_only_data.data_type
@property
def label_status(self) -> LabelStatus:
"""
Returns the current labeling status for the label row.
**Note**: This method is not supported for workflow-based projects. Please see our
:ref:`workflow documentation <tutorials/workflows:Workflows>`
for more details.
"""
if self.__is_tms2_project:
raise WrongProjectTypeError(
'"label_status" property returns incorrect results for workflow-based projects.\
Please use "workflow_graph_node" property instead.'
)
return self._label_row_read_only_data.label_status
@property
def annotation_task_status(self) -> AnnotationTaskStatus:
"""
Returns the current annotation task status for the label row.
**Note**: This method is not supported for workflow-based projects. Please see our
:ref:`workflow documentation <tutorials/workflows:Workflows>`
for more details.
"""
if self.__is_tms2_project:
raise WrongProjectTypeError(
'"annotation_task_status" property returns incorrect results for workflow-based projects.\
Please use "workflow_graph_node" property instead.'
)
assert self._label_row_read_only_data.annotation_task_status is not None # Never None for Workflow projects
return self._label_row_read_only_data.annotation_task_status
@property
def workflow_graph_node(self) -> Optional[WorkflowGraphNode]:
return self._label_row_read_only_data.workflow_graph_node
@property
def is_shadow_data(self) -> bool:
return self._label_row_read_only_data.is_shadow_data
@property
def created_at(self) -> Optional[datetime]:
"""The creation date of the label row. None if the label row was not yet created."""
return self._label_row_read_only_data.created_at
@property
def last_edited_at(self) -> Optional[datetime]:
"""The time the label row was updated last as a whole. None if the label row was not yet created."""
return self._label_row_read_only_data.last_edited_at
@property
def number_of_frames(self) -> int:
return self._label_row_read_only_data.number_of_frames
@property
def duration(self) -> Optional[float]:
"""Only a value for Video data types."""
return self._label_row_read_only_data.duration
@property
def fps(self) -> Optional[float]:
"""Only a value for Video data types."""
return self._label_row_read_only_data.fps
@property
def data_link(self) -> Optional[str]:
"""
The data link in either your cloud storage or the encord storage to the underlying object. This will be `None`
for DICOM series or image groups that have been created without performance optimisations, as there is no
single underlying file for these data types.
"""
return self._label_row_read_only_data.data_link
@property
def width(self) -> Optional[int]:
"""
This is `None` for image groups without performance optimisation, as there is no single underlying width
for this data type.
"""
return self._label_row_read_only_data.width
@property
def height(self) -> Optional[int]:
"""
This is `None` for image groups without performance optimisation, as there is no single underlying width
for this data type.
"""
return self._label_row_read_only_data.height
@property
def priority(self) -> Optional[float]:
"""
Get workflow priority for the task associated with the data unit.
This property only works for workflow-based project.
It is None for label rows in "complete" state.
"""
if not self.__is_tms2_project:
raise WrongProjectTypeError('"priority" property only works with workflow-based projects.')
return self._label_row_read_only_data.priority
@property
def ontology_structure(self) -> OntologyStructure:
"""Get the corresponding ontology structure"""
return self._ontology.structure
@property
def is_labelling_initialised(self) -> bool:
"""
Whether you can start labelling or not. If this is `False`, call the member :meth:`.initialise_labels()` to
read or write specific ObjectInstances or ClassificationInstances.
"""
return self._is_labelling_initialised
@property
def __is_tms2_project(self) -> bool:
return self.workflow_graph_node is not None
[docs] def initialise_labels(
self,
include_object_feature_hashes: Optional[Set[str]] = None,
include_classification_feature_hashes: Optional[Set[str]] = None,
include_reviews: bool = False,
overwrite: bool = False,
bundle: Optional[Bundle] = None,
) -> None:
"""
Call this function to download or export labels stored on the Encord server, as well as to perform any other
reading or writing operations. If you only want to inspect a subset of labels, you can filter them.
Please note that if you filter the labels, and upload them later, you will effectively delete all the labels
that were previously filtered.
If the label was not yet in progress, this will set the label status to `LabelStatus.LABEL_IN_PROGRESS`.
You can call this function at any point to overwrite the current labels stored in this class with the most
up to date labels stored in the Encord servers. This would only matter if you manipulate the labels while
someone else is working on the labels as well. You would need to supply the `overwrite` parameter to `True`
Args:
include_object_feature_hashes: If None all the objects will be included. Otherwise, only objects labels
will be included of which the feature_hash has been added. WARNING: it is only recommended to use
this filter if you are reading (not writing) the labels. If you are requesting a subset of objects and
later, save the label, you will effectively delete all the object instances that are stored in the
Encord platform, which were not included in this filtered subset.
include_classification_feature_hashes: If None all the classifications will be included. Otherwise, only
classification labels will be included of which the feature_hash has been added. WARNING: it is only
recommended to use this filter if you are reading (not writing) the labels. If you are requesting a
subset of classifications and later, save the label, you will effectively delete all the
classification instances that are stored in the Encord platform, which were not included in this
filtered subset.
include_reviews: Whether to request read only information about the reviews of the label row.
overwrite: If the label row was already initialised, you need to set this flag to `True` to overwrite the
current labels with the labels stored in the Encord server. If this is `False` and the label row was
already initialised, this function will throw an error.
bundle: If not passed, initialisation is performed independently. If passed, it will be delayed and
initialised along with other objects in the same bundle.
"""
if self.is_labelling_initialised and not overwrite:
raise LabelRowError(
"You are trying to re-initialise a label row that has already been initialised. This would overwrite "
"current labels. If this is your intend, set the `overwrite` flag to `True`."
)
if bundle is not None:
self.__batched_initialise(
bundle,
uid=self.label_hash,
get_signed_url=False,
include_object_feature_hashes=include_object_feature_hashes,
include_classification_feature_hashes=include_classification_feature_hashes,
include_reviews=include_reviews,
)
else:
if self.label_hash is None:
label_row_dict = self._project_client.create_label_row(self.data_hash)
else:
label_row_dict = self._project_client.get_label_row(
uid=self.label_hash,
get_signed_url=False,
include_object_feature_hashes=include_object_feature_hashes,
include_classification_feature_hashes=include_classification_feature_hashes,
include_reviews=include_reviews,
)
self.from_labels_dict(label_row_dict)
def __batched_initialise(
self,
bundle: Bundle,
uid,
get_signed_url,
include_object_feature_hashes,
include_classification_feature_hashes,
include_reviews,
):
if self.label_hash is None:
bundle.add(
operation=self._project_client.create_label_rows,
request_reducer=self._bundle_create_rows_reducer,
result_mapper=BundleResultMapper[OrmLabelRow](
result_mapping_predicate=self._bundle_create_rows_mapping_predicate,
result_handler=BundleResultHandler(predicate=self.data_hash, handler=self.from_labels_dict),
),
payload=BundledCreateRowsPayload(
uids=[self.data_hash],
),
limit=LABEL_ROW_BUNDLE_CREATE_LIMIT,
)
else:
bundle.add(
operation=self._project_client.get_label_rows,
request_reducer=self._bundle_get_rows_reducer,
result_mapper=BundleResultMapper[OrmLabelRow](
result_mapping_predicate=self._bundle_get_rows_mapping_predicate,
result_handler=BundleResultHandler(predicate=self.label_hash, handler=self.from_labels_dict),
),
payload=BundledGetRowsPayload(
uids=[uid],
get_signed_url=get_signed_url,
include_object_feature_hashes=include_object_feature_hashes,
include_classification_feature_hashes=include_classification_feature_hashes,
include_reviews=include_reviews,
),
limit=LABEL_ROW_BUNDLE_GET_LIMIT,
)
@staticmethod
def _bundle_create_rows_reducer(
bundle_payload: BundledCreateRowsPayload, payload: BundledCreateRowsPayload
) -> BundledCreateRowsPayload:
bundle_payload.uids += payload.uids
return bundle_payload
@staticmethod
def _bundle_get_rows_reducer(
bundle_payload: BundledGetRowsPayload, payload: BundledGetRowsPayload
) -> BundledGetRowsPayload:
bundle_payload.uids += payload.uids
return bundle_payload
@staticmethod
def _bundle_get_rows_mapping_predicate(entry: OrmLabelRow) -> str:
return entry["label_hash"]
@staticmethod
def _bundle_create_rows_mapping_predicate(entry: OrmLabelRow) -> str:
return entry["data_hash"]
[docs] def from_labels_dict(self, label_row_dict: dict) -> None:
"""
If you have a label row dictionary in the same format that the Encord servers produce, you can initialise the
LabelRow from that directly. In most cases you should prefer using the `initialise_labels` method.
This function also initialises the label row.
Calling this function will reset all the labels that are currently stored within this class.
Args:
label_row_dict: The dictionary of all labels as expected by the Encord format.
"""
self._is_labelling_initialised = True
self._label_row_read_only_data = self._parse_label_row_dict(label_row_dict)
self._frame_to_hashes = defaultdict(set)
self._classifications_to_frames = defaultdict(set)
self._metadata = None
self._frame_metadata = defaultdict(lambda: None)
self._objects_map = dict()
self._classifications_map = dict()
self._parse_labels_from_dict(label_row_dict)
[docs] def get_image_hash(self, frame_number: int) -> Optional[str]:
"""
Get the corresponding image hash of the frame number. Return `None` if the frame number is out of bounds.
Raise an error if this function is used for non-image data types.
"""
self._check_labelling_is_initalised()
if self.data_type not in (DataType.IMAGE, DataType.IMG_GROUP):
raise LabelRowError("This function is only supported for label rows of image or image group data types.")
return self._label_row_read_only_data.frame_to_image_hash.get(frame_number)
[docs] def get_frame_number(self, image_hash: str) -> Optional[int]:
"""
Get the corresponding image hash of the frame number. Return `None` if the image hash was not found with an
associated frame number.
Raise an error if this function is used for non-image data types.
"""
self._check_labelling_is_initalised()
if self.data_type not in (DataType.IMAGE, DataType.IMG_GROUP):
raise LabelRowError("This function is only supported for label rows of image or image group data types.")
return self._label_row_read_only_data.image_hash_to_frame[image_hash]
[docs] def save(self, bundle: Optional[Bundle] = None) -> None:
"""
Upload the created labels with the Encord server. This will overwrite any labels that someone has created
in the platform in the meantime.
Args:
bundle: if not passed, save is executed immediately. If passed, it is executed as a part of the bundle
"""
self._check_labelling_is_initalised()
dict_labels = self.to_encord_dict()
if bundle is None:
self._project_client.save_label_row(uid=self.label_hash, label=dict_labels)
else:
assert self.label_hash is not None # Checked earlier, assert is mostly to silence mypy
bundle.add(
operation=self._project_client.save_label_rows,
request_reducer=self._batch_save_rows_reducer,
result_mapper=None,
payload=BundledSaveRowsPayload(uids=[self.label_hash], payload=[dict_labels]),
limit=LABEL_ROW_BUNDLE_SAVE_LIMIT,
)
@staticmethod
def _batch_save_rows_reducer(
bundle_payload: BundledSaveRowsPayload, payload: BundledSaveRowsPayload
) -> BundledSaveRowsPayload:
bundle_payload.uids += payload.uids
bundle_payload.payload += payload.payload
return bundle_payload
@property
def metadata(self) -> Optional[DICOMSeriesMetadata]:
"""
Metadata for the given data type.
Currently only supported for DICOM, and will return `None` for other formats.
Label row needs to be initialised before using this property
"""
self._check_labelling_is_initalised()
return self._metadata
[docs] def get_frame_view(self, frame: Union[int, str] = 0) -> FrameView:
"""
Args:
frame: Either the frame number or the image hash if the data type is an image or image group.
Defaults to the first frame.
"""
self._check_labelling_is_initalised()
if isinstance(frame, str):
frame_num = self.get_frame_number(frame)
if frame_num is None:
raise LabelRowError(f"Image hash {frame} not found in the label row")
else:
frame_num = frame
return self.FrameView(self, self._label_row_read_only_data, frame_num)
[docs] def get_frame_views(self) -> List[FrameView]:
"""
Returns:
A list of frame views in order of available frames.
"""
self._check_labelling_is_initalised()
ret = []
for frame in range(self.number_of_frames):
ret.append(self.get_frame_view(frame))
return ret
[docs] def get_object_instances(
self, filter_ontology_object: Optional[Object] = None, filter_frames: Optional[Frames] = None
) -> List[ObjectInstance]:
"""
Args:
filter_ontology_object:
Optionally filter by a specific ontology object.
filter_frames:
Optionally filter by specific frames.
Returns:
All the `ObjectInstance`s that match the filter.
"""
self._check_labelling_is_initalised()
ret: List[ObjectInstance] = list()
if filter_frames is not None:
filtered_frames_list = frames_class_to_frames_list(filter_frames)
else:
filtered_frames_list = list()
for object_ in self._objects_map.values():
# filter by ontology object
if not (
filter_ontology_object is None
or object_.ontology_item.feature_node_hash == filter_ontology_object.feature_node_hash
):
continue
# filter by frame
if filter_frames is None:
append = True
else:
append = False
for frame in filtered_frames_list:
hashes = self._frame_to_hashes.get(frame, set())
if object_.object_hash in hashes:
append = True
break
if append:
ret.append(object_)
return ret
[docs] def add_object_instance(self, object_instance: ObjectInstance, force: bool = True) -> None:
"""
Add an object instance to the label row. If the object instance already exists, it
Args:
object_instance: The object instance to add.
force: overwrites current objects, otherwise this will replace the current object.
"""
self._check_labelling_is_initalised()
object_instance.is_valid()
if object_instance.is_assigned_to_label_row():
raise LabelRowError(
"The supplied ObjectInstance is already part of a LabelRowV2. You can only add a ObjectInstance to one "
"LabelRowV2. You can do a ObjectInstance.copy() to create an identical ObjectInstance which is not part of "
"any LabelRowV2."
)
object_hash = object_instance.object_hash
if object_hash in self._objects_map and not force:
raise LabelRowError(
"The supplied ObjectInstance was already previously added. (the object_hash is the same)."
)
elif object_hash in self._objects_map and force:
self._objects_map.pop(object_hash)
self._objects_map[object_hash] = object_instance
object_instance._parent = self
frames = set(_frame_views_to_frame_numbers(object_instance.get_annotations()))
self._add_to_frame_to_hashes_map(object_instance, frames)
[docs] def add_classification_instance(self, classification_instance: ClassificationInstance, force: bool = False) -> None:
"""
Add a classification instance to the label row.
Args:
classification_instance: The object instance to add.
force: overwrites current objects, otherwise this will replace the current object.
"""
self._check_labelling_is_initalised()
classification_instance.is_valid()
if classification_instance.is_assigned_to_label_row():
raise LabelRowError(
"The supplied ClassificationInstance is already part of a LabelRowV2. You can only add a ClassificationInstance"
" to one LabelRowV2. You can do a ClassificationInstance.copy() to create an identical ObjectInstance which is "
"not part of any LabelRowV2."
)
frames = set(_frame_views_to_frame_numbers(classification_instance.get_annotations()))
classification_hash = classification_instance.classification_hash
already_present_frame = self._is_classification_already_present(
classification_instance.ontology_item,
frames,
)
if classification_hash in self._classifications_map and not force:
raise LabelRowError(
"The supplied ClassificationInstance was already previously added. (the classification_hash is the same)."
)
if already_present_frame is not None and not force:
raise LabelRowError(
f"A ClassificationInstance of the same type was already added and has overlapping frames. One "
f"overlapping frame that was found is `{already_present_frame}`. Make sure that you only add "
f"classifications which are on frames where the same type of classification does not yet exist."
)
if classification_hash in self._classifications_map and force:
self._classifications_map.pop(classification_hash)
self._classifications_map[classification_hash] = classification_instance
classification_instance._parent = self
self._classifications_to_frames[classification_instance.ontology_item].update(frames)
self._add_to_frame_to_hashes_map(classification_instance, frames)
[docs] def remove_classification(self, classification_instance: ClassificationInstance):
"""Remove a classification instance from a label row."""
self._check_labelling_is_initalised()
classification_hash = classification_instance.classification_hash
self._classifications_map.pop(classification_hash)
all_frames = self._classifications_to_frames[classification_instance.ontology_item]
actual_frames = _frame_views_to_frame_numbers(classification_instance.get_annotations())
for actual_frame in actual_frames:
all_frames.remove(actual_frame)
[docs] def add_to_single_frame_to_hashes_map(
self, label_item: Union[ObjectInstance, ClassificationInstance], frame: int
) -> None:
"""This is an internal function, it is not meant to be called by the SDK user."""
self._check_labelling_is_initalised()
if isinstance(label_item, ObjectInstance):
self._frame_to_hashes[frame].add(label_item.object_hash)
elif isinstance(label_item, ClassificationInstance):
self._frame_to_hashes[frame].add(label_item.classification_hash)
else:
raise NotImplementedError(f"Got an unexpected label item class `{type(label_item)}`")
[docs] def get_classification_instances(
self, filter_ontology_classification: Optional[Classification] = None, filter_frames: Optional[Frames] = None
) -> List[ClassificationInstance]:
"""
Args:
filter_ontology_classification:
Optionally filter by a specific ontology classification.
filter_frames:
Optionally filter by specific frames.
Returns:
All the `ObjectInstance`s that match the filter.
"""
self._check_labelling_is_initalised()
ret: List[ClassificationInstance] = list()
if filter_frames is not None:
filtered_frames_list = frames_class_to_frames_list(filter_frames)
else:
filtered_frames_list = list()
for classification in self._classifications_map.values():
# filter by ontology object
if not (
filter_ontology_classification is None
or classification.ontology_item.feature_node_hash == filter_ontology_classification.feature_node_hash
):
continue
# filter by frame
if filter_frames is None:
append = True
else:
append = False
for frame in filtered_frames_list:
hashes = self._frame_to_hashes.get(frame, set())
if classification.classification_hash in hashes:
append = True
break
if append:
ret.append(classification)
return ret
[docs] def remove_object(self, object_instance: ObjectInstance):
"""Remove an object instance from a label row."""
self._check_labelling_is_initalised()
self._objects_map.pop(object_instance.object_hash)
self._remove_from_frame_to_hashes_map(
_frame_views_to_frame_numbers(object_instance.get_annotations()), object_instance.object_hash
)
object_instance._parent = None
[docs] def to_encord_dict(self) -> Dict[str, Any]:
"""
This is an internal helper function. Likely this should not be used by a user. To upload labels use the
:meth:`.save()` function.
"""
self._check_labelling_is_initalised()
ret: Dict[str, Any] = {}
read_only_data = self._label_row_read_only_data
ret["label_hash"] = read_only_data.label_hash
ret["created_at"] = read_only_data.created_at
ret["last_edited_at"] = read_only_data.last_edited_at
ret["data_hash"] = read_only_data.data_hash
ret["dataset_hash"] = read_only_data.dataset_hash
ret["dataset_title"] = read_only_data.dataset_title
ret["data_title"] = read_only_data.data_title
ret["data_type"] = read_only_data.data_type.value
ret["annotation_task_status"] = read_only_data.annotation_task_status
ret["is_shadow_data"] = read_only_data.is_shadow_data
ret["object_answers"] = self._to_object_answers()
ret["classification_answers"] = self._to_classification_answers()
ret["object_actions"] = self._to_object_actions()
ret["label_status"] = read_only_data.label_status.value
ret["data_units"] = self._to_encord_data_units()
return ret
[docs] def workflow_reopen(self) -> None:
"""
A label row is returned to the first annotation stage for re-labeling.
No data will be lost during this call.
This method is only relevant for the projects that use the :ref:`Workflow <tutorials/workflows:Workflows>`
feature, and will raise an error for pre-workflow projects.
"""
if self.label_hash is None:
# Label has not yet moved from the initial state, nothing to do
return
self._project_client.workflow_reopen([self.label_hash])
[docs] def workflow_complete(self) -> None:
"""
A label row is moved to the final workflow node, marking it as 'Complete'.
This method can be called only for labels for which :meth:`.initialise_labels()` was called at least ance, and
consequentially the "label_hash" field is not `None`.
Please note that labels need not be initialized every time the workflow_complete() method is called.
This method is only relevant for the projects that use the :ref:`Workflow <tutorials/workflows:Workflows>`
feature, and will raise an error for projects that don't use Workflows.
"""
if self.label_hash is None:
raise LabelRowError(
"For this operation you will need to initialise labelling first. Call the .initialise_labels() "
"to do so first."
)
self._project_client.workflow_complete([self.label_hash])
[docs] def set_priority(self, priority: float) -> None:
"""
Set priority for task in workflow project.
Args:
priority: float value from 0.0 to 1.0, where 1.0 is the highest priority
"""
if not self.__is_tms2_project:
raise WrongProjectTypeError("Setting priority only possible for workflow-based projects")
params = TaskPriorityParams(priorities=[(self.data_hash, priority)])
self._project_client.workflow_set_priority(params)
[docs] class FrameView:
"""
This class can be used to inspect what object/classification instances are on a given frame or
what metadata, such as a image file size, is on a given frame.
"""
def __init__(
self, label_row: LabelRowV2, label_row_read_only_data: LabelRowV2.LabelRowReadOnlyData, frame: int
):
self._label_row = label_row
self._label_row_read_only_data = label_row_read_only_data
self._frame = frame
@property
def image_hash(self) -> str:
if self._label_row.data_type not in [DataType.IMAGE, DataType.IMG_GROUP]:
raise LabelRowError("Image hash can only be retrieved for DataType.IMAGE or DataType.IMG_GROUP")
return self._frame_level_data().image_hash
@property
def image_title(self) -> str:
if self._label_row.data_type not in [DataType.IMAGE, DataType.IMG_GROUP]:
raise LabelRowError("Image title can only be retrieved for DataType.IMAGE or DataType.IMG_GROUP")
return self._frame_level_data().image_title
@property
def file_type(self) -> str:
if self._label_row.data_type not in [DataType.IMAGE, DataType.IMG_GROUP]:
raise LabelRowError("File type can only be retrieved for DataType.IMAGE or DataType.IMG_GROUP")
return self._frame_level_data().file_type
@property
def frame(self) -> int:
return self._frame
@property
def width(self) -> int:
if self._label_row.data_type in [DataType.IMG_GROUP]:
return self._frame_level_data().width
elif self._label_row_read_only_data.width is not None:
return self._label_row_read_only_data.width
else:
raise LabelRowError(f"Width is expected but not set for the data type {self._label_row.data_type}")
@property
def height(self) -> int:
if self._label_row.data_type in [DataType.IMG_GROUP]:
return self._frame_level_data().height
elif self._label_row_read_only_data.height is not None:
return self._label_row_read_only_data.height
else:
raise LabelRowError(f"Height is expected but not set for the data type {self._label_row.data_type}")
@property
def data_link(self) -> Optional[str]:
if self._label_row.data_type not in [DataType.IMAGE, DataType.IMG_GROUP]:
raise LabelRowError("Data link can only be retrieved for DataType.IMAGE or DataType.IMG_GROUP")
return self._frame_level_data().data_link
@property
def metadata(self) -> Optional[DICOMSliceMetadata]:
"""
Annotation metadata.
Particular format depends on the data type.
Currently only supported for DICOM, and will return `None` for other formats.
"""
return self._label_row._frame_metadata[self._frame]
[docs] def add_object_instance(
self,
object_instance: ObjectInstance,
coordinates: Coordinates,
*,
overwrite: bool = False,
created_at: Optional[datetime] = None,
created_by: Optional[str] = None,
last_edited_at: Optional[datetime] = None,
last_edited_by: Optional[str] = None,
confidence: Optional[float] = None,
manual_annotation: Optional[bool] = None,
) -> None:
label_row = object_instance.is_assigned_to_label_row()
if label_row and self._label_row != label_row:
raise LabelRowError(
"This object instance is already assigned to a different label row. It can not be "
"added to multiple label rows at once."
)
object_instance.set_for_frames(
coordinates,
self._frame,
overwrite=overwrite,
created_at=created_at,
created_by=created_by,
last_edited_at=last_edited_at,
last_edited_by=last_edited_by,
confidence=confidence,
manual_annotation=manual_annotation,
)
if not label_row:
self._label_row.add_object_instance(object_instance)
[docs] def add_classification_instance(
self,
classification_instance: ClassificationInstance,
*,
overwrite: bool = False,
created_at: Optional[datetime] = None,
created_by: Optional[str] = None,
confidence: float = DEFAULT_CONFIDENCE,
manual_annotation: bool = DEFAULT_MANUAL_ANNOTATION,
last_edited_at: Optional[datetime] = None,
last_edited_by: Optional[str] = None,
) -> None:
if created_at is None:
created_at = datetime.now()
if last_edited_at is None:
last_edited_at = datetime.now()
label_row = classification_instance.is_assigned_to_label_row()
if label_row and self._label_row != label_row:
raise LabelRowError(
"This object instance is already assigned to a different label row. It can not be "
"added to multiple label rows at once."
)
classification_instance.set_for_frames(
self._frame,
overwrite=overwrite,
created_at=created_at,
created_by=created_by,
confidence=confidence,
manual_annotation=manual_annotation,
last_edited_at=last_edited_at,
last_edited_by=last_edited_by,
)
if not label_row:
self._label_row.add_classification_instance(classification_instance)
[docs] def get_object_instances(self, filter_ontology_object: Optional[Object] = None) -> List[ObjectInstance]:
"""
Args:
filter_ontology_object:
Optionally filter by a specific ontology object.
Returns:
All the `ObjectInstance`s that match the filter.
"""
return self._label_row.get_object_instances(
filter_ontology_object=filter_ontology_object, filter_frames=self._frame
)
[docs] def get_classification_instances(
self, filter_ontology_classification: Optional[Classification] = None
) -> List[ClassificationInstance]:
"""
Args:
filter_ontology_classification:
Optionally filter by a specific ontology object.
Returns:
All the `ObjectInstance`s that match the filter.
"""
return self._label_row.get_classification_instances(
filter_ontology_classification=filter_ontology_classification, filter_frames=self._frame
)
def _frame_level_data(self) -> LabelRowV2.FrameLevelImageGroupData:
return self._label_row_read_only_data.frame_level_data[self._frame]
def __repr__(self):
return f"FrameView(label_row={self._label_row}, frame={self._frame})"
[docs] @dataclass(frozen=True)
class FrameLevelImageGroupData:
"""This is an internal helper class. A user should not directly interact with it."""
image_hash: str
image_title: str
file_type: str
frame_number: int
width: int
height: int
data_link: Optional[str] = None
[docs] @dataclass(frozen=True)
class LabelRowReadOnlyData:
"""This is an internal helper class. A user should not directly interact with it."""
label_hash: Optional[str]
"""This is None if the label row does not have any labels and was not initialised for labelling."""
created_at: Optional[datetime]
"""This is None if the label row does not have any labels and was not initialised for labelling."""
last_edited_at: Optional[datetime]
"""This is None if the label row does not have any labels and was not initialised for labelling."""
data_hash: str
data_type: DataType
label_status: LabelStatus
annotation_task_status: Optional[AnnotationTaskStatus]
workflow_graph_node: Optional[WorkflowGraphNode]
is_shadow_data: bool
number_of_frames: int
duration: Optional[float]
fps: Optional[float]
dataset_hash: str
dataset_title: str
data_title: str
width: Optional[int]
height: Optional[int]
data_link: Optional[str]
priority: Optional[float]
frame_level_data: Dict[int, LabelRowV2.FrameLevelImageGroupData] = field(default_factory=dict)
image_hash_to_frame: Dict[str, int] = field(default_factory=dict)
frame_to_image_hash: Dict[int, str] = field(default_factory=dict)
def _to_object_answers(self) -> Dict[str, Any]:
ret: Dict[str, Any] = {}
for obj in self._objects_map.values():
all_static_answers = self._get_all_static_answers(obj)
ret[obj.object_hash] = {
"classifications": list(reversed(all_static_answers)),
"objectHash": obj.object_hash,
}
return ret
def _to_object_actions(self) -> Dict[str, Any]:
ret: Dict[str, Any] = {}
for obj in self._objects_map.values():
all_static_answers = self._dynamic_answers_to_encord_dict(obj)
if len(all_static_answers) == 0:
continue
ret[obj.object_hash] = {
"actions": list(reversed(all_static_answers)),
"objectHash": obj.object_hash,
}
return ret
def _to_classification_answers(self) -> Dict[str, Any]:
ret: Dict[str, Any] = {}
for classification in self._classifications_map.values():
all_static_answers = classification.get_all_static_answers()
classifications = [answer.to_encord_dict() for answer in all_static_answers if answer.is_answered()]
ret[classification.classification_hash] = {
"classifications": list(reversed(classifications)),
"classificationHash": classification.classification_hash,
}
return ret
@staticmethod
def _get_all_static_answers(object_instance: ObjectInstance) -> List[Dict[str, Any]]:
"""Essentially convert to the JSON format of all the static answers."""
ret = []
for answer in object_instance._get_all_static_answers():
d_opt = answer.to_encord_dict()
if d_opt is not None:
ret.append(d_opt)
return ret
@staticmethod
def _dynamic_answers_to_encord_dict(object_instance: ObjectInstance) -> List[Dict[str, Any]]:
ret = []
for answer, ranges in object_instance._get_all_dynamic_answers():
d_opt = answer.to_encord_dict(ranges)
if d_opt is not None:
ret.append(d_opt)
return ret
def _to_encord_data_units(self) -> Dict[str, Any]:
frame_level_data = self._label_row_read_only_data.frame_level_data
return {value.image_hash: self._to_encord_data_unit(value) for value in frame_level_data.values()}
def _to_encord_data_unit(self, frame_level_data: FrameLevelImageGroupData) -> Dict[str, Any]:
ret: Dict[str, Any] = {}
data_type = self._label_row_read_only_data.data_type
if data_type == DataType.IMG_GROUP:
data_sequence: Union[str, int] = str(frame_level_data.frame_number)
elif data_type in (DataType.VIDEO, DataType.DICOM, DataType.IMAGE):
data_sequence = frame_level_data.frame_number
else:
raise NotImplementedError(f"The data type {data_type} is not implemented yet.")
ret["data_hash"] = frame_level_data.image_hash
ret["data_title"] = frame_level_data.image_title
if data_type != DataType.DICOM:
ret["data_link"] = frame_level_data.data_link
ret["data_type"] = frame_level_data.file_type
ret["data_sequence"] = data_sequence
ret["width"] = frame_level_data.width
ret["height"] = frame_level_data.height
ret["labels"] = self._to_encord_labels(frame_level_data)
if self._label_row_read_only_data.duration is not None:
ret["data_duration"] = self._label_row_read_only_data.duration
if self._label_row_read_only_data.fps is not None:
ret["data_fps"] = self._label_row_read_only_data.fps
return ret
def _to_encord_labels(self, frame_level_data: FrameLevelImageGroupData) -> Dict[str, Any]:
ret: Dict[str, Any] = {}
data_type = self._label_row_read_only_data.data_type
if data_type in [DataType.IMAGE, DataType.IMG_GROUP]:
frame = frame_level_data.frame_number
ret.update(self._to_encord_label(frame))
elif data_type in [DataType.VIDEO, DataType.DICOM]:
for frame in self._frame_to_hashes.keys():
ret[str(frame)] = self._to_encord_label(frame)
return ret
def _to_encord_label(self, frame: int) -> Dict[str, Any]:
ret: Dict[str, Any] = {}
ret["objects"] = self._to_encord_objects_list(frame)
ret["classifications"] = self._to_encord_classifications_list(frame)
return ret
def _to_encord_objects_list(self, frame: int) -> list:
# Get objects for frame
ret: List[dict] = []
objects = self.get_object_instances(filter_frames=frame)
for object_ in objects:
encord_object = self._to_encord_object(object_, frame)
ret.append(encord_object)
return ret
def _to_encord_object(
self,
object_: ObjectInstance,
frame: int,
) -> Dict[str, Any]:
ret: Dict[str, Any] = {}
object_instance_annotation = object_.get_annotation(frame)
coordinates = object_instance_annotation.coordinates
ontology_hash = object_.ontology_item.feature_node_hash
ontology_object = self._ontology.structure.get_child_by_hash(ontology_hash, type_=Object)
ret["name"] = ontology_object.name
ret["color"] = ontology_object.color
ret["shape"] = ontology_object.shape.value
ret["value"] = _lower_snake_case(ontology_object.name)
ret["createdAt"] = object_instance_annotation.created_at.strftime(DATETIME_LONG_STRING_FORMAT)
ret["createdBy"] = object_instance_annotation.created_by
ret["confidence"] = object_instance_annotation.confidence
ret["objectHash"] = object_.object_hash
ret["featureHash"] = ontology_object.feature_node_hash
ret["manualAnnotation"] = object_instance_annotation.manual_annotation
if object_instance_annotation.last_edited_at is not None:
ret["lastEditedAt"] = object_instance_annotation.last_edited_at.strftime(DATETIME_LONG_STRING_FORMAT)
if object_instance_annotation.last_edited_by is not None:
ret["lastEditedBy"] = object_instance_annotation.last_edited_by
if object_instance_annotation.is_deleted is not None:
ret["isDeleted"] = object_instance_annotation.is_deleted
self._add_coordinates_to_encord_object(coordinates, frame, ret)
return ret
def _add_coordinates_to_encord_object(
self, coordinates: Coordinates, frame: int, encord_object: Dict[str, Any]
) -> None:
if isinstance(coordinates, BoundingBoxCoordinates):
encord_object["boundingBox"] = coordinates.to_dict()
elif isinstance(coordinates, RotatableBoundingBoxCoordinates):
encord_object["rotatableBoundingBox"] = coordinates.to_dict()
elif isinstance(coordinates, PolygonCoordinates):
encord_object["polygon"] = coordinates.to_dict()
elif isinstance(coordinates, PolylineCoordinates):
encord_object["polyline"] = coordinates.to_dict()
elif isinstance(coordinates, PointCoordinate):
encord_object["point"] = coordinates.to_dict()
elif isinstance(coordinates, BitmaskCoordinates):
frame_view = self.get_frame_view(frame)
if not (
frame_view.height == coordinates._encoded_bitmask.height
and frame_view.width == coordinates._encoded_bitmask.width
):
raise ValueError("Bitmask dimensions don't match the media dimensions")
encord_object["bitmask"] = coordinates.to_dict()
else:
raise NotImplementedError(f"adding coordinatees for this type not yet implemented {type(coordinates)}")
def _to_encord_classifications_list(self, frame: int) -> list:
ret: List[Dict[str, Any]] = []
classifications = self.get_classification_instances(filter_frames=frame)
for classification in classifications:
encord_classification = self._to_encord_classification(classification, frame)
ret.append(encord_classification)
return ret
def _to_encord_classification(self, classification: ClassificationInstance, frame: int) -> Dict[str, Any]:
ret: Dict[str, Any] = {}
annotation = classification.get_annotation(frame)
classification_feature_hash = classification.ontology_item.feature_node_hash
ontology_classification = self._ontology.structure.get_child_by_hash(
classification_feature_hash, type_=Classification
)
attribute_hash = classification.ontology_item.attributes[0].feature_node_hash
ontology_attribute = self._ontology.structure.get_child_by_hash(attribute_hash, type_=Attribute)
ret["name"] = ontology_attribute.name
ret["value"] = _lower_snake_case(ontology_attribute.name)
ret["createdAt"] = annotation.created_at.strftime(DATETIME_LONG_STRING_FORMAT)
ret["createdBy"] = annotation.created_by
ret["confidence"] = annotation.confidence
ret["featureHash"] = ontology_classification.feature_node_hash
ret["classificationHash"] = classification.classification_hash
ret["manualAnnotation"] = annotation.manual_annotation
if annotation.last_edited_at is not None:
ret["lastEditedAt"] = annotation.last_edited_at.strftime(DATETIME_LONG_STRING_FORMAT)
if annotation.last_edited_by is not None:
ret["lastEditedBy"] = annotation.last_edited_by
return ret
def _is_classification_already_present(
self, classification: Classification, frames: Iterable[int]
) -> Optional[int]:
present_frames = self._classifications_to_frames.get(classification, set())
return next((frame for frame in frames if frame in present_frames), None)
def _add_frames_to_classification(self, classification: Classification, frames: Iterable[int]) -> None:
self._classifications_to_frames[classification].update(set(frames))
def _remove_frames_from_classification(self, classification: Classification, frames: Iterable[int]) -> None:
present_frames = self._classifications_to_frames.get(classification, set())
for frame in frames:
present_frames.remove(frame)
def _add_to_frame_to_hashes_map(
self, label_item: Union[ObjectInstance, ClassificationInstance], frames: Iterable[int]
) -> None:
for frame in frames:
self.add_to_single_frame_to_hashes_map(label_item, frame)
def _remove_from_frame_to_hashes_map(self, frames: Iterable[int], item_hash: str):
for frame in frames:
self._frame_to_hashes[frame].remove(item_hash)
def _parse_label_row_metadata(self, label_row_metadata: LabelRowMetadata) -> LabelRowV2.LabelRowReadOnlyData:
data_type = DataType.from_upper_case_string(label_row_metadata.data_type)
return LabelRowV2.LabelRowReadOnlyData(
label_hash=label_row_metadata.label_hash,
data_hash=label_row_metadata.data_hash,
data_title=label_row_metadata.data_title,
dataset_hash=label_row_metadata.dataset_hash,
dataset_title=label_row_metadata.dataset_title,
data_type=data_type,
data_link=label_row_metadata.data_link,
label_status=label_row_metadata.label_status,
annotation_task_status=label_row_metadata.annotation_task_status,
workflow_graph_node=label_row_metadata.workflow_graph_node,
is_shadow_data=label_row_metadata.is_shadow_data,
created_at=label_row_metadata.created_at,
last_edited_at=label_row_metadata.last_edited_at,
duration=label_row_metadata.duration,
fps=label_row_metadata.frames_per_second,
number_of_frames=label_row_metadata.number_of_frames,
width=label_row_metadata.width,
height=label_row_metadata.height,
priority=label_row_metadata.priority,
)
def _parse_label_row_dict(self, label_row_dict: dict) -> LabelRowReadOnlyData:
frame_level_data = self._parse_image_group_frame_level_data(label_row_dict["data_units"])
image_hash_to_frame = {item.image_hash: item.frame_number for item in frame_level_data.values()}
frame_to_image_hash = {item.frame_number: item.image_hash for item in frame_level_data.values()}
data_type = DataType(label_row_dict["data_type"])
if data_type == DataType.VIDEO:
video_dict = list(label_row_dict["data_units"].values())[0]
data_link = video_dict["data_link"]
height = video_dict["height"]
width = video_dict["width"]
elif data_type == DataType.DICOM:
dicom_dict = list(label_row_dict["data_units"].values())[0]
data_link = None
height = dicom_dict["height"]
width = dicom_dict["width"]
elif data_type == DataType.IMAGE:
image_dict = list(label_row_dict["data_units"].values())[0]
data_link = image_dict["data_link"]
height = image_dict["height"]
width = image_dict["width"]
elif data_type == DataType.IMG_GROUP:
data_link = None
height = None
width = None
else:
raise NotImplementedError(f"The data type {data_type} is not implemented yet.")
return LabelRowV2.LabelRowReadOnlyData(
label_hash=label_row_dict["label_hash"],
dataset_hash=label_row_dict["dataset_hash"],
dataset_title=label_row_dict["dataset_title"],
data_title=label_row_dict["data_title"],
data_hash=label_row_dict["data_hash"],
data_type=data_type,
label_status=LabelStatus(label_row_dict["label_status"]),
annotation_task_status=label_row_dict.get("annotation_task_status"),
workflow_graph_node=label_row_dict.get(
"workflow_graph_node", self._label_row_read_only_data.workflow_graph_node
),
is_shadow_data=self.is_shadow_data,
created_at=label_row_dict["created_at"],
last_edited_at=label_row_dict["last_edited_at"],
frame_level_data=frame_level_data,
image_hash_to_frame=image_hash_to_frame,
frame_to_image_hash=frame_to_image_hash,
duration=self.duration,
fps=self.fps,
number_of_frames=self.number_of_frames,
data_link=data_link,
height=height,
width=width,
priority=label_row_dict.get("priority", self._label_row_read_only_data.priority),
)
def _parse_labels_from_dict(self, label_row_dict: dict):
classification_answers = label_row_dict["classification_answers"]
for data_unit in label_row_dict["data_units"].values():
data_type = DataType(label_row_dict["data_type"])
if data_type in {DataType.IMG_GROUP, DataType.IMAGE}:
frame = int(data_unit["data_sequence"])
self._add_object_instances_from_objects(data_unit["labels"].get("objects", []), frame)
self._add_classification_instances_from_classifications(
data_unit["labels"].get("classifications", []),
classification_answers,
frame,
)
elif data_type in {DataType.VIDEO, DataType.DICOM}:
for frame, frame_data in data_unit["labels"].items():
frame_num = int(frame)
self._add_object_instances_from_objects(frame_data["objects"], frame_num)
self._add_classification_instances_from_classifications(
frame_data["classifications"], classification_answers, frame_num
)
self._add_frame_metadata(frame_num, frame_data.get("metadata"))
else:
raise NotImplementedError(f"Got an unexpected data type `{data_type}`")
self._add_data_unit_metadata(data_type, data_unit.get("metadata"))
self._add_objects_answers(label_row_dict)
self._add_action_answers(label_row_dict)
def _add_frame_metadata(self, frame: int, metadata: Optional[Dict[str, str]]):
if metadata is not None:
self._frame_metadata[frame] = DICOMSliceMetadata.from_dict(metadata)
else:
self._frame_metadata[frame] = None
def _add_data_unit_metadata(self, data_type: DataType, metadata: Optional[Dict[str, str]]) -> None:
if metadata is None:
self._metadata = None
return
if data_type == DataType.DICOM:
self._metadata = DICOMSeriesMetadata.from_dict(metadata)
else:
log.warning(
f"Unexpected metadata for the data type: {data_type}. Please update the Encord SDK to the latest version."
)
def _add_object_instances_from_objects(
self,
objects_list: List[dict],
frame: int,
) -> None:
for frame_object_label in objects_list:
object_hash = frame_object_label["objectHash"]
if object_hash not in self._objects_map:
object_instance = self._create_new_object_instance(frame_object_label, frame)
self.add_object_instance(object_instance)
else:
self._add_coordinates_to_object_instance(frame_object_label, frame)
def _add_objects_answers(self, label_row_dict: dict):
for answer in label_row_dict["object_answers"].values():
object_hash = answer["objectHash"]
object_instance = self._objects_map[object_hash]
answer_list = answer["classifications"]
object_instance.set_answer_from_list(answer_list)
def _add_action_answers(self, label_row_dict: dict):
for answer in label_row_dict["object_actions"].values():
object_hash = answer["objectHash"]
object_instance = self._objects_map[object_hash]
answer_list = answer["actions"]
object_instance.set_answer_from_list(answer_list)
def _create_new_object_instance(self, frame_object_label: dict, frame: int) -> ObjectInstance:
ontology = self._ontology.structure
feature_hash = frame_object_label["featureHash"]
object_hash = frame_object_label["objectHash"]
label_class = ontology.get_child_by_hash(feature_hash, type_=Object)
object_instance = ObjectInstance(label_class, object_hash=object_hash)
coordinates = self._get_coordinates(frame_object_label)
object_frame_instance_info = ObjectInstance.FrameInfo.from_dict(frame_object_label)
object_instance.set_for_frames(
coordinates=coordinates,
frames=frame,
created_at=object_frame_instance_info.created_at,
created_by=object_frame_instance_info.created_by,
last_edited_at=object_frame_instance_info.last_edited_at,
last_edited_by=object_frame_instance_info.last_edited_by,
confidence=object_frame_instance_info.confidence,
manual_annotation=object_frame_instance_info.manual_annotation,
reviews=object_frame_instance_info.reviews,
is_deleted=object_frame_instance_info.is_deleted,
)
return object_instance
def _add_coordinates_to_object_instance(
self,
frame_object_label: dict,
frame: int = 0,
) -> None:
object_hash = frame_object_label["objectHash"]
object_instance = self._objects_map[object_hash]
coordinates = self._get_coordinates(frame_object_label)
object_frame_instance_info = ObjectInstance.FrameInfo.from_dict(frame_object_label)
object_instance.set_for_frames(
coordinates=coordinates,
frames=frame,
created_at=object_frame_instance_info.created_at,
created_by=object_frame_instance_info.created_by,
last_edited_at=object_frame_instance_info.last_edited_at,
last_edited_by=object_frame_instance_info.last_edited_by,
confidence=object_frame_instance_info.confidence,
manual_annotation=object_frame_instance_info.manual_annotation,
reviews=object_frame_instance_info.reviews,
is_deleted=object_frame_instance_info.is_deleted,
)
def _get_coordinates(self, frame_object_label: dict) -> Coordinates:
if "boundingBox" in frame_object_label:
return BoundingBoxCoordinates.from_dict(frame_object_label)
if "rotatableBoundingBox" in frame_object_label:
return RotatableBoundingBoxCoordinates.from_dict(frame_object_label)
elif "polygon" in frame_object_label:
return PolygonCoordinates.from_dict(frame_object_label)
elif "point" in frame_object_label:
return PointCoordinate.from_dict(frame_object_label)
elif "polyline" in frame_object_label:
return PolylineCoordinates.from_dict(frame_object_label)
elif "skeleton" in frame_object_label:
raise NotImplementedError("Got a skeleton object, which is not supported yet")
elif "bitmask" in frame_object_label:
return BitmaskCoordinates.from_dict(frame_object_label)
else:
raise NotImplementedError(f"Getting coordinates for `{frame_object_label}` is not supported yet.")
def _add_classification_instances_from_classifications(
self, classifications_list: List[dict], classification_answers: dict, frame: int
):
for frame_classification_label in classifications_list:
classification_hash = frame_classification_label["classificationHash"]
if classification_hash not in self._classifications_map:
classification_instance = self._create_new_classification_instance(
frame_classification_label, frame, classification_answers
)
self.add_classification_instance(classification_instance)
else:
self._add_frames_to_classification_instance(frame_classification_label, frame)
def _parse_image_group_frame_level_data(self, label_row_data_units: dict) -> Dict[int, FrameLevelImageGroupData]:
frame_level_data: Dict[int, LabelRowV2.FrameLevelImageGroupData] = {}
for payload in label_row_data_units.values():
frame_number = int(payload["data_sequence"])
frame_level_image_group_data = self.FrameLevelImageGroupData(
image_hash=payload["data_hash"],
image_title=payload["data_title"],
data_link=payload.get("data_link"),
file_type=payload["data_type"],
frame_number=frame_number,
width=payload["width"],
height=payload["height"],
)
frame_level_data[frame_number] = frame_level_image_group_data
return frame_level_data
def _create_new_classification_instance(
self, frame_classification_label: dict, frame: int, classification_answers: dict
) -> ClassificationInstance:
feature_hash = frame_classification_label["featureHash"]
classification_hash = frame_classification_label["classificationHash"]
label_class = self._ontology.structure.get_child_by_hash(feature_hash, type_=Classification)
classification_instance = ClassificationInstance(label_class, classification_hash=classification_hash)
frame_view = ClassificationInstance.FrameData.from_dict(frame_classification_label)
classification_instance.set_for_frames(
frame,
created_at=frame_view.created_at,
created_by=frame_view.created_by,
confidence=frame_view.confidence,
manual_annotation=frame_view.manual_annotation,
last_edited_at=frame_view.last_edited_at,
last_edited_by=frame_view.last_edited_by,
reviews=frame_view.reviews,
)
answers_dict = classification_answers[classification_hash]["classifications"]
self._add_static_answers_from_dict(classification_instance, answers_dict)
return classification_instance
def _add_static_answers_from_dict(
self, classification_instance: ClassificationInstance, answers_list: List[dict]
) -> None:
classification_instance.set_answer_from_list(answers_list)
def _add_frames_to_classification_instance(self, frame_classification_label: dict, frame: int) -> None:
object_hash = frame_classification_label["classificationHash"]
classification_instance = self._classifications_map[object_hash]
frame_view = ClassificationInstance.FrameData.from_dict(frame_classification_label)
classification_instance.set_for_frames(
frame,
created_at=frame_view.created_at,
created_by=frame_view.created_by,
confidence=frame_view.confidence,
manual_annotation=frame_view.manual_annotation,
last_edited_at=frame_view.last_edited_at,
last_edited_by=frame_view.last_edited_by,
reviews=frame_view.reviews,
)
def _check_labelling_is_initalised(self):
if not self.is_labelling_initialised:
raise LabelRowError(
"For this operation you will need to initialise labelling first. Call the `.initialise_labels()` "
"to do so first."
)
def __repr__(self) -> str:
return f"LabelRowV2(label_hash={self.label_hash}, data_hash={self.data_hash}, data_title={self.data_title})"
def _frame_views_to_frame_numbers(
frame_views: Sequence[Union[ObjectInstance.Annotation, ClassificationInstance.Annotation, LabelRowV2.FrameView]]
) -> List[int]:
return [frame_view.frame for frame_view in frame_views]