)
import logging
+import operator
import time
from ceph.deployment.service_spec import SMBSpec
from .enums import (
AuthMode,
CephFSStorageProvider,
+ ConfigNS,
Intent,
JoinSourceType,
LoginAccess,
raise ErrorResult(
share, msg="path is not a valid directory in volume"
)
+ name_used_by = _share_name_in_use(staging, share)
+ if name_used_by:
+ raise ErrorResult(
+ share,
+ msg="share name already in use",
+ status={"conflicting_share_id": name_used_by},
+ )
+
+
+def _share_name_in_use(
+ staging: _Staging, share: resources.Share
+) -> Optional[str]:
+ """Returns the share_id value if the share's name is already in
+ use by a different share in the cluster. Returns None if no other
+ shares are using the name.
+ """
+ share_ids = (share.cluster_id, share.share_id)
+ share_ns = str(ConfigNS.SHARES)
+ # look for any duplicate names in the staging area.
+ # these items are already in memory
+ for ekey, res in staging.incoming.items():
+ if ekey[0] != share_ns:
+ continue # not a share
+ assert isinstance(res, resources.Share)
+ if (res.cluster_id, res.share_id) == share_ids:
+ continue # this share
+ if (res.cluster_id, res.name) == (share.cluster_id, share.name):
+ return res.share_id
+ # look for any duplicate names in the underyling store
+ found = config_store.find_in_store(
+ staging.destination_store,
+ share_ns,
+ {'cluster_id': share.cluster_id, 'name': share.name},
+ )
+ # remove any shares that are deleted in staging
+ found_curr = [
+ entry for entry in found if entry.full_key not in staging.deleted
+ ]
+ # remove self-share from list
+ id_pair = operator.itemgetter('cluster_id', 'share_id')
+ found_curr = [
+ entry for entry in found_curr if id_pair(entry.get()) != share_ids
+ ]
+ if not found_curr:
+ return None
+ if len(found_curr) != 1:
+ # this should not normally happen
+ log.warning(
+ 'multiple shares with one name in cluster: %s',
+ ' '.join(s.get()['share_id'] for s in found_curr),
+ )
+ return found_curr[0].get()['share_id']
def _check_join_auths(