Source code for dae.gene.gene_sets_db

"""Classes for handling of gene sets."""

import abc
import copy
import logging
import os
import textwrap
from functools import cached_property
from typing import Any, Optional

from jinja2 import Template
from markdown2 import markdown

from dae.gene.gene_term import (
    read_ewa_set_file,
    read_gmt_file,
    read_mapping_file,
)
from dae.genomic_resources.fsspec_protocol import build_local_resource
from dae.genomic_resources.repository import GenomicResource
from dae.genomic_resources.resource_implementation import (
    GenomicResourceImplementation,
    InfoImplementationMixin,
    ResourceConfigValidationMixin,
    get_base_resource_schema,
)
from dae.task_graph.graph import Task, TaskGraph

logger = logging.getLogger(__name__)


class GeneSet:
    """Class representing a set of genes.

    Attributes are set once in the constructor:

    * ``name`` - identifier of the gene set;
    * ``desc`` - description of the gene set;
    * ``syms`` - list of gene symbols in the set (stored as passed in,
      not copied);
    * ``count`` - ``len(syms)`` computed at construction time.
    """

    # FIXME: consider using a dataclass
    # pylint: disable=too-few-public-methods

    name: str
    desc: str
    count: int
    syms: list[str]

    # Keys accepted by ``__getitem__``; each one mirrors an attribute.
    _ITEM_KEYS = ("name", "desc", "count", "syms")

    def __init__(self, name: str, desc: str, syms: list[str]) -> None:
        self.name = name
        self.desc = desc
        self.count = len(syms)
        self.syms = syms

    def __getitem__(self, name: str) -> Any:
        """Return the attribute called *name* using dictionary syntax.

        Raises:
            KeyError: if *name* is not one of ``name``, ``desc``, ``count``
                or ``syms``; the offending key is carried by the exception.
        """
        # This is done so that GeneSet instances and
        # denovo gene set dictionaries can be accessed in a uniform way
        if name in self._ITEM_KEYS:
            return getattr(self, name)
        raise KeyError(name)
class BaseGeneSetCollection(abc.ABC):
    """Abstract interface shared by gene set collections.

    Concrete collections must implement both lookup of a single gene set
    by ID and retrieval of all gene sets.
    """

    @abc.abstractmethod
    def get_gene_set(self, gene_set_id: str) -> Optional[GeneSet]:
        """Look up a gene set by its ID; give back None when it is absent."""
        raise NotImplementedError()

    @abc.abstractmethod
    def get_all_gene_sets(self) -> list[GeneSet]:
        """Return every gene set held by the collection as a list."""
        raise NotImplementedError()
class GeneSetCollection(
    GenomicResourceImplementation,
    ResourceConfigValidationMixin,
    InfoImplementationMixin,
    BaseGeneSetCollection,
):
    """Class representing a collection of gene sets in a resource.

    All gene sets are loaded eagerly in the constructor and kept in
    ``self.gene_sets``, a dictionary keyed by gene set name.
    """

    def __init__(self, resource: GenomicResource) -> None:
        """Validate the resource configuration and load its gene sets.

        The collection ID must not be ``"denovo"`` and the resource type
        must be ``"gene_set"``; both are checked with assertions.
        """
        super().__init__(resource)

        self.config = self.validate_and_normalize_schema(
            self.config, resource,
        )
        # The web attributes are read from the resource's own configuration,
        # while the collection ID comes from the validated configuration.
        config = resource.get_config()
        self.collection_id = self.config["id"]
        assert self.collection_id != "denovo"
        assert resource.get_type() == "gene_set", "Invalid resource type"
        self.web_label = config.get("web_label", None)
        self.web_format_str = config.get("web_format_str", None)
        logger.debug("loading %s: %s", self.collection_id, config)
        self.gene_sets: dict[str, GeneSet] = self.load_gene_sets()
        # The loaded gene sets serve as the assertion message.
        assert self.collection_id, self.gene_sets

    @property
    def files(self) -> set[str]:
        """Not implemented for gene set collections."""
        raise NotImplementedError

    def load_gene_sets(self) -> dict[str, GeneSet]:
        """Build a gene set collection from a given GenomicResource.

        The ``format`` entry of the resource configuration selects the
        reader:

        * ``"map"`` - a mapping file, optionally accompanied by a names
          file located next to it;
        * ``"gmt"`` - a single GMT file;
        * ``"directory"`` - every ``.txt`` file from the resource manifest
          whose path starts with the configured directory.

        Returns:
            Dictionary mapping gene set name to ``GeneSet``.

        Raises:
            ValueError: if the configured format is none of the above.
        """
        assert self.resource is not None
        config = self.resource.get_config()
        collection_format = config["format"]
        logger.debug("loading %s: %s", self.collection_id, config)

        if collection_format == "map":
            filename = self.config["filename"]
            # The names file shares the mapping file's name, with the last
            # four characters (the extension) replaced by "names.txt".
            names_filename = filename[:-4] + "names.txt"
            names_file = None
            if self.resource.file_exists(names_filename):
                names_file = self.resource.open_raw_file(names_filename)
            gene_terms = read_mapping_file(
                self.resource.open_raw_file(filename),
                names_file,
            )
        elif collection_format == "gmt":
            filename = config["filename"]
            gene_terms = read_gmt_file(self.resource.open_raw_file(filename))
        elif collection_format == "directory":
            directory = config["directory"]
            if directory == ".":
                directory = ""  # Easier check with startswith
            # NOTE(review): a plain prefix test would also accept files from
            # a sibling directory sharing the prefix (e.g. "sets_old/" for
            # "sets") - confirm whether that can occur in practice.
            filepaths = [
                filepath
                for filepath, _ in self.resource.get_manifest().get_files()
                if filepath.startswith(directory)
                and filepath.endswith(".txt")
            ]
            files = [self.resource.open_raw_file(f) for f in filepaths]
            gene_terms = read_ewa_set_file(files)
        else:
            raise ValueError("Invalid collection format type")

        # tDesc supplies the description for each term and t2G the genes
        # of that term; the keys of t2G[term] become the gene symbols.
        return {
            key: GeneSet(key, desc, list(gene_terms.t2G[key].keys()))
            for key, desc in gene_terms.tDesc.items()
        }

    def get_gene_set(self, gene_set_id: str) -> Optional[GeneSet]:
        """Return the gene set if found; returns None if not found.

        A warning listing the available gene set IDs is logged on a miss.
        """
        gene_set = self.gene_sets.get(gene_set_id)
        if gene_set is None:
            logger.warning(
                "%s not found in %s",
                gene_set_id,
                self.gene_sets.keys(),
            )
        return gene_set

    def get_all_gene_sets(self) -> list[GeneSet]:
        """Return list of all gene sets in the collection."""
        return list(self.gene_sets.values())

    def get_template(self) -> Template:
        """Return the Jinja2 template used to render the resource info."""
        return Template(textwrap.dedent("""
            {% extends base %}
            {% block content %}
            <hr>
            <h2>Gene set ID: {{ data["id"] }}</h2>
            {% if data["format"] == "directory" %}
            <h3>Gene sets directory:</h3>
            <a href="{{ data["directory"] }}">
            {{ data["directory"] }}
            </a>
            {% else %}
            <h3>Gene sets file:</h3>
            <a href="{{ data["filename"] }}">
            {{ data["filename"] }}
            </a>
            {% endif %}
            <p>Format: {{ data["format"] }}</p>
            {% if data["web_label"] %}
            <p>Web label: {{ data["web_label"] }}</p>
            {% endif %}
            {% if data["web_format_str"] %}
            <p>Web format string: {{ data["web_format_str"] }}</p>
            {% endif %}
            {% endblock %}
        """))

    def _get_template_data(self) -> dict:
        """Return a copy of the config with ``meta`` rendered as markdown."""
        # Deep copy so that rendering never alters self.config.
        info = copy.deepcopy(self.config)
        if "meta" in info:
            info["meta"] = markdown(str(info["meta"]))
        return info

    @staticmethod
    def get_schema() -> dict[str, Any]:
        """Return the base resource schema extended with gene set fields."""
        return {
            **get_base_resource_schema(),
            "filename": {"type": "string"},
            "id": {"type": "string"},
            "directory": {"type": "string"},
            "format": {"type": "string"},
            "web_label": {"type": "string"},
            "web_format_str": {"type": "string"},
        }

    def get_info(self) -> str:
        """Return the resource info produced by ``InfoImplementationMixin``."""
        return InfoImplementationMixin.get_info(self)

    def calc_info_hash(self) -> bytes:
        """Return a constant placeholder instead of a real hash."""
        return b"placeholder"

    def calc_statistics_hash(self) -> bytes:
        """Return a constant placeholder instead of a real hash."""
        return b"placeholder"

    def add_statistics_build_tasks(
        self, task_graph: TaskGraph, **kwargs: Any,
    ) -> list[Task]:
        """Add no tasks to the graph; always returns an empty list."""
        return []
