--- /dev/null
#!/usr/bin/env bash
set -ex

# ISO sources; both can be overridden from the environment.
WINDOWS_SERVER_2019_ISO_URL=${WINDOWS_SERVER_2019_ISO_URL:-"https://software-download.microsoft.com/download/pr/17763.737.190906-2324.rs5_release_svc_refresh_SERVER_EVAL_x64FRE_en-us_1.iso"}
VIRTIO_WIN_ISO_URL=${VIRTIO_WIN_ISO_URL:-"https://fedorapeople.org/groups/virt/virtio-win/direct-downloads/stable-virtio/virtio-win.iso"}

# Absolute directory containing this script; quoted so paths with
# spaces don't break the command substitutions.
DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"

# Use build_utils.sh from ceph-build (provides retrycmd_if_failure,
# ssh_exec, scp_upload, ...).
curl --retry-max-time 30 --retry 10 -L -o "${DIR}/build_utils.sh" https://raw.githubusercontent.com/ceph/ceph-build/master/scripts/build_utils.sh
source "${DIR}/build_utils.sh"
+
# Restart the Windows VM over SSH and block until it accepts SSH
# commands again. Optional $1 overrides the wait timeout (default 600s).
# Exits the whole script on timeout.
function restart_windows_vm() {
    echo "Restarting Windows VM"
    # Stop sshd first so we don't race a still-running daemon during
    # shutdown.
    ssh_exec "cmd.exe /c 'shutdown.exe /r /t 0 & sc.exe stop sshd'"
    TIMEOUT=${1:-600}
    SECONDS=0
    # Poll until an SSH command succeeds again.
    until ssh_exec hostname; do
        if [[ $SECONDS -gt $TIMEOUT ]]; then
            echo "Timeout waiting for the VM to start"
            exit 1
        fi
        echo "Cannot execute SSH commands yet"
        sleep 10
    done
    echo "Windows VM restarted"
}
+
# Install libvirt with KVM
retrycmd_if_failure 5 0 5m sudo apt-get update
retrycmd_if_failure 5 0 10m sudo apt-get install -y qemu-kvm libvirt-daemon-system libvirt-clients virtinst

# Download ISO images. "-C -" resumes partial downloads across retries.
echo "Downloading virtio-win ISO"
retrycmd_if_failure 5 0 30m curl -C - -L "$VIRTIO_WIN_ISO_URL" -o "${DIR}/virtio-win.iso"
echo "Downloading Windows Server 2019 ISO"
retrycmd_if_failure 5 0 60m curl -C - -L "$WINDOWS_SERVER_2019_ISO_URL" -o "${DIR}/windows-server-2019.iso"

# Create virtual floppy image with the unattended instructions to
# install Windows Server 2019.
echo "Creating floppy image"
qemu-img create -f raw "${DIR}/floppy.img" 1440k
mkfs.msdos -s 1 "${DIR}/floppy.img"
mkdir "${DIR}/floppy"
sudo mount "${DIR}/floppy.img" "${DIR}/floppy"
# Key pair used by the host to SSH into the guest after provisioning.
ssh-keygen -b 2048 -t rsa -f "${DIR}/id_rsa" -q -N ""
sudo cp \
    "${DIR}/autounattend.xml" \
    "${DIR}/first-logon.ps1" \
    "${DIR}/id_rsa.pub" \
    "${DIR}/utils.ps1" \
    "${DIR}/setup.ps1" \
    "${DIR}/floppy/"
sudo umount "${DIR}/floppy"
rmdir "${DIR}/floppy"
+
echo "Starting libvirt VM"
VM_NAME="ceph-win-ltsc2019"
# 50G system disk for the Windows guest.
qemu-img create -f qcow2 "${DIR}/${VM_NAME}.qcow2" 50G
# The floppy carries autounattend.xml, so the Windows installer runs
# fully unattended. The virtio ISO provides the storage/network drivers.
sudo virt-install \
    --name "$VM_NAME" \
    --os-variant win2k19 \
    --boot hd,cdrom \
    --virt-type kvm \
    --graphics spice \
    --cpu host \
    --vcpus 4 \
    --memory 4096 \
    --disk "${DIR}/floppy.img,device=floppy" \
    --disk "${DIR}/${VM_NAME}.qcow2,bus=virtio" \
    --disk "${DIR}/windows-server-2019.iso,device=cdrom" \
    --disk "${DIR}/virtio-win.iso,device=cdrom" \
    --network network=default,model=virtio \
    --controller type=virtio-serial \
    --channel unix,target_type=virtio,name=org.qemu.guest_agent.0 \
    --noautoconsole
+
# SSH settings consumed by ssh_exec / scp_upload from build_utils.sh.
export SSH_USER="administrator"
export SSH_KNOWN_HOSTS_FILE="${DIR}/known_hosts"
export SSH_KEY="${DIR}/id_rsa"

# Wait (up to 30 minutes) for the unattended install to finish and the
# guest to report an IP and accept SSH commands.
SECONDS=0
TIMEOUT=1800
SLEEP_SECS=30
while true; do
    if [[ $SECONDS -gt $TIMEOUT ]]; then
        echo "Timeout waiting for the VM to start"
        exit 1
    fi
    VM_IP=$(sudo virsh domifaddr --source agent --interface Ethernet --full "$VM_NAME" | grep ipv4 | awk '{print $4}' | cut -d '/' -f1) || {
        echo "Retrying in $SLEEP_SECS seconds"
        sleep "$SLEEP_SECS"
        continue
    }
    # The guest agent may answer before an IPv4 lease shows up.
    if [[ -z "$VM_IP" ]]; then
        echo "VM IP not available yet. Retrying in $SLEEP_SECS seconds"
        sleep "$SLEEP_SECS"
        continue
    fi
    ssh-keyscan -H "$VM_IP" &> "$SSH_KNOWN_HOSTS_FILE" || {
        echo "SSH is not reachable yet"
        sleep "$SLEEP_SECS"
        continue
    }
    SSH_ADDRESS=$VM_IP ssh_exec hostname || {
        echo "Cannot execute SSH commands yet"
        sleep "$SLEEP_SECS"
        continue
    }
    break
done
export SSH_ADDRESS=$VM_IP

scp_upload "${DIR}/utils.ps1" /utils.ps1
scp_upload "${DIR}/setup.ps1" /setup.ps1
SSH_TIMEOUT=1h ssh_exec /setup.ps1
+
cd "$DIR"

# Get the helper script to download Chacra builds
retrycmd_if_failure 10 5 1m curl -L -o ./get-chacra-bin.py https://raw.githubusercontent.com/ceph/ceph-win32-tests/master/get-bin.py
chmod +x ./get-chacra-bin.py

# Download latest WNBD build from Chacra
retrycmd_if_failure 10 0 10m ./get-chacra-bin.py --project wnbd --filename wnbd.zip
scp_upload wnbd.zip /wnbd.zip
ssh_exec tar.exe xzvf /wnbd.zip -C /

# Install WNBD driver. The signing certificate must be trusted before
# the driver can be installed on the guest.
ssh_exec Import-Certificate -FilePath /wnbd/driver/wnbd.cer -Cert Cert:\\LocalMachine\\Root
ssh_exec Import-Certificate -FilePath /wnbd/driver/wnbd.cer -Cert Cert:\\LocalMachine\\TrustedPublisher
ssh_exec /wnbd/binaries/wnbd-client.exe install-driver /wnbd/driver/wnbd.inf
# A reboot is needed for the driver installation to take effect.
restart_windows_vm
ssh_exec wnbd-client.exe -v
+
# Download Ceph Windows build from Chacra. The repo line's third field
# is a URL of the form .../<project>/<branch>/<sha1>/..., so after
# splitting on "/" the project, branch and SHA1 are fields 4-6.
CEPH_REPO_FILE="/etc/apt/sources.list.d/ceph.list"
PROJECT=$(cut -d ' ' -f3 "$CEPH_REPO_FILE" | tr '/' ' ' | awk '{print $4}')
BRANCH=$(cut -d ' ' -f3 "$CEPH_REPO_FILE" | tr '/' ' ' | awk '{print $5}')
SHA1=$(cut -d ' ' -f3 "$CEPH_REPO_FILE" | tr '/' ' ' | awk '{print $6}')
retrycmd_if_failure 10 0 10m ./get-chacra-bin.py --project "$PROJECT" --branchname "$BRANCH" --sha1 "$SHA1" --filename ceph.zip

# Install Ceph on Windows
SSH_TIMEOUT=5m scp_upload ./ceph.zip /ceph.zip
SSH_TIMEOUT=10m ssh_exec tar.exe xzvf /ceph.zip -C /
ssh_exec "New-Service -Name ceph-rbd -BinaryPathName 'c:\ceph\rbd-wnbd.exe service'"
ssh_exec Start-Service -Name ceph-rbd
ssh_exec rbd.exe -v

# Setup Ceph configs and directories, mirroring the host's /etc/ceph
# layout into the guest.
ssh_exec mkdir -force /etc/ceph, /var/run/ceph, /var/log/ceph
for conf_file in /etc/ceph/*; do
    [[ -e "$conf_file" ]] || continue
    scp_upload "$conf_file" "$conf_file"
done
+
# Persist the SSH connection details so follow-up jobs can source them.
cat > "${DIR}/connection_info.sh" << EOF
export SSH_USER="${SSH_USER}"
export SSH_KNOWN_HOSTS_FILE="${SSH_KNOWN_HOSTS_FILE}"
export SSH_KEY="${SSH_KEY}"
export SSH_ADDRESS="${SSH_ADDRESS}"
EOF

echo "Windows Server 2019 libvirt testing VM is ready"
--- /dev/null
function Invoke-CommandLine {
    <#
    .SYNOPSIS
    Runs a native command and throws unless its exit code is in
    AllowedExitCodes (default: 0 only).
    .NOTES
    Arguments are split on spaces, so arguments containing spaces are
    not supported.
    #>
    Param(
        [Parameter(Mandatory=$true)]
        [String]$Command,
        [String]$Arguments,
        [Int[]]$AllowedExitCodes=@(0)
    )
    if($Arguments) {
        & $Command $Arguments.Split(" ")
    } else {
        # "".Split(" ") would pass a single empty-string argument to
        # the command, so run it without arguments instead.
        & $Command
    }
    if($LASTEXITCODE -notin $AllowedExitCodes) {
        Throw "$Command $Arguments returned a non zero exit code ${LASTEXITCODE}."
    }
}
+
function Start-ExecuteWithRetry {
    <#
    .SYNOPSIS
    Invokes a script block, retrying on any exception up to
    MaxRetryCount times, sleeping RetryInterval seconds between
    attempts. Returns the script block's result, or rethrows the last
    exception once the retries are exhausted.
    #>
    Param(
        [Parameter(Mandatory=$true)]
        [ScriptBlock]$ScriptBlock,
        [Int]$MaxRetryCount=10,
        [Int]$RetryInterval=3,
        [String]$RetryMessage,
        [Array]$ArgumentList=@()
    )
    # Temporarily relax the error action preference so non-terminating
    # errors inside the block don't abort the retry loop.
    $savedErrorActionPreference = $ErrorActionPreference
    $ErrorActionPreference = "Continue"
    $attempt = 0
    while($true) {
        try {
            $result = Invoke-Command -ScriptBlock $ScriptBlock -ArgumentList $ArgumentList
            $ErrorActionPreference = $savedErrorActionPreference
            return $result
        } catch [System.Exception] {
            $attempt += 1
            if($attempt -gt $MaxRetryCount) {
                $ErrorActionPreference = $savedErrorActionPreference
                Throw $_
            }
            $retryPrefix = "Retry(${attempt}/${MaxRetryCount})"
            if($RetryMessage) {
                Write-Host "${retryPrefix} - $RetryMessage"
            } elseif($_) {
                Write-Host "${retryPrefix} - $($_.ToString())"
            }
            Start-Sleep $RetryInterval
        }
    }
}
+
function Start-FileDownload {
    <#
    .SYNOPSIS
    Downloads URL to Destination using curl.exe, retrying up to
    RetryCount times on failure.
    #>
    Param(
        [Parameter(Mandatory=$true)]
        [String]$URL,
        [Parameter(Mandatory=$true)]
        [String]$Destination,
        [Int]$RetryCount=10
    )
    Write-Output "Downloading $URL to $Destination"
    # The script block resolves $Destination / $URL dynamically when it
    # is invoked from Start-ExecuteWithRetry.
    $downloadBlock = { Invoke-CommandLine -Command "curl.exe" -Arguments "-L -s -o $Destination $URL" }
    Start-ExecuteWithRetry `
        -ScriptBlock $downloadBlock `
        -MaxRetryCount $RetryCount `
        -RetryMessage "Failed to download '${URL}'. Retrying"
    Write-Output "Successfully downloaded."
}
+
function Add-ToPathEnvVar {
    <#
    .SYNOPSIS
    Appends the given directories to the PATH environment variable,
    both persistently (user or machine scope) and for the current
    session. Entries already present are skipped.
    #>
    Param(
        [Parameter(Mandatory=$true)]
        [String[]]$Path,
        [Parameter(Mandatory=$false)]
        [ValidateSet([System.EnvironmentVariableTarget]::User, [System.EnvironmentVariableTarget]::Machine)]
        [System.EnvironmentVariableTarget]$Target=[System.EnvironmentVariableTarget]::Machine
    )
    $persistentEntries = [Environment]::GetEnvironmentVariable("PATH", $Target).Split(';')
    $sessionEntries = $env:PATH.Split(';')
    foreach($entry in $Path) {
        if($entry -notin $persistentEntries) {
            $persistentEntries += $entry
        }
        if($entry -notin $sessionEntries) {
            $sessionEntries += $entry
        }
    }
    $env:PATH = $sessionEntries -join ';'
    [Environment]::SetEnvironmentVariable("PATH", ($persistentEntries -join ';'), $Target)
}
+
function Install-Tool {
    <#
    .SYNOPSIS
    Installs a tool from a remote URL or a local installer path.
    .DESCRIPTION
    MSI packages are run through msiexec.exe; any other installer is
    executed directly. The process exit code is validated against
    AllowedExitCodes, and installers downloaded from a URL are deleted
    afterwards (with retries, since the file may still be locked).
    #>
    [CmdletBinding(DefaultParameterSetName = "URL")]
    Param(
        [Parameter(Mandatory=$true, ParameterSetName = "URL")]
        [String]$URL,
        [Parameter(Mandatory=$true, ParameterSetName = "LocalPath")]
        [String]$LocalPath,
        [Parameter(ParameterSetName = "URL")]
        [Parameter(ParameterSetName = "LocalPath")]
        [String[]]$Params=@(),
        [Parameter(ParameterSetName = "URL")]
        [Parameter(ParameterSetName = "LocalPath")]
        [Int[]]$AllowedExitCodes=@(0)
    )
    PROCESS {
        $installerPath = $LocalPath
        if($PSCmdlet.ParameterSetName -eq "URL") {
            # Download to %TEMP%, keeping the remote file name.
            $installerPath = Join-Path $env:TEMP $URL.Split('/')[-1]
            Start-FileDownload -URL $URL -Destination $installerPath
        }
        Write-Output "Installing ${installerPath}"
        $kwargs = @{
            "FilePath" = $installerPath
            "ArgumentList" = $Params
            "NoNewWindow" = $true
            "PassThru" = $true
            "Wait" = $true
        }
        if((Get-ChildItem $installerPath).Extension -eq '.msi') {
            # MSI packages cannot be executed directly.
            $kwargs["FilePath"] = "msiexec.exe"
            $kwargs["ArgumentList"] = @("/i", $installerPath) + $Params
        }
        $p = Start-Process @kwargs
        if($p.ExitCode -notin $AllowedExitCodes) {
            Throw "Installation failed. Exit code: $($p.ExitCode)"
        }
        if($PSCmdlet.ParameterSetName -eq "URL") {
            # Best-effort removal of the downloaded installer; retried
            # because the file may still be in use right after install.
            Start-ExecuteWithRetry `
                -ScriptBlock { Remove-Item -Force -Path $installerPath -ErrorAction Stop } `
                -RetryMessage "Failed to remove ${installerPath}. Retrying"
        }
    }
}
--- /dev/null
import argparse
import collections
import functools
import json
import logging
import math
import random
import subprocess
import threading
import time
import typing
import uuid
from concurrent import futures

import prettytable
+
# Root logger for this module; handlers are attached by setup_logging().
LOG = logging.getLogger()

# Command-line interface; parsed in the __main__ block below.
parser = argparse.ArgumentParser(description='rbd-wnbd tests')
parser.add_argument('--test-name',
                    help='The test to be run.',
                    default="RbdFioTest")
parser.add_argument('--iterations',
                    help='Total number of test iterations',
                    default=1, type=int)
parser.add_argument('--concurrency',
                    help='The number of tests to run in parallel',
                    default=4, type=int)
parser.add_argument('--fio-iterations',
                    help='Total number of benchmark iterations per disk.',
                    default=1, type=int)
parser.add_argument('--fio-workers',
                    help='Total number of fio workers per disk.',
                    default=1, type=int)
parser.add_argument('--fio-depth',
                    help='The number of concurrent asynchronous operations '
                         'executed per disk',
                    default=64, type=int)
parser.add_argument('--bs',
                    help='Benchmark block size.',
                    default="2M")
parser.add_argument('--op',
                    help='Benchmark operation.',
                    default="read")
parser.add_argument('--image-prefix',
                    help='The image name prefix.',
                    default="cephTest-")
parser.add_argument('--image-size-mb',
                    help='The image size in megabytes.',
                    default=1024, type=int)
parser.add_argument('--map-timeout',
                    help='Image map timeout.',
                    default=60, type=int)
parser.add_argument('--verbose', action='store_true',
                    help='Print info messages.')
parser.add_argument('--debug', action='store_true',
                    help='Print debug messages.')
parser.add_argument('--stop-on-error', action='store_true',
                    help='Stop testing when hitting errors.')
parser.add_argument('--skip-cleanup-on-error', action='store_true',
                    help='Skip cleanup when hitting errors.')
+
+
class CephTestException(Exception):
    """Base exception for this test module.

    When no explicit message is passed, the class-level msg_fmt
    template is interpolated with the keyword arguments.
    """
    msg_fmt = "An exception has been encountered."

    def __init__(self, message: str = None, **kwargs):
        self.kwargs = kwargs
        self.message = message or (self.msg_fmt % kwargs)
        super(CephTestException, self).__init__(self.message)
+
+
class CommandFailed(CephTestException):
    # Raised by execute() when a subprocess exits with a non-zero
    # return code.
    msg_fmt = (
        "Command failed: %(command)s. "
        "Return code: %(returncode)s. "
        "Stdout: %(stdout)s. Stderr: %(stderr)s.")
+
+
def setup_logging(log_level: int = logging.INFO):
    """Attach a stream handler emitting records at log_level.

    The logger itself is kept at DEBUG so that the handler performs
    the filtering; additional handlers may be attached later at a
    different level.
    """
    stream_handler = logging.StreamHandler()
    stream_handler.setLevel(log_level)
    stream_handler.setFormatter(
        logging.Formatter('[%(asctime)s] %(levelname)s - %(message)s'))

    LOG.addHandler(stream_handler)
    LOG.setLevel(logging.DEBUG)
+
+
def execute(*args, **kwargs):
    """Run a command, capturing its output.

    Returns the subprocess.CompletedProcess on success and raises
    CommandFailed (carrying stdout/stderr) on a non-zero exit code.
    """
    LOG.debug("Executing: %s", args)
    result = subprocess.run(
        args,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        **kwargs)
    LOG.debug("Command %s returned %d.", args, result.returncode)
    if not result.returncode:
        return result

    exc = CommandFailed(
        command=args, returncode=result.returncode,
        stdout=result.stdout, stderr=result.stderr)
    LOG.error(exc)
    raise exc
+
+
def array_stats(array: list):
    """Compute summary statistics for a list of numbers.

    Returns a dict with min/max/sum/mean/median, the 90th/10th
    percentile bounds, variance, standard deviation and the element
    count. An empty input yields all-zero statistics.
    """
    count = len(array)
    if not count:
        return {
            'min': 0, 'max': 0, 'sum': 0, 'mean': 0, 'median': 0,
            'max_90': 0, 'min_90': 0, 'variance': 0,
            'std_dev': 0.0, 'count': 0,
        }

    ordered = sorted(array)
    total = sum(array)
    mean = total / count
    variance = sum((value - mean) ** 2 for value in array) / count
    return {
        'min': ordered[0],
        'max': ordered[-1],
        'sum': total,
        'mean': mean,
        'median': ordered[count // 2],
        'max_90': ordered[int(count * 0.9)],
        'min_90': ordered[int(count * 0.1)],
        'variance': variance,
        'std_dev': math.sqrt(variance),
        'count': count,
    }
+
+
+class Tracer:
+ data: collections.OrderedDict = collections.OrderedDict()
+ lock = threading.Lock()
+
+ @classmethod
+ def trace(cls, func):
+ def wrapper(*args, **kwargs):
+ tstart = time.time()
+ exc_str = None
+
+ # Preserve call order
+ with cls.lock:
+ if func.__qualname__ not in cls.data:
+ cls.data[func.__qualname__] = list()
+
+ try:
+ return func(*args, **kwargs)
+ except Exception as exc:
+ exc_str = str(exc)
+ raise
+ finally:
+ tend = time.time()
+
+ with cls.lock:
+ cls.data[func.__qualname__] += [{
+ "duration": tend - tstart,
+ "error": exc_str,
+ }]
+
+ return wrapper
+
+ @classmethod
+ def get_results(cls):
+ stats = collections.OrderedDict()
+ for f in cls.data.keys():
+ stats[f] = array_stats([i['duration'] for i in cls.data[f]])
+ errors = []
+ for i in cls.data[f]:
+ if i['error']:
+ errors.append(i['error'])
+
+ stats[f]['errors'] = errors
+ return stats
+
+ @classmethod
+ def print_results(cls):
+ r = cls.get_results()
+
+ table = prettytable.PrettyTable(title="Duration (s)")
+ table.field_names = [
+ "function", "min", "max", "total",
+ "mean", "median", "std_dev",
+ "max 90%", "min 90%", "count", "errors"]
+ table.float_format = ".4"
+ for f, s in r.items():
+ table.add_row([f, s['min'], s['max'], s['sum'],
+ s['mean'], s['median'], s['std_dev'],
+ s['max_90'], s['min_90'],
+ s['count'], len(s['errors'])])
+ print(table)
+
+
class RbdImage(object):
    """Wrapper around a single RBD image mapped through rbd-wnbd.

    Tracks the map/remove state so that cleanup() stays idempotent.
    """

    def __init__(self,
                 name: str,
                 size_mb: int,
                 is_shared: bool = True,
                 disk_number: int = -1,
                 mapped: bool = False):
        self.name = name
        self.size_mb = size_mb
        self.is_shared = is_shared
        # Windows physical disk number; -1 until the image gets mapped.
        self.disk_number = disk_number
        self.mapped = mapped
        self.removed = False

    @classmethod
    @Tracer.trace
    def create(cls,
               name: str,
               size_mb: int = 1024,
               is_shared: bool = True):
        """Create the RBD image and return it as an unmapped RbdImage."""
        LOG.info("Creating image: %s. Size: %s.", name, "%sM" % size_mb)
        cmd = ["rbd", "create", name, "--size", "%sM" % size_mb]
        if is_shared:
            # Allows concurrent access to the image.
            cmd += ["--image-shared"]
        execute(*cmd)

        return RbdImage(name, size_mb, is_shared)

    @Tracer.trace
    def get_disk_number(self,
                        timeout: int = 60,
                        retry_interval: int = 2):
        """Poll "rbd-wnbd show" until it reports a valid disk number.

        A timeout of 0 retries forever. Raises CephTestException once
        the timeout expires.
        """
        tstart: float = time.time()
        elapsed: float = 0
        LOG.info("Retrieving disk number: %s", self.name)
        while elapsed < timeout or not timeout:
            result = execute("rbd-wnbd", "show", self.name, "--format=json")
            disk_info = json.loads(result.stdout)
            disk_number = disk_info["disk_number"]
            if disk_number > 0:
                LOG.debug("Image %s disk number: %d", self.name, disk_number)
                return disk_number

            elapsed = time.time() - tstart
            # Escalate to a warning after 10s of retrying.
            if elapsed > 10:
                level = logging.WARNING
            else:
                level = logging.DEBUG
            LOG.log(
                level,
                "Could not get disk number: %s. Time elapsed: %d. Timeout: %d",
                self.name, elapsed, timeout)

            time.sleep(retry_interval)
            elapsed = time.time() - tstart

        raise CephTestException(
            f"Could not get disk number for {self.name}. "
            f"Time elapsed: {elapsed}. Timeout: {timeout}")

    @Tracer.trace
    def _wait_for_disk(self,
                       timeout: int = 60,
                       retry_interval: int = 2):
        """Wait until the mapped disk device can be opened for reading.

        A timeout of 0 retries forever. Raises CephTestException once
        the timeout expires.
        """
        tstart: float = time.time()
        elapsed: float = 0
        LOG.debug("Waiting for disk to be accessible: %s %s",
                  self.name, self.path)
        while elapsed < timeout or not timeout:
            try:
                with open(self.path, 'rb') as _:
                    return
            except FileNotFoundError:
                pass

            elapsed = time.time() - tstart
            # Escalate to a warning after 10s of retrying.
            if elapsed > 10:
                level = logging.WARNING
            else:
                level = logging.DEBUG
            LOG.log(level,
                    "The mapped disk isn't accessible yet: %s %s. "
                    "Time elapsed: %d. Timeout: %d",
                    self.name, self.path, elapsed, timeout)

            time.sleep(retry_interval)
            elapsed = time.time() - tstart

        raise CephTestException(
            f"The mapped disk isn't accessible yet: {self.name} {self.path}. "
            f"Time elapsed: {elapsed}. Timeout: {timeout}")

    @property
    def path(self):
        """Raw Windows device path of the mapped disk."""
        return f"\\\\.\\PhysicalDrive{self.disk_number}"

    @Tracer.trace
    def map(self, timeout: int = 60):
        """Map the image and wait for the disk to become accessible.

        The timeout budget is shared between the disk number retrieval
        and the disk accessibility wait.
        """
        LOG.info("Mapping image: %s", self.name)
        tstart = time.time()

        execute("rbd-wnbd", "map", self.name)
        self.mapped = True

        self.disk_number = self.get_disk_number(timeout=timeout)

        elapsed = time.time() - tstart
        self._wait_for_disk(timeout=timeout - elapsed, )

    @Tracer.trace
    def unmap(self):
        """Unmap the image, if it is currently mapped."""
        if self.mapped:
            LOG.info("Unmapping image: %s", self.name)
            execute("rbd-wnbd", "unmap", self.name)
            self.mapped = False

    @Tracer.trace
    def remove(self):
        """Remove the image from the cluster, if not already removed."""
        if not self.removed:
            LOG.info("Removing image: %s", self.name)
            execute("rbd", "rm", self.name)
            self.removed = True

    def cleanup(self):
        """Unmap and remove the image; remove runs even if unmap fails."""
        try:
            self.unmap()
        finally:
            self.remove()
+
+
class RbdTest(object):
    """Base RBD test: provisions and maps an image, then cleans up.

    Subclasses override run() with the actual test payload.
    """
    image: RbdImage

    def __init__(self,
                 image_prefix: str = "cephTest-",
                 image_size_mb: int = 1024,
                 map_timeout: int = 60,
                 **kwargs):
        self.image_size_mb = image_size_mb
        self.map_timeout = map_timeout
        # A random suffix keeps concurrently created images distinct.
        self.image_name = "%s%s" % (image_prefix, uuid.uuid4())

    @Tracer.trace
    def initialize(self):
        """Create the backing image and map it to a local disk."""
        self.image = RbdImage.create(self.image_name, self.image_size_mb)
        self.image.map(timeout=self.map_timeout)

    def run(self):
        """Test payload; overridden by subclasses."""
        pass

    def cleanup(self):
        """Unmap and remove the image, if one was created."""
        if self.image:
            self.image.cleanup()

    @classmethod
    def print_results(cls,
                      title: str = "Test results",
                      description: str = None):
        """No-op; benchmark subclasses print aggregated results."""
        pass
+
+
class RbdFioTest(RbdTest):
    """Benchmarks a mapped RBD image using fio.

    Per-job results are aggregated across all instances in the
    class-level "data" list, guarded by "lock", so print_results() can
    report over every iteration.
    """
    data: typing.List[typing.Dict[str, str]] = []
    lock = threading.Lock()

    def __init__(self,
                 *args,
                 fio_size_mb: int = None,
                 iterations: int = 1,
                 workers: int = 1,
                 bs: str = "2M",
                 iodepth: int = 64,
                 op: str = "read",
                 **kwargs):

        super(RbdFioTest, self).__init__(*args, **kwargs)

        # Default to exercising the whole image.
        self.fio_size_mb = fio_size_mb or self.image_size_mb
        self.iterations = iterations
        self.workers = workers
        self.bs = bs
        self.iodepth = iodepth
        self.op = op

    def process_result(self, raw_fio_output: str):
        """Parse fio's JSON output and append per-job stats to data."""
        result = json.loads(raw_fio_output)
        with self.lock:
            for job in result["jobs"]:
                # Fix for a copy/paste bug: "total_ios" and
                # "dropped_ios" used to report fio's "short_ios" value.
                self.data.append({
                    'error': job['error'],
                    'io_bytes': job[self.op]['io_bytes'],
                    'bw_bytes': job[self.op]['bw_bytes'],
                    'runtime': job[self.op]['runtime'] / 1000,  # seconds
                    'total_ios': job[self.op]['total_ios'],
                    'short_ios': job[self.op]['short_ios'],
                    'dropped_ios': job[self.op]['drop_ios'],
                })

    @Tracer.trace
    def run(self):
        """Run fio against the mapped disk and collect its results."""
        LOG.info("Starting FIO test.")
        cmd = [
            "fio", "--thread", "--output-format=json",
            "--randrepeat=%d" % self.iterations,
            "--direct=1", "--gtod_reduce=1", "--name=test",
            "--bs=%s" % self.bs, "--iodepth=%s" % self.iodepth,
            "--size=%sM" % self.fio_size_mb,
            "--readwrite=%s" % self.op,
            "--numjobs=%s" % self.workers,
            "--filename=%s" % self.image.path,
        ]
        result = execute(*cmd)
        LOG.info("Completed FIO test.")
        self.process_result(result.stdout)

    @classmethod
    def print_results(cls,
                      title: str = "Benchmark results",
                      description: str = None):
        """Print aggregated bandwidth/duration/error statistics."""
        if description:
            title = "%s (%s)" % (title, description)
        table = prettytable.PrettyTable(title=title)
        table.field_names = ["stat", "min", "max", "mean",
                             "median", "std_dev",
                             "max 90%", "min 90%", "total"]
        table.float_format = ".4"

        s = array_stats([float(i["bw_bytes"]) / 1000_000 for i in cls.data])
        table.add_row(["bandwidth (MB/s)",
                       s['min'], s['max'], s['mean'],
                       s['median'], s['std_dev'],
                       s['max_90'], s['min_90'], 'N/A'])

        # "runtime" is already stored in seconds by process_result();
        # the previous extra /1000 under-reported durations 1000x.
        s = array_stats([float(i["runtime"]) for i in cls.data])
        table.add_row(["duration (s)",
                       s['min'], s['max'], s['mean'],
                       s['median'], s['std_dev'],
                       s['max_90'], s['min_90'], s['sum']])

        s = array_stats([i["error"] for i in cls.data])
        table.add_row(["errors",
                       s['min'], s['max'], s['mean'],
                       s['median'], s['std_dev'],
                       s['max_90'], s['min_90'], s['sum']])

        s = array_stats([i["short_ios"] for i in cls.data])
        table.add_row(["incomplete IOs",
                       s['min'], s['max'], s['mean'],
                       s['median'], s['std_dev'],
                       s['max_90'], s['min_90'], s['sum']])

        s = array_stats([i["dropped_ios"] for i in cls.data])
        table.add_row(["dropped IOs",
                       s['min'], s['max'], s['mean'],
                       s['median'], s['std_dev'],
                       s['max_90'], s['min_90'], s['sum']])
        print(table)
+
+
class RbdStampTest(RbdTest):
    """Writes a 512-byte stamp to the image and verifies it reads back.

    Before writing, the test checks the disk still contains zeros,
    which would catch another test accidentally writing to the same
    disk.
    """

    @staticmethod
    def _rand_float(min_val: float, max_val: float):
        """Return a random float in [min_val, max_val).

        Fixed parenthesization: the original expression
        "min_val + (random.random() * max_val - min_val)" simplified
        to "random.random() * max_val", ignoring the lower bound.
        """
        return min_val + (random.random() * (max_val - min_val))

    def _get_stamp(self):
        """Return the image name, zero-padded to a 512-byte sector."""
        buff = self.image_name.encode()
        padding = 512 - len(buff)
        buff += b'\0' * padding
        return buff

    @Tracer.trace
    def _write_stamp(self):
        """Write the stamp at the start of the mapped disk."""
        with open(self.image.path, 'rb+') as disk:
            stamp = self._get_stamp()
            disk.write(stamp)

    @Tracer.trace
    def _read_stamp(self):
        """Read back one stamp-sized chunk from the disk start."""
        with open(self.image.path, 'rb') as disk:
            return disk.read(len(self._get_stamp()))

    @Tracer.trace
    def run(self):
        # Wait up to 5 seconds and then check the disk,
        # ensuring that nobody else wrote to it.
        time.sleep(self._rand_float(0, 5))
        stamp = self._read_stamp()
        assert(stamp == b'\0' * len(self._get_stamp()))

        self._write_stamp()

        stamp = self._read_stamp()
        assert(stamp == self._get_stamp())
+
+
class TestRunner(object):
    """Runs a test class for a number of iterations on a thread pool.

    Tracks completion/error counts; setting stop_on_error makes
    pending iterations bail out early after the first failure.
    """

    def __init__(self,
                 test_cls: typing.Type[RbdTest],
                 test_params: typing.Optional[dict] = None,
                 iterations: int = 1,
                 workers: int = 1,
                 stop_on_error: bool = False,
                 cleanup_on_error: bool = True):
        self.test_cls = test_cls
        # None default avoids the shared mutable default argument
        # pitfall; passing an explicit dict behaves as before.
        self.test_params = test_params or {}
        self.iterations = iterations
        self.workers = workers
        self.executor = futures.ThreadPoolExecutor(max_workers=workers)
        self.lock = threading.Lock()
        self.completed = 0
        self.errors = 0
        self.stopped = False
        self.stop_on_error = stop_on_error
        self.cleanup_on_error = cleanup_on_error

    @Tracer.trace
    def run(self):
        """Submit all iterations and wait for them to finish.

        task.result() re-raises any exception that escaped a worker.
        """
        tasks = []
        for i in range(self.iterations):
            task = self.executor.submit(self.run_single_test)
            tasks.append(task)

        LOG.info("Waiting for %d tests to complete.", self.iterations)
        for task in tasks:
            task.result()

    def run_single_test(self):
        """Run one test instance: construct, initialize, run, cleanup."""
        failed = False
        test = None
        if self.stopped:
            return

        try:
            test = self.test_cls(**self.test_params)
            test.initialize()
            test.run()
        except KeyboardInterrupt:
            LOG.warning("Received Ctrl-C.")
            self.stopped = True
        except Exception as ex:
            failed = True
            if self.stop_on_error:
                self.stopped = True
            with self.lock:
                self.errors += 1
                LOG.exception(
                    "Test exception: %s. Total exceptions: %d",
                    ex, self.errors)
        finally:
            # "test" stays None when the constructor itself raised, in
            # which case there is nothing to clean up (the previous
            # code hit an UnboundLocalError here).
            if test is not None and (not failed or self.cleanup_on_error):
                try:
                    test.cleanup()
                except KeyboardInterrupt:
                    LOG.warning("Received Ctrl-C.")
                    self.stopped = True
                    # Retry the cleanup
                    test.cleanup()
                except Exception:
                    LOG.exception("Test cleanup failed.")

            with self.lock:
                self.completed += 1
                LOG.info("Completed tests: %d. Pending: %d",
                         self.completed, self.iterations - self.completed)
+
+
# Registry of runnable test classes, keyed by the --test-name argument.
TESTS: typing.Dict[str, typing.Type[RbdTest]] = {
    'RbdTest': RbdTest,
    'RbdFioTest': RbdFioTest,
    'RbdStampTest': RbdStampTest
}

if __name__ == '__main__':
    args = parser.parse_args()

    # --debug takes precedence over --verbose.
    log_level = logging.WARNING
    if args.verbose:
        log_level = logging.INFO
    if args.debug:
        log_level = logging.DEBUG
    setup_logging(log_level)

    test_params = dict(
        image_size_mb=args.image_size_mb,
        image_prefix=args.image_prefix,
        bs=args.bs,
        op=args.op,
        iodepth=args.fio_depth,
        map_timeout=args.map_timeout
    )

    try:
        test_cls = TESTS[args.test_name]
    except KeyError:
        # Fixed the "Unkown" typo in the user-facing message.
        raise CephTestException("Unknown test: {}".format(args.test_name))

    runner = TestRunner(
        test_cls,
        test_params=test_params,
        iterations=args.iterations,
        workers=args.concurrency,
        stop_on_error=args.stop_on_error,
        cleanup_on_error=not args.skip_cleanup_on_error)
    runner.run()

    Tracer.print_results()
    test_cls.print_results(
        description="count: %d, concurrency: %d" %
        (args.iterations, args.concurrency))

    assert runner.errors == 0, f"encountered {runner.errors} error(s)."