import string
from typing import List, Dict, Optional, Callable, Tuple, TypeVar, \
- Any, Set, TYPE_CHECKING, cast, Iterator, Union, NamedTuple
+ Any, Set, TYPE_CHECKING, cast, Iterator, NamedTuple
import datetime
import os
from .inventory import Inventory, SpecStore, HostCache, EventStore
from .upgrade import CEPH_UPGRADE_ORDER, CephadmUpgrade
from .template import TemplateMgr
-from .utils import forall_hosts, CephadmNoImage, cephadmNoImage
+from .utils import forall_hosts, cephadmNoImage
try:
import remoto
'username': username,
'password': password,
})
- out, err, code = self._run_cephadm(
+ out, err, code = CephadmServe(self)._run_cephadm(
host, 'mon', 'registry-login',
['--registry-json', '-'], stdin=args_str, error_ok=True)
if code:
'Check whether we can access and manage a remote host')
def check_host(self, host: str, addr: Optional[str] = None) -> Tuple[int, str, str]:
try:
- out, err, code = self._run_cephadm(host, cephadmNoImage, 'check-host',
- ['--expect-hostname', host],
- addr=addr,
- error_ok=True, no_fsid=True)
+ out, err, code = CephadmServe(self)._run_cephadm(host, cephadmNoImage, 'check-host',
+ ['--expect-hostname', host],
+ addr=addr,
+ error_ok=True, no_fsid=True)
if code:
return 1, '', ('check-host failed:\n' + '\n'.join(err))
except OrchestratorError as e:
'name=addr,type=CephString,req=false',
'Prepare a remote host for use with cephadm')
def _prepare_host(self, host: str, addr: Optional[str] = None) -> Tuple[int, str, str]:
- out, err, code = self._run_cephadm(host, cephadmNoImage, 'prepare-host',
- ['--expect-hostname', host],
- addr=addr,
- error_ok=True, no_fsid=True)
+ out, err, code = CephadmServe(self)._run_cephadm(host, cephadmNoImage, 'prepare-host',
+ ['--expect-hostname', host],
+ addr=addr,
+ error_ok=True, no_fsid=True)
if code:
return 1, '', ('prepare-host failed:\n' + '\n'.join(err))
# if we have an outstanding health alert for this host, give the
return image
- def _run_cephadm(self,
- host: str,
- entity: Union[CephadmNoImage, str],
- command: str,
- args: List[str],
- addr: Optional[str] = "",
- stdin: Optional[str] = "",
- no_fsid: Optional[bool] = False,
- error_ok: Optional[bool] = False,
- image: Optional[str] = "",
- env_vars: Optional[List[str]] = None,
- ) -> Tuple[List[str], List[str], int]:
- """
- Run cephadm on the remote host with the given command + args
-
- :env_vars: in format -> [KEY=VALUE, ..]
- """
- self.log.debug(f"_run_cephadm : command = {command}")
- self.log.debug(f"_run_cephadm : args = {args}")
-
- bypass_image = ('cephadm-exporter',)
-
- with self._remote_connection(host, addr) as tpl:
- conn, connr = tpl
- assert image or entity
- # Skip the image check for daemons deployed that are not ceph containers
- if not str(entity).startswith(bypass_image):
- if not image and entity is not cephadmNoImage:
- image = self._get_container_image(entity)
-
- final_args = []
-
- if env_vars:
- for env_var_pair in env_vars:
- final_args.extend(['--env', env_var_pair])
-
- if image:
- final_args.extend(['--image', image])
- final_args.append(command)
-
- if not no_fsid:
- final_args += ['--fsid', self._cluster_fsid]
-
- if self.container_init:
- final_args += ['--container-init']
-
- final_args += args
-
- self.log.debug('args: %s' % (' '.join(final_args)))
- if self.mode == 'root':
- if stdin:
- self.log.debug('stdin: %s' % stdin)
- script = 'injected_argv = ' + json.dumps(final_args) + '\n'
- if stdin:
- script += 'injected_stdin = ' + json.dumps(stdin) + '\n'
- script += self._cephadm
- python = connr.choose_python()
- if not python:
- raise RuntimeError(
- 'unable to find python on %s (tried %s in %s)' % (
- host, remotes.PYTHONS, remotes.PATH))
- try:
- out, err, code = remoto.process.check(
- conn,
- [python, '-u'],
- stdin=script.encode('utf-8'))
- except RuntimeError as e:
- self._reset_con(host)
- if error_ok:
- return [], [str(e)], 1
- raise
- elif self.mode == 'cephadm-package':
- try:
- out, err, code = remoto.process.check(
- conn,
- ['sudo', '/usr/bin/cephadm'] + final_args,
- stdin=stdin)
- except RuntimeError as e:
- self._reset_con(host)
- if error_ok:
- return [], [str(e)], 1
- raise
- else:
- assert False, 'unsupported mode'
-
- self.log.debug('code: %d' % code)
- if out:
- self.log.debug('out: %s' % '\n'.join(out))
- if err:
- self.log.debug('err: %s' % '\n'.join(err))
- if code and not error_ok:
- raise OrchestratorError(
- 'cephadm exited with an error code: %d, stderr:%s' % (
- code, '\n'.join(err)))
- return out, err, code
-
def _hosts_with_daemon_inventory(self) -> List[HostSpec]:
"""
Returns all usable hosts that went through _refresh_host_daemons().
:param host: host name
"""
assert_valid_host(spec.hostname)
- out, err, code = self._run_cephadm(spec.hostname, cephadmNoImage, 'check-host',
- ['--expect-hostname', spec.hostname],
- addr=spec.addr,
- error_ok=True, no_fsid=True)
+ out, err, code = CephadmServe(self)._run_cephadm(spec.hostname, cephadmNoImage, 'check-host',
+ ['--expect-hostname', spec.hostname],
+ addr=spec.addr,
+ error_ok=True, no_fsid=True)
if code:
# err will contain stdout and stderr, so we filter on the message text to
# only show the errors
raise OrchestratorError(msg, errno=rc)
# call the host-maintenance function
- out, _err, _code = self._run_cephadm(hostname, cephadmNoImage, "host-maintenance",
- ["enter"],
- error_ok=True)
+ out, _err, _code = CephadmServe(self)._run_cephadm(hostname, cephadmNoImage, "host-maintenance",
+ ["enter"],
+ error_ok=True)
if out:
raise OrchestratorError(
f"Failed to place {hostname} into maintenance for cluster {self._cluster_fsid}")
if tgt_host['status'] != "maintenance":
raise OrchestratorError(f"Host {hostname} is not in maintenance mode")
- out, _err, _code = self._run_cephadm(hostname, cephadmNoImage, 'host-maintenance',
- ['exit'],
- error_ok=True)
+ out, _err, _code = CephadmServe(self)._run_cephadm(hostname, cephadmNoImage, 'host-maintenance',
+ ['exit'],
+ error_ok=True)
if out:
raise OrchestratorError(
f"Failed to exit maintenance state for host {hostname}, cluster {self._cluster_fsid}")
name = daemon_spec.name()
for a in actions[action]:
try:
- out, err, code = self._run_cephadm(
+ out, err, code = CephadmServe(self)._run_cephadm(
host, name, 'unit',
['--name', name, a])
except Exception:
@trivial_completion
def zap_device(self, host: str, path: str) -> str:
self.log.info('Zap device %s:%s' % (host, path))
- out, err, code = self._run_cephadm(
+ out, err, code = CephadmServe(self)._run_cephadm(
host, 'osd', 'ceph-volume',
['--', 'lvm', 'zap', '--destroy', path],
error_ok=True)
host=host)
cmd_args = shlex.split(cmd_line)
- out, err, code = self._run_cephadm(
+ out, err, code = CephadmServe(self)._run_cephadm(
host, 'osd', 'shell', ['--'] + cmd_args,
error_ok=True)
if code:
if self.cache.host_needs_registry_login(host) and self.registry_url:
self._registry_login(host, self.registry_url,
self.registry_username, self.registry_password)
- out, err, code = self._run_cephadm(
+ out, err, code = CephadmServe(self)._run_cephadm(
host, '', 'pull', [],
image=image_name,
no_fsid=True,
import json
import logging
from collections import defaultdict
-from typing import TYPE_CHECKING, Optional, List, Callable, cast, Set, Dict, Any
+from typing import TYPE_CHECKING, Optional, List, Callable, cast, Set, Dict, Any, Union, Tuple
+
+from cephadm import remotes
try:
import remoto
from orchestrator import OrchestratorError, set_exception_subject, OrchestratorEvent
from cephadm.services.cephadmservice import CephadmDaemonSpec
from cephadm.schedule import HostAssignment
-from cephadm.upgrade import CEPH_UPGRADE_ORDER
-from cephadm.utils import forall_hosts, cephadmNoImage, is_repo_digest
+from cephadm.utils import forall_hosts, cephadmNoImage, is_repo_digest, \
+ CephadmNoImage, CEPH_UPGRADE_ORDER
from orchestrator._interface import daemon_type_to_service, service_to_daemon_types
if TYPE_CHECKING:
return None
self.log.debug(' checking %s' % host)
try:
- out, err, code = self.mgr._run_cephadm(
+ out, err, code = self._run_cephadm(
host, cephadmNoImage, 'check-host', [],
error_ok=True, no_fsid=True)
self.mgr.cache.update_last_host_check(host)
def _refresh_host_daemons(self, host: str) -> Optional[str]:
try:
- out, err, code = self.mgr._run_cephadm(
+ out, err, code = self._run_cephadm(
host, 'mon', 'ls', [], no_fsid=True)
if code:
return 'host %s cephadm ls returned %d: %s' % (
def _refresh_facts(self, host: str) -> Optional[str]:
try:
- out, err, code = self.mgr._run_cephadm(
+ out, err, code = self._run_cephadm(
host, cephadmNoImage, 'gather-facts', [],
error_ok=True, no_fsid=True)
def _refresh_host_devices(self, host: str) -> Optional[str]:
try:
- out, err, code = self.mgr._run_cephadm(
+ out, err, code = self._run_cephadm(
host, 'osd',
'ceph-volume',
['--', 'inventory', '--format=json', '--filter-for-batch'])
except Exception as e:
return 'host %s ceph-volume inventory failed: %s' % (host, e)
try:
- out, err, code = self.mgr._run_cephadm(
+ out, err, code = self._run_cephadm(
host, 'mon',
'list-networks',
[],
'Reconfiguring' if reconfig else 'Deploying',
daemon_spec.name(), daemon_spec.host))
- out, err, code = self.mgr._run_cephadm(
+ out, err, code = self._run_cephadm(
daemon_spec.host, daemon_spec.name(), 'deploy',
[
'--name', daemon_spec.name(),
args = ['--name', name, '--force']
self.log.info('Removing daemon %s from %s' % (name, host))
- out, err, code = self.mgr._run_cephadm(
+ out, err, code = self._run_cephadm(
host, name, 'rm-daemon', args)
if not code:
# remove item from cache
self.mgr.cephadm_services[daemon_type_to_service(daemon_type)].post_remove(daemon)
return "Removed {} from host '{}'".format(name, host)
+
+ def _run_cephadm(self,
+ host: str,
+ entity: Union[CephadmNoImage, str],
+ command: str,
+ args: List[str],
+ addr: Optional[str] = "",
+ stdin: Optional[str] = "",
+ no_fsid: Optional[bool] = False,
+ error_ok: Optional[bool] = False,
+ image: Optional[str] = "",
+ env_vars: Optional[List[str]] = None,
+ ) -> Tuple[List[str], List[str], int]:
+ """
+ Run cephadm on the remote host with the given command + args
+
+ Important: You probably don't want to run _run_cephadm from CLI handlers
+
+ :env_vars: in format -> [KEY=VALUE, ..]
+ """
+ self.log.debug(f"_run_cephadm : command = {command}")
+ self.log.debug(f"_run_cephadm : args = {args}")
+
+ bypass_image = ('cephadm-exporter',)
+
+ with self.mgr._remote_connection(host, addr) as tpl:
+ conn, connr = tpl
+ assert image or entity
+ # Skip the image check for daemons deployed that are not ceph containers
+ if not str(entity).startswith(bypass_image):
+ if not image and entity is not cephadmNoImage:
+ image = self.mgr._get_container_image(entity)
+
+ final_args = []
+
+ if env_vars:
+ for env_var_pair in env_vars:
+ final_args.extend(['--env', env_var_pair])
+
+ if image:
+ final_args.extend(['--image', image])
+ final_args.append(command)
+
+ if not no_fsid:
+ final_args += ['--fsid', self.mgr._cluster_fsid]
+
+ if self.mgr.container_init:
+ final_args += ['--container-init']
+
+ final_args += args
+
+ self.log.debug('args: %s' % (' '.join(final_args)))
+ if self.mgr.mode == 'root':
+ if stdin:
+ self.log.debug('stdin: %s' % stdin)
+ script = 'injected_argv = ' + json.dumps(final_args) + '\n'
+ if stdin:
+ script += 'injected_stdin = ' + json.dumps(stdin) + '\n'
+ script += self.mgr._cephadm
+ python = connr.choose_python()
+ if not python:
+ raise RuntimeError(
+ 'unable to find python on %s (tried %s in %s)' % (
+ host, remotes.PYTHONS, remotes.PATH))
+ try:
+ out, err, code = remoto.process.check(
+ conn,
+ [python, '-u'],
+ stdin=script.encode('utf-8'))
+ except RuntimeError as e:
+ self.mgr._reset_con(host)
+ if error_ok:
+ return [], [str(e)], 1
+ raise
+ elif self.mgr.mode == 'cephadm-package':
+ try:
+ out, err, code = remoto.process.check(
+ conn,
+ ['sudo', '/usr/bin/cephadm'] + final_args,
+ stdin=stdin)
+ except RuntimeError as e:
+ self.mgr._reset_con(host)
+ if error_ok:
+ return [], [str(e)], 1
+ raise
+ else:
+ assert False, 'unsupported mode'
+
+ self.log.debug('code: %d' % code)
+ if out:
+ self.log.debug('out: %s' % '\n'.join(out))
+ if err:
+ self.log.debug('err: %s' % '\n'.join(err))
+ if code and not error_ok:
+ raise OrchestratorError(
+ 'cephadm exited with an error code: %d, stderr:%s' % (
+ code, '\n'.join(err)))
+ return out, err, code
code, '\n'.join(err)))
# check result
- out, err, code = self.mgr._run_cephadm(
+ out, err, code = CephadmServe(self.mgr)._run_cephadm(
host, 'osd', 'ceph-volume',
[
'--',
split_cmd = cmd.split(' ')
_cmd = ['--config-json', '-', '--']
_cmd.extend(split_cmd)
- out, err, code = self.mgr._run_cephadm(
+ out, err, code = CephadmServe(self.mgr)._run_cephadm(
host, 'osd', 'ceph-volume',
_cmd,
env_vars=env_vars,
new_mgr = cephadm_module.get_unique_name('mgr', 'myhost', existing)
match_glob(new_mgr, 'myhost.*')
- @mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm('[]'))
+ @mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('[]'))
def test_host(self, cephadm_module):
assert wait(cephadm_module, cephadm_module.get_hosts()) == []
with with_host(cephadm_module, 'test'):
assert wait(cephadm_module, cephadm_module.get_hosts()) == [HostSpec('test', 'test')]
assert wait(cephadm_module, cephadm_module.get_hosts()) == []
- @mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm('[]'))
+ @mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('[]'))
@mock.patch("cephadm.services.cephadmservice.RgwService.create_realm_zonegroup_zone", lambda _, __, ___: None)
def test_service_ls(self, cephadm_module):
with with_host(cephadm_module, 'test'):
del o['events'] # delete it, as it contains a timestamp
assert out == expected
- @mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm('[]'))
+ @mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('[]'))
def test_device_ls(self, cephadm_module):
with with_host(cephadm_module, 'test'):
c = cephadm_module.get_inventory()
assert wait(cephadm_module, c) == [InventoryHost('test')]
- @mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm(
+ @mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm(
json.dumps([
dict(
name='rgw.myrgw.foobar',
c = cephadm_module.list_daemons()
assert wait(cephadm_module, c)[0].name() == 'rgw.myrgw.foobar'
- @mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm('[]'))
+ @mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('[]'))
@mock.patch("cephadm.services.cephadmservice.RgwService.create_realm_zonegroup_zone", lambda _, __, ___: None)
def test_daemon_action(self, cephadm_module: CephadmOrchestrator):
cephadm_module.service_cache_timeout = 10
CephadmServe(cephadm_module)._check_daemons()
- @mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm('[]'))
+ @mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('[]'))
@mock.patch("cephadm.services.cephadmservice.RgwService.create_realm_zonegroup_zone", lambda _, __, ___: None)
def test_daemon_action_fail(self, cephadm_module: CephadmOrchestrator):
cephadm_module.service_cache_timeout = 10
'redeploy'
]
)
- @mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm('{}'))
+ @mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('{}'))
def test_daemon_check(self, cephadm_module: CephadmOrchestrator, action):
with with_host(cephadm_module, 'test'):
with with_service(cephadm_module, ServiceSpec(service_type='grafana'), CephadmOrchestrator.apply_grafana, 'test') as d_names:
assert cephadm_module.cache.get_scheduled_daemon_action('test', daemon_name) is None
- @mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm")
+ @mock.patch("cephadm.serve.CephadmServe._run_cephadm")
def test_daemon_check_extra_config(self, _run_cephadm, cephadm_module: CephadmOrchestrator):
_run_cephadm.return_value = ('{}', '', 0)
stdin='{"config": "\\n\\n[mon]\\nk=v\\n", "keyring": ""}',
image='')
- @mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm('{}'))
+ @mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('{}'))
def test_daemon_check_post(self, cephadm_module: CephadmOrchestrator):
with with_host(cephadm_module, 'test'):
with with_service(cephadm_module, ServiceSpec(service_type='grafana'), CephadmOrchestrator.apply_grafana, 'test'):
{'prefix': 'dashboard set-grafana-api-url', 'value': 'https://test:3000'},
None)
- @mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm('[]'))
+ @mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('[]'))
def test_mon_add(self, cephadm_module):
with with_host(cephadm_module, 'test'):
ps = PlacementSpec(hosts=['test:0.0.0.0=a'], count=1)
c = cephadm_module.add_mon(ServiceSpec('mon', placement=ps))
wait(cephadm_module, c)
- @mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm('[]'))
+ @mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('[]'))
def test_mgr_update(self, cephadm_module):
with with_host(cephadm_module, 'test'):
ps = PlacementSpec(hosts=['test:0.0.0.0=a'], count=1)
with pytest.raises(OrchestratorError):
out = cephadm_module.osd_service.find_destroyed_osds()
- @mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm")
+ @mock.patch("cephadm.serve.CephadmServe._run_cephadm")
def test_apply_osd_save(self, _run_cephadm, cephadm_module: CephadmOrchestrator):
_run_cephadm.return_value = ('{}', '', 0)
with with_host(cephadm_module, 'test'):
_run_cephadm.assert_called_with(
'test', 'osd', 'ceph-volume', ['--', 'lvm', 'list', '--format', 'json'])
- @mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm('{}'))
+ @mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('{}'))
@mock.patch("cephadm.module.SpecStore.save")
def test_apply_osd_save_placement(self, _save_spec, cephadm_module):
with with_host(cephadm_module, 'test'):
assert wait(cephadm_module, c) == ['Scheduled osd.foo update...']
_save_spec.assert_called_with(spec)
- @mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm('{}'))
+ @mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('{}'))
def test_create_osds(self, cephadm_module):
with with_host(cephadm_module, 'test'):
dg = DriveGroupSpec(placement=PlacementSpec(host_pattern='test'),
out = wait(cephadm_module, c)
assert out == "Created no osd(s) on host test; already created?"
- @mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm('{}'))
+ @mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('{}'))
def test_prepare_drivegroup(self, cephadm_module):
with with_host(cephadm_module, 'test'):
dg = DriveGroupSpec(placement=PlacementSpec(host_pattern='test'),
"CEPH_VOLUME_OSDSPEC_AFFINITY=test.spec lvm batch --no-auto /dev/sda /dev/sdb --yes --no-systemd --report --format json"),
]
)
- @mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm('{}'))
+ @mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('{}'))
def test_driveselection_to_ceph_volume(self, cephadm_module, devices, preview, exp_command):
with with_host(cephadm_module, 'test'):
dg = DriveGroupSpec(service_id='test.spec', placement=PlacementSpec(
out = cephadm_module.osd_service.driveselection_to_ceph_volume(ds, [], preview)
assert out in exp_command
- @mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm(
+ @mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm(
json.dumps([
dict(
name='osd.0',
out = wait(cephadm_module, c)
assert out == []
- @mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm('{}'))
+ @mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('{}'))
@mock.patch("cephadm.services.cephadmservice.RgwService.create_realm_zonegroup_zone", lambda _, __, ___: None)
def test_rgw_update(self, cephadm_module):
with with_host(cephadm_module, 'host1'):
assert_rm_daemon(cephadm_module, 'rgw.realm.zone1', 'host1')
assert_rm_daemon(cephadm_module, 'rgw.realm.zone1', 'host2')
- @mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm(
+ @mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm(
json.dumps([
dict(
name='rgw.myrgw.myhost.myid',
]
)
@mock.patch("cephadm.module.CephadmOrchestrator._deploy_cephadm_binary", _deploy_cephadm_binary('test'))
- @mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm('{}'))
+ @mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('{}'))
@mock.patch("cephadm.services.cephadmservice.RgwService.create_realm_zonegroup_zone", lambda _, __, ___: None)
def test_daemon_add(self, spec: ServiceSpec, meth, cephadm_module):
with with_host(cephadm_module, 'test'):
with with_daemon(cephadm_module, spec, meth, 'test'):
pass
- @mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm('{}'))
+ @mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('{}'))
@mock.patch("cephadm.module.CephadmOrchestrator.rados", mock.MagicMock())
def test_nfs(self, cephadm_module):
with with_host(cephadm_module, 'test'):
# automatically.
assert_rm_service(cephadm_module, 'nfs.name')
- @mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm('{}'))
+ @mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('{}'))
@mock.patch("cephadm.module.CephadmOrchestrator.rados", mock.MagicMock())
def test_iscsi(self, cephadm_module):
with with_host(cephadm_module, 'test'):
'ident'
]
)
- @mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm")
+ @mock.patch("cephadm.serve.CephadmServe._run_cephadm")
def test_blink_device_light(self, _run_cephadm, on_bool, fault_ident, cephadm_module):
_run_cephadm.return_value = '{}', '', 0
with with_host(cephadm_module, 'test'):
_run_cephadm.assert_called_with('test', 'osd', 'shell', [
'--', 'lsmcli', f'local-disk-{fault_ident}-led-{on_off}', '--path', 'dev'], error_ok=True)
- @mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm")
+ @mock.patch("cephadm.serve.CephadmServe._run_cephadm")
def test_blink_device_light_custom(self, _run_cephadm, cephadm_module):
_run_cephadm.return_value = '{}', '', 0
with with_host(cephadm_module, 'test'):
_run_cephadm.assert_called_with('test', 'osd', 'shell', [
'--', 'echo', 'hello'], error_ok=True)
- @mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm")
+ @mock.patch("cephadm.serve.CephadmServe._run_cephadm")
def test_blink_device_light_custom_per_host(self, _run_cephadm, cephadm_module):
_run_cephadm.return_value = '{}', '', 0
with with_host(cephadm_module, 'mgr0'):
]
)
@mock.patch("cephadm.module.CephadmOrchestrator._deploy_cephadm_binary", _deploy_cephadm_binary('test'))
- @mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm('{}'))
+ @mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('{}'))
@mock.patch("cephadm.services.cephadmservice.RgwService.create_realm_zonegroup_zone", lambda _, __, ___: None)
def test_apply_save(self, spec: ServiceSpec, meth, cephadm_module: CephadmOrchestrator):
with with_host(cephadm_module, 'test'):
with with_service(cephadm_module, spec, meth, 'test'):
pass
- @mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm('{}'))
+ @mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('{}'))
@mock.patch("cephadm.services.cephadmservice.CephadmService.ok_to_stop")
def test_daemon_ok_to_stop(self, ok_to_stop, cephadm_module: CephadmOrchestrator):
spec = ServiceSpec(
with with_cephadm_module({'manage_etc_ceph_ceph_conf': True}) as m:
assert m.manage_etc_ceph_ceph_conf is True
- @mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm")
+ @mock.patch("cephadm.serve.CephadmServe._run_cephadm")
def test_registry_login(self, _run_cephadm, cephadm_module: CephadmOrchestrator):
def check_registry_credentials(url, username, password):
assert cephadm_module.get_module_option('registry_url') == url
assert err == 'Host test failed to login to fail-url as fail-user with given password'
check_registry_credentials('json-url', 'json-user', 'json-pass')
- @mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm(json.dumps({
+ @mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm(json.dumps({
'image_id': 'image_id',
'repo_digest': 'image@repo_digest',
})))
from tests import mock
-@mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm('[]'))
+@mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('[]'))
@mock.patch("cephadm.services.cephadmservice.RgwService.create_realm_zonegroup_zone", lambda _, __, ___: None)
def test_migrate_scheduler(cephadm_module: CephadmOrchestrator):
with with_host(cephadm_module, 'host1', refresh_hosts=False):
hostname='host1', network='', name=''), HostPlacementSpec(hostname='host2', network='', name='')])]
-@mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm('[]'))
+@mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('[]'))
def test_migrate_service_id_mon_one(cephadm_module: CephadmOrchestrator):
with with_host(cephadm_module, 'host1'):
cephadm_module.set_store(SPEC_STORE_PREFIX + 'mon.wrong', json.dumps({
)
-@mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm('[]'))
+@mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('[]'))
def test_migrate_service_id_mon_two(cephadm_module: CephadmOrchestrator):
with with_host(cephadm_module, 'host1'):
cephadm_module.set_store(SPEC_STORE_PREFIX + 'mon', json.dumps({
)
-@mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm('[]'))
+@mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('[]'))
def test_migrate_service_id_mds_one(cephadm_module: CephadmOrchestrator):
with with_host(cephadm_module, 'host1'):
cephadm_module.set_store(SPEC_STORE_PREFIX + 'mds', json.dumps({
from .fixtures import _run_cephadm, wait, cephadm_module, with_host, with_service
-@mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm('{}'))
+@mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('{}'))
def test_upgrade_start(cephadm_module: CephadmOrchestrator):
with with_host(cephadm_module, 'test'):
assert wait(cephadm_module, cephadm_module.upgrade_start(
assert wait(cephadm_module, cephadm_module.upgrade_stop()) == 'Stopped upgrade to image_id'
-@mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm('{}'))
+@mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('{}'))
@pytest.mark.parametrize("use_repo_digest",
[
False,
cephadm_module._mon_command_mock_versions = _versions_mock
- with mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm(json.dumps({
+ with mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm(json.dumps({
'image_id': 'image_id',
'repo_digest': 'to_image@repo_digest',
}))):
assert cephadm_module.upgrade_status is not None
- with mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm(
+ with mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm(
json.dumps([
dict(
name=list(cephadm_module.cache.daemons['test'].keys())[0],
import logging
import time
import uuid
-from typing import TYPE_CHECKING, Optional, Dict, NamedTuple
+from typing import TYPE_CHECKING, Optional, Dict
import orchestrator
-from cephadm.utils import name_to_config_section
+from cephadm.serve import CephadmServe
+from cephadm.utils import name_to_config_section, CEPH_UPGRADE_ORDER
from orchestrator import OrchestratorError, DaemonDescription
if TYPE_CHECKING:
from .module import CephadmOrchestrator
-# ceph daemon types that use the ceph container image.
-# NOTE: listed in upgrade order!
-CEPH_UPGRADE_ORDER = ['mgr', 'mon', 'crash', 'osd', 'mds', 'rgw', 'rbd-mirror']
-
logger = logging.getLogger(__name__)
continue
# make sure host has latest container image
- out, err, code = self.mgr._run_cephadm(
+ out, err, code = CephadmServe(self.mgr)._run_cephadm(
d.hostname, '', 'inspect-image', [],
image=target_image, no_fsid=True, error_ok=True)
if code or json.loads(''.join(out)).get('image_id') != target_id:
logger.info('Upgrade: Pulling %s on %s' % (target_image,
d.hostname))
- out, err, code = self.mgr._run_cephadm(
+ out, err, code = CephadmServe(self.mgr)._run_cephadm(
d.hostname, '', 'pull', [],
image=target_image, no_fsid=True, error_ok=True)
if code:
token = 1
+# ceph daemon types that use the ceph container image.
+# NOTE: listed in upgrade order!
+CEPH_UPGRADE_ORDER = ['mgr', 'mon', 'crash', 'osd', 'mds', 'rgw', 'rbd-mirror']
+
+
# Used for _run_cephadm used for check-host etc that don't require an --image parameter
cephadmNoImage = CephadmNoImage.token