Source code for dae.effect_annotation.annotation_request

from __future__ import annotations

import abc
import logging
from typing import Optional, Union

from dae.effect_annotation.gene_codes import NuclearCode
from dae.effect_annotation.variant import Variant
from dae.genomic_resources.gene_models import TranscriptModel
from dae.genomic_resources.reference_genome import ReferenceGenome
from dae.utils.regions import BedRegion

logger = logging.getLogger(__name__)


[docs]class BaseAnnotationRequest(abc.ABC): """Effect annotation request description.""" def __init__( self, reference_genome: ReferenceGenome, code: NuclearCode, promoter_len: int, variant: Variant, transcript_model: TranscriptModel, ): self.reference_genome = reference_genome self.code = code self.promoter_len = promoter_len self.variant = variant self.transcript_model = transcript_model self.__cds_regions: Optional[list[BedRegion]] = None def _clamp_in_cds(self, position: int) -> int: if position < self.transcript_model.cds[0]: return self.transcript_model.cds[0] # if position > self.transcript_model.cds[1]: return self.transcript_model.cds[1] return position
[docs] def get_exonic_length(self) -> int: start = self.transcript_model.exons[0].start end = self.transcript_model.exons[-1].stop return self.get_exonic_distance(start, end) + 1
[docs] def get_exonic_position(self) -> int: start = self.transcript_model.exons[0].start end = self.variant.position return self.get_exonic_distance(start, end) + 1
[docs] def get_exonic_distance(self, start: int, end: int) -> int: """Calculate exonic distance.""" length = 0 for region in self.transcript_model.exons: if region.start <= start <= region.stop: if region.start <= end <= region.stop: return end - start length = region.stop - start + 1 logger.debug( "start len %d %d-%d", length, start, region.stop, ) else: length += region.stop - region.start + 1 logger.debug( "total %d + len %d %d-%d", length, region.stop - region.start + 1, region.stop, region.start - 1, ) if region.start <= end <= region.stop: length -= region.stop - end + 1 logger.debug( "total %d - len %d %d-%d", length, region.stop - end + 1, region.stop, end - 1, ) break return length
@abc.abstractmethod def _get_coding_nucleotide_position(self, position: int) -> int: raise NotImplementedError
[docs] @abc.abstractmethod def get_protein_position(self) -> tuple[int, int]: raise NotImplementedError
[docs] def get_protein_position_for_pos(self, position: int) -> Optional[int]: if position < self.transcript_model.cds[0]: return None if position > self.transcript_model.cds[1]: return None prot_pos = self._get_coding_nucleotide_position(position) return prot_pos // 3 + 1
def _get_sequence(self, start_position: int, end_position: int) -> str: logger.debug("_get_sequence %d-%d", start_position, end_position) if end_position < start_position: return "" return self.reference_genome.get_sequence( self.transcript_model.chrom, start_position, end_position, )
[docs] def cds_regions(self) -> list[BedRegion]: if self.__cds_regions is None: self.__cds_regions = self.transcript_model.cds_regions() return self.__cds_regions
[docs] def get_coding_region_for_pos(self, pos: int) -> Optional[int]: """Find conding region for a position.""" close_match = None for i, reg in enumerate(self.transcript_model.exons): if reg.start <= pos <= reg.stop: return i if reg.start - 1 <= pos <= reg.stop + 1: close_match = i return close_match
[docs] def get_coding_right(self, pos: int, length: int, index: int) -> str: """Construct conding sequence to the right of a position.""" logger.debug( "get_coding_right pos:%d len:%d index:%d", pos, length, index, ) if length <= 0: return "" if index < 0 or pos > self.transcript_model.exons[-1].stop: return self.get_codons_right(pos, length) reg = self.transcript_model.exons[index] if pos == -1: pos = reg.start last_index = min(pos + length - 1, reg.stop) seq = self._get_sequence(pos, last_index) length -= len(seq) return seq + self.get_coding_right(-1, length, index + 1)
[docs] def get_codons_right(self, pos: int, length: int) -> str: """Return coding sequence to the right of a position.""" if length <= 0: return "" last_index = pos + length - 1 seq = self._get_sequence(pos, last_index) return seq
[docs] def get_coding_left(self, pos: int, length: int, index: int) -> str: """Return coding sequence to the left of a position.""" logger.debug( "get_coding_left pos:%d len:%d index:%d", pos, length, index, ) if length <= 0: return "" if index < 0 or ( pos < self.transcript_model.exons[0].start and pos != -1 ): return self.get_codons_left(pos, length) reg = self.transcript_model.exons[index] if pos == -1: pos = reg.stop start_index = max(pos - length + 1, reg.start) seq = self._get_sequence(start_index, pos) length -= len(seq) if index == 0: return self.get_codons_left(reg.start - 1, length) + seq return self.get_coding_left(-1, length, index - 1) + seq
[docs] def get_codons_left(self, pos: int, length: int) -> str: if length <= 0: return "" start_index = pos - length + 1 seq = self._get_sequence(start_index, pos) return seq
[docs] @staticmethod def get_nucleotides_count_to_full_codon(length: int) -> int: return (3 - (length % 3)) % 3
[docs] def cod2aa(self, codon: str) -> str: """Translate codon to amino acid.""" codon = codon.upper() if len(codon) != 3: return "?" for i in codon: if i not in ["A", "C", "T", "G", "N"]: print( "Codon can only contain: A, G, C, T, N letters, \ this codon is: " + codon, ) return "?" for key in self.code.CodonsAaKeys: if codon in self.code.CodonsAa[key]: return key return "?"
[docs] def is_start_codon_affected(self) -> bool: return ( self.variant.position <= self.transcript_model.cds[0] + 2 and self.transcript_model.cds[0] <= self.variant.corrected_ref_position_last )
[docs] def is_stop_codon_affected(self) -> bool: return ( self.variant.position <= self.transcript_model.cds[1] and self.transcript_model.cds[1] - 2 <= self.variant.corrected_ref_position_last )
[docs] def has_utr3_region(self) -> bool: return ( self.transcript_model.exons[-1].stop != self.transcript_model.cds[1] )
[docs] def has_utr5_region(self) -> bool: return ( self.transcript_model.exons[0].start != self.transcript_model.cds[0] )
[docs]class PositiveStrandAnnotationRequest(BaseAnnotationRequest): """Effect annotation request on the positive strand.""" def __init__( self, reference_genome: ReferenceGenome, code: NuclearCode, promoter_len: int, variant: Variant, transcript_model: TranscriptModel, ): super().__init__( reference_genome, code, promoter_len, variant, transcript_model) self.__amino_acids: Optional[ tuple[list[str], list[str]]] = None def _get_coding_nucleotide_position(self, position: int) -> int: length = 0 for region in self.cds_regions(): if region.start - 1 <= position <= region.stop + 1: length += position - region.start break length += region.stop - region.start + 1 logger.debug( "get_coding_nucleotide_position pos=%d len=%d", position, length, ) return length
[docs] def get_protein_position(self) -> tuple[int, int]: """Calculate protein position.""" start_pos = self._clamp_in_cds(self.variant.position) end_pos = self._clamp_in_cds(self.variant.ref_position_last - 1) end_pos = max(start_pos, end_pos) start = self._get_coding_nucleotide_position(start_pos) end = self._get_coding_nucleotide_position(end_pos) return start // 3 + 1, end // 3 + 1
[docs] def get_protein_length(self) -> int: return ( self._get_coding_nucleotide_position(self.transcript_model.cds[1]) // 3 )
[docs] def in_start_codons(self, codon: str) -> bool: """Check if request belongs to the start codon.""" seq = self._get_sequence( self.transcript_model.cds[0], self.transcript_model.cds[0] + 2, ) if codon == seq or codon in self.code.startCodons: return True return False
[docs] def get_frame(self, pos: int, index: int) -> int: """Return the frame of the annotation request.""" reg = self.transcript_model.exons[index] if reg.stop < self.transcript_model.cds[0]: logger.error( "Cannot detect frame. \ Start of coding regions is after current region", ) return 0 start_pos = max(self.transcript_model.cds[0], reg.start) assert reg.frame is not None frame = (pos - start_pos + reg.frame) % 3 logger.debug("frame %d for pos=%s", frame, pos) return frame
[docs] def get_codons(self) -> tuple[str, str]: """Get list of codons.""" pos = max(self.transcript_model.cds[0], self.variant.position) index = self.get_coding_region_for_pos(pos) if index is None: raise IndexError frame = self.get_frame(pos, index) assert self.variant.reference is not None length = len(self.variant.reference) length += self.get_nucleotides_count_to_full_codon(length + frame) coding_before_pos = self.get_coding_left(pos - 1, frame, index) coding_after_pos = self.get_coding_right(pos, length, index) ref_codons = coding_before_pos + coding_after_pos assert self.variant.alternate is not None length_alt = self.get_nucleotides_count_to_full_codon( len(self.variant.alternate) + frame, ) alt_codons = coding_before_pos + self.variant.alternate alt_codons += self.get_coding_right( self.variant.position + len(self.variant.reference), length_alt, index, ) logger.debug( "ref codons=%s, alt codons=%s", ref_codons, alt_codons, ) return ref_codons, alt_codons
[docs] def get_amino_acids( self, ) -> tuple[list[str], list[str]]: """Construct the list of amino acids.""" if self.__amino_acids is None: ref_codons, alt_codons = self.get_codons() ref_amino_acids = [ self.cod2aa(ref_codons[i: i + 3]) for i in range(0, len(ref_codons), 3) ] alt_amino_acids = [ self.cod2aa(alt_codons[i: i + 3]) for i in range(0, len(alt_codons), 3) ] self.__amino_acids = ref_amino_acids, alt_amino_acids return self.__amino_acids
[docs]class NegativeStrandAnnotationRequest(BaseAnnotationRequest): """Effect annotation request on the negative strand.""" def __init__( self, reference_genome: ReferenceGenome, code: NuclearCode, promoter_len: int, variant: Variant, transcript_model: TranscriptModel, ): super().__init__( reference_genome, code, promoter_len, variant, transcript_model) self.__amino_acids: Optional[ tuple[list[str], list[str]]] = None def _get_coding_nucleotide_position(self, position: int) -> int: length = 0 for region in reversed(self.cds_regions()): if region.start - 1 <= position <= region.stop + 1: length += region.stop - position break length += region.stop - region.start + 1 logger.debug( "get_coding_nucleotide_position pos=%d len=%d", position, length, ) return length
[docs] def get_protein_position(self) -> tuple[int, int]: """Calculate the protein position.""" start_pos = self._clamp_in_cds(self.variant.position) end_pos = self._clamp_in_cds(self.variant.ref_position_last - 1) end_pos = max(start_pos, end_pos) end = self._get_coding_nucleotide_position(start_pos) start = self._get_coding_nucleotide_position(end_pos) return start // 3 + 1, end // 3 + 1
[docs] def get_protein_length(self) -> int: return ( self._get_coding_nucleotide_position(self.transcript_model.cds[0]) // 3 )
[docs] def in_start_codons(self, codon: str) -> bool: """Check if request belongs to the start codon.""" seq = self._get_sequence( self.transcript_model.cds[1] - 2, self.transcript_model.cds[1], ) complement_codon = self.complement(codon[::-1]) if codon == seq or complement_codon in self.code.startCodons: return True return False
[docs] def get_frame(self, pos: int, index: int) -> int: """Return the frame of the annotation request.""" reg = self.transcript_model.exons[index] if reg.start > self.transcript_model.cds[1]: logger.error( "Cannot detect frame. \ Start of coding regions is after current region", ) return 0 stop_pos = min(self.transcript_model.cds[1], reg.stop) assert reg.frame is not None frame = (stop_pos - pos + reg.frame) % 3 logger.debug("frame %d for pos=%s", frame, pos) return frame
[docs] def get_codons(self) -> tuple[str, str]: """Get list of codons.""" pos = max(self.variant.position, self.transcript_model.cds[0]) assert self.variant.reference is not None last_position = self.variant.position + len(self.variant.reference) - 1 if pos > last_position + 1: return "", "" index = self.get_coding_region_for_pos(pos) if index is None: raise IndexError frame = self.get_frame(last_position, index) length = last_position - pos + 1 length += self.get_nucleotides_count_to_full_codon(length + frame) logger.debug( "last_position=%d, length=%d index=%d pos=%d cds=%d", last_position, length, index, self.variant.position, self.transcript_model.cds[0], ) coding_before_pos = self.get_coding_left(last_position, length, index) coding_after_pos = self.get_coding_right( last_position + 1, frame, index, ) logger.debug( "coding_before_pos=%s, coding_after_pos=%s frame=%s", coding_before_pos, coding_after_pos, frame, ) ref_codons = coding_before_pos + coding_after_pos assert self.variant.alternate is not None length_alt = self.get_nucleotides_count_to_full_codon( len(self.variant.alternate) + frame, ) alt_codons = self.variant.alternate + coding_after_pos alt_codons = ( self.get_coding_left(self.variant.position - 1, length_alt, index) + alt_codons ) logger.debug( "%d-%d %d-%d->%d-%d %d-%d", last_position, last_position - length, last_position + 1, last_position + 1 + frame, pos, pos - length_alt, last_position + 1, last_position + 1 + frame, ) logger.debug( "ref codons=%s, alt codons=%s", ref_codons, alt_codons, ) return ref_codons, alt_codons
[docs] @staticmethod def complement(sequence: str) -> str: """Build the complementary sequence for a sequence of nucleotides.""" reverse = [] for nucleotide in sequence.upper(): if nucleotide == "A": reverse.append("T") elif nucleotide == "T": reverse.append("A") elif nucleotide == "G": reverse.append("C") elif nucleotide == "C": reverse.append("G") elif nucleotide == "N": reverse.append("N") else: logger.error( "invalid nucleotide: %s in %s", nucleotide, sequence) return "".join(reverse)
[docs] def cod2aa(self, codon: str) -> str: complement_codon = self.complement(codon[::-1]) logger.debug( "complement codon %s for codon %s", complement_codon, codon, ) return BaseAnnotationRequest.cod2aa(self, complement_codon)
[docs] def get_amino_acids( self, ) -> tuple[list[str], list[str]]: """Construct the list of amino acids.""" if self.__amino_acids is None: ref_codons, alt_codons = self.get_codons() ref_amino_acids = [ self.cod2aa(ref_codons[i - 3: i]) for i in range(len(ref_codons), 0, -3) ] alt_amino_acids = [ self.cod2aa(alt_codons[i - 3: i]) for i in range(len(alt_codons), 0, -3) ] self.__amino_acids = ref_amino_acids, alt_amino_acids return self.__amino_acids
[docs] def is_start_codon_affected(self) -> bool: return BaseAnnotationRequest.is_stop_codon_affected(self)
[docs] def is_stop_codon_affected(self) -> bool: return BaseAnnotationRequest.is_start_codon_affected(self)
[docs] def has_utr3_region(self) -> bool: return BaseAnnotationRequest.has_utr5_region(self)
[docs] def has_utr5_region(self) -> bool: return BaseAnnotationRequest.has_utr3_region(self)
AnnotationRequest = Union[ PositiveStrandAnnotationRequest, NegativeStrandAnnotationRequest]