From a3b050e24d77685ce210a086cd7f39229672d42e Mon Sep 17 00:00:00 2001 From: Yuval Lifshitz Date: Fri, 24 Feb 2023 15:52:12 +0200 Subject: [PATCH] rgw/notifications: add bucket notification configuration to zipper bucket notification configuration is currently stored in system objects. adding API to be able to store it in any sotre. note that other aspects of sending bucket notifications via the cls_2pc_queu are still only supported in RADOS. also making sure that all APIs use yielding correctly. Signed-off-by: Yuval Lifshitz --- src/rgw/CMakeLists.txt | 4 +- src/rgw/driver/rados/rgw_notify.cc | 4 +- src/rgw/driver/rados/rgw_sal_rados.cc | 100 +++++++++++++++- src/rgw/driver/rados/rgw_sal_rados.h | 14 +++ src/rgw/rgw_admin.cc | 14 +-- src/rgw/{driver/rados => }/rgw_pubsub.cc | 107 ++++++------------ src/rgw/{driver/rados => }/rgw_pubsub.h | 99 ++++------------ src/rgw/{driver/rados => }/rgw_rest_pubsub.cc | 61 +++++----- src/rgw/rgw_sal.h | 22 +++- src/rgw/rgw_sal_filter.h | 26 +++++ src/rgw/rgw_sal_store.h | 14 +++ 11 files changed, 268 insertions(+), 197 deletions(-) rename src/rgw/{driver/rados => }/rgw_pubsub.cc (86%) rename src/rgw/{driver/rados => }/rgw_pubsub.h (86%) rename src/rgw/{driver/rados => }/rgw_rest_pubsub.cc (94%) diff --git a/src/rgw/CMakeLists.txt b/src/rgw/CMakeLists.txt index b02cdd0e6c1..9d155a1728f 100644 --- a/src/rgw/CMakeLists.txt +++ b/src/rgw/CMakeLists.txt @@ -94,6 +94,7 @@ set(librgw_common_srcs rgw_notify_event_type.cc rgw_period_history.cc rgw_period_puller.cc + rgw_pubsub.cc rgw_coroutine.cc rgw_cr_rest.cc rgw_op.cc @@ -110,6 +111,7 @@ set(librgw_common_srcs rgw_rest_ratelimit.cc rgw_rest_role.cc rgw_rest_s3.cc + rgw_rest_pubsub.cc rgw_s3select.cc rgw_role.cc rgw_sal.cc @@ -165,14 +167,12 @@ set(librgw_common_srcs driver/rados/rgw_object_expirer_core.cc driver/rados/rgw_otp.cc driver/rados/rgw_period.cc - driver/rados/rgw_pubsub.cc driver/rados/rgw_pubsub_push.cc driver/rados/rgw_putobj_processor.cc driver/rados/rgw_rados.cc driver/rados/rgw_reshard.cc driver/rados/rgw_rest_bucket.cc driver/rados/rgw_rest_log.cc - driver/rados/rgw_rest_pubsub.cc driver/rados/rgw_rest_realm.cc driver/rados/rgw_rest_user.cc driver/rados/rgw_sal_rados.cc diff --git a/src/rgw/driver/rados/rgw_notify.cc b/src/rgw/driver/rados/rgw_notify.cc index bfe1f941360..c20768a85ef 100644 --- a/src/rgw/driver/rados/rgw_notify.cc +++ b/src/rgw/driver/rados/rgw_notify.cc @@ -778,9 +778,9 @@ static inline bool notification_match(reservation_t& res, const RGWObjTags* req_tags) { const RGWPubSub ps(res.store, res.user_tenant); - const RGWPubSub::Bucket ps_bucket(ps, res.bucket->get_key()); + const RGWPubSub::Bucket ps_bucket(ps, res.bucket); rgw_pubsub_bucket_topics bucket_topics; - auto rc = ps_bucket.get_topics(&bucket_topics); + auto rc = ps_bucket.get_topics(res.dpp, &bucket_topics, res.yield); if (rc < 0) { // failed to fetch bucket topics return rc; diff --git a/src/rgw/driver/rados/rgw_sal_rados.cc b/src/rgw/driver/rados/rgw_sal_rados.cc index 949125b7df8..0deff6a32be 100644 --- a/src/rgw/driver/rados/rgw_sal_rados.cc +++ b/src/rgw/driver/rados/rgw_sal_rados.cc @@ -74,6 +74,7 @@ namespace rgw::sal { // default number of entries to list with each bucket listing call // (use marker to bridge between calls) static constexpr size_t listing_max_entries = 1000; +static std::string pubsub_oid_prefix = "pubsub."; static int decode_policy(CephContext* cct, bufferlist& bl, @@ -471,7 +472,7 @@ int RadosBucket::remove_bucket(const DoutPrefixProvider* dpp, // if bucket has notification definitions associated with it // they should be removed (note that any pending notifications on the bucket are still going to be sent) const RGWPubSub ps(store, info.owner.tenant); - const RGWPubSub::Bucket ps_bucket(ps, info.bucket); + const RGWPubSub::Bucket ps_bucket(ps, this); const auto ps_ret = ps_bucket.remove_notifications(dpp, y); if (ps_ret < 0 && ps_ret != -ENOENT) { ldpp_dout(dpp, -1) << "ERROR: unable to remove notifications from bucket. ret=" << ps_ret << dendl; @@ -1024,6 +1025,55 @@ int RadosBucket::abort_multiparts(const DoutPrefixProvider* dpp, return 0; } +std::string RadosBucket::topics_oid() const { + return pubsub_oid_prefix + get_tenant() + ".bucket." + get_name() + "/" + get_marker(); +} + +int RadosBucket::read_topics(rgw_pubsub_bucket_topics& notifications, + RGWObjVersionTracker* objv_tracker, optional_yield y, const DoutPrefixProvider *dpp) +{ + bufferlist bl; + const int ret = rgw_get_system_obj(store->svc()->sysobj, + store->svc()->zone->get_zone_params().log_pool, + topics_oid(), + bl, + objv_tracker, + nullptr, y, dpp, nullptr); + if (ret < 0) { + return ret; + } + + auto iter = bl.cbegin(); + try { + decode(notifications, iter); + } catch (buffer::error& err) { + ldpp_dout(dpp, 20) << " failed to decode bucket notifications from oid: " << topics_oid() << ". for bucket: " + << get_name() << ". error: " << err.what() << dendl; + return -EIO; + } + + return 0; +} + +int RadosBucket::write_topics(const rgw_pubsub_bucket_topics& notifications, + RGWObjVersionTracker* objv_tracker, optional_yield y, const DoutPrefixProvider *dpp) { + bufferlist bl; + encode(notifications, bl); + + return rgw_put_system_obj(dpp, store->svc()->sysobj, + store->svc()->zone->get_zone_params().log_pool, + topics_oid(), + bl, false, objv_tracker, real_time(), y); +} + +int RadosBucket::remove_topics(RGWObjVersionTracker* objv_tracker, + optional_yield y, const DoutPrefixProvider *dpp) { + return rgw_delete_system_obj(dpp, store->svc()->sysobj, + store->svc()->zone->get_zone_params().log_pool, + topics_oid(), + objv_tracker, y); +} + std::unique_ptr RadosStore::get_user(const rgw_user &u) { return std::make_unique(this, u); @@ -1283,6 +1333,54 @@ std::unique_ptr RadosStore::get_notification(const DoutPrefixProvi return std::make_unique(dpp, this, obj, src_obj, event_type, _bucket, _user_id, _user_tenant, _req_id, y); } +std::string RadosStore::topics_oid(const std::string& tenant) const { + return pubsub_oid_prefix + tenant; +} + +int RadosStore::read_topics(const std::string& tenant, rgw_pubsub_topics& topics, RGWObjVersionTracker* objv_tracker, + optional_yield y, const DoutPrefixProvider *dpp) { + bufferlist bl; + const int ret = rgw_get_system_obj(svc()->sysobj, + svc()->zone->get_zone_params().log_pool, + topics_oid(tenant), + bl, + objv_tracker, + nullptr, y, dpp, nullptr); + if (ret < 0) { + return ret; + } + + auto iter = bl.cbegin(); + try { + decode(topics, iter); + } catch (buffer::error& err) { + ldpp_dout(dpp, 20) << " failed to decode topics from oid: " << topics_oid(tenant) << + ". error: " << err.what() << dendl; + return -EIO; + } + + return 0; +} + +int RadosStore::write_topics(const std::string& tenant, const rgw_pubsub_topics& topics, RGWObjVersionTracker* objv_tracker, + optional_yield y, const DoutPrefixProvider *dpp) { + bufferlist bl; + encode(topics, bl); + + return rgw_put_system_obj(dpp, svc()->sysobj, + svc()->zone->get_zone_params().log_pool, + topics_oid(tenant), + bl, false, objv_tracker, real_time(), y); +} + +int RadosStore::remove_topics(const std::string& tenant, RGWObjVersionTracker* objv_tracker, + optional_yield y, const DoutPrefixProvider *dpp) { + return rgw_delete_system_obj(dpp, svc()->sysobj, + svc()->zone->get_zone_params().log_pool, + topics_oid(tenant), + objv_tracker, y); +} + int RadosStore::delete_raw_obj(const DoutPrefixProvider *dpp, const rgw_raw_obj& obj) { return rados->delete_raw_obj(dpp, obj); diff --git a/src/rgw/driver/rados/rgw_sal_rados.h b/src/rgw/driver/rados/rgw_sal_rados.h index 15d8022710b..5835bf01464 100644 --- a/src/rgw/driver/rados/rgw_sal_rados.h +++ b/src/rgw/driver/rados/rgw_sal_rados.h @@ -125,6 +125,7 @@ class RadosStore : public StoreDriver { RGWRados* rados; RGWUserCtl* user_ctl; std::unique_ptr zone; + std::string topics_oid(const std::string& tenant) const; public: RadosStore() @@ -168,6 +169,12 @@ class RadosStore : public StoreDriver { const DoutPrefixProvider* dpp, rgw::sal::Object* obj, rgw::sal::Object* src_obj, rgw::notify::EventType event_type, rgw::sal::Bucket* _bucket, std::string& _user_id, std::string& _user_tenant, std::string& _req_id, optional_yield y) override; + int read_topics(const std::string& tenant, rgw_pubsub_topics& topics, RGWObjVersionTracker* objv_tracker, + optional_yield y, const DoutPrefixProvider *dpp) override; + int write_topics(const std::string& tenant, const rgw_pubsub_topics& topics, RGWObjVersionTracker* objv_tracker, + optional_yield y, const DoutPrefixProvider *dpp) override; + int remove_topics(const std::string& tenant, RGWObjVersionTracker* objv_tracker, + optional_yield y, const DoutPrefixProvider *dpp) override; virtual RGWLC* get_rgwlc(void) override { return rados->get_lc(); } virtual RGWCoroutinesManagerRegistry* get_cr_registry() override { return rados->get_cr_registry(); } @@ -503,6 +510,7 @@ class RadosBucket : public StoreBucket { private: RadosStore* store; RGWAccessControlPolicy acls; + std::string topics_oid() const; public: RadosBucket(RadosStore *_st) @@ -608,6 +616,12 @@ class RadosBucket : public StoreBucket { bool *is_truncated) override; virtual int abort_multiparts(const DoutPrefixProvider* dpp, CephContext* cct) override; + int read_topics(rgw_pubsub_bucket_topics& notifications, RGWObjVersionTracker* objv_tracker, + optional_yield y, const DoutPrefixProvider *dpp) override; + int write_topics(const rgw_pubsub_bucket_topics& notifications, RGWObjVersionTracker* objv_tracker, + optional_yield y, const DoutPrefixProvider *dpp) override; + int remove_topics(RGWObjVersionTracker* objv_tracker, + optional_yield y, const DoutPrefixProvider *dpp) override; private: int link(const DoutPrefixProvider* dpp, User* new_user, optional_yield y, bool update_entrypoint = true, RGWObjVersionTracker* objv = nullptr); diff --git a/src/rgw/rgw_admin.cc b/src/rgw/rgw_admin.cc index 8c1b0cf66d6..f29e066e5b5 100644 --- a/src/rgw/rgw_admin.cc +++ b/src/rgw/rgw_admin.cc @@ -10317,7 +10317,7 @@ next: if (opt_cmd == OPT::PUBSUB_TOPICS_LIST) { - RGWPubSub ps(static_cast(driver), tenant); + RGWPubSub ps(driver, tenant); if (!bucket_name.empty()) { rgw_pubsub_bucket_topics result; @@ -10327,8 +10327,8 @@ next: return -ret; } - const RGWPubSub::Bucket b(ps, bucket->get_key()); - ret = b.get_topics(&result); + const RGWPubSub::Bucket b(ps, bucket.get()); + ret = b.get_topics(dpp(), &result, null_yield); if (ret < 0 && ret != -ENOENT) { cerr << "ERROR: could not get topics: " << cpp_strerror(-ret) << std::endl; return -ret; @@ -10336,7 +10336,7 @@ next: encode_json("result", result, formatter.get()); } else { rgw_pubsub_topics result; - int ret = ps.get_topics(&result); + int ret = ps.get_topics(dpp(), &result, null_yield); if (ret < 0 && ret != -ENOENT) { cerr << "ERROR: could not get topics: " << cpp_strerror(-ret) << std::endl; return -ret; @@ -10352,10 +10352,10 @@ next: return EINVAL; } - RGWPubSub ps(static_cast(driver), tenant); + RGWPubSub ps(driver, tenant); rgw_pubsub_topic topic; - ret = ps.get_topic(topic_name, &topic); + ret = ps.get_topic(dpp(), topic_name, &topic, null_yield); if (ret < 0) { cerr << "ERROR: could not get topic: " << cpp_strerror(-ret) << std::endl; return -ret; @@ -10370,7 +10370,7 @@ next: return EINVAL; } - RGWPubSub ps(static_cast(driver), tenant); + RGWPubSub ps(driver, tenant); ret = ps.remove_topic(dpp(), topic_name, null_yield); if (ret < 0) { diff --git a/src/rgw/driver/rados/rgw_pubsub.cc b/src/rgw/rgw_pubsub.cc similarity index 86% rename from src/rgw/driver/rados/rgw_pubsub.cc rename to src/rgw/rgw_pubsub.cc index 4ffee1769d9..538b15e5a33 100644 --- a/src/rgw/driver/rados/rgw_pubsub.cc +++ b/src/rgw/rgw_pubsub.cc @@ -4,7 +4,6 @@ #include "services/svc_zone.h" #include "rgw_b64.h" #include "rgw_sal.h" -#include "rgw_sal_rados.h" #include "rgw_pubsub.h" #include "rgw_tools.h" #include "rgw_xml.h" @@ -404,30 +403,16 @@ std::string rgw_pubsub_dest::to_json_str() const return ss.str(); } -RGWPubSub::RGWPubSub(rgw::sal::RadosStore* _store, const std::string& _tenant) - : store(_store), tenant(_tenant), svc_sysobj(store->svc()->sysobj) -{ - get_meta_obj(&meta_obj); -} +RGWPubSub::RGWPubSub(rgw::sal::Driver* _driver, const std::string& _tenant) + : driver(_driver), tenant(_tenant) +{} -int RGWPubSub::remove(const DoutPrefixProvider *dpp, - const rgw_raw_obj& obj, - RGWObjVersionTracker *objv_tracker, - optional_yield y) const +int RGWPubSub::read_topics(const DoutPrefixProvider *dpp, rgw_pubsub_topics *result, + RGWObjVersionTracker *objv_tracker, optional_yield y) const { - int ret = rgw_delete_system_obj(dpp, store->svc()->sysobj, obj.pool, obj.oid, objv_tracker, y); + const int ret = driver->read_topics(tenant, *result, objv_tracker, y, dpp); if (ret < 0) { - return ret; - } - - return 0; -} - -int RGWPubSub::read_topics(rgw_pubsub_topics *result, RGWObjVersionTracker *objv_tracker) const -{ - int ret = read(meta_obj, result, objv_tracker); - if (ret < 0) { - ldout(store->ctx(), 10) << "WARNING: failed to read topics info: ret=" << ret << dendl; + ldpp_dout(dpp, 10) << "WARNING: failed to read topics info: ret=" << ret << dendl; return ret; } return 0; @@ -436,7 +421,7 @@ int RGWPubSub::read_topics(rgw_pubsub_topics *result, RGWObjVersionTracker *objv int RGWPubSub::write_topics(const DoutPrefixProvider *dpp, const rgw_pubsub_topics& topics, RGWObjVersionTracker *objv_tracker, optional_yield y) const { - int ret = write(dpp, meta_obj, topics, objv_tracker, y); + const int ret = driver->write_topics(tenant, topics, objv_tracker, y, dpp); if (ret < 0 && ret != -ENOENT) { ldpp_dout(dpp, 1) << "ERROR: failed to write topics info: ret=" << ret << dendl; return ret; @@ -444,16 +429,12 @@ int RGWPubSub::write_topics(const DoutPrefixProvider *dpp, const rgw_pubsub_topi return 0; } -int RGWPubSub::get_topics(rgw_pubsub_topics *result) const +int RGWPubSub::Bucket::read_topics(const DoutPrefixProvider *dpp, rgw_pubsub_bucket_topics *result, + RGWObjVersionTracker *objv_tracker, optional_yield y) const { - return read_topics(result, nullptr); -} - -int RGWPubSub::Bucket::read_topics(rgw_pubsub_bucket_topics *result, RGWObjVersionTracker *objv_tracker) const -{ - int ret = ps.read(bucket_meta_obj, result, objv_tracker); + const int ret = bucket->read_topics(*result, objv_tracker, y, dpp); if (ret < 0 && ret != -ENOENT) { - ldout(ps.store->ctx(), 1) << "ERROR: failed to read bucket topics info: ret=" << ret << dendl; + ldpp_dout(dpp, 1) << "ERROR: failed to read bucket topics info: ret=" << ret << dendl; return ret; } return 0; @@ -463,32 +444,27 @@ int RGWPubSub::Bucket::write_topics(const DoutPrefixProvider *dpp, const rgw_pub RGWObjVersionTracker *objv_tracker, optional_yield y) const { - int ret = ps.write(dpp, bucket_meta_obj, topics, objv_tracker, y); + const int ret = bucket->write_topics(topics, objv_tracker, y, dpp); if (ret < 0) { - ldout(ps.store->ctx(), 1) << "ERROR: failed to write bucket topics info: ret=" << ret << dendl; + ldpp_dout(dpp, 1) << "ERROR: failed to write bucket topics info: ret=" << ret << dendl; return ret; } return 0; } -int RGWPubSub::Bucket::get_topics(rgw_pubsub_bucket_topics *result) const -{ - return read_topics(result, nullptr); -} - -int RGWPubSub::get_topic(const std::string& name, rgw_pubsub_topic *result) const +int RGWPubSub::get_topic(const DoutPrefixProvider *dpp, const std::string& name, rgw_pubsub_topic *result, optional_yield y) const { rgw_pubsub_topics topics; - int ret = get_topics(&topics); + const int ret = read_topics(dpp, &topics, nullptr, y); if (ret < 0) { - ldout(store->ctx(), 1) << "ERROR: failed to read topics info: ret=" << ret << dendl; + ldpp_dout(dpp, 1) << "ERROR: failed to read topics info: ret=" << ret << dendl; return ret; } auto iter = topics.topics.find(name); if (iter == topics.topics.end()) { - ldout(store->ctx(), 1) << "ERROR: topic not found" << dendl; + ldpp_dout(dpp, 1) << "ERROR: topic not found" << dendl; return -ENOENT; } @@ -505,7 +481,7 @@ int RGWPubSub::Bucket::create_notification(const DoutPrefixProvider *dpp, const const rgw::notify::EventTypeList& events, OptionalFilter s3_filter, const std::string& notif_name, optional_yield y) const { rgw_pubsub_topic topic_info; - int ret = ps.get_topic(topic_name, &topic_info); + int ret = ps.get_topic(dpp, topic_name, &topic_info, y); if (ret < 0) { ldpp_dout(dpp, 1) << "ERROR: failed to read topic '" << topic_name << "' info: ret=" << ret << dendl; return ret; @@ -515,14 +491,14 @@ int RGWPubSub::Bucket::create_notification(const DoutPrefixProvider *dpp, const RGWObjVersionTracker objv_tracker; rgw_pubsub_bucket_topics bucket_topics; - ret = read_topics(&bucket_topics, &objv_tracker); + ret = read_topics(dpp, &bucket_topics, &objv_tracker, y); if (ret < 0) { ldpp_dout(dpp, 1) << "ERROR: failed to read topics from bucket '" << - bucket.name << "': ret=" << ret << dendl; + bucket->get_name() << "': ret=" << ret << dendl; return ret; } ldpp_dout(dpp, 20) << "successfully read " << bucket_topics.topics.size() << " topics from bucket '" << - bucket.name << "'" << dendl; + bucket->get_name() << "'" << dendl; auto& topic_filter = bucket_topics.topics[topic_name]; topic_filter.topic = topic_info; @@ -534,39 +510,34 @@ int RGWPubSub::Bucket::create_notification(const DoutPrefixProvider *dpp, const ret = write_topics(dpp, bucket_topics, &objv_tracker, y); if (ret < 0) { - ldpp_dout(dpp, 1) << "ERROR: failed to write topics to bucket '" << bucket.name << "': ret=" << ret << dendl; + ldpp_dout(dpp, 1) << "ERROR: failed to write topics to bucket '" << bucket->get_name() << "': ret=" << ret << dendl; return ret; } - ldpp_dout(dpp, 20) << "successfully wrote " << bucket_topics.topics.size() << " topics to bucket '" << bucket.name << "'" << dendl; + ldpp_dout(dpp, 20) << "successfully wrote " << bucket_topics.topics.size() << " topics to bucket '" << bucket->get_name() << "'" << dendl; return 0; } int RGWPubSub::Bucket::remove_notification(const DoutPrefixProvider *dpp, const std::string& topic_name, optional_yield y) const { - rgw_pubsub_topic topic_info; - - int ret = ps.get_topic(topic_name, &topic_info); - if (ret < 0) { - ldpp_dout(dpp, 1) << "ERROR: failed to read topic info: ret=" << ret << dendl; - return ret; - } - RGWObjVersionTracker objv_tracker; rgw_pubsub_bucket_topics bucket_topics; - ret = read_topics(&bucket_topics, &objv_tracker); + auto ret = read_topics(dpp, &bucket_topics, &objv_tracker, y); if (ret < 0) { ldpp_dout(dpp, 1) << "ERROR: failed to read bucket topics info: ret=" << ret << dendl; return ret; } - bucket_topics.topics.erase(topic_name); + if (bucket_topics.topics.erase(topic_name) == 0) { + ldpp_dout(dpp, 1) << "INFO: no need to remove, topic does not exist" << dendl; + return 0; + } if (bucket_topics.topics.empty()) { // no more topics - delete the notification object of the bucket - ret = ps.remove(dpp, bucket_meta_obj, &objv_tracker, y); + ret = bucket->remove_topics(&objv_tracker, y, dpp); if (ret < 0 && ret != -ENOENT) { ldpp_dout(dpp, 1) << "ERROR: failed to remove bucket topics: ret=" << ret << dendl; return ret; @@ -588,9 +559,9 @@ int RGWPubSub::Bucket::remove_notifications(const DoutPrefixProvider *dpp, optio { // get all topics on a bucket rgw_pubsub_bucket_topics bucket_topics; - auto ret = get_topics(&bucket_topics); + auto ret = get_topics(dpp, &bucket_topics, y); if (ret < 0 && ret != -ENOENT) { - ldpp_dout(dpp, 1) << "ERROR: failed to get list of topics from bucket '" << bucket.name << "', ret=" << ret << dendl; + ldpp_dout(dpp, 1) << "ERROR: failed to get list of topics from bucket '" << bucket->get_name() << "', ret=" << ret << dendl; return ret ; } @@ -604,7 +575,7 @@ int RGWPubSub::Bucket::remove_notifications(const DoutPrefixProvider *dpp, optio } // delete the notification object of the bucket - ret = ps.remove(dpp, bucket_meta_obj, nullptr, y); + ret = bucket->remove_topics(nullptr, y, dpp); if (ret < 0 && ret != -ENOENT) { ldpp_dout(dpp, 1) << "ERROR: failed to remove bucket topics: ret=" << ret << dendl; return ret; @@ -622,7 +593,7 @@ int RGWPubSub::create_topic(const DoutPrefixProvider *dpp, const std::string& na RGWObjVersionTracker objv_tracker; rgw_pubsub_topics topics; - int ret = read_topics(&topics, &objv_tracker); + int ret = read_topics(dpp, &topics, &objv_tracker, y); if (ret < 0 && ret != -ENOENT) { // its not an error if not topics exist, we create one ldpp_dout(dpp, 1) << "ERROR: failed to read topics info: ret=" << ret << dendl; @@ -650,7 +621,7 @@ int RGWPubSub::remove_topic(const DoutPrefixProvider *dpp, const std::string& na RGWObjVersionTracker objv_tracker; rgw_pubsub_topics topics; - int ret = read_topics(&topics, &objv_tracker); + int ret = read_topics(dpp, &topics, &objv_tracker, y); if (ret < 0 && ret != -ENOENT) { ldpp_dout(dpp, 1) << "ERROR: failed to read topics info: ret=" << ret << dendl; return ret; @@ -671,11 +642,3 @@ int RGWPubSub::remove_topic(const DoutPrefixProvider *dpp, const std::string& na return 0; } -void RGWPubSub::get_meta_obj(rgw_raw_obj *obj) const { - *obj = rgw_raw_obj(store->svc()->zone->get_zone_params().log_pool, meta_oid()); -} - -void RGWPubSub::get_bucket_meta_obj(const rgw_bucket& bucket, rgw_raw_obj *obj) const { - *obj = rgw_raw_obj(store->svc()->zone->get_zone_params().log_pool, bucket_meta_oid(bucket)); -} - diff --git a/src/rgw/driver/rados/rgw_pubsub.h b/src/rgw/rgw_pubsub.h similarity index 86% rename from src/rgw/driver/rados/rgw_pubsub.h rename to src/rgw/rgw_pubsub.h index 6b5f029d7ef..18828281ce6 100644 --- a/src/rgw/driver/rados/rgw_pubsub.h +++ b/src/rgw/rgw_pubsub.h @@ -3,14 +3,12 @@ #pragma once -#include "services/svc_sys_obj.h" +#include "rgw_sal.h" #include "rgw_tools.h" #include "rgw_zone.h" #include "rgw_notify_event_type.h" #include -namespace rgw::sal { class RadosStore; } - class XMLObj; struct rgw_s3_key_filter { @@ -535,66 +533,46 @@ struct rgw_pubsub_topics { }; WRITE_CLASS_ENCODER(rgw_pubsub_topics) -static std::string pubsub_oid_prefix = "pubsub."; - class RGWPubSub { friend class Bucket; - rgw::sal::RadosStore* const store; + rgw::sal::Driver* const driver; const std::string tenant; - RGWSI_SysObj* const svc_sysobj; - - rgw_raw_obj meta_obj; - - std::string meta_oid() const { - return pubsub_oid_prefix + tenant; - } - - std::string bucket_meta_oid(const rgw_bucket& bucket) const { - return pubsub_oid_prefix + tenant + ".bucket." + bucket.name + "/" + bucket.marker; - } - - template - int read(const rgw_raw_obj& obj, T* data, RGWObjVersionTracker* objv_tracker) const; - - template - int write(const DoutPrefixProvider *dpp, const rgw_raw_obj& obj, const T& info, - RGWObjVersionTracker* obj_tracker, optional_yield y) const; - int remove(const DoutPrefixProvider *dpp, const rgw_raw_obj& obj, RGWObjVersionTracker* objv_tracker, - optional_yield y) const; - - int read_topics(rgw_pubsub_topics *result, RGWObjVersionTracker* objv_tracker) const; + int read_topics(const DoutPrefixProvider *dpp, rgw_pubsub_topics *result, + RGWObjVersionTracker* objv_tracker, optional_yield y) const; int write_topics(const DoutPrefixProvider *dpp, const rgw_pubsub_topics& topics, RGWObjVersionTracker* objv_tracker, optional_yield y) const; public: - RGWPubSub(rgw::sal::RadosStore* _store, const std::string& tenant); + RGWPubSub(rgw::sal::Driver* _driver, const std::string& tenant); class Bucket { friend class RGWPubSub; const RGWPubSub& ps; - const rgw_bucket& bucket; - rgw_raw_obj bucket_meta_obj; + rgw::sal::Bucket* const bucket; // read the list of topics associated with a bucket and populate into result // use version tacker to enforce atomicity between read/write // return 0 on success or if no topic was associated with the bucket, error code otherwise - int read_topics(rgw_pubsub_bucket_topics *result, RGWObjVersionTracker* objv_tracker) const; + int read_topics(const DoutPrefixProvider *dpp, rgw_pubsub_bucket_topics *result, + RGWObjVersionTracker* objv_tracker, optional_yield y) const; // set the list of topics associated with a bucket // use version tacker to enforce atomicity between read/write // return 0 on success, error code otherwise int write_topics(const DoutPrefixProvider *dpp, const rgw_pubsub_bucket_topics& topics, RGWObjVersionTracker* objv_tracker, optional_yield y) const; public: - Bucket(const RGWPubSub& _ps, const rgw_bucket& _bucket) : ps(_ps), bucket(_bucket) { - ps.get_bucket_meta_obj(bucket, &bucket_meta_obj); - } + Bucket(const RGWPubSub& _ps, rgw::sal::Bucket* _bucket) : + ps(_ps), bucket(_bucket) + {} - // read the list of topics associated with a bucket and populate into result + // get the list of topics associated with a bucket and populate into result // return 0 on success or if no topic was associated with the bucket, error code otherwise - int get_topics(rgw_pubsub_bucket_topics *result) const; + int get_topics(const DoutPrefixProvider *dpp, rgw_pubsub_bucket_topics *result, optional_yield y) const { + return read_topics(dpp, result, nullptr, y); + } // adds a topic + filter (event list, and possibly name metadata or tags filters) to a bucket // assigning a notification name is optional (needed for S3 compatible notifications) // if the topic already exist on the bucket, the filter event list may be updated @@ -615,16 +593,15 @@ public: int remove_notifications(const DoutPrefixProvider *dpp, optional_yield y) const; }; - void get_meta_obj(rgw_raw_obj *obj) const; - void get_bucket_meta_obj(const rgw_bucket& bucket, rgw_raw_obj *obj) const; - - // get all topics (per tenant, if used)) and populate them into "result" - // return 0 on success or if no topics exist, error code otherwise - int get_topics(rgw_pubsub_topics *result) const; + // get the list of topics + // return 0 on success or if no topic was associated with the bucket, error code otherwise + int get_topics(const DoutPrefixProvider *dpp, rgw_pubsub_topics *result, optional_yield y) const { + return read_topics(dpp, result, nullptr, y); + } // get a topic with by its name and populate it into "result" // return -ENOENT if the topic does not exists // return 0 on success, error code otherwise - int get_topic(const std::string& name, rgw_pubsub_topic *result) const; + int get_topic(const DoutPrefixProvider *dpp, const std::string& name, rgw_pubsub_topic *result, optional_yield y) const; // create a topic with a name only // if the topic already exists it is a no-op (considered success) // return 0 on success, error code otherwise @@ -640,37 +617,3 @@ public: int remove_topic(const DoutPrefixProvider *dpp, const std::string& name, optional_yield y) const; }; -template -int RGWPubSub::read(const rgw_raw_obj& obj, T* result, RGWObjVersionTracker* objv_tracker) const -{ - bufferlist bl; - int ret = rgw_get_system_obj(svc_sysobj, - obj.pool, obj.oid, - bl, - objv_tracker, - nullptr, null_yield, nullptr, nullptr); - if (ret < 0) { - return ret; - } - - auto iter = bl.cbegin(); - try { - decode(*result, iter); - } catch (buffer::error& err) { - return -EIO; - } - - return 0; -} - -template -int RGWPubSub::write(const DoutPrefixProvider *dpp, const rgw_raw_obj& obj, const T& info, - RGWObjVersionTracker* objv_tracker, optional_yield y) const -{ - bufferlist bl; - encode(info, bl); - - return rgw_put_system_obj(dpp, svc_sysobj, obj.pool, obj.oid, - bl, false, objv_tracker, real_time(), y); -} - diff --git a/src/rgw/driver/rados/rgw_rest_pubsub.cc b/src/rgw/rgw_rest_pubsub.cc similarity index 94% rename from src/rgw/driver/rados/rgw_rest_pubsub.cc rename to src/rgw/rgw_rest_pubsub.cc index 5e6e60c4647..db361bce3bd 100644 --- a/src/rgw/driver/rados/rgw_rest_pubsub.cc +++ b/src/rgw/rgw_rest_pubsub.cc @@ -13,7 +13,6 @@ #include "rgw_arn.h" #include "rgw_auth_s3.h" #include "rgw_notify.h" -#include "rgw_sal_rados.h" #include "services/svc_zone.h" #include "common/dout.h" #include "rgw_url.h" @@ -166,7 +165,7 @@ void RGWPSCreateTopicOp::execute(optional_yield y) { return; } - RGWPubSub ps(static_cast(driver), s->owner.get_id().tenant); + const RGWPubSub ps(driver, s->owner.get_id().tenant); op_ret = ps.create_topic(this, topic_name, dest, topic_arn, opaque_data, y); if (op_ret < 0) { ldpp_dout(this, 1) << "failed to create topic '" << topic_name << "', ret=" << op_ret << dendl; @@ -220,8 +219,8 @@ public: }; void RGWPSListTopicsOp::execute(optional_yield y) { - RGWPubSub ps(static_cast(driver), s->owner.get_id().tenant); - op_ret = ps.get_topics(&result); + const RGWPubSub ps(driver, s->owner.get_id().tenant); + op_ret = ps.get_topics(this, &result, y); // if there are no topics it is not considered an error op_ret = op_ret == -ENOENT ? 0 : op_ret; if (op_ret < 0) { @@ -298,8 +297,8 @@ void RGWPSGetTopicOp::execute(optional_yield y) { if (op_ret < 0) { return; } - RGWPubSub ps(static_cast(driver), s->owner.get_id().tenant); - op_ret = ps.get_topic(topic_name, &result); + const RGWPubSub ps(driver, s->owner.get_id().tenant); + op_ret = ps.get_topic(this, topic_name, &result, y); if (op_ret < 0) { ldpp_dout(this, 1) << "failed to get topic '" << topic_name << "', ret=" << op_ret << dendl; return; @@ -374,8 +373,8 @@ void RGWPSGetTopicAttributesOp::execute(optional_yield y) { if (op_ret < 0) { return; } - RGWPubSub ps(static_cast(driver), s->owner.get_id().tenant); - op_ret = ps.get_topic(topic_name, &result); + const RGWPubSub ps(driver, s->owner.get_id().tenant); + op_ret = ps.get_topic(this, topic_name, &result, y); if (op_ret < 0) { ldpp_dout(this, 1) << "failed to get topic '" << topic_name << "', ret=" << op_ret << dendl; return; @@ -459,7 +458,7 @@ void RGWPSDeleteTopicOp::execute(optional_yield y) { if (op_ret < 0) { return; } - RGWPubSub ps(static_cast(driver), s->owner.get_id().tenant); + const RGWPubSub ps(driver, s->owner.get_id().tenant); op_ret = ps.remove_topic(this, topic_name, y); if (op_ret < 0) { ldpp_dout(this, 1) << "failed to remove topic '" << topic_name << ", ret=" << op_ret << dendl; @@ -568,7 +567,7 @@ int delete_all_notifications(const DoutPrefixProvider *dpp, const rgw_pubsub_buc class RGWPSCreateNotifOp : public RGWDefaultResponseOp { private: std::string bucket_name; - RGWBucketInfo bucket_info; + std::unique_ptr bucket; rgw_pubsub_s3_notifications configurations; int get_params() { @@ -646,15 +645,15 @@ void RGWPSCreateNotifOp::execute(optional_yield y) { return; } - const RGWPubSub ps(static_cast(driver), s->owner.get_id().tenant); - const RGWPubSub::Bucket b(ps, bucket_info.bucket); + const RGWPubSub ps(driver, s->owner.get_id().tenant); + const RGWPubSub::Bucket b(ps, bucket.get()); if(configurations.list.empty()) { // get all topics on a bucket rgw_pubsub_bucket_topics bucket_topics; - op_ret = b.get_topics(&bucket_topics); + op_ret = b.get_topics(this, &bucket_topics, y); if (op_ret < 0) { - ldpp_dout(this, 1) << "failed to get list of topics from bucket '" << bucket_info.bucket.name << "', ret=" << op_ret << dendl; + ldpp_dout(this, 1) << "failed to get list of topics from bucket '" << bucket_name << "', ret=" << op_ret << dendl; return; } @@ -692,7 +691,7 @@ void RGWPSCreateNotifOp::execute(optional_yield y) { // get topic information. destination information is stored in the topic rgw_pubsub_topic topic_info; - op_ret = ps.get_topic(topic_name, &topic_info); + op_ret = ps.get_topic(this, topic_name, &topic_info, y); if (op_ret < 0) { ldpp_dout(this, 1) << "failed to get topic '" << topic_name << "', ret=" << op_ret << dendl; return; @@ -735,15 +734,13 @@ int RGWPSCreateNotifOp::verify_permission(optional_yield y) { } std::unique_ptr user = driver->get_user(s->owner.get_id()); - std::unique_ptr bucket; ret = driver->get_bucket(this, user.get(), s->owner.get_id().tenant, bucket_name, &bucket, y); if (ret < 0) { ldpp_dout(this, 1) << "failed to get bucket info, cannot verify ownership" << dendl; return ret; } - bucket_info = bucket->get_info(); - if (bucket_info.owner != s->owner.get_id()) { + if (bucket->get_info().owner != s->owner.get_id()) { ldpp_dout(this, 1) << "user doesn't own bucket, not allowed to create notification" << dendl; return -EPERM; } @@ -754,7 +751,7 @@ int RGWPSCreateNotifOp::verify_permission(optional_yield y) { class RGWPSDeleteNotifOp : public RGWDefaultResponseOp { private: std::string bucket_name; - RGWBucketInfo bucket_info; + std::unique_ptr bucket; std::string notif_name; public: @@ -792,14 +789,14 @@ void RGWPSDeleteNotifOp::execute(optional_yield y) { return; } - const RGWPubSub ps(static_cast(driver), s->owner.get_id().tenant); - const RGWPubSub::Bucket b(ps, bucket_info.bucket); + const RGWPubSub ps(driver, s->owner.get_id().tenant); + const RGWPubSub::Bucket b(ps, bucket.get()); // get all topics on a bucket rgw_pubsub_bucket_topics bucket_topics; - op_ret = b.get_topics(&bucket_topics); + op_ret = b.get_topics(this, &bucket_topics, y); if (op_ret < 0) { - ldpp_dout(this, 1) << "failed to get list of topics from bucket '" << bucket_info.bucket.name << "', ret=" << op_ret << dendl; + ldpp_dout(this, 1) << "failed to get list of topics from bucket '" << bucket_name << "', ret=" << op_ret << dendl; return; } @@ -826,14 +823,12 @@ int RGWPSDeleteNotifOp::verify_permission(optional_yield y) { } std::unique_ptr user = driver->get_user(s->owner.get_id()); - std::unique_ptr bucket; ret = driver->get_bucket(this, user.get(), s->owner.get_id().tenant, bucket_name, &bucket, y); if (ret < 0) { return ret; } - bucket_info = bucket->get_info(); - if (bucket_info.owner != s->owner.get_id()) { + if (bucket->get_info().owner != s->owner.get_id()) { ldpp_dout(this, 1) << "user doesn't own bucket, cannot remove notification" << dendl; return -EPERM; } @@ -844,7 +839,7 @@ int RGWPSDeleteNotifOp::verify_permission(optional_yield y) { class RGWPSListNotifsOp : public RGWOp { private: std::string bucket_name; - RGWBucketInfo bucket_info; + std::unique_ptr bucket; std::string notif_name; rgw_pubsub_s3_notifications notifications; @@ -891,14 +886,14 @@ private: }; void RGWPSListNotifsOp::execute(optional_yield y) { - const RGWPubSub ps(static_cast(driver), s->owner.get_id().tenant); - const RGWPubSub::Bucket b(ps, bucket_info.bucket); + const RGWPubSub ps(driver, s->owner.get_id().tenant); + const RGWPubSub::Bucket b(ps, bucket.get()); // get all topics on a bucket rgw_pubsub_bucket_topics bucket_topics; - op_ret = b.get_topics(&bucket_topics); + op_ret = b.get_topics(this, &bucket_topics, y); if (op_ret < 0) { - ldpp_dout(this, 1) << "failed to get list of topics from bucket '" << bucket_info.bucket.name << "', ret=" << op_ret << dendl; + ldpp_dout(this, 1) << "failed to get list of topics from bucket '" << bucket_name << "', ret=" << op_ret << dendl; return; } if (!notif_name.empty()) { @@ -929,14 +924,12 @@ int RGWPSListNotifsOp::verify_permission(optional_yield y) { } std::unique_ptr user = driver->get_user(s->owner.get_id()); - std::unique_ptr bucket; ret = driver->get_bucket(this, user.get(), s->owner.get_id().tenant, bucket_name, &bucket, y); if (ret < 0) { return ret; } - bucket_info = bucket->get_info(); - if (bucket_info.owner != s->owner.get_id()) { + if (bucket->get_info().owner != s->owner.get_id()) { ldpp_dout(this, 1) << "user doesn't own bucket, cannot get notification list" << dendl; return -EPERM; } diff --git a/src/rgw/rgw_sal.h b/src/rgw/rgw_sal.h index 8e302b10d57..622fd434f70 100644 --- a/src/rgw/rgw_sal.h +++ b/src/rgw/rgw_sal.h @@ -35,6 +35,8 @@ class RGWDataSyncStatusManager; class RGWSyncModuleInstance; typedef std::shared_ptr RGWSyncModuleInstanceRef; class RGWCompressionInfo; +struct rgw_pubsub_topics; +struct rgw_pubsub_bucket_topics; using RGWBucketListNameFilter = std::function; @@ -336,7 +338,15 @@ class Driver { const DoutPrefixProvider* dpp, rgw::sal::Object* obj, rgw::sal::Object* src_obj, rgw::notify::EventType event_type, rgw::sal::Bucket* _bucket, std::string& _user_id, std::string& _user_tenant, std::string& _req_id, optional_yield y) = 0; - + /** Read the topic config entry into @a data and (optionally) @a objv_tracker */ + virtual int read_topics(const std::string& tenant, rgw_pubsub_topics& topics, RGWObjVersionTracker* objv_tracker, + optional_yield y, const DoutPrefixProvider *dpp) = 0; + /** Write @a info and (optionally) @a objv_tracker into the config */ + virtual int write_topics(const std::string& tenant, const rgw_pubsub_topics& topics, RGWObjVersionTracker* objv_tracker, + optional_yield y, const DoutPrefixProvider *dpp) = 0; + /** Remove the topic config, optionally a specific version */ + virtual int remove_topics(const std::string& tenant, RGWObjVersionTracker* objv_tracker, + optional_yield y,const DoutPrefixProvider *dpp) = 0; /** Get access to the lifecycle management thread */ virtual RGWLC* get_rgwlc(void) = 0; /** Get access to the coroutine registry. Used to create new coroutine managers */ @@ -770,6 +780,16 @@ class Bucket { virtual int abort_multiparts(const DoutPrefixProvider* dpp, CephContext* cct) = 0; + /** Read the bucket notification config into @a notifications with and (optionally) @a objv_tracker */ + virtual int read_topics(rgw_pubsub_bucket_topics& notifications, + RGWObjVersionTracker* objv_tracker, optional_yield y, const DoutPrefixProvider *dpp) = 0; + /** Write @a notifications with (optionally) @a objv_tracker into the bucket notification config */ + virtual int write_topics(const rgw_pubsub_bucket_topics& notifications, RGWObjVersionTracker* objv_tracker, + optional_yield y, const DoutPrefixProvider *dpp) = 0; + /** Remove the bucket notification config with (optionally) @a objv_tracker */ + virtual int remove_topics(RGWObjVersionTracker* objv_tracker, + optional_yield y, const DoutPrefixProvider *dpp) = 0; + /* dang - This is temporary, until the API is completed */ virtual rgw_bucket& get_key() = 0; virtual RGWBucketInfo& get_info() = 0; diff --git a/src/rgw/rgw_sal_filter.h b/src/rgw/rgw_sal_filter.h index b00540310bd..774677a6bbb 100644 --- a/src/rgw/rgw_sal_filter.h +++ b/src/rgw/rgw_sal_filter.h @@ -211,6 +211,19 @@ public: std::string& _user_id, std::string& _user_tenant, std::string& _req_id, optional_yield y) override; + int read_topics(const std::string& tenant, rgw_pubsub_topics& topics, RGWObjVersionTracker* objv_tracker, + optional_yield y, const DoutPrefixProvider *dpp) override { + return next->read_topics(tenant, topics, objv_tracker, y, dpp); + } + int write_topics(const std::string& tenant, const rgw_pubsub_topics& topics, RGWObjVersionTracker* objv_tracker, + optional_yield y, const DoutPrefixProvider *dpp) override { + return next->write_topics(tenant, topics, objv_tracker, y, dpp); + } + int remove_topics(const std::string& tenant, RGWObjVersionTracker* objv_tracker, + optional_yield y, const DoutPrefixProvider *dpp) override { + return next->remove_topics(tenant, objv_tracker, y, dpp); + } + virtual RGWLC* get_rgwlc(void) override; virtual RGWCoroutinesManagerRegistry* get_cr_registry() override; @@ -503,6 +516,19 @@ public: virtual int abort_multiparts(const DoutPrefixProvider* dpp, CephContext* cct) override; + int read_topics(rgw_pubsub_bucket_topics& notifications, RGWObjVersionTracker* objv_tracker, + optional_yield y, const DoutPrefixProvider *dpp) override { + return next->read_topics(notifications, objv_tracker, y, dpp); + } + int write_topics(const rgw_pubsub_bucket_topics& notifications, RGWObjVersionTracker* obj_tracker, + optional_yield y, const DoutPrefixProvider *dpp) override { + return next->write_topics(notifications, obj_tracker, y, dpp); + } + int remove_topics(RGWObjVersionTracker* objv_tracker, + optional_yield y, const DoutPrefixProvider *dpp) override { + return next->remove_topics(objv_tracker, y, dpp); + } + virtual rgw_bucket& get_key() override { return next->get_key(); } virtual RGWBucketInfo& get_info() override { return next->get_info(); } diff --git a/src/rgw/rgw_sal_store.h b/src/rgw/rgw_sal_store.h index 78b32021fcb..c7c5f5ae611 100644 --- a/src/rgw/rgw_sal_store.h +++ b/src/rgw/rgw_sal_store.h @@ -27,6 +27,13 @@ class StoreDriver : public Driver { virtual uint64_t get_new_req_id() override { return ceph::util::generate_random_number(); } + + int read_topics(const std::string& tenant, rgw_pubsub_topics& topics, RGWObjVersionTracker* objv_tracker, + optional_yield y, const DoutPrefixProvider *dpp) override {return -EOPNOTSUPP;} + int write_topics(const std::string& tenant, const rgw_pubsub_topics& topics, RGWObjVersionTracker* objv_tracker, + optional_yield y, const DoutPrefixProvider *dpp) override {return -ENOENT;} + int remove_topics(const std::string& tenant, RGWObjVersionTracker* objv_tracker, + optional_yield y, const DoutPrefixProvider *dpp) override {return -ENOENT;} }; class StoreUser : public User { @@ -147,6 +154,13 @@ class StoreBucket : public Bucket { (info.bucket.bucket_id != sb.info.bucket.bucket_id); } + int read_topics(rgw_pubsub_bucket_topics& notifications, RGWObjVersionTracker* objv_tracker, + optional_yield y, const DoutPrefixProvider *dpp) override {return 0;} + int write_topics(const rgw_pubsub_bucket_topics& notifications, RGWObjVersionTracker* objv_tracker, + optional_yield y, const DoutPrefixProvider *dpp) override {return 0;} + int remove_topics(RGWObjVersionTracker* objv_tracker, + optional_yield y, const DoutPrefixProvider *dpp) override {return 0;} + friend class BucketList; protected: virtual void set_ent(RGWBucketEnt& _ent) { ent = _ent; info.bucket = ent.bucket; info.placement_rule = ent.placement_rule; } -- 2.47.3