status['message'] = 'Failed to connect to the Object Gateway\'s Admin Ops API.'
raise RequestException(status['message'])
# Ensure the API user ID is known by the RGW.
- if not instance.is_system_user():
+ if not instance.user_exists():
status['message'] = 'The user "{}" is unknown to the Object Gateway.'.format(
instance.userid)
raise RequestException(status['message'])
+ # Ensure the system flag is set for the API user ID.
+ if not instance.is_system_user():
+ status['message'] = 'The system flag is not set for user "{}".'.format(
+ instance.userid)
+ raise RequestException(status['message'])
status['available'] = True
except RequestException:
pass
from __future__ import absolute_import
import re
+from distutils.util import strtobool
from ..awsauth import S3Auth
from ..settings import Settings, Options
from ..rest_client import RestClient, RequestException
return response['data']['user_id']
@RestClient.api_get('/{admin_path}/metadata/user', resp_structure='[+]')
- def _is_system_user(self, admin_path, request=None):
+ def _user_exists(self, admin_path, request=None):
# pylint: disable=unused-argument
response = request()
return self.userid in response
+ def user_exists(self):
+ return self._user_exists(self.admin_path)
+
+ @RestClient.api_get('/{admin_path}/metadata/user?key={userid}',
+ resp_structure='data > system')
+ def _is_system_user(self, admin_path, userid, request=None):
+ # pylint: disable=unused-argument
+ response = request()
+ return strtobool(response['data']['system'])
+
def is_system_user(self):
- return self._is_system_user(self.admin_path)
+ return self._is_system_user(self.admin_path, self.userid)
@RestClient.api_get(
'/{admin_path}/user',