Source code for dae.pheno.pheno_db

# pylint: disable=too-many-lines
from __future__ import annotations
import os
import math
import logging
from typing import Iterable, Any, cast
from typing import Optional, Sequence, Union, Generator
from abc import ABC, abstractmethod

from collections import defaultdict
from itertools import chain
from box import Box

import pandas as pd
from sqlalchemy.sql import select, text, union
from sqlalchemy import not_

from dae.pedigrees.family import Person
from dae.pedigrees.families_data import FamiliesData
from dae.pheno.db import DbManager
from dae.pheno.common import MeasureType
from dae.configuration.gpf_config_parser import GPFConfigParser
from dae.configuration.schemas.phenotype_data import pheno_conf_schema

from dae.variants.attributes import Sex, Status, Role
from dae.utils.helpers import isnan


logger = logging.getLogger(__name__)


def get_pheno_db_dir(dae_config: Optional[Box]) -> str:
    """Return the directory where phenotype data configurations are located.

    Resolution order:

    * no configuration given -- ``$DAE_DB_DIR/pheno`` (an unset
      ``DAE_DB_DIR`` is treated as an empty string);
    * configuration with ``phenotype_data.dir`` set -- that directory;
    * otherwise -- ``<dae_config.conf_dir>/pheno``.
    """
    if dae_config is None:
        db_root = os.environ.get("DAE_DB_DIR", "")
        return os.path.join(db_root, "pheno")

    pheno_section = dae_config.phenotype_data
    if pheno_section is not None and pheno_section.dir is not None:
        return pheno_section.dir

    return os.path.join(dae_config.conf_dir, "pheno")
def get_pheno_browser_images_dir(dae_config: Optional[Box] = None) -> str:
    """Return the directory holding the phenotype browser images.

    The base directory is taken from the ``DAE_PHENODB_DIR`` environment
    variable when it is set; otherwise the phenotype data directory
    resolved by `get_pheno_db_dir` is used. The result is the ``images``
    sub-directory of that base.
    """
    fallback_dir = get_pheno_db_dir(dae_config)
    base_dir = os.environ.get("DAE_PHENODB_DIR", fallback_dir)
    return os.path.join(base_dir, "images")
class Instrument:
    """
    Instrument object represents phenotype instruments.

    Common fields are:

    * `instrument_name`

    * `measures` -- dictionary of all measures in the instrument
    """

    def __init__(self, name: str) -> None:
        # Starts without measures; the mapping is filled in by the loader.
        self.measures: dict[str, Measure] = {}
        self.instrument_name = name

    def __repr__(self) -> str:
        measures_count = len(self.measures)
        return f"Instrument({self.instrument_name}, {measures_count})"
class Measure:
    """
    Measure objects represent phenotype measures.

    Common fields are:

    * `instrument_name`

    * `measure_name`

    * `measure_id` - formed by `instrument_name`.`measure_name`

    * `measure_type` - one of 'continuous', 'ordinal', 'categorical'

    * `description`

    * `min_value` - for 'continuous' and 'ordinal' measures

    * `max_value` - for 'continuous' and 'ordinal' measures

    * `values_domain` - string that represents the values
    """

    def __init__(self, measure_id: str, name: str) -> None:
        self.measure_id = measure_id
        self.name: str = name
        self.measure_name: str = name
        self.measure_type: MeasureType = MeasureType.other
        self.values_domain: Optional[str] = None
        self.instrument_name: Optional[str] = None
        self.description: Optional[str] = None
        self.default_filter = None
        self.min_value = None
        self.max_value = None

    def __repr__(self) -> str:
        return f"Measure({self.measure_id}, " \
            f"{self.measure_type}, {self.values_domain})"

    @property
    def domain(self) -> Sequence[Union[str, float]]:
        """Return measure values domain.

        The `values_domain` string is stripped of square brackets and
        spaces and split on commas. For continuous and ordinal measures
        the items are converted to `float`; for every other measure type
        the string items are returned unchanged.
        """
        if self.values_domain is None:
            tokens: list[str] = []
        else:
            cleaned = (
                self.values_domain.replace("[", "")
                .replace("]", "")
                .replace(" ", "")
            )
            tokens = cleaned.split(",")

        if self.measure_type in (
            MeasureType.continuous,
            MeasureType.ordinal,
        ):
            # An empty or blank domain string splits into empty tokens;
            # float("") raises ValueError, so those are skipped.
            return [float(token) for token in tokens if token]
        return tokens

    @classmethod
    def _from_record(cls, row: dict[str, Any]) -> Measure:
        """Create `Measure` object from pandas data frame row.

        `row` must provide `measure_id`, `measure_name`, `measure_type`,
        `instrument_name`, `description` and `default_filter`;
        `values_domain`, `min_value` and `max_value` are optional.
        """
        assert row["measure_type"] is not None

        mes = cls(row["measure_id"], row["measure_name"])
        mes.instrument_name = row["instrument_name"]
        mes.measure_name = row["measure_name"]
        mes.measure_type = row["measure_type"]
        mes.description = row["description"]
        mes.default_filter = row["default_filter"]
        mes.values_domain = row.get("values_domain")
        mes.min_value = row.get("min_value")
        mes.max_value = row.get("max_value")
        return mes

    @classmethod
    def from_json(cls, json: dict[str, Any]) -> Measure:
        """Create `Measure` object from a JSON representation.

        Expects the camelCase keys produced by `to_json`; the measure
        type is parsed from its name with `MeasureType.from_str`.
        """
        assert json["measureType"] is not None

        mes = cls(json["measureId"], json["measureName"])
        mes.instrument_name = json["instrumentName"]
        mes.measure_name = json["measureName"]
        mes.measure_type = MeasureType.from_str(json["measureType"])
        mes.description = json["description"]
        mes.default_filter = json["defaultFilter"]
        mes.values_domain = json.get("valuesDomain")
        mes.min_value = json.get("minValue")
        mes.max_value = json.get("maxValue")
        return mes

    @staticmethod
    def _nan_to_none(value: Any) -> Any:
        """Map `None` and NaN to `None`; return other values unchanged."""
        if value is None or math.isnan(value):
            return None
        return value

    def to_json(self) -> dict[str, Any]:
        """Return measure description in JSON friendly format.

        NaN `min_value`/`max_value` are reported as `None`.
        """
        return {
            "measureName": self.measure_name,
            "measureId": self.measure_id,
            "instrumentName": self.instrument_name,
            "measureType": self.measure_type.name,
            "description": self.description,
            "defaultFilter": self.default_filter,
            "valuesDomain": self.values_domain,
            "minValue": self._nan_to_none(self.min_value),
            "maxValue": self._nan_to_none(self.max_value),
        }
