]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
qa: add ceph-rbd windows service restart test
authorLucian Petrut <lpetrut@cloudbasesolutions.com>
Thu, 12 Jan 2023 10:55:06 +0000 (12:55 +0200)
committerLucian Petrut <lpetrut@cloudbasesolutions.com>
Thu, 23 May 2024 08:15:43 +0000 (08:15 +0000)
We're adding a test that:

* maps a configurable number of images
* runs a specified test - we're reusing the ones from stress_test,
  making just a few minor changes to allow running the same test
  multiple times
* restarts the ceph-rbd Windows service
* waits for the images to be reconnected and refreshes the mount
  information
* reruns the test
* repeats the above workflow for a specified number of times,
  reusing the same images

This test ensures that:

* mounted images are still available after a service restart
* drive letters are retained
* the image content is retained
* there are no race conditions when connecting or disconnecting
  a large number of images in parallel
* the driver is capable of mapping a specified number of images
  simultaneously

Signed-off-by: Lucian Petrut <lpetrut@cloudbasesolutions.com>
(cherry picked from commit d6d36b535c86618aa754298f724854a6fde31140)

qa/workunits/windows/py_tests/internal/rbd_image.py
qa/workunits/windows/py_tests/internal/task_group.py [new file with mode: 0644]
qa/workunits/windows/py_tests/internal/tracer.py
qa/workunits/windows/py_tests/rbd_wnbd/service_restart_test.py [new file with mode: 0644]
qa/workunits/windows/py_tests/rbd_wnbd/stress_test.py
qa/workunits/windows/run-tests.ps1

index be2f2300ff84d3b11dc8391f5324337d6bd341e9..772819ee804afe735e82ffa58eb866c076a842e4 100644 (file)
@@ -7,6 +7,7 @@
 
 import json
 import logging
+import os
 import time
 
 from py_tests.internal import exception
@@ -87,6 +88,28 @@ class RbdImage(object):
 
         return wait_for_disk()
 
+    @Tracer.trace
+    def _wait_for_fs(self,
+                     timeout: int = 60,
+                     retry_interval: int = 2):
+        @utils.retry_decorator(
+            retried_exceptions=exception.CephTestException,
+            additional_details="the mapped fs isn't available yet",
+            timeout=timeout,
+            retry_interval=retry_interval)
+        def wait_for_fs():
+            drive_letter = self._get_drive_letter()
+            path = f"{drive_letter}:\\"
+
+            LOG.debug("Waiting for disk to be accessible: %s %s",
+                      self.name, self.path)
+
+            if not os.path.exists(path):
+                raise exception.CephTestException(
+                    f"path not available yet: {path}")
+
+        return wait_for_fs()
+
     @property
     def path(self):
         return f"\\\\.\\PhysicalDrive{self.disk_number}"
@@ -119,6 +142,25 @@ class RbdImage(object):
         elapsed = time.time() - tstart
         self._wait_for_disk(timeout=timeout - elapsed)
 
+    @Tracer.trace
+    def refresh_after_remap(self, timeout: int = 60):
+        tstart = time.time()
+
+        # The disk number may change after a remap, we need to refresh it.
+        self.disk_number = self.get_disk_number(timeout=timeout)
+
+        elapsed = time.time() - tstart
+        self._wait_for_disk(timeout=timeout - elapsed)
+
+        if self.drive_letter:
+            elapsed = time.time() - tstart
+            self._wait_for_fs(timeout=timeout - elapsed)
+
+            drive_letter = self._get_drive_letter()
+
+            # We expect the drive letter to remain the same after a remap.
+            assert self.drive_letter == drive_letter
+
     @Tracer.trace
     def unmap(self):
         if self.mapped:
@@ -170,10 +212,11 @@ class RbdImage(object):
 
         # The PowerShell command will place a null character if no drive letter
         # is available. For example, we can receive "\x00\r\n".
-        self.drive_letter = result.stdout.decode().strip()
-        if not self.drive_letter.isalpha() or len(self.drive_letter) != 1:
+        drive_letter = result.stdout.decode().strip()
+        if not drive_letter.isalpha() or len(drive_letter) != 1:
             raise exception.CephTestException(
-                "Invalid drive letter received: %s" % self.drive_letter)
+                "Invalid drive letter received: %s" % drive_letter)
+        return drive_letter
 
     @Tracer.trace
     def init_fs(self):
@@ -186,7 +229,7 @@ class RbdImage(object):
         self._init_disk()
         self._create_partition()
         self._format_volume()
