"""Provides annotation pipeline class."""
from __future__ import annotations
import abc
import logging
from collections.abc import Iterator, Mapping
from dataclasses import dataclass, field
from types import TracebackType
from typing import Any, Callable, Optional
from dae.annotation.annotatable import Annotatable
from dae.genomic_resources.repository import (
GenomicResource,
GenomicResourceRepo,
)
from dae.variants.variant import SummaryAllele
logger = logging.getLogger(__name__)
[docs]@dataclass(init=False, eq=True, unsafe_hash=True)
class AttributeInfo:
"""Defines annotation attribute configuration."""
def __init__(self, name: str, source: str, internal: bool,
parameters: ParamsUsageMonitor | dict[str, Any],
_type: str = "str", description: str = "",
documentation: Optional[str] = None):
self.name = name
self.source = source
self.internal = internal
if isinstance(parameters, ParamsUsageMonitor):
self.parameters = parameters
else:
self.parameters = ParamsUsageMonitor(parameters)
self.type = _type
self.description = description
self._documentation = documentation
name: str
source: str
internal: bool
parameters: ParamsUsageMonitor
type: str = "str" # str, int, float, annotatable, or object
description: str = "" # interpreted as md
_documentation: Optional[str] = None
@property
def documentation(self) -> str:
if self._documentation is None:
return self.description
return self._documentation
[docs]@dataclass(init=False)
class AnnotatorInfo:
"""Defines annotator configuration."""
def __init__(self, _type: str, attributes: list[AttributeInfo],
parameters: ParamsUsageMonitor | dict[str, Any],
documentation: str = "",
resources: Optional[list[GenomicResource]] = None,
annotator_id: str = "N/A"):
self.type = _type
self.annotator_id = f"{annotator_id}"
self.attributes = attributes
self.documentation = documentation
if isinstance(parameters, ParamsUsageMonitor):
self.parameters = parameters
else:
self.parameters = ParamsUsageMonitor(parameters)
if resources is None:
self.resources = []
else:
self.resources = resources
annotator_id: str = field(compare=False, hash=None)
type: str
attributes: list[AttributeInfo]
parameters: ParamsUsageMonitor
documentation: str = ""
resources: list[GenomicResource] = field(default_factory=list)
def __hash__(self) -> int:
attrs_hash = "".join(str(hash(attr)) for attr in self.attributes)
resources_hash = "".join(str(hash(res)) for res in self.resources)
params_hash = "".join(str(hash(self.parameters)))
return hash(f"{self.type}{attrs_hash}{resources_hash}{params_hash}")
[docs]class Annotator(abc.ABC):
"""Annotator provides a set of attrubutes for a given Annotatable."""
def __init__(self, pipeline: Optional[AnnotationPipeline],
info: AnnotatorInfo):
self.pipeline = pipeline
self._info = info
self._is_open = False
[docs] def get_info(self) -> AnnotatorInfo:
return self._info
[docs] @abc.abstractmethod
def annotate(
self, annotatable: Optional[Annotatable], context: dict[str, Any],
) -> dict[str, Any]:
"""Produce annotation attributes for an annotatable."""
[docs] def close(self) -> None:
self._is_open = False
[docs] def open(self) -> Annotator:
self._is_open = True
return self
[docs] def is_open(self) -> bool:
return self._is_open
@property
def resources(self) -> list[GenomicResource]:
return self._info.resources
@property
def resource_ids(self) -> set[str]:
return {resource.get_id() for resource in self._info.resources}
@property
def attributes(self) -> list[AttributeInfo]:
return self._info.attributes
@property
def used_context_attributes(self) -> tuple[str, ...]:
return tuple()
def _empty_result(self) -> dict[str, Any]:
return {attribute_info.name: None
for attribute_info in self._info.attributes}
[docs]class AnnotationPipeline:
"""Provides annotation pipeline abstraction."""
def __init__(
self, repository: GenomicResourceRepo):
self.repository: GenomicResourceRepo = repository
self.annotators: list[Annotator] = []
self._is_open = False
[docs] def get_info(self) -> list[AnnotatorInfo]:
return [annotator.get_info() for annotator in self.annotators]
[docs] def get_attributes(self) -> list[AttributeInfo]:
return [attribute_info for annotator in self.annotators for
attribute_info in annotator.attributes]
[docs] def get_attribute_info(
self, attribute_name: str) -> Optional[AttributeInfo]:
for annotator in self.annotators:
for attribute_info in annotator.get_info().attributes:
if attribute_info.name == attribute_name:
return attribute_info
return None
[docs] def get_resource_ids(self) -> set[str]:
return {r_id for annotator in self.annotators
for r_id in annotator.resource_ids}
[docs] def get_annotator_by_attribute_info(
self, attribute_info: AttributeInfo,
) -> Optional[Annotator]:
for annotator in self.annotators:
if attribute_info in annotator.attributes:
return annotator
return None
[docs] def add_annotator(self, annotator: Annotator) -> None:
assert isinstance(annotator, Annotator)
self.annotators.append(annotator)
[docs] def annotate(self, annotatable: Annotatable,
context: Optional[dict] = None) -> dict:
"""Apply all annotators to an annotatable."""
if not self._is_open:
self.open()
if context is None:
context = {}
for annotator in self.annotators:
attributes = annotator.annotate(annotatable, context)
context.update(attributes)
return context
[docs] def open(self) -> AnnotationPipeline:
"""Open all annotators in the pipeline and mark it as open."""
if self._is_open:
logger.warning("annotation pipeline is already open")
return self
assert not self._is_open
for annotator in self.annotators:
annotator.open()
self._is_open = True
return self
[docs] def close(self) -> None:
"""Close the annotation pipeline."""
for annotator in self.annotators:
try:
annotator.close()
except Exception: # pylint: disable=broad-except
logger.error(
"exception while closing annotator %s",
annotator.get_info(), exc_info=True)
self._is_open = False
def __enter__(self) -> AnnotationPipeline:
return self
def __exit__(
self,
exc_type: type[BaseException] | None,
exc_value: Optional[BaseException],
exc_tb: TracebackType | None) -> None:
if exc_type is not None:
logger.error(
"exception during annotation: %s, %s, %s",
exc_type, exc_value, exc_tb, exc_info=True)
self.close()
[docs]class ReannotationPipeline(AnnotationPipeline):
"""Special pipeline that handles reannotation of a previous pipeline."""
AnnotationDependencyGraph = dict[
AnnotatorInfo, list[tuple[AnnotatorInfo, AttributeInfo]],
]
def __init__(
self,
pipeline_new: AnnotationPipeline,
pipeline_old: AnnotationPipeline,
):
"""Produce a reannotation pipeline between two annotation pipelines."""
super().__init__(pipeline_new.repository)
self.pipeline_new: AnnotationPipeline = pipeline_new
self.pipeline_old: AnnotationPipeline = pipeline_old
infos_new = pipeline_new.get_info()
infos_old = pipeline_old.get_info()
self.dependency_graph = ReannotationPipeline.build_dependency_graph(
pipeline_new,
)
self.attributes_deleted: list[str] = []
for deleted_info in [i for i in infos_old if i not in infos_new]:
for attr in deleted_info.attributes:
if not attr.internal:
self.attributes_deleted.append(attr.name)
self.annotators_new: set[AnnotatorInfo] = {
i for i in infos_new if i not in infos_old
}
self.annotators_rerun: set[AnnotatorInfo] = set()
for i in self.annotators_new:
for dep in self.get_dependencies_for(i):
self.annotators_rerun.add(dep)
for dep in self.get_dependents_for(i):
self.annotators_rerun.add(dep)
for annotator in self.pipeline_new.annotators:
info = annotator.get_info()
if info in self.annotators_new or info in self.annotators_rerun:
self.annotators.append(annotator)
self.attributes_reused: dict[str, AttributeInfo] = {}
for annotator in self.annotators:
info = annotator.get_info()
for (dep_annotator, dep_attr) in self.dependency_graph[info]:
if dep_annotator in infos_old \
and dep_annotator not in self.annotators_rerun:
self.attributes_reused[dep_attr.name] = dep_attr
logger.debug("REANNOTATION SUMMARY:")
logger.debug("DELETED ATTRIBUTES - %s", self.attributes_deleted)
logger.debug("REUSED ATTRIBUTES - %s", self.attributes_reused)
logger.debug("NEW ANNOTATORS - %s", self.annotators_new)
logger.debug("RE-RUNNING ANNOTATORS - %s", self.annotators_rerun)
[docs] @staticmethod
def build_dependency_graph(
pipeline: AnnotationPipeline,
) -> AnnotationDependencyGraph:
"""Make dependency graph for an annotation pipeline."""
graph: ReannotationPipeline.AnnotationDependencyGraph = {}
for annotator in pipeline.annotators:
annotator_info = annotator.get_info()
if annotator_info not in graph:
graph[annotator_info] = []
for attr in annotator.used_context_attributes:
attr_info = pipeline.get_attribute_info(attr)
assert attr_info is not None
upstream_annotator = \
pipeline.get_annotator_by_attribute_info(attr_info)
assert upstream_annotator is not None
graph[annotator_info].append(
(upstream_annotator.get_info(), attr_info),
)
return graph
[docs] def get_dependencies_for(self, info: AnnotatorInfo) -> set[AnnotatorInfo]:
"""Get all dependencies for a given annotator."""
result: set[AnnotatorInfo] = set()
if info in self.dependency_graph:
for annotator, attr in self.dependency_graph[info]:
if attr.internal:
result.add(annotator)
dependencies = self.get_dependencies_for(annotator)
if dependencies:
result.add(*dependencies)
return result
[docs] def get_dependents_for(self, info: AnnotatorInfo) -> set[AnnotatorInfo]:
"""Get all dependents for a given annotator."""
result: set[AnnotatorInfo] = set()
for dependent, dependencies in self.dependency_graph.items():
if not dependencies:
continue
for dep_annotator, _ in dependencies:
if dep_annotator == info:
result.add(dependent)
further = self.get_dependents_for(dependent)
if further:
result.add(*further)
return result
[docs] def annotate(self, annotatable: Annotatable, record: dict) -> dict: # type: ignore # pylint: disable=arguments-renamed
reused_context: dict[str, Any] = {}
for attr_name, attr in self.attributes_reused.items():
raw_value = record[attr_name]
converted_value: Any = None
if attr.type == "int":
converted_value = int(raw_value)
elif attr.type == "float":
converted_value = float(raw_value)
elif attr.type == "annotatable":
converted_value = Annotatable.from_string(raw_value)
elif attr.type == "object":
raise ValueError("Cannot deserialize object attribute - ",
attr_name)
reused_context[attr_name] = converted_value
return super().annotate(annotatable, reused_context)
[docs] def annotate_summary_allele(self, allele: SummaryAllele) -> dict:
annotatable = allele.get_annotatable()
reused_context: dict[str, Any] = {}
for attr_name, _ in self.attributes_reused.items():
reused_context[attr_name] = allele.get_attribute(attr_name)
return super().annotate(annotatable, reused_context)
[docs] def get_attributes(self) -> list[AttributeInfo]:
return self.pipeline_new.get_attributes()
[docs]class AnnotatorDecorator(Annotator):
"""Defines annotator decorator base class."""
def __init__(self, child: Annotator):
super().__init__(child.pipeline, child.get_info())
self.child = child
[docs] def close(self) -> None:
self.child.close()
[docs] def open(self) -> Annotator:
return self.child.open()
[docs] def is_open(self) -> bool:
return self.child.is_open()
def __getattr__(self, name: str) -> Any:
return getattr(self.child, name)
[docs]class ParamsUsageMonitor(Mapping):
"""Class to monitor usage of annotator parameters."""
def __init__(self, data: dict[str, Any]):
self._data = dict(data)
self._used_keys: set[str] = set([])
def __hash__(self) -> int:
return hash(tuple(sorted(self._data.items())))
def __getitem__(self, key: str) -> Any:
self._used_keys.add(key)
return self._data[key]
def __len__(self) -> int:
return len(self._data)
def __iter__(self) -> Iterator:
raise ValueError("Should not iterate a parameter dictionary.")
def __repr__(self) -> str:
return self._data.__repr__()
def __eq__(self, other: Any) -> bool:
if not isinstance(other, ParamsUsageMonitor):
return False
return self._data == other._data
[docs] def get_used_keys(self) -> set[str]:
return self._used_keys
[docs] def get_unused_keys(self) -> set[str]:
return set(self._data.keys()) - self._used_keys