From: John Mulligan Date: Tue, 1 Jul 2025 15:28:02 +0000 (-0400) Subject: mgr/smb: reorganize internal.py to use fewer if-isinstance blocks X-Git-Url: http://git.apps.os.sepia.ceph.com/?a=commitdiff_plain;h=c81a666fe071afb1ea8cc6adba20d09eb38cc9d6;p=ceph.git mgr/smb: reorganize internal.py to use fewer if-isinstance blocks While working on adding a new top-level resource type I realized that this code can be a bit error prone and un-DRY. Use a single method for mapping between resource classes and resource entry classes and refactor the rest of the module around that idea. Signed-off-by: John Mulligan --- diff --git a/src/pybind/mgr/smb/internal.py b/src/pybind/mgr/smb/internal.py index 57e7a0c0278a8..5846a4742df6e 100644 --- a/src/pybind/mgr/smb/internal.py +++ b/src/pybind/mgr/smb/internal.py @@ -1,7 +1,7 @@ """Support for working with the internal data store and the strucutured resources that the internal store holds. """ -from typing import Collection, Tuple, Type, TypeVar +from typing import Collection, Tuple, Type, TypeVar, Union from . import resources from .enums import ConfigNS, State @@ -10,6 +10,7 @@ from .proto import ( ConfigStore, ConfigStoreListing, EntryKey, + ResourceKey, Self, Simplifiable, ) @@ -19,37 +20,26 @@ from .utils import one T = TypeVar('T') -def cluster_key(cluster_id: str) -> EntryKey: - """Return store entry key for a cluster entry.""" - return str(ConfigNS.CLUSTERS), cluster_id +class ResourceIDKey: + """A ResourceKey for resources with only one ID.""" + def __init__(self, resource_id: str) -> None: + self.resource_id = resource_id -def share_key(cluster_id: str, share_id: str) -> EntryKey: - """Return store entry key for a share entry.""" - return str(ConfigNS.SHARES), f'{cluster_id}.{share_id}' + def __str__(self) -> str: + return self.resource_id -def join_auth_key(auth_id: str) -> EntryKey: - """Return store entry key for a join auth entry.""" - return str(ConfigNS.JOIN_AUTHS), auth_id +class ChildResourceIDKey: + """A ResourceKey for resources with both a parent and child ID.""" + def __init__(self, parent_id: str, resource_id: str) -> None: + self.parent_id = parent_id + self.resource_id = resource_id + self._key = f'{parent_id}.{resource_id}' -def users_and_groups_key(users_groups_id: str) -> EntryKey: - """Return store entry key for a users-and-groups entry.""" - return str(ConfigNS.USERS_AND_GROUPS), users_groups_id - - -def resource_key(resource: SMBResource) -> EntryKey: - """Return a store entry key for an smb resource object.""" - if isinstance(resource, (resources.Cluster, resources.RemovedCluster)): - return cluster_key(resource.cluster_id) - elif isinstance(resource, (resources.Share, resources.RemovedShare)): - return share_key(resource.cluster_id, resource.share_id) - elif isinstance(resource, resources.JoinAuth): - return join_auth_key(resource.auth_id) - elif isinstance(resource, resources.UsersAndGroups): - return users_and_groups_key(resource.users_groups_id) - raise TypeError('not a valid smb resource') + def __str__(self) -> str: + return self._key class ResourceEntry: @@ -70,7 +60,7 @@ class ResourceEntry: def get_resource_type(self, cls: Type[T]) -> T: obj = self.get() - assert isinstance(obj, cls) + assert isinstance(obj, cls), f"{obj!r} is not a {cls}" return obj def set(self, resource: Simplifiable) -> None: @@ -90,20 +80,49 @@ class ResourceEntry: def remove(self) -> bool: return self.config_entry.remove() + @classmethod + def ids( + cls, store: ConfigStoreListing + ) -> Union[Collection[str], Collection[Tuple[str, str]]]: + raise NotImplementedError() -class ClusterEntry(ResourceEntry): - """Cluster resource getter/setter for the smb internal data store(s).""" + @classmethod + def from_store_by_key( + cls, store: ConfigStore, key: Union[str, ResourceKey] + ) -> Self: + _key = str(key) + return cls(_key, store[str(cls.namespace), _key]) + + @classmethod + def to_key(cls, resource: SMBResource) -> ResourceKey: + raise NotImplementedError() - namespace = ConfigNS.CLUSTERS + +class CommonResourceEntry(ResourceEntry): + """Common resource getter/setter for objects with a single ID value in the + smb internal data store(s). + """ @classmethod - def from_store(cls, store: ConfigStore, cluster_id: str) -> Self: - return cls(cluster_id, store[str(cls.namespace), cluster_id]) + def from_store(cls, store: ConfigStore, resource_id: str) -> Self: + return cls.from_store_by_key(store, resource_id) @classmethod def ids(cls, store: ConfigStoreListing) -> Collection[str]: return store.contents(str(cls.namespace)) + +class ClusterEntry(CommonResourceEntry): + """Cluster resource getter/setter for the smb internal data store(s).""" + + namespace = ConfigNS.CLUSTERS + _for_resource = (resources.Cluster, resources.RemovedCluster) + + @classmethod + def to_key(cls, resource: SMBResource) -> ResourceKey: + assert isinstance(resource, cls._for_resource) + return ResourceIDKey(resource.cluster_id) + def get_cluster(self) -> resources.Cluster: return self.get_resource_type(resources.Cluster) @@ -112,73 +131,92 @@ class ShareEntry(ResourceEntry): """Share resource getter/setter for the smb internal data store(s).""" namespace = ConfigNS.SHARES + _for_resource = (resources.Share, resources.RemovedShare) @classmethod def from_store( cls, store: ConfigStore, cluster_id: str, share_id: str ) -> Self: - key = f'{cluster_id}.{share_id}' - return cls(key, store[str(cls.namespace), key]) + key = ChildResourceIDKey(cluster_id, share_id) + return cls.from_store_by_key(store, str(key)) @classmethod def ids(cls, store: ConfigStoreListing) -> Collection[Tuple[str, str]]: return [_split(k) for k in store.contents(str(cls.namespace))] + @classmethod + def to_key(cls, resource: SMBResource) -> ResourceKey: + assert isinstance(resource, cls._for_resource) + return ChildResourceIDKey(resource.cluster_id, resource.share_id) + def get_share(self) -> resources.Share: return self.get_resource_type(resources.Share) -class JoinAuthEntry(ResourceEntry): +class JoinAuthEntry(CommonResourceEntry): """JoinAuth resource getter/setter for the smb internal data store(s).""" namespace = ConfigNS.JOIN_AUTHS + _for_resource = resources.JoinAuth @classmethod - def from_store(cls, store: ConfigStore, auth_id: str) -> Self: - return cls(auth_id, store[str(cls.namespace), auth_id]) - - @classmethod - def ids(cls, store: ConfigStoreListing) -> Collection[str]: - return store.contents(str(cls.namespace)) + def to_key(cls, resource: SMBResource) -> ResourceKey: + assert isinstance(resource, cls._for_resource) + return ResourceIDKey(resource.auth_id) def get_join_auth(self) -> resources.JoinAuth: return self.get_resource_type(resources.JoinAuth) -class UsersAndGroupsEntry(ResourceEntry): +class UsersAndGroupsEntry(CommonResourceEntry): """UsersAndGroupsEntry resource getter/setter for the smb internal data store(s). """ namespace = ConfigNS.USERS_AND_GROUPS + _for_resource = resources.UsersAndGroups @classmethod - def from_store(cls, store: ConfigStore, auth_id: str) -> Self: - return cls(auth_id, store[str(cls.namespace), auth_id]) - - @classmethod - def ids(cls, store: ConfigStoreListing) -> Collection[str]: - return store.contents(str(cls.namespace)) + def to_key(cls, resource: SMBResource) -> ResourceKey: + assert isinstance(resource, cls._for_resource) + return ResourceIDKey(resource.users_groups_id) def get_users_and_groups(self) -> resources.UsersAndGroups: return self.get_resource_type(resources.UsersAndGroups) +def _map_resource_entry( + resource: Union[SMBResource, Type[SMBResource]] +) -> Type[ResourceEntry]: + rcls = resource if isinstance(resource, type) else type(resource) + _map = { + resources.Cluster: ClusterEntry, + resources.RemovedCluster: ClusterEntry, + resources.Share: ShareEntry, + resources.RemovedShare: ShareEntry, + resources.JoinAuth: JoinAuthEntry, + resources.UsersAndGroups: UsersAndGroupsEntry, + } + try: + return _map[rcls] + except KeyError: + raise TypeError('not a valid smb resource') + + def resource_entry( store: ConfigStore, resource: SMBResource ) -> ResourceEntry: """Return a bound store entry object given a resource object.""" - if isinstance(resource, (resources.Cluster, resources.RemovedCluster)): - return ClusterEntry.from_store(store, resource.cluster_id) - elif isinstance(resource, (resources.Share, resources.RemovedShare)): - return ShareEntry.from_store( - store, resource.cluster_id, resource.share_id - ) - elif isinstance(resource, resources.JoinAuth): - return JoinAuthEntry.from_store(store, resource.auth_id) - elif isinstance(resource, resources.UsersAndGroups): - return UsersAndGroupsEntry.from_store(store, resource.users_groups_id) - raise TypeError('not a valid smb resource') + entry_cls = _map_resource_entry(resource) + key = entry_cls.to_key(resource) + return entry_cls.from_store_by_key(store, key) + + +def resource_key(resource: SMBResource) -> EntryKey: + """Return a store entry key for an smb resource object.""" + entry_cls = _map_resource_entry(resource) + key = entry_cls.to_key(resource) + return str(entry_cls.namespace), str(key) def _split(share_key: str) -> Tuple[str, str]: