import logging
import shutil
+from ..contextutil import safe_while
+
log = logging.getLogger(__name__)
class RemoteProcess(object):
class Sentinel(object):
"""
- Sentinel -- used to define PIPE file-like object.
+ Sentinel -- used to define PIPE file-like object.
"""
def __init__(self, name):
self.name = name
return r
-def wait(processes):
+def wait(processes, timeout=None):
"""
Wait for all given processes to exit.
Raise if any one of them fails.
+
+ Optionally, timeout after 'timeout' seconds.
"""
+ if timeout and timeout > 0:
+ with safe_while(sleep=5, increment=0, tries=(timeout/5)) as check_time:
+ not_ready = list(processes)
+ while len(not_ready) > 0:
+ check_time()
+ for proc in not_ready:
+ if proc.ready():
+ not_ready.remove(proc)
+
for proc in processes:
assert isinstance(proc.exitstatus, gevent.event.AsyncResult)
proc.exitstatus.get()
yield
finally:
log.info('joining radosbench')
- run.wait(radosbench.itervalues())
+ run.wait(radosbench.itervalues(), timeout=900)
if pool is not 'data':
ctx.manager.remove_pool(pool)