]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
mgr/dashboard: Add RGW CRUD topic in dashboard 61989/head
authorpujaoshahu <pshahu@redhat.com>
Thu, 9 Jan 2025 05:14:43 +0000 (10:44 +0530)
committerpujaoshahu <pshahu@redhat.com>
Wed, 2 Jul 2025 08:57:45 +0000 (14:27 +0530)
Fixes: https://tracker.ceph.com/issues/699654
Signed-off-by: pujaoshahu <pshahu@redhat.com>
18 files changed:
src/pybind/mgr/dashboard/controllers/rgw.py
src/pybind/mgr/dashboard/frontend/src/app/ceph/rgw/rgw-topic-details/rgw-topic-details.component.spec.ts
src/pybind/mgr/dashboard/frontend/src/app/ceph/rgw/rgw-topic-details/rgw-topic-details.component.ts
src/pybind/mgr/dashboard/frontend/src/app/ceph/rgw/rgw-topic-form/rgw-topic-form.component.html [new file with mode: 0644]
src/pybind/mgr/dashboard/frontend/src/app/ceph/rgw/rgw-topic-form/rgw-topic-form.component.scss [new file with mode: 0644]
src/pybind/mgr/dashboard/frontend/src/app/ceph/rgw/rgw-topic-form/rgw-topic-form.component.spec.ts [new file with mode: 0644]
src/pybind/mgr/dashboard/frontend/src/app/ceph/rgw/rgw-topic-form/rgw-topic-form.component.ts [new file with mode: 0644]
src/pybind/mgr/dashboard/frontend/src/app/ceph/rgw/rgw-topic-list/rgw-topic-list.component.html
src/pybind/mgr/dashboard/frontend/src/app/ceph/rgw/rgw-topic-list/rgw-topic-list.component.spec.ts
src/pybind/mgr/dashboard/frontend/src/app/ceph/rgw/rgw-topic-list/rgw-topic-list.component.ts
src/pybind/mgr/dashboard/frontend/src/app/ceph/rgw/rgw.module.ts
src/pybind/mgr/dashboard/frontend/src/app/shared/api/rgw-topic.service.spec.ts
src/pybind/mgr/dashboard/frontend/src/app/shared/api/rgw-topic.service.ts
src/pybind/mgr/dashboard/frontend/src/app/shared/models/topic.model.ts
src/pybind/mgr/dashboard/frontend/src/app/shared/services/task-message.service.ts
src/pybind/mgr/dashboard/openapi.yaml
src/pybind/mgr/dashboard/services/rgw_client.py
src/pybind/mgr/dashboard/tests/test_rgw.py

index 757a8b89fcbaa0da519b9c4cd3d5f538a78901ad..42da0ecaf7fb113a94d096fde7f026b1edc75c32 100755 (executable)
@@ -51,6 +51,16 @@ RGW_USER_SCHEMA = {
 }
 
 
+def _get_owner(owner):
+    accounts = RgwAccounts().get_accounts()
+
+    # if the owner is present in the accounts list,
+    # then the bucket is owned by an account.
+    # hence we will use dashboard user to fetch the
+    # bucket info
+    return owner if owner not in accounts else RgwServiceManager.user
+
+
 @UIRouter('/rgw', Scope.RGW)
 @APIDoc("RGW Management API", "Rgw")
 class Rgw(BaseController):
@@ -402,15 +412,6 @@ class RgwBucket(RgwRESTController):
                 if bucket['tenant'] else bucket['bucket']
         return bucket
 
-    def _get_owner(self, owner):
-        accounts = RgwAccounts().get_accounts()
-
-        # if the owner is present in the accounts list,
-        # then the bucket is owned by an account.
-        # hence we will use dashboard user to fetch the
-        # bucket info
-        return owner if owner not in accounts else RgwServiceManager.user
-
     def _get_versioning(self, owner, daemon_name, bucket_name):
         rgw_client = RgwClient.instance(owner, daemon_name)
         return rgw_client.get_bucket_versioning(bucket_name)
@@ -548,7 +549,7 @@ class RgwBucket(RgwRESTController):
         bucket_name = RgwBucket.get_s3_bucket_name(result['bucket'],
                                                    result['tenant'])
 
-        owner = self._get_owner(result['owner'])
+        owner = _get_owner(result['owner'])
         # Append the versioning configuration.
         versioning = self._get_versioning(owner, daemon_name, bucket_name)
         encryption = self._get_encryption(bucket_name, daemon_name, owner)
@@ -636,7 +637,7 @@ class RgwBucket(RgwRESTController):
 
         uid_tenant = uid[:uid.find('$')] if uid.find('$') >= 0 else None
         bucket_name = RgwBucket.get_s3_bucket_name(bucket, uid_tenant)
-        uid = self._get_owner(uid)
+        uid = _get_owner(uid)
 
         locking = self._get_locking(uid, daemon_name, bucket_name)
         if versioning_state:
@@ -728,7 +729,7 @@ class RgwBucket(RgwRESTController):
     @allow_empty_body
     def set_lifecycle_policy(self, bucket_name: str = '', lifecycle: str = '', daemon_name=None,
                              owner=None, tenant=None):
-        owner = self._get_owner(owner)
+        owner = _get_owner(owner)
         bucket_name = RgwBucket.get_s3_bucket_name(bucket_name, tenant)
         if lifecycle == '{}':
             return self._delete_lifecycle(bucket_name, daemon_name, owner)
@@ -737,7 +738,7 @@ class RgwBucket(RgwRESTController):
     @RESTController.Collection(method='GET', path='/lifecycle')
     def get_lifecycle_policy(self, bucket_name: str = '', daemon_name=None, owner=None,
                              tenant=None):
-        owner = self._get_owner(owner)
+        owner = _get_owner(owner)
         bucket_name = RgwBucket.get_s3_bucket_name(bucket_name, tenant)
         return self._get_lifecycle(bucket_name, daemon_name, owner)
 
@@ -1433,25 +1434,23 @@ class RgwTopic(RESTController):
         "Create a new RGW Topic",
         parameters={
             "name": (str, "Name of the topic"),
+            "owner": (str, "Name of the owner"),
+            "daemon_name": (str, "Name of the daemon"),
             "push_endpoint": (str, "Push Endpoint"),
-            "opaque_data": (str, " opaque data"),
-            "persistent": (bool, "persistent"),
+            "opaque_data": (str, "OpaqueData"),
+            "persistent": (bool, "Persistent"),
             "time_to_live": (str, "Time to live"),
-            "max_retries": (str, "max retries"),
-            "retry_sleep_duration": (str, "retry sleep duration"),
-            "policy": (str, "policy"),
-            "verify_ssl": (bool, 'verify ssl'),
-            "cloud_events": (str, 'cloud events'),
-            "user": (str, 'user'),
-            "password": (str, 'password'),
-            "vhost": (str, 'vhost'),
-            "ca_location": (str, 'ca location'),
-            "amqp_exchange": (str, 'amqp exchange'),
-            "amqp_ack_level": (str, 'amqp ack level'),
-            "use_ssl": (bool, 'use ssl'),
-            "kafka_ack_level": (str, 'kafka ack level'),
-            "kafka_brokers": (str, 'kafka brokers'),
-            "mechanism": (str, 'mechanism'),
+            "max_retries": (str, "Max retries"),
+            "retry_sleep_duration": (str, "Retry sleep duration"),
+            "policy": (str, "Policy"),
+            "verify_ssl": (bool, 'Verify ssl'),
+            "cloud_events": (str, 'Cloud events'),
+            "ca_location": (str, 'Ca location'),
+            "amqp_exchange": (str, 'Amqp exchange'),
+            "ack_level": (str, 'Amqp ack level'),
+            "use_ssl": (bool, 'Use ssl'),
+            "kafka_brokers": (str, 'Kafka brokers'),
+            "mechanism": (str, 'Mechanism'),
         },
     )
     def create(
@@ -1470,15 +1469,16 @@ class RgwTopic(RESTController):
         cloud_events: Optional[bool] = False,
         ca_location: Optional[str] = None,
         amqp_exchange: Optional[str] = None,
-        amqp_ack_level: Optional[str] = None,
+        ack_level: Optional[str] = None,
         use_ssl: Optional[bool] = False,
-        kafka_ack_level: Optional[str] = None,
         kafka_brokers: Optional[str] = None,
         mechanism: Optional[str] = None
     ):
+        owner = _get_owner(owner)
         rgw_topic_instance = RgwClient.instance(owner, daemon_name=daemon_name)
         return rgw_topic_instance.create_topic(
             name=name,
+            daemon_name=daemon_name,
             push_endpoint=push_endpoint,
             opaque_data=opaque_data,
             persistent=persistent,
@@ -1490,45 +1490,38 @@ class RgwTopic(RESTController):
             cloud_events=cloud_events,
             ca_location=ca_location,
             amqp_exchange=amqp_exchange,
-            amqp_ack_level=amqp_ack_level,
+            ack_level=ack_level,
             use_ssl=use_ssl,
-            kafka_ack_level=kafka_ack_level,
             kafka_brokers=kafka_brokers,
             mechanism=mechanism
         )
 
     @EndpointDoc(
         "Get RGW Topic List",
-        parameters={
-            "uid": (str, "Name of the user"),
-            "tenant": (str, "Name of the tenant"),
-        },
     )
-    def list(self, uid: Optional[str] = None, tenant: Optional[str] = None):
+    def list(self):
         rgw_topic_instance = RgwTopicmanagement()
-        result = rgw_topic_instance.list_topics(uid, tenant)
-        return result['topics'] if 'topics' in result else []
+        result = rgw_topic_instance.list_topics()
+        return result
 
     @EndpointDoc(
         "Get RGW Topic",
         parameters={
-            "name": (str, "Name of the user"),
-            "tenant": (str, "Name of the tenant"),
+            "key": (str, "The metadata object key to retrieve the topic e.g owner:topic_name"),
         },
     )
-    def get(self, name: str, tenant: Optional[str] = None):
+    def get(self, key: str):
         rgw_topic_instance = RgwTopicmanagement()
-        result = rgw_topic_instance.get_topic(name, tenant)
+        result = rgw_topic_instance.get_topic(key)
         return result
 
     @EndpointDoc(
         "Delete RGW Topic",
         parameters={
-            "name": (str, "Name of the user"),
-            "tenant": (str, "Name of the tenant"),
+            "key": (str, "The metadata object key to retrieve the topic e.g topic:topic_name"),
         },
     )
-    def delete(self, name: str, tenant: Optional[str] = None):
+    def delete(self, key: str):
         rgw_topic_instance = RgwTopicmanagement()
-        result = rgw_topic_instance.delete_topic(name=name, tenant=tenant)
+        result = rgw_topic_instance.delete_topic(key=key)
         return result
index 85b1b1b00fa4385a34bdc481b6a8d036d62950ec..78bbed0af9e8de3991803c6dd861935709a4599f 100644 (file)
@@ -1,12 +1,12 @@
 import { ComponentFixture, TestBed } from '@angular/core/testing';
 import { RgwTopicDetailsComponent } from './rgw-topic-details.component';
