From: pujaoshahu Date: Fri, 13 Dec 2024 05:11:13 +0000 (+0530) Subject: mgr/dashboard: Add RGW topics endpoint creation for create ,delete and list in dashboard X-Git-Tag: testing/wip-pdonnell-testing-20250324.181635-debug~27^2 X-Git-Url: http://git.apps.os.sepia.ceph.com/?a=commitdiff_plain;h=94aebfc3f7d4789e8a570c87999409ef0d591957;p=ceph-ci.git mgr/dashboard: Add RGW topics endpoint creation for create ,delete and list in dashboard Fixes: https://tracker.ceph.com/issues/69229 Signed-off-by: pujaoshahu --- diff --git a/src/pybind/mgr/dashboard/controllers/rgw.py b/src/pybind/mgr/dashboard/controllers/rgw.py index c1e3fce0a5a..746bd1c5d1a 100755 --- a/src/pybind/mgr/dashboard/controllers/rgw.py +++ b/src/pybind/mgr/dashboard/controllers/rgw.py @@ -1,6 +1,7 @@ # -*- coding: utf-8 -*- # pylint: disable=C0302 + import json import logging import re @@ -16,7 +17,8 @@ from ..security import Permission, Scope from ..services.auth import AuthManager, JwtManager from ..services.ceph_service import CephService from ..services.rgw_client import _SYNC_GROUP_ID, NoRgwDaemonsException, \ - RgwClient, RgwMultisite, RgwMultisiteAutomation, RgwRateLimit + RgwClient, RgwMultisite, RgwMultisiteAutomation, RgwRateLimit, \ + RgwTopicmanagement from ..services.rgw_iam import RgwAccounts from ..services.service import RgwServiceManager, wait_for_daemon_to_start from ..tools import json_str_to_object, str_to_bool @@ -1405,3 +1407,112 @@ class RgwZone(RESTController): multisite_instance = RgwMultisite() result = multisite_instance.get_user_list(zoneName, realmName) return result + + +@APIRouter('/rgw/topic', Scope.RGW) +@APIDoc("RGW Topic Management API", "RGW Topic Management") +class RgwTopic(RESTController): + + @EndpointDoc( + "Create a new RGW Topic", + parameters={ + "name": (str, "Name of the topic"), + "push_endpoint": (str, "Push Endpoint"), + "opaque_data": (str, " opaque data"), + "persistent": (bool, "persistent"), + "time_to_live": (str, "Time to live"), + "max_retries": (str, "max retries"), + "retry_sleep_duration": (str, "retry sleep duration"), + "policy": (str, "policy"), + "verify_ssl": (bool, 'verify ssl'), + "cloud_events": (str, 'cloud events'), + "user": (str, 'user'), + "password": (str, 'password'), + "vhost": (str, 'vhost'), + "ca_location": (str, 'ca location'), + "amqp_exchange": (str, 'amqp exchange'), + "amqp_ack_level": (str, 'amqp ack level'), + "use_ssl": (bool, 'use ssl'), + "kafka_ack_level": (str, 'kafka ack level'), + "kafka_brokers": (str, 'kafka brokers'), + "mechanism": (str, 'mechanism'), + }, + ) + def create( + self, + name: str, + daemon_name=None, + owner=None, + push_endpoint: Optional[str] = None, + opaque_data: Optional[str] = None, + persistent: Optional[bool] = False, + time_to_live: Optional[str] = None, + max_retries: Optional[str] = None, + retry_sleep_duration: Optional[str] = None, + policy: Optional[str] = None, + verify_ssl: Optional[bool] = False, + cloud_events: Optional[bool] = False, + ca_location: Optional[str] = None, + amqp_exchange: Optional[str] = None, + amqp_ack_level: Optional[str] = None, + use_ssl: Optional[bool] = False, + kafka_ack_level: Optional[str] = None, + kafka_brokers: Optional[str] = None, + mechanism: Optional[str] = None + ): + rgw_topic_instance = RgwClient.instance(owner, daemon_name=daemon_name) + return rgw_topic_instance.create_topic( + name=name, + push_endpoint=push_endpoint, + opaque_data=opaque_data, + persistent=persistent, + time_to_live=time_to_live, + max_retries=max_retries, + retry_sleep_duration=retry_sleep_duration, + policy=policy, + verify_ssl=verify_ssl, + cloud_events=cloud_events, + ca_location=ca_location, + amqp_exchange=amqp_exchange, + amqp_ack_level=amqp_ack_level, + use_ssl=use_ssl, + kafka_ack_level=kafka_ack_level, + kafka_brokers=kafka_brokers, + mechanism=mechanism + ) + + @EndpointDoc( + "Get RGW Topic List", + parameters={ + "uid": (str, "Name of the user"), + "tenant": (str, "Name of the tenant"), + }, + ) + def list(self, uid: Optional[str] = None, tenant: Optional[str] = None): + rgw_topic_instance = RgwTopicmanagement() + result = rgw_topic_instance.list_topics(uid, tenant) + return result + + @EndpointDoc( + "Get RGW Topic", + parameters={ + "name": (str, "Name of the user"), + "tenant": (str, "Name of the tenant"), + }, + ) + def get(self, name: str, tenant: Optional[str] = None): + rgw_topic_instance = RgwTopicmanagement() + result = rgw_topic_instance.get_topic(name, tenant) + return result + + @EndpointDoc( + "Delete RGW Topic", + parameters={ + "name": (str, "Name of the user"), + "tenant": (str, "Name of the tenant"), + }, + ) + def delete(self, name: str, tenant: Optional[str] = None): + rgw_topic_instance = RgwTopicmanagement() + result = rgw_topic_instance.delete_topic(name=name, tenant=tenant) + return result diff --git a/src/pybind/mgr/dashboard/openapi.yaml b/src/pybind/mgr/dashboard/openapi.yaml index d107f664437..a539a557774 100755 --- a/src/pybind/mgr/dashboard/openapi.yaml +++ b/src/pybind/mgr/dashboard/openapi.yaml @@ -12944,6 +12944,209 @@ paths: - jwt: [] tags: - RgwSite + /api/rgw/topic: + get: + parameters: + - allowEmptyValue: true + description: Name of the user + in: query + name: uid + schema: + type: string + - allowEmptyValue: true + description: Name of the tenant + in: query + name: tenant + schema: + type: string + responses: + '200': + content: + application/vnd.ceph.api.v1.0+json: + type: object + description: OK + '400': + description: Operation exception. Please check the response body for details. + '401': + description: Unauthenticated access. Please login first. + '403': + description: Unauthorized access. Please check your permissions. + '500': + description: Unexpected error. Please check the response body for the stack + trace. + security: + - jwt: [] + summary: Get RGW Topic List + tags: + - RGW Topic Management + post: + parameters: [] + requestBody: + content: + application/json: + schema: + properties: + amqp_ack_level: + description: amqp ack level + type: string + amqp_exchange: + description: amqp exchange + type: string + ca_location: + description: ca location + type: string + cloud_events: + default: false + description: cloud events + type: string + daemon_name: + type: string + kafka_ack_level: + description: kafka ack level + type: string + kafka_brokers: + description: kafka brokers + type: string + max_retries: + description: max retries + type: string + mechanism: + description: mechanism + type: string + name: + description: Name of the topic + type: string + opaque_data: + description: ' opaque data' + type: string + owner: + type: string + persistent: + default: false + description: persistent + type: boolean + policy: + description: policy + type: string + push_endpoint: + description: Push Endpoint + type: string + retry_sleep_duration: + description: retry sleep duration + type: string + time_to_live: + description: Time to live + type: string + use_ssl: + default: false + description: use ssl + type: boolean + verify_ssl: + default: false + description: verify ssl + type: boolean + required: + - name + type: object + responses: + '201': + content: + application/vnd.ceph.api.v1.0+json: + type: object + description: Resource created. + '202': + content: + application/vnd.ceph.api.v1.0+json: + type: object + description: Operation is still executing. Please check the task queue. + '400': + description: Operation exception. Please check the response body for details. + '401': + description: Unauthenticated access. Please login first. + '403': + description: Unauthorized access. Please check your permissions. + '500': + description: Unexpected error. Please check the response body for the stack + trace. + security: + - jwt: [] + summary: Create a new RGW Topic + tags: + - RGW Topic Management + /api/rgw/topic/{name}: + delete: + parameters: + - description: Name of the user + in: path + name: name + required: true + schema: + type: string + - allowEmptyValue: true + description: Name of the tenant + in: query + name: tenant + schema: + type: string + responses: + '202': + content: + application/vnd.ceph.api.v1.0+json: + type: object + description: Operation is still executing. Please check the task queue. + '204': + content: + application/vnd.ceph.api.v1.0+json: + type: object + description: Resource deleted. + '400': + description: Operation exception. Please check the response body for details. + '401': + description: Unauthenticated access. Please login first. + '403': + description: Unauthorized access. Please check your permissions. + '500': + description: Unexpected error. Please check the response body for the stack + trace. + security: + - jwt: [] + summary: Delete RGW Topic + tags: + - RGW Topic Management + get: + parameters: + - description: Name of the user + in: path + name: name + required: true + schema: + type: string + - allowEmptyValue: true + description: Name of the tenant + in: query + name: tenant + schema: + type: string + responses: + '200': + content: + application/vnd.ceph.api.v1.0+json: + type: object + description: OK + '400': + description: Operation exception. Please check the response body for details. + '401': + description: Unauthenticated access. Please login first. + '403': + description: Unauthorized access. Please check your permissions. + '500': + description: Unexpected error. Please check the response body for the stack + trace. + security: + - jwt: [] + summary: Get RGW Topic + tags: + - RGW Topic Management /api/rgw/user: get: parameters: @@ -17840,6 +18043,8 @@ tags: name: PrometheusNotifications - description: List of RGW roles name: RGW +- description: RGW Topic Management API + name: RGW Topic Management - description: RBD Management API name: Rbd - description: RBD Mirroring Management API diff --git a/src/pybind/mgr/dashboard/services/rgw_client.py b/src/pybind/mgr/dashboard/services/rgw_client.py index f9423da564e..c2f8786afde 100755 --- a/src/pybind/mgr/dashboard/services/rgw_client.py +++ b/src/pybind/mgr/dashboard/services/rgw_client.py @@ -1105,6 +1105,59 @@ class RgwClient(RestClient): return None raise e + @RestClient.api_post('?Action=CreateTopic&Name={name}') + def create_topic(self, request=None, name: str = '', + push_endpoint: Optional[str] = '', opaque_data: Optional[str] = '', + persistent: Optional[bool] = False, time_to_live: Optional[str] = '', + max_retries: Optional[str] = '', retry_sleep_duration: Optional[str] = '', + policy: Optional[str] = '', + verify_ssl: Optional[bool] = False, cloud_events: Optional[bool] = False, + ca_location: Optional[str] = None, amqp_exchange: Optional[str] = None, + amqp_ack_level: Optional[str] = None, + use_ssl: Optional[bool] = False, kafka_ack_level: Optional[str] = None, + kafka_brokers: Optional[str] = None, mechanism: Optional[str] = None, + ): + params = {'Name': name} + + if push_endpoint: + params['push-endpoint'] = push_endpoint + if opaque_data: + params['OpaqueData'] = opaque_data + if persistent: + params['persistent'] = 'true' if persistent else 'false' + if time_to_live: + params['time_to_live'] = time_to_live + if max_retries: + params['max_retries'] = max_retries + if retry_sleep_duration: + params['retry_sleep_duration'] = retry_sleep_duration + if policy: + params['Policy'] = policy + if verify_ssl: + params['verify_ssl'] = 'true' if verify_ssl else 'false' + if cloud_events: + params['cloud_events'] = 'true' if cloud_events else 'false' + if ca_location: + params['ca_location'] = ca_location + if amqp_exchange: + params['amqp_exchange'] = amqp_exchange + if amqp_ack_level: + params['amqp_ack_level'] = amqp_ack_level + if use_ssl: + params['use_ssl'] = 'true' if use_ssl else 'false' + if kafka_ack_level: + params['kafka_ack_level'] = kafka_ack_level + if kafka_brokers: + params['kafka_brokers'] = kafka_brokers + if mechanism: + params['mechanism'] = mechanism + try: + result = request(params=params) + except RequestException as e: + raise DashboardException(msg=str(e), component='rgw') + + return result + class SyncStatus(Enum): enabled = 'enabled' @@ -2605,3 +2658,61 @@ class RgwMultisite: return True except DashboardException: return False + + +class RgwTopicmanagement: + def list_topics(self, uid: Optional[str], tenant: Optional[str]): + rgw_topics_list = {} + rgw_topic_list_cmd = ['topic', 'list'] + try: + if uid: + rgw_topic_list_cmd.append('--uid') + rgw_topic_list_cmd.append(uid) + + if tenant: + rgw_topic_list_cmd.append('--tenant') + rgw_topic_list_cmd.append(tenant) + + exit_code, rgw_topics_list, _ = mgr.send_rgwadmin_command(rgw_topic_list_cmd) + if exit_code > 0: + raise DashboardException(msg='Unable to fetch topic list', + http_status_code=500, component='rgw') + return rgw_topics_list + except SubprocessError as error: + raise DashboardException(error, http_status_code=500, component='rgw') + + def get_topic(self, name: str, tenant: Optional[str]): + rgw_topic_info_cmd = ['topic', 'get'] + try: + if tenant: + rgw_topic_info_cmd.append('--tenant') + rgw_topic_info_cmd.append(tenant) + + if name: + rgw_topic_info_cmd.append('--topic') + rgw_topic_info_cmd.append(name) + + exit_code, topic_info, _ = mgr.send_rgwadmin_command(rgw_topic_info_cmd) + if exit_code > 0: + raise DashboardException('Unable to get topic info', + http_status_code=500, component='rgw') + return topic_info + except SubprocessError as error: + raise DashboardException(error, http_status_code=500, component='rgw') + + def delete_topic(self, name: str, tenant: Optional[str] = None): + rgw_delete_topic_cmd = ['topic', 'rm'] + try: + if tenant: + rgw_delete_topic_cmd.extend(['--tenant', tenant]) + + if name: + rgw_delete_topic_cmd.extend(['--topic', name]) + + exit_code, _, _ = mgr.send_rgwadmin_command(rgw_delete_topic_cmd) + + if exit_code > 0: + raise DashboardException(msg='Unable to delete topic', + http_status_code=500, component='rgw') + except SubprocessError as error: + raise DashboardException(error, http_status_code=500, component='rgw') diff --git a/src/pybind/mgr/dashboard/tests/test_rgw.py b/src/pybind/mgr/dashboard/tests/test_rgw.py index fb39bc8d519..298b6a4c213 100644 --- a/src/pybind/mgr/dashboard/tests/test_rgw.py +++ b/src/pybind/mgr/dashboard/tests/test_rgw.py @@ -1,7 +1,8 @@ + from unittest.mock import Mock, call, patch from .. import mgr -from ..controllers.rgw import Rgw, RgwDaemon, RgwUser +from ..controllers.rgw import Rgw, RgwDaemon, RgwTopic, RgwUser from ..rest_client import RequestException from ..services.rgw_client import RgwClient, RgwMultisite from ..tests import ControllerTestCase, RgwStub @@ -479,3 +480,124 @@ class RgwUserControllerTestCase(ControllerTestCase): self._get('/test/api/rgw/user/testuser/ratelimit') self.assertStatus(200) self.assertJsonBody(mock_return_value) + + +class TestRgwTopicController(ControllerTestCase): + + @classmethod + def setup_server(cls): + cls.setup_controllers([RgwTopic], '/test') + + @patch('dashboard.services.rgw_client._get_daemons') + @patch('dashboard.services.rgw_client.RgwClient', autospec=True) + def test_create_topic(self, mock_rgw_client, mock_get_daemons): + """ + Test creating a topic with mock return values. + """ + mock_daemon = { + "name": "dummy_daemon", + "host": "127.0.0.1", + "port": 8000, + "ssl": False, + "realm_name": "dummy_realm", + "zonegroup_name": "dummy_zonegroup", + "zonegroup_id": "dummy_zonegroup_id", + "zone_name": "dummy_zone" + } + mock_daemon_dict = {'dummy_daemon': mock_daemon} + + mock_get_daemons.return_value = mock_daemon_dict + mock_rgw_client_instance = mock_rgw_client.return_value + mock_rgw_client_instance._daemons = mock_daemon_dict # pylint: disable=W0212 + + mock_response = { + "CreateTopicResult": { + "TopicArn": "arn:aws:sns:zg1-realm1::HttpTest" + }, + "ResponseMetadata": { + "RequestId": "b13925ff-a04a-4ff5-9578-7c51aa7932df.4926.3389207753149441947" + } + } + with patch('dashboard.controllers.rgw.RgwTopic.create', return_value=mock_response): + self._post('/test/api/rgw/topic', { + 'name': 'HttpTest', + 'owner': 'dashboard', + 'opaque_data': 'testopaque', + 'persistent': True, + 'time_to_live': '10', + 'max_retries': '3', + 'retry_sleep_duration': '5', + 'policy': {} + }) + self.assertStatus(200) + + @patch('dashboard.controllers.rgw.RgwTopic.list') + def test_list_topic_with_details(self, mock_list_topics): + mock_return_value = [ + { + "topic": { + "owner": "dashboard", + "name": "HttpTest", + "dest": { + "push_endpoint": "https://10.0.66.13:443", + "push_endpoint_args": "verify_ssl=true", + "push_endpoint_topic": "HttpTest", + "stored_secret": False, + "persistent": True, + "persistent_queue": ":HttpTest", + "time_to_live": "5", + "max_retries": "2", + "retry_sleep_duration": "2" + }, + "arn": "arn:aws:sns:zg1-realm1::HttpTest", + "opaqueData": "test123", + "policy": "{}", + "subscribed_buckets": [] + } + } + ] + + mock_list_topics.return_value = mock_return_value + controller = RgwTopic() + result = controller.list(True, None) + mock_list_topics.assert_called_with(True, None) + self.assertEqual(result, mock_return_value) + + @patch('dashboard.controllers.rgw.RgwTopic.get') + def test_get_topic(self, mock_get_topic): + mock_return_value = { + "topic": { + "owner": "dashboard", + "name": "HttpTest", + "dest": { + "push_endpoint": "https://10.0.66.13:443", + "push_endpoint_args": "verify_ssl=true", + "push_endpoint_topic": "HttpTest", + "stored_secret": False, + "persistent": True, + "persistent_queue": ":HttpTest", + "time_to_live": "5", + "max_retries": "2", + "retry_sleep_duration": "2" + }, + "arn": "arn:aws:sns:zg1-realm1::HttpTest", + "opaqueData": "test123", + "policy": "{}", + "subscribed_buckets": [] + } + } + mock_get_topic.return_value = mock_return_value + + controller = RgwTopic() + result = controller.get('HttpTest', None) + mock_get_topic.assert_called_with('HttpTest', None) + self.assertEqual(result, mock_return_value) + + @patch('dashboard.controllers.rgw.RgwTopic.delete') + def test_delete_topic(self, mock_delete_topic): + mock_delete_topic.return_value = None + + controller = RgwTopic() + result = controller.delete(name='HttpTest', tenant=None) + mock_delete_topic.assert_called_with(name='HttpTest', tenant=None) + self.assertEqual(result, None)