Source code for dae.genomic_resources.histogram

"""Handling of genomic scores statistics.

Currently we support only genomic scores histograms.
"""
from __future__ import annotations

import copy
import logging
from collections import Counter
from dataclasses import dataclass
from typing import IO, Any, Optional, Union, cast

import numpy as np
import yaml

from dae.genomic_resources.repository import GenomicResource
from dae.genomic_resources.statistics.base_statistic import Statistic
from dae.genomic_resources.statistics.min_max import MinMaxValue

logger = logging.getLogger(__name__)


class HistogramError(BaseException):
    """Signal a failure that is specific to histogram handling.

    A histogram on which this error was raised is expected to be
    nullified by the caller.
    """

    # NOTE: the base class is BaseException, so handlers written as
    # ``except Exception`` do not intercept this error.
@dataclass
class NumberHistogramConfig:
    """Configuration class for number histograms."""

    # (min, max) of the displayed value range; either end may be unset.
    view_range: tuple[Optional[float], Optional[float]]
    # NOTE: the dataclass default is 30 bins, while from_dict() and
    # default_config() fall back to 100 bins when the value is unspecified.
    number_of_bins: int = 30
    x_log_scale: bool = False
    y_log_scale: bool = False
    # Required by NumberHistogram when x_log_scale is enabled.
    x_min_log: Optional[float] = None

    def has_view_range(self) -> bool:
        """Return True when both ends of the view range are set."""
        return self.view_range[0] is not None and \
            self.view_range[1] is not None

    def to_dict(self) -> dict[str, Any]:
        """Return the configuration as a plain, serializable dictionary."""
        return {
            "type": "number",
            "view_range": {
                "min": self.view_range[0],
                "max": self.view_range[1],
            },
            "number_of_bins": self.number_of_bins,
            "x_log_scale": self.x_log_scale,
            "y_log_scale": self.y_log_scale,
            "x_min_log": self.x_min_log,
        }

    @staticmethod
    def from_dict(parsed: dict[str, Any]) -> NumberHistogramConfig:
        """Build a number histogram config from a parsed yaml file.

        Missing keys fall back to defaults: an unset view range, 100 bins
        and linear scales.

        Raises:
            TypeError: when the ``type`` entry is not ``"number"``.
        """
        hist_type = parsed.get("type")
        if hist_type != "number":
            logger.error(
                "Invalid configuration type (%s)"
                " for number histogram!\n%s",
                hist_type, parsed,
            )
            raise TypeError(
                "Invalid configuration for number histogram!\n"
                f"{parsed}",
            )

        # An explicit ``view_range: null`` is treated like a missing
        # section instead of failing on ``None.get``.
        yaml_range = parsed.get("view_range") or {}
        view_range = (yaml_range.get("min", None), yaml_range.get("max", None))

        number_of_bins = parsed.get("number_of_bins", 100)
        x_log_scale = parsed.get("x_log_scale", False)
        y_log_scale = parsed.get("y_log_scale", False)
        x_min_log = parsed.get("x_min_log")
        return NumberHistogramConfig(
            view_range,
            number_of_bins,
            x_log_scale,
            y_log_scale,
            x_min_log,
        )

    @staticmethod
    def default_config(
        min_max: Optional[MinMaxValue],
    ) -> NumberHistogramConfig:
        """Build a default number histogram config from a min/max statistic.

        The view range is taken from ``min_max``; it is left unset when
        ``min_max`` is None. The default uses 100 bins and linear scales.
        """
        if min_max is None:
            view_range: tuple[Optional[float], Optional[float]] = (None, None)
        elif min_max.min == min_max.max:
            # A single distinct value would give a zero-width range, so the
            # upper end is pushed one unit above the minimum.
            view_range = (min_max.min, min_max.min + 1.0)
        else:
            view_range = (min_max.min, min_max.max)

        number_of_bins = 100
        x_log_scale = False
        y_log_scale = False
        return NumberHistogramConfig(
            view_range, number_of_bins, x_log_scale, y_log_scale)
