From: Waad AlKhoury Date: Wed, 24 Nov 2021 09:12:41 +0000 (+0100) Subject: mgr/cli Redo cli api mgr module. X-Git-Tag: v17.1.0~102^2~1 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=12ff5ec9b72a85f6b7723f4879b844467c75d32a;p=ceph.git mgr/cli Redo cli api mgr module. Signed-off-by: Waad AlKhoury Signed-off-by: Pere Diaz Bou --- diff --git a/src/pybind/mgr/cli_api/__init__.py b/src/pybind/mgr/cli_api/__init__.py index 6b71ccbef68..a52284054d2 100644 --- a/src/pybind/mgr/cli_api/__init__.py +++ b/src/pybind/mgr/cli_api/__init__.py @@ -1,6 +1,10 @@ -import os -from .module import CLI # noqa # pylint: disable=unused-import +from .module import CLI +__all__ = [ + "CLI", +] +import os if 'UNITTEST' in os.environ: import tests # noqa # pylint: disable=unused-import + __all__.append(tests.__name__) diff --git a/src/pybind/mgr/cli_api/module.py b/src/pybind/mgr/cli_api/module.py index 527b8930c67..79b042eb0e9 100755 --- a/src/pybind/mgr/cli_api/module.py +++ b/src/pybind/mgr/cli_api/module.py @@ -1,92 +1,119 @@ -import json +import concurrent.futures +import functools +import inspect import logging -import threading import time -from functools import partial -from queue import Queue +import errno +from typing import Any, Callable, Dict, List -from mgr_module import CLICommand, HandleCommandResult, MgrModule +from mgr_module import MgrModule, HandleCommandResult, CLICommand, API logger = logging.getLogger() +get_time = time.perf_counter -class CLI(MgrModule): - - @CLICommand('mgr api get') - def api_get(self, arg: str): - ''' - Called by the plugin to fetch named cluster-wide objects from ceph-mgr. - :param str data_name: Valid things to fetch are osd_crush_map_text, - osd_map, osd_map_tree, osd_map_crush, config, mon_map, fs_map, - osd_metadata, pg_summary, io_rate, pg_dump, df, osd_stats, - health, mon_status, devices, device , pg_stats, - pool_stats, pg_ready, osd_ping_times. - Note: - All these structures have their own JSON representations: experiment - or look at the C++ ``dump()`` methods to learn about them. - ''' - t1_start = time.time() - str_arg = self.get(arg) - t1_end = time.time() - time_final = (t1_end - t1_start) - return HandleCommandResult(0, json.dumps(str_arg), str(time_final)) - - @CLICommand('mgr api benchmark get') - def api_get_benchmark(self, arg: str, number_of_total_calls: int, - number_of_parallel_calls: int): - benchmark_runner = ThreadedBenchmarkRunner(number_of_total_calls, number_of_parallel_calls) - benchmark_runner.start(partial(self.get, arg)) - benchmark_runner.join() - stats = benchmark_runner.get_stats() - return HandleCommandResult(0, json.dumps(stats), "") - - -class ThreadedBenchmarkRunner: - def __init__(self, number_of_total_calls, number_of_parallel_calls): - self._number_of_parallel_calls = number_of_parallel_calls - self._number_of_total_calls = number_of_total_calls - self._threads = [] - self._jobs: Queue = Queue() - self._time = 0.0 - self._self_time = [] - self._lock = threading.Lock() - - def start(self, func): - if(self._number_of_total_calls and self._number_of_parallel_calls): - for thread_id in range(self._number_of_parallel_calls): - new_thread = threading.Thread(target=ThreadedBenchmarkRunner.timer, - args=(self, self._jobs, func,)) - self._threads.append(new_thread) - for job_id in range(self._number_of_total_calls): - self._jobs.put(job_id) - for thread in self._threads: - thread.start() +def pretty_json(obj: Any) -> Any: + import json + return json.dumps(obj, sort_keys=True, indent=2) + + +class CephCommander: + """ + Utility class to inspect Python functions and generate corresponding + CephCommand signatures (see src/mon/MonCommand.h for details) + """ + + def __init__(self, func: Callable): + self.func = func + self.signature = inspect.signature(func) + self.params = self.signature.parameters + + def to_ceph_signature(self) -> Dict[str, str]: + """ + Generate CephCommand signature (dict-like) + """ + return { + 'prefix': f'mgr cli {self.func.__name__}', + 'perm': API.perm.get(self.func) + } + + +class MgrAPIReflector(type): + """ + Metaclass to register COMMANDS and Command Handlers via CLICommand + decorator + """ + + def __new__(cls, name, bases, dct): # type: ignore + klass = super().__new__(cls, name, bases, dct) + cls.threaded_benchmark_runner = None + for base in bases: + for name, func in inspect.getmembers(base, cls.is_public): + # However not necessary (CLICommand uses a registry) + # save functions to klass._cli_{n}() methods. This + # can help on unit testing + wrapper = cls.func_wrapper(func) + command = CLICommand(**CephCommander(func).to_ceph_signature())( # type: ignore + wrapper) + setattr( + klass, + f'_cli_{name}', + command) + return klass + + @staticmethod + def is_public(func: Callable) -> bool: + return ( + inspect.isfunction(func) + and not func.__name__.startswith('_') + and API.expose.get(func) + ) + + @staticmethod + def func_wrapper(func: Callable) -> Callable: + @functools.wraps(func) + def wrapper(self, *args, **kwargs) -> HandleCommandResult: # type: ignore + return HandleCommandResult(stdout=pretty_json( + func(self, *args, **kwargs))) + + # functools doesn't change the signature when wrapping a function + # so we do it manually + signature = inspect.signature(func) + wrapper.__signature__ = signature # type: ignore + return wrapper + + +class CLI(MgrModule, metaclass=MgrAPIReflector): + @CLICommand('mgr cli_benchmark') + def benchmark(self, iterations: int, threads: int, func_name: str, + func_args: List[str] = None) -> HandleCommandResult: # type: ignore + func_args = () if func_args is None else func_args + if iterations and threads: + try: + func = getattr(self, func_name) + except AttributeError: + return HandleCommandResult(errno.EINVAL, + stderr="Could not find the public " + "function you are requesting") else: - raise BenchmarkException("Number of Total and number of parallel calls must be greater than 0") + raise BenchmarkException("Number of calls and number " + "of parallel calls must be greater than 0") + + def timer(*args: Any) -> float: + time_start = get_time() + func(*func_args) + return get_time() - time_start - def join(self): - for thread in self._threads: - thread.join() + with concurrent.futures.ThreadPoolExecutor(max_workers=threads) as executor: + results_iter = executor.map(timer, range(iterations)) + results = list(results_iter) - def get_stats(self): stats = { - "avg": (self._time / self._number_of_total_calls), - "min": min(self._self_time), - "max": max(self._self_time) + "avg": sum(results) / len(results), + "max": max(results), + "min": min(results), } - return stats - - def timer(self, jobs, func): - self._lock.acquire() - while not self._jobs.empty(): - jobs.get() - time_start = time.time() - func() - time_end = time.time() - self._self_time.append(time_end - time_start) - self._time += (time_end - time_start) - self._jobs.task_done() - self._lock.release() + return HandleCommandResult(stdout=pretty_json(stats)) class BenchmarkException(Exception): diff --git a/src/pybind/mgr/cli_api/tests/test_cli_api.py b/src/pybind/mgr/cli_api/tests/test_cli_api.py new file mode 100644 index 00000000000..ee42dc96a60 --- /dev/null +++ b/src/pybind/mgr/cli_api/tests/test_cli_api.py @@ -0,0 +1,40 @@ +import unittest + +from ..module import CLI, BenchmarkException, HandleCommandResult + + +class BenchmarkRunnerTest(unittest.TestCase): + def setUp(self): + self.cli = CLI('CLI', 0, 0) + + def test_number_of_calls_on_start_fails(self): + with self.assertRaises(BenchmarkException) as ctx: + self.cli.benchmark(0, 10, 'list_servers', []) + self.assertEqual(str(ctx.exception), + "Number of calls and number " + "of parallel calls must be greater than 0") + + def test_number_of_parallel_calls_on_start_fails(self): + with self.assertRaises(BenchmarkException) as ctx: + self.cli.benchmark(100, 0, 'list_servers', []) + self.assertEqual(str(ctx.exception), + "Number of calls and number " + "of parallel calls must be greater than 0") + + def test_number_of_parallel_calls_on_start_works(self): + CLI.benchmark(10, 10, "get", "osd_map") + + def test_function_name_fails(self): + for iterations in [0, 1]: + threads = 0 if iterations else 1 + with self.assertRaises(BenchmarkException) as ctx: + self.cli.benchmark(iterations, threads, 'fake_method', []) + self.assertEqual(str(ctx.exception), + "Number of calls and number " + "of parallel calls must be greater than 0") + result: HandleCommandResult = self.cli.benchmark(1, 1, 'fake_method', []) + self.assertEqual(result.stderr, "Could not find the public " + "function you are requesting") + + def test_function_name_works(self): + CLI.benchmark(10, 10, "get", "osd_map") diff --git a/src/pybind/mgr/cli_api/tests/test_cliapi.py b/src/pybind/mgr/cli_api/tests/test_cliapi.py deleted file mode 100644 index 2136161c9d5..00000000000 --- a/src/pybind/mgr/cli_api/tests/test_cliapi.py +++ /dev/null @@ -1,36 +0,0 @@ -import unittest - -from ..module import ThreadedBenchmarkRunner, BenchmarkException - - -class ThreadedBenchmarkRunnerTest(unittest.TestCase): - def test_number_of_calls_on_start_fails(self): - class_threadbenchmarkrunner = ThreadedBenchmarkRunner(0, 10) - with self.assertRaises(BenchmarkException): - class_threadbenchmarkrunner.start(None) - - def test_number_of_parallel_calls_on_start_fails(self): - class_threadbenchmarkrunner = ThreadedBenchmarkRunner(10, 0) - with self.assertRaises(BenchmarkException): - class_threadbenchmarkrunner.start(None) - - def test_number_of_parallel_calls_on_start_works(self): - class_threadbenchmarkrunner = ThreadedBenchmarkRunner(10, 10) - - def dummy_function(): - pass - class_threadbenchmarkrunner.start(dummy_function) - assert len(class_threadbenchmarkrunner._self_time) > 0 - assert sum(class_threadbenchmarkrunner._self_time) > 0 - - def test_get_stats_works(self): - class_threadbenchmarkrunner = ThreadedBenchmarkRunner(10, 10) - - def dummy_function(): - for i in range(10): - pass - class_threadbenchmarkrunner.start(dummy_function) - stats = class_threadbenchmarkrunner.get_stats() - assert stats['avg'] > 0 - assert stats['min'] > 0 - assert stats['max'] > 0 diff --git a/src/pybind/mgr/mgr_module.py b/src/pybind/mgr/mgr_module.py index 33019666273..391f81f0f01 100644 --- a/src/pybind/mgr/mgr_module.py +++ b/src/pybind/mgr/mgr_module.py @@ -874,6 +874,28 @@ ServerInfoT = Dict[str, Union[str, List[ServiceInfoT]]] PerfCounterT = Dict[str, Any] +class API: + def DecoratorFactory(attr: str, default: Any): # type: ignore + class DecoratorClass: + _ATTR_TOKEN = f'__ATTR_{attr.upper()}__' + + def __init__(self, value: Any=default) -> None: + self.value = value + + def __call__(self, func: Callable) -> Any: + setattr(func, self._ATTR_TOKEN, self.value) + return func + + @classmethod + def get(cls, func: Callable) -> Any: + return getattr(func, cls._ATTR_TOKEN, default) + + return DecoratorClass + + perm = DecoratorFactory('perm', default='r') + expose = DecoratorFactory('expose', default=False)(True) + + class MgrModule(ceph_module.BaseMgrModule, MgrModuleLoggingMixin): MGR_POOL_NAME = ".mgr" @@ -998,10 +1020,12 @@ class MgrModule(ceph_module.BaseMgrModule, MgrModuleLoggingMixin): def version(self) -> str: return self._version + @API.expose def pool_exists(self, name: str) -> bool: pools = [p['pool_name'] for p in self.get('osd_map')['pools']] return name in pools + @API.expose def have_enough_osds(self) -> bool: # wait until we have enough OSDs to allow the pool to be healthy ready = 0 @@ -1012,6 +1036,8 @@ class MgrModule(ceph_module.BaseMgrModule, MgrModuleLoggingMixin): need = cast(int, self.get_ceph_option("osd_pool_default_size")) return ready >= need + @API.perm('w') + @API.expose def rename_pool(self, srcpool: str, destpool: str) -> None: c = { 'prefix': 'osd pool rename', @@ -1021,6 +1047,8 @@ class MgrModule(ceph_module.BaseMgrModule, MgrModuleLoggingMixin): } self.check_mon_command(c) + @API.perm('w') + @API.expose def create_pool(self, pool: str) -> None: c = { 'prefix': 'osd pool create', @@ -1031,6 +1059,8 @@ class MgrModule(ceph_module.BaseMgrModule, MgrModuleLoggingMixin): } self.check_mon_command(c) + @API.perm('w') + @API.expose def appify_pool(self, pool: str, app: str) -> None: c = { 'prefix': 'osd pool application enable', @@ -1041,6 +1071,8 @@ class MgrModule(ceph_module.BaseMgrModule, MgrModuleLoggingMixin): } self.check_mon_command(c) + @API.perm('w') + @API.expose def create_mgr_pool(self) -> None: self.log.info("creating mgr pool") @@ -1081,6 +1113,7 @@ class MgrModule(ceph_module.BaseMgrModule, MgrModuleLoggingMixin): with self._db_lock, self.db: self.db.execute(SQL, (key, value)) + @API.expose def get_kv(self, key: str) -> Any: SQL = "SELECT value FROM MgrModuleKV WHERE key = ?;" @@ -1148,6 +1181,7 @@ class MgrModule(ceph_module.BaseMgrModule, MgrModuleLoggingMixin): self.configure_db(db) return db + @API.expose def db_ready(self) -> bool: with self._db_lock: try: @@ -1177,6 +1211,7 @@ class MgrModule(ceph_module.BaseMgrModule, MgrModuleLoggingMixin): """ return self._ceph_get_release_name() + @API.expose def lookup_release_name(self, major: int) -> str: return self._ceph_lookup_release_name(major) @@ -1260,6 +1295,7 @@ class MgrModule(ceph_module.BaseMgrModule, MgrModuleLoggingMixin): self._rados.shutdown() self._ceph_unregister_client(addrs) + @API.expose def get(self, data_name: str) -> Any: """ Called by the plugin to fetch named cluster-wide objects from ceph-mgr. @@ -1386,6 +1422,7 @@ class MgrModule(ceph_module.BaseMgrModule, MgrModuleLoggingMixin): return ret + @API.expose def get_server(self, hostname: str) -> ServerInfoT: """ Called by the plugin to fetch metadata about a particular hostname from @@ -1398,6 +1435,7 @@ class MgrModule(ceph_module.BaseMgrModule, MgrModuleLoggingMixin): """ return cast(ServerInfoT, self._ceph_get_server(hostname)) + @API.expose def get_perf_schema(self, svc_type: str, svc_name: str) -> Dict[str, @@ -1413,6 +1451,7 @@ class MgrModule(ceph_module.BaseMgrModule, MgrModuleLoggingMixin): """ return self._ceph_get_perf_schema(svc_type, svc_name) + @API.expose def get_counter(self, svc_type: str, svc_name: str, @@ -1431,6 +1470,7 @@ class MgrModule(ceph_module.BaseMgrModule, MgrModuleLoggingMixin): """ return self._ceph_get_counter(svc_type, svc_name, path) + @API.expose def get_latest_counter(self, svc_type: str, svc_name: str, @@ -1450,6 +1490,7 @@ class MgrModule(ceph_module.BaseMgrModule, MgrModuleLoggingMixin): """ return self._ceph_get_latest_counter(svc_type, svc_name, path) + @API.expose def list_servers(self) -> List[ServerInfoT]: """ Like ``get_server``, but gives information about all servers (i.e. all @@ -1481,6 +1522,7 @@ class MgrModule(ceph_module.BaseMgrModule, MgrModuleLoggingMixin): return default return metadata + @API.expose def get_daemon_status(self, svc_type: str, svc_id: str) -> Dict[str, str]: """ Fetch the latest status for a particular service daemon. @@ -1678,9 +1720,11 @@ class MgrModule(ceph_module.BaseMgrModule, MgrModuleLoggingMixin): """ return self._ceph_get_mgr_id() + @API.expose def get_ceph_conf_path(self) -> str: return self._ceph_get_ceph_conf_path() + @API.expose def get_mgr_ip(self) -> str: if not self._mgr_ips: ips = self.get("mgr_ips").get('ips', []) @@ -1690,9 +1734,11 @@ class MgrModule(ceph_module.BaseMgrModule, MgrModuleLoggingMixin): assert self._mgr_ips is not None return self._mgr_ips + @API.expose def get_ceph_option(self, key: str) -> OptionValue: return self._ceph_get_option(key) + @API.expose def get_foreign_ceph_option(self, entity: str, key: str) -> OptionValue: return self._ceph_get_foreign_option(entity, key) @@ -1742,6 +1788,7 @@ class MgrModule(ceph_module.BaseMgrModule, MgrModuleLoggingMixin): r = self._ceph_get_module_option(module, key) return default if r is None else r + @API.expose def get_store_prefix(self, key_prefix: str) -> Dict[str, str]: """ Retrieve a dict of KV store keys to values, where the keys @@ -1793,6 +1840,8 @@ class MgrModule(ceph_module.BaseMgrModule, MgrModuleLoggingMixin): self._validate_module_option(key) return self._ceph_set_module_option(module, key, str(val)) + @API.perm('w') + @API.expose def set_localized_module_option(self, key: str, val: Optional[str]) -> None: """ Set localized configuration for this ceph-mgr instance @@ -1803,6 +1852,8 @@ class MgrModule(ceph_module.BaseMgrModule, MgrModuleLoggingMixin): self._validate_module_option(key) return self._set_localized(key, val, self._set_module_option) + @API.perm('w') + @API.expose def set_store(self, key: str, val: Optional[str]) -> None: """ Set a value in this module's persistent key value store. @@ -1810,6 +1861,7 @@ class MgrModule(ceph_module.BaseMgrModule, MgrModuleLoggingMixin): """ self._ceph_set_store(key, val) + @API.expose def get_store(self, key: str, default: Optional[str] = None) -> Optional[str]: """ Get a value from this module's persistent key value store @@ -1820,6 +1872,7 @@ class MgrModule(ceph_module.BaseMgrModule, MgrModuleLoggingMixin): else: return r + @API.expose def get_localized_store(self, key: str, default: Optional[str] = None) -> Optional[str]: r = self._ceph_get_store(_get_localized_key(self.get_mgr_id(), key)) if r is None: @@ -1828,6 +1881,8 @@ class MgrModule(ceph_module.BaseMgrModule, MgrModuleLoggingMixin): r = default return r + @API.perm('w') + @API.expose def set_localized_store(self, key: str, val: Optional[str]) -> None: return self._set_localized(key, val, self.set_store) @@ -1853,6 +1908,7 @@ class MgrModule(ceph_module.BaseMgrModule, MgrModuleLoggingMixin): """ return cast(OSDMap, self._ceph_get_osdmap()) + @API.expose def get_latest(self, daemon_type: str, daemon_name: str, counter: str) -> int: data = self.get_latest_counter( daemon_type, daemon_name, counter)[counter] @@ -1861,6 +1917,7 @@ class MgrModule(ceph_module.BaseMgrModule, MgrModuleLoggingMixin): else: return 0 + @API.expose def get_latest_avg(self, daemon_type: str, daemon_name: str, counter: str) -> Tuple[int, int]: data = self.get_latest_counter( daemon_type, daemon_name, counter)[counter] @@ -1871,6 +1928,7 @@ class MgrModule(ceph_module.BaseMgrModule, MgrModuleLoggingMixin): else: return 0, 0 + @API.expose @profile_method() def get_all_perf_counters(self, prio_limit: int = PRIO_USEFUL, services: Sequence[str] = ("mds", "mon", "osd", @@ -1943,6 +2001,7 @@ class MgrModule(ceph_module.BaseMgrModule, MgrModuleLoggingMixin): return result + @API.expose def set_uri(self, uri: str) -> None: """ If the module exposes a service, then call this to publish the @@ -1952,9 +2011,12 @@ class MgrModule(ceph_module.BaseMgrModule, MgrModuleLoggingMixin): """ return self._ceph_set_uri(uri) + @API.perm('w') + @API.expose def set_device_wear_level(self, devid: str, wear_level: float) -> None: return self._ceph_set_device_wear_level(devid, wear_level) + @API.expose def have_mon_connection(self) -> bool: """ Check whether this ceph-mgr daemon has an open connection @@ -1972,9 +2034,13 @@ class MgrModule(ceph_module.BaseMgrModule, MgrModuleLoggingMixin): add_to_ceph_s: bool) -> None: return self._ceph_update_progress_event(evid, desc, progress, add_to_ceph_s) + @API.perm('w') + @API.expose def complete_progress_event(self, evid: str) -> None: return self._ceph_complete_progress_event(evid) + @API.perm('w') + @API.expose def clear_all_progress_events(self) -> None: return self._ceph_clear_all_progress_events() @@ -2009,6 +2075,7 @@ class MgrModule(ceph_module.BaseMgrModule, MgrModuleLoggingMixin): return True, "" + @API.expose def remote(self, module_name: str, method_name: str, *args: Any, **kwargs: Any) -> Any: """ Invoke a method on another module. All arguments, and the return @@ -2063,6 +2130,8 @@ class MgrModule(ceph_module.BaseMgrModule, MgrModuleLoggingMixin): """ return self._ceph_add_osd_perf_query(query) + @API.perm('w') + @API.expose def remove_osd_perf_query(self, query_id: int) -> None: """ Unregister an OSD perf query. @@ -2071,6 +2140,7 @@ class MgrModule(ceph_module.BaseMgrModule, MgrModuleLoggingMixin): """ return self._ceph_remove_osd_perf_query(query_id) + @API.expose def get_osd_perf_counters(self, query_id: int) -> Optional[Dict[str, List[PerfCounterT]]]: """ Get stats collected for an OSD perf query. @@ -2108,6 +2178,8 @@ class MgrModule(ceph_module.BaseMgrModule, MgrModuleLoggingMixin): """ return self._ceph_add_mds_perf_query(query) + @API.perm('w') + @API.expose def remove_mds_perf_query(self, query_id: int) -> None: """ Unregister an MDS perf query. @@ -2116,6 +2188,7 @@ class MgrModule(ceph_module.BaseMgrModule, MgrModuleLoggingMixin): """ return self._ceph_remove_mds_perf_query(query_id) + @API.expose def get_mds_perf_counters(self, query_id: int) -> Optional[Dict[str, List[PerfCounterT]]]: """ Get stats collected for an MDS perf query. @@ -2135,6 +2208,7 @@ class MgrModule(ceph_module.BaseMgrModule, MgrModuleLoggingMixin): """ return self._ceph_is_authorized(arguments) + @API.expose def send_rgwadmin_command(self, args: List[str], stdout_as_json: bool = True) -> Tuple[int, Union[str, dict], str]: try: