# Source code for dae.annotation.annotate_columns
from __future__ import annotations
import argparse
import gzip
import logging
import os
import sys
from contextlib import closing
from typing import Any, Optional
from pysam import TabixFile, tabix_index
from dae.annotation.annotate_utils import (
AnnotationTool,
produce_partfile_paths,
produce_regions,
)
from dae.annotation.annotation_pipeline import ReannotationPipeline
from dae.annotation.context import CLIAnnotationContext
from dae.annotation.record_to_annotatable import (
RecordToCNVAllele,
RecordToPosition,
RecordToRegion,
RecordToVcfAllele,
add_record_to_annotable_arguments,
build_record_to_annotatable,
)
from dae.genomic_resources.cli import VerbosityConfiguration
from dae.genomic_resources.genomic_context import get_genomic_context
from dae.genomic_resources.reference_genome import (
ReferenceGenome,
build_reference_genome_from_resource,
)
from dae.task_graph import TaskGraphCli
from dae.utils.fs_utils import tabix_index_filename
# Module-level logger for the annotate_columns command-line tool.
logger = logging.getLogger("annotate_columns")
def read_input(
    args: Any, region: tuple = tuple(),
) -> tuple[Any, Any, list[str]]:
    """Return a file object, line iterator and list of header columns.

    Handles differences between tabixed and non-tabixed input files.
    """
    if not args.input.endswith(".gz"):
        # Plain-text input: the open file itself doubles as the
        # line iterator for the caller.
        # pylint: disable=consider-using-with
        plain_file = open(args.input, "rt")
        first_line = plain_file.readline()
        columns = first_line.strip("\r\n").split(args.input_separator)
        return plain_file, plain_file, columns

    # Tabix-indexed input: read the header through gzip, then iterate
    # the requested region (or the whole file) via pysam.
    tabix_file = TabixFile(args.input)
    with gzip.open(args.input, "rt") as raw_file:
        columns = raw_file.readline() \
            .strip("\r\n") \
            .split(args.input_separator)
    return closing(tabix_file), tabix_file.fetch(*region), columns
def produce_tabix_index(
    filepath: str, args: Any, header: list[str],
    ref_genome: Optional[ReferenceGenome],
) -> None:
    """Produce a tabix index file for the given variants file."""
    annotatable = build_record_to_annotatable(
        vars(args), set(header), ref_genome)

    # Region-like records span two coordinate columns, while point-like
    # records reuse the start column as their end column.
    if isinstance(annotatable, (RecordToRegion, RecordToCNVAllele)):
        end_col = 2
    elif isinstance(annotatable, (RecordToVcfAllele, RecordToPosition)):
        end_col = 1
    else:
        raise ValueError(
            "Could not generate tabix index: record"
            f" {type(annotatable)} is of unsupported type.")

    tabix_index(
        filepath,
        seq_col=0,
        start_col=1,
        end_col=end_col,
        # A "#"-prefixed header is already tabix-style; otherwise skip it.
        line_skip=0 if header[0].startswith("#") else 1,
        force=True,
    )
def combine(args: Any, partfile_paths: list[str], out_file_path: str) -> None:
    """Combine annotated region parts into a single VCF file."""
    CLIAnnotationContext.register(args)
    context = get_genomic_context()
    pipeline = CLIAnnotationContext.get_pipeline(context)
    annotation_attributes = [
        attr.name for attr in pipeline.get_attributes()
        if not attr.internal
    ]

    # Recover the original header columns from the gzipped input file.
    with gzip.open(args.input, "rt") as in_file_raw:
        header_columns = in_file_raw.readline() \
            .strip("\r\n") \
            .split(args.input_separator)
    header_line = args.output_separator.join(
        header_columns + annotation_attributes)

    # Concatenate all part files, keeping only the combined header.
    with open(out_file_path, "wt") as out_file:
        out_file.write(header_line + "\n")
        for part_path in partfile_paths:
            with gzip.open(part_path, "rt") as partfile:
                partfile.readline()  # skip header
                out_file.write(partfile.read())

    for part_path in partfile_paths:
        os.remove(part_path)

    produce_tabix_index(
        out_file_path, args, header_columns, context.get_reference_genome())
