Podman,
check_container_engine,
find_container_engine,
+ registry_login,
)
from cephadmlib.data_utils import (
dict_get,
return 0
-def registry_login(ctx: CephadmContext, url: Optional[str], username: Optional[str], password: Optional[str]) -> None:
- try:
- engine = ctx.container_engine
- cmd = [engine.path, 'login',
- '-u', username, '-p', password,
- url]
- if isinstance(engine, Podman):
- cmd.append('--authfile=/etc/ceph/podman-auth.json')
- out, _, _ = call_throws(ctx, cmd)
- if isinstance(engine, Podman):
- os.chmod('/etc/ceph/podman-auth.json', DEFAULT_MODE)
- except Exception:
- raise Error('Failed to login to custom registry @ %s as %s with given password' % (ctx.registry_url, ctx.registry_username))
-
##################################
# container_engines.py - container engine types and selection funcs
+import os
from typing import Tuple, List, Optional
from .call_wrappers import call_throws, CallVerbosity
from .context import CephadmContext
from .container_engine_base import ContainerEngine
-from .constants import MIN_PODMAN_VERSION
+from .constants import DEFAULT_MODE, MIN_PODMAN_VERSION
from .exceptions import Error
return to_int(val[0:-1], org_e or e)
return tuple(map(to_int, version_str.split('.')))
+
+
+def registry_login(ctx: CephadmContext, url: Optional[str], username: Optional[str], password: Optional[str]) -> None:
+ try:
+ engine = ctx.container_engine
+ cmd = [engine.path, 'login',
+ '-u', username, '-p', password,
+ url]
+ if isinstance(engine, Podman):
+ cmd.append('--authfile=/etc/ceph/podman-auth.json')
+ out, _, _ = call_throws(ctx, cmd)
+ if isinstance(engine, Podman):
+ os.chmod('/etc/ceph/podman-auth.json', DEFAULT_MODE)
+ except Exception:
+ raise Error('Failed to login to custom registry @ %s as %s with given password' % (ctx.registry_url, ctx.registry_username))
with pytest.raises(FileNotFoundError):
open(os.path.join(_cephadm.DATA_DIR, 'fsid', 'custom_config_files', 'mon.host1', 'no-content.conf'), 'r')
- @mock.patch('cephadm.call_throws')
+ @mock.patch('cephadmlib.container_engines.call_throws')
@mock.patch('cephadm.get_parm')
@mock.patch('cephadm.logger')
def test_registry_login(self, _logger, _get_parm, _call_throws):