]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
mgr/smb: reorganize internal.py to use fewer if-isinstance blocks
authorJohn Mulligan <jmulligan@redhat.com>
Tue, 1 Jul 2025 15:28:02 +0000 (11:28 -0400)
committerJohn Mulligan <jmulligan@redhat.com>
Wed, 2 Jul 2025 17:00:16 +0000 (13:00 -0400)
While working on adding a new top-level resource type I realized that
this code can be a bit error prone and un-DRY. Use a single method for
mapping between resource classes and resource entry classes and refactor
the rest of the module around that idea.

Signed-off-by: John Mulligan <jmulligan@redhat.com>
src/pybind/mgr/smb/internal.py

index 57e7a0c0278a801b85ea66e1aca6edb3493a84e6..5846a4742df6e747a01179ca313bf98d160cccbd 100644 (file)
@@ -1,7 +1,7 @@
 """Support for working with the internal data store and the strucutured
 resources that the internal store holds.
 """
-from typing import Collection, Tuple, Type, TypeVar
+from typing import Collection, Tuple, Type, TypeVar, Union
 
 from . import resources
 from .enums import ConfigNS, State
@@ -10,6 +10,7 @@ from .proto import (
     ConfigStore,
     ConfigStoreListing,
     EntryKey,
+    ResourceKey,
     Self,
     Simplifiable,
 )
@@ -19,37 +20,26 @@ from .utils import one
 T = TypeVar('T')
 
 
-def cluster_key(cluster_id: str) -> EntryKey:
-    """Return store entry key for a cluster entry."""
-    return str(ConfigNS.CLUSTERS), cluster_id
+class ResourceIDKey:
+    """A ResourceKey for resources with only one ID."""
 
+    def __init__(self, resource_id: str) -> None:
+        self.resource_id = resource_id
 
-def share_key(cluster_id: str, share_id: str) -> EntryKey:
-    """Return store entry key for a share entry."""
-    return str(ConfigNS.SHARES), f'{cluster_id}.{share_id}'
+    def __str__(self) -> str:
+        return self.resource_id
 
 
-def join_auth_key(auth_id: str) -> EntryKey:
-    """Return store entry key for a join auth entry."""
-    return str(ConfigNS.JOIN_AUTHS), auth_id
+class ChildResourceIDKey:
+    """A ResourceKey for resources with both a parent and child ID."""
 
+    def __init__(self, parent_id: str, resource_id: str) -> None:
+        self.parent_id = parent_id
+        self.resource_id = resource_id
+        self._key = f'{parent_id}.{resource_id}'
 
-def users_and_groups_key(users_groups_id: str) -> EntryKey:
-    """Return store entry key for a users-and-groups entry."""
-    return str(ConfigNS.USERS_AND_GROUPS), users_groups_id
-
-
-def resource_key(resource: SMBResource) -> EntryKey:
-    """Return a store entry key for an smb resource object."""
-    if isinstance(resource, (resources.Cluster, resources.RemovedCluster)):
-        return cluster_key(resource.cluster_id)
-    elif isinstance(resource, (resources.Share, resources.RemovedShare)):
-        return share_key(resource.cluster_id, resource.share_id)
-    elif isinstance(resource, resources.JoinAuth):
-        return join_auth_key(resource.auth_id)
-    elif isinstance(resource, resources.UsersAndGroups):
-        return users_and_groups_key(resource.users_groups_id)
-    raise TypeError('not a valid smb resource')
+    def __str__(self) -> str:
+        return self._key
 
 
 class ResourceEntry:
@@ -70,7 +60,7 @@ class ResourceEntry:
 
     def get_resource_type(self, cls: Type[T]) -> T:
         obj = self.get()
-        assert isinstance(obj, cls)
+        assert isinstance(obj, cls), f"{obj!r} is not a {cls}"
         return obj
 
     def set(self, resource: Simplifiable) -> None:
@@ -90,20 +80,49 @@ class ResourceEntry:
     def remove(self) -> bool:
         return self.config_entry.remove()
 
+    @classmethod
+    def ids(
+        cls, store: ConfigStoreListing
+    ) -> Union[Collection[str], Collection[Tuple[str, str]]]:
+        raise NotImplementedError()
 
-class ClusterEntry(ResourceEntry):
-    """Cluster resource getter/setter for the smb internal data store(s)."""
+    @classmethod
+    def from_store_by_key(
+        cls, store: ConfigStore, key: Union[str, ResourceKey]
+    ) -> Self:
+        _key = str(key)
+        return cls(_key, store[str(cls.namespace), _key])
+
+    @classmethod
+    def to_key(cls, resource: SMBResource) -> ResourceKey:
+        raise NotImplementedError()
 
-    namespace = ConfigNS.CLUSTERS
+
+class CommonResourceEntry(ResourceEntry):
+    """Common resource getter/setter for objects with a single ID value in the
+    smb internal data store(s).
+    """
 
     @classmethod
-    def from_store(cls, store: ConfigStore, cluster_id: str) -> Self:
-        return cls(cluster_id, store[str(cls.namespace), cluster_id])
+    def from_store(cls, store: ConfigStore, resource_id: str) -> Self:
+        return cls.from_store_by_key(store, resource_id)
 
     @classmethod
     def ids(cls, store: ConfigStoreListing) -> Collection[str]:
         return store.contents(str(cls.namespace))
 
