From: kchheda3 Date: Tue, 28 May 2024 20:50:33 +0000 (-0400) Subject: rgw/notification: Use topic attributes from topic rados object to create Pushendpoint... X-Git-Tag: v19.1.1~27^2~1 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=c1f80896b5a6ef8ec87cbd283f95ea6101fa29ee;p=ceph.git rgw/notification: Use topic attributes from topic rados object to create Pushendpoint and stop calling RGWPubSubEndpoint::create for every event. Signed-off-by: kchheda3 (cherry picked from commit c8f7d311ecc24befe985dd39391b38f5f1b60510) Conflicts: src/rgw/driver/rados/rgw_notify.cc --- diff --git a/src/rgw/driver/rados/rgw_notify.cc b/src/rgw/driver/rados/rgw_notify.cc index fd58079b90bf..87c0247cf0af 100644 --- a/src/rgw/driver/rados/rgw_notify.cc +++ b/src/rgw/driver/rados/rgw_notify.cc @@ -61,6 +61,18 @@ void publish_commit_completion(rados_completion_t completion, void *arg) { } }; +static inline std::ostream& operator<<(std::ostream& out, + const event_entry_t& e) { + return out << "notification id: '" << e.event.configurationId + << "', topic: '" << e.arn_topic + << "', endpoint: '" << e.push_endpoint + << "', bucket_owner: '" << e.event.bucket_ownerIdentity + << "', bucket: '" << e.event.bucket_name + << "', object: '" << e.event.object_key + << "', event type: '" << e.event.eventName << "'"; +} + + class Manager : public DoutPrefixProvider { bool shutdown = false; const uint32_t queues_update_period_ms; @@ -178,8 +190,13 @@ private: // processing of a specific entry // return whether processing was successful (true) or not (false) - EntryProcessingResult process_entry(const ConfigProxy& conf, persistency_tracker& entry_persistency_tracker, - const cls_queue_entry& entry, spawn::yield_context yield) { + EntryProcessingResult process_entry( + const ConfigProxy& conf, + persistency_tracker& entry_persistency_tracker, + const cls_queue_entry& entry, + RGWPubSubEndpoint* const push_endpoint, + const rgw_pubsub_topic& topic, + spawn::yield_context yield) { event_entry_t event_entry; auto iter = entry.data.cbegin(); try { @@ -192,7 +209,12 @@ private: if (event_entry.creation_time == ceph::coarse_real_clock::zero()) { return EntryProcessingResult::Migrating; } - + // overwrite the event entry values from the topics object fetched. + event_entry.event.opaque_data = topic.opaque_data; + event_entry.arn_topic = topic.dest.arn_topic; + event_entry.time_to_live = topic.dest.time_to_live; + event_entry.max_retries = topic.dest.max_retries; + event_entry.retry_sleep_duration = topic.dest.retry_sleep_duration; const auto topic_persistency_ttl = event_entry.time_to_live != DEFAULT_GLOBAL_VALUE ? event_entry.time_to_live : conf->rgw_topic_persistency_time_to_live; const auto topic_persistency_max_retries = event_entry.max_retries != DEFAULT_GLOBAL_VALUE ? @@ -220,30 +242,23 @@ private: ++entry_persistency_tracker.retires_num; entry_persistency_tracker.last_retry_time = time_now; - ldpp_dout(this, 20) << "Processing entry retry_number=" << entry_persistency_tracker.retires_num << " time=" << dendl; - try { - // TODO move endpoint creation to queue level - const auto push_endpoint = RGWPubSubEndpoint::create(event_entry.push_endpoint, event_entry.arn_topic, - RGWHTTPArgs(event_entry.push_endpoint_args, this), - cct); - ldpp_dout(this, 20) << "INFO: push endpoint created: " << event_entry.push_endpoint << - " for entry: " << entry.marker << dendl; - const auto ret = push_endpoint->send(event_entry.event, optional_yield(io_context, yield)); - if (ret < 0) { - ldpp_dout(this, 5) << "WARNING: push entry: " << entry.marker << " to endpoint: " << event_entry.push_endpoint - << " failed. error: " << ret << " (will retry)" << dendl; - return EntryProcessingResult::Failure; - } else { - ldpp_dout(this, 20) << "INFO: push entry: " << entry.marker << " to endpoint: " << event_entry.push_endpoint - << " ok" << dendl; - if (perfcounter) perfcounter->inc(l_rgw_pubsub_push_ok); - return EntryProcessingResult::Successful; - } - } catch (const RGWPubSubEndpoint::configuration_error& e) { - ldpp_dout(this, 5) << "WARNING: failed to create push endpoint: " - << event_entry.push_endpoint << " for entry: " << entry.marker << ". error: " << e.what() << " (will retry) " << dendl; + ldpp_dout(this, 20) << "Processing event entry with " << event_entry + << " retry_number: " + << entry_persistency_tracker.retires_num + << " current time: " << time_now << dendl; + const auto ret = push_endpoint->send(event_entry.event, optional_yield{io_context, yield}); + if (ret < 0) { + ldpp_dout(this, 5) << "WARNING: push entry marker: " << entry.marker + << " failed. error: " << ret + << " (will retry) for event with " << event_entry + << dendl; return EntryProcessingResult::Failure; } + ldpp_dout(this, 5) << "INFO: push entry marker: " << entry.marker + << " ok for event with " << event_entry << dendl; + if (perfcounter) + perfcounter->inc(l_rgw_pubsub_push_ok); + return EntryProcessingResult::Successful; } // clean stale reservation from queue @@ -303,6 +318,42 @@ private: return ret; } + int get_topic_info(const std::string& queue_name, + const cls_queue_entry& queue_entry, + rgw_pubsub_topic& topic, + spawn::yield_context yield) { + std::string queue_topic_tenant; + std::string queue_topic_name; + parse_topic_metadata_key(queue_name, queue_topic_tenant, queue_topic_name); + rgw_pubsub_topic topic_info; + RGWPubSub ps(&rados_store, queue_topic_tenant, site); + int ret = ps.get_topic(this, queue_topic_name, topic_info, optional_yield{io_context, yield}, nullptr); + if (ret < 0) { + ldpp_dout(this, 1) << "WARNING: failed to fetch topic: " + << queue_topic_name << " error: " << ret + << ". using cached topic attributes!" << dendl; + event_entry_t event_entry; + auto iter = queue_entry.data.cbegin(); + try { + decode(event_entry, iter); + } catch (buffer::error& err) { + ldpp_dout(this, 1) << "ERROR: failed to decode entry. error: " + << err.what() << dendl; + return -EIO; + } + topic_info.dest.push_endpoint = event_entry.push_endpoint; + topic_info.dest.push_endpoint_args = event_entry.push_endpoint_args; + topic_info.dest.arn_topic = event_entry.arn_topic; + topic_info.dest.arn_topic = event_entry.arn_topic; + topic_info.dest.time_to_live = event_entry.time_to_live; + topic_info.dest.max_retries = event_entry.max_retries; + topic_info.dest.retry_sleep_duration = event_entry.retry_sleep_duration; + topic_info.opaque_data = event_entry.event.opaque_data; + } + topic = std::move(topic_info); + return 0; + } + // processing of a specific queue void process_queue(const std::string& queue_name, spawn::yield_context yield) { constexpr auto max_elements = 1024; @@ -376,7 +427,25 @@ private: // log when queue is not idle ldpp_dout(this, 20) << "INFO: found: " << total_entries << " entries in: " << queue_name << ". end marker is: " << end_marker << dendl; - + rgw_pubsub_topic topic_info; + if (get_topic_info(queue_name, entries.front(), topic_info, yield) < 0) { + continue; + } + RGWPubSubEndpoint::Ptr push_endpoint; + try { + push_endpoint = RGWPubSubEndpoint::create( + topic_info.dest.push_endpoint, topic_info.dest.arn_topic, + RGWHTTPArgs(topic_info.dest.push_endpoint_args, this), cct); + ldpp_dout(this, 20) + << "INFO: push endpoint created: " << topic_info.dest.push_endpoint + << dendl; + } catch (const RGWPubSubEndpoint::configuration_error& e) { + ldpp_dout(this, 5) << "WARNING: failed to create push endpoint: " + << topic_info.dest.push_endpoint + << ". error: " << e.what() + << " (will retry sending events) " << dendl; + continue; + } is_idle = false; auto has_error = false; auto remove_entries = false; @@ -390,11 +459,16 @@ private: } entries_persistency_tracker& notifs_persistency_tracker = topics_persistency_tracker[queue_name]; - spawn::spawn(yield, [this, ¬ifs_persistency_tracker, &queue_name, entry_idx, total_entries, &end_marker, - &remove_entries, &has_error, &waiter, &entry, &needs_migration_vector](spawn::yield_context yield) { + spawn::spawn(yield,[this, ¬ifs_persistency_tracker, &queue_name, entry_idx, + total_entries, &end_marker, &remove_entries, &has_error, &waiter, + &entry, &needs_migration_vector, + push_endpoint = push_endpoint.get(), + &topic_info](spawn::yield_context yield) { const auto token = waiter.make_token(); auto& persistency_tracker = notifs_persistency_tracker[entry.marker]; - auto result = process_entry(this->get_cct()->_conf, persistency_tracker, entry, yield); + auto result = + process_entry(this->get_cct()->_conf, persistency_tracker, + entry, push_endpoint, topic_info, yield); if (result == EntryProcessingResult::Successful || result == EntryProcessingResult::Expired || result == EntryProcessingResult::Migrating) { ldpp_dout(this, 20) << "INFO: processing of entry: " << entry.marker diff --git a/src/test/rgw/bucket_notification/test_bn.py b/src/test/rgw/bucket_notification/test_bn.py index 418d65dc4b43..415cb9947938 100644 --- a/src/test/rgw/bucket_notification/test_bn.py +++ b/src/test/rgw/bucket_notification/test_bn.py @@ -5420,3 +5420,89 @@ def test_ps_s3_data_path_v2_mixed_migration(): for conn, bucket in zip(connections_list, buckets_list): conn.delete_bucket(bucket.name) + +@attr('kafka_test') +def test_notification_caching(): + """ test notification caching """ + conn = connection() + # create bucket + bucket_name = gen_bucket_name() + bucket = conn.create_bucket(bucket_name) + topic_name = bucket_name + TOPIC_SUFFIX + + # start kafka receiver + task, receiver = create_kafka_receiver_thread(topic_name) + task.start() + incorrect_port = 8080 + endpoint_address = 'kafka://' + kafka_server + ':' + str(incorrect_port) + endpoint_args = 'push-endpoint=' + endpoint_address + '&kafka-ack-level=broker' + '&persistent=true' + + # create s3 topic + zonegroup = get_config_zonegroup() + topic_conf = PSTopicS3(conn, topic_name, zonegroup, + endpoint_args=endpoint_args) + topic_arn = topic_conf.set_config() + # create s3 notification + notification_name = bucket_name + NOTIFICATION_SUFFIX + topic_conf_list = [{'Id': notification_name, 'TopicArn': topic_arn, + 'Events': [] + }] + + s3_notification_conf = PSNotificationS3(conn, bucket_name, topic_conf_list) + response, status = s3_notification_conf.set_config() + assert_equal(status / 100, 2) + + # create objects in the bucket (async) + number_of_objects = 10 + client_threads = [] + start_time = time.time() + for i in range(number_of_objects): + key = bucket.new_key(str(i)) + content = str(os.urandom(1024 * 1024)) + thr = threading.Thread(target=set_contents_from_string, + args=(key, content,)) + thr.start() + client_threads.append(thr) + [thr.join() for thr in client_threads] + + time_diff = time.time() - start_time + print('average time for creation + async notification is: ' + str( + time_diff * 1000 / number_of_objects) + ' milliseconds') + + # delete objects from the bucket + client_threads = [] + start_time = time.time() + for key in bucket.list(): + thr = threading.Thread(target=key.delete, args=()) + thr.start() + client_threads.append(thr) + [thr.join() for thr in client_threads] + + time_diff = time.time() - start_time + print('average time for deletion + async notification is: ' + str( + time_diff * 1000 / number_of_objects) + ' milliseconds') + + time.sleep(30) + # topic stats + result = admin(['topic', 'stats', '--topic', topic_name], + get_config_cluster()) + assert_equal(result[1], 0) + parsed_result = json.loads(result[0]) + assert_equal(parsed_result['Topic Stats']['Entries'], 2 * number_of_objects) + + # remove the port and update the topic, so its pointing to correct endpoint. + endpoint_address = 'kafka://' + kafka_server + # update s3 topic + topic_conf.set_attributes(attribute_name="push-endpoint", + attribute_val=endpoint_address) + keys = list(bucket.list()) + wait_for_queue_to_drain(topic_name) + receiver.verify_s3_events(keys, deletions=True) + + # cleanup + s3_notification_conf.del_config() + topic_conf.del_config() + # delete the bucket + conn.delete_bucket(bucket_name) + if receiver is not None: + stop_kafka_receiver(receiver, task)