From a9df9e088d4ba0af304fa79b623cbbf71ad09c97 Mon Sep 17 00:00:00 2001
From: John Spray
Date: Tue, 1 Nov 2016 12:21:41 +0100
Subject: [PATCH] tasks/cephfs: refactor test case class

A more generic CephTestCase and CephCluster, for writing
non-cephfs test cases.

This avoids overloading one class with the functionality
needed by lots of different subsystems.

Signed-off-by: John Spray
---
 tasks/ceph_test_case.py             | 136 ++++++++++++++++++++++++++++
 tasks/cephfs/cephfs_test_case.py    | 121 +------------------------
 tasks/cephfs/filesystem.py          |  85 +++++++++--------
 tasks/{cephfs => }/vstart_runner.py |  42 ++++++---
 4 files changed, 212 insertions(+), 172 deletions(-)
 create mode 100644 tasks/ceph_test_case.py
 rename tasks/{cephfs => }/vstart_runner.py (97%)

diff --git a/tasks/ceph_test_case.py b/tasks/ceph_test_case.py
new file mode 100644
index 0000000000000..d9eefe46d1faf
--- /dev/null
+++ b/tasks/ceph_test_case.py
@@ -0,0 +1,136 @@
+
+import unittest
+import time
+import logging
+
+from teuthology.orchestra.run import CommandFailedError
+
+log = logging.getLogger(__name__)
+
+
+class CephTestCase(unittest.TestCase):
+    """
+    For test tasks that want to define a structured set of
+    tests implemented in python. Subclass this with appropriate
+    helpers for the subsystem you're testing.
+    """
+
+    # Environment references
+    mounts = None
+    fs = None
+    mds_cluster = None
+    mgr_cluster = None
+    ctx = None
+
+    mon_manager = None
+
+    def assert_cluster_log(self, expected_pattern, invert_match=False, timeout=10):
+        """
+        Context manager. Assert that during execution, or up to 5 seconds later,
+        the Ceph cluster log emits a message matching the expected pattern.
+
+        :param expected_pattern: a string that you expect to see in the log output
+        """
+
+        ceph_manager = self.fs.mon_manager
+
+        class ContextManager(object):
+            def match(self):
+                found = expected_pattern in self.watcher_process.stdout.getvalue()
+                if invert_match:
+                    return not found
+
+                return found
+
+            def __enter__(self):
+                self.watcher_process = ceph_manager.run_ceph_w()
+
+            def __exit__(self, exc_type, exc_val, exc_tb):
+                if not self.watcher_process.finished:
+                    # Check if we got an early match, wait a bit if we didn't
+                    if self.match():
+                        return
+                    else:
+                        log.debug("No log hits yet, waiting...")
+                        # Default monc tick interval is 10s, so wait that long and
+                        # then some grace
+                        time.sleep(5 + timeout)
+
+                self.watcher_process.stdin.close()
+                try:
+                    self.watcher_process.wait()
+                except CommandFailedError:
+                    pass
+
+                if not self.match():
+                    log.error("Log output: \n{0}\n".format(self.watcher_process.stdout.getvalue()))
+                    raise AssertionError("Expected log message not found: '{0}'".format(expected_pattern))
+
+        return ContextManager()
+
+    def wait_for_health(self, pattern, timeout):
+        """
+        Wait until 'ceph health' contains messages matching the pattern
+        """
+        def seen_health_warning():
+            health = self.fs.mon_manager.get_mon_health()
+            summary_strings = [s['summary'] for s in health['summary']]
+            if len(summary_strings) == 0:
+                log.debug("Not expected number of summary strings ({0})".format(summary_strings))
+                return False
+            else:
+                for ss in summary_strings:
+                    if pattern in ss:
+                        return True
+
+            log.debug("Not found expected summary strings yet ({0})".format(summary_strings))
+            return False
+
+        self.wait_until_true(seen_health_warning, timeout)
+
+    def wait_for_health_clear(self, timeout):
+        """
+        Wait until `ceph health` returns no messages
+        """
+        def is_clear():
+            health = self.fs.mon_manager.get_mon_health()
+            return len(health['summary']) == 0
+
+        self.wait_until_true(is_clear, timeout)
+
+    def wait_until_equal(self, get_fn, expect_val, timeout, reject_fn=None):
+        period = 5
+        elapsed = 0
+        while True:
+            val = get_fn()
+            if val == expect_val:
+                return
+            elif reject_fn and reject_fn(val):
+                raise RuntimeError("wait_until_equal: forbidden value {0} seen".format(val))
+            else:
+                if elapsed >= timeout:
+                    raise RuntimeError("Timed out after {0} seconds waiting for {1} (currently {2})".format(
+                        elapsed, expect_val, val
+                    ))
+                else:
+                    log.debug("wait_until_equal: {0} != {1}, waiting...".format(val, expect_val))
+                    time.sleep(period)
+                    elapsed += period
+
+        log.debug("wait_until_equal: success")
+
+    def wait_until_true(self, condition, timeout):
+        period = 5
+        elapsed = 0
+        while True:
+            if condition():
+                return
+            else:
+                if elapsed >= timeout:
+                    raise RuntimeError("Timed out after {0} seconds".format(elapsed))
+                else:
+                    log.debug("wait_until_true: waiting...")
+                    time.sleep(period)
+                    elapsed += period
+
+        log.debug("wait_until_true: success")
diff --git a/tasks/cephfs/cephfs_test_case.py b/tasks/cephfs/cephfs_test_case.py
index b9228e269ad4d..ae9c0d636d8ee 100644
--- a/tasks/cephfs/cephfs_test_case.py
+++ b/tasks/cephfs/cephfs_test_case.py
@@ -1,8 +1,7 @@
 import json
 import logging
-import unittest
 from unittest import case
-import time
+from tasks.ceph_test_case import CephTestCase
 import os
 import re
 from StringIO import StringIO
@@ -33,18 +32,13 @@ def needs_trimming(f):
     return f
 
 
-class CephFSTestCase(unittest.TestCase):
+class CephFSTestCase(CephTestCase):
     """
     Test case for Ceph FS, requires caller to populate Filesystem and Mounts,
     into the fs, mount_a, mount_b class attributes (setting mount_b is optional)
 
     Handles resetting the cluster under test between tests.
     """
-    # Environment references
-    mounts = None
-    fs = None
-    mds_cluster = None
-    ctx = None
 
     # FIXME weird explicit naming
     mount_a = None
@@ -234,43 +228,6 @@ class CephFSTestCase(unittest.TestCase):
     def _session_by_id(self, session_ls):
         return dict([(s['id'], s) for s in session_ls])
 
-    def wait_until_equal(self, get_fn, expect_val, timeout, reject_fn=None):
-        period = 5
-        elapsed = 0
-        while True:
-            val = get_fn()
-            if val == expect_val:
-                return
-            elif reject_fn and reject_fn(val):
-                raise RuntimeError("wait_until_equal: forbidden value {0} seen".format(val))
-            else:
-                if elapsed >= timeout:
-                    raise RuntimeError("Timed out after {0} seconds waiting for {1} (currently {2})".format(
-                        elapsed, expect_val, val
-                    ))
-                else:
-                    log.debug("wait_until_equal: {0} != {1}, waiting...".format(val, expect_val))
-                    time.sleep(period)
-                    elapsed += period
-
-        log.debug("wait_until_equal: success")
-
-    def wait_until_true(self, condition, timeout):
-        period = 5
-        elapsed = 0
-        while True:
-            if condition():
-                return
-            else:
-                if elapsed >= timeout:
-                    raise RuntimeError("Timed out after {0} seconds".format(elapsed))
-                else:
-                    log.debug("wait_until_true: waiting...")
-                    time.sleep(period)
-                    elapsed += period
-
-        log.debug("wait_until_true: success")
-
     def wait_for_daemon_start(self, daemon_ids=None):
         """
         Wait until all the daemons appear in the FSMap, either assigned
@@ -347,77 +304,3 @@ class CephFSTestCase(unittest.TestCase):
         else:
             raise AssertionError("MDS daemon '{0}' did not crash as expected".format(daemon_id))
 
-
-    def assert_cluster_log(self, expected_pattern, invert_match=False, timeout=10):
-        """
-        Context manager. Assert that during execution, or up to 5 seconds later,
-        the Ceph cluster log emits a message matching the expected pattern.
-
-        :param expected_pattern: a string that you expect to see in the log output
-        """
-
-        ceph_manager = self.fs.mon_manager
-
-        class ContextManager(object):
-            def match(self):
-                found = expected_pattern in self.watcher_process.stdout.getvalue()
-                if invert_match:
-                    return not found
-
-                return found
-
-            def __enter__(self):
-                self.watcher_process = ceph_manager.run_ceph_w()
-
-            def __exit__(self, exc_type, exc_val, exc_tb):
-                if not self.watcher_process.finished:
-                    # Check if we got an early match, wait a bit if we didn't
-                    if self.match():
-                        return
-                    else:
-                        log.debug("No log hits yet, waiting...")
-                        # Default monc tick interval is 10s, so wait that long and
-                        # then some grace
-                        time.sleep(5 + timeout)
-
-                self.watcher_process.stdin.close()
-                try:
-                    self.watcher_process.wait()
-                except CommandFailedError:
-                    pass
-
-                if not self.match():
-                    log.error("Log output: \n{0}\n".format(self.watcher_process.stdout.getvalue()))
-                    raise AssertionError("Expected log message not found: '{0}'".format(expected_pattern))
-
-        return ContextManager()
-
-    def wait_for_health(self, pattern, timeout):
-        """
-        Wait until 'ceph health' contains messages matching the pattern
-        """
-        def seen_health_warning():
-            health = self.fs.mon_manager.get_mon_health()
-            summary_strings = [s['summary'] for s in health['summary']]
-            if len(summary_strings) == 0:
-                log.debug("Not expected number of summary strings ({0})".format(summary_strings))
-                return False
-            else:
-                for ss in summary_strings:
-                    if pattern in ss:
-                        return True
-
-            log.debug("Not found expected summary strings yet ({0})".format(summary_strings))
-            return False
-
-        self.wait_until_true(seen_health_warning, timeout)
-
-    def wait_for_health_clear(self, timeout):
-        """
-        Wait until `ceph health` returns no messages
-        """
-        def is_clear():
-            health = self.fs.mon_manager.get_mon_health()
-            return len(health['summary']) == 0
-
-        self.wait_until_true(is_clear, timeout)
diff --git a/tasks/cephfs/filesystem.py b/tasks/cephfs/filesystem.py
index 709ed1a1c10ff..fa4507f02fedd 100644
--- a/tasks/cephfs/filesystem.py
+++ b/tasks/cephfs/filesystem.py
@@ -32,7 +32,49 @@ class ObjectNotFound(Exception):
         return "Object not found: '{0}'".format(self._object_name)
 
 
-class MDSCluster(object):
+class CephCluster(object):
+    @property
+    def admin_remote(self):
+        first_mon = misc.get_first_mon(self._ctx, None)
+        (result,) = self._ctx.cluster.only(first_mon).remotes.iterkeys()
+        return result
+
+    def __init__(self, ctx):
+        self._ctx = ctx
+        self.mon_manager = ceph_manager.CephManager(self.admin_remote, ctx=ctx, logger=log.getChild('ceph_manager'))
+
+    def get_config(self, key, service_type=None):
+        """
+        Get config from mon by default, or a specific service if caller asks for it
+        """
+        if service_type is None:
+            service_type = 'mon'
+
+        service_id = sorted(misc.all_roles_of_type(self._ctx.cluster, service_type))[0]
+        return self.json_asok(['config', 'get', key], service_type, service_id)[key]
+
+    def set_ceph_conf(self, subsys, key, value):
+        if subsys not in self._ctx.ceph['ceph'].conf:
+            self._ctx.ceph['ceph'].conf[subsys] = {}
+        self._ctx.ceph['ceph'].conf[subsys][key] = value
+        write_conf(self._ctx)  # XXX because we don't have the ceph task's config object, if they
+                               # used a different config path this won't work.
+
+    def clear_ceph_conf(self, subsys, key):
+        del self._ctx.ceph['ceph'].conf[subsys][key]
+        write_conf(self._ctx)
+
+    def json_asok(self, command, service_type, service_id):
+        proc = self.mon_manager.admin_socket(service_type, service_id, command)
+        response_data = proc.stdout.getvalue()
+        log.info("_json_asok output: {0}".format(response_data))
+        if response_data.strip():
+            return json.loads(response_data)
+        else:
+            return None
+
+
+class MDSCluster(CephCluster):
     """
     Collective operations on all the MDS daemons in the Ceph cluster. These
     daemons may be in use by various Filesystems.
@@ -41,21 +83,14 @@
     a parent of Filesystem. The correct way to use MDSCluster going forward is
     as a separate instance outside of your (multiple) Filesystem instances.
     """
-
-    @property
-    def admin_remote(self):
-        first_mon = misc.get_first_mon(self._ctx, None)
-        (result,) = self._ctx.cluster.only(first_mon).remotes.iterkeys()
-        return result
-
     def __init__(self, ctx):
+        super(MDSCluster, self).__init__(ctx)
+
         self.mds_ids = list(misc.all_roles_of_type(ctx.cluster, 'mds'))
-        self._ctx = ctx
 
         if len(self.mds_ids) == 0:
             raise RuntimeError("This task requires at least one MDS")
 
-        self.mon_manager = ceph_manager.CephManager(self.admin_remote, ctx=ctx, logger=log.getChild('ceph_manager'))
         if hasattr(self._ctx, "daemons"):
             # Presence of 'daemons' attribute implies ceph task rather than ceph_deploy task
             self.mds_daemons = dict([(mds_id, self._ctx.daemons.get_daemon('mds', mds_id)) for mds_id in self.mds_ids])
@@ -157,36 +192,6 @@ class MDSCluster(object):
         return list(result)
 
-    def get_config(self, key, service_type=None):
-        """
-        Get config from mon by default, or a specific service if caller asks for it
-        """
-        if service_type is None:
-            service_type = 'mon'
-
-        service_id = sorted(misc.all_roles_of_type(self._ctx.cluster, service_type))[0]
-        return self.json_asok(['config', 'get', key], service_type, service_id)[key]
-
-    def set_ceph_conf(self, subsys, key, value):
-        if subsys not in self._ctx.ceph['ceph'].conf:
-            self._ctx.ceph['ceph'].conf[subsys] = {}
-        self._ctx.ceph['ceph'].conf[subsys][key] = value
-        write_conf(self._ctx)  # XXX because we don't have the ceph task's config object, if they
-                               # used a different config path this won't work.
-
-    def clear_ceph_conf(self, subsys, key):
-        del self._ctx.ceph['ceph'].conf[subsys][key]
-        write_conf(self._ctx)
-
-    def json_asok(self, command, service_type, service_id):
-        proc = self.mon_manager.admin_socket(service_type, service_id, command)
-        response_data = proc.stdout.getvalue()
-        log.info("_json_asok output: {0}".format(response_data))
-        if response_data.strip():
-            return json.loads(response_data)
-        else:
-            return None
-
     def set_clients_block(self, blocked, mds_id=None):
         """
         Block (using iptables) client communications to this MDS. Be careful: if
diff --git a/tasks/cephfs/vstart_runner.py b/tasks/vstart_runner.py
similarity index 97%
rename from tasks/cephfs/vstart_runner.py
rename to tasks/vstart_runner.py
index 81dca6c8b1845..012ec76133531 100644
--- a/tasks/cephfs/vstart_runner.py
+++ b/tasks/vstart_runner.py
@@ -107,7 +107,8 @@ try:
     from teuthology.exceptions import CommandFailedError
     from tasks.ceph_manager import CephManager
     from tasks.cephfs.fuse_mount import FuseMount
-    from tasks.cephfs.filesystem import Filesystem, MDSCluster
+    from tasks.cephfs.filesystem import Filesystem, MDSCluster, CephCluster
+    from mgr.mgr_test_case import MgrCluster
     from teuthology.contextutil import MaxWhileTries
     from teuthology.task import interactive
 except ImportError:
@@ -289,8 +290,6 @@ class LocalRemote(object):
         return proc
 
 
-# FIXME: twiddling vstart daemons is likely to be unreliable, we should probably just let vstart
-# run RADOS and run the MDS daemons directly from the test runner
 class LocalDaemon(object):
     def __init__(self, daemon_type, daemon_id):
         self.daemon_type = daemon_type
@@ -565,18 +564,11 @@ class LocalCephManager(CephManager):
         return j
 
 
-class LocalMDSCluster(MDSCluster):
+class LocalCephCluster(CephCluster):
     def __init__(self, ctx):
         # Deliberately skip calling parent constructor
         self._ctx = ctx
-
-        self.mds_ids = ctx.daemons.daemons['mds'].keys()
-        if not self.mds_ids:
-            raise RuntimeError("No MDSs found in ceph.conf!")
-
         self.mon_manager = LocalCephManager()
-        self.mds_daemons = dict([(id_, LocalDaemon("mds", id_)) for id_ in self.mds_ids])
-
         self._conf = defaultdict(dict)
 
     def get_config(self, key, service_type=None):
@@ -638,6 +630,17 @@ class LocalMDSCluster(MDSCluster):
         del self._conf[subsys][key]
         self._write_conf()
 
+
+class LocalMDSCluster(LocalCephCluster, MDSCluster):
+    def __init__(self, ctx):
+        super(LocalMDSCluster, self).__init__(ctx)
+
+        self.mds_ids = ctx.daemons.daemons['mds'].keys()
+        if not self.mds_ids:
+            raise RuntimeError("No MDSs found in ceph.conf!")
+
+        self.mds_daemons = dict([(id_, LocalDaemon("mds", id_)) for id_ in self.mds_ids])
+
     def clear_firewall(self):
         # FIXME: unimplemented
         pass
@@ -646,6 +649,17 @@ class LocalMDSCluster(MDSCluster):
         return LocalFilesystem(self._ctx, name)
 
 
+class LocalMgrCluster(LocalCephCluster, MgrCluster):
+    def __init__(self, ctx):
+        super(LocalMgrCluster, self).__init__(ctx)
+
+        self.mgr_ids = ctx.daemons.daemons['mgr'].keys()
+        if not self.mgr_ids:
+            raise RuntimeError("No manager daemons found in ceph.conf!")
+
+        self.mgr_daemons = dict([(id_, LocalDaemon("mgr", id_)) for id_ in self.mgr_ids])
+
+
 class LocalFilesystem(Filesystem, LocalMDSCluster):
     @property
     def admin_remote(self):
@@ -769,7 +783,7 @@ def exec_test():
             # tests that want to look these up via ctx can do so.
             # Inspect ceph.conf to see what roles exist
             for conf_line in open("ceph.conf").readlines():
-                for svc_type in ["mon", "osd", "mds"]:
+                for svc_type in ["mon", "osd", "mds", "mgr"]:
                     if svc_type not in self.daemons.daemons:
                         self.daemons.daemons[svc_type] = {}
                     match = re.match("^\[{0}\.(.+)\]$".format(svc_type), conf_line)
@@ -806,6 +820,7 @@ def exec_test():
         os.rmdir(mount.mountpoint)
     filesystem = LocalFilesystem(ctx)
     mds_cluster = LocalMDSCluster(ctx)
+    mgr_cluster = LocalMgrCluster(ctx)
 
     from tasks.cephfs_test_runner import DecoratingLoader
 
@@ -830,7 +845,8 @@ def exec_test():
         "ctx": ctx,
         "mounts": mounts,
         "fs": filesystem,
-        "mds_cluster": mds_cluster
+        "mds_cluster": mds_cluster,
+        "mgr_cluster": mgr_cluster,
     })
 
     # For the benefit of polling tests like test_full -- in teuthology land we set this
-- 
2.39.5
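
Note (not part of the patch): for anyone wiring a new subsystem suite onto the
CephTestCase base class added above, a minimal sketch of the intended usage
follows. The TestExample class, its test names, and the "example marker" string
are made up for illustration; the helpers and the injected ctx/fs/mds_cluster/
mgr_cluster attributes are the ones introduced by this patch, and the sketch
assumes the runner (cephfs_test_runner in teuthology, or vstart_runner's
exec_test) has populated them as shown above.

# Illustrative sketch only -- a hypothetical non-cephfs test module built on
# the new CephTestCase. Not part of this change.
from tasks.ceph_test_case import CephTestCase


class TestExample(CephTestCase):
    def test_cluster_log_marker(self):
        # assert_cluster_log() returns a context manager: it starts watching
        # the cluster log on entry and asserts the pattern appeared on exit.
        with self.assert_cluster_log("example marker"):
            self.fs.mon_manager.raw_cluster_cmd("log", "example marker")

    def test_health_eventually_clear(self):
        # wait_until_true() polls every 5 seconds until the condition holds,
        # raising RuntimeError once the timeout is exceeded; this condition is
        # the same check wait_for_health_clear() performs internally.
        self.wait_until_true(
            lambda: len(self.fs.mon_manager.get_mon_health()['summary']) == 0,
            timeout=60)

Manager-daemon tests would follow the same pattern via the mgr_cluster
attribute that LocalMgrCluster / exec_test now provide.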