-        self._get_drive_letter()
+        self.drive_letter = self._get_drive_letter()
 
     @Tracer.trace
     def get_fs_capacity(self):
diff --git a/qa/workunits/windows/py_tests/internal/task_group.py b/qa/workunits/windows/py_tests/internal/task_group.py
new file mode 100644 (file)
index 0000000..ccdba44
--- /dev/null
@@ -0,0 +1,63 @@
+from concurrent import futures
+import logging
+import threading
+
+
+LOG = logging.getLogger()
+
+
+class TaskGroup(object):
+    def __init__(self, max_workers=1, stop_on_error=True):
+        self._executor = futures.ThreadPoolExecutor(max_workers=max_workers)
+        self._lock = threading.Lock()
+
+        self.errors = 0
+        self.completed = 0
+        self.pending = 0
+
+        self.stopped = False
+        self.stop_on_error = stop_on_error
+
+        self._submitted_tasks = []
+
+    def _wrap_task(self, task):
+        def wrapper():
+            with self._lock:
+                if self.stopped:
+                    self.pending -= 1
+                    return
+
+            try:
+                task()
+            except Exception as ex:
+                with self._lock:
+                    if self.stop_on_error:
+                        self.stopped = True
+
+                    self.errors += 1
+                    LOG.exception(
+                        "Task exception: %s. Total exceptions: %d",
+                        ex, self.errors)
+            finally:
+                with self._lock:
+                    self.completed += 1
+                    self.pending -= 1
+                    LOG.info("Completed tasks: %d. Pending: %d",
+                             self.completed, self.pending)
+
+        return wrapper
+
+    def submit(self, task):
+        task_wrapper = self._wrap_task(task)
+
+        with self._lock:
+            self.pending += 1
+
+        submitted_task = self._executor.submit(task_wrapper)
+        self._submitted_tasks.append(submitted_task)
+
+    def join(self):
+        LOG.info("Waiting for %d tasks to complete.",
+                 len(self._submitted_tasks))
+        futures.wait(self._submitted_tasks)
+        LOG.info("Tasks completed.")
index 52a64b7be3c4f1b8edce5a87f6e967a23f2d0835..d80b0a5ffe95ce8edf9403934cc220d7e61e6fff 100644 (file)
@@ -31,7 +31,7 @@ class Tracer:
             try:
                 return func(*args, **kwargs)
             except Exception as exc:
-                exc_str = str(exc)
+                exc_str = "%r: %s" % (exc, exc)
                 raise
             finally:
                 tend = time.time()
diff --git a/qa/workunits/windows/py_tests/rbd_wnbd/service_restart_test.py b/qa/workunits/windows/py_tests/rbd_wnbd/service_restart_test.py
new file mode 100644 (file)
index 0000000..a4c9142
--- /dev/null
@@ -0,0 +1,232 @@
+# Copyright (C) 2023 Cloudbase Solutions
+#
+# This is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License version 2.1, as published by the Free Software
+# Foundation (see LICENSE).
+
+import argparse
+import logging
+import typing
+
+from py_tests.internal import exception
+from py_tests.internal import task_group
+from py_tests.internal.tracer import Tracer
+from py_tests.internal import utils
+from py_tests.rbd_wnbd import stress_test
+
+LOG = logging.getLogger()
+
+parser = argparse.ArgumentParser(description='rbd-wnbd service restart test')
+parser.add_argument('--test-name',
+                    help='The test to be run.',
+                    default="RbdStampTest")
+parser.add_argument('--iterations',
+                    help='Total number of test iterations',
+                    default=2, type=int)
+parser.add_argument('--image-count',
+                    help='The number of images to use.',
+                    default=8, type=int)
+parser.add_argument('--concurrency',
+                    help='The number of workers to use when '
+                         'initializing and running the tests.',
+                    default=4, type=int)
+parser.add_argument('--fio-iterations',
+                    help='Total number of benchmark iterations per disk.',
+                    default=1, type=int)
+parser.add_argument('--fio-workers',
+                    help='Total number of fio workers per disk.',
+                    default=1, type=int)
+parser.add_argument('--fio-depth',
+                    help='The number of concurrent asynchronous operations '
+                         'executed per disk',
+                    default=64, type=int)
+parser.add_argument('--fio-verify',
+                    help='The mechanism used to validate the written '
+                         'data. Examples: crc32c, md5, sha1, null, etc. '
+                         'If set to null, the written data will not be '
+                         'verified.',
+                    default='crc32c')
+parser.add_argument('--bs',
+                    help='Benchmark block size.',
+                    default="2M")
+parser.add_argument('--op',
+                    help='Benchmark operation. '
+                         'Examples: read, randwrite, rw, etc.',
+                    default="rw")
+parser.add_argument('--image-prefix',
+                    help='The image name prefix.',
+                    default="cephTest-")
+parser.add_argument('--image-size-mb',
+                    help='The image size in megabytes.',
+                    default=32, type=int)
+parser.add_argument('--map-timeout',
+                    help='Image map timeout.',
+                    default=60, type=int)
+parser.add_argument('--skip-enabling-disk', action='store_true',
+                    help='If set, the disk will not be turned online and the '
+                         'read-only flag will not be removed. Useful when '
+                         'the SAN policy is set to "onlineAll".')
+parser.add_argument('--verbose', action='store_true',
+                    help='Print info messages.')
+parser.add_argument('--debug', action='store_true',
+                    help='Print debug messages.')
+parser.add_argument('--stop-on-error', action='store_true',
+                    help='Stop testing when hitting errors.')
+parser.add_argument('--skip-cleanup-on-error', action='store_true',
+                    help='Skip cleanup when hitting errors.')
+
+
+class ServiceRestartTestRunner(object):
+    def __init__(self,
+                 test_cls: typing.Type[stress_test.RbdTest],
+                 test_params: dict = {},
+                 iterations: int = 1,
+                 image_count: int = 8,
+                 workers: int = 1,
+                 stop_on_error: bool = False,
+                 cleanup_on_error: bool = True):
+        self.test_cls = test_cls
+        self.test_params = test_params
+        self.iterations = iterations
+        self.image_count = image_count
+        self.workers = workers
+        self.errors = 0
+        self.stop_on_error = stop_on_error
+        self.cleanup_on_error = cleanup_on_error
+
+        self.test_instances: list[stress_test.RbdTest] = []
+
+    @Tracer.trace
+    def initialize(self):
+        LOG.info("Initializing mappings")
+
+        tg = task_group.TaskGroup(max_workers=self.workers,
+                                  stop_on_error=self.stop_on_error)
+
+        for idx in range(self.image_count):
+            test = self.test_cls(**self.test_params)
+            self.test_instances.append(test)
+
+            tg.submit(test.initialize)
+
+        tg.join()
+        self.errors += tg.errors
+
+    @Tracer.trace
+    def cleanup(self):
+        LOG.info("Performing cleanup")
+
+        tg = task_group.TaskGroup(max_workers=self.workers,
+                                  stop_on_error=self.stop_on_error)
+
+        for test_instance in self.test_instances:
+            tg.submit(test_instance.cleanup)
+
+        tg.join()
+        self.errors += tg.errors
+
+    @Tracer.trace
+    def run_tests(self):
+        LOG.info("Running the tests")
+
+        tg = task_group.TaskGroup(max_workers=self.workers,
+                                  stop_on_error=self.stop_on_error)
+
+        for test_instance in self.test_instances:
+            tg.submit(test_instance.run)
+
+        tg.join()
+        self.errors += tg.errors
+
+    @Tracer.trace
+    def _restart_service(self):
+        LOG.info("Restarting ceph-rbd service")
+
+        utils.ps_execute("restart-service", "ceph-rbd")
+
+    @Tracer.trace
+    def _refresh_test_instances(self):
+        LOG.info("Refreshing mappings after service restart")
+
+        tg = task_group.TaskGroup(max_workers=self.workers,
+                                  stop_on_error=self.stop_on_error)
+
+        for test_instance in self.test_instances:
+            tg.submit(test_instance.image.refresh_after_remap)
+
+        tg.join()
+        self.errors += tg.errors
+
+    @Tracer.trace
+    def run(self):
+        try:
+            self.initialize()
+
+            for iteration in range(self.iterations):
+                self.run_tests()
+
+                self._restart_service()
+
+                self._refresh_test_instances()
+        except Exception:
+            LOG.exception("Test failed")
+            self.errors += 1
+        finally:
+            if not self.errors or self.cleanup_on_error:
+                self.cleanup()
+
+
+TESTS: typing.Dict[str, typing.Type[stress_test.RbdTest]] = {
+    'RbdTest': stress_test.RbdTest,
+    'RbdFioTest': stress_test.RbdFioTest,
+    'RbdStampTest': stress_test.RbdStampTest,
+    # FS tests
+    'RbdFsTest': stress_test.RbdFsTest,
+    'RbdFsFioTest': stress_test.RbdFsFioTest,
+    'RbdFsStampTest': stress_test.RbdFsStampTest,
+}
+
+if __name__ == '__main__':
+    args = parser.parse_args()
+
+    log_level = logging.WARNING
+    if args.verbose:
+        log_level = logging.INFO
+    if args.debug:
+        log_level = logging.DEBUG
+    utils.setup_logging(log_level)
+
+    test_params = dict(
+        image_size_mb=args.image_size_mb,
+        image_prefix=args.image_prefix,
+        bs=args.bs,
+        op=args.op,
+        verify=args.fio_verify,
+        iodepth=args.fio_depth,
+        map_timeout=args.map_timeout,
+        skip_enabling_disk=args.skip_enabling_disk,
+    )
+
+    try:
+        test_cls = TESTS[args.test_name]
+    except KeyError:
+        raise exception.CephTestException(
+            "Unknown test: {}".format(args.test_name))
+
+    runner = ServiceRestartTestRunner(
+        test_cls,
+        test_params=test_params,
+        iterations=args.iterations,
+        image_count=args.image_count,
+        workers=args.concurrency,
+        stop_on_error=args.stop_on_error,
+        cleanup_on_error=not args.skip_cleanup_on_error)
+    runner.run()
+
+    Tracer.print_results()
+    test_cls.print_results(
+        description="count: %d, concurrency: %d" %
+        (args.iterations, args.concurrency))
+
+    assert runner.errors == 0, f"encountered {runner.errors} error(s)."
index 78f9555dd3b0efc646e2464cd0c03d2ff38e35df..0c50e6afe97799c5ab7df3a80910654ccda9932d 100644 (file)
@@ -393,8 +393,8 @@ class RbdStampTest(RbdTest):
             # we aren't writing to the wrong disk.
             time.sleep(self._rand_float(0, 5))
 
-            stamp = self._read_stamp()
-            assert self._previous_stamp == stamp
+            r_stamp = self._read_stamp()
+            assert self._previous_stamp == r_stamp
 
         w_stamp = self._get_stamp()
         self._write_stamp(w_stamp)
@@ -413,7 +413,7 @@ class RbdFsStampTest(RbdFsTestMixin, RbdStampTest):
         return self.get_subpath("test-stamp")
 
 
-class TestRunner(object):
+class StressTestRunner(object):
     def __init__(self,
                  test_cls: typing.Type[RbdTest],
                  test_params: dict = {},
@@ -521,7 +521,7 @@ if __name__ == '__main__':
         raise exception.CephTestException(
             "Unknown test: {}".format(args.test_name))
 
-    runner = TestRunner(
+    runner = StressTestRunner(
         test_cls,
         test_params=test_params,
         iterations=args.iterations,
index 7f44e87acf6c3a8d8aa4c59ead44ebc6da56c1a4..e0ee8de948ddf608b4f5de9b92c60b98c658c39d 100644 (file)
@@ -26,3 +26,17 @@ safe_exec python.exe -m py_tests.rbd_wnbd.stress_test --test-name RbdFsFioTest -
 safe_exec python.exe -m py_tests.rbd_wnbd.stress_test --test-name RbdFsStampTest --iterations 4
 
 safe_exec python.exe -m py_tests.rbd_wnbd.stress_test --test-name RbdResizeFioTest --image-size-mb 64
+
+safe_exec python.exe -m py_tests.rbd_wnbd.service_restart_test `
+    --test-name=RbdTest --iterations=3 --image-count=50 --concurrency=8
+safe_exec python.exe -m py_tests.rbd_wnbd.service_restart_test `
+    --test-name=RbdFioTest --iterations=3 --image-count=50 --concurrency=8
+safe_exec python.exe -m py_tests.rbd_wnbd.service_restart_test `
+    --test-name=RbdStampTest --iterations=3 --image-count=50 --concurrency=8
+
+safe_exec python.exe -m py_tests.rbd_wnbd.service_restart_test `
+    --test-name=RbdFsTest --iterations=3 --image-count=8 --concurrency=8 --image-size-mb=64
+safe_exec python.exe -m py_tests.rbd_wnbd.service_restart_test `
+    --test-name=RbdFsFioTest --iterations=3 --image-count=8 --concurrency=8 --image-size-mb=64
+safe_exec python.exe -m py_tests.rbd_wnbd.service_restart_test `
+    --test-name=RbdFsStampTest --iterations=3 --image-count=8 --concurrency=8 --image-size-mb=64