]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw/pubsub: allow pubsub REST API on master
authorYuval Lifshitz <yuvalif@yahoo.com>
Mon, 8 Jul 2019 20:00:06 +0000 (23:00 +0300)
committerYuval Lifshitz <yuvalif@yahoo.com>
Tue, 10 Sep 2019 15:54:05 +0000 (18:54 +0300)
Signed-off-by: Yuval Lifshitz <yuvalif@yahoo.com>
31 files changed:
src/common/options.cc
src/mrun
src/rgw/CMakeLists.txt
src/rgw/rgw_basic_types.cc
src/rgw/rgw_basic_types.h
src/rgw/rgw_bucket.cc
src/rgw/rgw_cr_rados.cc
src/rgw/rgw_file.h
src/rgw/rgw_main.cc
src/rgw/rgw_op.cc
src/rgw/rgw_pubsub.cc
src/rgw/rgw_pubsub.h
src/rgw/rgw_quota.cc
src/rgw/rgw_rest.h
src/rgw/rgw_rest_iam.cc
src/rgw/rgw_rest_pubsub.cc [new file with mode: 0644]
src/rgw/rgw_rest_pubsub.h [new file with mode: 0644]
src/rgw/rgw_rest_pubsub_common.cc [new file with mode: 0644]
src/rgw/rgw_rest_pubsub_common.h [new file with mode: 0644]
src/rgw/rgw_rest_s3.cc
src/rgw/rgw_rest_s3.h
src/rgw/rgw_rest_sts.cc
src/rgw/rgw_swift_auth.h
src/rgw/rgw_sync_module_pubsub.cc
src/rgw/rgw_sync_module_pubsub_rest.cc
src/rgw/rgw_sync_module_pubsub_rest.h
src/rgw/services/svc_user_rados.cc
src/test/rgw/rgw_multi/tests_ps.py
src/test/rgw/rgw_multi/zone_ps.py
src/test/rgw/test_multi.py
src/test/rgw/test_rgw_xml.cc

index c43dac9aeb61f29951fd4b1d9ab455de93f4001a..3438c6f53c34e3328e3d19cca9de2523ef9700d1 100644 (file)
@@ -5716,7 +5716,7 @@ std::vector<Option> get_rgw_options() {
         "will be located in the path that is specified here. "),
 
     Option("rgw_enable_apis", Option::TYPE_STR, Option::LEVEL_ADVANCED)
-    .set_default("s3, s3website, swift, swift_auth, admin, sts, iam")
+    .set_default("s3, s3website, swift, swift_auth, admin, sts, iam, pubsub")
     .set_description("A list of set of RESTful APIs that rgw handles."),
 
     Option("rgw_cache_enabled", Option::TYPE_BOOL, Option::LEVEL_ADVANCED)
index 5f2d4646ffb4207717938661c0608aac94ca9992..3122342e1bc73621ffa1e6d21bc09fb8cd532f59 100755 (executable)
--- a/src/mrun
+++ b/src/mrun
@@ -21,4 +21,10 @@ fi
 
 shift 2
 
