Source code for dae.variants_loaders.dae.loader

# pylint: disable=too-many-lines
# FIXME refactor and shorten this module
import argparse
import gzip
import logging
import os
import warnings
from collections.abc import Generator
from contextlib import closing
from typing import Any, Optional, Union

import fsspec
import numpy as np
import pandas as pd
import pysam

from dae.genomic_resources.reference_genome import ReferenceGenome
from dae.pedigrees.families_data import FamiliesData
from dae.pedigrees.family import Family
from dae.utils import fs_utils
from dae.utils.dae_utils import dae2vcf_variant
from dae.utils.helpers import str2bool
from dae.utils.regions import Region
from dae.utils.variant_utils import (
    GenotypeType,
    get_locus_ploidy,
    str2gt,
    str2mat,
    str2mat_adjust_colsep,
)
from dae.variants.attributes import Inheritance, Role
from dae.variants.core import Allele
from dae.variants.family_variant import FamilyVariant
from dae.variants.variant import (
    SummaryVariant,
    SummaryVariantFactory,
    allele_type_from_cshl_variant,
)
from dae.variants_loaders.raw.loader import (
    CLIArgument,
    FamiliesGenotypes,
    FamilyGenotypeIterator,
    FullVariantsIterator,
    TransmissionType,
    VariantsGenotypesLoader,
)

logger = logging.getLogger(__name__)


