From: Yuval Lifshitz Date: Mon, 21 Nov 2022 11:52:26 +0000 (+0200) Subject: rgw/pubsub: remove pusub subscription functionality X-Git-Tag: v18.1.0~782^2~2 X-Git-Url: http://git.apps.os.sepia.ceph.com/?a=commitdiff_plain;h=a7963d7f8cd57a1edae9b29c729628c8eb5362e3;p=ceph.git rgw/pubsub: remove pusub subscription functionality Signed-off-by: Yuval Lifshitz --- diff --git a/src/rgw/rgw_admin.cc b/src/rgw/rgw_admin.cc index 796c5a707f677..81cb33c4479b6 100644 --- a/src/rgw/rgw_admin.cc +++ b/src/rgw/rgw_admin.cc @@ -308,13 +308,9 @@ void usage() cout << " mfa remove delete MFA TOTP token\n"; cout << " mfa check check MFA TOTP token\n"; cout << " mfa resync re-sync MFA TOTP token\n"; - cout << " topic list list bucket notifications/pubsub topics\n"; - cout << " topic get get a bucket notifications/pubsub topic\n"; - cout << " topic rm remove a bucket notifications/pubsub topic\n"; - cout << " subscription get get a pubsub subscription definition\n"; - cout << " subscription rm remove a pubsub subscription\n"; - cout << " subscription pull show events in a pubsub subscription\n"; - cout << " subscription ack ack (remove) an events in a pubsub subscription\n"; + cout << " topic list list bucket notifications topics\n"; + cout << " topic get get a bucket notifications topic\n"; + cout << " topic rm remove a bucket notifications topic\n"; cout << " script put upload a lua script to a context\n"; cout << " script get get the lua script of a context\n"; cout << " script rm remove the lua scripts of a context\n"; @@ -480,10 +476,8 @@ void usage() cout << " --totp-seconds the time resolution that is being used for TOTP generation\n"; cout << " --totp-window the number of TOTP tokens that are checked before and after the current token when validating token\n"; cout << " --totp-pin the valid value of a TOTP token at a certain time\n"; - cout << "\nBucket notifications/pubsub options:\n"; - cout << " --topic bucket notifications/pubsub topic name\n"; - cout << " --subscription pubsub subscription name\n"; - cout << " --event-id event id in a pubsub subscription\n"; + cout << "\nBucket notifications options:\n"; + cout << " --topic bucket notifications topic name\n"; cout << "\nScript options:\n"; cout << " --context context in which the script runs. one of: "+LUA_CONTEXT_LIST+"\n"; cout << " --package name of the lua package that should be added/removed to/from the allowlist\n"; @@ -831,13 +825,8 @@ enum class OPT { RESHARD_STALE_INSTANCES_LIST, RESHARD_STALE_INSTANCES_DELETE, PUBSUB_TOPICS_LIST, - // TODO add "subscription list" command PUBSUB_TOPIC_GET, PUBSUB_TOPIC_RM, - PUBSUB_SUB_GET, - PUBSUB_SUB_RM, - PUBSUB_SUB_PULL, - PUBSUB_EVENT_RM, SCRIPT_PUT, SCRIPT_GET, SCRIPT_RM, @@ -1068,10 +1057,6 @@ static SimpleCmd::Commands all_cmds = { { "topic list", OPT::PUBSUB_TOPICS_LIST }, { "topic get", OPT::PUBSUB_TOPIC_GET }, { "topic rm", OPT::PUBSUB_TOPIC_RM }, - { "subscription get", OPT::PUBSUB_SUB_GET }, - { "subscription rm", OPT::PUBSUB_SUB_RM }, - { "subscription pull", OPT::PUBSUB_SUB_PULL }, - { "subscription ack", OPT::PUBSUB_EVENT_RM }, { "script put", OPT::SCRIPT_PUT }, { "script get", OPT::SCRIPT_GET }, { "script rm", OPT::SCRIPT_RM }, @@ -4335,8 +4320,6 @@ int main(int argc, const char **argv) OPT::RESHARD_STATUS, OPT::PUBSUB_TOPICS_LIST, OPT::PUBSUB_TOPIC_GET, - OPT::PUBSUB_SUB_GET, - OPT::PUBSUB_SUB_PULL, OPT::SCRIPT_GET, }; @@ -10497,99 +10480,6 @@ next: } } - if (opt_cmd == OPT::PUBSUB_SUB_GET) { - if (store->get_zone()->get_tier_type() != "pubsub") { - cerr << "ERROR: only pubsub tier type supports this command" << std::endl; - return EINVAL; - } - if (sub_name.empty()) { - cerr << "ERROR: subscription name was not provided (via --subscription)" << std::endl; - return EINVAL; - } - - RGWPubSub ps(static_cast(store), tenant); - - rgw_pubsub_sub_config sub_conf; - - auto sub = ps.get_sub(sub_name); - ret = sub->get_conf(&sub_conf); - if (ret < 0) { - cerr << "ERROR: could not get subscription info: " << cpp_strerror(-ret) << std::endl; - return -ret; - } - encode_json("sub", sub_conf, formatter.get()); - formatter->flush(cout); - } - - if (opt_cmd == OPT::PUBSUB_SUB_RM) { - if (store->get_zone()->get_tier_type() != "pubsub") { - cerr << "ERROR: only pubsub tier type supports this command" << std::endl; - return EINVAL; - } - if (sub_name.empty()) { - cerr << "ERROR: subscription name was not provided (via --subscription)" << std::endl; - return EINVAL; - } - - RGWPubSub ps(static_cast(store), tenant); - - auto sub = ps.get_sub(sub_name); - ret = sub->unsubscribe(dpp(), topic_name, null_yield); - if (ret < 0) { - cerr << "ERROR: could not get subscription info: " << cpp_strerror(-ret) << std::endl; - return -ret; - } - } - - if (opt_cmd == OPT::PUBSUB_SUB_PULL) { - if (store->get_zone()->get_tier_type() != "pubsub") { - cerr << "ERROR: only pubsub tier type supports this command" << std::endl; - return EINVAL; - } - if (sub_name.empty()) { - cerr << "ERROR: subscription name was not provided (via --subscription)" << std::endl; - return EINVAL; - } - - RGWPubSub ps(static_cast(store), tenant); - - if (!max_entries_specified) { - max_entries = RGWPubSub::Sub::DEFAULT_MAX_EVENTS; - } - auto sub = ps.get_sub_with_events(sub_name); - ret = sub->list_events(dpp(), marker, max_entries); - if (ret < 0) { - cerr << "ERROR: could not list events: " << cpp_strerror(-ret) << std::endl; - return -ret; - } - encode_json("result", *sub, formatter.get()); - formatter->flush(cout); - } - - if (opt_cmd == OPT::PUBSUB_EVENT_RM) { - if (store->get_zone()->get_tier_type() != "pubsub") { - cerr << "ERROR: only pubsub tier type supports this command" << std::endl; - return EINVAL; - } - if (sub_name.empty()) { - cerr << "ERROR: subscription name was not provided (via --subscription)" << std::endl; - return EINVAL; - } - if (event_id.empty()) { - cerr << "ERROR: event id was not provided (via --event-id)" << std::endl; - return EINVAL; - } - - RGWPubSub ps(static_cast(store), tenant); - - auto sub = ps.get_sub_with_events(sub_name); - ret = sub->remove_event(dpp(), event_id); - if (ret < 0) { - cerr << "ERROR: could not remove event: " << cpp_strerror(-ret) << std::endl; - return -ret; - } - } - if (opt_cmd == OPT::SCRIPT_PUT) { if (!str_script_ctx) { cerr << "ERROR: context was not provided (via --context)" << std::endl; diff --git a/src/rgw/rgw_pubsub.cc b/src/rgw/rgw_pubsub.cc index 012648ac3274a..bf0e2d46b4c24 100644 --- a/src/rgw/rgw_pubsub.cc +++ b/src/rgw/rgw_pubsub.cc @@ -296,15 +296,6 @@ void rgw_pubsub_s3_event::dump(Formatter *f) const { encode_json("opaqueData", opaque_data, f); } -void rgw_pubsub_event::dump(Formatter *f) const -{ - encode_json("id", id, f); - encode_json("event", event_name, f); - utime_t ut(timestamp); - encode_json("timestamp", ut, f); - encode_json("info", info, f); -} - void rgw_pubsub_topic::dump(Formatter *f) const { encode_json("user", user, f); @@ -718,247 +709,6 @@ int RGWPubSub::remove_topic(const DoutPrefixProvider *dpp, const string& name, o return 0; } -int RGWPubSub::Sub::read_sub(rgw_pubsub_sub_config *result, RGWObjVersionTracker *objv_tracker) -{ - int ret = ps->read(sub_meta_obj, result, objv_tracker); - if (ret < 0 && ret != -ENOENT) { - ldout(ps->store->ctx(), 1) << "ERROR: failed to read subscription info: ret=" << ret << dendl; - return ret; - } - return 0; -} - -int RGWPubSub::Sub::write_sub(const DoutPrefixProvider *dpp, - const rgw_pubsub_sub_config& sub_conf, - RGWObjVersionTracker *objv_tracker, - optional_yield y) -{ - int ret = ps->write(dpp, sub_meta_obj, sub_conf, objv_tracker, y); - if (ret < 0) { - ldpp_dout(dpp, 1) << "ERROR: failed to write subscription info: ret=" << ret << dendl; - return ret; - } - - return 0; -} - -int RGWPubSub::Sub::remove_sub(const DoutPrefixProvider *dpp, RGWObjVersionTracker *objv_tracker, - optional_yield y) -{ - int ret = ps->remove(dpp, sub_meta_obj, objv_tracker, y); - if (ret < 0) { - ldpp_dout(dpp, 1) << "ERROR: failed to remove subscription info: ret=" << ret << dendl; - return ret; - } - - return 0; -} - -int RGWPubSub::Sub::get_conf(rgw_pubsub_sub_config *result) -{ - return read_sub(result, nullptr); -} - -int RGWPubSub::Sub::subscribe(const DoutPrefixProvider *dpp, const string& topic, const rgw_pubsub_sub_dest& dest, optional_yield y, const std::string& s3_id) -{ - RGWObjVersionTracker objv_tracker; - rgw_pubsub_topics topics; - - int ret = ps->read_topics(&topics, &objv_tracker); - if (ret < 0) { - ldpp_dout(dpp, 1) << "ERROR: failed to read topics info: ret=" << ret << dendl; - return ret != -ENOENT ? ret : -EINVAL; - } - - auto iter = topics.topics.find(topic); - if (iter == topics.topics.end()) { - ldpp_dout(dpp, 1) << "ERROR: cannot add subscription to topic: topic not found" << dendl; - return -EINVAL; - } - - auto& t = iter->second; - - rgw_pubsub_sub_config sub_conf; - - sub_conf.user = rgw_user("", ps->tenant); - sub_conf.name = sub; - sub_conf.topic = topic; - sub_conf.dest = dest; - sub_conf.s3_id = s3_id; - - t.subs.insert(sub); - - ret = ps->write_topics(dpp, topics, &objv_tracker, y); - if (ret < 0) { - ldpp_dout(dpp, 1) << "ERROR: failed to write topics info: ret=" << ret << dendl; - return ret; - } - - ret = write_sub(dpp, sub_conf, nullptr, y); - if (ret < 0) { - ldpp_dout(dpp, 1) << "ERROR: failed to write subscription info: ret=" << ret << dendl; - return ret; - } - return 0; -} - -int RGWPubSub::Sub::unsubscribe(const DoutPrefixProvider *dpp, const string& _topic, optional_yield y) -{ - string topic = _topic; - RGWObjVersionTracker sobjv_tracker; - - if (topic.empty()) { - rgw_pubsub_sub_config sub_conf; - int ret = read_sub(&sub_conf, &sobjv_tracker); - if (ret < 0) { - ldpp_dout(dpp, 1) << "ERROR: failed to read subscription info: ret=" << ret << dendl; - return ret; - } - topic = sub_conf.topic; - } - - RGWObjVersionTracker objv_tracker; - rgw_pubsub_topics topics; - - int ret = ps->read_topics(&topics, &objv_tracker); - if (ret < 0) { - // not an error - could be that topic was already deleted - ldpp_dout(dpp, 10) << "WARNING: failed to read topics info: ret=" << ret << dendl; - } else { - auto iter = topics.topics.find(topic); - if (iter != topics.topics.end()) { - auto& t = iter->second; - - t.subs.erase(sub); - - ret = ps->write_topics(dpp, topics, &objv_tracker, y); - if (ret < 0) { - ldpp_dout(dpp, 1) << "ERROR: failed to write topics info: ret=" << ret << dendl; - return ret; - } - } - } - - ret = remove_sub(dpp, &sobjv_tracker, y); - if (ret < 0) { - ldpp_dout(dpp, 1) << "ERROR: failed to delete subscription info: ret=" << ret << dendl; - return ret; - } - return 0; -} - -template -void RGWPubSub::SubWithEvents::list_events_result::dump(Formatter *f) const -{ - encode_json("next_marker", next_marker, f); - encode_json("is_truncated", is_truncated, f); - - Formatter::ArraySection s(*f, EventType::json_type_plural); - for (auto& event : events) { - encode_json("", event, f); - } -} - -template -int RGWPubSub::SubWithEvents::list_events(const DoutPrefixProvider *dpp, const string& marker, int max_events) -{ - RGWRados *store = ps->store->getRados(); - rgw_pubsub_sub_config sub_conf; - int ret = get_conf(&sub_conf); - if (ret < 0) { - ldpp_dout(dpp, 1) << "ERROR: failed to read sub config: ret=" << ret << dendl; - return ret; - } - - RGWBucketInfo bucket_info; - string tenant; - ret = store->get_bucket_info(&store->svc, tenant, sub_conf.dest.bucket_name, bucket_info, nullptr, null_yield, nullptr); - if (ret == -ENOENT) { - list.is_truncated = false; - return 0; - } - if (ret < 0) { - ldpp_dout(dpp, 1) << "ERROR: failed to read bucket info for events bucket: bucket=" << sub_conf.dest.bucket_name << " ret=" << ret << dendl; - return ret; - } - - RGWRados::Bucket target(store, bucket_info); - RGWRados::Bucket::List list_op(&target); - - list_op.params.prefix = sub_conf.dest.oid_prefix; - list_op.params.marker = marker; - - std::vector objs; - - ret = list_op.list_objects(dpp, max_events, &objs, nullptr, &list.is_truncated, null_yield); - if (ret < 0) { - ldpp_dout(dpp, 1) << "ERROR: failed to list bucket: bucket=" << sub_conf.dest.bucket_name << " ret=" << ret << dendl; - return ret; - } - if (list.is_truncated) { - list.next_marker = list_op.get_next_marker().name; - } - - for (auto& obj : objs) { - bufferlist bl64; - bufferlist bl; - bl64.append(obj.meta.user_data); - try { - bl.decode_base64(bl64); - } catch (buffer::error& err) { - ldpp_dout(dpp, 1) << "ERROR: failed to event (not a valid base64)" << dendl; - continue; - } - EventType event; - - auto iter = bl.cbegin(); - try { - decode(event, iter); - } catch (buffer::error& err) { - ldpp_dout(dpp, 1) << "ERROR: failed to decode event" << dendl; - continue; - }; - - list.events.push_back(event); - } - return 0; -} - -template -int RGWPubSub::SubWithEvents::remove_event(const DoutPrefixProvider *dpp, const string& event_id) -{ - rgw::sal::RadosStore* store = ps->store; - rgw_pubsub_sub_config sub_conf; - int ret = get_conf(&sub_conf); - if (ret < 0) { - ldpp_dout(dpp, 1) << "ERROR: failed to read sub config: ret=" << ret << dendl; - return ret; - } - - string tenant; - std::unique_ptr bucket; - ret = store->get_bucket(dpp, nullptr, tenant, sub_conf.dest.bucket_name, - &bucket, null_yield); - if (ret < 0) { - ldpp_dout(dpp, 1) << "ERROR: failed to read bucket info for events bucket: bucket=" << sub_conf.dest.bucket_name << " ret=" << ret << dendl; - return ret; - } - std::unique_ptr obj = bucket->get_object( - sub_conf.dest.oid_prefix + event_id); - obj->set_atomic(); - - std::unique_ptr del_op = obj->get_delete_op(); - - del_op->params.bucket_owner = bucket->get_info().owner; - del_op->params.versioning_status = bucket->get_info().versioning_status(); - - ret = del_op->delete_obj(dpp, null_yield); - if (ret < 0) { - ldpp_dout(dpp, 1) << "ERROR: failed to remove event (obj=" << obj << "): ret=" << ret << dendl; - } - return 0; -} - void RGWPubSub::get_meta_obj(rgw_raw_obj *obj) const { *obj = rgw_raw_obj(store->svc()->zone->get_zone_params().log_pool, meta_oid()); } @@ -971,13 +721,3 @@ void RGWPubSub::get_sub_meta_obj(const string& name, rgw_raw_obj *obj) const { *obj = rgw_raw_obj(store->svc()->zone->get_zone_params().log_pool, sub_meta_oid(name)); } -template -void RGWPubSub::SubWithEvents::dump(Formatter* f) const { - list.dump(f); -} - -// explicit instantiation for the only two possible types -// no need to move implementation to header -template class RGWPubSub::SubWithEvents; -template class RGWPubSub::SubWithEvents; - diff --git a/src/rgw/rgw_pubsub.h b/src/rgw/rgw_pubsub.h index 6335a5cb67aef..c3b0b4d07750d 100644 --- a/src/rgw/rgw_pubsub.h +++ b/src/rgw/rgw_pubsub.h @@ -332,38 +332,6 @@ struct rgw_pubsub_s3_event { }; WRITE_CLASS_ENCODER(rgw_pubsub_s3_event) -struct rgw_pubsub_event { - constexpr static const char* const json_type_plural = "events"; - std::string id; - std::string event_name; - std::string source; - ceph::real_time timestamp; - JSONFormattable info; - - void encode(bufferlist& bl) const { - ENCODE_START(1, 1, bl); - encode(id, bl); - encode(event_name, bl); - encode(source, bl); - encode(timestamp, bl); - encode(info, bl); - ENCODE_FINISH(bl); - } - - void decode(bufferlist::const_iterator& bl) { - DECODE_START(1, bl); - decode(id, bl); - decode(event_name, bl); - decode(source, bl); - decode(timestamp, bl); - decode(info, bl); - DECODE_FINISH(bl); - } - - void dump(Formatter *f) const; -}; -WRITE_CLASS_ENCODER(rgw_pubsub_event) - // setting a unique ID for an event based on object hash and timestamp void set_event_id(std::string& id, const std::string& hash, const utime_t& ts); @@ -675,81 +643,12 @@ public: int remove_notifications(const DoutPrefixProvider *dpp, optional_yield y); }; - // base class for subscription - class Sub { - friend class RGWPubSub; - protected: - RGWPubSub* const ps; - const std::string sub; - rgw_raw_obj sub_meta_obj; - - int read_sub(rgw_pubsub_sub_config *result, RGWObjVersionTracker* objv_tracker); - int write_sub(const DoutPrefixProvider *dpp, const rgw_pubsub_sub_config& sub_conf, - RGWObjVersionTracker* objv_tracker, optional_yield y); - int remove_sub(const DoutPrefixProvider *dpp, RGWObjVersionTracker* objv_tracker, optional_yield y); - public: - Sub(RGWPubSub *_ps, const std::string& _sub) : ps(_ps), sub(_sub) { - ps->get_sub_meta_obj(sub, &sub_meta_obj); - } - - virtual ~Sub() = default; - - int subscribe(const DoutPrefixProvider *dpp, const std::string& topic_name, const rgw_pubsub_sub_dest& dest, optional_yield y, - const std::string& s3_id=""); - int unsubscribe(const DoutPrefixProvider *dpp, const std::string& topic_name, optional_yield y); - int get_conf(rgw_pubsub_sub_config* result); - - static const int DEFAULT_MAX_EVENTS = 100; - // followint virtual methods should only be called in derived - virtual int list_events(const DoutPrefixProvider *dpp, const std::string& marker, int max_events) {ceph_assert(false);} - virtual int remove_event(const DoutPrefixProvider *dpp, const std::string& event_id) {ceph_assert(false);} - virtual void dump(Formatter* f) const {ceph_assert(false);} - }; - - // subscription with templated list of events to support both S3 compliant and Ceph specific events - template - class SubWithEvents : public Sub { - private: - struct list_events_result { - std::string next_marker; - bool is_truncated{false}; - void dump(Formatter *f) const; - std::vector events; - } list; - - public: - SubWithEvents(RGWPubSub *_ps, const std::string& _sub) : Sub(_ps, _sub) {} - - virtual ~SubWithEvents() = default; - - int list_events(const DoutPrefixProvider *dpp, const std::string& marker, int max_events) override; - int remove_event(const DoutPrefixProvider *dpp, const std::string& event_id) override; - void dump(Formatter* f) const override; - }; - using BucketRef = std::shared_ptr; - using SubRef = std::shared_ptr; BucketRef get_bucket(const rgw_bucket& bucket) { return std::make_shared(this, bucket); } - SubRef get_sub(const std::string& sub) { - return std::make_shared(this, sub); - } - - SubRef get_sub_with_events(const std::string& sub) { - auto tmpsub = Sub(this, sub); - rgw_pubsub_sub_config conf; - if (tmpsub.get_conf(&conf) < 0) { - return nullptr; - } - if (conf.s3_id.empty()) { - return std::make_shared>(this, sub); - } - return std::make_shared>(this, sub); - } - void get_meta_obj(rgw_raw_obj *obj) const; void get_bucket_meta_obj(const rgw_bucket& bucket, rgw_raw_obj *obj) const; diff --git a/src/rgw/rgw_pubsub_push.cc b/src/rgw/rgw_pubsub_push.cc index 79fa736e92504..2f734c21df838 100644 --- a/src/rgw/rgw_pubsub_push.cc +++ b/src/rgw/rgw_pubsub_push.cc @@ -63,53 +63,6 @@ private: static const ack_level_t ACK_LEVEL_ANY = 0; static const ack_level_t ACK_LEVEL_NON_ERROR = 1; - // PostCR implements async execution of RGWPostHTTPData via coroutine - class PostCR : public RGWPostHTTPData, public RGWSimpleCoroutine { - private: - RGWDataSyncEnv* const sync_env; - bufferlist read_bl; - const ack_level_t ack_level; - - public: - PostCR(const std::string& _post_data, - RGWDataSyncEnv* _sync_env, - const std::string& endpoint, - ack_level_t _ack_level, - bool verify_ssl) : - RGWPostHTTPData(_sync_env->cct, "POST", endpoint, &read_bl, verify_ssl), - RGWSimpleCoroutine(_sync_env->cct), - sync_env(_sync_env), - ack_level (_ack_level) { - // ctor also set the data to send - set_post_data(_post_data); - set_send_length(_post_data.length()); - } - - // send message to endpoint - int send_request(const DoutPrefixProvider *dpp) override { - init_new_io(this); - const auto rc = sync_env->http_manager->add_request(this); - if (rc < 0) { - return rc; - } - if (perfcounter) perfcounter->inc(l_rgw_pubsub_push_pending); - return 0; - } - - // wait for reply - int request_complete() override { - if (perfcounter) perfcounter->dec(l_rgw_pubsub_push_pending); - if (ack_level == ACK_LEVEL_ANY) { - return 0; - } else if (ack_level == ACK_LEVEL_NON_ERROR) { - // TODO check result code to be non-error - } else { - // TODO: check that result code == ack_level - } - return -1; - } - }; - public: RGWPubSubHTTPEndpoint(const std::string& _endpoint, const RGWHTTPArgs& args) : endpoint(_endpoint), verify_ssl(get_bool(args, "verify-ssl", true)), cloudevents(get_bool(args, "cloudevents", false)) @@ -129,14 +82,6 @@ public: } } - RGWCoroutine* send_to_completion_async(const rgw_pubsub_event& event, RGWDataSyncEnv* env) override { - return new PostCR(json_format_pubsub_event(event), env, endpoint, ack_level, verify_ssl); - } - - RGWCoroutine* send_to_completion_async(const rgw_pubsub_s3_event& event, RGWDataSyncEnv* env) override { - return new PostCR(json_format_pubsub_event(event), env, endpoint, ack_level, verify_ssl); - } - int send_to_completion_async(CephContext* cct, const rgw_pubsub_s3_event& event, optional_yield y) override { bufferlist read_bl; RGWPostHTTPData request(cct, "POST", endpoint, &read_bl, verify_ssl); @@ -226,95 +171,6 @@ private: } throw configuration_error("AMQP: invalid amqp-ack-level: " + str_ack_level); } - - // NoAckPublishCR implements async amqp publishing via coroutine - // This coroutine ends when it send the message and does not wait for an ack - class NoAckPublishCR : public RGWCoroutine { - private: - const std::string topic; - amqp::connection_ptr_t conn; - const std::string message; - - public: - NoAckPublishCR(CephContext* cct, - const std::string& _topic, - amqp::connection_ptr_t& _conn, - const std::string& _message) : - RGWCoroutine(cct), - topic(_topic), conn(_conn), message(_message) {} - - // send message to endpoint, without waiting for reply - int operate(const DoutPrefixProvider *dpp) override { - reenter(this) { - const auto rc = amqp::publish(conn, topic, message); - if (rc < 0) { - return set_cr_error(rc); - } - return set_cr_done(); - } - return 0; - } - }; - - // AckPublishCR implements async amqp publishing via coroutine - // This coroutine ends when an ack is received from the borker - // note that it does not wait for an ack fron the end client - class AckPublishCR : public RGWCoroutine, public RGWIOProvider { - private: - const std::string topic; - amqp::connection_ptr_t conn; - const std::string message; - - public: - AckPublishCR(CephContext* cct, - const std::string& _topic, - amqp::connection_ptr_t& _conn, - const std::string& _message) : - RGWCoroutine(cct), - topic(_topic), conn(_conn), message(_message) {} - - // send message to endpoint, waiting for reply - int operate(const DoutPrefixProvider *dpp) override { - reenter(this) { - yield { - init_new_io(this); - const auto rc = amqp::publish_with_confirm(conn, - topic, - message, - std::bind(&AckPublishCR::request_complete, this, std::placeholders::_1)); - if (rc < 0) { - // failed to publish, does not wait for reply - return set_cr_error(rc); - } - // mark as blocked on the amqp answer - if (perfcounter) perfcounter->inc(l_rgw_pubsub_push_pending); - io_block(); - return 0; - } - return set_cr_done(); - } - return 0; - } - - // callback invoked from the amqp manager thread when ack/nack is received - void request_complete(int status) { - ceph_assert(!is_done()); - if (status != 0) { - // server replied with a nack - set_cr_error(status); - } - io_complete(); - if (perfcounter) perfcounter->dec(l_rgw_pubsub_push_pending); - } - - // TODO: why are these mandatory in RGWIOProvider? - void set_io_user_info(void *_user_info) override { - } - - void *get_io_user_info() override { - return nullptr; - } - }; public: RGWPubSubAMQPEndpoint(const std::string& _endpoint, @@ -332,24 +188,6 @@ public: } } - RGWCoroutine* send_to_completion_async(const rgw_pubsub_event& event, RGWDataSyncEnv* env) override { - ceph_assert(conn); - if (ack_level == ack_level_t::None) { - return new NoAckPublishCR(cct, topic, conn, json_format_pubsub_event(event)); - } else { - return new AckPublishCR(cct, topic, conn, json_format_pubsub_event(event)); - } - } - - RGWCoroutine* send_to_completion_async(const rgw_pubsub_s3_event& event, RGWDataSyncEnv* env) override { - ceph_assert(conn); - if (ack_level == ack_level_t::None) { - return new NoAckPublishCR(cct, topic, conn, json_format_pubsub_event(event)); - } else { - return new AckPublishCR(cct, topic, conn, json_format_pubsub_event(event)); - } - } - // this allows waiting untill "finish()" is called from a different thread // waiting could be blocking the waiting thread or yielding, depending // with compilation flag support and whether the optional_yield is set @@ -438,7 +276,6 @@ static const std::string AMQP_1_0("1-0"); static const std::string AMQP_SCHEMA("amqp"); #endif // ifdef WITH_RADOSGW_AMQP_ENDPOINT - #ifdef WITH_RADOSGW_KAFKA_ENDPOINT class RGWPubSubKafkaEndpoint : public RGWPubSubEndpoint { private: @@ -465,95 +302,6 @@ private: throw configuration_error("Kafka: invalid kafka-ack-level: " + str_ack_level); } - // NoAckPublishCR implements async kafka publishing via coroutine - // This coroutine ends when it send the message and does not wait for an ack - class NoAckPublishCR : public RGWCoroutine { - private: - const std::string topic; - kafka::connection_ptr_t conn; - const std::string message; - - public: - NoAckPublishCR(CephContext* cct, - const std::string& _topic, - kafka::connection_ptr_t& _conn, - const std::string& _message) : - RGWCoroutine(cct), - topic(_topic), conn(_conn), message(_message) {} - - // send message to endpoint, without waiting for reply - int operate(const DoutPrefixProvider *dpp) override { - reenter(this) { - const auto rc = kafka::publish(conn, topic, message); - if (rc < 0) { - return set_cr_error(rc); - } - return set_cr_done(); - } - return 0; - } - }; - - // AckPublishCR implements async kafka publishing via coroutine - // This coroutine ends when an ack is received from the borker - // note that it does not wait for an ack fron the end client - class AckPublishCR : public RGWCoroutine, public RGWIOProvider { - private: - const std::string topic; - kafka::connection_ptr_t conn; - const std::string message; - - public: - AckPublishCR(CephContext* cct, - const std::string& _topic, - kafka::connection_ptr_t& _conn, - const std::string& _message) : - RGWCoroutine(cct), - topic(_topic), conn(_conn), message(_message) {} - - // send message to endpoint, waiting for reply - int operate(const DoutPrefixProvider *dpp) override { - reenter(this) { - yield { - init_new_io(this); - const auto rc = kafka::publish_with_confirm(conn, - topic, - message, - std::bind(&AckPublishCR::request_complete, this, std::placeholders::_1)); - if (rc < 0) { - // failed to publish, does not wait for reply - return set_cr_error(rc); - } - // mark as blocked on the kafka answer - if (perfcounter) perfcounter->inc(l_rgw_pubsub_push_pending); - io_block(); - return 0; - } - return set_cr_done(); - } - return 0; - } - - // callback invoked from the kafka manager thread when ack/nack is received - void request_complete(int status) { - ceph_assert(!is_done()); - if (status != 0) { - // server replied with a nack - set_cr_error(status); - } - io_complete(); - if (perfcounter) perfcounter->dec(l_rgw_pubsub_push_pending); - } - - // TODO: why are these mandatory in RGWIOProvider? - void set_io_user_info(void *_user_info) override { - } - - void *get_io_user_info() override { - return nullptr; - } - }; - public: RGWPubSubKafkaEndpoint(const std::string& _endpoint, const std::string& _topic, @@ -568,24 +316,6 @@ public: } } - RGWCoroutine* send_to_completion_async(const rgw_pubsub_event& event, RGWDataSyncEnv* env) override { - ceph_assert(conn); - if (ack_level == ack_level_t::None) { - return new NoAckPublishCR(cct, topic, conn, json_format_pubsub_event(event)); - } else { - return new AckPublishCR(cct, topic, conn, json_format_pubsub_event(event)); - } - } - - RGWCoroutine* send_to_completion_async(const rgw_pubsub_s3_event& event, RGWDataSyncEnv* env) override { - ceph_assert(conn); - if (ack_level == ack_level_t::None) { - return new NoAckPublishCR(cct, topic, conn, json_format_pubsub_event(event)); - } else { - return new AckPublishCR(cct, topic, conn, json_format_pubsub_event(event)); - } - } - // this allows waiting untill "finish()" is called from a different thread // waiting could be blocking the waiting thread or yielding, depending // with compilation flag support and whether the optional_yield is set diff --git a/src/rgw/rgw_pubsub_push.h b/src/rgw/rgw_pubsub_push.h index 6cb3db74b332a..17905937c035a 100644 --- a/src/rgw/rgw_pubsub_push.h +++ b/src/rgw/rgw_pubsub_push.h @@ -11,9 +11,7 @@ // TODO the env should be used as a template parameter to differentiate the source that triggers the pushes class RGWDataSyncEnv; -class RGWCoroutine; class RGWHTTPArgs; -struct rgw_pubsub_event; struct rgw_pubsub_s3_event; // endpoint base class all endpoint - types should derive from it @@ -31,14 +29,6 @@ public: // may throw a configuration_error if creation fails static Ptr create(const std::string& endpoint, const std::string& topic, const RGWHTTPArgs& args, CephContext *cct=nullptr); - // this method is used in order to send notification (Ceph specific) and wait for completion - // in async manner via a coroutine when invoked in the data sync environment - virtual RGWCoroutine* send_to_completion_async(const rgw_pubsub_event& event, RGWDataSyncEnv* env) = 0; - - // this method is used in order to send notification (S3 compliant) and wait for completion - // in async manner via a coroutine when invoked in the data sync environment - virtual RGWCoroutine* send_to_completion_async(const rgw_pubsub_s3_event& event, RGWDataSyncEnv* env) = 0; - // this method is used in order to send notification (S3 compliant) and wait for completion // in async manner via a coroutine when invoked in the frontend environment virtual int send_to_completion_async(CephContext* cct, const rgw_pubsub_s3_event& event, optional_yield y) = 0; diff --git a/src/rgw/store/rados/rgw_rest_pubsub.cc b/src/rgw/store/rados/rgw_rest_pubsub.cc index 239f509821052..5b216330bd415 100644 --- a/src/rgw/store/rados/rgw_rest_pubsub.cc +++ b/src/rgw/store/rados/rgw_rest_pubsub.cc @@ -64,9 +64,6 @@ public: } // dest object only stores endpoint info - // bucket to store events/records will be set only when subscription is created - dest.bucket_name = ""; - dest.oid_prefix = ""; dest.arn_topic = topic_name; // the topic ARN will be sent in the reply const rgw::ARN arn(rgw::Partition::aws, rgw::Service::sns, @@ -437,28 +434,7 @@ int remove_notification_by_topic(const DoutPrefixProvider *dpp, const std::strin int delete_all_notifications(const DoutPrefixProvider *dpp, const rgw_pubsub_bucket_topics& bucket_topics, const RGWPubSub::BucketRef& b, optional_yield y, RGWPubSub& ps) { // delete all notifications of on a bucket for (const auto& topic : bucket_topics.topics) { - // remove the auto generated subscription of the topic (if exist) - rgw_pubsub_topic_subs topic_subs; - int op_ret = ps.get_topic(topic.first, &topic_subs); - for (const auto& topic_sub_name : topic_subs.subs) { - auto sub = ps.get_sub(topic_sub_name); - rgw_pubsub_sub_config sub_conf; - op_ret = sub->get_conf(&sub_conf); - if (op_ret < 0) { - ldpp_dout(dpp, 1) << "failed to get subscription '" << topic_sub_name << "' info, ret=" << op_ret << dendl; - return op_ret; - } - if (!sub_conf.s3_id.empty()) { - // S3 notification, has autogenerated subscription - const auto& sub_topic_name = sub_conf.topic; - op_ret = sub->unsubscribe(dpp, sub_topic_name, y); - if (op_ret < 0) { - ldpp_dout(dpp, 1) << "failed to remove auto-generated subscription '" << topic_sub_name << "', ret=" << op_ret << dendl; - return op_ret; - } - } - } - op_ret = remove_notification_by_topic(dpp, topic.first, b, y, ps); + const auto op_ret = remove_notification_by_topic(dpp, topic.first, b, y, ps); if (op_ret < 0) { return op_ret; } @@ -542,10 +518,6 @@ void RGWPSCreateNotif_ObjStore_S3::execute(optional_yield y) { auto b = ps->get_bucket(bucket_info.bucket); ceph_assert(b); - std::string data_bucket_prefix = ""; - std::string data_oid_prefix = ""; - bool push_only = true; - if(configurations.list.empty()) { // get all topics on a bucket rgw_pubsub_bucket_topics bucket_topics; @@ -622,24 +594,6 @@ void RGWPSCreateNotif_ObjStore_S3::execute(optional_yield y) { return; } ldpp_dout(this, 20) << "successfully auto-generated notification for unique topic '" << unique_topic_name << "'" << dendl; - - if (!push_only) { - // generate the subscription with destination information from the original topic - rgw_pubsub_sub_dest dest = topic_info.dest; - dest.bucket_name = data_bucket_prefix + s->owner.get_id().to_str() + "-" + unique_topic_name; - dest.oid_prefix = data_oid_prefix + notif_name + "/"; - auto sub = ps->get_sub(notif_name); - op_ret = sub->subscribe(this, unique_topic_name, dest, y, notif_name); - if (op_ret < 0) { - ldpp_dout(this, 1) << "failed to auto-generate subscription '" << notif_name << "', ret=" << op_ret << dendl; - // rollback generated notification (ignore return value) - b->remove_notification(this, unique_topic_name, y); - // rollback generated topic (ignore return value) - ps->remove_topic(this, unique_topic_name, y); - return; - } - ldpp_dout(this, 20) << "successfully auto-generated subscription '" << notif_name << "'" << dendl; - } } } @@ -690,14 +644,7 @@ void RGWPSDeleteNotif_ObjStore_S3::execute(optional_yield y) { // delete a specific notification const auto unique_topic = find_unique_topic(bucket_topics, notif_name); if (unique_topic) { - // remove the auto generated subscription according to notification name (if exist) const auto unique_topic_name = unique_topic->get().topic.name; - auto sub = ps->get_sub(notif_name); - op_ret = sub->unsubscribe(this, unique_topic_name, y); - if (op_ret < 0 && op_ret != -ENOENT) { - ldpp_dout(this, 1) << "failed to remove auto-generated subscription '" << notif_name << "', ret=" << op_ret << dendl; - return; - } op_ret = remove_notification_by_topic(this, unique_topic_name, b, y, *ps); return; } diff --git a/src/rgw/store/rados/rgw_rest_pubsub_common.cc b/src/rgw/store/rados/rgw_rest_pubsub_common.cc index 15455ec3ab268..6410e5b341eb7 100644 --- a/src/rgw/store/rados/rgw_rest_pubsub_common.cc +++ b/src/rgw/store/rados/rgw_rest_pubsub_common.cc @@ -41,10 +41,6 @@ bool validate_and_update_endpoint_secret(rgw_pubsub_sub_dest& dest, CephContext return true; } -bool subscription_has_endpoint_secret(const rgw_pubsub_sub_config& sub) { - return sub.dest.stored_secret; -} - bool topic_has_endpoint_secret(const rgw_pubsub_topic_subs& topic) { return topic.topic.dest.stored_secret; } @@ -121,92 +117,6 @@ void RGWPSDeleteTopicOp::execute(optional_yield y) { ldpp_dout(this, 1) << "successfully removed topic '" << topic_name << "'" << dendl; } -void RGWPSCreateSubOp::execute(optional_yield y) { - op_ret = get_params(); - if (op_ret < 0) { - return; - } - ps.emplace(static_cast(store), s->owner.get_id().tenant); - auto sub = ps->get_sub(sub_name); - op_ret = sub->subscribe(this, topic_name, dest, y); - if (op_ret < 0) { - ldpp_dout(this, 1) << "failed to create subscription '" << sub_name << "', ret=" << op_ret << dendl; - return; - } - ldpp_dout(this, 20) << "successfully created subscription '" << sub_name << "'" << dendl; -} - -void RGWPSGetSubOp::execute(optional_yield y) { - op_ret = get_params(); - if (op_ret < 0) { - return; - } - ps.emplace(static_cast(store), s->owner.get_id().tenant); - auto sub = ps->get_sub(sub_name); - op_ret = sub->get_conf(&result); - if (subscription_has_endpoint_secret(result) && !verify_transport_security(s->cct, *(s->info.env))) { - ldpp_dout(this, 1) << "subscription '" << sub_name << "' contain secret and cannot be sent over insecure transport" << dendl; - op_ret = -EPERM; - return; - } - if (op_ret < 0) { - ldpp_dout(this, 1) << "failed to get subscription '" << sub_name << "', ret=" << op_ret << dendl; - return; - } - ldpp_dout(this, 20) << "successfully got subscription '" << sub_name << "'" << dendl; -} - -void RGWPSDeleteSubOp::execute(optional_yield y) { - op_ret = get_params(); - if (op_ret < 0) { - return; - } - ps.emplace(static_cast(store), s->owner.get_id().tenant); - auto sub = ps->get_sub(sub_name); - op_ret = sub->unsubscribe(this, topic_name, y); - if (op_ret < 0) { - ldpp_dout(this, 1) << "failed to remove subscription '" << sub_name << "', ret=" << op_ret << dendl; - return; - } - ldpp_dout(this, 20) << "successfully removed subscription '" << sub_name << "'" << dendl; -} - -void RGWPSAckSubEventOp::execute(optional_yield y) { - op_ret = get_params(); - if (op_ret < 0) { - return; - } - ps.emplace(static_cast(store), s->owner.get_id().tenant); - auto sub = ps->get_sub_with_events(sub_name); - op_ret = sub->remove_event(s, event_id); - if (op_ret < 0) { - ldpp_dout(this, 1) << "failed to ack event on subscription '" << sub_name << "', ret=" << op_ret << dendl; - return; - } - ldpp_dout(this, 20) << "successfully acked event on subscription '" << sub_name << "'" << dendl; -} - -void RGWPSPullSubEventsOp::execute(optional_yield y) { - op_ret = get_params(); - if (op_ret < 0) { - return; - } - ps.emplace(static_cast(store), s->owner.get_id().tenant); - sub = ps->get_sub_with_events(sub_name); - if (!sub) { - op_ret = -ENOENT; - ldpp_dout(this, 1) << "failed to get subscription '" << sub_name << "' for events, ret=" << op_ret << dendl; - return; - } - op_ret = sub->list_events(s, marker, max_entries); - if (op_ret < 0) { - ldpp_dout(this, 1) << "failed to get events from subscription '" << sub_name << "', ret=" << op_ret << dendl; - return; - } - ldpp_dout(this, 20) << "successfully got events from subscription '" << sub_name << "'" << dendl; -} - - int RGWPSCreateNotifOp::verify_permission(optional_yield y) { int ret = get_params(); if (ret < 0) { diff --git a/src/rgw/store/rados/rgw_rest_pubsub_common.h b/src/rgw/store/rados/rgw_rest_pubsub_common.h index a702cf3420414..fe3126c8377c8 100644 --- a/src/rgw/store/rados/rgw_rest_pubsub_common.h +++ b/src/rgw/store/rados/rgw_rest_pubsub_common.h @@ -101,130 +101,6 @@ public: uint32_t op_mask() override { return RGW_OP_TYPE_DELETE; } }; -// create a subscription -class RGWPSCreateSubOp : public RGWDefaultResponseOp { -protected: - std::string sub_name; - std::string topic_name; - std::optional ps; - rgw_pubsub_sub_dest dest; - - virtual int get_params() = 0; - -public: - int verify_permission(optional_yield) override { - return 0; - } - void pre_exec() override { - rgw_bucket_object_pre_exec(s); - } - void execute(optional_yield y) override; - - const char* name() const override { return "pubsub_subscription_create"; } - RGWOpType get_type() override { return RGW_OP_PUBSUB_SUB_CREATE; } - uint32_t op_mask() override { return RGW_OP_TYPE_WRITE; } -}; - -// get subscription information (including push-endpoint if exist) -class RGWPSGetSubOp : public RGWOp { -protected: - std::string sub_name; - std::optional ps; - rgw_pubsub_sub_config result; - - virtual int get_params() = 0; - -public: - int verify_permission(optional_yield) override { - return 0; - } - void pre_exec() override { - rgw_bucket_object_pre_exec(s); - } - void execute(optional_yield y) override; - - const char* name() const override { return "pubsub_subscription_get"; } - RGWOpType get_type() override { return RGW_OP_PUBSUB_SUB_GET; } - uint32_t op_mask() override { return RGW_OP_TYPE_READ; } -}; - -// delete subscription -class RGWPSDeleteSubOp : public RGWDefaultResponseOp { -protected: - std::string sub_name; - std::string topic_name; - std::optional ps; - - virtual int get_params() = 0; - -public: - int verify_permission(optional_yield) override { - return 0; - } - void pre_exec() override { - rgw_bucket_object_pre_exec(s); - } - void execute(optional_yield y) override; - - const char* name() const override { return "pubsub_subscription_delete"; } - RGWOpType get_type() override { return RGW_OP_PUBSUB_SUB_DELETE; } - uint32_t op_mask() override { return RGW_OP_TYPE_DELETE; } -}; - -// acking of an event -class RGWPSAckSubEventOp : public RGWDefaultResponseOp { -protected: - std::string sub_name; - std::string event_id; - std::optional ps; - - virtual int get_params() = 0; - -public: - RGWPSAckSubEventOp() {} - - int verify_permission(optional_yield) override { - return 0; - } - void pre_exec() override { - rgw_bucket_object_pre_exec(s); - } - void execute(optional_yield y) override; - - const char* name() const override { return "pubsub_subscription_ack"; } - RGWOpType get_type() override { return RGW_OP_PUBSUB_SUB_ACK; } - uint32_t op_mask() override { return RGW_OP_TYPE_WRITE; } -}; - -// fetching events from a subscription -// dpending on whether the subscription was created via s3 compliant API or not -// the matching events will be returned -class RGWPSPullSubEventsOp : public RGWOp { -protected: - int max_entries{0}; - std::string sub_name; - std::string marker; - std::optional ps; - RGWPubSub::SubRef sub; - - virtual int get_params() = 0; - -public: - RGWPSPullSubEventsOp() {} - - int verify_permission(optional_yield) override { - return 0; - } - void pre_exec() override { - rgw_bucket_object_pre_exec(s); - } - void execute(optional_yield y) override; - - const char* name() const override { return "pubsub_subscription_pull"; } - RGWOpType get_type() override { return RGW_OP_PUBSUB_SUB_PULL; } - uint32_t op_mask() override { return RGW_OP_TYPE_READ; } -}; - // notification creation class RGWPSCreateNotifOp : public RGWDefaultResponseOp { protected: diff --git a/src/test/cli/radosgw-admin/help.t b/src/test/cli/radosgw-admin/help.t index bdff2abc4c2e8..34953ad205456 100644 --- a/src/test/cli/radosgw-admin/help.t +++ b/src/test/cli/radosgw-admin/help.t @@ -178,13 +178,9 @@ mfa remove delete MFA TOTP token mfa check check MFA TOTP token mfa resync re-sync MFA TOTP token - topic list list bucket notifications/pubsub topics - topic get get a bucket notifications/pubsub topic - topic rm remove a bucket notifications/pubsub topic - subscription get get a pubsub subscription definition - subscription rm remove a pubsub subscription - subscription pull show events in a pubsub subscription - subscription ack ack (remove) an events in a pubsub subscription + topic list list bucket notifications topics + topic get get a bucket notifications topic + topic rm remove a bucket notifications topic script put upload a lua script to a context script get get the lua script of a context script rm remove the lua scripts of a context @@ -357,10 +353,8 @@ --totp-window the number of TOTP tokens that are checked before and after the current token when validating token --totp-pin the valid value of a TOTP token at a certain time - Bucket notifications/pubsub options: - --topic bucket notifications/pubsub topic name - --subscription pubsub subscription name - --event-id event id in a pubsub subscription + Bucket notifications options: + --topic bucket notifications topic name Script options: --context context in which the script runs. one of: prerequest, postrequest, background, getdata, putdata