]> git.apps.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
rgw: cleanups and data structure changes, api changes
authorYehuda Sadeh <yehuda@redhat.com>
Fri, 6 Jul 2018 19:53:36 +0000 (12:53 -0700)
committerYehuda Sadeh <yehuda@redhat.com>
Tue, 11 Dec 2018 08:10:42 +0000 (00:10 -0800)
split user topics and bucket topics, can have multiple buckets per topic
and multiple topics per bucket. Buckets don't hold subscription info.

Signed-off-by: Yehuda Sadeh <yehuda@redhat.com>
src/rgw/rgw_admin.cc
src/rgw/rgw_common.h
src/rgw/rgw_pubsub.cc
src/rgw/rgw_pubsub.h
src/rgw/rgw_sync_module_pubsub.cc
src/rgw/rgw_sync_module_pubsub_rest.cc

index 52ac87c61aaf8acbad227b34954306a09d20f59a..fde88f0dd3920ed86c9f25ad5d9b6b263de3b62f 100644 (file)
@@ -535,6 +535,8 @@ enum {
   OPT_PUBSUB_TOPICS_LIST,
   OPT_PUBSUB_TOPIC_CREATE,
   OPT_PUBSUB_TOPIC_RM,
+  OPT_PUBSUB_NOTIFICATION_CREATE,
+  OPT_PUBSUB_NOTIFICATION_RM,
   OPT_PUBSUB_SUB_GET,
   OPT_PUBSUB_SUB_CREATE,
   OPT_PUBSUB_SUB_RM,
@@ -564,6 +566,7 @@ static int get_cmd(const char *cmd, const char *prev_cmd, const char *prev_prev_
       strcmp(cmd, "mdlog") == 0 ||
       strcmp(cmd, "metadata") == 0 ||
       strcmp(cmd, "mfa") == 0 ||
+      strcmp(cmd, "notification") == 0 ||
       strcmp(cmd, "object") == 0 ||
       strcmp(cmd, "objects") == 0 ||
       strcmp(cmd, "olh") == 0 ||
@@ -1015,6 +1018,11 @@ static int get_cmd(const char *cmd, const char *prev_cmd, const char *prev_prev_
         return OPT_PUBSUB_TOPIC_CREATE;
       if (strcmp(cmd, "rm") == 0)
         return OPT_PUBSUB_TOPIC_RM;
+    } else if (strcmp(prev_cmd, "notification") == 0) {
+      if (strcmp(cmd, "create") == 0)
+        return OPT_PUBSUB_NOTIFICATION_CREATE;
+      if (strcmp(cmd, "rm") == 0)
+        return OPT_PUBSUB_NOTIFICATION_RM;
     } else if (strcmp(prev_cmd, "sub") == 0) {
       if (strcmp(cmd, "get") == 0)
         return OPT_PUBSUB_SUB_GET;
@@ -7779,9 +7787,8 @@ next:
 
     rgw_bucket bucket;
 
-    rgw_pubsub_user_topics result;
-
     if (!bucket_name.empty()) {
+      rgw_pubsub_bucket_topics result;
       RGWBucketInfo bucket_info;
       int ret = init_bucket(tenant, bucket_name, bucket_id, bucket_info, bucket);
       if (ret < 0) {
@@ -7789,23 +7796,49 @@ next:
         return -ret;
       }
 
-      ret = ups.get_bucket_topics(bucket_info.bucket, &result);
+      auto b = ups.get_bucket(bucket_info.bucket);
+      ret = b->get_topics(&result);
       if (ret < 0) {
         cerr << "ERROR: could not get topics: " << cpp_strerror(-ret) << std::endl;
         return -ret;
       }
+      encode_json("result", result, formatter);
     } else {
-      int ret = ups.get_topics(&result);
+      rgw_pubsub_user_topics result;
+      int ret = ups.get_user_topics(&result);
       if (ret < 0) {
         cerr << "ERROR: could not get topics: " << cpp_strerror(-ret) << std::endl;
         return -ret;
       }
+      encode_json("result", result, formatter);
     }
-    encode_json("result", result, formatter);
     formatter->flush(cout);
   }
 
   if (opt_cmd == OPT_PUBSUB_TOPIC_CREATE) {
+    if (get_tier_type(store) != "pubsub") {
+      cerr << "ERROR: only pubsub tier type supports this command" << std::endl;
+      return EINVAL;
+    }
+    if (topic_name.empty()) {
+      cerr << "ERROR: topic name was not provided (via --topic)" << std::endl;
+      return EINVAL;
+    }
+    if (user_id.empty()) {
+      cerr << "ERROR: user id was not provided (via --uid)" << std::endl;
+      return EINVAL;
+    }
+    RGWUserInfo& user_info = user_op.get_user_info();
+    RGWUserPubSub ups(store, user_info.user_id);
+
+    ret = ups.create_topic(topic_name);
+    if (ret < 0) {
+      cerr << "ERROR: could not create topic: " << cpp_strerror(-ret) << std::endl;
+      return -ret;
+    }
+  }
+
+  if (opt_cmd == OPT_PUBSUB_NOTIFICATION_CREATE) {
     if (get_tier_type(store) != "pubsub") {
       cerr << "ERROR: only pubsub tier type supports this command" << std::endl;
       return EINVAL;
@@ -7834,14 +7867,15 @@ next:
       return -ret;
     }
 
-    ret = ups.create_topic(topic_name, bucket_info.bucket);
+    auto b = ups.get_bucket(bucket_info.bucket);
+    ret = b->create_notification(topic_name);
     if (ret < 0) {
-      cerr << "ERROR: could not create topic: " << cpp_strerror(-ret) << std::endl;
+      cerr << "ERROR: could not publish bucket: " << cpp_strerror(-ret) << std::endl;
       return -ret;
     }
   }
 
-  if (opt_cmd == OPT_PUBSUB_TOPIC_RM) {
+  if (opt_cmd == OPT_PUBSUB_NOTIFICATION_RM) {
     if (get_tier_type(store) != "pubsub") {
       cerr << "ERROR: only pubsub tier type supports this command" << std::endl;
       return EINVAL;
@@ -7870,6 +7904,30 @@ next:
       return -ret;
     }
 
+    auto b = ups.get_bucket(bucket_info.bucket);
+    ret = b->remove_notification(topic_name);
+    if (ret < 0) {
+      cerr << "ERROR: could not publish bucket: " << cpp_strerror(-ret) << std::endl;
+      return -ret;
+    }
+  }
+
+  if (opt_cmd == OPT_PUBSUB_TOPIC_RM) {
+    if (get_tier_type(store) != "pubsub") {
+      cerr << "ERROR: only pubsub tier type supports this command" << std::endl;
+      return EINVAL;
+    }
+    if (topic_name.empty()) {
+      cerr << "ERROR: topic name was not provided (via --topic)" << std::endl;
+      return EINVAL;
+    }
+    if (user_id.empty()) {
+      cerr << "ERROR: user id was not provided (via --uid)" << std::endl;
+      return EINVAL;
+    }
+    RGWUserInfo& user_info = user_op.get_user_info();
+    RGWUserPubSub ups(store, user_info.user_id);
+
     ret = ups.remove_topic(topic_name);
     if (ret < 0) {
       cerr << "ERROR: could not remove topic: " << cpp_strerror(-ret) << std::endl;
@@ -7893,14 +7951,15 @@ next:
     RGWUserInfo& user_info = user_op.get_user_info();
     RGWUserPubSub ups(store, user_info.user_id);
 
-    rgw_pubsub_user_sub_config sub;
+    rgw_pubsub_sub_config sub_conf;
 
-    ret = ups.get_sub(sub_name, &sub);
+    auto sub = ups.get_sub(sub_name);
+    ret = sub->get_conf(&sub_conf);
     if (ret < 0) {
       cerr << "ERROR: could not get subscription info: " << cpp_strerror(-ret) << std::endl;
       return -ret;
     }
-    encode_json("sub", sub, formatter);
+    encode_json("sub", sub_conf, formatter);
     formatter->flush(cout);
   }
 
@@ -7924,28 +7983,28 @@ next:
     RGWUserInfo& user_info = user_op.get_user_info();
     RGWUserPubSub ups(store, user_info.user_id);
 
-    rgw_pubsub_user_topic_info topic;
+    rgw_pubsub_topic_subs topic;
     int ret = ups.get_topic(topic_name, &topic);
     if (ret < 0) {
       cerr << "ERROR: topic not found" << std::endl;
       return EINVAL;
     }
-    rgw_pubsub_user_sub_config sub;
 
     auto& tier_config = get_tier_config(store);
 
-    rgw_pubsub_user_sub_dest dest_config;
+    rgw_pubsub_sub_dest dest_config;
     dest_config.bucket_name = sub_dest_bucket;
     dest_config.oid_prefix = sub_oid_prefix;
     dest_config.push_endpoint = sub_push_endpoint;
 
     if (dest_config.bucket_name.empty()) {
-      dest_config.bucket_name = string(tier_config["data_bucket_prefix"]) + topic.user.to_str() + "-" + topic.topic.name;
+      dest_config.bucket_name = string(tier_config["data_bucket_prefix"]) + user_info.user_id.to_str() + "-" + topic.topic.name;
     }
     if (dest_config.oid_prefix.empty()) {
       dest_config.oid_prefix = tier_config["data_oid_prefix"];
     }
-    ret = ups.add_sub(sub_name, topic_name, dest_config);
+    auto sub = ups.get_sub(sub_name);
+    ret = sub->subscribe(topic_name, dest_config);
     if (ret < 0) {
       cerr << "ERROR: could not store subscription info: " << cpp_strerror(-ret) << std::endl;
       return -ret;
@@ -7968,15 +8027,12 @@ next:
     RGWUserInfo& user_info = user_op.get_user_info();
     RGWUserPubSub ups(store, user_info.user_id);
 
-    rgw_pubsub_user_sub_config sub;
-
-    ret = ups.get_sub(sub_name, &sub);
+    auto sub = ups.get_sub(sub_name);
+    ret = sub->unsubscribe(topic_name);
     if (ret < 0) {
       cerr << "ERROR: could not get subscription info: " << cpp_strerror(-ret) << std::endl;
       return -ret;
     }
-    encode_json("sub", sub, formatter);
-    formatter->flush(cout);
   }
 
  if (opt_cmd == OPT_PUBSUB_SUB_PULL) {
@@ -7995,12 +8051,13 @@ next:
     RGWUserInfo& user_info = user_op.get_user_info();
     RGWUserPubSub ups(store, user_info.user_id);
 
-    RGWUserPubSub::list_events_result result;
+    RGWUserPubSub::Sub::list_events_result result;
 
     if (!max_entries_specified) {
       max_entries = 100;
     }
-    ret = ups.list_events(sub_name, marker, max_entries, &result);
+    auto sub = ups.get_sub(sub_name);
+    ret = sub->list_events(marker, max_entries, &result);
     if (ret < 0) {
       cerr << "ERROR: could not list events: " << cpp_strerror(-ret) << std::endl;
       return -ret;
@@ -8029,7 +8086,8 @@ next:
     RGWUserInfo& user_info = user_op.get_user_info();
     RGWUserPubSub ups(store, user_info.user_id);
 
-    ret = ups.remove_event(sub_name, event_id);
+    auto sub = ups.get_sub(sub_name);
+    ret = sub->remove_event(event_id);
     if (ret < 0) {
       cerr << "ERROR: could not remove event: " << cpp_strerror(-ret) << std::endl;
       return -ret;
index aaaaaeda899cf7b79b0feb50edc81aea2fdb68a1..c5e0a30b596eb067b3aafbdf12a0e9ef9840ed0d 100644 (file)
@@ -540,6 +540,9 @@ enum RGWOpType {
   RGW_OP_PUBSUB_SUB_DELETE,
   RGW_OP_PUBSUB_SUB_PULL,
   RGW_OP_PUBSUB_SUB_ACK,
+  RGW_OP_PUBSUB_NOTIF_CREATE,
+  RGW_OP_PUBSUB_NOTIF_DELETE,
+  RGW_OP_PUBSUB_NOTIF_LIST,
 };
 
 class RGWAccessControlPolicy;
index 449b33c867eb7c7d707b355620d6a56c1f353008..d53a0547bd51c28043bdedabaf9a8930e1e18fc7 100644 (file)
@@ -52,19 +52,26 @@ void rgw_pubsub_event::dump(Formatter *f) const
   }
 }
 
-void rgw_pubsub_user_topic::dump(Formatter *f) const
+void rgw_pubsub_topic::dump(Formatter *f) const
 {
+  encode_json("user", user, f);
   encode_json("name", name, f);
-  encode_json("bucket", bucket, f);
 }
 
-void rgw_pubsub_user_topic_info::dump(Formatter *f) const
+void rgw_pubsub_topic_subs::dump(Formatter *f) const
 {
-  encode_json("user", user, f);
   encode_json("topic", topic, f);
   encode_json("subs", subs, f);
 }
 
+void rgw_pubsub_bucket_topics::dump(Formatter *f) const
+{
+  Formatter::ObjectSection s(*f, "topics");
+  for (auto& t : topics) {
+    encode_json(t.first.c_str(), t.second, f);
+  }
+}
+
 void rgw_pubsub_user_topics::dump(Formatter *f) const
 {
   Formatter::ObjectSection s(*f, "topics");
@@ -73,14 +80,14 @@ void rgw_pubsub_user_topics::dump(Formatter *f) const
   }
 }
 
-void rgw_pubsub_user_sub_dest::dump(Formatter *f) const
+void rgw_pubsub_sub_dest::dump(Formatter *f) const
 {
   encode_json("bucket_name", bucket_name, f);
   encode_json("oid_prefix", oid_prefix, f);
   encode_json("push_endpoint", push_endpoint, f);
 }
 
-void rgw_pubsub_user_sub_config::dump(Formatter *f) const
+void rgw_pubsub_sub_config::dump(Formatter *f) const
 {
   encode_json("user", user, f);
   encode_json("name", name, f);
@@ -99,13 +106,19 @@ int RGWUserPubSub::remove(const rgw_raw_obj& obj, RGWObjVersionTracker *objv_tra
   return 0;
 }
 
-int RGWUserPubSub::get_topics(rgw_pubsub_user_topics *result)
+int RGWUserPubSub::read_user_topics(rgw_pubsub_user_topics *result, RGWObjVersionTracker *objv_tracker)
 {
-  rgw_raw_obj obj;
-  get_user_meta_obj(&obj);
+  int ret = read(user_meta_obj, result, objv_tracker);
+  if (ret < 0 && ret != -ENOENT) {
+    ldout(store->ctx(), 0) << "ERROR: failed to read topics info: ret=" << ret << dendl;
+    return ret;
+  }
+  return 0;
+}
 
-  RGWObjVersionTracker objv_tracker;
-  int ret = read(obj, result, &objv_tracker);
+int RGWUserPubSub::write_user_topics(const rgw_pubsub_user_topics& topics, RGWObjVersionTracker *objv_tracker)
+{
+  int ret = write(user_meta_obj, topics, objv_tracker);
   if (ret < 0 && ret != -ENOENT) {
     ldout(store->ctx(), 0) << "ERROR: failed to read topics info: ret=" << ret << dendl;
     return ret;
@@ -113,29 +126,41 @@ int RGWUserPubSub::get_topics(rgw_pubsub_user_topics *result)
   return 0;
 }
 
-int RGWUserPubSub::get_bucket_topics(const rgw_bucket& bucket, rgw_pubsub_user_topics *result)
+int RGWUserPubSub::get_user_topics(rgw_pubsub_user_topics *result)
 {
-  rgw_raw_obj obj;
-  get_bucket_meta_obj(bucket, &obj);
+  return read_user_topics(result, nullptr);
+}
 
-  RGWObjVersionTracker objv_tracker;
-  int ret = read(obj, result, &objv_tracker);
+int RGWUserPubSub::Bucket::read_topics(rgw_pubsub_bucket_topics *result, RGWObjVersionTracker *objv_tracker)
+{
+  int ret = ps->read(bucket_meta_obj, result, objv_tracker);
   if (ret < 0 && ret != -ENOENT) {
-    ldout(store->ctx(), 0) << "ERROR: failed to read topics info: ret=" << ret << dendl;
+    ldout(ps->store->ctx(), 0) << "ERROR: failed to read bucket topics info: ret=" << ret << dendl;
+    return ret;
+  }
+  return 0;
+}
+
+int RGWUserPubSub::Bucket::write_topics(const rgw_pubsub_bucket_topics& topics, RGWObjVersionTracker *objv_tracker)
+{
+  int ret = ps->write(bucket_meta_obj, topics, objv_tracker);
+  if (ret < 0) {
+    ldout(ps->store->ctx(), 0) << "ERROR: failed to write bucket topics info: ret=" << ret << dendl;
     return ret;
   }
+
   return 0;
 }
 
-int RGWUserPubSub::get_topic(const string& name, rgw_pubsub_user_topic_info *result)
+int RGWUserPubSub::Bucket::get_topics(rgw_pubsub_bucket_topics *result)
 {
-  rgw_raw_obj obj;
-  get_user_meta_obj(&obj);
+  return read_topics(result, nullptr);
+}
 
-  RGWObjVersionTracker objv_tracker;
+int RGWUserPubSub::get_topic(const string& name, rgw_pubsub_topic_subs *result)
+{
   rgw_pubsub_user_topics topics;
-
-  int ret = read(obj, &topics, &objv_tracker);
+  int ret = get_user_topics(&topics);
   if (ret < 0) {
     ldout(store->ctx(), 0) << "ERROR: failed to read topics info: ret=" << ret << dendl;
     return ret;
@@ -152,121 +177,158 @@ int RGWUserPubSub::get_topic(const string& name, rgw_pubsub_user_topic_info *res
 }
 
 
-int RGWUserPubSub::create_topic(const string& name, const rgw_bucket& bucket)
+int RGWUserPubSub::Bucket::create_notification(const string& topic_name)
 {
-  rgw_raw_obj obj;
-  get_user_meta_obj(&obj);
+  rgw_pubsub_topic_subs user_topic_info;
+  RGWRados *store = ps->store;
+
+  int ret = ps->get_topic(topic_name, &user_topic_info);
+  if (ret < 0) {
+    ldout(store->ctx(), 0) << "ERROR: failed to read topic info: ret=" << ret << dendl;
+    return ret;
+  }
 
   RGWObjVersionTracker objv_tracker;
-  rgw_pubsub_user_topics topics;
+  rgw_pubsub_bucket_topics bucket_topics;
 
-  int ret = read(obj, &topics, &objv_tracker);
+  ret = read_topics(&bucket_topics, &objv_tracker);
   if (ret < 0 && ret != -ENOENT) {
-    ldout(store->ctx(), 0) << "ERROR: failed to read topics info: ret=" << ret << dendl;
+    ldout(store->ctx(), 0) << "ERROR: failed to read bucket topics info: ret=" << ret << dendl;
     return ret;
   }
 
-  rgw_pubsub_user_topic_info& new_topic = topics.topics[name];
-  new_topic.user = user;
-  new_topic.topic.name = name;
-  new_topic.topic.bucket = bucket;
+  bucket_topics.topics[topic_name] = user_topic_info.topic;
 
-  ret = write(obj, topics, &objv_tracker);
+  ret = write_topics(bucket_topics, &objv_tracker);
   if (ret < 0) {
     ldout(store->ctx(), 0) << "ERROR: failed to write topics info: ret=" << ret << dendl;
     return ret;
   }
 
-  ret = update_bucket(topics, bucket);
+  return 0;
+}
+
+int RGWUserPubSub::Bucket::remove_notification(const string& topic_name)
+{
+  rgw_pubsub_topic_subs user_topic_info;
+  RGWRados *store = ps->store;
+
+  int ret = ps->get_topic(topic_name, &user_topic_info);
+  if (ret < 0) {
+    ldout(store->ctx(), 0) << "ERROR: failed to read topic info: ret=" << ret << dendl;
+    return ret;
+  }
+
+  RGWObjVersionTracker objv_tracker;
+  rgw_pubsub_bucket_topics bucket_topics;
+
+  ret = read_topics(&bucket_topics, &objv_tracker);
+  if (ret < 0 && ret != -ENOENT) {
+    ldout(store->ctx(), 0) << "ERROR: failed to read bucket topics info: ret=" << ret << dendl;
+    return ret;
+  }
+
+  bucket_topics.topics.erase(topic_name);
+
+  ret = write_topics(bucket_topics, &objv_tracker);
   if (ret < 0) {
-    ldout(store->ctx(), 0) << "ERROR: failed to write bucket topics info: ret=" << ret << dendl;
+    ldout(store->ctx(), 0) << "ERROR: failed to write topics info: ret=" << ret << dendl;
     return ret;
   }
 
   return 0;
 }
 
-int RGWUserPubSub::remove_topic(const string& name)
+int RGWUserPubSub::create_topic(const string& name)
 {
-  rgw_raw_obj obj;
-  get_user_meta_obj(&obj);
-
   RGWObjVersionTracker objv_tracker;
   rgw_pubsub_user_topics topics;
 
-  int ret = read(obj, &topics, &objv_tracker);
+  int ret = read_user_topics(&topics, &objv_tracker);
   if (ret < 0 && ret != -ENOENT) {
     ldout(store->ctx(), 0) << "ERROR: failed to read topics info: ret=" << ret << dendl;
     return ret;
   }
 
-  rgw_bucket bucket;
-  auto t = topics.topics.find(name);
-  if (t != topics.topics.end()) {
-    bucket = t->second.topic.bucket;
-  }
-
-  topics.topics.erase(name);
+  rgw_pubsub_topic_subs& new_topic = topics.topics[name];
+  new_topic.topic.user = user;
+  new_topic.topic.name = name;
 
-  ret = write(obj, topics, &objv_tracker);
+  ret = write_user_topics(topics, &objv_tracker);
   if (ret < 0) {
     ldout(store->ctx(), 0) << "ERROR: failed to write topics info: ret=" << ret << dendl;
     return ret;
   }
 
-  if (bucket.name.empty()) {
-    return 0;
+  return 0;
+}
+
+int RGWUserPubSub::remove_topic(const string& name)
+{
+  RGWObjVersionTracker objv_tracker;
+  rgw_pubsub_user_topics topics;
+
+  int ret = read_user_topics(&topics, &objv_tracker);
+  if (ret < 0 && ret != -ENOENT) {
+    ldout(store->ctx(), 0) << "ERROR: failed to read topics info: ret=" << ret << dendl;
+    return ret;
   }
 
-  ret = update_bucket(topics, bucket);
+  topics.topics.erase(name);
+
+  ret = write_user_topics(topics, &objv_tracker);
   if (ret < 0) {
-    ldout(store->ctx(), 0) << "ERROR: failed to write bucket topics info: ret=" << ret << dendl;
+    ldout(store->ctx(), 0) << "ERROR: failed to write topics info: ret=" << ret << dendl;
     return ret;
   }
 
   return 0;
 }
 
-int RGWUserPubSub::update_bucket(const rgw_pubsub_user_topics& topics, const rgw_bucket& bucket)
+int RGWUserPubSub::Sub::read_sub(rgw_pubsub_sub_config *result, RGWObjVersionTracker *objv_tracker)
 {
-  rgw_pubsub_user_topics bucket_topics;
-  for (auto& t : topics.topics) {
-    if (t.second.topic.bucket == bucket) {
-      bucket_topics.topics.insert(t);
-    }
+  int ret = ps->read(sub_meta_obj, result, objv_tracker);
+  if (ret < 0 && ret != -ENOENT) {
+    ldout(ps->store->ctx(), 0) << "ERROR: failed to read subscription info: ret=" << ret << dendl;
+    return ret;
   }
+  return 0;
+}
 
-  rgw_raw_obj bobj;
-  get_bucket_meta_obj(bucket, &bobj);
-  int ret = write(bobj, bucket_topics, nullptr);
+int RGWUserPubSub::Sub::write_sub(const rgw_pubsub_sub_config& sub_conf, RGWObjVersionTracker *objv_tracker)
+{
+  int ret = ps->write(sub_meta_obj, sub_conf, objv_tracker);
   if (ret < 0) {
-    ldout(store->ctx(), 0) << "ERROR: failed to write topics info: ret=" << ret << dendl;
+    ldout(ps->store->ctx(), 0) << "ERROR: failed to write subscription info: ret=" << ret << dendl;
     return ret;
   }
+
   return 0;
 }
 
-int RGWUserPubSub::get_sub(const string& name, rgw_pubsub_user_sub_config *result)
+int RGWUserPubSub::Sub::remove_sub(RGWObjVersionTracker *objv_tracker)
 {
-  rgw_raw_obj obj;
-  get_sub_meta_obj(name, &obj);
-  int ret = read(obj, result, nullptr);
+  int ret = ps->remove(sub_meta_obj, objv_tracker);
   if (ret < 0) {
-    ldout(store->ctx(), 0) << "ERROR: failed to read subscription info: ret=" << ret << dendl;
+    ldout(ps->store->ctx(), 0) << "ERROR: failed to write subscription info: ret=" << ret << dendl;
     return ret;
   }
+
   return 0;
 }
 
-int RGWUserPubSub::add_sub(const string& name, const string& topic, const rgw_pubsub_user_sub_dest& dest)
+int RGWUserPubSub::Sub::get_conf(rgw_pubsub_sub_config *result)
 {
-  rgw_raw_obj obj;
-  get_user_meta_obj(&obj);
+  return read_sub(result, nullptr);
+}
 
-  RGWObjVersionTracker objv_tracker;
+int RGWUserPubSub::Sub::subscribe(const string& topic, const rgw_pubsub_sub_dest& dest)
+{
+  RGWObjVersionTracker user_objv_tracker;
   rgw_pubsub_user_topics topics;
+  RGWRados *store = ps->store;
 
-  int ret = read(obj, &topics, &objv_tracker);
+  int ret = ps->read_user_topics(&topics, &user_objv_tracker);
   if (ret < 0) {
     ldout(store->ctx(), 0) << "ERROR: failed to read topics info: ret=" << ret << dendl;
     return ret;
@@ -280,30 +342,22 @@ int RGWUserPubSub::add_sub(const string& name, const string& topic, const rgw_pu
 
   auto& t = iter->second;
 
-  rgw_pubsub_user_sub_config sub_conf;
+  rgw_pubsub_sub_config sub_conf;
 
-  sub_conf.user = user;
-  sub_conf.name = name;
+  sub_conf.user = ps->user;
+  sub_conf.name = sub;
   sub_conf.topic = topic;
   sub_conf.dest = dest;
 
-  t.subs.insert(name);
+  t.subs.insert(sub);
 
-  ret = write(obj, topics, &objv_tracker);
+  ret = ps->write_user_topics(topics, &user_objv_tracker);
   if (ret < 0) {
     ldout(store->ctx(), 0) << "ERROR: failed to write topics info: ret=" << ret << dendl;
     return ret;
   }
 
-  ret = update_bucket(topics, t.topic.bucket);
-  if (ret < 0) {
-    ldout(store->ctx(), 0) << "ERROR: failed to write bucket topics info: ret=" << ret << dendl;
-    return ret;
-  }
-
-  rgw_raw_obj sobj;
-  get_sub_meta_obj(name, &sobj);
-  ret = write(sobj, sub_conf, nullptr);
+  ret = write_sub(sub_conf, nullptr);
   if (ret < 0) {
     ldout(store->ctx(), 0) << "ERROR: failed to write subscription info: ret=" << ret << dendl;
     return ret;
@@ -311,17 +365,15 @@ int RGWUserPubSub::add_sub(const string& name, const string& topic, const rgw_pu
   return 0;
 }
 
-int RGWUserPubSub::remove_sub(const string& name, const string& _topic)
+int RGWUserPubSub::Sub::unsubscribe(const string& _topic)
 {
   string topic = _topic;
-
   RGWObjVersionTracker sobjv_tracker;
-  rgw_raw_obj sobj;
-  get_sub_meta_obj(name, &sobj);
+  RGWRados *store = ps->store;
 
   if (topic.empty()) {
-    rgw_pubsub_user_sub_config sub_conf;
-    int ret = read(sobj, &sub_conf, &sobjv_tracker);
+    rgw_pubsub_sub_config sub_conf;
+    int ret = read_sub(&sub_conf, &sobjv_tracker);
     if (ret < 0) {
       ldout(store->ctx(), 0) << "ERROR: failed to read subscription info: ret=" << ret << dendl;
       return ret;
@@ -329,27 +381,22 @@ int RGWUserPubSub::remove_sub(const string& name, const string& _topic)
     topic = sub_conf.topic;
   }
 
-  rgw_raw_obj obj;
-  get_user_meta_obj(&obj);
-
   RGWObjVersionTracker objv_tracker;
   rgw_pubsub_user_topics topics;
 
-  int ret = read(obj, &topics, &objv_tracker);
+  int ret = ps->read_user_topics(&topics, &objv_tracker);
   if (ret < 0) {
     ldout(store->ctx(), 0) << "ERROR: failed to read topics info: ret=" << ret << dendl;
   }
 
   if (ret >= 0) {
     auto iter = topics.topics.find(topic);
-    if (iter == topics.topics.end()) {
-      ldout(store->ctx(), 20) << "ERROR: cannot add subscription to topic: topic not found" << dendl;
-    } else {
+    if (iter != topics.topics.end()) {
       auto& t = iter->second;
 
-      t.subs.erase(name);
+      t.subs.erase(sub);
 
-      ret = write(obj, topics, &objv_tracker);
+      ret = ps->write_user_topics(topics, &objv_tracker);
       if (ret < 0) {
         ldout(store->ctx(), 0) << "ERROR: failed to write topics info: ret=" << ret << dendl;
         return ret;
@@ -357,15 +404,15 @@ int RGWUserPubSub::remove_sub(const string& name, const string& _topic)
     }
   }
 
-  ret = remove(sobj, &sobjv_tracker);
+  ret = remove_sub(&sobjv_tracker);
   if (ret < 0) {
-    ldout(store->ctx(), 0) << "ERROR: failed to write subscription info: ret=" << ret << dendl;
+    ldout(store->ctx(), 0) << "ERROR: failed to delete subscription info: ret=" << ret << dendl;
     return ret;
   }
   return 0;
 }
 
-void RGWUserPubSub::list_events_result::dump(Formatter *f) const
+void RGWUserPubSub::Sub::list_events_result::dump(Formatter *f) const
 {
   encode_json("next_marker", next_marker, f);
   encode_json("is_truncated", is_truncated, f);
@@ -376,12 +423,12 @@ void RGWUserPubSub::list_events_result::dump(Formatter *f) const
   }
 }
 
-int RGWUserPubSub::list_events(const string& sub_name,
-                               const string& marker, int max_events,
+int RGWUserPubSub::Sub::list_events(const string& marker, int max_events,
                                list_events_result *result)
 {
-  rgw_pubsub_user_sub_config sub_conf;
-  int ret = get_sub(sub_name, &sub_conf);
+  RGWRados *store = ps->store;
+  rgw_pubsub_sub_config sub_conf;
+  int ret = get_conf(&sub_conf);
   if (ret < 0) {
     ldout(store->ctx(), 0) << "ERROR: failed to read sub config: ret=" << ret << dendl;
     return ret;
@@ -438,10 +485,11 @@ int RGWUserPubSub::list_events(const string& sub_name,
   return 0;
 }
 
-int RGWUserPubSub::remove_event(const string& sub_name, const string& event_id)
+int RGWUserPubSub::Sub::remove_event(const string& event_id)
 {
-  rgw_pubsub_user_sub_config sub_conf;
-  int ret = get_sub(sub_name, &sub_conf);
+  RGWRados *store = ps->store;
+  rgw_pubsub_sub_config sub_conf;
+  int ret = get_conf(&sub_conf);
   if (ret < 0) {
     ldout(store->ctx(), 0) << "ERROR: failed to read sub config: ret=" << ret << dendl;
     return ret;
index 071432ed8e1985428e761502ff8a5d44baf6eafb..40f3abb6f985c9dd98dd9880157453fe16470d04 100644 (file)
@@ -54,7 +54,7 @@ struct rgw_pubsub_event {
 };
 WRITE_CLASS_ENCODER(rgw_pubsub_event)
 
-struct rgw_pubsub_user_sub_dest {
+struct rgw_pubsub_sub_dest {
   string bucket_name;
   string oid_prefix;
   string push_endpoint;
@@ -77,13 +77,13 @@ struct rgw_pubsub_user_sub_dest {
 
   void dump(Formatter *f) const;
 };
-WRITE_CLASS_ENCODER(rgw_pubsub_user_sub_dest)
+WRITE_CLASS_ENCODER(rgw_pubsub_sub_dest)
 
-struct rgw_pubsub_user_sub_config {
+struct rgw_pubsub_sub_config {
   rgw_user user;
   string name;
   string topic;
-  rgw_pubsub_user_sub_dest dest;
+  rgw_pubsub_sub_dest dest;
 
   void encode(bufferlist& bl) const {
     ENCODE_START(1, 1, bl);
@@ -105,41 +105,44 @@ struct rgw_pubsub_user_sub_config {
 
   void dump(Formatter *f) const;
 };
-WRITE_CLASS_ENCODER(rgw_pubsub_user_sub_config)
+WRITE_CLASS_ENCODER(rgw_pubsub_sub_config)
 
-struct rgw_pubsub_user_topic {
+struct rgw_pubsub_topic {
+  rgw_user user;
   string name;
-  rgw_bucket bucket;
 
   void encode(bufferlist& bl) const {
     ENCODE_START(1, 1, bl);
+    encode(user, bl);
     encode(name, bl);
-    encode(bucket, bl);
     ENCODE_FINISH(bl);
   }
 
   void decode(bufferlist::const_iterator& bl) {
     DECODE_START(1, bl);
+    decode(user, bl);
     decode(name, bl);
-    decode(bucket, bl);
     DECODE_FINISH(bl);
   }
 
-  const string& to_str() const {
-    return name;
+  string to_str() const {
+    return user.to_str() + "/" + name;
   }
+
   void dump(Formatter *f) const;
+
+  bool operator<(const rgw_pubsub_topic& t) const {
+    return to_str().compare(t.to_str());
+  }
 };
-WRITE_CLASS_ENCODER(rgw_pubsub_user_topic)
+WRITE_CLASS_ENCODER(rgw_pubsub_topic)
 
-struct rgw_pubsub_user_topic_info {
-  rgw_user user;
-  rgw_pubsub_user_topic topic;
+struct rgw_pubsub_topic_subs {
+  rgw_pubsub_topic topic;
   set<string> subs;
 
   void encode(bufferlist& bl) const {
     ENCODE_START(1, 1, bl);
-    encode(user, bl);
     encode(topic, bl);
     encode(subs, bl);
     ENCODE_FINISH(bl);
@@ -147,26 +150,36 @@ struct rgw_pubsub_user_topic_info {
 
   void decode(bufferlist::const_iterator& bl) {
     DECODE_START(1, bl);
-    decode(user, bl);
     decode(topic, bl);
     decode(subs, bl);
     DECODE_FINISH(bl);
   }
 
-  string to_str() const {
-    return user.to_str() + "/" + topic.name;
-  }
-
   void dump(Formatter *f) const;
+};
+WRITE_CLASS_ENCODER(rgw_pubsub_topic_subs)
 
-  bool operator<(const rgw_pubsub_user_topic& t) const {
-    return to_str().compare(t.to_str());
+struct rgw_pubsub_bucket_topics {
+  map<string, rgw_pubsub_topic> topics;
+
+  void encode(bufferlist& bl) const {
+    ENCODE_START(1, 1, bl);
+    encode(topics, bl);
+    ENCODE_FINISH(bl);
   }
+
+  void decode(bufferlist::const_iterator& bl) {
+    DECODE_START(1, bl);
+    decode(topics, bl);
+    DECODE_FINISH(bl);
+  }
+
+  void dump(Formatter *f) const;
 };
-WRITE_CLASS_ENCODER(rgw_pubsub_user_topic_info)
+WRITE_CLASS_ENCODER(rgw_pubsub_bucket_topics)
 
 struct rgw_pubsub_user_topics {
-  map<string, rgw_pubsub_user_topic_info> topics;
+  map<string, rgw_pubsub_topic_subs> topics;
 
   void encode(bufferlist& bl) const {
     ENCODE_START(1, 1, bl);
@@ -188,10 +201,26 @@ static string pubsub_user_oid_prefix = "pubsub.user.";
 
 class RGWUserPubSub
 {
+  friend class Bucket;
+
   RGWRados *store;
   rgw_user user;
   RGWObjectCtx obj_ctx;
 
+  rgw_raw_obj user_meta_obj;
+
+  string user_meta_oid() const {
+    return pubsub_user_oid_prefix + user.to_str();
+  }
+
+  string bucket_meta_oid(const rgw_bucket& bucket) const {
+    return pubsub_user_oid_prefix + user.to_str() + ".bucket." + bucket.name + "/" + bucket.bucket_id;
+  }
+
+  string sub_meta_oid(const string& name) const {
+    return pubsub_user_oid_prefix + user.to_str() + ".sub." + name;
+  }
+
   template <class T>
   int read(const rgw_raw_obj& obj, T *data, RGWObjVersionTracker *objv_tracker);
 
@@ -200,22 +229,72 @@ class RGWUserPubSub
 
   int remove(const rgw_raw_obj& obj, RGWObjVersionTracker *objv_tracker);
 
-  int update_bucket(const rgw_pubsub_user_topics& topics, const rgw_bucket& bucket);
+  int read_user_topics(rgw_pubsub_user_topics *result, RGWObjVersionTracker *objv_tracker);
+  int write_user_topics(const rgw_pubsub_user_topics& topics, RGWObjVersionTracker *objv_tracker);
 public:
   RGWUserPubSub(RGWRados *_store, const rgw_user& _user) : store(_store),
                                                            user(_user),
-                                                           obj_ctx(store) {}
-
-  string user_meta_oid() const {
-    return pubsub_user_oid_prefix + user.to_str();
+                                                           obj_ctx(store) {
+    get_user_meta_obj(&user_meta_obj);
   }
 
-  string bucket_meta_oid(const rgw_bucket& bucket) const {
-    return pubsub_user_oid_prefix + user.to_str() + ".bucket." + bucket.name + "/" + bucket.bucket_id;
+  class Bucket {
+    friend class RGWUserPubSub;
+    RGWUserPubSub *ps;
+    rgw_bucket bucket;
+    rgw_raw_obj bucket_meta_obj;
+
+    int read_topics(rgw_pubsub_bucket_topics *result, RGWObjVersionTracker *objv_tracker);
+    int write_topics(const rgw_pubsub_bucket_topics& topics, RGWObjVersionTracker *objv_tracker);
+  public:
+    Bucket(RGWUserPubSub *_ps, const rgw_bucket& _bucket) : ps(_ps), bucket(_bucket) {
+      ps->get_bucket_meta_obj(bucket, &bucket_meta_obj);
+    }
+
+    int get_topics(rgw_pubsub_bucket_topics *result);
+    int create_notification(const string& topic_name);
+    int remove_notification(const string& topic_name);
+  };
+
+  class Sub {
+    friend class RGWUserPubSub;
+    RGWUserPubSub *ps;
+    string sub;
+    rgw_raw_obj sub_meta_obj;
+
+    int read_sub(rgw_pubsub_sub_config *result, RGWObjVersionTracker *objv_tracker);
+    int write_sub(const rgw_pubsub_sub_config& sub_conf, RGWObjVersionTracker *objv_tracker);
+    int remove_sub(RGWObjVersionTracker *objv_tracker);
+  public:
+    Sub(RGWUserPubSub *_ps, const string& _sub) : ps(_ps), sub(_sub) {
+      ps->get_sub_meta_obj(sub, &sub_meta_obj);
+    }
+
+    int subscribe(const string& topic_name, const rgw_pubsub_sub_dest& dest);
+    int unsubscribe(const string& topic_name);
+    int get_conf(rgw_pubsub_sub_config *result);
+
+    struct list_events_result {
+      string next_marker;
+      bool is_truncated{false};
+      std::vector<rgw_pubsub_event> events;
+
+      void dump(Formatter *f) const;
+    };
+
+    int list_events(const string& marker, int max_events, list_events_result *result);
+    int remove_event(const string& event_id);
+  };
+
+  using BucketRef = std::shared_ptr<Bucket>;
+  using SubRef = std::shared_ptr<Sub>;
+
+  BucketRef get_bucket(const rgw_bucket& bucket) {
+    return std::make_shared<Bucket>(this, bucket);
   }
 
-  string sub_meta_oid(const string& name) const {
-    return pubsub_user_oid_prefix + user.to_str() + ".sub." + name;
+  SubRef get_sub(const string& sub) {
+    return std::make_shared<Sub>(this, sub);
   }
 
   void get_user_meta_obj(rgw_raw_obj *obj) const {
@@ -230,25 +309,10 @@ public:
     *obj = rgw_raw_obj(store->get_zone_params().log_pool, sub_meta_oid(name));
   }
 
-  int get_topics(rgw_pubsub_user_topics *result);
-  int get_bucket_topics(const rgw_bucket& bucket, rgw_pubsub_user_topics *result);
-  int get_topic(const string& name, rgw_pubsub_user_topic_info *result);
-  int create_topic(const string& name, const rgw_bucket& bucket);
+  int get_user_topics(rgw_pubsub_user_topics *result);
+  int get_topic(const string& name, rgw_pubsub_topic_subs *result);
+  int create_topic(const string& name);
   int remove_topic(const string& name);
-  int get_sub(const string& name, rgw_pubsub_user_sub_config *result);
-  int add_sub(const string& name, const string& topic, const rgw_pubsub_user_sub_dest& dest);
-  int remove_sub(const string& name, const string& topic);
-
-  struct list_events_result {
-    string next_marker;
-    bool is_truncated{false};
-    std::vector<rgw_pubsub_event> events;
-
-    void dump(Formatter *f) const;
-  };
-
-  int list_events(const string& sub_name, const string& marker, int max_events, list_events_result *result);
-  int remove_event(const string& sub_name, const string& event_id);
 };
 
 template <class T>
index c434bd37c7a7437beca1589d1cd43c031ce6f6d0..24a7286271411d61566d202c11dd2fead307a129 100644 (file)
@@ -89,7 +89,7 @@ struct PSSubConfig { /* subscription config */
   string data_bucket_name;
   string data_oid_prefix;
 
-  void from_user_conf(const rgw_pubsub_user_sub_config& uc) {
+  void from_user_conf(const rgw_pubsub_sub_config& uc) {
     name = uc.name;
     topic = uc.topic;
     push_endpoint = uc.dest.push_endpoint;
@@ -597,7 +597,7 @@ public:
 
   PSSubscription(RGWDataSyncEnv *_sync_env,
                  PSEnvRef _env,
-                 rgw_pubsub_user_sub_config& user_sub_conf) : sync_env(_sync_env),
+                 rgw_pubsub_sub_config& user_sub_conf) : sync_env(_sync_env),
                                       env(_env),
                                       sub_conf(std::make_shared<PSSubConfig>()),
                                       data_access(std::make_shared<RGWDataAccess>(sync_env->store)) {
@@ -649,7 +649,7 @@ class PSManager
     PSConfigRef conf;
 
     PSSubConfigRef sub_conf;
-    rgw_pubsub_user_sub_config user_sub_conf;
+    rgw_pubsub_sub_config user_sub_conf;
   public:
     GetSubCR(RGWDataSyncEnv *_sync_env,
                       PSManagerRef& _mgr,
@@ -677,7 +677,7 @@ class PSManager
 
           *ref = PSSubscription::get_shared(sync_env, mgr->env, sub_conf);
         } else {
-          using ReadInfoCR = RGWSimpleRadosReadCR<rgw_pubsub_user_sub_config>;
+          using ReadInfoCR = RGWSimpleRadosReadCR<rgw_pubsub_sub_config>;
           yield {
             RGWUserPubSub ups(sync_env->store, owner);
             rgw_raw_obj obj;
@@ -832,8 +832,10 @@ class RGWPSFindBucketTopicsCR : public RGWCoroutine {
 
   RGWUserPubSub ups;
 
-  rgw_raw_obj obj;
-  rgw_pubsub_user_topics bucket_topics;
+  rgw_raw_obj bucket_obj;
+  rgw_raw_obj user_obj;
+  rgw_pubsub_bucket_topics bucket_topics;
+  rgw_pubsub_user_topics user_topics;
   TopicsRef *topics;
 public:
   RGWPSFindBucketTopicsCR(RGWDataSyncEnv *_sync_env,
@@ -853,14 +855,14 @@ public:
   }
   int operate() override {
     reenter(this) {
-      ups.get_bucket_meta_obj(bucket, &obj);
+      ups.get_bucket_meta_obj(bucket, &bucket_obj);
+      ups.get_user_meta_obj(&user_obj);
 
-
-      using ReadInfoCR = RGWSimpleRadosReadCR<rgw_pubsub_user_topics>;
+      using ReadInfoCR = RGWSimpleRadosReadCR<rgw_pubsub_bucket_topics>;
       yield {
         bool empty_on_enoent = true;
         call(new ReadInfoCR(sync_env->async_rados, sync_env->store,
-                            obj,
+                            bucket_obj,
                             &bucket_topics, empty_on_enoent));
       }
       if (retcode < 0 && retcode != -ENOENT) {
@@ -869,11 +871,24 @@ public:
 
       ldout(sync_env->cct, 20) << "RGWPSFindBucketTopicsCR(): found " << bucket_topics.topics.size() << " topics for bucket " << bucket << dendl;
 
+      if (!bucket_topics.topics.empty()) {
+       using ReadUserTopicsInfoCR = RGWSimpleRadosReadCR<rgw_pubsub_user_topics>;
+       yield {
+         bool empty_on_enoent = true;
+         call(new ReadUserTopicsInfoCR(sync_env->async_rados, sync_env->store,
+                                       user_obj,
+                                       &user_topics, empty_on_enoent));
+       }
+       if (retcode < 0 && retcode != -ENOENT) {
+         return set_cr_error(retcode);
+       }
+      }
+
       for (auto& titer : bucket_topics.topics) {
         auto& info = titer.second;
         shared_ptr<PSTopicConfig> tc = std::make_shared<PSTopicConfig>();
-        tc->name = info.topic.name;
-        tc->subs = info.subs;
+        tc->name = info.name;
+        tc->subs = user_topics.topics[info.name].subs;
         (*topics)->push_back(tc);
       }
 
index f2df4afb998aa1e3c3d7dd37ccb132cb9af15ac5..5b5392cbde56f4d946a6ab95af1f55b4c5b0a21b 100644 (file)
@@ -14,30 +14,11 @@ protected:
   std::unique_ptr<RGWUserPubSub> ups;
   string topic_name;
   string bucket_name;
-  RGWBucketInfo bucket_info;
 
 public:
   RGWPSCreateTopicOp() {}
 
   int verify_permission() override {
-    int ret = get_params();
-    if (ret < 0) {
-      return ret;
-    }
-
-    RGWObjectCtx& obj_ctx = *static_cast<RGWObjectCtx *>(s->obj_ctx);
-
-    ret = store->get_bucket_info(obj_ctx, s->owner.get_id().tenant, bucket_name,
-                                 bucket_info, nullptr, nullptr);
-    if (ret < 0) {
-      return ret;
-    }
-
-    if (bucket_info.owner != s->owner.get_id()) {
-      ldout(s->cct, 20) << "user doesn't own bucket, cannot create topic" << dendl;
-      return -EPERM;
-    }
-
     return 0;
   }
   void pre_exec() override {
@@ -53,8 +34,13 @@ public:
 
 void RGWPSCreateTopicOp::execute()
 {
+  op_ret = get_params();
+  if (op_ret < 0) {
+    return;
+  }
+
   ups = make_unique<RGWUserPubSub>(store, s->owner.get_id());
-  op_ret = ups->create_topic(topic_name, bucket_info.bucket);
+  op_ret = ups->create_topic(topic_name);
   if (op_ret < 0) {
     ldout(s->cct, 20) << "failed to create topic, ret=" << op_ret << dendl;
     return;
@@ -67,22 +53,12 @@ public:
 
   int get_params() override {
     topic_name = s->object.name;
-
-    bool exists;
-    bucket_name = s->info.args.get("bucket", &exists);
-    if (!exists) {
-      ldout(s->cct, 20) << "ERROR: missing required param 'bucket' for request" << dendl;
-      return -EINVAL;
-    }
-
     return 0;
   }
 };
 
 class RGWPSListTopicsOp : public RGWOp {
 protected:
-  string bucket_name;
-  RGWBucketInfo bucket_info;
   std::unique_ptr<RGWUserPubSub> ups;
   rgw_pubsub_user_topics result;
 
@@ -91,28 +67,6 @@ public:
   RGWPSListTopicsOp() {}
 
   int verify_permission() override {
-    int ret = get_params();
-    if (ret < 0) {
-      return ret;
-    }
-
-    if (bucket_name.empty()) {
-      return 0;
-    }
-
-    RGWObjectCtx& obj_ctx = *static_cast<RGWObjectCtx *>(s->obj_ctx);
-
-    ret = store->get_bucket_info(obj_ctx, s->owner.get_id().tenant, bucket_name,
-                                 bucket_info, nullptr, nullptr);
-    if (ret < 0) {
-      return ret;
-    }
-
-    if (bucket_info.owner != s->owner.get_id()) {
-      ldout(s->cct, 20) << "user doesn't own bucket, cannot create topic" << dendl;
-      return -EPERM;
-    }
-
     return 0;
   }
   void pre_exec() override {
@@ -123,17 +77,12 @@ public:
   const char* name() const override { return "pubsub_topics_list"; }
   virtual RGWOpType get_type() override { return RGW_OP_PUBSUB_TOPICS_LIST; }
   virtual uint32_t op_mask() override { return RGW_OP_TYPE_READ; }
-  virtual int get_params() = 0;
 };
 
 void RGWPSListTopicsOp::execute()
 {
   ups = make_unique<RGWUserPubSub>(store, s->owner.get_id());
-  if (bucket_name.empty()) {
-    op_ret = ups->get_topics(&result);
-  } else {
-    op_ret = ups->get_bucket_topics(bucket_info.bucket, &result);
-  }
+  op_ret = ups->get_user_topics(&result);
   if (op_ret < 0) {
     ldout(s->cct, 20) << "failed to get topics, ret=" << op_ret << dendl;
     return;
@@ -145,11 +94,6 @@ class RGWPSListTopics_ObjStore_S3 : public RGWPSListTopicsOp {
 public:
   explicit RGWPSListTopics_ObjStore_S3() {}
 
-  int get_params() override {
-    bucket_name = s->info.args.get("bucket");
-    return 0;
-  }
-
   void send_response() override {
     if (op_ret) {
       set_req_state_err(s, op_ret);
@@ -170,7 +114,7 @@ class RGWPSGetTopicOp : public RGWOp {
 protected:
   string topic_name;
   std::unique_ptr<RGWUserPubSub> ups;
-  rgw_pubsub_user_topic_info result;
+  rgw_pubsub_topic_subs result;
 
 public:
   RGWPSGetTopicOp() {}
@@ -318,7 +262,7 @@ protected:
   string sub_name;
   string topic_name;
   std::unique_ptr<RGWUserPubSub> ups;
-  rgw_pubsub_user_sub_dest dest;
+  rgw_pubsub_sub_dest dest;
 
 public:
   RGWPSCreateSubOp() {}
@@ -344,7 +288,8 @@ void RGWPSCreateSubOp::execute()
     return;
   }
   ups = make_unique<RGWUserPubSub>(store, s->owner.get_id());
-  op_ret = ups->add_sub(sub_name, topic_name, dest);
+  auto sub = ups->get_sub(sub_name);
+  op_ret = sub->subscribe(topic_name, dest);
   if (op_ret < 0) {
     ldout(s->cct, 20) << "failed to create subscription, ret=" << op_ret << dendl;
     return;
@@ -376,7 +321,7 @@ class RGWPSGetSubOp : public RGWOp {
 protected:
   string sub_name;
   std::unique_ptr<RGWUserPubSub> ups;
-  rgw_pubsub_user_sub_config result;
+  rgw_pubsub_sub_config result;
 
 public:
   RGWPSGetSubOp() {}
@@ -402,7 +347,8 @@ void RGWPSGetSubOp::execute()
     return;
   }
   ups = make_unique<RGWUserPubSub>(store, s->owner.get_id());
-  op_ret = ups->get_sub(sub_name, &result);
+  auto sub = ups->get_sub(sub_name);
+  op_ret = sub->get_conf(&result);
   if (op_ret < 0) {
     ldout(s->cct, 20) << "failed to get subscription, ret=" << op_ret << dendl;
     return;
@@ -468,7 +414,8 @@ void RGWPSDeleteSubOp::execute()
     return;
   }
   ups = make_unique<RGWUserPubSub>(store, s->owner.get_id());
-  op_ret = ups->remove_sub(sub_name, topic_name);
+  auto sub = ups->get_sub(sub_name);
+  op_ret = sub->unsubscribe(topic_name);
   if (op_ret < 0) {
     ldout(s->cct, 20) << "failed to remove subscription, ret=" << op_ret << dendl;
     return;
@@ -516,7 +463,8 @@ void RGWPSAckSubEventOp::execute()
     return;
   }
   ups = make_unique<RGWUserPubSub>(store, s->owner.get_id());
-  op_ret = ups->remove_event(sub_name, event_id);
+  auto sub = ups->get_sub(sub_name);
+  op_ret = sub->remove_event(event_id);
   if (op_ret < 0) {
     ldout(s->cct, 20) << "failed to remove event, ret=" << op_ret << dendl;
     return;
@@ -547,7 +495,7 @@ protected:
   string sub_name;
   string marker;
   std::unique_ptr<RGWUserPubSub> ups;
-  RGWUserPubSub::list_events_result result;
+  RGWUserPubSub::Sub::list_events_result result;
 
 public:
   RGWPSPullSubEventsOp() {}
@@ -573,7 +521,8 @@ void RGWPSPullSubEventsOp::execute()
     return;
   }
   ups = make_unique<RGWUserPubSub>(store, s->owner.get_id());
-  op_ret = ups->list_events(sub_name, marker, max_entries, &result);
+  auto sub = ups->get_sub(sub_name);
+  op_ret = sub->list_events(marker, max_entries, &result);
   if (op_ret < 0) {
     ldout(s->cct, 20) << "failed to get subscription, ret=" << op_ret << dendl;
     return;
@@ -657,6 +606,288 @@ public:
 };
 
 
+static int notif_bucket_path(const string& path, string *bucket_name)
+{
+  if (path.empty()) {
+    return -EINVAL;
+  }
+  size_t pos = path.find('/');
+  if (pos  == string::npos) {
+    return -EINVAL;
+  }
+  if (pos >= path.size()) {
+    return -EINVAL;
+  }
+
+  string type = path.substr(0, pos);
+  if (type != "bucket") {
+    return -EINVAL;
+  }
+
+  *bucket_name = path.substr(pos + 1);
+  return 0;
+}
+
+class RGWPSCreateNotifOp : public RGWOp {
+protected:
+  std::unique_ptr<RGWUserPubSub> ups;
+  string topic_name;
+  string bucket_name;
+  RGWBucketInfo bucket_info;
+
+public:
+  RGWPSCreateNotifOp() {}
+
+  int verify_permission() override {
+    int ret = get_params();
+    if (ret < 0) {
+      return ret;
+    }
+
+    RGWObjectCtx& obj_ctx = *static_cast<RGWObjectCtx *>(s->obj_ctx);
+
+    ret = store->get_bucket_info(obj_ctx, s->owner.get_id().tenant, bucket_name,
+                                 bucket_info, nullptr, nullptr);
+    if (ret < 0) {
+      return ret;
+    }
+
+    if (bucket_info.owner != s->owner.get_id()) {
+      ldout(s->cct, 20) << "user doesn't own bucket, cannot create topic" << dendl;
+      return -EPERM;
+    }
+    return 0;
+  }
+  void pre_exec() override {
+    rgw_bucket_object_pre_exec(s);
+  }
+  void execute() override;
+
+  const char* name() const override { return "pubsub_notification_create"; }
+  virtual RGWOpType get_type() override { return RGW_OP_PUBSUB_NOTIF_CREATE; }
+  virtual uint32_t op_mask() override { return RGW_OP_TYPE_WRITE; }
+  virtual int get_params() = 0;
+};
+
+void RGWPSCreateNotifOp::execute()
+{
+  op_ret = get_params();
+  if (op_ret < 0) {
+    return;
+  }
+
+  ups = make_unique<RGWUserPubSub>(store, s->owner.get_id());
+  auto b = ups->get_bucket(bucket_info.bucket);
+  op_ret = b->create_notification(topic_name);
+  if (op_ret < 0) {
+    ldout(s->cct, 20) << "failed to create notification, ret=" << op_ret << dendl;
+    return;
+  }
+}
+
+class RGWPSCreateNotif_ObjStore_S3 : public RGWPSCreateNotifOp {
+public:
+  explicit RGWPSCreateNotif_ObjStore_S3() {}
+
+  int get_params() override {
+    bool exists;
+    topic_name = s->info.args.get("topic", &exists);
+    if (!exists) {
+      ldout(s->cct, 20) << "param 'topic' not provided" << dendl;
+      return -EINVAL;
+    }
+    return notif_bucket_path(s->object.name, &bucket_name);
+  }
+};
+
+class RGWPSDeleteNotifOp : public RGWOp {
+protected:
+  std::unique_ptr<RGWUserPubSub> ups;
+  string topic_name;
+  string bucket_name;
+  RGWBucketInfo bucket_info;
+
+public:
+  RGWPSDeleteNotifOp() {}
+
+  int verify_permission() override {
+    int ret = get_params();
+    if (ret < 0) {
+      return ret;
+    }
+
+    RGWObjectCtx& obj_ctx = *static_cast<RGWObjectCtx *>(s->obj_ctx);
+
+    ret = store->get_bucket_info(obj_ctx, s->owner.get_id().tenant, bucket_name,
+                                 bucket_info, nullptr, nullptr);
+    if (ret < 0) {
+      return ret;
+    }
+
+    if (bucket_info.owner != s->owner.get_id()) {
+      ldout(s->cct, 20) << "user doesn't own bucket, cannot create topic" << dendl;
+      return -EPERM;
+    }
+    return 0;
+  }
+  void pre_exec() override {
+    rgw_bucket_object_pre_exec(s);
+  }
+  void execute() override;
+
+  const char* name() const override { return "pubsub_notification_delete"; }
+  virtual RGWOpType get_type() override { return RGW_OP_PUBSUB_NOTIF_DELETE; }
+  virtual uint32_t op_mask() override { return RGW_OP_TYPE_DELETE; }
+  virtual int get_params() = 0;
+};
+
+void RGWPSDeleteNotifOp::execute()
+{
+  op_ret = get_params();
+  if (op_ret < 0) {
+    return;
+  }
+
+  ups = make_unique<RGWUserPubSub>(store, s->owner.get_id());
+  auto b = ups->get_bucket(bucket_info.bucket);
+  op_ret = b->remove_notification(topic_name);
+  if (op_ret < 0) {
+    ldout(s->cct, 20) << "failed to remove notification, ret=" << op_ret << dendl;
+    return;
+  }
+}
+
+class RGWPSDeleteNotif_ObjStore_S3 : public RGWPSCreateNotifOp {
+public:
+  explicit RGWPSDeleteNotif_ObjStore_S3() {}
+
+  int get_params() override {
+    bool exists;
+    topic_name = s->info.args.get("topic", &exists);
+    if (!exists) {
+      ldout(s->cct, 20) << "param 'topic' not provided" << dendl;
+      return -EINVAL;
+    }
+    return notif_bucket_path(s->object.name, &bucket_name);
+  }
+};
+
+class RGWPSListNotifsOp : public RGWOp {
+protected:
+  string bucket_name;
+  RGWBucketInfo bucket_info;
+  std::unique_ptr<RGWUserPubSub> ups;
+  rgw_pubsub_bucket_topics result;
+
+
+public:
+  RGWPSListNotifsOp() {}
+
+  int verify_permission() override {
+    int ret = get_params();
+    if (ret < 0) {
+      return ret;
+    }
+
+    RGWObjectCtx& obj_ctx = *static_cast<RGWObjectCtx *>(s->obj_ctx);
+
+    ret = store->get_bucket_info(obj_ctx, s->owner.get_id().tenant, bucket_name,
+                                 bucket_info, nullptr, nullptr);
+    if (ret < 0) {
+      return ret;
+    }
+
+    if (bucket_info.owner != s->owner.get_id()) {
+      ldout(s->cct, 20) << "user doesn't own bucket, cannot create topic" << dendl;
+      return -EPERM;
+    }
+
+    return 0;
+  }
+  void pre_exec() override {
+    rgw_bucket_object_pre_exec(s);
+  }
+  void execute() override;
+
+  const char* name() const override { return "pubsub_notifications_list"; }
+  virtual RGWOpType get_type() override { return RGW_OP_PUBSUB_NOTIF_LIST; }
+  virtual uint32_t op_mask() override { return RGW_OP_TYPE_READ; }
+  virtual int get_params() = 0;
+};
+
+void RGWPSListNotifsOp::execute()
+{
+  ups = make_unique<RGWUserPubSub>(store, s->owner.get_id());
+  auto b = ups->get_bucket(bucket_info.bucket);
+  op_ret = b->get_topics(&result);
+  if (op_ret < 0) {
+    ldout(s->cct, 20) << "failed to get topics, ret=" << op_ret << dendl;
+    return;
+  }
+
+}
+
+class RGWPSListNotifs_ObjStore_S3 : public RGWPSListNotifsOp {
+public:
+  explicit RGWPSListNotifs_ObjStore_S3() {}
+
+  int get_params() override {
+    return notif_bucket_path(s->object.name, &bucket_name);
+  }
+
+  void send_response() override {
+    if (op_ret) {
+      set_req_state_err(s, op_ret);
+    }
+    dump_errno(s);
+    end_header(s, this, "application/json");
+
+    if (op_ret < 0) {
+      return;
+    }
+
+    encode_json("result", result, s->formatter);
+    rgw_flush_formatter_and_reset(s, s->formatter);
+  }
+};
+
+
+class RGWHandler_REST_PSNotifs_S3 : public RGWHandler_REST_S3 {
+protected:
+  int init_permissions(RGWOp* op) override {
+    return 0;
+  }
+
+  int read_permissions(RGWOp* op) override {
+    return 0;
+  }
+  bool supports_quota() override {
+    return false;
+  }
+  RGWOp *op_get() override {
+    if (s->object.empty()) {
+      return nullptr;
+    }
+    return new RGWPSListNotifs_ObjStore_S3();
+  }
+  RGWOp *op_put() override {
+    if (!s->object.empty()) {
+      return new RGWPSCreateNotif_ObjStore_S3();
+    }
+    return nullptr;
+  }
+  RGWOp *op_delete() override {
+    if (!s->object.empty()) {
+      return new RGWPSDeleteNotif_ObjStore_S3();
+    }
+    return nullptr;
+  }
+public:
+  explicit RGWHandler_REST_PSNotifs_S3(const rgw::auth::StrategyRegistry& auth_registry) : RGWHandler_REST_S3(auth_registry) {}
+  virtual ~RGWHandler_REST_PSNotifs_S3() {}
+};
+
+
 RGWHandler_REST* RGWRESTMgr_PubSub_S3::get_handler(struct req_state* const s,
                                                      const rgw::auth::StrategyRegistry& auth_registry,
                                                      const std::string& frontend_prefix)
@@ -678,6 +909,10 @@ RGWHandler_REST* RGWRESTMgr_PubSub_S3::get_handler(struct req_state* const s,
     handler = new RGWHandler_REST_PSSub_S3(auth_registry);
   }
 
+  if (s->init_state.url_bucket == "notifications") {
+    handler = new RGWHandler_REST_PSNotifs_S3(auth_registry);
+  }
+
   ldout(s->cct, 20) << __func__ << " handler=" << (handler ? typeid(*handler).name() : "<null>") << dendl;
   return handler;
 }