Source code for dae.backends.impala.summary_variants_query_builder

import logging
from dae.backends.impala.base_query_builder import BaseQueryBuilder

logger = logging.getLogger(__name__)


class SummaryVariantsQueryBuilder(BaseQueryBuilder):
    """Build queries related to summary variants.

    The builder operates in one of two modes, selected by
    ``summary_variants_table``:

    * a non-empty table name -- rows are read directly from that table and
      no JOIN, GROUP BY or pedigree predicate is emitted;
    * ``None`` (or any other falsy value) -- summary rows are aggregated
      from ``variants_table`` joined with ``pedigree_table``.

    Every method decides the mode with the same truthiness test, so an
    empty table name consistently falls back to the aggregating mode.
    """

    def __init__(
            self, db, variants_table, pedigree_table,
            variants_schema, table_properties, pedigree_schema,
            pedigree_df, gene_models=None, summary_variants_table=None):
        """Store the summary table name and initialise the base builder.

        All arguments except ``summary_variants_table`` are forwarded
        unchanged to ``BaseQueryBuilder.__init__``.
        """
        # Assigned before the parent constructor runs, exactly as the
        # original code did.
        # NOTE(review): presumably the parent constructor calls hooks
        # overridden here that read this attribute -- confirm before
        # reordering.
        self.summary_variants_table = summary_variants_table
        super().__init__(
            db, variants_table, pedigree_table,
            variants_schema, table_properties, pedigree_schema,
            pedigree_df, gene_models=gene_models)

    def _where_accessors(self):
        """Return the parent's accessors, each prefixed with ``variants.``.

        A new dict is returned; the mapping produced by the parent class
        is left untouched.
        """
        base_accessors = super()._where_accessors()
        return {
            key: f"variants.{value}"
            for key, value in base_accessors.items()
        }

    def _query_columns(self):
        """Set ``self.select_accessors`` and return the SELECT expressions.

        The returned list holds the values of ``self.select_accessors`` in
        insertion order.
        """
        if self.summary_variants_table:
            # Plain columns read straight from the summary table.
            # extra_attributes is not selected in this mode (the code
            # adding it was commented out in the original).
            # TODO confirm whether the summary table carries that column.
            self.select_accessors = {
                "bucket_index": "variants.bucket_index",
                "summary_index": "variants.summary_index",
                "variant_data": "variants.variant_data",
                "family_variants_count": "variants.family_variants_count",
                "seen_in_status": "variants.seen_in_status",
                "seen_as_denovo": "variants.seen_as_denovo",
            }
        else:
            # Aggregates computed over the variants/pedigree join; they
            # rely on the GROUP BY emitted by build_group_by().
            self.select_accessors = {
                "bucket_index": "variants.bucket_index",
                "summary_index": "variants.summary_index",
                "variant_data": "MIN(variants.variant_data)",
                "family_variants_count": "COUNT(DISTINCT variants.family_id)",
                "seen_in_status": "gpf_bit_or(pedigree.status)",
                "seen_as_denovo": "gpf_or(BITAND(inheritance_in_members, 4))",
            }
            if self.has_extra_attributes:
                self.select_accessors["extra_attributes"] = \
                    "MIN(variants.extra_attributes)"

        return list(self.select_accessors.values())

    def build_from(self):
        """Add the FROM clause, aliasing the source table as ``variants``.

        The summary variants table is used when one is configured,
        otherwise the variants table.
        """
        table = self.summary_variants_table or self.variants_table
        self._add_to_product(f"FROM {self.db}.{table} as variants")

    def build_join(self):
        """Add the JOIN with the pedigree table (aggregating mode only)."""
        if self.summary_variants_table:
            return
        self._add_to_product(
            f"JOIN {self.db}.{self.pedigree_table} as pedigree")

    def build_group_by(self):
        """Add the GROUP BY clause (aggregating mode only)."""
        if self.summary_variants_table:
            return
        self._add_to_product(
            "GROUP BY bucket_index, summary_index, "
            "allele_index, variant_type, transmission_type")

    def build_where(
            self,
            regions=None,
            genes=None,
            effect_types=None,
            family_ids=None,
            person_ids=None,
            inheritance=None,
            roles=None,
            sexes=None,
            variant_type=None,
            real_attr_filter=None,
            ultra_rare=None,
            frequency_filter=None,
            return_reference=None,
            return_unknown=None,
            **_kwargs):
        """Add the WHERE clause built from the supplied filters.

        The filters are passed to ``_base_build_where``. With a summary
        table the ``inheritance`` filter is dropped and nothing else is
        added. Otherwise the predicate linking ``variants`` rows to
        ``pedigree`` rows is appended, introduced by ``AND`` when a WHERE
        clause was produced and by ``WHERE`` when it was empty. Extra
        keyword arguments are accepted and ignored.
        """
        # FIXME too many arguments
        # pylint: disable=too-many-arguments
        use_summary_table = bool(self.summary_variants_table)
        if use_summary_table:
            inheritance = None

        where_clause = self._base_build_where(
            regions=regions,
            genes=genes,
            effect_types=effect_types,
            family_ids=family_ids,
            person_ids=person_ids,
            inheritance=inheritance,
            roles=roles,
            sexes=sexes,
            variant_type=variant_type,
            real_attr_filter=real_attr_filter,
            ultra_rare=ultra_rare,
            frequency_filter=frequency_filter,
            return_reference=return_reference,
            return_unknown=return_unknown,
        )
        self._add_to_product(where_clause)

        if use_summary_table:
            return

        # build_join() emits a JOIN without an ON condition, so the join
        # predicate is supplied here.
        keyword = "AND" if where_clause else "WHERE"
        self._add_to_product(
            f"{keyword} variants.variant_in_members = pedigree.person_id")

    def create_row_deserializer(self, serializer):
        """Return a function that turns a result row into a summary variant.

        The returned function maps the row values to the names in
        ``self.query_columns`` by position, calls
        ``serializer.deserialize_summary_variant`` and, when that yields a
        variant, updates its attributes with the family variants count and
        the seen-in-status / seen-as-denovo values before returning it.
        """
        def deserialize_row(row):
            cols = {
                col_name: row[idx]
                for idx, col_name in enumerate(self.query_columns)
            }
            accessors = self.select_accessors

            bucket_index = cols[accessors["bucket_index"]]
            summary_index = cols[accessors["summary_index"]]
            variant_data = cols[accessors["variant_data"]]
            family_variants_count = cols[accessors["family_variants_count"]]
            seen_in_status = cols[accessors["seen_in_status"]]
            seen_as_denovo = cols[accessors["seen_as_denovo"]]
            # Absent when extra attributes were not selected.
            extra_attributes = cols.get(
                accessors.get("extra_attributes", None), None)

            if isinstance(variant_data, str):
                logger.debug(
                    "variant_data is string!!!! %d, %s",
                    bucket_index, summary_index
                )
                variant_data = bytes(variant_data, "utf8")
            if isinstance(extra_attributes, str):
                # TODO do we really need that if. Looks like a python2 leftover
                extra_attributes = bytes(extra_attributes, "utf8")

            variant = serializer.deserialize_summary_variant(
                variant_data, extra_attributes
            )
            if variant is not None:
                variant.update_attributes({
                    "family_variants_count": [family_variants_count],
                    "seen_in_status": [seen_in_status],
                    "seen_as_denovo": [seen_as_denovo]
                })
            return variant

        return deserialize_row