Source code for dae.backends.dae.loader

# pylint: disable=too-many-lines
# FIXME refactor and shorten this module
import os
import gzip
import warnings
import logging
from typing import List, Optional, Dict, Any, Tuple
from contextlib import closing

import numpy as np

import pysam  # type: ignore
import pandas as pd

import fsspec  # type: ignore
from dae.utils.regions import Region
from dae.utils import fs_utils

from dae.genomic_resources.reference_genome import ReferenceGenome
from dae.utils.variant_utils import str2mat, GENOTYPE_TYPE, str2gt
from dae.utils.helpers import str2bool

from dae.utils.dae_utils import dae2vcf_variant

from dae.pedigrees.family import Family, FamiliesData
from dae.variants.attributes import Inheritance, Role

from dae.variants.variant import SummaryVariantFactory, \
    allele_type_from_cshl_variant
from dae.variants.family_variant import FamilyVariant

from dae.backends.raw.loader import (
    VariantsGenotypesLoader,
    TransmissionType,
    FamiliesGenotypes,
    CLIArgument
)

from dae.utils.variant_utils import get_locus_ploidy


logger = logging.getLogger(__name__)


[docs]class DenovoFamiliesGenotypes(FamiliesGenotypes): """Tuple of family, genotype, and best_state""" def __init__(self, family, gt, best_state=None): super().__init__() self.family = family self.genotype = gt self.best_state = best_state
[docs] def full_families_genotypes(self): raise NotImplementedError()
[docs] def get_family_genotype(self, family): assert family.family_id == self.family.family_id return self.genotype
[docs] def get_family_best_state(self, family): assert family.family_id == self.family.family_id return self.best_state
[docs] def family_genotype_iterator(self): yield self.family, self.genotype, self.best_state
[docs]class DenovoLoader(VariantsGenotypesLoader): """Load denovo variants.""" def __init__( self, families: FamiliesData, denovo_filename: str, genome: ReferenceGenome, regions: List[str] = None, params: Optional[Dict[str, Any]] = None, sort: bool = True): super().__init__( families=families, filenames=[denovo_filename], transmission_type=TransmissionType.denovo, genome=genome, regions=regions, expect_genotype=False, expect_best_state=False, params=params if params else {}) self.genome = genome self.set_attribute("source_type", "denovo") logger.debug( "loading denovo variants: %s; %s", denovo_filename, self.params) self.denovo_df, extra_attributes = self._flexible_denovo_load_internal( denovo_filename, genome, families=families, adjust_chrom_prefix=self._adjust_chrom_prefix, **self.params, ) self.set_attribute("extra_attributes", extra_attributes) self._init_chromosomes() if np.all(pd.isnull(self.denovo_df["genotype"])): self.expect_best_state = True elif np.all(pd.isnull(self.denovo_df["best_state"])): self.expect_genotype = True else: assert False if sort: self.denovo_df = self.denovo_df.sort_values( by=["chrom", "position", "reference", "alternative"]) def _init_chromosomes(self): self.chromosomes = list(self.denovo_df.chrom.unique()) self.chromosomes = [ self._adjust_chrom_prefix(chrom) for chrom in self.chromosomes ] all_chromosomes = self.genome.chromosomes if all(chrom in set(all_chromosomes) for chrom in self.chromosomes): self.chromosomes = sorted( self.chromosomes, key=all_chromosomes.index)
[docs] def reset_regions(self, regions): super().reset_regions(regions) result = [] for reg in self.regions: if reg is None: result.append(reg) else: result.append(Region.from_str(reg)) self.regions = result logger.debug("denovo reset regions: %s", self.regions)
def _is_in_regions(self, summary_variant): isin = [ r.isin(summary_variant.chrom, summary_variant.position) if r is not None else True for r in self.regions ] return any(isin) def _produce_family_variants(self, svariant, values): fvs = [] extra_attributes_keys = list(filter( lambda x: x not in ["best_state", "family_id", "genotype"], values.keys() )) for f_idx, family_id in enumerate(values.get("family_id")): best_state = values.get("best_state")[f_idx] genotypes = values.get("genotype")[f_idx] family = self.families.get(family_id) if family is None: continue family_genotypes = DenovoFamiliesGenotypes( family, genotypes, best_state) for fam, genotype, best_state in \ family_genotypes.family_genotype_iterator(): fvariant = FamilyVariant(svariant, fam, genotype, best_state) extra_attributes = {} for attr in extra_attributes_keys: attr_val = values.get(attr)[f_idx] extra_attributes[attr] = [attr_val] if genotype is None: fvariant.gt, fvariant._genetic_model = self._calc_genotype( fvariant, self.genome) for fallele in fvariant.alleles: fallele.gt = fvariant.gt # pylint: disable=protected-access fallele._genetic_model = fvariant.genetic_model fvariant.update_attributes(extra_attributes) fvs.append(fvariant) return fvs def _full_variants_iterator_impl(self): for region in self.regions: if region is None: continue region.chrom = self._adjust_chrom_prefix(region.chrom) group = self.denovo_df.groupby( ["chrom", "position", "reference", "alternative"], sort=False).agg(list) for num_idx, (idx, values) in enumerate(group.iterrows()): chrom, position, reference, alternative = idx position = int(position) summary_records = [] for alt_index, alt in enumerate(alternative.split(",")): summary_records.append({ "chrom": chrom, "reference": reference, "alternative": alt, "position": position, "summary_variant_index": num_idx, "allele_index": alt_index + 1, }) svariant = SummaryVariantFactory.summary_variant_from_records( summary_records, self.transmission_type ) if not self._is_in_regions(svariant): continue fvs = self._produce_family_variants(svariant, values) yield svariant, fvs
[docs] def full_variants_iterator(self): full_iterator = super().full_variants_iterator() for summary_variants, family_variants in full_iterator: for fvariant in family_variants: for fallele in fvariant.alt_alleles: inheritance = [ Inheritance.denovo if vinmem is not None and mem.role in set([ Role.prb, Role.sib, Role.unknown]) and inh in set([ Inheritance.unknown, Inheritance.possible_denovo, Inheritance.possible_omission]) else inh for inh, vinmem, mem in zip( fallele.inheritance_in_members, fallele.variant_in_members, fallele.members_in_order ) ] # pylint: disable=protected-access fallele._inheritance_in_members = inheritance yield summary_variants, family_variants
[docs] def close(self): pass
[docs] @staticmethod def split_location(location): chrom, position = location.split(":") return chrom, int(position)
[docs] @staticmethod def produce_genotype( chrom: str, pos: int, genome: ReferenceGenome, family: Family, members_with_variant: List[str]) -> np.ndarray: """Produce genotype.""" # TODO: Add support for multiallelic variants # This method currently assumes biallelic variants genotype = np.zeros(shape=(2, len(family)), dtype=GENOTYPE_TYPE) for person_id, person in family.persons.items(): index = family.members_index([person_id]) has_variant = int(person_id in members_with_variant) ploidy = get_locus_ploidy(chrom, pos, person.sex, genome) if ploidy == 2: genotype[1, index] = has_variant else: genotype[0, index] = has_variant genotype[1, index] = -2 # signifies haploid genotype return genotype
@classmethod def _arguments(cls): arguments = super()._arguments() arguments.append(CLIArgument( "denovo_file", value_type=str, metavar="<variants filename>", help_text="DAE denovo variants file", )) arguments.append(CLIArgument( "--denovo-variant", help_text="The label or index of the" " column containing the CSHL-style" " representation of the variant." "[Default: variant]", )) arguments.append(CLIArgument( "--denovo-ref", help_text="The label or index of the" " column containing the reference" " allele for the variant. [Default: none]", )) arguments.append(CLIArgument( "--denovo-alt", help_text="The label or index of the " " column containing the alternative" " allele for the variant. [Default: none]", )) arguments.append(CLIArgument( "--denovo-location", # default_value="location", help_text="The label or index of the" " column containing the CSHL-style" " location of the variant. [Default: location]", )) arguments.append(CLIArgument( "--denovo-chrom", help_text="The label or index of the" " column containing the chromosome" " upon which the variant is located. [Default: none]", )) arguments.append(CLIArgument( "--denovo-pos", help_text="The label or index of the" " column containing the position" " upon which the variant is located. [Default: none]", )) arguments.append(CLIArgument( "--denovo-family-id", # default_value="familyId", help_text="The label or index of the column containing the" " family's ID. [Default: familyId]", )) arguments.append(CLIArgument( "--denovo-best-state", # default_value="bestState", help_text="The label or index of the" " column containing the best state" " for the family. [Default: bestState]", )) arguments.append(CLIArgument( "--denovo-genotype", help_text="The label or index of the" " column containing the family genotype." " [Default: genotype]", )) arguments.append(CLIArgument( "--denovo-person-id", help_text="The label or index of the column containing the" " person's ID. [Default: none]", )) arguments.append(CLIArgument( "--denovo-sep", value_type=str, default_value="\t", help_text="Denovo file field separator [default: `\\t`]", )) return arguments # flake8: noqa: C901
[docs] @classmethod def parse_cli_arguments(cls, argv, use_defaults=False): # pylint: disable=too-many-branches logger.debug("CLI arguments: %s", argv) if argv.denovo_location and (argv.denovo_chrom or argv.denovo_pos): logger.error( "--denovo-location and (--denovo-chrom, --denovo-pos) " "are mutually exclusive" ) raise ValueError() if argv.denovo_variant and (argv.denovo_ref or argv.denovo_alt): logger.error( "--denovo-variant and (denovo-ref, denovo-alt) " "are mutually exclusive" ) raise ValueError() if argv.denovo_person_id and ( argv.denovo_family_id or argv.denovo_best_state): logger.error( "--denovo-person-id and (denovo-family-id, denovo-best-state) " "are mutually exclusive" ) raise ValueError() if not (argv.denovo_location or (argv.denovo_chrom and argv.denovo_pos)): argv.denovo_location = "location" if not (argv.denovo_variant or (argv.denovo_ref and argv.denovo_alt)): argv.denovo_variant = "variant" if not (argv.denovo_person_id or (argv.denovo_family_id and ( argv.denovo_best_state or argv.denovo_genotype))): argv.denovo_family_id = "familyId" argv.denovo_best_state = "bestState" if not argv.denovo_location: if not argv.denovo_chrom: argv.denovo_chrom = "CHROM" if not argv.denovo_pos: argv.denovo_pos = "POS" if not argv.denovo_variant: if not argv.denovo_ref: argv.denovo_ref = "REF" if not argv.denovo_alt: argv.denovo_alt = "ALT" if not argv.denovo_person_id: if not argv.denovo_family_id: argv.denovo_family_id = "familyId" if not argv.denovo_best_state and not argv.denovo_genotype: argv.denovo_best_state = "bestState" assert not (argv.denovo_best_state and argv.denovo_genotype) if argv.denovo_sep is None: argv.denovo_sep = "\t" params = { "denovo_location": argv.denovo_location, "denovo_variant": argv.denovo_variant, "denovo_chrom": argv.denovo_chrom, "denovo_pos": argv.denovo_pos, "denovo_ref": argv.denovo_ref, "denovo_alt": argv.denovo_alt, "denovo_person_id": argv.denovo_person_id, "denovo_family_id": argv.denovo_family_id, "denovo_best_state": argv.denovo_best_state, "denovo_genotype": argv.denovo_genotype, "add_chrom_prefix": argv.add_chrom_prefix, "del_chrom_prefix": argv.del_chrom_prefix, "denovo_sep": argv.denovo_sep, } return argv.denovo_file, params
@classmethod def _flexible_denovo_load_internal( cls, filepath: str, genome: ReferenceGenome, families: FamiliesData, denovo_location: Optional[str] = None, denovo_variant: Optional[str] = None, denovo_chrom: Optional[str] = None, denovo_pos: Optional[str] = None, denovo_ref: Optional[str] = None, denovo_alt: Optional[str] = None, denovo_person_id: Optional[str] = None, denovo_family_id: Optional[str] = None, denovo_best_state: Optional[str] = None, denovo_genotype: Optional[str] = None, denovo_sep: str = "\t", adjust_chrom_prefix=None, **_kwargs) -> Tuple[pd.DataFrame, Any]: # FIXME # pylint: disable=too-many-arguments,too-many-branches # pylint: disable=too-many-statements,too-many-locals assert families is not None assert isinstance( families, FamiliesData ), "families must be an instance of FamiliesData!" assert genome, "You must provide a genome object!" if not (denovo_location or (denovo_chrom and denovo_pos)): denovo_location = "location" if not (denovo_variant or (denovo_ref and denovo_alt)): denovo_variant = "variant" if not (denovo_person_id or (denovo_family_id and (denovo_best_state or denovo_genotype))): denovo_family_id = "familyId" denovo_best_state = "bestState" if denovo_sep is None: denovo_sep = "\t" with warnings.catch_warnings(record=True) as _: warnings.filterwarnings( "ignore", category=pd.errors.ParserWarning, message="Both a converter and dtype were specified", ) raw_df = pd.read_csv( filepath, sep=denovo_sep, converters={ denovo_pos: lambda p: int(p) if p else None, }, dtype=str, comment="#", encoding="utf-8", na_filter=False) if denovo_location: chrom_col, pos_col = zip( *map(cls.split_location, raw_df[denovo_location]) ) else: assert denovo_chrom is not None assert denovo_pos is not None chrom_col = raw_df.loc[:, denovo_chrom] pos_col = raw_df.loc[:, denovo_pos] if adjust_chrom_prefix is not None: chrom_col = tuple(map(adjust_chrom_prefix, chrom_col)) if denovo_variant: variant_col = raw_df.loc[:, denovo_variant] ref_alt_tuples = [ dae2vcf_variant( variant_tuple[0], variant_tuple[1], variant_tuple[2], genome ) for variant_tuple in zip(chrom_col, pos_col, variant_col) ] pos_col, ref_col, alt_col = zip(*ref_alt_tuples) else: assert denovo_ref is not None assert denovo_alt is not None ref_col = raw_df.loc[:, denovo_ref] alt_col = raw_df.loc[:, denovo_alt] extra_attributes_cols = raw_df.columns.difference([ denovo_location, denovo_variant, denovo_chrom, denovo_pos, denovo_ref, denovo_alt, denovo_person_id, denovo_family_id, denovo_best_state, denovo_genotype ]) if denovo_person_id: temp_df = pd.DataFrame( { "chrom": chrom_col, "pos": pos_col, "ref": ref_col, "alt": alt_col, "person_id": raw_df.loc[:, denovo_person_id], } ) grouped = temp_df.groupby(["chrom", "pos", "ref", "alt"]) result = [] # TODO Implement support for multiallelic variants for variant, variants_indices in grouped.groups.items(): # Here we join and then split again by ',' to handle cases # where the person IDs are actually a list of IDs, separated # by a ',' person_ids = ",".join( temp_df.iloc[variants_indices].loc[:, "person_id"] ).split(",") variant_families = { families.persons[pid].family_id for pid in filter( lambda p: p in families.persons, person_ids)} # TODO Implement support for multiallelic variants for family_id in variant_families: family = families[family_id] family_dict = { "chrom": variant[0], "position": variant[1], "reference": variant[2], "alternative": variant[3], "family_id": family_id, "genotype": cls.produce_genotype( variant[0], variant[1], genome, family, person_ids, ), "best_state": None, } record = raw_df.loc[variants_indices[0]] extra_attributes = record[extra_attributes_cols].to_dict() result.append({**family_dict, **extra_attributes}) denovo_df = pd.DataFrame(result) else: family_col = raw_df.loc[:, denovo_family_id] if denovo_best_state: best_state_col = list( map( lambda bs: str2mat(bs, col_sep=" "), # type: ignore raw_df[denovo_best_state], ) ) # genotype_col = list(map(best2gt, best_state_col)) denovo_df = pd.DataFrame( { "chrom": chrom_col, "position": pos_col, "reference": ref_col, "alternative": alt_col, "family_id": family_col, "genotype": None, "best_state": best_state_col, } ) else: assert denovo_genotype genotype_col = list( map( str2gt, raw_df[denovo_genotype], ) ) # genotype_col = list(map(best2gt, best_state_col)) denovo_df = pd.DataFrame( { "chrom": chrom_col, "position": pos_col, "reference": ref_col, "alternative": alt_col, "family_id": family_col, "genotype": genotype_col, "best_state": None, } ) extra_attributes_df = raw_df[extra_attributes_cols] denovo_df = denovo_df.join(extra_attributes_df) return (denovo_df, extra_attributes_cols.tolist())
[docs] @classmethod def flexible_denovo_load( cls, filepath: str, genome: ReferenceGenome, families: FamiliesData, denovo_location: Optional[str] = None, denovo_variant: Optional[str] = None, denovo_chrom: Optional[str] = None, denovo_pos: Optional[str] = None, denovo_ref: Optional[str] = None, denovo_alt: Optional[str] = None, denovo_person_id: Optional[str] = None, denovo_family_id: Optional[str] = None, denovo_best_state: Optional[str] = None, denovo_genotype: Optional[str] = None, denovo_sep: str = "\t", adjust_chrom_prefix=None, **kwargs) -> pd.DataFrame: """Load de Novo variants from a file. Read a text file containing variants in the form of delimiter-separted values and produce a dataframe. The text file may use different names for the columns containing the relevant data - these are specified with the provided parameters. :param str filepath: The path to the DSV file to read. :param genome: A reference genome object. :param str denovo_location: The label or index of the column containing the CSHL-style location of the variant. :param str denovo_variant: The label or index of the column containing the CSHL-style representation of the variant. :param str denovo_chrom: The label or index of the column containing the chromosome upon which the variant is located. :param str denovo_pos: The label or index of the column containing the position upon which the variant is located. :param str denovo_ref: The label or index of the column containing the variant's reference allele. :param str denovo_alt: The label or index of the column containing the variant's alternative allele. :param str denovo_person_id: The label or index of the column containing either a singular person ID or a comma-separated list of person IDs. :param str denovo_family_id: The label or index of the column containing a singular family ID. :param str denovo_best_state: The label or index of the column containing the best state for the variant. :param str families: An instance of the FamiliesData class for the pedigree of the relevant study. :type genome: An instance of Genome. :return: Dataframe containing the variants, with the following header - 'chrom', 'position', 'reference', 'alternative', 'family_id', 'genotype'. :rtype: An instance of Pandas' DataFrame class. """ # FIXME too many arguments # pylint: disable=too-many-arguments denovo_df, _ = cls._flexible_denovo_load_internal( filepath, genome, families, denovo_location=denovo_location, denovo_variant=denovo_variant, denovo_chrom=denovo_chrom, denovo_pos=denovo_pos, denovo_ref=denovo_ref, denovo_alt=denovo_alt, denovo_person_id=denovo_person_id, denovo_family_id=denovo_family_id, denovo_best_state=denovo_best_state, denovo_genotype=denovo_genotype, denovo_sep=denovo_sep, adjust_chrom_prefix=adjust_chrom_prefix, **kwargs ) return denovo_df
[docs]class DaeTransmittedFamiliesGenotypes(FamiliesGenotypes): """Tuple of families and family data""" def __init__(self, families, family_data): super().__init__() self.families = families self.family_data = family_data # def get_family_genotype(self, family): # gt = self.families_genotypes.get(family.family_id, None) # if gt is not None: # return gt # else: # # FIXME: what genotype we should return in case # # we have no data in the file: # # - reference # # - unknown # return reference_genotype(len(family))
[docs] def get_family_best_state(self, family): fdata = self.family_data.get(family.family_id, None) if fdata is None: return None return fdata[0]
[docs] def get_family_read_counts(self, family): fdata = self.family_data.get(family.family_id, None) if fdata is None: return None return fdata[1]
[docs] def get_family_genotype(self, family): raise NotImplementedError()
[docs] def family_genotype_iterator(self): for family_id, (best_state, read_counts) in self.family_data.items(): fam = self.families.get(family_id) if fam is None: continue assert best_state is not None, (family_id, best_state, read_counts) yield fam, best_state, read_counts
[docs] def full_families_genotypes(self): raise NotImplementedError()
# return self.families_genotypes
[docs]class DaeTransmittedLoader(VariantsGenotypesLoader): """Loader for dae variants.""" def __init__( self, families, summary_filename, genome, regions=None, params=None, **_kwargs, ): toomany_filename = DaeTransmittedLoader._build_toomany_filename( summary_filename ) params = params if params else {} super().__init__( families=families, filenames=[summary_filename, toomany_filename], transmission_type=TransmissionType.transmitted, genome=genome, regions=regions, expect_genotype=False, expect_best_state=True, params=params, ) self.summary_filename = summary_filename self.toomany_filename = toomany_filename self.set_attribute("source_type", "dae") self.genome = genome self.params = params self.include_reference = self.params.get( "dae_include_reference_genotypes", False ) try: # pylint: disable=no-member with pysam.Tabixfile(self.summary_filename) as tbx: self.chromosomes = \ [self._adjust_chrom_prefix(chrom) for chrom in tbx.contigs] except Exception: # pylint: disable=broad-except self.chromosomes = self.genome.chromosomes @property def variants_filenames(self): return [self.summary_filename] @staticmethod def _build_toomany_filename(summary_filename): assert fs_utils.exists( f"{summary_filename}.tbi" ), f"summary filename tabix index missing {summary_filename}" dirname = os.path.dirname(summary_filename) basename = os.path.basename(summary_filename) if basename.endswith(".txt.bgz"): result = os.path.join(dirname, f"{basename[:-8]}-TOOMANY.txt.bgz") elif basename.endswith(".txt.gz"): result = os.path.join(dirname, f"{basename[:-7]}-TOOMANY.txt.gz") else: assert False, ( f"Bad summary filename {summary_filename}: " f"unexpected extention" ) assert fs_utils.exists(result), f"missing TOOMANY file {result}" assert fs_utils.exists( f"{result}.tbi" ), f"missing tabix index for TOOMANY file {result}" return result @staticmethod def _rename_columns(columns): if "#chr" in columns: columns[columns.index("#chr")] = "chrom" if "chr" in columns: columns[columns.index("chr")] = "chrom" if "position" in columns: columns[columns.index("position")] = "cshl_position" if "variant" in columns: columns[columns.index("variant")] = "cshl_variant" return columns @staticmethod def _load_column_names(filename): with fsspec.open(filename) as rawfile: with gzip.open(rawfile) as infile: column_names = ( infile.readline().decode("utf-8").strip().split("\t") ) return column_names @classmethod def _load_toomany_columns(cls, toomany_filename): toomany_columns = cls._load_column_names(toomany_filename) return cls._rename_columns(toomany_columns) @classmethod def _load_summary_columns(cls, summary_filename): summary_columns = cls._load_column_names(summary_filename) return cls._rename_columns(summary_columns) def _summary_variant_from_dae_record(self, summary_index, rec): rec["cshl_position"] = int(rec["cshl_position"]) position, reference, alternative = dae2vcf_variant( self._adjust_chrom_prefix(rec["chrom"]), rec["cshl_position"], rec["cshl_variant"], self.genome, ) rec["position"] = position rec["reference"] = reference rec["alternative"] = alternative rec["all.nParCalled"] = int(rec["all.nParCalled"]) rec["all.nAltAlls"] = int(rec["all.nAltAlls"]) rec["all.prcntParCalled"] = float(rec["all.prcntParCalled"]) rec["all.altFreq"] = float(rec["all.altFreq"]) rec["summary_variant_index"] = summary_index parents_called = int(rec.get("all.nParCalled", 0)) ref_allele_count = 2 * int(rec.get("all.nParCalled", 0)) - int( rec.get("all.nAltAlls", 0) ) ref_allele_prcnt = 0.0 if parents_called > 0: ref_allele_prcnt = ref_allele_count / 2.0 / parents_called ref = { "chrom": rec["chrom"], "position": rec["position"], "reference": rec["reference"], "alternative": None, "variant_type": None, "cshl_position": rec["cshl_position"], "cshl_variant": rec["cshl_variant"], "summary_variant_index": rec["summary_variant_index"], "allele_index": 0, "af_parents_called_count": parents_called, "af_parents_called_percent": float( rec.get("all.prcntParCalled", 0.0) ), "af_allele_count": ref_allele_count, "af_allele_freq": ref_allele_prcnt, "hw": rec.get("HW"), } alt = { "chrom": rec["chrom"], "position": rec["position"], "reference": rec["reference"], "alternative": rec["alternative"], "variant_type": allele_type_from_cshl_variant(rec["cshl_variant"]), "cshl_position": rec["cshl_position"], "cshl_variant": rec["cshl_variant"], "summary_variant_index": rec["summary_variant_index"], "allele_index": 1, "af_parents_called_count": int(rec.get("all.nParCalled", 0)), "af_parents_called_percent": float( rec.get("all.prcntParCalled", 0.0) ), "af_allele_count": int(rec.get("all.nAltAlls", 0)), "af_allele_freq": float(rec.get("all.altFreq", 0.0)), "hw": rec.get("HW"), } summary_variant = SummaryVariantFactory.summary_variant_from_records( [ref, alt], transmission_type=self.transmission_type ) return summary_variant @staticmethod def _explode_family_data(family_data): best_states = { fid: ( str2mat(bs, col_sep="", row_sep="/"), str2mat(rc, col_sep=" ", row_sep="/", dtype=np.int16) ) for (fid, bs, rc) in [ fg.split(":") for fg in family_data.split(";") ] } return best_states # @staticmethod # def _explode_family_read_counts(family_data): # read_counts = { # fid: str2mat(rc, col_sep=" ", row_sep="/") # for (fid, _bs, rc) in [ # fg.split(":") for fg in family_data.split(";") # ] # } # return read_counts
[docs] def close(self): pass
def _produce_family_variants(self, summary_variant, families_genotypes): family_variants = [] for (fam, best_state, read_counts) in \ families_genotypes.family_genotype_iterator(): fvariant = FamilyVariant( summary_variant, fam, None, best_state) fvariant.gt, fvariant._genetic_model = self._calc_genotype( fvariant, self.genome) for fallele in fvariant.alleles: fallele.gt = fvariant.gt # pylint: disable=protected-access fallele._genetic_model = fvariant._genetic_model fallele.update_attributes({"read_counts": read_counts}) family_variants.append(fvariant) return family_variants def _full_variants_iterator_impl(self): summary_columns = self._load_summary_columns(self.summary_filename) toomany_columns = self._load_toomany_columns(self.toomany_filename) summary_index = 0 for region in self.regions: try: # using a context manager because of # https://stackoverflow.com/a/25968716/2316754 # pylint: disable=no-member with closing(pysam.Tabixfile(self.summary_filename)) \ as sum_tbf, \ closing(pysam.Tabixfile(self.toomany_filename)) \ as too_tbf: region_unadjusted = self._unadjust_chrom_prefix(region) summary_iterator = sum_tbf.fetch( region=region_unadjusted, parser=pysam.asTuple() ) toomany_iterator = too_tbf.fetch( region=region_unadjusted, parser=pysam.asTuple() ) for summary_line in summary_iterator: rec = dict(zip(summary_columns, summary_line)) try: summary_variant = \ self._summary_variant_from_dae_record( summary_index, rec) family_data = rec["familyData"] if family_data == "TOOMANY": toomany_line = next(toomany_iterator, None) if toomany_line is None: return toomany_rec = dict(zip( toomany_columns, toomany_line)) family_data = toomany_rec["familyData"] assert rec["cshl_position"] == int( toomany_rec["cshl_position"] ) family_data = self._explode_family_data( family_data) families_genotypes = \ DaeTransmittedFamiliesGenotypes( self.families, family_data) family_variants = self._produce_family_variants( summary_variant, families_genotypes) yield summary_variant, family_variants summary_index += 1 except Exception: # pylint: disable=broad-except logger.error( "unable to process summary line: %s " "from %s: %s", summary_line, self.summary_filename, self.regions, exc_info=True) except ValueError: logger.warning( "could not find region %s in %s or %s", region, self.summary_filename, self.toomany_filename, exc_info=True) @classmethod def _arguments(cls) -> list[CLIArgument]: arguments = super()._arguments() arguments.append(CLIArgument( "dae_summary_file", value_type=str, metavar="<summary filename>", help_text="summary variants file to import", )) arguments.append(CLIArgument( "--dae-include-reference-genotypes", default_value=False, help_text="fill in reference only variants [default: %(default)s]", action="store_true", )) return arguments
[docs] @classmethod def parse_cli_arguments(cls, argv, use_defaults=False): filename = argv.dae_summary_file params = { "dae_include_reference_genotypes": str2bool( argv.dae_include_reference_genotypes ), "add_chrom_prefix": argv.add_chrom_prefix, "del_chrom_prefix": argv.del_chrom_prefix, } return filename, params