cephfs.close(fd)
+class TestChmodat:
+
+ def test_chmodat_on_file(self, testdir):
+ cephfs.mkdir('dir1', 0o755)
+ fd = cephfs.open('dir1/file1', 'w')
+ cephfs.write(fd, b'abcd', 0)
+ cephfs.close(fd)
+
+ fd = cephfs.open('dir1', os.O_RDONLY | os.O_DIRECTORY, 0o755)
+ cephfs.chmodat(fd, 'file1', 0o644, 0)
+
+ statx_buf = cephfs.statx(b'dir1/file1', libcephfs.CEPH_STATX_MODE, 0)
+ mode = statx_buf['mode'] & ~stat.S_IFMT(statx_buf['mode'])
+ assert_equal(0o644, mode)
+ cephfs.close(fd)
+
+ def test_chmodat_on_dir(self, testdir):
+ cephfs.mkdir('dir1', 0o755)
+ cephfs.mkdir('dir1/dir2', 0o755)
+
+ fd = cephfs.open('dir1', os.O_RDONLY | os.O_DIRECTORY, 0o755)
+ cephfs.chmodat(fd, 'dir2', 0o644, 0)
+
+ statx_buf = cephfs.statx(b'dir1/dir2', libcephfs.CEPH_STATX_MODE, 0)
+ mode = statx_buf['mode'] & ~stat.S_IFMT(statx_buf['mode'])
+ assert_equal(0o644, mode)
+ cephfs.close(fd)
+
+ def test_chmodat_on_link(self, testdir):
+ cephfs.mkdir('dir1', 0o755)
+ fd = cephfs.open('dir1/file1', 'w', 0o644)
+ cephfs.write(fd, b'abcd', 0)
+ cephfs.close(fd)
+
+ cephfs.chdir('dir1')
+ cephfs.symlink('file1', 'slink1')
+ cephfs.chdir('..')
+
+ fd = cephfs.open('dir1', os.O_RDONLY | os.O_DIRECTORY, 0o755)
+ cephfs.chmodat(fd, 'slink1', 0o755, libcephfs.AT_SYMLINK_NOFOLLOW)
+
+ statx_buf = cephfs.statx(b'dir1/slink1', libcephfs.CEPH_STATX_MODE, libcephfs.AT_SYMLINK_NOFOLLOW)
+ mode = statx_buf['mode'] & ~stat.S_IFMT(statx_buf['mode'])
+ assert_equal(0o755, mode)
+ cephfs.close(fd)
+
+ def test_chmodat_on_link_follow_slink(self, testdir):
+ '''
+ Test chmodat() for a symlink without passing NOFOLLOW flag.
+ '''
+ cephfs.mkdir('dir1', 0o755)
+ fd = cephfs.open('dir1/file1', 'w', 0o644)
+ cephfs.write(fd, b'abcd', 0)
+ cephfs.close(fd)
+
+ cephfs.chdir('dir1')
+ cephfs.symlink('file1', 'slink1')
+ cephfs.chdir('..')
+
+ fd = cephfs.open('dir1', os.O_RDONLY | os.O_DIRECTORY, 0o755)
+ cephfs.chmodat(fd, 'slink1', 0o755, 0)
+
+ statx_buf = cephfs.statx(b'dir1/file1', libcephfs.CEPH_STATX_MODE, libcephfs.AT_SYMLINK_NOFOLLOW)
+ mode = statx_buf['mode'] & ~stat.S_IFMT(statx_buf['mode'])
+ assert_equal(0o755, mode)
+ cephfs.close(fd)
+
+
class TestWithRootUser:
def setup_method(self):