Source code for dae.backends.impala.hdfs_helpers

import os
import re
import tempfile
import logging

from pyarrow import fs  # type: ignore
from fsspec.implementations.arrow import ArrowFSWrapper  # type: ignore


logger = logging.getLogger(__name__)


class HdfsHelpers:
    """Helper methods for working with HDFS."""

    def __init__(self, hdfs_host, hdfs_port, replication=None):
        assert hdfs_host
        assert hdfs_port

        if os.environ.get("DAE_HDFS_HOST", None) is not None:
            hdfs_host = os.environ.get("DAE_HDFS_HOST")
            logger.info(
                "hdfs host overwritten from environment; "
                "connecting to: %s:%s", hdfs_host, hdfs_port)

        self.host = hdfs_host
        self.port = hdfs_port
        self.replication = replication
        self._hdfs = None

    @property
    def hdfs(self):
        """Return a file system for working with HDFS."""
        if self._hdfs is None:
            extra_conf = None
            if self.replication and self.replication > 0:
                extra_conf = {
                    "dfs.replication": str(self.replication)
                }
            logger.info(
                "hdfs connecting to: %s:%s; extra: %s",
                self.host, self.port, extra_conf)
            hdfs = fs.HadoopFileSystem(
                host=self.host, port=self.port, extra_conf=extra_conf)
            self._hdfs = ArrowFSWrapper(hdfs)
        return self._hdfs
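    # Editorial usage sketch (not part of the original module). Constructing
    # the helper is cheap: no connection is opened until the first access to
    # `.hdfs`, which builds the pyarrow HadoopFileSystem and wraps it in an
    # fsspec-compatible interface. Host and port below are placeholders.
    #
    #     helpers = HdfsHelpers("namenode.example.org", 8020, replication=2)
    #     fs_wrapper = helpers.hdfs  # connects here, with dfs.replication=2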
    def exists(self, path):
        return self.hdfs.exists(path)

    def mkdir(self, path):
        self.hdfs.mkdir(path)
    def makedirs(self, path):
        """Make all directories along the path."""
        if path[0] == os.sep:
            paths = path[1:].split(os.sep)
            paths[0] = "/" + paths[0]
        else:
            paths = path.split(os.sep)

        current_path = ""
        for directory in paths:
            current_path = os.path.join(current_path, directory)
            if not self.exists(current_path):
                self.mkdir(current_path)
        return self.exists(current_path)
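    # Editorial example (hypothetical path): the path is created one
    # component at a time, so calling this is safe when any prefix of the
    # path already exists:
    #
    #     helpers.makedirs("/user/data/study")  # creates /user, /user/data, ...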
    def tempdir(self, prefix="", suffix=""):
        dirname = tempfile.mktemp(prefix=prefix, suffix=suffix)  # NOSONAR
        logger.debug("creating temporary directory %s", dirname)
        self.mkdir(dirname)
        assert self.exists(dirname)
        return dirname
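    # Editorial note: tempfile.mktemp() only generates a unique *name* on the
    # local machine (hence the NOSONAR suppression); the directory itself is
    # created on HDFS. Example with an assumed prefix:
    #
    #     scratch = helpers.tempdir(prefix="import_")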
    def delete(self, path, recursive=False):
        return self.hdfs.delete(path, recursive=recursive)

    def filesystem(self):
        return self.hdfs

    def rename(self, path, new_path):
        self.hdfs.rename(path, new_path)

    def put(self, local_filename, hdfs_filename):
        assert os.path.exists(local_filename)
        self.hdfs.upload(local_filename, hdfs_filename)

    def put_in_directory(self, local_file, hdfs_dirname):
        basename = os.path.basename(local_file)
        hdfs_filename = os.path.join(hdfs_dirname, basename)
        self.put(local_file, hdfs_filename)
    def put_content(self, local_path, hdfs_dirname):
        """Copy local_path to hdfs_dirname."""
        assert os.path.exists(local_path), local_path

        if os.path.isdir(local_path):
            for local_file in os.listdir(local_path):
                self.put_in_directory(
                    os.path.join(local_path, local_file), hdfs_dirname
                )
        else:
            self.put_in_directory(local_path, hdfs_dirname)
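    # Editorial example (hypothetical paths): a file argument is copied into
    # the target directory; a directory argument has its immediate children
    # copied instead (the directory itself is not recreated):
    #
    #     helpers.put_content("out/variants.parquet", "/studies/x")
    #     helpers.put_content("out/", "/studies/x")  # copies out/* into /studies/x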
    def get(self, hdfs_filename, local_filename):
        assert self.exists(hdfs_filename)
        # fsspec's download() takes a local *path*; the original passed an
        # open file object here, which the documented fsspec API does not
        # expect
        self.hdfs.download(hdfs_filename, local_filename)
    def list_dir(self, hdfs_dirname):
        return self.hdfs.ls(hdfs_dirname)

    def isdir(self, hdfs_dirname):
        if not self.exists(hdfs_dirname):
            return False
        info = self.hdfs.info(hdfs_dirname)
        return info["type"] == "directory"

    def isfile(self, hdfs_filename):
        if not self.exists(hdfs_filename):
            return False
        info = self.hdfs.info(hdfs_filename)
        return info["type"] == "file"
    def list_parquet_files(self, hdfs_dirname, regexp=r".*\.parquet"):
        """List all parquet files in hdfs_dirname."""
        regexp = re.compile(regexp)

        def list_parquet_files_recursive(dirname, collection):
            assert self.isdir(dirname)
            allfiles = self.list_dir(dirname)
            for hfile in allfiles:
                if self.isdir(hfile):
                    list_parquet_files_recursive(hfile, collection)
                elif (self.isfile(hfile) and regexp.match(hfile)
                        and hfile not in collection):
                    collection.append(hfile)

        assert self.isdir(hdfs_dirname), hdfs_dirname
        result = []
        list_parquet_files_recursive(hdfs_dirname, result)
        return result
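

# A minimal end-to-end sketch (editorial, not part of the original module).
# It assumes a reachable HDFS namenode; the host, port, and file names below
# are placeholders, and "variants.parquet" must exist locally.
if __name__ == "__main__":
    helpers = HdfsHelpers("localhost", 8020, replication=1)

    workdir = helpers.tempdir(prefix="hdfs_helpers_")
    try:
        # upload a local file into the temporary HDFS directory
        helpers.put_in_directory("variants.parquet", workdir)
        # collect every *.parquet file below workdir, recursively
        for parquet_file in helpers.list_parquet_files(workdir):
            print(parquet_file)
    finally:
        helpers.delete(workdir, recursive=True)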