@dataclass
class CategoricalHistogramConfig:
    """Settings that control how a categorical histogram is presented."""

    # Preferred order of the values; None or empty means no preference.
    value_order: Optional[list[str]] = None
    y_log_scale: bool = False

    def to_dict(self) -> dict[str, Any]:
        """Return the configuration as a plain dictionary."""
        as_dict: dict[str, Any] = {"type": "categorical"}
        as_dict["value_order"] = self.value_order
        as_dict["y_log_scale"] = self.y_log_scale
        return as_dict

    @staticmethod
    def default_config() -> CategoricalHistogramConfig:
        """Return a configuration with an empty value order."""
        return CategoricalHistogramConfig([])

    @staticmethod
    def from_dict(parsed: dict[str, Any]) -> CategoricalHistogramConfig:
        """Create a categorical histogram config from a configuration dict.

        Raises:
            TypeError: when the ``type`` entry is not ``"categorical"``.
        """
        if parsed.get("type") != "categorical":
            raise TypeError(
                "Invalid configuration type for categorical histogram!\n"
                f"{parsed}",
            )

        return CategoricalHistogramConfig(
            value_order=parsed.get("value_order", []),
            y_log_scale=parsed.get("y_log_scale", False),
        )
@dataclass
class NullHistogramConfig:
    """Configuration of a null histogram; it only records a reason."""

    reason: str

    def to_dict(self) -> dict[str, Any]:
        """Return the configuration as a plain dictionary."""
        return dict(type="null", reason=self.reason)

    @staticmethod
    def default_config() -> NullHistogramConfig:
        """Return a configuration with a generic reason."""
        return NullHistogramConfig(reason="Unspecified reason")

    @staticmethod
    def from_dict(parsed: dict[str, Any]) -> NullHistogramConfig:
        """Create a null histogram config from a configuration dict.

        Raises:
            TypeError: when the ``type`` entry is not ``"null"``.
        """
        if parsed.get("type") != "null":
            raise TypeError(
                "Invalid configuration type for null histogram!\n"
                f"{parsed}",
            )

        return NullHistogramConfig(
            reason=parsed.get("reason", "Unspecified reason"),
        )
class NumberHistogram(Statistic):
    """Histogram of numerical values over a fixed set of bins.

    ``bins`` holds the bin edges and ``bars`` the per-bin counts. Values
    outside the configured view range are counted separately in
    ``out_of_range_bins`` as ``[below, above]``.
    """

    type = "number_histogram"

    def __init__(
        self, config: NumberHistogramConfig,
        bins: Optional[np.ndarray] = None,
        bars: Optional[np.ndarray] = None,
    ):
        """Create an empty histogram or restore one from bins and bars.

        Args:
            config: histogram configuration; both ends of its view range
                must be set.
            bins: bin edges of a previously built histogram.
            bars: bin counts of a previously built histogram.

        Raises:
            ValueError: when the log scale is requested without
                ``x_min_log``, when the view range is incomplete, or when
                only one of ``bins``/``bars`` is given.
        """
        super().__init__("histogram", "Collects values for histogram.")
        logger.debug("number histogram config: %s", config)
        assert isinstance(config, NumberHistogramConfig)
        self.config = config
        self.out_of_range_values: list[float] = []
        self.out_of_range_bins: list[int] = [0, 0]
        # NaN marks "no value seen yet".
        self.min_value: float = np.nan
        self.max_value: float = np.nan

        if self.config.x_log_scale and self.config.x_min_log is None:
            raise ValueError(
                "Invalid histogram configuration, missing x_min_log",
            )

        if self.config.view_range[0] is None or \
                self.config.view_range[1] is None:
            logger.error(
                "unexpected min/max value: [%s, %s]",
                self.config.view_range[0], self.config.view_range[1])
            raise ValueError(
                "unexpected min/max value:"
                f"[{self.config.view_range[0]}, "
                f"{self.config.view_range[1]}]")

        if bins is not None and bars is not None:
            self.bins = bins
            self.bars = bars
            # Restored histograms also need the bin-lookup factor so that
            # add_value() works on them. A degenerate view range must not
            # prevent loading, so a failure here leaves the factor as NaN.
            try:
                self._rstep = self._compute_rstep()
            except (ArithmeticError, TypeError, ValueError):
                self._rstep = np.nan
        elif bins is None and bars is None:
            if self.config.x_log_scale:
                assert self.config.x_min_log is not None
                # First edge is the view minimum; the remaining edges are
                # log-spaced between x_min_log and the view maximum.
                self.bins = np.array([
                    self.config.view_range[0],
                    * np.logspace(
                        np.log10(self.config.x_min_log),
                        np.log10(self.config.view_range[1]),
                        self.config.number_of_bins,
                    )])
            else:
                self.bins = np.linspace(
                    self.config.view_range[0],
                    self.config.view_range[1],
                    self.config.number_of_bins + 1,
                )
            self._rstep = self._compute_rstep()
            self.bars = np.zeros(self.config.number_of_bins, dtype=np.int64)
            assert not np.any(np.isnan(self.bins)), ("nan bins", self.config)
        else:
            # Exactly one of bins/bars was supplied.
            raise ValueError(
                "Cannot instantiate histogram with only bins or only bars!",
            )

    def _compute_rstep(self) -> float:
        """Return the factor that maps a value offset to a bin index."""
        if self.config.x_log_scale:
            assert self.config.x_min_log is not None
            return (self.config.number_of_bins - 1) / \
                (np.log10(self.view_max()) - np.log10(self.config.x_min_log))
        return self.config.number_of_bins / \
            (self.view_max() - self.view_min())

    def view_min(self) -> float:
        """Return the lower end of the view range.

        Raises:
            ValueError: when the lower end is None or NaN.
        """
        if self.config.view_range[0] is None \
                or np.isnan(self.config.view_range[0]):
            raise ValueError("view range min value not set")
        return self.config.view_range[0]

    def view_max(self) -> float:
        """Return the upper end of the view range.

        Raises:
            ValueError: when the upper end is None or NaN.
        """
        if self.config.view_range[1] is None \
                or np.isnan(self.config.view_range[1]):
            raise ValueError("view range max value not set")
        return self.config.view_range[1]

    def merge(self, other: Statistic) -> None:
        """Merge another number histogram into this one, in place.

        Configurations are not compared; only the bin edges must agree
        (within a relative tolerance of 1e-5).
        """
        assert isinstance(other, NumberHistogram)
        assert self.bins is not None and self.bars is not None
        assert other.bins is not None and other.bars is not None
        assert np.allclose(self.bins, other.bins, rtol=1e-5), \
            (self.bins, other.bins)

        self.bars += other.bars
        self.out_of_range_bins[0] += other.out_of_range_bins[0]
        self.out_of_range_bins[1] += other.out_of_range_bins[1]

        # Comparisons with NaN are always False, so the argument order
        # below decides which operand survives when one side is still NaN.
        if np.isnan(self.min_value):
            self.min_value = min(other.min_value, self.min_value)
        else:
            self.min_value = min(self.min_value, other.min_value)

        if np.isnan(self.max_value):
            self.max_value = max(other.max_value, self.max_value)
        else:
            self.max_value = max(self.max_value, other.max_value)

    @property
    def view_range(self) -> tuple[Optional[float], Optional[float]]:
        """Return the configured view range."""
        return self.config.view_range

    def values_domain(self) -> str:
        """Return the observed [min, max] formatted with three decimals."""
        return f"[{self.min_value:0.3f}, {self.max_value:0.3f}]"

    def add_value(self, value: Optional[float]) -> None:
        """Add value to the histogram.

        None and NaN are ignored. Values outside the view range update
        the observed min/max and the out-of-range counters only.

        Raises:
            TypeError: when the value is not numerical.
        """
        if value is None or np.isnan(value):
            return
        # np.floating is accepted so that numpy floats which do not
        # subclass the builtin float (e.g. np.float32) are not rejected.
        if not isinstance(value, (int, float, np.integer, np.floating)):
            raise TypeError(
                "Cannot add non numerical value "
                f"{value} ({type(value)}) to number histogram",
            )

        # With NaN as the second argument min()/max() return ``value``.
        self.min_value = min(value, self.min_value)
        self.max_value = max(value, self.max_value)

        if self.config.x_log_scale:
            index = self.choose_bin_log(value)
        else:
            index = self.choose_bin_lin(value)

        if index < 0:
            logger.warning(
                "out of range %s value %s", self.view_range, value)
            # -2 (below range) -> slot 0, -1 (above range) -> slot 1.
            tindex = index + 2
            self.out_of_range_bins[tindex] += 1
            return

        self.bars[index] += 1

    def choose_bin_lin(self, value: float) -> int:
        """Compute bin index for a passed value for linear x-scale.

        Returns -2 for values below and -1 for values above the view range.
        """
        if value < self.view_min():
            return -2
        if value > self.view_max():
            return -1

        index = int((value - self.view_min()) * self._rstep)
        # The view maximum itself belongs to the last bin.
        return min(index, self.config.number_of_bins - 1)

    def choose_bin_log(self, value: float) -> int:
        """Compute bin index for a passed value for log x-scale.

        Returns -2 for values below and -1 for values above the view range.
        """
        assert self.config.x_log_scale
        assert self.config.x_min_log is not None

        if value < self.view_min():
            return -2
        if value > self.view_max():
            return -1

        # Bin 0 collects everything between the view minimum and x_min_log.
        if value < self.config.x_min_log:
            return 0

        index = int(
            (np.log10(value) - np.log10(self.config.x_min_log))
            * self._rstep) + 1
        return min(index, self.config.number_of_bins - 1)

    def to_dict(self) -> dict[str, Any]:
        """Return the histogram as a plain, serializable dictionary."""
        return {
            "config": self.config.to_dict(),
            "bins": self.bins.tolist(),
            "bars": self.bars.tolist(),
            "out_of_range_bins": self.out_of_range_bins,
            "min_value": float(self.min_value),
            "max_value": float(self.max_value),
        }

    def serialize(self) -> str:
        """Return the histogram serialized as YAML."""
        return cast(str, yaml.dump(self.to_dict()))

    def plot(self, outfile: IO, score_id: str) -> None:
        """Plot histogram and save it into outfile."""
        # pylint: disable=import-outside-toplevel
        import matplotlib
        matplotlib.use("agg")
        import matplotlib.pyplot as plt

        # Each bar spans from its left edge to the next edge.
        width = self.bins[1:] - self.bins[:-1]
        plt.bar(
            x=self.bins[:-1], height=self.bars,
            log=self.config.y_log_scale,
            width=width,
            align="edge")

        if self.config.x_log_scale:
            plt.xscale("log")
        plt.xlabel(score_id)
        plt.ylabel("count")

        plt.grid(axis="y")
        plt.grid(axis="x")

        plt.savefig(outfile)
        plt.clf()

    @staticmethod
    def from_dict(data: dict[str, Any]) -> NumberHistogram:
        """Build a number histogram from a dict."""
        config = NumberHistogramConfig.from_dict(data["config"])
        hist = NumberHistogram(
            config,
            bins=np.array(data.get("bins")),
            bars=np.array(data.get("bars")),
        )

        hist.min_value = data.get("min_value", np.nan)
        hist.max_value = data.get("max_value", np.nan)
        hist.out_of_range_bins = data.get("out_of_range_bins", [0, 0])

        return hist

    @staticmethod
    def deserialize(content: str) -> NumberHistogram:
        """Build a number histogram from its YAML serialization."""
        # NOTE(security): yaml.Loader can construct arbitrary Python
        # objects; this must only be used on trusted content. Consider
        # yaml.safe_load if histogram files may come from untrusted sources.
        data = yaml.load(content, yaml.Loader)
        return NumberHistogram.from_dict(data)
