]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
RGW: control the persistency of the notification 52634/head
authorAli Masarwa <ali.saed.masarwa@gmail.com>
Wed, 12 Jul 2023 10:41:47 +0000 (13:41 +0300)
committerAli Masarwa <ali.saed.masarwa@gmail.com>
Wed, 2 Aug 2023 13:40:41 +0000 (16:40 +0300)
via adding expiry by number of retries and time
and controlling the frequency of retries by sleep duration (in the options of vstart)

Signed-off-by: Ali Masarwa <ali.saed.masarwa@gmail.com>
doc/radosgw/config-ref.rst
src/common/options/rgw.yaml.in
src/rgw/driver/rados/rgw_notify.cc

index c2d7633f29769b4061382775aff967ece103bfb2..8907a370e9c648d7671fceaa77b142ed04b49623 100644 (file)
@@ -309,3 +309,19 @@ below.
 
 .. confval:: rgw_d4n_host
 .. confval:: rgw_d4n_port
+
+Topic persistency settings
+==========================
+
+Topic persistency will persistently push the notification until it succeeds.
+
+The default behavior is to push indefinitely, as frequently as possible.
+With these settings you can control for how long, and how frequently, a notification is pushed
+(unless delivery succeeds before that): retention can be limited either by a maximum time
+or by a maximum number of attempts, and the push frequency is controlled by the sleep duration parameter.
+
+All of these values default to 0 (retention is indefinite, and pushes are as frequent as possible).
+
+.. confval:: rgw_topic_persistency_time_to_live
+.. confval:: rgw_topic_persistency_max_retries
+.. confval:: rgw_topic_persistency_sleep_duration
index 9c3c482f2f56c3d9ac5a05fe3467475702052944..bc66e361a2b0f437d985771c443f556b10a51028 100644 (file)
@@ -3805,3 +3805,37 @@ options:
   flags:
   - startup
   with_legacy: true
+- name: rgw_topic_persistency_time_to_live
+  type: uint
+  level: advanced
+  desc: The maximum retention time (in seconds) of notifications in persistent topics
+  default: 0
+  services:
+  - rgw
+  flags:
+  - startup
+  with_legacy: true
+- name: rgw_topic_persistency_max_retries
+  type: uint
+  level: advanced
+  desc: The maximum number of times a persistent notification will be tried.
+    Note that a value of one means no retries,
+    and a value of zero means the notification will be tried indefinitely
+  default: 0
+  services:
+  - rgw
+  flags:
+  - startup
+  with_legacy: true
+- name: rgw_topic_persistency_sleep_duration
+  type: uint
+  level: advanced
+  desc: The minimum time (in seconds) between two tries of the same persistent notification.
+    Note that the actual time between the tries may be longer
+  default: 0
+  services:
+  - rgw
+  flags:
+  - startup
+  with_legacy: true
+
index 7197df0ecf0b349365ed777d961bec62885fe056..b258b5361437372624bfa5c05508220e9b03de0d 100644 (file)
@@ -24,28 +24,43 @@ struct event_entry_t {
   std::string push_endpoint;
   std::string push_endpoint_args;
   std::string arn_topic;
+  ceph::coarse_real_time creation_time;
   
   void encode(bufferlist& bl) const {
-    ENCODE_START(1, 1, bl);
+    ENCODE_START(2, 1, bl);
     encode(event, bl);
     encode(push_endpoint, bl);
     encode(push_endpoint_args, bl);
     encode(arn_topic, bl);
+    encode(creation_time, bl);
     ENCODE_FINISH(bl);
   }
 
   void decode(bufferlist::const_iterator& bl) {
-    DECODE_START(1, bl);
+    DECODE_START(2, bl);
     decode(event, bl);
     decode(push_endpoint, bl);
     decode(push_endpoint_args, bl);
     decode(arn_topic, bl);
+    if (struct_v > 1) {
+      decode(creation_time, bl);
+    } else {
+      creation_time = ceph::coarse_real_clock::zero();
+    }
     DECODE_FINISH(bl);
   }
 };
 WRITE_CLASS_ENCODER(event_entry_t)
 
