]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw/notification: trigger notifications on changes from any user 38136/head
authorYuval Lifshitz <ylifshit@redhat.com>
Tue, 17 Nov 2020 11:31:59 +0000 (13:31 +0200)
committerYuval Lifshitz <ylifshit@redhat.com>
Tue, 1 Dec 2020 18:23:43 +0000 (20:23 +0200)
any user authorized to make changes to a bucket may trigger
notifications defined on that bucket.
manual test procedure of the fix is described here:
https://gist.github.com/yuvalif/39c183aa0f74d286ecef7844268817df

fixes: https://tracker.ceph.com/issues/47904

Signed-off-by: Yuval Lifshitz <ylifshit@redhat.com>
doc/radosgw/notifications.rst
doc/radosgw/pubsub-module.rst
src/rgw/rgw_admin.cc
src/rgw/rgw_notify.cc
src/rgw/rgw_pubsub.cc
src/rgw/rgw_pubsub.h
src/rgw/rgw_rest_pubsub.cc
src/rgw/rgw_rest_pubsub_common.cc
src/rgw/rgw_rest_pubsub_common.h
src/rgw/rgw_sync_module_pubsub.cc
src/rgw/rgw_sync_module_pubsub_rest.cc

index 7a4b9d070669c3fd5b14d4a1d0a38520395b9d4b..60031f9ee5f271fbe74b9e2978b1a062f8189518 100644 (file)
@@ -12,8 +12,8 @@ Currently, notifications could be sent to: HTTP, AMQP0.9.1 and Kafka endpoints.
 Note, that if the events should be stored in Ceph, in addition, or instead of being pushed to an endpoint,
 the `PubSub Module`_ should be used instead of the bucket notification mechanism.
 
-A user can create different topics. A topic entity is defined by its user and its name. A
-user can only manage its own topics, and can only associate them with buckets it owns.
+A user can create different topics. A topic entity is defined by its name and is per tenant. A
+user can only associate its topics (via notification configuration) with buckets it owns.
 
 In order to send notifications for events for a specific bucket, a notification entity needs to be created. A
 notification can be created on a subset of event types, or for all event types (default).
@@ -50,25 +50,25 @@ In this case, the only latency added to the original operation is of committing
 Topic Management via CLI
 ------------------------
 
-Configuration of all topics of a user could be fetched using the following command:
+Configuration of all topics, associated with a tenant, could be fetched using the following command:
 
 ::
 
-   # radosgw-admin topic list --uid={user-id}
+   # radosgw-admin topic list [--tenant={tenant}]
 
 
 Configuration of a specific topic could be fetched using:
 
 ::
 
-   # radosgw-admin topic get --uid={user-id} --topic={topic-name}
+   # radosgw-admin topic get --topic={topic-name} [--tenant={tenant}]
 
 
 And removed using:
 
 ::
 
-   # radosgw-admin topic rm --uid={user-id} --topic={topic-name}
+   # radosgw-admin topic rm --topic={topic-name} [--tenant={tenant}]
 
 
 Notification Performance Stats
@@ -256,7 +256,7 @@ The response will have the following format:
 List Topics
 ```````````
 
-List all topics that user defined.
+List all topics associated with a tenant.
 
 ::
 
index d39ab3e84a849cc57bd4bac310d8206e5b41290a..638d8d3703c75eb460bff4ce8ccd93091db9ae4d 100644 (file)
@@ -15,9 +15,8 @@ A push notification mechanism exists too, currently supporting HTTP,
 AMQP0.9.1 and Kafka endpoints. In this case, the events are pushed to an endpoint on top of storing them in Ceph. If events should only be pushed to an endpoint
 and do not need to be stored in Ceph, the `Bucket Notification`_ mechanism should be used instead of pubsub sync module. 
 
-A user can create different topics. A topic entity is defined by its user and its name. A
-user can only manage its own topics, and can only subscribe to events published by buckets
-it owns.
+A user can create different topics. A topic entity is defined by its name and is per tenant. A
+user can only associate its topics (via notification configuration) with buckets it owns.
 
 In order to publish events for specific bucket a notification entity needs to be created. A
 notification can be created on a subset of event types, or for all event types (default).
@@ -31,7 +30,7 @@ mechanisms. This API has two flavors, one is S3-compatible and one is not. The t
 together, although it is recommended to use the S3-compatible one. 
 The S3-compatible API is similar to the one used in the bucket notification mechanism.
 
-Events are stored as RGW objects in a special bucket, under a special user. Events cannot
+Events are stored as RGW objects in a special bucket, under a special user (pubsub control user). Events cannot
 be accessed directly, but need to be pulled and acked using the new REST API.
 
 .. toctree::
@@ -116,52 +115,52 @@ A configuration field can be removed by using ``--tier-config-rm={key}``.
 Topic and Subscription Management via CLI
 -----------------------------------------
 
-Configuration of all topics of a user could be fetched using the following command:
+Configuration of all topics, associated with a tenant, could be fetched using the following command:
    
 ::
    
-   # radosgw-admin topic list --uid={user-id}
+   # radosgw-admin topic list [--tenant={tenant}]
 
 
 Configuration of a specific topic could be fetched using:
 
 ::
    
-   # radosgw-admin topic get --uid={user-id} --topic={topic-name}
+   # radosgw-admin topic get --topic={topic-name} [--tenant={tenant}]
 
 
 And removed using:
 
 ::
    
-   # radosgw-admin topic rm --uid={user-id} --topic={topic-name}
+   # radosgw-admin topic rm --topic={topic-name} [--tenant={tenant}]
 
 
 Configuration of a subscription could be fetched using:
 
 ::
    
-   # radosgw-admin subscription get --uid={user-id} --subscription={topic-name}
+   # radosgw-admin subscription get --subscription={topic-name} [--tenant={tenant}]
 
 And removed using:
 
 ::
    
-   # radosgw-admin subscription rm --uid={user-id} --subscription={topic-name}
+   # radosgw-admin subscription rm --subscription={topic-name} [--tenant={tenant}]
 
 
 To fetch all of the events stored in a subcription, use:
 
 ::
    
-   # radosgw-admin subscription pull --uid={user-id} --subscription={topic-name} [--marker={last-marker}]
+   # radosgw-admin subscription pull --subscription={topic-name} [--marker={last-marker}] [--tenant={tenant}]
 
 
 To ack (and remove) an event from a subscription, use:
 
 ::
    
-   # radosgw-admin subscription ack --uid={user-id} --subscription={topic-name} --event-id={event-id}
+   # radosgw-admin subscription ack --subscription={topic-name} --event-id={event-id} [--tenant={tenant}]
 
 
 PubSub Performance Stats
@@ -307,7 +306,7 @@ Delete the specified topic.
 List Topics
 ```````````
 
-List all topics that user defined.
+List all topics associated with a tenant.
 
 ::
 
index c85bf8564045b0a1e82c5d46d54d0fc77d83a493..e73e1743daa7a037e871adf76aa794150029a431 100644 (file)
@@ -9113,13 +9113,8 @@ next:
  }
 
   if (opt_cmd == OPT::PUBSUB_TOPICS_LIST) {
-    if (user_id.empty()) {
-      cerr << "ERROR: user id was not provided (via --uid)" << std::endl;
-      return EINVAL;
-    }
-    RGWUserInfo& user_info = user_op.get_user_info();
 
-    RGWUserPubSub ups(store, user_info.user_id);
+    RGWPubSub ps(store, tenant);
 
     rgw_bucket bucket;
 
@@ -9132,7 +9127,7 @@ next:
         return -ret;
       }
 
-      auto b = ups.get_bucket(bucket_info.bucket);
+      auto b = ps.get_bucket(bucket_info.bucket);
       ret = b->get_topics(&result);
       if (ret < 0) {
         cerr << "ERROR: could not get topics: " << cpp_strerror(-ret) << std::endl;
@@ -9140,8 +9135,8 @@ next:
       }
       encode_json("result", result, formatter.get());
     } else {
-      rgw_pubsub_user_topics result;
-      int ret = ups.get_user_topics(&result);
+      rgw_pubsub_topics result;
+      int ret = ps.get_topics(&result);
       if (ret < 0) {
         cerr << "ERROR: could not get topics: " << cpp_strerror(-ret) << std::endl;
         return -ret;
@@ -9156,15 +9151,11 @@ next:
       cerr << "ERROR: topic name was not provided (via --topic)" << std::endl;
       return EINVAL;
     }
-    if (user_id.empty()) {
-      cerr << "ERROR: user id was not provided (via --uid)" << std::endl;
-      return EINVAL;
-    }
-    RGWUserInfo& user_info = user_op.get_user_info();
-    RGWUserPubSub ups(store, user_info.user_id);
+
+    RGWPubSub ps(store, tenant);
 
     rgw_pubsub_topic_subs topic;
-    ret = ups.get_topic(topic_name, &topic);
+    ret = ps.get_topic(topic_name, &topic);
     if (ret < 0) {
       cerr << "ERROR: could not get topic: " << cpp_strerror(-ret) << std::endl;
       return -ret;
@@ -9178,14 +9169,10 @@ next:
       cerr << "ERROR: topic name was not provided (via --topic)" << std::endl;
       return EINVAL;
     }
-    if (user_id.empty()) {
-      cerr << "ERROR: user id was not provided (via --uid)" << std::endl;
-      return EINVAL;
-    }
-    RGWUserInfo& user_info = user_op.get_user_info();
-    RGWUserPubSub ups(store, user_info.user_id);
 
-    ret = ups.remove_topic(topic_name, null_yield);
+    RGWPubSub ps(store, tenant);
+
+    ret = ps.remove_topic(topic_name, null_yield);
     if (ret < 0) {
       cerr << "ERROR: could not remove topic: " << cpp_strerror(-ret) << std::endl;
       return -ret;
@@ -9197,20 +9184,16 @@ next:
       cerr << "ERROR: only pubsub tier type supports this command" << std::endl;
       return EINVAL;
     }
-    if (user_id.empty()) {
-      cerr << "ERROR: user id was not provided (via --uid)" << std::endl;
-      return EINVAL;
-    }
     if (sub_name.empty()) {
       cerr << "ERROR: subscription name was not provided (via --subscription)" << std::endl;
       return EINVAL;
     }
-    RGWUserInfo& user_info = user_op.get_user_info();
-    RGWUserPubSub ups(store, user_info.user_id);
+
+    RGWPubSub ps(store, tenant);
 
     rgw_pubsub_sub_config sub_conf;
 
-    auto sub = ups.get_sub(sub_name);
+    auto sub = ps.get_sub(sub_name);
     ret = sub->get_conf(&sub_conf);
     if (ret < 0) {
       cerr << "ERROR: could not get subscription info: " << cpp_strerror(-ret) << std::endl;
@@ -9225,18 +9208,14 @@ next:
       cerr << "ERROR: only pubsub tier type supports this command" << std::endl;
       return EINVAL;
     }
-    if (user_id.empty()) {
-      cerr << "ERROR: user id was not provided (via --uid)" << std::endl;
-      return EINVAL;
-    }
     if (sub_name.empty()) {
       cerr << "ERROR: subscription name was not provided (via --subscription)" << std::endl;
       return EINVAL;
     }
-    RGWUserInfo& user_info = user_op.get_user_info();
-    RGWUserPubSub ups(store, user_info.user_id);
 
-    auto sub = ups.get_sub(sub_name);
+    RGWPubSub ps(store, tenant);
+
+    auto sub = ps.get_sub(sub_name);
     ret = sub->unsubscribe(topic_name, null_yield);
     if (ret < 0) {
       cerr << "ERROR: could not get subscription info: " << cpp_strerror(-ret) << std::endl;
@@ -9249,21 +9228,17 @@ next:
       cerr << "ERROR: only pubsub tier type supports this command" << std::endl;
       return EINVAL;
     }
-    if (user_id.empty()) {
-      cerr << "ERROR: user id was not provided (via --uid)" << std::endl;
-      return EINVAL;
-    }
     if (sub_name.empty()) {
       cerr << "ERROR: subscription name was not provided (via --subscription)" << std::endl;
       return EINVAL;
     }
-    RGWUserInfo& user_info = user_op.get_user_info();
-    RGWUserPubSub ups(store, user_info.user_id);
+
+    RGWPubSub ps(store, tenant);
 
     if (!max_entries_specified) {
-      max_entries = RGWUserPubSub::Sub::DEFAULT_MAX_EVENTS;
+      max_entries = RGWPubSub::Sub::DEFAULT_MAX_EVENTS;
     }
-    auto sub = ups.get_sub_with_events(sub_name);
+    auto sub = ps.get_sub_with_events(sub_name);
     ret = sub->list_events(marker, max_entries);
     if (ret < 0) {
       cerr << "ERROR: could not list events: " << cpp_strerror(-ret) << std::endl;
@@ -9278,10 +9253,6 @@ next:
       cerr << "ERROR: only pubsub tier type supports this command" << std::endl;
       return EINVAL;
     }
-    if (user_id.empty()) {
-      cerr << "ERROR: user id was not provided (via --uid)" << std::endl;
-      return EINVAL;
-    }
     if (sub_name.empty()) {
       cerr << "ERROR: subscription name was not provided (via --subscription)" << std::endl;
       return EINVAL;
@@ -9290,10 +9261,10 @@ next:
       cerr << "ERROR: event id was not provided (via --event-id)" << std::endl;
       return EINVAL;
     }
-    RGWUserInfo& user_info = user_op.get_user_info();
-    RGWUserPubSub ups(store, user_info.user_id);
 
-    auto sub = ups.get_sub_with_events(sub_name);
+    RGWPubSub ps(store, tenant);
+
+    auto sub = ps.get_sub_with_events(sub_name);
     ret = sub->remove_event(event_id);
     if (ret < 0) {
       cerr << "ERROR: could not remove event: " << cpp_strerror(-ret) << std::endl;
index 3f56da71ac6f5495677d36a98f41a0c2922fcda3..f611cf188106176f92844763de247836a8455b1a 100644 (file)
@@ -650,8 +650,8 @@ bool match(const rgw_pubsub_topic_filter& filter, const req_state* s, const rgw:
 int publish_reserve(EventType event_type,
       reservation_t& res) 
 {
-  RGWUserPubSub ps_user(res.store, res.s->user->get_id());
-  RGWUserPubSub::Bucket ps_bucket(&ps_user, res.s->bucket->get_key());
+  RGWPubSub ps(res.store, res.s->user->get_id().tenant);
+  RGWPubSub::Bucket ps_bucket(&ps, res.s->bucket->get_key());
   rgw_pubsub_bucket_topics bucket_topics;
   auto rc = ps_bucket.get_topics(&bucket_topics);
   if (rc < 0) {
index d3a27807c3b66c0dfb03965db2b3a6f0d5bce9ce..5a7f3b3727ba1c9fc768b78f4ebda9ec1e9cbb26 100644 (file)
@@ -341,7 +341,7 @@ void rgw_pubsub_bucket_topics::dump(Formatter *f) const
   }
 }
 
-void rgw_pubsub_user_topics::dump(Formatter *f) const
+void rgw_pubsub_topics::dump(Formatter *f) const
 {
   Formatter::ArraySection s(*f, "topics");
   for (auto& t : topics) {
@@ -349,7 +349,7 @@ void rgw_pubsub_user_topics::dump(Formatter *f) const
   }
 }
 
-void rgw_pubsub_user_topics::dump_xml(Formatter *f) const
+void rgw_pubsub_topics::dump_xml(Formatter *f) const
 {
   for (auto& t : topics) {
     encode_xml("member", t.second.topic, f);
@@ -387,14 +387,14 @@ void rgw_pubsub_sub_config::dump(Formatter *f) const
   encode_json("s3_id", s3_id, f);
 }
 
-RGWUserPubSub::RGWUserPubSub(rgw::sal::RGWRadosStore* _store, const rgw_user& _user) : 
+RGWPubSub::RGWPubSub(rgw::sal::RGWRadosStore* _store, const std::string& _tenant) : 
                             store(_store),
-                            user(_user),
+                            tenant(_tenant),
                             obj_ctx(store->svc()->sysobj->init_obj_ctx()) {
-    get_user_meta_obj(&user_meta_obj);
+    get_meta_obj(&meta_obj);
 }
 
-int RGWUserPubSub::remove(const rgw_raw_obj& obj,
+int RGWPubSub::remove(const rgw_raw_obj& obj,
                          RGWObjVersionTracker *objv_tracker,
                          optional_yield y)
 {
@@ -406,9 +406,9 @@ int RGWUserPubSub::remove(const rgw_raw_obj& obj,
   return 0;
 }
 
-int RGWUserPubSub::read_user_topics(rgw_pubsub_user_topics *result, RGWObjVersionTracker *objv_tracker)
+int RGWPubSub::read_topics(rgw_pubsub_topics *result, RGWObjVersionTracker *objv_tracker)
 {
-  int ret = read(user_meta_obj, result, objv_tracker);
+  int ret = read(meta_obj, result, objv_tracker);
   if (ret < 0) {
     ldout(store->ctx(), 10) << "WARNING: failed to read topics info: ret=" << ret << dendl;
     return ret;
@@ -416,10 +416,10 @@ int RGWUserPubSub::read_user_topics(rgw_pubsub_user_topics *result, RGWObjVersio
   return 0;
 }
 
-int RGWUserPubSub::write_user_topics(const rgw_pubsub_user_topics& topics,
+int RGWPubSub::write_topics(const rgw_pubsub_topics& topics,
                                     RGWObjVersionTracker *objv_tracker, optional_yield y)
 {
-  int ret = write(user_meta_obj, topics, objv_tracker, y);
+  int ret = write(meta_obj, topics, objv_tracker, y);
   if (ret < 0 && ret != -ENOENT) {
     ldout(store->ctx(), 1) << "ERROR: failed to write topics info: ret=" << ret << dendl;
     return ret;
@@ -427,12 +427,12 @@ int RGWUserPubSub::write_user_topics(const rgw_pubsub_user_topics& topics,
   return 0;
 }
 
-int RGWUserPubSub::get_user_topics(rgw_pubsub_user_topics *result)
+int RGWPubSub::get_topics(rgw_pubsub_topics *result)
 {
-  return read_user_topics(result, nullptr);
+  return read_topics(result, nullptr);
 }
 
-int RGWUserPubSub::Bucket::read_topics(rgw_pubsub_bucket_topics *result, RGWObjVersionTracker *objv_tracker)
+int RGWPubSub::Bucket::read_topics(rgw_pubsub_bucket_topics *result, RGWObjVersionTracker *objv_tracker)
 {
   int ret = ps->read(bucket_meta_obj, result, objv_tracker);
   if (ret < 0 && ret != -ENOENT) {
@@ -442,7 +442,7 @@ int RGWUserPubSub::Bucket::read_topics(rgw_pubsub_bucket_topics *result, RGWObjV
   return 0;
 }
 
-int RGWUserPubSub::Bucket::write_topics(const rgw_pubsub_bucket_topics& topics,
+int RGWPubSub::Bucket::write_topics(const rgw_pubsub_bucket_topics& topics,
                                        RGWObjVersionTracker *objv_tracker,
                                        optional_yield y)
 {
@@ -455,15 +455,15 @@ int RGWUserPubSub::Bucket::write_topics(const rgw_pubsub_bucket_topics& topics,
   return 0;
 }
 
-int RGWUserPubSub::Bucket::get_topics(rgw_pubsub_bucket_topics *result)
+int RGWPubSub::Bucket::get_topics(rgw_pubsub_bucket_topics *result)
 {
   return read_topics(result, nullptr);
 }
 
-int RGWUserPubSub::get_topic(const string& name, rgw_pubsub_topic_subs *result)
+int RGWPubSub::get_topic(const string& name, rgw_pubsub_topic_subs *result)
 {
-  rgw_pubsub_user_topics topics;
-  int ret = get_user_topics(&topics);
+  rgw_pubsub_topics topics;
+  int ret = get_topics(&topics);
   if (ret < 0) {
     ldout(store->ctx(), 1) << "ERROR: failed to read topics info: ret=" << ret << dendl;
     return ret;
@@ -479,10 +479,10 @@ int RGWUserPubSub::get_topic(const string& name, rgw_pubsub_topic_subs *result)
   return 0;
 }
 
-int RGWUserPubSub::get_topic(const string& name, rgw_pubsub_topic *result)
+int RGWPubSub::get_topic(const string& name, rgw_pubsub_topic *result)
 {
-  rgw_pubsub_user_topics topics;
-  int ret = get_user_topics(&topics);
+  rgw_pubsub_topics topics;
+  int ret = get_topics(&topics);
   if (ret < 0) {
     ldout(store->ctx(), 1) << "ERROR: failed to read topics info: ret=" << ret << dendl;
     return ret;
@@ -498,15 +498,15 @@ int RGWUserPubSub::get_topic(const string& name, rgw_pubsub_topic *result)
   return 0;
 }
 
-int RGWUserPubSub::Bucket::create_notification(const string& topic_name, const rgw::notify::EventTypeList& events, optional_yield y) {
+int RGWPubSub::Bucket::create_notification(const string& topic_name, const rgw::notify::EventTypeList& events, optional_yield y) {
   return create_notification(topic_name, events, std::nullopt, "", y);
 }
 
-int RGWUserPubSub::Bucket::create_notification(const string& topic_name,const rgw::notify::EventTypeList& events, OptionalFilter s3_filter, const std::string& notif_name, optional_yield y) {
-  rgw_pubsub_topic_subs user_topic_info;
+int RGWPubSub::Bucket::create_notification(const string& topic_name,const rgw::notify::EventTypeList& events, OptionalFilter s3_filter, const std::string& notif_name, optional_yield y) {
+  rgw_pubsub_topic_subs topic_info;
   rgw::sal::RGWRadosStore *store = ps->store;
 
-  int ret = ps->get_topic(topic_name, &user_topic_info);
+  int ret = ps->get_topic(topic_name, &topic_info);
   if (ret < 0) {
     ldout(store->ctx(), 1) << "ERROR: failed to read topic '" << topic_name << "' info: ret=" << ret << dendl;
     return ret;
@@ -526,7 +526,7 @@ int RGWUserPubSub::Bucket::create_notification(const string& topic_name,const rg
     bucket.name << "'" << dendl;
 
   auto& topic_filter = bucket_topics.topics[topic_name];
-  topic_filter.topic = user_topic_info.topic;
+  topic_filter.topic = topic_info.topic;
   topic_filter.events = events;
   topic_filter.s3_id = notif_name;
   if (s3_filter) {
@@ -544,12 +544,12 @@ int RGWUserPubSub::Bucket::create_notification(const string& topic_name,const rg
   return 0;
 }
 
-int RGWUserPubSub::Bucket::remove_notification(const string& topic_name, optional_yield y)
+int RGWPubSub::Bucket::remove_notification(const string& topic_name, optional_yield y)
 {
-  rgw_pubsub_topic_subs user_topic_info;
+  rgw_pubsub_topic_subs topic_info;
   rgw::sal::RGWRadosStore *store = ps->store;
 
-  int ret = ps->get_topic(topic_name, &user_topic_info);
+  int ret = ps->get_topic(topic_name, &topic_info);
   if (ret < 0) {
     ldout(store->ctx(), 1) << "ERROR: failed to read topic info: ret=" << ret << dendl;
     return ret;
@@ -575,15 +575,15 @@ int RGWUserPubSub::Bucket::remove_notification(const string& topic_name, optiona
   return 0;
 }
 
-int RGWUserPubSub::create_topic(const string& name, optional_yield y) {
+int RGWPubSub::create_topic(const string& name, optional_yield y) {
   return create_topic(name, rgw_pubsub_sub_dest(), "", "", y);
 }
 
-int RGWUserPubSub::create_topic(const string& name, const rgw_pubsub_sub_dest& dest, const std::string& arn, const std::string& opaque_data, optional_yield y) {
+int RGWPubSub::create_topic(const string& name, const rgw_pubsub_sub_dest& dest, const std::string& arn, const std::string& opaque_data, optional_yield y) {
   RGWObjVersionTracker objv_tracker;
-  rgw_pubsub_user_topics topics;
+  rgw_pubsub_topics topics;
 
-  int ret = read_user_topics(&topics, &objv_tracker);
+  int ret = read_topics(&topics, &objv_tracker);
   if (ret < 0 && ret != -ENOENT) {
     // its not an error if not topics exist, we create one
     ldout(store->ctx(), 1) << "ERROR: failed to read topics info: ret=" << ret << dendl;
@@ -591,13 +591,13 @@ int RGWUserPubSub::create_topic(const string& name, const rgw_pubsub_sub_dest& d
   }
  
   rgw_pubsub_topic_subs& new_topic = topics.topics[name];
-  new_topic.topic.user = user;
+  new_topic.topic.user = rgw_user("", tenant);
   new_topic.topic.name = name;
   new_topic.topic.dest = dest;
   new_topic.topic.arn = arn;
   new_topic.topic.opaque_data = opaque_data;
 
-  ret = write_user_topics(topics, &objv_tracker, y);
+  ret = write_topics(topics, &objv_tracker, y);
   if (ret < 0) {
     ldout(store->ctx(), 1) << "ERROR: failed to write topics info: ret=" << ret << dendl;
     return ret;
@@ -606,12 +606,12 @@ int RGWUserPubSub::create_topic(const string& name, const rgw_pubsub_sub_dest& d
   return 0;
 }
 
-int RGWUserPubSub::remove_topic(const string& name, optional_yield y)
+int RGWPubSub::remove_topic(const string& name, optional_yield y)
 {
   RGWObjVersionTracker objv_tracker;
-  rgw_pubsub_user_topics topics;
+  rgw_pubsub_topics topics;
 
-  int ret = read_user_topics(&topics, &objv_tracker);
+  int ret = read_topics(&topics, &objv_tracker);
   if (ret < 0 && ret != -ENOENT) {
     ldout(store->ctx(), 1) << "ERROR: failed to read topics info: ret=" << ret << dendl;
     return ret;
@@ -623,7 +623,7 @@ int RGWUserPubSub::remove_topic(const string& name, optional_yield y)
 
   topics.topics.erase(name);
 
-  ret = write_user_topics(topics, &objv_tracker, y);
+  ret = write_topics(topics, &objv_tracker, y);
   if (ret < 0) {
     ldout(store->ctx(), 1) << "ERROR: failed to remove topics info: ret=" << ret << dendl;
     return ret;
@@ -632,7 +632,7 @@ int RGWUserPubSub::remove_topic(const string& name, optional_yield y)
   return 0;
 }
 
-int RGWUserPubSub::Sub::read_sub(rgw_pubsub_sub_config *result, RGWObjVersionTracker *objv_tracker)
+int RGWPubSub::Sub::read_sub(rgw_pubsub_sub_config *result, RGWObjVersionTracker *objv_tracker)
 {
   int ret = ps->read(sub_meta_obj, result, objv_tracker);
   if (ret < 0 && ret != -ENOENT) {
@@ -642,7 +642,7 @@ int RGWUserPubSub::Sub::read_sub(rgw_pubsub_sub_config *result, RGWObjVersionTra
   return 0;
 }
 
-int RGWUserPubSub::Sub::write_sub(const rgw_pubsub_sub_config& sub_conf,
+int RGWPubSub::Sub::write_sub(const rgw_pubsub_sub_config& sub_conf,
                                  RGWObjVersionTracker *objv_tracker,
                                  optional_yield y)
 {
@@ -655,7 +655,7 @@ int RGWUserPubSub::Sub::write_sub(const rgw_pubsub_sub_config& sub_conf,
   return 0;
 }
 
-int RGWUserPubSub::Sub::remove_sub(RGWObjVersionTracker *objv_tracker,
+int RGWPubSub::Sub::remove_sub(RGWObjVersionTracker *objv_tracker,
                                   optional_yield y)
 {
   int ret = ps->remove(sub_meta_obj, objv_tracker, y);
@@ -667,18 +667,18 @@ int RGWUserPubSub::Sub::remove_sub(RGWObjVersionTracker *objv_tracker,
   return 0;
 }
 
-int RGWUserPubSub::Sub::get_conf(rgw_pubsub_sub_config *result)
+int RGWPubSub::Sub::get_conf(rgw_pubsub_sub_config *result)
 {
   return read_sub(result, nullptr);
 }
 
-int RGWUserPubSub::Sub::subscribe(const string& topic, const rgw_pubsub_sub_dest& dest, optional_yield y, const std::string& s3_id)
+int RGWPubSub::Sub::subscribe(const string& topic, const rgw_pubsub_sub_dest& dest, optional_yield y, const std::string& s3_id)
 {
-  RGWObjVersionTracker user_objv_tracker;
-  rgw_pubsub_user_topics topics;
+  RGWObjVersionTracker objv_tracker;
+  rgw_pubsub_topics topics;
   rgw::sal::RGWRadosStore *store = ps->store;
 
-  int ret = ps->read_user_topics(&topics, &user_objv_tracker);
+  int ret = ps->read_topics(&topics, &objv_tracker);
   if (ret < 0) {
     ldout(store->ctx(), 1) << "ERROR: failed to read topics info: ret=" << ret << dendl;
     return ret != -ENOENT ? ret : -EINVAL;
@@ -694,7 +694,7 @@ int RGWUserPubSub::Sub::subscribe(const string& topic, const rgw_pubsub_sub_dest
 
   rgw_pubsub_sub_config sub_conf;
 
-  sub_conf.user = ps->user;
+  sub_conf.user = rgw_user("", ps->tenant);
   sub_conf.name = sub;
   sub_conf.topic = topic;
   sub_conf.dest = dest;
@@ -702,7 +702,7 @@ int RGWUserPubSub::Sub::subscribe(const string& topic, const rgw_pubsub_sub_dest
 
   t.subs.insert(sub);
 
-  ret = ps->write_user_topics(topics, &user_objv_tracker, y);
+  ret = ps->write_topics(topics, &objv_tracker, y);
   if (ret < 0) {
     ldout(store->ctx(), 1) << "ERROR: failed to write topics info: ret=" << ret << dendl;
     return ret;
@@ -716,7 +716,7 @@ int RGWUserPubSub::Sub::subscribe(const string& topic, const rgw_pubsub_sub_dest
   return 0;
 }
 
-int RGWUserPubSub::Sub::unsubscribe(const string& _topic, optional_yield y)
+int RGWPubSub::Sub::unsubscribe(const string& _topic, optional_yield y)
 {
   string topic = _topic;
   RGWObjVersionTracker sobjv_tracker;
@@ -733,9 +733,9 @@ int RGWUserPubSub::Sub::unsubscribe(const string& _topic, optional_yield y)
   }
 
   RGWObjVersionTracker objv_tracker;
-  rgw_pubsub_user_topics topics;
+  rgw_pubsub_topics topics;
 
-  int ret = ps->read_user_topics(&topics, &objv_tracker);
+  int ret = ps->read_topics(&topics, &objv_tracker);
   if (ret < 0) {
     // not an error - could be that topic was already deleted
     ldout(store->ctx(), 10) << "WARNING: failed to read topics info: ret=" << ret << dendl;
@@ -746,7 +746,7 @@ int RGWUserPubSub::Sub::unsubscribe(const string& _topic, optional_yield y)
 
       t.subs.erase(sub);
 
-      ret = ps->write_user_topics(topics, &objv_tracker, y);
+      ret = ps->write_topics(topics, &objv_tracker, y);
       if (ret < 0) {
         ldout(store->ctx(), 1) << "ERROR: failed to write topics info: ret=" << ret << dendl;
         return ret;
@@ -763,7 +763,7 @@ int RGWUserPubSub::Sub::unsubscribe(const string& _topic, optional_yield y)
 }
 
 template<typename EventType>
-void RGWUserPubSub::SubWithEvents<EventType>::list_events_result::dump(Formatter *f) const
+void RGWPubSub::SubWithEvents<EventType>::list_events_result::dump(Formatter *f) const
 {
   encode_json("next_marker", next_marker, f);
   encode_json("is_truncated", is_truncated, f);
@@ -775,7 +775,7 @@ void RGWUserPubSub::SubWithEvents<EventType>::list_events_result::dump(Formatter
 }
 
 template<typename EventType>
-int RGWUserPubSub::SubWithEvents<EventType>::list_events(const string& marker, int max_events)
+int RGWPubSub::SubWithEvents<EventType>::list_events(const string& marker, int max_events)
 {
   RGWRados *store = ps->store->getRados();
   rgw_pubsub_sub_config sub_conf;
@@ -840,7 +840,7 @@ int RGWUserPubSub::SubWithEvents<EventType>::list_events(const string& marker, i
 }
 
 template<typename EventType>
-int RGWUserPubSub::SubWithEvents<EventType>::remove_event(const string& event_id)
+int RGWPubSub::SubWithEvents<EventType>::remove_event(const string& event_id)
 {
   rgw::sal::RGWRadosStore *store = ps->store;
   rgw_pubsub_sub_config sub_conf;
@@ -878,25 +878,25 @@ int RGWUserPubSub::SubWithEvents<EventType>::remove_event(const string& event_id
   return 0;
 }
 
-void RGWUserPubSub::get_user_meta_obj(rgw_raw_obj *obj) const {
-  *obj = rgw_raw_obj(store->svc()->zone->get_zone_params().log_pool, user_meta_oid());
+void RGWPubSub::get_meta_obj(rgw_raw_obj *obj) const {
+  *obj = rgw_raw_obj(store->svc()->zone->get_zone_params().log_pool, meta_oid());
 }
 
-void RGWUserPubSub::get_bucket_meta_obj(const rgw_bucket& bucket, rgw_raw_obj *obj) const {
+void RGWPubSub::get_bucket_meta_obj(const rgw_bucket& bucket, rgw_raw_obj *obj) const {
   *obj = rgw_raw_obj(store->svc()->zone->get_zone_params().log_pool, bucket_meta_oid(bucket));
 }
 
-void RGWUserPubSub::get_sub_meta_obj(const string& name, rgw_raw_obj *obj) const {
+void RGWPubSub::get_sub_meta_obj(const string& name, rgw_raw_obj *obj) const {
   *obj = rgw_raw_obj(store->svc()->zone->get_zone_params().log_pool, sub_meta_oid(name));
 }
 
 template<typename EventType>
-void RGWUserPubSub::SubWithEvents<EventType>::dump(Formatter* f) const {
+void RGWPubSub::SubWithEvents<EventType>::dump(Formatter* f) const {
   list.dump(f);
 }
 
 // explicit instantiation for the only two possible types
 // no need to move implementation to header
-template class RGWUserPubSub::SubWithEvents<rgw_pubsub_event>;
-template class RGWUserPubSub::SubWithEvents<rgw_pubsub_s3_record>;
+template class RGWPubSub::SubWithEvents<rgw_pubsub_event>;
+template class RGWPubSub::SubWithEvents<rgw_pubsub_s3_record>;
 
index 8c76a4df2010f3de3488efe8af8be425d8465395..38f3216a6ef8bed9c356d7749cbfc605f5170917 100644 (file)
@@ -471,7 +471,7 @@ struct rgw_pubsub_topic {
   }
 
   string to_str() const {
-    return user.to_str() + "/" + name;
+    return user.tenant + "/" + name;
   }
 
   void dump(Formatter *f) const;
@@ -564,7 +564,7 @@ struct rgw_pubsub_bucket_topics {
 };
 WRITE_CLASS_ENCODER(rgw_pubsub_bucket_topics)
 
-struct rgw_pubsub_user_topics {
+struct rgw_pubsub_topics {
   std::map<std::string, rgw_pubsub_topic_subs> topics;
 
   void encode(bufferlist& bl) const {
@@ -582,66 +582,66 @@ struct rgw_pubsub_user_topics {
   void dump(Formatter *f) const;
   void dump_xml(Formatter *f) const;
 };
-WRITE_CLASS_ENCODER(rgw_pubsub_user_topics)
+WRITE_CLASS_ENCODER(rgw_pubsub_topics)
 
-static std::string pubsub_user_oid_prefix = "pubsub.user.";
+static std::string pubsub_oid_prefix = "pubsub.";
 
-class RGWUserPubSub
+class RGWPubSub
 {
   friend class Bucket;
 
   rgw::sal::RGWRadosStore *store;
-  rgw_user user;
+  const std::string tenant;
   RGWSysObjectCtx obj_ctx;
 
-  rgw_raw_obj user_meta_obj;
+  rgw_raw_obj meta_obj;
 
-  std::string user_meta_oid() const {
-    return pubsub_user_oid_prefix + user.to_str();
+  std::string meta_oid() const {
+    return pubsub_oid_prefix + tenant;
   }
 
   std::string bucket_meta_oid(const rgw_bucket& bucket) const {
-    return pubsub_user_oid_prefix + user.to_str() + ".bucket." + bucket.name + "/" + bucket.bucket_id;
+    return pubsub_oid_prefix + tenant + ".bucket." + bucket.name + "/" + bucket.bucket_id;
   }
 
   std::string sub_meta_oid(const string& name) const {
-    return pubsub_user_oid_prefix + user.to_str() + ".sub." + name;
+    return pubsub_oid_prefix + tenant + ".sub." + name;
   }
 
   template <class T>
-  int read(const rgw_raw_obj& obj, T *data, RGWObjVersionTracker *objv_tracker);
+  int read(const rgw_raw_obj& obj, T* data, RGWObjVersionTracker* objv_tracker);
 
   template <class T>
   int write(const rgw_raw_obj& obj, const T& info,
-           RGWObjVersionTracker *obj_tracker, optional_yield y);
+           RGWObjVersionTrackerobj_tracker, optional_yield y);
 
-  int remove(const rgw_raw_obj& obj, RGWObjVersionTracker *objv_tracker,
+  int remove(const rgw_raw_obj& obj, RGWObjVersionTrackerobjv_tracker,
             optional_yield y);
 
-  int read_user_topics(rgw_pubsub_user_topics *result, RGWObjVersionTracker *objv_tracker);
-  int write_user_topics(const rgw_pubsub_user_topics& topics,
-                       RGWObjVersionTracker *objv_tracker, optional_yield y);
+  int read_topics(rgw_pubsub_topics *result, RGWObjVersionTracker* objv_tracker);
+  int write_topics(const rgw_pubsub_topics& topics,
+                       RGWObjVersionTrackerobjv_tracker, optional_yield y);
 
 public:
-  RGWUserPubSub(rgw::sal::RGWRadosStore *_store, const rgw_user& _user);
+  RGWPubSub(rgw::sal::RGWRadosStore *_store, const std::string& tenant);
 
   class Bucket {
-    friend class RGWUserPubSub;
-    RGWUserPubSub *ps;
+    friend class RGWPubSub;
+    RGWPubSub *ps;
     rgw_bucket bucket;
     rgw_raw_obj bucket_meta_obj;
 
     // read the list of topics associated with a bucket and populate into result
     // use version tacker to enforce atomicity between read/write
     // return 0 on success or if no topic was associated with the bucket, error code otherwise
-    int read_topics(rgw_pubsub_bucket_topics *result, RGWObjVersionTracker *objv_tracker);
+    int read_topics(rgw_pubsub_bucket_topics *result, RGWObjVersionTrackerobjv_tracker);
     // set the list of topics associated with a bucket
     // use version tacker to enforce atomicity between read/write
     // return 0 on success, error code otherwise
     int write_topics(const rgw_pubsub_bucket_topics& topics,
-                    RGWObjVersionTracker *objv_tracker, optional_yield y);
+                    RGWObjVersionTrackerobjv_tracker, optional_yield y);
   public:
-    Bucket(RGWUserPubSub *_ps, const rgw_bucket& _bucket) : ps(_ps), bucket(_bucket) {
+    Bucket(RGWPubSub *_ps, const rgw_bucket& _bucket) : ps(_ps), bucket(_bucket) {
       ps->get_bucket_meta_obj(bucket, &bucket_meta_obj);
     }
 
@@ -665,18 +665,18 @@ public:
 
   // base class for subscription
   class Sub {
-    friend class RGWUserPubSub;
+    friend class RGWPubSub;
   protected:
-    RGWUserPubSub* const ps;
+    RGWPubSub* const ps;
     const std::string sub;
     rgw_raw_obj sub_meta_obj;
 
-    int read_sub(rgw_pubsub_sub_config *result, RGWObjVersionTracker *objv_tracker);
+    int read_sub(rgw_pubsub_sub_config *result, RGWObjVersionTrackerobjv_tracker);
     int write_sub(const rgw_pubsub_sub_config& sub_conf,
-                 RGWObjVersionTracker *objv_tracker, optional_yield y);
-    int remove_sub(RGWObjVersionTracker *objv_tracker, optional_yield y);
+                 RGWObjVersionTrackerobjv_tracker, optional_yield y);
+    int remove_sub(RGWObjVersionTrackerobjv_tracker, optional_yield y);
   public:
-    Sub(RGWUserPubSub *_ps, const std::string& _sub) : ps(_ps), sub(_sub) {
+    Sub(RGWPubSub *_ps, const std::string& _sub) : ps(_ps), sub(_sub) {
       ps->get_sub_meta_obj(sub, &sub_meta_obj);
     }
 
@@ -706,7 +706,7 @@ public:
     } list;
 
   public:
-    SubWithEvents(RGWUserPubSub *_ps, const string& _sub) : Sub(_ps, _sub) {}
+    SubWithEvents(RGWPubSub *_ps, const string& _sub) : Sub(_ps, _sub) {}
 
     virtual ~SubWithEvents() = default;
     
@@ -738,14 +738,14 @@ public:
     return std::make_shared<SubWithEvents<rgw_pubsub_s3_record>>(this, sub);
   }
 
-  void get_user_meta_obj(rgw_raw_obj *obj) const;
+  void get_meta_obj(rgw_raw_obj *obj) const;
   void get_bucket_meta_obj(const rgw_bucket& bucket, rgw_raw_obj *obj) const;
 
   void get_sub_meta_obj(const string& name, rgw_raw_obj *obj) const;
 
-  // get all topics defined for the user and populate them into "result"
+  // get all topics (per tenant, if used)) and populate them into "result"
   // return 0 on success or if no topics exist, error code otherwise
-  int get_user_topics(rgw_pubsub_user_topics *result);
+  int get_topics(rgw_pubsub_topics *result);
   // get a topic with its subscriptions by its name and populate it into "result"
   // return -ENOENT if the topic does not exists 
   // return 0 on success, error code otherwise
@@ -770,7 +770,7 @@ public:
 
 
 template <class T>
-int RGWUserPubSub::read(const rgw_raw_obj& obj, T *result, RGWObjVersionTracker *objv_tracker)
+int RGWPubSub::read(const rgw_raw_obj& obj, T* result, RGWObjVersionTracker* objv_tracker)
 {
   bufferlist bl;
   int ret = rgw_get_system_obj(obj_ctx,
@@ -793,8 +793,8 @@ int RGWUserPubSub::read(const rgw_raw_obj& obj, T *result, RGWObjVersionTracker
 }
 
 template <class T>
-int RGWUserPubSub::write(const rgw_raw_obj& obj, const T& info,
-                        RGWObjVersionTracker *objv_tracker, optional_yield y)
+int RGWPubSub::write(const rgw_raw_obj& obj, const T& info,
+                        RGWObjVersionTrackerobjv_tracker, optional_yield y)
 {
   bufferlist bl;
   encode(info, bl);
index d6de68c2cf16c15d22a25e6d605c0abdac775399..145024be704fc051feb7397ae19209a1a3a587fb 100644 (file)
@@ -452,8 +452,8 @@ void RGWPSCreateNotif_ObjStore_S3::execute(optional_yield y) {
     return;
   }
 
-  ups.emplace(store, s->owner.get_id());
-  auto b = ups->get_bucket(bucket_info.bucket);
+  ps.emplace(store, s->owner.get_id().tenant);
+  auto b = ps->get_bucket(bucket_info.bucket);
   ceph_assert(b);
   std::string data_bucket_prefix = "";
   std::string data_oid_prefix = "";
@@ -499,7 +499,7 @@ void RGWPSCreateNotif_ObjStore_S3::execute(optional_yield y) {
 
     // get topic information. destination information is stored in the topic
     rgw_pubsub_topic topic_info;  
-    op_ret = ups->get_topic(topic_name, &topic_info);
+    op_ret = ps->get_topic(topic_name, &topic_info);
     if (op_ret < 0) {
       ldout(s->cct, 1) << "failed to get topic '" << topic_name << "', ret=" << op_ret << dendl;
       return;
@@ -514,7 +514,7 @@ void RGWPSCreateNotif_ObjStore_S3::execute(optional_yield y) {
     // generate the internal topic. destination is stored here for the "push-only" case
     // when no subscription exists
     // ARN is cached to make the "GET" method faster
-    op_ret = ups->create_topic(unique_topic_name, topic_info.dest, topic_info.arn, topic_info.opaque_data, y);
+    op_ret = ps->create_topic(unique_topic_name, topic_info.dest, topic_info.arn, topic_info.opaque_data, y);
     if (op_ret < 0) {
       ldout(s->cct, 1) << "failed to auto-generate unique topic '" << unique_topic_name << 
         "', ret=" << op_ret << dendl;
@@ -528,7 +528,7 @@ void RGWPSCreateNotif_ObjStore_S3::execute(optional_yield y) {
       ldout(s->cct, 1) << "failed to auto-generate notification for unique topic '" << unique_topic_name <<
         "', ret=" << op_ret << dendl;
       // rollback generated topic (ignore return value)
-      ups->remove_topic(unique_topic_name, y);
+      ps->remove_topic(unique_topic_name, y);
       return;
     }
     ldout(s->cct, 20) << "successfully auto-generated notification for unique topic '" << unique_topic_name << "'" << dendl;
@@ -538,14 +538,14 @@ void RGWPSCreateNotif_ObjStore_S3::execute(optional_yield y) {
       rgw_pubsub_sub_dest dest = topic_info.dest;
       dest.bucket_name = data_bucket_prefix + s->owner.get_id().to_str() + "-" + unique_topic_name;
       dest.oid_prefix = data_oid_prefix + notif_name + "/";
-      auto sub = ups->get_sub(notif_name);
+      auto sub = ps->get_sub(notif_name);
       op_ret = sub->subscribe(unique_topic_name, dest, y, notif_name);
       if (op_ret < 0) {
         ldout(s->cct, 1) << "failed to auto-generate subscription '" << notif_name << "', ret=" << op_ret << dendl;
         // rollback generated notification (ignore return value)
         b->remove_notification(unique_topic_name, y);
         // rollback generated topic (ignore return value)
-        ups->remove_topic(unique_topic_name, y);
+        ps->remove_topic(unique_topic_name, y);
         return;
       }
       ldout(s->cct, 20) << "successfully auto-generated subscription '" << notif_name << "'" << dendl;
@@ -573,12 +573,12 @@ private:
     return 0;
   }
 
-  void remove_notification_by_topic(const std::string& topic_name, const RGWUserPubSub::BucketRef& b, optional_yield y) {
+  void remove_notification_by_topic(const std::string& topic_name, const RGWPubSub::BucketRef& b, optional_yield y) {
     op_ret = b->remove_notification(topic_name, y);
     if (op_ret < 0) {
       ldout(s->cct, 1) << "failed to remove notification of topic '" << topic_name << "', ret=" << op_ret << dendl;
     }
-    op_ret = ups->remove_topic(topic_name, y);
+    op_ret = ps->remove_topic(topic_name, y);
     if (op_ret < 0) {
       ldout(s->cct, 1) << "failed to remove auto-generated topic '" << topic_name << "', ret=" << op_ret << dendl;
     }
@@ -595,8 +595,8 @@ void RGWPSDeleteNotif_ObjStore_S3::execute(optional_yield y) {
     return;
   }
 
-  ups.emplace(store, s->owner.get_id());
-  auto b = ups->get_bucket(bucket_info.bucket);
+  ps.emplace(store, s->owner.get_id().tenant);
+  auto b = ps->get_bucket(bucket_info.bucket);
   ceph_assert(b);
 
   // get all topics on a bucket
@@ -613,7 +613,7 @@ void RGWPSDeleteNotif_ObjStore_S3::execute(optional_yield y) {
     if (unique_topic) {
       // remove the auto generated subscription according to notification name (if exist)
       const auto unique_topic_name = unique_topic->get().topic.name;
-      auto sub = ups->get_sub(notif_name);
+      auto sub = ps->get_sub(notif_name);
       op_ret = sub->unsubscribe(unique_topic_name, y);
       if (op_ret < 0 && op_ret != -ENOENT) {
         ldout(s->cct, 1) << "failed to remove auto-generated subscription '" << notif_name << "', ret=" << op_ret << dendl;
@@ -631,9 +631,9 @@ void RGWPSDeleteNotif_ObjStore_S3::execute(optional_yield y) {
   for (const auto& topic : bucket_topics.topics) {
     // remove the auto generated subscription of the topic (if exist)
     rgw_pubsub_topic_subs topic_subs;
-    op_ret = ups->get_topic(topic.first, &topic_subs);
+    op_ret = ps->get_topic(topic.first, &topic_subs);
     for (const auto& topic_sub_name : topic_subs.subs) {
-      auto sub = ups->get_sub(topic_sub_name);
+      auto sub = ps->get_sub(topic_sub_name);
       rgw_pubsub_sub_config sub_conf;
       op_ret = sub->get_conf(&sub_conf);
       if (op_ret < 0) {
@@ -694,8 +694,8 @@ public:
 };
 
 void RGWPSListNotifs_ObjStore_S3::execute(optional_yield y) {
-  ups.emplace(store, s->owner.get_id());
-  auto b = ups->get_bucket(bucket_info.bucket);
+  ps.emplace(store, s->owner.get_id().tenant);
+  auto b = ps->get_bucket(bucket_info.bucket);
   ceph_assert(b);
   
   // get all topics on a bucket
index 3b7d71828dbd69d3608d1a51f395191840a1337e..1d1311d2b9292d79a988e64b94ad09446bb708e1 100644 (file)
@@ -40,7 +40,7 @@ bool topic_has_endpoint_secret(const rgw_pubsub_topic_subs& topic) {
     return topic.topic.dest.stored_secret;
 }
 
-bool topics_has_endpoint_secret(const rgw_pubsub_user_topics& topics) {
+bool topics_has_endpoint_secret(const rgw_pubsub_topics& topics) {
     for (const auto& topic : topics.topics) {
         if (topic_has_endpoint_secret(topic.second)) return true;
     }
@@ -53,8 +53,8 @@ void RGWPSCreateTopicOp::execute(optional_yield y) {
     return;
   }
 
-  ups.emplace(store, s->owner.get_id());
-  op_ret = ups->create_topic(topic_name, dest, topic_arn, opaque_data, y);
+  ps.emplace(store, s->owner.get_id().tenant);
+  op_ret = ps->create_topic(topic_name, dest, topic_arn, opaque_data, y);
   if (op_ret < 0) {
     ldout(s->cct, 1) << "failed to create topic '" << topic_name << "', ret=" << op_ret << dendl;
     return;
@@ -63,8 +63,8 @@ void RGWPSCreateTopicOp::execute(optional_yield y) {
 }
 
 void RGWPSListTopicsOp::execute(optional_yield y) {
-  ups.emplace(store, s->owner.get_id());
-  op_ret = ups->get_user_topics(&result);
+  ps.emplace(store, s->owner.get_id().tenant);
+  op_ret = ps->get_topics(&result);
   // if there are no topics it is not considered an error
   op_ret = op_ret == -ENOENT ? 0 : op_ret;
   if (op_ret < 0) {
@@ -84,8 +84,8 @@ void RGWPSGetTopicOp::execute(optional_yield y) {
   if (op_ret < 0) {
     return;
   }
-  ups.emplace(store, s->owner.get_id());
-  op_ret = ups->get_topic(topic_name, &result);
+  ps.emplace(store, s->owner.get_id().tenant);
+  op_ret = ps->get_topic(topic_name, &result);
   if (topic_has_endpoint_secret(result) && !rgw_transport_is_secure(s->cct, *(s->info.env))) {
     ldout(s->cct, 1) << "topic '" << topic_name << "' contain secret and cannot be sent over insecure transport" << dendl;
     op_ret = -EPERM;
@@ -103,8 +103,8 @@ void RGWPSDeleteTopicOp::execute(optional_yield y) {
   if (op_ret < 0) {
     return;
   }
-  ups.emplace(store, s->owner.get_id());
-  op_ret = ups->remove_topic(topic_name, y);
+  ps.emplace(store, s->owner.get_id().tenant);
+  op_ret = ps->remove_topic(topic_name, y);
   if (op_ret < 0) {
     ldout(s->cct, 1) << "failed to remove topic '" << topic_name << ", ret=" << op_ret << dendl;
     return;
@@ -117,8 +117,8 @@ void RGWPSCreateSubOp::execute(optional_yield y) {
   if (op_ret < 0) {
     return;
   }
-  ups.emplace(store, s->owner.get_id());
-  auto sub = ups->get_sub(sub_name);
+  ps.emplace(store, s->owner.get_id().tenant);
+  auto sub = ps->get_sub(sub_name);
   op_ret = sub->subscribe(topic_name, dest, y);
   if (op_ret < 0) {
     ldout(s->cct, 1) << "failed to create subscription '" << sub_name << "', ret=" << op_ret << dendl;
@@ -132,8 +132,8 @@ void RGWPSGetSubOp::execute(optional_yield y) {
   if (op_ret < 0) {
     return;
   }
-  ups.emplace(store, s->owner.get_id());
-  auto sub = ups->get_sub(sub_name);
+  ps.emplace(store, s->owner.get_id().tenant);
+  auto sub = ps->get_sub(sub_name);
   op_ret = sub->get_conf(&result);
   if (subscription_has_endpoint_secret(result) && !rgw_transport_is_secure(s->cct, *(s->info.env))) {
     ldout(s->cct, 1) << "subscription '" << sub_name << "' contain secret and cannot be sent over insecure transport" << dendl;
@@ -152,8 +152,8 @@ void RGWPSDeleteSubOp::execute(optional_yield y) {
   if (op_ret < 0) {
     return;
   }
-  ups.emplace(store, s->owner.get_id());
-  auto sub = ups->get_sub(sub_name);
+  ps.emplace(store, s->owner.get_id().tenant);
+  auto sub = ps->get_sub(sub_name);
   op_ret = sub->unsubscribe(topic_name, y);
   if (op_ret < 0) {
     ldout(s->cct, 1) << "failed to remove subscription '" << sub_name << "', ret=" << op_ret << dendl;
@@ -167,8 +167,8 @@ void RGWPSAckSubEventOp::execute(optional_yield y) {
   if (op_ret < 0) {
     return;
   }
-  ups.emplace(store, s->owner.get_id());
-  auto sub = ups->get_sub_with_events(sub_name);
+  ps.emplace(store, s->owner.get_id().tenant);
+  auto sub = ps->get_sub_with_events(sub_name);
   op_ret = sub->remove_event(event_id);
   if (op_ret < 0) {
     ldout(s->cct, 1) << "failed to ack event on subscription '" << sub_name << "', ret=" << op_ret << dendl;
@@ -182,8 +182,8 @@ void RGWPSPullSubEventsOp::execute(optional_yield y) {
   if (op_ret < 0) {
     return;
   }
-  ups.emplace(store, s->owner.get_id());
-  sub = ups->get_sub_with_events(sub_name);
+  ps.emplace(store, s->owner.get_id().tenant);
+  sub = ps->get_sub_with_events(sub_name);
   if (!sub) {
     op_ret = -ENOENT;
     ldout(s->cct, 1) << "failed to get subscription '" << sub_name << "' for events, ret=" << op_ret << dendl;
index 0abb27c5ccffac2159d2c64a4642d6b2b6cdfc72..f42a40e2fd1e57ef09782460998e0af872752b32 100644 (file)
@@ -14,7 +14,7 @@ bool validate_and_update_endpoint_secret(rgw_pubsub_sub_dest& dest, CephContext
 // create a topic
 class RGWPSCreateTopicOp : public RGWDefaultResponseOp {
 protected:
-  std::optional<RGWUserPubSub> ups;
+  std::optional<RGWPubSub> ps;
   std::string topic_name;
   rgw_pubsub_sub_dest dest;
   std::string topic_arn;
@@ -39,8 +39,8 @@ public:
 // list all topics
 class RGWPSListTopicsOp : public RGWOp {
 protected:
-  std::optional<RGWUserPubSub> ups;
-  rgw_pubsub_user_topics result;
+  std::optional<RGWPubSub> ps;
+  rgw_pubsub_topics result;
 
 public:
   int verify_permission(optional_yield) override {
@@ -60,7 +60,7 @@ public:
 class RGWPSGetTopicOp : public RGWOp {
 protected:
   std::string topic_name;
-  std::optional<RGWUserPubSub> ups;
+  std::optional<RGWPubSub> ps;
   rgw_pubsub_topic_subs result;
   
   virtual int get_params() = 0;
@@ -83,7 +83,7 @@ public:
 class RGWPSDeleteTopicOp : public RGWDefaultResponseOp {
 protected:
   string topic_name;
-  std::optional<RGWUserPubSub> ups;
+  std::optional<RGWPubSub> ps;
   
   virtual int get_params() = 0;
 
@@ -106,7 +106,7 @@ class RGWPSCreateSubOp : public RGWDefaultResponseOp {
 protected:
   std::string sub_name;
   std::string topic_name;
-  std::optional<RGWUserPubSub> ups;
+  std::optional<RGWPubSub> ps;
   rgw_pubsub_sub_dest dest;
   
   virtual int get_params() = 0;
@@ -129,7 +129,7 @@ public:
 class RGWPSGetSubOp : public RGWOp {
 protected:
   std::string sub_name;
-  std::optional<RGWUserPubSub> ups;
+  std::optional<RGWPubSub> ps;
   rgw_pubsub_sub_config result;
   
   virtual int get_params() = 0;
@@ -153,7 +153,7 @@ class RGWPSDeleteSubOp : public RGWDefaultResponseOp {
 protected:
   std::string sub_name;
   std::string topic_name;
-  std::optional<RGWUserPubSub> ups;
+  std::optional<RGWPubSub> ps;
   
   virtual int get_params() = 0;
 
@@ -176,7 +176,7 @@ class RGWPSAckSubEventOp : public RGWDefaultResponseOp {
 protected:
   std::string sub_name;
   std::string event_id;
-  std::optional<RGWUserPubSub> ups;
+  std::optional<RGWPubSub> ps;
   
   virtual int get_params() = 0;
 
@@ -204,8 +204,8 @@ protected:
   int max_entries{0};
   std::string sub_name;
   std::string marker;
-  std::optional<RGWUserPubSub> ups;
-  RGWUserPubSub::SubRef sub; 
+  std::optional<RGWPubSub> ps;
+  RGWPubSub::SubRef sub; 
 
   virtual int get_params() = 0;
 
@@ -228,7 +228,7 @@ public:
 // notification creation
 class RGWPSCreateNotifOp : public RGWDefaultResponseOp {
 protected:
-  std::optional<RGWUserPubSub> ups;
+  std::optional<RGWPubSub> ps;
   string bucket_name;
   RGWBucketInfo bucket_info;
 
@@ -248,7 +248,7 @@ public:
 // delete a notification
 class RGWPSDeleteNotifOp : public RGWDefaultResponseOp {
 protected:
-  std::optional<RGWUserPubSub> ups;
+  std::optional<RGWPubSub> ps;
   std::string bucket_name;
   RGWBucketInfo bucket_info;
   
@@ -270,7 +270,7 @@ class RGWPSListNotifsOp : public RGWOp {
 protected:
   std::string bucket_name;
   RGWBucketInfo bucket_info;
-  std::optional<RGWUserPubSub> ups;
+  std::optional<RGWPubSub> ps;
 
   virtual int get_params() = 0;
 
index 75e08300386c4d5c321ec075ab075b61a7cf5f5c..c1610747c39024b0fe8b9ad0d5fb4d9e999d8e34 100644 (file)
@@ -780,9 +780,9 @@ class PSManager
         } else {
           using ReadInfoCR = RGWSimpleRadosReadCR<rgw_pubsub_sub_config>;
           yield {
-            RGWUserPubSub ups(sync_env->store, owner);
+            RGWPubSub ps(sync_env->store, owner.tenant);
             rgw_raw_obj obj;
-            ups.get_sub_meta_obj(sub_name, &obj);
+            ps.get_sub_meta_obj(sub_name, &obj);
             bool empty_on_enoent = false;
             call(new ReadInfoCR(sync_env->async_rados, sync_env->store->svc()->sysobj,
                                 obj,
@@ -942,12 +942,12 @@ class RGWPSFindBucketTopicsCR : public RGWCoroutine {
   rgw_obj_key key;
   rgw::notify::EventType event_type;
 
-  RGWUserPubSub ups;
+  RGWPubSub ps;
 
   rgw_raw_obj bucket_obj;
   rgw_raw_obj user_obj;
   rgw_pubsub_bucket_topics bucket_topics;
-  rgw_pubsub_user_topics user_topics;
+  rgw_pubsub_topics user_topics;
   TopicsRef *topics;
 public:
   RGWPSFindBucketTopicsCR(RGWDataSyncCtx *_sc,
@@ -963,14 +963,14 @@ public:
                                                           bucket(_bucket),
                                                           key(_key),
                                                           event_type(_event_type),
-                                                          ups(sync_env->store, owner),
+                                                          ps(sync_env->store, owner.tenant),
                                                           topics(_topics) {
     *topics = std::make_shared<vector<PSTopicConfigRef> >();
   }
   int operate() override {
     reenter(this) {
-      ups.get_bucket_meta_obj(bucket, &bucket_obj);
-      ups.get_user_meta_obj(&user_obj);
+      ps.get_bucket_meta_obj(bucket, &bucket_obj);
+      ps.get_meta_obj(&user_obj);
 
       using ReadInfoCR = RGWSimpleRadosReadCR<rgw_pubsub_bucket_topics>;
       yield {
@@ -986,7 +986,7 @@ public:
       ldout(sync_env->cct, 20) << "RGWPSFindBucketTopicsCR(): found " << bucket_topics.topics.size() << " topics for bucket " << bucket << dendl;
 
       if (!bucket_topics.topics.empty()) {
-       using ReadUserTopicsInfoCR = RGWSimpleRadosReadCR<rgw_pubsub_user_topics>;
+       using ReadUserTopicsInfoCR = RGWSimpleRadosReadCR<rgw_pubsub_topics>;
        yield {
          bool empty_on_enoent = true;
          call(new ReadUserTopicsInfoCR(sync_env->async_rados, sync_env->store->svc()->sysobj,
index 2326e958a0ae304dedcd877c5e8e34cceaf9286f..c7feff500226b842726e8d5fdb11fb102025ce1f 100644 (file)
@@ -246,7 +246,7 @@ public:
     sub_name = s->object->get_name();
     marker = s->info.args.get("marker");
     const int ret = s->info.args.get_int("max-entries", &max_entries, 
-        RGWUserPubSub::Sub::DEFAULT_MAX_EVENTS);
+        RGWPubSub::Sub::DEFAULT_MAX_EVENTS);
     if (ret < 0) {
       ldout(s->cct, 1) << "failed to parse 'max-entries' param" << dendl;
       return -EINVAL;
@@ -374,9 +374,9 @@ public:
 
 void RGWPSCreateNotif_ObjStore::execute(optional_yield y)
 {
-  ups.emplace(store, s->owner.get_id());
+  ps.emplace(store, s->owner.get_id().tenant);
 
-  auto b = ups->get_bucket(bucket_info.bucket);
+  auto b = ps->get_bucket(bucket_info.bucket);
   op_ret = b->create_notification(topic_name, events, y);
   if (op_ret < 0) {
     ldout(s->cct, 1) << "failed to create notification for topic '" << topic_name << "', ret=" << op_ret << dendl;
@@ -411,8 +411,8 @@ void RGWPSDeleteNotif_ObjStore::execute(optional_yield y) {
     return;
   }
 
-  ups.emplace(store, s->owner.get_id());
-  auto b = ups->get_bucket(bucket_info.bucket);
+  ps.emplace(store, s->owner.get_id().tenant);
+  auto b = ps->get_bucket(bucket_info.bucket);
   op_ret = b->remove_notification(topic_name, y);
   if (op_ret < 0) {
     ldout(s->cct, 1) << "failed to remove notification from topic '" << topic_name << "', ret=" << op_ret << dendl;
@@ -450,8 +450,8 @@ public:
 
 void RGWPSListNotifs_ObjStore::execute(optional_yield y)
 {
-  ups.emplace(store, s->owner.get_id());
-  auto b = ups->get_bucket(bucket_info.bucket);
+  ps.emplace(store, s->owner.get_id().tenant);
+  auto b = ps->get_bucket(bucket_info.bucket);
   op_ret = b->get_topics(&result);
   if (op_ret < 0) {
     ldout(s->cct, 1) << "failed to get topics, ret=" << op_ret << dendl;