From: John Mulligan
Date: Mon, 15 Jul 2024 19:39:19 +0000 (-0400)
Subject: mgr/smb: add a python module to help manage the ctdb cluster
X-Git-Tag: testing/wip-vshankar-testing-20240826.122843-debug~13^2~9
X-Git-Url: http://git.apps.os.sepia.ceph.com/?a=commitdiff_plain;h=cd2d01e9f5152dcfd200273825802e3b3c04ca92;p=ceph-ci.git

mgr/smb: add a python module to help manage the ctdb cluster

Add a new module clustermeta that implements a JSON-based interface
compatible with sambacc. This module will be called directly by cephadm
as it places the daemons on the cluster nodes.

Signed-off-by: John Mulligan
---
diff --git a/src/pybind/mgr/smb/clustermeta.py b/src/pybind/mgr/smb/clustermeta.py
new file mode 100644
index 00000000000..72949c5d905
--- /dev/null
+++ b/src/pybind/mgr/smb/clustermeta.py
@@ -0,0 +1,196 @@
+from typing import (
+    TYPE_CHECKING,
+    Dict,
+    Iterable,
+    Iterator,
+    List,
+    NamedTuple,
+    Optional,
+    TypedDict,
+)
+
+import contextlib
+import copy
+import logging
+import operator
+
+from . import rados_store
+from .proto import Simplified
+
+if TYPE_CHECKING:  # pragma: no cover
+    from mgr_module import MgrModule
+
+
+log = logging.getLogger(__name__)
+
+
+ClusterNodeEntry = TypedDict(
+    'ClusterNodeEntry',
+    {'pnn': int, 'identity': str, 'node': str, 'state': str},
+)
+
+CephDaemonInfo = TypedDict(
+    'CephDaemonInfo',
+    {'daemon_type': str, 'daemon_id': str, 'hostname': str, 'host_ip': str},
+)
+
+RankMap = Dict[int, Dict[int, Optional[str]]]
+DaemonMap = Dict[str, CephDaemonInfo]
+
+
+class _GenerationInfo(NamedTuple):
+    generation: int
+    name: Optional[str]
+
+
+def _current_generation(
+    generations: Dict[int, Optional[str]]
+) -> _GenerationInfo:
+    max_gen = max(generations.keys())
+    return _GenerationInfo(max_gen, generations[max_gen])
+
+
+class ClusterMeta:
+    def __init__(self) -> None:
+        self._data: Simplified = {'nodes': [], '_source': 'cephadm'}
+        self._orig = copy.deepcopy(self._data)
+
+    def to_simplified(self) -> Simplified:
+        return self._data
+
+    def load(self, data: Simplified) -> None:
+        if not data:
+            return
+        assert 'nodes' in data
+        self._data = data
+        self._orig = copy.deepcopy(data)
+
+    def modified(self) -> bool:
+        return self._data != self._orig
+
+    def sync_ranks(self, rank_map: RankMap, daemon_map: DaemonMap) -> None:
+        """Convert cephadm's ranks and node info into something sambacc
+        can understand and manage for ctdb.
+        """
+        log.debug('rank_map=%r, daemon_map=%r', rank_map, daemon_map)
+        log.debug('current data: %r', self._data)
+        if not (rank_map and daemon_map):
+            return
+        missing = set()
+        rank_max = -1
+        for rank, rankval in rank_map.items():
+            rank_max = max(rank_max, rank)
+            curr_entry = self._get_pnn(rank)
+            if not curr_entry:
+                missing.add(rank)
+                continue
+            # "reconcile" existing rank-pnn values
+            try:
+                ceph_entry = self._to_entry(
+                    rank, _current_generation(rankval).name, daemon_map
+                )
+            except KeyError as err:
+                log.warning(
+                    'daemon not available: %s not in %r', err, daemon_map
+                )
+                continue
+            if ceph_entry != curr_entry:
+                # TODO do proper state value transitions
+                log.debug("updating entry %r", ceph_entry)
+                self._replace_entry(ceph_entry)
+        if missing:
+            log.debug('adding new entries')
+            entries = []
+            for rank in missing:
+                try:
+                    entries.append(
+                        self._to_entry(
+                            rank,
+                            _current_generation(rank_map[rank]).name,
+                            daemon_map,
+                        )
+                    )
+                except KeyError as err:
+                    log.warning(
+                        'daemon not available: %s not in %r', err, daemon_map
+                    )
+                    continue
+            self._append_entries(entries)
+        pnn_max = self._pnn_max()
+        if pnn_max > rank_max:
+            log.debug('removing extra entries')
+            # need to "prune" entries
+            for pnn in range(rank_max + 1, pnn_max + 1):
+                entry = self._get_pnn(pnn)
+                assert entry
+                entry['state'] = 'gone'
+                self._replace_entry(entry)
+        log.debug('synced data: %r; modified=%s', self._data, self.modified())
+
+    def _nodes(self) -> List[ClusterNodeEntry]:
+        return [node for node in self._data['nodes']]
+
+    def _pnn_max(self) -> int:
+        return max((n['pnn'] for n in self._nodes()), default=0)
+
+    def _get_pnn(self, pnn: int) -> Optional[ClusterNodeEntry]:
+        nodes = self._nodes()
+        for value in nodes:
+            assert isinstance(value, dict)
+            if value['pnn'] == pnn:
+                return value
+        return None
+
+    def _sort_nodes(self) -> None:
+        self._data['nodes'].sort(key=operator.itemgetter('pnn'))
+
+    def _replace_entry(self, entry: ClusterNodeEntry) -> None:
+        assert isinstance(entry, dict)
+        pnn = entry['pnn']
+        self._data['nodes'] = [e for e in self._nodes() if e['pnn'] != pnn]
+        self._data['nodes'].append(entry)
+        self._sort_nodes()
+        log.debug('_replace_entry updated data=%r', self._data)
+
+    def _append_entries(
+        self, new_entries: Iterable[ClusterNodeEntry]
+    ) -> None:
+        self._data['nodes'].extend(new_entries)
+        self._sort_nodes()
+        log.debug('_append_entries updated data=%r', self._data)
+
+    def _to_entry(
+        self, rank: int, name: Optional[str], daemon_map: DaemonMap
+    ) -> ClusterNodeEntry:
+        assert name
+        name = f'smb.{name}'
+        di = daemon_map[name]
+        return {
+            'pnn': rank,
+            'identity': name,
+            'node': di['host_ip'],
+            'state': 'ready',
+        }
+
+
+_LOCK_NAME = "cluster_meta"
+
+
+@contextlib.contextmanager
+def rados_object(mgr: 'MgrModule', uri: str) -> Iterator[ClusterMeta]:
+    """Return a cluster meta object that will store persistent data in rados."""
+    pool, ns, objname = rados_store.parse_uri(uri)
+    store = rados_store.RADOSConfigStore.init(mgr, pool)
+
+    cmeta = ClusterMeta()
+    previous = {}
+    entry = store[ns, objname]
+    try:
+        with entry.locked(_LOCK_NAME):
+            previous = entry.get()
+    except KeyError:
+        log.debug('no previous object %s found', uri)
+    cmeta.load(previous)
+    yield cmeta
+    with entry.locked(_LOCK_NAME):
+        entry.set(cmeta.to_simplified())
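
Editor's note: the module is easiest to follow from the data it consumes and
produces. Below is a minimal sketch, not part of the patch, of how
cephadm-style rank and daemon maps flow through ClusterMeta.sync_ranks();
the daemon names, hostnames, and IP addresses are invented for illustration,
and the import assumes the code runs where ceph's mgr modules are importable.

    # Sketch only: illustrative rank/daemon values, not real cluster data.
    from smb.clustermeta import ClusterMeta

    # cephadm-style inputs: RankMap maps a rank to {generation: daemon name},
    # DaemonMap maps 'smb.<daemon name>' to that daemon's placement details.
    rank_map = {0: {1: 'tango.abc123'}, 1: {1: 'tango.def456'}}
    daemon_map = {
        'smb.tango.abc123': {
            'daemon_type': 'smb',
            'daemon_id': 'tango.abc123',
            'hostname': 'node1',
            'host_ip': '192.168.76.11',
        },
        'smb.tango.def456': {
            'daemon_type': 'smb',
            'daemon_id': 'tango.def456',
            'hostname': 'node2',
            'host_ip': '192.168.76.12',
        },
    }

    cmeta = ClusterMeta()
    cmeta.sync_ranks(rank_map, daemon_map)
    # cmeta.to_simplified() now carries the sambacc-style node list, roughly:
    # {'nodes': [{'pnn': 0, 'identity': 'smb.tango.abc123',
    #             'node': '192.168.76.11', 'state': 'ready'},
    #            {'pnn': 1, 'identity': 'smb.tango.def456',
    #             'node': '192.168.76.12', 'state': 'ready'}],
    #  '_source': 'cephadm'}

Each rank keeps only its highest-numbered generation, and existing node
entries whose pnn exceeds the largest known rank are marked 'gone' rather
than removed.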
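For the persistence path, a similarly hedged sketch: rados_object() loads any
previously stored JSON object, yields the ClusterMeta for updating, and writes
the result back, taking the "cluster_meta" lock around the read and again
around the write-back. The mgr handle and the URI are placeholders here; see
rados_store.parse_uri() for the format it actually accepts.

    # Sketch only: 'mgr' must be a live MgrModule; the URI format is assumed.
    from smb import clustermeta

    def update_cluster_meta(mgr, uri, rank_map, daemon_map):
        # Loads the previous object (if any), syncs ranks, then writes back
        # on exit from the context manager.
        with clustermeta.rados_object(mgr, uri) as cmeta:
            cmeta.sync_ranks(rank_map, daemon_map)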