return f
+class MountDetails():
+
+ def __init__(self, mntobj):
+ self.client_id = mntobj.client_id
+ self.client_keyring_path = mntobj.client_keyring_path
+ self.client_remote = mntobj.client_remote
+ self.cephfs_name = mntobj.cephfs_name
+ self.cephfs_mntpt = mntobj.cephfs_mntpt
+ self.hostfs_mntpt = mntobj.hostfs_mntpt
+
+ def restore(self, mntobj):
+ mntobj.client_id = self.client_id
+ mntobj.client_keyring_path = self.client_keyring_path
+ mntobj.client_remote = self.client_remote
+ mntobj.cephfs_name = self.cephfs_name
+ mntobj.cephfs_mntpt = self.cephfs_mntpt
+ mntobj.hostfs_mntpt = self.hostfs_mntpt
+
+
class CephFSTestCase(CephTestCase):
"""
Test case for Ceph FS, requires caller to populate Filesystem and Mounts,
LOAD_SETTINGS = [] # type: ignore
+ def _save_mount_details(self):
+ """
+ XXX: Tests may change details of mount objects, so let's stash them so
+ that these details are restored later to ensure smooth setUps and
+ tearDowns for upcoming tests.
+ """
+ self._orig_mount_details = [MountDetails(m) for m in self.mounts]
+ log.info(self._orig_mount_details)
+
def setUp(self):
super(CephFSTestCase, self).setUp()
for mount in self.mounts:
if mount.is_mounted():
mount.umount_wait(force=True)
+ self._save_mount_details()
# To avoid any issues with e.g. unlink bugs, we destroy and recreate
# the filesystem rather than just doing a rm -rf of files
self.mds_cluster.mon_manager.raw_cluster_cmd("osd", "blocklist", "rm", addr)
client_mount_ids = [m.client_id for m in self.mounts]
- # In case the test changes the IDs of clients, stash them so that we can
- # reset in tearDown
- self._original_client_ids = client_mount_ids
- log.info(client_mount_ids)
-
# In case there were any extra auth identities around from a previous
# test, delete them
for entry in self.auth_list():
for m in self.mounts:
m.teardown()
- for i, m in enumerate(self.mounts):
- m.client_id = self._original_client_ids[i]
+ for m, md in zip(self.mounts, self._orig_mount_details):
+ md.restore(m)
for subsys, key in self.configs_set:
self.mds_cluster.clear_ceph_conf(subsys, key)