def test_delete_non_existent_image(self):
res = self.remove_image('rbd', 'i_dont_exist')
- self.assertStatus(409)
- self.assertEqual(res, {"errno": 2, "status": 409, "component": "rbd",
+ self.assertStatus(400)
+ self.assertEqual(res, {"errno": 2, "status": 400, "component": "rbd",
"detail": "[errno 2] error removing image"})
def test_image_delete(self):
if 'status' in task.ret_value:
status = task.ret_value['status']
else:
- status = 500
+ status = getattr(ex, 'status', 500)
cherrypy.response.status = status
return task.ret_value
raise ex
cherrypy.response.status = 202
return {'name': self.name, 'metadata': md}
return value
- wrapper.__wrapped__ = func
return wrapper
# -*- coding: utf-8 -*-
from __future__ import absolute_import
+import time
+
import cherrypy
import rados
from ..services.ceph_service import SendCommandError
-from ..controllers import RESTController, ApiController
+from ..controllers import RESTController, ApiController, Task
from .helper import ControllerTestCase
from ..services.exception import handle_rados_error, handle_send_command_error, \
serialize_dashboard_exception
-from ..tools import ViewCache, TaskManager
+from ..tools import ViewCache, TaskManager, NotificationQueue
# pylint: disable=W0613
@ApiController('foo')
class FooResource(RESTController):
+
@cherrypy.expose
@cherrypy.tools.json_out()
@handle_rados_error('foo')
def vc_no_data(self):
@ViewCache(timeout=0)
def _no_data():
- import time
time.sleep(0.2)
_no_data()
class RESTControllerTest(ControllerTestCase):
-
@classmethod
def setup_server(cls):
+ NotificationQueue.start_queue()
+ TaskManager.init()
cls.setup_controllers([FooResource])
def test_no_exception(self):
{'detail': '[errno -42] hi', 'code': "42", 'component': 'foo'}
)
+ def test_task_exception(self):
+ self._get('/foo/task_exception')
+ self.assertStatus(400)
+ self.assertJsonBody(
+ {'detail': '[errno -42] hi', 'code': "42", 'component': 'foo'}
+ )
+
+ self._get('/foo/wait_task_exception')
+ while self.jsonBody():
+ time.sleep(0.5)
+ self._get('/foo/wait_task_exception')
+
def test_internal_server_error(self):
self._get('/foo/internal_server_error')
self.assertStatus(500)
state, result = task1.run('test5/task1', 0.5)
self.assertEqual(state, TaskManager.VALUE_EXECUTING)
self.assertIsNone(result)
- ex_t, _ = TaskManager.list()
+ ex_t, _ = TaskManager.list('test5/*')
self.assertEqual(len(ex_t), 1)
self.assertEqual(ex_t[0].name, 'test5/task1')
self.assertEqual(ex_t[0].progress, 30)
self.assertEqual(task.progress, 60)
task2.resume()
self.wait_for_task('test5/task2')
- ex_t, _ = TaskManager.list()
+ ex_t, _ = TaskManager.list('test5/*')
self.assertEqual(len(ex_t), 1)
self.assertEqual(ex_t[0].name, 'test5/task1')
task1.resume()
self.wait_for_task('test5/task1')
- ex_t, _ = TaskManager.list()
+ ex_t, _ = TaskManager.list('test5/*')
self.assertEqual(len(ex_t), 0)
def test_task_idempotent(self):
state, result = task1.run('test6/task1', 0.5)
self.assertEqual(state, TaskManager.VALUE_EXECUTING)
self.assertIsNone(result)
- ex_t, _ = TaskManager.list()
+ ex_t, _ = TaskManager.list('test6/*')
self.assertEqual(len(ex_t), 1)
self.assertEqual(ex_t[0].name, 'test6/task1')
state, result = task1_clone.run('test6/task1', 0.5)
self.assertEqual(state, TaskManager.VALUE_EXECUTING)
self.assertIsNone(result)
- ex_t, _ = TaskManager.list()
+ ex_t, _ = TaskManager.list('test6/*')
self.assertEqual(len(ex_t), 1)
self.assertEqual(ex_t[0].name, 'test6/task1')
task1.resume()
return "Task(ns={}, md={})" \
.format(self.name, self.metadata)
+ def __repr__(self):
+ return str(self)
+
def _run(self):
with self.lock:
assert not self.running