From: Ali Masarwa
Date: Tue, 19 Sep 2023 11:32:42 +0000 (+0300)
Subject: RGW | Bucket Notification: migrating old entries to support persistency control
X-Git-Tag: v19.0.0~378^2
X-Git-Url: http://git.apps.os.sepia.ceph.com/?a=commitdiff_plain;h=refs%2Fpull%2F53580%2Fhead;p=ceph.git

RGW | Bucket Notification: migrating old entries to support persistency control

Signed-off-by: Ali Masarwa
---

diff --git a/src/rgw/driver/rados/rgw_notify.cc b/src/rgw/driver/rados/rgw_notify.cc
index 44d11a58a7127..46b3a2302cd4c 100644
--- a/src/rgw/driver/rados/rgw_notify.cc
+++ b/src/rgw/driver/rados/rgw_notify.cc
@@ -43,7 +43,7 @@ struct event_entry_t {
   }

   void decode(bufferlist::const_iterator& bl) {
-    DECODE_START(2, bl);
+    DECODE_START(3, bl);
     decode(event, bl);
     decode(push_endpoint, bl);
     decode(push_endpoint_args, bl);
@@ -97,7 +97,7 @@ class Manager : public DoutPrefixProvider {
   const uint32_t reservations_cleanup_period_s;
   queues_persistency_tracker topics_persistency_tracker;
 public:
-  librados::IoCtx& rados_ioctx;
+  rgw::sal::RadosStore& rados_store;

 private:

@@ -115,7 +115,7 @@ private:
     librados::ObjectReadOperation op;
     queues_t queues_chunk;
     op.omap_get_keys2(start_after, max_chunk, &queues_chunk, &more, &rval);
-    const auto ret = rgw_rados_operate(this, rados_ioctx, Q_LIST_OBJECT_NAME, &op, nullptr, y);
+    const auto ret = rgw_rados_operate(this, rados_store.getRados()->get_notif_pool_ctx(), Q_LIST_OBJECT_NAME, &op, nullptr, y);
     if (ret == -ENOENT) {
       // queue list object was not created - nothing to do
       return 0;
@@ -190,8 +190,9 @@ private:
   };

   enum class EntryProcessingResult {
-    Failure, Successful, Sleeping, Expired
+    Failure, Successful, Sleeping, Expired, Migrating
   };
+  std::vector<std::string> entryProcessingResultString = {"Failure", "Successful", "Sleeping", "Expired", "Migrating"};

   // processing of a specific entry
   // return whether processing was successfull (true) or not (false)
@@ -206,6 +207,10 @@ private:
       return EntryProcessingResult::Failure;
     }

+    if (event_entry.creation_time == ceph::coarse_real_clock::zero()) {
+      return EntryProcessingResult::Migrating;
+    }
+
     const auto topic_persistency_ttl = event_entry.time_to_live != DEFAULT_GLOBAL_VALUE ?
         event_entry.time_to_live : conf->rgw_topic_persistency_time_to_live;
     const auto topic_persistency_max_retries = event_entry.max_retries != DEFAULT_GLOBAL_VALUE ?
@@ -230,8 +235,6 @@ private:
     if (time_now - entry_persistency_tracker.last_retry_time < std::chrono::seconds(topic_persistency_sleep_duration) ) {
       return EntryProcessingResult::Sleeping;
     }
-    // TODO: write back the entry with creation time as if now
-    // if event_entry.creation_time == zero
     ++entry_persistency_tracker.retires_num;
     entry_persistency_tracker.last_retry_time = time_now;

@@ -275,7 +278,7 @@ private:
                             "" /*no tag*/);
     cls_2pc_queue_expire_reservations(op, stale_time);
     // check ownership and do reservation cleanup in one batch
-    auto ret = rgw_rados_operate(this, rados_ioctx, queue_name, &op, optional_yield(io_context, yield));
+    auto ret = rgw_rados_operate(this, rados_store.getRados()->get_notif_pool_ctx(), queue_name, &op, optional_yield(io_context, yield));
     if (ret == -ENOENT) {
       // queue was deleted
       ldpp_dout(this, 5) << "INFO: queue: "
@@ -318,6 +321,7 @@ private:
     }

     // get list of entries in the queue
+    auto& rados_ioctx = rados_store.getRados()->get_notif_pool_ctx();
     is_idle = true;
     bool truncated = false;
     std::string end_marker;
@@ -373,6 +377,7 @@ private:
     auto remove_entries = false;
     auto entry_idx = 1U;
     tokens_waiter waiter(io_context);
