]> git.apps.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
qa: add some retries to test_rbd_wnbd.py
authorLucian Petrut <lpetrut@cloudbasesolutions.com>
Thu, 22 Dec 2022 13:26:36 +0000 (15:26 +0200)
committerLucian Petrut <lpetrut@cloudbasesolutions.com>
Fri, 23 Dec 2022 17:38:23 +0000 (19:38 +0200)
The following operations may fail right after a block device
is attached:

* retrieving the disk number (can return -1)
* opening the disk
* setting the disk online or writable

For this reason, we'll need to add some retries. For convenience,
we're moving the existing retry logic to a separate decorator.

Signed-off-by: Lucian Petrut <lpetrut@cloudbasesolutions.com>
qa/workunits/windows/test_rbd_wnbd.py

index c926b9c79992a9b9685caf1feee8114fca5e36ad..33572bd81473c72bb3476ef6e2acc87bdfc3f031 100644 (file)
@@ -1,5 +1,6 @@
 import argparse
 import collections
+import functools
 import json
 import logging
 import math
@@ -89,6 +90,10 @@ class CommandFailed(CephTestException):
         "Stdout: %(stdout)s. Stderr: %(stderr)s.")
 
 
+class CephTestTimeout(CephTestException):
+    msg_fmt = "Operation timeout."
+
+
 def setup_logging(log_level: int = logging.INFO):
     handler = logging.StreamHandler()
     handler.setLevel(log_level)
@@ -101,6 +106,50 @@ def setup_logging(log_level: int = logging.INFO):
     LOG.setLevel(logging.DEBUG)
 
 
+def retry_decorator(timeout: int = 60,
+                    retry_interval: int = 2,
+                    silent_interval: int = 10,
+                    additional_details: str = "",
+                    retried_exceptions:
+                        typing.Union[
+                            typing.Type[Exception],
+                            collections.abc.Iterable[
+                                typing.Type[Exception]]] = Exception):
+    def wrapper(f: typing.Callable[..., typing.Any]):
+        @functools.wraps(f)
+        def inner(*args, **kwargs):
+            tstart: float = time.time()
+            elapsed: float = 0
+            exc = None
+            details = additional_details or "%s failed" % f.__qualname__
+
+            while elapsed < timeout or not timeout:
+                try:
+                    return f(*args, **kwargs)
+                except retried_exceptions as ex:
+                    exc = ex
+                    elapsed = time.time() - tstart
+                    if elapsed > silent_interval:
+                        level = logging.WARNING
+                    else:
+                        level = logging.DEBUG
+                    LOG.log(level,
+                            "Exception: %s. Additional details: %s. "
+                            "Time elapsed: %d. Timeout: %d",
+                            ex, details, elapsed, timeout)
+
+                    time.sleep(retry_interval)
+                    elapsed = time.time() - tstart
+
+            msg = (
+                "Operation timed out. Exception: %s. Additional details: %s. "
+                "Time elapsed: %d. Timeout: %d.")
+            raise CephTestTimeout(
+                msg % (exc, details, elapsed, timeout))
+        return inner
+    return wrapper
+
+
 def execute(*args, **kwargs):
     LOG.debug("Executing: %s", args)
     result = subprocess.run(
@@ -234,10 +283,13 @@ class RbdImage(object):
     def get_disk_number(self,
                         timeout: int = 60,
                         retry_interval: int = 2):
-        tstart: float = time.time()
-        elapsed: float = 0
-        LOG.info("Retrieving disk number: %s", self.name)
-        while elapsed < timeout or not timeout:
+        @retry_decorator(
+            retried_exceptions=CephTestException,
+            timeout=timeout,
+            retry_interval=retry_interval)
+        def _get_disk_number():
+            LOG.info("Retrieving disk number: %s", self.name)
+
             result = execute("rbd-wnbd", "show", self.name, "--format=json")
             disk_info = json.loads(result.stdout)
             disk_number = disk_info["disk_number"]
@@ -245,60 +297,35 @@ class RbdImage(object):
                 LOG.debug("Image %s disk number: %d", self.name, disk_number)
                 return disk_number
 
-            elapsed = time.time() - tstart
-            if elapsed > 10:
-                level = logging.WARNING
-            else:
-                level = logging.DEBUG
-            LOG.log(
-                level,
-                "Could not get disk number: %s. Time elapsed: %d. Timeout: %d",
-                self.name, elapsed, timeout)
-
-            time.sleep(retry_interval)
-            elapsed = time.time() - tstart
+            raise CephTestException(
+                f"Could not get disk number: {self.name}.")
 
-        raise CephTestException(
-            f"Could not get disk number for {self.name}. "
-            f"Time elapsed: {elapsed}. Timeout: {timeout}")
+        return _get_disk_number()
 
     @Tracer.trace
     def _wait_for_disk(self,
                        timeout: int = 60,
                        retry_interval: int = 2):
-        tstart: float = time.time()
-        elapsed: float = 0
-        LOG.debug("Waiting for disk to be accessible: %s %s",
-                  self.name, self.path)
-        while elapsed < timeout or not timeout:
-            try:
-                with open(self.path, 'rb') as _:
-                    return
-            except FileNotFoundError:
+        @retry_decorator(
+            retried_exceptions=(FileNotFoundError, OSError),
+            additional_details="the mapped disk isn't available yet",
+            timeout=timeout,
+            retry_interval=retry_interval)
+        def wait_for_disk():
+            LOG.debug("Waiting for disk to be accessible: %s %s",
+                      self.name, self.path)
+
+            with open(self.path, 'rb'):
                 pass
 
-            elapsed = time.time() - tstart
-            if elapsed > 10:
-                level = logging.WARNING
-            else:
-                level = logging.DEBUG
-            LOG.log(level,
-                    "The mapped disk isn't accessible yet: %s %s. "
-                    "Time elapsed: %d. Timeout: %d",
-                    self.name, self.path, elapsed, timeout)
-
-            time.sleep(retry_interval)
-            elapsed = time.time() - tstart
-
-        raise CephTestException(
-            f"The mapped disk isn't accessible yet: {self.name} {self.path}. "
-            f"Time elapsed: {elapsed}. Timeout: {timeout}")
+        return wait_for_disk()
 
     @property
     def path(self):
         return f"\\\\.\\PhysicalDrive{self.disk_number}"
 
     @Tracer.trace
+    @retry_decorator(additional_details="couldn't clear disk read-only flag")
     def set_writable(self):
         execute(
             "powershell.exe", "-command",
@@ -306,6 +333,7 @@ class RbdImage(object):
             "-IsReadOnly", "$false")
 
     @Tracer.trace
+    @retry_decorator(additional_details="couldn't bring the disk online")
     def set_online(self):
         execute(
             "powershell.exe", "-command",