"""Factory for creation of annotation pipeline."""
# import collections
import copy
import fnmatch
import logging
from collections import Counter
from textwrap import dedent
from typing import Any, Callable, Dict, List, Optional
import yaml
from dae.annotation.annotation_pipeline import (
AnnotationPipeline,
Annotator,
AnnotatorInfo,
AttributeInfo,
InputAnnotableAnnotatorDecorator,
ReannotationPipeline,
ValueTransformAnnotatorDecorator,
)
from dae.genomic_resources import build_genomic_resource_repository
from dae.genomic_resources.repository import GenomicResourceRepo
# Module-level logger named after this module.
logger = logging.getLogger(__name__)
# Registry mapping an annotator type name to the factory callable that
# builds an annotator of that type from a pipeline and an annotator info.
_ANNOTATOR_FACTORY_REGISTRY: dict[
str, Callable[[AnnotationPipeline, AnnotatorInfo], Annotator]] = {}
# Becomes True once the entry-point plugins have been loaded, so that the
# plugin discovery in _load_annotator_factory_plugins runs only once.
_EXTENTIONS_LOADED = False
def _load_annotator_factory_plugins() -> None:
    """Fill the factory registry from the installed entry points.

    The entry points of the ``dae.annotation.annotators`` group are
    discovered and loaded on the first call only; every later call returns
    without doing anything. An entry point whose name is already present in
    the registry replaces the existing factory and a warning is logged.
    """
    # pylint: disable=global-statement
    global _EXTENTIONS_LOADED
    if not _EXTENTIONS_LOADED:
        # pylint: disable=import-outside-toplevel
        from importlib_metadata import entry_points

        for plugin in entry_points(group="dae.annotation.annotators"):
            type_name = plugin.name
            loaded_factory = plugin.load()
            if type_name in _ANNOTATOR_FACTORY_REGISTRY:
                logger.warning(
                    "overwriting annotator type: %s", type_name)
            _ANNOTATOR_FACTORY_REGISTRY[type_name] = loaded_factory
        _EXTENTIONS_LOADED = True
def get_annotator_factory(
    annotator_type: str,
) -> Callable[[AnnotationPipeline, AnnotatorInfo], Annotator]:
    """Look up the factory function registered for an annotator type.

    The entry-point plugins are loaded first (if not loaded already), then
    the registry is consulted.

    :param annotator_type: name of the annotator type to look up.
    :return: the annotator factory registered for the annotator type.
    :raises ValueError: when no factory is registered for the annotator
        type.
    """
    _load_annotator_factory_plugins()
    registry = _ANNOTATOR_FACTORY_REGISTRY
    if annotator_type in registry:
        return registry[annotator_type]
    raise ValueError(f"unsupported annotator type: {annotator_type}")
def get_available_annotator_types() -> List[str]:
    """Return the names of all annotator types that have a factory.

    The entry-point plugins are loaded first (if not loaded already), so
    the result covers both plugin-provided and programmatically registered
    annotator types.
    """
    _load_annotator_factory_plugins()
    return [*_ANNOTATOR_FACTORY_REGISTRY]
def register_annotator_factory(
    annotator_type: str,
    factory: Callable[[AnnotationPipeline, AnnotatorInfo], Annotator],
) -> None:
    """Register an additional annotator factory.

    By default annotator factories are discovered through the
    ``dae.annotation.annotators`` entry point group and are loaded
    automatically. Use this function when you want to bypass the entry
    point mechanism and register an annotator factory programmatically.

    If a factory is already registered for the annotator type, it is
    replaced and a warning is logged.

    :param annotator_type: name under which the factory is registered.
    :param factory: callable that builds an annotator from a pipeline and
        an annotator info.
    """
    # Load the entry-point plugins first: plugin loading writes into the
    # same registry, so doing it later would overwrite this registration.
    _load_annotator_factory_plugins()
    if annotator_type in _ANNOTATOR_FACTORY_REGISTRY:
        logger.warning("overwriting annotator type: %s", annotator_type)
    _ANNOTATOR_FACTORY_REGISTRY[annotator_type] = factory
