self.assertIn('rgw_status', data)
self.assertTrue(data['rgw_metadata'])
- @authenticate
- def test_rgw_status(self):
- self._radosgw_admin_cmd([
+
+class RgwApiUserTest(DashboardTestCase):
+ @classmethod
+ def setUpClass(cls):
+ super(RgwApiUserTest, cls).setUpClass()
+ cls._radosgw_admin_cmd([
'user', 'create', '--uid=admin', '--display-name=admin',
'--system', '--access-key=admin', '--secret=admin'
])
- self._ceph_cmd(['dashboard', 'set-rgw-api-user-id', 'admin'])
- self._ceph_cmd(['dashboard', 'set-rgw-api-secret-key', 'admin'])
- self._ceph_cmd(['dashboard', 'set-rgw-api-access-key', 'admin'])
+ cls._ceph_cmd(['dashboard', 'set-rgw-api-secret-key', 'admin'])
+ cls._ceph_cmd(['dashboard', 'set-rgw-api-access-key', 'admin'])
+
+ def setUp(self):
+ # Restart the Dashboard module to ensure that the connection to the
+ # RGW Admin Ops API is re-established with the new credentials.
+ self._ceph_cmd(['mgr', 'module', 'disable', 'dashboard'])
+ self._ceph_cmd(['mgr', 'module', 'enable', 'dashboard', '--force'])
+ @authenticate
+ def test_success(self):
+ self._ceph_cmd(['dashboard', 'set-rgw-api-user-id', ''])
data = self._get('/api/rgw/status')
self.assertStatus(200)
self.assertIn('available', data)
self.assertIn('message', data)
self.assertTrue(data['available'])
+ @authenticate
+ def test_failure(self):
+ self._ceph_cmd(['dashboard', 'set-rgw-api-user-id', 'xyz'])
+ data = self._get('/api/rgw/status')
+ self.assertStatus(200)
+ self.assertIn('available', data)
+ self.assertIn('message', data)
+ self.assertFalse(data['available'])
+ self.assertIn('The user "xyz" is unknown to the Object Gateway.',
+ data['message'])
+
class RgwProxyExceptionsTest(DashboardTestCase):
RgwClient._port = port
RgwClient._ssl = Settings.RGW_API_SCHEME == 'https'
RgwClient._ADMIN_PATH = Settings.RGW_API_ADMIN_RESOURCE
- RgwClient._SYSTEM_USERID = Settings.RGW_API_USER_ID
- logger.info("Creating new connection for user: %s",
- RgwClient._SYSTEM_USERID)
- RgwClient._user_instances[RgwClient._SYSTEM_USERID] = \
- RgwClient(Settings.RGW_API_USER_ID, Settings.RGW_API_ACCESS_KEY,
- Settings.RGW_API_SECRET_KEY)
+ # Create an instance using the configured settings.
+ instance = RgwClient(Settings.RGW_API_USER_ID,
+ Settings.RGW_API_ACCESS_KEY,
+ Settings.RGW_API_SECRET_KEY)
+
+ RgwClient._SYSTEM_USERID = instance.userid
+
+ # Append the instance to the internal map.
+ RgwClient._user_instances[RgwClient._SYSTEM_USERID] = instance
@staticmethod
def instance(userid):
if not RgwClient._user_instances:
RgwClient._load_settings()
+
if not userid:
userid = RgwClient._SYSTEM_USERID
+
if userid not in RgwClient._user_instances:
- logger.info("Creating new connection for user: %s", userid)
+ # Get the access and secret keys for the specified user.
keys = RgwClient.admin_instance().get_user_keys(userid)
if not keys:
raise RequestException(
"User '{}' does not have any keys configured.".format(
userid))
- RgwClient._user_instances[userid] = RgwClient(
- userid, keys['access_key'], keys['secret_key'])
+ # Create an instance and append it to the internal map.
+ RgwClient._user_instances[userid] = RgwClient(userid,
+ keys['access_key'],
+ keys['secret_key'])
+
return RgwClient._user_instances[userid]
@staticmethod
admin_path = admin_path if admin_path else RgwClient._ADMIN_PATH
ssl = ssl if ssl else RgwClient._ssl
- self.userid = userid
self.service_url = build_url(host=host, port=port)
self.admin_path = admin_path
s3auth = S3Auth(access_key, secret_key, service_url=self.service_url)
super(RgwClient, self).__init__(host, port, 'RGW', ssl, s3auth)
- logger.info("Creating new connection")
+ # If user ID is not set, then try to get it via the RGW Admin Ops API.
+ self.userid = userid if userid else self._get_user_id(self.admin_path)
+
+ logger.info("Created new connection for user: %s", self.userid)
@RestClient.api_get('/', resp_structure='[0] > (ID & DisplayName)')
def is_service_online(self, request=None):
request({'format': 'json'})
return True
+ @RestClient.api_get('/{admin_path}/metadata/user?myself',
+ resp_structure='data > user_id')
+ def _get_user_id(self, admin_path, request=None):
+ # pylint: disable=unused-argument
+ """
+ Get the user ID of the user that is used to communicate with the
+ RGW Admin Ops API.
+ :rtype: str
+ :return: The user ID of the user that is used to sign the
+ RGW Admin Ops API calls.
+ """
+ response = request()
+ return response['data']['user_id']
+
@RestClient.api_get('/{admin_path}/metadata/user', resp_structure='[+]')
def _is_system_user(self, admin_path, request=None):
# pylint: disable=unused-argument