from __future__ import with_statement
import re
from contextlib import contextmanager
from plumbum.commands import CommandNotFound, shquote, ConcreteCommand
from plumbum.lib import _setdoc, ProcInfo, six
from plumbum.machines.local import LocalPath
from tempfile import NamedTemporaryFile
from plumbum.machines.env import BaseEnv
from plumbum.path.remote import RemotePath, RemoteWorkdir, StatRes
[docs]class RemoteEnv(BaseEnv):
"""The remote machine's environment; exposes a dict-like interface"""
__slots__ = ["_orig", "remote"]
def __init__(self, remote):
self.remote = remote
self._curr = dict(line.split("=", 1) for line in self.remote._session.run("env")[1].splitlines())
self._orig = self._curr.copy()
BaseEnv.__init__(self, self.remote.path, ":")
@_setdoc(BaseEnv)
[docs] def __delitem__(self, name):
BaseEnv.__delitem__(self, name)
self.remote._session.run("unset %s" % (name,))
@_setdoc(BaseEnv)
[docs] def __setitem__(self, name, value):
BaseEnv.__setitem__(self, name, value)
self.remote._session.run("export %s=%s" % (name, shquote(value)))
@_setdoc(BaseEnv)
[docs] def pop(self, name, *default):
BaseEnv.pop(self, name, *default)
self.remote._session.run("unset %s" % (name,))
@_setdoc(BaseEnv)
[docs] def update(self, *args, **kwargs):
BaseEnv.update(self, *args, **kwargs)
self.remote._session.run("export " +
" ".join("%s=%s" % (k, shquote(v)) for k, v in self.getdict().items()))
[docs] def expand(self, expr):
"""Expands any environment variables and home shortcuts found in ``expr``
(like ``os.path.expanduser`` combined with ``os.path.expandvars``)
:param expr: An expression containing environment variables (as ``$FOO``) or
home shortcuts (as ``~/.bashrc``)
:returns: The expanded string"""
return self.remote._session.run("echo %s" % (expr,))[1].strip()
[docs] def expanduser(self, expr):
"""Expand home shortcuts (e.g., ``~/foo/bar`` or ``~john/foo/bar``)
:param expr: An expression containing home shortcuts
:returns: The expanded string"""
if not any(part.startswith("~") for part in expr.split("/")):
return expr
# we escape all $ signs to avoid expanding env-vars
return self.remote._session.run("echo %s" % (expr.replace("$", "\\$"),))[1].strip()
# def clear(self):
# BaseEnv.clear(self, *args, **kwargs)
# self.remote._session.run("export %s" % " ".join("%s=%s" % (k, v) for k, v in self.getdict()))
[docs] def getdelta(self):
"""Returns the difference between the this environment and the original environment of
the remote machine"""
self._curr["PATH"] = self.path.join()
delta = {}
for k, v in self._curr.items():
if k not in self._orig:
delta[k] = str(v)
for k, v in self._orig.items():
if k not in self._curr:
delta[k] = ""
else:
if v != self._curr[k]:
delta[k] = self._curr[k]
return delta
class RemoteCommand(ConcreteCommand):
__slots__ = ["remote", "executable"]
QUOTE_LEVEL = 1
def __init__(self, remote, executable, encoding = "auto"):
self.remote = remote
ConcreteCommand.__init__(self, executable,
remote.encoding if encoding == "auto" else encoding)
@property
def machine(self):
return self.remote
def __repr__(self):
return "RemoteCommand(%r, %r)" % (self.remote, self.executable)
def popen(self, args = (), **kwargs):
return self.remote.popen(self[args], **kwargs)
class ClosedRemoteMachine(Exception):
pass
class ClosedRemote(object):
__slots__ = ["_obj"]
def __init__(self, obj):
self._obj = obj
def close(self):
pass
def __getattr__(self, name):
raise ClosedRemoteMachine("%r has been closed" % (self._obj,))
[docs]class BaseRemoteMachine(object):
"""Represents a *remote machine*; serves as an entry point to everything related to that
remote machine, such as working directory and environment manipulation, command creation,
etc.
Attributes:
* ``cwd`` - the remote working directory
* ``env`` - the remote environment
* ``encoding`` - the remote machine's default encoding (assumed to be UTF8)
* ``connect_timeout`` - the connection timeout
"""
# allow inheritors to override the RemoteCommand class
RemoteCommand = RemoteCommand
def __init__(self, encoding = "utf8", connect_timeout = 10, new_session = False):
self.encoding = encoding
self.connect_timeout = connect_timeout
self._session = self.session(new_session = new_session)
self.uname = self._get_uname()
self.cwd = RemoteWorkdir(self)
self.env = RemoteEnv(self)
self._python = None
def _get_uname(self):
rc, out, _ = self._session.run("uname", retcode = None)
if rc == 0:
return out.strip()
else:
rc, out, _ = self._session.run("python -c 'import platform;print(platform.uname()[0])'", retcode = None)
if rc == 0:
return out.strip()
else:
# all POSIX systems should have uname. make an educated guess it's Windows
return "Windows"
def __repr__(self):
return "<%s %s>" % (self.__class__.__name__, self)
def __enter__(self):
return self
def __exit__(self, t, v, tb):
self.close()
[docs] def close(self):
"""closes the connection to the remote machine; all paths and programs will
become defunct"""
self._session.close()
self._session = ClosedRemote(self)
[docs] def path(self, *parts):
"""A factory for :class:`RemotePaths <plumbum.path.remote.RemotePath>`.
Usage: ``p = rem.path("/usr", "lib", "python2.7")``
"""
parts2 = [str(self.cwd)]
for p in parts:
if isinstance(p, LocalPath):
raise TypeError("Cannot construct RemotePath from %r" % (p,))
p = str(p)
if "~" in p:
p = self.env.expanduser(p)
parts2.append(p)
return RemotePath(self, *parts2)
[docs] def which(self, progname):
"""Looks up a program in the ``PATH``. If the program is not found, raises
:class:`CommandNotFound <plumbum.commands.CommandNotFound>`
:param progname: The program's name. Note that if underscores (``_``) are present
in the name, and the exact name is not found, they will be replaced
in turn by hyphens (``-``) then periods (``.``), and the name will
be looked up again for each alternative
:returns: A :class:`RemotePath <plumbum.path.local.RemotePath>`
"""
alternatives = [progname]
if "_" in progname:
alternatives.append(progname.replace("_", "-"))
alternatives.append(progname.replace("_", "."))
for name in alternatives:
for p in self.env.path:
fn = p / name
if fn.access("x"):
return fn
raise CommandNotFound(progname, self.env.path)
[docs] def __getitem__(self, cmd):
"""Returns a `Command` object representing the given program. ``cmd`` can be a string or
a :class:`RemotePath <plumbum.path.remote.RemotePath>`; if it is a path, a command
representing this path will be returned; otherwise, the program name will be looked up in
the system's ``PATH`` (using ``which``). Usage::
r_ls = rem["ls"]
"""
if isinstance(cmd, RemotePath):
if cmd.remote is self:
return self.RemoteCommand(self, cmd)
else:
raise TypeError("Given path does not belong to this remote machine: %r" % (cmd,))
elif not isinstance(cmd, LocalPath):
if "/" in cmd or "\\" in cmd:
return self.RemoteCommand(self, self.path(cmd))
else:
return self.RemoteCommand(self, self.which(cmd))
else:
raise TypeError("cmd must not be a LocalPath: %r" % (cmd,))
[docs] def __contains__(self, cmd):
"""Tests for the existance of the command, e.g., ``"ls" in remote_machine``.
``cmd`` can be anything acceptable by ``__getitem__``.
"""
try:
self[cmd]
except CommandNotFound:
return False
else:
return True
@property
def python(self):
"""A command that represents the default remote python interpreter"""
if not self._python:
self._python = self["python"]
return self._python
[docs] def session(self, isatty = False, new_session = False):
"""Creates a new :class:`ShellSession <plumbum.session.ShellSession>` object; this invokes the user's
shell on the remote machine and executes commands on it over stdin/stdout/stderr"""
raise NotImplementedError()
[docs] def download(self, src, dst):
"""Downloads a remote file/directory (``src``) to a local destination (``dst``).
``src`` must be a string or a :class:`RemotePath <plumbum.path.remote.RemotePath>`
pointing to this remote machine, and ``dst`` must be a string or a
:class:`LocalPath <plumbum.machines.local.LocalPath>`"""
raise NotImplementedError()
[docs] def upload(self, src, dst):
"""Uploads a local file/directory (``src``) to a remote destination (``dst``).
``src`` must be a string or a :class:`LocalPath <plumbum.machines.local.LocalPath>`,
and ``dst`` must be a string or a :class:`RemotePath <plumbum.path.remote.RemotePath>`
pointing to this remote machine"""
raise NotImplementedError()
[docs] def popen(self, args, **kwargs):
"""Spawns the given command on the remote machine, returning a ``Popen``-like object;
do not use this method directly, unless you need "low-level" control on the remote
process"""
raise NotImplementedError()
[docs] def list_processes(self):
"""
Returns information about all running processes (on POSIX systems: using ``ps``)
.. versionadded:: 1.3
"""
ps = self["ps"]
lines = ps("-e", "-o", "pid,uid,stat,args").splitlines()
lines.pop(0) # header
for line in lines:
parts = line.strip().split()
yield ProcInfo(int(parts[0]), int(parts[1]), parts[2], " ".join(parts[3:]))
[docs] def pgrep(self, pattern):
"""
Process grep: return information about all processes whose command-line args match the given regex pattern
"""
pat = re.compile(pattern)
for procinfo in self.list_processes():
if pat.search(procinfo.args):
yield procinfo
@contextmanager
[docs] def tempdir(self):
"""A context manager that creates a remote temporary directory, which is removed when
the context exits"""
_, out, _ = self._session.run("mktemp -d")
dir = self.path(out.strip()) # @ReservedAssignment
try:
yield dir
finally:
dir.delete()
#
# Path implementation
#
def _path_listdir(self, fn):
files = self._session.run("ls -a %s" % (shquote(fn),))[1].splitlines()
files.remove(".")
files.remove("..")
return files
def _path_glob(self, fn, pattern):
matches = self._session.run("for fn in %s/%s; do echo $fn; done" % (fn, pattern))[1].splitlines()
if len(matches) == 1 and not self._path_stat(matches[0]):
return [] # pattern expansion failed
return matches
def _path_getuid(self, fn):
return self._session.run("stat -c '%u,%U' " + shquote(fn))[1].strip().split(",")
def _path_getgid(self, fn):
return self._session.run("stat -c '%g,%G' " + shquote(fn))[1].strip().split(",")
def _path_stat(self, fn):
rc, out, _ = self._session.run("stat -c '%F,%f,%i,%d,%h,%u,%g,%s,%X,%Y,%Z' " + shquote(fn),
retcode = None)
if rc != 0:
return None
statres = out.strip().split(",")
text_mode = statres.pop(0).lower()
res = StatRes((int(statres[0], 16),) + tuple(int(sr) for sr in statres[1:]))
res.text_mode = text_mode
return res
def _path_delete(self, fn):
self._session.run("rm -rf %s" % (shquote(fn),))
def _path_move(self, src, dst):
self._session.run("mv %s %s" % (shquote(src), shquote(dst)))
def _path_copy(self, src, dst):
self._session.run("cp -r %s %s" % (shquote(src), shquote(dst)))
def _path_mkdir(self, fn):
self._session.run("mkdir -p %s" % (shquote(fn),))
def _path_chmod(self, mode, fn):
self._session.run("chmod %o %s" % (mode, shquote(fn)))
def _path_chown(self, fn, owner, group, recursive):
args = ["chown"]
if recursive:
args.append("-R")
if owner is not None and group is not None:
args.append("%s:%s" % (owner, group))
elif owner is not None:
args.append(str(owner))
elif group is not None:
args.append(":%s" % (group,))
args.append(shquote(fn))
self._session.run(" ".join(args))
def _path_read(self, fn):
data = self["cat"](fn)
if self.encoding and isinstance(data, six.unicode_type):
data = data.encode(self.encoding)
return data
def _path_write(self, fn, data):
if self.encoding and isinstance(data, six.unicode_type):
data = data.encode(self.encoding)
with NamedTemporaryFile() as f:
f.write(data)
f.flush()
f.seek(0)
self.upload(f.name, fn)
def _path_link(self, src, dst, symlink):
self._session.run("ln -s %s %s" % ("-s" if symlink else "", shquote(src), shquote(dst)))