class AnnotationConfigParser:
    """Parser for annotation configuration.

    All methods are static. The parser turns a raw (YAML-derived) pipeline
    configuration into a list of ``AnnotatorInfo`` objects. Three forms of
    annotator configuration are recognized:

    * minimal - just the annotator type as a string;
    * short - a mapping from annotator type to a resource id (pattern);
    * complete - a mapping from annotator type to a dictionary of
      parameters with an optional ``attributes`` list.
    """

    @staticmethod
    def normalize(pipeline_config: List[Any]) -> List[Dict]:
        """Return a normalized annotation pipeline configuration.

        Every item of the input is either an annotator type string or a
        single-key dictionary mapping the annotator type to its
        configuration (a dictionary, or a string taken as ``resource_id``).
        Every item of the result is a dictionary with the annotator
        parameters plus an ``annotator_type`` key.

        Note that when an item's configuration is already a dictionary,
        that dictionary is updated in place and reused in the result.
        """
        result = []
        for config in pipeline_config:
            if isinstance(config, str):
                config = {
                    config: {},
                }
            assert isinstance(config, dict)
            assert len(config) == 1
            annotator_type, config = next(iter(config.items()))
            if not isinstance(config, dict):
                assert isinstance(config, str)
                # handle score annotators short form
                config = {"resource_id": config}
            assert isinstance(config, dict)
            config["annotator_type"] = annotator_type
            result.append(config)
        return result

    @staticmethod
    def match_labels_query(
        query: dict[str, str], resource_labels: dict[str, str],
    ) -> bool:
        """Check if the labels query for a wildcard matches.

        Every key of the query must be present in the resource labels and
        the label value must match the query pattern (``fnmatch`` rules).
        An empty query matches any labels.
        """
        return all(
            key in resource_labels
            and fnmatch.fnmatch(resource_labels[key], pattern)
            for key, pattern in query.items()
        )

    @staticmethod
    def query_resources(
        annotator_type: str, wildcard: str, grr: GenomicResourceRepo,
    ) -> list[str]:
        """Collect resources matching a given query.

        The wildcard is a resource id pattern optionally followed by a
        labels query in square brackets, e.g.
        ``scores/*[build=hg19 and kind=cons*]``. A resource matches when
        its type equals the annotator type, its id matches the pattern and
        its labels satisfy the labels query.

        :return: the ids of the matching resources, in repository order.
        :raises ValueError: when a label condition has no ``=``.
        """
        labels_query: dict[str, str] = {}
        # Handle querying by labels
        if wildcard.endswith("]"):
            assert "[" in wildcard
            # Split on the first "[" only and drop only the closing "]",
            # so that label patterns may themselves contain fnmatch
            # character classes such as "hg1[89]".
            wildcard, raw_labels = wildcard.split("[", 1)
            for label in raw_labels[:-1].split(" and "):
                # Split on the first "=" only; the pattern may contain "=".
                key, pattern = label.split("=", 1)
                labels_query[key] = pattern
        return [
            resource.get_id()
            for resource in grr.get_all_resources()
            if resource.get_type() == annotator_type
            and fnmatch.fnmatch(resource.get_id(), wildcard)
            and AnnotationConfigParser.match_labels_query(
                labels_query, resource.get_labels())
        ]

    @staticmethod
    def has_wildcard(string: str) -> bool:
        """Ascertain whether a string contains a valid wildcard.

        Only a ``*`` that appears in the resource id part counts, i.e. one
        located before the first ``[``; a ``*`` may also be used inside the
        labels query, and that alone does not make the string a wildcard.
        """
        if "*" not in string:
            return False
        return "[" not in string or string.index("*") < string.index("[")

    @staticmethod
    def parse_minimal(raw: str, idx: int) -> AnnotatorInfo:
        """Parse a minimal-form annotation config.

        The raw string is the annotator type; the annotator gets no
        attributes, no parameters and the id ``A<idx>``.
        """
        return AnnotatorInfo(raw, [], {}, annotator_id=f"A{idx}")

    @staticmethod
    def parse_short(
        raw: dict[str, Any], idx: int,
        grr: Optional[GenomicResourceRepo] = None,
    ) -> list[AnnotatorInfo]:
        """Parse a short-form annotation config.

        The first item of the raw dictionary maps the annotator type to a
        resource id or a resource id wildcard. A wildcard is expanded into
        one annotator per matching resource, with id
        ``A<idx>_<resource id>``; this requires a repository.
        """
        ann_type, ann_details = next(iter(raw.items()))
        if AnnotationConfigParser.has_wildcard(ann_details):
            assert grr is not None
            matching_resources = AnnotationConfigParser.query_resources(
                ann_type, ann_details, grr,
            )
            return [
                AnnotatorInfo(
                    ann_type, [], {"resource_id": resource},
                    annotator_id=f"A{idx}_{resource}",
                )
                for resource in matching_resources
            ]
        return [
            AnnotatorInfo(
                ann_type, [], {"resource_id": ann_details},
                annotator_id=f"A{idx}",
            ),
        ]

    @staticmethod
    def parse_complete(raw: dict[str, Any], idx: int) -> AnnotatorInfo:
        """Parse a full-form annotation config.

        The first item of the raw dictionary maps the annotator type to a
        dictionary of parameters. The optional ``attributes`` entry is
        parsed into attribute infos; all other entries become annotator
        parameters.
        """
        ann_type, ann_details = next(iter(raw.items()))
        attributes = []
        if "attributes" in ann_details:
            attributes = AnnotationConfigParser.parse_raw_attributes(
                ann_details["attributes"],
            )
        parameters = {k: v for k, v in ann_details.items()
                      if k != "attributes"}
        return AnnotatorInfo(
            ann_type, attributes, parameters, annotator_id=f"A{idx}",
        )

    @staticmethod
    def parse_raw(
        pipeline_raw_config: Optional[list[dict[str, Any]]],
        grr: Optional[GenomicResourceRepo] = None,
    ) -> list[AnnotatorInfo]:
        """Parse raw dictionary annotation pipeline configuration.

        :param pipeline_raw_config: list of annotator configurations in
            minimal, short or complete form; ``None`` yields an empty
            result and a warning.
        :param grr: repository used to expand resource id wildcards.
        :raises AnnotationConfigurationError: when the configuration is
            not a list or an item has none of the supported forms.
        """
        if pipeline_raw_config is None:
            logger.warning("empty annotation pipeline configuration")
            return []
        if not isinstance(pipeline_raw_config, list):
            raise AnnotationConfigurationError(
                "The annotation is not a list of annotator configurations.")
        result = []
        for idx, raw_cfg in enumerate(pipeline_raw_config):
            if isinstance(raw_cfg, str):
                # the minimal annotator configuration form
                result.append(
                    AnnotationConfigParser.parse_minimal(raw_cfg, idx),
                )
                continue
            # An empty dictionary has no first value to inspect; let it
            # fall through to the configuration error below.
            if isinstance(raw_cfg, dict) and raw_cfg:
                ann_details = next(iter(raw_cfg.values()))
                if isinstance(ann_details, str):
                    # the short annotator configuation form
                    result.extend(AnnotationConfigParser.parse_short(
                        raw_cfg, idx, grr,
                    ))
                    continue
                if isinstance(ann_details, dict):
                    # the complete annotator configuration form
                    result.append(
                        AnnotationConfigParser.parse_complete(raw_cfg, idx),
                    )
                    continue
            raise AnnotationConfigurationError(dedent(f"""
                Incorrect annotator configuation form: {raw_cfg}.
                The allowed forms are:
                * minimal
                - <annotator type>
                * short
                - <annotator type>: <resource_id_pattern>
                * complete without attributes
                - <annotator type>:
                <param1>: <value1>
                ...
                * complete with attributes
                - <annotator type>:
                <param1>: <value1>
                ...
                attributes:
                - <att1 config>
                ....
                """))
        return result

    @staticmethod
    def parse_str(
        content: str, source_file_name: Optional[str] = None,
        grr: Optional[GenomicResourceRepo] = None,
    ) -> list[AnnotatorInfo]:
        """Parse annotation pipeline configuration string.

        :param content: YAML text of the pipeline configuration.
        :param source_file_name: name of the file the content came from;
            when given, it is used in the error message instead of the
            content itself.
        :param grr: repository used to expand resource id wildcards.
        :raises AnnotationConfigurationError: when the content is not
            valid YAML or the configuration is malformed.
        """
        try:
            pipeline_raw_config = yaml.safe_load(content)
        except Exception as error:  # pylint: disable=broad-except
            if source_file_name is None:
                raise AnnotationConfigurationError(
                    f"The pipeline configuration {content} is an invalid yaml "
                    "string.", error) from error
            raise AnnotationConfigurationError(
                f"The pipeline configuration file {source_file_name} is "
                "an invalid yaml file.", error) from error
        return AnnotationConfigParser.parse_raw(pipeline_raw_config, grr=grr)

    @staticmethod
    def parse_config_file(
        filename: str, grr: Optional[GenomicResourceRepo],
    ) -> List[AnnotatorInfo]:
        """Parse annotation pipeline configuration file.

        :param filename: path of the YAML pipeline configuration file.
        :param grr: repository used to expand resource id wildcards.
        :raises AnnotationConfigurationError: when the file cannot be read,
            is not valid YAML or the configuration is malformed.
        """
        logger.info("loading annotation pipeline configuration: %s", filename)
        try:
            with open(filename, "rt", encoding="utf8") as infile:
                content = infile.read()
        except Exception as error:
            raise AnnotationConfigurationError(
                f"Problem reading the contents of the {filename} file.",
                error) from error
        # Pass the file name along so that a YAML error names the file
        # instead of echoing its whole content.
        return AnnotationConfigParser.parse_str(
            content, source_file_name=filename, grr=grr)

    @staticmethod
    def parse_raw_attribute_config(
            raw_attribute_config: dict[str, Any]) -> AttributeInfo:
        """Parse annotation attribute raw configuration.

        ``name`` and ``source`` default to each other when only one of them
        is given; the deprecated ``destination`` key is accepted as an
        alias of ``name`` (and takes precedence over it). ``internal``
        defaults to ``False``. All remaining keys become attribute
        parameters. The input dictionary is not modified.

        :raises ValueError: when neither name nor source is given, or the
            name is not a string.
        """
        attribute_config = copy.deepcopy(raw_attribute_config)
        if "destination" in attribute_config:
            logger.warning(
                "usage of 'destination' in annotators attribute configuration "
                "is deprecated; use 'name' instead")
            attribute_config["name"] = attribute_config.pop("destination")
        name = attribute_config.get("name")
        source = attribute_config.get("source")
        if name is None and source is None:
            raise ValueError(f"The raw attribute configuraion "
                             f"{attribute_config} has neigther "
                             "name nor source.")
        name = name if name else source
        source = source if source else name
        internal = bool(attribute_config.get("internal", False))
        assert source is not None
        if not isinstance(name, str):
            message = "The name for in an attribute " + \
                f"config {attribute_config} should be a string"
            raise ValueError(message)
        parameters = {k: v for k, v in attribute_config.items()
                      if k not in ["name", "source", "internal"]}
        return AttributeInfo(name, source, internal, parameters)

    @staticmethod
    def parse_raw_attributes(
            raw_attributes_config: Any) -> list[AttributeInfo]:
        """Parse annotator pipeline attribute configuration.

        Every item is either an attribute name string or a raw attribute
        configuration dictionary.

        :raises ValueError: when the attributes configuration is not a
            list.
        """
        if not isinstance(raw_attributes_config, list):
            message = "The attributes parameters should be a list."
            raise ValueError(message)
        attribute_config = []
        for raw_attribute_config in raw_attributes_config:
            if isinstance(raw_attribute_config, str):
                raw_attribute_config = {"name": raw_attribute_config}
            attribute_config.append(
                AnnotationConfigParser.parse_raw_attribute_config(
                    raw_attribute_config))
        return attribute_config
class AnnotationConfigurationError(ValueError):
    """Error raised when an annotation pipeline configuration is invalid."""
def build_annotation_pipeline(
    pipeline_config: Optional[list[AnnotatorInfo]] = None,
    pipeline_config_raw: Optional[list[dict]] = None,
    pipeline_config_file: Optional[str] = None,
    pipeline_config_str: Optional[str] = None,
    grr_repository: Optional[GenomicResourceRepo] = None,
    grr_repository_file: Optional[str] = None,
    grr_repository_definition: Optional[dict] = None,
    allow_repeated_attributes: bool = False,
) -> AnnotationPipeline:
    """Build an annotation pipeline.

    The pipeline configuration is taken from exactly one of
    ``pipeline_config`` (already parsed), ``pipeline_config_raw``,
    ``pipeline_config_file`` or ``pipeline_config_str``. The repository is
    either passed in as ``grr_repository`` or built from
    ``grr_repository_file`` / ``grr_repository_definition``.

    :param allow_repeated_attributes: passed to the pipeline-wide check for
        repeated attributes; when true, repeated attributes are renamed
        instead of being reported as an error.
    :raises AnnotationConfigurationError: when an annotator configuration
        is incorrect or the pipeline has repeated attributes that are not
        allowed.
    """
    if grr_repository:
        # An explicit repository excludes the other repository sources.
        assert grr_repository_file is None
        assert grr_repository_definition is None
    else:
        grr_repository = build_genomic_resource_repository(
            definition=grr_repository_definition,
            file_name=grr_repository_file)

    # Pick the configuration source; the sources are mutually exclusive.
    if pipeline_config_file is not None:
        assert pipeline_config is None
        assert pipeline_config_raw is None
        assert pipeline_config_str is None
        pipeline_config = AnnotationConfigParser.parse_config_file(
            pipeline_config_file, grr=grr_repository)
    elif pipeline_config_str is not None:
        assert pipeline_config_raw is None
        assert pipeline_config is None
        pipeline_config = AnnotationConfigParser.parse_str(
            pipeline_config_str, grr=grr_repository)
    elif pipeline_config_raw is not None:
        assert pipeline_config is None
        pipeline_config = AnnotationConfigParser.parse_raw(
            pipeline_config_raw, grr=grr_repository)
    assert pipeline_config is not None

    result = AnnotationPipeline(grr_repository)
    for info in pipeline_config:
        try:
            factory = get_annotator_factory(info.type)
            # Wrap the fresh annotator with the input-annotable decorator
            # first and the value-transform decorator second.
            annotator = ValueTransformAnnotatorDecorator.decorate(
                InputAnnotableAnnotatorDecorator.decorate(
                    factory(result, info)))
            check_for_unused_parameters(info)
            check_for_repeated_attributes_in_annotator(info)
            result.add_annotator(annotator)
        except ValueError as value_error:
            raise AnnotationConfigurationError(
                f"The {info.annotator_id} annotator"
                " configuration is incorrect: ",
                value_error) from value_error

    check_for_repeated_attributes_in_pipeline(
        result, allow_repeated_attributes,
    )
    return result
def copy_annotation_pipeline(
    pipeline: AnnotationPipeline,
) -> AnnotationPipeline:
    """Copy an annotation pipeline instance.

    New annotator and attribute infos are created from the infos of the
    pipeline's annotators, and a new pipeline is built from them on top of
    the same repository.
    """
    infos = []
    for annotator in pipeline.annotators:
        info = annotator.get_info()
        copied_attributes = [
            AttributeInfo(
                attr.name,
                attr.source,
                attr.internal,
                attr.parameters._data,  # pylint: disable=W0212
                attr.type,
                attr.description,
                attr.documentation,
            )
            for attr in info.attributes
        ]
        copied_info = AnnotatorInfo(
            info.type,
            copied_attributes,
            info.parameters._data,  # pylint: disable=W0212
            "",
            None,
            info.annotator_id,
        )
        infos.append(copied_info)
    return build_annotation_pipeline(
        pipeline_config=infos,
        grr_repository=pipeline.repository,
    )
def copy_reannotation_pipeline(
    pipeline: ReannotationPipeline,
) -> ReannotationPipeline:
    """Copy a reannotation pipeline instance.

    Both the new and the old annotation pipelines are copied and a new
    reannotation pipeline is created from the copies.
    """
    new_copy = copy_annotation_pipeline(pipeline.pipeline_new)
    old_copy = copy_annotation_pipeline(pipeline.pipeline_old)
    return ReannotationPipeline(new_copy, old_copy)
def check_for_repeated_attributes_in_annotator(
    annotator_config: AnnotatorInfo,
) -> None:
    """Check for repeated attributes in annotator configuration.

    :raises ValueError: when two or more attributes of the annotator share
        a name; the message lists the repeated names sorted and
        comma-separated.
    """
    name_counts = Counter(att.name for att in annotator_config.attributes)
    repeated = sorted(
        name for name, count in name_counts.items() if count > 1)
    if repeated:
        raise ValueError("The annotator has repeated attributes: "
                         f"{','.join(repeated)}")
def check_for_repeated_attributes_in_pipeline(
    pipeline: AnnotationPipeline, allow_repeated_attributes: bool = False,
) -> None:
    """Check for repeated attributes in pipeline configuration.

    When repeated attribute names are found and they are allowed, the
    attributes are renamed; otherwise an error listing, for each repeated
    name, the ids of the annotators that produce it is raised.

    :raises AnnotationConfigurationError: when repeated attributes are
        found and ``allow_repeated_attributes`` is false.
    """
    name_counts = Counter(att.name for att in pipeline.get_attributes())
    repeated_attributes = {
        name for name, count in name_counts.items() if count > 1
    }
    if not repeated_attributes:
        return
    if allow_repeated_attributes:
        resolve_repeated_attributes(pipeline, repeated_attributes)
        return

    overlaps: dict[str, list[str]] = {}
    # reversed so that it follows the order of the pipeline config
    for annotator in reversed(pipeline.annotators):
        producer_id = annotator.get_info().annotator_id
        for attribute in annotator.attributes:
            if attribute.name not in repeated_attributes:
                continue
            overlaps.setdefault(attribute.name, []).append(producer_id)
    raise AnnotationConfigurationError(
        f"Repeated attributes in pipeline were found - {overlaps}",
    )
def resolve_repeated_attributes(
    pipeline: AnnotationPipeline, repeated_attributes: set[str],
) -> None:
    """Resolve repeated attributes in pipeline configuration via renaming.

    Every attribute whose name is in ``repeated_attributes`` is renamed in
    place to ``<name>_<annotator id>``, using the id of the annotator that
    owns the attribute.

    :param pipeline: the pipeline whose annotators' attributes are renamed.
    :param repeated_attributes: the attribute names to be made unique.
    """
    # Visit every attribute exactly once and test its current name against
    # the whole set. Looping over the set and re-scanning the pipeline for
    # each name could rename an attribute twice when a freshly built name
    # is itself in the set, with a result that depends on the set order.
    for annotator in pipeline.annotators:
        for attribute in annotator.attributes:
            if attribute.name in repeated_attributes:
                attribute.name = \
                    f"{attribute.name}_{annotator.get_info().annotator_id}"
def check_for_unused_parameters(info: AnnotatorInfo) -> None:
    """Check annotator configuration for unused parameters.

    The annotator parameters are checked first, then the parameters of
    each attribute in order; the first non-empty collection of unused keys
    is reported.

    :raises ValueError: when the annotator or one of its attributes has
        unused parameters.
    """
    leftover = info.parameters.get_unused_keys()
    if leftover:
        raise ValueError(f"The are unused annotator parameters: {leftover}")
    for attribute in info.attributes:
        attribute_leftover = attribute.parameters.get_unused_keys()
        if not attribute_leftover:
            continue
        raise ValueError(
            "There are unused annotator attribute "
            f"parameters: {','.join(sorted(attribute_leftover))}")