import os
import shutil
import time
import glob
import logging
from dae.configuration.gpf_config_parser import GPFConfigParser
from dae.configuration.study_config_builder import StudyConfigBuilder
from dae.pedigrees.loader import FamiliesLoader
from dae.backends.storage.genotype_storage import GenotypeStorage
from dae.backends.raw.loader import StoredAnnotationDecorator
from dae.backends.raw.raw_variants import RawMemoryVariants
from dae.backends.vcf.loader import VcfLoader
from dae.backends.dae.loader import DenovoLoader, DaeTransmittedLoader
from dae.backends.cnv.loader import CNVLoader
from dae.utils.dict_utils import recursive_dict_update
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
class FilesystemGenotypeStorage(GenotypeStorage):
    """A storage that uses the filesystem as its backend."""

    def __init__(self, storage_config, section_id):
        """Initialize the storage and remember its root data directory.

        The data directory is read from ``storage_config.dir``.
        """
        super().__init__(storage_config, section_id)
        self.data_dir = self.storage_config.dir

    def get_data_dir(self, *path):
        """Return the absolute path of *path* joined to the storage dir."""
        return os.path.abspath(os.path.join(self.storage_config.dir, *path))

    def is_filestorage(self):
        """Return True; this storage keeps its data in plain files."""
        return True

    def build_backend(self, study_config, genome, gene_models):
        """Build an in-memory variants backend for a study.

        When the study configuration lists no files, the pedigree and VCF
        are looked up at ``<storage dir>/<study id>/data/<study id>.ped``
        and ``.vcf``. Otherwise the pedigree and every configured variants
        file are loaded with the parameters stored in the configuration.

        ``gene_models`` is accepted for interface compatibility and is not
        used by this storage.

        Returns a ``RawMemoryVariants`` wrapping the created loaders.

        Raises:
            ValueError: if a configured variants file has a format other
                than ``vcf``, ``denovo``, ``dae`` or ``cnv``.
        """
        if not study_config.genotype_storage.files:
            # No explicit files configured: fall back to the conventional
            # <study id>.vcf / <study id>.ped pair in the study data dir.
            data_dir = self.get_data_dir(study_config.id, "data")
            vcf_filename = os.path.join(
                data_dir, f"{study_config.id}.vcf")
            ped_filename = os.path.join(
                data_dir, f"{study_config.id}.ped")

            families = FamiliesLoader(ped_filename).load()
            variants_loader = VcfLoader(
                families, [vcf_filename], genome
            )
            variants_loader = StoredAnnotationDecorator.decorate(
                variants_loader, vcf_filename
            )
            return RawMemoryVariants([variants_loader], families)

        files_config = study_config.genotype_storage.files

        start = time.time()
        ped_params = files_config.pedigree.params.to_dict()
        ped_filename = files_config.pedigree.path
        logger.debug("pedigree params: %s; %s", ped_filename, ped_params)
        families = FamiliesLoader(ped_filename, **ped_params).load()
        elapsed = time.time() - start
        logger.info("families loaded in %.2f sec", elapsed)

        loaders = [
            self._build_variants_loader(file_conf, families, genome)
            for file_conf in files_config.variants
        ]
        return RawMemoryVariants(loaders, families)

    @staticmethod
    def _build_variants_loader(file_conf, families, genome):
        """Create the annotation-decorated loader for one variants entry."""
        variants_filename = file_conf.path
        variants_params = file_conf.params.to_dict()
        logger.debug(
            "variant params: %s; %s", variants_filename, variants_params)

        annotation_filename = variants_filename
        if file_conf.format == "vcf":
            # Several VCF files are stored as one space-separated string;
            # skip empty chunks produced by repeated spaces.
            variants_filenames = [
                fn.strip()
                for fn in variants_filename.split(" ")
                if fn.strip()
            ]
            variants_loader = VcfLoader(
                families,
                variants_filenames,
                genome,
                params=variants_params,
            )
            # The stored annotation is looked up via the first VCF file.
            annotation_filename = variants_filenames[0]
        elif file_conf.format == "denovo":
            variants_loader = DenovoLoader(
                families,
                variants_filename,
                genome,
                params=variants_params,
            )
        elif file_conf.format == "dae":
            variants_loader = DaeTransmittedLoader(
                families,
                variants_filename,
                genome,
                params=variants_params,
            )
        elif file_conf.format == "cnv":
            variants_loader = CNVLoader(
                families,
                variants_filename,
                genome,
                params=variants_params,
            )
        else:
            # Without this branch an unknown format would either fail with
            # UnboundLocalError or silently reuse the previous loader.
            raise ValueError(
                f"unsupported variants file format "
                f"'{file_conf.format}' for {variants_filename}"
            )

        return StoredAnnotationDecorator.decorate(
            variants_loader, annotation_filename
        )

    def simple_study_import(
        self,
        study_id,
        families_loader=None,
        variant_loaders=None,
        study_config=None,
        **kwargs,
    ):
        """Copy a study's files into the storage and build its config.

        The pedigree file and all variants files are copied under
        ``<data_dir>/<study_id>/data``. A study configuration dictionary is
        assembled, optionally overridden with the values loaded from
        ``study_config``, and passed to ``StudyConfigBuilder``.

        Args:
            study_id: identifier of the study being imported.
            families_loader: loader of the pedigree file; required.
            variant_loaders: variants loaders to import; ``None`` is
                treated as an empty list and disables the genotype browser.
            study_config: optional source of overrides, loaded with
                ``GPFConfigParser.load_config_raw``.
            **kwargs: accepted for interface compatibility; unused.

        Returns:
            The result of ``StudyConfigBuilder.build_config()``.

        Raises:
            ValueError: if ``families_loader`` is ``None``.
        """
        if families_loader is None:
            raise ValueError(
                f"families loader is required to import study {study_id}")
        if variant_loaders is None:
            variant_loaders = []

        families_config = self._import_families_file(study_id, families_loader)
        variants_config = self._import_variants_files(
            study_id, variant_loaders
        )

        config_dict = {
            "id": study_id,
            "conf_dir": ".",
            "has_denovo": False,
            "has_cnv": False,
            "genotype_storage": {
                "id": self.storage_id,
                "files": {
                    "variants": variants_config,
                    "pedigree": families_config,
                },
            },
            "genotype_browser": {"enabled": True},
        }

        if not variant_loaders:
            config_dict["genotype_browser"]["enabled"] = False
        else:
            source_types = {
                loader.get_attribute("source_type")
                for loader in variant_loaders
            }
            if "denovo" in source_types:
                config_dict["has_denovo"] = True
            if "cnv" in source_types:
                # FIXME: CNV studies are also flagged as de novo; kept
                # as in the original implementation until confirmed.
                config_dict["has_denovo"] = True
                config_dict["has_cnv"] = True

        if study_config is not None:
            study_config_dict = GPFConfigParser.load_config_raw(study_config)
            config_dict = recursive_dict_update(config_dict, study_config_dict)

        config_builder = StudyConfigBuilder(config_dict)
        return config_builder.build_config()

    @staticmethod
    def _serialize_params(params):
        """Return a copy of *params* with values made config-friendly.

        Booleans become the strings ``"true"``/``"false"`` and tab
        characters inside strings become the two characters ``\\t``.
        Other values are passed through unchanged.
        """
        result = {}
        for key, value in params.items():
            if isinstance(value, bool):
                value = "true" if value else "false"
            elif isinstance(value, str):
                value = value.replace("\t", "\\t")
            result[key] = value
        return result

    def _import_families_file(self, study_id, families_loader):
        """Copy the pedigree file into the study data directory.

        Returns a dict with the destination ``path`` and the serialized
        loader ``params``.
        """
        source_filename = families_loader.filename
        destination_filename = os.path.join(
            self.data_dir, study_id, "data", os.path.basename(source_filename)
        )

        params = self._serialize_params(
            families_loader.build_arguments_dict())
        config = {"path": destination_filename, "params": params}

        os.makedirs(os.path.dirname(destination_filename), exist_ok=True)
        shutil.copyfile(source_filename, destination_filename)
        return config

    def _import_variants_files(self, study_id, loaders):
        """Copy variants files into the study data directory.

        For every loader the annotation file is saved next to the first
        destination file, and every file matching ``<filename>*`` for each
        of the loader's filenames is copied.

        Returns a list with one config dict (``path``, ``params``,
        ``format``) per loader.
        """
        result_config = []
        destination_dirname = os.path.join(self.data_dir, study_id, "data")

        def construct_destination_filename(fname):
            return os.path.join(
                destination_dirname, os.path.basename(fname))

        for variants_loader in loaders:
            destination_filenames = [
                construct_destination_filename(fname)
                for fname in variants_loader.variants_filenames
            ]

            params = self._serialize_params(
                variants_loader.build_arguments_dict())
            source_type = variants_loader.get_attribute("source_type")

            config = {
                # Multiple files are stored as a space-separated string.
                "path": " ".join(destination_filenames),
                "params": params,
                "format": source_type,
            }
            logger.debug("config prepared: %s", config)
            result_config.append(config)

            os.makedirs(destination_dirname, exist_ok=True)

            annotation_filename = \
                StoredAnnotationDecorator.build_annotation_filename(
                    destination_filenames[0]
                )
            StoredAnnotationDecorator.save_annotation_file(
                variants_loader, annotation_filename
            )

            for filename in variants_loader.filenames:
                # The trailing wildcard also matches files that share the
                # variants file name as a prefix.
                matched_filenames = glob.glob(f"{filename}*")
                logger.debug("source filenames: %s", matched_filenames)
                for fname in matched_filenames:
                    destination = construct_destination_filename(fname)
                    logger.debug("copying: %s -> %s", fname, destination)
                    shutil.copyfile(fname, destination)

        return result_config