else:
from typing_extensions import Literal
+import cephfs
import inspect
import logging
import errno
# Keep a librados instance for those that need it.
self._rados: Optional[rados.Rados] = None
+ self._cephfs: Optional[cephfs.LibCephFS] = None
+
# this does not change over the lifetime of an active mgr
self._mgr_ips: Optional[str] = None
return (rc, stdout, stderr)
+ class _CommandResultWrapper:
+ def __init__(self, module: 'MgrModule', tag: Optional[str], result: CommandResult):
+ if tag is None:
+ tag = ""
+ self.module = module
+ self.tag = tag
+ self.result = result
+
+ def complete(self, r: int, outb: bytes, outs: bytes) -> None:
+ self.result.complete(r, outb.decode('utf-8'), outs.decode('utf-8'))
+ self.module._ceph_notify_all("command", self.tag)
+
def send_command(
self,
result: CommandResult,
:param bool one_shot: a keyword-only param to make the command abort
with EPIPE when the target resets or refuses to reconnect
"""
- self._ceph_send_command(result, svc_type, svc_id, command, tag, inbuf, one_shot=one_shot)
+
+ if svc_type == "mds":
+ wrapped_result = self._CommandResultWrapper(self, tag, result)
+ self.log.info(f"do mds_command: mds.{svc_id} {command}")
+ self.cephfs.mds_command2(wrapped_result, svc_id, command, inbuf, one_shot=one_shot)
+ else:
+ self._ceph_send_command(result, svc_type, svc_id, command, tag, inbuf, one_shot=one_shot)
def tool_exec(
self,
self._ceph_register_client(None, self._rados.get_addrs(), False)
return self._rados
+ @property
+ def cephfs(self) -> cephfs.LibCephFS:
+ """
+ An (unmounted) cephfs instance to be shared by any classes within this
+ mgr module that want one.
+ """
+ if self._cephfs:
+ return self._cephfs
+
+ self._cephfs = cephfs.LibCephFS(rados_inst=self.rados)
+ self._cephfs.init()
+ return self._cephfs
+
@staticmethod
def can_run() -> Tuple[bool, str]:
"""