elif method == 'DELETE':
cls._resp.status_code = 204
return thread.res_task['ret_value']
- raise Exception(thread.res_task['exception'])
+ else:
+ if 'status' in thread.res_task['exception']:
+ cls._resp.status_code = thread.res_task['exception']['status']
+ else:
+ cls._resp.status_code = 500
+ return thread.res_task['exception']
@classmethod
def _task_post(cls, url, data=None, timeout=60):
res = self.create_image('rbd', 'test_rbd_twice', 10240)
res = self.create_image('rbd', 'test_rbd_twice', 10240)
- self.assertEqual(res, {"errno": 17,
+ self.assertStatus(409)
+ self.assertEqual(res, {"errno": 17, "status": 409, "component": "rbd",
"detail": "[errno 17] error creating image"})
self.remove_image('rbd', 'test_rbd_twice')
self.assertStatus(204)
def test_delete_non_existent_image(self):
res = self.remove_image('rbd', 'i_dont_exist')
- self.assertEqual(res, {"errno": 2,
+ self.assertStatus(409)
+ self.assertEqual(res, {"errno": 2, "status": 409, "component": "rbd",
"detail": "[errno 2] error removing image"})
def test_image_delete(self):
md[k] = arg_map[v[1:-1]]
else:
md[k] = v
- task = TaskManager.run(self.name, md, func, args, kwargs)
+ task = TaskManager.run(self.name, md, func, args, kwargs,
+ exception_handler=self.exception_handler)
try:
status, value = task.wait(self.wait_for)
except Exception as ex:
- if self.exception_handler:
- return self.exception_handler(ex)
+ if task.ret_value:
+ # exception was handled by task.exception_handler
+ if 'status' in task.ret_value:
+ status = task.ret_value['status']
+ else:
+ status = 500
+ cherrypy.response.status = status
+ return task.ret_value
raise ex
if status == TaskManager.VALUE_EXECUTING:
cherrypy.response.status = 202
import math
import cherrypy
+import rados
import rbd
from . import ApiController, AuthRequired, RESTController, Task
# pylint: disable=inconsistent-return-statements
def _rbd_exception_handler(ex):
if isinstance(ex, rbd.OSError):
- cherrypy.response.status = 409
- return {'detail': str(ex), 'errno': ex.errno}
+ return {'status': 409, 'detail': str(ex), 'errno': ex.errno,
+ 'component': 'rbd'}
+ elif isinstance(ex, rados.OSError):
+ return {'status': 409, 'detail': str(ex), 'errno': ex.errno,
+ 'component': 'rados'}
raise ex
logger.info("task (%s, %s) finished", task_name, task_metadata)
if thread.res_task['success']:
self.body = json.dumps(thread.res_task['ret_value'])
+ if method == 'POST':
+ self.status = '201 Created'
+ elif method == 'PUT':
+ self.status = '200 OK'
+ elif method == 'DELETE':
+ self.status = '204 No Content'
+ return
+ else:
+ if 'status' in thread.res_task['exception']:
+ self.status = thread.res_task['exception']['status']
+ else:
+ self.status = 500
+ self.body = json.dumps(thread.rest_task['exception'])
return
- raise Exception(thread.res_task['exception'])
def _task_post(self, url, data=None, timeout=60):
self._task_request('POST', url, data, timeout)
self.finish(result, None)
# pylint: disable=too-many-arguments
- def __init__(self, op_seconds, wait=False, fail=False, progress=50, is_async=False):
+ def __init__(self, op_seconds, wait=False, fail=False, progress=50,
+ is_async=False, handle_ex=False):
self.op_seconds = op_seconds
self.wait = wait
self.fail = fail
self.progress = progress
self.is_async = is_async
+ self.handle_ex = handle_ex
self._event = threading.Event()
+ def _handle_exception(self, ex):
+ return {'status': 409, 'detail': str(ex)}
+
def run(self, ns, timeout=None):
args = ['dummy arg']
kwargs = {'dummy': 'arg'}
+ h_ex = self._handle_exception if self.handle_ex else None
if not self.is_async:
task = TaskManager.run(
- ns, self.metadata(), self.task_op, args, kwargs)
+ ns, self.metadata(), self.task_op, args, kwargs,
+ exception_handler=h_ex)
else:
task = TaskManager.run(
ns, self.metadata(), self.task_async_op, args, kwargs,
- executor=MyTask.CallbackExecutor(self.fail, self.progress))
+ executor=MyTask.CallbackExecutor(self.fail, self.progress),
+ exception_handler=h_ex)
return task.wait(timeout)
def task_op(self, *args, **kwargs):
'wait': self.wait,
'fail': self.fail,
'progress': self.progress,
- 'is_async': self.is_async
+ 'is_async': self.is_async,
+ 'handle_ex': self.handle_ex
}
self.assertEqual(fn_t[0]['progress'], 50)
self.assertFalse(fn_t[0]['success'])
self.assertIsNotNone(fn_t[0]['exception'])
- self.assertEqual(fn_t[0]['exception'], "Task Unexpected Exception")
+ self.assertEqual(fn_t[0]['exception'],
+ {"detail": "Task Unexpected Exception"})
+
+ def test_task_serialization_format_on_failure_with_handler(self):
+ task1 = MyTask(1, fail=True, handle_ex=True)
+ task1.run('test15/task1', 0.5)
+ self.wait_for_task('test15/task1')
+ ex_t, fn_t = TaskManager.list_serializable('test15/*')
+ self.assertEqual(len(ex_t), 0)
+ self.assertEqual(len(fn_t), 1)
+ # validate finished tasks attributes
+
+ try:
+ json.dumps(fn_t)
+ except TypeError as ex:
+ self.fail("Failed to serialize finished tasks: {}".format(str(ex)))
+
+ self.assertEqual(len(fn_t[0].keys()), 9)
+ self.assertEqual(fn_t[0]['name'], 'test15/task1')
+ self.assertEqual(fn_t[0]['metadata'], task1.metadata())
+ self.assertIsNotNone(fn_t[0]['begin_time'])
+ self.assertIsNotNone(fn_t[0]['end_time'])
+ self.assertGreaterEqual(fn_t[0]['duration'], 1.0)
+ self.assertEqual(fn_t[0]['progress'], 50)
+ self.assertFalse(fn_t[0]['success'])
+ self.assertIsNotNone(fn_t[0]['exception'])
+ self.assertEqual(fn_t[0]['exception'],
+ {"status": 409,
+ "detail": "Task Unexpected Exception"})
cls._finished_tasks.append(task)
@classmethod
- def run(cls, name, metadata, fn, args=None, kwargs=None, executor=None):
+ def run(cls, name, metadata, fn, args=None, kwargs=None, executor=None,
+ exception_handler=None):
if not args:
args = []
if not kwargs:
kwargs = {}
if not executor:
executor = ThreadedExecutor()
- task = Task(name, metadata, fn, args, kwargs, executor)
+ task = Task(name, metadata, fn, args, kwargs, executor,
+ exception_handler)
with cls._lock:
if task in cls._executing_tasks:
logger.debug("TM: task already executing: %s", task)
'duration': t.duration,
'progress': t.progress,
'success': not t.exception,
- 'ret_value': t.ret_value,
- 'exception': str(t.exception) if t.exception else None
+ 'ret_value': t.ret_value if not t.exception else None,
+ 'exception': t.ret_value if t.exception and t.ret_value else (
+ {'detail': str(t.exception)} if t.exception else None)
} for t in fn_t]
class Task(object):
- def __init__(self, name, metadata, fn, args, kwargs, executor):
+ def __init__(self, name, metadata, fn, args, kwargs, executor,
+ exception_handler=None):
self.name = name
self.metadata = metadata
self.fn = fn
self.fn_args = args
self.fn_kwargs = kwargs
self.executor = executor
+ self.ex_handler = exception_handler
self.running = False
self.event = threading.Event()
self.progress = None
def _complete(self, ret_value, exception=None):
now = time.time()
+ if exception and self.ex_handler:
+ # pylint: disable=broad-except
+ try:
+ ret_value = self.ex_handler(exception)
+ except Exception as ex:
+ exception = ex
with self.lock:
assert self.running, "_complete cannot be called before _run"
self.end_time = now