]> git-server-git.apps.pok.os.sepia.ceph.com Git - remoto.git/commitdiff
use newest execnet 1.2.0dev2
authorAlfredo Deza <alfredo@deza.pe>
Wed, 29 Jan 2014 17:40:24 +0000 (12:40 -0500)
committerAlfredo Deza <alfredo@deza.pe>
Wed, 29 Jan 2014 17:40:24 +0000 (12:40 -0500)
Signed-off-by: Alfredo Deza <alfredo@deza.pe>
14 files changed:
remoto/lib/execnet/__init__.py
remoto/lib/execnet/gateway.py
remoto/lib/execnet/gateway_base.py
remoto/lib/execnet/gateway_base.py.orig [new file with mode: 0644]
remoto/lib/execnet/gateway_bootstrap.py
remoto/lib/execnet/gateway_io.py
remoto/lib/execnet/gateway_socket.py
remoto/lib/execnet/multi.py
remoto/lib/execnet/rsync_remote.py
remoto/lib/execnet/script/shell.py [changed mode: 0644->0755]
remoto/lib/execnet/script/socketserver.py [changed mode: 0644->0755]
remoto/lib/execnet/script/socketserverservice.py
remoto/lib/execnet/threadpool.py [deleted file]
remoto/lib/execnet/xspec.py

index b9c684032c16de81948f0fb4925bb7a07db75c97..3fd5695ea9049e5117e48cf9d3a5c9634250b813 100644 (file)
@@ -3,7 +3,7 @@ execnet: pure python lib for connecting to local and remote Python Interpreters.
 
 (c) 2012, Holger Krekel and others
 """
-__version__ = '1.1.1-ad4'
+__version__ = '1.2.0.dev2'
 
 from . import apipkg
 
@@ -12,6 +12,7 @@ apipkg.initpkg(__name__, {
     'SocketGateway':    '.deprecated:SocketGateway',
     'SshGateway':       '.deprecated:SshGateway',
     'makegateway':      '.multi:makegateway',
+    'set_execmodel':    '.multi:set_execmodel',
     'HostNotFound':     '.gateway_bootstrap:HostNotFound',
     'RemoteError':      '.gateway_base:RemoteError',
     'TimeoutError':     '.gateway_base:TimeoutError',
@@ -26,17 +27,3 @@ apipkg.initpkg(__name__, {
     'dump':             '.gateway_base:dump',
     'DataFormatError':  '.gateway_base:DataFormatError',
 })
-
-# CHANGELOG
-#
-# 1.1.1-ad4: Use the newest execnet, includes SSH options
-#
-# 1.1-ad3: Catch more `TypeError` if the connection is closing but the channel attempts
-# to write. We now check is `struct.pack` is not None to proceed.
-# Issue: https://bitbucket.org/hpk42/execnet/issue/22/structpack-can-be-none-sometimes-spits
-#
-# 1.1-ad2: Allow for `sudo python` on local popen gateways
-# Issue: https://bitbucket.org/hpk42/execnet/issue/21/support-sudo-on-local-popen
-#
-# 1.1-ad1: `if case` to check if `Message` is None and avoid AttributeErrors
-# Issue: https://bitbucket.org/hpk42/execnet/issue/20/attributeerror-nonetype-object-has-no
index 152828037f91af76db2b99c1119e9480e2cd372e..e1d0e0781e3254d53b1037ad0e551a7518e344f0 100644 (file)
@@ -1,13 +1,12 @@
 """
 gateway code for initiating popen, socket and ssh connections.
-(c) 2004-2009, Holger Krekel and others
+(c) 2004-2013, Holger Krekel and others
 """
 
 import sys, os, inspect, types, linecache
 import textwrap
 import execnet
 from execnet.gateway_base import Message
-from execnet.gateway_io import Popen2IOMaster
 from execnet import gateway_base
 importdir = os.path.dirname(os.path.dirname(execnet.__file__))
 
@@ -31,8 +30,8 @@ class Gateway(gateway_base.BaseGateway):
         except AttributeError:
             r = "uninitialized"
             i = "no"
-        return "<%s id=%r %s, %s active channels>" %(
-                self.__class__.__name__, self.id, r, i)
+        return "<%s id=%r %s, %s model, %s active channels>" %(
+                self.__class__.__name__, self.id, r, self.execmodel.backend, i)
 
     def exit(self):
         """ trigger gateway exit.  Defer waiting for finishing
@@ -44,11 +43,12 @@ class Gateway(gateway_base.BaseGateway):
             self._trace("gateway already unregistered with group")
             return
         self._group._unregister(self)
-        self._trace("--> sending GATEWAY_TERMINATE")
         try:
+            self._trace("--> sending GATEWAY_TERMINATE")
             self._send(Message.GATEWAY_TERMINATE)
+            self._trace("--> io.close_write")
             self._io.close_write()
-        except IOError:
+        except (ValueError, EOFError, IOError):
             v = sys.exc_info()[1]
             self._trace("io-error: could not send termination sequence")
             self._trace(" exception: %r" % v)
@@ -73,7 +73,7 @@ class Gateway(gateway_base.BaseGateway):
 
     def hasreceiver(self):
         """ return True if gateway is able to receive data. """
-        return self._receiverthread.isAlive() # approxmimation
+        return self._receiverthread.running  # approxmimation
 
     def remote_status(self):
         """ return information object about remote execution status. """
@@ -121,18 +121,8 @@ class Gateway(gateway_base.BaseGateway):
         return channel
 
     def remote_init_threads(self, num=None):
-        """ start up to 'num' threads for subsequent
-            remote_exec() invocations to allow concurrent
-            execution.
-        """
-        if hasattr(self, '_remotechannelthread'):
-            raise IOError("remote threads already running")
-        from execnet import threadpool
-        source = inspect.getsource(threadpool)
-        self._remotechannelthread = self.remote_exec(source)
-        self._remotechannelthread.send(num)
-        status = self._remotechannelthread.receive()
-        assert status == "ok", status
+        """ DEPRECATED.  Is currently a NO-OPERATION already."""
+        print ("WARNING: remote_init_threads() is a no-operation in execnet-1.2")
 
 class RInfo:
     def __init__(self, kwargs):
index f754e9459e5f9a66e8dac723f3def71dba545a50..30e6e1acc325bbf00374ec5efe48cc837c0c3edf 100644 (file)
@@ -3,17 +3,18 @@ base execnet gateway code send to the other side for bootstrapping.
 
 NOTE: aims to be compatible to Python 2.5-3.X, Jython and IronPython
 
-(C) 2004-2013 Holger Krekel, Armin Rigo, Benjamin Peterson, and others
+(C) 2004-2013 Holger Krekel, Armin Rigo, Benjamin Peterson, Ronny Pfannschmidt and others
 """
+from __future__ import with_statement
 import sys, os, weakref
-import threading, traceback, struct
+import traceback, struct
 
 # NOTE that we want to avoid try/except style importing
 # to avoid setting sys.exc_info() during import
+#
 
 ISPY3 = sys.version_info >= (3, 0)
 if ISPY3:
-    import queue
     from io import BytesIO
     exec("def do_exec(co, loc): exec(co, loc)\n"
          "def reraise(cls, val, tb): raise val\n")
@@ -21,7 +22,6 @@ if ISPY3:
     _long_type = int
     from _thread import interrupt_main
 else:
-    import Queue as queue
     from StringIO import StringIO as BytesIO
     exec("def do_exec(co, loc): exec co in loc\n"
          "def reraise(cls, val, tb): raise cls, val, tb\n")
@@ -32,6 +32,255 @@ else:
     except ImportError:
         interrupt_main = None
 
+class EmptySemaphore:
+    acquire = release = lambda self: None
+
+def get_execmodel(backend):
+    if hasattr(backend, "backend"):
+        return backend
+    if backend == "thread":
+        importdef = {
+            'get_ident': ['thread::get_ident', '_thread::get_ident'],
+            '_start_new_thread': ['thread::start_new_thread',
+                                  '_thread::start_new_thread'],
+            'threading': ["threading",],
+            'queue': ["queue", "Queue"],
+            'sleep': ['time::sleep'],
+            'subprocess': ['subprocess'],
+            'socket': ['socket'],
+            '_fdopen': ['os::fdopen'],
+            '_lock': ['threading'],
+            '_event': ['threading'],
+        }
+        def exec_start(self, func, args=()):
+            self._start_new_thread(func, args)
+
+    elif backend == "eventlet":
+        importdef = {
+            'get_ident': ['eventlet.green.thread::get_ident'],
+            '_spawn_n': ['eventlet::spawn_n'],
+            'threading': ['eventlet.green.threading'],
+            'queue': ["eventlet.queue"],
+            'sleep': ['eventlet::sleep'],
+            'subprocess': ['eventlet.green.subprocess'],
+            'socket': ['eventlet.green.socket'],
+            '_fdopen': ['eventlet.green.os::fdopen'],
+            '_lock': ['eventlet.green.threading'],
+            '_event': ['eventlet.green.threading'],
+        }
+        def exec_start(self, func, args=()):
+            self._spawn_n(func, *args)
+    elif backend == "gevent":
+        importdef = {
+            'get_ident': ['gevent.thread::get_ident'],
+            '_spawn_n': ['gevent::spawn'],
+            'threading': ['threading'],
+            'queue': ["gevent.queue"],
+            'sleep': ['gevent::sleep'],
+            'subprocess': ['gevent.subprocess'],
+            'socket': ['gevent.socket'],
+            # XXX
+            '_fdopen': ['gevent.fileobject::FileObjectThread'],
+            '_lock': ['gevent.lock'],
+            '_event': ['gevent.event'],
+        }
+        def exec_start(self, func, args=()):
+            self._spawn_n(func, *args)
+    else:
+        raise ValueError("unknown execmodel %r" %(backend,))
+
+    class ExecModel:
+        def __init__(self, name):
+            self._importdef = importdef
+            self.backend = name
+            self._count = 0
+
+        def __repr__(self):
+            return "<ExecModel %r>" % self.backend
+
+        def __getattr__(self, name):
+            locs = self._importdef.get(name)
+            if locs is None:
+                raise AttributeError(name)
+            for loc in locs:
+                parts = loc.split("::")
+                loc = parts.pop(0)
+                try:
+                    mod = __import__(loc, None, None, "__doc__")
+                except ImportError:
+                    pass
+                else:
+                    if parts:
+                        mod = getattr(mod, parts[0])
+                    setattr(self, name, mod)
+                    return mod
+            raise AttributeError(name)
+
+        start = exec_start
+
+        def fdopen(self, fd, mode, bufsize=1):
+            return self._fdopen(fd, mode, bufsize)
+
+        def WorkerPool(self, size=None, hasprimary=False):
+            return WorkerPool(self, size, hasprimary=hasprimary)
+
+        def Semaphore(self, size=None):
+            if size is None:
+                return EmptySemaphore()
+            return self._lock.Semaphore(size)
+
+        def Lock(self):
+            return self._lock.RLock()
+
+        def RLock(self):
+            return self._lock.RLock()
+
+        def Event(self):
+            event = self._event.Event()
+            if sys.version_info < (2,7):
+                # patch wait function to return event state instead of None
+                real_wait = event.wait
+                def wait(timeout=None):
+                    real_wait(timeout=timeout)
+                    return event.isSet()
+                event.wait = wait
+            return event
+
+        def PopenPiped(self, args):
+            PIPE = self.subprocess.PIPE
+            return self.subprocess.Popen(args, stdout=PIPE, stdin=PIPE)
+
+
+    return ExecModel(backend)
+
+
+class Reply(object):
+    """ reply instances provide access to the result
+        of a function execution that got dispatched
+        through WorkerPool.spawn()
+    """
+    def __init__(self, task, threadmodel):
+        self.task = task
+        self._result_ready = threadmodel.Event()
+        self.running = True
+
+    def get(self, timeout=None):
+        """ get the result object from an asynchronous function execution.
+            if the function execution raised an exception,
+            then calling get() will reraise that exception
+            including its traceback.
+        """
+        self.waitfinish(timeout)
+        try:
+            return self._result
+        except AttributeError:
+            reraise(*(self._excinfo[:3]))  # noqa
+
+    def waitfinish(self, timeout=None):
+        if not self._result_ready.wait(timeout):
+            raise IOError("timeout waiting for %r" %(self.task, ))
+
+    def run(self):
+        func, args, kwargs = self.task
+        try:
+            try:
+                self._result = func(*args, **kwargs)
+            except:
+                self._excinfo = sys.exc_info()
+        finally:
+            self._result_ready.set()
+            self.running = False
+
+
+class WorkerPool(object):
+    """ A WorkerPool allows to spawn function executions
+        to threads, returning a reply object on which you
+        can ask for the result (and get exceptions reraised)
+    """
+    def __init__(self, execmodel, size=None, hasprimary=False):
+        """ by default allow unlimited number of spawns. """
+        self.execmodel = execmodel
+        self._size = size
+        self._running_lock = self.execmodel.Lock()
+        self._sem = self.execmodel.Semaphore(size)
+        self._running = set()
+        self._shutdown_event = self.execmodel.Event()
+        if hasprimary:
+            if self.execmodel.backend != "thread":
+                raise ValueError("hasprimary=True requires thread model")
+            self._primary_thread_event = self.execmodel.Event()
+
+    def integrate_as_primary_thread(self):
+        """ integrate the thread with which we are called as a primary
+        thread to dispatch to when spawn is called.
+        """
+        assert self.execmodel.backend == "thread", self.execmodel
+        # XXX insert check if we really are in the main thread
+        primary_thread_event = self._primary_thread_event
+        # interacts with code at REF1
+        while not self._shutdown_event.isSet():
+            primary_thread_event.wait()
+            func, args, kwargs = self._primary_thread_task
+            if func is None:  # waitall() woke us up to finish the loop
+                break
+            func(*args, **kwargs)
+            primary_thread_event.clear()
+
+    def shutdown(self):
+        self._shutdown_event.set()
+
+    def wait_for_shutdown(self, timeout=None):
+        return self._shutdown_event.wait(timeout=timeout)
+
+    def active_count(self):
+        return len(self._running)
+
+    def spawn(self, func, *args, **kwargs):
+        """ return Reply object for the asynchronous dispatch
+            of the given func(*args, **kwargs).
+        """
+        reply = Reply((func, args, kwargs), self.execmodel)
+        def run_and_release():
+            reply.run()
+            with self._running_lock:
+                self._running.remove(reply)
+                self._sem.release()
+                if not self._running:
+                    try:
+                        self._waitall_event.set()
+                    except AttributeError:
+                        pass
+        self._sem.acquire()
+        with self._running_lock:
+            self._running.add(reply)
+            # REF1 in 'thread' model we give priority to running in main thread
+            primary_thread_event = getattr(self, "_primary_thread_event", None)
+            if primary_thread_event is not None:
+                if not primary_thread_event.isSet():
+                    self._primary_thread_task = run_and_release, (), {}
+                    primary_thread_event.set()
+                    return reply
+            self.execmodel.start(run_and_release, ())
+        return reply
+
+    def waitall(self, timeout=None):
+        """ wait until all previosuly spawns have terminated. """
+        # if it exists signal primary_thread to finish its loop
+        self._primary_thread_task = None, None, None
+        try:
+            self._primary_thread_event.set()
+        except AttributeError:
+            pass
+        with self._running_lock:
+            if not self._running:
+                return True
+            # if a Reply still runs, we let run_and_release
+            # signal us -- note that we are still holding the
+            # _running_lock to avoid race conditions
+            self._waitall_event = self.execmodel.Event()
+        return self._waitall_event.wait(timeout=timeout)
+
+
 sysex = (KeyboardInterrupt, SystemExit)
 
 
@@ -68,7 +317,7 @@ else:
 class Popen2IO:
     error = (IOError, OSError, EOFError)
 
-    def __init__(self, outfile, infile):
+    def __init__(self, outfile, infile, execmodel):
         # we need raw byte streams
         self.outfile, self.infile = outfile, infile
         if sys.platform == "win32":
@@ -80,6 +329,7 @@ class Popen2IO:
                 pass
         self._read = getattr(infile, "buffer", infile).read
         self._write = getattr(outfile, "buffer", outfile).write
+        self.execmodel = execmodel
 
     def read(self, numbytes):
         """Read exactly 'numbytes' bytes from the pipe. """
@@ -117,6 +367,8 @@ class Message:
     def from_io(io):
         try:
             header = io.read(9) # type 1, channel 4, payload 4
+            if not header:
+                raise EOFError("empty read")
         except EOFError:
             e = sys.exc_info()[1]
             raise EOFError('couldnt load message header, ' + e.args[0])
@@ -124,9 +376,9 @@ class Message:
         return Message(msgtype, channel, io.read(payload))
 
     def to_io(self, io):
-        if struct.pack is not None:
-            header = struct.pack('!bii', self.msgcode, self.channelid, len(self.data))
-            io.write(header+self.data)
+        header = struct.pack('!bii', self.msgcode, self.channelid,
+                             len(self.data))
+        io.write(header+self.data)
 
     def received(self, gateway):
         self._types[self.msgcode](self, gateway)
@@ -153,20 +405,20 @@ class Message:
             return "<Message.%s channelid=%d %s>" %(name,
                         self.channelid, r)
 
+class GatewayReceivedTerminate(Exception):
+    """ Receiverthread got termination message. """
+
 def _setupmessages():
     def status(message, gateway):
         # we use the channelid to send back information
         # but don't instantiate a channel object
-        active_channels = gateway._channelfactory.channels()
-        numexec = 0
-        for ch in active_channels:
-            if getattr(ch, '_executing', False):
-                numexec += 1
-        d = {'execqsize': gateway._execqueue.qsize(),
-             'numchannels': len(active_channels),
-             'numexecuting': numexec
+        d = {'numchannels': len(gateway._channelfactory._channels),
+             'numexecuting': gateway._execpool.active_count(),
+             'execmodel': gateway.execmodel.backend,
         }
-        gateway._send(Message.CHANNEL_DATA, message.channelid, dumps_internal(d))
+        gateway._send(Message.CHANNEL_DATA, message.channelid,
+                      dumps_internal(d))
+        gateway._send(Message.CHANNEL_CLOSE, message.channelid)
 
     def channel_exec(message, gateway):
         channel = gateway._channelfactory.new(message.channelid)
@@ -186,8 +438,7 @@ def _setupmessages():
         gateway._channelfactory._local_close(message.channelid, sendonly=True)
 
     def gateway_terminate(message, gateway):
-        gateway._terminate_execution()
-        raise SystemExit(0)
+        raise GatewayReceivedTerminate(gateway)
 
     def reconfigure(message, gateway):
         if message.channelid == 0:
@@ -234,7 +485,8 @@ class RemoteError(Exception):
     def warn(self):
         if self.formatted != INTERRUPT_TEXT:
             # XXX do this better
-            sys.stderr.write("Warning: unhandled %r\n" % (self,))
+            sys.stderr.write("[%s] Warning: unhandled %r\n"
+                             % (os.getpid(), self,))
 
 class TimeoutError(IOError):
     """ Exception indicating that a timeout was reached. """
@@ -255,9 +507,9 @@ class Channel(object):
         #XXX: defaults copied from Unserializer
         self._strconfig = getattr(gateway, '_strconfig', (True, False))
         self.id = id
-        self._items = queue.Queue()
+        self._items = self.gateway.execmodel.queue.Queue()
         self._closed = False
-        self._receiveclosed = threading.Event()
+        self._receiveclosed = self.gateway.execmodel.Event()
         self._remoteerrors = []
 
     def _trace(self, *msg):
@@ -274,9 +526,7 @@ class Channel(object):
             be called with the endmarker when the channel closes.
         """
         _callbacks = self.gateway._channelfactory._callbacks
-        _receivelock = self.gateway._receivelock
-        _receivelock.acquire()
-        try:
+        with self.gateway._receivelock:
             if self._items is None:
                 raise IOError("%r has callback already registered" %(self,))
             items = self._items
@@ -284,7 +534,7 @@ class Channel(object):
             while 1:
                 try:
                     olditem = items.get(block=False)
-                except queue.Empty:
+                except self.gateway.execmodel.queue.Empty:
                     if not (self._closed or self._receiveclosed.isSet()):
                         _callbacks[self.id] = (
                             callback,
@@ -300,8 +550,6 @@ class Channel(object):
                         break
                     else:
                         callback(olditem)
-        finally:
-            _receivelock.release()
 
     def __repr__(self):
         flag = self.isclosed() and "closed" or "open"
@@ -321,8 +569,12 @@ class Channel(object):
             # the remote channel is already in "deleted" state, nothing to do
             pass
         else:
+            # state transition "opened" --> "deleted"
+            # check if we are in the middle of interpreter shutdown
+            # in which case the process will go away and we probably
+            # don't need to try to send a closing or last message
+            # (and often it won't work anymore to send things out)
             if Message is not None:
-                # state transition "opened" --> "deleted"
                 if self._items is None:    # has_callback
                     msgcode = Message.CHANNEL_LAST_MESSAGE
                 else:
@@ -365,7 +617,8 @@ class Channel(object):
     def close(self, error=None):
         """ close down this channel with an optional error message.
             Note that closing of a channel tied to remote_exec happens
-            automatically at the end of execution and cannot be done explicitely.
+            automatically at the end of execution and cannot
+            be done explicitely.
         """
         if self._executing:
             raise IOError("cannot explicitly close channel within remote_exec")
@@ -380,7 +633,8 @@ class Channel(object):
             if not self._receiveclosed.isSet():
                 put = self.gateway._send
                 if error is not None:
-                    put(Message.CHANNEL_CLOSE_ERROR, self.id, dumps_internal(error))
+                    put(Message.CHANNEL_CLOSE_ERROR, self.id,
+                        dumps_internal(error))
                 else:
                     put(Message.CHANNEL_CLOSE, self.id)
                 self._trace("sent channel close message")
@@ -436,7 +690,7 @@ class Channel(object):
             raise IOError("cannot receive(), channel has receiver callback")
         try:
             x = itemqueue.get(timeout=timeout)
-        except queue.Empty:
+        except self.gateway.execmodel.queue.Empty:
             raise self.TimeoutError("no item after %r seconds" %(timeout))
         if x is ENDMARKER:
             itemqueue.put(x)  # for other receivers
@@ -472,7 +726,7 @@ class ChannelFactory(object):
     def __init__(self, gateway, startcount=1):
         self._channels = weakref.WeakValueDictionary()
         self._callbacks = {}
-        self._writelock = threading.Lock()
+        self._writelock = gateway.execmodel.Lock()
         self.gateway = gateway
         self.count = startcount
         self.finished = False
@@ -480,8 +734,7 @@ class ChannelFactory(object):
 
     def new(self, id=None):
         """ create a new Channel with 'id' (or create new id if None). """
-        self._writelock.acquire()
-        try:
+        with self._writelock:
             if self.finished:
                 raise IOError("connexion already closed: %s" % (self.gateway,))
             if id is None:
@@ -492,8 +745,6 @@ class ChannelFactory(object):
             except KeyError:
                 channel = self._channels[id] = Channel(self.gateway, id)
             return channel
-        finally:
-            self._writelock.release()
 
     def channels(self):
         return self._list(self._channels.values())
@@ -535,35 +786,33 @@ class ChannelFactory(object):
 
     def _local_receive(self, id, data):
         # executes in receiver thread
+        channel = self._channels.get(id)
         try:
-            callback, endmarker, strconfig= self._callbacks[id]
-            channel = self._channels.get(id)
+            callback, endmarker, strconfig = self._callbacks[id]
         except KeyError:
-            channel = self._channels.get(id)
             queue = channel and channel._items
             if queue is None:
                 pass    # drop data
             else:
-                queue.put(loads_internal(data, channel))
+                item = loads_internal(data, channel)
+                queue.put(item)
         else:
             try:
                 data = loads_internal(data, channel, strconfig)
                 callback(data)   # even if channel may be already closed
-            except KeyboardInterrupt:
-                raise
-            except:
+            except Exception:
                 excinfo = sys.exc_info()
-                self.gateway._trace("exception during callback: %s" % excinfo[1])
+                self.gateway._trace("exception during callback: %s" %
+                                    excinfo[1])
                 errortext = self.gateway._geterrortext(excinfo)
-                self.gateway._send(Message.CHANNEL_CLOSE_ERROR, id, dumps_internal(errortext))
+                self.gateway._send(Message.CHANNEL_CLOSE_ERROR,
+                                   id, dumps_internal(errortext))
                 self._local_close(id, errortext)
 
     def _finished_receiving(self):
-        self._writelock.acquire()
-        try:
+        self.gateway._trace("finished receiving")
+        with self._writelock:
             self.finished = True
-        finally:
-            self._writelock.release()
         for id in self._list(self._channels):
             self._local_close(id, sendonly=True)
         for id in self._list(self._callbacks):
@@ -632,64 +881,60 @@ class BaseGateway(object):
     _sysex = sysex
     id = "<slave>"
 
-    class _StopExecLoop(Exception):
-        pass
-
     def __init__(self, io, id, _startcount=2):
+        self.execmodel = io.execmodel
         self._io = io
         self.id = id
-        self._strconfig = Unserializer.py2str_as_py3str, Unserializer.py3str_as_py2str
+        self._strconfig = (Unserializer.py2str_as_py3str,
+                           Unserializer.py3str_as_py2str)
         self._channelfactory = ChannelFactory(self, _startcount)
-        self._receivelock = threading.RLock()
+        self._receivelock = self.execmodel.RLock()
         # globals may be NONE at process-termination
         self.__trace = trace
         self._geterrortext = geterrortext
+        self._receivepool = self.execmodel.WorkerPool(1)
 
     def _trace(self, *msg):
         self.__trace(self.id, *msg)
 
     def _initreceive(self):
-        self._receiverthread = threading.Thread(name="receiver",
-                                 target=self._thread_receiver)
-        self._receiverthread.setDaemon(1)
-        self._receiverthread.start()
+        self._receiverthread = self._receivepool.spawn(self._thread_receiver)
 
     def _thread_receiver(self):
-        self._trace("RECEIVERTHREAD: starting to run")
-        eof = False
+        def log(*msg):
+            self._trace("[receiver-thread]", *msg)
+
+        log("RECEIVERTHREAD: starting to run")
         io = self._io
         try:
             try:
                 while 1:
                     msg = Message.from_io(io)
-                    self._trace("received", msg)
-                    _receivelock = self._receivelock
-                    _receivelock.acquire()
-                    try:
+                    log("received", msg)
+                    with self._receivelock:
                         msg.received(self)
                         del msg
-                    finally:
-                        _receivelock.release()
-            except self._sysex:
-                self._trace("RECEIVERTHREAD: doing io.close_read()")
-                self._io.close_read()
+            except (KeyboardInterrupt, GatewayReceivedTerminate):
+                pass
             except EOFError:
-                self._trace("RECEIVERTHREAD: got EOFError")
-                self._trace("RECEIVERTHREAD: traceback was: ",
-                    self._geterrortext(self.exc_info()))
+                log("EOF without prior gateway termination message")
                 self._error = self.exc_info()[1]
-                eof = True
-            except:
-                self._trace("RECEIVERTHREAD", self._geterrortext(self.exc_info()))
+            except Exception:
+                log(self._geterrortext(self.exc_info()))
         finally:
             try:
-                self._trace('RECEIVERTHREAD', 'entering finalization')
-                if eof:
-                    self._terminate_execution()
+                log('entering finalization')
+                # wake up and terminate any execution waiting to receive
                 self._channelfactory._finished_receiving()
-                self._trace('RECEIVERTHREAD', 'leaving finalization')
-            except:
-                pass # XXX be silent at interp-shutdown
+                log('terminating execution')
+                self._terminate_execution()
+                log('closing read')
+                self._io.close_read()
+                log('closing write')
+                self._io.close_write()
+                log('leaving finalization')
+            except:  # be silent at shutdown
+                pass
 
     def _terminate_execution(self):
         pass
@@ -702,8 +947,8 @@ class BaseGateway(object):
         except (IOError, ValueError):
             e = sys.exc_info()[1]
             self._trace('failed to send', message, e)
-            raise
-
+            # ValueError might be because the IO is already closed
+            raise IOError("cannot send (already closed?)")
 
     def _local_schedulexec(self, channel, sourcetask):
         channel.close("execution disallowed")
@@ -719,55 +964,56 @@ class BaseGateway(object):
 
     def join(self, timeout=None):
         """ Wait for receiverthread to terminate. """
-        current = threading.currentThread()
-        if self._receiverthread.isAlive():
-            self._trace("joining receiver thread")
-            self._receiverthread.join(timeout)
-        else:
-            self._trace("gateway.join() called while receiverthread "
-                        "already finished")
+        self._trace("waiting for receiver thread to finish")
+        self._receiverthread.waitfinish()
 
 class SlaveGateway(BaseGateway):
+
     def _local_schedulexec(self, channel, sourcetask):
         sourcetask = loads_internal(sourcetask)
-        self._execqueue.put((channel, sourcetask))
+        self._execpool.spawn(self.executetask, ((channel, sourcetask)))
 
     def _terminate_execution(self):
         # called from receiverthread
-        self._trace("putting None to execqueue")
-        self._execqueue.put(None)
-        if interrupt_main:
-            self._trace("calling interrupt_main()")
-            interrupt_main()
-        self._execfinished.wait(10.0)
-        if not self._execfinished.isSet():
-            self._trace("execution did not finish in 10 secs, calling os._exit()")
-            os._exit(1)
-
-    def serve(self, joining=True):
+        self._trace("shutting down execution pool")
+        self._execpool.shutdown()
+        if not self._execpool.waitall(5.0):
+            self._trace("execution ongoing after 5 secs, trying interrupt_main")
+            # We try hard to terminate execution based on the assumption
+            # that there is only one gateway object running per-process.
+            if sys.platform != "win32":
+                self._trace("sending ourselves a SIGINT")
+                os.kill(os.getpid(), 2)  # send ourselves a SIGINT
+            elif interrupt_main is not None:
+                self._trace("calling interrupt_main()")
+                interrupt_main()
+            if not self._execpool.waitall(10.0):
+                self._trace("execution did not finish in 10 secs, "
+                            "calling os._exit()")
+                os._exit(1)
+
+    def serve(self):
+        trace = lambda msg: self._trace("[serve] " + msg)
+        hasprimary = self.execmodel.backend == "thread"
+        self._execpool = self.execmodel.WorkerPool(hasprimary=hasprimary)
+        trace("spawning receiver thread")
+        self._initreceive()
         try:
             try:
-                self._execqueue = queue.Queue()
-                self._execfinished = threading.Event()
-                self._initreceive()
-                while 1:
-                    item = self._execqueue.get()
-                    if item is None:
-                        break
-                    try:
-                        self.executetask(item)
-                    except self._StopExecLoop:
-                        break
+                if hasprimary:
+                    trace("integrating as main primary exec thread")
+                    self._execpool.integrate_as_primary_thread()
+                else:
+                    trace("waiting for execution to finish")
+                    self._execpool.wait_for_shutdown()
             finally:
-                self._execfinished.set()
-                self._trace("io.close_write()")
-                self._io.close_write()
-                self._trace("slavegateway.serve finished")
-            if joining:
-                self.join()
+                trace("execution finished")
+
+            trace("joining receiver thread")
+            self.join()
         except KeyboardInterrupt:
             # in the slave we can't really do anything sensible
-            self._trace("swallowing keyboardinterrupt in main-thread")
+            trace("swallowing keyboardinterrupt, serve finished")
 
     def executetask(self, item):
         try:
@@ -787,7 +1033,7 @@ class SlaveGateway(BaseGateway):
             channel._executing = True
             try:
                 co = compile(source+'\n', '<remote exec>', 'exec')
-                do_exec(co, loc)
+                do_exec(co, loc) # noqa
                 if call_name:
                     self._trace('calling %s(**%60r)' % (call_name, kwargs))
                     function = loc[call_name]
@@ -795,19 +1041,19 @@ class SlaveGateway(BaseGateway):
             finally:
                 channel._executing = False
                 self._trace("execution finished")
-        except self._StopExecLoop:
-            channel.close()
-            raise
         except KeyboardInterrupt:
             channel.close(INTERRUPT_TEXT)
             raise
         except:
             excinfo = self.exc_info()
-            self._trace("got exception: %s" % (excinfo[1],))
-            errortext = self._geterrortext(excinfo)
-            channel.close(errortext)
-        else:
-            channel.close()
+            if not isinstance(excinfo[1], EOFError):
+                if not channel.gateway._channelfactory.finished:
+                    self._trace("got exception: %r" % (excinfo[1],))
+                    errortext = self._geterrortext(excinfo)
+                    channel.close(errortext)
+                    return
+            self._trace("ignoring EOFError because receiving finished")
+        channel.close()
 
 #
 # Cross-Python pickling code, tested from test_serializer.py
@@ -1180,9 +1426,9 @@ class _Serializer(object):
         self._write(opcode.CHANNEL)
         self._write_int4(channel.id)
 
-def init_popen_io():
+def init_popen_io(execmodel):
     if not hasattr(os, 'dup'): # jython
-        io = Popen2IO(sys.stdout, sys.stdin)
+        io = Popen2IO(sys.stdout, sys.stdin, execmodel)
         import tempfile
         sys.stdin = tempfile.TemporaryFile('r')
         sys.stdout = tempfile.TemporaryFile('w')
@@ -1195,24 +1441,24 @@ def init_popen_io():
             else:
                 devnull = '/dev/null'
         # stdin
-        stdin  = os.fdopen(os.dup(0), 'r', 1)
+        stdin  = execmodel.fdopen(os.dup(0), 'r', 1)
         fd = os.open(devnull, os.O_RDONLY)
         os.dup2(fd, 0)
         os.close(fd)
 
         # stdout
-        stdout = os.fdopen(os.dup(1), 'w', 1)
+        stdout = execmodel.fdopen(os.dup(1), 'w', 1)
         fd = os.open(devnull, os.O_WRONLY)
         os.dup2(fd, 1)
 
         # stderr for win32
         if os.name == 'nt':
-            sys.stderr = os.fdopen(os.dup(2), 'w', 1)
+            sys.stderr = execmodel.fdopen(os.dup(2), 'w', 1)
             os.dup2(fd, 2)
         os.close(fd)
-        io = Popen2IO(stdout, stdin)
-        sys.stdin = os.fdopen(0, 'r', 1)
-        sys.stdout = os.fdopen(1, 'w', 1)
+        io = Popen2IO(stdout, stdin, execmodel)
+        sys.stdin = execmodel.fdopen(0, 'r', 1)
+        sys.stdout = execmodel.fdopen(1, 'w', 1)
     return io
 
 def serve(io, id):
diff --git a/remoto/lib/execnet/gateway_base.py.orig b/remoto/lib/execnet/gateway_base.py.orig
new file mode 100644 (file)
index 0000000..9dc8dcd
--- /dev/null
@@ -0,0 +1,1474 @@
+"""
+base execnet gateway code send to the other side for bootstrapping.
+
+NOTE: aims to be compatible to Python 2.5-3.X, Jython and IronPython
+
+(C) 2004-2013 Holger Krekel, Armin Rigo, Benjamin Peterson, Ronny Pfannschmidt and others
+"""
+from __future__ import with_statement
+import sys, os, weakref
+import traceback, struct
+
+# NOTE that we want to avoid try/except style importing
+# to avoid setting sys.exc_info() during import
+#
+
+ISPY3 = sys.version_info >= (3, 0)
+if ISPY3:
+    from io import BytesIO
+    exec("def do_exec(co, loc): exec(co, loc)\n"
+         "def reraise(cls, val, tb): raise val\n")
+    unicode = str
+    _long_type = int
+    from _thread import interrupt_main
+else:
+    from StringIO import StringIO as BytesIO
+    exec("def do_exec(co, loc): exec co in loc\n"
+         "def reraise(cls, val, tb): raise cls, val, tb\n")
+    bytes = str
+    _long_type = long
+    try:
+        from thread import interrupt_main
+    except ImportError:
+        interrupt_main = None
+
+class EmptySemaphore:
+    acquire = release = lambda self: None
+
+def get_execmodel(backend):
+    if hasattr(backend, "backend"):
+        return backend
+    if backend == "thread":
+        importdef = {
+            'get_ident': ['thread::get_ident', '_thread::get_ident'],
+            '_start_new_thread': ['thread::start_new_thread',
+                                  '_thread::start_new_thread'],
+            'threading': ["threading",],
+            'queue': ["queue", "Queue"],
+            'sleep': ['time::sleep'],
+            'subprocess': ['subprocess'],
+            'socket': ['socket'],
+            '_fdopen': ['os::fdopen'],
+            '_lock': ['threading'],
+            '_event': ['threading'],
+        }
+        def exec_start(self, func, args=()):
+            self._start_new_thread(func, args)
+
+    elif backend == "eventlet":
+        importdef = {
+            'get_ident': ['eventlet.green.thread::get_ident'],
+            '_spawn_n': ['eventlet::spawn_n'],
+            'threading': ['eventlet.green.threading'],
+            'queue': ["eventlet.queue"],
+            'sleep': ['eventlet::sleep'],
+            'subprocess': ['eventlet.green.subprocess'],
+            'socket': ['eventlet.green.socket'],
+            '_fdopen': ['eventlet.green.os::fdopen'],
+            '_lock': ['eventlet.green.threading'],
+            '_event': ['eventlet.green.threading'],
+        }
+        def exec_start(self, func, args=()):
+            self._spawn_n(func, *args)
+    elif backend == "gevent":
+        importdef = {
+            'get_ident': ['gevent.thread::get_ident'],
+            '_spawn_n': ['gevent::spawn'],
+            'threading': ['threading'],
+            'queue': ["gevent.queue"],
+            'sleep': ['gevent::sleep'],
+            'subprocess': ['gevent.subprocess'],
+            'socket': ['gevent.socket'],
+            # XXX
+            '_fdopen': ['gevent.fileobject::FileObjectThread'],
+            '_lock': ['gevent.lock'],
+            '_event': ['gevent.event'],
+        }
+        def exec_start(self, func, args=()):
+            self._spawn_n(func, *args)
+    else:
+        raise ValueError("unknown execmodel %r" %(backend,))
+
+    class ExecModel:
+        def __init__(self, name):
+            self._importdef = importdef
+            self.backend = name
+            self._count = 0
+
+        def __repr__(self):
+            return "<ExecModel %r>" % self.backend
+
+        def __getattr__(self, name):
+            locs = self._importdef.get(name)
+            if locs is None:
+                raise AttributeError(name)
+            for loc in locs:
+                parts = loc.split("::")
+                loc = parts.pop(0)
+                try:
+                    mod = __import__(loc, None, None, "__doc__")
+                except ImportError:
+                    pass
+                else:
+                    if parts:
+                        mod = getattr(mod, parts[0])
+                    setattr(self, name, mod)
+                    return mod
+            raise AttributeError(name)
+
+        start = exec_start
+
+        def fdopen(self, fd, mode, bufsize=1):
+            return self._fdopen(fd, mode, bufsize)
+
+        def WorkerPool(self, size=None, hasprimary=False):
+            return WorkerPool(self, size, hasprimary=hasprimary)
+
+        def Semaphore(self, size=None):
+            if size is None:
+                return EmptySemaphore()
+            return self._lock.Semaphore(size)
+
+        def Lock(self):
+            return self._lock.RLock()
+
+        def RLock(self):
+            return self._lock.RLock()
+
+        def Event(self):
+            event = self._event.Event()
+            if sys.version_info < (2,7):
+                # patch wait function to return event state instead of None
+                real_wait = event.wait
+                def wait(timeout=None):
+                    real_wait(timeout=timeout)
+                    return event.isSet()
+                event.wait = wait
+            return event
+
+        def PopenPiped(self, args):
+            PIPE = self.subprocess.PIPE
+            return self.subprocess.Popen(args, stdout=PIPE, stdin=PIPE)
+
+
+    return ExecModel(backend)
+
+
+class Reply(object):
+    """ reply instances provide access to the result
+        of a function execution that got dispatched
+        through WorkerPool.spawn()
+    """
+    def __init__(self, task, threadmodel):
+        self.task = task
+        self._result_ready = threadmodel.Event()
+        self.running = True
+
+    def get(self, timeout=None):
+        """ get the result object from an asynchronous function execution.
+            if the function execution raised an exception,
+            then calling get() will reraise that exception
+            including its traceback.
+        """
+        self.waitfinish(timeout)
+        try:
+            return self._result
+        except AttributeError:
+            reraise(*(self._excinfo[:3]))  # noqa
+
+    def waitfinish(self, timeout=None):
+        if not self._result_ready.wait(timeout):
+            raise IOError("timeout waiting for %r" %(self.task, ))
+
+    def run(self):
+        func, args, kwargs = self.task
+        try:
+            try:
+                self._result = func(*args, **kwargs)
+            except:
+                self._excinfo = sys.exc_info()
+        finally:
+            self._result_ready.set()
+            self.running = False
+
+
+class WorkerPool(object):
+    """ A WorkerPool allows to spawn function executions
+        to threads, returning a reply object on which you
+        can ask for the result (and get exceptions reraised)
+    """
+    def __init__(self, execmodel, size=None, hasprimary=False):
+        """ by default allow unlimited number of spawns. """
+        self.execmodel = execmodel
+        self._size = size
+        self._running_lock = self.execmodel.Lock()
+        self._sem = self.execmodel.Semaphore(size)
+        self._running = set()
+        self._shutdown_event = self.execmodel.Event()
+        if hasprimary:
+            if self.execmodel.backend != "thread":
+                raise ValueError("hasprimary=True requires thread model")
+            self._primary_thread_event = self.execmodel.Event()
+
+    def integrate_as_primary_thread(self):
+        """ integrate the thread with which we are called as a primary
+        thread to dispatch to when spawn is called.
+        """
+        assert self.execmodel.backend == "thread", self.execmodel
+        # XXX insert check if we really are in the main thread
+        primary_thread_event = self._primary_thread_event
+        # interacts with code at REF1
+        while not self._shutdown_event.isSet():
+            primary_thread_event.wait()
+            func, args, kwargs = self._primary_thread_task
+            if func is None:  # waitall() woke us up to finish the loop
+                break
+            func(*args, **kwargs)
+            primary_thread_event.clear()
+
+    def shutdown(self):
+        self._shutdown_event.set()
+
+    def wait_for_shutdown(self, timeout=None):
+        return self._shutdown_event.wait(timeout=timeout)
+
+    def active_count(self):
+        return len(self._running)
+
+    def spawn(self, func, *args, **kwargs):
+        """ return Reply object for the asynchronous dispatch
+            of the given func(*args, **kwargs).
+        """
+        reply = Reply((func, args, kwargs), self.execmodel)
+        def run_and_release():
+            reply.run()
+            with self._running_lock:
+                self._running.remove(reply)
+                self._sem.release()
+                if not self._running:
+                    try:
+                        self._waitall_event.set()
+                    except AttributeError:
+                        pass
+        self._sem.acquire()
+        with self._running_lock:
+            self._running.add(reply)
+            # REF1 in 'thread' model we give priority to running in main thread
+            primary_thread_event = getattr(self, "_primary_thread_event", None)
+            if primary_thread_event is not None:
+                if not primary_thread_event.isSet():
+                    self._primary_thread_task = run_and_release, (), {}
+                    primary_thread_event.set()
+                    return reply
+            self.execmodel.start(run_and_release, ())
+        return reply
+
+    def waitall(self, timeout=None):
+        """ wait until all previosuly spawns have terminated. """
+        # if it exists signal primary_thread to finish its loop
+        self._primary_thread_task = None, None, None
+        try:
+            self._primary_thread_event.set()
+        except AttributeError:
+            pass
+        with self._running_lock:
+            if not self._running:
+                return True
+            # if a Reply still runs, we let run_and_release
+            # signal us -- note that we are still holding the
+            # _running_lock to avoid race conditions
+            self._waitall_event = self.execmodel.Event()
+        return self._waitall_event.wait(timeout=timeout)
+
+
+sysex = (KeyboardInterrupt, SystemExit)
+
+
+DEBUG = os.environ.get('EXECNET_DEBUG')
+pid = os.getpid()
+if DEBUG == '2':
+    def trace(*msg):
+        try:
+            line = " ".join(map(str, msg))
+            sys.stderr.write("[%s] %s\n" % (pid, line))
+            sys.stderr.flush()
+        except Exception:
+            pass # nothing we can do, likely interpreter-shutdown
+elif DEBUG:
+    import tempfile, os.path
+    fn = os.path.join(tempfile.gettempdir(), 'execnet-debug-%d' % pid)
+    #sys.stderr.write("execnet-debug at %r" %(fn,))
+    debugfile = open(fn, 'w')
+    def trace(*msg):
+        try:
+            line = " ".join(map(str, msg))
+            debugfile.write(line + "\n")
+            debugfile.flush()
+        except Exception:
+            try:
+                v = sys.exc_info()[1]
+                sys.stderr.write(
+                    "[%s] exception during tracing: %r\n" % (pid, v))
+            except Exception:
+                pass # nothing we can do, likely interpreter-shutdown
+else:
+    notrace = trace = lambda *msg: None
+
+class Popen2IO:
+    error = (IOError, OSError, EOFError)
+
+    def __init__(self, outfile, infile, execmodel):
+        # we need raw byte streams
+        self.outfile, self.infile = outfile, infile
+        if sys.platform == "win32":
+            import msvcrt
+            try:
+                msvcrt.setmode(infile.fileno(), os.O_BINARY)
+                msvcrt.setmode(outfile.fileno(), os.O_BINARY)
+            except (AttributeError, IOError):
+                pass
+        self._read = getattr(infile, "buffer", infile).read
+        self._write = getattr(outfile, "buffer", outfile).write
+        self.execmodel = execmodel
+
+    def read(self, numbytes):
+        """Read exactly 'numbytes' bytes from the pipe. """
+        # a file in non-blocking mode may return less bytes, so we loop
+        buf = bytes()
+        while numbytes > len(buf):
+            data = self._read(numbytes-len(buf))
+            if not data:
+                raise EOFError("expected %d bytes, got %d" %(numbytes, len(buf)))
+            buf += data
+        return buf
+
+    def write(self, data):
+        """write out all data bytes. """
+        assert isinstance(data, bytes)
+        self._write(data)
+        self.outfile.flush()
+
+    def close_read(self):
+        self.infile.close()
+
+    def close_write(self):
+        self.outfile.close()
+
+class Message:
+    """ encapsulates Messages and their wire protocol. """
+    _types = []
+
+    def __init__(self, msgcode, channelid=0, data=''):
+        self.msgcode = msgcode
+        self.channelid = channelid
+        self.data = data
+
+    @staticmethod
+    def from_io(io):
+        try:
+            header = io.read(9) # type 1, channel 4, payload 4
+            if not header:
+                raise EOFError("empty read")
+        except EOFError:
+            e = sys.exc_info()[1]
+            raise EOFError('couldnt load message header, ' + e.args[0])
+        msgtype, channel, payload = struct.unpack('!bii', header)
+        return Message(msgtype, channel, io.read(payload))
+
+    def to_io(self, io):
+        header = struct.pack('!bii', self.msgcode, self.channelid,
+                             len(self.data))
+        io.write(header+self.data)
+
+    def received(self, gateway):
+        self._types[self.msgcode](self, gateway)
+
+    def __repr__(self):
+        class FakeChannel(object):
+            _strconfig = False, False # never transform, never fail
+            def __init__(self, id):
+                self.id = id
+            def __repr__(self):
+                return '<Channel %s>' % self.id
+        FakeChannel.new = FakeChannel
+        FakeChannel.gateway = FakeChannel
+        name = self._types[self.msgcode].__name__.upper()
+        try:
+            data = loads_internal(self.data, FakeChannel)
+        except LoadError:
+            data = self.data
+        r = repr(data)
+        if len(r) > 90:
+            return "<Message.%s channelid=%d len=%d>" %(name,
+                        self.channelid, len(r))
+        else:
+            return "<Message.%s channelid=%d %s>" %(name,
+                        self.channelid, r)
+
+def _setupmessages():
+    def status(message, gateway):
+        # we use the channelid to send back information
+        # but don't instantiate a channel object
+        d = {'numchannels': len(gateway._channelfactory._channels),
+             'numexecuting': gateway._execpool.active_count(),
+             'execmodel': gateway.execmodel.backend,
+        }
+        gateway._send(Message.CHANNEL_DATA, message.channelid, dumps_internal(d))
+        gateway._send(Message.CHANNEL_CLOSE, message.channelid)
+
+    def channel_exec(message, gateway):
+        channel = gateway._channelfactory.new(message.channelid)
+        gateway._local_schedulexec(channel=channel,sourcetask=message.data)
+
+    def channel_data(message, gateway):
+        gateway._channelfactory._local_receive(message.channelid, message.data)
+
+    def channel_close(message, gateway):
+        gateway._channelfactory._local_close(message.channelid)
+
+    def channel_close_error(message, gateway):
+        remote_error = RemoteError(loads_internal(message.data))
+        gateway._channelfactory._local_close(message.channelid, remote_error)
+
+    def channel_last_message(message, gateway):
+        gateway._channelfactory._local_close(message.channelid, sendonly=True)
+
+    def gateway_terminate(message, gateway):
+        # wake up and terminate any execution waiting to receive something
+        gateway._channelfactory._finished_receiving()
+        # then try harder to terminate execution
+        gateway._terminate_execution()
+
+    def reconfigure(message, gateway):
+        if message.channelid == 0:
+            target = gateway
+        else:
+            target = gateway._channelfactory.new(message.channelid)
+        target._strconfig = loads_internal(message.data, gateway)
+
+    types = [
+        status, reconfigure, gateway_terminate,
+        channel_exec, channel_data, channel_close,
+        channel_close_error, channel_last_message,
+    ]
+    for i, handler in enumerate(types):
+        Message._types.append(handler)
+        setattr(Message, handler.__name__.upper(), i)
+
+_setupmessages()
+
+def geterrortext(excinfo,
+    format_exception=traceback.format_exception, sysex=sysex):
+    try:
+        l = format_exception(*excinfo)
+        errortext = "".join(l)
+    except sysex:
+        raise
+    except:
+        errortext = '%s: %s' % (excinfo[0].__name__,
+                                excinfo[1])
+    return errortext
+
+class RemoteError(Exception):
+    """ Exception containing a stringified error from the other side. """
+    def __init__(self, formatted):
+        self.formatted = formatted
+        Exception.__init__(self)
+
+    def __str__(self):
+        return self.formatted
+
+    def __repr__(self):
+        return "%s: %s" %(self.__class__.__name__, self.formatted)
+
+    def warn(self):
+        if self.formatted != INTERRUPT_TEXT:
+            # XXX do this better
+            sys.stderr.write("[%s] Warning: unhandled %r\n"
+                             % (os.getpid(), self,))
+
+class TimeoutError(IOError):
+    """ Exception indicating that a timeout was reached. """
+
+
+NO_ENDMARKER_WANTED = object()
+
+class Channel(object):
+    """Communication channel between two Python Interpreter execution points."""
+    RemoteError = RemoteError
+    TimeoutError = TimeoutError
+    Message = Message
+    _INTERNALWAKEUP = 1000
+    _executing = False
+
+    def __init__(self, gateway, id):
+        assert isinstance(id, int)
+        self.gateway = gateway
+        #XXX: defaults copied from Unserializer
+        self._strconfig = getattr(gateway, '_strconfig', (True, False))
+        self.id = id
+        self._items = self.gateway.execmodel.queue.Queue()
+        self._closed = False
+        self._receiveclosed = self.gateway.execmodel.Event()
+        self._remoteerrors = []
+
+    def _trace(self, *msg):
+        self.gateway._trace(self.id, *msg)
+
+    def setcallback(self, callback, endmarker=NO_ENDMARKER_WANTED):
+        """ set a callback function for receiving items.
+
+            All already queued items will immediately trigger the callback.
+            Afterwards the callback will execute in the receiver thread
+            for each received data item and calls to ``receive()`` will
+            raise an error.
+            If an endmarker is specified the callback will eventually
+            be called with the endmarker when the channel closes.
+        """
+        _callbacks = self.gateway._channelfactory._callbacks
+        _receivelock = self.gateway._receivelock
+        _receivelock.acquire()
+        try:
+            if self._items is None:
+                raise IOError("%r has callback already registered" %(self,))
+            items = self._items
+            self._items = None
+            while 1:
+                try:
+                    olditem = items.get(block=False)
+                except self.gateway.execmodel.queue.Empty:
+                    if not (self._closed or self._receiveclosed.isSet()):
+                        _callbacks[self.id] = (
+                            callback,
+                            endmarker,
+                            self._strconfig,
+                        )
+                    break
+                else:
+                    if olditem is ENDMARKER:
+                        items.put(olditem) # for other receivers
+                        if endmarker is not NO_ENDMARKER_WANTED:
+                            callback(endmarker)
+                        break
+                    else:
+                        callback(olditem)
+        finally:
+            _receivelock.release()
+
+    def __repr__(self):
+        flag = self.isclosed() and "closed" or "open"
+        return "<Channel id=%d %s>" % (self.id, flag)
+
+    def __del__(self):
+        if self.gateway is None:   # can be None in tests
+            return
+        self._trace("channel.__del__")
+        # no multithreading issues here, because we have the last ref to 'self'
+        if self._closed:
+            # state transition "closed" --> "deleted"
+            for error in self._remoteerrors:
+                error.warn()
+        elif self._receiveclosed.isSet():
+            # state transition "sendonly" --> "deleted"
+            # the remote channel is already in "deleted" state, nothing to do
+            pass
+        else:
+            # state transition "opened" --> "deleted"
+            if self._items is None:    # has_callback
+                msgcode = self.Message.CHANNEL_LAST_MESSAGE
+            else:
+                msgcode = self.Message.CHANNEL_CLOSE
+            try:
+                self.gateway._send(msgcode, self.id)
+            except (IOError, ValueError): # ignore problems with sending
+                pass
+
+    def _getremoteerror(self):
+        try:
+            return self._remoteerrors.pop(0)
+        except IndexError:
+            try:
+                return self.gateway._error
+            except AttributeError:
+                pass
+            return None
+
+    #
+    # public API for channel objects
+    #
+    def isclosed(self):
+        """ return True if the channel is closed. A closed
+            channel may still hold items.
+        """
+        return self._closed
+
+    def makefile(self, mode='w', proxyclose=False):
+        """ return a file-like object.
+            mode can be 'w' or 'r' for writeable/readable files.
+            if proxyclose is true file.close() will also close the channel.
+        """
+        if mode == "w":
+            return ChannelFileWrite(channel=self, proxyclose=proxyclose)
+        elif mode == "r":
+            return ChannelFileRead(channel=self, proxyclose=proxyclose)
+        raise ValueError("mode %r not availabe" %(mode,))
+
+    def close(self, error=None):
+        """ close down this channel with an optional error message.
+            Note that closing of a channel tied to remote_exec happens
+            automatically at the end of execution and cannot
+            be done explicitely.
+        """
+        if self._executing:
+            raise IOError("cannot explicitly close channel within remote_exec")
+        if self._closed:
+            self.gateway._trace(self, "ignoring redundant call to close()")
+        if not self._closed:
+            # state transition "opened/sendonly" --> "closed"
+            # threads warning: the channel might be closed under our feet,
+            # but it's never damaging to send too many CHANNEL_CLOSE messages
+            # however, if the other side triggered a close already, we
+            # do not send back a closed message.
+            if not self._receiveclosed.isSet():
+                put = self.gateway._send
+                if error is not None:
+                    put(self.Message.CHANNEL_CLOSE_ERROR, self.id, dumps_internal(error))
+                else:
+                    put(self.Message.CHANNEL_CLOSE, self.id)
+                self._trace("sent channel close message")
+            if isinstance(error, RemoteError):
+                self._remoteerrors.append(error)
+            self._closed = True         # --> "closed"
+            self._receiveclosed.set()
+            queue = self._items
+            if queue is not None:
+                queue.put(ENDMARKER)
+            self.gateway._channelfactory._no_longer_opened(self.id)
+
+    def waitclose(self, timeout=None):
+        """ wait until this channel is closed (or the remote side
+        otherwise signalled that no more data was being sent).
+        The channel may still hold receiveable items, but not receive
+        any more after waitclose() has returned.  Exceptions from executing
+        code on the other side are reraised as local channel.RemoteErrors.
+        EOFError is raised if the reading-connection was prematurely closed,
+        which often indicates a dying process.
+        self.TimeoutError is raised after the specified number of seconds
+        (default is None, i.e. wait indefinitely).
+        """
+        self._receiveclosed.wait(timeout=timeout) # wait for non-"opened" state
+        if not self._receiveclosed.isSet():
+            raise self.TimeoutError("Timeout after %r seconds" % timeout)
+        error = self._getremoteerror()
+        if error:
+            raise error
+
+    def send(self, item):
+        """sends the given item to the other side of the channel,
+        possibly blocking if the sender queue is full.
+        The item must be a simple python type and will be
+        copied to the other side by value.  IOError is
+        raised if the write pipe was prematurely closed.
+        """
+        if self.isclosed():
+            raise IOError("cannot send to %r" %(self,))
+        self.gateway._send(self.Message.CHANNEL_DATA, self.id, dumps_internal(item))
+
+    def receive(self, timeout=None):
+        """receive a data item that was sent from the other side.
+        timeout: None [default] blocked waiting.  A positive number
+        indicates the number of seconds after which a channel.TimeoutError
+        exception will be raised if no item was received.
+        Note that exceptions from the remotely executing code will be
+        reraised as channel.RemoteError exceptions containing
+        a textual representation of the remote traceback.
+        """
+        itemqueue = self._items
+        if itemqueue is None:
+            raise IOError("cannot receive(), channel has receiver callback")
+        try:
+            x = itemqueue.get(timeout=timeout)
+        except self.gateway.execmodel.queue.Empty:
+            raise self.TimeoutError("no item after %r seconds" %(timeout))
+        if x is ENDMARKER:
+            itemqueue.put(x)  # for other receivers
+            raise self._getremoteerror() or EOFError()
+        else:
+            return x
+
+    def __iter__(self):
+        return self
+
+    def next(self):
+        try:
+            return self.receive()
+        except EOFError:
+            raise StopIteration
+    __next__ = next
+
+
+    def reconfigure(self, py2str_as_py3str=True, py3str_as_py2str=False):
+        """
+        set the string coercion for this channel
+        the default is to try to convert py2 str as py3 str,
+        but not to try and convert py3 str to py2 str
+        """
+        self._strconfig = (py2str_as_py3str, py3str_as_py2str)
+        data = dumps_internal(self._strconfig)
+        self.gateway._send(self.Message.RECONFIGURE, self.id, data=data)
+
+ENDMARKER = object()
+INTERRUPT_TEXT = "keyboard-interrupted"
+
+class ChannelFactory(object):
+    def __init__(self, gateway, startcount=1):
+        self._channels = weakref.WeakValueDictionary()
+        self._callbacks = {}
+        self._writelock = gateway.execmodel.Lock()
+        self.gateway = gateway
+        self.count = startcount
+        self.finished = False
+        self._list = list # needed during interp-shutdown
+
+    def new(self, id=None):
+        """ create a new Channel with 'id' (or create new id if None). """
+        self._writelock.acquire()
+        try:
+            if self.finished:
+                raise IOError("connexion already closed: %s" % (self.gateway,))
+            if id is None:
+                id = self.count
+                self.count += 2
+            try:
+                channel = self._channels[id]
+            except KeyError:
+                channel = self._channels[id] = Channel(self.gateway, id)
+            return channel
+        finally:
+            self._writelock.release()
+
+    def channels(self):
+        return self._list(self._channels.values())
+
+    #
+    # internal methods, called from the receiver thread
+    #
+    def _no_longer_opened(self, id):
+        try:
+            del self._channels[id]
+        except KeyError:
+            pass
+        try:
+            callback, endmarker, strconfig = self._callbacks.pop(id)
+        except KeyError:
+            pass
+        else:
+            if endmarker is not NO_ENDMARKER_WANTED:
+                callback(endmarker)
+
+    def _local_close(self, id, remoteerror=None, sendonly=False):
+        channel = self._channels.get(id)
+        if channel is None:
+            # channel already in "deleted" state
+            if remoteerror:
+                remoteerror.warn()
+            self._no_longer_opened(id)
+        else:
+            # state transition to "closed" state
+            if remoteerror:
+                channel._remoteerrors.append(remoteerror)
+            queue = channel._items
+            if queue is not None:
+                queue.put(ENDMARKER)
+            self._no_longer_opened(id)
+            if not sendonly: # otherwise #--> "sendonly"
+                channel._closed = True          # --> "closed"
+            channel._receiveclosed.set()
+
+    def _local_receive(self, id, data):
+        # executes in receiver thread
+        try:
+            callback, endmarker, strconfig= self._callbacks[id]
+            channel = self._channels.get(id)
+        except KeyError:
+            channel = self._channels.get(id)
+            queue = channel and channel._items
+            if queue is None:
+                pass    # drop data
+            else:
+                item = loads_internal(data, channel)
+                queue.put(item)
+        else:
+            try:
+                data = loads_internal(data, channel, strconfig)
+                callback(data)   # even if channel may be already closed
+            except KeyboardInterrupt:
+                raise
+            except:
+                excinfo = sys.exc_info()
+                self.gateway._trace("exception during callback: %s" % excinfo[1])
+                errortext = self.gateway._geterrortext(excinfo)
+                self.gateway._send(Message.CHANNEL_CLOSE_ERROR, id, dumps_internal(errortext))
+                self._local_close(id, errortext)
+
+    def _finished_receiving(self):
+        self.gateway._trace("finished receiving")
+        self._writelock.acquire()
+        try:
+            self.finished = True
+        finally:
+            self._writelock.release()
+        for id in self._list(self._channels):
+            self._local_close(id, sendonly=True)
+        for id in self._list(self._callbacks):
+            self._no_longer_opened(id)
+
+class ChannelFile(object):
+    def __init__(self, channel, proxyclose=True):
+        self.channel = channel
+        self._proxyclose = proxyclose
+
+    def isatty(self):
+        return False
+
+    def close(self):
+        if self._proxyclose:
+            self.channel.close()
+
+    def __repr__(self):
+        state = self.channel.isclosed() and 'closed' or 'open'
+        return '<ChannelFile %d %s>' %(self.channel.id, state)
+
+class ChannelFileWrite(ChannelFile):
+    def write(self, out):
+        self.channel.send(out)
+
+    def flush(self):
+        pass
+
+class ChannelFileRead(ChannelFile):
+    def __init__(self, channel, proxyclose=True):
+        super(ChannelFileRead, self).__init__(channel, proxyclose)
+        self._buffer = None
+
+    def read(self, n):
+        try:
+            if self._buffer is None:
+                self._buffer = self.channel.receive()
+            while len(self._buffer) < n:
+                self._buffer += self.channel.receive()
+        except EOFError:
+            self.close()
+        if self._buffer is None:
+            ret = ""
+        else:
+            ret = self._buffer[:n]
+            self._buffer = self._buffer[n:]
+        return ret
+
+    def readline(self):
+        if self._buffer is not None:
+            i = self._buffer.find("\n")
+            if i != -1:
+                return self.read(i+1)
+            line = self.read(len(self._buffer)+1)
+        else:
+            line = self.read(1)
+        while line and line[-1] != "\n":
+            c = self.read(1)
+            if not c:
+                break
+            line += c
+        return line
+
+class BaseGateway(object):
+    exc_info = sys.exc_info
+    _sysex = sysex
+    id = "<slave>"
+
+    def __init__(self, io, id, _startcount=2):
+        self.execmodel = io.execmodel
+        self._io = io
+        self.id = id
+        self._strconfig = (Unserializer.py2str_as_py3str,
+                           Unserializer.py3str_as_py2str)
+        self._channelfactory = ChannelFactory(self, _startcount)
+        self._receivelock = self.execmodel.RLock()
+        # globals may be NONE at process-termination
+        self.__trace = trace
+        self._geterrortext = geterrortext
+        self._workerpool = self.execmodel.WorkerPool(1)
+
+    def _trace(self, *msg):
+        self.__trace(self.id, *msg)
+
+    def _initreceive(self):
+        self._receiverthread = self._workerpool.spawn(self._thread_receiver)
+
+    def _thread_receiver(self):
+        def log(*msg):
+            self._trace("[receiver-thread]", *msg)
+        log("RECEIVERTHREAD: starting to run")
+        eof = False
+        io = self._io
+        try:
+            try:
+                while 1:
+                    msg = Message.from_io(io)
+                    log("received", msg)
+                    _receivelock = self._receivelock
+                    _receivelock.acquire()
+                    try:
+                        msg.received(self)
+                        del msg
+                    finally:
+                        _receivelock.release()
+            except self._sysex:
+                log("io.close_read()")
+                self._io.close_read()
+            except EOFError:
+                log("got EOF")
+                #log("traceback was: ", self._geterrortext(self.exc_info()))
+                self._error = self.exc_info()[1]
+                eof = True
+            except:
+                log(self._geterrortext(self.exc_info()))
+        finally:
+            try:
+                log('entering finalization')
+                # wake up any execution waiting to receive something
+                self._channelfactory._finished_receiving()
+                if eof:
+                    self._terminate_execution()
+                log('leaving finalization')
+            except:
+                pass # XXX be silent at interp-shutdown
+
+    def _terminate_execution(self):
+        pass
+
+    def _send(self, msgcode, channelid=0, data=bytes()):
+        message = Message(msgcode, channelid, data)
+        try:
+            message.to_io(self._io)
+            self._trace('sent', message)
+        except (IOError, ValueError):
+            e = sys.exc_info()[1]
+            self._trace('failed to send', message, e)
+            raise
+
+
+    def _local_schedulexec(self, channel, sourcetask):
+        channel.close("execution disallowed")
+
+    # _____________________________________________________________________
+    #
+    # High Level Interface
+    # _____________________________________________________________________
+    #
+    def newchannel(self):
+        """ return a new independent channel. """
+        return self._channelfactory.new()
+
+    def join(self, timeout=None):
+        """ Wait for receiverthread to terminate. """
+        self._trace("waiting for receiver thread to finish")
+        self._receiverthread.waitfinish()
+
+class SlaveGateway(BaseGateway):
+
+    def _local_schedulexec(self, channel, sourcetask):
+        sourcetask = loads_internal(sourcetask)
+        self._execpool.spawn(self.executetask, ((channel, sourcetask)))
+
+    def _terminate_execution(self):
+        # called from receiverthread
+        self._trace("shutting down execution pool")
+        self._execpool.shutdown()
+        if not self._execpool.waitall(1.0):
+            self._trace("execution ongoing, trying interrupt_main")
+            # We try hard to terminate execution based on the assumption
+            # that there is only one gateway object running per-process.
+            if sys.platform != "win32":
+                self._trace("sending ourselves a SIGINT")
+                os.kill(os.getpid(), 2)  # send ourselves a SIGINT
+            elif interrupt_main is not None:
+                self._trace("calling interrupt_main()")
+                interrupt_main()
+            if not self._execpool.waitall(10.0):
+                self._trace("execution did not finish in 10 secs, "
+                            "calling os._exit()")
+                os._exit(1)
+
+    def serve(self):
+        trace = lambda msg: self._trace("[serve] " + msg)
+        hasprimary = self.execmodel.backend == "thread"
+        self._execpool = self.execmodel.WorkerPool(hasprimary=hasprimary)
+        trace("spawning receiver thread")
+        self._initreceive()
+        try:
+            try:
+                if hasprimary:
+                    trace("integrating as main primary exec thread")
+                    self._execpool.integrate_as_primary_thread()
+                else:
+                    trace("waiting for execution to finish")
+                    self._execpool.wait_for_shutdown()
+            finally:
+                trace("execution finished, closing io write stream")
+                self._io.close_write()
+            trace("joining receiver thread")
+            self.join()
+        except KeyboardInterrupt:
+            # in the slave we can't really do anything sensible
+            trace("swallowing keyboardinterrupt, serve finished")
+
+    def executetask(self, item):
+        try:
+            channel, (source, call_name, kwargs) = item
+            if not ISPY3 and kwargs:
+                # some python2 versions do not accept unicode keyword params
+                # note: Unserializer generally turns py2-str to py3-str objects
+                newkwargs = {}
+                for name, value in kwargs.items():
+                    if isinstance(name, unicode):
+                        name = name.encode('ascii')
+                    newkwargs[name] = value
+                kwargs = newkwargs
+            loc = {'channel' : channel, '__name__': '__channelexec__'}
+            self._trace("execution starts[%s]: %s" %
+                            (channel.id, repr(source)[:50]))
+            channel._executing = True
+            try:
+                co = compile(source+'\n', '<remote exec>', 'exec')
+                do_exec(co, loc) # noqa
+                if call_name:
+                    self._trace('calling %s(**%60r)' % (call_name, kwargs))
+                    function = loc[call_name]
+                    function(channel, **kwargs)
+            finally:
+                channel._executing = False
+                self._trace("execution finished")
+        except KeyboardInterrupt:
+            channel.close(INTERRUPT_TEXT)
+            raise
+        except:
+            excinfo = self.exc_info()
+            if not isinstance(excinfo[1], EOFError):
+                if not channel.gateway._channelfactory.finished:
+                    self._trace("got exception: %r" % (excinfo[1],))
+                    errortext = self._geterrortext(excinfo)
+                    channel.close(errortext)
+                    return
+            self._trace("ignoring EOFError because receiving finished")
+        channel.close()
+
+#
+# Cross-Python pickling code, tested from test_serializer.py
+#
+
+class DataFormatError(Exception):
+    pass
+
+class DumpError(DataFormatError):
+    """Error while serializing an object."""
+
+class LoadError(DataFormatError):
+    """Error while unserializing an object."""
+
+if ISPY3:
+    def bchr(n):
+        return bytes([n])
+else:
+    bchr = chr
+
+DUMPFORMAT_VERSION = bchr(1)
+
+FOUR_BYTE_INT_MAX = 2147483647
+
+FLOAT_FORMAT = "!d"
+FLOAT_FORMAT_SIZE = struct.calcsize(FLOAT_FORMAT)
+
+class _Stop(Exception):
+    pass
+
+class Unserializer(object):
+    num2func = {} # is filled after this class definition
+    py2str_as_py3str = True # True
+    py3str_as_py2str = False  # false means py2 will get unicode
+
+    def __init__(self, stream, channel_or_gateway=None, strconfig=None):
+        gateway = getattr(channel_or_gateway, 'gateway', channel_or_gateway)
+        strconfig = getattr(channel_or_gateway, '_strconfig', strconfig)
+        if strconfig:
+            self.py2str_as_py3str, self.py3str_as_py2str = strconfig
+        self.stream = stream
+        self.channelfactory = getattr(gateway, '_channelfactory', gateway)
+
+    def load(self, versioned=False):
+        if versioned:
+            ver = self.stream.read(1)
+            if ver != DUMPFORMAT_VERSION:
+                raise LoadError("wrong dumpformat version")
+        self.stack = []
+        try:
+            while True:
+                opcode = self.stream.read(1)
+                if not opcode:
+                    raise EOFError
+                try:
+                    loader = self.num2func[opcode]
+                except KeyError:
+                    raise LoadError("unkown opcode %r - "
+                        "wire protocol corruption?" % (opcode,))
+                loader(self)
+        except _Stop:
+            if len(self.stack) != 1:
+                raise LoadError("internal unserialization error")
+            return self.stack.pop(0)
+        else:
+            raise LoadError("didn't get STOP")
+
+    def load_none(self):
+        self.stack.append(None)
+
+    def load_true(self):
+        self.stack.append(True)
+
+    def load_false(self):
+        self.stack.append(False)
+
+    def load_int(self):
+        i = self._read_int4()
+        self.stack.append(i)
+
+    def load_longint(self):
+        s = self._read_byte_string()
+        self.stack.append(int(s))
+
+    if ISPY3:
+        load_long = load_int
+        load_longlong = load_longint
+    else:
+        def load_long(self):
+            i = self._read_int4()
+            self.stack.append(long(i))
+
+        def load_longlong(self):
+            l = self._read_byte_string()
+            self.stack.append(long(l))
+
+    def load_float(self):
+        binary = self.stream.read(FLOAT_FORMAT_SIZE)
+        self.stack.append(struct.unpack(FLOAT_FORMAT, binary)[0])
+
+    def _read_int4(self):
+        return struct.unpack("!i", self.stream.read(4))[0]
+
+    def _read_byte_string(self):
+        length = self._read_int4()
+        as_bytes = self.stream.read(length)
+        return as_bytes
+
+    def load_py3string(self):
+        as_bytes = self._read_byte_string()
+        if not ISPY3 and self.py3str_as_py2str:
+            # XXX Should we try to decode into latin-1?
+            self.stack.append(as_bytes)
+        else:
+            self.stack.append(as_bytes.decode("utf-8"))
+
+    def load_py2string(self):
+        as_bytes = self._read_byte_string()
+        if ISPY3 and self.py2str_as_py3str:
+            s = as_bytes.decode("latin-1")
+        else:
+            s = as_bytes
+        self.stack.append(s)
+
+    def load_bytes(self):
+        s = self._read_byte_string()
+        self.stack.append(s)
+
+    def load_unicode(self):
+        self.stack.append(self._read_byte_string().decode("utf-8"))
+
+    def load_newlist(self):
+        length = self._read_int4()
+        self.stack.append([None] * length)
+
+    def load_setitem(self):
+        if len(self.stack) < 3:
+            raise LoadError("not enough items for setitem")
+        value = self.stack.pop()
+        key = self.stack.pop()
+        self.stack[-1][key] = value
+
+    def load_newdict(self):
+        self.stack.append({})
+
+    def _load_collection(self, type_):
+        length = self._read_int4()
+        if length:
+            res = type_(self.stack[-length:])
+            del self.stack[-length:]
+            self.stack.append(res)
+        else:
+            self.stack.append(type_())
+
+    def load_buildtuple(self):
+        self._load_collection(tuple)
+
+    def load_set(self):
+        self._load_collection(set)
+
+    def load_frozenset(self):
+        self._load_collection(frozenset)
+
+    def load_stop(self):
+        raise _Stop
+
+    def load_channel(self):
+        id = self._read_int4()
+        newchannel = self.channelfactory.new(id)
+        self.stack.append(newchannel)
+
+# automatically build opcodes and byte-encoding
+
+class opcode:
+    """ container for name -> num mappings. """
+
+def _buildopcodes():
+    l = []
+    for name, func in Unserializer.__dict__.items():
+        if name.startswith("load_"):
+            opname = name[5:].upper()
+            l.append((opname, func))
+    l.sort()
+    for i,(opname, func) in enumerate(l):
+        assert i < 26, "xxx"
+        i = bchr(64+i)
+        Unserializer.num2func[i] = func
+        setattr(opcode, opname, i)
+
+_buildopcodes()
+
+def dumps(obj):
+    """ return a serialized bytestring of the given obj.
+
+    The obj and all contained objects must be of a builtin
+    python type (so nested dicts, sets, etc. are all ok but
+    not user-level instances).
+    """
+    return _Serializer().save(obj, versioned=True)
+
+def dump(byteio, obj):
+    """ write a serialized bytestring of the given obj to the given stream. """
+    _Serializer(write=byteio.write).save(obj, versioned=True)
+
+def loads(bytestring, py2str_as_py3str=False, py3str_as_py2str=False):
+    """ return the object as deserialized from the given bytestring.
+
+    py2str_as_py3str: if true then string (str) objects previously
+                      dumped on Python2 will be loaded as Python3
+                      strings which really are text objects.
+    py3str_as_py2str: if true then string (str) objects previously
+                      dumped on Python3 will be loaded as Python2
+                      strings instead of unicode objects.
+
+    if the bytestring was dumped with an incompatible protocol
+    version or if the bytestring is corrupted, the
+    ``execnet.DataFormatError`` will be raised.
+    """
+    io = BytesIO(bytestring)
+    return load(io, py2str_as_py3str=py2str_as_py3str,
+                    py3str_as_py2str=py3str_as_py2str)
+
+def load(io, py2str_as_py3str=False, py3str_as_py2str=False):
+    """ derserialize an object form the specified stream.
+
+    Behaviour and parameters are otherwise the same as with ``loads``
+    """
+    strconfig=(py2str_as_py3str, py3str_as_py2str)
+    return Unserializer(io, strconfig=strconfig).load(versioned=True)
+
+def loads_internal(bytestring, channelfactory=None, strconfig=None):
+    io = BytesIO(bytestring)
+    return Unserializer(io, channelfactory, strconfig).load()
+
+def dumps_internal(obj):
+    return _Serializer().save(obj)
+
+
+class _Serializer(object):
+    _dispatch = {}
+
+    def __init__(self, write=None):
+        if write is None:
+            self._streamlist = []
+            write = self._streamlist.append
+        self._write = write
+
+    def save(self, obj, versioned=False):
+        # calling here is not re-entrant but multiple instances
+        # may write to the same stream because of the common platform
+        # atomic-write guaruantee (concurrent writes each happen atomicly)
+        if versioned:
+            self._write(DUMPFORMAT_VERSION)
+        self._save(obj)
+        self._write(opcode.STOP)
+        try:
+            streamlist = self._streamlist
+        except AttributeError:
+            return None
+        return type(streamlist[0])().join(streamlist)
+
+    def _save(self, obj):
+        tp = type(obj)
+        try:
+            dispatch = self._dispatch[tp]
+        except KeyError:
+            methodname = 'save_' + tp.__name__
+            meth = getattr(self.__class__, methodname, None)
+            if meth is None:
+                raise DumpError("can't serialize %s" % (tp,))
+            dispatch = self._dispatch[tp] = meth
+        dispatch(self, obj)
+
+    def save_NoneType(self, non):
+        self._write(opcode.NONE)
+
+    def save_bool(self, boolean):
+        if boolean:
+            self._write(opcode.TRUE)
+        else:
+            self._write(opcode.FALSE)
+
+    def save_bytes(self, bytes_):
+        self._write(opcode.BYTES)
+        self._write_byte_sequence(bytes_)
+
+    if ISPY3:
+        def save_str(self, s):
+            self._write(opcode.PY3STRING)
+            self._write_unicode_string(s)
+    else:
+        def save_str(self, s):
+            self._write(opcode.PY2STRING)
+            self._write_byte_sequence(s)
+
+        def save_unicode(self, s):
+            self._write(opcode.UNICODE)
+            self._write_unicode_string(s)
+
+    def _write_unicode_string(self, s):
+        try:
+            as_bytes = s.encode("utf-8")
+        except UnicodeEncodeError:
+            raise DumpError("strings must be utf-8 encodable")
+        self._write_byte_sequence(as_bytes)
+
+    def _write_byte_sequence(self, bytes_):
+        self._write_int4(len(bytes_), "string is too long")
+        self._write(bytes_)
+
+    def _save_integral(self, i, short_op, long_op):
+        if i <= FOUR_BYTE_INT_MAX:
+            self._write(short_op)
+            self._write_int4(i)
+        else:
+            self._write(long_op)
+            self._write_byte_sequence(str(i).rstrip("L").encode("ascii"))
+
+    def save_int(self, i):
+        self._save_integral(i, opcode.INT, opcode.LONGINT)
+
+    def save_long(self, l):
+        self._save_integral(l, opcode.LONG, opcode.LONGLONG)
+
+    def save_float(self, flt):
+        self._write(opcode.FLOAT)
+        self._write(struct.pack(FLOAT_FORMAT, flt))
+
+    def _write_int4(self, i, error="int must be less than %i" %
+                    (FOUR_BYTE_INT_MAX,)):
+        if i > FOUR_BYTE_INT_MAX:
+            raise DumpError(error)
+        self._write(struct.pack("!i", i))
+
+    def save_list(self, L):
+        self._write(opcode.NEWLIST)
+        self._write_int4(len(L), "list is too long")
+        for i, item in enumerate(L):
+            self._write_setitem(i, item)
+
+    def _write_setitem(self, key, value):
+        self._save(key)
+        self._save(value)
+        self._write(opcode.SETITEM)
+
+    def save_dict(self, d):
+        self._write(opcode.NEWDICT)
+        for key, value in d.items():
+            self._write_setitem(key, value)
+
+    def save_tuple(self, tup):
+        for item in tup:
+            self._save(item)
+        self._write(opcode.BUILDTUPLE)
+        self._write_int4(len(tup), "tuple is too long")
+
+    def _write_set(self, s, op):
+        for item in s:
+            self._save(item)
+        self._write(op)
+        self._write_int4(len(s), "set is too long")
+
+    def save_set(self, s):
+        self._write_set(s, opcode.SET)
+
+    def save_frozenset(self, s):
+        self._write_set(s, opcode.FROZENSET)
+
+    def save_Channel(self, channel):
+        self._write(opcode.CHANNEL)
+        self._write_int4(channel.id)
+
+def init_popen_io(execmodel):
+    if not hasattr(os, 'dup'): # jython
+        io = Popen2IO(sys.stdout, sys.stdin, execmodel)
+        import tempfile
+        sys.stdin = tempfile.TemporaryFile('r')
+        sys.stdout = tempfile.TemporaryFile('w')
+    else:
+        try:
+            devnull = os.devnull
+        except AttributeError:
+            if os.name == 'nt':
+                devnull = 'NUL'
+            else:
+                devnull = '/dev/null'
+        # stdin
+        stdin  = execmodel.fdopen(os.dup(0), 'r', 1)
+        fd = os.open(devnull, os.O_RDONLY)
+        os.dup2(fd, 0)
+        os.close(fd)
+
+        # stdout
+        stdout = execmodel.fdopen(os.dup(1), 'w', 1)
+        fd = os.open(devnull, os.O_WRONLY)
+        os.dup2(fd, 1)
+
+        # stderr for win32
+        if os.name == 'nt':
+            sys.stderr = execmodel.fdopen(os.dup(2), 'w', 1)
+            os.dup2(fd, 2)
+        os.close(fd)
+        io = Popen2IO(stdout, stdin, execmodel)
+        sys.stdin = execmodel.fdopen(0, 'r', 1)
+        sys.stdout = execmodel.fdopen(1, 'w', 1)
+    return io
+
+def serve(io, id):
+    trace("creating slavegateway on %r" %(io,))
+    SlaveGateway(io=io, id=id, _startcount=2).serve()
index 971d058ac4be757d4c82dfcee001ce2074e26106..abd084a45e4325fe391fdecb60e950609fea6ab7 100644 (file)
@@ -17,20 +17,22 @@ def bootstrap_popen(io, spec):
     sendexec(io,
         "import sys",
         "sys.path.insert(0, %r)" % importdir,
-        "from execnet.gateway_base import serve, init_popen_io",
+        "from execnet.gateway_base import serve, init_popen_io, get_execmodel",
         "sys.stdout.write('1')",
         "sys.stdout.flush()",
-        "serve(init_popen_io(), id='%s-slave')" % spec.id,
+        "execmodel = get_execmodel(%r)" % spec.execmodel,
+        "serve(init_popen_io(execmodel), id='%s-slave')" % spec.id,
     )
     s = io.read(1)
-    assert s == "1".encode('ascii')
+    assert s == "1".encode('ascii'), repr(s)
 
 
 def bootstrap_ssh(io, spec):
     try:
         sendexec(io,
             inspect.getsource(gateway_base),
-            'io = init_popen_io()',
+            "execmodel = get_execmodel(%r)" % spec.execmodel,
+            'io = init_popen_io(execmodel)',
             "io.write('1'.encode('ascii'))",
             "serve(io, id='%s-slave')" % spec.id,
         )
@@ -50,7 +52,10 @@ def bootstrap_socket(io, id):
         inspect.getsource(gateway_base),
         'import socket',
         inspect.getsource(SocketIO),
-        "io = SocketIO(clientsock)",
+        "try: execmodel",
+        "except NameError:",
+        "   execmodel = get_execmodel('thread')",
+        "io = SocketIO(clientsock, execmodel)",
         "io.write('1'.encode('ascii'))",
         "serve(io, id='%s-slave')" % id,
     )
index 242ac9e652f54af1401097e670ab451c7a793fb7..cc64d841cd80f7a1caefd386c6f626b664a93d70 100644 (file)
@@ -5,7 +5,6 @@ creates io instances used for gateway io
 """
 import os
 import sys
-from subprocess import Popen, PIPE
 
 try:
     from execnet.gateway_base import Popen2IO, Message
@@ -13,9 +12,9 @@ except ImportError:
     from __main__ import Popen2IO, Message
 
 class Popen2IOMaster(Popen2IO):
-    def __init__(self, args):
-        self.popen = p = Popen(args, stdin=PIPE, stdout=PIPE)
-        Popen2IO.__init__(self, p.stdin, p.stdout)
+    def __init__(self, args, execmodel):
+        self.popen = p = execmodel.PopenPiped(args)
+        Popen2IO.__init__(self, p.stdin, p.stdout, execmodel=execmodel)
 
     def wait(self):
         try:
@@ -86,32 +85,47 @@ def ssh_args(spec):
     args.append(remotecmd)
     return args
 
-
-def create_io(spec):
+def create_io(spec, execmodel):
     if spec.popen:
         args = popen_args(spec)
-        return Popen2IOMaster(args)
+        return Popen2IOMaster(args, execmodel)
     if spec.ssh:
         args = ssh_args(spec)
-        io = Popen2IOMaster(args)
+        io = Popen2IOMaster(args, execmodel)
         io.remoteaddress = spec.ssh
         return io
 
+#
+# Proxy Gateway handling code
+#
+# master: proxy initiator
+# forwarder: forwards between master and sub
+# sub: sub process that is proxied to the initiator
+
 RIO_KILL = 1
 RIO_WAIT = 2
 RIO_REMOTEADDRESS = 3
 RIO_CLOSE_WRITE = 4
 
-class RemoteIO(object):
-    def __init__(self, master_channel):
-        self.iochan = master_channel.gateway.newchannel()
-        self.controlchan = master_channel.gateway.newchannel()
-        master_channel.send((self.iochan, self.controlchan))
-        self.io = self.iochan.makefile('r')
-
+class ProxyIO(object):
+    """ A Proxy IO object allows to instantiate a Gateway
+    through another "via" gateway.  A master:ProxyIO object
+    provides an IO object effectively connected to the sub
+    via the forwarder.  To achieve this, master:ProxyIO interacts
+    with forwarder:serve_proxy_io() which itself
+    instantiates and interacts with the sub.
+    """
+    def __init__(self, proxy_channel, execmodel):
+        # after exchanging the control channel we use proxy_channel
+        # for messaging IO
+        self.controlchan = proxy_channel.gateway.newchannel()
+        proxy_channel.send(self.controlchan)
+        self.iochan = proxy_channel
+        self.iochan_file = self.iochan.makefile('r')
+        self.execmodel = execmodel
 
     def read(self, nbytes):
-        return self.io.read(nbytes)
+        return self.iochan_file.read(nbytes)
 
     def write(self, data):
         return self.iochan.send(data)
@@ -129,49 +143,68 @@ class RemoteIO(object):
     def wait(self):
         return self._controll(RIO_WAIT)
 
+    @property
+    def remoteaddress(self):
+        return self._controll(RIO_REMOTEADDRESS)
+
     def __repr__(self):
         return '<RemoteIO via %s>' % (self.iochan.gateway.id, )
 
-
-def serve_remote_io(channel):
-    class PseudoSpec(object):
-        def __getattr__(self, name):
-            return None
-    spec = PseudoSpec()
-    spec.__dict__.update(channel.receive())
-    io = create_io(spec)
-    io_chan, control_chan = channel.receive()
-    io_target = io_chan.makefile()
-
-    def iothread():
-        initial = io.read(1)
-        assert initial == '1'.encode('ascii')
-        channel.gateway._trace('initializing transfer io for', spec.id)
-        io_target.write(initial)
-        while True:
-            message = Message.from_io(io)
-            message.to_io(io_target)
-    import threading
-    thread = threading.Thread(name='io-forward-'+spec.id,
-                              target=iothread)
-    thread.setDaemon(True)
-    thread.start()
-
-    def iocallback(data):
-        io.write(data)
-    io_chan.setcallback(iocallback)
-
+class PseudoSpec:
+    def __init__(self, vars):
+        self.__dict__.update(vars)
+    def __getattr__(self, name):
+        return None
+
+def serve_proxy_io(proxy_channelX):
+    execmodel = proxy_channelX.gateway.execmodel
+    _trace = proxy_channelX.gateway._trace
+    tag = "serve_proxy_io:%s " % proxy_channelX.id
+    def log(*msg):
+        _trace(tag + msg[0], *msg[1:])
+    spec = PseudoSpec(proxy_channelX.receive())
+    # create sub IO object which we will proxy back to our proxy initiator
+    sub_io = create_io(spec, execmodel)
+    control_chan = proxy_channelX.receive()
+    log("got control chan", control_chan)
+
+    # read data from master, forward it to the sub
+    # XXX writing might block, thus blocking the receiver thread
+    def forward_to_sub(data):
+        log("forward data to sub, size %s" % len(data))
+        sub_io.write(data)
+    proxy_channelX.setcallback(forward_to_sub)
 
     def controll(data):
         if data==RIO_WAIT:
-            control_chan.send(io.wait())
+            control_chan.send(sub_io.wait())
         elif data==RIO_KILL:
-            control_chan.send(io.kill())
+            control_chan.send(sub_io.kill())
         elif data==RIO_REMOTEADDRESS:
-            control_chan.send(io.remoteaddress)
+            control_chan.send(sub_io.remoteaddress)
         elif data==RIO_CLOSE_WRITE:
-            control_chan.send(io.close_write())
+            control_chan.send(sub_io.close_write())
     control_chan.setcallback(controll)
 
+    # write data to the master coming from the sub
+    forward_to_master_file = proxy_channelX.makefile("w")
+
+    # read bootstrap byte from sub, send it on to master
+    log('reading bootstrap byte from sub', spec.id)
+    initial = sub_io.read(1)
+    assert initial == '1'.encode('ascii'), initial
+    log('forwarding bootstrap byte from sub', spec.id)
+    forward_to_master_file.write(initial)
+
+    # enter message forwarding loop
+    while True:
+        try:
+            message = Message.from_io(sub_io)
+        except EOFError:
+            log('EOF from sub, terminating proxying loop', spec.id)
+            break
+        message.to_io(forward_to_master_file)
+    # proxy_channelX will be closed from remote_exec's finalization code
+
 if __name__ == "__channelexec__":
-    serve_remote_io(channel)
+    serve_proxy_io(channel) # noqa
index 7b98bff447c49b50a823cc140cd9a1a2f5ada2d8..3bb0589cb5af7553e90000fe25371c64dc226068 100644 (file)
@@ -1,17 +1,14 @@
-import socket
-from execnet.gateway import Gateway
 from execnet.gateway_bootstrap import HostNotFound
-import os, sys, inspect
-
+import sys
 
 try: bytes
 except NameError: bytes = str
 
 class SocketIO:
-
-    error = (socket.error, EOFError)
-    def __init__(self, sock):
+    def __init__(self, sock, execmodel):
         self.sock = sock
+        self.execmodel = execmodel
+        socket = execmodel.socket
         try:
             sock.setsockopt(socket.SOL_IP, socket.IP_TOS, 0x10)# IPTOS_LOWDELAY
             sock.setsockopt(socket.SOL_TCP, socket.TCP_NODELAY, 1)
@@ -34,12 +31,12 @@ class SocketIO:
     def close_read(self):
         try:
             self.sock.shutdown(0)
-        except socket.error:
+        except self.execmodel.socket.error:
             pass
     def close_write(self):
         try:
             self.sock.shutdown(1)
-        except socket.error:
+        except self.execmodel.socket.error:
             pass
 
     def wait(self):
@@ -71,7 +68,7 @@ def start_via(gateway, hostport=None):
     return realhost, realport
 
 
-def create_io(spec, group):
+def create_io(spec, group, execmodel):
     assert not spec.python, (
         "socket: specifying python executables not yet supported")
     gateway_id = spec.installvia
@@ -81,11 +78,12 @@ def create_io(spec, group):
         host, port = spec.socket.split(":")
         port = int(port)
 
+    socket = execmodel.socket
     sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
-    io = SocketIO(sock)
+    io = SocketIO(sock, execmodel)
     io.remoteaddress = '%s:%d' % (host, port)
     try:
         sock.connect((host, port))
-    except socket.gaierror:
+    except execmodel.socket.gaierror:
         raise HostNotFound(str(sys.exc_info()[1]))
     return io
index 5cd76d68a3b7026dc97915ba40993dcc724870f7..1343f3c57293e067869f4623c2091f8e22f63244 100644 (file)
@@ -1,33 +1,66 @@
 """
 Managing Gateway Groups and interactions with multiple channels.
 
-(c) 2008-2009, Holger Krekel and others
+(c) 2008-2014, Holger Krekel and others
 """
 
-import os, sys, atexit
-import time
-import execnet
-from execnet.threadpool import WorkerPool
+import sys, atexit
 
 from execnet import XSpec
-from execnet import gateway, gateway_io, gateway_bootstrap
-from execnet.gateway_base import queue, reraise, trace, TimeoutError
+from execnet import gateway_io, gateway_bootstrap
+from execnet.gateway_base import reraise, trace, get_execmodel
+from threading import Lock
 
 NO_ENDMARKER_WANTED = object()
 
-class Group:
+class Group(object):
     """ Gateway Groups. """
     defaultspec = "popen"
-    def __init__(self, xspecs=()):
-        """ initialize group and make gateways as specified. """
-        # Gateways may evolve to become GC-collectable
+    def __init__(self, xspecs=(), execmodel="thread"):
+        """ initialize group and make gateways as specified.
+        execmodel can be 'thread' or 'eventlet'.
+        """
         self._gateways = []
         self._autoidcounter = 0
+        self._autoidlock = Lock()
         self._gateways_to_join = []
+        # we use the same execmodel for all of the Gateway objects
+        # we spawn on our side.  Probably we should not allow different
+        # execmodels between different groups but not clear.
+        # Note that "other side" execmodels may differ and is typically
+        # specified by the spec passed to makegateway.
+        self.set_execmodel(execmodel)
         for xspec in xspecs:
             self.makegateway(xspec)
         atexit.register(self._cleanup_atexit)
 
+    @property
+    def execmodel(self):
+        return self._execmodel
+
+    @property
+    def remote_execmodel(self):
+        return self._remote_execmodel
+
+    def set_execmodel(self, execmodel, remote_execmodel=None):
+        """ Set the execution model for local and remote site.
+
+        execmodel can be one of "thread" or "eventlet" (XXX gevent).
+        It determines the execution model for any newly created gateway.
+        If remote_execmodel is not specified it takes on the value
+        of execmodel.
+
+        NOTE: Execution models can only be set before any gateway is created.
+
+        """
+        if self._gateways:
+            raise ValueError("can not set execution models if "
+                             "gateways have been created already")
+        if remote_execmodel is None:
+            remote_execmodel = execmodel
+        self._execmodel = get_execmodel(execmodel)
+        self._remote_execmodel = get_execmodel(remote_execmodel)
+
     def __repr__(self):
         idgateways = [gw.id for gw in self]
         return "<Group %r>" %(idgateways)
@@ -66,6 +99,7 @@ class Group:
 
             id=<string>     specifies the gateway id
             python=<path>   specifies which python interpreter to execute
+            execmodel=model 'thread', 'eventlet', 'gevent' model for execution
             chdir=<path>    specifies to which directory to change
             nice=<path>     specifies process priority of new process
             env:NAME=value  specifies a remote environment variable setting.
@@ -77,19 +111,21 @@ class Group:
         if not isinstance(spec, XSpec):
             spec = XSpec(spec)
         self.allocate_id(spec)
+        if spec.execmodel is None:
+            spec.execmodel = self.remote_execmodel.backend
         if spec.via:
             assert not spec.socket
             master = self[spec.via]
-            channel = master.remote_exec(gateway_io)
-            channel.send(vars(spec))
-            io = gateway_io.RemoteIO(channel)
-            gw = gateway_bootstrap.bootstrap(io, spec)
+            proxy_channel = master.remote_exec(gateway_io)
+            proxy_channel.send(vars(spec))
+            proxy_io_master = gateway_io.ProxyIO(proxy_channel, self.execmodel)
+            gw = gateway_bootstrap.bootstrap(proxy_io_master, spec)
         elif spec.popen or spec.ssh:
-            io = gateway_io.create_io(spec)
+            io = gateway_io.create_io(spec, execmodel=self.execmodel)
             gw = gateway_bootstrap.bootstrap(io, spec)
         elif spec.socket:
             from execnet import gateway_socket
-            io = gateway_socket.create_io(spec, self)
+            io = gateway_socket.create_io(spec, self, execmodel=self.execmodel)
             gw = gateway_bootstrap.bootstrap(io, spec)
         else:
             raise ValueError("no gateway type found for %r" % (spec._spec,))
@@ -115,13 +151,14 @@ class Group:
         return gw
 
     def allocate_id(self, spec):
-        """ allocate id for the given xspec object. """
+        """ (re-entrant) allocate id for the given xspec object. """
         if spec.id is None:
-            id = "gw" + str(self._autoidcounter)
-            self._autoidcounter += 1
-            if id in self:
-                raise ValueError("already have gateway with id %r" %(id,))
-            spec.id = id
+            with self._autoidlock:
+                id = "gw" + str(self._autoidcounter)
+                self._autoidcounter += 1
+                if id in self:
+                    raise ValueError("already have gateway with id %r" %(id,))
+                spec.id = id
 
     def _register(self, gateway):
         assert not hasattr(gateway, '_group')
@@ -147,7 +184,6 @@ class Group:
         """
 
         while self:
