on_complete=None, # type: Optional[Callable]
name=None, # type: Optional[str]
):
- self._on_complete = on_complete
+ self._on_complete_ = on_complete
self._name = name
self._next_promise = None # type: Optional[_Promise]
# T instead of (T -> r) -> r. Therefore we need to store the first promise here.
self._first_promise = _first_promise or self # type: '_Promise'
+ @property
+ def _on_complete(self):
+ # type: () -> Optional[Callable]
+ # https://github.com/python/mypy/issues/4125
+ return self._on_complete_
+
+ @_on_complete.setter
+ def _on_complete(self, val):
+ # type: (Optional[Callable]) -> None
+ self._on_complete_ = val
+
+
def __repr__(self):
name = self._name or getattr(self._on_complete, '__name__', '??') if self._on_complete else 'None'
val = repr(self._value) if self._value is not self.NO_RESULT else 'NA'
class AsyncCompletion(orchestrator.Completion):
def __init__(self,
- _first_promise=None, # type: Optional["Completion"]
+ _first_promise=None, # type: Optional[orchestrator.Completion]
value=orchestrator._Promise.NO_RESULT, # type: Any
- on_complete=None, # type: Optional[Callable],
- name=None, # type: Optional[str],
+ on_complete=None, # type: Optional[Callable]
+ name=None, # type: Optional[str]
many=False, # type: bool
):
@property
def _progress_reference(self):
- if hasattr(self.__on_complete, 'progress_id'):
- return self.__on_complete
+ # type: () -> Optional[orchestrator.ProgressReference]
+ if hasattr(self._on_complete_, 'progress_id'): # type: ignore
+ return self._on_complete_ # type: ignore
return None
@property
def _on_complete(self):
# type: () -> Optional[Callable]
- if self.__on_complete is None:
+ if self._on_complete_ is None:
return None
def callback(result):
try:
- self.__on_complete = None
+ self._on_complete_ = None
self._finalize(result)
except Exception as e:
self.fail(e)
self.fail(e)
if six.PY3:
- _callback = self.__on_complete
+ _callback = self._on_complete_
else:
def _callback(*args, **kwargs):
# Py2 only: _worker_pool doesn't call error_callback
try:
- return self.__on_complete(*args, **kwargs)
+ return self._on_complete_(*args, **kwargs)
except Exception as e:
self.fail(e)
def run(value):
+ assert SSHOrchestrator.instance
if self.many:
if not value:
logger.info('calling map_async without values')
@_on_complete.setter
def _on_complete(self, inner):
# type: (Callable) -> None
- self.__on_complete = inner
+ self._on_complete_ = inner
def ssh_completion(cls=AsyncCompletion, **c_kwargs):
instance = None
- NATIVE_OPTIONS = []
+ NATIVE_OPTIONS = [] # type: List[Any]
MODULE_OPTIONS = [
{
'name': 'ssh_config_file',
self.log.error('ssh: shutdown')
self._worker_pool.close()
self._worker_pool.join()
- self._worker_pool = None
def config_notify(self):
"""
"""
for opt in self.MODULE_OPTIONS:
setattr(self,
- opt['name'],
- self.get_module_option(opt['name']) or opt['default'])
+ opt['name'], # type: ignore
+ self.get_module_option(opt['name']) or opt['default']) # type: ignore
self.log.debug(' mgr option %s = %s',
- opt['name'], getattr(self, opt['name']))
+ opt['name'], getattr(self, opt['name'])) # type: ignore
for opt in self.NATIVE_OPTIONS:
setattr(self,
- opt,
+ opt, # type: ignore
self.get_ceph_option(opt))
- self.log.debug(' native option %s = %s', opt, getattr(self, opt))
+ self.log.debug(' native option %s = %s', opt, getattr(self, opt)) # type: ignore
def get_unique_name(self, existing, prefix=None, forcename=None):
"""
Generate a unique random service name
"""
if forcename:
- if len([d for d in existing if d.service_instance == name]):
- raise RuntimeError('specified name %s already in use', name)
+ if len([d for d in existing if d.service_instance == forcename]):
+ raise RuntimeError('specified name %s already in use', forcename)
return forcename
while True:
self.set_store('inventory', json.dumps(self.inventory))
def _reconfig_ssh(self):
- temp_files = []
- ssh_options = []
+ temp_files = [] # type: list
+ ssh_options = [] # type: List[str]
# ssh_config
ssh_config_fname = self.ssh_config_file
self._temp_files = temp_files
if ssh_options:
- self._ssh_options = ' '.join(ssh_options)
+ self._ssh_options = ' '.join(ssh_options) # type: Optional[str]
else:
self._ssh_options = None
self.log.info('ssh_options %s' % ssh_options)
'--format', 'json',
])
self.log.debug('code %s out %s' % (code, out))
- j = json.loads('\n'.join(out))
+ osds_elems = json.loads('\n'.join(out))
fsid = self._cluster_fsid
- for osd_id, osds in j.items():
+ for osd_id, osds in osds_elems.items():
for osd in osds:
if osd['tags']['ceph.cluster_fsid'] != fsid:
self.log.debug('mismatched fsid, skipping %s' % osd)