# Source code for dae.person_sets

"""
Provide classes for grouping of individuals by some criteria.

This module provides functionality for grouping
individuals from a study or study group into various
sets based on what value they have in a given mapping.
"""
from __future__ import annotations

import logging
from collections.abc import Generator
from dataclasses import dataclass
from typing import Any, FrozenSet, Optional, cast

from box import Box

from dae.pedigrees.families_data import FamiliesData
from dae.pedigrees.family import Person
from dae.pheno.pheno_data import MeasureType, PhenotypeData
from dae.variants.attributes import Sex

logger = logging.getLogger(__name__)


@dataclass
class ChildrenStats:
    """Per-sex head counts of the children in a PersonSet.

    The three fields hold the number of male, female and
    unspecified-sex children respectively.
    """

    male: int
    female: int
    unspecified: int

    @property
    def total(self) -> int:
        """Return the number of children across all three categories."""
        return sum((self.male, self.female, self.unspecified))
@dataclass
class ChildrenBySex:
    """Identifiers of the children in a PersonSet, grouped by sex.

    Every field is a set of two-string tuples; elsewhere in this module
    such tuples are described as (family ID, person ID) identifiers.
    """

    # identifiers of the male children
    male: set[tuple[str, str]]
    # identifiers of the female children
    female: set[tuple[str, str]]
    # identifiers of the children whose sex is unspecified
    unspecified: set[tuple[str, str]]
@dataclass
class PersonSet:
    """Set of individuals mapped to a common value in the source.

    A person set has an ID, a display name, the list of source values
    that map to it, a color and a dictionary of its members keyed by
    their ``(family ID, person ID)`` tuple.

    The children-related getters cache their result on first use; the
    caches are not refreshed if ``persons`` is modified afterwards.
    """

    def __init__(
        self,
        psid: str,
        name: str,
        values: list[str],
        color: str,
        persons: dict[tuple[str, str], Person],
    ):
        self.id: str = psid  # pylint: disable=invalid-name
        self.name: str = name
        self.values: list[str] = values
        self.color: str = color
        # Generated individuals are not allowed in a person set.
        assert all(not p.generated for p in persons.values())
        self.persons: dict[tuple[str, str], Person] = persons

        # Lazily populated caches, see the getters below.
        self._children_by_sex: Optional[ChildrenBySex] = None
        self._children_stats: Optional[ChildrenStats] = None
        self._children: Optional[list[Person]] = None
        self._children_count: Optional[int] = None

    def __repr__(self) -> str:
        return f"PersonSet({self.id}: {self.name}, {len(self.persons)})"

    def __len__(self) -> int:
        return len(self.persons)

    def __eq__(self, other: object) -> bool:
        """Compare two person sets by description and member identifiers.

        The class declares no dataclass fields, so the ``__eq__`` that
        ``@dataclass`` would otherwise generate compares two empty tuples
        and reports every pair of person sets as equal. Defining it
        explicitly gives a meaningful comparison. Members are compared by
        their keys only, so the result does not depend on how ``Person``
        implements equality. As before, instances remain unhashable.
        """
        if not isinstance(other, PersonSet):
            return NotImplemented
        return (
            self.id == other.id
            and self.name == other.name
            and self.values == other.values
            and self.color == other.color
            and self.persons.keys() == other.persons.keys()
        )

    def get_children(self) -> list[Person]:
        """Return all children in the person set."""
        if self._children is None:
            self._children = [
                person
                for person in self.persons.values()
                if person.is_child()
            ]
        return self._children

    def get_children_count(self) -> int:
        """Return the number of children in the person set."""
        if self._children_count is None:
            self._children_count = len(self.get_children())
        return self._children_count

    def get_children_by_sex(self) -> ChildrenBySex:
        """Return all children in the person set split by sex."""
        if self._children_by_sex is None:
            # Fill a local instance first so that a failed assertion does
            # not leave a partially populated cache behind.
            by_sex = ChildrenBySex(set(), set(), set())
            for child in self.get_children():
                if child.sex == Sex.M:
                    by_sex.male.add(child.fpid)
                elif child.sex == Sex.F:
                    by_sex.female.add(child.fpid)
                else:
                    assert child.sex == Sex.U
                    by_sex.unspecified.add(child.fpid)
            self._children_by_sex = by_sex
        return self._children_by_sex

    def get_children_stats(self) -> ChildrenStats:
        """Return statistics about children in the person set."""
        if self._children_stats is None:
            by_sex = self.get_children_by_sex()
            self._children_stats = ChildrenStats(
                len(by_sex.male),
                len(by_sex.female),
                len(by_sex.unspecified),
            )
        return self._children_stats

    def get_parents(self) -> Generator[Person, None, None]:
        """Yield every member of the person set that is a parent."""
        for person in self.persons.values():
            if person.is_parent():
                yield person

    def to_json(self) -> dict[str, Any]:
        """Return a JSON-serializable description of the person set.

        Members are represented by their identifiers only, under the
        ``"person_ids"`` key.
        """
        return {
            "id": self.id,
            "name": self.name,
            "values": self.values,
            "color": self.color,
            "person_ids": list(self.persons.keys()),
        }

    @staticmethod
    def from_json(json: dict[str, Any], families: FamiliesData) -> PersonSet:
        """Construct person set from a JSON dict.

        Identifiers that are not present in ``families.real_persons`` are
        silently skipped.
        """
        real_persons = families.real_persons
        persons = {}
        for raw_id in json["person_ids"]:
            # Tuple keys written by ``to_json`` come back as lists after a
            # JSON round trip; lists are unhashable and cannot be looked
            # up in a dict, so convert them back to tuples.
            fpid = tuple(raw_id) if isinstance(raw_id, list) else raw_id
            if fpid in real_persons:
                persons[fpid] = real_persons[fpid]
        return PersonSet(
            json["id"],
            json["name"],
            json["values"],
            json["color"],
            persons,
        )
class PersonSetCollection:
    """The collection of all possible person sets in a given source.

    A collection holds a mapping of person set IDs to ``PersonSet``
    instances, a default person set for individuals that match none of
    the configured sets, the sources the grouping values are read from,
    the configuration it was built from and the families it refers to.
    """

    @dataclass(frozen=True, eq=True)
    class Source:
        """Where a grouping value for a person is read from.

        ``sfrom`` is the kind of source; this module handles
        ``"pedigree"`` and ``"phenodb"``. ``ssource`` is the name of the
        person attribute or phenotype measure to read.
        """

        sfrom: str
        ssource: str

    def __init__(
        self,
        pscid: str,
        name: str,
        config: dict[str, Any],
        sources: list[Source],
        person_sets: dict[str, PersonSet],
        default: PersonSet,
        families: FamiliesData,
    ):
        assert config.get("default") is not None
        self.id: str = pscid  # pylint: disable=invalid-name
        self.name: str = name
        self.config = config
        self.sources = sources
        self.person_sets: dict[str, PersonSet] = person_sets
        self.default: PersonSet = default
        # The default set is registered alongside the configured ones;
        # note that this stores it in the dictionary passed by the caller.
        self.person_sets[default.id] = default
        self.families: FamiliesData = families

    def __repr__(self) -> str:
        return f"PersonSetCollection({self.id}: {self.person_sets})"

    def __len__(self) -> int:
        return len(self.person_sets)

    def is_pedigree_only(self) -> bool:
        """Return True when every source reads from the pedigree."""
        return all(s.sfrom == "pedigree" for s in self.sources)

    @staticmethod
    def _sources_from_config(
        person_set_collection: dict[str, Any],
    ) -> list[Source]:
        """Build ``Source`` objects from the ``"sources"`` config entries."""
        return [
            PersonSetCollection.Source(src["from"], src["source"])
            for src in person_set_collection["sources"]
        ]

    @staticmethod
    def _produce_sets(config: dict[str, Any]) -> dict[str, PersonSet]:
        """
        Produce initial PersonSet instances.

        Initializes a dictionary of person set IDs mapped to
        empty PersonSet instances from a given configuration.
        """
        return {
            ps_config["id"]: PersonSet(
                ps_config["id"],
                ps_config["name"],
                ps_config["values"],
                ps_config["color"],
                {},
            )
            for ps_config in config["domain"]
        }

    @staticmethod
    def _produce_default_person_set(config: dict[str, Any]) -> PersonSet:
        """Produce the empty default PersonSet described by the config."""
        assert config["default"] is not None, config
        default_config = config["default"]
        return PersonSet(
            default_config["id"],
            default_config["name"],
            [],
            default_config["color"],
            {},
        )

    @staticmethod
    def _person_set_config(person_set: PersonSet) -> dict[str, Any]:
        """Describe a person set as a domain entry with copied values."""
        return {
            "id": person_set.id,
            "name": person_set.name,
            "values": list(person_set.values),
            "color": person_set.color,
        }

    @staticmethod
    def get_person_color(
        person: Person,
        person_set_collection: PersonSetCollection,
    ) -> str:
        """Get the hex color value for a Person in a PersonSetCollection.

        Generated persons and a missing collection get fixed colors; a
        person found in no person set gets a fallback color and a
        warning is logged.
        """
        if person.generated:
            return "#E0E0E0"
        if person_set_collection is None:
            return "#FFFFFF"

        matching_person_set = \
            person_set_collection.get_person_set_of_person(person.fpid)
        if matching_person_set is not None:
            return matching_person_set.color

        logger.warning(
            "Person <%s> could not be found in any"
            " domain of <%s>!",
            person.fpid, person_set_collection.id,
        )
        return "#AAAAAA"

    @staticmethod
    def remove_empty_person_sets(
        person_set_collection: PersonSetCollection,
    ) -> PersonSetCollection:
        """Remove all empty person sets in a PersonSetCollection in place.

        Returns the same collection object that was passed in.
        """
        # Collect the IDs first; the dictionary cannot be modified while
        # it is being iterated.
        empty_person_sets = {
            set_id
            for set_id, person_set
            in person_set_collection.person_sets.items()
            if not person_set.persons
        }
        logger.debug(
            "empty person sets to remove from person set collection <%s>: %s",
            person_set_collection.id, empty_person_sets)
        for set_id in empty_person_sets:
            del person_set_collection.person_sets[set_id]
        return person_set_collection

    def collect_person_collection_attributes(
        self, person: Person,
        pheno_db: Optional[PhenotypeData],
    ) -> FrozenSet[str]:
        """Collect all configured attributes for a Person.

        Reads one value per configured source and returns them as a
        frozenset, the form used to look up the matching person set.

        Raises:
            ValueError: if a source has an unknown type, or if a
                ``"phenodb"`` source is configured but no phenotype
                database was supplied.
        """
        values = []
        for source in self.sources:
            if source.sfrom == "pedigree":
                value = person.get_attr(source.ssource)
                # Convert to string since some of the person's
                # attributes can be of an enum type
                if value is not None:
                    value = str(value)
            elif source.sfrom == "phenodb":
                if pheno_db is None:
                    # The source type itself is valid; report the actual
                    # problem instead of "Invalid source type".
                    raise ValueError(
                        f"Person set source <{source.ssource}> requires "
                        f"a phenotype database, but none was provided!")
                assert pheno_db.get_measure(source.ssource).measure_type \
                    in {MeasureType.categorical, MeasureType.ordinal}, \
                    f"Continuous measures not allowed in person sets! " \
                    f"({source.ssource})"

                pheno_values = list(pheno_db.get_people_measure_values(
                    [source.ssource],
                    person_ids=[person.person_id],
                ))
                if pheno_values:
                    value = pheno_values[0][source.ssource]
                else:
                    value = None
            else:
                raise ValueError(f"Invalid source type {source.sfrom}!")
            values.append(value)

        # make unified frozenset value
        return frozenset(values)

    @staticmethod
    def from_families(
        psc_config: dict[str, Any],
        families_data: FamiliesData,
        pheno_db: Optional[PhenotypeData] = None,
    ) -> PersonSetCollection:
        """Produce a PersonSetCollection from a config and pedigree.

        Every person in ``families_data.real_persons`` is placed in the
        person set whose configured values match the person's collected
        attributes, or in the default set otherwise. Person sets left
        empty are removed from the result.
        """
        collection = PersonSetCollection(
            psc_config["id"],
            psc_config["name"],
            psc_config,
            PersonSetCollection._sources_from_config(psc_config),
            PersonSetCollection._produce_sets(psc_config),
            PersonSetCollection._produce_default_person_set(psc_config),
            families_data,
        )
        value_to_id = {
            frozenset(ps_config["values"]): ps_config["id"]
            for ps_config in psc_config["domain"]
        }
        logger.debug("person set collection value_to_id: %s", value_to_id)

        for person_id, person in families_data.real_persons.items():
            assert not person.missing
            value = collection.collect_person_collection_attributes(
                person, pheno_db)
            set_id = value_to_id.get(value)
            if set_id is None and value not in value_to_id:
                collection.default.persons[person_id] = person
            else:
                collection.person_sets[set_id].persons[person_id] = person

        return PersonSetCollection.remove_empty_person_sets(collection)

    @staticmethod
    def merge_configs(
        person_set_collections: list[PersonSetCollection],
    ) -> Box:
        """
        Merge the configurations of a list of PersonSetCollection objects.

        Only supports merging PersonSetCollection objects with matching ids.
        The method will not merge the PersonSet objects' values.

        The ID, name and default person set are taken from the first
        collection. When several collections define a person set with
        the same ID, the definition seen first is kept.

        Raises:
            ValueError: if a collection's ID differs from the first one's.
        """
        assert len(person_set_collections) > 0
        first = person_set_collections[0]

        result: dict[str, Any] = {}
        result["id"] = first.id
        result["name"] = first.name
        result["sources"] = [{
            "from": "pedigree",
            "source": first.id,
        }]
        result["default"] = {
            "id": first.default.id,
            "name": first.default.name,
            "color": first.default.color,
        }

        domain: dict[str, dict[str, Any]] = {}
        for collection in person_set_collections:
            if result["id"] != collection.id:
                logger.error(
                    "trying to merge different type of collections: %s <-> %s",
                    collection.id, result["id"])
                raise ValueError(
                    "trying to merge different type of collections")
            for person_set in collection.person_sets.values():
                # An already known person set is kept as is; no check is
                # made that the two definitions are compatible.
                if person_set.id not in domain:
                    domain[person_set.id] = \
                        PersonSetCollection._person_set_config(person_set)

        # The default person set is described separately, not in the domain.
        domain.pop(first.default.id, None)

        result["domain"] = [domain[vid] for vid in sorted(domain)]
        return Box(result)

    def get_person_set(
        self, person_id: tuple[str, str],
    ) -> Optional[PersonSet]:
        """Return the first person set containing the person, or None."""
        for person_set in self.person_sets.values():
            if person_id in person_set.persons:
                return person_set
        return None

    def get_person_set_of_person(
        self, fpid: tuple[str, str],
    ) -> Optional[PersonSet]:
        """Retrieve the PersonSet associated with the given person identifier.

        Args:
            fpid (tuple[str, str]): The person identifier consisting of two
            strings - family ID and person ID.

        Returns:
            Optional[PersonSet]: The PersonSet associated with the given
            person identifier, or None if not found.
        """
        return self.get_person_set(fpid)

    @staticmethod
    def combine(
        collections: list[PersonSetCollection],
        families: FamiliesData,
    ) -> PersonSetCollection:
        """Combine a list of PersonSetCollection objects into a single one.

        A single collection is returned unchanged. Otherwise the
        configurations are merged and every person in
        ``families.real_persons`` is placed in the set with the ID of the
        first input set that contains them, or in the default set when
        no input collection contains them.

        Raises:
            ValueError: if ``collections`` is empty.
        """
        if len(collections) == 0:
            raise ValueError("can't combine empty list of collections")

        if len(collections) == 1:
            return collections[0]

        config = PersonSetCollection.merge_configs(collections)
        result = PersonSetCollection(
            config["id"],
            config["name"],
            config,
            PersonSetCollection._sources_from_config(config),
            PersonSetCollection._produce_sets(config),
            PersonSetCollection._produce_default_person_set(config),
            families)

        for person_id, person in families.real_persons.items():
            person_set = None
            for psc in collections:
                person_set = psc.get_person_set(person_id)
                if person_set is not None:
                    break
            if person_set is not None:
                result.person_sets[person_set.id].persons[person_id] = person
            else:
                result.default.persons[person_id] = person
        return PersonSetCollection.remove_empty_person_sets(result)

    def config_json(self) -> dict[str, Any]:
        """Produce a JSON configuration for this PersonSetCollection object.

        The default person set is reported under ``"default"`` and is
        left out of ``"domain"``.
        """
        domain = [
            {
                "id": person_set.id,
                "name": person_set.name,
                "values": person_set.values,
                "color": person_set.color,
            }
            for person_set in self.person_sets.values()
            if person_set.id != self.default.id
        ]
        sources = [
            {"from": s.sfrom, "source": s.ssource}
            for s in self.sources
        ]
        return {
            "id": self.id,
            "name": self.name,
            "sources": sources,
            "domain": domain,
            "default": {
                "id": self.default.id,
                "name": self.default.name,
                "color": self.default.color,
            },
        }

    def domain_json(self) -> dict[str, Any]:
        """Produce a JSON to represent domain of this PersonSetCollection.

        Unlike ``config_json``, every person set currently in the
        collection is listed and the ``values`` are omitted.
        """
        domain = [
            {
                "id": person_set.id,
                "name": person_set.name,
                "color": person_set.color,
            }
            for person_set in self.person_sets.values()
        ]
        return {
            "id": self.id,
            "name": self.name,
            "domain": domain,
        }

    def get_stats(self) -> dict[str, dict[str, int]]:
        """
        Return a dictionary with statistics for each PersonSet.

        The statistics are a dictionary containing the amount of parents
        and children in the set.
        """
        return {
            set_id: {
                # count the generator's items without building a list
                "parents": sum(1 for _ in person_set.get_parents()),
                "children": len(person_set.get_children()),
            }
            for set_id, person_set in self.person_sets.items()
        }

    def to_json(self) -> dict[str, Any]:
        """Serialize a person sets collection to a json format."""
        return {
            "config": self.config_json(),
            "person_sets": [
                ps.to_json() for ps in self.person_sets.values()
            ],
        }

    @staticmethod
    def from_json(
        data: dict[str, Any],
        families: FamiliesData,
    ) -> PersonSetCollection:
        """Construct person sets collection from json serialization.

        Expects the structure produced by ``to_json``. Person sets left
        empty after loading are removed.
        """
        config = data["config"]
        psc = PersonSetCollection(
            config["id"],
            config["name"],
            config,
            PersonSetCollection._sources_from_config(config),
            PersonSetCollection._produce_sets(config),
            PersonSetCollection._produce_default_person_set(config),
            families,
        )
        for ps_json in data["person_sets"]:
            person_set = psc.person_sets[ps_json["id"]]
            for fpid_json in ps_json["person_ids"]:
                # identifiers are stored as lists in JSON; the members
                # dictionary is keyed by tuples
                fpid = cast(tuple[str, str], tuple(fpid_json))
                person = families.persons[fpid]
                # NOTE(review): requires the person attribute named after
                # the collection ID to equal the person set ID - confirm
                # this holds for every way a collection can be built.
                assert person.get_attr(psc.id) == person_set.id
                person_set.persons[fpid] = person
        PersonSetCollection.remove_empty_person_sets(psc)
        return psc
@dataclass
class PSCQuery:
    """Person set collection query.

    Pairs the ID of a person set collection with a set of strings
    selecting person sets from it.
    """

    # ID of the person set collection being queried
    collection_id: str
    # presumably the IDs of the selected person sets - verify against callers
    selected_person_sets: set[str]