import logging
import os
import shlex
import subprocess
from urllib.parse import urlparse, urlunparse
logger = logging.getLogger(__name__)


class RsyncHelpers:
    """Build and run rsync/ssh command lines for a single remote location.

    The remote is either a plain filesystem path or an ``ssh://`` URL of the
    form ``ssh://[user@]host[:port]/path``.  A trailing ``/`` is always
    appended to the remote when it is missing.

    Attributes set by ``__init__``:
        remote: the remote string, guaranteed to end with ``/``.
        parsed_remote: result of ``urllib.parse.urlparse(remote)``.
        rsync_remote: the location in the form passed to rsync, i.e.
            ``[user@]host:path`` for ssh remotes, or ``remote`` itself.
        rsync_remote_shell: value for rsync's ``-e`` option
            (``"ssh -p <port>"``) when a port other than 22 is given,
            otherwise ``None``.
    """

    def __init__(self, remote):
        if not remote.endswith("/"):
            remote += "/"
        self.remote = remote

        parsed_remote = urlparse(remote)
        self.parsed_remote = parsed_remote
        if parsed_remote.scheme:
            # Kept as an assertion so the exception type seen by existing
            # callers does not change.
            assert parsed_remote.scheme == "ssh"

        self.rsync_remote = remote
        if parsed_remote.hostname:
            self.rsync_remote = \
                f"{parsed_remote.hostname}:{parsed_remote.path}"
            if parsed_remote.username:
                self.rsync_remote = \
                    f"{parsed_remote.username}@{self.rsync_remote}"

        self.rsync_remote_shell = None
        if parsed_remote.port and parsed_remote.port != 22:
            self.rsync_remote_shell = f"ssh -p {parsed_remote.port}"
        logger.debug("parsed_remote: %s", parsed_remote)

    def hosturl(self):
        """Return ``scheme://netloc`` of the remote, without any path.

        For a remote that has neither scheme nor netloc (a plain local
        path) the result is the empty string.
        """
        logger.debug("%s", self.parsed_remote)
        return urlunparse(
            (
                self.parsed_remote.scheme,
                self.parsed_remote.netloc,
                "",  # path
                "",  # params
                "",  # query
                "",  # fragment identifier
            ))

    @staticmethod
    def _exclude_options(exclude=None):
        """Turn an iterable of patterns into rsync ``--exclude`` arguments.

        ``None`` yields an empty list; falsy patterns are skipped.
        """
        if exclude is None:
            return []
        result = []
        for pattern in exclude:
            if pattern:
                result.extend(["--exclude", f"{pattern}"])
        return result

    def _ssh_prefix(self):
        """Return the ``ssh`` argv prefix that addresses the remote host.

        The port is passed through ``-p`` instead of being left inside the
        destination, because ``ssh`` does not accept ``host:port`` as a
        destination argument.
        """
        parsed = self.parsed_remote
        argv = ["ssh"]
        if parsed.port and parsed.port != 22:
            argv.extend(["-p", str(parsed.port)])
        destination = parsed.hostname
        if parsed.username:
            destination = f"{parsed.username}@{destination}"
        argv.append(destination)
        return argv

    def _prepare_dir_cmds(self, path, clear):
        """Return the commands that create *path*, optionally wiping it first.

        Args:
            path: directory path on the remote side.
            clear: when true, an ``rm -rf`` command precedes the
                ``mkdir -p`` command.

        Returns:
            A list of argv lists.  For ssh remotes each command is run
            through ``ssh`` and *path* is shell-quoted for the remote
            shell; otherwise the commands are plain local argv lists.

        Raises:
            ValueError: if *clear* is requested for an empty path or for
                the filesystem root.
        """
        if clear and not path.strip("/"):
            raise ValueError(
                f"refusing to clear remote path <{path}>")

        actions = []
        if clear:
            actions.append(["rm", "-rf"])
        actions.append(["mkdir", "-p"])

        # Same test __init__ uses to decide whether rsync_remote is a
        # host:path pair.
        if self.parsed_remote.hostname:
            ssh_prefix = self._ssh_prefix()
            quoted_path = shlex.quote(path)
            return [
                [*ssh_prefix, f"{' '.join(action)} {quoted_path}"]
                for action in actions
            ]
        return [[*action, path] for action in actions]

    def _copy_to_remote_cmd(
            self, local_path, remote_subdir=None,
            exclude=None,
            ignore_existing=False,
            clear_remote=True):
        """Build the command list that uploads *local_path* to the remote.

        Args:
            local_path: local file or directory.  A trailing ``/`` is
                appended to an existing directory so that rsync copies its
                contents rather than the directory itself.
            remote_subdir: optional sub-directory below the remote path;
                leading slashes are ignored.  When omitted, the remote
                path itself is the target.
            exclude: optional iterable of rsync exclude patterns.
            ignore_existing: add ``--ignore-existing`` to the rsync call.
            clear_remote: remove the target directory before creating it.

        Returns:
            A list of argv lists: optional ``rm -rf``, ``mkdir -p`` and
            finally the rsync command.
        """
        logger.debug("rsync remote: %s", self.rsync_remote)
        logger.debug("rsync hosturl: %s", self.hosturl())

        if os.path.isdir(local_path) and not local_path.endswith("/"):
            local_path += "/"

        # Without a sub-directory the target is the remote path itself; an
        # empty path would make "mkdir -p" fail.
        target_path = self.parsed_remote.path
        rsync_remote = self.rsync_remote
        if remote_subdir is not None:
            # os.path.join discards the base when the second part is
            # absolute, so drop every leading slash.
            remote_subdir = remote_subdir.lstrip("/")
            target_path = os.path.join(
                self.parsed_remote.path, remote_subdir)
            rsync_remote = os.path.join(self.rsync_remote, remote_subdir)

        cmds = self._prepare_dir_cmds(target_path, clear=clear_remote)

        rsync_cmd = ["/usr/bin/rsync", "-avPHt"]
        rsync_cmd.extend(self._exclude_options(exclude))
        if ignore_existing:
            rsync_cmd.append("--ignore-existing")
        if self.rsync_remote_shell:
            rsync_cmd.extend(["-e", self.rsync_remote_shell])
        rsync_cmd.extend([local_path, rsync_remote])
        logger.debug("rsync command: %s", rsync_cmd)

        cmds.append(rsync_cmd)
        return cmds

    def _copy_to_local_cmd(self, local_path, remote_subdir=None, exclude=None):
        """Build the command list that downloads from the remote.

        Creates *local_path* (``os.makedirs(..., exist_ok=True)``) as a
        side effect and makes sure it ends with ``/``.

        Args:
            local_path: local destination directory.
            remote_subdir: optional sub-directory below the remote path;
                leading slashes are ignored.
            exclude: optional iterable of rsync exclude patterns.

        Returns:
            A list holding the single rsync argv list.
        """
        os.makedirs(local_path, exist_ok=True)
        if not local_path.endswith("/"):
            local_path += "/"

        rsync_remote = self.rsync_remote
        if remote_subdir is not None:
            rsync_remote = os.path.join(
                self.rsync_remote, remote_subdir.lstrip("/"))

        rsync_cmd = ["/usr/bin/rsync", "-avPHt"]
        if self.rsync_remote_shell:
            rsync_cmd.extend(["-e", self.rsync_remote_shell])
        rsync_cmd.extend(self._exclude_options(exclude))
        rsync_cmd.extend([rsync_remote, local_path])
        return [rsync_cmd]

    @staticmethod
    def _cmd_execute(commands):
        """Run each command in order, logging its combined output.

        Args:
            commands: iterable of argv lists.

        Raises:
            ValueError: as soon as a command exits with a non-zero status;
                the remaining commands are not run.
        """
        for cmd in commands:
            logger.info("executing command: %s", cmd)
            with subprocess.Popen(
                    cmd,
                    stdout=subprocess.PIPE,
                    stderr=subprocess.STDOUT,
                    universal_newlines=True) as proc:
                for line in proc.stdout:
                    logger.debug("%s", line.strip())
                # returncode is only populated once the process has been
                # waited for.
                returncode = proc.wait()
            logger.debug("command %s finished", cmd)
            if returncode:
                logger.error(
                    "command %s finished with %s", cmd, returncode)
                raise ValueError(f"error in {cmd}")

    def clear_remote(self, remote_subdir):
        """Remove and re-create *remote_subdir* below the remote path.

        Args:
            remote_subdir: sub-directory to clear; must not be ``None``.
                Leading slashes are ignored.

        Raises:
            ValueError: if one of the commands fails, or if the resulting
                path is empty or the filesystem root.
        """
        assert remote_subdir is not None
        target_path = os.path.join(
            self.parsed_remote.path, remote_subdir.lstrip("/"))
        self._cmd_execute(self._prepare_dir_cmds(target_path, clear=True))

    def copy_to_remote(
            self, local_path, remote_subdir=None, exclude=None,
            clear_remote=True, ignore_existing=False):
        """Copy a local file or directory to the remote.

        Args:
            local_path: local file or directory to upload.
            remote_subdir: optional sub-directory below the remote path.
            exclude: optional iterable of rsync exclude patterns.
            clear_remote: remove the target directory before copying.
            ignore_existing: pass ``--ignore-existing`` to rsync.

        Raises:
            ValueError: if one of the executed commands fails.
        """
        logger.debug("copying %s to %s", local_path, remote_subdir)
        cmds = self._copy_to_remote_cmd(
            local_path, remote_subdir=remote_subdir, exclude=exclude,
            ignore_existing=ignore_existing,
            clear_remote=clear_remote)
        self._cmd_execute(cmds)

    def copy_to_local(self, local_path, remote_subdir=None, exclude=None):
        """Copy from the remote into the local directory *local_path*.

        Args:
            local_path: local destination directory (created if missing).
            remote_subdir: optional sub-directory below the remote path.
            exclude: optional iterable of rsync exclude patterns.

        Raises:
            ValueError: if the rsync command fails.
        """
        cmds = self._copy_to_local_cmd(
            local_path, remote_subdir=remote_subdir, exclude=exclude)
        self._cmd_execute(cmds)