"password":"REGISTRY_PASSWORD"
}
+For multiple registry logins, refer to the format below::
+
+ {
+ "registry_credentials": [
+ {
+ "url": "REGISTRY_URL1",
+ "username": "REGISTRY_USERNAME1",
+ "password": "REGISTRY_PASSWORD1"
+ },
+ {
+ "url": "REGISTRY_URL2",
+ "username": "REGISTRY_USERNAME2",
+ "password": "REGISTRY_PASSWORD2"
+ }
+ ]
+ }
+
and turn it in with command::
cephadm registry-login --registry-json [JSON FILE]
if ctx.registry_json:
logger.info('Pulling custom registry login info from %s.' % ctx.registry_json)
d = get_parm(ctx.registry_json)
- if d.get('url') and d.get('username') and d.get('password'):
- ctx.registry_url = d.get('url')
- ctx.registry_username = d.get('username')
- ctx.registry_password = d.get('password')
- registry_login(ctx, ctx.registry_url, ctx.registry_username, ctx.registry_password)
- else:
- raise Error('json provided for custom registry login did not include all necessary fields. '
- 'Please setup json file as\n'
- '{\n'
- ' "url": "REGISTRY_URL",\n'
- ' "username": "REGISTRY_USERNAME",\n'
- ' "password": "REGISTRY_PASSWORD"\n'
- '}\n')
+ # to support multiple container registries, the command will now accept a list
+ # of dictionaries. For backward compatibility, it will first check for the presence
+ # of the registry_credentials key. If the key is not found, it will fall back to
+ # parsing the old JSON format.
+ example_multi_registry = {
+ 'registry_credentials': [
+ {
+ 'url': 'REGISTRY_URL1',
+ 'username': 'REGISTRY_USERNAME1',
+ 'password': 'REGISTRY_PASSWORD1'
+ },
+ {
+ 'url': 'REGISTRY_URL2',
+ 'username': 'REGISTRY_USERNAME2',
+ 'password': 'REGISTRY_PASSWORD2'
+ }
+ ]
+ }
+ registry_creds = d.get('registry_credentials')
+ if not registry_creds:
+ registry_creds = [d]
+ for d in registry_creds:
+ if d.get('url') and d.get('username') and d.get('password'):
+ ctx.registry_url = d.get('url')
+ ctx.registry_username = d.get('username')
+ ctx.registry_password = d.get('password')
+ registry_login(ctx, ctx.registry_url, ctx.registry_username, ctx.registry_password)
+ else:
+ raise Error(
+ 'json provided for custom registry login did not include all necessary fields. '
+ 'Please setup json file as\n'
+ '{\n'
+ ' "url": "REGISTRY_URL",\n'
+ ' "username": "REGISTRY_USERNAME",\n'
+ ' "password": "REGISTRY_PASSWORD"\n'
+ '}\n'
+ 'or as below for multiple registry login\n'
+ f'{json.dumps(example_multi_registry, indent=4)}'
+ )
elif ctx.registry_url and ctx.registry_username and ctx.registry_password:
registry_login(ctx, ctx.registry_url, ctx.registry_username, ctx.registry_password)
else:
return js
-def get_parm(option: str) -> Dict[str, str]:
+def get_parm(option: str) -> Dict[str, Any]:
js = _get_config_json(option)
# custom_config_files is a special field that may be in the config
# dict. It is used for mounting custom config files into daemon's containers
['registry-login', '--registry-json', 'sample-json'])
with pytest.raises(Exception) as e:
assert _cephadm.command_registry_login(ctx)
- assert str(e.value) == ("json provided for custom registry login did not include all necessary fields. "
+ assert str(e.value).startswith("json provided for custom registry login did not include all necessary fields. "
"Please setup json file as\n"
"{\n"
" \"url\": \"REGISTRY_URL\",\n"
return -errno.EINVAL, "", ("Invalid arguments. Please provide arguments <url> <username> <password> "
"or -i <login credentials json file>")
elif (url and username and password):
- registry_json = {'url': url, 'username': username, 'password': password}
+ registry_json = {'registry_credentials': [{'url': url, 'username': username, 'password': password}]}
else:
assert isinstance(inbuf, str)
registry_json = json.loads(inbuf)
- if "url" not in registry_json or "username" not in registry_json or "password" not in registry_json:
- return -errno.EINVAL, "", ("json provided for custom registry login did not include all necessary fields. "
- "Please setup json file as\n"
- "{\n"
- " \"url\": \"REGISTRY_URL\",\n"
- " \"username\": \"REGISTRY_USERNAME\",\n"
- " \"password\": \"REGISTRY_PASSWORD\"\n"
- "}\n")
+ registry_creds = registry_json.get('registry_credentials')
+ if not registry_creds:
+ if isinstance(registry_json, dict) and all(
+ isinstance(k, str) and isinstance(v, str) for k, v in registry_json.items()
+ ):
+ registry_creds = [registry_json] # type: ignore[list-item]
+ registry_json = {'registry_credentials': registry_creds}
+ else:
+ return -errno.EINVAL, "", "Invalid login credentials json file"
+ for d in registry_creds:
+ if "url" not in d or "username" not in d or "password" not in d:
+ return -errno.EINVAL, "", ("json provided for custom registry login did not include all necessary fields. "
+ "Please setup json file as\n"
+ "{\n"
+ " \"url\": \"REGISTRY_URL\",\n"
+ " \"username\": \"REGISTRY_USERNAME\",\n"
+ " \"password\": \"REGISTRY_PASSWORD\"\n"
+ "}\n")
# verify login info works by attempting login on random host
host = None
return r
# function responsible for logging single host into custom registry
- async def _registry_login(self, host: str, registry_json: Dict[str, str]) -> Optional[str]:
+ async def _registry_login(self, host: str, registry_json: Dict[str, list[Dict[str, str]]]) -> Optional[str]:
self.log.debug(
- f"Attempting to log host {host} into custom registry @ {registry_json['url']}")
+ f"Attempting to log host {host} into custom registries")
# want to pass info over stdin rather than through normal list of args
out, err, code = await self._run_cephadm(
host, 'mon', 'registry-login',
['--registry-json', '-'], stdin=json.dumps(registry_json), error_ok=True)
if code:
- return f"Host {host} failed to login to {registry_json['url']} as {registry_json['username']} with given password"
+ return f"Host {host} failed to login to all registries"
return None
async def _deploy_cephadm_binary(self, host: str, addr: Optional[str] = None) -> None:
@mock.patch("cephadm.serve.CephadmServe._run_cephadm")
def test_registry_login(self, _run_cephadm, cephadm_module: CephadmOrchestrator):
def check_registry_credentials(url, username, password):
- assert json.loads(cephadm_module.get_store('registry_credentials')) == {
- 'url': url, 'username': username, 'password': password}
+ assert json.loads(cephadm_module.get_store('registry_credentials')) == {'registry_credentials': [{'url': url, 'username': username, 'password': password}]}
_run_cephadm.side_effect = async_side_effect(('{}', '', 0))
with with_host(cephadm_module, 'test'):
# test bad login where args are valid but login command fails
_run_cephadm.side_effect = async_side_effect(('{}', 'error', 1))
code, out, err = cephadm_module.registry_login('fail-url', 'fail-user', 'fail-password')
- assert err == 'Host test failed to login to fail-url as fail-user with given password'
+ assert err == 'Host test failed to login to all registries'
check_registry_credentials('json-url', 'json-user', 'json-pass')
@mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm(json.dumps({