"""
return not self.is_persistent
+
def _hide_in_features(f):
f._hide_in_features = True
return f
+
class Orchestrator(object):
"""
Calls in this class may do long running remote operations, with time
... except (OrchestratorError, NotImplementedError):
... ...
-
:returns: Dict of API method names to ``{'available': True or False}``
"""
module = self.__class__
"""
raise NotImplementedError()
+
class UpgradeSpec(object):
# Request to orchestrator to initiate an upgrade to a particular
# version of Ceph
self[key] = OutdatableData(self[key].data,
datetime.datetime.fromtimestamp(0))
+
class OutdatablePersistentDict(OutdatableDictMixin, PersistentStoreDict):
pass
+
class OutdatableDict(OutdatableDictMixin, dict):
pass
_read_cli = _cli_command('r')
_write_cli = _cli_command('rw')
+
class OrchestratorCli(orchestrator.OrchestratorClientMixin, MgrModule):
MODULE_OPTIONS = [
{'name': 'orchestrator'}
def _get_device_locations(self, dev_id):
# type: (str) -> List[orchestrator.DeviceLightLoc]
locs = [d['location'] for d in self.get('devices')['devices'] if d['devid'] == dev_id]
- return [orchestrator.DeviceLightLoc(**l) for l in sum(locs, [])]
+ return [orchestrator.DeviceLightLoc(**l) for l in sum(locs, [])]
@_read_cli(prefix='device ls-lights',
desc='List currently active device indicator lights')
remoto = None
remoto_import_error = str(e)
+try:
+ from typing import List
+except ImportError:
+ pass
+
logger = logging.getLogger(__name__)
DEFAULT_SSH_CONFIG = ('Host *\n'
# - bring over some of the protections from ceph-deploy that guard against
# multiple bootstrapping / initialization
+
class SSHCompletionmMixin(object):
def __init__(self, result):
if isinstance(result, multiprocessing.pool.AsyncResult):
def result(self):
return list(map(lambda r: r.get(), self._result))
+
class SSHReadCompletion(SSHCompletionmMixin, orchestrator.ReadCompletion):
@property
def is_complete(self):
return True
return False
+
class SSHWriteCompletionReady(SSHWriteCompletion):
def __init__(self, result):
orchestrator.WriteCompletion.__init__(self)
def is_errored(self):
return False
+
def log_exceptions(f):
if six.PY3:
return f
self.log.debug('_service_action code %s out %s' % (code, out))
return "{} {} from host '{}'".format(action, name, host)
-
def get_inventory(self, node_filter=None, refresh=False):
"""
Return the storage inventory of nodes matching the given filter.
def run(host, host_info):
# type: (str, orchestrator.OutdatableData) -> orchestrator.InventoryNode
-
if host_info.outdated(self.inventory_cache_timeout) or refresh:
self.log.info("refresh stale inventory for '{}'".format(host))
out, code = self._run_ceph_daemon(
@log_exceptions
def blink_device_light(self, ident_fault, on, locs):
- def blink(host, dev, ident_fault, on):
+ # type: (str, bool, List[orchestrator.DeviceLightLoc]) -> SSHWriteCompletion
+
+ def blink(host, dev, ident_fault_, on_):
+ # type: (str, str, str, bool) -> str
cmd = [
'lsmcli',
'local-disk-%s-led-%s' % (
- ident_fault,
- 'on' if on else 'off'),
+ ident_fault_,
+ 'on' if on_ else 'off'),
'--path', '/dev/' + dev,
]
out, code = self._run_ceph_daemon(host, 'osd', 'shell', ['--'] + cmd,
if code:
raise RuntimeError(
'Unable to affect %s light for %s:%s. Command: %s' % (
- ident_fault, host, dev, ' '.join(cmd)))
+ ident_fault_, host, dev, ' '.join(cmd)))
return "Set %s light for %s:%s %s" % (
- ident_fault, host, dev, 'on' if on else 'off')
+ ident_fault_, host, dev, 'on' if on_ else 'off')
results = []
for loc in locs: