From: Alfredo Deza Date: Wed, 29 Jan 2014 17:40:24 +0000 (-0500) Subject: use newest execnet 1.2.0dev2 X-Git-Tag: 0.0.14~4 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=c0021e90bd1edc9e4a9210baa74aebcf01ab129c;p=remoto.git use newest execnet 1.2.0dev2 Signed-off-by: Alfredo Deza --- diff --git a/remoto/lib/execnet/__init__.py b/remoto/lib/execnet/__init__.py index b9c6840..3fd5695 100644 --- a/remoto/lib/execnet/__init__.py +++ b/remoto/lib/execnet/__init__.py @@ -3,7 +3,7 @@ execnet: pure python lib for connecting to local and remote Python Interpreters. (c) 2012, Holger Krekel and others """ -__version__ = '1.1.1-ad4' +__version__ = '1.2.0.dev2' from . import apipkg @@ -12,6 +12,7 @@ apipkg.initpkg(__name__, { 'SocketGateway': '.deprecated:SocketGateway', 'SshGateway': '.deprecated:SshGateway', 'makegateway': '.multi:makegateway', + 'set_execmodel': '.multi:set_execmodel', 'HostNotFound': '.gateway_bootstrap:HostNotFound', 'RemoteError': '.gateway_base:RemoteError', 'TimeoutError': '.gateway_base:TimeoutError', @@ -26,17 +27,3 @@ apipkg.initpkg(__name__, { 'dump': '.gateway_base:dump', 'DataFormatError': '.gateway_base:DataFormatError', }) - -# CHANGELOG -# -# 1.1.1-ad4: Use the newest execnet, includes SSH options -# -# 1.1-ad3: Catch more `TypeError` if the connection is closing but the channel attempts -# to write. We now check is `struct.pack` is not None to proceed. 
-# Issue: https://bitbucket.org/hpk42/execnet/issue/22/structpack-can-be-none-sometimes-spits -# -# 1.1-ad2: Allow for `sudo python` on local popen gateways -# Issue: https://bitbucket.org/hpk42/execnet/issue/21/support-sudo-on-local-popen -# -# 1.1-ad1: `if case` to check if `Message` is None and avoid AttributeErrors -# Issue: https://bitbucket.org/hpk42/execnet/issue/20/attributeerror-nonetype-object-has-no diff --git a/remoto/lib/execnet/gateway.py b/remoto/lib/execnet/gateway.py index 1528280..e1d0e07 100644 --- a/remoto/lib/execnet/gateway.py +++ b/remoto/lib/execnet/gateway.py @@ -1,13 +1,12 @@ """ gateway code for initiating popen, socket and ssh connections. -(c) 2004-2009, Holger Krekel and others +(c) 2004-2013, Holger Krekel and others """ import sys, os, inspect, types, linecache import textwrap import execnet from execnet.gateway_base import Message -from execnet.gateway_io import Popen2IOMaster from execnet import gateway_base importdir = os.path.dirname(os.path.dirname(execnet.__file__)) @@ -31,8 +30,8 @@ class Gateway(gateway_base.BaseGateway): except AttributeError: r = "uninitialized" i = "no" - return "<%s id=%r %s, %s active channels>" %( - self.__class__.__name__, self.id, r, i) + return "<%s id=%r %s, %s model, %s active channels>" %( + self.__class__.__name__, self.id, r, self.execmodel.backend, i) def exit(self): """ trigger gateway exit. 
Defer waiting for finishing @@ -44,11 +43,12 @@ class Gateway(gateway_base.BaseGateway): self._trace("gateway already unregistered with group") return self._group._unregister(self) - self._trace("--> sending GATEWAY_TERMINATE") try: + self._trace("--> sending GATEWAY_TERMINATE") self._send(Message.GATEWAY_TERMINATE) + self._trace("--> io.close_write") self._io.close_write() - except IOError: + except (ValueError, EOFError, IOError): v = sys.exc_info()[1] self._trace("io-error: could not send termination sequence") self._trace(" exception: %r" % v) @@ -73,7 +73,7 @@ class Gateway(gateway_base.BaseGateway): def hasreceiver(self): """ return True if gateway is able to receive data. """ - return self._receiverthread.isAlive() # approxmimation + return self._receiverthread.running # approxmimation def remote_status(self): """ return information object about remote execution status. """ @@ -121,18 +121,8 @@ class Gateway(gateway_base.BaseGateway): return channel def remote_init_threads(self, num=None): - """ start up to 'num' threads for subsequent - remote_exec() invocations to allow concurrent - execution. - """ - if hasattr(self, '_remotechannelthread'): - raise IOError("remote threads already running") - from execnet import threadpool - source = inspect.getsource(threadpool) - self._remotechannelthread = self.remote_exec(source) - self._remotechannelthread.send(num) - status = self._remotechannelthread.receive() - assert status == "ok", status + """ DEPRECATED. Is currently a NO-OPERATION already.""" + print ("WARNING: remote_init_threads() is a no-operation in execnet-1.2") class RInfo: def __init__(self, kwargs): diff --git a/remoto/lib/execnet/gateway_base.py b/remoto/lib/execnet/gateway_base.py index f754e94..30e6e1a 100644 --- a/remoto/lib/execnet/gateway_base.py +++ b/remoto/lib/execnet/gateway_base.py @@ -3,17 +3,18 @@ base execnet gateway code send to the other side for bootstrapping. 
NOTE: aims to be compatible to Python 2.5-3.X, Jython and IronPython -(C) 2004-2013 Holger Krekel, Armin Rigo, Benjamin Peterson, and others +(C) 2004-2013 Holger Krekel, Armin Rigo, Benjamin Peterson, Ronny Pfannschmidt and others """ +from __future__ import with_statement import sys, os, weakref -import threading, traceback, struct +import traceback, struct # NOTE that we want to avoid try/except style importing # to avoid setting sys.exc_info() during import +# ISPY3 = sys.version_info >= (3, 0) if ISPY3: - import queue from io import BytesIO exec("def do_exec(co, loc): exec(co, loc)\n" "def reraise(cls, val, tb): raise val\n") @@ -21,7 +22,6 @@ if ISPY3: _long_type = int from _thread import interrupt_main else: - import Queue as queue from StringIO import StringIO as BytesIO exec("def do_exec(co, loc): exec co in loc\n" "def reraise(cls, val, tb): raise cls, val, tb\n") @@ -32,6 +32,255 @@ else: except ImportError: interrupt_main = None +class EmptySemaphore: + acquire = release = lambda self: None + +def get_execmodel(backend): + if hasattr(backend, "backend"): + return backend + if backend == "thread": + importdef = { + 'get_ident': ['thread::get_ident', '_thread::get_ident'], + '_start_new_thread': ['thread::start_new_thread', + '_thread::start_new_thread'], + 'threading': ["threading",], + 'queue': ["queue", "Queue"], + 'sleep': ['time::sleep'], + 'subprocess': ['subprocess'], + 'socket': ['socket'], + '_fdopen': ['os::fdopen'], + '_lock': ['threading'], + '_event': ['threading'], + } + def exec_start(self, func, args=()): + self._start_new_thread(func, args) + + elif backend == "eventlet": + importdef = { + 'get_ident': ['eventlet.green.thread::get_ident'], + '_spawn_n': ['eventlet::spawn_n'], + 'threading': ['eventlet.green.threading'], + 'queue': ["eventlet.queue"], + 'sleep': ['eventlet::sleep'], + 'subprocess': ['eventlet.green.subprocess'], + 'socket': ['eventlet.green.socket'], + '_fdopen': ['eventlet.green.os::fdopen'], + '_lock': 
['eventlet.green.threading'], + '_event': ['eventlet.green.threading'], + } + def exec_start(self, func, args=()): + self._spawn_n(func, *args) + elif backend == "gevent": + importdef = { + 'get_ident': ['gevent.thread::get_ident'], + '_spawn_n': ['gevent::spawn'], + 'threading': ['threading'], + 'queue': ["gevent.queue"], + 'sleep': ['gevent::sleep'], + 'subprocess': ['gevent.subprocess'], + 'socket': ['gevent.socket'], + # XXX + '_fdopen': ['gevent.fileobject::FileObjectThread'], + '_lock': ['gevent.lock'], + '_event': ['gevent.event'], + } + def exec_start(self, func, args=()): + self._spawn_n(func, *args) + else: + raise ValueError("unknown execmodel %r" %(backend,)) + + class ExecModel: + def __init__(self, name): + self._importdef = importdef + self.backend = name + self._count = 0 + + def __repr__(self): + return "" % self.backend + + def __getattr__(self, name): + locs = self._importdef.get(name) + if locs is None: + raise AttributeError(name) + for loc in locs: + parts = loc.split("::") + loc = parts.pop(0) + try: + mod = __import__(loc, None, None, "__doc__") + except ImportError: + pass + else: + if parts: + mod = getattr(mod, parts[0]) + setattr(self, name, mod) + return mod + raise AttributeError(name) + + start = exec_start + + def fdopen(self, fd, mode, bufsize=1): + return self._fdopen(fd, mode, bufsize) + + def WorkerPool(self, size=None, hasprimary=False): + return WorkerPool(self, size, hasprimary=hasprimary) + + def Semaphore(self, size=None): + if size is None: + return EmptySemaphore() + return self._lock.Semaphore(size) + + def Lock(self): + return self._lock.RLock() + + def RLock(self): + return self._lock.RLock() + + def Event(self): + event = self._event.Event() + if sys.version_info < (2,7): + # patch wait function to return event state instead of None + real_wait = event.wait + def wait(timeout=None): + real_wait(timeout=timeout) + return event.isSet() + event.wait = wait + return event + + def PopenPiped(self, args): + PIPE = 
self.subprocess.PIPE + return self.subprocess.Popen(args, stdout=PIPE, stdin=PIPE) + + + return ExecModel(backend) + + +class Reply(object): + """ reply instances provide access to the result + of a function execution that got dispatched + through WorkerPool.spawn() + """ + def __init__(self, task, threadmodel): + self.task = task + self._result_ready = threadmodel.Event() + self.running = True + + def get(self, timeout=None): + """ get the result object from an asynchronous function execution. + if the function execution raised an exception, + then calling get() will reraise that exception + including its traceback. + """ + self.waitfinish(timeout) + try: + return self._result + except AttributeError: + reraise(*(self._excinfo[:3])) # noqa + + def waitfinish(self, timeout=None): + if not self._result_ready.wait(timeout): + raise IOError("timeout waiting for %r" %(self.task, )) + + def run(self): + func, args, kwargs = self.task + try: + try: + self._result = func(*args, **kwargs) + except: + self._excinfo = sys.exc_info() + finally: + self._result_ready.set() + self.running = False + + +class WorkerPool(object): + """ A WorkerPool allows to spawn function executions + to threads, returning a reply object on which you + can ask for the result (and get exceptions reraised) + """ + def __init__(self, execmodel, size=None, hasprimary=False): + """ by default allow unlimited number of spawns. """ + self.execmodel = execmodel + self._size = size + self._running_lock = self.execmodel.Lock() + self._sem = self.execmodel.Semaphore(size) + self._running = set() + self._shutdown_event = self.execmodel.Event() + if hasprimary: + if self.execmodel.backend != "thread": + raise ValueError("hasprimary=True requires thread model") + self._primary_thread_event = self.execmodel.Event() + + def integrate_as_primary_thread(self): + """ integrate the thread with which we are called as a primary + thread to dispatch to when spawn is called. 
+ """ + assert self.execmodel.backend == "thread", self.execmodel + # XXX insert check if we really are in the main thread + primary_thread_event = self._primary_thread_event + # interacts with code at REF1 + while not self._shutdown_event.isSet(): + primary_thread_event.wait() + func, args, kwargs = self._primary_thread_task + if func is None: # waitall() woke us up to finish the loop + break + func(*args, **kwargs) + primary_thread_event.clear() + + def shutdown(self): + self._shutdown_event.set() + + def wait_for_shutdown(self, timeout=None): + return self._shutdown_event.wait(timeout=timeout) + + def active_count(self): + return len(self._running) + + def spawn(self, func, *args, **kwargs): + """ return Reply object for the asynchronous dispatch + of the given func(*args, **kwargs). + """ + reply = Reply((func, args, kwargs), self.execmodel) + def run_and_release(): + reply.run() + with self._running_lock: + self._running.remove(reply) + self._sem.release() + if not self._running: + try: + self._waitall_event.set() + except AttributeError: + pass + self._sem.acquire() + with self._running_lock: + self._running.add(reply) + # REF1 in 'thread' model we give priority to running in main thread + primary_thread_event = getattr(self, "_primary_thread_event", None) + if primary_thread_event is not None: + if not primary_thread_event.isSet(): + self._primary_thread_task = run_and_release, (), {} + primary_thread_event.set() + return reply + self.execmodel.start(run_and_release, ()) + return reply + + def waitall(self, timeout=None): + """ wait until all previosuly spawns have terminated. 
""" + # if it exists signal primary_thread to finish its loop + self._primary_thread_task = None, None, None + try: + self._primary_thread_event.set() + except AttributeError: + pass + with self._running_lock: + if not self._running: + return True + # if a Reply still runs, we let run_and_release + # signal us -- note that we are still holding the + # _running_lock to avoid race conditions + self._waitall_event = self.execmodel.Event() + return self._waitall_event.wait(timeout=timeout) + + sysex = (KeyboardInterrupt, SystemExit) @@ -68,7 +317,7 @@ else: class Popen2IO: error = (IOError, OSError, EOFError) - def __init__(self, outfile, infile): + def __init__(self, outfile, infile, execmodel): # we need raw byte streams self.outfile, self.infile = outfile, infile if sys.platform == "win32": @@ -80,6 +329,7 @@ class Popen2IO: pass self._read = getattr(infile, "buffer", infile).read self._write = getattr(outfile, "buffer", outfile).write + self.execmodel = execmodel def read(self, numbytes): """Read exactly 'numbytes' bytes from the pipe. """ @@ -117,6 +367,8 @@ class Message: def from_io(io): try: header = io.read(9) # type 1, channel 4, payload 4 + if not header: + raise EOFError("empty read") except EOFError: e = sys.exc_info()[1] raise EOFError('couldnt load message header, ' + e.args[0]) @@ -124,9 +376,9 @@ class Message: return Message(msgtype, channel, io.read(payload)) def to_io(self, io): - if struct.pack is not None: - header = struct.pack('!bii', self.msgcode, self.channelid, len(self.data)) - io.write(header+self.data) + header = struct.pack('!bii', self.msgcode, self.channelid, + len(self.data)) + io.write(header+self.data) def received(self, gateway): self._types[self.msgcode](self, gateway) @@ -153,20 +405,20 @@ class Message: return "" %(name, self.channelid, r) +class GatewayReceivedTerminate(Exception): + """ Receiverthread got termination message. 
""" + def _setupmessages(): def status(message, gateway): # we use the channelid to send back information # but don't instantiate a channel object - active_channels = gateway._channelfactory.channels() - numexec = 0 - for ch in active_channels: - if getattr(ch, '_executing', False): - numexec += 1 - d = {'execqsize': gateway._execqueue.qsize(), - 'numchannels': len(active_channels), - 'numexecuting': numexec + d = {'numchannels': len(gateway._channelfactory._channels), + 'numexecuting': gateway._execpool.active_count(), + 'execmodel': gateway.execmodel.backend, } - gateway._send(Message.CHANNEL_DATA, message.channelid, dumps_internal(d)) + gateway._send(Message.CHANNEL_DATA, message.channelid, + dumps_internal(d)) + gateway._send(Message.CHANNEL_CLOSE, message.channelid) def channel_exec(message, gateway): channel = gateway._channelfactory.new(message.channelid) @@ -186,8 +438,7 @@ def _setupmessages(): gateway._channelfactory._local_close(message.channelid, sendonly=True) def gateway_terminate(message, gateway): - gateway._terminate_execution() - raise SystemExit(0) + raise GatewayReceivedTerminate(gateway) def reconfigure(message, gateway): if message.channelid == 0: @@ -234,7 +485,8 @@ class RemoteError(Exception): def warn(self): if self.formatted != INTERRUPT_TEXT: # XXX do this better - sys.stderr.write("Warning: unhandled %r\n" % (self,)) + sys.stderr.write("[%s] Warning: unhandled %r\n" + % (os.getpid(), self,)) class TimeoutError(IOError): """ Exception indicating that a timeout was reached. 
""" @@ -255,9 +507,9 @@ class Channel(object): #XXX: defaults copied from Unserializer self._strconfig = getattr(gateway, '_strconfig', (True, False)) self.id = id - self._items = queue.Queue() + self._items = self.gateway.execmodel.queue.Queue() self._closed = False - self._receiveclosed = threading.Event() + self._receiveclosed = self.gateway.execmodel.Event() self._remoteerrors = [] def _trace(self, *msg): @@ -274,9 +526,7 @@ class Channel(object): be called with the endmarker when the channel closes. """ _callbacks = self.gateway._channelfactory._callbacks - _receivelock = self.gateway._receivelock - _receivelock.acquire() - try: + with self.gateway._receivelock: if self._items is None: raise IOError("%r has callback already registered" %(self,)) items = self._items @@ -284,7 +534,7 @@ class Channel(object): while 1: try: olditem = items.get(block=False) - except queue.Empty: + except self.gateway.execmodel.queue.Empty: if not (self._closed or self._receiveclosed.isSet()): _callbacks[self.id] = ( callback, @@ -300,8 +550,6 @@ class Channel(object): break else: callback(olditem) - finally: - _receivelock.release() def __repr__(self): flag = self.isclosed() and "closed" or "open" @@ -321,8 +569,12 @@ class Channel(object): # the remote channel is already in "deleted" state, nothing to do pass else: + # state transition "opened" --> "deleted" + # check if we are in the middle of interpreter shutdown + # in which case the process will go away and we probably + # don't need to try to send a closing or last message + # (and often it won't work anymore to send things out) if Message is not None: - # state transition "opened" --> "deleted" if self._items is None: # has_callback msgcode = Message.CHANNEL_LAST_MESSAGE else: @@ -365,7 +617,8 @@ class Channel(object): def close(self, error=None): """ close down this channel with an optional error message. 
Note that closing of a channel tied to remote_exec happens - automatically at the end of execution and cannot be done explicitely. + automatically at the end of execution and cannot + be done explicitely. """ if self._executing: raise IOError("cannot explicitly close channel within remote_exec") @@ -380,7 +633,8 @@ class Channel(object): if not self._receiveclosed.isSet(): put = self.gateway._send if error is not None: - put(Message.CHANNEL_CLOSE_ERROR, self.id, dumps_internal(error)) + put(Message.CHANNEL_CLOSE_ERROR, self.id, + dumps_internal(error)) else: put(Message.CHANNEL_CLOSE, self.id) self._trace("sent channel close message") @@ -436,7 +690,7 @@ class Channel(object): raise IOError("cannot receive(), channel has receiver callback") try: x = itemqueue.get(timeout=timeout) - except queue.Empty: + except self.gateway.execmodel.queue.Empty: raise self.TimeoutError("no item after %r seconds" %(timeout)) if x is ENDMARKER: itemqueue.put(x) # for other receivers @@ -472,7 +726,7 @@ class ChannelFactory(object): def __init__(self, gateway, startcount=1): self._channels = weakref.WeakValueDictionary() self._callbacks = {} - self._writelock = threading.Lock() + self._writelock = gateway.execmodel.Lock() self.gateway = gateway self.count = startcount self.finished = False @@ -480,8 +734,7 @@ class ChannelFactory(object): def new(self, id=None): """ create a new Channel with 'id' (or create new id if None). 
""" - self._writelock.acquire() - try: + with self._writelock: if self.finished: raise IOError("connexion already closed: %s" % (self.gateway,)) if id is None: @@ -492,8 +745,6 @@ class ChannelFactory(object): except KeyError: channel = self._channels[id] = Channel(self.gateway, id) return channel - finally: - self._writelock.release() def channels(self): return self._list(self._channels.values()) @@ -535,35 +786,33 @@ class ChannelFactory(object): def _local_receive(self, id, data): # executes in receiver thread + channel = self._channels.get(id) try: - callback, endmarker, strconfig= self._callbacks[id] - channel = self._channels.get(id) + callback, endmarker, strconfig = self._callbacks[id] except KeyError: - channel = self._channels.get(id) queue = channel and channel._items if queue is None: pass # drop data else: - queue.put(loads_internal(data, channel)) + item = loads_internal(data, channel) + queue.put(item) else: try: data = loads_internal(data, channel, strconfig) callback(data) # even if channel may be already closed - except KeyboardInterrupt: - raise - except: + except Exception: excinfo = sys.exc_info() - self.gateway._trace("exception during callback: %s" % excinfo[1]) + self.gateway._trace("exception during callback: %s" % + excinfo[1]) errortext = self.gateway._geterrortext(excinfo) - self.gateway._send(Message.CHANNEL_CLOSE_ERROR, id, dumps_internal(errortext)) + self.gateway._send(Message.CHANNEL_CLOSE_ERROR, + id, dumps_internal(errortext)) self._local_close(id, errortext) def _finished_receiving(self): - self._writelock.acquire() - try: + self.gateway._trace("finished receiving") + with self._writelock: self.finished = True - finally: - self._writelock.release() for id in self._list(self._channels): self._local_close(id, sendonly=True) for id in self._list(self._callbacks): @@ -632,64 +881,60 @@ class BaseGateway(object): _sysex = sysex id = "" - class _StopExecLoop(Exception): - pass - def __init__(self, io, id, _startcount=2): + 
self.execmodel = io.execmodel self._io = io self.id = id - self._strconfig = Unserializer.py2str_as_py3str, Unserializer.py3str_as_py2str + self._strconfig = (Unserializer.py2str_as_py3str, + Unserializer.py3str_as_py2str) self._channelfactory = ChannelFactory(self, _startcount) - self._receivelock = threading.RLock() + self._receivelock = self.execmodel.RLock() # globals may be NONE at process-termination self.__trace = trace self._geterrortext = geterrortext + self._receivepool = self.execmodel.WorkerPool(1) def _trace(self, *msg): self.__trace(self.id, *msg) def _initreceive(self): - self._receiverthread = threading.Thread(name="receiver", - target=self._thread_receiver) - self._receiverthread.setDaemon(1) - self._receiverthread.start() + self._receiverthread = self._receivepool.spawn(self._thread_receiver) def _thread_receiver(self): - self._trace("RECEIVERTHREAD: starting to run") - eof = False + def log(*msg): + self._trace("[receiver-thread]", *msg) + + log("RECEIVERTHREAD: starting to run") io = self._io try: try: while 1: msg = Message.from_io(io) - self._trace("received", msg) - _receivelock = self._receivelock - _receivelock.acquire() - try: + log("received", msg) + with self._receivelock: msg.received(self) del msg - finally: - _receivelock.release() - except self._sysex: - self._trace("RECEIVERTHREAD: doing io.close_read()") - self._io.close_read() + except (KeyboardInterrupt, GatewayReceivedTerminate): + pass except EOFError: - self._trace("RECEIVERTHREAD: got EOFError") - self._trace("RECEIVERTHREAD: traceback was: ", - self._geterrortext(self.exc_info())) + log("EOF without prior gateway termination message") self._error = self.exc_info()[1] - eof = True - except: - self._trace("RECEIVERTHREAD", self._geterrortext(self.exc_info())) + except Exception: + log(self._geterrortext(self.exc_info())) finally: try: - self._trace('RECEIVERTHREAD', 'entering finalization') - if eof: - self._terminate_execution() + log('entering finalization') + # wake up and 
terminate any execution waiting to receive self._channelfactory._finished_receiving() - self._trace('RECEIVERTHREAD', 'leaving finalization') - except: - pass # XXX be silent at interp-shutdown + log('terminating execution') + self._terminate_execution() + log('closing read') + self._io.close_read() + log('closing write') + self._io.close_write() + log('leaving finalization') + except: # be silent at shutdown + pass def _terminate_execution(self): pass @@ -702,8 +947,8 @@ class BaseGateway(object): except (IOError, ValueError): e = sys.exc_info()[1] self._trace('failed to send', message, e) - raise - + # ValueError might be because the IO is already closed + raise IOError("cannot send (already closed?)") def _local_schedulexec(self, channel, sourcetask): channel.close("execution disallowed") @@ -719,55 +964,56 @@ class BaseGateway(object): def join(self, timeout=None): """ Wait for receiverthread to terminate. """ - current = threading.currentThread() - if self._receiverthread.isAlive(): - self._trace("joining receiver thread") - self._receiverthread.join(timeout) - else: - self._trace("gateway.join() called while receiverthread " - "already finished") + self._trace("waiting for receiver thread to finish") + self._receiverthread.waitfinish() class SlaveGateway(BaseGateway): + def _local_schedulexec(self, channel, sourcetask): sourcetask = loads_internal(sourcetask) - self._execqueue.put((channel, sourcetask)) + self._execpool.spawn(self.executetask, ((channel, sourcetask))) def _terminate_execution(self): # called from receiverthread - self._trace("putting None to execqueue") - self._execqueue.put(None) - if interrupt_main: - self._trace("calling interrupt_main()") - interrupt_main() - self._execfinished.wait(10.0) - if not self._execfinished.isSet(): - self._trace("execution did not finish in 10 secs, calling os._exit()") - os._exit(1) - - def serve(self, joining=True): + self._trace("shutting down execution pool") + self._execpool.shutdown() + if not 
self._execpool.waitall(5.0): + self._trace("execution ongoing after 5 secs, trying interrupt_main") + # We try hard to terminate execution based on the assumption + # that there is only one gateway object running per-process. + if sys.platform != "win32": + self._trace("sending ourselves a SIGINT") + os.kill(os.getpid(), 2) # send ourselves a SIGINT + elif interrupt_main is not None: + self._trace("calling interrupt_main()") + interrupt_main() + if not self._execpool.waitall(10.0): + self._trace("execution did not finish in 10 secs, " + "calling os._exit()") + os._exit(1) + + def serve(self): + trace = lambda msg: self._trace("[serve] " + msg) + hasprimary = self.execmodel.backend == "thread" + self._execpool = self.execmodel.WorkerPool(hasprimary=hasprimary) + trace("spawning receiver thread") + self._initreceive() try: try: - self._execqueue = queue.Queue() - self._execfinished = threading.Event() - self._initreceive() - while 1: - item = self._execqueue.get() - if item is None: - break - try: - self.executetask(item) - except self._StopExecLoop: - break + if hasprimary: + trace("integrating as main primary exec thread") + self._execpool.integrate_as_primary_thread() + else: + trace("waiting for execution to finish") + self._execpool.wait_for_shutdown() finally: - self._execfinished.set() - self._trace("io.close_write()") - self._io.close_write() - self._trace("slavegateway.serve finished") - if joining: - self.join() + trace("execution finished") + + trace("joining receiver thread") + self.join() except KeyboardInterrupt: # in the slave we can't really do anything sensible - self._trace("swallowing keyboardinterrupt in main-thread") + trace("swallowing keyboardinterrupt, serve finished") def executetask(self, item): try: @@ -787,7 +1033,7 @@ class SlaveGateway(BaseGateway): channel._executing = True try: co = compile(source+'\n', '', 'exec') - do_exec(co, loc) + do_exec(co, loc) # noqa if call_name: self._trace('calling %s(**%60r)' % (call_name, kwargs)) 
function = loc[call_name] @@ -795,19 +1041,19 @@ class SlaveGateway(BaseGateway): finally: channel._executing = False self._trace("execution finished") - except self._StopExecLoop: - channel.close() - raise except KeyboardInterrupt: channel.close(INTERRUPT_TEXT) raise except: excinfo = self.exc_info() - self._trace("got exception: %s" % (excinfo[1],)) - errortext = self._geterrortext(excinfo) - channel.close(errortext) - else: - channel.close() + if not isinstance(excinfo[1], EOFError): + if not channel.gateway._channelfactory.finished: + self._trace("got exception: %r" % (excinfo[1],)) + errortext = self._geterrortext(excinfo) + channel.close(errortext) + return + self._trace("ignoring EOFError because receiving finished") + channel.close() # # Cross-Python pickling code, tested from test_serializer.py @@ -1180,9 +1426,9 @@ class _Serializer(object): self._write(opcode.CHANNEL) self._write_int4(channel.id) -def init_popen_io(): +def init_popen_io(execmodel): if not hasattr(os, 'dup'): # jython - io = Popen2IO(sys.stdout, sys.stdin) + io = Popen2IO(sys.stdout, sys.stdin, execmodel) import tempfile sys.stdin = tempfile.TemporaryFile('r') sys.stdout = tempfile.TemporaryFile('w') @@ -1195,24 +1441,24 @@ def init_popen_io(): else: devnull = '/dev/null' # stdin - stdin = os.fdopen(os.dup(0), 'r', 1) + stdin = execmodel.fdopen(os.dup(0), 'r', 1) fd = os.open(devnull, os.O_RDONLY) os.dup2(fd, 0) os.close(fd) # stdout - stdout = os.fdopen(os.dup(1), 'w', 1) + stdout = execmodel.fdopen(os.dup(1), 'w', 1) fd = os.open(devnull, os.O_WRONLY) os.dup2(fd, 1) # stderr for win32 if os.name == 'nt': - sys.stderr = os.fdopen(os.dup(2), 'w', 1) + sys.stderr = execmodel.fdopen(os.dup(2), 'w', 1) os.dup2(fd, 2) os.close(fd) - io = Popen2IO(stdout, stdin) - sys.stdin = os.fdopen(0, 'r', 1) - sys.stdout = os.fdopen(1, 'w', 1) + io = Popen2IO(stdout, stdin, execmodel) + sys.stdin = execmodel.fdopen(0, 'r', 1) + sys.stdout = execmodel.fdopen(1, 'w', 1) return io def serve(io, id): diff 
--git a/remoto/lib/execnet/gateway_base.py.orig b/remoto/lib/execnet/gateway_base.py.orig new file mode 100644 index 0000000..9dc8dcd --- /dev/null +++ b/remoto/lib/execnet/gateway_base.py.orig @@ -0,0 +1,1474 @@ +""" +base execnet gateway code send to the other side for bootstrapping. + +NOTE: aims to be compatible to Python 2.5-3.X, Jython and IronPython + +(C) 2004-2013 Holger Krekel, Armin Rigo, Benjamin Peterson, Ronny Pfannschmidt and others +""" +from __future__ import with_statement +import sys, os, weakref +import traceback, struct + +# NOTE that we want to avoid try/except style importing +# to avoid setting sys.exc_info() during import +# + +ISPY3 = sys.version_info >= (3, 0) +if ISPY3: + from io import BytesIO + exec("def do_exec(co, loc): exec(co, loc)\n" + "def reraise(cls, val, tb): raise val\n") + unicode = str + _long_type = int + from _thread import interrupt_main +else: + from StringIO import StringIO as BytesIO + exec("def do_exec(co, loc): exec co in loc\n" + "def reraise(cls, val, tb): raise cls, val, tb\n") + bytes = str + _long_type = long + try: + from thread import interrupt_main + except ImportError: + interrupt_main = None + +class EmptySemaphore: + acquire = release = lambda self: None + +def get_execmodel(backend): + if hasattr(backend, "backend"): + return backend + if backend == "thread": + importdef = { + 'get_ident': ['thread::get_ident', '_thread::get_ident'], + '_start_new_thread': ['thread::start_new_thread', + '_thread::start_new_thread'], + 'threading': ["threading",], + 'queue': ["queue", "Queue"], + 'sleep': ['time::sleep'], + 'subprocess': ['subprocess'], + 'socket': ['socket'], + '_fdopen': ['os::fdopen'], + '_lock': ['threading'], + '_event': ['threading'], + } + def exec_start(self, func, args=()): + self._start_new_thread(func, args) + + elif backend == "eventlet": + importdef = { + 'get_ident': ['eventlet.green.thread::get_ident'], + '_spawn_n': ['eventlet::spawn_n'], + 'threading': ['eventlet.green.threading'], + 
'queue': ["eventlet.queue"], + 'sleep': ['eventlet::sleep'], + 'subprocess': ['eventlet.green.subprocess'], + 'socket': ['eventlet.green.socket'], + '_fdopen': ['eventlet.green.os::fdopen'], + '_lock': ['eventlet.green.threading'], + '_event': ['eventlet.green.threading'], + } + def exec_start(self, func, args=()): + self._spawn_n(func, *args) + elif backend == "gevent": + importdef = { + 'get_ident': ['gevent.thread::get_ident'], + '_spawn_n': ['gevent::spawn'], + 'threading': ['threading'], + 'queue': ["gevent.queue"], + 'sleep': ['gevent::sleep'], + 'subprocess': ['gevent.subprocess'], + 'socket': ['gevent.socket'], + # XXX + '_fdopen': ['gevent.fileobject::FileObjectThread'], + '_lock': ['gevent.lock'], + '_event': ['gevent.event'], + } + def exec_start(self, func, args=()): + self._spawn_n(func, *args) + else: + raise ValueError("unknown execmodel %r" %(backend,)) + + class ExecModel: + def __init__(self, name): + self._importdef = importdef + self.backend = name + self._count = 0 + + def __repr__(self): + return "" % self.backend + + def __getattr__(self, name): + locs = self._importdef.get(name) + if locs is None: + raise AttributeError(name) + for loc in locs: + parts = loc.split("::") + loc = parts.pop(0) + try: + mod = __import__(loc, None, None, "__doc__") + except ImportError: + pass + else: + if parts: + mod = getattr(mod, parts[0]) + setattr(self, name, mod) + return mod + raise AttributeError(name) + + start = exec_start + + def fdopen(self, fd, mode, bufsize=1): + return self._fdopen(fd, mode, bufsize) + + def WorkerPool(self, size=None, hasprimary=False): + return WorkerPool(self, size, hasprimary=hasprimary) + + def Semaphore(self, size=None): + if size is None: + return EmptySemaphore() + return self._lock.Semaphore(size) + + def Lock(self): + return self._lock.RLock() + + def RLock(self): + return self._lock.RLock() + + def Event(self): + event = self._event.Event() + if sys.version_info < (2,7): + # patch wait function to return event state 
instead of None + real_wait = event.wait + def wait(timeout=None): + real_wait(timeout=timeout) + return event.isSet() + event.wait = wait + return event + + def PopenPiped(self, args): + PIPE = self.subprocess.PIPE + return self.subprocess.Popen(args, stdout=PIPE, stdin=PIPE) + + + return ExecModel(backend) + + +class Reply(object): + """ reply instances provide access to the result + of a function execution that got dispatched + through WorkerPool.spawn() + """ + def __init__(self, task, threadmodel): + self.task = task + self._result_ready = threadmodel.Event() + self.running = True + + def get(self, timeout=None): + """ get the result object from an asynchronous function execution. + if the function execution raised an exception, + then calling get() will reraise that exception + including its traceback. + """ + self.waitfinish(timeout) + try: + return self._result + except AttributeError: + reraise(*(self._excinfo[:3])) # noqa + + def waitfinish(self, timeout=None): + if not self._result_ready.wait(timeout): + raise IOError("timeout waiting for %r" %(self.task, )) + + def run(self): + func, args, kwargs = self.task + try: + try: + self._result = func(*args, **kwargs) + except: + self._excinfo = sys.exc_info() + finally: + self._result_ready.set() + self.running = False + + +class WorkerPool(object): + """ A WorkerPool allows to spawn function executions + to threads, returning a reply object on which you + can ask for the result (and get exceptions reraised) + """ + def __init__(self, execmodel, size=None, hasprimary=False): + """ by default allow unlimited number of spawns. 
""" + self.execmodel = execmodel + self._size = size + self._running_lock = self.execmodel.Lock() + self._sem = self.execmodel.Semaphore(size) + self._running = set() + self._shutdown_event = self.execmodel.Event() + if hasprimary: + if self.execmodel.backend != "thread": + raise ValueError("hasprimary=True requires thread model") + self._primary_thread_event = self.execmodel.Event() + + def integrate_as_primary_thread(self): + """ integrate the thread with which we are called as a primary + thread to dispatch to when spawn is called. + """ + assert self.execmodel.backend == "thread", self.execmodel + # XXX insert check if we really are in the main thread + primary_thread_event = self._primary_thread_event + # interacts with code at REF1 + while not self._shutdown_event.isSet(): + primary_thread_event.wait() + func, args, kwargs = self._primary_thread_task + if func is None: # waitall() woke us up to finish the loop + break + func(*args, **kwargs) + primary_thread_event.clear() + + def shutdown(self): + self._shutdown_event.set() + + def wait_for_shutdown(self, timeout=None): + return self._shutdown_event.wait(timeout=timeout) + + def active_count(self): + return len(self._running) + + def spawn(self, func, *args, **kwargs): + """ return Reply object for the asynchronous dispatch + of the given func(*args, **kwargs). 
+ """ + reply = Reply((func, args, kwargs), self.execmodel) + def run_and_release(): + reply.run() + with self._running_lock: + self._running.remove(reply) + self._sem.release() + if not self._running: + try: + self._waitall_event.set() + except AttributeError: + pass + self._sem.acquire() + with self._running_lock: + self._running.add(reply) + # REF1 in 'thread' model we give priority to running in main thread + primary_thread_event = getattr(self, "_primary_thread_event", None) + if primary_thread_event is not None: + if not primary_thread_event.isSet(): + self._primary_thread_task = run_and_release, (), {} + primary_thread_event.set() + return reply + self.execmodel.start(run_and_release, ()) + return reply + + def waitall(self, timeout=None): + """ wait until all previosuly spawns have terminated. """ + # if it exists signal primary_thread to finish its loop + self._primary_thread_task = None, None, None + try: + self._primary_thread_event.set() + except AttributeError: + pass + with self._running_lock: + if not self._running: + return True + # if a Reply still runs, we let run_and_release + # signal us -- note that we are still holding the + # _running_lock to avoid race conditions + self._waitall_event = self.execmodel.Event() + return self._waitall_event.wait(timeout=timeout) + + +sysex = (KeyboardInterrupt, SystemExit) + + +DEBUG = os.environ.get('EXECNET_DEBUG') +pid = os.getpid() +if DEBUG == '2': + def trace(*msg): + try: + line = " ".join(map(str, msg)) + sys.stderr.write("[%s] %s\n" % (pid, line)) + sys.stderr.flush() + except Exception: + pass # nothing we can do, likely interpreter-shutdown +elif DEBUG: + import tempfile, os.path + fn = os.path.join(tempfile.gettempdir(), 'execnet-debug-%d' % pid) + #sys.stderr.write("execnet-debug at %r" %(fn,)) + debugfile = open(fn, 'w') + def trace(*msg): + try: + line = " ".join(map(str, msg)) + debugfile.write(line + "\n") + debugfile.flush() + except Exception: + try: + v = sys.exc_info()[1] + 
sys.stderr.write( + "[%s] exception during tracing: %r\n" % (pid, v)) + except Exception: + pass # nothing we can do, likely interpreter-shutdown +else: + notrace = trace = lambda *msg: None + +class Popen2IO: + error = (IOError, OSError, EOFError) + + def __init__(self, outfile, infile, execmodel): + # we need raw byte streams + self.outfile, self.infile = outfile, infile + if sys.platform == "win32": + import msvcrt + try: + msvcrt.setmode(infile.fileno(), os.O_BINARY) + msvcrt.setmode(outfile.fileno(), os.O_BINARY) + except (AttributeError, IOError): + pass + self._read = getattr(infile, "buffer", infile).read + self._write = getattr(outfile, "buffer", outfile).write + self.execmodel = execmodel + + def read(self, numbytes): + """Read exactly 'numbytes' bytes from the pipe. """ + # a file in non-blocking mode may return less bytes, so we loop + buf = bytes() + while numbytes > len(buf): + data = self._read(numbytes-len(buf)) + if not data: + raise EOFError("expected %d bytes, got %d" %(numbytes, len(buf))) + buf += data + return buf + + def write(self, data): + """write out all data bytes. """ + assert isinstance(data, bytes) + self._write(data) + self.outfile.flush() + + def close_read(self): + self.infile.close() + + def close_write(self): + self.outfile.close() + +class Message: + """ encapsulates Messages and their wire protocol. 
""" + _types = [] + + def __init__(self, msgcode, channelid=0, data=''): + self.msgcode = msgcode + self.channelid = channelid + self.data = data + + @staticmethod + def from_io(io): + try: + header = io.read(9) # type 1, channel 4, payload 4 + if not header: + raise EOFError("empty read") + except EOFError: + e = sys.exc_info()[1] + raise EOFError('couldnt load message header, ' + e.args[0]) + msgtype, channel, payload = struct.unpack('!bii', header) + return Message(msgtype, channel, io.read(payload)) + + def to_io(self, io): + header = struct.pack('!bii', self.msgcode, self.channelid, + len(self.data)) + io.write(header+self.data) + + def received(self, gateway): + self._types[self.msgcode](self, gateway) + + def __repr__(self): + class FakeChannel(object): + _strconfig = False, False # never transform, never fail + def __init__(self, id): + self.id = id + def __repr__(self): + return '' % self.id + FakeChannel.new = FakeChannel + FakeChannel.gateway = FakeChannel + name = self._types[self.msgcode].__name__.upper() + try: + data = loads_internal(self.data, FakeChannel) + except LoadError: + data = self.data + r = repr(data) + if len(r) > 90: + return "" %(name, + self.channelid, len(r)) + else: + return "" %(name, + self.channelid, r) + +def _setupmessages(): + def status(message, gateway): + # we use the channelid to send back information + # but don't instantiate a channel object + d = {'numchannels': len(gateway._channelfactory._channels), + 'numexecuting': gateway._execpool.active_count(), + 'execmodel': gateway.execmodel.backend, + } + gateway._send(Message.CHANNEL_DATA, message.channelid, dumps_internal(d)) + gateway._send(Message.CHANNEL_CLOSE, message.channelid) + + def channel_exec(message, gateway): + channel = gateway._channelfactory.new(message.channelid) + gateway._local_schedulexec(channel=channel,sourcetask=message.data) + + def channel_data(message, gateway): + gateway._channelfactory._local_receive(message.channelid, message.data) + + def 
channel_close(message, gateway): + gateway._channelfactory._local_close(message.channelid) + + def channel_close_error(message, gateway): + remote_error = RemoteError(loads_internal(message.data)) + gateway._channelfactory._local_close(message.channelid, remote_error) + + def channel_last_message(message, gateway): + gateway._channelfactory._local_close(message.channelid, sendonly=True) + + def gateway_terminate(message, gateway): + # wake up and terminate any execution waiting to receive something + gateway._channelfactory._finished_receiving() + # then try harder to terminate execution + gateway._terminate_execution() + + def reconfigure(message, gateway): + if message.channelid == 0: + target = gateway + else: + target = gateway._channelfactory.new(message.channelid) + target._strconfig = loads_internal(message.data, gateway) + + types = [ + status, reconfigure, gateway_terminate, + channel_exec, channel_data, channel_close, + channel_close_error, channel_last_message, + ] + for i, handler in enumerate(types): + Message._types.append(handler) + setattr(Message, handler.__name__.upper(), i) + +_setupmessages() + +def geterrortext(excinfo, + format_exception=traceback.format_exception, sysex=sysex): + try: + l = format_exception(*excinfo) + errortext = "".join(l) + except sysex: + raise + except: + errortext = '%s: %s' % (excinfo[0].__name__, + excinfo[1]) + return errortext + +class RemoteError(Exception): + """ Exception containing a stringified error from the other side. """ + def __init__(self, formatted): + self.formatted = formatted + Exception.__init__(self) + + def __str__(self): + return self.formatted + + def __repr__(self): + return "%s: %s" %(self.__class__.__name__, self.formatted) + + def warn(self): + if self.formatted != INTERRUPT_TEXT: + # XXX do this better + sys.stderr.write("[%s] Warning: unhandled %r\n" + % (os.getpid(), self,)) + +class TimeoutError(IOError): + """ Exception indicating that a timeout was reached. 
""" + + +NO_ENDMARKER_WANTED = object() + +class Channel(object): + """Communication channel between two Python Interpreter execution points.""" + RemoteError = RemoteError + TimeoutError = TimeoutError + Message = Message + _INTERNALWAKEUP = 1000 + _executing = False + + def __init__(self, gateway, id): + assert isinstance(id, int) + self.gateway = gateway + #XXX: defaults copied from Unserializer + self._strconfig = getattr(gateway, '_strconfig', (True, False)) + self.id = id + self._items = self.gateway.execmodel.queue.Queue() + self._closed = False + self._receiveclosed = self.gateway.execmodel.Event() + self._remoteerrors = [] + + def _trace(self, *msg): + self.gateway._trace(self.id, *msg) + + def setcallback(self, callback, endmarker=NO_ENDMARKER_WANTED): + """ set a callback function for receiving items. + + All already queued items will immediately trigger the callback. + Afterwards the callback will execute in the receiver thread + for each received data item and calls to ``receive()`` will + raise an error. + If an endmarker is specified the callback will eventually + be called with the endmarker when the channel closes. 
+ """ + _callbacks = self.gateway._channelfactory._callbacks + _receivelock = self.gateway._receivelock + _receivelock.acquire() + try: + if self._items is None: + raise IOError("%r has callback already registered" %(self,)) + items = self._items + self._items = None + while 1: + try: + olditem = items.get(block=False) + except self.gateway.execmodel.queue.Empty: + if not (self._closed or self._receiveclosed.isSet()): + _callbacks[self.id] = ( + callback, + endmarker, + self._strconfig, + ) + break + else: + if olditem is ENDMARKER: + items.put(olditem) # for other receivers + if endmarker is not NO_ENDMARKER_WANTED: + callback(endmarker) + break + else: + callback(olditem) + finally: + _receivelock.release() + + def __repr__(self): + flag = self.isclosed() and "closed" or "open" + return "" % (self.id, flag) + + def __del__(self): + if self.gateway is None: # can be None in tests + return + self._trace("channel.__del__") + # no multithreading issues here, because we have the last ref to 'self' + if self._closed: + # state transition "closed" --> "deleted" + for error in self._remoteerrors: + error.warn() + elif self._receiveclosed.isSet(): + # state transition "sendonly" --> "deleted" + # the remote channel is already in "deleted" state, nothing to do + pass + else: + # state transition "opened" --> "deleted" + if self._items is None: # has_callback + msgcode = self.Message.CHANNEL_LAST_MESSAGE + else: + msgcode = self.Message.CHANNEL_CLOSE + try: + self.gateway._send(msgcode, self.id) + except (IOError, ValueError): # ignore problems with sending + pass + + def _getremoteerror(self): + try: + return self._remoteerrors.pop(0) + except IndexError: + try: + return self.gateway._error + except AttributeError: + pass + return None + + # + # public API for channel objects + # + def isclosed(self): + """ return True if the channel is closed. A closed + channel may still hold items. 
+ """ + return self._closed + + def makefile(self, mode='w', proxyclose=False): + """ return a file-like object. + mode can be 'w' or 'r' for writeable/readable files. + if proxyclose is true file.close() will also close the channel. + """ + if mode == "w": + return ChannelFileWrite(channel=self, proxyclose=proxyclose) + elif mode == "r": + return ChannelFileRead(channel=self, proxyclose=proxyclose) + raise ValueError("mode %r not availabe" %(mode,)) + + def close(self, error=None): + """ close down this channel with an optional error message. + Note that closing of a channel tied to remote_exec happens + automatically at the end of execution and cannot + be done explicitely. + """ + if self._executing: + raise IOError("cannot explicitly close channel within remote_exec") + if self._closed: + self.gateway._trace(self, "ignoring redundant call to close()") + if not self._closed: + # state transition "opened/sendonly" --> "closed" + # threads warning: the channel might be closed under our feet, + # but it's never damaging to send too many CHANNEL_CLOSE messages + # however, if the other side triggered a close already, we + # do not send back a closed message. + if not self._receiveclosed.isSet(): + put = self.gateway._send + if error is not None: + put(self.Message.CHANNEL_CLOSE_ERROR, self.id, dumps_internal(error)) + else: + put(self.Message.CHANNEL_CLOSE, self.id) + self._trace("sent channel close message") + if isinstance(error, RemoteError): + self._remoteerrors.append(error) + self._closed = True # --> "closed" + self._receiveclosed.set() + queue = self._items + if queue is not None: + queue.put(ENDMARKER) + self.gateway._channelfactory._no_longer_opened(self.id) + + def waitclose(self, timeout=None): + """ wait until this channel is closed (or the remote side + otherwise signalled that no more data was being sent). + The channel may still hold receiveable items, but not receive + any more after waitclose() has returned. 
Exceptions from executing + code on the other side are reraised as local channel.RemoteErrors. + EOFError is raised if the reading-connection was prematurely closed, + which often indicates a dying process. + self.TimeoutError is raised after the specified number of seconds + (default is None, i.e. wait indefinitely). + """ + self._receiveclosed.wait(timeout=timeout) # wait for non-"opened" state + if not self._receiveclosed.isSet(): + raise self.TimeoutError("Timeout after %r seconds" % timeout) + error = self._getremoteerror() + if error: + raise error + + def send(self, item): + """sends the given item to the other side of the channel, + possibly blocking if the sender queue is full. + The item must be a simple python type and will be + copied to the other side by value. IOError is + raised if the write pipe was prematurely closed. + """ + if self.isclosed(): + raise IOError("cannot send to %r" %(self,)) + self.gateway._send(self.Message.CHANNEL_DATA, self.id, dumps_internal(item)) + + def receive(self, timeout=None): + """receive a data item that was sent from the other side. + timeout: None [default] blocked waiting. A positive number + indicates the number of seconds after which a channel.TimeoutError + exception will be raised if no item was received. + Note that exceptions from the remotely executing code will be + reraised as channel.RemoteError exceptions containing + a textual representation of the remote traceback. 
+ """ + itemqueue = self._items + if itemqueue is None: + raise IOError("cannot receive(), channel has receiver callback") + try: + x = itemqueue.get(timeout=timeout) + except self.gateway.execmodel.queue.Empty: + raise self.TimeoutError("no item after %r seconds" %(timeout)) + if x is ENDMARKER: + itemqueue.put(x) # for other receivers + raise self._getremoteerror() or EOFError() + else: + return x + + def __iter__(self): + return self + + def next(self): + try: + return self.receive() + except EOFError: + raise StopIteration + __next__ = next + + + def reconfigure(self, py2str_as_py3str=True, py3str_as_py2str=False): + """ + set the string coercion for this channel + the default is to try to convert py2 str as py3 str, + but not to try and convert py3 str to py2 str + """ + self._strconfig = (py2str_as_py3str, py3str_as_py2str) + data = dumps_internal(self._strconfig) + self.gateway._send(self.Message.RECONFIGURE, self.id, data=data) + +ENDMARKER = object() +INTERRUPT_TEXT = "keyboard-interrupted" + +class ChannelFactory(object): + def __init__(self, gateway, startcount=1): + self._channels = weakref.WeakValueDictionary() + self._callbacks = {} + self._writelock = gateway.execmodel.Lock() + self.gateway = gateway + self.count = startcount + self.finished = False + self._list = list # needed during interp-shutdown + + def new(self, id=None): + """ create a new Channel with 'id' (or create new id if None). 
""" + self._writelock.acquire() + try: + if self.finished: + raise IOError("connexion already closed: %s" % (self.gateway,)) + if id is None: + id = self.count + self.count += 2 + try: + channel = self._channels[id] + except KeyError: + channel = self._channels[id] = Channel(self.gateway, id) + return channel + finally: + self._writelock.release() + + def channels(self): + return self._list(self._channels.values()) + + # + # internal methods, called from the receiver thread + # + def _no_longer_opened(self, id): + try: + del self._channels[id] + except KeyError: + pass + try: + callback, endmarker, strconfig = self._callbacks.pop(id) + except KeyError: + pass + else: + if endmarker is not NO_ENDMARKER_WANTED: + callback(endmarker) + + def _local_close(self, id, remoteerror=None, sendonly=False): + channel = self._channels.get(id) + if channel is None: + # channel already in "deleted" state + if remoteerror: + remoteerror.warn() + self._no_longer_opened(id) + else: + # state transition to "closed" state + if remoteerror: + channel._remoteerrors.append(remoteerror) + queue = channel._items + if queue is not None: + queue.put(ENDMARKER) + self._no_longer_opened(id) + if not sendonly: # otherwise #--> "sendonly" + channel._closed = True # --> "closed" + channel._receiveclosed.set() + + def _local_receive(self, id, data): + # executes in receiver thread + try: + callback, endmarker, strconfig= self._callbacks[id] + channel = self._channels.get(id) + except KeyError: + channel = self._channels.get(id) + queue = channel and channel._items + if queue is None: + pass # drop data + else: + item = loads_internal(data, channel) + queue.put(item) + else: + try: + data = loads_internal(data, channel, strconfig) + callback(data) # even if channel may be already closed + except KeyboardInterrupt: + raise + except: + excinfo = sys.exc_info() + self.gateway._trace("exception during callback: %s" % excinfo[1]) + errortext = self.gateway._geterrortext(excinfo) + 
self.gateway._send(Message.CHANNEL_CLOSE_ERROR, id, dumps_internal(errortext)) + self._local_close(id, errortext) + + def _finished_receiving(self): + self.gateway._trace("finished receiving") + self._writelock.acquire() + try: + self.finished = True + finally: + self._writelock.release() + for id in self._list(self._channels): + self._local_close(id, sendonly=True) + for id in self._list(self._callbacks): + self._no_longer_opened(id) + +class ChannelFile(object): + def __init__(self, channel, proxyclose=True): + self.channel = channel + self._proxyclose = proxyclose + + def isatty(self): + return False + + def close(self): + if self._proxyclose: + self.channel.close() + + def __repr__(self): + state = self.channel.isclosed() and 'closed' or 'open' + return '' %(self.channel.id, state) + +class ChannelFileWrite(ChannelFile): + def write(self, out): + self.channel.send(out) + + def flush(self): + pass + +class ChannelFileRead(ChannelFile): + def __init__(self, channel, proxyclose=True): + super(ChannelFileRead, self).__init__(channel, proxyclose) + self._buffer = None + + def read(self, n): + try: + if self._buffer is None: + self._buffer = self.channel.receive() + while len(self._buffer) < n: + self._buffer += self.channel.receive() + except EOFError: + self.close() + if self._buffer is None: + ret = "" + else: + ret = self._buffer[:n] + self._buffer = self._buffer[n:] + return ret + + def readline(self): + if self._buffer is not None: + i = self._buffer.find("\n") + if i != -1: + return self.read(i+1) + line = self.read(len(self._buffer)+1) + else: + line = self.read(1) + while line and line[-1] != "\n": + c = self.read(1) + if not c: + break + line += c + return line + +class BaseGateway(object): + exc_info = sys.exc_info + _sysex = sysex + id = "" + + def __init__(self, io, id, _startcount=2): + self.execmodel = io.execmodel + self._io = io + self.id = id + self._strconfig = (Unserializer.py2str_as_py3str, + Unserializer.py3str_as_py2str) + self._channelfactory 
= ChannelFactory(self, _startcount) + self._receivelock = self.execmodel.RLock() + # globals may be NONE at process-termination + self.__trace = trace + self._geterrortext = geterrortext + self._workerpool = self.execmodel.WorkerPool(1) + + def _trace(self, *msg): + self.__trace(self.id, *msg) + + def _initreceive(self): + self._receiverthread = self._workerpool.spawn(self._thread_receiver) + + def _thread_receiver(self): + def log(*msg): + self._trace("[receiver-thread]", *msg) + log("RECEIVERTHREAD: starting to run") + eof = False + io = self._io + try: + try: + while 1: + msg = Message.from_io(io) + log("received", msg) + _receivelock = self._receivelock + _receivelock.acquire() + try: + msg.received(self) + del msg + finally: + _receivelock.release() + except self._sysex: + log("io.close_read()") + self._io.close_read() + except EOFError: + log("got EOF") + #log("traceback was: ", self._geterrortext(self.exc_info())) + self._error = self.exc_info()[1] + eof = True + except: + log(self._geterrortext(self.exc_info())) + finally: + try: + log('entering finalization') + # wake up any execution waiting to receive something + self._channelfactory._finished_receiving() + if eof: + self._terminate_execution() + log('leaving finalization') + except: + pass # XXX be silent at interp-shutdown + + def _terminate_execution(self): + pass + + def _send(self, msgcode, channelid=0, data=bytes()): + message = Message(msgcode, channelid, data) + try: + message.to_io(self._io) + self._trace('sent', message) + except (IOError, ValueError): + e = sys.exc_info()[1] + self._trace('failed to send', message, e) + raise + + + def _local_schedulexec(self, channel, sourcetask): + channel.close("execution disallowed") + + # _____________________________________________________________________ + # + # High Level Interface + # _____________________________________________________________________ + # + def newchannel(self): + """ return a new independent channel. 
""" + return self._channelfactory.new() + + def join(self, timeout=None): + """ Wait for receiverthread to terminate. """ + self._trace("waiting for receiver thread to finish") + self._receiverthread.waitfinish() + +class SlaveGateway(BaseGateway): + + def _local_schedulexec(self, channel, sourcetask): + sourcetask = loads_internal(sourcetask) + self._execpool.spawn(self.executetask, ((channel, sourcetask))) + + def _terminate_execution(self): + # called from receiverthread + self._trace("shutting down execution pool") + self._execpool.shutdown() + if not self._execpool.waitall(1.0): + self._trace("execution ongoing, trying interrupt_main") + # We try hard to terminate execution based on the assumption + # that there is only one gateway object running per-process. + if sys.platform != "win32": + self._trace("sending ourselves a SIGINT") + os.kill(os.getpid(), 2) # send ourselves a SIGINT + elif interrupt_main is not None: + self._trace("calling interrupt_main()") + interrupt_main() + if not self._execpool.waitall(10.0): + self._trace("execution did not finish in 10 secs, " + "calling os._exit()") + os._exit(1) + + def serve(self): + trace = lambda msg: self._trace("[serve] " + msg) + hasprimary = self.execmodel.backend == "thread" + self._execpool = self.execmodel.WorkerPool(hasprimary=hasprimary) + trace("spawning receiver thread") + self._initreceive() + try: + try: + if hasprimary: + trace("integrating as main primary exec thread") + self._execpool.integrate_as_primary_thread() + else: + trace("waiting for execution to finish") + self._execpool.wait_for_shutdown() + finally: + trace("execution finished, closing io write stream") + self._io.close_write() + trace("joining receiver thread") + self.join() + except KeyboardInterrupt: + # in the slave we can't really do anything sensible + trace("swallowing keyboardinterrupt, serve finished") + + def executetask(self, item): + try: + channel, (source, call_name, kwargs) = item + if not ISPY3 and kwargs: + # some 
class Unserializer(object):
    """Rebuild a Python object from the opcode stream written by the serializer.

    ``load()`` reads one-byte opcodes from ``stream``, looks each one up in
    ``num2func`` and calls the matching ``load_*`` method.  Those methods
    push onto (or rewrite) ``self.stack`` until the STOP opcode is seen.

    NOTE: the opcode table is built from the sorted names of the
    ``load_*`` methods (see ``_buildopcodes``).  Helper methods therefore
    must not use the ``load_`` prefix, and ``load_*`` methods must not be
    added or renamed without changing the wire protocol.
    """
    num2func = {} # is filled after this class definition
    py2str_as_py3str = True # True
    py3str_as_py2str = False # false means py2 will get unicode

    def __init__(self, stream, channel_or_gateway=None, strconfig=None):
        """Prepare to read from ``stream``.

        ``channel_or_gateway`` may be a channel (its ``gateway`` and
        ``_strconfig`` attributes are used), a gateway (its
        ``_channelfactory`` is used), a channel factory, or None.
        ``strconfig`` is a ``(py2str_as_py3str, py3str_as_py2str)`` pair;
        a ``_strconfig`` attribute on ``channel_or_gateway`` takes precedence.
        """
        gateway = getattr(channel_or_gateway, 'gateway', channel_or_gateway)
        strconfig = getattr(channel_or_gateway, '_strconfig', strconfig)
        if strconfig:
            self.py2str_as_py3str, self.py3str_as_py2str = strconfig
        self.stream = stream
        self.channelfactory = getattr(gateway, '_channelfactory', gateway)

    def load(self, versioned=False):
        """Read opcodes until STOP and return the single resulting object.

        If ``versioned`` is true the stream must start with
        ``DUMPFORMAT_VERSION``.  Raises ``LoadError`` for a wrong version,
        an unknown opcode, truncated or inconsistent data, and ``EOFError``
        if the stream ends where an opcode is expected.
        """
        if versioned:
            ver = self.stream.read(1)
            if ver != DUMPFORMAT_VERSION:
                raise LoadError("wrong dumpformat version")
        self.stack = []
        try:
            while True:
                opcode = self.stream.read(1)
                if not opcode:
                    raise EOFError
                try:
                    loader = self.num2func[opcode]
                except KeyError:
                    raise LoadError("unkown opcode %r - "
                        "wire protocol corruption?" % (opcode,))
                loader(self)
        except _Stop:
            if len(self.stack) != 1:
                raise LoadError("internal unserialization error")
            return self.stack.pop(0)
        else:
            raise LoadError("didn't get STOP")

    def load_none(self):
        self.stack.append(None)

    def load_true(self):
        self.stack.append(True)

    def load_false(self):
        self.stack.append(False)

    def load_int(self):
        i = self._read_int4()
        self.stack.append(i)

    def load_longint(self):
        s = self._read_byte_string()
        self.stack.append(int(s))

    if ISPY3:
        load_long = load_int
        load_longlong = load_longint
    else:
        def load_long(self):
            i = self._read_int4()
            self.stack.append(long(i))

        def load_longlong(self):
            l = self._read_byte_string()
            self.stack.append(long(l))

    def load_float(self):
        binary = self._read_exact(FLOAT_FORMAT_SIZE)
        self.stack.append(struct.unpack(FLOAT_FORMAT, binary)[0])

    def _read_exact(self, nbytes):
        """Read exactly ``nbytes`` bytes or raise ``LoadError``.

        Without this check a truncated stream would surface as a
        ``struct.error`` or, for strings, as silently shortened data.
        """
        data = self.stream.read(nbytes)
        if len(data) != nbytes:
            raise LoadError("unexpected end of data: wanted %d bytes, got %d"
                            % (nbytes, len(data)))
        return data

    def _read_int4(self):
        return struct.unpack("!i", self._read_exact(4))[0]

    def _read_byte_string(self):
        length = self._read_int4()
        if length < 0:
            # a negative size would make stream.read() return everything
            raise LoadError("negative string length %d" % (length,))
        return self._read_exact(length)

    def load_py3string(self):
        as_bytes = self._read_byte_string()
        if not ISPY3 and self.py3str_as_py2str:
            # XXX Should we try to decode into latin-1?
            self.stack.append(as_bytes)
        else:
            self.stack.append(as_bytes.decode("utf-8"))

    def load_py2string(self):
        as_bytes = self._read_byte_string()
        if ISPY3 and self.py2str_as_py3str:
            s = as_bytes.decode("latin-1")
        else:
            s = as_bytes
        self.stack.append(s)

    def load_bytes(self):
        s = self._read_byte_string()
        self.stack.append(s)

    def load_unicode(self):
        self.stack.append(self._read_byte_string().decode("utf-8"))

    def load_newlist(self):
        length = self._read_int4()
        if length < 0:
            raise LoadError("negative list length %d" % (length,))
        # pre-sized list; the following SETITEM opcodes fill it by index
        self.stack.append([None] * length)

    def load_setitem(self):
        if len(self.stack) < 3:
            raise LoadError("not enough items for setitem")
        value = self.stack.pop()
        key = self.stack.pop()
        self.stack[-1][key] = value

    def load_newdict(self):
        self.stack.append({})

    def _load_collection(self, type_):
        """Replace the top ``length`` stack items by one ``type_`` collection."""
        length = self._read_int4()
        if length < 0 or length > len(self.stack):
            # slicing would otherwise quietly build a wrong-sized collection
            raise LoadError("invalid collection length %d for %d stacked items"
                            % (length, len(self.stack)))
        if length:
            res = type_(self.stack[-length:])
            del self.stack[-length:]
            self.stack.append(res)
        else:
            self.stack.append(type_())

    def load_buildtuple(self):
        self._load_collection(tuple)

    def load_set(self):
        self._load_collection(set)

    def load_frozenset(self):
        self._load_collection(frozenset)

    def load_stop(self):
        raise _Stop

    def load_channel(self):
        channel_id = self._read_int4()
        newchannel = self.channelfactory.new(channel_id)
        self.stack.append(newchannel)
def loads(bytestring, py2str_as_py3str=False, py3str_as_py2str=False):
    """ deserialize and return the object stored in the given bytestring.

    py2str_as_py3str: if true, ``str`` objects that were dumped on
                      Python2 are loaded as Python3 (text) strings.
    py3str_as_py2str: if true, ``str`` objects that were dumped on
                      Python3 are loaded as Python2 strings rather
                      than unicode objects.

    ``execnet.DataFormatError`` is raised if the bytestring was dumped
    with an incompatible protocol version or is corrupted.
    """
    return load(
        BytesIO(bytestring),
        py2str_as_py3str=py2str_as_py3str,
        py3str_as_py2str=py3str_as_py2str,
    )
class _Serializer(object):
    """Write builtin Python objects as execnet's opcode stream.

    Each supported type has a ``save_<typename>`` method; ``_save`` finds
    it by the type's ``__name__`` and caches the lookup in ``_dispatch``.
    Output goes to the ``write`` callable given to the constructor, or is
    collected and returned by ``save()`` when no callable was given.
    """
    # cache: type -> unbound save_* function (shared by all instances)
    _dispatch = {}
    # smallest value that struct's "!i" format can pack
    _INT4_MIN = -FOUR_BYTE_INT_MAX - 1

    def __init__(self, write=None):
        if write is None:
            # no sink given: collect the chunks so save() can join them
            self._streamlist = []
            write = self._streamlist.append
        self._write = write

    def save(self, obj, versioned=False):
        """Serialize ``obj``, optionally prefixed by the dump format version.

        Returns the joined output if the instance collects its own chunks,
        or None if a ``write`` callable was passed to the constructor.
        Raises ``DumpError`` if ``obj`` contains an unsupported type.
        """
        # calling here is not re-entrant but multiple instances
        # may write to the same stream because of the common platform
        # atomic-write guarantee (concurrent writes each happen atomically)
        if versioned:
            self._write(DUMPFORMAT_VERSION)
        self._save(obj)
        self._write(opcode.STOP)
        try:
            streamlist = self._streamlist
        except AttributeError:
            return None
        # join with an empty value of the chunk type (bytes on py3, str on py2)
        return type(streamlist[0])().join(streamlist)

    def _save(self, obj):
        tp = type(obj)
        try:
            dispatch = self._dispatch[tp]
        except KeyError:
            methodname = 'save_' + tp.__name__
            meth = getattr(self.__class__, methodname, None)
            if meth is None:
                raise DumpError("can't serialize %s" % (tp,))
            dispatch = self._dispatch[tp] = meth
        dispatch(self, obj)

    def save_NoneType(self, non):
        self._write(opcode.NONE)

    def save_bool(self, boolean):
        if boolean:
            self._write(opcode.TRUE)
        else:
            self._write(opcode.FALSE)

    def save_bytes(self, bytes_):
        self._write(opcode.BYTES)
        self._write_byte_sequence(bytes_)

    if ISPY3:
        def save_str(self, s):
            self._write(opcode.PY3STRING)
            self._write_unicode_string(s)
    else:
        def save_str(self, s):
            self._write(opcode.PY2STRING)
            self._write_byte_sequence(s)

        def save_unicode(self, s):
            self._write(opcode.UNICODE)
            self._write_unicode_string(s)

    def _write_unicode_string(self, s):
        try:
            as_bytes = s.encode("utf-8")
        except UnicodeEncodeError:
            raise DumpError("strings must be utf-8 encodable")
        self._write_byte_sequence(as_bytes)

    def _write_byte_sequence(self, bytes_):
        self._write_int4(len(bytes_), "string is too long")
        self._write(bytes_)

    def _save_integral(self, i, short_op, long_op):
        """Write ``i`` as a 4-byte int if it fits, else as a decimal string.

        Values below the signed 32-bit minimum used to reach
        ``struct.pack`` and fail with ``struct.error``; they now take the
        same decimal-string path as values above the maximum.
        """
        if self._INT4_MIN <= i <= FOUR_BYTE_INT_MAX:
            self._write(short_op)
            self._write_int4(i)
        else:
            self._write(long_op)
            # rstrip("L") removes the suffix py2 adds to the str() of a long
            self._write_byte_sequence(str(i).rstrip("L").encode("ascii"))

    def save_int(self, i):
        self._save_integral(i, opcode.INT, opcode.LONGINT)

    def save_long(self, l):
        self._save_integral(l, opcode.LONG, opcode.LONGLONG)

    def save_float(self, flt):
        self._write(opcode.FLOAT)
        self._write(struct.pack(FLOAT_FORMAT, flt))

    def _write_int4(self, i, error="int must be less than %i" %
                    (FOUR_BYTE_INT_MAX,)):
        # reject both directions so callers get DumpError, not struct.error
        if i > FOUR_BYTE_INT_MAX or i < self._INT4_MIN:
            raise DumpError(error)
        self._write(struct.pack("!i", i))

    def save_list(self, L):
        self._write(opcode.NEWLIST)
        self._write_int4(len(L), "list is too long")
        for i, item in enumerate(L):
            self._write_setitem(i, item)

    def _write_setitem(self, key, value):
        self._save(key)
        self._save(value)
        self._write(opcode.SETITEM)

    def save_dict(self, d):
        self._write(opcode.NEWDICT)
        for key, value in d.items():
            self._write_setitem(key, value)

    def save_tuple(self, tup):
        # items first, then the opcode and count that collect them
        for item in tup:
            self._save(item)
        self._write(opcode.BUILDTUPLE)
        self._write_int4(len(tup), "tuple is too long")

    def _write_set(self, s, op):
        for item in s:
            self._save(item)
        self._write(op)
        self._write_int4(len(s), "set is too long")

    def save_set(self, s):
        self._write_set(s, opcode.SET)

    def save_frozenset(self, s):
        self._write_set(s, opcode.FROZENSET)

    def save_Channel(self, channel):
        self._write(opcode.CHANNEL)
        self._write_int4(channel.id)
def serve(io, id):
    """ create a SlaveGateway on the given io object and run its serve(). """
    trace("creating slavegateway on %r" % (io,))
    slave = SlaveGateway(io=io, id=id, _startcount=2)
    slave.serve()
- "io = SocketIO(clientsock)", + "try: execmodel", + "except NameError:", + " execmodel = get_execmodel('thread')", + "io = SocketIO(clientsock, execmodel)", "io.write('1'.encode('ascii'))", "serve(io, id='%s-slave')" % id, ) diff --git a/remoto/lib/execnet/gateway_io.py b/remoto/lib/execnet/gateway_io.py index 242ac9e..cc64d84 100644 --- a/remoto/lib/execnet/gateway_io.py +++ b/remoto/lib/execnet/gateway_io.py @@ -5,7 +5,6 @@ creates io instances used for gateway io """ import os import sys -from subprocess import Popen, PIPE try: from execnet.gateway_base import Popen2IO, Message @@ -13,9 +12,9 @@ except ImportError: from __main__ import Popen2IO, Message class Popen2IOMaster(Popen2IO): - def __init__(self, args): - self.popen = p = Popen(args, stdin=PIPE, stdout=PIPE) - Popen2IO.__init__(self, p.stdin, p.stdout) + def __init__(self, args, execmodel): + self.popen = p = execmodel.PopenPiped(args) + Popen2IO.__init__(self, p.stdin, p.stdout, execmodel=execmodel) def wait(self): try: @@ -86,32 +85,47 @@ def ssh_args(spec): args.append(remotecmd) return args - -def create_io(spec): +def create_io(spec, execmodel): if spec.popen: args = popen_args(spec) - return Popen2IOMaster(args) + return Popen2IOMaster(args, execmodel) if spec.ssh: args = ssh_args(spec) - io = Popen2IOMaster(args) + io = Popen2IOMaster(args, execmodel) io.remoteaddress = spec.ssh return io +# +# Proxy Gateway handling code +# +# master: proxy initiator +# forwarder: forwards between master and sub +# sub: sub process that is proxied to the initiator + RIO_KILL = 1 RIO_WAIT = 2 RIO_REMOTEADDRESS = 3 RIO_CLOSE_WRITE = 4 -class RemoteIO(object): - def __init__(self, master_channel): - self.iochan = master_channel.gateway.newchannel() - self.controlchan = master_channel.gateway.newchannel() - master_channel.send((self.iochan, self.controlchan)) - self.io = self.iochan.makefile('r') - +class ProxyIO(object): + """ A Proxy IO object allows to instantiate a Gateway + through another "via" gateway. 
def serve_proxy_io(proxy_channelX):
    """ Forward IO between the proxy initiator and a newly created sub IO.

    Receives the spec variables and then a control channel over
    ``proxy_channelX``, creates the sub IO object with ``create_io`` and
    installs two callbacks: data arriving on ``proxy_channelX`` is written
    to the sub, and control codes (``RIO_*``) arriving on the control
    channel are answered with the result of the matching sub IO call.
    Afterwards the bootstrap byte and all following messages read from
    the sub are written back through ``proxy_channelX`` until the sub
    signals EOF.
    """
    execmodel = proxy_channelX.gateway.execmodel
    _trace = proxy_channelX.gateway._trace
    tag = "serve_proxy_io:%s " % proxy_channelX.id
    def log(*msg):
        # prefix the first trace argument with this proxy's tag
        _trace(tag + msg[0], *msg[1:])
    # first item on the channel: the spec variables
    spec = PseudoSpec(proxy_channelX.receive())
    # create sub IO object which we will proxy back to our proxy initiator
    sub_io = create_io(spec, execmodel)
    # second item on the channel: the control channel
    control_chan = proxy_channelX.receive()
    log("got control chan", control_chan)

    # read data from master, forward it to the sub
    # XXX writing might block, thus blocking the receiver thread
    def forward_to_sub(data):
        log("forward data to sub, size %s" % len(data))
        sub_io.write(data)
    proxy_channelX.setcallback(forward_to_sub)

    def controll(data):
        # answer each known control code with the result of the
        # corresponding sub IO operation; other values are ignored
        if data==RIO_WAIT:
            control_chan.send(sub_io.wait())
        elif data==RIO_KILL:
            control_chan.send(sub_io.kill())
        elif data==RIO_REMOTEADDRESS:
            control_chan.send(sub_io.remoteaddress)
        elif data==RIO_CLOSE_WRITE:
            control_chan.send(sub_io.close_write())
    control_chan.setcallback(controll)

    # write data to the master coming from the sub
    forward_to_master_file = proxy_channelX.makefile("w")

    # read bootstrap byte from sub, send it on to master
    log('reading bootstrap byte from sub', spec.id)
    initial = sub_io.read(1)
    assert initial == '1'.encode('ascii'), initial
    log('forwarding bootstrap byte from sub', spec.id)
    forward_to_master_file.write(initial)

    # enter message forwarding loop
    while True:
        try:
            message = Message.from_io(sub_io)
        except EOFError:
            log('EOF from sub, terminating proxying loop', spec.id)
            break
        message.to_io(forward_to_master_file)
    # proxy_channelX will be closed from remote_exec's finalization code
execnet.gateway import Gateway from execnet.gateway_bootstrap import HostNotFound -import os, sys, inspect - +import sys try: bytes except NameError: bytes = str class SocketIO: - - error = (socket.error, EOFError) - def __init__(self, sock): + def __init__(self, sock, execmodel): self.sock = sock + self.execmodel = execmodel + socket = execmodel.socket try: sock.setsockopt(socket.SOL_IP, socket.IP_TOS, 0x10)# IPTOS_LOWDELAY sock.setsockopt(socket.SOL_TCP, socket.TCP_NODELAY, 1) @@ -34,12 +31,12 @@ class SocketIO: def close_read(self): try: self.sock.shutdown(0) - except socket.error: + except self.execmodel.socket.error: pass def close_write(self): try: self.sock.shutdown(1) - except socket.error: + except self.execmodel.socket.error: pass def wait(self): @@ -71,7 +68,7 @@ def start_via(gateway, hostport=None): return realhost, realport -def create_io(spec, group): +def create_io(spec, group, execmodel): assert not spec.python, ( "socket: specifying python executables not yet supported") gateway_id = spec.installvia @@ -81,11 +78,12 @@ def create_io(spec, group): host, port = spec.socket.split(":") port = int(port) + socket = execmodel.socket sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - io = SocketIO(sock) + io = SocketIO(sock, execmodel) io.remoteaddress = '%s:%d' % (host, port) try: sock.connect((host, port)) - except socket.gaierror: + except execmodel.socket.gaierror: raise HostNotFound(str(sys.exc_info()[1])) return io diff --git a/remoto/lib/execnet/multi.py b/remoto/lib/execnet/multi.py index 5cd76d6..1343f3c 100644 --- a/remoto/lib/execnet/multi.py +++ b/remoto/lib/execnet/multi.py @@ -1,33 +1,66 @@ """ Managing Gateway Groups and interactions with multiple channels. 
def set_execmodel(self, execmodel, remote_execmodel=None):
    """ Choose the execution model used locally and on the remote side.

    ``execmodel`` names the model for gateways created from now on,
    e.g. "thread" or "eventlet" (XXX gevent).  ``remote_execmodel``
    defaults to the value of ``execmodel`` when not given.

    NOTE: this is only allowed while no gateway exists yet; afterwards
    a ValueError is raised.
    """
    if self._gateways:
        raise ValueError("can not set execution models if "
                         "gateways have been created already")
    # resolve and store the local model first, then the remote one
    self._execmodel = get_execmodel(execmodel)
    self._remote_execmodel = get_execmodel(
        execmodel if remote_execmodel is None else remote_execmodel)
""" if spec.id is None: - id = "gw" + str(self._autoidcounter) - self._autoidcounter += 1 - if id in self: - raise ValueError("already have gateway with id %r" %(id,)) - spec.id = id + with self._autoidlock: + id = "gw" + str(self._autoidcounter) + self._autoidcounter += 1 + if id in self: + raise ValueError("already have gateway with id %r" %(id,)) + spec.id = id def _register(self, gateway): assert not hasattr(gateway, '_group') @@ -147,7 +184,6 @@ class Group: """ while self: - from execnet.threadpool import WorkerPool vias = {} for gw in self: if gw.spec.via: @@ -163,7 +199,7 @@ class Group: trace("Gateways did not come down after timeout: %r" % gw) gw._io.kill() - safe_terminate(timeout, [ + safe_terminate(self.execmodel, timeout, [ (lambda: join_wait(gw), lambda: kill(gw)) for gw in self._gateways_to_join]) self._gateways_to_join[:] = [] @@ -212,8 +248,10 @@ class MultiChannel: try: return self._queue except AttributeError: - self._queue = queue.Queue() + self._queue = None for ch in self._channels: + if self._queue is None: + self._queue = ch.gateway.execmodel.queue.Queue() def putreceived(obj, channel=ch): self._queue.put((channel, obj)) if endmarker is NO_ENDMARKER_WANTED: @@ -236,11 +274,11 @@ class MultiChannel: -def safe_terminate(timeout, list_of_paired_functions): - workerpool = WorkerPool(len(list_of_paired_functions)*2) +def safe_terminate(execmodel, timeout, list_of_paired_functions): + workerpool = execmodel.WorkerPool() def termkill(termfunc, killfunc): - termreply = workerpool.dispatch(termfunc) + termreply = workerpool.spawn(termfunc) try: termreply.get(timeout=timeout) except IOError: @@ -248,14 +286,14 @@ def safe_terminate(timeout, list_of_paired_functions): replylist = [] for termfunc, killfunc in list_of_paired_functions: - reply = workerpool.dispatch(termkill, termfunc, killfunc) + reply = workerpool.spawn(termkill, termfunc, killfunc) replylist.append(reply) for reply in replylist: reply.get() - workerpool.shutdown() - workerpool.join() 
def get_fcntl():
    """ return the ``fcntl`` module, or None if it cannot be imported. """
    try:
        import fcntl as module
    except ImportError:
        return None
    return module
deleted file mode 100644 index 812d16a..0000000 --- a/remoto/lib/execnet/threadpool.py +++ /dev/null @@ -1,183 +0,0 @@ -""" -dispatching execution to threads - -(c) 2009, holger krekel -""" -import threading -import time -import sys - -# py2/py3 compatibility -try: - import queue -except ImportError: - import Queue as queue -if sys.version_info >= (3,0): - exec ("def reraise(cls, val, tb): raise val") -else: - exec ("def reraise(cls, val, tb): raise cls, val, tb") - -ERRORMARKER = object() - -class Reply(object): - """ reply instances provide access to the result - of a function execution that got dispatched - through WorkerPool.dispatch() - """ - _excinfo = None - def __init__(self, task): - self.task = task - self._queue = queue.Queue() - - def _set(self, result): - self._queue.put(result) - - def _setexcinfo(self, excinfo): - self._excinfo = excinfo - self._queue.put(ERRORMARKER) - - def get(self, timeout=None): - """ get the result object from an asynchronous function execution. - if the function execution raised an exception, - then calling get() will reraise that exception - including its traceback. 
- """ - if self._queue is None: - raise EOFError("reply has already been delivered") - try: - result = self._queue.get(timeout=timeout) - except queue.Empty: - raise IOError("timeout waiting for %r" %(self.task, )) - if result is ERRORMARKER: - self._queue = None - excinfo = self._excinfo - reraise(excinfo[0], excinfo[1], excinfo[2]) - return result - -class WorkerThread(threading.Thread): - def __init__(self, pool): - threading.Thread.__init__(self) - self._queue = queue.Queue() - self._pool = pool - self.setDaemon(1) - - def _run_once(self): - reply = self._queue.get() - if reply is SystemExit: - return False - assert self not in self._pool._ready - task = reply.task - try: - func, args, kwargs = task - result = func(*args, **kwargs) - except (SystemExit, KeyboardInterrupt): - return False - except: - reply._setexcinfo(sys.exc_info()) - else: - reply._set(result) - # at this point, reply, task and all other local variables go away - return True - - def run(self): - try: - while self._run_once(): - self._pool._ready[self] = True - finally: - del self._pool._alive[self] - try: - del self._pool._ready[self] - except KeyError: - pass - - def send(self, task): - reply = Reply(task) - self._queue.put(reply) - return reply - - def stop(self): - self._queue.put(SystemExit) - -class WorkerPool(object): - """ A WorkerPool allows to dispatch function executions - to threads. Each Worker Thread is reused for multiple - function executions. The dispatching operation - takes care to create and dispatch to existing - threads. - - You need to call shutdown() to signal - the WorkerThreads to terminate and join() - in order to wait until all worker threads - have terminated. - """ - _shuttingdown = False - def __init__(self, maxthreads=None): - """ init WorkerPool instance which may - create up to `maxthreads` worker threads. 
- """ - self.maxthreads = maxthreads - self._ready = {} - self._alive = {} - - def dispatch(self, func, *args, **kwargs): - """ return Reply object for the asynchronous dispatch - of the given func(*args, **kwargs) in a - separate worker thread. - """ - if self._shuttingdown: - raise IOError("WorkerPool is already shutting down") - try: - thread, _ = self._ready.popitem() - except KeyError: # pop from empty list - if self.maxthreads and len(self._alive) >= self.maxthreads: - raise IOError("can't create more than %d threads." % - (self.maxthreads,)) - thread = self._newthread() - return thread.send((func, args, kwargs)) - - def _newthread(self): - thread = WorkerThread(self) - self._alive[thread] = True - thread.start() - return thread - - def shutdown(self): - """ signal all worker threads to terminate. - call join() to wait until all threads termination. - """ - if not self._shuttingdown: - self._shuttingdown = True - for t in list(self._alive): - t.stop() - - def join(self, timeout=None): - """ wait until all worker threads have terminated. 
""" - current = threading.currentThread() - deadline = delta = None - if timeout is not None: - deadline = time.time() + timeout - for thread in list(self._alive): - if deadline: - delta = deadline - time.time() - if delta <= 0: - raise IOError("timeout while joining threads") - thread.join(timeout=delta) - if thread.isAlive(): - raise IOError("timeout while joining threads") - -if __name__ == '__channelexec__': - maxthreads = channel.receive() - execpool = WorkerPool(maxthreads=maxthreads) - gw = channel.gateway - channel.send("ok") - gw._trace("instantiated thread work pool maxthreads=%s" %(maxthreads,)) - while 1: - gw._trace("waiting for new exec task") - task = gw._execqueue.get() - if task is None: - gw._trace("thread-dispatcher got None, exiting") - execpool.shutdown() - execpool.join() - raise gw._StopExecLoop - gw._trace("dispatching exec task to thread pool") - execpool.dispatch(gw.executetask, task) diff --git a/remoto/lib/execnet/xspec.py b/remoto/lib/execnet/xspec.py index 549966a..c72f5b6 100644 --- a/remoto/lib/execnet/xspec.py +++ b/remoto/lib/execnet/xspec.py @@ -1,8 +1,6 @@ """ -(c) 2008-2009, holger krekel +(c) 2008-2013, holger krekel """ -import execnet - class XSpec: """ Execution Specification: key1=value1//key2=value2 ... * keys need to be unique within the specification scope @@ -12,7 +10,8 @@ class XSpec: * if no "=value" is given, assume a boolean True value """ # XXX allow customization, for only allow specific key names - popen = ssh = socket = python = chdir = nice = dont_write_bytecode = None + popen = ssh = socket = python = chdir = nice = \ + dont_write_bytecode = execmodel = None def __init__(self, string): self._spec = string