[docs]class DenovoFamiliesGenotypes(FamiliesGenotypes): """Tuple of family, genotype, and best_state""" def __init__( self, family: Family, gt: np.ndarray, best_state: Optional[np.ndarray] = None) -> None: super().__init__() self.family = family self.genotype = gt self.best_state = best_state
[docs] def family_genotype_iterator( self, ) -> Generator[tuple[Family, np.ndarray, Optional[np.ndarray]], None, None]: yield self.family, self.genotype, self.best_state
[docs]class DenovoLoader(VariantsGenotypesLoader): """Load denovo variants.""" def __init__( self, families: FamiliesData, denovo_filename: str, genome: ReferenceGenome, regions: Optional[list[str]] = None, params: Optional[dict[str, Any]] = None, sort: bool = True) -> None: super().__init__( families=families, filenames=[denovo_filename], transmission_type=TransmissionType.denovo, genome=genome, regions=regions, expect_genotype=False, expect_best_state=False, params=params if params else {}) self.genome = genome self.set_attribute("source_type", "denovo") logger.debug( "loading denovo variants: %s; %s", denovo_filename, self.params) self.denovo_df, extra_attributes = self._flexible_denovo_load_internal( denovo_filename, genome, families=families, **self.params, ) self.set_attribute("extra_attributes", extra_attributes) self._init_chromosomes() if np.all(pd.isnull(self.denovo_df["genotype"])): self.expect_best_state = True elif np.all(pd.isnull(self.denovo_df["best_state"])): self.expect_genotype = True else: assert False if sort: self.denovo_df = self.denovo_df.sort_values( by=["chrom", "position", "reference", "alternative"]) def _init_chromosomes(self) -> None: self._chromosomes = list(self.denovo_df.chrom.unique()) self._chromosomes = [ self._adjust_chrom_prefix(chrom) for chrom in self._chromosomes ] all_chromosomes = self.genome.chromosomes if all(chrom in set(all_chromosomes) for chrom in self._chromosomes): self._chromosomes = sorted( self._chromosomes, key=all_chromosomes.index) @property def chromosomes(self) -> list[str]: return self._chromosomes
[docs] def reset_regions(self, regions: Optional[Union[str, list[str]]]) -> None: super().reset_regions(regions) result: list[Optional[Region]] = [] for reg in self.regions: if reg is None: result.append(reg) else: result.append(Region.from_str(reg)) self.regions = result # type: ignore logger.debug("denovo reset regions: %s", self.regions)
def _is_in_regions(self, summary_variant: SummaryVariant) -> bool: isin = [ r.isin( # type: ignore self._adjust_chrom_prefix(summary_variant.chrom), summary_variant.position, ) if r is not None else True for r in self.regions ] return any(isin) def _produce_family_variants( self, svariant: SummaryVariant, values: pd.Series, ) -> list[FamilyVariant]: fvs = [] extra_attributes_keys = list(filter( lambda x: x not in ["best_state", "family_id", "genotype"], values.keys(), )) for f_idx, family_id in enumerate( values.get("family_id")): # type: ignore best_state = values.get("best_state")[f_idx] # type: ignore genotypes = values.get("genotype")[f_idx] # type: ignore family = self.families.get(family_id) if family is None: continue family_genotypes = DenovoFamiliesGenotypes( family, genotypes, best_state) # type: ignore for fam, genotype, best_state in \ family_genotypes.family_genotype_iterator(): fvariant = FamilyVariant(svariant, fam, genotype, best_state) extra_attributes = {} for attr in extra_attributes_keys: attr_val = values.get(attr)[f_idx] # type: ignore extra_attributes[attr] = [attr_val] if genotype is None: fvariant.gt, fvariant._genetic_model = \ self._calc_genotype( # type: ignore fvariant, self.genome) for fallele in fvariant.alleles: fallele.gt = fvariant.gt # pylint: disable=protected-access fallele._genetic_model = fvariant.genetic_model fvariant.update_attributes(extra_attributes) fvs.append(fvariant) return fvs def _full_variants_iterator_impl(self) -> FullVariantsIterator: group = self.denovo_df.groupby( ["chrom", "position", "reference", "alternative"], sort=False).agg(list) for num_idx, (idx, values) in enumerate(group.iterrows()): chrom, position, reference, alternative = idx # type: ignore position = int(position) summary_records = [] for alt_index, alt in enumerate(alternative.split(",")): summary_records.append({ "chrom": chrom, "reference": reference, "alternative": alt, "position": position, "summary_variant_index": num_idx, "allele_index": alt_index + 1, "af_parents_called_count": None, "af_parents_called_percent": None, "af_allele_count": None, "af_allele_freq": None, "af_ref_allele_count": None, "af_ref_allele_freq": None, }) svariant = SummaryVariantFactory.summary_variant_from_records( summary_records, self.transmission_type, ) if not self._is_in_regions(svariant): continue fvs = self._produce_family_variants(svariant, values) yield svariant, fvs
[docs] def full_variants_iterator(self) -> Generator[ tuple[SummaryVariant, list[FamilyVariant]], None, None]: full_iterator = super().full_variants_iterator() for summary_variants, family_variants in full_iterator: for fvariant in family_variants: for fallele in fvariant.family_alt_alleles: inheritance = [ Inheritance.denovo if vinmem is not None and mem.role in set([ Role.prb, Role.sib, Role.unknown]) and inh in set([ Inheritance.unknown, Inheritance.possible_denovo, Inheritance.possible_omission]) else inh for inh, vinmem, mem in zip( fallele.inheritance_in_members, fallele.variant_in_members, fallele.members_in_order, ) ] # pylint: disable=protected-access fallele._inheritance_in_members = inheritance yield summary_variants, family_variants
[docs] def close(self) -> None: pass
[docs] @staticmethod def split_location(location: str) -> tuple[str, int]: chrom, position = location.split(":") return chrom, int(position)
[docs] @staticmethod def produce_genotype( chrom: str, pos: int, genome: ReferenceGenome, family: Family, members_with_variant: list[str]) -> np.ndarray: """Produce genotype.""" # TODO: Add support for multiallelic variants # This method currently assumes biallelic variants genotype = np.zeros(shape=(2, len(family)), dtype=GenotypeType) for person in family.members_in_order: index = family.members_index([person.person_id]) has_variant = int(person.person_id in members_with_variant) ploidy = get_locus_ploidy(chrom, pos, person.sex, genome) if ploidy == 2: genotype[1, index] = has_variant else: genotype[0, index] = has_variant genotype[1, index] = -2 # signifies haploid genotype return genotype
@classmethod def _arguments(cls) -> list[CLIArgument]: arguments = super()._arguments() arguments.append(CLIArgument( "denovo_file", value_type=str, metavar="<variants filename>", help_text="DAE denovo variants file", )) arguments.append(CLIArgument( "--denovo-variant", help_text="The label or index of the" " column containing the CSHL-style" " representation of the variant." "[Default: variant]", )) arguments.append(CLIArgument( "--denovo-ref", help_text="The label or index of the" " column containing the reference" " allele for the variant. [Default: none]", )) arguments.append(CLIArgument( "--denovo-alt", help_text="The label or index of the " " column containing the alternative" " allele for the variant. [Default: none]", )) arguments.append(CLIArgument( "--denovo-location", # default_value="location", help_text="The label or index of the" " column containing the CSHL-style" " location of the variant. [Default: location]", )) arguments.append(CLIArgument( "--denovo-chrom", help_text="The label or index of the" " column containing the chromosome" " upon which the variant is located. [Default: none]", )) arguments.append(CLIArgument( "--denovo-pos", help_text="The label or index of the" " column containing the position" " upon which the variant is located. [Default: none]", )) arguments.append(CLIArgument( "--denovo-family-id", # default_value="familyId", help_text="The label or index of the column containing the" " family's ID. [Default: familyId]", )) arguments.append(CLIArgument( "--denovo-best-state", # default_value="bestState", help_text="The label or index of the" " column containing the best state" " for the family. [Default: bestState]", )) arguments.append(CLIArgument( "--denovo-genotype", help_text="The label or index of the" " column containing the family genotype." " [Default: genotype]", )) arguments.append(CLIArgument( "--denovo-person-id", help_text="The label or index of the column containing the" " person's ID. [Default: none]", )) arguments.append(CLIArgument( "--denovo-sep", value_type=str, default_value="\t", help_text="Denovo file field separator [default: `\\t`]", )) return arguments # flake8: noqa: C901
[docs] @classmethod def parse_cli_arguments( cls, argv: argparse.Namespace, use_defaults: bool = False, ) -> tuple[list[str], dict[str, Any]]: # pylint: disable=too-many-branches logger.debug("CLI arguments: %s", argv) if argv.denovo_location and (argv.denovo_chrom or argv.denovo_pos): logger.error( "--denovo-location and (--denovo-chrom, --denovo-pos) " "are mutually exclusive", ) raise ValueError if argv.denovo_variant and (argv.denovo_ref or argv.denovo_alt): logger.error( "--denovo-variant and (denovo-ref, denovo-alt) " "are mutually exclusive", ) raise ValueError if argv.denovo_person_id and ( argv.denovo_family_id or argv.denovo_best_state): logger.error( "--denovo-person-id and (denovo-family-id, denovo-best-state) " "are mutually exclusive", ) raise ValueError if not (argv.denovo_location or (argv.denovo_chrom and argv.denovo_pos)): argv.denovo_location = "location" if not (argv.denovo_variant or (argv.denovo_ref and argv.denovo_alt)): argv.denovo_variant = "variant" if not (argv.denovo_person_id or (argv.denovo_family_id and ( argv.denovo_best_state or argv.denovo_genotype))): argv.denovo_family_id = "familyId" argv.denovo_best_state = "bestState" if not argv.denovo_location: if not argv.denovo_chrom: argv.denovo_chrom = "CHROM" if not argv.denovo_pos: argv.denovo_pos = "POS" if not argv.denovo_variant: if not argv.denovo_ref: argv.denovo_ref = "REF" if not argv.denovo_alt: argv.denovo_alt = "ALT" if not argv.denovo_person_id: if not argv.denovo_family_id: argv.denovo_family_id = "familyId" if not argv.denovo_best_state and not argv.denovo_genotype: argv.denovo_best_state = "bestState" assert not (argv.denovo_best_state and argv.denovo_genotype) if argv.denovo_sep is None: argv.denovo_sep = "\t" params = { "denovo_location": argv.denovo_location, "denovo_variant": argv.denovo_variant, "denovo_chrom": argv.denovo_chrom, "denovo_pos": argv.denovo_pos, "denovo_ref": argv.denovo_ref, "denovo_alt": argv.denovo_alt, "denovo_person_id": argv.denovo_person_id, "denovo_family_id": argv.denovo_family_id, "denovo_best_state": argv.denovo_best_state, "denovo_genotype": argv.denovo_genotype, "add_chrom_prefix": argv.add_chrom_prefix, "del_chrom_prefix": argv.del_chrom_prefix, "denovo_sep": argv.denovo_sep, } return argv.denovo_file, params
def _flexible_denovo_load_internal( self, filepath: str, genome: ReferenceGenome, families: FamiliesData, denovo_location: Optional[str] = None, denovo_variant: Optional[str] = None, denovo_chrom: Optional[str] = None, denovo_pos: Optional[str] = None, denovo_ref: Optional[str] = None, denovo_alt: Optional[str] = None, denovo_person_id: Optional[str] = None, denovo_family_id: Optional[str] = None, denovo_best_state: Optional[str] = None, denovo_genotype: Optional[str] = None, denovo_sep: str = "\t", **_kwargs: Any) -> tuple[pd.DataFrame, Any]: # FIXME # pylint: disable=too-many-arguments,too-many-branches # pylint: disable=too-many-statements,too-many-locals assert families is not None assert isinstance( families, FamiliesData, ), "families must be an instance of FamiliesData!" assert genome, "You must provide a genome object!" if not (denovo_location or (denovo_chrom and denovo_pos)): denovo_location = "location" if not (denovo_variant or (denovo_ref and denovo_alt)): denovo_variant = "variant" if not (denovo_person_id or (denovo_family_id and (denovo_best_state or denovo_genotype))): denovo_family_id = "familyId" denovo_best_state = "bestState" if denovo_sep is None: denovo_sep = "\t" with warnings.catch_warnings(record=True) as _: warnings.filterwarnings( "ignore", category=pd.errors.ParserWarning, message="Both a converter and dtype were specified", ) raw_df = pd.read_csv( filepath, sep=denovo_sep, converters={ denovo_pos: lambda p: int(p) if p else None, # type: ignore } if denovo_pos is not None else {}, dtype=str, comment="#", encoding="utf-8", na_filter=False) if denovo_location: chrom_col, pos_col = zip( *map(self.split_location, raw_df[denovo_location]), ) else: assert denovo_chrom is not None assert denovo_pos is not None chrom_col = raw_df.loc[:, denovo_chrom] # type: ignore pos_col = raw_df.loc[:, denovo_pos] # type: ignore if denovo_variant: variant_col = raw_df.loc[:, denovo_variant] ref_alt_tuples = [ dae2vcf_variant( self._adjust_chrom_prefix(variant_tuple[0]), variant_tuple[1], variant_tuple[2], genome, ) for variant_tuple in zip(chrom_col, pos_col, variant_col) ] pos_col, ref_col, alt_col = zip(*ref_alt_tuples) else: assert denovo_ref is not None assert denovo_alt is not None ref_col = raw_df.loc[:, denovo_ref] # type: ignore alt_col = raw_df.loc[:, denovo_alt] # type: ignore extra_attributes_cols = raw_df.columns.difference([ denovo_location, denovo_variant, denovo_chrom, denovo_pos, denovo_ref, denovo_alt, denovo_person_id, denovo_family_id, denovo_best_state, denovo_genotype, ]) if denovo_person_id: if denovo_family_id in raw_df.columns: temp_df = pd.DataFrame( { "chrom": chrom_col, "pos": pos_col, "ref": ref_col, "alt": alt_col, "family_id": raw_df.loc[:, denovo_family_id], "person_id": raw_df.loc[:, denovo_person_id], }, ) grouped = temp_df.groupby( ["chrom", "pos", "ref", "alt", "family_id"]) else: temp_df = pd.DataFrame( { "chrom": chrom_col, "pos": pos_col, "ref": ref_col, "alt": alt_col, "person_id": raw_df.loc[:, denovo_person_id], }, ) grouped = temp_df.groupby(["chrom", "pos", "ref", "alt"]) result = [] # TODO Implement support for multiallelic variants for variant, variants_indices in grouped.groups.items(): # Here we join and then split again by ',' to handle cases # where the person IDs are actually a list of IDs, separated # by a ',' person_ids = ",".join( temp_df.iloc[ variants_indices].loc[:, "person_id"], # type: ignore ).replace(";", ",").split(",") if "family_id" in temp_df.columns: family_id = variant[4] # type: ignore variant_families = { families.persons[(family_id, pid)].family_id for pid in person_ids} else: variant_families = set() for person_id in person_ids: if len(families.persons_by_person_id[person_id]) == 0: continue if len(families.persons_by_person_id[person_id]) == 1: variant_families.add( families.persons_by_person_id[person_id][0].family_id, ) else: logger.warning( "person %s is in multiple families: %s", person_id, [ f.family_id for f in families.persons_by_person_id[ person_id ] ], ) raise ValueError( f"person {person_id} in multiple families") # TODO Implement support for multiallelic variants for family_id in variant_families: family = families[family_id] family_dict = { "chrom": variant[0], # type: ignore "position": variant[1], # type: ignore "reference": variant[2], # type: ignore "alternative": variant[3], # type: ignore "family_id": family_id, "genotype": self.produce_genotype( variant[0], # type: ignore variant[1], # type: ignore genome, family, person_ids, ), "best_state": None, } record = raw_df.loc[variants_indices[0]] # type: ignore extra_attributes = record[extra_attributes_cols].to_dict() result.append({**family_dict, **extra_attributes}) denovo_df = pd.DataFrame(result) else: family_col = raw_df.loc[:, denovo_family_id] if denovo_best_state: best_state_col = list( map( str2mat_adjust_colsep, raw_df[denovo_best_state], ), ) # genotype_col = list(map(best2gt, best_state_col)) denovo_df = pd.DataFrame( { "chrom": chrom_col, "position": pos_col, "reference": ref_col, "alternative": alt_col, "family_id": family_col, "genotype": None, "best_state": best_state_col, }, ) else: assert denovo_genotype genotype_col = list( map( str2gt, raw_df[denovo_genotype], ), ) # genotype_col = list(map(best2gt, best_state_col)) denovo_df = pd.DataFrame( { "chrom": chrom_col, "position": pos_col, "reference": ref_col, "alternative": alt_col, "family_id": family_col, "genotype": genotype_col, "best_state": None, }, ) extra_attributes_df = raw_df[extra_attributes_cols] denovo_df = denovo_df.join(extra_attributes_df) return (denovo_df, extra_attributes_cols.tolist())
[docs] def flexible_denovo_load( self, filepath: str, genome: ReferenceGenome, families: FamiliesData, denovo_location: Optional[str] = None, denovo_variant: Optional[str] = None, denovo_chrom: Optional[str] = None, denovo_pos: Optional[str] = None, denovo_ref: Optional[str] = None, denovo_alt: Optional[str] = None, denovo_person_id: Optional[str] = None, denovo_family_id: Optional[str] = None, denovo_best_state: Optional[str] = None, denovo_genotype: Optional[str] = None, denovo_sep: str = "\t", **kwargs: Any) -> pd.DataFrame: """Load de Novo variants from a file. Read a text file containing variants in the form of delimiter-separted values and produce a dataframe. The text file may use different names for the columns containing the relevant data - these are specified with the provided parameters. :param str filepath: The path to the DSV file to read. :param genome: A reference genome object. :param str denovo_location: The label or index of the column containing the CSHL-style location of the variant. :param str denovo_variant: The label or index of the column containing the CSHL-style representation of the variant. :param str denovo_chrom: The label or index of the column containing the chromosome upon which the variant is located. :param str denovo_pos: The label or index of the column containing the position upon which the variant is located. :param str denovo_ref: The label or index of the column containing the variant's reference allele. :param str denovo_alt: The label or index of the column containing the variant's alternative allele. :param str denovo_person_id: The label or index of the column containing either a singular person ID or a comma-separated list of person IDs. :param str denovo_family_id: The label or index of the column containing a singular family ID. :param str denovo_best_state: The label or index of the column containing the best state for the variant. :param str families: An instance of the FamiliesData class for the pedigree of the relevant study. :type genome: An instance of Genome. :return: Dataframe containing the variants, with the following header - 'chrom', 'position', 'reference', 'alternative', 'family_id', 'genotype'. :rtype: An instance of Pandas' DataFrame class. """ # FIXME too many arguments # pylint: disable=too-many-arguments denovo_df, _ = self._flexible_denovo_load_internal( filepath, genome, families, denovo_location=denovo_location, denovo_variant=denovo_variant, denovo_chrom=denovo_chrom, denovo_pos=denovo_pos, denovo_ref=denovo_ref, denovo_alt=denovo_alt, denovo_person_id=denovo_person_id, denovo_family_id=denovo_family_id, denovo_best_state=denovo_best_state, denovo_genotype=denovo_genotype, denovo_sep=denovo_sep, **kwargs, ) return denovo_df
[docs]class DaeTransmittedFamiliesGenotypes(FamiliesGenotypes): """Tuple of families and family data""" def __init__( self, families: FamiliesData, family_data: dict[str, tuple[np.ndarray, np.ndarray]], ) -> None: super().__init__() self.families = families self.family_data = family_data
[docs] def get_family_read_counts(self, family: Family) -> Optional[np.ndarray]: fdata = self.family_data.get(family.family_id, None) if fdata is None: return None return fdata[1]
[docs] def family_genotype_iterator( self, ) -> FamilyGenotypeIterator: for family_id, (best_state, read_counts) in self.family_data.items(): fam = self.families.get(family_id) if fam is None: continue assert best_state is not None, (family_id, best_state, read_counts) yield fam, best_state, read_counts
[docs]class DaeTransmittedLoader(VariantsGenotypesLoader): """Loader for dae variants.""" def __init__( self, families: FamiliesData, summary_filename: str, genome: ReferenceGenome, regions: None = None, params: Optional[dict[str, Any]] = None, **_kwargs: Any, ) -> None: toomany_filename = DaeTransmittedLoader._build_toomany_filename( summary_filename, ) params = params if params else {} super().__init__( families=families, filenames=[summary_filename, toomany_filename], transmission_type=TransmissionType.transmitted, genome=genome, regions=regions, expect_genotype=False, expect_best_state=True, params=params, ) self.summary_filename = summary_filename self.toomany_filename = toomany_filename self.set_attribute("source_type", "dae") self.genome = genome self.params = params self.include_reference = self.params.get( "dae_include_reference_genotypes", False, ) try: # pylint: disable=no-member with pysam.Tabixfile(self.summary_filename) as tbx: self._chromosomes = \ [self._adjust_chrom_prefix(chrom) for chrom in tbx.contigs] except Exception: # pylint: disable=broad-except self._chromosomes = self.genome.chromosomes @property def chromosomes(self) -> list[str]: return self._chromosomes @property def variants_filenames(self) -> list[str]: return [self.summary_filename] @staticmethod def _build_toomany_filename(summary_filename: str) -> str: assert fs_utils.exists( f"{summary_filename}.tbi", ), f"summary filename tabix index missing {summary_filename}" dirname = os.path.dirname(summary_filename) basename = os.path.basename(summary_filename) if basename.endswith(".txt.bgz"): result = os.path.join(dirname, f"{basename[:-8]}-TOOMANY.txt.bgz") elif basename.endswith(".txt.gz"): result = os.path.join(dirname, f"{basename[:-7]}-TOOMANY.txt.gz") else: assert False, ( f"Bad summary filename {summary_filename}: " f"unexpected extention" ) assert fs_utils.exists(result), f"missing TOOMANY file {result}" assert fs_utils.exists( f"{result}.tbi", ), f"missing tabix index for TOOMANY file {result}" return result @staticmethod def _rename_columns(columns: list[str]) -> list[str]: if "#chr" in columns: columns[columns.index("#chr")] = "chrom" if "chr" in columns: columns[columns.index("chr")] = "chrom" if "position" in columns: columns[columns.index("position")] = "cshl_position" if "variant" in columns: columns[columns.index("variant")] = "cshl_variant" return columns @staticmethod def _load_column_names(filename: str) -> list[str]: with fsspec.open(filename) as rawfile: with gzip.open(rawfile) as infile: column_names = ( infile.readline().decode("utf-8").strip().split("\t") ) return column_names @classmethod def _load_toomany_columns(cls, toomany_filename: str) -> list[str]: toomany_columns = cls._load_column_names(toomany_filename) return cls._rename_columns(toomany_columns) @classmethod def _load_summary_columns(cls, summary_filename: str) -> list[str]: summary_columns = cls._load_column_names(summary_filename) return cls._rename_columns(summary_columns) def _summary_variant_from_dae_record( self, summary_index: int, rec: dict[str, Any], ) -> SummaryVariant: rec["cshl_position"] = int(rec["cshl_position"]) chrom = rec["chrom"] position, reference, alternative = dae2vcf_variant( self._adjust_chrom_prefix(chrom), rec["cshl_position"], rec["cshl_variant"], self.genome, ) allele = Allele.build_vcf_allele( chrom, position, reference, alternative) rec["chrom"] = allele.chrom rec["position"] = allele.position rec["end_position"] = allele.end_position rec["reference"] = allele.reference rec["alternative"] = allele.alternative rec["all.nParCalled"] = int(rec["all.nParCalled"]) rec["all.nAltAlls"] = int(rec["all.nAltAlls"]) rec["all.prcntParCalled"] = float(rec["all.prcntParCalled"]) rec["all.altFreq"] = float(rec["all.altFreq"]) rec["summary_variant_index"] = summary_index parents_called = int(rec.get("all.nParCalled", 0)) ref_allele_count = 2 * int(rec.get("all.nParCalled", 0)) - int( rec.get("all.nAltAlls", 0), ) ref_allele_prcnt = 0.0 if parents_called > 0: ref_allele_prcnt = ref_allele_count / 2.0 / parents_called ref = { "chrom": rec["chrom"], "position": rec["position"], "end_position": rec["position"], "reference": rec["reference"], "alternative": None, "variant_type": None, "cshl_position": rec["cshl_position"], "cshl_variant": rec["cshl_variant"], "summary_variant_index": rec["summary_variant_index"], "allele_index": 0, "af_parents_called_count": parents_called, "af_parents_called_percent": float( rec.get("all.prcntParCalled", 0.0), ), "af_allele_count": ref_allele_count, "af_allele_freq": ref_allele_prcnt, "hw": rec.get("HW"), } alt = { "chrom": rec["chrom"], "position": rec["position"], "end_position": rec["end_position"], "reference": rec["reference"], "alternative": rec["alternative"], "variant_type": allele_type_from_cshl_variant(rec["cshl_variant"]), "cshl_position": rec["cshl_position"], "cshl_variant": rec["cshl_variant"], "summary_variant_index": rec["summary_variant_index"], "allele_index": 1, "af_parents_called_count": int(rec.get("all.nParCalled", 0)), "af_parents_called_percent": float( rec.get("all.prcntParCalled", 0.0), ), "af_allele_count": int(rec.get("all.nAltAlls", 0)), "af_allele_freq": float(rec.get("all.altFreq", 0.0)), "hw": rec.get("HW"), } summary_variant = SummaryVariantFactory.summary_variant_from_records( [ref, alt], transmission_type=self.transmission_type, ) return summary_variant @staticmethod def _explode_family_data( family_data: str, ) -> dict[str, tuple[np.ndarray, np.ndarray]]: best_states = { fid: ( str2mat(bs, col_sep="", row_sep="/"), str2mat(rc, col_sep=" ", row_sep="/", dtype=np.int16), ) for (fid, bs, rc) in [ fg.split(":") for fg in family_data.split(";") ] } return best_states # @staticmethod # def _explode_family_read_counts(family_data): # read_counts = { # fid: str2mat(rc, col_sep=" ", row_sep="/") # for (fid, _bs, rc) in [ # fg.split(":") for fg in family_data.split(";") # ] # } # return read_counts
[docs] def close(self) -> None: pass
def _produce_family_variants( self, summary_variant: SummaryVariant, families_genotypes: DaeTransmittedFamiliesGenotypes, ) -> list[FamilyVariant]: family_variants = [] for (fam, best_state, read_counts) in \ families_genotypes.family_genotype_iterator(): fvariant = FamilyVariant( summary_variant, fam, None, best_state) fvariant.gt, fvariant._genetic_model = \ self._calc_genotype( fvariant, self.genome) for fallele in fvariant.alleles: fallele.gt = fvariant.gt # type: ignore # pylint: disable=protected-access fallele._genetic_model = fvariant._genetic_model # type: ignore fallele.update_attributes({"read_counts": read_counts}) family_variants.append(fvariant) return family_variants def _full_variants_iterator_impl(self) -> FullVariantsIterator: summary_columns = self._load_summary_columns(self.summary_filename) toomany_columns = self._load_toomany_columns(self.toomany_filename) summary_index = 0 for region in self.regions: try: # using a context manager because of # https://stackoverflow.com/a/25968716/2316754 # pylint: disable=no-member with closing(pysam.Tabixfile(self.summary_filename)) \ as sum_tbf, \ closing(pysam.Tabixfile(self.toomany_filename)) \ as too_tbf: region_unadjusted = ( self._unadjust_chrom_prefix(region) if region is not None else None ) summary_iterator = sum_tbf.fetch( region=region_unadjusted, parser=pysam.asTuple(), ) toomany_iterator = too_tbf.fetch( region=region_unadjusted, parser=pysam.asTuple(), ) for summary_line in summary_iterator: rec = dict(zip(summary_columns, summary_line)) try: summary_variant = \ self._summary_variant_from_dae_record( summary_index, rec) family_data = rec["familyData"] if family_data == "TOOMANY": toomany_line = next(toomany_iterator, None) if toomany_line is None: return toomany_rec = dict(zip( toomany_columns, toomany_line)) family_data = toomany_rec["familyData"] assert rec["cshl_position"] == int( toomany_rec["cshl_position"], ) family_data = self._explode_family_data( family_data) families_genotypes = \ DaeTransmittedFamiliesGenotypes( self.families, family_data) family_variants = self._produce_family_variants( summary_variant, families_genotypes) yield summary_variant, family_variants summary_index += 1 except Exception: # pylint: disable=broad-except logger.error( "unable to process summary line: %s " "from %s: %s", summary_line, self.summary_filename, self.regions, exc_info=True) except ValueError: logger.warning( "could not find region %s in %s or %s", region, self.summary_filename, self.toomany_filename, exc_info=True) @classmethod def _arguments(cls) -> list[CLIArgument]: arguments = super()._arguments() arguments.append(CLIArgument( "dae_summary_file", value_type=str, metavar="<summary filename>", help_text="summary variants file to import", )) arguments.append(CLIArgument( "--dae-include-reference-genotypes", default_value=False, help_text="fill in reference only variants [default: %(default)s]", action="store_true", )) return arguments
[docs] @classmethod def parse_cli_arguments( cls, argv: argparse.Namespace, use_defaults: bool = False, ) -> tuple[list[str], dict[str, Any]]: filename = argv.dae_summary_file params = { "dae_include_reference_genotypes": str2bool( argv.dae_include_reference_genotypes, ), "add_chrom_prefix": argv.add_chrom_prefix, "del_chrom_prefix": argv.del_chrom_prefix, } return filename, params