+
+class ClusterEntry(CommonResourceEntry):
+    """Cluster resource getter/setter for the smb internal data store(s)."""
+
+    namespace = ConfigNS.CLUSTERS
+    _for_resource = (resources.Cluster, resources.RemovedCluster)
+
+    @classmethod
+    def to_key(cls, resource: SMBResource) -> ResourceKey:
+        assert isinstance(resource, cls._for_resource)
+        return ResourceIDKey(resource.cluster_id)
+
     def get_cluster(self) -> resources.Cluster:
         return self.get_resource_type(resources.Cluster)
 
@@ -112,73 +131,92 @@ class ShareEntry(ResourceEntry):
     """Share resource getter/setter for the smb internal data store(s)."""
 
     namespace = ConfigNS.SHARES
+    _for_resource = (resources.Share, resources.RemovedShare)
 
     @classmethod
     def from_store(
         cls, store: ConfigStore, cluster_id: str, share_id: str
     ) -> Self:
-        key = f'{cluster_id}.{share_id}'
-        return cls(key, store[str(cls.namespace), key])
+        key = ChildResourceIDKey(cluster_id, share_id)
+        return cls.from_store_by_key(store, str(key))
 
     @classmethod
     def ids(cls, store: ConfigStoreListing) -> Collection[Tuple[str, str]]:
         return [_split(k) for k in store.contents(str(cls.namespace))]
 
+    @classmethod
+    def to_key(cls, resource: SMBResource) -> ResourceKey:
+        assert isinstance(resource, cls._for_resource)
+        return ChildResourceIDKey(resource.cluster_id, resource.share_id)
+
     def get_share(self) -> resources.Share:
         return self.get_resource_type(resources.Share)
 
 
-class JoinAuthEntry(ResourceEntry):
+class JoinAuthEntry(CommonResourceEntry):
     """JoinAuth resource getter/setter for the smb internal data store(s)."""
 
     namespace = ConfigNS.JOIN_AUTHS
+    _for_resource = resources.JoinAuth
 
     @classmethod
-    def from_store(cls, store: ConfigStore, auth_id: str) -> Self:
-        return cls(auth_id, store[str(cls.namespace), auth_id])
-
-    @classmethod
-    def ids(cls, store: ConfigStoreListing) -> Collection[str]:
-        return store.contents(str(cls.namespace))
+    def to_key(cls, resource: SMBResource) -> ResourceKey:
+        assert isinstance(resource, cls._for_resource)
+        return ResourceIDKey(resource.auth_id)
 
     def get_join_auth(self) -> resources.JoinAuth:
         return self.get_resource_type(resources.JoinAuth)
 
 
-class UsersAndGroupsEntry(ResourceEntry):
+class UsersAndGroupsEntry(CommonResourceEntry):
     """UsersAndGroupsEntry resource getter/setter for the smb internal data
     store(s).
     """
 
     namespace = ConfigNS.USERS_AND_GROUPS
+    _for_resource = resources.UsersAndGroups
 
     @classmethod
-    def from_store(cls, store: ConfigStore, auth_id: str) -> Self:
-        return cls(auth_id, store[str(cls.namespace), auth_id])
-
-    @classmethod
-    def ids(cls, store: ConfigStoreListing) -> Collection[str]:
-        return store.contents(str(cls.namespace))
+    def to_key(cls, resource: SMBResource) -> ResourceKey:
+        assert isinstance(resource, cls._for_resource)
+        return ResourceIDKey(resource.users_groups_id)
 
     def get_users_and_groups(self) -> resources.UsersAndGroups:
         return self.get_resource_type(resources.UsersAndGroups)
 
 
+def _map_resource_entry(
+    resource: Union[SMBResource, Type[SMBResource]]
+) -> Type[ResourceEntry]:
+    rcls = resource if isinstance(resource, type) else type(resource)
+    _map = {
+        resources.Cluster: ClusterEntry,
+        resources.RemovedCluster: ClusterEntry,
+        resources.Share: ShareEntry,
+        resources.RemovedShare: ShareEntry,
+        resources.JoinAuth: JoinAuthEntry,
+        resources.UsersAndGroups: UsersAndGroupsEntry,
+    }
+    try:
+        return _map[rcls]
+    except KeyError:
+        raise TypeError('not a valid smb resource')
+
+
 def resource_entry(
     store: ConfigStore, resource: SMBResource
 ) -> ResourceEntry:
     """Return a bound store entry object given a resource object."""
-    if isinstance(resource, (resources.Cluster, resources.RemovedCluster)):
-        return ClusterEntry.from_store(store, resource.cluster_id)
-    elif isinstance(resource, (resources.Share, resources.RemovedShare)):
-        return ShareEntry.from_store(
-            store, resource.cluster_id, resource.share_id
-        )
-    elif isinstance(resource, resources.JoinAuth):
-        return JoinAuthEntry.from_store(store, resource.auth_id)
-    elif isinstance(resource, resources.UsersAndGroups):
-        return UsersAndGroupsEntry.from_store(store, resource.users_groups_id)
-    raise TypeError('not a valid smb resource')
+    entry_cls = _map_resource_entry(resource)
+    key = entry_cls.to_key(resource)
+    return entry_cls.from_store_by_key(store, key)
+
+
+def resource_key(resource: SMBResource) -> EntryKey:
+    """Return a store entry key for an smb resource object."""
+    entry_cls = _map_resource_entry(resource)
+    key = entry_cls.to_key(resource)
+    return str(entry_cls.namespace), str(key)
 
 
 def _split(share_key: str) -> Tuple[str, str]: