From: Ali Masarwa
Date: Wed, 12 Jul 2023 10:41:47 +0000 (+0300)
Subject: RGW: control the persistency of the notification
X-Git-Tag: v19.0.0~751^2
X-Git-Url: http://git.apps.os.sepia.ceph.com/?a=commitdiff_plain;h=c583f69e0c30c197001ff2cb059bcd34b8d667d6;p=ceph.git

RGW: control the persistency of the notification

Add expiry by number of retries and by time, and control the frequency of
retries with a sleep duration (configurable via the vstart options).

Signed-off-by: Ali Masarwa
---

diff --git a/doc/radosgw/config-ref.rst b/doc/radosgw/config-ref.rst
index c2d7633f29769..8907a370e9c64 100644
--- a/doc/radosgw/config-ref.rst
+++ b/doc/radosgw/config-ref.rst
@@ -309,3 +309,19 @@ below.
 .. confval:: rgw_d4n_host
 .. confval:: rgw_d4n_port
+
+Topic persistency settings
+==========================
+
+With topic persistency, a notification is pushed repeatedly until the push succeeds.
+
+The default behavior is to retry indefinitely, as frequently as possible.
+The settings below control how long and how often a notification is retried before it is
+given up on (unless it is delivered successfully first): retention can be bounded either
+by a maximum time to live or by a maximum number of retries, and the retry frequency is
+controlled by the sleep duration parameter.
+
+All of these values default to 0, meaning that retention is indefinite and retries are as frequent as possible.
+
+.. confval:: rgw_topic_persistency_time_to_live
+.. confval:: rgw_topic_persistency_max_retries
+.. confval:: rgw_topic_persistency_sleep_duration

diff --git a/src/common/options/rgw.yaml.in b/src/common/options/rgw.yaml.in
index 9c3c482f2f56c..bc66e361a2b0f 100644
--- a/src/common/options/rgw.yaml.in
+++ b/src/common/options/rgw.yaml.in
@@ -3805,3 +3805,37 @@ options:
   flags:
   - startup
   with_legacy: true
+- name: rgw_topic_persistency_time_to_live
+  type: uint
+  level: advanced
+  desc: How long (in seconds) RGW retains and retries a persistent notification
+  default: 0
+  services:
+  - rgw
+  flags:
+  - startup
+  with_legacy: true
+- name: rgw_topic_persistency_max_retries
+  type: uint
+  level: advanced
+  desc: The maximum number of times sending a persistent notification is tried.
+    Note that a value of one means no retries,
+    and a value of zero means the notification is retried indefinitely
+  default: 0
+  services:
+  - rgw
+  flags:
+  - startup
+  with_legacy: true
+- name: rgw_topic_persistency_sleep_duration
+  type: uint
+  level: advanced
+  desc: The minimum time (in seconds) between two tries of the same persistent notification.
+    Note that the actual time between the tries may be longer
+  default: 0
+  services:
+  - rgw
+  flags:
+  - startup
+  with_legacy: true
+
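To illustrate how the three options are meant to interact, here is a minimal, self-contained C++ sketch of the documented semantics. This is not code from the patch: Decision, decide, and all parameter names are invented for the example.

// Sketch of the documented semantics: a queued notification is expired once
// it outlives the TTL or exhausts its retry budget, throttled when retried
// too soon, and otherwise pushed again. Zero disables the first two limits
// and disables throttling for the third.
#include <chrono>
#include <cstdint>

enum class Decision { Retry, Sleep, Expire };

Decision decide(std::chrono::seconds ttl,             // rgw_topic_persistency_time_to_live
                uint64_t max_retries,                  // rgw_topic_persistency_max_retries
                std::chrono::seconds sleep_duration,   // rgw_topic_persistency_sleep_duration
                std::chrono::steady_clock::time_point now,
                std::chrono::steady_clock::time_point created,
                std::chrono::steady_clock::time_point last_retry,
                uint64_t retries_so_far) {
  if (ttl.count() != 0 && now - created > ttl)
    return Decision::Expire;     // older than the time to live
  if (max_retries != 0 && retries_so_far > max_retries)
    return Decision::Expire;     // retry budget exhausted
  if (now - last_retry < sleep_duration)
    return Decision::Sleep;      // too soon since the last try
  return Decision::Retry;        // push it again
}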
diff --git a/src/rgw/driver/rados/rgw_notify.cc b/src/rgw/driver/rados/rgw_notify.cc
index 7197df0ecf0b3..b258b53614373 100644
--- a/src/rgw/driver/rados/rgw_notify.cc
+++ b/src/rgw/driver/rados/rgw_notify.cc
@@ -24,28 +24,43 @@ struct event_entry_t {
   std::string push_endpoint;
   std::string push_endpoint_args;
   std::string arn_topic;
+  ceph::coarse_real_time creation_time;

   void encode(bufferlist& bl) const {
-    ENCODE_START(1, 1, bl);
+    ENCODE_START(2, 1, bl);
     encode(event, bl);
     encode(push_endpoint, bl);
     encode(push_endpoint_args, bl);
     encode(arn_topic, bl);
+    encode(creation_time, bl);
     ENCODE_FINISH(bl);
   }

   void decode(bufferlist::const_iterator& bl) {
-    DECODE_START(1, bl);
+    DECODE_START(2, bl);
     decode(event, bl);
     decode(push_endpoint, bl);
     decode(push_endpoint_args, bl);
     decode(arn_topic, bl);
+    if (struct_v > 1) {
+      decode(creation_time, bl);
+    } else {
+      creation_time = ceph::coarse_real_clock::zero();
+    }
     DECODE_FINISH(bl);
   }
 };
 WRITE_CLASS_ENCODER(event_entry_t)

+
+struct persistency_tracker {
+  ceph::coarse_real_time last_retry_time {ceph::coarse_real_clock::zero()};
+  uint64_t retries_num {0};
+};
+
 using queues_t = std::set<std::string>;
+using entries_persistency_tracker = ceph::unordered_map<std::string, persistency_tracker>;
+using queues_persistency_tracker = ceph::unordered_map<std::string, entries_persistency_tracker>;

 // use mmap/mprotect to allocate 128k coroutine stacks
 auto make_stack_allocator() {
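The two using aliases above form a two-level map: queue name to entry marker to per-entry retry bookkeeping. A standalone sketch of that bookkeeping, substituting std::unordered_map for ceph::unordered_map and using hypothetical queue and marker names:

// Mirrors queues_persistency_tracker: operator[] default-constructs the
// bookkeeping on first sight of a queue or an entry, and erase() drops it
// once an entry is delivered/expired or a queue is no longer owned.
#include <chrono>
#include <cstdint>
#include <string>
#include <unordered_map>

struct tracker {
  std::chrono::steady_clock::time_point last_retry_time{};
  uint64_t retries_num = 0;
};

using entries_tracker = std::unordered_map<std::string, tracker>;
using queues_tracker = std::unordered_map<std::string, entries_tracker>;

int main() {
  queues_tracker topics;

  // first retry of an entry: created on demand, then updated
  tracker& t = topics["topic-queue-1"]["entry-marker-0001"];
  ++t.retries_num;
  t.last_retry_time = std::chrono::steady_clock::now();

  // entry delivered or expired: drop its bookkeeping
  topics["topic-queue-1"].erase("entry-marker-0001");

  // queue deleted or ownership moved: drop the whole queue's bookkeeping
  topics.erase("topic-queue-1");
}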
error: " << err.what() << dendl; - return false; + return EntryProcessingResult::Failure; + } + + const auto topic_persistency_ttl = conf->rgw_topic_persistency_time_to_live; + const auto topic_persistency_max_retries = conf->rgw_topic_persistency_max_retries; + const auto time_now = ceph::coarse_real_clock::now(); + if ( (topic_persistency_ttl != 0 && event_entry.creation_time != ceph::coarse_real_clock::zero() && + time_now - event_entry.creation_time > std::chrono::seconds(topic_persistency_ttl)) + || ( topic_persistency_max_retries != 0 && entry_persistency_tracker.retires_num > topic_persistency_max_retries) ) { + ldpp_dout(this, 20) << "Expiring entry retry_number=" << entry_persistency_tracker.retires_num << " creation_time=" + << event_entry.creation_time << " time_now:" << time_now << dendl; + return EntryProcessingResult::Expired; + } + if (time_now - entry_persistency_tracker.last_retry_time + < std::chrono::seconds(conf->rgw_topic_persistency_sleep_duration) ) { + return EntryProcessingResult::Sleeping; } + // TODO: write back the entry with creation time as if now + // if event_entry.creation_time == zero + + ++entry_persistency_tracker.retires_num; + entry_persistency_tracker.last_retry_time = time_now; + ldpp_dout(this, 20) << "Processing entry retry_number=" << entry_persistency_tracker.retires_num << " time=" << dendl; try { // TODO move endpoint creation to queue level const auto push_endpoint = RGWPubSubEndpoint::create(event_entry.push_endpoint, event_entry.arn_topic, @@ -183,17 +226,17 @@ private: if (ret < 0) { ldpp_dout(this, 5) << "WARNING: push entry: " << entry.marker << " to endpoint: " << event_entry.push_endpoint << " failed. error: " << ret << " (will retry)" << dendl; - return false; + return EntryProcessingResult::Failure; } else { ldpp_dout(this, 20) << "INFO: push entry: " << entry.marker << " to endpoint: " << event_entry.push_endpoint << " ok" << dendl; if (perfcounter) perfcounter->inc(l_rgw_pubsub_push_ok); - return true; + return EntryProcessingResult::Successful; } } catch (const RGWPubSubEndpoint::configuration_error& e) { ldpp_dout(this, 5) << "WARNING: failed to create push endpoint: " << event_entry.push_endpoint << " for entry: " << entry.marker << ". error: " << e.what() << " (will retry) " << dendl; - return false; + return EntryProcessingResult::Failure; } } @@ -273,11 +316,13 @@ private: auto ret = rgw_rados_operate(this, rados_ioctx, queue_name, &op, nullptr, optional_yield(io_context, yield)); if (ret == -ENOENT) { // queue was deleted + topics_persistency_tracker.erase(queue_name); ldpp_dout(this, 5) << "INFO: queue: " << queue_name << ". was removed. processing will stop" << dendl; return; } if (ret == -EBUSY) { + topics_persistency_tracker.erase(queue_name); ldpp_dout(this, 5) << "WARNING: queue: " << queue_name << " ownership moved to another daemon. 
processing will stop" << dendl; return; } @@ -313,14 +358,20 @@ private: // bail out on first error break; } - // TODO pass entry pointer instead of by-value - spawn::spawn(yield, [this, &queue_name, entry_idx, total_entries, &end_marker, &remove_entries, &entries_to_remove, &has_error, &waiter, entry](yield_context yield) { + + entries_persistency_tracker& notifs_persistency_tracker = topics_persistency_tracker[queue_name]; + spawn::spawn(yield, [this, ¬ifs_persistency_tracker, &queue_name, entry_idx, total_entries, &end_marker, &remove_entries, + &entries_to_remove, &has_error, &waiter, &entry](yield_context yield) { const auto token = waiter.make_token(); - if (process_entry(entry, yield)) { + auto& persistency_tracker = notifs_persistency_tracker[entry.marker]; + auto result = process_entry(this->get_cct()->_conf, persistency_tracker, entry, yield); + if (result == EntryProcessingResult::Successful || result == EntryProcessingResult::Expired) { ldpp_dout(this, 20) << "INFO: processing of entry: " << - entry.marker << " (" << entry_idx << "/" << total_entries << ") from: " << queue_name << " ok" << dendl; + entry.marker << " (" << entry_idx << "/" << total_entries << ") from: " << queue_name + << (result == EntryProcessingResult::Successful? " ok": " expired") << dendl; remove_entries = true; ++entries_to_remove; + notifs_persistency_tracker.erase(entry.marker); } else { if (set_min_marker(end_marker, entry.marker) < 0) { ldpp_dout(this, 1) << "ERROR: cannot determin minimum between malformed markers: " << end_marker << ", " << entry.marker << dendl; @@ -458,6 +509,7 @@ private: { std::lock_guard lock_guard(queue_gc_lock); std::for_each(queue_gc.begin(), queue_gc.end(), [this, &owned_queues](const std::string& queue_name) { + topics_persistency_tracker.erase(queue_name); owned_queues.erase(queue_name); ldpp_dout(this, 20) << "INFO: queue: " << queue_name << " removed" << dendl; }); @@ -867,6 +919,7 @@ int publish_commit(rgw::sal::Object* obj, event_entry.push_endpoint_args = std::move(topic.cfg.dest.push_endpoint_args); event_entry.arn_topic = topic.cfg.dest.arn_topic; + event_entry.creation_time = ceph::coarse_real_clock::now(); bufferlist bl; encode(event_entry, bl); const auto& queue_name = topic.cfg.dest.arn_topic;