]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw/pubsub: initial version of S3 compliant API
authorYuval Lifshitz <yuvalif@yahoo.com>
Wed, 27 Feb 2019 17:13:08 +0000 (19:13 +0200)
committerYuval Lifshitz <yuvalif@yahoo.com>
Thu, 21 Mar 2019 06:57:04 +0000 (08:57 +0200)
Signed-off-by: Yuval Lifshitz <yuvalif@yahoo.com>
src/rgw/rgw_rest.cc
src/rgw/rgw_rest_s3.cc
src/rgw/rgw_sync_module_pubsub_rest.cc

index c265f9190644b6b0716057ea7b1a46539b16f869..24843900cc7ec9ff5d8a5227fa73a6a8e92fff41 100644 (file)
@@ -657,13 +657,13 @@ void abort_early(struct req_state *s, RGWOp* op, int err_no,
   if (op != NULL) {
     int new_err_no;
     new_err_no = op->error_handler(err_no, &error_content);
-    ldout(s->cct, 20) << "op->ERRORHANDLER: err_no=" << err_no
+    ldout(s->cct, 1) << "op->ERRORHANDLER: err_no=" << err_no
                      << " new_err_no=" << new_err_no << dendl;
     err_no = new_err_no;
   } else if (handler != NULL) {
     int new_err_no;
     new_err_no = handler->error_handler(err_no, &error_content);
-    ldout(s->cct, 20) << "handler->ERRORHANDLER: err_no=" << err_no
+    ldout(s->cct, 1) << "handler->ERRORHANDLER: err_no=" << err_no
                      << " new_err_no=" << new_err_no << dendl;
     err_no = new_err_no;
   }
index cf65357f2bad2100e1d89163c2a7f235f7198b5f..b5eaee291ecdd117db207337cb2e2cbe14484b51 100644 (file)
@@ -4055,6 +4055,7 @@ AWSGeneralAbstractor::get_auth_data_v4(const req_state* const s,
         case RGW_OP_PUT_OBJ_TAGGING:
         case RGW_OP_PUT_LC:
         case RGW_OP_SET_REQUEST_PAYMENT:
+        case RGW_OP_PUBSUB_NOTIF_CREATE:
           break;
         default:
           dout(10) << "ERROR: AWS4 completion for this operation NOT IMPLEMENTED" << dendl;
index ef87613989ce5b280b0b3f956d080d5e5f0c9408..eaa8c003e3a7b01b39ef62d075b818db0e945cb2 100644 (file)
@@ -41,7 +41,7 @@ void RGWPSCreateTopicOp::execute()
   ups = make_unique<RGWUserPubSub>(store, s->owner.get_id());
   op_ret = ups->create_topic(topic_name);
   if (op_ret < 0) {
-    ldout(s->cct, 20) << "failed to create topic, ret=" << op_ret << dendl;
+    ldout(s->cct, 1) << "failed to create topic, ret=" << op_ret << dendl;
     return;
   }
 }
@@ -83,7 +83,7 @@ void RGWPSListTopicsOp::execute()
   ups = make_unique<RGWUserPubSub>(store, s->owner.get_id());
   op_ret = ups->get_user_topics(&result);
   if (op_ret < 0) {
-    ldout(s->cct, 20) << "failed to get topics, ret=" << op_ret << dendl;
+    ldout(s->cct, 1) << "failed to get topics, ret=" << op_ret << dendl;
     return;
   }
 
@@ -141,7 +141,7 @@ void RGWPSGetTopicOp::execute()
   ups = make_unique<RGWUserPubSub>(store, s->owner.get_id());
   op_ret = ups->get_topic(topic_name, &result);
   if (op_ret < 0) {
-    ldout(s->cct, 20) << "failed to get topic, ret=" << op_ret << dendl;
+    ldout(s->cct, 1) << "failed to get topic, ret=" << op_ret << dendl;
     return;
   }
 }
@@ -203,7 +203,7 @@ void RGWPSDeleteTopicOp::execute()
   ups = make_unique<RGWUserPubSub>(store, s->owner.get_id());
   op_ret = ups->remove_topic(topic_name);
   if (op_ret < 0) {
-    ldout(s->cct, 20) << "failed to remove topic, ret=" << op_ret << dendl;
+    ldout(s->cct, 1) << "failed to remove topic, ret=" << op_ret << dendl;
     return;
   }
 }
