(c) 2012, Holger Krekel and others
"""
-__version__ = '1.1.1-ad4'
+__version__ = '1.2.0.dev2'
from . import apipkg
'SocketGateway': '.deprecated:SocketGateway',
'SshGateway': '.deprecated:SshGateway',
'makegateway': '.multi:makegateway',
+ 'set_execmodel': '.multi:set_execmodel',
'HostNotFound': '.gateway_bootstrap:HostNotFound',
'RemoteError': '.gateway_base:RemoteError',
'TimeoutError': '.gateway_base:TimeoutError',
'dump': '.gateway_base:dump',
'DataFormatError': '.gateway_base:DataFormatError',
})
-
-# CHANGELOG
-#
-# 1.1.1-ad4: Use the newest execnet, includes SSH options
-#
-# 1.1-ad3: Catch more `TypeError` if the connection is closing but the channel attempts
-# to write. We now check is `struct.pack` is not None to proceed.
-# Issue: https://bitbucket.org/hpk42/execnet/issue/22/structpack-can-be-none-sometimes-spits
-#
-# 1.1-ad2: Allow for `sudo python` on local popen gateways
-# Issue: https://bitbucket.org/hpk42/execnet/issue/21/support-sudo-on-local-popen
-#
-# 1.1-ad1: `if case` to check if `Message` is None and avoid AttributeErrors
-# Issue: https://bitbucket.org/hpk42/execnet/issue/20/attributeerror-nonetype-object-has-no
"""
gateway code for initiating popen, socket and ssh connections.
-(c) 2004-2009, Holger Krekel and others
+(c) 2004-2013, Holger Krekel and others
"""
import sys, os, inspect, types, linecache
import textwrap
import execnet
from execnet.gateway_base import Message
-from execnet.gateway_io import Popen2IOMaster
from execnet import gateway_base
importdir = os.path.dirname(os.path.dirname(execnet.__file__))
except AttributeError:
r = "uninitialized"
i = "no"
- return "<%s id=%r %s, %s active channels>" %(
- self.__class__.__name__, self.id, r, i)
+ return "<%s id=%r %s, %s model, %s active channels>" %(
+ self.__class__.__name__, self.id, r, self.execmodel.backend, i)
def exit(self):
""" trigger gateway exit. Defer waiting for finishing
self._trace("gateway already unregistered with group")
return
self._group._unregister(self)
- self._trace("--> sending GATEWAY_TERMINATE")
try:
+ self._trace("--> sending GATEWAY_TERMINATE")
self._send(Message.GATEWAY_TERMINATE)
+ self._trace("--> io.close_write")
self._io.close_write()
- except IOError:
+ except (ValueError, EOFError, IOError):
v = sys.exc_info()[1]
self._trace("io-error: could not send termination sequence")
self._trace(" exception: %r" % v)
def hasreceiver(self):
    """ return True if gateway is able to receive data. """
-    return self._receiverthread.isAlive() # approxmimation
+    # _receiverthread is now a Reply from the receive WorkerPool;
+    # Reply.running is cleared in Reply.run()'s finally clause
+    return self._receiverthread.running  # approximation
def remote_status(self):
""" return information object about remote execution status. """
return channel
def remote_init_threads(self, num=None):
-    """ start up to 'num' threads for subsequent
-    remote_exec() invocations to allow concurrent
-    execution.
-    """
-    if hasattr(self, '_remotechannelthread'):
-        raise IOError("remote threads already running")
-    from execnet import threadpool
-    source = inspect.getsource(threadpool)
-    self._remotechannelthread = self.remote_exec(source)
-    self._remotechannelthread.send(num)
-    status = self._remotechannelthread.receive()
-    assert status == "ok", status
+    """ DEPRECATED. Is currently a NO-OPERATION already."""
+    # the remote side now spawns execution threads on demand through
+    # its WorkerPool, so pre-starting a fixed number of threads is
+    # no longer needed; 'num' is ignored
+    print ("WARNING: remote_init_threads() is a no-operation in execnet-1.2")
class RInfo:
def __init__(self, kwargs):
NOTE: aims to be compatible to Python 2.5-3.X, Jython and IronPython
-(C) 2004-2013 Holger Krekel, Armin Rigo, Benjamin Peterson, and others
+(C) 2004-2013 Holger Krekel, Armin Rigo, Benjamin Peterson, Ronny Pfannschmidt and others
"""
+from __future__ import with_statement
import sys, os, weakref
-import threading, traceback, struct
+import traceback, struct
# NOTE that we want to avoid try/except style importing
# to avoid setting sys.exc_info() during import
+#
ISPY3 = sys.version_info >= (3, 0)
if ISPY3:
- import queue
from io import BytesIO
exec("def do_exec(co, loc): exec(co, loc)\n"
"def reraise(cls, val, tb): raise val\n")
_long_type = int
from _thread import interrupt_main
else:
- import Queue as queue
from StringIO import StringIO as BytesIO
exec("def do_exec(co, loc): exec co in loc\n"
"def reraise(cls, val, tb): raise cls, val, tb\n")
except ImportError:
interrupt_main = None
+class EmptySemaphore:
+    """ no-op semaphore returned by ExecModel.Semaphore(None),
+    i.e. for an unbounded WorkerPool: acquire/release never block.
+    """
+    acquire = release = lambda self: None
+
+def get_execmodel(backend):
+    """ return an ExecModel for the given concurrency *backend*.
+
+    backend may be "thread", "eventlet" or "gevent", or an already
+    constructed execmodel-like object (anything with a ``backend``
+    attribute), which is returned unchanged.  Raises ValueError for
+    unknown backend names.  Backend modules are imported lazily on
+    first attribute access, so merely selecting e.g. "gevent" does
+    not require gevent to be installed until it is actually used.
+    """
+    if hasattr(backend, "backend"):
+        return backend
+    if backend == "thread":
+        # importdef maps ExecModel attribute names to candidate import
+        # locations, tried in order; "mod::attr" means attribute 'attr'
+        # of module 'mod' (e.g. py2 'thread' falling back to py3 '_thread')
+        importdef = {
+            'get_ident': ['thread::get_ident', '_thread::get_ident'],
+            '_start_new_thread': ['thread::start_new_thread',
+                                  '_thread::start_new_thread'],
+            'threading': ["threading",],
+            'queue': ["queue", "Queue"],
+            'sleep': ['time::sleep'],
+            'subprocess': ['subprocess'],
+            'socket': ['socket'],
+            '_fdopen': ['os::fdopen'],
+            '_lock': ['threading'],
+            '_event': ['threading'],
+        }
+        def exec_start(self, func, args=()):
+            self._start_new_thread(func, args)
+
+    elif backend == "eventlet":
+        importdef = {
+            'get_ident': ['eventlet.green.thread::get_ident'],
+            '_spawn_n': ['eventlet::spawn_n'],
+            'threading': ['eventlet.green.threading'],
+            'queue': ["eventlet.queue"],
+            'sleep': ['eventlet::sleep'],
+            'subprocess': ['eventlet.green.subprocess'],
+            'socket': ['eventlet.green.socket'],
+            '_fdopen': ['eventlet.green.os::fdopen'],
+            '_lock': ['eventlet.green.threading'],
+            '_event': ['eventlet.green.threading'],
+        }
+        def exec_start(self, func, args=()):
+            self._spawn_n(func, *args)
+    elif backend == "gevent":
+        importdef = {
+            'get_ident': ['gevent.thread::get_ident'],
+            '_spawn_n': ['gevent::spawn'],
+            'threading': ['threading'],
+            'queue': ["gevent.queue"],
+            'sleep': ['gevent::sleep'],
+            'subprocess': ['gevent.subprocess'],
+            'socket': ['gevent.socket'],
+            # XXX FileObjectThread is not a plain fdopen replacement --
+            # presumably used because gevent has no green os.fdopen;
+            # TODO confirm its signature matches fdopen(fd, mode, bufsize)
+            '_fdopen': ['gevent.fileobject::FileObjectThread'],
+            '_lock': ['gevent.lock'],
+            '_event': ['gevent.event'],
+        }
+        def exec_start(self, func, args=()):
+            self._spawn_n(func, *args)
+    else:
+        raise ValueError("unknown execmodel %r" %(backend,))
+
+    class ExecModel:
+        def __init__(self, name):
+            self._importdef = importdef
+            self.backend = name
+            self._count = 0
+
+        def __repr__(self):
+            return "<ExecModel %r>" % self.backend
+
+        def __getattr__(self, name):
+            # lazily resolve an attribute from importdef; the first
+            # importable candidate wins and is cached via setattr so
+            # __getattr__ only runs once per attribute
+            locs = self._importdef.get(name)
+            if locs is None:
+                raise AttributeError(name)
+            for loc in locs:
+                parts = loc.split("::")
+                loc = parts.pop(0)
+                try:
+                    mod = __import__(loc, None, None, "__doc__")
+                except ImportError:
+                    pass
+                else:
+                    if parts:
+                        mod = getattr(mod, parts[0])
+                    setattr(self, name, mod)
+                    return mod
+            raise AttributeError(name)
+
+        # start(func, args): run func asynchronously in this model
+        start = exec_start
+
+        def fdopen(self, fd, mode, bufsize=1):
+            return self._fdopen(fd, mode, bufsize)
+
+        def WorkerPool(self, size=None, hasprimary=False):
+            return WorkerPool(self, size, hasprimary=hasprimary)
+
+        def Semaphore(self, size=None):
+            # size=None means "unbounded" -- return a no-op semaphore
+            if size is None:
+                return EmptySemaphore()
+            return self._lock.Semaphore(size)
+
+        def Lock(self):
+            # deliberately reentrant, same as RLock()
+            return self._lock.RLock()
+
+        def RLock(self):
+            return self._lock.RLock()
+
+        def Event(self):
+            event = self._event.Event()
+            if sys.version_info < (2,7):
+                # patch wait function to return event state instead of None
+                # (Event.wait only returns a bool since Python 2.7)
+                real_wait = event.wait
+                def wait(timeout=None):
+                    real_wait(timeout=timeout)
+                    return event.isSet()
+                event.wait = wait
+            return event
+
+        def PopenPiped(self, args):
+            """ start a subprocess with piped stdin/stdout. """
+            PIPE = self.subprocess.PIPE
+            return self.subprocess.Popen(args, stdout=PIPE, stdin=PIPE)
+
+
+    return ExecModel(backend)
+
+
+class Reply(object):
+    """ reply instances provide access to the result
+    of a function execution that got dispatched
+    through WorkerPool.spawn()
+    """
+    def __init__(self, task, threadmodel):
+        # task is a (func, args, kwargs) triple executed later by run()
+        self.task = task
+        self._result_ready = threadmodel.Event()
+        self.running = True
+
+    def get(self, timeout=None):
+        """ get the result object from an asynchronous function execution.
+        if the function execution raised an exception,
+        then calling get() will reraise that exception
+        including its traceback.
+        """
+        self.waitfinish(timeout)
+        try:
+            return self._result
+        except AttributeError:
+            # no _result was set, so run() must have recorded _excinfo
+            reraise(*(self._excinfo[:3]))  # noqa
+
+    def waitfinish(self, timeout=None):
+        """ block until the task finished; raise IOError on timeout.
+        relies on Event.wait() returning a bool (patched for py<2.7
+        in ExecModel.Event).
+        """
+        if not self._result_ready.wait(timeout):
+            raise IOError("timeout waiting for %r" %(self.task, ))
+
+    def run(self):
+        """ execute the task, recording either _result or _excinfo,
+        and always signal completion via _result_ready.
+        """
+        func, args, kwargs = self.task
+        try:
+            try:
+                self._result = func(*args, **kwargs)
+            except:
+                self._excinfo = sys.exc_info()
+        finally:
+            self._result_ready.set()
+            self.running = False
+
+
+class WorkerPool(object):
+    """ A WorkerPool allows to spawn function executions
+    to threads, returning a reply object on which you
+    can ask for the result (and get exceptions reraised)
+    """
+    def __init__(self, execmodel, size=None, hasprimary=False):
+        """ by default allow unlimited number of spawns.
+        size bounds concurrent executions via a semaphore;
+        hasprimary=True (thread model only) reserves dispatch to the
+        caller of integrate_as_primary_thread().
+        """
+        self.execmodel = execmodel
+        self._size = size
+        self._running_lock = self.execmodel.Lock()
+        # EmptySemaphore (no-op) when size is None
+        self._sem = self.execmodel.Semaphore(size)
+        self._running = set()
+        self._shutdown_event = self.execmodel.Event()
+        if hasprimary:
+            if self.execmodel.backend != "thread":
+                raise ValueError("hasprimary=True requires thread model")
+            self._primary_thread_event = self.execmodel.Event()
+
+    def integrate_as_primary_thread(self):
+        """ integrate the thread with which we are called as a primary
+        thread to dispatch to when spawn is called.
+        """
+        assert self.execmodel.backend == "thread", self.execmodel
+        # XXX insert check if we really are in the main thread
+        primary_thread_event = self._primary_thread_event
+        # interacts with code at REF1
+        while not self._shutdown_event.isSet():
+            primary_thread_event.wait()
+            func, args, kwargs = self._primary_thread_task
+            if func is None:  # waitall() woke us up to finish the loop
+                break
+            func(*args, **kwargs)
+            primary_thread_event.clear()
+
+    def shutdown(self):
+        self._shutdown_event.set()
+
+    def wait_for_shutdown(self, timeout=None):
+        return self._shutdown_event.wait(timeout=timeout)
+
+    def active_count(self):
+        return len(self._running)
+
+    def spawn(self, func, *args, **kwargs):
+        """ return Reply object for the asynchronous dispatch
+        of the given func(*args, **kwargs).
+        """
+        reply = Reply((func, args, kwargs), self.execmodel)
+        def run_and_release():
+            reply.run()
+            with self._running_lock:
+                self._running.remove(reply)
+                self._sem.release()
+                if not self._running:
+                    # wake up a pending waitall() -- _waitall_event only
+                    # exists while someone is waiting
+                    try:
+                        self._waitall_event.set()
+                    except AttributeError:
+                        pass
+        # blocks when 'size' executions are already running
+        self._sem.acquire()
+        with self._running_lock:
+            self._running.add(reply)
+        # REF1 in 'thread' model we give priority to running in main thread
+        primary_thread_event = getattr(self, "_primary_thread_event", None)
+        if primary_thread_event is not None:
+            if not primary_thread_event.isSet():
+                self._primary_thread_task = run_and_release, (), {}
+                primary_thread_event.set()
+                return reply
+        self.execmodel.start(run_and_release, ())
+        return reply
+
+    def waitall(self, timeout=None):
+        """ wait until all previously spawned executions have terminated. """
+        # if it exists signal primary_thread to finish its loop
+        self._primary_thread_task = None, None, None
+        try:
+            self._primary_thread_event.set()
+        except AttributeError:
+            pass
+        with self._running_lock:
+            if not self._running:
+                return True
+            # if a Reply still runs, we let run_and_release
+            # signal us -- note that we are still holding the
+            # _running_lock to avoid race conditions
+            self._waitall_event = self.execmodel.Event()
+        return self._waitall_event.wait(timeout=timeout)
+
+
sysex = (KeyboardInterrupt, SystemExit)
class Popen2IO:
error = (IOError, OSError, EOFError)
- def __init__(self, outfile, infile):
+ def __init__(self, outfile, infile, execmodel):
# we need raw byte streams
self.outfile, self.infile = outfile, infile
if sys.platform == "win32":
pass
self._read = getattr(infile, "buffer", infile).read
self._write = getattr(outfile, "buffer", outfile).write
+ self.execmodel = execmodel
def read(self, numbytes):
"""Read exactly 'numbytes' bytes from the pipe. """
def from_io(io):
try:
header = io.read(9) # type 1, channel 4, payload 4
+ if not header:
+ raise EOFError("empty read")
except EOFError:
e = sys.exc_info()[1]
raise EOFError('couldnt load message header, ' + e.args[0])
return Message(msgtype, channel, io.read(payload))
def to_io(self, io):
- if struct.pack is not None:
- header = struct.pack('!bii', self.msgcode, self.channelid, len(self.data))
- io.write(header+self.data)
+ header = struct.pack('!bii', self.msgcode, self.channelid,
+ len(self.data))
+ io.write(header+self.data)
def received(self, gateway):
self._types[self.msgcode](self, gateway)
return "<Message.%s channelid=%d %s>" %(name,
self.channelid, r)
+class GatewayReceivedTerminate(Exception):
+    """ Receiverthread got termination message.
+
+    raised by the GATEWAY_TERMINATE message handler so the receiver
+    loop unwinds cleanly (replaces the former SystemExit(0)).
+    """
+
def _setupmessages():
def status(message, gateway):
# we use the channelid to send back information
# but don't instantiate a channel object
- active_channels = gateway._channelfactory.channels()
- numexec = 0
- for ch in active_channels:
- if getattr(ch, '_executing', False):
- numexec += 1
- d = {'execqsize': gateway._execqueue.qsize(),
- 'numchannels': len(active_channels),
- 'numexecuting': numexec
+ d = {'numchannels': len(gateway._channelfactory._channels),
+ 'numexecuting': gateway._execpool.active_count(),
+ 'execmodel': gateway.execmodel.backend,
}
- gateway._send(Message.CHANNEL_DATA, message.channelid, dumps_internal(d))
+ gateway._send(Message.CHANNEL_DATA, message.channelid,
+ dumps_internal(d))
+ gateway._send(Message.CHANNEL_CLOSE, message.channelid)
def channel_exec(message, gateway):
channel = gateway._channelfactory.new(message.channelid)
gateway._channelfactory._local_close(message.channelid, sendonly=True)
def gateway_terminate(message, gateway):
- gateway._terminate_execution()
- raise SystemExit(0)
+ raise GatewayReceivedTerminate(gateway)
def reconfigure(message, gateway):
if message.channelid == 0:
def warn(self):
if self.formatted != INTERRUPT_TEXT:
# XXX do this better
- sys.stderr.write("Warning: unhandled %r\n" % (self,))
+ sys.stderr.write("[%s] Warning: unhandled %r\n"
+ % (os.getpid(), self,))
class TimeoutError(IOError):
""" Exception indicating that a timeout was reached. """
#XXX: defaults copied from Unserializer
self._strconfig = getattr(gateway, '_strconfig', (True, False))
self.id = id
- self._items = queue.Queue()
+ self._items = self.gateway.execmodel.queue.Queue()
self._closed = False
- self._receiveclosed = threading.Event()
+ self._receiveclosed = self.gateway.execmodel.Event()
self._remoteerrors = []
def _trace(self, *msg):
be called with the endmarker when the channel closes.
"""
_callbacks = self.gateway._channelfactory._callbacks
- _receivelock = self.gateway._receivelock
- _receivelock.acquire()
- try:
+ with self.gateway._receivelock:
if self._items is None:
raise IOError("%r has callback already registered" %(self,))
items = self._items
while 1:
try:
olditem = items.get(block=False)
- except queue.Empty:
+ except self.gateway.execmodel.queue.Empty:
if not (self._closed or self._receiveclosed.isSet()):
_callbacks[self.id] = (
callback,
break
else:
callback(olditem)
- finally:
- _receivelock.release()
def __repr__(self):
flag = self.isclosed() and "closed" or "open"
# the remote channel is already in "deleted" state, nothing to do
pass
else:
+ # state transition "opened" --> "deleted"
+ # check if we are in the middle of interpreter shutdown
+ # in which case the process will go away and we probably
+ # don't need to try to send a closing or last message
+ # (and often it won't work anymore to send things out)
if Message is not None:
- # state transition "opened" --> "deleted"
if self._items is None: # has_callback
msgcode = Message.CHANNEL_LAST_MESSAGE
else:
def close(self, error=None):
""" close down this channel with an optional error message.
Note that closing of a channel tied to remote_exec happens
- automatically at the end of execution and cannot be done explicitely.
+ automatically at the end of execution and cannot
+ be done explicitely.
"""
if self._executing:
raise IOError("cannot explicitly close channel within remote_exec")
if not self._receiveclosed.isSet():
put = self.gateway._send
if error is not None:
- put(Message.CHANNEL_CLOSE_ERROR, self.id, dumps_internal(error))
+ put(Message.CHANNEL_CLOSE_ERROR, self.id,
+ dumps_internal(error))
else:
put(Message.CHANNEL_CLOSE, self.id)
self._trace("sent channel close message")
raise IOError("cannot receive(), channel has receiver callback")
try:
x = itemqueue.get(timeout=timeout)
- except queue.Empty:
+ except self.gateway.execmodel.queue.Empty:
raise self.TimeoutError("no item after %r seconds" %(timeout))
if x is ENDMARKER:
itemqueue.put(x) # for other receivers
def __init__(self, gateway, startcount=1):
self._channels = weakref.WeakValueDictionary()
self._callbacks = {}
- self._writelock = threading.Lock()
+ self._writelock = gateway.execmodel.Lock()
self.gateway = gateway
self.count = startcount
self.finished = False
def new(self, id=None):
""" create a new Channel with 'id' (or create new id if None). """
- self._writelock.acquire()
- try:
+ with self._writelock:
if self.finished:
raise IOError("connexion already closed: %s" % (self.gateway,))
if id is None:
except KeyError:
channel = self._channels[id] = Channel(self.gateway, id)
return channel
- finally:
- self._writelock.release()
def channels(self):
return self._list(self._channels.values())
def _local_receive(self, id, data):
# executes in receiver thread
+ channel = self._channels.get(id)
try:
- callback, endmarker, strconfig= self._callbacks[id]
- channel = self._channels.get(id)
+ callback, endmarker, strconfig = self._callbacks[id]
except KeyError:
- channel = self._channels.get(id)
queue = channel and channel._items
if queue is None:
pass # drop data
else:
- queue.put(loads_internal(data, channel))
+ item = loads_internal(data, channel)
+ queue.put(item)
else:
try:
data = loads_internal(data, channel, strconfig)
callback(data) # even if channel may be already closed
- except KeyboardInterrupt:
- raise
- except:
+ except Exception:
excinfo = sys.exc_info()
- self.gateway._trace("exception during callback: %s" % excinfo[1])
+ self.gateway._trace("exception during callback: %s" %
+ excinfo[1])
errortext = self.gateway._geterrortext(excinfo)
- self.gateway._send(Message.CHANNEL_CLOSE_ERROR, id, dumps_internal(errortext))
+ self.gateway._send(Message.CHANNEL_CLOSE_ERROR,
+ id, dumps_internal(errortext))
self._local_close(id, errortext)
def _finished_receiving(self):
- self._writelock.acquire()
- try:
+ self.gateway._trace("finished receiving")
+ with self._writelock:
self.finished = True
- finally:
- self._writelock.release()
for id in self._list(self._channels):
self._local_close(id, sendonly=True)
for id in self._list(self._callbacks):
_sysex = sysex
id = "<slave>"
- class _StopExecLoop(Exception):
- pass
-
def __init__(self, io, id, _startcount=2):
+ self.execmodel = io.execmodel
self._io = io
self.id = id
- self._strconfig = Unserializer.py2str_as_py3str, Unserializer.py3str_as_py2str
+ self._strconfig = (Unserializer.py2str_as_py3str,
+ Unserializer.py3str_as_py2str)
self._channelfactory = ChannelFactory(self, _startcount)
- self._receivelock = threading.RLock()
+ self._receivelock = self.execmodel.RLock()
# globals may be NONE at process-termination
self.__trace = trace
self._geterrortext = geterrortext
+ self._receivepool = self.execmodel.WorkerPool(1)
def _trace(self, *msg):
self.__trace(self.id, *msg)
def _initreceive(self):
- self._receiverthread = threading.Thread(name="receiver",
- target=self._thread_receiver)
- self._receiverthread.setDaemon(1)
- self._receiverthread.start()
+ self._receiverthread = self._receivepool.spawn(self._thread_receiver)
def _thread_receiver(self):
- self._trace("RECEIVERTHREAD: starting to run")
- eof = False
+ def log(*msg):
+ self._trace("[receiver-thread]", *msg)
+
+ log("RECEIVERTHREAD: starting to run")
io = self._io
try:
try:
while 1:
msg = Message.from_io(io)
- self._trace("received", msg)
- _receivelock = self._receivelock
- _receivelock.acquire()
- try:
+ log("received", msg)
+ with self._receivelock:
msg.received(self)
del msg
- finally:
- _receivelock.release()
- except self._sysex:
- self._trace("RECEIVERTHREAD: doing io.close_read()")
- self._io.close_read()
+ except (KeyboardInterrupt, GatewayReceivedTerminate):
+ pass
except EOFError:
- self._trace("RECEIVERTHREAD: got EOFError")
- self._trace("RECEIVERTHREAD: traceback was: ",
- self._geterrortext(self.exc_info()))
+ log("EOF without prior gateway termination message")
self._error = self.exc_info()[1]
- eof = True
- except:
- self._trace("RECEIVERTHREAD", self._geterrortext(self.exc_info()))
+ except Exception:
+ log(self._geterrortext(self.exc_info()))
finally:
try:
- self._trace('RECEIVERTHREAD', 'entering finalization')
- if eof:
- self._terminate_execution()
+ log('entering finalization')
+ # wake up and terminate any execution waiting to receive
self._channelfactory._finished_receiving()
- self._trace('RECEIVERTHREAD', 'leaving finalization')
- except:
- pass # XXX be silent at interp-shutdown
+ log('terminating execution')
+ self._terminate_execution()
+ log('closing read')
+ self._io.close_read()
+ log('closing write')
+ self._io.close_write()
+ log('leaving finalization')
+ except: # be silent at shutdown
+ pass
def _terminate_execution(self):
pass
except (IOError, ValueError):
e = sys.exc_info()[1]
self._trace('failed to send', message, e)
- raise
-
+ # ValueError might be because the IO is already closed
+ raise IOError("cannot send (already closed?)")
def _local_schedulexec(self, channel, sourcetask):
channel.close("execution disallowed")
def join(self, timeout=None):
""" Wait for receiverthread to terminate. """
- current = threading.currentThread()
- if self._receiverthread.isAlive():
- self._trace("joining receiver thread")
- self._receiverthread.join(timeout)
- else:
- self._trace("gateway.join() called while receiverthread "
- "already finished")
+ self._trace("waiting for receiver thread to finish")
+ self._receiverthread.waitfinish()
class SlaveGateway(BaseGateway):
+
def _local_schedulexec(self, channel, sourcetask):
sourcetask = loads_internal(sourcetask)
- self._execqueue.put((channel, sourcetask))
+ self._execpool.spawn(self.executetask, ((channel, sourcetask)))
def _terminate_execution(self):
# called from receiverthread
- self._trace("putting None to execqueue")
- self._execqueue.put(None)
- if interrupt_main:
- self._trace("calling interrupt_main()")
- interrupt_main()
- self._execfinished.wait(10.0)
- if not self._execfinished.isSet():
- self._trace("execution did not finish in 10 secs, calling os._exit()")
- os._exit(1)
-
- def serve(self, joining=True):
+ self._trace("shutting down execution pool")
+ self._execpool.shutdown()
+ if not self._execpool.waitall(5.0):
+ self._trace("execution ongoing after 5 secs, trying interrupt_main")
+ # We try hard to terminate execution based on the assumption
+ # that there is only one gateway object running per-process.
+ if sys.platform != "win32":
+ self._trace("sending ourselves a SIGINT")
+ os.kill(os.getpid(), 2) # send ourselves a SIGINT
+ elif interrupt_main is not None:
+ self._trace("calling interrupt_main()")
+ interrupt_main()
+ if not self._execpool.waitall(10.0):
+ self._trace("execution did not finish in 10 secs, "
+ "calling os._exit()")
+ os._exit(1)
+
+ def serve(self):
+ trace = lambda msg: self._trace("[serve] " + msg)
+ hasprimary = self.execmodel.backend == "thread"
+ self._execpool = self.execmodel.WorkerPool(hasprimary=hasprimary)
+ trace("spawning receiver thread")
+ self._initreceive()
try:
try:
- self._execqueue = queue.Queue()
- self._execfinished = threading.Event()
- self._initreceive()
- while 1:
- item = self._execqueue.get()
- if item is None:
- break
- try:
- self.executetask(item)
- except self._StopExecLoop:
- break
+ if hasprimary:
+ trace("integrating as main primary exec thread")
+ self._execpool.integrate_as_primary_thread()
+ else:
+ trace("waiting for execution to finish")
+ self._execpool.wait_for_shutdown()
finally:
- self._execfinished.set()
- self._trace("io.close_write()")
- self._io.close_write()
- self._trace("slavegateway.serve finished")
- if joining:
- self.join()
+ trace("execution finished")
+
+ trace("joining receiver thread")
+ self.join()
except KeyboardInterrupt:
# in the slave we can't really do anything sensible
- self._trace("swallowing keyboardinterrupt in main-thread")
+ trace("swallowing keyboardinterrupt, serve finished")
def executetask(self, item):
try:
channel._executing = True
try:
co = compile(source+'\n', '<remote exec>', 'exec')
- do_exec(co, loc)
+ do_exec(co, loc) # noqa
if call_name:
self._trace('calling %s(**%60r)' % (call_name, kwargs))
function = loc[call_name]
finally:
channel._executing = False
self._trace("execution finished")
- except self._StopExecLoop:
- channel.close()
- raise
except KeyboardInterrupt:
channel.close(INTERRUPT_TEXT)
raise
except:
excinfo = self.exc_info()
- self._trace("got exception: %s" % (excinfo[1],))
- errortext = self._geterrortext(excinfo)
- channel.close(errortext)
- else:
- channel.close()
+ if not isinstance(excinfo[1], EOFError):
+ if not channel.gateway._channelfactory.finished:
+ self._trace("got exception: %r" % (excinfo[1],))
+ errortext = self._geterrortext(excinfo)
+ channel.close(errortext)
+ return
+ self._trace("ignoring EOFError because receiving finished")
+ channel.close()
#
# Cross-Python pickling code, tested from test_serializer.py
self._write(opcode.CHANNEL)
self._write_int4(channel.id)
-def init_popen_io():
+def init_popen_io(execmodel):
if not hasattr(os, 'dup'): # jython
- io = Popen2IO(sys.stdout, sys.stdin)
+ io = Popen2IO(sys.stdout, sys.stdin, execmodel)
import tempfile
sys.stdin = tempfile.TemporaryFile('r')
sys.stdout = tempfile.TemporaryFile('w')
else:
devnull = '/dev/null'
# stdin
- stdin = os.fdopen(os.dup(0), 'r', 1)
+ stdin = execmodel.fdopen(os.dup(0), 'r', 1)
fd = os.open(devnull, os.O_RDONLY)
os.dup2(fd, 0)
os.close(fd)
# stdout
- stdout = os.fdopen(os.dup(1), 'w', 1)
+ stdout = execmodel.fdopen(os.dup(1), 'w', 1)
fd = os.open(devnull, os.O_WRONLY)
os.dup2(fd, 1)
# stderr for win32
if os.name == 'nt':
- sys.stderr = os.fdopen(os.dup(2), 'w', 1)
+ sys.stderr = execmodel.fdopen(os.dup(2), 'w', 1)
os.dup2(fd, 2)
os.close(fd)
- io = Popen2IO(stdout, stdin)
- sys.stdin = os.fdopen(0, 'r', 1)
- sys.stdout = os.fdopen(1, 'w', 1)
+ io = Popen2IO(stdout, stdin, execmodel)
+ sys.stdin = execmodel.fdopen(0, 'r', 1)
+ sys.stdout = execmodel.fdopen(1, 'w', 1)
return io
def serve(io, id):
--- /dev/null
+"""
+base execnet gateway code send to the other side for bootstrapping.
+
+NOTE: aims to be compatible to Python 2.5-3.X, Jython and IronPython
+
+(C) 2004-2013 Holger Krekel, Armin Rigo, Benjamin Peterson, Ronny Pfannschmidt and others
+"""
+from __future__ import with_statement
+import sys, os, weakref
+import traceback, struct
+
+# NOTE that we want to avoid try/except style importing
+# to avoid setting sys.exc_info() during import
+#
+
+ISPY3 = sys.version_info >= (3, 0)
+if ISPY3:
+    from io import BytesIO
+    # do_exec/reraise use version-specific syntax, so each variant is
+    # compiled via exec() to keep this module importable on both majors
+    exec("def do_exec(co, loc): exec(co, loc)\n"
+         "def reraise(cls, val, tb): raise val\n")
+    unicode = str
+    _long_type = int
+    from _thread import interrupt_main
+else:
+    from StringIO import StringIO as BytesIO
+    exec("def do_exec(co, loc): exec co in loc\n"
+         "def reraise(cls, val, tb): raise cls, val, tb\n")
+    bytes = str
+    _long_type = long
+    try:
+        from thread import interrupt_main
+    except ImportError:
+        # e.g. restricted interpreters without the thread module
+        interrupt_main = None
+
+class EmptySemaphore:
+    # no-op semaphore used by ExecModel.Semaphore(None), i.e. when a
+    # WorkerPool places no limit on concurrent spawns
+    acquire = release = lambda self: None
+
+def get_execmodel(backend):
+    """ return an execution-model object for the given backend name
+    ('thread', 'eventlet' or 'gevent'); objects that already carry a
+    'backend' attribute are passed through unchanged.  Raises
+    ValueError for unknown backend names.
+    """
+    if hasattr(backend, "backend"):
+        return backend
+    if backend == "thread":
+        # importdef maps attribute names to a list of fallback
+        # "module" or "module::attr" import locations (py2/py3 names)
+        importdef = {
+            'get_ident': ['thread::get_ident', '_thread::get_ident'],
+            '_start_new_thread': ['thread::start_new_thread',
+                                  '_thread::start_new_thread'],
+            'threading': ["threading",],
+            'queue': ["queue", "Queue"],
+            'sleep': ['time::sleep'],
+            'subprocess': ['subprocess'],
+            'socket': ['socket'],
+            '_fdopen': ['os::fdopen'],
+            '_lock': ['threading'],
+            '_event': ['threading'],
+        }
+        def exec_start(self, func, args=()):
+            self._start_new_thread(func, args)
+
+    elif backend == "eventlet":
+        importdef = {
+            'get_ident': ['eventlet.green.thread::get_ident'],
+            '_spawn_n': ['eventlet::spawn_n'],
+            'threading': ['eventlet.green.threading'],
+            'queue': ["eventlet.queue"],
+            'sleep': ['eventlet::sleep'],
+            'subprocess': ['eventlet.green.subprocess'],
+            'socket': ['eventlet.green.socket'],
+            '_fdopen': ['eventlet.green.os::fdopen'],
+            '_lock': ['eventlet.green.threading'],
+            '_event': ['eventlet.green.threading'],
+        }
+        def exec_start(self, func, args=()):
+            self._spawn_n(func, *args)
+    elif backend == "gevent":
+        importdef = {
+            'get_ident': ['gevent.thread::get_ident'],
+            '_spawn_n': ['gevent::spawn'],
+            'threading': ['threading'],
+            'queue': ["gevent.queue"],
+            'sleep': ['gevent::sleep'],
+            'subprocess': ['gevent.subprocess'],
+            'socket': ['gevent.socket'],
+            # XXX
+            '_fdopen': ['gevent.fileobject::FileObjectThread'],
+            '_lock': ['gevent.lock'],
+            '_event': ['gevent.event'],
+        }
+        def exec_start(self, func, args=()):
+            self._spawn_n(func, *args)
+    else:
+        raise ValueError("unknown execmodel %r" %(backend,))
+
+    class ExecModel:
+        def __init__(self, name):
+            self._importdef = importdef
+            self.backend = name
+            self._count = 0
+
+        def __repr__(self):
+            return "<ExecModel %r>" % self.backend
+
+        def __getattr__(self, name):
+            # lazily resolve names from the importdef table, trying each
+            # fallback location in order
+            locs = self._importdef.get(name)
+            if locs is None:
+                raise AttributeError(name)
+            for loc in locs:
+                parts = loc.split("::")
+                loc = parts.pop(0)
+                try:
+                    mod = __import__(loc, None, None, "__doc__")
+                except ImportError:
+                    pass
+                else:
+                    if parts:
+                        mod = getattr(mod, parts[0])
+                    # cache the resolved object so __getattr__ only
+                    # runs once per attribute name
+                    setattr(self, name, mod)
+                    return mod
+            raise AttributeError(name)
+
+        start = exec_start
+
+        def fdopen(self, fd, mode, bufsize=1):
+            return self._fdopen(fd, mode, bufsize)
+
+        def WorkerPool(self, size=None, hasprimary=False):
+            return WorkerPool(self, size, hasprimary=hasprimary)
+
+        def Semaphore(self, size=None):
+            # size=None means "no limit" -- hand out a no-op semaphore
+            if size is None:
+                return EmptySemaphore()
+            return self._lock.Semaphore(size)
+
+        def Lock(self):
+            return self._lock.RLock()
+
+        def RLock(self):
+            return self._lock.RLock()
+
+        def Event(self):
+            event = self._event.Event()
+            if sys.version_info < (2,7):
+                # patch wait function to return event state instead of None
+                real_wait = event.wait
+                def wait(timeout=None):
+                    real_wait(timeout=timeout)
+                    return event.isSet()
+                event.wait = wait
+            return event
+
+        def PopenPiped(self, args):
+            PIPE = self.subprocess.PIPE
+            return self.subprocess.Popen(args, stdout=PIPE, stdin=PIPE)
+
+
+    return ExecModel(backend)
+
+
+class Reply(object):
+    """ reply instances provide access to the result
+    of a function execution that got dispatched
+    through WorkerPool.spawn()
+    """
+    def __init__(self, task, threadmodel):
+        # task is the (func, args, kwargs) tuple to execute;
+        # threadmodel is the execmodel providing Event()
+        self.task = task
+        self._result_ready = threadmodel.Event()
+        self.running = True
+
+    def get(self, timeout=None):
+        """ get the result object from an asynchronous function execution.
+        if the function execution raised an exception,
+        then calling get() will reraise that exception
+        including its traceback.
+        """
+        self.waitfinish(timeout)
+        try:
+            return self._result
+        except AttributeError:
+            # no result was stored -- run() captured exception info instead
+            reraise(*(self._excinfo[:3])) # noqa
+
+    def waitfinish(self, timeout=None):
+        # note: raises IOError (not TimeoutError) when the timeout expires
+        if not self._result_ready.wait(timeout):
+            raise IOError("timeout waiting for %r" %(self.task, ))
+
+    def run(self):
+        # executed in the spawned thread/greenlet; never propagates --
+        # failures are stashed in _excinfo for get() to reraise
+        func, args, kwargs = self.task
+        try:
+            try:
+                self._result = func(*args, **kwargs)
+            except:
+                self._excinfo = sys.exc_info()
+        finally:
+            self._result_ready.set()
+            self.running = False
+
+
+class WorkerPool(object):
+    """ A WorkerPool allows to spawn function executions
+    to threads, returning a reply object on which you
+    can ask for the result (and get exceptions reraised)
+    """
+    def __init__(self, execmodel, size=None, hasprimary=False):
+        """ by default allow unlimited number of spawns. """
+        self.execmodel = execmodel
+        self._size = size
+        self._running_lock = self.execmodel.Lock()
+        # EmptySemaphore (no limit) when size is None
+        self._sem = self.execmodel.Semaphore(size)
+        self._running = set()
+        self._shutdown_event = self.execmodel.Event()
+        if hasprimary:
+            # only the 'thread' model can integrate the calling thread
+            # as a primary dispatch target
+            if self.execmodel.backend != "thread":
+                raise ValueError("hasprimary=True requires thread model")
+            self._primary_thread_event = self.execmodel.Event()
+
+    def integrate_as_primary_thread(self):
+        """ integrate the thread with which we are called as a primary
+        thread to dispatch to when spawn is called.
+        """
+        assert self.execmodel.backend == "thread", self.execmodel
+        # XXX insert check if we really are in the main thread
+        primary_thread_event = self._primary_thread_event
+        # interacts with code at REF1
+        while not self._shutdown_event.isSet():
+            primary_thread_event.wait()
+            func, args, kwargs = self._primary_thread_task
+            if func is None: # waitall() woke us up to finish the loop
+                break
+            func(*args, **kwargs)
+            primary_thread_event.clear()
+
+    def shutdown(self):
+        # signal the primary loop / wait_for_shutdown() waiters
+        self._shutdown_event.set()
+
+    def wait_for_shutdown(self, timeout=None):
+        return self._shutdown_event.wait(timeout=timeout)
+
+    def active_count(self):
+        return len(self._running)
+
+    def spawn(self, func, *args, **kwargs):
+        """ return Reply object for the asynchronous dispatch
+        of the given func(*args, **kwargs).
+        """
+        reply = Reply((func, args, kwargs), self.execmodel)
+        def run_and_release():
+            reply.run()
+            with self._running_lock:
+                self._running.remove(reply)
+                self._sem.release()
+                if not self._running:
+                    try:
+                        self._waitall_event.set()
+                    except AttributeError:
+                        # no waitall() in progress
+                        pass
+        self._sem.acquire()
+        with self._running_lock:
+            self._running.add(reply)
+        # REF1 in 'thread' model we give priority to running in main thread
+        primary_thread_event = getattr(self, "_primary_thread_event", None)
+        if primary_thread_event is not None:
+            if not primary_thread_event.isSet():
+                self._primary_thread_task = run_and_release, (), {}
+                primary_thread_event.set()
+                return reply
+        self.execmodel.start(run_and_release, ())
+        return reply
+
+    def waitall(self, timeout=None):
+        """ wait until all previously spawned executions have terminated. """
+        # if it exists signal primary_thread to finish its loop
+        self._primary_thread_task = None, None, None
+        try:
+            self._primary_thread_event.set()
+        except AttributeError:
+            pass
+        with self._running_lock:
+            if not self._running:
+                return True
+            # if a Reply still runs, we let run_and_release
+            # signal us -- note that we are still holding the
+            # _running_lock to avoid race conditions
+            self._waitall_event = self.execmodel.Event()
+        return self._waitall_event.wait(timeout=timeout)
+
+
+sysex = (KeyboardInterrupt, SystemExit)
+
+
+# tracing is configured once at import time from EXECNET_DEBUG:
+#   '2'            -> trace to stderr
+#   other truthy   -> trace to a per-pid file in the temp directory
+#   unset/falsy    -> tracing is a no-op
+DEBUG = os.environ.get('EXECNET_DEBUG')
+pid = os.getpid()
+if DEBUG == '2':
+    def trace(*msg):
+        try:
+            line = " ".join(map(str, msg))
+            sys.stderr.write("[%s] %s\n" % (pid, line))
+            sys.stderr.flush()
+        except Exception:
+            pass # nothing we can do, likely interpreter-shutdown
+elif DEBUG:
+    import tempfile, os.path
+    fn = os.path.join(tempfile.gettempdir(), 'execnet-debug-%d' % pid)
+    #sys.stderr.write("execnet-debug at %r" %(fn,))
+    debugfile = open(fn, 'w')
+    def trace(*msg):
+        try:
+            line = " ".join(map(str, msg))
+            debugfile.write(line + "\n")
+            debugfile.flush()
+        except Exception:
+            try:
+                v = sys.exc_info()[1]
+                sys.stderr.write(
+                    "[%s] exception during tracing: %r\n" % (pid, v))
+            except Exception:
+                pass # nothing we can do, likely interpreter-shutdown
+else:
+    notrace = trace = lambda *msg: None
+
+class Popen2IO:
+    # raw byte-stream IO over an (outfile, infile) pipe pair
+    error = (IOError, OSError, EOFError)
+
+    def __init__(self, outfile, infile, execmodel):
+        # we need raw byte streams
+        self.outfile, self.infile = outfile, infile
+        if sys.platform == "win32":
+            import msvcrt
+            try:
+                msvcrt.setmode(infile.fileno(), os.O_BINARY)
+                msvcrt.setmode(outfile.fileno(), os.O_BINARY)
+            except (AttributeError, IOError):
+                pass
+        # on py3 text files expose their binary layer as .buffer
+        self._read = getattr(infile, "buffer", infile).read
+        self._write = getattr(outfile, "buffer", outfile).write
+        self.execmodel = execmodel
+
+    def read(self, numbytes):
+        """Read exactly 'numbytes' bytes from the pipe. """
+        # a file in non-blocking mode may return less bytes, so we loop
+        buf = bytes()
+        while numbytes > len(buf):
+            data = self._read(numbytes-len(buf))
+            if not data:
+                raise EOFError("expected %d bytes, got %d" %(numbytes, len(buf)))
+            buf += data
+        return buf
+
+    def write(self, data):
+        """write out all data bytes. """
+        assert isinstance(data, bytes)
+        self._write(data)
+        self.outfile.flush()
+
+    def close_read(self):
+        self.infile.close()
+
+    def close_write(self):
+        self.outfile.close()
+
+class Message:
+    """ encapsulates Messages and their wire protocol. """
+    # filled by _setupmessages(); index in this list == message code
+    _types = []
+
+    def __init__(self, msgcode, channelid=0, data=''):
+        self.msgcode = msgcode
+        self.channelid = channelid
+        self.data = data
+
+    @staticmethod
+    def from_io(io):
+        # wire format: 1 byte type, 4 byte channel id, 4 byte payload
+        # length (network byte order), then the payload bytes
+        try:
+            header = io.read(9) # type 1, channel 4, payload 4
+            if not header:
+                raise EOFError("empty read")
+        except EOFError:
+            e = sys.exc_info()[1]
+            raise EOFError('couldnt load message header, ' + e.args[0])
+        msgtype, channel, payload = struct.unpack('!bii', header)
+        return Message(msgtype, channel, io.read(payload))
+
+    def to_io(self, io):
+        # header and payload are written in a single io.write() call
+        header = struct.pack('!bii', self.msgcode, self.channelid,
+                             len(self.data))
+        io.write(header+self.data)
+
+    def received(self, gateway):
+        # dispatch to the handler registered for this message code
+        self._types[self.msgcode](self, gateway)
+
+    def __repr__(self):
+        # FakeChannel lets us unserialize the payload for display
+        # without touching any real channel state
+        class FakeChannel(object):
+            _strconfig = False, False # never transform, never fail
+            def __init__(self, id):
+                self.id = id
+            def __repr__(self):
+                return '<Channel %s>' % self.id
+        FakeChannel.new = FakeChannel
+        FakeChannel.gateway = FakeChannel
+        name = self._types[self.msgcode].__name__.upper()
+        try:
+            data = loads_internal(self.data, FakeChannel)
+        except LoadError:
+            data = self.data
+        r = repr(data)
+        if len(r) > 90:
+            return "<Message.%s channelid=%d len=%d>" %(name,
+                        self.channelid, len(r))
+        else:
+            return "<Message.%s channelid=%d %s>" %(name,
+                        self.channelid, r)
+
+def _setupmessages():
+    # define one handler per wire message type and register them on
+    # Message; the position in the 'types' list below fixes each
+    # message's numeric code, so the order must match on both sides
+    def status(message, gateway):
+        # we use the channelid to send back information
+        # but don't instantiate a channel object
+        d = {'numchannels': len(gateway._channelfactory._channels),
+         'numexecuting': gateway._execpool.active_count(),
+         'execmodel': gateway.execmodel.backend,
+        }
+        gateway._send(Message.CHANNEL_DATA, message.channelid, dumps_internal(d))
+        gateway._send(Message.CHANNEL_CLOSE, message.channelid)
+
+    def channel_exec(message, gateway):
+        channel = gateway._channelfactory.new(message.channelid)
+        gateway._local_schedulexec(channel=channel,sourcetask=message.data)
+
+    def channel_data(message, gateway):
+        gateway._channelfactory._local_receive(message.channelid, message.data)
+
+    def channel_close(message, gateway):
+        gateway._channelfactory._local_close(message.channelid)
+
+    def channel_close_error(message, gateway):
+        remote_error = RemoteError(loads_internal(message.data))
+        gateway._channelfactory._local_close(message.channelid, remote_error)
+
+    def channel_last_message(message, gateway):
+        gateway._channelfactory._local_close(message.channelid, sendonly=True)
+
+    def gateway_terminate(message, gateway):
+        # wake up and terminate any execution waiting to receive something
+        gateway._channelfactory._finished_receiving()
+        # then try harder to terminate execution
+        gateway._terminate_execution()
+
+    def reconfigure(message, gateway):
+        # channelid 0 reconfigures the gateway-wide string coercion,
+        # otherwise the specific channel's
+        if message.channelid == 0:
+            target = gateway
+        else:
+            target = gateway._channelfactory.new(message.channelid)
+        target._strconfig = loads_internal(message.data, gateway)
+
+    types = [
+        status, reconfigure, gateway_terminate,
+        channel_exec, channel_data, channel_close,
+        channel_close_error, channel_last_message,
+    ]
+    for i, handler in enumerate(types):
+        Message._types.append(handler)
+        setattr(Message, handler.__name__.upper(), i)
+
+_setupmessages()
+
+def geterrortext(excinfo,
+    format_exception=traceback.format_exception, sysex=sysex):
+    # render an exc_info triple to a traceback string; defaults are
+    # bound at definition time so this still works at interp-shutdown
+    try:
+        l = format_exception(*excinfo)
+        errortext = "".join(l)
+    except sysex:
+        raise
+    except:
+        # formatting itself failed -- fall back to a minimal summary
+        errortext = '%s: %s' % (excinfo[0].__name__,
+                                excinfo[1])
+    return errortext
+
+class RemoteError(Exception):
+    """ Exception containing a stringified error from the other side. """
+    def __init__(self, formatted):
+        # 'formatted' is the remote traceback text, already rendered
+        self.formatted = formatted
+        Exception.__init__(self)
+
+    def __str__(self):
+        return self.formatted
+
+    def __repr__(self):
+        return "%s: %s" %(self.__class__.__name__, self.formatted)
+
+    def warn(self):
+        # called when an unhandled remote error is discarded; interrupt
+        # notifications are deliberately silent
+        if self.formatted != INTERRUPT_TEXT:
+            # XXX do this better
+            sys.stderr.write("[%s] Warning: unhandled %r\n"
+                             % (os.getpid(), self,))
+
+class TimeoutError(IOError):
+    """ Exception indicating that a timeout was reached.
+    Subclasses IOError so existing IOError handlers also catch it. """
+
+
+NO_ENDMARKER_WANTED = object()
+
+class Channel(object):
+    """Communication channel between two Python Interpreter execution points."""
+    RemoteError = RemoteError
+    TimeoutError = TimeoutError
+    Message = Message
+    _INTERNALWAKEUP = 1000
+    # True while remote_exec'd code runs on this channel (slave side)
+    _executing = False
+
+    def __init__(self, gateway, id):
+        # channels are created via ChannelFactory.new(), not directly
+        assert isinstance(id, int)
+        self.gateway = gateway
+        #XXX: defaults copied from Unserializer
+        self._strconfig = getattr(gateway, '_strconfig', (True, False))
+        self.id = id
+        # _items queues incoming data; set to None once a receive
+        # callback is registered (see setcallback)
+        self._items = self.gateway.execmodel.queue.Queue()
+        self._closed = False
+        self._receiveclosed = self.gateway.execmodel.Event()
+        self._remoteerrors = []
+
+    def _trace(self, *msg):
+        self.gateway._trace(self.id, *msg)
+
+    def setcallback(self, callback, endmarker=NO_ENDMARKER_WANTED):
+        """ set a callback function for receiving items.
+
+            All already queued items will immediately trigger the callback.
+            Afterwards the callback will execute in the receiver thread
+            for each received data item and calls to ``receive()`` will
+            raise an error.
+            If an endmarker is specified the callback will eventually
+            be called with the endmarker when the channel closes.
+        """
+        _callbacks = self.gateway._channelfactory._callbacks
+        _receivelock = self.gateway._receivelock
+        # the receivelock keeps the receiver thread from delivering new
+        # items while we drain the already-queued ones
+        _receivelock.acquire()
+        try:
+            if self._items is None:
+                raise IOError("%r has callback already registered" %(self,))
+            items = self._items
+            self._items = None
+            while 1:
+                try:
+                    olditem = items.get(block=False)
+                except self.gateway.execmodel.queue.Empty:
+                    # queue drained -- register the callback unless the
+                    # channel already stopped receiving
+                    if not (self._closed or self._receiveclosed.isSet()):
+                        _callbacks[self.id] = (
+                                callback,
+                                endmarker,
+                                self._strconfig,
+                            )
+                    break
+                else:
+                    if olditem is ENDMARKER:
+                        items.put(olditem)  # for other receivers
+                        if endmarker is not NO_ENDMARKER_WANTED:
+                            callback(endmarker)
+                        break
+                    else:
+                        callback(olditem)
+        finally:
+            _receivelock.release()
+
+    def __repr__(self):
+        flag = self.isclosed() and "closed" or "open"
+        return "<Channel id=%d %s>" % (self.id, flag)
+
+    def __del__(self):
+        if self.gateway is None:   # can be None in tests
+            return
+        self._trace("channel.__del__")
+        # no multithreading issues here, because we have the last ref to 'self'
+        if self._closed:
+            # state transition "closed" --> "deleted"
+            for error in self._remoteerrors:
+                error.warn()
+        elif self._receiveclosed.isSet():
+            # state transition "sendonly" --> "deleted"
+            # the remote channel is already in "deleted" state, nothing to do
+            pass
+        else:
+            # state transition "opened" --> "deleted"
+            if self._items is None:    # has_callback
+                msgcode = self.Message.CHANNEL_LAST_MESSAGE
+            else:
+                msgcode = self.Message.CHANNEL_CLOSE
+            try:
+                self.gateway._send(msgcode, self.id)
+            except (IOError, ValueError): # ignore problems with sending
+                pass
+
+    def _getremoteerror(self):
+        # pop a channel-specific remote error, falling back to a
+        # gateway-wide error (set by the receiver thread on EOF)
+        try:
+            return self._remoteerrors.pop(0)
+        except IndexError:
+            try:
+                return self.gateway._error
+            except AttributeError:
+                pass
+            return None
+
+    #
+    # public API for channel objects
+    #
+    def isclosed(self):
+        """ return True if the channel is closed. A closed
+        channel may still hold items.
+        """
+        return self._closed
+
+    def makefile(self, mode='w', proxyclose=False):
+        """ return a file-like object.
+        mode can be 'w' or 'r' for writeable/readable files.
+        if proxyclose is true file.close() will also close the channel.
+        """
+        if mode == "w":
+            return ChannelFileWrite(channel=self, proxyclose=proxyclose)
+        elif mode == "r":
+            return ChannelFileRead(channel=self, proxyclose=proxyclose)
+        raise ValueError("mode %r not availabe" %(mode,))
+
+    def close(self, error=None):
+        """ close down this channel with an optional error message.
+        Note that closing of a channel tied to remote_exec happens
+        automatically at the end of execution and cannot
+        be done explicitly.
+        """
+        if self._executing:
+            raise IOError("cannot explicitly close channel within remote_exec")
+        if self._closed:
+            self.gateway._trace(self, "ignoring redundant call to close()")
+        if not self._closed:
+            # state transition "opened/sendonly" --> "closed"
+            # threads warning: the channel might be closed under our feet,
+            # but it's never damaging to send too many CHANNEL_CLOSE messages
+            # however, if the other side triggered a close already, we
+            # do not send back a closed message.
+            if not self._receiveclosed.isSet():
+                put = self.gateway._send
+                if error is not None:
+                    put(self.Message.CHANNEL_CLOSE_ERROR, self.id, dumps_internal(error))
+                else:
+                    put(self.Message.CHANNEL_CLOSE, self.id)
+                self._trace("sent channel close message")
+            if isinstance(error, RemoteError):
+                self._remoteerrors.append(error)
+            self._closed = True         # --> "closed"
+            self._receiveclosed.set()
+            queue = self._items
+            if queue is not None:
+                # wake up any blocked receive() call
+                queue.put(ENDMARKER)
+            self.gateway._channelfactory._no_longer_opened(self.id)
+
+    def waitclose(self, timeout=None):
+        """ wait until this channel is closed (or the remote side
+        otherwise signalled that no more data was being sent).
+        The channel may still hold receivable items, but not receive
+        any more after waitclose() has returned.  Exceptions from executing
+        code on the other side are reraised as local channel.RemoteErrors.
+        EOFError is raised if the reading-connection was prematurely closed,
+        which often indicates a dying process.
+        self.TimeoutError is raised after the specified number of seconds
+        (default is None, i.e. wait indefinitely).
+        """
+        self._receiveclosed.wait(timeout=timeout)  # wait for non-"opened" state
+        if not self._receiveclosed.isSet():
+            raise self.TimeoutError("Timeout after %r seconds" % timeout)
+        error = self._getremoteerror()
+        if error:
+            raise error
+
+    def send(self, item):
+        """sends the given item to the other side of the channel,
+        possibly blocking if the sender queue is full.
+        The item must be a simple python type and will be
+        copied to the other side by value.  IOError is
+        raised if the write pipe was prematurely closed.
+        """
+        if self.isclosed():
+            raise IOError("cannot send to %r" %(self,))
+        self.gateway._send(self.Message.CHANNEL_DATA, self.id, dumps_internal(item))
+
+    def receive(self, timeout=None):
+        """receive a data item that was sent from the other side.
+        timeout: None [default] blocked waiting.  A positive number
+        indicates the number of seconds after which a channel.TimeoutError
+        exception will be raised if no item was received.
+        Note that exceptions from the remotely executing code will be
+        reraised as channel.RemoteError exceptions containing
+        a textual representation of the remote traceback.
+        """
+        itemqueue = self._items
+        if itemqueue is None:
+            raise IOError("cannot receive(), channel has receiver callback")
+        try:
+            x = itemqueue.get(timeout=timeout)
+        except self.gateway.execmodel.queue.Empty:
+            raise self.TimeoutError("no item after %r seconds" %(timeout))
+        if x is ENDMARKER:
+            itemqueue.put(x)  # for other receivers
+            raise self._getremoteerror() or EOFError()
+        else:
+            return x
+
+    def __iter__(self):
+        return self
+
+    def next(self):
+        try:
+            return self.receive()
+        except EOFError:
+            raise StopIteration
+    __next__ = next
+
+
+    def reconfigure(self, py2str_as_py3str=True, py3str_as_py2str=False):
+        """
+        set the string coercion for this channel
+        the default is to try to convert py2 str as py3 str,
+        but not to try and convert py3 str to py2 str
+        """
+        self._strconfig = (py2str_as_py3str, py3str_as_py2str)
+        data = dumps_internal(self._strconfig)
+        self.gateway._send(self.Message.RECONFIGURE, self.id, data=data)
+
+# sentinel queued into a channel's item queue to signal "no more items";
+# INTERRUPT_TEXT is the close-error text used for keyboard interrupts
+ENDMARKER = object()
+INTERRUPT_TEXT = "keyboard-interrupted"
+
+class ChannelFactory(object):
+    # creates and tracks the channels of one gateway; ids advance by 2
+    # from 'startcount' so the two sides never allocate the same id
+    def __init__(self, gateway, startcount=1):
+        # weak values: a channel disappears here once user code drops it
+        self._channels = weakref.WeakValueDictionary()
+        self._callbacks = {}
+        self._writelock = gateway.execmodel.Lock()
+        self.gateway = gateway
+        self.count = startcount
+        self.finished = False
+        self._list = list # needed during interp-shutdown
+
+    def new(self, id=None):
+        """ create a new Channel with 'id' (or create new id if None). """
+        self._writelock.acquire()
+        try:
+            if self.finished:
+                raise IOError("connexion already closed: %s" % (self.gateway,))
+            if id is None:
+                id = self.count
+                self.count += 2
+            try:
+                channel = self._channels[id]
+            except KeyError:
+                channel = self._channels[id] = Channel(self.gateway, id)
+            return channel
+        finally:
+            self._writelock.release()
+
+    def channels(self):
+        return self._list(self._channels.values())
+
+    #
+    # internal methods, called from the receiver thread
+    #
+    def _no_longer_opened(self, id):
+        # drop the channel registration and fire its endmarker callback
+        try:
+            del self._channels[id]
+        except KeyError:
+            pass
+        try:
+            callback, endmarker, strconfig = self._callbacks.pop(id)
+        except KeyError:
+            pass
+        else:
+            if endmarker is not NO_ENDMARKER_WANTED:
+                callback(endmarker)
+
+    def _local_close(self, id, remoteerror=None, sendonly=False):
+        channel = self._channels.get(id)
+        if channel is None:
+            # channel already in "deleted" state
+            if remoteerror:
+                remoteerror.warn()
+            self._no_longer_opened(id)
+        else:
+            # state transition to "closed" state
+            if remoteerror:
+                channel._remoteerrors.append(remoteerror)
+            queue = channel._items
+            if queue is not None:
+                # wake up any blocked receive() call
+                queue.put(ENDMARKER)
+            self._no_longer_opened(id)
+            if not sendonly: # otherwise #--> "sendonly"
+                channel._closed = True          # --> "closed"
+                channel._receiveclosed.set()
+
+    def _local_receive(self, id, data):
+        # executes in receiver thread
+        try:
+            callback, endmarker, strconfig = self._callbacks[id]
+            channel = self._channels.get(id)
+        except KeyError:
+            # no callback registered -- queue the item (if the channel
+            # still exists and still has its queue)
+            channel = self._channels.get(id)
+            queue = channel and channel._items
+            if queue is None:
+                pass    # drop data
+            else:
+                item = loads_internal(data, channel)
+                queue.put(item)
+        else:
+            try:
+                data = loads_internal(data, channel, strconfig)
+                callback(data)   # even if channel may be already closed
+            except KeyboardInterrupt:
+                raise
+            except:
+                # report the callback failure back to the other side
+                excinfo = sys.exc_info()
+                self.gateway._trace("exception during callback: %s" % excinfo[1])
+                errortext = self.gateway._geterrortext(excinfo)
+                self.gateway._send(Message.CHANNEL_CLOSE_ERROR, id, dumps_internal(errortext))
+                self._local_close(id, errortext)
+
+    def _finished_receiving(self):
+        # no more incoming data: flip all channels to sendonly/closed
+        self.gateway._trace("finished receiving")
+        self._writelock.acquire()
+        try:
+            self.finished = True
+        finally:
+            self._writelock.release()
+        for id in self._list(self._channels):
+            self._local_close(id, sendonly=True)
+        for id in self._list(self._callbacks):
+            self._no_longer_opened(id)
+
+class ChannelFile(object):
+    # base for file-like adapters over a channel (see Channel.makefile)
+    def __init__(self, channel, proxyclose=True):
+        self.channel = channel
+        # when True, close() also closes the underlying channel
+        self._proxyclose = proxyclose
+
+    def isatty(self):
+        return False
+
+    def close(self):
+        if self._proxyclose:
+            self.channel.close()
+
+    def __repr__(self):
+        state = self.channel.isclosed() and 'closed' or 'open'
+        return '<ChannelFile %d %s>' %(self.channel.id, state)
+
+class ChannelFileWrite(ChannelFile):
+    # write-mode adapter: each write() becomes one channel.send()
+    def write(self, out):
+        self.channel.send(out)
+
+    def flush(self):
+        pass
+
+class ChannelFileRead(ChannelFile):
+    # read-mode adapter: buffers received items to serve read()/readline()
+    def __init__(self, channel, proxyclose=True):
+        super(ChannelFileRead, self).__init__(channel, proxyclose)
+        self._buffer = None
+
+    def read(self, n):
+        # fill the buffer until it holds at least n chars; EOF on the
+        # channel ends the stream (and closes it when proxyclose is set)
+        try:
+            if self._buffer is None:
+                self._buffer = self.channel.receive()
+            while len(self._buffer) < n:
+                self._buffer += self.channel.receive()
+        except EOFError:
+            self.close()
+        if self._buffer is None:
+            ret = ""
+        else:
+            ret = self._buffer[:n]
+            self._buffer = self._buffer[n:]
+        return ret
+
+    def readline(self):
+        if self._buffer is not None:
+            i = self._buffer.find("\n")
+            if i != -1:
+                return self.read(i+1)
+            line = self.read(len(self._buffer)+1)
+        else:
+            line = self.read(1)
+        # keep reading one char at a time until newline or EOF
+        while line and line[-1] != "\n":
+            c = self.read(1)
+            if not c:
+                break
+            line += c
+        return line
+
+class BaseGateway(object):
+    # common gateway machinery: owns the IO, the channel factory and
+    # the receiver thread that dispatches incoming messages
+    exc_info = sys.exc_info
+    _sysex = sysex
+    id = "<slave>"
+
+    def __init__(self, io, id, _startcount=2):
+        self.execmodel = io.execmodel
+        self._io = io
+        self.id = id
+        self._strconfig = (Unserializer.py2str_as_py3str,
+                           Unserializer.py3str_as_py2str)
+        self._channelfactory = ChannelFactory(self, _startcount)
+        self._receivelock = self.execmodel.RLock()
+        # globals may be NONE at process-termination
+        self.__trace = trace
+        self._geterrortext = geterrortext
+        # single worker dedicated to the receiver loop
+        self._workerpool = self.execmodel.WorkerPool(1)
+
+    def _trace(self, *msg):
+        self.__trace(self.id, *msg)
+
+    def _initreceive(self):
+        self._receiverthread = self._workerpool.spawn(self._thread_receiver)
+
+    def _thread_receiver(self):
+        # receiver loop: read messages off the IO and dispatch them
+        # under the receivelock until EOF or error
+        def log(*msg):
+            self._trace("[receiver-thread]", *msg)
+        log("RECEIVERTHREAD: starting to run")
+        eof = False
+        io = self._io
+        try:
+            try:
+                while 1:
+                    msg = Message.from_io(io)
+                    log("received", msg)
+                    _receivelock = self._receivelock
+                    _receivelock.acquire()
+                    try:
+                        msg.received(self)
+                        del msg
+                    finally:
+                        _receivelock.release()
+            except self._sysex:
+                log("io.close_read()")
+                self._io.close_read()
+            except EOFError:
+                log("got EOF")
+                #log("traceback was: ", self._geterrortext(self.exc_info()))
+                # remember the EOF so channels can report it from
+                # _getremoteerror()
+                self._error = self.exc_info()[1]
+                eof = True
+            except:
+                log(self._geterrortext(self.exc_info()))
+        finally:
+            try:
+                log('entering finalization')
+                # wake up any execution waiting to receive something
+                self._channelfactory._finished_receiving()
+                if eof:
+                    self._terminate_execution()
+                log('leaving finalization')
+            except:
+                pass # XXX be silent at interp-shutdown
+
+    def _terminate_execution(self):
+        # overridden by SlaveGateway to stop the execution pool
+        pass
+
+    def _send(self, msgcode, channelid=0, data=bytes()):
+        message = Message(msgcode, channelid, data)
+        try:
+            message.to_io(self._io)
+            self._trace('sent', message)
+        except (IOError, ValueError):
+            e = sys.exc_info()[1]
+            self._trace('failed to send', message, e)
+            raise
+
+
+    def _local_schedulexec(self, channel, sourcetask):
+        # base gateways refuse execution requests (see SlaveGateway)
+        channel.close("execution disallowed")
+
+    # _____________________________________________________________________
+    #
+    # High Level Interface
+    # _____________________________________________________________________
+    #
+    def newchannel(self):
+        """ return a new independent channel. """
+        return self._channelfactory.new()
+
+    def join(self, timeout=None):
+        """ Wait for receiverthread to terminate. """
+        self._trace("waiting for receiver thread to finish")
+        self._receiverthread.waitfinish()
+
+class SlaveGateway(BaseGateway):
+    # remote-side gateway: executes remote_exec sources in a worker pool
+
+    def _local_schedulexec(self, channel, sourcetask):
+        sourcetask = loads_internal(sourcetask)
+        self._execpool.spawn(self.executetask, ((channel, sourcetask)))
+
+    def _terminate_execution(self):
+        # called from receiverthread
+        self._trace("shutting down execution pool")
+        self._execpool.shutdown()
+        if not self._execpool.waitall(1.0):
+            self._trace("execution ongoing, trying interrupt_main")
+            # We try hard to terminate execution based on the assumption
+            # that there is only one gateway object running per-process.
+            if sys.platform != "win32":
+                self._trace("sending ourselves a SIGINT")
+                os.kill(os.getpid(), 2) # send ourselves a SIGINT
+            elif interrupt_main is not None:
+                self._trace("calling interrupt_main()")
+                interrupt_main()
+        if not self._execpool.waitall(10.0):
+            self._trace("execution did not finish in 10 secs, "
+                        "calling os._exit()")
+            os._exit(1)
+
+    def serve(self):
+        trace = lambda msg: self._trace("[serve] " + msg)
+        # in the 'thread' model the calling (main) thread doubles as the
+        # primary execution thread of the pool
+        hasprimary = self.execmodel.backend == "thread"
+        self._execpool = self.execmodel.WorkerPool(hasprimary=hasprimary)
+        trace("spawning receiver thread")
+        self._initreceive()
+        try:
+            try:
+                if hasprimary:
+                    trace("integrating as main primary exec thread")
+                    self._execpool.integrate_as_primary_thread()
+                else:
+                    trace("waiting for execution to finish")
+                    self._execpool.wait_for_shutdown()
+            finally:
+                trace("execution finished, closing io write stream")
+                self._io.close_write()
+                trace("joining receiver thread")
+                self.join()
+        except KeyboardInterrupt:
+            # in the slave we can't really do anything sensible
+            trace("swallowing keyboardinterrupt, serve finished")
+
+    def executetask(self, item):
+        # run one remote_exec request: compile the source, exec it with
+        # 'channel' in scope, then close the channel (with error text on
+        # failure)
+        try:
+            channel, (source, call_name, kwargs) = item
+            if not ISPY3 and kwargs:
+                # some python2 versions do not accept unicode keyword params
+                # note: Unserializer generally turns py2-str to py3-str objects
+                newkwargs = {}
+                for name, value in kwargs.items():
+                    if isinstance(name, unicode):
+                        name = name.encode('ascii')
+                    newkwargs[name] = value
+                kwargs = newkwargs
+            loc = {'channel' : channel, '__name__': '__channelexec__'}
+            self._trace("execution starts[%s]: %s" %
+                            (channel.id, repr(source)[:50]))
+            channel._executing = True
+            try:
+                co = compile(source+'\n', '<remote exec>', 'exec')
+                do_exec(co, loc) # noqa
+                if call_name:
+                    self._trace('calling %s(**%60r)' % (call_name, kwargs))
+                    function = loc[call_name]
+                    function(channel, **kwargs)
+            finally:
+                channel._executing = False
+                self._trace("execution finished")
+        except KeyboardInterrupt:
+            channel.close(INTERRUPT_TEXT)
+            raise
+        except:
+            excinfo = self.exc_info()
+            if not isinstance(excinfo[1], EOFError):
+                if not channel.gateway._channelfactory.finished:
+                    self._trace("got exception: %r" % (excinfo[1],))
+                    errortext = self._geterrortext(excinfo)
+                    channel.close(errortext)
+                    return
+            # EOFError after receiving finished just means the other
+            # side went away -- close without an error text
+            self._trace("ignoring EOFError because receiving finished")
+        channel.close()
+
+#
+# Cross-Python pickling code, tested from test_serializer.py
+#
+
+class DataFormatError(Exception):
+ pass
+
+class DumpError(DataFormatError):
+ """Error while serializing an object."""
+
+class LoadError(DataFormatError):
+ """Error while unserializing an object."""
+
+if ISPY3:
+    def bchr(n):
+        # int -> single-byte bytes object
+        return bytes([n])
+else:
+    # on py2 chr() already yields a one-character byte string
+    bchr = chr
+
+DUMPFORMAT_VERSION = bchr(1)
+
+# largest int that fits the 4-byte signed wire encoding ('!i')
+FOUR_BYTE_INT_MAX = 2147483647
+
+FLOAT_FORMAT = "!d"
+FLOAT_FORMAT_SIZE = struct.calcsize(FLOAT_FORMAT)
+
+class _Stop(Exception):
+    # internal control-flow exception: raised by the STOP opcode loader
+    # to end Unserializer.load()'s opcode loop
+    pass
+
+class Unserializer(object):
+ num2func = {} # is filled after this class definition
+ py2str_as_py3str = True # True
+ py3str_as_py2str = False # false means py2 will get unicode
+
+ def __init__(self, stream, channel_or_gateway=None, strconfig=None):
+ gateway = getattr(channel_or_gateway, 'gateway', channel_or_gateway)
+ strconfig = getattr(channel_or_gateway, '_strconfig', strconfig)
+ if strconfig:
+ self.py2str_as_py3str, self.py3str_as_py2str = strconfig
+ self.stream = stream
+ self.channelfactory = getattr(gateway, '_channelfactory', gateway)
+
+ def load(self, versioned=False):
+ if versioned:
+ ver = self.stream.read(1)
+ if ver != DUMPFORMAT_VERSION:
+ raise LoadError("wrong dumpformat version")
+ self.stack = []
+ try:
+ while True:
+ opcode = self.stream.read(1)
+ if not opcode:
+ raise EOFError
+ try:
+ loader = self.num2func[opcode]
+ except KeyError:
+ raise LoadError("unkown opcode %r - "
+ "wire protocol corruption?" % (opcode,))
+ loader(self)
+ except _Stop:
+ if len(self.stack) != 1:
+ raise LoadError("internal unserialization error")
+ return self.stack.pop(0)
+ else:
+ raise LoadError("didn't get STOP")
+
+ def load_none(self):
+ self.stack.append(None)
+
+ def load_true(self):
+ self.stack.append(True)
+
+ def load_false(self):
+ self.stack.append(False)
+
+ def load_int(self):
+ i = self._read_int4()
+ self.stack.append(i)
+
+ def load_longint(self):
+ s = self._read_byte_string()
+ self.stack.append(int(s))
+
+ if ISPY3:
+ load_long = load_int
+ load_longlong = load_longint
+ else:
+ def load_long(self):
+ i = self._read_int4()
+ self.stack.append(long(i))
+
+ def load_longlong(self):
+ l = self._read_byte_string()
+ self.stack.append(long(l))
+
+ def load_float(self):
+ binary = self.stream.read(FLOAT_FORMAT_SIZE)
+ self.stack.append(struct.unpack(FLOAT_FORMAT, binary)[0])
+
+ def _read_int4(self):
+ return struct.unpack("!i", self.stream.read(4))[0]
+
+ def _read_byte_string(self):
+ length = self._read_int4()
+ as_bytes = self.stream.read(length)
+ return as_bytes
+
+ def load_py3string(self):
+ as_bytes = self._read_byte_string()
+ if not ISPY3 and self.py3str_as_py2str:
+ # XXX Should we try to decode into latin-1?
+ self.stack.append(as_bytes)
+ else:
+ self.stack.append(as_bytes.decode("utf-8"))
+
+ def load_py2string(self):
+ as_bytes = self._read_byte_string()
+ if ISPY3 and self.py2str_as_py3str:
+ s = as_bytes.decode("latin-1")
+ else:
+ s = as_bytes
+ self.stack.append(s)
+
+ def load_bytes(self):
+ s = self._read_byte_string()
+ self.stack.append(s)
+
+ def load_unicode(self):
+ self.stack.append(self._read_byte_string().decode("utf-8"))
+
+ def load_newlist(self):
+ length = self._read_int4()
+ self.stack.append([None] * length)
+
+ def load_setitem(self):
+ if len(self.stack) < 3:
+ raise LoadError("not enough items for setitem")
+ value = self.stack.pop()
+ key = self.stack.pop()
+ self.stack[-1][key] = value
+
+ def load_newdict(self):
+ self.stack.append({})
+
+ def _load_collection(self, type_):
+ length = self._read_int4()
+ if length:
+ res = type_(self.stack[-length:])
+ del self.stack[-length:]
+ self.stack.append(res)
+ else:
+ self.stack.append(type_())
+
+ def load_buildtuple(self):
+ self._load_collection(tuple)
+
+ def load_set(self):
+ self._load_collection(set)
+
+ def load_frozenset(self):
+ self._load_collection(frozenset)
+
+ def load_stop(self):
+ raise _Stop
+
+ def load_channel(self):
+ id = self._read_int4()
+ newchannel = self.channelfactory.new(id)
+ self.stack.append(newchannel)
+
+# automatically build opcodes and byte-encoding
+
+class opcode:
+ """ container for name -> num mappings. """
+
+def _buildopcodes():
+ l = []
+ for name, func in Unserializer.__dict__.items():
+ if name.startswith("load_"):
+ opname = name[5:].upper()
+ l.append((opname, func))
+ l.sort()
+ for i,(opname, func) in enumerate(l):
+ assert i < 26, "xxx"
+ i = bchr(64+i)
+ Unserializer.num2func[i] = func
+ setattr(opcode, opname, i)
+
+_buildopcodes()
+
+def dumps(obj):
+ """ return a serialized bytestring of the given obj.
+
+ The obj and all contained objects must be of a builtin
+ python type (so nested dicts, sets, etc. are all ok but
+ not user-level instances).
+ """
+ return _Serializer().save(obj, versioned=True)
+
+def dump(byteio, obj):
+ """ write a serialized bytestring of the given obj to the given stream. """
+ _Serializer(write=byteio.write).save(obj, versioned=True)
+
+def loads(bytestring, py2str_as_py3str=False, py3str_as_py2str=False):
+ """ return the object as deserialized from the given bytestring.
+
+ py2str_as_py3str: if true then string (str) objects previously
+ dumped on Python2 will be loaded as Python3
+ strings which really are text objects.
+ py3str_as_py2str: if true then string (str) objects previously
+ dumped on Python3 will be loaded as Python2
+ strings instead of unicode objects.
+
+ if the bytestring was dumped with an incompatible protocol
+ version or if the bytestring is corrupted, the
+ ``execnet.DataFormatError`` will be raised.
+ """
+ io = BytesIO(bytestring)
+ return load(io, py2str_as_py3str=py2str_as_py3str,
+ py3str_as_py2str=py3str_as_py2str)
+
+def load(io, py2str_as_py3str=False, py3str_as_py2str=False):
+    """ deserialize an object from the specified stream.
+
+ Behaviour and parameters are otherwise the same as with ``loads``
+ """
+ strconfig=(py2str_as_py3str, py3str_as_py2str)
+ return Unserializer(io, strconfig=strconfig).load(versioned=True)
+
+def loads_internal(bytestring, channelfactory=None, strconfig=None):
+ io = BytesIO(bytestring)
+ return Unserializer(io, channelfactory, strconfig).load()
+
+def dumps_internal(obj):
+ return _Serializer().save(obj)
+
+
+class _Serializer(object):
+ _dispatch = {}
+
+ def __init__(self, write=None):
+ if write is None:
+ self._streamlist = []
+ write = self._streamlist.append
+ self._write = write
+
+ def save(self, obj, versioned=False):
+ # calling here is not re-entrant but multiple instances
+ # may write to the same stream because of the common platform
+        # atomic-write guarantee (concurrent writes each happen atomically)
+ if versioned:
+ self._write(DUMPFORMAT_VERSION)
+ self._save(obj)
+ self._write(opcode.STOP)
+ try:
+ streamlist = self._streamlist
+ except AttributeError:
+ return None
+ return type(streamlist[0])().join(streamlist)
+
+ def _save(self, obj):
+ tp = type(obj)
+ try:
+ dispatch = self._dispatch[tp]
+ except KeyError:
+ methodname = 'save_' + tp.__name__
+ meth = getattr(self.__class__, methodname, None)
+ if meth is None:
+ raise DumpError("can't serialize %s" % (tp,))
+ dispatch = self._dispatch[tp] = meth
+ dispatch(self, obj)
+
+ def save_NoneType(self, non):
+ self._write(opcode.NONE)
+
+ def save_bool(self, boolean):
+ if boolean:
+ self._write(opcode.TRUE)
+ else:
+ self._write(opcode.FALSE)
+
+ def save_bytes(self, bytes_):
+ self._write(opcode.BYTES)
+ self._write_byte_sequence(bytes_)
+
+ if ISPY3:
+ def save_str(self, s):
+ self._write(opcode.PY3STRING)
+ self._write_unicode_string(s)
+ else:
+ def save_str(self, s):
+ self._write(opcode.PY2STRING)
+ self._write_byte_sequence(s)
+
+ def save_unicode(self, s):
+ self._write(opcode.UNICODE)
+ self._write_unicode_string(s)
+
+ def _write_unicode_string(self, s):
+ try:
+ as_bytes = s.encode("utf-8")
+ except UnicodeEncodeError:
+ raise DumpError("strings must be utf-8 encodable")
+ self._write_byte_sequence(as_bytes)
+
+ def _write_byte_sequence(self, bytes_):
+ self._write_int4(len(bytes_), "string is too long")
+ self._write(bytes_)
+
+ def _save_integral(self, i, short_op, long_op):
+ if i <= FOUR_BYTE_INT_MAX:
+ self._write(short_op)
+ self._write_int4(i)
+ else:
+ self._write(long_op)
+ self._write_byte_sequence(str(i).rstrip("L").encode("ascii"))
+
+ def save_int(self, i):
+ self._save_integral(i, opcode.INT, opcode.LONGINT)
+
+ def save_long(self, l):
+ self._save_integral(l, opcode.LONG, opcode.LONGLONG)
+
+ def save_float(self, flt):
+ self._write(opcode.FLOAT)
+ self._write(struct.pack(FLOAT_FORMAT, flt))
+
+ def _write_int4(self, i, error="int must be less than %i" %
+ (FOUR_BYTE_INT_MAX,)):
+ if i > FOUR_BYTE_INT_MAX:
+ raise DumpError(error)
+ self._write(struct.pack("!i", i))
+
+ def save_list(self, L):
+ self._write(opcode.NEWLIST)
+ self._write_int4(len(L), "list is too long")
+ for i, item in enumerate(L):
+ self._write_setitem(i, item)
+
+ def _write_setitem(self, key, value):
+ self._save(key)
+ self._save(value)
+ self._write(opcode.SETITEM)
+
+ def save_dict(self, d):
+ self._write(opcode.NEWDICT)
+ for key, value in d.items():
+ self._write_setitem(key, value)
+
+ def save_tuple(self, tup):
+ for item in tup:
+ self._save(item)
+ self._write(opcode.BUILDTUPLE)
+ self._write_int4(len(tup), "tuple is too long")
+
+ def _write_set(self, s, op):
+ for item in s:
+ self._save(item)
+ self._write(op)
+ self._write_int4(len(s), "set is too long")
+
+ def save_set(self, s):
+ self._write_set(s, opcode.SET)
+
+ def save_frozenset(self, s):
+ self._write_set(s, opcode.FROZENSET)
+
+ def save_Channel(self, channel):
+ self._write(opcode.CHANNEL)
+ self._write_int4(channel.id)
+
+def init_popen_io(execmodel):
+ if not hasattr(os, 'dup'): # jython
+ io = Popen2IO(sys.stdout, sys.stdin, execmodel)
+ import tempfile
+ sys.stdin = tempfile.TemporaryFile('r')
+ sys.stdout = tempfile.TemporaryFile('w')
+ else:
+ try:
+ devnull = os.devnull
+ except AttributeError:
+ if os.name == 'nt':
+ devnull = 'NUL'
+ else:
+ devnull = '/dev/null'
+ # stdin
+ stdin = execmodel.fdopen(os.dup(0), 'r', 1)
+ fd = os.open(devnull, os.O_RDONLY)
+ os.dup2(fd, 0)
+ os.close(fd)
+
+ # stdout
+ stdout = execmodel.fdopen(os.dup(1), 'w', 1)
+ fd = os.open(devnull, os.O_WRONLY)
+ os.dup2(fd, 1)
+
+ # stderr for win32
+ if os.name == 'nt':
+ sys.stderr = execmodel.fdopen(os.dup(2), 'w', 1)
+ os.dup2(fd, 2)
+ os.close(fd)
+ io = Popen2IO(stdout, stdin, execmodel)
+ sys.stdin = execmodel.fdopen(0, 'r', 1)
+ sys.stdout = execmodel.fdopen(1, 'w', 1)
+ return io
+
+def serve(io, id):
+ trace("creating slavegateway on %r" %(io,))
+ SlaveGateway(io=io, id=id, _startcount=2).serve()
sendexec(io,
"import sys",
"sys.path.insert(0, %r)" % importdir,
- "from execnet.gateway_base import serve, init_popen_io",
+ "from execnet.gateway_base import serve, init_popen_io, get_execmodel",
"sys.stdout.write('1')",
"sys.stdout.flush()",
- "serve(init_popen_io(), id='%s-slave')" % spec.id,
+ "execmodel = get_execmodel(%r)" % spec.execmodel,
+ "serve(init_popen_io(execmodel), id='%s-slave')" % spec.id,
)
s = io.read(1)
- assert s == "1".encode('ascii')
+ assert s == "1".encode('ascii'), repr(s)
def bootstrap_ssh(io, spec):
try:
sendexec(io,
inspect.getsource(gateway_base),
- 'io = init_popen_io()',
+ "execmodel = get_execmodel(%r)" % spec.execmodel,
+ 'io = init_popen_io(execmodel)',
"io.write('1'.encode('ascii'))",
"serve(io, id='%s-slave')" % spec.id,
)
inspect.getsource(gateway_base),
'import socket',
inspect.getsource(SocketIO),
- "io = SocketIO(clientsock)",
+ "try: execmodel",
+ "except NameError:",
+ " execmodel = get_execmodel('thread')",
+ "io = SocketIO(clientsock, execmodel)",
"io.write('1'.encode('ascii'))",
"serve(io, id='%s-slave')" % id,
)
"""
import os
import sys
-from subprocess import Popen, PIPE
try:
from execnet.gateway_base import Popen2IO, Message
from __main__ import Popen2IO, Message
class Popen2IOMaster(Popen2IO):
- def __init__(self, args):
- self.popen = p = Popen(args, stdin=PIPE, stdout=PIPE)
- Popen2IO.__init__(self, p.stdin, p.stdout)
+ def __init__(self, args, execmodel):
+ self.popen = p = execmodel.PopenPiped(args)
+ Popen2IO.__init__(self, p.stdin, p.stdout, execmodel=execmodel)
def wait(self):
try:
args.append(remotecmd)
return args
-
-def create_io(spec):
+def create_io(spec, execmodel):
if spec.popen:
args = popen_args(spec)
- return Popen2IOMaster(args)
+ return Popen2IOMaster(args, execmodel)
if spec.ssh:
args = ssh_args(spec)
- io = Popen2IOMaster(args)
+ io = Popen2IOMaster(args, execmodel)
io.remoteaddress = spec.ssh
return io
+#
+# Proxy Gateway handling code
+#
+# master: proxy initiator
+# forwarder: forwards between master and sub
+# sub: sub process that is proxied to the initiator
+
RIO_KILL = 1
RIO_WAIT = 2
RIO_REMOTEADDRESS = 3
RIO_CLOSE_WRITE = 4
-class RemoteIO(object):
- def __init__(self, master_channel):
- self.iochan = master_channel.gateway.newchannel()
- self.controlchan = master_channel.gateway.newchannel()
- master_channel.send((self.iochan, self.controlchan))
- self.io = self.iochan.makefile('r')
-
+class ProxyIO(object):
+    """ A Proxy IO object allows instantiating a Gateway
+ through another "via" gateway. A master:ProxyIO object
+ provides an IO object effectively connected to the sub
+ via the forwarder. To achieve this, master:ProxyIO interacts
+ with forwarder:serve_proxy_io() which itself
+ instantiates and interacts with the sub.
+ """
+ def __init__(self, proxy_channel, execmodel):
+ # after exchanging the control channel we use proxy_channel
+ # for messaging IO
+ self.controlchan = proxy_channel.gateway.newchannel()
+ proxy_channel.send(self.controlchan)
+ self.iochan = proxy_channel
+ self.iochan_file = self.iochan.makefile('r')
+ self.execmodel = execmodel
def read(self, nbytes):
- return self.io.read(nbytes)
+ return self.iochan_file.read(nbytes)
def write(self, data):
return self.iochan.send(data)
def wait(self):
return self._controll(RIO_WAIT)
+ @property
+ def remoteaddress(self):
+ return self._controll(RIO_REMOTEADDRESS)
+
def __repr__(self):
return '<RemoteIO via %s>' % (self.iochan.gateway.id, )
-
-def serve_remote_io(channel):
- class PseudoSpec(object):
- def __getattr__(self, name):
- return None
- spec = PseudoSpec()
- spec.__dict__.update(channel.receive())
- io = create_io(spec)
- io_chan, control_chan = channel.receive()
- io_target = io_chan.makefile()
-
- def iothread():
- initial = io.read(1)
- assert initial == '1'.encode('ascii')
- channel.gateway._trace('initializing transfer io for', spec.id)
- io_target.write(initial)
- while True:
- message = Message.from_io(io)
- message.to_io(io_target)
- import threading
- thread = threading.Thread(name='io-forward-'+spec.id,
- target=iothread)
- thread.setDaemon(True)
- thread.start()
-
- def iocallback(data):
- io.write(data)
- io_chan.setcallback(iocallback)
-
+class PseudoSpec:
+ def __init__(self, vars):
+ self.__dict__.update(vars)
+ def __getattr__(self, name):
+ return None
+
+def serve_proxy_io(proxy_channelX):
+ execmodel = proxy_channelX.gateway.execmodel
+ _trace = proxy_channelX.gateway._trace
+ tag = "serve_proxy_io:%s " % proxy_channelX.id
+ def log(*msg):
+ _trace(tag + msg[0], *msg[1:])
+ spec = PseudoSpec(proxy_channelX.receive())
+ # create sub IO object which we will proxy back to our proxy initiator
+ sub_io = create_io(spec, execmodel)
+ control_chan = proxy_channelX.receive()
+ log("got control chan", control_chan)
+
+ # read data from master, forward it to the sub
+ # XXX writing might block, thus blocking the receiver thread
+ def forward_to_sub(data):
+ log("forward data to sub, size %s" % len(data))
+ sub_io.write(data)
+ proxy_channelX.setcallback(forward_to_sub)
def controll(data):
if data==RIO_WAIT:
- control_chan.send(io.wait())
+ control_chan.send(sub_io.wait())
elif data==RIO_KILL:
- control_chan.send(io.kill())
+ control_chan.send(sub_io.kill())
elif data==RIO_REMOTEADDRESS:
- control_chan.send(io.remoteaddress)
+ control_chan.send(sub_io.remoteaddress)
elif data==RIO_CLOSE_WRITE:
- control_chan.send(io.close_write())
+ control_chan.send(sub_io.close_write())
control_chan.setcallback(controll)
+ # write data to the master coming from the sub
+ forward_to_master_file = proxy_channelX.makefile("w")
+
+ # read bootstrap byte from sub, send it on to master
+ log('reading bootstrap byte from sub', spec.id)
+ initial = sub_io.read(1)
+ assert initial == '1'.encode('ascii'), initial
+ log('forwarding bootstrap byte from sub', spec.id)
+ forward_to_master_file.write(initial)
+
+ # enter message forwarding loop
+ while True:
+ try:
+ message = Message.from_io(sub_io)
+ except EOFError:
+ log('EOF from sub, terminating proxying loop', spec.id)
+ break
+ message.to_io(forward_to_master_file)
+ # proxy_channelX will be closed from remote_exec's finalization code
+
if __name__ == "__channelexec__":
- serve_remote_io(channel)
+ serve_proxy_io(channel) # noqa
-import socket
-from execnet.gateway import Gateway
from execnet.gateway_bootstrap import HostNotFound
-import os, sys, inspect
-
+import sys
try: bytes
except NameError: bytes = str
class SocketIO:
-
- error = (socket.error, EOFError)
- def __init__(self, sock):
+ def __init__(self, sock, execmodel):
self.sock = sock
+ self.execmodel = execmodel
+ socket = execmodel.socket
try:
sock.setsockopt(socket.SOL_IP, socket.IP_TOS, 0x10)# IPTOS_LOWDELAY
sock.setsockopt(socket.SOL_TCP, socket.TCP_NODELAY, 1)
def close_read(self):
try:
self.sock.shutdown(0)
- except socket.error:
+ except self.execmodel.socket.error:
pass
def close_write(self):
try:
self.sock.shutdown(1)
- except socket.error:
+ except self.execmodel.socket.error:
pass
def wait(self):
return realhost, realport
-def create_io(spec, group):
+def create_io(spec, group, execmodel):
assert not spec.python, (
"socket: specifying python executables not yet supported")
gateway_id = spec.installvia
host, port = spec.socket.split(":")
port = int(port)
+ socket = execmodel.socket
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- io = SocketIO(sock)
+ io = SocketIO(sock, execmodel)
io.remoteaddress = '%s:%d' % (host, port)
try:
sock.connect((host, port))
- except socket.gaierror:
+ except execmodel.socket.gaierror:
raise HostNotFound(str(sys.exc_info()[1]))
return io
"""
Managing Gateway Groups and interactions with multiple channels.
-(c) 2008-2009, Holger Krekel and others
+(c) 2008-2014, Holger Krekel and others
"""
-import os, sys, atexit
-import time
-import execnet
-from execnet.threadpool import WorkerPool
+import sys, atexit
from execnet import XSpec
-from execnet import gateway, gateway_io, gateway_bootstrap
-from execnet.gateway_base import queue, reraise, trace, TimeoutError
+from execnet import gateway_io, gateway_bootstrap
+from execnet.gateway_base import reraise, trace, get_execmodel
+from threading import Lock
NO_ENDMARKER_WANTED = object()
-class Group:
+class Group(object):
""" Gateway Groups. """
defaultspec = "popen"
- def __init__(self, xspecs=()):
- """ initialize group and make gateways as specified. """
- # Gateways may evolve to become GC-collectable
+ def __init__(self, xspecs=(), execmodel="thread"):
+ """ initialize group and make gateways as specified.
+ execmodel can be 'thread' or 'eventlet'.
+ """
self._gateways = []
self._autoidcounter = 0
+ self._autoidlock = Lock()
self._gateways_to_join = []
+ # we use the same execmodel for all of the Gateway objects
+ # we spawn on our side. Probably we should not allow different
+        # execmodels between different groups, but this is not yet clear.
+ # Note that "other side" execmodels may differ and is typically
+ # specified by the spec passed to makegateway.
+ self.set_execmodel(execmodel)
for xspec in xspecs:
self.makegateway(xspec)
atexit.register(self._cleanup_atexit)
+ @property
+ def execmodel(self):
+ return self._execmodel
+
+ @property
+ def remote_execmodel(self):
+ return self._remote_execmodel
+
+ def set_execmodel(self, execmodel, remote_execmodel=None):
+ """ Set the execution model for local and remote site.
+
+ execmodel can be one of "thread" or "eventlet" (XXX gevent).
+ It determines the execution model for any newly created gateway.
+ If remote_execmodel is not specified it takes on the value
+ of execmodel.
+
+ NOTE: Execution models can only be set before any gateway is created.
+
+ """
+ if self._gateways:
+ raise ValueError("can not set execution models if "
+ "gateways have been created already")
+ if remote_execmodel is None:
+ remote_execmodel = execmodel
+ self._execmodel = get_execmodel(execmodel)
+ self._remote_execmodel = get_execmodel(remote_execmodel)
+
def __repr__(self):
idgateways = [gw.id for gw in self]
return "<Group %r>" %(idgateways)
id=<string> specifies the gateway id
python=<path> specifies which python interpreter to execute
+ execmodel=model 'thread', 'eventlet', 'gevent' model for execution
chdir=<path> specifies to which directory to change
nice=<path> specifies process priority of new process
env:NAME=value specifies a remote environment variable setting.
if not isinstance(spec, XSpec):
spec = XSpec(spec)
self.allocate_id(spec)
+ if spec.execmodel is None:
+ spec.execmodel = self.remote_execmodel.backend
if spec.via:
assert not spec.socket
master = self[spec.via]
- channel = master.remote_exec(gateway_io)
- channel.send(vars(spec))
- io = gateway_io.RemoteIO(channel)
- gw = gateway_bootstrap.bootstrap(io, spec)
+ proxy_channel = master.remote_exec(gateway_io)
+ proxy_channel.send(vars(spec))
+ proxy_io_master = gateway_io.ProxyIO(proxy_channel, self.execmodel)
+ gw = gateway_bootstrap.bootstrap(proxy_io_master, spec)
elif spec.popen or spec.ssh:
- io = gateway_io.create_io(spec)
+ io = gateway_io.create_io(spec, execmodel=self.execmodel)
gw = gateway_bootstrap.bootstrap(io, spec)
elif spec.socket:
from execnet import gateway_socket
- io = gateway_socket.create_io(spec, self)
+ io = gateway_socket.create_io(spec, self, execmodel=self.execmodel)
gw = gateway_bootstrap.bootstrap(io, spec)
else:
raise ValueError("no gateway type found for %r" % (spec._spec,))
return gw
def allocate_id(self, spec):
- """ allocate id for the given xspec object. """
+ """ (re-entrant) allocate id for the given xspec object. """
if spec.id is None:
- id = "gw" + str(self._autoidcounter)
- self._autoidcounter += 1
- if id in self:
- raise ValueError("already have gateway with id %r" %(id,))
- spec.id = id
+ with self._autoidlock:
+ id = "gw" + str(self._autoidcounter)
+ self._autoidcounter += 1
+ if id in self:
+ raise ValueError("already have gateway with id %r" %(id,))
+ spec.id = id
def _register(self, gateway):
assert not hasattr(gateway, '_group')
"""
while self:
- from execnet.threadpool import WorkerPool
vias = {}
for gw in self:
if gw.spec.via:
trace("Gateways did not come down after timeout: %r" % gw)
gw._io.kill()
- safe_terminate(timeout, [
+ safe_terminate(self.execmodel, timeout, [
(lambda: join_wait(gw), lambda: kill(gw))
for gw in self._gateways_to_join])
self._gateways_to_join[:] = []
try:
return self._queue
except AttributeError:
- self._queue = queue.Queue()
+ self._queue = None
for ch in self._channels:
+ if self._queue is None:
+ self._queue = ch.gateway.execmodel.queue.Queue()
def putreceived(obj, channel=ch):
self._queue.put((channel, obj))
if endmarker is NO_ENDMARKER_WANTED:
-def safe_terminate(timeout, list_of_paired_functions):
- workerpool = WorkerPool(len(list_of_paired_functions)*2)
+def safe_terminate(execmodel, timeout, list_of_paired_functions):
+ workerpool = execmodel.WorkerPool()
def termkill(termfunc, killfunc):
- termreply = workerpool.dispatch(termfunc)
+ termreply = workerpool.spawn(termfunc)
try:
termreply.get(timeout=timeout)
except IOError:
replylist = []
for termfunc, killfunc in list_of_paired_functions:
- reply = workerpool.dispatch(termkill, termfunc, killfunc)
+ reply = workerpool.spawn(termkill, termfunc, killfunc)
replylist.append(reply)
for reply in replylist:
reply.get()
- workerpool.shutdown()
- workerpool.join()
+ workerpool.waitall()
default_group = Group()
makegateway = default_group.makegateway
+set_execmodel = default_group.set_execmodel
"""
-(c) 2006-2009, Armin Rigo, Holger Krekel, Maciej Fijalkowski
+(c) 2006-2013, Armin Rigo, Holger Krekel, Maciej Fijalkowski
"""
def serve_rsync(channel):
import os, stat, shutil
channel.send(("done", None))
if __name__ == '__channelexec__':
- serve_rsync(channel)
+ serve_rsync(channel) # noqa
clientsock
except NameError:
print("client side starting")
- import sys
host, port = sys.argv[1].split(':')
port = int(port)
myself = open(os.path.abspath(sys.argv[0]), 'rU').read()
sys.stderr=olderr
clientfile.flush()
except EOFError:
- e = sys.exc_info()[1]
+ #e = sys.exc_info()[1]
sys.stderr.write("connection close, prompt thread returns")
break
#print >>sys.stdout, "".join(apply(format_exception,sys.exc_info()))
self.clientsock.close()
-prompter = promptagent(clientsock)
+prompter = promptagent(clientsock) # noqa
prompter.start()
print("promptagent - thread started")
progname = 'socket_readline_exec_server-1.2'
-import sys, socket, os
-try:
- import fcntl
-except ImportError:
- fcntl = None
+import sys, os
+
+def get_fcntl():
+ try:
+ import fcntl
+ except ImportError:
+ fcntl = None
+ return fcntl
+
+fcntl = get_fcntl()
debug = 0
# rstrip so that we can use \r\n for telnet testing
source = clientfile.readline().rstrip()
clientfile.close()
- g = {'clientsock' : clientsock, 'address' : address}
+ g = {'clientsock' : clientsock, 'address' : address, 'execmodel': execmodel}
source = eval(source)
if source:
co = compile(source+'\n', source, 'exec')
print_(progname, 'compiled source, executing')
try:
- exec_(co, g)
+ exec_(co, g) # noqa
finally:
print_(progname, 'finished executing code')
# background thread might hold a reference to this (!?)
#clientsock.close()
-def bind_and_listen(hostport):
+def bind_and_listen(hostport, execmodel):
+ socket = execmodel.socket
if isinstance(hostport, str):
host, port = hostport.split(':')
hostport = (host, int(port))
hostport = sys.argv[1]
else:
hostport = ':8888'
- serversock = bind_and_listen(hostport)
+ from execnet.gateway_base import get_execmodel
+ execmodel = get_execmodel("thread")
+ serversock = bind_and_listen(hostport, execmodel)
startserver(serversock, loop=False)
+
elif __name__=='__channelexec__':
- bindname = channel.receive()
- sock = bind_and_listen(bindname)
+ execmodel = channel.gateway.execmodel # noqa
+ bindname = channel.receive() # noqa
+ sock = bind_and_listen(bindname, execmodel)
port = sock.getsockname()
- channel.send(port)
+ channel.send(port) # noqa
startserver(sock)
"""
import sys
-import os
-import time
import win32serviceutil
import win32service
import win32event
+++ /dev/null
-"""
-dispatching execution to threads
-
-(c) 2009, holger krekel
-"""
-import threading
-import time
-import sys
-
-# py2/py3 compatibility
-try:
- import queue
-except ImportError:
- import Queue as queue
-if sys.version_info >= (3,0):
- exec ("def reraise(cls, val, tb): raise val")
-else:
- exec ("def reraise(cls, val, tb): raise cls, val, tb")
-
-ERRORMARKER = object()
-
-class Reply(object):
- """ reply instances provide access to the result
- of a function execution that got dispatched
- through WorkerPool.dispatch()
- """
- _excinfo = None
- def __init__(self, task):
- self.task = task
- self._queue = queue.Queue()
-
- def _set(self, result):
- self._queue.put(result)
-
- def _setexcinfo(self, excinfo):
- self._excinfo = excinfo
- self._queue.put(ERRORMARKER)
-
- def get(self, timeout=None):
- """ get the result object from an asynchronous function execution.
- if the function execution raised an exception,
- then calling get() will reraise that exception
- including its traceback.
- """
- if self._queue is None:
- raise EOFError("reply has already been delivered")
- try:
- result = self._queue.get(timeout=timeout)
- except queue.Empty:
- raise IOError("timeout waiting for %r" %(self.task, ))
- if result is ERRORMARKER:
- self._queue = None
- excinfo = self._excinfo
- reraise(excinfo[0], excinfo[1], excinfo[2])
- return result
-
-class WorkerThread(threading.Thread):
- def __init__(self, pool):
- threading.Thread.__init__(self)
- self._queue = queue.Queue()
- self._pool = pool
- self.setDaemon(1)
-
- def _run_once(self):
- reply = self._queue.get()
- if reply is SystemExit:
- return False
- assert self not in self._pool._ready
- task = reply.task
- try:
- func, args, kwargs = task
- result = func(*args, **kwargs)
- except (SystemExit, KeyboardInterrupt):
- return False
- except:
- reply._setexcinfo(sys.exc_info())
- else:
- reply._set(result)
- # at this point, reply, task and all other local variables go away
- return True
-
- def run(self):
- try:
- while self._run_once():
- self._pool._ready[self] = True
- finally:
- del self._pool._alive[self]
- try:
- del self._pool._ready[self]
- except KeyError:
- pass
-
- def send(self, task):
- reply = Reply(task)
- self._queue.put(reply)
- return reply
-
- def stop(self):
- self._queue.put(SystemExit)
-
-class WorkerPool(object):
- """ A WorkerPool allows to dispatch function executions
- to threads. Each Worker Thread is reused for multiple
- function executions. The dispatching operation
- takes care to create and dispatch to existing
- threads.
-
- You need to call shutdown() to signal
- the WorkerThreads to terminate and join()
- in order to wait until all worker threads
- have terminated.
- """
- _shuttingdown = False
- def __init__(self, maxthreads=None):
- """ init WorkerPool instance which may
- create up to `maxthreads` worker threads.
- """
- self.maxthreads = maxthreads
- self._ready = {}
- self._alive = {}
-
- def dispatch(self, func, *args, **kwargs):
- """ return Reply object for the asynchronous dispatch
- of the given func(*args, **kwargs) in a
- separate worker thread.
- """
- if self._shuttingdown:
- raise IOError("WorkerPool is already shutting down")
- try:
- thread, _ = self._ready.popitem()
- except KeyError: # pop from empty list
- if self.maxthreads and len(self._alive) >= self.maxthreads:
- raise IOError("can't create more than %d threads." %
- (self.maxthreads,))
- thread = self._newthread()
- return thread.send((func, args, kwargs))
-
- def _newthread(self):
- thread = WorkerThread(self)
- self._alive[thread] = True
- thread.start()
- return thread
-
- def shutdown(self):
- """ signal all worker threads to terminate.
- call join() to wait until all threads termination.
- """
- if not self._shuttingdown:
- self._shuttingdown = True
- for t in list(self._alive):
- t.stop()
-
- def join(self, timeout=None):
- """ wait until all worker threads have terminated. """
- current = threading.currentThread()
- deadline = delta = None
- if timeout is not None:
- deadline = time.time() + timeout
- for thread in list(self._alive):
- if deadline:
- delta = deadline - time.time()
- if delta <= 0:
- raise IOError("timeout while joining threads")
- thread.join(timeout=delta)
- if thread.isAlive():
- raise IOError("timeout while joining threads")
-
-if __name__ == '__channelexec__':
- maxthreads = channel.receive()
- execpool = WorkerPool(maxthreads=maxthreads)
- gw = channel.gateway
- channel.send("ok")
- gw._trace("instantiated thread work pool maxthreads=%s" %(maxthreads,))
- while 1:
- gw._trace("waiting for new exec task")
- task = gw._execqueue.get()
- if task is None:
- gw._trace("thread-dispatcher got None, exiting")
- execpool.shutdown()
- execpool.join()
- raise gw._StopExecLoop
- gw._trace("dispatching exec task to thread pool")
- execpool.dispatch(gw.executetask, task)
"""
-(c) 2008-2009, holger krekel
+(c) 2008-2013, holger krekel
"""
-import execnet
-
class XSpec:
""" Execution Specification: key1=value1//key2=value2 ...
* keys need to be unique within the specification scope
* if no "=value" is given, assume a boolean True value
"""
# XXX allow customization, for only allow specific key names
- popen = ssh = socket = python = chdir = nice = dont_write_bytecode = None
+ popen = ssh = socket = python = chdir = nice = \
+ dont_write_bytecode = execmodel = None
def __init__(self, string):
self._spec = string