import logging
+import sys
import gevent.pool
import gevent.queue
log = logging.getLogger(__name__)
+class ExceptionHolder(object):
+ def __init__(self, exc_info):
+ self.exc_info = exc_info
+
+def capture_traceback(func, *args, **kwargs):
+ """
+ Utility function to capture tracebacks of any exception func
+ raises.
+ """
+ try:
+ return func(*args, **kwargs)
+ except Exception:
+ return ExceptionHolder(sys.exc_info())
+
+def resurrect_traceback(exc):
+ if isinstance(exc, ExceptionHolder):
+ exc_info = exc.exc_info
+ elif isinstance(exc, BaseException):
+ print type(exc)
+ exc_info = (type(exc), exc, None)
+ else:
+ return
+
+ raise exc_info[0], exc_info[1], exc_info[2]
+
class parallel(object):
"""
This class is a context manager for running functions in parallel.
def spawn(self, func, *args, **kwargs):
self.count += 1
self.any_spawned = True
- greenlet = self.group.spawn(func, *args, **kwargs)
+ greenlet = self.group.spawn(capture_traceback, func, *args, **kwargs)
greenlet.link(self._finish)
def __enter__(self):
if not self.any_spawned or self.iteration_stopped:
raise StopIteration()
result = self.results.get()
- if isinstance(result, BaseException):
- if isinstance(result, StopIteration):
- self.iteration_stopped = True
- raise result
+
+ try:
+ resurrect_traceback(result)
+ except StopIteration:
+ self.iteration_stopped = True
+ raise
+
return result
def _finish(self, greenlet):