@@ -290,7 +290,7 @@ void RGWPSCreateSubOp::execute()
   auto sub = ups->get_sub(sub_name);
   op_ret = sub->subscribe(topic_name, dest);
   if (op_ret < 0) {
-    ldout(s->cct, 20) << "failed to create subscription, ret=" << op_ret << dendl;
+    ldout(s->cct, 1) << "failed to create subscription, ret=" << op_ret << dendl;
     return;
   }
 }
@@ -306,7 +306,7 @@ public:
 
     topic_name = s->info.args.get("topic", &exists);
     if (!exists) {
-      ldout(s->cct, 20) << "ERROR: missing required param 'topic' for request" << dendl;
+      ldout(s->cct, 1) << "missing required param 'topic' for request" << dendl;
       return -EINVAL;
     }
 
@@ -355,7 +355,7 @@ void RGWPSGetSubOp::execute()
   auto sub = ups->get_sub(sub_name);
   op_ret = sub->get_conf(&result);
   if (op_ret < 0) {
-    ldout(s->cct, 20) << "failed to get subscription, ret=" << op_ret << dendl;
+    ldout(s->cct, 1) << "failed to get subscription, ret=" << op_ret << dendl;
     return;
   }
 }
@@ -423,7 +423,7 @@ void RGWPSDeleteSubOp::execute()
   auto sub = ups->get_sub(sub_name);
   op_ret = sub->unsubscribe(topic_name);
   if (op_ret < 0) {
-    ldout(s->cct, 20) << "failed to remove subscription, ret=" << op_ret << dendl;
+    ldout(s->cct, 1) << "failed to remove subscription, ret=" << op_ret << dendl;
     return;
   }
 }
@@ -472,7 +472,7 @@ void RGWPSAckSubEventOp::execute()
   auto sub = ups->get_sub(sub_name);
   op_ret = sub->remove_event(event_id);
   if (op_ret < 0) {
-    ldout(s->cct, 20) << "failed to ack event, ret=" << op_ret << dendl;
+    ldout(s->cct, 1) << "failed to ack event, ret=" << op_ret << dendl;
     return;
   }
 }
@@ -488,7 +488,7 @@ public:
 
     event_id = s->info.args.get("event-id", &exists);
     if (!exists) {
-      ldout(s->cct, 20) << "ERROR: missing required param 'event-id' for request" << dendl;
+      ldout(s->cct, 1) << "missing required param 'event-id' for request" << dendl;
       return -EINVAL;
     }
     return 0;
@@ -530,7 +530,7 @@ void RGWPSPullSubEventsOp::execute()
   auto sub = ups->get_sub(sub_name);
   op_ret = sub->list_events(marker, max_entries, &result);
   if (op_ret < 0) {
-    ldout(s->cct, 20) << "failed to get subscription, ret=" << op_ret << dendl;
+    ldout(s->cct, 1) << "failed to get subscription, ret=" << op_ret << dendl;
     return;
   }
 }
@@ -545,7 +545,7 @@ public:
 #define DEFAULT_MAX_ENTRIES 100
     int ret = s->info.args.get_int("max-entries", &max_entries, DEFAULT_MAX_ENTRIES);
     if (ret < 0) {
-      ldout(s->cct, 20) << "failed to parse 'max-entries' param" << dendl;
+      ldout(s->cct, 1) << "failed to parse 'max-entries' param" << dendl;
       return -EINVAL;
     }
     return 0;
@@ -652,14 +652,16 @@ public:
       return ret;
     }
 
