from StringIO import StringIO
import json
import logging
+from gevent import Greenlet
import os
import time
import datetime
"""
return self._run_tool("cephfs-table-tool", args, None, quiet)
-    def data_scan(self, args, quiet=False):
+    def data_scan(self, args, quiet=False, worker_count=1):
        """
        Invoke cephfs-data-scan with the passed arguments, and return its stdout
+
+        :param worker_count: if greater than 1, multiple workers will be run
+            in parallel and the return value will be None
+        """
- return self._run_tool("cephfs-data-scan", args, None, quiet)
+
+ workers = []
+
+ for n in range(0, worker_count):
+ if worker_count > 1:
+ # data-scan args first token is a command, followed by args to it.
+ # insert worker arguments after the command.
+ cmd = args[0]
+ worker_args = [cmd] + ["--worker_n", n.__str__(), "--worker_m", worker_count.__str__()] + args[1:]
+ else:
+ worker_args = args
+
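+            # Bind worker_args via a default argument so that each greenlet
+            # captures its own copy; a bare closure would see only the value
+            # from the final loop iteration.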
+            workers.append(Greenlet.spawn(lambda wargs=worker_args:
+                self._run_tool("cephfs-data-scan", wargs, None, quiet)))
+
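+        # Block until every worker completes; Greenlet.get() re-raises any
+        # exception from a failed worker.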
+        for w in workers:
+            w.get()
+
+        if worker_count == 1:
+            return workers[0].value
+        else:
+            return None
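+    # Illustrative usage sketch (not part of this patch): splitting a
+    # recovery scan across four workers. Each greenlet invokes
+    # cephfs-data-scan with --worker_n <n> --worker_m <count>, and because
+    # worker_count > 1 the call returns None rather than a worker's stdout:
+    #
+    #   fs.data_scan(["scan_extents", fs.get_data_pool_name()], worker_count=4)
+    #   fs.data_scan(["scan_inodes", fs.get_data_pool_name()], worker_count=4)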
from collections import namedtuple
from teuthology.orchestra.run import CommandFailedError
-from tasks.cephfs.cephfs_test_case import CephFSTestCase
-
+from tasks.cephfs.cephfs_test_case import CephFSTestCase, long_running
log = logging.getLogger(__name__)
        return self._errors
+
+
+class ManyFilesWorkload(Workload):
+    def __init__(self, filesystem, mount, file_count):
+        super(ManyFilesWorkload, self).__init__(filesystem, mount)
+        self.file_count = file_count
+
+    def write(self):
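+        # Write file_count files, each 6 MiB of a known test pattern that
+        # validate() re-checks after the metadata rebuild.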
+        self._mount.run_shell(["mkdir", "subdir"])
+        for n in range(self.file_count):
+            self._mount.write_test_pattern("subdir/{0}".format(n), 6 * 1024 * 1024)
+
+    def validate(self):
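+        # Re-read every file; a mismatch or read failure is recorded as a
+        # ValidationError rather than aborting the whole pass.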
+        for n in range(self.file_count):
+            try:
+                self._mount.validate_test_pattern("subdir/{0}".format(n), 6 * 1024 * 1024)
+            except CommandFailedError as e:
+                self._errors.append(
+                    ValidationError("File {0}: {1}".format(n, e), traceback.format_exc(3))
+                )
+
+        return self._errors
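+
+# Note (illustrative, not part of the patch): this workload is driven by
+# TestDataScan._rebuild_metadata below, which calls write(), removes all
+# objects from the metadata pool, rebuilds them with cephfs-data-scan, and
+# finally calls validate() to surface any files the rebuild lost.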
+
+
class MovedDir(Workload):
    def write(self):
        # Create a nested dir that we will then move. Two files with two different
        mds_map = self.fs.get_mds_map()
        return rank in mds_map['damaged']
-    def _rebuild_metadata(self, workload):
+    def _rebuild_metadata(self, workload, workers=1):
        """
        That when all objects in the metadata pool are removed, we can rebuild
        the metadata pool from the contents of the data pool, and a client can
        still see and read our files.
        """
self.fs.journal_tool(["journal", "reset"])
self.fs.journal_tool(["journal", "reset", "--force"])
self.fs.data_scan(["init"])
- self.fs.data_scan(["scan_extents", self.fs.get_data_pool_name()])
- self.fs.data_scan(["scan_inodes", self.fs.get_data_pool_name()])
+ self.fs.data_scan(["scan_extents", self.fs.get_data_pool_name()], worker_count=workers)
+ self.fs.data_scan(["scan_inodes", self.fs.get_data_pool_name()], worker_count=workers)
# Mark the MDS repaired
self.fs.mon_manager.raw_cluster_cmd('mds', 'repaired', '0')
frag_obj_id = "{0:x}.00000000".format(dir_ino)
keys = self._dirfrag_keys(frag_obj_id)
self.assertListEqual(sorted(keys), sorted(["%s_head" % f for f in file_names]))
+
+    @long_running
+    def test_parallel_execution(self):
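+        # 25 files across 7 workers yields unevenly sized shards, exercising
+        # the --worker_n/--worker_m division logic.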
+        self._rebuild_metadata(ManyFilesWorkload(self.fs, self.mount_a, 25), workers=7)