}
+def _get_owner(owner):
+ accounts = RgwAccounts().get_accounts()
+
+ # if the owner is present in the accounts list,
+ # then the bucket is owned by an account.
+ # hence we will use dashboard user to fetch the
+ # bucket info
+ return owner if owner not in accounts else RgwServiceManager.user
+
+
@UIRouter('/rgw', Scope.RGW)
@APIDoc("RGW Management API", "Rgw")
class Rgw(BaseController):
if bucket['tenant'] else bucket['bucket']
return bucket
- def _get_owner(self, owner):
- accounts = RgwAccounts().get_accounts()
-
- # if the owner is present in the accounts list,
- # then the bucket is owned by an account.
- # hence we will use dashboard user to fetch the
- # bucket info
- return owner if owner not in accounts else RgwServiceManager.user
-
def _get_versioning(self, owner, daemon_name, bucket_name):
rgw_client = RgwClient.instance(owner, daemon_name)
return rgw_client.get_bucket_versioning(bucket_name)
bucket_name = RgwBucket.get_s3_bucket_name(result['bucket'],
result['tenant'])
- owner = self._get_owner(result['owner'])
+ owner = _get_owner(result['owner'])
# Append the versioning configuration.
versioning = self._get_versioning(owner, daemon_name, bucket_name)
encryption = self._get_encryption(bucket_name, daemon_name, owner)
uid_tenant = uid[:uid.find('$')] if uid.find('$') >= 0 else None
bucket_name = RgwBucket.get_s3_bucket_name(bucket, uid_tenant)
- uid = self._get_owner(uid)
+ uid = _get_owner(uid)
locking = self._get_locking(uid, daemon_name, bucket_name)
if versioning_state:
@allow_empty_body
def set_lifecycle_policy(self, bucket_name: str = '', lifecycle: str = '', daemon_name=None,
owner=None, tenant=None):
- owner = self._get_owner(owner)
+ owner = _get_owner(owner)
bucket_name = RgwBucket.get_s3_bucket_name(bucket_name, tenant)
if lifecycle == '{}':
return self._delete_lifecycle(bucket_name, daemon_name, owner)
@RESTController.Collection(method='GET', path='/lifecycle')
def get_lifecycle_policy(self, bucket_name: str = '', daemon_name=None, owner=None,
tenant=None):
- owner = self._get_owner(owner)
+ owner = _get_owner(owner)
bucket_name = RgwBucket.get_s3_bucket_name(bucket_name, tenant)
return self._get_lifecycle(bucket_name, daemon_name, owner)
"Create a new RGW Topic",
parameters={
"name": (str, "Name of the topic"),
+ "owner": (str, "Name of the owner"),
+ "daemon_name": (str, "Name of the daemon"),
"push_endpoint": (str, "Push Endpoint"),
- "opaque_data": (str, " opaque data"),
- "persistent": (bool, "persistent"),
+ "opaque_data": (str, "OpaqueData"),
+ "persistent": (bool, "Persistent"),
"time_to_live": (str, "Time to live"),
- "max_retries": (str, "max retries"),
- "retry_sleep_duration": (str, "retry sleep duration"),
- "policy": (str, "policy"),
- "verify_ssl": (bool, 'verify ssl'),
- "cloud_events": (str, 'cloud events'),
- "user": (str, 'user'),
- "password": (str, 'password'),
- "vhost": (str, 'vhost'),
- "ca_location": (str, 'ca location'),
- "amqp_exchange": (str, 'amqp exchange'),
- "amqp_ack_level": (str, 'amqp ack level'),
- "use_ssl": (bool, 'use ssl'),
- "kafka_ack_level": (str, 'kafka ack level'),
- "kafka_brokers": (str, 'kafka brokers'),
- "mechanism": (str, 'mechanism'),
+ "max_retries": (str, "Max retries"),
+ "retry_sleep_duration": (str, "Retry sleep duration"),
+ "policy": (str, "Policy"),
+ "verify_ssl": (bool, 'Verify ssl'),
+ "cloud_events": (str, 'Cloud events'),
+ "ca_location": (str, 'Ca location'),
+ "amqp_exchange": (str, 'Amqp exchange'),
+ "ack_level": (str, 'Amqp ack level'),
+ "use_ssl": (bool, 'Use ssl'),
+ "kafka_brokers": (str, 'Kafka brokers'),
+ "mechanism": (str, 'Mechanism'),
},
)
def create(
cloud_events: Optional[bool] = False,
ca_location: Optional[str] = None,
amqp_exchange: Optional[str] = None,
- amqp_ack_level: Optional[str] = None,
+ ack_level: Optional[str] = None,
use_ssl: Optional[bool] = False,
- kafka_ack_level: Optional[str] = None,
kafka_brokers: Optional[str] = None,
mechanism: Optional[str] = None
):
+ owner = _get_owner(owner)
rgw_topic_instance = RgwClient.instance(owner, daemon_name=daemon_name)
return rgw_topic_instance.create_topic(
name=name,
+ daemon_name=daemon_name,
push_endpoint=push_endpoint,
opaque_data=opaque_data,
persistent=persistent,
cloud_events=cloud_events,
ca_location=ca_location,
amqp_exchange=amqp_exchange,
- amqp_ack_level=amqp_ack_level,
+ ack_level=ack_level,
use_ssl=use_ssl,
- kafka_ack_level=kafka_ack_level,
kafka_brokers=kafka_brokers,
mechanism=mechanism
)
@EndpointDoc(
"Get RGW Topic List",
- parameters={
- "uid": (str, "Name of the user"),
- "tenant": (str, "Name of the tenant"),
- },
)
- def list(self, uid: Optional[str] = None, tenant: Optional[str] = None):
+ def list(self):
rgw_topic_instance = RgwTopicmanagement()
- result = rgw_topic_instance.list_topics(uid, tenant)
- return result['topics'] if 'topics' in result else []
+ result = rgw_topic_instance.list_topics()
+ return result
@EndpointDoc(
"Get RGW Topic",
parameters={
- "name": (str, "Name of the user"),
- "tenant": (str, "Name of the tenant"),
+ "key": (str, "The metadata object key to retrieve the topic e.g owner:topic_name"),
},
)
- def get(self, name: str, tenant: Optional[str] = None):
+ def get(self, key: str):
rgw_topic_instance = RgwTopicmanagement()
- result = rgw_topic_instance.get_topic(name, tenant)
+ result = rgw_topic_instance.get_topic(key)
return result
@EndpointDoc(
"Delete RGW Topic",
parameters={
- "name": (str, "Name of the user"),
- "tenant": (str, "Name of the tenant"),
+ "key": (str, "The metadata object key to retrieve the topic e.g topic:topic_name"),
},
)
- def delete(self, name: str, tenant: Optional[str] = None):
+ def delete(self, key: str):
rgw_topic_instance = RgwTopicmanagement()
- result = rgw_topic_instance.delete_topic(name=name, tenant=tenant)
+ result = rgw_topic_instance.delete_topic(key=key)
return result
import { ComponentFixture, TestBed } from '@angular/core/testing';
import { RgwTopicDetailsComponent } from './rgw-topic-details.component';
-import { TopicDetails } from '~/app/shared/models/topic.model';
+import { Topic } from '~/app/shared/models/topic.model';
interface Destination {
push_endpoint: string;
push_endpoint_args: string;
push_endpoint_topic: string;
- stored_secret: string;
+ stored_secret: boolean;
persistent: boolean;
persistent_queue: string;
time_to_live: number;
push_endpoint: 'http://localhost:8080',
push_endpoint_args: 'args',
push_endpoint_topic: 'topic',
- stored_secret: 'secret',
+ stored_secret: false,
persistent: true,
persistent_queue: 'queue',
time_to_live: 3600,
});
it('should parse policy string correctly', () => {
- const mockSelection: TopicDetails = {
+ const mockSelection: Topic = {
name: 'testHttp',
owner: 'ownerName',
arn: 'arnValue',
dest: mockDestination,
policy: '{"key": "value"}',
+ key: 'topic:ownerName:testHttp',
opaqueData: 'test@12345',
subscribed_buckets: []
};
});
it('should set policy to empty object if policy is not a string', () => {
- const mockSelection: TopicDetails = {
+ const mockSelection: Topic = {
name: 'testHttp',
owner: 'ownerName',
arn: 'arnValue',
dest: mockDestination,
policy: '{}',
+ key: 'topic:ownerName:testHttp',
subscribed_buckets: [],
opaqueData: ''
};
import { Component, Input, SimpleChanges, OnChanges } from '@angular/core';
-import { TopicDetails } from '~/app/shared/models/topic.model';
+import { Topic } from '~/app/shared/models/topic.model';
import * as _ from 'lodash';
@Component({
})
export class RgwTopicDetailsComponent implements OnChanges {
@Input()
- selection: TopicDetails;
- policy: string;
+ selection: Topic;
+ policy: string | object = '{}';
constructor() {}
ngOnChanges(changes: SimpleChanges): void {
if (changes['selection'] && this.selection) {
--- /dev/null
+<div cdsCol
+ *cdFormLoading="loading"
+ [columnNumbers]="{ md: 4 }">
+ <form name="topicForm"
+ #formDir="ngForm"
+ [formGroup]="topicForm"
+ novalidate>
+ <div class="form-header">
+ {{ action | titlecase }} {{ resource | upperFirst }}
+ <cd-help-text>
+ <span i18n>
+ Configure the push endpoint parameters to send notifications. On successful creation, you'll receive the topic's unique Amazon Resource Name
+ </span>
+ </cd-help-text>
+ </div>
+
+ <div class="form-item form-item-append"
+ cdsRow>
+ <!-- Topic Type-->
+ <div cdsCol>
+ <cds-select formControlName="endpointType"
+ label="Type"
+ i18n-label
+ cdRequiredField="Type"
+ id="endpointType"
+ (change)="onEndpointTypeChange();"
+ [invalid]="topicForm.controls.endpointType.invalid && topicForm.controls.endpointType.dirty"
+ [invalidText]="topicTypeError"
+ helperText="This user will manage and configure the topic’s settings."
+ i18n-helperText>
+ <option i18n
+ *ngIf="endpointType === null"
+ value="null">Loading... </option>
+ <option i18n
+ *ngIf="endpointType !== null"
+ value="">-- Select a Topic type --</option>
+ <option *ngFor="let data of endpointType"
+ i18n>{{ data | upperFirst }} </option>
+ </cds-select>
+ <ng-template #topicTypeError>
+ <span class="invalid-feedback"
+ *ngIf="topicForm.showError('endpointType', formDir, 'required')"
+ i18n>This field is required.</span>
+ </ng-template>
+ </div>
+
+ <!-- owner -->
+ <div cdsCol>
+ <cds-select
+ formControlName="owner"
+ label="User"
+ i18n-label
+ cdRequiredField="Owner"
+ [invalid]="topicForm.controls.owner.invalid && topicForm.controls.owner.dirty"
+ id="owner"
+ helperText="This owner will define and control the topic’s settings"
+ [invalidText]="ownerError"
+ i18n-helperText>
+ <option i18n
+ *ngIf="owners === null"
+ value="null">Loading... </option>
+ <option i18n
+ *ngIf="owners !== null"
+ value="">-- Select a user --</option>
+ <option *ngFor="let data of owners"
+ i18n
+ [value]="data">{{ data }}</option>
+ </cds-select>
+ <ng-template #ownerError>
+ <span class="invalid-feedback"
+ *ngIf="topicForm.showError('owner', formDir, 'required')"
+ i18n>This field is required.</span>
+ </ng-template>
+ </div>
+ </div>
+
+ <ng-container *ngIf="selectedOption">
+ <div class="form-item form-item-append"
+ cdsRow>
+
+ <!-- Topic Name -->
+ <div cdsCol>
+ <cds-text-label
+ labelInputID="name"
+ i18n
+ i18n-helperText
+ cdRequiredField="Name"
+ [invalid]="topicForm.controls.name.invalid && topicForm.controls.name.dirty"
+ [invalidText]="nameError"
+ helperText="Enter a Topic name">Name
+
+ <input cdsText
+ type="text"
+ id="name"
+ autofocus
+ formControlName="name"
+ [invalid]="topicForm.controls.name.invalid && topicForm.controls.name.dirty"/>
+ </cds-text-label>
+
+ <ng-template #nameError>
+ <span class="invalid-feedback"
+ *ngIf="topicForm.showError('name', formDir, 'required')"
+ i18n>This field is required.</span>
+ <span class="invalid-feedback"
+ *ngIf="topicForm.showError('name', formDir, 'notUnique')"
+ i18n>The name is already in use. Please choose a different one</span>
+ </ng-template>
+ </div>
+ </div>
+ <div class="form-item"
+ cdsRow>
+ <fieldset>
+ <legend i18n
+ cdsCol
+ class="cd-header">
+ Generate push endpoint
+ <cd-help-text> Configure the endpoint URL to receive push notifications</cd-help-text>
+ </legend>
+
+ <!-- Enable SSL -->
+ <div cdsCol
+ [columnNumbers]="{sm: 4}">
+ <cds-checkbox id="enable_ssl"
+ formControlName="enable_ssl"
+ (checkedChange)="onSecureSSLChange($event)">
+ <ng-container i18n>SSL</ng-container>
+ <cd-help-text i18n>
+ Enabling SSL ensures that your connection is encrypted and secure. You must have a valid SSL certificate installed on your server.
+ </cd-help-text>
+ </cds-checkbox>
+ </div>
+
+ <!-- Verify ssl -->
+ <div cdsCol
+ *ngIf="(selectedOption === hostProtocols.AMQP || selectedOption === hostProtocols.HTTP)"
+ [columnNumbers]="{sm: 4}">
+
+ <cds-checkbox id="verify_ssl"
+ formControlName="verify_ssl">
+ <ng-container i18n>Verify SSL</ng-container>
+ <cd-help-text i18n>Ensures that the server's SSL certificate is valid and trusted.</cd-help-text>
+ </cds-checkbox>
+ </div>
+
+ <!-- Cloud Event-->
+ <div cdsCol
+ *ngIf="(selectedOption == hostProtocols.HTTP)"
+ [columnNumbers]="{sm: 4}">
+ <cds-checkbox id="cloud_events"
+ formControlName="cloud_events">
+ <ng-container i18n>Cloud events</ng-container>
+ <cd-help-text i18n>Captures cloud events as triggers for notifications.</cd-help-text>
+ </cds-checkbox>
+ </div>
+ </fieldset>
+ </div>
+
+ <div cdsRow
+ class="form-item form-item-append">
+
+ <!-- Fqdn-->
+ <div cdsCol>
+ <cds-text-label
+ labelInputID="fqdn"
+ cdRequiredField="RGW Gateway Hostname"
+ [invalid]="topicForm.controls.fqdn.invalid && topicForm.controls.fqdn.dirty"
+ [invalidText]="fqdnError"
+ helperText="Enter the FQDN to configure the topic's settings and behavior"
+ i18n-helperText
+ i18n>
+ RGW Gateway hostname
+ <input cdsText
+ type="text"
+ id="fqdn"
+ placeholder="e.g., 127.0.0.1 or localhost"
+ formControlName="fqdn"
+ [invalid]="topicForm.controls.fqdn.invalid && topicForm.controls.fqdn.dirty"
+ (change)="generatePushEndpoint()"/>
+ </cds-text-label>
+ <ng-template #fqdnError>
+ <span class="invalid-feedback"
+ *ngIf="topicForm.showError('fqdn', formDir, 'required')">This field is required</span>
+ </ng-template>
+ </div>
+
+ <!-- Port-->
+ <div cdsCol>
+ <cds-number
+ id="Port"
+ formControlName="port"
+ cdRequiredField="Port"
+ label="Port"
+ i18n-label
+ [min]="1"
+ [invalid]="topicForm.controls.port.invalid && topicForm.controls.port.dirty"
+ [invalidText]="portError"
+ (change)="generatePushEndpoint()"
+ helperText="Enter the port number for the push endpoint"
+ i18n-helperText
+ ></cds-number>
+ <ng-template #portError>
+ <span class="invalid-feedback"
+ *ngIf="topicForm.showError('port', formDir, 'required')"
+ i18n>This field is required</span>
+ <span class="invalid-feedback"
+ *ngIf="topicForm.controls.port.hasError('pattern') && topicForm.controls.port.touched"
+ i18n> Port must be a valid integer</span>
+ </ng-template>
+ </div>
+ </div>
+
+ <div cdsRow
+ *ngIf="(selectedOption === hostProtocols.AMQP || selectedOption === hostProtocols.KAFKA)"
+ class="form-item form-item-append">
+
+ <!-- User-->
+ <div cdsCol>
+ <cds-text-label labelInputID="User"
+ i18n
+ helperText="Enter the user for the push endpoint"
+ i18n-helperText>
+ User
+ <input cdsText
+ type="text"
+ id="user"
+ formControlName="user"
+ (change)="generatePushEndpoint()"/>
+ </cds-text-label>
+ </div>
+
+ <!-- password-->
+ <div cdsCol
+ *ngIf="(selectedOption !==hostProtocols.HTTP)">
+ <cds-password-label labelInputID="password"
+ [invalid]="!topicForm.controls.password.valid && topicForm.controls.password.dirty"
+ [invalidText]="passwordError"
+ i18n
+ helperText="Enter the password for the push endpoint"
+ i18n-helperText
+ >
+ Password
+ <input cdsPassword
+ type="password"
+ autocomplete="new-password"
+ formControlName="password"
+ (change)="generatePushEndpoint()"/>
+ </cds-password-label>
+ <ng-template #passwordError>
+ <span class="invalid-feedback"
+ *ngIf="userForm.showError('password', formDir, 'required')"
+ i18n>This field is required</span>
+ <span class="invalid-feedback"
+ *ngIf="userForm.showError('password', formDir, 'passwordPolicy')">
+ {{ passwordValuation }}
+ </span>
+ </ng-template>
+ </div>
+ </div>
+
+ <!--Vhost-->
+ <div cdsRow
+ class="form-item form-item-append"
+ *ngIf="(selectedOption === hostProtocols.AMQP)">
+ <div cdsCol>
+ <cds-text-label labelInputID="Virtual Host(vhost)"
+ i18n
+ helperText="Enter the vhost for the push endpoint"
+ i18n-helperText>
+ Virtual Host(vhost)
+ <input cdsText
+ type="text"
+ id="vhost"
+ (change)="generatePushEndpoint()"
+ formControlName="vhost" />
+ </cds-text-label>
+ </div>
+ </div>
+
+ <div cdsRow
+ class="form-item form-item-append">
+
+ <!--push_endpoint -->
+ <div cdsCol>
+ <cds-text-label
+ labelInputID="push_endpoint"
+ i18n
+ cdRequiredField="Push endpoint"
+ [invalid]="topicForm.controls.push_endpoint.invalid && topicForm.controls.push_endpoint.dirty"
+ [invalidText]="endpointError"
+ helperText="Specify the endpoint URL for receiving push notifications"
+ i18n-helperText>
+ Push endpoint
+ <input
+ cdsText
+ type="text"
+ [placeholder]="pushEndpointPlaceholder"
+ i18n-placeholder
+ id="push_endpoint"
+ formControlName="push_endpoint"
+ [invalid]="topicForm.controls.push_endpoint.invalid && topicForm.controls.push_endpoint.dirty"
+ />
+ </cds-text-label>
+ <ng-template #endpointError>
+ <span class="invalid-feedback"
+ *ngIf="topicForm.showError('push_endpoint', formDir, 'required')"
+ i18n>This field is required.</span>
+ </ng-template>
+ </div>
+ </div>
+ <ng-container *ngIf="selectedOption === hostProtocols.AMQP || selectedOption === hostProtocols.KAFKA">
+ <div class="form-item">
+ <fieldset>
+ <legend i18n
+ class="cd-header">
+ {{ selectedOption === 'AMQP' ? 'AMQP attributes' : 'KAFKA attributes' }}
+ <cd-help-text> {{ attributeHelpText }}</cd-help-text>
+ </legend>
+ </fieldset>
+ <ng-container *ngIf="selectedOption === 'KAFKA'">
+ <div cdsRow
+ class="form-item-append">
+
+ <div cdsCol
+ *ngIf="selectedOption === 'KAFKA'">
+ <cds-checkbox id="use-ssl"
+ formControlName="use_ssl">
+ <ng-container i18n>Use SSL</ng-container>
+ <cd-help-text i18n>
+ Enabling SSL encrypts communication between your Kafka client and broker, ensuring the confidentiality and integrity of your messages
+ </cd-help-text>
+ </cds-checkbox>
+ </div>
+ </div>
+ </ng-container>
+ </div>
+
+ <div class="form-item form-item-append"
+ cdsRow>
+
+ <!---CA location--->
+ <div cdsCol
+ *ngIf="selectedOption === hostProtocols.AMQP || selectedOption === hostProtocols.KAFKA">
+ <cds-text-label labelInputID="Ca-location"
+ i18n
+ helperText="The file path of the CA certificate used to verify the server"
+ i18n-helperText>
+ CA location
+ <input cdsText
+ type="text"
+ id="ca_location"
+ formControlName="ca_location"/>
+ </cds-text-label>
+ </div>
+ <div cdsCol
+ *ngIf="selectedOption === hostProtocols.AMQP">
+ <cds-text-label labelInputID="AMQP exchange"
+ i18n
+ helperText="Name of the AMQP exchange to publish messages to; must exist on the broker"
+ i18n-helperText>
+ AMQP exchange
+ <input cdsText
+ type="text"
+ id="amqp_exchange"
+ formControlName="amqp_exchange"/>
+ </cds-text-label>
+ </div>
+
+ <!-- mechanism-->
+ <div cdsCol
+ *ngIf="selectedOption ===hostProtocols.KAFKA">
+ <cds-select formControlName="mechanism"
+ label="Mechanism"
+ i18n-label
+ id="mechanism"
+ helperText="Select the authentication mechanism to connect to the Kafka broker"
+ i18n-helperText>
+ <option i18n
+ value="">-- Select a KAFKA mechanism --</option>
+ <option *ngFor="let data of kafkaMechanism"
+ i18n>{{ data }}</option>
+ </cds-select>
+ </div>
+ </div>
+ <div class="form-item form-item-append"
+ cdsRow>
+
+ <!-- Ack level -->
+ <div cdsCol
+ *ngIf="selectedOption === hostProtocols.AMQP || selectedOption === hostProtocols.KAFKA">
+ <cds-select formControlName="ack_level"
+ label="Ack level"
+ i18n-label
+ id="ack_level"
+ helperText="Select the acknowledgment level to control message delivery guarantees between client and broker"
+ i18n-helperText>
+ <option i18n
+ value="">-- Select the {{ selectedOption }} ack level --</option>
+ <option *ngFor="let level of ackLevels"
+ [value]="level"
+ i18n>{{ level }}</option>
+ </cds-select>
+ </div>
+
+ <!-- kafka-brokers-->
+ <div cdsCol
+ *ngIf="selectedOption === hostProtocols.KAFKA">
+ <cds-text-label labelInputID="kafka_brokers"
+ i18n
+ helperText="Specify the address of the Kafka broker (e.g., host:9092) "
+ i18n-helperText>
+ Kafka brokers
+ <input cdsText
+ type="text"
+ id="kafka_brokers"
+ formControlName="kafka_brokers"/>
+ </cds-text-label>
+ </div>
+ </div>
+ </ng-container>
+ <ng-container>
+ <div cdsRow
+ class="form-item form-item-append">
+ <legend i18n
+ cdsCol
+ class="cd-header">
+ Additional common attributes
+ <cd-help-text>Configure additional attributes to customize the topic's behavior and settings</cd-help-text>
+ </legend>
+
+ <!-- Persistent-->
+ <div cdsCol
+ [columnNumbers]="{sm: 4}">
+ <cds-checkbox id="persistent"
+ formControlName="persistent">
+ <ng-container i18n>Persistent</ng-container>
+ <cd-help-text i18n> Select the checkbox to ensure notifications are retried.</cd-help-text>
+ </cds-checkbox>
+ </div>
+ </div>
+ <div cdsRow
+ class="form-item form-item-append">
+
+ <!-- Opaque data-->
+ <div cdsCol>
+ <cds-text-label labelInputID="opaque_data"
+ i18n
+ i18n-helperText
+ helperText="A user-defined metadata added to all notifications that are triggered by the topic.">
+ Opaque data
+ <input cdsText
+ type="text"
+ id="opaqueData"
+ formControlName="OpaqueData"/>
+ </cds-text-label>
+ </div>
+
+ <!-- Time to live-->
+ <div cdsCol>
+ <cds-number id="time_to_live"
+ formControlName="time_to_live"
+ label="Time to live"
+ i18n-label
+ [min]="1"
+ helperText="Time limit (in seconds) for retaining notifications"
+ i18n-helperText>Time to live </cds-number>
+ </div>
+
+ <!-- Max retries-->
+ <div cdsCol>
+ <cds-number id="max_retries"
+ label="Max retries"
+ formControlName="max_retries"
+ i18n-label
+ [min]="1"
+ helperText="Max retries before expiring notifications"
+ i18n-helperText> Max retries </cds-number>
+ </div>
+
+ <!-- Retry sleep duration-->
+ <div cdsCol>
+ <cds-number
+ id="retry_sleep_duration"
+ label="Retry sleep duration"
+ formControlName="retry_sleep_duration"
+ i18n-label
+ [min]="1"
+ helperText="Controls the frequency of retrying the notifications"
+ i18n-helperText
+ >
+ Retry sleep duration
+ </cds-number>
+ </div>
+ </div>
+
+ <div cdsRow
+ class="form-item form-item-append">
+
+ <!-- policy-->
+ <div cdsCol>
+ <cds-textarea-label i18n>
+ <span> {{'Policy'}}</span>
+ <textarea cdsTextArea
+ #topicPolicyTextArea
+ row="4"
+ cols="200"
+ formControlName="policy"
+ aria-label="textarea"
+ (change)="textAreaOnChange('topicPolicyTextArea')"></textarea>
+ </cds-textarea-label>
+ <cd-help-text i18n>JSON formatted document</cd-help-text>
+ <div cdsRow>
+ <div cdsCol>
+ <cds-button-set class="mt-1">
+ <button cdsButton="tertiary"
+ id="example-generator-button"
+ (click)="openUrl('https://docs.aws.amazon.com/AmazonS3/latest/userguide/example-bucket-policies.html?icmpid=docs_amazons3_console')"
+ i18n>
+ Policy examples
+ <svg cdsIcon="launch"
+ size="16"
+ class="cds--btn__icon"></svg>
+ </button>
+ <button cdsButton="tertiary"
+ id="example-generator-button"
+ (click)="openUrl('https://awspolicygen.s3.amazonaws.com/policygen.html')"
+ i18n>
+ Policy generator
+ <svg cdsIcon="launch"
+ size="16"
+ class="cds--btn__icon"></svg>
+ </button>
+ </cds-button-set>
+ </div>
+ <div cdsCol>
+ <cds-button-set class="float-end mt-1">
+ <button cdsButton="tertiary"
+ id="clear-bucket-policy"
+ (click)="clearTextArea('policy', '{}')"
+ i18n>
+ Clear
+ <svg cdsIcon="close"
+ size="32"
+ class="cds--btn__icon"></svg>
+ </button>
+ </cds-button-set>
+ </div>
+ </div>
+ </div>
+ </div>
+ </ng-container>
+ </ng-container>
+ <div class="card-footer">
+ <cd-form-button-panel (submitActionEvent)="submitAction()"
+ [form]="topicForm"
+ [submitText]="(action | titlecase) + ' ' + (resource | upperFirst)"
+ wrappingClass="text-right"></cd-form-button-panel>
+ </div>
+ </form>
+ </div>
--- /dev/null
+import { ComponentFixture, TestBed } from '@angular/core/testing';
+import { RgwTopicFormComponent } from './rgw-topic-form.component';
+import { RgwTopicService } from '~/app/shared/api/rgw-topic.service';
+import { NotificationService } from '~/app/shared/services/notification.service';
+import { RouterTestingModule } from '@angular/router/testing';
+import { ReactiveFormsModule } from '@angular/forms';
+import { HttpClientTestingModule } from '@angular/common/http/testing';
+import { ActionLabelsI18n } from '~/app/shared/constants/app.constants';
+import { TextAreaJsonFormatterService } from '~/app/shared/services/text-area-json-formatter.service';
+import { SharedModule } from '~/app/shared/shared.module';
+import { ToastrModule } from 'ngx-toastr';
+import { GridModule, InputModule, SelectModule } from 'carbon-components-angular';
+import { NO_ERRORS_SCHEMA } from '@angular/compiler';
+
+describe('RgwTopicFormComponent', () => {
+ let component: RgwTopicFormComponent;
+ let fixture: ComponentFixture<RgwTopicFormComponent>;
+ let textAreaJsonFormatterService: TextAreaJsonFormatterService;
+
+ const mockNotificationService = {
+ show: jest.fn()
+ };
+ const mockRouter = {
+ navigate: jest.fn(),
+ url: '/rgw/topic/create'
+ };
+
+ const mockActivatedRoute = {
+ snapshot: {
+ paramMap: {
+ get: jest.fn()
+ }
+ }
+ };
+ beforeEach(async () => {
+ await TestBed.configureTestingModule({
+ declarations: [RgwTopicFormComponent],
+ imports: [
+ ReactiveFormsModule,
+ RouterTestingModule,
+ HttpClientTestingModule,
+ SharedModule,
+ ToastrModule.forRoot(),
+ SelectModule,
+ GridModule,
+ InputModule
+ ],
+ providers: [
+ RgwTopicService,
+ TextAreaJsonFormatterService,
+ {
+ provide: ActionLabelsI18n,
+ useValue: { CREATE: 'Create', EDIT: 'Edit', Delete: 'Delete' }
+ },
+ {
+ provide: NotificationService,
+ useValue: mockNotificationService
+ },
+ { provide: 'Router', useValue: mockRouter },
+ { provide: 'ActivatedRoute', useValue: mockActivatedRoute }
+ ],
+ schemas: [NO_ERRORS_SCHEMA]
+ }).compileComponents();
+ });
+
+ beforeEach(() => {
+ fixture = TestBed.createComponent(RgwTopicFormComponent);
+ component = fixture.componentInstance;
+ textAreaJsonFormatterService = TestBed.inject(TextAreaJsonFormatterService);
+ fixture.detectChanges();
+ });
+
+ it('should create the component', () => {
+ expect(component).toBeTruthy();
+ });
+
+ it('should initialize the form correctly', () => {
+ const form = component.topicForm;
+ expect(form).toBeDefined();
+ expect(form.get('name')).toBeDefined();
+ expect(form.get('owner')).toBeDefined();
+ expect(form.get('push_endpoint')).toBeDefined();
+ });
+
+ it('should generate push endpoint for HTTP', () => {
+ component.topicForm.setValue({
+ owner: 'dashboard',
+ name: 'Testtopic',
+ OpaqueData: 'test@123',
+ endpointType: 'HTTP',
+ fqdn: 'localhost',
+ port: '80',
+ push_endpoint: 'http://localhost:80',
+ verify_ssl: false,
+ persistent: 'true',
+ max_retries: '3',
+ time_to_live: '100',
+ retry_sleep_duration: '10',
+ policy: '{}',
+ cloud_events: 'true',
+ user: '',
+ password: '',
+ vhost: '',
+ ca_location: '',
+ amqp_exchange: '',
+ ack_level: '',
+ use_ssl: false,
+ kafka_brokers: '',
+ mechanism: '',
+ enable_ssl: false
+ });
+ component.generatePushEndpoint(false);
+ expect(component.topicForm.get('push_endpoint')?.value).toBe('http://localhost:80');
+ });
+
+ it('should format JSON in the policy field on text area change', () => {
+ const formatSpy = jest.spyOn(textAreaJsonFormatterService, 'format');
+ const textArea: any = { nativeElement: { value: '{"key": "value"}' } };
+ component.textAreaOnChange(textArea);
+ expect(formatSpy).toHaveBeenCalledWith(textArea);
+ });
+
+ it('should generate HTTP push endpoint', () => {
+ component.selectedOption = 'HTTP';
+ component.topicForm.patchValue({
+ fqdn: 'example.com',
+ port: '8080',
+ enable_ssl: false
+ });
+
+ component.generatePushEndpoint();
+ expect(component.topicForm.get('push_endpoint')?.value).toBe('http://example.com:8080');
+ });
+
+ it('should generate AMQP push endpoint with auth', () => {
+ component.selectedOption = 'AMQP';
+ component.topicForm.patchValue({
+ user: 'guest',
+ password: 'guest',
+ fqdn: 'mq.example.com',
+ port: '5672',
+ vhost: '/'
+ });
+
+ component.generatePushEndpoint();
+ expect(component.topicForm.get('push_endpoint')?.value).toBe(
+ 'amqps://guest:guest@mq.example.com:5672/'
+ );
+ });
+
+ it('should disable verify_ssl and use_ssl when enable_ssl is false', () => {
+ component.topicForm.patchValue({ enable_ssl: false });
+ component.topicForm.get('enable_ssl')?.setValue(false);
+ fixture.detectChanges();
+
+ expect(component.topicForm.get('verify_ssl')?.value).toBe(false);
+ expect(component.topicForm.get('use_ssl')?.value).toBe(false);
+ });
+});
--- /dev/null
+import { AfterViewChecked, Component, ElementRef, OnInit, ViewChild } from '@angular/core';
+import { ActionLabelsI18n, URLVerbs } from '~/app/shared/constants/app.constants';
+import { CdForm } from '~/app/shared/forms/cd-form';
+import { CdFormGroup } from '~/app/shared/forms/cd-form-group';
+import { UntypedFormControl, Validators } from '@angular/forms';
+import * as _ from 'lodash';
+import { TextAreaJsonFormatterService } from '~/app/shared/services/text-area-json-formatter.service';
+import { RgwTopicService } from '~/app/shared/api/rgw-topic.service';
+import { NotificationType } from '~/app/shared/enum/notification-type.enum';
+import { ActivatedRoute, Router } from '@angular/router';
+import { NotificationService } from '~/app/shared/services/notification.service';
+
+import { RgwUserService } from '~/app/shared/api/rgw-user.service';
+import { CdValidators } from '~/app/shared/forms/cd-validators';
+import {
+ AMQP_ACK_LEVEL,
+ TopicRequest,
+ END_POINT_TYPE,
+ KAFKA_ACK_LEVEL,
+ KAFKA_MECHANISM,
+ Topic,
+ URLPort,
+ HostURLProtocol,
+ URL_FORMAT_PLACEHOLDERS,
+ UrlProtocol,
+ Endpoint
+} from '~/app/shared/models/topic.model';
+
+const BASE_URL = 'rgw/topic';
+@Component({
+ selector: 'cd-rgw-topic-form',
+ templateUrl: './rgw-topic-form.component.html',
+ styleUrls: ['./rgw-topic-form.component.scss']
+})
+export class RgwTopicFormComponent extends CdForm implements OnInit, AfterViewChecked {
+ @ViewChild('topicPolicyTextArea')
+ public topicPolicyTextArea: ElementRef<any>;
+ topicForm: CdFormGroup;
+ action: string;
+ resource: string;
+ endpointType: string[] = [];
+ ackLevels: string[] = [];
+ selectedOption: string;
+ port: string;
+ owners: string[];
+ vhost: string;
+ kafkaMechanism: string[] = [];
+ editing: boolean = false;
+ topicId: string;
+ hostProtocols: typeof Endpoint = Endpoint;
+ key: string = '';
+ protocolPlaceholders: Record<string, string> = {
+ HTTP: URL_FORMAT_PLACEHOLDERS.http,
+ AMQP: URL_FORMAT_PLACEHOLDERS.amqp,
+ KAFKA: URL_FORMAT_PLACEHOLDERS.kafka
+ };
+
+ constructor(
+ public actionLabels: ActionLabelsI18n,
+ private textAreaJsonFormatterService: TextAreaJsonFormatterService,
+ public rgwTopicService: RgwTopicService,
+ private rgwUserService: RgwUserService,
+ public notificationService: NotificationService,
+ private router: Router,
+ private route: ActivatedRoute
+ ) {
+ super();
+ this.editing = this.router.url.startsWith(`/rgw/topic/${URLVerbs.EDIT}`);
+ this.action = this.editing ? this.actionLabels.EDIT : this.actionLabels.CREATE;
+ this.resource = $localize`topic`;
+ }
+
+ ngAfterViewChecked(): void {
+ this.textAreaOnChange(this.topicPolicyTextArea);
+ }
+
+ ngOnInit(): void {
+ this.endpointType = Object.values(END_POINT_TYPE);
+ this.createForm();
+ this.rgwUserService.enumerate().subscribe((data: string[]) => {
+ this.owners = data.sort();
+ if (this.editing) {
+ this.topicId = this.route.snapshot.paramMap.get('name');
+ this.topicId = decodeURIComponent(this.topicId);
+ this.loadTopicData(this.topicId);
+ } else {
+ this.loadingReady();
+ }
+ });
+
+ this.topicForm.get('user')?.valueChanges.subscribe(() => this.setMechanism());
+ this.topicForm.get('password')?.valueChanges.subscribe(() => this.setMechanism());
+ this.topicForm.get('endpointType')?.valueChanges.subscribe((option: string) => {
+ this.ackLevels = Object.values(option === Endpoint.AMQP ? AMQP_ACK_LEVEL : KAFKA_ACK_LEVEL);
+ });
+ this.kafkaMechanism = Object.values(KAFKA_MECHANISM);
+
+ this.setMechanism();
+
+ this.topicForm.get('enable_ssl')!.valueChanges.subscribe((enabled: boolean) => {
+ const verifySSLControl = this.topicForm.get('verify_ssl');
+ const useSSLControl = this.topicForm.get('use_ssl');
+ if (enabled) {
+ verifySSLControl!.enable();
+ useSSLControl!.enable();
+ } else {
+ verifySSLControl!.disable();
+ useSSLControl!.disable();
+ useSSLControl!.setValue(false);
+ verifySSLControl!.setValue(false);
+ }
+ });
+ }
+
+ loadTopicData(topicId: string) {
+ this.rgwTopicService.getTopic(topicId).subscribe((topic: Topic) => {
+ this.topicForm.get('name')?.disable();
+ let url = topic.dest.push_endpoint;
+ let hostname = url.split('://')[0];
+ let endpointType: string;
+ if (hostname === HostURLProtocol.amqp || hostname === HostURLProtocol.amqps) {
+ endpointType = Endpoint.AMQP;
+ } else if (hostname === HostURLProtocol.https || hostname === HostURLProtocol.http) {
+ endpointType = Endpoint.HTTP;
+ } else {
+ endpointType = Endpoint.KAFKA;
+ }
+ this.selectedOption = endpointType;
+ this.getPushUrlQueryValues(topic);
+ this.topicForm.get('owner').disable();
+ this.loadingReady();
+ });
+ }
+
+ get pushEndpointPlaceholder(): string {
+ return (
+ this.protocolPlaceholders[this.topicForm.get('endpointType')?.value] ||
+ $localize`Enter endpoint URL`
+ );
+ }
+
+ createForm() {
+ this.topicForm = new CdFormGroup({
+ owner: new UntypedFormControl('', { validators: [Validators.required] }),
+ name: new UntypedFormControl(
+ '',
+ [Validators.required],
+ CdValidators.unique(this.rgwTopicService.exists, this.rgwTopicService)
+ ),
+ push_endpoint: new UntypedFormControl(
+ { value: '', disabled: true },
+ { validators: [Validators.required] }
+ ),
+ OpaqueData: new UntypedFormControl(''),
+ persistent: new UntypedFormControl('0'),
+ max_retries: new UntypedFormControl('0'),
+ time_to_live: new UntypedFormControl('0'),
+ retry_sleep_duration: new UntypedFormControl('0'),
+ policy: new UntypedFormControl('{}', CdValidators.json()),
+ endpointType: new UntypedFormControl('', { validators: [Validators.required] }),
+ port: new UntypedFormControl('', {
+ validators: [Validators.required, Validators.pattern('^[0-9]+$')]
+ }),
+ verify_ssl: new UntypedFormControl(true),
+ enable_ssl: new UntypedFormControl(true),
+ cloud_events: new UntypedFormControl(),
+ user: new UntypedFormControl(),
+ password: new UntypedFormControl(),
+ vhost: new UntypedFormControl(),
+ ca_location: new UntypedFormControl(),
+ amqp_exchange: new UntypedFormControl(),
+ ack_level: new UntypedFormControl(),
+ use_ssl: new UntypedFormControl(false),
+ kafka_brokers: new UntypedFormControl(),
+ mechanism: new UntypedFormControl(),
+ fqdn: new UntypedFormControl('', { validators: [Validators.required] })
+ });
+ }
+
+ onEndpointTypeChange() {
+ this.selectedOption = this.topicForm.get('endpointType').value;
+ const secureSslChecked = this.topicForm.get('enable_ssl')?.value;
+ this.vhost = '/';
+ this.setDefaultPort(secureSslChecked, this.selectedOption);
+ this.generatePushEndpoint(secureSslChecked);
+ this.reset();
+ }
+
+ setDefaultPort(enableSSL: boolean, selectedValue: string) {
+ this.port = this.getPort(selectedValue as HostURLProtocol, enableSSL).toString();
+ this.topicForm.patchValue({ port: this.port });
+ }
+
+ onSecureSSLChange(event: any) {
+ const ssl = !!event;
+ this.port = this.getPort(this.selectedOption as HostURLProtocol, ssl).toString();
+ this.topicForm.patchValue({ port: this.port });
+ this.generatePushEndpoint(ssl);
+ }
+
+ private getPort(protocol: HostURLProtocol, ssl: boolean): number {
+ const map = {
+ [Endpoint.HTTP]: [URLPort.HTTP, URLPort.HTTPS],
+ [Endpoint.AMQP]: [URLPort.AMQP, URLPort.AMQPS],
+ [Endpoint.KAFKA]: [URLPort.KAFKA, URLPort.KAFKA_SSL]
+ };
+ return ssl ? map[protocol][1] : map[protocol][0];
+ }
+
+ textAreaOnChange(textArea: ElementRef<any>) {
+ this.textAreaJsonFormatterService.format(textArea);
+ }
+ setMechanism(): void {
+ const user = this.topicForm.get('user')?.value;
+ const password = this.topicForm.get('password')?.value;
+ const mechanismControl = this.topicForm.get('mechanism');
+ let defaultMechanism = '';
+ if (user && password) {
+ defaultMechanism = KAFKA_MECHANISM.PLAIN;
+ }
+ mechanismControl?.setValue(defaultMechanism);
+ }
+
+ generatePushEndpoint(secureSsl?: boolean) {
+ if (!this.selectedOption) {
+ return;
+ }
+ let generatedEndpoint = '';
+ const ssl = secureSsl !== undefined ? secureSsl : this.topicForm.get('enable_ssl')?.value;
+ const fqdn = this.topicForm.get('fqdn')?.value || '<fqdn>';
+ const port = this.topicForm.get('port')?.value || '[:port]';
+ switch (this.selectedOption) {
+ case Endpoint.HTTP:
+ generatedEndpoint = `http${ssl ? 's' : ''}://${fqdn}:${port}`;
+ break;
+ case Endpoint.AMQP:
+ generatedEndpoint = this.generateAMQPEndpoint(port, fqdn, ssl ? 's' : '');
+
+ break;
+ case Endpoint.KAFKA:
+ generatedEndpoint = this.generateKafkaEndpoint(fqdn, port);
+ break;
+ default:
+ generatedEndpoint = '';
+ }
+ if (generatedEndpoint) {
+ this.topicForm.patchValue({ push_endpoint: generatedEndpoint });
+ }
+ }
+
+ generateAMQPEndpoint(port: string, fqdn: string, ssl: string): string {
+ let generatedEndpoint;
+ const userAmqp = this.topicForm.get('user')?.value;
+ const passwordAmqp = this.topicForm.get('password')?.value;
+ const vhostAmqp = this.topicForm.get('vhost')?.value || '/';
+ generatedEndpoint = `amqp${ssl ? 's' : ''}://${fqdn}:${port}${vhostAmqp}`;
+ if (userAmqp && passwordAmqp) {
+ generatedEndpoint = `amqp${
+ ssl ? 's' : ''
+ }://${userAmqp}:${passwordAmqp}@${fqdn}:${port}${vhostAmqp}`;
+ }
+ return generatedEndpoint;
+ }
+
+ generateKafkaEndpoint(fqdn: string, port: string): string {
+ let generatedEndpoint;
+ const kafkaProtocol = HostURLProtocol.kafka;
+ const userKafka = this.topicForm.get('user')?.value;
+ const passwordKafka = this.topicForm.get('password')?.value;
+ const kafkaBrokers = this.topicForm.get('kafka_brokers')?.value;
+ generatedEndpoint = `${kafkaProtocol}://${fqdn}:${port}`;
+ if (userKafka && passwordKafka) {
+ generatedEndpoint = `${kafkaProtocol}://${userKafka}:${passwordKafka}@${fqdn}:${port}`;
+ } else if (kafkaBrokers) {
+ generatedEndpoint = `kafka://${kafkaBrokers}`;
+ } else {
+ generatedEndpoint = `kafka://${fqdn}:${port}`;
+ }
+
+ return generatedEndpoint;
+ }
+
+ getTopicPolicy() {
+ return this.topicForm.getValue('policy') || '{}';
+ }
+
+ getPushUrlQueryValues(topic: Topic) {
+ let url = topic.dest.push_endpoint;
+ let pushEndpointUrl = this.convertUrlToObject(url);
+ let pushendpointArg = topic.dest.push_endpoint_args;
+ const pushendpointAddarg = this.extractAdditionalValues(pushendpointArg);
+ const protocol = pushEndpointUrl.protocol?.toLowerCase();
+ switch (protocol) {
+ case UrlProtocol.AMQP:
+ case UrlProtocol.AMQPS:
+ this.selectedOption = Endpoint.AMQP;
+ break;
+
+ case UrlProtocol.HTTP:
+ case UrlProtocol.HTTPS:
+ this.selectedOption = Endpoint.HTTP;
+ break;
+
+ default:
+ this.selectedOption = Endpoint.KAFKA;
+ break;
+ }
+
+ const defaults: typeof this.topicForm.value = _.clone(this.topicForm.value);
+ const keys = Object.keys(this.topicForm.value) as (keyof typeof topic)[];
+ let value: Pick<typeof topic, typeof keys[number]> = _.pick(topic, keys);
+
+ value = _.merge(defaults, value);
+ if (!this.owners.includes(value['owner'])) {
+ this.owners.push(value['owner']);
+ this.topicForm.get('owner').disable();
+ }
+ this.topicForm.patchValue({ endpointType: this.selectedOption });
+ this.topicForm.patchValue({
+ name: topic.name,
+ owner: topic.owner,
+ push_endpoint: topic.dest.push_endpoint,
+ OpaqueData: topic.opaqueData,
+ persistent: topic.dest.persistent,
+ max_retries: topic.dest.max_retries,
+ time_to_live: topic.dest.time_to_live,
+ retry_sleep_duration: topic.dest.retry_sleep_duration,
+ policy: topic.policy,
+ port: pushEndpointUrl.port,
+ fqdn: pushEndpointUrl.hostname,
+ vhost: pushEndpointUrl.pathname,
+ user: pushEndpointUrl.username,
+ password: pushEndpointUrl.password,
+ ca_location: pushendpointAddarg.ca_location,
+ mechanism: pushendpointAddarg.mechanism,
+ enable_ssl:
+ pushEndpointUrl.protocol === UrlProtocol.HTTPS ||
+ pushEndpointUrl.protocol == UrlProtocol.AMQPS ||
+ pushEndpointUrl.protocol === UrlProtocol.KAFKA
+ ? true
+ : false,
+ verify_ssl: pushendpointAddarg.verify_ssl,
+ cloud_events: pushendpointAddarg.cloud_events,
+ amqp_exchange: pushendpointAddarg.amqp_exchange,
+ ack_level: pushendpointAddarg.ack_level,
+ use_ssl: pushendpointAddarg.use_ssl,
+ kafka_brokers: pushendpointAddarg.kafka_brokers
+ });
+ }
+
+ convertUrlToObject(url: string) {
+ const urlObj = new URL(url);
+
+ return {
+ protocol: urlObj.protocol,
+ hostname: urlObj.hostname,
+ pathname: urlObj.pathname,
+ hash: urlObj.hash,
+ port: this.getPortFromUrl(url),
+ username: urlObj.username,
+ password: urlObj.password
+ };
+ }
+
+ getPortFromUrl(url: string): string {
+ const urlObj = new URL(url);
+ let port = urlObj.port;
+ if (!port) {
+ port =
+ urlObj.protocol === UrlProtocol.HTTPS
+ ? URLPort.HTTPS
+ : urlObj.protocol === UrlProtocol.HTTP
+ ? URLPort.HTTP
+ : '';
+ }
+ return port;
+ }
+
+ extractAdditionalValues(str: string): { [key: string]: string } {
+ let obj: { [key: string]: string } = {};
+ let pairs = str.split('&');
+ pairs.forEach((pair) => {
+ let [key, value] = pair.split('=');
+ if (key && value) {
+ obj[key] = value;
+ }
+ });
+ return obj;
+ }
+
+ openUrl(url: string) {
+ window.open(url, '_blank');
+ }
+
+ submitAction() {
+ if (this.topicForm.invalid || this.topicForm.pending) {
+ return this.topicForm.setErrors({ cdSubmitButton: true });
+ }
+ const notificationTitle = this.editing
+ ? $localize`Topic updated successfully`
+ : $localize`Topic created successfully`;
+ const formValue = this.topicForm.getRawValue();
+ const topicPolicy = this.getTopicPolicy();
+ const payload = this.generatePayload(formValue, topicPolicy);
+
+ const action = this.rgwTopicService.create(payload);
+
+ action.subscribe({
+ next: () => {
+ this.notificationService.show(NotificationType.success, notificationTitle);
+ this.goToListView();
+ },
+ error: () => this.topicForm.setErrors({ cdSubmitButton: true })
+ });
+ }
+
+ generatePayload(formValue: any, topicPolicy: any): TopicRequest {
+ const basePayload: TopicRequest = {
+ name: formValue.name,
+ owner: formValue.owner,
+ push_endpoint: formValue.push_endpoint,
+ opaque_data: formValue.OpaqueData,
+ persistent: formValue.persistent,
+ time_to_live: formValue.time_to_live,
+ max_retries: formValue.max_retries,
+ retry_sleep_duration: formValue.retry_sleep_duration,
+ policy: topicPolicy
+ };
+
+ const topicType = formValue.endpointType;
+
+ const additionalFieldsMap: Record<string, Partial<TopicRequest>> = {
+ [Endpoint.KAFKA]: {
+ use_ssl: formValue.use_ssl,
+ ack_level: formValue.ack_level,
+ kafka_brokers: formValue.kafka_brokers,
+ ca_location: formValue.ca_location,
+ mechanism: formValue.mechanism
+ },
+ [Endpoint.AMQP]: {
+ verify_ssl: formValue.verify_ssl,
+ amqp_exchange: formValue.amqp_exchange,
+ ack_level: formValue.ack_level,
+ ca_location: formValue.ca_location
+ },
+ [Endpoint.HTTP]: {
+ verify_ssl: formValue.verify_ssl,
+ cloud_events: formValue.cloud_events
+ }
+ };
+
+ if (additionalFieldsMap[topicType]) {
+ Object.assign(basePayload, additionalFieldsMap[topicType]);
+ }
+
+ return basePayload;
+ }
+
+ get attributeHelpText(): string {
+ return this.selectedOption === this.hostProtocols.AMQP
+ ? $localize`Choose the configuration settings for the AMQP connection`
+ : $localize`Choose the configuration settings for the KAFKA connection`;
+ }
+
+ goToListView() {
+ this.router.navigate([BASE_URL]);
+ }
+
+ clearTextArea(field: string, defaultValue: string = '') {
+ this.topicForm.get(field)?.setValue(defaultValue);
+ this.topicForm.markAsDirty();
+ this.topicForm.updateValueAndValidity();
+ }
+
+ reset() {
+ this.topicForm.patchValue({ enable_ssl: true });
+ this.topicForm.patchValue({ port: this.port });
+ this.topicForm.patchValue({ vhost: this.vhost });
+ }
+}
- <ng-container *ngIf="topic$ | async as topics">
+ <ng-container>
<cd-table #table
[autoReload]="false"
- [data]="topics"
+ [data]="topics$ | async"
[columns]="columns"
columnMode="flex"
selectionType="single"
[hasDetails]="true"
+ id="key"
(setExpandedRow)="setExpandedRow($event)"
(updateSelection)="updateSelection($event)"
- (fetchData)="fetchData($event)">
+ (fetchData)="fetchData()">
+ <cd-table-actions class="table-actions"
+ [permission]="permission"
+ [selection]="selection"
+ [tableActions]="tableActions">
+ </cd-table-actions>
<cd-rgw-topic-details *cdTableDetail
[selection]="expandedRow"></cd-rgw-topic-details>
</cd-table>
import { ComponentFixture, TestBed } from '@angular/core/testing';
-
import { RgwTopicListComponent } from './rgw-topic-list.component';
import { RgwTopicService } from '~/app/shared/api/rgw-topic.service';
import { SharedModule } from '~/app/shared/shared.module';
-import { configureTestBed } from '~/testing/unit-test-helper';
+import { configureTestBed, PermissionHelper } from '~/testing/unit-test-helper';
import { RgwTopicDetailsComponent } from '../rgw-topic-details/rgw-topic-details.component';
import { BrowserAnimationsModule } from '@angular/platform-browser/animations';
import { RouterTestingModule } from '@angular/router/testing';
import { HttpClientTestingModule } from '@angular/common/http/testing';
import { ToastrModule } from 'ngx-toastr';
+import { of } from 'rxjs';
+
describe('RgwTopicListComponent', () => {
let component: RgwTopicListComponent;
let fixture: ComponentFixture<RgwTopicListComponent>;
ToastrModule.forRoot(),
RouterTestingModule
],
-
declarations: [RgwTopicListComponent]
}).compileComponents();
fixture = TestBed.createComponent(RgwTopicListComponent);
component = fixture.componentInstance;
rgwtTopicService = TestBed.inject(RgwTopicService);
- rgwTopicServiceListSpy = spyOn(rgwtTopicService, 'listTopic').and.callThrough();
- fixture = TestBed.createComponent(RgwTopicListComponent);
- component = fixture.componentInstance;
+
+ // Stub external methods before ngOnInit triggers
spyOn(component, 'setTableRefreshTimeout').and.stub();
- fixture.detectChanges();
+
+ // Spy on the service method
+ rgwTopicServiceListSpy = spyOn(rgwtTopicService, 'listTopic').and.returnValue(of([]));
+
+ fixture.detectChanges(); // Triggers ngOnInit
});
it('should create', () => {
expect(component).toBeTruthy();
- expect(rgwTopicServiceListSpy).toHaveBeenCalledTimes(1);
+ expect(rgwTopicServiceListSpy).toHaveBeenCalledTimes(2);
});
- it('should call listTopic on ngOnInit', () => {
- component.ngOnInit();
- expect(rgwTopicServiceListSpy).toHaveBeenCalled();
+ it('should test all TableActions combinations', () => {
+ const permissionHelper: PermissionHelper = new PermissionHelper(component.permission);
+ const tableActions = permissionHelper.setPermissionsAndGetActions(component.tableActions);
+
+ expect(tableActions).toEqual({
+ 'create,update,delete': {
+ actions: ['Create', 'Edit', 'Delete'],
+ primary: {
+ multiple: 'Create',
+ executing: 'Create',
+ single: 'Create',
+ no: 'Create'
+ }
+ },
+ 'create,update': {
+ actions: ['Create', 'Edit'],
+ primary: {
+ multiple: 'Create',
+ executing: 'Create',
+ single: 'Create',
+ no: 'Create'
+ }
+ },
+ 'create,delete': {
+ actions: ['Create', 'Delete'],
+ primary: {
+ multiple: 'Create',
+ executing: 'Create',
+ single: 'Create',
+ no: 'Create'
+ }
+ },
+ create: {
+ actions: ['Create'],
+ primary: {
+ multiple: 'Create',
+ executing: 'Create',
+ single: 'Create',
+ no: 'Create'
+ }
+ },
+ 'update,delete': {
+ actions: ['Edit', 'Delete'],
+ primary: {
+ multiple: '',
+ executing: '',
+ single: '',
+ no: ''
+ }
+ },
+ update: {
+ actions: ['Edit'],
+ primary: {
+ multiple: 'Edit',
+ executing: 'Edit',
+ single: 'Edit',
+ no: 'Edit'
+ }
+ },
+ delete: {
+ actions: ['Delete'],
+ primary: {
+ multiple: 'Delete',
+ executing: 'Delete',
+ single: 'Delete',
+ no: 'Delete'
+ }
+ },
+ 'no-permissions': {
+ actions: [],
+ primary: {
+ multiple: '',
+ executing: '',
+ single: '',
+ no: ''
+ }
+ }
+ });
});
});
-import { Component, OnInit, ViewChild } from '@angular/core';
+import { Component, NgZone, OnInit, ViewChild } from '@angular/core';
import _ from 'lodash';
import { ActionLabelsI18n } from '~/app/shared/constants/app.constants';
import { RgwTopicService } from '~/app/shared/api/rgw-topic.service';
import { CdTableSelection } from '~/app/shared/models/cd-table-selection';
-import { BehaviorSubject, Observable, of } from 'rxjs';
+import { URLBuilderService } from '~/app/shared/services/url-builder.service';
+import { Icons } from '~/app/shared/enum/icons.enum';
+import { ModalCdsService } from '~/app/shared/services/modal-cds.service';
+import { TaskWrapperService } from '~/app/shared/services/task-wrapper.service';
+import { FinishedTask } from '~/app/shared/models/finished-task';
+import { DeleteConfirmationModalComponent } from '~/app/shared/components/delete-confirmation-modal/delete-confirmation-modal.component';
import { Topic } from '~/app/shared/models/topic.model';
+import { BehaviorSubject, Observable, of, Subscriber } from 'rxjs';
import { catchError, shareReplay, switchMap } from 'rxjs/operators';
+const BASE_URL = 'rgw/topic';
@Component({
selector: 'cd-rgw-topic-list',
templateUrl: './rgw-topic-list.component.html',
- styleUrls: ['./rgw-topic-list.component.scss']
+ styleUrls: ['./rgw-topic-list.component.scss'],
+ providers: [{ provide: URLBuilderService, useValue: new URLBuilderService(BASE_URL) }]
})
export class RgwTopicListComponent extends ListWithDetails implements OnInit {
@ViewChild('table', { static: true })
context: CdTableFetchDataContext;
errorMessage: string;
selection: CdTableSelection = new CdTableSelection();
- topic$: Observable<Topic[]>;
- subject = new BehaviorSubject<Topic[]>([]);
+ topicsSubject = new BehaviorSubject<Topic[]>([]);
+ topics$ = this.topicsSubject.asObservable();
name: string;
constructor(
private authStorageService: AuthStorageService,
public actionLabels: ActionLabelsI18n,
- private rgwTopicService: RgwTopicService
+ private rgwTopicService: RgwTopicService,
+ private modalService: ModalCdsService,
+ private urlBuilder: URLBuilderService,
+ private taskWrapper: TaskWrapperService,
+ protected ngZone: NgZone
) {
- super();
+ super(ngZone);
this.permission = this.authStorageService.getPermissions().rgw;
}
flexGrow: 2
}
];
- this.topic$ = this.subject.pipe(
+
+ const getBucketUri = () =>
+ this.selection.first() && `${encodeURIComponent(this.selection.first().key)}`;
+ const addAction: CdTableAction = {
+ permission: 'create',
+ icon: Icons.add,
+ routerLink: () => this.urlBuilder.getCreate(),
+ name: this.actionLabels.CREATE,
+ canBePrimary: (selection: CdTableSelection) => !selection.hasSelection
+ };
+
+ const editAction: CdTableAction = {
+ permission: 'update',
+ icon: Icons.edit,
+ routerLink: () => this.urlBuilder.getEdit(getBucketUri()),
+ name: this.actionLabels.EDIT
+ };
+
+ const deleteAction: CdTableAction = {
+ permission: 'delete',
+ icon: Icons.destroy,
+ click: () => this.deleteAction(),
+ disable: () => !this.selection.hasSelection,
+ name: this.actionLabels.DELETE,
+ canBePrimary: (selection: CdTableSelection) => !selection.hasSelection
+ };
+ this.tableActions = [addAction, editAction, deleteAction];
+ this.setTableRefreshTimeout();
+ this.topics$ = this.topicsSubject.pipe(
switchMap(() =>
this.rgwTopicService.listTopic().pipe(
catchError(() => {
shareReplay(1)
);
}
-
fetchData() {
- this.subject.next([]);
+ this.topicsSubject.next([]);
}
updateSelection(selection: CdTableSelection) {
this.selection = selection;
}
+
+ deleteAction() {
+ const key = this.selection.first().key;
+ const name = this.selection.first().name;
+ this.modalService.show(DeleteConfirmationModalComponent, {
+ itemDescription: $localize`Topic`,
+ itemNames: [name],
+ submitActionObservable: () => {
+ return new Observable((observer: Subscriber<any>) => {
+ this.taskWrapper
+ .wrapTaskAroundCall({
+ task: new FinishedTask('rgw/topic/delete', {
+ name: [name]
+ }),
+ call: this.rgwTopicService.delete(key)
+ })
+ .subscribe({
+ error: (error: any) => {
+ observer.error(error);
+ },
+ complete: () => {
+ observer.complete();
+ this.table.refreshBtn();
+ }
+ });
+ });
+ }
+ });
+ }
}
import { NfsClusterComponent } from '../nfs/nfs-cluster/nfs-cluster.component';
import { RgwTopicListComponent } from './rgw-topic-list/rgw-topic-list.component';
import { RgwTopicDetailsComponent } from './rgw-topic-details/rgw-topic-details.component';
+import { RgwTopicFormComponent } from './rgw-topic-form/rgw-topic-form.component';
@NgModule({
imports: [
InputModule,
AccordionModule,
CheckboxModule,
- SelectModule,
NumberModule,
TabsModule,
TagModule,
TooltipModule,
ComboBoxModule,
ToggletipModule,
- RadioModule
+ RadioModule,
+ SelectModule
],
exports: [
RgwDaemonDetailsComponent,
RgwBucketLifecycleListComponent,
RgwRateLimitDetailsComponent,
RgwTopicListComponent,
- RgwTopicDetailsComponent
+ RgwTopicDetailsComponent,
+ RgwTopicFormComponent
],
providers: [TitleCasePipe]
})
{
path: 'topic',
data: { breadcrumbs: 'Topic' },
- children: [{ path: '', component: RgwTopicListComponent }]
+ children: [
+ { path: '', component: RgwTopicListComponent },
+ {
+ path: URLVerbs.CREATE,
+ component: RgwTopicFormComponent,
+ data: { breadcrumbs: ActionLabels.CREATE }
+ },
+ {
+ path: `${URLVerbs.EDIT}/:name`,
+ component: RgwTopicFormComponent,
+ data: { breadcrumbs: ActionLabels.EDIT }
+ }
+ ]
}
];
import { TestBed } from '@angular/core/testing';
-
import { RgwTopicService } from './rgw-topic.service';
-import { configureTestBed } from '~/testing/unit-test-helper';
+import { configureTestBed, RgwHelper } from '~/testing/unit-test-helper';
import { HttpClientTestingModule, HttpTestingController } from '@angular/common/http/testing';
describe('RgwTopicService', () => {
beforeEach(() => {
service = TestBed.inject(RgwTopicService);
httpTesting = TestBed.inject(HttpTestingController);
- });
-
- afterEach(() => {
- httpTesting.verify();
+ RgwHelper.selectDaemon();
});
it('should be created', () => {
expect(service).toBeTruthy();
});
-
- it('should call list with empty result', () => {
- let result;
- service.listTopic().subscribe((resp) => {
- result = resp;
- });
- const req = httpTesting.expectOne(`api/rgw/topic`);
- expect(req.request.method).toBe('GET');
- req.flush([]);
- expect(result).toEqual([]);
- });
it('should call list with result', () => {
service.listTopic().subscribe((resp) => {
let result = resp;
req.flush(['foo', 'bar']);
});
- it('should call get', () => {
- service.getTopic('foo').subscribe();
+ it('should call create', () => {
+ service.create({} as any).subscribe();
+ const req = httpTesting.expectOne(`api/rgw/topic?${RgwHelper.DAEMON_QUERY_PARAM}`);
+ expect(req.request.method).toBe('POST');
+ });
+ it('should call update', () => {
+ service.create({} as any).subscribe();
+ const req = httpTesting.expectOne(`api/rgw/topic?${RgwHelper.DAEMON_QUERY_PARAM}`);
+ expect(req.request.method).toBe('POST');
+ });
+ it('should call delete', () => {
+ service.delete('foo').subscribe();
const req = httpTesting.expectOne(`api/rgw/topic/foo`);
- expect(req.request.method).toBe('GET');
+ expect(req.request.method).toBe('DELETE');
});
});
-import { HttpClient } from '@angular/common/http';
+import { HttpClient, HttpParams } from '@angular/common/http';
import { Injectable } from '@angular/core';
-import { Observable } from 'rxjs';
+import _ from 'lodash';
+import { Observable, of as observableOf } from 'rxjs';
import { ApiClient } from './api-client';
-import { Topic } from '~/app/shared/models/topic.model';
+import { Topic, TopicRequest } from '~/app/shared/models/topic.model';
+import { catchError, mapTo } from 'rxjs/operators';
+import { RgwDaemonService } from './rgw-daemon.service';
@Injectable({
providedIn: 'root'
export class RgwTopicService extends ApiClient {
baseURL = 'api/rgw/topic';
- constructor(private http: HttpClient) {
+ constructor(private http: HttpClient, private rgwDaemonService: RgwDaemonService) {
super();
}
return this.http.get<Topic[]>(this.baseURL);
}
- getTopic(name: string) {
- return this.http.get(`${this.baseURL}/${name}`);
+ getTopic(key: string) {
+ return this.http.get<Topic>(`${this.baseURL}/${encodeURIComponent(key)}`);
+ }
+
+ create(createParam: TopicRequest) {
+ return this.rgwDaemonService.request((params: HttpParams) => {
+ return this.http.post(`${this.baseURL}`, createParam, { params: params });
+ });
+ }
+
+ delete(key: string) {
+ return this.http.delete(`${this.baseURL}/${key}`, {
+ observe: 'response'
+ });
+ }
+
+ exists(key: string): Observable<boolean> {
+ const encodedKey = encodeURIComponent(`:${key}`);
+ return this.getTopic(encodedKey).pipe(
+ mapTo(true),
+ catchError((error: Event) => {
+ if (_.isFunction(error.preventDefault)) {
+ error.preventDefault();
+ }
+ return observableOf(false);
+ })
+ );
}
}
-interface Destination {
+export interface Destination {
push_endpoint: string;
push_endpoint_args: string;
push_endpoint_topic: string;
- stored_secret: string;
+ stored_secret: boolean;
persistent: boolean;
persistent_queue: string;
time_to_live: number;
dest: Destination;
opaqueData: string;
policy: string | {};
+ key: string;
subscribed_buckets: any[];
}
-export interface TopicDetails {
+export interface TopicRequest {
owner: string;
name: string;
- arn: string;
- dest: Destination;
- opaqueData: string;
- policy: string;
- subscribed_buckets: string[];
+ push_endpoint: string;
+ opaque_data: string;
+ policy: {} | string;
+ persistent?: string;
+ time_to_live?: string;
+ max_retries?: string;
+ retry_sleep_duration?: string;
+ verify_ssl?: boolean;
+ cloud_events?: string;
+ ca_location?: string;
+ amqp_exchange?: string;
+ ack_level?: string;
+ use_ssl?: boolean;
+ kafka_brokers?: string;
+ mechanism?: string;
+}
+
+export const KAFKA_MECHANISM = {
+ PLAIN: 'PLAIN',
+ SCRAM256: 'SCRAM-SHA-256',
+ SCRAM512: 'SCRAM-SHA-512'
+};
+export const END_POINT_TYPE = {
+ HTTP: 'HTTP',
+ AMQP: 'AMQP',
+ Kafka: 'KAFKA'
+};
+export const AMQP_ACK_LEVEL = {
+ none: 'none',
+ broker: 'broker',
+ routable: 'routable'
+};
+export const KAFKA_ACK_LEVEL = {
+ none: 'none',
+ broker: 'broker'
+};
+export enum URLPort {
+ HTTP = '80',
+ HTTPS = '443',
+ AMQP = '5672',
+ AMQPS = '5671',
+ KAFKA = '9092',
+ KAFKA_SSL = '9093'
}
+export enum HostURLProtocol {
+ http = 'http',
+ https = 'https',
+ amqp = 'amqp',
+ amqps = 'amqps',
+ kafka = 'kafka'
+}
+export enum Endpoint {
+ HTTP = 'HTTP',
+ AMQP = 'AMQP',
+ AMQPS = 'AMQPS',
+ KAFKA = 'KAFKA'
+}
+
+export enum UrlProtocol {
+ HTTP = 'http:',
+ HTTPS = 'https:',
+ AMQP = 'amqp:',
+ AMQPS = 'amqps:',
+ KAFKA = 'kafka'
+}
+
+export const URL_FORMAT_PLACEHOLDERS = {
+ http: 'http[s]://<fqdn>[:<port]...',
+ amqp: 'amqp[s]://[<user>:<password>@]<fqdn>[:<port>][/<vhost>]...',
+ kafka: 'kafka://[<user>:<password>@]<fqdn>[:<port>]...'
+};
'nfs/delete': this.newTaskMessage(this.commonOperations.delete, (metadata) =>
this.nfs(metadata)
),
+ 'rgw/topic/delete': this.newTaskMessage(this.commonOperations.delete, (metadata) =>
+ this.topic(metadata)
+ ),
// Grafana tasks
'grafana/dashboards/update': this.newTaskMessage(
this.commonOperations.update,
return $localize`SMB users and groups access resource '${metadata.usersGroupsId}'`;
}
+ topic(metadata: any) {
+ return $localize`Topic '${metadata.name}'`;
+ }
service(metadata: any) {
return $localize`service '${metadata.service_name}'`;
}
- RgwSite
/api/rgw/topic:
get:
- parameters:
- - allowEmptyValue: true
- description: Name of the user
- in: query
- name: uid
- schema:
- type: string
- - allowEmptyValue: true
- description: Name of the tenant
- in: query
- name: tenant
- schema:
- type: string
+ parameters: []
responses:
'200':
content:
application/json:
schema:
properties:
- amqp_ack_level:
- description: amqp ack level
+ ack_level:
+ description: Amqp ack level
type: string
amqp_exchange:
- description: amqp exchange
+ description: Amqp exchange
type: string
ca_location:
- description: ca location
+ description: Ca location
type: string
cloud_events:
default: false
- description: cloud events
+ description: Cloud events
type: string
daemon_name:
- type: string
- kafka_ack_level:
- description: kafka ack level
+ description: Name of the daemon
type: string
kafka_brokers:
- description: kafka brokers
+ description: Kafka brokers
type: string
max_retries:
- description: max retries
+ description: Max retries
type: string
mechanism:
- description: mechanism
+ description: Mechanism
type: string
name:
description: Name of the topic
type: string
opaque_data:
- description: ' opaque data'
+ description: OpaqueData
type: string
owner:
+ description: Name of the owner
type: string
persistent:
default: false
- description: persistent
+ description: Persistent
type: boolean
policy:
- description: policy
+ description: Policy
type: string
push_endpoint:
description: Push Endpoint
type: string
retry_sleep_duration:
- description: retry sleep duration
+ description: Retry sleep duration
type: string
time_to_live:
description: Time to live
type: string
use_ssl:
default: false
- description: use ssl
+ description: Use ssl
type: boolean
verify_ssl:
default: false
- description: verify ssl
+ description: Verify ssl
type: boolean
required:
- name
summary: Create a new RGW Topic
tags:
- RGW Topic Management
- /api/rgw/topic/{name}:
+ /api/rgw/topic/{key}:
delete:
parameters:
- - description: Name of the user
+ - description: The metadata object key to retrieve the topic e.g topic:topic_name
in: path
- name: name
+ name: key
required: true
schema:
type: string
- - allowEmptyValue: true
- description: Name of the tenant
- in: query
- name: tenant
- schema:
- type: string
responses:
'202':
content:
- RGW Topic Management
get:
parameters:
- - description: Name of the user
+ - description: The metadata object key to retrieve the topic e.g owner:topic_name
in: path
- name: name
+ name: key
required: true
schema:
type: string
- - allowEmptyValue: true
- description: Name of the tenant
- in: query
- name: tenant
- schema:
- type: string
responses:
'200':
content:
from collections import defaultdict
from enum import Enum
from subprocess import SubprocessError
-from urllib.parse import urlparse
+from urllib.parse import urlparse, urlunparse
import requests
except ModuleNotFoundError:
logging.error("Module 'xmltodict' is not installed.")
-from mgr_util import build_url
+from mgr_util import build_url, name_to_config_section
from .. import mgr
from ..awsauth import S3Auth
@RestClient.api_post('?Action=CreateTopic&Name={name}')
def create_topic(self, request=None, name: str = '',
+ daemon_name: str = '',
push_endpoint: Optional[str] = '', opaque_data: Optional[str] = '',
persistent: Optional[bool] = False, time_to_live: Optional[str] = '',
max_retries: Optional[str] = '', retry_sleep_duration: Optional[str] = '',
policy: Optional[str] = '',
verify_ssl: Optional[bool] = False, cloud_events: Optional[bool] = False,
ca_location: Optional[str] = None, amqp_exchange: Optional[str] = None,
- amqp_ack_level: Optional[str] = None,
- use_ssl: Optional[bool] = False, kafka_ack_level: Optional[str] = None,
+ ack_level: Optional[str] = None,
+ use_ssl: Optional[bool] = False,
kafka_brokers: Optional[str] = None, mechanism: Optional[str] = None,
):
params = {'Name': name}
params['ca_location'] = ca_location
if amqp_exchange:
params['amqp_exchange'] = amqp_exchange
- if amqp_ack_level:
- params['amqp_ack_level'] = amqp_ack_level
+ if ack_level:
+ params['ack_level'] = ack_level
if use_ssl:
params['use_ssl'] = 'true' if use_ssl else 'false'
- if kafka_ack_level:
- params['kafka_ack_level'] = kafka_ack_level
if kafka_brokers:
params['kafka_brokers'] = kafka_brokers
if mechanism:
params['mechanism'] = mechanism
+ if push_endpoint and '://' in push_endpoint and '@' in push_endpoint:
+ try:
+ full_daemon_name = f'rgw.{daemon_name}'
+ CephService.send_command(
+ 'mon', 'config set',
+ who=name_to_config_section(full_daemon_name),
+ name='rgw_allow_notification_secrets_in_cleartext',
+ value='true'
+ )
+ except Exception as e:
+ raise DashboardException(
+ msg=f'Failed to set cleartext secret config: {e}', component='rgw'
+ )
try:
result = request(params=params)
except RequestException as e:
raise DashboardException(msg=str(e), component='rgw')
-
return result
class RgwTopicmanagement:
- def list_topics(self, uid: Optional[str], tenant: Optional[str]):
- rgw_topics_list = {}
- rgw_topic_list_cmd = ['topic', 'list']
- try:
- if uid:
- rgw_topic_list_cmd.append('--uid')
- rgw_topic_list_cmd.append(uid)
- if tenant:
- rgw_topic_list_cmd.append('--tenant')
- rgw_topic_list_cmd.append(tenant)
+ @staticmethod
+ def push_endpoint_password(push_endpoint: str) -> str:
+ parsed = urlparse(push_endpoint)
+ if parsed.username and parsed.password:
+ netloc = f"{parsed.username}:****@{parsed.hostname}"
+ if parsed.port:
+ netloc += f":{parsed.port}"
+ parsed = parsed._replace(netloc=netloc)
+ return urlunparse(parsed)
+ return push_endpoint
+
+ def list_topics(self):
+ try:
+ list_cmd = ['metadata', 'list', 'topic']
+ exit_code, topic_keys, _ = mgr.send_rgwadmin_command(list_cmd)
- exit_code, rgw_topics_list, _ = mgr.send_rgwadmin_command(rgw_topic_list_cmd)
if exit_code > 0:
- raise DashboardException(msg='Unable to fetch topic list',
- http_status_code=500, component='rgw')
- return rgw_topics_list
- except SubprocessError as error:
- raise DashboardException(error, http_status_code=500, component='rgw')
+ raise DashboardException(
+ 'Unable to fetch topic list',
+ http_status_code=500,
+ component='rgw'
+ )
- def get_topic(self, name: str, tenant: Optional[str]):
- rgw_topic_info_cmd = ['topic', 'get']
- try:
- if tenant:
- rgw_topic_info_cmd.append('--tenant')
- rgw_topic_info_cmd.append(tenant)
+ topics_info = []
+
+ for key in topic_keys:
+ get_cmd = ['metadata', 'get', f'topic:{key}']
+ exit_code, topic_info, _ = mgr.send_rgwadmin_command(get_cmd)
+
+ if exit_code == 0 and 'data' in topic_info:
+ data = topic_info['data']
+ modified_data = data.copy()
+ modified_data['key'] = key
+ push_endpoint = data.get('dest', {}).get('push_endpoint')
+ if push_endpoint:
+ modified_data.setdefault('dest', {})
+ modified_data['dest']['push_endpoint'] = self.push_endpoint_password(
+ push_endpoint
+ )
+
+ topics_info.append(modified_data)
- if name:
- rgw_topic_info_cmd.append('--topic')
- rgw_topic_info_cmd.append(name)
+ return topics_info
+
+ except SubprocessError as error:
+ raise DashboardException(str(error), http_status_code=500, component='rgw')
+ def get_topic(self, key):
+ rgw_topic_info_cmd = ['metadata', 'get', f'topic:{key}']
+ try:
exit_code, topic_info, _ = mgr.send_rgwadmin_command(rgw_topic_info_cmd)
if exit_code > 0:
- raise DashboardException('Unable to get topic info',
- http_status_code=500, component='rgw')
+ raise DashboardException(
+ 'Unable to get topic info',
+ http_status_code=500,
+ component='rgw'
+ )
+
+ topic_info = topic_info.get('data', {})
+ topic_info['key'] = key
+ push_endpoint = topic_info.get('dest', {}).get('push_endpoint')
+ if push_endpoint:
+ topic_info.setdefault('dest', {})
+ topic_info['dest']['push_endpoint'] = self.push_endpoint_password(push_endpoint)
+
return topic_info
except SubprocessError as error:
- raise DashboardException(error, http_status_code=500, component='rgw')
+ raise DashboardException(str(error), http_status_code=500, component='rgw')
- def delete_topic(self, name: str, tenant: Optional[str] = None):
- rgw_delete_topic_cmd = ['topic', 'rm']
+ def delete_topic(self, key: str):
+ rgw_delete_metadata_cmd = ['metadata', 'rm', f'topic:{key}']
try:
- if tenant:
- rgw_delete_topic_cmd.extend(['--tenant', tenant])
-
- if name:
- rgw_delete_topic_cmd.extend(['--topic', name])
+ exit_code, _, _ = mgr.send_rgwadmin_command(rgw_delete_metadata_cmd)
+ if exit_code > 0:
+ raise DashboardException(
+ msg=f'Unable to remove metadata for topic: {key}',
+ http_status_code=500,
+ component='rgw'
+ )
- exit_code, _, _ = mgr.send_rgwadmin_command(rgw_delete_topic_cmd)
+ return {
+ "status": "success",
+ "message": f"Metadata for topic '{key}' removed successfully."
+ }
- if exit_code > 0:
- raise DashboardException(msg='Unable to delete topic',
- http_status_code=500, component='rgw')
except SubprocessError as error:
- raise DashboardException(error, http_status_code=500, component='rgw')
+ raise DashboardException(str(error), http_status_code=500, component='rgw')
"arn": "arn:aws:sns:zg1-realm1::HttpTest",
"opaqueData": "test123",
"policy": "{}",
+ "key": "RGW22222222222222222:HttpTest",
"subscribed_buckets": []
}
]
"arn": "arn:aws:sns:zg1-realm1::HttpTest",
"opaqueData": "test123",
"policy": "{}",
+ "key": "RGW22222222222222222:HttpTest",
"subscribed_buckets": []
}
]
mock_delete_topic.return_value = None
controller = RgwTopic()
- result = controller.delete(name='HttpTest', tenant=None)
- mock_delete_topic.assert_called_with(name='HttpTest', tenant=None)
+ result = controller.delete(key='RGW22222222222222222:HttpTest')
+ mock_delete_topic.assert_called_with(key='RGW22222222222222222:HttpTest')
self.assertEqual(result, None)