From fb243dd0bc57ad246eb8fa31ee6e5e20cc9018ef Mon Sep 17 00:00:00 2001 From: Yuval Lifshitz Date: Wed, 5 Jun 2024 16:08:16 +0000 Subject: [PATCH] rgw/admin/notification: add command to dump notifications command will dump pending notifications from a persistent queue in JSON format: radosgw-admin topic dump --topic Fixes: https://tracker.ceph.com/issues/66404 Signed-off-by: Yuval Lifshitz (cherry picked from commit e63e663dc9095589f4fe28d541fd5178c033bffd) Conflicts: src/rgw/driver/rados/rgw_notify.cc src/rgw/rgw_admin.cc src/rgw/rgw_pubsub.h src/test/cli/radosgw-admin/help.t src/test/rgw/bucket_notification/test_bn.py --- src/rgw/driver/rados/rgw_notify.cc | 26 ------------ src/rgw/rgw_admin.cc | 67 ++++++++++++++++++++++++++++++ src/rgw/rgw_pubsub.cc | 13 ++++++ src/rgw/rgw_pubsub.h | 31 ++++++++++++++ src/test/cli/radosgw-admin/help.t | 1 + 5 files changed, 112 insertions(+), 26 deletions(-) diff --git a/src/rgw/driver/rados/rgw_notify.cc b/src/rgw/driver/rados/rgw_notify.cc index b1835016ec0..d90ea1a79a9 100644 --- a/src/rgw/driver/rados/rgw_notify.cc +++ b/src/rgw/driver/rados/rgw_notify.cc @@ -19,32 +19,6 @@ namespace rgw::notify { -struct event_entry_t { - rgw_pubsub_s3_event event; - std::string push_endpoint; - std::string push_endpoint_args; - std::string arn_topic; - - void encode(bufferlist& bl) const { - ENCODE_START(1, 1, bl); - encode(event, bl); - encode(push_endpoint, bl); - encode(push_endpoint_args, bl); - encode(arn_topic, bl); - ENCODE_FINISH(bl); - } - - void decode(bufferlist::const_iterator& bl) { - DECODE_START(1, bl); - decode(event, bl); - decode(push_endpoint, bl); - decode(push_endpoint_args, bl); - decode(arn_topic, bl); - DECODE_FINISH(bl); - } -}; -WRITE_CLASS_ENCODER(event_entry_t) - using queues_t = std::set; // use mmap/mprotect to allocate 128k coroutine stacks diff --git a/src/rgw/rgw_admin.cc b/src/rgw/rgw_admin.cc index 5cabd2a9bd4..b6abf30dec2 100644 --- a/src/rgw/rgw_admin.cc +++ b/src/rgw/rgw_admin.cc @@ -31,6 +31,8 @@ extern "C" { #include "cls/rgw/cls_rgw_types.h" #include "cls/rgw/cls_rgw_client.h" +#include "cls/2pc_queue/cls_2pc_queue_types.h" +#include "cls/2pc_queue/cls_2pc_queue_client.h" #include "include/utime.h" #include "include/str_list.h" @@ -315,6 +317,7 @@ void usage() cout << " topic list list bucket notifications topics\n"; cout << " topic get get a bucket notifications topic\n"; cout << " topic rm remove a bucket notifications topic\n"; + cout << " topic dump dump (in JSON format) all pending bucket notifications of a persistent topic\n"; cout << " script put upload a lua script to a context\n"; cout << " script get get the lua script of a context\n"; cout << " script rm remove the lua scripts of a context\n"; @@ -850,6 +853,7 @@ enum class OPT { PUBSUB_NOTIFICATION_LIST, PUBSUB_NOTIFICATION_GET, PUBSUB_NOTIFICATION_RM, + PUBSUB_TOPIC_DUMP, SCRIPT_PUT, SCRIPT_GET, SCRIPT_RM, @@ -1087,6 +1091,7 @@ static SimpleCmd::Commands all_cmds = { { "notification list", OPT::PUBSUB_NOTIFICATION_LIST }, { "notification get", OPT::PUBSUB_NOTIFICATION_GET }, { "notification rm", OPT::PUBSUB_NOTIFICATION_RM }, + { "topic dump", OPT::PUBSUB_TOPIC_DUMP }, { "script put", OPT::SCRIPT_PUT }, { "script get", OPT::SCRIPT_GET }, { "script rm", OPT::SCRIPT_RM }, @@ -4256,6 +4261,7 @@ int main(int argc, const char **argv) OPT::PUBSUB_NOTIFICATION_LIST, OPT::PUBSUB_TOPIC_GET, OPT::PUBSUB_NOTIFICATION_GET, + OPT::PUBSUB_TOPIC_DUMP , OPT::SCRIPT_GET, }; @@ -4340,6 +4346,7 @@ int main(int argc, const char **argv) && opt_cmd != OPT::PUBSUB_TOPIC_GET && opt_cmd != OPT::PUBSUB_NOTIFICATION_GET && opt_cmd != OPT::PUBSUB_TOPIC_RM + && opt_cmd != OPT::PUBSUB_TOPIC_DUMP && opt_cmd != OPT::PUBSUB_NOTIFICATION_RM) { cerr << "ERROR: --tenant is set, but there's no user ID" << std::endl; return EINVAL; @@ -10665,6 +10672,66 @@ next: } } + if (opt_cmd == OPT::PUBSUB_TOPIC_DUMP) { + if (topic_name.empty()) { + cerr << "ERROR: topic name was not provided (via --topic)" << std::endl; + return EINVAL; + } + RGWPubSub ps(driver, tenant); + + rgw_pubsub_topic topic; + ret = ps.get_topic(dpp(), topic_name, topic, null_yield); + if (ret < 0) { + cerr << "ERROR: could not get topic. error: " << cpp_strerror(-ret) << std::endl; + return -ret; + } + + if (!topic.dest.persistent) { + cerr << "ERROR: topic does not have a persistent queue" << std::endl; + return ENOENT; + } + + auto ioctx = static_cast(driver)->getRados()->get_notif_pool_ctx(); + std::string marker; + std::string end_marker; + librados::ObjectReadOperation rop; + std::vector queue_entries; + bool truncated = true; + formatter->open_array_section("eventEntries"); + while (truncated) { + bufferlist bl; + int rc; + cls_2pc_queue_list_entries(rop, marker, max_entries, &bl, &rc); + ioctx.operate(topic.dest.arn_topic, &rop, nullptr); + if (rc < 0 ) { + cerr << "ERROR: could not list entries from queue. error: " << cpp_strerror(-ret) << std::endl; + return -rc; + } + rc = cls_2pc_queue_list_entries_result(bl, queue_entries, &truncated, end_marker); + if (rc < 0) { + cerr << "ERROR: failed to parse list entries from queue (skipping). error: " << cpp_strerror(-ret) << std::endl; + return -rc; + } + + std::for_each(queue_entries.cbegin(), + queue_entries.cend(), + [&formatter](const auto& queue_entry) { + rgw::notify::event_entry_t event_entry; + bufferlist::const_iterator iter{&queue_entry.data}; + try { + event_entry.decode(iter); + encode_json("", event_entry, formatter.get()); + } catch (const buffer::error& e) { + cerr << "ERROR: failed to decode queue entry. error: " << e.what() << std::endl; + } + }); + formatter->flush(cout); + marker = end_marker; + } + formatter->close_section(); + formatter->flush(cout); + } + if (opt_cmd == OPT::SCRIPT_PUT) { if (!str_script_ctx) { cerr << "ERROR: context was not provided (via --context)" << std::endl; diff --git a/src/rgw/rgw_pubsub.cc b/src/rgw/rgw_pubsub.cc index 2b0cffd47c1..e5f71fe9dff 100644 --- a/src/rgw/rgw_pubsub.cc +++ b/src/rgw/rgw_pubsub.cc @@ -444,6 +444,19 @@ std::string rgw_pubsub_dest::to_json_str() const return ss.str(); } +namespace rgw::notify { +void event_entry_t::dump(Formatter *f) const { + Formatter::ObjectSection s(*f, "entry"); + { + Formatter::ObjectSection sub_s(*f, "event"); + event.dump(f); + } + encode_json("pushEndpoint", push_endpoint, f); + encode_json("pushEndpointArgs", push_endpoint_args, f); + encode_json("topic", arn_topic, f); +} +} + RGWPubSub::RGWPubSub(rgw::sal::Driver* _driver, const std::string& _tenant) : driver(_driver), tenant(_tenant) {} diff --git a/src/rgw/rgw_pubsub.h b/src/rgw/rgw_pubsub.h index 290c52c2b8f..b6aad94de14 100644 --- a/src/rgw/rgw_pubsub.h +++ b/src/rgw/rgw_pubsub.h @@ -536,6 +536,37 @@ struct rgw_pubsub_topics { }; WRITE_CLASS_ENCODER(rgw_pubsub_topics) +namespace rgw::notify { + +struct event_entry_t { + rgw_pubsub_s3_event event; + std::string push_endpoint; + std::string push_endpoint_args; + std::string arn_topic; + + void encode(bufferlist& bl) const { + ENCODE_START(1, 1, bl); + encode(event, bl); + encode(push_endpoint, bl); + encode(push_endpoint_args, bl); + encode(arn_topic, bl); + ENCODE_FINISH(bl); + } + + void decode(bufferlist::const_iterator& bl) { + DECODE_START(1, bl); + decode(event, bl); + decode(push_endpoint, bl); + decode(push_endpoint_args, bl); + decode(arn_topic, bl); + DECODE_FINISH(bl); + } + + void dump(Formatter *f) const; +}; +WRITE_CLASS_ENCODER(event_entry_t) +} + class RGWPubSub { friend class Bucket; diff --git a/src/test/cli/radosgw-admin/help.t b/src/test/cli/radosgw-admin/help.t index 828bebf0776..45762c37c86 100644 --- a/src/test/cli/radosgw-admin/help.t +++ b/src/test/cli/radosgw-admin/help.t @@ -184,6 +184,7 @@ topic list list bucket notifications topics topic get get a bucket notifications topic topic rm remove a bucket notifications topic + topic dump dump (in JSON format) all pending bucket notifications of a persistent topic script put upload a lua script to a context script get get the lua script of a context script rm remove the lua scripts of a context -- 2.39.5