]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
mgr/dashboard: manage flow and pipes apis 57462/head
authorNizamudeen A <nia@redhat.com>
Tue, 14 May 2024 09:53:36 +0000 (15:23 +0530)
committerNizamudeen A <nia@redhat.com>
Tue, 28 May 2024 06:38:45 +0000 (12:08 +0530)
Fixes: https://tracker.ceph.com/issues/66238
Signed-off-by: Nizamudeen A <nia@redhat.com>
src/pybind/mgr/dashboard/controllers/rgw.py
src/pybind/mgr/dashboard/openapi.yaml
src/pybind/mgr/dashboard/services/rgw_client.py

index 7193d225b9330a8400420c9a104aaa87f8975e0b..f48dc592292f7c5a6157fe8da3b571e10182e1a2 100644 (file)
@@ -14,12 +14,12 @@ from ..rest_client import RequestException
 from ..security import Permission, Scope
 from ..services.auth import AuthManager, JwtManager
 from ..services.ceph_service import CephService
-from ..services.rgw_client import NoRgwDaemonsException, RgwClient, RgwMultisite, \
-    SyncStatus
+from ..services.rgw_client import NoRgwDaemonsException, RgwClient, RgwMultisite
 from ..tools import json_str_to_object, str_to_bool
 from . import APIDoc, APIRouter, BaseController, CreatePermission, \
-    CRUDCollectionMethod, CRUDEndpoint, Endpoint, EndpointDoc, ReadPermission, \
-    RESTController, UIRouter, UpdatePermission, allow_empty_body, DeletePermission
+    CRUDCollectionMethod, CRUDEndpoint, DeletePermission, Endpoint, \
+    EndpointDoc, ReadPermission, RESTController, UIRouter, UpdatePermission, \
+    allow_empty_body
 from ._crud import CRUDMeta, Form, FormField, FormTaskInfo, Icon, MethodType, \
     TableAction, Validator, VerticalContainer
 from ._version import APIVersion
@@ -118,6 +118,7 @@ class RgwMultisiteStatus(RESTController):
 class RgwMultisiteController(RESTController):
     @Endpoint(path='/sync_status')
     @EndpointDoc("Get the sync status")
+    @ReadPermission
     @allow_empty_body
     # pylint: disable=W0102,W0613
     def get_sync_status(self):
@@ -128,9 +129,9 @@ class RgwMultisiteController(RESTController):
     @Endpoint(path='/sync-policy')
     @EndpointDoc("Get the sync policy")
     @ReadPermission
-    def get_sync_policy(self, bucket_name = ''):
+    def get_sync_policy(self, bucket_name='', zonegroup_name=''):
         multisite_instance = RgwMultisite()
-        return multisite_instance.get_sync_policy(bucket_name)
+        return multisite_instance.get_sync_policy(bucket_name, zonegroup_name)
 
     @Endpoint(path='/sync-policy-group')
     @EndpointDoc("Get the sync policy group")
@@ -142,14 +143,14 @@ class RgwMultisiteController(RESTController):
     @Endpoint(method='POST', path='/sync-policy-group')
     @EndpointDoc("Create the sync policy group")
     @CreatePermission
-    def create_sync_policy_group(self, group_id: str, status: SyncStatus, bucket_name=''):
+    def create_sync_policy_group(self, group_id: str, status: str, bucket_name=''):
         multisite_instance = RgwMultisite()
         return multisite_instance.create_sync_policy_group(group_id, status, bucket_name)
 
     @Endpoint(method='PUT', path='/sync-policy-group')
     @EndpointDoc("Update the sync policy group")
     @UpdatePermission
-    def update_sync_policy_group(self, group_id: str, status: SyncStatus, bucket_name=''):
+    def update_sync_policy_group(self, group_id: str, status: str, bucket_name=''):
         multisite_instance = RgwMultisite()
         return multisite_instance.update_sync_policy_group(group_id, status, bucket_name)
 
@@ -160,6 +161,52 @@ class RgwMultisiteController(RESTController):
         multisite_instance = RgwMultisite()
         return multisite_instance.remove_sync_policy_group(group_id, bucket_name)
 
+    @Endpoint(method='PUT', path='/sync-flow')
+    @EndpointDoc("Create or update the sync flow")
+    @CreatePermission
+    def create_sync_flow(self, flow_id: str, flow_type: str, group_id: str,
+                         source_zone='', destination_zone='', zones: Optional[List[str]] = None,
+                         bucket_name=''):
+        multisite_instance = RgwMultisite()
+        return multisite_instance.create_sync_flow(group_id, flow_id, flow_type, zones,
+                                                   bucket_name, source_zone, destination_zone)
+
+    @Endpoint(method='DELETE', path='/sync-flow')
+    @EndpointDoc("Remove the sync flow")
+    @DeletePermission
+    def remove_sync_flow(self, flow_id: str, flow_type: str, group_id: str,
+                         source_zone='', destination_zone='', zones: Optional[List[str]] = None,
+                         bucket_name=''):
+        multisite_instance = RgwMultisite()
+        return multisite_instance.remove_sync_flow(group_id, flow_id, flow_type, source_zone,
+                                                   destination_zone, zones, bucket_name)
+
+    @Endpoint(method='PUT', path='/sync-pipe')
+    @EndpointDoc("Create or update the sync pipe")
+    @CreatePermission
+    def create_sync_pipe(self, group_id: str, pipe_id: str,
+                         source_zones: Optional[List[str]] = None,
+                         destination_zones: Optional[List[str]] = None,
+                         destination_buckets: Optional[List[str]] = None,
+                         bucket_name: str = ''):
+        multisite_instance = RgwMultisite()
+        return multisite_instance.create_sync_pipe(group_id, pipe_id, source_zones,
+                                                   destination_zones, destination_buckets,
+                                                   bucket_name)
+
+    @Endpoint(method='DELETE', path='/sync-pipe')
+    @EndpointDoc("Remove the sync pipe")
+    @DeletePermission
+    def remove_sync_pipe(self, group_id: str, pipe_id: str,
+                         source_zones: Optional[List[str]] = None,
+                         destination_zones: Optional[List[str]] = None,
+                         destination_buckets: Optional[List[str]] = None,
+                         bucket_name: str = ''):
+        multisite_instance = RgwMultisite()
+        return multisite_instance.remove_sync_pipe(group_id, pipe_id, source_zones,
+                                                   destination_zones, destination_buckets,
+                                                   bucket_name)
+
 
 @APIRouter('/rgw/daemon', Scope.RGW)
 @APIDoc("RGW Daemon Management API", "RgwDaemon")
index 1c004bd8b3c7006a72a41bab03b4ef1c22de4002..99a370d1426b9d6d8b8a3914df8d1af8881b9f28 100644 (file)
@@ -11058,6 +11058,422 @@ paths:
       - jwt: []
       tags:
       - RgwDaemon
+  /api/rgw/multisite/sync-flow:
+    put:
+      parameters: []
+      requestBody:
+        content:
+          application/json:
+            schema:
+              properties:
+                bucket_name:
+                  default: ''
+                  type: string
+                destination_zone:
+                  default: ''
+                  type: string
+                flow_id:
+                  type: string
+                flow_type:
+                  type: string
+                group_id:
+                  type: string
+                source_zone:
+                  default: ''
+                  type: string
+                zones:
+                  type: string
+              required:
+              - flow_id
+              - flow_type
+              - group_id
+              type: object
+      responses:
+        '200':
+          content:
+            application/vnd.ceph.api.v1.0+json:
+              type: object
+          description: Resource updated.
+        '202':
+          content:
+            application/vnd.ceph.api.v1.0+json:
+              type: object
+          description: Operation is still executing. Please check the task queue.
+        '400':
+          description: Operation exception. Please check the response body for details.
+        '401':
+          description: Unauthenticated access. Please login first.
+        '403':
+          description: Unauthorized access. Please check your permissions.
+        '500':
+          description: Unexpected error. Please check the response body for the stack
+            trace.
+      security:
+      - jwt: []
+      summary: Create or update the sync flow
+      tags:
+      - RgwMultisite
+  /api/rgw/multisite/sync-flow/{flow_id}/{flow_type}/{group_id}:
+    delete:
+      parameters:
+      - in: path
+        name: flow_id
+        required: true
+        schema:
+          type: string
+      - in: path
+        name: flow_type
+        required: true
+        schema:
+          type: string
+      - in: path
+        name: group_id
+        required: true
+        schema:
+          type: string
+      - default: ''
+        in: query
+        name: source_zone
+        schema:
+          type: string
+      - default: ''
+        in: query
+        name: destination_zone
+        schema:
+          type: string
+      - allowEmptyValue: true
+        in: query
+        name: zones
+        schema:
+          type: string
+      - default: ''
+        in: query
+        name: bucket_name
+        schema:
+          type: string
+      responses:
+        '202':
+          content:
+            application/vnd.ceph.api.v1.0+json:
+              type: object
+          description: Operation is still executing. Please check the task queue.
+        '204':
+          content:
+            application/vnd.ceph.api.v1.0+json:
+              type: object
+          description: Resource deleted.
+        '400':
+          description: Operation exception. Please check the response body for details.
+        '401':
+          description: Unauthenticated access. Please login first.
+        '403':
+          description: Unauthorized access. Please check your permissions.
+        '500':
+          description: Unexpected error. Please check the response body for the stack
+            trace.
+      security:
+      - jwt: []
+      summary: Remove the sync flow
+      tags:
+      - RgwMultisite
+  /api/rgw/multisite/sync-pipe:
+    put:
+      parameters: []
+      requestBody:
+        content:
+          application/json:
+            schema:
+              properties:
+                bucket_name:
+                  default: ''
+                  type: string
+                destination_buckets:
+                  type: string
+                destination_zones:
+                  type: string
+                group_id:
+                  type: string
+                pipe_id:
+                  type: string
+                source_zones:
+                  type: string
+              required:
+              - group_id
+              - pipe_id
+              type: object
+      responses:
+        '200':
+          content:
+            application/vnd.ceph.api.v1.0+json:
+              type: object
+          description: Resource updated.
+        '202':
+          content:
+            application/vnd.ceph.api.v1.0+json:
+              type: object
+          description: Operation is still executing. Please check the task queue.
+        '400':
+          description: Operation exception. Please check the response body for details.
+        '401':
+          description: Unauthenticated access. Please login first.
+        '403':
+          description: Unauthorized access. Please check your permissions.
+        '500':
+          description: Unexpected error. Please check the response body for the stack
+            trace.
+      security:
+      - jwt: []
+      summary: Create or update the sync pipe
+      tags:
+      - RgwMultisite
+  /api/rgw/multisite/sync-pipe/{group_id}/{pipe_id}:
+    delete:
+      parameters:
+      - in: path
+        name: group_id
+        required: true
+        schema:
+          type: string
+      - in: path
+        name: pipe_id
+        required: true
+        schema:
+          type: string
+      - allowEmptyValue: true
+        in: query
+        name: source_zones
+        schema:
+          type: string
+      - allowEmptyValue: true
+        in: query
+        name: destination_zones
+        schema:
+          type: string
+      - allowEmptyValue: true
+        in: query
+        name: destination_buckets
+        schema:
+          type: string
+      - default: ''
+        in: query
+        name: bucket_name
+        schema:
+          type: string
+      responses:
+        '202':
+          content:
+            application/vnd.ceph.api.v1.0+json:
+              type: object
+          description: Operation is still executing. Please check the task queue.
+        '204':
+          content:
+            application/vnd.ceph.api.v1.0+json:
+              type: object
+          description: Resource deleted.
+        '400':
+          description: Operation exception. Please check the response body for details.
+        '401':
+          description: Unauthenticated access. Please login first.
+        '403':
+          description: Unauthorized access. Please check your permissions.
+        '500':
+          description: Unexpected error. Please check the response body for the stack
+            trace.
+      security:
+      - jwt: []
+      summary: Remove the sync pipe
+      tags:
+      - RgwMultisite
+  /api/rgw/multisite/sync-policy:
+    get:
+      parameters:
+      - default: ''
+        in: query
+        name: bucket_name
+        schema:
+          type: string
+      - default: ''
+        in: query
+        name: zonegroup_name
+        schema:
+          type: string
+      responses:
+        '200':
+          content:
+            application/vnd.ceph.api.v1.0+json:
+              type: object
+          description: OK
+        '400':
+          description: Operation exception. Please check the response body for details.
+        '401':
+          description: Unauthenticated access. Please login first.
+        '403':
+          description: Unauthorized access. Please check your permissions.
+        '500':
+          description: Unexpected error. Please check the response body for the stack
+            trace.
+      security:
+      - jwt: []
+      summary: Get the sync policy
+      tags:
+      - RgwMultisite
+  /api/rgw/multisite/sync-policy-group:
+    post:
+      parameters: []
+      requestBody:
+        content:
+          application/json:
+            schema:
+              properties:
+                bucket_name:
+                  default: ''
+                  type: string
+                group_id:
+                  type: string
+                status:
+                  type: string
+              required:
+              - group_id
+              - status
+              type: object
+      responses:
+        '201':
+          content:
+            application/vnd.ceph.api.v1.0+json:
+              type: object
+          description: Resource created.
+        '202':
+          content:
+            application/vnd.ceph.api.v1.0+json:
+              type: object
+          description: Operation is still executing. Please check the task queue.
+        '400':
+          description: Operation exception. Please check the response body for details.
+        '401':
+          description: Unauthenticated access. Please login first.
+        '403':
+          description: Unauthorized access. Please check your permissions.
+        '500':
+          description: Unexpected error. Please check the response body for the stack
+            trace.
+      security:
+      - jwt: []
+      summary: Create the sync policy group
+      tags:
+      - RgwMultisite
+    put:
+      parameters: []
+      requestBody:
+        content:
+          application/json:
+            schema:
+              properties:
+                bucket_name:
+                  default: ''
+                  type: string
+                group_id:
+                  type: string
+                status:
+                  type: string
+              required:
+              - group_id
+              - status
+              type: object
+      responses:
+        '200':
+          content:
+            application/vnd.ceph.api.v1.0+json:
+              type: object
+          description: Resource updated.
+        '202':
+          content:
+            application/vnd.ceph.api.v1.0+json:
+              type: object
+          description: Operation is still executing. Please check the task queue.
+        '400':
+          description: Operation exception. Please check the response body for details.
+        '401':
+          description: Unauthenticated access. Please login first.
+        '403':
+          description: Unauthorized access. Please check your permissions.
+        '500':
+          description: Unexpected error. Please check the response body for the stack
+            trace.
+      security:
+      - jwt: []
+      summary: Update the sync policy group
+      tags:
+      - RgwMultisite
+  /api/rgw/multisite/sync-policy-group/{group_id}:
+    delete:
+      parameters:
+      - in: path
+        name: group_id
+        required: true
+        schema:
+          type: string
+      - default: ''
+        in: query
+        name: bucket_name
+        schema:
+          type: string
+      responses:
+        '202':
+          content:
+            application/vnd.ceph.api.v1.0+json:
+              type: object
+          description: Operation is still executing. Please check the task queue.
+        '204':
+          content:
+            application/vnd.ceph.api.v1.0+json:
+              type: object
+          description: Resource deleted.
+        '400':
+          description: Operation exception. Please check the response body for details.
+        '401':
+          description: Unauthenticated access. Please login first.
+        '403':
+          description: Unauthorized access. Please check your permissions.
+        '500':
+          description: Unexpected error. Please check the response body for the stack
+            trace.
+      security:
+      - jwt: []
+      summary: Remove the sync policy group
+      tags:
+      - RgwMultisite
+    get:
+      parameters:
+      - in: path
+        name: group_id
+        required: true
+        schema:
+          type: string
+      - default: ''
+        in: query
+        name: bucket_name
+        schema:
+          type: string
+      responses:
+        '200':
+          content:
+            application/vnd.ceph.api.v1.0+json:
+              type: object
+          description: OK
+        '400':
+          description: Operation exception. Please check the response body for details.
+        '401':
+          description: Unauthenticated access. Please login first.
+        '403':
+          description: Unauthorized access. Please check your permissions.
+        '500':
+          description: Unexpected error. Please check the response body for the stack
+            trace.
+      security:
+      - jwt: []
+      summary: Get the sync policy group
+      tags:
+      - RgwMultisite
   /api/rgw/multisite/sync_status:
     get:
       parameters: []
