assert(list(mds_sessions.keys())[0].startswith('mds.'))
+class TestUnlinkat:
+
+ def test_unlinkat_regfile_fd_of_root(self, testdir):
+ regfilename = 'file1'
+
+ fd = cephfs.open(regfilename, 'w', 0o755)
+ cephfs.write(fd, b"abcd", 0)
+ cephfs.close(fd)
+
+ fd = cephfs.open('/', os.O_RDONLY | os.O_DIRECTORY, 0o755)
+ cephfs.unlinkat(fd, './' + regfilename, 0)
+ cephfs.close(fd)
+
+ def test_unlinkat_dir_fd_of_root(self, testdir):
+ dirname = 'dir1'
+
+ cephfs.mkdir(dirname, 0o755)
+ fd = cephfs.open('/', os.O_RDONLY | os.O_DIRECTORY, 0o755)
+ # AT_REMOVEDIR = 0x200
+ cephfs.unlinkat(fd, dirname, 0x200)
+ cephfs.close(fd)
+
+ def test_unlinkat_regfile_fd_of_nonroot(self, testdir):
+ dir1_name = 'dir1'
+ regfile_name = 'file1'
+ regfile_path = 'dir1/file1'
+
+ cephfs.mkdir(dir1_name, 0o755)
+ fd = cephfs.open(regfile_path, 'w', 0o755)
+ cephfs.write(fd, b"abcd", 0)
+ cephfs.close(fd)
+
+ fd = cephfs.open(dir1_name, os.O_RDONLY | os.O_DIRECTORY, 0o755)
+ cephfs.unlinkat(fd, regfile_name, 0)
+ cephfs.close(fd)
+
+ def test_unlinkat_dir_fd_of_nonroot(self, testdir):
+ dir1_name = 'dir1'
+ dir2_name = 'dir2'
+ dir2_path = 'dir1/dir2'
+
+ cephfs.mkdir(dir1_name, 0o755)
+ cephfs.mkdir(dir2_path, 0o755)
+ fd = cephfs.open('dir1', os.O_RDONLY | os.O_DIRECTORY, 0o755)
+ cephfs.unlinkat(fd, dir2_name, libcephfs.AT_REMOVEDIR)
+ cephfs.close(fd)
+
+ def test_unlinkat_regfile_AT_FDCWD(self, testdir):
+ regfilename = 'file1'
+
+ fd = cephfs.open(regfilename, 'w', 0o755)
+ cephfs.write(fd, b"abcd", 0)
+ cephfs.close(fd)
+ cephfs.unlinkat(libcephfs.AT_FDCWD, regfilename, 0)
+
+ def test_unlinkat_dir_AT_FDCWD(self, testdir):
+ dirname = 'dir1'
+
+ cephfs.mkdir(dirname, 0o755)
+ cephfs.unlinkat(libcephfs.AT_FDCWD, dirname, libcephfs.AT_REMOVEDIR)
+
+ def test_unlinkat_for_opened_dir(self, testdir):
+ dirname = 'dir1'
+
+ cephfs.mkdir(dirname, 0o755)
+ fd = cephfs.open('/', os.O_RDONLY | os.O_DIRECTORY, 0o755)
+ fh = cephfs.opendir(dirname)
+ cephfs.unlinkat(fd, dirname, libcephfs.AT_REMOVEDIR)
+ cephfs.close(fd)
+
+
class TestWithRootUser:
def setup_method(self):