from mgr_module import HandleCommandResult
from ceph.deployment.service_spec import ServiceSpec, NFSServiceSpec
-import rados
from orchestrator import DaemonDescription
def create_rados_config_obj(self,
spec: NFSServiceSpec,
clobber: bool = False) -> None:
- with self.mgr.rados.open_ioctx(spec.pool) as ioctx:
- if spec.namespace:
- ioctx.set_namespace(spec.namespace)
-
- obj = spec.rados_config_name()
- exists = True
- try:
- ioctx.stat(obj)
- except rados.ObjectNotFound:
- exists = False
-
- if exists and not clobber:
- # Assume an existing config
- logger.info('Rados config object exists: %s' % obj)
- else:
- # Create an empty config object
- logger.info('Creating rados config object: %s' % obj)
- ioctx.write_full(obj, ''.encode('utf-8'))
+ objname = spec.rados_config_name()
+ cmd = [
+ 'rados',
+ '-n', f"mgr.{self.mgr.get_mgr_id()}",
+ '-k', str(self.mgr.get_ceph_option('keyring')),
+ '-p', cast(str, spec.pool),
+ ]
+ if spec.namespace:
+ cmd += ['--namespace', spec.namespace]
+ result = subprocess.run(
+ cmd + ['get', objname, '-'],
+ stdout=subprocess.PIPE, stderr=subprocess.PIPE,
+ timeout=10)
+ if not result.returncode and not clobber:
+ logger.info('Rados config object exists: %s' % objname)
+ else:
+ logger.info('Creating rados config object: %s' % objname)
+ result = subprocess.run(
+ cmd + ['put', objname, '-'],
+ stdout=subprocess.PIPE, stderr=subprocess.PIPE,
+ timeout=10)
+ if result.returncode:
+ self.mgr.log.warning(
+ f'Unable to create rados config object {objname}: {result.stderr.decode("utf-8")}'
+ )
+ raise RuntimeError(result.stderr.decode("utf-8"))
def create_keyring(self, daemon_spec: CephadmDaemonDeploySpec) -> str:
daemon_id = daemon_spec.daemon_id
@mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('{}'))
@mock.patch("cephadm.services.nfs.NFSService.run_grace_tool", mock.MagicMock())
- @mock.patch("cephadm.module.CephadmOrchestrator.rados", mock.MagicMock())
+ @mock.patch("cephadm.services.nfs.NFSService.create_rados_config_obj", mock.MagicMock())
def test_nfs(self, cephadm_module):
with with_host(cephadm_module, 'test'):
ps = PlacementSpec(hosts=['test'], count=1)
@mock.patch("subprocess.run", None)
@mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('{}'))
@mock.patch("cephadm.services.nfs.NFSService.run_grace_tool", mock.MagicMock())
+ @mock.patch("cephadm.services.nfs.NFSService.create_rados_config_obj", mock.MagicMock())
def test_apply_save(self, spec: ServiceSpec, meth, cephadm_module: CephadmOrchestrator):
with with_host(cephadm_module, 'test'):
with with_service(cephadm_module, spec, meth, 'test'):