import json
import time
import logging
-from io import BytesIO
+from io import BytesIO, StringIO
from tasks.mgr.mgr_test_case import MgrTestCase
from teuthology import contextutil
def _nfs_cmd(self, *args):
return self._cmd("nfs", *args)
+ def _nfs_complete_cmd(self, cmd):
+ return self.mgr_cluster.mon_manager.run_cluster_cmd(args=f"nfs {cmd}",
+ stdout=StringIO(),
+ stderr=StringIO(),
+ check_status=False)
+
def _orch_cmd(self, *args):
return self._cmd("orch", *args)
'''
Test single nfs cluster deployment.
'''
- # Disable any running nfs ganesha daemon
- self._check_nfs_server_status()
- self._nfs_cmd('cluster', 'create', self.cluster_id)
- # Check for expected status and daemon name (nfs.<cluster_id>)
- self._check_nfs_cluster_status('running', 'NFS Ganesha cluster deployment failed')
+ with contextutil.safe_while(sleep=4, tries=10) as proceed:
+ while proceed():
+ try:
+ # Disable any running nfs ganesha daemon
+ self._check_nfs_server_status()
+ cluster_create = self._nfs_complete_cmd(
+ f'cluster create {self.cluster_id}')
+ if cluster_create.stderr and 'cluster already exists' \
+ in cluster_create.stderr.getvalue():
+ self._test_delete_cluster()
+ continue
+ # Check for expected status and daemon name
+ # (nfs.<cluster_id>)
+ self._check_nfs_cluster_status(
+ 'running', 'NFS Ganesha cluster deployment failed')
+ break
+ except (AssertionError, CommandFailedError) as e:
+ log.warning(f'{e}, retrying')
def _test_delete_cluster(self):
'''