self.path = "/"
self.fs_name = "nfs-cephfs"
self.expected_name = "nfs.ganesha-test"
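+ # Expected JSON block for the default export; reused by the detailed-list and get-export checks below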
+ self.sample_export = {
+ "export_id": 1,
+ "path": self.path,
+ "cluster_id": self.cluster_id,
+ "pseudo": self.pseudo_path,
+ "access_type": "RW",
+ "squash": "no_root_squash",
+ "security_label": True,
+ "protocols": [
+ 4
+ ],
+ "transports": [
+ "TCP"
+ ],
+ "fsal": {
+ "name": "CEPH",
+ "user_id": "test1",
+ "fs_name": self.fs_name,
+ "sec_label_xattr": ''
+ },
+ "clients": []
+ }
def _check_port_status(self):
log.info("NETSTAT")
nfs_output = json.loads(self._nfs_cmd('export', 'ls', self.cluster_id))
self.assertIn(self.pseudo_path, nfs_output)
+ def _test_list_detailed(self, sub_vol_path):
+ '''
+ Test listing of created exports with the detailed option.
+ :param sub_vol_path: Path of the subvolume
+ '''
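+ # The detailed listing is expected to return one block per export in creation order;
+ # sample_export is updated in place to match each successive export before comparing.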
+ nfs_output = json.loads(self._nfs_cmd('export', 'ls', self.cluster_id, '--detailed'))
+ # Export-1 with default values (access type = rw and path = '/')
+ self.assertDictEqual(self.sample_export, nfs_output[0])
+ # Export-2 with r only
+ self.sample_export['export_id'] = 2
+ self.sample_export['pseudo'] = self.pseudo_path + '1'
+ self.sample_export['access_type'] = 'RO'
+ self.sample_export['fsal']['user_id'] = self.cluster_id + '2'
+ self.assertDictEqual(self.sample_export, nfs_output[1])
+ # Export-3 for subvolume with r only
+ self.sample_export['export_id'] = 3
+ self.sample_export['path'] = sub_vol_path
+ self.sample_export['pseudo'] = self.pseudo_path + '2'
+ self.sample_export['fsal']['user_id'] = self.cluster_id + '3'
+ self.assertDictEqual(self.sample_export, nfs_output[2])
+ # Export-4 for subvolume
+ self.sample_export['export_id'] = 4
+ self.sample_export['pseudo'] = self.pseudo_path + '3'
+ self.sample_export['access_type'] = 'RW'
+ self.sample_export['fsal']['user_id'] = self.cluster_id + '4'
+ self.assertDictEqual(self.sample_export, nfs_output[3])
+
+ def _test_get_export(self):
+ '''
+ Test fetching of the created export.
+ '''
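+ # Fetch the export by its pseudo path and compare it against the expected block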
+ nfs_output = json.loads(self._nfs_cmd('export', 'get', self.cluster_id, self.pseudo_path))
+ self.assertDictEqual(self.sample_export, nfs_output)
+
def _check_export_obj_deleted(self, conf_obj=False):
'''
Test if the export or config object is deleted successfully.
if e.exitstatus != errno.EINVAL:
raise
- def test_export_create_and_delete(self):
+ def test_create_and_delete_export(self):
'''
Test successful creation and deletion of the cephfs export.
'''
self._create_default_export()
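+ # Check if the created export can be fetched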
+ self._test_get_export()
self._delete_export()
# Check if rados export object is deleted
self._check_export_obj_deleted()
'''
Test creating multiple exports with different access types and paths.
'''
- #Export-1 with default values (access type = rw and path = '\')
+ # Export-1 with default values (access type = rw and path = '/')
self._create_default_export()
- #Export-2 with r only
+ # Export-2 with r only
self._create_export(export_id='2', extra_cmd=[self.pseudo_path+'1', '--readonly'])
- #Export-3 for subvolume with r only
+ # Export-3 for subvolume with r only
self._cmd('fs', 'subvolume', 'create', self.fs_name, 'sub_vol')
- fs_path = self._cmd('fs', 'subvolume', 'getpath', self.fs_name, 'sub_vol')
- self._create_export(export_id='3', extra_cmd=[self.pseudo_path+'2', '--readonly', fs_path.strip()])
- #Export-4 for subvolume
- self._create_export(export_id='4', extra_cmd=[self.pseudo_path+'3', fs_path.strip()])
+ fs_path = self._cmd('fs', 'subvolume', 'getpath', self.fs_name, 'sub_vol').strip()
+ self._create_export(export_id='3', extra_cmd=[self.pseudo_path+'2', '--readonly', fs_path])
+ # Export-4 for subvolume
+ self._create_export(export_id='4', extra_cmd=[self.pseudo_path+'3', fs_path])
+ # Check if the exports get listed
+ self._test_list_detailed(fs_path)
self._test_delete_cluster()
# Check if rados ganesha conf object is deleted
self._check_export_obj_deleted(conf_obj=True)