-import { TopicDetails } from '~/app/shared/models/topic.model';
+import { Topic } from '~/app/shared/models/topic.model';
 
 interface Destination {
   push_endpoint: string;
   push_endpoint_args: string;
   push_endpoint_topic: string;
-  stored_secret: string;
+  stored_secret: boolean;
   persistent: boolean;
   persistent_queue: string;
   time_to_live: number;
@@ -18,7 +18,7 @@ const mockDestination: Destination = {
   push_endpoint: 'http://localhost:8080',
   push_endpoint_args: 'args',
   push_endpoint_topic: 'topic',
-  stored_secret: 'secret',
+  stored_secret: false,
   persistent: true,
   persistent_queue: 'queue',
   time_to_live: 3600,
@@ -45,12 +45,13 @@ describe('RgwTopicDetailsComponent', () => {
   });
 
   it('should parse policy string correctly', () => {
-    const mockSelection: TopicDetails = {
+    const mockSelection: Topic = {
       name: 'testHttp',
       owner: 'ownerName',
       arn: 'arnValue',
       dest: mockDestination,
       policy: '{"key": "value"}',
+      key: 'topic:ownerName:testHttp',
       opaqueData: 'test@12345',
       subscribed_buckets: []
     };
@@ -69,12 +70,13 @@ describe('RgwTopicDetailsComponent', () => {
   });
 
   it('should set policy to empty object if policy is not a string', () => {
-    const mockSelection: TopicDetails = {
+    const mockSelection: Topic = {
       name: 'testHttp',
       owner: 'ownerName',
       arn: 'arnValue',
       dest: mockDestination,
       policy: '{}',
+      key: 'topic:ownerName:testHttp',
       subscribed_buckets: [],
       opaqueData: ''
     };
index 359c927ae4609766e8d88f34e4f0423cd4355d5b..0d2673ad89046db21bbc4134f356b7ea91df350a 100644 (file)
@@ -1,6 +1,6 @@
 import { Component, Input, SimpleChanges, OnChanges } from '@angular/core';
 
-import { TopicDetails } from '~/app/shared/models/topic.model';
+import { Topic } from '~/app/shared/models/topic.model';
 import * as _ from 'lodash';
 
 @Component({
@@ -10,8 +10,8 @@ import * as _ from 'lodash';
 })
 export class RgwTopicDetailsComponent implements OnChanges {
   @Input()
-  selection: TopicDetails;
-  policy: string;
+  selection: Topic;
+  policy: string | object = '{}';
   constructor() {}
   ngOnChanges(changes: SimpleChanges): void {
     if (changes['selection'] && this.selection) {
diff --git a/src/pybind/mgr/dashboard/frontend/src/app/ceph/rgw/rgw-topic-form/rgw-topic-form.component.html b/src/pybind/mgr/dashboard/frontend/src/app/ceph/rgw/rgw-topic-form/rgw-topic-form.component.html
new file mode 100644 (file)
index 0000000..07f9b2c
--- /dev/null
@@ -0,0 +1,559 @@
+<div cdsCol
+     *cdFormLoading="loading"
+     [columnNumbers]="{ md: 4 }">
+  <form name="topicForm"
+        #formDir="ngForm"
+        [formGroup]="topicForm"
+        novalidate>
+    <div class="form-header">
+      {{ action | titlecase }} {{ resource | upperFirst }}
+      <cd-help-text>
+        <span i18n>
+            Configure the push endpoint parameters to send notifications. On successful creation, you'll receive the topic's unique Amazon Resource Name
+        </span>
+      </cd-help-text>
+    </div>
+
+    <div class="form-item form-item-append"
+         cdsRow>
+      <!-- Topic Type-->
+      <div cdsCol>
+        <cds-select formControlName="endpointType"
+                    label="Type"
+                    i18n-label
+                    cdRequiredField="Type"
+                    id="endpointType"
+                    (change)="onEndpointTypeChange();"
+                    [invalid]="topicForm.controls.endpointType.invalid && topicForm.controls.endpointType.dirty"
+                    [invalidText]="topicTypeError"
+                    helperText="This user will manage and configure the topic’s settings."
+                    i18n-helperText>
+          <option i18n
+                  *ngIf="endpointType === null"
+                  value="null">Loading... </option>
+          <option i18n
+                  *ngIf="endpointType !== null"
+                  value="">-- Select a Topic type --</option>
+          <option *ngFor="let data of endpointType"
+                  i18n>{{ data | upperFirst }} </option>
+        </cds-select>
+        <ng-template #topicTypeError>
+          <span class="invalid-feedback"
+                *ngIf="topicForm.showError('endpointType', formDir, 'required')"
+                i18n>This field is required.</span>
+        </ng-template>
+      </div>
+
+      <!-- owner -->
+      <div cdsCol>
+        <cds-select
+          formControlName="owner"
+          label="User"
+          i18n-label
+          cdRequiredField="Owner"
+          [invalid]="topicForm.controls.owner.invalid && topicForm.controls.owner.dirty"
+          id="owner"
+          helperText="This owner will define and control the topic’s settings"
+          [invalidText]="ownerError"
+          i18n-helperText>
+          <option i18n
+                  *ngIf="owners === null"
+                  value="null">Loading... </option>
+          <option i18n
+                  *ngIf="owners !== null"
+                  value="">-- Select a user --</option>
+          <option *ngFor="let data of owners"
+                  i18n
+                  [value]="data">{{ data }}</option>
+        </cds-select>
+        <ng-template #ownerError>
+          <span class="invalid-feedback"
+                *ngIf="topicForm.showError('owner', formDir, 'required')"
+                i18n>This field is required.</span>
+        </ng-template>
+      </div>
+    </div>
+
+    <ng-container *ngIf="selectedOption">
+      <div class="form-item form-item-append"
+           cdsRow>
+
+        <!-- Topic Name -->
+        <div cdsCol>
+          <cds-text-label
+            labelInputID="name"
+            i18n
+            i18n-helperText
+            cdRequiredField="Name"
+            [invalid]="topicForm.controls.name.invalid && topicForm.controls.name.dirty"
+            [invalidText]="nameError"
+            helperText="Enter a Topic name">Name
+
+          <input cdsText
+                 type="text"
+                 id="name"
+                 autofocus
+                 formControlName="name"
+                 [invalid]="topicForm.controls.name.invalid && topicForm.controls.name.dirty"/>
+          </cds-text-label>
+
+          <ng-template #nameError>
+            <span class="invalid-feedback"
+                  *ngIf="topicForm.showError('name', formDir, 'required')"
+                  i18n>This field is required.</span>
+            <span class="invalid-feedback"
+                  *ngIf="topicForm.showError('name', formDir, 'notUnique')"
+                  i18n>The name is already in use. Please choose a different one</span>
+          </ng-template>
+        </div>
+      </div>
+      <div class="form-item"
+           cdsRow>
+        <fieldset>
+          <legend i18n
+                  cdsCol
+                  class="cd-header">
+                Generate push endpoint
+            <cd-help-text> Configure the endpoint URL to receive push notifications</cd-help-text>
+          </legend>
+
+          <!-- Enable SSL -->
+          <div cdsCol
+               [columnNumbers]="{sm: 4}">
+            <cds-checkbox id="enable_ssl"
+                          formControlName="enable_ssl"
+                          (checkedChange)="onSecureSSLChange($event)">
+              <ng-container i18n>SSL</ng-container>
+              <cd-help-text i18n>
+                Enabling SSL ensures that your connection is encrypted and secure. You must have a valid SSL certificate installed on your server.
+              </cd-help-text>
+            </cds-checkbox>
+          </div>
+
+          <!-- Verify ssl -->
+          <div cdsCol
+               *ngIf="(selectedOption === hostProtocols.AMQP || selectedOption === hostProtocols.HTTP)"
+               [columnNumbers]="{sm: 4}">
+
+            <cds-checkbox id="verify_ssl"
+                          formControlName="verify_ssl">
+              <ng-container i18n>Verify SSL</ng-container>
+              <cd-help-text i18n>Ensures that the server's SSL certificate is valid and trusted.</cd-help-text>
+            </cds-checkbox>
+          </div>
+
+          <!-- Cloud Event-->
+          <div cdsCol
+               *ngIf="(selectedOption == hostProtocols.HTTP)"
+               [columnNumbers]="{sm: 4}">
+            <cds-checkbox id="cloud_events"
+                          formControlName="cloud_events">
+              <ng-container i18n>Cloud events</ng-container>
+              <cd-help-text i18n>Captures cloud events as triggers for notifications.</cd-help-text>
+            </cds-checkbox>
+          </div>
+        </fieldset>
+      </div>
+
+      <div cdsRow
+           class="form-item form-item-append">
+
+        <!-- Fqdn-->
+        <div cdsCol>
+          <cds-text-label
+            labelInputID="fqdn"
+            cdRequiredField="RGW Gateway Hostname"
+            [invalid]="topicForm.controls.fqdn.invalid && topicForm.controls.fqdn.dirty"
+            [invalidText]="fqdnError"
+            helperText="Enter the FQDN to configure the topic's settings and behavior"
+            i18n-helperText
+            i18n>
+           RGW Gateway hostname
+          <input cdsText
+                 type="text"
+                 id="fqdn"
+                 placeholder="e.g., 127.0.0.1 or localhost"
+                 formControlName="fqdn"
+                 [invalid]="topicForm.controls.fqdn.invalid && topicForm.controls.fqdn.dirty"
+                 (change)="generatePushEndpoint()"/>
+          </cds-text-label>
+          <ng-template #fqdnError>
+            <span class="invalid-feedback"
+                  *ngIf="topicForm.showError('fqdn', formDir, 'required')">This field is required</span>
+          </ng-template>
+        </div>
+
+        <!-- Port-->
+        <div cdsCol>
+          <cds-number
+            id="Port"
+            formControlName="port"
+            cdRequiredField="Port"
+            label="Port"
+            i18n-label
+            [min]="1"
+            [invalid]="topicForm.controls.port.invalid && topicForm.controls.port.dirty"
+            [invalidText]="portError"
+            (change)="generatePushEndpoint()"
+            helperText="Enter the port number for the push endpoint"
+            i18n-helperText
+          ></cds-number>
+          <ng-template #portError>
+            <span class="invalid-feedback"
+                  *ngIf="topicForm.showError('port', formDir, 'required')"
+                  i18n>This field is required</span>
+            <span class="invalid-feedback"
+                  *ngIf="topicForm.controls.port.hasError('pattern') && topicForm.controls.port.touched"
+                  i18n> Port must be a valid integer</span>
+          </ng-template>
+        </div>
+      </div>
+
+      <div cdsRow
+           *ngIf="(selectedOption === hostProtocols.AMQP || selectedOption === hostProtocols.KAFKA)"
+           class="form-item form-item-append">
+
+        <!-- User-->
+        <div cdsCol>
+          <cds-text-label labelInputID="User"
+                          i18n
+                          helperText="Enter the user for the push endpoint"
+                          i18n-helperText>
+            User
+            <input cdsText
+                   type="text"
+                   id="user"
+                   formControlName="user"
+                   (change)="generatePushEndpoint()"/>
+          </cds-text-label>
+        </div>
+
+        <!-- password-->
+        <div cdsCol
+             *ngIf="(selectedOption !==hostProtocols.HTTP)">
+          <cds-password-label labelInputID="password"
+                              [invalid]="!topicForm.controls.password.valid && topicForm.controls.password.dirty"
+                              [invalidText]="passwordError"
+                              i18n
+                              helperText="Enter the password for the push endpoint"
+                              i18n-helperText
+                              >
+            Password
+            <input cdsPassword
+                   type="password"
+                   autocomplete="new-password"
+                   formControlName="password"
+                   (change)="generatePushEndpoint()"/>
+          </cds-password-label>
+          <ng-template #passwordError>
+            <span class="invalid-feedback"
+                  *ngIf="userForm.showError('password', formDir, 'required')"
+                  i18n>This field is required</span>
+            <span class="invalid-feedback"
+                  *ngIf="userForm.showError('password', formDir, 'passwordPolicy')">
+              {{ passwordValuation }}
+            </span>
+          </ng-template>
+        </div>
+      </div>
+
+      <!--Vhost-->
+      <div cdsRow
+           class="form-item form-item-append"
+           *ngIf="(selectedOption === hostProtocols.AMQP)">
+        <div cdsCol>
+          <cds-text-label labelInputID="Virtual Host(vhost)"
+                          i18n
+                          helperText="Enter the vhost for the push endpoint"
+                          i18n-helperText>
+            Virtual Host(vhost)
+          <input cdsText
+                 type="text"
+                 id="vhost"
+                 (change)="generatePushEndpoint()"
+                 formControlName="vhost" />
+          </cds-text-label>
+        </div>
+      </div>
+
+      <div cdsRow
+           class="form-item form-item-append">
+
+        <!--push_endpoint -->
+        <div cdsCol>
+          <cds-text-label
+            labelInputID="push_endpoint"
+            i18n
+            cdRequiredField="Push endpoint"
+            [invalid]="topicForm.controls.push_endpoint.invalid && topicForm.controls.push_endpoint.dirty"
+            [invalidText]="endpointError"
+            helperText="Specify the endpoint URL for receiving push notifications"
+            i18n-helperText>
+            Push endpoint
+            <input
+              cdsText
+              type="text"
+              [placeholder]="pushEndpointPlaceholder"
+              i18n-placeholder
+              id="push_endpoint"
+              formControlName="push_endpoint"
+              [invalid]="topicForm.controls.push_endpoint.invalid && topicForm.controls.push_endpoint.dirty"
+            />
+          </cds-text-label>
+          <ng-template #endpointError>
+            <span class="invalid-feedback"
+                  *ngIf="topicForm.showError('push_endpoint', formDir, 'required')"
+                  i18n>This field is required.</span>
+          </ng-template>
+        </div>
+      </div>
+      <ng-container *ngIf="selectedOption === hostProtocols.AMQP || selectedOption === hostProtocols.KAFKA">
+        <div class="form-item">
+        <fieldset>
+          <legend i18n
+                  class="cd-header">
+            {{ selectedOption === 'AMQP' ? 'AMQP attributes' : 'KAFKA attributes' }}
+            <cd-help-text> {{ attributeHelpText }}</cd-help-text>
+          </legend>
+        </fieldset>
+          <ng-container *ngIf="selectedOption === 'KAFKA'">
+            <div cdsRow
+                 class="form-item-append">
+
+              <div cdsCol
+                   *ngIf="selectedOption === 'KAFKA'">
+                <cds-checkbox id="use-ssl"
+                              formControlName="use_ssl">
+                  <ng-container i18n>Use SSL</ng-container>
+                  <cd-help-text i18n>
+                    Enabling SSL encrypts communication between your Kafka client and broker, ensuring the confidentiality and integrity of your messages
+                  </cd-help-text>
+                </cds-checkbox>
+              </div>
+            </div>
+          </ng-container>
+        </div>
+
+        <div class="form-item form-item-append"
+             cdsRow>
+
+        <!---CA location--->
+          <div cdsCol
+               *ngIf="selectedOption === hostProtocols.AMQP || selectedOption === hostProtocols.KAFKA">
+            <cds-text-label labelInputID="Ca-location"
+                            i18n
+                            helperText="The file path of the CA certificate used to verify the server"
+                            i18n-helperText>
+              CA location
+              <input cdsText
+                     type="text"
+                     id="ca_location"
+                     formControlName="ca_location"/>
+            </cds-text-label>
+          </div>
+          <div cdsCol
+               *ngIf="selectedOption === hostProtocols.AMQP">
+            <cds-text-label labelInputID="AMQP exchange"
+                            i18n
+                            helperText="Name of the AMQP exchange to publish messages to; must exist on the broker"
+                            i18n-helperText>
+              AMQP exchange
+              <input cdsText
+                     type="text"
+                     id="amqp_exchange"
+                     formControlName="amqp_exchange"/>
+            </cds-text-label>
+          </div>
+
+          <!-- mechanism-->
+          <div cdsCol
+               *ngIf="selectedOption ===hostProtocols.KAFKA">
+            <cds-select formControlName="mechanism"
+                        label="Mechanism"
+                        i18n-label
+                        id="mechanism"
+                        helperText="Select the authentication mechanism to connect to the Kafka broker"
+                        i18n-helperText>
+              <option i18n
+                      value="">-- Select a KAFKA mechanism --</option>
+              <option *ngFor="let data of kafkaMechanism"
+                      i18n>{{ data }}</option>
+            </cds-select>
+          </div>
+        </div>
+        <div class="form-item form-item-append"
+             cdsRow>
+
+          <!-- Ack level -->
+          <div cdsCol
+               *ngIf="selectedOption === hostProtocols.AMQP || selectedOption === hostProtocols.KAFKA">
+            <cds-select formControlName="ack_level"
+                        label="Ack level"
+                        i18n-label
+                        id="ack_level"
+                        helperText="Select the acknowledgment level to control message delivery guarantees between client and broker"
+                        i18n-helperText>
+              <option i18n
+                      value="">-- Select the {{ selectedOption }} ack level --</option>
+              <option *ngFor="let level of ackLevels"
+                      [value]="level"
+                      i18n>{{ level }}</option>
+            </cds-select>
+          </div>
+
+          <!-- kafka-brokers-->
+          <div cdsCol
+               *ngIf="selectedOption === hostProtocols.KAFKA">
+            <cds-text-label labelInputID="kafka_brokers"
+                            i18n
+                            helperText="Specify the address of the Kafka broker (e.g., host:9092) "
+                            i18n-helperText>
+              Kafka brokers
+              <input cdsText
+                     type="text"
+                     id="kafka_brokers"
+                     formControlName="kafka_brokers"/>
+            </cds-text-label>
+          </div>
+        </div>
+      </ng-container>
+      <ng-container>
+        <div cdsRow
+             class="form-item form-item-append">
+          <legend i18n
+                  cdsCol
+                  class="cd-header">
+            Additional common attributes
+            <cd-help-text>Configure additional attributes to customize the topic's behavior and settings</cd-help-text>
+          </legend>
+
+          <!-- Persistent-->
+          <div cdsCol
+               [columnNumbers]="{sm: 4}">
+            <cds-checkbox id="persistent"
+                          formControlName="persistent">
+              <ng-container i18n>Persistent</ng-container>
+              <cd-help-text i18n> Select the checkbox to ensure notifications are retried.</cd-help-text>
+            </cds-checkbox>
+          </div>
+        </div>
+        <div cdsRow
+             class="form-item form-item-append">
+
+          <!-- Opaque data-->
+          <div cdsCol>
+            <cds-text-label labelInputID="opaque_data"
+                            i18n
+                            i18n-helperText
+                            helperText="A user-defined metadata added to all notifications that are triggered by the topic.">
+              Opaque data
+              <input cdsText
+                     type="text"
+                     id="opaqueData"
+                     formControlName="OpaqueData"/>
+            </cds-text-label>
+          </div>
+
+          <!-- Time to live-->
+          <div cdsCol>
+            <cds-number id="time_to_live"
+                        formControlName="time_to_live"
+                        label="Time to live"
+                        i18n-label
+                        [min]="1"
+                        helperText="Time limit (in seconds) for retaining notifications"
+                        i18n-helperText>Time to live </cds-number>
+          </div>
+
+          <!-- Max retries-->
+          <div cdsCol>
+            <cds-number id="max_retries"
+                        label="Max retries"
+                        formControlName="max_retries"
+                        i18n-label
+                        [min]="1"
+                        helperText="Max retries before expiring notifications"
+                        i18n-helperText> Max retries </cds-number>
+          </div>
+
+          <!-- Retry sleep duration-->
+          <div cdsCol>
+            <cds-number
+              id="retry_sleep_duration"
+              label="Retry sleep duration"
+              formControlName="retry_sleep_duration"
+              i18n-label
+              [min]="1"
+              helperText="Controls the frequency of retrying the notifications"
+              i18n-helperText
+            >
+              Retry sleep duration
+            </cds-number>
+          </div>
+        </div>
+
+        <div cdsRow
+             class="form-item form-item-append">
+
+          <!-- policy-->
+          <div cdsCol>
+            <cds-textarea-label i18n>
+              <span> {{'Policy'}}</span>
+              <textarea cdsTextArea
+                        #topicPolicyTextArea
+                        row="4"
+                        cols="200"
+                        formControlName="policy"
+                        aria-label="textarea"
+                        (change)="textAreaOnChange('topicPolicyTextArea')"></textarea>
+            </cds-textarea-label>
+            <cd-help-text i18n>JSON formatted document</cd-help-text>
+            <div cdsRow>
+              <div cdsCol>
+                <cds-button-set class="mt-1">
+                  <button cdsButton="tertiary"
+                          id="example-generator-button"
+                          (click)="openUrl('https://docs.aws.amazon.com/AmazonS3/latest/userguide/example-bucket-policies.html?icmpid=docs_amazons3_console')"
+                          i18n>
+                    Policy examples
+                    <svg cdsIcon="launch"
+                         size="16"
+                         class="cds--btn__icon"></svg>
+                  </button>
+                  <button cdsButton="tertiary"
+                          id="example-generator-button"
+                          (click)="openUrl('https://awspolicygen.s3.amazonaws.com/policygen.html')"
+                          i18n>
+                    Policy generator
+                    <svg cdsIcon="launch"
+                         size="16"
+                         class="cds--btn__icon"></svg>
+                  </button>
+                </cds-button-set>
+              </div>
+              <div cdsCol>
+                <cds-button-set class="float-end mt-1">
+                  <button cdsButton="tertiary"
+                          id="clear-bucket-policy"
+                          (click)="clearTextArea('policy', '{}')"
+                          i18n>
+                    Clear
+                    <svg cdsIcon="close"
+                         size="32"
+                         class="cds--btn__icon"></svg>
+                  </button>
+                </cds-button-set>
+              </div>
+            </div>
+          </div>
+        </div>
+      </ng-container>
+    </ng-container>
+    <div class="card-footer">
+      <cd-form-button-panel (submitActionEvent)="submitAction()"
+                            [form]="topicForm"
+                            [submitText]="(action | titlecase) + ' ' + (resource | upperFirst)"
+                            wrappingClass="text-right"></cd-form-button-panel>
+    </div>
+  </form>
+  </div>
diff --git a/src/pybind/mgr/dashboard/frontend/src/app/ceph/rgw/rgw-topic-form/rgw-topic-form.component.scss b/src/pybind/mgr/dashboard/frontend/src/app/ceph/rgw/rgw-topic-form/rgw-topic-form.component.scss
new file mode 100644 (file)
index 0000000..e69de29
diff --git a/src/pybind/mgr/dashboard/frontend/src/app/ceph/rgw/rgw-topic-form/rgw-topic-form.component.spec.ts b/src/pybind/mgr/dashboard/frontend/src/app/ceph/rgw/rgw-topic-form/rgw-topic-form.component.spec.ts
new file mode 100644 (file)
index 0000000..dd6ff2f
--- /dev/null
@@ -0,0 +1,159 @@
+import { ComponentFixture, TestBed } from '@angular/core/testing';
+import { RgwTopicFormComponent } from './rgw-topic-form.component';
+import { RgwTopicService } from '~/app/shared/api/rgw-topic.service';
+import { NotificationService } from '~/app/shared/services/notification.service';
+import { RouterTestingModule } from '@angular/router/testing';
+import { ReactiveFormsModule } from '@angular/forms';
+import { HttpClientTestingModule } from '@angular/common/http/testing';
+import { ActionLabelsI18n } from '~/app/shared/constants/app.constants';
+import { TextAreaJsonFormatterService } from '~/app/shared/services/text-area-json-formatter.service';
+import { SharedModule } from '~/app/shared/shared.module';
+import { ToastrModule } from 'ngx-toastr';
+import { GridModule, InputModule, SelectModule } from 'carbon-components-angular';
+import { NO_ERRORS_SCHEMA } from '@angular/compiler';
+
+describe('RgwTopicFormComponent', () => {
+  let component: RgwTopicFormComponent;
+  let fixture: ComponentFixture<RgwTopicFormComponent>;
+  let textAreaJsonFormatterService: TextAreaJsonFormatterService;
+
+  const mockNotificationService = {
+    show: jest.fn()
+  };
+  const mockRouter = {
+    navigate: jest.fn(),
+    url: '/rgw/topic/create'
+  };
+
+  const mockActivatedRoute = {
+    snapshot: {
+      paramMap: {
+        get: jest.fn()
+      }
+    }
+  };
+  beforeEach(async () => {
+    await TestBed.configureTestingModule({
+      declarations: [RgwTopicFormComponent],
+      imports: [
+        ReactiveFormsModule,
+        RouterTestingModule,
+        HttpClientTestingModule,
+        SharedModule,
+        ToastrModule.forRoot(),
+        SelectModule,
+        GridModule,
+        InputModule
+      ],
+      providers: [
+        RgwTopicService,
+        TextAreaJsonFormatterService,
+        {
+          provide: ActionLabelsI18n,
+          useValue: { CREATE: 'Create', EDIT: 'Edit', Delete: 'Delete' }
+        },
+        {
+          provide: NotificationService,
+          useValue: mockNotificationService
+        },
+        { provide: 'Router', useValue: mockRouter },
+        { provide: 'ActivatedRoute', useValue: mockActivatedRoute }
+      ],
+      schemas: [NO_ERRORS_SCHEMA]
+    }).compileComponents();
+  });
+
+  beforeEach(() => {
+    fixture = TestBed.createComponent(RgwTopicFormComponent);
+    component = fixture.componentInstance;
+    textAreaJsonFormatterService = TestBed.inject(TextAreaJsonFormatterService);
+    fixture.detectChanges();
+  });
+
+  it('should create the component', () => {
+    expect(component).toBeTruthy();
+  });
+
+  it('should initialize the form correctly', () => {
+    const form = component.topicForm;
+    expect(form).toBeDefined();
+    expect(form.get('name')).toBeDefined();
+    expect(form.get('owner')).toBeDefined();
+    expect(form.get('push_endpoint')).toBeDefined();
+  });
+
+  it('should generate push endpoint for HTTP', () => {
+    component.topicForm.setValue({
+      owner: 'dashboard',
+      name: 'Testtopic',
+      OpaqueData: 'test@123',
+      endpointType: 'HTTP',
+      fqdn: 'localhost',
+      port: '80',
+      push_endpoint: 'http://localhost:80',
+      verify_ssl: false,
+      persistent: 'true',
+      max_retries: '3',
+      time_to_live: '100',
+      retry_sleep_duration: '10',
+      policy: '{}',
+      cloud_events: 'true',
+      user: '',
+      password: '',
+      vhost: '',
+      ca_location: '',
+      amqp_exchange: '',
+      ack_level: '',
+      use_ssl: false,
+      kafka_brokers: '',
+      mechanism: '',
+      enable_ssl: false
+    });
+    component.generatePushEndpoint(false);
+    expect(component.topicForm.get('push_endpoint')?.value).toBe('http://localhost:80');
+  });
+
+  it('should format JSON in the policy field on text area change', () => {
+    const formatSpy = jest.spyOn(textAreaJsonFormatterService, 'format');
+    const textArea: any = { nativeElement: { value: '{"key": "value"}' } };
+    component.textAreaOnChange(textArea);
+    expect(formatSpy).toHaveBeenCalledWith(textArea);
+  });
+
+  it('should generate HTTP push endpoint', () => {
+    component.selectedOption = 'HTTP';
+    component.topicForm.patchValue({
+      fqdn: 'example.com',
+      port: '8080',
+      enable_ssl: false
+    });
+
+    component.generatePushEndpoint();
+    expect(component.topicForm.get('push_endpoint')?.value).toBe('http://example.com:8080');
+  });
+
+  it('should generate AMQP push endpoint with auth', () => {
+    component.selectedOption = 'AMQP';
+    component.topicForm.patchValue({
+      user: 'guest',
+      password: 'guest',
+      fqdn: 'mq.example.com',
+      port: '5672',
+      vhost: '/'
+    });
+
+    component.generatePushEndpoint();
+    expect(component.topicForm.get('push_endpoint')?.value).toBe(
+      'amqps://guest:guest@mq.example.com:5672/'
+    );
+  });
+
+  it('should disable verify_ssl and use_ssl when enable_ssl is false', () => {
+    component.topicForm.patchValue({ enable_ssl: false });
+    component.topicForm.get('enable_ssl')?.setValue(false);
+    fixture.detectChanges();
+
+    expect(component.topicForm.get('verify_ssl')?.value).toBe(false);
+    expect(component.topicForm.get('use_ssl')?.value).toBe(false);
+  });
+});
diff --git a/src/pybind/mgr/dashboard/frontend/src/app/ceph/rgw/rgw-topic-form/rgw-topic-form.component.ts b/src/pybind/mgr/dashboard/frontend/src/app/ceph/rgw/rgw-topic-form/rgw-topic-form.component.ts
new file mode 100644 (file)
index 0000000..6d08a6b
--- /dev/null
@@ -0,0 +1,480 @@
+import { AfterViewChecked, Component, ElementRef, OnInit, ViewChild } from '@angular/core';
+import { ActionLabelsI18n, URLVerbs } from '~/app/shared/constants/app.constants';
+import { CdForm } from '~/app/shared/forms/cd-form';
+import { CdFormGroup } from '~/app/shared/forms/cd-form-group';
+import { UntypedFormControl, Validators } from '@angular/forms';
+import * as _ from 'lodash';
+import { TextAreaJsonFormatterService } from '~/app/shared/services/text-area-json-formatter.service';
+import { RgwTopicService } from '~/app/shared/api/rgw-topic.service';
+import { NotificationType } from '~/app/shared/enum/notification-type.enum';
+import { ActivatedRoute, Router } from '@angular/router';
+import { NotificationService } from '~/app/shared/services/notification.service';
+
+import { RgwUserService } from '~/app/shared/api/rgw-user.service';
+import { CdValidators } from '~/app/shared/forms/cd-validators';
+import {
+  AMQP_ACK_LEVEL,
+  TopicRequest,
+  END_POINT_TYPE,
+  KAFKA_ACK_LEVEL,
+  KAFKA_MECHANISM,
+  Topic,
+  URLPort,
+  HostURLProtocol,
+  URL_FORMAT_PLACEHOLDERS,
+  UrlProtocol,
+  Endpoint
+} from '~/app/shared/models/topic.model';
+
+const BASE_URL = 'rgw/topic';
+@Component({
+  selector: 'cd-rgw-topic-form',
+  templateUrl: './rgw-topic-form.component.html',
+  styleUrls: ['./rgw-topic-form.component.scss']
+})
+export class RgwTopicFormComponent extends CdForm implements OnInit, AfterViewChecked {
+  @ViewChild('topicPolicyTextArea')
+  public topicPolicyTextArea: ElementRef<any>;
+  topicForm: CdFormGroup;
+  action: string;
+  resource: string;
+  endpointType: string[] = [];
+  ackLevels: string[] = [];
+  selectedOption: string;
+  port: string;
+  owners: string[];
+  vhost: string;
+  kafkaMechanism: string[] = [];
+  editing: boolean = false;
+  topicId: string;
+  hostProtocols: typeof Endpoint = Endpoint;
+  key: string = '';
+  protocolPlaceholders: Record<string, string> = {
+    HTTP: URL_FORMAT_PLACEHOLDERS.http,
+    AMQP: URL_FORMAT_PLACEHOLDERS.amqp,
+    KAFKA: URL_FORMAT_PLACEHOLDERS.kafka
+  };
+
+  constructor(
+    public actionLabels: ActionLabelsI18n,
+    private textAreaJsonFormatterService: TextAreaJsonFormatterService,
+    public rgwTopicService: RgwTopicService,
+    private rgwUserService: RgwUserService,
+    public notificationService: NotificationService,
+    private router: Router,
+    private route: ActivatedRoute
+  ) {
+    super();
+    this.editing = this.router.url.startsWith(`/rgw/topic/${URLVerbs.EDIT}`);
+    this.action = this.editing ? this.actionLabels.EDIT : this.actionLabels.CREATE;
+    this.resource = $localize`topic`;
+  }
+
+  ngAfterViewChecked(): void {
+    this.textAreaOnChange(this.topicPolicyTextArea);
+  }
+
+  ngOnInit(): void {
+    this.endpointType = Object.values(END_POINT_TYPE);
+    this.createForm();
+    this.rgwUserService.enumerate().subscribe((data: string[]) => {
+      this.owners = data.sort();
+      if (this.editing) {
+        this.topicId = this.route.snapshot.paramMap.get('name');
+        this.topicId = decodeURIComponent(this.topicId);
+        this.loadTopicData(this.topicId);
+      } else {
+        this.loadingReady();
+      }
+    });
+
+    this.topicForm.get('user')?.valueChanges.subscribe(() => this.setMechanism());
+    this.topicForm.get('password')?.valueChanges.subscribe(() => this.setMechanism());
+    this.topicForm.get('endpointType')?.valueChanges.subscribe((option: string) => {
+      this.ackLevels = Object.values(option === Endpoint.AMQP ? AMQP_ACK_LEVEL : KAFKA_ACK_LEVEL);
+    });
+    this.kafkaMechanism = Object.values(KAFKA_MECHANISM);
+
+    this.setMechanism();
+
+    this.topicForm.get('enable_ssl')!.valueChanges.subscribe((enabled: boolean) => {
+      const verifySSLControl = this.topicForm.get('verify_ssl');
+      const useSSLControl = this.topicForm.get('use_ssl');
+      if (enabled) {
+        verifySSLControl!.enable();
+        useSSLControl!.enable();
+      } else {
+        verifySSLControl!.disable();
+        useSSLControl!.disable();
+        useSSLControl!.setValue(false);
+        verifySSLControl!.setValue(false);
+      }
+    });
+  }
+
+  loadTopicData(topicId: string) {
+    this.rgwTopicService.getTopic(topicId).subscribe((topic: Topic) => {
+      this.topicForm.get('name')?.disable();
+      let url = topic.dest.push_endpoint;
+      let hostname = url.split('://')[0];
+      let endpointType: string;
+      if (hostname === HostURLProtocol.amqp || hostname === HostURLProtocol.amqps) {
+        endpointType = Endpoint.AMQP;
+      } else if (hostname === HostURLProtocol.https || hostname === HostURLProtocol.http) {
+        endpointType = Endpoint.HTTP;
+      } else {
+        endpointType = Endpoint.KAFKA;
+      }
+      this.selectedOption = endpointType;
+      this.getPushUrlQueryValues(topic);
+      this.topicForm.get('owner').disable();
+      this.loadingReady();
+    });
+  }
+
+  get pushEndpointPlaceholder(): string {
+    return (
+      this.protocolPlaceholders[this.topicForm.get('endpointType')?.value] ||
+      $localize`Enter endpoint URL`
+    );
+  }
+
+  createForm() {
+    this.topicForm = new CdFormGroup({
+      owner: new UntypedFormControl('', { validators: [Validators.required] }),
+      name: new UntypedFormControl(
+        '',
+        [Validators.required],
+        CdValidators.unique(this.rgwTopicService.exists, this.rgwTopicService)
+      ),
+      push_endpoint: new UntypedFormControl(
+        { value: '', disabled: true },
+        { validators: [Validators.required] }
+      ),
+      OpaqueData: new UntypedFormControl(''),
+      persistent: new UntypedFormControl('0'),
+      max_retries: new UntypedFormControl('0'),
+      time_to_live: new UntypedFormControl('0'),
+      retry_sleep_duration: new UntypedFormControl('0'),
+      policy: new UntypedFormControl('{}', CdValidators.json()),
+      endpointType: new UntypedFormControl('', { validators: [Validators.required] }),
+      port: new UntypedFormControl('', {
+        validators: [Validators.required, Validators.pattern('^[0-9]+$')]
+      }),
+      verify_ssl: new UntypedFormControl(true),
+      enable_ssl: new UntypedFormControl(true),
+      cloud_events: new UntypedFormControl(),
+      user: new UntypedFormControl(),
+      password: new UntypedFormControl(),
+      vhost: new UntypedFormControl(),
+      ca_location: new UntypedFormControl(),
+      amqp_exchange: new UntypedFormControl(),
+      ack_level: new UntypedFormControl(),
+      use_ssl: new UntypedFormControl(false),
+      kafka_brokers: new UntypedFormControl(),
+      mechanism: new UntypedFormControl(),
+      fqdn: new UntypedFormControl('', { validators: [Validators.required] })
+    });
+  }
+
+  onEndpointTypeChange() {
+    this.selectedOption = this.topicForm.get('endpointType').value;
+    const secureSslChecked = this.topicForm.get('enable_ssl')?.value;
+    this.vhost = '/';
+    this.setDefaultPort(secureSslChecked, this.selectedOption);
+    this.generatePushEndpoint(secureSslChecked);
+    this.reset();
+  }
+
+  setDefaultPort(enableSSL: boolean, selectedValue: string) {
+    this.port = this.getPort(selectedValue as HostURLProtocol, enableSSL).toString();
+    this.topicForm.patchValue({ port: this.port });
+  }
+
+  onSecureSSLChange(event: any) {
+    const ssl = !!event;
+    this.port = this.getPort(this.selectedOption as HostURLProtocol, ssl).toString();
+    this.topicForm.patchValue({ port: this.port });
+    this.generatePushEndpoint(ssl);
+  }
+
+  private getPort(protocol: HostURLProtocol, ssl: boolean): number {
+    const map = {
+      [Endpoint.HTTP]: [URLPort.HTTP, URLPort.HTTPS],
+      [Endpoint.AMQP]: [URLPort.AMQP, URLPort.AMQPS],
+      [Endpoint.KAFKA]: [URLPort.KAFKA, URLPort.KAFKA_SSL]
+    };
+    return ssl ? map[protocol][1] : map[protocol][0];
+  }
+
+  textAreaOnChange(textArea: ElementRef<any>) {
+    this.textAreaJsonFormatterService.format(textArea);
+  }
+  setMechanism(): void {
+    const user = this.topicForm.get('user')?.value;
+    const password = this.topicForm.get('password')?.value;
+    const mechanismControl = this.topicForm.get('mechanism');
+    let defaultMechanism = '';
+    if (user && password) {
+      defaultMechanism = KAFKA_MECHANISM.PLAIN;
+    }
+    mechanismControl?.setValue(defaultMechanism);
+  }
+
+  generatePushEndpoint(secureSsl?: boolean) {
+    if (!this.selectedOption) {
+      return;
+    }
+    let generatedEndpoint = '';
+    const ssl = secureSsl !== undefined ? secureSsl : this.topicForm.get('enable_ssl')?.value;
+    const fqdn = this.topicForm.get('fqdn')?.value || '<fqdn>';
+    const port = this.topicForm.get('port')?.value || '[:port]';
+    switch (this.selectedOption) {
+      case Endpoint.HTTP:
+        generatedEndpoint = `http${ssl ? 's' : ''}://${fqdn}:${port}`;
+        break;
+      case Endpoint.AMQP:
+        generatedEndpoint = this.generateAMQPEndpoint(port, fqdn, ssl ? 's' : '');
+
+        break;
+      case Endpoint.KAFKA:
+        generatedEndpoint = this.generateKafkaEndpoint(fqdn, port);
+        break;
+      default:
+        generatedEndpoint = '';
+    }
+    if (generatedEndpoint) {
+      this.topicForm.patchValue({ push_endpoint: generatedEndpoint });
+    }
+  }
+
+  generateAMQPEndpoint(port: string, fqdn: string, ssl: string): string {
+    let generatedEndpoint;
+    const userAmqp = this.topicForm.get('user')?.value;
+    const passwordAmqp = this.topicForm.get('password')?.value;
+    const vhostAmqp = this.topicForm.get('vhost')?.value || '/';
+    generatedEndpoint = `amqp${ssl ? 's' : ''}://${fqdn}:${port}${vhostAmqp}`;
+    if (userAmqp && passwordAmqp) {
+      generatedEndpoint = `amqp${
+        ssl ? 's' : ''
+      }://${userAmqp}:${passwordAmqp}@${fqdn}:${port}${vhostAmqp}`;
+    }
+    return generatedEndpoint;
+  }
+
+  generateKafkaEndpoint(fqdn: string, port: string): string {
+    let generatedEndpoint;
+    const kafkaProtocol = HostURLProtocol.kafka;
+    const userKafka = this.topicForm.get('user')?.value;
+    const passwordKafka = this.topicForm.get('password')?.value;
+    const kafkaBrokers = this.topicForm.get('kafka_brokers')?.value;
+    generatedEndpoint = `${kafkaProtocol}://${fqdn}:${port}`;
+    if (userKafka && passwordKafka) {
+      generatedEndpoint = `${kafkaProtocol}://${userKafka}:${passwordKafka}@${fqdn}:${port}`;
+    } else if (kafkaBrokers) {
+      generatedEndpoint = `kafka://${kafkaBrokers}`;
+    } else {
+      generatedEndpoint = `kafka://${fqdn}:${port}`;
+    }
+
+    return generatedEndpoint;
+  }
+
+  getTopicPolicy() {
+    return this.topicForm.getValue('policy') || '{}';
+  }
+
+  getPushUrlQueryValues(topic: Topic) {
+    let url = topic.dest.push_endpoint;
+    let pushEndpointUrl = this.convertUrlToObject(url);
+    let pushendpointArg = topic.dest.push_endpoint_args;
+    const pushendpointAddarg = this.extractAdditionalValues(pushendpointArg);
+    const protocol = pushEndpointUrl.protocol?.toLowerCase();
+    switch (protocol) {
+      case UrlProtocol.AMQP:
+      case UrlProtocol.AMQPS:
+        this.selectedOption = Endpoint.AMQP;
+        break;
+
+      case UrlProtocol.HTTP:
+      case UrlProtocol.HTTPS:
+        this.selectedOption = Endpoint.HTTP;
+        break;
+
+      default:
+        this.selectedOption = Endpoint.KAFKA;
+        break;
+    }
+
+    const defaults: typeof this.topicForm.value = _.clone(this.topicForm.value);
+    const keys = Object.keys(this.topicForm.value) as (keyof typeof topic)[];
+    let value: Pick<typeof topic, typeof keys[number]> = _.pick(topic, keys);
+
+    value = _.merge(defaults, value);
+    if (!this.owners.includes(value['owner'])) {
+      this.owners.push(value['owner']);
+      this.topicForm.get('owner').disable();
+    }
+    this.topicForm.patchValue({ endpointType: this.selectedOption });
+    this.topicForm.patchValue({
+      name: topic.name,
+      owner: topic.owner,
+      push_endpoint: topic.dest.push_endpoint,
+      OpaqueData: topic.opaqueData,
+      persistent: topic.dest.persistent,
+      max_retries: topic.dest.max_retries,
+      time_to_live: topic.dest.time_to_live,
+      retry_sleep_duration: topic.dest.retry_sleep_duration,
+      policy: topic.policy,
+      port: pushEndpointUrl.port,
+      fqdn: pushEndpointUrl.hostname,
+      vhost: pushEndpointUrl.pathname,
+      user: pushEndpointUrl.username,
+      password: pushEndpointUrl.password,
+      ca_location: pushendpointAddarg.ca_location,
+      mechanism: pushendpointAddarg.mechanism,
+      enable_ssl:
+        pushEndpointUrl.protocol === UrlProtocol.HTTPS ||
+        pushEndpointUrl.protocol == UrlProtocol.AMQPS ||
+        pushEndpointUrl.protocol === UrlProtocol.KAFKA
+          ? true
+          : false,
+      verify_ssl: pushendpointAddarg.verify_ssl,
+      cloud_events: pushendpointAddarg.cloud_events,
+      amqp_exchange: pushendpointAddarg.amqp_exchange,
+      ack_level: pushendpointAddarg.ack_level,
+      use_ssl: pushendpointAddarg.use_ssl,
+      kafka_brokers: pushendpointAddarg.kafka_brokers
+    });
+  }
+
+  convertUrlToObject(url: string) {
+    const urlObj = new URL(url);
+
+    return {
+      protocol: urlObj.protocol,
+      hostname: urlObj.hostname,
+      pathname: urlObj.pathname,
+      hash: urlObj.hash,
+      port: this.getPortFromUrl(url),
+      username: urlObj.username,
+      password: urlObj.password
+    };
+  }
+
+  getPortFromUrl(url: string): string {
+    const urlObj = new URL(url);
+    let port = urlObj.port;
+    if (!port) {
+      port =
+        urlObj.protocol === UrlProtocol.HTTPS
+          ? URLPort.HTTPS
+          : urlObj.protocol === UrlProtocol.HTTP
+          ? URLPort.HTTP
+          : '';
+    }
+    return port;
+  }
+
+  extractAdditionalValues(str: string): { [key: string]: string } {
+    let obj: { [key: string]: string } = {};
+    let pairs = str.split('&');
+    pairs.forEach((pair) => {
+      let [key, value] = pair.split('=');
+      if (key && value) {
+        obj[key] = value;
+      }
+    });
+    return obj;
+  }
+
+  openUrl(url: string) {
+    window.open(url, '_blank');
+  }
+
+  submitAction() {
+    if (this.topicForm.invalid || this.topicForm.pending) {
+      return this.topicForm.setErrors({ cdSubmitButton: true });
+    }
+    const notificationTitle = this.editing
+      ? $localize`Topic updated successfully`
+      : $localize`Topic created successfully`;
+    const formValue = this.topicForm.getRawValue();
+    const topicPolicy = this.getTopicPolicy();
+    const payload = this.generatePayload(formValue, topicPolicy);
+
+    const action = this.rgwTopicService.create(payload);
+
+    action.subscribe({
+      next: () => {
+        this.notificationService.show(NotificationType.success, notificationTitle);
+        this.goToListView();
+      },
+      error: () => this.topicForm.setErrors({ cdSubmitButton: true })
+    });
+  }
+
+  generatePayload(formValue: any, topicPolicy: any): TopicRequest {
+    const basePayload: TopicRequest = {
+      name: formValue.name,
+      owner: formValue.owner,
+      push_endpoint: formValue.push_endpoint,
+      opaque_data: formValue.OpaqueData,
+      persistent: formValue.persistent,
+      time_to_live: formValue.time_to_live,
+      max_retries: formValue.max_retries,
+      retry_sleep_duration: formValue.retry_sleep_duration,
+      policy: topicPolicy
+    };
+
+    const topicType = formValue.endpointType;
+
+    const additionalFieldsMap: Record<string, Partial<TopicRequest>> = {
+      [Endpoint.KAFKA]: {
+        use_ssl: formValue.use_ssl,
+        ack_level: formValue.ack_level,
+        kafka_brokers: formValue.kafka_brokers,
+        ca_location: formValue.ca_location,
+        mechanism: formValue.mechanism
+      },
+      [Endpoint.AMQP]: {
+        verify_ssl: formValue.verify_ssl,
+        amqp_exchange: formValue.amqp_exchange,
+        ack_level: formValue.ack_level,
+        ca_location: formValue.ca_location
+      },
+      [Endpoint.HTTP]: {
+        verify_ssl: formValue.verify_ssl,
+        cloud_events: formValue.cloud_events
+      }
+    };
+
+    if (additionalFieldsMap[topicType]) {
+      Object.assign(basePayload, additionalFieldsMap[topicType]);
+    }
+
+    return basePayload;
+  }
+
+  get attributeHelpText(): string {
+    return this.selectedOption === this.hostProtocols.AMQP
+      ? $localize`Choose the configuration settings for the AMQP connection`
+      : $localize`Choose the configuration settings for the KAFKA connection`;
+  }
+
+  goToListView() {
+    this.router.navigate([BASE_URL]);
+  }
+
+  clearTextArea(field: string, defaultValue: string = '') {
+    this.topicForm.get(field)?.setValue(defaultValue);
+    this.topicForm.markAsDirty();
+    this.topicForm.updateValueAndValidity();
+  }
+
+  reset() {
+    this.topicForm.patchValue({ enable_ssl: true });
+    this.topicForm.patchValue({ port: this.port });
+    this.topicForm.patchValue({ vhost: this.vhost });
+  }
+}
index 22fce6e40a78cf2449d84e320c09938db7c490ac..7b4789a09cfbfcc6fabe4ad8323bb5665d149fbe 100644 (file)
@@ -1,14 +1,20 @@
-  <ng-container *ngIf="topic$ | async as topics">
+  <ng-container>
   <cd-table #table
             [autoReload]="false"
-            [data]="topics"
+            [data]="topics$ | async"
             [columns]="columns"
             columnMode="flex"
             selectionType="single"
             [hasDetails]="true"
+            id="key"
             (setExpandedRow)="setExpandedRow($event)"
             (updateSelection)="updateSelection($event)"
-            (fetchData)="fetchData($event)">
+            (fetchData)="fetchData()">
+  <cd-table-actions class="table-actions"
+                    [permission]="permission"
+                    [selection]="selection"
+                    [tableActions]="tableActions">
+  </cd-table-actions>
   <cd-rgw-topic-details *cdTableDetail
                         [selection]="expandedRow"></cd-rgw-topic-details>
   </cd-table>
index c87427862dd4eb62302a4f8dcc24e9a54442bb2d..b4e7d6653ee6a6b28d851d757f4c33794c531fc7 100644 (file)
@@ -1,14 +1,15 @@
 import { ComponentFixture, TestBed } from '@angular/core/testing';
-
 import { RgwTopicListComponent } from './rgw-topic-list.component';
 import { RgwTopicService } from '~/app/shared/api/rgw-topic.service';
 import { SharedModule } from '~/app/shared/shared.module';
-import { configureTestBed } from '~/testing/unit-test-helper';
+import { configureTestBed, PermissionHelper } from '~/testing/unit-test-helper';
 import { RgwTopicDetailsComponent } from '../rgw-topic-details/rgw-topic-details.component';
 import { BrowserAnimationsModule } from '@angular/platform-browser/animations';
 import { RouterTestingModule } from '@angular/router/testing';
 import { HttpClientTestingModule } from '@angular/common/http/testing';
 import { ToastrModule } from 'ngx-toastr';
+import { of } from 'rxjs';
+
 describe('RgwTopicListComponent', () => {
   let component: RgwTopicListComponent;
   let fixture: ComponentFixture<RgwTopicListComponent>;
@@ -29,27 +30,104 @@ describe('RgwTopicListComponent', () => {
         ToastrModule.forRoot(),
         RouterTestingModule
       ],
-
       declarations: [RgwTopicListComponent]
     }).compileComponents();
 
     fixture = TestBed.createComponent(RgwTopicListComponent);
     component = fixture.componentInstance;
     rgwtTopicService = TestBed.inject(RgwTopicService);
-    rgwTopicServiceListSpy = spyOn(rgwtTopicService, 'listTopic').and.callThrough();
-    fixture = TestBed.createComponent(RgwTopicListComponent);
-    component = fixture.componentInstance;
+
+    // Stub external methods before ngOnInit triggers
     spyOn(component, 'setTableRefreshTimeout').and.stub();
-    fixture.detectChanges();
+
+    // Spy on the service method
+    rgwTopicServiceListSpy = spyOn(rgwtTopicService, 'listTopic').and.returnValue(of([]));
+
+    fixture.detectChanges(); // Triggers ngOnInit
   });
 
   it('should create', () => {
     expect(component).toBeTruthy();
-    expect(rgwTopicServiceListSpy).toHaveBeenCalledTimes(1);
+    expect(rgwTopicServiceListSpy).toHaveBeenCalledTimes(2);
   });
 
-  it('should call listTopic on ngOnInit', () => {
-    component.ngOnInit();
-    expect(rgwTopicServiceListSpy).toHaveBeenCalled();
+  it('should test all TableActions combinations', () => {
+    const permissionHelper: PermissionHelper = new PermissionHelper(component.permission);
+    const tableActions = permissionHelper.setPermissionsAndGetActions(component.tableActions);
+
+    expect(tableActions).toEqual({
+      'create,update,delete': {
+        actions: ['Create', 'Edit', 'Delete'],
+        primary: {
+          multiple: 'Create',
+          executing: 'Create',
+          single: 'Create',
+          no: 'Create'
+        }
+      },
+      'create,update': {
+        actions: ['Create', 'Edit'],
+        primary: {
+          multiple: 'Create',
+          executing: 'Create',
+          single: 'Create',
+          no: 'Create'
+        }
+      },
+      'create,delete': {
+        actions: ['Create', 'Delete'],
+        primary: {
+          multiple: 'Create',
+          executing: 'Create',
+          single: 'Create',
+          no: 'Create'
+        }
+      },
+      create: {
+        actions: ['Create'],
+        primary: {
+          multiple: 'Create',
+          executing: 'Create',
+          single: 'Create',
+          no: 'Create'
+        }
+      },
+      'update,delete': {
+        actions: ['Edit', 'Delete'],
+        primary: {
+          multiple: '',
+          executing: '',
+          single: '',
+          no: ''
+        }
+      },
+      update: {
+        actions: ['Edit'],
+        primary: {
+          multiple: 'Edit',
+          executing: 'Edit',
+          single: 'Edit',
+          no: 'Edit'
+        }
+      },
+      delete: {
+        actions: ['Delete'],
+        primary: {
+          multiple: 'Delete',
+          executing: 'Delete',
+          single: 'Delete',
+          no: 'Delete'
+        }
+      },
+      'no-permissions': {
+        actions: [],
+        primary: {
+          multiple: '',
+          executing: '',
+          single: '',
+          no: ''
+        }
+      }
+    });
   });
 });
index 4c3dace75cec743d3567e7723ec2856d2920a33c..90a58b90254a216720aa33ca74a76e25db674f90 100644 (file)
@@ -1,4 +1,4 @@
-import { Component, OnInit, ViewChild } from '@angular/core';
+import { Component, NgZone, OnInit, ViewChild } from '@angular/core';
 import _ from 'lodash';
 
 import { ActionLabelsI18n } from '~/app/shared/constants/app.constants';
@@ -13,14 +13,22 @@ import { AuthStorageService } from '~/app/shared/services/auth-storage.service';
 import { RgwTopicService } from '~/app/shared/api/rgw-topic.service';
 
 import { CdTableSelection } from '~/app/shared/models/cd-table-selection';
-import { BehaviorSubject, Observable, of } from 'rxjs';
+import { URLBuilderService } from '~/app/shared/services/url-builder.service';
+import { Icons } from '~/app/shared/enum/icons.enum';
+import { ModalCdsService } from '~/app/shared/services/modal-cds.service';
+import { TaskWrapperService } from '~/app/shared/services/task-wrapper.service';
+import { FinishedTask } from '~/app/shared/models/finished-task';
+import { DeleteConfirmationModalComponent } from '~/app/shared/components/delete-confirmation-modal/delete-confirmation-modal.component';
 import { Topic } from '~/app/shared/models/topic.model';
+import { BehaviorSubject, Observable, of, Subscriber } from 'rxjs';
 import { catchError, shareReplay, switchMap } from 'rxjs/operators';
 
+const BASE_URL = 'rgw/topic';
 @Component({
   selector: 'cd-rgw-topic-list',
   templateUrl: './rgw-topic-list.component.html',
-  styleUrls: ['./rgw-topic-list.component.scss']
+  styleUrls: ['./rgw-topic-list.component.scss'],
+  providers: [{ provide: URLBuilderService, useValue: new URLBuilderService(BASE_URL) }]
 })
 export class RgwTopicListComponent extends ListWithDetails implements OnInit {
   @ViewChild('table', { static: true })
@@ -31,15 +39,19 @@ export class RgwTopicListComponent extends ListWithDetails implements OnInit {
   context: CdTableFetchDataContext;
   errorMessage: string;
   selection: CdTableSelection = new CdTableSelection();
-  topic$: Observable<Topic[]>;
-  subject = new BehaviorSubject<Topic[]>([]);
+  topicsSubject = new BehaviorSubject<Topic[]>([]);
+  topics$ = this.topicsSubject.asObservable();
   name: string;
   constructor(
     private authStorageService: AuthStorageService,
     public actionLabels: ActionLabelsI18n,
-    private rgwTopicService: RgwTopicService
+    private rgwTopicService: RgwTopicService,
+    private modalService: ModalCdsService,
+    private urlBuilder: URLBuilderService,
+    private taskWrapper: TaskWrapperService,
+    protected ngZone: NgZone
   ) {
-    super();
+    super(ngZone);
     this.permission = this.authStorageService.getPermissions().rgw;
   }
 
@@ -66,7 +78,35 @@ export class RgwTopicListComponent extends ListWithDetails implements OnInit {
         flexGrow: 2
       }
     ];
-    this.topic$ = this.subject.pipe(
+
+    const getBucketUri = () =>
+      this.selection.first() && `${encodeURIComponent(this.selection.first().key)}`;
+    const addAction: CdTableAction = {
+      permission: 'create',
+      icon: Icons.add,
+      routerLink: () => this.urlBuilder.getCreate(),
+      name: this.actionLabels.CREATE,
+      canBePrimary: (selection: CdTableSelection) => !selection.hasSelection
+    };
+
+    const editAction: CdTableAction = {
+      permission: 'update',
+      icon: Icons.edit,
+      routerLink: () => this.urlBuilder.getEdit(getBucketUri()),
+      name: this.actionLabels.EDIT
+    };
+
+    const deleteAction: CdTableAction = {
+      permission: 'delete',
+      icon: Icons.destroy,
+      click: () => this.deleteAction(),
+      disable: () => !this.selection.hasSelection,
+      name: this.actionLabels.DELETE,
+      canBePrimary: (selection: CdTableSelection) => !selection.hasSelection
+    };
+    this.tableActions = [addAction, editAction, deleteAction];
+    this.setTableRefreshTimeout();
+    this.topics$ = this.topicsSubject.pipe(
       switchMap(() =>
         this.rgwTopicService.listTopic().pipe(
           catchError(() => {
@@ -78,12 +118,40 @@ export class RgwTopicListComponent extends ListWithDetails implements OnInit {
       shareReplay(1)
     );
   }
-
   fetchData() {
-    this.subject.next([]);
+    this.topicsSubject.next([]);
   }
 
   updateSelection(selection: CdTableSelection) {
     this.selection = selection;
   }
+
+  deleteAction() {
+    const key = this.selection.first().key;
+    const name = this.selection.first().name;
+    this.modalService.show(DeleteConfirmationModalComponent, {
+      itemDescription: $localize`Topic`,
+      itemNames: [name],
+      submitActionObservable: () => {
+        return new Observable((observer: Subscriber<any>) => {
+          this.taskWrapper
+            .wrapTaskAroundCall({
+              task: new FinishedTask('rgw/topic/delete', {
+                name: [name]
+              }),
+              call: this.rgwTopicService.delete(key)
+            })
+            .subscribe({
+              error: (error: any) => {
+                observer.error(error);
+              },
+              complete: () => {
+                observer.complete();
+                this.table.refreshBtn();
+              }
+            });
+        });
+      }
+    });
+  }
 }
index e6e09e4c08fe58deaa867f5984fc040b2f6cb558..3d7f9c60618d75269d8233b6748401ae9560ceea 100644 (file)
@@ -111,6 +111,7 @@ import { RgwRateLimitDetailsComponent } from './rgw-rate-limit-details/rgw-rate-
 import { NfsClusterComponent } from '../nfs/nfs-cluster/nfs-cluster.component';
 import { RgwTopicListComponent } from './rgw-topic-list/rgw-topic-list.component';
 import { RgwTopicDetailsComponent } from './rgw-topic-details/rgw-topic-details.component';
+import { RgwTopicFormComponent } from './rgw-topic-form/rgw-topic-form.component';
 
 @NgModule({
   imports: [
@@ -140,14 +141,14 @@ import { RgwTopicDetailsComponent } from './rgw-topic-details/rgw-topic-details.
     InputModule,
     AccordionModule,
     CheckboxModule,
-    SelectModule,
     NumberModule,
     TabsModule,
     TagModule,
     TooltipModule,
     ComboBoxModule,
     ToggletipModule,
-    RadioModule
+    RadioModule,
+    SelectModule
   ],
   exports: [
     RgwDaemonDetailsComponent,
@@ -210,7 +211,8 @@ import { RgwTopicDetailsComponent } from './rgw-topic-details/rgw-topic-details.
     RgwBucketLifecycleListComponent,
     RgwRateLimitDetailsComponent,
     RgwTopicListComponent,
-    RgwTopicDetailsComponent
+    RgwTopicDetailsComponent,
+    RgwTopicFormComponent
   ],
   providers: [TitleCasePipe]
 })
@@ -429,7 +431,19 @@ const routes: Routes = [
   {
     path: 'topic',
     data: { breadcrumbs: 'Topic' },
-    children: [{ path: '', component: RgwTopicListComponent }]
+    children: [
+      { path: '', component: RgwTopicListComponent },
+      {
+        path: URLVerbs.CREATE,
+        component: RgwTopicFormComponent,
+        data: { breadcrumbs: ActionLabels.CREATE }
+      },
+      {
+        path: `${URLVerbs.EDIT}/:name`,
+        component: RgwTopicFormComponent,
+        data: { breadcrumbs: ActionLabels.EDIT }
+      }
+    ]
   }
 ];
 
index b30cf0443c89c51639af988621961d2bf92371fe..5f7f39459c938d1a5dcdf465401c38c9a2271246 100644 (file)
@@ -1,7 +1,6 @@
 import { TestBed } from '@angular/core/testing';
-
 import { RgwTopicService } from './rgw-topic.service';
-import { configureTestBed } from '~/testing/unit-test-helper';
+import { configureTestBed, RgwHelper } from '~/testing/unit-test-helper';
 import { HttpClientTestingModule, HttpTestingController } from '@angular/common/http/testing';
 
 describe('RgwTopicService', () => {
@@ -18,26 +17,12 @@ describe('RgwTopicService', () => {
   beforeEach(() => {
     service = TestBed.inject(RgwTopicService);
     httpTesting = TestBed.inject(HttpTestingController);
-  });
-
-  afterEach(() => {
-    httpTesting.verify();
+    RgwHelper.selectDaemon();
   });
 
   it('should be created', () => {
     expect(service).toBeTruthy();
   });
-
-  it('should call list with empty result', () => {
-    let result;
-    service.listTopic().subscribe((resp) => {
-      result = resp;
-    });
-    const req = httpTesting.expectOne(`api/rgw/topic`);
-    expect(req.request.method).toBe('GET');
-    req.flush([]);
-    expect(result).toEqual([]);
-  });
   it('should call list with result', () => {
     service.listTopic().subscribe((resp) => {
       let result = resp;
@@ -48,9 +33,19 @@ describe('RgwTopicService', () => {
     req.flush(['foo', 'bar']);
   });
 
-  it('should call get', () => {
-    service.getTopic('foo').subscribe();
+  it('should call create', () => {
+    service.create({} as any).subscribe();
+    const req = httpTesting.expectOne(`api/rgw/topic?${RgwHelper.DAEMON_QUERY_PARAM}`);
+    expect(req.request.method).toBe('POST');
+  });
+  it('should call update', () => {
+    service.create({} as any).subscribe();
+    const req = httpTesting.expectOne(`api/rgw/topic?${RgwHelper.DAEMON_QUERY_PARAM}`);
+    expect(req.request.method).toBe('POST');
+  });
+  it('should call delete', () => {
+    service.delete('foo').subscribe();
     const req = httpTesting.expectOne(`api/rgw/topic/foo`);
-    expect(req.request.method).toBe('GET');
+    expect(req.request.method).toBe('DELETE');
   });
 });
index 8de7b32c04dbd6cc71eaaed912593f70b46c81bd..6dcc48882f453d8075640a39729382e919b6e94a 100644 (file)
@@ -1,8 +1,11 @@
-import { HttpClient } from '@angular/common/http';
+import { HttpClient, HttpParams } from '@angular/common/http';
 import { Injectable } from '@angular/core';
-import { Observable } from 'rxjs';
+import _ from 'lodash';
+import { Observable, of as observableOf } from 'rxjs';
 import { ApiClient } from './api-client';
-import { Topic } from '~/app/shared/models/topic.model';
+import { Topic, TopicRequest } from '~/app/shared/models/topic.model';
+import { catchError, mapTo } from 'rxjs/operators';
+import { RgwDaemonService } from './rgw-daemon.service';
 
 @Injectable({
   providedIn: 'root'
@@ -10,7 +13,7 @@ import { Topic } from '~/app/shared/models/topic.model';
 export class RgwTopicService extends ApiClient {
   baseURL = 'api/rgw/topic';
 
-  constructor(private http: HttpClient) {
+  constructor(private http: HttpClient, private rgwDaemonService: RgwDaemonService) {
     super();
   }
 
@@ -18,7 +21,32 @@ export class RgwTopicService extends ApiClient {
     return this.http.get<Topic[]>(this.baseURL);
   }
 
-  getTopic(name: string) {
-    return this.http.get(`${this.baseURL}/${name}`);
+  getTopic(key: string) {
+    return this.http.get<Topic>(`${this.baseURL}/${encodeURIComponent(key)}`);
+  }
+
+  create(createParam: TopicRequest) {
+    return this.rgwDaemonService.request((params: HttpParams) => {
+      return this.http.post(`${this.baseURL}`, createParam, { params: params });
+    });
+  }
+
+  delete(key: string) {
+    return this.http.delete(`${this.baseURL}/${key}`, {
+      observe: 'response'
+    });
+  }
+
+  exists(key: string): Observable<boolean> {
+    const encodedKey = encodeURIComponent(`:${key}`);
+    return this.getTopic(encodedKey).pipe(
+      mapTo(true),
+      catchError((error: Event) => {
+        if (_.isFunction(error.preventDefault)) {
+          error.preventDefault();
+        }
+        return observableOf(false);
+      })
+    );
   }
 }
index 31aa7b61c1225f6076589cf8099d80f774a0bd3d..0e265108eeb1a2d79631b808ef342105da63a8b2 100644 (file)
@@ -1,8 +1,8 @@
-interface Destination {
+export interface Destination {
   push_endpoint: string;
   push_endpoint_args: string;
   push_endpoint_topic: string;
-  stored_secret: string;
+  stored_secret: boolean;
   persistent: boolean;
   persistent_queue: string;
   time_to_live: number;
@@ -17,15 +17,81 @@ export interface Topic {
   dest: Destination;
   opaqueData: string;
   policy: string | {};
+  key: string;
   subscribed_buckets: any[];
 }
 
-export interface TopicDetails {
+export interface TopicRequest {
   owner: string;
   name: string;
-  arn: string;
-  dest: Destination;
-  opaqueData: string;
-  policy: string;
-  subscribed_buckets: string[];
+  push_endpoint: string;
+  opaque_data: string;
+  policy: {} | string;
+  persistent?: string;
+  time_to_live?: string;
+  max_retries?: string;
+  retry_sleep_duration?: string;
+  verify_ssl?: boolean;
+  cloud_events?: string;
+  ca_location?: string;
+  amqp_exchange?: string;
+  ack_level?: string;
+  use_ssl?: boolean;
+  kafka_brokers?: string;
+  mechanism?: string;
+}
+
+export const KAFKA_MECHANISM = {
+  PLAIN: 'PLAIN',
+  SCRAM256: 'SCRAM-SHA-256',
+  SCRAM512: 'SCRAM-SHA-512'
+};
+export const END_POINT_TYPE = {
+  HTTP: 'HTTP',
+  AMQP: 'AMQP',
+  Kafka: 'KAFKA'
+};
+export const AMQP_ACK_LEVEL = {
+  none: 'none',
+  broker: 'broker',
+  routable: 'routable'
+};
+export const KAFKA_ACK_LEVEL = {
+  none: 'none',
+  broker: 'broker'
+};
+export enum URLPort {
+  HTTP = '80',
+  HTTPS = '443',
+  AMQP = '5672',
+  AMQPS = '5671',
+  KAFKA = '9092',
+  KAFKA_SSL = '9093'
 }
+export enum HostURLProtocol {
+  http = 'http',
+  https = 'https',
+  amqp = 'amqp',
+  amqps = 'amqps',
+  kafka = 'kafka'
+}
+export enum Endpoint {
+  HTTP = 'HTTP',
+  AMQP = 'AMQP',
+  AMQPS = 'AMQPS',
+  KAFKA = 'KAFKA'
+}
+
+export enum UrlProtocol {
+  HTTP = 'http:',
+  HTTPS = 'https:',
+  AMQP = 'amqp:',
+  AMQPS = 'amqps:',
+  KAFKA = 'kafka'
+}
+
+export const URL_FORMAT_PLACEHOLDERS = {
+  http: 'http[s]://<fqdn>[:<port]...',
+  amqp: 'amqp[s]://[<user>:<password>@]<fqdn>[:<port>][/<vhost>]...',
+  kafka: 'kafka://[<user>:<password>@]<fqdn>[:<port>]...'
+};
index 87bee9a139b00601daa050e5762414925fa6b629..ccd502fe21886e3a34bc66c70bb7c31e6083cc27 100644 (file)
@@ -404,6 +404,9 @@ export class TaskMessageService {
     'nfs/delete': this.newTaskMessage(this.commonOperations.delete, (metadata) =>
       this.nfs(metadata)
     ),
+    'rgw/topic/delete': this.newTaskMessage(this.commonOperations.delete, (metadata) =>
+      this.topic(metadata)
+    ),
     // Grafana tasks
     'grafana/dashboards/update': this.newTaskMessage(
       this.commonOperations.update,
@@ -615,6 +618,9 @@ export class TaskMessageService {
     return $localize`SMB users and groups access resource '${metadata.usersGroupsId}'`;
   }
 
+  topic(metadata: any) {
+    return $localize`Topic  '${metadata.name}'`;
+  }
   service(metadata: any) {
     return $localize`service '${metadata.service_name}'`;
   }
index b193aa45f67fa6f1751b5c15b15e8747ea5daecb..93ca1f4b53b847f034c07361bff0c107bdfc8fa5 100755 (executable)
@@ -13544,19 +13544,7 @@ paths:
       - RgwSite
   /api/rgw/topic:
     get:
-      parameters:
-      - allowEmptyValue: true
-        description: Name of the user
-        in: query
-        name: uid
-        schema:
-          type: string
-      - allowEmptyValue: true
-        description: Name of the tenant
-        in: query
-        name: tenant
-        schema:
-          type: string
+      parameters: []
       responses:
         '200':
           content:
@@ -13584,64 +13572,63 @@ paths:
           application/json:
             schema:
               properties:
-                amqp_ack_level:
-                  description: amqp ack level
+                ack_level:
+                  description: Amqp ack level
                   type: string
                 amqp_exchange:
-                  description: amqp exchange
+                  description: Amqp exchange
                   type: string
                 ca_location:
-                  description: ca location
+                  description: Ca location
                   type: string
                 cloud_events:
                   default: false
-                  description: cloud events
+                  description: Cloud events
                   type: string
                 daemon_name:
-                  type: string
-                kafka_ack_level:
-                  description: kafka ack level
+                  description: Name of the daemon
                   type: string
                 kafka_brokers:
-                  description: kafka brokers
+                  description: Kafka brokers
                   type: string
                 max_retries:
-                  description: max retries
+                  description: Max retries
                   type: string
                 mechanism:
-                  description: mechanism
+                  description: Mechanism
                   type: string
                 name:
                   description: Name of the topic
                   type: string
                 opaque_data:
-                  description: ' opaque data'
+                  description: OpaqueData
                   type: string
                 owner:
+                  description: Name of the owner
                   type: string
                 persistent:
                   default: false
-                  description: persistent
+                  description: Persistent
                   type: boolean
                 policy:
-                  description: policy
+                  description: Policy
                   type: string
                 push_endpoint:
                   description: Push Endpoint
                   type: string
                 retry_sleep_duration:
-                  description: retry sleep duration
+                  description: Retry sleep duration
                   type: string
                 time_to_live:
                   description: Time to live
                   type: string
                 use_ssl:
                   default: false
-                  description: use ssl
+                  description: Use ssl
                   type: boolean
                 verify_ssl:
                   default: false
-                  description: verify ssl
+                  description: Verify ssl
                   type: boolean
               required:
               - name
@@ -13671,21 +13658,15 @@ paths:
       summary: Create a new RGW Topic
       tags:
       - RGW Topic Management
-  /api/rgw/topic/{name}:
+  /api/rgw/topic/{key}:
     delete:
       parameters:
-      - description: Name of the user
+      - description: The metadata object key to retrieve the topic e.g topic:topic_name
         in: path
-        name: name
+        name: key
         required: true
         schema:
           type: string
-      - allowEmptyValue: true
-        description: Name of the tenant
-        in: query
-        name: tenant
-        schema:
-          type: string
       responses:
         '202':
           content:
@@ -13713,18 +13694,12 @@ paths:
       - RGW Topic Management
     get:
       parameters:
-      - description: Name of the user
+      - description: The metadata object key to retrieve the topic e.g owner:topic_name
         in: path
-        name: name
+        name: key
         required: true
         schema:
           type: string
-      - allowEmptyValue: true
-        description: Name of the tenant
-        in: query
-        name: tenant
-        schema:
-          type: string
       responses:
         '200':
           content:
index cefad2b045bb7225a5cfa48f55ffbff1136d35dd..8c937753dcd50052b0e5baaf8185a9971466745e 100755 (executable)
@@ -13,7 +13,7 @@ import xml.etree.ElementTree as ET  # noqa: N814
 from collections import defaultdict
 from enum import Enum
 from subprocess import SubprocessError
-from urllib.parse import urlparse
+from urllib.parse import urlparse, urlunparse
 
 import requests
 
@@ -22,7 +22,7 @@ try:
 except ModuleNotFoundError:
     logging.error("Module 'xmltodict' is not installed.")
 
-from mgr_util import build_url
+from mgr_util import build_url, name_to_config_section
 
 from .. import mgr
 from ..awsauth import S3Auth
@@ -1121,14 +1121,15 @@ class RgwClient(RestClient):
 
     @RestClient.api_post('?Action=CreateTopic&Name={name}')
     def create_topic(self, request=None, name: str = '',
+                     daemon_name: str = '',
                      push_endpoint: Optional[str] = '', opaque_data: Optional[str] = '',
                      persistent: Optional[bool] = False, time_to_live: Optional[str] = '',
                      max_retries: Optional[str] = '', retry_sleep_duration: Optional[str] = '',
                      policy: Optional[str] = '',
                      verify_ssl: Optional[bool] = False, cloud_events: Optional[bool] = False,
                      ca_location: Optional[str] = None, amqp_exchange: Optional[str] = None,
-                     amqp_ack_level: Optional[str] = None,
-                     use_ssl: Optional[bool] = False, kafka_ack_level: Optional[str] = None,
+                     ack_level: Optional[str] = None,
+                     use_ssl: Optional[bool] = False,
                      kafka_brokers: Optional[str] = None, mechanism: Optional[str] = None,
                      ):
         params = {'Name': name}
@@ -1155,21 +1156,31 @@ class RgwClient(RestClient):
             params['ca_location'] = ca_location
         if amqp_exchange:
             params['amqp_exchange'] = amqp_exchange
-        if amqp_ack_level:
-            params['amqp_ack_level'] = amqp_ack_level
+        if ack_level:
+            params['ack_level'] = ack_level
         if use_ssl:
             params['use_ssl'] = 'true' if use_ssl else 'false'
-        if kafka_ack_level:
-            params['kafka_ack_level'] = kafka_ack_level
         if kafka_brokers:
             params['kafka_brokers'] = kafka_brokers
         if mechanism:
             params['mechanism'] = mechanism
+        if push_endpoint and '://' in push_endpoint and '@' in push_endpoint:
+            try:
+                full_daemon_name = f'rgw.{daemon_name}'
+                CephService.send_command(
+                    'mon', 'config set',
+                    who=name_to_config_section(full_daemon_name),
+                    name='rgw_allow_notification_secrets_in_cleartext',
+                    value='true'
+                )
+            except Exception as e:
+                raise DashboardException(
+                    msg=f'Failed to set cleartext secret config: {e}', component='rgw'
+                )
         try:
             result = request(params=params)
         except RequestException as e:
             raise DashboardException(msg=str(e), component='rgw')
-
         return result
 
 
@@ -2706,58 +2717,91 @@ class RgwMultisite:
 
 
 class RgwTopicmanagement:
-    def list_topics(self, uid: Optional[str], tenant: Optional[str]):
-        rgw_topics_list = {}
-        rgw_topic_list_cmd = ['topic', 'list']
-        try:
-            if uid:
-                rgw_topic_list_cmd.append('--uid')
-                rgw_topic_list_cmd.append(uid)
 
-            if tenant:
-                rgw_topic_list_cmd.append('--tenant')
-                rgw_topic_list_cmd.append(tenant)
+    @staticmethod
+    def push_endpoint_password(push_endpoint: str) -> str:
+        parsed = urlparse(push_endpoint)
+        if parsed.username and parsed.password:
+            netloc = f"{parsed.username}:****@{parsed.hostname}"
+            if parsed.port:
+                netloc += f":{parsed.port}"
+            parsed = parsed._replace(netloc=netloc)
+            return urlunparse(parsed)
+        return push_endpoint
+
+    def list_topics(self):
+        try:
+            list_cmd = ['metadata', 'list', 'topic']
+            exit_code, topic_keys, _ = mgr.send_rgwadmin_command(list_cmd)
 
-            exit_code, rgw_topics_list, _ = mgr.send_rgwadmin_command(rgw_topic_list_cmd)
             if exit_code > 0:
-                raise DashboardException(msg='Unable to fetch topic list',
-                                         http_status_code=500, component='rgw')
-            return rgw_topics_list
-        except SubprocessError as error:
-            raise DashboardException(error, http_status_code=500, component='rgw')
+                raise DashboardException(
+                    'Unable to fetch topic list',
+                    http_status_code=500,
+                    component='rgw'
+                )
 
-    def get_topic(self, name: str, tenant: Optional[str]):
-        rgw_topic_info_cmd = ['topic', 'get']
-        try:
-            if tenant:
-                rgw_topic_info_cmd.append('--tenant')
-                rgw_topic_info_cmd.append(tenant)
+            topics_info = []
+
+            for key in topic_keys:
+                get_cmd = ['metadata', 'get', f'topic:{key}']
+                exit_code, topic_info, _ = mgr.send_rgwadmin_command(get_cmd)
+
+                if exit_code == 0 and 'data' in topic_info:
+                    data = topic_info['data']
+                    modified_data = data.copy()
+                    modified_data['key'] = key
+                    push_endpoint = data.get('dest', {}).get('push_endpoint')
+                    if push_endpoint:
+                        modified_data.setdefault('dest', {})
+                        modified_data['dest']['push_endpoint'] = self.push_endpoint_password(
+                            push_endpoint
+                        )
+
+                    topics_info.append(modified_data)
 
-            if name:
-                rgw_topic_info_cmd.append('--topic')
-                rgw_topic_info_cmd.append(name)
+            return topics_info
+
+        except SubprocessError as error:
+            raise DashboardException(str(error), http_status_code=500, component='rgw')
 
+    def get_topic(self, key):
+        rgw_topic_info_cmd = ['metadata', 'get', f'topic:{key}']
+        try:
             exit_code, topic_info, _ = mgr.send_rgwadmin_command(rgw_topic_info_cmd)
             if exit_code > 0:
-                raise DashboardException('Unable to get topic info',
-                                         http_status_code=500, component='rgw')
+                raise DashboardException(
+                    'Unable to get topic info',
+                    http_status_code=500,
+                    component='rgw'
+                )
+
+            topic_info = topic_info.get('data', {})
+            topic_info['key'] = key
+            push_endpoint = topic_info.get('dest', {}).get('push_endpoint')
+            if push_endpoint:
+                topic_info.setdefault('dest', {})
+                topic_info['dest']['push_endpoint'] = self.push_endpoint_password(push_endpoint)
+
             return topic_info
         except SubprocessError as error:
-            raise DashboardException(error, http_status_code=500, component='rgw')
+            raise DashboardException(str(error), http_status_code=500, component='rgw')
 
-    def delete_topic(self, name: str, tenant: Optional[str] = None):
-        rgw_delete_topic_cmd = ['topic', 'rm']
+    def delete_topic(self, key: str):
+        rgw_delete_metadata_cmd = ['metadata', 'rm', f'topic:{key}']
         try:
-            if tenant:
-                rgw_delete_topic_cmd.extend(['--tenant', tenant])
-
-            if name:
-                rgw_delete_topic_cmd.extend(['--topic', name])
+            exit_code, _, _ = mgr.send_rgwadmin_command(rgw_delete_metadata_cmd)
+            if exit_code > 0:
+                raise DashboardException(
+                    msg=f'Unable to remove metadata for topic: {key}',
+                    http_status_code=500,
+                    component='rgw'
+                )
 
-            exit_code, _, _ = mgr.send_rgwadmin_command(rgw_delete_topic_cmd)
+            return {
+                "status": "success",
+                "message": f"Metadata for topic '{key}' removed successfully."
+            }
 
-            if exit_code > 0:
-                raise DashboardException(msg='Unable to delete topic',
-                                         http_status_code=500, component='rgw')
         except SubprocessError as error:
-            raise DashboardException(error, http_status_code=500, component='rgw')
+            raise DashboardException(str(error), http_status_code=500, component='rgw')
index 00b2f5aff88183d726727de09a9cd05e7afbf5e2..609d5388fc4dab1bf68306c9610ba733dede50d8 100644 (file)
@@ -551,6 +551,7 @@ class TestRgwTopicController(ControllerTestCase):
                 "arn": "arn:aws:sns:zg1-realm1::HttpTest",
                 "opaqueData": "test123",
                 "policy": "{}",
+                "key": "RGW22222222222222222:HttpTest",
                 "subscribed_buckets": []
             }
         ]
@@ -580,6 +581,7 @@ class TestRgwTopicController(ControllerTestCase):
                 "arn": "arn:aws:sns:zg1-realm1::HttpTest",
                 "opaqueData": "test123",
                 "policy": "{}",
+                "key": "RGW22222222222222222:HttpTest",
                 "subscribed_buckets": []
             }
         ]
@@ -595,6 +597,6 @@ class TestRgwTopicController(ControllerTestCase):
         mock_delete_topic.return_value = None
 
         controller = RgwTopic()
-        result = controller.delete(name='HttpTest', tenant=None)
-        mock_delete_topic.assert_called_with(name='HttpTest', tenant=None)
+        result = controller.delete(key='RGW22222222222222222:HttpTest')
+        mock_delete_topic.assert_called_with(key='RGW22222222222222222:HttpTest')
         self.assertEqual(result, None)