cephfs.close(fd)
+ def test_chownat_on_regfile(self, testdir):
+ cephfs.mkdir(b'dir1', 0o755)
+
+ fd = cephfs.open(b'dir1/file1', 'w', 0o755)
+ cephfs.write(fd, b'abcd', 0)
+ cephfs.close(fd)
+
+ st1 = cephfs.stat(b'dir1/file1')
+ old_uid = st1.st_uid
+ old_gid = st1.st_gid
+ new_uid = st1.st_uid + 1
+ new_gid = st1.st_gid + 1
+
+ dirfd = cephfs.open('dir1', os.O_RDONLY | os.O_DIRECTORY, 0o755)
+ cephfs.chownat(dirfd, 'file1', new_uid, new_gid, 0)
+
+ st2 = cephfs.stat(b'dir1/file1')
+ assert_equal(st2.st_uid, new_uid)
+ assert_equal(st2.st_gid, new_gid)
+
+ # reset to old uid & gid for sake of teardown.
+ cephfs.chownat(dirfd, b'file1', old_uid, old_gid, 0)
+ cephfs.close(dirfd)
+
+ def test_chownat_on_dir(self, testdir):
+ cephfs.mkdir(b'dir1', 0o755)
+ cephfs.mkdir(b'dir1/dir2', 0o755)
+
+ st1 = cephfs.stat(b'dir1/dir2')
+ old_uid = st1.st_uid
+ old_gid = st1.st_gid
+ new_uid = st1.st_uid + 1
+ new_gid = st1.st_gid + 1
+
+ dirfd = cephfs.open(b'dir1', os.O_RDONLY | os.O_DIRECTORY, 0o755)
+ cephfs.chownat(dirfd, b'dir2', new_uid, new_gid, 0)
+
+ st2 = cephfs.stat(b'dir1/dir2')
+ assert_equal(st2.st_uid, new_uid)
+ assert_equal(st2.st_gid, new_gid)
+
+ # reset to old uid & gid for sake of teardown.
+ cephfs.chownat(dirfd, b'dir2', old_uid, old_gid, 0)
+ cephfs.close(dirfd)
+
+ def test_chownat_on_symlink(self, testdir):
+ cephfs.mkdir(b'dir1', 0o755)
+
+ fd = cephfs.open(b'dir1/file1', 'w', 0o755)
+ cephfs.write(fd, b'abcd', 0)
+ cephfs.close(fd)
+
+ cephfs.symlink('file1', b'dir1/slink1')
+
+ st1 = cephfs.stat(b'dir1/slink1', follow_symlink=False)
+ old_uid = st1.st_uid
+ old_gid = st1.st_gid
+ new_uid = st1.st_uid + 1
+ new_gid = st1.st_gid + 1
+
+ dirfd = cephfs.open(b'/dir1', os.O_RDONLY | os.O_DIRECTORY, 0o755)
+ cephfs.chownat(dirfd, b'slink1', new_uid, new_gid, libcephfs.AT_SYMLINK_NOFOLLOW)
+
+ st2 = cephfs.stat(b'dir1/slink1', follow_symlink=False)
+ assert_equal(st2.st_uid, new_uid)
+ assert_equal(st2.st_gid, new_gid)
+
+ # reset to old uid & gid for sake of teardown.
+ cephfs.chownat(dirfd, b'slink1', old_uid, old_gid, 0)
+ cephfs.close(dirfd)
+
def test_setattrx(self, testdir):
fd = cephfs.open(b'file-setattrx', 'w', 0o655)
cephfs.write(fd, b"1111", 0)