import errno
+import hashlib
import json
import logging
from typing import (
return path
+def validate_cephfs_path(mgr: 'Module', fs_name: str, path: str) -> None:
+ try:
+ cephfs_path_is_dir(mgr, fs_name, path)
+ except NotADirectoryError:
+ raise NFSException(f"path {path} is not a dir", -errno.ENOTDIR)
+ except cephfs.ObjectNotFound:
+ raise NFSObjectNotFound(f"path {path} does not exist")
+ except cephfs.Error as e:
+ raise NFSException(e.args[1], -e.args[0])
+
+
+def _validate_cmount_path(mgr: 'Module', fs_name: str, cmount_path: str, path: str) -> None:
+ validate_cephfs_path(mgr, fs_name, cmount_path)
+ if cmount_path not in path:
+ raise ValueError(
+ f"Invalid cmount_path: '{cmount_path}'. The path '{path}' is not within the mount path. "
+ f"Please ensure that the cmount_path includes the specified path '{path}'. "
+ "It is allowed to be any complete path hierarchy between / and the EXPORT {path}."
+ )
+
+
class NFSRados:
def __init__(self, rados: 'Rados', namespace: str) -> None:
self.rados = rados
# do nothing; we're using the bucket owner creds.
pass
- def _create_export_user(self, export: Export) -> None:
- if isinstance(export.fsal, CephFSFSAL):
- fsal = cast(CephFSFSAL, export.fsal)
- assert fsal.fs_name
- fsal.user_id = f"nfs.{export.cluster_id}.{export.export_id}"
- fsal.cephx_key = self._create_user_key(
- export.cluster_id, fsal.user_id, export.path, fsal.fs_name
+ def _create_rgw_export_user(self, export: Export) -> None:
+ rgwfsal = cast(RGWFSAL, export.fsal)
+ if not rgwfsal.user_id:
+ assert export.path
+ ret, out, err = self.mgr.tool_exec(
+ ['radosgw-admin', 'bucket', 'stats', '--bucket', export.path]
)
- log.debug("Successfully created user %s for cephfs path %s", fsal.user_id, export.path)
-
- elif isinstance(export.fsal, RGWFSAL):
- rgwfsal = cast(RGWFSAL, export.fsal)
- if not rgwfsal.user_id:
- assert export.path
- ret, out, err = self.mgr.tool_exec(
- ['radosgw-admin', 'bucket', 'stats', '--bucket', export.path]
- )
- if ret:
- raise NFSException(f'Failed to fetch owner for bucket {export.path}')
- j = json.loads(out)
- owner = j.get('owner', '')
- rgwfsal.user_id = owner
- assert rgwfsal.user_id
- ret, out, err = self.mgr.tool_exec([
- 'radosgw-admin', 'user', 'info', '--uid', rgwfsal.user_id
- ])
if ret:
- raise NFSException(
- f'Failed to fetch key for bucket {export.path} owner {rgwfsal.user_id}'
- )
+ raise NFSException(f'Failed to fetch owner for bucket {export.path}')
j = json.loads(out)
+ owner = j.get('owner', '')
+ rgwfsal.user_id = owner
+ assert rgwfsal.user_id
+ ret, out, err = self.mgr.tool_exec([
+ 'radosgw-admin', 'user', 'info', '--uid', rgwfsal.user_id
+ ])
+ if ret:
+ raise NFSException(
+ f'Failed to fetch key for bucket {export.path} owner {rgwfsal.user_id}'
+ )
+ j = json.loads(out)
+
+ # FIXME: make this more tolerate of unexpected output?
+ rgwfsal.access_key_id = j['keys'][0]['access_key']
+ rgwfsal.secret_access_key = j['keys'][0]['secret_key']
+ log.debug("Successfully fetched user %s for RGW path %s", rgwfsal.user_id, export.path)
- # FIXME: make this more tolerate of unexpected output?
- rgwfsal.access_key_id = j['keys'][0]['access_key']
- rgwfsal.secret_access_key = j['keys'][0]['secret_key']
- log.debug("Successfully fetched user %s for RGW path %s", rgwfsal.user_id, export.path)
+ def _ensure_cephfs_export_user(self, export: Export) -> None:
+ fsal = cast(CephFSFSAL, export.fsal)
+ assert fsal.fs_name
+ assert fsal.cmount_path
+
+ fsal.user_id = f"nfs.{get_user_id(export.cluster_id, fsal.fs_name, fsal.cmount_path)}"
+ fsal.cephx_key = self._create_user_key(
+ export.cluster_id, fsal.user_id, fsal.cmount_path, fsal.fs_name
+ )
+ log.debug(f"Established user {fsal.user_id} for cephfs {fsal.fs_name}")
def _gen_export_id(self, cluster_id: str) -> int:
exports = sorted([ex.export_id for ex in self.exports[cluster_id]])
export = self._fetch_export(cluster_id, pseudo_path)
if export:
+ exports_count = 0
+ if export.fsal.name == NFS_GANESHA_SUPPORTED_FSALS[0]:
+ exports_count = self.get_export_count_with_same_fsal(export.fsal.cmount_path, # type: ignore
+ cluster_id, export.fsal.fs_name) # type: ignore
+ if exports_count == 1:
+ self._delete_export_user(export)
if pseudo_path:
self._rados(cluster_id).remove_obj(
export_obj_name(export.export_id), conf_obj_name(cluster_id))
self.exports[cluster_id].remove(export)
- self._delete_export_user(export)
+ if export.fsal.name == NFS_GANESHA_SUPPORTED_FSALS[1]:
+ self._delete_export_user(export)
if not self.exports[cluster_id]:
del self.exports[cluster_id]
log.debug("Deleted all exports for cluster %s", cluster_id)
log.info("Export user updated %s", user_id)
- def _create_user_key(
- self,
- cluster_id: str,
- entity: str,
- path: str,
- fs_name: str,
- ) -> str:
- osd_cap = 'allow rw pool={} namespace={}, allow rw tag cephfs data={}'.format(
- self.rados_pool, cluster_id, fs_name)
+ def _create_user_key(self, cluster_id: str, entity: str, path: str, fs_name: str) -> str:
+ osd_cap = f'allow rw pool={self.rados_pool} namespace={cluster_id}, allow rw tag cephfs data={fs_name}'
nfs_caps = [
'mon', 'allow r',
'osd', osd_cap,
- 'mds', 'allow rw path={}'.format(path)
+ 'mds', f'allow rw path={path}'
]
ret, out, err = self.mgr.mon_command({
'prefix': 'auth get-or-create',
- 'entity': 'client.{}'.format(entity),
+ 'entity': f'client.{entity}',
'caps': nfs_caps,
'format': 'json',
})
if ret == -errno.EINVAL and 'does not match' in err:
ret, out, err = self.mgr.mon_command({
'prefix': 'auth caps',
- 'entity': 'client.{}'.format(entity),
+ 'entity': f'client.{entity}',
'caps': nfs_caps,
'format': 'json',
})
raise NFSException(f'Failed to update caps for {entity}: {err}')
ret, out, err = self.mgr.mon_command({
'prefix': 'auth get',
- 'entity': 'client.{}'.format(entity),
+ 'entity': f'client.{entity}',
'format': 'json',
})
if err:
raise NFSException(f'Failed to fetch caps for {entity}: {err}')
json_res = json.loads(out)
- log.info("Export user created is %s", json_res[0]['entity'])
+ log.info(f"Export user created is {json_res[0]['entity']}")
return json_res[0]['key']
def create_export_from_dict(self,
if not check_fs(self.mgr, fs_name):
raise FSNotFound(fs_name)
- user_id = f"nfs.{cluster_id}.{ex_id}"
+ validate_cephfs_path(self.mgr, fs_name, path)
+ if fsal["cmount_path"] != "/":
+ _validate_cmount_path(self.mgr, fsal["cmount_path"], path) # type: ignore
+
+ user_id = f"nfs.{get_user_id(cluster_id, fs_name, fsal['cmount_path'])}"
if "user_id" in fsal and fsal["user_id"] != user_id:
raise NFSInvalidOperation(f"export FSAL user_id must be '{user_id}'")
else:
squash: str,
access_type: str,
clients: list = [],
- sectype: Optional[List[str]] = None) -> Dict[str, Any]:
+ sectype: Optional[List[str]] = None,
+ cmount_path: Optional[str] = "/") -> Dict[str, Any]:
- try:
- cephfs_path_is_dir(self.mgr, fs_name, path)
- except NotADirectoryError:
- raise NFSException(f"path {path} is not a dir", -errno.ENOTDIR)
- except cephfs.ObjectNotFound:
- raise NFSObjectNotFound(f"path {path} does not exist")
- except cephfs.Error as e:
- raise NFSException(e.args[1], -e.args[0])
+ validate_cephfs_path(self.mgr, fs_name, path)
+ if cmount_path != "/":
+ _validate_cmount_path(self.mgr, cmount_path, path) # type: ignore
pseudo_path = normalize_path(pseudo_path)
"squash": squash,
"fsal": {
"name": NFS_GANESHA_SUPPORTED_FSALS[0],
+ "cmount_path": cmount_path,
"fs_name": fs_name,
},
"clients": clients,
}
)
log.debug("creating cephfs export %s", export)
- self._create_export_user(export)
+ self._ensure_cephfs_export_user(export)
self._save_export(cluster_id, export)
result = {
"bind": export.pseudo,
}
)
log.debug("creating rgw export %s", export)
- self._create_export_user(export)
+ self._create_rgw_export_user(export)
self._save_export(cluster_id, export)
result = {
"bind": export.pseudo,
log.debug("export %s pseudo %s -> %s",
old_export.export_id, old_export.pseudo, new_export_dict['pseudo'])
+ fsal_dict = new_export_dict.get('fsal')
+ if fsal_dict and fsal_dict['name'] == NFS_GANESHA_SUPPORTED_FSALS[0]:
+ # Ensure cmount_path is present in CephFS FSAL block
+ if not fsal_dict.get('cmount_path'):
+ if old_export:
+ new_export_dict['fsal']['cmount_path'] = old_export.fsal.cmount_path
+ else:
+ new_export_dict['fsal']['cmount_path'] = '/'
+
new_export = self.create_export_from_dict(
cluster_id,
new_export_dict.get('export_id', self._gen_export_id(cluster_id)),
)
if not old_export:
- self._create_export_user(new_export)
+ if new_export.fsal.name == NFS_GANESHA_SUPPORTED_FSALS[1]: # only for RGW
+ self._create_rgw_export_user(new_export)
self._save_export(cluster_id, new_export)
return {"pseudo": new_export.pseudo, "state": "added"}
if old_export.fsal.name == NFS_GANESHA_SUPPORTED_FSALS[0]:
old_fsal = cast(CephFSFSAL, old_export.fsal)
new_fsal = cast(CephFSFSAL, new_export.fsal)
- if old_fsal.user_id != new_fsal.user_id:
- self._delete_export_user(old_export)
- self._create_export_user(new_export)
- elif (
- old_export.path != new_export.path
- or old_fsal.fs_name != new_fsal.fs_name
- ):
- self._update_user_id(
- cluster_id,
- new_export.path,
- cast(str, new_fsal.fs_name),
- cast(str, new_fsal.user_id)
- )
- new_fsal.cephx_key = old_fsal.cephx_key
- else:
- expected_mds_caps = 'allow rw path={}'.format(new_export.path)
- entity = new_fsal.user_id
- ret, out, err = self.mgr.mon_command({
- 'prefix': 'auth get',
- 'entity': 'client.{}'.format(entity),
- 'format': 'json',
- })
- if ret:
- raise NFSException(f'Failed to fetch caps for {entity}: {err}')
- actual_mds_caps = json.loads(out)[0]['caps'].get('mds')
- if actual_mds_caps != expected_mds_caps:
- self._update_user_id(
- cluster_id,
- new_export.path,
- cast(str, new_fsal.fs_name),
- cast(str, new_fsal.user_id)
- )
- elif old_export.pseudo == new_export.pseudo:
- need_nfs_service_restart = False
- new_fsal.cephx_key = old_fsal.cephx_key
+ self._ensure_cephfs_export_user(new_export)
+ need_nfs_service_restart = not (old_fsal.user_id == new_fsal.user_id
+ and old_fsal.fs_name == new_fsal.fs_name
+ and old_export.path == new_export.path
+ and old_export.pseudo == new_export.pseudo)
if old_export.fsal.name == NFS_GANESHA_SUPPORTED_FSALS[1]:
old_rgw_fsal = cast(RGWFSAL, old_export.fsal)
new_rgw_fsal = cast(RGWFSAL, new_export.fsal)
if old_rgw_fsal.user_id != new_rgw_fsal.user_id:
self._delete_export_user(old_export)
- self._create_export_user(new_export)
+ self._create_rgw_export_user(new_export)
elif old_rgw_fsal.access_key_id != new_rgw_fsal.access_key_id:
raise NFSInvalidOperation('access_key_id change is not allowed')
elif old_rgw_fsal.secret_access_key != new_rgw_fsal.secret_access_key:
def _rados(self, cluster_id: str) -> NFSRados:
"""Return a new NFSRados object for the given cluster id."""
return NFSRados(self.mgr.rados, cluster_id)
+
+ def get_export_count_with_same_fsal(self, cmount_path: str, cluster_id: str, fs_name: str) -> int:
+ exports = self.list_exports(cluster_id, detailed=True)
+ exports_count = 0
+ for export in exports:
+ if export['fsal']['name'] == 'CEPH' and export['fsal']['cmount_path'] == cmount_path and export['fsal']['fs_name'] == fs_name:
+ exports_count += 1
+ return exports_count
+
+
+def get_user_id(cluster_id: str, fs_name: str, cmount_path: str) -> str:
+ """
+ Generates a unique ID based on the input parameters using SHA-1.
+
+ :param cluster_id: String representing the cluster ID.
+ :param fs_name: String representing the file system name.
+ :param cmount_path: String representing the complicated mount path.
+ :return: A unique ID in the format 'cluster_id.fs_name.<hash>'.
+ """
+ input_string = f"{cluster_id}:{fs_name}:{cmount_path}"
+ hash_hex = hashlib.sha1(input_string.encode('utf-8')).hexdigest()
+ unique_id = f"{cluster_id}.{fs_name}.{hash_hex[:8]}" # Use the first 8 characters of the hash
+
+ return unique_id