import json
import logging
import math
+import os
import prettytable
import random
import subprocess
self.disk_number = disk_number
self.mapped = mapped
self.removed = False
+ self.drive_letter = ""
@classmethod
@Tracer.trace
finally:
self.remove()
+ @Tracer.trace
+ def init_fs(self):
+ if not self.mapped:
+ raise CephTestException("Unable to create fs, image not mapped.")
+
+ cmd = ("powershell.exe", "-command",
+ f"Get-Disk -Number {self.disk_number} | "
+ "Initialize-Disk -PassThru | "
+ "New-Partition -AssignDriveLetter -UseMaximumSize | "
+ "Format-Volume -Force -Confirm:$false")
+ execute(*cmd)
+
+ # Retrieve the drive letter.
+ cmd = (
+ "powershell.exe", "-command",
+ f"(Get-Partition -DiskNumber {self.disk_number})[0].DriveLetter")
+ result = execute(*cmd)
+
+ self.drive_letter = result.stdout.decode().strip()
+ if len(self.drive_letter) != 1:
+ raise CephTestException(
+ "Invalid drive letter received: %s" % self.drive_letter)
+
+ @Tracer.trace
+ def get_fs_capacity(self):
+ if not self.drive_letter:
+ raise CephTestException("No drive letter available")
+
+ cmd = ("powershell.exe", "-command",
+ f"(Get-Volume -DriveLetter {self.drive_letter}).Size")
+ result = execute(*cmd)
+
+ return int(result.stdout.decode().strip())
+
class RbdTest(object):
image: RbdImage
- # Windows disks must be turned online before accessing partitions.
requires_disk_online = False
requires_disk_write = False
pass
+class RbdFsTestMixin(object):
+ # Windows disks must be turned online before accessing partitions.
+ requires_disk_online = True
+ requires_disk_write = True
+
+ @Tracer.trace
+ def initialize(self):
+ super(RbdFsTestMixin, self).initialize()
+
+ self.image.init_fs()
+
+ def get_subpath(self, *args):
+ drive_path = f"{self.image.drive_letter}:\\"
+ return os.path.join(drive_path, *args)
+
+
+class RbdFsTest(RbdFsTestMixin, RbdTest):
+ pass
+
+
class RbdFioTest(RbdTest):
data: typing.DefaultDict[str, typing.List[typing.Dict[str, str]]] = (
collections.defaultdict(list))
'dropped_ios': job[op]['short_ios'],
})
+ def _get_fio_path(self):
+ return self.image.path
+
@Tracer.trace
def run(self):
LOG.info("Starting FIO test.")
"--size=%sM" % self.fio_size_mb,
"--readwrite=%s" % self.op,
"--numjobs=%s" % self.workers,
- "--filename=%s" % self.image.path,
+ "--filename=%s" % self._get_fio_path(),
]
if self.verify:
cmd += ["--verify=%s" % self.verify]
print(table)
+class RbdFsFioTest(RbdFsTestMixin, RbdFioTest):
+ def initialize(self):
+ super(RbdFsFioTest, self).initialize()
+
+ if not self.fio_size_mb or self.fio_size_mb == self.image_size_mb:
+ # Out of caution, we'll use up to 80% of the FS by default
+ self.fio_size_mb = int(
+ self.image.get_fs_capacity() * 0.8 / (1024 * 1024))
+
+ @staticmethod
+ def _fio_escape_path(path):
+ # FIO allows specifying multiple files separated by colon.
+ # This means that ":" has to be escaped, so
+ # F:\filename becomes F\:\filename.
+ return path.replace(":", "\\:")
+
+ def _get_fio_path(self):
+ return self._fio_escape_path(self.get_subpath("test-fio"))
+
+
class RbdStampTest(RbdTest):
requires_disk_write = True
+ _write_open_mode = "rb+"
+ _read_open_mode = "rb"
+ _expect_path_exists = True
+
@staticmethod
def _rand_float(min_val: float, max_val: float):
return min_val + (random.random() * max_val - min_val)
buff += b'\0' * padding
return buff
+ def _get_stamp_path(self):
+ return self.image.path
+
@Tracer.trace
def _write_stamp(self):
- with open(self.image.path, 'rb+') as disk:
+ with open(self._get_stamp_path(), self._write_open_mode) as disk:
stamp = self._get_stamp()
disk.write(stamp)
@Tracer.trace
def _read_stamp(self):
- with open(self.image.path, 'rb') as disk:
+ with open(self._get_stamp_path(), self._read_open_mode) as disk:
return disk.read(len(self._get_stamp()))
@Tracer.trace
def run(self):
- # Wait up to 5 seconds and then check the disk,
- # ensuring that nobody else wrote to it.
- time.sleep(self._rand_float(0, 5))
- stamp = self._read_stamp()
- assert(stamp == b'\0' * len(self._get_stamp()))
+ if self._expect_path_exists:
+ # Wait up to 5 seconds and then check the disk, ensuring that
+ # nobody else wrote to it. This is particularly useful when
+ # running a high number of tests in parallel, ensuring that
+ # we aren't writing to the wrong disk.
+ time.sleep(self._rand_float(0, 5))
+
+ stamp = self._read_stamp()
+ assert(stamp == b'\0' * len(self._get_stamp()))
self._write_stamp()
assert(stamp == self._get_stamp())
+class RbdFsStampTest(RbdFsTestMixin, RbdStampTest):
+ _write_open_mode = "wb"
+ _expect_path_exists = False
+
+ def _get_stamp_path(self):
+ return self.get_subpath("test-stamp")
+
+
class TestRunner(object):
def __init__(self,
test_cls: typing.Type[RbdTest],
TESTS: typing.Dict[str, typing.Type[RbdTest]] = {
'RbdTest': RbdTest,
'RbdFioTest': RbdFioTest,
- 'RbdStampTest': RbdStampTest
+ 'RbdStampTest': RbdStampTest,
+ # FS tests
+ 'RbdFsTest': RbdFsTest,
+ 'RbdFsFioTest': RbdFsFioTest,
+ 'RbdFsStampTest': RbdFsStampTest,
}
if __name__ == '__main__':