def fetch_meta(ctx: CephadmContext) -> Dict[str, Any]:
- """Return a dict containing metadata about a deployment.
- """
+ """Return a dict containing metadata about a deployment."""
meta = getattr(ctx, 'meta_properties', None)
if meta is not None:
return meta
def fetch_endpoints(ctx: CephadmContext) -> List[EndPoint]:
- """Return a list of Endpoints, which have a port and ip attribute
- """
+ """Return a list of Endpoints, which have a port and ip attribute"""
ports = getattr(ctx, 'tcp_ports', None)
if ports is None:
ports = []
if isinstance(ports, str):
ports = list(map(int, ports.split()))
port_ips: Dict[str, str] = {}
- port_ips_attr: Union[str, Dict[str, str], None] = getattr(ctx, 'port_ips', None)
+ port_ips_attr: Union[str, Dict[str, str], None] = getattr(
+ ctx, 'port_ips', None
+ )
if isinstance(port_ips_attr, str):
port_ips = json.loads(port_ips_attr)
elif port_ips_attr is not None:
def should_log_to_journald(ctx: CephadmContext) -> bool:
if ctx.log_to_journald is not None:
return ctx.log_to_journald
- return isinstance(ctx.container_engine, Podman) and \
- ctx.container_engine.version >= CGROUPS_SPLIT_PODMAN_VERSION
+ return (
+ isinstance(ctx.container_engine, Podman)
+ and ctx.container_engine.version >= CGROUPS_SPLIT_PODMAN_VERSION
+ )