+    std::vector<bool> needs_migration_vector(entries.size(), false);
     for (auto& entry : entries) {
       if (has_error) {
         // bail out on first error
@@ -381,23 +386,25 @@ private:
       entries_persistency_tracker& notifs_persistency_tracker = topics_persistency_tracker[queue_name];
       spawn::spawn(yield, [this, &notifs_persistency_tracker, &queue_name, entry_idx, total_entries, &end_marker,
-                           &remove_entries, &has_error, &waiter, &entry](yield_context yield) {
+                           &remove_entries, &has_error, &waiter, &entry, &needs_migration_vector](yield_context yield) {
             const auto token = waiter.make_token();
             auto& persistency_tracker = notifs_persistency_tracker[entry.marker];
             auto result = process_entry(this->get_cct()->_conf, persistency_tracker, entry, yield);
-            if (result == EntryProcessingResult::Successful || result == EntryProcessingResult::Expired) {
-              ldpp_dout(this, 20) << "INFO: processing of entry: " <<
-                entry.marker << " (" << entry_idx << "/" << total_entries << ") from: " << queue_name
-                << (result == EntryProcessingResult::Successful? " ok": " expired") << dendl;
+            if (result == EntryProcessingResult::Successful || result == EntryProcessingResult::Expired
+                || result == EntryProcessingResult::Migrating) {
+              ldpp_dout(this, 20) << "INFO: processing of entry: " << entry.marker
+                << " (" << entry_idx << "/" << total_entries << ") from: " << queue_name
+                << entryProcessingResultString[static_cast<unsigned int>(result)] << dendl;
               remove_entries = true;
+              needs_migration_vector[entry_idx - 1] = (result == EntryProcessingResult::Migrating);
               notifs_persistency_tracker.erase(entry.marker);
             } else {
               if (set_min_marker(end_marker, entry.marker) < 0) {
-                ldpp_dout(this, 1) << "ERROR: cannot determin minimum between malformed markers: " << end_marker << ", " << entry.marker << dendl;
+                ldpp_dout(this, 1) << "ERROR: cannot determine minimum between malformed markers: " << end_marker << ", " << entry.marker << dendl;
               } else {
                 ldpp_dout(this, 20) << "INFO: new end marker for removal: " << end_marker << " from: " << queue_name << dendl;
               }
-              has_error = true;
+              has_error = (result == EntryProcessingResult::Failure);
               ldpp_dout(this, 20) << "INFO: processing of entry: " << entry.marker << " (" << entry_idx << "/" << total_entries << ") from: " << queue_name << " failed" << dendl;
             }
@@ -410,14 +417,21 @@ private:

     // delete all published entries from queue
     if (remove_entries) {
-      uint64_t entries_to_remove = 0;
+      std::vector<cls_queue_entry> entries_to_migrate;
+      uint64_t index = 0;
+
       for (const auto& entry: entries) {
         if (end_marker == entry.marker) {
           break;
         }
-        entries_to_remove++;
+        if (needs_migration_vector[index]) {
+          ldpp_dout(this, 20) << "INFO: migrating entry " << entry.marker << " from: " << queue_name << dendl;
+          entries_to_migrate.push_back(entry);
+        }
+        index++;
       }
+      uint64_t entries_to_remove = index;
       librados::ObjectWriteOperation op;
       op.assert_exists();
       rados::cls::lock::assert_locked(&op, queue_name+"_lock",
@@ -426,11 +440,10 @@ private:
                                       "" /*no tag*/);
       cls_2pc_queue_remove_entries(op, end_marker, entries_to_remove);
       // check ownership and deleted entries in one batch
-      const auto ret = rgw_rados_operate(this, rados_ioctx, queue_name, &op, optional_yield(io_context, yield));
+      auto ret = rgw_rados_operate(this, rados_ioctx, queue_name, &op, optional_yield(io_context, yield));
       if (ret == -ENOENT) {
         // queue was deleted
-        ldpp_dout(this, 5) << "INFO: queue: "
-          << queue_name << ". was removed. processing will stop" << dendl;
+        ldpp_dout(this, 5) << "INFO: queue: " << queue_name << ". was removed. processing will stop" << dendl;
         return;
       }
       if (ret == -EBUSY) {
@@ -440,9 +453,69 @@ private:
       if (ret < 0) {
         ldpp_dout(this, 1) << "ERROR: failed to remove entries and/or lock queue up to: " << end_marker << " from queue: " << queue_name << ". error: " << ret << dendl;
+        return;
       } else {
-        ldpp_dout(this, 20) << "INFO: removed entries up to: " << end_marker << " from queue: "
-          << queue_name << dendl;
+        ldpp_dout(this, 20) << "INFO: removed entries up to: " << end_marker << " from queue: " << queue_name << dendl;
+      }
+
+      // reserving and committing the migrating entries
+      if (!entries_to_migrate.empty()) {
+        std::vector<bufferlist> migration_vector;
+        std::string tenant_name;
+        // TODO: extract tenant name from queue_name once it is fixed
+        uint64_t size_to_migrate = 0;
+        RGWPubSub ps(&rados_store, tenant_name);
+
+        rgw_pubsub_topic topic;
+        auto ret_of_get_topic = ps.get_topic(this, queue_name, topic, optional_yield(io_context, yield));
+        if (ret_of_get_topic < 0) {
+          // we can't migrate entries without topic info
+          ldpp_dout(this, 1) << "ERROR: failed to fetch topic: " << queue_name << " error: "
+                             << ret_of_get_topic << ". Aborting migration!" << dendl;
+          return;
+        }
+
+        for (auto entry: entries_to_migrate) {
+          event_entry_t event_entry;
+          auto iter = entry.data.cbegin();
+          try {
+            decode(event_entry, iter);
+          } catch (buffer::error& err) {
+            ldpp_dout(this, 5) << "WARNING: failed to decode entry. error: " << err.what() << dendl;
+            continue;
+          }
+          size_to_migrate += entry.data.length();
+          event_entry.creation_time = ceph::coarse_real_clock::now();
+          event_entry.time_to_live = topic.dest.time_to_live;
+          event_entry.max_retries = topic.dest.max_retries;
+          event_entry.retry_sleep_duration = topic.dest.retry_sleep_duration;
+
+          bufferlist bl;
+          encode(event_entry, bl);
+          migration_vector.push_back(bl);
+        }
+
+        cls_2pc_reservation::id_t reservation_id;
+        buffer::list obl;
+        int rval;
+        cls_2pc_queue_reserve(op, size_to_migrate, migration_vector.size(), &obl, &rval);
+        ret = rgw_rados_operate(this, rados_ioctx, queue_name, &op, optional_yield(io_context, yield), librados::OPERATION_RETURNVEC);
+        if (ret < 0) {
+          ldpp_dout(this, 1) << "ERROR: failed to reserve migration space on queue: " << queue_name << ". error: " << ret << dendl;
+          return;
+        }
+        ret = cls_2pc_queue_reserve_result(obl, reservation_id);
+        if (ret < 0) {
+          ldpp_dout(this, 1) << "ERROR: failed to parse reservation id for migration. error: " << ret << dendl;
+          return;
+        }
+
+        cls_2pc_queue_commit(op, migration_vector, reservation_id);
+        ret = rgw_rados_operate(this, rados_ioctx, queue_name, &op, optional_yield(io_context, yield));
+        reservation_id = cls_2pc_reservation::NO_ID;
+        if (ret < 0) {
+          ldpp_dout(this, 1) << "ERROR: failed to commit reservation to queue: " << queue_name << ". error: " << ret << dendl;
+        }
       }
     }
   }
@@ -498,7 +571,7 @@ private:
                             failover_time,
                             LOCK_FLAG_MAY_RENEW);

-    ret = rgw_rados_operate(this, rados_ioctx, queue_name, &op, optional_yield(io_context, yield));
+    ret = rgw_rados_operate(this, rados_store.getRados()->get_notif_pool_ctx(), queue_name, &op, optional_yield(io_context, yield));
     if (ret == -EBUSY) {
       // lock is already taken by another RGW
       ldpp_dout(this, 20) << "INFO: queue: " << queue_name << " owned (locked) by another daemon" << dendl;
@@ -569,7 +642,7 @@ public:
     worker_count(_worker_count),
     stale_reservations_period_s(_stale_reservations_period_s),
     reservations_cleanup_period_s(_reservations_cleanup_period_s),
-    rados_ioctx(store->getRados()->get_notif_pool_ctx())
+    rados_store(*store)
   {
     spawn::spawn(io_context, [this] (yield_context yield) {
       process_queues(yield);
@@ -601,6 +674,7 @@ public:
     librados::ObjectWriteOperation op;
     op.create(true);
     cls_2pc_queue_init(op, topic_name, max_queue_size);
+    auto& rados_ioctx = rados_store.getRados()->get_notif_pool_ctx();
     auto ret = rgw_rados_operate(this, rados_ioctx, topic_name, &op, y);
     if (ret == -EEXIST) {
       // queue already exists - nothing to do
@@ -696,7 +770,7 @@ int remove_persistent_topic(const std::string& topic_name, optional_yield y) {
   if (!s_manager) {
     return -EAGAIN;
   }
-  return remove_persistent_topic(s_manager, s_manager->rados_ioctx, topic_name, y);
+  return remove_persistent_topic(s_manager, s_manager->rados_store.getRados()->get_notif_pool_ctx(), topic_name, y);
 }

 rgw::sal::Object* get_object_with_atttributes(
diff --git a/src/test/rgw/bucket_notification/test_bn.py b/src/test/rgw/bucket_notification/test_bn.py
index ffb8e488bbff0..041ca3ebe1146 100644
--- a/src/test/rgw/bucket_notification/test_bn.py
+++ b/src/test/rgw/bucket_notification/test_bn.py
@@ -3262,8 +3262,8 @@ def test_ps_s3_persistent_topic_configs_ttl():
 @attr('basic_test')
 def test_ps_s3_persistent_topic_configs_max_retries():
     """ test persistent topic configurations with max_retries and retry_sleep_duration """
-    config_dict = {"time_to_live": "None", "max_retries": 20, "retry_sleep_duration": 1}
-    buffer = 20
+    config_dict = {"time_to_live": "None", "max_retries": 10, "retry_sleep_duration": 1}
+    buffer = 30
     persistency_time = config_dict["max_retries"]*config_dict["retry_sleep_duration"] + buffer

     ps_s3_persistent_topic_configs(persistency_time, config_dict)