import os
import glob
import logging
import toml
from dae.backends.raw.loader import VariantsLoader, TransmissionType
from dae.backends.storage.genotype_storage import GenotypeStorage
from dae.backends.impala.hdfs_helpers import HdfsHelpers
from dae.backends.impala.impala_helpers import ImpalaHelpers
from dae.backends.impala.impala_variants import ImpalaVariants
from dae.backends.impala.parquet_io import NoPartitionDescriptor, \
ParquetManager, \
ParquetPartitionDescriptor
from dae.configuration.study_config_builder import StudyConfigBuilder
from dae.configuration.gpf_config_parser import GPFConfigParser
from dae.utils.dict_utils import recursive_dict_update
from dae.backends.impala.rsync_helpers import RsyncHelpers
# Module-level logger named after this module.
logger = logging.getLogger(__name__)
class ImpalaGenotypeStorage(GenotypeStorage):
    """Defines Apache Impala genotype storage.

    Study data is written locally as parquet files, uploaded to HDFS and
    then registered as Impala tables.  Uploads go through the native HDFS
    helpers, or through rsync when the storage configuration has an
    ``rsync`` section.
    """

    def __init__(self, storage_config, section_id):
        super().__init__(storage_config, section_id)

        impala_config = self.storage_config.impala
        self._impala_helpers = ImpalaHelpers(
            impala_hosts=impala_config.hosts,
            impala_port=impala_config.port,
            pool_size=impala_config.pool_size)

        # Created lazily on first access of the ``hdfs_helpers`` property.
        self._hdfs_helpers = None

        # When an rsync section is configured, uploads go through rsync
        # instead of the native HDFS client.
        self._rsync_helpers = None
        if self.storage_config.rsync:
            self._rsync_helpers = RsyncHelpers(
                self.storage_config.rsync.location)

    def get_db(self):
        """Return the name of the Impala database used by this storage."""
        return self.storage_config.impala.db

    def is_impala(self):
        """Return True: this storage is backed by Impala."""
        return True

    @property
    def impala_helpers(self):
        """Return the Impala helpers created in the constructor."""
        assert self._impala_helpers is not None
        return self._impala_helpers

    @property
    def hdfs_helpers(self):
        """Create (on first use) and return an HDFS helpers object."""
        if self._hdfs_helpers is None:
            hdfs_config = self.storage_config.hdfs
            self._hdfs_helpers = HdfsHelpers(
                hdfs_config.host,
                hdfs_config.port,
                replication=hdfs_config.replication
            )

        assert self._hdfs_helpers is not None
        return self._hdfs_helpers

    @property
    def rsync_helpers(self):
        """Return the rsync helpers, or None when rsync is not configured."""
        return self._rsync_helpers

    @classmethod
    def study_tables(cls, study_config):
        """Return variants and pedigree tables names for a study.

        Table names configured under ``genotype_storage.tables`` take
        precedence.  If only the pedigree table is configured, the variants
        table is ``None``.  If no pedigree table is configured, the default
        ``<study_id>_variants`` / ``<study_id>_pedigree`` names are used.

        Returns:
            A ``(variants_table, pedigree_table)`` tuple.
        """
        storage_config = study_config.genotype_storage
        if storage_config and storage_config.tables \
                and storage_config.tables.pedigree:
            tables = storage_config.tables
            # A falsy variants entry means the study has no variants table.
            return tables.variants or None, tables.pedigree

        # default study tables
        return (
            cls._construct_variants_table(study_config.id),
            cls._construct_pedigree_table(study_config.id),
        )

    @staticmethod
    def _construct_variants_table(study_id):
        return f"{study_id}_variants"

    @staticmethod
    def _construct_pedigree_table(study_id):
        return f"{study_id}_pedigree"

    def build_backend(self, study_config, genome, gene_models):
        """Build an ``ImpalaVariants`` backend for the given study.

        ``genome`` is accepted for interface compatibility; it is not used
        by this implementation.
        """
        assert study_config is not None

        variants_table, pedigree_table = self.study_tables(study_config)
        return ImpalaVariants(
            self.impala_helpers,
            self.storage_config.impala.db,
            variants_table,
            pedigree_table,
            gene_models,
        )

    def _generate_study_config(
            self, study_id, pedigree_table, variants_table=None):
        """Return a minimal study config dict for freshly imported tables.

        The genotype browser is enabled only when a variants table is given.
        """
        assert study_id is not None

        study_config = {
            "id": study_id,
            "conf_dir": ".",
            "has_denovo": False,
            "genotype_storage": {
                "id": self.storage_id,
                "tables": {"pedigree": pedigree_table},
            },
            "genotype_browser": {"enabled": False},
        }

        if variants_table:
            storage_config = study_config["genotype_storage"]
            storage_config["tables"]["variants"] = variants_table
            study_config["genotype_browser"]["enabled"] = True

        return study_config

    # pylint: disable=arguments-differ
    def simple_study_import(
            self,
            study_id,
            families_loader=None,
            variant_loaders=None,
            study_config=None,
            output=".",
            include_reference=False,
            **kwargs):
        """Import a study into this storage and return its study config.

        Families and variants are first written as parquet files under
        ``output`` (``output/pedigree/pedigree.parquet`` and
        ``output/variants``), then uploaded and loaded into Impala via
        ``impala_load_dataset``.

        Args:
            study_id: identifier of the imported study.
            families_loader: loader for the study families; required.
            variant_loaders: optional iterable of ``VariantsLoader`` objects.
            study_config: optional study config passed to
                ``GPFConfigParser.load_config_raw``; its content is merged
                over the generated config.
            output: local directory for the intermediate parquet files.
            include_reference: passed through to the parquet writer.
            **kwargs: accepted for interface compatibility; unused.

        Raises:
            ValueError: if ``families_loader`` is not provided.
        """
        # Fail before any (potentially long) parquet conversion happens.
        if families_loader is None:
            raise ValueError(
                f"families_loader is required to import study {study_id}")

        variants_dir = None
        has_denovo = False
        has_cnv = False
        bucket_index = 0

        for index, variant_loader in enumerate(variant_loaders or []):
            assert isinstance(variant_loader, VariantsLoader), \
                type(variant_loader)

            source_type = variant_loader.get_attribute("source_type")
            if source_type == "denovo":
                has_denovo = True
            elif source_type == "cnv":
                # CNV sources are flagged as de novo as well.
                has_denovo = True
                has_cnv = True

            # Denovo loaders use buckets below 100; transmitted loaders use
            # buckets >= 100 so the two ranges never collide.
            if variant_loader.transmission_type == \
                    TransmissionType.denovo:
                assert index < 100
                bucket_index = index
            elif variant_loader.transmission_type == \
                    TransmissionType.transmitted:
                bucket_index = index + 100

            variants_dir = os.path.join(output, "variants")
            partition_description = NoPartitionDescriptor(variants_dir)

            ParquetManager.variants_to_parquet(
                variant_loader,
                partition_description,
                bucket_index=bucket_index,
                include_reference=include_reference
            )

        pedigree_filename = os.path.join(
            output, "pedigree", "pedigree.parquet")
        families = families_loader.load()
        ParquetManager.families_to_parquet(families, pedigree_filename)

        config_dict = self.impala_load_dataset(
            study_id,
            variants_dir=variants_dir,
            pedigree_file=pedigree_filename
        )
        config_dict["has_denovo"] = has_denovo
        config_dict["has_cnv"] = has_cnv

        if study_config is not None:
            study_config_dict = GPFConfigParser.load_config_raw(study_config)
            config_dict = recursive_dict_update(config_dict, study_config_dict)

        config_builder = StudyConfigBuilder(config_dict)
        return config_builder.build_config()

    def default_hdfs_study_path(self, study_id):
        """Return the HDFS directory of a study: ``<base_dir>/<study_id>``."""
        return os.path.join(self.storage_config.hdfs.base_dir, study_id)

    def default_pedigree_hdfs_filename(self, study_id):
        """Return the default HDFS path of a study's pedigree parquet file."""
        study_path = self.default_hdfs_study_path(study_id)
        return os.path.join(study_path, "pedigree", "pedigree.parquet")

    def default_variants_hdfs_dirname(self, study_id):
        """Return the default HDFS directory of a study's variants files."""
        study_path = self.default_hdfs_study_path(study_id)
        return os.path.join(study_path, "variants")

    def full_hdfs_path(self, hdfs_path):
        """Prefix an HDFS path with the configured ``hdfs://host:port``."""
        hdfs_config = self.storage_config.hdfs
        return f"hdfs://{hdfs_config.host}:{hdfs_config.port}{hdfs_path}"

    def _build_hdfs_pedigree(self, study_id, pedigree_file):
        """Return the HDFS path of ``pedigree_file`` once uploaded.

        The file keeps its basename inside the study's ``pedigree``
        directory.
        """
        hdfs_dir = os.path.join(
            self.default_hdfs_study_path(study_id), "pedigree")
        return os.path.join(hdfs_dir, os.path.basename(pedigree_file))

    def _build_hdfs_variants(
            self, study_id, variants_dir, partition_description):
        """Map local variants parquet files to their HDFS destinations.

        Returns:
            A ``(local_variants_files, hdfs_variants_dir,
            hdfs_variants_files)`` tuple where the i-th HDFS file
            corresponds to the i-th local file.

        Raises:
            ValueError: if no local file matches the partition
                description's file glob.
        """
        hdfs_variants_dir = self.default_variants_hdfs_dirname(study_id)

        files_glob = os.path.join(
            variants_dir, partition_description.generate_file_access_glob())
        local_variants_files = glob.glob(files_glob)
        if not local_variants_files:
            raise ValueError(
                f"no variants files found matching {files_glob}")

        local_basedir = partition_description.variants_filename_basedir(
            local_variants_files[0])
        assert local_basedir.endswith("/")

        # Keep each file's path relative to the local base directory so the
        # partition layout is reproduced under the HDFS variants directory.
        hdfs_variants_files = []
        for lvf in local_variants_files:
            assert lvf.startswith(local_basedir)
            hdfs_variants_files.append(
                os.path.join(hdfs_variants_dir, lvf[len(local_basedir):]))

        return local_variants_files, hdfs_variants_dir, hdfs_variants_files

    def _native_hdfs_upload_dataset(
            self, study_id, variants_dir,
            pedigree_file, partition_description):
        """Upload a study to HDFS using the native HDFS helpers.

        Returns:
            ``(hdfs_variants_dir, first_hdfs_variants_file,
            pedigree_hdfs_path)``; the first two are ``None`` when
            ``variants_dir`` is ``None``.
        """
        study_path = self.default_hdfs_study_path(study_id)
        pedigree_hdfs_path = self.default_pedigree_hdfs_filename(study_id)
        self.hdfs_helpers.put(pedigree_file, pedigree_hdfs_path)

        if variants_dir is None:
            return (None, None, pedigree_hdfs_path)

        local_variants_files, hdfs_variants_dir, hdfs_variants_files = \
            self._build_hdfs_variants(
                study_id, variants_dir, partition_description)

        # Copy the metadata files, when present, into the study directory.
        for meta_name, meta_label in (
                ("_PARTITION_DESCRIPTION", "partition description"),
                ("_VARIANTS_SCHEMA", "variants schema")):
            meta_filename = os.path.join(variants_dir, meta_name)
            logger.debug("checking for %s: %s", meta_label, meta_filename)
            if os.path.exists(meta_filename):
                logger.info(
                    "copying %s %s into %s",
                    meta_label, meta_filename, study_path)
                self.hdfs_helpers.put_in_directory(meta_filename, study_path)

        for lvf, hvf in zip(local_variants_files, hdfs_variants_files):
            hdfs_dir = os.path.dirname(hvf)
            self.hdfs_helpers.makedirs(hdfs_dir)
            self.hdfs_helpers.put_in_directory(lvf, hdfs_dir)

        return (hdfs_variants_dir, hdfs_variants_files[0], pedigree_hdfs_path)

    def _rsync_hdfs_upload_dataset(
            self, study_id, variants_dir,
            pedigree_file, partition_description):
        """Upload a study to HDFS through rsync.

        The remote study directory is cleared before anything is copied.

        Returns:
            ``(hdfs_variants_dir, first_hdfs_variants_file,
            pedigree_hdfs_path)``; the first two are ``None`` when
            ``variants_dir`` is ``None``.
        """
        assert self.rsync_helpers is not None

        study_path = self.default_hdfs_study_path(study_id)
        if not study_path.endswith("/"):
            study_path += "/"
        self.rsync_helpers.clear_remote(study_path)

        # The metadata files live in the variants directory, which is absent
        # for pedigree-only imports. Joining onto ``None`` would raise, so
        # this is guarded.
        if variants_dir is not None:
            for meta_name in ("_PARTITION_DESCRIPTION", "_VARIANTS_SCHEMA"):
                meta_filename = os.path.join(variants_dir, meta_name)
                if os.path.exists(meta_filename):
                    self.rsync_helpers.copy_to_remote(
                        meta_filename, remote_subdir=study_path,
                        clear_remote=False)

        pedigree_rsync_path = os.path.join(study_path, "pedigree")
        self.rsync_helpers.copy_to_remote(
            pedigree_file, remote_subdir=pedigree_rsync_path)
        pedigree_hdfs_path = self._build_hdfs_pedigree(study_id, pedigree_file)

        if variants_dir is None:
            return (None, None, pedigree_hdfs_path)

        variants_rsync_path = os.path.join(study_path, "variants/")
        if not variants_dir.endswith("/"):
            variants_dir += "/"
        # Files starting with "_" (such as the metadata files handled above)
        # are excluded from the variants copy.
        self.rsync_helpers.copy_to_remote(
            variants_dir, remote_subdir=variants_rsync_path,
            exclude=["_*"])

        _, hdfs_variants_dir, hdfs_variants_files = self._build_hdfs_variants(
            study_id, variants_dir, partition_description)

        return (hdfs_variants_dir, hdfs_variants_files[0], pedigree_hdfs_path)

    def hdfs_upload_dataset(
            self, study_id, variants_dir,
            pedigree_file, partition_description):
        """Upload a variants dir and pedigree file to hdfs.

        Uses rsync when it is configured, and the native HDFS helpers
        otherwise.
        """
        if self.rsync_helpers is not None:
            return self._rsync_hdfs_upload_dataset(
                study_id, variants_dir,
                pedigree_file, partition_description)

        return self._native_hdfs_upload_dataset(
            study_id, variants_dir,
            pedigree_file, partition_description)

    def impala_import_dataset(
            self, study_id,
            pedigree_hdfs_file,
            variants_hdfs_dir,
            partition_description,
            variants_sample=None,
            variants_schema=None):
        """Create pedigree and variant tables for a study.

        Existing tables with the study's default names are dropped first.
        The variants table is created only when ``variants_hdfs_dir`` is
        given, in which case a variants sample file or schema is required.

        Returns:
            A study config dict referencing the created tables.
        """
        pedigree_table = self._construct_pedigree_table(study_id)
        variants_table = self._construct_variants_table(study_id)

        db = self.storage_config.impala.db
        self.impala_helpers.drop_table(db, variants_table)
        self.impala_helpers.drop_table(db, pedigree_table)

        self.impala_helpers.import_pedigree_into_db(
            db, pedigree_table, pedigree_hdfs_file)

        if variants_hdfs_dir is None:
            return self._generate_study_config(study_id, pedigree_table)

        assert variants_sample is not None or variants_schema is not None
        self.impala_helpers.import_variants_into_db(
            db, variants_table, variants_hdfs_dir,
            partition_description,
            variants_sample=variants_sample,
            variants_schema=variants_schema)

        return self._generate_study_config(
            study_id, pedigree_table, variants_table)

    def impala_load_dataset(self, study_id, variants_dir, pedigree_file):
        """Load a study data into impala genotype storage.

        Reads the optional ``_PARTITION_DESCRIPTION`` and
        ``_VARIANTS_SCHEMA`` files from ``variants_dir``, uploads the data
        to HDFS and creates the Impala tables.

        Returns:
            The study config dict produced by ``impala_import_dataset``.
        """
        if variants_dir is None:
            partition_description = None
            variants_schema = None
        else:
            partition_config_file = os.path.join(
                variants_dir, "_PARTITION_DESCRIPTION")
            if os.path.exists(partition_config_file):
                partition_description = ParquetPartitionDescriptor.from_config(
                    partition_config_file, root_dirname=variants_dir)
            else:
                partition_description = NoPartitionDescriptor(
                    root_dirname=variants_dir)

            variants_schema_file = os.path.join(
                variants_dir, "_VARIANTS_SCHEMA")
            variants_schema = None
            if os.path.exists(variants_schema_file):
                # TOML documents are UTF-8 by specification.
                with open(
                        variants_schema_file, "rt",
                        encoding="utf-8") as infile:
                    schema = toml.loads(infile.read())
                variants_schema = schema["variants_schema"]

        variants_hdfs_dir, variants_hdfs_path, pedigree_hdfs_path = \
            self.hdfs_upload_dataset(
                study_id, variants_dir, pedigree_file, partition_description)

        return self.impala_import_dataset(
            study_id,
            pedigree_hdfs_path,
            variants_hdfs_dir,
            partition_description=partition_description,
            variants_schema=variants_schema,
            variants_sample=variants_hdfs_path)
# TOML study-config template for an Impala-backed study.  It contains the
# str.format-style placeholders {id}, {genotype_storage}, {pedigree_table}
# and {variants_table}.
STUDY_CONFIG_TEMPLATE = "\n".join([
    "",
    'id = "{id}"',
    'conf_dir = "."',
    "[genotype_storage]",
    'id = "{genotype_storage}"',
    "[genotype_storage.tables]",
    'pedigree = "{pedigree_table}"',
    'variants = "{variants_table}"',
    "",
])