# class SqliteGeneSetCollectionDB(
#     GenomicResourceImplementation,
#     ResourceConfigValidationMixin,
#     InfoImplementationMixin
# ):
#     """Collection of gene sets stored in a SQLite database."""

#     def __init__(self, resource):
#         super().__init__(resource)

#         self.config = self.validate_and_normalize_schema(
#             self.config, resource
#         )

#         self.collection_id = self.config["id"]
#         assert self.collection_id != "denovo"
#         assert resource.get_type() == "gene_set", "Invalid resource type"
#         self.web_label = self.config.get("web_label", None)
#         self.web_format_str = self.config.get("web_format_str", None)
#         self.dbfile = self._get_dbfile_path()
#         self.engine = create_engine(f"sqlite:///{self.dbfile}")
#         self.metadata = MetaData(self.engine)
#         self._create_gene_sets_table()

#     def _get_dbfile_path(self) -> str:
#         dbfile = self.config["dbfile"]
#         proto: FsspecReadOnlyProtocol = \
#             cast(FsspecReadOnlyProtocol, self.resource.proto)
#         if not isinstance(proto, FsspecReadOnlyProtocol) \
#                 and proto.scheme != "file":
#             raise ValueError(
#                 "sqlite gene sets are supported only on local filesystem")
#         dbfile_url = proto.get_resource_file_url(self.resource, dbfile)
#         dbfile_path = urlparse(dbfile_url).path
#         return dbfile_path

#     def _create_gene_sets_table(self):
#         self.gene_sets_table = Table(
#             "gene_sets",
#             self.metadata,
#             Column("name", String(), primary_key=True),
#             Column("desc", String()),
#             Column("syms", String()),
#         )

#         self.metadata.create_all(self.engine)

#     def add_gene_set(self, gene_set: GeneSet):
#         """Add a gene set to the database."""
#         with self.engine.begin() as connection:
#             insert_values = {
#                 "name": gene_set.name,
#                 "desc": gene_set.desc,
#                 "syms": ",".join(gene_set.syms)
#             }
#             connection.execute(
#                 insert(self.gene_sets_table).values(insert_values)
#             )
#             connection.commit()

#     def get_gene_set(self, gene_set_id):
#         """Fetch and construct a GeneSet from the database."""
#         table = self.gene_sets_table
#         select = table.select().where(table.c.name == gene_set_id)
#         with self.engine.connect() as connection:
#             row = connection.execute(select).fetchone()
#             gene_set = GeneSet(
#                 row["name"],
#                 row["desc"],
#                 row["syms"].split(",")
#             )
#             return gene_set

#     def get_template(self):
#         return Template(textwrap.dedent("""
#             {% extends base %}
#             {% block content %}
#             <hr>
#             <h3>Gene sets dbfile:</h3>
#             <a href="{{ data["dbfile"] }}">
#             {{ data["dbfile"] }}
#             </a>
#             <p>Format: {{ data["format"] }}</p>
#             {% if data["web_label"] %}
#             <p>Web label: {{ data["web_label"] }}</p>
#             {% endif %}
#             {% if data["web_format_str"] %}
#             <p>Web label: {{ data["web_format_str"] }}</p>
#             {% endif %}
#             {% endblock %}
#         """))

#     def _get_template_data(self):
#         info = copy.deepcopy(self.config)
#         if "meta" in info:
#             info["meta"] = markdown(str(info["meta"]))
#         return info

#     @property
#     def files(self):
#         raise NotImplementedError

#     @staticmethod
#     def get_schema():
#         return {
#             **get_base_resource_schema(),
#             "dbfile": {"type": "string"},
#             "id": {"type": "string"},
#             "format": {"type": "string"},
#             "web_label": {"type": "string"},
#             "web_format_str": {"type": "string"}
#         }

#     def get_info(self):
#         return InfoImplementationMixin.get_info(self)

#     def calc_info_hash(self):
#         return "placeholder"

#     def calc_statistics_hash(self) -> bytes:
#         return b"placeholder"

#     def add_statistics_build_tasks(self, task_graph, **kwargs) -> list[Task]:
#         return []
class GeneSetsDb:
    """Registry of gene set collections, keyed by collection ID."""

    def __init__(self, gene_set_collections: list[GeneSetCollection]) -> None:
        # When two collections share an ID, the later one wins.
        self.gene_set_collections: dict[str, GeneSetCollection] = {}
        for collection in gene_set_collections:
            self.gene_set_collections[collection.collection_id] = collection

    @cached_property
    def collections_descriptions(self) -> list[dict[str, Any]]:
        """Describe every collection that can be shown on the web.

        A collection is included only when both its web label and its web
        format string are set. The format string is split on ``|``. The
        result is computed once and cached on the instance.
        """
        return [
            {
                "desc": collection.web_label,
                "name": collection.collection_id,
                "format": collection.web_format_str.split("|"),
                "types": [],
            }
            for collection in self.gene_set_collections.values()
            if collection.web_label and collection.web_format_str
        ]

    def has_gene_set_collection(self, gsc_id: str) -> bool:
        """Tell whether a collection with the given ID is registered."""
        return gsc_id in self.gene_set_collections

    def get_gene_set_collection_ids(self) -> set[str]:
        """Return the IDs of all registered gene set collections."""
        return set(self.gene_set_collections)

    def get_gene_set_ids(self, collection_id: str) -> set[str]:
        """Return the IDs of the gene sets in the given collection.

        Raises KeyError when the collection ID is not registered.
        """
        collection = self.gene_set_collections[collection_id]
        return set(collection.gene_sets)

    def get_all_gene_sets(self, collection_id: str) -> list[GeneSet]:
        """Return the gene sets of the given collection as a list.

        Raises KeyError when the collection ID is not registered.
        """
        gene_sets = self.gene_set_collections[collection_id].gene_sets
        logger.debug(
            "gene sets from %s: %s", collection_id, len(gene_sets))
        return list(gene_sets.values())

    def get_gene_set(
        self, collection_id: str, gene_set_id: str,
    ) -> Optional[GeneSet]:
        """Look up one gene set inside one collection.

        The lookup is delegated to the collection's own ``get_gene_set``.
        Raises KeyError when the collection ID is not registered.
        """
        collection = self.gene_set_collections[collection_id]
        return collection.get_gene_set(gene_set_id)

    def __len__(self) -> int:
        """Return the number of registered collections."""
        return len(self.gene_set_collections)
def _detect_collection_format(path: str) -> str:
    """Guess the collection format of *path* from its kind and extension."""
    if os.path.isdir(path):
        return "directory"
    known_formats = {".txt": "map", ".gmt": "gmt", ".sql": "sqlite"}
    detected = known_formats.get(os.path.splitext(path)[1])
    if detected is None:
        raise ValueError("Cannot find collection format automatically")
    return detected


def build_gene_set_collection_from_file(
    filename: str,
    collection_id: Optional[str] = None,
    collection_format: Optional[str] = None,
    web_label: Optional[str] = None,
    web_format_str: Optional[str] = None,
) -> GeneSetCollection:
    """Return a Gene Set Collection by adapting a file to a local resource.

    Args:
        filename: path to a gene sets file or to a directory of gene sets.
        collection_id: ID of the collection; defaults to the last component
            of *filename*.
        collection_format: one of ``"map"``, ``"gmt"``, ``"directory"`` or
            ``"sqlite"``. When omitted, a directory yields ``"directory"``
            and the extensions ``.txt``, ``.gmt`` and ``.sql`` yield
            ``"map"``, ``"gmt"`` and ``"sqlite"`` respectively.
        web_label: stored as ``web_label`` in the resource configuration.
        web_format_str: stored as ``web_format_str`` in the configuration.

    Raises:
        ValueError: if the format is not given and cannot be detected.
    """
    # A trailing separator (e.g. "data/sets/") would make the basename - and
    # with it the default collection ID and the directory name - empty.
    separators = os.sep + (os.altsep or "")
    trimmed = filename.rstrip(separators)
    if trimmed:
        filename = trimmed

    dirname = os.path.dirname(filename)
    basename = os.path.basename(filename)

    if collection_format is None:
        collection_format = _detect_collection_format(filename)

    if collection_id is None:
        collection_id = basename

    config = {
        "type": "gene_set",
        "id": collection_id,
        "format": collection_format,
        "web_label": web_label,
        "web_format_str": web_format_str,
    }

    # The configuration key holding the file name depends on the format.
    if collection_format == "directory":
        config["directory"] = basename
    elif collection_format == "sqlite":
        config["dbfile"] = basename
    else:
        config["filename"] = basename

    resource = build_local_resource(dirname, config)
    return build_gene_set_collection_from_resource(resource)
def build_gene_set_collection_from_resource(
    resource: GenomicResource,
) -> GeneSetCollection:
    """Wrap a genomic resource into a Gene Set Collection.

    Raises:
        ValueError: if *resource* is None.
    """
    if resource is not None:
        return GeneSetCollection(resource)
    raise ValueError(f"missing resource {resource}")