import errno
import json
import logging
-import subprocess
from typing import List, Any, Dict, Tuple, Optional, TYPE_CHECKING, TypeVar, Callable, cast
from os.path import normpath
except TimedOut:
log.exception("Ganesha timed out")
- def _exec(self, args: List[str]) -> Tuple[int, str, str]:
- try:
- util = args.pop(0)
- cmd = [
- util,
- '-k', str(self.mgr.get_ceph_option('keyring')),
- '-n', f'mgr.{self.mgr.get_mgr_id()}',
- ] + args
- log.debug('exec: ' + ' '.join(cmd))
- p = subprocess.run(
- cmd,
- stdout=subprocess.PIPE, stderr=subprocess.PIPE,
- timeout=10,
- )
- except subprocess.CalledProcessError as ex:
- log.error('Error executing <<%s>>: %s', ex.cmd, ex.output)
- except subprocess.TimeoutExpired:
- log.error('timeout (10s) executing <<%s>>', cmd)
- return p.returncode, p.stdout.decode(), p.stderr.decode()
-
@property
def exports(self) -> Dict[str, List[Export]]:
if self._exports is None:
elif isinstance(export.fsal, RGWFSAL):
rgwfsal = cast(RGWFSAL, export.fsal)
- ret, out, err = self._exec(['radosgw-admin', 'bucket', 'stats', '--bucket',
- export.path])
+ ret, out, err = self.mgr.tool_exec(
+ ['radosgw-admin', 'bucket', 'stats', '--bucket', export.path]
+ )
if ret:
raise NFSException(f'Failed to fetch owner for bucket {export.path}')
j = json.loads(out)
owner = j.get('owner', '')
rgwfsal.user_id = owner
- ret, out, err = self._exec([
+ ret, out, err = self.mgr.tool_exec([
'radosgw-admin', 'user', 'info', '--uid', owner
])
if ret:
ex_dict["cluster_id"] = cluster_id
export = Export.from_dict(ex_id, ex_dict)
export.validate(self.mgr)
- log.debug("Successfully created %s export-%s from dict for cluster %s", fsal_type, ex_id, cluster_id)
+ log.debug("Successfully created %s export-%s from dict for cluster %s",
+ fsal_type, ex_id, cluster_id)
return export
def create_cephfs_export(self,
from contextlib import contextmanager
from unittest import mock
from unittest.mock import MagicMock
-from mgr_module import NFS_POOL_NAME
+from mgr_module import MgrModule, NFS_POOL_NAME
from rados import ObjectNotFound
return_value=[self.cluster_id]), \
mock.patch('nfs.export.restart_nfs_service'), \
mock.patch('nfs.cluster.restart_nfs_service'), \
- mock.patch('nfs.export.ExportMgr._exec', mock_exec), \
+ mock.patch.object(MgrModule, 'tool_exec', mock_exec), \
mock.patch('nfs.export.check_fs', return_value=True), \
mock.patch('nfs.export_utils.check_fs', return_value=True), \
mock.patch('nfs.export.ExportMgr._create_user_key',