tasks/cephfs: test sharded cephfs-data-scan
author John Spray <john.spray@redhat.com>
Wed, 23 Dec 2015 13:57:02 +0000 (13:57 +0000)
committer John Spray <john.spray@redhat.com>
Fri, 29 Jan 2016 10:26:32 +0000 (10:26 +0000)
Signed-off-by: John Spray <john.spray@redhat.com>
tasks/cephfs/filesystem.py
tasks/cephfs/test_data_scan.py

index 368f2c476a3e90382d448d5d10404915159d7b32..d32d85e6f0591f0aca65ea39a6283181e0ffa718 100644 (file)
@@ -2,6 +2,7 @@
 from StringIO import StringIO
 import json
 import logging
+from gevent import Greenlet
 import os
 import time
 import datetime
@@ -752,8 +753,32 @@ class Filesystem(object):
         """
         return self._run_tool("cephfs-table-tool", args, None, quiet)
 
-    def data_scan(self, args, quiet=False):
+    def data_scan(self, args, quiet=False, worker_count=1):
         """
         Invoke cephfs-data-scan with the passed arguments, and return its stdout
+
+        :param worker_count: if greater than 1, multiple workers will be run
+                             in parallel and the return value will be None
         """
-        return self._run_tool("cephfs-data-scan", args, None, quiet)
+
+        workers = []
+
+        for n in range(0, worker_count):
+            if worker_count > 1:
+                # The first token of the data-scan args is the command, followed
+                # by its arguments; insert the worker arguments after the command.
+                cmd = args[0]
+                worker_args = [cmd] + ["--worker_n", str(n), "--worker_m", str(worker_count)] + args[1:]
+            else:
+                worker_args = args
+
+            workers.append(Greenlet.spawn(lambda wargs=worker_args:
+                                          self._run_tool("cephfs-data-scan", wargs, None, quiet)))
+
+        for w in workers:
+            w.get()
+
+        if worker_count == 1:
+            return workers[0].value
+        else:
+            return None
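
For readers new to the fan-out pattern above: each shard is handed to
cephfs-data-scan via --worker_n/--worker_m and run in its own greenlet, which
is joined before data_scan() returns.  The standalone sketch below shows the
same idea outside of the teuthology harness; run_worker() and the
gevent.subprocess call are illustrative stand-ins for Filesystem._run_tool(),
not part of this patch, and it assumes gevent >= 1.0 for gevent.subprocess.

    from gevent import Greenlet
    from gevent import subprocess  # cooperative subprocess, so workers actually overlap

    def run_worker(args):
        # Illustrative stand-in for Filesystem._run_tool: run the tool
        # synchronously within this greenlet and return its stdout.
        return subprocess.check_output(["cephfs-data-scan"] + args)

    def sharded_scan(command, pool, worker_count):
        workers = []
        for n in range(worker_count):
            # Worker arguments go immediately after the command token,
            # mirroring the data_scan() change above.
            worker_args = [command,
                           "--worker_n", str(n),
                           "--worker_m", str(worker_count),
                           pool]
            # Bind worker_args as a default argument so each greenlet
            # captures its own copy rather than the final loop value.
            workers.append(Greenlet.spawn(lambda wargs=worker_args: run_worker(wargs)))
        for w in workers:
            w.get()  # re-raises if any worker failed
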
index 47b9651cec5632ef696c38bada4b064b81a20f5c..e92447434a7a9373ecbe5bb46d9415530df85337 100644 (file)
@@ -10,8 +10,7 @@ import traceback
 from collections import namedtuple
 
 from teuthology.orchestra.run import CommandFailedError
-from tasks.cephfs.cephfs_test_case import CephFSTestCase
-
+from tasks.cephfs.cephfs_test_case import CephFSTestCase, long_running
 
 log = logging.getLogger(__name__)
 
@@ -212,6 +211,28 @@ class StripedStashedLayout(Workload):
         return self._errors
 
 
+class ManyFilesWorkload(Workload):
+    def __init__(self, filesystem, mount, file_count):
+        super(ManyFilesWorkload, self).__init__(filesystem, mount)
+        self.file_count = file_count
+
+    def write(self):
+        self._mount.run_shell(["mkdir", "subdir"])
+        for n in range(0, self.file_count):
+            self._mount.write_test_pattern("subdir/{0}".format(n), 6 * 1024 * 1024)
+
+    def validate(self):
+        for n in range(0, self.file_count):
+            try:
+                self._mount.validate_test_pattern("subdir/{0}".format(n), 6 * 1024 * 1024)
+            except CommandFailedError as e:
+                self._errors.append(
+                    ValidationError("File {0}: {1}".format(n, e), traceback.format_exc(3))
+                )
+
+        return self._errors
+
+
 class MovedDir(Workload):
     def write(self):
         # Create a nested dir that we will then move.  Two files with two different
@@ -289,7 +310,7 @@ class TestDataScan(CephFSTestCase):
         mds_map = self.fs.get_mds_map()
         return rank in mds_map['damaged']
 
-    def _rebuild_metadata(self, workload):
+    def _rebuild_metadata(self, workload, workers=1):
         """
         That when all objects in metadata pool are removed, we can rebuild a metadata pool
         based on the contents of a data pool, and a client can see and read our files.
@@ -336,8 +357,8 @@ class TestDataScan(CephFSTestCase):
                 self.fs.journal_tool(["journal", "reset"])
         self.fs.journal_tool(["journal", "reset", "--force"])
         self.fs.data_scan(["init"])
-        self.fs.data_scan(["scan_extents", self.fs.get_data_pool_name()])
-        self.fs.data_scan(["scan_inodes", self.fs.get_data_pool_name()])
+        self.fs.data_scan(["scan_extents", self.fs.get_data_pool_name()], worker_count=workers)
+        self.fs.data_scan(["scan_inodes", self.fs.get_data_pool_name()], worker_count=workers)
 
         # Mark the MDS repaired
         self.fs.mon_manager.raw_cluster_cmd('mds', 'repaired', '0')
@@ -470,3 +491,7 @@ class TestDataScan(CephFSTestCase):
         frag_obj_id = "{0:x}.00000000".format(dir_ino)
         keys = self._dirfrag_keys(frag_obj_id)
         self.assertListEqual(sorted(keys), sorted(["%s_head" % f for f in file_names]))
+
+    @long_running
+    def test_parallel_execution(self):
+        self._rebuild_metadata(ManyFilesWorkload(self.fs, self.mount_a, 25), workers=7)
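
Concretely, with workers=7 the scan_extents and scan_inodes steps in
_rebuild_metadata() each fan out into seven concurrent cephfs-data-scan
invocations.  The throwaway snippet below (pool name illustrative; the test
uses self.fs.get_data_pool_name()) just prints the per-worker argument lists
that data_scan() builds, which is handy for sanity-checking the sharding
arguments by hand:

    pool = "cephfs_data"  # illustrative pool name
    worker_count = 7
    for phase in ["scan_extents", "scan_inodes"]:
        for n in range(worker_count):
            print([phase, "--worker_n", str(n), "--worker_m", str(worker_count), pool])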