]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw/admin/notification: add command to dump notifications 58070/head
authorYuval Lifshitz <ylifshit@ibm.com>
Wed, 5 Jun 2024 16:08:16 +0000 (16:08 +0000)
committerYuval Lifshitz <ylifshit@ibm.com>
Sun, 16 Jun 2024 13:06:26 +0000 (13:06 +0000)
command will dump pending notifications from a persistent queue in JSON
format:

radosgw-admin topic dump --topic <name>

Fixes: https://tracker.ceph.com/issues/66404
Signed-off-by: Yuval Lifshitz <ylifshit@ibm.com>
(cherry picked from commit e63e663dc9095589f4fe28d541fd5178c033bffd)

 Conflicts:
src/rgw/driver/rados/rgw_notify.cc
src/rgw/rgw_admin.cc
src/rgw/rgw_pubsub.h
src/test/cli/radosgw-admin/help.t
src/test/rgw/bucket_notification/test_bn.py

src/rgw/driver/rados/rgw_notify.cc
src/rgw/rgw_admin.cc
src/rgw/rgw_pubsub.cc
src/rgw/rgw_pubsub.h
src/test/cli/radosgw-admin/help.t

index b1835016ec0eef21b9adc01ace1e43c67f74fc50..d90ea1a79a989d8e46e1c4aa3c86f0615088f73c 100644 (file)
 
 namespace rgw::notify {
 
-struct event_entry_t {
-  rgw_pubsub_s3_event event;
-  std::string push_endpoint;
-  std::string push_endpoint_args;
-  std::string arn_topic;
-  
-  void encode(bufferlist& bl) const {
-    ENCODE_START(1, 1, bl);
-    encode(event, bl);
-    encode(push_endpoint, bl);
-    encode(push_endpoint_args, bl);
-    encode(arn_topic, bl);
-    ENCODE_FINISH(bl);
-  }
-
-  void decode(bufferlist::const_iterator& bl) {
-    DECODE_START(1, bl);
-    decode(event, bl);
-    decode(push_endpoint, bl);
-    decode(push_endpoint_args, bl);
-    decode(arn_topic, bl);
-    DECODE_FINISH(bl);
-  }
-};
-WRITE_CLASS_ENCODER(event_entry_t)
-
 using queues_t = std::set<std::string>;
 
 // use mmap/mprotect to allocate 128k coroutine stacks
index 5cabd2a9bd40b0a9787bed97e98ee7f0dc945045..b6abf30dec2238dda26cfffeb49f5116142b517d 100644 (file)
@@ -31,6 +31,8 @@ extern "C" {
 
 #include "cls/rgw/cls_rgw_types.h"
 #include "cls/rgw/cls_rgw_client.h"
+#include "cls/2pc_queue/cls_2pc_queue_types.h"
+#include "cls/2pc_queue/cls_2pc_queue_client.h"
 
 #include "include/utime.h"
 #include "include/str_list.h"
@@ -315,6 +317,7 @@ void usage()
   cout << "  topic list                 list bucket notifications topics\n";
   cout << "  topic get                  get a bucket notifications topic\n";
   cout << "  topic rm                   remove a bucket notifications topic\n";
+  cout << "  topic dump                       dump (in JSON format) all pending bucket notifications of a persistent topic\n";
   cout << "  script put                 upload a lua script to a context\n";
   cout << "  script get                 get the lua script of a context\n";
   cout << "  script rm                  remove the lua scripts of a context\n";
@@ -850,6 +853,7 @@ enum class OPT {
   PUBSUB_NOTIFICATION_LIST,
   PUBSUB_NOTIFICATION_GET,
   PUBSUB_NOTIFICATION_RM,
+  PUBSUB_TOPIC_DUMP,
   SCRIPT_PUT,
   SCRIPT_GET,
   SCRIPT_RM,
@@ -1087,6 +1091,7 @@ static SimpleCmd::Commands all_cmds = {
   { "notification list", OPT::PUBSUB_NOTIFICATION_LIST },
   { "notification get", OPT::PUBSUB_NOTIFICATION_GET },
   { "notification rm", OPT::PUBSUB_NOTIFICATION_RM },
+  { "topic dump", OPT::PUBSUB_TOPIC_DUMP },
   { "script put", OPT::SCRIPT_PUT },
   { "script get", OPT::SCRIPT_GET },
   { "script rm", OPT::SCRIPT_RM },
@@ -4256,6 +4261,7 @@ int main(int argc, const char **argv)
        OPT::PUBSUB_NOTIFICATION_LIST,
                         OPT::PUBSUB_TOPIC_GET,
        OPT::PUBSUB_NOTIFICATION_GET,
+       OPT::PUBSUB_TOPIC_DUMP  ,
                         OPT::SCRIPT_GET,
     };
 
@@ -4340,6 +4346,7 @@ int main(int argc, const char **argv)
                           && opt_cmd != OPT::PUBSUB_TOPIC_GET
                           && opt_cmd != OPT::PUBSUB_NOTIFICATION_GET
                           && opt_cmd != OPT::PUBSUB_TOPIC_RM
+                          && opt_cmd != OPT::PUBSUB_TOPIC_DUMP
                           && opt_cmd != OPT::PUBSUB_NOTIFICATION_RM) {
         cerr << "ERROR: --tenant is set, but there's no user ID" << std::endl;
         return EINVAL;
@@ -10665,6 +10672,66 @@ next:
     }
   }
 
+  if (opt_cmd == OPT::PUBSUB_TOPIC_DUMP) {
+    if (topic_name.empty()) {
+      cerr << "ERROR: topic name was not provided (via --topic)" << std::endl;
+      return EINVAL;
+    }
+    RGWPubSub ps(driver, tenant);
+
+    rgw_pubsub_topic topic;
+    ret = ps.get_topic(dpp(), topic_name, topic, null_yield);
+    if (ret < 0) {
+      cerr << "ERROR: could not get topic. error: " << cpp_strerror(-ret) << std::endl;
+      return -ret;
+    }
+
+    if (!topic.dest.persistent) {
+      cerr << "ERROR: topic does not have a persistent queue" << std::endl;
+      return ENOENT;
+    }
+
+    auto ioctx = static_cast<rgw::sal::RadosStore*>(driver)->getRados()->get_notif_pool_ctx();
+    std::string marker;
+    std::string end_marker;
+    librados::ObjectReadOperation rop;
+    std::vector<cls_queue_entry> queue_entries;
+    bool truncated = true;
+    formatter->open_array_section("eventEntries");
+    while (truncated) {
+      bufferlist bl;
+      int rc;
+      cls_2pc_queue_list_entries(rop, marker, max_entries, &bl, &rc);
+      ioctx.operate(topic.dest.arn_topic, &rop, nullptr);
+      if (rc < 0 ) {
+        cerr << "ERROR: could not list entries from queue. error: " << cpp_strerror(-ret) << std::endl;
+        return -rc;
+      }
+      rc = cls_2pc_queue_list_entries_result(bl, queue_entries, &truncated, end_marker);
+      if (rc < 0) {
+        cerr << "ERROR: failed to parse list entries from queue (skipping). error: " << cpp_strerror(-ret) << std::endl;
+        return -rc;
+      }
+
+      std::for_each(queue_entries.cbegin(), 
+        queue_entries.cend(), 
+        [&formatter](const auto& queue_entry) {
+          rgw::notify::event_entry_t event_entry;
+          bufferlist::const_iterator iter{&queue_entry.data};
+          try {
+            event_entry.decode(iter);
+            encode_json("", event_entry, formatter.get());
+          } catch (const buffer::error& e) {
+            cerr << "ERROR: failed to decode queue entry. error: " << e.what() << std::endl;
+          }
+        });
+      formatter->flush(cout);
+      marker = end_marker;
+    }
+    formatter->close_section();
+    formatter->flush(cout);
+  }
+
   if (opt_cmd == OPT::SCRIPT_PUT) {
     if (!str_script_ctx) {
       cerr << "ERROR: context was not provided (via --context)" << std::endl;
index 2b0cffd47c1fb72d74d24b257c773dd337eecfc9..e5f71fe9dffca53e2393b395172d0487ca6d3c14 100644 (file)
@@ -444,6 +444,19 @@ std::string rgw_pubsub_dest::to_json_str() const
   return ss.str();
 }
 
+namespace rgw::notify {
+void event_entry_t::dump(Formatter *f) const {
+  Formatter::ObjectSection s(*f, "entry");
+  {
+    Formatter::ObjectSection sub_s(*f, "event");
+    event.dump(f);
+  }
+  encode_json("pushEndpoint", push_endpoint, f);
+  encode_json("pushEndpointArgs", push_endpoint_args, f);
+  encode_json("topic", arn_topic, f);
+}
+}
+
 RGWPubSub::RGWPubSub(rgw::sal::Driver* _driver, const std::string& _tenant)
   : driver(_driver), tenant(_tenant)
 {}
index 290c52c2b8fe58044c6a3a241cb83eb01641cba1..b6aad94de144384b26dd8ecee32629872b4ae3c1 100644 (file)
@@ -536,6 +536,37 @@ struct rgw_pubsub_topics {
 };
 WRITE_CLASS_ENCODER(rgw_pubsub_topics)
 
+namespace rgw::notify {
+
+struct event_entry_t {
+  rgw_pubsub_s3_event event;
+  std::string push_endpoint;
+  std::string push_endpoint_args;
+  std::string arn_topic;
+  
+  void encode(bufferlist& bl) const {
+    ENCODE_START(1, 1, bl);
+    encode(event, bl);
+    encode(push_endpoint, bl);
+    encode(push_endpoint_args, bl);
+    encode(arn_topic, bl);
+    ENCODE_FINISH(bl);
+  }
+
+  void decode(bufferlist::const_iterator& bl) {
+    DECODE_START(1, bl);
+    decode(event, bl);
+    decode(push_endpoint, bl);
+    decode(push_endpoint_args, bl);
+    decode(arn_topic, bl);
+    DECODE_FINISH(bl);
+  }
+
+  void dump(Formatter *f) const;
+};
+WRITE_CLASS_ENCODER(event_entry_t)
+}
+
 class RGWPubSub
 {
   friend class Bucket;
index 828bebf0776f6cf3082ed600b6a98c567fb47550..45762c37c8679a567865f00bdab4d0911b633dad 100644 (file)
     topic list                 list bucket notifications topics
     topic get                  get a bucket notifications topic
     topic rm                   remove a bucket notifications topic
+    topic dump                       dump (in JSON format) all pending bucket notifications of a persistent topic
     script put                 upload a lua script to a context
     script get                 get the lua script of a context
     script rm                  remove the lua scripts of a context