]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw/notification: trigger notifications on changes from any user 40029/head
authorYuval Lifshitz <ylifshit@redhat.com>
Tue, 17 Nov 2020 11:31:59 +0000 (13:31 +0200)
committerYuval Lifshitz <ylifshit@redhat.com>
Sun, 14 Mar 2021 11:50:38 +0000 (13:50 +0200)
any user authorized to make changes to a bucket may trigger
notifications defined on that bucket.
manual test procedure of the fix is described here:
https://gist.github.com/yuvalif/39c183aa0f74d286ecef7844268817df

Fixes: https://tracker.ceph.com/issues/48461
Signed-off-by: Yuval Lifshitz <ylifshit@redhat.com>
(cherry picked from commit 658993efc16498c2106108fe407f6d44040f349d)

Conflics:
src/test/rgw/rgw_multi/tests_ps.py
manual changes were made to the tests so they can pass. this does not affect teuthology runs

12 files changed:
doc/radosgw/notifications.rst
doc/radosgw/pubsub-module.rst
src/rgw/rgw_admin.cc
src/rgw/rgw_notify.cc
src/rgw/rgw_pubsub.cc
src/rgw/rgw_pubsub.h
src/rgw/rgw_rest_pubsub.cc
src/rgw/rgw_rest_pubsub_common.cc
src/rgw/rgw_rest_pubsub_common.h
src/rgw/rgw_sync_module_pubsub.cc
src/rgw/rgw_sync_module_pubsub_rest.cc
src/test/rgw/rgw_multi/tests_ps.py

index 3c0945f5cab6574098f78aa70b8db4b2c6c21a6c..3e5d4fcdaaed4bd9f3d3d218bdc26ca5fd4bb599 100644 (file)
@@ -9,11 +9,11 @@ Bucket Notifications
 Bucket notifications provide a mechanism for sending information out of the radosgw when certain events are happening on the bucket.
 Currently, notifications could be sent to: HTTP, AMQP0.9.1 and Kafka endpoints.
 
-Note, that if the events should be stored in Ceph, in addition, or instead of being pushed to an endpoint, 
+Note, that if the events should be stored in Ceph, in addition, or instead of being pushed to an endpoint,
 the `PubSub Module`_ should be used instead of the bucket notification mechanism.
 
-A user can create different topics. A topic entity is defined by its user and its name. A
-user can only manage its own topics, and can only associate them with buckets it owns.
+A user can create different topics. A topic entity is defined by its name and is per tenant. A
+user can only associate its topics (via notification configuration) with buckets it owns.
 
 In order to send notifications for events for a specific bucket, a notification entity needs to be created. A
 notification can be created on a subset of event types, or for all event types (default).
@@ -39,9 +39,9 @@ The same counters are shared between the pubsub sync module and the bucket notif
 - ``pubsub_push_fail``: running counter, for all notifications, of events failed to be pushed to their endpoint
 - ``pubsub_push_pending``: gauge value of events pushed to an endpoint but not acked or nacked yet
 
-.. note:: 
+.. note::
 
-    ``pubsub_event_triggered`` and ``pubsub_event_lost`` are incremented per event, while: 
+    ``pubsub_event_triggered`` and ``pubsub_event_lost`` are incremented per event, while:
     ``pubsub_push_ok``, ``pubsub_push_fail``, are incremented per push action on each notification.
 
 Bucket Notification REST API
@@ -58,14 +58,13 @@ when a notification is created.
 Upon a successful request, the response will include the topic ARN that could be later used to reference this topic in the notification request.
 To update a topic, use the same command used for topic creation, with the topic name of an existing topic and different endpoint values.
 
-.. tip:: Any notification already associated with the topic needs to be re-created for the topic update to take effect 
+.. tip:: Any notification already associated with the topic needs to be re-created for the topic update to take effect
 
 ::
 
    POST
    Action=CreateTopic
    &Name=<topic-name>
-   &push-endpoint=<endpoint>
    [&Attributes.entry.1.key=amqp-exchange&Attributes.entry.1.value=<exchange>]
    [&Attributes.entry.2.key=amqp-ack-level&Attributes.entry.2.value=none|broker|routable]
    [&Attributes.entry.3.key=verify-ssl&Attributes.entry.3.value=true|false]
@@ -73,13 +72,14 @@ To update a topic, use the same command used for topic creation, with the topic
    [&Attributes.entry.5.key=use-ssl&Attributes.entry.5.value=true|false]
    [&Attributes.entry.6.key=ca-location&Attributes.entry.6.value=<file path>]
    [&Attributes.entry.7.key=OpaqueData&Attributes.entry.7.value=<opaque data>]
+   [&Attributes.entry.8.key=push-endpoint&Attributes.entry.8.value=<endpoint>]
 
 Request parameters:
 
 - push-endpoint: URI of an endpoint to send push notification to
-- OpaqueData: opaque data is set in the topic configuration and added to all notifications triggered by the ropic
+- OpaqueData: opaque data is set in the topic configuration and added to all notifications triggered by the topic
 
-- HTTP endpoint 
+- HTTP endpoint
 
  - URI: ``http[s]://<fqdn>[:<port]``
  - port defaults to: 80/443 for HTTP/S accordingly
@@ -89,30 +89,32 @@ Request parameters:
 
  - URI: ``amqp://[<user>:<password>@]<fqdn>[:<port>][/<vhost>]``
  - user/password defaults to: guest/guest
- - user/password may only be provided over HTTPS. Topic creation request will be rejected if not
+ - user/password may only be provided over HTTPS. If not, topic creation request will be rejected.
  - port defaults to: 5672
  - vhost defaults to: "/"
- - amqp-exchange: the exchanges must exist and be able to route messages based on topics (mandatory parameter for AMQP0.9.1)
+ - amqp-exchange: the exchanges must exist and be able to route messages based on topics (mandatory parameter for AMQP0.9.1). Different topics pointing to the same endpoint must use the same exchange
  - amqp-ack-level: no end2end acking is required, as messages may persist in the broker before delivered into their final destination. Three ack methods exist:
 
   - "none": message is considered "delivered" if sent to broker
   - "broker": message is considered "delivered" if acked by broker (default)
   - "routable": message is considered "delivered" if broker can route to a consumer
 
-- Kafka endpoint 
+.. tip:: The topic-name is used for the AMQP topic ("routing key" for a topic exchange)
+
+- Kafka endpoint
 
  - URI: ``kafka://[<user>:<password>@]<fqdn>[:<port]``
  - if ``use-ssl`` is set to "true", secure connection will be used for connecting with the broker ("false" by default)
  - if ``ca-location`` is provided, and secure connection is used, the specified CA will be used, instead of the default one, to authenticate the broker
- - user/password may only be provided over HTTPS. Topic creation request will be rejected if not
- - user/password may only be provided together with ``use-ssl``, connection to the broker would fail if not
+ - user/password may only be provided over HTTPS. If not, topic creation request will be rejected.
+ - user/password may only be provided together with ``use-ssl``, if not, the connection to the broker would fail.
  - port defaults to: 9092
  - kafka-ack-level: no end2end acking is required, as messages may persist in the broker before delivered into their final destination. Two ack methods exist:
 
   - "none": message is considered "delivered" if sent to broker
   - "broker": message is considered "delivered" if acked by broker (default)
 
-.. note:: 
+.. note::
 
     - The key/value of a specific parameter does not have to reside in the same line, or in any specific order, but must use the same index
     - Attribute indexing does not need to be sequential or start from any specific value
@@ -129,14 +131,14 @@ The response will have the following format:
         <ResponseMetadata>
             <RequestId></RequestId>
         </ResponseMetadata>
-    </CreateTopicResponse>    
+    </CreateTopicResponse>
 
 The topic ARN in the response will have the following format:
 
 ::
 
    arn:aws:sns:<zone-group>:<tenant>:<topic>
+
 Get Topic Information
 `````````````````````
 
@@ -145,7 +147,9 @@ Returns information about specific topic. This includes push-endpoint informatio
 ::
 
    POST
-   Action=GetTopic&TopicArn=<topic-arn>
+
+   Action=GetTopic
+   &TopicArn=<topic-arn>
 
 Response will have the following format:
 
@@ -168,13 +172,13 @@ Response will have the following format:
         <ResponseMetadata>
             <RequestId></RequestId>
         </ResponseMetadata>
-    </GetTopicResponse>    
+    </GetTopicResponse>
 
 - User: name of the user that created the topic
 - Name: name of the topic
-- EndPoinjtAddress: the push-endpoint URL
-- if endpoint URL contain user/password information, request must be made over HTTPS. Topic get request will be rejected if not 
-- EndPointArgs: the push-endpoint args
+- EndpointAddress: the push-endpoint URL
+- if endpoint URL contain user/password information, request must be made over HTTPS. If not, topic get request will be rejected.
+- EndpointArgs: the push-endpoint args
 - EndpointTopic: the topic name that should be sent to the endpoint (mat be different than the above topic name)
 - TopicArn: topic ARN
 
@@ -184,7 +188,9 @@ Delete Topic
 ::
 
    POST
-   Action=DeleteTopic&TopicArn=<topic-arn>
+
+   Action=DeleteTopic
+   &TopicArn=<topic-arn>
 
 Delete the specified topic. Note that deleting a deleted topic should result with no-op and not a failure.
 
@@ -196,18 +202,19 @@ The response will have the following format:
         <ResponseMetadata>
             <RequestId></RequestId>
         </ResponseMetadata>
-    </DeleteTopicResponse>    
+    </DeleteTopicResponse>
 
 List Topics
 ```````````
 
-List all topics that user defined.
+List all topics associated with a tenant.
 
 ::
 
-   POST 
+   POST
+
    Action=ListTopics
+
 Response will have the following format:
 
 ::
@@ -231,19 +238,18 @@ Response will have the following format:
         <ResponseMetadata>
             <RequestId></RequestId>
         </ResponseMetadata>
-    </ListTopicsResponse>    
+    </ListTopicsResponse>
 
-- if endpoint URL contain user/password information, in any of the topic, request must be made over HTTPS. Topic list request will be rejected if not 
+- if endpoint URL contain user/password information, in any of the topic, request must be made over HTTPS. If not, topic list request will be rejected.
 
 Notifications
 ~~~~~~~~~~~~~
 
 Detailed under: `Bucket Operations`_.
 
-.. note:: 
+.. note::
 
     - "Abort Multipart Upload" request does not emit a notification
