Source code for dae.backends.storage.schema2_genotype_storage

from copy import copy
import glob
from itertools import chain
import os
import logging
from dataclasses import dataclass
from collections.abc import Iterator
from dae.backends.impala.hdfs_helpers import HdfsHelpers
from dae.backends.impala.impala_helpers import ImpalaHelpers
from dae.backends.schema2.impala_variants import ImpalaVariants
from dae.backends.storage.genotype_storage import GenotypeStorage


# Logger for this module; messages are tagged with the module's dotted name.
logger = logging.getLogger(name=__name__)


@dataclass(frozen=True)
class HdfsStudyLayout:
    """Immutable record of where a study's files were placed in HDFS.

    Produced by ``Schema2GenotypeStorage.hdfs_upload_dataset`` and consumed
    by ``Schema2GenotypeStorage.import_dataset``.
    """

    # HDFS path of the uploaded pedigree parquet file.
    pedigree_file: str
    # HDFS directory that receives the summary-allele parquet files.
    summary_variant_dir: str
    # HDFS directory that receives the family-allele parquet files.
    family_variant_dir: str
    # HDFS path of one summary-allele parquet file, used as a schema sample.
    summary_sample: str
    # HDFS path of one family-allele parquet file, used as a schema sample.
    family_sample: str
    # HDFS path of the uploaded meta parquet file.
    meta_file: str
