self.sample_export['fsal']['user_id'] = self.cluster_id + '4'
self.assertDictEqual(self.sample_export, nfs_output[3])
+ def _get_export(self):
+ '''
+ Returns export block in json format
+ '''
+ return json.loads(self._nfs_cmd('export', 'get', self.cluster_id, self.pseudo_path))
+
def _test_get_export(self):
'''
Test fetching of created export.
'''
- nfs_output = json.loads(self._nfs_cmd('export', 'get', self.cluster_id, self.pseudo_path))
+ nfs_output = self._get_export()
self.assertDictEqual(self.sample_export, nfs_output)
def _check_export_obj_deleted(self, conf_obj=False):
finally:
self.ctx.cluster.run(args=['sudo', 'umount', '/mnt'])
+ def _write_to_read_only_export(self, pseudo_path, port, ip):
+ '''
+ Check if write to read only export fails
+ '''
+ try:
+ self._test_mnt(pseudo_path, port, ip)
+ except CommandFailedError as e:
+ # Write to cephfs export should fail for test to pass
+ if e.exitstatus != errno.EPERM:
+ raise
+
def test_create_and_delete_cluster(self):
'''
Test successful creation and deletion of the nfs cluster.
self._test_create_cluster()
self._create_export(export_id='1', create_fs=True, extra_cmd=[self.pseudo_path, '--readonly'])
port, ip = self._get_port_ip_info()
- try:
- self._test_mnt(self.pseudo_path, port, ip)
- except CommandFailedError as e:
- # Write to cephfs export should fail for test to pass
- if e.exitstatus != errno.EPERM:
- raise
+ self._write_to_read_only_export(self.pseudo_path, port, ip)
self._test_delete_cluster()
def test_cluster_info(self):
# Command should fail for test to pass
if e.exitstatus != errno.ENOENT:
raise
+
+ def test_update_export(self):
+ '''
+ Test update of exports
+ '''
+ self._create_default_export()
+ port, ip = self._get_port_ip_info()
+ self._test_mnt(self.pseudo_path, port, ip)
+ export_block = self._get_export()
+ new_pseudo_path = '/testing'
+ export_block['pseudo'] = new_pseudo_path
+ export_block['access_type'] = 'RO'
+ self.ctx.cluster.run(args=['sudo', 'ceph', 'nfs', 'export', 'update', '-i', '-'],
+ stdin=json.dumps(export_block))
+ self._check_nfs_cluster_status('running', 'NFS Ganesha cluster restart failed')
+ self._write_to_read_only_export(new_pseudo_path, port, ip)
+ self._test_delete_cluster()
+
+ def test_update_export_with_invalid_values(self):
+ '''
+ Test update of export with invalid values
+ '''
+ self._create_default_export()
+ export_block = self._get_export()
+
+ def update_with_invalid_values(key, value, fsal=False):
+ export_block_new = dict(export_block)
+ if fsal:
+ export_block_new['fsal'] = dict(export_block['fsal'])
+ export_block_new['fsal'][key] = value
+ else:
+ export_block_new[key] = value
+ try:
+ self.ctx.cluster.run(args=['sudo', 'ceph', 'nfs', 'export', 'update', '-i', '-'],
+ stdin=json.dumps(export_block_new))
+ except CommandFailedError:
+ pass
+
+ update_with_invalid_values('export_id', 9)
+ update_with_invalid_values('cluster_id', 'testing_new')
+ update_with_invalid_values('pseudo', 'test_relpath')
+ update_with_invalid_values('access_type', 'W')
+ update_with_invalid_values('squash', 'no_squash')
+ update_with_invalid_values('security_label', 'invalid')
+ update_with_invalid_values('protocols', [2])
+ update_with_invalid_values('transports', ['UD'])
+ update_with_invalid_values('name', 'RGW', True)
+ update_with_invalid_values('user_id', 'testing_export', True)
+ update_with_invalid_values('fs_name', 'b', True)
+ self._test_delete_cluster()