From e63e663dc9095589f4fe28d541fd5178c033bffd Mon Sep 17 00:00:00 2001 From: Yuval Lifshitz Date: Wed, 5 Jun 2024 16:08:16 +0000 Subject: [PATCH] rgw/admin/notification: add command to dump notifications command will dump pending notifications from a persistent queue in JSON format: radosgw-admin topic dump --topic Fixes: https://tracker.ceph.com/issues/66404 Signed-off-by: Yuval Lifshitz --- src/rgw/driver/rados/rgw_notify.cc | 44 ----------- src/rgw/rgw_admin.cc | 71 ++++++++++++++++- src/rgw/rgw_pubsub.cc | 17 ++++ src/rgw/rgw_pubsub.h | 46 ++++++++++- src/test/cli/radosgw-admin/help.t | 1 + src/test/rgw/bucket_notification/test_bn.py | 87 +++++++++++++++++++++ 6 files changed, 220 insertions(+), 46 deletions(-) diff --git a/src/rgw/driver/rados/rgw_notify.cc b/src/rgw/driver/rados/rgw_notify.cc index dd94d3155f8ca..0c6ef7bfdd560 100644 --- a/src/rgw/driver/rados/rgw_notify.cc +++ b/src/rgw/driver/rados/rgw_notify.cc @@ -26,50 +26,6 @@ namespace rgw::notify { -struct event_entry_t { - rgw_pubsub_s3_event event; - std::string push_endpoint; - std::string push_endpoint_args; - std::string arn_topic; - ceph::coarse_real_time creation_time; - uint32_t time_to_live = DEFAULT_GLOBAL_VALUE; - uint32_t max_retries = DEFAULT_GLOBAL_VALUE; - uint32_t retry_sleep_duration = DEFAULT_GLOBAL_VALUE; - - void encode(bufferlist& bl) const { - ENCODE_START(3, 1, bl); - encode(event, bl); - encode(push_endpoint, bl); - encode(push_endpoint_args, bl); - encode(arn_topic, bl); - encode(creation_time, bl); - encode(time_to_live, bl); - encode(max_retries, bl); - encode(retry_sleep_duration, bl); - ENCODE_FINISH(bl); - } - - void decode(bufferlist::const_iterator& bl) { - DECODE_START(3, bl); - decode(event, bl); - decode(push_endpoint, bl); - decode(push_endpoint_args, bl); - decode(arn_topic, bl); - if (struct_v > 1) { - decode(creation_time, bl); - } else { - creation_time = ceph::coarse_real_clock::zero(); - } - if (struct_v > 2) { - decode(time_to_live, bl); - decode(max_retries, bl); - decode(retry_sleep_duration, bl); - } - DECODE_FINISH(bl); - } -}; -WRITE_CLASS_ENCODER(event_entry_t) - static inline std::ostream& operator<<(std::ostream& out, const event_entry_t& e) { std::string host; diff --git a/src/rgw/rgw_admin.cc b/src/rgw/rgw_admin.cc index f03d4c5f565f0..aa2c92fbcab99 100644 --- a/src/rgw/rgw_admin.cc +++ b/src/rgw/rgw_admin.cc @@ -32,6 +32,8 @@ extern "C" { #include "cls/rgw/cls_rgw_types.h" #include "cls/rgw/cls_rgw_client.h" +#include "cls/2pc_queue/cls_2pc_queue_types.h" +#include "cls/2pc_queue/cls_2pc_queue_client.h" #include "include/utime.h" #include "include/str_list.h" @@ -327,6 +329,7 @@ void usage() cout << " topic get get a bucket notifications topic\n"; cout << " topic rm remove a bucket notifications topic\n"; cout << " topic stats get a bucket notifications persistent topic stats (i.e. reservations, entries & size)\n"; + cout << " topic dump dump (in JSON format) all pending bucket notifications of a persistent topic\n"; cout << " script put upload a Lua script to a context\n"; cout << " script get get the Lua script of a context\n"; cout << " script rm remove the Lua scripts of a context\n"; @@ -867,6 +870,7 @@ enum class OPT { PUBSUB_NOTIFICATION_GET, PUBSUB_NOTIFICATION_RM, PUBSUB_TOPIC_STATS, + PUBSUB_TOPIC_DUMP, SCRIPT_PUT, SCRIPT_GET, SCRIPT_RM, @@ -1115,6 +1119,7 @@ static SimpleCmd::Commands all_cmds = { { "notification get", OPT::PUBSUB_NOTIFICATION_GET }, { "notification rm", OPT::PUBSUB_NOTIFICATION_RM }, { "topic stats", OPT::PUBSUB_TOPIC_STATS }, + { "topic dump", OPT::PUBSUB_TOPIC_DUMP }, { "script put", OPT::SCRIPT_PUT }, { "script get", OPT::SCRIPT_GET }, { "script rm", OPT::SCRIPT_RM }, @@ -4326,6 +4331,7 @@ int main(int argc, const char **argv) OPT::PUBSUB_TOPIC_GET, OPT::PUBSUB_NOTIFICATION_GET, OPT::PUBSUB_TOPIC_STATS , + OPT::PUBSUB_TOPIC_DUMP , OPT::SCRIPT_GET, }; @@ -4426,6 +4432,7 @@ int main(int argc, const char **argv) && opt_cmd != OPT::PUBSUB_TOPIC_RM && opt_cmd != OPT::PUBSUB_NOTIFICATION_RM && opt_cmd != OPT::PUBSUB_TOPIC_STATS + && opt_cmd != OPT::PUBSUB_TOPIC_DUMP && opt_cmd != OPT::SCRIPT_PUT && opt_cmd != OPT::SCRIPT_GET && opt_cmd != OPT::SCRIPT_RM @@ -11270,9 +11277,10 @@ next: return ENOENT; } + auto ioctx = static_cast(driver)->getRados()->get_notif_pool_ctx(); rgw::notify::rgw_topic_stats stats; ret = rgw::notify::get_persistent_queue_stats( - dpp(), static_cast(driver)->getRados()->get_notif_pool_ctx(), + dpp(), ioctx, topic.dest.persistent_queue, stats, null_yield); if (ret < 0) { cerr << "ERROR: could not get persistent queue: " << cpp_strerror(-ret) << std::endl; @@ -11281,6 +11289,67 @@ next: encode_json("", stats, formatter.get()); formatter->flush(cout); } + + if (opt_cmd == OPT::PUBSUB_TOPIC_DUMP) { + if (topic_name.empty()) { + cerr << "ERROR: topic name was not provided (via --topic)" << std::endl; + return EINVAL; + } + const std::string& account = !account_id.empty() ? account_id : tenant; + RGWPubSub ps(driver, account, *site); + + rgw_pubsub_topic topic; + ret = ps.get_topic(dpp(), topic_name, topic, null_yield, nullptr); + if (ret < 0) { + cerr << "ERROR: could not get topic. error: " << cpp_strerror(-ret) << std::endl; + return -ret; + } + + if (topic.dest.persistent_queue.empty()) { + cerr << "ERROR: topic does not have a persistent queue" << std::endl; + return ENOENT; + } + + auto ioctx = static_cast(driver)->getRados()->get_notif_pool_ctx(); + std::string marker; + std::string end_marker; + librados::ObjectReadOperation rop; + std::vector queue_entries; + bool truncated = true; + formatter->open_array_section("eventEntries"); + while (truncated) { + bufferlist bl; + int rc; + cls_2pc_queue_list_entries(rop, marker, max_entries, &bl, &rc); + ioctx.operate(topic.dest.persistent_queue, &rop, nullptr); + if (rc < 0 ) { + cerr << "ERROR: could not list entries from queue. error: " << cpp_strerror(-ret) << std::endl; + return -rc; + } + rc = cls_2pc_queue_list_entries_result(bl, queue_entries, &truncated, end_marker); + if (rc < 0) { + cerr << "ERROR: failed to parse list entries from queue (skipping). error: " << cpp_strerror(-ret) << std::endl; + return -rc; + } + + std::for_each(queue_entries.cbegin(), + queue_entries.cend(), + [&formatter](const auto& queue_entry) { + rgw::notify::event_entry_t event_entry; + bufferlist::const_iterator iter{&queue_entry.data}; + try { + event_entry.decode(iter); + encode_json("", event_entry, formatter.get()); + } catch (const buffer::error& e) { + cerr << "ERROR: failed to decode queue entry. error: " << e.what() << std::endl; + } + }); + formatter->flush(cout); + marker = end_marker; + } + formatter->close_section(); + formatter->flush(cout); + } if (opt_cmd == OPT::SCRIPT_PUT) { if (!str_script_ctx) { diff --git a/src/rgw/rgw_pubsub.cc b/src/rgw/rgw_pubsub.cc index 08118f57b3646..37538aa7a9b6c 100644 --- a/src/rgw/rgw_pubsub.cc +++ b/src/rgw/rgw_pubsub.cc @@ -378,6 +378,23 @@ void rgw_pubsub_s3_event::dump(Formatter *f) const { encode_json("opaqueData", opaque_data, f); } +namespace rgw::notify { + void event_entry_t::dump(Formatter *f) const { + Formatter::ObjectSection s(*f, "entry"); + { + Formatter::ObjectSection sub_s(*f, "event"); + event.dump(f); + } + encode_json("pushEndpoint", push_endpoint, f); + encode_json("pushEndpointArgs", push_endpoint_args, f); + encode_json("topic", arn_topic, f); + encode_json("creationTime", creation_time, f); + encode_json("TTL", time_to_live, f); + encode_json("maxRetries", max_retries, f); + encode_json("retrySleepDuration", retry_sleep_duration, f); + } +} + void rgw_pubsub_topic::dump(Formatter *f) const { encode_json("owner", owner, f); diff --git a/src/rgw/rgw_pubsub.h b/src/rgw/rgw_pubsub.h index 3835407eb4567..b7ce443af037e 100644 --- a/src/rgw/rgw_pubsub.h +++ b/src/rgw/rgw_pubsub.h @@ -672,12 +672,56 @@ public: }; namespace rgw::notify { - // Denotes that the topic has not overridden the global configurations for (time_to_live / max_retries / retry_sleep_duration) // defaults: (rgw_topic_persistency_time_to_live / rgw_topic_persistency_max_retries / rgw_topic_persistency_sleep_duration) constexpr uint32_t DEFAULT_GLOBAL_VALUE = UINT32_MAX; // Used in case the topic is using the default global value for dumping in a formatter constexpr static const std::string_view DEFAULT_CONFIG{"None"}; + struct event_entry_t { + rgw_pubsub_s3_event event; + std::string push_endpoint; + std::string push_endpoint_args; + std::string arn_topic; + ceph::coarse_real_time creation_time; + uint32_t time_to_live = DEFAULT_GLOBAL_VALUE; + uint32_t max_retries = DEFAULT_GLOBAL_VALUE; + uint32_t retry_sleep_duration = DEFAULT_GLOBAL_VALUE; + + void encode(bufferlist& bl) const { + ENCODE_START(3, 1, bl); + encode(event, bl); + encode(push_endpoint, bl); + encode(push_endpoint_args, bl); + encode(arn_topic, bl); + encode(creation_time, bl); + encode(time_to_live, bl); + encode(max_retries, bl); + encode(retry_sleep_duration, bl); + ENCODE_FINISH(bl); + } + + void decode(bufferlist::const_iterator& bl) { + DECODE_START(3, bl); + decode(event, bl); + decode(push_endpoint, bl); + decode(push_endpoint_args, bl); + decode(arn_topic, bl); + if (struct_v > 1) { + decode(creation_time, bl); + } else { + creation_time = ceph::coarse_real_clock::zero(); + } + if (struct_v > 2) { + decode(time_to_live, bl); + decode(max_retries, bl); + decode(retry_sleep_duration, bl); + } + DECODE_FINISH(bl); + } + + void dump(Formatter *f) const; + }; + WRITE_CLASS_ENCODER(event_entry_t) } std::string topic_to_unique(const std::string& topic, diff --git a/src/test/cli/radosgw-admin/help.t b/src/test/cli/radosgw-admin/help.t index b0db0d5dd7729..32117fd083871 100644 --- a/src/test/cli/radosgw-admin/help.t +++ b/src/test/cli/radosgw-admin/help.t @@ -194,6 +194,7 @@ topic get get a bucket notifications topic topic rm remove a bucket notifications topic topic stats get a bucket notifications persistent topic stats (i.e. reservations, entries & size) + topic dump dump (in JSON format) all pending bucket notifications of a persistent topic script put upload a Lua script to a context script get get the Lua script of a context script rm remove the Lua scripts of a context diff --git a/src/test/rgw/bucket_notification/test_bn.py b/src/test/rgw/bucket_notification/test_bn.py index 61b7374b7ef0f..98eeb5f6ad694 100644 --- a/src/test/rgw/bucket_notification/test_bn.py +++ b/src/test/rgw/bucket_notification/test_bn.py @@ -3016,6 +3016,93 @@ def test_ps_s3_persistent_topic_stats(): conn.delete_bucket(bucket_name) http_server.close() +@attr('basic_test') +def test_persistent_topic_dump(): + """ test persistent topic dump """ + conn = connection() + zonegroup = get_config_zonegroup() + + # create random port for the http server + host = get_ip() + port = random.randint(10000, 20000) + + # create bucket + bucket_name = gen_bucket_name() + bucket = conn.create_bucket(bucket_name) + topic_name = bucket_name + TOPIC_SUFFIX + + # create s3 topic + endpoint_address = 'http://'+host+':'+str(port) + endpoint_args = 'push-endpoint='+endpoint_address+'&persistent=true'+ \ + '&retry_sleep_duration=1' + topic_conf = PSTopicS3(conn, topic_name, zonegroup, endpoint_args=endpoint_args) + topic_arn = topic_conf.set_config() + # create s3 notification + notification_name = bucket_name + NOTIFICATION_SUFFIX + topic_conf_list = [{'Id': notification_name, 'TopicArn': topic_arn, + 'Events': [] + }] + + s3_notification_conf = PSNotificationS3(conn, bucket_name, topic_conf_list) + response, status = s3_notification_conf.set_config() + assert_equal(status/100, 2) + + # create objects in the bucket (async) + number_of_objects = 20 + client_threads = [] + start_time = time.time() + for i in range(number_of_objects): + key = bucket.new_key('key-'+str(i)) + content = str(os.urandom(1024*1024)) + thr = threading.Thread(target = set_contents_from_string, args=(key, content,)) + thr.start() + client_threads.append(thr) + [thr.join() for thr in client_threads] + time_diff = time.time() - start_time + print('average time for creation + async http notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds') + + # topic dump + result = admin(['topic', 'dump', '--topic', topic_name], get_config_cluster()) + assert_equal(result[1], 0) + parsed_result = json.loads(result[0]) + assert_equal(len(parsed_result), number_of_objects) + + # delete objects from the bucket + client_threads = [] + start_time = time.time() + for key in bucket.list(): + thr = threading.Thread(target = key.delete, args=()) + thr.start() + client_threads.append(thr) + [thr.join() for thr in client_threads] + time_diff = time.time() - start_time + print('average time for deletion + async http notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds') + + # topic stats + result = admin(['topic', 'dump', '--topic', topic_name], get_config_cluster()) + assert_equal(result[1], 0) + print(result[0]) + parsed_result = json.loads(result[0]) + assert_equal(len(parsed_result), 2*number_of_objects) + + # start an http server in a separate thread + http_server = HTTPServerWithEvents((host, port)) + + wait_for_queue_to_drain(topic_name, http_port=port) + + result = admin(['topic', 'dump', '--topic', topic_name], get_config_cluster()) + assert_equal(result[1], 0) + parsed_result = json.loads(result[0]) + assert_equal(len(parsed_result), 0) + + # cleanup + s3_notification_conf.del_config() + topic_conf.del_config() + # delete the bucket + conn.delete_bucket(bucket_name) + http_server.close() + + def ps_s3_persistent_topic_configs(persistency_time, config_dict): conn = connection() zonegroup = get_config_zonegroup() -- 2.39.5