-            from execnet.threadpool import WorkerPool
             vias = {}
             for gw in self:
                 if gw.spec.via:
@@ -163,7 +199,7 @@ class Group:
                 trace("Gateways did not come down after timeout: %r" % gw)
                 gw._io.kill()
 
-            safe_terminate(timeout, [
+            safe_terminate(self.execmodel, timeout, [
                 (lambda: join_wait(gw), lambda: kill(gw))
                 for gw in self._gateways_to_join])
             self._gateways_to_join[:] = []
@@ -212,8 +248,10 @@ class MultiChannel:
         try:
             return self._queue
         except AttributeError:
-            self._queue = queue.Queue()
+            self._queue = None
             for ch in self._channels:
+                if self._queue is None:
+                    self._queue = ch.gateway.execmodel.queue.Queue()
                 def putreceived(obj, channel=ch):
                     self._queue.put((channel, obj))
                 if endmarker is NO_ENDMARKER_WANTED:
@@ -236,11 +274,11 @@ class MultiChannel:
 
 
 
-def safe_terminate(timeout, list_of_paired_functions):
-    workerpool = WorkerPool(len(list_of_paired_functions)*2)
+def safe_terminate(execmodel, timeout, list_of_paired_functions):
+    workerpool = execmodel.WorkerPool()
 
     def termkill(termfunc, killfunc):
-        termreply = workerpool.dispatch(termfunc)
+        termreply = workerpool.spawn(termfunc)
         try:
             termreply.get(timeout=timeout)
         except IOError:
@@ -248,14 +286,14 @@ def safe_terminate(timeout, list_of_paired_functions):
 
     replylist = []
     for termfunc, killfunc in list_of_paired_functions:
-        reply = workerpool.dispatch(termkill, termfunc, killfunc)
+        reply = workerpool.spawn(termkill, termfunc, killfunc)
         replylist.append(reply)
     for reply in replylist:
         reply.get()
-    workerpool.shutdown()
-    workerpool.join()
+    workerpool.waitall()
 
 
 default_group = Group()
 makegateway = default_group.makegateway
+set_execmodel = default_group.set_execmodel
 
index eee139c9c8f6e53d3d9fc3ad7868e655fed18036..a36b8ce4f12ad53f1004b8318745dafc8b2a2580 100644 (file)
@@ -1,5 +1,5 @@
 """
-(c) 2006-2009, Armin Rigo, Holger Krekel, Maciej Fijalkowski
+(c) 2006-2013, Armin Rigo, Holger Krekel, Maciej Fijalkowski
 """
 def serve_rsync(channel):
     import os, stat, shutil
@@ -106,4 +106,4 @@ def serve_rsync(channel):
     channel.send(("done", None))
 
 if __name__ == '__channelexec__':
-    serve_rsync(channel)
+    serve_rsync(channel) # noqa
old mode 100644 (file)
new mode 100755 (executable)
index 9196f41..ecea167
@@ -10,7 +10,6 @@ try:
     clientsock
 except NameError:
     print("client side starting")
-    import sys
     host, port  = sys.argv[1].split(':')
     port = int(port)
     myself = open(os.path.abspath(sys.argv[0]), 'rU').read()
@@ -73,13 +72,13 @@ class promptagent(Thread):
                         sys.stderr=olderr
                 clientfile.flush()
             except EOFError:
-                e = sys.exc_info()[1]
+                #e = sys.exc_info()[1]
                 sys.stderr.write("connection close, prompt thread returns")
                 break
                 #print >>sys.stdout, "".join(apply(format_exception,sys.exc_info()))
 
         self.clientsock.close()
 
-prompter = promptagent(clientsock)
+prompter = promptagent(clientsock)  # noqa
 prompter.start()
 print("promptagent - thread started")
old mode 100644 (file)
new mode 100755 (executable)
index 596597b..7b0b92a
 
 progname = 'socket_readline_exec_server-1.2'
 
-import sys, socket, os
-try:
-    import fcntl
-except ImportError:
-    fcntl = None
+import sys, os
+
+def get_fcntl():
+    try:
+        import fcntl
+    except ImportError:
+        fcntl = None
+    return fcntl
+
+fcntl = get_fcntl()
 
 debug = 0
 
@@ -47,19 +52,20 @@ def exec_from_one_connection(serversock):
     # rstrip so that we can use \r\n for telnet testing
     source = clientfile.readline().rstrip()
     clientfile.close()
-    g = {'clientsock' : clientsock, 'address' : address}
+    g = {'clientsock' : clientsock, 'address' : address, 'execmodel': execmodel}
     source = eval(source)
     if source:
         co = compile(source+'\n', source, 'exec')
         print_(progname, 'compiled source, executing')
         try:
-            exec_(co, g)
+            exec_(co, g) # noqa
         finally:
             print_(progname, 'finished executing code')
             # background thread might hold a reference to this (!?)
             #clientsock.close()
 
-def bind_and_listen(hostport):
+def bind_and_listen(hostport, execmodel):
+    socket = execmodel.socket
     if isinstance(hostport, str):
         host, port = hostport.split(':')
         hostport = (host, int(port))
@@ -102,11 +108,15 @@ if __name__ == '__main__':
         hostport = sys.argv[1]
     else:
         hostport = ':8888'
-    serversock = bind_and_listen(hostport)
+    from execnet.gateway_base import get_execmodel
+    execmodel = get_execmodel("thread")
+    serversock = bind_and_listen(hostport, execmodel)
     startserver(serversock, loop=False)
+
 elif __name__=='__channelexec__':
-    bindname = channel.receive()
-    sock = bind_and_listen(bindname)
+    execmodel = channel.gateway.execmodel # noqa
+    bindname = channel.receive() # noqa
+    sock = bind_and_listen(bindname, execmodel)
     port = sock.getsockname()
-    channel.send(port)
+    channel.send(port) # noqa
     startserver(sock)
index 0d208f89d87e3b7cafc1c28bbceb51ee15d5c480..562083c27e246e6b35c515fd7de005bafa37f392 100644 (file)
@@ -7,8 +7,6 @@ To use, run:
 """
 
 import sys
-import os
-import time
 import win32serviceutil
 import win32service
 import win32event
diff --git a/remoto/lib/execnet/threadpool.py b/remoto/lib/execnet/threadpool.py
deleted file mode 100644 (file)
index 812d16a..0000000
+++ /dev/null
@@ -1,183 +0,0 @@
-"""
-dispatching execution to threads
-
-(c) 2009, holger krekel
-"""
-import threading
-import time
-import sys
-
-# py2/py3 compatibility
-try:
-    import queue
-except ImportError:
-    import Queue as queue
-if sys.version_info >= (3,0):
-    exec ("def reraise(cls, val, tb): raise val")
-else:
-    exec ("def reraise(cls, val, tb): raise cls, val, tb")
-
-ERRORMARKER = object()
-
-class Reply(object):
-    """ reply instances provide access to the result
-        of a function execution that got dispatched
-        through WorkerPool.dispatch()
-    """
-    _excinfo = None
-    def __init__(self, task):
-        self.task = task
-        self._queue = queue.Queue()
-
-    def _set(self, result):
-        self._queue.put(result)
-
-    def _setexcinfo(self, excinfo):
-        self._excinfo = excinfo
-        self._queue.put(ERRORMARKER)
-
-    def get(self, timeout=None):
-        """ get the result object from an asynchronous function execution.
-            if the function execution raised an exception,
-            then calling get() will reraise that exception
-            including its traceback.
-        """
-        if self._queue is None:
-            raise EOFError("reply has already been delivered")
-        try:
-            result = self._queue.get(timeout=timeout)
-        except queue.Empty:
-            raise IOError("timeout waiting for %r" %(self.task, ))
-        if result is ERRORMARKER:
-            self._queue = None
-            excinfo = self._excinfo
-            reraise(excinfo[0], excinfo[1], excinfo[2])
-        return result
-
-class WorkerThread(threading.Thread):
-    def __init__(self, pool):
-        threading.Thread.__init__(self)
-        self._queue = queue.Queue()
-        self._pool = pool
-        self.setDaemon(1)
-
-    def _run_once(self):
-        reply = self._queue.get()
-        if reply is SystemExit:
-            return False
-        assert self not in self._pool._ready
-        task = reply.task
-        try:
-            func, args, kwargs = task
-            result = func(*args, **kwargs)
-        except (SystemExit, KeyboardInterrupt):
-            return False
-        except:
-            reply._setexcinfo(sys.exc_info())
-        else:
-            reply._set(result)
-        # at this point, reply, task and all other local variables go away
-        return True
-
-    def run(self):
-        try:
-            while self._run_once():
-                self._pool._ready[self] = True
-        finally:
-            del self._pool._alive[self]
-            try:
-                del self._pool._ready[self]
-            except KeyError:
-                pass
-
-    def send(self, task):
-        reply = Reply(task)
-        self._queue.put(reply)
-        return reply
-
-    def stop(self):
-        self._queue.put(SystemExit)
-
-class WorkerPool(object):
-    """ A WorkerPool allows to dispatch function executions
-        to threads.  Each Worker Thread is reused for multiple
-        function executions. The dispatching operation
-        takes care to create and dispatch to existing
-        threads.
-
-        You need to call shutdown() to signal
-        the WorkerThreads to terminate and join()
-        in order to wait until all worker threads
-        have terminated.
-    """
-    _shuttingdown = False
-    def __init__(self, maxthreads=None):
-        """ init WorkerPool instance which may
-            create up to `maxthreads` worker threads.
-        """
-        self.maxthreads = maxthreads
-        self._ready = {}
-        self._alive = {}
-
-    def dispatch(self, func, *args, **kwargs):
-        """ return Reply object for the asynchronous dispatch
-            of the given func(*args, **kwargs) in a
-            separate worker thread.
-        """
-        if self._shuttingdown:
-            raise IOError("WorkerPool is already shutting down")
-        try:
-            thread, _ = self._ready.popitem()
-        except KeyError: # pop from empty list
-            if self.maxthreads and len(self._alive) >= self.maxthreads:
-                raise IOError("can't create more than %d threads." %
-                              (self.maxthreads,))
-            thread = self._newthread()
-        return thread.send((func, args, kwargs))
-
-    def _newthread(self):
-        thread = WorkerThread(self)
-        self._alive[thread] = True
-        thread.start()
-        return thread
-
-    def shutdown(self):
-        """ signal all worker threads to terminate.
-            call join() to wait until all threads termination.
-        """
-        if not self._shuttingdown:
-            self._shuttingdown = True
-            for t in list(self._alive):
-                t.stop()
-
-    def join(self, timeout=None):
-        """ wait until all worker threads have terminated. """
-        current = threading.currentThread()
-        deadline = delta = None
-        if timeout is not None:
-            deadline = time.time() + timeout
-        for thread in list(self._alive):
-            if deadline:
-                delta = deadline - time.time()
-                if delta <= 0:
-                    raise IOError("timeout while joining threads")
-            thread.join(timeout=delta)
-            if thread.isAlive():
-                raise IOError("timeout while joining threads")
-
-if __name__ == '__channelexec__':
-    maxthreads = channel.receive()
-    execpool = WorkerPool(maxthreads=maxthreads)
-    gw = channel.gateway
-    channel.send("ok")
-    gw._trace("instantiated thread work pool maxthreads=%s" %(maxthreads,))
-    while 1:
-        gw._trace("waiting for new exec task")
-        task = gw._execqueue.get()
-        if task is None:
-            gw._trace("thread-dispatcher got None, exiting")
-            execpool.shutdown()
-            execpool.join()
-            raise gw._StopExecLoop
-        gw._trace("dispatching exec task to thread pool")
-        execpool.dispatch(gw.executetask, task)
index 549966a9b255fb6c82cebd269513108f1f94b439..c72f5b650f81e3e3e8e9059cd49ac6c51f3cf20a 100644 (file)
@@ -1,8 +1,6 @@
 """
-(c) 2008-2009, holger krekel
+(c) 2008-2013, holger krekel
 """
-import execnet
-
 class XSpec:
     """ Execution Specification: key1=value1//key2=value2 ...
         * keys need to be unique within the specification scope
@@ -12,7 +10,8 @@ class XSpec:
         * if no "=value" is given, assume a boolean True value
     """
     # XXX allow customization, for only allow specific key names
-    popen = ssh = socket = python = chdir = nice = dont_write_bytecode = None
+    popen = ssh = socket = python = chdir = nice = \
+            dont_write_bytecode = execmodel = None
 
     def __init__(self, string):
         self._spec = string