class HistogramStatisticMixin:
    """Mixin providing histogram file names for statistics classes."""

    @staticmethod
    def get_histogram_file(score_id: str) -> str:
        """Return the name of the YAML file storing the score histogram."""
        return "histogram_{}.yaml".format(score_id)

    @staticmethod
    def get_histogram_image_file(score_id: str) -> str:
        """Return the name of the PNG file storing the histogram image."""
        return "histogram_{}.png".format(score_id)
class NullHistogram(Statistic):
    """Placeholder used in place of an invalid or annulled histogram.

    It ignores added values and merges, and only remembers the reason
    it was created for.
    """

    type = "null_histogram"

    def __init__(self, config: Optional[NullHistogramConfig]) -> None:
        super().__init__(
            "null_histogram",
            "Used for invalid/annulled histograms",
        )
        # Without an explicit configuration the default reason is used.
        effective = config
        if effective is None:
            effective = NullHistogramConfig.default_config()
        self.reason = effective.reason

    def add_value(self, value: Any) -> None:
        """Ignore the value."""
        return

    def merge(self, other: Any) -> None:
        """Ignore the other histogram."""
        return

    def to_dict(self) -> dict[str, Any]:
        """Return the histogram as a plain dictionary."""
        config_section = {
            "type": "null",
            "reason": self.reason,
        }
        return {"config": config_section}

    def values_domain(self) -> str:
        """Return a fixed marker, as a null histogram has no domain."""
        return "NO DOMAIN"

    # pylint: disable=unused-argument
    def plot(self, outfile: IO, score_id: str) -> None:
        """Do nothing; a null histogram is not plotted."""
        return

    def serialize(self) -> str:
        """Return the histogram serialized as YAML."""
        dumped = yaml.dump(self.to_dict())
        return cast(str, dumped)

    @staticmethod
    def from_dict(data: dict[str, Any]) -> NullHistogram:
        """Build a null histogram from a dict.

        Raises:
            TypeError: when the config ``type`` entry is not ``"null"``.
        """
        config = data["config"]
        if config.get("type") != "null":
            raise TypeError(
                f"Invalid configuration type for null histogram!\n{data}",
            )
        null_config = NullHistogramConfig(reason=config.get("reason", ""))
        return NullHistogram(null_config)

    @staticmethod
    def deserialize(content: str) -> NullHistogram:
        """Build a null histogram from its YAML serialization."""
        return NullHistogram.from_dict(yaml.load(content, yaml.Loader))
class CategoricalHistogram(Statistic):
    """Class for categorical data histograms.

    Counts occurrences of string values; at most ``VALUES_LIMIT``
    distinct values are supported.
    """

    type = "categorical_histogram"
    VALUES_LIMIT = 100

    # pylint: disable=too-few-public-methods
    def __init__(
        self, config: CategoricalHistogramConfig,
        values: Optional[dict[str, int]] = None,
    ):
        """Create a histogram, optionally seeded with existing counts."""
        super().__init__(
            "categorical_histogram",
            "Collects values for categorical histogram.",
        )
        self.config = config
        self._values: Counter = \
            Counter(values) if values is not None else Counter()
        self.y_log_scale = config.y_log_scale
        # Cache of the ordered bars; reset whenever the counts change.
        self._bars: Optional[dict[str, int]] = None

    def add_value(self, value: Optional[str]) -> None:
        """Add a value to the categorical histogram.

        None is ignored. The value is counted before the limit is checked,
        so it is already recorded when the error is raised.

        Raises:
            TypeError: when the value is not a string.
            HistogramError: when the number of distinct values exceeds
                ``VALUES_LIMIT``.
        """
        self._bars = None
        if value is None:
            return
        if not isinstance(value, str):
            raise TypeError(
                "Cannot add non string value "
                f"{value} to categorical histogram",
            )
        self._values[value] += 1
        if len(self._values) > CategoricalHistogram.VALUES_LIMIT:
            raise HistogramError(
                f"Too many values already present to add {value}"
                " to categorical histogram.",
            )

    def merge(self, other: Statistic) -> None:
        """Merge with other histogram.

        Raises:
            HistogramError: when the merged histogram holds more than
                ``VALUES_LIMIT`` distinct values.
        """
        assert isinstance(other, CategoricalHistogram)
        assert self.config == other.config
        self._bars = None
        self._values += other._values  # pylint: disable=protected-access
        if len(self._values) > CategoricalHistogram.VALUES_LIMIT:
            raise HistogramError(
                "Can not merge categorical histograms; too many unique values")

    @property
    def bars(self) -> dict[str, int]:
        """Return categorical histogram bars in order.

        Values listed in the configured ``value_order`` come first (with
        a zero count when never seen), followed by the remaining values
        from the most to the least common.
        """
        if self._bars is None:
            values = {}
            if self.config.value_order:
                for key in self.config.value_order:
                    values[key] = self._values[key]
            for key, count in self._values.most_common():
                if key not in values:
                    values[key] = count
            self._bars = values

        return self._bars

    def values_domain(self) -> str:
        """Return the bar labels joined with commas."""
        return ", ".join(self.bars.keys())

    def to_dict(self) -> dict[str, Any]:
        """Return the histogram as a plain, serializable dictionary."""
        return {
            "config": self.config.to_dict(),
            "values": self.bars,
        }

    def serialize(self) -> str:
        """Return the histogram serialized as YAML."""
        return cast(str, yaml.dump(self.to_dict()))

    @staticmethod
    def from_dict(data: dict[str, Any]) -> CategoricalHistogram:
        """Build a categorical histogram from a dict."""
        config = CategoricalHistogramConfig.from_dict(data["config"])
        return CategoricalHistogram(config, data.get("values"))

    @staticmethod
    def deserialize(content: str) -> CategoricalHistogram:
        """Build a categorical histogram from its YAML serialization."""
        # NOTE(security): yaml.Loader can construct arbitrary Python
        # objects; this must only be used on trusted content. Consider
        # yaml.safe_load if histogram files may come from untrusted sources.
        data = yaml.load(content, yaml.Loader)
        return CategoricalHistogram.from_dict(data)

    def plot(self, outfile: IO, score_id: str) -> None:
        """Plot histogram and save it into outfile."""
        # pylint: disable=import-outside-toplevel
        import matplotlib
        matplotlib.use("agg")
        import matplotlib.pyplot as plt

        bars = self.bars
        figure = plt.figure(figsize=(15, 10), tight_layout=True)
        try:
            # Honor the configured y-axis scale, as number histograms do.
            plt.bar(
                list(bars.keys()), list(bars.values()),
                log=self.config.y_log_scale)
            plt.xlabel(score_id)
            plt.ylabel("count")
            plt.tick_params(axis="x", labelrotation=90)
            plt.savefig(outfile)
        finally:
            # A new figure is created on every call; close it so that
            # repeated plotting does not accumulate open figures.
            plt.close(figure)
def build_histogram_config(
        config: Optional[dict[str, Any]]) -> Optional[HistogramConfig]:
    """Create a histogram config from a configuration dict.

    The ``histogram`` section is preferred; otherwise the first present of
    the ``number_hist``, ``categorical_hist`` and ``null_hist`` sections is
    used. Returns None when ``config`` is None or has none of these
    sections, and a null config when the histogram type is not recognized.
    """
    if config is None:
        return None

    if "histogram" in config:
        hist_config = config["histogram"]
        hist_type = hist_config["type"]
    else:
        # The type is implied by the section name; it is written into a
        # shallow copy so the caller's dict stays untouched.
        typed_sections = (
            ("number_hist", "number"),
            ("categorical_hist", "categorical"),
            ("null_hist", "null"),
        )
        for section, hist_type in typed_sections:
            if section not in config:
                continue
            hist_config = copy.copy(config[section])
            hist_config["type"] = hist_type
            break
        else:
            return None

    if hist_type == "number":
        return NumberHistogramConfig.from_dict(hist_config)
    if hist_type == "categorical":
        return CategoricalHistogramConfig.from_dict(hist_config)
    if hist_type == "null":
        return NullHistogramConfig.from_dict(hist_config)

    return NullHistogramConfig(f"Invalid histogram configuration {config}")
def build_default_histogram_conf(value_type: str, **kwargs: Any) -> Union[
        NumberHistogramConfig,
        CategoricalHistogramConfig,
        NullHistogramConfig,
]:
    """Build default histogram config for given value type.

    Args:
        value_type: ``"int"`` or ``"float"`` give a number histogram config,
            ``"str"`` a categorical one; any other type gives a null config.
        **kwargs: ``min_max`` is passed on to the number histogram default
            config; other keywords are ignored.
    """
    if value_type in ["int", "float"]:
        min_max = kwargs.get("min_max")
        return NumberHistogramConfig.default_config(min_max)
    if value_type == "str":
        return CategoricalHistogramConfig.default_config()
    # The space before the value type was missing, which rendered the
    # reason as e.g. "... for typebool".
    return NullHistogramConfig(
        "No histogram configured and no default config available for type "
        f"{value_type}",
    )
def build_empty_histogram(
    config: HistogramConfig,
) -> Union[NumberHistogram, CategoricalHistogram, NullHistogram]:
    """Create an empty histogram matching the given histogram config.

    A NullHistogram describing the problem is returned when the config
    type is not recognized or the histogram can not be constructed.
    """
    try:
        if isinstance(config, NumberHistogramConfig):
            return NumberHistogram(config)
        if isinstance(config, CategoricalHistogramConfig):
            return CategoricalHistogram(config)
        if isinstance(config, NullHistogramConfig):
            return NullHistogram(config)
        return NullHistogram(NullHistogramConfig(
            "Could not match histogram config type",
        ))
    # HistogramError derives from BaseException and is listed explicitly;
    # KeyboardInterrupt and SystemExit are deliberately left to propagate.
    except (Exception, HistogramError) as err:  # pylint: disable=broad-except
        logger.warning(
            "Failed to create empty histogram from config",
            exc_info=True)
        return NullHistogram(NullHistogramConfig(
            f"Failed to create empty histogram from config: {err}"))
def load_histogram(
    resource: GenomicResource, filename: str,
) -> Histogram:
    """Load and return a histogram in a resource.

    On an error or missing histogram, an appropriate NullHistogram is
    returned. This includes files that are not valid YAML or that lack
    the ``config``/``type`` entries.
    """
    try:
        with resource.open_raw_file(filename) as infile:
            content = infile.read()
    except FileNotFoundError:
        logger.error(
            "unable to load histogram file: %s; file not found", filename)
        return NullHistogram(NullHistogramConfig(
            "Histogram file not found.",
        ))

    try:
        # Parsing happens inside the try block so that malformed or empty
        # files produce a NullHistogram instead of an exception.
        # NOTE(security): yaml.Loader can construct arbitrary Python
        # objects; this must only be used on trusted content. Consider
        # yaml.safe_load if histogram files may come from untrusted sources.
        hist_data = yaml.load(content, yaml.Loader)
        hist_type = hist_data["config"]["type"]

        # The parsed data is handed over directly, so the content is not
        # parsed a second time.
        if hist_type == "number":
            return NumberHistogram.from_dict(hist_data)
        if hist_type == "categorical":
            return CategoricalHistogram.from_dict(hist_data)
        if hist_type == "null":
            return NullHistogram.from_dict(hist_data)
        return NullHistogram(NullHistogramConfig("Invalid histogram type"))
    # HistogramError derives from BaseException and is listed explicitly;
    # KeyboardInterrupt and SystemExit are deliberately left to propagate.
    except (Exception, HistogramError):  # pylint: disable=broad-except
        logger.exception(
            "Failed to deserialize histogram from %s",
            filename,
        )
        return NullHistogram(NullHistogramConfig(
            "Failed to deserialize histogram.",
        ))
# Any of the supported histogram configuration classes.
HistogramConfig = Union[
    NullHistogramConfig,
    CategoricalHistogramConfig,
    NumberHistogramConfig,
]

# Any of the supported histogram classes.
Histogram = Union[
    NullHistogram,
    CategoricalHistogram,
    NumberHistogram,
]