]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw: pubsub: REST api
authorYehuda Sadeh <yehuda@redhat.com>
Thu, 5 Jul 2018 04:09:14 +0000 (21:09 -0700)
committerYehuda Sadeh <yehuda@redhat.com>
Tue, 11 Dec 2018 08:10:42 +0000 (00:10 -0800)
Signed-off-by: Yehuda Sadeh <yehuda@redhat.com>
src/rgw/CMakeLists.txt
src/rgw/rgw_common.h
src/rgw/rgw_pubsub.cc
src/rgw/rgw_pubsub.h
src/rgw/rgw_sync_module_pubsub.cc
src/rgw/rgw_sync_module_pubsub_rest.cc [new file with mode: 0644]
src/rgw/rgw_sync_module_pubsub_rest.h [new file with mode: 0644]

index 30a6af0e3151602bb795d377d0098f8536cbe84f..8e931cd0282f498a9f4bf7ce9c5e1c82d0bc0a00 100644 (file)
@@ -86,6 +86,7 @@ set(librgw_common_srcs
   rgw_sync_module_es_rest.cc
   rgw_sync_module_log.cc
   rgw_sync_module_pubsub.cc
+  rgw_sync_module_pubsub_rest.cc
   rgw_sync_log_trim.cc
   rgw_sync_trace.cc
   rgw_period_history.cc
index da9f8283f88489d92408c42916a9856896a43947..aaaaaeda899cf7b79b0feb50edc81aea2fdb68a1 100644 (file)
@@ -530,6 +530,16 @@ enum RGWOpType {
   /* sts specific*/
   RGW_STS_ASSUME_ROLE,
   RGW_STS_GET_SESSION_TOKEN,
+  /* pubsub */
+  RGW_OP_PUBSUB_TOPIC_CREATE,
+  RGW_OP_PUBSUB_TOPICS_LIST,
+  RGW_OP_PUBSUB_TOPIC_GET,
+  RGW_OP_PUBSUB_TOPIC_DELETE,
+  RGW_OP_PUBSUB_SUB_CREATE,
+  RGW_OP_PUBSUB_SUB_GET,
+  RGW_OP_PUBSUB_SUB_DELETE,
+  RGW_OP_PUBSUB_SUB_PULL,
+  RGW_OP_PUBSUB_SUB_ACK,
 };
 
 class RGWAccessControlPolicy;
index 5c926ec9db803de850a1d0980e447c0d1ef45ed6..449b33c867eb7c7d707b355620d6a56c1f353008 100644 (file)
@@ -311,7 +311,7 @@ int RGWUserPubSub::add_sub(const string& name, const string& topic, const rgw_pu
   return 0;
 }
 
-int RGWUserPubSub::remove_sub(const string& name, const string& _topic, const rgw_pubsub_user_sub_dest& dest)
+int RGWUserPubSub::remove_sub(const string& name, const string& _topic)
 {
   string topic = _topic;
 
index c6b8f350cb136ae3ae55692c980586472dd6b2e1..071432ed8e1985428e761502ff8a5d44baf6eafb 100644 (file)
@@ -237,7 +237,7 @@ public:
   int remove_topic(const string& name);
   int get_sub(const string& name, rgw_pubsub_user_sub_config *result);
   int add_sub(const string& name, const string& topic, const rgw_pubsub_user_sub_dest& dest);
-  int remove_sub(const string& name, const string& topic, const rgw_pubsub_user_sub_dest& dest);
+  int remove_sub(const string& name, const string& topic);
 
   struct list_events_result {
     string next_marker;
index 4de58cab62d0ab8d31b9a5ff6847612e7bb97b71..c434bd37c7a7437beca1589d1cd43c031ce6f6d0 100644 (file)
@@ -3,6 +3,7 @@
 #include "rgw_sync_module.h"
 #include "rgw_data_sync.h"
 #include "rgw_sync_module_pubsub.h"
+#include "rgw_sync_module_pubsub_rest.h"
 #include "rgw_rest_conn.h"
 #include "rgw_cr_rados.h"
 #include "rgw_cr_tools.h"
@@ -1161,15 +1162,10 @@ RGWDataSyncModule *RGWPSSyncModuleInstance::get_data_handler()
 }
 
 RGWRESTMgr *RGWPSSyncModuleInstance::get_rest_filter(int dialect, RGWRESTMgr *orig) {
-#warning REST filter implementation missing
-#if 0
   if (dialect != RGW_REST_S3) {
     return orig;
   }
-  delete orig;
-  return new RGWRESTMgr_MDSearch_S3();
-#endif
-  return orig;
+  return new RGWRESTMgr_PubSub_S3(orig);
 }
 
 int RGWPSSyncModule::create_instance(CephContext *cct, const JSONFormattable& config, RGWSyncModuleInstanceRef *instance) {
diff --git a/src/rgw/rgw_sync_module_pubsub_rest.cc b/src/rgw/rgw_sync_module_pubsub_rest.cc
new file mode 100644 (file)
index 0000000..cb0d4d0
--- /dev/null
@@ -0,0 +1,668 @@
+#include "rgw_sync_module_pubsub.h"
+#include "rgw_sync_module_pubsub_rest.h"
+#include "rgw_pubsub.h"
+#include "rgw_op.h"
+#include "rgw_rest.h"
+#include "rgw_rest_s3.h"
+
+#define dout_context g_ceph_context
+#define dout_subsys ceph_subsys_rgw
+
+
+class RGWPSCreateTopicOp : public RGWOp {
+protected:
+  std::unique_ptr<RGWUserPubSub> ups;
+  string topic_name;
+  string bucket_name;
+  RGWBucketInfo bucket_info;
+
+public:
+  RGWPSCreateTopicOp() {}
+
+  int verify_permission() override {
+    int ret = get_params();
+    if (ret < 0) {
+      return ret;
+    }
+
+    RGWObjectCtx& obj_ctx = *static_cast<RGWObjectCtx *>(s->obj_ctx);
+
+    ret = store->get_bucket_info(obj_ctx, s->owner.get_id().tenant, bucket_name,
+                                 bucket_info, nullptr, nullptr);
+    if (ret < 0) {
+      return ret;
+    }
+
+    if (bucket_info.owner != s->owner.get_id()) {
+      ldout(s->cct, 20) << "user doesn't own bucket, cannot create topic" << dendl;
+      return -EPERM;
+    }
+
+    return 0;
+  }
+  void pre_exec() override {
+    rgw_bucket_object_pre_exec(s);
+  }
+  void execute() override;
+
+  const char* name() const override { return "pubsub_topic_create"; }
+  virtual RGWOpType get_type() override { return RGW_OP_PUBSUB_TOPIC_CREATE; }
+  virtual uint32_t op_mask() override { return RGW_OP_TYPE_WRITE; }
+  virtual int get_params() = 0;
+};
+
+void RGWPSCreateTopicOp::execute()
+{
+  ups = make_unique<RGWUserPubSub>(store, s->owner.get_id());
+  op_ret = ups->create_topic(topic_name, bucket_info.bucket);
+  if (op_ret < 0) {
+    ldout(s->cct, 20) << "failed to create topic, ret=" << op_ret << dendl;
+    return;
+  }
+}
+
+class RGWPSCreateTopic_ObjStore_S3 : public RGWPSCreateTopicOp {
+public:
+  explicit RGWPSCreateTopic_ObjStore_S3() {}
+
+  int get_params() override {
+    topic_name = s->object.name;
+
+    bool exists;
+    bucket_name = s->info.args.get("bucket", &exists);
+    if (!exists) {
+      ldout(s->cct, 20) << "ERROR: missing required param 'bucket' for request" << dendl;
+      return -EINVAL;
+    }
+
+    return 0;
+  }
+};
+
+class RGWPSListTopicsOp : public RGWOp {
+protected:
+  string bucket_name;
+  RGWBucketInfo bucket_info;
+  std::unique_ptr<RGWUserPubSub> ups;
+  rgw_pubsub_user_topics result;
+
+
+public:
+  RGWPSListTopicsOp() {}
+
+  int verify_permission() override {
+    int ret = get_params();
+    if (ret < 0) {
+      return ret;
+    }
+
+    if (bucket_name.empty()) {
+      return 0;
+    }
+
+    RGWObjectCtx& obj_ctx = *static_cast<RGWObjectCtx *>(s->obj_ctx);
+
+    ret = store->get_bucket_info(obj_ctx, s->owner.get_id().tenant, bucket_name,
+                                 bucket_info, nullptr, nullptr);
+    if (ret < 0) {
+      return ret;
+    }
+
+    if (bucket_info.owner != s->owner.get_id()) {
+      ldout(s->cct, 20) << "user doesn't own bucket, cannot create topic" << dendl;
+      return -EPERM;
+    }
+
+    return 0;
+  }
+  void pre_exec() override {
+    rgw_bucket_object_pre_exec(s);
+  }
+  void execute() override;
+
+  const char* name() const override { return "pubsub_topics_list"; }
+  virtual RGWOpType get_type() override { return RGW_OP_PUBSUB_TOPICS_LIST; }
+  virtual uint32_t op_mask() override { return RGW_OP_TYPE_READ; }
+  virtual int get_params() = 0;
+};
+
+void RGWPSListTopicsOp::execute()
+{
+  ups = make_unique<RGWUserPubSub>(store, s->owner.get_id());
+  if (bucket_name.empty()) {
+    op_ret = ups->get_topics(&result);
+  } else {
+    op_ret = ups->get_bucket_topics(bucket_info.bucket, &result);
+  }
+  if (op_ret < 0) {
+    ldout(s->cct, 20) << "failed to get topics, ret=" << op_ret << dendl;
+    return;
+  }
+
+}
+
+class RGWPSListTopics_ObjStore_S3 : public RGWPSListTopicsOp {
+public:
+  explicit RGWPSListTopics_ObjStore_S3() {}
+
+  int get_params() override {
+    bucket_name = s->info.args.get("bucket");
+    return 0;
+  }
+
+  void send_response() override {
+    if (op_ret) {
+      set_req_state_err(s, op_ret);
+    }
+    dump_errno(s);
+    end_header(s, this, "application/json");
+
+    if (op_ret < 0) {
+      return;
+    }
+
+    encode_json("result", result, s->formatter);
+    rgw_flush_formatter_and_reset(s, s->formatter);
+  }
+};
+
+class RGWPSGetTopicOp : public RGWOp {
+protected:
+  string topic_name;
+  std::unique_ptr<RGWUserPubSub> ups;
+  rgw_pubsub_user_topic_info result;
+
+public:
+  RGWPSGetTopicOp() {}
+
+  int verify_permission() override {
+    return 0;
+  }
+  void pre_exec() override {
+    rgw_bucket_object_pre_exec(s);
+  }
+  void execute() override;
+
+  const char* name() const override { return "pubsub_topic_get"; }
+  virtual RGWOpType get_type() override { return RGW_OP_PUBSUB_TOPIC_GET; }
+  virtual uint32_t op_mask() override { return RGW_OP_TYPE_READ; }
+  virtual int get_params() = 0;
+};
+
+void RGWPSGetTopicOp::execute()
+{
+  op_ret = get_params();
+  if (op_ret < 0) {
+    return;
+  }
+  ups = make_unique<RGWUserPubSub>(store, s->owner.get_id());
+  op_ret = ups->get_topic(topic_name, &result);
+  if (op_ret < 0) {
+    ldout(s->cct, 20) << "failed to get topic, ret=" << op_ret << dendl;
+    return;
+  }
+}
+
+class RGWPSGetTopic_ObjStore_S3 : public RGWPSGetTopicOp {
+public:
+  explicit RGWPSGetTopic_ObjStore_S3() {}
+
+  int get_params() override {
+    topic_name = s->object.name;
+    return 0;
+  }
+
+  void send_response() override {
+    if (op_ret) {
+      set_req_state_err(s, op_ret);
+    }
+    dump_errno(s);
+    end_header(s, this, "application/json");
+
+    if (op_ret < 0) {
+      return;
+    }
+
+    encode_json("result", result, s->formatter);
+    rgw_flush_formatter_and_reset(s, s->formatter);
+  }
+};
+
+class RGWPSDeleteTopicOp : public RGWOp {
+protected:
+  string topic_name;
+  std::unique_ptr<RGWUserPubSub> ups;
+
+public:
+  RGWPSDeleteTopicOp() {}
+
+  int verify_permission() override {
+    return 0;
+  }
+  void pre_exec() override {
+    rgw_bucket_object_pre_exec(s);
+  }
+  void execute() override;
+
+  const char* name() const override { return "pubsub_topic_delete"; }
+  virtual RGWOpType get_type() override { return RGW_OP_PUBSUB_TOPIC_DELETE; }
+  virtual uint32_t op_mask() override { return RGW_OP_TYPE_DELETE; }
+  virtual int get_params() = 0;
+};
+
+void RGWPSDeleteTopicOp::execute()
+{
+  op_ret = get_params();
+  if (op_ret < 0) {
+    return;
+  }
+
+  ups = make_unique<RGWUserPubSub>(store, s->owner.get_id());
+  op_ret = ups->remove_topic(topic_name);
+  if (op_ret < 0) {
+    ldout(s->cct, 20) << "failed to create topic, ret=" << op_ret << dendl;
+    return;
+  }
+}
+
+class RGWPSDeleteTopic_ObjStore_S3 : public RGWPSDeleteTopicOp {
+public:
+  explicit RGWPSDeleteTopic_ObjStore_S3() {}
+
+  int get_params() override {
+    topic_name = s->object.name;
+    return 0;
+  }
+};
+
+class RGWHandler_REST_PSTopic_S3 : public RGWHandler_REST_S3 {
+protected:
+  RGWOp *op_get() override {
+    if (s->init_state.url_bucket.empty()) {
+      return nullptr;
+    }
+    if (s->object.empty()) {
+      return new RGWPSListTopics_ObjStore_S3();
+    }
+    return new RGWPSGetTopic_ObjStore_S3();
+  }
+  RGWOp *op_put() override {
+    if (!s->object.empty()) {
+      return new RGWPSCreateTopic_ObjStore_S3();
+    }
+    return nullptr;
+  }
+  RGWOp *op_delete() override {
+    if (!s->object.empty()) {
+      return new RGWPSDeleteTopic_ObjStore_S3();
+    }
+    return nullptr;
+  }
+public:
+  explicit RGWHandler_REST_PSTopic_S3(const rgw::auth::StrategyRegistry& auth_registry) : RGWHandler_REST_S3(auth_registry) {}
+  virtual ~RGWHandler_REST_PSTopic_S3() {}
+};
+
+
+class RGWPSCreateSubOp : public RGWOp {
+protected:
+  string sub_name;
+  string topic_name;
+  std::unique_ptr<RGWUserPubSub> ups;
+  rgw_pubsub_user_sub_dest dest;
+
+public:
+  RGWPSCreateSubOp() {}
+
+  int verify_permission() override {
+    return 0;
+  }
+  void pre_exec() override {
+    rgw_bucket_object_pre_exec(s);
+  }
+  void execute() override;
+
+  const char* name() const override { return "pubsub_subscription_create"; }
+  virtual RGWOpType get_type() override { return RGW_OP_PUBSUB_SUB_CREATE; }
+  virtual uint32_t op_mask() override { return RGW_OP_TYPE_WRITE; }
+  virtual int get_params() = 0;
+};
+
+void RGWPSCreateSubOp::execute()
+{
+  op_ret = get_params();
+  if (op_ret < 0) {
+    return;
+  }
+  ups = make_unique<RGWUserPubSub>(store, s->owner.get_id());
+  op_ret = ups->add_sub(sub_name, topic_name, dest);
+  if (op_ret < 0) {
+    ldout(s->cct, 20) << "failed to create subscription, ret=" << op_ret << dendl;
+    return;
+  }
+}
+
+class RGWPSCreateSub_ObjStore_S3 : public RGWPSCreateSubOp {
+public:
+  explicit RGWPSCreateSub_ObjStore_S3() {}
+
+  int get_params() override {
+    sub_name = s->object.name;
+
+    bool exists;
+
+    topic_name = s->info.args.get("topic", &exists);
+    if (!exists) {
+      ldout(s->cct, 20) << "ERROR: missing required param 'topic' for request" << dendl;
+      return -EINVAL;
+    }
+
+    dest.push_endpoint = s->info.args.get("push-endpoint");
+
+    return 0;
+  }
+};
+
+class RGWPSGetSubOp : public RGWOp {
+protected:
+  string sub_name;
+  std::unique_ptr<RGWUserPubSub> ups;
+  rgw_pubsub_user_sub_config result;
+
+public:
+  RGWPSGetSubOp() {}
+
+  int verify_permission() override {
+    return 0;
+  }
+  void pre_exec() override {
+    rgw_bucket_object_pre_exec(s);
+  }
+  void execute() override;
+
+  const char* name() const override { return "pubsub_subscription_get"; }
+  virtual RGWOpType get_type() override { return RGW_OP_PUBSUB_SUB_GET; }
+  virtual uint32_t op_mask() override { return RGW_OP_TYPE_READ; }
+  virtual int get_params() = 0;
+};
+
+void RGWPSGetSubOp::execute()
+{
+  op_ret = get_params();
+  if (op_ret < 0) {
+    return;
+  }
+  ups = make_unique<RGWUserPubSub>(store, s->owner.get_id());
+  op_ret = ups->get_sub(sub_name, &result);
+  if (op_ret < 0) {
+    ldout(s->cct, 20) << "failed to get subscription, ret=" << op_ret << dendl;
+    return;
+  }
+}
+
+class RGWPSGetSub_ObjStore_S3 : public RGWPSGetSubOp {
+public:
+  explicit RGWPSGetSub_ObjStore_S3() {}
+
+  int get_params() override {
+    sub_name = s->object.name;
+    return 0;
+  }
+
+  void send_response() override {
+    if (op_ret) {
+      set_req_state_err(s, op_ret);
+    }
+    dump_errno(s);
+    end_header(s, this, "application/json");
+
+    if (op_ret < 0) {
+      return;
+    }
+
+    {
+      Formatter::ObjectSection section(*s->formatter, "result");
+      encode_json("topic", result.topic, s->formatter);
+      encode_json("push_endpoint", result.dest.push_endpoint, s->formatter);
+    }
+    rgw_flush_formatter_and_reset(s, s->formatter);
+  }
+};
+
+class RGWPSDeleteSubOp : public RGWOp {
+protected:
+  string sub_name;
+  string topic_name;
+  std::unique_ptr<RGWUserPubSub> ups;
+
+public:
+  RGWPSDeleteSubOp() {}
+
+  int verify_permission() override {
+    return 0;
+  }
+  void pre_exec() override {
+    rgw_bucket_object_pre_exec(s);
+  }
+  void execute() override;
+
+  const char* name() const override { return "pubsub_subscription_delete"; }
+  virtual RGWOpType get_type() override { return RGW_OP_PUBSUB_SUB_DELETE; }
+  virtual uint32_t op_mask() override { return RGW_OP_TYPE_DELETE; }
+  virtual int get_params() = 0;
+};
+
+void RGWPSDeleteSubOp::execute()
+{
+  op_ret = get_params();
+  if (op_ret < 0) {
+    return;
+  }
+  ups = make_unique<RGWUserPubSub>(store, s->owner.get_id());
+  op_ret = ups->remove_sub(sub_name, topic_name);
+  if (op_ret < 0) {
+    ldout(s->cct, 20) << "failed to remove subscription, ret=" << op_ret << dendl;
+    return;
+  }
+}
+
+class RGWPSDeleteSub_ObjStore_S3 : public RGWPSDeleteSubOp {
+public:
+  explicit RGWPSDeleteSub_ObjStore_S3() {}
+
+  int get_params() override {
+    sub_name = s->object.name;
+    topic_name = s->info.args.get("topic");
+    return 0;
+  }
+};
+
+class RGWPSAckSubEventOp : public RGWOp {
+protected:
+  string sub_name;
+  string event_id;
+  std::unique_ptr<RGWUserPubSub> ups;
+
+public:
+  RGWPSAckSubEventOp() {}
+
+  int verify_permission() override {
+    return 0;
+  }
+  void pre_exec() override {
+    rgw_bucket_object_pre_exec(s);
+  }
+  void execute() override;
+
+  const char* name() const override { return "pubsub_subscription_ack"; }
+  virtual RGWOpType get_type() override { return RGW_OP_PUBSUB_SUB_ACK; }
+  virtual uint32_t op_mask() override { return RGW_OP_TYPE_WRITE; }
+  virtual int get_params() = 0;
+};
+
+void RGWPSAckSubEventOp::execute()
+{
+  op_ret = get_params();
+  if (op_ret < 0) {
+    return;
+  }
+  ups = make_unique<RGWUserPubSub>(store, s->owner.get_id());
+  op_ret = ups->remove_event(sub_name, event_id);
+  if (op_ret < 0) {
+    ldout(s->cct, 20) << "failed to remove event, ret=" << op_ret << dendl;
+    return;
+  }
+}
+
+class RGWPSAckSubEvent_ObjStore_S3 : public RGWPSAckSubEventOp {
+public:
+  explicit RGWPSAckSubEvent_ObjStore_S3() {}
+
+  int get_params() override {
+    sub_name = s->object.name;
+
+    bool exists;
+
+    event_id = s->info.args.get("event-id", &exists);
+    if (!exists) {
+      ldout(s->cct, 20) << "ERROR: missing required param 'event-id' for request" << dendl;
+      return -EINVAL;
+    }
+    return 0;
+  }
+};
+
+class RGWPSPullSubEventsOp : public RGWOp {
+protected:
+  int max_entries{0};
+  string sub_name;
+  string marker;
+  std::unique_ptr<RGWUserPubSub> ups;
+  RGWUserPubSub::list_events_result result;
+
+public:
+  RGWPSPullSubEventsOp() {}
+
+  int verify_permission() override {
+    return 0;
+  }
+  void pre_exec() override {
+    rgw_bucket_object_pre_exec(s);
+  }
+  void execute() override;
+
+  const char* name() const override { return "pubsub_subscription_pull"; }
+  virtual RGWOpType get_type() override { return RGW_OP_PUBSUB_SUB_PULL; }
+  virtual uint32_t op_mask() override { return RGW_OP_TYPE_READ; }
+  virtual int get_params() = 0;
+};
+
+void RGWPSPullSubEventsOp::execute()
+{
+  op_ret = get_params();
+  if (op_ret < 0) {
+    return;
+  }
+  ups = make_unique<RGWUserPubSub>(store, s->owner.get_id());
+  op_ret = ups->list_events(sub_name, marker, max_entries, &result);
+  if (op_ret < 0) {
+    ldout(s->cct, 20) << "failed to get subscription, ret=" << op_ret << dendl;
+    return;
+  }
+}
+
+class RGWPSPullSubEvents_ObjStore_S3 : public RGWPSPullSubEventsOp {
+public:
+  explicit RGWPSPullSubEvents_ObjStore_S3() {}
+
+  int get_params() override {
+    sub_name = s->object.name;
+    marker = s->info.args.get("marker");
+#define DEFAULT_MAX_ENTRIES 100
+    int ret = s->info.args.get_int("max-entries", &max_entries, DEFAULT_MAX_ENTRIES);
+    if (ret < 0) {
+      ldout(s->cct, 20) << "failed to parse 'max-entries' param" << dendl;
+      return -EINVAL;
+    }
+    return 0;
+  }
+
+  void send_response() override {
+    if (op_ret) {
+      set_req_state_err(s, op_ret);
+    }
+    dump_errno(s);
+    end_header(s, this, "application/json");
+
+    if (op_ret < 0) {
+      return;
+    }
+
+    encode_json("result", result, s->formatter);
+    rgw_flush_formatter_and_reset(s, s->formatter);
+  }
+};
+
+class RGWHandler_REST_PSSub_S3 : public RGWHandler_REST_S3 {
+protected:
+  bool supports_quota() override {
+    return false;
+  }
+  RGWOp *op_get() override {
+    if (s->object.empty()) {
+      return nullptr;
+    }
+    if (s->info.args.exists("events")) {
+      return new RGWPSPullSubEvents_ObjStore_S3();
+    }
+    return new RGWPSGetSub_ObjStore_S3();
+  }
+  RGWOp *op_put() override {
+    if (!s->object.empty()) {
+      return new RGWPSCreateSub_ObjStore_S3();
+    }
+    return nullptr;
+  }
+  RGWOp *op_delete() override {
+    if (!s->object.empty()) {
+      return new RGWPSDeleteSub_ObjStore_S3();
+    }
+    return nullptr;
+  }
+  RGWOp *op_post() override {
+    if (s->info.args.exists("ack")) {
+      return new RGWPSAckSubEvent_ObjStore_S3();
+    }
+    return nullptr;
+  }
+public:
+  explicit RGWHandler_REST_PSSub_S3(const rgw::auth::StrategyRegistry& auth_registry) : RGWHandler_REST_S3(auth_registry) {}
+  virtual ~RGWHandler_REST_PSSub_S3() {}
+};
+
+
+RGWHandler_REST* RGWRESTMgr_PubSub_S3::get_handler(struct req_state* const s,
+                                                     const rgw::auth::StrategyRegistry& auth_registry,
+                                                     const std::string& frontend_prefix)
+{
+  int ret =
+    RGWHandler_REST_S3::init_from_header(s,
+                                       RGW_FORMAT_JSON, true);
+  if (ret < 0) {
+    return nullptr;
+  }
+
+  RGWHandler_REST *handler = nullptr;;
+
+  if (s->init_state.url_bucket == "topics") {
+    handler = new RGWHandler_REST_PSTopic_S3(auth_registry);
+  }
+
+  if (s->init_state.url_bucket == "subscriptions") {
+    handler = new RGWHandler_REST_PSSub_S3(auth_registry);
+  }
+
+  ldout(s->cct, 20) << __func__ << " handler=" << (handler ? typeid(*handler).name() : "<null>") << dendl;
+  return handler;
+}
+
diff --git a/src/rgw/rgw_sync_module_pubsub_rest.h b/src/rgw/rgw_sync_module_pubsub_rest.h
new file mode 100644 (file)
index 0000000..120cb3d
--- /dev/null
@@ -0,0 +1,16 @@
+#ifndef CEPH_RGW_SYNC_MODULE_PUBSUB_REST_H
+#define CEPH_RGW_SYNC_MODULE_PUBSUB_REST_H
+
+#include "rgw_rest.h"
+
+class RGWRESTMgr_PubSub_S3 : public RGWRESTMgr {
+  RGWRESTMgr *next;
+public:
+  explicit RGWRESTMgr_PubSub_S3(RGWRESTMgr *_next) : next(_next) {}
+
+  RGWHandler_REST *get_handler(struct req_state* s,
+                               const rgw::auth::StrategyRegistry& auth_registry,
+                               const std::string& frontend_prefix) override;
+};
+
+#endif