#include "cls/rgw/cls_rgw_types.h"
#include "cls/rgw/cls_rgw_client.h"
+#include "cls/2pc_queue/cls_2pc_queue_types.h"
+#include "cls/2pc_queue/cls_2pc_queue_client.h"
#include "include/utime.h"
#include "include/str_list.h"
cout << " topic list list bucket notifications topics\n";
cout << " topic get get a bucket notifications topic\n";
cout << " topic rm remove a bucket notifications topic\n";
+ cout << " topic dump dump (in JSON format) all pending bucket notifications of a persistent topic\n";
cout << " script put upload a lua script to a context\n";
cout << " script get get the lua script of a context\n";
cout << " script rm remove the lua scripts of a context\n";
PUBSUB_NOTIFICATION_LIST,
PUBSUB_NOTIFICATION_GET,
PUBSUB_NOTIFICATION_RM,
+ PUBSUB_TOPIC_DUMP,
SCRIPT_PUT,
SCRIPT_GET,
SCRIPT_RM,
{ "notification list", OPT::PUBSUB_NOTIFICATION_LIST },
{ "notification get", OPT::PUBSUB_NOTIFICATION_GET },
{ "notification rm", OPT::PUBSUB_NOTIFICATION_RM },
+ { "topic dump", OPT::PUBSUB_TOPIC_DUMP },
{ "script put", OPT::SCRIPT_PUT },
{ "script get", OPT::SCRIPT_GET },
{ "script rm", OPT::SCRIPT_RM },
OPT::PUBSUB_NOTIFICATION_LIST,
OPT::PUBSUB_TOPIC_GET,
OPT::PUBSUB_NOTIFICATION_GET,
+ OPT::PUBSUB_TOPIC_DUMP ,
OPT::SCRIPT_GET,
};
&& opt_cmd != OPT::PUBSUB_TOPIC_GET
&& opt_cmd != OPT::PUBSUB_NOTIFICATION_GET
&& opt_cmd != OPT::PUBSUB_TOPIC_RM
+ && opt_cmd != OPT::PUBSUB_TOPIC_DUMP
&& opt_cmd != OPT::PUBSUB_NOTIFICATION_RM) {
cerr << "ERROR: --tenant is set, but there's no user ID" << std::endl;
return EINVAL;
}
}
+ if (opt_cmd == OPT::PUBSUB_TOPIC_DUMP) {
+ if (topic_name.empty()) {
+ cerr << "ERROR: topic name was not provided (via --topic)" << std::endl;
+ return EINVAL;
+ }
+ RGWPubSub ps(driver, tenant);
+
+ rgw_pubsub_topic topic;
+ ret = ps.get_topic(dpp(), topic_name, topic, null_yield);
+ if (ret < 0) {
+ cerr << "ERROR: could not get topic. error: " << cpp_strerror(-ret) << std::endl;
+ return -ret;
+ }
+
+ if (!topic.dest.persistent) {
+ cerr << "ERROR: topic does not have a persistent queue" << std::endl;
+ return ENOENT;
+ }
+
+ auto ioctx = static_cast<rgw::sal::RadosStore*>(driver)->getRados()->get_notif_pool_ctx();
+ std::string marker;
+ std::string end_marker;
+ librados::ObjectReadOperation rop;
+ std::vector<cls_queue_entry> queue_entries;
+ bool truncated = true;
+ formatter->open_array_section("eventEntries");
+ while (truncated) {
+ bufferlist bl;
+ int rc;
+ cls_2pc_queue_list_entries(rop, marker, max_entries, &bl, &rc);
+ ioctx.operate(topic.dest.arn_topic, &rop, nullptr);
+ if (rc < 0 ) {
+ cerr << "ERROR: could not list entries from queue. error: " << cpp_strerror(-ret) << std::endl;
+ return -rc;
+ }
+ rc = cls_2pc_queue_list_entries_result(bl, queue_entries, &truncated, end_marker);
+ if (rc < 0) {
+ cerr << "ERROR: failed to parse list entries from queue (skipping). error: " << cpp_strerror(-ret) << std::endl;
+ return -rc;
+ }
+
+ std::for_each(queue_entries.cbegin(),
+ queue_entries.cend(),
+ [&formatter](const auto& queue_entry) {
+ rgw::notify::event_entry_t event_entry;
+ bufferlist::const_iterator iter{&queue_entry.data};
+ try {
+ event_entry.decode(iter);
+ encode_json("", event_entry, formatter.get());
+ } catch (const buffer::error& e) {
+ cerr << "ERROR: failed to decode queue entry. error: " << e.what() << std::endl;
+ }
+ });
+ formatter->flush(cout);
+ marker = end_marker;
+ }
+ formatter->close_section();
+ formatter->flush(cout);
+ }
+
if (opt_cmd == OPT::SCRIPT_PUT) {
if (!str_script_ctx) {
cerr << "ERROR: context was not provided (via --context)" << std::endl;
};
WRITE_CLASS_ENCODER(rgw_pubsub_topics)
+namespace rgw::notify {
+
+struct event_entry_t {
+ rgw_pubsub_s3_event event;
+ std::string push_endpoint;
+ std::string push_endpoint_args;
+ std::string arn_topic;
+
+ void encode(bufferlist& bl) const {
+ ENCODE_START(1, 1, bl);
+ encode(event, bl);
+ encode(push_endpoint, bl);
+ encode(push_endpoint_args, bl);
+ encode(arn_topic, bl);
+ ENCODE_FINISH(bl);
+ }
+
+ void decode(bufferlist::const_iterator& bl) {
+ DECODE_START(1, bl);
+ decode(event, bl);
+ decode(push_endpoint, bl);
+ decode(push_endpoint_args, bl);
+ decode(arn_topic, bl);
+ DECODE_FINISH(bl);
+ }
+
+ void dump(Formatter *f) const;
+};
+WRITE_CLASS_ENCODER(event_entry_t)
+}
+
class RGWPubSub
{
friend class Bucket;