import json
import logging
from collections import namedtuple
+import threading
+import time
import requests
import six
log.info("request %s to %s", method, url)
if method == 'GET':
cls._resp = cls._session.get(url, params=params)
- try:
- return cls._resp.json()
- except ValueError as ex:
- log.exception("Failed to decode response: %s", cls._resp.text)
- raise ex
elif method == 'POST':
cls._resp = cls._session.post(url, json=data, params=params)
elif method == 'DELETE':
cls._resp = cls._session.put(url, json=data, params=params)
else:
assert False
- return None
+ try:
+ if cls._resp.text and cls._resp.text != "":
+ return cls._resp.json()
+ return cls._resp.text
+ except ValueError as ex:
+ log.exception("Failed to decode response: %s", cls._resp.text)
+ raise ex
@classmethod
def _get(cls, url, params=None):
return cls._request(url, 'GET', params=params)
@classmethod
- def _get_view_cache(cls, url, retries=5):
+ def _view_cache_get(cls, url, retries=5):
retry = True
while retry and retries > 0:
retry = False
def _put(cls, url, data=None, params=None):
cls._request(url, 'PUT', data, params)
+ @classmethod
+ def _assertEq(cls, v1, v2):
+ if not v1 == v2:
+ raise Exception("assertion failed: {} != {}".format(v1, v2))
+
+ @classmethod
+ def _assertIn(cls, v1, v2):
+ if v1 not in v2:
+ raise Exception("assertion failed: {} not in {}".format(v1, v2))
+
+ @classmethod
+ def _assertIsInst(cls, v1, v2):
+ if not isinstance(v1, v2):
+ raise Exception("assertion failed: {} not instance of {}".format(v1, v2))
+
+ # pylint: disable=too-many-arguments
+ @classmethod
+ def _task_post(cls, url, task_name, task_metadata, data=None,
+ timeout=60):
+ res = cls._post(url, data)
+ cls._assertIn(cls._resp.status_code, [200, 201])
+ cls._assertIsInst(res, dict)
+ cls._assertIn('status', res)
+ cls._assertIn(res['status'], ['done', 'executing'])
+ if res['status'] == 'done':
+ cls._assertIn('value', res)
+ log.info("task (%s, %s) finished immediately", task_name,
+ task_metadata)
+ return res['value']
+
+ class Waiter(threading.Thread):
+ def __init__(self, task_name, task_metadata):
+ super(Waiter, self).__init__()
+ self.task_name = task_name
+ self.task_metadata = task_metadata
+ self.ev = threading.Event()
+ self.abort = False
+ self.res_task = None
+
+ def run(self):
+ running = True
+ while running and not self.abort:
+ log.info("task (%s, %s) is still executing", self.task_name,
+ self.task_metadata)
+ time.sleep(1)
+ res = cls._get('/api/task?name={}'.format(self.task_name))
+ for task in res['finished_tasks']:
+ if task['metadata'] == self.task_metadata:
+ # task finished
+ running = False
+ self.res_task = task
+ self.ev.set()
+
+ thread = Waiter(task_name, task_metadata)
+ thread.start()
+ status = thread.ev.wait(timeout)
+ if not status:
+ # timeout expired
+ thread.abort = True
+ thread.join()
+ raise Exception("Waiting for task ({}, {}) to finish timed out"
+ .format(task_name, task_metadata))
+ log.info("task (%s, %s) finished", task_name, task_metadata)
+ if thread.res_task['success']:
+ return thread.res_task['ret_value']
+ raise Exception(thread.res_task['exception'])
+
@classmethod
def cookies(cls):
return cls._resp.cookies
'application_metadata': application})
@classmethod
- def create_image(cls, name, pool, size):
- cls._post('/api/rbd', {'name': name, 'pool_name': pool, 'size': size})
+ def create_image(cls, name, pool, size, **kwargs):
+ data = {'name': name, 'pool_name': pool, 'size': size}
+ data.update(kwargs)
+ return cls._task_post('/api/rbd', 'rbd/create',
+ {'pool_name': pool, 'image_name': name}, data)
@classmethod
def setUpClass(cls):
self.assertEqual(snap[k], v)
def test_list(self):
- data = self._get_view_cache('/api/rbd')
+ data = self._view_cache_get('/api/rbd')
self.assertStatus(200)
self.assertEqual(len(data), 2)
def test_create(self):
rbd_name = 'test_rbd'
- data = {'pool_name': 'rbd',
- 'name': rbd_name,
- 'size': 10240}
- self._post('/api/rbd', data)
- self.assertStatus(201)
- self.assertJsonBody({"success": True})
+ res = self.create_image(rbd_name, 'rbd', 10240)
+ self.assertEqual(res, {"success": True})
img = self._get('/api/rbd/rbd/test_rbd')
self.assertStatus(200)
self._ceph_cmd(['osd', 'pool', 'set', 'data_pool', 'allow_ec_overwrites', 'true'])
rbd_name = 'test_rbd_in_data_pool'
- data = {'pool_name': 'rbd',
- 'name': rbd_name,
- 'size': 10240,
- 'data_pool': 'data_pool'}
- self._post('/api/rbd', data)
- self.assertStatus(201)
- self.assertJsonBody({"success": True})
+ res = self.create_image(rbd_name, 'rbd', 10240, data_pool='data_pool')
+ self.assertEqual(res, {"success": True})
img = self._get('/api/rbd/rbd/test_rbd_in_data_pool')
self.assertStatus(200)
'--yes-i-really-really-mean-it'])
def test_create_rbd_twice(self):
- data = {'pool_name': 'rbd',
- 'name': 'test_rbd_twice',
- 'size': 10240}
- self._post('/api/rbd', data)
- self.assertStatus(201)
-
- self._post('/api/rbd', data)
- self.assertStatus(400)
- self.assertJsonBody({"success": False, "errno": 17,
- "detail": "[errno 17] error creating image"})
+ res = self.create_image('test_rbd_twice', 'rbd', 10240)
+
+ res = self.create_image('test_rbd_twice', 'rbd', 10240)
+ self.assertEqual(res, {"success": False, "errno": 17,
+ "detail": "[errno 17] error creating image"})
self._rbd_cmd(['rm', 'rbd/test_rbd_twice'])
def test_snapshots_and_clone_info(self):