From 65a14e61f5f5b1f30144c887d7296f7699ab5492 Mon Sep 17 00:00:00 2001 From: Lucian Petrut Date: Thu, 12 Jan 2023 12:55:06 +0200 Subject: [PATCH] qa: add ceph-rbd windows service restart test We're adding a test that: * maps a configurable number of images * runs a specified test - we're reusing the ones from stress_test, making just a few minor changes to allow running the same test multiple times * restarts the ceph-rbd Windows service * waits for the images to be reconnected and refreshes the mount information * reruns the test * repeats the above workflow for a specified number of times, reusing the same images This test ensures that: * mounted images are still available after a service restart * drive letters are retained * the image content is retained * there are no race conditions when connecting or disconnecting a large number of images in parallel * the driver is capable of mapping a specified number of images simultaneously Signed-off-by: Lucian Petrut (cherry picked from commit d6d36b535c86618aa754298f724854a6fde31140) --- .../windows/py_tests/internal/rbd_image.py | 51 +++- .../windows/py_tests/internal/task_group.py | 63 +++++ .../windows/py_tests/internal/tracer.py | 2 +- .../py_tests/rbd_wnbd/service_restart_test.py | 232 ++++++++++++++++++ .../windows/py_tests/rbd_wnbd/stress_test.py | 8 +- qa/workunits/windows/run-tests.ps1 | 14 ++ 6 files changed, 361 insertions(+), 9 deletions(-) create mode 100644 qa/workunits/windows/py_tests/internal/task_group.py create mode 100644 qa/workunits/windows/py_tests/rbd_wnbd/service_restart_test.py diff --git a/qa/workunits/windows/py_tests/internal/rbd_image.py b/qa/workunits/windows/py_tests/internal/rbd_image.py index be2f2300ff8..772819ee804 100644 --- a/qa/workunits/windows/py_tests/internal/rbd_image.py +++ b/qa/workunits/windows/py_tests/internal/rbd_image.py @@ -7,6 +7,7 @@ import json import logging +import os import time from py_tests.internal import exception @@ -87,6 +88,28 @@ class RbdImage(object): return wait_for_disk() + @Tracer.trace + def _wait_for_fs(self, + timeout: int = 60, + retry_interval: int = 2): + @utils.retry_decorator( + retried_exceptions=exception.CephTestException, + additional_details="the mapped fs isn't available yet", + timeout=timeout, + retry_interval=retry_interval) + def wait_for_fs(): + drive_letter = self._get_drive_letter() + path = f"{drive_letter}:\\" + + LOG.debug("Waiting for disk to be accessible: %s %s", + self.name, self.path) + + if not os.path.exists(path): + raise exception.CephTestException( + f"path not available yet: {path}") + + return wait_for_fs() + @property def path(self): return f"\\\\.\\PhysicalDrive{self.disk_number}" @@ -119,6 +142,25 @@ class RbdImage(object): elapsed = time.time() - tstart self._wait_for_disk(timeout=timeout - elapsed) + @Tracer.trace + def refresh_after_remap(self, timeout: int = 60): + tstart = time.time() + + # The disk number may change after a remap, we need to refresh it. + self.disk_number = self.get_disk_number(timeout=timeout) + + elapsed = time.time() - tstart + self._wait_for_disk(timeout=timeout - elapsed) + + if self.drive_letter: + elapsed = time.time() - tstart + self._wait_for_fs(timeout=timeout - elapsed) + + drive_letter = self._get_drive_letter() + + # We expect the drive letter to remain the same after a remap. + assert self.drive_letter == drive_letter + @Tracer.trace def unmap(self): if self.mapped: @@ -170,10 +212,11 @@ class RbdImage(object): # The PowerShell command will place a null character if no drive letter # is available. For example, we can receive "\x00\r\n". - self.drive_letter = result.stdout.decode().strip() - if not self.drive_letter.isalpha() or len(self.drive_letter) != 1: + drive_letter = result.stdout.decode().strip() + if not drive_letter.isalpha() or len(drive_letter) != 1: raise exception.CephTestException( - "Invalid drive letter received: %s" % self.drive_letter) + "Invalid drive letter received: %s" % drive_letter) + return drive_letter @Tracer.trace def init_fs(self): @@ -186,7 +229,7 @@ class RbdImage(object): self._init_disk() self._create_partition() self._format_volume() - self._get_drive_letter() + self.drive_letter = self._get_drive_letter() @Tracer.trace def get_fs_capacity(self): diff --git a/qa/workunits/windows/py_tests/internal/task_group.py b/qa/workunits/windows/py_tests/internal/task_group.py new file mode 100644 index 00000000000..ccdba44233d --- /dev/null +++ b/qa/workunits/windows/py_tests/internal/task_group.py @@ -0,0 +1,63 @@ +from concurrent import futures +import logging +import threading + + +LOG = logging.getLogger() + + +class TaskGroup(object): + def __init__(self, max_workers=1, stop_on_error=True): + self._executor = futures.ThreadPoolExecutor(max_workers=max_workers) + self._lock = threading.Lock() + + self.errors = 0 + self.completed = 0 + self.pending = 0 + + self.stopped = False + self.stop_on_error = stop_on_error + + self._submitted_tasks = [] + + def _wrap_task(self, task): + def wrapper(): + with self._lock: + if self.stopped: + self.pending -= 1 + return + + try: + task() + except Exception as ex: + with self._lock: + if self.stop_on_error: + self.stopped = True + + self.errors += 1 + LOG.exception( + "Task exception: %s. Total exceptions: %d", + ex, self.errors) + finally: + with self._lock: + self.completed += 1 + self.pending -= 1 + LOG.info("Completed tasks: %d. Pending: %d", + self.completed, self.pending) + + return wrapper + + def submit(self, task): + task_wrapper = self._wrap_task(task) + + with self._lock: + self.pending += 1 + + submitted_task = self._executor.submit(task_wrapper) + self._submitted_tasks.append(submitted_task) + + def join(self): + LOG.info("Waiting for %d tasks to complete.", + len(self._submitted_tasks)) + futures.wait(self._submitted_tasks) + LOG.info("Tasks completed.") diff --git a/qa/workunits/windows/py_tests/internal/tracer.py b/qa/workunits/windows/py_tests/internal/tracer.py index 52a64b7be3c..d80b0a5ffe9 100644 --- a/qa/workunits/windows/py_tests/internal/tracer.py +++ b/qa/workunits/windows/py_tests/internal/tracer.py @@ -31,7 +31,7 @@ class Tracer: try: return func(*args, **kwargs) except Exception as exc: - exc_str = str(exc) + exc_str = "%r: %s" % (exc, exc) raise finally: tend = time.time() diff --git a/qa/workunits/windows/py_tests/rbd_wnbd/service_restart_test.py b/qa/workunits/windows/py_tests/rbd_wnbd/service_restart_test.py new file mode 100644 index 00000000000..a4c9142f30b --- /dev/null +++ b/qa/workunits/windows/py_tests/rbd_wnbd/service_restart_test.py @@ -0,0 +1,232 @@ +# Copyright (C) 2023 Cloudbase Solutions +# +# This is free software; you can redistribute it and/or +# modify it under the terms of the GNU Lesser General Public +# License version 2.1, as published by the Free Software +# Foundation (see LICENSE). + +import argparse +import logging +import typing + +from py_tests.internal import exception +from py_tests.internal import task_group +from py_tests.internal.tracer import Tracer +from py_tests.internal import utils +from py_tests.rbd_wnbd import stress_test + +LOG = logging.getLogger() + +parser = argparse.ArgumentParser(description='rbd-wnbd service restart test') +parser.add_argument('--test-name', + help='The test to be run.', + default="RbdStampTest") +parser.add_argument('--iterations', + help='Total number of test iterations', + default=2, type=int) +parser.add_argument('--image-count', + help='The number of images to use.', + default=8, type=int) +parser.add_argument('--concurrency', + help='The number of workers to use when ' + 'initializing and running the tests.', + default=4, type=int) +parser.add_argument('--fio-iterations', + help='Total number of benchmark iterations per disk.', + default=1, type=int) +parser.add_argument('--fio-workers', + help='Total number of fio workers per disk.', + default=1, type=int) +parser.add_argument('--fio-depth', + help='The number of concurrent asynchronous operations ' + 'executed per disk', + default=64, type=int) +parser.add_argument('--fio-verify', + help='The mechanism used to validate the written ' + 'data. Examples: crc32c, md5, sha1, null, etc. ' + 'If set to null, the written data will not be ' + 'verified.', + default='crc32c') +parser.add_argument('--bs', + help='Benchmark block size.', + default="2M") +parser.add_argument('--op', + help='Benchmark operation. ' + 'Examples: read, randwrite, rw, etc.', + default="rw") +parser.add_argument('--image-prefix', + help='The image name prefix.', + default="cephTest-") +parser.add_argument('--image-size-mb', + help='The image size in megabytes.', + default=32, type=int) +parser.add_argument('--map-timeout', + help='Image map timeout.', + default=60, type=int) +parser.add_argument('--skip-enabling-disk', action='store_true', + help='If set, the disk will not be turned online and the ' + 'read-only flag will not be removed. Useful when ' + 'the SAN policy is set to "onlineAll".') +parser.add_argument('--verbose', action='store_true', + help='Print info messages.') +parser.add_argument('--debug', action='store_true', + help='Print debug messages.') +parser.add_argument('--stop-on-error', action='store_true', + help='Stop testing when hitting errors.') +parser.add_argument('--skip-cleanup-on-error', action='store_true', + help='Skip cleanup when hitting errors.') + + +class ServiceRestartTestRunner(object): + def __init__(self, + test_cls: typing.Type[stress_test.RbdTest], + test_params: dict = {}, + iterations: int = 1, + image_count: int = 8, + workers: int = 1, + stop_on_error: bool = False, + cleanup_on_error: bool = True): + self.test_cls = test_cls + self.test_params = test_params + self.iterations = iterations + self.image_count = image_count + self.workers = workers + self.errors = 0 + self.stop_on_error = stop_on_error + self.cleanup_on_error = cleanup_on_error + + self.test_instances: list[stress_test.RbdTest] = [] + + @Tracer.trace + def initialize(self): + LOG.info("Initializing mappings") + + tg = task_group.TaskGroup(max_workers=self.workers, + stop_on_error=self.stop_on_error) + + for idx in range(self.image_count): + test = self.test_cls(**self.test_params) + self.test_instances.append(test) + + tg.submit(test.initialize) + + tg.join() + self.errors += tg.errors + + @Tracer.trace + def cleanup(self): + LOG.info("Performing cleanup") + + tg = task_group.TaskGroup(max_workers=self.workers, + stop_on_error=self.stop_on_error) + + for test_instance in self.test_instances: + tg.submit(test_instance.cleanup) + + tg.join() + self.errors += tg.errors + + @Tracer.trace + def run_tests(self): + LOG.info("Running the tests") + + tg = task_group.TaskGroup(max_workers=self.workers, + stop_on_error=self.stop_on_error) + + for test_instance in self.test_instances: + tg.submit(test_instance.run) + + tg.join() + self.errors += tg.errors + + @Tracer.trace + def _restart_service(self): + LOG.info("Restarting ceph-rbd service") + + utils.ps_execute("restart-service", "ceph-rbd") + + @Tracer.trace + def _refresh_test_instances(self): + LOG.info("Refreshing mappings after service restart") + + tg = task_group.TaskGroup(max_workers=self.workers, + stop_on_error=self.stop_on_error) + + for test_instance in self.test_instances: + tg.submit(test_instance.image.refresh_after_remap) + + tg.join() + self.errors += tg.errors + + @Tracer.trace + def run(self): + try: + self.initialize() + + for iteration in range(self.iterations): + self.run_tests() + + self._restart_service() + + self._refresh_test_instances() + except Exception: + LOG.exception("Test failed") + self.errors += 1 + finally: + if not self.errors or self.cleanup_on_error: + self.cleanup() + + +TESTS: typing.Dict[str, typing.Type[stress_test.RbdTest]] = { + 'RbdTest': stress_test.RbdTest, + 'RbdFioTest': stress_test.RbdFioTest, + 'RbdStampTest': stress_test.RbdStampTest, + # FS tests + 'RbdFsTest': stress_test.RbdFsTest, + 'RbdFsFioTest': stress_test.RbdFsFioTest, + 'RbdFsStampTest': stress_test.RbdFsStampTest, +} + +if __name__ == '__main__': + args = parser.parse_args() + + log_level = logging.WARNING + if args.verbose: + log_level = logging.INFO + if args.debug: + log_level = logging.DEBUG + utils.setup_logging(log_level) + + test_params = dict( + image_size_mb=args.image_size_mb, + image_prefix=args.image_prefix, + bs=args.bs, + op=args.op, + verify=args.fio_verify, + iodepth=args.fio_depth, + map_timeout=args.map_timeout, + skip_enabling_disk=args.skip_enabling_disk, + ) + + try: + test_cls = TESTS[args.test_name] + except KeyError: + raise exception.CephTestException( + "Unknown test: {}".format(args.test_name)) + + runner = ServiceRestartTestRunner( + test_cls, + test_params=test_params, + iterations=args.iterations, + image_count=args.image_count, + workers=args.concurrency, + stop_on_error=args.stop_on_error, + cleanup_on_error=not args.skip_cleanup_on_error) + runner.run() + + Tracer.print_results() + test_cls.print_results( + description="count: %d, concurrency: %d" % + (args.iterations, args.concurrency)) + + assert runner.errors == 0, f"encountered {runner.errors} error(s)." diff --git a/qa/workunits/windows/py_tests/rbd_wnbd/stress_test.py b/qa/workunits/windows/py_tests/rbd_wnbd/stress_test.py index 78f9555dd3b..0c50e6afe97 100644 --- a/qa/workunits/windows/py_tests/rbd_wnbd/stress_test.py +++ b/qa/workunits/windows/py_tests/rbd_wnbd/stress_test.py @@ -393,8 +393,8 @@ class RbdStampTest(RbdTest): # we aren't writing to the wrong disk. time.sleep(self._rand_float(0, 5)) - stamp = self._read_stamp() - assert self._previous_stamp == stamp + r_stamp = self._read_stamp() + assert self._previous_stamp == r_stamp w_stamp = self._get_stamp() self._write_stamp(w_stamp) @@ -413,7 +413,7 @@ class RbdFsStampTest(RbdFsTestMixin, RbdStampTest): return self.get_subpath("test-stamp") -class TestRunner(object): +class StressTestRunner(object): def __init__(self, test_cls: typing.Type[RbdTest], test_params: dict = {}, @@ -521,7 +521,7 @@ if __name__ == '__main__': raise exception.CephTestException( "Unknown test: {}".format(args.test_name)) - runner = TestRunner( + runner = StressTestRunner( test_cls, test_params=test_params, iterations=args.iterations, diff --git a/qa/workunits/windows/run-tests.ps1 b/qa/workunits/windows/run-tests.ps1 index 7f44e87acf6..e0ee8de948d 100644 --- a/qa/workunits/windows/run-tests.ps1 +++ b/qa/workunits/windows/run-tests.ps1 @@ -26,3 +26,17 @@ safe_exec python.exe -m py_tests.rbd_wnbd.stress_test --test-name RbdFsFioTest - safe_exec python.exe -m py_tests.rbd_wnbd.stress_test --test-name RbdFsStampTest --iterations 4 safe_exec python.exe -m py_tests.rbd_wnbd.stress_test --test-name RbdResizeFioTest --image-size-mb 64 + +safe_exec python.exe -m py_tests.rbd_wnbd.service_restart_test ` + --test-name=RbdTest --iterations=3 --image-count=50 --concurrency=8 +safe_exec python.exe -m py_tests.rbd_wnbd.service_restart_test ` + --test-name=RbdFioTest --iterations=3 --image-count=50 --concurrency=8 +safe_exec python.exe -m py_tests.rbd_wnbd.service_restart_test ` + --test-name=RbdStampTest --iterations=3 --image-count=50 --concurrency=8 + +safe_exec python.exe -m py_tests.rbd_wnbd.service_restart_test ` + --test-name=RbdFsTest --iterations=3 --image-count=8 --concurrency=8 --image-size-mb=64 +safe_exec python.exe -m py_tests.rbd_wnbd.service_restart_test ` + --test-name=RbdFsFioTest --iterations=3 --image-count=8 --concurrency=8 --image-size-mb=64 +safe_exec python.exe -m py_tests.rbd_wnbd.service_restart_test ` + --test-name=RbdFsStampTest --iterations=3 --image-count=8 --concurrency=8 --image-size-mb=64 -- 2.39.5