import orchestrator
from ceph.deployment import inventory
from ceph.deployment.service_spec import ServiceSpec
+from cephadm.utils import str_to_datetime, datetime_to_str
from orchestrator import OrchestratorError, HostSpec, OrchestratorEvent
if TYPE_CHECKING:
HOST_CACHE_PREFIX = "host."
SPEC_STORE_PREFIX = "spec."
-DATEFMT = '%Y-%m-%dT%H:%M:%S.%f'
class Inventory:
try:
v = json.loads(v)
spec = ServiceSpec.from_json(v['spec'])
- created = datetime.datetime.strptime(v['created'], DATEFMT)
+ created = str_to_datetime(v['created'])
self.specs[service_name] = spec
self.spec_created[service_name] = created
self.mgr.log.debug('SpecStore: loaded spec for %s' % (
SPEC_STORE_PREFIX + spec.service_name(),
json.dumps({
'spec': spec.to_json(),
- 'created': self.spec_created[spec.service_name()].strftime(DATEFMT),
+ 'created': datetime_to_str(self.spec_created[spec.service_name()]),
}, sort_keys=True),
)
self.mgr.events.for_service(spec, OrchestratorEvent.INFO, 'service was created')
try:
j = json.loads(v)
if 'last_device_update' in j:
- self.last_device_update[host] = datetime.datetime.strptime(
- j['last_device_update'], DATEFMT)
+ self.last_device_update[host] = str_to_datetime(j['last_device_update'])
else:
self.device_refresh_queue.append(host)
# for services, we ignore the persisted last_*_update
for name, d in j.get('daemon_config_deps', {}).items():
self.daemon_config_deps[host][name] = {
'deps': d.get('deps', []),
- 'last_config': datetime.datetime.strptime(
- d['last_config'], DATEFMT),
+ 'last_config': str_to_datetime(d['last_config']),
}
if 'last_host_check' in j:
- self.last_host_check[host] = datetime.datetime.strptime(
- j['last_host_check'], DATEFMT)
+ self.last_host_check[host] = str_to_datetime(j['last_host_check'])
if 'last_etc_ceph_ceph_conf' in j:
- self.last_etc_ceph_ceph_conf[host] = datetime.datetime.strptime(
- j['last_etc_ceph_ceph_conf'], DATEFMT)
+ self.last_etc_ceph_ceph_conf[host] = str_to_datetime(
+ j['last_etc_ceph_ceph_conf'])
self.registry_login_queue.add(host)
self.scheduled_daemon_actions[host] = j.get('scheduled_daemon_actions', {})
'daemon_config_deps': {},
}
if host in self.last_daemon_update:
- j['last_daemon_update'] = self.last_daemon_update[host].strftime(
- DATEFMT)
+ j['last_daemon_update'] = datetime_to_str(self.last_daemon_update[host])
if host in self.last_device_update:
- j['last_device_update'] = self.last_device_update[host].strftime(
- DATEFMT)
+ j['last_device_update'] = datetime_to_str(self.last_device_update[host])
for name, dd in self.daemons[host].items():
j['daemons'][name] = dd.to_json()
for d in self.devices[host]:
for name, depi in self.daemon_config_deps[host].items():
j['daemon_config_deps'][name] = {
'deps': depi.get('deps', []),
- 'last_config': depi['last_config'].strftime(DATEFMT),
+ 'last_config': datetime_to_str(depi['last_config']),
}
if self.osdspec_previews[host]:
j['osdspec_previews'] = self.osdspec_previews[host]
if host in self.last_host_check:
- j['last_host_check'] = self.last_host_check[host].strftime(DATEFMT)
+ j['last_host_check'] = datetime_to_str(self.last_host_check[host])
if host in self.last_etc_ceph_ceph_conf:
- j['last_etc_ceph_ceph_conf'] = self.last_etc_ceph_ceph_conf[host].strftime(DATEFMT)
+ j['last_etc_ceph_ceph_conf'] = datetime_to_str(self.last_etc_ceph_ceph_conf[host])
if self.scheduled_daemon_actions.get(host, {}):
j['scheduled_daemon_actions'] = self.scheduled_daemon_actions[host]
from .inventory import Inventory, SpecStore, HostCache, EventStore
from .upgrade import CEPH_UPGRADE_ORDER, CephadmUpgrade
from .template import TemplateMgr
-from .utils import forall_hosts, CephadmNoImage, cephadmNoImage, is_repo_digest
+from .utils import forall_hosts, CephadmNoImage, cephadmNoImage, str_to_datetime, is_repo_digest
try:
import remoto
ConnectTimeout=30
"""
-DATEFMT = '%Y-%m-%dT%H:%M:%S.%f'
CEPH_DATEFMT = '%Y-%m-%dT%H:%M:%S.%fZ'
CEPH_TYPES = set(CEPH_UPGRADE_ORDER)
for k in ['created', 'started', 'last_configured', 'last_deployed']:
v = d.get(k, None)
if v:
- setattr(sd, k, datetime.datetime.strptime(d[k], DATEFMT))
+ setattr(sd, k, str_to_datetime(d[k]))
sd.daemon_type = d['name'].split('.')[0]
sd.daemon_id = '.'.join(d['name'].split('.')[1:])
sd.hostname = host
})
return config
-
def _invalidate_daemons_and_kick_serve(self, filter_host=None):
if filter_host:
self.cache.invalidate_host_daemons(filter_host)
from datetime import datetime
import orchestrator
-from cephadm.utils import forall_hosts
+from cephadm.utils import forall_hosts, datetime_to_str, str_to_datetime
from orchestrator import OrchestratorError
from mgr_module import MonCommandFailed
from cephadm.services.cephadmservice import CephadmDaemonSpec, CephService
logger = logging.getLogger(__name__)
-DATEFMT = '%Y-%m-%dT%H:%M:%S.%f'
class OSDService(CephService):
return 'n/a' if self.get_pg_count() < 0 else str(self.get_pg_count())
def to_json(self) -> dict:
- out = dict()
+ out: Dict[str, Any] = dict()
out['osd_id'] = self.osd_id
out['started'] = self.started
out['draining'] = self.draining
for k in ['drain_started_at', 'drain_stopped_at', 'drain_done_at', 'process_started_at']:
if getattr(self, k):
- out[k] = getattr(self, k).strftime(DATEFMT)
+ out[k] = datetime_to_str(getattr(self, k))
else:
out[k] = getattr(self, k)
return out
return None
for date_field in ['drain_started_at', 'drain_stopped_at', 'drain_done_at', 'process_started_at']:
if inp.get(date_field):
- inp.update({date_field: datetime.strptime(inp.get(date_field, ''), DATEFMT)})
+ inp.update({date_field: str_to_datetime(inp.get(date_field, ''))})
inp.update({'remove_util': ctx})
if 'nodename' in inp:
hostname = inp.pop('nodename')