Source code for dae.backends.schema2.serializers

import functools
import operator
import logging
from typing import Any

import pyarrow as pa
from dae.variants.core import Allele
from dae.variants.attributes import (
    Inheritance,
    TransmissionType,
    Sex,
    Role,
    Status,
)

# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)


class AlleleParquetSerializer:
    """Build pyarrow schemas and column batches for alleles.

    The class exposes two lazily constructed ``pyarrow`` schemas (one for
    summary alleles, one for family alleles) and two ``build_*_batch_dict``
    methods that turn a single allele into a ``{column name: [value]}``
    mapping.
    """

    # Base columns of the summary allele schema, in schema order.
    # The ``effect_gene`` list item is an explicitly named field
    # ("element") wrapping a struct of two strings.
    SUMMARY_ALLELE_BASE_SCHEMA = {
        "bucket_index": pa.int32(),
        "summary_index": pa.int32(),
        "allele_index": pa.int32(),
        "chromosome": pa.string(),
        "position": pa.int32(),
        "end_position": pa.int32(),
        "effect_gene": pa.list_(
            pa.field(
                "element",
                pa.struct(
                    [
                        pa.field("effect_gene_symbols", pa.string()),
                        pa.field("effect_types", pa.string()),
                    ]
                ),
            )
        ),
        "variant_type": pa.int8(),
        "transmission_type": pa.int8(),
        "reference": pa.string(),
        "af_allele_count": pa.int32(),
        "af_allele_freq": pa.float32(),
        "af_parents_called": pa.int32(),
        "af_parents_freq": pa.float32(),
    }

    # Base columns of the family allele schema, in schema order.
    FAMILY_ALLELE_BASE_SCHEMA = {
        "bucket_index": pa.int32(),
        "summary_index": pa.int32(),
        "allele_index": pa.int32(),
        "family_index": pa.int32(),
        "family_id": pa.string(),
        "is_denovo": pa.int8(),
        "allele_in_sexes": pa.int8(),
        "allele_in_statuses": pa.int8(),
        "allele_in_roles": pa.int32(),
        "inheritance_in_members": pa.int16(),
        "allele_in_members": pa.list_(pa.string()),
    }

    # Combined summary + family property types. Note that here
    # ``effect_gene`` is a list of a bare struct (no named "element" field)
    # and ``allele_in_members`` is a plain string, unlike the two schemas
    # above.
    BASE_SEARCHABLE_PROPERTIES_TYPES = {
        "bucket_index": pa.int32(),
        "chromosome": pa.string(),
        "position": pa.int32(),
        "end_position": pa.int32(),
        "effect_gene": pa.list_(
            pa.struct(
                [
                    pa.field("effect_gene_symbols", pa.string()),
                    pa.field("effect_types", pa.string()),
                ]
            )
        ),
        "summary_index": pa.int32(),
        "allele_index": pa.int32(),
        "variant_type": pa.int8(),
        "transmission_type": pa.int8(),
        "reference": pa.string(),
        "family_index": pa.int32(),
        "family_id": pa.string(),
        "is_denovo": pa.int8(),
        "allele_in_sexes": pa.int8(),
        "allele_in_statuses": pa.int8(),
        "allele_in_roles": pa.int32(),
        "inheritance_in_members": pa.int16(),
        "allele_in_members": pa.string(),
    }

    # Properties whose values are enum members; they are stored through
    # their ``.value`` (see ``_get_searchable_prop_value``).
    ENUM_PROPERTIES = {
        "variant_type": Allele.Type,
        "transmission_type": TransmissionType,
        "allele_in_sexes": Sex,
        "allele_in_roles": Role,
        "allele_in_statuses": Status,
        "inheritance_in_members": Inheritance,
    }

    def __init__(self, annotation_schema, extra_attributes=None):
        """Set up the serializer state.

        Args:
            annotation_schema: annotation schema or ``None``. When given, it
                is read through ``public_fields`` and item access
                (``schema[name].type``).
            extra_attributes: optional iterable of attribute names; its
                items are copied into ``self.extra_attributes``.
        """
        self.annotation_schema = annotation_schema

        # Both schemas are created on first access of the matching property.
        self._schema_summary = None
        self._schema_family = None

        # No score serializers are registered at construction time.
        self.scores_serializers = {}

        # Per-instance copies of the class-level type mappings.
        self.searchable_properties_summary_types = dict(
            self.SUMMARY_ALLELE_BASE_SCHEMA
        )
        self.searchable_properties_family_types = dict(
            self.FAMILY_ALLELE_BASE_SCHEMA
        )
        self.searchable_properties_types = dict(
            self.BASE_SEARCHABLE_PROPERTIES_TYPES
        )

        self.extra_attributes = (
            list(extra_attributes) if extra_attributes else []
        )

    @property
    def schema_summary(self):
        """Return the summary allele schema, building it on first use.

        The schema holds the base summary columns, a string
        ``summary_variant_data`` column and one column for every public
        annotation field whose type is ``"float"`` or ``"int"``.
        """
        if self._schema_summary is not None:
            return self._schema_summary

        columns = [
            pa.field(name, pa_type)
            for name, pa_type in self.SUMMARY_ALLELE_BASE_SCHEMA.items()
        ]
        columns.append(pa.field("summary_variant_data", pa.string()))

        if self.annotation_schema is not None:
            # Annotation fields of any other type get no column.
            pa_type_by_annotation_type = {
                "float": pa.float32(),
                "int": pa.int32(),
            }
            for annotation in self.annotation_schema.public_fields:
                pa_type = pa_type_by_annotation_type.get(
                    self.annotation_schema[annotation].type
                )
                if pa_type is not None:
                    columns.append(pa.field(annotation, pa_type))

        self._schema_summary = pa.schema(columns)
        return self._schema_summary

    @property
    def schema_family(self):
        """Return the family allele schema, building it on first use.

        The schema holds the base family columns followed by a string
        ``family_variant_data`` column.
        """
        if self._schema_family is not None:
            return self._schema_family

        columns = [
            pa.field(name, pa_type)
            for name, pa_type in self.FAMILY_ALLELE_BASE_SCHEMA.items()
        ]
        columns.append(pa.field("family_variant_data", pa.string()))

        self._schema_family = pa.schema(columns)
        return self._schema_family

    @property
    def searchable_properties_summary(self):
        """Return the names of the summary searchable properties."""
        return self.searchable_properties_summary_types.keys()

    @property
    def searchable_properties_family(self):
        """Return the names of the family searchable properties."""
        return self.searchable_properties_family_types.keys()

    @property
    def searchable_properties(self):
        """Return the names of all searchable properties."""
        return self.searchable_properties_types.keys()

    def _get_searchable_prop_value(self, allele, spr):
        """Return the storable value of property ``spr`` for ``allele``.

        The value is read as a plain attribute first and through
        ``allele.get_attribute`` when that yields ``None``. Truthy values of
        enum properties are converted: a list is OR-ed into a single number
        (``None`` items skipped), anything else is replaced by its ``.value``.
        """
        value = getattr(allele, spr, None)
        if value is None:
            value = allele.get_attribute(spr)

        # Falsy values and non-enum properties are passed through untouched.
        if not value or spr not in self.ENUM_PROPERTIES:
            return value

        if not isinstance(value, list):
            return value.value

        return functools.reduce(
            operator.or_,
            (member.value for member in value if member is not None),
            0,
        )

    def build_family_allele_batch_dict(self, allele, family_variant_data) \
            -> dict[str, list[Any]]:
        """Build a one-row family allele batch as a dict of columns.

        Every column of the family schema maps to a single-item list: the
        base columns hold the allele's property values and
        ``family_variant_data`` holds the value passed in.
        """
        # Collect all property values before touching the schema.
        named_values = [
            (name, self._get_searchable_prop_value(allele, name))
            for name in self.FAMILY_ALLELE_BASE_SCHEMA
        ]

        batch: dict[str, list] = {
            column: [] for column in self.schema_family.names
        }
        for name, value in named_values:
            batch[name].append(value)
        batch["family_variant_data"] = [family_variant_data]
        return batch

    @staticmethod
    def _effect_gene_value(allele):
        """Return the ``effect_gene`` column value for ``allele``.

        Without effect types the result is a single entry of ``None``
        values; otherwise effect types and gene symbols are paired up
        positionally.
        """
        if allele.effect_types is None:
            assert allele.effect_gene_symbols is None
            return [{"effect_types": None, "effect_gene_symbols": None}]

        return [
            {"effect_types": effect_type, "effect_gene_symbols": gene_symbol}
            for effect_type, gene_symbol in zip(
                allele.effect_types, allele.effect_gene_symbols
            )
        ]

    def build_summary_allele_batch_dict(self, allele, summary_variant_data) \
            -> dict[str, list[Any]]:
        """Build a one-row summary allele batch as a dict of columns.

        The result starts with ``summary_variant_data``, followed by the
        base summary columns and, when an annotation schema is set, one
        entry per public annotation field read via ``allele.get_attribute``.
        Each value is wrapped in a single-item list.
        """
        batch = {"summary_variant_data": [summary_variant_data]}

        for name in self.SUMMARY_ALLELE_BASE_SCHEMA:
            if name == "effect_gene":
                value = self._effect_gene_value(allele)
            else:
                value = self._get_searchable_prop_value(allele, name)
            batch[name] = [value]

        if self.annotation_schema is None:
            return batch

        for annotation in self.annotation_schema.public_fields:
            batch[annotation] = [allele.get_attribute(annotation)]
        return batch