]> git.apps.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
qa: reorganize Windows python test
authorLucian Petrut <lpetrut@cloudbasesolutions.com>
Tue, 10 Jan 2023 14:50:04 +0000 (16:50 +0200)
committerLucian Petrut <lpetrut@cloudbasesolutions.com>
Thu, 23 May 2024 08:15:43 +0000 (08:15 +0000)
We're splitting the rbd-wnbd python test into separate files so
that the common code may easily be reused by other tests. This
also makes the code easier to read and maintain.

Signed-off-by: Lucian Petrut <lpetrut@cloudbasesolutions.com>
(cherry picked from commit 808d42d575c97b6d0db0e6c6d88ee5fda97fe1b1)

qa/workunits/windows/py_tests/__init__.py [new file with mode: 0644]
qa/workunits/windows/py_tests/internal/__init__.py [new file with mode: 0644]
qa/workunits/windows/py_tests/internal/exception.py [new file with mode: 0644]
qa/workunits/windows/py_tests/internal/rbd_image.py [new file with mode: 0644]
qa/workunits/windows/py_tests/internal/tracer.py [new file with mode: 0644]
qa/workunits/windows/py_tests/internal/utils.py [new file with mode: 0644]
qa/workunits/windows/py_tests/rbd_wnbd/__init__.py [new file with mode: 0644]
qa/workunits/windows/py_tests/rbd_wnbd/stress_test.py [new file with mode: 0644]
qa/workunits/windows/run-tests.ps1
qa/workunits/windows/test_rbd_wnbd.py [deleted file]

diff --git a/qa/workunits/windows/py_tests/__init__.py b/qa/workunits/windows/py_tests/__init__.py
new file mode 100644 (file)
index 0000000..e69de29
diff --git a/qa/workunits/windows/py_tests/internal/__init__.py b/qa/workunits/windows/py_tests/internal/__init__.py
new file mode 100644 (file)
index 0000000..e69de29
diff --git a/qa/workunits/windows/py_tests/internal/exception.py b/qa/workunits/windows/py_tests/internal/exception.py
new file mode 100644 (file)
index 0000000..27a02db
--- /dev/null
@@ -0,0 +1,27 @@
+# Copyright (C) 2023 Cloudbase Solutions
+#
+# This is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License version 2.1, as published by the Free Software
+# Foundation (see LICENSE).
+
+class CephTestException(Exception):
+    msg_fmt = "An exception has been encountered."
+
+    def __init__(self, message: str = '', **kwargs):
+        self.kwargs = kwargs
+        if not message:
+            message = self.msg_fmt % kwargs
+        self.message = message
+        super(CephTestException, self).__init__(message)
+
+
+class CommandFailed(CephTestException):
+    msg_fmt = (
+        "Command failed: %(command)s. "
+        "Return code: %(returncode)s. "
+        "Stdout: %(stdout)s. Stderr: %(stderr)s.")
+
+
+class CephTestTimeout(CephTestException):
+    msg_fmt = "Operation timeout."
diff --git a/qa/workunits/windows/py_tests/internal/rbd_image.py b/qa/workunits/windows/py_tests/internal/rbd_image.py
new file mode 100644 (file)
index 0000000..be2f230
--- /dev/null
@@ -0,0 +1,242 @@
+# Copyright (C) 2023 Cloudbase Solutions
+#
+# This is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License version 2.1, as published by the Free Software
+# Foundation (see LICENSE).
+
+import json
+import logging
+import time
+
+from py_tests.internal import exception
+from py_tests.internal.tracer import Tracer
+from py_tests.internal import utils
+
+LOG = logging.getLogger()
+
+
+class RbdImage(object):
+    def __init__(self,
+                 name: str,
+                 size_mb: int,
+                 is_shared: bool = True,
+                 disk_number: int = -1,
+                 mapped: bool = False):
+        self.name = name
+        self.size_mb = size_mb
+        self.is_shared = is_shared
+        self.disk_number = disk_number
+        self.mapped = mapped
+        self.removed = False
+        self.drive_letter = ""
+
+    @classmethod
+    @Tracer.trace
+    def create(cls,
+               name: str,
+               size_mb: int = 1024,
+               is_shared: bool = True):
+        LOG.info("Creating image: %s. Size: %s.", name, "%sM" % size_mb)
+        cmd = ["rbd", "create", name, "--size", "%sM" % size_mb]
+        if is_shared:
+            cmd += ["--image-shared"]
+        utils.execute(*cmd)
+
+        return RbdImage(name, size_mb, is_shared)
+
+    @Tracer.trace
+    def get_disk_number(self,
+                        timeout: int = 60,
+                        retry_interval: int = 2):
+        @utils.retry_decorator(
+            retried_exceptions=exception.CephTestException,
+            timeout=timeout,
+            retry_interval=retry_interval)
+        def _get_disk_number():
+            LOG.info("Retrieving disk number: %s", self.name)
+
+            result = utils.execute(
+                "rbd-wnbd", "show", self.name, "--format=json")
+            disk_info = json.loads(result.stdout)
+            disk_number = disk_info["disk_number"]
+            if disk_number > 0:
+                LOG.debug("Image %s disk number: %d", self.name, disk_number)
+                return disk_number
+
+            raise exception.CephTestException(
+                f"Could not get disk number: {self.name}.")
+
+        return _get_disk_number()
+
+    @Tracer.trace
+    def _wait_for_disk(self,
+                       timeout: int = 60,
+                       retry_interval: int = 2):
+        @utils.retry_decorator(
+            retried_exceptions=(FileNotFoundError, OSError),
+            additional_details="the mapped disk isn't available yet",
+            timeout=timeout,
+            retry_interval=retry_interval)
+        def wait_for_disk():
+            LOG.debug("Waiting for disk to be accessible: %s %s",
+                      self.name, self.path)
+
+            with open(self.path, 'rb'):
+                pass
+
+        return wait_for_disk()
+
+    @property
+    def path(self):
+        return f"\\\\.\\PhysicalDrive{self.disk_number}"
+
+    @Tracer.trace
+    @utils.retry_decorator(
+        additional_details="couldn't clear disk read-only flag")
+    def set_writable(self):
+        utils.ps_execute(
+            "Set-Disk", "-Number", str(self.disk_number),
+            "-IsReadOnly", "$false")
+
+    @Tracer.trace
+    @utils.retry_decorator(additional_details="couldn't bring the disk online")
+    def set_online(self):
+        utils.ps_execute(
+            "Set-Disk", "-Number", str(self.disk_number),
+            "-IsOffline", "$false")
+
+    @Tracer.trace
+    def map(self, timeout: int = 60):
+        LOG.info("Mapping image: %s", self.name)
+        tstart = time.time()
+
+        utils.execute("rbd-wnbd", "map", self.name)
+        self.mapped = True
+
+        self.disk_number = self.get_disk_number(timeout=timeout)
+
+        elapsed = time.time() - tstart
+        self._wait_for_disk(timeout=timeout - elapsed)
+
+    @Tracer.trace
+    def unmap(self):
+        if self.mapped:
+            LOG.info("Unmapping image: %s", self.name)
+            utils.execute("rbd-wnbd", "unmap", self.name)
+            self.mapped = False
+
+    @Tracer.trace
+    @utils.retry_decorator()
+    def remove(self):
+        if not self.removed:
+            LOG.info("Removing image: %s", self.name)
+            utils.execute("rbd", "rm", self.name)
+            self.removed = True
+
+    def cleanup(self):
+        try:
+            self.unmap()
+        finally:
+            self.remove()
+
+    @Tracer.trace
+    @utils.retry_decorator()
+    def _init_disk(self):
+        cmd = f"Get-Disk -Number {self.disk_number} | Initialize-Disk"
+        utils.ps_execute(cmd)
+
+    @Tracer.trace
+    @utils.retry_decorator()
+    def _create_partition(self):
+        cmd = (f"Get-Disk -Number {self.disk_number} | "
+               "New-Partition -AssignDriveLetter -UseMaximumSize")
+        utils.ps_execute(cmd)
+
+    @Tracer.trace
+    @utils.retry_decorator()
+    def _format_volume(self):
+        cmd = (
+            f"(Get-Partition -DiskNumber {self.disk_number}"
+            " | ? { $_.DriveLetter }) | Format-Volume -Force -Confirm:$false")
+        utils.ps_execute(cmd)
+
+    @Tracer.trace
+    @utils.retry_decorator()
+    def _get_drive_letter(self):
+        cmd = (f"(Get-Partition -DiskNumber {self.disk_number}"
+               " | ? { $_.DriveLetter }).DriveLetter")
+        result = utils.ps_execute(cmd)
+
+        # The PowerShell command will place a null character if no drive letter
+        # is available. For example, we can receive "\x00\r\n".
+        self.drive_letter = result.stdout.decode().strip()
+        if not self.drive_letter.isalpha() or len(self.drive_letter) != 1:
+            raise exception.CephTestException(
+                "Invalid drive letter received: %s" % self.drive_letter)
+
+    @Tracer.trace
+    def init_fs(self):
+        if not self.mapped:
+            raise exception.CephTestException(
+                "Unable to create fs, image not mapped.")
+
+        LOG.info("Initializing fs, image: %s.", self.name)
+
+        self._init_disk()
+        self._create_partition()
+        self._format_volume()
+        self._get_drive_letter()
+
+    @Tracer.trace
+    def get_fs_capacity(self):
+        if not self.drive_letter:
+            raise exception.CephTestException("No drive letter available")
+
+        cmd = f"(Get-Volume -DriveLetter {self.drive_letter}).Size"
+        result = utils.ps_execute(cmd)
+
+        return int(result.stdout.decode().strip())
+
+    @Tracer.trace
+    def resize(self, new_size_mb, allow_shrink=False):
+        LOG.info(
+            "Resizing image: %s. New size: %s MB, old size: %s MB",
+            self.name, new_size_mb, self.size_mb)
+
+        cmd = ["rbd", "resize", self.name,
+               "--size", f"{new_size_mb}M", "--no-progress"]
+        if allow_shrink:
+            cmd.append("--allow-shrink")
+
+        utils.execute(*cmd)
+
+        self.size_mb = new_size_mb
+
+    @Tracer.trace
+    def get_disk_size(self):
+        """Retrieve the virtual disk size (bytes) reported by Windows."""
+        cmd = f"(Get-Disk -Number {self.disk_number}).Size"
+        result = utils.ps_execute(cmd)
+
+        disk_size = result.stdout.decode().strip()
+        if not disk_size.isdigit():
+            raise exception.CephTestException(
+                "Invalid disk size received: %s" % disk_size)
+
+        return int(disk_size)
+
+    @Tracer.trace
+    @utils.retry_decorator(timeout=30)
+    def wait_for_disk_resize(self):
+        # After resizing the rbd image, the daemon is expected to receive
+        # the notification, inform the WNBD driver and then trigger a disk
+        # rescan (IOCTL_DISK_UPDATE_PROPERTIES). This might take a few seconds,
+        # so we'll need to do some polling.
+        disk_size = self.get_disk_size()
+        disk_size_mb = disk_size // (1 << 20)
+
+        if disk_size_mb != self.size_mb:
+            raise exception.CephTestException(
+                "The disk size hasn't been updated yet. Retrieved size: "
+                f"{disk_size_mb}MB. Expected size: {self.size_mb}MB.")
diff --git a/qa/workunits/windows/py_tests/internal/tracer.py b/qa/workunits/windows/py_tests/internal/tracer.py
new file mode 100644 (file)
index 0000000..52a64b7
--- /dev/null
@@ -0,0 +1,75 @@
+# Copyright (C) 2023 Cloudbase Solutions
+#
+# This is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License version 2.1, as published by the Free Software
+# Foundation (see LICENSE).
+
+import collections
+import prettytable
+import threading
+import time
+
+from py_tests.internal import utils
+
+
+class Tracer:
+    data: collections.OrderedDict = collections.OrderedDict()
+    lock = threading.Lock()
+
+    @classmethod
+    def trace(cls, func):
+        def wrapper(*args, **kwargs):
+            tstart = time.time()
+            exc_str = None
+
+            # Preserve call order
+            with cls.lock:
+                if func.__qualname__ not in cls.data:
+                    cls.data[func.__qualname__] = list()
+
+            try:
+                return func(*args, **kwargs)
+            except Exception as exc:
+                exc_str = str(exc)
+                raise
+            finally:
+                tend = time.time()
+
+                with cls.lock:
+                    cls.data[func.__qualname__] += [{
+                        "duration": tend - tstart,
+                        "error": exc_str,
+                    }]
+
+        return wrapper
+
+    @classmethod
+    def get_results(cls):
+        stats = collections.OrderedDict()
+        for f in cls.data.keys():
+            stats[f] = utils.array_stats([i['duration'] for i in cls.data[f]])
+            errors = []
+            for i in cls.data[f]:
+                if i['error']:
+                    errors.append(i['error'])
+
+            stats[f]['errors'] = errors
+        return stats
+
+    @classmethod
+    def print_results(cls):
+        r = cls.get_results()
+
+        table = prettytable.PrettyTable(title="Duration (s)")
+        table.field_names = [
+            "function", "min", "max", "total",
+            "mean", "median", "std_dev",
+            "max 90%", "min 90%", "count", "errors"]
+        table.float_format = ".4"
+        for f, s in r.items():
+            table.add_row([f, s['min'], s['max'], s['sum'],
+                           s['mean'], s['median'], s['std_dev'],
+                           s['max_90'], s['min_90'],
+                           s['count'], len(s['errors'])])
+        print(table)
diff --git a/qa/workunits/windows/py_tests/internal/utils.py b/qa/workunits/windows/py_tests/internal/utils.py
new file mode 100644 (file)
index 0000000..0fb5d32
--- /dev/null
@@ -0,0 +1,119 @@
+# Copyright (C) 2023 Cloudbase Solutions
+#
+# This is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License version 2.1, as published by the Free Software
+# Foundation (see LICENSE).
+
+import collections
+import functools
+import logging
+import math
+import subprocess
+import time
+import typing
+
+from py_tests.internal import exception
+
+LOG = logging.getLogger()
+
+
+def setup_logging(log_level: int = logging.INFO):
+    handler = logging.StreamHandler()
+    handler.setLevel(log_level)
+
+    log_fmt = '[%(asctime)s] %(levelname)s - %(message)s'
+    formatter = logging.Formatter(log_fmt)
+    handler.setFormatter(formatter)
+
+    LOG.addHandler(handler)
+    LOG.setLevel(logging.DEBUG)
+
+
+def retry_decorator(timeout: int = 60,
+                    retry_interval: int = 2,
+                    silent_interval: int = 10,
+                    additional_details: str = "",
+                    retried_exceptions:
+                        typing.Union[
+                            typing.Type[Exception],
+                            collections.abc.Iterable[
+                                typing.Type[Exception]]] = Exception):
+    def wrapper(f: typing.Callable[..., typing.Any]):
+        @functools.wraps(f)
+        def inner(*args, **kwargs):
+            tstart: float = time.time()
+            elapsed: float = 0
+            exc = None
+            details = additional_details or "%s failed" % f.__qualname__
+
+            while elapsed < timeout or not timeout:
+                try:
+                    return f(*args, **kwargs)
+                except retried_exceptions as ex:
+                    exc = ex
+                    elapsed = time.time() - tstart
+                    if elapsed > silent_interval:
+                        level = logging.WARNING
+                    else:
+                        level = logging.DEBUG
+                    LOG.log(level,
+                            "Exception: %s. Additional details: %s. "
+                            "Time elapsed: %d. Timeout: %d",
+                            ex, details, elapsed, timeout)
+
+                    time.sleep(retry_interval)
+                    elapsed = time.time() - tstart
+
+            msg = (
+                "Operation timed out. Exception: %s. Additional details: %s. "
+                "Time elapsed: %d. Timeout: %d.")
+            raise exception.CephTestTimeout(
+                msg % (exc, details, elapsed, timeout))
+        return inner
+    return wrapper
+
+
+def execute(*args, **kwargs):
+    LOG.debug("Executing: %s", args)
+    result = subprocess.run(
+        args,
+        stdout=subprocess.PIPE,
+        stderr=subprocess.PIPE,
+        **kwargs)
+    LOG.debug("Command %s returned %d.", args, result.returncode)
+    if result.returncode:
+        exc = exception.CommandFailed(
+            command=args, returncode=result.returncode,
+            stdout=result.stdout, stderr=result.stderr)
+        raise exc
+    return result
+
+
+def ps_execute(*args, **kwargs):
+    # Disable PS progress bar, causes issues when invoked remotely.
+    prefix = "$global:ProgressPreference = 'SilentlyContinue' ; "
+    return execute(
+        "powershell.exe", "-NonInteractive",
+        "-Command", prefix, *args, **kwargs)
+
+
+def array_stats(array: list):
+    mean = sum(array) / len(array) if len(array) else 0
+    variance = (sum((i - mean) ** 2 for i in array) / len(array)
+                if len(array) else 0)
+    std_dev = math.sqrt(variance)
+    sorted_array = sorted(array)
+
+    return {
+        'min': min(array) if len(array) else 0,
+        'max': max(array) if len(array) else 0,
+        'sum': sum(array) if len(array) else 0,
+        'mean': mean,
+        'median': sorted_array[len(array) // 2] if len(array) else 0,
+        'max_90': sorted_array[int(len(array) * 0.9)] if len(array) else 0,
+        'min_90': sorted_array[int(len(array) * 0.1)] if len(array) else 0,
+        'variance': variance,
+        'std_dev': std_dev,
+        'count': len(array)
+    }
diff --git a/qa/workunits/windows/py_tests/rbd_wnbd/__init__.py b/qa/workunits/windows/py_tests/rbd_wnbd/__init__.py
new file mode 100644 (file)
index 0000000..e69de29
diff --git a/qa/workunits/windows/py_tests/rbd_wnbd/stress_test.py b/qa/workunits/windows/py_tests/rbd_wnbd/stress_test.py
new file mode 100644 (file)
index 0000000..78f9555
--- /dev/null
@@ -0,0 +1,538 @@
+# Copyright (C) 2023 Cloudbase Solutions
+#
+# This is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License version 2.1, as published by the Free Software
+# Foundation (see LICENSE).
+
+import argparse
+import collections
+import json
+import logging
+import math
+import os
+import prettytable
+import random
+import time
+import threading
+import typing
+import uuid
+from concurrent import futures
+
+from py_tests.internal import exception
+from py_tests.internal.rbd_image import RbdImage
+from py_tests.internal.tracer import Tracer
+from py_tests.internal import utils
+
+LOG = logging.getLogger()
+
+parser = argparse.ArgumentParser(description='rbd-wnbd stress tests')
+parser.add_argument('--test-name',
+                    help='The test to be run.',
+                    default="RbdFioTest")
+parser.add_argument('--iterations',
+                    help='Total number of test iterations',
+                    default=1, type=int)
+parser.add_argument('--concurrency',
+                    help='The number of tests to run in parallel',
+                    default=4, type=int)
+parser.add_argument('--fio-iterations',
+                    help='Total number of benchmark iterations per disk.',
+                    default=1, type=int)
+parser.add_argument('--fio-workers',
+                    help='Total number of fio workers per disk.',
+                    default=1, type=int)
+parser.add_argument('--fio-depth',
+                    help='The number of concurrent asynchronous operations '
+                         'executed per disk',
+                    default=64, type=int)
+parser.add_argument('--fio-verify',
+                    help='The mechanism used to validate the written '
+                         'data. Examples: crc32c, md5, sha1, null, etc. '
+                         'If set to null, the written data will not be '
+                         'verified.',
+                    default='crc32c')
+parser.add_argument('--bs',
+                    help='Benchmark block size.',
+                    default="2M")
+parser.add_argument('--op',
+                    help='Benchmark operation. '
+                         'Examples: read, randwrite, rw, etc.',
+                    default="rw")
+parser.add_argument('--image-prefix',
+                    help='The image name prefix.',
+                    default="cephTest-")
+parser.add_argument('--image-size-mb',
+                    help='The image size in megabytes.',
+                    default=1024, type=int)
+parser.add_argument('--map-timeout',
+                    help='Image map timeout.',
+                    default=60, type=int)
+parser.add_argument('--skip-enabling-disk', action='store_true',
+                    help='If set, the disk will not be turned online and the '
+                         'read-only flag will not be removed. Useful when '
+                         'the SAN policy is set to "onlineAll".')
+parser.add_argument('--verbose', action='store_true',
+                    help='Print info messages.')
+parser.add_argument('--debug', action='store_true',
+                    help='Print debug messages.')
+parser.add_argument('--stop-on-error', action='store_true',
+                    help='Stop testing when hitting errors.')
+parser.add_argument('--skip-cleanup-on-error', action='store_true',
+                    help='Skip cleanup when hitting errors.')
+
+
+class RbdTest(object):
+    image: RbdImage
+
+    requires_disk_online = False
+    requires_disk_write = False
+
+    def __init__(self,
+                 image_prefix: str = "cephTest-",
+                 image_size_mb: int = 1024,
+                 map_timeout: int = 60,
+                 **kwargs):
+        self.image_size_mb = image_size_mb
+        self.image_name = image_prefix + str(uuid.uuid4())
+        self.map_timeout = map_timeout
+        self.skip_enabling_disk = kwargs.get("skip_enabling_disk")
+
+    @Tracer.trace
+    def initialize(self):
+        self.image = RbdImage.create(
+            self.image_name,
+            self.image_size_mb)
+        self.image.map(timeout=self.map_timeout)
+
+        if not self.skip_enabling_disk:
+            if self.requires_disk_write:
+                self.image.set_writable()
+
+            if self.requires_disk_online:
+                self.image.set_online()
+
+    def run(self):
+        pass
+
+    def cleanup(self):
+        if self.image:
+            self.image.cleanup()
+            self.image = None
+
+    @classmethod
+    def print_results(cls,
+                      title: str = "Test results",
+                      description: str = ''):
+        pass
+
+
+class RbdFsTestMixin(object):
+    # Windows disks must be turned online before accessing partitions.
+    requires_disk_online = True
+    requires_disk_write = True
+
+    @Tracer.trace
+    def initialize(self):
+        super(RbdFsTestMixin, self).initialize()
+
+        self.image.init_fs()
+
+    def get_subpath(self, *args):
+        drive_path = f"{self.image.drive_letter}:\\"
+        return os.path.join(drive_path, *args)
+
+
+class RbdFsTest(RbdFsTestMixin, RbdTest):
+    pass
+
+
+class RbdFioTest(RbdTest):
+    data: typing.DefaultDict[str, typing.List[typing.Dict[str, str]]] = (
+        collections.defaultdict(list))
+    lock = threading.Lock()
+
+    def __init__(self,
+                 *args,
+                 fio_size_mb: int = 0,
+                 iterations: int = 1,
+                 workers: int = 1,
+                 bs: str = "2M",
+                 iodepth: int = 64,
+                 op: str = "rw",
+                 verify: str = "crc32c",
+                 **kwargs):
+
+        super(RbdFioTest, self).__init__(*args, **kwargs)
+
+        self.fio_size_mb = fio_size_mb or self.image_size_mb
+        self.iterations = iterations
+        self.workers = workers
+        self.bs = bs
+        self.iodepth = iodepth
+        self.op = op
+        if op not in ("read", "randread"):
+            self.requires_disk_write = True
+        self.verify = verify
+
+    def process_result(self, raw_fio_output: str):
+        result = json.loads(raw_fio_output)
+        with self.lock:
+            for job in result["jobs"]:
+                # Fio doesn't support trim on Windows
+                for op in ['read', 'write']:
+                    if op in job:
+                        self.data[op].append({
+                            'error': job['error'],
+                            'io_bytes': job[op]['io_bytes'],
+                            'bw_bytes': job[op]['bw_bytes'],
+                            'runtime': job[op]['runtime'] / 1000,  # seconds
+                            'total_ios': job[op]['short_ios'],
+                            'short_ios': job[op]['short_ios'],
+                            'dropped_ios': job[op]['short_ios'],
+                            'clat_ns_min': job[op]['clat_ns']['min'],
+                            'clat_ns_max': job[op]['clat_ns']['max'],
+                            'clat_ns_mean': job[op]['clat_ns']['mean'],
+                            'clat_ns_stddev': job[op]['clat_ns']['stddev'],
+                            'clat_ns_10': job[op].get('clat_ns', {})
+                                                 .get('percentile', {})
+                                                 .get('10.000000', 0),
+                            'clat_ns_90': job[op].get('clat_ns', {})
+                                                 .get('percentile', {})
+                                                 .get('90.000000', 0)
+                        })
+
+    def _get_fio_path(self):
+        return self.image.path
+
+    @Tracer.trace
+    def _run_fio(self, fio_size_mb: int = 0) -> None:
+        LOG.info("Starting FIO test.")
+        cmd = [
+            "fio", "--thread", "--output-format=json",
+            "--randrepeat=%d" % self.iterations,
+            "--direct=1", "--name=test",
+            "--bs=%s" % self.bs, "--iodepth=%s" % self.iodepth,
+            "--size=%sM" % (fio_size_mb or self.fio_size_mb),
+            "--readwrite=%s" % self.op,
+            "--numjobs=%s" % self.workers,
+            "--filename=%s" % self._get_fio_path(),
+        ]
+        if self.verify:
+            cmd += ["--verify=%s" % self.verify]
+        result = utils.execute(*cmd)
+        LOG.info("Completed FIO test.")
+        self.process_result(result.stdout)
+
+    @Tracer.trace
+    def run(self):
+        self._run_fio()
+
+    @classmethod
+    def print_results(cls,
+                      title: str = "Benchmark results",
+                      description: str = ''):
+        if description:
+            title = "%s (%s)" % (title, description)
+
+        for op in cls.data.keys():
+            op_title = "%s op=%s" % (title, op)
+
+            table = prettytable.PrettyTable(title=op_title)
+            table.field_names = ["stat", "min", "max", "mean",
+                                 "median", "std_dev",
+                                 "max 90%", "min 90%", "total"]
+            table.float_format = ".4"
+
+            op_data = cls.data[op]
+
+            s = utils.array_stats(
+                [float(i["bw_bytes"]) / 1000_000 for i in op_data])
+            table.add_row(["bandwidth (MB/s)",
+                           s['min'], s['max'], s['mean'],
+                           s['median'], s['std_dev'],
+                           s['max_90'], s['min_90'], 'N/A'])
+
+            s = utils.array_stats([float(i["runtime"]) for i in op_data])
+            table.add_row(["duration (s)",
+                          s['min'], s['max'], s['mean'],
+                          s['median'], s['std_dev'],
+                          s['max_90'], s['min_90'], s['sum']])
+
+            s = utils.array_stats([i["error"] for i in op_data])
+            table.add_row(["errors",
+                           s['min'], s['max'], s['mean'],
+                           s['median'], s['std_dev'],
+                           s['max_90'], s['min_90'], s['sum']])
+
+            s = utils.array_stats([i["short_ios"] for i in op_data])
+            table.add_row(["incomplete IOs",
+                           s['min'], s['max'], s['mean'],
+                           s['median'], s['std_dev'],
+                           s['max_90'], s['min_90'], s['sum']])
+
+            s = utils.array_stats([i["dropped_ios"] for i in op_data])
+            table.add_row(["dropped IOs",
+                           s['min'], s['max'], s['mean'],
+                           s['median'], s['std_dev'],
+                           s['max_90'], s['min_90'], s['sum']])
+
+            clat_min = utils.array_stats([i["clat_ns_min"] for i in op_data])
+            clat_max = utils.array_stats([i["clat_ns_max"] for i in op_data])
+            clat_mean = utils.array_stats([i["clat_ns_mean"] for i in op_data])
+            clat_stddev = math.sqrt(
+                sum([float(i["clat_ns_stddev"]) ** 2
+                     for i in op_data]) / len(op_data)
+                if len(op_data) else 0)
+            clat_10 = utils.array_stats([i["clat_ns_10"] for i in op_data])
+            clat_90 = utils.array_stats([i["clat_ns_90"] for i in op_data])
+            # For convenience, we'll convert it from ns to seconds.
+            table.add_row(["completion latency (s)",
+                           clat_min['min'] / 1e+9,
+                           clat_max['max'] / 1e+9,
+                           clat_mean['mean'] / 1e+9,
+                           clat_mean['median'] / 1e+9,
+                           clat_stddev / 1e+9,
+                           clat_10['mean'] / 1e+9,
+                           clat_90['mean'] / 1e+9,
+                           clat_mean['sum'] / 1e+9])
+            print(table)
+
+
+class RbdResizeFioTest(RbdFioTest):
+    """Image resize test.
+
+    This test extends and then shrinks the image, performing FIO tests to
+    validate the resized image.
+    """
+
+    @Tracer.trace
+    def run(self):
+        self.image.resize(self.image_size_mb * 2)
+        self.image.wait_for_disk_resize()
+
+        self._run_fio(fio_size_mb=self.image_size_mb * 2)
+
+        self.image.resize(self.image_size_mb // 2, allow_shrink=True)
+        self.image.wait_for_disk_resize()
+
+        self._run_fio(fio_size_mb=self.image_size_mb // 2)
+
+        # Just like rbd-nbd, rbd-wnbd is masking out-of-bounds errors.
+        # For this reason, we don't have a negative test that writes
+        # passed the disk boundary.
+
+
+class RbdFsFioTest(RbdFsTestMixin, RbdFioTest):
+    def initialize(self):
+        super(RbdFsFioTest, self).initialize()
+
+        if not self.fio_size_mb or self.fio_size_mb == self.image_size_mb:
+            # Out of caution, we'll use up to 80% of the FS by default
+            self.fio_size_mb = int(
+                self.image.get_fs_capacity() * 0.8 / (1024 * 1024))
+
+    @staticmethod
+    def _fio_escape_path(path):
+        # FIO allows specifying multiple files separated by colon.
+        # This means that ":" has to be escaped, so
+        # F:\filename becomes F\:\filename.
+        return path.replace(":", "\\:")
+
+    def _get_fio_path(self):
+        return self._fio_escape_path(self.get_subpath("test-fio"))
+
+
+class RbdStampTest(RbdTest):
+    requires_disk_write = True
+
+    _write_open_mode = "rb+"
+    _read_open_mode = "rb"
+    _expect_path_exists = True
+    _stamp_size = 512
+
+    def __init__(self, *args, **kwargs):
+        super(RbdStampTest, self).__init__(*args, **kwargs)
+
+        # We allow running the test repeatedly, for example after a
+        # remount operation.
+        self._previous_stamp = b'\0' * self._stamp_size
+
+    @staticmethod
+    def _rand_float(min_val: float, max_val: float):
+        return min_val + (random.random() * max_val - min_val)
+
+    def _get_stamp(self):
+        buff_str = self.image_name + "-" + str(uuid.uuid4())
+        buff = buff_str.encode()
+        assert len(buff) <= self._stamp_size
+
+        padding = self._stamp_size - len(buff)
+        buff += b'\0' * padding
+        return buff
+
+    def _get_stamp_path(self):
+        return self.image.path
+
+    @Tracer.trace
+    def _write_stamp(self, stamp):
+        with open(self._get_stamp_path(), self._write_open_mode) as disk:
+            disk.write(stamp)
+
+    @Tracer.trace
+    def _read_stamp(self):
+        with open(self._get_stamp_path(), self._read_open_mode) as disk:
+            return disk.read(self._stamp_size)
+
+    @Tracer.trace
+    def run(self):
+        if self._expect_path_exists:
+            # Wait up to 5 seconds and then check the disk, ensuring that
+            # nobody else wrote to it. This is particularly useful when
+            # running a high number of tests in parallel, ensuring that
+            # we aren't writing to the wrong disk.
+            time.sleep(self._rand_float(0, 5))
+
+            stamp = self._read_stamp()
+            assert self._previous_stamp == stamp
+
+        w_stamp = self._get_stamp()
+        self._write_stamp(w_stamp)
+
+        r_stamp = self._read_stamp()
+        assert w_stamp == r_stamp
+
+        self._previous_stamp = w_stamp
+
+
+class RbdFsStampTest(RbdFsTestMixin, RbdStampTest):
+    _write_open_mode = "wb"
+    _expect_path_exists = False
+
+    def _get_stamp_path(self):
+        return self.get_subpath("test-stamp")
+
+
+class TestRunner(object):
+    def __init__(self,
+                 test_cls: typing.Type[RbdTest],
+                 test_params: dict = {},
+                 iterations: int = 1,
+                 workers: int = 1,
+                 stop_on_error: bool = False,
+                 cleanup_on_error: bool = True):
+        self.test_cls = test_cls
+        self.test_params = test_params
+        self.iterations = iterations
+        self.workers = workers
+        self.executor = futures.ThreadPoolExecutor(max_workers=workers)
+        self.lock = threading.Lock()
+        self.completed = 0
+        self.errors = 0
+        self.stopped = False
+        self.stop_on_error = stop_on_error
+        self.cleanup_on_error = cleanup_on_error
+
+    @Tracer.trace
+    def run(self):
+        tasks = []
+        for i in range(self.iterations):
+            task = self.executor.submit(self.run_single_test)
+            tasks.append(task)
+
+        LOG.info("Waiting for %d tests to complete.", self.iterations)
+        for task in tasks:
+            task.result()
+
+    def run_single_test(self):
+        failed = False
+        if self.stopped:
+            return
+
+        try:
+            test = self.test_cls(**self.test_params)
+            test.initialize()
+            test.run()
+        except KeyboardInterrupt:
+            LOG.warning("Received Ctrl-C.")
+            self.stopped = True
+        except Exception as ex:
+            failed = True
+            if self.stop_on_error:
+                self.stopped = True
+            with self.lock:
+                self.errors += 1
+                LOG.exception(
+                    "Test exception: %s. Total exceptions: %d",
+                    ex, self.errors)
+        finally:
+            if not failed or self.cleanup_on_error:
+                try:
+                    test.cleanup()
+                except KeyboardInterrupt:
+                    LOG.warning("Received Ctrl-C.")
+                    self.stopped = True
+                    # Retry the cleanup
+                    test.cleanup()
+                except Exception:
+                    LOG.exception("Test cleanup failed.")
+
+            with self.lock:
+                self.completed += 1
+                LOG.info("Completed tests: %d. Pending: %d",
+                         self.completed, self.iterations - self.completed)
+
+
+TESTS: typing.Dict[str, typing.Type[RbdTest]] = {
+    'RbdTest': RbdTest,
+    'RbdFioTest': RbdFioTest,
+    'RbdResizeFioTest': RbdResizeFioTest,
+    'RbdStampTest': RbdStampTest,
+    # FS tests
+    'RbdFsTest': RbdFsTest,
+    'RbdFsFioTest': RbdFsFioTest,
+    'RbdFsStampTest': RbdFsStampTest,
+}
+
+if __name__ == '__main__':
+    args = parser.parse_args()
+
+    log_level = logging.WARNING
+    if args.verbose:
+        log_level = logging.INFO
+    if args.debug:
+        log_level = logging.DEBUG
+    utils.setup_logging(log_level)
+
+    test_params = dict(
+        image_size_mb=args.image_size_mb,
+        image_prefix=args.image_prefix,
+        bs=args.bs,
+        op=args.op,
+        verify=args.fio_verify,
+        iodepth=args.fio_depth,
+        map_timeout=args.map_timeout,
+        skip_enabling_disk=args.skip_enabling_disk,
+    )
+
+    try:
+        test_cls = TESTS[args.test_name]
+    except KeyError:
+        raise exception.CephTestException(
+            "Unknown test: {}".format(args.test_name))
+
+    runner = TestRunner(
+        test_cls,
+        test_params=test_params,
+        iterations=args.iterations,
+        workers=args.concurrency,
+        stop_on_error=args.stop_on_error,
+        cleanup_on_error=not args.skip_cleanup_on_error)
+    runner.run()
+
+    Tracer.print_results()
+    test_cls.print_results(
+        description="count: %d, concurrency: %d" %
+        (args.iterations, args.concurrency))
+
+    assert runner.errors == 0, f"encountered {runner.errors} error(s)."
index 6d818f4267ecfe3bdd1009fd61d11ba2a0a22c30..7f44e87acf6c3a8d8aa4c59ead44ebc6da56c1a4 100644 (file)
@@ -4,7 +4,7 @@ $ErrorActionPreference = "Stop"
 $scriptLocation = [System.IO.Path]::GetDirectoryName(
     $myInvocation.MyCommand.Definition)
 
-$testRbdWnbd = "$scriptLocation/test_rbd_wnbd.py"
+$env:PYTHONPATH += ";$scriptLocation"
 
 function safe_exec() {
     # Powershell doesn't check the command exit code, we'll need to
@@ -16,14 +16,13 @@ function safe_exec() {
     }
 }
 
-safe_exec python.exe $testRbdWnbd --test-name RbdTest --iterations 100
-safe_exec python.exe $testRbdWnbd --test-name RbdFioTest --iterations 100
-safe_exec python.exe $testRbdWnbd --test-name RbdStampTest --iterations 100
+safe_exec python.exe -m py_tests.rbd_wnbd.stress_test --test-name RbdTest --iterations 100
+safe_exec python.exe -m py_tests.rbd_wnbd.stress_test --test-name RbdFioTest --iterations 100
+safe_exec python.exe -m py_tests.rbd_wnbd.stress_test --test-name RbdStampTest --iterations 100
 
 # It can take a while to setup the partition (~10s), we'll use fewer iterations.
-safe_exec python.exe $testRbdWnbd --test-name RbdFsTest --iterations 4
-safe_exec python.exe $testRbdWnbd --test-name RbdFsFioTest --iterations 4
-safe_exec python.exe $testRbdWnbd --test-name RbdFsStampTest --iterations 4
+safe_exec python.exe -m py_tests.rbd_wnbd.stress_test --test-name RbdFsTest --iterations 4
+safe_exec python.exe -m py_tests.rbd_wnbd.stress_test --test-name RbdFsFioTest --iterations 4
+safe_exec python.exe -m py_tests.rbd_wnbd.stress_test --test-name RbdFsStampTest --iterations 4
 
-safe_exec python.exe $testRbdWnbd `
-    --test-name RbdResizeFioTest --image-size-mb 64
+safe_exec python.exe -m py_tests.rbd_wnbd.stress_test --test-name RbdResizeFioTest --image-size-mb 64
diff --git a/qa/workunits/windows/test_rbd_wnbd.py b/qa/workunits/windows/test_rbd_wnbd.py
deleted file mode 100644 (file)
index 41d255a..0000000
+++ /dev/null
@@ -1,919 +0,0 @@
-import argparse
-import collections
-import functools
-import json
-import logging
-import math
-import os
-import prettytable
-import random
-import subprocess
-import time
-import threading
-import typing
-import uuid
-from concurrent import futures
-
-LOG = logging.getLogger()
-
-parser = argparse.ArgumentParser(description='rbd-wnbd tests')
-parser.add_argument('--test-name',
-                    help='The test to be run.',
-                    default="RbdFioTest")
-parser.add_argument('--iterations',
-                    help='Total number of test iterations',
-                    default=1, type=int)
-parser.add_argument('--concurrency',
-                    help='The number of tests to run in parallel',
-                    default=4, type=int)
-parser.add_argument('--fio-iterations',
-                    help='Total number of benchmark iterations per disk.',
-                    default=1, type=int)
-parser.add_argument('--fio-workers',
-                    help='Total number of fio workers per disk.',
-                    default=1, type=int)
-parser.add_argument('--fio-depth',
-                    help='The number of concurrent asynchronous operations '
-                         'executed per disk',
-                    default=64, type=int)
-parser.add_argument('--fio-verify',
-                    help='The mechanism used to validate the written '
-                         'data. Examples: crc32c, md5, sha1, null, etc. '
-                         'If set to null, the written data will not be '
-                         'verified.',
-                    default='crc32c')
-parser.add_argument('--bs',
-                    help='Benchmark block size.',
-                    default="2M")
-parser.add_argument('--op',
-                    help='Benchmark operation. '
-                         'Examples: read, randwrite, rw, etc.',
-                    default="rw")
-parser.add_argument('--image-prefix',
-                    help='The image name prefix.',
-                    default="cephTest-")
-parser.add_argument('--image-size-mb',
-                    help='The image size in megabytes.',
-                    default=1024, type=int)
-parser.add_argument('--map-timeout',
-                    help='Image map timeout.',
-                    default=60, type=int)
-parser.add_argument('--skip-enabling-disk', action='store_true',
-                    help='If set, the disk will not be turned online and the '
-                         'read-only flag will not be removed. Useful when '
-                         'the SAN policy is set to "onlineAll".')
-parser.add_argument('--verbose', action='store_true',
-                    help='Print info messages.')
-parser.add_argument('--debug', action='store_true',
-                    help='Print debug messages.')
-parser.add_argument('--stop-on-error', action='store_true',
-                    help='Stop testing when hitting errors.')
-parser.add_argument('--skip-cleanup-on-error', action='store_true',
-                    help='Skip cleanup when hitting errors.')
-
-
-class CephTestException(Exception):
-    msg_fmt = "An exception has been encountered."
-
-    def __init__(self, message: str = '', **kwargs):
-        self.kwargs = kwargs
-        if not message:
-            message = self.msg_fmt % kwargs
-        self.message = message
-        super(CephTestException, self).__init__(message)
-
-
-class CommandFailed(CephTestException):
-    msg_fmt = (
-        "Command failed: %(command)s. "
-        "Return code: %(returncode)s. "
-        "Stdout: %(stdout)s. Stderr: %(stderr)s.")
-
-
-class CephTestTimeout(CephTestException):
-    msg_fmt = "Operation timeout."
-
-
-def setup_logging(log_level: int = logging.INFO):
-    handler = logging.StreamHandler()
-    handler.setLevel(log_level)
-
-    log_fmt = '[%(asctime)s] %(levelname)s - %(message)s'
-    formatter = logging.Formatter(log_fmt)
-    handler.setFormatter(formatter)
-
-    LOG.addHandler(handler)
-    LOG.setLevel(logging.DEBUG)
-
-
-def retry_decorator(timeout: int = 60,
-                    retry_interval: int = 2,
-                    silent_interval: int = 10,
-                    additional_details: str = "",
-                    retried_exceptions:
-                        typing.Union[
-                            typing.Type[Exception],
-                            collections.abc.Iterable[
-                                typing.Type[Exception]]] = Exception):
-    def wrapper(f: typing.Callable[..., typing.Any]):
-        @functools.wraps(f)
-        def inner(*args, **kwargs):
-            tstart: float = time.time()
-            elapsed: float = 0
-            exc = None
-            details = additional_details or "%s failed" % f.__qualname__
-
-            while elapsed < timeout or not timeout:
-                try:
-                    return f(*args, **kwargs)
-                except retried_exceptions as ex:
-                    exc = ex
-                    elapsed = time.time() - tstart
-                    if elapsed > silent_interval:
-                        level = logging.WARNING
-                    else:
-                        level = logging.DEBUG
-                    LOG.log(level,
-                            "Exception: %s. Additional details: %s. "
-                            "Time elapsed: %d. Timeout: %d",
-                            ex, details, elapsed, timeout)
-
-                    time.sleep(retry_interval)
-                    elapsed = time.time() - tstart
-
-            msg = (
-                "Operation timed out. Exception: %s. Additional details: %s. "
-                "Time elapsed: %d. Timeout: %d.")
-            raise CephTestTimeout(
-                msg % (exc, details, elapsed, timeout))
-        return inner
-    return wrapper
-
-
-def execute(*args, **kwargs):
-    LOG.debug("Executing: %s", args)
-    result = subprocess.run(
-        args,
-        stdout=subprocess.PIPE,
-        stderr=subprocess.PIPE,
-        **kwargs)
-    LOG.debug("Command %s returned %d.", args, result.returncode)
-    if result.returncode:
-        exc = CommandFailed(
-            command=args, returncode=result.returncode,
-            stdout=result.stdout, stderr=result.stderr)
-        raise exc
-    return result
-
-
-def ps_execute(*args, **kwargs):
-    # Disable PS progress bar, causes issues when invoked remotely.
-    prefix = "$global:ProgressPreference = 'SilentlyContinue' ; "
-    return execute(
-        "powershell.exe", "-NonInteractive",
-        "-Command", prefix, *args, **kwargs)
-
-
-def array_stats(array: list):
-    mean = sum(array) / len(array) if len(array) else 0
-    variance = (sum((i - mean) ** 2 for i in array) / len(array)
-                if len(array) else 0)
-    std_dev = math.sqrt(variance)
-    sorted_array = sorted(array)
-
-    return {
-        'min': min(array) if len(array) else 0,
-        'max': max(array) if len(array) else 0,
-        'sum': sum(array) if len(array) else 0,
-        'mean': mean,
-        'median': sorted_array[len(array) // 2] if len(array) else 0,
-        'max_90': sorted_array[int(len(array) * 0.9)] if len(array) else 0,
-        'min_90': sorted_array[int(len(array) * 0.1)] if len(array) else 0,
-        'variance': variance,
-        'std_dev': std_dev,
-        'count': len(array)
-    }
-
-
-class Tracer:
-    data: collections.OrderedDict = collections.OrderedDict()
-    lock = threading.Lock()
-
-    @classmethod
-    def trace(cls, func):
-        def wrapper(*args, **kwargs):
-            tstart = time.time()
-            exc_str = None
-
-            # Preserve call order
-            with cls.lock:
-                if func.__qualname__ not in cls.data:
-                    cls.data[func.__qualname__] = list()
-
-            try:
-                return func(*args, **kwargs)
-            except Exception as exc:
-                exc_str = str(exc)
-                raise
-            finally:
-                tend = time.time()
-
-                with cls.lock:
-                    cls.data[func.__qualname__] += [{
-                        "duration": tend - tstart,
-                        "error": exc_str,
-                    }]
-
-        return wrapper
-
-    @classmethod
-    def get_results(cls):
-        stats = collections.OrderedDict()
-        for f in cls.data.keys():
-            stats[f] = array_stats([i['duration'] for i in cls.data[f]])
-            errors = []
-            for i in cls.data[f]:
-                if i['error']:
-                    errors.append(i['error'])
-
-            stats[f]['errors'] = errors
-        return stats
-
-    @classmethod
-    def print_results(cls):
-        r = cls.get_results()
-
-        table = prettytable.PrettyTable(title="Duration (s)")
-        table.field_names = [
-            "function", "min", "max", "total",
-            "mean", "median", "std_dev",
-            "max 90%", "min 90%", "count", "errors"]
-        table.float_format = ".4"
-        for f, s in r.items():
-            table.add_row([f, s['min'], s['max'], s['sum'],
-                           s['mean'], s['median'], s['std_dev'],
-                           s['max_90'], s['min_90'],
-                           s['count'], len(s['errors'])])
-        print(table)
-
-
-class RbdImage(object):
-    def __init__(self,
-                 name: str,
-                 size_mb: int,
-                 is_shared: bool = True,
-                 disk_number: int = -1,
-                 mapped: bool = False):
-        self.name = name
-        self.size_mb = size_mb
-        self.is_shared = is_shared
-        self.disk_number = disk_number
-        self.mapped = mapped
-        self.removed = False
-        self.drive_letter = ""
-
-    @classmethod
-    @Tracer.trace
-    def create(cls,
-               name: str,
-               size_mb: int = 1024,
-               is_shared: bool = True):
-        LOG.info("Creating image: %s. Size: %s.", name, "%sM" % size_mb)
-        cmd = ["rbd", "create", name, "--size", "%sM" % size_mb]
-        if is_shared:
-            cmd += ["--image-shared"]
-        execute(*cmd)
-
-        return RbdImage(name, size_mb, is_shared)
-
-    @Tracer.trace
-    def get_disk_number(self,
-                        timeout: int = 60,
-                        retry_interval: int = 2):
-        @retry_decorator(
-            retried_exceptions=CephTestException,
-            timeout=timeout,
-            retry_interval=retry_interval)
-        def _get_disk_number():
-            LOG.info("Retrieving disk number: %s", self.name)
-
-            result = execute("rbd-wnbd", "show", self.name, "--format=json")
-            disk_info = json.loads(result.stdout)
-            disk_number = disk_info["disk_number"]
-            if disk_number > 0:
-                LOG.debug("Image %s disk number: %d", self.name, disk_number)
-                return disk_number
-
-            raise CephTestException(
-                f"Could not get disk number: {self.name}.")
-
-        return _get_disk_number()
-
-    @Tracer.trace
-    def _wait_for_disk(self,
-                       timeout: int = 60,
-                       retry_interval: int = 2):
-        @retry_decorator(
-            retried_exceptions=(FileNotFoundError, OSError),
-            additional_details="the mapped disk isn't available yet",
-            timeout=timeout,
-            retry_interval=retry_interval)
-        def wait_for_disk():
-            LOG.debug("Waiting for disk to be accessible: %s %s",
-                      self.name, self.path)
-
-            with open(self.path, 'rb'):
-                pass
-
-        return wait_for_disk()
-
-    @property
-    def path(self):
-        return f"\\\\.\\PhysicalDrive{self.disk_number}"
-
-    @Tracer.trace
-    @retry_decorator(additional_details="couldn't clear disk read-only flag")
-    def set_writable(self):
-        ps_execute(
-            "Set-Disk", "-Number", str(self.disk_number),
-            "-IsReadOnly", "$false")
-
-    @Tracer.trace
-    @retry_decorator(additional_details="couldn't bring the disk online")
-    def set_online(self):
-        ps_execute(
-            "Set-Disk", "-Number", str(self.disk_number),
-            "-IsOffline", "$false")
-
-    @Tracer.trace
-    def map(self, timeout: int = 60):
-        LOG.info("Mapping image: %s", self.name)
-        tstart = time.time()
-
-        execute("rbd-wnbd", "map", self.name)
-        self.mapped = True
-
-        self.disk_number = self.get_disk_number(timeout=timeout)
-
-        elapsed = time.time() - tstart
-        self._wait_for_disk(timeout=timeout - elapsed)
-
-    @Tracer.trace
-    def unmap(self):
-        if self.mapped:
-            LOG.info("Unmapping image: %s", self.name)
-            execute("rbd-wnbd", "unmap", self.name)
-            self.mapped = False
-
-    @Tracer.trace
-    @retry_decorator()
-    def remove(self):
-        if not self.removed:
-            LOG.info("Removing image: %s", self.name)
-            execute("rbd", "rm", self.name)
-            self.removed = True
-
-    def cleanup(self):
-        try:
-            self.unmap()
-        finally:
-            self.remove()
-
-    @Tracer.trace
-    @retry_decorator()
-    def _init_disk(self):
-        cmd = f"Get-Disk -Number {self.disk_number} | Initialize-Disk"
-        ps_execute(cmd)
-
-    @Tracer.trace
-    @retry_decorator()
-    def _create_partition(self):
-        cmd = (f"Get-Disk -Number {self.disk_number} | "
-               "New-Partition -AssignDriveLetter -UseMaximumSize")
-        ps_execute(cmd)
-
-    @Tracer.trace
-    @retry_decorator()
-    def _format_volume(self):
-        cmd = (
-            f"(Get-Partition -DiskNumber {self.disk_number}"
-            " | ? { $_.DriveLetter }) | Format-Volume -Force -Confirm:$false")
-        ps_execute(cmd)
-
-    @Tracer.trace
-    @retry_decorator()
-    def _get_drive_letter(self):
-        cmd = (f"(Get-Partition -DiskNumber {self.disk_number}"
-               " | ? { $_.DriveLetter }).DriveLetter")
-        result = ps_execute(cmd)
-
-        # The PowerShell command will place a null character if no drive letter
-        # is available. For example, we can receive "\x00\r\n".
-        self.drive_letter = result.stdout.decode().strip()
-        if not self.drive_letter.isalpha() or len(self.drive_letter) != 1:
-            raise CephTestException(
-                "Invalid drive letter received: %s" % self.drive_letter)
-
-    @Tracer.trace
-    def init_fs(self):
-        if not self.mapped:
-            raise CephTestException("Unable to create fs, image not mapped.")
-
-        LOG.info("Initializing fs, image: %s.", self.name)
-
-        self._init_disk()
-        self._create_partition()
-        self._format_volume()
-        self._get_drive_letter()
-
-    @Tracer.trace
-    def get_fs_capacity(self):
-        if not self.drive_letter:
-            raise CephTestException("No drive letter available")
-
-        cmd = f"(Get-Volume -DriveLetter {self.drive_letter}).Size"
-        result = ps_execute(cmd)
-
-        return int(result.stdout.decode().strip())
-
-    @Tracer.trace
-    def resize(self, new_size_mb, allow_shrink=False):
-        LOG.info(
-            "Resizing image: %s. New size: %s MB, old size: %s MB",
-            self.name, new_size_mb, self.size_mb)
-
-        cmd = ["rbd", "resize", self.name,
-               "--size", f"{new_size_mb}M", "--no-progress"]
-        if allow_shrink:
-            cmd.append("--allow-shrink")
-
-        execute(*cmd)
-
-        self.size_mb = new_size_mb
-
-    @Tracer.trace
-    def get_disk_size(self):
-        """Retrieve the virtual disk size (bytes) reported by Windows."""
-        cmd = f"(Get-Disk -Number {self.disk_number}).Size"
-        result = ps_execute(cmd)
-
-        disk_size = result.stdout.decode().strip()
-        if not disk_size.isdigit():
-            raise CephTestException(
-                "Invalid disk size received: %s" % disk_size)
-
-        return int(disk_size)
-
-    @Tracer.trace
-    @retry_decorator(timeout=30)
-    def wait_for_disk_resize(self):
-        # After resizing the rbd image, the daemon is expected to receive
-        # the notification, inform the WNBD driver and then trigger a disk
-        # rescan (IOCTL_DISK_UPDATE_PROPERTIES). This might take a few seconds,
-        # so we'll need to do some polling.
-        disk_size = self.get_disk_size()
-        disk_size_mb = disk_size // (1 << 20)
-
-        if disk_size_mb != self.size_mb:
-            raise CephTestException(
-                "The disk size hasn't been updated yet. Retrieved size: "
-                f"{disk_size_mb}MB. Expected size: {self.size_mb}MB.")
-
-
-class RbdTest(object):
-    image: RbdImage
-
-    requires_disk_online = False
-    requires_disk_write = False
-
-    def __init__(self,
-                 image_prefix: str = "cephTest-",
-                 image_size_mb: int = 1024,
-                 map_timeout: int = 60,
-                 **kwargs):
-        self.image_size_mb = image_size_mb
-        self.image_name = image_prefix + str(uuid.uuid4())
-        self.map_timeout = map_timeout
-        self.skip_enabling_disk = kwargs.get("skip_enabling_disk")
-
-    @Tracer.trace
-    def initialize(self):
-        self.image = RbdImage.create(
-            self.image_name,
-            self.image_size_mb)
-        self.image.map(timeout=self.map_timeout)
-
-        if not self.skip_enabling_disk:
-            if self.requires_disk_write:
-                self.image.set_writable()
-
-            if self.requires_disk_online:
-                self.image.set_online()
-
-    def run(self):
-        pass
-
-    def cleanup(self):
-        if self.image:
-            self.image.cleanup()
-
-    @classmethod
-    def print_results(cls,
-                      title: str = "Test results",
-                      description: str = ''):
-        pass
-
-
-class RbdFsTestMixin(object):
-    # Windows disks must be turned online before accessing partitions.
-    requires_disk_online = True
-    requires_disk_write = True
-
-    @Tracer.trace
-    def initialize(self):
-        super(RbdFsTestMixin, self).initialize()
-
-        self.image.init_fs()
-
-    def get_subpath(self, *args):
-        drive_path = f"{self.image.drive_letter}:\\"
-        return os.path.join(drive_path, *args)
-
-
-class RbdFsTest(RbdFsTestMixin, RbdTest):
-    pass
-
-
-class RbdFioTest(RbdTest):
-    data: typing.DefaultDict[str, typing.List[typing.Dict[str, str]]] = (
-        collections.defaultdict(list))
-    lock = threading.Lock()
-
-    def __init__(self,
-                 *args,
-                 fio_size_mb: int = 0,
-                 iterations: int = 1,
-                 workers: int = 1,
-                 bs: str = "2M",
-                 iodepth: int = 64,
-                 op: str = "rw",
-                 verify: str = "crc32c",
-                 **kwargs):
-
-        super(RbdFioTest, self).__init__(*args, **kwargs)
-
-        self.fio_size_mb = fio_size_mb or self.image_size_mb
-        self.iterations = iterations
-        self.workers = workers
-        self.bs = bs
-        self.iodepth = iodepth
-        self.op = op
-        if op not in ("read", "randread"):
-            self.requires_disk_write = True
-        self.verify = verify
-
-    def process_result(self, raw_fio_output: str):
-        result = json.loads(raw_fio_output)
-        with self.lock:
-            for job in result["jobs"]:
-                # Fio doesn't support trim on Windows
-                for op in ['read', 'write']:
-                    if op in job:
-                        self.data[op].append({
-                            'error': job['error'],
-                            'io_bytes': job[op]['io_bytes'],
-                            'bw_bytes': job[op]['bw_bytes'],
-                            'runtime': job[op]['runtime'] / 1000,  # seconds
-                            'total_ios': job[op]['short_ios'],
-                            'short_ios': job[op]['short_ios'],
-                            'dropped_ios': job[op]['short_ios'],
-                            'clat_ns_min': job[op]['clat_ns']['min'],
-                            'clat_ns_max': job[op]['clat_ns']['max'],
-                            'clat_ns_mean': job[op]['clat_ns']['mean'],
-                            'clat_ns_stddev': job[op]['clat_ns']['stddev'],
-                            'clat_ns_10': job[op].get('clat_ns', {})
-                                                 .get('percentile', {})
-                                                 .get('10.000000', 0),
-                            'clat_ns_90': job[op].get('clat_ns', {})
-                                                 .get('percentile', {})
-                                                 .get('90.000000', 0)
-                        })
-
-    def _get_fio_path(self):
-        return self.image.path
-
-    @Tracer.trace
-    def _run_fio(self, fio_size_mb: int = 0) -> None:
-        LOG.info("Starting FIO test.")
-        cmd = [
-            "fio", "--thread", "--output-format=json",
-            "--randrepeat=%d" % self.iterations,
-            "--direct=1", "--name=test",
-            "--bs=%s" % self.bs, "--iodepth=%s" % self.iodepth,
-            "--size=%sM" % (fio_size_mb or self.fio_size_mb),
-            "--readwrite=%s" % self.op,
-            "--numjobs=%s" % self.workers,
-            "--filename=%s" % self._get_fio_path(),
-        ]
-        if self.verify:
-            cmd += ["--verify=%s" % self.verify]
-        result = execute(*cmd)
-        LOG.info("Completed FIO test.")
-        self.process_result(result.stdout)
-
-    @Tracer.trace
-    def run(self):
-        self._run_fio()
-
-    @classmethod
-    def print_results(cls,
-                      title: str = "Benchmark results",
-                      description: str = ''):
-        if description:
-            title = "%s (%s)" % (title, description)
-
-        for op in cls.data.keys():
-            op_title = "%s op=%s" % (title, op)
-
-            table = prettytable.PrettyTable(title=op_title)
-            table.field_names = ["stat", "min", "max", "mean",
-                                 "median", "std_dev",
-                                 "max 90%", "min 90%", "total"]
-            table.float_format = ".4"
-
-            op_data = cls.data[op]
-
-            s = array_stats([float(i["bw_bytes"]) / 1000_000 for i in op_data])
-            table.add_row(["bandwidth (MB/s)",
-                           s['min'], s['max'], s['mean'],
-                           s['median'], s['std_dev'],
-                           s['max_90'], s['min_90'], 'N/A'])
-
-            s = array_stats([float(i["runtime"]) for i in op_data])
-            table.add_row(["duration (s)",
-                          s['min'], s['max'], s['mean'],
-                          s['median'], s['std_dev'],
-                          s['max_90'], s['min_90'], s['sum']])
-
-            s = array_stats([i["error"] for i in op_data])
-            table.add_row(["errors",
-                           s['min'], s['max'], s['mean'],
-                           s['median'], s['std_dev'],
-                           s['max_90'], s['min_90'], s['sum']])
-
-            s = array_stats([i["short_ios"] for i in op_data])
-            table.add_row(["incomplete IOs",
-                           s['min'], s['max'], s['mean'],
-                           s['median'], s['std_dev'],
-                           s['max_90'], s['min_90'], s['sum']])
-
-            s = array_stats([i["dropped_ios"] for i in op_data])
-            table.add_row(["dropped IOs",
-                           s['min'], s['max'], s['mean'],
-                           s['median'], s['std_dev'],
-                           s['max_90'], s['min_90'], s['sum']])
-
-            clat_min = array_stats([i["clat_ns_min"] for i in op_data])
-            clat_max = array_stats([i["clat_ns_max"] for i in op_data])
-            clat_mean = array_stats([i["clat_ns_mean"] for i in op_data])
-            clat_stddev = math.sqrt(
-                sum([float(i["clat_ns_stddev"]) ** 2 for i in op_data]) / len(op_data)
-                if len(op_data) else 0)
-            clat_10 = array_stats([i["clat_ns_10"] for i in op_data])
-            clat_90 = array_stats([i["clat_ns_90"] for i in op_data])
-            # For convenience, we'll convert it from ns to seconds.
-            table.add_row(["completion latency (s)",
-                           clat_min['min'] / 1e+9,
-                           clat_max['max'] / 1e+9,
-                           clat_mean['mean'] / 1e+9,
-                           clat_mean['median'] / 1e+9,
-                           clat_stddev / 1e+9,
-                           clat_10['mean'] / 1e+9,
-                           clat_90['mean'] / 1e+9,
-                           clat_mean['sum'] / 1e+9])
-            print(table)
-
-
-class RbdResizeFioTest(RbdFioTest):
-    """Image resize test.
-
-    This test extends and then shrinks the image, performing FIO tests to
-    validate the resized image.
-    """
-
-    @Tracer.trace
-    def run(self):
-        self.image.resize(self.image_size_mb * 2)
-        self.image.wait_for_disk_resize()
-
-        self._run_fio(fio_size_mb=self.image_size_mb * 2)
-
-        self.image.resize(self.image_size_mb // 2, allow_shrink=True)
-        self.image.wait_for_disk_resize()
-
-        self._run_fio(fio_size_mb=self.image_size_mb // 2)
-
-        # Just like rbd-nbd, rbd-wnbd is masking out-of-bounds errors.
-        # For this reason, we don't have a negative test that writes
-        # passed the disk boundary.
-
-
-class RbdFsFioTest(RbdFsTestMixin, RbdFioTest):
-    def initialize(self):
-        super(RbdFsFioTest, self).initialize()
-
-        if not self.fio_size_mb or self.fio_size_mb == self.image_size_mb:
-            # Out of caution, we'll use up to 80% of the FS by default
-            self.fio_size_mb = int(
-                self.image.get_fs_capacity() * 0.8 / (1024 * 1024))
-
-    @staticmethod
-    def _fio_escape_path(path):
-        # FIO allows specifying multiple files separated by colon.
-        # This means that ":" has to be escaped, so
-        # F:\filename becomes F\:\filename.
-        return path.replace(":", "\\:")
-
-    def _get_fio_path(self):
-        return self._fio_escape_path(self.get_subpath("test-fio"))
-
-
-class RbdStampTest(RbdTest):
-    requires_disk_write = True
-
-    _write_open_mode = "rb+"
-    _read_open_mode = "rb"
-    _expect_path_exists = True
-
-    @staticmethod
-    def _rand_float(min_val: float, max_val: float):
-        return min_val + (random.random() * max_val - min_val)
-
-    def _get_stamp(self):
-        buff = self.image_name.encode()
-        padding = 512 - len(buff)
-        buff += b'\0' * padding
-        return buff
-
-    def _get_stamp_path(self):
-        return self.image.path
-
-    @Tracer.trace
-    def _write_stamp(self):
-        with open(self._get_stamp_path(), self._write_open_mode) as disk:
-            stamp = self._get_stamp()
-            disk.write(stamp)
-
-    @Tracer.trace
-    def _read_stamp(self):
-        with open(self._get_stamp_path(), self._read_open_mode) as disk:
-            return disk.read(len(self._get_stamp()))
-
-    @Tracer.trace
-    def run(self):
-        if self._expect_path_exists:
-            # Wait up to 5 seconds and then check the disk, ensuring that
-            # nobody else wrote to it. This is particularly useful when
-            # running a high number of tests in parallel, ensuring that
-            # we aren't writing to the wrong disk.
-            time.sleep(self._rand_float(0, 5))
-
-            stamp = self._read_stamp()
-            assert stamp == b'\0' * len(self._get_stamp())
-
-        self._write_stamp()
-
-        stamp = self._read_stamp()
-        assert stamp == self._get_stamp()
-
-
-class RbdFsStampTest(RbdFsTestMixin, RbdStampTest):
-    _write_open_mode = "wb"
-    _expect_path_exists = False
-
-    def _get_stamp_path(self):
-        return self.get_subpath("test-stamp")
-
-
-class TestRunner(object):
-    def __init__(self,
-                 test_cls: typing.Type[RbdTest],
-                 test_params: dict = {},
-                 iterations: int = 1,
-                 workers: int = 1,
-                 stop_on_error: bool = False,
-                 cleanup_on_error: bool = True):
-        self.test_cls = test_cls
-        self.test_params = test_params
-        self.iterations = iterations
-        self.workers = workers
-        self.executor = futures.ThreadPoolExecutor(max_workers=workers)
-        self.lock = threading.Lock()
-        self.completed = 0
-        self.errors = 0
-        self.stopped = False
-        self.stop_on_error = stop_on_error
-        self.cleanup_on_error = cleanup_on_error
-
-    @Tracer.trace
-    def run(self):
-        tasks = []
-        for i in range(self.iterations):
-            task = self.executor.submit(self.run_single_test)
-            tasks.append(task)
-
-        LOG.info("Waiting for %d tests to complete.", self.iterations)
-        for task in tasks:
-            task.result()
-
-    def run_single_test(self):
-        failed = False
-        if self.stopped:
-            return
-
-        try:
-            test = self.test_cls(**self.test_params)
-            test.initialize()
-            test.run()
-        except KeyboardInterrupt:
-            LOG.warning("Received Ctrl-C.")
-            self.stopped = True
-        except Exception as ex:
-            failed = True
-            if self.stop_on_error:
-                self.stopped = True
-            with self.lock:
-                self.errors += 1
-                LOG.exception(
-                    "Test exception: %s. Total exceptions: %d",
-                    ex, self.errors)
-        finally:
-            if not failed or self.cleanup_on_error:
-                try:
-                    test.cleanup()
-                except KeyboardInterrupt:
-                    LOG.warning("Received Ctrl-C.")
-                    self.stopped = True
-                    # Retry the cleanup
-                    test.cleanup()
-                except Exception:
-                    LOG.exception("Test cleanup failed.")
-
-            with self.lock:
-                self.completed += 1
-                LOG.info("Completed tests: %d. Pending: %d",
-                         self.completed, self.iterations - self.completed)
-
-
-TESTS: typing.Dict[str, typing.Type[RbdTest]] = {
-    'RbdTest': RbdTest,
-    'RbdFioTest': RbdFioTest,
-    'RbdResizeFioTest': RbdResizeFioTest,
-    'RbdStampTest': RbdStampTest,
-    # FS tests
-    'RbdFsTest': RbdFsTest,
-    'RbdFsFioTest': RbdFsFioTest,
-    'RbdFsStampTest': RbdFsStampTest,
-}
-
-if __name__ == '__main__':
-    args = parser.parse_args()
-
-    log_level = logging.WARNING
-    if args.verbose:
-        log_level = logging.INFO
-    if args.debug:
-        log_level = logging.DEBUG
-    setup_logging(log_level)
-
-    test_params = dict(
-        image_size_mb=args.image_size_mb,
-        image_prefix=args.image_prefix,
-        bs=args.bs,
-        op=args.op,
-        verify=args.fio_verify,
-        iodepth=args.fio_depth,
-        map_timeout=args.map_timeout,
-        skip_enabling_disk=args.skip_enabling_disk,
-    )
-
-    try:
-        test_cls = TESTS[args.test_name]
-    except KeyError:
-        raise CephTestException("Unknown test: {}".format(args.test_name))
-
-    runner = TestRunner(
-        test_cls,
-        test_params=test_params,
-        iterations=args.iterations,
-        workers=args.concurrency,
-        stop_on_error=args.stop_on_error,
-        cleanup_on_error=not args.skip_cleanup_on_error)
-    runner.run()
-
-    Tracer.print_results()
-    test_cls.print_results(
-        description="count: %d, concurrency: %d" %
-        (args.iterations, args.concurrency))
-
-    assert runner.errors == 0, f"encountered {runner.errors} error(s)."