# pylint: disable=C0302
# pylint: disable=too-many-branches
# pylint: disable=too-many-lines
-
+import ast
import ipaddress
import json
import logging
import os
import re
+import time
import xml.etree.ElementTree as ET # noqa: N814
from enum import Enum
from subprocess import SubprocessError
+from urllib.parse import urlparse
+import requests
from mgr_util import build_url, name_to_config_section
from .. import mgr
from ..awsauth import S3Auth
+from ..controllers.multi_cluster import MultiCluster
from ..exceptions import DashboardException
from ..rest_client import RequestException, RestClient
from ..settings import Settings
from ..tools import dict_contains_path, dict_get, json_str_to_object, str_to_bool
from .ceph_service import CephService
+from .orchestrator import OrchClient
+from .service import RgwServiceManager
try:
from typing import Any, Dict, List, Optional, Tuple, Union
super().__init__('No RGW service is running.')
-class NoCredentialsException(Exception):
- def __init__(self):
- super(NoCredentialsException, self).__init__(
- 'No RGW credentials found, '
- 'please consult the documentation on how to enable RGW for '
- 'the dashboard.')
-
-
class RgwAdminException(Exception):
pass
raise LookupError('Failed to determine RGW port from "{}"'.format(config))
-def _parse_secrets(user: str, data: dict) -> Tuple[str, str]:
- for key in data.get('keys', []):
- if key.get('user') == user and data.get('system') in ['true', True]:
- access_key = key.get('access_key')
- secret_key = key.get('secret_key')
- return access_key, secret_key
- return '', ''
-
-
-def _get_user_keys(user: str, realm: Optional[str] = None) -> Tuple[str, str]:
- access_key = ''
- secret_key = ''
- rgw_user_info_cmd = ['user', 'info', '--uid', user]
- cmd_realm_option = ['--rgw-realm', realm] if realm else []
- if realm:
- rgw_user_info_cmd += cmd_realm_option
- try:
- _, out, err = mgr.send_rgwadmin_command(rgw_user_info_cmd)
- if out:
- access_key, secret_key = _parse_secrets(user, out)
- if not access_key:
- rgw_create_user_cmd = [
- 'user', 'create',
- '--uid', user,
- '--display-name', 'Ceph Dashboard',
- '--system',
- ] + cmd_realm_option
- _, out, err = mgr.send_rgwadmin_command(rgw_create_user_cmd)
- if out:
- access_key, secret_key = _parse_secrets(user, out)
- if not access_key:
- logger.error('Unable to create rgw user "%s": %s', user, err)
- except SubprocessError as error:
- logger.exception(error)
-
- return access_key, secret_key
-
-
-def configure_rgw_credentials():
- logger.info('Configuring dashboard RGW credentials')
- user = 'dashboard'
- realms = []
- access_key = ''
- secret_key = ''
- try:
- _, out, err = mgr.send_rgwadmin_command(['realm', 'list'])
- if out:
- realms = out.get('realms', [])
- if err:
- logger.error('Unable to list RGW realms: %s', err)
- if realms:
- realm_access_keys = {}
- realm_secret_keys = {}
- for realm in realms:
- realm_access_key, realm_secret_key = _get_user_keys(user, realm)
- if realm_access_key:
- realm_access_keys[realm] = realm_access_key
- realm_secret_keys[realm] = realm_secret_key
- if realm_access_keys:
- access_key = json.dumps(realm_access_keys)
- secret_key = json.dumps(realm_secret_keys)
- else:
- access_key, secret_key = _get_user_keys(user)
-
- assert access_key and secret_key
- Settings.RGW_API_ACCESS_KEY = access_key
- Settings.RGW_API_SECRET_KEY = secret_key
- except (AssertionError, SubprocessError) as error:
- logger.exception(error)
- raise NoCredentialsException
-
-
# pylint: disable=R0904
class RgwClient(RestClient):
_host = None
# The API access key and secret key are mandatory for a minimal configuration.
if not (Settings.RGW_API_ACCESS_KEY and Settings.RGW_API_SECRET_KEY):
- configure_rgw_credentials()
+ rgw_service_manager = RgwServiceManager()
+ rgw_service_manager.configure_rgw_credentials()
daemon_keys = RgwClient._daemons.keys()
if not daemon_name:
except SubprocessError as error:
raise DashboardException(error, http_status_code=500, component='rgw')
+ def replace_hostname(self, endpoint, hostname_to_ip):
+ # Replace the hostname in the endpoint URL with its corresponding IP address.
+ parsed_url = urlparse(endpoint)
+ hostname = parsed_url.hostname
+ if hostname in hostname_to_ip:
+ return endpoint.replace(hostname, hostname_to_ip[hostname])
+ return endpoint
+
+ def setup_multisite_replication(self, realm_name: str, zonegroup_name: str,
+ zonegroup_endpoints: str, zone_name: str,
+ zone_endpoints: str, username: str,
+ cluster_fsid: Optional[str] = None):
+
+ # Set up multisite replication for Ceph RGW.
+ logger.info("Starting multisite replication setup")
+ orch = OrchClient.instance()
+
+ def get_updated_endpoints(endpoints):
+ # Update endpoint URLs by replacing hostnames with IP addresses.
+ logger.debug("Updating endpoints: %s", endpoints)
+ try:
+ hostname_to_ip = {host['hostname']: host['addr'] for host in (h.to_json() for h in orch.hosts.list())} # noqa E501 # pylint: disable=line-too-long
+ updated_endpoints = [self.replace_hostname(endpoint, hostname_to_ip) for endpoint in endpoints.split(',')] # noqa E501 # pylint: disable=line-too-long
+ logger.debug("Updated endpoints: %s", updated_endpoints)
+ return updated_endpoints
+ except Exception as e:
+ logger.error("Failed to update endpoints: %s", e)
+ raise
+
+ zonegroup_ip_url = ','.join(get_updated_endpoints(zonegroup_endpoints))
+ zone_ip_url = ','.join(get_updated_endpoints(zone_endpoints))
+ try:
+ # Create the realm and zonegroup
+ logger.info("Creating realm: %s", realm_name)
+ self.create_realm(realm_name=realm_name, default=True)
+ logger.info("Creating zonegroup: %s", zonegroup_name)
+ self.create_zonegroup(realm_name=realm_name, zonegroup_name=zonegroup_name,
+ default=True, master=True, endpoints=zonegroup_ip_url)
+ except Exception as e:
+ logger.error("Failed to create realm or zonegroup: %s", e)
+ raise
+ try:
+ # Create the zone and system user, then modify the zone with user credentials
+ logger.info("Creating zone: %s", zone_name)
+ if self.create_zone(zone_name=zone_name, zonegroup_name=zonegroup_name,
+ default=True, master=True, endpoints=zone_ip_url,
+ access_key=None, secret_key=None):
+ logger.info("Creating system user: %s", username)
+ user_details = self.create_system_user(username, zone_name)
+ if user_details:
+ keys = user_details['keys'][0]
+ logger.info("Modifying zone with user credentials: %s", username)
+ self.modify_zone(zone_name=zone_name, zonegroup_name=zonegroup_name,
+ default='true', master='true', endpoints=zone_ip_url,
+ access_key=keys['access_key'],
+ secret_key=keys['secret_key'])
+ except Exception as e:
+ logger.error("Failed to create zone or system user: %s", e)
+ raise
+ try:
+ # Restart RGW daemons and set credentials
+ logger.info("Restarting RGW daemons and setting credentials")
+ rgw_service_manager = RgwServiceManager()
+ rgw_service_manager.restart_rgw_daemons_and_set_credentials()
+ except Exception as e:
+ logger.error("Failed to restart RGW daemons: %s", e)
+ raise
+ try:
+ # Get realm tokens and import to another cluster if specified
+ logger.info("Getting realm tokens")
+ realm_token_info = CephService.get_realm_tokens()
+
+ if cluster_fsid and realm_token_info:
+ logger.info("Importing realm token to cluster: %s", cluster_fsid)
+ self.import_realm_token_to_cluster(cluster_fsid, realm_name,
+ realm_token_info, username)
+ except Exception as e:
+ logger.error("Failed to get realm tokens or import to cluster: %s", e)
+ raise
+ logger.info("Multisite replication setup completed")
+ return realm_token_info
+
+ def import_realm_token_to_cluster(self, cluster_fsid, realm_name, realm_token_info, username):
+ logger.info("Importing realm token to cluster: %s", cluster_fsid)
+ try:
+ for realm_token in realm_token_info:
+ if realm_token['realm'] == realm_name:
+ realm_export_token = realm_token['token']
+ break
+ else:
+ raise ValueError(f"Realm {realm_name} not found in realm tokens")
+ multi_cluster_config_str = str(mgr.get_module_option_ex('dashboard', 'MULTICLUSTER_CONFIG')) # noqa E501 # pylint: disable=line-too-long
+ multi_cluster_config = ast.literal_eval(multi_cluster_config_str)
+ for fsid, clusters in multi_cluster_config['config'].items():
+ if fsid == cluster_fsid:
+ for cluster_info in clusters:
+ cluster_token = cluster_info.get('token')
+ cluster_url = cluster_info.get('url')
+ break
+ else:
+ raise ValueError(f"No cluster token found for fsid: {cluster_fsid}")
+ break
+ else:
+ raise ValueError(f"Cluster fsid {cluster_fsid} not found in multi-cluster config")
+ if cluster_token:
+ placement_spec: Dict[str, Dict] = {"placement": {}}
+ payload = {
+ 'realm_token': realm_export_token,
+ 'zone_name': 'new_replicated_zone',
+ 'port': 81,
+ 'placement_spec': placement_spec
+ }
+
+ if not cluster_url.endswith('/'):
+ cluster_url += '/'
+
+ path = 'api/rgw/realm/import_realm_token'
+ try:
+ multi_cluster_instance = MultiCluster()
+ # pylint: disable=protected-access
+ response = multi_cluster_instance._proxy(method='POST', base_url=cluster_url,
+ path=path, payload=payload,
+ token=cluster_token)
+ logger.info("Successfully imported realm token to cluster: %s", cluster_fsid)
+ self.check_user_in_second_cluster(cluster_url, cluster_token, username)
+ return response
+ except requests.RequestException as e:
+ logger.error("Could not reach %s: %s", cluster_url, e)
+ raise DashboardException(f"Could not reach {cluster_url}: {e}",
+ http_status_code=404, component='dashboard')
+ except json.JSONDecodeError as e:
+ logger.error("Error parsing Dashboard API response: %s", e.msg)
+ raise DashboardException(f"Error parsing Dashboard API response: {e.msg}",
+ component='dashboard')
+ except Exception as e:
+ logger.error("Failed to import realm token to cluster: %s", e)
+ raise
+
+ def check_user_in_second_cluster(self, cluster_url, cluster_token, username):
+ logger.info("Checking for user %s in the second cluster", username)
+ path = 'api/rgw/zone/get_user_list?zoneName=new_replicated_zone'
+ user_found = False
+ start_time = time.time()
+ while not user_found:
+ if time.time() - start_time > 120: # Timeout after 2 minutes
+ logger.error("Timeout reached while waiting for user %s to appear \
+ in the second cluster", username)
+ raise DashboardException(code='user_replication_timeout',
+ msg="Timeout reached while waiting for \
+ user %s to appear in the second cluster." % username)
+ try:
+ multi_cluster_instance = MultiCluster()
+ # pylint: disable=protected-access
+ user_content = multi_cluster_instance._proxy(method='GET', base_url=cluster_url,
+ path=path, token=cluster_token)
+ logger.info("User content in the second cluster: %s", user_content)
+ for user in user_content:
+ if user['user_id'] == username:
+ user_found = True
+ logger.info("User %s found in the second cluster", username)
+ # pylint: disable=protected-access
+ restart_daemons_content = multi_cluster_instance._proxy(method='PUT', base_url=cluster_url, # noqa E501 # pylint: disable=line-too-long
+ path='ui-api/rgw/multisite/setup-rgw-credentials', # noqa E501 # pylint: disable=line-too-long
+ token=cluster_token) # noqa E501 # pylint: disable=line-too-long
+ logger.info("Restarted RGW daemons in the second cluster: %s", restart_daemons_content) # noqa E501 # pylint: disable=line-too-long
+ break
+ except requests.RequestException as e:
+ logger.error("Error checking user in the second cluster: %s", e)
+ logger.info("User %s not found yet, retrying in 5 seconds", username)
+ time.sleep(5)
+
def create_realm(self, realm_name: str, default: bool):
rgw_realm_create_cmd = ['realm', 'create']
cmd_create_realm_options = ['--rgw-realm', realm_name]
--- /dev/null
+import json
+import logging
+import time
+from subprocess import SubprocessError
+
+try:
+ from typing import Optional, Tuple
+except ImportError:
+ pass # For typing only
+
+from .. import mgr
+from ..exceptions import DashboardException
+from ..settings import Settings
+from .orchestrator import OrchClient
+
+logger = logging.getLogger('service')
+
+
+class NoCredentialsException(Exception):
+ def __init__(self):
+ super(NoCredentialsException, self).__init__(
+ 'No RGW credentials found, '
+ 'please consult the documentation on how to enable RGW for '
+ 'the dashboard.')
+
+
+def verify_service_restart(service_type: str, service_id: str):
+ orch = OrchClient.instance()
+ service_name = f'{service_type}.{service_id}'
+
+ logger.info("Getting initial service info for: %s", service_name)
+ info = orch.services.get(service_name)[0].to_dict()
+ last_refreshed = info['status']['last_refresh']
+
+ logger.info("Reloading service: %s", service_name)
+ orch.services.reload(service_type, service_id)
+
+ logger.info("Waiting for service refresh: %s", service_name)
+ wait_for_refresh(orch, service_name, last_refreshed)
+
+ logger.info("Checking daemon status for: %s", service_name)
+ daemon_status = wait_for_daemon_to_start(orch, service_name)
+ return daemon_status
+
+
+def wait_for_refresh(orch, service_name, last_refreshed):
+ orch = OrchClient.instance()
+ logger.info("Waiting for service %s to refresh", service_name)
+
+ while True:
+ updated_info = orch.services.get(service_name)[0].to_dict()
+ if updated_info['status']['last_refresh'] != last_refreshed:
+ logger.info("Service %s refreshed", service_name)
+ break
+
+
+def wait_for_daemon_to_start(orch, service_name):
+ orch = OrchClient.instance()
+ start_time = time.time()
+ logger.info("Waiting for daemon %s to start", service_name)
+
+ while True:
+ daemons = [d.to_dict() for d in orch.services.list_daemons(service_name=service_name)]
+ all_running = True
+
+ for daemon in daemons:
+ daemon_state = daemon['status_desc']
+ logger.debug("Daemon %s state: %s", daemon['daemon_id'], daemon_state)
+
+ if daemon_state in ('unknown', 'error', 'stopped'):
+ logger.error("Failed to restart daemon %s for service %s. State is %s", daemon['daemon_id'], service_name, daemon_state) # noqa E501 # pylint: disable=line-too-long
+ raise DashboardException(
+ code='daemon_restart_failed',
+ msg="Failed to restart the daemon %s. Daemon state is %s." % (service_name, daemon_state) # noqa E501 # pylint: disable=line-too-long
+ )
+ if daemon_state != 'running':
+ all_running = False
+
+ if all_running:
+ logger.info("All daemons for service %s are running", service_name)
+ return True
+
+ if time.time() - start_time > 10:
+ logger.error("Timeout reached while waiting for daemon %s to start", service_name)
+ raise DashboardException(
+ code='daemon_restart_timeout',
+ msg="Timeout reached while waiting for daemon %s to start." % service_name
+ )
+ return False
+
+
+class RgwServiceManager:
+ def restart_rgw_daemons_and_set_credentials(self):
+ # Restart RGW daemons and set credentials.
+ logger.info("Restarting RGW daemons and setting credentials")
+ orch = OrchClient.instance()
+ services, _ = orch.services.list(service_type='rgw', offset=0)
+
+ all_daemons_up = True
+ for service in services:
+ logger.info("Verifying service restart for: %s", service['service_id'])
+ daemons_up = verify_service_restart('rgw', service['service_id'])
+ if not daemons_up:
+ logger.error("Service %s restart verification failed", service['service_id'])
+ all_daemons_up = False
+
+ if all_daemons_up:
+ logger.info("All daemons are up, configuring RGW credentials")
+ self.configure_rgw_credentials()
+ else:
+ logger.error("Not all daemons are up, skipping RGW credentials configuration")
+
+ def _parse_secrets(self, user: str, data: dict) -> Tuple[str, str]:
+ for key in data.get('keys', []):
+ if key.get('user') == user and data.get('system') in ['true', True]:
+ access_key = key.get('access_key')
+ secret_key = key.get('secret_key')
+ return access_key, secret_key
+ return '', ''
+
+ def _get_user_keys(self, user: str, realm: Optional[str] = None) -> Tuple[str, str]:
+ access_key = ''
+ secret_key = ''
+ rgw_user_info_cmd = ['user', 'info', '--uid', user]
+ cmd_realm_option = ['--rgw-realm', realm] if realm else []
+ if realm:
+ rgw_user_info_cmd += cmd_realm_option
+ try:
+ _, out, err = mgr.send_rgwadmin_command(rgw_user_info_cmd)
+ if out:
+ access_key, secret_key = self._parse_secrets(user, out)
+ if not access_key:
+ rgw_create_user_cmd = [
+ 'user', 'create',
+ '--uid', user,
+ '--display-name', 'Ceph Dashboard',
+ '--system',
+ ] + cmd_realm_option
+ _, out, err = mgr.send_rgwadmin_command(rgw_create_user_cmd)
+ if out:
+ access_key, secret_key = self._parse_secrets(user, out)
+ if not access_key:
+ logger.error('Unable to create rgw user "%s": %s', user, err)
+ except SubprocessError as error:
+ logger.exception(error)
+
+ return access_key, secret_key
+
+ def configure_rgw_credentials(self):
+ logger.info('Configuring dashboard RGW credentials')
+ user = 'dashboard'
+ realms = []
+ access_key = ''
+ secret_key = ''
+ try:
+ _, out, err = mgr.send_rgwadmin_command(['realm', 'list'])
+ if out:
+ realms = out.get('realms', [])
+ if err:
+ logger.error('Unable to list RGW realms: %s', err)
+ if realms:
+ realm_access_keys = {}
+ realm_secret_keys = {}
+ for realm in realms:
+ realm_access_key, realm_secret_key = self._get_user_keys(user, realm)
+ if realm_access_key:
+ realm_access_keys[realm] = realm_access_key
+ realm_secret_keys[realm] = realm_secret_key
+ if realm_access_keys:
+ access_key = json.dumps(realm_access_keys)
+ secret_key = json.dumps(realm_secret_keys)
+ else:
+ access_key, secret_key = self._get_user_keys(user)
+
+ assert access_key and secret_key
+ Settings.RGW_API_ACCESS_KEY = access_key
+ Settings.RGW_API_SECRET_KEY = secret_key
+ except (AssertionError, SubprocessError) as error:
+ logger.exception(error)
+ raise NoCredentialsException