Source code for dae.gene.denovo_gene_set_collection_factory

import json
import os
from itertools import product

from dae.effect_annotation.effect import expand_effect_types
from dae.gene.denovo_gene_set_collection import DenovoGeneSetCollection
from dae.variants.attributes import Inheritance, Sex

class DenovoGeneSetCollectionFactory:
    """Class for creating and loading created denovo gene sets.

    Denovo gene sets are stored on disk as one JSON cache file per person
    set collection. ``build_collection`` computes and writes those files;
    ``load_collection`` reads them back into a ``DenovoGeneSetCollection``.
    """

    @staticmethod
    def denovo_gene_set_cache_file(config, person_set_collection_id=""):
        """Return the path to the cache file for a person set collection.

        The file lives in ``config.conf_dir`` and is named
        ``denovo-cache-<person_set_collection_id>.json``.
        """
        return os.path.join(
            config.conf_dir,
            "denovo-cache-" + person_set_collection_id + ".json",
        )

    @classmethod
    def load_collection(cls, genotype_data_study):
        """Load a denovo gene set collection for a given study.

        Reads the cache file of every person set collection selected in the
        study's ``denovo_gene_sets`` configuration.

        Raises:
            OSError: if the cache file of a selected person set collection
                does not exist.
        """
        config = genotype_data_study.config
        assert config is not None, genotype_data_study.study_id
        selected_person_set_collections = \
            config.denovo_gene_sets.selected_person_set_collections
        # NOTE(review): the `psc.id` key/filter and the `.name` constructor
        # argument below are restored expressions - confirm against the
        # PersonSetCollection and DenovoGeneSetCollection definitions.
        person_set_collection_configs = {
            psc.id: psc.config_json()
            for psc in genotype_data_study.person_set_collections.values()
            if psc.id in selected_person_set_collections
        }
        collection = DenovoGeneSetCollection(
            genotype_data_study.study_id,
            genotype_data_study.name,
            config,
            person_set_collection_configs,
        )

        for person_set_collection_id in selected_person_set_collections:
            cache_path = cls.denovo_gene_set_cache_file(
                config, person_set_collection_id,
            )
            if not os.path.exists(cache_path):
                raise OSError(
                    f"Denovo gene sets caches dir '{cache_path}' "
                    f"does not exists",
                )

            with open(cache_path, "r", encoding="utf-8") as infile:
                contents = json.load(infile)
            # change all list to sets after loading from json
            contents = cls._convert_cache_innermost_types(
                contents, list, set,
            )
            collection.cache[person_set_collection_id] = contents

        return collection

    @classmethod
    def build_collection(cls, genotype_data_study):
        """Build a denovo gene set collection for a study and save it.

        One cache file is written for every person set collection selected
        in the study's ``denovo_gene_sets`` configuration.
        """
        config = genotype_data_study.config
        assert config is not None, genotype_data_study.study_id

        denovo_person_set_collections = \
            config.denovo_gene_sets.selected_person_set_collections

        for person_set_collection_id in denovo_person_set_collections:
            gene_set_cache = cls._generate_gene_set_for(
                genotype_data_study,
                config.denovo_gene_sets,
                person_set_collection_id,
            )
            cache_path = cls.denovo_gene_set_cache_file(
                config, person_set_collection_id,
            )
            cls._save_cache(gene_set_cache, cache_path)

    @classmethod
    def _format_criterias(cls, standard_criterias):
        """
        Replicates functionality from denovo gene set config parser.

        Given a TOML config's standard criterias, it does additional
        formatting which was done before in the parser.

        Returns a ``(effect_type_criterias, sex_criterias)`` tuple of lists;
        every criteria is a dict with "property", "name" and "value" keys.
        """
        effect_type_criterias = [
            {
                "property": "effect_types",
                "name": name,
                "value": expand_effect_types(criteria),
            }
            for name, criteria
            in standard_criterias.effect_types.segments.items()
        ]
        sex_criterias = [
            {
                "property": "sexes",
                "name": name,
                "value": [Sex.from_name(criteria)],
            }
            for name, criteria in standard_criterias.sexes.segments.items()
        ]
        return (effect_type_criterias, sex_criterias)

    @classmethod
    def _recursive_cache_update(
        cls, input_cache: dict, updater_cache: dict,
    ) -> None:
        """Recursively update a dictionary with another dictionary.

        ``input_cache`` is modified in place: nested dictionaries are merged
        key by key, sets present on both sides are united and any other
        value is taken from ``updater_cache``.
        """
        # FIXME !
        # This method cannot handle nested dictionaries
        # that hold a reference to the dictionary that
        # contains them. If such a dictionary is given
        # to this function, it will reach the maximum
        # recursion depth.
        for key, val in updater_cache.items():
            if key in input_cache and isinstance(val, dict):
                assert isinstance(input_cache[key], dict), updater_cache[key]
                cls._recursive_cache_update(input_cache[key], val)
            elif key in input_cache and isinstance(val, set):
                input_cache[key] = input_cache[key].union(val)
            else:
                input_cache[key] = val

    @classmethod
    def _generate_gene_set_for(
        cls, genotype_data, config, person_set_collection_id,
    ):
        """
        Produce a nested dictionary which represents a denovo gene set.

        It maps person set keys and then denovo gene set criteria names to
        an innermost dictionary mapping gene symbols to sets of family IDs.
        """
        person_set_collection = genotype_data.get_person_set_collection(
            person_set_collection_id,
        )
        person_sets = person_set_collection.person_sets

        cache = {set_id: {} for set_id in person_sets}

        # The members of a person set do not depend on the variant being
        # processed, so each set of person IDs is built only once.
        persons_by_set = {
            set_id: set(person_set.persons.keys())
            for set_id, person_set in person_sets.items()
        }

        # Likewise every criteria combination has fixed search arguments.
        criterias = [
            (
                combination,
                {
                    criteria["property"]: criteria["value"]
                    for criteria in combination
                },
            )
            for combination
            in product(*cls._format_criterias(config.standard_criterias))
        ]

        variants = genotype_data.query_variants(inheritance=["denovo"])
        for variant in variants:
            for criteria_combination, search_args in criterias:
                # The cache is keyed by the same keys as `person_sets`, so
                # the lookup below cannot miss.
                for set_id, persons_in_set in persons_by_set.items():
                    innermost_cache = cache[set_id]
                    for criteria in criteria_combination:
                        innermost_cache = innermost_cache.setdefault(
                            criteria["name"], {},
                        )

                    cls._recursive_cache_update(
                        innermost_cache,
                        cls._add_genes_families(
                            variant, persons_in_set, search_args,
                        ),
                    )

        return cache

    @classmethod
    def _save_cache(cls, cache, cache_path):
        """Write a denovo gene set cache to the filesystem in JSON format.

        Missing parent directories of ``cache_path`` are created.
        """
        # change all sets to lists so they can be saved in json
        cache = cls._convert_cache_innermost_types(
            cache, set, list, sort_values=True,
        )

        cache_dir = os.path.dirname(cache_path)
        if cache_dir:
            # exist_ok avoids failing when the directory appears between an
            # existence check and its creation; a bare file name has no
            # directory component to create.
            os.makedirs(cache_dir, exist_ok=True)

        with open(cache_path, "w", encoding="utf-8") as out:
            json.dump(
                cache, out, sort_keys=True, indent=4, separators=(",", ": "),
            )

    @classmethod
    def _convert_cache_innermost_types(
        cls, cache, from_type, to_type, sort_values=False,
    ):
        """
        Coerce the types of all values in a dictionary matching a given type.

        This is done recursively. A new structure is returned; values of
        type ``from_type`` are converted with ``to_type`` and, when
        ``sort_values`` is True, returned as a sorted list.
        """
        if isinstance(cache, from_type):
            if sort_values is True:
                return sorted(to_type(cache))
            return to_type(cache)

        assert isinstance(
            cache, dict,
        ), f"expected type 'dict', got '{type(cache)}'"

        return {
            key: cls._convert_cache_innermost_types(
                value, from_type, to_type, sort_values=sort_values,
            )
            for key, value in cache.items()
        }

    @staticmethod
    def _add_genes_families(variant, persons_in_set, search_args):
        """
        Return a map of gene symbols in variants to family IDs.

        For the given variant and the people of a certain person set,
        produce a dictionary which maps the gene symbols of the denovo
        alleles matching the given search_args to a set holding the ID of
        the family in which the variant is found.

        Only the "effect_types" and "sexes" search arguments are used.
        """
        # Convert the filter values to sets once instead of once per allele.
        effect_types = (
            set(search_args["effect_types"])
            if "effect_types" in search_args else None
        )
        sexes = (
            set(search_args["sexes"]) if "sexes" in search_args else None
        )
        # Genes are always matched against the requested effect types;
        # without an effect types filter no gene matches.
        gene_effect_types = effect_types if effect_types is not None else set()

        cache = {}
        family_id = variant.family_id
        for aa in variant.alt_alleles:
            if Inheritance.denovo not in aa.inheritance_in_members:
                continue
            if not set(aa.variant_in_members_fpid) & persons_in_set:
                continue
            if effect_types is not None and not (
                aa.effects and set(aa.effects.types) & effect_types
            ):
                continue
            if sexes is not None and not set(aa.variant_in_sexes) & sexes:
                continue

            effect = aa.effects
            if not effect:
                # An allele without effects has no genes to report.
                continue
            for gene in effect.genes:
                if gene.effect in gene_effect_types:
                    cache.setdefault(gene.symbol, set()).add(family_id)

        return cache