--- /dev/null
+from mgr_module import CLICommandBase
+
+AlertsCLICommand = CLICommandBase.make_registry_subtype("AlertsCLICommand")
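Every new per-module cli.py in this series calls CLICommandBase.make_registry_subtype(), but the diff never shows that helper. The following is only a minimal sketch of the shape it is assumed to have: a CLICommand-style decorator subclass that carries its own COMMANDS registry and offers Read/Write constructors pinning perm='r' and perm='w'. The names mirror the diff; the body is illustrative, not the actual mgr_module implementation.

# Illustrative sketch only -- an assumed shape for CLICommandBase and
# make_registry_subtype(); the real mgr_module code may differ.
from typing import Any, Callable, Dict


class CLICommandBase:
    # Each registry subtype gets its own isolated COMMANDS dict.
    COMMANDS: Dict[str, 'CLICommandBase'] = {}

    def __init__(self, prefix: str, perm: str = 'rw', poll: bool = False) -> None:
        self.prefix = prefix
        self.perm = perm
        self.poll = poll

    def __call__(self, func: Callable[..., Any]) -> Callable[..., Any]:
        # Register the handler under this class's own registry and
        # return the function unchanged, so it stays a normal method.
        self.func = func
        type(self).COMMANDS[self.prefix] = self
        return func

    @classmethod
    def Read(cls, prefix: str, **kw: Any) -> 'CLICommandBase':
        # Matches usages like @AlertsCLICommand.Read('alerts send')
        return cls(prefix, perm='r', **kw)

    @classmethod
    def Write(cls, prefix: str, **kw: Any) -> 'CLICommandBase':
        return cls(prefix, perm='w', **kw)

    @classmethod
    def make_registry_subtype(cls, name: str) -> type:
        # A fresh subclass with an empty COMMANDS dict, so each mgr module
        # registers its commands without touching any shared global registry.
        return type(name, (cls,), {'COMMANDS': {}})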
import smtplib
import ssl
+from .cli import AlertsCLICommand
+
class Alerts(MgrModule):
+ CLICommand = AlertsCLICommand
MODULE_OPTIONS = [
Option(
name='interval',
self.get_ceph_option(opt))
self.log.debug(' native option %s = %s', opt, getattr(self, opt))
- @CLIReadCommand('alerts send')
+ @AlertsCLICommand.Read('alerts send')
def send(self) -> HandleCommandResult:
"""
(re)send alerts immediately
--- /dev/null
+from mgr_module import CLICommandBase
+
+BalancerCLICommand = CLICommandBase.make_registry_subtype("BalancerCLICommand")
from mgr_module import CRUSHMap
import datetime
+from .cli import BalancerCLICommand
+
TIME_FORMAT = '%Y-%m-%d_%H:%M:%S'
class Module(MgrModule):
+ CLICommand = BalancerCLICommand
MODULE_OPTIONS = [
Option(name='active',
type='bool',
super(Module, self).__init__(*args, **kwargs)
self.event = Event()
- @CLIReadCommand('balancer status')
+ @BalancerCLICommand.Read('balancer status')
def show_status(self) -> Tuple[int, str, str]:
"""
Show balancer status
}
return (0, json.dumps(s, indent=4, sort_keys=True), '')
- @CLIReadCommand('balancer status detail')
+ @BalancerCLICommand.Read('balancer status detail')
def show_status_detail(self) -> Tuple[int, str, str]:
"""
Show balancer status (detailed)
}
return (0, json.dumps(s, indent=4, sort_keys=True), '')
- @CLICommand('balancer mode')
+ @BalancerCLICommand('balancer mode')
def set_mode(self, mode: Mode) -> Tuple[int, str, str]:
"""
Set balancer mode
self.set_module_option('mode', mode.value)
return (0, '', '')
- @CLICommand('balancer on')
+ @BalancerCLICommand('balancer on')
def on(self) -> Tuple[int, str, str]:
"""
Enable automatic balancing
self.event.set()
return (0, '', '')
- @CLICommand('balancer off')
+ @BalancerCLICommand('balancer off')
def off(self) -> Tuple[int, str, str]:
"""
Disable automatic balancing
self.event.set()
return (0, '', '')
- @CLIReadCommand('balancer pool ls')
+ @BalancerCLICommand.Read('balancer pool ls')
def pool_ls(self) -> Tuple[int, str, str]:
"""
List automatic balancing pools
self.set_module_option('pool_ids', ','.join(str(p) for p in final_ids))
return (0, json.dumps(sorted(final_names), indent=4, sort_keys=True), '')
- @CLICommand('balancer pool add')
+ @BalancerCLICommand('balancer pool add')
def pool_add(self, pools: Sequence[str]) -> Tuple[int, str, str]:
"""
Enable automatic balancing for specific pools
self.set_module_option('pool_ids', ','.join(final))
return (0, '', '')
- @CLICommand('balancer pool rm')
+ @BalancerCLICommand('balancer pool rm')
def pool_rm(self, pools: Sequence[str]) -> Tuple[int, str, str]:
"""
Disable automatic balancing for specific pools
f'pool "{option}"')
return ms, pools
- @CLIReadCommand('balancer eval-verbose')
+ @BalancerCLICommand.Read('balancer eval-verbose')
def plan_eval_verbose(self, option: Optional[str] = None):
"""
Evaluate data distribution for the current cluster or specific pool or specific
except ValueError as e:
return (-errno.EINVAL, '', str(e))
- @CLIReadCommand('balancer eval')
+ @BalancerCLICommand.Read('balancer eval')
def plan_eval_brief(self, option: Optional[str] = None):
"""
Evaluate data distribution for the current cluster or specific pool or specific plan
except ValueError as e:
return (-errno.EINVAL, '', str(e))
- @CLIReadCommand('balancer optimize')
+ @BalancerCLICommand.Read('balancer optimize')
def plan_optimize(self, plan: str, pools: List[str] = []) -> Tuple[int, str, str]:
"""
Run optimizer to create a new plan
self.optimize_result = detail
return (r, '', detail)
- @CLIReadCommand('balancer show')
+ @BalancerCLICommand.Read('balancer show')
def plan_show(self, plan: str) -> Tuple[int, str, str]:
"""
Show details of an optimization plan
return (-errno.ENOENT, '', f'plan {plan} not found')
return (0, plan_.show(), '')
- @CLICommand('balancer rm')
+ @BalancerCLICommand('balancer rm')
def plan_rm(self, plan: str) -> Tuple[int, str, str]:
"""
Discard an optimization plan
del self.plans[plan]
return (0, '', '')
- @CLICommand('balancer reset')
+ @BalancerCLICommand('balancer reset')
def plan_reset(self) -> Tuple[int, str, str]:
"""
Discard all optimization plans
self.plans = {}
return (0, '', '')
- @CLIReadCommand('balancer dump')
+ @BalancerCLICommand.Read('balancer dump')
def plan_dump(self, plan: str) -> Tuple[int, str, str]:
"""
Show an optimization plan
else:
return (0, plan_.dump(), '')
- @CLIReadCommand('balancer ls')
+ @BalancerCLICommand.Read('balancer ls')
def plan_ls(self) -> Tuple[int, str, str]:
"""
List all plans
"""
return (0, json.dumps([p for p in self.plans], indent=4, sort_keys=True), '')
- @CLIReadCommand('balancer execute')
+ @BalancerCLICommand.Read('balancer execute')
def plan_execute(self, plan: str) -> Tuple[int, str, str]:
"""
Execute an optimization plan
-from ..orchestrator.cli import OrchestratorCLICommandBase
+from orchestrator.cli import OrchestratorCLICommandBase
CephadmCLICommand = OrchestratorCLICommandBase.make_registry_subtype(
    "CephadmCLICommand")
--- /dev/null
+from mgr_module import CLICommandBase
+
+CLIAPICLICommand = CLICommandBase.make_registry_subtype("CLIAPICLICommand")
import errno
from typing import Any, Callable, Dict, List
-from mgr_module import MgrModule, HandleCommandResult, CLICommand, API
+from .cli import CLIAPICLICommand
+
+from mgr_module import MgrModule, HandleCommandResult, API
logger = logging.getLogger()
get_time = time.perf_counter
# save functions to klass._cli_{n}() methods. This
# can help on unit testing
wrapper = cls.func_wrapper(func)
- command = CLICommand(**CephCommander(func).to_ceph_signature())( # type: ignore
+ command = CLIAPICLICommand(**CephCommander(func).to_ceph_signature())( # type: ignore
wrapper)
setattr(
klass,
class CLI(MgrModule, metaclass=MgrAPIReflector):
- @CLICommand('mgr cli_benchmark')
+ CLICommand = CLIAPICLICommand
+
+ @CLIAPICLICommand('mgr cli_benchmark')
def benchmark(self, iterations: int, threads: int, func_name: str,
func_args: List[str] = None) -> HandleCommandResult: # type: ignore
func_args = () if func_args is None else func_args
--- /dev/null
+from mgr_module import CLICommandBase
+
+CrashCLICommand = CLICommandBase.make_registry_subtype("CrashCLICommand")
from typing import cast, Any, Callable, DefaultDict, Dict, Iterable, List, Optional, Tuple, TypeVar, \
Union, TYPE_CHECKING
+from .cli import CrashCLICommand
+
DATEFMT = '%Y-%m-%dT%H:%M:%S.%f'
OLD_DATEFMT = '%Y-%m-%d %H:%M:%S.%f'
class Module(MgrModule):
+ CLICommand = CrashCLICommand
MODULE_OPTIONS = [
Option(
name='warn_recent_interval',
# command handlers
- @CLIReadCommand('crash info')
+ @CrashCLICommand.Read('crash info')
@with_crashes
def do_info(self, id: str) -> Tuple[int, str, str]:
"""
val = json.dumps(crash, indent=4, sort_keys=True)
return 0, val, ''
- @CLICommand('crash post')
+ @CrashCLICommand('crash post')
def do_post(self, inbuf: str) -> Tuple[int, str, str]:
"""
Add a crash dump (use -i <jsonfile>)
'' if 'archived' in c else '*'])
return 0, table.get_string(), ''
- @CLIReadCommand('crash ls')
+ @CrashCLICommand.Read('crash ls')
@with_crashes
def do_ls_all(self, format: Optional[str] = None) -> Tuple[int, str, str]:
"""
assert self.crashes is not None
return self._do_ls(self.crashes.values(), format)
- @CLIReadCommand('crash ls-new')
+ @CrashCLICommand.Read('crash ls-new')
@with_crashes
def do_ls_new(self, format: Optional[str] = None) -> Tuple[int, str, str]:
"""
if 'archived' not in crash]
return self._do_ls(t, format)
- @CLICommand('crash rm')
+ @CrashCLICommand('crash rm')
@with_crashes
def do_rm(self, id: str) -> Tuple[int, str, str]:
"""
self._refresh_health_checks()
return 0, '', ''
- @CLICommand('crash prune')
+ @CrashCLICommand('crash prune')
@with_crashes
def do_prune(self, keep: int) -> Tuple[int, str, str]:
"""
if removed_any:
self._refresh_health_checks()
- @CLIWriteCommand('crash archive')
+ @CrashCLICommand.Write('crash archive')
@with_crashes
def do_archive(self, id: str) -> Tuple[int, str, str]:
"""
self._refresh_health_checks()
return 0, '', ''
- @CLIWriteCommand('crash archive-all')
+ @CrashCLICommand.Write('crash archive-all')
@with_crashes
def do_archive_all(self) -> Tuple[int, str, str]:
"""
self._refresh_health_checks()
return 0, '', ''
- @CLIReadCommand('crash stat')
+ @CrashCLICommand.Read('crash stat')
@with_crashes
def do_stat(self) -> Tuple[int, str, str]:
"""
retlines.append(binstr(bindict))
return 0, '\n'.join(retlines), ''
- @CLIReadCommand('crash json_report')
+ @CrashCLICommand.Read('crash json_report')
@with_crashes
def do_json_report(self, hours: int) -> Tuple[int, str, str]:
"""
--- /dev/null
+from mgr_module import CLICommandBase
+
+DevicehealthCLICommand = CLICommandBase.make_registry_subtype("DevicehealthCLICommand")
from datetime import datetime, timedelta, timezone
from typing import cast, Any, Dict, List, Optional, Sequence, Tuple, TYPE_CHECKING, Union
+from .cli import DevicehealthCLICommand
+
TIME_FORMAT = '%Y%m%d-%H%M%S'
DEVICE_HEALTH = 'DEVICE_HEALTH'
class Module(MgrModule):
+ CLICommand = DevicehealthCLICommand
# latest (if db does not exist)
SCHEMA = [
return False
return parts[0] in ('osd', 'mon')
- @CLIReadCommand('device query-daemon-health-metrics')
+ @DevicehealthCLICommand.Read('device query-daemon-health-metrics')
def do_query_daemon_health_metrics(self, who: str) -> Tuple[int, str, str]:
'''
Get device health metrics for a given daemon
return result.wait()
@CLIRequiresDB
- @CLIReadCommand('device scrape-daemon-health-metrics')
+ @DevicehealthCLICommand.Read('device scrape-daemon-health-metrics')
@MgrModuleRecoverDB
def do_scrape_daemon_health_metrics(self, who: str) -> Tuple[int, str, str]:
'''
return self.scrape_daemon(daemon_type, daemon_id)
@CLIRequiresDB
- @CLIReadCommand('device scrape-health-metrics')
+ @DevicehealthCLICommand.Read('device scrape-health-metrics')
@MgrModuleRecoverDB
def do_scrape_health_metrics(self, devid: Optional[str] = None) -> Tuple[int, str, str]:
'''
return self.scrape_device(devid)
@CLIRequiresDB
- @CLIReadCommand('device get-health-metrics')
+ @DevicehealthCLICommand.Read('device get-health-metrics')
@MgrModuleRecoverDB
def do_get_health_metrics(self, devid: str, sample: Optional[str] = None) -> Tuple[int, str, str]:
'''
return self.show_device_metrics(devid, sample)
@CLIRequiresDB
- @CLICommand('device check-health')
+ @DevicehealthCLICommand('device check-health')
@MgrModuleRecoverDB
def do_check_health(self) -> Tuple[int, str, str]:
'''
'''
return self.check_health()
- @CLICommand('device monitoring on')
+ @DevicehealthCLICommand('device monitoring on')
def do_monitoring_on(self) -> Tuple[int, str, str]:
'''
Enable device health monitoring
self.event.set()
return 0, '', ''
- @CLICommand('device monitoring off')
+ @DevicehealthCLICommand('device monitoring off')
def do_monitoring_off(self) -> Tuple[int, str, str]:
'''
Disable device health monitoring
return 0, '', ''
@CLIRequiresDB
- @CLIReadCommand('device predict-life-expectancy')
+ @DevicehealthCLICommand.Read('device predict-life-expectancy')
@MgrModuleRecoverDB
def do_predict_life_expectancy(self, devid: str) -> Tuple[int, str, str]:
'''
--- /dev/null
+from mgr_module import CLICommandBase
+
+DiskpredictionLocalCLICommand = CLICommandBase.make_registry_subtype("DiskpredictionLocalCLICommand")
import time
from typing import Any, Dict, List, Optional, Tuple, TYPE_CHECKING
from mgr_module import CommandResult, MgrModule, Option
+
+from .cli import DiskpredictionLocalCLICommand
# Importing scipy early appears to avoid a future deadlock when
# we try to do
#
class Module(MgrModule):
+ CLICommand = DiskpredictionLocalCLICommand
MODULE_OPTIONS = [
Option(name='sleep_interval',
default=600),
--- /dev/null
+from mgr_module import CLICommandBase
+
+FeedbackCLICommand = CLICommandBase.make_registry_subtype("FeedbackCLICommand")
from requests.exceptions import RequestException
+from .cli import FeedbackCLICommand
+
from mgr_module import HandleCommandResult, MgrModule
import errno
class FeedbackModule(MgrModule):
+ CLICommand = FeedbackCLICommand
# there are CLI commands we implement
- @CLIReadCommand('feedback set api-key')
+ @FeedbackCLICommand.Read('feedback set api-key')
def _cmd_feedback_set_api_key(self, key: str) -> HandleCommandResult:
"""
Set Ceph Issue Tracker API key
return HandleCommandResult(stderr=f'Exception in setting API key : {error}')
return HandleCommandResult(stdout="Successfully updated API key")
- @CLIReadCommand('feedback delete api-key')
+ @FeedbackCLICommand.Read('feedback delete api-key')
def _cmd_feedback_delete_api_key(self) -> HandleCommandResult:
"""
Delete Ceph Issue Tracker API key
return HandleCommandResult(stderr=f'Exception in deleting API key : {error}')
return HandleCommandResult(stdout="Successfully deleted key")
- @CLIReadCommand('feedback get api-key')
+ @FeedbackCLICommand.Read('feedback get api-key')
def _cmd_feedback_get_api_key(self) -> HandleCommandResult:
"""
Get Ceph Issue Tracker API key
return HandleCommandResult(stderr=f'Error in retreiving issue tracker API key: {error}')
return HandleCommandResult(stdout=f'Your key: {key}')
- @CLIReadCommand('feedback issue list')
+ @FeedbackCLICommand.Read('feedback issue list')
def _cmd_feedback_issue_list(self) -> HandleCommandResult:
"""
Fetch issue list
return HandleCommandResult(stderr="Error occurred. Try again later")
return HandleCommandResult(stdout=str(response))
- @CLIReadCommand('feedback issue report')
+ @FeedbackCLICommand.Read('feedback issue report')
def _cmd_feedback_issue_report(self, project: str, tracker: str, subject: str, description: str) -> HandleCommandResult:
"""
Create an issue
--- /dev/null
+from mgr_module import CLICommandBase
+
+HelloCLICommand = CLICommandBase.make_registry_subtype("HelloCLICommand")
from typing import cast, Any, Optional, TYPE_CHECKING
import errno
+from .cli import HelloCLICommand
+
class Hello(MgrModule):
+ CLICommand = HelloCLICommand
# These are module options we understand. These can be set with
#
# ceph config set global mgr/hello/<name> <value>
self.log.debug(' native option %s = %s', opt, getattr(self, opt))
# there are CLI commands we implement
- @CLIReadCommand('hello')
+ @HelloCLICommand.Read('hello')
def hello(self, person_name: Optional[str] = None) -> HandleCommandResult:
"""
Say hello
fin = '!' if self.get_module_option('emphatic') else ''
return HandleCommandResult(stdout=f'Hello, {who}{fin}')
- @CLIReadCommand('count')
+ @HelloCLICommand.Read('count')
def count(self, num: int) -> HandleCommandResult:
"""
Do some counting
--- /dev/null
+from mgr_module import CLICommandBase
+
+InfluxCLICommand = CLICommandBase.make_registry_subtype("InfluxCLICommand")
import time
from typing import cast, Any, Dict, Iterator, List, Optional, Tuple, Union
+from .cli import InfluxCLICommand
+
from mgr_module import MgrModule, Option, OptionValue
try:
class Module(MgrModule):
+ CLICommand = InfluxCLICommand
MODULE_OPTIONS = [
Option(name='hostname',
default=None,
return json.dumps(result, indent=2, sort_keys=True)
- @CLIReadCommand('influx config-show')
+ @InfluxCLICommand.Read('influx config-show')
def config_show(self) -> Tuple[int, str, str]:
"""
Show current configuration
"""
return 0, json.dumps(self.config, sort_keys=True), ''
- @CLIWriteCommand('influx config-set')
+ @InfluxCLICommand.Write('influx config-set')
def config_set(self, key: str, value: str) -> Tuple[int, str, str]:
if not value:
return -errno.EINVAL, '', 'Value should not be empty'
except ValueError as e:
return -errno.EINVAL, '', str(e)
- @CLICommand('influx send')
+ @InfluxCLICommand('influx send')
def send(self) -> Tuple[int, str, str]:
"""
Force sending data to Influx
--- /dev/null
+from mgr_module import CLICommandBase
+
+InsightsCLICommand = CLICommandBase.make_registry_subtype("InsightsCLICommand")
import re
import threading
+from .cli import InsightsCLICommand
+
from mgr_module import HandleCommandResult
from mgr_module import MgrModule, CommandResult, NotifyType
from . import health as health_util
class Module(MgrModule):
+ CLICommand = InsightsCLICommand
NOTIFY_TYPES = [NotifyType.health]
ret={}, outs=\"{}\"".format(ret, outs))
return [], ["Failed to read monitor config dump"]
- @CLIReadCommand('insights')
+ @InsightsCLICommand.Read('insights')
def do_report(self):
'''
Retrieve insights report
result = json.dumps(report, indent=2, cls=health_util.HealthEncoder)
return HandleCommandResult(stdout=result)
- @CLICommand('insights prune-health')
+ @InsightsCLICommand('insights prune-health')
def do_prune_health(self, hours: int):
'''
Remove health history older than <hours> hours
--- /dev/null
+from mgr_module import CLICommandBase
+
+IostatCLICommand = CLICommandBase.make_registry_subtype("IostatCLICommand")
from mgr_module import HandleCommandResult, MgrModule
+from .cli import IostatCLICommand
+
class Module(MgrModule):
+ CLICommand = IostatCLICommand
+
def __init__(self, *args: Any, **kwargs: Any) -> None:
super().__init__(*args, **kwargs)
assert 'num_write' in r['pg_stats_delta']['stat_sum']
assert 'num_read' in r['pg_stats_delta']['stat_sum']
- @CLIReadCommand('iostat', poll=True)
+ @IostatCLICommand.Read('iostat', poll=True)
def iostat(self, width: int = 80, print_header: bool = False) -> HandleCommandResult:
"""
Get IO rates
--- /dev/null
+from mgr_module import CLICommandBase
+
+K8SeventsCLICommand = CLICommandBase.make_registry_subtype("K8SeventsCLICommand")
import tempfile
import threading
+from .cli import K8SeventsCLICommand
+
from urllib.parse import urlparse
from datetime import tzinfo, datetime, timedelta
class Module(MgrModule):
+ CLICommand = K8SeventsCLICommand
COMMANDS = [
{
"cmd": "k8sevents status",
--- /dev/null
+from mgr_module import CLICommandBase
+
+LocalpoolCLICommand = CLICommandBase.make_registry_subtype("LocalpoolCLICommand")
import threading
from typing import cast, Any
+from .cli import LocalpoolCLICommand
+
class Module(MgrModule):
+ CLICommand = LocalpoolCLICommand
MODULE_OPTIONS = [
Option(
--- /dev/null
+from mgr_module import CLICommandBase
+
+MDSAutoscalerCLICommand = CLICommandBase.make_registry_subtype("MDSAutoscalerCLICommand")
import orchestrator
import copy
+from .cli import MDSAutoscalerCLICommand
+
log = logging.getLogger(__name__)
class MDSAutoscaler(orchestrator.OrchestratorClientMixin, MgrModule):
+ CLICommand = MDSAutoscalerCLICommand
"""
MDS autoscaler.
"""
--- /dev/null
+from mgr_module import CLICommandBase
+
+MirroringCLICommand = CLICommandBase.make_registry_subtype("MirroringCLICommand")
from typing import List, Optional
+from .cli import MirroringCLICommand
+
from mgr_module import MgrModule, Option, NotifyType
from .fs.snapshot_mirror import FSSnapshotMirror
class Module(MgrModule):
+ CLICommand = MirroringCLICommand
MODULE_OPTIONS: List[Option] = []
NOTIFY_TYPES = [NotifyType.fs_map]
def notify(self, notify_type: NotifyType, notify_id):
self.fs_snapshot_mirror.notify(notify_type)
- @CLIWriteCommand('fs snapshot mirror enable')
+ @MirroringCLICommand.Write('fs snapshot mirror enable')
def snapshot_mirror_enable(self,
fs_name: str):
"""Enable snapshot mirroring for a filesystem"""
return self.fs_snapshot_mirror.enable_mirror(fs_name)
- @CLIWriteCommand('fs snapshot mirror disable')
+ @MirroringCLICommand.Write('fs snapshot mirror disable')
def snapshot_mirror_disable(self,
fs_name: str):
"""Disable snapshot mirroring for a filesystem"""
return self.fs_snapshot_mirror.disable_mirror(fs_name)
- @CLIWriteCommand('fs snapshot mirror peer_add')
+ @MirroringCLICommand.Write('fs snapshot mirror peer_add')
def snapshot_mirorr_peer_add(self,
fs_name: str,
remote_cluster_spec: str,
"future release. Use 'peer_bootstrap' instead.\n")
return r, out, err
- @CLIReadCommand('fs snapshot mirror peer_list')
+ @MirroringCLICommand.Read('fs snapshot mirror peer_list')
def snapshot_mirror_peer_list(self,
fs_name: str):
"""List configured peers for a file system"""
return self.fs_snapshot_mirror.peer_list(fs_name)
- @CLIWriteCommand('fs snapshot mirror peer_remove')
+ @MirroringCLICommand.Write('fs snapshot mirror peer_remove')
def snapshot_mirror_peer_remove(self,
fs_name: str,
peer_uuid: str):
"""Remove a filesystem peer"""
return self.fs_snapshot_mirror.peer_remove(fs_name, peer_uuid)
- @CLIWriteCommand('fs snapshot mirror peer_bootstrap create')
+ @MirroringCLICommand.Write('fs snapshot mirror peer_bootstrap create')
def snapshot_mirror_peer_bootstrap_create(self,
fs_name: str,
client_name: str,
"""Bootstrap a filesystem peer"""
return self.fs_snapshot_mirror.peer_bootstrap_create(fs_name, client_name, site_name)
- @CLIWriteCommand('fs snapshot mirror peer_bootstrap import')
+ @MirroringCLICommand.Write('fs snapshot mirror peer_bootstrap import')
def snapshot_mirror_peer_bootstrap_import(self,
fs_name: str,
token: str):
"""Import a bootstrap token"""
return self.fs_snapshot_mirror.peer_bootstrap_import(fs_name, token)
- @CLIWriteCommand('fs snapshot mirror add')
+ @MirroringCLICommand.Write('fs snapshot mirror add')
def snapshot_mirror_add_dir(self,
fs_name: str,
path: str):
"""Add a directory for snapshot mirroring"""
return self.fs_snapshot_mirror.add_dir(fs_name, path)
- @CLIWriteCommand('fs snapshot mirror remove')
+ @MirroringCLICommand.Write('fs snapshot mirror remove')
def snapshot_mirror_remove_dir(self,
fs_name: str,
path: str):
"""Remove a snapshot mirrored directory"""
return self.fs_snapshot_mirror.remove_dir(fs_name, path)
- @CLIReadCommand('fs snapshot mirror ls')
+ @MirroringCLICommand.Read('fs snapshot mirror ls')
def snapshot_mirror_ls(self,
fs_name: str):
"""List the snapshot mirrored directories"""
return self.fs_snapshot_mirror.list_dirs(fs_name)
- @CLIReadCommand('fs snapshot mirror dirmap')
+ @MirroringCLICommand.Read('fs snapshot mirror dirmap')
def snapshot_mirror_dirmap(self,
fs_name: str,
path: str):
"""Get current mirror instance map for a directory"""
return self.fs_snapshot_mirror.status(fs_name, path)
- @CLIReadCommand('fs snapshot mirror show distribution')
+ @MirroringCLICommand.Read('fs snapshot mirror show distribution')
def snapshot_mirror_distribution(self,
fs_name: str):
"""Get current instance to directory map for a filesystem"""
return self.fs_snapshot_mirror.show_distribution(fs_name)
- @CLIReadCommand('fs snapshot mirror daemon status')
+ @MirroringCLICommand.Read('fs snapshot mirror daemon status')
def snapshot_mirror_daemon_status(self):
"""Get mirror daemon status"""
return self.fs_snapshot_mirror.daemon_status()
--- /dev/null
+from mgr_module import CLICommandBase
+
+NFSCLICommand = CLICommandBase.make_registry_subtype("NFSCLICommand")
from typing import Tuple, Optional, List, Dict, Any
import yaml
+from .cli import NFSCLICommand
+
from mgr_module import MgrModule, Option, CLICheckNonemptyFileInput
import object_format
import orchestrator
class Module(orchestrator.OrchestratorClientMixin, MgrModule):
+ CLICommand = NFSCLICommand
MODULE_OPTIONS: List[Option] = []
def __init__(self, *args: str, **kwargs: Any) -> None:
self.nfs = NFSCluster(self)
self.inited = True
- @CLICommand('nfs export create cephfs', perm='rw')
+ @NFSCLICommand('nfs export create cephfs', perm='rw')
@object_format.Responder()
def _cmd_nfs_export_create_cephfs(
self,
earmark_resolver=earmark_resolver
)
- @CLICommand('nfs export create rgw', perm='rw')
+ @NFSCLICommand('nfs export create rgw', perm='rw')
@object_format.Responder()
def _cmd_nfs_export_create_rgw(
self,
sectype=sectype,
)
- @CLICommand('nfs export rm', perm='rw')
+ @NFSCLICommand('nfs export rm', perm='rw')
@object_format.EmptyResponder()
def _cmd_nfs_export_rm(self, cluster_id: str, pseudo_path: str) -> None:
"""Remove a cephfs export"""
return self.export_mgr.delete_export(cluster_id=cluster_id, pseudo_path=pseudo_path)
- @CLICommand('nfs export delete', perm='rw')
+ @NFSCLICommand('nfs export delete', perm='rw')
@object_format.EmptyResponder()
def _cmd_nfs_export_delete(self, cluster_id: str, pseudo_path: str) -> None:
"""Delete a cephfs export (DEPRECATED)"""
return self.export_mgr.delete_export(cluster_id=cluster_id, pseudo_path=pseudo_path)
- @CLICommand('nfs export ls', perm='r')
+ @NFSCLICommand('nfs export ls', perm='r')
@object_format.Responder()
def _cmd_nfs_export_ls(self, cluster_id: str, detailed: bool = False) -> List[Any]:
"""List exports of a NFS cluster"""
return self.export_mgr.list_exports(cluster_id=cluster_id, detailed=detailed)
- @CLICommand('nfs export info', perm='r')
+ @NFSCLICommand('nfs export info', perm='r')
@object_format.Responder()
def _cmd_nfs_export_info(self, cluster_id: str, pseudo_path: str) -> Dict[str, Any]:
"""Fetch a export of a NFS cluster given the pseudo path/binding"""
return self.export_mgr.get_export(cluster_id=cluster_id, pseudo_path=pseudo_path)
- @CLICommand('nfs export get', perm='r')
+ @NFSCLICommand('nfs export get', perm='r')
@object_format.Responder()
def _cmd_nfs_export_get(self, cluster_id: str, pseudo_path: str) -> Dict[str, Any]:
"""Fetch a export of a NFS cluster given the pseudo path/binding (DEPRECATED)"""
return self.export_mgr.get_export(cluster_id=cluster_id, pseudo_path=pseudo_path)
- @CLICommand('nfs export apply', perm='rw')
+ @NFSCLICommand('nfs export apply', perm='rw')
@CLICheckNonemptyFileInput(desc='Export JSON or Ganesha EXPORT specification')
@object_format.Responder()
def _cmd_nfs_export_apply(self, cluster_id: str, inbuf: str) -> AppliedExportResults:
return self.export_mgr.apply_export(cluster_id, export_config=inbuf,
earmark_resolver=earmark_resolver)
- @CLICommand('nfs cluster create', perm='rw')
+ @NFSCLICommand('nfs cluster create', perm='rw')
@object_format.EmptyResponder()
def _cmd_nfs_cluster_create(self,
cluster_id: str,
tls_min_version=tls_min_version,
tls_ciphers=tls_ciphers)
- @CLICommand('nfs cluster rm', perm='rw')
+ @NFSCLICommand('nfs cluster rm', perm='rw')
@object_format.EmptyResponder()
def _cmd_nfs_cluster_rm(self, cluster_id: str) -> None:
"""Removes an NFS Cluster"""
return self.nfs.delete_nfs_cluster(cluster_id=cluster_id)
- @CLICommand('nfs cluster delete', perm='rw')
+ @NFSCLICommand('nfs cluster delete', perm='rw')
@object_format.EmptyResponder()
def _cmd_nfs_cluster_delete(self, cluster_id: str) -> None:
"""Removes an NFS Cluster (DEPRECATED)"""
return self.nfs.delete_nfs_cluster(cluster_id=cluster_id)
- @CLICommand('nfs cluster ls', perm='r')
+ @NFSCLICommand('nfs cluster ls', perm='r')
@object_format.Responder()
def _cmd_nfs_cluster_ls(self) -> List[str]:
"""List NFS Clusters"""
return self.nfs.list_nfs_cluster()
- @CLICommand('nfs cluster info', perm='r')
+ @NFSCLICommand('nfs cluster info', perm='r')
@object_format.Responder()
def _cmd_nfs_cluster_info(self, cluster_id: Optional[str] = None) -> Dict[str, Any]:
"""Displays NFS Cluster info"""
return self.nfs.show_nfs_cluster_info(cluster_id=cluster_id)
- @CLICommand('nfs cluster config get', perm='r')
+ @NFSCLICommand('nfs cluster config get', perm='r')
@object_format.ErrorResponseHandler()
def _cmd_nfs_cluster_config_get(self, cluster_id: str) -> Tuple[int, str, str]:
"""Fetch NFS-Ganesha config"""
conf = self.nfs.get_nfs_cluster_config(cluster_id=cluster_id)
return 0, conf, ""
- @CLICommand('nfs cluster config set', perm='rw')
+ @NFSCLICommand('nfs cluster config set', perm='rw')
@CLICheckNonemptyFileInput(desc='NFS-Ganesha Configuration')
@object_format.EmptyResponder()
def _cmd_nfs_cluster_config_set(self, cluster_id: str, inbuf: str) -> None:
"""Set NFS-Ganesha config by `-i <config_file>`"""
return self.nfs.set_nfs_cluster_config(cluster_id=cluster_id, nfs_config=inbuf)
- @CLICommand('nfs cluster config reset', perm='rw')
+ @NFSCLICommand('nfs cluster config reset', perm='rw')
@object_format.EmptyResponder()
def _cmd_nfs_cluster_config_reset(self, cluster_id: str) -> None:
"""Reset NFS-Ganesha Config to default"""
--- /dev/null
+from mgr_module import CLICommandBase
+
+OSDPerfQueryCLICommand = CLICommandBase.make_registry_subtype("OSDPerfQueryCLICommand")
import errno
import prettytable
+from .cli import OSDPerfQueryCLICommand
+
from mgr_module import MgrModule
def get_human_readable(bytes, precision=2):
return '%.*f%s' % (precision, bytes, suffixes[suffix_index])
class OSDPerfQuery(MgrModule):
+ CLICommand = OSDPerfQueryCLICommand
COMMANDS = [
{
"cmd": "osd perf query add "
--- /dev/null
+from mgr_module import CLICommandBase
+
+OSDSupportCLICommand = CLICommandBase.make_registry_subtype("OSDSupportCLICommand")
from mgr_module import MgrModule
+from .cli import OSDSupportCLICommand
+
class OSDSupport(MgrModule):
+ CLICommand = OSDSupportCLICommand
# Kept to keep upgrades from older point releases working.
# This module can be removed as soon as we no longer
# support upgrades from old octopus point releases.
--- /dev/null
+from mgr_module import CLICommandBase
+
+PGAutoscalerCLICommand = CLICommandBase.make_registry_subtype("PGAutoscalerCLICommand")
from prettytable import PrettyTable
from mgr_module import HealthChecksT, CRUSHMap, MgrModule, Option, OSDMap
+from .cli import PGAutoscalerCLICommand
+
"""
Some terminology is made up for the purposes of this module:
class PgAutoscaler(MgrModule):
+ CLICommand = PGAutoscalerCLICommand
"""
PG autoscaler.
"""
self.log.debug(' mgr option %s = %s',
opt['name'], getattr(self, opt['name']))
- @CLIReadCommand('osd pool autoscale-status')
+ @PGAutoscalerCLICommand.Read('osd pool autoscale-status')
def _command_autoscale_status(self, format: str = 'plain') -> Tuple[int, str, str]:
"""
report on pool pg_num sizing recommendation and intent
])
return 0, table.get_string(), ''
- @CLIWriteCommand("osd pool set threshold")
+ @PGAutoscalerCLICommand.Write("osd pool set threshold")
def set_scaling_threshold(self, num: float) -> Tuple[int, str, str]:
"""
set the autoscaler threshold
self.set_module_option("threshold", num)
return 0, "threshold updated", ""
- @CLIReadCommand("osd pool get threshold")
+ @PGAutoscalerCLICommand.Read("osd pool get threshold")
def get_scaling_threshold(self) -> Tuple[int, str, str]:
"""
return the autoscaler threshold value
else:
return False
- @CLIWriteCommand("osd pool get noautoscale")
+ @PGAutoscalerCLICommand.Write("osd pool get noautoscale")
def get_noautoscale(self) -> Tuple[int, str, str]:
"""
Get the noautoscale flag to see if all pools
else:
return 0, "", "noautoscale is off"
- @CLIWriteCommand("osd pool unset noautoscale")
+ @PGAutoscalerCLICommand.Write("osd pool unset noautoscale")
def unset_noautoscale(self) -> Tuple[int, str, str]:
"""
Unset the noautoscale flag so all pools will
})
return 0, "", "noautoscale is unset, all pools now back to its previous mode"
- @CLIWriteCommand("osd pool set noautoscale")
+ @PGAutoscalerCLICommand.Write("osd pool set noautoscale")
def set_noautoscale(self) -> Tuple[int, str, str]:
"""
set the noautoscale for all pools (including
--- /dev/null
+from mgr_module import CLICommandBase
+
+ProgressCLICommand = CLICommandBase.make_registry_subtype("ProgressCLICommand")
import logging
import json
+from .cli import ProgressCLICommand
+
ENCODING_VERSION = 2
class Module(MgrModule):
+ CLICommand = ProgressCLICommand
COMMANDS = [
{"cmd": "progress",
"desc": "Show progress of recovery operations",
--- /dev/null
+from mgr_module import CLICommandBase
+
+PrometheusCLICommand = CLICommandBase.make_registry_subtype("PrometheusCLICommand")
from collections import OrderedDict
from tempfile import NamedTemporaryFile
+from .cli import PrometheusCLICommand
+
from mgr_module import MgrModule, MgrStandbyModule, PG_STATES, Option, ServiceInfoT, HandleCommandResult
from mgr_util import get_default_addr, profile_method, build_url, test_port_allocation, PortAlreadyInUse
from orchestrator import OrchestratorClientMixin, raise_if_exception, OrchestratorError
class Module(MgrModule, OrchestratorClientMixin):
+ CLICommand = PrometheusCLICommand
MODULE_OPTIONS = [
Option(
'server_addr',
return ''.join(_metrics) + '\n'
- @CLIReadCommand('prometheus file_sd_config')
+ @PrometheusCLICommand.Read('prometheus file_sd_config')
def get_file_sd_config(self) -> Tuple[int, str, str]:
'''
Return file_sd compatible prometheus config for mgr cluster
self.log.info('Stopping engine...')
self.shutdown_event.set()
- @CLIReadCommand('healthcheck history ls')
+ @PrometheusCLICommand.Read('healthcheck history ls')
def _list_healthchecks(self, format: Format = Format.plain) -> HandleCommandResult:
"""List all the healthchecks being tracked
return HandleCommandResult(retval=0, stdout=out)
- @CLIWriteCommand('healthcheck history clear')
+ @PrometheusCLICommand.Write('healthcheck history clear')
def _clear_healthchecks(self) -> HandleCommandResult:
"""Clear the healthcheck history"""
self.health_history.reset()
--- /dev/null
+from mgr_module import CLICommandBase
+
+RBDSupportCLICommand = CLICommandBase.make_registry_subtype("RBDSupportCLICommand")
import traceback
from typing import cast, Any, Callable, Optional, Tuple, TypeVar
+from .cli import RBDSupportCLICommand
+
from mgr_module import MgrModule, Option
from threading import Thread, Event
class Module(MgrModule):
+ CLICommand = RBDSupportCLICommand
MODULE_OPTIONS = [
Option(name=MirrorSnapshotScheduleHandler.MODULE_OPTION_NAME),
Option(name=MirrorSnapshotScheduleHandler.MODULE_OPTION_NAME_MAX_CONCURRENT_SNAP_CREATE,
# shut down client and deregister it from MgrMap
super().shutdown()
- @CLIWriteCommand('rbd mirror snapshot schedule add')
+ @RBDSupportCLICommand.Write('rbd mirror snapshot schedule add')
@with_latest_osdmap
def mirror_snapshot_schedule_add(self,
level_spec: str,
spec = LevelSpec.from_name(self, level_spec, namespace_validator, image_validator)
return self.mirror_snapshot_schedule.add_schedule(spec, interval, start_time)
- @CLIWriteCommand('rbd mirror snapshot schedule remove')
+ @RBDSupportCLICommand.Write('rbd mirror snapshot schedule remove')
@with_latest_osdmap
def mirror_snapshot_schedule_remove(self,
level_spec: str,
spec = LevelSpec.from_name(self, level_spec, namespace_validator, image_validator)
return self.mirror_snapshot_schedule.remove_schedule(spec, interval, start_time)
- @CLIReadCommand('rbd mirror snapshot schedule list')
+ @RBDSupportCLICommand.Read('rbd mirror snapshot schedule list')
@with_latest_osdmap
def mirror_snapshot_schedule_list(self,
level_spec: str = '') -> Tuple[int, str, str]:
spec = LevelSpec.from_name(self, level_spec, namespace_validator, image_validator)
return self.mirror_snapshot_schedule.list(spec)
- @CLIReadCommand('rbd mirror snapshot schedule status')
+ @RBDSupportCLICommand.Read('rbd mirror snapshot schedule status')
@with_latest_osdmap
def mirror_snapshot_schedule_status(self,
level_spec: str = '') -> Tuple[int, str, str]:
spec = LevelSpec.from_name(self, level_spec, namespace_validator, image_validator)
return self.mirror_snapshot_schedule.status(spec)
- @CLIReadCommand('rbd perf image stats')
+ @RBDSupportCLICommand.Read('rbd perf image stats')
@with_latest_osdmap
def perf_image_stats(self,
pool_spec: Optional[str] = None,
sort_by_name = sort_by.name if sort_by else OSD_PERF_QUERY_COUNTERS[0]
return self.perf.get_perf_stats(pool_spec, sort_by_name)
- @CLIReadCommand('rbd perf image counters')
+ @RBDSupportCLICommand.Read('rbd perf image counters')
@with_latest_osdmap
def perf_image_counters(self,
pool_spec: Optional[str] = None,
sort_by_name = sort_by.name if sort_by else OSD_PERF_QUERY_COUNTERS[0]
return self.perf.get_perf_counters(pool_spec, sort_by_name)
- @CLIWriteCommand('rbd task add flatten')
+ @RBDSupportCLICommand.Write('rbd task add flatten')
@with_latest_osdmap
def task_add_flatten(self, image_spec: str) -> Tuple[int, str, str]:
"""
with self.task.lock:
return self.task.queue_flatten(image_spec)
- @CLIWriteCommand('rbd task add remove')
+ @RBDSupportCLICommand.Write('rbd task add remove')
@with_latest_osdmap
def task_add_remove(self, image_spec: str) -> Tuple[int, str, str]:
"""
with self.task.lock:
return self.task.queue_remove(image_spec)
- @CLIWriteCommand('rbd task add trash remove')
+ @RBDSupportCLICommand.Write('rbd task add trash remove')
@with_latest_osdmap
def task_add_trash_remove(self, image_id_spec: str) -> Tuple[int, str, str]:
"""
with self.task.lock:
return self.task.queue_trash_remove(image_id_spec)
- @CLIWriteCommand('rbd task add migration execute')
+ @RBDSupportCLICommand.Write('rbd task add migration execute')
@with_latest_osdmap
def task_add_migration_execute(self, image_spec: str) -> Tuple[int, str, str]:
"""
with self.task.lock:
return self.task.queue_migration_execute(image_spec)
- @CLIWriteCommand('rbd task add migration commit')
+ @RBDSupportCLICommand.Write('rbd task add migration commit')
@with_latest_osdmap
def task_add_migration_commit(self, image_spec: str) -> Tuple[int, str, str]:
"""
with self.task.lock:
return self.task.queue_migration_commit(image_spec)
- @CLIWriteCommand('rbd task add migration abort')
+ @RBDSupportCLICommand.Write('rbd task add migration abort')
@with_latest_osdmap
def task_add_migration_abort(self, image_spec: str) -> Tuple[int, str, str]:
"""
with self.task.lock:
return self.task.queue_migration_abort(image_spec)
- @CLIWriteCommand('rbd task cancel')
+ @RBDSupportCLICommand.Write('rbd task cancel')
@with_latest_osdmap
def task_cancel(self, task_id: str) -> Tuple[int, str, str]:
"""
with self.task.lock:
return self.task.task_cancel(task_id)
- @CLIReadCommand('rbd task list')
+ @RBDSupportCLICommand.Read('rbd task list')
@with_latest_osdmap
def task_list(self, task_id: Optional[str] = None) -> Tuple[int, str, str]:
"""
with self.task.lock:
return self.task.task_list(task_id)
- @CLIWriteCommand('rbd trash purge schedule add')
+ @RBDSupportCLICommand.Write('rbd trash purge schedule add')
@with_latest_osdmap
def trash_purge_schedule_add(self,
level_spec: str,
spec = LevelSpec.from_name(self, level_spec, allow_image_level=False)
return self.trash_purge_schedule.add_schedule(spec, interval, start_time)
- @CLIWriteCommand('rbd trash purge schedule remove')
+ @RBDSupportCLICommand.Write('rbd trash purge schedule remove')
@with_latest_osdmap
def trash_purge_schedule_remove(self,
level_spec: str,
spec = LevelSpec.from_name(self, level_spec, allow_image_level=False)
return self.trash_purge_schedule.remove_schedule(spec, interval, start_time)
- @CLIReadCommand('rbd trash purge schedule list')
+ @RBDSupportCLICommand.Read('rbd trash purge schedule list')
@with_latest_osdmap
def trash_purge_schedule_list(self,
level_spec: str = '') -> Tuple[int, str, str]:
spec = LevelSpec.from_name(self, level_spec, allow_image_level=False)
return self.trash_purge_schedule.list(spec)
- @CLIReadCommand('rbd trash purge schedule status')
+ @RBDSupportCLICommand.Read('rbd trash purge schedule status')
@with_latest_osdmap
def trash_purge_schedule_status(self,
level_spec: str = '') -> Tuple[int, str, str]:
--- /dev/null
+from mgr_module import CLICommandBase
+
+RGWCLICommand = CLICommandBase.make_registry_subtype("RGWCLICommand")
import functools
import sys
+from .cli import RGWCLICommand
+
from mgr_module import (
MgrModule,
HandleCommandResult,
class Module(orchestrator.OrchestratorClientMixin, MgrModule):
+ CLICommand = RGWCLICommand
MODULE_OPTIONS: List[Option] = [
Option(
'secondary_zone_period_retry_limit',
self.get_ceph_option(opt))
self.log.debug(' native option %s = %s', opt, getattr(self, opt)) # type: ignore
- @CLICommand('rgw admin', perm='rw')
+ @RGWCLICommand('rgw admin', perm='rw')
def _cmd_rgw_admin(self, params: Sequence[str]) -> HandleCommandResult:
"""rgw admin"""
cmd, returncode, out, err = self.env.mgr.tool_exec('radosgw-admin', params or [])
return HandleCommandResult(retval=returncode, stdout=out, stderr=err)
- @CLICommand('rgw realm bootstrap', perm='rw')
+ @RGWCLICommand('rgw realm bootstrap', perm='rw')
@check_orchestrator
def _cmd_rgw_realm_bootstrap(self,
realm_name: Optional[str] = None,
f'with attrs {profile_attrs}: {str(e)}')
return profile_name
- @CLICommand('rgw realm zone-creds create', perm='rw')
+ @RGWCLICommand('rgw realm zone-creds create', perm='rw')
def _cmd_rgw_realm_new_zone_creds(self,
realm_name: Optional[str] = None,
endpoints: Optional[str] = None,
return HandleCommandResult(retval=retval, stdout=out, stderr=err)
- @CLICommand('rgw realm zone-creds remove', perm='rw')
+ @RGWCLICommand('rgw realm zone-creds remove', perm='rw')
def _cmd_rgw_realm_rm_zone_creds(self, realm_token: Optional[str] = None) -> HandleCommandResult:
"""Create credentials for new zone creation"""
return HandleCommandResult(retval=retval, stdout=out, stderr=err)
- @CLICommand('rgw realm tokens', perm='r')
+ @RGWCLICommand('rgw realm tokens', perm='r')
def list_realm_tokens(self) -> HandleCommandResult:
try:
realms_info = self.get_realm_tokens()
realms_info.append({'realm': realm_info['realm_name'], 'token': realm_token_s})
return realms_info
- @CLICommand('rgw zone modify', perm='rw')
+ @RGWCLICommand('rgw zone modify', perm='rw')
def update_zone_info(self, realm_name: str, zonegroup_name: str, zone_name: str, realm_token: str, zone_endpoints: List[str]) -> HandleCommandResult:
try:
retval, out, err = RGWAM(self.env).zone_modify(realm_name,
self.log.error('cmd run exception: (%d) %s' % (e.retcode, e.message))
return HandleCommandResult(retval=e.retcode, stdout=e.stdout, stderr=e.stderr)
- @CLICommand('rgw zonegroup modify', perm='rw')
+ @RGWCLICommand('rgw zonegroup modify', perm='rw')
def update_zonegroup_info(self, realm_name: str, zonegroup_name: str, zone_name: str, hostnames: List[str]) -> HandleCommandResult:
try:
retval, out, err = RGWAM(self.env).zonegroup_modify(realm_name,
self.log.error('cmd run exception: (%d) %s' % (e.retcode, e.message))
return HandleCommandResult(retval=e.retcode, stdout=e.stdout, stderr=e.stderr)
- @CLICommand('rgw zone create', perm='rw')
+ @RGWCLICommand('rgw zone create', perm='rw')
@check_orchestrator
def _cmd_rgw_zone_create(self,
zone_name: Optional[str] = None,
raise e
return created_zones
- @CLICommand('rgw realm reconcile', perm='rw')
+ @RGWCLICommand('rgw realm reconcile', perm='rw')
def _cmd_rgw_realm_reconcile(self,
realm_name: Optional[str] = None,
zonegroup_name: Optional[str] = None,
--- /dev/null
+from mgr_module import CLICommandBase
+
+RookCLICommand = CLICommandBase.make_registry_subtype("RookCLICommand")
from typing import Optional, Dict, Union, Tuple, Type, Optional
from functools import wraps
+from .cli import RookCLICommand
+
from ceph.deployment import inventory
from ceph.deployment.service_spec import ServiceSpec, NFSServiceSpec, RGWSpec, PlacementSpec
from ceph.utils import datetime_now
class RookOrchestrator(MgrModule, orchestrator.Orchestrator):
+ CLICommand = RookCLICommand
"""
Writes are a two-phase thing, firstly sending
the write to the k8s API (fast) and then waiting
--- /dev/null
+from mgr_module import CLICommandBase
+
+SelftestCLICommand = CLICommandBase.make_registry_subtype("SelftestCLICommand")
from io import StringIO
from typing import Any, Dict, List, Optional, Tuple
+from .cli import SelftestCLICommand
+
# These workloads are things that can be requested to run inside the
# serve() function
class Module(MgrModule):
+ CLICommand = SelftestCLICommand
"""
This module is for testing the ceph-mgr python interface from within
a running ceph-mgr daemon.
self._health: Dict[str, Dict[str, Any]] = {}
self._repl = InteractiveInterpreter(dict(mgr=self))
- @CLICommand('mgr self-test python-version', perm='r')
+ @SelftestCLICommand('mgr self-test python-version', perm='r')
def python_version(self) -> Tuple[int, str, str]:
'''
Query the version of the embedded Python runtime
micro = sys.version_info.micro
return 0, f'{major}.{minor}.{micro}', ''
- @CLICommand('mgr self-test run')
+ @SelftestCLICommand('mgr self-test run')
def run(self) -> Tuple[int, str, str]:
'''
Run mgr python interface tests
self._self_test()
return 0, '', 'Self-test succeeded'
- @CLICommand('mgr self-test background start')
+ @SelftestCLICommand('mgr self-test background start')
def backgroun_start(self, workload: Workload) -> Tuple[int, str, str]:
'''
Activate a background workload (one of command_spam, throw_exception)
self._event.set()
return 0, '', 'Running `{0}` in background'.format(self._workload)
- @CLICommand('mgr self-test background stop')
+ @SelftestCLICommand('mgr self-test background stop')
def background_stop(self) -> Tuple[int, str, str]:
'''
Stop background workload if any is running
else:
return 0, '', 'No background workload was running'
- @CLICommand('mgr self-test config get')
+ @SelftestCLICommand('mgr self-test config get')
def config_get(self, key: str) -> Tuple[int, str, str]:
'''
Peek at a configuration value
'''
return 0, str(self.get_module_option(key)), ''
- @CLICommand('mgr self-test config get_localized')
+ @SelftestCLICommand('mgr self-test config get_localized')
def config_get_localized(self, key: str) -> Tuple[int, str, str]:
'''
Peek at a configuration value (localized variant)
'''
return 0, str(self.get_localized_module_option(key)), ''
- @CLICommand('mgr self-test remote')
+ @SelftestCLICommand('mgr self-test remote')
def test_remote(self) -> Tuple[int, str, str]:
'''
Test inter-module calls
self._test_remote_calls()
return 0, '', 'Successfully called'
- @CLICommand('mgr self-test module')
+ @SelftestCLICommand('mgr self-test module')
def module(self, module: str) -> Tuple[int, str, str]:
'''
Run another module's self_test() method
else:
return 0, str(r), "Self-test OK"
- @CLICommand('mgr self-test cluster-log')
+ @SelftestCLICommand('mgr self-test cluster-log')
def do_cluster_log(self,
channel: str,
priority: str,
message)
return 0, '', 'Successfully called'
- @CLICommand('mgr self-test health set')
+ @SelftestCLICommand('mgr self-test health set')
def health_set(self, checks: str) -> Tuple[int, str, str]:
'''
Set a health check from a JSON-formatted description.
self.set_health_checks(self._health)
return 0, "", ""
- @CLICommand('mgr self-test health clear')
+ @SelftestCLICommand('mgr self-test health clear')
def health_clear(self, checks: Optional[List[str]] = None) -> Tuple[int, str, str]:
'''
Clear health checks by name. If no names provided, clear all.
self.set_health_checks(self._health)
return 0, "", ""
- @CLICommand('mgr self-test insights_set_now_offset')
+ @SelftestCLICommand('mgr self-test insights_set_now_offset')
def insights_set_now_offset(self, hours: int) -> Tuple[int, str, str]:
'''
Set the now time for the insights module.
self._event.clear()
self.log.info("Ended command_spam workload...")
- @CLICommand('mgr self-test eval')
+ @SelftestCLICommand('mgr self-test eval')
def eval(self,
s: Optional[str] = None,
inbuf: Optional[str] = None) -> HandleCommandResult:
--- /dev/null
+from mgr_module import CLICommandBase
+
+SnapScheduleCLICommand = CLICommandBase.make_registry_subtype("SnapScheduleCLICommand")
from mgr_util import CephfsConnectionException
from threading import Event
+from .cli import SnapScheduleCLICommand
+
class Module(MgrModule):
+ CLICommand = SnapScheduleCLICommand
NOTIFY_TYPES = [NotifyType.fs_map]
MODULE_OPTIONS = [
self._initialized.wait()
return -errno.EINVAL, "", "Unknown command"
- @CLIReadCommand('fs snap-schedule status')
+ @SnapScheduleCLICommand.Read('fs snap-schedule status')
def snap_schedule_get(self,
path: str = '/',
fs: Optional[str] = None,
self.log.info(errstr)
return 0, '\n===\n'.join([ret_sched.report() for ret_sched in ret_scheds]), ''
- @CLIReadCommand('fs snap-schedule list')
+ @SnapScheduleCLICommand.Read('fs snap-schedule list')
def snap_schedule_list(self, path: str,
recursive: bool = False,
fs: Optional[str] = None,
return 0, json.dumps(out), ''
return 0, '\n'.join([str(sched) for sched in scheds]), ''
- @CLIWriteCommand('fs snap-schedule add')
+ @SnapScheduleCLICommand.Write('fs snap-schedule add')
def snap_schedule_add(self,
path: str,
snap_schedule: str,
return -errno.EIO, '', str(e)
return 0, suc_msg, ''
- @CLIWriteCommand('fs snap-schedule remove')
+ @SnapScheduleCLICommand.Write('fs snap-schedule remove')
def snap_schedule_rm(self,
path: str,
repeat: Optional[str] = None,
return -errno.EIO, '', str(e)
return 0, 'Schedule removed for path {}'.format(abs_path), ''
- @CLIWriteCommand('fs snap-schedule retention add')
+ @SnapScheduleCLICommand.Write('fs snap-schedule retention add')
def snap_schedule_retention_add(self,
path: str,
retention_spec_or_period: str,
return -errno.EIO, '', str(e)
return 0, 'Retention added to path {}'.format(abs_path), ''
- @CLIWriteCommand('fs snap-schedule retention remove')
+ @SnapScheduleCLICommand.Write('fs snap-schedule retention remove')
def snap_schedule_retention_rm(self,
path: str,
retention_spec_or_period: str,
return -errno.EIO, '', str(e)
return 0, 'Retention removed from path {}'.format(abs_path), ''
- @CLIWriteCommand('fs snap-schedule activate')
+ @SnapScheduleCLICommand.Write('fs snap-schedule activate')
def snap_schedule_activate(self,
path: str,
repeat: Optional[str] = None,
return -errno.EIO, '', str(e)
return 0, 'Schedule activated for path {}'.format(abs_path), ''
- @CLIWriteCommand('fs snap-schedule deactivate')
+ @SnapScheduleCLICommand.Write('fs snap-schedule deactivate')
def snap_schedule_deactivate(self,
path: str,
repeat: Optional[str] = None,
--- /dev/null
+from mgr_module import CLICommandBase
+
+StatsCLICommand = CLICommandBase.make_registry_subtype("StatsCLICommand")
import json
from typing import List, Dict
+from .cli import StatsCLICommand
+
from mgr_module import MgrModule, Option, NotifyType
from .fs.perf_stats import FSPerfStats
class Module(MgrModule):
+ CLICommand = StatsCLICommand
COMMANDS = [
{
"cmd": "fs perf stats "
--- /dev/null
+from mgr_module import CLICommandBase
+
+StatusCLICommand = CLICommandBase.make_registry_subtype("StatusCLICommand")
import mgr_util
import json
+from .cli import StatusCLICommand
+
from mgr_module import MgrModule, HandleCommandResult
class Module(MgrModule):
+ CLICommand = StatusCLICommand
def get_unlabeled_counter_latest(self, daemon_type: str, daemon_name: str, stat: str) -> int:
data = self.get_unlabeled_counter(daemon_type, daemon_name, stat)[stat]
if data:
else:
return 0
- @CLIReadCommand("fs status")
+ @StatusCLICommand.Read("fs status")
def handle_fs_status(self,
fs: Optional[str] = None,
format: str = 'plain') -> Tuple[int, str, str]:
else:
return HandleCommandResult(stdout=output)
- @CLIReadCommand("osd status")
+ @StatusCLICommand.Read("osd status")
def handle_osd_status(self, bucket: Optional[str] = None, format: str = 'plain') -> Tuple[int, str, str]:
"""
Show the status of OSDs within a bucket, or all
--- /dev/null
+from mgr_module import CLICommandBase
+
+TelegrafCLICommand = CLICommandBase.make_registry_subtype("TelegrafCLICommand")
import time
from threading import Event
+from .cli import TelegrafCLICommand
+
from telegraf.basesocket import BaseSocket
from telegraf.protocol import Line
from mgr_module import MgrModule, Option, OptionValue, PG_STATES
class Module(MgrModule):
+ CLICommand = TelegrafCLICommand
MODULE_OPTIONS = [
Option(name='address',
default='unixgram:///tmp/telegraf.sock'),
self.run = False
self.event.set()
- @CLIReadCommand('telegraf config-show')
+ @TelegrafCLICommand.Read('telegraf config-show')
def config_show(self) -> Tuple[int, str, str]:
"""
Show current configuration
"""
return 0, json.dumps(self.config), ''
- @CLICommand('telegraf config-set')
+ @TelegrafCLICommand('telegraf config-set')
def config_set(self, key: str, value: str) -> Tuple[int, str, str]:
"""
Set a configuration value
self.set_module_option(key, value)
return 0, 'Configuration option {0} updated'.format(key), ''
- @CLICommand('telegraf send')
+ @TelegrafCLICommand('telegraf send')
def send(self) -> Tuple[int, str, str]:
"""
Force sending data to Telegraf
--- /dev/null
+from mgr_module import CLICommandBase
+
+TelemetryCLICommand = CLICommandBase.make_registry_subtype("TelemetryCLICommand")
from collections import defaultdict
from typing import cast, Any, DefaultDict, Dict, List, Optional, Tuple, TypeVar, TYPE_CHECKING, Union
+from .cli import TelemetryCLICommand
+
from mgr_module import MgrModule, Option, OptionValue, ServiceInfoT
]
class Module(MgrModule):
+ CLICommand = TelemetryCLICommand
metadata_keys = [
"arch",
"ceph_version",
return 0, msg, ''
- @CLIReadCommand('telemetry status')
+ @TelemetryCLICommand.Read('telemetry status')
def status(self) -> Tuple[int, str, str]:
'''
Show current configuration
if self.last_upload else self.last_upload)
return 0, json.dumps(r, indent=4, sort_keys=True), ''
- @CLIReadCommand('telemetry diff')
+ @TelemetryCLICommand.Read('telemetry diff')
def diff(self) -> Tuple[int, str, str]:
'''
Show the diff between opted-in collection and available collection
return 0, r, ''
- @CLICommand('telemetry on')
+ @TelemetryCLICommand('telemetry on')
def on(self, license: Optional[str] = None) -> Tuple[int, str, str]:
'''
Enable telemetry reports from this cluster
return 0, msg, ''
- @CLICommand('telemetry off')
+ @TelemetryCLICommand('telemetry off')
def off(self) -> Tuple[int, str, str]:
'''
Disable telemetry reports from this cluster
msg = 'Telemetry is now disabled.'
return 0, msg, ''
- @CLIReadCommand('telemetry enable channel all')
+ @TelemetryCLICommand.Read('telemetry enable channel all')
def enable_channel_all(self, channels: List[str] = ALL_CHANNELS) -> Tuple[int, str, str]:
'''
Enable all channels
'''
return self.toggle_channel('enable', channels)
- @CLIReadCommand('telemetry enable channel')
+ @TelemetryCLICommand.Read('telemetry enable channel')
def enable_channel(self, channels: Optional[List[str]] = None) -> Tuple[int, str, str]:
'''
Enable a list of channels
'''
return self.toggle_channel('enable', channels)
- @CLIReadCommand('telemetry disable channel all')
+ @TelemetryCLICommand.Read('telemetry disable channel all')
def disable_channel_all(self, channels: List[str] = ALL_CHANNELS) -> Tuple[int, str, str]:
'''
Disable all channels
'''
return self.toggle_channel('disable', channels)
- @CLIReadCommand('telemetry disable channel')
+ @TelemetryCLICommand.Read('telemetry disable channel')
def disable_channel(self, channels: Optional[List[str]] = None) -> Tuple[int, str, str]:
'''
Disable a list of channels
'''
return self.toggle_channel('disable', channels)
- @CLIReadCommand('telemetry channel ls')
+ @TelemetryCLICommand.Read('telemetry channel ls')
def channel_ls(self) -> Tuple[int, str, str]:
'''
List all channels
return 0, table.get_string(sortby="NAME"), ''
- @CLIReadCommand('telemetry collection ls')
+ @TelemetryCLICommand.Read('telemetry collection ls')
def collection_ls(self) -> Tuple[int, str, str]:
'''
List all collections
return 0, f'{msg}{table.get_string(sortby="NAME")}', ''
- @CLICommand('telemetry send')
+ @TelemetryCLICommand('telemetry send')
def do_send(self,
endpoint: Optional[List[EndPoint]] = None,
license: Optional[str] = None) -> Tuple[int, str, str]:
self.last_report = self.compile_report()
return self.send(self.last_report, endpoint)
- @CLIReadCommand('telemetry show')
+ @TelemetryCLICommand.Read('telemetry show')
def show(self, channels: Optional[List[str]] = None) -> Tuple[int, str, str]:
'''
Show a sample report of opted-in collections (except for 'device')
return 0, report, ''
- @CLIReadCommand('telemetry preview')
+ @TelemetryCLICommand.Read('telemetry preview')
def preview(self, channels: Optional[List[str]] = None) -> Tuple[int, str, str]:
'''
Preview a sample report of the most recent collections available (except for 'device')
return 0, report, ''
- @CLIReadCommand('telemetry show-device')
+ @TelemetryCLICommand.Read('telemetry show-device')
def show_device(self) -> Tuple[int, str, str]:
'''
Show a sample device report
return 0, json.dumps(self.get_report_locked('device'), indent=4, sort_keys=True), ''
- @CLIReadCommand('telemetry preview-device')
+ @TelemetryCLICommand.Read('telemetry preview-device')
def preview_device(self) -> Tuple[int, str, str]:
'''
Preview a sample device report of the most recent device collection
report = json.dumps(report, indent=4, sort_keys=True)
return 0, report, ''
- @CLIReadCommand('telemetry show-all')
+ @TelemetryCLICommand.Read('telemetry show-all')
def show_all(self) -> Tuple[int, str, str]:
'''
Show a sample report of all enabled channels (including 'device' channel)
self.format_perf_histogram(report)
return 0, json.dumps(report, indent=4, sort_keys=True), ''
- @CLIReadCommand('telemetry preview-all')
+ @TelemetryCLICommand.Read('telemetry preview-all')
def preview_all(self) -> Tuple[int, str, str]:
'''
Preview a sample report of the most recent collections available of all channels (including 'device')
--- /dev/null
+from mgr_module import CLICommandBase
+
+TestOrchestratorCLICommand = CLICommandBase.make_registry_subtype("TestOrchestratorCLICommand")
import itertools
from subprocess import check_output, CalledProcessError
+from .cli import TestOrchestratorCLICommand
+
from ceph.deployment.service_spec import ServiceSpec, NFSServiceSpec, IscsiServiceSpec
try:
class TestOrchestrator(MgrModule, orchestrator.Orchestrator):
+ CLICommand = TestOrchestratorCLICommand
"""
This is an orchestrator implementation used for internal testing. It's meant for
development environments and integration testing.
The implementation is similar to the Rook orchestrator, but simpler.
"""
- @CLICommand('test_orchestrator load_data', perm='w')
+ @TestOrchestratorCLICommand('test_orchestrator load_data', perm='w')
def _load_data(self, inbuf):
"""
load dummy data into test orchestrator
--- /dev/null
+from mgr_module import CLICommandBase
+
+VolumesCLICommand = CLICommandBase.make_registry_subtype("VolumesCLICommand")
import traceback
import threading
+from .cli import VolumesCLICommand
+
from mgr_module import MgrModule, Option
import orchestrator
class Module(orchestrator.OrchestratorClientMixin, MgrModule):
+ CLICommand = VolumesCLICommand
COMMANDS = [
{
'cmd': 'fs volume ls',
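Each module class in this series also sets a CLICommand class attribute pointing at its registry subtype. The helper below is purely an assumption about why that attribute exists (so a handler can look commands up in the module's own registry rather than a global one); it is not the real MgrModule.handle_command path, and the dispatch() name is hypothetical.

# Hypothetical dispatch helper, assuming the registry sketch shown after the
# alerts cli.py hunk above. 'module' is any MgrModule subclass from this diff
# that sets CLICommand = <its registry subtype>.
import errno
from typing import Any


def dispatch(module: Any, prefix: str, **kwargs: Any) -> Any:
    registry = module.CLICommand.COMMANDS   # per-module command registry
    cmd = registry.get(prefix)
    if cmd is None:
        return -errno.ENOENT, '', f'unknown command: {prefix}'
    # Return whatever the decorated handler returns (a status tuple or
    # HandleCommandResult, depending on the module).
    return cmd.func(module, **kwargs)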