From bcd79d23df63e72839c652cbc20ebb67a024ba85 Mon Sep 17 00:00:00 2001 From: Yuval Lifshitz Date: Sat, 10 Feb 2024 16:38:30 +0000 Subject: [PATCH] rgw/notifications: handle migration state between v1 and v2 test instructions: https://gist.github.com/yuvalif/21449e301732b719cd1ed97c3eeeabb2 * during migration all topic and notification operations must fail with HTTP error code 503 * read operations should return the values of the v1 topics and notifications * sending notifications should continue based on v1 values Signed-off-by: Yuval Lifshitz --- src/rgw/driver/rados/rgw_notify.cc | 5 +++-- src/rgw/driver/rados/rgw_sal_rados.cc | 5 +++++ src/rgw/driver/rados/rgw_sal_rados.h | 1 + src/rgw/rgw_admin.cc | 17 +++++++++++++---- src/rgw/rgw_common.cc | 1 + src/rgw/rgw_pubsub.cc | 26 +++++++++++++++++++++++--- src/rgw/rgw_rest_pubsub.cc | 17 ++++++++++++++++- src/rgw/rgw_sal.h | 2 ++ src/rgw/rgw_sal_filter.h | 3 +++ src/rgw/rgw_sal_store.h | 1 + 10 files changed, 68 insertions(+), 10 deletions(-) diff --git a/src/rgw/driver/rados/rgw_notify.cc b/src/rgw/driver/rados/rgw_notify.cc index a1290ec9b20..19d4c0cbb6e 100644 --- a/src/rgw/driver/rados/rgw_notify.cc +++ b/src/rgw/driver/rados/rgw_notify.cc @@ -988,7 +988,8 @@ static inline bool notification_match(reservation_t& res, const RGWObjTags* req_tags) { rgw_pubsub_bucket_topics bucket_topics; - if (all_zonegroups_support(site, zone_features::notification_v2)) { + if (all_zonegroups_support(site, zone_features::notification_v2) && + res.store->stat_topics_v1(res.user_tenant, res.yield, res.dpp) == -ENOENT) { auto ret = 0; if (!res.s) { // for non S3-request caller (e.g., lifecycle, ObjectSync), bucket attrs @@ -1064,7 +1065,7 @@ static inline bool notification_match(reservation_t& res, ldpp_dout(res.dpp, 1) << "INFO: failed to load topic: " << topic_cfg.name << ". error: " << ret - << " while storing the persistent notification event" << dendl; + << " while resrving persistent notification event" << dendl; if (ret == -ENOENT) { // either the topic is deleted but the corresponding notification still // exist or in v2 mode the notification could have synced first but diff --git a/src/rgw/driver/rados/rgw_sal_rados.cc b/src/rgw/driver/rados/rgw_sal_rados.cc index 0a6a6b7b19f..193dfb007e8 100644 --- a/src/rgw/driver/rados/rgw_sal_rados.cc +++ b/src/rgw/driver/rados/rgw_sal_rados.cc @@ -35,6 +35,7 @@ #include "rgw_acl_s3.h" #include "rgw_aio.h" #include "rgw_aio_throttle.h" +#include "rgw_tools.h" #include "rgw_tracer.h" #include "rgw_zone.h" @@ -1117,6 +1118,10 @@ int RadosStore::read_topics(const std::string& tenant, rgw_pubsub_topics& topics return 0; } +int RadosStore::stat_topics_v1(const std::string& tenant, optional_yield y, const DoutPrefixProvider *dpp) { + return rgw_stat_system_obj(dpp, svc()->sysobj, svc()->zone->get_zone_params().log_pool, topics_oid(tenant), nullptr, nullptr, y, nullptr); +} + int RadosStore::write_topics(const std::string& tenant, const rgw_pubsub_topics& topics, RGWObjVersionTracker* objv_tracker, optional_yield y, const DoutPrefixProvider *dpp) { bufferlist bl; diff --git a/src/rgw/driver/rados/rgw_sal_rados.h b/src/rgw/driver/rados/rgw_sal_rados.h index 1eccb89dad3..c97d5e1832d 100644 --- a/src/rgw/driver/rados/rgw_sal_rados.h +++ b/src/rgw/driver/rados/rgw_sal_rados.h @@ -160,6 +160,7 @@ class RadosStore : public StoreDriver { std::string& _req_id, optional_yield y) override; int read_topics(const std::string& tenant, rgw_pubsub_topics& topics, RGWObjVersionTracker* objv_tracker, optional_yield y, const DoutPrefixProvider *dpp) override; + int stat_topics_v1(const std::string& tenant, optional_yield y, const DoutPrefixProvider *dpp) override; int write_topics(const std::string& tenant, const rgw_pubsub_topics& topics, RGWObjVersionTracker* objv_tracker, optional_yield y, const DoutPrefixProvider *dpp) override; int remove_topics(const std::string& tenant, RGWObjVersionTracker* objv_tracker, diff --git a/src/rgw/rgw_admin.cc b/src/rgw/rgw_admin.cc index a5933604b7e..71f7abda805 100644 --- a/src/rgw/rgw_admin.cc +++ b/src/rgw/rgw_admin.cc @@ -10628,7 +10628,8 @@ next: cerr << "ERROR: could not init bucket: " << cpp_strerror(-ret) << std::endl; return -ret; } - if (rgw::all_zonegroups_support(*site, rgw::zone_features::notification_v2)) { + if (rgw::all_zonegroups_support(*site, rgw::zone_features::notification_v2) && + driver->stat_topics_v1(tenant, null_yield, dpp()) == -ENOENT) { ret = get_bucket_notifications(dpp(), bucket.get(), result); if (ret < 0) { cerr << "ERROR: could not get topics: " << cpp_strerror(-ret) @@ -10667,7 +10668,8 @@ next: continue; } std::set subscribed_buckets; - if (rgw::all_zonegroups_support(*site, rgw::zone_features::notification_v2)) { + if (rgw::all_zonegroups_support(*site, rgw::zone_features::notification_v2) && + driver->stat_topics_v1(tenant, null_yield, dpp()) == -ENOENT) { ret = driver->get_bucket_topic_mapping(topic, subscribed_buckets, null_yield, dpp()); if (ret < 0) { @@ -10709,7 +10711,8 @@ next: cerr << "ERROR: could not get topic: " << cpp_strerror(-ret) << std::endl; return -ret; } - if (rgw::all_zonegroups_support(*site, rgw::zone_features::notification_v2)) { + if (rgw::all_zonegroups_support(*site, rgw::zone_features::notification_v2) && + driver->stat_topics_v1(tenant, null_yield, dpp()) == -ENOENT) { show_topics_info_v2(topic, subscribed_buckets, formatter.get()); } else { encode_json("topic", topic, formatter.get()); @@ -10733,7 +10736,8 @@ next: return -ret; } rgw_pubsub_bucket_topics bucket_topics; - if (rgw::all_zonegroups_support(*site, rgw::zone_features::notification_v2)) { + if (rgw::all_zonegroups_support(*site, rgw::zone_features::notification_v2) && + driver->stat_topics_v1(tenant, null_yield, dpp()) == -ENOENT) { ret = get_bucket_notifications(dpp(), bucket.get(), bucket_topics); if (ret < 0) { cerr << "ERROR: could not get bucket notifications: " @@ -10799,6 +10803,11 @@ next: } if (rgw::all_zonegroups_support(*site, rgw::zone_features::notification_v2)) { + if (ret = driver->stat_topics_v1(tenant, null_yield, dpp()); ret != -ENOENT) { + cerr << "WARNING: " << (ret == 0 ? "topic migration in process" : "cannot determine topic migration status. ret = " + std::to_string(ret)) + << ". please try again later" << std::endl; + return -ret; + } ret = remove_notification_v2(dpp(), driver, bucket.get(), notification_id, null_yield); } else { diff --git a/src/rgw/rgw_common.cc b/src/rgw/rgw_common.cc index 3ee98fa18ca..bfefde9fd72 100644 --- a/src/rgw/rgw_common.cc +++ b/src/rgw/rgw_common.cc @@ -127,6 +127,7 @@ rgw_http_errors rgw_http_s3_errors({ { ERR_INTERNAL_ERROR, {500, "InternalError" }}, { ERR_NOT_IMPLEMENTED, {501, "NotImplemented" }}, { ERR_SERVICE_UNAVAILABLE, {503, "ServiceUnavailable"}}, + { EBUSY, {503, "ServiceUnavailable"}}, { ERR_RATE_LIMITED, {503, "SlowDown"}}, { ERR_ZERO_IN_URL, {400, "InvalidRequest" }}, { ERR_NO_SUCH_TAG_SET, {404, "NoSuchTagSet"}}, diff --git a/src/rgw/rgw_pubsub.cc b/src/rgw/rgw_pubsub.cc index b1b71efdafa..cf207db632c 100644 --- a/src/rgw/rgw_pubsub.cc +++ b/src/rgw/rgw_pubsub.cc @@ -539,11 +539,12 @@ int RGWPubSub::get_topics(const DoutPrefixProvider* dpp, rgw_pubsub_topics& result, std::string& next_marker, optional_yield y) const { - if (!use_notification_v2) { + if (!use_notification_v2 || driver->stat_topics_v1(tenant, y, dpp) != -ENOENT) { + // in case of v1 or during migration we use v1 topics // v1 returns all topics, ignoring marker/max_items return read_topics_v1(dpp, result, nullptr, y); } - + // TODO: prefix filter on 'tenant:' void* handle = NULL; int ret = driver->meta_list_keys_init(dpp, "topic", start_marker, &handle); @@ -623,6 +624,13 @@ int RGWPubSub::Bucket::write_topics(const DoutPrefixProvider *dpp, const rgw_pub RGWObjVersionTracker *objv_tracker, optional_yield y) const { + if (ps.use_notification_v2) { + if (const auto ret = ps.driver->stat_topics_v1(bucket->get_tenant(), y, dpp); ret != -ENOENT) { + ldpp_dout(dpp, 1) << "WARNING: " << (ret == 0 ? "topic migration in process" : "cannot determine topic migration status. ret = " + std::to_string(ret)) + << ". please try again later" << dendl; + return -ERR_SERVICE_UNAVAILABLE; + } + } const int ret = bucket->write_topics(topics, objv_tracker, y, dpp); if (ret < 0) { ldpp_dout(dpp, 1) << "ERROR: failed to write bucket topics info: ret=" << ret << dendl; @@ -637,7 +645,8 @@ int RGWPubSub::get_topic(const DoutPrefixProvider* dpp, rgw_pubsub_topic& result, optional_yield y, std::set* subscribed_buckets) const { - if (use_notification_v2) { + if (use_notification_v2 && driver->stat_topics_v1(tenant, y, dpp) == -ENOENT) { + // in case of v1 or during migration we use v1 topics int ret = driver->read_topic_v2(name, tenant, result, nullptr, y, dpp); if (ret < 0) { ldpp_dout(dpp, 1) << "failed to read topic info for name: " << name @@ -962,6 +971,11 @@ int RGWPubSub::create_topic(const DoutPrefixProvider* dpp, const std::string& policy_text, optional_yield y) const { if (use_notification_v2) { + if (const auto ret = driver->stat_topics_v1(tenant, y, dpp); ret != -ENOENT) { + ldpp_dout(dpp, 1) << "WARNING: " << (ret == 0 ? "topic migration in process" : "cannot determine topic migration status. ret = " + std::to_string(ret)) + << ". please try again later" << dendl; + return -ERR_SERVICE_UNAVAILABLE; + } rgw_pubsub_topic new_topic; new_topic.user = user; new_topic.name = name; @@ -994,6 +1008,7 @@ int RGWPubSub::create_topic(const DoutPrefixProvider* dpp, ldpp_dout(dpp, 1) << "ERROR: failed to write topics info: ret=" << ret << dendl; return ret; } + ldpp_dout(dpp, 1) << "INFO: successfully created v1 topic" << dendl; return 0; } @@ -1025,6 +1040,11 @@ int RGWPubSub::remove_topic_v2(const DoutPrefixProvider* dpp, int RGWPubSub::remove_topic(const DoutPrefixProvider *dpp, const std::string& name, optional_yield y) const { if (use_notification_v2) { + if (const auto ret = driver->stat_topics_v1(tenant, y, dpp); ret != -ENOENT) { + ldpp_dout(dpp, 1) << "WARNING: " << (ret == 0 ? "topic migration in process" : "cannot determine topic migration status. ret = " + std::to_string(ret)) + << ". please try again later" << dendl; + return -ERR_SERVICE_UNAVAILABLE; + } return remove_topic_v2(dpp, name, y); } RGWObjVersionTracker objv_tracker; diff --git a/src/rgw/rgw_rest_pubsub.cc b/src/rgw/rgw_rest_pubsub.cc index 1ba234f9d41..138150b0027 100644 --- a/src/rgw/rgw_rest_pubsub.cc +++ b/src/rgw/rgw_rest_pubsub.cc @@ -1152,6 +1152,13 @@ void RGWPSCreateNotifOp::execute_v2(optional_yield y) { } } + if (const auto ret = driver->stat_topics_v1(s->bucket_tenant, y, this); ret != -ENOENT) { + ldpp_dout(this, 1) << "WARNING: " << (ret == 0 ? "topic migration in process" : "cannot determine topic migration status. ret = " + std::to_string(ret)) + << ". please try again later" << dendl; + op_ret = -ERR_SERVICE_UNAVAILABLE; + return; + } + std::unique_ptr bucket; op_ret = driver->load_bucket(this, rgw_bucket(s->bucket_tenant, s->bucket_name), &bucket, y); @@ -1381,6 +1388,13 @@ void RGWPSDeleteNotifOp::execute_v2(optional_yield y) { } } + if (const auto ret = driver->stat_topics_v1(s->bucket_tenant, y, this); ret != -ENOENT) { + ldpp_dout(this, 1) << "WARNING: " << (ret == 0 ? "topic migration in process" : "cannot determine topic migration status. ret = " + std::to_string(ret)) + << ". please try again later" << dendl; + op_ret = -ERR_SERVICE_UNAVAILABLE; + return; + } + std::unique_ptr bucket; op_ret = driver->load_bucket(this, rgw_bucket(s->bucket_tenant, s->bucket_name), &bucket, y); @@ -1459,7 +1473,8 @@ void RGWPSListNotifsOp::execute(optional_yield y) { // get all topics on a bucket rgw_pubsub_bucket_topics bucket_topics; - if (rgw::all_zonegroups_support(*s->penv.site, rgw::zone_features::notification_v2)) { + if (rgw::all_zonegroups_support(*s->penv.site, rgw::zone_features::notification_v2) && + driver->stat_topics_v1(s->bucket_tenant, y, this) == -ENOENT) { op_ret = get_bucket_notifications(this, bucket.get(), bucket_topics); } else { const RGWPubSub ps(driver, s->owner.id.tenant); diff --git a/src/rgw/rgw_sal.h b/src/rgw/rgw_sal.h index 060cfc1e351..7202d9c90dc 100644 --- a/src/rgw/rgw_sal.h +++ b/src/rgw/rgw_sal.h @@ -308,6 +308,8 @@ class Driver { /** Read the topic config entry into @a data and (optionally) @a objv_tracker */ virtual int read_topics(const std::string& tenant, rgw_pubsub_topics& topics, RGWObjVersionTracker* objv_tracker, optional_yield y, const DoutPrefixProvider *dpp) = 0; + /** check if the v1 topics object exists */ + virtual int stat_topics_v1(const std::string& tenant, optional_yield y, const DoutPrefixProvider *dpp) = 0; /** Write @a info and (optionally) @a objv_tracker into the config */ virtual int write_topics(const std::string& tenant, const rgw_pubsub_topics& topics, RGWObjVersionTracker* objv_tracker, optional_yield y, const DoutPrefixProvider *dpp) = 0; diff --git a/src/rgw/rgw_sal_filter.h b/src/rgw/rgw_sal_filter.h index 83832922f90..5095f675f16 100644 --- a/src/rgw/rgw_sal_filter.h +++ b/src/rgw/rgw_sal_filter.h @@ -186,6 +186,9 @@ public: optional_yield y, const DoutPrefixProvider *dpp) override { return next->read_topics(tenant, topics, objv_tracker, y, dpp); } + int stat_topics_v1(const std::string& tenant, optional_yield y, const DoutPrefixProvider *dpp) override { + return next->stat_topics_v1(tenant, y, dpp); + } int write_topics(const std::string& tenant, const rgw_pubsub_topics& topics, RGWObjVersionTracker* objv_tracker, optional_yield y, const DoutPrefixProvider *dpp) override { return next->write_topics(tenant, topics, objv_tracker, y, dpp); diff --git a/src/rgw/rgw_sal_store.h b/src/rgw/rgw_sal_store.h index f0ac762554e..23fc3eb7685 100644 --- a/src/rgw/rgw_sal_store.h +++ b/src/rgw/rgw_sal_store.h @@ -30,6 +30,7 @@ class StoreDriver : public Driver { int read_topics(const std::string& tenant, rgw_pubsub_topics& topics, RGWObjVersionTracker* objv_tracker, optional_yield y, const DoutPrefixProvider *dpp) override {return -EOPNOTSUPP;} + int stat_topics_v1(const std::string& tenant, optional_yield y, const DoutPrefixProvider *dpp) override {return -EOPNOTSUPP;} int write_topics(const std::string& tenant, const rgw_pubsub_topics& topics, RGWObjVersionTracker* objv_tracker, optional_yield y, const DoutPrefixProvider *dpp) override {return -ENOENT;} int remove_topics(const std::string& tenant, RGWObjVersionTracker* objv_tracker, -- 2.39.5