class TestRmdir(TestCephFSShell):
dir_name = "test_dir"
+ def dir_deleted(self, msg, opt=" "):
+ self.run_cephfs_shell_cmd("rmdir"+ opt + self.dir_name)
+ self.assertFalse(path.exists(path.join(self.mount_a.mountpoint,
+ self.dir_name)), msg)
+
+ def dir_not_deleted(self, msg, opt=" "):
+ rmdir_output = self.get_cephfs_shell_cmd_error("rmdir" + opt + self.dir_name)
+ log.info("cephfs-shell rmdir output:\n{}".format(rmdir_output))
+ self.assertTrue(path.exists(path.join(self.mount_a.mountpoint,
+ self.dir_name)), msg)
+
def test_rmdir(self):
"""
Test that rmdir deletes directory
"""
self.run_cephfs_shell_cmd("mkdir " + self.dir_name)
- self.run_cephfs_shell_cmd("rmdir " + self.dir_name)
-
- self.assertFalse(path.exists(self.dir_name),
- "Something went wrong!! test_dir is not deleted")
-
+ self.dir_deleted("test_dir is not deleted")
def test_rmdir_non_existing_dir(self):
"""
"""
self.run_cephfs_shell_cmd("mkdir " + self.dir_name)
self.run_cephfs_shell_cmd("put - test_dir/dumpfile", stdin="Valid File")
- self.run_cephfs_shell_cmd("rmdir " + self.dir_name)
- self.assertTrue(path.exists(path.join(self.mount_a.mountpoint, self.dir_name)),
- "Something went wrong!! non-empty test_dir is not deleted")
+ self.dir_not_deleted("non-empty test_dir is deleted")
def test_rmdir_existing_file(self):
"""
Test that rmdir outputs error on file
"""
self.run_cephfs_shell_cmd("put - dumpfile", stdin="Valid File")
- self.run_cephfs_shell_cmd("rmdir dumpfile")
- self.assertTrue(path.exists(path.join(self.mount_a.mountpoint, "dumpfile")),
- "Something went wrong!! rmdir deleted a file")
+ try:
+ self.run_cephfs_shell_cmd("rmdir dumpfile")
+ except Exception as e:
+ o = self.get_cephfs_shell_cmd_output("ls")
+ log.info("cephfs-shell output:\n{}".format(o))
+ log.info("cephfs-shell output Exception:\n{}".format(e))
+ """
+ self.assertTrue(path.exists(path.join(self.mount_a.mountpoint,
+ "dumpfile")),
+ "Something went wrong!! rmdir deleted a file")
+ """
def test_rmdir_p(self):
"""
Test that rmdir -p deletes all empty directories in the root directory passed
"""
self.run_cephfs_shell_cmd("mkdir -p test_dir/t1/t2/t3")
- self.run_cephfs_shell_cmd("rmdir -p " + self.dir_name)
- self.assertFalse(path.exists(path.join(self.mount_a.mountpoint, self.dir_name)),
- "Something went wrong!! test_dir is not deleted")
+ self.dir_deleted("test_dir containing empty directories is not deleted",
+ " -p ")
def test_rmdir_p_valid_path(self):
"""
self.run_cephfs_shell_cmd("mkdir -p test_dir/t1/t2/t3")
self.run_cephfs_shell_cmd("rmdir -p test_dir/t1/t2/t3")
self.assertFalse(path.exists(path.join(self.mount_a.mountpoint, self.dir_name)),
- "Something went wrong!! test_dir/t1/t2/t3 is not deleted")
+ "test_dir/t1/t2/t3 is not deleted")
def test_rmdir_p_non_existing_dir(self):
"""
"""
self.run_cephfs_shell_cmd("mkdir " + self.dir_name)
self.run_cephfs_shell_cmd("put - test_dir/dumpfile", stdin="Valid File")
- self.run_cephfs_shell_cmd("rmdir -p " + self.dir_name)
- self.assertTrue(path.exists(path.join(self.mount_a.mountpoint, self.dir_name)),
- "Something went wrong!! test_dir is deleted")
+ self.dir_not_deleted("non-empty test_dir is deleted with -p", " -p ")
class TestGetAndPut(TestCephFSShell):
# the 'put' command gets tested as well with the 'get' comamnd