from .services.osd import RemoveUtil, OSDQueue, OSDService, OSD, NotFoundError
from .services.monitoring import GrafanaService, AlertmanagerService, PrometheusService, \
NodeExporterService
-from .schedule import HostAssignment, HostPlacementSpec
+from .schedule import HostAssignment
from .inventory import Inventory, SpecStore, HostCache, EventStore
from .upgrade import CEPH_UPGRADE_ORDER, CephadmUpgrade
from .template import TemplateMgr
return "Removed {} from host '{}'".format(name, host)
- def _config_fn(self, service_type) -> Optional[Callable[[ServiceSpec], None]]:
- fn = {
- 'mds': self.mds_service.config,
- 'rgw': self.rgw_service.config,
- 'nfs': self.nfs_service.config,
- 'iscsi': self.iscsi_service.config,
- }.get(service_type)
- return cast(Callable[[ServiceSpec], None], fn)
-
- def _apply_service(self, spec: ServiceSpec) -> bool:
- """
- Schedule a service. Deploy new daemons or remove old ones, depending
- on the target label and count specified in the placement.
- """
- self.migration.verify_no_migration()
-
- daemon_type = spec.service_type
- service_name = spec.service_name()
- if spec.unmanaged:
- self.log.debug('Skipping unmanaged service %s' % service_name)
- return False
- if spec.preview_only:
- self.log.debug('Skipping preview_only service %s' % service_name)
- return False
- self.log.debug('Applying service %s spec' % service_name)
-
- config_func = self._config_fn(daemon_type)
-
- if daemon_type == 'osd':
- self.osd_service.create_from_spec(cast(DriveGroupSpec, spec))
- # TODO: return True would result in a busy loop
- # can't know if daemon count changed; create_from_spec doesn't
- # return a solid indication
- return False
-
- daemons = self.cache.get_daemons_by_service(service_name)
-
- public_network = None
- if daemon_type == 'mon':
- ret, out, err = self.check_mon_command({
- 'prefix': 'config get',
- 'who': 'mon',
- 'key': 'public_network',
- })
- if '/' in out:
- public_network = out.strip()
- self.log.debug('mon public_network is %s' % public_network)
-
- def matches_network(host):
- # type: (str) -> bool
- if not public_network:
- return False
- # make sure we have 1 or more IPs for that network on that
- # host
- return len(self.cache.networks[host].get(public_network, [])) > 0
-
- ha = HostAssignment(
- spec=spec,
- hosts=self._hosts_with_daemon_inventory(),
- get_daemons_func=self.cache.get_daemons_by_service,
- filter_new_host=matches_network if daemon_type == 'mon' else None,
- )
-
- hosts: List[HostPlacementSpec] = ha.place()
- self.log.debug('Usable hosts: %s' % hosts)
-
- r = None
-
- # sanity check
- if daemon_type in ['mon', 'mgr'] and len(hosts) < 1:
- self.log.debug('cannot scale mon|mgr below 1 (hosts=%s)' % hosts)
- return False
-
- # add any?
- did_config = False
-
- add_daemon_hosts: Set[HostPlacementSpec] = ha.add_daemon_hosts(hosts)
- self.log.debug('Hosts that will receive new daemons: %s' % add_daemon_hosts)
-
- remove_daemon_hosts: Set[orchestrator.DaemonDescription] = ha.remove_daemon_hosts(hosts)
- self.log.debug('Hosts that will loose daemons: %s' % remove_daemon_hosts)
-
- for host, network, name in add_daemon_hosts:
- daemon_id = self.get_unique_name(daemon_type, host, daemons,
- prefix=spec.service_id,
- forcename=name)
-
- if not did_config and config_func:
- if daemon_type == 'rgw':
- rgw_config_func = cast(Callable[[RGWSpec, str], None], config_func)
- rgw_config_func(cast(RGWSpec, spec), daemon_id)
- else:
- config_func(spec)
- did_config = True
-
- daemon_spec = self.cephadm_services[daemon_type].make_daemon_spec(
- host, daemon_id, network, spec)
- self.log.debug('Placing %s.%s on host %s' % (
- daemon_type, daemon_id, host))
-
- try:
- daemon_spec = self.cephadm_services[daemon_type].prepare_create(daemon_spec)
- self._create_daemon(daemon_spec)
- r = True
- except (RuntimeError, OrchestratorError) as e:
- self.events.for_service(spec, 'ERROR',
- f"Failed while placing {daemon_type}.{daemon_id}"
- "on {host}: {e}")
- # only return "no change" if no one else has already succeeded.
- # later successes will also change to True
- if r is None:
- r = False
- continue
-
- # add to daemon list so next name(s) will also be unique
- sd = orchestrator.DaemonDescription(
- hostname=host,
- daemon_type=daemon_type,
- daemon_id=daemon_id,
- )
- daemons.append(sd)
-
- # remove any?
- def _ok_to_stop(remove_daemon_hosts: Set[orchestrator.DaemonDescription]) -> bool:
- daemon_ids = [d.daemon_id for d in remove_daemon_hosts]
- r = self.cephadm_services[daemon_type].ok_to_stop(daemon_ids)
- return not r.retval
-
- while remove_daemon_hosts and not _ok_to_stop(remove_daemon_hosts):
- # let's find a subset that is ok-to-stop
- remove_daemon_hosts.pop()
- for d in remove_daemon_hosts:
- r = True
- # NOTE: we are passing the 'force' flag here, which means
- # we can delete a mon instances data.
- self._remove_daemon(d.name(), d.hostname)
-
- if r is None:
- r = False
- return r
-
def _check_pool_exists(self, pool, service_name):
logger.info(f'Checking pool "{pool}" exists for service {service_name}')
if not self.rados.pool_exists(pool):
import json
import logging
from collections import defaultdict
-from typing import TYPE_CHECKING, Optional, List
+from typing import TYPE_CHECKING, Optional, List, Callable, cast, Set
try:
import remoto
remoto = None
from ceph.deployment import inventory
-from ceph.deployment.service_spec import ServiceSpec
+from ceph.deployment.drive_group import DriveGroupSpec
+from ceph.deployment.service_spec import ServiceSpec, HostPlacementSpec, RGWSpec
import orchestrator
+from cephadm.schedule import HostAssignment
from cephadm.utils import forall_hosts, cephadmNoImage, str_to_datetime
from orchestrator import OrchestratorError
name = '%s.%s' % (s.get('type'), s.get('id'))
if s.get('type') == 'rbd-mirror':
defaults = defaultdict(lambda: None, {'id': None})
- metadata = self.mgr.get_metadata("rbd-mirror", s.get('id'), default=defaults)
+ metadata = self.mgr.get_metadata(
+ "rbd-mirror", s.get('id'), default=defaults)
if metadata['id']:
name = '%s.%s' % (s.get('type'), metadata['id'])
else:
specs.append(spec)
for spec in specs:
try:
- if self.mgr._apply_service(spec):
+ if self._apply_service(spec):
r = True
except Exception as e:
self.log.exception('Failed to apply %s spec %s: %s' % (
self.mgr.events.for_service(spec, 'ERROR', 'Failed to apply: ' + str(e))
return r
+
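+ # Look up the pre-deploy config callback for a service type; only
+ # mds, rgw, nfs and iscsi define one, everything else gets None.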
+ def _config_fn(self, service_type: str) -> Optional[Callable[[ServiceSpec], None]]:
+ fn = {
+ 'mds': self.mgr.mds_service.config,
+ 'rgw': self.mgr.rgw_service.config,
+ 'nfs': self.mgr.nfs_service.config,
+ 'iscsi': self.mgr.iscsi_service.config,
+ }.get(service_type)
+ return cast(Callable[[ServiceSpec], None], fn)
+
+ def _apply_service(self, spec: ServiceSpec) -> bool:
+ """
+ Schedule a service. Deploy new daemons or remove old ones, depending
+ on the target label and count specified in the placement.
+ """
+ self.mgr.migration.verify_no_migration()
+
+ daemon_type = spec.service_type
+ service_name = spec.service_name()
+ if spec.unmanaged:
+ self.log.debug('Skipping unmanaged service %s' % service_name)
+ return False
+ if spec.preview_only:
+ self.log.debug('Skipping preview_only service %s' % service_name)
+ return False
+ self.log.debug('Applying service %s spec' % service_name)
+
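+ # Some service types need service-level config applied once before
+ # their first daemon is created (see did_config below).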
+ config_func = self._config_fn(daemon_type)
+
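+ # OSDs are created from drive group specs rather than placed per
+ # host, so hand off to the OSD service and bail out early.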
+ if daemon_type == 'osd':
+ self.mgr.osd_service.create_from_spec(cast(DriveGroupSpec, spec))
+ # TODO: returning True here would cause a busy loop; we can't
+ # tell whether the daemon count changed because create_from_spec
+ # doesn't return a solid indication
+ return False
+
+ daemons = self.mgr.cache.get_daemons_by_service(service_name)
+
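+ # New mon hosts are filtered below so that mons only land on
+ # hosts with an IP in the configured public network.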
+ public_network = None
+ if daemon_type == 'mon':
+ ret, out, err = self.mgr.check_mon_command({
+ 'prefix': 'config get',
+ 'who': 'mon',
+ 'key': 'public_network',
+ })
+ if '/' in out:
+ public_network = out.strip()
+ self.log.debug('mon public_network is %s' % public_network)
+
+ def matches_network(host):
+ # type: (str) -> bool
+ if not public_network:
+ return False
+ # make sure we have 1 or more IPs for that network on that host
+ return len(self.mgr.cache.networks[host].get(public_network, [])) > 0
+
+ ha = HostAssignment(
+ spec=spec,
+ hosts=self.mgr._hosts_with_daemon_inventory(),
+ get_daemons_func=self.mgr.cache.get_daemons_by_service,
+ filter_new_host=matches_network if daemon_type == 'mon' else None,
+ )
+
+ hosts: List[HostPlacementSpec] = ha.place()
+ self.log.debug('Usable hosts: %s' % hosts)
+
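+ # r tracks whether anything changed: None = nothing attempted yet,
+ # True = at least one change was made, False = only failures.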
+ r = None
+
+ # sanity check
+ if daemon_type in ['mon', 'mgr'] and len(hosts) < 1:
+ self.log.debug('cannot scale mon|mgr below 1 (hosts=%s)' % hosts)
+ return False
+
+ # add any?
+ did_config = False
+
+ add_daemon_hosts: Set[HostPlacementSpec] = ha.add_daemon_hosts(hosts)
+ self.log.debug('Hosts that will receive new daemons: %s' % add_daemon_hosts)
+
+ remove_daemon_hosts: Set[orchestrator.DaemonDescription] = ha.remove_daemon_hosts(hosts)
+ self.log.debug('Hosts that will lose daemons: %s' % remove_daemon_hosts)
+
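+ # Create a uniquely named daemon on each host slated for addition.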
+ for host, network, name in add_daemon_hosts:
+ daemon_id = self.mgr.get_unique_name(daemon_type, host, daemons,
+ prefix=spec.service_id,
+ forcename=name)
+
+ if not did_config and config_func:
+ if daemon_type == 'rgw':
+ rgw_config_func = cast(Callable[[RGWSpec, str], None], config_func)
+ rgw_config_func(cast(RGWSpec, spec), daemon_id)
+ else:
+ config_func(spec)
+ did_config = True
+
+ daemon_spec = self.mgr.cephadm_services[daemon_type].make_daemon_spec(
+ host, daemon_id, network, spec)
+ self.log.debug('Placing %s.%s on host %s' % (
+ daemon_type, daemon_id, host))
+
+ try:
+ daemon_spec = self.mgr.cephadm_services[daemon_type].prepare_create(daemon_spec)
+ self.mgr._create_daemon(daemon_spec)
+ r = True
+ except (RuntimeError, OrchestratorError) as e:
+ self.mgr.events.for_service(spec, 'ERROR',
+ f"Failed while placing {daemon_type}.{daemon_id}"
+ "on {host}: {e}")
+ # only record a failure if nothing has succeeded yet;
+ # a later success will still set r to True
+ if r is None:
+ r = False
+ continue
+
+ # add to daemon list so next name(s) will also be unique
+ sd = orchestrator.DaemonDescription(
+ hostname=host,
+ daemon_type=daemon_type,
+ daemon_id=daemon_id,
+ )
+ daemons.append(sd)
+
+ # remove any?
+ def _ok_to_stop(remove_daemon_hosts: Set[orchestrator.DaemonDescription]) -> bool:
+ daemon_ids = [d.daemon_id for d in remove_daemon_hosts]
+ r = self.mgr.cephadm_services[daemon_type].ok_to_stop(daemon_ids)
+ return not r.retval
+
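+ # If the batch is not safe to stop, drop daemons from the set one
+ # at a time until the remainder passes the ok-to-stop check.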
+ while remove_daemon_hosts and not _ok_to_stop(remove_daemon_hosts):
+ # let's find a subset that is ok-to-stop
+ remove_daemon_hosts.pop()
+ for d in remove_daemon_hosts:
+ r = True
+ # NOTE: we are passing the 'force' flag here, which means
+ # we can delete a mon instance's data.
+ self.mgr._remove_daemon(d.name(), d.hostname)
+
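+ # If neither adds nor removals were attempted, report "no change".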
+ if r is None:
+ r = False
+ return r
def test_mgr_update(self, cephadm_module):
with with_host(cephadm_module, 'test'):
ps = PlacementSpec(hosts=['test:0.0.0.0=a'], count=1)
- r = cephadm_module._apply_service(ServiceSpec('mgr', placement=ps))
+ r = CephadmServe(cephadm_module)._apply_service(ServiceSpec('mgr', placement=ps))
assert r
assert_rm_daemon(cephadm_module, 'mgr.a', 'test')
match_glob(out, "Deployed rgw.realm.zone1.host1.* on host 'host1'")
ps = PlacementSpec(hosts=['host1', 'host2'], count=2)
- r = cephadm_module._apply_service(
+ r = CephadmServe(cephadm_module)._apply_service(
RGWSpec(rgw_realm='realm', rgw_zone='zone1', placement=ps))
assert r