password_filter_out=password_filter_out,
).squash(cluster)
- @SMBCLICommand('cluster rm', perm='rw')
+ @overload
def cluster_rm(
self,
cluster_id: str,
+ wildcard: Literal[False] = False,
+ recursive: bool = False,
password_filter: PasswordFilter = PasswordFilter.NONE,
) -> results.Result:
+ ...
+
+ @overload
+ def cluster_rm(
+ self,
+ cluster_id: str,
+ wildcard: Literal[True],
+ recursive: bool = False,
+ password_filter: PasswordFilter = PasswordFilter.NONE,
+ ) -> results.ResultGroup:
+ ...
+
+ @SMBCLICommand('cluster rm', perm='rw')
+ def cluster_rm(
+ self,
+ cluster_id: str,
+ wildcard: bool = False,
+ recursive: bool = False,
+ password_filter: PasswordFilter = PasswordFilter.NONE,
+ ) -> Union[results.Result, results.ResultGroup]:
"""Remove an smb cluster"""
+ if recursive or wildcard:
+ return self._cluster_multi_rm(
+ cluster_id,
+ password_filter=password_filter,
+ recursive=recursive,
+ wildcard=wildcard,
+ )
cluster = resources.RemovedCluster(cluster_id=cluster_id)
return self._apply_res(
[cluster], password_filter_out=password_filter
).one()
+ def _cluster_multi_rm(
+ self,
+ cluster_id: str,
+ password_filter: PasswordFilter = PasswordFilter.NONE,
+ recursive: bool = False,
+ wildcard: bool = False,
+ ) -> results.ResultGroup:
+ """Remove >=1 cluster and optionally its shares."""
+ if wildcard:
+ # a wildcard cluster search will give 0-N matching clusters
+ matches = self._handler.matching_resources(
+ [f'ceph.smb.cluster.{cluster_id}'],
+ wildcard=True,
+ )
+ clusters = [
+ r for r in matches if isinstance(r, resources.Cluster)
+ ]
+ shares = []
+ else:
+ # a non-wildcard cluster - ids are expected to be exact
+ # belonging to the cluster
+ mixed = self._handler.matching_resources(
+ [
+ f'ceph.smb.cluster.{cluster_id}',
+ f'ceph.smb.share.{cluster_id}',
+ ],
+ wildcard=False,
+ )
+ clusters = [r for r in mixed if isinstance(r, resources.Cluster)]
+ shares = [r for r in mixed if isinstance(r, resources.Share)]
+ if not clusters:
+ raise cli.NoMatchingValue(f'no clusters matching "{cluster_id}"')
+ # get shares belonging to the clusters matched during wildcarding
+ if recursive and wildcard:
+ for cluster in clusters:
+ shares.extend(
+ s
+ for s in self._handler.matching_resources(
+ [f'ceph.smb.share.{cluster.cluster_id}'],
+ wildcard=False,
+ )
+ if isinstance(s, resources.Share)
+ )
+ # assemble the Removed{Share,Cluster} resources to submit
+ rm_res: list[resources.SMBResource] = [
+ resources.RemovedShare(
+ cluster_id=s.cluster_id, share_id=s.share_id
+ )
+ for s in shares
+ ]
+ rm_res.extend(
+ resources.RemovedCluster(cluster_id=c.cluster_id)
+ for c in clusters
+ )
+ return self._apply_res(
+ rm_res,
+ password_filter_out=password_filter,
+ )
+
@SMBCLICommand('cluster update cephfs qos', perm='rw')
def cluster_update_qos(
self,