import logging
from typing import TYPE_CHECKING, Dict, List, Iterator, Optional, Any, Tuple, Set
-import six
-
import orchestrator
from ceph.deployment import inventory
from ceph.deployment.service_spec import ServiceSpec
def load(self):
# type: () -> None
- for k, v in six.iteritems(self.mgr.get_store_prefix(SPEC_STORE_PREFIX)):
+ for k, v in self.mgr.get_store_prefix(SPEC_STORE_PREFIX).items():
service_name = k[len(SPEC_STORE_PREFIX):]
try:
v = json.loads(v)
def load(self):
# type: () -> None
- for k, v in six.iteritems(self.mgr.get_store_prefix(HOST_CACHE_PREFIX)):
+ for k, v in self.mgr.get_store_prefix(HOST_CACHE_PREFIX).items():
host = k[len(HOST_CACHE_PREFIX):]
if host not in self.mgr.inventory:
self.mgr.log.warning('removing stray HostCache host record %s' % (