Source code for dae.tools.vcfinfo_extractor

#!/usr/bin/env python

import argparse
import logging
import os
import sys
from contextlib import closing
from functools import partial

import pysam

from dae.gpf_instance.gpf_instance import GPFInstance
from dae.utils.regions import Region
from dae.variants.attributes import TransmissionType
from dae.variants.variant import SummaryVariantFactory


[docs]def info_get(info, column=None): v = info.get(column) if v is None: return "" return v
[docs]def info_get_allele(info, column=None): v = info.get(column) if v is None: return "" return v[0]
[docs]def info_get_allele_percent(info, column=None): v = info.get(column) if v is None: return "" return float(v[0]) * 100.0
[docs]def main(argv, gpf_instance=None): # flake8: noqa: C901 parser = argparse.ArgumentParser() parser.add_argument("--verbose", "-V", action="count", default=0) parser.add_argument( "vcf_filename", type=str, metavar="<VCF file>", help="VCF filename. ", ) parser.add_argument( "--columns", "-c", type=str, default=None, metavar="<C1,C2,C3...>", help="column separated list of INFO field values to be extracted", ) parser.add_argument( "--region", "-r", action="store", type=str, default=None) parser.add_argument( "--output", "-o", type=str, metavar="<output filename>", help="output filename") argv = parser.parse_args(argv) assert argv.output is not None if argv.verbose == 1: logging.basicConfig(level=logging.WARNING) elif argv.verbose == 2: logging.basicConfig(level=logging.INFO) elif argv.verbose >= 3: logging.basicConfig(level=logging.DEBUG) else: logging.basicConfig(level=logging.ERROR) if gpf_instance is None: gpf_instance = GPFInstance.build() assert argv.columns is not None info_columns = [c.strip() for c in argv.columns.split(",")] header = [ "#CHROM", "POS", "chrom", "position", "location", "variant", "REF", "ALT", "ID", ] header.extend(info_columns) region = None if argv.region: if ":" in argv.region: region = Region.from_str(argv.region) elif "-" in argv.region: parts = argv.region.split("-") if len(parts) == 3: region = Region.from_str(f"{parts[0]}:{parts[1]}-{parts[2]}") elif len(parts) == 2: region = Region.from_str(f"{parts[0]}:{parts[1]}") else: raise ValueError(f"unexpected region format {argv.region}") print(f"processing region: {region}") assert os.path.exists(argv.vcf_filename) with closing(pysam.VariantFile(argv.vcf_filename)) as vcf, \ open(argv.output, "wt") as output: header_info = vcf.header.info accessors = {} for column in info_columns: if column not in header_info and column.endswith("_percent"): base_column = column[:-8] assert base_column in header_info accessors[column] = partial( info_get_allele_percent, column=base_column) elif header_info[column].number == 1: accessors[column] = partial(info_get, column=column) elif header_info[column].number == "A": accessors[column] = partial(info_get_allele, column=column) else: raise ValueError("unexpected info number type") if region is None: vcf_iterator = vcf.fetch() else: vcf_iterator = vcf.fetch(region.chrom, region.begin, region.end) output.write("\t".join(header)) output.write("\n") for summary_index, vcf_variant in enumerate(vcf_iterator): assert len(vcf_variant.alts) == 1, vcf_variant if not region.isin(vcf_variant.chrom, vcf_variant.pos): continue sv = SummaryVariantFactory.summary_variant_from_vcf( vcf_variant, summary_index, transmission_type=TransmissionType.transmitted) sa = sv.alt_alleles[0] assert len(vcf_variant.alts) == 1 line = [ vcf_variant.chrom, vcf_variant.pos, sa.chromosome, sa.cshl_position, sa.cshl_location, sa.cshl_variant, vcf_variant.ref, vcf_variant.alts[0], vcf_variant.id, ] for info_id in info_columns: accessor = accessors[info_id] value = accessor(vcf_variant.info) line.append(value) outline = [] for v in line: if v is None: outline.append("") else: outline.append(str(v)) output.write("\t".join(outline)) output.write("\n") if summary_index % 10000 == 0: print(f"progress {argv.region}: {summary_index}")
if __name__ == "__main__": main(sys.argv[1:])