pybind/mgr/smb: add handler.py, the main business logic & transaction core
author     John Mulligan <jmulligan@redhat.com>
           Tue, 30 Jan 2024 19:15:29 +0000 (14:15 -0500)
committer  John Mulligan <jmulligan@redhat.com>
           Thu, 25 Apr 2024 23:10:39 +0000 (19:10 -0400)
Signed-off-by: John Mulligan <jmulligan@redhat.com>
src/pybind/mgr/smb/handler.py [new file with mode: 0644]

diff --git a/src/pybind/mgr/smb/handler.py b/src/pybind/mgr/smb/handler.py
new file mode 100644 (file)
index 0000000..387d0a4
--- /dev/null
@@ -0,0 +1,987 @@
+from typing import (
+    Any,
+    Collection,
+    Dict,
+    Iterable,
+    List,
+    Optional,
+    Set,
+    Tuple,
+    Union,
+    cast,
+)
+
+import logging
+import time
+
+from ceph.deployment.service_spec import SMBSpec
+
+from . import config_store, external, resources
+from .enums import (
+    AuthMode,
+    CephFSStorageProvider,
+    Intent,
+    JoinSourceType,
+    State,
+    UserGroupSourceType,
+)
+from .internal import (
+    ClusterEntry,
+    JoinAuthEntry,
+    ResourceEntry,
+    ShareEntry,
+    UsersAndGroupsEntry,
+)
+from .proto import (
+    AccessAuthorizer,
+    ConfigEntry,
+    ConfigStore,
+    OrchSubmitter,
+    PathResolver,
+    Simplified,
+    checked,
+)
+from .resources import SMBResource
+from .results import ErrorResult, Result, ResultGroup
+
+ClusterRef = Union[resources.Cluster, resources.RemovedCluster]
+ShareRef = Union[resources.Share, resources.RemovedShare]
+
+_DOMAIN = 'domain'
+log = logging.getLogger(__name__)
+
+
+class InvalidResourceMatch(ValueError):
+    pass
+
+
+class ClusterChangeGroup:
+    """A bag of holding for items being modified and thus needing synchronizing
+    with the external stores & components.
+    """
+
+    def __init__(
+        self,
+        cluster: ClusterRef,
+        shares: List[resources.Share],
+        join_auths: List[resources.JoinAuth],
+        users_and_groups: List[resources.UsersAndGroups],
+    ):
+        self.cluster = cluster
+        self.shares = shares
+        self.join_auths = join_auths
+        self.users_and_groups = users_and_groups
+        # a cache for modified entries
+        self.cache = config_store.EntryCache()
+
+    def cache_updated_entry(self, entry: ConfigEntry) -> None:
+        self.cache[entry.full_key] = entry
+
+
+class _FakePathResolver:
+    """A stub PathResolver for unit testing."""
+
+    def resolve(
+        self, volume: str, subvolumegroup: str, subvolume: str, path: str
+    ) -> str:
+        path = path.lstrip('/')
+        if subvolumegroup or subvolume:
+            import uuid
+
+            # mimic the uuid found in a real ceph subvolume path
+            # by deriving a uuid from the existing values we have
+            vid = str(
+                uuid.uuid3(
+                    uuid.NAMESPACE_URL,
+                    f'cephfs+{volume}:{subvolumegroup}:{subvolume}',
+                )
+            )
+            subvolumegroup = subvolumegroup or '_nogroup'
+            return f'/volumes/{subvolumegroup}/{subvolume}/{vid}/{path}'
+        return f'/{path}'
+
+    resolve_exists = resolve
+
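
As a rough illustration of the stub resolver's behavior (values shown are only indicative; the uuid segment is derived deterministically via uuid3 as above):

    resolver = _FakePathResolver()
    # a bare volume path passes through unchanged (aside from the leading slash)
    assert resolver.resolve('cephfs', '', '', '/top/dir') == '/top/dir'
    # subvolume paths gain a fabricated uuid segment, mimicking the real
    # /volumes/<group>/<subvolume>/<uuid>/<path> layout
    print(resolver.resolve('cephfs', 'group1', 'sv1', 'data'))
    # e.g. /volumes/group1/sv1/<derived-uuid>/data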
+
+class _FakeAuthorizer:
+    """A stub AccessAuthorizer for unit testing."""
+
+    def authorize_entity(
+        self, volume: str, entity: str, caps: str = ''
+    ) -> None:
+        pass
+
+
+class _Matcher:
+    def __init__(self) -> None:
+        self._contents: Set[Any] = set()
+        self._inputs: Set[str] = set()
+
+    def __str__(self) -> str:
+        if not self._contents:
+            return 'match-all'
+        return 'match-resources:' + ','.join(self._inputs)
+
+    def __contains__(self, value: Any) -> bool:
+        if not self._contents:
+            return True
+        if not isinstance(value, tuple):
+            return value in self._contents
+        assert len(value) > 1
+        return (
+            # match a specific resource id
+            value in self._contents
+            # match all ids of a given resource type
+            or (value[0], None) in self._contents
+            # match all partial ids (shares only)
+            or (
+                len(value) == 3
+                and (value[0], value[1], None) in self._contents
+            )
+        )
+
+    def parse(self, txt: str) -> None:
+        rtypes: Dict[str, Any] = {
+            cast(Any, r).resource_type: r
+            for r in (
+                resources.Cluster,
+                resources.Share,
+                resources.JoinAuth,
+                resources.UsersAndGroups,
+            )
+        }
+        if txt in rtypes:
+            resource_cls = rtypes[txt]
+            self._contents.add(resource_cls)
+            self._contents.add((resource_cls, None))
+            self._inputs.add(txt)
+            return
+        try:
+            prefix, id_a = txt.rsplit('.', 1)
+            resource_cls = rtypes[prefix]
+            self._contents.add(resource_cls)
+            self._contents.add((resource_cls, id_a))
+            self._contents.add((resource_cls, id_a, None))
+            self._inputs.add(txt)
+            return
+        except (ValueError, KeyError):
+            pass
+        try:
+            prefix, id_a, id_b = txt.rsplit('.', 2)
+            resource_cls = rtypes[prefix]
+            self._contents.add(resource_cls)
+            self._contents.add((resource_cls, id_a, id_b))
+            self._inputs.add(txt)
+            return
+        except (ValueError, KeyError):
+            pass
+        raise InvalidResourceMatch(
+            f'{txt!r} does not match a valid resource type'
+        )
+
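
The matcher accepts three textual forms: a bare resource type, a type plus a single id, and (for shares) a type plus cluster and share ids. A small sketch, reading the type string off the resource class rather than assuming its literal value ("cluster1"/"share1" are just example ids):

    share_type = cast(Any, resources.Share).resource_type
    m = _Matcher()
    m.parse(share_type)                        # match every share
    m.parse(f'{share_type}.cluster1')          # match all shares of cluster "cluster1"
    m.parse(f'{share_type}.cluster1.share1')   # match one specific share
    assert (resources.Share, 'cluster1', 'share1') in m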
+
+class ClusterConfigHandler:
+    """The central class for ingesting and handling smb configuration change
+    requests.
+
+    The ClusterConfigHandler works in roughly three phases:
+    1. Validation - ensures the resources being updated are internally valid
+                    and performs basic consistency checks.
+    2. Update     - persists the new resource objects in the internal
+                    configuration store.
+    3. Sync'ing   - converts internal resources to externally usable data and
+                    updates external components as needed.
+                    (see also "reconciliation")
+
+    It makes use of three data stores:
+    * internal_store: items that belong to the smb module. Generally, our
+      own saved resource types.
+    * public_store: a store meant for sharing configuration data with other
+      processes. It is intended for non-sensitive general configuration data.
+    * priv_store: a priv(ate/ileged) store that is also meant for sharing data
+      with other processes, but unlike the public store this data might be
+      sensitive.
+
+    Note that these stores are permitted to overlap. A public_store and
+    priv_store could use the exact same store object if the caller configures
+    the ClusterConfigHandler that way. This is very much expected when
+    executed in unit/other tests. Do NOT assume the keys in stores are mutually
+    exclusive!
+
+    This class also exposes some extra functionality for reading/iterating
+    the internal store so that the mgr module can be largely encapsulated
+    away from the store(s).
+    """
+
+    def __init__(
+        self,
+        *,
+        internal_store: ConfigStore,
+        public_store: ConfigStore,
+        priv_store: ConfigStore,
+        path_resolver: Optional[PathResolver] = None,
+        authorizer: Optional[AccessAuthorizer] = None,
+        orch: Optional[OrchSubmitter] = None,
+    ) -> None:
+        self.internal_store = internal_store
+        self.public_store = public_store
+        self.priv_store = priv_store
+        if path_resolver is None:
+            path_resolver = _FakePathResolver()
+        self._path_resolver: PathResolver = path_resolver
+        if authorizer is None:
+            authorizer = _FakeAuthorizer()
+        self._authorizer: AccessAuthorizer = authorizer
+        self._orch = orch  # if None, disables updating the spec via orch
+        log.info(
+            'Initialized new ClusterConfigHandler with'
+            f' internal store {self.internal_store!r},'
+            f' public store {self.public_store!r},'
+            f' priv store {self.priv_store!r},'
+            f' path resolver {self._path_resolver!r},'
+            f' authorizer {self._authorizer!r},'
+            f' orch {self._orch!r}'
+        )
+
+    def apply(self, inputs: Iterable[SMBResource]) -> ResultGroup:
+        log.debug('applying changes to internal data store')
+        results = ResultGroup()
+        for resource in self._order_inputs(inputs):
+            try:
+                result = self._update_resource(resource)
+            except ErrorResult as err:
+                result = err
+            except Exception as err:
+                log.exception("error updating resource")
+                result = ErrorResult(resource, msg=str(err))
+            results.append(result)
+        if results.success:
+            log.debug(
+                'successfully updated %s resources. syncing changes to public stores',
+                len(list(results)),
+            )
+            self._sync_modified(results)
+        return results
+
+    def cluster_ids(self) -> List[str]:
+        return list(ClusterEntry.ids(self.internal_store))
+
+    def share_ids(self) -> List[Tuple[str, str]]:
+        return list(ShareEntry.ids(self.internal_store))
+
+    def share_ids_by_cluster(self) -> Dict[str, List[str]]:
+        out: Dict[str, List[str]] = {}
+        for cluster_id, share_id in ShareEntry.ids(self.internal_store):
+            out.setdefault(cluster_id, []).append(share_id)
+        return out
+
+    def join_auth_ids(self) -> List[str]:
+        return list(JoinAuthEntry.ids(self.internal_store))
+
+    def user_and_group_ids(self) -> List[str]:
+        return list(UsersAndGroupsEntry.ids(self.internal_store))
+
+    def all_resources(self) -> List[SMBResource]:
+        return self._search_resources(_Matcher())
+
+    def matching_resources(self, names: List[str]) -> List[SMBResource]:
+        matcher = _Matcher()
+        for name in names:
+            matcher.parse(name)
+        return self._search_resources(matcher)
+
+    def _search_resources(self, matcher: _Matcher) -> List[SMBResource]:
+        log.debug("performing search with matcher: %s", matcher)
+        out: List[SMBResource] = []
+        if resources.Cluster in matcher or resources.Share in matcher:
+            log.debug("searching for clusters and/or shares")
+            cluster_shares = self.share_ids_by_cluster()
+            for cluster_id in self.cluster_ids():
+                if (resources.Cluster, cluster_id) in matcher:
+                    out.append(self._cluster_entry(cluster_id).get_cluster())
+                for share_id in cluster_shares[cluster_id]:
+                    if (resources.Share, cluster_id, share_id) in matcher:
+                        out.append(
+                            self._share_entry(
+                                cluster_id, share_id
+                            ).get_share()
+                        )
+        if resources.JoinAuth in matcher:
+            log.debug("searching for join auths")
+            for auth_id in self.join_auth_ids():
+                if (resources.JoinAuth, auth_id) in matcher:
+                    out.append(self._join_auth_entry(auth_id).get_join_auth())
+        if resources.UsersAndGroups in matcher:
+            log.debug("searching for users and groups")
+            for ug_id in self.user_and_group_ids():
+                if (resources.UsersAndGroups, ug_id) in matcher:
+                    out.append(
+                        self._users_and_groups_entry(
+                            ug_id
+                        ).get_users_and_groups()
+                    )
+        log.debug("search found %d resources", len(out))
+        return out
+
+    def _order_inputs(
+        self, inputs: Iterable[SMBResource]
+    ) -> List[SMBResource]:
+        """Sort resource objects by type so that the user can largely input
+        objects freely but that references map out cleanly.
+        """
+
+        def _keyfunc(r: SMBResource) -> int:
+            if isinstance(r, resources.RemovedShare):
+                return -2
+            if isinstance(r, resources.RemovedCluster):
+                return -1
+            if isinstance(r, resources.Share):
+                return 2
+            if isinstance(r, resources.Cluster):
+                return 1
+            return 0
+
+        return sorted(inputs, key=_keyfunc)
+
+    def _update_resource(self, resource: SMBResource) -> Result:
+        """Update the internal store with a new resource object."""
+        entry: ResourceEntry
+        log.debug('updating resource: %r', resource)
+        if isinstance(
+            resource, (resources.Cluster, resources.RemovedCluster)
+        ):
+            self._check_cluster(resource)
+            entry = self._cluster_entry(resource.cluster_id)
+        elif isinstance(resource, (resources.Share, resources.RemovedShare)):
+            self._check_share(resource)
+            entry = self._share_entry(resource.cluster_id, resource.share_id)
+        elif isinstance(resource, resources.JoinAuth):
+            self._check_join_auths(resource)
+            entry = self._join_auth_entry(resource.auth_id)
+        elif isinstance(resource, resources.UsersAndGroups):
+            self._check_users_and_groups(resource)
+            entry = self._users_and_groups_entry(resource.users_groups_id)
+        else:
+            raise TypeError('not a valid smb resource')
+        state = self._save(entry, resource)
+        result = Result(resource, success=True, status={'state': state})
+        log.debug('saved resource: %r; state: %s', resource, state)
+        return result
+
+    def _save(self, entry: ResourceEntry, resource: SMBResource) -> State:
+        # Returns a State describing the result of the save (or removal).
+        if resource.intent == Intent.REMOVED:
+            removed = entry.remove()
+            return State.REMOVED if removed else State.NOT_PRESENT
+        return entry.create_or_update(resource)
+
+    def _sync_clusters(
+        self, modified_cluster_ids: Optional[Collection[str]] = None
+    ) -> None:
+        """Trigger synchronization for all the clusters listed in
+        `modified_cluster_ids` or all clusters if None.
+        """
+        share_ids = self.share_ids()
+        present_cluster_ids = set()
+        removed_cluster_ids = set()
+        change_groups = []
+        cluster_ids = modified_cluster_ids or ClusterEntry.ids(
+            self.internal_store
+        )
+        log.debug(
+            'syncing %s clusters: %s',
+            'all' if not modified_cluster_ids else 'selected',
+            ' '.join(cluster_ids),
+        )
+        for cluster_id in cluster_ids:
+            entry = self._cluster_entry(cluster_id)
+            try:
+                cluster = entry.get_cluster()
+            except KeyError:
+                removed_cluster_ids.add(cluster_id)
+                continue
+            present_cluster_ids.add(cluster_id)
+            change_group = ClusterChangeGroup(
+                cluster,
+                [
+                    self._share_entry(cid, shid).get_share()
+                    for cid, shid in share_ids
+                    if cid == cluster_id
+                ],
+                [
+                    self._join_auth_entry(_id).get_join_auth()
+                    for _id in _auth_refs(cluster)
+                ],
+                [
+                    self._users_and_groups_entry(_id).get_users_and_groups()
+                    for _id in _ug_refs(cluster)
+                ],
+            )
+            change_groups.append(change_group)
+        for change_group in change_groups:
+            self._save_cluster_settings(change_group)
+
+        # if there are clusters in the public store that don't exist
+        # in the internal store, we need to clean them up.
+        if not modified_cluster_ids:
+            ext_ids = set(
+                external.stored_cluster_ids(
+                    self.public_store, self.priv_store
+                )
+            )
+            removed_cluster_ids = ext_ids - set(cluster_ids)
+        for cluster_id in removed_cluster_ids:
+            self._remove_cluster(cluster_id)
+
+    def _sync_modified(self, updated: ResultGroup) -> None:
+        cluster_ids = self._find_modifications(updated)
+        self._sync_clusters(cluster_ids)
+
+    def _find_modifications(self, updated: ResultGroup) -> Collection[str]:
+        """Given a ResultGroup tracking what was recently updated in the
+        internal store, return all cluster_ids that may need external syncing.
+        """
+        # this initial version is going to take a simplistic approach and try
+        # to broadly collect anything that could be a change.
+        # Later, this function can be refined to trigger fewer changes by looking
+        # at the objects in more detail and only producing a change group for
+        # something that really has been modified.
+        chg_cluster_ids: Set[str] = set()
+        chg_join_ids: Set[str] = set()
+        chg_ug_ids: Set[str] = set()
+        for result in updated:
+            state = (result.status or {}).get('state', None)
+            if state in (State.PRESENT, State.NOT_PRESENT):
+                # these are the no-change states. we can ignore them
+                continue
+            if isinstance(
+                result.src, (resources.Cluster, resources.RemovedCluster)
+            ):
+                chg_cluster_ids.add(result.src.cluster_id)
+            elif isinstance(
+                result.src, (resources.Share, resources.RemovedShare)
+            ):
+                # shares always belong to one cluster
+                chg_cluster_ids.add(result.src.cluster_id)
+            elif isinstance(result.src, resources.JoinAuth):
+                chg_join_ids.add(result.src.auth_id)
+            elif isinstance(result.src, resources.UsersAndGroups):
+                chg_ug_ids.add(result.src.users_groups_id)
+
+        # TODO: here's a lazy bit. if any join auths or users/groups changed we
+        # will regen all clusters because these can be shared by >1 cluster.
+        # In future, make this only pick clusters using the named resources.
+        if chg_join_ids or chg_ug_ids:
+            chg_cluster_ids.update(ClusterEntry.ids(self.internal_store))
+        return chg_cluster_ids
+
+    def _save_cluster_settings(
+        self, change_group: ClusterChangeGroup
+    ) -> None:
+        """Save the external facing objects. Tickle the external components."""
+        log.debug(
+            'saving external store for cluster: %s',
+            change_group.cluster.cluster_id,
+        )
+        # vols holds the cephfs volumes our shares touch. some operations are
+        # disabled/skipped unless we touch at least one volume.
+        vols = {share.checked_cephfs.volume for share in change_group.shares}
+        data_entity = _cephx_data_entity(change_group.cluster.cluster_id)
+        # save the various object types
+        previous_info = _swap_pending_cluster_info(
+            self.public_store,
+            change_group,
+            orch_needed=bool(vols and self._orch),
+        )
+        _save_pending_join_auths(self.priv_store, change_group)
+        _save_pending_users_and_groups(self.priv_store, change_group)
+        _save_pending_config(
+            self.public_store,
+            change_group,
+            self._path_resolver,
+            data_entity,
+        )
+        # remove any stray objects
+        external.rm_other_in_ns(
+            self.priv_store,
+            change_group.cluster.cluster_id,
+            set(change_group.cache),
+        )
+        external.rm_other_in_ns(
+            self.public_store,
+            change_group.cluster.cluster_id,
+            set(change_group.cache),
+        )
+
+        # ensure an entity exists with access to the volumes
+        for volume in vols:
+            self._authorizer.authorize_entity(volume, data_entity)
+        if not vols:
+            # there were no volumes, and thus nothing to authorize. set data_entity
+            # to an empty string to avoid adding it to the svc spec later.
+            data_entity = ''
+
+        # build a service spec for smb cluster
+        cluster = change_group.cluster
+        assert isinstance(cluster, resources.Cluster)
+        config_entries = [
+            change_group.cache[external.config_key(cluster.cluster_id)],
+            self.public_store[
+                external.config_key(cluster.cluster_id, override=True)
+            ],
+        ]
+        join_source_entries = [
+            change_group.cache[(cluster.cluster_id, key)]
+            for key in external.stored_join_source_keys(
+                change_group.cache, cluster.cluster_id
+            )
+        ]
+        user_source_entries = [
+            change_group.cache[(cluster.cluster_id, key)]
+            for key in external.stored_usergroup_source_keys(
+                change_group.cache, cluster.cluster_id
+            )
+        ]
+        smb_spec = _generate_smb_service_spec(
+            cluster,
+            config_entries=config_entries,
+            join_source_entries=join_source_entries,
+            user_source_entries=user_source_entries,
+            data_entity=data_entity,
+        )
+        _save_pending_spec_backup(self.public_store, change_group, smb_spec)
+        # if orch was ever needed in the past we must "re-orch", but if we have
+        # no volumes and never orch'ed before, wait until we have something to
+        # share before orchestrating the smb cluster. This is done because we
+        # need volumes in order to have cephx keys that we pass to the services
+        # via orch.  This differs from NFS because ganesha embeds the cephx
+        # keys directly in each export definition block while samba needs the
+        # ceph keyring to load keys.
+        previous_orch = previous_info.get('orch_needed', False)
+        if self._orch and (vols or previous_orch):
+            self._orch.submit_smb_spec(smb_spec)
+
+    def _remove_cluster(self, cluster_id: str) -> None:
+        log.info('Removing cluster: %s', cluster_id)
+        spec_key = external.spec_backup_key(cluster_id)
+        if self.public_store[spec_key].exists() and self._orch:
+            service_name = f'smb.{cluster_id}'
+            log.debug('Removing smb orch service: %r', service_name)
+            self._orch.remove_smb_service(service_name)
+        external.rm_cluster(self.priv_store, cluster_id)
+        external.rm_cluster(self.public_store, cluster_id)
+
+    def _check_cluster(self, cluster: ClusterRef) -> None:
+        """Check that the cluster resource can be updated."""
+        if cluster.intent == Intent.REMOVED:
+            share_ids = ShareEntry.ids(self.internal_store)
+            clusters_used = {cid for cid, _ in share_ids}
+            if cluster.cluster_id in clusters_used:
+                raise ErrorResult(
+                    cluster,
+                    msg="cluster in use by shares",
+                    status={
+                        'clusters': [
+                            shid
+                            for cid, shid in share_ids
+                            if cid == cluster.cluster_id
+                        ]
+                    },
+                )
+            return
+        assert isinstance(cluster, resources.Cluster)
+        cluster.validate()
+
+    def _check_share(self, share: ShareRef) -> None:
+        """Check that the share resource can be updated."""
+        if share.intent == Intent.REMOVED:
+            return
+        assert isinstance(share, resources.Share)
+        share.validate()
+        if share.cluster_id not in ClusterEntry.ids(self.internal_store):
+            raise ErrorResult(
+                share,
+                msg="no matching cluster id",
+                status={"cluster_id": share.cluster_id},
+            )
+        assert share.cephfs is not None
+        try:
+            self._path_resolver.resolve_exists(
+                share.cephfs.volume,
+                share.cephfs.subvolumegroup,
+                share.cephfs.subvolume,
+                share.cephfs.path,
+            )
+        except (FileNotFoundError, NotADirectoryError):
+            raise ErrorResult(
+                share, msg="path is not a valid directory in volume"
+            )
+
+    def _check_join_auths(self, join_auth: resources.JoinAuth) -> None:
+        """Check that the JoinAuth resource can be updated."""
+        if join_auth.intent == Intent.PRESENT:
+            return  # adding is always ok
+        refs_in_use: Dict[str, List[str]] = {}
+        for cluster_id in ClusterEntry.ids(self.internal_store):
+            cluster = self._cluster_entry(cluster_id).get_cluster()
+            for ref in _auth_refs(cluster):
+                refs_in_use.setdefault(ref, []).append(cluster_id)
+        log.debug('refs_in_use: %r', refs_in_use)
+        if join_auth.auth_id in refs_in_use:
+            raise ErrorResult(
+                join_auth,
+                msg='join auth resource in use by clusters',
+                status={
+                    'clusters': refs_in_use[join_auth.auth_id],
+                },
+            )
+
+    def _check_users_and_groups(
+        self, users_and_groups: resources.UsersAndGroups
+    ) -> None:
+        """Check that the UsersAndGroups resource can be updated."""
+        if users_and_groups.intent == Intent.PRESENT:
+            return  # adding is always ok
+        refs_in_use: Dict[str, List[str]] = {}
+        for cluster_id in ClusterEntry.ids(self.internal_store):
+            cluster = self._cluster_entry(cluster_id).get_cluster()
+            for ref in _ug_refs(cluster):
+                refs_in_use.setdefault(ref, []).append(cluster_id)
+        log.debug('refs_in_use: %r', refs_in_use)
+        if users_and_groups.users_groups_id in refs_in_use:
+            raise ErrorResult(
+                users_and_groups,
+                msg='users and groups resource in use by clusters',
+                status={
+                    'clusters': refs_in_use[users_and_groups.users_groups_id],
+                },
+            )
+
+    def _cluster_entry(self, cluster_id: str) -> ClusterEntry:
+        return ClusterEntry.from_store(self.internal_store, cluster_id)
+
+    def _share_entry(self, cluster_id: str, share_id: str) -> ShareEntry:
+        return ShareEntry.from_store(
+            self.internal_store, cluster_id, share_id
+        )
+
+    def _join_auth_entry(self, auth_id: str) -> JoinAuthEntry:
+        return JoinAuthEntry.from_store(self.internal_store, auth_id)
+
+    def _users_and_groups_entry(self, ug_id: str) -> UsersAndGroupsEntry:
+        return UsersAndGroupsEntry.from_store(self.internal_store, ug_id)
+
+    def generate_config(self, cluster_id: str) -> Dict[str, Any]:
+        """Demo function that generates a config on demand."""
+        cluster = self._cluster_entry(cluster_id).get_cluster()
+        shares = [
+            self._share_entry(cluster_id, shid).get_share()
+            for shid in self.share_ids_by_cluster()[cluster_id]
+        ]
+        return _generate_config(
+            cluster,
+            shares,
+            self._path_resolver,
+            _cephx_data_entity(cluster_id),
+        )
+
+    def generate_smb_service_spec(self, cluster_id: str) -> SMBSpec:
+        """Demo function that generates a smb service spec on demand."""
+        cluster = self._cluster_entry(cluster_id).get_cluster()
+        # if the user manually puts custom configurations (aka "override"
+        # configs) in the store, use them in place of the generated config.
+        # this is mainly intended for development/test
+        config_entries = [
+            self.public_store[external.config_key(cluster_id)],
+            self.public_store[external.config_key(cluster_id, override=True)],
+        ]
+        join_source_entries = [
+            self.priv_store[(cluster_id, key)]
+            for key in external.stored_join_source_keys(
+                self.priv_store, cluster_id
+            )
+        ]
+        user_source_entries = [
+            self.priv_store[(cluster_id, key)]
+            for key in external.stored_usergroup_source_keys(
+                self.priv_store, cluster_id
+            )
+        ]
+        return _generate_smb_service_spec(
+            cluster,
+            config_entries=config_entries,
+            join_source_entries=join_source_entries,
+            user_source_entries=user_source_entries,
+        )
+
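
A minimal usage sketch, assuming an in-memory ConfigStore implementation is available from config_store (the name MemConfigStore below is only a placeholder) and that a Cluster and a Share resource have been built elsewhere:

    handler = ClusterConfigHandler(
        internal_store=config_store.MemConfigStore(),   # placeholder store name
        public_store=config_store.MemConfigStore(),
        priv_store=config_store.MemConfigStore(),
    )  # path_resolver/authorizer default to the _Fake* stubs; orch stays disabled
    # apply() reorders the inputs (removals first, clusters before shares),
    # validates and persists each resource, and syncs changed clusters out to
    # the public/priv stores only if every result succeeded.
    results = handler.apply([my_cluster, my_share])     # resources built elsewhere
    if results.success:
        print(handler.share_ids_by_cluster())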
+
+def _auth_refs(cluster: resources.Cluster) -> Collection[str]:
+    if cluster.auth_mode != AuthMode.ACTIVE_DIRECTORY:
+        return set()
+    return {
+        j.ref
+        for j in checked(cluster.domain_settings).join_sources
+        if j.source_type == JoinSourceType.RESOURCE and j.ref
+    }
+
+
+def _ug_refs(cluster: resources.Cluster) -> Collection[str]:
+    if (
+        cluster.auth_mode != AuthMode.USER
+        or cluster.user_group_settings is None
+    ):
+        return set()
+    return {
+        ug.ref
+        for ug in cluster.user_group_settings
+        if ug.source_type == UserGroupSourceType.RESOURCE and ug.ref
+    }
+
+
+def _ynbool(value: bool) -> str:
+    """Convert a bool to an smb.conf compatible string."""
+    return 'Yes' if value else 'No'
+
+
+def _generate_share(
+    share: resources.Share, resolver: PathResolver, cephx_entity: str
+) -> Dict[str, Dict[str, str]]:
+    assert share.cephfs is not None
+    assert share.cephfs.provider == CephFSStorageProvider.SAMBA_VFS
+    assert cephx_entity, "cephx entity name missing"
+    # very annoyingly, samba's ceph module absolutely must NOT have the
+    # "client." bit in front. JJM has been tripped up by this multiple times -
+    # seemingly every time this module is touched.
+    _prefix = 'client.'
+    plen = len(_prefix)
+    if cephx_entity.startswith(_prefix):
+        cephx_entity = cephx_entity[plen:]
+    path = resolver.resolve(
+        share.cephfs.volume,
+        share.cephfs.subvolumegroup,
+        share.cephfs.subvolume,
+        share.cephfs.path,
+    )
+    return {
+        # smb.conf options
+        'options': {
+            'path': path,
+            "vfs objects": "ceph",
+            'ceph:config_file': '/etc/ceph/ceph.conf',
+            'ceph:filesystem': share.cephfs.volume,
+            'ceph:user_id': cephx_entity,
+            'read only': _ynbool(share.readonly),
+            'browseable': _ynbool(share.browseable),
+            'kernel share modes': 'no',
+            'x:ceph:id': f'{share.cluster_id}.{share.share_id}',
+        }
+    }
+
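
For orientation, the share-level options produced above come out roughly as follows (path and ids are illustrative; note that the cephx entity has its 'client.' prefix stripped for samba's ceph VFS module):

    # illustrative output of _generate_share() for share "docs" in cluster "c1",
    # with cephx entity "client.smb.fs.cluster.c1" (prefix removed below)
    {
        'options': {
            'path': '/volumes/g1/sv1/<uuid>/docs',
            'vfs objects': 'ceph',
            'ceph:config_file': '/etc/ceph/ceph.conf',
            'ceph:filesystem': 'cephfs',
            'ceph:user_id': 'smb.fs.cluster.c1',
            'read only': 'No',
            'browseable': 'Yes',
            'kernel share modes': 'no',
            'x:ceph:id': 'c1.docs',
        }
    }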
+
+def _generate_config(
+    cluster: resources.Cluster,
+    shares: Iterable[resources.Share],
+    resolver: PathResolver,
+    cephx_entity: str = "",
+) -> Dict[str, Any]:
+    cluster_global_opts = {}
+    if cluster.auth_mode == AuthMode.ACTIVE_DIRECTORY:
+        assert cluster.domain_settings is not None
+        cluster_global_opts['security'] = 'ads'
+        cluster_global_opts['realm'] = cluster.domain_settings.realm
+        # TODO: support alt. workgroup values
+        wg = cluster.domain_settings.realm.upper().split('.')[0]
+        cluster_global_opts['workgroup'] = wg
+        cluster_global_opts['idmap config * : backend'] = 'autorid'
+        cluster_global_opts['idmap config * : range'] = '2000-9999999'
+
+    share_configs = {
+        share.name: _generate_share(share, resolver, cephx_entity)
+        for share in shares
+    }
+
+    return {
+        'samba-container-config': 'v0',
+        'configs': {
+            cluster.cluster_id: {
+                'instance_name': cluster.cluster_id,
+                'instance_features': [],
+                'globals': ['default', cluster.cluster_id],
+                'shares': list(share_configs.keys()),
+            },
+        },
+        'globals': {
+            'default': {
+                'options': {
+                    'server min protocol': 'SMB2',
+                    'load printers': 'No',
+                    'printing': 'bsd',
+                    'printcap name': '/dev/null',
+                    'disable spoolss': 'Yes',
+                }
+            },
+            cluster.cluster_id: {
+                'options': cluster_global_opts,
+            },
+        },
+        'shares': share_configs,
+    }
+
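
As a worked example of the Active Directory branch above (illustrative only): a cluster whose domain_settings.realm is 'ad.example.com' yields these cluster-specific globals, with the workgroup taken from the first realm label:

    expected_ad_globals = {
        'security': 'ads',
        'realm': 'ad.example.com',
        'workgroup': 'AD',                       # 'ad.example.com'.upper().split('.')[0]
        'idmap config * : backend': 'autorid',
        'idmap config * : range': '2000-9999999',
    }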
+
+def _generate_smb_service_spec(
+    cluster: resources.Cluster,
+    *,
+    config_entries: List[ConfigEntry],
+    join_source_entries: List[ConfigEntry],
+    user_source_entries: List[ConfigEntry],
+    data_entity: str = '',
+) -> SMBSpec:
+    features = []
+    if cluster.auth_mode == AuthMode.ACTIVE_DIRECTORY:
+        features.append(_DOMAIN)
+    # only one config uri can be used. the input list should be ordered from
+    # lowest to highest priority; the highest-priority item that exists in
+    # the store will be used.
+    config_uri = ''
+    for entry in config_entries:
+        if entry.exists():
+            config_uri = entry.uri
+    if not config_uri:
+        raise ValueError('no samba container configuration available')
+    # collect the uris for the join sources
+    join_sources: List[str] = []
+    for entry in join_source_entries:
+        # note: uris are collected even if the entry does not (yet) exist
+        join_sources.append(entry.uri)
+    # collect the uris for the user sources
+    user_sources: List[str] = []
+    for entry in user_source_entries:
+        user_sources.append(entry.uri)
+    user_entities: Optional[List[str]] = None
+    if data_entity:
+        user_entities = [data_entity]
+    return SMBSpec(
+        service_id=cluster.cluster_id,
+        placement=cluster.placement,
+        cluster_id=cluster.cluster_id,
+        features=features,
+        config_uri=config_uri,
+        join_sources=join_sources,
+        user_sources=user_sources,
+        custom_dns=cluster.custom_dns,
+        include_ceph_users=user_entities,
+    )
+
+
+def _swap_pending_cluster_info(
+    store: ConfigStore,
+    change_group: ClusterChangeGroup,
+    orch_needed: bool,
+) -> Simplified:
+    # TODO: it's not just a placeholder any more. rename the key func!
+    pentry = store[
+        external.cluster_placeholder_key(change_group.cluster.cluster_id)
+    ]
+    try:
+        existing = pentry.get()
+    except KeyError:
+        existing = {}
+    pentry.set(
+        {
+            'cluster_id': change_group.cluster.cluster_id,
+            'timestamp': int(time.time()),
+            'orch_needed': orch_needed,
+        }
+    )
+    change_group.cache_updated_entry(pentry)
+    return existing
+
+
+def _save_pending_join_auths(
+    store: ConfigStore,
+    change_group: ClusterChangeGroup,
+) -> None:
+    cluster = change_group.cluster
+    assert isinstance(cluster, resources.Cluster)
+    # save each join auth source in the priv store
+    if cluster.auth_mode != AuthMode.ACTIVE_DIRECTORY:
+        return
+    arefs = {j.auth_id: j for j in change_group.join_auths}
+    for idx, src in enumerate(checked(cluster.domain_settings).join_sources):
+        if src.source_type == JoinSourceType.RESOURCE:
+            javalues = checked(arefs[src.ref].auth)
+        elif src.source_type == JoinSourceType.PASSWORD:
+            javalues = checked(src.auth)
+        else:
+            raise ValueError(
+                f'unsupported join source type: {src.source_type}'
+            )
+        jentry = store[external.join_source_key(cluster.cluster_id, str(idx))]
+        jentry.set(javalues.to_simplified())
+        change_group.cache_updated_entry(jentry)
+
+
+def _save_pending_users_and_groups(
+    store: ConfigStore,
+    change_group: ClusterChangeGroup,
+) -> None:
+    cluster = change_group.cluster
+    assert isinstance(cluster, resources.Cluster)
+    # save each users-and-groups setting in the priv store
+    if cluster.auth_mode != AuthMode.USER:
+        return
+    augs = {ug.users_groups_id: ug for ug in change_group.users_and_groups}
+    for idx, ugsv in enumerate(checked(cluster.user_group_settings)):
+        if ugsv.source_type == UserGroupSourceType.RESOURCE:
+            ugvalues = augs[ugsv.ref].values
+            assert ugvalues
+        elif ugsv.source_type == UserGroupSourceType.INLINE:
+            ugvalues = ugsv.values
+            assert ugvalues
+        else:
+            raise ValueError(
+                f'unsupported users/groups source type: {ugsv.source_type}'
+            )
+        ugentry = store[
+            external.users_and_groups_key(cluster.cluster_id, str(idx))
+        ]
+        ugsimple = ugvalues.to_simplified()
+        ug_config: Simplified = {'samba-container-config': 'v0'}
+        if 'users' in ugsimple:
+            ug_config['users'] = {'all_entries': ugsimple['users']}
+        if 'groups' in ugsimple:
+            ug_config['groups'] = {'all_entries': ugsimple['groups']}
+        ugentry.set(ug_config)
+        change_group.cache_updated_entry(ugentry)
+
+
+def _save_pending_config(
+    store: ConfigStore,
+    change_group: ClusterChangeGroup,
+    resolver: PathResolver,
+    cephx_entity: str = "",
+) -> None:
+    assert isinstance(change_group.cluster, resources.Cluster)
+    # generate the cluster configuration and save it in the public store
+    cconfig = _generate_config(
+        change_group.cluster, change_group.shares, resolver, cephx_entity
+    )
+    centry = store[external.config_key(change_group.cluster.cluster_id)]
+    centry.set(cconfig)
+    change_group.cache_updated_entry(centry)
+
+
+def _save_pending_spec_backup(
+    store: ConfigStore, change_group: ClusterChangeGroup, smb_spec: SMBSpec
+) -> None:
+    ssentry = store[external.spec_backup_key(change_group.cluster.cluster_id)]
+    ssentry.set(smb_spec.to_json())
+    change_group.cache_updated_entry(ssentry)
+
+
+def _cephx_data_entity(cluster_id: str) -> str:
+    """Generate a name for the (default?) cephx key that a cluster (smbd) will
+    use for data access.
+    """
+    return f'client.smb.fs.cluster.{cluster_id}'