at the path passed in testpath for the given list of mounts. If
testpath is empty, the file is created at the root of the CephFS.
"""
- self.mounts = mounts
- self.filepaths, self.filedata = [], 'testdata'
- dirname, filename = 'testdir', 'testfile'
+ dirname, filename, filedata = 'testdir', 'testfile', 'testdata'
+ self.test_set = []
# XXX: The reason behind testpath[1:] below is that the testpath is
# supposed to contain a path inside CephFS (which might be passed as
# an absolute path). os.path.join() deletes all previous path
if testpath == '/':
testpath = ''
- for mount_x in self.mounts:
+ for mount_x in mounts:
dirpath = os_path_join(mount_x.hostfs_mntpt, testpath, dirname)
mount_x.run_shell(f'mkdir {dirpath}')
filepath = os_path_join(dirpath, filename)
- mount_x.write_file(filepath, self.filedata)
- self.filepaths.append(filepath)
+ mount_x.write_file(filepath, filedata)
+ self.test_set.append((mount_x, filepath, filedata))
def run_cap_tests(self, perm, mntpt=None):
# TODO
# cephfs dir serving as root for current mnt: /dir1/dir2
# therefore, final path: /mnt/cephfs_x//testdir
if mntpt:
- self.filepaths = [x.replace(mntpt, '') for x in self.filepaths]
+ self.test_set = [(x, y.replace(mntpt, ''), z) for x, y, z in \
+ self.test_set]
self.conduct_pos_test_for_read_caps()
raise RuntimeError(f'perm = {perm}\nIt should be "r" or "rw".')
def conduct_pos_test_for_read_caps(self):
- for mount in self.mounts:
- for path, data in zip(self.filepaths, (self.filedata,)):
- # XXX: conduct tests only if path belongs to current mount; in
- # teuth tests client are located on same machines.
- if path.find(mount.hostfs_mntpt) != -1:
- contents = mount.read_file(path)
- self.assertEqual(data, contents)
+ for mount, path, data in self.test_set:
+ contents = mount.read_file(path)
+ self.assertEqual(data, contents)
def conduct_pos_test_for_write_caps(self):
- for mount in self.mounts:
- for path, data in zip(self.filepaths, (self.filedata,)):
- if path.find(mount.hostfs_mntpt) != -1:
- # test that write was successful
- mount.write_file(path=path, data=data)
- # verify that contents written was same as the one that was
- # intended
- contents1 = mount.read_file(path=path)
- self.assertEqual(data, contents1)
+ for mount, path, data in self.test_set:
+ mount.write_file(path=path, data=data)
+ contents = mount.read_file(path=path)
+ self.assertEqual(data, contents)
def conduct_neg_test_for_write_caps(self):
possible_errmsgs = ('permission denied', 'operation not permitted')
cmdargs = ['echo', 'some random data', Raw('|'), 'tee']
- for mount in self.mounts:
- for path in self.filepaths:
- if path.find(mount.hostfs_mntpt) != -1:
- cmdargs.append(path)
- mount.negtestcmd(args=cmdargs, retval=1,
- errmsgs=possible_errmsgs)
- cmdargs.pop(-1)
+ # don't use data, cmd args to write are set already above.
+ for mount, path, data in self.test_set:
+ cmdargs.append(path)
+ mount.negtestcmd(args=cmdargs, retval=1, errmsgs=possible_errmsgs)
+ cmdargs.pop(-1)
def get_mon_cap_from_keyring(self, client_name):
keyring = self.run_cluster_cmd(cmd=f'auth get {client_name}')