-    - "Delete Multiple Objects" request does not emit a notification
     - Both "Initiate Multipart Upload" and "POST Object" requests will emit an ``s3:ObjectCreated:Post`` notification
 
 
@@ -251,50 +257,50 @@ Events
 ~~~~~~
 
 The events are in JSON format (regardless of the actual endpoint), and share the same structure as the S3-compatible events
-pushed or pulled using the pubsub sync module.
+pushed or pulled using the pubsub sync module. For example:
 
 ::
 
-   {"Records":[  
+   {"Records":[
        {
-           "eventVersion":"2.1"
-           "eventSource":"aws:s3",
-           "awsRegion":"",
-           "eventTime":"",
-           "eventName":"",
-           "userIdentity":{  
-               "principalId":""
+           "eventVersion":"2.1",
+           "eventSource":"ceph:s3",
+           "awsRegion":"us-east-1",
+           "eventTime":"2019-11-22T13:47:35.124724Z",
+           "eventName":"s3:ObjectCreated:Put",
+           "userIdentity":{
+               "principalId":"tester"
            },
            "requestParameters":{
                "sourceIPAddress":""
            },
            "responseElements":{
-               "x-amz-request-id":"",
-               "x-amz-id-2":""
+               "x-amz-request-id":"503a4c37-85eb-47cd-8681-2817e80b4281.5330.903595",
+               "x-amz-id-2":"14d2-zone1-zonegroup1"
            },
            "s3":{
                "s3SchemaVersion":"1.0",
-               "configurationId":"",
+               "configurationId":"mynotif1",
                "bucket":{
-                   "name":"",
+                   "name":"mybucket1",
                    "ownerIdentity":{
-                       "principalId":""
+                       "principalId":"tester"
                    },
-                   "arn":"",
-                   "id:""
+                   "arn":"arn:aws:s3:us-east-1::mybucket1",
+                   "id":"503a4c37-85eb-47cd-8681-2817e80b4281.5332.38"
                },
                "object":{
-                   "key":"",
-                   "size":"",
-                   "eTag":"",
+                   "key":"myimage1.jpg",
+                   "size":"1024",
+                   "eTag":"37b51d194a7513e45b56f6524f2d51f2",
                    "versionId":"",
-                   "sequencer": "",
+                   "sequencer": "F7E6D75DC742D108",
                    "metadata":[],
                    "tags":[]
                }
            },
            "eventId":"",
-           "opaqueData":"",
+           "opaqueData":"me@example.com"
        }
    ]}
 
@@ -303,8 +309,8 @@ pushed or pulled using the pubsub sync module.
 - eventName: for list of supported events see: `S3 Notification Compatibility`_
 - userIdentity.principalId: user that triggered the change
 - requestParameters.sourceIPAddress: not supported
-- responseElements.x-amz-request-id: request ID of the original change 
-- responseElements.x_amz_id_2: RGW on which the change was made 
+- responseElements.x-amz-request-id: request ID of the original change
+- responseElements.x_amz_id_2: RGW on which the change was made
 - s3.configurationId: notification ID that created the event
 - s3.bucket.name: name of the bucket
 - s3.bucket.ownerIdentity.principalId: owner of the bucket
@@ -315,10 +321,10 @@ pushed or pulled using the pubsub sync module.
 - s3.object.eTag: object etag
 - s3.object.version: object version in case of versioned bucket
 - s3.object.sequencer: monotonically increasing identifier of the change per object (hexadecimal format)
-- s3.object.metadata: any metadata set on the object sent as: ``x-amz-meta-`` (an extension to the S3 notification API) 
-- s3.object.tags: any tags set on the objcet (an extension to the S3 notification API)
+- s3.object.metadata: any metadata set on the object sent as: ``x-amz-meta-`` (an extension to the S3 notification API)
+- s3.object.tags: any tags set on the object (an extension to the S3 notification API)
 - s3.eventId: unique ID of the event, that could be used for acking (an extension to the S3 notification API)
-- s3.opaqueData: opaque data is set in the topic configuration and added to all notifications triggered by the ropic (an extension to the S3 notification API)
+- s3.opaqueData: opaque data is set in the topic configuration and added to all notifications triggered by the topic (an extension to the S3 notification API)
 
 .. _PubSub Module : ../pubsub-module
 .. _S3 Notification Compatibility: ../s3-notification-compatibility
index fd3b9f021e64e48e0313bcac1906e16305f60cfb..377afca6dbcc53a83566b80f2ad00e8e16e9b506 100644 (file)
@@ -15,9 +15,8 @@ A push notification mechanism exists too, currently supporting HTTP,
 AMQP0.9.1 and Kafka endpoints. In this case, the events are pushed to an endpoint on top of storing them in Ceph. If events should only be pushed to an endpoint
 and do not need to be stored in Ceph, the `Bucket Notification`_ mechanism should be used instead of pubsub sync module. 
 
-A user can create different topics. A topic entity is defined by its user and its name. A
-user can only manage its own topics, and can only subscribe to events published by buckets
-it owns.
+A user can create different topics. A topic entity is defined by its name and is per tenant. A
+user can only associate its topics (via notification configuration) with buckets it owns.
 
 In order to publish events for specific bucket a notification entity needs to be created. A
 notification can be created on a subset of event types, or for all event types (default).
@@ -31,7 +30,7 @@ mechanisms. This API has two flavors, one is S3-compatible and one is not. The t
 together, although it is recommended to use the S3-compatible one. 
 The S3-compatible API is similar to the one used in the bucket notification mechanism.
 
-Events are stored as RGW objects in a special bucket, under a special user. Events cannot
+Events are stored as RGW objects in a special bucket, under a special user (pubsub control user). Events cannot
 be accessed directly, but need to be pulled and acked using the new REST API.
 
 .. toctree::
@@ -42,7 +41,7 @@ be accessed directly, but need to be pulled and acked using the new REST API.
 PubSub Zone Configuration
 -------------------------
 
-The pubsub sync module requires the creation of a new zone in a `Multisite`_ environment.
+The pubsub sync module requires the creation of a new zone in a :ref:`multisite` environment...
 First, a master zone must exist (see: :ref:`master-zone-label`), 
 then a secondary zone should be created (see :ref:`secondary-zone-label`).
 In the creation of the secondary zone, its tier type must be set to ``pubsub``:
@@ -172,13 +171,15 @@ The endpoint URI may include parameters depending with the type of endpoint:
  - user/password may only be provided over HTTPS. Topic creation request will be rejected if not
  - port defaults to: 5672
  - vhost defaults to: "/"
- - amqp-exchange: the exchanges must exist and be able to route messages based on topics (mandatory parameter for AMQP0.9.1)
+ - amqp-exchange: the exchanges must exist and be able to route messages based on topics (mandatory parameter for AMQP0.9.1). Different topics pointing to the same endpoint must use the same exchange
  - amqp-ack-level: no end2end acking is required, as messages may persist in the broker before delivered into their final destination. Three ack methods exist:
 
   - "none": message is considered "delivered" if sent to broker
   - "broker": message is considered "delivered" if acked by broker (default)
   - "routable": message is considered "delivered" if broker can route to a consumer
 
+.. tip:: The topic-name is used for the AMQP topic ("routing key" for a topic exchange)
+
 - Kafka endpoint 
 
  - URI: ``kafka://[<user>:<password>@]<fqdn>[:<port]``
@@ -251,7 +252,7 @@ Delete the specified topic.
 List Topics
 ```````````
 
-List all topics that user defined.
+List all topics associated with a tenant.
 
 ::
 
@@ -581,6 +582,5 @@ Request parameters:
 
 - event-id: id of event to be acked
 
-.. _Multisite : ../multisite
 .. _Bucket Notification : ../notifications
 .. _Bucket Operations: ../s3/bucketops
index a6eef9a54205c8527e94022b42bff5595d004190..a42c5b332bf5cbec83cd2685f8f0db3447862658 100644 (file)
@@ -9026,13 +9026,8 @@ next:
       cerr << "ERROR: only pubsub tier type supports this command" << std::endl;
       return EINVAL;
     }
-    if (user_id.empty()) {
-      cerr << "ERROR: user id was not provided (via --uid)" << std::endl;
-      return EINVAL;
-    }
-    RGWUserInfo& user_info = user_op.get_user_info();
 
-    RGWUserPubSub ups(store, user_info.user_id);
+    RGWPubSub ps(store, tenant);
 
     rgw_bucket bucket;
 
@@ -9045,7 +9040,7 @@ next:
         return -ret;
       }
 
-      auto b = ups.get_bucket(bucket_info.bucket);
+      auto b = ps.get_bucket(bucket_info.bucket);
       ret = b->get_topics(&result);
       if (ret < 0) {
         cerr << "ERROR: could not get topics: " << cpp_strerror(-ret) << std::endl;
@@ -9053,8 +9048,8 @@ next:
       }
       encode_json("result", result, formatter);
     } else {
-      rgw_pubsub_user_topics result;
-      int ret = ups.get_user_topics(&result);
+      rgw_pubsub_topics result;
+      int ret = ps.get_topics(&result);
       if (ret < 0) {
         cerr << "ERROR: could not get topics: " << cpp_strerror(-ret) << std::endl;
         return -ret;
@@ -9073,14 +9068,9 @@ next:
       cerr << "ERROR: topic name was not provided (via --topic)" << std::endl;
       return EINVAL;
     }
-    if (user_id.empty()) {
-      cerr << "ERROR: user id was not provided (via --uid)" << std::endl;
-      return EINVAL;
-    }
-    RGWUserInfo& user_info = user_op.get_user_info();
-    RGWUserPubSub ups(store, user_info.user_id);
+    RGWPubSub ps(store, tenant);
 
-    ret = ups.create_topic(topic_name);
+    ret = ps.create_topic(topic_name);
     if (ret < 0) {
       cerr << "ERROR: could not create topic: " << cpp_strerror(-ret) << std::endl;
       return -ret;
@@ -9096,15 +9086,11 @@ next:
       cerr << "ERROR: topic name was not provided (via --topic)" << std::endl;
       return EINVAL;
     }
-    if (user_id.empty()) {
-      cerr << "ERROR: user id was not provided (via --uid)" << std::endl;
-      return EINVAL;
-    }
-    RGWUserInfo& user_info = user_op.get_user_info();
-    RGWUserPubSub ups(store, user_info.user_id);
+
+    RGWPubSub ps(store, tenant);
 
     rgw_pubsub_topic_subs topic;
-    ret = ups.get_topic(topic_name, &topic);
+    ret = ps.get_topic(topic_name, &topic);
     if (ret < 0) {
       cerr << "ERROR: could not create topic: " << cpp_strerror(-ret) << std::endl;
       return -ret;
@@ -9122,16 +9108,12 @@ next:
       cerr << "ERROR: topic name was not provided (via --topic)" << std::endl;
       return EINVAL;
     }
-    if (user_id.empty()) {
-      cerr << "ERROR: user id was not provided (via --uid)" << std::endl;
-      return EINVAL;
-    }
     if (bucket_name.empty()) {
       cerr << "ERROR: bucket name was not provided (via --bucket)" << std::endl;
       return EINVAL;
     }
-    RGWUserInfo& user_info = user_op.get_user_info();
-    RGWUserPubSub ups(store, user_info.user_id);
+
+    RGWPubSub ps(store, tenant);
 
     rgw_bucket bucket;
 
@@ -9142,7 +9124,7 @@ next:
       return -ret;
     }
 
-    auto b = ups.get_bucket(bucket_info.bucket);
+    auto b = ps.get_bucket(bucket_info.bucket);
     ret = b->create_notification(topic_name, event_types);
     if (ret < 0) {
       cerr << "ERROR: could not publish bucket: " << cpp_strerror(-ret) << std::endl;
@@ -9159,16 +9141,11 @@ next:
       cerr << "ERROR: topic name was not provided (via --topic)" << std::endl;
       return EINVAL;
     }
-    if (user_id.empty()) {
-      cerr << "ERROR: user id was not provided (via --uid)" << std::endl;
-      return EINVAL;
-    }
     if (bucket_name.empty()) {
       cerr << "ERROR: bucket name was not provided (via --bucket)" << std::endl;
       return EINVAL;
     }
-    RGWUserInfo& user_info = user_op.get_user_info();
-    RGWUserPubSub ups(store, user_info.user_id);
+    RGWPubSub ups(store, tenant);
 
     rgw_bucket bucket;
 
@@ -9196,14 +9173,10 @@ next:
       cerr << "ERROR: topic name was not provided (via --topic)" << std::endl;
       return EINVAL;
     }
-    if (user_id.empty()) {
-      cerr << "ERROR: user id was not provided (via --uid)" << std::endl;
-      return EINVAL;
-    }
-    RGWUserInfo& user_info = user_op.get_user_info();
-    RGWUserPubSub ups(store, user_info.user_id);
 
-    ret = ups.remove_topic(topic_name);
+    RGWPubSub ps(store, tenant);
+
+    ret = ps.remove_topic(topic_name);
     if (ret < 0) {
       cerr << "ERROR: could not remove topic: " << cpp_strerror(-ret) << std::endl;
       return -ret;
@@ -9215,20 +9188,16 @@ next:
       cerr << "ERROR: only pubsub tier type supports this command" << std::endl;
       return EINVAL;
     }
-    if (user_id.empty()) {
-      cerr << "ERROR: user id was not provided (via --uid)" << std::endl;
-      return EINVAL;
-    }
     if (sub_name.empty()) {
       cerr << "ERROR: subscription name was not provided (via --sub-name)" << std::endl;
       return EINVAL;
     }
-    RGWUserInfo& user_info = user_op.get_user_info();
-    RGWUserPubSub ups(store, user_info.user_id);
+
+    RGWPubSub ps(store, tenant);
 
     rgw_pubsub_sub_config sub_conf;
 
-    auto sub = ups.get_sub(sub_name);
+    auto sub = ps.get_sub(sub_name);
     ret = sub->get_conf(&sub_conf);
     if (ret < 0) {
       cerr << "ERROR: could not get subscription info: " << cpp_strerror(-ret) << std::endl;
@@ -9243,10 +9212,6 @@ next:
       cerr << "ERROR: only pubsub tier type supports this command" << std::endl;
       return EINVAL;
     }
-    if (user_id.empty()) {
-      cerr << "ERROR: user id was not provided (via --uid)" << std::endl;
-      return EINVAL;
-    }
     if (sub_name.empty()) {
       cerr << "ERROR: subscription name was not provided (via --sub-name)" << std::endl;
       return EINVAL;
@@ -9255,11 +9220,10 @@ next:
       cerr << "ERROR: topic name was not provided (via --topic)" << std::endl;
       return EINVAL;
     }
-    RGWUserInfo& user_info = user_op.get_user_info();
-    RGWUserPubSub ups(store, user_info.user_id);
+    RGWPubSub ps(store, tenant);
 
     rgw_pubsub_topic_subs topic;
-    int ret = ups.get_topic(topic_name, &topic);
+    int ret = ps.get_topic(topic_name, &topic);
     if (ret < 0) {
       cerr << "ERROR: topic not found" << std::endl;
       return EINVAL;
@@ -9274,12 +9238,12 @@ next:
     auto conf = psmodule->get_effective_conf();
 
     if (dest_config.bucket_name.empty()) {
-      dest_config.bucket_name = string(conf["data_bucket_prefix"]) + user_info.user_id.to_str() + "-" + topic.topic.name;
+      dest_config.bucket_name = string(conf["data_bucket_prefix"]) + user_id.to_str() + "-" + topic.topic.name;
     }
     if (dest_config.oid_prefix.empty()) {
       dest_config.oid_prefix = conf["data_oid_prefix"];
     }
-    auto sub = ups.get_sub(sub_name);
+    auto sub = ps.get_sub(sub_name);
     ret = sub->subscribe(topic_name, dest_config);
     if (ret < 0) {
       cerr << "ERROR: could not store subscription info: " << cpp_strerror(-ret) << std::endl;
@@ -9292,18 +9256,14 @@ next:
       cerr << "ERROR: only pubsub tier type supports this command" << std::endl;
       return EINVAL;
     }
-    if (user_id.empty()) {
-      cerr << "ERROR: user id was not provided (via --uid)" << std::endl;
-      return EINVAL;
-    }
     if (sub_name.empty()) {
       cerr << "ERROR: subscription name was not provided (via --sub-name)" << std::endl;
       return EINVAL;
     }
-    RGWUserInfo& user_info = user_op.get_user_info();
-    RGWUserPubSub ups(store, user_info.user_id);
 
-    auto sub = ups.get_sub(sub_name);
+    RGWPubSub ps(store, tenant);
+
+    auto sub = ps.get_sub(sub_name);
     ret = sub->unsubscribe(topic_name);
     if (ret < 0) {
       cerr << "ERROR: could not get subscription info: " << cpp_strerror(-ret) << std::endl;
@@ -9316,21 +9276,17 @@ next:
       cerr << "ERROR: only pubsub tier type supports this command" << std::endl;
       return EINVAL;
     }
-    if (user_id.empty()) {
-      cerr << "ERROR: user id was not provided (via --uid)" << std::endl;
-      return EINVAL;
-    }
     if (sub_name.empty()) {
       cerr << "ERROR: subscription name was not provided (via --sub-name)" << std::endl;
       return EINVAL;
     }
-    RGWUserInfo& user_info = user_op.get_user_info();
-    RGWUserPubSub ups(store, user_info.user_id);
+
+    RGWPubSub ps(store, tenant);
 
     if (!max_entries_specified) {
-      max_entries = RGWUserPubSub::Sub::DEFAULT_MAX_EVENTS;
+      max_entries = RGWPubSub::Sub::DEFAULT_MAX_EVENTS;
     }
-    auto sub = ups.get_sub(sub_name);
+    auto sub = ps.get_sub_with_events(sub_name);
     ret = sub->list_events(marker, max_entries);
     if (ret < 0) {
       cerr << "ERROR: could not list events: " << cpp_strerror(-ret) << std::endl;
@@ -9345,10 +9301,6 @@ next:
       cerr << "ERROR: only pubsub tier type supports this command" << std::endl;
       return EINVAL;
     }
-    if (user_id.empty()) {
-      cerr << "ERROR: user id was not provided (via --uid)" << std::endl;
-      return EINVAL;
-    }
     if (sub_name.empty()) {
       cerr << "ERROR: subscription name was not provided (via --sub-name)" << std::endl;
       return EINVAL;
@@ -9357,10 +9309,10 @@ next:
       cerr << "ERROR: event id was not provided (via --event-id)" << std::endl;
       return EINVAL;
     }
-    RGWUserInfo& user_info = user_op.get_user_info();
-    RGWUserPubSub ups(store, user_info.user_id);
 
-    auto sub = ups.get_sub(sub_name);
+    RGWPubSub ps(store, tenant);
+
+    auto sub = ps.get_sub_with_events(sub_name);
     ret = sub->remove_event(event_id);
     if (ret < 0) {
       cerr << "ERROR: could not remove event: " << cpp_strerror(-ret) << std::endl;
index 25b5c533bfdfd5e444a4c3da137bb5e0fb35f0cb..7945d051f762af0d2a636d7a92909eb4efd16fd0 100644 (file)
@@ -70,8 +70,8 @@ int publish(const req_state* s,
         const std::string& etag, 
         EventType event_type,
         rgw::sal::RGWRadosStore* store) {
-    RGWUserPubSub ps_user(store, s->user->get_id());
-    RGWUserPubSub::Bucket ps_bucket(&ps_user, s->bucket);
+    RGWPubSub ps_user(store, s->user->get_id().tenant);
+    RGWPubSub::Bucket ps_bucket(&ps_user, s->bucket);
     rgw_pubsub_bucket_topics bucket_topics;
     auto rc = ps_bucket.get_topics(&bucket_topics);
     if (rc < 0) {
index 947578b2831a1a75b2ed9273f2100235569bdabd..ca58a7f0384f41b43bf1a29226e4f681844d2ef0 100644 (file)
@@ -341,7 +341,7 @@ void rgw_pubsub_bucket_topics::dump(Formatter *f) const
   }
 }
 
-void rgw_pubsub_user_topics::dump(Formatter *f) const
+void rgw_pubsub_topics::dump(Formatter *f) const
 {
   Formatter::ArraySection s(*f, "topics");
   for (auto& t : topics) {
@@ -349,7 +349,7 @@ void rgw_pubsub_user_topics::dump(Formatter *f) const
   }
 }
 
-void rgw_pubsub_user_topics::dump_xml(Formatter *f) const
+void rgw_pubsub_topics::dump_xml(Formatter *f) const
 {
   for (auto& t : topics) {
     encode_xml("member", t.second.topic, f);
@@ -381,14 +381,15 @@ void rgw_pubsub_sub_config::dump(Formatter *f) const
   encode_json("s3_id", s3_id, f);
 }
 
-RGWUserPubSub::RGWUserPubSub(rgw::sal::RGWRadosStore* _store, const rgw_user& _user) : 
+RGWPubSub::RGWPubSub(rgw::sal::RGWRadosStore* _store, const std::string& _tenant) : 
                             store(_store),
-                            user(_user),
+                            tenant(_tenant),
                             obj_ctx(store->svc()->sysobj->init_obj_ctx()) {
-    get_user_meta_obj(&user_meta_obj);
+    get_meta_obj(&meta_obj);
 }
 
-int RGWUserPubSub::remove(const rgw_raw_obj& obj, RGWObjVersionTracker *objv_tracker)
+int RGWPubSub::remove(const rgw_raw_obj& obj,
+                         RGWObjVersionTracker *objv_tracker)
 {
   int ret = rgw_delete_system_obj(store->svc()->sysobj, obj.pool, obj.oid, objv_tracker);
   if (ret < 0) {
@@ -398,9 +399,9 @@ int RGWUserPubSub::remove(const rgw_raw_obj& obj, RGWObjVersionTracker *objv_tra
   return 0;
 }
 
-int RGWUserPubSub::read_user_topics(rgw_pubsub_user_topics *result, RGWObjVersionTracker *objv_tracker)
+int RGWPubSub::read_topics(rgw_pubsub_topics *result, RGWObjVersionTracker *objv_tracker)
 {
-  int ret = read(user_meta_obj, result, objv_tracker);
+  int ret = read(meta_obj, result, objv_tracker);
   if (ret < 0) {
     ldout(store->ctx(), 10) << "WARNING: failed to read topics info: ret=" << ret << dendl;
     return ret;
@@ -408,9 +409,10 @@ int RGWUserPubSub::read_user_topics(rgw_pubsub_user_topics *result, RGWObjVersio
   return 0;
 }
 
-int RGWUserPubSub::write_user_topics(const rgw_pubsub_user_topics& topics, RGWObjVersionTracker *objv_tracker)
+int RGWPubSub::write_topics(const rgw_pubsub_topics& topics,
+                                    RGWObjVersionTracker *objv_tracker)
 {
-  int ret = write(user_meta_obj, topics, objv_tracker);
+  int ret = write(meta_obj, topics, objv_tracker);
   if (ret < 0 && ret != -ENOENT) {
     ldout(store->ctx(), 1) << "ERROR: failed to write topics info: ret=" << ret << dendl;
     return ret;
@@ -418,12 +420,12 @@ int RGWUserPubSub::write_user_topics(const rgw_pubsub_user_topics& topics, RGWOb
   return 0;
 }
 
-int RGWUserPubSub::get_user_topics(rgw_pubsub_user_topics *result)
+int RGWPubSub::get_topics(rgw_pubsub_topics *result)
 {
-  return read_user_topics(result, nullptr);
+  return read_topics(result, nullptr);
 }
 
-int RGWUserPubSub::Bucket::read_topics(rgw_pubsub_bucket_topics *result, RGWObjVersionTracker *objv_tracker)
+int RGWPubSub::Bucket::read_topics(rgw_pubsub_bucket_topics *result, RGWObjVersionTracker *objv_tracker)
 {
   int ret = ps->read(bucket_meta_obj, result, objv_tracker);
   if (ret < 0 && ret != -ENOENT) {
@@ -433,7 +435,8 @@ int RGWUserPubSub::Bucket::read_topics(rgw_pubsub_bucket_topics *result, RGWObjV
   return 0;
 }
 
-int RGWUserPubSub::Bucket::write_topics(const rgw_pubsub_bucket_topics& topics, RGWObjVersionTracker *objv_tracker)
+int RGWPubSub::Bucket::write_topics(const rgw_pubsub_bucket_topics& topics,
+                                       RGWObjVersionTracker *objv_tracker)
 {
   int ret = ps->write(bucket_meta_obj, topics, objv_tracker);
   if (ret < 0) {
@@ -444,15 +447,15 @@ int RGWUserPubSub::Bucket::write_topics(const rgw_pubsub_bucket_topics& topics,
   return 0;
 }
 
-int RGWUserPubSub::Bucket::get_topics(rgw_pubsub_bucket_topics *result)
+int RGWPubSub::Bucket::get_topics(rgw_pubsub_bucket_topics *result)
 {
   return read_topics(result, nullptr);
 }
 
-int RGWUserPubSub::get_topic(const string& name, rgw_pubsub_topic_subs *result)
+int RGWPubSub::get_topic(const string& name, rgw_pubsub_topic_subs *result)
 {
-  rgw_pubsub_user_topics topics;
-  int ret = get_user_topics(&topics);
+  rgw_pubsub_topics topics;
+  int ret = get_topics(&topics);
   if (ret < 0) {
     ldout(store->ctx(), 1) << "ERROR: failed to read topics info: ret=" << ret << dendl;
     return ret;
@@ -468,10 +471,10 @@ int RGWUserPubSub::get_topic(const string& name, rgw_pubsub_topic_subs *result)
   return 0;
 }
 
-int RGWUserPubSub::get_topic(const string& name, rgw_pubsub_topic *result)
+int RGWPubSub::get_topic(const string& name, rgw_pubsub_topic *result)
 {
-  rgw_pubsub_user_topics topics;
-  int ret = get_user_topics(&topics);
+  rgw_pubsub_topics topics;
+  int ret = get_topics(&topics);
   if (ret < 0) {
     ldout(store->ctx(), 1) << "ERROR: failed to read topics info: ret=" << ret << dendl;
     return ret;
@@ -487,15 +490,15 @@ int RGWUserPubSub::get_topic(const string& name, rgw_pubsub_topic *result)
   return 0;
 }
 
-int RGWUserPubSub::Bucket::create_notification(const string& topic_name, const rgw::notify::EventTypeList& events) {
-    return create_notification(topic_name, events, std::nullopt, "");
+int RGWPubSub::Bucket::create_notification(const string& topic_name, const rgw::notify::EventTypeList& events) {
+  return create_notification(topic_name, events, std::nullopt, "");
 }
 
-int RGWUserPubSub::Bucket::create_notification(const string& topic_name, const rgw::notify::EventTypeList& events, OptionalFilter s3_filter, const std::string& notif_name) {
-  rgw_pubsub_topic_subs user_topic_info;
+int RGWPubSub::Bucket::create_notification(const string& topic_name,const rgw::notify::EventTypeList& events, OptionalFilter s3_filter, const std::string& notif_name) {
+  rgw_pubsub_topic_subs topic_info;
   rgw::sal::RGWRadosStore *store = ps->store;
 
-  int ret = ps->get_topic(topic_name, &user_topic_info);
+  int ret = ps->get_topic(topic_name, &topic_info);
   if (ret < 0) {
     ldout(store->ctx(), 1) << "ERROR: failed to read topic '" << topic_name << "' info: ret=" << ret << dendl;
     return ret;
@@ -515,7 +518,7 @@ int RGWUserPubSub::Bucket::create_notification(const string& topic_name, const r
     bucket.name << "'" << dendl;
 
   auto& topic_filter = bucket_topics.topics[topic_name];
-  topic_filter.topic = user_topic_info.topic;
+  topic_filter.topic = topic_info.topic;
   topic_filter.events = events;
   topic_filter.s3_id = notif_name;
   if (s3_filter) {
@@ -533,12 +536,12 @@ int RGWUserPubSub::Bucket::create_notification(const string& topic_name, const r
   return 0;
 }
 
-int RGWUserPubSub::Bucket::remove_notification(const string& topic_name)
+int RGWPubSub::Bucket::remove_notification(const string& topic_name)
 {
-  rgw_pubsub_topic_subs user_topic_info;
+  rgw_pubsub_topic_subs topic_info;
   rgw::sal::RGWRadosStore *store = ps->store;
 
-  int ret = ps->get_topic(topic_name, &user_topic_info);
+  int ret = ps->get_topic(topic_name, &topic_info);
   if (ret < 0) {
     ldout(store->ctx(), 1) << "ERROR: failed to read topic info: ret=" << ret << dendl;
     return ret;
@@ -564,15 +567,15 @@ int RGWUserPubSub::Bucket::remove_notification(const string& topic_name)
   return 0;
 }
 
-int RGWUserPubSub::create_topic(const string& name) {
+int RGWPubSub::create_topic(const string& name) {
   return create_topic(name, rgw_pubsub_sub_dest(), "", "");
 }
 
-int RGWUserPubSub::create_topic(const string& name, const rgw_pubsub_sub_dest& dest, const std::string& arn, const std::string& opaque_data) {
+int RGWPubSub::create_topic(const string& name, const rgw_pubsub_sub_dest& dest, const std::string& arn, const std::string& opaque_data) {
   RGWObjVersionTracker objv_tracker;
-  rgw_pubsub_user_topics topics;
+  rgw_pubsub_topics topics;
 
-  int ret = read_user_topics(&topics, &objv_tracker);
+  int ret = read_topics(&topics, &objv_tracker);
   if (ret < 0 && ret != -ENOENT) {
     // its not an error if not topics exist, we create one
     ldout(store->ctx(), 1) << "ERROR: failed to read topics info: ret=" << ret << dendl;
@@ -580,13 +583,13 @@ int RGWUserPubSub::create_topic(const string& name, const rgw_pubsub_sub_dest& d
   }
  
   rgw_pubsub_topic_subs& new_topic = topics.topics[name];
-  new_topic.topic.user = user;
+  new_topic.topic.user = rgw_user("", tenant);
   new_topic.topic.name = name;
   new_topic.topic.dest = dest;
   new_topic.topic.arn = arn;
   new_topic.topic.opaque_data = opaque_data;
 
-  ret = write_user_topics(topics, &objv_tracker);
+  ret = write_topics(topics, &objv_tracker);
   if (ret < 0) {
     ldout(store->ctx(), 1) << "ERROR: failed to write topics info: ret=" << ret << dendl;
     return ret;
@@ -595,12 +598,12 @@ int RGWUserPubSub::create_topic(const string& name, const rgw_pubsub_sub_dest& d
   return 0;
 }
 
-int RGWUserPubSub::remove_topic(const string& name)
+int RGWPubSub::remove_topic(const string& name)
 {
   RGWObjVersionTracker objv_tracker;
-  rgw_pubsub_user_topics topics;
+  rgw_pubsub_topics topics;
 
-  int ret = read_user_topics(&topics, &objv_tracker);
+  int ret = read_topics(&topics, &objv_tracker);
   if (ret < 0 && ret != -ENOENT) {
     ldout(store->ctx(), 1) << "ERROR: failed to read topics info: ret=" << ret << dendl;
     return ret;
@@ -612,7 +615,7 @@ int RGWUserPubSub::remove_topic(const string& name)
 
   topics.topics.erase(name);
 
-  ret = write_user_topics(topics, &objv_tracker);
+  ret = write_topics(topics, &objv_tracker);
   if (ret < 0) {
     ldout(store->ctx(), 1) << "ERROR: failed to remove topics info: ret=" << ret << dendl;
     return ret;
@@ -621,7 +624,7 @@ int RGWUserPubSub::remove_topic(const string& name)
   return 0;
 }
 
-int RGWUserPubSub::Sub::read_sub(rgw_pubsub_sub_config *result, RGWObjVersionTracker *objv_tracker)
+int RGWPubSub::Sub::read_sub(rgw_pubsub_sub_config *result, RGWObjVersionTracker *objv_tracker)
 {
   int ret = ps->read(sub_meta_obj, result, objv_tracker);
   if (ret < 0 && ret != -ENOENT) {
@@ -631,7 +634,8 @@ int RGWUserPubSub::Sub::read_sub(rgw_pubsub_sub_config *result, RGWObjVersionTra
   return 0;
 }
 
-int RGWUserPubSub::Sub::write_sub(const rgw_pubsub_sub_config& sub_conf, RGWObjVersionTracker *objv_tracker)
+int RGWPubSub::Sub::write_sub(const rgw_pubsub_sub_config& sub_conf,
+                                 RGWObjVersionTracker *objv_tracker)
 {
   int ret = ps->write(sub_meta_obj, sub_conf, objv_tracker);
   if (ret < 0) {
@@ -642,7 +646,7 @@ int RGWUserPubSub::Sub::write_sub(const rgw_pubsub_sub_config& sub_conf, RGWObjV
   return 0;
 }
 
-int RGWUserPubSub::Sub::remove_sub(RGWObjVersionTracker *objv_tracker)
+int RGWPubSub::Sub::remove_sub(RGWObjVersionTracker *objv_tracker)
 {
   int ret = ps->remove(sub_meta_obj, objv_tracker);
   if (ret < 0) {
@@ -653,18 +657,18 @@ int RGWUserPubSub::Sub::remove_sub(RGWObjVersionTracker *objv_tracker)
   return 0;
 }
 
-int RGWUserPubSub::Sub::get_conf(rgw_pubsub_sub_config *result)
+int RGWPubSub::Sub::get_conf(rgw_pubsub_sub_config *result)
 {
   return read_sub(result, nullptr);
 }
 
-int RGWUserPubSub::Sub::subscribe(const string& topic, const rgw_pubsub_sub_dest& dest, const std::string& s3_id)
+int RGWPubSub::Sub::subscribe(const string& topic, const rgw_pubsub_sub_dest& dest, const std::string& s3_id)
 {
-  RGWObjVersionTracker user_objv_tracker;
-  rgw_pubsub_user_topics topics;
+  RGWObjVersionTracker objv_tracker;
+  rgw_pubsub_topics topics;
   rgw::sal::RGWRadosStore *store = ps->store;
 
-  int ret = ps->read_user_topics(&topics, &user_objv_tracker);
+  int ret = ps->read_topics(&topics, &objv_tracker);
   if (ret < 0) {
     ldout(store->ctx(), 1) << "ERROR: failed to read topics info: ret=" << ret << dendl;
     return ret != -ENOENT ? ret : -EINVAL;
@@ -680,7 +684,7 @@ int RGWUserPubSub::Sub::subscribe(const string& topic, const rgw_pubsub_sub_dest
 
   rgw_pubsub_sub_config sub_conf;
 
-  sub_conf.user = ps->user;
+  sub_conf.user = rgw_user("", ps->tenant);
   sub_conf.name = sub;
   sub_conf.topic = topic;
   sub_conf.dest = dest;
@@ -688,7 +692,7 @@ int RGWUserPubSub::Sub::subscribe(const string& topic, const rgw_pubsub_sub_dest
 
   t.subs.insert(sub);
 
-  ret = ps->write_user_topics(topics, &user_objv_tracker);
+  ret = ps->write_topics(topics, &objv_tracker);
   if (ret < 0) {
     ldout(store->ctx(), 1) << "ERROR: failed to write topics info: ret=" << ret << dendl;
     return ret;
@@ -702,7 +706,7 @@ int RGWUserPubSub::Sub::subscribe(const string& topic, const rgw_pubsub_sub_dest
   return 0;
 }
 
-int RGWUserPubSub::Sub::unsubscribe(const string& _topic)
+int RGWPubSub::Sub::unsubscribe(const string& _topic)
 {
   string topic = _topic;
   RGWObjVersionTracker sobjv_tracker;
@@ -719,9 +723,9 @@ int RGWUserPubSub::Sub::unsubscribe(const string& _topic)
   }
 
   RGWObjVersionTracker objv_tracker;
-  rgw_pubsub_user_topics topics;
+  rgw_pubsub_topics topics;
 
-  int ret = ps->read_user_topics(&topics, &objv_tracker);
+  int ret = ps->read_topics(&topics, &objv_tracker);
   if (ret < 0) {
     // not an error - could be that topic was already deleted
     ldout(store->ctx(), 10) << "WARNING: failed to read topics info: ret=" << ret << dendl;
@@ -732,7 +736,7 @@ int RGWUserPubSub::Sub::unsubscribe(const string& _topic)
 
       t.subs.erase(sub);
 
-      ret = ps->write_user_topics(topics, &objv_tracker);
+      ret = ps->write_topics(topics, &objv_tracker);
       if (ret < 0) {
         ldout(store->ctx(), 1) << "ERROR: failed to write topics info: ret=" << ret << dendl;
         return ret;
@@ -749,7 +753,7 @@ int RGWUserPubSub::Sub::unsubscribe(const string& _topic)
 }
 
 template<typename EventType>
-void RGWUserPubSub::SubWithEvents<EventType>::list_events_result::dump(Formatter *f) const
+void RGWPubSub::SubWithEvents<EventType>::list_events_result::dump(Formatter *f) const
 {
   encode_json("next_marker", next_marker, f);
   encode_json("is_truncated", is_truncated, f);
@@ -761,7 +765,7 @@ void RGWUserPubSub::SubWithEvents<EventType>::list_events_result::dump(Formatter
 }
 
 template<typename EventType>
-int RGWUserPubSub::SubWithEvents<EventType>::list_events(const string& marker, int max_events)
+int RGWPubSub::SubWithEvents<EventType>::list_events(const string& marker, int max_events)
 {
   RGWRados *store = ps->store->getRados();
   rgw_pubsub_sub_config sub_conf;
@@ -826,7 +830,7 @@ int RGWUserPubSub::SubWithEvents<EventType>::list_events(const string& marker, i
 }
 
 template<typename EventType>
-int RGWUserPubSub::SubWithEvents<EventType>::remove_event(const string& event_id)
+int RGWPubSub::SubWithEvents<EventType>::remove_event(const string& event_id)
 {
   rgw::sal::RGWRadosStore *store = ps->store;
   rgw_pubsub_sub_config sub_conf;
@@ -864,25 +868,25 @@ int RGWUserPubSub::SubWithEvents<EventType>::remove_event(const string& event_id
   return 0;
 }
 
-void RGWUserPubSub::get_user_meta_obj(rgw_raw_obj *obj) const {
-  *obj = rgw_raw_obj(store->svc()->zone->get_zone_params().log_pool, user_meta_oid());
+void RGWPubSub::get_meta_obj(rgw_raw_obj *obj) const {
+  *obj = rgw_raw_obj(store->svc()->zone->get_zone_params().log_pool, meta_oid());
 }
 
-void RGWUserPubSub::get_bucket_meta_obj(const rgw_bucket& bucket, rgw_raw_obj *obj) const {
+void RGWPubSub::get_bucket_meta_obj(const rgw_bucket& bucket, rgw_raw_obj *obj) const {
   *obj = rgw_raw_obj(store->svc()->zone->get_zone_params().log_pool, bucket_meta_oid(bucket));
 }
 
-void RGWUserPubSub::get_sub_meta_obj(const string& name, rgw_raw_obj *obj) const {
+void RGWPubSub::get_sub_meta_obj(const string& name, rgw_raw_obj *obj) const {
   *obj = rgw_raw_obj(store->svc()->zone->get_zone_params().log_pool, sub_meta_oid(name));
 }
 
 template<typename EventType>
-void RGWUserPubSub::SubWithEvents<EventType>::dump(Formatter* f) const {
+void RGWPubSub::SubWithEvents<EventType>::dump(Formatter* f) const {
   list.dump(f);
 }
 
 // explicit instantiation for the only two possible types
 // no need to move implementation to header
-template class RGWUserPubSub::SubWithEvents<rgw_pubsub_event>;
-template class RGWUserPubSub::SubWithEvents<rgw_pubsub_s3_record>;
+template class RGWPubSub::SubWithEvents<rgw_pubsub_event>;
+template class RGWPubSub::SubWithEvents<rgw_pubsub_s3_record>;
 
index 256c5918585b7e6e22ebf7b548f26cf78fad0763..db7e7901027055c23ea1b253d5a8f9220b52a28e 100644 (file)
@@ -467,7 +467,7 @@ struct rgw_pubsub_topic {
   }
 
   string to_str() const {
-    return user.to_str() + "/" + name;
+    return user.tenant + "/" + name;
   }
 
   void dump(Formatter *f) const;
@@ -560,7 +560,7 @@ struct rgw_pubsub_bucket_topics {
 };
 WRITE_CLASS_ENCODER(rgw_pubsub_bucket_topics)
 
-struct rgw_pubsub_user_topics {
+struct rgw_pubsub_topics {
   std::map<std::string, rgw_pubsub_topic_subs> topics;
 
   void encode(bufferlist& bl) const {
@@ -578,62 +578,65 @@ struct rgw_pubsub_user_topics {
   void dump(Formatter *f) const;
   void dump_xml(Formatter *f) const;
 };
-WRITE_CLASS_ENCODER(rgw_pubsub_user_topics)
+WRITE_CLASS_ENCODER(rgw_pubsub_topics)
 
-static std::string pubsub_user_oid_prefix = "pubsub.user.";
+static std::string pubsub_oid_prefix = "pubsub.";
 
-class RGWUserPubSub
+class RGWPubSub
 {
   friend class Bucket;
 
   rgw::sal::RGWRadosStore *store;
-  rgw_user user;
+  const std::string tenant;
   RGWSysObjectCtx obj_ctx;
 
-  rgw_raw_obj user_meta_obj;
+  rgw_raw_obj meta_obj;
 
-  std::string user_meta_oid() const {
-    return pubsub_user_oid_prefix + user.to_str();
+  std::string meta_oid() const {
+    return pubsub_oid_prefix + tenant;
   }
 
   std::string bucket_meta_oid(const rgw_bucket& bucket) const {
-    return pubsub_user_oid_prefix + user.to_str() + ".bucket." + bucket.name + "/" + bucket.bucket_id;
+    return pubsub_oid_prefix + tenant + ".bucket." + bucket.name + "/" + bucket.bucket_id;
   }
 
   std::string sub_meta_oid(const string& name) const {
-    return pubsub_user_oid_prefix + user.to_str() + ".sub." + name;
+    return pubsub_oid_prefix + tenant + ".sub." + name;
   }
 
   template <class T>
-  int read(const rgw_raw_obj& obj, T *data, RGWObjVersionTracker *objv_tracker);
+  int read(const rgw_raw_obj& obj, T* data, RGWObjVersionTracker* objv_tracker);
 
   template <class T>
-  int write(const rgw_raw_obj& obj, const T& info, RGWObjVersionTracker *obj_tracker);
+  int write(const rgw_raw_obj& obj, const T& info,
+           RGWObjVersionTracker* obj_tracker);
 
-  int remove(const rgw_raw_obj& obj, RGWObjVersionTracker *objv_tracker);
+  int remove(const rgw_raw_obj& obj, RGWObjVersionTrackerobjv_tracker);
 
-  int read_user_topics(rgw_pubsub_user_topics *result, RGWObjVersionTracker *objv_tracker);
-  int write_user_topics(const rgw_pubsub_user_topics& topics, RGWObjVersionTracker *objv_tracker);
+  int read_topics(rgw_pubsub_topics *result, RGWObjVersionTracker* objv_tracker);
+  int write_topics(const rgw_pubsub_topics& topics,
+                       RGWObjVersionTracker* objv_tracker);
 
 public:
-  RGWUserPubSub(rgw::sal::RGWRadosStore *_store, const rgw_user& _user);
+  RGWPubSub(rgw::sal::RGWRadosStore *_store, const std::string& tenant);
 
   class Bucket {
-    friend class RGWUserPubSub;
-    RGWUserPubSub *ps;
+    friend class RGWPubSub;
+    RGWPubSub *ps;
     rgw_bucket bucket;
     rgw_raw_obj bucket_meta_obj;
 
     // read the list of topics associated with a bucket and populate into result
     // use version tacker to enforce atomicity between read/write
     // return 0 on success or if no topic was associated with the bucket, error code otherwise
-    int read_topics(rgw_pubsub_bucket_topics *result, RGWObjVersionTracker *objv_tracker);
+    int read_topics(rgw_pubsub_bucket_topics *result, RGWObjVersionTrackerobjv_tracker);
     // set the list of topics associated with a bucket
     // use version tacker to enforce atomicity between read/write
     // return 0 on success, error code otherwise
-    int write_topics(const rgw_pubsub_bucket_topics& topics, RGWObjVersionTracker *objv_tracker);
+    int write_topics(const rgw_pubsub_bucket_topics& topics,
+                    RGWObjVersionTracker* objv_tracker);
   public:
-    Bucket(RGWUserPubSub *_ps, const rgw_bucket& _bucket) : ps(_ps), bucket(_bucket) {
+    Bucket(RGWPubSub *_ps, const rgw_bucket& _bucket) : ps(_ps), bucket(_bucket) {
       ps->get_bucket_meta_obj(bucket, &bucket_meta_obj);
     }
 
@@ -657,17 +660,18 @@ public:
 
   // base class for subscription
   class Sub {
-    friend class RGWUserPubSub;
+    friend class RGWPubSub;
   protected:
-    RGWUserPubSub* const ps;
+    RGWPubSub* const ps;
     const std::string sub;
     rgw_raw_obj sub_meta_obj;
 
-    int read_sub(rgw_pubsub_sub_config *result, RGWObjVersionTracker *objv_tracker);
-    int write_sub(const rgw_pubsub_sub_config& sub_conf, RGWObjVersionTracker *objv_tracker);
-    int remove_sub(RGWObjVersionTracker *objv_tracker);
+    int read_sub(rgw_pubsub_sub_config *result, RGWObjVersionTracker* objv_tracker);
+    int write_sub(const rgw_pubsub_sub_config& sub_conf,
+                 RGWObjVersionTracker* objv_tracker);
+    int remove_sub(RGWObjVersionTracker* objv_tracker);
   public:
-    Sub(RGWUserPubSub *_ps, const std::string& _sub) : ps(_ps), sub(_sub) {
+    Sub(RGWPubSub *_ps, const std::string& _sub) : ps(_ps), sub(_sub) {
       ps->get_sub_meta_obj(sub, &sub_meta_obj);
     }
 
@@ -696,7 +700,7 @@ public:
     } list;
 
   public:
-    SubWithEvents(RGWUserPubSub *_ps, const string& _sub) : Sub(_ps, _sub) {}
+    SubWithEvents(RGWPubSub *_ps, const string& _sub) : Sub(_ps, _sub) {}
 
     virtual ~SubWithEvents() = default;
     
@@ -728,14 +732,14 @@ public:
     return std::make_shared<SubWithEvents<rgw_pubsub_s3_record>>(this, sub);
   }
 
-  void get_user_meta_obj(rgw_raw_obj *obj) const;
+  void get_meta_obj(rgw_raw_obj *obj) const;
   void get_bucket_meta_obj(const rgw_bucket& bucket, rgw_raw_obj *obj) const;
 
   void get_sub_meta_obj(const string& name, rgw_raw_obj *obj) const;
 
-  // get all topics defined for the user and populate them into "result"
+  // get all topics (per tenant, if used)) and populate them into "result"
   // return 0 on success or if no topics exist, error code otherwise
-  int get_user_topics(rgw_pubsub_user_topics *result);
+  int get_topics(rgw_pubsub_topics *result);
   // get a topic with its subscriptions by its name and populate it into "result"
   // return -ENOENT if the topic does not exists 
   // return 0 on success, error code otherwise
@@ -760,7 +764,7 @@ public:
 
 
 template <class T>
-int RGWUserPubSub::read(const rgw_raw_obj& obj, T *result, RGWObjVersionTracker *objv_tracker)
+int RGWPubSub::read(const rgw_raw_obj& obj, T* result, RGWObjVersionTracker* objv_tracker)
 {
   bufferlist bl;
   int ret = rgw_get_system_obj(obj_ctx,
@@ -783,7 +787,8 @@ int RGWUserPubSub::read(const rgw_raw_obj& obj, T *result, RGWObjVersionTracker
 }
 
 template <class T>
-int RGWUserPubSub::write(const rgw_raw_obj& obj, const T& info, RGWObjVersionTracker *objv_tracker)
+int RGWPubSub::write(const rgw_raw_obj& obj, const T& info,
+                        RGWObjVersionTracker* objv_tracker)
 {
   bufferlist bl;
   encode(info, bl);
index 65f783a293ef4b5b9f3a8373484e6124b8654fa2..8c757b7c9c6e4ed82a88e143fb3390834f341b54 100644 (file)
@@ -429,8 +429,8 @@ void RGWPSCreateNotif_ObjStore_S3::execute() {
     return;
   }
 
-  ups.emplace(store, s->owner.get_id());
-  auto b = ups->get_bucket(bucket_info.bucket);
+  ps.emplace(store, s->owner.get_id().tenant);
+  auto b = ps->get_bucket(bucket_info.bucket);
   ceph_assert(b);
   std::string data_bucket_prefix = "";
   std::string data_oid_prefix = "";
@@ -476,7 +476,7 @@ void RGWPSCreateNotif_ObjStore_S3::execute() {
 
     // get topic information. destination information is stored in the topic
     rgw_pubsub_topic topic_info;  
-    op_ret = ups->get_topic(topic_name, &topic_info);
+    op_ret = ps->get_topic(topic_name, &topic_info);
     if (op_ret < 0) {
       ldout(s->cct, 1) << "failed to get topic '" << topic_name << "', ret=" << op_ret << dendl;
       return;
@@ -491,7 +491,7 @@ void RGWPSCreateNotif_ObjStore_S3::execute() {
     // generate the internal topic. destination is stored here for the "push-only" case
     // when no subscription exists
     // ARN is cached to make the "GET" method faster
-    op_ret = ups->create_topic(unique_topic_name, topic_info.dest, topic_info.arn, topic_info.opaque_data);
+    op_ret = ps->create_topic(unique_topic_name, topic_info.dest, topic_info.arn, topic_info.opaque_data);
     if (op_ret < 0) {
       ldout(s->cct, 1) << "failed to auto-generate unique topic '" << unique_topic_name << 
         "', ret=" << op_ret << dendl;
@@ -505,7 +505,7 @@ void RGWPSCreateNotif_ObjStore_S3::execute() {
       ldout(s->cct, 1) << "failed to auto-generate notification for unique topic '" << unique_topic_name <<
         "', ret=" << op_ret << dendl;
       // rollback generated topic (ignore return value)
-      ups->remove_topic(unique_topic_name);
+      ps->remove_topic(unique_topic_name);
       return;
     }
     ldout(s->cct, 20) << "successfully auto-generated notification for unique topic '" << unique_topic_name << "'" << dendl;
@@ -515,14 +515,14 @@ void RGWPSCreateNotif_ObjStore_S3::execute() {
       rgw_pubsub_sub_dest dest = topic_info.dest;
       dest.bucket_name = data_bucket_prefix + s->owner.get_id().to_str() + "-" + unique_topic_name;
       dest.oid_prefix = data_oid_prefix + notif_name + "/";
-      auto sub = ups->get_sub(notif_name);
+      auto sub = ps->get_sub(notif_name);
       op_ret = sub->subscribe(unique_topic_name, dest, notif_name);
       if (op_ret < 0) {
         ldout(s->cct, 1) << "failed to auto-generate subscription '" << notif_name << "', ret=" << op_ret << dendl;
         // rollback generated notification (ignore return value)
         b->remove_notification(unique_topic_name);
         // rollback generated topic (ignore return value)
-        ups->remove_topic(unique_topic_name);
+        ps->remove_topic(unique_topic_name);
         return;
       }
       ldout(s->cct, 20) << "successfully auto-generated subscription '" << notif_name << "'" << dendl;
@@ -550,12 +550,12 @@ private:
     return 0;
   }
 
-  void remove_notification_by_topic(const std::string& topic_name, const RGWUserPubSub::BucketRef& b) {
+  void remove_notification_by_topic(const std::string& topic_name, const RGWPubSub::BucketRef& b) {
     op_ret = b->remove_notification(topic_name);
     if (op_ret < 0) {
       ldout(s->cct, 1) << "failed to remove notification of topic '" << topic_name << "', ret=" << op_ret << dendl;
     }
-    op_ret = ups->remove_topic(topic_name);
+    op_ret = ps->remove_topic(topic_name);
     if (op_ret < 0) {
       ldout(s->cct, 1) << "failed to remove auto-generated topic '" << topic_name << "', ret=" << op_ret << dendl;
     }
@@ -572,8 +572,8 @@ void RGWPSDeleteNotif_ObjStore_S3::execute() {
     return;
   }
 
-  ups.emplace(store, s->owner.get_id());
-  auto b = ups->get_bucket(bucket_info.bucket);
+  ps.emplace(store, s->owner.get_id().tenant);
+  auto b = ps->get_bucket(bucket_info.bucket);
   ceph_assert(b);
 
   // get all topics on a bucket
@@ -590,7 +590,7 @@ void RGWPSDeleteNotif_ObjStore_S3::execute() {
     if (unique_topic) {
       // remove the auto generated subscription according to notification name (if exist)
       const auto unique_topic_name = unique_topic->get().topic.name;
-      auto sub = ups->get_sub(notif_name);
+      auto sub = ps->get_sub(notif_name);
       op_ret = sub->unsubscribe(unique_topic_name);
       if (op_ret < 0 && op_ret != -ENOENT) {
         ldout(s->cct, 1) << "failed to remove auto-generated subscription '" << notif_name << "', ret=" << op_ret << dendl;
@@ -608,9 +608,9 @@ void RGWPSDeleteNotif_ObjStore_S3::execute() {
   for (const auto& topic : bucket_topics.topics) {
     // remove the auto generated subscription of the topic (if exist)
     rgw_pubsub_topic_subs topic_subs;
-    op_ret = ups->get_topic(topic.first, &topic_subs);
+    op_ret = ps->get_topic(topic.first, &topic_subs);
     for (const auto& topic_sub_name : topic_subs.subs) {
-      auto sub = ups->get_sub(topic_sub_name);
+      auto sub = ps->get_sub(topic_sub_name);
       rgw_pubsub_sub_config sub_conf;
       op_ret = sub->get_conf(&sub_conf);
       if (op_ret < 0) {
@@ -671,8 +671,8 @@ public:
 };
 
 void RGWPSListNotifs_ObjStore_S3::execute() {
-  ups.emplace(store, s->owner.get_id());
-  auto b = ups->get_bucket(bucket_info.bucket);
+  ps.emplace(store, s->owner.get_id().tenant);
+  auto b = ps->get_bucket(bucket_info.bucket);
   ceph_assert(b);
   
   // get all topics on a bucket
index ba09a3073dc0d845ba86def68acdf1b70f9e989a..3957a584653000d71eed9549376741a6bfca028c 100644 (file)
@@ -39,7 +39,7 @@ bool topic_has_endpoint_secret(const rgw_pubsub_topic_subs& topic) {
     return topic.topic.dest.stored_secret;
 }
 
-bool topics_has_endpoint_secret(const rgw_pubsub_user_topics& topics) {
+bool topics_has_endpoint_secret(const rgw_pubsub_topics& topics) {
     for (const auto& topic : topics.topics) {
         if (topic_has_endpoint_secret(topic.second)) return true;
     }
@@ -51,8 +51,8 @@ void RGWPSCreateTopicOp::execute() {
     return;
   }
 
-  ups.emplace(store, s->owner.get_id());
-  op_ret = ups->create_topic(topic_name, dest, topic_arn, opaque_data);
+  ps.emplace(store, s->owner.get_id().tenant);
+  op_ret = ps->create_topic(topic_name, dest, topic_arn, opaque_data);
   if (op_ret < 0) {
     ldout(s->cct, 1) << "failed to create topic '" << topic_name << "', ret=" << op_ret << dendl;
     return;
@@ -61,8 +61,8 @@ void RGWPSCreateTopicOp::execute() {
 }
 
 void RGWPSListTopicsOp::execute() {
-  ups.emplace(store, s->owner.get_id());
-  op_ret = ups->get_user_topics(&result);
+  ps.emplace(store, s->owner.get_id().tenant);
+  op_ret = ps->get_topics(&result);
   // if there are no topics it is not considered an error
   op_ret = op_ret == -ENOENT ? 0 : op_ret;
   if (op_ret < 0) {
@@ -82,8 +82,8 @@ void RGWPSGetTopicOp::execute() {
   if (op_ret < 0) {
     return;
   }
-  ups.emplace(store, s->owner.get_id());
-  op_ret = ups->get_topic(topic_name, &result);
+  ps.emplace(store, s->owner.get_id().tenant);
+  op_ret = ps->get_topic(topic_name, &result);
   if (topic_has_endpoint_secret(result) && !rgw_transport_is_secure(s->cct, *(s->info.env))) {
     ldout(s->cct, 1) << "topic '" << topic_name << "' contain secret and cannot be sent over insecure transport" << dendl;
     op_ret = -EPERM;
@@ -101,8 +101,8 @@ void RGWPSDeleteTopicOp::execute() {
   if (op_ret < 0) {
     return;
   }
-  ups.emplace(store, s->owner.get_id());
-  op_ret = ups->remove_topic(topic_name);
+  ps.emplace(store, s->owner.get_id().tenant);
+  op_ret = ps->remove_topic(topic_name);
   if (op_ret < 0) {
     ldout(s->cct, 1) << "failed to remove topic '" << topic_name << ", ret=" << op_ret << dendl;
     return;
@@ -115,8 +115,8 @@ void RGWPSCreateSubOp::execute() {
   if (op_ret < 0) {
     return;
   }
-  ups.emplace(store, s->owner.get_id());
-  auto sub = ups->get_sub(sub_name);
+  ps.emplace(store, s->owner.get_id().tenant);
+  auto sub = ps->get_sub(sub_name);
   op_ret = sub->subscribe(topic_name, dest);
   if (op_ret < 0) {
     ldout(s->cct, 1) << "failed to create subscription '" << sub_name << "', ret=" << op_ret << dendl;
@@ -130,8 +130,8 @@ void RGWPSGetSubOp::execute() {
   if (op_ret < 0) {
     return;
   }
-  ups.emplace(store, s->owner.get_id());
-  auto sub = ups->get_sub(sub_name);
+  ps.emplace(store, s->owner.get_id().tenant);
+  auto sub = ps->get_sub(sub_name);
   op_ret = sub->get_conf(&result);
   if (subscription_has_endpoint_secret(result) && !rgw_transport_is_secure(s->cct, *(s->info.env))) {
     ldout(s->cct, 1) << "subscription '" << sub_name << "' contain secret and cannot be sent over insecure transport" << dendl;
@@ -150,8 +150,8 @@ void RGWPSDeleteSubOp::execute() {
   if (op_ret < 0) {
     return;
   }
-  ups.emplace(store, s->owner.get_id());
-  auto sub = ups->get_sub(sub_name);
+  ps.emplace(store, s->owner.get_id().tenant);
+  auto sub = ps->get_sub(sub_name);
   op_ret = sub->unsubscribe(topic_name);
   if (op_ret < 0) {
     ldout(s->cct, 1) << "failed to remove subscription '" << sub_name << "', ret=" << op_ret << dendl;
@@ -165,8 +165,8 @@ void RGWPSAckSubEventOp::execute() {
   if (op_ret < 0) {
     return;
   }
-  ups.emplace(store, s->owner.get_id());
-  auto sub = ups->get_sub_with_events(sub_name);
+  ps.emplace(store, s->owner.get_id().tenant);
+  auto sub = ps->get_sub_with_events(sub_name);
   op_ret = sub->remove_event(event_id);
   if (op_ret < 0) {
     ldout(s->cct, 1) << "failed to ack event on subscription '" << sub_name << "', ret=" << op_ret << dendl;
@@ -180,8 +180,8 @@ void RGWPSPullSubEventsOp::execute() {
   if (op_ret < 0) {
     return;
   }
-  ups.emplace(store, s->owner.get_id());
-  sub = ups->get_sub_with_events(sub_name);
+  ps.emplace(store, s->owner.get_id().tenant);
+  sub = ps->get_sub_with_events(sub_name);
   if (!sub) {
     op_ret = -ENOENT;
     ldout(s->cct, 1) << "failed to get subscription '" << sub_name << "' for events, ret=" << op_ret << dendl;
index d472fa4076d9818c886a3ab43c71b35d21bbece8..e6d85d0846d9771daa1e51808fc0021cbff1f072 100644 (file)
@@ -14,7 +14,7 @@ bool validate_and_update_endpoint_secret(rgw_pubsub_sub_dest& dest, CephContext
 // create a topic
 class RGWPSCreateTopicOp : public RGWDefaultResponseOp {
 protected:
-  std::optional<RGWUserPubSub> ups;
+  std::optional<RGWPubSub> ps;
   std::string topic_name;
   rgw_pubsub_sub_dest dest;
   std::string topic_arn;
@@ -39,8 +39,8 @@ public:
 // list all topics
 class RGWPSListTopicsOp : public RGWOp {
 protected:
-  std::optional<RGWUserPubSub> ups;
-  rgw_pubsub_user_topics result;
+  std::optional<RGWPubSub> ps;
+  rgw_pubsub_topics result;
 
 public:
   int verify_permission() override {
@@ -60,7 +60,7 @@ public:
 class RGWPSGetTopicOp : public RGWOp {
 protected:
   std::string topic_name;
-  std::optional<RGWUserPubSub> ups;
+  std::optional<RGWPubSub> ps;
   rgw_pubsub_topic_subs result;
   
   virtual int get_params() = 0;
@@ -83,7 +83,7 @@ public:
 class RGWPSDeleteTopicOp : public RGWDefaultResponseOp {
 protected:
   string topic_name;
-  std::optional<RGWUserPubSub> ups;
+  std::optional<RGWPubSub> ps;
   
   virtual int get_params() = 0;
 
@@ -106,7 +106,7 @@ class RGWPSCreateSubOp : public RGWDefaultResponseOp {
 protected:
   std::string sub_name;
   std::string topic_name;
-  std::optional<RGWUserPubSub> ups;
+  std::optional<RGWPubSub> ps;
   rgw_pubsub_sub_dest dest;
   
   virtual int get_params() = 0;
@@ -129,7 +129,7 @@ public:
 class RGWPSGetSubOp : public RGWOp {
 protected:
   std::string sub_name;
-  std::optional<RGWUserPubSub> ups;
+  std::optional<RGWPubSub> ps;
   rgw_pubsub_sub_config result;
   
   virtual int get_params() = 0;
@@ -153,7 +153,7 @@ class RGWPSDeleteSubOp : public RGWDefaultResponseOp {
 protected:
   std::string sub_name;
   std::string topic_name;
-  std::optional<RGWUserPubSub> ups;
+  std::optional<RGWPubSub> ps;
   
   virtual int get_params() = 0;
 
@@ -176,7 +176,7 @@ class RGWPSAckSubEventOp : public RGWDefaultResponseOp {
 protected:
   std::string sub_name;
   std::string event_id;
-  std::optional<RGWUserPubSub> ups;
+  std::optional<RGWPubSub> ps;
   
   virtual int get_params() = 0;
 
@@ -204,8 +204,8 @@ protected:
   int max_entries{0};
   std::string sub_name;
   std::string marker;
-  std::optional<RGWUserPubSub> ups;
-  RGWUserPubSub::SubRef sub; 
+  std::optional<RGWPubSub> ps;
+  RGWPubSub::SubRef sub; 
 
   virtual int get_params() = 0;
 
@@ -228,7 +228,7 @@ public:
 // notification creation
 class RGWPSCreateNotifOp : public RGWDefaultResponseOp {
 protected:
-  std::optional<RGWUserPubSub> ups;
+  std::optional<RGWPubSub> ps;
   string bucket_name;
   RGWBucketInfo bucket_info;
 
@@ -248,7 +248,7 @@ public:
 // delete a notification
 class RGWPSDeleteNotifOp : public RGWDefaultResponseOp {
 protected:
-  std::optional<RGWUserPubSub> ups;
+  std::optional<RGWPubSub> ps;
   std::string bucket_name;
   RGWBucketInfo bucket_info;
   
@@ -270,7 +270,7 @@ class RGWPSListNotifsOp : public RGWOp {
 protected:
   std::string bucket_name;
   RGWBucketInfo bucket_info;
-  std::optional<RGWUserPubSub> ups;
+  std::optional<RGWPubSub> ps;
 
   virtual int get_params() = 0;
 
index d78bb29c76c5c6167ad7fd6065eeb2d48798e745..0dbe3400e69ab28847a09b70f25e16b9072385e3 100644 (file)
@@ -930,9 +930,9 @@ class PSManager
         } else {
           using ReadInfoCR = RGWSimpleRadosReadCR<rgw_pubsub_sub_config>;
           yield {
-            RGWUserPubSub ups(sync_env->store, owner);
+            RGWPubSub ps(sync_env->store, owner.tenant);
             rgw_raw_obj obj;
-            ups.get_sub_meta_obj(sub_name, &obj);
+            ps.get_sub_meta_obj(sub_name, &obj);
             bool empty_on_enoent = false;
             call(new ReadInfoCR(sync_env->async_rados, sync_env->store->svc()->sysobj,
                                 obj,
@@ -1095,12 +1095,12 @@ class RGWPSFindBucketTopicsCR : public RGWCoroutine {
   rgw_obj_key key;
   rgw::notify::EventType event_type;
 
-  RGWUserPubSub ups;
+  RGWPubSub ps;
 
   rgw_raw_obj bucket_obj;
   rgw_raw_obj user_obj;
   rgw_pubsub_bucket_topics bucket_topics;
-  rgw_pubsub_user_topics user_topics;
+  rgw_pubsub_topics user_topics;
   TopicsRef *topics;
 public:
   RGWPSFindBucketTopicsCR(RGWDataSyncCtx *_sc,
@@ -1116,14 +1116,14 @@ public:
                                                           bucket(_bucket),
                                                           key(_key),
                                                           event_type(_event_type),
-                                                          ups(sync_env->store, owner),
+                                                          ps(sync_env->store, owner.tenant),
                                                           topics(_topics) {
     *topics = std::make_shared<vector<PSTopicConfigRef> >();
   }
   int operate() override {
     reenter(this) {
-      ups.get_bucket_meta_obj(bucket, &bucket_obj);
-      ups.get_user_meta_obj(&user_obj);
+      ps.get_bucket_meta_obj(bucket, &bucket_obj);
+      ps.get_meta_obj(&user_obj);
 
       using ReadInfoCR = RGWSimpleRadosReadCR<rgw_pubsub_bucket_topics>;
       yield {
@@ -1139,7 +1139,7 @@ public:
       ldout(sync_env->cct, 20) << "RGWPSFindBucketTopicsCR(): found " << bucket_topics.topics.size() << " topics for bucket " << bucket << dendl;
 
       if (!bucket_topics.topics.empty()) {
-       using ReadUserTopicsInfoCR = RGWSimpleRadosReadCR<rgw_pubsub_user_topics>;
+       using ReadUserTopicsInfoCR = RGWSimpleRadosReadCR<rgw_pubsub_topics>;
        yield {
          bool empty_on_enoent = true;
          call(new ReadUserTopicsInfoCR(sync_env->async_rados, sync_env->store->svc()->sysobj,
index 9a77ea78627a847b299f9342692b0b4773d44e93..b84fcbb95bc3f54fa4315257fb29d8f5680cf905 100644 (file)
@@ -245,7 +245,7 @@ public:
     sub_name = s->object.name;
     marker = s->info.args.get("marker");
     const int ret = s->info.args.get_int("max-entries", &max_entries, 
-        RGWUserPubSub::Sub::DEFAULT_MAX_EVENTS);
+        RGWPubSub::Sub::DEFAULT_MAX_EVENTS);
     if (ret < 0) {
       ldout(s->cct, 1) << "failed to parse 'max-entries' param" << dendl;
       return -EINVAL;
@@ -373,9 +373,9 @@ public:
 
 void RGWPSCreateNotif_ObjStore::execute()
 {
-  ups.emplace(store, s->owner.get_id());
+  ps.emplace(store, s->owner.get_id().tenant);
 
-  auto b = ups->get_bucket(bucket_info.bucket);
+  auto b = ps->get_bucket(bucket_info.bucket);
   op_ret = b->create_notification(topic_name, events);
   if (op_ret < 0) {
     ldout(s->cct, 1) << "failed to create notification for topic '" << topic_name << "', ret=" << op_ret << dendl;
@@ -410,8 +410,8 @@ void RGWPSDeleteNotif_ObjStore::execute() {
     return;
   }
 
-  ups.emplace(store, s->owner.get_id());
-  auto b = ups->get_bucket(bucket_info.bucket);
+  ps.emplace(store, s->owner.get_id().tenant);
+  auto b = ps->get_bucket(bucket_info.bucket);
   op_ret = b->remove_notification(topic_name);
   if (op_ret < 0) {
     ldout(s->cct, 1) << "failed to remove notification from topic '" << topic_name << "', ret=" << op_ret << dendl;
@@ -449,8 +449,8 @@ public:
 
 void RGWPSListNotifs_ObjStore::execute()
 {
-  ups.emplace(store, s->owner.get_id());
-  auto b = ups->get_bucket(bucket_info.bucket);
+  ps.emplace(store, s->owner.get_id().tenant);
+  auto b = ps->get_bucket(bucket_info.bucket);
   op_ret = b->get_topics(&result);
   if (op_ret < 0) {
     ldout(s->cct, 1) << "failed to get topics, ret=" << op_ret << dendl;
index 9f2dc27ce1e4db3c76943d7f6c486e08c1cdd605..83e88f783fc6a70a2170c4d077ef8075f3f6bcba 100644 (file)
@@ -1763,10 +1763,9 @@ def test_ps_s3_notification_multi_delete_on_master():
     client_threads = []
     objects_size = {}
     for i in range(number_of_objects):
-        object_size = randint(1, 1024)
-        content = str(os.urandom(object_size))
+        content = str(os.urandom(randint(1, 1024)))
         key = bucket.new_key(str(i))
-        objects_size[key.name] = object_size
+        objects_size[key.name] = len(content)
         thr = threading.Thread(target = set_contents_from_string, args=(key, content,))
         thr.start()
         client_threads.append(thr)
@@ -1834,10 +1833,9 @@ def test_ps_s3_notification_push_http_on_master():
     objects_size = {}
     start_time = time.time()
     for i in range(number_of_objects):
-        object_size = randint(1, 1024)
-        content = str(os.urandom(object_size))
+        content = str(os.urandom(randint(1, 1024)))
         key = bucket.new_key(str(i))
-        objects_size[key.name] = object_size
+        objects_size[key.name] = len(content)
         thr = threading.Thread(target = set_contents_from_string, args=(key, content,))
         thr.start()
         client_threads.append(thr)
@@ -2912,16 +2910,20 @@ def test_ps_s3_metadata_on_master():
     
     # create objects in the bucket using COPY
     bucket.copy_key('copy_of_foo', bucket.name, key.name)
+
     # create objects in the bucket using multi-part upload
-    fp = tempfile.NamedTemporaryFile(mode='w')
-    fp.write('bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb')
-    fp.close()
+    fp = tempfile.NamedTemporaryFile(mode='w+b')
+    object_size = 1024
+    content = bytearray(os.urandom(object_size))
+    fp.write(content)
+    fp.flush()
+    fp.seek(0)
     uploader = bucket.initiate_multipart_upload('multipart_foo', 
             metadata={meta_key: meta_value})
-    fp = tempfile.TemporaryFile(mode='r')
     uploader.upload_part_from_file(fp, 1)
     uploader.complete_upload()
     fp.close()
+
     print('wait for 5sec for the messages...')
     time.sleep(5)
     # check amqp receiver