result.extend(daemons)
return result
"""
+ def mock_exec(cls, args):
+ u = {
+ "user_id": "abc",
+ "display_name": "foo",
+ "email": "",
+ "suspended": 0,
+ "max_buckets": 1000,
+ "subusers": [],
+ "keys": [
+ {
+ "user": "abc",
+ "access_key": "the_access_key",
+ "secret_key": "the_secret_key"
+ }
+ ],
+ "swift_keys": [],
+ "caps": [],
+ "op_mask": "read, write, delete",
+ "default_placement": "",
+ "default_storage_class": "",
+ "placement_tags": [],
+ "bucket_quota": {
+ "enabled": False,
+ "check_on_raw": False,
+ "max_size": -1,
+ "max_size_kb": 0,
+ "max_objects": -1
+ },
+ "user_quota": {
+ "enabled": False,
+ "check_on_raw": False,
+ "max_size": -1,
+ "max_size_kb": 0,
+ "max_objects": -1
+ },
+ "temp_url_keys": [],
+ "type": "rgw",
+ "mfa_ids": []
+ }
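+ # Return a JSON array for a 'list' invocation, a single user record otherwise.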
+ if args[2] == 'list':
+ return 0, json.dumps([u]), ''
+ return 0, json.dumps(u), ''
with mock.patch('nfs.module.Module.describe_service') as describe_service, \
mock.patch('nfs.module.Module.rados') as rados, \
mock.patch('nfs.export.available_clusters',
return_value=self.clusters.keys()), \
- mock.patch('nfs.export.restart_nfs_service'):
+ mock.patch('nfs.export.restart_nfs_service'), \
+ mock.patch('nfs.export.ExportMgr._exec', mock_exec), \
+ mock.patch('nfs.export.check_fs', return_value=True), \
+ mock.patch('nfs.export_utils.check_fs', return_value=True), \
+ mock.patch('nfs.export.FSExport._create_user_key',
+ return_value=('client.abc', 'thekeyforclientabc')):
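+ # Have rados.open_ioctx() hand back the test's in-memory io_mock.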
rados.open_ioctx.return_value.__enter__.return_value = self.io_mock
rados.open_ioctx.return_value.__exit__ = mock.Mock(return_value=None)
assert len(exports) == 1
assert exports[0].export_id == 1
- """
def test_create_export_rgw(self):
- for cluster_id, info in self.clusters.items():
- self._do_test_create_export_rgw(cluster_id, info['exports'])
- self._reset_temp_store()
+ with self._mock_orchestrator(True):
+ for cluster_id, info in self.clusters.items():
+ self._do_test_create_export_rgw(cluster_id, info['exports'])
+ self._reset_temp_store()
def _do_test_create_export_rgw(self, cluster_id, expected_exports):
- ganesha.RgwClient = MagicMock()
- admin_inst_mock = MagicMock()
- admin_inst_mock.get_user_keys.return_value = {
- 'access_key': 'access_key2',
- 'secret_key': 'secret_key2'
- }
- ganesha.RgwClient.admin_instance.return_value = admin_inst_mock
+ nfs_mod = Module('nfs', '', '')
+ conf = ExportMgr(nfs_mod)
- conf = ExportMgr.instance(cluster_id)
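+ # Baseline: two exports already exist before anything is created.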
- ex_id = conf.create_export({
- 'daemons': expected_exports[3],
- 'path': 'bucket',
- 'pseudo': '/rgw/bucket',
- 'tag': 'bucket_tag',
- 'cluster_id': cluster_id,
- 'access_type': 'RW',
- 'squash': 'all_squash',
- 'security_label': False,
- 'protocols': [4, 3],
- 'transports': ['TCP', 'UDP'],
- 'clients': [{
- 'addresses': ["192.168.0.0/16"],
- 'access_type': None,
- 'squash': None
- }],
- 'fsal': {
- 'name': 'RGW',
- 'rgw_user_id': 'testuser'
- }
- })
+ exports = conf.list_exports(cluster_id=cluster_id)
+ ls = json.loads(exports[1])
+ assert len(ls) == 2
+
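+ # Create an RGW export for 'bucket' through the keyword-based create_export() API.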
+ r = conf.create_export(
+ fsal_type='rgw',
+ cluster_id=cluster_id,
+ bucket='bucket',
+ pseudo_path='/mybucket',
+ read_only=False,
+ squash='root',
+ addr=["192.168.0.0/16"]
+ )
+ assert r[0] == 0
- conf = ExportMgr.instance(cluster_id)
exports = conf.list_exports(cluster_id=cluster_id)
- assert len(exports) == 3
- export = conf.get_export(ex_id)
- assert export.export_id == ex_id
- assert export.path == "bucket"
- assert export.pseudo == "/rgw/bucket"
- assert export.tag == "bucket_tag"
- assert export.access_type == "RW"
- assert export.squash == "all_squash"
- assert export.protocols == {4, 3}
- assert export.transports == {"TCP", "UDP"}
+ ls = json.loads(exports[1])
+ assert len(ls) == 3
+
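+ # Read the export back via _fetch_export and check its fields.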
+ export = conf._fetch_export('foo', '/mybucket')
+ assert export.export_id
+ assert export.path == "bucket"
+ assert export.pseudo == "/mybucket"
+ assert export.access_type == "none"
+ assert export.squash == "none"
+ assert export.protocols == [4]
+ assert export.transports == ["TCP"]
assert export.fsal.name == "RGW"
- assert export.fsal.rgw_user_id == "testuser"
- assert export.fsal.access_key == "access_key2"
- assert export.fsal.secret_key == "secret_key2"
- assert len(export.clients) == 1
- assert export.clients[0].user_id == "ganesha"
- assert export.clients[0].squash is None
- assert export.clients[0].access_type is None
- assert export.daemons == set(expected_exports[3])
+ assert export.fsal.user_id == "nfs.foo.bucket"
+ assert export.fsal.access_key_id == "the_access_key"
+ assert export.fsal.secret_access_key == "the_secret_key"
+ assert len(export.clients) == 1
+ assert export.clients[0].squash == 'root'
+ assert export.clients[0].access_type == 'rw'
+ assert export.clients[0].addresses == ["192.168.0.0/16"]
assert export.cluster_id == cluster_id
def test_create_export_cephfs(self):
- for cluster_id, info in self.clusters.items():
- self._do_test_create_export_cephfs(cluster_id, info['exports'])
- self._reset_temp_store()
+ with self._mock_orchestrator(True):
+ for cluster_id, info in self.clusters.items():
+ self._do_test_create_export_cephfs(cluster_id, info['exports'])
+ self._reset_temp_store()
def _do_test_create_export_cephfs(self, cluster_id, expected_exports):
- ganesha.CephX = MagicMock()
- ganesha.CephX.list_clients.return_value = ["fs"]
- ganesha.CephX.get_client_key.return_value = "fs_key"
-
- ganesha.CephFS = MagicMock()
- ganesha.CephFS.dir_exists.return_value = True
+ nfs_mod = Module('nfs', '', '')
+ conf = ExportMgr(nfs_mod)
- conf = ExportMgr.instance(cluster_id)
- ex_id = conf.create_export({
- 'daemons': expected_exports[3],
- 'path': '/',
- 'pseudo': '/cephfs2',
- 'cluster_id': cluster_id,
- 'tag': None,
- 'access_type': 'RW',
- 'squash': 'all_squash',
- 'security_label': True,
- 'protocols': [4],
- 'transports': ['TCP'],
- 'clients': [],
- 'fsal': {
- 'name': 'CEPH',
- 'user_id': 'fs',
- 'fs_name': None,
- 'sec_label_xattr': 'security.selinux'
- }
- })
+ exports = conf.list_exports(cluster_id=cluster_id)
+ ls = json.loads(exports[1])
+ assert len(ls) == 2
+
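+ # Create the CephFS export, then verify it the same way as the RGW one.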
+ r = conf.create_export(
+ fsal_type='cephfs',
+ cluster_id=cluster_id,
+ fs_name='myfs',
+ path='/',
+ pseudo_path='/cephfs2',
+ read_only=False,
+ squash='root',
+ addr=["192.168.1.0/8"],
+ )
+ assert r[0] == 0
- conf = ExportMgr.instance(cluster_id)
exports = conf.list_exports(cluster_id=cluster_id)
- assert len(exports) == 3
- export = conf.get_export(ex_id)
- assert export.export_id == ex_id
+ ls = json.loads(exports[1])
+ assert len(ls) == 3
+
+ export = conf._fetch_export('foo', '/cephfs2')
+ assert export.export_id
assert export.path == "/"
assert export.pseudo == "/cephfs2"
- assert export.tag is None
- assert export.access_type == "RW"
- assert export.squash == "all_squash"
- assert export.protocols == {4}
- assert export.transports == {"TCP"}
+ assert export.access_type == "none"
+ assert export.squash == "none"
+ assert export.protocols == [4]
+ assert export.transports == ["TCP"]
assert export.fsal.name == "CEPH"
- assert export.fsal.user_id == "fs"
- assert export.fsal.cephx_key == "fs_key"
- assert export.fsal.sec_label_xattr == "security.selinux"
- assert export.fsal.fs_name is None
- assert len(export.clients) == 0
- assert export.daemons == set(expected_exports[3])
+ assert export.fsal.user_id == "nfs.foo.3"
+ assert export.fsal.cephx_key == "thekeyforclientabc"
+ assert len(export.clients) == 1
+ assert export.clients[0].squash == 'root'
+ assert export.clients[0].access_type == 'rw'
+ assert export.clients[0].addresses == ["192.168.1.0/8"]
assert export.cluster_id == cluster_id
- assert export.attr_expiration_time == 0
- assert export.security_label
- """
+
"""
def test_reload_daemons(self):
# Fail to import call in Python 3.8, see https://bugs.python.org/issue35753