self.gather_mount_info()
- self.mounted = True
-
def _run_mount_cmd(self, mntopts, check_status):
mount_cmd = self._get_mount_cmd(mntopts)
mountcmd_stdout, mountcmd_stderr = StringIO(), StringIO()
time.sleep(5)
- self.mounted = True
-
# Now that we're mounted, set permissions so that the rest of the test
# will have unrestricted access to the filesystem mount.
for retry in range(10):
if self.is_mounted():
raise
- self.mounted = False
self._fuse_conn = None
self.id = None
self.inst = None
if require_clean:
raise
- self.mounted = False
self.cleanup()
def teardown(self):
except CommandFailedError:
pass
- self.mounted = False
-
def _asok_path(self):
return "/var/run/ceph/ceph-client.{0}.*.asok".format(self.client_id)
self.enable_dynamic_debug()
self.ctx[f'kmount_count.{self.client_remote.hostname}'] = kmount_count + 1
- self.mounted = True
-
def _run_mount_cmd(self, mntopts, check_status):
mount_cmd = self._get_mount_cmd(mntopts)
mountcmd_stdout, mountcmd_stderr = StringIO(), StringIO()
self.disable_dynamic_debug()
self.ctx[f'kmount_count.{self.client_remote.hostname}'] = kmount_count - 1
- self.mounted = False
self.cleanup()
def umount_wait(self, force=False, require_clean=False,
self.mountpoint], timeout=timeout,
omit_sudo=False)
- self.mounted = False
self.cleanup()
def wait_until_mounted(self):
Unlike the fuse client, the kernel client is up and running as soon
as the initial mount() function returns.
"""
- assert self.mounted
+ assert self.is_mounted()
def teardown(self):
super(KernelMount, self).teardown()
- if self.mounted:
+ if self.is_mounted():
self.umount()
def _get_debug_dir(self):
Look up the CephFS client ID for this mount, using debugfs.
"""
- assert self.mounted
+ assert self.is_mounted()
return self._get_global_id()
:param cephfs_mntpt: Path to directory inside Ceph FS that will be
mounted as root
"""
- self.mounted = False
self.ctx = ctx
self.test_dir = test_dir
sudo=True).decode())
def is_mounted(self):
- return self.mounted
+ return self.hostfs_mntpt in \
+ self.client_remote.read_file('/proc/self/mounts',stdout=StringIO())
def setupfs(self, name=None):
if name is None and self.fs is not None:
exception: wait accepted too which can be True or False.
"""
self.umount_wait()
- assert not self.mounted
+ assert not self.is_mounted()
mntopts = kwargs.pop('mntopts', [])
check_status = kwargs.pop('check_status', True)