# histograms.
pass
+ def toggle_channel(self, action, channels: Optional[List[str]] = None) -> Tuple[int, str, str]:
+ '''
+ Enable or disable a list of channels
+ '''
+ if not self.enabled:
+ # telemetry should be on for channels to be toggled
+ msg = 'Telemetry is off. Please consider opting-in with `ceph telemetry on`.\n' \
+ 'Preview sample reports with `ceph telemetry preview`.'
+ return 0, msg, ''
+
+ if channels is None:
+ msg = f'Please provide a channel name. Available channels: {ALL_CHANNELS}.'
+ return 0, msg, ''
+
+ state = action == 'enable'
+ msg = ''
+ for c in channels:
+ if c not in ALL_CHANNELS:
+ msg = f"{msg}{c} is not a valid channel name. "\
+ f"Available channels: {ALL_CHANNELS}.\n"
+ else:
+ self.set_module_option(f"channel_{c}", state)
+ setattr(self,
+ f"channel_{c}",
+ state)
+ msg = f"{msg}channel_{c} is {action}d\n"
+
+ return 0, msg, ''
def restore_default_opt_setting(self, opt_name) -> None:
for o in self.MODULE_OPTIONS:
msg = 'Telemetry is now disabled. Channels settings are restored to default.'
return 0, msg, ''
+ @CLIReadCommand('telemetry enable channel')
+ def enable_channel(self, channels: Optional[List[str]] = None) -> Tuple[int, str, str]:
+ '''
+ Enable a list of channels
+ '''
+ return self.toggle_channel('enable', channels)
+
+ @CLIReadCommand('telemetry disable channel')
+ def disable_channel(self, channels: Optional[List[str]] = None) -> Tuple[int, str, str]:
+ '''
+ Disable a list of channels
+ '''
+ return self.toggle_channel('disable', channels)
+
@CLICommand('telemetry send')
def do_send(self,
endpoint: Optional[List[EndPoint]] = None,