From b85f9f9037ca21e38f0f87d23780388f8d12b46a Mon Sep 17 00:00:00 2001 From: Yehuda Sadeh Date: Wed, 4 Jul 2018 21:09:14 -0700 Subject: [PATCH] rgw: pubsub: REST api Signed-off-by: Yehuda Sadeh --- src/rgw/CMakeLists.txt | 1 + src/rgw/rgw_common.h | 10 + src/rgw/rgw_pubsub.cc | 2 +- src/rgw/rgw_pubsub.h | 2 +- src/rgw/rgw_sync_module_pubsub.cc | 8 +- src/rgw/rgw_sync_module_pubsub_rest.cc | 668 +++++++++++++++++++++++++ src/rgw/rgw_sync_module_pubsub_rest.h | 16 + 7 files changed, 699 insertions(+), 8 deletions(-) create mode 100644 src/rgw/rgw_sync_module_pubsub_rest.cc create mode 100644 src/rgw/rgw_sync_module_pubsub_rest.h diff --git a/src/rgw/CMakeLists.txt b/src/rgw/CMakeLists.txt index 30a6af0e315..8e931cd0282 100644 --- a/src/rgw/CMakeLists.txt +++ b/src/rgw/CMakeLists.txt @@ -86,6 +86,7 @@ set(librgw_common_srcs rgw_sync_module_es_rest.cc rgw_sync_module_log.cc rgw_sync_module_pubsub.cc + rgw_sync_module_pubsub_rest.cc rgw_sync_log_trim.cc rgw_sync_trace.cc rgw_period_history.cc diff --git a/src/rgw/rgw_common.h b/src/rgw/rgw_common.h index da9f8283f88..aaaaaeda899 100644 --- a/src/rgw/rgw_common.h +++ b/src/rgw/rgw_common.h @@ -530,6 +530,16 @@ enum RGWOpType { /* sts specific*/ RGW_STS_ASSUME_ROLE, RGW_STS_GET_SESSION_TOKEN, + /* pubsub */ + RGW_OP_PUBSUB_TOPIC_CREATE, + RGW_OP_PUBSUB_TOPICS_LIST, + RGW_OP_PUBSUB_TOPIC_GET, + RGW_OP_PUBSUB_TOPIC_DELETE, + RGW_OP_PUBSUB_SUB_CREATE, + RGW_OP_PUBSUB_SUB_GET, + RGW_OP_PUBSUB_SUB_DELETE, + RGW_OP_PUBSUB_SUB_PULL, + RGW_OP_PUBSUB_SUB_ACK, }; class RGWAccessControlPolicy; diff --git a/src/rgw/rgw_pubsub.cc b/src/rgw/rgw_pubsub.cc index 5c926ec9db8..449b33c867e 100644 --- a/src/rgw/rgw_pubsub.cc +++ b/src/rgw/rgw_pubsub.cc @@ -311,7 +311,7 @@ int RGWUserPubSub::add_sub(const string& name, const string& topic, const rgw_pu return 0; } -int RGWUserPubSub::remove_sub(const string& name, const string& _topic, const rgw_pubsub_user_sub_dest& dest) +int RGWUserPubSub::remove_sub(const string& name, const string& _topic) { string topic = _topic; diff --git a/src/rgw/rgw_pubsub.h b/src/rgw/rgw_pubsub.h index c6b8f350cb1..071432ed8e1 100644 --- a/src/rgw/rgw_pubsub.h +++ b/src/rgw/rgw_pubsub.h @@ -237,7 +237,7 @@ public: int remove_topic(const string& name); int get_sub(const string& name, rgw_pubsub_user_sub_config *result); int add_sub(const string& name, const string& topic, const rgw_pubsub_user_sub_dest& dest); - int remove_sub(const string& name, const string& topic, const rgw_pubsub_user_sub_dest& dest); + int remove_sub(const string& name, const string& topic); struct list_events_result { string next_marker; diff --git a/src/rgw/rgw_sync_module_pubsub.cc b/src/rgw/rgw_sync_module_pubsub.cc index 4de58cab62d..c434bd37c7a 100644 --- a/src/rgw/rgw_sync_module_pubsub.cc +++ b/src/rgw/rgw_sync_module_pubsub.cc @@ -3,6 +3,7 @@ #include "rgw_sync_module.h" #include "rgw_data_sync.h" #include "rgw_sync_module_pubsub.h" +#include "rgw_sync_module_pubsub_rest.h" #include "rgw_rest_conn.h" #include "rgw_cr_rados.h" #include "rgw_cr_tools.h" @@ -1161,15 +1162,10 @@ RGWDataSyncModule *RGWPSSyncModuleInstance::get_data_handler() } RGWRESTMgr *RGWPSSyncModuleInstance::get_rest_filter(int dialect, RGWRESTMgr *orig) { -#warning REST filter implementation missing -#if 0 if (dialect != RGW_REST_S3) { return orig; } - delete orig; - return new RGWRESTMgr_MDSearch_S3(); -#endif - return orig; + return new RGWRESTMgr_PubSub_S3(orig); } int RGWPSSyncModule::create_instance(CephContext *cct, const JSONFormattable& config, RGWSyncModuleInstanceRef *instance) { diff --git a/src/rgw/rgw_sync_module_pubsub_rest.cc b/src/rgw/rgw_sync_module_pubsub_rest.cc new file mode 100644 index 00000000000..cb0d4d0c9a9 --- /dev/null +++ b/src/rgw/rgw_sync_module_pubsub_rest.cc @@ -0,0 +1,668 @@ +#include "rgw_sync_module_pubsub.h" +#include "rgw_sync_module_pubsub_rest.h" +#include "rgw_pubsub.h" +#include "rgw_op.h" +#include "rgw_rest.h" +#include "rgw_rest_s3.h" + +#define dout_context g_ceph_context +#define dout_subsys ceph_subsys_rgw + + +class RGWPSCreateTopicOp : public RGWOp { +protected: + std::unique_ptr ups; + string topic_name; + string bucket_name; + RGWBucketInfo bucket_info; + +public: + RGWPSCreateTopicOp() {} + + int verify_permission() override { + int ret = get_params(); + if (ret < 0) { + return ret; + } + + RGWObjectCtx& obj_ctx = *static_cast(s->obj_ctx); + + ret = store->get_bucket_info(obj_ctx, s->owner.get_id().tenant, bucket_name, + bucket_info, nullptr, nullptr); + if (ret < 0) { + return ret; + } + + if (bucket_info.owner != s->owner.get_id()) { + ldout(s->cct, 20) << "user doesn't own bucket, cannot create topic" << dendl; + return -EPERM; + } + + return 0; + } + void pre_exec() override { + rgw_bucket_object_pre_exec(s); + } + void execute() override; + + const char* name() const override { return "pubsub_topic_create"; } + virtual RGWOpType get_type() override { return RGW_OP_PUBSUB_TOPIC_CREATE; } + virtual uint32_t op_mask() override { return RGW_OP_TYPE_WRITE; } + virtual int get_params() = 0; +}; + +void RGWPSCreateTopicOp::execute() +{ + ups = make_unique(store, s->owner.get_id()); + op_ret = ups->create_topic(topic_name, bucket_info.bucket); + if (op_ret < 0) { + ldout(s->cct, 20) << "failed to create topic, ret=" << op_ret << dendl; + return; + } +} + +class RGWPSCreateTopic_ObjStore_S3 : public RGWPSCreateTopicOp { +public: + explicit RGWPSCreateTopic_ObjStore_S3() {} + + int get_params() override { + topic_name = s->object.name; + + bool exists; + bucket_name = s->info.args.get("bucket", &exists); + if (!exists) { + ldout(s->cct, 20) << "ERROR: missing required param 'bucket' for request" << dendl; + return -EINVAL; + } + + return 0; + } +}; + +class RGWPSListTopicsOp : public RGWOp { +protected: + string bucket_name; + RGWBucketInfo bucket_info; + std::unique_ptr ups; + rgw_pubsub_user_topics result; + + +public: + RGWPSListTopicsOp() {} + + int verify_permission() override { + int ret = get_params(); + if (ret < 0) { + return ret; + } + + if (bucket_name.empty()) { + return 0; + } + + RGWObjectCtx& obj_ctx = *static_cast(s->obj_ctx); + + ret = store->get_bucket_info(obj_ctx, s->owner.get_id().tenant, bucket_name, + bucket_info, nullptr, nullptr); + if (ret < 0) { + return ret; + } + + if (bucket_info.owner != s->owner.get_id()) { + ldout(s->cct, 20) << "user doesn't own bucket, cannot create topic" << dendl; + return -EPERM; + } + + return 0; + } + void pre_exec() override { + rgw_bucket_object_pre_exec(s); + } + void execute() override; + + const char* name() const override { return "pubsub_topics_list"; } + virtual RGWOpType get_type() override { return RGW_OP_PUBSUB_TOPICS_LIST; } + virtual uint32_t op_mask() override { return RGW_OP_TYPE_READ; } + virtual int get_params() = 0; +}; + +void RGWPSListTopicsOp::execute() +{ + ups = make_unique(store, s->owner.get_id()); + if (bucket_name.empty()) { + op_ret = ups->get_topics(&result); + } else { + op_ret = ups->get_bucket_topics(bucket_info.bucket, &result); + } + if (op_ret < 0) { + ldout(s->cct, 20) << "failed to get topics, ret=" << op_ret << dendl; + return; + } + +} + +class RGWPSListTopics_ObjStore_S3 : public RGWPSListTopicsOp { +public: + explicit RGWPSListTopics_ObjStore_S3() {} + + int get_params() override { + bucket_name = s->info.args.get("bucket"); + return 0; + } + + void send_response() override { + if (op_ret) { + set_req_state_err(s, op_ret); + } + dump_errno(s); + end_header(s, this, "application/json"); + + if (op_ret < 0) { + return; + } + + encode_json("result", result, s->formatter); + rgw_flush_formatter_and_reset(s, s->formatter); + } +}; + +class RGWPSGetTopicOp : public RGWOp { +protected: + string topic_name; + std::unique_ptr ups; + rgw_pubsub_user_topic_info result; + +public: + RGWPSGetTopicOp() {} + + int verify_permission() override { + return 0; + } + void pre_exec() override { + rgw_bucket_object_pre_exec(s); + } + void execute() override; + + const char* name() const override { return "pubsub_topic_get"; } + virtual RGWOpType get_type() override { return RGW_OP_PUBSUB_TOPIC_GET; } + virtual uint32_t op_mask() override { return RGW_OP_TYPE_READ; } + virtual int get_params() = 0; +}; + +void RGWPSGetTopicOp::execute() +{ + op_ret = get_params(); + if (op_ret < 0) { + return; + } + ups = make_unique(store, s->owner.get_id()); + op_ret = ups->get_topic(topic_name, &result); + if (op_ret < 0) { + ldout(s->cct, 20) << "failed to get topic, ret=" << op_ret << dendl; + return; + } +} + +class RGWPSGetTopic_ObjStore_S3 : public RGWPSGetTopicOp { +public: + explicit RGWPSGetTopic_ObjStore_S3() {} + + int get_params() override { + topic_name = s->object.name; + return 0; + } + + void send_response() override { + if (op_ret) { + set_req_state_err(s, op_ret); + } + dump_errno(s); + end_header(s, this, "application/json"); + + if (op_ret < 0) { + return; + } + + encode_json("result", result, s->formatter); + rgw_flush_formatter_and_reset(s, s->formatter); + } +}; + +class RGWPSDeleteTopicOp : public RGWOp { +protected: + string topic_name; + std::unique_ptr ups; + +public: + RGWPSDeleteTopicOp() {} + + int verify_permission() override { + return 0; + } + void pre_exec() override { + rgw_bucket_object_pre_exec(s); + } + void execute() override; + + const char* name() const override { return "pubsub_topic_delete"; } + virtual RGWOpType get_type() override { return RGW_OP_PUBSUB_TOPIC_DELETE; } + virtual uint32_t op_mask() override { return RGW_OP_TYPE_DELETE; } + virtual int get_params() = 0; +}; + +void RGWPSDeleteTopicOp::execute() +{ + op_ret = get_params(); + if (op_ret < 0) { + return; + } + + ups = make_unique(store, s->owner.get_id()); + op_ret = ups->remove_topic(topic_name); + if (op_ret < 0) { + ldout(s->cct, 20) << "failed to create topic, ret=" << op_ret << dendl; + return; + } +} + +class RGWPSDeleteTopic_ObjStore_S3 : public RGWPSDeleteTopicOp { +public: + explicit RGWPSDeleteTopic_ObjStore_S3() {} + + int get_params() override { + topic_name = s->object.name; + return 0; + } +}; + +class RGWHandler_REST_PSTopic_S3 : public RGWHandler_REST_S3 { +protected: + RGWOp *op_get() override { + if (s->init_state.url_bucket.empty()) { + return nullptr; + } + if (s->object.empty()) { + return new RGWPSListTopics_ObjStore_S3(); + } + return new RGWPSGetTopic_ObjStore_S3(); + } + RGWOp *op_put() override { + if (!s->object.empty()) { + return new RGWPSCreateTopic_ObjStore_S3(); + } + return nullptr; + } + RGWOp *op_delete() override { + if (!s->object.empty()) { + return new RGWPSDeleteTopic_ObjStore_S3(); + } + return nullptr; + } +public: + explicit RGWHandler_REST_PSTopic_S3(const rgw::auth::StrategyRegistry& auth_registry) : RGWHandler_REST_S3(auth_registry) {} + virtual ~RGWHandler_REST_PSTopic_S3() {} +}; + + +class RGWPSCreateSubOp : public RGWOp { +protected: + string sub_name; + string topic_name; + std::unique_ptr ups; + rgw_pubsub_user_sub_dest dest; + +public: + RGWPSCreateSubOp() {} + + int verify_permission() override { + return 0; + } + void pre_exec() override { + rgw_bucket_object_pre_exec(s); + } + void execute() override; + + const char* name() const override { return "pubsub_subscription_create"; } + virtual RGWOpType get_type() override { return RGW_OP_PUBSUB_SUB_CREATE; } + virtual uint32_t op_mask() override { return RGW_OP_TYPE_WRITE; } + virtual int get_params() = 0; +}; + +void RGWPSCreateSubOp::execute() +{ + op_ret = get_params(); + if (op_ret < 0) { + return; + } + ups = make_unique(store, s->owner.get_id()); + op_ret = ups->add_sub(sub_name, topic_name, dest); + if (op_ret < 0) { + ldout(s->cct, 20) << "failed to create subscription, ret=" << op_ret << dendl; + return; + } +} + +class RGWPSCreateSub_ObjStore_S3 : public RGWPSCreateSubOp { +public: + explicit RGWPSCreateSub_ObjStore_S3() {} + + int get_params() override { + sub_name = s->object.name; + + bool exists; + + topic_name = s->info.args.get("topic", &exists); + if (!exists) { + ldout(s->cct, 20) << "ERROR: missing required param 'topic' for request" << dendl; + return -EINVAL; + } + + dest.push_endpoint = s->info.args.get("push-endpoint"); + + return 0; + } +}; + +class RGWPSGetSubOp : public RGWOp { +protected: + string sub_name; + std::unique_ptr ups; + rgw_pubsub_user_sub_config result; + +public: + RGWPSGetSubOp() {} + + int verify_permission() override { + return 0; + } + void pre_exec() override { + rgw_bucket_object_pre_exec(s); + } + void execute() override; + + const char* name() const override { return "pubsub_subscription_get"; } + virtual RGWOpType get_type() override { return RGW_OP_PUBSUB_SUB_GET; } + virtual uint32_t op_mask() override { return RGW_OP_TYPE_READ; } + virtual int get_params() = 0; +}; + +void RGWPSGetSubOp::execute() +{ + op_ret = get_params(); + if (op_ret < 0) { + return; + } + ups = make_unique(store, s->owner.get_id()); + op_ret = ups->get_sub(sub_name, &result); + if (op_ret < 0) { + ldout(s->cct, 20) << "failed to get subscription, ret=" << op_ret << dendl; + return; + } +} + +class RGWPSGetSub_ObjStore_S3 : public RGWPSGetSubOp { +public: + explicit RGWPSGetSub_ObjStore_S3() {} + + int get_params() override { + sub_name = s->object.name; + return 0; + } + + void send_response() override { + if (op_ret) { + set_req_state_err(s, op_ret); + } + dump_errno(s); + end_header(s, this, "application/json"); + + if (op_ret < 0) { + return; + } + + { + Formatter::ObjectSection section(*s->formatter, "result"); + encode_json("topic", result.topic, s->formatter); + encode_json("push_endpoint", result.dest.push_endpoint, s->formatter); + } + rgw_flush_formatter_and_reset(s, s->formatter); + } +}; + +class RGWPSDeleteSubOp : public RGWOp { +protected: + string sub_name; + string topic_name; + std::unique_ptr ups; + +public: + RGWPSDeleteSubOp() {} + + int verify_permission() override { + return 0; + } + void pre_exec() override { + rgw_bucket_object_pre_exec(s); + } + void execute() override; + + const char* name() const override { return "pubsub_subscription_delete"; } + virtual RGWOpType get_type() override { return RGW_OP_PUBSUB_SUB_DELETE; } + virtual uint32_t op_mask() override { return RGW_OP_TYPE_DELETE; } + virtual int get_params() = 0; +}; + +void RGWPSDeleteSubOp::execute() +{ + op_ret = get_params(); + if (op_ret < 0) { + return; + } + ups = make_unique(store, s->owner.get_id()); + op_ret = ups->remove_sub(sub_name, topic_name); + if (op_ret < 0) { + ldout(s->cct, 20) << "failed to remove subscription, ret=" << op_ret << dendl; + return; + } +} + +class RGWPSDeleteSub_ObjStore_S3 : public RGWPSDeleteSubOp { +public: + explicit RGWPSDeleteSub_ObjStore_S3() {} + + int get_params() override { + sub_name = s->object.name; + topic_name = s->info.args.get("topic"); + return 0; + } +}; + +class RGWPSAckSubEventOp : public RGWOp { +protected: + string sub_name; + string event_id; + std::unique_ptr ups; + +public: + RGWPSAckSubEventOp() {} + + int verify_permission() override { + return 0; + } + void pre_exec() override { + rgw_bucket_object_pre_exec(s); + } + void execute() override; + + const char* name() const override { return "pubsub_subscription_ack"; } + virtual RGWOpType get_type() override { return RGW_OP_PUBSUB_SUB_ACK; } + virtual uint32_t op_mask() override { return RGW_OP_TYPE_WRITE; } + virtual int get_params() = 0; +}; + +void RGWPSAckSubEventOp::execute() +{ + op_ret = get_params(); + if (op_ret < 0) { + return; + } + ups = make_unique(store, s->owner.get_id()); + op_ret = ups->remove_event(sub_name, event_id); + if (op_ret < 0) { + ldout(s->cct, 20) << "failed to remove event, ret=" << op_ret << dendl; + return; + } +} + +class RGWPSAckSubEvent_ObjStore_S3 : public RGWPSAckSubEventOp { +public: + explicit RGWPSAckSubEvent_ObjStore_S3() {} + + int get_params() override { + sub_name = s->object.name; + + bool exists; + + event_id = s->info.args.get("event-id", &exists); + if (!exists) { + ldout(s->cct, 20) << "ERROR: missing required param 'event-id' for request" << dendl; + return -EINVAL; + } + return 0; + } +}; + +class RGWPSPullSubEventsOp : public RGWOp { +protected: + int max_entries{0}; + string sub_name; + string marker; + std::unique_ptr ups; + RGWUserPubSub::list_events_result result; + +public: + RGWPSPullSubEventsOp() {} + + int verify_permission() override { + return 0; + } + void pre_exec() override { + rgw_bucket_object_pre_exec(s); + } + void execute() override; + + const char* name() const override { return "pubsub_subscription_pull"; } + virtual RGWOpType get_type() override { return RGW_OP_PUBSUB_SUB_PULL; } + virtual uint32_t op_mask() override { return RGW_OP_TYPE_READ; } + virtual int get_params() = 0; +}; + +void RGWPSPullSubEventsOp::execute() +{ + op_ret = get_params(); + if (op_ret < 0) { + return; + } + ups = make_unique(store, s->owner.get_id()); + op_ret = ups->list_events(sub_name, marker, max_entries, &result); + if (op_ret < 0) { + ldout(s->cct, 20) << "failed to get subscription, ret=" << op_ret << dendl; + return; + } +} + +class RGWPSPullSubEvents_ObjStore_S3 : public RGWPSPullSubEventsOp { +public: + explicit RGWPSPullSubEvents_ObjStore_S3() {} + + int get_params() override { + sub_name = s->object.name; + marker = s->info.args.get("marker"); +#define DEFAULT_MAX_ENTRIES 100 + int ret = s->info.args.get_int("max-entries", &max_entries, DEFAULT_MAX_ENTRIES); + if (ret < 0) { + ldout(s->cct, 20) << "failed to parse 'max-entries' param" << dendl; + return -EINVAL; + } + return 0; + } + + void send_response() override { + if (op_ret) { + set_req_state_err(s, op_ret); + } + dump_errno(s); + end_header(s, this, "application/json"); + + if (op_ret < 0) { + return; + } + + encode_json("result", result, s->formatter); + rgw_flush_formatter_and_reset(s, s->formatter); + } +}; + +class RGWHandler_REST_PSSub_S3 : public RGWHandler_REST_S3 { +protected: + bool supports_quota() override { + return false; + } + RGWOp *op_get() override { + if (s->object.empty()) { + return nullptr; + } + if (s->info.args.exists("events")) { + return new RGWPSPullSubEvents_ObjStore_S3(); + } + return new RGWPSGetSub_ObjStore_S3(); + } + RGWOp *op_put() override { + if (!s->object.empty()) { + return new RGWPSCreateSub_ObjStore_S3(); + } + return nullptr; + } + RGWOp *op_delete() override { + if (!s->object.empty()) { + return new RGWPSDeleteSub_ObjStore_S3(); + } + return nullptr; + } + RGWOp *op_post() override { + if (s->info.args.exists("ack")) { + return new RGWPSAckSubEvent_ObjStore_S3(); + } + return nullptr; + } +public: + explicit RGWHandler_REST_PSSub_S3(const rgw::auth::StrategyRegistry& auth_registry) : RGWHandler_REST_S3(auth_registry) {} + virtual ~RGWHandler_REST_PSSub_S3() {} +}; + + +RGWHandler_REST* RGWRESTMgr_PubSub_S3::get_handler(struct req_state* const s, + const rgw::auth::StrategyRegistry& auth_registry, + const std::string& frontend_prefix) +{ + int ret = + RGWHandler_REST_S3::init_from_header(s, + RGW_FORMAT_JSON, true); + if (ret < 0) { + return nullptr; + } + + RGWHandler_REST *handler = nullptr;; + + if (s->init_state.url_bucket == "topics") { + handler = new RGWHandler_REST_PSTopic_S3(auth_registry); + } + + if (s->init_state.url_bucket == "subscriptions") { + handler = new RGWHandler_REST_PSSub_S3(auth_registry); + } + + ldout(s->cct, 20) << __func__ << " handler=" << (handler ? typeid(*handler).name() : "") << dendl; + return handler; +} + diff --git a/src/rgw/rgw_sync_module_pubsub_rest.h b/src/rgw/rgw_sync_module_pubsub_rest.h new file mode 100644 index 00000000000..120cb3d3cd9 --- /dev/null +++ b/src/rgw/rgw_sync_module_pubsub_rest.h @@ -0,0 +1,16 @@ +#ifndef CEPH_RGW_SYNC_MODULE_PUBSUB_REST_H +#define CEPH_RGW_SYNC_MODULE_PUBSUB_REST_H + +#include "rgw_rest.h" + +class RGWRESTMgr_PubSub_S3 : public RGWRESTMgr { + RGWRESTMgr *next; +public: + explicit RGWRESTMgr_PubSub_S3(RGWRESTMgr *_next) : next(_next) {} + + RGWHandler_REST *get_handler(struct req_state* s, + const rgw::auth::StrategyRegistry& auth_registry, + const std::string& frontend_prefix) override; +}; + +#endif -- 2.39.5