]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
mgr/dashboard: Add RGW topics endpoint creation for create ,delete and list in dashboard 61075/head
authorpujaoshahu <pshahu@redhat.com>
Fri, 13 Dec 2024 05:11:13 +0000 (10:41 +0530)
committerpujaoshahu <pshahu@redhat.com>
Mon, 17 Mar 2025 07:17:19 +0000 (12:47 +0530)
Fixes: https://tracker.ceph.com/issues/69229
Signed-off-by: pujaoshahu <pshahu@redhat.com>
src/pybind/mgr/dashboard/controllers/rgw.py
src/pybind/mgr/dashboard/openapi.yaml
src/pybind/mgr/dashboard/services/rgw_client.py
src/pybind/mgr/dashboard/tests/test_rgw.py

index c1e3fce0a5a38fb908144335d351e51ed2f78492..746bd1c5d1a37665704dfa7a93668a8624b5c661 100755 (executable)
@@ -1,6 +1,7 @@
 # -*- coding: utf-8 -*-
 
 # pylint: disable=C0302
+
 import json
 import logging
 import re
@@ -16,7 +17,8 @@ from ..security import Permission, Scope
 from ..services.auth import AuthManager, JwtManager
 from ..services.ceph_service import CephService
 from ..services.rgw_client import _SYNC_GROUP_ID, NoRgwDaemonsException, \
-    RgwClient, RgwMultisite, RgwMultisiteAutomation, RgwRateLimit
+    RgwClient, RgwMultisite, RgwMultisiteAutomation, RgwRateLimit, \
+    RgwTopicmanagement
 from ..services.rgw_iam import RgwAccounts
 from ..services.service import RgwServiceManager, wait_for_daemon_to_start
 from ..tools import json_str_to_object, str_to_bool
@@ -1405,3 +1407,112 @@ class RgwZone(RESTController):
         multisite_instance = RgwMultisite()
         result = multisite_instance.get_user_list(zoneName, realmName)
         return result
+
+
+@APIRouter('/rgw/topic', Scope.RGW)
+@APIDoc("RGW Topic Management API", "RGW Topic Management")
+class RgwTopic(RESTController):
+
+    @EndpointDoc(
+        "Create a new RGW Topic",
+        parameters={
+            "name": (str, "Name of the topic"),
+            "push_endpoint": (str, "Push Endpoint"),
+            "opaque_data": (str, " opaque data"),
+            "persistent": (bool, "persistent"),
+            "time_to_live": (str, "Time to live"),
+            "max_retries": (str, "max retries"),
+            "retry_sleep_duration": (str, "retry sleep duration"),
+            "policy": (str, "policy"),
+            "verify_ssl": (bool, 'verify ssl'),
+            "cloud_events": (str, 'cloud events'),
+            "user": (str, 'user'),
+            "password": (str, 'password'),
+            "vhost": (str, 'vhost'),
+            "ca_location": (str, 'ca location'),
+            "amqp_exchange": (str, 'amqp exchange'),
+            "amqp_ack_level": (str, 'amqp ack level'),
+            "use_ssl": (bool, 'use ssl'),
+            "kafka_ack_level": (str, 'kafka ack level'),
+            "kafka_brokers": (str, 'kafka brokers'),
+            "mechanism": (str, 'mechanism'),
+        },
+    )
+    def create(
+        self,
+        name: str,
+        daemon_name=None,
+        owner=None,
+        push_endpoint: Optional[str] = None,
+        opaque_data: Optional[str] = None,
+        persistent: Optional[bool] = False,
+        time_to_live: Optional[str] = None,
+        max_retries: Optional[str] = None,
+        retry_sleep_duration: Optional[str] = None,
+        policy: Optional[str] = None,
+        verify_ssl: Optional[bool] = False,
+        cloud_events: Optional[bool] = False,
+        ca_location: Optional[str] = None,
+        amqp_exchange: Optional[str] = None,
+        amqp_ack_level: Optional[str] = None,
+        use_ssl: Optional[bool] = False,
+        kafka_ack_level: Optional[str] = None,
+        kafka_brokers: Optional[str] = None,
+        mechanism: Optional[str] = None
+    ):
+        rgw_topic_instance = RgwClient.instance(owner, daemon_name=daemon_name)
+        return rgw_topic_instance.create_topic(
+            name=name,
+            push_endpoint=push_endpoint,
+            opaque_data=opaque_data,
+            persistent=persistent,
+            time_to_live=time_to_live,
+            max_retries=max_retries,
+            retry_sleep_duration=retry_sleep_duration,
+            policy=policy,
+            verify_ssl=verify_ssl,
+            cloud_events=cloud_events,
+            ca_location=ca_location,
+            amqp_exchange=amqp_exchange,
+            amqp_ack_level=amqp_ack_level,
+            use_ssl=use_ssl,
+            kafka_ack_level=kafka_ack_level,
+            kafka_brokers=kafka_brokers,
+            mechanism=mechanism
+        )
+
+    @EndpointDoc(
+        "Get RGW Topic List",
+        parameters={
+            "uid": (str, "Name of the user"),
+            "tenant": (str, "Name of the tenant"),
+        },
+    )
+    def list(self, uid: Optional[str] = None, tenant: Optional[str] = None):
+        rgw_topic_instance = RgwTopicmanagement()
+        result = rgw_topic_instance.list_topics(uid, tenant)
+        return result
+
+    @EndpointDoc(
+        "Get RGW Topic",
+        parameters={
+            "name": (str, "Name of the user"),
+            "tenant": (str, "Name of the tenant"),
+        },
+    )
+    def get(self, name: str, tenant: Optional[str] = None):
+        rgw_topic_instance = RgwTopicmanagement()
+        result = rgw_topic_instance.get_topic(name, tenant)
+        return result
+
+    @EndpointDoc(
+        "Delete RGW Topic",
+        parameters={
+            "name": (str, "Name of the user"),
+            "tenant": (str, "Name of the tenant"),
+        },
+    )
+    def delete(self, name: str, tenant: Optional[str] = None):
+        rgw_topic_instance = RgwTopicmanagement()
+        result = rgw_topic_instance.delete_topic(name=name, tenant=tenant)
+        return result
index d107f6644370683088242fa2d073c72349a25a0c..a539a5577740c00fbe1dd5272222d12c80dd8791 100755 (executable)
@@ -12944,6 +12944,209 @@ paths:
       - jwt: []
       tags:
       - RgwSite
