except CommandFailedError as ce:
self.assertEqual(ce.exitstatus, errno.ENOENT, error_message)
+ def test_subvolume_create_without_normalization(self):
+ # create subvolume
+ subvolume = self._gen_subvol_name()
+ self._fs_cmd("subvolume", "create", self.volname, subvolume)
+
+ # make sure it exists
+ subvolpath = self._get_subvolume_path(self.volname, subvolume)
+ self.assertNotEqual(subvolpath, None)
+
+ # check normalization
+ try:
+ self._fs_cmd("subvolume", "charmap", "get", self.volname, subvolume, "normalization")
+ except CommandFailedError as ce:
+ self.assertEqual(ce.exitstatus, errno.ENODATA)
+ else:
+ self.fail("expected the 'fs subvolume charmap' command to fail")
+
+ def test_subvolume_create_with_normalization(self):
+ # create subvolume
+ subvolume = self._gen_subvol_name()
+ self._fs_cmd("subvolume", "create", self.volname, subvolume, "--normalization", "nfc")
+
+ # make sure it exists
+ subvolpath = self._get_subvolume_path(self.volname, subvolume)
+ self.assertNotEqual(subvolpath, None)
+
+ # check normalization
+ normalization = self._fs_cmd("subvolume", "charmap", "get", self.volname, subvolume, "normalization")
+ self.assertEqual(normalization.strip(), "nfc")
+
def test_subvolume_expand(self):
"""
That a subvolume can be expanded in size and its quota matches the expected size.
except cephfs.NoData:
attrs["normalization"] = None
+ try:
+ case_insensitive = self.fs.getxattr(pathname, 'ceph.dir.caseinsensitive').decode('utf-8')
+ attrs["case_insensitive"] = case_insensitive == "0"
+ except cephfs.NoData:
+ attrs["case_insensitive"] = False
+
return attrs
def set_attrs(self, path, attrs):
except cephfs.Error as e:
raise VolumeException(-e.args[0], e.args[1])
+ case_insensitive = attrs.get("case_insensitive")
+ if case_insensitive:
+ try:
+ self.fs.setxattr(path, "ceph.dir.casesensitive", "0".encode('utf-8'), 0)
+ except cephfs.Error as e:
+ raise VolumeException(-e.args[0], e.args[1])
+
def _resize(self, path, newsize, noshrink):
try:
newsize = int(newsize)
except cephfs.NoData:
normalization = "none"
+ try:
+ case_insensitive = self.fs.getxattr(subvolpath,
+ 'ceph.dir.casesensitive'
+ ).decode('utf-8')
+ case_insensitive = case_insensitive == "0"
+ except cephfs.NoData:
+ case_insensitive = False
+
return {'path': subvolpath,
'type': etype.value,
'uid': int(st["uid"]),
'state': self.state.value,
'earmark': earmark,
'normalization': normalization,
+ 'case_insensitive': case_insensitive,
}
def set_user_metadata(self, keyname, value):