],
"fsal": {
"name": "CEPH",
- "user_id": "nfs.test.1",
+ "user_id": "nfs.test.nfs-cephfs.3746f603",
"fs_name": self.fs_name,
+ "cmount_path": "/",
},
"clients": []
}
return
self.fail(fail_msg)
- def _check_auth_ls(self, export_id=1, check_in=False):
+ def _check_auth_ls(self, fs_name, check_in=False):
'''
Tests export user id creation or deletion.
:param export_id: Denotes export number
output = self._cmd('auth', 'ls')
client_id = f'client.nfs.{self.cluster_id}'
if check_in:
- self.assertIn(f'{client_id}.{export_id}', output)
+ self.assertIn(f'{client_id}.{fs_name}', output)
else:
- self.assertNotIn(f'{client_id}.{export_id}', output)
+ self.assertNotIn(f'{client_id}.{fs_name}', output)
def _test_idempotency(self, cmd_func, cmd_args):
'''
# Runs the nfs export create command
self._cmd(*export_cmd)
# Check if user id for export is created
- self._check_auth_ls(export_id, check_in=True)
+ self._check_auth_ls(self.fs_name, check_in=True)
res = self._sys_cmd(['rados', '-p', NFS_POOL_NAME, '-N', self.cluster_id, 'get',
f'export-{export_id}', '-'])
# Check if export object is created
Delete an export.
'''
self._nfs_cmd('export', 'rm', self.cluster_id, self.pseudo_path)
- self._check_auth_ls()
+ self._check_auth_ls(self.fs_name)
def _test_list_export(self):
'''
self.sample_export['export_id'] = 2
self.sample_export['pseudo'] = self.pseudo_path + '1'
self.sample_export['access_type'] = 'RO'
- self.sample_export['fsal']['user_id'] = f'{self.expected_name}.2'
+ self.sample_export['fsal']['user_id'] = f'{self.expected_name}.{self.fs_name}.3746f603'
self.assertDictEqual(self.sample_export, nfs_output[1])
# Export-3 for subvolume with r only
self.sample_export['export_id'] = 3
self.sample_export['path'] = sub_vol_path
self.sample_export['pseudo'] = self.pseudo_path + '2'
- self.sample_export['fsal']['user_id'] = f'{self.expected_name}.3'
+ self.sample_export['fsal']['user_id'] = f'{self.expected_name}.{self.fs_name}.3746f603'
self.assertDictEqual(self.sample_export, nfs_output[2])
# Export-4 for subvolume
self.sample_export['export_id'] = 4
self.sample_export['pseudo'] = self.pseudo_path + '3'
self.sample_export['access_type'] = 'RW'
- self.sample_export['fsal']['user_id'] = f'{self.expected_name}.4'
+ self.sample_export['fsal']['user_id'] = f'{self.expected_name}.{self.fs_name}.3746f603'
self.assertDictEqual(self.sample_export, nfs_output[3])
def _get_export(self):
self._test_delete_cluster()
# Check if rados ganesha conf object is deleted
self._check_export_obj_deleted(conf_obj=True)
- self._check_auth_ls()
+ self._check_auth_ls(self.fs_name)
def test_exports_on_mgr_restart(self):
'''
"protocols": [4],
"fsal": {
"name": "CEPH",
- "user_id": "nfs.test.1",
+ "user_id": "nfs.test.nfs-cephfs.3746f603",
"fs_name": self.fs_name
}
},
"protocols": [4],
"fsal": {
"name": "CEPH",
- "user_id": "nfs.test.2",
+ "user_id": "nfs.test.nfs-cephfs.3746f603",
"fs_name": "invalid_fs_name" # invalid fs
}
},
"protocols": [4],
"fsal": {
"name": "CEPH",
- "user_id": "nfs.test.3",
+ "user_id": "nfs.test.nfs-cephfs.3746f603",
"fs_name": self.fs_name
}
}
"protocols": [4],
"fsal": {
"name": "CEPH",
- "user_id": "nfs.test.1",
+ "user_id": "nfs.test.nfs-cephfs.3746f603",
"fs_name": "invalid_fs_name" # invalid fs
}
}
"protocols": [4],
"fsal": {
"name": "CEPH",
- "user_id": "nfs.test.1",
+ "user_id": "nfs.test.nfs-cephfs.3746f603",
"fs_name": self.fs_name
}
},
"protocols": [4],
"fsal": {
"name": "CEPH",
- "user_id": "nfs.test.2",
+ "user_id": "nfs.test.nfs-cephfs.3746f603",
"fs_name": self.fs_name
}
},
"protocols": [4],
"fsal": {
"name": "CEPH",
- "user_id": "nfs.test.3",
+ "user_id": "nfs.test.nfs-cephfs.3746f603",
"fs_name": "invalid_fs_name"
}
}
EXPORT {
FSAL {
name = "CEPH";
- user_id = "nfs.foo.1";
filesystem = "a";
- secret_access_key = "AQCjU+hgjyReLBAAddJa0Dza/ZHqjX5+JiePMA==";
+ cmount_path = "/";
}
export_id = 1;
path = "/";
EXPORT {
FSAL {
name = "CEPH";
- user_id = "nfs.foo.1";
filesystem = "a";
- secret_access_key = "AQCjU+hgjyReLBAAddJa0Dza/ZHqjX5+JiePMA==";
+ cmount_path = "/";
}
export_id = 1;
path = "/secure/me";
protocols = 4;
transports = "TCP";
}
+"""
+ export_5 = """
+EXPORT {
+ Export_ID=3;
+ Protocols = 4;
+ Path = /;
+ Pseudo = /cephfs_b/;
+ Access_Type = RW;
+ Protocols = 4;
+ Attr_Expiration_Time = 0;
+
+ FSAL {
+ Name = CEPH;
+ Filesystem = "b";
+ User_Id = "nfs.foo.b.lgudhr";
+ Secret_Access_Key = "YOUR SECRET KEY HERE";
+ cmount_path = "/";
+ }
+}
"""
conf_nfs_foo = f'''
'foo': {
'export-1': TestNFS.RObject("export-1", self.export_1),
'export-2': TestNFS.RObject("export-2", self.export_2),
+ 'export-3': TestNFS.RObject("export-3", self.export_5),
'conf-nfs.foo': TestNFS.RObject("conf-nfs.foo", self.conf_nfs_foo)
}
}
export = Export.from_export_block(blocks[0], self.cluster_id)
self._validate_export_2(export)
+ def _validate_export_3(self, export: Export):
+ assert export.export_id == 3
+ assert export.path == "/"
+ assert export.pseudo == "/cephfs_b/"
+ assert export.access_type == "RW"
+ assert export.squash == "no_root_squash"
+ assert export.protocols == [4]
+ assert export.fsal.name == "CEPH"
+ assert export.fsal.user_id == "nfs.foo.b.lgudhr"
+ assert export.fsal.fs_name == "b"
+ assert export.fsal.sec_label_xattr == None
+ assert export.fsal.cmount_path == "/"
+ assert export.cluster_id == 'foo'
+ assert export.attr_expiration_time == 0
+ assert export.security_label == True
+
+ def test_export_parser_3(self) -> None:
+ blocks = GaneshaConfParser(self.export_5).parse()
+ assert isinstance(blocks, list)
+ assert len(blocks) == 1
+ export = Export.from_export_block(blocks[0], self.cluster_id)
+ self._validate_export_3(export)
+
def test_daemon_conf_parser(self) -> None:
blocks = GaneshaConfParser(self.conf_nfs_foo).parse()
assert isinstance(blocks, list)
ganesha_conf = ExportMgr(nfs_mod)
exports = ganesha_conf.exports[self.cluster_id]
- assert len(exports) == 2
+ assert len(exports) == 3
self._validate_export_1([e for e in exports if e.export_id == 1][0])
self._validate_export_2([e for e in exports if e.export_id == 2][0])
+ self._validate_export_3([e for e in exports if e.export_id == 3][0])
def test_config_dict(self) -> None:
self._do_mock_test(self._do_test_config_dict)
def test_update_export_with_list(self):
self._do_mock_test(self._do_test_update_export_with_list)
+
+ def test_update_export_cephfs(self):
+ self._do_mock_test(self._do_test_update_export_cephfs)
def _do_test_update_export_with_list(self):
nfs_mod = Module('nfs', '', '')
assert len(r.changes) == 2
export = conf._fetch_export('foo', '/rgw/bucket')
- assert export.export_id == 3
+ assert export.export_id == 4
assert export.path == "bucket"
assert export.pseudo == "/rgw/bucket"
assert export.access_type == "RW"
assert export.cluster_id == self.cluster_id
export = conf._fetch_export('foo', '/rgw/bucket2')
- assert export.export_id == 4
+ assert export.export_id == 5
assert export.path == "bucket2"
assert export.pseudo == "/rgw/bucket2"
assert export.access_type == "RO"
assert export.clients[0].access_type is None
assert export.cluster_id == self.cluster_id
+ def _do_test_update_export_cephfs(self):
+ nfs_mod = Module('nfs', '', '')
+ conf = ExportMgr(nfs_mod)
+ r = conf.apply_export(self.cluster_id, json.dumps({
+ 'export_id': 3,
+ 'path': '/',
+ 'cluster_id': self.cluster_id,
+ 'pseudo': '/cephfs_c',
+ 'access_type': 'RW',
+ 'squash': 'root_squash',
+ 'security_label': True,
+ 'protocols': [4],
+ 'transports': ['TCP', 'UDP'],
+ 'fsal': {
+ 'name': 'CEPH',
+ 'fs_name': 'c',
+ }
+ }))
+ assert len(r.changes) == 1
+
+ export = conf._fetch_export('foo', '/cephfs_c')
+ assert export.export_id == 3
+ assert export.path == "/"
+ assert export.pseudo == "/cephfs_c"
+ assert export.access_type == "RW"
+ assert export.squash == "root_squash"
+ assert export.protocols == [4]
+ assert export.transports == ["TCP", "UDP"]
+ assert export.fsal.name == "CEPH"
+ assert export.fsal.cmount_path == "/"
+ assert export.fsal.user_id == "nfs.foo.c.02de2980"
+ assert export.cluster_id == self.cluster_id
+
def test_remove_export(self) -> None:
self._do_mock_test(self._do_test_remove_export)
def _do_test_remove_export(self) -> None:
nfs_mod = Module('nfs', '', '')
conf = ExportMgr(nfs_mod)
- assert len(conf.exports[self.cluster_id]) == 2
+ assert len(conf.exports[self.cluster_id]) == 3
conf.delete_export(cluster_id=self.cluster_id,
pseudo_path="/rgw")
exports = conf.exports[self.cluster_id]
- assert len(exports) == 1
+ assert len(exports) == 2
assert exports[0].export_id == 1
def test_create_export_rgw_bucket(self):
conf = ExportMgr(nfs_mod)
ls = conf.list_exports(cluster_id=self.cluster_id)
- assert len(ls) == 2
+ assert len(ls) == 3
r = conf.create_export(
fsal_type='rgw',
assert r["bind"] == "/mybucket"
ls = conf.list_exports(cluster_id=self.cluster_id)
- assert len(ls) == 3
+ assert len(ls) == 4
export = conf._fetch_export('foo', '/mybucket')
assert export.export_id
conf = ExportMgr(nfs_mod)
ls = conf.list_exports(cluster_id=self.cluster_id)
- assert len(ls) == 2
+ assert len(ls) == 3
r = conf.create_export(
fsal_type='rgw',
assert r["bind"] == "/mybucket"
ls = conf.list_exports(cluster_id=self.cluster_id)
- assert len(ls) == 3
+ assert len(ls) == 4
export = conf._fetch_export('foo', '/mybucket')
assert export.export_id
conf = ExportMgr(nfs_mod)
ls = conf.list_exports(cluster_id=self.cluster_id)
- assert len(ls) == 2
+ assert len(ls) == 3
r = conf.create_export(
fsal_type='rgw',
assert r["bind"] == "/mybucket"
ls = conf.list_exports(cluster_id=self.cluster_id)
- assert len(ls) == 3
+ assert len(ls) == 4
export = conf._fetch_export('foo', '/mybucket')
assert export.export_id
def test_create_export_cephfs(self):
self._do_mock_test(self._do_test_create_export_cephfs)
+
+ def test_create_export_cephfs_with_cmount_path(self):
+ self._do_mock_test(self._do_test_create_export_cephfs_with_cmount_path)
def _do_test_create_export_cephfs(self):
nfs_mod = Module('nfs', '', '')
conf = ExportMgr(nfs_mod)
ls = conf.list_exports(cluster_id=self.cluster_id)
- assert len(ls) == 2
+ assert len(ls) == 3
r = conf.create_export(
fsal_type='cephfs',
assert r["bind"] == "/cephfs2"
ls = conf.list_exports(cluster_id=self.cluster_id)
- assert len(ls) == 3
+ assert len(ls) == 4
export = conf._fetch_export('foo', '/cephfs2')
assert export.export_id
assert export.protocols == [4]
assert export.transports == ["TCP"]
assert export.fsal.name == "CEPH"
- assert export.fsal.user_id == "nfs.foo.3"
+ assert export.fsal.user_id == "nfs.foo.myfs.86ca58ef"
assert export.fsal.cephx_key == "thekeyforclientabc"
assert len(export.clients) == 1
assert export.clients[0].squash == 'root'
assert export.clients[0].access_type == 'rw'
assert export.clients[0].addresses == ["192.168.1.0/8"]
assert export.cluster_id == self.cluster_id
+
+ def _do_test_create_export_cephfs_with_cmount_path(self):
+ nfs_mod = Module('nfs', '', '')
+ conf = ExportMgr(nfs_mod)
+
+ ls = conf.list_exports(cluster_id=self.cluster_id)
+ assert len(ls) == 3
+
+ r = conf.create_export(
+ fsal_type='cephfs',
+ cluster_id=self.cluster_id,
+ fs_name='myfs',
+ path='/',
+ pseudo_path='/cephfs3',
+ read_only=False,
+ squash='root',
+ cmount_path='/',
+ )
+ assert r["bind"] == "/cephfs3"
+
+ ls = conf.list_exports(cluster_id=self.cluster_id)
+ assert len(ls) == 4
+
+ export = conf._fetch_export('foo', '/cephfs3')
+ assert export.export_id
+ assert export.path == "/"
+ assert export.pseudo == "/cephfs3"
+ assert export.access_type == "RW"
+ assert export.squash == "root"
+ assert export.protocols == [4]
+ assert export.fsal.name == "CEPH"
+ assert export.fsal.user_id == "nfs.foo.myfs.86ca58ef"
+ assert export.fsal.cephx_key == "thekeyforclientabc"
+ assert export.fsal.cmount_path == "/"
+ assert export.cluster_id == self.cluster_id
def _do_test_cluster_ls(self):
nfs_mod = Module('nfs', '', '')