+  /api/rgw/topic:
+    get:
+      parameters:
+      - allowEmptyValue: true
+        description: Name of the user
+        in: query
+        name: uid
+        schema:
+          type: string
+      - allowEmptyValue: true
+        description: Name of the tenant
+        in: query
+        name: tenant
+        schema:
+          type: string
+      responses:
+        '200':
+          content:
+            application/vnd.ceph.api.v1.0+json:
+              type: object
+          description: OK
+        '400':
+          description: Operation exception. Please check the response body for details.
+        '401':
+          description: Unauthenticated access. Please login first.
+        '403':
+          description: Unauthorized access. Please check your permissions.
+        '500':
+          description: Unexpected error. Please check the response body for the stack
+            trace.
+      security:
+      - jwt: []
+      summary: Get RGW Topic List
+      tags:
+      - RGW Topic Management
+    post:
+      parameters: []
+      requestBody:
+        content:
+          application/json:
+            schema:
+              properties:
+                amqp_ack_level:
+                  description: amqp ack level
+                  type: string
+                amqp_exchange:
+                  description: amqp exchange
+                  type: string
+                ca_location:
+                  description: ca location
+                  type: string
+                cloud_events:
+                  default: false
+                  description: cloud events
+                  type: string
+                daemon_name:
+                  type: string
+                kafka_ack_level:
+                  description: kafka ack level
+                  type: string
+                kafka_brokers:
+                  description: kafka brokers
+                  type: string
+                max_retries:
+                  description: max retries
+                  type: string
+                mechanism:
+                  description: mechanism
+                  type: string
+                name:
+                  description: Name of the topic
+                  type: string
+                opaque_data:
+                  description: ' opaque data'
+                  type: string
+                owner:
+                  type: string
+                persistent:
+                  default: false
+                  description: persistent
+                  type: boolean
+                policy:
+                  description: policy
+                  type: string
+                push_endpoint:
+                  description: Push Endpoint
+                  type: string
+                retry_sleep_duration:
+                  description: retry sleep duration
+                  type: string
+                time_to_live:
+                  description: Time to live
+                  type: string
+                use_ssl:
+                  default: false
+                  description: use ssl
+                  type: boolean
+                verify_ssl:
+                  default: false
+                  description: verify ssl
+                  type: boolean
+              required:
+              - name
+              type: object
+      responses:
+        '201':
+          content:
+            application/vnd.ceph.api.v1.0+json:
+              type: object
+          description: Resource created.
+        '202':
+          content:
+            application/vnd.ceph.api.v1.0+json:
+              type: object
+          description: Operation is still executing. Please check the task queue.
+        '400':
+          description: Operation exception. Please check the response body for details.
+        '401':
+          description: Unauthenticated access. Please login first.
+        '403':
+          description: Unauthorized access. Please check your permissions.
+        '500':
+          description: Unexpected error. Please check the response body for the stack
+            trace.
+      security:
+      - jwt: []
+      summary: Create a new RGW Topic
+      tags:
+      - RGW Topic Management
+  /api/rgw/topic/{name}:
+    delete:
+      parameters:
+      - description: Name of the user
+        in: path
+        name: name
+        required: true
+        schema:
+          type: string
+      - allowEmptyValue: true
+        description: Name of the tenant
+        in: query
+        name: tenant
+        schema:
+          type: string
+      responses:
+        '202':
+          content:
+            application/vnd.ceph.api.v1.0+json:
+              type: object
+          description: Operation is still executing. Please check the task queue.
+        '204':
+          content:
+            application/vnd.ceph.api.v1.0+json:
+              type: object
+          description: Resource deleted.
+        '400':
+          description: Operation exception. Please check the response body for details.
+        '401':
+          description: Unauthenticated access. Please login first.
+        '403':
+          description: Unauthorized access. Please check your permissions.
+        '500':
+          description: Unexpected error. Please check the response body for the stack
+            trace.
+      security:
+      - jwt: []
+      summary: Delete RGW Topic
+      tags:
+      - RGW Topic Management
+    get:
+      parameters:
+      - description: Name of the user
+        in: path
+        name: name
+        required: true
+        schema:
+          type: string
+      - allowEmptyValue: true
+        description: Name of the tenant
+        in: query
+        name: tenant
+        schema:
+          type: string
+      responses:
+        '200':
+          content:
+            application/vnd.ceph.api.v1.0+json:
+              type: object
+          description: OK
+        '400':
+          description: Operation exception. Please check the response body for details.
+        '401':
+          description: Unauthenticated access. Please login first.
+        '403':
+          description: Unauthorized access. Please check your permissions.
+        '500':
+          description: Unexpected error. Please check the response body for the stack
+            trace.
+      security:
+      - jwt: []
+      summary: Get RGW Topic
+      tags:
+      - RGW Topic Management
   /api/rgw/user:
     get:
       parameters:
@@ -17840,6 +18043,8 @@ tags:
   name: PrometheusNotifications
 - description: List of RGW roles
   name: RGW
+- description: RGW Topic Management API
+  name: RGW Topic Management
 - description: RBD Management API
   name: Rbd
 - description: RBD Mirroring Management API
index f9423da564e7ebc7d0015fd19ea9332b62a9ea73..c2f8786afde663fd83cc49e54343ad66cf3b1f25 100755 (executable)
@@ -1105,6 +1105,59 @@ class RgwClient(RestClient):
                     return None
             raise e
 
+    @RestClient.api_post('?Action=CreateTopic&Name={name}')
+    def create_topic(self, request=None, name: str = '',
+                     push_endpoint: Optional[str] = '', opaque_data: Optional[str] = '',
+                     persistent: Optional[bool] = False, time_to_live: Optional[str] = '',
+                     max_retries: Optional[str] = '', retry_sleep_duration: Optional[str] = '',
+                     policy: Optional[str] = '',
+                     verify_ssl: Optional[bool] = False, cloud_events: Optional[bool] = False,
+                     ca_location: Optional[str] = None, amqp_exchange: Optional[str] = None,
+                     amqp_ack_level: Optional[str] = None,
+                     use_ssl: Optional[bool] = False, kafka_ack_level: Optional[str] = None,
+                     kafka_brokers: Optional[str] = None, mechanism: Optional[str] = None,
+                     ):
+        params = {'Name': name}
+
+        if push_endpoint:
+            params['push-endpoint'] = push_endpoint
+        if opaque_data:
+            params['OpaqueData'] = opaque_data
+        if persistent:
+            params['persistent'] = 'true' if persistent else 'false'
+        if time_to_live:
+            params['time_to_live'] = time_to_live
+        if max_retries:
+            params['max_retries'] = max_retries
+        if retry_sleep_duration:
+            params['retry_sleep_duration'] = retry_sleep_duration
+        if policy:
+            params['Policy'] = policy
+        if verify_ssl:
+            params['verify_ssl'] = 'true' if verify_ssl else 'false'
+        if cloud_events:
+            params['cloud_events'] = 'true' if cloud_events else 'false'
+        if ca_location:
+            params['ca_location'] = ca_location
+        if amqp_exchange:
+            params['amqp_exchange'] = amqp_exchange
+        if amqp_ack_level:
+            params['amqp_ack_level'] = amqp_ack_level
+        if use_ssl:
+            params['use_ssl'] = 'true' if use_ssl else 'false'
+        if kafka_ack_level:
+            params['kafka_ack_level'] = kafka_ack_level
+        if kafka_brokers:
+            params['kafka_brokers'] = kafka_brokers
+        if mechanism:
+            params['mechanism'] = mechanism
+        try:
+            result = request(params=params)
+        except RequestException as e:
+            raise DashboardException(msg=str(e), component='rgw')
+
+        return result
+
 
 class SyncStatus(Enum):
     enabled = 'enabled'
