Source code for dae.genomic_resources.fsspec_protocol

"""Provides GRR protocols based on fsspec library."""
from __future__ import annotations

import datetime
import fcntl
import hashlib
import logging
import os
from collections.abc import Generator
from dataclasses import asdict
from types import TracebackType
from typing import (
    IO,
    Any,
    ContextManager,
    Dict,
    List,
    Optional,
    Union,
    cast,
)
from urllib.parse import urlparse

import fsspec
import jinja2
import pysam
import yaml

from dae.genomic_resources.repository import (
    GR_CONF_FILE_NAME,
    GR_CONTENTS_FILE_NAME,
    GR_INDEX_FILE_NAME,
    GR_MANIFEST_FILE_NAME,
    GenomicResource,
    Manifest,
    ManifestEntry,
    Mode,
    ReadOnlyRepositoryProtocol,
    ReadWriteRepositoryProtocol,
    ResourceFileState,
    is_gr_id_token,
    parse_gr_id_version_token,
    parse_resource_id_version,
)
from dae.utils.helpers import convert_size

logger = logging.getLogger(__name__)


def _scan_for_resources(
    content_dict: dict, parent_id: list[str],
) -> Generator[tuple[str, tuple[int, ...], dict], None, None]:
    name = "/".join(parent_id)
    id_ver = parse_gr_id_version_token(name)
    if isinstance(content_dict, dict) and id_ver and \
            GR_CONF_FILE_NAME in content_dict and \
            not isinstance(content_dict[GR_CONF_FILE_NAME], dict):
        # resource found
        resource_id, version = id_ver
        yield "/".join([*parent_id, resource_id]), version, content_dict
        return

    for name, content in content_dict.items():
        id_ver = parse_gr_id_version_token(name)
        if isinstance(content, dict) and id_ver and \
                GR_CONF_FILE_NAME in content and \
                not isinstance(content[GR_CONF_FILE_NAME], dict):
            # resource found
            resource_id, version = id_ver
            yield "/".join([*parent_id, resource_id]), version, content
        else:
            curr_id = parent_id + [name]
            curr_id_path = "/".join(curr_id)
            if not isinstance(content, dict):
                logger.warning("file <%s> is not used.", curr_id_path)
                continue
            if not is_gr_id_token(name):
                logger.warning(
                    "directory <%s> has a name <%s> that is not a "
                    "valid Genomic Resource Id Token.", curr_id_path, name)
                continue

            # scan children
            for rid, rver, rcontent in _scan_for_resources(content, curr_id):
                yield rid, rver, rcontent


def _scan_for_resource_files(
    content_dict: dict[str, Any], parent_dirs: list[str],
) -> Generator[tuple[str, Union[str, bytes]], None, None]:

    for path, content in content_dict.items():
        if isinstance(content, dict):
            # handle subdirectory
            for fname, fcontent in _scan_for_resource_files(
                    content, [*parent_dirs, path]):
                yield fname, fcontent
        else:
            fname = "/".join([*parent_dirs, path])
            if isinstance(content, (str, bytes)):
                # handle file content
                yield fname, content
            else:
                logger.error(
                    "unexpected content at %s: %s", fname, content)
                raise ValueError(f"unexpected content at {fname}: {content}")


def build_inmemory_protocol(
        proto_id: str, root_path: str,
        content: Dict[str, Any]) -> FsspecReadWriteProtocol:
    """Build and return an embedded fsspec protocol for testing."""
    if not os.path.isabs(root_path):
        logger.error(
            "for embedded resources repository we expect an "
            "absolute path: %s", root_path)
        raise ValueError(f"not an absolute root path: {root_path}")

    proto = cast(
        FsspecReadWriteProtocol,
        build_fsspec_protocol(proto_id, f"memory://{root_path}"))

    for rid, rver, rcontent in _scan_for_resources(content, []):
        resource = GenomicResource(rid, rver, proto)
        for fname, fcontent in _scan_for_resource_files(rcontent, []):
            mode = "wt"
            if isinstance(fcontent, bytes):
                mode = "wb"
            with proto.open_raw_file(resource, fname, mode) as outfile:
                outfile.write(fcontent)
            proto.save_resource_file_state(
                resource,
                proto.build_resource_file_state(resource, fname))
        proto.save_manifest(resource, proto.build_manifest(resource))

    return proto

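# Usage sketch (illustrative only, not executed as part of this module):
# the nested dict mirrors the repository layout, with directory names as keys
# and file contents as str/bytes leaves; the resource id "one" and the file
# "data.txt" below are hypothetical.
#
#     proto = build_inmemory_protocol(
#         "test_proto", "/test_repo",
#         {"one": {GR_CONF_FILE_NAME: "", "data.txt": "hello world"}})
#     resources = list(proto.get_all_resources())
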
class FsspecReadOnlyProtocol(ReadOnlyRepositoryProtocol):
    """Provides a read-only fsspec genomic resources repository protocol."""

    def __init__(
            self, proto_id: str,
            url: str,
            filesystem: fsspec.AbstractFileSystem):
        super().__init__(proto_id)

        parsed = urlparse(url)
        self.scheme = parsed.scheme
        if self.scheme == "":
            self.scheme = "file"
        self.netloc = parsed.netloc
        self.root_path = parsed.path
        self.url = f"{self.scheme}://{self.netloc}{self.root_path}"

        self.filesystem = filesystem
        self._all_resources: Optional[List[GenomicResource]] = None

    def get_url(self) -> str:
        return self.url

    def invalidate(self) -> None:
        self._all_resources = None

    def get_all_resources(self) -> Generator[GenomicResource, None, None]:
        """Return generator over all resources in the repository."""
        if self._all_resources is None:
            self._all_resources = []
            content_filename = os.path.join(
                self.url, GR_CONTENTS_FILE_NAME)
            contents = yaml.safe_load(self.filesystem.open(content_filename))

            for entry in contents:
                version = tuple(map(int, entry["version"].split(".")))
                manifest = Manifest.from_manifest_entries(entry["manifest"])
                resource = self.build_genomic_resource(
                    entry["id"], version, config=entry["config"],
                    manifest=manifest)
                logger.debug(
                    "url repo caching resource %s", resource.resource_id)
                self._all_resources.append(resource)
            self._all_resources = sorted(
                self._all_resources,
                key=lambda r: r.get_genomic_resource_id_version())
        yield from self._all_resources

    def get_resource_url(self, resource: GenomicResource) -> str:
        """Return the URL of the specified resource."""
        resource_url = os.path.join(
            self.url,
            resource.get_genomic_resource_id_version())
        return resource_url

    def get_resource_file_url(
            self, resource: GenomicResource, filename: str) -> str:
        """Return url of a file in the resource."""
        url = os.path.join(
            self.get_resource_url(resource), filename)
        return url

    def file_exists(
            self, resource: GenomicResource, filename: str) -> bool:
        filepath = self.get_resource_file_url(resource, filename)
        return cast(bool, self.filesystem.exists(filepath))

    def load_manifest(self, resource: GenomicResource) -> Manifest:
        """Load resource manifest."""
        content = self.get_file_content(resource, GR_MANIFEST_FILE_NAME)
        return Manifest.from_file_content(content)

    def open_raw_file(
            self, resource: GenomicResource, filename: str,
            mode: str = "rt",
            **kwargs: Union[str, bool, None]) -> IO:

        filepath = self.get_resource_file_url(resource, filename)
        if "w" in mode:
            if self.mode() == Mode.READONLY:
                raise OSError(
                    f"Read-Only protocol {self.get_id()} trying to open "
                    f"{filepath} for writing")

            # Create the containing directory if it doesn't exist.
            parent = os.path.dirname(filepath)
            if not self.filesystem.exists(parent):
                self.filesystem.mkdir(
                    parent, create_parents=True, exist_ok=True)

        compression = None
        if kwargs.get("compression"):
            compression = "gzip"
        return cast(
            IO,
            self.filesystem.open(
                filepath, mode=mode, compression=compression))

    def _get_file_url(self, resource: GenomicResource, filename: str) -> str:
        def process_file_url(url: str) -> str:
            if self.scheme == "file":
                return urlparse(url).path
            if self.scheme == "s3":
                return cast(str, self.filesystem.sign(url))
            return url

        return process_file_url(
            self.get_resource_file_url(resource, filename))

    def open_tabix_file(
            self, resource: GenomicResource,
            filename: str,
            index_filename: Optional[str] = None) -> pysam.TabixFile:

        if self.scheme not in {"file", "s3", "http", "https"}:
            raise OSError(
                f"tabix files are not supported on scheme {self.scheme}")

        file_url = self._get_file_url(resource, filename)
        if index_filename is None:
            index_filename = f"{filename}.tbi"
        index_url = self._get_file_url(resource, index_filename)

        return pysam.TabixFile(  # pylint: disable=no-member
            file_url, index=index_url, encoding="utf-8")

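    # Example (illustrative sketch; the file name and region below are
    # hypothetical): fetch records from a tabix-indexed resource file.
    #
    #     tabix = proto.open_tabix_file(resource, "scores.txt.gz")
    #     for record in tabix.fetch("chr1", 0, 10_000):
    #         ...
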
    def open_vcf_file(
            self, resource: GenomicResource, filename: str,
            index_filename: Optional[str] = None) -> pysam.VariantFile:

        if self.scheme not in {"file", "s3", "http", "https"}:
            raise OSError(
                f"vcf files are not supported on scheme {self.scheme}")

        file_url = self._get_file_url(resource, filename)
        if index_filename is None:
            index_filename = f"{filename}.tbi"
        index_url = self._get_file_url(resource, index_filename)

        return pysam.VariantFile(  # pylint: disable=no-member
            file_url, index_filename=index_url)

class FsspecReadWriteProtocol(
        FsspecReadOnlyProtocol, ReadWriteRepositoryProtocol):
    """Provides a read-write fsspec genomic resources repository protocol."""

    def __init__(
            self, proto_id: str,
            url: str,
            filesystem: fsspec.AbstractFileSystem):
        super().__init__(proto_id, url, filesystem)
        self.filesystem.makedirs(self.url, exist_ok=True)

    def _get_resource_file_lockfile_path(
        self, resource: GenomicResource, filename: str,
    ) -> str:
        """Return path of the resource file's lockfile."""
        if self.scheme != "file":
            raise NotImplementedError
        resource_url = self.get_resource_url(resource)
        path = os.path.join(resource_url, ".grr", f"{filename}.lockfile")
        return path.removeprefix(f"{self.scheme}://")

    def obtain_resource_file_lock(
        self, resource: GenomicResource, filename: str,
    ) -> ContextManager:
        """Lock a resource's file."""
        class Lock:
            """Lock representation."""

            def __enter__(self) -> None:
                pass

            def __exit__(
                    self, exc_type: type[BaseException] | None,
                    exc_value: Optional[BaseException],
                    exc_tb: TracebackType | None) -> None:
                pass

        lock = Lock()
        if self.scheme == "file":
            path = self._get_resource_file_lockfile_path(resource, filename)
            if not self.filesystem.exists(os.path.dirname(path)):
                self.filesystem.makedirs(
                    os.path.dirname(path), exist_ok=True)
            # pylint: disable=consider-using-with
            lockfile = open(path, "wt", encoding="utf8")
            lockfile.write(str(datetime.datetime.now()) + "\n")
            fcntl.flock(lockfile, fcntl.LOCK_EX)
            lock.__enter__ = lockfile.__enter__  # type: ignore
            lock.__exit__ = lockfile.__exit__  # type: ignore
        return lock

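    # Example (illustrative sketch; "data.txt" is a hypothetical file name):
    # the returned object is used as a context manager around file updates.
    # On "file" URLs it holds an exclusive fcntl lock on the lockfile under
    # the resource's ".grr" directory; on other schemes it is a no-op.
    #
    #     with proto.obtain_resource_file_lock(resource, "data.txt"):
    #         proto.copy_resource_file(remote_resource, resource, "data.txt")
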
    def _scan_path_for_resources(
        self, path_array: list[str],
    ) -> Generator[Any, None, None]:

        url = os.path.join(self.url, *path_array)
        path = os.path.join(self.root_path, *path_array)
        assert isinstance(url, str)

        if not self.filesystem.isdir(url):
            return

        content = []
        for direntry in self.filesystem.ls(url, detail=False):
            if self.netloc and direntry.startswith(self.netloc):
                direntry = direntry[len(self.netloc):]
            name = os.path.relpath(direntry, path)
            if name.startswith("."):
                continue
            content.append(name)

        if GR_CONF_FILE_NAME in content:
            res_path = "/".join(path_array)
            resource_id, version = parse_resource_id_version(res_path)
            if resource_id is None:
                logger.error("bad resource id/version: %s", res_path)
                return
            yield resource_id, version, res_path
        else:
            for name in content:
                yield from self._scan_path_for_resources([*path_array, name])

    def _scan_resource_for_files(
        self, resource_path: str, path_array: list[str],
    ) -> Generator[Any, None, None]:

        url = os.path.join(self.url, resource_path, *path_array)
        if not self.filesystem.isdir(url):
            if path_array:
                yield os.path.join(*path_array), url
            return

        path = os.path.join(self.root_path, resource_path, *path_array)
        content = []
        for direntry in self.filesystem.ls(url, detail=False):
            if self.netloc and direntry.startswith(self.netloc):
                direntry = direntry[len(self.netloc):]
            name = os.path.relpath(direntry, path)
            if name.startswith("."):
                continue
            content.append(name)

        for name in content:
            yield from self._scan_resource_for_files(
                resource_path, [*path_array, name])

    def _get_filepath_timestamp(self, filepath: str) -> float:
        try:
            modification = self.filesystem.modified(filepath)
            modification = modification.replace(
                tzinfo=datetime.timezone.utc)
            return cast(float, round(modification.timestamp(), 2))
        except NotImplementedError:
            info = self.filesystem.info(filepath)
            modification = info.get("created")
            return cast(float, round(modification, 2))

    def collect_all_resources(
            self) -> Generator[GenomicResource, None, None]:
        """Return generator over all resources managed by this protocol."""
        for res_id, res_ver, res_path in self._scan_path_for_resources([]):
            res_fullpath = os.path.join(self.root_path, res_path)
            assert res_fullpath.startswith("/")
            res_fullpath = f"{self.scheme}://{self.netloc}{res_fullpath}"

            with self.filesystem.open(
                    os.path.join(
                        res_fullpath, GR_CONF_FILE_NAME), "rt") as infile:
                config = yaml.safe_load(infile)

            manifest: Optional[Manifest] = None
            manifest_filename = os.path.join(
                res_fullpath, GR_MANIFEST_FILE_NAME)
            if self.filesystem.exists(manifest_filename):
                with self.filesystem.open(manifest_filename, "rt") as infile:
                    manifest = Manifest.from_file_content(infile.read())

            yield self.build_genomic_resource(
                res_id, res_ver, config, manifest)

    def collect_resource_entries(self, resource: GenomicResource) -> Manifest:
        """Scan the resource and return a manifest."""
        resource_path = resource.get_genomic_resource_id_version()

        result = Manifest()
        for name, path in self._scan_resource_for_files(resource_path, []):
            size = self._get_filepath_size(path)
            result.add(ManifestEntry(name, size, None))
        return result

    def get_all_resources(self) -> Generator[GenomicResource, None, None]:
        """Return generator over all resources in the repository."""
        if self._all_resources is None:
            self._all_resources = sorted(
                list(self.collect_all_resources()),
                key=lambda r: r.get_genomic_resource_id_version())
        yield from self._all_resources

    def _get_resource_file_state_path(
            self, resource: GenomicResource, filename: str) -> str:
        """Return the path of the resource file's state file."""
        resource_url = self.get_resource_url(resource)
        return os.path.join(resource_url, ".grr", f"{filename}.state")

    def get_resource_file_timestamp(
            self, resource: GenomicResource, filename: str) -> float:
        url = self.get_resource_file_url(resource, filename)
        return self._get_filepath_timestamp(url)

    def _get_filepath_size(
            self, filepath: str) -> int:
        fileinfo = self.filesystem.info(filepath)
        return int(fileinfo["size"])

    def get_resource_file_size(
            self, resource: GenomicResource, filename: str) -> int:
        path = self.get_resource_file_url(resource, filename)
        return self._get_filepath_size(path)

    def save_resource_file_state(
            self, resource: GenomicResource,
            state: ResourceFileState) -> None:
        """Save resource file state into internal GRR state."""
        path = self._get_resource_file_state_path(resource, state.filename)
        if not self.filesystem.exists(os.path.dirname(path)):
            self.filesystem.makedirs(
                os.path.dirname(path), exist_ok=True)
        content = asdict(state)
        with self.filesystem.open(path, "wt", encoding="utf8") as outfile:
            outfile.write(yaml.safe_dump(content))

    def load_resource_file_state(
            self, resource: GenomicResource,
            filename: str) -> Optional[ResourceFileState]:
        """Load resource file state from internal GRR state.

        If the specified resource file has no internal state, returns None.
        """
        path = self._get_resource_file_state_path(resource, filename)
        if not self.filesystem.exists(path):
            return None
        with self.filesystem.open(path, "rt", encoding="utf8") as infile:
            content = yaml.safe_load(infile.read())
            return ResourceFileState(
                content["filename"],
                content["size"],
                content["timestamp"],
                content["md5"],
            )

    def delete_resource_file(
            self, resource: GenomicResource, filename: str) -> None:
        """Delete a resource file and its internal state."""
        filepath = self.get_resource_file_url(resource, filename)
        if self.filesystem.exists(filepath):
            self.filesystem.delete(filepath)

        statepath = self._get_resource_file_state_path(resource, filename)
        if self.filesystem.exists(statepath):
            self.filesystem.delete(statepath)

    def copy_resource_file(
            self,
            remote_resource: GenomicResource,
            dest_resource: GenomicResource,
            filename: str) -> Optional[ResourceFileState]:
        """Copy a resource file into the repository."""
        assert dest_resource.resource_id == remote_resource.resource_id

        logger.debug(
            "copying resource file (%s: %s) from %s",
            remote_resource.resource_id, filename,
            remote_resource.proto.proto_id)

        remote_manifest = remote_resource.get_manifest()
        if filename not in remote_manifest:
            self.delete_resource_file(dest_resource, filename)
            return None
        manifest_entry = remote_manifest[filename]

        dest_filepath = self.get_resource_file_url(dest_resource, filename)
        dest_parent = os.path.dirname(dest_filepath)
        if not self.filesystem.exists(dest_parent):
            self.filesystem.mkdir(
                dest_parent, create_parents=True, exist_ok=True)

        with remote_resource.open_raw_file(
                filename, "rb", uncompress=False) as infile, \
                self.open_raw_file(
                    dest_resource, filename, "wb",
                    uncompress=False) as outfile:

            md5_hash = hashlib.md5()
            while chunk := infile.read(self.CHUNK_SIZE):
                outfile.write(chunk)
                md5_hash.update(chunk)
            md5 = md5_hash.hexdigest()

        if not self.filesystem.exists(dest_filepath):
            raise OSError(f"destination file not created {dest_filepath}")

        if md5 != manifest_entry.md5:
            raise OSError(
                f"file copy is broken "
                f"{dest_resource.resource_id} ({filename}); "
                f"md5sums are different: "
                f"{md5}!={manifest_entry.md5}")

        state = self.build_resource_file_state(
            dest_resource, filename, md5sum=md5)
        self.save_resource_file_state(dest_resource, state)
        return state

    def update_resource_file(
            self, remote_resource: GenomicResource,
            dest_resource: GenomicResource,
            filename: str) -> Optional[ResourceFileState]:
        """Update a resource file in the repository if needed."""
        assert dest_resource.resource_id == remote_resource.resource_id
        if not self.file_exists(dest_resource, filename):
            return self.copy_resource_file(
                remote_resource, dest_resource, filename)

        local_state = self.load_resource_file_state(dest_resource, filename)
        if local_state is None:
            local_state = self.build_resource_file_state(
                dest_resource, filename)
            self.save_resource_file_state(dest_resource, local_state)
        else:
            timestamp = self.get_resource_file_timestamp(
                dest_resource, filename)
            size = self.get_resource_file_size(dest_resource, filename)

            if timestamp != local_state.timestamp or \
                    size != local_state.size:
                local_state = self.build_resource_file_state(
                    dest_resource, filename)
                self.save_resource_file_state(dest_resource, local_state)

        remote_manifest = remote_resource.get_manifest()
        if filename not in remote_manifest:
            self.delete_resource_file(dest_resource, filename)
            return None

        manifest_entry = remote_manifest[filename]
        if local_state.md5 != manifest_entry.md5:
            return self.copy_resource_file(
                remote_resource, dest_resource, filename)

        return local_state

    def build_content_file(self) -> list[dict[str, Any]]:
        """Build the content of the repository (i.e. the '.CONTENTS' file)."""
        content = [
            {
                "id": res.resource_id,
                "version": res.get_version_str(),
                "config": res.get_config(),
                "manifest": res.get_manifest().to_manifest_entries(),
            }
            for res in self.get_all_resources()]
        content = sorted(content, key=lambda x: x["id"])  # type: ignore

        content_filepath = os.path.join(
            self.url, GR_CONTENTS_FILE_NAME)
        with self.filesystem.open(
                content_filepath, "wt", encoding="utf8") as outfile:
            yaml.dump(content, outfile)

        return content

    def build_index_info(
            self, repository_template: jinja2.Template) -> dict:
        """Build info dict for the repository."""
        result = {}
        for res in self.get_all_resources():
            res_size = convert_size(
                sum(f for _, f in res.get_manifest().get_files()),
            )
            assert res.config is not None
            result[res.resource_id] = {
                **res.config,
                "res_version": res.get_version_str(),
                "res_files": len(list(res.get_manifest().get_files())),
                "res_size": res_size,
                "res_summary": res.get_summary(),
            }

        content_filepath = os.path.join(self.url, GR_INDEX_FILE_NAME)
        with self.filesystem.open(
                content_filepath, "wt", encoding="utf8") as outfile:
            outfile.write(repository_template.render(data=result))
        return result

def build_local_resource(
        dirname: str, config: Dict[str, Any]) -> GenomicResource:
    """Build a resource from a local filesystem directory."""
    proto = build_fsspec_protocol("d", dirname)
    resource = GenomicResource(
        dirname, (0, ), proto, config)
    return resource

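# Example (illustrative sketch; the directory and config are hypothetical):
#
#     resource = build_local_resource(
#         "/data/my_score", {"type": "position_score"})
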
FsspecRepositoryProtocol = Union[
    FsspecReadOnlyProtocol, FsspecReadWriteProtocol]

def build_fsspec_protocol(
    proto_id: str, root_url: str,
    **kwargs: Union[str, None],
) -> FsspecRepositoryProtocol:
    """Create an fsspec GRR protocol based on the root URL."""
    url = urlparse(root_url)
    # pylint: disable=import-outside-toplevel
    if url.scheme in {"file", ""}:
        from fsspec.implementations.local import LocalFileSystem
        filesystem = LocalFileSystem()
        return FsspecReadWriteProtocol(
            proto_id, root_url, filesystem)

    if url.scheme in {"http", "https"}:
        from fsspec.implementations.http import HTTPFileSystem
        base_url = kwargs.get("base_url")
        filesystem = HTTPFileSystem(client_kwargs={"base_url": base_url})
        return FsspecReadOnlyProtocol(proto_id, root_url, filesystem)

    if url.scheme == "s3":
        filesystem = kwargs.get("filesystem")
        if filesystem is None:
            from s3fs.core import S3FileSystem
            endpoint_url = kwargs.get("endpoint_url")
            filesystem = S3FileSystem(
                anon=False,
                client_kwargs={"endpoint_url": endpoint_url})
        return FsspecReadWriteProtocol(
            proto_id, root_url, filesystem)

    if url.scheme == "memory":
        from fsspec.implementations.memory import MemoryFileSystem
        filesystem = MemoryFileSystem()
        return FsspecReadWriteProtocol(proto_id, root_url, filesystem)

    raise NotImplementedError(f"unsupported scheme {url.scheme}")

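# Usage sketch (illustrative only; the URLs below are hypothetical): the
# returned protocol is read-write for "file", "s3" and "memory" URLs and
# read-only for "http"/"https" URLs.
#
#     local = build_fsspec_protocol("local", "/data/grr")
#     remote = build_fsspec_protocol("remote", "https://grr.example.org/repo")
#     s3 = build_fsspec_protocol(
#         "s3", "s3://my-bucket/grr", endpoint_url="https://s3.example.org")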