# TODO Add test for cluster update when ganesha can be deployed on multiple ports.
class TestNFS(MgrTestCase):
- def _cmd(self, *args):
+ def _cmd(self, *args, stdin=''):
+ if stdin:
+ return self.mgr_cluster.mon_manager.raw_cluster_cmd(*args, stdin=stdin)
return self.mgr_cluster.mon_manager.raw_cluster_cmd(*args)
def _nfs_cmd(self, *args):
}]}
self.assertDictEqual(info_output, host_details)
self._test_delete_cluster()
+
+ def test_cluster_set_reset_user_config(self):
+ '''
+ Test cluster is created using user config and reverts back to default
+ config on reset.
+ '''
+ self._test_create_cluster()
+ time.sleep(30)
+
+ pool = 'nfs-ganesha'
+ user_id = 'test'
+ fs_name = 'user_test_fs'
+ self._cmd('fs', 'volume', 'create', fs_name)
+ time.sleep(20)
+ key = self._cmd('auth', 'get-or-create-key', f'client.{user_id}', 'mon',
+ 'allow r', 'osd',
+ f'allow rw pool={pool} namespace={self.cluster_id}, allow rw tag cephfs data={fs_name}',
+ 'mds', f'allow rw path={self.path}').strip()
+ config = f""" LOG {{
+ Default_log_level = FULL_DEBUG;
+ }}
+
+ EXPORT {{
+ Export_Id = 100;
+ Transports = TCP;
+ Path = /;
+ Pseudo = /ceph/;
+ Protocols = 4;
+ Access_Type = RW;
+ Attr_Expiration_Time = 0;
+ Squash = None;
+ FSAL {{
+ Name = CEPH;
+ Filesystem = {fs_name};
+ User_Id = {user_id};
+ Secret_Access_Key = '{key}';
+ }}
+ }}"""
+ #{'test': [{'hostname': 'smithi068', 'ip': ['172.21.15.68'], 'port': 2049}]}
+ info_output = json.loads(self._nfs_cmd('cluster', 'info', self.cluster_id))['test'][0]
+ mnt_cmd = ['sudo', 'mount', '-t', 'nfs', '-o', f'port={info_output["port"]}', f'{info_output["ip"][0]}:/ceph', '/mnt']
+ MNT_FAILED = 32
+ self.ctx.cluster.run(args=['sudo', 'ceph', 'nfs', 'cluster', 'config',
+ 'set', self.cluster_id, '-i', '-'], stdin=config)
+ time.sleep(30)
+ log.info(self._sys_cmd(['rados', '-p', 'nfs-ganesha', '-N', self.cluster_id, 'ls']))
+ res = self._sys_cmd(['rados', '-p', pool, '-N', self.cluster_id, 'get',
+ f'userconf-nfs.ganesha-{user_id}', '-'])
+ self.assertEqual(config, res.decode('utf-8'))
+ self.ctx.cluster.run(args=mnt_cmd)
+ self.ctx.cluster.run(args=['sudo', 'touch', '/mnt/test'])
+ out_mnt = self._sys_cmd(['sudo', 'ls', '/mnt'])
+ self.assertEqual(out_mnt, b'test\n')
+ self.ctx.cluster.run(args=['sudo', 'umount', '/mnt'])
+ self._nfs_cmd('cluster', 'config', 'reset', self.cluster_id)
+ rados_obj_ls = self._sys_cmd(['rados', '-p', 'nfs-ganesha', '-N', self.cluster_id, 'ls'])
+ if b'conf-nfs' not in rados_obj_ls and b'userconf-nfs' in rados_obj_ls:
+ self.fail("User config not deleted")
+ time.sleep(30)
+ try:
+ self.ctx.cluster.run(args=mnt_cmd)
+ except CommandFailedError as e:
+ if e.exitstatus != MNT_FAILED:
+ raise
+ self._cmd('fs', 'volume', 'rm', fs_name, '--yes-i-really-mean-it')
+ self._test_delete_cluster()
+ time.sleep(30)
+
+ def test_cluster_set_user_config_with_non_existing_clusterid(self):
+ '''
+ Test setting user config for non-existing nfs cluster.
+ '''
+ try:
+ cluster_id = 'invalidtest'
+ self.ctx.cluster.run(args=['sudo', 'ceph', 'nfs', 'cluster',
+ 'config', 'set', self.cluster_id, '-i', '-'], stdin='testing')
+ self.fail(f"User config set for non-existing cluster {cluster_id}")
+ except CommandFailedError as e:
+ # Command should fail for test to pass
+ if e.exitstatus != errno.ENOENT:
+ raise
+
+ def test_cluster_reset_user_config_with_non_existing_clusterid(self):
+ '''
+ Test resetting user config for non-existing nfs cluster.
+ '''
+ try:
+ cluster_id = 'invalidtest'
+ self._nfs_cmd('cluster', 'config', 'reset', cluster_id)
+ self.fail(f"User config reset for non-existing cluster {cluster_id}")
+ except CommandFailedError as e:
+ # Command should fail for test to pass
+ if e.exitstatus != errno.ENOENT:
+ raise