dae.import_tools package

Submodules

dae.import_tools.import_tools module

class dae.import_tools.import_tools.Bucket(type: str, region_bin: str, regions: list[str], index: int)[source]

Bases: object

A region of the input used for processing.

index: int
region_bin: str
regions: list[str]
type: str
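
As a rough illustration, the four fields above map naturally onto a small dataclass. The sketch below is a standalone stand-in, not the library's own definition; the `make_buckets` helper, the one-bucket-per-chromosome layout and the `region_bin` naming are all hypothetical.

```python
from dataclasses import dataclass


@dataclass
class Bucket:
    """Stand-in mirroring the documented fields of Bucket."""

    type: str
    region_bin: str
    regions: list[str]
    index: int


def make_buckets(loader_type: str, chromosomes: list[str]) -> list[Bucket]:
    """Hypothetical helper: build one bucket per chromosome."""
    return [
        Bucket(
            type=loader_type,
            region_bin=f"{chrom}_0",  # assumed naming, for illustration only
            regions=[chrom],
            index=i,
        )
        for i, chrom in enumerate(chromosomes)
    ]


buckets = make_buckets("vcf", ["chr1", "chr2"])
```

Each bucket can then be processed independently, which is what makes the split useful for parallel imports.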
class dae.import_tools.import_tools.ImportConfigNormalizer[source]

Bases: object

Class to normalize import configs.

Most of the normalization is done by Cerberus, but it falls short in a few cases, which this class handles. It also reads external files and embeds them in the final configuration dict.

normalize(import_config: dict, base_input_dir: str) tuple[dict[str, Any], str, list[str]][source]

Normalize the import config.

class dae.import_tools.import_tools.ImportProject(import_config: dict[str, Any], base_input_dir: str | None, base_config_dir: str | None = None, gpf_instance: GPFInstance | None = None, config_filenames: list[str] | None = None)[source]

Bases: object

Encapsulate the import configuration.

This class creates the objects needed to import a study (e.g. loaders, family data and so on).

build_annotation_pipeline() AnnotationPipeline[source]
static build_from_config(import_config: dict[str, Any], base_input_dir: str = '', gpf_instance: GPFInstance | None = None) ImportProject[source]

Create a new project from the provided config.

The config is first validated and normalized.

Parameters:
  • import_config – The config to use for the import.

  • base_input_dir – Default input dir. Uses the current working directory by default.

static build_from_file(import_filename: str | PathLike, gpf_instance: GPFInstance | None = None) ImportProject[source]

Create a new project from the provided config filename.

The file is first parsed, validated and normalized. The path to the file is used as the default input path for the project.

Parameters:
  • import_filename – Path to the config file

  • gpf_instance – GPF instance to use.
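
A possible usage sketch, relying only on the methods and properties documented on this page. The `summarize_import` wrapper and its return layout are hypothetical, and the function can only be called where GPF is installed and a valid import config file exists.

```python
def summarize_import(config_path: str) -> dict:
    """Hypothetical helper: load an import project and describe its inputs."""
    # Imported lazily so this module can be loaded without GPF installed.
    from dae.import_tools.import_tools import ImportProject

    # The file is parsed, validated and normalized by build_from_file.
    project = ImportProject.build_from_file(config_path)
    buckets = project.get_import_variants_buckets()

    return {
        "study_id": project.study_id,
        "work_dir": project.work_dir,
        "bucket_count": len(buckets),
        # Input files per bucket, keyed by the bucket's index field.
        "inputs": {b.index: project.get_input_filenames(b) for b in buckets},
    }
```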

build_variants_loader_pipeline(variants_loader: VariantsLoader) VariantsLoader[source]

Create an annotation pipeline around variants_loader.

static del_loader_prefix(params: dict[str, Any], prefix: str) dict[str, Any][source]

Remove prefix from parameter keys.
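
A minimal standalone sketch of this kind of key rewriting. It assumes that keys without the prefix pass through unchanged, which the description above does not state; `strip_prefix` and the sample keys are hypothetical and are not the library's implementation.

```python
from typing import Any


def strip_prefix(params: dict[str, Any], prefix: str) -> dict[str, Any]:
    """Return a copy of params with prefix removed from matching keys."""
    return {
        # Assumption: keys that do not start with the prefix are kept as-is.
        (key[len(prefix):] if key.startswith(prefix) else key): value
        for key, value in params.items()
    }


cleaned = strip_prefix(
    {"vcf_chromosomes": "chr1", "add_chrom_prefix": "chr"},
    "vcf_",
)
```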

get_annotation_pipeline_config() list[dict][source]

Return the annotation pipeline configuration.

get_genotype_storage() GenotypeStorage[source]

Find, create and return the correct genotype storage.

get_gpf_instance() GPFInstance[source]

Create and return a GPF instance as described in the config.

get_import_storage() ImportStorage[source]

Create an import storage as described in the import config.

get_import_variants_buckets() list[dae.import_tools.import_tools.Bucket][source]

Split variant files into buckets enabling parallel processing.

get_input_filenames(bucket: Bucket) list[str][source]

Get a list of input files for a specific bucket.

get_parquet_dataset_dir() str[source]

Return parquet dataset directory.

If a processing parquet dataset dir is configured, this method returns it. Otherwise it constructs a parquet dataset directory inside the work dir.
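
The fallback described above can be sketched as a plain function. The function name, its arguments and the `work_dir/study_id` layout are assumptions made for illustration; the actual directory layout is not specified here.

```python
import os
from typing import Optional


def resolve_parquet_dataset_dir(
    processing_dir: Optional[str],
    work_dir: str,
    study_id: str,
) -> str:
    """Prefer the configured directory, else fall back to the work dir."""
    if processing_dir is not None:
        return processing_dir
    # Assumed layout: one dataset directory per study inside the work dir.
    return os.path.join(work_dir, study_id)
```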

get_partition_descriptor() PartitionDescriptor[source]
get_pedigree() FamiliesData[source]

Load, parse and return the pedigree data.

get_pedigree_filename() str[source]

Return the path to the pedigree file.

get_pedigree_loader() FamiliesLoader[source]
get_pedigree_params() tuple[str, dict[str, Any]][source]

Get params for loading the pedigree.

get_processing_parquet_dataset_dir() str | None[source]

Return the processing parquet dataset dir if it is configured and exists.

get_row_group_size() int[source]
get_variant_loader(bucket: Bucket | None = None, loader_type: str | None = None, reference_genome: ReferenceGenome | None = None) VariantsLoader[source]

Get the appropriate variant loader for the specified bucket.

get_variant_loader_chromosomes(loader_type: str | None = None) list[str][source]

Collect all chromosomes available in input files.

get_variant_loader_types() set[str][source]

Collect all variant import types used in the project.

get_variant_params(loader_type: str) tuple[Union[str, list[str]], dict[str, Any]][source]

Return variant loader filenames and params.

has_denovo_variants() bool[source]

Check if the resulting imported study has denovo variants.

has_genotype_storage() bool[source]

Return whether a genotype storage can be created.

has_variants() bool[source]
property include_reference: bool

Check if the import should include ref allele in the output data.

property input_dir: str

Return the path relative to which input files are specified.

property study_id: str
property work_dir: str

Where to store generated import files (e.g. parquet files).

class dae.import_tools.import_tools.ImportStorage[source]

Bases: ABC

Defines abstract base class for import storages.

abstract generate_import_task_graph(project: ImportProject) TaskGraph[source]

Generate task graph for import of the project into this storage.

class dae.import_tools.import_tools.MakefilePartitionHelper(partition_descriptor: PartitionDescriptor, genome: ReferenceGenome)[source]

Bases: object

Helper class for organizing partition targets.

bucket_index(region_bin: str) int[source]

Return bucket index based on variants target.

static build_target_chromosomes(target_chromosomes: list[str]) list[str][source]
generate_chrom_targets(target_chrom: str) list[tuple[str, str]][source]

Generate variant targets based on partition descriptor.

generate_variants_targets(target_chromosomes: list[str], mode: str | None = None) dict[str, list][source]

Produce variants targets.

region_bins_count(chrom: str) int[source]
dae.import_tools.import_tools.construct_import_annotation_pipeline(gpf_instance: GPFInstance, annotation_configfile: str | None = None) AnnotationPipeline[source]

Construct annotation pipeline for importing data.

dae.import_tools.import_tools.construct_import_annotation_pipeline_config(gpf_instance: GPFInstance, annotation_configfile: str | None = None) list[dict][source]

Construct annotation pipeline config for importing data.

dae.import_tools.import_tools.get_import_storage_factory(storage_type: str) Callable[[], ImportStorage][source]

Find and return a factory function for creation of a storage type.

dae.import_tools.import_tools.get_import_storage_types() list[str][source]
dae.import_tools.import_tools.register_import_storage_factory(storage_type: str, factory: Callable[[], ImportStorage]) None[source]
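
The three registry functions above follow a common factory-registry pattern. The sketch below reimplements that pattern in isolation, with the same names and signatures, so it runs without GPF. The module-level dict, the silent overwrite on re-registration, the `ValueError` for unknown types and the sorted listing are assumptions, not documented behaviour, and `DummyStorage` is hypothetical.

```python
from abc import ABC, abstractmethod
from typing import Any, Callable


class ImportStorage(ABC):
    """Stand-in for the abstract base class documented above."""

    @abstractmethod
    def generate_import_task_graph(self, project: Any) -> Any:
        """Generate the task graph for importing the project."""


# Assumed storage for the registry: a module-level dict.
_FACTORIES: dict[str, Callable[[], ImportStorage]] = {}


def register_import_storage_factory(
    storage_type: str, factory: Callable[[], ImportStorage],
) -> None:
    # Assumption: registering the same type twice overwrites the entry.
    _FACTORIES[storage_type] = factory


def get_import_storage_factory(storage_type: str) -> Callable[[], ImportStorage]:
    if storage_type not in _FACTORIES:
        # Assumption: unknown types raise ValueError.
        raise ValueError(f"unsupported import storage type: {storage_type}")
    return _FACTORIES[storage_type]


def get_import_storage_types() -> list[str]:
    return sorted(_FACTORIES)


class DummyStorage(ImportStorage):
    """Hypothetical storage used only to exercise the registry."""

    def generate_import_task_graph(self, project: Any) -> list:
        return []  # an empty "graph" is enough for the sketch


register_import_storage_factory("dummy", DummyStorage)
storage = get_import_storage_factory("dummy")()
```

Registering a zero-argument factory rather than an instance defers construction until a storage of that type is actually requested.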
dae.import_tools.import_tools.save_study_config(dae_config: Box, study_id: str, study_config: str, force: bool = False) None[source]

Save the study config to a file.

dae.import_tools.parquet_writer module

dae.import_tools.task_graph module

Module contents