Source code for dae.backends.impala.base_query_builder

import logging

from abc import ABC, abstractmethod

from dae.variants.attributes import Inheritance
from dae.backends.attributes_query import inheritance_query

from dae.utils.regions import Region
import dae.utils.regions

from ..attributes_query import QueryTreeToSQLBitwiseTransformer, \
    role_query, sex_query, variant_type_query
from ..attributes_query_inheritance import InheritanceTransformer, \
    inheritance_parser


# Module-level logger, named after this module, used by the query builder
# below for its debug/info/warning messages.
logger = logging.getLogger(__name__)


class BaseQueryBuilder(ABC):
    """A base class for all query builders.

    The builder accumulates the text of an SQL query in ``product``; each
    public ``build_*`` method appends one clause to it.  Subclasses must
    implement the abstract methods ``build_join``, ``build_group_by``,
    ``create_row_deserializer`` and ``_query_columns``.
    """

    # Quote character put around string literals in the generated SQL.
    QUOTE = "'"
    # Template for the WHERE clause; ``{where}`` receives the AND-joined
    # conditions.
    WHERE = """
        WHERE
            {where}
    """
    # The gene regions heuristic is applied only when the number of
    # requested genes is between 1 and this cutoff (inclusive).
    GENE_REGIONS_HEURISTIC_CUTOFF = 20
    # Number of positions by which every gene region is widened on each
    # side of the transcript bounds.
    GENE_REGIONS_HEURISTIC_EXTEND = 20000
    # Maximal number of values placed inside a single ``in ( ... )`` list.
    MAX_CHILD_NUMBER = 9999

    def __init__(
            self, db, variants_table, pedigree_table,
            variants_schema, table_properties, pedigree_schema,
            pedigree_df, gene_models=None):
        """Store the table descriptions and prepare the column accessors.

        ``variants_schema`` must not be None; its ``fields`` attribute is
        used as the collection of variants table columns.
        """
        assert variants_schema is not None

        self.db = db
        self.variants_table = variants_table
        self.pedigree_table = pedigree_table
        self.table_properties = table_properties
        self.variants_columns = variants_schema.fields
        self.pedigree_columns = pedigree_schema
        self.ped_df = pedigree_df
        self.has_extra_attributes = \
            "extra_attributes" in self.variants_columns
        self._product = ""
        self.query_columns = self._query_columns()
        self.gene_models = gene_models
        self.where_accessors = self._where_accessors()

    def reset_product(self):
        """Discard the query text accumulated so far."""
        self._product = ""

    @property
    def product(self):
        """Return the query text accumulated so far."""
        return self._product

    def _where_accessors(self):
        """Map every variants column name to the SQL expression used for it.

        In this base class each column maps to itself; an ``effect_types``
        entry is always present.
        """
        cols = list(self.variants_columns)
        accessors = dict(zip(cols, cols))
        if "effect_types" not in accessors:
            accessors["effect_types"] = "effect_types"
        return accessors

    def build_select(self):
        """Append the SELECT clause listing ``query_columns``."""
        columns = ", ".join(self.query_columns)
        select_clause = f"SELECT {columns}"
        self._add_to_product(select_clause)

    def build_from(self):
        """Append the FROM clause for the variants table."""
        from_clause = f"FROM {self.db}.{self.variants_table}"
        self._add_to_product(from_clause)

    @abstractmethod
    def build_join(self):
        pass

    def build_where(
        self,
        regions=None,
        genes=None,
        effect_types=None,
        family_ids=None,
        person_ids=None,
        inheritance=None,
        roles=None,
        sexes=None,
        variant_type=None,
        real_attr_filter=None,
        ultra_rare=None,
        frequency_filter=None,
        return_reference=None,
        return_unknown=None,
        **_kwargs
    ):
        """Build the where clause of a query."""
        # pylint: disable=too-many-arguments
        where_clause = self._base_build_where(
            regions=regions,
            genes=genes,
            effect_types=effect_types,
            family_ids=family_ids,
            person_ids=person_ids,
            inheritance=inheritance,
            roles=roles,
            sexes=sexes,
            variant_type=variant_type,
            real_attr_filter=real_attr_filter,
            ultra_rare=ultra_rare,
            frequency_filter=frequency_filter,
            return_reference=return_reference,
            return_unknown=return_unknown,
        )
        self._add_to_product(where_clause)

    def _base_build_where(
        self,
        regions=None,
        genes=None,
        effect_types=None,
        family_ids=None,
        person_ids=None,
        inheritance=None,
        roles=None,
        sexes=None,
        variant_type=None,
        real_attr_filter=None,
        ultra_rare=None,
        frequency_filter=None,
        return_reference=None,
        return_unknown=None,
        **_kwargs
    ):
        """Return the WHERE clause text for the given filters.

        Every filter that is not None contributes one condition (the
        inheritance filter may contribute several); the bin heuristics and
        the allele index condition are always evaluated.  Empty conditions
        are dropped and the rest are joined with ``AND``.  Returns an empty
        string when no condition remains.
        """
        # pylint: disable=too-many-arguments,too-many-branches
        where = []
        if genes is not None:
            # May narrow (or introduce) ``regions`` using the gene models.
            regions = self._build_gene_regions_heuristic(genes, regions)
            where.append(
                self._build_iterable_string_attr_where(
                    self.where_accessors["effect_gene_symbols"], genes
                )
            )
        if regions is not None:
            where.append(self._build_regions_where(regions))
        if family_ids is not None:
            where.append(
                self._build_iterable_string_attr_where(
                    self.where_accessors["family_id"], family_ids)
            )
        if person_ids is not None:
            # ``self.families`` is not set in this class; it is expected to
            # be provided by the subclass.
            # pylint: disable=no-member
            person_ids = set(person_ids) & set(self.families.persons.keys())
            where.append(
                self._build_iterable_string_attr_where(
                    self.where_accessors["variant_in_members"], person_ids
                )
            )
        if effect_types is not None:
            where.append(
                self._build_iterable_string_attr_where(
                    self.where_accessors["effect_types"], effect_types
                )
            )
        if inheritance is not None:
            where.extend(
                self._build_inheritance_where(
                    self.where_accessors["inheritance_in_members"],
                    inheritance
                )
            )
        if roles is not None:
            where.append(
                self._build_bitwise_attr_where(
                    self.where_accessors["variant_in_roles"],
                    roles, role_query
                )
            )
        if sexes is not None:
            where.append(
                self._build_bitwise_attr_where(
                    self.where_accessors["variant_in_sexes"],
                    sexes, sex_query
                )
            )
        if variant_type is not None:
            where.append(
                self._build_bitwise_attr_where(
                    self.where_accessors["variant_type"],
                    variant_type,
                    variant_type_query
                )
            )
        if real_attr_filter is not None:
            where.append(self._build_real_attr_where(real_attr_filter))
        if frequency_filter is not None:
            where.append(
                self._build_real_attr_where(
                    frequency_filter, is_frequency=True))
        if ultra_rare:
            where.append(self._build_ultra_rare_where(ultra_rare))

        where.append(
            self._build_return_reference_and_return_unknown(
                return_reference, return_unknown
            )
        )
        where.append(
            self._build_frequency_bin_heuristic(
                inheritance, ultra_rare, frequency_filter
            )
        )
        where.append(self._build_family_bin_heuristic(family_ids, person_ids))
        where.append(self._build_coding_heuristic(effect_types))
        where.append(self._build_region_bin_heuristic(regions))

        where = [w for w in where if w]

        where_clause = ""
        if where:
            where_clause = self.WHERE.format(
                where=" AND ".join([f"( {w} )" for w in where])
            )

        return where_clause

    @abstractmethod
    def build_group_by(self):
        pass

    def build_limit(self, limit):
        """Append a LIMIT clause unless ``limit`` is None."""
        if limit is not None:
            self._add_to_product(f"LIMIT {limit}")

    @abstractmethod
    def create_row_deserializer(self, serializer):
        pass

    def _add_to_product(self, string):
        """Append ``string`` to the product, separated by a single space.

        None and the empty string are ignored.
        """
        if string is None or string == "":
            return
        if self._product == "":
            self._product += string
        else:
            self._product += f" {string}"

    @abstractmethod
    def _query_columns(self):
        pass

    def _build_real_attr_where(self, real_attr_filter, is_frequency=False):
        """Build range conditions for numeric attributes.

        ``real_attr_filter`` is an iterable of ``(name, (left, right))``
        pairs; either bound may be None.  An attribute that is not a
        variants column yields the condition ``false``.  When both bounds
        are None a ``is not null`` condition is produced, except for
        frequency filters, which then produce nothing.  The conditions are
        joined with ``AND``.
        """
        query = []
        for attr_name, attr_range in real_attr_filter:
            if attr_name not in self.variants_columns:
                query.append("false")
                continue
            assert (
                self.variants_columns[attr_name].type in (float, int)
            ), self.variants_columns[attr_name]
            left, right = attr_range
            attr_name = self.where_accessors[attr_name]

            if left is None and right is None:
                if not is_frequency:
                    query.append(f"({attr_name} is not null)")
            elif left is None:
                # Upper bound only: rows with no value also pass.
                query.append(
                    f"({attr_name} <= {right} or {attr_name} is null)")
            elif right is None:
                query.append(f"({attr_name} >= {left})")
            else:
                query.append(
                    "({attr} >= {left} AND {attr} <= {right})".format(
                        attr=attr_name, left=left, right=right
                    )
                )
        return " AND ".join(query)

    def _build_ultra_rare_where(self, ultra_rare):
        """Build the condition ``af_allele_count <= 1`` (or null)."""
        assert ultra_rare
        return self._build_real_attr_where(
            real_attr_filter=[("af_allele_count", (None, 1))],
            is_frequency=True
        )

    def _build_regions_where(self, regions):
        """Build an ``OR``-joined condition matching any of ``regions``.

        A row matches a region when it is on the region's chromosome and
        its position lies in the region, or its end position lies in the
        region, or the region lies between its position and end position.
        """
        assert isinstance(regions, list), regions
        where = []
        for region in regions:
            assert isinstance(region, Region)
            # A missing end position is replaced by -1.
            end_position = "COALESCE(end_position, -1)"
            where.append(
                "(`chromosome` = {q}{chrom}{q} AND "
                "("
                "(`position` >= {start} AND `position` <= {stop}) "
                "OR "
                "({end_position} >= {start} AND {end_position} <= {stop}) "
                "OR "
                "({start} >= `position` AND {stop} <= {end_position})"
                "))".format(
                    q=self.QUOTE,
                    chrom=region.chrom,
                    start=region.start,
                    stop=region.stop,
                    end_position=end_position
                )
            )
        return " OR ".join(where)

    def _build_iterable_string_attr_where(self, column_name, query_values):
        """Build a ``column in ( ... )`` condition for string values.

        ``query_values`` must be a list or a set of strings.  An empty
        collection yields ``column IS NULL``.  The values are split into
        chunks of at most ``MAX_CHILD_NUMBER`` elements; the chunks are
        joined with ``OR``.
        """
        assert query_values is not None

        assert isinstance(query_values, (list, set)), type(query_values)

        if not query_values:
            return f" {column_name} IS NULL"

        # Backslashes must be escaped before quotes: otherwise a value that
        # already contains a backslash in front of a quote would end up as
        # an escaped backslash followed by a bare quote, terminating the
        # SQL string literal early.
        values = [
            " {q}{val}{q} ".format(
                q=self.QUOTE,
                val=val.replace("\\", "\\\\").replace("'", "\\'")
            )
            for val in query_values
        ]

        where = []
        for i in range(0, len(values), self.MAX_CHILD_NUMBER):
            chunk_values = ",".join(values[i: i + self.MAX_CHILD_NUMBER])
            where.append(f" {column_name} in ( {chunk_values} ) ")

        return " OR ".join([f"( {w} )" for w in where])

    @staticmethod
    def _build_bitwise_attr_where(column_name, query_value, query_transformer):
        """Build a bitwise condition on ``column_name``.

        A string ``query_value`` is first parsed with ``query_transformer``;
        any other value is passed to the SQL transformer as it is.
        """
        assert query_value is not None
        parsed = query_value
        if isinstance(query_value, str):
            parsed = query_transformer.transform_query_string_to_tree(
                query_value
            )
        transformer = QueryTreeToSQLBitwiseTransformer(column_name)
        return transformer.transform(parsed)

    @staticmethod
    def _build_inheritance_where(column_name, query_value):
        """Build the list of inheritance conditions on ``column_name``.

        ``query_value`` may be a query string, a list of query strings, or
        any other object, which is then used as an already parsed tree.
        Returns one condition per tree.
        """
        trees = []
        if isinstance(query_value, str):
            trees.append(inheritance_parser.parse(query_value))
        elif isinstance(query_value, list):
            for qval in query_value:
                trees.append(inheritance_parser.parse(qval))
        else:
            trees.append(query_value)

        result = []
        for tree in trees:
            transformer = InheritanceTransformer(column_name)
            result.append(transformer.transform(tree))
        return result

    def _build_gene_regions_heuristic(self, genes, regions):
        """Narrow ``regions`` using the transcript regions of ``genes``.

        The heuristic is applied only when the number of genes is between 1
        and ``GENE_REGIONS_HEURISTIC_CUTOFF`` and gene models are available;
        otherwise ``regions`` is returned unchanged.  Each transcript of a
        gene contributes a region widened by
        ``GENE_REGIONS_HEURISTIC_EXTEND`` on both sides.  Without input
        regions the collapsed gene regions are returned; otherwise the
        collapsed intersections of the gene regions with the input regions.
        When there is no intersection at all the input regions are returned
        unchanged, so that the caller's region restriction is never lost.
        """
        assert genes is not None
        if not 0 < len(genes) <= self.GENE_REGIONS_HEURISTIC_CUTOFF:
            return regions
        if self.gene_models is None:
            # Nothing to look the genes up in; the heuristic only narrows
            # the query, so skipping it is safe.
            return regions

        gene_regions = []
        for gene in genes:
            gene_model = self.gene_models.gene_models_by_gene_name(gene)
            if gene_model is None:
                logger.warning("gene model for %s not found", gene)
                continue
            for gm in gene_model:
                gene_regions.append(
                    Region(
                        gm.chrom,
                        gm.tx[0] - self.GENE_REGIONS_HEURISTIC_EXTEND,
                        gm.tx[1] + self.GENE_REGIONS_HEURISTIC_EXTEND,
                    )
                )
        gene_regions = dae.utils.regions.collapse(gene_regions)
        logger.info("gene regions for %s: %s", genes, gene_regions)
        logger.info("input regions: %s", regions)
        if not regions:
            return gene_regions

        result = []
        for gene_region in gene_regions:
            for region in regions:
                intersection = gene_region.intersection(region)
                if intersection:
                    result.append(intersection)
        result = dae.utils.regions.collapse(result)
        logger.info("original regions: %s; result: %s", regions, result)
        if not result:
            # An empty list of regions produces no region condition at
            # all, which would silently drop the requested restriction.
            return regions
        return result

    def _build_frequency_bin_heuristic(
            self, inheritance, ultra_rare, real_attr_filter):
        """Build a condition restricting the ``frequency_bin`` column.

        Returns an empty string when the variants table has no
        ``frequency_bin`` column, when no bin could be selected, or when
        all four bins (0 to 3) are selected.  Otherwise returns the
        selected ``frequency_bin = N`` conditions joined with ``OR`` in
        sorted order.
        """
        # pylint: disable=too-many-branches
        if "frequency_bin" not in self.variants_columns:
            return ""

        rare_boundary = self.table_properties["rare_boundary"]

        frequency_bin = set()
        frequency_bin_col = self.where_accessors["frequency_bin"]

        matchers = []
        if inheritance is not None:
            logger.debug(
                "frequence_bin_heuristic inheritance: %s (%s)",
                inheritance, type(inheritance)
            )
            if isinstance(inheritance, str):
                inheritance = [inheritance]

            matchers = [
                inheritance_query.transform_tree_to_matcher(
                    inheritance_query.transform_query_string_to_tree(inh))
                for inh in inheritance]

            if any(m.match([Inheritance.denovo]) for m in matchers):
                frequency_bin.add(f"{frequency_bin_col} = 0")

        # True when every inheritance query matches at least one of the
        # listed inheritance types; also True when there are no queries.
        has_transmitted_query = all(
            any(
                m.match([inh]) for inh in [
                    Inheritance.mendelian,
                    Inheritance.possible_denovo,
                    Inheritance.possible_omission,
                    Inheritance.unknown,
                    Inheritance.missing
                ]
            ) for m in matchers
        )

        if inheritance is None or has_transmitted_query:
            if ultra_rare:
                frequency_bin.update([
                    f"{frequency_bin_col} = 0",
                    f"{frequency_bin_col} = 1",
                ])
            elif real_attr_filter:
                for name, (begin, end) in real_attr_filter:
                    if name == "af_allele_freq":
                        if end and end < rare_boundary:
                            frequency_bin.update([
                                f"{frequency_bin_col} = 0",
                                f"{frequency_bin_col} = 1",
                                f"{frequency_bin_col} = 2"])
                        elif begin and begin >= rare_boundary:
                            frequency_bin.add(f"{frequency_bin_col} = 3")
                        elif end is not None and end >= rare_boundary:
                            frequency_bin.update([
                                f"{frequency_bin_col} = 0",
                                f"{frequency_bin_col} = 1",
                                f"{frequency_bin_col} = 2",
                                f"{frequency_bin_col} = 3",
                            ])
            elif inheritance is not None:
                frequency_bin.update([
                    f"{frequency_bin_col} = 1",
                    f"{frequency_bin_col} = 2",
                    f"{frequency_bin_col} = 3"])

        if len(frequency_bin) == 4:
            return ""

        # Sorted so that the generated SQL text does not depend on the
        # iteration order of the set.
        return " OR ".join(sorted(frequency_bin))

    def _build_coding_heuristic(self, effect_types):
        """Build a condition restricting the ``coding_bin`` column.

        Returns ``coding_bin = 1`` when every requested effect type is
        listed in the ``coding_effect_types`` table property,
        ``coding_bin = 0`` when none is, and an empty string otherwise
        (also when ``effect_types`` is None or the column is missing).
        """
        if effect_types is None:
            return ""
        if "coding_bin" not in self.variants_columns:
            return ""
        effect_types = set(effect_types)
        intersection = \
            effect_types & \
            set(self.table_properties["coding_effect_types"])

        logger.debug(
            "coding bin heuristic for %s: query effect types: %s; "
            "coding_effect_types: %s; => %s",
            self.variants_table, effect_types,
            self.table_properties["coding_effect_types"],
            intersection == effect_types
        )

        coding_bin_col = self.where_accessors["coding_bin"]

        if intersection == effect_types:
            return f"{coding_bin_col} = 1"
        if not intersection:
            return f"{coding_bin_col} = 0"
        return ""

    def _build_region_bin_heuristic(self, regions):
        """Build a condition restricting the ``region_bin`` column.

        Returns an empty string when there are no regions or the
        ``region_length`` table property is 0.  Otherwise every region is
        mapped to the bins ``<chrom>_<index>`` it touches, where the index
        is the position divided by ``region_length``; chromosomes that are
        not listed in the ``chromosomes`` table property use the prefix
        ``other``.
        """
        if not regions or self.table_properties["region_length"] == 0:
            return ""

        chroms = set(self.table_properties["chromosomes"])

        region_length = self.table_properties["region_length"]
        region_bins = []
        for region in regions:
            if region.chrom in chroms:
                chrom_bin = region.chrom
            else:
                chrom_bin = "other"
            start = region.start // region_length
            stop = region.stop // region_length
            for position_bin in range(start, stop + 1):
                region_bins.append(f"{chrom_bin}_{position_bin}")
        if not region_bins:
            return ""
        region_bin_col = self.where_accessors["region_bin"]
        bins_str = ",".join([f"'{rb}'" for rb in region_bins])
        return f"{region_bin_col} IN ({bins_str})"

    def _build_family_bin_heuristic(self, family_ids, person_ids):
        """Build a condition restricting the ``family_bin`` column.

        The family bins of the requested families and persons are looked
        up in the pedigree data frame.  A condition is produced only when
        at least one bin was found and their number is below the
        ``family_bin_size`` table property; otherwise (or when either table
        lacks a ``family_bin`` column) an empty string is returned.
        """
        if "family_bin" not in self.variants_columns:
            return ""
        if "family_bin" not in self.pedigree_columns:
            return ""

        family_bins = set()
        if family_ids:
            family_ids = set(family_ids)
            family_bins = family_bins.union(
                set(
                    self.ped_df[
                        self.ped_df["family_id"].isin(family_ids)
                    ].family_bin.values
                )
            )

        if person_ids:
            person_ids = set(person_ids)
            family_bins = family_bins.union(
                set(
                    self.ped_df[
                        self.ped_df["person_id"].isin(person_ids)
                    ].family_bin.values
                )
            )

        family_bin_col = self.where_accessors["family_bin"]

        if 0 < len(family_bins) < self.table_properties["family_bin_size"]:
            # Sorted so that the generated SQL text does not depend on the
            # iteration order of the set.
            family_bin_list = ", ".join(
                [str(fb) for fb in sorted(family_bins)])
            return f"{family_bin_col} IN ({family_bin_list})"

        return ""

    def _build_return_reference_and_return_unknown(
        self, return_reference=None, return_unknown=None
    ):
        """Build the condition on the ``allele_index`` column.

        Without ``return_reference`` only rows with ``allele_index > 0``
        are kept; with ``return_reference`` but without ``return_unknown``
        rows with ``allele_index >= 0``; with both flags no condition is
        produced.
        """
        allele_index_col = self.where_accessors["allele_index"]
        if not return_reference:
            return f"{allele_index_col} > 0"
        if not return_unknown:
            return f"{allele_index_col} >= 0"
        return ""