]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw/notifications: handle migration state between v1 and v2
authorYuval Lifshitz <ylifshit@redhat.com>
Sat, 10 Feb 2024 16:38:30 +0000 (16:38 +0000)
committerCasey Bodley <cbodley@redhat.com>
Tue, 5 Mar 2024 17:56:05 +0000 (12:56 -0500)
test instructions:
https://gist.github.com/yuvalif/21449e301732b719cd1ed97c3eeeabb2

* during migration all topic and notification operations must fail with HTTP error code 503
* read operations should return the values of the v1 topics and notifications
* sending notifications should continue based on v1 values

Signed-off-by: Yuval Lifshitz <ylifshit@redhat.com>
src/rgw/driver/rados/rgw_notify.cc
src/rgw/driver/rados/rgw_sal_rados.cc
src/rgw/driver/rados/rgw_sal_rados.h
src/rgw/rgw_admin.cc
src/rgw/rgw_common.cc
src/rgw/rgw_pubsub.cc
src/rgw/rgw_rest_pubsub.cc
src/rgw/rgw_sal.h
src/rgw/rgw_sal_filter.h
src/rgw/rgw_sal_store.h

index a1290ec9b20ae9f83835845115489ac3b6ff0719..19d4c0cbb6e3b1152decf55167756d9404f33091 100644 (file)
@@ -988,7 +988,8 @@ static inline bool notification_match(reservation_t& res,
                      const RGWObjTags* req_tags)
 {
   rgw_pubsub_bucket_topics bucket_topics;
-  if (all_zonegroups_support(site, zone_features::notification_v2)) {
+  if (all_zonegroups_support(site, zone_features::notification_v2) &&
+      res.store->stat_topics_v1(res.user_tenant, res.yield, res.dpp) == -ENOENT) {
     auto ret = 0;
     if (!res.s) {
       //  for non S3-request caller (e.g., lifecycle, ObjectSync), bucket attrs
@@ -1064,7 +1065,7 @@ static inline bool notification_match(reservation_t& res,
       ldpp_dout(res.dpp, 1)
           << "INFO: failed to load topic: " << topic_cfg.name
           << ". error: " << ret
-          << " while storing the persistent notification event" << dendl;
+          << " while resrving persistent notification event" << dendl;
       if (ret == -ENOENT) {
         // either the topic is deleted but the corresponding notification still
         // exist or in v2 mode the notification could have synced first but
index 0a6a6b7b19f81652de905c2df4ee97b7ad0a1198..193dfb007e805fef700ad381fdf97aa61635f1f4 100644 (file)
@@ -35,6 +35,7 @@
 #include "rgw_acl_s3.h"
 #include "rgw_aio.h"
 #include "rgw_aio_throttle.h"
+#include "rgw_tools.h"
 #include "rgw_tracer.h"
 
 #include "rgw_zone.h"
@@ -1117,6 +1118,10 @@ int RadosStore::read_topics(const std::string& tenant, rgw_pubsub_topics& topics
   return 0;
 }
 
+int RadosStore::stat_topics_v1(const std::string& tenant, optional_yield y, const DoutPrefixProvider *dpp) {
+  return rgw_stat_system_obj(dpp, svc()->sysobj, svc()->zone->get_zone_params().log_pool, topics_oid(tenant), nullptr, nullptr, y, nullptr);
+}
+
 int RadosStore::write_topics(const std::string& tenant, const rgw_pubsub_topics& topics, RGWObjVersionTracker* objv_tracker,
        optional_yield y, const DoutPrefixProvider *dpp) {
   bufferlist bl;
index 1eccb89dad33a858e0372aff351d3b850cc0bcbc..c97d5e1832d47c74589ef9771f694bfd4a695b1d 100644 (file)
@@ -160,6 +160,7 @@ class RadosStore : public StoreDriver {
     std::string& _req_id, optional_yield y) override;
     int read_topics(const std::string& tenant, rgw_pubsub_topics& topics, RGWObjVersionTracker* objv_tracker,
         optional_yield y, const DoutPrefixProvider *dpp) override;
+    int stat_topics_v1(const std::string& tenant, optional_yield y, const DoutPrefixProvider *dpp) override;
     int write_topics(const std::string& tenant, const rgw_pubsub_topics& topics, RGWObjVersionTracker* objv_tracker,
        optional_yield y, const DoutPrefixProvider *dpp) override;
     int remove_topics(const std::string& tenant, RGWObjVersionTracker* objv_tracker,
index a5933604b7e1b8ecf03cbb4b99b4fdd8b4e2a27d..71f7abda805841f68e4ec0a3d9fd6ec0e4a50294 100644 (file)
@@ -10628,7 +10628,8 @@ next:
       cerr << "ERROR: could not init bucket: " << cpp_strerror(-ret) << std::endl;
       return -ret;
     }
-    if (rgw::all_zonegroups_support(*site, rgw::zone_features::notification_v2)) {
+    if (rgw::all_zonegroups_support(*site, rgw::zone_features::notification_v2) &&
+        driver->stat_topics_v1(tenant, null_yield, dpp()) == -ENOENT) {
       ret = get_bucket_notifications(dpp(), bucket.get(), result);
       if (ret < 0) {
         cerr << "ERROR: could not get topics: " << cpp_strerror(-ret)
@@ -10667,7 +10668,8 @@ next:
           continue;
         }
         std::set<std::string> subscribed_buckets;
-        if (rgw::all_zonegroups_support(*site, rgw::zone_features::notification_v2)) {
+        if (rgw::all_zonegroups_support(*site, rgw::zone_features::notification_v2) &&
+            driver->stat_topics_v1(tenant, null_yield, dpp()) == -ENOENT) {
           ret = driver->get_bucket_topic_mapping(topic, subscribed_buckets,
                                                  null_yield, dpp());
           if (ret < 0) {
@@ -10709,7 +10711,8 @@ next:
       cerr << "ERROR: could not get topic: " << cpp_strerror(-ret) << std::endl;
       return -ret;
     }
-    if (rgw::all_zonegroups_support(*site, rgw::zone_features::notification_v2)) {
+    if (rgw::all_zonegroups_support(*site, rgw::zone_features::notification_v2) &&
+        driver->stat_topics_v1(tenant, null_yield, dpp()) == -ENOENT) {
       show_topics_info_v2(topic, subscribed_buckets, formatter.get());
     } else {
       encode_json("topic", topic, formatter.get());
@@ -10733,7 +10736,8 @@ next:
       return -ret;
     }
     rgw_pubsub_bucket_topics bucket_topics;
-    if (rgw::all_zonegroups_support(*site, rgw::zone_features::notification_v2)) {
+    if (rgw::all_zonegroups_support(*site, rgw::zone_features::notification_v2) &&
+        driver->stat_topics_v1(tenant, null_yield, dpp()) == -ENOENT) {
       ret = get_bucket_notifications(dpp(), bucket.get(), bucket_topics);
       if (ret < 0) {
         cerr << "ERROR: could not get bucket notifications: "
@@ -10799,6 +10803,11 @@ next:
     }
 
     if (rgw::all_zonegroups_support(*site, rgw::zone_features::notification_v2)) {
+      if (ret = driver->stat_topics_v1(tenant, null_yield, dpp()); ret != -ENOENT) {
+        cerr << "WARNING: " << (ret == 0 ? "topic migration in process" : "cannot determine topic migration status. ret = " + std::to_string(ret))
+          << ". please try again later" << std::endl;
+        return -ret;
+      }
       ret = remove_notification_v2(dpp(), driver, bucket.get(), notification_id,
                                    null_yield);
     } else {
index 3ee98fa18ca16b68235adf639fcd20de970621e6..bfefde9fd7295933cdfc2f4a4d24a481cec018f9 100644 (file)
@@ -127,6 +127,7 @@ rgw_http_errors rgw_http_s3_errors({
     { ERR_INTERNAL_ERROR, {500, "InternalError" }},
     { ERR_NOT_IMPLEMENTED, {501, "NotImplemented" }},
     { ERR_SERVICE_UNAVAILABLE, {503, "ServiceUnavailable"}},
+    { EBUSY, {503, "ServiceUnavailable"}},
     { ERR_RATE_LIMITED, {503, "SlowDown"}},
     { ERR_ZERO_IN_URL, {400, "InvalidRequest" }},
     { ERR_NO_SUCH_TAG_SET, {404, "NoSuchTagSet"}},
index b1b71efdafa53c9ae99f4a4e074a04b79db67769..cf207db632c678551251fe5b0068909b22a5d30c 100644 (file)
@@ -539,11 +539,12 @@ int RGWPubSub::get_topics(const DoutPrefixProvider* dpp,
                           rgw_pubsub_topics& result, std::string& next_marker,
                           optional_yield y) const
 {
-  if (!use_notification_v2) {
+  if (!use_notification_v2 || driver->stat_topics_v1(tenant, y, dpp) != -ENOENT) {
+    // in case of v1 or during migration we use v1 topics
     // v1 returns all topics, ignoring marker/max_items
     return read_topics_v1(dpp, result, nullptr, y);
   }
-
   // TODO: prefix filter on 'tenant:'
   void* handle = NULL;
   int ret = driver->meta_list_keys_init(dpp, "topic", start_marker, &handle);
@@ -623,6 +624,13 @@ int RGWPubSub::Bucket::write_topics(const DoutPrefixProvider *dpp, const rgw_pub
                                        RGWObjVersionTracker *objv_tracker,
                                        optional_yield y) const
 {
+  if (ps.use_notification_v2) { 
+    if (const auto ret = ps.driver->stat_topics_v1(bucket->get_tenant(), y, dpp); ret != -ENOENT) {
+      ldpp_dout(dpp, 1) << "WARNING: " << (ret == 0 ? "topic migration in process" : "cannot determine topic migration status. ret = " + std::to_string(ret))
+        << ". please try again later" << dendl; 
+      return -ERR_SERVICE_UNAVAILABLE;
+    }
+  }
   const int ret = bucket->write_topics(topics, objv_tracker, y, dpp);
   if (ret < 0) {
     ldpp_dout(dpp, 1) << "ERROR: failed to write bucket topics info: ret=" << ret << dendl;
@@ -637,7 +645,8 @@ int RGWPubSub::get_topic(const DoutPrefixProvider* dpp,
                          rgw_pubsub_topic& result,
                          optional_yield y,
                          std::set<std::string>* subscribed_buckets) const {
-  if (use_notification_v2) {
+  if (use_notification_v2 && driver->stat_topics_v1(tenant, y, dpp) == -ENOENT) {
+    // in case of v1 or during migration we use v1 topics
     int ret = driver->read_topic_v2(name, tenant, result, nullptr, y, dpp);
     if (ret < 0) {
       ldpp_dout(dpp, 1) << "failed to read topic info for name: " << name
@@ -962,6 +971,11 @@ int RGWPubSub::create_topic(const DoutPrefixProvider* dpp,
                             const std::string& policy_text,
                             optional_yield y) const {
   if (use_notification_v2) {
+    if (const auto ret = driver->stat_topics_v1(tenant, y, dpp); ret != -ENOENT) {
+      ldpp_dout(dpp, 1) << "WARNING: " << (ret == 0 ? "topic migration in process" : "cannot determine topic migration status. ret = " + std::to_string(ret))
+        << ". please try again later" << dendl; 
+      return -ERR_SERVICE_UNAVAILABLE;
+    }
     rgw_pubsub_topic new_topic;
     new_topic.user = user;
     new_topic.name = name;
@@ -994,6 +1008,7 @@ int RGWPubSub::create_topic(const DoutPrefixProvider* dpp,
     ldpp_dout(dpp, 1) << "ERROR: failed to write topics info: ret=" << ret << dendl;
     return ret;
   }
+  ldpp_dout(dpp, 1) << "INFO: successfully created v1 topic" << dendl;
 
   return 0;
 }
@@ -1025,6 +1040,11 @@ int RGWPubSub::remove_topic_v2(const DoutPrefixProvider* dpp,
 int RGWPubSub::remove_topic(const DoutPrefixProvider *dpp, const std::string& name, optional_yield y) const
 {
   if (use_notification_v2) {
+    if (const auto ret = driver->stat_topics_v1(tenant, y, dpp); ret != -ENOENT) {
+      ldpp_dout(dpp, 1) << "WARNING: " << (ret == 0 ? "topic migration in process" : "cannot determine topic migration status. ret = " + std::to_string(ret))
+        << ". please try again later" << dendl; 
+      return -ERR_SERVICE_UNAVAILABLE;
+    }
     return remove_topic_v2(dpp, name, y);
   }
   RGWObjVersionTracker objv_tracker;
index 1ba234f9d41dee18120ea19a5e960925a5aef465..138150b002750287a31b17cc392ce9fc1bf8e5dd 100644 (file)
@@ -1152,6 +1152,13 @@ void RGWPSCreateNotifOp::execute_v2(optional_yield y) {
     }
   }
 
+  if (const auto ret = driver->stat_topics_v1(s->bucket_tenant, y, this); ret != -ENOENT) {
+    ldpp_dout(this, 1) << "WARNING: " << (ret == 0 ? "topic migration in process" : "cannot determine topic migration status. ret = " + std::to_string(ret))
+      << ". please try again later" << dendl; 
+    op_ret = -ERR_SERVICE_UNAVAILABLE;
+    return;
+  }
+
   std::unique_ptr<rgw::sal::Bucket> bucket;
   op_ret = driver->load_bucket(this, rgw_bucket(s->bucket_tenant, s->bucket_name),
                                &bucket, y);
@@ -1381,6 +1388,13 @@ void RGWPSDeleteNotifOp::execute_v2(optional_yield y) {
     }
   }
 
+  if (const auto ret = driver->stat_topics_v1(s->bucket_tenant, y, this); ret != -ENOENT) {
+    ldpp_dout(this, 1) << "WARNING: " << (ret == 0 ? "topic migration in process" : "cannot determine topic migration status. ret = " + std::to_string(ret))
+      << ". please try again later" << dendl; 
+    op_ret = -ERR_SERVICE_UNAVAILABLE;
+    return;
+  }
+
   std::unique_ptr<rgw::sal::Bucket> bucket;
   op_ret = driver->load_bucket(this, rgw_bucket(s->bucket_tenant, s->bucket_name),
                                &bucket, y);
@@ -1459,7 +1473,8 @@ void RGWPSListNotifsOp::execute(optional_yield y) {
 
   // get all topics on a bucket
   rgw_pubsub_bucket_topics bucket_topics;
-  if (rgw::all_zonegroups_support(*s->penv.site, rgw::zone_features::notification_v2)) {
+  if (rgw::all_zonegroups_support(*s->penv.site, rgw::zone_features::notification_v2) &&
+      driver->stat_topics_v1(s->bucket_tenant, y, this) == -ENOENT) {
     op_ret = get_bucket_notifications(this, bucket.get(), bucket_topics);
   } else {
     const RGWPubSub ps(driver, s->owner.id.tenant);
index 060cfc1e3513bad94b81ec8c1994cdd083c72f15..7202d9c90dca9eb5caf42eef99c73e353893a340 100644 (file)
@@ -308,6 +308,8 @@ class Driver {
     /** Read the topic config entry into @a data and (optionally) @a objv_tracker */
     virtual int read_topics(const std::string& tenant, rgw_pubsub_topics& topics, RGWObjVersionTracker* objv_tracker,
         optional_yield y, const DoutPrefixProvider *dpp) = 0;
+    /** check if the v1 topics object exists */
+    virtual int stat_topics_v1(const std::string& tenant, optional_yield y, const DoutPrefixProvider *dpp) = 0;
     /** Write @a info and (optionally) @a objv_tracker into the config */
     virtual int write_topics(const std::string& tenant, const rgw_pubsub_topics& topics, RGWObjVersionTracker* objv_tracker,
         optional_yield y, const DoutPrefixProvider *dpp) = 0;
index 83832922f90ddd53cfb33e17644cf9e13f23a869..5095f675f16250f6c494418d266761e1eb3ef6fa 100644 (file)
@@ -186,6 +186,9 @@ public:
       optional_yield y, const DoutPrefixProvider *dpp) override {
     return next->read_topics(tenant, topics, objv_tracker, y, dpp);
   }
+  int stat_topics_v1(const std::string& tenant, optional_yield y, const DoutPrefixProvider *dpp) override {
+    return next->stat_topics_v1(tenant, y, dpp);
+  }
   int write_topics(const std::string& tenant, const rgw_pubsub_topics& topics, RGWObjVersionTracker* objv_tracker,
       optional_yield y, const DoutPrefixProvider *dpp) override {
     return next->write_topics(tenant, topics, objv_tracker, y, dpp);
index f0ac762554e64a9ec1f414e7b21b9c3c9e8e2d5e..23fc3eb76856572a46e7d92680e2e7330467463e 100644 (file)
@@ -30,6 +30,7 @@ class StoreDriver : public Driver {
 
     int read_topics(const std::string& tenant, rgw_pubsub_topics& topics, RGWObjVersionTracker* objv_tracker,
         optional_yield y, const DoutPrefixProvider *dpp) override {return -EOPNOTSUPP;}
+    int stat_topics_v1(const std::string& tenant, optional_yield y, const DoutPrefixProvider *dpp) override {return -EOPNOTSUPP;}
     int write_topics(const std::string& tenant, const rgw_pubsub_topics& topics, RGWObjVersionTracker* objv_tracker,
        optional_yield y, const DoutPrefixProvider *dpp) override {return -ENOENT;}
     int remove_topics(const std::string& tenant, RGWObjVersionTracker* objv_tracker,