# verify trash dir is clean
self._wait_for_trash_empty()
+
+ def test_subvolume_ops_on_nonexistent_vol(self):
+ # tests the fs subvolume operations on non existing volume
+
+ volname = "non_existent_subvolume"
+
+ # try subvolume operations
+ for op in ("create", "rm", "getpath", "info", "resize", "pin", "ls"):
+ try:
+ if op == "resize":
+ self._fs_cmd("subvolume", "resize", volname, "subvolname_1", "inf")
+ elif op == "pin":
+ self._fs_cmd("subvolume", "pin", volname, "subvolname_1", "export", "1")
+ elif op == "ls":
+ self._fs_cmd("subvolume", "ls", volname)
+ else:
+ self._fs_cmd("subvolume", op, volname, "subvolume_1")
+ except CommandFailedError as ce:
+ self.assertEqual(ce.exitstatus, errno.ENOENT)
+ else:
+ self.fail("expected the 'fs subvolume {0}' command to fail".format(op))
+
+ # try subvolume snapshot operations and clone create
+ for op in ("create", "rm", "info", "protect", "unprotect", "ls", "clone"):
+ try:
+ if op == "ls":
+ self._fs_cmd("subvolume", "snapshot", op, volname, "subvolume_1")
+ elif op == "clone":
+ self._fs_cmd("subvolume", "snapshot", op, volname, "subvolume_1", "snapshot_1", "clone_1")
+ else:
+ self._fs_cmd("subvolume", "snapshot", op, volname, "subvolume_1", "snapshot_1")
+ except CommandFailedError as ce:
+ self.assertEqual(ce.exitstatus, errno.ENOENT)
+ else:
+ self.fail("expected the 'fs subvolume snapshot {0}' command to fail".format(op))
+
+ # try, clone status
+ try:
+ self._fs_cmd("clone", "status", volname, "clone_1")
+ except CommandFailedError as ce:
+ self.assertEqual(ce.exitstatus, errno.ENOENT)
+ else:
+ self.fail("expected the 'fs clone status' command to fail")
+
+ # try subvolumegroup operations
+ for op in ("create", "rm", "getpath", "pin", "ls"):
+ try:
+ if op == "pin":
+ self._fs_cmd("subvolumegroup", "pin", volname, "group_1", "export", "0")
+ elif op == "ls":
+ self._fs_cmd("subvolumegroup", op, volname)
+ else:
+ self._fs_cmd("subvolumegroup", op, volname, "group_1")
+ except CommandFailedError as ce:
+ self.assertEqual(ce.exitstatus, errno.ENOENT)
+ else:
+ self.fail("expected the 'fs subvolumegroup {0}' command to fail".format(op))
+
+ # try subvolumegroup snapshot operations
+ for op in ("create", "rm", "ls"):
+ try:
+ if op == "ls":
+ self._fs_cmd("subvolumegroup", "snapshot", op, volname, "group_1")
+ else:
+ self._fs_cmd("subvolumegroup", "snapshot", op, volname, "group_1", "snapshot_1")
+ except CommandFailedError as ce:
+ self.assertEqual(ce.exitstatus, errno.ENOENT)
+ else:
+ self.fail("expected the 'fs subvolumegroup snapshot {0}' command to fail".format(op))
from ..exception import VolumeException
from ..fs_util import create_pool, remove_pool, create_filesystem, \
remove_filesystem, create_mds, volume_exists
-from mgr_util import open_filesystem
+from mgr_util import open_filesystem, CephfsConnectionException
log = logging.getLogger(__name__)
"""
g_lock = GlobalLock()
with g_lock.lock_op():
- with open_filesystem(vc, volname) as fs_handle:
- yield fs_handle
+ try:
+ with open_filesystem(vc, volname) as fs_handle:
+ yield fs_handle
+ except CephfsConnectionException as ce:
+ raise VolumeException(ce.errno, ce.error_str)
@contextmanager
:param volname: volume name
:return: yields a volume handle (ceph filesystem handle)
"""
- with open_filesystem(vc, volname) as fs_handle:
- yield fs_handle
+ try:
+ with open_filesystem(vc, volname) as fs_handle:
+ yield fs_handle
+ except CephfsConnectionException as ce:
+ raise VolumeException(ce.errno, ce.error_str)