'rm', '-f', os.path.join(self.hostfs_mntpt, filename)
])
- def _run_python(self, pyscript, py_version='python3', sudo=False):
+ def _run_python(self, pyscript, py_version='python3', sudo=False, timeout=None):
args, omit_sudo = [], True
if sudo:
args.append('sudo')
omit_sudo = False
- args += ['stdin-killer', '--', py_version, '-c', pyscript]
+ timeout_args = ['--timeout', "%d" % timeout] if timeout is not None else []
+ args += ['stdin-killer', *timeout_args, '--', py_version, '-c', pyscript]
return self.client_remote.run(args=args, wait=False, stdin=run.PIPE,
stdout=StringIO(), omit_sudo=omit_sudo)
- def run_python(self, pyscript, py_version='python3', sudo=False):
- p = self._run_python(pyscript, py_version, sudo=sudo)
+ def run_python(self, pyscript, py_version='python3', sudo=False, timeout=None):
+ p = self._run_python(pyscript, py_version, sudo=sudo, timeout=timeout)
p.wait()
return p.stdout.getvalue().strip()
size=size
)))
- def validate_test_pattern(self, filename, size):
+ def validate_test_pattern(self, filename, size, timeout=None):
log.info("Validating {0} bytes from {1}".format(size, filename))
# Use sudo because cephfs-data-scan may recreate the file with owner==root
return self.run_python(dedent("""
""".format(
path=os.path.join(self.hostfs_mntpt, filename),
size=size
- )), sudo=True)
+ )), sudo=True, timeout=timeout)
def open_n_background(self, fs_path, count):
"""
self.mount_a.write_test_pattern("d0/d1/file_a", 8 * 1024 * 1024)
self.mount_a.run_shell(["mkdir", "d0/.snap/s1"])
self.mount_a.run_shell(["rm", "-f", "d0/d1/file_a"])
- self.mount_a.validate_test_pattern("d0/.snap/s1/d1/file_a", 8 * 1024 * 1024)
+ self.mount_a.validate_test_pattern("d0/.snap/s1/d1/file_a", 8 * 1024 * 1024, timeout=20)
self.mount_a.run_shell(["rmdir", "d0/.snap/s1"])
self.mount_a.run_shell(["rm", "-rf", "d0"])