class PhenotypeData(ABC):
    """Base class for all phenotype data studies and datasets."""

    def __init__(self, pheno_id: str) -> None:
        self._pheno_id: str = pheno_id
        self._measures: dict[str, Measure] = {}
        self._instruments: dict[str, Instrument] = {}
        # Annotation only -- no value is assigned here; concrete
        # subclasses are expected to set `families` in their constructor.
        self.families: FamiliesData

    @property
    def pheno_id(self) -> str:
        return self._pheno_id

    @property
    def measures(self) -> dict[str, Measure]:
        return self._measures

    @property
    def instruments(self) -> dict[str, Instrument]:
        return self._instruments

    def get_instruments(self) -> list[str]:
        """Return the names of all instruments as a list."""
        # Build a real list: `typing.cast` has no runtime effect, so
        # casting `dict.keys()` would leak a live dict view to callers.
        return list(self.instruments)

    @abstractmethod
    def get_regressions(self) -> dict[str, Any]:
        """Return the regressions defined for this phenotype data.

        The structure of the returned dictionary is defined by the
        concrete implementation.
        """

    @abstractmethod
    def get_measures_info(self) -> dict[str, Any]:
        """Return general information about the measures.

        The structure of the returned dictionary is defined by the
        concrete implementation.
        """

    @abstractmethod
    def get_persons_df(
        self,
        roles: Optional[Iterable[Role]] = None,
        person_ids: Optional[Iterable[str]] = None,
        family_ids: Optional[Iterable[str]] = None
    ) -> pd.DataFrame:
        """Return individuals data as a data frame.

        `roles`, `person_ids` and `family_ids` are optional filters.
        `get_persons` passes each row of the result to `Person(**row)`
        and reads the `person_id`, `role`, `sex` and `status` columns.
        """

    def get_persons(
        self,
        roles: Optional[Iterable[Role]] = None,
        person_ids: Optional[Iterable[str]] = None,
        family_ids: Optional[Iterable[str]] = None
    ) -> dict[str, Person]:
        """
        Return individuals data from phenotype database.

        `roles` -- specifies persons of which role should be returned. If not
        specified returns all individuals from phenotype database.

        `person_ids` -- list of person IDs to filter result. Only data for
        individuals with person_id in the list `person_ids` are returned.

        `family_ids` -- list of family IDs to filter result. Only data for
        individuals that are members of any of the specified `family_ids`
        are returned.

        Returns a dictionary of (`personId`, `Person()`) where
        the `Person` object is the same object used into `VariantDB` families.
        """
        persons = {}
        df = self.get_persons_df(
            roles=roles, person_ids=person_ids, family_ids=family_ids)

        for row in df.to_dict("records"):
            person_id = row["person_id"]

            person = Person(**row)  # type: ignore
            # Sanity checks on the values coming from the data frame.
            assert row["role"] in Role, f"{row['role']} not a valid role"
            assert row["sex"] in Sex, f"{row['sex']} not a valid sex"
            assert row["status"] in Status, \
                f"{row['status']} not a valid status"

            persons[person_id] = person
        return persons

    @abstractmethod
    def search_measures(
        self, instrument: Optional[str], search_term: Optional[str]
    ) -> Generator[dict[str, Any], None, None]:
        """Yield descriptions of measures matching the search arguments.

        `instrument` and `search_term` are optional; how they are matched
        is defined by the concrete implementation.
        """

    def has_measure(self, measure_id: str) -> bool:
        """Check if phenotype DB contains a measure by ID."""
        return measure_id in self._measures

    def get_measure(self, measure_id: str) -> Measure:
        """Return a measure by measure_id."""
        assert measure_id in self._measures, measure_id
        return self._measures[measure_id]

    def get_measures(
        self,
        instrument_name: Optional[str] = None,
        measure_type: Optional[str] = None
    ) -> dict[str, Measure]:
        """
        Return a dictionary of measures objects.

        `instrument_name` -- an instrument name which measures should be
        returned. If not specified measures of all instruments are returned.

        `measure_type` -- a type ('continuous', 'ordinal' or 'categorical')
        of measures that should be returned. If not specified all
        type of measures are returned.

        The result is keyed by `measure_id`.
        """
        instruments = self.instruments
        if instrument_name is not None:
            assert instrument_name in self.instruments
            instruments = {
                instrument_name: self.instruments[instrument_name]
            }

        type_query = None
        if measure_type is not None:
            type_query = MeasureType.from_str(measure_type)

        result = {}
        for instrument in instruments.values():
            for measure in instrument.measures.values():
                if type_query is not None and \
                        measure.measure_type != type_query:
                    continue
                result[measure.measure_id] = measure

        return result

    def get_measure_description(self, measure_id: str) -> dict[str, Any]:
        """Construct and return a measure description.

        `min_value` and `max_value` are included only when they are set
        and are not NaN.
        """
        measure = self.measures[measure_id]

        out = {
            "instrument_name": measure.instrument_name,
            "measure_name": measure.measure_name,
            "measure_type": measure.measure_type.name,
            "values_domain": measure.domain,
        }
        if not (measure.min_value is None or math.isnan(measure.min_value)):
            out["min_value"] = measure.min_value
        if not (measure.max_value is None or math.isnan(measure.max_value)):
            out["max_value"] = measure.max_value
        return out

    @abstractmethod
    def get_measure_values_df(
        self,
        measure_id: str,
        person_ids: Optional[Iterable[str]] = None,
        family_ids: Optional[Iterable[str]] = None,
        roles: Optional[Iterable[Role]] = None,
        default_filter: str = "apply"
    ) -> pd.DataFrame:
        """Return a data frame with values for the specified `measure_id`.

        :param measure_id: -- a measure ID which values should be returned.

        :param person_ids: -- list of person IDs to filter result. Only data
            for individuals with person_id in the list `person_ids` are
            returned.

        :param family_ids: -- list of family IDs to filter result. Only data
            for individuals that are members of any of the specified
            `family_ids` are returned.

        :param roles: -- list of roles of individuals to select measure value
            for. If not specified value for individuals in all roles are
            returned.

        :param default_filter: -- one of ('`skip`', '`apply`', '`invert`').
            When the measure has a `default_filter` this argument specifies
            whether the filter should be applied or skipped or inverted.

        The returned data frame contains values of the measure for
        each individual, with a `person_id` column and a column named
        after `measure_id`.
        """

    def get_measure_values(
        self,
        measure_id: str,
        person_ids: Optional[Iterable[str]] = None,
        family_ids: Optional[Iterable[str]] = None,
        roles: Optional[Iterable[Role]] = None,
        default_filter: str = "apply"
    ) -> dict[str, Any]:
        """Return a dictionary with values for the specified `measure_id`.

        :param measure_id: -- a measure ID which values should be returned.

        :param person_ids: -- list of person IDs to filter result. Only data
            for individuals with person_id in the list `person_ids` are
            returned.

        :param family_ids: -- list of family IDs to filter result. Only data
            for individuals that are members of any of the specified
            `family_ids` are returned.

        :param roles: -- list of roles of individuals to select measure value
            for. If not specified value for individuals in all roles are
            returned.

        :param default_filter: -- one of ('`skip`', '`apply`', '`invert`').
            When the measure has a `default_filter` this argument specifies
            whether the filter should be applied or skipped or inverted.

        The returned dictionary contains values of the measure for
        each individual. The person_id is used as key in the dictionary.
        """
        df = self.get_measure_values_df(
            measure_id,
            person_ids=person_ids,
            family_ids=family_ids,
            roles=roles,
            default_filter=default_filter)
        return {
            row["person_id"]: row[measure_id]
            for row in df.to_dict("records")
        }

    @abstractmethod
    def get_values_df(
        self,
        measure_ids: Iterable[str],
        person_ids: Optional[Iterable[str]] = None,
        family_ids: Optional[Iterable[str]] = None,
        roles: Optional[Iterable[Role]] = None,
        default_filter: str = "apply"
    ) -> pd.DataFrame:
        """Return a data frame with values for all `measure_ids`.

        :param measure_ids: -- list of measure IDs which values should be
            returned.

        :param person_ids: -- list of person IDs to filter result. Only data
            for individuals with person_id in the list `person_ids` are
            returned.

        :param family_ids: -- list of family IDs to filter result. Only data
            for individuals that are members of any of the specified
            `family_ids` are returned.

        :param roles: -- list of roles of individuals to select measure value
            for. If not specified value for individuals in all roles are
            returned.

        :param default_filter: -- one of ('`skip`', '`apply`', '`invert`').
            When the measure has a `default_filter` this argument specifies
            whether the filter should be applied or skipped or inverted.
        """

    def get_values(
        self,
        measure_ids: Iterable[str],
        person_ids: Optional[Iterable[str]] = None,
        family_ids: Optional[Iterable[str]] = None,
        roles: Optional[Iterable[Role]] = None,
        default_filter: str = "apply"
    ) -> dict[str, dict[str, Any]]:
        """Return dictionary of dictionaries with values for all `measure_ids`.

        The returned dictionary uses `person_id` as key. The value for each key
        is a dictionary built from the corresponding row of `get_values_df`,
        i.e. the values for each ID in `measure_ids` keyed by measure_id
        (the row's `person_id` entry is kept as well).

        :param measure_ids: -- list of measure IDs which values should be
            returned.

        :param person_ids: -- list of person IDs to filter result. Only data
            for individuals with person_id in the list `person_ids` are
            returned.

        :param family_ids: -- list of family IDs to filter result. Only data
            for individuals that are members of any of the specified
            `family_ids` are returned.

        :param roles: -- list of roles of individuals to select measure value
            for. If not specified value for individuals in all roles are
            returned.

        :param default_filter: -- one of ('`skip`', '`apply`', '`invert`').
            When the measure has a `default_filter` this argument specifies
            whether the filter should be applied or skipped or inverted.
        """
        df = self.get_values_df(
            measure_ids, person_ids, family_ids, roles, default_filter)
        res: dict[str, dict[str, Any]] = {}
        for row in df.to_dict("records"):
            person_id = str(row["person_id"])
            res[person_id] = cast(dict[str, Any], row)
        return res

    def get_persons_values_df(
        self,
        measure_ids: Iterable[str],
        person_ids: Optional[Iterable[str]] = None,
        family_ids: Optional[Iterable[str]] = None,
        roles: Optional[Iterable[Role]] = None,
        default_filter: str = "apply"
    ) -> pd.DataFrame:
        """
        Return a data frame with measure values and person data.

        Collects values for all measures in `measure_ids` and
        joins with data frame returned by `get_persons_df`. The join is
        a right join on `person_id`, so every row of the values frame is
        kept.
        """
        persons_df = self.get_persons_df(
            roles=roles, person_ids=person_ids, family_ids=family_ids)

        value_df = self.get_values_df(
            measure_ids,
            person_ids=person_ids,
            family_ids=family_ids,
            roles=roles,
            default_filter=default_filter)

        df = persons_df.join(
            value_df.set_index("person_id"),
            on="person_id",
            how="right",
            rsuffix="_val")
        # Round-trip through the index: moves `person_id` to the first
        # column and resets the row index.
        df = df.set_index("person_id")
        df = df.reset_index()

        return df

    def get_instrument_measures(self, instrument_name: str) -> list[str]:
        """Return the IDs of the measures for given instrument."""
        assert instrument_name in self.instruments
        instrument = self.instruments[instrument_name]
        return [m.measure_id for m in instrument.measures.values()]

    def get_instrument_values_df(
        self,
        instrument_name: str,
        person_ids: Optional[Iterable[str]] = None,
        family_ids: Optional[Iterable[str]] = None,
        role: Optional[Iterable[Role]] = None,
        measure_ids: Optional[Iterable[str]] = None
    ) -> pd.DataFrame:
        """
        Return a dataframe with values for measures in given instrument.

        If not supplied a list of measure IDs, it will use all
        measures in the given instrument
        (see **get_values_df**). `role` is forwarded as the `roles`
        filter of `get_values_df`.
        """
        if measure_ids is None:
            measure_ids = self.get_instrument_measures(instrument_name)
        return self.get_values_df(measure_ids, person_ids, family_ids, role)

    def get_instrument_values(
        self,
        instrument_name: str,
        person_ids: Optional[Iterable[str]] = None,
        family_ids: Optional[Iterable[str]] = None,
        role: Optional[Iterable[Role]] = None,
        measure_ids: Optional[Iterable[str]] = None
    ) -> dict[str, dict[str, Any]]:
        """
        Return a dictionary with values for measures in given instrument.

        If not supplied a list of measure IDs, it will use all
        measures in the given instrument
        (see :func:`get_values`). `role` is forwarded as the `roles`
        filter of `get_values`.
        """
        if measure_ids is None:
            measure_ids = self.get_instrument_measures(instrument_name)
        return self.get_values(measure_ids, person_ids, family_ids, role)

    @abstractmethod
    def get_people_measure_values(
        self,
        measure_ids: list[str],
        person_ids: Optional[list[str]] = None,
        family_ids: Optional[list[str]] = None,
        roles: Optional[list[str]] = None,
    ) -> Generator[dict[str, Any], None, None]:
        """
        Collect and format the values of the given measures in dict format.

        Yields a dict representing every row.

        `measure_ids` -- list of measure ids which values should be returned.

        `person_ids` -- list of person IDs to filter result. Only data for
        individuals with person_id in the list `person_ids` are returned.

        `family_ids` -- list of family IDs to filter result. Only data for
        individuals that are members of any of the specified `family_ids`
        are returned.

        `roles` -- list of roles of individuals to select measure value for.
        If not specified value for individuals in all roles are returned.
        """
        raise NotImplementedError()
[docs]class PhenotypeStudy(PhenotypeData): """ Main class for accessing phenotype database in DAE. To access the phenotype database create an instance of this class and call the method *load()*. Common fields of this class are: * `families` -- list of all families in the database * `persons` -- list of all individuals in the database * `instruments` -- dictionary of all instruments * `measures` -- dictionary of all measures """ def __init__( self, pheno_id: str, dbfile: str, browser_dbfile: Optional[str] = None, config: Optional[dict[str, str]] = None) -> None: super().__init__(pheno_id) self.db = DbManager(dbfile=dbfile, browser_dbfile=browser_dbfile) self.config = config self.db.build() self.families = self._load_families() self._instruments = self._load_instruments() def _get_measures_df( self, instrument: Optional[str] = None, measure_type: Optional[str] = None ) -> pd.DataFrame: """ Return data frame containing measures information. `instrument` -- an instrument name which measures should be returned. If not specified all type of measures are returned. `measure_type` -- a type ('continuous', 'ordinal' or 'categorical') of measures that should be returned. If not specified all type of measures are returned. Each row in the returned data frame represents given measure. Columns in the returned data frame are: `measure_id`, `measure_name`, `instrument_name`, `description`, `stats`, `min_value`, `max_value`, `value_domain`, `has_probands`, `has_siblings`, `has_parents`, `default_filter`. 
""" assert instrument is None or instrument in self.instruments assert measure_type is None or measure_type in set( ["continuous", "ordinal", "categorical", "unknown"] ) measure = self.db.measure columns = [ measure.c.measure_id, measure.c.instrument_name, measure.c.measure_name, measure.c.description, measure.c.measure_type, measure.c.individuals, measure.c.default_filter, measure.c.values_domain, measure.c.min_value, measure.c.max_value, ] query = select(*columns) query = query.where(not_(measure.c.measure_type.is_(None))) if instrument is not None: query = query.where(measure.c.instrument_name == instrument) if measure_type is not None: query = query.where(measure.c.measure_type == measure_type) df = pd.read_sql(query, self.db.pheno_engine) df_columns = [ "measure_id", "measure_name", "instrument_name", "description", "individuals", "measure_type", "default_filter", "values_domain", "min_value", "max_value", ] res_df = df[df_columns] return res_df def _load_instruments(self) -> dict[str, Instrument]: instruments = {} df = self._get_measures_df() instrument_names = list(df.instrument_name.unique()) instrument_names = sorted(instrument_names) for instrument_name in instrument_names: instrument = Instrument(instrument_name) measures = {} measures_df = df[df.instrument_name == instrument_name] for row in measures_df.to_dict("records"): # pylint: disable=protected-access measure = Measure._from_record(row) measures[measure.measure_name] = measure self._measures[measure.measure_id] = measure instrument.measures = measures instruments[instrument.instrument_name] = instrument return instruments def _load_families(self) -> FamiliesData: families = defaultdict(list) persons = self.get_persons() for person in list(persons.values()): families[person.family_id].append(person) return FamiliesData.from_family_persons(families)
[docs] def get_persons_df( self, roles: Optional[Iterable[Union[str, Role]]] = None, person_ids: Optional[Iterable[str]] = None, family_ids: Optional[Iterable[str]] = None ) -> pd.DataFrame: """Return a individuals data from phenotype database as a data frame. :param roles: -- specifies persons of which role should be returned. If not specified returns all individuals from phenotype database. :param person_ids: -- list of person IDs to filter result. Only data for individuals with person_id in the list `person_ids` are returned. :param family_ids: -- list of family IDs to filter result. Only data for individuals that are members of any of the specified `family_ids` are returned. Each row of the returned data frame represnts a person from phenotype database. Columns returned are: `person_id`, `family_id`, `role`, `sex`. """ columns = [ self.db.family.c.family_id, self.db.person.c.person_id, self.db.person.c.role, self.db.person.c.status, self.db.person.c.sex, ] query = select(*columns) query = query.select_from(self.db.family.join(self.db.person)) if roles is not None: query = query.where(self.db.person.c.role.in_(roles)) if person_ids is not None: query = query.where(self.db.person.c.person_id.in_(person_ids)) if family_ids is not None: query = query.where(self.db.family.c.family_id.in_(family_ids)) df = pd.read_sql(query, self.db.pheno_engine) # df.rename(columns={'sex': 'sex'}, inplace=True) return df[["person_id", "family_id", "role", "sex", "status"]]
def _build_default_filter_clause( self, measure: Measure, default_filter: str ) -> Optional[str]: if default_filter == "skip" or measure.default_filter is None: return None if default_filter == "apply": return f"value {measure.default_filter}" if default_filter == "invert": return f"NOT (value {measure.default_filter})" raise ValueError( f"bad default_filter value: {default_filter}" ) def _raw_get_measure_values_df( self, measure: Measure, person_ids: Optional[Iterable[str]] = None, family_ids: Optional[Iterable[str]] = None, roles: Optional[Iterable[Union[str, Role]]] = None, default_filter: str = "skip", ) -> pd.DataFrame: measure_type = measure.measure_type if measure_type is None: raise ValueError( f"bad measure: {measure.measure_id}; unknown value type" ) value_table = self.db.get_value_table(measure_type) columns = [ self.db.family.c.family_id, self.db.person.c.person_id, self.db.person.c.sex, self.db.person.c.status, value_table.c.value, ] query = select(*columns) query = query.select_from( value_table.join(self.db.measure) .join(self.db.person) .join(self.db.family) ) query = query.where(self.db.measure.c.measure_id == measure.measure_id) if roles is not None: query = query.where(self.db.person.c.role.in_(roles)) if person_ids is not None: query = query.where(self.db.person.c.person_id.in_(person_ids)) if family_ids is not None: query = query.where(self.db.family.c.family_id.in_(family_ids)) if measure.default_filter is not None: filter_clause = self._build_default_filter_clause( measure, default_filter ) if filter_clause is not None: query = query.where(text(filter_clause)) df = pd.read_sql(query, self.db.pheno_engine) df.rename(columns={"value": measure.measure_id}, inplace=True) return df
[docs] def get_measure_values_df( self, measure_id: str, person_ids: Optional[Iterable[str]] = None, family_ids: Optional[Iterable[str]] = None, roles: Optional[Iterable[Union[str, Role]]] = None, default_filter: str = "apply" ) -> pd.DataFrame: """Return a data frame with values for the specified `measure_id`. :param measure_id: -- a measure ID which values should be returned. :param person_ids: -- list of person IDs to filter result. Only data forindividuals with person_id in the list `person_ids` are returned. :param family_ids: -- list of family IDs to filter result. Only data for individuals that are members of any of the specified `family_ids` are returned. :param roles: -- list of roles of individuals to select measure value for. If not specified value for individuals in all roles are retuned. :param default_filter: -- one of ('`skip`', '`apply`', '`invert`'). When the measure has a `default_filter` this argument specifies whether the filter should be applied or skipped or inverted. The returned data frame contains two columns: `person_id` for individuals IDs and column named as `measure_id` values of the measure. """ assert measure_id in self.measures, measure_id measure = self.measures[measure_id] df = self._raw_get_measure_values_df( measure, person_ids=person_ids, family_ids=family_ids, roles=roles, default_filter=default_filter, ) return df[["person_id", measure_id]]
[docs] def get_values_df( self, measure_ids: Iterable[str], person_ids: Optional[Iterable[str]] = None, family_ids: Optional[Iterable[str]] = None, roles: Optional[Iterable[Union[str, Role]]] = None, default_filter: str = "apply" ) -> pd.DataFrame: """Return a data frame with values for given list of measures. Values are loaded using consecutive calls to `get_measure_values_df()` method for each measure in `measure_ids`. All data frames are joined in the end and returned. :param measure_ids: -- list of measure ids which values should be returned. :param person_ids: -- list of person IDs to filter result. Only data for individuals with person_id in the list `person_ids` are returned. :param family_ids: -- list of family IDs to filter result. Only data for individuals that are members of any of the specified `family_ids` are returned. :param roles: -- list of roles of individuals to select measure value for. If not specified value for individuals in all roles are returned. """ assert isinstance(measure_ids, list) assert len(measure_ids) >= 1 assert all(self.has_measure(m) for m in measure_ids) dfs = [ self.get_measure_values_df( m, person_ids, family_ids, roles, default_filter ) for m in measure_ids ] res_df = dfs[0] for i, df in enumerate(dfs[1:]): res_df = res_df.join( df.set_index("person_id"), on="person_id", how="outer", rsuffix=f"_val_{i}", ) return res_df
[docs] def get_people_measure_values( self, measure_ids: list[str], person_ids: Optional[list[str]] = None, family_ids: Optional[list[str]] = None, roles: Optional[list[str]] = None, ) -> Generator[dict[str, Any], None, None]: assert isinstance(measure_ids, list) assert len(measure_ids) >= 1 assert all(self.has_measure(m) for m in measure_ids), self.measures assert len(self.db.instrument_values_tables) > 0 measure_column_names = self.db.get_measure_column_names_reverse( measure_ids ) instrument_tables = {} instrument_table_columns = {} for instrument_name, table in self.db.instrument_values_tables.items(): skip_table = True for m_id in measure_ids: if m_id.startswith(instrument_name): skip_table = False if skip_table: continue instrument_tables[instrument_name] = table table_cols = [ c.label(measure_column_names[c.name]) for c in table.c if c.name in measure_column_names ] instrument_table_columns[instrument_name] = table_cols subquery_selects = [] for table in instrument_tables.values(): subquery_selects.append( select( table.c.person_id, table.c.family_id, table.c.role ).select_from(table) ) subquery = union(*subquery_selects).subquery("instruments_people") select_cols = [] for instrument_name, columns in instrument_table_columns.items(): select_cols.extend(columns) query = select( subquery.c.person_id, subquery.c.family_id, subquery.c.role, *select_cols ) query = query.select_from(subquery) for instrument_name in instrument_table_columns: table = instrument_tables[instrument_name] query = query.join( table, subquery.c.person_id == table.c.person_id, isouter=True, full=True ) if person_ids is not None: query = query.where( subquery.c.person_id.in_(person_ids) ) if family_ids is not None: query = query.where( subquery.c.family_id.in_(family_ids) ) if roles is not None: query = query.where( subquery.c.role.in_(roles) ) with self.db.pheno_engine.connect() as connection: result = connection.execute(query) for row in result: output = {**row._mapping} # pylint: 
disable=protected-access yield output
[docs] def get_regressions(self) -> dict[str, Any]: return self.db.regression_display_names_with_ids
def _get_pheno_images_base_url(self) -> Optional[str]:
    """Return the configured ``browser_images_url``, if any.

    Returns ``None`` when there is no configuration at all; otherwise
    returns whatever ``self.config.get("browser_images_url")`` gives.
    """
    if self.config is None:
        return None
    return self.config.get("browser_images_url")
