From: Rishabh Dave Date: Thu, 2 Apr 2026 12:09:13 +0000 (+0530) Subject: test_cephfs.py: add tests for mkdirat() X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=6b1bbae7b71583c79c402cbca13e278b83f0250b;p=ceph.git test_cephfs.py: add tests for mkdirat() Signed-off-by: Rishabh Dave --- diff --git a/src/test/pybind/test_cephfs.py b/src/test/pybind/test_cephfs.py index c95d334b3cf2..df9311ef9e64 100644 --- a/src/test/pybind/test_cephfs.py +++ b/src/test/pybind/test_cephfs.py @@ -988,6 +988,76 @@ class TestOpenat: cephfs.close(fd2) +class TestMkdirat: + ''' + Test mkdirat() of LibCephFS Python binding. + ''' + + def test_create_child_dir(self, testdir): + ''' + Test that mkdirat() creates a child directory successfully. + ''' + dir1_name = 'dir1' + dir2_name = 'dir2' + dir2_path = f'{dir1_name}/{dir2_name}' + + cephfs.mkdir(dir1_name, 0o755) + fd = cephfs.open(dir1_name, os.O_RDONLY | os.O_DIRECTORY, 0o755) + + cephfs.mkdirat(fd, dir2_name, 0o755) + + cephfs.close(fd) + + def test_create_grandchild_dir(self, testdir): + ''' + Test that mkdirat() creates a grandchild directory successfully. + ''' + dir1_name = 'dir1' + dir2_name = 'dir2' + dir2_path = f'dir1/{dir2_name}' + dir3_name = 'dir3' + dir3_path = f'{dir1_name}/{dir2_name}/{dir3_name}' + + cephfs.mkdir(dir1_name, 0o755) + cephfs.mkdir(dir2_path, 0o755) + fd = cephfs.open(dir1_name, os.O_RDONLY | os.O_DIRECTORY, 0o755) + + cephfs.mkdirat(fd, f'{dir2_name}/{dir3_name}', 0o755) + + cephfs.close(fd) + + def test_when_fd_is_root(self, testdir): + ''' + Test that mkdirat() fails when the directory is already present. + ''' + dir1_name = 'dir1' + dir2_name = 'dir2' + dir2_path = f'{dir1_name}/{dir2_name}' + + cephfs.mkdir(dir1_name, 0o755) + + fd = cephfs.open('/', os.O_RDONLY | os.O_DIRECTORY, 0o755) + cephfs.mkdirat(fd, dir2_path, 0o755) + + cephfs.close(fd) + + def test_when_file_exists(self, testdir): + ''' + Test that mkdirat() fails when the directory is already present. + ''' + dir1_name = 'dir1' + dir2_name = 'dir2' + dir2_path = f'{dir1_name}/{dir2_name}' + + cephfs.mkdir(dir1_name, 0o755) + cephfs.mkdir(dir2_path, 0o755) + + fd = cephfs.open(dir1_name, os.O_RDONLY | os.O_DIRECTORY, 0o755) + assert_raises(libcephfs.ObjectExists, cephfs.mkdirat, fd, dir2_name, 0o755) + + cephfs.close(fd) + + class TestWithRootUser: def setup_method(self):