From: Yuval Lifshitz Date: Wed, 27 Feb 2019 17:13:08 +0000 (+0200) Subject: rgw/pubsub: initial version of S3 compliant API X-Git-Tag: v15.1.0~3002^2~11 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=2e94f4d1cecb5164bc646fb6b088dd197f878263;p=ceph.git rgw/pubsub: initial version of S3 compliant API Signed-off-by: Yuval Lifshitz --- diff --git a/src/rgw/rgw_rest.cc b/src/rgw/rgw_rest.cc index c265f919064..24843900cc7 100644 --- a/src/rgw/rgw_rest.cc +++ b/src/rgw/rgw_rest.cc @@ -657,13 +657,13 @@ void abort_early(struct req_state *s, RGWOp* op, int err_no, if (op != NULL) { int new_err_no; new_err_no = op->error_handler(err_no, &error_content); - ldout(s->cct, 20) << "op->ERRORHANDLER: err_no=" << err_no + ldout(s->cct, 1) << "op->ERRORHANDLER: err_no=" << err_no << " new_err_no=" << new_err_no << dendl; err_no = new_err_no; } else if (handler != NULL) { int new_err_no; new_err_no = handler->error_handler(err_no, &error_content); - ldout(s->cct, 20) << "handler->ERRORHANDLER: err_no=" << err_no + ldout(s->cct, 1) << "handler->ERRORHANDLER: err_no=" << err_no << " new_err_no=" << new_err_no << dendl; err_no = new_err_no; } diff --git a/src/rgw/rgw_rest_s3.cc b/src/rgw/rgw_rest_s3.cc index cf65357f2ba..b5eaee291ec 100644 --- a/src/rgw/rgw_rest_s3.cc +++ b/src/rgw/rgw_rest_s3.cc @@ -4055,6 +4055,7 @@ AWSGeneralAbstractor::get_auth_data_v4(const req_state* const s, case RGW_OP_PUT_OBJ_TAGGING: case RGW_OP_PUT_LC: case RGW_OP_SET_REQUEST_PAYMENT: + case RGW_OP_PUBSUB_NOTIF_CREATE: break; default: dout(10) << "ERROR: AWS4 completion for this operation NOT IMPLEMENTED" << dendl; diff --git a/src/rgw/rgw_sync_module_pubsub_rest.cc b/src/rgw/rgw_sync_module_pubsub_rest.cc index ef87613989c..eaa8c003e3a 100644 --- a/src/rgw/rgw_sync_module_pubsub_rest.cc +++ b/src/rgw/rgw_sync_module_pubsub_rest.cc @@ -41,7 +41,7 @@ void RGWPSCreateTopicOp::execute() ups = make_unique(store, s->owner.get_id()); op_ret = ups->create_topic(topic_name); if (op_ret < 0) { - ldout(s->cct, 20) << "failed to create topic, ret=" << op_ret << dendl; + ldout(s->cct, 1) << "failed to create topic, ret=" << op_ret << dendl; return; } } @@ -83,7 +83,7 @@ void RGWPSListTopicsOp::execute() ups = make_unique(store, s->owner.get_id()); op_ret = ups->get_user_topics(&result); if (op_ret < 0) { - ldout(s->cct, 20) << "failed to get topics, ret=" << op_ret << dendl; + ldout(s->cct, 1) << "failed to get topics, ret=" << op_ret << dendl; return; } @@ -141,7 +141,7 @@ void RGWPSGetTopicOp::execute() ups = make_unique(store, s->owner.get_id()); op_ret = ups->get_topic(topic_name, &result); if (op_ret < 0) { - ldout(s->cct, 20) << "failed to get topic, ret=" << op_ret << dendl; + ldout(s->cct, 1) << "failed to get topic, ret=" << op_ret << dendl; return; } } @@ -203,7 +203,7 @@ void RGWPSDeleteTopicOp::execute() ups = make_unique(store, s->owner.get_id()); op_ret = ups->remove_topic(topic_name); if (op_ret < 0) { - ldout(s->cct, 20) << "failed to remove topic, ret=" << op_ret << dendl; + ldout(s->cct, 1) << "failed to remove topic, ret=" << op_ret << dendl; return; } } @@ -290,7 +290,7 @@ void RGWPSCreateSubOp::execute() auto sub = ups->get_sub(sub_name); op_ret = sub->subscribe(topic_name, dest); if (op_ret < 0) { - ldout(s->cct, 20) << "failed to create subscription, ret=" << op_ret << dendl; + ldout(s->cct, 1) << "failed to create subscription, ret=" << op_ret << dendl; return; } } @@ -306,7 +306,7 @@ public: topic_name = s->info.args.get("topic", &exists); if (!exists) { - ldout(s->cct, 20) << "ERROR: missing required param 'topic' for request" << dendl; + ldout(s->cct, 1) << "missing required param 'topic' for request" << dendl; return -EINVAL; } @@ -355,7 +355,7 @@ void RGWPSGetSubOp::execute() auto sub = ups->get_sub(sub_name); op_ret = sub->get_conf(&result); if (op_ret < 0) { - ldout(s->cct, 20) << "failed to get subscription, ret=" << op_ret << dendl; + ldout(s->cct, 1) << "failed to get subscription, ret=" << op_ret << dendl; return; } } @@ -423,7 +423,7 @@ void RGWPSDeleteSubOp::execute() auto sub = ups->get_sub(sub_name); op_ret = sub->unsubscribe(topic_name); if (op_ret < 0) { - ldout(s->cct, 20) << "failed to remove subscription, ret=" << op_ret << dendl; + ldout(s->cct, 1) << "failed to remove subscription, ret=" << op_ret << dendl; return; } } @@ -472,7 +472,7 @@ void RGWPSAckSubEventOp::execute() auto sub = ups->get_sub(sub_name); op_ret = sub->remove_event(event_id); if (op_ret < 0) { - ldout(s->cct, 20) << "failed to ack event, ret=" << op_ret << dendl; + ldout(s->cct, 1) << "failed to ack event, ret=" << op_ret << dendl; return; } } @@ -488,7 +488,7 @@ public: event_id = s->info.args.get("event-id", &exists); if (!exists) { - ldout(s->cct, 20) << "ERROR: missing required param 'event-id' for request" << dendl; + ldout(s->cct, 1) << "missing required param 'event-id' for request" << dendl; return -EINVAL; } return 0; @@ -530,7 +530,7 @@ void RGWPSPullSubEventsOp::execute() auto sub = ups->get_sub(sub_name); op_ret = sub->list_events(marker, max_entries, &result); if (op_ret < 0) { - ldout(s->cct, 20) << "failed to get subscription, ret=" << op_ret << dendl; + ldout(s->cct, 1) << "failed to get subscription, ret=" << op_ret << dendl; return; } } @@ -545,7 +545,7 @@ public: #define DEFAULT_MAX_ENTRIES 100 int ret = s->info.args.get_int("max-entries", &max_entries, DEFAULT_MAX_ENTRIES); if (ret < 0) { - ldout(s->cct, 20) << "failed to parse 'max-entries' param" << dendl; + ldout(s->cct, 1) << "failed to parse 'max-entries' param" << dendl; return -EINVAL; } return 0; @@ -652,14 +652,16 @@ public: return ret; } - ret = store->get_bucket_info(*s->sysobj_ctx, s->owner.get_id().tenant, bucket_name, + const auto& id = s->owner.get_id(); + + ret = store->get_bucket_info(*s->sysobj_ctx, id.tenant, bucket_name, bucket_info, nullptr, nullptr); if (ret < 0) { return ret; } - if (bucket_info.owner != s->owner.get_id()) { - ldout(s->cct, 20) << "user doesn't own bucket, cannot create topic" << dendl; + if (bucket_info.owner != id) { + ldout(s->cct, 1) << "user doesn't own bucket, cannot create notification" << dendl; return -EPERM; } return 0; @@ -667,15 +669,41 @@ public: void pre_exec() override { rgw_bucket_object_pre_exec(s); } - void execute() override; - const char* name() const override { return "pubsub_notification_create"; } - virtual RGWOpType get_type() override { return RGW_OP_PUBSUB_NOTIF_CREATE; } - virtual uint32_t op_mask() override { return RGW_OP_TYPE_WRITE; } + RGWOpType get_type() override { return RGW_OP_PUBSUB_NOTIF_CREATE; } + uint32_t op_mask() override { return RGW_OP_TYPE_WRITE; } virtual int get_params() = 0; }; -void RGWPSCreateNotifOp::execute() + +// GCP style notification creation +// Command: PUT /notification/bucket/?topic= +// "topic" has to be created beforehand +class RGWPSCreateNotif_ObjStore_GCP : public RGWPSCreateNotifOp { +public: + explicit RGWPSCreateNotif_ObjStore_GCP() {} + + const char* name() const override { return "pubsub_notification_create_gcp"; } + + void execute() override; + + int get_params() override { + bool exists; + topic_name = s->info.args.get("topic", &exists); + if (!exists) { + ldout(s->cct, 1) << "param 'topic' not provided" << dendl; + return -EINVAL; + } + + string events_str = s->info.args.get("events", &exists); + if (exists) { + get_str_set(events_str, ",", events); + } + return notif_bucket_path(s->object.name, &bucket_name); + } +}; + +void RGWPSCreateNotif_ObjStore_GCP::execute() { op_ret = get_params(); if (op_ret < 0) { @@ -683,34 +711,105 @@ void RGWPSCreateNotifOp::execute() } ups = make_unique(store, s->owner.get_id()); + auto b = ups->get_bucket(bucket_info.bucket); op_ret = b->create_notification(topic_name, events); if (op_ret < 0) { - ldout(s->cct, 20) << "failed to create notification, ret=" << op_ret << dendl; + ldout(s->cct, 1) << "failed to create notification, ret=" << op_ret << dendl; return; } } +// S3 compliant notification creation +// Command: PUT /?notification +// a "topic" will be auto-generated class RGWPSCreateNotif_ObjStore_S3 : public RGWPSCreateNotifOp { +protected: + std::string topic_name; + public: explicit RGWPSCreateNotif_ObjStore_S3() {} + const char* name() const override { return "pubsub_notification_create_s3"; } + + void execute() override; + int get_params() override { bool exists; - topic_name = s->info.args.get("topic", &exists); + auto no_value = s->info.args.get("notification", &exists); if (!exists) { - ldout(s->cct, 20) << "param 'topic' not provided" << dendl; + ldout(s->cct, 20) << "param 'notification' not provided" << dendl; + return -EINVAL; + } else if (no_value.length() > 0) { + ldout(s->cct, 1) << "param 'notification' should not be set with value" << dendl; return -EINVAL; } - string events_str = s->info.args.get("events", &exists); - if (exists) { - get_str_set(events_str, ",", events); + if (s->bucket_name.empty()) { + ldout(s->cct, 1) << "notification must be set on a bucket" << dendl; + return -EINVAL; } - return notif_bucket_path(s->object.name, &bucket_name); + + bucket_name = s->bucket_name; + + const auto max_size = s->cct->_conf->rgw_max_put_param_size; + + int r = 0; + bufferlist data; + std::tie(r, data) = rgw_rest_read_all_input(s, max_size); + + if (r < 0) { + ldout(s->cct, 1) << "failed to read notification parameters from payload" << dendl; + return r; + } + if (data.length() == 0) { + ldout(s->cct, 1) << "payload missing for notification" << dendl; + return -EINVAL; + } + + RGWXMLParser parser; + + if (!parser.init()){ + ldout(s->cct, 1) << "failed to initialize XML parser" << dendl; + return -EINVAL; + } + + if (!parser.parse(data.c_str(), data.length(), 1)) { + ldout(s->cct, 1) << "failed to parse XML payload or notification" << dendl; + return -ERR_MALFORMED_XML; + } + + try { + } catch (RGWXMLDecoder::err& err) { + ldout(s->cct, 5) << "Malformed tagging request: " << err << dendl; + return -ERR_MALFORMED_XML; + } + return 0; } }; +void RGWPSCreateNotif_ObjStore_S3::execute() +{ + op_ret = get_params(); + if (op_ret < 0) { + return; + } + + ups = make_unique(store, s->owner.get_id()); + + op_ret = ups->create_topic(topic_name); + if (op_ret < 0) { + ldout(s->cct, 1) << "failed to auto-generate topic for notification, ret=" << op_ret << dendl; + return; + } + auto b = ups->get_bucket(bucket_info.bucket); + op_ret = b->create_notification(topic_name, events); + if (op_ret < 0) { + ldout(s->cct, 1) << "failed to create notification, ret=" << op_ret << dendl; + return; + } +} + class RGWPSDeleteNotifOp : public RGWDefaultResponseOp { protected: std::unique_ptr ups; @@ -734,7 +833,7 @@ public: } if (bucket_info.owner != s->owner.get_id()) { - ldout(s->cct, 20) << "user doesn't own bucket, cannot create topic" << dendl; + ldout(s->cct, 1) << "user doesn't own bucket, cannot create topic" << dendl; return -EPERM; } return 0; @@ -761,20 +860,20 @@ void RGWPSDeleteNotifOp::execute() auto b = ups->get_bucket(bucket_info.bucket); op_ret = b->remove_notification(topic_name); if (op_ret < 0) { - ldout(s->cct, 20) << "failed to remove notification, ret=" << op_ret << dendl; + ldout(s->cct, 1) << "failed to remove notification, ret=" << op_ret << dendl; return; } } -class RGWPSDeleteNotif_ObjStore_S3 : public RGWPSCreateNotifOp { +class RGWPSDeleteNotif_ObjStore_GCP : public RGWPSDeleteNotifOp { public: - explicit RGWPSDeleteNotif_ObjStore_S3() {} + explicit RGWPSDeleteNotif_ObjStore_GCP() {} int get_params() override { bool exists; topic_name = s->info.args.get("topic", &exists); if (!exists) { - ldout(s->cct, 20) << "param 'topic' not provided" << dendl; + ldout(s->cct, 1) << "param 'topic' not provided" << dendl; return -EINVAL; } return notif_bucket_path(s->object.name, &bucket_name); @@ -805,7 +904,7 @@ public: } if (bucket_info.owner != s->owner.get_id()) { - ldout(s->cct, 20) << "user doesn't own bucket, cannot create topic" << dendl; + ldout(s->cct, 1) << "user doesn't own bucket, cannot create topic" << dendl; return -EPERM; } @@ -828,15 +927,15 @@ void RGWPSListNotifsOp::execute() auto b = ups->get_bucket(bucket_info.bucket); op_ret = b->get_topics(&result); if (op_ret < 0) { - ldout(s->cct, 20) << "failed to get topics, ret=" << op_ret << dendl; + ldout(s->cct, 1) << "failed to get topics, ret=" << op_ret << dendl; return; } } -class RGWPSListNotifs_ObjStore_S3 : public RGWPSListNotifsOp { +class RGWPSListNotifs_ObjStore_GCP : public RGWPSListNotifsOp { public: - explicit RGWPSListNotifs_ObjStore_S3() {} + explicit RGWPSListNotifs_ObjStore_GCP() {} int get_params() override { return notif_bucket_path(s->object.name, &bucket_name); @@ -859,7 +958,8 @@ public: }; -class RGWHandler_REST_PSNotifs_S3 : public RGWHandler_REST_S3 { +// GCP-style notification handler factory +class RGWHandler_REST_PSNotifs_GCP : public RGWHandler_REST_S3 { protected: int init_permissions(RGWOp* op) override { return 0; @@ -875,26 +975,53 @@ protected: if (s->object.empty()) { return nullptr; } - return new RGWPSListNotifs_ObjStore_S3(); + return new RGWPSListNotifs_ObjStore_GCP(); } RGWOp *op_put() override { if (!s->object.empty()) { - return new RGWPSCreateNotif_ObjStore_S3(); + return new RGWPSCreateNotif_ObjStore_GCP(); } return nullptr; } RGWOp *op_delete() override { if (!s->object.empty()) { - return new RGWPSDeleteNotif_ObjStore_S3(); + return new RGWPSDeleteNotif_ObjStore_GCP(); } return nullptr; } +public: + explicit RGWHandler_REST_PSNotifs_GCP(const rgw::auth::StrategyRegistry& auth_registry) : RGWHandler_REST_S3(auth_registry) {} + virtual ~RGWHandler_REST_PSNotifs_GCP() {} +}; + +// S3-compliant notification handler factory +class RGWHandler_REST_PSNotifs_S3 : public RGWHandler_REST_S3 { +protected: + int init_permissions(RGWOp* op) override { + return 0; + } + + int read_permissions(RGWOp* op) override { + return 0; + } + bool supports_quota() override { + return false; + } + RGWOp *op_get() override { + return nullptr; + } + RGWOp *op_put() override { + return new RGWPSCreateNotif_ObjStore_S3(); + } + RGWOp *op_delete() override { + return nullptr; + } public: explicit RGWHandler_REST_PSNotifs_S3(const rgw::auth::StrategyRegistry& auth_registry) : RGWHandler_REST_S3(auth_registry) {} virtual ~RGWHandler_REST_PSNotifs_S3() {} }; - +// factory for PubSub REST handlers RGWHandler_REST* RGWRESTMgr_PubSub_S3::get_handler(struct req_state* const s, const rgw::auth::StrategyRegistry& auth_registry, const std::string& frontend_prefix) @@ -906,17 +1033,17 @@ RGWHandler_REST* RGWRESTMgr_PubSub_S3::get_handler(struct req_state* const s, return nullptr; } - RGWHandler_REST *handler = nullptr;; + RGWHandler_REST *handler = nullptr; + // GCP-style PubSub API: topics/subscriptions/notification are reserved bucket names if (s->init_state.url_bucket == "topics") { handler = new RGWHandler_REST_PSTopic_S3(auth_registry); - } - - if (s->init_state.url_bucket == "subscriptions") { + } else if (s->init_state.url_bucket == "subscriptions") { handler = new RGWHandler_REST_PSSub_S3(auth_registry); - } - - if (s->init_state.url_bucket == "notifications") { + } else if (s->init_state.url_bucket == "notifications") { + handler = new RGWHandler_REST_PSNotifs_GCP(auth_registry); + } else { + // S3-compliant PubSub API: uses: ?notification handler = new RGWHandler_REST_PSNotifs_S3(auth_registry); }