from mgr_module import MgrModule, CommandResult, CLICommand, Option
-import errno
+import enum
import json
import random
import sys
from typing import List, Optional, Tuple
+# These workloads are things that can be requested to run inside the
+# serve() function
+class Workload(enum.Enum):
+ COMMAND_SPAM = 'command_spam'
+ THROW_EXCEPTION = 'throw_exception'
+ SHUTDOWN = 'shutdown'
+
+
class Module(MgrModule):
"""
This module is for testing the ceph-mgr python interface from within
activities in its serve() thread.
"""
- # These workloads are things that can be requested to run inside the
- # serve() function
- WORKLOAD_COMMAND_SPAM = "command_spam"
- WORKLOAD_THROW_EXCEPTION = "throw_exception"
- SHUTDOWN = "shutdown"
-
- WORKLOADS = (WORKLOAD_COMMAND_SPAM, WORKLOAD_THROW_EXCEPTION)
-
# The test code in qa/ relies on these options existing -- they
# are of course not really used for anything in the module
MODULE_OPTIONS = [
return 0, '', 'Self-test succeeded'
@CLICommand('mgr self-test background start')
- def backgroun_start(self, workload: str) -> Tuple[int, str, str]:
+ def backgroun_start(self, workload: Workload) -> Tuple[int, str, str]:
'''
Activate a background workload (one of command_spam, throw_exception)
'''
assert False, repr(what)
def shutdown(self):
- self._workload = self.SHUTDOWN
+ self._workload = Workload.SHUTDOWN
self._event.set()
def _command_spam(self):
def serve(self):
while True:
- if self._workload == self.WORKLOAD_COMMAND_SPAM:
+ if self._workload == Workload.COMMAND_SPAM:
self._command_spam()
- elif self._workload == self.SHUTDOWN:
+ elif self._workload == Workload.SHUTDOWN:
self.log.info("Shutting down...")
break
- elif self._workload == self.WORKLOAD_THROW_EXCEPTION:
+ elif self._workload == Workload.THROW_EXCEPTION:
raise RuntimeError("Synthetic exception in serve")
else:
self.log.info("Waiting for workload request...")