+import errno
import json
import time
import logging
wait_time += 10
self.fail("NFS Ganesha cluster could not be deleted")
- def _test_list_cluster(self):
+ def _test_list_cluster(self, empty=False):
+ if empty:
+ cluster_id = ''
+ else:
+ cluster_id = self.cluster_id
nfs_output = self._nfs_cmd('cluster', 'ls')
- self.assertEqual(self.cluster_id, nfs_output)
+ self.assertEqual(cluster_id, nfs_output.strip())
def _create_export(self, export_id, create_fs=False, extra_cmd=None):
if create_fs:
self._test_create_cluster()
self._test_list_cluster()
self._test_delete_cluster()
+ # List clusters again to ensure no cluster is shown
+ self._test_list_cluster(empty=True)
def test_create_delete_cluster_idempotency(self):
self._test_idempotency(self._test_create_cluster, ['nfs', 'cluster', 'create', self.export_type,
self.cluster_id])
self._test_idempotency(self._test_delete_cluster, ['nfs', 'cluster', 'delete', self.cluster_id])
+ def test_create_cluster_with_invalid_cluster_id(self):
+ try:
+ invalid_cluster_id = '/cluster_test'
+ self._nfs_cmd('cluster', 'create', self.export_type, invalid_cluster_id)
+ self.fail(f"Cluster successfully created with invalid cluster id {invalid_cluster_id}")
+ except CommandFailedError as e:
+ # Command should fail for test to pass
+ if e.exitstatus != errno.EINVAL:
+ raise
+
+ def test_create_cluster_with_invalid_export_type(self):
+ try:
+ invalid_export_type = 'rgw' # Only cephfs is valid
+ self._nfs_cmd('cluster', 'create', invalid_export_type, self.cluster_id)
+ self.fail(f"Cluster successfully created with invalid export type {invalid_export_type}")
+ except CommandFailedError as e:
+ # Command should fail for test to pass
+ if e.exitstatus != errno.EINVAL:
+ raise
+
def test_export_create_and_delete(self):
self._create_default_export()
self._delete_export()