import unittest
import time
import logging
+from io import StringIO
from teuthology.exceptions import CommandFailedError
class TestTimeoutError(RuntimeError):
    pass
-class CephTestCase(unittest.TestCase):
+class RunCephCmd:
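+    """Mixin of thin helpers that run Ceph commands through self.mon_manager."""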
+
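+    # Run a Ceph cluster command; the command may be passed either as positional
+    # arguments or via the 'args' keyword, and is dispatched to
+    # mon_manager.run_cluster_cmd().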
+    def run_ceph_cmd(self, *args, **kwargs):
+        if kwargs.get('args') is None and args:
+            if len(args) == 1:
+                args = args[0]
+            kwargs['args'] = args
+        return self.mon_manager.run_cluster_cmd(**kwargs)
+
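+    # Same calling convention as run_ceph_cmd(), but return only the exit status.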
+    def get_ceph_cmd_result(self, *args, **kwargs):
+        if kwargs.get('args') is None and args:
+            if len(args) == 1:
+                args = args[0]
+            kwargs['args'] = args
+        return self.run_ceph_cmd(**kwargs).exitstatus
+
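+    # Same calling convention as run_ceph_cmd(), but capture stdout (in a
+    # StringIO unless the caller supplied one) and return it as a string.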
+    def get_ceph_cmd_stdout(self, *args, **kwargs):
+        if kwargs.get('args') is None and args:
+            if len(args) == 1:
+                args = args[0]
+            kwargs['args'] = args
+        kwargs['stdout'] = kwargs.pop('stdout', StringIO())
+        return self.run_ceph_cmd(**kwargs).stdout.getvalue()
+
+
+class CephTestCase(unittest.TestCase, RunCephCmd):
"""
For test tasks that want to define a structured set of
tests implemented in python. Subclass this with appropriate
# their special needs. If not met, tests will be skipped.
REQUIRE_MEMSTORE = False
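+    # Pick the manager implementation that matches how the tests were launched:
+    # LocalCephManager under vstart_runner.py, CephManager under teuthology.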
+    def _init_mon_manager(self):
+        # if vstart_runner.py has invoked this code
+        if 'Local' in str(type(self.ceph_cluster)):
+            from tasks.vstart_runner import LocalCephManager
+            self.mon_manager = LocalCephManager(ctx=self.ctx)
+        # else teuthology has invoked this code
+        else:
+            from tasks.ceph_manager import CephManager
+            self.mon_manager = CephManager(self.ceph_cluster.admin_remote,
+                ctx=self.ctx, logger=log.getChild('ceph_manager'))
+
    def setUp(self):
        self._mon_configs_set = set()
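+        # Set up mon_manager (and a shortcut to the admin remote) before the
+        # first cluster command is issued.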
+        self._init_mon_manager()
+        self.admin_remote = self.ceph_cluster.admin_remote
+
        self.ceph_cluster.mon_manager.raw_cluster_cmd("log",
            "Starting test {0}".format(self.id()))
import logging
import os
import re
-from io import StringIO
from tasks.ceph_test_case import CephTestCase
mntobj.hostfs_mntpt = self.hostfs_mntpt
-class RunCephCmd:
-
-    def run_ceph_cmd(self, *args, **kwargs):
-        if kwargs.get('args') is None and args:
-            if len(args) == 1:
-                args = args[0]
-            kwargs['args'] = args
-        return self.mon_manager.run_cluster_cmd(**kwargs)
-
-    def get_ceph_cmd_result(self, *args, **kwargs):
-        if kwargs.get('args') is None and args:
-            if len(args) == 1:
-                args = args[0]
-            kwargs['args'] = args
-        return self.run_ceph_cmd(**kwargs).exitstatus
-
-    def get_ceph_cmd_stdout(self, *args, **kwargs):
-        if kwargs.get('args') is None and args:
-            if len(args) == 1:
-                args = args[0]
-            kwargs['args'] = args
-        kwargs['stdout'] = kwargs.pop('stdout', StringIO())
-        return self.run_ceph_cmd(**kwargs).stdout.getvalue()
-
-
-class CephFSTestCase(CephTestCase, RunCephCmd):
+class CephFSTestCase(CephTestCase):
"""
Test case for Ceph FS, requires caller to populate Filesystem and Mounts,
into the fs, mount_a, mount_b class attributes (setting mount_b is optional)
        for addr, blocklisted_at in blacklist.items():
            self.run_ceph_cmd("osd", "blacklist", "rm", addr)
-    def _init_mon_manager(self):
-        # if vstart_runner.py has invoked this code
-        if 'Local' in str(type(self.ceph_cluster)):
-            from tasks.vstart_runner import LocalCephManager
-            self.mon_manager = LocalCephManager(ctx=self.ctx)
-        # else teuthology has invoked this code
-        else:
-            from tasks.ceph_manager import CephManager
-            self.mon_manager = CephManager(self.ceph_cluster.admin_remote,
-                ctx=self.ctx, logger=log.getChild('ceph_manager'))
-
    def setUp(self):
        super(CephFSTestCase, self).setUp()
-        self._init_mon_manager()
-        self.admin_remote = self.ceph_cluster.admin_remote
        self.config_set('mon', 'mon_allow_pool_delete', True)
from tasks.ceph_manager import write_conf
from tasks.ceph_manager import CephManager
-from tasks.cephfs.cephfs_test_case import RunCephCmd
+from tasks.ceph_test_case import RunCephCmd
log = logging.getLogger(__name__)