From: Alfredo Deza Date: Thu, 26 Jun 2014 19:26:05 +0000 (-0400) Subject: remove execnet from source X-Git-Tag: 0.0.17~1 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=ff9cddb2afa1664c42b8235a0d78e26a547b053c;p=remoto.git remove execnet from source Signed-off-by: Alfredo Deza --- diff --git a/remoto/lib/execnet/__init__.py b/remoto/lib/execnet/__init__.py deleted file mode 100644 index fe1b681..0000000 --- a/remoto/lib/execnet/__init__.py +++ /dev/null @@ -1,33 +0,0 @@ -""" -execnet: pure python lib for connecting to local and remote Python Interpreters. - -(c) 2012, Holger Krekel and others -""" -__version__ = '1.2.0.dev2-ad1' - -from . import apipkg - -apipkg.initpkg(__name__, { - 'PopenGateway': '.deprecated:PopenGateway', - 'SocketGateway': '.deprecated:SocketGateway', - 'SshGateway': '.deprecated:SshGateway', - 'makegateway': '.multi:makegateway', - 'set_execmodel': '.multi:set_execmodel', - 'HostNotFound': '.gateway_bootstrap:HostNotFound', - 'RemoteError': '.gateway_base:RemoteError', - 'TimeoutError': '.gateway_base:TimeoutError', - 'XSpec': '.xspec:XSpec', - 'Group': '.multi:Group', - 'MultiChannel': '.multi:MultiChannel', - 'RSync': '.rsync:RSync', - 'default_group': '.multi:default_group', - 'dumps': '.gateway_base:dumps', - 'loads': '.gateway_base:loads', - 'load': '.gateway_base:load', - 'dump': '.gateway_base:dump', - 'DataFormatError': '.gateway_base:DataFormatError', -}) - -# CHANGELOG -# -# 1.2.0-ad1: Patch `if case` in to_io method to prevent AttributeErrors diff --git a/remoto/lib/execnet/apipkg.py b/remoto/lib/execnet/apipkg.py deleted file mode 100644 index a4576c0..0000000 --- a/remoto/lib/execnet/apipkg.py +++ /dev/null @@ -1,167 +0,0 @@ -""" -apipkg: control the exported namespace of a python package. - -see http://pypi.python.org/pypi/apipkg - -(c) holger krekel, 2009 - MIT license -""" -import os -import sys -from types import ModuleType - -__version__ = '1.2' - -def initpkg(pkgname, exportdefs, attr=dict()): - """ initialize given package from the export definitions. """ - oldmod = sys.modules.get(pkgname) - d = {} - f = getattr(oldmod, '__file__', None) - if f: - f = os.path.abspath(f) - d['__file__'] = f - if hasattr(oldmod, '__version__'): - d['__version__'] = oldmod.__version__ - if hasattr(oldmod, '__loader__'): - d['__loader__'] = oldmod.__loader__ - if hasattr(oldmod, '__path__'): - d['__path__'] = [os.path.abspath(p) for p in oldmod.__path__] - if '__doc__' not in exportdefs and getattr(oldmod, '__doc__', None): - d['__doc__'] = oldmod.__doc__ - d.update(attr) - if hasattr(oldmod, "__dict__"): - oldmod.__dict__.update(d) - mod = ApiModule(pkgname, exportdefs, implprefix=pkgname, attr=d) - sys.modules[pkgname] = mod - -def importobj(modpath, attrname): - module = __import__(modpath, None, None, ['__doc__']) - if not attrname: - return module - - retval = module - names = attrname.split(".") - for x in names: - retval = getattr(retval, x) - return retval - -class ApiModule(ModuleType): - def __docget(self): - try: - return self.__doc - except AttributeError: - if '__doc__' in self.__map__: - return self.__makeattr('__doc__') - def __docset(self, value): - self.__doc = value - __doc__ = property(__docget, __docset) - - def __init__(self, name, importspec, implprefix=None, attr=None): - self.__name__ = name - self.__all__ = [x for x in importspec if x != '__onfirstaccess__'] - self.__map__ = {} - self.__implprefix__ = implprefix or name - if attr: - for name, val in attr.items(): - #print "setting", self.__name__, name, val - setattr(self, name, val) - for name, importspec in importspec.items(): - if isinstance(importspec, dict): - subname = '%s.%s'%(self.__name__, name) - apimod = ApiModule(subname, importspec, implprefix) - sys.modules[subname] = apimod - setattr(self, name, apimod) - else: - parts = importspec.split(':') - modpath = parts.pop(0) - attrname = parts and parts[0] or "" - if modpath[0] == '.': - modpath = implprefix + modpath - - if not attrname: - subname = '%s.%s'%(self.__name__, name) - apimod = AliasModule(subname, modpath) - sys.modules[subname] = apimod - if '.' not in name: - setattr(self, name, apimod) - else: - self.__map__[name] = (modpath, attrname) - - def __repr__(self): - l = [] - if hasattr(self, '__version__'): - l.append("version=" + repr(self.__version__)) - if hasattr(self, '__file__'): - l.append('from ' + repr(self.__file__)) - if l: - return '' % (self.__name__, " ".join(l)) - return '' % (self.__name__,) - - def __makeattr(self, name): - """lazily compute value for name or raise AttributeError if unknown.""" - #print "makeattr", self.__name__, name - target = None - if '__onfirstaccess__' in self.__map__: - target = self.__map__.pop('__onfirstaccess__') - importobj(*target)() - try: - modpath, attrname = self.__map__[name] - except KeyError: - if target is not None and name != '__onfirstaccess__': - # retry, onfirstaccess might have set attrs - return getattr(self, name) - raise AttributeError(name) - else: - result = importobj(modpath, attrname) - setattr(self, name, result) - try: - del self.__map__[name] - except KeyError: - pass # in a recursive-import situation a double-del can happen - return result - - __getattr__ = __makeattr - - def __dict__(self): - # force all the content of the module to be loaded when __dict__ is read - dictdescr = ModuleType.__dict__['__dict__'] - dict = dictdescr.__get__(self) - if dict is not None: - hasattr(self, 'some') - for name in self.__all__: - try: - self.__makeattr(name) - except AttributeError: - pass - return dict - __dict__ = property(__dict__) - - -def AliasModule(modname, modpath, attrname=None): - mod = [] - - def getmod(): - if not mod: - x = importobj(modpath, None) - if attrname is not None: - x = getattr(x, attrname) - mod.append(x) - return mod[0] - - class AliasModule(ModuleType): - - def __repr__(self): - x = modpath - if attrname: - x += "." + attrname - return '' % (modname, x) - - def __getattribute__(self, name): - return getattr(getmod(), name) - - def __setattr__(self, name, value): - setattr(getmod(), name, value) - - def __delattr__(self, name): - delattr(getmod(), name) - - return AliasModule(modname) diff --git a/remoto/lib/execnet/deprecated.py b/remoto/lib/execnet/deprecated.py deleted file mode 100644 index aef4626..0000000 --- a/remoto/lib/execnet/deprecated.py +++ /dev/null @@ -1,43 +0,0 @@ -""" -some deprecated calls - -(c) 2008-2009, Holger Krekel and others -""" -import execnet - -def PopenGateway(python=None): - """ instantiate a gateway to a subprocess - started with the given 'python' executable. - """ - APIWARN("1.0.0b4", "use makegateway('popen')") - spec = execnet.XSpec("popen") - spec.python = python - return execnet.default_group.makegateway(spec) - -def SocketGateway(host, port): - """ This Gateway provides interaction with a remote process - by connecting to a specified socket. On the remote - side you need to manually start a small script - (py/execnet/script/socketserver.py) that accepts - SocketGateway connections or use the experimental - new_remote() method on existing gateways. - """ - APIWARN("1.0.0b4", "use makegateway('socket=host:port')") - spec = execnet.XSpec("socket=%s:%s" %(host, port)) - return execnet.default_group.makegateway(spec) - -def SshGateway(sshaddress, remotepython=None, ssh_config=None): - """ instantiate a remote ssh process with the - given 'sshaddress' and remotepython version. - you may specify an ssh_config file. - """ - APIWARN("1.0.0b4", "use makegateway('ssh=host')") - spec = execnet.XSpec("ssh=%s" % sshaddress) - spec.python = remotepython - spec.ssh_config = ssh_config - return execnet.default_group.makegateway(spec) - -def APIWARN(version, msg, stacklevel=3): - import warnings - Warn = DeprecationWarning("(since version %s) %s" %(version, msg)) - warnings.warn(Warn, stacklevel=stacklevel) diff --git a/remoto/lib/execnet/gateway.py b/remoto/lib/execnet/gateway.py deleted file mode 100644 index e1d0e07..0000000 --- a/remoto/lib/execnet/gateway.py +++ /dev/null @@ -1,202 +0,0 @@ -""" -gateway code for initiating popen, socket and ssh connections. -(c) 2004-2013, Holger Krekel and others -""" - -import sys, os, inspect, types, linecache -import textwrap -import execnet -from execnet.gateway_base import Message -from execnet import gateway_base -importdir = os.path.dirname(os.path.dirname(execnet.__file__)) - -class Gateway(gateway_base.BaseGateway): - """ Gateway to a local or remote Python Intepreter. """ - - def __init__(self, io, spec): - super(Gateway, self).__init__(io=io, id=spec.id, _startcount=1) - self.spec = spec - self._initreceive() - - @property - def remoteaddress(self): - return self._io.remoteaddress - - def __repr__(self): - """ return string representing gateway type and status. """ - try: - r = (self.hasreceiver() and 'receive-live' or 'not-receiving') - i = len(self._channelfactory.channels()) - except AttributeError: - r = "uninitialized" - i = "no" - return "<%s id=%r %s, %s model, %s active channels>" %( - self.__class__.__name__, self.id, r, self.execmodel.backend, i) - - def exit(self): - """ trigger gateway exit. Defer waiting for finishing - of receiver-thread and subprocess activity to when - group.terminate() is called. - """ - self._trace("gateway.exit() called") - if self not in self._group: - self._trace("gateway already unregistered with group") - return - self._group._unregister(self) - try: - self._trace("--> sending GATEWAY_TERMINATE") - self._send(Message.GATEWAY_TERMINATE) - self._trace("--> io.close_write") - self._io.close_write() - except (ValueError, EOFError, IOError): - v = sys.exc_info()[1] - self._trace("io-error: could not send termination sequence") - self._trace(" exception: %r" % v) - - def reconfigure(self, py2str_as_py3str=True, py3str_as_py2str=False): - """ - set the string coercion for this gateway - the default is to try to convert py2 str as py3 str, - but not to try and convert py3 str to py2 str - """ - self._strconfig = (py2str_as_py3str, py3str_as_py2str) - data = gateway_base.dumps_internal(self._strconfig) - self._send(Message.RECONFIGURE, data=data) - - - def _rinfo(self, update=False): - """ return some sys/env information from remote. """ - if update or not hasattr(self, '_cache_rinfo'): - ch = self.remote_exec(rinfo_source) - self._cache_rinfo = RInfo(ch.receive()) - return self._cache_rinfo - - def hasreceiver(self): - """ return True if gateway is able to receive data. """ - return self._receiverthread.running # approxmimation - - def remote_status(self): - """ return information object about remote execution status. """ - channel = self.newchannel() - self._send(Message.STATUS, channel.id) - statusdict = channel.receive() - # the other side didn't actually instantiate a channel - # so we just delete the internal id/channel mapping - self._channelfactory._local_close(channel.id) - return RemoteStatus(statusdict) - - def remote_exec(self, source, **kwargs): - """ return channel object and connect it to a remote - execution thread where the given ``source`` executes. - - * ``source`` is a string: execute source string remotely - with a ``channel`` put into the global namespace. - * ``source`` is a pure function: serialize source and - call function with ``**kwargs``, adding a - ``channel`` object to the keyword arguments. - * ``source`` is a pure module: execute source of module - with a ``channel`` in its global namespace - - In all cases the binding ``__name__='__channelexec__'`` - will be available in the global namespace of the remotely - executing code. - """ - call_name = None - if isinstance(source, types.ModuleType): - linecache.updatecache(inspect.getsourcefile(source)) - source = inspect.getsource(source) - elif isinstance(source, types.FunctionType): - call_name = source.__name__ - source = _source_of_function(source) - else: - source = textwrap.dedent(str(source)) - - if call_name is None and kwargs: - raise TypeError("can't pass kwargs to non-function remote_exec") - - channel = self.newchannel() - self._send(Message.CHANNEL_EXEC, - channel.id, - gateway_base.dumps_internal((source, call_name, kwargs))) - return channel - - def remote_init_threads(self, num=None): - """ DEPRECATED. Is currently a NO-OPERATION already.""" - print ("WARNING: remote_init_threads() is a no-operation in execnet-1.2") - -class RInfo: - def __init__(self, kwargs): - self.__dict__.update(kwargs) - - def __repr__(self): - info = ", ".join(["%s=%s" % item - for item in self.__dict__.items()]) - return "" % info - -RemoteStatus = RInfo - -def rinfo_source(channel): - import sys, os - channel.send(dict( - executable = sys.executable, - version_info = sys.version_info[:5], - platform = sys.platform, - cwd = os.getcwd(), - pid = os.getpid(), - )) - - -def _find_non_builtin_globals(source, codeobj): - try: - import ast - except ImportError: - return None - try: - import __builtin__ - except ImportError: - import builtins as __builtin__ - - vars = dict.fromkeys(codeobj.co_varnames) - all = [] - for node in ast.walk(ast.parse(source)): - if (isinstance(node, ast.Name) and node.id not in vars and - node.id not in __builtin__.__dict__): - all.append(node.id) - return all - - -def _source_of_function(function): - if function.__name__ == '': - raise ValueError("can't evaluate lambda functions'") - #XXX: we dont check before remote instanciation - # if arguments are used propperly - args, varargs, keywords, defaults = inspect.getargspec(function) - if args[0] != 'channel': - raise ValueError('expected first function argument to be `channel`') - - if sys.version_info < (3,0): - closure = function.func_closure - codeobj = function.func_code - else: - closure = function.__closure__ - codeobj = function.__code__ - - if closure is not None: - raise ValueError("functions with closures can't be passed") - - try: - source = inspect.getsource(function) - except IOError: - raise ValueError("can't find source file for %s" % function) - - source = textwrap.dedent(source) # just for inner functions - - used_globals = _find_non_builtin_globals(source, codeobj) - if used_globals: - raise ValueError( - "the use of non-builtin globals isn't supported", - used_globals, - ) - - return source - diff --git a/remoto/lib/execnet/gateway_base.py b/remoto/lib/execnet/gateway_base.py deleted file mode 100644 index e586dcd..0000000 --- a/remoto/lib/execnet/gateway_base.py +++ /dev/null @@ -1,1470 +0,0 @@ -""" -base execnet gateway code send to the other side for bootstrapping. - -NOTE: aims to be compatible to Python 2.5-3.X, Jython and IronPython - -(C) 2004-2013 Holger Krekel, Armin Rigo, Benjamin Peterson, Ronny Pfannschmidt and others -""" -from __future__ import with_statement -import sys, os, weakref -import traceback, struct - -# NOTE that we want to avoid try/except style importing -# to avoid setting sys.exc_info() during import -# - -ISPY3 = sys.version_info >= (3, 0) -if ISPY3: - from io import BytesIO - exec("def do_exec(co, loc): exec(co, loc)\n" - "def reraise(cls, val, tb): raise val\n") - unicode = str - _long_type = int - from _thread import interrupt_main -else: - from StringIO import StringIO as BytesIO - exec("def do_exec(co, loc): exec co in loc\n" - "def reraise(cls, val, tb): raise cls, val, tb\n") - bytes = str - _long_type = long - try: - from thread import interrupt_main - except ImportError: - interrupt_main = None - -class EmptySemaphore: - acquire = release = lambda self: None - -def get_execmodel(backend): - if hasattr(backend, "backend"): - return backend - if backend == "thread": - importdef = { - 'get_ident': ['thread::get_ident', '_thread::get_ident'], - '_start_new_thread': ['thread::start_new_thread', - '_thread::start_new_thread'], - 'threading': ["threading",], - 'queue': ["queue", "Queue"], - 'sleep': ['time::sleep'], - 'subprocess': ['subprocess'], - 'socket': ['socket'], - '_fdopen': ['os::fdopen'], - '_lock': ['threading'], - '_event': ['threading'], - } - def exec_start(self, func, args=()): - self._start_new_thread(func, args) - - elif backend == "eventlet": - importdef = { - 'get_ident': ['eventlet.green.thread::get_ident'], - '_spawn_n': ['eventlet::spawn_n'], - 'threading': ['eventlet.green.threading'], - 'queue': ["eventlet.queue"], - 'sleep': ['eventlet::sleep'], - 'subprocess': ['eventlet.green.subprocess'], - 'socket': ['eventlet.green.socket'], - '_fdopen': ['eventlet.green.os::fdopen'], - '_lock': ['eventlet.green.threading'], - '_event': ['eventlet.green.threading'], - } - def exec_start(self, func, args=()): - self._spawn_n(func, *args) - elif backend == "gevent": - importdef = { - 'get_ident': ['gevent.thread::get_ident'], - '_spawn_n': ['gevent::spawn'], - 'threading': ['threading'], - 'queue': ["gevent.queue"], - 'sleep': ['gevent::sleep'], - 'subprocess': ['gevent.subprocess'], - 'socket': ['gevent.socket'], - # XXX - '_fdopen': ['gevent.fileobject::FileObjectThread'], - '_lock': ['gevent.lock'], - '_event': ['gevent.event'], - } - def exec_start(self, func, args=()): - self._spawn_n(func, *args) - else: - raise ValueError("unknown execmodel %r" %(backend,)) - - class ExecModel: - def __init__(self, name): - self._importdef = importdef - self.backend = name - self._count = 0 - - def __repr__(self): - return "" % self.backend - - def __getattr__(self, name): - locs = self._importdef.get(name) - if locs is None: - raise AttributeError(name) - for loc in locs: - parts = loc.split("::") - loc = parts.pop(0) - try: - mod = __import__(loc, None, None, "__doc__") - except ImportError: - pass - else: - if parts: - mod = getattr(mod, parts[0]) - setattr(self, name, mod) - return mod - raise AttributeError(name) - - start = exec_start - - def fdopen(self, fd, mode, bufsize=1): - return self._fdopen(fd, mode, bufsize) - - def WorkerPool(self, size=None, hasprimary=False): - return WorkerPool(self, size, hasprimary=hasprimary) - - def Semaphore(self, size=None): - if size is None: - return EmptySemaphore() - return self._lock.Semaphore(size) - - def Lock(self): - return self._lock.RLock() - - def RLock(self): - return self._lock.RLock() - - def Event(self): - event = self._event.Event() - if sys.version_info < (2,7): - # patch wait function to return event state instead of None - real_wait = event.wait - def wait(timeout=None): - real_wait(timeout=timeout) - return event.isSet() - event.wait = wait - return event - - def PopenPiped(self, args): - PIPE = self.subprocess.PIPE - return self.subprocess.Popen(args, stdout=PIPE, stdin=PIPE) - - - return ExecModel(backend) - - -class Reply(object): - """ reply instances provide access to the result - of a function execution that got dispatched - through WorkerPool.spawn() - """ - def __init__(self, task, threadmodel): - self.task = task - self._result_ready = threadmodel.Event() - self.running = True - - def get(self, timeout=None): - """ get the result object from an asynchronous function execution. - if the function execution raised an exception, - then calling get() will reraise that exception - including its traceback. - """ - self.waitfinish(timeout) - try: - return self._result - except AttributeError: - reraise(*(self._excinfo[:3])) # noqa - - def waitfinish(self, timeout=None): - if not self._result_ready.wait(timeout): - raise IOError("timeout waiting for %r" %(self.task, )) - - def run(self): - func, args, kwargs = self.task - try: - try: - self._result = func(*args, **kwargs) - except: - self._excinfo = sys.exc_info() - finally: - self._result_ready.set() - self.running = False - - -class WorkerPool(object): - """ A WorkerPool allows to spawn function executions - to threads, returning a reply object on which you - can ask for the result (and get exceptions reraised) - """ - def __init__(self, execmodel, size=None, hasprimary=False): - """ by default allow unlimited number of spawns. """ - self.execmodel = execmodel - self._size = size - self._running_lock = self.execmodel.Lock() - self._sem = self.execmodel.Semaphore(size) - self._running = set() - self._shutdown_event = self.execmodel.Event() - if hasprimary: - if self.execmodel.backend != "thread": - raise ValueError("hasprimary=True requires thread model") - self._primary_thread_event = self.execmodel.Event() - - def integrate_as_primary_thread(self): - """ integrate the thread with which we are called as a primary - thread to dispatch to when spawn is called. - """ - assert self.execmodel.backend == "thread", self.execmodel - # XXX insert check if we really are in the main thread - primary_thread_event = self._primary_thread_event - # interacts with code at REF1 - while not self._shutdown_event.isSet(): - primary_thread_event.wait() - func, args, kwargs = self._primary_thread_task - if func is None: # waitall() woke us up to finish the loop - break - func(*args, **kwargs) - primary_thread_event.clear() - - def shutdown(self): - self._shutdown_event.set() - - def wait_for_shutdown(self, timeout=None): - return self._shutdown_event.wait(timeout=timeout) - - def active_count(self): - return len(self._running) - - def spawn(self, func, *args, **kwargs): - """ return Reply object for the asynchronous dispatch - of the given func(*args, **kwargs). - """ - reply = Reply((func, args, kwargs), self.execmodel) - def run_and_release(): - reply.run() - try: - with self._running_lock: - self._running.remove(reply) - self._sem.release() - if not self._running: - try: - self._waitall_event.set() - except AttributeError: - pass - except TypeError: - pass - self._sem.acquire() - with self._running_lock: - self._running.add(reply) - # REF1 in 'thread' model we give priority to running in main thread - primary_thread_event = getattr(self, "_primary_thread_event", None) - if primary_thread_event is not None: - if not primary_thread_event.isSet(): - self._primary_thread_task = run_and_release, (), {} - primary_thread_event.set() - return reply - self.execmodel.start(run_and_release, ()) - return reply - - def waitall(self, timeout=None): - """ wait until all previosuly spawns have terminated. """ - # if it exists signal primary_thread to finish its loop - self._primary_thread_task = None, None, None - try: - self._primary_thread_event.set() - except AttributeError: - pass - with self._running_lock: - if not self._running: - return True - # if a Reply still runs, we let run_and_release - # signal us -- note that we are still holding the - # _running_lock to avoid race conditions - self._waitall_event = self.execmodel.Event() - return self._waitall_event.wait(timeout=timeout) - - -sysex = (KeyboardInterrupt, SystemExit) - - -DEBUG = os.environ.get('EXECNET_DEBUG') -pid = os.getpid() -if DEBUG == '2': - def trace(*msg): - try: - line = " ".join(map(str, msg)) - sys.stderr.write("[%s] %s\n" % (pid, line)) - sys.stderr.flush() - except Exception: - pass # nothing we can do, likely interpreter-shutdown -elif DEBUG: - import tempfile, os.path - fn = os.path.join(tempfile.gettempdir(), 'execnet-debug-%d' % pid) - #sys.stderr.write("execnet-debug at %r" %(fn,)) - debugfile = open(fn, 'w') - def trace(*msg): - try: - line = " ".join(map(str, msg)) - debugfile.write(line + "\n") - debugfile.flush() - except Exception: - try: - v = sys.exc_info()[1] - sys.stderr.write( - "[%s] exception during tracing: %r\n" % (pid, v)) - except Exception: - pass # nothing we can do, likely interpreter-shutdown -else: - notrace = trace = lambda *msg: None - -class Popen2IO: - error = (IOError, OSError, EOFError) - - def __init__(self, outfile, infile, execmodel): - # we need raw byte streams - self.outfile, self.infile = outfile, infile - if sys.platform == "win32": - import msvcrt - try: - msvcrt.setmode(infile.fileno(), os.O_BINARY) - msvcrt.setmode(outfile.fileno(), os.O_BINARY) - except (AttributeError, IOError): - pass - self._read = getattr(infile, "buffer", infile).read - self._write = getattr(outfile, "buffer", outfile).write - self.execmodel = execmodel - - def read(self, numbytes): - """Read exactly 'numbytes' bytes from the pipe. """ - # a file in non-blocking mode may return less bytes, so we loop - buf = bytes() - while numbytes > len(buf): - data = self._read(numbytes-len(buf)) - if not data: - raise EOFError("expected %d bytes, got %d" %(numbytes, len(buf))) - buf += data - return buf - - def write(self, data): - """write out all data bytes. """ - assert isinstance(data, bytes) - self._write(data) - self.outfile.flush() - - def close_read(self): - self.infile.close() - - def close_write(self): - self.outfile.close() - -class Message: - """ encapsulates Messages and their wire protocol. """ - _types = [] - - def __init__(self, msgcode, channelid=0, data=''): - self.msgcode = msgcode - self.channelid = channelid - self.data = data - - @staticmethod - def from_io(io): - try: - header = io.read(9) # type 1, channel 4, payload 4 - if not header: - raise EOFError("empty read") - except EOFError: - e = sys.exc_info()[1] - raise EOFError('couldnt load message header, ' + e.args[0]) - msgtype, channel, payload = struct.unpack('!bii', header) - return Message(msgtype, channel, io.read(payload)) - - def to_io(self, io): - if struct.pack is not None: - header = struct.pack('!bii', self.msgcode, self.channelid, - len(self.data)) - io.write(header+self.data) - - def received(self, gateway): - self._types[self.msgcode](self, gateway) - - def __repr__(self): - class FakeChannel(object): - _strconfig = False, False # never transform, never fail - def __init__(self, id): - self.id = id - def __repr__(self): - return '' % self.id - FakeChannel.new = FakeChannel - FakeChannel.gateway = FakeChannel - name = self._types[self.msgcode].__name__.upper() - try: - data = loads_internal(self.data, FakeChannel) - except LoadError: - data = self.data - r = repr(data) - if len(r) > 90: - return "" %(name, - self.channelid, len(r)) - else: - return "" %(name, - self.channelid, r) - -class GatewayReceivedTerminate(Exception): - """ Receiverthread got termination message. """ - -def _setupmessages(): - def status(message, gateway): - # we use the channelid to send back information - # but don't instantiate a channel object - d = {'numchannels': len(gateway._channelfactory._channels), - 'numexecuting': gateway._execpool.active_count(), - 'execmodel': gateway.execmodel.backend, - } - gateway._send(Message.CHANNEL_DATA, message.channelid, - dumps_internal(d)) - gateway._send(Message.CHANNEL_CLOSE, message.channelid) - - def channel_exec(message, gateway): - channel = gateway._channelfactory.new(message.channelid) - gateway._local_schedulexec(channel=channel,sourcetask=message.data) - - def channel_data(message, gateway): - gateway._channelfactory._local_receive(message.channelid, message.data) - - def channel_close(message, gateway): - gateway._channelfactory._local_close(message.channelid) - - def channel_close_error(message, gateway): - remote_error = RemoteError(loads_internal(message.data)) - gateway._channelfactory._local_close(message.channelid, remote_error) - - def channel_last_message(message, gateway): - gateway._channelfactory._local_close(message.channelid, sendonly=True) - - def gateway_terminate(message, gateway): - raise GatewayReceivedTerminate(gateway) - - def reconfigure(message, gateway): - if message.channelid == 0: - target = gateway - else: - target = gateway._channelfactory.new(message.channelid) - target._strconfig = loads_internal(message.data, gateway) - - types = [ - status, reconfigure, gateway_terminate, - channel_exec, channel_data, channel_close, - channel_close_error, channel_last_message, - ] - for i, handler in enumerate(types): - Message._types.append(handler) - setattr(Message, handler.__name__.upper(), i) - -_setupmessages() - -def geterrortext(excinfo, - format_exception=traceback.format_exception, sysex=sysex): - try: - l = format_exception(*excinfo) - errortext = "".join(l) - except sysex: - raise - except: - errortext = '%s: %s' % (excinfo[0].__name__, - excinfo[1]) - return errortext - -class RemoteError(Exception): - """ Exception containing a stringified error from the other side. """ - def __init__(self, formatted): - self.formatted = formatted - Exception.__init__(self) - - def __str__(self): - return self.formatted - - def __repr__(self): - return "%s: %s" %(self.__class__.__name__, self.formatted) - - def warn(self): - if self.formatted != INTERRUPT_TEXT: - # XXX do this better - sys.stderr.write("[%s] Warning: unhandled %r\n" - % (os.getpid(), self,)) - -class TimeoutError(IOError): - """ Exception indicating that a timeout was reached. """ - - -NO_ENDMARKER_WANTED = object() - -class Channel(object): - """Communication channel between two Python Interpreter execution points.""" - RemoteError = RemoteError - TimeoutError = TimeoutError - _INTERNALWAKEUP = 1000 - _executing = False - - def __init__(self, gateway, id): - assert isinstance(id, int) - self.gateway = gateway - #XXX: defaults copied from Unserializer - self._strconfig = getattr(gateway, '_strconfig', (True, False)) - self.id = id - self._items = self.gateway.execmodel.queue.Queue() - self._closed = False - self._receiveclosed = self.gateway.execmodel.Event() - self._remoteerrors = [] - - def _trace(self, *msg): - self.gateway._trace(self.id, *msg) - - def setcallback(self, callback, endmarker=NO_ENDMARKER_WANTED): - """ set a callback function for receiving items. - - All already queued items will immediately trigger the callback. - Afterwards the callback will execute in the receiver thread - for each received data item and calls to ``receive()`` will - raise an error. - If an endmarker is specified the callback will eventually - be called with the endmarker when the channel closes. - """ - _callbacks = self.gateway._channelfactory._callbacks - with self.gateway._receivelock: - if self._items is None: - raise IOError("%r has callback already registered" %(self,)) - items = self._items - self._items = None - while 1: - try: - olditem = items.get(block=False) - except self.gateway.execmodel.queue.Empty: - if not (self._closed or self._receiveclosed.isSet()): - _callbacks[self.id] = ( - callback, - endmarker, - self._strconfig, - ) - break - else: - if olditem is ENDMARKER: - items.put(olditem) # for other receivers - if endmarker is not NO_ENDMARKER_WANTED: - callback(endmarker) - break - else: - callback(olditem) - - def __repr__(self): - flag = self.isclosed() and "closed" or "open" - return "" % (self.id, flag) - - def __del__(self): - if self.gateway is None: # can be None in tests - return - self._trace("channel.__del__") - # no multithreading issues here, because we have the last ref to 'self' - if self._closed: - # state transition "closed" --> "deleted" - for error in self._remoteerrors: - error.warn() - elif self._receiveclosed.isSet(): - # state transition "sendonly" --> "deleted" - # the remote channel is already in "deleted" state, nothing to do - pass - else: - # state transition "opened" --> "deleted" - # check if we are in the middle of interpreter shutdown - # in which case the process will go away and we probably - # don't need to try to send a closing or last message - # (and often it won't work anymore to send things out) - if Message is not None: - if self._items is None: # has_callback - msgcode = Message.CHANNEL_LAST_MESSAGE - else: - msgcode = Message.CHANNEL_CLOSE - try: - self.gateway._send(msgcode, self.id) - except (IOError, ValueError): # ignore problems with sending - pass - - def _getremoteerror(self): - try: - return self._remoteerrors.pop(0) - except IndexError: - try: - return self.gateway._error - except AttributeError: - pass - return None - - # - # public API for channel objects - # - def isclosed(self): - """ return True if the channel is closed. A closed - channel may still hold items. - """ - return self._closed - - def makefile(self, mode='w', proxyclose=False): - """ return a file-like object. - mode can be 'w' or 'r' for writeable/readable files. - if proxyclose is true file.close() will also close the channel. - """ - if mode == "w": - return ChannelFileWrite(channel=self, proxyclose=proxyclose) - elif mode == "r": - return ChannelFileRead(channel=self, proxyclose=proxyclose) - raise ValueError("mode %r not availabe" %(mode,)) - - def close(self, error=None): - """ close down this channel with an optional error message. - Note that closing of a channel tied to remote_exec happens - automatically at the end of execution and cannot - be done explicitely. - """ - if self._executing: - raise IOError("cannot explicitly close channel within remote_exec") - if self._closed: - self.gateway._trace(self, "ignoring redundant call to close()") - if not self._closed: - # state transition "opened/sendonly" --> "closed" - # threads warning: the channel might be closed under our feet, - # but it's never damaging to send too many CHANNEL_CLOSE messages - # however, if the other side triggered a close already, we - # do not send back a closed message. - if not self._receiveclosed.isSet(): - put = self.gateway._send - if error is not None: - put(Message.CHANNEL_CLOSE_ERROR, self.id, - dumps_internal(error)) - else: - put(Message.CHANNEL_CLOSE, self.id) - self._trace("sent channel close message") - if isinstance(error, RemoteError): - self._remoteerrors.append(error) - self._closed = True # --> "closed" - self._receiveclosed.set() - queue = self._items - if queue is not None: - queue.put(ENDMARKER) - self.gateway._channelfactory._no_longer_opened(self.id) - - def waitclose(self, timeout=None): - """ wait until this channel is closed (or the remote side - otherwise signalled that no more data was being sent). - The channel may still hold receiveable items, but not receive - any more after waitclose() has returned. Exceptions from executing - code on the other side are reraised as local channel.RemoteErrors. - EOFError is raised if the reading-connection was prematurely closed, - which often indicates a dying process. - self.TimeoutError is raised after the specified number of seconds - (default is None, i.e. wait indefinitely). - """ - self._receiveclosed.wait(timeout=timeout) # wait for non-"opened" state - if not self._receiveclosed.isSet(): - raise self.TimeoutError("Timeout after %r seconds" % timeout) - error = self._getremoteerror() - if error: - raise error - - def send(self, item): - """sends the given item to the other side of the channel, - possibly blocking if the sender queue is full. - The item must be a simple python type and will be - copied to the other side by value. IOError is - raised if the write pipe was prematurely closed. - """ - if self.isclosed(): - raise IOError("cannot send to %r" %(self,)) - self.gateway._send(Message.CHANNEL_DATA, self.id, dumps_internal(item)) - - def receive(self, timeout=None): - """receive a data item that was sent from the other side. - timeout: None [default] blocked waiting. A positive number - indicates the number of seconds after which a channel.TimeoutError - exception will be raised if no item was received. - Note that exceptions from the remotely executing code will be - reraised as channel.RemoteError exceptions containing - a textual representation of the remote traceback. - """ - itemqueue = self._items - if itemqueue is None: - raise IOError("cannot receive(), channel has receiver callback") - try: - x = itemqueue.get(timeout=timeout) - except self.gateway.execmodel.queue.Empty: - raise self.TimeoutError("no item after %r seconds" %(timeout)) - if x is ENDMARKER: - itemqueue.put(x) # for other receivers - raise self._getremoteerror() or EOFError() - else: - return x - - def __iter__(self): - return self - - def next(self): - try: - return self.receive() - except EOFError: - raise StopIteration - __next__ = next - - - def reconfigure(self, py2str_as_py3str=True, py3str_as_py2str=False): - """ - set the string coercion for this channel - the default is to try to convert py2 str as py3 str, - but not to try and convert py3 str to py2 str - """ - self._strconfig = (py2str_as_py3str, py3str_as_py2str) - data = dumps_internal(self._strconfig) - self.gateway._send(Message.RECONFIGURE, self.id, data=data) - -ENDMARKER = object() -INTERRUPT_TEXT = "keyboard-interrupted" - -class ChannelFactory(object): - def __init__(self, gateway, startcount=1): - self._channels = weakref.WeakValueDictionary() - self._callbacks = {} - self._writelock = gateway.execmodel.Lock() - self.gateway = gateway - self.count = startcount - self.finished = False - self._list = list # needed during interp-shutdown - - def new(self, id=None): - """ create a new Channel with 'id' (or create new id if None). """ - with self._writelock: - if self.finished: - raise IOError("connexion already closed: %s" % (self.gateway,)) - if id is None: - id = self.count - self.count += 2 - try: - channel = self._channels[id] - except KeyError: - channel = self._channels[id] = Channel(self.gateway, id) - return channel - - def channels(self): - return self._list(self._channels.values()) - - # - # internal methods, called from the receiver thread - # - def _no_longer_opened(self, id): - try: - del self._channels[id] - except KeyError: - pass - try: - callback, endmarker, strconfig = self._callbacks.pop(id) - except KeyError: - pass - else: - if endmarker is not NO_ENDMARKER_WANTED: - callback(endmarker) - - def _local_close(self, id, remoteerror=None, sendonly=False): - channel = self._channels.get(id) - if channel is None: - # channel already in "deleted" state - if remoteerror: - remoteerror.warn() - self._no_longer_opened(id) - else: - # state transition to "closed" state - if remoteerror: - channel._remoteerrors.append(remoteerror) - queue = channel._items - if queue is not None: - queue.put(ENDMARKER) - self._no_longer_opened(id) - if not sendonly: # otherwise #--> "sendonly" - channel._closed = True # --> "closed" - channel._receiveclosed.set() - - def _local_receive(self, id, data): - # executes in receiver thread - channel = self._channels.get(id) - try: - callback, endmarker, strconfig = self._callbacks[id] - except KeyError: - queue = channel and channel._items - if queue is None: - pass # drop data - else: - item = loads_internal(data, channel) - queue.put(item) - else: - try: - data = loads_internal(data, channel, strconfig) - callback(data) # even if channel may be already closed - except Exception: - excinfo = sys.exc_info() - self.gateway._trace("exception during callback: %s" % - excinfo[1]) - errortext = self.gateway._geterrortext(excinfo) - self.gateway._send(Message.CHANNEL_CLOSE_ERROR, - id, dumps_internal(errortext)) - self._local_close(id, errortext) - - def _finished_receiving(self): - self.gateway._trace("finished receiving") - with self._writelock: - self.finished = True - for id in self._list(self._channels): - self._local_close(id, sendonly=True) - for id in self._list(self._callbacks): - self._no_longer_opened(id) - -class ChannelFile(object): - def __init__(self, channel, proxyclose=True): - self.channel = channel - self._proxyclose = proxyclose - - def isatty(self): - return False - - def close(self): - if self._proxyclose: - self.channel.close() - - def __repr__(self): - state = self.channel.isclosed() and 'closed' or 'open' - return '' %(self.channel.id, state) - -class ChannelFileWrite(ChannelFile): - def write(self, out): - self.channel.send(out) - - def flush(self): - pass - -class ChannelFileRead(ChannelFile): - def __init__(self, channel, proxyclose=True): - super(ChannelFileRead, self).__init__(channel, proxyclose) - self._buffer = None - - def read(self, n): - try: - if self._buffer is None: - self._buffer = self.channel.receive() - while len(self._buffer) < n: - self._buffer += self.channel.receive() - except EOFError: - self.close() - if self._buffer is None: - ret = "" - else: - ret = self._buffer[:n] - self._buffer = self._buffer[n:] - return ret - - def readline(self): - if self._buffer is not None: - i = self._buffer.find("\n") - if i != -1: - return self.read(i+1) - line = self.read(len(self._buffer)+1) - else: - line = self.read(1) - while line and line[-1] != "\n": - c = self.read(1) - if not c: - break - line += c - return line - -class BaseGateway(object): - exc_info = sys.exc_info - _sysex = sysex - id = "" - - def __init__(self, io, id, _startcount=2): - self.execmodel = io.execmodel - self._io = io - self.id = id - self._strconfig = (Unserializer.py2str_as_py3str, - Unserializer.py3str_as_py2str) - self._channelfactory = ChannelFactory(self, _startcount) - self._receivelock = self.execmodel.RLock() - # globals may be NONE at process-termination - self.__trace = trace - self._geterrortext = geterrortext - self._receivepool = self.execmodel.WorkerPool(1) - - def _trace(self, *msg): - self.__trace(self.id, *msg) - - def _initreceive(self): - self._receiverthread = self._receivepool.spawn(self._thread_receiver) - - def _thread_receiver(self): - def log(*msg): - self._trace("[receiver-thread]", *msg) - - log("RECEIVERTHREAD: starting to run") - io = self._io - try: - try: - while 1: - msg = Message.from_io(io) - log("received", msg) - with self._receivelock: - msg.received(self) - del msg - except (KeyboardInterrupt, GatewayReceivedTerminate): - pass - except EOFError: - log("EOF without prior gateway termination message") - self._error = self.exc_info()[1] - except Exception: - log(self._geterrortext(self.exc_info())) - finally: - try: - log('entering finalization') - # wake up and terminate any execution waiting to receive - self._channelfactory._finished_receiving() - log('terminating execution') - self._terminate_execution() - log('closing read') - self._io.close_read() - log('closing write') - self._io.close_write() - log('leaving finalization') - except: # be silent at shutdown - pass - - def _terminate_execution(self): - pass - - def _send(self, msgcode, channelid=0, data=bytes()): - message = Message(msgcode, channelid, data) - try: - message.to_io(self._io) - self._trace('sent', message) - except (IOError, ValueError): - e = sys.exc_info()[1] - self._trace('failed to send', message, e) - # ValueError might be because the IO is already closed - raise IOError("cannot send (already closed?)") - - def _local_schedulexec(self, channel, sourcetask): - channel.close("execution disallowed") - - # _____________________________________________________________________ - # - # High Level Interface - # _____________________________________________________________________ - # - def newchannel(self): - """ return a new independent channel. """ - return self._channelfactory.new() - - def join(self, timeout=None): - """ Wait for receiverthread to terminate. """ - self._trace("waiting for receiver thread to finish") - self._receiverthread.waitfinish() - -class SlaveGateway(BaseGateway): - - def _local_schedulexec(self, channel, sourcetask): - sourcetask = loads_internal(sourcetask) - self._execpool.spawn(self.executetask, ((channel, sourcetask))) - - def _terminate_execution(self): - # called from receiverthread - self._trace("shutting down execution pool") - self._execpool.shutdown() - if not self._execpool.waitall(5.0): - self._trace("execution ongoing after 5 secs, trying interrupt_main") - # We try hard to terminate execution based on the assumption - # that there is only one gateway object running per-process. - if sys.platform != "win32": - self._trace("sending ourselves a SIGINT") - os.kill(os.getpid(), 2) # send ourselves a SIGINT - elif interrupt_main is not None: - self._trace("calling interrupt_main()") - interrupt_main() - if not self._execpool.waitall(10.0): - self._trace("execution did not finish in 10 secs, " - "calling os._exit()") - os._exit(1) - - def serve(self): - trace = lambda msg: self._trace("[serve] " + msg) - hasprimary = self.execmodel.backend == "thread" - self._execpool = self.execmodel.WorkerPool(hasprimary=hasprimary) - trace("spawning receiver thread") - self._initreceive() - try: - try: - if hasprimary: - trace("integrating as main primary exec thread") - self._execpool.integrate_as_primary_thread() - else: - trace("waiting for execution to finish") - self._execpool.wait_for_shutdown() - finally: - trace("execution finished") - - trace("joining receiver thread") - self.join() - except KeyboardInterrupt: - # in the slave we can't really do anything sensible - trace("swallowing keyboardinterrupt, serve finished") - - def executetask(self, item): - try: - channel, (source, call_name, kwargs) = item - if not ISPY3 and kwargs: - # some python2 versions do not accept unicode keyword params - # note: Unserializer generally turns py2-str to py3-str objects - newkwargs = {} - for name, value in kwargs.items(): - if isinstance(name, unicode): - name = name.encode('ascii') - newkwargs[name] = value - kwargs = newkwargs - loc = {'channel' : channel, '__name__': '__channelexec__'} - self._trace("execution starts[%s]: %s" % - (channel.id, repr(source)[:50])) - channel._executing = True - try: - co = compile(source+'\n', '', 'exec') - do_exec(co, loc) # noqa - if call_name: - self._trace('calling %s(**%60r)' % (call_name, kwargs)) - function = loc[call_name] - function(channel, **kwargs) - finally: - channel._executing = False - self._trace("execution finished") - except KeyboardInterrupt: - channel.close(INTERRUPT_TEXT) - raise - except: - excinfo = self.exc_info() - if not isinstance(excinfo[1], EOFError): - if not channel.gateway._channelfactory.finished: - self._trace("got exception: %r" % (excinfo[1],)) - errortext = self._geterrortext(excinfo) - channel.close(errortext) - return - self._trace("ignoring EOFError because receiving finished") - channel.close() - -# -# Cross-Python pickling code, tested from test_serializer.py -# - -class DataFormatError(Exception): - pass - -class DumpError(DataFormatError): - """Error while serializing an object.""" - -class LoadError(DataFormatError): - """Error while unserializing an object.""" - -if ISPY3: - def bchr(n): - return bytes([n]) -else: - bchr = chr - -DUMPFORMAT_VERSION = bchr(1) - -FOUR_BYTE_INT_MAX = 2147483647 - -FLOAT_FORMAT = "!d" -FLOAT_FORMAT_SIZE = struct.calcsize(FLOAT_FORMAT) - -class _Stop(Exception): - pass - -class Unserializer(object): - num2func = {} # is filled after this class definition - py2str_as_py3str = True # True - py3str_as_py2str = False # false means py2 will get unicode - - def __init__(self, stream, channel_or_gateway=None, strconfig=None): - gateway = getattr(channel_or_gateway, 'gateway', channel_or_gateway) - strconfig = getattr(channel_or_gateway, '_strconfig', strconfig) - if strconfig: - self.py2str_as_py3str, self.py3str_as_py2str = strconfig - self.stream = stream - self.channelfactory = getattr(gateway, '_channelfactory', gateway) - - def load(self, versioned=False): - if versioned: - ver = self.stream.read(1) - if ver != DUMPFORMAT_VERSION: - raise LoadError("wrong dumpformat version") - self.stack = [] - try: - while True: - opcode = self.stream.read(1) - if not opcode: - raise EOFError - try: - loader = self.num2func[opcode] - except KeyError: - raise LoadError("unkown opcode %r - " - "wire protocol corruption?" % (opcode,)) - loader(self) - except _Stop: - if len(self.stack) != 1: - raise LoadError("internal unserialization error") - return self.stack.pop(0) - else: - raise LoadError("didn't get STOP") - - def load_none(self): - self.stack.append(None) - - def load_true(self): - self.stack.append(True) - - def load_false(self): - self.stack.append(False) - - def load_int(self): - i = self._read_int4() - self.stack.append(i) - - def load_longint(self): - s = self._read_byte_string() - self.stack.append(int(s)) - - if ISPY3: - load_long = load_int - load_longlong = load_longint - else: - def load_long(self): - i = self._read_int4() - self.stack.append(long(i)) - - def load_longlong(self): - l = self._read_byte_string() - self.stack.append(long(l)) - - def load_float(self): - binary = self.stream.read(FLOAT_FORMAT_SIZE) - self.stack.append(struct.unpack(FLOAT_FORMAT, binary)[0]) - - def _read_int4(self): - return struct.unpack("!i", self.stream.read(4))[0] - - def _read_byte_string(self): - length = self._read_int4() - as_bytes = self.stream.read(length) - return as_bytes - - def load_py3string(self): - as_bytes = self._read_byte_string() - if not ISPY3 and self.py3str_as_py2str: - # XXX Should we try to decode into latin-1? - self.stack.append(as_bytes) - else: - self.stack.append(as_bytes.decode("utf-8")) - - def load_py2string(self): - as_bytes = self._read_byte_string() - if ISPY3 and self.py2str_as_py3str: - s = as_bytes.decode("latin-1") - else: - s = as_bytes - self.stack.append(s) - - def load_bytes(self): - s = self._read_byte_string() - self.stack.append(s) - - def load_unicode(self): - self.stack.append(self._read_byte_string().decode("utf-8")) - - def load_newlist(self): - length = self._read_int4() - self.stack.append([None] * length) - - def load_setitem(self): - if len(self.stack) < 3: - raise LoadError("not enough items for setitem") - value = self.stack.pop() - key = self.stack.pop() - self.stack[-1][key] = value - - def load_newdict(self): - self.stack.append({}) - - def _load_collection(self, type_): - length = self._read_int4() - if length: - res = type_(self.stack[-length:]) - del self.stack[-length:] - self.stack.append(res) - else: - self.stack.append(type_()) - - def load_buildtuple(self): - self._load_collection(tuple) - - def load_set(self): - self._load_collection(set) - - def load_frozenset(self): - self._load_collection(frozenset) - - def load_stop(self): - raise _Stop - - def load_channel(self): - id = self._read_int4() - newchannel = self.channelfactory.new(id) - self.stack.append(newchannel) - -# automatically build opcodes and byte-encoding - -class opcode: - """ container for name -> num mappings. """ - -def _buildopcodes(): - l = [] - for name, func in Unserializer.__dict__.items(): - if name.startswith("load_"): - opname = name[5:].upper() - l.append((opname, func)) - l.sort() - for i,(opname, func) in enumerate(l): - assert i < 26, "xxx" - i = bchr(64+i) - Unserializer.num2func[i] = func - setattr(opcode, opname, i) - -_buildopcodes() - -def dumps(obj): - """ return a serialized bytestring of the given obj. - - The obj and all contained objects must be of a builtin - python type (so nested dicts, sets, etc. are all ok but - not user-level instances). - """ - return _Serializer().save(obj, versioned=True) - -def dump(byteio, obj): - """ write a serialized bytestring of the given obj to the given stream. """ - _Serializer(write=byteio.write).save(obj, versioned=True) - -def loads(bytestring, py2str_as_py3str=False, py3str_as_py2str=False): - """ return the object as deserialized from the given bytestring. - - py2str_as_py3str: if true then string (str) objects previously - dumped on Python2 will be loaded as Python3 - strings which really are text objects. - py3str_as_py2str: if true then string (str) objects previously - dumped on Python3 will be loaded as Python2 - strings instead of unicode objects. - - if the bytestring was dumped with an incompatible protocol - version or if the bytestring is corrupted, the - ``execnet.DataFormatError`` will be raised. - """ - io = BytesIO(bytestring) - return load(io, py2str_as_py3str=py2str_as_py3str, - py3str_as_py2str=py3str_as_py2str) - -def load(io, py2str_as_py3str=False, py3str_as_py2str=False): - """ derserialize an object form the specified stream. - - Behaviour and parameters are otherwise the same as with ``loads`` - """ - strconfig=(py2str_as_py3str, py3str_as_py2str) - return Unserializer(io, strconfig=strconfig).load(versioned=True) - -def loads_internal(bytestring, channelfactory=None, strconfig=None): - io = BytesIO(bytestring) - return Unserializer(io, channelfactory, strconfig).load() - -def dumps_internal(obj): - return _Serializer().save(obj) - - -class _Serializer(object): - _dispatch = {} - - def __init__(self, write=None): - if write is None: - self._streamlist = [] - write = self._streamlist.append - self._write = write - - def save(self, obj, versioned=False): - # calling here is not re-entrant but multiple instances - # may write to the same stream because of the common platform - # atomic-write guaruantee (concurrent writes each happen atomicly) - if versioned: - self._write(DUMPFORMAT_VERSION) - self._save(obj) - self._write(opcode.STOP) - try: - streamlist = self._streamlist - except AttributeError: - return None - return type(streamlist[0])().join(streamlist) - - def _save(self, obj): - tp = type(obj) - try: - dispatch = self._dispatch[tp] - except KeyError: - methodname = 'save_' + tp.__name__ - meth = getattr(self.__class__, methodname, None) - if meth is None: - raise DumpError("can't serialize %s" % (tp,)) - dispatch = self._dispatch[tp] = meth - dispatch(self, obj) - - def save_NoneType(self, non): - self._write(opcode.NONE) - - def save_bool(self, boolean): - if boolean: - self._write(opcode.TRUE) - else: - self._write(opcode.FALSE) - - def save_bytes(self, bytes_): - self._write(opcode.BYTES) - self._write_byte_sequence(bytes_) - - if ISPY3: - def save_str(self, s): - self._write(opcode.PY3STRING) - self._write_unicode_string(s) - else: - def save_str(self, s): - self._write(opcode.PY2STRING) - self._write_byte_sequence(s) - - def save_unicode(self, s): - self._write(opcode.UNICODE) - self._write_unicode_string(s) - - def _write_unicode_string(self, s): - try: - as_bytes = s.encode("utf-8") - except UnicodeEncodeError: - raise DumpError("strings must be utf-8 encodable") - self._write_byte_sequence(as_bytes) - - def _write_byte_sequence(self, bytes_): - self._write_int4(len(bytes_), "string is too long") - self._write(bytes_) - - def _save_integral(self, i, short_op, long_op): - if i <= FOUR_BYTE_INT_MAX: - self._write(short_op) - self._write_int4(i) - else: - self._write(long_op) - self._write_byte_sequence(str(i).rstrip("L").encode("ascii")) - - def save_int(self, i): - self._save_integral(i, opcode.INT, opcode.LONGINT) - - def save_long(self, l): - self._save_integral(l, opcode.LONG, opcode.LONGLONG) - - def save_float(self, flt): - self._write(opcode.FLOAT) - self._write(struct.pack(FLOAT_FORMAT, flt)) - - def _write_int4(self, i, error="int must be less than %i" % - (FOUR_BYTE_INT_MAX,)): - if i > FOUR_BYTE_INT_MAX: - raise DumpError(error) - self._write(struct.pack("!i", i)) - - def save_list(self, L): - self._write(opcode.NEWLIST) - self._write_int4(len(L), "list is too long") - for i, item in enumerate(L): - self._write_setitem(i, item) - - def _write_setitem(self, key, value): - self._save(key) - self._save(value) - self._write(opcode.SETITEM) - - def save_dict(self, d): - self._write(opcode.NEWDICT) - for key, value in d.items(): - self._write_setitem(key, value) - - def save_tuple(self, tup): - for item in tup: - self._save(item) - self._write(opcode.BUILDTUPLE) - self._write_int4(len(tup), "tuple is too long") - - def _write_set(self, s, op): - for item in s: - self._save(item) - self._write(op) - self._write_int4(len(s), "set is too long") - - def save_set(self, s): - self._write_set(s, opcode.SET) - - def save_frozenset(self, s): - self._write_set(s, opcode.FROZENSET) - - def save_Channel(self, channel): - self._write(opcode.CHANNEL) - self._write_int4(channel.id) - -def init_popen_io(execmodel): - if not hasattr(os, 'dup'): # jython - io = Popen2IO(sys.stdout, sys.stdin, execmodel) - import tempfile - sys.stdin = tempfile.TemporaryFile('r') - sys.stdout = tempfile.TemporaryFile('w') - else: - try: - devnull = os.devnull - except AttributeError: - if os.name == 'nt': - devnull = 'NUL' - else: - devnull = '/dev/null' - # stdin - stdin = execmodel.fdopen(os.dup(0), 'r', 1) - fd = os.open(devnull, os.O_RDONLY) - os.dup2(fd, 0) - os.close(fd) - - # stdout - stdout = execmodel.fdopen(os.dup(1), 'w', 1) - fd = os.open(devnull, os.O_WRONLY) - os.dup2(fd, 1) - - # stderr for win32 - if os.name == 'nt': - sys.stderr = execmodel.fdopen(os.dup(2), 'w', 1) - os.dup2(fd, 2) - os.close(fd) - io = Popen2IO(stdout, stdin, execmodel) - sys.stdin = execmodel.fdopen(0, 'r', 1) - sys.stdout = execmodel.fdopen(1, 'w', 1) - return io - -def serve(io, id): - trace("creating slavegateway on %r" %(io,)) - SlaveGateway(io=io, id=id, _startcount=2).serve() diff --git a/remoto/lib/execnet/gateway_bootstrap.py b/remoto/lib/execnet/gateway_bootstrap.py deleted file mode 100644 index abd084a..0000000 --- a/remoto/lib/execnet/gateway_bootstrap.py +++ /dev/null @@ -1,97 +0,0 @@ -""" -code to initialize the remote side of a gateway once the io is created -""" -import os -import inspect -import execnet -from execnet import gateway_base -from execnet.gateway import Gateway -importdir = os.path.dirname(os.path.dirname(execnet.__file__)) - - -class HostNotFound(Exception): - pass - - -def bootstrap_popen(io, spec): - sendexec(io, - "import sys", - "sys.path.insert(0, %r)" % importdir, - "from execnet.gateway_base import serve, init_popen_io, get_execmodel", - "sys.stdout.write('1')", - "sys.stdout.flush()", - "execmodel = get_execmodel(%r)" % spec.execmodel, - "serve(init_popen_io(execmodel), id='%s-slave')" % spec.id, - ) - s = io.read(1) - assert s == "1".encode('ascii'), repr(s) - - -def bootstrap_ssh(io, spec): - try: - sendexec(io, - inspect.getsource(gateway_base), - "execmodel = get_execmodel(%r)" % spec.execmodel, - 'io = init_popen_io(execmodel)', - "io.write('1'.encode('ascii'))", - "serve(io, id='%s-slave')" % spec.id, - ) - s = io.read(1) - assert s == "1".encode('ascii') - except EOFError: - ret = io.wait() - if ret == 255: - raise HostNotFound(io.remoteaddress) - - -def bootstrap_socket(io, id): - #XXX: switch to spec - from execnet.gateway_socket import SocketIO - - sendexec(io, - inspect.getsource(gateway_base), - 'import socket', - inspect.getsource(SocketIO), - "try: execmodel", - "except NameError:", - " execmodel = get_execmodel('thread')", - "io = SocketIO(clientsock, execmodel)", - "io.write('1'.encode('ascii'))", - "serve(io, id='%s-slave')" % id, - ) - s = io.read(1) - assert s == "1".encode('ascii') - - -def sendexec(io, *sources): - source = "\n".join(sources) - io.write((repr(source)+ "\n").encode('ascii')) - - -def fix_pid_for_jython_popen(gw): - """ - fix for jython 2.5.1 - """ - spec, io = gw.spec, gw._io - if spec.popen and not spec.via: - #XXX: handle the case of remote being jython - # and not having the popen pid - if io.popen.pid is None: - io.popen.pid = gw.remote_exec( - "import os; channel.send(os.getpid())").receive() - - -def bootstrap(io, spec): - if spec.popen: - bootstrap_popen(io, spec) - elif spec.ssh: - bootstrap_ssh(io, spec) - elif spec.socket: - bootstrap_socket(io, spec) - else: - raise ValueError('unknown gateway type, cant bootstrap') - gw = Gateway(io, spec) - fix_pid_for_jython_popen(gw) - return gw - - diff --git a/remoto/lib/execnet/gateway_io.py b/remoto/lib/execnet/gateway_io.py deleted file mode 100644 index cc64d84..0000000 --- a/remoto/lib/execnet/gateway_io.py +++ /dev/null @@ -1,210 +0,0 @@ -""" -execnet io initialization code - -creates io instances used for gateway io -""" -import os -import sys - -try: - from execnet.gateway_base import Popen2IO, Message -except ImportError: - from __main__ import Popen2IO, Message - -class Popen2IOMaster(Popen2IO): - def __init__(self, args, execmodel): - self.popen = p = execmodel.PopenPiped(args) - Popen2IO.__init__(self, p.stdin, p.stdout, execmodel=execmodel) - - def wait(self): - try: - return self.popen.wait() - except OSError: - pass # subprocess probably dead already - - def kill(self): - killpopen(self.popen) - -def killpopen(popen): - try: - if hasattr(popen, 'kill'): - popen.kill() - else: - killpid(popen.pid) - except EnvironmentError: - sys.stderr.write("ERROR killing: %s\n" %(sys.exc_info()[1])) - sys.stderr.flush() - -def killpid(pid): - if hasattr(os, 'kill'): - os.kill(pid, 15) - elif sys.platform == "win32" or getattr(os, '_name', None) == 'nt': - try: - import ctypes - except ImportError: - import subprocess - # T: treekill, F: Force - cmd = ("taskkill /T /F /PID %d" %(pid)).split() - ret = subprocess.call(cmd) - if ret != 0: - raise EnvironmentError("taskkill returned %r" %(ret,)) - else: - PROCESS_TERMINATE = 1 - handle = ctypes.windll.kernel32.OpenProcess( - PROCESS_TERMINATE, False, pid) - ctypes.windll.kernel32.TerminateProcess(handle, -1) - ctypes.windll.kernel32.CloseHandle(handle) - else: - raise EnvironmentError("no method to kill %s" %(pid,)) - - - -popen_bootstrapline = "import sys;exec(eval(sys.stdin.readline()))" - - -def popen_args(spec): - python = spec.python or sys.executable - args = str(python).split(' ') - args.append('-u') - if spec is not None and spec.dont_write_bytecode: - args.append("-B") - # Slight gymnastics in ordering these arguments because CPython (as of - # 2.7.1) ignores -B if you provide `python -c "something" -B` - args.extend(['-c', popen_bootstrapline]) - return args - - -def ssh_args(spec): - remotepython = spec.python or "python" - args = ["ssh", "-C" ] - if spec.ssh_config is not None: - args.extend(['-F', str(spec.ssh_config)]) - - args.extend(spec.ssh.split()) - remotecmd = '%s -c "%s"' % (remotepython, popen_bootstrapline) - args.append(remotecmd) - return args - -def create_io(spec, execmodel): - if spec.popen: - args = popen_args(spec) - return Popen2IOMaster(args, execmodel) - if spec.ssh: - args = ssh_args(spec) - io = Popen2IOMaster(args, execmodel) - io.remoteaddress = spec.ssh - return io - -# -# Proxy Gateway handling code -# -# master: proxy initiator -# forwarder: forwards between master and sub -# sub: sub process that is proxied to the initiator - -RIO_KILL = 1 -RIO_WAIT = 2 -RIO_REMOTEADDRESS = 3 -RIO_CLOSE_WRITE = 4 - -class ProxyIO(object): - """ A Proxy IO object allows to instantiate a Gateway - through another "via" gateway. A master:ProxyIO object - provides an IO object effectively connected to the sub - via the forwarder. To achieve this, master:ProxyIO interacts - with forwarder:serve_proxy_io() which itself - instantiates and interacts with the sub. - """ - def __init__(self, proxy_channel, execmodel): - # after exchanging the control channel we use proxy_channel - # for messaging IO - self.controlchan = proxy_channel.gateway.newchannel() - proxy_channel.send(self.controlchan) - self.iochan = proxy_channel - self.iochan_file = self.iochan.makefile('r') - self.execmodel = execmodel - - def read(self, nbytes): - return self.iochan_file.read(nbytes) - - def write(self, data): - return self.iochan.send(data) - - def _controll(self, event): - self.controlchan.send(event) - return self.controlchan.receive() - - def close_write(self): - self._controll(RIO_CLOSE_WRITE) - - def kill(self): - self._controll(RIO_KILL) - - def wait(self): - return self._controll(RIO_WAIT) - - @property - def remoteaddress(self): - return self._controll(RIO_REMOTEADDRESS) - - def __repr__(self): - return '' % (self.iochan.gateway.id, ) - -class PseudoSpec: - def __init__(self, vars): - self.__dict__.update(vars) - def __getattr__(self, name): - return None - -def serve_proxy_io(proxy_channelX): - execmodel = proxy_channelX.gateway.execmodel - _trace = proxy_channelX.gateway._trace - tag = "serve_proxy_io:%s " % proxy_channelX.id - def log(*msg): - _trace(tag + msg[0], *msg[1:]) - spec = PseudoSpec(proxy_channelX.receive()) - # create sub IO object which we will proxy back to our proxy initiator - sub_io = create_io(spec, execmodel) - control_chan = proxy_channelX.receive() - log("got control chan", control_chan) - - # read data from master, forward it to the sub - # XXX writing might block, thus blocking the receiver thread - def forward_to_sub(data): - log("forward data to sub, size %s" % len(data)) - sub_io.write(data) - proxy_channelX.setcallback(forward_to_sub) - - def controll(data): - if data==RIO_WAIT: - control_chan.send(sub_io.wait()) - elif data==RIO_KILL: - control_chan.send(sub_io.kill()) - elif data==RIO_REMOTEADDRESS: - control_chan.send(sub_io.remoteaddress) - elif data==RIO_CLOSE_WRITE: - control_chan.send(sub_io.close_write()) - control_chan.setcallback(controll) - - # write data to the master coming from the sub - forward_to_master_file = proxy_channelX.makefile("w") - - # read bootstrap byte from sub, send it on to master - log('reading bootstrap byte from sub', spec.id) - initial = sub_io.read(1) - assert initial == '1'.encode('ascii'), initial - log('forwarding bootstrap byte from sub', spec.id) - forward_to_master_file.write(initial) - - # enter message forwarding loop - while True: - try: - message = Message.from_io(sub_io) - except EOFError: - log('EOF from sub, terminating proxying loop', spec.id) - break - message.to_io(forward_to_master_file) - # proxy_channelX will be closed from remote_exec's finalization code - -if __name__ == "__channelexec__": - serve_proxy_io(channel) # noqa diff --git a/remoto/lib/execnet/gateway_socket.py b/remoto/lib/execnet/gateway_socket.py deleted file mode 100644 index 3bb0589..0000000 --- a/remoto/lib/execnet/gateway_socket.py +++ /dev/null @@ -1,89 +0,0 @@ -from execnet.gateway_bootstrap import HostNotFound -import sys - -try: bytes -except NameError: bytes = str - -class SocketIO: - def __init__(self, sock, execmodel): - self.sock = sock - self.execmodel = execmodel - socket = execmodel.socket - try: - sock.setsockopt(socket.SOL_IP, socket.IP_TOS, 0x10)# IPTOS_LOWDELAY - sock.setsockopt(socket.SOL_TCP, socket.TCP_NODELAY, 1) - except (AttributeError, socket.error): - sys.stderr.write("WARNING: cannot set socketoption") - - def read(self, numbytes): - "Read exactly 'bytes' bytes from the socket." - buf = bytes() - while len(buf) < numbytes: - t = self.sock.recv(numbytes - len(buf)) - if not t: - raise EOFError - buf += t - return buf - - def write(self, data): - self.sock.sendall(data) - - def close_read(self): - try: - self.sock.shutdown(0) - except self.execmodel.socket.error: - pass - def close_write(self): - try: - self.sock.shutdown(1) - except self.execmodel.socket.error: - pass - - def wait(self): - pass - - def kill(self): - pass - - -def start_via(gateway, hostport=None): - """ return a host, port tuple, - after instanciating a socketserver on the given gateway - """ - if hostport is None: - host, port = ('localhost', 0) - else: - host, port = hostport - - from execnet.script import socketserver - - # execute the above socketserverbootstrap on the other side - channel = gateway.remote_exec(socketserver) - channel.send((host, port)) - (realhost, realport) = channel.receive() - #self._trace("new_remote received" - # "port=%r, hostname = %r" %(realport, hostname)) - if not realhost or realhost=="0.0.0.0": - realhost = "localhost" - return realhost, realport - - -def create_io(spec, group, execmodel): - assert not spec.python, ( - "socket: specifying python executables not yet supported") - gateway_id = spec.installvia - if gateway_id: - host, port = start_via(group[gateway_id]) - else: - host, port = spec.socket.split(":") - port = int(port) - - socket = execmodel.socket - sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - io = SocketIO(sock, execmodel) - io.remoteaddress = '%s:%d' % (host, port) - try: - sock.connect((host, port)) - except execmodel.socket.gaierror: - raise HostNotFound(str(sys.exc_info()[1])) - return io diff --git a/remoto/lib/execnet/multi.py b/remoto/lib/execnet/multi.py deleted file mode 100644 index 1343f3c..0000000 --- a/remoto/lib/execnet/multi.py +++ /dev/null @@ -1,299 +0,0 @@ -""" -Managing Gateway Groups and interactions with multiple channels. - -(c) 2008-2014, Holger Krekel and others -""" - -import sys, atexit - -from execnet import XSpec -from execnet import gateway_io, gateway_bootstrap -from execnet.gateway_base import reraise, trace, get_execmodel -from threading import Lock - -NO_ENDMARKER_WANTED = object() - -class Group(object): - """ Gateway Groups. """ - defaultspec = "popen" - def __init__(self, xspecs=(), execmodel="thread"): - """ initialize group and make gateways as specified. - execmodel can be 'thread' or 'eventlet'. - """ - self._gateways = [] - self._autoidcounter = 0 - self._autoidlock = Lock() - self._gateways_to_join = [] - # we use the same execmodel for all of the Gateway objects - # we spawn on our side. Probably we should not allow different - # execmodels between different groups but not clear. - # Note that "other side" execmodels may differ and is typically - # specified by the spec passed to makegateway. - self.set_execmodel(execmodel) - for xspec in xspecs: - self.makegateway(xspec) - atexit.register(self._cleanup_atexit) - - @property - def execmodel(self): - return self._execmodel - - @property - def remote_execmodel(self): - return self._remote_execmodel - - def set_execmodel(self, execmodel, remote_execmodel=None): - """ Set the execution model for local and remote site. - - execmodel can be one of "thread" or "eventlet" (XXX gevent). - It determines the execution model for any newly created gateway. - If remote_execmodel is not specified it takes on the value - of execmodel. - - NOTE: Execution models can only be set before any gateway is created. - - """ - if self._gateways: - raise ValueError("can not set execution models if " - "gateways have been created already") - if remote_execmodel is None: - remote_execmodel = execmodel - self._execmodel = get_execmodel(execmodel) - self._remote_execmodel = get_execmodel(remote_execmodel) - - def __repr__(self): - idgateways = [gw.id for gw in self] - return "" %(idgateways) - - def __getitem__(self, key): - if isinstance(key, int): - return self._gateways[key] - for gw in self._gateways: - if gw == key or gw.id == key: - return gw - raise KeyError(key) - - def __contains__(self, key): - try: - self[key] - return True - except KeyError: - return False - - def __len__(self): - return len(self._gateways) - - def __iter__(self): - return iter(list(self._gateways)) - - def makegateway(self, spec=None): - """create and configure a gateway to a Python interpreter. - The ``spec`` string encodes the target gateway type - and configuration information. The general format is:: - - key1=value1//key2=value2//... - - If you leave out the ``=value`` part a True value is assumed. - Valid types: ``popen``, ``ssh=hostname``, ``socket=host:port``. - Valid configuration:: - - id= specifies the gateway id - python= specifies which python interpreter to execute - execmodel=model 'thread', 'eventlet', 'gevent' model for execution - chdir= specifies to which directory to change - nice= specifies process priority of new process - env:NAME=value specifies a remote environment variable setting. - - If no spec is given, self.defaultspec is used. - """ - if not spec: - spec = self.defaultspec - if not isinstance(spec, XSpec): - spec = XSpec(spec) - self.allocate_id(spec) - if spec.execmodel is None: - spec.execmodel = self.remote_execmodel.backend - if spec.via: - assert not spec.socket - master = self[spec.via] - proxy_channel = master.remote_exec(gateway_io) - proxy_channel.send(vars(spec)) - proxy_io_master = gateway_io.ProxyIO(proxy_channel, self.execmodel) - gw = gateway_bootstrap.bootstrap(proxy_io_master, spec) - elif spec.popen or spec.ssh: - io = gateway_io.create_io(spec, execmodel=self.execmodel) - gw = gateway_bootstrap.bootstrap(io, spec) - elif spec.socket: - from execnet import gateway_socket - io = gateway_socket.create_io(spec, self, execmodel=self.execmodel) - gw = gateway_bootstrap.bootstrap(io, spec) - else: - raise ValueError("no gateway type found for %r" % (spec._spec,)) - gw.spec = spec - self._register(gw) - if spec.chdir or spec.nice or spec.env: - channel = gw.remote_exec(""" - import os - path, nice, env = channel.receive() - if path: - if not os.path.exists(path): - os.mkdir(path) - os.chdir(path) - if nice and hasattr(os, 'nice'): - os.nice(nice) - if env: - for name, value in env.items(): - os.environ[name] = value - """) - nice = spec.nice and int(spec.nice) or 0 - channel.send((spec.chdir, nice, spec.env)) - channel.waitclose() - return gw - - def allocate_id(self, spec): - """ (re-entrant) allocate id for the given xspec object. """ - if spec.id is None: - with self._autoidlock: - id = "gw" + str(self._autoidcounter) - self._autoidcounter += 1 - if id in self: - raise ValueError("already have gateway with id %r" %(id,)) - spec.id = id - - def _register(self, gateway): - assert not hasattr(gateway, '_group') - assert gateway.id - assert id not in self - self._gateways.append(gateway) - gateway._group = self - - def _unregister(self, gateway): - self._gateways.remove(gateway) - self._gateways_to_join.append(gateway) - - def _cleanup_atexit(self): - trace("=== atexit cleanup %r ===" %(self,)) - self.terminate(timeout=1.0) - - def terminate(self, timeout=None): - """ trigger exit of member gateways and wait for termination - of member gateways and associated subprocesses. After waiting - timeout seconds try to to kill local sub processes of popen- - and ssh-gateways. Timeout defaults to None meaning - open-ended waiting and no kill attempts. - """ - - while self: - vias = {} - for gw in self: - if gw.spec.via: - vias[gw.spec.via] = True - for gw in self: - if gw.id not in vias: - gw.exit() - - def join_wait(gw): - gw.join() - gw._io.wait() - def kill(gw): - trace("Gateways did not come down after timeout: %r" % gw) - gw._io.kill() - - safe_terminate(self.execmodel, timeout, [ - (lambda: join_wait(gw), lambda: kill(gw)) - for gw in self._gateways_to_join]) - self._gateways_to_join[:] = [] - - def remote_exec(self, source, **kwargs): - """ remote_exec source on all member gateways and return - MultiChannel connecting to all sub processes. - """ - channels = [] - for gw in self: - channels.append(gw.remote_exec(source, **kwargs)) - return MultiChannel(channels) - -class MultiChannel: - def __init__(self, channels): - self._channels = channels - - def __len__(self): - return len(self._channels) - - def __iter__(self): - return iter(self._channels) - - def __getitem__(self, key): - return self._channels[key] - - def __contains__(self, chan): - return chan in self._channels - - def send_each(self, item): - for ch in self._channels: - ch.send(item) - - def receive_each(self, withchannel=False): - assert not hasattr(self, '_queue') - l = [] - for ch in self._channels: - obj = ch.receive() - if withchannel: - l.append((ch, obj)) - else: - l.append(obj) - return l - - def make_receive_queue(self, endmarker=NO_ENDMARKER_WANTED): - try: - return self._queue - except AttributeError: - self._queue = None - for ch in self._channels: - if self._queue is None: - self._queue = ch.gateway.execmodel.queue.Queue() - def putreceived(obj, channel=ch): - self._queue.put((channel, obj)) - if endmarker is NO_ENDMARKER_WANTED: - ch.setcallback(putreceived) - else: - ch.setcallback(putreceived, endmarker=endmarker) - return self._queue - - - def waitclose(self): - first = None - for ch in self._channels: - try: - ch.waitclose() - except ch.RemoteError: - if first is None: - first = sys.exc_info() - if first: - reraise(*first) - - - -def safe_terminate(execmodel, timeout, list_of_paired_functions): - workerpool = execmodel.WorkerPool() - - def termkill(termfunc, killfunc): - termreply = workerpool.spawn(termfunc) - try: - termreply.get(timeout=timeout) - except IOError: - killfunc() - - replylist = [] - for termfunc, killfunc in list_of_paired_functions: - reply = workerpool.spawn(termkill, termfunc, killfunc) - replylist.append(reply) - for reply in replylist: - reply.get() - workerpool.waitall() - - -default_group = Group() -makegateway = default_group.makegateway -set_execmodel = default_group.set_execmodel - diff --git a/remoto/lib/execnet/rsync.py b/remoto/lib/execnet/rsync.py deleted file mode 100644 index ccfad91..0000000 --- a/remoto/lib/execnet/rsync.py +++ /dev/null @@ -1,207 +0,0 @@ -""" -1:N rsync implemenation on top of execnet. - -(c) 2006-2009, Armin Rigo, Holger Krekel, Maciej Fijalkowski -""" -import os, stat - -try: - from hashlib import md5 -except ImportError: - from md5 import md5 - -try: - from queue import Queue -except ImportError: - from Queue import Queue - -import execnet.rsync_remote - -class RSync(object): - """ This class allows to send a directory structure (recursively) - to one or multiple remote filesystems. - - There is limited support for symlinks, which means that symlinks - pointing to the sourcetree will be send "as is" while external - symlinks will be just copied (regardless of existance of such - a path on remote side). - """ - def __init__(self, sourcedir, callback=None, verbose=True): - self._sourcedir = str(sourcedir) - self._verbose = verbose - assert callback is None or hasattr(callback, '__call__') - self._callback = callback - self._channels = {} - self._receivequeue = Queue() - self._links = [] - - def filter(self, path): - return True - - def _end_of_channel(self, channel): - if channel in self._channels: - # too early! we must have got an error - channel.waitclose() - # or else we raise one - raise IOError('connection unexpectedly closed: %s ' % ( - channel.gateway,)) - - def _process_link(self, channel): - for link in self._links: - channel.send(link) - # completion marker, this host is done - channel.send(42) - - def _done(self, channel): - """ Call all callbacks - """ - finishedcallback = self._channels.pop(channel) - if finishedcallback: - finishedcallback() - channel.waitclose() - - def _list_done(self, channel): - # sum up all to send - if self._callback: - s = sum([self._paths[i] for i in self._to_send[channel]]) - self._callback("list", s, channel) - - def _send_item(self, channel, data): - """ Send one item - """ - modified_rel_path, checksum = data - modifiedpath = os.path.join(self._sourcedir, *modified_rel_path) - try: - f = open(modifiedpath, 'rb') - data = f.read() - except IOError: - data = None - - # provide info to progress callback function - modified_rel_path = "/".join(modified_rel_path) - if data is not None: - self._paths[modified_rel_path] = len(data) - else: - self._paths[modified_rel_path] = 0 - if channel not in self._to_send: - self._to_send[channel] = [] - self._to_send[channel].append(modified_rel_path) - #print "sending", modified_rel_path, data and len(data) or 0, checksum - - if data is not None: - f.close() - if checksum is not None and checksum == md5(data).digest(): - data = None # not really modified - else: - self._report_send_file(channel.gateway, modified_rel_path) - channel.send(data) - - def _report_send_file(self, gateway, modified_rel_path): - if self._verbose: - print("%s <= %s" %(gateway, modified_rel_path)) - - def send(self, raises=True): - """ Sends a sourcedir to all added targets. Flag indicates - whether to raise an error or return in case of lack of - targets - """ - if not self._channels: - if raises: - raise IOError("no targets available, maybe you " - "are trying call send() twice?") - return - # normalize a trailing '/' away - self._sourcedir = os.path.dirname(os.path.join(self._sourcedir, 'x')) - # send directory structure and file timestamps/sizes - self._send_directory_structure(self._sourcedir) - - # paths and to_send are only used for doing - # progress-related callbacks - self._paths = {} - self._to_send = {} - - # send modified file to clients - while self._channels: - channel, req = self._receivequeue.get() - if req is None: - self._end_of_channel(channel) - else: - command, data = req - if command == "links": - self._process_link(channel) - elif command == "done": - self._done(channel) - elif command == "ack": - if self._callback: - self._callback("ack", self._paths[data], channel) - elif command == "list_done": - self._list_done(channel) - elif command == "send": - self._send_item(channel, data) - del data - else: - assert "Unknown command %s" % command - - def add_target(self, gateway, destdir, - finishedcallback=None, **options): - """ Adds a remote target specified via a gateway - and a remote destination directory. - """ - for name in options: - assert name in ('delete',) - def itemcallback(req): - self._receivequeue.put((channel, req)) - channel = gateway.remote_exec(execnet.rsync_remote) - channel.reconfigure(py2str_as_py3str=False, py3str_as_py2str=False) - channel.setcallback(itemcallback, endmarker = None) - channel.send((str(destdir), options)) - self._channels[channel] = finishedcallback - - def _broadcast(self, msg): - for channel in self._channels: - channel.send(msg) - - def _send_link(self, linktype, basename, linkpoint): - self._links.append((linktype, basename, linkpoint)) - - def _send_directory(self, path): - # dir: send a list of entries - names = [] - subpaths = [] - for name in os.listdir(path): - p = os.path.join(path, name) - if self.filter(p): - names.append(name) - subpaths.append(p) - mode = os.lstat(path).st_mode - self._broadcast([mode] + names) - for p in subpaths: - self._send_directory_structure(p) - - def _send_link_structure(self, path): - linkpoint = os.readlink(path) - basename = path[len(self._sourcedir) + 1:] - if linkpoint.startswith(self._sourcedir): - self._send_link("linkbase", basename, - linkpoint[len(self._sourcedir) + 1:]) - else: - # relative or absolute link, just send it - self._send_link("link", basename, linkpoint) - self._broadcast(None) - - def _send_directory_structure(self, path): - try: - st = os.lstat(path) - except OSError: - self._broadcast((None, 0, 0)) - return - if stat.S_ISREG(st.st_mode): - # regular file: send a mode/timestamp/size pair - self._broadcast((st.st_mode, st.st_mtime, st.st_size)) - elif stat.S_ISDIR(st.st_mode): - self._send_directory(path) - elif stat.S_ISLNK(st.st_mode): - self._send_link_structure(path) - else: - raise ValueError("cannot sync %r" % (path,)) - diff --git a/remoto/lib/execnet/rsync_remote.py b/remoto/lib/execnet/rsync_remote.py deleted file mode 100644 index a36b8ce..0000000 --- a/remoto/lib/execnet/rsync_remote.py +++ /dev/null @@ -1,109 +0,0 @@ -""" -(c) 2006-2013, Armin Rigo, Holger Krekel, Maciej Fijalkowski -""" -def serve_rsync(channel): - import os, stat, shutil - try: - from hashlib import md5 - except ImportError: - from md5 import md5 - destdir, options = channel.receive() - modifiedfiles = [] - - def remove(path): - assert path.startswith(destdir) - try: - os.unlink(path) - except OSError: - # assume it's a dir - shutil.rmtree(path) - - def receive_directory_structure(path, relcomponents): - try: - st = os.lstat(path) - except OSError: - st = None - msg = channel.receive() - if isinstance(msg, list): - if st and not stat.S_ISDIR(st.st_mode): - os.unlink(path) - st = None - if not st: - os.makedirs(path) - mode = msg.pop(0) - if mode: - os.chmod(path, mode) - entrynames = {} - for entryname in msg: - destpath = os.path.join(path, entryname) - receive_directory_structure(destpath, relcomponents + [entryname]) - entrynames[entryname] = True - if options.get('delete'): - for othername in os.listdir(path): - if othername not in entrynames: - otherpath = os.path.join(path, othername) - remove(otherpath) - elif msg is not None: - assert isinstance(msg, tuple) - checksum = None - if st: - if stat.S_ISREG(st.st_mode): - msg_mode, msg_mtime, msg_size = msg - if msg_size != st.st_size: - pass - elif msg_mtime != st.st_mtime: - f = open(path, 'rb') - checksum = md5(f.read()).digest() - f.close() - elif msg_mode and msg_mode != st.st_mode: - os.chmod(path, msg_mode) - return - else: - return # already fine - else: - remove(path) - channel.send(("send", (relcomponents, checksum))) - modifiedfiles.append((path, msg)) - receive_directory_structure(destdir, []) - - STRICT_CHECK = False # seems most useful this way for py.test - channel.send(("list_done", None)) - - for path, (mode, time, size) in modifiedfiles: - data = channel.receive() - channel.send(("ack", path[len(destdir) + 1:])) - if data is not None: - if STRICT_CHECK and len(data) != size: - raise IOError('file modified during rsync: %r' % (path,)) - f = open(path, 'wb') - f.write(data) - f.close() - try: - if mode: - os.chmod(path, mode) - os.utime(path, (time, time)) - except OSError: - pass - del data - channel.send(("links", None)) - - msg = channel.receive() - while msg != 42: - # we get symlink - _type, relpath, linkpoint = msg - path = os.path.join(destdir, relpath) - try: - remove(path) - except OSError: - pass - if _type == "linkbase": - src = os.path.join(destdir, linkpoint) - else: - assert _type == "link", _type - src = linkpoint - os.symlink(src, path) - msg = channel.receive() - channel.send(("done", None)) - -if __name__ == '__channelexec__': - serve_rsync(channel) # noqa diff --git a/remoto/lib/execnet/script/__init__.py b/remoto/lib/execnet/script/__init__.py deleted file mode 100644 index 792d600..0000000 --- a/remoto/lib/execnet/script/__init__.py +++ /dev/null @@ -1 +0,0 @@ -# diff --git a/remoto/lib/execnet/script/loop_socketserver.py b/remoto/lib/execnet/script/loop_socketserver.py deleted file mode 100644 index 44896b6..0000000 --- a/remoto/lib/execnet/script/loop_socketserver.py +++ /dev/null @@ -1,14 +0,0 @@ - -import os, sys -import subprocess - -if __name__ == '__main__': - directory = os.path.dirname(os.path.abspath(sys.argv[0])) - script = os.path.join(directory, 'socketserver.py') - while 1: - cmdlist = ["python", script] - cmdlist.extend(sys.argv[1:]) - text = "starting subcommand: " + " ".join(cmdlist) - print(text) - process = subprocess.Popen(cmdlist) - process.wait() diff --git a/remoto/lib/execnet/script/quitserver.py b/remoto/lib/execnet/script/quitserver.py deleted file mode 100644 index 5b7ebdb..0000000 --- a/remoto/lib/execnet/script/quitserver.py +++ /dev/null @@ -1,16 +0,0 @@ -""" - - send a "quit" signal to a remote server - -""" - -import sys -import socket - -hostport = sys.argv[1] -host, port = hostport.split(':') -hostport = (host, int(port)) - -sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) -sock.connect(hostport) -sock.sendall('"raise KeyboardInterrupt"\n') diff --git a/remoto/lib/execnet/script/shell.py b/remoto/lib/execnet/script/shell.py deleted file mode 100755 index ecea167..0000000 --- a/remoto/lib/execnet/script/shell.py +++ /dev/null @@ -1,84 +0,0 @@ -#! /usr/bin/env python -""" -a remote python shell - -for injection into startserver.py -""" -import sys, os, socket, select - -try: - clientsock -except NameError: - print("client side starting") - host, port = sys.argv[1].split(':') - port = int(port) - myself = open(os.path.abspath(sys.argv[0]), 'rU').read() - sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - sock.connect((host, port)) - sock.sendall(repr(myself)+'\n') - print("send boot string") - inputlist = [ sock, sys.stdin ] - try: - while 1: - r,w,e = select.select(inputlist, [], []) - if sys.stdin in r: - line = raw_input() - sock.sendall(line + '\n') - if sock in r: - line = sock.recv(4096) - sys.stdout.write(line) - sys.stdout.flush() - except: - import traceback - print(traceback.print_exc()) - - sys.exit(1) - -print("server side starting") -# server side -# -from traceback import print_exc -from threading import Thread - -class promptagent(Thread): - def __init__(self, clientsock): - Thread.__init__(self) - self.clientsock = clientsock - - def run(self): - print("Entering thread prompt loop") - clientfile = self.clientsock.makefile('w') - - filein = self.clientsock.makefile('r') - loc = self.clientsock.getsockname() - - while 1: - try: - clientfile.write('%s %s >>> ' % loc) - clientfile.flush() - line = filein.readline() - if len(line)==0: raise EOFError("nothing") - #print >>sys.stderr,"got line: " + line - if line.strip(): - oldout, olderr = sys.stdout, sys.stderr - sys.stdout, sys.stderr = clientfile, clientfile - try: - try: - exec(compile(line + '\n','', 'single')) - except: - print_exc() - finally: - sys.stdout=oldout - sys.stderr=olderr - clientfile.flush() - except EOFError: - #e = sys.exc_info()[1] - sys.stderr.write("connection close, prompt thread returns") - break - #print >>sys.stdout, "".join(apply(format_exception,sys.exc_info())) - - self.clientsock.close() - -prompter = promptagent(clientsock) # noqa -prompter.start() -print("promptagent - thread started") diff --git a/remoto/lib/execnet/script/socketserver.py b/remoto/lib/execnet/script/socketserver.py deleted file mode 100755 index 7b0b92a..0000000 --- a/remoto/lib/execnet/script/socketserver.py +++ /dev/null @@ -1,122 +0,0 @@ -#! /usr/bin/env python - -""" - start socket based minimal readline exec server - - it can exeuted in 2 modes of operation - - 1. as normal script, that listens for new connections - - 2. via existing_gateway.remote_exec (as imported module) - -""" -# this part of the program only executes on the server side -# - -progname = 'socket_readline_exec_server-1.2' - -import sys, os - -def get_fcntl(): - try: - import fcntl - except ImportError: - fcntl = None - return fcntl - -fcntl = get_fcntl() - -debug = 0 - -if debug: # and not os.isatty(sys.stdin.fileno()): - f = open('/tmp/execnet-socket-pyout.log', 'w') - old = sys.stdout, sys.stderr - sys.stdout = sys.stderr = f - -def print_(*args): - print(" ".join(str(arg) for arg in args)) - -if sys.version_info > (3, 0): - exec("""def exec_(source, locs): - exec(source, locs)""") -else: - exec("""def exec_(source, locs): - exec source in locs""") - -def exec_from_one_connection(serversock): - print_(progname, 'Entering Accept loop', serversock.getsockname()) - clientsock,address = serversock.accept() - print_(progname, 'got new connection from %s %s' % address) - clientfile = clientsock.makefile('rb') - print_("reading line") - # rstrip so that we can use \r\n for telnet testing - source = clientfile.readline().rstrip() - clientfile.close() - g = {'clientsock' : clientsock, 'address' : address, 'execmodel': execmodel} - source = eval(source) - if source: - co = compile(source+'\n', source, 'exec') - print_(progname, 'compiled source, executing') - try: - exec_(co, g) # noqa - finally: - print_(progname, 'finished executing code') - # background thread might hold a reference to this (!?) - #clientsock.close() - -def bind_and_listen(hostport, execmodel): - socket = execmodel.socket - if isinstance(hostport, str): - host, port = hostport.split(':') - hostport = (host, int(port)) - serversock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - # set close-on-exec - if hasattr(fcntl, 'FD_CLOEXEC'): - old = fcntl.fcntl(serversock.fileno(), fcntl.F_GETFD) - fcntl.fcntl(serversock.fileno(), fcntl.F_SETFD, old | fcntl.FD_CLOEXEC) - # allow the address to be re-used in a reasonable amount of time - if os.name == 'posix' and sys.platform != 'cygwin': - serversock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) - - serversock.bind(hostport) - serversock.listen(5) - return serversock - -def startserver(serversock, loop=False): - try: - while 1: - try: - exec_from_one_connection(serversock) - except (KeyboardInterrupt, SystemExit): - raise - except: - if debug: - import traceback - traceback.print_exc() - else: - excinfo = sys.exc_info() - print_("got exception", excinfo[1]) - if not loop: - break - finally: - print_("leaving socketserver execloop") - serversock.shutdown(2) - -if __name__ == '__main__': - import sys - if len(sys.argv)>1: - hostport = sys.argv[1] - else: - hostport = ':8888' - from execnet.gateway_base import get_execmodel - execmodel = get_execmodel("thread") - serversock = bind_and_listen(hostport, execmodel) - startserver(serversock, loop=False) - -elif __name__=='__channelexec__': - execmodel = channel.gateway.execmodel # noqa - bindname = channel.receive() # noqa - sock = bind_and_listen(bindname, execmodel) - port = sock.getsockname() - channel.send(port) # noqa - startserver(sock) diff --git a/remoto/lib/execnet/script/socketserverservice.py b/remoto/lib/execnet/script/socketserverservice.py deleted file mode 100644 index 562083c..0000000 --- a/remoto/lib/execnet/script/socketserverservice.py +++ /dev/null @@ -1,89 +0,0 @@ -""" -A windows service wrapper for the py.execnet socketserver. - -To use, run: - python socketserverservice.py register - net start ExecNetSocketServer -""" - -import sys -import win32serviceutil -import win32service -import win32event -import win32evtlogutil -import servicemanager -import threading -import socketserver - - -appname = 'ExecNetSocketServer' - - -class SocketServerService(win32serviceutil.ServiceFramework): - _svc_name_ = appname - _svc_display_name_ = "%s" % appname - _svc_deps_ = ["EventLog"] - def __init__(self, args): - # The exe-file has messages for the Event Log Viewer. - # Register the exe-file as event source. - # - # Probably it would be better if this is done at installation time, - # so that it also could be removed if the service is uninstalled. - # Unfortunately it cannot be done in the 'if __name__ == "__main__"' - # block below, because the 'frozen' exe-file does not run this code. - # - win32evtlogutil.AddSourceToRegistry(self._svc_display_name_, - servicemanager.__file__, - "Application") - win32serviceutil.ServiceFramework.__init__(self, args) - self.hWaitStop = win32event.CreateEvent(None, 0, 0, None) - self.WAIT_TIME = 1000 # in milliseconds - - - def SvcStop(self): - self.ReportServiceStatus(win32service.SERVICE_STOP_PENDING) - win32event.SetEvent(self.hWaitStop) - - - def SvcDoRun(self): - # Redirect stdout and stderr to prevent "IOError: [Errno 9] - # Bad file descriptor". Windows services don't have functional - # output streams. - sys.stdout = sys.stderr = open('nul', 'w') - - # Write a 'started' event to the event log... - win32evtlogutil.ReportEvent(self._svc_display_name_, - servicemanager.PYS_SERVICE_STARTED, - 0, # category - servicemanager.EVENTLOG_INFORMATION_TYPE, - (self._svc_name_, '')) - print("Begin: %s" % (self._svc_display_name_)) - - hostport = ':8888' - print('Starting py.execnet SocketServer on %s' % hostport) - serversock = socketserver.bind_and_listen(hostport) - thread = threading.Thread(target=socketserver.startserver, - args=(serversock,), - kwargs={'loop':True}) - thread.setDaemon(True) - thread.start() - - # wait to be stopped or self.WAIT_TIME to pass - while True: - result = win32event.WaitForSingleObject(self.hWaitStop, - self.WAIT_TIME) - if result == win32event.WAIT_OBJECT_0: - break - - # write a 'stopped' event to the event log. - win32evtlogutil.ReportEvent(self._svc_display_name_, - servicemanager.PYS_SERVICE_STOPPED, - 0, # category - servicemanager.EVENTLOG_INFORMATION_TYPE, - (self._svc_name_, '')) - print("End: %s" % appname) - - -if __name__ == '__main__': - # Note that this code will not be run in the 'frozen' exe-file!!! - win32serviceutil.HandleCommandLine(SocketServerService) diff --git a/remoto/lib/execnet/script/xx.py b/remoto/lib/execnet/script/xx.py deleted file mode 100644 index 931e4b7..0000000 --- a/remoto/lib/execnet/script/xx.py +++ /dev/null @@ -1,9 +0,0 @@ -import rlcompleter2 -rlcompleter2.setup() - -import register, sys -try: - hostport = sys.argv[1] -except: - hostport = ':8888' -gw = register.ServerGateway(hostport) diff --git a/remoto/lib/execnet/xspec.py b/remoto/lib/execnet/xspec.py deleted file mode 100644 index c72f5b6..0000000 --- a/remoto/lib/execnet/xspec.py +++ /dev/null @@ -1,53 +0,0 @@ -""" -(c) 2008-2013, holger krekel -""" -class XSpec: - """ Execution Specification: key1=value1//key2=value2 ... - * keys need to be unique within the specification scope - * neither key nor value are allowed to contain "//" - * keys are not allowed to contain "=" - * keys are not allowed to start with underscore - * if no "=value" is given, assume a boolean True value - """ - # XXX allow customization, for only allow specific key names - popen = ssh = socket = python = chdir = nice = \ - dont_write_bytecode = execmodel = None - - def __init__(self, string): - self._spec = string - self.env = {} - for keyvalue in string.split("//"): - i = keyvalue.find("=") - if i == -1: - key, value = keyvalue, True - else: - key, value = keyvalue[:i], keyvalue[i+1:] - if key[0] == "_": - raise AttributeError("%r not a valid XSpec key" % key) - if key in self.__dict__: - raise ValueError("duplicate key: %r in %r" %(key, string)) - if key.startswith("env:"): - self.env[key[4:]] = value - else: - setattr(self, key, value) - - def __getattr__(self, name): - if name[0] == "_": - raise AttributeError(name) - return None - - def __repr__(self): - return "" %(self._spec,) - def __str__(self): - return self._spec - - def __hash__(self): - return hash(self._spec) - def __eq__(self, other): - return self._spec == getattr(other, '_spec', None) - def __ne__(self, other): - return self._spec != getattr(other, '_spec', None) - - def _samefilesystem(self): - return bool(self.popen and not self.chdir) -