@@ -2605,3 +2658,61 @@ class RgwMultisite:
             return True
         except DashboardException:
             return False
+
+
+class RgwTopicmanagement:
+    def list_topics(self, uid: Optional[str], tenant: Optional[str]):
+        rgw_topics_list = {}
+        rgw_topic_list_cmd = ['topic', 'list']
+        try:
+            if uid:
+                rgw_topic_list_cmd.append('--uid')
+                rgw_topic_list_cmd.append(uid)
+
+            if tenant:
+                rgw_topic_list_cmd.append('--tenant')
+                rgw_topic_list_cmd.append(tenant)
+
+            exit_code, rgw_topics_list, _ = mgr.send_rgwadmin_command(rgw_topic_list_cmd)
+            if exit_code > 0:
+                raise DashboardException(msg='Unable to fetch topic list',
+                                         http_status_code=500, component='rgw')
+            return rgw_topics_list
+        except SubprocessError as error:
+            raise DashboardException(error, http_status_code=500, component='rgw')
+
+    def get_topic(self, name: str, tenant: Optional[str]):
+        rgw_topic_info_cmd = ['topic', 'get']
+        try:
+            if tenant:
+                rgw_topic_info_cmd.append('--tenant')
+                rgw_topic_info_cmd.append(tenant)
+
+            if name:
+                rgw_topic_info_cmd.append('--topic')
+                rgw_topic_info_cmd.append(name)
+
+            exit_code, topic_info, _ = mgr.send_rgwadmin_command(rgw_topic_info_cmd)
+            if exit_code > 0:
+                raise DashboardException('Unable to get topic info',
+                                         http_status_code=500, component='rgw')
+            return topic_info
+        except SubprocessError as error:
+            raise DashboardException(error, http_status_code=500, component='rgw')
+
+    def delete_topic(self, name: str, tenant: Optional[str] = None):
+        rgw_delete_topic_cmd = ['topic', 'rm']
+        try:
+            if tenant:
+                rgw_delete_topic_cmd.extend(['--tenant', tenant])
+
+            if name:
+                rgw_delete_topic_cmd.extend(['--topic', name])
+
+            exit_code, _, _ = mgr.send_rgwadmin_command(rgw_delete_topic_cmd)
+
+            if exit_code > 0:
+                raise DashboardException(msg='Unable to delete topic',
+                                         http_status_code=500, component='rgw')
+        except SubprocessError as error:
+            raise DashboardException(error, http_status_code=500, component='rgw')
index fb39bc8d519d474d9e46bf605326cd202fb2aef7..298b6a4c21361eb04652a55d6b8022d9b42124ac 100644 (file)
@@ -1,7 +1,8 @@
+
 from unittest.mock import Mock, call, patch
 
 from .. import mgr
-from ..controllers.rgw import Rgw, RgwDaemon, RgwUser
+from ..controllers.rgw import Rgw, RgwDaemon, RgwTopic, RgwUser
 from ..rest_client import RequestException
 from ..services.rgw_client import RgwClient, RgwMultisite
 from ..tests import ControllerTestCase, RgwStub
@@ -479,3 +480,124 @@ class RgwUserControllerTestCase(ControllerTestCase):
         self._get('/test/api/rgw/user/testuser/ratelimit')
         self.assertStatus(200)
         self.assertJsonBody(mock_return_value)
+
+
+class TestRgwTopicController(ControllerTestCase):
+
+    @classmethod
+    def setup_server(cls):
+        cls.setup_controllers([RgwTopic], '/test')
+
+    @patch('dashboard.services.rgw_client._get_daemons')
+    @patch('dashboard.services.rgw_client.RgwClient', autospec=True)
+    def test_create_topic(self, mock_rgw_client, mock_get_daemons):
+        """
+        Test creating a topic with mock return values.
+        """
+        mock_daemon = {
+            "name": "dummy_daemon",
+            "host": "127.0.0.1",
+            "port": 8000,
+            "ssl": False,
+            "realm_name": "dummy_realm",
+            "zonegroup_name": "dummy_zonegroup",
+            "zonegroup_id": "dummy_zonegroup_id",
+            "zone_name": "dummy_zone"
+        }
+        mock_daemon_dict = {'dummy_daemon': mock_daemon}
+
+        mock_get_daemons.return_value = mock_daemon_dict
+        mock_rgw_client_instance = mock_rgw_client.return_value
+        mock_rgw_client_instance._daemons = mock_daemon_dict  # pylint: disable=W0212
+
+        mock_response = {
+            "CreateTopicResult": {
+                "TopicArn": "arn:aws:sns:zg1-realm1::HttpTest"
+            },
+            "ResponseMetadata": {
+                "RequestId": "b13925ff-a04a-4ff5-9578-7c51aa7932df.4926.3389207753149441947"
+            }
+        }
+        with patch('dashboard.controllers.rgw.RgwTopic.create', return_value=mock_response):
+            self._post('/test/api/rgw/topic', {
+                'name': 'HttpTest',
+                'owner': 'dashboard',
+                'opaque_data': 'testopaque',
+                'persistent': True,
+                'time_to_live': '10',
+                'max_retries': '3',
+                'retry_sleep_duration': '5',
+                'policy': {}
+            })
+            self.assertStatus(200)
+
+    @patch('dashboard.controllers.rgw.RgwTopic.list')
+    def test_list_topic_with_details(self, mock_list_topics):
+        mock_return_value = [
+            {
+                "topic": {
+                    "owner": "dashboard",
+                    "name": "HttpTest",
+                    "dest": {
+                        "push_endpoint": "https://10.0.66.13:443",
+                        "push_endpoint_args": "verify_ssl=true",
+                        "push_endpoint_topic": "HttpTest",
+                        "stored_secret": False,
+                        "persistent": True,
+                        "persistent_queue": ":HttpTest",
+                        "time_to_live": "5",
+                        "max_retries": "2",
+                        "retry_sleep_duration": "2"
+                    },
+                    "arn": "arn:aws:sns:zg1-realm1::HttpTest",
+                    "opaqueData": "test123",
+                    "policy": "{}",
+                    "subscribed_buckets": []
+                }
+            }
+        ]
+
+        mock_list_topics.return_value = mock_return_value
+        controller = RgwTopic()
+        result = controller.list(True, None)
+        mock_list_topics.assert_called_with(True, None)
+        self.assertEqual(result, mock_return_value)
+
+    @patch('dashboard.controllers.rgw.RgwTopic.get')
+    def test_get_topic(self, mock_get_topic):
+        mock_return_value = {
+            "topic": {
+                "owner": "dashboard",
+                "name": "HttpTest",
+                "dest": {
+                    "push_endpoint": "https://10.0.66.13:443",
+                    "push_endpoint_args": "verify_ssl=true",
+                    "push_endpoint_topic": "HttpTest",
+                    "stored_secret": False,
+                    "persistent": True,
+                    "persistent_queue": ":HttpTest",
+                    "time_to_live": "5",
+                    "max_retries": "2",
+                    "retry_sleep_duration": "2"
+                },
+                "arn": "arn:aws:sns:zg1-realm1::HttpTest",
+                "opaqueData": "test123",
+                "policy": "{}",
+                "subscribed_buckets": []
+            }
+        }
+        mock_get_topic.return_value = mock_return_value
+
+        controller = RgwTopic()
+        result = controller.get('HttpTest', None)
+        mock_get_topic.assert_called_with('HttpTest', None)
+        self.assertEqual(result, mock_return_value)
+
+    @patch('dashboard.controllers.rgw.RgwTopic.delete')
+    def test_delete_topic(self, mock_delete_topic):
+        mock_delete_topic.return_value = None
+
+        controller = RgwTopic()
+        result = controller.delete(name='HttpTest', tenant=None)
+        mock_delete_topic.assert_called_with(name='HttpTest', tenant=None)
+        self.assertEqual(result, None)