]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw/admin/notification: add command to dump notifications 58071/head
authorYuval Lifshitz <ylifshit@ibm.com>
Wed, 5 Jun 2024 16:08:16 +0000 (16:08 +0000)
committerYuval Lifshitz <ylifshit@ibm.com>
Mon, 8 Jul 2024 10:32:25 +0000 (10:32 +0000)
command will dump pending notifications from a persistent queue in JSON
format:

radosgw-admin topic dump --topic <name>

Fixes: https://tracker.ceph.com/issues/66404
Signed-off-by: Yuval Lifshitz <ylifshit@ibm.com>
(cherry picked from commit e63e663dc9095589f4fe28d541fd5178c033bffd)

Conflicts:
src/rgw/driver/rados/rgw_notify.cc
        src/test/rgw/bucket_notification/test_bn.py

src/rgw/driver/rados/rgw_notify.cc
src/rgw/rgw_admin.cc
src/rgw/rgw_pubsub.cc
src/rgw/rgw_pubsub.h
src/test/cli/radosgw-admin/help.t
src/test/rgw/bucket_notification/test_bn.py

index 31555a564cab960df90a6739d4b2e4d5866c6e3a..ac80a9441f2568fa1da6b70c94ef627ddda51d8c 100644 (file)
 
 namespace rgw::notify {
 
-struct event_entry_t {
-  rgw_pubsub_s3_event event;
-  std::string push_endpoint;
-  std::string push_endpoint_args;
-  std::string arn_topic;
-  ceph::coarse_real_time creation_time;
-  uint32_t time_to_live = DEFAULT_GLOBAL_VALUE;
-  uint32_t max_retries = DEFAULT_GLOBAL_VALUE;
-  uint32_t retry_sleep_duration = DEFAULT_GLOBAL_VALUE;
-  
-  void encode(bufferlist& bl) const {
-    ENCODE_START(3, 1, bl);
-    encode(event, bl);
-    encode(push_endpoint, bl);
-    encode(push_endpoint_args, bl);
-    encode(arn_topic, bl);
-    encode(creation_time, bl);
-    encode(time_to_live, bl);
-    encode(max_retries, bl);
-    encode(retry_sleep_duration, bl);
-    ENCODE_FINISH(bl);
-  }
-
-  void decode(bufferlist::const_iterator& bl) {
-    DECODE_START(3, bl);
-    decode(event, bl);
-    decode(push_endpoint, bl);
-    decode(push_endpoint_args, bl);
-    decode(arn_topic, bl);
-    if (struct_v > 1) {
-      decode(creation_time, bl);
-    } else {
-      creation_time = ceph::coarse_real_clock::zero();
-    }
-    if (struct_v > 2) {
-      decode(time_to_live, bl);
-      decode(max_retries, bl);
-      decode(retry_sleep_duration, bl);
-    }
-    DECODE_FINISH(bl);
-  }
-};
-WRITE_CLASS_ENCODER(event_entry_t)
-
-
 struct persistency_tracker {
   ceph::coarse_real_time last_retry_time {ceph::coarse_real_clock::zero()};
   uint32_t retires_num {0};
index ec439e6618fbf22acf34744f46703e9359133e78..ac913342ddc16cf2cf0bfdf04774509a55c50850 100644 (file)
@@ -32,6 +32,8 @@ extern "C" {
 
 #include "cls/rgw/cls_rgw_types.h"
 #include "cls/rgw/cls_rgw_client.h"
+#include "cls/2pc_queue/cls_2pc_queue_types.h"
+#include "cls/2pc_queue/cls_2pc_queue_client.h"
 
 #include "include/utime.h"
 #include "include/str_list.h"
@@ -327,6 +329,7 @@ void usage()
   cout << "  topic get                        get a bucket notifications topic\n";
   cout << "  topic rm                         remove a bucket notifications topic\n";
   cout << "  topic stats                      get a bucket notifications persistent topic stats (i.e. reservations, entries & size)\n";
+  cout << "  topic dump                       dump (in JSON format) all pending bucket notifications of a persistent topic\n";
   cout << "  script put                       upload a Lua script to a context\n";
   cout << "  script get                       get the Lua script of a context\n";
   cout << "  script rm                        remove the Lua scripts of a context\n";
@@ -864,6 +867,7 @@ enum class OPT {
   PUBSUB_NOTIFICATION_GET,
   PUBSUB_NOTIFICATION_RM,
   PUBSUB_TOPIC_STATS,
+  PUBSUB_TOPIC_DUMP,
   SCRIPT_PUT,
   SCRIPT_GET,
   SCRIPT_RM,
@@ -1112,6 +1116,7 @@ static SimpleCmd::Commands all_cmds = {
   { "notification get", OPT::PUBSUB_NOTIFICATION_GET },
   { "notification rm", OPT::PUBSUB_NOTIFICATION_RM },
   { "topic stats", OPT::PUBSUB_TOPIC_STATS },
+  { "topic dump", OPT::PUBSUB_TOPIC_DUMP },
   { "script put", OPT::SCRIPT_PUT },
   { "script get", OPT::SCRIPT_GET },
   { "script rm", OPT::SCRIPT_RM },
@@ -4323,6 +4328,7 @@ int main(int argc, const char **argv)
                         OPT::PUBSUB_TOPIC_GET,
        OPT::PUBSUB_NOTIFICATION_GET,
        OPT::PUBSUB_TOPIC_STATS  ,
+       OPT::PUBSUB_TOPIC_DUMP  ,
                         OPT::SCRIPT_GET,
     };
 
@@ -4423,6 +4429,7 @@ int main(int argc, const char **argv)
                           && opt_cmd != OPT::PUBSUB_TOPIC_RM
                           && opt_cmd != OPT::PUBSUB_NOTIFICATION_RM
                           && opt_cmd != OPT::PUBSUB_TOPIC_STATS
+                          && opt_cmd != OPT::PUBSUB_TOPIC_DUMP
                          && opt_cmd != OPT::SCRIPT_PUT
                          && opt_cmd != OPT::SCRIPT_GET
                          && opt_cmd != OPT::SCRIPT_RM
@@ -11260,9 +11267,10 @@ next:
       return ENOENT;
     }
 
+    auto ioctx = static_cast<rgw::sal::RadosStore*>(driver)->getRados()->get_notif_pool_ctx();
     rgw::notify::rgw_topic_stats stats;
     ret = rgw::notify::get_persistent_queue_stats(
-        dpp(), static_cast<rgw::sal::RadosStore *>(driver)->getRados()->get_notif_pool_ctx(),
+        dpp(), ioctx,
         topic.dest.persistent_queue, stats, null_yield);
     if (ret < 0) {
       cerr << "ERROR: could not get persistent queue: " << cpp_strerror(-ret) << std::endl;
@@ -11271,6 +11279,67 @@ next:
     encode_json("", stats, formatter.get());
     formatter->flush(cout);
   }
+  
+  if (opt_cmd == OPT::PUBSUB_TOPIC_DUMP) {
+    if (topic_name.empty()) {
+      cerr << "ERROR: topic name was not provided (via --topic)" << std::endl;
+      return EINVAL;
+    }
+    const std::string& account = !account_id.empty() ? account_id : tenant;
+    RGWPubSub ps(driver, account, *site);
+
+    rgw_pubsub_topic topic;
+    ret = ps.get_topic(dpp(), topic_name, topic, null_yield, nullptr);
+    if (ret < 0) {
+      cerr << "ERROR: could not get topic. error: " << cpp_strerror(-ret) << std::endl;
+      return -ret;
+    }
+
+    if (topic.dest.persistent_queue.empty()) {
+      cerr << "ERROR: topic does not have a persistent queue" << std::endl;
+      return ENOENT;
+    }
+
+    auto ioctx = static_cast<rgw::sal::RadosStore*>(driver)->getRados()->get_notif_pool_ctx();
+    std::string marker;
+    std::string end_marker;
+    librados::ObjectReadOperation rop;
+    std::vector<cls_queue_entry> queue_entries;
+    bool truncated = true;
+    formatter->open_array_section("eventEntries");
+    while (truncated) {
+      bufferlist bl;
+      int rc;
+      cls_2pc_queue_list_entries(rop, marker, max_entries, &bl, &rc);
+      ioctx.operate(topic.dest.persistent_queue, &rop, nullptr);
+      if (rc < 0 ) {
+        cerr << "ERROR: could not list entries from queue. error: " << cpp_strerror(-ret) << std::endl;
+        return -rc;
+      }
+      rc = cls_2pc_queue_list_entries_result(bl, queue_entries, &truncated, end_marker);
+      if (rc < 0) {
+        cerr << "ERROR: failed to parse list entries from queue (skipping). error: " << cpp_strerror(-ret) << std::endl;
+        return -rc;
+      }
+
+      std::for_each(queue_entries.cbegin(), 
+        queue_entries.cend(), 
+        [&formatter](const auto& queue_entry) {
+          rgw::notify::event_entry_t event_entry;
+          bufferlist::const_iterator iter{&queue_entry.data};
+          try {
+            event_entry.decode(iter);
+            encode_json("", event_entry, formatter.get());
+          } catch (const buffer::error& e) {
+            cerr << "ERROR: failed to decode queue entry. error: " << e.what() << std::endl;
+          }
+        });
+      formatter->flush(cout);
+      marker = end_marker;
+    }
+    formatter->close_section();
+    formatter->flush(cout);
+  }
 
   if (opt_cmd == OPT::SCRIPT_PUT) {
     if (!str_script_ctx) {
index 160ecee1768876360e6f9312696fe069a7884190..91ff0e8ba662c2db8a0d3eeb8ed5b882d2724eb2 100644 (file)
@@ -378,6 +378,23 @@ void rgw_pubsub_s3_event::dump(Formatter *f) const {
   encode_json("opaqueData", opaque_data, f);
 }
 
+namespace rgw::notify {
+    void event_entry_t::dump(Formatter *f) const {
+      Formatter::ObjectSection s(*f, "entry");
+      {
+        Formatter::ObjectSection sub_s(*f, "event");
+        event.dump(f);
+      }
+      encode_json("pushEndpoint", push_endpoint, f);
+      encode_json("pushEndpointArgs", push_endpoint_args, f);
+      encode_json("topic", arn_topic, f);
+      encode_json("creationTime", creation_time, f);
+      encode_json("TTL", time_to_live, f);
+      encode_json("maxRetries", max_retries, f);
+      encode_json("retrySleepDuration", retry_sleep_duration, f);
+    }
+}
+
 void rgw_pubsub_topic::dump(Formatter *f) const
 {
   encode_json("owner", owner, f);
index 3835407eb4567d49b93027cf784b226ca06b8d15..b7ce443af037ec618146a8bb1d74c5410fb45733 100644 (file)
@@ -672,12 +672,56 @@ public:
 };
 
 namespace rgw::notify {
-
   // Denotes that the topic has not overridden the global configurations for (time_to_live / max_retries / retry_sleep_duration)
   // defaults: (rgw_topic_persistency_time_to_live / rgw_topic_persistency_max_retries / rgw_topic_persistency_sleep_duration)
   constexpr uint32_t DEFAULT_GLOBAL_VALUE = UINT32_MAX;
   // Used in case the topic is using the default global value for dumping in a formatter
   constexpr static const std::string_view DEFAULT_CONFIG{"None"};
+  struct event_entry_t {
+    rgw_pubsub_s3_event event;
+    std::string push_endpoint;
+    std::string push_endpoint_args;
+    std::string arn_topic;
+    ceph::coarse_real_time creation_time;
+    uint32_t time_to_live = DEFAULT_GLOBAL_VALUE;
+    uint32_t max_retries = DEFAULT_GLOBAL_VALUE;
+    uint32_t retry_sleep_duration = DEFAULT_GLOBAL_VALUE;
+    
+    void encode(bufferlist& bl) const {
+      ENCODE_START(3, 1, bl);
+      encode(event, bl);
+      encode(push_endpoint, bl);
+      encode(push_endpoint_args, bl);
+      encode(arn_topic, bl);
+      encode(creation_time, bl);
+      encode(time_to_live, bl);
+      encode(max_retries, bl);
+      encode(retry_sleep_duration, bl);
+      ENCODE_FINISH(bl);
+    }
+
+    void decode(bufferlist::const_iterator& bl) {
+      DECODE_START(3, bl);
+      decode(event, bl);
+      decode(push_endpoint, bl);
+      decode(push_endpoint_args, bl);
+      decode(arn_topic, bl);
+      if (struct_v > 1) {
+        decode(creation_time, bl);
+      } else {
+        creation_time = ceph::coarse_real_clock::zero();
+      }
+      if (struct_v > 2) {
+        decode(time_to_live, bl);
+        decode(max_retries, bl);
+        decode(retry_sleep_duration, bl);
+      }
+      DECODE_FINISH(bl);
+    }
+
+    void dump(Formatter *f) const;
+  };
+  WRITE_CLASS_ENCODER(event_entry_t)
 }
 
 std::string topic_to_unique(const std::string& topic,
index b334806398bb279b9a88be2c926e070724542340..b370427c59c6296b7218ca53bfee96721d6d4665 100644 (file)
     topic get                        get a bucket notifications topic
     topic rm                         remove a bucket notifications topic
     topic stats                      get a bucket notifications persistent topic stats (i.e. reservations, entries & size)
+    topic dump                       dump (in JSON format) all pending bucket notifications of a persistent topic
     script put                       upload a Lua script to a context
     script get                       get the Lua script of a context
     script rm                        remove the Lua scripts of a context
index 4a7bded054245fd94fd0fc16ceeaf5cd6b9c5557..cfbdb14b8a8d11d32d9442cf8f704431ba23fa8f 100644 (file)
@@ -3202,6 +3202,93 @@ def test_ps_s3_persistent_topic_stats():
     conn.delete_bucket(bucket_name)
     http_server.close()
 
+@attr('basic_test')
+def test_persistent_topic_dump():
+    """ test persistent topic dump """
+    conn = connection()
+    zonegroup = get_config_zonegroup()
+
+    # create random port for the http server
+    host = get_ip()
+    port = random.randint(10000, 20000)
+
+    # create bucket
+    bucket_name = gen_bucket_name()
+    bucket = conn.create_bucket(bucket_name)
+    topic_name = bucket_name + TOPIC_SUFFIX
+
+    # create s3 topic
+    endpoint_address = 'http://'+host+':'+str(port)
+    endpoint_args = 'push-endpoint='+endpoint_address+'&persistent=true'+ \
+            '&retry_sleep_duration=1'
+    topic_conf = PSTopicS3(conn, topic_name, zonegroup, endpoint_args=endpoint_args)
+    topic_arn = topic_conf.set_config()
+    # create s3 notification
+    notification_name = bucket_name + NOTIFICATION_SUFFIX
+    topic_conf_list = [{'Id': notification_name, 'TopicArn': topic_arn,
+                        'Events': []
+                        }]
+
+    s3_notification_conf = PSNotificationS3(conn, bucket_name, topic_conf_list)
+    response, status = s3_notification_conf.set_config()
+    assert_equal(status/100, 2)
+
+    # create objects in the bucket (async)
+    number_of_objects = 20
+    client_threads = []
+    start_time = time.time()
+    for i in range(number_of_objects):
+        key = bucket.new_key('key-'+str(i))
+        content = str(os.urandom(1024*1024))
+        thr = threading.Thread(target = set_contents_from_string, args=(key, content,))
+        thr.start()
+        client_threads.append(thr)
+    [thr.join() for thr in client_threads]
+    time_diff = time.time() - start_time
+    print('average time for creation + async http notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds')
+
+    # topic dump
+    result = admin(['topic', 'dump', '--topic', topic_name], get_config_cluster())
+    assert_equal(result[1], 0)
+    parsed_result = json.loads(result[0])
+    assert_equal(len(parsed_result), number_of_objects)
+
+    # delete objects from the bucket
+    client_threads = []
+    start_time = time.time()
+    for key in bucket.list():
+        thr = threading.Thread(target = key.delete, args=())
+        thr.start()
+        client_threads.append(thr)
+    [thr.join() for thr in client_threads]
+    time_diff = time.time() - start_time
+    print('average time for deletion + async http notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds')
+
+    # topic stats
+    result = admin(['topic', 'dump', '--topic', topic_name], get_config_cluster())
+    assert_equal(result[1], 0)
+    print(result[0])
+    parsed_result = json.loads(result[0])
+    assert_equal(len(parsed_result), 2*number_of_objects)
+
+    # start an http server in a separate thread
+    http_server = HTTPServerWithEvents((host, port))
+
+    wait_for_queue_to_drain(topic_name)
+
+    result = admin(['topic', 'dump', '--topic', topic_name], get_config_cluster())
+    assert_equal(result[1], 0)
+    parsed_result = json.loads(result[0])
+    assert_equal(len(parsed_result), 0)
+
+    # cleanup
+    s3_notification_conf.del_config()
+    topic_conf.del_config()
+    # delete the bucket
+    conn.delete_bucket(bucket_name)
+    http_server.close()
+
+
 def ps_s3_persistent_topic_configs(persistency_time, config_dict):
     conn = connection()
     zonegroup = get_config_zonegroup()