@classmethod
def for_daemon_type(cls, daemon_type: str) -> bool:
# TODO: figure out a way to un-special-case osd
- return daemon_type in cls._daemons and daemon_type not in ('osd', 'crash')
+ return daemon_type in cls._daemons and daemon_type not in (
+ 'osd',
+ 'crash',
+ )
def __init__(self, ctx: CephadmContext, ident: DaemonIdentity) -> None:
self.ctx = ctx
def osd_fsid(self) -> Optional[str]:
return self._osd_fsid
+
@register_daemon_form
class Crash(Ceph):
@classmethod
def create(cls, ctx: CephadmContext, ident: DaemonIdentity) -> 'Ceph':
(uid, gid) = extract_uid_gid(ctx)
data_dir_base = f'{ctx.data_dir}/{ident.fsid}'
- makedirs(os.path.join(data_dir_base, 'crash'), uid, gid, DATA_DIR_MODE)
- makedirs(os.path.join(data_dir_base, 'crash', 'posted'), uid, gid,
- DATA_DIR_MODE)
+ makedirs(
+ os.path.join(data_dir_base, 'crash'), uid, gid, DATA_DIR_MODE
+ )
+ makedirs(
+ os.path.join(data_dir_base, 'crash', 'posted'),
+ uid,
+ gid,
+ DATA_DIR_MODE,
+ )
return cls(ctx, ident)
+
@register_daemon_form
class CephExporter(ContainerDaemonForm):
"""Defines a Ceph exporter container"""
}
_os_path_isdir = mock.MagicMock(return_value=True)
monkeypatch.setattr('os.path.isdir', _os_path_isdir)
+ monkeypatch.setattr('cephadmlib.daemons.ceph.extract_uid_gid',
+ lambda ctx: (167, 167))
+ monkeypatch.setattr('os.chown', lambda *args, **kwargs: None)
dtypes = _cephadm.get_supported_daemons()
for daemon_type in dtypes:
if daemon_type == 'agent':