"""Defines GPFInstance class that gives access to different parts of GPF."""
# pylint: disable=import-outside-toplevel
from __future__ import annotations
import logging
import os
from functools import cached_property
from pathlib import Path
from typing import Any, Optional, Union, cast
import yaml
from box import Box
from dae.annotation.annotation_factory import build_annotation_pipeline
from dae.annotation.annotation_pipeline import AnnotationPipeline
from dae.common_reports.common_report import CommonReport
from dae.configuration.gpf_config_parser import GPFConfigParser
from dae.configuration.schemas.dae_conf import dae_conf_schema
from dae.configuration.schemas.gene_profile import gene_profiles_config
from dae.gene.denovo_gene_sets_db import DenovoGeneSetsDb
from dae.gene.gene_sets_db import (
GeneSet,
GeneSetsDb,
build_gene_set_collection_from_resource,
)
from dae.gene_profile.db import GeneProfileDB
from dae.gene_profile.statistic import GPStatistic
from dae.gene_scores.gene_scores import GeneScore
from dae.gene_scores.gene_scores import ScoreDesc as GeneScoreDesc
from dae.genomic_resources.gene_models import GeneModels, TranscriptModel
from dae.genomic_resources.reference_genome import ReferenceGenome
from dae.genomic_resources.repository import GenomicResourceRepo
from dae.genomic_scores.scores import GenomicScoresRegistry
from dae.pheno.pheno_data import PhenotypeData, get_pheno_db_dir
from dae.pheno.registry import PhenoRegistry
from dae.studies.study import GenotypeData
from dae.studies.variants_db import VariantsDb
from dae.utils.fs_utils import find_directory_with_a_file
logger = logging.getLogger(__name__)
[docs]class GPFInstance:
"""Class to access different parts of a GPF instance."""
# pylint: disable=too-many-public-methods
@staticmethod
def _build_gpf_config(
config_filename: Optional[Union[str, Path]] = None,
) -> tuple[Box, Path]:
dae_dir: Optional[Path]
if config_filename is not None:
config_filename = Path(config_filename)
dae_dir = config_filename.parent
else:
if os.environ.get("DAE_DB_DIR"):
dae_dir = Path(os.environ["DAE_DB_DIR"])
config_filename = Path(dae_dir) / "gpf_instance.yaml"
else:
dae_dir = find_directory_with_a_file("gpf_instance.yaml")
if dae_dir is None:
raise ValueError("unable to locate GPF instance directory")
config_filename = dae_dir / "gpf_instance.yaml"
assert config_filename is not None
if not config_filename.exists():
raise ValueError(
f"GPF instance config <{config_filename}> does not exists")
dae_config = GPFConfigParser.load_config(
str(config_filename), dae_conf_schema)
return dae_config, dae_dir
[docs] @staticmethod
def build(
config_filename: Optional[Union[str, Path]] = None,
**kwargs: Any) -> GPFInstance:
"""Construct and return a GPF instance.
If the config_filename is None, tries to discover the GPF instance.
First check if a DAE_DB_DIR environment variable is defined and if
defined use it as a GPF instance directory.
Otherwise look for a gpf_instance.yaml file in the current directory
and its parents. If found use it as a configuration file.
"""
dae_config, dae_dir = GPFInstance._build_gpf_config(config_filename)
return GPFInstance(dae_config, dae_dir, **kwargs)
def __init__(
self,
dae_config: Box,
dae_dir: Union[str, Path],
**kwargs: dict[str, Any]):
assert dae_dir is not None
self.dae_config = dae_config
self.dae_dir = str(dae_dir)
self.instance_id = self.dae_config.get("instance_id")
assert self.instance_id is not None, "No instance ID provided."
self._grr = cast(GenomicResourceRepo, kwargs.get("grr"))
self._reference_genome = cast(
ReferenceGenome, kwargs.get("reference_genome"),
)
self._gene_models = cast(
GeneModels,
kwargs.get("gene_models"),
)
self._annotation_pipeline: Optional[AnnotationPipeline] = None
[docs] def load(self) -> GPFInstance:
"""Load all GPF instance attributes."""
# pylint: disable=pointless-statement
self.reference_genome
self.gene_models
self.gene_sets_db
self._pheno_registry
self._variants_db
self.denovo_gene_sets_db
self.genomic_scores
self.genotype_storages
return self
@cached_property
def grr(self) -> GenomicResourceRepo:
"""Return genomic resource repository configured for GPF instance."""
if self._grr is not None:
return self._grr
# pylint: disable=import-outside-toplevel
from dae.genomic_resources import build_genomic_resource_repository
if self.dae_config.grr:
self._grr = build_genomic_resource_repository(
self.dae_config.grr.to_dict())
return self._grr
self._grr = build_genomic_resource_repository()
return self._grr
@cached_property
def reference_genome(self) -> ReferenceGenome:
"""Return reference genome defined in the GPFInstance config."""
if self._reference_genome is not None:
return self._reference_genome
# pylint: disable=import-outside-toplevel
from dae.genomic_resources.reference_genome import (
build_reference_genome_from_resource,
)
resource = self.grr.get_resource(
self.dae_config.reference_genome.resource_id)
result = build_reference_genome_from_resource(resource)
result.open()
return result
@cached_property
def gene_models(self) -> GeneModels:
"""Return gene models used in the GPF instance."""
if self._gene_models is not None:
return self._gene_models
# pylint: disable=import-outside-toplevel
from dae.genomic_resources.gene_models import (
build_gene_models_from_resource,
)
resource = self.grr.get_resource(
self.dae_config.gene_models.resource_id)
assert resource is not None, \
self.dae_config.gene_models.resource_id
result = build_gene_models_from_resource(resource)
result.load()
return result
[docs] def get_transcript_models(
self, gene_symbol: str,
) -> tuple[Optional[str], Optional[list[TranscriptModel]]]:
"""Get gene model by gene symbol."""
gene_symbol = gene_symbol.lower()
gene_models = self.gene_models.gene_models
for k, v in gene_models.items():
if gene_symbol == k.lower():
return k, v
return None, None
@cached_property
def _pheno_registry(self) -> PhenoRegistry:
pheno_data_dir = get_pheno_db_dir(self.dae_config)
registry = PhenoRegistry()
logger.info("pheno registry created: %s", id(registry))
pheno_configs = GPFConfigParser.collect_directory_configs(
pheno_data_dir,
)
with PhenoRegistry.CACHE_LOCK:
for config in pheno_configs:
logger.info("loading phenotype data from config: %s", config)
registry.register_phenotype_data(
PhenoRegistry.load_pheno_data(Path(config)),
lock=False,
)
return registry
@cached_property
def gene_scores_db(self) -> Any:
"""Load and return gene scores db."""
from dae.gene_scores.gene_scores import (
GeneScoresDb,
build_gene_score_from_resource,
)
if self.dae_config.gene_scores_db is None:
return GeneScoresDb([])
gene_scores = self.dae_config.gene_scores_db.gene_scores
collections = []
for score in gene_scores:
resource = self.grr.get_resource(score)
if resource is None:
logger.error("unable to find gene score: %s", score)
continue
collections.append(build_gene_score_from_resource(resource))
return GeneScoresDb(collections)
@cached_property
def genomic_scores(self) -> GenomicScoresRegistry:
"""Load and return genomic scores db."""
pipeline = self.get_annotation_pipeline()
return GenomicScoresRegistry.build_genomic_scores_registry(pipeline)
@cached_property
def genotype_storages(self) -> Any:
"""Construct and return genotype storage registry."""
# pylint: disable=import-outside-toplevel
from dae.genotype_storage.genotype_storage_registry import (
GenotypeStorageRegistry,
)
registry = GenotypeStorageRegistry()
internal_storage = registry.register_storage_config({
"id": "internal",
"storage_type": "inmemory",
"dir": os.path.join(self.dae_dir, "internal_storage"),
})
registry.register_default_storage(internal_storage)
if self.dae_config.genotype_storage:
registry.register_storages_configs(
self.dae_config.genotype_storage)
return registry
@cached_property
def _variants_db(self) -> VariantsDb:
return VariantsDb(
self.dae_config,
self.reference_genome,
self.gene_models,
self.genotype_storages,
)
@cached_property
def _gene_profile_db(self) -> GeneProfileDB:
config = None if self._gene_profile_config is None else\
self._gene_profile_config.to_dict()
gpdb = GeneProfileDB(
config,
os.path.join(self.dae_dir, "gpdb"),
)
return gpdb
[docs] def reload(self) -> None:
"""Reload GPF instance studies, de Novo gene sets, etc."""
self._variants_db.reload()
self.denovo_gene_sets_db.reload()
@cached_property
def _gene_profile_config(self) -> Optional[Box]:
gp_config = self.dae_config.gene_profiles_config
config_filename = None
if gp_config is None:
config_filename = os.path.join(
self.dae_dir, "geneProfiles.yaml")
if not os.path.exists(config_filename):
return None
else:
if not os.path.exists(gp_config.conf_file):
return None
config_filename = gp_config.conf_file
assert config_filename is not None
return GPFConfigParser.load_config(
config_filename,
gene_profiles_config,
)
@cached_property
def gene_sets_db(self) -> GeneSetsDb:
"""Return GeneSetsDb populated with gene sets from the GPFInstance."""
logger.debug("creating new instance of GeneSetsDb")
if "gene_sets_db" in self.dae_config:
gsc_ids = self.dae_config.gene_sets_db.gene_set_collections
gscs = []
for gsc_id in gsc_ids:
resource = self.grr.get_resource(gsc_id)
if resource is None:
logger.error("can't find resource %s", gsc_id)
continue
gscs.append(
build_gene_set_collection_from_resource(resource),
)
return GeneSetsDb(gscs)
logger.debug("No gene sets DB configured")
return GeneSetsDb([])
@cached_property
def denovo_gene_sets_db(self) -> DenovoGeneSetsDb:
return DenovoGeneSetsDb(self)
[docs] def get_genotype_data_ids(self, local_only: bool = False) -> list[str]:
# pylint: disable=unused-argument
return cast(list[str], (
self._variants_db.get_all_genotype_study_ids()
+ self._variants_db.get_all_genotype_group_ids()
))
[docs] def get_genotype_data(self, genotype_data_id: str) -> GenotypeData:
genotype_data_study = self._variants_db.get_genotype_study(
genotype_data_id)
if genotype_data_study:
return genotype_data_study
return cast(
GenotypeData,
self._variants_db.get_genotype_group(genotype_data_id),
)
[docs] def get_all_genotype_data(self) -> list[GenotypeData]:
genotype_studies = self._variants_db.get_all_genotype_studies()
genotype_data_groups = self._variants_db.get_all_genotype_groups()
return cast(
list[GenotypeData], genotype_studies + genotype_data_groups,
)
[docs] def get_genotype_data_config(self, genotype_data_id: str) -> Optional[Box]:
config = self._variants_db.get_genotype_study_config(genotype_data_id)
if config is not None:
return config
return cast(Box, self._variants_db.get_genotype_group_config(
genotype_data_id,
))
[docs] def register_genotype_data(self, genotype_data: GenotypeData) -> None:
self._variants_db.register_genotype_data(genotype_data)
[docs] def unregister_genotype_data(self, genotype_data: GenotypeData) -> None:
self._variants_db.unregister_genotype_data(genotype_data)
# Phenotype data
[docs] def get_phenotype_data_ids(self) -> list[str]:
return self._pheno_registry.get_phenotype_data_ids()
[docs] def has_phenotype_data(
self, phenotype_data_id: str,
) -> bool:
return self._pheno_registry.has_phenotype_data(phenotype_data_id)
[docs] def get_phenotype_data(
self, phenotype_data_id: str,
) -> PhenotypeData:
return self._pheno_registry.get_phenotype_data(phenotype_data_id)
[docs] def get_all_phenotype_data(self) -> list[PhenotypeData]:
return self._pheno_registry.get_all_phenotype_data()
[docs] def get_phenotype_data_config(
self, phenotype_data_id: str,
) -> Optional[Box]:
return self._pheno_registry.get_phenotype_data_config(
phenotype_data_id)
# Gene scores
[docs] def has_gene_score(self, gene_score_id: str) -> bool:
return gene_score_id in self.gene_scores_db
[docs] def get_gene_score(self, gene_score_id: str) -> GeneScore:
return cast(
GeneScore, self.gene_scores_db.get_gene_score(gene_score_id),
)
[docs] def get_gene_score_desc(self, score_id: str) -> GeneScoreDesc:
return cast(
GeneScoreDesc, self.gene_scores_db.get_score_desc(score_id),
)
[docs] def get_all_gene_scores(self) -> list[GeneScore]:
return cast(list[GeneScore], self.gene_scores_db.get_gene_scores())
[docs] def get_all_gene_score_descs(self) -> list[GeneScoreDesc]:
return cast(list[GeneScoreDesc], self.gene_scores_db.get_scores())
# Common reports
[docs] def get_common_report(self, study_id: str) -> Optional[CommonReport]:
"""Load and return common report (dataset statistics) for a study."""
study = self.get_genotype_data(study_id)
if study is None or study.is_remote:
return None
if not study.config.common_report.enabled:
return None
report = CommonReport.load(
study.config.common_report.file_path)
if report is None:
report = CommonReport.build_and_save(study)
return report
[docs] def get_all_common_report_configs(self) -> list[Box]:
"""Return all common report configuration."""
configs = []
local_ids = self.get_genotype_data_ids(True)
for gd_id in local_ids:
config = self.get_genotype_data_config(gd_id)
if config is not None and config.common_report is not None:
configs.append(config.common_report)
return configs
# Gene sets
[docs] def get_gene_sets_collections(self) -> list[dict[str, Any]]:
return self.gene_sets_db.collections_descriptions
[docs] def has_gene_set_collection(self, gsc_id: str) -> bool:
return self.gene_sets_db.has_gene_set_collection(gsc_id)
[docs] def get_all_gene_sets(self, collection_id: str) -> list[GeneSet]:
return self.gene_sets_db.get_all_gene_sets(collection_id)
[docs] def get_gene_set(self, collection_id: str, gene_set_id: str) -> GeneSet:
return cast(
GeneSet,
self.gene_sets_db.get_gene_set(collection_id, gene_set_id),
)
[docs] def get_denovo_gene_sets(
self, datasets: list[GenotypeData],
) -> list[dict[str, Any]]:
return cast(
list[dict[str, Any]],
self.denovo_gene_sets_db.get_gene_set_descriptions(datasets),
)
[docs] def has_denovo_gene_sets(self) -> bool:
return len(self.denovo_gene_sets_db) > 0
[docs] def get_all_denovo_gene_sets(
self, types: dict[str, Any],
datasets: list[Any],
collection_id: str, # pylint: disable=unused-argument
) -> list[dict[str, Any]]:
return cast(
list[dict[str, Any]],
self.denovo_gene_sets_db.get_all_gene_sets(types, datasets),
)
[docs] def get_denovo_gene_set(
self, gene_set_id: str,
types: dict[str, Any],
datasets: list[GenotypeData],
collection_id: str, # pylint: disable=unused-argument
) -> dict[str, Any]:
return cast(
dict[str, Any],
self.denovo_gene_sets_db.get_gene_set(
gene_set_id, types, datasets,
),
)
# Variants DB
[docs] def get_dataset(self, dataset_id: str) -> GenotypeData:
return cast(GenotypeData, self._variants_db.get(dataset_id))
# GP
[docs] def get_gp_configuration(self) -> Box:
return cast(Box, self._gene_profile_db.configuration)
[docs] def get_gp_statistic(self, gene_symbol: str) -> GPStatistic:
return cast(
GPStatistic, self._gene_profile_db.get_gp(gene_symbol),
)
[docs] def query_gp_statistics(
self,
page: int,
symbol_like: Optional[str] = None,
sort_by: Optional[str] = None,
order: Optional[str] = None,
) -> list[GPStatistic]:
"""Query AGR statistics and return results."""
rows = self._gene_profile_db.query_gps(
page, symbol_like, sort_by, order,
)
statistics = list(map(
self._gene_profile_db.gp_from_table_row,
rows,
))
return cast(list[GPStatistic], statistics)
def _construct_import_effect_annotator_config(
self,
) -> dict[str, Any]:
"""Construct import effect annotator."""
genome = self.reference_genome
gene_models = self.gene_models
config = {
"effect_annotator": {
"genome": genome.resource_id,
"gene_models": gene_models.resource_id,
"attributes": [
{
"source": "allele_effects",
"name": "allele_effects",
"internal": True,
},
"worst_effect",
"gene_effects",
"effect_details",
],
},
}
return config
[docs] def get_annotation_pipeline_config(self) -> list[dict[str, Any]]:
"""Return the annotation pipeline config."""
pipeline_config = []
if self.dae_config.annotation is not None:
config_filename = self.dae_config.annotation.conf_file
if not os.path.exists(config_filename):
raise ValueError(
f"annotation config file not found: {config_filename}")
with open(config_filename, "rt", encoding="utf8") as infile:
pipeline_config = yaml.safe_load(infile.read())
pipeline_config.insert(
0, self._construct_import_effect_annotator_config())
return pipeline_config
[docs] def get_annotation_pipeline(self) -> AnnotationPipeline:
"""Return the annotation pipeline configured in the GPF instance."""
if self._annotation_pipeline is None:
pipeline_config = self.get_annotation_pipeline_config()
pipeline = build_annotation_pipeline(
pipeline_config_raw=pipeline_config,
grr_repository=self.grr)
self._annotation_pipeline = pipeline
return self._annotation_pipeline