class AnnotateColumnsTool(AnnotationTool):
    """Annotation tool for TSV-style text files."""

    def get_argument_parser(self) -> argparse.ArgumentParser:
        """Configure argument parser."""
        parser = argparse.ArgumentParser(
            description="Annotate columns",
            formatter_class=argparse.ArgumentDefaultsHelpFormatter,
        )
        parser.add_argument(
            "input", default="-", nargs="?",
            help="the input column file")
        parser.add_argument(
            "pipeline", default="context", nargs="?",
            help="The pipeline definition file. By default, or if "
            "the value is gpf_instance, the annotation pipeline "
            "from the configured gpf instance will be used.")
        parser.add_argument(
            "-r", "--region-size", default=300_000_000,
            type=int, help="region size to parallelize by")
        parser.add_argument(
            "-w", "--work-dir",
            help="Directory to store intermediate output files in",
            default="annotate_columns_output")
        parser.add_argument(
            "-o", "--output",
            help="Filename of the output result",
            default=None)
        parser.add_argument(
            "-in_sep", "--input-separator", default="\t",
            help="The column separator in the input")
        parser.add_argument(
            "-out_sep", "--output-separator", default="\t",
            help="The column separator in the output")
        parser.add_argument(
            "--reannotate", default=None,
            help="Old pipeline config to reannotate over")
        CLIAnnotationContext.add_context_arguments(parser)
        add_record_to_annotable_arguments(parser)
        TaskGraphCli.add_arguments(parser)
        VerbosityConfiguration.set_arguments(parser)
        return parser

    @staticmethod
    def annotate(
        args: argparse.Namespace,
        pipeline_config: str,
        grr_definition: Optional[dict],
        ref_genome_id: Optional[str],
        out_file_path: str,
        region: tuple = tuple(),
        compress_output: bool = False,
    ) -> None:
        """Annotate a variants file with a given pipeline configuration.

        Reads the (optionally tabix-indexed) input restricted to
        ``region``, annotates each record and writes the result to
        ``out_file_path`` (gzip-compressed when ``compress_output``).
        Malformed lines are collected and reported instead of aborting.
        """
        # pylint: disable=too-many-locals,too-many-branches
        # TODO Insisting on having the pipeline config passed in args
        # prevents the finding of a default annotation config. Consider fixing
        pipeline_config_old = None
        if args.reannotate:
            with open(args.reannotate, "r") as infile:
                pipeline_config_old = infile.read()
        pipeline = AnnotateColumnsTool._produce_annotation_pipeline(
            pipeline_config, pipeline_config_old,
            grr_definition, args.allow_repeated_attributes,
        )
        grr = pipeline.repository

        # Open the reference genome, if one is configured; it is needed
        # only by some record-to-annotatable conversions.
        ref_genome = None
        if ref_genome_id:
            res = grr.find_resource(ref_genome_id)
            if res is not None:
                ref_genome = build_reference_genome_from_resource(res).open()

        errors = []

        in_file, line_iterator, header_columns = read_input(args, region)
        record_to_annotatable = build_record_to_annotatable(
            vars(args), set(header_columns), ref_genome=ref_genome)
        annotation_columns = [
            attr.name for attr in pipeline.get_attributes()
            if not attr.internal]
        if compress_output:
            out_file = gzip.open(out_file_path, "wt")
        else:
            out_file = open(out_file_path, "wt")

        pipeline.open()
        with pipeline, in_file, out_file:
            if isinstance(pipeline, ReannotationPipeline):
                # Drop columns produced by the old pipeline; they will be
                # replaced by the new annotation attributes.
                old_annotation_columns = {
                    attr.name
                    for attr in pipeline.pipeline_old.get_attributes()
                    if not attr.internal
                }
                new_header = [
                    col for col in header_columns
                    if col not in old_annotation_columns
                ]
            else:
                new_header = list(header_columns)
            new_header = new_header + annotation_columns
            out_file.write(args.output_separator.join(new_header) + "\n")
            for lnum, line in enumerate(line_iterator):
                try:
                    columns = line.strip("\n\r").split(args.input_separator)
                    record = dict(zip(header_columns, columns))
                    if isinstance(pipeline, ReannotationPipeline):
                        for col in pipeline.attributes_deleted:
                            del record[col]
                        annotation = pipeline.annotate(
                            record_to_annotatable.build(record), record,
                        )
                    else:
                        annotation = pipeline.annotate(
                            record_to_annotatable.build(record),
                        )
                    for col in annotation_columns:
                        record[col] = annotation[col]
                    result = list(map(str, record.values()))
                    out_file.write(args.output_separator.join(result) + "\n")
                except Exception as ex:  # pylint: disable=broad-except
                    # Best-effort annotation: log the bad line and
                    # continue with the rest of the input.
                    logger.warning(
                        "unexpected input data format at line %s: %s",
                        lnum, line, exc_info=True)
                    errors.append((lnum, line, str(ex)))

        if len(errors) > 0:
            logger.error("there were errors during the import")
            for lnum, line, error in errors:
                logger.error("line %s: %s", lnum, line)
                logger.error("\t%s", error)

    def work(self) -> None:
        """Build the task graph for annotating the input file.

        Tabix-indexed inputs are split into regions annotated in
        parallel and combined afterwards; other inputs are annotated in
        a single task.
        """
        if self.args.output:
            output = self.args.output
        else:
            # NOTE: the previous implementation used rstrip(".gz"), which
            # strips *characters* ('.', 'g', 'z'), mangling names such as
            # "variants.g"; strip the exact suffix instead.
            input_name = self.args.input
            if input_name.endswith(".gz"):
                input_name = input_name[:-len(".gz")]
            if "." in input_name:
                # Insert "_annotated" before the first extension.
                idx = input_name.find(".")
                output = f"{input_name[:idx]}_annotated{input_name[idx:]}"
            else:
                output = f"{input_name}_annotated"

        ref_genome = self.context.get_reference_genome()
        ref_genome_id = ref_genome.resource_id \
            if ref_genome is not None else None

        with open(self.args.pipeline, "r") as infile:
            raw_pipeline_config = infile.read()

        if tabix_index_filename(self.args.input):
            # Tabix-indexed input: annotate per-region part files in
            # parallel, then combine them into the final output.
            with closing(TabixFile(self.args.input)) as pysam_file:
                regions = produce_regions(pysam_file, self.args.region_size)
            file_paths = produce_partfile_paths(
                self.args.input, regions, self.args.work_dir)
            region_tasks = []
            for index, (region, path) in enumerate(zip(regions, file_paths)):
                region_tasks.append(self.task_graph.create_task(
                    f"part-{index}",
                    AnnotateColumnsTool.annotate,
                    [self.args, raw_pipeline_config,
                     self.grr.definition,
                     ref_genome_id, path, region, True],
                    []))
            self.task_graph.create_task(
                "combine",
                combine,
                [self.args, file_paths, output],
                region_tasks)
        else:
            self.task_graph.create_task(
                "annotate_all",
                AnnotateColumnsTool.annotate,
                [self.args, raw_pipeline_config, self.grr.definition,
                 ref_genome_id, output,
                 tuple(), output.endswith(".gz")],
                [])
def cli(raw_args: Optional[list[str]] = None) -> None:
    """Run the annotate_columns tool with the given CLI arguments."""
    AnnotateColumnsTool(raw_args).run()
# Allow running this module directly as a script.
if __name__ == "__main__":
    cli(sys.argv[1:])