import orchestrator
from ceph.deployment import inventory
from ceph.deployment.service_spec import ServiceSpec
-from orchestrator import OrchestratorError, HostSpec
+from orchestrator import OrchestratorError, HostSpec, OrchestratorEvent
if TYPE_CHECKING:
from .module import CephadmOrchestrator
"""
return all((h in self.last_daemon_update or h in self.mgr.offline_hosts)
for h in self.get_hosts())
+
+
+class EventStore():
+ def __init__(self, mgr):
+ # type: (CephadmOrchestrator) -> None
+ self.mgr: CephadmOrchestrator = mgr
+ self.events = {} # type: Dict[str, List[OrchestratorEvent]]
+
+ def add(self, event: OrchestratorEvent) -> None:
+
+ if event.kind_subject() not in self.events:
+ self.events[event.kind_subject()] = [event]
+
+ for e in self.events[event.kind_subject()]:
+ if e.message == event.message:
+ return
+
+ self.events[event.kind_subject()].append(event)
+
+ # limit to five events for now.
+ self.events[event.kind_subject()] = self.events[event.kind_subject()][-5:]
+
+ def for_service(self, spec: ServiceSpec, level, message) -> None:
+ e = OrchestratorEvent(datetime.datetime.utcnow(), 'service', spec.service_name(), level, message)
+ self.add(e)
+
+ def for_daemon(self, daemon_name, level, message):
+ e = OrchestratorEvent(datetime.datetime.utcnow(), 'daemon', daemon_name, level, message)
+ self.add(e)
+
+ def cleanup(self) -> None:
+ # Needs to be properly done, in case events are persistently stored.
+
+ unknowns: List[str] = []
+ daemons = self.mgr.cache.get_daemon_names()
+ specs = self.mgr.spec_store.specs.keys()
+ for k_s, v in self.events.keys():
+ kind, subject = k_s.split(':')
+ if kind == 'service':
+ if subject not in specs:
+ unknowns.append(k_s)
+ elif kind == 'daemon':
+ if subject not in daemons:
+ unknowns.append(k_s)
+
+ for k_s in unknowns:
+ del self.events[k_s]
+
+ def get_for_service(self, name):
+ return self.events.get('service:' + name, [])
+
+ def get_for_daemon(self, name):
+ return self.events.get('daemon:' + name, [])
from .services.monitoring import GrafanaService, AlertmanagerService, PrometheusService, \
NodeExporterService
from .schedule import HostAssignment, HostPlacementSpec
-from .inventory import Inventory, SpecStore, HostCache
+from .inventory import Inventory, SpecStore, HostCache, EventStore
from .upgrade import CEPH_UPGRADE_ORDER, CephadmUpgrade
from .template import TemplateMgr
if h not in self.inventory:
self.cache.rm_host(h)
+
# in-memory only.
+ self.events = EventStore(self)
self.offline_hosts: Set[str] = set()
self.migration = Migrations(self)