from __future__ import annotations
import copy
import logging
from collections import defaultdict
from collections.abc import Iterator, Sequence
from typing import Any, Optional, Union
import networkx as nx
import pysam
logger = logging.getLogger(__name__)
MAX_POSITION = 3_000_000_000
[docs]def coalesce(v1: Optional[int], v2: int) -> int:
"""Return first non-None value."""
if v1 is not None:
return v1
return v2
[docs]def bedfile2regions(bed_filename: str) -> list[BedRegion]:
"""Transform BED file into list of regions."""
with open(bed_filename) as infile:
regions = []
for line in infile:
if line[0] == "#":
continue
chrom, sbeg, send = line.strip().split("\t")
beg = int(sbeg)
end = int(send)
regions.append(BedRegion(chrom, beg + 1, end))
return regions
[docs]def regions2bedfile(regions: list[BedRegion], bed_filename: str) -> None:
"""Save list of regions into a BED file."""
with open(bed_filename, "w") as outfile:
for reg in regions:
outfile.write(
f"{reg.chrom}\t{reg.start - 1}\t{reg.stop}\n")
[docs]def split_into_regions(
chrom: str, chrom_length: int,
region_size: int) -> list[Region]:
"""Return a list of regions for a chrom with a given length."""
regions = []
current_start = 1
while current_start < chrom_length + 1:
end = min(chrom_length, current_start + region_size - 1)
regions.append(Region(chrom, current_start, end))
current_start = current_start + region_size
return regions
[docs]def get_chromosome_length_tabix(
tabix_file: Union[pysam.TabixFile, pysam.VariantFile], chrom: str,
step: int = 100_000_000, precision: int = 5_000_000,
) -> Optional[int]:
"""
Return the length of a chromosome (or contig).
Returned value is guarnteed to be larger than the actual contig length.
"""
def any_records(riter: Iterator) -> bool:
try:
next(riter)
except StopIteration:
return False
except ValueError:
return False
return True
try:
# First we find any region that includes the last record i.e.
# the length of the chromosome
left, right = None, None
pos = step
while left is None or right is None:
region = Region(chrom, pos, None)
if any_records(tabix_file.fetch(str(region))):
left = pos
pos = pos * 2
else:
right = pos
pos = pos // 2
# Second we use binary search to narrow the region until we find the
# index of the last element (in left) and the length (in right)
while (right - left) > precision:
pos = (left + right) // 2
region = Region(chrom, pos, None)
if any_records(tabix_file.fetch(str(region))):
left = pos
else:
right = pos
return right
except ValueError as ex:
logger.warning(
"unable to find length of contig %s: %s", chrom, ex)
return None
[docs]class Region:
"""Class representing a genomic region."""
def __init__(
self, chrom: str,
start: Optional[int] = None, stop: Optional[int] = None,
):
if start is not None and not isinstance(start, int):
raise TypeError(f"Invalid type for start position - {type(start)}")
if stop is not None and not isinstance(stop, int):
raise TypeError(f"Invalid type for stop position - {type(stop)}")
if start is not None and stop is not None:
assert start <= stop
self.chrom = chrom
self._start = start
self._stop = stop
@property
def start(self) -> Optional[int]:
return self._start
@property
def stop(self) -> Optional[int]:
return self._stop
@property
def begin(self) -> Optional[int]:
return self.start
@property
def end(self) -> Optional[int]:
return self.stop
def __repr__(self) -> str:
if self.start is None:
return self.chrom
if self.end is None:
return f"{self.chrom}:{self.start}"
return f"{self.chrom}:{self.start}-{self.stop}"
def __hash__(self) -> int:
return str(self).__hash__()
def __eq__(self, other: Any) -> bool:
if not isinstance(other, Region):
return False
return bool(
self.chrom == other.chrom
and self.start == other.start
and self.stop == other.stop,
)
def __ne__(self, other: Any) -> bool:
return not self.__eq__(other)
[docs] def isin(self, chrom: str, pos: int) -> bool:
"""Check if a genomic position is insde of the region."""
if chrom != self.chrom:
return False
if self.start and pos < self.start:
return False
if self.stop and pos > self.stop:
return False
return True
@staticmethod
def _min(a: Optional[int], b: Optional[int]) -> Optional[int]:
if a is None:
return b
if b is None:
return a
return min(a, b)
@staticmethod
def _max(a: Optional[int], b: Optional[int]) -> Optional[int]:
if a is None:
return b
if b is None:
return a
return max(a, b)
@staticmethod
def _make_region(
chrom: str, start: Optional[int], stop: Optional[int],
) -> Optional[Region]:
if chrom is None:
return None
if start is not None and stop is not None:
if start > stop:
return None
return Region(chrom, start, stop)
[docs] def intersection(self, other: Region) -> Optional[Region]:
"""Return intersection of the region with other region."""
if self.chrom != other.chrom:
return None
if self.start is None and self.stop is None:
return Region(other.chrom, other.start, other.stop)
if other.start is None and other.stop is None:
return Region(self.chrom, self.start, self.stop)
if self.start is None:
assert self.stop is not None
return self._make_region(
self.chrom, other.start, self._min(self.stop, other.stop))
if self.stop is None:
assert self.start is not None
return self._make_region(
self.chrom, self._max(self.start, other.start), other.stop)
assert self.start is not None and self.stop is not None
return self._make_region(
self.chrom,
self._max(self.start, other.start),
self._min(self.stop, other.stop),
)
[docs] def contains(self, other: Region) -> bool:
"""Check if the region contains other region."""
if self.chrom != other.chrom:
return False
if self.start is None and self.stop is None:
return True
if self.start is None:
assert self.stop is not None
if other.stop is not None:
assert other.stop is not None
return bool(other.stop <= self.stop)
return False
if self.stop is None:
assert self.start is not None
if other.start is not None:
return other.start >= self.start
return False
assert self.start is not None and self.stop is not None
if other.stop is None or other.start is None:
return False
return self.start <= other.start \
and other.stop <= self.stop
[docs] def intersects(self, other: Region) -> bool:
"""Check if the region intersects another."""
if self.chrom != other.chrom:
return False
if self.start is not None and self.stop is not None:
if other.start is None or other.stop is None:
return other.intersects(self)
return not (self.stop < other.start or self.start > other.stop)
if self.stop is not None:
return other.start is None or self.stop >= other.start
if self.start is not None:
return other.stop is None or self.start <= other.stop
return True
[docs] @staticmethod
def from_str(region: str) -> Region:
"""Parse string representation of a region."""
parts = [p.strip() for p in region.rsplit(":", maxsplit=1)]
if len(parts) == 1:
return Region(parts[0], None, None)
if len(parts) == 2:
chrom = parts[0]
parts = [p.strip() for p in parts[1].split("-")]
start = int(parts[0].replace(",", ""))
if len(parts) == 1:
return BedRegion(chrom, start, start)
if len(parts) == 2:
stop = int(parts[1].replace(",", ""))
return BedRegion(chrom, start, stop)
raise ValueError(f"unexpeced format for region {region}")
[docs]class BedRegion(Region):
"""Represents proper bed regions."""
def __init__(
self, chrom: str,
start: int, stop: int,
):
assert start is not None
assert stop is not None
assert stop >= start
super().__init__(chrom, start, stop)
@property
def start(self) -> int:
assert self._start is not None
return self._start
@property
def stop(self) -> int:
assert self._stop is not None
return self._stop
@property
def begin(self) -> int:
return self.start
@property
def end(self) -> int:
return self.stop
def __len__(self) -> int:
return self.stop - self.start
[docs]def all_regions_from_chrom(regions: list[Region], chrom: str) -> list[Region]:
"""Subset of regions in R that are from chr."""
return [r for r in regions if r.chrom == chrom]
[docs]def unique_regions(regions: list[Region]) -> list[Region]:
"""Remove duplicated regions."""
return list(set(regions))
[docs]def connected_component(regions: list[BedRegion]) -> Any:
"""Return connected component of regions.
This might be the same as collapse.
"""
graph = nx.Graph()
graph.add_nodes_from(regions)
regions_by_chrom = defaultdict(list)
for reg in regions:
regions_by_chrom[reg.chrom].append(reg)
for _chrom, nds in regions_by_chrom.items():
nds.sort(key=lambda x: x.stop)
for k in range(1, len(nds)):
for j in range(k - 1, -1, -1):
if nds[k].start <= nds[j].stop:
graph.add_edge(nds[k], nds[j])
else:
break
return nx.connected_components(graph)
[docs]def collapse(
source: Sequence[Region],
is_sorted: bool = False,
) -> list[Region]:
"""Collapse list of regions."""
if not source:
return list(source)
regions = copy.deepcopy(list(source))
if not is_sorted:
regions.sort(key=lambda x: x.start if x.start is not None else -1)
collapsed: dict[str, list[Region]] = defaultdict(list)
collapsed[regions[0].chrom].append(regions[0])
for reg in regions[1:]:
chrom_collapsed = collapsed.get(reg.chrom)
if not chrom_collapsed:
collapsed[reg.chrom].append(reg)
continue
prev_reg = chrom_collapsed[-1]
if coalesce(reg.start, 1) <= coalesce(prev_reg.stop, 1):
if coalesce(reg.stop, 1) > coalesce(prev_reg.stop, 1):
last = collapsed[reg.chrom][-1]
collapsed[reg.chrom][-1] = \
Region(last.chrom, last.start, reg.stop)
continue
collapsed[reg.chrom].append(reg)
result = []
for v in list(collapsed.values()):
result.extend(v)
return result
[docs]def collapse_no_chrom(
source: list[BedRegion],
is_sorted: bool = False,
) -> list[BedRegion]:
"""Collapse by ignoring the chromosome.
Useful when the caller knows
that all the regions are from the same chromosome.
"""
if not source:
return source
regions = copy.copy(source)
if len(regions) == 1:
return regions
if not is_sorted:
regions.sort(key=lambda x: x.start)
collapsed = [regions[0]]
for reg in regions[1:]:
prev_reg = collapsed[-1]
if reg.start <= prev_reg.stop:
if reg.stop > prev_reg.stop:
prev_reg = BedRegion(prev_reg.chrom, prev_reg.start, reg.stop)
collapsed[-1] = prev_reg
continue
collapsed.append(reg)
return collapsed
[docs]def total_length(regions: list[BedRegion]) -> int:
return sum(len(regions) for x in regions)
[docs]def intersection(
regions1: list[Region], regions2: list[Region],
) -> list[Region]:
"""Compute intersection of two list of regions.
First collapses each for lists of regions s1 and s2 and then find
the intersection.
"""
s1_c = collapse(regions1)
s2_c = collapse(regions2)
s1_c.sort(key=lambda x: (x.chrom, x.start))
s2_c.sort(key=lambda x: (x.chrom, x.start))
intersect = []
k = 0
for i in s2_c:
while k < len(s1_c):
if i.chrom != s1_c[k].chrom:
if i.chrom > s1_c[k].chrom:
k += 1
continue
break
if coalesce(i.stop, 1) < coalesce(s1_c[k].start, 1):
break
if coalesce(i.start, 1) > coalesce(s1_c[k].stop, MAX_POSITION):
k += 1
continue
if coalesce(i.start, 1) <= coalesce(s1_c[k].start, 1):
if coalesce(i.stop, MAX_POSITION) >= \
coalesce(s1_c[k].stop, MAX_POSITION):
intersect.append(s1_c[k])
k += 1
continue
new_i = Region(i.chrom, s1_c[k].start, i.stop)
intersect.append(new_i)
break
if coalesce(i.start, 1) > coalesce(s1_c[k].start, 1):
if coalesce(i.stop, MAX_POSITION) <= \
coalesce(s1_c[k].stop, MAX_POSITION):
intersect.append(i)
break
new_i = Region(i.chrom, i.start, s1_c[k].stop)
intersect.append(new_i)
k += 1
continue
return intersect
[docs]def union(*r: list[Region]) -> list[Region]:
"""Collapse many lists of regions."""
r_sum = [el for list in r for el in list]
return collapse(r_sum)
def _diff(
regions_a: list[Region], regions_b: list[Region],
) -> list[Region]:
result = []
k = 0
for reg_a in regions_a:
if k >= len(regions_b):
result.append(reg_a)
continue
if reg_a.chrom < regions_b[k].chrom:
result.append(reg_a)
continue
if coalesce(reg_a.stop, MAX_POSITION) < \
coalesce(regions_b[k].start, 1):
result.append(reg_a)
continue
prev = coalesce(reg_a.start, 1)
while k < len(regions_b) \
and coalesce(regions_b[k].stop, MAX_POSITION) <= \
coalesce(reg_a.stop, MAX_POSITION) \
and regions_b[k].chrom == reg_a.chrom:
if prev < coalesce(regions_b[k].start, 1):
new_a = Region(
reg_a.chrom, prev,
coalesce(regions_b[k].start, 1) - 1)
result.append(new_a)
prev = coalesce(regions_b[k].stop, 1) + 1
k += 1
if k < len(regions_b) and regions_b[k].chrom != reg_a.chrom:
continue
if prev <= coalesce(reg_a.stop, MAX_POSITION):
result.append(Region(reg_a.chrom, prev, reg_a.stop))
return result
[docs]def difference(
regions1: list[Region], regions2: list[Region],
symmetric: bool = False,
) -> list[Region]:
"""Compute difference between two list of regions."""
if not symmetric:
left = collapse(regions1)
left.sort(key=lambda x: (x.chrom, x.start))
else:
left = union(regions1, regions2)
left.sort(key=lambda x: (x.chrom, x.start))
right = intersection(regions1, regions2)
return _diff(left, right)