REGEX "\\.gitignore" EXCLUDE
REGEX ".*\\.pyi" EXCLUDE
REGEX "hello/.*" EXCLUDE
+ REGEX "cli_api/.*" EXCLUDE
REGEX "tests/.*" EXCLUDE
REGEX "rook/rook-client-python.*" EXCLUDE
REGEX "osd_perf_query/.*" EXCLUDE
 import os
-from .module import CLI # noqa # pylint: disable=unused-import
+
+from .module import CLI
+
+__all__ = [
+    "CLI",
+]
+
if 'UNITTEST' in os.environ:
import tests # noqa # pylint: disable=unused-import
+ __all__.append(tests.__name__)
-import json
+import concurrent.futures
+import functools
+import inspect
import logging
-import threading
import time
-from functools import partial
-from queue import Queue
+import errno
+from typing import Any, Callable, Dict, List, Optional
-from mgr_module import CLICommand, HandleCommandResult, MgrModule
+from mgr_module import MgrModule, HandleCommandResult, CLICommand, API
logger = logging.getLogger()
+get_time = time.perf_counter
-class CLI(MgrModule):
-
- @CLICommand('mgr api get')
- def api_get(self, arg: str):
- '''
- Called by the plugin to fetch named cluster-wide objects from ceph-mgr.
- :param str data_name: Valid things to fetch are osd_crush_map_text,
- osd_map, osd_map_tree, osd_map_crush, config, mon_map, fs_map,
- osd_metadata, pg_summary, io_rate, pg_dump, df, osd_stats,
- health, mon_status, devices, device <devid>, pg_stats,
- pool_stats, pg_ready, osd_ping_times.
- Note:
- All these structures have their own JSON representations: experiment
- or look at the C++ ``dump()`` methods to learn about them.
- '''
- t1_start = time.time()
- str_arg = self.get(arg)
- t1_end = time.time()
- time_final = (t1_end - t1_start)
- return HandleCommandResult(0, json.dumps(str_arg), str(time_final))
-
- @CLICommand('mgr api benchmark get')
- def api_get_benchmark(self, arg: str, number_of_total_calls: int,
- number_of_parallel_calls: int):
- benchmark_runner = ThreadedBenchmarkRunner(number_of_total_calls, number_of_parallel_calls)
- benchmark_runner.start(partial(self.get, arg))
- benchmark_runner.join()
- stats = benchmark_runner.get_stats()
- return HandleCommandResult(0, json.dumps(stats), "")
-
-
-class ThreadedBenchmarkRunner:
- def __init__(self, number_of_total_calls, number_of_parallel_calls):
- self._number_of_parallel_calls = number_of_parallel_calls
- self._number_of_total_calls = number_of_total_calls
- self._threads = []
- self._jobs: Queue = Queue()
- self._time = 0.0
- self._self_time = []
- self._lock = threading.Lock()
-
- def start(self, func):
- if(self._number_of_total_calls and self._number_of_parallel_calls):
- for thread_id in range(self._number_of_parallel_calls):
- new_thread = threading.Thread(target=ThreadedBenchmarkRunner.timer,
- args=(self, self._jobs, func,))
- self._threads.append(new_thread)
- for job_id in range(self._number_of_total_calls):
- self._jobs.put(job_id)
- for thread in self._threads:
- thread.start()
+def pretty_json(obj: Any) -> Any:
+ import json
+ return json.dumps(obj, sort_keys=True, indent=2)
+
+
+class CephCommander:
+ """
+ Utility class to inspect Python functions and generate corresponding
+ CephCommand signatures (see src/mon/MonCommand.h for details)
+ """
+
+ def __init__(self, func: Callable):
+ self.func = func
+ self.signature = inspect.signature(func)
+ self.params = self.signature.parameters
+
+ def to_ceph_signature(self) -> Dict[str, str]:
+ """
+ Generate CephCommand signature (dict-like)
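+
+        For example, for an exposed method ``get(self, data_name: str)`` this
+        returns ``{'prefix': 'mgr cli get', 'perm': 'r'}``, 'r' being the
+        default permission recorded by @API.perm.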
+ """
+ return {
+ 'prefix': f'mgr cli {self.func.__name__}',
+ 'perm': API.perm.get(self.func)
+ }
+
+
+class MgrAPIReflector(type):
+ """
+    Metaclass that registers COMMANDS and command handlers via the
+    CLICommand decorator.
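+
+    For every public method of the base classes that is marked with
+    @API.expose, a ``mgr cli <method_name>`` command is registered whose
+    handler calls the method and returns its result as pretty-printed JSON;
+    the wrapped handler is also stored on the class as ``_cli_<method_name>``.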
+ """
+
+ def __new__(cls, name, bases, dct): # type: ignore
+ klass = super().__new__(cls, name, bases, dct)
+ for base in bases:
+ for name, func in inspect.getmembers(base, cls.is_public):
+                # Although not strictly necessary (CLICommand keeps its own
+                # registry), also save the wrapped functions as
+                # klass._cli_{name}() methods; this helps with unit testing.
+ wrapper = cls.func_wrapper(func)
+ command = CLICommand(**CephCommander(func).to_ceph_signature())( # type: ignore
+ wrapper)
+ setattr(
+ klass,
+ f'_cli_{name}',
+ command)
+ return klass
+
+ @staticmethod
+ def is_public(func: Callable) -> bool:
+ return (
+ inspect.isfunction(func)
+ and not func.__name__.startswith('_')
+ and API.expose.get(func)
+ )
+
+ @staticmethod
+ def func_wrapper(func: Callable) -> Callable:
+ @functools.wraps(func)
+ def wrapper(self, *args, **kwargs) -> HandleCommandResult: # type: ignore
+ return HandleCommandResult(stdout=pretty_json(
+ func(self, *args, **kwargs)))
+
+ # functools doesn't change the signature when wrapping a function
+ # so we do it manually
+ signature = inspect.signature(func)
+ wrapper.__signature__ = signature # type: ignore
+ return wrapper
+
+
+class CLI(MgrModule, metaclass=MgrAPIReflector):
+ @CLICommand('mgr cli_benchmark')
+    def benchmark(self, iterations: int, threads: int, func_name: str,
+                  func_args: Optional[List[str]] = None) -> HandleCommandResult:
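+        """
+        Benchmark an MgrModule method by name, calling it ``iterations``
+        times across ``threads`` worker threads and reporting the min/max/avg
+        call latency in seconds as JSON, e.g. (assuming an "osd_map" object
+        is available)::
+
+            ceph mgr cli_benchmark 100 10 get osd_map
+        """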
+        func_args = [] if func_args is None else func_args
+ if iterations and threads:
+ try:
+ func = getattr(self, func_name)
+ except AttributeError:
+ return HandleCommandResult(errno.EINVAL,
+ stderr="Could not find the public "
+ "function you are requesting")
else:
- raise BenchmarkException("Number of Total and number of parallel calls must be greater than 0")
+ raise BenchmarkException("Number of calls and number "
+ "of parallel calls must be greater than 0")
+
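+        # Time one call of the requested method; the argument supplied by
+        # executor.map() (the iteration index) is ignored.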
+ def timer(*args: Any) -> float:
+ time_start = get_time()
+ func(*func_args)
+ return get_time() - time_start
- def join(self):
- for thread in self._threads:
- thread.join()
+ with concurrent.futures.ThreadPoolExecutor(max_workers=threads) as executor:
+ results_iter = executor.map(timer, range(iterations))
+ results = list(results_iter)
- def get_stats(self):
stats = {
- "avg": (self._time / self._number_of_total_calls),
- "min": min(self._self_time),
- "max": max(self._self_time)
+ "avg": sum(results) / len(results),
+ "max": max(results),
+ "min": min(results),
}
- return stats
-
- def timer(self, jobs, func):
- self._lock.acquire()
- while not self._jobs.empty():
- jobs.get()
- time_start = time.time()
- func()
- time_end = time.time()
- self._self_time.append(time_end - time_start)
- self._time += (time_end - time_start)
- self._jobs.task_done()
- self._lock.release()
+ return HandleCommandResult(stdout=pretty_json(stats))
class BenchmarkException(Exception):
--- /dev/null
+import unittest
+
+from ..module import CLI, BenchmarkException, HandleCommandResult
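+
+# These tests rely on the UNITTEST environment (see cli_api/__init__.py),
+# in which the shared mgr ``tests`` package provides mocks in place of the
+# real ceph_module bindings.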
+
+
+class BenchmarkRunnerTest(unittest.TestCase):
+ def setUp(self):
+ self.cli = CLI('CLI', 0, 0)
+
+ def test_number_of_calls_on_start_fails(self):
+ with self.assertRaises(BenchmarkException) as ctx:
+ self.cli.benchmark(0, 10, 'list_servers', [])
+ self.assertEqual(str(ctx.exception),
+ "Number of calls and number "
+ "of parallel calls must be greater than 0")
+
+ def test_number_of_parallel_calls_on_start_fails(self):
+ with self.assertRaises(BenchmarkException) as ctx:
+ self.cli.benchmark(100, 0, 'list_servers', [])
+ self.assertEqual(str(ctx.exception),
+ "Number of calls and number "
+ "of parallel calls must be greater than 0")
+
+ def test_number_of_parallel_calls_on_start_works(self):
+        self.cli.benchmark(10, 10, "get", ["osd_map"])
+
+ def test_function_name_fails(self):
+ for iterations in [0, 1]:
+ threads = 0 if iterations else 1
+ with self.assertRaises(BenchmarkException) as ctx:
+ self.cli.benchmark(iterations, threads, 'fake_method', [])
+ self.assertEqual(str(ctx.exception),
+ "Number of calls and number "
+ "of parallel calls must be greater than 0")
+ result: HandleCommandResult = self.cli.benchmark(1, 1, 'fake_method', [])
+ self.assertEqual(result.stderr, "Could not find the public "
+ "function you are requesting")
+
+ def test_function_name_works(self):
+        self.cli.benchmark(10, 10, "get", ["osd_map"])
+++ /dev/null
-import unittest
-
-from ..module import ThreadedBenchmarkRunner, BenchmarkException
-
-
-class ThreadedBenchmarkRunnerTest(unittest.TestCase):
- def test_number_of_calls_on_start_fails(self):
- class_threadbenchmarkrunner = ThreadedBenchmarkRunner(0, 10)
- with self.assertRaises(BenchmarkException):
- class_threadbenchmarkrunner.start(None)
-
- def test_number_of_parallel_calls_on_start_fails(self):
- class_threadbenchmarkrunner = ThreadedBenchmarkRunner(10, 0)
- with self.assertRaises(BenchmarkException):
- class_threadbenchmarkrunner.start(None)
-
- def test_number_of_parallel_calls_on_start_works(self):
- class_threadbenchmarkrunner = ThreadedBenchmarkRunner(10, 10)
-
- def dummy_function():
- pass
- class_threadbenchmarkrunner.start(dummy_function)
- assert len(class_threadbenchmarkrunner._self_time) > 0
- assert sum(class_threadbenchmarkrunner._self_time) > 0
-
- def test_get_stats_works(self):
- class_threadbenchmarkrunner = ThreadedBenchmarkRunner(10, 10)
-
- def dummy_function():
- for i in range(10):
- pass
- class_threadbenchmarkrunner.start(dummy_function)
- stats = class_threadbenchmarkrunner.get_stats()
- assert stats['avg'] > 0
- assert stats['min'] > 0
- assert stats['max'] > 0
PerfCounterT = Dict[str, Any]
+class API:
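+    """
+    Decorator helpers used to annotate MgrModule methods so that other
+    components (e.g. the cli_api module) can discover them, along the
+    lines of::
+
+        @API.perm('w')
+        @API.expose
+        def set_store(self, key, val): ...
+
+    ``API.expose.get(func)`` then returns True for exposed methods (False by
+    default) and ``API.perm.get(func)`` returns the declared permission
+    ('r' by default).
+    """
+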
+ def DecoratorFactory(attr: str, default: Any): # type: ignore
+ class DecoratorClass:
+ _ATTR_TOKEN = f'__ATTR_{attr.upper()}__'
+
+            def __init__(self, value: Any = default) -> None:
+ self.value = value
+
+ def __call__(self, func: Callable) -> Any:
+ setattr(func, self._ATTR_TOKEN, self.value)
+ return func
+
+ @classmethod
+ def get(cls, func: Callable) -> Any:
+ return getattr(func, cls._ATTR_TOKEN, default)
+
+ return DecoratorClass
+
+ perm = DecoratorFactory('perm', default='r')
+ expose = DecoratorFactory('expose', default=False)(True)
+
+
class MgrModule(ceph_module.BaseMgrModule, MgrModuleLoggingMixin):
COMMANDS = [] # type: List[Any]
MODULE_OPTIONS: List[Option] = []
"""
return self._ceph_get_release_name()
+ @API.expose
def lookup_release_name(self, major: int) -> str:
return self._ceph_lookup_release_name(major)
self._rados.shutdown()
self._ceph_unregister_client(addrs)
- def get(self, data_name: str):
+ @API.expose
+ def get(self, data_name: str) -> Any:
"""
Called by the plugin to fetch named cluster-wide objects from ceph-mgr.
return ret
- def get_server(self, hostname) -> ServerInfoT:
+ @API.expose
+ def get_server(self, hostname: str) -> ServerInfoT:
"""
Called by the plugin to fetch metadata about a particular hostname from
ceph-mgr.
"""
return cast(ServerInfoT, self._ceph_get_server(hostname))
+ @API.expose
def get_perf_schema(self,
svc_type: str,
svc_name: str) -> Dict[str,
"""
return self._ceph_get_perf_schema(svc_type, svc_name)
+ @API.expose
def get_counter(self,
svc_type: str,
svc_name: str,
"""
return self._ceph_get_counter(svc_type, svc_name, path)
+ @API.expose
def get_latest_counter(self,
svc_type: str,
svc_name: str,
"""
return self._ceph_get_latest_counter(svc_type, svc_name, path)
+ @API.expose
def list_servers(self) -> List[ServerInfoT]:
"""
Like ``get_server``, but gives information about all servers (i.e. all
return default
return metadata
+ @API.expose
def get_daemon_status(self, svc_type: str, svc_id: str) -> Dict[str, str]:
"""
Fetch the latest status for a particular service daemon.
"""
return self._ceph_get_mgr_id()
+ @API.expose
def get_ceph_conf_path(self) -> str:
return self._ceph_get_ceph_conf_path()
+ @API.expose
def get_mgr_ip(self) -> str:
ips = self.get("mgr_ips").get('ips', [])
if not ips:
return socket.gethostname()
return ips[0]
+ @API.expose
def get_ceph_option(self, key: str) -> OptionValue:
return self._ceph_get_option(key)
+ @API.expose
def get_foreign_ceph_option(self, entity: str, key: str) -> OptionValue:
return self._ceph_get_foreign_option(entity, key)
r = self._ceph_get_module_option(module, key)
return default if r is None else r
+ @API.expose
def get_store_prefix(self, key_prefix: str) -> Dict[str, str]:
"""
Retrieve a dict of KV store keys to values, where the keys
self._validate_module_option(key)
return self._ceph_set_module_option(module, key, str(val))
+ @API.perm('w')
+ @API.expose
def set_localized_module_option(self, key: str, val: Optional[str]) -> None:
"""
Set localized configuration for this ceph-mgr instance
self._validate_module_option(key)
return self._set_localized(key, val, self._set_module_option)
+ @API.perm('w')
+ @API.expose
def set_store(self, key: str, val: Optional[str]) -> None:
"""
Set a value in this module's persistent key value store.
"""
self._ceph_set_store(key, val)
+ @API.expose
def get_store(self, key: str, default: Optional[str] = None) -> Optional[str]:
"""
Get a value from this module's persistent key value store
else:
return r
+ @API.expose
def get_localized_store(self, key: str, default: Optional[str] = None) -> Optional[str]:
r = self._ceph_get_store(_get_localized_key(self.get_mgr_id(), key))
if r is None:
r = default
return r
+ @API.perm('w')
+ @API.expose
def set_localized_store(self, key: str, val: Optional[str]) -> None:
return self._set_localized(key, val, self.set_store)
"""
return cast(OSDMap, self._ceph_get_osdmap())
+ @API.expose
def get_latest(self, daemon_type: str, daemon_name: str, counter: str) -> int:
data = self.get_latest_counter(
daemon_type, daemon_name, counter)[counter]
else:
return 0
+ @API.expose
def get_latest_avg(self, daemon_type: str, daemon_name: str, counter: str) -> Tuple[int, int]:
data = self.get_latest_counter(
daemon_type, daemon_name, counter)[counter]
else:
return 0, 0
+ @API.expose
@profile_method()
def get_all_perf_counters(self, prio_limit: int = PRIO_USEFUL,
services: Sequence[str] = ("mds", "mon", "osd",
return result
+ @API.expose
def set_uri(self, uri: str) -> None:
"""
If the module exposes a service, then call this to publish the
"""
return self._ceph_set_uri(uri)
+ @API.perm('w')
+ @API.expose
def set_device_wear_level(self, devid: str, wear_level: float) -> None:
return self._ceph_set_device_wear_level(devid, wear_level)
+ @API.expose
def have_mon_connection(self) -> bool:
"""
Check whether this ceph-mgr daemon has an open connection
add_to_ceph_s: bool) -> None:
return self._ceph_update_progress_event(evid, desc, progress, add_to_ceph_s)
+ @API.perm('w')
+ @API.expose
def complete_progress_event(self, evid: str) -> None:
return self._ceph_complete_progress_event(evid)
+ @API.perm('w')
+ @API.expose
def clear_all_progress_events(self) -> None:
return self._ceph_clear_all_progress_events()
return True, ""
+ @API.expose
def remote(self, module_name: str, method_name: str, *args: Any, **kwargs: Any) -> Any:
"""
Invoke a method on another module. All arguments, and the return
"""
return self._ceph_add_osd_perf_query(query)
+ @API.perm('w')
+ @API.expose
def remove_osd_perf_query(self, query_id: int) -> None:
"""
Unregister an OSD perf query.
"""
return self._ceph_remove_osd_perf_query(query_id)
+ @API.expose
def get_osd_perf_counters(self, query_id: int) -> Optional[Dict[str, List[PerfCounterT]]]:
"""
Get stats collected for an OSD perf query.
"""
return self._ceph_add_mds_perf_query(query)
+ @API.perm('w')
+ @API.expose
def remove_mds_perf_query(self, query_id: int) -> None:
"""
Unregister an MDS perf query.
"""
return self._ceph_remove_mds_perf_query(query_id)
+ @API.expose
def get_mds_perf_counters(self, query_id: int) -> Optional[Dict[str, List[PerfCounterT]]]:
"""
Get stats collected for an MDS perf query.
"""
return self._ceph_is_authorized(arguments)
+ @API.expose
def send_rgwadmin_command(self, args: List[str],
stdout_as_json: bool = True) -> Tuple[int, Union[str, dict], str]:
try: