import uuid
import json
from datetime import datetime
+from subprocess import getoutput as get_cmd_output
+
cephfs = None
+
def setup_module():
global cephfs
cephfs = libcephfs.LibCephFS(conffile='')
+
+ username = get_cmd_output('id -un')
+ uid = get_cmd_output(f'id -u {username}')
+ gid = get_cmd_output(f'id -g {username}')
+ cephfs.conf_set('client_mount_uid', uid)
+ cephfs.conf_set('client_mount_gid', gid)
cephfs.mount()
+ cephfs.conf_set('client_permissions' , 'false')
+ cephfs.chown('/', int(uid), int(gid))
+ cephfs.conf_set('client_permissions' , 'true')
+
def teardown_module():
global cephfs
cephfs.shutdown()
cephfs.close(fd)
cephfs.unlink(b'/file-fchmod')
-def test_chown(testdir):
- fd = cephfs.open('file1', 'w', 0o755)
- cephfs.write(fd, b'abcd', 0)
- cephfs.close(fd)
-
- uid = 1012
- gid = 1012
- cephfs.chown('file1', uid, gid)
- st = cephfs.stat(b'/file1')
- assert_equal(st.st_uid, uid)
- assert_equal(st.st_gid, gid)
-
-def test_chown_change_uid_but_not_gid(testdir):
- fd = cephfs.open('file1', 'w', 0o755)
- cephfs.write(fd, b'abcd', 0)
- cephfs.close(fd)
-
- st1 = cephfs.stat(b'/file1')
-
- uid = 1012
- cephfs.chown('file1', uid, -1)
- st2 = cephfs.stat(b'/file1')
- assert_equal(st2.st_uid, uid)
-
- # ensure that gid is unchaged.
- assert_equal(st1.st_gid, st2.st_gid)
-
-def test_chown_change_gid_but_not_uid(testdir):
- fd = cephfs.open('file1', 'w', 0o755)
- cephfs.write(fd, b'abcd', 0)
- cephfs.close(fd)
-
- st1 = cephfs.stat(b'/file1')
-
- gid = 1012
- cephfs.chown('file1', -1, gid)
- st2 = cephfs.stat(b'/file1')
- assert_equal(st2.st_gid, gid)
-
- # ensure that uid is unchaged.
- assert_equal(st1.st_uid, st2.st_uid)
-
-def test_lchown(testdir):
- fd = cephfs.open('file1', 'w', 0o755)
- cephfs.write(fd, b'abcd', 0)
- cephfs.close(fd)
- cephfs.symlink('file1', 'slink1')
-
- uid = 1012
- gid = 1012
- cephfs.lchown('slink1', uid, gid)
- st = cephfs.lstat(b'/slink1')
- assert_equal(st.st_uid, uid)
- assert_equal(st.st_gid, gid)
-
-def test_lchown_change_uid_but_not_gid(testdir):
- fd = cephfs.open('file1', 'w', 0o755)
- cephfs.write(fd, b'abcd', 0)
- cephfs.close(fd)
- cephfs.symlink('file1', 'slink1')
-
- st1 = cephfs.lstat(b'slink1')
-
- uid = 1012
- cephfs.lchown('slink1', uid, -1)
- st2 = cephfs.lstat(b'/slink1')
- assert_equal(st2.st_uid, uid)
-
- # ensure that gid is unchaged.
- assert_equal(st1.st_gid, st2.st_gid)
-
-def test_lchown_change_gid_but_not_uid(testdir):
- fd = cephfs.open('file1', 'w', 0o755)
- cephfs.write(fd, b'abcd', 0)
- cephfs.close(fd)
- cephfs.symlink('file1', 'slink1')
-
- st1 = cephfs.lstat(b'slink1')
-
- gid = 1012
- cephfs.lchown('slink1', -1, gid)
- st2 = cephfs.lstat(b'/slink1')
- assert_equal(st2.st_gid, gid)
-
- # ensure that uid is unchaged.
- assert_equal(st1.st_uid, st2.st_uid)
-
-def test_fchown(testdir):
- fd = cephfs.open(b'/file-fchown', 'w', 0o655)
- uid = os.getuid()
- gid = os.getgid()
- assert_raises(TypeError, cephfs.fchown, b'/file-fchown', uid, gid)
- assert_raises(TypeError, cephfs.fchown, fd, "uid", "gid")
- cephfs.fchown(fd, uid, gid)
- st = cephfs.statx(b'/file-fchown', libcephfs.CEPH_STATX_UID | libcephfs.CEPH_STATX_GID, 0)
- assert_equal(st["uid"], uid)
- assert_equal(st["gid"], gid)
- cephfs.fchown(fd, 9999, 9999)
- st = cephfs.statx(b'/file-fchown', libcephfs.CEPH_STATX_UID | libcephfs.CEPH_STATX_GID, 0)
- assert_equal(st["uid"], 9999)
- assert_equal(st["gid"], 9999)
- cephfs.close(fd)
- cephfs.unlink(b'/file-fchown')
-
-def test_fchown_change_uid_but_not_gid(testdir):
- fd = cephfs.open('file1', 'w', 0o755)
- cephfs.write(fd, b'abcd', 0)
-
- st1 = cephfs.stat(b'/file1')
-
- uid = 1012
- cephfs.fchown(fd, uid, -1)
- st2 = cephfs.stat(b'/file1')
- assert_equal(st2.st_uid, uid)
-
- # ensure that uid is unchanged.
- assert_equal(st1.st_gid, st2.st_gid)
-
- cephfs.close(fd)
-
-def test_fchown_change_gid_but_not_uid(testdir):
- fd = cephfs.open('file1', 'w', 0o755)
- cephfs.write(fd, b'abcd', 0)
-
- st1 = cephfs.stat(b'/file1')
-
- gid = 1012
- cephfs.chown('file1', -1, gid)
- st2 = cephfs.stat(b'/file1')
- assert_equal(st2.st_gid, gid)
-
- # ensure that gid is unchanged.
- assert_equal(st1.st_uid, st2.st_uid)
-
- cephfs.close(fd)
-
def test_truncate(testdir):
fd = cephfs.open(b'/file-truncate', 'w', 0o755)
cephfs.write(fd, b"1111", 0)
cephfs.close(fd)
cephfs.unlink(b'file-1')
-def test_setattrx(testdir):
- fd = cephfs.open(b'file-setattrx', 'w', 0o655)
- cephfs.write(fd, b"1111", 0)
- cephfs.close(fd)
- st = cephfs.statx(b'file-setattrx', libcephfs.CEPH_STATX_MODE, 0)
- mode = st["mode"] | stat.S_IXUSR
- assert_raises(TypeError, cephfs.setattrx, b'file-setattrx', "dict", 0, 0)
-
- time.sleep(1)
- statx_dict = dict()
- statx_dict["mode"] = mode
- statx_dict["uid"] = 9999
- statx_dict["gid"] = 9999
- dt = datetime.now()
- statx_dict["mtime"] = dt
- statx_dict["atime"] = dt
- statx_dict["ctime"] = dt
- statx_dict["size"] = 10
- statx_dict["btime"] = dt
- cephfs.setattrx(b'file-setattrx', statx_dict, libcephfs.CEPH_SETATTR_MODE | libcephfs.CEPH_SETATTR_UID |
- libcephfs.CEPH_SETATTR_GID | libcephfs.CEPH_SETATTR_MTIME |
- libcephfs.CEPH_SETATTR_ATIME | libcephfs.CEPH_SETATTR_CTIME |
- libcephfs.CEPH_SETATTR_SIZE | libcephfs.CEPH_SETATTR_BTIME, 0)
- st1 = cephfs.statx(b'file-setattrx', libcephfs.CEPH_STATX_MODE | libcephfs.CEPH_STATX_UID |
- libcephfs.CEPH_STATX_GID | libcephfs.CEPH_STATX_MTIME |
- libcephfs.CEPH_STATX_ATIME | libcephfs.CEPH_STATX_CTIME |
- libcephfs.CEPH_STATX_SIZE | libcephfs.CEPH_STATX_BTIME, 0)
- assert_equal(mode, st1["mode"])
- assert_equal(9999, st1["uid"])
- assert_equal(9999, st1["gid"])
- assert_equal(int(dt.timestamp()), int(st1["mtime"].timestamp()))
- assert_equal(int(dt.timestamp()), int(st1["atime"].timestamp()))
- assert_equal(int(dt.timestamp()), int(st1["ctime"].timestamp()))
- assert_equal(int(dt.timestamp()), int(st1["btime"].timestamp()))
- assert_equal(10, st1["size"])
- cephfs.unlink(b'file-setattrx')
-
-def test_fsetattrx(testdir):
- fd = cephfs.open(b'file-fsetattrx', 'w', 0o655)
- cephfs.write(fd, b"1111", 0)
- st = cephfs.statx(b'file-fsetattrx', libcephfs.CEPH_STATX_MODE, 0)
- mode = st["mode"] | stat.S_IXUSR
- assert_raises(TypeError, cephfs.fsetattrx, fd, "dict", 0, 0)
-
- time.sleep(1)
- statx_dict = dict()
- statx_dict["mode"] = mode
- statx_dict["uid"] = 9999
- statx_dict["gid"] = 9999
- dt = datetime.now()
- statx_dict["mtime"] = dt
- statx_dict["atime"] = dt
- statx_dict["ctime"] = dt
- statx_dict["size"] = 10
- statx_dict["btime"] = dt
- cephfs.fsetattrx(fd, statx_dict, libcephfs.CEPH_SETATTR_MODE | libcephfs.CEPH_SETATTR_UID |
- libcephfs.CEPH_SETATTR_GID | libcephfs.CEPH_SETATTR_MTIME |
- libcephfs.CEPH_SETATTR_ATIME | libcephfs.CEPH_SETATTR_CTIME |
- libcephfs.CEPH_SETATTR_SIZE | libcephfs.CEPH_SETATTR_BTIME)
- st1 = cephfs.statx(b'file-fsetattrx', libcephfs.CEPH_STATX_MODE | libcephfs.CEPH_STATX_UID |
- libcephfs.CEPH_STATX_GID | libcephfs.CEPH_STATX_MTIME |
- libcephfs.CEPH_STATX_ATIME | libcephfs.CEPH_STATX_CTIME |
- libcephfs.CEPH_STATX_SIZE | libcephfs.CEPH_STATX_BTIME, 0)
- assert_equal(mode, st1["mode"])
- assert_equal(9999, st1["uid"])
- assert_equal(9999, st1["gid"])
- assert_equal(int(dt.timestamp()), int(st1["mtime"].timestamp()))
- assert_equal(int(dt.timestamp()), int(st1["atime"].timestamp()))
- assert_equal(int(dt.timestamp()), int(st1["ctime"].timestamp()))
- assert_equal(int(dt.timestamp()), int(st1["btime"].timestamp()))
- assert_equal(10, st1["size"])
- cephfs.close(fd)
- cephfs.unlink(b'file-fsetattrx')
-
def test_get_layout(testdir):
fd = cephfs.open(b'file-get-layout', 'w', 0o755)
cephfs.write(fd, b"1111", 0)
assert(list(mds_sessions.keys())[0].startswith('mds.'))
+class TestWithRootUser:
+
+ def setup_method(self):
+ cephfs.unmount()
+ cephfs.conf_set('client_mount_uid', '0')
+ cephfs.conf_set('client_mount_gid', '0')
+ cephfs.mount()
+
+ cephfs.conf_set('client_permissions' , 'false')
+ cephfs.chown('/', 0, 0)
+ cephfs.conf_set('client_permissions' , 'true')
+
+ def teardown_method(self):
+ cephfs.unmount()
+
+ username = get_cmd_output('id -un')
+ uid = get_cmd_output(f'id -u {username}')
+ gid = get_cmd_output(f'id -g {username}')
+
+ cephfs.conf_set('client_mount_uid', uid)
+ cephfs.conf_set('client_mount_gid', gid)
+ cephfs.mount()
+
+ cephfs.conf_set('client_permissions' , 'false')
+ cephfs.chown('/', int(uid), int(gid))
+ cephfs.conf_set('client_permissions' , 'true')
+
+ def test_chown(testdir):
+ fd = cephfs.open('file1', 'w', 0o755)
+ cephfs.write(fd, b'abcd', 0)
+ cephfs.close(fd)
+
+ uid = 1012
+ gid = 1012
+ cephfs.chown('file1', uid, gid)
+ st = cephfs.stat(b'/file1')
+ assert_equal(st.st_uid, uid)
+ assert_equal(st.st_gid, gid)
+
+ def test_chown_change_uid_but_not_gid(testdir):
+ fd = cephfs.open('file1', 'w', 0o755)
+ cephfs.write(fd, b'abcd', 0)
+ cephfs.close(fd)
+
+ st1 = cephfs.stat(b'/file1')
+
+ uid = 1012
+ cephfs.chown('file1', uid, -1)
+ st2 = cephfs.stat(b'/file1')
+ assert_equal(st2.st_uid, uid)
+
+ # ensure that gid is unchaged.
+ assert_equal(st1.st_gid, st2.st_gid)
+
+ def test_chown_change_gid_but_not_uid(testdir):
+ fd = cephfs.open('file1', 'w', 0o755)
+ cephfs.write(fd, b'abcd', 0)
+ cephfs.close(fd)
+
+ st1 = cephfs.stat(b'/file1')
+
+ gid = 1012
+ cephfs.chown('file1', -1, gid)
+ st2 = cephfs.stat(b'/file1')
+ assert_equal(st2.st_gid, gid)
+
+ # ensure that uid is unchaged.
+ assert_equal(st1.st_uid, st2.st_uid)
+
+ def test_lchown(testdir):
+ fd = cephfs.open('file1', 'w', 0o755)
+ cephfs.write(fd, b'abcd', 0)
+ cephfs.close(fd)
+ cephfs.symlink('file1', 'slink1')
+
+ uid = 1012
+ gid = 1012
+ cephfs.lchown('slink1', uid, gid)
+ st = cephfs.lstat(b'/slink1')
+ assert_equal(st.st_uid, uid)
+ assert_equal(st.st_gid, gid)
+
+ def test_lchown_change_uid_but_not_gid(testdir):
+ fd = cephfs.open('file2', 'w', 0o755)
+ cephfs.write(fd, b'abcd', 0)
+ cephfs.close(fd)
+ cephfs.symlink('file2', 'slink2')
+
+ st1 = cephfs.lstat(b'slink2')
+
+ uid = 1012
+ cephfs.lchown('slink2', uid, -1)
+ st2 = cephfs.lstat(b'/slink2')
+ assert_equal(st2.st_uid, uid)
+
+ # ensure that gid is unchaged.
+ assert_equal(st1.st_gid, st2.st_gid)
+
+ def test_lchown_change_gid_but_not_uid(testdir):
+ fd = cephfs.open('file3', 'w', 0o755)
+ cephfs.write(fd, b'abcd', 0)
+ cephfs.close(fd)
+ cephfs.symlink('file3', 'slink3')
+
+ st1 = cephfs.lstat(b'slink3')
+
+ gid = 1012
+ cephfs.lchown('slink3', -1, gid)
+ st2 = cephfs.lstat(b'/slink3')
+ assert_equal(st2.st_gid, gid)
+
+ # ensure that uid is unchaged.
+ assert_equal(st1.st_uid, st2.st_uid)
+
+ def test_fchown(testdir):
+ fd = cephfs.open(b'/file-fchown', 'w', 0o655)
+ uid = os.getuid()
+ gid = os.getgid()
+ assert_raises(TypeError, cephfs.fchown, b'/file-fchown', uid, gid)
+ assert_raises(TypeError, cephfs.fchown, fd, "uid", "gid")
+ cephfs.fchown(fd, uid, gid)
+ st = cephfs.statx(b'/file-fchown', libcephfs.CEPH_STATX_UID | libcephfs.CEPH_STATX_GID, 0)
+ assert_equal(st["uid"], uid)
+ assert_equal(st["gid"], gid)
+ cephfs.fchown(fd, 9999, 9999)
+ st = cephfs.statx(b'/file-fchown', libcephfs.CEPH_STATX_UID | libcephfs.CEPH_STATX_GID, 0)
+ assert_equal(st["uid"], 9999)
+ assert_equal(st["gid"], 9999)
+ cephfs.close(fd)
+ cephfs.unlink(b'/file-fchown')
+
+ def test_fchown_change_uid_but_not_gid(testdir):
+ fd = cephfs.open('file1', 'w', 0o755)
+ cephfs.write(fd, b'abcd', 0)
+
+ st1 = cephfs.stat(b'/file1')
+
+ uid = 1012
+ cephfs.fchown(fd, uid, -1)
+ st2 = cephfs.stat(b'/file1')
+ assert_equal(st2.st_uid, uid)
+
+ # ensure that uid is unchanged.
+ assert_equal(st1.st_gid, st2.st_gid)
+
+ cephfs.close(fd)
+
+ def test_fchown_change_gid_but_not_uid(testdir):
+ fd = cephfs.open('file1', 'w', 0o755)
+ cephfs.write(fd, b'abcd', 0)
+
+ st1 = cephfs.stat(b'/file1')
+
+ gid = 1012
+ cephfs.chown('file1', -1, gid)
+ st2 = cephfs.stat(b'/file1')
+ assert_equal(st2.st_gid, gid)
+
+ # ensure that gid is unchanged.
+ assert_equal(st1.st_uid, st2.st_uid)
+
+ cephfs.close(fd)
+
+ def test_setattrx(testdir):
+ fd = cephfs.open(b'file-setattrx', 'w', 0o655)
+ cephfs.write(fd, b"1111", 0)
+ cephfs.close(fd)
+ st = cephfs.statx(b'file-setattrx', libcephfs.CEPH_STATX_MODE, 0)
+ mode = st["mode"] | stat.S_IXUSR
+ assert_raises(TypeError, cephfs.setattrx, b'file-setattrx', "dict", 0, 0)
+
+ time.sleep(1)
+ statx_dict = dict()
+ statx_dict["mode"] = mode
+ statx_dict["uid"] = 9999
+ statx_dict["gid"] = 9999
+ dt = datetime.now()
+ statx_dict["mtime"] = dt
+ statx_dict["atime"] = dt
+ statx_dict["ctime"] = dt
+ statx_dict["size"] = 10
+ statx_dict["btime"] = dt
+ cephfs.setattrx(b'file-setattrx', statx_dict, libcephfs.CEPH_SETATTR_MODE |
+ libcephfs.CEPH_SETATTR_UID |
+ libcephfs.CEPH_SETATTR_GID |
+ libcephfs.CEPH_SETATTR_MTIME |
+ libcephfs.CEPH_SETATTR_ATIME |
+ libcephfs.CEPH_SETATTR_CTIME |
+ libcephfs.CEPH_SETATTR_SIZE |
+ libcephfs.CEPH_SETATTR_BTIME, 0)
+ st1 = cephfs.statx(b'file-setattrx', libcephfs.CEPH_STATX_MODE |
+ libcephfs.CEPH_STATX_UID |
+ libcephfs.CEPH_STATX_GID |
+ libcephfs.CEPH_STATX_MTIME |
+ libcephfs.CEPH_STATX_ATIME |
+ libcephfs.CEPH_STATX_CTIME |
+ libcephfs.CEPH_STATX_SIZE |
+ libcephfs.CEPH_STATX_BTIME, 0)
+ assert_equal(mode, st1["mode"])
+ assert_equal(9999, st1["uid"])
+ assert_equal(9999, st1["gid"])
+ assert_equal(int(dt.timestamp()), int(st1["mtime"].timestamp()))
+ assert_equal(int(dt.timestamp()), int(st1["atime"].timestamp()))
+ assert_equal(int(dt.timestamp()), int(st1["ctime"].timestamp()))
+ assert_equal(int(dt.timestamp()), int(st1["btime"].timestamp()))
+ assert_equal(10, st1["size"])
+ cephfs.unlink(b'file-setattrx')
+
+ def test_fsetattrx(testdir):
+ fd = cephfs.open(b'file-fsetattrx', 'w', 0o655)
+ cephfs.write(fd, b"1111", 0)
+ st = cephfs.statx(b'file-fsetattrx', libcephfs.CEPH_STATX_MODE, 0)
+ mode = st["mode"] | stat.S_IXUSR
+ assert_raises(TypeError, cephfs.fsetattrx, fd, "dict", 0, 0)
+
+ time.sleep(1)
+ statx_dict = dict()
+ statx_dict["mode"] = mode
+ statx_dict["uid"] = 9999
+ statx_dict["gid"] = 9999
+ dt = datetime.now()
+ statx_dict["mtime"] = dt
+ statx_dict["atime"] = dt
+ statx_dict["ctime"] = dt
+ statx_dict["size"] = 10
+ statx_dict["btime"] = dt
+ cephfs.fsetattrx(fd, statx_dict, libcephfs.CEPH_SETATTR_MODE |
+ libcephfs.CEPH_SETATTR_UID |
+ libcephfs.CEPH_SETATTR_GID |
+ libcephfs.CEPH_SETATTR_MTIME |
+ libcephfs.CEPH_SETATTR_ATIME |
+ libcephfs.CEPH_SETATTR_CTIME |
+ libcephfs.CEPH_SETATTR_SIZE |
+ libcephfs.CEPH_SETATTR_BTIME)
+ st1 = cephfs.statx(b'file-fsetattrx', libcephfs.CEPH_STATX_MODE |
+ libcephfs.CEPH_STATX_UID |
+ libcephfs.CEPH_STATX_GID |
+ libcephfs.CEPH_STATX_MTIME |
+ libcephfs.CEPH_STATX_ATIME |
+ libcephfs.CEPH_STATX_CTIME |
+ libcephfs.CEPH_STATX_SIZE |
+ libcephfs.CEPH_STATX_BTIME, 0)
+ assert_equal(mode, st1["mode"])
+ assert_equal(9999, st1["uid"])
+ assert_equal(9999, st1["gid"])
+ assert_equal(int(dt.timestamp()), int(st1["mtime"].timestamp()))
+ assert_equal(int(dt.timestamp()), int(st1["atime"].timestamp()))
+ assert_equal(int(dt.timestamp()), int(st1["ctime"].timestamp()))
+ assert_equal(int(dt.timestamp()), int(st1["btime"].timestamp()))
+ assert_equal(10, st1["size"])
+ cephfs.close(fd)
+ cephfs.unlink(b'file-fsetattrx')
+
+
class TestRmtree:
'''
Test rmtree() method of CephFS python bindings.