Optional,
Set,
Tuple,
+ Type,
Union,
cast,
)
)
from .internal import (
ClusterEntry,
+ CommonResourceEntry,
ExternalCephClusterEntry,
JoinAuthEntry,
ShareEntry,
TLSCredentialEntry,
UsersAndGroupsEntry,
+ map_resource_entry,
)
from .proto import (
AccessAuthorizer,
f'{txt!r} does not match a valid resource type'
)
+ def resources(self) -> List[Type[SMBResource]]:
+ return list(self._match_resources)
+
class ClusterConfigHandler:
"""The central class for ingesting and handling smb configuration change
def _search_resources(self, matcher: _Matcher) -> List[SMBResource]:
log.debug("performing search with matcher: %s", matcher)
out: List[SMBResource] = []
+ # clusters and shares (with parent-child relationship)
if resources.Cluster in matcher or resources.Share in matcher:
log.debug("searching for clusters and/or shares")
cluster_shares = self.share_ids_by_cluster()
cluster_id, share_id
).get_share()
)
- _resources = (
- (resources.JoinAuth, JoinAuthEntry),
- (resources.UsersAndGroups, UsersAndGroupsEntry),
- (resources.TLSCredential, TLSCredentialEntry),
- )
- for rtype, ecls in _resources:
- if rtype in matcher:
- log.debug("searching for %s", cast(Any, rtype).resource_type)
- out.extend(
- ecls.from_store(
- self.internal_store, rid
- ).get_resource_type(rtype)
- for rid in ecls.ids(self.internal_store)
- if (rtype, rid) in matcher
- )
+ # other common top-level resources
+ for rtype in matcher.resources():
+ if rtype in (resources.Cluster, resources.Share):
+ continue # already handled above
+ if rtype not in matcher:
+ continue
+ log.debug("searching for %s", cast(Any, rtype).resource_type)
+ ecls = map_resource_entry(rtype)
+ assert issubclass(ecls, CommonResourceEntry)
+ for rid in ecls.ids(self.internal_store):
+ if (rtype, rid) in matcher:
+ entry = ecls.from_store(self.internal_store, rid)
+ res = entry.get_resource_type(rtype)
+ out.append(cast(SMBResource, res))
log.debug("search found %d resources", len(out))
return out