"""Classes to represent genotype data."""
from __future__ import annotations
import functools
import gzip
import json
import logging
import os
import time
from abc import ABC, abstractmethod
from collections.abc import Generator, Iterable
from contextlib import closing
from os.path import basename, exists
from typing import Any, Optional, Union, cast
from box import Box
from dae.effect_annotation.effect import expand_effect_types
from dae.pedigrees.families_data import FamiliesData
from dae.pedigrees.loader import FamiliesLoader
from dae.person_sets import PersonSetCollection
from dae.query_variants.query_runners import QueryResult
from dae.utils.regions import Region
from dae.variants.family_variant import FamilyVariant
from dae.variants.variant import SummaryVariant
logger = logging.getLogger(__name__)
# FIXME: Too many public methods, refactor?
[docs]class GenotypeData(ABC): # pylint: disable=too-many-public-methods
"""Abstract base class for genotype data."""
def __init__(self, config: Box, studies: list[GenotypeData]):
self.config = config
self.studies = studies
self._description: Optional[str] = None
self._person_set_collections: dict[str, PersonSetCollection] = {}
self._parents: set[str] = set()
self._executor = None
self.is_remote = False
self.config_dir: str = self.config["work_dir"]
[docs] def close(self) -> None:
logger.error("base genotype data close() called for %s", self.study_id)
@property
def study_id(self) -> str:
return cast(str, self.config["id"])
@property
def name(self) -> str:
name = self.config.get("name")
if name:
return cast(str, name)
return self.study_id
@property
def description(self) -> Optional[str]:
"""Load and return description of a genotype data."""
if self._description is None:
if basename(self.config.description_file) != "description.md" and \
not exists(self.config.description_file):
# If a non-default description file was given, assert it exists
raise ValueError(
f"missing description file {self.config.description_file}")
if exists(self.config.description_file):
# the default description file may not yet exist
with open(
self.config.description_file,
mode="rt", encoding="utf8") as infile:
self._description = infile.read()
return self._description
@description.setter
def description(self, input_text: str) -> None:
self._description = None
with open(
self.config.description_file,
mode="wt", encoding="utf8") as outfile:
outfile.write(input_text)
@property
def year(self) -> Optional[str]:
return cast(str, self.config.get("year"))
@property
def pub_med(self) -> Optional[str]:
return cast(str, self.config.get("pub_med"))
@property
def phenotype(self) -> Optional[str]:
return cast(str, self.config.get("phenotype"))
@property
def has_denovo(self) -> bool:
return cast(bool, self.config.get("has_denovo"))
@property
def has_transmitted(self) -> bool:
return cast(bool, self.config.get("has_transmitted"))
@property
def has_cnv(self) -> bool:
return cast(bool, self.config.get("has_cnv"))
@property
def has_complex(self) -> bool:
return cast(bool, self.config.get("has_complex"))
@property
def study_type(self) -> str:
return cast(str, self.config.get("study_type"))
@property
def parents(self) -> set[str]:
return self._parents
@property
def person_set_collections(self) -> dict[str, PersonSetCollection]:
return self._person_set_collections
def _add_parent(self, genotype_data_id: str) -> None:
self._parents.add(genotype_data_id)
@property
@abstractmethod
def is_group(self) -> bool:
return False
[docs] @abstractmethod
def get_studies_ids(self, leaves: bool = True) -> list[str]:
pass
[docs] def get_leaf_children(self) -> list[GenotypeDataStudy]:
"""Return list of genotype studies children of this group."""
if not self.is_group:
return []
result = []
seen = set()
for study in self.studies:
if not study.is_group:
if study.study_id in seen:
continue
result.append(cast(GenotypeDataStudy, study))
seen.add(study.study_id)
else:
children = study.get_leaf_children()
for child in children:
if child.study_id in seen:
continue
result.append(child)
seen.add(child.study_id)
return result
def _get_query_leaf_studies(
self, study_filters: Optional[Iterable[str]],
) -> list[GenotypeDataStudy]:
leafs = []
logger.debug("find leaf studies started...")
start = time.time()
if self.is_group:
leafs = self.get_leaf_children()
else:
leafs = [cast(GenotypeDataStudy, self)]
elapsed = time.time() - start
logger.debug("leaf studies found in %.2f sec", elapsed)
logger.debug("leaf children: %s", [st.study_id for st in leafs])
logger.debug("study_filters: %s", study_filters)
if study_filters is not None:
leafs = [st for st in leafs if st.study_id in study_filters]
logger.debug("studies to query: %s", [st.study_id for st in leafs])
return leafs
# FIXME: Too many locals, too many arguments, complex. To refactor
[docs] def query_result_variants(
self,
regions: Optional[list[Region]] = None,
genes: Optional[list[str]] = None,
effect_types: Optional[list[str]] = None,
family_ids: Optional[Iterable[str]] = None,
person_ids: Optional[Iterable[str]] = None,
person_set_collection: Optional[tuple[str, list[str]]] = None,
inheritance: Optional[Union[str, list[str]]] = None,
roles: Optional[str] = None,
sexes: Optional[str] = None,
variant_type: Optional[str] = None,
real_attr_filter: Optional[list[tuple]] = None,
ultra_rare: Optional[bool] = None,
frequency_filter: Optional[list[tuple]] = None,
return_reference: Optional[bool] = None,
return_unknown: Optional[bool] = None,
limit: Optional[int] = None,
study_filters: Optional[Iterable[str]] = None,
pedigree_fields: Optional[list[str]] = None,
**_kwargs: Any,
) -> Optional[QueryResult]:
"""Build a query result."""
# flake8: noqa: C901
# pylint: disable=too-many-locals,too-many-arguments
del pedigree_fields # Unused argument
psc_query = person_set_collection
if person_ids is not None and not person_ids:
return None
if effect_types:
effect_types = expand_effect_types(effect_types)
def adapt_study_variants(
study_name: str,
study_phenotype: str,
v: FamilyVariant,
) -> FamilyVariant:
if v is None:
return None
for allele in v.alleles:
if allele.get_attribute("study_name") is None:
allele.update_attributes(
{"study_name": study_name})
if allele.get_attribute("study_phenotype") is None:
allele.update_attributes(
{"study_phenotype": study_phenotype})
return v
runners = []
logger.debug("query leaf studies...")
for genotype_study in self._get_query_leaf_studies(study_filters):
person_sets_query = None
query_person_ids = set(person_ids) \
if person_ids is not None else None
if psc_query is not None:
psc_id, selected_person_set_ids = psc_query
if selected_person_set_ids is not None:
psc = genotype_study.get_person_set_collection(psc_id)
# pylint: disable=no-member,protected-access
person_sets_query = genotype_study\
._backend\
.build_person_set_collection_query(psc, psc_query)
if person_sets_query == ([], []):
continue
if person_sets_query is None:
query_person_ids = genotype_study\
._transform_person_set_collection_query(
psc_query, person_ids)
if query_person_ids is not None and len(query_person_ids) == 0:
logger.debug(
"Study %s can't match any person to filter %s... "
"skipping",
genotype_study.study_id,
psc_query)
continue
# pylint: disable=no-member,protected-access
runner = genotype_study._backend\
.build_family_variants_query_runner(
regions=regions,
genes=genes,
effect_types=effect_types,
family_ids=family_ids,
person_ids=query_person_ids,
inheritance=inheritance,
roles=roles,
sexes=sexes,
variant_type=variant_type,
real_attr_filter=real_attr_filter,
ultra_rare=ultra_rare,
frequency_filter=frequency_filter,
return_reference=return_reference,
return_unknown=return_unknown,
limit=limit,
pedigree_fields=person_sets_query)
if runner is None:
logger.debug(
"study %s has no varants... skipping",
genotype_study.study_id)
continue
runner.study_id = genotype_study.study_id
logger.debug("runner created")
study_name = genotype_study.name
study_phenotype = genotype_study.study_phenotype
runner.adapt(functools.partial(
adapt_study_variants, study_name, study_phenotype))
runners.append(runner)
logger.debug("runners: %s", len(runners))
if len(runners) == 0:
return None
return QueryResult(runners)
[docs] def query_variants( # pylint: disable=too-many-locals,too-many-arguments
self,
regions: Optional[list[Region]] = None,
genes: Optional[list[str]] = None,
effect_types: Optional[list[str]] = None,
family_ids: Optional[Iterable[str]] = None,
person_ids: Optional[Iterable[str]] = None,
person_set_collection: Optional[tuple[str, list[str]]] = None,
inheritance: Optional[Union[str, list[str]]] = None,
roles: Optional[str] = None,
sexes: Optional[str] = None,
variant_type: Optional[str] = None,
real_attr_filter: Optional[list[tuple]] = None,
ultra_rare: Optional[bool] = None,
frequency_filter: Optional[list[tuple]] = None,
return_reference: Optional[bool] = None,
return_unknown: Optional[bool] = None,
limit: Optional[int] = None,
study_filters: Optional[Iterable[str]] = None,
pedigree_fields: Optional[list[str]] = None,
unique_family_variants: bool = True,
**kwargs: Any,
) -> Generator[FamilyVariant, None, None]:
"""Query and return generator containing variants."""
result = self.query_result_variants(
regions=regions,
genes=genes,
effect_types=effect_types,
family_ids=family_ids,
person_ids=person_ids,
person_set_collection=person_set_collection,
inheritance=inheritance,
roles=roles,
sexes=sexes,
variant_type=variant_type,
real_attr_filter=real_attr_filter,
ultra_rare=ultra_rare,
frequency_filter=frequency_filter,
return_reference=return_reference,
return_unknown=return_unknown,
limit=limit,
study_filters=study_filters,
pedigree_fields=pedigree_fields,
**kwargs,
)
if result is None:
return
started = time.time()
try:
result.start()
with closing(result) as result:
seen = set()
for v in result:
if v is None:
continue
if unique_family_variants and v.fvuid in seen:
continue
seen.add(v.fvuid)
yield v
if limit and len(seen) >= limit:
break
except GeneratorExit:
logger.info("generator closed")
except Exception: # pylint: disable=broad-except
logger.exception("unexpected exception:", exc_info=True)
finally:
elapsed = time.time() - started
logger.info(
"processing study %s elapsed: %.3f", self.study_id, elapsed)
logger.debug("[DONE] executor closed...")
# FIXME: Too many locals, too many arguments, To refactor
[docs] def query_result_summary_variants(
self,
regions: Optional[list[Region]] = None,
genes: Optional[list[str]] = None,
effect_types: Optional[list[str]] = None,
variant_type: Optional[str] = None,
real_attr_filter: Optional[list[tuple]] = None,
ultra_rare: Optional[bool] = None,
frequency_filter: Optional[list[tuple]] = None,
return_reference: Optional[bool] = None,
return_unknown: Optional[bool] = None,
limit: Optional[int] = None,
study_filters: Optional[list[str]] = None,
**kwargs: Any,
) -> Optional[QueryResult]:
# pylint: disable=too-many-locals,too-many-arguments,unused-argument
"""Build a query result for summary variants only."""
logger.info("summary query - study_filters: %s", study_filters)
logger.info(
"study %s children: %s", self.study_id, self.get_leaf_children())
# person_ids = self._transform_person_set_collection_query(
# person_set_collection, person_ids)
# if person_ids is not None and len(person_ids) == 0:
# return None
if effect_types:
effect_types = expand_effect_types(effect_types)
runners = []
for genotype_study in self._get_query_leaf_studies(study_filters):
# pylint: disable=no-member,protected-access
runner = genotype_study._backend \
.build_summary_variants_query_runner(
regions=regions,
genes=genes,
effect_types=effect_types,
variant_type=variant_type,
real_attr_filter=real_attr_filter,
ultra_rare=ultra_rare,
frequency_filter=frequency_filter,
return_reference=return_reference,
return_unknown=return_unknown,
limit=limit)
runner.study_id = genotype_study.study_id
runners.append(runner)
if len(runners) == 0:
return None
result = QueryResult(runners)
return result
[docs] def query_summary_variants(
self,
regions: Optional[list[Region]] = None,
genes: Optional[list[str]] = None,
effect_types: Optional[list[str]] = None,
variant_type: Optional[str] = None,
real_attr_filter: Optional[list[tuple]] = None,
ultra_rare: Optional[bool] = None,
frequency_filter: Optional[list[tuple]] = None,
return_reference: Optional[bool] = None,
return_unknown: Optional[bool] = None,
limit: Optional[int] = None,
study_filters: Optional[list[str]] = None,
**kwargs: Any,
) -> Generator[SummaryVariant, None, None]:
"""Query and return generator containing summary variants."""
# pylint: disable=too-many-locals,too-many-arguments
result = self.query_result_summary_variants(
regions=regions,
genes=genes,
effect_types=effect_types,
variant_type=variant_type,
real_attr_filter=real_attr_filter,
ultra_rare=ultra_rare,
frequency_filter=frequency_filter,
return_reference=return_reference,
return_unknown=return_unknown,
limit=limit,
study_filters=study_filters,
**kwargs)
try:
if result is None:
return
started = time.time()
variants: dict[str, Any] = {}
with closing(result) as result:
result.start()
for v in result:
if v is None:
continue
if v.svuid in variants:
existing = variants[v.svuid]
fv_count = existing.get_attribute(
"family_variants_count")[0]
if fv_count is None:
continue
fv_count += v.get_attribute("family_variants_count")[0]
seen_in_status = existing.get_attribute(
"seen_in_status")[0]
seen_in_status = \
seen_in_status | \
v.get_attribute("seen_in_status")[0]
seen_as_denovo = existing.get_attribute(
"seen_as_denovo")[0]
seen_as_denovo = \
seen_as_denovo or \
v.get_attribute("seen_as_denovo")[0]
new_attributes = {
"family_variants_count": [fv_count],
"seen_in_status": [seen_in_status],
"seen_as_denovo": [seen_as_denovo],
}
v.update_attributes(new_attributes)
variants[v.svuid] = v
if limit and len(variants) >= limit:
return
elapsed = time.time() - started
logger.info(
"processing study %s elapsed: %.3f",
self.study_id, elapsed)
for v in variants.values():
yield v
finally:
logger.debug("[DONE] executor closed...")
@property
@abstractmethod
def families(self) -> FamiliesData:
pass
@abstractmethod
def _build_person_set_collection(
self, psc_config: dict[str, Any],
families: FamiliesData,
) -> PersonSetCollection:
pass
def _build_person_set_collections(
self, pscs_config: Optional[dict[str, Any]],
families: FamiliesData,
) -> dict[str, PersonSetCollection]:
if pscs_config is None:
return {}
selected_collections = pscs_config["selected_person_set_collections"]
result = {}
for psc_id in selected_collections:
psc_config = pscs_config[psc_id]
psc = self._build_person_set_collection(
psc_config, families)
result[psc_id] = psc
return result
def _transform_person_set_collection_query(
self, collection_query: tuple[str, list[str]],
person_ids: Optional[Iterable[str]],
) -> Optional[set[str]]:
assert collection_query is not None
collection_id, selected_sets = collection_query
collection = self.get_person_set_collection(collection_id)
if collection is None or selected_sets is None:
return set(person_ids) if person_ids is not None else None
person_set_ids = set(collection.person_sets.keys())
selected_fpids: set[tuple[str, str]] = set()
if set(selected_sets) == person_set_ids:
return set(person_ids) \
if person_ids is not None else None
for set_id in (set(selected_sets) & person_set_ids):
selected_fpids.update(
collection.person_sets[set_id].persons.keys(),
)
selected_person_ids: set[str] = set(
fpid[1] for fpid in selected_fpids)
if person_ids is not None:
return set(person_ids) & selected_person_ids
return selected_person_ids
[docs] def get_person_set_collection(
self, person_set_collection_id: str,
) -> Optional[PersonSetCollection]:
if person_set_collection_id is None:
return None
return self.person_set_collections[person_set_collection_id]
[docs]class GenotypeDataGroup(GenotypeData):
"""
Represents a group of genotype data classes.
Queries to this object will be sent to all child data.
"""
def __init__(
self, config: Box,
studies: Iterable[GenotypeData],
):
super().__init__(
config, list(studies),
)
self._families: FamiliesData
if not self.has_families_cache():
self.rebuild_families()
self._executor = None
self.is_remote = False
for study in self.studies:
study._add_parent(self.study_id)
@property
def is_group(self) -> bool:
return True
@property
def families(self) -> FamiliesData:
return self._families
[docs] def get_studies_ids(self, leaves: bool = True) -> list[str]:
result = set([self.study_id])
if not leaves:
result = result.union([st.study_id for st in self.studies])
return list(result)
for study in self.studies:
result = result.union(study.get_studies_ids())
return list(result)
def _has_cached_families_data(self) -> bool:
"""Load families data cache if exists."""
cache_path = None
if "conf_dir" in self.config:
cache_path = os.path.join(
self.config["conf_dir"], "families_cache.ped.gz",
)
if cache_path is None or not os.path.exists(cache_path):
return False
return True
def _load_cached_families_data(
self, cache_dir: Optional[str] = None,
) -> Optional[FamiliesData]:
"""Load families data cache if exists."""
cache_dir = self._ensure_cache_dir(cache_dir)
cache_path = os.path.join(
cache_dir, "families_cache.ped.gz",
)
if cache_path is not None and os.path.exists(cache_path):
try:
result = FamiliesLoader.load_pedigree_file(cache_path)
return result
except BaseException: # pylint: disable=broad-except
logger.error(
"Couldn't load families cache for %s", self.study_id,
)
return None
def _ensure_cache_dir(self, cache_dir: Optional[str] = None) -> str:
if cache_dir is None and "conf_dir" not in self.config:
raise ValueError("genotype study group cache dir not specified")
if cache_dir is None:
cache_dir = self.config["conf_dir"]
if not os.path.exists(cache_dir):
os.makedirs(cache_dir)
if not os.path.isdir(cache_dir):
raise ValueError(f"cache dir {cache_dir} is not a directory")
return cache_dir
def _save_cached_families_data(
self, cache_dir: Optional[str] = None,
) -> bool:
"""Store genotype data group families data into cache."""
cache_dir = self._ensure_cache_dir(cache_dir)
cache_path = os.path.join(cache_dir, "families_cache.ped.gz")
try:
FamiliesLoader.save_families(self.families, cache_path)
return True
except BaseException: # pylint: disable=broad-except
logger.error(
"Failed to cache families for %s", self.study_id,
exc_info=True,
)
return False
def _save_cached_person_sets(
self, cache_dir: Optional[str] = None,
) -> bool:
"""Save cached person set collections defined for a genotype group."""
cache_dir = self._ensure_cache_dir(cache_dir)
try:
for psc in self._person_set_collections.values():
cache_path = os.path.join(
cache_dir,
f"person_set_{psc.id}_cache.json.gz")
with gzip.open(cache_path, "wt") as outfile:
json.dump(psc.to_json(), outfile)
return True
except BaseException: # pylint: disable=broad-except
logger.error(
"Failed to cache person sets for study %s", self.study_id,
exc_info=True,
)
return False
def _has_cached_person_sets(self, cache_dir: Optional[str] = None) -> bool:
"""Save cached person set collections defined for a genotype group."""
cache_dir = self._ensure_cache_dir(cache_dir)
pscs_config = self.config.get("person_set_collections")
if pscs_config is None:
return False
selected_pscs = pscs_config["selected_person_set_collections"]
for psc_id in selected_pscs:
cache_path = os.path.join(
cache_dir,
f"person_set_{psc_id}_cache.json.gz")
if not os.path.exists(cache_path):
logger.info(
"unable to load person sets collection <%s> cache "
"for study %s; missing file %s",
psc_id, self.study_id, cache_path)
return False
return True
def _load_cached_person_sets(
self, cache_dir: Optional[str] = None,
) -> Optional[dict[str, PersonSetCollection]]:
"""Save cached person set collections defined for a genotype group."""
cache_dir = self._ensure_cache_dir(cache_dir)
pscs_config = self.config.get("person_set_collections")
if pscs_config is None:
return {}
selected_pscs = pscs_config["selected_person_set_collections"]
try:
result = {}
for psc_id in selected_pscs:
cache_path = os.path.join(
cache_dir,
f"person_set_{psc_id}_cache.json.gz")
if not os.path.exists(cache_path):
logger.info(
"unable to load person sets collection <%s> cache "
"for study %s; missing file %s",
psc_id, self.study_id, cache_path)
return None
with gzip.open(cache_path, "rt") as infile:
content = infile.read()
data = json.loads(content)
psc = PersonSetCollection.from_json(data, self.families)
result[psc_id] = psc
return result
except BaseException: # pylint: disable=broad-except
logger.error(
"Failed to load cached person sets for study %s",
self.study_id, exc_info=True)
return None
[docs] def has_families_cache(self) -> bool:
"""Check cached families and person set collections."""
return self._has_cached_families_data() \
and self._has_cached_person_sets()
[docs] def load_families_cache(self) -> bool:
"""Load cached families and person set collections.
If cached families or persons are missing, returns False.
"""
families = self._load_cached_families_data()
if families is None:
return False
self._families = families
pscs = self._load_cached_person_sets()
if pscs is None:
return False
self._person_set_collections = pscs
return True
[docs] def save_families_cache(self, cache_dir: Optional[str] = None) -> None:
"""Load cached families and person set collections.
If cached families or persons are missing, returns False.
"""
self._save_cached_families_data(cache_dir)
self._save_cached_person_sets(cache_dir)
[docs] def rebuild_families(self) -> None:
"""Construct genotype group families data from child studies."""
logger.info(
"building combined families from studies: %s",
[st.study_id for st in self.studies])
if len(self.studies) == 1:
self._families = self.studies[0].families
self._person_set_collections = self._build_person_set_collections(
self.config.get("person_set_collections"),
self._families,
)
return
logger.info(
"combining families from study %s and from study %s",
self.studies[0].study_id, self.studies[1].study_id)
result = FamiliesData.combine(
self.studies[0].families,
self.studies[1].families)
if len(self.studies) > 2:
for sind in range(2, len(self.studies)):
logger.debug(
"processing study (%s): %s",
sind, self.studies[sind].study_id)
logger.info(
"combining families from studies (%s) %s with families "
"from study %s",
sind, [st.study_id for st in self.studies[:sind]],
self.studies[sind].study_id)
result = FamiliesData.combine(
result,
self.studies[sind].families,
forced=True)
self._families = result
pscs = self._build_person_set_collections(
self.config.get("person_set_collections"),
result,
)
self._person_set_collections = pscs
def _build_person_set_collection(
self, psc_config: dict[str, Any],
families: FamiliesData,
) -> PersonSetCollection:
psc_id = psc_config["id"]
studies_psc = []
for study in self.studies:
study_psc = study.get_person_set_collection(psc_id)
if study_psc is None:
raise ValueError(
f"person set collection {psc_id} "
f"not found in study {study.study_id}")
studies_psc.append(study_psc)
psc = PersonSetCollection.combine(studies_psc, families)
for fpid, person in families.real_persons.items():
person_set_value = psc.get_person_set_of_person(fpid)
assert person_set_value is not None
person.set_attr(psc_id, person_set_value.id)
return psc
[docs]class GenotypeDataStudy(GenotypeData):
"""Represents a singular genotype data study."""
def __init__(self, config: Box, backend: Any):
super().__init__(config, [self])
self._backend = backend
self. _person_set_collections = self._build_person_set_collections(
self.config.get("person_set_collections"),
self.families,
)
self.is_remote = False
@property
def study_phenotype(self) -> str:
return cast(str, self.config.get("study_phenotype", "-"))
@property
def is_group(self) -> bool:
return False
[docs] def get_studies_ids(self, leaves: bool = True) -> list[str]:
return [self.study_id]
@property
def families(self) -> FamiliesData:
return cast(FamiliesData, self._backend.families)
def _build_person_set_collection(
self, psc_config: dict[str, Any],
families: FamiliesData,
) -> PersonSetCollection:
psc = PersonSetCollection.from_families(psc_config, self.families)
for fpid, person in families.real_persons.items():
person_set_value = psc.get_person_set_of_person(fpid)
assert person_set_value is not None
person.set_attr(psc.id, person_set_value.id)
return psc