From b52e07cbf3eea9090f7d80b2faee802ce46d0172 Mon Sep 17 00:00:00 2001 From: Lucian Petrut Date: Thu, 22 Dec 2022 15:26:36 +0200 Subject: [PATCH] qa: add some retries to test_rbd_wnbd.py The following operations may fail right after a block device is attached: * retrieving the disk number (can return -1) * opening the disk * setting the disk online or writable For this reason, we'll need to add some retries. For convenience, we're moving the existing retry logic to a separate decorator. Signed-off-by: Lucian Petrut --- qa/workunits/windows/test_rbd_wnbd.py | 116 ++++++++++++++++---------- 1 file changed, 72 insertions(+), 44 deletions(-) diff --git a/qa/workunits/windows/test_rbd_wnbd.py b/qa/workunits/windows/test_rbd_wnbd.py index c926b9c79992a..33572bd81473c 100644 --- a/qa/workunits/windows/test_rbd_wnbd.py +++ b/qa/workunits/windows/test_rbd_wnbd.py @@ -1,5 +1,6 @@ import argparse import collections +import functools import json import logging import math @@ -89,6 +90,10 @@ class CommandFailed(CephTestException): "Stdout: %(stdout)s. Stderr: %(stderr)s.") +class CephTestTimeout(CephTestException): + msg_fmt = "Operation timeout." + + def setup_logging(log_level: int = logging.INFO): handler = logging.StreamHandler() handler.setLevel(log_level) @@ -101,6 +106,50 @@ def setup_logging(log_level: int = logging.INFO): LOG.setLevel(logging.DEBUG) +def retry_decorator(timeout: int = 60, + retry_interval: int = 2, + silent_interval: int = 10, + additional_details: str = "", + retried_exceptions: + typing.Union[ + typing.Type[Exception], + collections.abc.Iterable[ + typing.Type[Exception]]] = Exception): + def wrapper(f: typing.Callable[..., typing.Any]): + @functools.wraps(f) + def inner(*args, **kwargs): + tstart: float = time.time() + elapsed: float = 0 + exc = None + details = additional_details or "%s failed" % f.__qualname__ + + while elapsed < timeout or not timeout: + try: + return f(*args, **kwargs) + except retried_exceptions as ex: + exc = ex + elapsed = time.time() - tstart + if elapsed > silent_interval: + level = logging.WARNING + else: + level = logging.DEBUG + LOG.log(level, + "Exception: %s. Additional details: %s. " + "Time elapsed: %d. Timeout: %d", + ex, details, elapsed, timeout) + + time.sleep(retry_interval) + elapsed = time.time() - tstart + + msg = ( + "Operation timed out. Exception: %s. Additional details: %s. " + "Time elapsed: %d. Timeout: %d.") + raise CephTestTimeout( + msg % (exc, details, elapsed, timeout)) + return inner + return wrapper + + def execute(*args, **kwargs): LOG.debug("Executing: %s", args) result = subprocess.run( @@ -234,10 +283,13 @@ class RbdImage(object): def get_disk_number(self, timeout: int = 60, retry_interval: int = 2): - tstart: float = time.time() - elapsed: float = 0 - LOG.info("Retrieving disk number: %s", self.name) - while elapsed < timeout or not timeout: + @retry_decorator( + retried_exceptions=CephTestException, + timeout=timeout, + retry_interval=retry_interval) + def _get_disk_number(): + LOG.info("Retrieving disk number: %s", self.name) + result = execute("rbd-wnbd", "show", self.name, "--format=json") disk_info = json.loads(result.stdout) disk_number = disk_info["disk_number"] @@ -245,60 +297,35 @@ class RbdImage(object): LOG.debug("Image %s disk number: %d", self.name, disk_number) return disk_number - elapsed = time.time() - tstart - if elapsed > 10: - level = logging.WARNING - else: - level = logging.DEBUG - LOG.log( - level, - "Could not get disk number: %s. Time elapsed: %d. Timeout: %d", - self.name, elapsed, timeout) - - time.sleep(retry_interval) - elapsed = time.time() - tstart + raise CephTestException( + f"Could not get disk number: {self.name}.") - raise CephTestException( - f"Could not get disk number for {self.name}. " - f"Time elapsed: {elapsed}. Timeout: {timeout}") + return _get_disk_number() @Tracer.trace def _wait_for_disk(self, timeout: int = 60, retry_interval: int = 2): - tstart: float = time.time() - elapsed: float = 0 - LOG.debug("Waiting for disk to be accessible: %s %s", - self.name, self.path) - while elapsed < timeout or not timeout: - try: - with open(self.path, 'rb') as _: - return - except FileNotFoundError: + @retry_decorator( + retried_exceptions=(FileNotFoundError, OSError), + additional_details="the mapped disk isn't available yet", + timeout=timeout, + retry_interval=retry_interval) + def wait_for_disk(): + LOG.debug("Waiting for disk to be accessible: %s %s", + self.name, self.path) + + with open(self.path, 'rb'): pass - elapsed = time.time() - tstart - if elapsed > 10: - level = logging.WARNING - else: - level = logging.DEBUG - LOG.log(level, - "The mapped disk isn't accessible yet: %s %s. " - "Time elapsed: %d. Timeout: %d", - self.name, self.path, elapsed, timeout) - - time.sleep(retry_interval) - elapsed = time.time() - tstart - - raise CephTestException( - f"The mapped disk isn't accessible yet: {self.name} {self.path}. " - f"Time elapsed: {elapsed}. Timeout: {timeout}") + return wait_for_disk() @property def path(self): return f"\\\\.\\PhysicalDrive{self.disk_number}" @Tracer.trace + @retry_decorator(additional_details="couldn't clear disk read-only flag") def set_writable(self): execute( "powershell.exe", "-command", @@ -306,6 +333,7 @@ class RbdImage(object): "-IsReadOnly", "$false") @Tracer.trace + @retry_decorator(additional_details="couldn't bring the disk online") def set_online(self): execute( "powershell.exe", "-command", -- 2.39.5