return self._cmd("orch", *args)
def _sys_cmd(self, cmd):
- cmd[0:0] = ['sudo']
ret = self.ctx.cluster.run(args=cmd, check_status=False, stdout=BytesIO(), stderr=BytesIO())
stdout = ret[0].stdout
if stdout:
}
def _check_nfs_server_status(self):
- res = self._sys_cmd(['systemctl', 'status', 'nfs-server'])
+ res = self._sys_cmd(['sudo', 'systemctl', 'status', 'nfs-server'])
if isinstance(res, bytes) and b'Active: active' in res:
self._disable_nfs()
def _disable_nfs(self):
log.info("Disabling NFS")
- self._sys_cmd(['systemctl', 'disable', 'nfs-server', '--now'])
+ self._sys_cmd(['sudo', 'systemctl', 'disable', 'nfs-server', '--now'])
def _fetch_nfs_status(self):
return self._orch_cmd('ps', f'--service_name={self.expected_name}')
}}
}}"""
port, ip = self._get_port_ip_info()
- self.ctx.cluster.run(args=['sudo', 'ceph', 'nfs', 'cluster', 'config',
+ self.ctx.cluster.run(args=['ceph', 'nfs', 'cluster', 'config',
'set', self.cluster_id, '-i', '-'], stdin=config)
time.sleep(30)
res = self._sys_cmd(['rados', '-p', pool, '-N', self.cluster_id, 'get',
'''
try:
cluster_id = 'invalidtest'
- self.ctx.cluster.run(args=['sudo', 'ceph', 'nfs', 'cluster',
+ self.ctx.cluster.run(args=['ceph', 'nfs', 'cluster',
'config', 'set', self.cluster_id, '-i', '-'], stdin='testing')
self.fail(f"User config set for non-existing cluster {cluster_id}")
except CommandFailedError as e:
Test creation of export via apply
'''
self._test_create_cluster()
- self.ctx.cluster.run(args=['sudo', 'ceph', 'nfs', 'export', 'apply',
+ self.ctx.cluster.run(args=['ceph', 'nfs', 'export', 'apply',
self.cluster_id, '-i', '-'],
stdin=json.dumps({
"path": "/",
new_pseudo_path = '/testing'
export_block['pseudo'] = new_pseudo_path
export_block['access_type'] = 'RO'
- self.ctx.cluster.run(args=['sudo', 'ceph', 'nfs', 'export', 'apply',
+ self.ctx.cluster.run(args=['ceph', 'nfs', 'export', 'apply',
self.cluster_id, '-i', '-'],
stdin=json.dumps(export_block))
self._check_nfs_cluster_status('running', 'NFS Ganesha cluster restart failed')
else:
export_block_new[key] = value
try:
- self.ctx.cluster.run(args=['sudo', 'ceph', 'nfs', 'export', 'apply',
+ self.ctx.cluster.run(args=['ceph', 'nfs', 'export', 'apply',
self.cluster_id, '-i', '-'],
stdin=json.dumps(export_block_new))
except CommandFailedError: