git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
RGW | Bucket Notification: migrating old entries to support persistency control 53580/head
author Ali Masarwa <ali.saed.masarwa@gmail.com>
Tue, 19 Sep 2023 11:32:42 +0000 (14:32 +0300)
committer Ali Masarwa <ali.saed.masarwa@gmail.com>
Thu, 28 Sep 2023 13:30:23 +0000 (16:30 +0300)
Signed-off-by: Ali Masarwa <ali.saed.masarwa@gmail.com>
src/rgw/driver/rados/rgw_notify.cc
src/test/rgw/bucket_notification/test_bn.py

src/rgw/driver/rados/rgw_notify.cc
index 44d11a58a712728cba32550734b0998d700029c3..46b3a2302cd4c39f410aa5dbf7154bd4587352b5 100644 (file)
@@ -43,7 +43,7 @@ struct event_entry_t {
   }
 
   void decode(bufferlist::const_iterator& bl) {
-    DECODE_START(2, bl);
+    DECODE_START(3, bl);
     decode(event, bl);
     decode(push_endpoint, bl);
     decode(push_endpoint_args, bl);
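
The hunk above shows only the struct version bump from 2 to 3; the rest of the decode body, the matching encode() (which would presumably bump ENCODE_START to 3 as well), and which fields sit behind which version are outside the shown context. A hedged sketch of the pattern such a bump usually pairs with in Ceph's encoding macros (the guard placement below is an assumption for illustration, not the commit's exact layout):

    void decode(bufferlist::const_iterator& bl) {
      DECODE_START(3, bl);
      decode(event, bl);
      decode(push_endpoint, bl);
      decode(push_endpoint_args, bl);
      // ... fields added in later struct versions are decoded behind guards,
      // e.g. (placeholder guard, not the commit's exact layout):
      if (struct_v >= 3) {
        decode(creation_time, bl);
      }
      // entries written before a guarded field existed simply skip it, leaving
      // e.g. creation_time at its default value, which the new process_entry()
      // check further down treats as coarse_real_clock::zero()
      DECODE_FINISH(bl);
    }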
@@ -97,7 +97,7 @@ class Manager : public DoutPrefixProvider {
   const uint32_t reservations_cleanup_period_s;
   queues_persistency_tracker topics_persistency_tracker;
 public:
-  librados::IoCtx& rados_ioctx;
+  rgw::sal::RadosStore& rados_store;
 
 private:
 
@@ -115,7 +115,7 @@ private:
       librados::ObjectReadOperation op;
       queues_t queues_chunk;
       op.omap_get_keys2(start_after, max_chunk, &queues_chunk, &more, &rval);
-      const auto ret = rgw_rados_operate(this, rados_ioctx, Q_LIST_OBJECT_NAME, &op, nullptr, y);
+      const auto ret = rgw_rados_operate(this, rados_store.getRados()->get_notif_pool_ctx(), Q_LIST_OBJECT_NAME, &op, nullptr, y);
       if (ret == -ENOENT) {
         // queue list object was not created - nothing to do
         return 0;
@@ -190,8 +190,9 @@ private:
   };
 
   enum class EntryProcessingResult {
-    Failure, Successful, Sleeping, Expired
+    Failure, Successful, Sleeping, Expired, Migrating
   };
+  std::vector<std::string> entryProcessingResultString = {"Failure", "Successful", "Sleeping", "Expired", "Migrating"};
 
   // processing of a specific entry
  // return whether processing was successful (true) or not (false)
@@ -206,6 +207,10 @@ private:
       return EntryProcessingResult::Failure;
     }
 
+    if (event_entry.creation_time == ceph::coarse_real_clock::zero()) {
+      return EntryProcessingResult::Migrating;
+    }
+
     const auto topic_persistency_ttl = event_entry.time_to_live != DEFAULT_GLOBAL_VALUE ?
         event_entry.time_to_live : conf->rgw_topic_persistency_time_to_live;
     const auto topic_persistency_max_retries = event_entry.max_retries != DEFAULT_GLOBAL_VALUE ?
@@ -230,8 +235,6 @@ private:
     if (time_now - entry_persistency_tracker.last_retry_time < std::chrono::seconds(topic_persistency_sleep_duration) ) {
       return EntryProcessingResult::Sleeping;
     }
-    // TODO: write back the entry with creation time as if now
-    // if event_entry.creation_time == zero
 
     ++entry_persistency_tracker.retires_num;
     entry_persistency_tracker.last_retry_time = time_now;
@@ -275,7 +278,7 @@ private:
         "" /*no tag*/);
       cls_2pc_queue_expire_reservations(op, stale_time);
       // check ownership and do reservation cleanup in one batch
-      auto ret = rgw_rados_operate(this, rados_ioctx, queue_name, &op, optional_yield(io_context, yield));
+      auto ret = rgw_rados_operate(this, rados_store.getRados()->get_notif_pool_ctx(), queue_name, &op, optional_yield(io_context, yield));
       if (ret == -ENOENT) {
         // queue was deleted
         ldpp_dout(this, 5) << "INFO: queue: " 
@@ -318,6 +321,7 @@ private:
       }
 
       // get list of entries in the queue
+      auto& rados_ioctx = rados_store.getRados()->get_notif_pool_ctx();
       is_idle = true;
       bool truncated = false;
       std::string end_marker;
@@ -373,6 +377,7 @@ private:
       auto remove_entries = false;
       auto entry_idx = 1U;
       tokens_waiter waiter(io_context);
+      std::vector<bool> needs_migration_vector(entries.size(), false);
       for (auto& entry : entries) {
         if (has_error) {
           // bail out on first error
@@ -381,23 +386,25 @@ private:
 
         entries_persistency_tracker& notifs_persistency_tracker = topics_persistency_tracker[queue_name];
         spawn::spawn(yield, [this, &notifs_persistency_tracker, &queue_name, entry_idx, total_entries, &end_marker,
-                             &remove_entries, &has_error, &waiter, &entry](yield_context yield) {
+                             &remove_entries, &has_error, &waiter, &entry, &needs_migration_vector](yield_context yield) {
             const auto token = waiter.make_token();
             auto& persistency_tracker = notifs_persistency_tracker[entry.marker];
             auto result = process_entry(this->get_cct()->_conf, persistency_tracker, entry, yield);
-            if (result == EntryProcessingResult::Successful || result == EntryProcessingResult::Expired) {
-              ldpp_dout(this, 20) << "INFO: processing of entry: " << 
-                entry.marker << " (" << entry_idx << "/" << total_entries << ") from: " << queue_name
-                << (result == EntryProcessingResult::Successful? " ok": " expired") << dendl;
+            if (result == EntryProcessingResult::Successful || result == EntryProcessingResult::Expired
+                || result == EntryProcessingResult::Migrating) {
+              ldpp_dout(this, 20) << "INFO: processing of entry: " << entry.marker
+                << " (" << entry_idx << "/" << total_entries << ") from: " << queue_name
+                << entryProcessingResultString[static_cast<unsigned int>(result)] << dendl;
               remove_entries = true;
+              needs_migration_vector[entry_idx - 1] = (result == EntryProcessingResult::Migrating);
               notifs_persistency_tracker.erase(entry.marker);
             }  else {
               if (set_min_marker(end_marker, entry.marker) < 0) {
-                ldpp_dout(this, 1) << "ERROR: cannot determin minimum between malformed markers: " << end_marker << ", " << entry.marker << dendl;
+                ldpp_dout(this, 1) << "ERROR: cannot determine minimum between malformed markers: " << end_marker << ", " << entry.marker << dendl;
               } else {
                 ldpp_dout(this, 20) << "INFO: new end marker for removal: " << end_marker << " from: " << queue_name << dendl;
               }
-              has_error = true;
+              has_error = (result == EntryProcessingResult::Failure);
               ldpp_dout(this, 20) << "INFO: processing of entry: " << 
                 entry.marker << " (" << entry_idx << "/" << total_entries << ") from: " << queue_name << " failed" << dendl;
             } 
@@ -410,14 +417,21 @@ private:
 
       // delete all published entries from queue
       if (remove_entries) {
-        uint64_t entries_to_remove = 0;
+        std::vector<cls_queue_entry> entries_to_migrate;
+        uint64_t index = 0;
+
         for (const auto& entry: entries) {
           if (end_marker == entry.marker) {
             break;
           }
-          entries_to_remove++;
+          if (needs_migration_vector[index]) {
+            ldpp_dout(this, 20) << "INFO: migrating entry " << entry.marker << " from: " << queue_name  << dendl;
+            entries_to_migrate.push_back(entry);
+          }
+          index++;
         }
 
+        uint64_t entries_to_remove = index;
         librados::ObjectWriteOperation op;
         op.assert_exists();
         rados::cls::lock::assert_locked(&op, queue_name+"_lock", 
@@ -426,11 +440,10 @@ private:
           "" /*no tag*/);
         cls_2pc_queue_remove_entries(op, end_marker, entries_to_remove);
         // check ownership and deleted entries in one batch
-        const auto ret = rgw_rados_operate(this, rados_ioctx, queue_name, &op, optional_yield(io_context, yield)); 
+        auto ret = rgw_rados_operate(this, rados_ioctx, queue_name, &op, optional_yield(io_context, yield));
         if (ret == -ENOENT) {
           // queue was deleted
-          ldpp_dout(this, 5) << "INFO: queue: " 
-            << queue_name << ". was removed. processing will stop" << dendl;
+          ldpp_dout(this, 5) << "INFO: queue: " << queue_name << ". was removed. processing will stop" << dendl;
           return;
         }
         if (ret == -EBUSY) {
@@ -440,9 +453,69 @@ private:
         if (ret < 0) {
           ldpp_dout(this, 1) << "ERROR: failed to remove entries and/or lock queue up to: " << end_marker <<  " from queue: " 
             << queue_name << ". error: " << ret << dendl;
+          return;
         } else {
-          ldpp_dout(this, 20) << "INFO: removed entries up to: " << end_marker <<  " from queue: " 
-          << queue_name << dendl;
+          ldpp_dout(this, 20) << "INFO: removed entries up to: " << end_marker <<  " from queue: " << queue_name << dendl;
+        }
+
+        // reserving and committing the migrating entries
+        if (!entries_to_migrate.empty()) {
+          std::vector<bufferlist> migration_vector;
+          std::string tenant_name;
+          // TODO: extract tenant name from queue_name once it is fixed
+          uint64_t size_to_migrate = 0;
+          RGWPubSub ps(&rados_store, tenant_name);
+
+          rgw_pubsub_topic topic;
+          auto ret_of_get_topic = ps.get_topic(this, queue_name, topic, optional_yield(io_context, yield));
+          if (ret_of_get_topic < 0) {
+            // we can't migrate entries without topic info
+            ldpp_dout(this, 1) << "ERROR: failed to fetch topic: " << queue_name << " error: "
+              << ret_of_get_topic << ". Aborting migration!" << dendl;
+            return;
+          }
+
+          for (auto entry: entries_to_migrate) {
+            event_entry_t event_entry;
+            auto iter = entry.data.cbegin();
+            try {
+              decode(event_entry, iter);
+            } catch (buffer::error& err) {
+              ldpp_dout(this, 5) << "WARNING: failed to decode entry. error: " << err.what() << dendl;
+              continue;
+            }
+            size_to_migrate += entry.data.length();
+            event_entry.creation_time = ceph::coarse_real_clock::now();
+            event_entry.time_to_live = topic.dest.time_to_live;
+            event_entry.max_retries = topic.dest.max_retries;
+            event_entry.retry_sleep_duration = topic.dest.retry_sleep_duration;
+
+            bufferlist bl;
+            encode(event_entry, bl);
+            migration_vector.push_back(bl);
+          }
+
+          cls_2pc_reservation::id_t reservation_id;
+          buffer::list obl;
+          int rval;
+          cls_2pc_queue_reserve(op, size_to_migrate, migration_vector.size(), &obl, &rval);
+          ret = rgw_rados_operate(this, rados_ioctx, queue_name, &op, optional_yield(io_context, yield), librados::OPERATION_RETURNVEC);
+          if (ret < 0) {
+            ldpp_dout(this, 1) << "ERROR: failed to reserve migration space on queue: " << queue_name << ". error: " << ret << dendl;
+            return;
+          }
+          ret = cls_2pc_queue_reserve_result(obl, reservation_id);
+          if (ret < 0) {
+            ldpp_dout(this, 1) << "ERROR: failed to parse reservation id for migration. error: " << ret << dendl;
+            return;
+          }
+
+          cls_2pc_queue_commit(op, migration_vector, reservation_id);
+          ret = rgw_rados_operate(this, rados_ioctx, queue_name, &op, optional_yield(io_context, yield));
+          reservation_id = cls_2pc_reservation::NO_ID;
+          if (ret < 0) {
+            ldpp_dout(this, 1) << "ERROR: failed to commit reservation to queue: " << queue_name << ". error: " << ret << dendl;
+          }
         }
       }
     }
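
The migrated entries are re-published through the same two-phase-commit queue API used for regular notifications: a reservation first secures space and yields an id, and a commit then writes the entries under that id. Condensed to its essential calls (a fresh write op per phase is assumed here, whereas the commit reuses the earlier `op` variable; error handling and the lock/ownership asserts are elided):

    librados::ObjectWriteOperation reserve_op;
    bufferlist obl;
    int rval;
    // phase 1: reserve space for the re-encoded entries and obtain an id
    cls_2pc_queue_reserve(reserve_op, size_to_migrate, migration_vector.size(), &obl, &rval);
    ret = rgw_rados_operate(this, rados_ioctx, queue_name, &reserve_op,
                            optional_yield(io_context, yield), librados::OPERATION_RETURNVEC);
    if (ret < 0) return;
    cls_2pc_reservation::id_t reservation_id;
    ret = cls_2pc_queue_reserve_result(obl, reservation_id);
    if (ret < 0) return;
    // phase 2: commit the entries under that reservation id; a reservation that
    // is never committed is eventually released by
    // cls_2pc_queue_expire_reservations() in the cleanup path shown earlier
    librados::ObjectWriteOperation commit_op;
    cls_2pc_queue_commit(commit_op, migration_vector, reservation_id);
    ret = rgw_rados_operate(this, rados_ioctx, queue_name, &commit_op,
                            optional_yield(io_context, yield));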
@@ -498,7 +571,7 @@ private:
               failover_time,
               LOCK_FLAG_MAY_RENEW);
 
-        ret = rgw_rados_operate(this, rados_ioctx, queue_name, &op, optional_yield(io_context, yield));
+        ret = rgw_rados_operate(this, rados_store.getRados()->get_notif_pool_ctx(), queue_name, &op, optional_yield(io_context, yield));
         if (ret == -EBUSY) {
           // lock is already taken by another RGW
           ldpp_dout(this, 20) << "INFO: queue: " << queue_name << " owned (locked) by another daemon" << dendl;
@@ -569,7 +642,7 @@ public:
     worker_count(_worker_count),
     stale_reservations_period_s(_stale_reservations_period_s),
     reservations_cleanup_period_s(_reservations_cleanup_period_s),
-    rados_ioctx(store->getRados()->get_notif_pool_ctx())
+    rados_store(*store)
     {
       spawn::spawn(io_context, [this] (yield_context yield) {
             process_queues(yield);
@@ -601,6 +674,7 @@ public:
     librados::ObjectWriteOperation op;
     op.create(true);
     cls_2pc_queue_init(op, topic_name, max_queue_size);
+    auto& rados_ioctx = rados_store.getRados()->get_notif_pool_ctx();
     auto ret = rgw_rados_operate(this, rados_ioctx, topic_name, &op, y);
     if (ret == -EEXIST) {
       // queue already exists - nothing to do
@@ -696,7 +770,7 @@ int remove_persistent_topic(const std::string& topic_name, optional_yield y) {
   if (!s_manager) {
     return -EAGAIN;
   }
-  return remove_persistent_topic(s_manager, s_manager->rados_ioctx, topic_name, y);
+  return remove_persistent_topic(s_manager, s_manager->rados_store.getRados()->get_notif_pool_ctx(), topic_name, y);
 }
 
 rgw::sal::Object* get_object_with_atttributes(
src/test/rgw/bucket_notification/test_bn.py
index ffb8e488bbff03aca99e2b5a0d0865df1159eb75..041ca3ebe1146a4229955f0af0b8aaf149b13c05 100644 (file)
@@ -3262,8 +3262,8 @@ def test_ps_s3_persistent_topic_configs_ttl():
 @attr('basic_test')
 def test_ps_s3_persistent_topic_configs_max_retries():
     """ test persistent topic configurations with max_retries and retry_sleep_duration """
-    config_dict = {"time_to_live": "None", "max_retries": 20, "retry_sleep_duration": 1}
-    buffer = 20
+    config_dict = {"time_to_live": "None", "max_retries": 10, "retry_sleep_duration": 1}
+    buffer = 30
     persistency_time = config_dict["max_retries"]*config_dict["retry_sleep_duration"] + buffer
 
     ps_s3_persistent_topic_configs(persistency_time, config_dict)
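
With the adjusted values the computed window is unchanged: persistency_time = 10 * 1 + 30 = 40 seconds, the same as the previous 20 * 1 + 20; the test now allows fewer retries but leaves more slack in the buffer.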