From: Ali Masarwa Date: Wed, 9 Aug 2023 11:53:04 +0000 (+0300) Subject: RGW: added a per topic configuration to control the notification persistency X-Git-Tag: v19.0.0~566^2 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=4abb31a5f411d09a3e3a18c50ba77a9a3053b33b;p=ceph.git RGW: added a per topic configuration to control the notification persistency Signed-off-by: Ali Masarwa --- diff --git a/doc/radosgw/notifications.rst b/doc/radosgw/notifications.rst index 1cc538432f3d..52c7e1b1adcb 100644 --- a/doc/radosgw/notifications.rst +++ b/doc/radosgw/notifications.rst @@ -63,6 +63,8 @@ added when the notification is committed to persistent storage. .. note:: If the notification fails with an error, cannot be delivered, or times out, it is retried until it is successfully acknowledged. + You can control its retry with time_to_live/max_retries to have a time/retry limit and + control the retry frequency with retry_sleep_duration .. tip:: To minimize the latency added by asynchronous notification, we recommended placing the "log" pool on fast media. @@ -154,6 +156,9 @@ updating, use the name of an existing topic and different endpoint values). [&Attributes.entry.9.key=persistent&Attributes.entry.9.value=true|false] [&Attributes.entry.10.key=cloudevents&Attributes.entry.10.value=true|false] [&Attributes.entry.11.key=mechanism&Attributes.entry.11.value=] + [&Attributes.entry.12.key=time_to_live&Attributes.entry.12.value=] + [&Attributes.entry.13.key=max_retries&Attributes.entry.13.value=] + [&Attributes.entry.14.key=retry_sleep_duration&Attributes.entry.14.value=] Request parameters: @@ -162,6 +167,18 @@ Request parameters: notifications that are triggered by the topic. - persistent: This indicates whether notifications to this endpoint are persistent (=asynchronous) or not persistent. (This is "false" by default.) +- time_to_live: This will limit the time (in seconds) to retain the notifications. + default value is taken from `rgw_topic_persistency_time_to_live`. + providing a value overrides the global value. + zero value means infinite time to live. +- max_retries: This will limit the max retries before expiring notifications. + default value is taken from `rgw_topic_persistency_max_retries`. + providing a value overrides the global value. + zero value means infinite retries. +- retry_sleep_duration: This will control the frequency of retrying the notifications. + default value is taken from `rgw_topic_persistency_sleep_duration`. + providing a value overrides the global value. + zero value mean there is no delay between retries. - HTTP endpoint @@ -317,6 +334,9 @@ The response has the following format: information. In this case, the request must be made over HTTPS. The "topic get" request will otherwise be rejected. - Persistent: This is "true" if the topic is persistent. + - TimeToLive: This will limit the time (in seconds) to retain the notifications. + - MaxRetries: This will limit the max retries before expiring notifications. + - RetrySleepDuration: This will control the frequency of retrying the notifications. - TopicArn: topic `ARN `_. - OpaqueData: The opaque data set on the topic. diff --git a/src/rgw/driver/rados/rgw_notify.cc b/src/rgw/driver/rados/rgw_notify.cc index 1d9f69498a01..205db107fa19 100644 --- a/src/rgw/driver/rados/rgw_notify.cc +++ b/src/rgw/driver/rados/rgw_notify.cc @@ -25,14 +25,20 @@ struct event_entry_t { std::string push_endpoint_args; std::string arn_topic; ceph::coarse_real_time creation_time; + uint32_t time_to_live = DEFAULT_GLOBAL_VALUE; + uint32_t max_retries = DEFAULT_GLOBAL_VALUE; + uint32_t retry_sleep_duration = DEFAULT_GLOBAL_VALUE; void encode(bufferlist& bl) const { - ENCODE_START(2, 1, bl); + ENCODE_START(3, 1, bl); encode(event, bl); encode(push_endpoint, bl); encode(push_endpoint_args, bl); encode(arn_topic, bl); encode(creation_time, bl); + encode(time_to_live, bl); + encode(max_retries, bl); + encode(retry_sleep_duration, bl); ENCODE_FINISH(bl); } @@ -47,6 +53,11 @@ struct event_entry_t { } else { creation_time = ceph::coarse_real_clock::zero(); } + if (struct_v > 2) { + decode(time_to_live, bl); + decode(max_retries, bl); + decode(retry_sleep_duration, bl); + } DECODE_FINISH(bl); } }; @@ -55,7 +66,7 @@ WRITE_CLASS_ENCODER(event_entry_t) struct persistency_tracker { ceph::coarse_real_time last_retry_time {ceph::coarse_real_clock::zero()}; - uint64_t retires_num {0}; + uint32_t retires_num {0}; }; using queues_t = std::set; @@ -195,8 +206,12 @@ private: return EntryProcessingResult::Failure; } - const auto topic_persistency_ttl = conf->rgw_topic_persistency_time_to_live; - const auto topic_persistency_max_retries = conf->rgw_topic_persistency_max_retries; + const auto topic_persistency_ttl = event_entry.time_to_live != DEFAULT_GLOBAL_VALUE ? + event_entry.time_to_live : conf->rgw_topic_persistency_time_to_live; + const auto topic_persistency_max_retries = event_entry.max_retries != DEFAULT_GLOBAL_VALUE ? + event_entry.max_retries : conf->rgw_topic_persistency_max_retries; + const auto topic_persistency_sleep_duration = event_entry.retry_sleep_duration != DEFAULT_GLOBAL_VALUE ? + event_entry.retry_sleep_duration : conf->rgw_topic_persistency_sleep_duration; const auto time_now = ceph::coarse_real_clock::now(); if ( (topic_persistency_ttl != 0 && event_entry.creation_time != ceph::coarse_real_clock::zero() && time_now - event_entry.creation_time > std::chrono::seconds(topic_persistency_ttl)) @@ -205,8 +220,7 @@ private: << event_entry.creation_time << " time_now:" << time_now << dendl; return EntryProcessingResult::Expired; } - if (time_now - entry_persistency_tracker.last_retry_time - < std::chrono::seconds(conf->rgw_topic_persistency_sleep_duration) ) { + if (time_now - entry_persistency_tracker.last_retry_time < std::chrono::seconds(topic_persistency_sleep_duration) ) { return EntryProcessingResult::Sleeping; } // TODO: write back the entry with creation time as if now @@ -926,6 +940,9 @@ int publish_commit(rgw::sal::Object* obj, std::move(topic.cfg.dest.push_endpoint_args); event_entry.arn_topic = topic.cfg.dest.arn_topic; event_entry.creation_time = ceph::coarse_real_clock::now(); + event_entry.time_to_live = topic.cfg.dest.time_to_live; + event_entry.max_retries = topic.cfg.dest.max_retries; + event_entry.retry_sleep_duration = topic.cfg.dest.retry_sleep_duration; bufferlist bl; encode(event_entry, bl); const auto& queue_name = topic.cfg.dest.arn_topic; diff --git a/src/rgw/rgw_pubsub.cc b/src/rgw/rgw_pubsub.cc index 36bef95e68c4..51c8b0281c68 100644 --- a/src/rgw/rgw_pubsub.cc +++ b/src/rgw/rgw_pubsub.cc @@ -413,24 +413,36 @@ void rgw_pubsub_topics::dump_xml(Formatter *f) const void rgw_pubsub_dest::dump(Formatter *f) const { + using rgw::notify::DEFAULT_GLOBAL_VALUE; + using rgw::notify::DEFAULT_CONFIG; encode_json("push_endpoint", push_endpoint, f); encode_json("push_endpoint_args", push_endpoint_args, f); encode_json("push_endpoint_topic", arn_topic, f); encode_json("stored_secret", stored_secret, f); encode_json("persistent", persistent, f); + encode_json("time_to_live", time_to_live!=DEFAULT_GLOBAL_VALUE? std::to_string(time_to_live): DEFAULT_CONFIG, f); + encode_json("max_retries", max_retries!=DEFAULT_GLOBAL_VALUE? std::to_string(max_retries): DEFAULT_CONFIG, f); + encode_json("retry_sleep_duration", retry_sleep_duration!=DEFAULT_GLOBAL_VALUE? std::to_string(retry_sleep_duration): DEFAULT_CONFIG, f); } void rgw_pubsub_dest::dump_xml(Formatter *f) const { + using rgw::notify::DEFAULT_GLOBAL_VALUE; + using rgw::notify::DEFAULT_CONFIG; encode_xml("EndpointAddress", push_endpoint, f); encode_xml("EndpointArgs", push_endpoint_args, f); encode_xml("EndpointTopic", arn_topic, f); encode_xml("HasStoredSecret", stored_secret, f); encode_xml("Persistent", persistent, f); + encode_xml("TimeToLive", time_to_live!=DEFAULT_GLOBAL_VALUE? std::to_string(time_to_live): DEFAULT_CONFIG, f); + encode_xml("MaxRetries", max_retries!=DEFAULT_GLOBAL_VALUE? std::to_string(max_retries): DEFAULT_CONFIG, f); + encode_xml("RetrySleepDuration", retry_sleep_duration!=DEFAULT_GLOBAL_VALUE? std::to_string(retry_sleep_duration): DEFAULT_CONFIG, f); } std::string rgw_pubsub_dest::to_json_str() const { + using rgw::notify::DEFAULT_GLOBAL_VALUE; + using rgw::notify::DEFAULT_CONFIG; JSONFormatter f; f.open_object_section(""); encode_json("EndpointAddress", push_endpoint, &f); @@ -438,6 +450,9 @@ std::string rgw_pubsub_dest::to_json_str() const encode_json("EndpointTopic", arn_topic, &f); encode_json("HasStoredSecret", stored_secret, &f); encode_json("Persistent", persistent, &f); + encode_json("TimeToLive", time_to_live!=DEFAULT_GLOBAL_VALUE? std::to_string(time_to_live): DEFAULT_CONFIG, &f); + encode_json("MaxRetries", max_retries!=DEFAULT_GLOBAL_VALUE? std::to_string(max_retries): DEFAULT_CONFIG, &f); + encode_json("RetrySleepDuration", retry_sleep_duration!=DEFAULT_GLOBAL_VALUE? std::to_string(retry_sleep_duration): DEFAULT_CONFIG, &f); f.close_section(); std::stringstream ss; f.flush(ss); diff --git a/src/rgw/rgw_pubsub.h b/src/rgw/rgw_pubsub.h index afab95ba3782..3a87d777d6cb 100644 --- a/src/rgw/rgw_pubsub.h +++ b/src/rgw/rgw_pubsub.h @@ -341,9 +341,12 @@ struct rgw_pubsub_dest { std::string arn_topic; bool stored_secret = false; bool persistent = false; + uint32_t time_to_live; + uint32_t max_retries; + uint32_t retry_sleep_duration; void encode(bufferlist& bl) const { - ENCODE_START(5, 1, bl); + ENCODE_START(6, 1, bl); encode("", bl); encode("", bl); encode(push_endpoint, bl); @@ -351,6 +354,9 @@ struct rgw_pubsub_dest { encode(arn_topic, bl); encode(stored_secret, bl); encode(persistent, bl); + encode(time_to_live, bl); + encode(max_retries, bl); + encode(retry_sleep_duration, bl); ENCODE_FINISH(bl); } @@ -372,6 +378,11 @@ struct rgw_pubsub_dest { if (struct_v >= 5) { decode(persistent, bl); } + if (struct_v >= 6) { + decode(time_to_live, bl); + decode(max_retries, bl); + decode(retry_sleep_duration, bl); + } DECODE_FINISH(bl); } @@ -625,3 +636,11 @@ public: int remove_topic(const DoutPrefixProvider *dpp, const std::string& name, optional_yield y) const; }; +namespace rgw::notify { + + // Denotes that the topic has not overridden the global configurations for (time_to_live / max_retries / retry_sleep_duration) + // defaults: (rgw_topic_persistency_time_to_live / rgw_topic_persistency_max_retries / rgw_topic_persistency_sleep_duration) + constexpr uint32_t DEFAULT_GLOBAL_VALUE = UINT32_MAX; + // Used in case the topic is using the default global value for dumping in a formatter + constexpr static const std::string_view DEFAULT_CONFIG{"None"}; +} \ No newline at end of file diff --git a/src/rgw/rgw_rest_pubsub.cc b/src/rgw/rgw_rest_pubsub.cc index de898a093172..98ecd52e3ad9 100644 --- a/src/rgw/rgw_rest_pubsub.cc +++ b/src/rgw/rgw_rest_pubsub.cc @@ -88,6 +88,9 @@ class RGWPSCreateTopicOp : public RGWOp { dest.push_endpoint = s->info.args.get("push-endpoint"); s->info.args.get_bool("persistent", &dest.persistent, false); + s->info.args.get_int("time_to_live", reinterpret_cast(&dest.time_to_live), rgw::notify::DEFAULT_GLOBAL_VALUE); + s->info.args.get_int("max_retries", reinterpret_cast(&dest.max_retries), rgw::notify::DEFAULT_GLOBAL_VALUE); + s->info.args.get_int("retry_sleep_duration", reinterpret_cast(&dest.retry_sleep_duration), rgw::notify::DEFAULT_GLOBAL_VALUE); if (!validate_and_update_endpoint_secret(dest, s->cct, *(s->info.env))) { return -EINVAL; diff --git a/src/rgw/rgw_xml.cc b/src/rgw/rgw_xml.cc index 22a62ac480a5..1bcbcdad2457 100644 --- a/src/rgw/rgw_xml.cc +++ b/src/rgw/rgw_xml.cc @@ -436,6 +436,11 @@ void encode_xml(const char *name, const string& val, Formatter *f) f->dump_string(name, val); } +void encode_xml(const char *name, const string_view & val, Formatter *f) +{ + f->dump_string(name, val); +} + void encode_xml(const char *name, const char *val, Formatter *f) { f->dump_string(name, val); diff --git a/src/rgw/rgw_xml.h b/src/rgw/rgw_xml.h index 74a8c27a095d..5d3e72789523 100644 --- a/src/rgw/rgw_xml.h +++ b/src/rgw/rgw_xml.h @@ -331,6 +331,7 @@ static void encode_xml(const char *name, const char *ns, const T& val, ceph::For } void encode_xml(const char *name, const std::string& val, ceph::Formatter *f); +void encode_xml(const char *name, const std::string_view& val, ceph::Formatter *f); void encode_xml(const char *name, const char *val, ceph::Formatter *f); void encode_xml(const char *name, bool val, ceph::Formatter *f); void encode_xml(const char *name, int val, ceph::Formatter *f); diff --git a/src/test/rgw/bucket_notification/test_bn.py b/src/test/rgw/bucket_notification/test_bn.py index b7fe128cf2df..d7e46a914aad 100644 --- a/src/test/rgw/bucket_notification/test_bn.py +++ b/src/test/rgw/bucket_notification/test_bn.py @@ -3131,6 +3131,143 @@ def test_ps_s3_persistent_topic_stats(): time.sleep(delay) http_server.close() +def ps_s3_persistent_topic_configs(persistency_time, config_dict): + conn = connection() + zonegroup = 'default' + + # create random port for the http server + host = get_ip() + http_port = random.randint(10000, 20000) + + # start an http server in a separate thread + http_server = StreamingHTTPServer(host, http_port, num_workers=10, delay=0.5) + + # create bucket + bucket_name = gen_bucket_name() + bucket = conn.create_bucket(bucket_name) + topic_name = bucket_name + TOPIC_SUFFIX + + # create s3 topic + endpoint_address = 'http://'+host+':'+str(http_port) + endpoint_args = 'push-endpoint='+endpoint_address+'&persistent=true&'+create_persistency_config_string(config_dict) + topic_conf = PSTopicS3(conn, topic_name, zonegroup, endpoint_args=endpoint_args) + topic_arn = topic_conf.set_config() + # create s3 notification + notification_name = bucket_name + NOTIFICATION_SUFFIX + topic_conf_list = [{'Id': notification_name, 'TopicArn': topic_arn, + 'Events': [] + }] + + s3_notification_conf = PSNotificationS3(conn, bucket_name, topic_conf_list) + response, status = s3_notification_conf.set_config() + assert_equal(status/100, 2) + + delay = 20 + time.sleep(delay) + http_server.close() + # topic get + result = admin(['topic', 'get', '--topic', topic_name]) + parsed_result = json.loads(result[0]) + parsed_result_dest = parsed_result["dest"] + for key, value in config_dict.items(): + assert_equal(parsed_result_dest[key], str(value)) + assert_equal(result[1], 0) + + # topic stats + result = admin(['topic', 'stats', '--topic', topic_name]) + parsed_result = json.loads(result[0]) + assert_equal(parsed_result['Topic Stats']['Entries'], 0) + assert_equal(result[1], 0) + + # create objects in the bucket (async) + number_of_objects = 10 + client_threads = [] + start_time = time.time() + for i in range(number_of_objects): + key = bucket.new_key('key-'+str(i)) + content = str(os.urandom(1024*1024)) + thr = threading.Thread(target = set_contents_from_string, args=(key, content,)) + thr.start() + client_threads.append(thr) + [thr.join() for thr in client_threads] + time_diff = time.time() - start_time + print('average time for creation + async http notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds') + + # topic stats + result = admin(['topic', 'stats', '--topic', topic_name]) + parsed_result = json.loads(result[0]) + assert_equal(parsed_result['Topic Stats']['Entries'], number_of_objects) + assert_equal(result[1], 0) + + # wait as much as ttl and check if the persistent topics have expired + time.sleep(persistency_time) + result = admin(['topic', 'stats', '--topic', topic_name]) + parsed_result = json.loads(result[0]) + assert_equal(parsed_result['Topic Stats']['Entries'], 0) + assert_equal(result[1], 0) + + # delete objects from the bucket + client_threads = [] + start_time = time.time() + count = 0 + for key in bucket.list(): + count += 1 + thr = threading.Thread(target = key.delete, args=()) + thr.start() + client_threads.append(thr) + if count%100 == 0: + [thr.join() for thr in client_threads] + time_diff = time.time() - start_time + print('average time for deletion + async http notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds') + client_threads = [] + start_time = time.time() + + # topic stats + result = admin(['topic', 'stats', '--topic', topic_name]) + parsed_result = json.loads(result[0]) + assert_equal(parsed_result['Topic Stats']['Entries'], number_of_objects) + assert_equal(result[1], 0) + + # wait as much as ttl and check if the persistent topics have expired + time.sleep(persistency_time) + result = admin(['topic', 'stats', '--topic', topic_name]) + parsed_result = json.loads(result[0]) + assert_equal(parsed_result['Topic Stats']['Entries'], 0) + assert_equal(result[1], 0) + + # cleanup + s3_notification_conf.del_config() + topic_conf.del_config() + # delete the bucket + conn.delete_bucket(bucket_name) + time.sleep(delay) + +def create_persistency_config_string(config_dict): + str_ret = "" + for key, value in config_dict.items(): + if key != "None": + str_ret += key + "=" + str(value) + '&' + + return str_ret[:-1] + +@attr('basic_test') +def test_ps_s3_persistent_topic_configs_ttl(): + """ test persistent topic configurations with time_to_live """ + config_dict = {"time_to_live": 30, "max_retries": "None", "retry_sleep_duration": "None"} + buffer = 10 + persistency_time =config_dict["time_to_live"] + buffer + + ps_s3_persistent_topic_configs(persistency_time, config_dict) + +@attr('basic_test') +def test_ps_s3_persistent_topic_configs_max_retries(): + """ test persistent topic configurations with max_retries and retry_sleep_duration """ + config_dict = {"time_to_live": "None", "max_retries": 30, "retry_sleep_duration": 1} + buffer = 10 + persistency_time = config_dict["max_retries"]*config_dict["retry_sleep_duration"] + buffer + + ps_s3_persistent_topic_configs(persistency_time, config_dict) + @attr('manual_test') def test_ps_s3_persistent_notification_pushback(): """ test pushing persistent notification pushback """