-$CEPH_BIN/$command -c $CEPH_CONF_PATH/ceph.conf "$@"
+if [ "$RGW_VALGRIND" == 'yes' ] && [ $command == 'radosgw' ]; then
+    valgrind --trace-children=yes --tool=memcheck --max-threads=1024 $CEPH_BIN/$command -c $CEPH_CONF_PATH/ceph.conf "$@"
+    sleep 10
+else
+    $CEPH_BIN/$command -c $CEPH_CONF_PATH/ceph.conf "$@"
+fi
+
index c891c4a3fdf865a9277519b90d1a32df10edcac9..57901ce3f6b050061fc9ff46d9e0df7a7c91ae19 100644 (file)
@@ -113,6 +113,8 @@ set(librgw_common_srcs
   rgw_rest_conn.cc
   rgw_rest_log.cc
   rgw_rest_metadata.cc
+  rgw_rest_pubsub.cc
+  rgw_rest_pubsub_common.cc
   rgw_rest_realm.cc
   rgw_rest_role.cc
   rgw_rest_s3.cc
index ef89b223f051223bcd9e3a181fbd4fbe41ee0c04..fbeb88d6f746c4f58b521789d24a7f1c5ffb8fb7 100644 (file)
@@ -6,6 +6,7 @@
 #include <string>
 
 #include "rgw_basic_types.h"
+#include "rgw_xml.h"
 #include "common/ceph_json.h"
 
 using std::string;
@@ -13,14 +14,17 @@ using std::stringstream;
 
 void decode_json_obj(rgw_user& val, JSONObj *obj)
 {
-  string s = obj->get_data();
-  val.from_str(s);
+  val.from_str(obj->get_data());
 }
 
 void encode_json(const char *name, const rgw_user& val, Formatter *f)
 {
-  string s = val.to_str();
-  f->dump_string(name, s);
+  f->dump_string(name, val.to_str());
+}
+
+void encode_xml(const char *name, const rgw_user& val, Formatter *f)
+{
+  encode_xml(name, val.to_str(), f);
 }
 
 namespace rgw {
index 18630ed2b017a5f45a98b3fe97cb15f90a63ea29..e0a3458d0a57c39b9e357fdccd81c020c1fa802e 100644 (file)
@@ -13,8 +13,7 @@ struct rgw_user {
   std::string id;
 
   rgw_user() {}
-  // cppcheck-suppress noExplicitConstructor
-  rgw_user(const std::string& s) {
+  explicit rgw_user(const std::string& s) {
     from_str(s);
   }
   rgw_user(const std::string& tenant, const std::string& id)
@@ -203,6 +202,7 @@ class JSONObj;
 
 void decode_json_obj(rgw_user& val, JSONObj *obj);
 void encode_json(const char *name, const rgw_user& val, Formatter *f);
+void encode_xml(const char *name, const rgw_user& val, Formatter *f);
 
 inline ostream& operator<<(ostream& out, const rgw_user &u) {
   string s;
index c7b65fef57832e5b8a23ac02f3252b2afbeb354f..e6d80eeb4a5d51d16c8e9feef41e55018f80e7eb 100644 (file)
@@ -1389,7 +1389,7 @@ int RGWBucketAdminOp::limit_check(RGWRadosStore *store,
     do {
       RGWUserBuckets buckets;
 
-      ret = rgw_read_user_buckets(store, user_id, buckets,
+      ret = rgw_read_user_buckets(store, rgw_user(user_id), buckets,
                                  marker, string(), max_entries, false,
                                  &is_truncated);
       if (ret < 0)
index b7ffea42b8f6f93b47d3f4813f76467378adabb5..b9b29f4e11dfabe6fb4de843b5f541bde7d2d88b 100644 (file)
@@ -589,7 +589,7 @@ int RGWAsyncFetchRemoteObj::_send_request()
 
   std::optional<uint64_t> bytes_transferred;
   int r = store->getRados()->fetch_remote_obj(obj_ctx,
-                       user_id,
+                       rgw_user(user_id),
                        NULL, /* req_info */
                        source_zone,
                        dest_obj,
@@ -646,7 +646,7 @@ int RGWAsyncStatRemoteObj::_send_request()
   rgw_obj dest_obj(src_obj);
 
   int r = store->getRados()->stat_remote_obj(obj_ctx,
-                       user_id,
+                       rgw_user(user_id),
                        nullptr, /* req_info */
                        source_zone,
                        src_obj,
@@ -722,7 +722,7 @@ int RGWAsyncRemoveObj::_send_request()
   }
   del_op.params.olh_epoch = versioned_epoch;
   del_op.params.marker_version_id = marker_version_id;
-  del_op.params.obj_owner.set_id(owner);
+  del_op.params.obj_owner.set_id(rgw_user(owner));
   del_op.params.obj_owner.set_name(owner_display_name);
   del_op.params.mtime = timestamp;
   del_op.params.high_precision_time = true;
index f3d02640b630ece37b098e21e9531f004a831a0d..12bca8fab0042d42c467fdb723476f59abccdb14 100644 (file)
@@ -992,7 +992,7 @@ namespace rgw {
        }
        if (token.valid() && (ldh->auth(token.id, token.key) == 0)) {
          /* try to store user if it doesn't already exist */
-         if (store->ctl()->user->get_info_by_uid(token.id, &user, null_yield) < 0) {
+         if (store->ctl()->user->get_info_by_uid(rgw_user(token.id), &user, null_yield) < 0) {
            int ret = store->ctl()->user->store_info(user, null_yield,
                                                   RGWUserCtl::PutParams()
                                                   .set_exclusive(true));
index 0667b14ad142242d5f17d447a96769677f2562ce..bc0363bdf0795d5adf12424e6d5e91df773dc98c 100644 (file)
@@ -353,12 +353,13 @@ int main(int argc, const char **argv)
   const bool s3website_enabled = apis_map.count("s3website") > 0;
   const bool sts_enabled = apis_map.count("sts") > 0;
   const bool iam_enabled = apis_map.count("iam") > 0;
+  const bool pubsub_enabled = apis_map.count("pubsub") > 0;
   // Swift API entrypoint could placed in the root instead of S3
   const bool swift_at_root = g_conf()->rgw_swift_url_prefix == "/";
   if (apis_map.count("s3") > 0 || s3website_enabled) {
     if (! swift_at_root) {
       rest.register_default_mgr(set_logging(rest_filter(store->getRados(), RGW_REST_S3,
-                                                        new RGWRESTMgr_S3(s3website_enabled, sts_enabled, iam_enabled))));
+                                                        new RGWRESTMgr_S3(s3website_enabled, sts_enabled, iam_enabled, pubsub_enabled))));
     } else {
       derr << "Cannot have the S3 or S3 Website enabled together with "
            << "Swift API placed in the root of hierarchy" << dendl;
index cb96094f92a99fb5640a3a05c19f0994ce239dcb..68a6e32489269c76595131e5e0029bb3906dca87 100644 (file)
@@ -2887,7 +2887,7 @@ static int forward_request_to_master(struct req_state *s, obj_version *objv,
   bufferlist response;
   string uid_str = s->user->user_id.to_str();
 #define MAX_REST_RESPONSE (128 * 1024) // we expect a very small response
-  int ret = store->svc()->zone->get_master_conn()->forward(uid_str, (forward_info ? *forward_info : s->info),
+  int ret = store->svc()->zone->get_master_conn()->forward(rgw_user(uid_str), (forward_info ? *forward_info : s->info),
                                                         objv, MAX_REST_RESPONSE, &in_data, &response);
   if (ret < 0)
     return ret;
index 1032b2b7dfa5e22222f31023439fcd8541788b91..389a3c84e91ede1e158c822bcecc3f62c7073710 100644 (file)
@@ -116,6 +116,14 @@ void rgw_pubsub_topic::dump(Formatter *f) const
   encode_json("arn", arn, f);
 }
 
+void rgw_pubsub_topic::dump_xml(Formatter *f) const
+{
+  encode_xml("User", user, f);
+  encode_xml("Name", name, f);
+  encode_xml("EndPoint", dest, f);
+  encode_xml("TopicArn", arn, f);
+}
+
 void rgw_pubsub_topic_filter::dump(Formatter *f) const
 {
   encode_json("topic", topic, f);
@@ -144,6 +152,13 @@ void rgw_pubsub_user_topics::dump(Formatter *f) const
   }
 }
 
+void rgw_pubsub_user_topics::dump_xml(Formatter *f) const
+{
+  for (auto& t : topics) {
+    encode_xml("member", t.second.topic, f);
+  }
+}
+
 void rgw_pubsub_sub_dest::dump(Formatter *f) const
 {
   encode_json("bucket_name", bucket_name, f);
@@ -153,6 +168,13 @@ void rgw_pubsub_sub_dest::dump(Formatter *f) const
   encode_json("arn_topic", arn_topic, f);
 }
 
+void rgw_pubsub_sub_dest::dump_xml(Formatter *f) const
+{
+  encode_xml("EndpointAddress", push_endpoint, f);
+  encode_xml("EndpointArgs", push_endpoint_args, f);
+  encode_xml("TopicArn", arn_topic, f);
+}
+
 void rgw_pubsub_sub_config::dump(Formatter *f) const
 {
   encode_json("user", user, f);
index 3d0b824376a98d84426e83639bc7922119a6ef57..7d4760adef42da7abcb9fe30842f8bf148bc0d28 100644 (file)
@@ -257,6 +257,7 @@ struct rgw_pubsub_sub_dest {
   }
 
   void dump(Formatter *f) const;
+  void dump_xml(Formatter *f) const;
 };
 WRITE_CLASS_ENCODER(rgw_pubsub_sub_dest)
 
@@ -325,6 +326,7 @@ struct rgw_pubsub_topic {
   }
 
   void dump(Formatter *f) const;
+  void dump_xml(Formatter *f) const;
 
   bool operator<(const rgw_pubsub_topic& t) const {
     return to_str().compare(t.to_str());
@@ -413,6 +415,7 @@ struct rgw_pubsub_user_topics {
   }
 
   void dump(Formatter *f) const;
+  void dump_xml(Formatter *f) const;
 };
 WRITE_CLASS_ENCODER(rgw_pubsub_user_topics)
 
index 110aa996fb47d43299112666b37231dd023b0e2a..07f2f4cc4ec4732894913a582a8dd1f0bffbc419 100644 (file)
@@ -652,7 +652,7 @@ int RGWUserStatsCache::sync_user(const rgw_user& user)
   ceph::real_time last_stats_sync;
   ceph::real_time last_stats_update;
 
-  int ret = store->ctl()->user->read_stats(user_str, &stats, &last_stats_sync, &last_stats_update);
+  int ret = store->ctl()->user->read_stats(rgw_user(user_str), &stats, &last_stats_sync, &last_stats_update);
   if (ret < 0) {
     ldout(store->ctx(), 5) << "ERROR: can't read user header: ret=" << ret << dendl;
     return ret;
index a11584a23e0d96d4802a98d5c12559465bcc5646..64d438818f5e833b14f093c5a2a2b7492d6c8aa3 100644 (file)
@@ -553,10 +553,10 @@ protected:
   virtual RGWOp *op_copy() { return NULL; }
   virtual RGWOp *op_options() { return NULL; }
 
+public:
   static int allocate_formatter(struct req_state *s, int default_formatter,
                                bool configurable);
 
-public:
   static constexpr int MAX_BUCKET_NAME_LEN = 255;
   static constexpr int MAX_OBJ_NAME_LEN = 1024;
 
index 3c1063d3431a3e28428c345e0ff1b1dfec8ac211..b884d05b74d835ede4151b19fe9eec52850cbe5b 100644 (file)
@@ -26,12 +26,13 @@ void RGWHandler_REST_IAM::rgw_iam_parse_input()
       for (const auto& t : tokens) {
         auto pos = t.find("=");
         if (pos != string::npos) {
-          std::string key = t.substr(0, pos);
-          std::string value = t.substr(pos + 1, t.size() - 1);
-          if (key == "AssumeRolePolicyDocument" || key == "Path" || key == "PolicyDocument") {
-            value = url_decode(value);
+          const auto key = t.substr(0, pos);
+          if (key == "Action") {
+            s->info.args.append(key, t.substr(pos + 1, t.size() - 1));
+          } else if (key == "AssumeRolePolicyDocument" || key == "Path" || key == "PolicyDocument") {
+            const auto value = url_decode(t.substr(pos + 1, t.size() - 1));
+            s->info.args.append(key, value);
           }
-          s->info.args.append(key, value);
         }
       }
     }
diff --git a/src/rgw/rgw_rest_pubsub.cc b/src/rgw/rgw_rest_pubsub.cc
new file mode 100644 (file)
index 0000000..1ff5099
--- /dev/null
@@ -0,0 +1,756 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include <algorithm>
+#include <boost/tokenizer.hpp>
+#include "rgw_rest_pubsub_common.h"
+#include "rgw_rest_pubsub.h"
+#include "rgw_pubsub_push.h"
+#include "rgw_pubsub.h"
+#include "rgw_sync_module_pubsub.h"
+#include "rgw_op.h"
+#include "rgw_rest.h"
+#include "rgw_rest_s3.h"
+#include "rgw_arn.h"
+#include "rgw_auth_s3.h"
+
+#define dout_context g_ceph_context
+#define dout_subsys ceph_subsys_rgw
+
+// command (AWS compliant): 
+// POST
+// Action=CreateTopic&Name=<topic-name>[&push-endpoint=<endpoint>[&<arg1>=<value1>]]
+class RGWPSCreateTopic_ObjStore_AWS : public RGWPSCreateTopicOp {
+public:
+  int get_params() override {
+    topic_name = s->info.args.get("Name");
+    if (topic_name.empty()) {
+        ldout(s->cct, 1) << "CreateTopic Action 'Name' argument is missing" << dendl;
+        return -EINVAL;
+    }
+
+    dest.push_endpoint = s->info.args.get("push-endpoint");
+    for (const auto param : s->info.args.get_params()) {
+        if (param.first == "Action" || param.first == "Name" || param.first == "PayloadHash") {
+            continue;
+        }
+        dest.push_endpoint_args.append(param.first+"="+param.second+"&");
+    }
+
+    if (!dest.push_endpoint_args.empty()) {
+        // remove last separator
+        dest.push_endpoint_args.pop_back();
+    }
+    
+    // dest object only stores endpoint info
+    // bucket to store events/records will be set only when subscription is created
+    dest.bucket_name = "";
+    dest.oid_prefix = "";
+    dest.arn_topic = topic_name;
+    // the topic ARN will be sent in the reply
+    const rgw::ARN arn(rgw::Partition::aws, rgw::Service::sns, 
+        store->svc()->zone->get_zonegroup().get_name(),
+        s->user->user_id.tenant, topic_name);
+    topic_arn = arn.to_string();
+    return 0;
+  }
+
+  void send_response() override {
+    if (op_ret) {
+      set_req_state_err(s, op_ret);
+    }
+    dump_errno(s);
+    end_header(s, this, "application/xml");
+
+    if (op_ret < 0) {
+      return;
+    }
+
+    {
+      XMLFormatter* f = static_cast<XMLFormatter*>(s->formatter);
+      f->open_object_section_in_ns("CreateTopicResponse", "https://sns.amazonaws.com/doc/2010-03-31/");
+      f->open_object_section("CreateTopicResult");
+      encode_xml("TopicArn", topic_arn, f); 
+      f->close_section();
+      f->open_object_section("ResponseMetadata");
+      encode_xml("RequestId", s->req_id, f); 
+      f->close_section();
+      f->close_section();
+    }
+    rgw_flush_formatter_and_reset(s, s->formatter);
+  }
+};
+
+// command (AWS compliant): 
+// POST 
+// Action=ListTopics
+class RGWPSListTopics_ObjStore_AWS : public RGWPSListTopicsOp {
+public:
+  void send_response() override {
+    if (op_ret) {
+      set_req_state_err(s, op_ret);
+    }
+    dump_errno(s);
+    end_header(s, this, "application/xml");
+
+    if (op_ret < 0) {
+      return;
+    }
+
+    {
+      XMLFormatter* f = static_cast<XMLFormatter*>(s->formatter);
+      f->open_object_section_in_ns("ListTopicsResponse", "https://sns.amazonaws.com/doc/2010-03-31/");
+      f->open_object_section("ListTopicsResult");
+      encode_xml("Topics", result, f); 
+      f->close_section();
+      f->open_object_section("ResponseMetadata");
+      encode_xml("RequestId", s->req_id, f); 
+      f->close_section();
+      f->close_section();
+    }
+    rgw_flush_formatter_and_reset(s, s->formatter);
+  }
+};
+
+// command (extension to AWS): 
+// POST
+// Action=GetTopic&TopicArn=<topic-arn>
+class RGWPSGetTopic_ObjStore_AWS : public RGWPSGetTopicOp {
+public:
+  int get_params() override {
+    const auto topic_arn = rgw::ARN::parse((s->info.args.get("TopicArn")));
+
+    if (!topic_arn || topic_arn->resource.empty()) {
+        ldout(s->cct, 1) << "GetTopic Action 'TopicArn' argument is missing or invalid" << dendl;
+        return -EINVAL;
+    }
+
+    topic_name = topic_arn->resource;
+    return 0;
+  }
+
+  void send_response() override {
+    if (op_ret) {
+      set_req_state_err(s, op_ret);
+    }
+    dump_errno(s);
+    end_header(s, this, "application/xml");
+
+    if (op_ret < 0) {
+      return;
+    }
+
+    {
+      XMLFormatter* f = static_cast<XMLFormatter*>(s->formatter);
+      f->open_object_section("GetTopicResponse");
+      f->open_object_section("GetTopicResult");
+      encode_xml("Topic", result.topic, f); 
+      f->close_section();
+      f->open_object_section("ResponseMetadata");
+      encode_xml("RequestId", s->req_id, f); 
+      f->close_section();
+      f->close_section();
+    }
+    rgw_flush_formatter_and_reset(s, s->formatter);
+  }
+};
+
+// command (AWS compliant): 
+// POST
+// Action=DeleteTopic&TopicArn=<topic-arn>
+class RGWPSDeleteTopic_ObjStore_AWS : public RGWPSDeleteTopicOp {
+public:
+  int get_params() override {
+    const auto topic_arn = rgw::ARN::parse((s->info.args.get("TopicArn")));
+
+    if (!topic_arn || topic_arn->resource.empty()) {
+        ldout(s->cct, 1) << "DeleteTopic Action 'TopicArn' argument is missing or invalid" << dendl;
+        return -EINVAL;
+    }
+
+    topic_name = topic_arn->resource;
+    return 0;
+  }
+  
+  void send_response() override {
+    if (op_ret) {
+      set_req_state_err(s, op_ret);
+    }
+    dump_errno(s);
+    end_header(s, this, "application/xml");
+
+    if (op_ret < 0) {
+      return;
+    }
+
+    {
+      XMLFormatter* f = static_cast<XMLFormatter*>(s->formatter);
+      f->open_object_section_in_ns("DeleteTopicResponse", "https://sns.amazonaws.com/doc/2010-03-31/");
+      f->open_object_section("ResponseMetadata");
+      encode_xml("RequestId", s->req_id, f); 
+      f->close_section();
+      f->close_section();
+    }
+    rgw_flush_formatter_and_reset(s, s->formatter);
+  }
+};
+
+namespace {
+// utility classes and functions for handling parameters with the following format:
+// Attributes.entry.{N}.{key|value}={VALUE}
+// N - any unsigned number
+// VALUE - url encoded string
+
+// and Attribute is holding key and value
+// ctor and set are done according to the "type" argument
+// if type is not "key" or "value" its a no-op
+class Attribute {
+    std::string key;
+    std::string value;
+public:
+    Attribute(const std::string& type, const std::string& key_or_value) {
+        set(type, key_or_value);
+    }
+    void set(const std::string& type, const std::string& key_or_value) {
+        if (type == "key") {
+            key = key_or_value;
+        } else if (type == "value") {
+            value = key_or_value;
+        }
+    }
+    const std::string& get_key() const { return key; }
+    const std::string& get_value() const { return value; }
+};
+
+using AttributeMap = std::map<unsigned, Attribute>;
+
+// aggregate the attributes into a map
+// the key and value are associated by the index (N)
+// no assumptions are made on the order in which these parameters are added
+void update_attribute_map(const std::string& input, AttributeMap& map) {
+  const boost::char_separator<char> sep(".");
+  const boost::tokenizer tokens(input, sep);
+  auto token = tokens.begin();
+  if (*token != "Attributes") {
+      return;
+  }
+  ++token;
+
+  if (*token != "entry") {
+      return;
+  }
+  ++token;
+
+  unsigned idx;
+  try {
+    idx = std::stoul(*token);
+  } catch (const std::invalid_argument&) {
+    return;
+  }
+  ++token;
+
+  std::string key_or_value = "";
+  // get the rest of the string regardless of dots
+  // this is to allow dots in the value
+  while (token != tokens.end()) {
+    key_or_value.append(*token+".");
+    ++token;
+  }
+  // remove last separator
+  key_or_value.pop_back();
+
+  auto pos = key_or_value.find("=");
+  if (pos != string::npos) {
+    const auto key_or_value_lhs = key_or_value.substr(0, pos);
+    const auto key_or_value_rhs = url_decode(key_or_value.substr(pos + 1, key_or_value.size() - 1));
+    const auto map_it = map.find(idx);
+    if (map_it == map.end()) {
+      // new entry
+      map.emplace(std::make_pair(idx, Attribute(key_or_value_lhs, key_or_value_rhs)));
+    } else {
+      // existing entry
+      map_it->second.set(key_or_value_lhs, key_or_value_rhs);
+    }
+  }
+}
+}
+
+void RGWHandler_REST_PSTopic_AWS::rgw_topic_parse_input() {
+  if (post_body.size() > 0) {
+    ldout(s->cct, 10) << "Content of POST: " << post_body << dendl;
+
+    if (post_body.find("Action") != string::npos) {
+      const boost::char_separator<char> sep("&");
+      const boost::tokenizer<boost::char_separator<char>> tokens(post_body, sep);
+      AttributeMap map;
+      for (const auto& t : tokens) {
+        auto pos = t.find("=");
+        if (pos != string::npos) {
+          const auto key = t.substr(0, pos);
+          if (key == "Action") {
+            s->info.args.append(key, t.substr(pos + 1, t.size() - 1));
+          } else if (key == "Name" || key == "TopicArn") {
+            const auto value = url_decode(t.substr(pos + 1, t.size() - 1));
+            s->info.args.append(key, value);
+          } else {
+            update_attribute_map(t, map);
+          }
+        }
+      }
+      // update the regular args with the content of the attribute map
+      for (const auto attr : map) {
+          s->info.args.append(attr.second.get_key(), attr.second.get_value());
+      }
+    }
+    const auto payload_hash = rgw::auth::s3::calc_v4_payload_hash(post_body);
+    s->info.args.append("PayloadHash", payload_hash);
+  }
+}
+
+RGWOp* RGWHandler_REST_PSTopic_AWS::op_post() {
+  rgw_topic_parse_input();
+
+  if (s->info.args.exists("Action")) {
+    const auto action = s->info.args.get("Action");
+    if (action.compare("CreateTopic") == 0)
+      return new RGWPSCreateTopic_ObjStore_AWS();
+    if (action.compare("DeleteTopic") == 0)
+      return new RGWPSDeleteTopic_ObjStore_AWS;
+    if (action.compare("ListTopics") == 0)
+      return new RGWPSListTopics_ObjStore_AWS();
+    if (action.compare("GetTopic") == 0)
+      return new RGWPSGetTopic_ObjStore_AWS();
+  }
+
+  return nullptr;
+}
+
+int RGWHandler_REST_PSTopic_AWS::authorize(const DoutPrefixProvider* dpp) {
+  /*if (s->info.args.exists("Action") && s->info.args.get("Action").find("Topic") != std::string::npos) {
+      // TODO: some topic specific authorization
+      return 0;
+  }*/
+  return RGW_Auth_S3::authorize(dpp, store, auth_registry, s);
+}
+
+namespace {
+// conversion functions between S3 and GCP style event names
+std::string s3_to_gcp_event(const std::string& event) {
+  if (event == "s3:ObjectCreated:*") {
+    return "OBJECT_CREATE";
+  }
+  if (event == "s3:ObjectRemoved:*") {
+    return "OBJECT_DELETE";
+  }
+  return "UNKNOWN_EVENT";
+}
+std::string gcp_to_s3_event(const std::string& event) {
+  if (event == "OBJECT_CREATE") {
+    return "s3:ObjectCreated:";
+  }
+  if (event == "OBJECT_DELETE") {
+    return "s3:ObjectRemoved:";
+  }
+  return "UNKNOWN_EVENT";
+}
+
+// return a unique topic by prefexing with the notification name: <notification>_<topic>
+std::string topic_to_unique(const std::string& topic, const std::string& notification) {
+  return notification + "_" + topic;
+}
+
+// extract the topic from a unique topic of the form: <notification>_<topic>
+[[maybe_unused]] std::string unique_to_topic(const std::string& unique_topic, const std::string& notification) {
+  if (unique_topic.find(notification + "_") == string::npos) {
+    return "";
+  }
+  return unique_topic.substr(notification.length() + 1);
+}
+}
+
+// command (S3 compliant): PUT /<bucket name>?notification
+// a "notification" and a subscription will be auto-generated
+// actual configuration is XML encoded in the body of the message
+class RGWPSCreateNotif_ObjStore_S3 : public RGWPSCreateNotifOp {
+  rgw_pubsub_s3_notifications configurations;
+
+  int get_params_from_body() {
+    const auto max_size = s->cct->_conf->rgw_max_put_param_size;
+    int r;
+    bufferlist data;
+    std::tie(r, data) = rgw_rest_read_all_input(s, max_size, false);
+
+    if (r < 0) {
+      ldout(s->cct, 1) << "failed to read XML payload" << dendl;
+      return r;
+    }
+    if (data.length() == 0) {
+      ldout(s->cct, 1) << "XML payload missing" << dendl;
+      return -EINVAL;
+    }
+
+    RGWXMLDecoder::XMLParser parser;
+
+    if (!parser.init()){
+      ldout(s->cct, 1) << "failed to initialize XML parser" << dendl;
+      return -EINVAL;
+    }
+    if (!parser.parse(data.c_str(), data.length(), 1)) {
+      ldout(s->cct, 1) << "failed to parse XML payload" << dendl;
+      return -ERR_MALFORMED_XML;
+    }
+    try {
+      // NotificationConfigurations is mandatory
+      RGWXMLDecoder::decode_xml("NotificationConfiguration", configurations, &parser, true);
+    } catch (RGWXMLDecoder::err& err) {
+      ldout(s->cct, 1) << "failed to parse XML payload. error: " << err << dendl;
+      return -ERR_MALFORMED_XML;
+    }
+    return 0;
+  }
+
+  int get_params() override {
+    bool exists;
+    const auto no_value = s->info.args.get("notification", &exists);
+    if (!exists) {
+      ldout(s->cct, 1) << "missing required param 'notification'" << dendl;
+      return -EINVAL;
+    } 
+    if (no_value.length() > 0) {
+      ldout(s->cct, 1) << "param 'notification' should not have any value" << dendl;
+      return -EINVAL;
+    }
+    if (s->bucket_name.empty()) {
+      ldout(s->cct, 1) << "request must be on a bucket" << dendl;
+      return -EINVAL;
+    }
+    bucket_name = s->bucket_name;
+    return 0;
+  }
+
+public:
+  const char* name() const override { return "pubsub_notification_create_s3"; }
+  void execute() override;
+};
+
+void RGWPSCreateNotif_ObjStore_S3::execute() {
+  op_ret = get_params_from_body();
+  if (op_ret < 0) {
+    return;
+  }
+
+  ups.emplace(store, s->owner.get_id());
+  auto b = ups->get_bucket(bucket_info.bucket);
+  ceph_assert(b);
+  std::string data_bucket_prefix = "";
+  std::string data_oid_prefix = "";
+  if (store->getRados()->get_sync_module()) {
+    const auto psmodule = dynamic_cast<RGWPSSyncModuleInstance*>(store->getRados()->get_sync_module().get());
+    if (psmodule) {
+        const auto& conf = psmodule->get_effective_conf();
+        data_bucket_prefix = conf["data_bucket_prefix"];
+        data_oid_prefix = conf["data_oid_prefix"];
+    }
+  }
+
+  for (const auto& c : configurations.list) {
+    const auto& sub_name = c.id;
+    if (sub_name.empty()) {
+      ldout(s->cct, 1) << "missing notification id" << dendl;
+      op_ret = -EINVAL;
+      return;
+    }
+    if (c.topic_arn.empty()) {
+      ldout(s->cct, 1) << "missing topic ARN" << dendl;
+      op_ret = -EINVAL;
+      return;
+    }
+
+    const auto arn = rgw::ARN::parse(c.topic_arn);
+    if (!arn || arn->resource.empty()) {
+      ldout(s->cct, 1) << "topic ARN has invalid format:" << c.topic_arn << dendl;
+      op_ret = -EINVAL;
+      return;
+    }
+
+    const auto topic_name = arn->resource;
+
+    // get topic information. destination information is stored in the topic
+    rgw_pubsub_topic_subs topic_info;  
+    op_ret = ups->get_topic(topic_name, &topic_info);
+    if (op_ret < 0) {
+      ldout(s->cct, 1) << "failed to get topic '" << topic_name << "', ret=" << op_ret << dendl;
+      return;
+    }
+    // make sure that full topic configuration match
+    // TODO: use ARN match function
+    
+    // create unique topic name. this has 2 reasons:
+    // (1) topics cannot be shared between different S3 notifications because they hold the filter information
+    // (2) make topic clneaup easier, when notification is removed
+    const auto unique_topic_name = topic_to_unique(topic_name, sub_name);
+    // generate the internal topic, no need to store destination info in thr unique topic
+    // ARN is cached to make the "GET" method faster
+    op_ret = ups->create_topic(unique_topic_name, rgw_pubsub_sub_dest(), topic_info.topic.arn);
+    if (op_ret < 0) {
+      ldout(s->cct, 1) << "failed to auto-generate topic '" << unique_topic_name << 
+        "', ret=" << op_ret << dendl;
+      return;
+    }
+    // generate the notification
+    EventTypeList events;
+    std::transform(c.events.begin(), c.events.end(), std::inserter(events, events.begin()), s3_to_gcp_event);
+    op_ret = b->create_notification(unique_topic_name, events);
+    if (op_ret < 0) {
+      ldout(s->cct, 1) << "failed to auto-generate notification on topic '" << unique_topic_name <<
+        "', ret=" << op_ret << dendl;
+      // rollback generated topic (ignore return value)
+      ups->remove_topic(unique_topic_name);
+      return;
+    }
+   
+    // generate the subscription with destination information from the original topic
+    rgw_pubsub_sub_dest dest = topic_info.topic.dest;
+    dest.bucket_name = data_bucket_prefix + s->owner.get_id().to_str() + "-" + unique_topic_name;
+    dest.oid_prefix = data_oid_prefix + sub_name + "/";
+    auto sub = ups->get_sub(sub_name);
+    op_ret = sub->subscribe(unique_topic_name, dest, sub_name);
+    if (op_ret < 0) {
+      ldout(s->cct, 1) << "failed to auto-generate subscription '" << sub_name << "', ret=" << op_ret << dendl;
+      // rollback generated notification (ignore return value)
+      b->remove_notification(unique_topic_name);
+      // rollback generated topic (ignore return value)
+      ups->remove_topic(unique_topic_name);
+      return;
+    }
+    ldout(s->cct, 20) << "successfully auto-generated subscription '" << sub_name << "'" << dendl;
+  }
+}
+
+// command (extension to S3): DELETE /bucket?notification[=<notification-id>]
+class RGWPSDeleteNotif_ObjStore_S3 : public RGWPSDeleteNotifOp {
+private:
+  std::string sub_name;
+
+  int get_params() override {
+    bool exists;
+    sub_name = s->info.args.get("notification", &exists);
+    if (!exists) {
+      ldout(s->cct, 1) << "missing required param 'notification'" << dendl;
+      return -EINVAL;
+    } 
+    if (s->bucket_name.empty()) {
+      ldout(s->cct, 1) << "request must be on a bucket" << dendl;
+      return -EINVAL;
+    }
+    bucket_name = s->bucket_name;
+    return 0;
+  }
+
+  void delete_notification(const std::string& _sub_name, const RGWUserPubSub::BucketRef& b, bool must_delete) {
+    auto sub = ups->get_sub(_sub_name);
+    rgw_pubsub_sub_config sub_conf;
+    op_ret = sub->get_conf(&sub_conf);
+    if (op_ret < 0) {
+      ldout(s->cct, 1) << "failed to get notification info, ret=" << op_ret << dendl;
+      return;
+    }
+    if (sub_conf.s3_id.empty()) {
+      if (must_delete) {
+        op_ret = -ENOENT;
+        ldout(s->cct, 1) << "notification does not have an ID, ret=" << op_ret << dendl;
+      }
+      return;
+    }
+    const auto& sub_topic_name = sub_conf.topic;
+    op_ret = sub->unsubscribe(sub_topic_name);
+    if (op_ret < 0) {
+      ldout(s->cct, 1) << "failed to remove auto-generated subscription, ret=" << op_ret << dendl;
+      return;
+    }
+    op_ret = b->remove_notification(sub_topic_name);
+    if (op_ret < 0) {
+      ldout(s->cct, 1) << "failed to remove auto-generated notification, ret=" << op_ret << dendl;
+    }
+    op_ret = ups->remove_topic(sub_topic_name);
+    if (op_ret < 0) {
+      ldout(s->cct, 1) << "failed to remove auto-generated topic, ret=" << op_ret << dendl;
+    }
+    return;
+  }
+
+public:
+  void execute() override;
+  const char* name() const override { return "pubsub_notification_delete_s3"; }
+};
+
+void RGWPSDeleteNotif_ObjStore_S3::execute() {
+  op_ret = get_params();
+  if (op_ret < 0) {
+    return;
+  }
+
+  ups.emplace(store, s->owner.get_id());
+  auto b = ups->get_bucket(bucket_info.bucket);
+  ceph_assert(b);
+
+  if (!sub_name.empty()) {
+    // delete a specific notification
+    delete_notification(sub_name, b, true);
+    return;
+  }
+
+  // delete all notifications on a bucket
+  rgw_pubsub_bucket_topics bucket_topics;
+  b->get_topics(&bucket_topics);
+  // loop through all topics of the bucket
+  for (const auto& topic : bucket_topics.topics) {
+    // for each topic get all subscriptions
+    rgw_pubsub_topic_subs topic_subs;
+    ups->get_topic(topic.first, &topic_subs);
+    // loop through all subscriptions
+    for (const auto& topic_sub_name : topic_subs.subs) {
+      delete_notification(topic_sub_name, b, false);
+    }
+  }
+}
+
+// command (S3 compliant): GET /bucket?notification[=<notification-id>]
+class RGWPSListNotifs_ObjStore_S3 : public RGWPSListNotifsOp {
+private:
+  std::string sub_name;
+  rgw_pubsub_s3_notifications notifications;
+
+  int get_params() override {
+    bool exists;
+    sub_name = s->info.args.get("notification", &exists);
+    if (!exists) {
+      ldout(s->cct, 1) << "missing required param 'notification'" << dendl;
+      return -EINVAL;
+    } 
+    if (s->bucket_name.empty()) {
+      ldout(s->cct, 1) << "request must be on a bucket" << dendl;
+      return -EINVAL;
+    }
+    bucket_name = s->bucket_name;
+    return 0;
+  }
+
+  void add_notification_to_list(const rgw_pubsub_sub_config& sub_conf, 
+      const EventTypeList& events,
+      const std::string& topic_arn);  
+
+public:
+  void execute() override;
+  void send_response() override {
+    if (op_ret) {
+      set_req_state_err(s, op_ret);
+    }
+    dump_errno(s);
+    end_header(s, this, "application/xml");
+
+    if (op_ret < 0) {
+      return;
+    }
+    notifications.dump_xml(s->formatter);
+    rgw_flush_formatter_and_reset(s, s->formatter);
+  }
+  const char* name() const override { return "pubsub_notifications_get_s3"; }
+};
+
+void RGWPSListNotifs_ObjStore_S3::add_notification_to_list(const rgw_pubsub_sub_config& sub_conf, 
+    const EventTypeList& events, 
+    const std::string& topic_arn) { 
+    rgw_pubsub_s3_notification notification;
+    notification.id = sub_conf.s3_id;
+    notification.topic_arn = topic_arn,
+    std::transform(events.begin(), events.end(), std::back_inserter(notification.events), gcp_to_s3_event);
+    notifications.list.push_back(notification);
+}
+
+void RGWPSListNotifs_ObjStore_S3::execute() {
+  ups.emplace(store, s->owner.get_id());
+  auto b = ups->get_bucket(bucket_info.bucket);
+  ceph_assert(b);
+  if (!sub_name.empty()) {
+    // get info of a specific notification
+    auto sub = ups->get_sub(sub_name);
+    rgw_pubsub_sub_config sub_conf;
+    op_ret = sub->get_conf(&sub_conf);
+    if (op_ret < 0) {
+      ldout(s->cct, 1) << "failed to get notification info, ret=" << op_ret << dendl;
+      return;
+    }
+    if (sub_conf.s3_id.empty()) {
+      op_ret = -ENOENT;
+      ldout(s->cct, 1) << "notification does not have an ID, ret=" << op_ret << dendl;
+      return;
+    }
+    rgw_pubsub_bucket_topics bucket_topics;
+    op_ret = b->get_topics(&bucket_topics);
+    if (op_ret < 0) {
+      ldout(s->cct, 1) << "failed to get notification info, ret=" << op_ret << dendl;
+      return;
+    }
+    const auto topic_it = bucket_topics.topics.find(sub_conf.topic);
+    if (topic_it == bucket_topics.topics.end()) {
+      op_ret = -ENOENT;
+      ldout(s->cct, 1) << "notification does not have topic information, ret=" << op_ret << dendl;
+      return;
+    }
+    add_notification_to_list(sub_conf, topic_it->second.events, topic_it->second.topic.arn);
+    return;
+  }
+  // get info on all s3 notifications of the bucket
+  rgw_pubsub_bucket_topics bucket_topics;
+  b->get_topics(&bucket_topics);
+  // loop through all topics of the bucket
+  for (const auto& topic : bucket_topics.topics) {
+    // for each topic get all subscriptions
+    rgw_pubsub_topic_subs topic_subs;
+    ups->get_topic(topic.first, &topic_subs);
+    const auto& events = topic.second.events;
+    const auto& topic_arn = topic.second.topic.arn;
+    // loop through all subscriptions
+    for (const auto& topic_sub_name : topic_subs.subs) {
+      // get info of a specific notification
+      auto sub = ups->get_sub(topic_sub_name);
+      rgw_pubsub_sub_config sub_conf;
+      op_ret = sub->get_conf(&sub_conf);
+      if (op_ret < 0) {
+        ldout(s->cct, 1) << "failed to get notification info, ret=" << op_ret << dendl;
+        return;
+      }
+      if (sub_conf.s3_id.empty()) {
+        // not an s3 notification
+        continue;
+      }
+      add_notification_to_list(sub_conf, events, topic_arn);
+    }
+  }
+}
+
+RGWOp* RGWHandler_REST_PSNotifs_S3::op_get() {
+  return new RGWPSListNotifs_ObjStore_S3();
+}
+
+RGWOp* RGWHandler_REST_PSNotifs_S3::op_put() {
+  return new RGWPSCreateNotif_ObjStore_S3();
+}
+
+RGWOp* RGWHandler_REST_PSNotifs_S3::op_delete() {
+  return new RGWPSDeleteNotif_ObjStore_S3();
+}
+
+RGWOp* RGWHandler_REST_PSNotifs_S3::create_get_op() {
+    return new RGWPSListNotifs_ObjStore_S3();
+}
+
+RGWOp* RGWHandler_REST_PSNotifs_S3::create_put_op() {
+  return new RGWPSCreateNotif_ObjStore_S3();
+}
+
+RGWOp* RGWHandler_REST_PSNotifs_S3::create_delete_op() {
+  return new RGWPSDeleteNotif_ObjStore_S3();
+}
+
diff --git a/src/rgw/rgw_rest_pubsub.h b/src/rgw/rgw_rest_pubsub.h
new file mode 100644 (file)
index 0000000..f2f6335
--- /dev/null
@@ -0,0 +1,41 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+#pragma once
+
+#include "rgw_rest_s3.h"
+
+// s3 compliant notification handler factory
+class RGWHandler_REST_PSNotifs_S3 : public RGWHandler_REST_S3 {
+protected:
+  int init_permissions(RGWOp* op) override {return 0;}
+  int read_permissions(RGWOp* op) override {return 0;}
+  bool supports_quota() override {return false;}
+  RGWOp* op_get() override;
+  RGWOp* op_put() override;
+  RGWOp* op_delete() override;
+public:
+  using RGWHandler_REST_S3::RGWHandler_REST_S3;
+  virtual ~RGWHandler_REST_PSNotifs_S3() = default;
+  // following are used to generate the operations when invoked by another REST handler
+  static RGWOp* create_get_op();
+  static RGWOp* create_put_op();
+  static RGWOp* create_delete_op();
+};
+
+// AWS compliant topics handler factory
+class RGWHandler_REST_PSTopic_AWS : public RGWHandler_REST {
+  const rgw::auth::StrategyRegistry& auth_registry;
+  const std::string& post_body;
+  void rgw_topic_parse_input();
+  //static int init_from_header(struct req_state *s, int default_formatter, bool configurable_format);
+protected:
+  RGWOp* op_post() override;
+public:
+  RGWHandler_REST_PSTopic_AWS(const rgw::auth::StrategyRegistry& _auth_registry, const std::string& _post_body) : 
+      auth_registry(_auth_registry),
+      post_body(_post_body) {}
+  virtual ~RGWHandler_REST_PSTopic_AWS() = default;
+  int postauth_init() override { return 0; }
+  int authorize(const DoutPrefixProvider* dpp) override;
+};
+
diff --git a/src/rgw/rgw_rest_pubsub_common.cc b/src/rgw/rgw_rest_pubsub_common.cc
new file mode 100644 (file)
index 0000000..3b4c963
--- /dev/null
@@ -0,0 +1,204 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "rgw_rest_pubsub_common.h"
+#include "common/dout.h"
+
+#define dout_context g_ceph_context
+#define dout_subsys ceph_subsys_rgw
+
+void RGWPSCreateTopicOp::execute() {
+  op_ret = get_params();
+  if (op_ret < 0) {
+    return;
+  }
+
+  ups.emplace(store, s->owner.get_id());
+  op_ret = ups->create_topic(topic_name, dest, topic_arn);
+  if (op_ret < 0) {
+    ldout(s->cct, 1) << "failed to create topic '" << topic_name << "', ret=" << op_ret << dendl;
+    return;
+  }
+  ldout(s->cct, 20) << "successfully created topic '" << topic_name << "'" << dendl;
+}
+
+void RGWPSListTopicsOp::execute() {
+  ups.emplace(store, s->owner.get_id());
+  op_ret = ups->get_user_topics(&result);
+  if (op_ret < 0) {
+    ldout(s->cct, 1) << "failed to get topics, ret=" << op_ret << dendl;
+    return;
+  }
+  ldout(s->cct, 20) << "successfully got topics" << dendl;
+}
+
+void RGWPSGetTopicOp::execute() {
+  op_ret = get_params();
+  if (op_ret < 0) {
+    return;
+  }
+  ups.emplace(store, s->owner.get_id());
+  op_ret = ups->get_topic(topic_name, &result);
+  if (op_ret < 0) {
+    ldout(s->cct, 1) << "failed to get topic '" << topic_name << "', ret=" << op_ret << dendl;
+    return;
+  }
+  ldout(s->cct, 1) << "successfully got topic '" << topic_name << "'" << dendl;
+}
+
+void RGWPSDeleteTopicOp::execute() {
+  op_ret = get_params();
+  if (op_ret < 0) {
+    return;
+  }
+  ups.emplace(store, s->owner.get_id());
+  op_ret = ups->remove_topic(topic_name);
+  if (op_ret < 0) {
+    ldout(s->cct, 1) << "failed to remove topic '" << topic_name << ", ret=" << op_ret << dendl;
+    return;
+  }
+  ldout(s->cct, 1) << "successfully removed topic '" << topic_name << "'" << dendl;
+}
+
+void RGWPSCreateSubOp::execute() {
+  op_ret = get_params();
+  if (op_ret < 0) {
+    return;
+  }
+  ups.emplace(store, s->owner.get_id());
+  auto sub = ups->get_sub(sub_name);
+  op_ret = sub->subscribe(topic_name, dest);
+  if (op_ret < 0) {
+    ldout(s->cct, 1) << "failed to create subscription '" << sub_name << "', ret=" << op_ret << dendl;
+    return;
+  }
+  ldout(s->cct, 20) << "successfully created subscription '" << sub_name << "'" << dendl;
+}
+
+void RGWPSGetSubOp::execute() {
+  op_ret = get_params();
+  if (op_ret < 0) {
+    return;
+  }
+  ups.emplace(store, s->owner.get_id());
+  auto sub = ups->get_sub(sub_name);
+  op_ret = sub->get_conf(&result);
+  if (op_ret < 0) {
+    ldout(s->cct, 1) << "failed to get subscription '" << sub_name << "', ret=" << op_ret << dendl;
+    return;
+  }
+  ldout(s->cct, 20) << "successfully got subscription '" << sub_name << "'" << dendl;
+}
+
+void RGWPSDeleteSubOp::execute() {
+  op_ret = get_params();
+  if (op_ret < 0) {
+    return;
+  }
+  ups.emplace(store, s->owner.get_id());
+  auto sub = ups->get_sub(sub_name);
+  op_ret = sub->unsubscribe(topic_name);
+  if (op_ret < 0) {
+    ldout(s->cct, 1) << "failed to remove subscription '" << sub_name << "', ret=" << op_ret << dendl;
+    return;
+  }
+  ldout(s->cct, 20) << "successfully removed subscription '" << sub_name << "'" << dendl;
+}
+
+void RGWPSAckSubEventOp::execute() {
+  op_ret = get_params();
+  if (op_ret < 0) {
+    return;
+  }
+  ups.emplace(store, s->owner.get_id());
+  auto sub = ups->get_sub_with_events(sub_name);
+  op_ret = sub->remove_event(event_id);
+  if (op_ret < 0) {
+    ldout(s->cct, 1) << "failed to ack event on subscription '" << sub_name << "', ret=" << op_ret << dendl;
+    return;
+  }
+  ldout(s->cct, 20) << "successfully acked event on subscription '" << sub_name << "'" << dendl;
+}
+
+void RGWPSPullSubEventsOp::execute() {
+  op_ret = get_params();
+  if (op_ret < 0) {
+    return;
+  }
+  ups.emplace(store, s->owner.get_id());
+  sub = ups->get_sub_with_events(sub_name);
+  if (!sub) {
+    op_ret = -ENOENT;
+    ldout(s->cct, 1) << "failed to get subscription '" << sub_name << "' for events, ret=" << op_ret << dendl;
+    return;
+  }
+  op_ret = sub->list_events(marker, max_entries);
+  if (op_ret < 0) {
+    ldout(s->cct, 1) << "failed to get events from subscription '" << sub_name << "', ret=" << op_ret << dendl;
+    return;
+  }
+  ldout(s->cct, 20) << "successfully got events from subscription '" << sub_name << "'" << dendl;
+}
+
+
+int RGWPSCreateNotifOp::verify_permission() {
+  int ret = get_params();
+  if (ret < 0) {
+    return ret;
+  }
+
+  const auto& id = s->owner.get_id();
+
+  ret = store->getRados()->get_bucket_info(*s->sysobj_ctx, id.tenant, bucket_name,
+                               bucket_info, nullptr, null_yield, nullptr);
+  if (ret < 0) {
+    ldout(s->cct, 1) << "failed to get bucket info, cannot verify ownership" << dendl;
+    return ret;
+  }
+
+  if (bucket_info.owner != id) {
+    ldout(s->cct, 1) << "user doesn't own bucket, not allowed to create notification" << dendl;
+    return -EPERM;
+  }
+  return 0;
+}
+
+int RGWPSDeleteNotifOp::verify_permission() {
+  int ret = get_params();
+  if (ret < 0) {
+    return ret;
+  }
+
+  ret = store->getRados()->get_bucket_info(*s->sysobj_ctx, s->owner.get_id().tenant, bucket_name,
+                               bucket_info, nullptr, null_yield, nullptr);
+  if (ret < 0) {
+    return ret;
+  }
+
+  if (bucket_info.owner != s->owner.get_id()) {
+    ldout(s->cct, 1) << "user doesn't own bucket, cannot remove notification" << dendl;
+    return -EPERM;
+  }
+  return 0;
+}
+
+int RGWPSListNotifsOp::verify_permission() {
+  int ret = get_params();
+  if (ret < 0) {
+    return ret;
+  }
+
+  ret = store->getRados()->get_bucket_info(*s->sysobj_ctx, s->owner.get_id().tenant, bucket_name,
+                               bucket_info, nullptr, null_yield, nullptr);
+  if (ret < 0) {
+    return ret;
+  }
+
+  if (bucket_info.owner != s->owner.get_id()) {
+    ldout(s->cct, 1) << "user doesn't own bucket, cannot get topic list" << dendl;
+    return -EPERM;
+  }
+
+  return 0;
+}
+
diff --git a/src/rgw/rgw_rest_pubsub_common.h b/src/rgw/rgw_rest_pubsub_common.h
new file mode 100644 (file)
index 0000000..6d78ce5
--- /dev/null
@@ -0,0 +1,281 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+#pragma once
+#include <string>
+#include <optional>
+#include "rgw_op.h"
+#include "rgw_pubsub.h"
+
+// create a topic
+class RGWPSCreateTopicOp : public RGWDefaultResponseOp {
+protected:
+  std::optional<RGWUserPubSub> ups;
+  std::string topic_name;
+  rgw_pubsub_sub_dest dest;
+  std::string topic_arn;
+  
+  virtual int get_params() = 0;
+
+public:
+  int verify_permission() override {
+    return 0;
+  }
+  void pre_exec() override {
+    rgw_bucket_object_pre_exec(s);
+  }
+  void execute() override;
+
+  const char* name() const override { return "pubsub_topic_create"; }
+  RGWOpType get_type() override { return RGW_OP_PUBSUB_TOPIC_CREATE; }
+  uint32_t op_mask() override { return RGW_OP_TYPE_WRITE; }
+};
+
+// list all topics
+class RGWPSListTopicsOp : public RGWOp {
+protected:
+  std::optional<RGWUserPubSub> ups;
+  rgw_pubsub_user_topics result;
+
+public:
+  int verify_permission() override {
+    return 0;
+  }
+  void pre_exec() override {
+    rgw_bucket_object_pre_exec(s);
+  }
+  void execute() override;
+
+  const char* name() const override { return "pubsub_topics_list"; }
+  RGWOpType get_type() override { return RGW_OP_PUBSUB_TOPICS_LIST; }
+  uint32_t op_mask() override { return RGW_OP_TYPE_READ; }
+};
+
+// get topic information
+class RGWPSGetTopicOp : public RGWOp {
+protected:
+  std::string topic_name;
+  std::optional<RGWUserPubSub> ups;
+  rgw_pubsub_topic_subs result;
+  
+  virtual int get_params() = 0;
+
+public:
+  int verify_permission() override {
+    return 0;
+  }
+  void pre_exec() override {
+    rgw_bucket_object_pre_exec(s);
+  }
+  void execute() override;
+
+  const char* name() const override { return "pubsub_topic_get"; }
+  RGWOpType get_type() override { return RGW_OP_PUBSUB_TOPIC_GET; }
+  uint32_t op_mask() override { return RGW_OP_TYPE_READ; }
+};
+
+// delete a topic
+class RGWPSDeleteTopicOp : public RGWDefaultResponseOp {
+protected:
+  string topic_name;
+  std::optional<RGWUserPubSub> ups;
+  
+  virtual int get_params() = 0;
+
+public:
+  int verify_permission() override {
+    return 0;
+  }
+  void pre_exec() override {
+    rgw_bucket_object_pre_exec(s);
+  }
+  void execute() override;
+
+  const char* name() const override { return "pubsub_topic_delete"; }
+  RGWOpType get_type() override { return RGW_OP_PUBSUB_TOPIC_DELETE; }
+  uint32_t op_mask() override { return RGW_OP_TYPE_DELETE; }
+};
+
+// create a subscription
+class RGWPSCreateSubOp : public RGWDefaultResponseOp {
+protected:
+  std::string sub_name;
+  std::string topic_name;
+  std::optional<RGWUserPubSub> ups;
+  rgw_pubsub_sub_dest dest;
+  
+  virtual int get_params() = 0;
+
+public:
+  int verify_permission() override {
+    return 0;
+  }
+  void pre_exec() override {
+    rgw_bucket_object_pre_exec(s);
+  }
+  void execute() override;
+
+  const char* name() const override { return "pubsub_subscription_create"; }
+  RGWOpType get_type() override { return RGW_OP_PUBSUB_SUB_CREATE; }
+  uint32_t op_mask() override { return RGW_OP_TYPE_WRITE; }
+};
+
+// get subscription information (including push-endpoint if exist)
+class RGWPSGetSubOp : public RGWOp {
+protected:
+  std::string sub_name;
+  std::optional<RGWUserPubSub> ups;
+  rgw_pubsub_sub_config result;
+  
+  virtual int get_params() = 0;
+
+public:
+  int verify_permission() override {
+    return 0;
+  }
+  void pre_exec() override {
+    rgw_bucket_object_pre_exec(s);
+  }
+  void execute() override;
+
+  const char* name() const override { return "pubsub_subscription_get"; }
+  RGWOpType get_type() override { return RGW_OP_PUBSUB_SUB_GET; }
+  uint32_t op_mask() override { return RGW_OP_TYPE_READ; }
+};
+
+// delete subscription
+class RGWPSDeleteSubOp : public RGWDefaultResponseOp {
+protected:
+  std::string sub_name;
+  std::string topic_name;
+  std::optional<RGWUserPubSub> ups;
+  
+  virtual int get_params() = 0;
+
+public:
+  int verify_permission() override {
+    return 0;
+  }
+  void pre_exec() override {
+    rgw_bucket_object_pre_exec(s);
+  }
+  void execute() override;
+
+  const char* name() const override { return "pubsub_subscription_delete"; }
+  RGWOpType get_type() override { return RGW_OP_PUBSUB_SUB_DELETE; }
+  uint32_t op_mask() override { return RGW_OP_TYPE_DELETE; }
+};
+
+// acking of an event
+class RGWPSAckSubEventOp : public RGWDefaultResponseOp {
+protected:
+  std::string sub_name;
+  std::string event_id;
+  std::optional<RGWUserPubSub> ups;
+  
+  virtual int get_params() = 0;
+
+public:
+  RGWPSAckSubEventOp() {}
+
+  int verify_permission() override {
+    return 0;
+  }
+  void pre_exec() override {
+    rgw_bucket_object_pre_exec(s);
+  }
+  void execute() override;
+
+  const char* name() const override { return "pubsub_subscription_ack"; }
+  RGWOpType get_type() override { return RGW_OP_PUBSUB_SUB_ACK; }
+  uint32_t op_mask() override { return RGW_OP_TYPE_WRITE; }
+};
+
+// fetching events from a subscription
+// dpending on whether the subscription was created via s3 compliant API or not
+// the matching events will be returned
+class RGWPSPullSubEventsOp : public RGWOp {
+protected:
+  int max_entries{0};
+  std::string sub_name;
+  std::string marker;
+  std::optional<RGWUserPubSub> ups;
+  RGWUserPubSub::SubRef sub; 
+
+  virtual int get_params() = 0;
+
+public:
+  RGWPSPullSubEventsOp() {}
+
+  int verify_permission() override {
+    return 0;
+  }
+  void pre_exec() override {
+    rgw_bucket_object_pre_exec(s);
+  }
+  void execute() override;
+
+  const char* name() const override { return "pubsub_subscription_pull"; }
+  RGWOpType get_type() override { return RGW_OP_PUBSUB_SUB_PULL; }
+  uint32_t op_mask() override { return RGW_OP_TYPE_READ; }
+};
+
+// notification creation
+class RGWPSCreateNotifOp : public RGWDefaultResponseOp {
+protected:
+  std::optional<RGWUserPubSub> ups;
+  string bucket_name;
+  RGWBucketInfo bucket_info;
+
+  virtual int get_params() = 0;
+
+public:
+  int verify_permission() override;
+
+  void pre_exec() override {
+    rgw_bucket_object_pre_exec(s);
+  }
+
+  RGWOpType get_type() override { return RGW_OP_PUBSUB_NOTIF_CREATE; }
+  uint32_t op_mask() override { return RGW_OP_TYPE_WRITE; }
+};
+
+// delete a notification
+class RGWPSDeleteNotifOp : public RGWDefaultResponseOp {
+protected:
+  std::optional<RGWUserPubSub> ups;
+  std::string bucket_name;
+  RGWBucketInfo bucket_info;
+  
+  virtual int get_params() = 0;
+
+public:
+  int verify_permission() override;
+
+  void pre_exec() override {
+    rgw_bucket_object_pre_exec(s);
+  }
+  
+  RGWOpType get_type() override { return RGW_OP_PUBSUB_NOTIF_DELETE; }
+  uint32_t op_mask() override { return RGW_OP_TYPE_DELETE; }
+};
+
+// get topics/notifications on a bucket
+class RGWPSListNotifsOp : public RGWOp {
+protected:
+  std::string bucket_name;
+  RGWBucketInfo bucket_info;
+  std::optional<RGWUserPubSub> ups;
+
+  virtual int get_params() = 0;
+
+public:
+  int verify_permission() override;
+
+  void pre_exec() override {
+    rgw_bucket_object_pre_exec(s);
+  }
+
+  RGWOpType get_type() override { return RGW_OP_PUBSUB_NOTIF_LIST; }
+  uint32_t op_mask() override { return RGW_OP_TYPE_READ; }
+};
+
index 917a6a399b4502fa834d5172d990932b38bd25d3..2b9ca81a156ea441099e9bb6414a5f0cdfaf6529 100644 (file)
@@ -21,6 +21,7 @@
 #include "rgw_rest.h"
 #include "rgw_rest_s3.h"
 #include "rgw_rest_s3website.h"
+#include "rgw_rest_pubsub.h"
 #include "rgw_auth_s3.h"
 #include "rgw_acl.h"
 #include "rgw_policy_s3.h"
@@ -943,7 +944,7 @@ void RGWListBucket_ObjStore_S3::send_versioned_response()
         auto& storage_class = rgw_placement_rule::get_canonical_storage_class(iter->meta.storage_class);
         s->formatter->dump_string("StorageClass", storage_class.c_str());
       }
-      dump_owner(s, iter->meta.owner, iter->meta.owner_display_name);
+      dump_owner(s, rgw_user(iter->meta.owner), iter->meta.owner_display_name);
       if (iter->meta.appendable) {
         s->formatter->dump_string("Type", "Appendable");
       } else {
@@ -1031,7 +1032,7 @@ if(op_ret < 0) {
         s->formatter->dump_int("Size", iter->meta.accounted_size);
         auto& storage_class = rgw_placement_rule::get_canonical_storage_class(iter->meta.storage_class);
         s->formatter->dump_string("StorageClass", storage_class.c_str());
-        dump_owner(s, iter->meta.owner, iter->meta.owner_display_name);
+        dump_owner(s, rgw_user(iter->meta.owner), iter->meta.owner_display_name);
         if (s->system_request) {
           s->formatter->dump_string("RgwxTag", iter->tag);
       }
@@ -3583,12 +3584,16 @@ RGWOp *RGWHandler_REST_Service_S3::op_post()
 {
   const auto max_size = s->cct->_conf->rgw_max_put_param_size;
 
-  int ret = 0;
+  int ret;
   bufferlist data;
   std::tie(ret, data) = rgw_rest_read_all_input(s, max_size, false);
-  string post_body = data.to_str();
+  if (ret < 0) {
+      return nullptr;
+  }
+
+  const auto post_body = data.to_str();
 
-  if (this->isSTSenabled) {
+  if (isSTSEnabled) {
     RGWHandler_REST_STS sts_handler(auth_registry, post_body);
     sts_handler.init(store, s, s->cio);
     auto op = sts_handler.get_op(store);
@@ -3597,7 +3602,7 @@ RGWOp *RGWHandler_REST_Service_S3::op_post()
     }
   }
 
-  if (this->isIAMenabled) {
+  if (isIAMEnabled) {
     RGWHandler_REST_IAM iam_handler(auth_registry, post_body);
     iam_handler.init(store, s, s->cio);
     auto op = iam_handler.get_op(store);
@@ -3605,7 +3610,17 @@ RGWOp *RGWHandler_REST_Service_S3::op_post()
       return op;
     }
   }
-  return NULL;
+
+  if (isPSEnabled) {
+    RGWHandler_REST_PSTopic_AWS topic_handler(auth_registry, post_body);
+    topic_handler.init(store, s, s->cio);
+    auto op = topic_handler.get_op(store);
+    if (op) {
+      return op;
+    }
+  }
+
+  return nullptr;
 }
 
 RGWOp *RGWHandler_REST_Bucket_S3::get_obj_op(bool get_data)
@@ -3666,6 +3681,8 @@ RGWOp *RGWHandler_REST_Bucket_S3::op_get()
     return new RGWGetBucketTags_ObjStore_S3;
   } else if (is_object_lock_op()) {
     return new RGWGetBucketObjectLock_ObjStore_S3;
+  } else if (is_notification_op()) {
+    return RGWHandler_REST_PSNotifs_S3::create_get_op();
   }
   return get_obj_op(true);
 }
@@ -3709,6 +3726,8 @@ RGWOp *RGWHandler_REST_Bucket_S3::op_put()
     return nullptr;
   } else if (is_object_lock_op()) {
     return new RGWPutBucketObjectLock_ObjStore_S3;
+  } else if (is_notification_op()) {
+    return RGWHandler_REST_PSNotifs_S3::create_put_op();
   }
   return new RGWCreateBucket_ObjStore_S3;
 }
@@ -3723,6 +3742,8 @@ RGWOp *RGWHandler_REST_Bucket_S3::op_delete()
     return new RGWDeleteLC_ObjStore_S3;
   } else if(is_policy_op()) {
     return new RGWDeleteBucketPolicy;
+  } else if (is_notification_op()) {
+    return RGWHandler_REST_PSNotifs_S3::create_delete_op();
   }
 
   if (s->info.args.sub_resource_exists("website")) {
@@ -4132,9 +4153,9 @@ RGWHandler_REST* RGWRESTMgr_S3::get_handler(struct req_state* const s,
     }
   } else {
     if (s->init_state.url_bucket.empty()) {
-      handler = new RGWHandler_REST_Service_S3(auth_registry, enable_sts, enable_iam);
+      handler = new RGWHandler_REST_Service_S3(auth_registry, enable_sts, enable_iam, enable_pubsub);
     } else if (s->object.empty()) {
-      handler = new RGWHandler_REST_Bucket_S3(auth_registry);
+      handler = new RGWHandler_REST_Bucket_S3(auth_registry, enable_pubsub);
     } else {
       handler = new RGWHandler_REST_Obj_S3(auth_registry);
     }
index 0ca98de412edeecd761d95b75276c917fdd912fd..97bd1db3ff0a7bba9b27cb57b45b21ff0af280bd 100644 (file)
@@ -603,8 +603,9 @@ public:
 
 class RGWHandler_REST_Service_S3 : public RGWHandler_REST_S3 {
 protected:
-    bool isSTSenabled;
-    bool isIAMenabled;
+    const bool isSTSEnabled;
+    const bool isIAMEnabled;
+    const bool isPSEnabled;
     bool is_usage_op() {
     return s->info.args.exists("usage");
   }
@@ -613,12 +614,13 @@ protected:
   RGWOp *op_post() override;
 public:
    RGWHandler_REST_Service_S3(const rgw::auth::StrategyRegistry& auth_registry,
-                              bool isSTSenabled, bool isIAMenabled) :
-      RGWHandler_REST_S3(auth_registry), isSTSenabled(isSTSenabled), isIAMenabled(isIAMenabled) {}
+                              bool _isSTSEnabled, bool _isIAMEnabled, bool _isPSEnabled) :
+      RGWHandler_REST_S3(auth_registry), isSTSEnabled(_isSTSEnabled), isIAMEnabled(_isIAMEnabled), isPSEnabled(_isPSEnabled) {}
   ~RGWHandler_REST_Service_S3() override = default;
 };
 
 class RGWHandler_REST_Bucket_S3 : public RGWHandler_REST_S3 {
+  const bool enable_pubsub;
 protected:
   bool is_acl_op() {
     return s->info.args.exists("acl");
@@ -649,6 +651,12 @@ protected:
   bool is_object_lock_op() {
     return s->info.args.exists("object-lock");
   }
+  bool is_notification_op() const {
+    if (enable_pubsub) {
+        return s->info.args.exists("notification");
+    }
+    return false;
+  }
   RGWOp *get_obj_op(bool get_data);
 
   RGWOp *op_get() override;
@@ -658,7 +666,8 @@ protected:
   RGWOp *op_post() override;
   RGWOp *op_options() override;
 public:
-  using RGWHandler_REST_S3::RGWHandler_REST_S3;
+  RGWHandler_REST_Bucket_S3(const rgw::auth::StrategyRegistry& auth_registry, bool _enable_pubsub) :
+      RGWHandler_REST_S3(auth_registry), enable_pubsub(_enable_pubsub) {}
   ~RGWHandler_REST_Bucket_S3() override = default;
 };
 
@@ -695,14 +704,16 @@ public:
 
 class RGWRESTMgr_S3 : public RGWRESTMgr {
 private:
-  bool enable_s3website;
-  bool enable_sts;
-  bool enable_iam;
-public:
-  explicit RGWRESTMgr_S3(bool enable_s3website = false, bool enable_sts = false, bool enable_iam = false)
-    : enable_s3website(enable_s3website),
-      enable_sts(enable_sts),
-      enable_iam(enable_iam) {
+  const bool enable_s3website;
+  const bool enable_sts;
+  const bool enable_iam;
+  const bool enable_pubsub;
+public:
+  explicit RGWRESTMgr_S3(bool _enable_s3website=false, bool _enable_sts=false, bool _enable_iam=false, bool _enable_pubsub=false)
+    : enable_s3website(_enable_s3website),
+      enable_sts(_enable_sts),
+      enable_iam(_enable_iam),
+      enable_pubsub(_enable_pubsub) {
   }
 
   ~RGWRESTMgr_S3() override = default;
index 73052e192324614660c96d1ec2dc60ef39c7745e..337c605e03d5c9b029c1b073f6ab17383bc0c3af 100644 (file)
@@ -353,15 +353,16 @@ void RGWHandler_REST_STS::rgw_sts_parse_input()
       for (const auto& t : tokens) {
         auto pos = t.find("=");
         if (pos != string::npos) {
-           std::string key = t.substr(0, pos);
-           std::string value = t.substr(pos + 1, t.size() - 1);
-           if (key == "RoleArn" || key == "Policy") {
-            value = url_decode(value);
-           }
-           s->info.args.append(key, value);
-         }
-       }
-    }
+          const auto key = t.substr(0, pos);
+          if (key == "Action") {
+            s->info.args.append(key, t.substr(pos + 1, t.size() - 1));
+          } else if (key == "RoleArn" || key == "Policy") {
+            const auto value = url_decode(t.substr(pos + 1, t.size() - 1));
+            s->info.args.append(key, value);
+          }
+        }
+      }
+    } 
   }
   auto payload_hash = rgw::auth::s3::calc_v4_payload_hash(post_body);
   s->info.args.append("PayloadHash", payload_hash);
index 8ec6a4da8a1f3a5a38d6b8fccb625dd0b9232264..6270ee67599563a330599ceb45eee1b57acaa92f 100644 (file)
@@ -196,7 +196,7 @@ class DefaultStrategy : public rgw::auth::Strategy,
                              acl_strategy_t&& extra_acl_strategy,
                              const rgw::auth::RemoteApplier::AuthInfo &info) const override {
     auto apl = \
-      rgw::auth::add_3rdparty(ctl, s->account_name,
+      rgw::auth::add_3rdparty(ctl, rgw_user(s->account_name),
         rgw::auth::add_sysreq(cct, ctl, s,
           rgw::auth::RemoteApplier(cct, ctl, std::move(extra_acl_strategy), info,
                                    implicit_tenant_context,
@@ -211,7 +211,7 @@ class DefaultStrategy : public rgw::auth::Strategy,
                             const std::string& subuser,
                             const boost::optional<uint32_t>& perm_mask) const override {
     auto apl = \
-      rgw::auth::add_3rdparty(ctl, s->account_name,
+      rgw::auth::add_3rdparty(ctl, rgw_user(s->account_name),
         rgw::auth::add_sysreq(cct, ctl, s,
           rgw::auth::LocalApplier(cct, user_info, subuser, perm_mask)));
     /* TODO(rzarzynski): replace with static_ptr. */
index 2bb0711bd801b3c2065e5e30b7656f1afadd8daa..440d085094c1cadcf36a02012285106abd35cd11 100644 (file)
@@ -1583,7 +1583,7 @@ RGWRESTMgr *RGWPSSyncModuleInstance::get_rest_filter(int dialect, RGWRESTMgr *or
   if (dialect != RGW_REST_S3) {
     return orig;
   }
-  return new RGWRESTMgr_PubSub_S3(orig);
+  return new RGWRESTMgr_PubSub();
 }
 
 bool RGWPSSyncModuleInstance::should_full_sync() const {
index 6293289cf93bd6f26e8e43959b5b2ccc8d3b7a86..e968360e3a92a3fac5ff7a7bbeb46b51d7ae4080 100644 (file)
@@ -2,6 +2,8 @@
 // vim: ts=8 sw=2 smarttab ft=cpp
 
 #include <algorithm>
+#include "rgw_rest_pubsub_common.h"
+#include "rgw_rest_pubsub.h"
 #include "rgw_sync_module_pubsub.h"
 #include "rgw_pubsub_push.h"
 #include "rgw_sync_module_pubsub_rest.h"
 #define dout_context g_ceph_context
 #define dout_subsys ceph_subsys_rgw
 
-// create a topic
-class RGWPSCreateTopicOp : public RGWDefaultResponseOp {
-protected:
-  std::unique_ptr<RGWUserPubSub> ups;
-  std::string topic_name;
-  rgw_pubsub_sub_dest dest;
-  std::string topic_arn;
-  
-  virtual int get_params() = 0;
-
-public:
-  int verify_permission() override {
-    return 0;
-  }
-  void pre_exec() override {
-    rgw_bucket_object_pre_exec(s);
-  }
-  void execute() override;
-
-  const char* name() const override { return "pubsub_topic_create"; }
-  RGWOpType get_type() override { return RGW_OP_PUBSUB_TOPIC_CREATE; }
-  uint32_t op_mask() override { return RGW_OP_TYPE_WRITE; }
-};
-
-void RGWPSCreateTopicOp::execute()
-{
-  op_ret = get_params();
-  if (op_ret < 0) {
-    return;
-  }
-
-  ups = std::make_unique<RGWUserPubSub>(store, s->owner.get_id());
-  op_ret = ups->create_topic(topic_name, dest, topic_arn);
-  if (op_ret < 0) {
-    ldout(s->cct, 1) << "failed to create topic '" << topic_name << "', ret=" << op_ret << dendl;
-    return;
-  }
-  ldout(s->cct, 20) << "successfully created topic '" << topic_name << "'" << dendl;
-}
-
 // command: PUT /topics/<topic-name>[&push-endpoint=<endpoint>[&<arg1>=<value1>]]
-class RGWPSCreateTopic_ObjStore_S3 : public RGWPSCreateTopicOp {
+class RGWPSCreateTopic_ObjStore : public RGWPSCreateTopicOp {
 public:
   int get_params() override {
     
@@ -95,39 +57,8 @@ public:
   }
 };
 
-// list all topics
-class RGWPSListTopicsOp : public RGWOp {
-protected:
-  std::unique_ptr<RGWUserPubSub> ups;
-  rgw_pubsub_user_topics result;
-
-public:
-  int verify_permission() override {
-    return 0;
-  }
-  void pre_exec() override {
-    rgw_bucket_object_pre_exec(s);
-  }
-  void execute() override;
-
-  const char* name() const override { return "pubsub_topics_list"; }
-  RGWOpType get_type() override { return RGW_OP_PUBSUB_TOPICS_LIST; }
-  uint32_t op_mask() override { return RGW_OP_TYPE_READ; }
-};
-
-void RGWPSListTopicsOp::execute()
-{
-  ups = std::make_unique<RGWUserPubSub>(store, s->owner.get_id());
-  op_ret = ups->get_user_topics(&result);
-  if (op_ret < 0) {
-    ldout(s->cct, 1) << "failed to get topics, ret=" << op_ret << dendl;
-    return;
-  }
-  ldout(s->cct, 20) << "successfully got topics" << dendl;
-}
-
 // command: GET /topics
-class RGWPSListTopics_ObjStore_S3 : public RGWPSListTopicsOp {
+class RGWPSListTopics_ObjStore : public RGWPSListTopicsOp {
 public:
   void send_response() override {
     if (op_ret) {
@@ -145,46 +76,8 @@ public:
   }
 };
 
-// get topic information
-class RGWPSGetTopicOp : public RGWOp {
-protected:
-  std::string topic_name;
-  std::unique_ptr<RGWUserPubSub> ups;
-  rgw_pubsub_topic_subs result;
-  
-  virtual int get_params() = 0;
-
-public:
-  int verify_permission() override {
-    return 0;
-  }
-  void pre_exec() override {
-    rgw_bucket_object_pre_exec(s);
-  }
-  void execute() override;
-
-  const char* name() const override { return "pubsub_topic_get"; }
-  RGWOpType get_type() override { return RGW_OP_PUBSUB_TOPIC_GET; }
-  uint32_t op_mask() override { return RGW_OP_TYPE_READ; }
-};
-
-void RGWPSGetTopicOp::execute()
-{
-  op_ret = get_params();
-  if (op_ret < 0) {
-    return;
-  }
-  ups = std::make_unique<RGWUserPubSub>(store, s->owner.get_id());
-  op_ret = ups->get_topic(topic_name, &result);
-  if (op_ret < 0) {
-    ldout(s->cct, 1) << "failed to get topic '" << topic_name << "', ret=" << op_ret << dendl;
-    return;
-  }
-  ldout(s->cct, 1) << "successfully got topic '" << topic_name << "'" << dendl;
-}
-
 // command: GET /topics/<topic-name>
-class RGWPSGetTopic_ObjStore_S3 : public RGWPSGetTopicOp {
+class RGWPSGetTopic_ObjStore : public RGWPSGetTopicOp {
 public:
   int get_params() override {
     topic_name = s->object.name;
@@ -207,46 +100,8 @@ public:
   }
 };
 
-// delete a topic
-class RGWPSDeleteTopicOp : public RGWDefaultResponseOp {
-protected:
-  string topic_name;
-  std::unique_ptr<RGWUserPubSub> ups;
-  
-  virtual int get_params() = 0;
-
-public:
-  int verify_permission() override {
-    return 0;
-  }
-  void pre_exec() override {
-    rgw_bucket_object_pre_exec(s);
-  }
-  void execute() override;
-
-  const char* name() const override { return "pubsub_topic_delete"; }
-  RGWOpType get_type() override { return RGW_OP_PUBSUB_TOPIC_DELETE; }
-  uint32_t op_mask() override { return RGW_OP_TYPE_DELETE; }
-};
-
-void RGWPSDeleteTopicOp::execute()
-{
-  op_ret = get_params();
-  if (op_ret < 0) {
-    return;
-  }
-
-  ups = std::make_unique<RGWUserPubSub>(store, s->owner.get_id());
-  op_ret = ups->remove_topic(topic_name);
-  if (op_ret < 0) {
-    ldout(s->cct, 1) << "failed to remove topic '" << topic_name << ", ret=" << op_ret << dendl;
-    return;
-  }
-  ldout(s->cct, 1) << "successfully removed topic '" << topic_name << "'" << dendl;
-}
-
 // command: DELETE /topics/<topic-name>
-class RGWPSDeleteTopic_ObjStore_S3 : public RGWPSDeleteTopicOp {
+class RGWPSDeleteTopic_ObjStore : public RGWPSDeleteTopicOp {
 public:
   int get_params() override {
     topic_name = s->object.name;
@@ -254,8 +109,8 @@ public:
   }
 };
 
-// topics handler factory
-class RGWHandler_REST_PSTopic_S3 : public RGWHandler_REST_S3 {
+// ceph specifc topics handler factory
+class RGWHandler_REST_PSTopic : public RGWHandler_REST_S3 {
 protected:
   int init_permissions(RGWOp* op) override {
     return 0;
@@ -274,69 +129,29 @@ protected:
       return nullptr;
     }
     if (s->object.empty()) {
-      return new RGWPSListTopics_ObjStore_S3();
+      return new RGWPSListTopics_ObjStore();
     }
-    return new RGWPSGetTopic_ObjStore_S3();
+    return new RGWPSGetTopic_ObjStore();
   }
   RGWOp *op_put() override {
     if (!s->object.empty()) {
-      return new RGWPSCreateTopic_ObjStore_S3();
+      return new RGWPSCreateTopic_ObjStore();
     }
     return nullptr;
   }
   RGWOp *op_delete() override {
     if (!s->object.empty()) {
-      return new RGWPSDeleteTopic_ObjStore_S3();
+      return new RGWPSDeleteTopic_ObjStore();
     }
     return nullptr;
   }
 public:
-  explicit RGWHandler_REST_PSTopic_S3(const rgw::auth::StrategyRegistry& auth_registry) : RGWHandler_REST_S3(auth_registry) {}
-  virtual ~RGWHandler_REST_PSTopic_S3() = default;
+  explicit RGWHandler_REST_PSTopic(const rgw::auth::StrategyRegistry& auth_registry) : RGWHandler_REST_S3(auth_registry) {}
+  virtual ~RGWHandler_REST_PSTopic() = default;
 };
 
-// create a subscription
-class RGWPSCreateSubOp : public RGWDefaultResponseOp {
-protected:
-  std::string sub_name;
-  std::string topic_name;
-  std::unique_ptr<RGWUserPubSub> ups;
-  rgw_pubsub_sub_dest dest;
-  
-  virtual int get_params() = 0;
-
-public:
-  int verify_permission() override {
-    return 0;
-  }
-  void pre_exec() override {
-    rgw_bucket_object_pre_exec(s);
-  }
-  void execute() override;
-
-  const char* name() const override { return "pubsub_subscription_create"; }
-  RGWOpType get_type() override { return RGW_OP_PUBSUB_SUB_CREATE; }
-  uint32_t op_mask() override { return RGW_OP_TYPE_WRITE; }
-};
-
-void RGWPSCreateSubOp::execute()
-{
-  op_ret = get_params();
-  if (op_ret < 0) {
-    return;
-  }
-  ups = std::make_unique<RGWUserPubSub>(store, s->owner.get_id());
-  auto sub = ups->get_sub(sub_name);
-  op_ret = sub->subscribe(topic_name, dest);
-  if (op_ret < 0) {
-    ldout(s->cct, 1) << "failed to create subscription '" << sub_name << "', ret=" << op_ret << dendl;
-    return;
-  }
-  ldout(s->cct, 20) << "successfully created subscription '" << sub_name << "'" << dendl;
-}
-
 // command: PUT /subscriptions/<sub-name>?topic=<topic-name>[&push-endpoint=<endpoint>[&<arg1>=<value1>]]...
-class RGWPSCreateSub_ObjStore_S3 : public RGWPSCreateSubOp {
+class RGWPSCreateSub_ObjStore : public RGWPSCreateSubOp {
 public:
   int get_params() override {
     sub_name = s->object.name;
@@ -361,47 +176,8 @@ public:
   }
 };
 
-// get subscription information (including push-endpoint if exist)
-class RGWPSGetSubOp : public RGWOp {
-protected:
-  std::string sub_name;
-  std::unique_ptr<RGWUserPubSub> ups;
-  rgw_pubsub_sub_config result;
-  
-  virtual int get_params() = 0;
-
-public:
-  int verify_permission() override {
-    return 0;
-  }
-  void pre_exec() override {
-    rgw_bucket_object_pre_exec(s);
-  }
-  void execute() override;
-
-  const char* name() const override { return "pubsub_subscription_get"; }
-  RGWOpType get_type() override { return RGW_OP_PUBSUB_SUB_GET; }
-  uint32_t op_mask() override { return RGW_OP_TYPE_READ; }
-};
-
-void RGWPSGetSubOp::execute()
-{
-  op_ret = get_params();
-  if (op_ret < 0) {
-    return;
-  }
-  ups = std::make_unique<RGWUserPubSub>(store, s->owner.get_id());
-  auto sub = ups->get_sub(sub_name);
-  op_ret = sub->get_conf(&result);
-  if (op_ret < 0) {
-    ldout(s->cct, 1) << "failed to get subscription '" << sub_name << "', ret=" << op_ret << dendl;
-    return;
-  }
-  ldout(s->cct, 20) << "successfully got subscription '" << sub_name << "'" << dendl;
-}
-
 // command: GET /subscriptions/<sub-name>
-class RGWPSGetSub_ObjStore_S3 : public RGWPSGetSubOp {
+class RGWPSGetSub_ObjStore : public RGWPSGetSubOp {
 public:
   int get_params() override {
     sub_name = s->object.name;
@@ -423,47 +199,8 @@ public:
   }
 };
 
-// delete subscription
-class RGWPSDeleteSubOp : public RGWDefaultResponseOp {
-protected:
-  std::string sub_name;
-  std::string topic_name;
-  std::unique_ptr<RGWUserPubSub> ups;
-  
-  virtual int get_params() = 0;
-
-public:
-  int verify_permission() override {
-    return 0;
-  }
-  void pre_exec() override {
-    rgw_bucket_object_pre_exec(s);
-  }
-  void execute() override;
-
-  const char* name() const override { return "pubsub_subscription_delete"; }
-  RGWOpType get_type() override { return RGW_OP_PUBSUB_SUB_DELETE; }
-  uint32_t op_mask() override { return RGW_OP_TYPE_DELETE; }
-};
-
-void RGWPSDeleteSubOp::execute()
-{
-  op_ret = get_params();
-  if (op_ret < 0) {
-    return;
-  }
-  ups = std::make_unique<RGWUserPubSub>(store, s->owner.get_id());
-  auto sub = ups->get_sub(sub_name);
-  op_ret = sub->unsubscribe(topic_name);
-  if (op_ret < 0) {
-    ldout(s->cct, 1) << "failed to remove subscription '" << sub_name << "', ret=" << op_ret << dendl;
-    return;
-  }
-  ldout(s->cct, 20) << "successfully removed subscription '" << sub_name << "'" << dendl;
-}
-
 // command: DELETE /subscriptions/<sub-name>
-class RGWPSDeleteSub_ObjStore_S3 : public RGWPSDeleteSubOp {
+class RGWPSDeleteSub_ObjStore : public RGWPSDeleteSubOp {
 public:
   int get_params() override {
     sub_name = s->object.name;
@@ -472,51 +209,10 @@ public:
   }
 };
 
-// acking of an event
-class RGWPSAckSubEventOp : public RGWDefaultResponseOp {
-protected:
-  std::string sub_name;
-  std::string event_id;
-  std::unique_ptr<RGWUserPubSub> ups;
-  
-  virtual int get_params() = 0;
-
-public:
-  RGWPSAckSubEventOp() {}
-
-  int verify_permission() override {
-    return 0;
-  }
-  void pre_exec() override {
-    rgw_bucket_object_pre_exec(s);
-  }
-  void execute() override;
-
-  const char* name() const override { return "pubsub_subscription_ack"; }
-  RGWOpType get_type() override { return RGW_OP_PUBSUB_SUB_ACK; }
-  uint32_t op_mask() override { return RGW_OP_TYPE_WRITE; }
-};
-
-void RGWPSAckSubEventOp::execute()
-{
-  op_ret = get_params();
-  if (op_ret < 0) {
-    return;
-  }
-  ups = std::make_unique<RGWUserPubSub>(store, s->owner.get_id());
-  auto sub = ups->get_sub_with_events(sub_name);
-  op_ret = sub->remove_event(event_id);
-  if (op_ret < 0) {
-    ldout(s->cct, 1) << "failed to ack event on subscription '" << sub_name << "', ret=" << op_ret << dendl;
-    return;
-  }
-  ldout(s->cct, 20) << "successfully acked event on subscription '" << sub_name << "'" << dendl;
-}
-
 // command: POST /subscriptions/<sub-name>?ack&event-id=<event-id>
-class RGWPSAckSubEvent_ObjStore_S3 : public RGWPSAckSubEventOp {
+class RGWPSAckSubEvent_ObjStore : public RGWPSAckSubEventOp {
 public:
-  explicit RGWPSAckSubEvent_ObjStore_S3() {}
+  explicit RGWPSAckSubEvent_ObjStore() {}
 
   int get_params() override {
     sub_name = s->object.name;
@@ -532,58 +228,8 @@ public:
   }
 };
 
-// fetching events from a subscription
-// dpending on whether the subscription was created via s3 compliant API or not
-// the matching events will be returned
-class RGWPSPullSubEventsOp : public RGWOp {
-protected:
-  int max_entries{0};
-  std::string sub_name;
-  std::string marker;
-  std::unique_ptr<RGWUserPubSub> ups;
-  RGWUserPubSub::SubRef sub; 
-
-  virtual int get_params() = 0;
-
-public:
-  RGWPSPullSubEventsOp() {}
-
-  int verify_permission() override {
-    return 0;
-  }
-  void pre_exec() override {
-    rgw_bucket_object_pre_exec(s);
-  }
-  void execute() override;
-
-  const char* name() const override { return "pubsub_subscription_pull"; }
-  RGWOpType get_type() override { return RGW_OP_PUBSUB_SUB_PULL; }
-  uint32_t op_mask() override { return RGW_OP_TYPE_READ; }
-};
-
-void RGWPSPullSubEventsOp::execute()
-{
-  op_ret = get_params();
-  if (op_ret < 0) {
-    return;
-  }
-  ups = std::make_unique<RGWUserPubSub>(store, s->owner.get_id());
-  sub = ups->get_sub_with_events(sub_name);
-  if (!sub) {
-    op_ret = -ENOENT;
-    ldout(s->cct, 1) << "failed to get subscription '" << sub_name << "' for events, ret=" << op_ret << dendl;
-    return;
-  }
-  op_ret = sub->list_events(marker, max_entries);
-  if (op_ret < 0) {
-    ldout(s->cct, 1) << "failed to get events from subscription '" << sub_name << "', ret=" << op_ret << dendl;
-    return;
-  }
-  ldout(s->cct, 20) << "successfully got events from subscription '" << sub_name << "'" << dendl;
-}
-
 // command: GET /subscriptions/<sub-name>?events[&max-entries=<max-entries>][&marker=<marker>]
-class RGWPSPullSubEvents_ObjStore_S3 : public RGWPSPullSubEventsOp {
+class RGWPSPullSubEvents_ObjStore : public RGWPSPullSubEventsOp {
 public:
   int get_params() override {
     sub_name = s->object.name;
@@ -614,7 +260,7 @@ public:
 };
 
 // subscriptions handler factory
-class RGWHandler_REST_PSSub_S3 : public RGWHandler_REST_S3 {
+class RGWHandler_REST_PSSub : public RGWHandler_REST_S3 {
 protected:
   int init_permissions(RGWOp* op) override {
     return 0;
@@ -631,31 +277,31 @@ protected:
       return nullptr;
     }
     if (s->info.args.exists("events")) {
-      return new RGWPSPullSubEvents_ObjStore_S3();
+      return new RGWPSPullSubEvents_ObjStore();
     }
-    return new RGWPSGetSub_ObjStore_S3();
+    return new RGWPSGetSub_ObjStore();
   }
   RGWOp *op_put() override {
     if (!s->object.empty()) {
-      return new RGWPSCreateSub_ObjStore_S3();
+      return new RGWPSCreateSub_ObjStore();
     }
     return nullptr;
   }
   RGWOp *op_delete() override {
     if (!s->object.empty()) {
-      return new RGWPSDeleteSub_ObjStore_S3();
+      return new RGWPSDeleteSub_ObjStore();
     }
     return nullptr;
   }
   RGWOp *op_post() override {
     if (s->info.args.exists("ack")) {
-      return new RGWPSAckSubEvent_ObjStore_S3();
+      return new RGWPSAckSubEvent_ObjStore();
     }
     return nullptr;
   }
 public:
-  explicit RGWHandler_REST_PSSub_S3(const rgw::auth::StrategyRegistry& auth_registry) : RGWHandler_REST_S3(auth_registry) {}
-  virtual ~RGWHandler_REST_PSSub_S3() = default;
+  explicit RGWHandler_REST_PSSub(const rgw::auth::StrategyRegistry& auth_registry) : RGWHandler_REST_S3(auth_registry) {}
+  virtual ~RGWHandler_REST_PSSub() = default;
 };
 
 namespace {
@@ -683,49 +329,8 @@ int notif_bucket_path(const string& path, std::string& bucket_name) {
 }
 }
 
-// notification creation
-class RGWPSCreateNotifOp : public RGWDefaultResponseOp {
-protected:
-  std::unique_ptr<RGWUserPubSub> ups;
-  string bucket_name;
-  RGWBucketInfo bucket_info;
-
-  virtual int get_params() = 0;
-
-public:
-  int verify_permission() override {
-    int ret = get_params();
-    if (ret < 0) {
-      return ret;
-    }
-
-    const auto& id = s->owner.get_id();
-
-    ret = store->getRados()->get_bucket_info(*s->sysobj_ctx, id.tenant, bucket_name,
-                                 bucket_info, nullptr, null_yield, nullptr);
-    if (ret < 0) {
-      ldout(s->cct, 1) << "failed to get bucket info, cannot verify ownership" << dendl;
-      return ret;
-    }
-
-    if (bucket_info.owner != id) {
-      ldout(s->cct, 1) << "user doesn't own bucket, not allowed to create notification" << dendl;
-      return -EPERM;
-    }
-    return 0;
-  }
-
-  void pre_exec() override {
-    rgw_bucket_object_pre_exec(s);
-  }
-
-  RGWOpType get_type() override { return RGW_OP_PUBSUB_NOTIF_CREATE; }
-  uint32_t op_mask() override { return RGW_OP_TYPE_WRITE; }
-};
-
-
 // command (ceph specific): PUT /notification/bucket/<bucket name>?topic=<topic name>
-class RGWPSCreateNotif_ObjStore_Ceph : public RGWPSCreateNotifOp {
+class RGWPSCreateNotif_ObjStore : public RGWPSCreateNotifOp {
 private:
   std::string topic_name;
   std::set<std::string, ltstr_nocase> events;
@@ -750,9 +355,9 @@ public:
   void execute() override;
 };
 
-void RGWPSCreateNotif_ObjStore_Ceph::execute()
+void RGWPSCreateNotif_ObjStore::execute()
 {
-  ups = std::make_unique<RGWUserPubSub>(store, s->owner.get_id());
+  ups.emplace(store, s->owner.get_id());
 
   auto b = ups->get_bucket(bucket_info.bucket);
   op_ret = b->create_notification(topic_name, events);
@@ -763,229 +368,8 @@ void RGWPSCreateNotif_ObjStore_Ceph::execute()
   ldout(s->cct, 20) << "successfully created notification for topic '" << topic_name << "'" << dendl;
 }
 
-namespace {
-// conversion functions between S3 and GCP style event names
-std::string s3_to_gcp_event(const std::string& event) {
-  if (event == "s3:ObjectCreated:*") {
-    return "OBJECT_CREATE";
-  }
-  if (event == "s3:ObjectRemoved:*") {
-    return "OBJECT_DELETE";
-  }
-  return "UNKNOWN_EVENT";
-}
-std::string gcp_to_s3_event(const std::string& event) {
-  if (event == "OBJECT_CREATE") {
-    return "s3:ObjectCreated:";
-  }
-  if (event == "OBJECT_DELETE") {
-    return "s3:ObjectRemoved:";
-  }
-  return "UNKNOWN_EVENT";
-}
-
-// return a unique topic by prefexing with the notification name: <notification>_<topic>
-std::string topic_to_unique(const std::string& topic, const std::string& notification) {
-  return notification + "_" + topic;
-}
-
-// extract the topic from a unique topic of the form: <notification>_<topic>
-[[maybe_unused]] std::string unique_to_topic(const std::string& unique_topic, const std::string& notification) {
-  if (unique_topic.find(notification + "_") == string::npos) {
-    return "";
-  }
-  return unique_topic.substr(notification.length() + 1);
-}
-}
-
-// command (S3 compliant): PUT /<bucket name>?notification
-// a "notification" and a subscription will be auto-generated
-// actual configuration is XML encoded in the body of the message
-class RGWPSCreateNotif_ObjStore_S3 : public RGWPSCreateNotifOp {
-  rgw_pubsub_s3_notifications configurations;
-
-  int get_params_from_body() {
-    const auto max_size = s->cct->_conf->rgw_max_put_param_size;
-    int r;
-    bufferlist data;
-    std::tie(r, data) = rgw_rest_read_all_input(s, max_size, false);
-
-    if (r < 0) {
-      ldout(s->cct, 1) << "failed to read XML payload" << dendl;
-      return r;
-    }
-    if (data.length() == 0) {
-      ldout(s->cct, 1) << "XML payload missing" << dendl;
-      return -EINVAL;
-    }
-
-    RGWXMLDecoder::XMLParser parser;
-
-    if (!parser.init()){
-      ldout(s->cct, 1) << "failed to initialize XML parser" << dendl;
-      return -EINVAL;
-    }
-    if (!parser.parse(data.c_str(), data.length(), 1)) {
-      ldout(s->cct, 1) << "failed to parse XML payload" << dendl;
-      return -ERR_MALFORMED_XML;
-    }
-    try {
-      // TopicConfigurations is mandatory
-      RGWXMLDecoder::decode_xml("NotificationConfiguration", configurations, &parser, true);
-    } catch (RGWXMLDecoder::err& err) {
-      ldout(s->cct, 1) << "failed to parse XML payload. error: " << err << dendl;
-      return -ERR_MALFORMED_XML;
-    }
-    return 0;
-  }
-
-  int get_params() override {
-    bool exists;
-    const auto no_value = s->info.args.get("notification", &exists);
-    if (!exists) {
-      ldout(s->cct, 1) << "missing required param 'notification'" << dendl;
-      return -EINVAL;
-    } 
-    if (no_value.length() > 0) {
-      ldout(s->cct, 1) << "param 'notification' should not have any value" << dendl;
-      return -EINVAL;
-    }
-    if (s->bucket_name.empty()) {
-      ldout(s->cct, 1) << "request must be on a bucket" << dendl;
-      return -EINVAL;
-    }
-    bucket_name = s->bucket_name;
-    return 0;
-  }
-
-public:
-  const char* name() const override { return "pubsub_notification_create_s3"; }
-  void execute() override;
-};
-
-void RGWPSCreateNotif_ObjStore_S3::execute() {
-  op_ret = get_params_from_body();
-  if (op_ret < 0) {
-    return;
-  }
-
-  ups = make_unique<RGWUserPubSub>(store, s->owner.get_id());
-  auto b = ups->get_bucket(bucket_info.bucket);
-  ceph_assert(b);
-  const auto psmodule = static_cast<RGWPSSyncModuleInstance*>(store->getRados()->get_sync_module().get());
-  const auto& conf = psmodule->get_effective_conf();
-
-  for (const auto& c : configurations.list) {
-    const auto& sub_name = c.id;
-    if (sub_name.empty()) {
-      ldout(s->cct, 1) << "missing notification id" << dendl;
-      op_ret = -EINVAL;
-      return;
-    }
-    if (c.topic_arn.empty()) {
-      ldout(s->cct, 1) << "missing topic ARN" << dendl;
-      op_ret = -EINVAL;
-      return;
-    }
-
-    const auto arn = rgw::ARN::parse(c.topic_arn);
-    if (!arn || arn->resource.empty()) {
-      ldout(s->cct, 1) << "topic ARN has invalid format:" << c.topic_arn << dendl;
-      op_ret = -EINVAL;
-      return;
-    }
-
-    const auto topic_name = arn->resource;
-
-    // get topic information. destination information is stored in the topic
-    rgw_pubsub_topic_subs topic_info;  
-    op_ret = ups->get_topic(topic_name, &topic_info);
-    if (op_ret < 0) {
-      ldout(s->cct, 1) << "failed to get topic '" << topic_name << "', ret=" << op_ret << dendl;
-      return;
-    }
-    // make sure that full topic configuration match
-    // TODO: use ARN match function
-    
-    // create unique topic name. this has 2 reasons:
-    // (1) topics cannot be shared between different S3 notifications because they hold the filter information
-    // (2) make topic clneaup easier, when notification is removed
-    const auto unique_topic_name = topic_to_unique(topic_name, sub_name);
-    // generate the internal topic, no need to store destination info
-    // ARN is cached to make the "GET" method faster
-    op_ret = ups->create_topic(unique_topic_name, rgw_pubsub_sub_dest(), topic_info.topic.arn);
-    if (op_ret < 0) {
-      ldout(s->cct, 1) << "failed to auto-generate topic '" << unique_topic_name << 
-        "', ret=" << op_ret << dendl;
-      return;
-    }
-    // generate the notification
-    EventTypeList events;
-    std::transform(c.events.begin(), c.events.end(), std::inserter(events, events.begin()), s3_to_gcp_event);
-    op_ret = b->create_notification(unique_topic_name, events);
-    if (op_ret < 0) {
-      ldout(s->cct, 1) << "failed to auto-generate notification on topic '" << unique_topic_name <<
-        "', ret=" << op_ret << dendl;
-      // rollback generated topic (ignore return value)
-      ups->remove_topic(unique_topic_name);
-      return;
-    }
-    
-    rgw_pubsub_sub_dest dest = topic_info.topic.dest;
-    dest.bucket_name = string(conf["data_bucket_prefix"]) + s->owner.get_id().to_str() + "-" + unique_topic_name;
-    dest.oid_prefix = string(conf["data_oid_prefix"]) + sub_name + "/";
-    auto sub = ups->get_sub(sub_name);
-    op_ret = sub->subscribe(unique_topic_name, dest, sub_name);
-    if (op_ret < 0) {
-      ldout(s->cct, 1) << "failed to auto-generate subscription '" << sub_name << "', ret=" << op_ret << dendl;
-      // rollback generated notification (ignore return value)
-      b->remove_notification(unique_topic_name);
-      // rollback generated topic (ignore return value)
-      ups->remove_topic(unique_topic_name);
-      return;
-    }
-    ldout(s->cct, 20) << "successfully auto-generated subscription '" << sub_name << "'" << dendl;
-  }
-}
-
-// delete a notification
-class RGWPSDeleteNotifOp : public RGWDefaultResponseOp {
-protected:
-  std::unique_ptr<RGWUserPubSub> ups;
-  std::string bucket_name;
-  RGWBucketInfo bucket_info;
-  
-  virtual int get_params() = 0;
-
-public:
-  int verify_permission() override {
-    int ret = get_params();
-    if (ret < 0) {
-      return ret;
-    }
-
-    ret = store->getRados()->get_bucket_info(*s->sysobj_ctx, s->owner.get_id().tenant, bucket_name,
-                                 bucket_info, nullptr, null_yield, nullptr);
-    if (ret < 0) {
-      return ret;
-    }
-
-    if (bucket_info.owner != s->owner.get_id()) {
-      ldout(s->cct, 1) << "user doesn't own bucket, cannot remove notification" << dendl;
-      return -EPERM;
-    }
-    return 0;
-  }
-  void pre_exec() override {
-    rgw_bucket_object_pre_exec(s);
-  }
-  
-  RGWOpType get_type() override { return RGW_OP_PUBSUB_NOTIF_DELETE; }
-  uint32_t op_mask() override { return RGW_OP_TYPE_DELETE; }
-};
-
-// command:  DELETE /notifications/bucket/<bucket>?topic=<topic-name>
-class RGWPSDeleteNotif_ObjStore_Ceph : public RGWPSDeleteNotifOp {
+// command: DELETE /notifications/bucket/<bucket>?topic=<topic-name>
+class RGWPSDeleteNotif_ObjStore : public RGWPSDeleteNotifOp {
 private:
   std::string topic_name;
 
@@ -1004,13 +388,13 @@ public:
   const char* name() const override { return "pubsub_notification_delete"; }
 };
 
-void RGWPSDeleteNotif_ObjStore_Ceph::execute() {
+void RGWPSDeleteNotif_ObjStore::execute() {
   op_ret = get_params();
   if (op_ret < 0) {
     return;
   }
 
-  ups = make_unique<RGWUserPubSub>(store, s->owner.get_id());
+  ups.emplace(store, s->owner.get_id());
   auto b = ups->get_bucket(bucket_info.bucket);
   op_ret = b->remove_notification(topic_name);
   if (op_ret < 0) {
@@ -1020,133 +404,8 @@ void RGWPSDeleteNotif_ObjStore_Ceph::execute() {
   ldout(s->cct, 20) << "successfully removed notification from topic '" << topic_name << "'" << dendl;
 }
 
-// command (extension to S3): DELETE /bucket?notification[=<notification-id>]
-class RGWPSDeleteNotif_ObjStore_S3 : public RGWPSDeleteNotifOp {
-private:
-  std::string sub_name;
-
-  int get_params() override {
-    bool exists;
-    sub_name = s->info.args.get("notification", &exists);
-    if (!exists) {
-      ldout(s->cct, 1) << "missing required param 'notification'" << dendl;
-      return -EINVAL;
-    } 
-    if (s->bucket_name.empty()) {
-      ldout(s->cct, 1) << "request must be on a bucket" << dendl;
-      return -EINVAL;
-    }
-    bucket_name = s->bucket_name;
-    return 0;
-  }
-
-  void delete_notification(const std::string& _sub_name, const RGWUserPubSub::BucketRef& b, bool must_delete) {
-    auto sub = ups->get_sub(_sub_name);
-    rgw_pubsub_sub_config sub_conf;
-    op_ret = sub->get_conf(&sub_conf);
-    if (op_ret < 0) {
-      ldout(s->cct, 1) << "failed to get notification info, ret=" << op_ret << dendl;
-      return;
-    }
-    if (sub_conf.s3_id.empty()) {
-      if (must_delete) {
-        op_ret = -ENOENT;
-        ldout(s->cct, 1) << "notification does not have an ID, ret=" << op_ret << dendl;
-      }
-      return;
-    }
-    const auto& sub_topic_name = sub_conf.topic;
-    op_ret = sub->unsubscribe(sub_topic_name);
-    if (op_ret < 0) {
-      ldout(s->cct, 1) << "failed to remove auto-generated subscription, ret=" << op_ret << dendl;
-      return;
-    }
-    op_ret = b->remove_notification(sub_topic_name);
-    if (op_ret < 0) {
-      ldout(s->cct, 1) << "failed to remove auto-generated notification, ret=" << op_ret << dendl;
-    }
-    op_ret = ups->remove_topic(sub_topic_name);
-    if (op_ret < 0) {
-      ldout(s->cct, 1) << "failed to remove auto-generated topic, ret=" << op_ret << dendl;
-    }
-    return;
-  }
-
-public:
-  void execute() override;
-  const char* name() const override { return "pubsub_notification_delete_s3"; }
-};
-
-void RGWPSDeleteNotif_ObjStore_S3::execute() {
-  op_ret = get_params();
-  if (op_ret < 0) {
-    return;
-  }
-
-  ups = std::make_unique<RGWUserPubSub>(store, s->owner.get_id());
-  auto b = ups->get_bucket(bucket_info.bucket);
-  ceph_assert(b);
-
-  if (!sub_name.empty()) {
-    // delete a specific notification
-    delete_notification(sub_name, b, true);
-    return;
-  }
-
-  // delete all notifications on a bucket
-  rgw_pubsub_bucket_topics bucket_topics;
-  b->get_topics(&bucket_topics);
-  // loop through all topics of the bucket
-  for (const auto& topic : bucket_topics.topics) {
-    // for each topic get all subscriptions
-    rgw_pubsub_topic_subs topic_subs;
-    ups->get_topic(topic.first, &topic_subs);
-    // loop through all subscriptions
-    for (const auto& topic_sub_name : topic_subs.subs) {
-      delete_notification(topic_sub_name, b, false);
-    }
-  }
-}
-
-// get topics/notifications on a bucket
-class RGWPSListNotifsOp : public RGWOp {
-protected:
-  std::string bucket_name;
-  RGWBucketInfo bucket_info;
-  std::unique_ptr<RGWUserPubSub> ups;
-
-  virtual int get_params() = 0;
-
-public:
-  int verify_permission() override {
-    int ret = get_params();
-    if (ret < 0) {
-      return ret;
-    }
-
-    ret = store->getRados()->get_bucket_info(*s->sysobj_ctx, s->owner.get_id().tenant, bucket_name,
-                                 bucket_info, nullptr, null_yield, nullptr);
-    if (ret < 0) {
-      return ret;
-    }
-
-    if (bucket_info.owner != s->owner.get_id()) {
-      ldout(s->cct, 1) << "user doesn't own bucket, cannot get topic list" << dendl;
-      return -EPERM;
-    }
-
-    return 0;
-  }
-  void pre_exec() override {
-    rgw_bucket_object_pre_exec(s);
-  }
-
-  RGWOpType get_type() override { return RGW_OP_PUBSUB_NOTIF_LIST; }
-  uint32_t op_mask() override { return RGW_OP_TYPE_READ; }
-};
-
 // command: GET /notifications/bucket/<bucket>
-class RGWPSListNotifs_ObjStore_Ceph : public RGWPSListNotifsOp {
+class RGWPSListNotifs_ObjStore : public RGWPSListNotifsOp {
 private:
   rgw_pubsub_bucket_topics result;
 
@@ -1172,9 +431,9 @@ public:
   const char* name() const override { return "pubsub_notifications_list"; }
 };
 
-void RGWPSListNotifs_ObjStore_Ceph::execute()
+void RGWPSListNotifs_ObjStore::execute()
 {
-  ups = std::make_unique<RGWUserPubSub>(store, s->owner.get_id());
+  ups.emplace(store, s->owner.get_id());
   auto b = ups->get_bucket(bucket_info.bucket);
   op_ret = b->get_topics(&result);
   if (op_ret < 0) {
@@ -1183,123 +442,8 @@ void RGWPSListNotifs_ObjStore_Ceph::execute()
   }
 }
 
-// command (S3 compliant): GET /bucket?notification[=<notification-id>]
-class RGWPSListNotifs_ObjStore_S3 : public RGWPSListNotifsOp {
-private:
-  std::string sub_name;
-  rgw_pubsub_s3_notifications notifications;
-
-  int get_params() override {
-    bool exists;
-    sub_name = s->info.args.get("notification", &exists);
-    if (!exists) {
-      ldout(s->cct, 1) << "missing required param 'notification'" << dendl;
-      return -EINVAL;
-    } 
-    if (s->bucket_name.empty()) {
-      ldout(s->cct, 1) << "request must be on a bucket" << dendl;
-      return -EINVAL;
-    }
-    bucket_name = s->bucket_name;
-    return 0;
-  }
-
-  void add_notification_to_list(const rgw_pubsub_sub_config& sub_conf, 
-      const EventTypeList& events,
-      const std::string& topic_arn);  
-
-public:
-  void execute() override;
-  void send_response() override {
-    if (op_ret) {
-      set_req_state_err(s, op_ret);
-    }
-    dump_errno(s);
-    end_header(s, this, "application/xml");
-
-    if (op_ret < 0) {
-      return;
-    }
-    notifications.dump_xml(s->formatter);
-    rgw_flush_formatter_and_reset(s, s->formatter);
-  }
-  const char* name() const override { return "pubsub_notifications_get_s3"; }
-};
-
-void RGWPSListNotifs_ObjStore_S3::add_notification_to_list(const rgw_pubsub_sub_config& sub_conf, 
-    const EventTypeList& events, 
-    const std::string& topic_arn) { 
-    rgw_pubsub_s3_notification notification;
-    notification.id = sub_conf.s3_id;
-    notification.topic_arn = topic_arn,
-    std::transform(events.begin(), events.end(), std::back_inserter(notification.events), gcp_to_s3_event);
-    notifications.list.push_back(notification);
-}
-
-void RGWPSListNotifs_ObjStore_S3::execute() {
-  ups = std::make_unique<RGWUserPubSub>(store, s->owner.get_id());
-  auto b = ups->get_bucket(bucket_info.bucket);
-  ceph_assert(b);
-  if (!sub_name.empty()) {
-    // get info of a specific notification
-    auto sub = ups->get_sub(sub_name);
-    rgw_pubsub_sub_config sub_conf;
-    op_ret = sub->get_conf(&sub_conf);
-    if (op_ret < 0) {
-      ldout(s->cct, 1) << "failed to get notification info, ret=" << op_ret << dendl;
-      return;
-    }
-    if (sub_conf.s3_id.empty()) {
-      op_ret = -ENOENT;
-      ldout(s->cct, 1) << "notification does not have an ID, ret=" << op_ret << dendl;
-      return;
-    }
-    rgw_pubsub_bucket_topics bucket_topics;
-    op_ret = b->get_topics(&bucket_topics);
-    if (op_ret < 0) {
-      ldout(s->cct, 1) << "failed to get notification info, ret=" << op_ret << dendl;
-      return;
-    }
-    const auto topic_it = bucket_topics.topics.find(sub_conf.topic);
-    if (topic_it == bucket_topics.topics.end()) {
-      op_ret = -ENOENT;
-      ldout(s->cct, 1) << "notification does not have topic information, ret=" << op_ret << dendl;
-      return;
-    }
-    add_notification_to_list(sub_conf, topic_it->second.events, topic_it->second.topic.arn);
-    return;
-  }
-  // get info on all s3 notifications of the bucket
-  rgw_pubsub_bucket_topics bucket_topics;
-  b->get_topics(&bucket_topics);
-  // loop through all topics of the bucket
-  for (const auto& topic : bucket_topics.topics) {
-    // for each topic get all subscriptions
-    rgw_pubsub_topic_subs topic_subs;
-    ups->get_topic(topic.first, &topic_subs);
-    const auto& events = topic.second.events;
-    const auto& topic_arn = topic.second.topic.arn;
-    // loop through all subscriptions
-    for (const auto& topic_sub_name : topic_subs.subs) {
-      // get info of a specific notification
-      auto sub = ups->get_sub(topic_sub_name);
-      rgw_pubsub_sub_config sub_conf;
-      op_ret = sub->get_conf(&sub_conf);
-      if (op_ret < 0) {
-        ldout(s->cct, 1) << "failed to get notification info, ret=" << op_ret << dendl;
-        return;
-      }
-      if (sub_conf.s3_id.empty()) {
-        // not an s3 notification
-        continue;
-      }
-      add_notification_to_list(sub_conf, events, topic_arn);
-    }
-  }
-}
-
 // ceph specific notification handler factory
-class RGWHandler_REST_PSNotifs_Ceph : public RGWHandler_REST_S3 {
+class RGWHandler_REST_PSNotifs : public RGWHandler_REST_S3 {
 protected:
   int init_permissions(RGWOp* op) override {
     return 0;
@@ -1315,80 +459,53 @@ protected:
     if (s->object.empty()) {
       return nullptr;
     }
-    return new RGWPSListNotifs_ObjStore_Ceph();
+    return new RGWPSListNotifs_ObjStore();
   }
   RGWOp *op_put() override {
     if (!s->object.empty()) {
-      return new RGWPSCreateNotif_ObjStore_Ceph();
+      return new RGWPSCreateNotif_ObjStore();
     }
     return nullptr;
   }
   RGWOp *op_delete() override {
     if (!s->object.empty()) {
-      return new RGWPSDeleteNotif_ObjStore_Ceph();
+      return new RGWPSDeleteNotif_ObjStore();
     }
     return nullptr;
   }
 public:
-  explicit RGWHandler_REST_PSNotifs_Ceph(const rgw::auth::StrategyRegistry& auth_registry) : RGWHandler_REST_S3(auth_registry) {}
-  virtual ~RGWHandler_REST_PSNotifs_Ceph() = default;
+  explicit RGWHandler_REST_PSNotifs(const rgw::auth::StrategyRegistry& auth_registry) : RGWHandler_REST_S3(auth_registry) {}
+  virtual ~RGWHandler_REST_PSNotifs() = default;
 };
 
-// s3 compliant notification handler factory
-class RGWHandler_REST_PSNotifs_S3 : public RGWHandler_REST_S3 {
-protected:
-  int init_permissions(RGWOp* op) override {
-    return 0;
-  }
-
-  int read_permissions(RGWOp* op) override {
-    return 0;
-  }
-  bool supports_quota() override {
-    return false;
-  }
-  RGWOp *op_get() override {
-    return new RGWPSListNotifs_ObjStore_S3();
-  }
-  RGWOp *op_put() override {
-    return new RGWPSCreateNotif_ObjStore_S3();
-  }
-  RGWOp *op_delete() override {
-    return new RGWPSDeleteNotif_ObjStore_S3();
-  }
-public:
-  explicit RGWHandler_REST_PSNotifs_S3(const rgw::auth::StrategyRegistry& auth_registry) : RGWHandler_REST_S3(auth_registry) {}
-  virtual ~RGWHandler_REST_PSNotifs_S3() = default;
-};
-
-// factory for PubSub REST handlers 
-RGWHandler_REST* RGWRESTMgr_PubSub_S3::get_handler(struct req_state* const s,
+// factory for ceph specific PubSub REST handlers 
+RGWHandler_REST* RGWRESTMgr_PubSub::get_handler(struct req_state* const s,
                                                      const rgw::auth::StrategyRegistry& auth_registry,
                                                      const std::string& frontend_prefix)
 {
   if (RGWHandler_REST_S3::init_from_header(s, RGW_FORMAT_JSON, true) < 0) {
     return nullptr;
   }
-  
-  RGWHandler_REST *handler = nullptr;
+  RGWHandler_REST* handler{nullptr};
 
   // ceph specific PubSub API: topics/subscriptions/notification are reserved bucket names
+  // this API is available only on RGW that belong to a pubsub zone
   if (s->init_state.url_bucket == "topics") {
-    handler = new RGWHandler_REST_PSTopic_S3(auth_registry);
+    handler = new RGWHandler_REST_PSTopic(auth_registry);
   } else if (s->init_state.url_bucket == "subscriptions") {
-    handler = new RGWHandler_REST_PSSub_S3(auth_registry);
+    handler = new RGWHandler_REST_PSSub(auth_registry);
   } else if (s->init_state.url_bucket == "notifications") {
-    handler = new RGWHandler_REST_PSNotifs_Ceph(auth_registry);
-  } else {
-    // S3 compatible answers are XML formatted, this is not necessarily in the header of the request
-    if (RGWHandler_REST_S3::reallocate_formatter(s, RGW_FORMAT_XML) < 0) {
-      return nullptr;
+    handler = new RGWHandler_REST_PSNotifs(auth_registry);
+  } else if (s->info.args.exists("notification")) {
+    const int ret = RGWHandler_REST::allocate_formatter(s, RGW_FORMAT_XML, true);
+    if (ret == 0) {
+        handler = new RGWHandler_REST_PSNotifs_S3(auth_registry);
     }
-    // s3 compliant PubSub API: uses: <bucket name>?notification
-    handler = new RGWHandler_REST_PSNotifs_S3(auth_registry);
   }
-
+  
   ldout(s->cct, 20) << __func__ << " handler=" << (handler ? typeid(*handler).name() : "<null>") << dendl;
+
   return handler;
 }
 
index 657791bc1d2208875e052c5ea73668f6d317a7d1..d95539037ce8bfbef3b45c5e4ca9e9463740dfd3 100644 (file)
@@ -6,12 +6,9 @@
 
 #include "rgw_rest.h"
 
-class RGWRESTMgr_PubSub_S3 : public RGWRESTMgr {
-  RGWRESTMgr *next;
+class RGWRESTMgr_PubSub : public RGWRESTMgr {
 public:
-  explicit RGWRESTMgr_PubSub_S3(RGWRESTMgr *_next) : next(_next) {}
-
-  RGWHandler_REST *get_handler(struct req_state* s,
+  virtual RGWHandler_REST* get_handler(struct req_state* s,
                                const rgw::auth::StrategyRegistry& auth_registry,
                                const std::string& frontend_prefix) override;
 };
index 029ebff8069a97fd2cf110e3f931737cb389114c..d89a02eadc05802e43b86d3f24e53efd59a14b0c 100644 (file)
@@ -828,9 +828,9 @@ int RGWSI_User_RADOS::cls_user_get_header(const rgw_user& user, cls_user_header
   return rados_obj.operate(&op, &ibl, null_yield);
 }
 
-int RGWSI_User_RADOS::cls_user_get_header_async(const string& user, RGWGetUserHeader_CB *cb)
+int RGWSI_User_RADOS::cls_user_get_header_async(const string& user_str, RGWGetUserHeader_CB *cb)
 {
-  rgw_raw_obj obj = get_buckets_obj(user);
+  rgw_raw_obj obj = get_buckets_obj(rgw_user(user_str));
   auto rados_obj = svc.rados->obj(obj);
   int r = rados_obj.open();
   if (r < 0) {
@@ -855,7 +855,7 @@ int RGWSI_User_RADOS::read_stats(RGWSI_MetaBackend::Context *ctx,
   string user_str = user.to_str();
 
   cls_user_header header;
-  int r = cls_user_get_header(user_str, &header);
+  int r = cls_user_get_header(rgw_user(user_str), &header);
   if (r < 0)
     return r;
 
index e9a923b845ad36c93a46747feac44b70b2736fde..5762ff741fd78f962f5629fc70dcde90b1594b7d 100644 (file)
@@ -19,7 +19,7 @@ from .tests import get_realm, \
     gen_bucket_name, \
     get_user, \
     get_tenant
-from .zone_ps import PSTopic, PSNotification, PSSubscription, PSNotificationS3, print_connection_info
+from .zone_ps import PSTopic, PSTopicS3, PSNotification, PSSubscription, PSNotificationS3, print_connection_info
 from multisite import User
 from nose import SkipTest
 from nose.tools import assert_not_equal, assert_equal
@@ -593,6 +593,44 @@ def test_ps_s3_notification():
     zones[0].delete_bucket(bucket_name)
 
 
+def test_ps_s3_notification_on_master():
+    """ test s3 notification set/get/delete on master """
+    zones, _  = init_env()
+    realm = get_realm()
+    zonegroup = realm.master_zonegroup()
+    bucket_name = gen_bucket_name()
+    # create bucket on the first of the rados zones
+    zones[0].create_bucket(bucket_name)
+    topic_name = bucket_name + TOPIC_SUFFIX
+    # create s3 topic
+    topic_conf = PSTopicS3(zones[0].conn, topic_name, zonegroup.name)
+    topic_arn = topic_conf.set_config()
+    # create s3 notification
+    notification_name = bucket_name + NOTIFICATION_SUFFIX
+    topic_conf_list = [{'Id': notification_name,
+                        'TopicArn': topic_arn,
+                        'Events': ['s3:ObjectCreated:*']
+                       }]
+    s3_notification_conf = PSNotificationS3(zones[0].conn, bucket_name, topic_conf_list)
+    response, status = s3_notification_conf.set_config()
+    assert_equal(status/100, 2)
+
+    # get notifications on a bucket
+    response, status = s3_notification_conf.get_config()
+    assert_equal(status/100, 2)
+    assert_equal(len(response['TopicConfigurations']), 1)
+    assert_equal(response['TopicConfigurations'][0]['TopicArn'], topic_arn)
+
+    # delete specific notifications
+    _, status = s3_notification_conf.del_config(notification=notification_name)
+    assert_equal(status/100, 2)
+
+    # cleanup
+    topic_conf.del_config()
+    # delete the bucket
+    zones[0].delete_bucket(bucket_name)
+
+
 def test_ps_topic():
     """ test set/get/delete of topic """
     _, ps_zones = init_env()
@@ -623,6 +661,33 @@ def test_ps_topic():
     assert_equal(parsed_result['Code'], 'NoSuchKey')
 
 
+def test_ps_s3_topic():
+    """ test set/get/delete of s3 topic """
+    zones, _ = init_env()
+    realm = get_realm()
+    zonegroup = realm.master_zonegroup()
+    bucket_name = gen_bucket_name()
+    topic_name = bucket_name+TOPIC_SUFFIX
+
+    # create topic
+    topic_conf = PSTopicS3(zones[0].conn, topic_name, zonegroup.name)
+    topic_arn = topic_conf.set_config()
+    assert_equal(topic_arn,
+                 'arn:aws:sns:' + zonegroup.name + ':' + get_tenant() + ':' + topic_name)
+
+    # list topics
+    result = topic_conf.get_list()
+    assert len(result['Topics']) > 0
+
+    # get topic
+    result, status  = topic_conf.get_config()
+    assert_equal(status/100, 2)
+    assert_equal(topic_arn, result['GetTopicResponse']['GetTopicResult']['Topic']['TopicArn'])
+
+    # delete topic
+    topic_conf.del_config()
+
+
 def test_ps_topic_with_endpoint():
     """ test set topic with endpoint"""
     _, ps_zones = init_env()
@@ -647,6 +712,33 @@ def test_ps_topic_with_endpoint():
     topic_conf.del_config()
 
 
+def test_ps_s3_topic_with_endpoint():
+    """ test set/get/delete of s3 topic with endpoint """
+    zones, _ = init_env()
+    realm = get_realm()
+    zonegroup = realm.master_zonegroup()
+    bucket_name = gen_bucket_name()
+    topic_name = bucket_name+TOPIC_SUFFIX
+
+    # create topic
+    endpoint_address = 'amqp://127.0.0.1:7001'
+    endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange=amqp.direct&amqp-ack-level=none'
+    topic_conf = PSTopicS3(zones[0].conn, topic_name, zonegroup.name, endpoint_args)
+    topic_arn = topic_conf.set_config()
+    assert_equal(topic_arn,
+                 'arn:aws:sns:' + zonegroup.name + ':' + get_tenant() + ':' + topic_name)
+
+    # get topic
+    result, status  = topic_conf.get_config()
+    assert_equal(status/100, 2)
+    assert_equal(topic_arn, result['GetTopicResponse']['GetTopicResult']['Topic']['TopicArn'])
+    assert_equal(endpoint_address, result['GetTopicResponse']['GetTopicResult']['Topic']['EndPoint']['EndpointAddress'])
+    # Note that endpoint args may be ordered differently in the result
+
+    # delete topic
+    topic_conf.del_config()
+
+
 def test_ps_notification():
     """ test set/get/delete of notification """
     zones, ps_zones = init_env()
index a24fe31d0ebef10e76453c0e202bc589fe2756c5..71737aaea36ec5efc8f83924d0f85a5d411d948d 100644 (file)
@@ -1,6 +1,7 @@
 import logging
 import httplib
 import urllib
+import urlparse
 import hmac
 import hashlib
 import base64
@@ -129,6 +130,72 @@ class PSTopic:
         return self.send_request('GET', get_list=True)
 
 
+class PSTopicS3:
+    """class to set/list/get/delete a topic
+    POST ?Action=CreateTopic&Name=<topic name>&push-endpoint=<endpoint>&[<arg1>=<value1>...]]
+    POST ?Action=ListTopics
+    POST ?Action=GetTopic&TopicArn=<topic-arn>
+    POST ?Action=DeleteTopic&TopicArn=<topic-arn>
+    """
+    def __init__(self, conn, topic_name, region, endpoint_args=None):
+        self.conn = conn
+        self.topic_name = topic_name.strip()
+        assert self.topic_name
+        self.topic_arn = ''
+        self.attributes = {}
+        if endpoint_args is not None:
+            self.attributes = {nvp[0] : nvp[1] for nvp in urlparse.parse_qsl(endpoint_args, keep_blank_values=True)}
+        self.client = boto3.client('sns',
+                                   endpoint_url='http://'+conn.host+':'+str(conn.port),
+                                   aws_access_key_id=conn.aws_access_key_id,
+                                   aws_secret_access_key=conn.aws_secret_access_key,
+                                   region_name=region,
+                                   config=Config(signature_version='s3'))
+
+
+    def get_config(self):
+        """get topic info"""
+        parameters = {'Action': 'GetTopic', 'TopicArn': self.topic_arn}
+        body = urllib.urlencode(parameters)
+        string_date = strftime("%a, %d %b %Y %H:%M:%S +0000", gmtime())
+        content_type = 'application/x-www-form-urlencoded; charset=utf-8'
+        resource = '/'
+        method = 'POST'
+        string_to_sign = method + '\n\n' + content_type + '\n' + string_date + '\n' + resource
+        log.debug('StringTosign: %s', string_to_sign) 
+        signature = base64.b64encode(hmac.new(self.conn.aws_secret_access_key,
+                                     string_to_sign.encode('utf-8'),
+                                     hashlib.sha1).digest())
+        headers = {'Authorization': 'AWS '+self.conn.aws_access_key_id+':'+signature,
+                   'Date': string_date,
+                   'Host': self.conn.host+':'+str(self.conn.port),
+                   'Content-Type': content_type}
+        http_conn = httplib.HTTPConnection(self.conn.host, self.conn.port)
+        if log.getEffectiveLevel() <= 10:
+            http_conn.set_debuglevel(5)
+        http_conn.request(method, resource, body, headers)
+        response = http_conn.getresponse()
+        data = response.read()
+        status = response.status
+        http_conn.close()
+        dict_response = xmltodict.parse(data)
+        return dict_response, status
+
+    def set_config(self):
+        """set topic"""
+        result = self.client.create_topic(Name=self.topic_name, Attributes=self.attributes)
+        self.topic_arn = result['TopicArn']
+        return self.topic_arn
+
+    def del_config(self):
+        """delete topic"""
+        self.client.delete_topic(TopicArn=self.topic_arn)
+    
+    def get_list(self):
+        """list all topics"""
+        return self.client.list_topics()
+
+
 class PSNotification:
     """class to set/get/delete a notification
     PUT /notifications/bucket/<bucket>?topic=<topic-name>[&events=<event>[,<event>]]
index 45482ea281f49d59b68b964c0d3d121ce523ff41..d2ccbe505c86b765d9c896c0f582b214765b45ca 100644 (file)
@@ -92,13 +92,18 @@ class Gateway(multisite.Gateway):
     def start(self, args = None):
         """ start the gateway """
         assert(self.cluster)
+        env = os.environ.copy()
+        # to change frontend, set RGW_FRONTEND env variable
+        # e.g. RGW_FRONTEND=civetweb
+        # to run test under valgrind memcheck, set RGW_VALGRIND to 'yes'
+        # e.g. RGW_VALGRIND=yes
         cmd = [mstart_path + 'mrgw.sh', self.cluster.cluster_id, str(self.port)]
         if self.id:
             cmd += ['-i', self.id]
         cmd += ['--debug-rgw=20', '--debug-ms=1']
         if args:
             cmd += args
-        bash(cmd)
+        bash(cmd, env=env)
 
     def stop(self):
         """ stop the gateway """
index 0277493a269312c152f0620b930529a485e565c1..50c42c7af7eddb8f09685c5e36a2d492dfaf13d2 100644 (file)
@@ -433,3 +433,31 @@ TEST(TestDecoder, Attributes)
       expected_output_with_attributes);
 }
 
+static const char* expected_xml_output = "<Items xmlns=\"https://www.ceph.com/doc/\">"
+                             "<Item Order=\"0\"><NameAndStatus><Name>hello</Name><Status>True</Status></NameAndStatus><Value>0</Value></Item>"
+                             "<Item Order=\"1\"><NameAndStatus><Name>hello</Name><Status>False</Status></NameAndStatus><Value>1</Value></Item>"
+                             "<Item Order=\"2\"><NameAndStatus><Name>hello</Name><Status>True</Status></NameAndStatus><Value>2</Value></Item>"
+                             "<Item Order=\"3\"><NameAndStatus><Name>hello</Name><Status>False</Status></NameAndStatus><Value>3</Value></Item>"
+                             "<Item Order=\"4\"><NameAndStatus><Name>hello</Name><Status>True</Status></NameAndStatus><Value>4</Value></Item>"
+                           "</Items>";
+TEST(TestEncoder, ListWithAttrsAndNS)
+{
+  XMLFormatter f;
+  const auto array_size = 5;
+  f.open_array_section_in_ns("Items", "https://www.ceph.com/doc/");
+  for (auto i = 0; i < array_size; ++i) {
+    FormatterAttrs item_attrs("Order", std::to_string(i).c_str(), NULL);
+    f.open_object_section_with_attrs("Item", item_attrs);
+    f.open_object_section("NameAndStatus");
+    encode_xml("Name", "hello", &f);
+    encode_xml("Status", (i%2 == 0), &f);
+    f.close_section();
+    encode_xml("Value", i, &f);
+    f.close_section();
+  }
+  f.close_section();
+  std::stringstream ss;
+  f.flush(ss);
+  ASSERT_STREQ(ss.str().c_str(), expected_xml_output);
+}
+