import errno
import json
import logging
-from typing import List, Any, Dict, Tuple, Optional, TYPE_CHECKING, TypeVar, Callable, cast
+from typing import (
+ List,
+ Any,
+ Dict,
+ Tuple,
+ Optional,
+ TYPE_CHECKING,
+ TypeVar,
+ Callable,
+ Set,
+ cast)
from os.path import normpath
-from rados import TimedOut, ObjectNotFound, Rados
+from rados import TimedOut, ObjectNotFound, Rados, LIBRADOS_ALL_NSPACES
+from orchestrator import NoOrchestrator
from mgr_module import NFS_POOL_NAME as POOL_NAME, NFS_GANESHA_SUPPORTED_FSALS
from .export_utils import GaneshaConfParser, Export, RawBlock, CephFSFSAL, RGWFSAL
from .exception import NFSException, NFSInvalidOperation, FSNotFound, \
ClusterNotFound
from .utils import (
+ CONF_PREFIX,
EXPORT_PREFIX,
USER_CONF_PREFIX,
export_obj_name,
"""
This method checks if cluster exists
"""
- if kwargs['cluster_id'] not in available_clusters(export.mgr):
+ try:
+ clusters = set(available_clusters(export.mgr))
+ except NoOrchestrator:
+ clusters = nfs_rados_configs(export.mgr.rados)
+ cluster_id: str = kwargs['cluster_id']
+ log.debug("checking for %r in known nfs clusters: %r",
+ cluster_id, clusters)
+ if cluster_id not in clusters:
return -errno.ENOENT, "", "Cluster does not exist"
return func(export, *args, **kwargs)
return cast(FuncT, cluster_check)
return False
+def nfs_rados_configs(rados: 'Rados', nfs_pool: str = POOL_NAME) -> Set[str]:
+ """Return a set of all the namespaces in the nfs_pool where nfs
+ configuration objects are found. The namespaces also correspond
+ to the cluster ids.
+ """
+ ns: Set[str] = set()
+ prefixes = (EXPORT_PREFIX, CONF_PREFIX, USER_CONF_PREFIX)
+ with rados.open_ioctx(nfs_pool) as ioctx:
+ ioctx.set_namespace(LIBRADOS_ALL_NSPACES)
+ for obj in ioctx.list_objects():
+ if obj.key.startswith(prefixes):
+ ns.add(obj.nspace)
+ return ns
+
+
class ExportMgr:
def __init__(
self,