Source code for dae.backends.impala.impala_helpers

import os
import re
import time
import itertools
import logging

from contextlib import closing

from impala import dbapi  # type: ignore
from sqlalchemy.pool import QueuePool
from sqlalchemy.exc import TimeoutError as SqlTimeoutError


logger = logging.getLogger(__name__)


class ImpalaHelpers:
    """Helper methods for working with Impala."""

    def __init__(self, impala_hosts, impala_port=21050, pool_size=1):
        impala_host = os.environ.get("DAE_IMPALA_HOST", None)
        if impala_host is not None:
            logger.info("impala host overwritten: %s", impala_host)
            impala_hosts = [impala_host]
        if impala_hosts is None:
            impala_hosts = []

        # cycle through the configured hosts so pooled connections are
        # spread round-robin across the Impala cluster
        host_generator = itertools.cycle(impala_hosts)

        def create_connection():
            impala_host = next(host_generator)
            logger.debug(
                "creating connection to impala host %s", impala_host)
            connection = dbapi.connect(host=impala_host, port=impala_port)
            connection.host = impala_host
            return connection

        if pool_size is None:
            # allow a few connections per configured host
            pool_size = 3 * len(impala_hosts) + 1

        logger.info("impala connection pool size is: %s", pool_size)

        self._connection_pool = QueuePool(
            create_connection,
            pool_size=pool_size,
            reset_on_return=False,
            max_overflow=pool_size,
            timeout=1.0)

        logger.debug(
            "created impala pool with %s connections",
            self._connection_pool.status())

    def connection(self, timeout: float = None):
        """Get an Impala connection from the pool; return None on timeout."""
        logger.debug(
            "going to get impala connection from the pool; %s",
            self._connection_pool.status())
        started = time.time()
        while True:
            try:
                connection = self._connection_pool.connect()
                return connection
            except SqlTimeoutError:
                elapsed = time.time() - started
                logger.debug(
                    "unable to connect; elapsed %0.2fsec", elapsed)
                if timeout is not None and elapsed > timeout:
                    return None

    @staticmethod
    def _import_single_file(cursor, db, table, import_file):
        cursor.execute(
            f"DROP TABLE IF EXISTS {db}.{table}")

        dirname = os.path.dirname(import_file)
        statement = f"""
            CREATE EXTERNAL TABLE {db}.{table} LIKE PARQUET '{import_file}'
            STORED AS PARQUET LOCATION '{dirname}'
        """
        logger.debug("%s", statement)
        cursor.execute(statement)
        cursor.execute(f"REFRESH {db}.{table}")

    @staticmethod
    def _add_partition_properties(cursor, db, table, partition_description):
        chromosomes = ", ".join(partition_description.chromosomes)
        cursor.execute(
            f"ALTER TABLE {db}.{table} "
            "SET TBLPROPERTIES("
            "'gpf_partitioning_region_bin_chromosomes' = "
            f"'{chromosomes}'"
            ")")
        cursor.execute(
            f"ALTER TABLE {db}.{table} "
            "SET TBLPROPERTIES("
            "'gpf_partitioning_region_bin_region_length' = "
            f"'{partition_description.region_length}'"
            ")")
        cursor.execute(
            f"ALTER TABLE {db}.{table} "
            "SET TBLPROPERTIES("
            "'gpf_partitioning_family_bin_family_bin_size' = "
            f"'{partition_description.family_bin_size}'"
            ")")

        coding_effect_types = ",".join(
            partition_description.coding_effect_types)
        coding_effect_types = coding_effect_types.replace("'", "\\'")
        cursor.execute(
            f"ALTER TABLE {db}.{table} "
            "SET TBLPROPERTIES("
            "'gpf_partitioning_coding_bin_coding_effect_types' = "
            f"'{coding_effect_types}'"
            ")")
        cursor.execute(
            f"ALTER TABLE {db}.{table} "
            "SET TBLPROPERTIES("
            "'gpf_partitioning_frequency_bin_rare_boundary' = "
            f"'{partition_description.rare_boundary}'"
            ")")

    @staticmethod
    def _create_dataset_table(
            cursor, db, table, sample_file, partition_description):
        cursor.execute(
            f"DROP TABLE IF EXISTS {db}.{table}")

        hdfs_dir = partition_description.variants_filename_basedir(
            sample_file)
        if not partition_description.has_partitions():
            statement = f"""
                CREATE EXTERNAL TABLE {db}.{table}
                LIKE PARQUET '{sample_file}'
                STORED AS PARQUET LOCATION '{hdfs_dir}'
            """
        else:
            partitions = partition_description.build_impala_partitions()
            statement = f"""
                CREATE EXTERNAL TABLE {db}.{table}
                LIKE PARQUET '{sample_file}'
                PARTITIONED BY ({partitions})
                STORED AS PARQUET LOCATION '{hdfs_dir}'
            """
        cursor.execute(statement)

        if partition_description.has_partitions():
            cursor.execute(
                f"ALTER TABLE {db}.{table} RECOVER PARTITIONS")
        cursor.execute(
            f"REFRESH {db}.{table}")

    def import_pedigree_into_db(self, db, pedigree_table, pedigree_hdfs_file):
        """Import a pedigree parquet file into an Impala table."""
        with closing(self.connection()) as conn:
            with conn.cursor() as cursor:
                cursor.execute(
                    f"CREATE DATABASE IF NOT EXISTS {db}")
                self._import_single_file(
                    cursor, db, pedigree_table, pedigree_hdfs_file)

    @staticmethod
    def _build_variants_schema(variants_schema):
        # map parquet/schema types to the corresponding Impala column types
        type_conversion = {
            "int32": "INT",
            "int16": "SMALLINT",
            "int8": "TINYINT",
            "float": "FLOAT",
            "string": "STRING",
            "binary": "STRING",
        }
        result = []
        for field_name, field_type in variants_schema.items():
            impala_type = type_conversion.get(field_type)
            assert impala_type is not None, (field_name, field_type)
            result.append(f"`{field_name}` {impala_type}")
        statement = ", ".join(result)
        statement = f"( {statement} )"
        return statement

    def _build_import_variants_statement(
            self, db, variants_table, variants_hdfs_dir,
            partition_description,
            variants_sample=None,
            variants_schema=None):
        assert variants_sample is not None or variants_schema is not None

        statement = [
            "CREATE EXTERNAL TABLE", f"{db}.{variants_table}"]
        if variants_schema is not None:
            statement.append(
                self._build_variants_schema(variants_schema))
        else:
            assert variants_sample is not None
            statement.extend(
                ["LIKE PARQUET", f"'{variants_sample}'"])

        if partition_description.has_partitions():
            partitions = partition_description.build_impala_partitions()
            statement.extend([
                "PARTITIONED BY", f"({partitions})"])

        statement.extend([
            "STORED AS PARQUET LOCATION", f"'{variants_hdfs_dir}'"])

        return " ".join(statement)

    def import_variants_into_db(
            self, db, variants_table, variants_hdfs_dir,
            partition_description,
            variants_sample=None,
            variants_schema=None):
        """Import variants parquet files from variants_hdfs_dir into Impala."""
        assert variants_schema is not None or variants_sample is not None

        with closing(self.connection()) as conn:
            with conn.cursor() as cursor:
                cursor.execute(
                    f"CREATE DATABASE IF NOT EXISTS {db}")
                cursor.execute(
                    f"DROP TABLE IF EXISTS {db}.{variants_table}")

                statement = self._build_import_variants_statement(
                    db, variants_table, variants_hdfs_dir,
                    partition_description,
                    variants_sample=variants_sample,
                    variants_schema=variants_schema)
                logger.info("going to execute: %s", statement)
                cursor.execute(statement)

                if partition_description.has_partitions():
                    cursor.execute(
                        f"ALTER TABLE {db}.{variants_table} "
                        "RECOVER PARTITIONS")
                cursor.execute(
                    f"REFRESH {db}.{variants_table}")

                if partition_description.has_partitions():
                    self._add_partition_properties(
                        cursor, db, variants_table, partition_description)

    def get_table_create_statement(self, db, table):
        """Get the create statement for table."""
        with closing(self.connection()) as conn:
            with conn.cursor() as cursor:
                statement = f"SHOW CREATE TABLE {db}.{table}"
                cursor.execute(statement)

                create_statement = None
                for row in cursor:
                    create_statement = row[0]
                    break
                return create_statement

    def recreate_table(self, db, table, new_table, new_hdfs_dir):
        """Recreate db.table as db.new_table located at new_hdfs_dir."""
        create_statement = self.get_table_create_statement(db, table)
        assert create_statement is not None

        with closing(self.connection()) as conn:
            # point the CREATE statement at the new table name
            table_name = re.compile(
                r"CREATE EXTERNAL TABLE (?P<table_name>[a-zA-Z0-9._]+)\s")
            create_statement = table_name.sub(
                f"CREATE EXTERNAL TABLE {db}.{new_table} ",
                create_statement)

            # rewrite the LOCATION clause to the new HDFS directory
            location = re.compile(
                r"LOCATION '(?P<location>.+)'\s")
            create_statement = location.sub(
                f"LOCATION '{new_hdfs_dir}' ",
                create_statement)

            # quote the reserved identifiers `position` and `role`
            position = re.compile(
                r"\s(position)\s")
            create_statement = position.sub(
                " `position` ",
                create_statement)

            role = re.compile(
                r"\s(role)\s")
            create_statement = role.sub(
                " `role` ",
                create_statement)

            # escape single quotes inside effect-type property values
            create_statement = create_statement.replace("3'UTR", "3\\'UTR")
            create_statement = create_statement.replace("5'UTR", "5\\'UTR")

            with conn.cursor() as cursor:
                cursor.execute(
                    f"DROP TABLE IF EXISTS {db}.{new_table}")

                logger.info("going to execute %s", create_statement)
                cursor.execute(create_statement)

                if "PARTITIONED" in create_statement:
                    cursor.execute(
                        f"ALTER TABLE {db}.{new_table} "
                        "RECOVER PARTITIONS")
                cursor.execute(
                    f"REFRESH {db}.{new_table}")

    def rename_table(self, db, table, new_table):
        """Rename db.table to db.new_table."""
        statement = f"ALTER TABLE {db}.{table} RENAME TO {db}.{new_table}"
        logger.info("going to execute %s", statement)

        with closing(self.connection()) as conn:
            with conn.cursor() as cursor:
                cursor.execute(statement)

    def check_database(self, dbname):
        """Check if dbname exists."""
        with closing(self.connection()) as conn:
            with conn.cursor() as cursor:
                query = "SHOW DATABASES"
                cursor.execute(query)
                for row in cursor:
                    if row[0] == dbname:
                        return True
        return False

    def check_table(self, dbname, tablename):
        """Check if dbname.tablename exists."""
        with closing(self.connection()) as conn:
            with conn.cursor() as cursor:
                query = f"SHOW TABLES IN {dbname}"
                cursor.execute(query)
                for row in cursor:
                    if row[0] == tablename.lower():
                        return True
        return False

    def drop_table(self, dbname, tablename):
        """Drop dbname.tablename if it exists."""
        with closing(self.connection()) as conn:
            with conn.cursor() as cursor:
                query = f"DROP TABLE IF EXISTS {dbname}.{tablename}"
                cursor.execute(query)

    def create_database(self, dbname):
        """Create database dbname if it does not already exist."""
        with closing(self.connection()) as conn:
            with conn.cursor() as cursor:
                query = f"CREATE DATABASE IF NOT EXISTS {dbname}"
                cursor.execute(query)

    def drop_database(self, dbname):
        """Drop database dbname and all of its tables."""
        with closing(self.connection()) as conn:
            with conn.cursor() as cursor:
                cursor.execute(
                    f"DROP DATABASE IF EXISTS {dbname} CASCADE")
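
A minimal usage sketch, not part of the module itself, showing how the helpers above fit together. The host names, port, database name, table names, and HDFS paths are hypothetical placeholders, and the commented variants import assumes a GPF partition-description object as used by the methods above.

    # usage sketch only; all names and paths below are hypothetical
    from dae.backends.impala.impala_helpers import ImpalaHelpers

    helpers = ImpalaHelpers(
        impala_hosts=["impala-1.example.org", "impala-2.example.org"],
        impala_port=21050)

    if not helpers.check_database("gpf_study_db"):
        helpers.create_database("gpf_study_db")

    # import a pedigree parquet file already uploaded to HDFS
    helpers.import_pedigree_into_db(
        "gpf_study_db", "study_pedigree",
        "/studies/example_study/pedigree/pedigree.parquet")

    # variants import needs either a sample parquet file or an explicit
    # schema, plus a partition-description object providing has_partitions(),
    # build_impala_partitions(), etc.:
    # helpers.import_variants_into_db(
    #     "gpf_study_db", "study_variants",
    #     "/studies/example_study/variants",
    #     partition_description,
    #     variants_sample="/studies/example_study/variants/sample.parquet")

    if helpers.check_table("gpf_study_db", "study_pedigree"):
        print("pedigree table is in place")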