kwargs['check_status'] = False
return self.run_cluster_cmd(**kwargs).exitstatus
+ def get_keyring(self, client_id):
+ """
+ Return keyring for the given client.
+
+ :param client_id: str
+ :return keyring: str
+ """
+ if client_id.find('client.') != -1:
+ client_id = client_id.replace('client.', '')
+
+ keyring = self.run_cluster_cmd(args=f'auth get client.{client_id}',
+ stdout=StringIO()).\
+ stdout.getvalue().strip()
+
+ assert isinstance(keyring, str) and keyring != ''
+ return keyring
+
def run_ceph_w(self, watch_channel=None):
"""
Execute "ceph -w" in the background with stdout connected to a BytesIO,