import gevent
import json
import math
+from gevent.event import Event
from teuthology import misc as teuthology
from teuthology.contextutil import safe_while
from tasks import ceph_manager
the monitor (default: 10)
thrash_delay Number of seconds to wait in-between
test iterations (default: 0)
+ switch_thrashers: Toggle this to switch between thrashers so it waits until all
+ thrashers are done thrashing before proceeding. And then
+ wait until all thrashers are done reviving before proceeding.
+ (default: false)
store_thrash Thrash monitor store before killing the monitor being thrashed (default: False)
store_thrash_probability Probability of thrashing a monitor's store
(default: 50)
self.manager = manager
self.manager.wait_for_clean()
- self.stopping = False
+ self.stopping = Event()
self.logger = logger
self.config = config
self.name = name
if self.config is None:
self.config = dict()
+ if self.config.get("switch_thrashers"):
+ self.switch_thrasher = Event()
+
""" Test reproducibility """
self.random_seed = self.config.get('seed', None)
"""
Break out of this processes thrashing loop.
"""
- self.stopping = True
+ self.stopping.set()
self.thread.get()
def stop_and_join(self):
"""
Revive the monitor specified
"""
- self.log('killing mon.{id}'.format(id=mon))
self.log('reviving mon.{id}'.format(id=mon))
self.manager.revive_mon(mon)
# Allow successful completion so gevent doesn't see an exception.
# The DaemonWatchdog will observe the error and tear down the test.
+ def switch_task(self):
+ """
+ Pause mon thrasher till other thrashers are done with their iteration.
+ This would help to sync between multiple thrashers, like:
+ 1. thrasher-1 and thrasher-2: thrash daemons in parallel
+ 2. thrasher-1 and thrasher-2: revive daemons in parallel
+ This allows us to run some checks after each thrashing and reviving iteration.
+ """
+ if not hasattr(self, 'switch_thrasher'):
+ return
+ self.switch_thrasher.set()
+ thrashers = self.ctx.ceph[self.config.get('cluster')].thrashers
+ for t in thrashers:
+ if not isinstance(t, MonitorThrasher) and hasattr(t, 'switch_thrasher') and (
+ isinstance(t.stopping, Event) and not t.stopping.is_set()
+ ):
+ other_thrasher = t
+ self.log('switch_task: waiting for others thrashers')
+ other_thrasher.switch_thrasher.wait(300)
+ self.log('switch_task: done waiting for the other thrasher')
+ other_thrasher.switch_thrasher.clear()
+
def _do_thrash(self):
"""
Continuously loop and thrash the monitors.
fp=self.freeze_mon_probability,fd=self.freeze_mon_duration,
))
- while not self.stopping:
+ while not self.stopping.is_set():
mons = _get_mons(self.ctx)
self.manager.wait_for_mon_quorum_size(len(mons))
self.log('making sure all monitors are in the quorum')
delay=self.revive_delay))
time.sleep(self.revive_delay)
+ self.switch_task()
+
for mon in mons_to_kill:
self.revive_mon(mon)
# do more freezes
delay=self.thrash_delay))
time.sleep(self.thrash_delay)
+ self.switch_task()
+
#status after thrashing
if self.mds_failover:
status = self.mds_cluster.status()
if 'cluster' not in config:
config['cluster'] = 'ceph'
+ logger = config.get('logger', 'mon_thrasher')
+
log.info('Beginning mon_thrash...')
first_mon = teuthology.get_first_mon(ctx, config)
(mon,) = ctx.cluster.only(first_mon).remotes.keys()
)
thrash_proc = MonitorThrasher(ctx,
manager, config, "MonitorThrasher",
- logger=log.getChild('mon_thrasher'))
+ logger=log.getChild(logger))
ctx.ceph[config['cluster']].thrashers.append(thrash_proc)
try:
log.debug('Yielding')
import logging
+import random
+import time
+from collections import defaultdict
+from datetime import datetime
from textwrap import dedent
+from gevent.event import Event
+from gevent.greenlet import Greenlet
from teuthology.task import Task
from teuthology import misc
from teuthology.exceptions import ConfigError
+from teuthology.orchestra import run
from tasks.util import get_remote_for_role
from tasks.cephadm import _shell
+from tasks.thrasher import Thrasher
log = logging.getLogger(__name__)
self.set_gateway_cfg()
def _set_defaults(self):
- self.gateway_image = self.config.get('version', 'default')
+ self.gateway_image = self.config.get('gw_image', 'default')
rbd_config = self.config.get('rbd', {})
self.poolname = rbd_config.get('pool_name', 'mypool')
self.rbd_size = rbd_config.get('rbd_size', 1024*8)
gateway_config = self.config.get('gateway_config', {})
- conf_vars = gateway_config.get('vars', {})
- self.cli_image = conf_vars.get('cli_version', 'latest')
- self.bdev = conf_vars.get('bdev', 'mybdev')
- self.serial = conf_vars.get('serial', 'SPDK00000000000001')
- self.nqn = conf_vars.get('nqn', 'nqn.2016-06.io.spdk:cnode1')
- self.port = conf_vars.get('port', '4420')
- self.srport = conf_vars.get('srport', '5500')
+ self.cli_image = gateway_config.get('cli_image', 'quay.io/ceph/nvmeof-cli:latest')
+ self.nqn_prefix = gateway_config.get('subsystem_nqn_prefix', 'nqn.2016-06.io.spdk:cnode')
+ self.subsystems_count = gateway_config.get('subsystems_count', 1)
+ self.namespaces_count = gateway_config.get('namespaces_count', 1) # namepsaces per subsystem
+ self.bdev = gateway_config.get('bdev', 'mybdev')
+ self.serial = gateway_config.get('serial', 'SPDK00000000000001')
+ self.port = gateway_config.get('port', '4420')
+ self.srport = gateway_config.get('srport', '5500')
def deploy_nvmeof(self):
"""
daemons[role] = (remote, id_)
if nodes:
- image = self.gateway_image
- if (image != "default"):
- log.info(f'[nvmeof]: ceph config set mgr mgr/cephadm/container_image_nvmeof quay.io/ceph/nvmeof:{image}')
+ gw_image = self.gateway_image
+ if (gw_image != "default"):
+ log.info(f'[nvmeof]: ceph config set mgr mgr/cephadm/container_image_nvmeof {gw_image}')
_shell(self.ctx, self.cluster_name, self.remote, [
'ceph', 'config', 'set', 'mgr',
'mgr/cephadm/container_image_nvmeof',
- f'quay.io/ceph/nvmeof:{image}'
+ gw_image
])
poolname = self.poolname
'--placement', str(len(nodes)) + ';' + ';'.join(nodes)
])
- log.info(f'[nvmeof]: rbd create {poolname}/{imagename} --size {self.rbd_size}')
- _shell(self.ctx, self.cluster_name, self.remote, [
- 'rbd', 'create', f'{poolname}/{imagename}', '--size', f'{self.rbd_size}'
- ])
+ total_images = int(self.namespaces_count) * int(self.subsystems_count)
+ log.info(f'[nvmeof]: creating {total_images} images')
+ for i in range(1, total_images + 1):
+ imagename = self.image_name_prefix + str(i)
+ log.info(f'[nvmeof]: rbd create {poolname}/{imagename} --size {self.rbd_size}')
+ _shell(self.ctx, self.cluster_name, self.remote, [
+ 'rbd', 'create', f'{poolname}/{imagename}', '--size', f'{self.rbd_size}'
+ ])
for role, i in daemons.items():
remote, id_ = i
gateway_name = ""
nvmeof_daemons = self.ctx.daemons.iter_daemons_of_role('nvmeof', cluster=self.cluster_name)
for daemon in nvmeof_daemons:
- if ip_address == daemon.remote.ip_address:
- gateway_name = daemon.name()
+ gateway_names += [daemon.remote.shortname]
+ gateway_ips += [daemon.remote.ip_address]
conf_data = dedent(f"""
- NVMEOF_GATEWAY_IP_ADDRESS={ip_address}
- NVMEOF_GATEWAY_NAME={gateway_name}
- NVMEOF_CLI_IMAGE="quay.io/ceph/nvmeof-cli:{self.cli_image}"
- NVMEOF_BDEV={self.bdev}
- NVMEOF_SERIAL={self.serial}
- NVMEOF_NQN={self.nqn}
+ NVMEOF_GATEWAY_IP_ADDRESSES={",".join(gateway_ips)}
+ NVMEOF_GATEWAY_NAMES={",".join(gateway_names)}
+ NVMEOF_DEFAULT_GATEWAY_IP_ADDRESS={ip_address}
+ NVMEOF_CLI_IMAGE="{self.cli_image}"
+ NVMEOF_SUBSYSTEMS_PREFIX={self.nqn_prefix}
+ NVMEOF_SUBSYSTEMS_COUNT={self.subsystems_count}
+ NVMEOF_NAMESPACES_COUNT={self.namespaces_count}
NVMEOF_PORT={self.port}
NVMEOF_SRPORT={self.srport}
""")
log.info("[nvmeof]: executed set_gateway_cfg successfully!")
+class NvmeofThrasher(Thrasher, Greenlet):
+ """
+ How it works::
+
+ - pick a nvmeof daemon
+ - kill it
+ - wait for other thrashers to finish thrashing (if switch_thrashers True)
+ - sleep for 'revive_delay' seconds
+ - do some checks after thrashing ('do_checks' method)
+ - revive daemons
+ - wait for other thrashers to finish reviving (if switch_thrashers True)
+ - sleep for 'thrash_delay' seconds
+ - do some checks after reviving ('do_checks' method)
+
+
+ Options::
+
+ seed Seed to use on the RNG to reproduce a previous
+ behavior (default: None; i.e., not set)
+ checker_host: Initiator client on which verification tests would
+ run during thrashing (mandatory option)
+ switch_thrashers: Toggle this to switch between thrashers so it waits until all
+ thrashers are done thrashing before proceeding. And then
+ wait until all thrashers are done reviving before proceeding.
+ (default: false)
+ randomize: Enables randomization and use the max/min values. (default: true)
+ max_thrash: Maximum number of daemons that can be thrashed at a time.
+ (default: num_of_daemons-1, minimum of 1 daemon should be up)
+ min_thrash_delay: Minimum number of seconds to delay before thrashing again.
+ (default: 60)
+ max_thrash_delay: Maximum number of seconds to delay before thrashing again.
+ (default: min_thrash_delay + 30)
+ min_revive_delay: Minimum number of seconds to delay before bringing back a
+ thrashed daemon. (default: 100)
+ max_revive_delay: Maximum number of seconds to delay before bringing back a
+ thrashed daemon. (default: min_revive_delay + 30)
+
+ daemon_max_thrash_times:
+ For now, NVMeoF daemons have limitation that each daemon can
+ be thrashed only 3 times in span of 30 mins. This option
+ allows to set the amount of times it could be thrashed in a period
+ of time. (default: 3)
+ daemon_max_thrash_period:
+ This option goes with the above option. It sets the period of time
+ over which each daemons can be thrashed for daemon_max_thrash_times
+ amount of times. Time period in seconds. (default: 1800, i.e. 30mins)
+
+
+ For example::
+ tasks:
+ - nvmeof.thrash:
+ checker_host: 'client.3'
+ switch_thrashers: True
+
+ - mon_thrash:
+ switch_thrashers: True
+
+ - workunit:
+ clients:
+ client.3:
+ - rbd/nvmeof_fio_test.sh --rbd_iostat
+ env:
+ RBD_POOL: mypool
+ IOSTAT_INTERVAL: '10'
+
+ """
+ def __init__(self, ctx, config, daemons) -> None:
+ super(NvmeofThrasher, self).__init__()
+
+ if config is None:
+ self.config = dict()
+ self.config = config
+ self.ctx = ctx
+ self.daemons = daemons
+ self.logger = log.getChild('[nvmeof.thrasher]')
+ self.stopping = Event()
+ if self.config.get("switch_thrashers"):
+ self.switch_thrasher = Event()
+ self.checker_host = get_remote_for_role(self.ctx, self.config.get('checker_host'))
+ self.devices = self._get_devices(self.checker_host)
+
+ """ Random seed """
+ self.random_seed = self.config.get('seed', None)
+ if self.random_seed is None:
+ self.random_seed = int(time.time())
+
+ self.rng = random.Random()
+ self.rng.seed(int(self.random_seed))
+
+ """ Thrashing params """
+ self.randomize = bool(self.config.get('randomize', True))
+ self.max_thrash_daemons = int(self.config.get('max_thrash', len(self.daemons) - 1))
+
+ # Limits on thrashing each daemon
+ self.daemon_max_thrash_times = int(self.config.get('daemon_max_thrash_times', 3))
+ self.daemon_max_thrash_period = int(self.config.get('daemon_max_thrash_period', 30 * 60)) # seconds
+
+ self.min_thrash_delay = int(self.config.get('min_thrash_delay', 60))
+ self.max_thrash_delay = int(self.config.get('max_thrash_delay', self.min_thrash_delay + 30))
+ self.min_revive_delay = int(self.config.get('min_revive_delay', 100))
+ self.max_revive_delay = int(self.config.get('max_revive_delay', self.min_revive_delay + 30))
+
+ def _get_devices(self, remote):
+ GET_DEVICE_CMD = "sudo nvme list --output-format=json | " \
+ "jq -r '.Devices | sort_by(.NameSpace) | .[] | select(.ModelNumber == \"Ceph bdev Controller\") | .DevicePath'"
+ devices = remote.sh(GET_DEVICE_CMD).split()
+ return devices
+
+ def log(self, x):
+ self.logger.info(x)
+
+ def _run(self): # overriding
+ try:
+ self.do_thrash()
+ except Exception as e:
+ self.set_thrasher_exception(e)
+ self.logger.exception("exception:")
+ # allow successful completion so gevent doesn't see an exception...
+ # The DaemonWatchdog will observe the error and tear down the test.
+
+ def stop(self):
+ self.stopping.set()
+
+ def do_checks(self):
+ """
+ Run some checks to see if everything is running well during thrashing.
+ """
+ self.log('display and verify stats:')
+ for d in self.daemons:
+ d.remote.sh(d.status_cmd, check_status=False)
+ check_cmd = [
+ 'ceph', 'orch', 'ls',
+ run.Raw('&&'), 'ceph', 'orch', 'ps', '--daemon-type', 'nvmeof',
+ run.Raw('&&'), 'ceph', 'health', 'detail',
+ run.Raw('&&'), 'ceph', '-s',
+ ]
+ for dev in self.devices:
+ check_cmd += [
+ run.Raw('&&'), 'sudo', 'nvme', 'list-subsys', dev,
+ run.Raw('|'), 'grep', 'live optimized'
+ ]
+ self.checker_host.run(args=check_cmd).wait()
+
+ def switch_task(self):
+ """
+ Pause nvmeof thrasher till other thrashers are done with their iteration.
+ This method would help to sync between multiple thrashers, like:
+ 1. thrasher-1 and thrasher-2: thrash daemons in parallel
+ 2. thrasher-1 and thrasher-2: revive daemons in parallel
+ This allows us to run some checks after each thrashing and reviving iteration.
+ """
+ if not hasattr(self, 'switch_thrasher'):
+ return
+ self.switch_thrasher.set()
+ thrashers = self.ctx.ceph[self.config.get('cluster')].thrashers
+ for t in thrashers:
+ if not isinstance(t, NvmeofThrasher) and hasattr(t, 'switch_thrasher') and (
+ isinstance(t.stopping, Event) and not t.stopping.is_set()
+ ):
+ other_thrasher = t
+ self.log('switch_task: waiting for other thrasher')
+ other_thrasher.switch_thrasher.wait(300)
+ self.log('switch_task: done waiting for the other thrasher')
+ other_thrasher.switch_thrasher.clear()
+
+ def do_thrash(self):
+ self.log('start thrashing')
+ self.log(f'seed: {self.random_seed}, , '\
+ f'max thrash delay: {self.max_thrash_delay}, min thrash delay: {self.min_thrash_delay} '\
+ f'max revive delay: {self.max_revive_delay}, min revive delay: {self.min_revive_delay} '\
+ f'daemons: {len(self.daemons)} '\
+ )
+ daemons_thrash_history = defaultdict(list)
+ summary = []
+
+ while not self.stopping.is_set():
+ killed_daemons = []
+
+ weight = 1.0 / len(self.daemons)
+ count = 0
+ for daemon in self.daemons:
+ skip = self.rng.uniform(0.0, 1.0)
+ if weight <= skip:
+ self.log('skipping daemon {label} with skip ({skip}) > weight ({weight})'.format(
+ label=daemon.id_, skip=skip, weight=weight))
+ continue
+
+ # For now, nvmeof daemons can only be thrashed 3 times in last 30mins.
+ # Skip thrashing if daemon was thrashed <daemon_max_thrash_times>
+ # times in last <daemon_max_thrash_period> seconds.
+ thrashed_history = daemons_thrash_history.get(daemon.id_, [])
+ history_ptr = len(thrashed_history) - self.daemon_max_thrash_times
+ if history_ptr >= 0:
+ ptr_timestamp = thrashed_history[history_ptr]
+ current_timestamp = datetime.now()
+ if (current_timestamp - ptr_timestamp).total_seconds() < self.daemon_max_thrash_period:
+ self.log(f'skipping daemon {daemon.id_}: thrashed total {len(thrashed_history)} times, '\
+ f'can only thrash {self.daemon_max_thrash_times} times '\
+ f'in {self.daemon_max_thrash_period} seconds.')
+ continue
+
+ self.log('kill {label}'.format(label=daemon.id_))
+ daemon.stop()
+
+ killed_daemons.append(daemon)
+ daemons_thrash_history[daemon.id_] += [datetime.now()]
+
+ # only thrash max_thrash_daemons amount of daemons
+ count += 1
+ if count >= self.max_thrash_daemons:
+ break
+
+ if killed_daemons:
+ summary += ["killed: " + ", ".join([d.id_ for d in killed_daemons])]
+ # delay before reviving
+ revive_delay = self.min_revive_delay
+ if self.randomize:
+ revive_delay = random.randrange(self.min_revive_delay, self.max_revive_delay)
+
+ self.log(f'waiting for {revive_delay} secs before reviving')
+ time.sleep(revive_delay) # blocking wait
+ self.log('done waiting before reviving')
+
+ self.do_checks()
+ self.switch_task()
+
+ # revive after thrashing
+ for daemon in killed_daemons:
+ self.log('reviving {label}'.format(label=daemon.id_))
+ daemon.restart()
+
+ # delay before thrashing
+ thrash_delay = self.min_thrash_delay
+ if self.randomize:
+ thrash_delay = random.randrange(self.min_thrash_delay, self.max_thrash_delay)
+ if thrash_delay > 0.0:
+ self.log(f'waiting for {thrash_delay} secs before thrashing')
+ time.sleep(thrash_delay) # blocking
+ self.log('done waiting before thrashing')
+
+ self.do_checks()
+ self.switch_task()
+ self.log("Thrasher summary: ")
+ for daemon in daemons_thrash_history:
+ self.log(f'{daemon} was thrashed {len(daemons_thrash_history[daemon])} times')
+ for index, string in enumerate(summary):
+ self.log(f"Iteration {index}: {string}")
+
+class ThrashTest(Nvmeof):
+ name = 'nvmeof.thrash'
+ def setup(self):
+ if self.config is None:
+ self.config = {}
+ assert isinstance(self.config, dict), \
+ 'nvmeof.thrash task only accepts a dict for configuration'
+
+ self.cluster = self.config['cluster'] = self.config.get('cluster', 'ceph')
+ daemons = list(self.ctx.daemons.iter_daemons_of_role('nvmeof', self.cluster))
+ assert len(daemons) > 1, \
+ 'nvmeof.thrash task requires at least 2 nvmeof daemon'
+ self.thrasher = NvmeofThrasher(self.ctx, self.config, daemons)
+
+ def begin(self):
+ self.thrasher.start()
+ self.ctx.ceph[self.cluster].thrashers.append(self.thrasher)
+
+ def end(self):
+ log.info('joining nvmeof.thrash')
+ self.thrasher.stop()
+ if self.thrasher.exception is not None:
+ raise RuntimeError('error during thrashing')
+ self.thrasher.join()
+ log.info('done joining')
+
+
task = Nvmeof
+thrash = ThrashTest