"""Provides Apache Parquet storage of genotype data."""
import os
import sys
import time
import re
import hashlib
import itertools
import logging
import configparser
from copy import copy
from urllib.parse import urlparse
from fsspec.core import url_to_fs
import toml
from box import Box
import numpy as np
import pyarrow as pa
from pyarrow import fs as pa_fs
import pyarrow.parquet as pq
import fsspec
from dae.utils.variant_utils import GENOTYPE_TYPE
from dae.variants.attributes import TransmissionType
from dae.variants.family_variant import FamilyAllele, FamilyVariant, \
calculate_simple_best_state
from dae.variants.variant import SummaryVariant, SummaryAllele
from dae.backends.impala.serializers import AlleleParquetSerializer
logger = logging.getLogger(__name__)


class PartitionDescriptor:
    """Abstract class for partition description."""

    def __init__(self):
        pass

    def has_partitions(self):
        raise NotImplementedError()

    def build_impala_partitions(self):
        raise NotImplementedError()

    @property
    def chromosomes(self):
        raise NotImplementedError()

    @property
    def region_length(self):
        raise NotImplementedError()

    def variant_filename(self, family_allele):
        raise NotImplementedError()

    def write_partition_configuration(self):
        raise NotImplementedError()


class NoPartitionDescriptor(PartitionDescriptor):
    """Defines class for missing partition description."""

    def __init__(self, root_dirname=""):
        super().__init__()
        self.output = root_dirname

    def has_partitions(self):
        return False

    def build_impala_partitions(self):
        raise ValueError("unexpected use of build_impala_partitions")

    @property
    def chromosomes(self):
        return []

    @property
    def region_length(self):
        return 3_000_000_000

    def variant_filename(self, family_allele):
        bucket_index = family_allele.get_attribute("bucket_index")
        filename = f"nopart_{bucket_index:0>6}_variants.parquet"
        return os.path.join(self.output, filename)

    def write_partition_configuration(self):
        return None

    @staticmethod
    def generate_file_access_glob():
        """Generate a glob for accessing parquet files in the partition."""
        return "*variants.parquet"

    @staticmethod
    def variants_filename_basedir(filename):
        """Extract the variants basedir from filename."""
        regexp = re.compile(
            "^(?P<basedir>.+)/(?P<prefix>.+)variants.parquet$")
        match = regexp.match(filename)
        if not match:
            return None
        assert "basedir" in match.groupdict()
        basedir = match.groupdict()["basedir"]
        if basedir and basedir[-1] != "/":
            basedir += "/"
        return basedir
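
    # Usage sketch (illustrative): for a file produced by this descriptor,
    #
    #     NoPartitionDescriptor.variants_filename_basedir(
    #         "out/nopart_000001_variants.parquet")
    #
    # matches the "<basedir>/<prefix>variants.parquet" pattern above and
    # returns "out/".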


class ParquetPartitionDescriptor(PartitionDescriptor):
"""Defines partition description used for parquet datasets."""
def __init__(
self,
chromosomes,
region_length,
family_bin_size=0,
coding_effect_types=None,
rare_boundary=0,
root_dirname=""):
super().__init__()
self.output = root_dirname
self._chromosomes = chromosomes
self._region_length = region_length
self._family_bin_size = family_bin_size
self._coding_effect_types = \
coding_effect_types if coding_effect_types else []
self._rare_boundary = rare_boundary

    def has_partitions(self):
        return True

    def build_impala_partitions(self):
partitions = ["region_bin string"]
if self.rare_boundary > 0:
partitions.append("frequency_bin tinyint")
if self.coding_effect_types:
partitions.append("coding_bin tinyint")
if self.family_bin_size > 0:
partitions.append("family_bin tinyint")
return ", ".join(partitions)
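
    # Illustrative result: with every bin enabled this returns
    # "region_bin string, frequency_bin tinyint, coding_bin tinyint,
    # family_bin tinyint", suitable for an Impala
    # "PARTITIONED BY (...)" clause.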
@property
def chromosomes(self):
return self._chromosomes
@property
def region_length(self):
return self._region_length
@property
def family_bin_size(self):
return self._family_bin_size
@property
def coding_effect_types(self):
return self._coding_effect_types
@property
def rare_boundary(self):
return self._rare_boundary

    @staticmethod
def from_config(config_path, root_dirname=""):
"""Create a partition description from the provided config file."""
filesystem, _ = url_to_fs(config_path)
assert filesystem.exists(config_path), config_path
config = configparser.ConfigParser()
with filesystem.open(config_path, "rt") as input_config_file:
config.read_file(input_config_file, config_path)
return ParquetPartitionDescriptor.from_dict(config, root_dirname)

    @staticmethod
    def from_dict(config, root_dirname="") -> "ParquetPartitionDescriptor":
        """Create a partition description from the provided config."""
assert config["region_bin"] is not None
chromosomes = list(
map(str.strip, config["region_bin"]["chromosomes"].split(","))
)
region_length = int(config["region_bin"]["region_length"])
family_bin_size = 0
coding_effect_types = []
rare_boundary = 0
if "family_bin" in config:
family_bin_size = int(config["family_bin"]["family_bin_size"])
if "coding_bin" in config:
ce_types_str = config["coding_bin"]["coding_effect_types"]
coding_effect_types = [
et.strip() for et in ce_types_str.split(",")
]
coding_effect_types = [et for et in coding_effect_types if et]
if "frequency_bin" in config:
rare_boundary = int(config["frequency_bin"]["rare_boundary"])
return ParquetPartitionDescriptor(
chromosomes,
region_length,
family_bin_size,
coding_effect_types,
rare_boundary,
root_dirname=root_dirname,
)
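
    # Example configuration accepted by from_config/from_dict (a sketch;
    # the section and option names mirror the parsing code above, the
    # concrete values are made up):
    #
    #     [region_bin]
    #     chromosomes = chr1, chr2
    #     region_length = 30000000
    #
    #     [family_bin]
    #     family_bin_size = 10
    #
    #     [coding_bin]
    #     coding_effect_types = splice-site, frame-shift, nonsense
    #
    #     [frequency_bin]
    #     rare_boundary = 5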
def _evaluate_region_bin(self, family_allele):
chromosome = family_allele.chromosome
pos = family_allele.position // self._region_length
if chromosome in self._chromosomes:
return f"{chromosome}_{pos}"
return f"other_{pos}"
def _family_bin_from_id(self, family_id):
sha256 = hashlib.sha256()
sha256.update(family_id.encode())
digest = int(sha256.hexdigest(), 16)
return digest % self.family_bin_size
def _evaluate_family_bin(self, family_allele):
return self._family_bin_from_id(family_allele.family_id)
def _evaluate_coding_bin(self, family_allele):
if family_allele.is_reference_allele:
return 0
variant_effects = set(family_allele.effects.types)
coding_effect_types = set(self._coding_effect_types)
result = variant_effects.intersection(coding_effect_types)
if len(result) == 0:
return 0
return 1
def _evaluate_frequency_bin(self, family_allele):
count = family_allele.get_attribute("af_allele_count")
frequency = family_allele.get_attribute("af_allele_freq")
transmission_type = family_allele.transmission_type
if transmission_type == TransmissionType.denovo:
frequency_bin = 0
elif count and int(count) == 1: # Ultra rare
frequency_bin = 1
elif frequency and float(frequency) < self._rare_boundary: # Rare
frequency_bin = 2
else: # Common
frequency_bin = 3
return frequency_bin
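
    # Worked example (illustrative values): with rare_boundary=5, a
    # transmitted allele with af_allele_count=1 falls in bin 1 (ultra rare);
    # one with af_allele_freq=1.3 falls in bin 2 (rare, since 1.3 < 5);
    # one with af_allele_freq=12.0 falls in bin 3 (common); any denovo
    # allele falls in bin 0 regardless of its frequency.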

    def variant_filename(self, family_allele):
current_bin = self._evaluate_region_bin(family_allele)
filepath = os.path.join(self.output, f"region_bin={current_bin}")
bucket_index = family_allele.get_attribute("bucket_index")
filename = f"variants_region_bin_{current_bin}"
if self._rare_boundary > 0:
current_bin = self._evaluate_frequency_bin(family_allele)
filepath = os.path.join(filepath, f"frequency_bin={current_bin}")
filename += f"_frequency_bin_{current_bin}"
if len(self._coding_effect_types) > 0:
current_bin = self._evaluate_coding_bin(family_allele)
filepath = os.path.join(filepath, f"coding_bin={current_bin}")
filename += f"_coding_bin_{current_bin}"
if self._family_bin_size > 0:
current_bin = self._evaluate_family_bin(family_allele)
filepath = os.path.join(filepath, f"family_bin={current_bin}")
filename += f"_family_bin_{current_bin}"
if bucket_index is not None:
filename += f"_bucket_index_{bucket_index}"
filename += ".parquet"
return os.path.join(filepath, filename)
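
    # Path sketch (illustrative): for chromosomes=["chr1"],
    # region_length=30_000_000 and rare_boundary=5, an allele at
    # chr1:45_000_000 in frequency bin 2 with bucket_index=1 is written to
    #
    #     <output>/region_bin=chr1_1/frequency_bin=2/
    #         variants_region_bin_chr1_1_frequency_bin_2_bucket_index_1.parquet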
def _variants_partition_bins(self):
partition_bins = []
region_bins = [
("region_bin", f"{chrom}_0") for chrom in self._chromosomes
]
partition_bins.append(region_bins)
if self._rare_boundary > 0:
frequency_bins = [
("frequency_bin", "1"),
("frequency_bin", "2"),
("frequency_bin", "3")
]
partition_bins.append(frequency_bins)
if len(self._coding_effect_types) > 0:
coding_bins = [
("coding_bin", "0"),
("coding_bin", "1")
]
partition_bins.append(coding_bins)
if self._family_bin_size > 0:
family_bins = [
("family_bin", f"{fb}")
for fb in range(self._family_bin_size)
]
partition_bins.append(family_bins)
return partition_bins
def _variants_filenames_regexp(self):
partition_bins = self._variants_partition_bins()
product = next(itertools.product(*partition_bins))
dirname_parts = [
f"{p[0]}=(?P<{p[0]}>.+)" for p in product
]
filename_parts = [
f"{p[0]}_(?P={p[0]})" for p in product
]
dirname = "/".join(dirname_parts)
dirname = os.path.join("^(?P<basedir>.+)", dirname)
filename = "_".join(filename_parts)
filename = filename + r"(_bucket_index_\d+)?"
filename = f"variants_{filename}\\.parquet$"
filename = os.path.join(dirname, filename)
return re.compile(filename, re.VERBOSE)

    def variants_filename_basedir(self, filename):
"""Return the basedir of the variant file."""
regexp = self._variants_filenames_regexp()
match = regexp.match(filename)
if not match:
return None
assert "basedir" in match.groupdict()
basedir = match.groupdict()["basedir"]
if basedir and basedir[-1] != "/":
basedir += "/"
return basedir

    def write_partition_configuration(self):
config = configparser.ConfigParser()
config.add_section("region_bin")
config["region_bin"]["chromosomes"] = ", ".join(self._chromosomes)
config["region_bin"]["region_length"] = str(self._region_length)
if self._family_bin_size > 0:
config.add_section("family_bin")
config["family_bin"]["family_bin_size"] = str(
self._family_bin_size
)
if len(self._coding_effect_types) > 0:
config.add_section("coding_bin")
config["coding_bin"]["coding_effect_types"] = ", ".join(
self._coding_effect_types
)
if self._rare_boundary > 0:
config.add_section("frequency_bin")
config["frequency_bin"]["rare_boundary"] = str(self._rare_boundary)
filename = os.path.join(self.output, "_PARTITION_DESCRIPTION")
with fsspec.open(filename, "w") as configfile:
config.write(configfile)

    def generate_file_access_glob(self):
"""Return a glob for accessing every parquet file in the partition."""
glob = "*/"
if self.family_bin_size > 0:
glob += "*/"
if self.coding_effect_types:
glob += "*/"
if self.rare_boundary > 0:
glob += "*/"
glob += "*.parquet"
return glob
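
    # For example, with frequency, coding and family bins all enabled the
    # glob is "*/*/*/*/*.parquet"; with only region bins it is
    # "*/*.parquet".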

    def add_family_bins_to_families(self, families):
        """Set a family_bin attribute on each person in every family."""
for family in families.values():
family_bin = self._family_bin_from_id(family.family_id)
for person in family.persons.values():
person.set_attr("family_bin", family_bin)
families._ped_df = None # pylint: disable=protected-access
return families


class ContinuousParquetFileWriter:
    """Class that writes to an output parquet file.

    This class buffers appended rows and automatically flushes them to the
    given parquet file once enough data has accumulated. Any leftover data
    is flushed into the file when the writer is closed.
    """
def __init__(
self, filepath, variant_loader, filesystem=None, rows=100_000):
self.filepath = filepath
extra_attributes = variant_loader.get_attribute("extra_attributes")
logger.info(
"using variant loader %s with annotation schema %s",
variant_loader,
variant_loader.annotation_schema)
self.serializer = AlleleParquetSerializer(
variant_loader.annotation_schema, extra_attributes
)
self.schema = self.serializer.schema
if filesystem is not None:
filesystem.create_dir(filepath)
self.dirname = filepath
else:
dirname = os.path.dirname(filepath)
if dirname and not os.path.exists(dirname):
os.makedirs(dirname)
self.dirname = dirname
self._writer = pq.ParquetWriter(
filepath, self.schema, compression="snappy", filesystem=filesystem
)
self.rows = rows
self._data = None
self.data_reset()

    def data_reset(self):
        self._data = {name: [] for name in self.schema.names}

    def size(self):
        return len(self._data["chromosome"])

    def build_table(self):
        table = pa.Table.from_pydict(self._data, self.schema)
        return table
def _write_table(self):
self._writer.write_table(self.build_table())
self.data_reset()

    def append_allele(
            self, allele, variant_data,
            extra_attributes_data, summary_vectors):
        """Append the data for a single allele to the buffer.

        :param allele: the family allele to append
        :param variant_data: serialized family variant data blob
        :param extra_attributes_data: serialized extra attributes blob
        :param summary_vectors: searchable summary vectors for the variant
        """
data = self.serializer.build_allele_batch_dict(
allele, variant_data, extra_attributes_data, summary_vectors)
for k, v in self._data.items():
v.extend(data[k])
if self.size() >= self.rows:
logger.info(
"parquet writer %s data flushing at len %s",
self.filepath, self.size())
self._write_table()

    def close(self):
logger.info(
"closing parquet writer %s at len %s", self.dirname, self.size())
if self.size() > 0:
self._write_table()
self._writer.close()
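
# Usage sketch (illustrative; `variant_loader`, `allele` and the serialized
# blobs are assumed to come from the loaders and serializers used above):
#
#     writer = ContinuousParquetFileWriter(
#         "out/variants.parquet", variant_loader, rows=100_000)
#     writer.append_allele(
#         allele, variant_data, extra_attributes_data, summary_vectors)
#     writer.close()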


class VariantsParquetWriter:
"""Provide functions for storing variants into parquet dataset."""
def __init__(
self,
variants_loader,
partition_descriptor,
bucket_index=None,
rows=100_000,
include_reference=True):
self.variants_loader = variants_loader
self.families = variants_loader.families
self.full_variants_iterator = variants_loader.full_variants_iterator()
self.bucket_index = bucket_index
self.rows = rows
self.include_reference = include_reference
self.start = time.time()
self.data_writers = {}
assert isinstance(partition_descriptor, PartitionDescriptor)
self.partition_descriptor = partition_descriptor
extra_attributes = self.variants_loader.get_attribute(
"extra_attributes")
self.serializer = AlleleParquetSerializer(
self.variants_loader.annotation_schema, extra_attributes)
@staticmethod
def _setup_reference_allele(summary_variant, family):
genotype = -1 * np.ones(shape=(2, len(family)), dtype=GENOTYPE_TYPE)
best_state = calculate_simple_best_state(
genotype, summary_variant.allele_count
)
ref_summary_allele = summary_variant.ref_allele
reference_allele = FamilyAllele(ref_summary_allele, family, genotype,
best_state)
return reference_allele
@staticmethod
def _setup_all_unknown_allele(summary_variant, family):
genotype = -1 * np.ones(shape=(2, len(family)), dtype=GENOTYPE_TYPE)
best_state = calculate_simple_best_state(
genotype, summary_variant.allele_count
)
ref_allele = summary_variant.ref_allele
unknown_allele = FamilyAllele(
SummaryAllele(
ref_allele.chromosome,
ref_allele.position,
ref_allele.reference,
ref_allele.reference,
summary_index=ref_allele.summary_index,
allele_index=-1,
transmission_type=ref_allele.transmission_type,
attributes={},
),
family,
genotype,
best_state,
)
return unknown_allele
def _setup_all_unknown_variant(self, summary_variant, family_id):
family = self.families[family_id]
genotype = -1 * np.ones(shape=(2, len(family)), dtype=GENOTYPE_TYPE)
alleles = [
self._setup_reference_allele(summary_variant, family),
self._setup_all_unknown_allele(summary_variant, family),
]
best_state = -1 * np.ones(
shape=(len(alleles), len(family)), dtype=GENOTYPE_TYPE
)
return FamilyVariant(
SummaryVariant(alleles), family, genotype, best_state
)
def _get_bin_writer(self, family_allele):
filename = self.partition_descriptor.variant_filename(family_allele)
if filename not in self.data_writers:
if urlparse(filename).scheme:
filesystem, path = pa_fs.FileSystem.from_uri(filename)
else:
filesystem, path = None, filename
self.data_writers[filename] = ContinuousParquetFileWriter(
path,
self.variants_loader,
filesystem=filesystem,
rows=self.rows,
)
return self.data_writers[filename]
def _write_internal(self):
# pylint: disable=too-many-locals
family_variant_index = 0
report = False
for summary_variant_index, (summary_variant, family_variants) in \
enumerate(self.full_variants_iterator):
summary_alleles = summary_variant.alleles
summary_blobs = self.serializer.serialize_summary_data(
summary_alleles
)
scores_blob = self.serializer.serialize_scores_data(
summary_alleles)
for summary_allele in summary_alleles:
extra_atts = {
"bucket_index": self.bucket_index,
}
summary_allele.update_attributes(extra_atts)
summary_vectors = self.serializer.build_searchable_vectors_summary(
summary_variant)
for family_variant in family_variants:
family_variant_index += 1
fv = family_variant
if family_variant.is_unknown():
# handle all unknown variants
unknown_variant = self._setup_all_unknown_variant(
summary_variant, family_variant.family_id
)
fv = unknown_variant
if -1 not in summary_vectors:
summary_vectors[-1] = copy(summary_vectors[0])
header = self.serializer.allele_batch_header
allele_index_idx = header.index("allele_index")
vectors = summary_vectors[-1]
for vector in vectors:
vector[allele_index_idx] = -1
fv.summary_index = summary_variant_index
fv.family_index = family_variant_index
for family_allele in fv.alleles:
extra_atts = {
"bucket_index": self.bucket_index,
"family_index": family_variant_index
}
family_allele.update_attributes(extra_atts)
alleles = fv.alt_alleles
if self.include_reference or fv.is_reference():
alleles = fv.alleles
variant_data = self.serializer.serialize_family_variant(
alleles, summary_blobs, scores_blob)
assert variant_data is not None
extra_attributes_data = \
self.serializer.serialize_extra_attributes(fv)
for family_allele in alleles:
bin_writer = self._get_bin_writer(family_allele)
bin_writer.append_allele(
family_allele, variant_data,
extra_attributes_data, summary_vectors)
if summary_variant_index % 1000 == 0:
report = True
if report:
elapsed = time.time() - self.start
print(
f"Bucket {self.bucket_index}; "
f"{summary_variant.chromosome}:"
f"{summary_variant.position}: "
f"{summary_variant_index}/"
f"{family_variant_index} variants imported "
f"for {elapsed:.2f} sec ({len(self.data_writers)} files)",
file=sys.stderr,
)
report = False
filenames = list(self.data_writers.keys())
for bin_writer in self.data_writers.values():
bin_writer.close()
print("-------------------------------------------", file=sys.stderr)
print("Bucket:", self.bucket_index, file=sys.stderr)
print("-------------------------------------------", file=sys.stderr)
elapsed = time.time() - self.start
print(
f"DONE: {family_variant_index} family variants imported "
f"for {elapsed:.2f} sec ({len(self.data_writers)} files)",
file=sys.stderr,
)
print("-------------------------------------------", file=sys.stderr)
return filenames

    def write_schema(self):
"""Write the schema to a separate file."""
config = {}
schema = self.serializer.schema
config["variants_schema"] = {}
for k in schema.names:
v = schema.field(k)
config["variants_schema"][k] = str(v.type)
schema = self.serializer.describe_blob_schema()
config["blob"] = {}
for k, v in schema.items():
config["blob"][k] = v
filename = os.path.join(
self.partition_descriptor.output, "_VARIANTS_SCHEMA")
config["extra_attributes"] = {}
extra_attributes = self.serializer.extra_attributes
for attr in extra_attributes:
config["extra_attributes"][attr] = "string"
with fsspec.open(filename, "w") as configfile:
content = toml.dumps(config)
configfile.write(content)

    def write_dataset(self):
        """Write the variants, partition description and schema."""
filenames = self._write_internal()
self.partition_descriptor.write_partition_configuration()
self.write_schema()
self.variants_loader.close()
return filenames


class ParquetManager:
"""Provide function for producing variants and pedigree parquet files."""

    @staticmethod
def build_parquet_filenames(
prefix, study_id=None, bucket_index=0, suffix=None):
"""Build parquet filenames."""
assert bucket_index >= 0
basename = os.path.basename(os.path.abspath(prefix))
if study_id is None:
study_id = basename
assert study_id
if suffix is None and bucket_index == 0:
filesuffix = ""
elif bucket_index > 0 and suffix is None:
filesuffix = f"_{bucket_index:0>6}"
elif bucket_index == 0 and suffix is not None:
filesuffix = f"{suffix}"
else:
filesuffix = f"_{bucket_index:0>6}{suffix}"
variants_dirname = os.path.join(prefix, "variants")
variant_filename = os.path.join(
variants_dirname, f"{study_id}{filesuffix}_variants.parquet")
pedigree_filename = os.path.join(
prefix, "pedigree", f"{study_id}{filesuffix}_pedigree.parquet")
conf = {
"variants_dirname": variants_dirname,
"variants": variant_filename,
"pedigree": pedigree_filename,
}
return Box(conf, default_box=True)
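
    # For example (illustrative values), build_parquet_filenames(
    # "out/study", bucket_index=2) produces the variants filename
    # "out/study/variants/study_000002_variants.parquet" and the pedigree
    # filename "out/study/pedigree/study_000002_pedigree.parquet".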

    @staticmethod
    def families_to_parquet(families, pedigree_filename):
        """Save the families pedigree into a parquet file."""
dirname = os.path.dirname(pedigree_filename)
if dirname:
os.makedirs(dirname, exist_ok=True)
save_ped_df_to_parquet(families.ped_df, pedigree_filename)

    @staticmethod
def variants_to_parquet(
variants_loader, partition_descriptor,
bucket_index=1, rows=100_000, include_reference=False):
"""Read variants from variant_loader and store them in parquet."""
# assert variants_loader.get_attribute("annotation_schema") is not None
print(f"variants to parquet ({rows} rows)", file=sys.stderr)
start = time.time()
variants_writer = VariantsParquetWriter(
variants_loader,
partition_descriptor,
bucket_index=bucket_index,
rows=rows,
include_reference=include_reference)
variants_writer.write_dataset()
elapsed = time.time() - start
print(f"DONE: for {elapsed:.2f} sec", file=sys.stderr)
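
# End-to-end sketch (illustrative; assumes a configured variants loader and
# a partition description config file):
#
#     descriptor = ParquetPartitionDescriptor.from_config(
#         "partition_description.conf", root_dirname="out/study/variants")
#     ParquetManager.variants_to_parquet(
#         variants_loader, descriptor, bucket_index=1)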


def pedigree_parquet_schema():
"""Return the schema for pedigree parquet file."""
fields = [
pa.field("family_id", pa.string()),
pa.field("person_id", pa.string()),
pa.field("dad_id", pa.string()),
pa.field("mom_id", pa.string()),
pa.field("sex", pa.int8()),
pa.field("status", pa.int8()),
pa.field("role", pa.int32()),
pa.field("sample_id", pa.string()),
pa.field("generated", pa.bool_()),
pa.field("layout", pa.string()),
pa.field("not_sequenced", pa.bool_()),
]
return pa.schema(fields)


def add_missing_parquet_fields(pps, ped_df):
"""Add missing parquet fields."""
missing_fields = set(ped_df.columns.values) - set(pps.names)
if "family_bin" in missing_fields:
pps = pps.append(pa.field("family_bin", pa.int8()))
missing_fields = missing_fields - set(["family_bin"])
rename = {}
for column in missing_fields:
name = column.lower().replace(".", "_")
pps = pps.append(pa.field(name, pa.string()))
rename[column] = name
ped_df = ped_df.rename(columns=rename)
missing_fields = [rename[col] for col in missing_fields]
for column in missing_fields:
ped_df[column] = ped_df[column].apply(str)
return ped_df, pps


def save_ped_df_to_parquet(ped_df, filename, filesystem=None):
    """Convert ped_df from a dataframe to parquet and store it in filename."""
ped_df = ped_df.copy()
ped_df.role = ped_df.role.apply(lambda r: r.value)
ped_df.sex = ped_df.sex.apply(lambda s: s.value)
ped_df.status = ped_df.status.apply(lambda s: s.value)
if "generated" not in ped_df:
ped_df["generated"] = False
if "layout" not in ped_df:
ped_df["layout"] = None
if "not_sequenced" not in ped_df:
ped_df["not_sequenced"] = False
pps = pedigree_parquet_schema()
ped_df, pps = add_missing_parquet_fields(pps, ped_df)
table = pa.Table.from_pandas(ped_df, schema=pps)
pq.write_table(table, filename, filesystem=filesystem)
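
# Usage sketch (illustrative; `families` is assumed to carry the pedigree
# dataframe used by ParquetManager.families_to_parquet above):
#
#     save_ped_df_to_parquet(families.ped_df, "out/study_pedigree.parquet")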