# -*- coding: utf-8 -*-
# pylint: disable=C0302
+
import json
import logging
import re
from ..services.auth import AuthManager, JwtManager
from ..services.ceph_service import CephService
from ..services.rgw_client import _SYNC_GROUP_ID, NoRgwDaemonsException, \
- RgwClient, RgwMultisite, RgwMultisiteAutomation, RgwRateLimit
+ RgwClient, RgwMultisite, RgwMultisiteAutomation, RgwRateLimit, \
+ RgwTopicmanagement
from ..services.rgw_iam import RgwAccounts
from ..services.service import RgwServiceManager, wait_for_daemon_to_start
from ..tools import json_str_to_object, str_to_bool
multisite_instance = RgwMultisite()
result = multisite_instance.get_user_list(zoneName, realmName)
return result
+
+
+@APIRouter('/rgw/topic', Scope.RGW)
+@APIDoc("RGW Topic Management API", "RGW Topic Management")
+class RgwTopic(RESTController):
+
+ @EndpointDoc(
+ "Create a new RGW Topic",
+ parameters={
+ "name": (str, "Name of the topic"),
+ "push_endpoint": (str, "Push Endpoint"),
+ "opaque_data": (str, " opaque data"),
+ "persistent": (bool, "persistent"),
+ "time_to_live": (str, "Time to live"),
+ "max_retries": (str, "max retries"),
+ "retry_sleep_duration": (str, "retry sleep duration"),
+ "policy": (str, "policy"),
+ "verify_ssl": (bool, 'verify ssl'),
+ "cloud_events": (str, 'cloud events'),
+ "user": (str, 'user'),
+ "password": (str, 'password'),
+ "vhost": (str, 'vhost'),
+ "ca_location": (str, 'ca location'),
+ "amqp_exchange": (str, 'amqp exchange'),
+ "amqp_ack_level": (str, 'amqp ack level'),
+ "use_ssl": (bool, 'use ssl'),
+ "kafka_ack_level": (str, 'kafka ack level'),
+ "kafka_brokers": (str, 'kafka brokers'),
+ "mechanism": (str, 'mechanism'),
+ },
+ )
+ def create(
+ self,
+ name: str,
+ daemon_name=None,
+ owner=None,
+ push_endpoint: Optional[str] = None,
+ opaque_data: Optional[str] = None,
+ persistent: Optional[bool] = False,
+ time_to_live: Optional[str] = None,
+ max_retries: Optional[str] = None,
+ retry_sleep_duration: Optional[str] = None,
+ policy: Optional[str] = None,
+ verify_ssl: Optional[bool] = False,
+ cloud_events: Optional[bool] = False,
+ ca_location: Optional[str] = None,
+ amqp_exchange: Optional[str] = None,
+ amqp_ack_level: Optional[str] = None,
+ use_ssl: Optional[bool] = False,
+ kafka_ack_level: Optional[str] = None,
+ kafka_brokers: Optional[str] = None,
+ mechanism: Optional[str] = None
+ ):
+ rgw_topic_instance = RgwClient.instance(owner, daemon_name=daemon_name)
+ return rgw_topic_instance.create_topic(
+ name=name,
+ push_endpoint=push_endpoint,
+ opaque_data=opaque_data,
+ persistent=persistent,
+ time_to_live=time_to_live,
+ max_retries=max_retries,
+ retry_sleep_duration=retry_sleep_duration,
+ policy=policy,
+ verify_ssl=verify_ssl,
+ cloud_events=cloud_events,
+ ca_location=ca_location,
+ amqp_exchange=amqp_exchange,
+ amqp_ack_level=amqp_ack_level,
+ use_ssl=use_ssl,
+ kafka_ack_level=kafka_ack_level,
+ kafka_brokers=kafka_brokers,
+ mechanism=mechanism
+ )
+
+ @EndpointDoc(
+ "Get RGW Topic List",
+ parameters={
+ "uid": (str, "Name of the user"),
+ "tenant": (str, "Name of the tenant"),
+ },
+ )
+ def list(self, uid: Optional[str] = None, tenant: Optional[str] = None):
+ rgw_topic_instance = RgwTopicmanagement()
+ result = rgw_topic_instance.list_topics(uid, tenant)
+ return result
+
+ @EndpointDoc(
+ "Get RGW Topic",
+ parameters={
+ "name": (str, "Name of the user"),
+ "tenant": (str, "Name of the tenant"),
+ },
+ )
+ def get(self, name: str, tenant: Optional[str] = None):
+ rgw_topic_instance = RgwTopicmanagement()
+ result = rgw_topic_instance.get_topic(name, tenant)
+ return result
+
+ @EndpointDoc(
+ "Delete RGW Topic",
+ parameters={
+ "name": (str, "Name of the user"),
+ "tenant": (str, "Name of the tenant"),
+ },
+ )
+ def delete(self, name: str, tenant: Optional[str] = None):
+ rgw_topic_instance = RgwTopicmanagement()
+ result = rgw_topic_instance.delete_topic(name=name, tenant=tenant)
+ return result
- jwt: []
tags:
- RgwSite
+ /api/rgw/topic:
+ get:
+ parameters:
+ - allowEmptyValue: true
+ description: Name of the user
+ in: query
+ name: uid
+ schema:
+ type: string
+ - allowEmptyValue: true
+ description: Name of the tenant
+ in: query
+ name: tenant
+ schema:
+ type: string
+ responses:
+ '200':
+ content:
+ application/vnd.ceph.api.v1.0+json:
+ type: object
+ description: OK
+ '400':
+ description: Operation exception. Please check the response body for details.
+ '401':
+ description: Unauthenticated access. Please login first.
+ '403':
+ description: Unauthorized access. Please check your permissions.
+ '500':
+ description: Unexpected error. Please check the response body for the stack
+ trace.
+ security:
+ - jwt: []
+ summary: Get RGW Topic List
+ tags:
+ - RGW Topic Management
+ post:
+ parameters: []
+ requestBody:
+ content:
+ application/json:
+ schema:
+ properties:
+ amqp_ack_level:
+ description: amqp ack level
+ type: string
+ amqp_exchange:
+ description: amqp exchange
+ type: string
+ ca_location:
+ description: ca location
+ type: string
+ cloud_events:
+ default: false
+ description: cloud events
+ type: string
+ daemon_name:
+ type: string
+ kafka_ack_level:
+ description: kafka ack level
+ type: string
+ kafka_brokers:
+ description: kafka brokers
+ type: string
+ max_retries:
+ description: max retries
+ type: string
+ mechanism:
+ description: mechanism
+ type: string
+ name:
+ description: Name of the topic
+ type: string
+ opaque_data:
+ description: ' opaque data'
+ type: string
+ owner:
+ type: string
+ persistent:
+ default: false
+ description: persistent
+ type: boolean
+ policy:
+ description: policy
+ type: string
+ push_endpoint:
+ description: Push Endpoint
+ type: string
+ retry_sleep_duration:
+ description: retry sleep duration
+ type: string
+ time_to_live:
+ description: Time to live
+ type: string
+ use_ssl:
+ default: false
+ description: use ssl
+ type: boolean
+ verify_ssl:
+ default: false
+ description: verify ssl
+ type: boolean
+ required:
+ - name
+ type: object
+ responses:
+ '201':
+ content:
+ application/vnd.ceph.api.v1.0+json:
+ type: object
+ description: Resource created.
+ '202':
+ content:
+ application/vnd.ceph.api.v1.0+json:
+ type: object
+ description: Operation is still executing. Please check the task queue.
+ '400':
+ description: Operation exception. Please check the response body for details.
+ '401':
+ description: Unauthenticated access. Please login first.
+ '403':
+ description: Unauthorized access. Please check your permissions.
+ '500':
+ description: Unexpected error. Please check the response body for the stack
+ trace.
+ security:
+ - jwt: []
+ summary: Create a new RGW Topic
+ tags:
+ - RGW Topic Management
+ /api/rgw/topic/{name}:
+ delete:
+ parameters:
+ - description: Name of the user
+ in: path
+ name: name
+ required: true
+ schema:
+ type: string
+ - allowEmptyValue: true
+ description: Name of the tenant
+ in: query
+ name: tenant
+ schema:
+ type: string
+ responses:
+ '202':
+ content:
+ application/vnd.ceph.api.v1.0+json:
+ type: object
+ description: Operation is still executing. Please check the task queue.
+ '204':
+ content:
+ application/vnd.ceph.api.v1.0+json:
+ type: object
+ description: Resource deleted.
+ '400':
+ description: Operation exception. Please check the response body for details.
+ '401':
+ description: Unauthenticated access. Please login first.
+ '403':
+ description: Unauthorized access. Please check your permissions.
+ '500':
+ description: Unexpected error. Please check the response body for the stack
+ trace.
+ security:
+ - jwt: []
+ summary: Delete RGW Topic
+ tags:
+ - RGW Topic Management
+ get:
+ parameters:
+ - description: Name of the user
+ in: path
+ name: name
+ required: true
+ schema:
+ type: string
+ - allowEmptyValue: true
+ description: Name of the tenant
+ in: query
+ name: tenant
+ schema:
+ type: string
+ responses:
+ '200':
+ content:
+ application/vnd.ceph.api.v1.0+json:
+ type: object
+ description: OK
+ '400':
+ description: Operation exception. Please check the response body for details.
+ '401':
+ description: Unauthenticated access. Please login first.
+ '403':
+ description: Unauthorized access. Please check your permissions.
+ '500':
+ description: Unexpected error. Please check the response body for the stack
+ trace.
+ security:
+ - jwt: []
+ summary: Get RGW Topic
+ tags:
+ - RGW Topic Management
/api/rgw/user:
get:
parameters:
name: PrometheusNotifications
- description: List of RGW roles
name: RGW
+- description: RGW Topic Management API
+ name: RGW Topic Management
- description: RBD Management API
name: Rbd
- description: RBD Mirroring Management API
return None
raise e
+ @RestClient.api_post('?Action=CreateTopic&Name={name}')
+ def create_topic(self, request=None, name: str = '',
+ push_endpoint: Optional[str] = '', opaque_data: Optional[str] = '',
+ persistent: Optional[bool] = False, time_to_live: Optional[str] = '',
+ max_retries: Optional[str] = '', retry_sleep_duration: Optional[str] = '',
+ policy: Optional[str] = '',
+ verify_ssl: Optional[bool] = False, cloud_events: Optional[bool] = False,
+ ca_location: Optional[str] = None, amqp_exchange: Optional[str] = None,
+ amqp_ack_level: Optional[str] = None,
+ use_ssl: Optional[bool] = False, kafka_ack_level: Optional[str] = None,
+ kafka_brokers: Optional[str] = None, mechanism: Optional[str] = None,
+ ):
+ params = {'Name': name}
+
+ if push_endpoint:
+ params['push-endpoint'] = push_endpoint
+ if opaque_data:
+ params['OpaqueData'] = opaque_data
+ if persistent:
+ params['persistent'] = 'true' if persistent else 'false'
+ if time_to_live:
+ params['time_to_live'] = time_to_live
+ if max_retries:
+ params['max_retries'] = max_retries
+ if retry_sleep_duration:
+ params['retry_sleep_duration'] = retry_sleep_duration
+ if policy:
+ params['Policy'] = policy
+ if verify_ssl:
+ params['verify_ssl'] = 'true' if verify_ssl else 'false'
+ if cloud_events:
+ params['cloud_events'] = 'true' if cloud_events else 'false'
+ if ca_location:
+ params['ca_location'] = ca_location
+ if amqp_exchange:
+ params['amqp_exchange'] = amqp_exchange
+ if amqp_ack_level:
+ params['amqp_ack_level'] = amqp_ack_level
+ if use_ssl:
+ params['use_ssl'] = 'true' if use_ssl else 'false'
+ if kafka_ack_level:
+ params['kafka_ack_level'] = kafka_ack_level
+ if kafka_brokers:
+ params['kafka_brokers'] = kafka_brokers
+ if mechanism:
+ params['mechanism'] = mechanism
+ try:
+ result = request(params=params)
+ except RequestException as e:
+ raise DashboardException(msg=str(e), component='rgw')
+
+ return result
+
class SyncStatus(Enum):
enabled = 'enabled'
return True
except DashboardException:
return False
+
+
+class RgwTopicmanagement:
+ def list_topics(self, uid: Optional[str], tenant: Optional[str]):
+ rgw_topics_list = {}
+ rgw_topic_list_cmd = ['topic', 'list']
+ try:
+ if uid:
+ rgw_topic_list_cmd.append('--uid')
+ rgw_topic_list_cmd.append(uid)
+
+ if tenant:
+ rgw_topic_list_cmd.append('--tenant')
+ rgw_topic_list_cmd.append(tenant)
+
+ exit_code, rgw_topics_list, _ = mgr.send_rgwadmin_command(rgw_topic_list_cmd)
+ if exit_code > 0:
+ raise DashboardException(msg='Unable to fetch topic list',
+ http_status_code=500, component='rgw')
+ return rgw_topics_list
+ except SubprocessError as error:
+ raise DashboardException(error, http_status_code=500, component='rgw')
+
+ def get_topic(self, name: str, tenant: Optional[str]):
+ rgw_topic_info_cmd = ['topic', 'get']
+ try:
+ if tenant:
+ rgw_topic_info_cmd.append('--tenant')
+ rgw_topic_info_cmd.append(tenant)
+
+ if name:
+ rgw_topic_info_cmd.append('--topic')
+ rgw_topic_info_cmd.append(name)
+
+ exit_code, topic_info, _ = mgr.send_rgwadmin_command(rgw_topic_info_cmd)
+ if exit_code > 0:
+ raise DashboardException('Unable to get topic info',
+ http_status_code=500, component='rgw')
+ return topic_info
+ except SubprocessError as error:
+ raise DashboardException(error, http_status_code=500, component='rgw')
+
+ def delete_topic(self, name: str, tenant: Optional[str] = None):
+ rgw_delete_topic_cmd = ['topic', 'rm']
+ try:
+ if tenant:
+ rgw_delete_topic_cmd.extend(['--tenant', tenant])
+
+ if name:
+ rgw_delete_topic_cmd.extend(['--topic', name])
+
+ exit_code, _, _ = mgr.send_rgwadmin_command(rgw_delete_topic_cmd)
+
+ if exit_code > 0:
+ raise DashboardException(msg='Unable to delete topic',
+ http_status_code=500, component='rgw')
+ except SubprocessError as error:
+ raise DashboardException(error, http_status_code=500, component='rgw')
+
from unittest.mock import Mock, call, patch
from .. import mgr
-from ..controllers.rgw import Rgw, RgwDaemon, RgwUser
+from ..controllers.rgw import Rgw, RgwDaemon, RgwTopic, RgwUser
from ..rest_client import RequestException
from ..services.rgw_client import RgwClient, RgwMultisite
from ..tests import ControllerTestCase, RgwStub
self._get('/test/api/rgw/user/testuser/ratelimit')
self.assertStatus(200)
self.assertJsonBody(mock_return_value)
+
+
+class TestRgwTopicController(ControllerTestCase):
+
+ @classmethod
+ def setup_server(cls):
+ cls.setup_controllers([RgwTopic], '/test')
+
+ @patch('dashboard.services.rgw_client._get_daemons')
+ @patch('dashboard.services.rgw_client.RgwClient', autospec=True)
+ def test_create_topic(self, mock_rgw_client, mock_get_daemons):
+ """
+ Test creating a topic with mock return values.
+ """
+ mock_daemon = {
+ "name": "dummy_daemon",
+ "host": "127.0.0.1",
+ "port": 8000,
+ "ssl": False,
+ "realm_name": "dummy_realm",
+ "zonegroup_name": "dummy_zonegroup",
+ "zonegroup_id": "dummy_zonegroup_id",
+ "zone_name": "dummy_zone"
+ }
+ mock_daemon_dict = {'dummy_daemon': mock_daemon}
+
+ mock_get_daemons.return_value = mock_daemon_dict
+ mock_rgw_client_instance = mock_rgw_client.return_value
+ mock_rgw_client_instance._daemons = mock_daemon_dict # pylint: disable=W0212
+
+ mock_response = {
+ "CreateTopicResult": {
+ "TopicArn": "arn:aws:sns:zg1-realm1::HttpTest"
+ },
+ "ResponseMetadata": {
+ "RequestId": "b13925ff-a04a-4ff5-9578-7c51aa7932df.4926.3389207753149441947"
+ }
+ }
+ with patch('dashboard.controllers.rgw.RgwTopic.create', return_value=mock_response):
+ self._post('/test/api/rgw/topic', {
+ 'name': 'HttpTest',
+ 'owner': 'dashboard',
+ 'opaque_data': 'testopaque',
+ 'persistent': True,
+ 'time_to_live': '10',
+ 'max_retries': '3',
+ 'retry_sleep_duration': '5',
+ 'policy': {}
+ })
+ self.assertStatus(200)
+
+ @patch('dashboard.controllers.rgw.RgwTopic.list')
+ def test_list_topic_with_details(self, mock_list_topics):
+ mock_return_value = [
+ {
+ "topic": {
+ "owner": "dashboard",
+ "name": "HttpTest",
+ "dest": {
+ "push_endpoint": "https://10.0.66.13:443",
+ "push_endpoint_args": "verify_ssl=true",
+ "push_endpoint_topic": "HttpTest",
+ "stored_secret": False,
+ "persistent": True,
+ "persistent_queue": ":HttpTest",
+ "time_to_live": "5",
+ "max_retries": "2",
+ "retry_sleep_duration": "2"
+ },
+ "arn": "arn:aws:sns:zg1-realm1::HttpTest",
+ "opaqueData": "test123",
+ "policy": "{}",
+ "subscribed_buckets": []
+ }
+ }
+ ]
+
+ mock_list_topics.return_value = mock_return_value
+ controller = RgwTopic()
+ result = controller.list(True, None)
+ mock_list_topics.assert_called_with(True, None)
+ self.assertEqual(result, mock_return_value)
+
+ @patch('dashboard.controllers.rgw.RgwTopic.get')
+ def test_get_topic(self, mock_get_topic):
+ mock_return_value = {
+ "topic": {
+ "owner": "dashboard",
+ "name": "HttpTest",
+ "dest": {
+ "push_endpoint": "https://10.0.66.13:443",
+ "push_endpoint_args": "verify_ssl=true",
+ "push_endpoint_topic": "HttpTest",
+ "stored_secret": False,
+ "persistent": True,
+ "persistent_queue": ":HttpTest",
+ "time_to_live": "5",
+ "max_retries": "2",
+ "retry_sleep_duration": "2"
+ },
+ "arn": "arn:aws:sns:zg1-realm1::HttpTest",
+ "opaqueData": "test123",
+ "policy": "{}",
+ "subscribed_buckets": []
+ }
+ }
+ mock_get_topic.return_value = mock_return_value
+
+ controller = RgwTopic()
+ result = controller.get('HttpTest', None)
+ mock_get_topic.assert_called_with('HttpTest', None)
+ self.assertEqual(result, mock_return_value)
+
+ @patch('dashboard.controllers.rgw.RgwTopic.delete')
+ def test_delete_topic(self, mock_delete_topic):
+ mock_delete_topic.return_value = None
+
+ controller = RgwTopic()
+ result = controller.delete(name='HttpTest', tenant=None)
+ mock_delete_topic.assert_called_with(name='HttpTest', tenant=None)
+ self.assertEqual(result, None)