@@ -11078,6 +11494,7 @@ paths:
             trace.
       security:
       - jwt: []
+      summary: Get the sync status
       tags:
       - RgwMultisite
   /api/rgw/realm:
index 8f26861766c3597a201b273d015396bd3d1812c9..fb6c83d60cbe437a25cabd95ecc76c81b69a133e 100644 (file)
@@ -9,8 +9,8 @@ import logging
 import os
 import re
 import xml.etree.ElementTree as ET  # noqa: N814
-from subprocess import SubprocessError
 from enum import Enum
+from subprocess import SubprocessError
 
 from mgr_util import build_url, name_to_config_section
 
@@ -984,12 +984,18 @@ class RgwClient(RestClient):
             raise DashboardException(msg=msg, component='rgw')
         return retention_period_days, retention_period_years
 
+
 class SyncStatus(Enum):
     enabled = 'enabled'
     allowed = 'allowed'
     forbidden = 'forbidden'
 
 
+class SyncFlowTypes(Enum):
+    directional = 'directional'
+    symmetrical = 'symmetrical'
+
+
 class RgwMultisite:
     def migrate_to_multisite(self, realm_name: str, zonegroup_name: str, zone_name: str,
                              zonegroup_endpoints: str, zone_endpoints: str, access_key: str,
@@ -1751,14 +1757,16 @@ class RgwMultisite:
 
         return ''
 
-    def get_sync_policy(self, bucket_name: str = ''):
+    def get_sync_policy(self, bucket_name: str = '', zonegroup_name: str = ''):
         rgw_sync_policy_cmd = ['sync', 'policy', 'get']
         if bucket_name:
             rgw_sync_policy_cmd += ['--bucket', bucket_name]
+        if zonegroup_name:
+            rgw_sync_policy_cmd += ['--rgw-zonegroup', zonegroup_name]
         try:
-            exit_code, out, _ = mgr.send_rgwadmin_command(rgw_sync_policy_cmd)
+            exit_code, out, err = mgr.send_rgwadmin_command(rgw_sync_policy_cmd)
             if exit_code > 0:
-                raise DashboardException('Unable to get sync policy',
+                raise DashboardException(f'Unable to get sync policy: {err}',
                                          http_status_code=500, component='rgw')
             return out
         except SubprocessError as error:
@@ -1769,36 +1777,36 @@ class RgwMultisite:
         if bucket_name:
             rgw_sync_policy_cmd += ['--bucket', bucket_name]
         try:
-            exit_code, out, _ = mgr.send_rgwadmin_command(rgw_sync_policy_cmd)
+            exit_code, out, err = mgr.send_rgwadmin_command(rgw_sync_policy_cmd)
             if exit_code > 0:
-                raise DashboardException('Unable to get sync policy',
+                raise DashboardException(f'Unable to get sync policy group: {err}',
                                          http_status_code=500, component='rgw')
             return out
         except SubprocessError as error:
             raise DashboardException(error, http_status_code=500, component='rgw')
 
-    def create_sync_policy_group(self, group_id: str, status: SyncStatus, bucket_name: str = ''):
+    def create_sync_policy_group(self, group_id: str, status: str, bucket_name: str = ''):
         rgw_sync_policy_cmd = ['sync', 'group', 'create', '--group-id', group_id,
                                '--status', SyncStatus[status].value]
         if bucket_name:
             rgw_sync_policy_cmd += ['--bucket', bucket_name]
         try:
-            exit_code, _, _ = mgr.send_rgwadmin_command(rgw_sync_policy_cmd)
+            exit_code, _, err = mgr.send_rgwadmin_command(rgw_sync_policy_cmd)
             if exit_code > 0:
-                raise DashboardException('Unable to create sync policy',
+                raise DashboardException(f'Unable to create sync policy group: {err}',
                                          http_status_code=500, component='rgw')
         except SubprocessError as error:
             raise DashboardException(error, http_status_code=500, component='rgw')
 
-    def update_sync_policy_group(self, group_id: str, status: SyncStatus, bucket_name: str = ''):
+    def update_sync_policy_group(self, group_id: str, status: str, bucket_name: str = ''):
         rgw_sync_policy_cmd = ['sync', 'group', 'modify', '--group-id', group_id,
                                '--status', SyncStatus[status].value]
         if bucket_name:
             rgw_sync_policy_cmd += ['--bucket', bucket_name]
         try:
-            exit_code, _, _ = mgr.send_rgwadmin_command(rgw_sync_policy_cmd)
+            exit_code, _, err = mgr.send_rgwadmin_command(rgw_sync_policy_cmd)
             if exit_code > 0:
-                raise DashboardException('Unable to update sync policy',
+                raise DashboardException(f'Unable to update sync policy group: {err}',
                                          http_status_code=500, component='rgw')
         except SubprocessError as error:
             raise DashboardException(error, http_status_code=500, component='rgw')
@@ -1808,9 +1816,109 @@ class RgwMultisite:
         if bucket_name:
             rgw_sync_policy_cmd += ['--bucket', bucket_name]
         try:
-            exit_code, _, _ = mgr.send_rgwadmin_command(rgw_sync_policy_cmd)
+            exit_code, _, err = mgr.send_rgwadmin_command(rgw_sync_policy_cmd)
+            if exit_code > 0:
+                raise DashboardException(f'Unable to remove sync policy group: {err}',
+                                         http_status_code=500, component='rgw')
+        except SubprocessError as error:
+            raise DashboardException(error, http_status_code=500, component='rgw')
+
+    def create_sync_flow(self, group_id: str, flow_id: str, flow_type: str,
+                         zones: Optional[List[str]] = None, bucket_name: str = '',
+                         source_zone: str = '', destination_zone: str = ''):
+        rgw_sync_policy_cmd = ['sync', 'group', 'flow', 'create', '--group-id', group_id,
+                               '--flow-id', flow_id, '--flow-type', SyncFlowTypes[flow_type].value]
+
+        if SyncFlowTypes[flow_type].value == 'directional':
+            rgw_sync_policy_cmd += ['--source-zone', source_zone, '--dest-zone', destination_zone]
+        else:
+            if zones:
+                rgw_sync_policy_cmd += ['--zones', ','.join(zones)]
+
+        if bucket_name:
+            rgw_sync_policy_cmd += ['--bucket', bucket_name]
+
+        try:
+            exit_code, _, err = mgr.send_rgwadmin_command(rgw_sync_policy_cmd)
+            if exit_code > 0:
+                raise DashboardException(f'Unable to create sync flow: {err}',
+                                         http_status_code=500, component='rgw')
+        except SubprocessError as error:
+            raise DashboardException(error, http_status_code=500, component='rgw')
+
+    def remove_sync_flow(self, group_id: str, flow_id: str, flow_type: str,
+                         source_zone='', destination_zone='',
+                         zones: Optional[List[str]] = None, bucket_name: str = ''):
+        rgw_sync_policy_cmd = ['sync', 'group', 'flow', 'remove', '--group-id', group_id,
+                               '--flow-id', flow_id, '--flow-type', SyncFlowTypes[flow_type].value]
+
+        if SyncFlowTypes[flow_type].value == 'directional':
+            rgw_sync_policy_cmd += ['--source-zone', source_zone, '--dest-zone', destination_zone]
+        else:
+            if zones:
+                rgw_sync_policy_cmd += ['--zones', ','.join(zones)]
+
+        if bucket_name:
+            rgw_sync_policy_cmd += ['--bucket', bucket_name]
+
+        try:
+            exit_code, _, err = mgr.send_rgwadmin_command(rgw_sync_policy_cmd)
+            if exit_code > 0:
+                raise DashboardException(f'Unable to remove sync flow: {err}',
+                                         http_status_code=500, component='rgw')
+        except SubprocessError as error:
+            raise DashboardException(error, http_status_code=500, component='rgw')
+
+    def create_sync_pipe(self, group_id: str, pipe_id: str,
+                         source_zones: Optional[List[str]] = None,
+                         destination_zones: Optional[List[str]] = None,
+                         destination_buckets: Optional[List[str]] = None, bucket_name: str = ''):
+        rgw_sync_policy_cmd = ['sync', 'group', 'pipe', 'create',
+                               '--group-id', group_id, '--pipe-id', pipe_id]
+
+        if bucket_name:
+            rgw_sync_policy_cmd += ['--bucket', bucket_name]
+
+        if source_zones:
+            rgw_sync_policy_cmd += ['--source-zones', ','.join(source_zones)]
+
+        if destination_zones:
+            rgw_sync_policy_cmd += ['--dest-zones', ','.join(destination_zones)]
+
+        if destination_buckets:
+            rgw_sync_policy_cmd += ['--dest-bucket', ','.join(destination_buckets)]
+
+        try:
+            exit_code, _, err = mgr.send_rgwadmin_command(rgw_sync_policy_cmd)
+            if exit_code > 0:
+                raise DashboardException(f'Unable to create sync pipe: {err}',
+                                         http_status_code=500, component='rgw')
+        except SubprocessError as error:
+            raise DashboardException(error, http_status_code=500, component='rgw')
+
+    def remove_sync_pipe(self, group_id: str, pipe_id: str,
+                         source_zones: Optional[List[str]] = None,
+                         destination_zones: Optional[List[str]] = None,
+                         destination_buckets: Optional[List[str]] = None, bucket_name: str = ''):
+        rgw_sync_policy_cmd = ['sync', 'group', 'pipe', 'remove',
+                               '--group-id', group_id, '--pipe-id', pipe_id]
+
+        if bucket_name:
+            rgw_sync_policy_cmd += ['--bucket', bucket_name]
+
+        if source_zones:
+            rgw_sync_policy_cmd += ['--source-zones', ','.join(source_zones)]
+
+        if destination_zones:
+            rgw_sync_policy_cmd += ['--dest-zones', ','.join(destination_zones)]
+
+        if destination_buckets:
+            rgw_sync_policy_cmd += ['--dest-bucket', ','.join(destination_buckets)]
+
+        try:
+            exit_code, _, err = mgr.send_rgwadmin_command(rgw_sync_policy_cmd)
             if exit_code > 0:
-                raise DashboardException('Unable to remove sync policy',
+                raise DashboardException(f'Unable to remove sync pipe: {err}',
                                          http_status_code=500, component='rgw')
         except SubprocessError as error:
             raise DashboardException(error, http_status_code=500, component='rgw')