-    ret = store->get_bucket_info(*s->sysobj_ctx, s->owner.get_id().tenant, bucket_name,
+    const auto& id = s->owner.get_id();
+
+    ret = store->get_bucket_info(*s->sysobj_ctx, id.tenant, bucket_name,
                                  bucket_info, nullptr, nullptr);
     if (ret < 0) {
       return ret;
     }
 
-    if (bucket_info.owner != s->owner.get_id()) {
-      ldout(s->cct, 20) << "user doesn't own bucket, cannot create topic" << dendl;
+    if (bucket_info.owner != id) {
+      ldout(s->cct, 1) << "user doesn't own bucket, cannot create notification" << dendl;
       return -EPERM;
     }
     return 0;
@@ -667,15 +669,41 @@ public:
   void pre_exec() override {
     rgw_bucket_object_pre_exec(s);
   }
-  void execute() override;
 
-  const char* name() const override { return "pubsub_notification_create"; }
-  virtual RGWOpType get_type() override { return RGW_OP_PUBSUB_NOTIF_CREATE; }
-  virtual uint32_t op_mask() override { return RGW_OP_TYPE_WRITE; }
+  RGWOpType get_type() override { return RGW_OP_PUBSUB_NOTIF_CREATE; }
+  uint32_t op_mask() override { return RGW_OP_TYPE_WRITE; }
   virtual int get_params() = 0;
 };
 
-void RGWPSCreateNotifOp::execute()
+
+// GCP style notification creation
+// Command: PUT /notification/bucket/<bucket name>?topic=<topic name>
+// "topic" has to be created beforehand
+class RGWPSCreateNotif_ObjStore_GCP : public RGWPSCreateNotifOp {
+public:
+  explicit RGWPSCreateNotif_ObjStore_GCP() {}
+
+  const char* name() const override { return "pubsub_notification_create_gcp"; }
+
+  void execute() override;
+
+  int get_params() override {
+    bool exists;
+    topic_name = s->info.args.get("topic", &exists);
+    if (!exists) {
+      ldout(s->cct, 1) << "param 'topic' not provided" << dendl;
+      return -EINVAL;
+    }
+
+    string events_str = s->info.args.get("events", &exists);
+    if (exists) {
+      get_str_set(events_str, ",", events);
+    }
+    return notif_bucket_path(s->object.name, &bucket_name);
+  }
+};
+
+void RGWPSCreateNotif_ObjStore_GCP::execute()
 {
   op_ret = get_params();
   if (op_ret < 0) {
@@ -683,34 +711,105 @@ void RGWPSCreateNotifOp::execute()
   }
 
   ups = make_unique<RGWUserPubSub>(store, s->owner.get_id());
+
   auto b = ups->get_bucket(bucket_info.bucket);
   op_ret = b->create_notification(topic_name, events);
   if (op_ret < 0) {
-    ldout(s->cct, 20) << "failed to create notification, ret=" << op_ret << dendl;
+    ldout(s->cct, 1) << "failed to create notification, ret=" << op_ret << dendl;
     return;
   }
 }
 
+// S3 compliant notification creation
+// Command: PUT /<bucket name>?notification
+// a "topic" will be auto-generated
 class RGWPSCreateNotif_ObjStore_S3 : public RGWPSCreateNotifOp {
+protected:
+    std::string topic_name;
+
 public:
   explicit RGWPSCreateNotif_ObjStore_S3() {}
 
+  const char* name() const override { return "pubsub_notification_create_s3"; }
+
+  void execute() override;
+
   int get_params() override {
     bool exists;
-    topic_name = s->info.args.get("topic", &exists);
+    auto no_value = s->info.args.get("notification", &exists);
     if (!exists) {
-      ldout(s->cct, 20) << "param 'topic' not provided" << dendl;
+      ldout(s->cct, 20) << "param 'notification' not provided" << dendl;
+      return -EINVAL;
+    } else if (no_value.length() > 0) {
+      ldout(s->cct, 1) << "param 'notification' should not be set with value" << dendl;
       return -EINVAL;
     }
 
-    string events_str = s->info.args.get("events", &exists);
-    if (exists) {
-      get_str_set(events_str, ",", events);
+    if (s->bucket_name.empty()) {
+      ldout(s->cct, 1) << "notification must be set on a bucket" << dendl;
+      return -EINVAL;
     }
-    return notif_bucket_path(s->object.name, &bucket_name);
+    
+    bucket_name = s->bucket_name;
+
+    const auto max_size = s->cct->_conf->rgw_max_put_param_size;
+
+    int r = 0;
+    bufferlist data;
+    std::tie(r, data) = rgw_rest_read_all_input(s, max_size);
+
+    if (r < 0) {
+      ldout(s->cct, 1) << "failed to read notification parameters from payload" << dendl;
+      return r;
+    }
+    if (data.length() == 0) {
+      ldout(s->cct, 1) << "payload missing for notification" << dendl;
+      return -EINVAL;
+    }
+
+    RGWXMLParser parser;
+
+    if (!parser.init()){
+      ldout(s->cct, 1) << "failed to initialize XML parser" << dendl;
+      return -EINVAL;
+    }
+
+    if (!parser.parse(data.c_str(), data.length(), 1)) {
+      ldout(s->cct, 1) << "failed to parse XML payload or notification" << dendl;
+      return -ERR_MALFORMED_XML;
+    }
+
+    try {
+    } catch (RGWXMLDecoder::err& err) {
+        ldout(s->cct, 5) << "Malformed tagging request: " << err << dendl;
+        return -ERR_MALFORMED_XML;
+    }
+    return 0;
   }
 };
 
+void RGWPSCreateNotif_ObjStore_S3::execute()
+{
+  op_ret = get_params();
+  if (op_ret < 0) {
+    return;
+  }
+
+  ups = make_unique<RGWUserPubSub>(store, s->owner.get_id());
+
+  op_ret = ups->create_topic(topic_name);
+  if (op_ret < 0) {
+      ldout(s->cct, 1) << "failed to auto-generate topic for notification, ret=" << op_ret << dendl;
+      return;
+  }
+  auto b = ups->get_bucket(bucket_info.bucket);
+  op_ret = b->create_notification(topic_name, events);
+  if (op_ret < 0) {
+    ldout(s->cct, 1) << "failed to create notification, ret=" << op_ret << dendl;
+    return;
+  }
+}
+
 class RGWPSDeleteNotifOp : public RGWDefaultResponseOp {
 protected:
   std::unique_ptr<RGWUserPubSub> ups;
@@ -734,7 +833,7 @@ public:
     }
 
     if (bucket_info.owner != s->owner.get_id()) {
-      ldout(s->cct, 20) << "user doesn't own bucket, cannot create topic" << dendl;
+      ldout(s->cct, 1) << "user doesn't own bucket, cannot create topic" << dendl;
       return -EPERM;
     }
     return 0;
@@ -761,20 +860,20 @@ void RGWPSDeleteNotifOp::execute()
   auto b = ups->get_bucket(bucket_info.bucket);
   op_ret = b->remove_notification(topic_name);
   if (op_ret < 0) {
-    ldout(s->cct, 20) << "failed to remove notification, ret=" << op_ret << dendl;
+    ldout(s->cct, 1) << "failed to remove notification, ret=" << op_ret << dendl;
     return;
   }
 }
 
-class RGWPSDeleteNotif_ObjStore_S3 : public RGWPSCreateNotifOp {
+class RGWPSDeleteNotif_ObjStore_GCP : public RGWPSDeleteNotifOp {
 public:
-  explicit RGWPSDeleteNotif_ObjStore_S3() {}
+  explicit RGWPSDeleteNotif_ObjStore_GCP() {}
 
   int get_params() override {
     bool exists;
     topic_name = s->info.args.get("topic", &exists);
     if (!exists) {
-      ldout(s->cct, 20) << "param 'topic' not provided" << dendl;
+      ldout(s->cct, 1) << "param 'topic' not provided" << dendl;
       return -EINVAL;
     }
     return notif_bucket_path(s->object.name, &bucket_name);
@@ -805,7 +904,7 @@ public:
     }
 
     if (bucket_info.owner != s->owner.get_id()) {
-      ldout(s->cct, 20) << "user doesn't own bucket, cannot create topic" << dendl;
+      ldout(s->cct, 1) << "user doesn't own bucket, cannot create topic" << dendl;
       return -EPERM;
     }
 
@@ -828,15 +927,15 @@ void RGWPSListNotifsOp::execute()
   auto b = ups->get_bucket(bucket_info.bucket);
   op_ret = b->get_topics(&result);
   if (op_ret < 0) {
-    ldout(s->cct, 20) << "failed to get topics, ret=" << op_ret << dendl;
+    ldout(s->cct, 1) << "failed to get topics, ret=" << op_ret << dendl;
     return;
   }
 
 }
 
-class RGWPSListNotifs_ObjStore_S3 : public RGWPSListNotifsOp {
+class RGWPSListNotifs_ObjStore_GCP : public RGWPSListNotifsOp {
 public:
-  explicit RGWPSListNotifs_ObjStore_S3() {}
+  explicit RGWPSListNotifs_ObjStore_GCP() {}
 
   int get_params() override {
     return notif_bucket_path(s->object.name, &bucket_name);
@@ -859,7 +958,8 @@ public:
 };
 
 
-class RGWHandler_REST_PSNotifs_S3 : public RGWHandler_REST_S3 {
+// GCP-style notification handler factory
+class RGWHandler_REST_PSNotifs_GCP : public RGWHandler_REST_S3 {
 protected:
   int init_permissions(RGWOp* op) override {
     return 0;
@@ -875,26 +975,53 @@ protected:
     if (s->object.empty()) {
       return nullptr;
     }
-    return new RGWPSListNotifs_ObjStore_S3();
+    return new RGWPSListNotifs_ObjStore_GCP();
   }
   RGWOp *op_put() override {
     if (!s->object.empty()) {
-      return new RGWPSCreateNotif_ObjStore_S3();
+      return new RGWPSCreateNotif_ObjStore_GCP();
     }
     return nullptr;
   }
   RGWOp *op_delete() override {
     if (!s->object.empty()) {
-      return new RGWPSDeleteNotif_ObjStore_S3();
+      return new RGWPSDeleteNotif_ObjStore_GCP();
     }
     return nullptr;
   }
+public:
+  explicit RGWHandler_REST_PSNotifs_GCP(const rgw::auth::StrategyRegistry& auth_registry) : RGWHandler_REST_S3(auth_registry) {}
+  virtual ~RGWHandler_REST_PSNotifs_GCP() {}
+};
+
+// S3-compliant notification handler factory
+class RGWHandler_REST_PSNotifs_S3 : public RGWHandler_REST_S3 {
+protected:
+  int init_permissions(RGWOp* op) override {
+    return 0;
+  }
+
+  int read_permissions(RGWOp* op) override {
+    return 0;
+  }
+  bool supports_quota() override {
+    return false;
+  }
+  RGWOp *op_get() override {
+    return nullptr;
+  }
+  RGWOp *op_put() override {
+    return new RGWPSCreateNotif_ObjStore_S3();
+  }
+  RGWOp *op_delete() override {
+    return nullptr;
+  }
 public:
   explicit RGWHandler_REST_PSNotifs_S3(const rgw::auth::StrategyRegistry& auth_registry) : RGWHandler_REST_S3(auth_registry) {}
   virtual ~RGWHandler_REST_PSNotifs_S3() {}
 };
 
-
+// factory for PubSub REST handlers 
 RGWHandler_REST* RGWRESTMgr_PubSub_S3::get_handler(struct req_state* const s,
                                                      const rgw::auth::StrategyRegistry& auth_registry,
                                                      const std::string& frontend_prefix)
@@ -906,17 +1033,17 @@ RGWHandler_REST* RGWRESTMgr_PubSub_S3::get_handler(struct req_state* const s,
     return nullptr;
   }
 
-  RGWHandler_REST *handler = nullptr;;
+  RGWHandler_REST *handler = nullptr;
 
+  // GCP-style PubSub API: topics/subscriptions/notification are reserved bucket names
   if (s->init_state.url_bucket == "topics") {
     handler = new RGWHandler_REST_PSTopic_S3(auth_registry);
-  }
-
-  if (s->init_state.url_bucket == "subscriptions") {
+  } else if (s->init_state.url_bucket == "subscriptions") {
     handler = new RGWHandler_REST_PSSub_S3(auth_registry);
-  }
-
-  if (s->init_state.url_bucket == "notifications") {
+  } else if (s->init_state.url_bucket == "notifications") {
+    handler = new RGWHandler_REST_PSNotifs_GCP(auth_registry);
+  } else {
+    // S3-compliant PubSub API: uses: <bucket name>?notification
     handler = new RGWHandler_REST_PSNotifs_S3(auth_registry);
   }