From 71caa85b5338d2bc4099e85d0dc26d864edde762 Mon Sep 17 00:00:00 2001 From: Lucian Petrut Date: Tue, 10 Jan 2023 16:50:04 +0200 Subject: [PATCH] qa: reorganize Windows python test We're splitting the rbd-wnbd python test into separate files so that the common code may easily be reused by other tests. This also makes the code easier to read and maintain. Signed-off-by: Lucian Petrut (cherry picked from commit 808d42d575c97b6d0db0e6c6d88ee5fda97fe1b1) --- qa/workunits/windows/py_tests/__init__.py | 0 .../windows/py_tests/internal/__init__.py | 0 .../windows/py_tests/internal/exception.py | 27 + .../windows/py_tests/internal/rbd_image.py | 242 +++++++++ .../windows/py_tests/internal/tracer.py | 75 +++ .../windows/py_tests/internal/utils.py | 119 +++++ .../windows/py_tests/rbd_wnbd/__init__.py | 0 .../rbd_wnbd/stress_test.py} | 487 ++---------------- qa/workunits/windows/run-tests.ps1 | 17 +- 9 files changed, 524 insertions(+), 443 deletions(-) create mode 100644 qa/workunits/windows/py_tests/__init__.py create mode 100644 qa/workunits/windows/py_tests/internal/__init__.py create mode 100644 qa/workunits/windows/py_tests/internal/exception.py create mode 100644 qa/workunits/windows/py_tests/internal/rbd_image.py create mode 100644 qa/workunits/windows/py_tests/internal/tracer.py create mode 100644 qa/workunits/windows/py_tests/internal/utils.py create mode 100644 qa/workunits/windows/py_tests/rbd_wnbd/__init__.py rename qa/workunits/windows/{test_rbd_wnbd.py => py_tests/rbd_wnbd/stress_test.py} (53%) diff --git a/qa/workunits/windows/py_tests/__init__.py b/qa/workunits/windows/py_tests/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/qa/workunits/windows/py_tests/internal/__init__.py b/qa/workunits/windows/py_tests/internal/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/qa/workunits/windows/py_tests/internal/exception.py b/qa/workunits/windows/py_tests/internal/exception.py new file mode 100644 index 00000000000..27a02dbe8cb --- /dev/null +++ b/qa/workunits/windows/py_tests/internal/exception.py @@ -0,0 +1,27 @@ +# Copyright (C) 2023 Cloudbase Solutions +# +# This is free software; you can redistribute it and/or +# modify it under the terms of the GNU Lesser General Public +# License version 2.1, as published by the Free Software +# Foundation (see LICENSE). + +class CephTestException(Exception): + msg_fmt = "An exception has been encountered." + + def __init__(self, message: str = '', **kwargs): + self.kwargs = kwargs + if not message: + message = self.msg_fmt % kwargs + self.message = message + super(CephTestException, self).__init__(message) + + +class CommandFailed(CephTestException): + msg_fmt = ( + "Command failed: %(command)s. " + "Return code: %(returncode)s. " + "Stdout: %(stdout)s. Stderr: %(stderr)s.") + + +class CephTestTimeout(CephTestException): + msg_fmt = "Operation timeout." diff --git a/qa/workunits/windows/py_tests/internal/rbd_image.py b/qa/workunits/windows/py_tests/internal/rbd_image.py new file mode 100644 index 00000000000..be2f2300ff8 --- /dev/null +++ b/qa/workunits/windows/py_tests/internal/rbd_image.py @@ -0,0 +1,242 @@ +# Copyright (C) 2023 Cloudbase Solutions +# +# This is free software; you can redistribute it and/or +# modify it under the terms of the GNU Lesser General Public +# License version 2.1, as published by the Free Software +# Foundation (see LICENSE). + +import json +import logging +import time + +from py_tests.internal import exception +from py_tests.internal.tracer import Tracer +from py_tests.internal import utils + +LOG = logging.getLogger() + + +class RbdImage(object): + def __init__(self, + name: str, + size_mb: int, + is_shared: bool = True, + disk_number: int = -1, + mapped: bool = False): + self.name = name + self.size_mb = size_mb + self.is_shared = is_shared + self.disk_number = disk_number + self.mapped = mapped + self.removed = False + self.drive_letter = "" + + @classmethod + @Tracer.trace + def create(cls, + name: str, + size_mb: int = 1024, + is_shared: bool = True): + LOG.info("Creating image: %s. Size: %s.", name, "%sM" % size_mb) + cmd = ["rbd", "create", name, "--size", "%sM" % size_mb] + if is_shared: + cmd += ["--image-shared"] + utils.execute(*cmd) + + return RbdImage(name, size_mb, is_shared) + + @Tracer.trace + def get_disk_number(self, + timeout: int = 60, + retry_interval: int = 2): + @utils.retry_decorator( + retried_exceptions=exception.CephTestException, + timeout=timeout, + retry_interval=retry_interval) + def _get_disk_number(): + LOG.info("Retrieving disk number: %s", self.name) + + result = utils.execute( + "rbd-wnbd", "show", self.name, "--format=json") + disk_info = json.loads(result.stdout) + disk_number = disk_info["disk_number"] + if disk_number > 0: + LOG.debug("Image %s disk number: %d", self.name, disk_number) + return disk_number + + raise exception.CephTestException( + f"Could not get disk number: {self.name}.") + + return _get_disk_number() + + @Tracer.trace + def _wait_for_disk(self, + timeout: int = 60, + retry_interval: int = 2): + @utils.retry_decorator( + retried_exceptions=(FileNotFoundError, OSError), + additional_details="the mapped disk isn't available yet", + timeout=timeout, + retry_interval=retry_interval) + def wait_for_disk(): + LOG.debug("Waiting for disk to be accessible: %s %s", + self.name, self.path) + + with open(self.path, 'rb'): + pass + + return wait_for_disk() + + @property + def path(self): + return f"\\\\.\\PhysicalDrive{self.disk_number}" + + @Tracer.trace + @utils.retry_decorator( + additional_details="couldn't clear disk read-only flag") + def set_writable(self): + utils.ps_execute( + "Set-Disk", "-Number", str(self.disk_number), + "-IsReadOnly", "$false") + + @Tracer.trace + @utils.retry_decorator(additional_details="couldn't bring the disk online") + def set_online(self): + utils.ps_execute( + "Set-Disk", "-Number", str(self.disk_number), + "-IsOffline", "$false") + + @Tracer.trace + def map(self, timeout: int = 60): + LOG.info("Mapping image: %s", self.name) + tstart = time.time() + + utils.execute("rbd-wnbd", "map", self.name) + self.mapped = True + + self.disk_number = self.get_disk_number(timeout=timeout) + + elapsed = time.time() - tstart + self._wait_for_disk(timeout=timeout - elapsed) + + @Tracer.trace + def unmap(self): + if self.mapped: + LOG.info("Unmapping image: %s", self.name) + utils.execute("rbd-wnbd", "unmap", self.name) + self.mapped = False + + @Tracer.trace + @utils.retry_decorator() + def remove(self): + if not self.removed: + LOG.info("Removing image: %s", self.name) + utils.execute("rbd", "rm", self.name) + self.removed = True + + def cleanup(self): + try: + self.unmap() + finally: + self.remove() + + @Tracer.trace + @utils.retry_decorator() + def _init_disk(self): + cmd = f"Get-Disk -Number {self.disk_number} | Initialize-Disk" + utils.ps_execute(cmd) + + @Tracer.trace + @utils.retry_decorator() + def _create_partition(self): + cmd = (f"Get-Disk -Number {self.disk_number} | " + "New-Partition -AssignDriveLetter -UseMaximumSize") + utils.ps_execute(cmd) + + @Tracer.trace + @utils.retry_decorator() + def _format_volume(self): + cmd = ( + f"(Get-Partition -DiskNumber {self.disk_number}" + " | ? { $_.DriveLetter }) | Format-Volume -Force -Confirm:$false") + utils.ps_execute(cmd) + + @Tracer.trace + @utils.retry_decorator() + def _get_drive_letter(self): + cmd = (f"(Get-Partition -DiskNumber {self.disk_number}" + " | ? { $_.DriveLetter }).DriveLetter") + result = utils.ps_execute(cmd) + + # The PowerShell command will place a null character if no drive letter + # is available. For example, we can receive "\x00\r\n". + self.drive_letter = result.stdout.decode().strip() + if not self.drive_letter.isalpha() or len(self.drive_letter) != 1: + raise exception.CephTestException( + "Invalid drive letter received: %s" % self.drive_letter) + + @Tracer.trace + def init_fs(self): + if not self.mapped: + raise exception.CephTestException( + "Unable to create fs, image not mapped.") + + LOG.info("Initializing fs, image: %s.", self.name) + + self._init_disk() + self._create_partition() + self._format_volume() + self._get_drive_letter() + + @Tracer.trace + def get_fs_capacity(self): + if not self.drive_letter: + raise exception.CephTestException("No drive letter available") + + cmd = f"(Get-Volume -DriveLetter {self.drive_letter}).Size" + result = utils.ps_execute(cmd) + + return int(result.stdout.decode().strip()) + + @Tracer.trace + def resize(self, new_size_mb, allow_shrink=False): + LOG.info( + "Resizing image: %s. New size: %s MB, old size: %s MB", + self.name, new_size_mb, self.size_mb) + + cmd = ["rbd", "resize", self.name, + "--size", f"{new_size_mb}M", "--no-progress"] + if allow_shrink: + cmd.append("--allow-shrink") + + utils.execute(*cmd) + + self.size_mb = new_size_mb + + @Tracer.trace + def get_disk_size(self): + """Retrieve the virtual disk size (bytes) reported by Windows.""" + cmd = f"(Get-Disk -Number {self.disk_number}).Size" + result = utils.ps_execute(cmd) + + disk_size = result.stdout.decode().strip() + if not disk_size.isdigit(): + raise exception.CephTestException( + "Invalid disk size received: %s" % disk_size) + + return int(disk_size) + + @Tracer.trace + @utils.retry_decorator(timeout=30) + def wait_for_disk_resize(self): + # After resizing the rbd image, the daemon is expected to receive + # the notification, inform the WNBD driver and then trigger a disk + # rescan (IOCTL_DISK_UPDATE_PROPERTIES). This might take a few seconds, + # so we'll need to do some polling. + disk_size = self.get_disk_size() + disk_size_mb = disk_size // (1 << 20) + + if disk_size_mb != self.size_mb: + raise exception.CephTestException( + "The disk size hasn't been updated yet. Retrieved size: " + f"{disk_size_mb}MB. Expected size: {self.size_mb}MB.") diff --git a/qa/workunits/windows/py_tests/internal/tracer.py b/qa/workunits/windows/py_tests/internal/tracer.py new file mode 100644 index 00000000000..52a64b7be3c --- /dev/null +++ b/qa/workunits/windows/py_tests/internal/tracer.py @@ -0,0 +1,75 @@ +# Copyright (C) 2023 Cloudbase Solutions +# +# This is free software; you can redistribute it and/or +# modify it under the terms of the GNU Lesser General Public +# License version 2.1, as published by the Free Software +# Foundation (see LICENSE). + +import collections +import prettytable +import threading +import time + +from py_tests.internal import utils + + +class Tracer: + data: collections.OrderedDict = collections.OrderedDict() + lock = threading.Lock() + + @classmethod + def trace(cls, func): + def wrapper(*args, **kwargs): + tstart = time.time() + exc_str = None + + # Preserve call order + with cls.lock: + if func.__qualname__ not in cls.data: + cls.data[func.__qualname__] = list() + + try: + return func(*args, **kwargs) + except Exception as exc: + exc_str = str(exc) + raise + finally: + tend = time.time() + + with cls.lock: + cls.data[func.__qualname__] += [{ + "duration": tend - tstart, + "error": exc_str, + }] + + return wrapper + + @classmethod + def get_results(cls): + stats = collections.OrderedDict() + for f in cls.data.keys(): + stats[f] = utils.array_stats([i['duration'] for i in cls.data[f]]) + errors = [] + for i in cls.data[f]: + if i['error']: + errors.append(i['error']) + + stats[f]['errors'] = errors + return stats + + @classmethod + def print_results(cls): + r = cls.get_results() + + table = prettytable.PrettyTable(title="Duration (s)") + table.field_names = [ + "function", "min", "max", "total", + "mean", "median", "std_dev", + "max 90%", "min 90%", "count", "errors"] + table.float_format = ".4" + for f, s in r.items(): + table.add_row([f, s['min'], s['max'], s['sum'], + s['mean'], s['median'], s['std_dev'], + s['max_90'], s['min_90'], + s['count'], len(s['errors'])]) + print(table) diff --git a/qa/workunits/windows/py_tests/internal/utils.py b/qa/workunits/windows/py_tests/internal/utils.py new file mode 100644 index 00000000000..0fb5d328961 --- /dev/null +++ b/qa/workunits/windows/py_tests/internal/utils.py @@ -0,0 +1,119 @@ +# Copyright (C) 2023 Cloudbase Solutions +# +# This is free software; you can redistribute it and/or +# modify it under the terms of the GNU Lesser General Public +# License version 2.1, as published by the Free Software +# Foundation (see LICENSE). + +import collections +import functools +import logging +import math +import subprocess +import time +import typing + +from py_tests.internal import exception + +LOG = logging.getLogger() + + +def setup_logging(log_level: int = logging.INFO): + handler = logging.StreamHandler() + handler.setLevel(log_level) + + log_fmt = '[%(asctime)s] %(levelname)s - %(message)s' + formatter = logging.Formatter(log_fmt) + handler.setFormatter(formatter) + + LOG.addHandler(handler) + LOG.setLevel(logging.DEBUG) + + +def retry_decorator(timeout: int = 60, + retry_interval: int = 2, + silent_interval: int = 10, + additional_details: str = "", + retried_exceptions: + typing.Union[ + typing.Type[Exception], + collections.abc.Iterable[ + typing.Type[Exception]]] = Exception): + def wrapper(f: typing.Callable[..., typing.Any]): + @functools.wraps(f) + def inner(*args, **kwargs): + tstart: float = time.time() + elapsed: float = 0 + exc = None + details = additional_details or "%s failed" % f.__qualname__ + + while elapsed < timeout or not timeout: + try: + return f(*args, **kwargs) + except retried_exceptions as ex: + exc = ex + elapsed = time.time() - tstart + if elapsed > silent_interval: + level = logging.WARNING + else: + level = logging.DEBUG + LOG.log(level, + "Exception: %s. Additional details: %s. " + "Time elapsed: %d. Timeout: %d", + ex, details, elapsed, timeout) + + time.sleep(retry_interval) + elapsed = time.time() - tstart + + msg = ( + "Operation timed out. Exception: %s. Additional details: %s. " + "Time elapsed: %d. Timeout: %d.") + raise exception.CephTestTimeout( + msg % (exc, details, elapsed, timeout)) + return inner + return wrapper + + +def execute(*args, **kwargs): + LOG.debug("Executing: %s", args) + result = subprocess.run( + args, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + **kwargs) + LOG.debug("Command %s returned %d.", args, result.returncode) + if result.returncode: + exc = exception.CommandFailed( + command=args, returncode=result.returncode, + stdout=result.stdout, stderr=result.stderr) + raise exc + return result + + +def ps_execute(*args, **kwargs): + # Disable PS progress bar, causes issues when invoked remotely. + prefix = "$global:ProgressPreference = 'SilentlyContinue' ; " + return execute( + "powershell.exe", "-NonInteractive", + "-Command", prefix, *args, **kwargs) + + +def array_stats(array: list): + mean = sum(array) / len(array) if len(array) else 0 + variance = (sum((i - mean) ** 2 for i in array) / len(array) + if len(array) else 0) + std_dev = math.sqrt(variance) + sorted_array = sorted(array) + + return { + 'min': min(array) if len(array) else 0, + 'max': max(array) if len(array) else 0, + 'sum': sum(array) if len(array) else 0, + 'mean': mean, + 'median': sorted_array[len(array) // 2] if len(array) else 0, + 'max_90': sorted_array[int(len(array) * 0.9)] if len(array) else 0, + 'min_90': sorted_array[int(len(array) * 0.1)] if len(array) else 0, + 'variance': variance, + 'std_dev': std_dev, + 'count': len(array) + } diff --git a/qa/workunits/windows/py_tests/rbd_wnbd/__init__.py b/qa/workunits/windows/py_tests/rbd_wnbd/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/qa/workunits/windows/test_rbd_wnbd.py b/qa/workunits/windows/py_tests/rbd_wnbd/stress_test.py similarity index 53% rename from qa/workunits/windows/test_rbd_wnbd.py rename to qa/workunits/windows/py_tests/rbd_wnbd/stress_test.py index 41d255a1961..78f9555dd3b 100644 --- a/qa/workunits/windows/test_rbd_wnbd.py +++ b/qa/workunits/windows/py_tests/rbd_wnbd/stress_test.py @@ -1,22 +1,32 @@ +# Copyright (C) 2023 Cloudbase Solutions +# +# This is free software; you can redistribute it and/or +# modify it under the terms of the GNU Lesser General Public +# License version 2.1, as published by the Free Software +# Foundation (see LICENSE). + import argparse import collections -import functools import json import logging import math import os import prettytable import random -import subprocess import time import threading import typing import uuid from concurrent import futures +from py_tests.internal import exception +from py_tests.internal.rbd_image import RbdImage +from py_tests.internal.tracer import Tracer +from py_tests.internal import utils + LOG = logging.getLogger() -parser = argparse.ArgumentParser(description='rbd-wnbd tests') +parser = argparse.ArgumentParser(description='rbd-wnbd stress tests') parser.add_argument('--test-name', help='The test to be run.', default="RbdFioTest") @@ -72,414 +82,6 @@ parser.add_argument('--skip-cleanup-on-error', action='store_true', help='Skip cleanup when hitting errors.') -class CephTestException(Exception): - msg_fmt = "An exception has been encountered." - - def __init__(self, message: str = '', **kwargs): - self.kwargs = kwargs - if not message: - message = self.msg_fmt % kwargs - self.message = message - super(CephTestException, self).__init__(message) - - -class CommandFailed(CephTestException): - msg_fmt = ( - "Command failed: %(command)s. " - "Return code: %(returncode)s. " - "Stdout: %(stdout)s. Stderr: %(stderr)s.") - - -class CephTestTimeout(CephTestException): - msg_fmt = "Operation timeout." - - -def setup_logging(log_level: int = logging.INFO): - handler = logging.StreamHandler() - handler.setLevel(log_level) - - log_fmt = '[%(asctime)s] %(levelname)s - %(message)s' - formatter = logging.Formatter(log_fmt) - handler.setFormatter(formatter) - - LOG.addHandler(handler) - LOG.setLevel(logging.DEBUG) - - -def retry_decorator(timeout: int = 60, - retry_interval: int = 2, - silent_interval: int = 10, - additional_details: str = "", - retried_exceptions: - typing.Union[ - typing.Type[Exception], - collections.abc.Iterable[ - typing.Type[Exception]]] = Exception): - def wrapper(f: typing.Callable[..., typing.Any]): - @functools.wraps(f) - def inner(*args, **kwargs): - tstart: float = time.time() - elapsed: float = 0 - exc = None - details = additional_details or "%s failed" % f.__qualname__ - - while elapsed < timeout or not timeout: - try: - return f(*args, **kwargs) - except retried_exceptions as ex: - exc = ex - elapsed = time.time() - tstart - if elapsed > silent_interval: - level = logging.WARNING - else: - level = logging.DEBUG - LOG.log(level, - "Exception: %s. Additional details: %s. " - "Time elapsed: %d. Timeout: %d", - ex, details, elapsed, timeout) - - time.sleep(retry_interval) - elapsed = time.time() - tstart - - msg = ( - "Operation timed out. Exception: %s. Additional details: %s. " - "Time elapsed: %d. Timeout: %d.") - raise CephTestTimeout( - msg % (exc, details, elapsed, timeout)) - return inner - return wrapper - - -def execute(*args, **kwargs): - LOG.debug("Executing: %s", args) - result = subprocess.run( - args, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE, - **kwargs) - LOG.debug("Command %s returned %d.", args, result.returncode) - if result.returncode: - exc = CommandFailed( - command=args, returncode=result.returncode, - stdout=result.stdout, stderr=result.stderr) - raise exc - return result - - -def ps_execute(*args, **kwargs): - # Disable PS progress bar, causes issues when invoked remotely. - prefix = "$global:ProgressPreference = 'SilentlyContinue' ; " - return execute( - "powershell.exe", "-NonInteractive", - "-Command", prefix, *args, **kwargs) - - -def array_stats(array: list): - mean = sum(array) / len(array) if len(array) else 0 - variance = (sum((i - mean) ** 2 for i in array) / len(array) - if len(array) else 0) - std_dev = math.sqrt(variance) - sorted_array = sorted(array) - - return { - 'min': min(array) if len(array) else 0, - 'max': max(array) if len(array) else 0, - 'sum': sum(array) if len(array) else 0, - 'mean': mean, - 'median': sorted_array[len(array) // 2] if len(array) else 0, - 'max_90': sorted_array[int(len(array) * 0.9)] if len(array) else 0, - 'min_90': sorted_array[int(len(array) * 0.1)] if len(array) else 0, - 'variance': variance, - 'std_dev': std_dev, - 'count': len(array) - } - - -class Tracer: - data: collections.OrderedDict = collections.OrderedDict() - lock = threading.Lock() - - @classmethod - def trace(cls, func): - def wrapper(*args, **kwargs): - tstart = time.time() - exc_str = None - - # Preserve call order - with cls.lock: - if func.__qualname__ not in cls.data: - cls.data[func.__qualname__] = list() - - try: - return func(*args, **kwargs) - except Exception as exc: - exc_str = str(exc) - raise - finally: - tend = time.time() - - with cls.lock: - cls.data[func.__qualname__] += [{ - "duration": tend - tstart, - "error": exc_str, - }] - - return wrapper - - @classmethod - def get_results(cls): - stats = collections.OrderedDict() - for f in cls.data.keys(): - stats[f] = array_stats([i['duration'] for i in cls.data[f]]) - errors = [] - for i in cls.data[f]: - if i['error']: - errors.append(i['error']) - - stats[f]['errors'] = errors - return stats - - @classmethod - def print_results(cls): - r = cls.get_results() - - table = prettytable.PrettyTable(title="Duration (s)") - table.field_names = [ - "function", "min", "max", "total", - "mean", "median", "std_dev", - "max 90%", "min 90%", "count", "errors"] - table.float_format = ".4" - for f, s in r.items(): - table.add_row([f, s['min'], s['max'], s['sum'], - s['mean'], s['median'], s['std_dev'], - s['max_90'], s['min_90'], - s['count'], len(s['errors'])]) - print(table) - - -class RbdImage(object): - def __init__(self, - name: str, - size_mb: int, - is_shared: bool = True, - disk_number: int = -1, - mapped: bool = False): - self.name = name - self.size_mb = size_mb - self.is_shared = is_shared - self.disk_number = disk_number - self.mapped = mapped - self.removed = False - self.drive_letter = "" - - @classmethod - @Tracer.trace - def create(cls, - name: str, - size_mb: int = 1024, - is_shared: bool = True): - LOG.info("Creating image: %s. Size: %s.", name, "%sM" % size_mb) - cmd = ["rbd", "create", name, "--size", "%sM" % size_mb] - if is_shared: - cmd += ["--image-shared"] - execute(*cmd) - - return RbdImage(name, size_mb, is_shared) - - @Tracer.trace - def get_disk_number(self, - timeout: int = 60, - retry_interval: int = 2): - @retry_decorator( - retried_exceptions=CephTestException, - timeout=timeout, - retry_interval=retry_interval) - def _get_disk_number(): - LOG.info("Retrieving disk number: %s", self.name) - - result = execute("rbd-wnbd", "show", self.name, "--format=json") - disk_info = json.loads(result.stdout) - disk_number = disk_info["disk_number"] - if disk_number > 0: - LOG.debug("Image %s disk number: %d", self.name, disk_number) - return disk_number - - raise CephTestException( - f"Could not get disk number: {self.name}.") - - return _get_disk_number() - - @Tracer.trace - def _wait_for_disk(self, - timeout: int = 60, - retry_interval: int = 2): - @retry_decorator( - retried_exceptions=(FileNotFoundError, OSError), - additional_details="the mapped disk isn't available yet", - timeout=timeout, - retry_interval=retry_interval) - def wait_for_disk(): - LOG.debug("Waiting for disk to be accessible: %s %s", - self.name, self.path) - - with open(self.path, 'rb'): - pass - - return wait_for_disk() - - @property - def path(self): - return f"\\\\.\\PhysicalDrive{self.disk_number}" - - @Tracer.trace - @retry_decorator(additional_details="couldn't clear disk read-only flag") - def set_writable(self): - ps_execute( - "Set-Disk", "-Number", str(self.disk_number), - "-IsReadOnly", "$false") - - @Tracer.trace - @retry_decorator(additional_details="couldn't bring the disk online") - def set_online(self): - ps_execute( - "Set-Disk", "-Number", str(self.disk_number), - "-IsOffline", "$false") - - @Tracer.trace - def map(self, timeout: int = 60): - LOG.info("Mapping image: %s", self.name) - tstart = time.time() - - execute("rbd-wnbd", "map", self.name) - self.mapped = True - - self.disk_number = self.get_disk_number(timeout=timeout) - - elapsed = time.time() - tstart - self._wait_for_disk(timeout=timeout - elapsed) - - @Tracer.trace - def unmap(self): - if self.mapped: - LOG.info("Unmapping image: %s", self.name) - execute("rbd-wnbd", "unmap", self.name) - self.mapped = False - - @Tracer.trace - @retry_decorator() - def remove(self): - if not self.removed: - LOG.info("Removing image: %s", self.name) - execute("rbd", "rm", self.name) - self.removed = True - - def cleanup(self): - try: - self.unmap() - finally: - self.remove() - - @Tracer.trace - @retry_decorator() - def _init_disk(self): - cmd = f"Get-Disk -Number {self.disk_number} | Initialize-Disk" - ps_execute(cmd) - - @Tracer.trace - @retry_decorator() - def _create_partition(self): - cmd = (f"Get-Disk -Number {self.disk_number} | " - "New-Partition -AssignDriveLetter -UseMaximumSize") - ps_execute(cmd) - - @Tracer.trace - @retry_decorator() - def _format_volume(self): - cmd = ( - f"(Get-Partition -DiskNumber {self.disk_number}" - " | ? { $_.DriveLetter }) | Format-Volume -Force -Confirm:$false") - ps_execute(cmd) - - @Tracer.trace - @retry_decorator() - def _get_drive_letter(self): - cmd = (f"(Get-Partition -DiskNumber {self.disk_number}" - " | ? { $_.DriveLetter }).DriveLetter") - result = ps_execute(cmd) - - # The PowerShell command will place a null character if no drive letter - # is available. For example, we can receive "\x00\r\n". - self.drive_letter = result.stdout.decode().strip() - if not self.drive_letter.isalpha() or len(self.drive_letter) != 1: - raise CephTestException( - "Invalid drive letter received: %s" % self.drive_letter) - - @Tracer.trace - def init_fs(self): - if not self.mapped: - raise CephTestException("Unable to create fs, image not mapped.") - - LOG.info("Initializing fs, image: %s.", self.name) - - self._init_disk() - self._create_partition() - self._format_volume() - self._get_drive_letter() - - @Tracer.trace - def get_fs_capacity(self): - if not self.drive_letter: - raise CephTestException("No drive letter available") - - cmd = f"(Get-Volume -DriveLetter {self.drive_letter}).Size" - result = ps_execute(cmd) - - return int(result.stdout.decode().strip()) - - @Tracer.trace - def resize(self, new_size_mb, allow_shrink=False): - LOG.info( - "Resizing image: %s. New size: %s MB, old size: %s MB", - self.name, new_size_mb, self.size_mb) - - cmd = ["rbd", "resize", self.name, - "--size", f"{new_size_mb}M", "--no-progress"] - if allow_shrink: - cmd.append("--allow-shrink") - - execute(*cmd) - - self.size_mb = new_size_mb - - @Tracer.trace - def get_disk_size(self): - """Retrieve the virtual disk size (bytes) reported by Windows.""" - cmd = f"(Get-Disk -Number {self.disk_number}).Size" - result = ps_execute(cmd) - - disk_size = result.stdout.decode().strip() - if not disk_size.isdigit(): - raise CephTestException( - "Invalid disk size received: %s" % disk_size) - - return int(disk_size) - - @Tracer.trace - @retry_decorator(timeout=30) - def wait_for_disk_resize(self): - # After resizing the rbd image, the daemon is expected to receive - # the notification, inform the WNBD driver and then trigger a disk - # rescan (IOCTL_DISK_UPDATE_PROPERTIES). This might take a few seconds, - # so we'll need to do some polling. - disk_size = self.get_disk_size() - disk_size_mb = disk_size // (1 << 20) - - if disk_size_mb != self.size_mb: - raise CephTestException( - "The disk size hasn't been updated yet. Retrieved size: " - f"{disk_size_mb}MB. Expected size: {self.size_mb}MB.") - - class RbdTest(object): image: RbdImage @@ -516,6 +118,7 @@ class RbdTest(object): def cleanup(self): if self.image: self.image.cleanup() + self.image = None @classmethod def print_results(cls, @@ -617,7 +220,7 @@ class RbdFioTest(RbdTest): ] if self.verify: cmd += ["--verify=%s" % self.verify] - result = execute(*cmd) + result = utils.execute(*cmd) LOG.info("Completed FIO test.") self.process_result(result.stdout) @@ -643,44 +246,46 @@ class RbdFioTest(RbdTest): op_data = cls.data[op] - s = array_stats([float(i["bw_bytes"]) / 1000_000 for i in op_data]) + s = utils.array_stats( + [float(i["bw_bytes"]) / 1000_000 for i in op_data]) table.add_row(["bandwidth (MB/s)", s['min'], s['max'], s['mean'], s['median'], s['std_dev'], s['max_90'], s['min_90'], 'N/A']) - s = array_stats([float(i["runtime"]) for i in op_data]) + s = utils.array_stats([float(i["runtime"]) for i in op_data]) table.add_row(["duration (s)", s['min'], s['max'], s['mean'], s['median'], s['std_dev'], s['max_90'], s['min_90'], s['sum']]) - s = array_stats([i["error"] for i in op_data]) + s = utils.array_stats([i["error"] for i in op_data]) table.add_row(["errors", s['min'], s['max'], s['mean'], s['median'], s['std_dev'], s['max_90'], s['min_90'], s['sum']]) - s = array_stats([i["short_ios"] for i in op_data]) + s = utils.array_stats([i["short_ios"] for i in op_data]) table.add_row(["incomplete IOs", s['min'], s['max'], s['mean'], s['median'], s['std_dev'], s['max_90'], s['min_90'], s['sum']]) - s = array_stats([i["dropped_ios"] for i in op_data]) + s = utils.array_stats([i["dropped_ios"] for i in op_data]) table.add_row(["dropped IOs", s['min'], s['max'], s['mean'], s['median'], s['std_dev'], s['max_90'], s['min_90'], s['sum']]) - clat_min = array_stats([i["clat_ns_min"] for i in op_data]) - clat_max = array_stats([i["clat_ns_max"] for i in op_data]) - clat_mean = array_stats([i["clat_ns_mean"] for i in op_data]) + clat_min = utils.array_stats([i["clat_ns_min"] for i in op_data]) + clat_max = utils.array_stats([i["clat_ns_max"] for i in op_data]) + clat_mean = utils.array_stats([i["clat_ns_mean"] for i in op_data]) clat_stddev = math.sqrt( - sum([float(i["clat_ns_stddev"]) ** 2 for i in op_data]) / len(op_data) + sum([float(i["clat_ns_stddev"]) ** 2 + for i in op_data]) / len(op_data) if len(op_data) else 0) - clat_10 = array_stats([i["clat_ns_10"] for i in op_data]) - clat_90 = array_stats([i["clat_ns_90"] for i in op_data]) + clat_10 = utils.array_stats([i["clat_ns_10"] for i in op_data]) + clat_90 = utils.array_stats([i["clat_ns_90"] for i in op_data]) # For convenience, we'll convert it from ns to seconds. table.add_row(["completion latency (s)", clat_min['min'] / 1e+9, @@ -744,14 +349,25 @@ class RbdStampTest(RbdTest): _write_open_mode = "rb+" _read_open_mode = "rb" _expect_path_exists = True + _stamp_size = 512 + + def __init__(self, *args, **kwargs): + super(RbdStampTest, self).__init__(*args, **kwargs) + + # We allow running the test repeatedly, for example after a + # remount operation. + self._previous_stamp = b'\0' * self._stamp_size @staticmethod def _rand_float(min_val: float, max_val: float): return min_val + (random.random() * max_val - min_val) def _get_stamp(self): - buff = self.image_name.encode() - padding = 512 - len(buff) + buff_str = self.image_name + "-" + str(uuid.uuid4()) + buff = buff_str.encode() + assert len(buff) <= self._stamp_size + + padding = self._stamp_size - len(buff) buff += b'\0' * padding return buff @@ -759,15 +375,14 @@ class RbdStampTest(RbdTest): return self.image.path @Tracer.trace - def _write_stamp(self): + def _write_stamp(self, stamp): with open(self._get_stamp_path(), self._write_open_mode) as disk: - stamp = self._get_stamp() disk.write(stamp) @Tracer.trace def _read_stamp(self): with open(self._get_stamp_path(), self._read_open_mode) as disk: - return disk.read(len(self._get_stamp())) + return disk.read(self._stamp_size) @Tracer.trace def run(self): @@ -779,12 +394,15 @@ class RbdStampTest(RbdTest): time.sleep(self._rand_float(0, 5)) stamp = self._read_stamp() - assert stamp == b'\0' * len(self._get_stamp()) + assert self._previous_stamp == stamp + + w_stamp = self._get_stamp() + self._write_stamp(w_stamp) - self._write_stamp() + r_stamp = self._read_stamp() + assert w_stamp == r_stamp - stamp = self._read_stamp() - assert stamp == self._get_stamp() + self._previous_stamp = w_stamp class RbdFsStampTest(RbdFsTestMixin, RbdStampTest): @@ -884,7 +502,7 @@ if __name__ == '__main__': log_level = logging.INFO if args.debug: log_level = logging.DEBUG - setup_logging(log_level) + utils.setup_logging(log_level) test_params = dict( image_size_mb=args.image_size_mb, @@ -900,7 +518,8 @@ if __name__ == '__main__': try: test_cls = TESTS[args.test_name] except KeyError: - raise CephTestException("Unknown test: {}".format(args.test_name)) + raise exception.CephTestException( + "Unknown test: {}".format(args.test_name)) runner = TestRunner( test_cls, diff --git a/qa/workunits/windows/run-tests.ps1 b/qa/workunits/windows/run-tests.ps1 index 6d818f4267e..7f44e87acf6 100644 --- a/qa/workunits/windows/run-tests.ps1 +++ b/qa/workunits/windows/run-tests.ps1 @@ -4,7 +4,7 @@ $ErrorActionPreference = "Stop" $scriptLocation = [System.IO.Path]::GetDirectoryName( $myInvocation.MyCommand.Definition) -$testRbdWnbd = "$scriptLocation/test_rbd_wnbd.py" +$env:PYTHONPATH += ";$scriptLocation" function safe_exec() { # Powershell doesn't check the command exit code, we'll need to @@ -16,14 +16,13 @@ function safe_exec() { } } -safe_exec python.exe $testRbdWnbd --test-name RbdTest --iterations 100 -safe_exec python.exe $testRbdWnbd --test-name RbdFioTest --iterations 100 -safe_exec python.exe $testRbdWnbd --test-name RbdStampTest --iterations 100 +safe_exec python.exe -m py_tests.rbd_wnbd.stress_test --test-name RbdTest --iterations 100 +safe_exec python.exe -m py_tests.rbd_wnbd.stress_test --test-name RbdFioTest --iterations 100 +safe_exec python.exe -m py_tests.rbd_wnbd.stress_test --test-name RbdStampTest --iterations 100 # It can take a while to setup the partition (~10s), we'll use fewer iterations. -safe_exec python.exe $testRbdWnbd --test-name RbdFsTest --iterations 4 -safe_exec python.exe $testRbdWnbd --test-name RbdFsFioTest --iterations 4 -safe_exec python.exe $testRbdWnbd --test-name RbdFsStampTest --iterations 4 +safe_exec python.exe -m py_tests.rbd_wnbd.stress_test --test-name RbdFsTest --iterations 4 +safe_exec python.exe -m py_tests.rbd_wnbd.stress_test --test-name RbdFsFioTest --iterations 4 +safe_exec python.exe -m py_tests.rbd_wnbd.stress_test --test-name RbdFsStampTest --iterations 4 -safe_exec python.exe $testRbdWnbd ` - --test-name RbdResizeFioTest --image-size-mb 64 +safe_exec python.exe -m py_tests.rbd_wnbd.stress_test --test-name RbdResizeFioTest --image-size-mb 64 -- 2.39.5