def get_measures_info(self) -> dict[str, Any]:
    """Collect the summary information about the measures.

    The result holds the base URL of the browser images, the
    ``has_descriptions`` flag of the DB and the regression display names
    of the DB.
    """
    info: dict[str, Any] = {}
    info["base_image_url"] = self._get_pheno_images_base_url()
    info["has_descriptions"] = self.db.has_descriptions
    info["regression_names"] = self.db.regression_display_names
    return info
def search_measures(
    self, instrument: Optional[str], search_term: Optional[str]
) -> Generator[dict[str, Any], None, None]:
    """Yield the measures found by the DB search, prepared for output.

    Each measure record returned by ``self.db.search_measures`` is
    updated in place: a missing ``values_domain`` becomes an empty
    string, ``measure_type`` is replaced by its ``name`` and a
    ``regressions`` list is attached. NaN regression p-values are
    replaced by the string ``"NaN"``.

    Yields:
        ``{"measure": <record>}`` for every measure found.
    """
    pvalue_keys = ("pvalue_regression_male", "pvalue_regression_female")

    for found in self.db.search_measures(instrument, search_term):
        if found["values_domain"] is None:
            found["values_domain"] = ""

        measure_type = cast(MeasureType, found["measure_type"])
        found["measure_type"] = measure_type.name

        found["regressions"] = []
        regression_rows = self.db.get_regression_values(
            found["measure_id"])
        for reg_row in regression_rows or []:
            for key in pvalue_keys:
                if isnan(reg_row[key]):
                    reg_row[key] = "NaN"
            found["regressions"].append(dict(reg_row))

        yield {"measure": found}
class PhenotypeGroup(PhenotypeData):
    """Represents a group of phenotype data studies or groups.

    The group combines the families of its children and merges their
    instruments and measures. Queries for measure values are delegated to
    the children that hold the requested measures.
    """

    def __init__(
        self, pheno_id: str, phenotype_data: list[PhenotypeData],
        config: Optional[dict] = None
    ) -> None:
        super().__init__(pheno_id)
        self.phenotype_data = phenotype_data
        self.families = self._build_families()
        instruments, measures = self._merge_instruments(
            [ph.instruments for ph in self.phenotype_data])
        self._instruments.update(instruments)
        self._measures.update(measures)
        self.config = config

    def _build_families(self) -> FamiliesData:
        """Combine the families of all children into one `FamiliesData`.

        A single child is copied. The first two children are combined
        without ``forced``; every further child is combined into the
        running result with ``forced=True``. An empty list of children
        raises ``IndexError`` because the first child is always accessed.
        """
        phenos = self.phenotype_data
        logger.info(
            "building combined families from phenotype data: %s",
            [st.pheno_id for st in phenos])

        if len(phenos) == 1:
            return FamiliesData.copy(phenos[0].families)

        logger.info(
            "combining families from phenotype data %s and %s",
            phenos[0].pheno_id, phenos[1].pheno_id)
        result = FamiliesData.combine(
            phenos[0].families,
            phenos[1].families)

        # range() is empty when there are only two children.
        for sind in range(2, len(phenos)):
            logger.debug(
                "processing pheno (%s): %s",
                sind, phenos[sind].pheno_id)
            logger.info(
                "combining families from pheno (%s) %s with families "
                "from pheno %s",
                sind, [st.pheno_id for st in phenos[:sind]],
                phenos[sind].pheno_id)
            result = FamiliesData.combine(
                result,
                phenos[sind].families,
                forced=True)
        return result

    @staticmethod
    def _merge_instruments(
        phenos_instruments: Iterable[dict[str, Instrument]]
    ) -> tuple[dict[str, Instrument], dict[str, Measure]]:
        """Merge the instruments of several phenotype data objects.

        Instruments with the same name are merged into one group
        instrument. Measures are keyed by ``<instrument name>.<name>``
        both in the group instruments and in the returned measures.

        A measure that appears more than once is ambiguous and is left
        out of the result completely, no matter how many times it is
        repeated.

        Returns:
            A tuple of the merged instruments and the merged measures.
        """
        group_instruments: dict[str, Instrument] = {}
        group_measures: dict[str, Measure] = {}
        # Names already dropped as duplicated. Without this a third
        # occurrence would be added again, because the name is no longer
        # found in `group_measures`.
        duplicated: set[str] = set()

        for pheno_instruments in phenos_instruments:
            for instrument_name, instrument in pheno_instruments.items():
                if instrument_name not in group_instruments:
                    group_instrument = Instrument(
                        instrument_name
                    )
                else:
                    group_instrument = group_instruments[instrument_name]

                for name, measure in instrument.measures.items():
                    full_name = f"{instrument_name}.{name}"
                    if full_name in duplicated:
                        logger.warning(
                            "%s measure duplication! ignoring", full_name)
                        continue
                    if full_name in group_measures:
                        logger.warning(
                            "%s measure duplication! ignoring", full_name)
                        duplicated.add(full_name)
                        group_instrument.measures.pop(full_name, None)
                        del group_measures[full_name]
                        continue
                    group_instrument.measures[full_name] = measure
                    group_measures[full_name] = measure
                group_instruments[instrument_name] = group_instrument

        return group_instruments, group_measures

    def get_persons_df(
        self,
        roles: Optional[Iterable[Role]] = None,
        person_ids: Optional[Iterable[str]] = None,
        family_ids: Optional[Iterable[str]] = None
    ) -> pd.DataFrame:
        """Return the persons of the combined families as a data frame.

        The frame has the columns ``person_id``, ``family_id``, ``role``,
        ``sex`` and ``status``. Each filter that is not ``None`` keeps
        only the rows whose value is among the given ones.
        """
        ped_df: pd.DataFrame = self.families.ped_df[[
            "person_id", "family_id", "role", "sex", "status"]]
        if roles is not None:
            ped_df = ped_df[ped_df.role.isin(roles)]
        if person_ids is not None:
            ped_df = ped_df[ped_df.person_id.isin(person_ids)]
        if family_ids is not None:
            ped_df = ped_df[ped_df.family_id.isin(family_ids)]
        return ped_df

    def get_measure_values_df(
        self,
        measure_id: str,
        person_ids: Optional[Iterable[str]] = None,
        family_ids: Optional[Iterable[str]] = None,
        roles: Optional[Iterable[Role]] = None,
        default_filter: str = "apply"
    ) -> pd.DataFrame:
        """Return the values of a measure from the first child that has it.

        Raises:
            ValueError: when no child holds the measure although the
                group reports it.
        """
        assert self.has_measure(measure_id), measure_id
        for pheno in self.phenotype_data:
            if pheno.has_measure(measure_id):
                return pheno.get_measure_values_df(
                    measure_id,
                    person_ids=person_ids,
                    family_ids=family_ids,
                    roles=roles,
                    default_filter=default_filter
                )

        # We should never get here
        msg = f"measure {measure_id} not found in phenotype group " \
            f"{self.pheno_id}"
        logger.error(msg)
        raise ValueError(msg)

    def get_values_df(
        self,
        measure_ids: Iterable[str],
        person_ids: Optional[Iterable[str]] = None,
        family_ids: Optional[Iterable[str]] = None,
        roles: Optional[Iterable[Role]] = None,
        default_filter: str = "apply"
    ) -> pd.DataFrame:
        """Return a data frame with the values of the given measures.

        Every child is asked for the measures it holds. The resulting
        frames are outer joined on ``person_id``; overlapping columns of
        the joined frames get the suffix ``_val_<index>``.
        """
        # The iterables are consumed several times below (once for the
        # check and once per child), so one-shot iterables such as
        # generators have to be materialised first.
        measure_ids = list(measure_ids)
        if person_ids is not None:
            person_ids = list(person_ids)
        if family_ids is not None:
            family_ids = list(family_ids)
        if roles is not None:
            roles = list(roles)

        assert all(self.has_measure(mid) for mid in measure_ids), measure_ids

        dfs = []
        for pheno in self.phenotype_data:
            pheno_measure_ids = [
                mid for mid in measure_ids if pheno.has_measure(mid)
            ]
            if pheno_measure_ids:
                df = pheno.get_values_df(
                    pheno_measure_ids,
                    person_ids=person_ids,
                    family_ids=family_ids,
                    roles=roles,
                    default_filter=default_filter)
                dfs.append(df)

        assert len(dfs) > 0
        if len(dfs) == 1:
            return dfs[0]

        res_df = dfs[0]
        for i, df in enumerate(dfs[1:]):
            res_df = res_df.join(
                df.set_index("person_id"),
                on="person_id",
                how="outer",
                rsuffix=f"_val_{i}")

        return res_df

    def get_regressions(self) -> dict[str, Any]:
        """Return the regressions of all children merged into one dict.

        On equal keys the value of the later child wins.
        """
        res = {}
        for pheno in self.phenotype_data:
            res.update(pheno.get_regressions())
        return res

    def get_measures_info(self) -> dict[str, Any]:
        """Return the measures information merged over all children.

        ``has_descriptions`` is true when any child has descriptions and
        ``regression_names`` is the union of the children's names.
        ``base_image_url`` is always the empty string here.
        """
        result = {
            "base_image_url": "",
            "has_descriptions": False,
            "regression_names": {}
        }
        for pheno in self.phenotype_data:
            measures_info = pheno.get_measures_info()
            result["has_descriptions"] = \
                result["has_descriptions"] or measures_info["has_descriptions"]
            cast(dict, result["regression_names"]).update(
                measures_info["regression_names"]
            )
        return result

    def search_measures(
        self, instrument: Optional[str], search_term: Optional[str]
    ) -> Generator[dict[str, Any], None, None]:
        """Yield the search results of all children, one child after another."""
        generators = [
            pheno.search_measures(instrument, search_term)
            for pheno in self.phenotype_data
        ]
        measures = chain(*generators)
        yield from measures

    def get_people_measure_values(
        self,
        measure_ids: list[str],
        person_ids: Optional[list[str]] = None,
        family_ids: Optional[list[str]] = None,
        roles: Optional[list[str]] = None,
    ) -> Generator[dict[str, Any], None, None]:
        """Not supported for groups; always raises ``NotImplementedError``."""
        raise NotImplementedError()
