]> git.apps.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
cephadm: move context based getters to context_getters.py
authorJohn Mulligan <jmulligan@redhat.com>
Tue, 26 Sep 2023 17:26:23 +0000 (13:26 -0400)
committerJohn Mulligan <jmulligan@redhat.com>
Tue, 26 Sep 2023 17:31:26 +0000 (13:31 -0400)
Move functions that exist mainly to pull information out of the
CephadmContext in various ways to a new context_getters.py module.

Signed-off-by: John Mulligan <jmulligan@redhat.com>
src/cephadm/cephadm.py
src/cephadm/cephadmlib/context_getters.py [new file with mode: 0644]
src/cephadm/tests/test_cephadm.py

index d3d00e6185155058bcca621ea9f9711df78e700b..4287ab6f547f5b82939808f15dc93577cb93fa20 100755 (executable)
@@ -81,6 +81,16 @@ from cephadmlib.constants import (
     UNIT_DIR,
 )
 from cephadmlib.context import CephadmContext
+from cephadmlib.context_getters import (
+    fetch_configs,
+    fetch_custom_config_files,
+    fetch_meta,
+    fetch_tcp_ports,
+    get_config_and_keyring,
+    get_parm,
+    read_configuration_source,
+    should_log_to_journald,
+)
 from cephadmlib.exceptions import (
     ClusterAlreadyExists,
     Error,
@@ -171,7 +181,6 @@ You can invoke cephadm in two ways:
 
        injected_stdin = '...'
 """
-cached_stdin = None
 
 
 ##################################
@@ -1912,13 +1921,6 @@ def get_legacy_daemon_fsid(ctx, cluster,
     return fsid
 
 
-def should_log_to_journald(ctx: CephadmContext) -> bool:
-    if ctx.log_to_journald is not None:
-        return ctx.log_to_journald
-    return isinstance(ctx.container_engine, Podman) and \
-        ctx.container_engine.version >= CGROUPS_SPLIT_PODMAN_VERSION
-
-
 def get_daemon_args(ctx: CephadmContext, ident: 'DaemonIdentity') -> List[str]:
     r = list()  # type: List[str]
 
@@ -2175,158 +2177,6 @@ def _write_custom_conf_files(
                     f.write(ccf['content'])
 
 
-def get_parm(option: str) -> Dict[str, str]:
-    js = _get_config_json(option)
-    # custom_config_files is a special field that may be in the config
-    # dict. It is used for mounting custom config files into daemon's containers
-    # and should be accessed through the "fetch_custom_config_files" function.
-    # For get_parm we need to discard it.
-    js.pop('custom_config_files', None)
-    return js
-
-
-def _get_config_json(option: str) -> Dict[str, Any]:
-    if not option:
-        return dict()
-
-    global cached_stdin
-    if option == '-':
-        if cached_stdin is not None:
-            j = cached_stdin
-        else:
-            j = sys.stdin.read()
-            cached_stdin = j
-    else:
-        # inline json string
-        if option[0] == '{' and option[-1] == '}':
-            j = option
-        # json file
-        elif os.path.exists(option):
-            with open(option, 'r') as f:
-                j = f.read()
-        else:
-            raise Error('Config file {} not found'.format(option))
-
-    try:
-        js = json.loads(j)
-    except ValueError as e:
-        raise Error('Invalid JSON in {}: {}'.format(option, e))
-    else:
-        return js
-
-
-def fetch_meta(ctx: CephadmContext) -> Dict[str, Any]:
-    """Return a dict containing metadata about a deployment.
-    """
-    meta = getattr(ctx, 'meta_properties', None)
-    if meta is not None:
-        return meta
-    mjson = getattr(ctx, 'meta_json', None)
-    if mjson is not None:
-        meta = json.loads(mjson) or {}
-        ctx.meta_properties = meta
-        return meta
-    return {}
-
-
-def fetch_configs(ctx: CephadmContext) -> Dict[str, str]:
-    """Return a dict containing arbitrary configuration parameters.
-    This function filters out the key 'custom_config_files' which
-    must not be part of a deployment's configuration key-value pairs.
-    To access custom configuration file data, use `fetch_custom_config_files`.
-    """
-    # ctx.config_blobs is *always* a dict. it is created once when
-    # a command is parsed/processed and stored "forever"
-    cfg_blobs = getattr(ctx, 'config_blobs', None)
-    if cfg_blobs:
-        cfg_blobs = dict(cfg_blobs)
-        cfg_blobs.pop('custom_config_files', None)
-        return cfg_blobs
-    # ctx.config_json is the legacy equivalent of config_blobs. it is a
-    # string that either contains json or refers to a file name where
-    # the file contains json.
-    cfg_json = getattr(ctx, 'config_json', None)
-    if cfg_json:
-        jdata = _get_config_json(cfg_json) or {}
-        jdata.pop('custom_config_files', None)
-        return jdata
-    return {}
-
-
-def fetch_custom_config_files(ctx: CephadmContext) -> List[Dict[str, Any]]:
-    """Return a list containing dicts that can be used to populate
-    custom configuration files for containers.
-    """
-    # NOTE: this function works like the opposite of fetch_configs.
-    # instead of filtering out custom_config_files, it returns only
-    # the content in that key.
-    cfg_blobs = getattr(ctx, 'config_blobs', None)
-    if cfg_blobs:
-        return cfg_blobs.get('custom_config_files', [])
-    cfg_json = getattr(ctx, 'config_json', None)
-    if cfg_json:
-        jdata = _get_config_json(cfg_json)
-        return jdata.get('custom_config_files', [])
-    return []
-
-
-def fetch_tcp_ports(ctx: CephadmContext) -> List[EndPoint]:
-    """Return a list of Endpoints, which have a port and ip attribute
-    """
-    ports = getattr(ctx, 'tcp_ports', None)
-    if ports is None:
-        ports = []
-    if isinstance(ports, str):
-        ports = list(map(int, ports.split()))
-    port_ips: Dict[str, str] = {}
-    port_ips_attr: Union[str, Dict[str, str], None] = getattr(ctx, 'port_ips', None)
-    if isinstance(port_ips_attr, str):
-        port_ips = json.loads(port_ips_attr)
-    elif port_ips_attr is not None:
-        # if it's not None or a str, assume it's already the dict we want
-        port_ips = port_ips_attr
-
-    endpoints: List[EndPoint] = []
-    for port in ports:
-        if str(port) in port_ips:
-            endpoints.append(EndPoint(port_ips[str(port)], port))
-        else:
-            endpoints.append(EndPoint('0.0.0.0', port))
-
-    return endpoints
-
-
-def get_config_and_keyring(ctx):
-    # type: (CephadmContext) -> Tuple[Optional[str], Optional[str]]
-    config = None
-    keyring = None
-
-    d = fetch_configs(ctx)
-    if d:
-        config = d.get('config')
-        keyring = d.get('keyring')
-        if config and keyring:
-            return config, keyring
-
-    if 'config' in ctx and ctx.config:
-        try:
-            with open(ctx.config, 'r') as f:
-                config = f.read()
-        except FileNotFoundError as e:
-            raise Error(e)
-
-    if 'key' in ctx and ctx.key:
-        keyring = '[%s]\n\tkey = %s\n' % (ctx.name, ctx.key)
-    elif 'keyring' in ctx and ctx.keyring:
-        try:
-            with open(ctx.keyring, 'r') as f:
-                keyring = f.read()
-        except FileNotFoundError as e:
-            raise Error(e)
-
-    return config, keyring
-
-
 def get_container_binds(
     ctx: CephadmContext, ident: 'DaemonIdentity'
 ) -> List[List[str]]:
@@ -5403,19 +5253,6 @@ def command_deploy(ctx):
     _common_deploy(ctx)
 
 
-def read_configuration_source(ctx: CephadmContext) -> Dict[str, Any]:
-    """Read a JSON configuration based on the `ctx.source` value."""
-    source = '-'
-    if 'source' in ctx and ctx.source:
-        source = ctx.source
-    if source == '-':
-        config_data = json.load(sys.stdin)
-    else:
-        with open(source, 'rb') as fh:
-            config_data = json.load(fh)
-    return config_data
-
-
 def apply_deploy_config_to_ctx(
     config_data: Dict[str, Any],
     ctx: CephadmContext,
diff --git a/src/cephadm/cephadmlib/context_getters.py b/src/cephadm/cephadmlib/context_getters.py
new file mode 100644 (file)
index 0000000..e40319d
--- /dev/null
@@ -0,0 +1,188 @@
+# context_getters.py - extract configuration/values from context objects
+
+import json
+import sys
+import os
+
+from typing import Any, Dict, List, Optional, Tuple, Union
+
+from .constants import CGROUPS_SPLIT_PODMAN_VERSION
+from .container_engines import Podman
+from .context import CephadmContext
+from .exceptions import Error
+from .net_utils import EndPoint
+
+
+cached_stdin = None
+
+
+def _get_config_json(option: str) -> Dict[str, Any]:
+    if not option:
+        return dict()
+
+    global cached_stdin
+    if option == '-':
+        if cached_stdin is not None:
+            j = cached_stdin
+        else:
+            j = sys.stdin.read()
+            cached_stdin = j
+    else:
+        # inline json string
+        if option[0] == '{' and option[-1] == '}':
+            j = option
+        # json file
+        elif os.path.exists(option):
+            with open(option, 'r') as f:
+                j = f.read()
+        else:
+            raise Error('Config file {} not found'.format(option))
+
+    try:
+        js = json.loads(j)
+    except ValueError as e:
+        raise Error('Invalid JSON in {}: {}'.format(option, e))
+    else:
+        return js
+
+
+def get_parm(option: str) -> Dict[str, str]:
+    js = _get_config_json(option)
+    # custom_config_files is a special field that may be in the config
+    # dict. It is used for mounting custom config files into daemon's containers
+    # and should be accessed through the "fetch_custom_config_files" function.
+    # For get_parm we need to discard it.
+    js.pop('custom_config_files', None)
+    return js
+
+
+def fetch_meta(ctx: CephadmContext) -> Dict[str, Any]:
+    """Return a dict containing metadata about a deployment.
+    """
+    meta = getattr(ctx, 'meta_properties', None)
+    if meta is not None:
+        return meta
+    mjson = getattr(ctx, 'meta_json', None)
+    if mjson is not None:
+        meta = json.loads(mjson) or {}
+        ctx.meta_properties = meta
+        return meta
+    return {}
+
+
+def fetch_configs(ctx: CephadmContext) -> Dict[str, str]:
+    """Return a dict containing arbitrary configuration parameters.
+    This function filters out the key 'custom_config_files' which
+    must not be part of a deployment's configuration key-value pairs.
+    To access custom configuration file data, use `fetch_custom_config_files`.
+    """
+    # ctx.config_blobs is *always* a dict. it is created once when
+    # a command is parsed/processed and stored "forever"
+    cfg_blobs = getattr(ctx, 'config_blobs', None)
+    if cfg_blobs:
+        cfg_blobs = dict(cfg_blobs)
+        cfg_blobs.pop('custom_config_files', None)
+        return cfg_blobs
+    # ctx.config_json is the legacy equivalent of config_blobs. it is a
+    # string that either contains json or refers to a file name where
+    # the file contains json.
+    cfg_json = getattr(ctx, 'config_json', None)
+    if cfg_json:
+        jdata = _get_config_json(cfg_json) or {}
+        jdata.pop('custom_config_files', None)
+        return jdata
+    return {}
+
+
+def fetch_custom_config_files(ctx: CephadmContext) -> List[Dict[str, Any]]:
+    """Return a list containing dicts that can be used to populate
+    custom configuration files for containers.
+    """
+    # NOTE: this function works like the opposite of fetch_configs.
+    # instead of filtering out custom_config_files, it returns only
+    # the content in that key.
+    cfg_blobs = getattr(ctx, 'config_blobs', None)
+    if cfg_blobs:
+        return cfg_blobs.get('custom_config_files', [])
+    cfg_json = getattr(ctx, 'config_json', None)
+    if cfg_json:
+        jdata = _get_config_json(cfg_json)
+        return jdata.get('custom_config_files', [])
+    return []
+
+
+def fetch_tcp_ports(ctx: CephadmContext) -> List[EndPoint]:
+    """Return a list of Endpoints, which have a port and ip attribute
+    """
+    ports = getattr(ctx, 'tcp_ports', None)
+    if ports is None:
+        ports = []
+    if isinstance(ports, str):
+        ports = list(map(int, ports.split()))
+    port_ips: Dict[str, str] = {}
+    port_ips_attr: Union[str, Dict[str, str], None] = getattr(ctx, 'port_ips', None)
+    if isinstance(port_ips_attr, str):
+        port_ips = json.loads(port_ips_attr)
+    elif port_ips_attr is not None:
+        # if it's not None or a str, assume it's already the dict we want
+        port_ips = port_ips_attr
+
+    endpoints: List[EndPoint] = []
+    for port in ports:
+        if str(port) in port_ips:
+            endpoints.append(EndPoint(port_ips[str(port)], port))
+        else:
+            endpoints.append(EndPoint('0.0.0.0', port))
+
+    return endpoints
+
+
+def get_config_and_keyring(ctx):
+    # type: (CephadmContext) -> Tuple[Optional[str], Optional[str]]
+    config = None
+    keyring = None
+
+    d = fetch_configs(ctx)
+    if d:
+        config = d.get('config')
+        keyring = d.get('keyring')
+        if config and keyring:
+            return config, keyring
+
+    if 'config' in ctx and ctx.config:
+        try:
+            with open(ctx.config, 'r') as f:
+                config = f.read()
+        except FileNotFoundError as e:
+            raise Error(e)
+
+    if 'key' in ctx and ctx.key:
+        keyring = '[%s]\n\tkey = %s\n' % (ctx.name, ctx.key)
+    elif 'keyring' in ctx and ctx.keyring:
+        try:
+            with open(ctx.keyring, 'r') as f:
+                keyring = f.read()
+        except FileNotFoundError as e:
+            raise Error(e)
+
+    return config, keyring
+
+
+def read_configuration_source(ctx: CephadmContext) -> Dict[str, Any]:
+    """Read a JSON configuration based on the `ctx.source` value."""
+    source = '-'
+    if 'source' in ctx and ctx.source:
+        source = ctx.source
+    if source == '-':
+        config_data = json.load(sys.stdin)
+    else:
+        with open(source, 'rb') as fh:
+            config_data = json.load(fh)
+    return config_data
+
+
+def should_log_to_journald(ctx: CephadmContext) -> bool:
+    if ctx.log_to_journald is not None:
+        return ctx.log_to_journald
+    return isinstance(ctx.container_engine, Podman) and \
+        ctx.container_engine.version >= CGROUPS_SPLIT_PODMAN_VERSION
index 909185a5a4f67b6a54db977aae637f703d2307a3..565111903ea1d538ae03115e40d4c3d8c9b2ee7c 100644 (file)
@@ -367,19 +367,21 @@ class TestCephAdm(object):
     @mock.patch('cephadm.logger')
     @mock.patch('cephadm.FileLock')
     @mock.patch('cephadm.deploy_daemon')
-    @mock.patch('cephadm.fetch_configs')
     @mock.patch('cephadm.make_var_run')
     @mock.patch('cephadm.migrate_sysctl_dir')
     @mock.patch('cephadm.check_unit', lambda *args, **kwargs: (None, 'running', None))
     @mock.patch('cephadm.get_unit_name', lambda *args, **kwargs: 'mon-unit-name')
     @mock.patch('cephadm.extract_uid_gid', lambda *args, **kwargs: (0, 0))
     @mock.patch('cephadm.get_deployment_container')
-    @mock.patch('cephadm.read_configuration_source', lambda c: {})
     @mock.patch('cephadm.apply_deploy_config_to_ctx', lambda d, c: None)
-    def test_mon_crush_location(self, _get_deployment_container, _migrate_sysctl, _make_var_run, _fetch_configs, _deploy_daemon, _file_lock, _logger):
+    def test_mon_crush_location(self, _get_deployment_container, _migrate_sysctl, _make_var_run, _deploy_daemon, _file_lock, _logger, monkeypatch):
         """
         test that crush location for mon is set if it is included in config_json
         """
+        _fetch_configs = mock.MagicMock()
+        monkeypatch.setattr('cephadmlib.context_getters.fetch_configs', _fetch_configs)
+        monkeypatch.setattr('cephadm.fetch_configs', _fetch_configs)
+        monkeypatch.setattr('cephadm.read_configuration_source', lambda c: {})
 
         ctx = _cephadm.CephadmContext()
         ctx.name = 'mon.test'