from ..settings import Settings
logger = logging.getLogger('access_control')
-
+DEFAULT_FILE_DESC = 'password/secret'
# password hashing algorithm
def password_hash(password, salt_password=None):
# CLI dashboard access control scope commands
@CLIWriteCommand('dashboard set-login-credentials')
-@CLICheckNonemptyFileInput
+@CLICheckNonemptyFileInput(desc=DEFAULT_FILE_DESC)
def set_login_credentials_cmd(_, username: str, inbuf: str):
'''
Set the login credentials. Password read from -i <file>
@CLIWriteCommand('dashboard ac-user-create')
-@CLICheckNonemptyFileInput
+@CLICheckNonemptyFileInput(desc=DEFAULT_FILE_DESC)
def ac_user_create_cmd(_, username: str, inbuf: str,
rolename: Optional[str] = None,
name: Optional[str] = None,
@CLIWriteCommand('dashboard ac-user-set-password')
-@CLICheckNonemptyFileInput
+@CLICheckNonemptyFileInput(desc=DEFAULT_FILE_DESC)
def ac_user_set_password(_, username: str, inbuf: str,
force_password: bool = False):
'''
@CLIWriteCommand('dashboard ac-user-set-password-hash')
-@CLICheckNonemptyFileInput
+@CLICheckNonemptyFileInput(desc=DEFAULT_FILE_DESC)
def ac_user_set_password_hash(_, username: str, inbuf: str):
'''
Set user password bcrypt hash from -i <file>
@CLIWriteCommand('dashboard iscsi-gateway-add')
-@CLICheckNonemptyFileInput
+@CLICheckNonemptyFileInput(desc='iSCSI gateway configuration')
def add_iscsi_gateway(_, inbuf, name: Optional[str] = None):
'''
Add iSCSI gateway configuration. Gateway URL read from -i <file>
return bool([cmd for secret_word in ['password', 'key'] if (secret_word in cmd)])
-@CLICheckNonemptyFileInput
+@CLICheckNonemptyFileInput(desc='password/secret')
def get_secret(inbuf=None):
return inbuf, None, None
force_password=True)
self.assertEqual(ctx.exception.retcode, -errno.EINVAL)
- self.assertEqual(str(ctx.exception), ERROR_MSG_EMPTY_INPUT_FILE)
+ self.assertIn(ERROR_MSG_EMPTY_INPUT_FILE, str(ctx.exception))
def test_set_user_password_hash(self):
user_orig = self.test_create_user()
inbuf='')
self.assertEqual(ctx.exception.retcode, -errno.EINVAL)
- self.assertEqual(str(ctx.exception), ERROR_MSG_NO_INPUT_FILE)
+ self.assertIn(ERROR_MSG_NO_INPUT_FILE, str(ctx.exception))
def test_cli_add_gateway(self):
self.exec_cmd('iscsi-gateway-add', name='node1',
)
self.assertEqual(r, -errno.EINVAL)
self.assertEqual(out, '')
- self.assertEqual(err, ERROR_MSG_EMPTY_INPUT_FILE)
+ self.assertIn(ERROR_MSG_EMPTY_INPUT_FILE, err)
def test_set_secret(self):
r, out, err = handle_option_command(
return getattr(tp, '__origin__', None)
-ERROR_MSG_EMPTY_INPUT_FILE = 'Empty content: please add a password/secret to the file.'
-ERROR_MSG_NO_INPUT_FILE = 'Please specify the file containing the password/secret with "-i" option.'
+ERROR_MSG_EMPTY_INPUT_FILE = 'Empty input file'
+ERROR_MSG_NO_INPUT_FILE = 'Input file not specified'
# Full list of strings in "osd_types.cc:pg_state_string()"
PG_STATES = [
"active",
return CLICommand(prefix, "w", poll)
-def CLICheckNonemptyFileInput(func: HandlerFuncType) -> HandlerFuncType:
- @functools.wraps(func)
- def check(*args: Any, **kwargs: Any) -> Tuple[int, str, str]:
- if 'inbuf' not in kwargs:
- return -errno.EINVAL, '', ERROR_MSG_NO_INPUT_FILE
- if isinstance(kwargs['inbuf'], str):
- # Delete new line separator at EOF (it may have been added by a text editor).
- kwargs['inbuf'] = kwargs['inbuf'].rstrip('\r\n').rstrip('\n')
- if not kwargs['inbuf'] or not kwargs['inbuf'].strip():
- return -errno.EINVAL, '', ERROR_MSG_EMPTY_INPUT_FILE
- return func(*args, **kwargs)
- check.__signature__ = inspect.signature(func) # type: ignore[attr-defined]
- return check
+def CLICheckNonemptyFileInput(desc: str) -> Callable[[HandlerFuncType], HandlerFuncType]:
+ def CheckFileInput(func: HandlerFuncType) -> HandlerFuncType:
+ @functools.wraps(func)
+ def check(*args: Any, **kwargs: Any) -> Tuple[int, str, str]:
+ if 'inbuf' not in kwargs:
+ return -errno.EINVAL, '', f'{ERROR_MSG_NO_INPUT_FILE}: Please specify the file '\
+ f'containing {desc} with "-i" option'
+ if isinstance(kwargs['inbuf'], str):
+ # Delete new line separator at EOF (it may have been added by a text editor).
+ kwargs['inbuf'] = kwargs['inbuf'].rstrip('\r\n').rstrip('\n')
+ if not kwargs['inbuf'] or not kwargs['inbuf'].strip():
+ return -errno.EINVAL, '', f'{ERROR_MSG_EMPTY_INPUT_FILE}: Please add {desc} to '\
+ 'the file'
+ return func(*args, **kwargs)
+ check.__signature__ = inspect.signature(func) # type: ignore[attr-defined]
+ return check
+ return CheckFileInput
def _get_localized_key(prefix: str, key: str) -> str: