"""Support for working with the internal data store and the strucutured
resources that the internal store holds.
"""
-from typing import Collection, Tuple, Type, TypeVar
+from typing import Collection, Tuple, Type, TypeVar, Union
from . import resources
from .enums import ConfigNS, State
ConfigStore,
ConfigStoreListing,
EntryKey,
+ ResourceKey,
Self,
Simplifiable,
)
T = TypeVar('T')
-def cluster_key(cluster_id: str) -> EntryKey:
- """Return store entry key for a cluster entry."""
- return str(ConfigNS.CLUSTERS), cluster_id
+class ResourceIDKey:
+ """A ResourceKey for resources with only one ID."""
+ def __init__(self, resource_id: str) -> None:
+ self.resource_id = resource_id
-def share_key(cluster_id: str, share_id: str) -> EntryKey:
- """Return store entry key for a share entry."""
- return str(ConfigNS.SHARES), f'{cluster_id}.{share_id}'
+ def __str__(self) -> str:
+ return self.resource_id
-def join_auth_key(auth_id: str) -> EntryKey:
- """Return store entry key for a join auth entry."""
- return str(ConfigNS.JOIN_AUTHS), auth_id
+class ChildResourceIDKey:
+ """A ResourceKey for resources with both a parent and child ID."""
+ def __init__(self, parent_id: str, resource_id: str) -> None:
+ self.parent_id = parent_id
+ self.resource_id = resource_id
+ self._key = f'{parent_id}.{resource_id}'
-def users_and_groups_key(users_groups_id: str) -> EntryKey:
- """Return store entry key for a users-and-groups entry."""
- return str(ConfigNS.USERS_AND_GROUPS), users_groups_id
-
-
-def resource_key(resource: SMBResource) -> EntryKey:
- """Return a store entry key for an smb resource object."""
- if isinstance(resource, (resources.Cluster, resources.RemovedCluster)):
- return cluster_key(resource.cluster_id)
- elif isinstance(resource, (resources.Share, resources.RemovedShare)):
- return share_key(resource.cluster_id, resource.share_id)
- elif isinstance(resource, resources.JoinAuth):
- return join_auth_key(resource.auth_id)
- elif isinstance(resource, resources.UsersAndGroups):
- return users_and_groups_key(resource.users_groups_id)
- raise TypeError('not a valid smb resource')
+ def __str__(self) -> str:
+ return self._key
class ResourceEntry:
def get_resource_type(self, cls: Type[T]) -> T:
obj = self.get()
- assert isinstance(obj, cls)
+ assert isinstance(obj, cls), f"{obj!r} is not a {cls}"
return obj
def set(self, resource: Simplifiable) -> None:
def remove(self) -> bool:
return self.config_entry.remove()
+ @classmethod
+ def ids(
+ cls, store: ConfigStoreListing
+ ) -> Union[Collection[str], Collection[Tuple[str, str]]]:
+ raise NotImplementedError()
-class ClusterEntry(ResourceEntry):
- """Cluster resource getter/setter for the smb internal data store(s)."""
+ @classmethod
+ def from_store_by_key(
+ cls, store: ConfigStore, key: Union[str, ResourceKey]
+ ) -> Self:
+ _key = str(key)
+ return cls(_key, store[str(cls.namespace), _key])
+
+ @classmethod
+ def to_key(cls, resource: SMBResource) -> ResourceKey:
+ raise NotImplementedError()
- namespace = ConfigNS.CLUSTERS
+
+class CommonResourceEntry(ResourceEntry):
+ """Common resource getter/setter for objects with a single ID value in the
+ smb internal data store(s).
+ """
@classmethod
- def from_store(cls, store: ConfigStore, cluster_id: str) -> Self:
- return cls(cluster_id, store[str(cls.namespace), cluster_id])
+ def from_store(cls, store: ConfigStore, resource_id: str) -> Self:
+ return cls.from_store_by_key(store, resource_id)
@classmethod
def ids(cls, store: ConfigStoreListing) -> Collection[str]:
return store.contents(str(cls.namespace))
+
+class ClusterEntry(CommonResourceEntry):
+ """Cluster resource getter/setter for the smb internal data store(s)."""
+
+ namespace = ConfigNS.CLUSTERS
+ _for_resource = (resources.Cluster, resources.RemovedCluster)
+
+ @classmethod
+ def to_key(cls, resource: SMBResource) -> ResourceKey:
+ assert isinstance(resource, cls._for_resource)
+ return ResourceIDKey(resource.cluster_id)
+
def get_cluster(self) -> resources.Cluster:
return self.get_resource_type(resources.Cluster)
"""Share resource getter/setter for the smb internal data store(s)."""
namespace = ConfigNS.SHARES
+ _for_resource = (resources.Share, resources.RemovedShare)
@classmethod
def from_store(
cls, store: ConfigStore, cluster_id: str, share_id: str
) -> Self:
- key = f'{cluster_id}.{share_id}'
- return cls(key, store[str(cls.namespace), key])
+ key = ChildResourceIDKey(cluster_id, share_id)
+ return cls.from_store_by_key(store, str(key))
@classmethod
def ids(cls, store: ConfigStoreListing) -> Collection[Tuple[str, str]]:
return [_split(k) for k in store.contents(str(cls.namespace))]
+ @classmethod
+ def to_key(cls, resource: SMBResource) -> ResourceKey:
+ assert isinstance(resource, cls._for_resource)
+ return ChildResourceIDKey(resource.cluster_id, resource.share_id)
+
def get_share(self) -> resources.Share:
return self.get_resource_type(resources.Share)
-class JoinAuthEntry(ResourceEntry):
+class JoinAuthEntry(CommonResourceEntry):
"""JoinAuth resource getter/setter for the smb internal data store(s)."""
namespace = ConfigNS.JOIN_AUTHS
+ _for_resource = resources.JoinAuth
@classmethod
- def from_store(cls, store: ConfigStore, auth_id: str) -> Self:
- return cls(auth_id, store[str(cls.namespace), auth_id])
-
- @classmethod
- def ids(cls, store: ConfigStoreListing) -> Collection[str]:
- return store.contents(str(cls.namespace))
+ def to_key(cls, resource: SMBResource) -> ResourceKey:
+ assert isinstance(resource, cls._for_resource)
+ return ResourceIDKey(resource.auth_id)
def get_join_auth(self) -> resources.JoinAuth:
return self.get_resource_type(resources.JoinAuth)
-class UsersAndGroupsEntry(ResourceEntry):
+class UsersAndGroupsEntry(CommonResourceEntry):
"""UsersAndGroupsEntry resource getter/setter for the smb internal data
store(s).
"""
namespace = ConfigNS.USERS_AND_GROUPS
+ _for_resource = resources.UsersAndGroups
@classmethod
- def from_store(cls, store: ConfigStore, auth_id: str) -> Self:
- return cls(auth_id, store[str(cls.namespace), auth_id])
-
- @classmethod
- def ids(cls, store: ConfigStoreListing) -> Collection[str]:
- return store.contents(str(cls.namespace))
+ def to_key(cls, resource: SMBResource) -> ResourceKey:
+ assert isinstance(resource, cls._for_resource)
+ return ResourceIDKey(resource.users_groups_id)
def get_users_and_groups(self) -> resources.UsersAndGroups:
return self.get_resource_type(resources.UsersAndGroups)
+def _map_resource_entry(
+ resource: Union[SMBResource, Type[SMBResource]]
+) -> Type[ResourceEntry]:
+ rcls = resource if isinstance(resource, type) else type(resource)
+ _map = {
+ resources.Cluster: ClusterEntry,
+ resources.RemovedCluster: ClusterEntry,
+ resources.Share: ShareEntry,
+ resources.RemovedShare: ShareEntry,
+ resources.JoinAuth: JoinAuthEntry,
+ resources.UsersAndGroups: UsersAndGroupsEntry,
+ }
+ try:
+ return _map[rcls]
+ except KeyError:
+ raise TypeError('not a valid smb resource')
+
+
def resource_entry(
store: ConfigStore, resource: SMBResource
) -> ResourceEntry:
"""Return a bound store entry object given a resource object."""
- if isinstance(resource, (resources.Cluster, resources.RemovedCluster)):
- return ClusterEntry.from_store(store, resource.cluster_id)
- elif isinstance(resource, (resources.Share, resources.RemovedShare)):
- return ShareEntry.from_store(
- store, resource.cluster_id, resource.share_id
- )
- elif isinstance(resource, resources.JoinAuth):
- return JoinAuthEntry.from_store(store, resource.auth_id)
- elif isinstance(resource, resources.UsersAndGroups):
- return UsersAndGroupsEntry.from_store(store, resource.users_groups_id)
- raise TypeError('not a valid smb resource')
+ entry_cls = _map_resource_entry(resource)
+ key = entry_cls.to_key(resource)
+ return entry_cls.from_store_by_key(store, key)
+
+
+def resource_key(resource: SMBResource) -> EntryKey:
+ """Return a store entry key for an smb resource object."""
+ entry_cls = _map_resource_entry(resource)
+ key = entry_cls.to_key(resource)
+ return str(entry_cls.namespace), str(key)
def _split(share_key: str) -> Tuple[str, str]: