From 19b09573d7276dbbe80c31bf34a353684045573d Mon Sep 17 00:00:00 2001 From: Venky Shankar Date: Fri, 3 Oct 2025 13:20:39 +0530 Subject: [PATCH] pybind/nfs: allow passing in ceph section during export apply (json only) Signed-off-by: Venky Shankar --- src/pybind/mgr/nfs/export.py | 267 ++++++++++++++++++++--------- src/pybind/mgr/nfs/ganesha_conf.py | 38 +++- 2 files changed, 222 insertions(+), 83 deletions(-) diff --git a/src/pybind/mgr/nfs/export.py b/src/pybind/mgr/nfs/export.py index aff6779bb16..1f0d4b26132 100644 --- a/src/pybind/mgr/nfs/export.py +++ b/src/pybind/mgr/nfs/export.py @@ -28,6 +28,7 @@ from .ganesha_conf import ( GaneshaConfParser, RGWFSAL, RawBlock, + CephBlock, format_block) from .exception import NFSException, NFSInvalidOperation, FSNotFound, NFSObjectNotFound from .utils import ( @@ -216,24 +217,75 @@ class AppliedExportResults: " to be created/updated" return self.status +class GaneshaExport: + # currently, EXPORT and CEPH block. + def __init__(self, + export: Export, + ceph_block: Optional[CephBlock] = None) -> None: + self.export = export + self.ceph_block = ceph_block + + # frequently uesd properties so that much of the code that now + # has moved to using this class can still continue to acess via + # export.{path,pseudo,...}. + @property + def path(self): + return self.export.path + + @property + def pseudo(self): + return self.export.pseudo + + @property + def export_id(self): + return self.export.export_id + + @property + def cluster_id(self): + return self.export.cluster_id + + @property + def fsal(self): + return self.export.fsal + + def to_dict(self, full=False) -> Dict[str, Any]: + export_dict = self.export.to_dict() + if not full or not self.ceph_block: + return export_dict + ge_dict = { + 'export': export_dict, + 'ceph': self.ceph_block.to_dict() + } + return ge_dict + + def to_export_block(self): + block_str = format_block(self.export.to_export_block()) + if self.ceph_block: + block_str += format_block(self.ceph_block.to_ceph_block()) + return block_str + + def __eq__(self, other: Any) -> bool: + if not isinstance(other, GaneshaExport): + return False + return self.to_dict(full=true) == other.to_dict(full=true) class ExportMgr: def __init__( self, mgr: 'Module', - export_ls: Optional[Dict[str, List[Export]]] = None + export_ls: Optional[Dict[str, List[GaneshaExport]]] = None ) -> None: self.mgr = mgr self.rados_pool = POOL_NAME - self._exports: Optional[Dict[str, List[Export]]] = export_ls + self._exports: Optional[Dict[str, List[GaneshaExport]]] = export_ls @property - def exports(self) -> Dict[str, List[Export]]: + def exports(self) -> Dict[str, List[GaneshaExport]]: if self._exports is None: self._exports = {} log.info("Begin export parsing") for cluster_id in known_cluster_ids(self.mgr): - self.export_conf_objs = [] # type: List[Export] + self.export_conf_objs = [] # type: List[GaneshaExport] self._read_raw_config(cluster_id) self._exports[cluster_id] = self.export_conf_objs log.info("Exports parsed successfully %s", self.exports.items()) @@ -243,7 +295,7 @@ class ExportMgr: self, cluster_id: str, pseudo_path: str - ) -> Optional[Export]: + ) -> Optional[GaneshaExport]: try: for ex in self.exports[cluster_id]: if ex.pseudo == pseudo_path: @@ -257,7 +309,7 @@ class ExportMgr: self, cluster_id: str, export_id: int - ) -> Optional[Export]: + ) -> Optional[GaneshaExport]: try: for ex in self.exports[cluster_id]: if ex.export_id == export_id: @@ -267,27 +319,27 @@ class ExportMgr: log.info(f'no exports for cluster {cluster_id}') return None - def _delete_export_user(self, export: Export) -> None: - if isinstance(export.fsal, CephFSFSAL): - assert export.fsal.user_id + def _delete_export_user(self, ganesha_export: GaneshaExport) -> None: + if isinstance(ganesha_export.fsal, CephFSFSAL): + assert ganesha_export.fsal.user_id self.mgr.check_mon_command({ 'prefix': 'auth rm', - 'entity': 'client.{}'.format(export.fsal.user_id), + 'entity': 'client.{}'.format(ganesha_export.fsal.user_id), }) log.info("Deleted export user %s", export.fsal.user_id) - elif isinstance(export.fsal, RGWFSAL): + elif isinstance(ganesha_export.fsal, RGWFSAL): # do nothing; we're using the bucket owner creds. pass - def _create_rgw_export_user(self, export: Export) -> None: - rgwfsal = cast(RGWFSAL, export.fsal) + def _create_rgw_export_user(self, ganesha_export: GaneshaExport) -> None: + rgwfsal = cast(RGWFSAL, ganesha_export.fsal) if not rgwfsal.user_id: - assert export.path + assert ganesha_export.path ret, out, err = self.mgr.tool_exec( - ['radosgw-admin', 'bucket', 'stats', '--bucket', export.path] + ['radosgw-admin', 'bucket', 'stats', '--bucket', ganesha_export.path] ) if ret: - raise NFSException(f'Failed to fetch owner for bucket {export.path}') + raise NFSException(f'Failed to fetch owner for bucket {ganesha_export.path}') j = json.loads(out) owner = j.get('owner', '') rgwfsal.user_id = owner @@ -297,23 +349,23 @@ class ExportMgr: ]) if ret: raise NFSException( - f'Failed to fetch key for bucket {export.path} owner {rgwfsal.user_id}' + f'Failed to fetch key for bucket {ganesha_export.path} owner {rgwfsal.user_id}' ) j = json.loads(out) # FIXME: make this more tolerate of unexpected output? rgwfsal.access_key_id = j['keys'][0]['access_key'] rgwfsal.secret_access_key = j['keys'][0]['secret_key'] - log.debug("Successfully fetched user %s for RGW path %s", rgwfsal.user_id, export.path) + log.debug("Successfully fetched user %s for RGW path %s", rgwfsal.user_id, ganesha_export.path) - def _ensure_cephfs_export_user(self, export: Export) -> None: - fsal = cast(CephFSFSAL, export.fsal) + def _ensure_cephfs_export_user(self, ganesha_export: GaneshaExport) -> None: + fsal = cast(CephFSFSAL, ganesha_export.fsal) assert fsal.fs_name assert fsal.cmount_path - fsal.user_id = f"nfs.{get_user_id(export.cluster_id, fsal.fs_name, fsal.cmount_path)}" + fsal.user_id = f"nfs.{get_user_id(ganesha_export.cluster_id, fsal.fs_name, fsal.cmount_path)}" fsal.cephx_key = self._create_user_key( - export.cluster_id, fsal.user_id, fsal.cmount_path, fsal.fs_name + ganesha_export.cluster_id, fsal.user_id, fsal.cmount_path, fsal.fs_name ) log.debug(f"Established user {fsal.user_id} for cephfs {fsal.fs_name}") @@ -327,6 +379,9 @@ class ExportMgr: break return nid + def _has_ceph_block(raw_config_parsed: List) -> bool: + return len(raw_config_parsed) > 1 + def _read_raw_config(self, rados_namespace: str) -> None: with self.mgr.rados.open_ioctx(self.rados_pool) as ioctx: ioctx.set_namespace(rados_namespace) @@ -338,43 +393,58 @@ class ExportMgr: log.debug("read export configuration from rados " "object %s/%s/%s", self.rados_pool, rados_namespace, obj.key) - self.export_conf_objs.append(Export.from_export_block( - GaneshaConfParser(raw_config).parse()[0], rados_namespace)) - - def _save_export(self, cluster_id: str, export: Export) -> None: - self.exports[cluster_id].append(export) + log.debug(f'raw_config: {raw_config}') + raw_config_parsed = GaneshaConfParser(raw_config).parse() + log.debug(f'raw_config_parsed: {raw_config_parsed}') + export_block = raw_config_parsed[0] + # do we have a ceph block? + if _has_ceph_block(raw_config_parsed): + ceph_block = raw_config_parsed[1] + self.export_conf_objs.append( + GaneshaExport(Export.from_export_block(export_block, rados_namespace), + CephBlock.from_ceph_block(ceph_block))) + else: + self.export_conf_objs.append( + GaneshaExport(Export.from_export_block(export_block, rados_namespace))) + + def _save_export(self, cluster_id: str, ganesha_export: GaneshaExport) -> None: + log.debug('in _save_export') + self.exports[cluster_id].append(ganesha_export) + block_str = ganesha_export.to_export_block() + log.debug(f'_save_export block_str: {block_str}') self._rados(cluster_id).write_obj( - format_block(export.to_export_block()), - export_obj_name(export.export_id), - conf_obj_name(export.cluster_id) + block_str, + export_obj_name(ganesha_export.export_id), + conf_obj_name(ganesha_export.cluster_id) ) def _delete_export( self, cluster_id: str, pseudo_path: Optional[str], - export_obj: Optional[Export] = None + ganesha_export_obj: Optional[GaneshaExport] = None ) -> None: try: - if export_obj: - export: Optional[Export] = export_obj + if ganesha_export_obj: + ganesha_export: Optional[GaneshaExport] = ganesha_export_obj else: assert pseudo_path - export = self._fetch_export(cluster_id, pseudo_path) + ganesha_export = self._fetch_export(cluster_id, pseudo_path) - if export: + if ganesha_export: exports_count = 0 - if export.fsal.name == NFS_GANESHA_SUPPORTED_FSALS[0]: - exports_count = self.get_export_count_with_same_fsal(export.fsal.cmount_path, # type: ignore - cluster_id, export.fsal.fs_name) # type: ignore + if ganesha_export.fsal.name == NFS_GANESHA_SUPPORTED_FSALS[0]: + exports_count = self.get_export_count_with_same_fsal( + ganesha_export.fsal.cmount_path, # type: ignore + cluster_id, ganesha_export.fsal.fs_name) # type: ignore if exports_count == 1: - self._delete_export_user(export) + self._delete_export_user(ganesha_export) if pseudo_path: self._rados(cluster_id).remove_obj( - export_obj_name(export.export_id), conf_obj_name(cluster_id)) - self.exports[cluster_id].remove(export) - if export.fsal.name == NFS_GANESHA_SUPPORTED_FSALS[1]: - self._delete_export_user(export) + export_obj_name(ganesha_export.export_id), conf_obj_name(cluster_id)) + self.exports[cluster_id].remove(ganesha_export) + if ganesha_export.fsal.name == NFS_GANESHA_SUPPORTED_FSALS[1]: + self._delete_export_user(ganehsa_export) if not self.exports[cluster_id]: del self.exports[cluster_id] log.debug("Deleted all exports for cluster %s", cluster_id) @@ -388,26 +458,36 @@ class ExportMgr: try: with self.mgr.rados.open_ioctx(self.rados_pool) as ioctx: ioctx.set_namespace(cluster_id) - export = Export.from_export_block( - GaneshaConfParser( - ioctx.read(export_obj_name(ex_id)).decode("utf-8") - ).parse()[0], - cluster_id - ) + raw_config = ioctx.read(export_obj_name(ex_id)).decode("utf-8") + log.debug(f'raw_config: {raw_config}') + raw_config_parsed = GaneshaConfParser(raw_config).parse() + log.debug(f'raw_config_parsed: {raw_config_parsed}') + export_block = raw_config_parsed[0] + # do we have a ceph block? + if _has_ceph_block(raw_config_parsed): + ceph_block = raw_config_parsed[1] + export = GaneshaExport(Export.from_export_block(export_block, cluster_id), + CephBlock.from_ceph_block(ceph_block)) + else: + export = GaneshaExport(Export.from_export_block(export_block, cluster_id)) + log.debug(f'export: {export}') return export except ObjectNotFound: log.exception("Export ID: %s not found", ex_id) return None - def _update_export(self, cluster_id: str, export: Export, + def _update_export(self, cluster_id: str, ganesha_export: GaneshaExport, need_nfs_service_restart: bool) -> None: - self.exports[cluster_id].append(export) + log.debug(f'in _update_export: service restart: {need_nfs_service_restart}') + self.exports[cluster_id].append(ganesha_export) + block_str = ganesha_export.to_export_block() + log.debug(f'_update_export block_str: {block_str}') self._rados(cluster_id).update_obj( - format_block(export.to_export_block()), - export_obj_name(export.export_id), conf_obj_name(export.cluster_id), + block_str, + export_obj_name(ganesha_export.export_id), conf_obj_name(ganesha_export.cluster_id), should_notify=not need_nfs_service_restart) if need_nfs_service_restart: - restart_nfs_service(self.mgr, export.cluster_id) + restart_nfs_service(self.mgr, ganesha_export.cluster_id) def _validate_cluster_id(self, cluster_id: str) -> None: """Raise an exception if cluster_id is not valid.""" @@ -461,22 +541,22 @@ class ExportMgr: def delete_all_exports(self, cluster_id: str) -> None: try: - export_list = list(self.exports[cluster_id]) + ganesha_export_list = list(self.exports[cluster_id]) except KeyError: log.info("No exports to delete") return - for export in export_list: + for ganesha_export in ganesha_export_list: try: self._delete_export(cluster_id=cluster_id, pseudo_path=None, - export_obj=export) + ganesha_export_obj=ganesha_export) except Exception as e: - raise NFSException(f"Failed to delete export {export.export_id}: {e}") + raise NFSException(f"Failed to delete export {ganesha_export.export_id}: {e}") log.info("All exports successfully deleted for cluster id: %s", cluster_id) def list_all_exports(self) -> List[Dict[str, Any]]: r = [] for cluster_id, ls in self.exports.items(): - r.extend([e.to_dict() for e in ls]) + r.extend([ge.to_dict() for ge in ls]) return r def list_exports(self, @@ -485,10 +565,10 @@ class ExportMgr: self._validate_cluster_id(cluster_id) try: if detailed: - result_d = [export.to_dict() for export in self.exports[cluster_id]] + result_d = [ganesha_export.to_dict() for ganesha_export in self.exports[cluster_id]] return result_d else: - result_ps = [export.pseudo for export in self.exports[cluster_id]] + result_ps = [ganesha_export.pseudo for ganesha_export in self.exports[cluster_id]] return result_ps except KeyError: @@ -499,9 +579,9 @@ class ExportMgr: raise ErrorResponse.wrap(e) def _get_export_dict(self, cluster_id: str, pseudo_path: str) -> Optional[Dict[str, Any]]: - export = self._fetch_export(cluster_id, pseudo_path) - if export: - return export.to_dict() + ganesha_export = self._fetch_export(cluster_id, pseudo_path) + if ganesha_export: + return ganesha_export.to_dict(full=True) log.warning(f"No {pseudo_path} export to show for {cluster_id}") return None @@ -524,16 +604,16 @@ class ExportMgr: cluster_id: str, export_id: int ) -> Optional[Dict[str, Any]]: - export = self._fetch_export_id(cluster_id, export_id) - return export.to_dict() if export else None + ganesha_export = self._fetch_export_id(cluster_id, export_id) + return ganesha_export.to_dict() if ganesha_export else None def get_export_by_pseudo( self, cluster_id: str, pseudo_path: str ) -> Optional[Dict[str, Any]]: - export = self._fetch_export(cluster_id, pseudo_path) - return export.to_dict() if export else None + ganesha_export = self._fetch_export(cluster_id, pseudo_path) + return ganesha_export.to_dict() if ganesha_export else None # This method is used by the dashboard module (../dashboard/controllers/nfs.py) # Do not change interface without updating the Dashboard code @@ -562,6 +642,9 @@ class ExportMgr: j = json.loads(export_config) except ValueError: # okay, not JSON. is it an EXPORT block? + # including CEPH block when passing an EXPORT block + # is not currently supported (use export json for that). + # TODO: add this support. try: blocks = GaneshaConfParser(export_config).parse() exports = [ @@ -578,9 +661,21 @@ class ExportMgr: def _change_export(self, cluster_id: str, export: Dict, earmark_resolver: Optional[CephFSEarmarkResolver] = None) -> Dict[str, Any]: + # if the export json has a ceph section (key), extract it from the export + # json to preserver backward compatability. + ceph_dict = {} + if "ceph" in export.keys(): + ceph_dict = export.pop("ceph") + if not "export" in export.keys(): + raise Exception('\'export\' key missing in export json') + export = export.pop("export") + msg = f'export_dict: {export}' + log.exception(msg) + msg = f'ceph_dict: {ceph_dict}' + log.exception(msg) try: - return self._apply_export(cluster_id, export, earmark_resolver) - except NotImplementedError: + return self._apply_export(cluster_id, export, earmark_resolver, ceph_dict) + except NotImplementedError as e: # in theory, the NotImplementedError here may be raised by a hook back to # an orchestration module. If the orchestration module supports it the NFS # servers may be restarted. If not supported the expectation is that an @@ -681,8 +776,7 @@ class ExportMgr: cluster_id: str, ex_id: int, ex_dict: Dict[str, Any], - earmark_resolver: Optional[CephFSEarmarkResolver] = None - ) -> Export: + earmark_resolver: Optional[CephFSEarmarkResolver] = None) -> Export: pseudo_path = ex_dict.get("pseudo") if not pseudo_path: raise NFSInvalidOperation("export must specify pseudo path") @@ -831,8 +925,8 @@ class ExportMgr: self, cluster_id: str, new_export_dict: Dict, - earmark_resolver: Optional[CephFSEarmarkResolver] = None - ) -> Dict[str, str]: + earmark_resolver: Optional[CephFSEarmarkResolver] = None, + ceph_dict: Optional[Dict] = {}) -> Dict[str, str]: for k in ['path', 'pseudo']: if k not in new_export_dict: raise NFSInvalidOperation(f'Export missing required field {k}') @@ -874,28 +968,36 @@ class ExportMgr: new_export_dict, earmark_resolver ) + ceph_block = None + log.debug(f'ceph_dict: {ceph_dict}') + if ceph_dict: + ceph_block = CephBlock.from_dict(ceph_dict) + + # use @ganesha_export in place of @new_export here onwards + ganesha_export = GaneshaExport(new_export, ceph_block) if not old_export: if new_export.fsal.name == NFS_GANESHA_SUPPORTED_FSALS[1]: # only for RGW self._create_rgw_export_user(new_export) - self._save_export(cluster_id, new_export) + self._save_export(cluster_id, ganesha_export) return {"pseudo": new_export.pseudo, "state": "added"} need_nfs_service_restart = True - if old_export.fsal.name != new_export.fsal.name: + if old_export.fsal.name != ganesha_export.fsal.name: raise NFSInvalidOperation('FSAL change not allowed') - if old_export.pseudo != new_export.pseudo: + if old_export.pseudo != ganesha_export.pseudo: log.debug('export %s pseudo %s -> %s', - new_export.export_id, old_export.pseudo, new_export.pseudo) + ganesha_export.export_id, old_export.pseudo, ganesha_export.pseudo) if old_export.fsal.name == NFS_GANESHA_SUPPORTED_FSALS[0]: old_fsal = cast(CephFSFSAL, old_export.fsal) - new_fsal = cast(CephFSFSAL, new_export.fsal) - self._ensure_cephfs_export_user(new_export) + new_fsal = cast(CephFSFSAL, ganesha_export.fsal) + self._ensure_cephfs_export_user(ganesha_export) need_nfs_service_restart = not (old_fsal.user_id == new_fsal.user_id and old_fsal.fs_name == new_fsal.fs_name and old_export.path == new_export.path - and old_export.pseudo == new_export.pseudo) + and old_export.pseudo == new_export.pseudo + and old_export.ceph_block == ganesha_export.ceph_block) if old_export.fsal.name == NFS_GANESHA_SUPPORTED_FSALS[1]: old_rgw_fsal = cast(RGWFSAL, old_export.fsal) @@ -908,9 +1010,10 @@ class ExportMgr: elif old_rgw_fsal.secret_access_key != new_rgw_fsal.secret_access_key: raise NFSInvalidOperation('secret_access_key change is not allowed') + self.exports[cluster_id].remove(old_export) - self._update_export(cluster_id, new_export, need_nfs_service_restart) + self._update_export(cluster_id, ganesha_export, need_nfs_service_restart) return {"pseudo": new_export.pseudo, "state": "updated"} diff --git a/src/pybind/mgr/nfs/ganesha_conf.py b/src/pybind/mgr/nfs/ganesha_conf.py index b612f278d5c..6946297cd75 100644 --- a/src/pybind/mgr/nfs/ganesha_conf.py +++ b/src/pybind/mgr/nfs/ganesha_conf.py @@ -360,6 +360,42 @@ class Client: 'squash': self.squash } +class CephBlock: + def __init__(self, + is_async: bool, + is_zerocopy: bool): + self.is_async = is_async + self.is_zerocopy = is_zerocopy + + @classmethod + def from_ceph_block(cls, ceph_block: RawBlock) -> 'CephBlock': + return cls(ceph_block.values.get('async', False), + ceph_block.values.get('zerocopy', False)) + + def to_ceph_block(self) -> RawBlock: + values = { + 'async': self.is_async, + 'zerocopy': self.is_zerocopy + } + result = RawBlock("CEPH", values=values) + return result + + @classmethod + def from_dict(cls, ex_dict: Dict[str, Any]) -> 'Export': + return cls(ex_dict.get('async', False), + ex_dict.get('zerocopy', False)) + + def to_dict(self) -> Dict[str, Any]: + values = { + 'async': self.is_async, + 'zerocopy': self.is_zerocopy + } + return values + + def __eq__(self, other: Any) -> bool: + if not isinstance(other, CephBlock): + return False + return self.to_dict() == other.to_dict() class Export: def __init__( @@ -375,7 +411,7 @@ class Export: transports: List[str], fsal: FSAL, clients: Optional[List[Client]] = None, - sectype: Optional[List[str]] = None) -> None: + sectype: Optional[List[str]] = None): self.export_id = export_id self.path = path self.fsal = fsal -- 2.39.5