import argparse
import collections
+import functools
import json
import logging
import math
"Stdout: %(stdout)s. Stderr: %(stderr)s.")
+class CephTestTimeout(CephTestException):
+ msg_fmt = "Operation timeout."
+
+
def setup_logging(log_level: int = logging.INFO):
handler = logging.StreamHandler()
handler.setLevel(log_level)
LOG.setLevel(logging.DEBUG)
+def retry_decorator(timeout: int = 60,
+ retry_interval: int = 2,
+ silent_interval: int = 10,
+ additional_details: str = "",
+ retried_exceptions:
+ typing.Union[
+ typing.Type[Exception],
+ collections.abc.Iterable[
+ typing.Type[Exception]]] = Exception):
+ def wrapper(f: typing.Callable[..., typing.Any]):
+ @functools.wraps(f)
+ def inner(*args, **kwargs):
+ tstart: float = time.time()
+ elapsed: float = 0
+ exc = None
+ details = additional_details or "%s failed" % f.__qualname__
+
+ while elapsed < timeout or not timeout:
+ try:
+ return f(*args, **kwargs)
+ except retried_exceptions as ex:
+ exc = ex
+ elapsed = time.time() - tstart
+ if elapsed > silent_interval:
+ level = logging.WARNING
+ else:
+ level = logging.DEBUG
+ LOG.log(level,
+ "Exception: %s. Additional details: %s. "
+ "Time elapsed: %d. Timeout: %d",
+ ex, details, elapsed, timeout)
+
+ time.sleep(retry_interval)
+ elapsed = time.time() - tstart
+
+ msg = (
+ "Operation timed out. Exception: %s. Additional details: %s. "
+ "Time elapsed: %d. Timeout: %d.")
+ raise CephTestTimeout(
+ msg % (exc, details, elapsed, timeout))
+ return inner
+ return wrapper
+
+
def execute(*args, **kwargs):
LOG.debug("Executing: %s", args)
result = subprocess.run(
def get_disk_number(self,
timeout: int = 60,
retry_interval: int = 2):
- tstart: float = time.time()
- elapsed: float = 0
- LOG.info("Retrieving disk number: %s", self.name)
- while elapsed < timeout or not timeout:
+ @retry_decorator(
+ retried_exceptions=CephTestException,
+ timeout=timeout,
+ retry_interval=retry_interval)
+ def _get_disk_number():
+ LOG.info("Retrieving disk number: %s", self.name)
+
result = execute("rbd-wnbd", "show", self.name, "--format=json")
disk_info = json.loads(result.stdout)
disk_number = disk_info["disk_number"]
LOG.debug("Image %s disk number: %d", self.name, disk_number)
return disk_number
- elapsed = time.time() - tstart
- if elapsed > 10:
- level = logging.WARNING
- else:
- level = logging.DEBUG
- LOG.log(
- level,
- "Could not get disk number: %s. Time elapsed: %d. Timeout: %d",
- self.name, elapsed, timeout)
-
- time.sleep(retry_interval)
- elapsed = time.time() - tstart
+ raise CephTestException(
+ f"Could not get disk number: {self.name}.")
- raise CephTestException(
- f"Could not get disk number for {self.name}. "
- f"Time elapsed: {elapsed}. Timeout: {timeout}")
+ return _get_disk_number()
@Tracer.trace
def _wait_for_disk(self,
timeout: int = 60,
retry_interval: int = 2):
- tstart: float = time.time()
- elapsed: float = 0
- LOG.debug("Waiting for disk to be accessible: %s %s",
- self.name, self.path)
- while elapsed < timeout or not timeout:
- try:
- with open(self.path, 'rb') as _:
- return
- except FileNotFoundError:
+ @retry_decorator(
+ retried_exceptions=(FileNotFoundError, OSError),
+ additional_details="the mapped disk isn't available yet",
+ timeout=timeout,
+ retry_interval=retry_interval)
+ def wait_for_disk():
+ LOG.debug("Waiting for disk to be accessible: %s %s",
+ self.name, self.path)
+
+ with open(self.path, 'rb'):
pass
- elapsed = time.time() - tstart
- if elapsed > 10:
- level = logging.WARNING
- else:
- level = logging.DEBUG
- LOG.log(level,
- "The mapped disk isn't accessible yet: %s %s. "
- "Time elapsed: %d. Timeout: %d",
- self.name, self.path, elapsed, timeout)
-
- time.sleep(retry_interval)
- elapsed = time.time() - tstart
-
- raise CephTestException(
- f"The mapped disk isn't accessible yet: {self.name} {self.path}. "
- f"Time elapsed: {elapsed}. Timeout: {timeout}")
+ return wait_for_disk()
@property
def path(self):
return f"\\\\.\\PhysicalDrive{self.disk_number}"
@Tracer.trace
+ @retry_decorator(additional_details="couldn't clear disk read-only flag")
def set_writable(self):
execute(
"powershell.exe", "-command",
"-IsReadOnly", "$false")
@Tracer.trace
+ @retry_decorator(additional_details="couldn't bring the disk online")
def set_online(self):
execute(
"powershell.exe", "-command",