from ceph.deployment.service_spec import \
NFSServiceSpec, RGWSpec, ServiceSpec, PlacementSpec, assert_valid_host, \
CustomContainerSpec
+from cephadm.serve import CephadmServe
from cephadm.services.cephadmservice import CephadmDaemonSpec
from mgr_module import MgrModule, HandleCommandResult
A command handler will typically change the declarative state
of cephadm. This loop will then attempt to apply this new state.
"""
- self.log.debug("serve starting")
- while self.run:
-
- try:
-
- self.convert_tags_to_repo_digest()
-
- # refresh daemons
- self.log.debug('refreshing hosts and daemons')
- self._refresh_hosts_and_daemons()
-
- self._check_for_strays()
-
- self._update_paused_health()
-
- if not self.paused:
- self.rm_util.process_removal_queue()
-
- self.migration.migrate()
- if self.migration.is_migration_ongoing():
- continue
-
- if self._apply_all_services():
- continue # did something, refresh
-
- self._check_daemons()
-
- if self.upgrade.continue_upgrade():
- continue
-
- except OrchestratorError as e:
- if e.event_subject:
- self.events.from_orch_error(e)
-
- self._serve_sleep()
- self.log.debug("serve exit")
+ serve = CephadmServe(self)
+ serve.serve()
def convert_tags_to_repo_digest(self):
if not self.use_repo_digest:
import logging
from typing import TYPE_CHECKING
+from orchestrator import OrchestratorError
+
if TYPE_CHECKING:
from cephadm.module import CephadmOrchestrator
def __init__(self, mgr: "CephadmOrchestrator"):
self.mgr: "CephadmOrchestrator" = mgr
+ self.log = logger
+
+ def serve(self) -> None:
+ """
+ The main loop of cephadm.
+
+ A command handler will typically change the declarative state
+ of cephadm. This loop will then attempt to apply this new state.
+ """
+ self.log.debug("serve starting")
+ while self.mgr.run:
+
+ try:
+
+ self.mgr.convert_tags_to_repo_digest()
+
+ # refresh daemons
+ self.log.debug('refreshing hosts and daemons')
+ self.mgr._refresh_hosts_and_daemons()
+
+ self.mgr._check_for_strays()
+
+ self.mgr._update_paused_health()
+
+ if not self.mgr.paused:
+ self.mgr.rm_util.process_removal_queue()
+
+ self.mgr.migration.migrate()
+ if self.mgr.migration.is_migration_ongoing():
+ continue
+
+ if self.mgr._apply_all_services():
+ continue # did something, refresh
+
+ self.mgr._check_daemons()
+
+ if self.mgr.upgrade.continue_upgrade():
+ continue
+
+ except OrchestratorError as e:
+ if e.event_subject:
+ self.mgr.events.from_orch_error(e)
+
+ self.mgr._serve_sleep()
+ self.log.debug("serve exit")