"""
self._ceph_send_command(result, svc_type, svc_id, command, tag, inbuf)
+ def tool_exec(
+ self,
+ args: List[str],
+ timeout: int = 10,
+ stdin: Optional[bytes] = None
+ ) -> Tuple[int, str, str]:
+ try:
+ tool = args.pop(0)
+ cmd = [
+ tool,
+ '-k', str(self.get_ceph_option('keyring')),
+ '-n', f'mgr.{self.get_mgr_id()}',
+ ] + args
+ self.log.debug('exec: ' + ' '.join(cmd))
+ p = subprocess.run(
+ cmd,
+ input=stdin,
+ stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE,
+ timeout=timeout,
+ )
+ except subprocess.TimeoutExpired as ex:
+ self.log.error(ex)
+ return -errno.ETIMEDOUT, '', str(ex)
+ if p.returncode:
+ self.log.error(f'Non-zero return from {cmd}: {p.stderr.decode()}')
+ return p.returncode, p.stdout.decode(), p.stderr.decode()
+
def set_health_checks(self, checks: HealthChecksT) -> None:
"""
Set the module's current map of health checks. Argument is a