Source code for dae.genomic_resources.reference_genome

from __future__ import annotations

import logging
import os
from collections.abc import Generator
from types import TracebackType
from typing import IO, Any, Optional, Type, cast

from dae.genomic_resources import GenomicResource
from dae.genomic_resources.fsspec_protocol import build_local_resource
from dae.genomic_resources.resource_implementation import (
    ResourceConfigValidationMixin,
    get_base_resource_schema,
)
from dae.utils.regions import Region

logger = logging.getLogger(__name__)


[docs]class ReferenceGenome( ResourceConfigValidationMixin, ): """Provides an interface for quering a reference genome.""" def __init__(self, resource: GenomicResource): self.resource = resource if resource.get_type() != "genome": raise ValueError( f"wrong type of resource passed: {resource.get_type()}") self._index: dict[str, Any] = {} self._chromosomes: list[str] = [] self._sequence: Optional[IO] = None self.pars: dict = self._parse_pars(resource.get_config()) @property def resource_id(self) -> str: return self.resource.resource_id @staticmethod def _parse_pars(config: dict[str, Any]) -> dict: if "PARS" not in config: return {} assert config["PARS"]["X"] is not None regions_x = [ Region.from_str(region) for region in config["PARS"]["X"] ] chrom_x = regions_x[0].chrom result = { chrom_x: regions_x, } if config["PARS"]["Y"] is not None: regions_y = [ Region.from_str(region) for region in config["PARS"]["Y"] ] chrom_y = regions_y[0].chrom result[chrom_y] = regions_y return result @property def chromosomes(self) -> list[str]: """Return a list of all chromosomes of the reference genome.""" return self._chromosomes @property def chrom_prefix(self) -> str: """Return a prefix of all chromosomes of the reference genome.""" chrom = self._chromosomes[0] if chrom.startswith("chr"): return "chr" return "" def _load_genome_index(self) -> None: config = self.resource.get_config() file_name = config["filename"] index_file_name = config.get( "index_file", f"{file_name}.fai") index_content = self.resource.get_file_content(index_file_name) self._parse_genome_index(index_content) def _parse_genome_index(self, index_content: str) -> None: for line in index_content.split("\n"): line = line.strip() if not line: break rec = line.split() self._index[rec[0]] = { "length": int(rec[1]), "startBit": int(rec[2]), "seqLineLength": int(rec[3]), "lineLength": int(rec[4]), } self._chromosomes = list(self._index.keys()) @property def files(self) -> list[str]: config = self.resource.get_config() file_name = config["filename"] index_file_name = config.get("index_file", f"{file_name}.fai") return [file_name, index_file_name]
[docs] def close(self) -> None: """Close reference genome sequence file-like objects."""
# FIXME: consider using weakref to work around this problem # self._sequence.close() # self._sequence = None # self._index = {} # self._chromosomes = []
[docs] def open(self) -> ReferenceGenome: """Open reference genome resources.""" if self.is_open(): logger.info( "opening already opened reference genome %s", self.resource.resource_id) return self self._load_genome_index() config = self.resource.get_config() file_name = config["filename"] self._sequence = self.resource.open_raw_file( file_name, "rb", uncompress=False, seekable=True) return self
[docs] def is_open(self) -> bool: return self._sequence is not None
def __enter__(self) -> ReferenceGenome: return self def __exit__( self, exc_type: Optional[Type[BaseException]], exc_value: Optional[BaseException], exc_tb: Optional[TracebackType], ) -> None: if exc_type is not None: logger.error( "exception while using reference genome: %s, %s, %s", exc_type, exc_value, exc_tb, exc_info=True) try: self.close() except Exception: # pylint: disable=broad-except logger.error( "exception during closing reference genome", exc_info=True)
[docs] def get_chrom_length(self, chrom: str) -> int: """Return the length of a specified chromosome.""" if not self._index: logger.info("genome index not loaded; loading") self._load_genome_index() chrom_data = self._index.get(chrom) if chrom_data is None: raise ValueError(f"can't find chromosome {chrom}") return cast(int, chrom_data["length"])
[docs] def get_all_chrom_lengths(self) -> list[tuple[str, int]]: """Return list of all chromosomes lengths.""" if not self._index: logger.info("genome index not loaded; loading") self._load_genome_index() return [ (key, value["length"]) for key, value in self._index.items()]
[docs] def split_into_regions( self, region_size: int, chromosome: Optional[str] = None, ) -> Generator[Region, None, None]: """ Split the reference genome into regions and yield them. Can specify a specific chromosome to limit the regions to be in that chromosome only. """ if chromosome is None: chromosome_lengths = self.get_all_chrom_lengths() else: chromosome_lengths = [ (chromosome, self.get_chrom_length(chromosome)), ] for chrom, chrom_len in chromosome_lengths: logger.debug( "Chromosome '%s' has length %s", chrom, chrom_len) i = 1 while i < chrom_len - region_size: yield Region(chrom, i, i + region_size - 1) i += region_size yield Region(chrom, i, None)
[docs] def fetch( self, chrom: str, start: int, stop: Optional[int], buffer_size: int = 512, ) -> Generator[str, None, None]: """ Yield the nucleotides in a specific region. While line feed calculation can be inaccurate because not every fetch will start at the start of a line, line feeds add extra characters to read and the output is limited by the amount of nucleotides expected to be read. """ if chrom not in self.chromosomes: logger.warning( "chromosome %s not found in %s", chrom, self.resource.resource_id) return None assert self._sequence is not None self._sequence.seek( self._index[chrom]["startBit"] + start - 1 + (start - 1) // self._index[chrom]["seqLineLength"], ) chrom_length = self.get_chrom_length(chrom) if stop is None: length = chrom_length - start + 1 else: length = min(stop, chrom_length) - start + 1 line_feeds = 1 + length // self._index[chrom]["seqLineLength"] total_length = length + line_feeds read_progress = 0 while read_progress < length: read_length = min(buffer_size, total_length - read_progress) sequence = self._sequence.read(read_length).decode("ascii") sequence = sequence.replace("\n", "").upper() end = min(read_progress + read_length, length - read_progress) sequence = sequence[:end] for nuc in sequence: yield nuc read_progress += len(sequence) return None
[docs] def get_sequence(self, chrom: str, start: int, stop: int) -> str: """Return sequence of nucleotides from specified chromosome region.""" return "".join(self.fetch(chrom, start, stop))
[docs] def is_pseudoautosomal(self, chrom: str, pos: int) -> bool: """Return true if specified position is pseudoautosomal.""" def in_any_region( chrom: str, pos: int, regions: list[Region]) -> bool: return any(map(lambda reg: reg.isin(chrom, pos), regions)) pars_regions = self.pars.get(chrom, None) if pars_regions: return in_any_region( chrom, pos, pars_regions) return False
[docs] @staticmethod def get_schema() -> dict[str, Any]: return { **get_base_resource_schema(), "filename": {"type": "string"}, "chrom_prefix": {"type": "string"}, "PARS": {"type": "dict", "schema": { "X": {"type": "list", "schema": {"type": "string"}}, "Y": {"type": "list", "schema": {"type": "string"}}, }}, }
[docs]def build_reference_genome_from_file(filename: str) -> ReferenceGenome: """Open a reference genome from a file.""" dirname = os.path.dirname(filename) basename = os.path.basename(filename) res = build_local_resource(dirname, { "type": "genome", "filename": basename, }) return build_reference_genome_from_resource(res)
[docs]def build_reference_genome_from_resource( resource: GenomicResource) -> ReferenceGenome: """Open a reference genome from resource.""" if resource.get_type() != "genome": logger.error( "trying to open a resource %s of type " "%s as reference genome", resource.resource_id, resource.get_type()) raise ValueError(f"wrong resource type: {resource.resource_id}") ref = ReferenceGenome(resource) return ref