mgr/smb: add a python module to help manage the ctdb cluster
author John Mulligan <jmulligan@redhat.com>
Mon, 15 Jul 2024 19:39:19 +0000 (15:39 -0400)
committer John Mulligan <jmulligan@redhat.com>
Tue, 20 Aug 2024 13:53:56 +0000 (09:53 -0400)
Add a new module, clustermeta, that implements a JSON-based interface
compatible with sambacc. This module will be called directly by cephadm
as it places the daemons on the cluster nodes.
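
A minimal usage sketch (the daemon names, generation numbers, and rados
URI below are illustrative; `mgr` is assumed to be the active MgrModule
instance and the smb mgr package is assumed to be importable from the
caller's environment):

    from smb import clustermeta

    rank_map = {0: {1: 'tango.abc'}}  # rank -> {generation: daemon name}
    daemon_map = {
        'smb.tango.abc': {
            'daemon_type': 'smb',
            'daemon_id': 'tango.abc',
            'hostname': 'node0',
            'host_ip': '10.0.0.1',
        },
    }
    # Load any previous cluster meta from rados, reconcile it with the
    # current ranks, and write the sambacc-compatible JSON back on exit.
    with clustermeta.rados_object(mgr, 'rados://.smb/foo/cluster.meta.json') as cm:
        cm.sync_ranks(rank_map, daemon_map)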

Signed-off-by: John Mulligan <jmulligan@redhat.com>
src/pybind/mgr/smb/clustermeta.py [new file with mode: 0644]

diff --git a/src/pybind/mgr/smb/clustermeta.py b/src/pybind/mgr/smb/clustermeta.py
new file mode 100644 (file)
index 0000000..72949c5
--- /dev/null
@@ -0,0 +1,202 @@
+from typing import (
+    TYPE_CHECKING,
+    Dict,
+    Iterable,
+    Iterator,
+    List,
+    NamedTuple,
+    Optional,
+    TypedDict,
+)
+
+import contextlib
+import copy
+import logging
+import operator
+
+from . import rados_store
+from .proto import Simplified
+
+if TYPE_CHECKING:  # pragma: no cover
+    from mgr_module import MgrModule
+
+
+log = logging.getLogger(__name__)
+
+
+ClusterNodeEntry = TypedDict(
+    'ClusterNodeEntry',
+    {'pnn': int, 'identity': str, 'node': str, 'state': str},
+)
+
+CephDaemonInfo = TypedDict(
+    'CephDaemonInfo',
+    {'daemon_type': str, 'daemon_id': str, 'hostname': str, 'host_ip': str},
+)
+
+RankMap = Dict[int, Dict[int, Optional[str]]]
+DaemonMap = Dict[str, CephDaemonInfo]
+
+
+class _GenerationInfo(NamedTuple):
+    generation: int
+    name: Optional[str]
+
+
+def _current_generation(
+    generations: Dict[int, Optional[str]]
+) -> _GenerationInfo:
+    max_gen = max(generations.keys())
+    return _GenerationInfo(max_gen, generations[max_gen])
+
+
+class ClusterMeta:
+    def __init__(self) -> None:
+        self._data: Simplified = {'nodes': [], '_source': 'cephadm'}
+        self._orig: Simplified = copy.deepcopy(self._data)
+
+    def to_simplified(self) -> Simplified:
+        return self._data
+
+    def load(self, data: Simplified) -> None:
+        if not data:
+            return
+        assert 'nodes' in data
+        self._data = data
+        self._orig = copy.deepcopy(data)
+
+    def modified(self) -> bool:
+        return self._data != self._orig
+
+    def sync_ranks(self, rank_map: RankMap, daemon_map: DaemonMap) -> None:
+        """Convert cephadm's ranks and node info into something sambacc
+        can understand and manage for ctdb.
+        """
+        log.debug('rank_map=%r, daemon_map=%r', rank_map, daemon_map)
+        log.debug('current data: %r', self._data)
+        if not (rank_map and daemon_map):
+            return
+        missing = set()
+        rank_max = -1
+        for rank, rankval in rank_map.items():
+            rank_max = max(rank_max, rank)
+            curr_entry = self._get_pnn(rank)
+            if not curr_entry:
+                missing.add(rank)
+                continue
+            # "reconcile" existing rank-pnn values
+            try:
+                ceph_entry = self._to_entry(
+                    rank, _current_generation(rankval).name, daemon_map
+                )
+            except KeyError as err:
+                log.warning(
+                    'daemon not available: %s not in %r', err, daemon_map
+                )
+                continue
+            if ceph_entry != curr_entry:
+                # TODO do proper state value transitions
+                log.debug("updating entry %r", ceph_entry)
+                self._replace_entry(ceph_entry)
+        if missing:
+            log.debug('adding new entries')
+            entries = []
+            for rank in missing:
+                try:
+                    entries.append(
+                        self._to_entry(
+                            rank,
+                            _current_generation(rank_map[rank]).name,
+                            daemon_map,
+                        )
+                    )
+                except KeyError as err:
+                    log.warning(
+                        'daemon not available: %s not in %r', err, daemon_map
+                    )
+                    continue
+            self._append_entries(entries)
+        pnn_max = self._pnn_max()
+        if pnn_max > rank_max:
+            log.debug('removing extra entries')
+            # need to "prune" entries
+            for pnn in range(rank_max + 1, pnn_max + 1):
+                entry = self._get_pnn(pnn)
+                assert entry
+                entry['state'] = 'gone'
+                self._replace_entry(entry)
+        log.debug('synced data: %r; modified=%s', self._data, self.modified())
+
+    def _nodes(self) -> List[ClusterNodeEntry]:
+        return [node for node in self._data['nodes']]
+
+    def _pnn_max(self) -> int:
+        return max((n['pnn'] for n in self._nodes()), default=0)
+
+    def _get_pnn(self, pnn: int) -> Optional[ClusterNodeEntry]:
+        nodes = self._nodes()
+        for value in nodes:
+            assert isinstance(value, dict)
+            if value['pnn'] == pnn:
+                return value
+        return None
+
+    def _sort_nodes(self) -> None:
+        self._data['nodes'].sort(key=operator.itemgetter('pnn'))
+
+    def _replace_entry(self, entry: ClusterNodeEntry) -> None:
+        assert isinstance(entry, dict)
+        pnn = entry['pnn']
+        self._data['nodes'] = [e for e in self._nodes() if e['pnn'] != pnn]
+        self._data['nodes'].append(entry)
+        self._sort_nodes()
+        log.debug('_replace_entry updated data=%r', self._data)
+
+    def _append_entries(
+        self, new_entries: Iterable[ClusterNodeEntry]
+    ) -> None:
+        self._data['nodes'].extend(new_entries)
+        self._sort_nodes()
+        log.debug('_append_entries updated data=%r', self._data)
+
+    def _to_entry(
+        self, rank: int, name: Optional[str], daemon_map: DaemonMap
+    ) -> ClusterNodeEntry:
+        assert name
+        name = f'smb.{name}'
+        di = daemon_map[name]
+        return {
+            'pnn': rank,
+            'identity': name,
+            'node': di['host_ip'],
+            'state': 'ready',
+        }
+
+
+_LOCK_NAME = "cluster_meta"
+
+
+@contextlib.contextmanager
+def rados_object(mgr: 'MgrModule', uri: str) -> Iterator[ClusterMeta]:
+    """Return a cluster meta object that will store persistent data in rados."""
+    pool, ns, objname = rados_store.parse_uri(uri)
+    store = rados_store.RADOSConfigStore.init(mgr, pool)
+
+    cmeta = ClusterMeta()
+    previous = {}
+    entry = store[ns, objname]
+    try:
+        with entry.locked(_LOCK_NAME):
+            previous = entry.get()
+    except KeyError:
+        log.debug('no previous object %s found', uri)
+    cmeta.load(previous)
+    yield cmeta
+    with entry.locked(_LOCK_NAME):
+        entry.set(cmeta.to_simplified())