From: John Mulligan Date: Thu, 18 May 2023 19:10:16 +0000 (-0400) Subject: mgr/cephadm: add exchange types and use them for deploy X-Git-Tag: v19.0.0~982^2~9 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=83f8a1b687506573b119804532ad37ad3ce0d393;p=ceph.git mgr/cephadm: add exchange types and use them for deploy Add an exchange module for defining the "exchange protocol" that the mgr module uses for communicating with the cephadm binary. The exchange module classes use data descriptors to define supported fields in the resulting data and will be serialized to JSON for communicating with cephadm. The cephadm binary does not use these types because it currently doesn't import anything outside of the standard library. A suggested future change would be to move 'exhange.py' to python-common somewhere and have the module be imported by both the mgr and cephadm and used for both serialization and deserialization. Signed-off-by: John Mulligan --- diff --git a/src/pybind/mgr/cephadm/exchange.py b/src/pybind/mgr/cephadm/exchange.py new file mode 100644 index 000000000000..22aa350e594b --- /dev/null +++ b/src/pybind/mgr/cephadm/exchange.py @@ -0,0 +1,164 @@ +# Data exchange formats for communicating more +# complex data structures between the cephadm binary +# an the mgr module. + +import json + +from typing import ( + Any, + Callable, + Dict, + List, + Optional, + TypeVar, + Union, + cast, +) + + +FuncT = TypeVar("FuncT", bound=Callable) + + +class _DataField: + """A descriptor to map object fields into a data dictionary.""" + + def __init__( + self, + name: Optional[str] = None, + field_type: Optional[FuncT] = None, + ): + self.name = name + self.field_type = field_type + + def __set_name__(self, _: str, name: str) -> None: + if not self.name: + self.name = name + + def __get__(self, obj: Any, objtype: Any = None) -> Any: + return obj.data[self.name] + + def __set__(self, obj: Any, value: Any) -> None: + if self.field_type is not None: + obj.data[self.name] = self.field_type(value) + else: + obj.data[self.name] = value + + +def _get_data(obj: Any) -> Any: + """Wrapper to get underlying data dicts from objects that + advertise having them. + """ + _gd = getattr(obj, "get_data", None) + if _gd: + return _gd() + return obj + + +def _or_none(field_type: FuncT) -> FuncT: + def _field_type_or_none(value: Any) -> Any: + if value is None: + return None + return field_type(value) + + return cast(FuncT, _field_type_or_none) + + +class DeployMeta: + """Deployment metadata. Child of Deploy. Used by cephadm to + determine when certain changes have been made. + """ + + service_name = _DataField(field_type=str) + ports = _DataField(field_type=list) + ip = _DataField(field_type=_or_none(str)) + deployed_by = _DataField(field_type=_or_none(list)) + rank = _DataField(field_type=_or_none(int)) + rank_generation = _DataField(field_type=_or_none(int)) + extra_container_args = _DataField(field_type=_or_none(list)) + extra_entrypoint_args = _DataField(field_type=_or_none(list)) + + def __init__( + self, + init_data: Optional[Dict[str, Any]] = None, + *, + service_name: str = "", + ports: Optional[List[int]] = None, + ip: Optional[str] = None, + deployed_by: Optional[List[str]] = None, + rank: Optional[int] = None, + rank_generation: Optional[int] = None, + extra_container_args: Optional[List[str]] = None, + extra_entrypoint_args: Optional[List[str]] = None, + ): + self.data = dict(init_data or {}) + # set fields + self.service_name = service_name + self.ports = ports or [] + self.ip = ip + self.deployed_by = deployed_by + self.rank = rank + self.rank_generation = rank_generation + self.extra_container_args = extra_container_args + self.extra_entrypoint_args = extra_entrypoint_args + + def get_data(self) -> Dict[str, Any]: + return self.data + + to_simplified = get_data + + @classmethod + def convert( + cls, + value: Union[Dict[str, Any], "DeployMeta", None], + ) -> "DeployMeta": + if not isinstance(value, DeployMeta): + return cls(value) + return value + + +class Deploy: + """Set of fields that instructs cephadm to deploy a + service/daemon. + """ + + fsid = _DataField(field_type=str) + name = _DataField(field_type=str) + image = _DataField(field_type=str) + deploy_arguments = _DataField(field_type=list) + params = _DataField(field_type=dict) + meta = _DataField(field_type=DeployMeta.convert) + config_blobs = _DataField(field_type=dict) + + def __init__( + self, + init_data: Optional[Dict[str, Any]] = None, + *, + fsid: str = "", + name: str = "", + image: str = "", + deploy_arguments: Optional[List[str]] = None, + params: Optional[Dict[str, Any]] = None, + meta: Optional[DeployMeta] = None, + config_blobs: Optional[Dict[str, Any]] = None, + ): + self.data = dict(init_data or {}) + # set fields + self.fsid = fsid + self.name = name + self.image = image + self.deploy_arguments = deploy_arguments or [] + self.params = params or {} + self.meta = DeployMeta.convert(meta) + self.config_blobs = config_blobs or {} + + def get_data(self) -> Dict[str, Any]: + """Return the underlying data dict.""" + return self.data + + def to_simplified(self) -> Dict[str, Any]: + """Return a simplified serializable version of the object.""" + return {k: _get_data(v) for k, v in self.get_data().items()} + + def dump_json_str(self) -> str: + """Return the object's JSON string representation.""" + return json.dumps(self.to_simplified()) diff --git a/src/pybind/mgr/cephadm/serve.py b/src/pybind/mgr/cephadm/serve.py index 870a45b46bf1..67a7e8f332a8 100644 --- a/src/pybind/mgr/cephadm/serve.py +++ b/src/pybind/mgr/cephadm/serve.py @@ -25,6 +25,7 @@ from mgr_module import MonCommandFailed from mgr_util import format_bytes, verify_tls, get_cert_issuer_info, ServerConfigException from . import utils +from . import exchange if TYPE_CHECKING: from cephadm.module import CephadmOrchestrator @@ -1271,24 +1272,23 @@ class CephadmServe: daemon_spec.name(), ['_orch', 'deploy'], [], - stdin=json.dumps({ - "fsid": self.mgr._cluster_fsid, - "name": daemon_spec.name(), - "image": image, - "deploy_arguments": daemon_spec.extra_args, - "params": daemon_params, - "meta": { - 'service_name': daemon_spec.service_name, - 'ports': daemon_spec.ports, - 'ip': daemon_spec.ip, - 'deployed_by': self.mgr.get_active_mgr_digests(), - 'rank': daemon_spec.rank, - 'rank_generation': daemon_spec.rank_generation, - 'extra_container_args': extra_container_args, - 'extra_entrypoint_args': extra_entrypoint_args - }, - "config_blobs": daemon_spec.final_config, - }), + stdin=exchange.Deploy( + fsid=self.mgr._cluster_fsid, + name=daemon_spec.name(), + image=image, + params=daemon_params, + meta=exchange.DeployMeta( + service_name=daemon_spec.service_name, + ports=daemon_spec.ports, + ip=daemon_spec.ip, + deployed_by=self.mgr.get_active_mgr_digests(), + rank=daemon_spec.rank, + rank_generation=daemon_spec.rank_generation, + extra_container_args=extra_container_args, + extra_entrypoint_args=extra_entrypoint_args, + ), + config_blobs=daemon_spec.final_config, + ).dump_json_str(), ) if daemon_spec.daemon_type == 'agent':