class PhenoDb:
    """Registry of the phenotype data configured in a directory.

    The configurations found in the directory are loaded on construction
    and only the enabled ones are kept, keyed by their name. Phenotype
    data objects are built on first request and cached afterwards.
    """

    def __init__(self, pheno_data_dir: str) -> None:
        super().__init__()
        loaded = GPFConfigParser.load_directory_configs(
            pheno_data_dir, pheno_conf_schema
        )

        enabled = {}
        for conf in loaded:
            if conf.phenotype_data and conf.phenotype_data.enabled:
                enabled[conf.phenotype_data.name] = conf.phenotype_data
        self.config = enabled

        self.pheno_cache: dict[str, PhenotypeData] = {}

    def get_dbfile(self, pheno_id: str) -> str:
        """Return the ``dbfile`` entry of the given phenotype data."""
        dbfile = self.config[pheno_id]["dbfile"]
        return cast(str, dbfile)

    def get_browser_dbfile(self, pheno_id: str) -> Optional[str]:
        """Return the ``browser_dbfile`` entry or None when it is missing."""
        dbconfig = self.get_dbconfig(pheno_id)
        if "browser_dbfile" not in dbconfig:
            return None
        return cast(str, dbconfig["browser_dbfile"])

    def get_dbconfig(self, pheno_id: str) -> dict:
        """Return the configuration of the given phenotype data."""
        dbconfig = self.config[pheno_id]
        return cast(dict, dbconfig)

    def has_phenotype_data(self, pheno_id: str) -> bool:
        """Check if a phenotype data with the given ID is configured."""
        return pheno_id in self.config

    def get_phenotype_data_ids(self) -> list[Union[Any, str]]:
        """Return the IDs of all configured phenotype data."""
        return list(self.config)

    def get_phenotype_data(self, pheno_id: str) -> PhenotypeData:
        """Construct and return a phenotype data with the specified ID.

        Raises:
            ValueError: when there is no configuration for the ID.
        """
        if not self.has_phenotype_data(pheno_id):
            raise ValueError(f"phenotype data <{pheno_id}> not found")

        if pheno_id not in self.pheno_cache:
            self.pheno_cache[pheno_id] = self._load_phenotype_data(pheno_id)
        return self.pheno_cache[pheno_id]

    def _load_phenotype_data(self, pheno_id: str) -> PhenotypeData:
        """Build a phenotype group or study from its configuration.

        A configuration with a ``phenotype_data_list`` produces a group
        of the listed phenotype data; any other one produces a study.
        """
        dbconfig = self.get_dbconfig(pheno_id)

        if dbconfig.get("phenotype_data_list") is None:
            logger.info("loading pheno db <%s>", pheno_id)
            return PhenotypeStudy(
                pheno_id,
                dbfile=self.get_dbfile(pheno_id),
                browser_dbfile=self.get_browser_dbfile(pheno_id),
                config=dbconfig
            )

        logger.info("loading pheno db group <%s>", pheno_id)
        members = []
        for member_id in dbconfig["phenotype_data_list"]:
            members.append(self.get_phenotype_data(member_id))
        return PhenotypeGroup(pheno_id, members, dbconfig)

    def get_all_phenotype_data(self) -> list[PhenotypeData]:
        """Return the phenotype data objects for all configured IDs."""
        all_ids = self.get_phenotype_data_ids()
        return list(map(self.get_phenotype_data, all_ids))

    def get_phenotype_data_config(
        self, pheno_id: str
    ) -> Optional[dict[str, Any]]:
        """Return the configuration for the ID or None when not found."""
        return self.config.get(pheno_id)