def get_active_daemon(self, daemon_descrs: List[DaemonDescription]) -> DaemonDescription:
raise NotImplementedError()
- def _inventory_get_addr(self, hostname: str):
+ def _inventory_get_addr(self, hostname: str) -> str:
"""Get a host's address with its hostname."""
return self.mgr.inventory.get_addr(hostname)
'who': 'mon',
'key': 'public_network',
})
- network = network.strip() # type: ignore
+ network = network.strip() if network else network
if not network:
raise OrchestratorError('Must set public_network config option or specify a CIDR network, ceph addrvec, or plain IP')
if '/' not in network:
return self.mgr._create_daemon(daemon_spec)
- def _check_safe_to_destroy(self, mon_id):
- # type: (str) -> None
+ def _check_safe_to_destroy(self, mon_id: str) -> None:
ret, out, err = self.mgr.check_mon_command({
'prefix': 'quorum_status',
})
self.daemon_id = daemon_id
self.spec = spec
- def get_daemon_name(self):
- # type: () -> str
+ def get_daemon_name(self) -> str:
return '%s.%s' % (self.spec.service_type, self.daemon_id)
- def get_rados_user(self):
- # type: () -> str
+ def get_rados_user(self) -> str:
return '%s.%s' % (self.spec.service_type, self.daemon_id)
- def get_keyring_entity(self):
- # type: () -> str
+ def get_keyring_entity(self) -> str:
return utils.name_to_config_section(self.get_rados_user())
- def get_or_create_keyring(self, entity=None):
- # type: (Optional[str]) -> str
+ def get_or_create_keyring(self, entity: Optional[str] = None) -> str:
if not entity:
entity = self.get_keyring_entity()
% (entity, ret, err))
return keyring
- def update_keyring_caps(self, entity=None):
- # type: (Optional[str]) -> None
+ def update_keyring_caps(self, entity: Optional[str] = None) -> None:
if not entity:
entity = self.get_keyring_entity()
'Unable to update keyring caps %s: %s %s' \
% (entity, ret, err))
- def create_rados_config_obj(self, clobber=False):
- # type: (Optional[bool]) -> None
+ def create_rados_config_obj(self, clobber: Optional[bool] = False) -> None:
obj = self.spec.rados_config_name()
with self.mgr.rados.open_ioctx(self.spec.pool) as ioctx:
logger.info('Creating rados config object: %s' % obj)
ioctx.write_full(obj, ''.encode('utf-8'))
- def get_ganesha_conf(self):
- # type: () -> str
+ def get_ganesha_conf(self) -> str:
context = dict(user=self.get_rados_user(),
nodeid=self.get_daemon_name(),
pool=self.spec.pool,
url=self.spec.rados_config_location())
return self.mgr.template.render('services/nfs/ganesha.conf.j2', context)
- def get_cephadm_config(self):
- # type: () -> Dict
+ def get_cephadm_config(self) -> Dict[str, Any]:
config = {'pool' : self.spec.pool} # type: Dict
if self.spec.namespace:
config['namespace'] = self.spec.namespace