from .export import ExportMgr, AppliedExportResults
from .cluster import NFSCluster
-from .utils import available_clusters
+from .utils import available_clusters, cephfs_client_for_mgr
from .qos_conf import QOSType, QOSBandwidthControl, UserQoSType, QOSOpsControl
log = logging.getLogger(__name__)
) -> Dict[str, Any]:
"""Create a CephFS export"""
self.export_mgr.skip_notify_nfs_server = skip_notify_nfs_server
- earmark_resolver = CephFSEarmarkResolver(self)
+ earmark_resolver = CephFSEarmarkResolver(
+ self, client=cephfs_client_for_mgr(self))
return self.export_mgr.create_export(
fsal_type='cephfs',
fs_name=fsname,
cluster_id: str,
inbuf: str,
skip_notify_nfs_server: bool = False) -> AppliedExportResults:
- earmark_resolver = CephFSEarmarkResolver(self)
"""Create or update an export by `-i <json_or_ganesha_export_file>`"""
+ earmark_resolver = CephFSEarmarkResolver(
+ self, client=cephfs_client_for_mgr(self))
self.export_mgr.skip_notify_nfs_server = skip_notify_nfs_server
return self.export_mgr.apply_export(cluster_id, export_config=inbuf,
earmark_resolver=earmark_resolver)
def export_apply(self, cluster_id: str, export_config: str) -> AppliedExportResults:
"""Create or update an export by `export_config` which can be json string or ganesha export specification"""
- earmark_resolver = CephFSEarmarkResolver(self)
+ earmark_resolver = CephFSEarmarkResolver(
+ self, client=cephfs_client_for_mgr(self))
return self.export_mgr.apply_export(cluster_id, export_config=export_config,
earmark_resolver=earmark_resolver)
from contextlib import contextmanager
from unittest import mock
from unittest.mock import MagicMock
+import mgr_util
from mgr_module import MgrModule, NFS_POOL_NAME
from rados import ObjectNotFound
from ceph.utils import with_units_to_int, bytes_to_human
from nfs import Module
from nfs.export import ExportMgr, normalize_path
+from nfs.utils import cephfs_client_for_mgr
from nfs.ganesha_conf import GaneshaConfParser, Export
from nfs.qos_conf import (
RawBlock,
_validate_access_type(ok)
with pytest.raises(NFSInvalidOperation):
_validate_access_type("any")
+
+
+class TestCephfsClientForMgr:
+ @pytest.fixture(autouse=True)
+ def clear_cephfs_client_cache(self):
+ cephfs_client_for_mgr.cache_clear()
+ yield
+ cephfs_client_for_mgr.cache_clear()
+
+ def test_cephfs_client_for_mgr_returns_same_instance(self):
+ mgr = MagicMock()
+ with mock.patch('nfs.utils.CephfsClient') as mock_cephfs_cls:
+ mock_cephfs_cls.return_value = MagicMock()
+ first = cephfs_client_for_mgr(mgr)
+ second = cephfs_client_for_mgr(mgr)
+ assert first is second
+ mock_cephfs_cls.assert_called_once_with(mgr)
+
+ def test_multiple_earmark_resolvers_share_cached_cephfs_client(self):
+ mgr = MagicMock()
+ with mock.patch('nfs.utils.CephfsClient') as mock_cephfs_cls:
+ mock_cephfs_cls.return_value = MagicMock()
+ cached = cephfs_client_for_mgr(mgr)
+ r1 = mgr_util.CephFSEarmarkResolver(mgr=mgr, client=cached)
+ r2 = mgr_util.CephFSEarmarkResolver(mgr=mgr, client=cephfs_client_for_mgr(mgr))
+ assert r1._cephfs_client is r2._cephfs_client
+ mock_cephfs_cls.assert_called_once_with(mgr)
return fs_name in [fs['mdsmap']['fs_name'] for fs in fs_map['filesystems']]
+@functools.lru_cache(maxsize=1)
+def cephfs_client_for_mgr(mgr: 'Module') -> CephfsClient:
+ return CephfsClient(mgr)
+
+
def cephfs_path_is_dir(mgr: 'Module', fs: str, path: str) -> None:
- @functools.lru_cache(maxsize=1)
- def _get_cephfs_client() -> CephfsClient:
- return CephfsClient(mgr)
- cephfs_client = _get_cephfs_client()
+ cephfs_client = cephfs_client_for_mgr(mgr)
with open_filesystem(cephfs_client, fs) as fs_handle:
stx = fs_handle.statx(path.encode('utf-8'), cephfs.CEPH_STATX_MODE,