class Schema2GenotypeStorage(GenotypeStorage):
    """A genotype storage implementing the new schema2.

    Study data is uploaded to HDFS as parquet files
    (``hdfs_upload_dataset``) and then loaded into Impala tables
    (``import_dataset``); queries go through ``ImpalaVariants``
    (``build_backend``).
    """

    def __init__(self, storage_config, section_id):
        super().__init__(storage_config, section_id)
        # The HDFS client is created lazily on first use (see hdfs_helpers).
        self._hdfs_helpers = None

        impala_config = self.storage_config.impala
        self._impala_helpers = ImpalaHelpers(
            impala_hosts=impala_config.hosts,
            impala_port=impala_config.port,
            pool_size=impala_config.pool_size)

    def is_impala(self):
        """Return False: this storage does not identify as impala storage."""
        return False

    def is_filestorage(self):
        """Return False: this storage does not identify as file storage."""
        return False

    def build_backend(self, study_config, genome, gene_models):
        """Create the variants query backend for a study.

        :param study_config: study configuration; must not be None. Table
            names come from ``genotype_storage.tables`` when configured,
            otherwise from ``<study_id>_*`` defaults.
        :param genome: not used by this implementation.
        :param gene_models: passed through to ``ImpalaVariants``.
        :return: an ``ImpalaVariants`` bound to the study's tables.
        """
        assert study_config is not None

        family_table, summary_table, pedigree_table, meta_table = \
            self._study_tables(study_config)
        # TODO create a bigquery variants if so specified in the config
        return ImpalaVariants(
            self.impala_helpers,
            self.storage_config.impala.db,
            family_table,
            summary_table,
            pedigree_table,
            meta_table,
            gene_models,
        )

    def simple_study_import(
        self,
        study_id,
        families_loader=None,
        variant_loaders=None,
        study_config=None,
        **kwargs,
    ):
        """Not supported by schema2 storage.

        :raises NotImplementedError: always.
        """
        raise NotImplementedError()

    def hdfs_upload_dataset(self, study_id, variants_dir, pedigree_file,
                            meta_file, partition_description) \
            -> HdfsStudyLayout:
        """Upload local data to hdfs.

        Files are placed under ``<hdfs.base_dir>/<study_id>``:
        ``pedigree/pedigree.parquet``, ``meta/meta.parquet``, any optional
        ``_PARTITION_DESCRIPTION`` / ``_VARIANTS_SCHEMA`` files found in
        ``variants_dir``, and the summary/family parquet trees under
        ``summary/`` and ``family/``.

        :return: the resulting HDFS layout. Its ``summary_sample`` /
            ``family_sample`` are None when no parquet files of that kind
            were found.
        """
        study_path = os.path.join(self.storage_config.hdfs.base_dir, study_id)

        # Copy pedigree
        pedigree_hdfs_path = os.path.join(
            study_path, "pedigree", "pedigree.parquet")
        self.hdfs_helpers.put(pedigree_file, pedigree_hdfs_path)

        # Copy meta
        meta_hdfs_file = os.path.join(study_path, "meta", "meta.parquet")
        self.hdfs_helpers.put(meta_file, meta_hdfs_file)

        # Copy optional files
        for optional_file in ("_PARTITION_DESCRIPTION", "_VARIANTS_SCHEMA"):
            optional_filename = os.path.join(variants_dir, optional_file)
            logger.debug("checking for: %s", optional_filename)
            if os.path.exists(optional_filename):
                logger.info("copying %s into %s",
                            optional_filename, study_path)
                self.hdfs_helpers.put_in_directory(
                    optional_filename, study_path)

        # Copy variants if any
        summary_sample_hdfs_file, family_sample_hdfs_file = \
            self._copy_variants(variants_dir, partition_description,
                                study_path)

        return HdfsStudyLayout(
            pedigree_file=pedigree_hdfs_path,
            summary_variant_dir=os.path.join(study_path, "summary"),
            family_variant_dir=os.path.join(study_path, "family"),
            summary_sample=summary_sample_hdfs_file,
            family_sample=family_sample_hdfs_file,
            meta_file=meta_hdfs_file,
        )

    @staticmethod
    def _study_tables(study_config) -> tuple[str, str, str, str]:
        """Return (family, summary, pedigree, meta) table names for a study.

        A non-empty name in ``study_config.genotype_storage.tables`` overrides
        the ``<study_id>_*`` default for that table.
        """
        study_id = study_config.id
        storage_config = study_config.genotype_storage
        tables = None
        if storage_config and storage_config.tables:
            tables = storage_config.tables

        def resolve(name, default):
            # A configured, non-empty table name wins over the default.
            if tables:
                configured = getattr(tables, name)
                if configured:
                    return configured
            return default

        return (
            resolve("family", f"{study_id}_family_alleles"),
            resolve("summary", f"{study_id}_summary_alleles"),
            resolve("pedigree", f"{study_id}_pedigree"),
            resolve("meta", f"{study_id}_meta"),
        )

    def _copy_variants(self, variants_dir, partition_description, study_path):
        """Copy summary and family parquet files into the study's HDFS dir.

        :return: ``(summary_sample, family_sample)`` -- the HDFS path of the
            first copied summary / family parquet file, or None for a kind
            with no parquet files (instead of failing with IndexError).
        """
        hdfs_summary_dir = os.path.join(study_path, "summary")
        hdfs_family_dir = os.path.join(study_path, "family")

        # TODO why pass variants_dir as a independant input parameter ?
        src_summary_dir = os.path.join(
            variants_dir, partition_description.summary_alleles_dirname)
        src_family_dir = os.path.join(
            variants_dir, partition_description.family_alleles_dirname)

        summary_files_to_copy = list(self._enum_parquet_files_to_copy(
            src_summary_dir, hdfs_summary_dir))
        family_files_to_copy = list(self._enum_parquet_files_to_copy(
            src_family_dir, hdfs_family_dir))

        created_dirs = set()
        for src_file, hdfs_file in chain(summary_files_to_copy,
                                         family_files_to_copy):
            hdfs_dir = os.path.dirname(hdfs_file)
            # Many files share a partition directory; create each only once.
            if hdfs_dir not in created_dirs:
                self.hdfs_helpers.makedirs(hdfs_dir)
                created_dirs.add(hdfs_dir)
            self.hdfs_helpers.put_in_directory(src_file, hdfs_dir)

        # Return the first parquet file of each kind as a sample file.
        summary_sample = \
            summary_files_to_copy[0][1] if summary_files_to_copy else None
        family_sample = \
            family_files_to_copy[0][1] if family_files_to_copy else None
        return summary_sample, family_sample

    @staticmethod
    def _enum_parquet_files_to_copy(src_variants_dir, dest_dir) \
            -> Iterator[tuple[str, str]]:
        """Yield ``(local_parquet_file, destination_file)`` pairs.

        Every ``*.parquet`` file found recursively under ``src_variants_dir``
        is mapped to the same relative path under ``dest_dir``. Files are
        yielded in sorted order so the sample-file choice is deterministic.
        """
        # Escape the directory so glob metacharacters in it match literally.
        pattern = os.path.join(glob.escape(src_variants_dir), "**/*.parquet")
        for src_file in sorted(glob.iglob(pattern, recursive=True)):
            # relpath is correct even when src_variants_dir ends with a
            # separator, unlike slicing off len(src_variants_dir) + 1 chars.
            rel_path = os.path.relpath(src_file, src_variants_dir)
            yield src_file, os.path.join(dest_dir, rel_path)

    @property
    def hdfs_helpers(self):
        """Return the hdfs helper used to interact with hdfs."""
        if self._hdfs_helpers is None:
            self._hdfs_helpers = HdfsHelpers(
                self.storage_config.hdfs.host,
                self.storage_config.hdfs.port,
                replication=self.storage_config.hdfs.replication
            )
        return self._hdfs_helpers

    def import_dataset(
            self, study_id, hdfs_study_layout: HdfsStudyLayout,
            partition_description):
        """Load a dataset from HDFS into impala.

        Existing tables with the target names are dropped first, then the
        pedigree, meta, summary and family tables are created from the files
        described by ``hdfs_study_layout``.

        :return: a study configuration dict referencing the new tables.
        """
        pedigree_table = self._construct_pedigree_table(study_id)
        summary_variant_table, family_variant_table = \
            self._construct_variant_tables(study_id)
        meta_table = self._construct_metadata_table(study_id)

        db = self.storage_config.impala.db

        # Replace any tables left over from a previous import.
        for table in (summary_variant_table, family_variant_table,
                      pedigree_table, meta_table):
            self.impala_helpers.drop_table(db, table)

        self.impala_helpers.import_pedigree_into_db(
            db, pedigree_table, hdfs_study_layout.pedigree_file)
        # NOTE(review): the meta table is loaded through the pedigree import
        # helper -- presumably a generic parquet-to-table import; confirm.
        self.impala_helpers.import_pedigree_into_db(
            db, meta_table, hdfs_study_layout.meta_file)

        assert hdfs_study_layout.summary_sample is not None
        # XXX summary_alleles have no family_bin, so import them with a copy
        # of the partition description whose family bin size is 0.
        summary_pd = copy(partition_description)
        summary_pd._family_bin_size = 0  # pylint: disable=protected-access
        self.impala_helpers.import_variants_into_db(
            db, summary_variant_table, hdfs_study_layout.summary_variant_dir,
            summary_pd,
            variants_sample=hdfs_study_layout.summary_sample)

        assert hdfs_study_layout.family_sample is not None
        self.impala_helpers.import_variants_into_db(
            db, family_variant_table, hdfs_study_layout.family_variant_dir,
            partition_description,
            variants_sample=hdfs_study_layout.family_sample)

        return self._generate_study_config(
            study_id, pedigree_table,
            summary_variant_table, family_variant_table, meta_table)

    @property
    def impala_helpers(self):
        """Return the impala helper created in ``__init__``."""
        assert self._impala_helpers is not None
        return self._impala_helpers

    @staticmethod
    def _construct_variant_tables(study_id):
        """Return default (summary, family) allele table names for a study."""
        return f"{study_id}_summary_alleles", f"{study_id}_family_alleles"

    @staticmethod
    def _construct_pedigree_table(study_id):
        """Return the default pedigree table name for a study."""
        return f"{study_id}_pedigree"

    @staticmethod
    def _construct_metadata_table(study_id):
        """Return the default meta table name for a study."""
        return f"{study_id}_meta"

    def _generate_study_config(self, study_id, pedigree_table,
                               summary_table, family_table, meta_table):
        """Build a minimal study configuration dict for the imported tables.

        The pedigree table is always recorded. When ``summary_table`` is
        truthy, the summary, family and meta tables are recorded too and the
        genotype browser is enabled.
        """
        assert study_id is not None

        study_config = {
            "id": study_id,
            "conf_dir": ".",
            "has_denovo": False,
            "genotype_storage": {
                "id": self.storage_id,
                "tables": {"pedigree": pedigree_table},
            },
            "genotype_browser": {"enabled": False},
        }

        if summary_table:
            assert family_table is not None
            storage_config = study_config["genotype_storage"]
            storage_config["tables"]["summary"] = summary_table
            storage_config["tables"]["family"] = family_table
            storage_config["tables"]["meta"] = meta_table
            study_config["genotype_browser"]["enabled"] = True

        return study_config