namespace rgw::notify {
-struct event_entry_t {
- rgw_pubsub_s3_event event;
- std::string push_endpoint;
- std::string push_endpoint_args;
- std::string arn_topic;
- ceph::coarse_real_time creation_time;
- uint32_t time_to_live = DEFAULT_GLOBAL_VALUE;
- uint32_t max_retries = DEFAULT_GLOBAL_VALUE;
- uint32_t retry_sleep_duration = DEFAULT_GLOBAL_VALUE;
-
- void encode(bufferlist& bl) const {
- ENCODE_START(3, 1, bl);
- encode(event, bl);
- encode(push_endpoint, bl);
- encode(push_endpoint_args, bl);
- encode(arn_topic, bl);
- encode(creation_time, bl);
- encode(time_to_live, bl);
- encode(max_retries, bl);
- encode(retry_sleep_duration, bl);
- ENCODE_FINISH(bl);
- }
-
- void decode(bufferlist::const_iterator& bl) {
- DECODE_START(3, bl);
- decode(event, bl);
- decode(push_endpoint, bl);
- decode(push_endpoint_args, bl);
- decode(arn_topic, bl);
- if (struct_v > 1) {
- decode(creation_time, bl);
- } else {
- creation_time = ceph::coarse_real_clock::zero();
- }
- if (struct_v > 2) {
- decode(time_to_live, bl);
- decode(max_retries, bl);
- decode(retry_sleep_duration, bl);
- }
- DECODE_FINISH(bl);
- }
-};
-WRITE_CLASS_ENCODER(event_entry_t)
-
-
struct persistency_tracker {
ceph::coarse_real_time last_retry_time {ceph::coarse_real_clock::zero()};
uint32_t retires_num {0};
#include "cls/rgw/cls_rgw_types.h"
#include "cls/rgw/cls_rgw_client.h"
+#include "cls/2pc_queue/cls_2pc_queue_types.h"
+#include "cls/2pc_queue/cls_2pc_queue_client.h"
#include "include/utime.h"
#include "include/str_list.h"
cout << " topic get get a bucket notifications topic\n";
cout << " topic rm remove a bucket notifications topic\n";
cout << " topic stats get a bucket notifications persistent topic stats (i.e. reservations, entries & size)\n";
+ cout << " topic dump dump (in JSON format) all pending bucket notifications of a persistent topic\n";
cout << " script put upload a Lua script to a context\n";
cout << " script get get the Lua script of a context\n";
cout << " script rm remove the Lua scripts of a context\n";
PUBSUB_NOTIFICATION_GET,
PUBSUB_NOTIFICATION_RM,
PUBSUB_TOPIC_STATS,
+ PUBSUB_TOPIC_DUMP,
SCRIPT_PUT,
SCRIPT_GET,
SCRIPT_RM,
{ "notification get", OPT::PUBSUB_NOTIFICATION_GET },
{ "notification rm", OPT::PUBSUB_NOTIFICATION_RM },
{ "topic stats", OPT::PUBSUB_TOPIC_STATS },
+ { "topic dump", OPT::PUBSUB_TOPIC_DUMP },
{ "script put", OPT::SCRIPT_PUT },
{ "script get", OPT::SCRIPT_GET },
{ "script rm", OPT::SCRIPT_RM },
OPT::PUBSUB_TOPIC_GET,
OPT::PUBSUB_NOTIFICATION_GET,
OPT::PUBSUB_TOPIC_STATS ,
+ OPT::PUBSUB_TOPIC_DUMP ,
OPT::SCRIPT_GET,
};
&& opt_cmd != OPT::PUBSUB_TOPIC_RM
&& opt_cmd != OPT::PUBSUB_NOTIFICATION_RM
&& opt_cmd != OPT::PUBSUB_TOPIC_STATS
+ && opt_cmd != OPT::PUBSUB_TOPIC_DUMP
&& opt_cmd != OPT::SCRIPT_PUT
&& opt_cmd != OPT::SCRIPT_GET
&& opt_cmd != OPT::SCRIPT_RM
return ENOENT;
}
+ auto ioctx = static_cast<rgw::sal::RadosStore*>(driver)->getRados()->get_notif_pool_ctx();
rgw::notify::rgw_topic_stats stats;
ret = rgw::notify::get_persistent_queue_stats(
- dpp(), static_cast<rgw::sal::RadosStore *>(driver)->getRados()->get_notif_pool_ctx(),
+ dpp(), ioctx,
topic.dest.persistent_queue, stats, null_yield);
if (ret < 0) {
cerr << "ERROR: could not get persistent queue: " << cpp_strerror(-ret) << std::endl;
encode_json("", stats, formatter.get());
formatter->flush(cout);
}
+
+ if (opt_cmd == OPT::PUBSUB_TOPIC_DUMP) {
+ if (topic_name.empty()) {
+ cerr << "ERROR: topic name was not provided (via --topic)" << std::endl;
+ return EINVAL;
+ }
+ const std::string& account = !account_id.empty() ? account_id : tenant;
+ RGWPubSub ps(driver, account, *site);
+
+ rgw_pubsub_topic topic;
+ ret = ps.get_topic(dpp(), topic_name, topic, null_yield, nullptr);
+ if (ret < 0) {
+ cerr << "ERROR: could not get topic. error: " << cpp_strerror(-ret) << std::endl;
+ return -ret;
+ }
+
+ if (topic.dest.persistent_queue.empty()) {
+ cerr << "ERROR: topic does not have a persistent queue" << std::endl;
+ return ENOENT;
+ }
+
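+ // the persistent queue is a 2pc-queue RADOS object in the notification pool, so it is read directly via the cls_2pc_queue client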
+ auto ioctx = static_cast<rgw::sal::RadosStore*>(driver)->getRados()->get_notif_pool_ctx();
+ std::string marker;
+ std::string end_marker;
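+ // list the queue in pages of up to max_entries, advancing the marker until the listing is no longer truncated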
+ std::vector<cls_queue_entry> queue_entries;
+ bool truncated = true;
+ formatter->open_array_section("eventEntries");
+ while (truncated) {
+ bufferlist bl;
+ int rc;
+ // use a fresh read op per iteration, so previous list calls do not accumulate on it
+ librados::ObjectReadOperation rop;
+ cls_2pc_queue_list_entries(rop, marker, max_entries, &bl, &rc);
+ const auto op_ret = ioctx.operate(topic.dest.persistent_queue, &rop, nullptr);
+ if (op_ret < 0) {
+ cerr << "ERROR: could not read queue object. error: " << cpp_strerror(-op_ret) << std::endl;
+ return -op_ret;
+ }
+ if (rc < 0) {
+ cerr << "ERROR: could not list entries from queue. error: " << cpp_strerror(-rc) << std::endl;
+ return -rc;
+ }
+ rc = cls_2pc_queue_list_entries_result(bl, queue_entries, &truncated, end_marker);
+ if (rc < 0) {
+ cerr << "ERROR: failed to parse list entries reply from queue. error: " << cpp_strerror(-rc) << std::endl;
+ return -rc;
+ }
+
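+ // decode each raw queue entry into an event_entry_t and dump it as JSON; an entry that fails to decode is reported and skipped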
+ std::for_each(queue_entries.cbegin(),
+ queue_entries.cend(),
+ [&formatter](const auto& queue_entry) {
+ rgw::notify::event_entry_t event_entry;
+ bufferlist::const_iterator iter{&queue_entry.data};
+ try {
+ event_entry.decode(iter);
+ encode_json("", event_entry, formatter.get());
+ } catch (const buffer::error& e) {
+ cerr << "ERROR: failed to decode queue entry. error: " << e.what() << std::endl;
+ }
+ });
+ formatter->flush(cout);
+ marker = end_marker;
+ }
+ formatter->close_section();
+ formatter->flush(cout);
+ }
if (opt_cmd == OPT::SCRIPT_PUT) {
if (!str_script_ctx) {
encode_json("opaqueData", opaque_data, f);
}
+namespace rgw::notify {
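+ // JSON representation of a queued notification, as emitted by 'radosgw-admin topic dump':
+ // each entry wraps the original S3 event together with its delivery configuration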
+ void event_entry_t::dump(Formatter *f) const {
+ Formatter::ObjectSection s(*f, "entry");
+ {
+ Formatter::ObjectSection sub_s(*f, "event");
+ event.dump(f);
+ }
+ encode_json("pushEndpoint", push_endpoint, f);
+ encode_json("pushEndpointArgs", push_endpoint_args, f);
+ encode_json("topic", arn_topic, f);
+ encode_json("creationTime", creation_time, f);
+ encode_json("TTL", time_to_live, f);
+ encode_json("maxRetries", max_retries, f);
+ encode_json("retrySleepDuration", retry_sleep_duration, f);
+ }
+}
+
void rgw_pubsub_topic::dump(Formatter *f) const
{
encode_json("owner", owner, f);
};
namespace rgw::notify {
-
// Denotes that the topic has not overridden the global configurations for (time_to_live / max_retries / retry_sleep_duration)
// defaults: (rgw_topic_persistency_time_to_live / rgw_topic_persistency_max_retries / rgw_topic_persistency_sleep_duration)
constexpr uint32_t DEFAULT_GLOBAL_VALUE = UINT32_MAX;
// Used in case the topic is using the default global value for dumping in a formatter
constexpr static const std::string_view DEFAULT_CONFIG{"None"};
+ struct event_entry_t {
+ rgw_pubsub_s3_event event;
+ std::string push_endpoint;
+ std::string push_endpoint_args;
+ std::string arn_topic;
+ ceph::coarse_real_time creation_time;
+ uint32_t time_to_live = DEFAULT_GLOBAL_VALUE;
+ uint32_t max_retries = DEFAULT_GLOBAL_VALUE;
+ uint32_t retry_sleep_duration = DEFAULT_GLOBAL_VALUE;
+
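+ // encoding version history: v2 added creation_time, v3 added the persistency configuration (time_to_live/max_retries/retry_sleep_duration)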
+ void encode(bufferlist& bl) const {
+ ENCODE_START(3, 1, bl);
+ encode(event, bl);
+ encode(push_endpoint, bl);
+ encode(push_endpoint_args, bl);
+ encode(arn_topic, bl);
+ encode(creation_time, bl);
+ encode(time_to_live, bl);
+ encode(max_retries, bl);
+ encode(retry_sleep_duration, bl);
+ ENCODE_FINISH(bl);
+ }
+
+ void decode(bufferlist::const_iterator& bl) {
+ DECODE_START(3, bl);
+ decode(event, bl);
+ decode(push_endpoint, bl);
+ decode(push_endpoint_args, bl);
+ decode(arn_topic, bl);
+ if (struct_v > 1) {
+ decode(creation_time, bl);
+ } else {
+ creation_time = ceph::coarse_real_clock::zero();
+ }
+ if (struct_v > 2) {
+ decode(time_to_live, bl);
+ decode(max_retries, bl);
+ decode(retry_sleep_duration, bl);
+ }
+ DECODE_FINISH(bl);
+ }
+
+ void dump(Formatter *f) const;
+ };
+ WRITE_CLASS_ENCODER(event_entry_t)
}
std::string topic_to_unique(const std::string& topic,
topic get get a bucket notifications topic
topic rm remove a bucket notifications topic
topic stats get a bucket notifications persistent topic stats (i.e. reservations, entries & size)
+ topic dump dump (in JSON format) all pending bucket notifications of a persistent topic
script put upload a Lua script to a context
script get get the Lua script of a context
script rm remove the Lua scripts of a context
conn.delete_bucket(bucket_name)
http_server.close()
+@attr('basic_test')
+def test_persistent_topic_dump():
+ """ test persistent topic dump """
+ conn = connection()
+ zonegroup = get_config_zonegroup()
+
+ # create random port for the http server
+ host = get_ip()
+ port = random.randint(10000, 20000)
+
+ # create bucket
+ bucket_name = gen_bucket_name()
+ bucket = conn.create_bucket(bucket_name)
+ topic_name = bucket_name + TOPIC_SUFFIX
+
+ # create s3 topic
+ endpoint_address = 'http://'+host+':'+str(port)
+ endpoint_args = 'push-endpoint='+endpoint_address+'&persistent=true'+ \
+ '&retry_sleep_duration=1'
+ topic_conf = PSTopicS3(conn, topic_name, zonegroup, endpoint_args=endpoint_args)
+ topic_arn = topic_conf.set_config()
+ # create s3 notification
+ notification_name = bucket_name + NOTIFICATION_SUFFIX
+ topic_conf_list = [{'Id': notification_name, 'TopicArn': topic_arn,
+ 'Events': []
+ }]
+
+ s3_notification_conf = PSNotificationS3(conn, bucket_name, topic_conf_list)
+ response, status = s3_notification_conf.set_config()
+ assert_equal(status/100, 2)
+
+ # create objects in the bucket (async)
+ number_of_objects = 20
+ client_threads = []
+ start_time = time.time()
+ for i in range(number_of_objects):
+ key = bucket.new_key('key-'+str(i))
+ content = str(os.urandom(1024*1024))
+ thr = threading.Thread(target = set_contents_from_string, args=(key, content,))
+ thr.start()
+ client_threads.append(thr)
+ [thr.join() for thr in client_threads]
+ time_diff = time.time() - start_time
+ print('average time for creation + async http notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds')
+
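+ # no http server is listening yet, so all notifications are still pending in the queue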
+ # topic dump
+ result = admin(['topic', 'dump', '--topic', topic_name], get_config_cluster())
+ assert_equal(result[1], 0)
+ parsed_result = json.loads(result[0])
+ assert_equal(len(parsed_result), number_of_objects)
+
+ # delete objects from the bucket
+ client_threads = []
+ start_time = time.time()
+ for key in bucket.list():
+ thr = threading.Thread(target = key.delete, args=())
+ thr.start()
+ client_threads.append(thr)
+ [thr.join() for thr in client_threads]
+ time_diff = time.time() - start_time
+ print('average time for deletion + async http notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds')
+
+ # topic dump (should now also hold the delete notifications)
+ result = admin(['topic', 'dump', '--topic', topic_name], get_config_cluster())
+ assert_equal(result[1], 0)
+ print(result[0])
+ parsed_result = json.loads(result[0])
+ assert_equal(len(parsed_result), 2*number_of_objects)
+
+ # start an http server in a separate thread
+ http_server = HTTPServerWithEvents((host, port))
+
+ wait_for_queue_to_drain(topic_name)
+
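+ # once the queue is drained, the dump should report no pending entries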
+ result = admin(['topic', 'dump', '--topic', topic_name], get_config_cluster())
+ assert_equal(result[1], 0)
+ parsed_result = json.loads(result[0])
+ assert_equal(len(parsed_result), 0)
+
+ # cleanup
+ s3_notification_conf.del_config()
+ topic_conf.del_config()
+ # delete the bucket
+ conn.delete_bucket(bucket_name)
+ http_server.close()
+
+
def ps_s3_persistent_topic_configs(persistency_time, config_dict):
conn = connection()
zonegroup = get_config_zonegroup()