Source code for dae.backends.schema2.bigquery_variants

import time
import json
import logging
from contextlib import closing
import configparser
import numpy as np
from google.cloud import bigquery
from dae.pedigrees.family import FamiliesData
from dae.variants.attributes import Role, Status, Sex
from dae.backends.schema2.base_query_builder import Dialect
from dae.backends.schema2.family_builder import FamilyQueryBuilder
from dae.backends.schema2.summary_builder import SummaryQueryBuilder
from dae.variants.variant import SummaryVariantFactory
from dae.variants.family_variant import FamilyVariant

logger = logging.getLogger(__name__)


[docs]class BigQueryDialect(Dialect): """Abstracts away details related to bigquery.""" def __init__(self, ns: str = None): super().__init__(namespace=ns)
[docs] @staticmethod def add_unnest_in_join() -> bool: return True
[docs] @staticmethod def int_type() -> str: return "INT64"
[docs] @staticmethod def float_type() -> str: return "FLOAT64"
# FIXME too-many-instance-attributes # pylint: disable=too-many-instance-attributes
[docs]class BigQueryVariants: """Backend for BigQuery.""" def __init__( self, gcp_project_id, db, summary_allele_table, family_variant_table, pedigree_table, meta_table, gene_models=None, ): super().__init__() assert db, db assert pedigree_table, pedigree_table # instead of a connection handler bigquery has a client object self.dialect = BigQueryDialect(ns=gcp_project_id) self.client = bigquery.Client(project=gcp_project_id) self.db = db self.start_time = time.time() # meta table: partition settings self.meta_table = meta_table # family and summary tables self.summary_allele_table = summary_allele_table self.family_variant_table = family_variant_table self.summary_allele_schema = self._fetch_schema(summary_allele_table) self.family_variant_schema = self._fetch_schema(family_variant_table) self.combined_columns = { **self.family_variant_schema, **self.summary_allele_schema, } # pedigree tables self.pedigree_table = pedigree_table self.pedigree_schema = self._fetch_schema(self.pedigree_table) self.pedigree_df = self._fetch_pedigree() self.families = FamiliesData.from_pedigree_df(self.pedigree_df) # serializer # VariantSchema = namedtuple('VariantSchema', 'col_names') # self.variants_schema = VariantSchema( # col_names=list(self.combined_columns)) # self.serializer = AlleleParquetSerializer( # variants_schema=self.variants_schema) self.gene_models = gene_models assert gene_models is not None # self._fetch_tblproperties() # hardcoding relevant for specific dataset # pass in table_properties OR table in datastore _tbl_props = self._fetch_tblproperties(self.meta_table) self.table_properties = { "region_length": int(_tbl_props["region_bin"]["region_length"]), "chromosomes": list( map( lambda c: c.strip(), _tbl_props["region_bin"]["chromosomes"].split(","), ) ), "family_bin_size": int( _tbl_props["family_bin"]["family_bin_size"] ), "rare_boundary": int(_tbl_props["frequency_bin"]["rare_boundary"]), "coding_effect_types": { s.strip() for s in _tbl_props["coding_bin"][ "coding_effect_types" ].split(",") }, } def _fetch_tblproperties(self, meta_table): query = f"""SELECT value FROM {self.db}.{meta_table} WHERE key = 'partition_description' LIMIT 1 """ result = self.client.query(query).result() config = configparser.ConfigParser() for row in result: config.read_string(row[0]) return config def _fetch_schema(self, table): query = f""" SELECT * FROM {self.db}.INFORMATION_SCHEMA.COLUMNS WHERE table_name = '{table}' """ df = self.client.query(query).result().to_dataframe() records = df[["column_name", "data_type"]].to_records() schema = {col_name: col_type for (_, col_name, col_type) in records} return schema def _fetch_pedigree(self): query = f"SELECT * FROM {self.db}.{self.pedigree_table}" # ped_df = pandas_gbq.read_gbq(q, project_id=self.gcp_project_id) ped_df = self.client.query(query).result().to_dataframe() columns = { "personId": "person_id", "familyId": "family_id", "momId": "mom_id", "dadId": "dad_id", "sampleId": "sample_id", "sex": "sex", "status": "status", "role": "role", "generated": "generated", "layout": "layout", "phenotype": "phenotype", } if "not_sequenced" in self.pedigree_schema: columns = {"not_sequenced": "not_sequenced"} ped_df = ped_df.rename(columns=columns) ped_df.role = ped_df.role.apply(Role) ped_df.sex = ped_df.sex.apply(Sex) ped_df.status = ped_df.status.apply(Status) return ped_df def _summary_variants_iterator( self, regions=None, genes=None, effect_types=None, family_ids=None, person_ids=None, inheritance=None, roles=None, sexes=None, variant_type=None, real_attr_filter=None, ultra_rare=None, frequency_filter=None, return_reference=None, return_unknown=None, limit=None, affected_status=None, ): # pylint: disable=too-many-arguments,too-many-locals query_builder = SummaryQueryBuilder( self.dialect, self.db, self.family_variant_table, self.summary_allele_table, self.pedigree_table, self.family_variant_schema, self.summary_allele_schema, self.table_properties, self.pedigree_schema, self.pedigree_df, self.families, gene_models=self.gene_models, do_join_affected=False, ) query = query_builder.build_query( regions=regions, genes=genes, effect_types=effect_types, family_ids=family_ids, person_ids=person_ids, inheritance=inheritance, roles=roles, sexes=sexes, variant_type=variant_type, real_attr_filter=real_attr_filter, ultra_rare=ultra_rare, frequency_filter=frequency_filter, return_reference=return_reference, return_unknown=return_unknown, limit=limit, affected_status=affected_status, ) result = self.client.query(query) for row in result: try: sv_record = json.loads(row.summary_variant_data) sv = SummaryVariantFactory.summary_variant_from_records( sv_record ) if sv is None: continue yield sv except Exception as ex: # pylint: disable=broad-except logger.error("unable to deserialize summary variant (BQ)") logger.exception(ex) continue def _family_variants_iterator( self, regions=None, genes=None, effect_types=None, family_ids=None, person_ids=None, inheritance=None, roles=None, sexes=None, variant_type=None, real_attr_filter=None, ultra_rare=None, frequency_filter=None, return_reference=None, return_unknown=None, limit=None, affected_status=None, ): # pylint: disable=too-many-arguments,too-many-locals do_join_affected = affected_status is not None query_builder = FamilyQueryBuilder( self.dialect, self.db, self.family_variant_table, self.summary_allele_table, self.pedigree_table, self.family_variant_schema, self.summary_allele_schema, self.table_properties, self.pedigree_schema, self.pedigree_df, gene_models=self.gene_models, do_join_affected=do_join_affected, ) query = query_builder.build_query( regions=regions, genes=genes, effect_types=effect_types, family_ids=family_ids, person_ids=person_ids, inheritance=inheritance, roles=roles, sexes=sexes, variant_type=variant_type, real_attr_filter=real_attr_filter, ultra_rare=ultra_rare, frequency_filter=frequency_filter, return_reference=return_reference, return_unknown=return_unknown, limit=limit, affected_status=affected_status, ) # ------------------ DEBUG --------------------- result = [] logger.info("BQ QUERY BUILDER:\n%s}", query) start = time.perf_counter() bq_job = self.client.query(query) end = time.perf_counter() logger.info("TIME (BQ DB): %f", end - start) result = bq_job # ------------------ DEBUG --------------------- for row in result: try: sv_record = json.loads(row.summary_variant_data) fv_record = json.loads(row.family_variant_data) fv = FamilyVariant( SummaryVariantFactory.summary_variant_from_records( sv_record ), self.families[fv_record["family_id"]], np.array(fv_record["genotype"]), np.array(fv_record["best_state"]), ) if fv is None: continue yield fv except Exception as ex: # pylint: disable=broad-except logger.error("unable to deserialize family variant (BQ)") logger.exception(ex) continue
[docs] def query_summary_variants( self, regions=None, genes=None, effect_types=None, family_ids=None, person_ids=None, inheritance=None, roles=None, sexes=None, variant_type=None, real_attr_filter=None, ultra_rare=None, frequency_filter=None, return_reference=None, return_unknown=None, limit=None, ): """Query summary variants.""" # FIXME too-many-arguments # pylint: disable=too-many-arguments if limit is None: count = -1 else: count = limit limit = 10 * limit with closing( self._summary_variants_iterator( regions=regions, genes=genes, effect_types=effect_types, family_ids=family_ids, person_ids=person_ids, inheritance=inheritance, roles=roles, sexes=sexes, variant_type=variant_type, real_attr_filter=real_attr_filter, ultra_rare=ultra_rare, frequency_filter=frequency_filter, return_reference=return_reference, return_unknown=return_unknown, limit=limit, ) ) as sv_iterator: for sv in sv_iterator: if sv is None: continue yield sv count -= 1 if count == 0: break
[docs] def query_variants( self, regions=None, genes=None, effect_types=None, family_ids=None, person_ids=None, inheritance=None, roles=None, sexes=None, variant_type=None, real_attr_filter=None, ultra_rare=None, frequency_filter=None, return_reference=None, return_unknown=None, limit=None, affected_status=None, ): """Query summary variants.""" # FIXME too-many-arguments # pylint: disable=too-many-arguments if limit is None: count = -1 else: count = limit limit = 10 * limit with closing( self._family_variants_iterator( regions=regions, genes=genes, effect_types=effect_types, family_ids=family_ids, person_ids=person_ids, inheritance=inheritance, roles=roles, sexes=sexes, variant_type=variant_type, real_attr_filter=real_attr_filter, ultra_rare=ultra_rare, frequency_filter=frequency_filter, return_reference=return_reference, return_unknown=return_unknown, limit=limit, affected_status=affected_status, ) ) as fv_iterator: for v in fv_iterator: if v is None: continue yield v count -= 1 if count == 0: break logger.debug("[DONE] FAMILY VARIANTS QUERY")