--- /dev/null
+../.qa/
\ No newline at end of file
--- /dev/null
+../.qa/
\ No newline at end of file
--- /dev/null
+../.qa/
\ No newline at end of file
--- /dev/null
+use_shaman: True
+tasks:
+- install:
+ extra_packages:
+ - nvme-cli
+- cephadm:
+ watchdog_setup:
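+    # the bare key is enough: its presence enables the DaemonWatchdog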
+- cephadm.shell:
+ host.a:
+ # get state before nvmeof deployment
+ - ceph orch status
+ - ceph orch ps
+ - ceph orch host ls
+ - ceph orch device ls
+ - ceph osd lspools
--- /dev/null
+.qa/distros/supported/centos_latest.yaml
\ No newline at end of file
--- /dev/null
+../.qa/
\ No newline at end of file
--- /dev/null
+roles:
+- - host.a
+ - mon.a
+ - mgr.x
+ - osd.0
+ - osd.1
+ - client.0
+ - ceph.nvmeof.nvmeof.a
+- - host.b
+ - mon.b
+ - mon.c
+ - osd.2
+ - osd.3
+ - osd.4
+ - client.1
+ - ceph.nvmeof.nvmeof.b
+- - client.2
+- - client.3
+
+overrides:
+ ceph:
+ conf:
+ mon:
+ # cephadm can take up to 5 minutes to bring up remaining mons
+ mon down mkfs grace: 300
--- /dev/null
+.qa/rbd/conf
\ No newline at end of file
--- /dev/null
+../.qa/
\ No newline at end of file
--- /dev/null
+tasks:
+- nvmeof:
+ client: client.0
+    gw_image: quay.io/ceph/nvmeof:1.2 # set to "default" to use the image cephadm defaults to; change to test a specific nvmeof image, e.g. "quay.io/ceph/nvmeof:latest"
+ rbd:
+ pool_name: mypool
+ image_name_prefix: myimage
+ gateway_config:
+ subsystems_count: 3
+ namespaces_count: 20
+ cli_image: quay.io/ceph/nvmeof-cli:1.2
+
+- cephadm.wait_for_service:
+ service: nvmeof.mypool
+
+- workunit:
+ no_coverage_and_limits: true
+ clients:
+ client.2:
+ - rbd/nvmeof_setup_subsystem.sh
+ env:
+ RBD_POOL: mypool
+ RBD_IMAGE_PREFIX: myimage
+
+- workunit:
+ no_coverage_and_limits: true
+ timeout: 30m
+ clients:
+ client.2:
+ - rbd/nvmeof_basic_tests.sh
+ - rbd/nvmeof_fio_test.sh --start_ns 1 --end_ns 30 --rbd_iostat
+ client.3:
+ - rbd/nvmeof_basic_tests.sh
+ - rbd/nvmeof_fio_test.sh --start_ns 31 --end_ns 60
+ env:
+ RBD_POOL: mypool
+ IOSTAT_INTERVAL: '10'
+ RUNTIME: '600'
--- /dev/null
+../.qa/
\ No newline at end of file
--- /dev/null
+../.qa/
\ No newline at end of file
--- /dev/null
+use_shaman: True
+tasks:
+- install:
+ extra_packages:
+ - nvme-cli
+- cephadm:
+ watchdog_setup:
+- cephadm.shell:
+ host.a:
+ # get state before nvmeof deployment
+ - ceph orch status
+ - ceph orch ps
+ - ceph orch host ls
+ - ceph orch device ls
+ - ceph osd lspools
--- /dev/null
+.qa/distros/supported/centos_latest.yaml
\ No newline at end of file
--- /dev/null
+../.qa/
\ No newline at end of file
--- /dev/null
+roles:
+- - host.a
+ - mon.a
+ - mgr.x
+ - osd.0
+ - osd.1
+ - client.0
+ - ceph.nvmeof.nvmeof.a
+- - host.b
+ - mon.b
+ - osd.2
+ - osd.3
+ - osd.4
+ - client.1
+ - ceph.nvmeof.nvmeof.b
+- - host.c
+ - mon.c
+ - osd.5
+ - osd.6
+ - osd.7
+ - client.2
+ - ceph.nvmeof.nvmeof.c
+- - client.3 # initiator
+
+overrides:
+ ceph:
+ conf:
+ mon:
+ # cephadm can take up to 5 minutes to bring up remaining mons
+ mon down mkfs grace: 300
--- /dev/null
+.qa/rbd/conf
\ No newline at end of file
--- /dev/null
+../.qa/
\ No newline at end of file
--- /dev/null
+tasks:
+- nvmeof:
+ client: client.0
+    gw_image: quay.io/ceph/nvmeof:1.2 # set to "default" to use the image cephadm defaults to; change to test a specific nvmeof image, e.g. "quay.io/ceph/nvmeof:latest"
+ rbd:
+ pool_name: mypool
+ image_name_prefix: myimage
+ gateway_config:
+ subsystems_count: 3
+      namespaces_count: 20 # per subsystem
+ cli_image: quay.io/ceph/nvmeof-cli:1.2
+
+- cephadm.wait_for_service:
+ service: nvmeof.mypool
+
+- workunit:
+ no_coverage_and_limits: true
+ clients:
+ client.3:
+ - rbd/nvmeof_setup_subsystem.sh
+ - rbd/nvmeof_basic_tests.sh
+ env:
+ RBD_POOL: mypool
+ RBD_IMAGE_PREFIX: myimage
--- /dev/null
+../.qa/
\ No newline at end of file
--- /dev/null
+overrides:
+ ceph:
+ log-ignorelist:
+ # mon thrashing
+ - MON_DOWN
+ - mons down
+ - mon down
+ - out of quorum
+ # nvmeof daemon thrashing
+ - CEPHADM_FAILED_DAEMON
+ - is in error state
+ - failed cephadm daemon
+
+tasks:
+- nvmeof.thrash:
+ checker_host: 'client.3'
+ switch_thrashers: True
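+    # paired with mon_thrash below: the two thrashers take turns, thrashing
+    # in parallel and reviving in parallel, with checks between iterations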
+
+- mon_thrash:
+ revive_delay: 60
+ thrash_delay: 60
+ thrash_many: true
+ switch_thrashers: True
+ logger: '[nvmeof.thrasher.mon_thrasher]'
--- /dev/null
+overrides:
+ ceph:
+ log-ignorelist:
+ # nvmeof daemon thrashing
+ - CEPHADM_FAILED_DAEMON
+ - is in error state
+ - failed cephadm daemon
+
+tasks:
+- nvmeof.thrash:
+ checker_host: 'client.3'
--- /dev/null
+../.qa/
\ No newline at end of file
--- /dev/null
+tasks:
+- workunit:
+ no_coverage_and_limits: true
+ timeout: 30m
+ clients:
+ client.3:
+ - rbd/nvmeof_fio_test.sh --rbd_iostat
+ env:
+ RBD_POOL: mypool
+ IOSTAT_INTERVAL: '10'
+ RUNTIME: '600'
+++ /dev/null
-../.qa/
\ No newline at end of file
+++ /dev/null
-../.qa/
\ No newline at end of file
+++ /dev/null
-use_shaman: True
-tasks:
-- install:
-- cephadm:
-- cephadm.shell:
- host.a:
- # get state before nvmeof deployment
- - ceph orch status
- - ceph orch ps
- - ceph orch host ls
- - ceph orch device ls
- - ceph osd lspools
-
+++ /dev/null
-.qa/distros/supported/centos_latest.yaml
\ No newline at end of file
+++ /dev/null
-../.qa/
\ No newline at end of file
+++ /dev/null
-roles:
-- - host.a
- - mon.a
- - mgr.x
- - osd.0
- - osd.1
- - client.0
- - ceph.nvmeof.nvmeof.a
-- - host.b
- - mon.b
- - osd.2
- - osd.3
- - osd.4
- - client.1
- - ceph.nvmeof.nvmeof.b
-- - client.2
-- - client.3
+++ /dev/null
-openstack:
- - machine:
- disk: 40 # GB
- ram: 8000 # MB
- cpus: 1
- volumes: # attached to each instance
- count: 4
- size: 30 # GB
+++ /dev/null
-.qa/rbd/conf
\ No newline at end of file
+++ /dev/null
-../.qa/
\ No newline at end of file
+++ /dev/null
-tasks:
-- nvmeof:
- client: client.0
- version: default # "default" is the image cephadm defaults to; change to test specific nvmeof images, example "latest"
- rbd:
- pool_name: mypool
- image_name_prefix: myimage
- gateway_config:
- namespaces_count: 128
- cli_version: latest
-
-- cephadm.wait_for_service:
- service: nvmeof.mypool
-
-- workunit:
- no_coverage_and_limits: true
- clients:
- client.2:
- - rbd/nvmeof_setup_subsystem.sh
- env:
- RBD_POOL: mypool
- RBD_IMAGE_PREFIX: myimage
-
-- workunit:
- no_coverage_and_limits: true
- timeout: 30m
- clients:
- client.2:
- - rbd/nvmeof_basic_tests.sh
- - rbd/nvmeof_fio_test.sh --start_ns 1 --end_ns 64 --rbd_iostat
- client.3:
- - rbd/nvmeof_basic_tests.sh
- - rbd/nvmeof_fio_test.sh --start_ns 65 --end_ns 128
- env:
- RBD_POOL: mypool
- IOSTAT_INTERVAL: '10'
- RUNTIME: '600'
from teuthology.exceptions import ConfigError, CommandFailedError
from textwrap import dedent
from tasks.cephfs.filesystem import MDSCluster, Filesystem
+from tasks.daemonwatchdog import DaemonWatchdog
from tasks.util import chacra
# these items we use from ceph.py should probably eventually move elsewhere
remote.sudo_write_file(client_keyring, keyring, mode='0644')
yield
+@contextlib.contextmanager
+def watchdog_setup(ctx, config):
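+    """
+    Start a DaemonWatchdog if 'watchdog_setup' is present in the task
+    config; the watchdog watches daemons for failures and tears down the
+    run when one fails unexpectedly.
+    """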
+ if 'watchdog_setup' in config:
+ ctx.ceph[config['cluster']].thrashers = []
+ ctx.ceph[config['cluster']].watchdog = DaemonWatchdog(ctx, config, ctx.ceph[config['cluster']].thrashers)
+ ctx.ceph[config['cluster']].watchdog.start()
+ else:
+ ctx.ceph[config['cluster']].watchdog = None
+ yield
@contextlib.contextmanager
def ceph_initial():
cluster, type_, id_ = teuthology.split_role(role)
ctx.daemons.get_daemon(type_, id_, cluster).stop()
clusters.add(cluster)
-
-# for cluster in clusters:
-# ctx.ceph[cluster].watchdog.stop()
-# ctx.ceph[cluster].watchdog.join()
+
+    for cluster in clusters:
+        if ctx.ceph[cluster].watchdog:
+            ctx.ceph[cluster].watchdog.stop()
+            ctx.ceph[cluster].watchdog.join()
yield
:param ctx: the argparse.Namespace object
:param config: the config dict
+ :param watchdog_setup: start DaemonWatchdog to watch daemons for failures
"""
if config is None:
config = {}
lambda: ceph_clients(ctx=ctx, config=config),
lambda: create_rbd_pool(ctx=ctx, config=config),
lambda: conf_epoch(ctx=ctx, config=config),
+ lambda: watchdog_setup(ctx=ctx, config=config),
):
try:
if config.get('wait-for-healthy', True):
except:
self.logger.exception("ignoring exception:")
daemons = []
- daemons.extend(filter(lambda daemon: daemon.running() and not daemon.proc.finished, self.ctx.daemons.iter_daemons_of_role('osd', cluster=self.cluster)))
- daemons.extend(filter(lambda daemon: daemon.running() and not daemon.proc.finished, self.ctx.daemons.iter_daemons_of_role('mds', cluster=self.cluster)))
- daemons.extend(filter(lambda daemon: daemon.running() and not daemon.proc.finished, self.ctx.daemons.iter_daemons_of_role('mon', cluster=self.cluster)))
- daemons.extend(filter(lambda daemon: daemon.running() and not daemon.proc.finished, self.ctx.daemons.iter_daemons_of_role('rgw', cluster=self.cluster)))
- daemons.extend(filter(lambda daemon: daemon.running() and not daemon.proc.finished, self.ctx.daemons.iter_daemons_of_role('mgr', cluster=self.cluster)))
+ daemons.extend(filter(lambda daemon: not daemon.finished(), self.ctx.daemons.iter_daemons_of_role('osd', cluster=self.cluster)))
+ daemons.extend(filter(lambda daemon: not daemon.finished(), self.ctx.daemons.iter_daemons_of_role('mds', cluster=self.cluster)))
+ daemons.extend(filter(lambda daemon: not daemon.finished(), self.ctx.daemons.iter_daemons_of_role('mon', cluster=self.cluster)))
+ daemons.extend(filter(lambda daemon: not daemon.finished(), self.ctx.daemons.iter_daemons_of_role('rgw', cluster=self.cluster)))
+ daemons.extend(filter(lambda daemon: not daemon.finished(), self.ctx.daemons.iter_daemons_of_role('mgr', cluster=self.cluster)))
for daemon in daemons:
try:
mgrs = self.ctx.daemons.iter_daemons_of_role('mgr', cluster=self.cluster)
daemon_failures = []
- daemon_failures.extend(filter(lambda daemon: daemon.running() and daemon.proc.finished, osds))
- daemon_failures.extend(filter(lambda daemon: daemon.running() and daemon.proc.finished, mons))
- daemon_failures.extend(filter(lambda daemon: daemon.running() and daemon.proc.finished, mdss))
- daemon_failures.extend(filter(lambda daemon: daemon.running() and daemon.proc.finished, rgws))
- daemon_failures.extend(filter(lambda daemon: daemon.running() and daemon.proc.finished, mgrs))
+ daemon_failures.extend(filter(lambda daemon: daemon.finished(), osds))
+ daemon_failures.extend(filter(lambda daemon: daemon.finished(), mons))
+ daemon_failures.extend(filter(lambda daemon: daemon.finished(), mdss))
+ daemon_failures.extend(filter(lambda daemon: daemon.finished(), rgws))
+ daemon_failures.extend(filter(lambda daemon: daemon.finished(), mgrs))
for daemon in daemon_failures:
name = daemon.role + '.' + daemon.id_
import gevent
import json
import math
+from gevent.event import Event
from teuthology import misc as teuthology
from teuthology.contextutil import safe_while
from tasks import ceph_manager
the monitor (default: 10)
thrash_delay Number of seconds to wait in-between
test iterations (default: 0)
+    switch_thrashers: If true, synchronize with the other thrashers: wait
+                      until all thrashers have finished thrashing before
+                      reviving, and until all have finished reviving before
+                      thrashing again. (default: false)
store_thrash Thrash monitor store before killing the monitor being thrashed (default: False)
store_thrash_probability Probability of thrashing a monitor's store
(default: 50)
self.manager = manager
self.manager.wait_for_clean()
- self.stopping = False
+ self.stopping = Event()
self.logger = logger
self.config = config
self.name = name
if self.config is None:
self.config = dict()
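+        # event set during switch_task() so other thrashers can wait for
+        # this thrasher to finish its current iteration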
+ if self.config.get("switch_thrashers"):
+ self.switch_thrasher = Event()
+
""" Test reproducibility """
self.random_seed = self.config.get('seed', None)
"""
Break out of this processes thrashing loop.
"""
- self.stopping = True
+ self.stopping.set()
self.thread.get()
def should_thrash_store(self):
"""
Revive the monitor specified
"""
- self.log('killing mon.{id}'.format(id=mon))
self.log('reviving mon.{id}'.format(id=mon))
self.manager.revive_mon(mon)
# Allow successful completion so gevent doesn't see an exception.
# The DaemonWatchdog will observe the error and tear down the test.
+ def switch_task(self):
+ """
+        Pause the mon thrasher until the other thrashers have finished their
+        current iteration. This keeps multiple thrashers in sync, e.g.:
+        1. thrasher-1 and thrasher-2: thrash daemons in parallel
+        2. thrasher-1 and thrasher-2: revive daemons in parallel
+        so that checks can run after each thrashing and reviving iteration.
+ """
+ if not hasattr(self, 'switch_thrasher'):
+ return
+ self.switch_thrasher.set()
+ thrashers = self.ctx.ceph[self.config.get('cluster')].thrashers
+ for t in thrashers:
+ if not isinstance(t, MonitorThrasher) and hasattr(t, 'switch_thrasher') and (
+ isinstance(t.stopping, Event) and not t.stopping.is_set()
+ ):
+ other_thrasher = t
+                self.log('switch_task: waiting for the other thrashers')
+ other_thrasher.switch_thrasher.wait(300)
+ self.log('switch_task: done waiting for the other thrasher')
+ other_thrasher.switch_thrasher.clear()
+
def _do_thrash(self):
"""
Continuously loop and thrash the monitors.
fp=self.freeze_mon_probability,fd=self.freeze_mon_duration,
))
- while not self.stopping:
+ while not self.stopping.is_set():
mons = _get_mons(self.ctx)
self.manager.wait_for_mon_quorum_size(len(mons))
self.log('making sure all monitors are in the quorum')
delay=self.revive_delay))
time.sleep(self.revive_delay)
+ self.switch_task()
+
for mon in mons_to_kill:
self.revive_mon(mon)
# do more freezes
delay=self.thrash_delay))
time.sleep(self.thrash_delay)
+ self.switch_task()
+
#status after thrashing
if self.mds_failover:
status = self.mds_cluster.status()
if 'cluster' not in config:
config['cluster'] = 'ceph'
+ logger = config.get('logger', 'mon_thrasher')
+
log.info('Beginning mon_thrash...')
first_mon = teuthology.get_first_mon(ctx, config)
(mon,) = ctx.cluster.only(first_mon).remotes.keys()
)
thrash_proc = MonitorThrasher(ctx,
manager, config, "MonitorThrasher",
- logger=log.getChild('mon_thrasher'))
+ logger=log.getChild(logger))
ctx.ceph[config['cluster']].thrashers.append(thrash_proc)
try:
log.debug('Yielding')
import logging
+import random
+import time
+from collections import defaultdict
+from datetime import datetime
from textwrap import dedent
+from gevent.event import Event
+from gevent.greenlet import Greenlet
from teuthology.task import Task
from teuthology import misc
from teuthology.exceptions import ConfigError
+from teuthology.orchestra import run
from tasks.util import get_remote_for_role
from tasks.cephadm import _shell
+from tasks.thrasher import Thrasher
log = logging.getLogger(__name__)
self.set_gateway_cfg()
def _set_defaults(self):
- self.gateway_image = self.config.get('version', 'default')
+ self.gateway_image = self.config.get('gw_image', 'default')
rbd_config = self.config.get('rbd', {})
self.poolname = rbd_config.get('pool_name', 'mypool')
self.rbd_size = rbd_config.get('rbd_size', 1024*8)
gateway_config = self.config.get('gateway_config', {})
- self.namespaces_count = gateway_config.get('namespaces_count', 1)
- self.cli_image = gateway_config.get('cli_version', 'latest')
+ self.cli_image = gateway_config.get('cli_image', 'quay.io/ceph/nvmeof-cli:latest')
+ self.nqn_prefix = gateway_config.get('subsystem_nqn_prefix', 'nqn.2016-06.io.spdk:cnode')
+ self.subsystems_count = gateway_config.get('subsystems_count', 1)
+        self.namespaces_count = gateway_config.get('namespaces_count', 1) # namespaces per subsystem
self.bdev = gateway_config.get('bdev', 'mybdev')
self.serial = gateway_config.get('serial', 'SPDK00000000000001')
- self.nqn = gateway_config.get('nqn', 'nqn.2016-06.io.spdk:cnode1')
self.port = gateway_config.get('port', '4420')
self.srport = gateway_config.get('srport', '5500')
daemons[role] = (remote, id_)
if nodes:
- image = self.gateway_image
- if (image != "default"):
- log.info(f'[nvmeof]: ceph config set mgr mgr/cephadm/container_image_nvmeof quay.io/ceph/nvmeof:{image}')
+ gw_image = self.gateway_image
+ if (gw_image != "default"):
+ log.info(f'[nvmeof]: ceph config set mgr mgr/cephadm/container_image_nvmeof {gw_image}')
_shell(self.ctx, self.cluster_name, self.remote, [
'ceph', 'config', 'set', 'mgr',
'mgr/cephadm/container_image_nvmeof',
- f'quay.io/ceph/nvmeof:{image}'
+ gw_image
])
poolname = self.poolname
'--placement', str(len(nodes)) + ';' + ';'.join(nodes)
])
- log.info(f'[nvmeof]: creating {self.namespaces_count} images')
- for i in range(1, int(self.namespaces_count) + 1):
+ total_images = int(self.namespaces_count) * int(self.subsystems_count)
+ log.info(f'[nvmeof]: creating {total_images} images')
+ for i in range(1, total_images + 1):
imagename = self.image_name_prefix + str(i)
log.info(f'[nvmeof]: rbd create {poolname}/{imagename} --size {self.rbd_size}')
_shell(self.ctx, self.cluster_name, self.remote, [
gateway_ips = []
nvmeof_daemons = self.ctx.daemons.iter_daemons_of_role('nvmeof', cluster=self.cluster_name)
for daemon in nvmeof_daemons:
- gateway_names += [daemon.name()]
+ gateway_names += [daemon.remote.shortname]
gateway_ips += [daemon.remote.ip_address]
conf_data = dedent(f"""
NVMEOF_GATEWAY_IP_ADDRESSES={",".join(gateway_ips)}
NVMEOF_GATEWAY_NAMES={",".join(gateway_names)}
NVMEOF_DEFAULT_GATEWAY_IP_ADDRESS={ip_address}
- NVMEOF_CLI_IMAGE="quay.io/ceph/nvmeof-cli:{self.cli_image}"
+ NVMEOF_CLI_IMAGE="{self.cli_image}"
+ NVMEOF_SUBSYSTEMS_PREFIX={self.nqn_prefix}
+ NVMEOF_SUBSYSTEMS_COUNT={self.subsystems_count}
NVMEOF_NAMESPACES_COUNT={self.namespaces_count}
- NVMEOF_NQN={self.nqn}
NVMEOF_PORT={self.port}
NVMEOF_SRPORT={self.srport}
""")
log.info("[nvmeof]: executed set_gateway_cfg successfully!")
+class NvmeofThrasher(Thrasher, Greenlet):
+ """
+ How it works::
+
+    - pick an nvmeof daemon
+    - kill it
+    - sleep for 'revive_delay' seconds
+    - do some checks after thrashing ('do_checks' method)
+    - wait for the other thrashers to finish thrashing (if switch_thrashers is set)
+    - revive daemons
+    - sleep for 'thrash_delay' seconds
+    - do some checks after reviving ('do_checks' method)
+    - wait for the other thrashers to finish reviving (if switch_thrashers is set)
+
+
+ Options::
+
+ seed Seed to use on the RNG to reproduce a previous
+ behavior (default: None; i.e., not set)
+ checker_host: Initiator client on which verification tests would
+ run during thrashing (mandatory option)
+    switch_thrashers: If true, synchronize with the other thrashers: wait
+                      until all thrashers have finished thrashing before
+                      reviving, and until all have finished reviving before
+                      thrashing again. (default: false)
+    randomize: Randomize the delays between their min/max values. (default: true)
+ max_thrash: Maximum number of daemons that can be thrashed at a time.
+ (default: num_of_daemons-1, minimum of 1 daemon should be up)
+ min_thrash_delay: Minimum number of seconds to delay before thrashing again.
+ (default: 60)
+ max_thrash_delay: Maximum number of seconds to delay before thrashing again.
+ (default: min_thrash_delay + 30)
+ min_revive_delay: Minimum number of seconds to delay before bringing back a
+ thrashed daemon. (default: 100)
+ max_revive_delay: Maximum number of seconds to delay before bringing back a
+ thrashed daemon. (default: min_revive_delay + 30)
+
+    daemon_max_thrash_times:
+        For now, NVMeoF daemons have a limitation: each daemon may be
+        thrashed only 3 times in a span of 30 mins. This option sets how
+        many times each daemon may be thrashed within the period set by
+        daemon_max_thrash_period. (default: 3)
+    daemon_max_thrash_period:
+        Goes with the option above: the period of time, in seconds, over
+        which each daemon may be thrashed at most daemon_max_thrash_times
+        times. (default: 1800, i.e. 30 mins)
+
+
+    For example::
+
+        tasks:
+        - nvmeof.thrash:
+            checker_host: 'client.3'
+            switch_thrashers: True
+
+        - mon_thrash:
+            switch_thrashers: True
+
+        - workunit:
+            clients:
+                client.3:
+                - rbd/nvmeof_fio_test.sh --rbd_iostat
+            env:
+                RBD_POOL: mypool
+                IOSTAT_INTERVAL: '10'
+
+ """
+ def __init__(self, ctx, config, daemons) -> None:
+ super(NvmeofThrasher, self).__init__()
+
+        if config is None:
+            config = dict()
+        self.config = config
+ self.ctx = ctx
+ self.daemons = daemons
+ self.logger = log.getChild('[nvmeof.thrasher]')
+ self.stopping = Event()
+ if self.config.get("switch_thrashers"):
+ self.switch_thrasher = Event()
+ self.checker_host = get_remote_for_role(self.ctx, self.config.get('checker_host'))
+ self.devices = self._get_devices(self.checker_host)
+
+ """ Random seed """
+ self.random_seed = self.config.get('seed', None)
+ if self.random_seed is None:
+ self.random_seed = int(time.time())
+
+ self.rng = random.Random()
+ self.rng.seed(int(self.random_seed))
+
+ """ Thrashing params """
+ self.randomize = bool(self.config.get('randomize', True))
+ self.max_thrash_daemons = int(self.config.get('max_thrash', len(self.daemons) - 1))
+
+ # Limits on thrashing each daemon
+ self.daemon_max_thrash_times = int(self.config.get('daemon_max_thrash_times', 3))
+ self.daemon_max_thrash_period = int(self.config.get('daemon_max_thrash_period', 30 * 60)) # seconds
+
+ self.min_thrash_delay = int(self.config.get('min_thrash_delay', 60))
+ self.max_thrash_delay = int(self.config.get('max_thrash_delay', self.min_thrash_delay + 30))
+ self.min_revive_delay = int(self.config.get('min_revive_delay', 100))
+ self.max_revive_delay = int(self.config.get('max_revive_delay', self.min_revive_delay + 30))
+
+ def _get_devices(self, remote):
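+        """
+        List the NVMe device paths on the initiator that are backed by the
+        'Ceph bdev Controller', i.e. namespaces exported by the gateways.
+        """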
+ GET_DEVICE_CMD = "sudo nvme list --output-format=json | " \
+ "jq -r '.Devices | sort_by(.NameSpace) | .[] | select(.ModelNumber == \"Ceph bdev Controller\") | .DevicePath'"
+ devices = remote.sh(GET_DEVICE_CMD).split()
+ return devices
+
+ def log(self, x):
+ self.logger.info(x)
+
+ def _run(self): # overriding
+ try:
+ self.do_thrash()
+ except Exception as e:
+ self.set_thrasher_exception(e)
+ self.logger.exception("exception:")
+ # allow successful completion so gevent doesn't see an exception...
+ # The DaemonWatchdog will observe the error and tear down the test.
+
+ def stop(self):
+ self.stopping.set()
+
+ def do_checks(self):
+ """
+ Run some checks to see if everything is running well during thrashing.
+ """
+ self.log('display and verify stats:')
+ for d in self.daemons:
+ d.remote.sh(d.status_cmd, check_status=False)
+ check_cmd = [
+ 'ceph', 'orch', 'ls',
+ run.Raw('&&'), 'ceph', 'orch', 'ps', '--daemon-type', 'nvmeof',
+ run.Raw('&&'), 'ceph', 'health', 'detail',
+ run.Raw('&&'), 'ceph', '-s',
+ ]
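+        # verify that every exposed device still reports a 'live optimized' path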
+ for dev in self.devices:
+ check_cmd += [
+ run.Raw('&&'), 'sudo', 'nvme', 'list-subsys', dev,
+ run.Raw('|'), 'grep', 'live optimized'
+ ]
+ self.checker_host.run(args=check_cmd).wait()
+
+ def switch_task(self):
+ """
+        Pause the nvmeof thrasher until the other thrashers have finished
+        their current iteration. This keeps multiple thrashers in sync, e.g.:
+        1. thrasher-1 and thrasher-2: thrash daemons in parallel
+        2. thrasher-1 and thrasher-2: revive daemons in parallel
+        so that checks can run after each thrashing and reviving iteration.
+ """
+ if not hasattr(self, 'switch_thrasher'):
+ return
+ self.switch_thrasher.set()
+ thrashers = self.ctx.ceph[self.config.get('cluster')].thrashers
+ for t in thrashers:
+ if not isinstance(t, NvmeofThrasher) and hasattr(t, 'switch_thrasher') and (
+ isinstance(t.stopping, Event) and not t.stopping.is_set()
+ ):
+ other_thrasher = t
+ self.log('switch_task: waiting for other thrasher')
+ other_thrasher.switch_thrasher.wait(300)
+ self.log('switch_task: done waiting for the other thrasher')
+ other_thrasher.switch_thrasher.clear()
+
+ def do_thrash(self):
+ self.log('start thrashing')
+        self.log(f'seed: {self.random_seed}, '
+                 f'max thrash delay: {self.max_thrash_delay}, min thrash delay: {self.min_thrash_delay}, '
+                 f'max revive delay: {self.max_revive_delay}, min revive delay: {self.min_revive_delay}, '
+                 f'daemons: {len(self.daemons)}'
+                 )
+ daemons_thrash_history = defaultdict(list)
+ summary = []
+
+ while not self.stopping.is_set():
+ killed_daemons = []
+
+ weight = 1.0 / len(self.daemons)
+ count = 0
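+            # pick each daemon with probability 1/num_daemons, so on average
+            # one daemon is selected per pass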
+ for daemon in self.daemons:
+ skip = self.rng.uniform(0.0, 1.0)
+ if weight <= skip:
+                    self.log('skipping daemon {label} with skip ({skip}) >= weight ({weight})'.format(
+                        label=daemon.id_, skip=skip, weight=weight))
+ continue
+
+                # For now, nvmeof daemons can only be thrashed 3 times in the last 30 mins.
+                # Skip thrashing if the daemon was already thrashed <daemon_max_thrash_times>
+                # times in the last <daemon_max_thrash_period> seconds.
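+                # e.g. with the defaults, look at the 3rd-newest kill
+                # timestamp; if it is younger than 30 mins, skip this daemon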
+ thrashed_history = daemons_thrash_history.get(daemon.id_, [])
+ history_ptr = len(thrashed_history) - self.daemon_max_thrash_times
+ if history_ptr >= 0:
+ ptr_timestamp = thrashed_history[history_ptr]
+ current_timestamp = datetime.now()
+ if (current_timestamp - ptr_timestamp).total_seconds() < self.daemon_max_thrash_period:
+ self.log(f'skipping daemon {daemon.id_}: thrashed total {len(thrashed_history)} times, '\
+ f'can only thrash {self.daemon_max_thrash_times} times '\
+ f'in {self.daemon_max_thrash_period} seconds.')
+ continue
+
+ self.log('kill {label}'.format(label=daemon.id_))
+ daemon.stop()
+
+ killed_daemons.append(daemon)
+ daemons_thrash_history[daemon.id_] += [datetime.now()]
+
+ # only thrash max_thrash_daemons amount of daemons
+ count += 1
+ if count >= self.max_thrash_daemons:
+ break
+
+ if killed_daemons:
+ summary += ["killed: " + ", ".join([d.id_ for d in killed_daemons])]
+ # delay before reviving
+ revive_delay = self.min_revive_delay
+ if self.randomize:
+                    revive_delay = self.rng.randrange(self.min_revive_delay, self.max_revive_delay) # seeded RNG for reproducibility
+
+ self.log(f'waiting for {revive_delay} secs before reviving')
+ time.sleep(revive_delay) # blocking wait
+ self.log('done waiting before reviving')
+
+ self.do_checks()
+ self.switch_task()
+
+ # revive after thrashing
+ for daemon in killed_daemons:
+ self.log('reviving {label}'.format(label=daemon.id_))
+ daemon.restart()
+
+ # delay before thrashing
+ thrash_delay = self.min_thrash_delay
+ if self.randomize:
+                    thrash_delay = self.rng.randrange(self.min_thrash_delay, self.max_thrash_delay) # seeded RNG for reproducibility
+ if thrash_delay > 0.0:
+ self.log(f'waiting for {thrash_delay} secs before thrashing')
+ time.sleep(thrash_delay) # blocking
+ self.log('done waiting before thrashing')
+
+ self.do_checks()
+ self.switch_task()
+ self.log("Thrasher summary: ")
+ for daemon in daemons_thrash_history:
+ self.log(f'{daemon} was thrashed {len(daemons_thrash_history[daemon])} times')
+ for index, string in enumerate(summary):
+ self.log(f"Iteration {index}: {string}")
+
+class ThrashTest(Nvmeof):
+ name = 'nvmeof.thrash'
+ def setup(self):
+ if self.config is None:
+ self.config = {}
+ assert isinstance(self.config, dict), \
+ 'nvmeof.thrash task only accepts a dict for configuration'
+
+ self.cluster = self.config['cluster'] = self.config.get('cluster', 'ceph')
+ daemons = list(self.ctx.daemons.iter_daemons_of_role('nvmeof', self.cluster))
+ assert len(daemons) > 1, \
+            'nvmeof.thrash task requires at least 2 nvmeof daemons'
+ self.thrasher = NvmeofThrasher(self.ctx, self.config, daemons)
+
+ def begin(self):
+ self.thrasher.start()
+ self.ctx.ceph[self.cluster].thrashers.append(self.thrasher)
+
+ def end(self):
+ log.info('joining nvmeof.thrash')
+ self.thrasher.stop()
+ if self.thrasher.exception is not None:
+ raise RuntimeError('error during thrashing')
+ self.thrasher.join()
+ log.info('done joining')
+
+
task = Nvmeof
+thrash = ThrashTest
sudo modprobe nvme-fabrics
sudo modprobe nvme-tcp
-sudo dnf install nvme-cli -y
+sudo dnf reinstall nvme-cli -y
sudo lsmod | grep nvme
+nvme version
source /etc/ceph/nvmeof.env
-SPDK_CONTROLLER="SPDK bdev Controller"
+SPDK_CONTROLLER="Ceph bdev Controller"
DISCOVERY_PORT="8009"
discovery() {
}
connect() {
- sudo nvme connect -t tcp --traddr $NVMEOF_DEFAULT_GATEWAY_IP_ADDRESS -s $NVMEOF_PORT -n $NVMEOF_NQN
- output=$(sudo nvme list)
+ sudo nvme connect -t tcp --traddr $NVMEOF_DEFAULT_GATEWAY_IP_ADDRESS -s $NVMEOF_PORT -n "${NVMEOF_SUBSYSTEMS_PREFIX}1"
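+    # give the namespace device a few seconds to appear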
+ sleep 5
+ output=$(sudo nvme list --output-format=json)
if ! echo "$output" | grep -q "$SPDK_CONTROLLER"; then
return 1
fi
}
connect_all() {
- sudo nvme connect-all --traddr=$NVMEOF_DEFAULT_GATEWAY_IP_ADDRESS --transport=tcp
- output=$(sudo nvme list)
+ sudo nvme connect-all --traddr=$NVMEOF_DEFAULT_GATEWAY_IP_ADDRESS --transport=tcp -l 3600
+ sleep 5
+ output=$(sudo nvme list --output-format=json)
if ! echo "$output" | grep -q "$SPDK_CONTROLLER"; then
return 1
fi
test_run disconnect_all
test_run list_subsys 0
test_run connect_all
-gateway_count=$(( $(echo "$NVMEOF_GATEWAY_IP_ADDRESSES" | tr -cd ',' | wc -c) + 1))
-test_run list_subsys $gateway_count
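+# every subsystem is reachable through every gateway, so the initiator
+# should see gateways_count * subsystems_count subsystem paths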
+gateways_count=$(( $(echo "$NVMEOF_GATEWAY_IP_ADDRESSES" | tr -cd ',' | wc -c) + 1 ))
+multipath_count=$(( $gateways_count * $NVMEOF_SUBSYSTEMS_COUNT ))
+test_run list_subsys $multipath_count
echo "-------------Test Summary-------------"
fio_file=$(mktemp -t nvmeof-fio-XXXX)
all_drives_list=$(sudo nvme list --output-format=json |
- jq -r '.Devices | sort_by(.NameSpace) | .[] | select(.ModelNumber == "SPDK bdev Controller") | .DevicePath')
+ jq -r '.Devices | sort_by(.NameSpace) | .[] | select(.ModelNumber == "Ceph bdev Controller") | .DevicePath')
# When the script is passed --start_ns and --end_ns (example: `nvmeof_fio_test.sh --start_ns 1 --end_ns 3`),
# then fio runs on namespaces only in the defined range (which is 1 to 3 here).
direct=1
EOF
-echo "[nvmeof] starting fio test..."
+echo "[nvmeof.fio] starting fio test..."
if [ -n "$IOSTAT_INTERVAL" ]; then
iostat_count=$(( RUNTIME / IOSTAT_INTERVAL ))
fi
if [ "$rbd_iostat" = true ]; then
iterations=$(( RUNTIME / 5 ))
- rbd perf image iostat $RBD_POOL --iterations $iterations &
+ timeout 20 rbd perf image iostat $RBD_POOL --iterations $iterations &
fi
fio --showcmd $fio_file
sudo fio $fio_file
wait
-echo "[nvmeof] fio test successful!"
+echo "[nvmeof.fio] fio test successful!"
sudo podman images
sudo podman ps
sudo podman run -it $NVMEOF_CLI_IMAGE --server-address $NVMEOF_DEFAULT_GATEWAY_IP_ADDRESS --server-port $NVMEOF_SRPORT --format json subsystem list
-sudo podman run -it $NVMEOF_CLI_IMAGE --server-address $NVMEOF_DEFAULT_GATEWAY_IP_ADDRESS --server-port $NVMEOF_SRPORT subsystem add --subsystem $NVMEOF_NQN
-# add all namespaces
-for i in $(seq 1 $NVMEOF_NAMESPACES_COUNT); do
- image="${RBD_IMAGE_PREFIX}${i}"
- sudo podman run -it $NVMEOF_CLI_IMAGE --server-address $NVMEOF_DEFAULT_GATEWAY_IP_ADDRESS --server-port $NVMEOF_SRPORT namespace add --subsystem $NVMEOF_NQN --rbd-pool $RBD_POOL --rbd-image $image
-done
-
-# add all gateway listeners
IFS=',' read -ra gateway_ips <<< "$NVMEOF_GATEWAY_IP_ADDRESSES"
IFS=',' read -ra gateway_names <<< "$NVMEOF_GATEWAY_NAMES"
+gateways_count=$(( $(echo "$NVMEOF_GATEWAY_IP_ADDRESSES" | tr -cd ',' | wc -c) + 1 ))
+
+list_subsystems () {
+ for i in "${!gateway_ips[@]}"
+ do
+ ip="${gateway_ips[i]}"
+ sudo podman run -it $NVMEOF_CLI_IMAGE --server-address $ip --server-port $NVMEOF_SRPORT --format json subsystem list
+ done
+}
+
+# add all subsystems
+for i in $(seq 1 $NVMEOF_SUBSYSTEMS_COUNT); do
+ subsystem_nqn="${NVMEOF_SUBSYSTEMS_PREFIX}${i}"
+ sudo podman run -it $NVMEOF_CLI_IMAGE --server-address $NVMEOF_DEFAULT_GATEWAY_IP_ADDRESS --server-port $NVMEOF_SRPORT subsystem add --subsystem $subsystem_nqn
+done
+
+list_subsystems
+
+# add all gateway listeners
for i in "${!gateway_ips[@]}"
do
ip="${gateway_ips[i]}"
name="${gateway_names[i]}"
- echo "Adding gateway listener $index with IP ${ip} and name ${name}"
- sudo podman run -it $NVMEOF_CLI_IMAGE --server-address $ip --server-port $NVMEOF_SRPORT listener add --subsystem $NVMEOF_NQN --gateway-name client.$name --traddr $ip --trsvcid $NVMEOF_PORT
- sudo podman run -it $NVMEOF_CLI_IMAGE --server-address $ip --server-port $NVMEOF_SRPORT --format json subsystem list
+ for j in $(seq 1 $NVMEOF_SUBSYSTEMS_COUNT); do
+ subsystem_nqn="${NVMEOF_SUBSYSTEMS_PREFIX}${j}"
+ echo "Adding gateway listener $index with IP ${ip} and name ${name}"
+ sudo podman run -it $NVMEOF_CLI_IMAGE --server-address $ip --server-port $NVMEOF_SRPORT listener add --subsystem $subsystem_nqn --host-name $name --traddr $ip --trsvcid $NVMEOF_PORT
+ done
+done
+
+# add all hosts
+for i in $(seq 1 $NVMEOF_SUBSYSTEMS_COUNT); do
+ subsystem_nqn="${NVMEOF_SUBSYSTEMS_PREFIX}${i}"
+ sudo podman run -it $NVMEOF_CLI_IMAGE --server-address $NVMEOF_DEFAULT_GATEWAY_IP_ADDRESS --server-port $NVMEOF_SRPORT host add --subsystem $subsystem_nqn --host "*"
+done
+
+# add all namespaces
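+# each namespace is pinned to a load-balancing group, assigned round-robin
+# across the gateways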
+image_index=1
+for i in $(seq 1 $NVMEOF_SUBSYSTEMS_COUNT); do
+ subsystem_nqn="${NVMEOF_SUBSYSTEMS_PREFIX}${i}"
+ for ns in $(seq 1 $NVMEOF_NAMESPACES_COUNT); do
+ image="${RBD_IMAGE_PREFIX}${image_index}"
+ sudo podman run -it $NVMEOF_CLI_IMAGE --server-address $NVMEOF_DEFAULT_GATEWAY_IP_ADDRESS --server-port $NVMEOF_SRPORT namespace add --subsystem $subsystem_nqn --rbd-pool $RBD_POOL --rbd-image $image --load-balancing-group $(($image_index % $gateways_count + 1))
+ ((image_index++))
+ done
+done
+
+list_subsystems
+
+# list namespaces
+for i in $(seq 1 $NVMEOF_SUBSYSTEMS_COUNT); do
+ subsystem_nqn="${NVMEOF_SUBSYSTEMS_PREFIX}${i}"
+ sudo podman run -it $NVMEOF_CLI_IMAGE --server-address $NVMEOF_DEFAULT_GATEWAY_IP_ADDRESS --server-port $NVMEOF_SRPORT --format plain namespace list --subsystem $subsystem_nqn
done
-sudo podman run -it $NVMEOF_CLI_IMAGE --server-address $NVMEOF_DEFAULT_GATEWAY_IP_ADDRESS --server-port $NVMEOF_SRPORT host add --subsystem $NVMEOF_NQN --host "*"
echo "[nvmeof] Subsystem setup done"