+
+struct persistency_tracker {
+  ceph::coarse_real_time last_retry_time {ceph::coarse_real_clock::zero()};
+  uint64_t retires_num {0};
+};
+
 using queues_t = std::set<std::string>;
+using entries_persistency_tracker = ceph::unordered_map<std::string, persistency_tracker>;
+using queues_persistency_tracker = ceph::unordered_map<std::string, entries_persistency_tracker>;
 
 // use mmap/mprotect to allocate 128k coroutine stacks
 auto make_stack_allocator() {
@@ -69,8 +84,10 @@ class Manager : public DoutPrefixProvider {
   std::vector<std::thread> workers;
   const uint32_t stale_reservations_period_s;
   const uint32_t reservations_cleanup_period_s;
+  queues_persistency_tracker topics_persistency_tracker;
 public:
   librados::IoCtx& rados_ioctx;
+
 private:
 
   CephContext *get_cct() const override { return cct; }
@@ -161,17 +178,43 @@ private:
     }   
   };
 
+  enum class EntryProcessingResult {
+    Failure, Successful, Sleeping, Expired
+  };
+
   // processing of a specific entry
   // return whether processing was successfull (true) or not (false)
-  bool process_entry(const cls_queue_entry& entry, yield_context yield) {
+  EntryProcessingResult process_entry(const ConfigProxy& conf, persistency_tracker& entry_persistency_tracker,
+                                      const cls_queue_entry& entry, yield_context yield) {
     event_entry_t event_entry;
     auto iter = entry.data.cbegin();
     try {
       decode(event_entry, iter);
     } catch (buffer::error& err) {
       ldpp_dout(this, 5) << "WARNING: failed to decode entry. error: " << err.what() << dendl;
-      return false;
+      return EntryProcessingResult::Failure;
+    }
+
+    const auto topic_persistency_ttl = conf->rgw_topic_persistency_time_to_live;
+    const auto topic_persistency_max_retries = conf->rgw_topic_persistency_max_retries;
+    const auto time_now = ceph::coarse_real_clock::now();
+    if ( (topic_persistency_ttl != 0 && event_entry.creation_time != ceph::coarse_real_clock::zero() &&
+         time_now - event_entry.creation_time > std::chrono::seconds(topic_persistency_ttl))
+         || ( topic_persistency_max_retries != 0 && entry_persistency_tracker.retires_num >  topic_persistency_max_retries) ) {
+      ldpp_dout(this, 20) << "Expiring entry retry_number=" << entry_persistency_tracker.retires_num << " creation_time="
+                          << event_entry.creation_time << " time_now:" << time_now << dendl;
+      return EntryProcessingResult::Expired;
+    }
+    if (time_now - entry_persistency_tracker.last_retry_time
+        < std::chrono::seconds(conf->rgw_topic_persistency_sleep_duration) ) {
+      return EntryProcessingResult::Sleeping;
     }
+    // TODO: write back the entry with creation time as if now
+    // if event_entry.creation_time == zero
+
+    ++entry_persistency_tracker.retires_num;
+    entry_persistency_tracker.last_retry_time = time_now;
+    ldpp_dout(this, 20) << "Processing entry retry_number=" << entry_persistency_tracker.retires_num << " time=" << dendl;
     try {
       // TODO move endpoint creation to queue level
       const auto push_endpoint = RGWPubSubEndpoint::create(event_entry.push_endpoint, event_entry.arn_topic,
@@ -183,17 +226,17 @@ private:
       if (ret < 0) {
         ldpp_dout(this, 5) << "WARNING: push entry: " << entry.marker << " to endpoint: " << event_entry.push_endpoint 
           << " failed. error: " << ret << " (will retry)" << dendl;
-        return false;
+        return EntryProcessingResult::Failure;
       } else {
         ldpp_dout(this, 20) << "INFO: push entry: " << entry.marker << " to endpoint: " << event_entry.push_endpoint 
           << " ok" <<  dendl;
         if (perfcounter) perfcounter->inc(l_rgw_pubsub_push_ok);
-        return true;
+        return EntryProcessingResult::Successful;
       }
     } catch (const RGWPubSubEndpoint::configuration_error& e) {
       ldpp_dout(this, 5) << "WARNING: failed to create push endpoint: " 
           << event_entry.push_endpoint << " for entry: " << entry.marker << ". error: " << e.what() << " (will retry) " << dendl;
-      return false;
+      return EntryProcessingResult::Failure;
     }
   }
 
@@ -273,11 +316,13 @@ private:
         auto ret = rgw_rados_operate(this, rados_ioctx, queue_name, &op, nullptr, optional_yield(io_context, yield));
         if (ret == -ENOENT) {
           // queue was deleted
+          topics_persistency_tracker.erase(queue_name);
           ldpp_dout(this, 5) << "INFO: queue: " 
             << queue_name << ". was removed. processing will stop" << dendl;
           return;
         }
         if (ret == -EBUSY) {
+          topics_persistency_tracker.erase(queue_name);
           ldpp_dout(this, 5) << "WARNING: queue: " << queue_name << " ownership moved to another daemon. processing will stop" << dendl;
           return;
         }
@@ -313,14 +358,20 @@ private:
           // bail out on first error
           break;
         }
-        // TODO pass entry pointer instead of by-value
-        spawn::spawn(yield, [this, &queue_name, entry_idx, total_entries, &end_marker, &remove_entries, &entries_to_remove, &has_error, &waiter, entry](yield_context yield) {
+
+        entries_persistency_tracker& notifs_persistency_tracker = topics_persistency_tracker[queue_name];
+        spawn::spawn(yield, [this, &notifs_persistency_tracker, &queue_name, entry_idx, total_entries, &end_marker, &remove_entries,
+                             &entries_to_remove, &has_error, &waiter, &entry](yield_context yield) {
             const auto token = waiter.make_token();
-            if (process_entry(entry, yield)) {
+            auto& persistency_tracker = notifs_persistency_tracker[entry.marker];
+            auto result = process_entry(this->get_cct()->_conf, persistency_tracker, entry, yield);
+            if (result == EntryProcessingResult::Successful || result == EntryProcessingResult::Expired) {
               ldpp_dout(this, 20) << "INFO: processing of entry: " << 
-                entry.marker << " (" << entry_idx << "/" << total_entries << ") from: " << queue_name << " ok" << dendl;
+                entry.marker << " (" << entry_idx << "/" << total_entries << ") from: " << queue_name
+                << (result == EntryProcessingResult::Successful? " ok": " expired") << dendl;
               remove_entries = true;
               ++entries_to_remove;
+              notifs_persistency_tracker.erase(entry.marker);
             }  else {
               if (set_min_marker(end_marker, entry.marker) < 0) {
                 ldpp_dout(this, 1) << "ERROR: cannot determin minimum between malformed markers: " << end_marker << ", " << entry.marker << dendl;
@@ -458,6 +509,7 @@ private:
       {
         std::lock_guard lock_guard(queue_gc_lock);
         std::for_each(queue_gc.begin(), queue_gc.end(), [this, &owned_queues](const std::string& queue_name) {
+          topics_persistency_tracker.erase(queue_name);
           owned_queues.erase(queue_name);
           ldpp_dout(this, 20) << "INFO: queue: " << queue_name << " removed" << dendl;
         });
@@ -867,6 +919,7 @@ int publish_commit(rgw::sal::Object* obj,
       event_entry.push_endpoint_args =
        std::move(topic.cfg.dest.push_endpoint_args);
       event_entry.arn_topic = topic.cfg.dest.arn_topic;
+      event_entry.creation_time = ceph::coarse_real_clock::now();
       bufferlist bl;
       encode(event_entry, bl);
       const auto& queue_name = topic.cfg.dest.arn_topic;