To create a user with the administrator role you can use the following
commands::
- $ ceph dashboard ac-user-create <username> <password> administrator
+ $ ceph dashboard ac-user-create <username> -i <file-containing-password> administrator
Account Lock-out
^^^^^^^^^^^^^^^^
Finally, provide the credentials to the dashboard::
- $ ceph dashboard set-rgw-api-access-key <access_key>
- $ ceph dashboard set-rgw-api-secret-key <secret_key>
+ $ ceph dashboard set-rgw-api-access-key -i <file-containing-access-key>
+ $ ceph dashboard set-rgw-api-secret-key -i <file-containing-secret-key>
In a typical default configuration with a single RGW endpoint, this is all you
have to do to get the Object Gateway management functionality working. The
The available iSCSI gateways must be defined using the following commands::
$ ceph dashboard iscsi-gateway-list
- $ ceph dashboard iscsi-gateway-add <scheme>://<username>:<password>@<host>[:port]
+ $ # Gateway URL format for a new gateway: <scheme>://<username>:<password>@<host>[:port]
+ $ ceph dashboard iscsi-gateway-add -i <file-containing-gateway-url> [<gateway_name>]
$ ceph dashboard iscsi-gateway-rm <gateway_name>
- *Create User*::
- $ ceph dashboard ac-user-create [--enabled] [--force-password] [--pwd_update_required] <username> [<password>] [<rolename>] [<name>] [<email>] [<pwd_expiration_date>]
+ $ ceph dashboard ac-user-create [--enabled] [--force-password] [--pwd_update_required] <username> -i <file-containing-password> [<rolename>] [<name>] [<email>] [<pwd_expiration_date>]
To bypass the password policy checks use the `force-password` option.
Use the option `pwd_update_required` so that a newly created user has
- *Change Password*::
- $ ceph dashboard ac-user-set-password [--force-password] <username> <password>
+ $ ceph dashboard ac-user-set-password [--force-password] <username> -i <file-containing-password>
- *Change Password Hash*::
- $ ceph dashboard ac-user-set-password-hash <username> <hash>
+ $ ceph dashboard ac-user-set-password-hash <username> -i <file-containing-password-hash>
The hash must be a bcrypt hash and salt, e.g. ``$2b$12$Pt3Vq/rDt2y9glTPSV.VFegiLkQeIpddtkhoFetNApYmIJOY8gau2``.
This can be used to import users from an external database.
1. *Create the user*::
- $ ceph dashboard ac-user-create bob mypassword
+ $ ceph dashboard ac-user-create bob -i <file-containing-password>
2. *Create role and specify scope permissions*::
import json
import logging
-from collections import namedtuple
+import random
+import re
+import string
import time
+from collections import namedtuple
+from typing import List
import requests
import six
raise ex
user_create_args = [
- 'dashboard', 'ac-user-create', username, password
+ 'dashboard', 'ac-user-create', username
]
if force_password:
user_create_args.append('--force-password')
if cmd_args:
user_create_args.extend(cmd_args)
- cls._ceph_cmd(user_create_args)
-
+ cls._ceph_cmd_with_secret(user_create_args, password)
if roles:
set_roles_args = ['dashboard', 'ac-user-set-roles', username]
for idx, role in enumerate(roles):
log.info("command exit status: %d", exitstatus)
return exitstatus
+ @classmethod
+ def _ceph_cmd_with_secret(cls, cmd: List[str], secret: str, return_exit_code: bool = False):
+ cmd.append('-i')
+ cmd.append('{}'.format(cls._ceph_create_tmp_file(secret)))
+ if return_exit_code:
+ return cls._ceph_cmd_result(cmd)
+ return cls._ceph_cmd(cmd)
+
+ @classmethod
+ def _ceph_create_tmp_file(cls, content: str) -> str:
+ """Create a temporary file in the remote cluster"""
+ file_name = ''.join(random.choices(string.ascii_letters + string.digits, k=20))
+ file_path = '/tmp/{}'.format(file_name)
+ cls._cmd(['sh', '-c', 'echo -n {} > {}'.format(content, file_path)])
+ return file_path
+
def set_config_key(self, key, value):
self._ceph_cmd(['config-key', 'set', key, value])
import time
import jwt
+from teuthology.orchestra.run import \
+ CommandFailedError # pylint: disable=import-error
from .helper import DashboardTestCase, JObj, JLeaf
self.assertIn('create', perms)
self.assertIn('delete', perms)
+ def test_login_without_password(self):
+ with self.assertRaises(CommandFailedError):
+ self.create_user('admin2', '', ['administrator'], force_password=True)
+
def test_a_set_login_credentials(self):
# test with Authorization header
self.create_user('admin2', 'admin2', ['administrator'])
"detail": "Invalid credentials"
})
- def test_login_without_password(self):
- # test with Authorization header
- self.create_user('admin2', '', ['administrator'])
- self._post("/api/auth", {'username': 'admin2', 'password': ''})
- self.assertStatus(400)
- self.assertJsonBody({
- "component": "auth",
- "code": "invalid_credentials",
- "detail": "Invalid credentials"
- })
- self.delete_user('admin2')
-
- # test with Cookies set
- self.create_user('admin2', '', ['administrator'])
- self._post("/api/auth", {'username': 'admin2', 'password': ''}, set_cookies=True)
- self.assertStatus(400)
- self.assertJsonBody({
- "component": "auth",
- "code": "invalid_credentials",
- "detail": "Invalid credentials"
- })
- self.delete_user('admin2')
-
def test_lockout_user(self):
# test with Authorization header
self._ceph_cmd(['dashboard', 'set-account-lockout-attempts', '3'])
self._get("/api/host")
self.assertStatus(200)
time.sleep(1)
- self._ceph_cmd(['dashboard', 'ac-user-set-password', '--force-password',
- 'user', 'user2'])
+ self._ceph_cmd_with_secret(['dashboard', 'ac-user-set-password', '--force-password',
+ 'user'],
+ 'user2')
time.sleep(1)
self._get("/api/host")
self.assertStatus(401)
self._get("/api/host", set_cookies=True)
self.assertStatus(200)
time.sleep(1)
- self._ceph_cmd(['dashboard', 'ac-user-set-password', '--force-password',
- 'user', 'user2'])
+ self._ceph_cmd_with_secret(['dashboard', 'ac-user-set-password', '--force-password',
+ 'user'],
+ 'user2')
time.sleep(1)
self._get("/api/host", set_cookies=True)
self.assertStatus(401)
'user', 'create', '--uid', 'admin', '--display-name', 'admin',
'--system', '--access-key', 'admin', '--secret', 'admin'
])
- cls._ceph_cmd(['dashboard', 'set-rgw-api-secret-key', 'admin'])
- cls._ceph_cmd(['dashboard', 'set-rgw-api-access-key', 'admin'])
+ cls._ceph_cmd_with_secret(['dashboard', 'set-rgw-api-secret-key'], 'admin')
+ cls._ceph_cmd_with_secret(['dashboard', 'set-rgw-api-access-key'], 'admin')
@classmethod
def tearDownClass(cls):
])
# Update the dashboard configuration.
cls._ceph_cmd(['dashboard', 'set-rgw-api-user-id', 'admin'])
- cls._ceph_cmd(['dashboard', 'set-rgw-api-secret-key', 'admin'])
- cls._ceph_cmd(['dashboard', 'set-rgw-api-access-key', 'admin'])
+ cls._ceph_cmd_with_secret(['dashboard', 'set-rgw-api-secret-key'], 'admin')
+ cls._ceph_cmd_with_secret(['dashboard', 'set-rgw-api-access-key'], 'admin')
# Create a test user?
if cls.create_test_user:
cls._radosgw_admin_cmd([
self._ceph_cmd(['mgr', 'module', 'enable', 'dashboard', '--force'])
# Set the default credentials.
self._ceph_cmd(['dashboard', 'set-rgw-api-user-id', ''])
- self._ceph_cmd(['dashboard', 'set-rgw-api-secret-key', 'admin'])
- self._ceph_cmd(['dashboard', 'set-rgw-api-access-key', 'admin'])
+ self._ceph_cmd_with_secret(['dashboard', 'set-rgw-api-secret-key'], 'admin')
+ self._ceph_cmd_with_secret(['dashboard', 'set-rgw-api-access-key'], 'admin')
super(RgwApiCredentialsTest, self).setUp()
def test_no_access_secret_key(self):
- self._ceph_cmd(['dashboard', 'set-rgw-api-secret-key', ''])
- self._ceph_cmd(['dashboard', 'set-rgw-api-access-key', ''])
+ self._ceph_cmd(['dashboard', 'reset-rgw-api-secret-key'])
+ self._ceph_cmd(['dashboard', 'reset-rgw-api-access-key'])
resp = self._get('/api/rgw/user')
self.assertStatus(500)
self.assertIn('detail', resp)
self.assertError(code='invalid_credentials', component='auth')
def test_create_user_password_cli(self):
- exitcode = self._ceph_cmd_result(['dashboard', 'ac-user-create',
- 'test1', 'mypassword10#'])
+ exitcode = self._ceph_cmd_with_secret(['dashboard', 'ac-user-create',
+ 'test1'],
+ 'mypassword10#',
+ return_exit_code=True)
self.assertEqual(exitcode, 0)
self.delete_user('test1')
@DashboardTestCase.RunAs('test2', 'foo_bar_10#', force_password=False, login=False)
def test_change_user_password_cli(self):
- exitcode = self._ceph_cmd_result(['dashboard', 'ac-user-set-password',
- 'test2', 'foo_new-password01#'])
+ exitcode = self._ceph_cmd_with_secret(['dashboard', 'ac-user-set-password',
+ 'test2'],
+ 'foo_new-password01#',
+ return_exit_code=True)
self.assertEqual(exitcode, 0)
def test_create_user_password_force_cli(self):
- exitcode = self._ceph_cmd_result(['dashboard', 'ac-user-create',
- '--force-password', 'test11',
- 'bar'])
+ exitcode = self._ceph_cmd_with_secret(['dashboard', 'ac-user-create',
+ '--force-password', 'test11'],
+ 'bar',
+ return_exit_code=True)
self.assertEqual(exitcode, 0)
self.delete_user('test11')
@DashboardTestCase.RunAs('test22', 'foo_bar_10#', force_password=False, login=False)
def test_change_user_password_force_cli(self):
- exitcode = self._ceph_cmd_result(['dashboard', 'ac-user-set-password',
- '--force-password', 'test22',
- 'bar'])
+ exitcode = self._ceph_cmd_with_secret(['dashboard', 'ac-user-set-password',
+ '--force-password', 'test22'],
+ 'bar',
+ return_exit_code=True)
self.assertEqual(exitcode, 0)
def test_create_user_password_cli_fail(self):
- exitcode = self._ceph_cmd_result(['dashboard', 'ac-user-create', 'test3', 'foo'])
+ exitcode = self._ceph_cmd_with_secret(['dashboard', 'ac-user-create',
+ 'test3'],
+ 'foo',
+ return_exit_code=True)
self.assertNotEqual(exitcode, 0)
@DashboardTestCase.RunAs('test4', 'x1z_tst+_10#', force_password=False, login=False)
def test_change_user_password_cli_fail(self):
- exitcode = self._ceph_cmd_result(['dashboard', 'ac-user-set-password',
- 'test4', 'bar'])
+ exitcode = self._ceph_cmd_with_secret(['dashboard', 'ac-user-set-password',
+ 'test4'],
+ 'bar',
+ return_exit_code=True)
self.assertNotEqual(exitcode, 0)
def test_create_user_with_pwd_expiration_date(self):
logger.info('Creating initial admin user...')
password = args.initial_dashboard_password or generate_password()
- cmd = ['dashboard', 'ac-user-create', args.initial_dashboard_user, password, 'administrator', '--force-password']
+ tmp_password_file = write_tmp(password, uid, gid)
+ cmd = ['dashboard', 'ac-user-create', args.initial_dashboard_user, '-i', '/tmp/dashboard.pw', 'administrator', '--force-password']
if not args.dashboard_password_noupdate:
cmd.append('--pwd-update-required')
- cli(cmd)
+ cli(cmd, extra_mounts={pathify(tmp_password_file.name): '/tmp/dashboard.pw:z'})
logger.info('Fetching dashboard port number...')
out = cli(['config', 'get', 'mgr', 'mgr/dashboard/ssl_server_port'])
port = int(out)
cmd_dicts = get_set_cmd_dicts(out.strip())
for cmd_dict in list(cmd_dicts):
try:
- _, out, _ = self.mgr.check_mon_command(cmd_dict)
+ inbuf = cmd_dict.pop('inbuf', None)
+ _, out, _ = self.mgr.check_mon_command(cmd_dict, inbuf)
except MonCommandFailed as e:
logger.warning('Failed to set Dashboard config for %s: %s', service_name, e)
logger.info('Adding iSCSI gateway %s to Dashboard', safe_service_url)
cmd_dicts.append({
'prefix': 'dashboard iscsi-gateway-add',
- 'service_url': service_url,
+ 'inbuf': service_url,
'name': dd.hostname
})
return cmd_dicts
with mock.patch("cephadm.module.CephadmOrchestrator.mon_command") as _mon_cmd:
CephadmServe(cephadm_module)._check_daemons()
_mon_cmd.assert_any_call(
- {'prefix': 'dashboard set-grafana-api-url', 'value': 'https://test:3000'})
+ {'prefix': 'dashboard set-grafana-api-url', 'value': 'https://test:3000'},
+ None)
@mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm('[]'))
def test_mon_add(self, cephadm_module):
self.config = ''
self.check_mon_command = MagicMock(side_effect=self._check_mon_command)
- def _check_mon_command(self, cmd_dict):
+ def _check_mon_command(self, cmd_dict, inbuf=None):
prefix = cmd_dict.get('prefix')
if prefix == 'get-cmd':
return 0, self.config, ''
def handle_command(self, inbuf, cmd):
# pylint: disable=too-many-return-statements
- res = handle_option_command(cmd)
+ res = handle_option_command(cmd, inbuf)
if res[0] != -errno.ENOSYS:
return res
res = handle_sso_command(cmd)
# Set the user-id
./bin/ceph dashboard set-rgw-api-user-id dev
# Obtain and set access and secret key for the previously created user. $() is safer than backticks `..`
- ./bin/ceph dashboard set-rgw-api-access-key $(./bin/radosgw-admin user info --uid=dev | jq -r .keys[0].access_key)
- ./bin/ceph dashboard set-rgw-api-secret-key $(./bin/radosgw-admin user info --uid=dev | jq -r .keys[0].secret_key)
+ RGW_ACCESS_KEY_FILE="/tmp/rgw-user-access-key.txt"
+ printf "$(./bin/radosgw-admin user info --uid=dev | jq -r .keys[0].access_key)" > "${RGW_ACCESS_KEY_FILE}"
+ ./bin/ceph dashboard set-rgw-api-access-key -i "${RGW_ACCESS_KEY_FILE}"
+ RGW_SECRET_KEY_FILE="/tmp/rgw-user-secret-key.txt"
+ printf "$(./bin/radosgw-admin user info --uid=dev | jq -r .keys[0].secret_key)" > "${RGW_SECRET_KEY_FILE}"
+ ./bin/ceph dashboard set-rgw-api-secret-key -i "${RGW_SECRET_KEY_FILE}"
# Set SSL verify to False
./bin/ceph dashboard set-rgw-api-ssl-verify False
from datetime import datetime, timedelta
import bcrypt
-
-from mgr_module import CLIReadCommand, CLIWriteCommand
+from mgr_module import CLICheckNonemptyFileInput, CLIReadCommand, CLIWriteCommand
from .. import mgr
from ..security import Scope, Permission
# CLI dashboard access control scope commands
@CLIWriteCommand('dashboard set-login-credentials',
- 'name=username,type=CephString '
- 'name=password,type=CephString',
- 'Set the login credentials')
-def set_login_credentials_cmd(_, username, password):
+ 'name=username,type=CephString',
+ 'Set the login credentials. Password read from -i <file>')
+@CLICheckNonemptyFileInput
+def set_login_credentials_cmd(_, username, inbuf):
+ password = inbuf
try:
user = mgr.ACCESS_CTRL_DB.get_user(username)
user.set_password(password)
@CLIWriteCommand('dashboard ac-user-create',
'name=username,type=CephString '
- 'name=password,type=CephString,req=false '
'name=rolename,type=CephString,req=false '
'name=name,type=CephString,req=false '
'name=email,type=CephString,req=false '
'name=force_password,type=CephBool,req=false '
'name=pwd_expiration_date,type=CephInt,req=false '
'name=pwd_update_required,type=CephBool,req=false',
- 'Create a user')
-def ac_user_create_cmd(_, username, password=None, rolename=None, name=None,
+ 'Create a user. Password read from -i <file>')
+@CLICheckNonemptyFileInput
+def ac_user_create_cmd(_, username, inbuf, rolename=None, name=None,
email=None, enabled=True, force_password=False,
pwd_expiration_date=None, pwd_update_required=False):
+ password = inbuf
try:
role = mgr.ACCESS_CTRL_DB.get_role(rolename) if rolename else None
except RoleDoesNotExist as ex:
@CLIWriteCommand('dashboard ac-user-set-password',
'name=username,type=CephString '
- 'name=password,type=CephString '
'name=force_password,type=CephBool,req=false',
- 'Set user password')
-def ac_user_set_password(_, username, password, force_password=False):
+ 'Set user password from -i <file>')
+@CLICheckNonemptyFileInput
+def ac_user_set_password(_, username, inbuf, force_password=False):
+ password = inbuf
try:
user = mgr.ACCESS_CTRL_DB.get_user(username)
if not force_password:
@CLIWriteCommand('dashboard ac-user-set-password-hash',
- 'name=username,type=CephString '
- 'name=hashed_password,type=CephString',
- 'Set user password bcrypt hash')
-def ac_user_set_password_hash(_, username, hashed_password):
+ 'name=username,type=CephString',
+ 'Set user password bcrypt hash from -i <file>')
+@CLICheckNonemptyFileInput
+def ac_user_set_password_hash(_, username, inbuf):
+ hashed_password = inbuf
try:
# make sure the hashed_password is actually a bcrypt hash
bcrypt.checkpw(b'', hashed_password.encode('utf-8'))
import errno
import json
-from mgr_module import CLIReadCommand, CLIWriteCommand
+from mgr_module import CLICheckNonemptyFileInput, CLIReadCommand, CLIWriteCommand
from .iscsi_client import IscsiClient
from .iscsi_config import IscsiGatewaysConfig, IscsiGatewayAlreadyExists, InvalidServiceUrl, \
@CLIWriteCommand('dashboard iscsi-gateway-add',
- 'name=service_url,type=CephString '
'name=name,type=CephString,req=false',
- 'Add iSCSI gateway configuration')
-def add_iscsi_gateway(_, service_url, name=None):
+ 'Add iSCSI gateway configuration. Gateway URL read from -i <file>')
+@CLICheckNonemptyFileInput
+def add_iscsi_gateway(_, inbuf, name=None):
+ service_url = inbuf
try:
IscsiGatewaysConfig.validate_service_url(service_url)
if name is None:
import inspect
from six import add_metaclass
+from mgr_module import CLICheckNonemptyFileInput
+
from . import mgr
'perm': 'r'
})
elif cmd.startswith('dashboard set'):
- cmd_list.append({
+ cmd_entry = {
'cmd': '{} name=value,type={}'
.format(cmd, py2ceph(opt['type'])),
'desc': 'Set the {} option value'.format(opt['name']),
'perm': 'w'
- })
+ }
+ if handles_secret(cmd):
+ cmd_entry['cmd'] = cmd
+ cmd_entry['desc'] = '{} read from -i <file>'.format(cmd_entry['desc'])
+ cmd_list.append(cmd_entry)
elif cmd.startswith('dashboard reset'):
desc = 'Reset the {} option to its default value'.format(
opt['name'])
return result
-def handle_option_command(cmd):
+def handle_option_command(cmd, inbuf):
if cmd['prefix'] not in _OPTIONS_COMMAND_MAP:
return -errno.ENOSYS, '', "Command not found '{}'".format(cmd['prefix'])
elif cmd['prefix'].startswith('dashboard get'):
return 0, str(getattr(Settings, opt['name'])), ''
elif cmd['prefix'].startswith('dashboard set'):
- value = opt['type'](cmd['value'])
+ if handles_secret(cmd['prefix']):
+ value, stdout, stderr = get_secret(inbuf=inbuf)
+ if stderr:
+ return value, stdout, stderr
+ else:
+ value = cmd['value']
+ value = opt['type'](value)
if opt['type'] == bool and cmd['value'].lower() == 'false':
value = False
setattr(Settings, opt['name'], value)
return 0, 'Option {} updated'.format(opt['name']), ''
+
+
+def handles_secret(cmd: str) -> bool:
+ return bool([cmd for secret_word in ['password', 'key'] if (secret_word in cmd)])
+
+
+@CLICheckNonemptyFileInput
+def get_secret(inbuf=None):
+ return inbuf, None, None
def exec_dashboard_cmd(command_handler, cmd, **kwargs):
+ inbuf = kwargs['inbuf'] if 'inbuf' in kwargs else None
cmd_dict = {'prefix': 'dashboard {}'.format(cmd)}
cmd_dict.update(kwargs)
if cmd_dict['prefix'] not in CLICommand.COMMANDS:
except ValueError:
return out
- ret, out, err = CLICommand.COMMANDS[cmd_dict['prefix']].call(mgr, cmd_dict,
- None)
+ ret, out, err = CLICommand.COMMANDS[cmd_dict['prefix']].call(mgr, cmd_dict, inbuf)
if ret < 0:
raise CmdException(ret, err)
try:
from datetime import datetime, timedelta
+from mgr_module import ERROR_MSG_EMPTY_INPUT_FILE
+
from . import CmdException, CLICommandTestMixin
from .. import mgr
from ..security import Scope, Permission
def test_create_user(self, username='admin', rolename=None, enabled=True,
pwdExpirationDate=None):
user = self.exec_cmd('ac-user-create', username=username,
- rolename=rolename, password='admin',
+ rolename=rolename, inbuf='admin',
name='{} User'.format(username),
email='{}@user.com'.format(username),
enabled=enabled, force_password=True,
def test_create_duplicate_user(self):
self.test_create_user()
- ret = self.exec_cmd('ac-user-create', username='admin', password='admin',
+ ret = self.exec_cmd('ac-user-create', username='admin', inbuf='admin',
force_password=True)
self.assertEqual(ret, "User 'admin' already exists")
# create a user with a role that does not exist; expect a failure
try:
self.exec_cmd('ac-user-create', username='foo',
- rolename='dne_role', password='foopass',
+ rolename='dne_role', inbuf='foopass',
name='foo User', email='foo@user.com',
force_password=True)
except CmdException as e:
# create a role (this will be 'test_role')
self.test_create_role()
self.exec_cmd('ac-user-create', username='bar',
- rolename='test_role', password='barpass',
+ rolename='test_role', inbuf='barpass',
name='bar User', email='bar@user.com',
force_password=True)
def test_set_user_password(self):
user_orig = self.test_create_user()
user = self.exec_cmd('ac-user-set-password', username='admin',
- password='newpass', force_password=True)
+ inbuf='newpass', force_password=True)
pass_hash = password_hash('newpass', user['password'])
self.assertDictEqual(user, {
'username': 'admin',
def test_set_user_password_nonexistent_user(self):
with self.assertRaises(CmdException) as ctx:
self.exec_cmd('ac-user-set-password', username='admin',
- password='newpass', force_password=True)
+ inbuf='newpass', force_password=True)
self.assertEqual(ctx.exception.retcode, -errno.ENOENT)
self.assertEqual(str(ctx.exception), "User 'admin' does not exist")
+ def test_set_user_password_empty(self):
+ with self.assertRaises(CmdException) as ctx:
+ self.exec_cmd('ac-user-set-password', username='admin', inbuf='\n',
+ force_password=True)
+
+ self.assertEqual(ctx.exception.retcode, -errno.EINVAL)
+ self.assertEqual(str(ctx.exception), ERROR_MSG_EMPTY_INPUT_FILE)
+
def test_set_user_password_hash(self):
user_orig = self.test_create_user()
user = self.exec_cmd('ac-user-set-password-hash', username='admin',
- hashed_password='$2b$12$Pt3Vq/rDt2y9glTPSV.'
- 'VFegiLkQeIpddtkhoFetNApYmIJOY8gau2')
+ inbuf='$2b$12$Pt3Vq/rDt2y9glTPSV.VFegiLkQeIpddtkhoFetNApYmIJOY8gau2')
pass_hash = password_hash('newpass', user['password'])
self.assertDictEqual(user, {
'username': 'admin',
def test_set_user_password_hash_nonexistent_user(self):
with self.assertRaises(CmdException) as ctx:
self.exec_cmd('ac-user-set-password-hash', username='admin',
- hashed_password='$2b$12$Pt3Vq/rDt2y9glTPSV.'
- 'VFegiLkQeIpddtkhoFetNApYmIJOY8gau2')
+ inbuf='$2b$12$Pt3Vq/rDt2y9glTPSV.VFegiLkQeIpddtkhoFetNApYmIJOY8gau2')
self.assertEqual(ctx.exception.retcode, -errno.ENOENT)
self.assertEqual(str(ctx.exception), "User 'admin' does not exist")
self.test_create_user()
with self.assertRaises(CmdException) as ctx:
self.exec_cmd('ac-user-set-password-hash', username='admin',
- hashed_password='')
+ inbuf='1')
self.assertEqual(ctx.exception.retcode, -errno.EINVAL)
self.assertEqual(str(ctx.exception), 'Invalid password hash')
def test_set_login_credentials(self):
self.exec_cmd('set-login-credentials', username='admin',
- password='admin')
+ inbuf='admin')
user = self.exec_cmd('ac-user-show', username='admin')
pass_hash = password_hash('admin', user['password'])
self.assertDictEqual(user, {
def test_set_login_credentials_for_existing_user(self):
self.test_add_user_roles('admin', ['read-only'])
self.exec_cmd('set-login-credentials', username='admin',
- password='admin2')
+ inbuf='admin2')
user = self.exec_cmd('ac-user-show', username='admin')
pass_hash = password_hash('admin2', user['password'])
self.assertDictEqual(user, {
-# pylint: disable=too-many-public-methods
+# pylint: disable=too-many-public-methods, too-many-lines
import copy
import errno
except ImportError:
import unittest.mock as mock
+from mgr_module import ERROR_MSG_NO_INPUT_FILE
+
from . import CmdException, ControllerTestCase, CLICommandTestMixin, KVStoreMockMixin
from .. import mgr
from ..controllers.iscsi import Iscsi, IscsiTarget
def test_cli_add_gateway_invalid_url(self):
with self.assertRaises(CmdException) as ctx:
self.exec_cmd('iscsi-gateway-add', name='node1',
- service_url='http:/hello.com')
+ inbuf='http:/hello.com')
self.assertEqual(ctx.exception.retcode, -errno.EINVAL)
self.assertEqual(str(ctx.exception),
"Invalid service URL 'http:/hello.com'. Valid format: "
"'<scheme>://<username>:<password>@<host>[:port]'.")
+ def test_cli_add_gateway_empty_url(self):
+ with self.assertRaises(CmdException) as ctx:
+ self.exec_cmd('iscsi-gateway-add', name='node1',
+ inbuf='')
+
+ self.assertEqual(ctx.exception.retcode, -errno.EINVAL)
+ self.assertEqual(str(ctx.exception), ERROR_MSG_NO_INPUT_FILE)
+
def test_cli_add_gateway(self):
self.exec_cmd('iscsi-gateway-add', name='node1',
- service_url='https://admin:admin@10.17.5.1:5001')
+ inbuf='https://admin:admin@10.17.5.1:5001')
self.exec_cmd('iscsi-gateway-add', name='node2',
- service_url='https://admin:admin@10.17.5.2:5001')
+ inbuf='https://admin:admin@10.17.5.2:5001')
iscsi_config = json.loads(self.get_key("_iscsi_config"))
self.assertEqual(iscsi_config['gateways'], {
'node1': {
import errno
import unittest
+
+from mgr_module import ERROR_MSG_EMPTY_INPUT_FILE
+
from . import KVStoreMockMixin, ControllerTestCase
from .. import settings
from ..controllers.settings import Settings as SettingsController
def test_get_cmd(self):
r, out, err = handle_option_command(
- {'prefix': 'dashboard get-grafana-api-port'})
+ {'prefix': 'dashboard get-grafana-api-port'},
+ None
+ )
self.assertEqual(r, 0)
self.assertEqual(out, '3000')
self.assertEqual(err, '')
def test_set_cmd(self):
r, out, err = handle_option_command(
{'prefix': 'dashboard set-grafana-api-port',
- 'value': '4000'})
+ 'value': '4000'},
+ None
+ )
self.assertEqual(r, 0)
self.assertEqual(out, 'Option GRAFANA_API_PORT updated')
self.assertEqual(err, '')
+ def test_set_secret_empty(self):
+ r, out, err = handle_option_command(
+ {'prefix': 'dashboard set-grafana-api-password'},
+ None
+ )
+ self.assertEqual(r, -errno.EINVAL)
+ self.assertEqual(out, '')
+ self.assertEqual(err, ERROR_MSG_EMPTY_INPUT_FILE)
+
+ def test_set_secret(self):
+ r, out, err = handle_option_command(
+ {'prefix': 'dashboard set-grafana-api-password'},
+ 'my-secret'
+ )
+ self.assertEqual(r, 0)
+ self.assertEqual(out, 'Option GRAFANA_API_PASSWORD updated')
+ self.assertEqual(err, '')
+
def test_reset_cmd(self):
r, out, err = handle_option_command(
- {'prefix': 'dashboard reset-grafana-enabled'}
+ {'prefix': 'dashboard reset-grafana-enabled'},
+ None
)
self.assertEqual(r, 0)
self.assertEqual(out, 'Option {} reset to default value "{}"'.format(
def test_inv_cmd(self):
r, out, err = handle_option_command(
- {'prefix': 'dashboard get-non-existent-option'})
+ {'prefix': 'dashboard get-non-existent-option'},
+ None
+ )
self.assertEqual(r, -errno.ENOSYS)
self.assertEqual(out, '')
self.assertEqual(err, "Command not found "
def test_sync(self):
Settings.GRAFANA_API_PORT = 5000
r, out, err = handle_option_command(
- {'prefix': 'dashboard get-grafana-api-port'})
+ {'prefix': 'dashboard get-grafana-api-port'},
+ None
+ )
self.assertEqual(r, 0)
self.assertEqual(out, '5000')
self.assertEqual(err, '')
r, out, err = handle_option_command(
{'prefix': 'dashboard set-grafana-api-host',
- 'value': 'new-local-host'})
+ 'value': 'new-local-host'},
+ None
+ )
self.assertEqual(r, 0)
self.assertEqual(out, 'Option GRAFANA_API_HOST updated')
self.assertEqual(err, '')
import time
from mgr_util import profile_method
+ERROR_MSG_EMPTY_INPUT_FILE = 'Empty content: please add a password/secret to the file.'
+ERROR_MSG_NO_INPUT_FILE = 'Please specify the file containing the password/secret with "-i" option.'
# Full list of strings in "osd_types.cc:pg_state_string()"
PG_STATES = [
"active",
return CLICommand(prefix, args, desc, "w")
+def CLICheckNonemptyFileInput(func):
+ def check(*args, **kwargs):
+ if not 'inbuf' in kwargs:
+ return -errno.EINVAL, '', ERROR_MSG_NO_INPUT_FILE
+ if not kwargs['inbuf'] or (isinstance(kwargs['inbuf'], str)
+ and not kwargs['inbuf'].strip('\n')):
+ return -errno.EINVAL, '', ERROR_MSG_EMPTY_INPUT_FILE
+ return func(*args, **kwargs)
+ return check
+
+
def _get_localized_key(prefix, key):
return '{}/{}'.format(prefix, key)
"""
return self._ceph_get_daemon_status(svc_type, svc_id)
- def check_mon_command(self, cmd_dict: dict) -> HandleCommandResult:
+ def check_mon_command(self, cmd_dict: dict, inbuf: Optional[str]=None) -> HandleCommandResult:
"""
Wrapper around :func:`~mgr_module.MgrModule.mon_command`, but raises,
if ``retval != 0``.
"""
- r = HandleCommandResult(*self.mon_command(cmd_dict))
+ r = HandleCommandResult(*self.mon_command(cmd_dict, inbuf))
if r.retval:
raise MonCommandFailed(f'{cmd_dict["prefix"]} failed: {r.stderr} retval: {r.retval}')
return r
- def mon_command(self, cmd_dict):
+ def mon_command(self, cmd_dict: dict, inbuf: Optional[str]=None):
"""
Helper for modules that do simple, synchronous mon command
execution.
t1 = time.time()
result = CommandResult()
- self.send_command(result, "mon", "", json.dumps(cmd_dict), "")
+ self.send_command(result, "mon", "", json.dumps(cmd_dict), "", inbuf)
r = result.wait()
t2 = time.time()
return r
- def send_command(self, *args, **kwargs):
+ def send_command(
+ self,
+ result: CommandResult,
+ svc_type: str,
+ svc_id: str,
+ command: str,
+ tag: str,
+ inbuf: Optional[str]=None):
"""
Called by the plugin to send a command to the mon
cluster.
completes, the ``notify()`` callback on the MgrModule instance is
triggered, with notify_type set to "command", and notify_id set to
the tag of the command.
+ :param str inbuf: input buffer for sending additional data.
"""
- self._ceph_send_command(*args, **kwargs)
+ self._ceph_send_command(result, svc_type, svc_id, command, tag, inbuf)
def set_health_checks(self, checks):
"""
def _ceph_get(self, data_name):
return self.mock_store_get('_ceph_get', data_name, mock.MagicMock())
- def _ceph_send_command(self, res, svc_type, svc_id, command, tag):
+ def _ceph_send_command(self, res, svc_type, svc_id, command, tag, inbuf):
cmd = json.loads(command)
# Mocking the config store is handy sometimes:
if self.__class__.__name__ not in M_classes:
- # call those only once.
+ # call those only once.
self._register_commands('')
self._register_options('')
M_classes.add(self.__class__.__name__)
tries=$((tries+1))
sleep 1
done
- ceph_adm dashboard set-login-credentials admin admin
+
+ DASHBOARD_ADMIN_SECRET_FILE="/tmp/dashboard-admin-secret.txt"
+ printf 'admin' > "${DASHBOARD_ADMIN_SECRET_FILE}"
+ ceph_adm dashboard ac-user-create admin -i "${DASHBOARD_ADMIN_SECRET_FILE}" --force-password
tries=0
while [[ $tries < 30 ]] ; do
debug echo 'waiting for mgr dashboard module to start'
sleep 1
done
- ceph_adm dashboard ac-user-create --force-password admin admin administrator
+ DASHBOARD_ADMIN_SECRET_FILE="${CEPH_CONF_PATH}/dashboard-admin-secret.txt"
+ printf 'admin' > "${DASHBOARD_ADMIN_SECRET_FILE}"
+ ceph_adm dashboard ac-user-create admin -i "${DASHBOARD_ADMIN_SECRET_FILE}" \
+ administrator --force-password
if [ "$ssl" != "0" ]; then
if ! ceph_adm dashboard create-self-signed-cert; then
debug echo dashboard module not working correctly!