Source code for dae.genomic_resources.gene_models

# pylint: disable=too-many-lines
# FIXME: too-many-lines
from __future__ import annotations

import copy
import gzip
import json
import logging
import textwrap
from collections import defaultdict
from collections.abc import Generator  # TextIO
from contextlib import contextmanager
from functools import lru_cache
from typing import IO, Any, Optional, Protocol, cast

import pandas as pd
import yaml
from jinja2 import Template

from dae.genomic_resources import GenomicResource
from dae.genomic_resources.fsspec_protocol import build_local_resource
from dae.genomic_resources.resource_implementation import (
    GenomicResourceImplementation,
    InfoImplementationMixin,
    ResourceConfigValidationMixin,
    get_base_resource_schema,
)
from dae.task_graph.graph import Task, TaskGraph
from dae.utils.regions import BedRegion

logger = logging.getLogger(__name__)

# TODO IVAN: not all parsers handle the gene_mapping properly!


class Exon:
    """Provides exon model."""

    def __init__(
        self,
        start: int,
        stop: int,
        frame: Optional[int] = None,
        number: Optional[int] = None,
        cds_start: Optional[int] = None,
        cds_stop: Optional[int] = None,
    ):
        self.start = start
        self.stop = stop
        self.frame = frame  # related to cds

        # for GTF
        self.number = number  # exon number
        self.cds_start = cds_start
        self.cds_stop = cds_stop

    def __repr__(self) -> str:
        return (
            f"Exon(start={self.start}; stop={self.stop}; "
            f"number={self.number})"
        )

class TranscriptModel:
    """Provides transcript model."""

    # pylint: disable=too-many-instance-attributes,too-many-arguments
    def __init__(
        self,
        gene: str,
        tr_id: str,
        tr_name: str,
        chrom: str,
        strand: str,
        tx: tuple[int, int],  # pylint: disable=invalid-name
        cds: tuple[int, int],
        exons: Optional[list[Exon]] = None,
        attributes: Optional[dict[str, Any]] = None,
    ):
        self.gene = gene
        self.tr_id = tr_id
        self.tr_name = tr_name
        self.chrom = chrom
        self.strand = strand
        self.tx = tx  # pylint: disable=invalid-name
        self.cds = cds
        self.exons: list[Exon] = exons if exons is not None else []

        # for GTF
        self.utrs: list[tuple[int, int, int]] = []
        self.start_codon: tuple[int, int, int]
        self.stop_codon: tuple[int, int, int]

        # # it can be derivable from cds' start and end
        # self._is_coding = is_coding
        self.attributes = attributes if attributes is not None else {}

    def is_coding(self) -> bool:
        if self.cds[0] >= self.cds[1]:
            return False
        return True

    def cds_regions(self, ss_extend: int = 0) -> list[BedRegion]:
        """Compute CDS regions."""
        if self.cds[0] >= self.cds[1]:
            return []

        regions = []
        k = 0
        while self.exons[k].stop < self.cds[0]:
            k += 1

        if self.cds[1] <= self.exons[k].stop:
            regions.append(
                BedRegion(
                    chrom=self.chrom, start=self.cds[0], stop=self.cds[1]),
            )
            return regions

        regions.append(
            BedRegion(
                chrom=self.chrom,
                start=self.cds[0],
                stop=self.exons[k].stop + ss_extend,
            ),
        )
        k += 1
        while k < len(self.exons) and self.exons[k].stop <= self.cds[1]:
            if self.exons[k].stop < self.cds[1]:
                regions.append(
                    BedRegion(
                        chrom=self.chrom,
                        start=self.exons[k].start - ss_extend,
                        stop=self.exons[k].stop + ss_extend,
                    ),
                )
                k += 1
            else:
                regions.append(
                    BedRegion(
                        chrom=self.chrom,
                        start=self.exons[k].start - ss_extend,
                        stop=self.exons[k].stop,
                    ),
                )
                return regions

        if k < len(self.exons) and self.exons[k].start <= self.cds[1]:
            regions.append(
                BedRegion(
                    chrom=self.chrom,
                    start=self.exons[k].start - ss_extend,
                    stop=self.cds[1],
                ),
            )
        return regions

    def utr5_regions(self) -> list[BedRegion]:
        """Build list of UTR5 regions."""
        if self.cds[0] >= self.cds[1]:
            return []

        regions = []
        k = 0
        if self.strand == "+":
            while self.exons[k].stop < self.cds[0]:
                regions.append(
                    BedRegion(
                        chrom=self.chrom,
                        start=self.exons[k].start,
                        stop=self.exons[k].stop,
                    ),
                )
                k += 1
            if self.exons[k].start < self.cds[0]:
                regions.append(
                    BedRegion(
                        chrom=self.chrom,
                        start=self.exons[k].start,
                        stop=self.cds[0] - 1,
                    ),
                )
        else:
            while self.exons[k].stop < self.cds[1]:
                k += 1
            if self.exons[k].stop == self.cds[1]:
                k += 1
            else:
                regions.append(
                    BedRegion(
                        chrom=self.chrom,
                        start=self.cds[1] + 1,
                        stop=self.exons[k].stop,
                    ),
                )
                k += 1

            for exon in self.exons[k:]:
                regions.append(
                    BedRegion(
                        chrom=self.chrom, start=exon.start, stop=exon.stop),
                )

        return regions

    def utr3_regions(self) -> list[BedRegion]:
        """Build and return list of UTR3 regions."""
        if self.cds[0] >= self.cds[1]:
            return []

        regions = []
        k = 0
        if self.strand == "-":
            while self.exons[k].stop < self.cds[0]:
                regions.append(
                    BedRegion(
                        chrom=self.chrom,
                        start=self.exons[k].start,
                        stop=self.exons[k].stop,
                    ),
                )
                k += 1
            if self.exons[k].start < self.cds[0]:
                regions.append(
                    BedRegion(
                        chrom=self.chrom,
                        start=self.exons[k].start,
                        stop=self.cds[0] - 1,
                    ),
                )
        else:
            while self.exons[k].stop < self.cds[1]:
                k += 1
            if self.exons[k].stop == self.cds[1]:
                k += 1
            else:
                regions.append(
                    BedRegion(
                        chrom=self.chrom,
                        start=self.cds[1] + 1,
                        stop=self.exons[k].stop,
                    ),
                )
                k += 1

            for exon in self.exons[k:]:
                regions.append(
                    BedRegion(
                        chrom=self.chrom, start=exon.start, stop=exon.stop),
                )

        return regions

    def all_regions(
        self, ss_extend: int = 0, prom: int = 0,
    ) -> list[BedRegion]:
        """Build and return list of regions."""
        # pylint:disable=too-many-branches
        regions = []

        if ss_extend == 0:
            for exon in self.exons:
                regions.append(
                    BedRegion(
                        chrom=self.chrom, start=exon.start, stop=exon.stop),
                )
        else:
            for exon in self.exons:
                if exon.stop <= self.cds[0]:
                    regions.append(
                        BedRegion(
                            chrom=self.chrom,
                            start=exon.start,
                            stop=exon.stop),
                    )
                elif exon.start <= self.cds[0]:
                    if exon.stop >= self.cds[1]:
                        regions.append(
                            BedRegion(
                                chrom=self.chrom,
                                start=exon.start,
                                stop=exon.stop),
                        )
                    else:
                        regions.append(
                            BedRegion(
                                chrom=self.chrom,
                                start=exon.start,
                                stop=exon.stop + ss_extend,
                            ),
                        )
                elif exon.start > self.cds[1]:
                    regions.append(
                        BedRegion(
                            chrom=self.chrom,
                            start=exon.start,
                            stop=exon.stop),
                    )
                else:
                    if exon.stop >= self.cds[1]:
                        regions.append(
                            BedRegion(
                                chrom=self.chrom,
                                start=exon.start - ss_extend,
                                stop=exon.stop,
                            ),
                        )
                    else:
                        regions.append(
                            BedRegion(
                                chrom=self.chrom,
                                start=exon.start - ss_extend,
                                stop=exon.stop + ss_extend,
                            ),
                        )

        if prom != 0:
            if self.strand == "+":
                regions[0] = BedRegion(
                    chrom=regions[0].chrom,
                    start=regions[0].start - prom,
                    stop=regions[0].stop,
                )
            else:
                regions[-1] = BedRegion(
                    chrom=regions[-1].chrom,
                    start=regions[-1].start,
                    stop=regions[-1].stop + prom,
                )

        return regions

    def total_len(self) -> int:
        length = 0
        for reg in self.exons:
            length += reg.stop - reg.start + 1
        return length

    def cds_len(self) -> int:
        regions = self.cds_regions()
        length = 0
        for reg in regions:
            length += reg.stop - reg.start + 1
        return length

    def utr3_len(self) -> int:
        utr3 = self.utr3_regions()
        length = 0
        for reg in utr3:
            length += reg.stop - reg.start + 1
        return length

    def utr5_len(self) -> int:
        utr5 = self.utr5_regions()
        length = 0
        for reg in utr5:
            length += reg.stop - reg.start + 1
        return length

    def calc_frames(self) -> list[int]:
        """Calculate codon frames."""
        length = len(self.exons)
        fms = []

        if self.cds[0] > self.cds[1]:
            fms = [-1] * length
        elif self.strand == "+":
            k = 0
            while self.exons[k].stop < self.cds[0]:
                fms.append(-1)
                k += 1
            fms.append(0)
            if self.exons[k].stop < self.cds[1]:
                fms.append((self.exons[k].stop - self.cds[0] + 1) % 3)
                k += 1
            while self.exons[k].stop < self.cds[1] and k < length:
                fms.append(
                    (fms[k] + self.exons[k].stop - self.exons[k].start + 1)
                    % 3,
                )
                k += 1
            fms += [-1] * (length - len(fms))
        else:
            k = length - 1
            while self.exons[k].start > self.cds[1]:
                fms.append(-1)
                k -= 1
            fms.append(0)
            if self.cds[0] < self.exons[k].start:
                fms.append((self.cds[1] - self.exons[k].start + 1) % 3)
                k -= 1
            while self.cds[0] < self.exons[k].start and k > -1:
                fms.append(
                    (fms[-1] + self.exons[k].stop - self.exons[k].start + 1)
                    % 3,
                )
                k -= 1
            fms += [-1] * (length - len(fms))
            fms = fms[::-1]

        assert len(self.exons) == len(fms)
        return fms

    def update_frames(self) -> None:
        """Update codon frames."""
        frames = self.calc_frames()
        for exon, frame in zip(self.exons, frames):
            exon.frame = frame

    def test_frames(self) -> bool:
        frames = self.calc_frames()
        for exon, frame in zip(self.exons, frames):
            if exon.frame != frame:
                return False
        return True

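# Illustrative helper, not part of the original module: a minimal sketch of
# building a TranscriptModel by hand and querying its derived regions.  The
# gene/transcript names and coordinates are made up; only the constructor and
# method signatures defined above are assumed.
def _example_transcript_model() -> None:
    exons = [Exon(100, 200), Exon(300, 400), Exon(500, 600)]
    tm = TranscriptModel(
        gene="GENE1", tr_id="TR1_1", tr_name="TR1",
        chrom="chr1", strand="+",
        tx=(100, 600), cds=(150, 550), exons=exons,
    )
    tm.update_frames()  # fill Exon.frame from the CDS boundaries
    assert tm.is_coding()
    # CDS split by exon boundaries: (150-200), (300-400), (500-550)
    print(tm.cds_regions())
    # 5' UTR precedes the CDS on the "+" strand: (100-149)
    print(tm.utr5_regions())
    # 3' UTR follows the CDS: (551-600)
    print(tm.utr3_regions())
    print(tm.total_len(), tm.cds_len())
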
@contextmanager
def _open_file(filename: str) -> Generator[IO, None, None]:
    if filename.endswith(".gz") or filename.endswith(".bgz"):
        infile = gzip.open(filename, "rt")
    else:
        infile = open(filename)
    try:
        yield infile
    finally:
        infile.close()

class GeneModelsParser(Protocol):
    """Gene models parser function type."""

    def __call__(
        self,
        infile: IO,
        gene_mapping: Optional[dict[str, str]] = None,
        nrows: Optional[int] = None,
    ) -> bool:
        ...

class GeneModels(
    GenomicResourceImplementation,
    ResourceConfigValidationMixin,
    InfoImplementationMixin,
):
    """Provides class for gene models."""

    def __init__(self, resource: GenomicResource):
        super().__init__(resource)

        self.config = self.validate_and_normalize_schema(
            self.config, resource,
        )

        self.gene_models: dict[str, list[TranscriptModel]] = defaultdict(list)
        self.utr_models: dict[
            str, dict[tuple[int, int], list[TranscriptModel]]] = \
            defaultdict(lambda: defaultdict(list))
        self.transcript_models: dict[str, Any] = {}
        self.alternative_names: dict[str, Any] = {}
        self._reset()

    def _reset(self) -> None:
        self.alternative_names = {}
        self.utr_models = defaultdict(lambda: defaultdict(list))
        self.transcript_models = {}
        self.gene_models = defaultdict(list)

    @property
    def resource_id(self) -> str:
        return self.resource.resource_id

    @property
    def files(self) -> set[str]:
        res = set()
        res.add(self.resource.get_config()["filename"])
        gene_mapping_filename = self.resource.get_config().get("gene_mapping")
        if gene_mapping_filename is not None:
            res.add(gene_mapping_filename)
        return res

    def _add_transcript_model(
        self, transcript_model: TranscriptModel,
    ) -> None:
        assert transcript_model.tr_id not in self.transcript_models

        self.transcript_models[transcript_model.tr_id] = transcript_model
        self.gene_models[transcript_model.gene].append(transcript_model)
        self.utr_models[transcript_model.chrom][transcript_model.tx]\
            .append(transcript_model)

    def update_indexes(self) -> None:
        self.gene_models = defaultdict(list)
        self.utr_models = defaultdict(lambda: defaultdict(list))
        for transcript in self.transcript_models.values():
            self.gene_models[transcript.gene].append(transcript)
            self.utr_models[transcript.chrom][transcript.tx]\
                .append(transcript)

    def gene_names(self) -> list[str]:
        if self.gene_models is None:
            logger.warning(
                "gene models %s are empty", self.resource.resource_id)
            return []

        return list(self.gene_models.keys())

    def gene_models_by_gene_name(
        self, name: str,
    ) -> Optional[list[TranscriptModel]]:
        return self.gene_models.get(name, None)

    def gene_models_by_location(
        self, chrom: str, pos1: int, pos2: Optional[int] = None,
    ) -> list[TranscriptModel]:
        """Retrieve TranscriptModel objects based on genomic position(s).

        Args:
            chrom (str): The chromosome name.
            pos1 (int): The starting genomic position.
            pos2 (Optional[int]): The ending genomic position. If not
                provided, only models that contain pos1 will be returned.

        Returns:
            list[TranscriptModel]: A list of TranscriptModel objects that
                match the given location criteria.
        """
        result = []

        if pos2 is None:
            key: tuple[int, int]
            for key in self.utr_models[chrom]:
                if key[0] <= pos1 <= key[1]:
                    result.extend(self.utr_models[chrom][key])
        else:
            if pos2 < pos1:
                pos1, pos2 = pos2, pos1

            for key in self.utr_models[chrom]:
                if pos1 <= key[0] <= pos2 or key[0] <= pos1 <= key[1]:
                    result.extend(self.utr_models[chrom][key])

        return result

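    # Illustrative usage sketch (hypothetical gene name, chromosome and
    # positions): assuming ``gene_models`` is an already loaded GeneModels
    # instance, the in-memory indexes support lookups by gene name or by
    # genomic location:
    #
    #     transcripts = gene_models.gene_models_by_gene_name("CHD8")
    #     point_hits = gene_models.gene_models_by_location("chr14", 21385000)
    #     interval_hits = gene_models.gene_models_by_location(
    #         "chr14", 21385000, 21395000)
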
    def relabel_chromosomes(
        self, relabel: Optional[dict[str, str]] = None,
        map_file: Optional[str] = None,
    ) -> None:
        """Relabel chromosomes in gene model."""
        assert relabel or map_file
        if not relabel:
            assert map_file is not None
            with open(map_file) as infile:
                relabel = cast(
                    dict[str, str],
                    dict(
                        line.strip("\n\r").split()[:2]
                        for line in infile
                    ),
                )

        self.utr_models = {
            relabel[chrom]: v
            for chrom, v in self.utr_models.items()
            if chrom in relabel
        }

        self.transcript_models = {
            tid: tm
            for tid, tm in self.transcript_models.items()
            if tm.chrom in relabel
        }

        for transcript_model in self.transcript_models.values():
            transcript_model.chrom = relabel[transcript_model.chrom]

    def _save_gene_models(self, outfile: IO) -> None:
        outfile.write(
            "\t".join(
                [
                    "chr",
                    "trID",
                    "trOrigId",
                    "gene",
                    "strand",
                    "tsBeg",
                    "txEnd",
                    "cdsStart",
                    "cdsEnd",
                    "exonStarts",
                    "exonEnds",
                    "exonFrames",
                    "atts",
                ],
            ),
        )
        outfile.write("\n")

        for transcript_model in self.transcript_models.values():
            exon_starts = ",".join([
                str(e.start) for e in transcript_model.exons])
            exon_ends = ",".join([
                str(e.stop) for e in transcript_model.exons])
            exon_frames = ",".join([
                str(e.frame) for e in transcript_model.exons])

            add_atts = ";".join(
                [
                    k + ":" + str(v).replace(":", "_")
                    for k, v in list(transcript_model.attributes.items())
                ],
            )

            columns = [
                transcript_model.chrom,
                transcript_model.tr_id,
                transcript_model.tr_name,
                transcript_model.gene,
                transcript_model.strand,
                transcript_model.tx[0],
                transcript_model.tx[1],
                transcript_model.cds[0],
                transcript_model.cds[1],
                exon_starts,
                exon_ends,
                exon_frames,
                add_atts,
            ]
            outfile.write("\t".join([str(x) if x else "" for x in columns]))
            outfile.write("\n")

    def save(self, output_filename: str, gzipped: bool = True) -> None:
        """Save gene models in a file in default file format."""
        if gzipped:
            if not output_filename.endswith(".gz"):
                output_filename = f"{output_filename}.gz"
            with gzip.open(output_filename, "wt") as outfile:
                self._save_gene_models(outfile)
        else:
            with open(output_filename, "wt") as outfile:
                self._save_gene_models(outfile)

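    # Illustrative sketch of the default file format written by ``save`` and
    # read by ``_parse_default_gene_models_format``: a tab-separated table
    # with the header below and one row per transcript (the values are made
    # up).  Exon starts/ends/frames are comma-separated lists and ``atts``
    # holds ``key:value`` pairs joined with ``;``.
    #
    #   chr  trID   trOrigId gene  strand tsBeg txEnd cdsStart cdsEnd \
    #       exonStarts  exonEnds    exonFrames atts
    #   chr1 TR1_1  TR1      GENE1 +      100   600   150      550    \
    #       100,300,500 200,400,600 0,0,2      gene_biotype:protein_coding
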
    def _parse_default_gene_models_format(
        self,
        infile: IO,
        gene_mapping: Optional[dict[str, str]] = None,
        nrows: Optional[int] = None,
    ) -> bool:
        # pylint: disable=too-many-locals
        # FIXME: too-many-locals
        infile.seek(0)
        df = pd.read_csv(
            infile,
            sep="\t",
            nrows=nrows,
            dtype={
                "chr": str,
                "trID": str,
                "trOrigId": str,
                "gene": str,
                "strand": str,
                "atts": str,
            },
        )

        expected_columns = [
            "chr",
            "trID",
            "gene",
            "strand",
            "tsBeg",
            "txEnd",
            "cdsStart",
            "cdsEnd",
            "exonStarts",
            "exonEnds",
            "exonFrames",
            "atts",
        ]
        assert set(expected_columns) <= set(df.columns)
        if not set(expected_columns) <= set(df.columns):
            return False

        if "trOrigId" not in df.columns:
            tr_names = pd.Series(data=df["trID"].values)
            df["trOrigId"] = tr_names

        records = df.to_dict(orient="records")
        for line in records:
            line = cast(dict, line)
            exon_starts = list(map(int, line["exonStarts"].split(",")))
            exon_ends = list(map(int, line["exonEnds"].split(",")))
            exon_frames = list(map(int, line["exonFrames"].split(",")))
            assert len(exon_starts) == len(exon_ends) == len(exon_frames)

            exons = []
            for start, end, frame in zip(exon_starts, exon_ends, exon_frames):
                exons.append(Exon(start=start, stop=end, frame=frame))
            attributes: dict = {}
            atts = line.get("atts")
            if atts and isinstance(atts, str):
                astep = [a.split(":") for a in atts.split(";")]
                attributes = {
                    a[0]: a[1] for a in astep
                }
            gene = line["gene"]
            if gene_mapping is not None:
                gene = gene_mapping.get(gene, gene)
            transcript_model = TranscriptModel(
                gene=gene,
                tr_id=line["trID"],
                tr_name=line["trOrigId"],
                chrom=line["chr"],
                strand=line["strand"],
                tx=(line["tsBeg"], line["txEnd"]),
                cds=(line["cdsStart"], line["cdsEnd"]),
                exons=exons,
                attributes=attributes,
            )
            self.transcript_models[transcript_model.tr_id] = transcript_model

        self.update_indexes()

        if nrows is not None:
            return True

        return True

    def _parse_ref_flat_gene_models_format(
        self,
        infile: IO,
        gene_mapping: Optional[dict[str, str]] = None,
        nrows: Optional[int] = None,
    ) -> bool:
        # pylint: disable=too-many-locals
        # FIXME:
        expected_columns = [
            "#geneName",
            "name",
            "chrom",
            "strand",
            "txStart",
            "txEnd",
            "cdsStart",
            "cdsEnd",
            "exonCount",
            "exonStarts",
            "exonEnds",
        ]
        infile.seek(0)
        df = self._parse_raw(infile, expected_columns, nrows=nrows)
        if df is None:
            return False

        records = df.to_dict(orient="records")

        transcript_ids_counter: dict[str, int] = defaultdict(int)
        for rec in records:
            gene = rec["#geneName"]
            if gene_mapping:
                gene = gene_mapping.get(gene, gene)
            tr_name = rec["name"]
            chrom = rec["chrom"]
            strand = rec["strand"]
            tx = (  # pylint: disable=invalid-name
                int(rec["txStart"]) + 1, int(rec["txEnd"]))
            cds = (int(rec["cdsStart"]) + 1, int(rec["cdsEnd"]))

            exon_starts = list(map(
                int, str(rec["exonStarts"]).strip(",").split(",")))
            exon_ends = list(map(
                int, str(rec["exonEnds"]).strip(",").split(",")))
            assert len(exon_starts) == len(exon_ends)

            exons = [
                Exon(start + 1, end)
                for start, end in zip(exon_starts, exon_ends)
            ]
            transcript_ids_counter[tr_name] += 1
            tr_id = f"{tr_name}_{transcript_ids_counter[tr_name]}"

            transcript_model = TranscriptModel(
                gene=gene,
                tr_id=tr_id,
                tr_name=tr_name,
                chrom=chrom,
                strand=strand,
                tx=tx,
                cds=cds,
                exons=exons,
            )
            transcript_model.update_frames()
            self._add_transcript_model(transcript_model)

        return True

    def _parse_ref_seq_gene_models_format(
        self,
        infile: IO,
        gene_mapping: Optional[dict[str, str]] = None,
        nrows: Optional[int] = None,
    ) -> bool:
        # FIXME:
        # pylint: disable=too-many-locals
        expected_columns = [
            "#bin",
            "name",
            "chrom",
            "strand",
            "txStart",
            "txEnd",
            "cdsStart",
            "cdsEnd",
            "exonCount",
            "exonStarts",
            "exonEnds",
            "score",
            "name2",
            "cdsStartStat",
            "cdsEndStat",
            "exonFrames",
        ]
        infile.seek(0)
        df = self._parse_raw(infile, expected_columns, nrows=nrows)
        if df is None:
            return False

        records = df.to_dict(orient="records")

        transcript_ids_counter: dict[str, int] = defaultdict(int)
        for rec in records:
            gene = rec["name2"]
            if gene_mapping:
                gene = gene_mapping.get(gene, gene)
            tr_name = rec["name"]
            chrom = rec["chrom"]
            strand = rec["strand"]
            tx = (  # pylint: disable=invalid-name
                int(rec["txStart"]) + 1, int(rec["txEnd"]))
            cds = (int(rec["cdsStart"]) + 1, int(rec["cdsEnd"]))

            exon_starts = list(map(
                int, rec["exonStarts"].strip(",").split(",")))
            exon_ends = list(map(
                int, rec["exonEnds"].strip(",").split(",")))
            assert len(exon_starts) == len(exon_ends)

            exons = [
                Exon(start + 1, end)
                for start, end in zip(exon_starts, exon_ends)
            ]
            transcript_ids_counter[tr_name] += 1
            tr_id = f"{tr_name}_{transcript_ids_counter[tr_name]}"

            attributes = {
                k: rec[k]
                for k in [
                    "#bin",
                    "score",
                    "exonCount",
                    "cdsStartStat",
                    "cdsEndStat",
                    "exonFrames",
                ]
            }
            transcript_model = TranscriptModel(
                gene=gene,
                tr_id=tr_id,
                tr_name=tr_name,
                chrom=chrom,
                strand=strand,
                tx=tx,
                cds=cds,
                exons=exons,
                attributes=attributes,
            )
            transcript_model.update_frames()
            self._add_transcript_model(transcript_model)

        return True

    @classmethod
    def _probe_header(
        cls, infile: IO,
        expected_columns: list[str],
        comment: Optional[str] = None,
    ) -> bool:
        infile.seek(0)
        df = pd.read_csv(
            infile, sep="\t", nrows=1, header=None, comment=comment)
        return list(df.iloc[0, :]) == expected_columns

    @classmethod
    def _probe_columns(
        cls, infile: IO,
        expected_columns: list[str],
        comment: Optional[str] = None) -> bool:
        infile.seek(0)
        df = pd.read_csv(
            infile, sep="\t", nrows=1, header=None, comment=comment)
        return cast(list[int], list(df.columns)) == \
            list(range(len(expected_columns)))

    @classmethod
    def _parse_raw(
        cls, infile: IO,
        expected_columns: list[str],
        nrows: Optional[int] = None,
        comment: Optional[str] = None,
    ) -> Optional[pd.DataFrame]:
        if cls._probe_header(infile, expected_columns, comment=comment):
            infile.seek(0)
            df = pd.read_csv(infile, sep="\t", nrows=nrows, comment=comment)
            assert list(df.columns) == expected_columns
            return df

        if cls._probe_columns(infile, expected_columns, comment=comment):
            infile.seek(0)
            df = pd.read_csv(
                infile, sep="\t", nrows=nrows, header=None,
                names=expected_columns, comment=comment,
            )
            assert list(df.columns) == expected_columns
            return df

        return None

    def _parse_ccds_gene_models_format(
        self,
        infile: IO,
        gene_mapping: Optional[dict[str, str]] = None,
        nrows: Optional[int] = None,
    ) -> bool:
        # FIXME:
        # pylint: disable=too-many-locals
        expected_columns = [
            # CCDS is identical with RefSeq
            "#bin",
            "name",
            "chrom",
            "strand",
            "txStart",
            "txEnd",
            "cdsStart",
            "cdsEnd",
            "exonCount",
            "exonStarts",
            "exonEnds",
            "score",
            "name2",
            "cdsStartStat",
            "cdsEndStat",
            "exonFrames",
        ]

        infile.seek(0)
        df = self._parse_raw(infile, expected_columns, nrows=nrows)
        if df is None:
            return False

        records = df.to_dict(orient="records")

        transcript_ids_counter: dict[str, int] = defaultdict(int)
        self.alternative_names = {}
        if gene_mapping is not None:
            self.alternative_names = copy.deepcopy(gene_mapping)

        for rec in records:
            gene = rec["name"]
            gene = self.alternative_names.get(gene, gene)
            tr_name = rec["name"]
            chrom = rec["chrom"]
            strand = rec["strand"]
            tx = (  # pylint: disable=invalid-name
                int(rec["txStart"]) + 1, int(rec["txEnd"]))
            cds = (int(rec["cdsStart"]) + 1, int(rec["cdsEnd"]))

            exon_starts = list(map(
                int, rec["exonStarts"].strip(",").split(",")))
            exon_ends = list(map(
                int, rec["exonEnds"].strip(",").split(",")))
            assert len(exon_starts) == len(exon_ends)

            exons = [
                Exon(start + 1, end)
                for start, end in zip(exon_starts, exon_ends)
            ]
            transcript_ids_counter[tr_name] += 1
            tr_id = f"{tr_name}_{transcript_ids_counter[tr_name]}"

            attributes = {
                k: rec[k]
                for k in [
                    "#bin",
                    "score",
                    "exonCount",
                    "cdsStartStat",
                    "cdsEndStat",
                    "exonFrames",
                ]
            }
            transcript_model = TranscriptModel(
                gene=gene,
                tr_id=tr_id,
                tr_name=tr_name,
                chrom=chrom,
                strand=strand,
                tx=tx,
                cds=cds,
                exons=exons,
                attributes=attributes,
            )
            transcript_model.update_frames()
            self._add_transcript_model(transcript_model)

        return True

    def _parse_known_gene_models_format(
        self,
        infile: IO,
        gene_mapping: Optional[dict[str, str]] = None,
        nrows: Optional[int] = None,
    ) -> bool:
        # FIXME:
        # pylint: disable=too-many-locals
        expected_columns = [
            "name",
            "chrom",
            "strand",
            "txStart",
            "txEnd",
            "cdsStart",
            "cdsEnd",
            "exonCount",
            "exonStarts",
            "exonEnds",
            "proteinID",
            "alignID",
        ]
        infile.seek(0)
        df = self._parse_raw(infile, expected_columns, nrows=nrows)
        if df is None:
            return False

        records = df.to_dict(orient="records")

        transcript_ids_counter: dict[str, int] = defaultdict(int)
        self.alternative_names = {}
        if gene_mapping is not None:
            self.alternative_names = copy.deepcopy(gene_mapping)

        for rec in records:
            gene = rec["name"]
            gene = self.alternative_names.get(gene, gene)
            tr_name = rec["name"]
            chrom = rec["chrom"]
            strand = rec["strand"]
            tx = (  # pylint: disable=invalid-name
                int(rec["txStart"]) + 1, int(rec["txEnd"]))
            cds = (int(rec["cdsStart"]) + 1, int(rec["cdsEnd"]))

            exon_starts = list(map(
                int, rec["exonStarts"].strip(",").split(",")))
            exon_ends = list(map(
                int, rec["exonEnds"].strip(",").split(",")))
            assert len(exon_starts) == len(exon_ends)

            exons = [
                Exon(start + 1, end)
                for start, end in zip(exon_starts, exon_ends)
            ]
            transcript_ids_counter[tr_name] += 1
            tr_id = f"{tr_name}_{transcript_ids_counter[tr_name]}"

            attributes = {k: rec[k] for k in ["proteinID", "alignID"]}
            transcript_model = TranscriptModel(
                gene=gene,
                tr_id=tr_id,
                tr_name=tr_name,
                chrom=chrom,
                strand=strand,
                tx=tx,
                cds=cds,
                exons=exons,
                attributes=attributes,
            )
            transcript_model.update_frames()
            self._add_transcript_model(transcript_model)

        return True

    def _parse_ucscgenepred_models_format(
        self,
        infile: IO,
        gene_mapping: Optional[dict[str, str]] = None,
        nrows: Optional[int] = None,
    ) -> bool:
        """Parse UCSC gene prediction models file format.

        table genePred
        "A gene prediction."
            (
            string  name;       "Name of gene"
            string  chrom;      "Chromosome name"
            char[1] strand;     "+ or - for strand"
            uint    txStart;    "Transcription start position"
            uint    txEnd;      "Transcription end position"
            uint    cdsStart;   "Coding region start"
            uint    cdsEnd;     "Coding region end"
            uint    exonCount;  "Number of exons"
            uint[exonCount] exonStarts; "Exon start positions"
            uint[exonCount] exonEnds;   "Exon end positions"
            )

        table genePredExt
        "A gene prediction with some additional info."
            (
            string  name;       "Name of gene (usually transcript_id from
                                 GTF)"
            string  chrom;      "Chromosome name"
            char[1] strand;     "+ or - for strand"
            uint    txStart;    "Transcription start position"
            uint    txEnd;      "Transcription end position"
            uint    cdsStart;   "Coding region start"
            uint    cdsEnd;     "Coding region end"
            uint    exonCount;  "Number of exons"
            uint[exonCount] exonStarts; "Exon start positions"
            uint[exonCount] exonEnds;   "Exon end positions"
            int     score;      "Score"
            string  name2;      "Alternate name (e.g. gene_id from GTF)"
            string  cdsStartStat; "Status of CDS start annotation (none,
                                   unknown, incomplete, or complete)"
            string  cdsEndStat;   "Status of CDS end annotation (none,
                                   unknown, incomplete, or complete)"
            lstring exonFrames; "Exon frame offsets {0,1,2}"
            )
        """
        # FIXME:
        # pylint: disable=too-many-locals
        expected_columns = [
            "name",
            "chrom",
            "strand",
            "txStart",
            "txEnd",
            "cdsStart",
            "cdsEnd",
            "exonCount",
            "exonStarts",
            "exonEnds",
            "score",
            "name2",
            "cdsStartStat",
            "cdsEndStat",
            "exonFrames",
        ]
        infile.seek(0)
        df = self._parse_raw(infile, expected_columns[:10], nrows=nrows)
        if df is None:
            infile.seek(0)
            df = self._parse_raw(infile, expected_columns, nrows=nrows)
            if df is None:
                return False

        records = df.to_dict(orient="records")

        transcript_ids_counter: dict[str, int] = defaultdict(int)
        self.alternative_names = {}
        if gene_mapping is not None:
            self.alternative_names = copy.deepcopy(gene_mapping)

        for rec in records:
            gene = rec.get("name2")
            if not gene:
                gene = rec["name"]
            gene = self.alternative_names.get(gene, gene)
            tr_name = rec["name"]
            chrom = rec["chrom"]
            strand = rec["strand"]
            tx = (  # pylint: disable=invalid-name
                int(rec["txStart"]) + 1, int(rec["txEnd"]))
            cds = (int(rec["cdsStart"]) + 1, int(rec["cdsEnd"]))

            exon_starts = list(map(
                int, rec["exonStarts"].strip(",").split(",")))
            exon_ends = list(map(
                int, rec["exonEnds"].strip(",").split(",")))
            assert len(exon_starts) == len(exon_ends)

            exons = [
                Exon(start + 1, end)
                for start, end in zip(exon_starts, exon_ends)
            ]
            transcript_ids_counter[tr_name] += 1
            tr_id = f"{tr_name}_{transcript_ids_counter[tr_name]}"

            attributes = {}
            for attr in expected_columns[10:]:
                if attr in rec:
                    attributes[attr] = rec.get(attr)
            transcript_model = TranscriptModel(
                gene=gene,
                tr_id=tr_id,
                tr_name=tr_name,
                chrom=chrom,
                strand=strand,
                tx=tx,
                cds=cds,
                exons=exons,
                attributes=attributes,
            )
            transcript_model.update_frames()
            self._add_transcript_model(transcript_model)

        return True

    def _parse_gtf_gene_models_format(
        self,
        infile: IO,
        gene_mapping: Optional[dict[str, str]] = None,
        nrows: Optional[int] = None,
    ) -> bool:
        # FIXME:
        # flake8=noqa
        # pylint: disable=too-many-locals,too-many-branches,too-many-statements
        expected_columns = [
            "seqname",
            "source",
            "feature",
            "start",
            "end",
            "score",
            "strand",
            "phase",
            "attributes",
            # "comments",
        ]
        infile.seek(0)
        df = self._parse_raw(
            infile, expected_columns, nrows=nrows, comment="#")
        if df is None:
            expected_columns.append("comment")
            infile.seek(0)
            df = self._parse_raw(
                infile, expected_columns, nrows=nrows, comment="#")
            if df is None:
                return False

        def parse_gtf_attributes(data: str) -> dict[str, str]:
            attributes = list(
                filter(lambda x: x, [a.strip() for a in data.split(";")]),
            )
            result = {}
            for attr in attributes:
                key, value = attr.split(" ")
                result[key.strip()] = value.strip('"').strip()
            return result

        records = df.to_dict(orient="records")

        for rec in records:
            feature = rec["feature"]
            if feature == "gene":
                continue

            attributes = parse_gtf_attributes(rec["attributes"])
            tr_id = attributes["transcript_id"]
            if feature in set(["transcript", "Selenocysteine"]):
                if feature == "Selenocysteine" and \
                        tr_id in self.transcript_models:
                    continue
                if tr_id in self.transcript_models:
                    raise ValueError(
                        f"{tr_id} of {feature} already in transcript models",
                    )
                gene = attributes["gene_name"]
                if gene_mapping:
                    gene = gene_mapping.get(gene, gene)
                transcript_model = TranscriptModel(
                    gene=gene,
                    tr_id=tr_id,
                    tr_name=tr_id,
                    chrom=rec["seqname"],
                    strand=rec["strand"],
                    tx=(rec["start"], rec["end"]),
                    cds=(rec["end"], rec["start"]),
                    attributes=attributes,
                )
                self._add_transcript_model(transcript_model)
                continue

            if feature in {"CDS", "exon"}:
                if tr_id not in self.transcript_models:
                    raise ValueError(
                        f"exon or CDS transcript {tr_id} not found "
                        f"in transcript models",
                    )
                exon_number = int(attributes["exon_number"])
                transcript_model = self.transcript_models[tr_id]
                if len(transcript_model.exons) < exon_number:
                    transcript_model.exons.append(Exon(-1, -1))
                assert len(transcript_model.exons) >= exon_number

                exon = transcript_model.exons[exon_number - 1]
                if feature == "exon":
                    exon.start = rec["start"]
                    exon.stop = rec["end"]
                    exon.frame = -1
                    exon.number = exon_number
                    continue

                if feature == "CDS":
                    exon.cds_start = rec["start"]
                    exon.cds_stop = rec["end"]
                    exon.frame = rec["phase"]
                    # pylint: disable=protected-access
                    # transcript_model._is_coding = True
                    continue

            if feature in {
                    "UTR", "5UTR", "3UTR", "start_codon", "stop_codon"}:
                exon_number = int(attributes["exon_number"])
                transcript_model = self.transcript_models[tr_id]
                if feature in {"UTR", "5UTR", "3UTR"}:
                    transcript_model.utrs.append(
                        (rec["start"], rec["end"], exon_number))
                    continue

                if feature == "start_codon":
                    transcript_model.start_codon = \
                        (rec["start"], rec["end"], exon_number)
                if feature == "stop_codon":
                    transcript_model.stop_codon = \
                        (rec["start"], rec["end"], exon_number)

                cds = transcript_model.cds
                transcript_model.cds = \
                    (min(cds[0], rec["start"]), max(cds[1], rec["end"]))
                continue

            raise ValueError(
                f"unknown feature {feature} found in gtf gene models")

        for transcript_model in self.transcript_models.values():
            transcript_model.exons = sorted(
                transcript_model.exons, key=lambda x: x.start)
            transcript_model.utrs = sorted(
                transcript_model.utrs, key=lambda x: x[0])
            transcript_model.update_frames()

        return True

    @classmethod
    def _gene_mapping(cls, infile: IO) -> dict[str, str]:
        """Load alternative names for genes.

        Assume that its first line has two column names.
        """
        df = pd.read_csv(infile, sep="\t")
        assert len(df.columns) == 2

        df = df.rename(
            columns={df.columns[0]: "tr_id", df.columns[1]: "gene"})

        records = df.to_dict(orient="records")

        alt_names = {}
        for rec in records:
            rec = cast(dict, rec)
            alt_names[rec["tr_id"]] = rec["gene"]

        return alt_names

    SUPPORTED_GENE_MODELS_FILE_FORMATS = {
        "default",
        "refflat",
        "refseq",
        "ccds",
        "knowngene",
        "gtf",
        "ucscgenepred",
    }

    def _get_parser(
        self, fileformat: str,
    ) -> Optional[GeneModelsParser]:
        # pylint: disable=too-many-return-statements
        if fileformat == "default":
            return self._parse_default_gene_models_format
        if fileformat == "refflat":
            return self._parse_ref_flat_gene_models_format
        if fileformat == "refseq":
            return self._parse_ref_seq_gene_models_format
        if fileformat == "ccds":
            return self._parse_ccds_gene_models_format
        if fileformat == "knowngene":
            return self._parse_known_gene_models_format
        if fileformat == "gtf":
            return self._parse_gtf_gene_models_format
        if fileformat == "ucscgenepred":
            return self._parse_ucscgenepred_models_format
        return None

    def _infer_gene_model_parser(
        self,
        infile: IO,
        file_format: Optional[str] = None) -> Optional[str]:

        if file_format is not None:
            parser = self._get_parser(file_format)
            if parser is not None:
                return file_format

        logger.info("going to infer gene models file format...")
        inferred_formats = []
        for inferred_format in self.SUPPORTED_GENE_MODELS_FILE_FORMATS:
            parser = self._get_parser(inferred_format)
            if parser is None:
                continue
            try:
                logger.debug("trying file format: %s...", inferred_format)
                self._reset()
                infile.seek(0)
                res = parser(infile, nrows=50)
                if res:
                    inferred_formats.append(inferred_format)
                    logger.debug(
                        "gene models format %s matches input",
                        inferred_format)
            except Exception as ex:  # pylint: disable=broad-except
                logger.debug(
                    "file format %s does not match; %s",
                    inferred_format, ex, exc_info=True)

        logger.info("inferred file formats: %s", inferred_formats)
        if len(inferred_formats) == 1:
            return inferred_formats[0]

        logger.error(
            "can't find gene model parser; "
            "inferred file formats are %s", inferred_formats)
        return None

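    # Illustrative sketch of the coordinate handling in the parsers above
    # (the record values are made up): refFlat/refSeq/CCDS/knownGene/UCSC
    # genePred files carry 0-based, half-open start coordinates, so txStart,
    # cdsStart and each exon start are converted with ``+ 1``.  A single
    # refFlat record:
    #
    #   #geneName name     chrom strand txStart txEnd cdsStart cdsEnd \
    #       exonCount exonStarts  exonEnds
    #   GENE1     NM_00001 chr1  +      99      600   149      550    \
    #       3         99,299,499, 200,400,600,
    #
    # yields tx=(100, 600), cds=(150, 550) and exons (100-200), (300-400),
    # (500-600), matching the default-format row shown after ``save`` above.
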
    def is_loaded(self) -> bool:
        return len(self.transcript_models) > 0

    def load(self) -> GeneModels:
        """Load gene models."""
        if self.is_loaded():
            logger.info(
                "loading already loaded gene models: %s",
                self.resource.resource_id)
            return self

        filename = self.resource.get_config()["filename"]
        fileformat = self.resource.get_config().get("format", None)
        gene_mapping_filename = self.resource.get_config().get(
            "gene_mapping", None)
        logger.debug("loading gene models %s (%s)", filename, fileformat)

        compression = False
        if filename.endswith(".gz"):
            compression = True
        with self.resource.open_raw_file(
                filename, mode="rt", compression=compression) as infile:

            if fileformat is None:
                fileformat = self._infer_gene_model_parser(infile)
                logger.info(
                    "inferring gene models file format: %s", fileformat)
                if fileformat is None:
                    logger.error(
                        "can't infer gene models file format for "
                        "%s...", self.resource.resource_id)
                    raise ValueError("can't infer gene models file format")

            parser = self._get_parser(fileformat)
            if parser is None:
                logger.error(
                    "Unsupported file format %s for "
                    "gene model file %s.",
                    fileformat, self.resource.resource_id)
                raise ValueError

            gene_mapping = None
            if gene_mapping_filename is not None:
                compression = False
                if gene_mapping_filename.endswith(".gz"):
                    compression = True
                with self.resource.open_raw_file(
                        gene_mapping_filename, "rt",
                        compression=compression) as gene_mapping_file:
                    logger.debug(
                        "loading gene mapping from %s",
                        gene_mapping_filename)
                    gene_mapping = self._gene_mapping(gene_mapping_file)

            infile.seek(0)
            self._reset()
            parser(infile, gene_mapping=gene_mapping)

        return self

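    # Illustrative usage sketch: the resource configuration is expected to
    # provide ``filename`` and, optionally, ``format`` and ``gene_mapping``;
    # when ``format`` is missing the parser is inferred from the file
    # content.  A typical loading pattern (as used in ``_do_statistics``):
    #
    #     gene_models = build_gene_models_from_resource(resource).load()
    #     print(gene_models.gene_names()[:10])
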
    def get_template(self) -> Template:
        return Template(textwrap.dedent("""
            {% extends base %}
            {% block content %}

            <h1>Configuration</h1>
            Gene models file:
            <p><a href="{{ data.config.filename }}">
            {{ data.config.filename }}
            </a></p>
            <p>Format: {{ data.config.format }}</p>

            <h1>Statistics</h1>
            <table>
            {% for stat, value in data.stats.items() %}
                <tr><td> {{ stat }} </td><td> {{ value }} </td></tr>
            {% endfor %}
            </table>

            {% endblock %}
        """))

    def _get_template_data(self) -> dict[str, Any]:
        return {"config": self.config, "stats": self.get_statistics()}

    @staticmethod
    def get_schema() -> dict[str, Any]:
        return {
            **get_base_resource_schema(),
            "filename": {"type": "string"},
            "format": {"type": "string"},
            "gene_mapping": {"type": "string"},
        }

    def get_info(self) -> str:
        return InfoImplementationMixin.get_info(self)

    def calc_info_hash(self) -> bytes:
        return b"placeholder"

    def calc_statistics_hash(self) -> bytes:
        manifest = self.resource.get_manifest()
        return json.dumps({
            "config": {
                "format": self.config.get("format"),
            },
            "files_md5": {
                file_name: manifest[file_name].md5
                for file_name in sorted(self.files)},
        }, indent=2).encode()

    def add_statistics_build_tasks(
        self, task_graph: TaskGraph,
        **kwargs: Any,
    ) -> list[Task]:
        task = task_graph.create_task(
            f"{self.resource_id}_cals_stats",
            GeneModels._do_statistics,
            [self.resource], [])
        return [task]

    @staticmethod
    def _do_statistics(resource: GenomicResource) -> dict[str, int]:
        gene_models = build_gene_models_from_resource(resource).load()
        coding_transcripts = [
            tm for tm in gene_models.transcript_models.values()
            if tm.is_coding()]
        stats = {
            "transcript number": len(gene_models.transcript_models),
            "protein coding transcript number": len(coding_transcripts),
            "gene number": len(gene_models.gene_names()),
            "protein coding gene number": len(
                {tm.gene for tm in coding_transcripts}),
        }
        tm_by_chrom: dict[str, list[TranscriptModel]] = defaultdict(list)
        for trm in gene_models.transcript_models.values():
            tm_by_chrom[trm.chrom].append(trm)
        stats["chromosome number"] = len(tm_by_chrom)
        for chrom, tms in tm_by_chrom.items():
            stats[f"{chrom} transcript numbers"] = len(tms)
        with resource.proto.open_raw_file(
                resource, "statistics/stats.yaml", "wt") as stats_file:
            yaml.dump(stats, stats_file, sort_keys=False)
        return stats

    @lru_cache(maxsize=64)
    def get_statistics(self) -> Optional[dict[str, int]]:  # type: ignore
        try:
            with self.resource.proto.open_raw_file(
                    self.resource, "statistics/stats.yaml",
                    "rt") as stats_file:
                stats = yaml.safe_load(stats_file)
                if not isinstance(stats, dict):
                    logger.error(
                        "The stats.yaml file for the "
                        "%s is invalid (1).", self.resource)
                    return None
                for stat, value in stats.items():
                    if not isinstance(stat, str):
                        logger.error(
                            "The stats.yaml file for the "
                            "%s is invalid (2. %s).", self.resource, stat)
                        return None
                    if not isinstance(value, int):
                        logger.error(
                            "The stats.yaml file for the "
                            "%s is invalid (3. %s).", self.resource, value)
                        return None
                return stats
        except FileExistsError:
            return None

def join_gene_models(*gene_models: GeneModels) -> GeneModels:
    """Join multiple gene models into a single gene models object."""
    if len(gene_models) < 2:
        raise ValueError("The function needs at least 2 arguments!")

    gm = GeneModels(gene_models[0].resource)
    gm.utr_models = {}
    gm.gene_models = {}

    gm.transcript_models = gene_models[0].transcript_models.copy()

    for i in gene_models[1:]:
        gm.transcript_models.update(i.transcript_models)

    gm.update_indexes()

    return gm

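# Illustrative sketch, not part of the original module: joining two gene
# models objects assumes both are already loaded; the result reuses the first
# object's resource and merges transcript models by transcript id.
def _example_join(gm1: GeneModels, gm2: GeneModels) -> GeneModels:
    joined = join_gene_models(gm1, gm2)
    # transcript ids present in both inputs keep the entry from gm2,
    # because the merge is a plain dict update
    assert set(joined.transcript_models) == \
        set(gm1.transcript_models) | set(gm2.transcript_models)
    return joined
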
def build_gene_models_from_file(
    file_name: str,
    file_format: Optional[str] = None,
    gene_mapping_file_name: Optional[str] = None,
) -> GeneModels:
    """Load gene models from local filesystem."""
    config = {
        "type": "gene_models",
        "filename": file_name,
    }
    if file_format:
        config["format"] = file_format
    if gene_mapping_file_name:
        config["gene_mapping"] = gene_mapping_file_name

    res = build_local_resource(".", config)
    return build_gene_models_from_resource(res)


def build_gene_models_from_resource(
        resource: Optional[GenomicResource]) -> GeneModels:
    """Load gene models from a genomic resource."""
    if resource is None:
        raise ValueError(f"missing resource {resource}")

    if resource.get_type() != "gene_models":
        logger.error(
            "trying to open a resource %s of type "
            "%s as gene models",
            resource.resource_id, resource.get_type())
        raise ValueError(f"wrong resource type: {resource.resource_id}")

    gene_models = GeneModels(resource)
    # gene_models.load()

    return gene_models
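
# Illustrative sketch, not part of the original module: loading gene models
# from a local refFlat file and re-saving them in the default format.  The
# file names are hypothetical.
def _example_load_from_file() -> GeneModels:
    gene_models = build_gene_models_from_file(
        "gene_models/refFlat.txt.gz",
        file_format="refflat",
        gene_mapping_file_name=None,
    )
    gene_models.load()
    gene_models.save("gene_models/refFlat-default.txt", gzipped=True)
    return gene_models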