.. note:: If the notification fails with an error, cannot be delivered, or
times out, it is retried until it is successfully acknowledged.
+ You can limit the retries in time with time_to_live, limit their number with
+ max_retries, and control the retry frequency with retry_sleep_duration.
.. tip:: To minimize the latency added by asynchronous notification, we
recommend placing the "log" pool on fast media.
[&Attributes.entry.9.key=persistent&Attributes.entry.9.value=true|false]
[&Attributes.entry.10.key=cloudevents&Attributes.entry.10.value=true|false]
[&Attributes.entry.11.key=mechanism&Attributes.entry.11.value=<mechanism>]
+ [&Attributes.entry.12.key=time_to_live&Attributes.entry.12.value=<seconds to live>]
+ [&Attributes.entry.13.key=max_retries&Attributes.entry.13.value=<number of retries>]
+ [&Attributes.entry.14.key=retry_sleep_duration&Attributes.entry.14.value=<seconds between retries>]
Request parameters:
notifications that are triggered by the topic.
- persistent: This indicates whether notifications to this endpoint are
persistent (=asynchronous) or not persistent. (This is "false" by default.)
+- time_to_live: The time (in seconds) for which an undelivered notification is retained before it expires.
+ The default value is taken from `rgw_topic_persistency_time_to_live`;
+ providing a value here overrides the global setting.
+ A value of zero means an unlimited time to live.
+- max_retries: The maximum number of delivery retries before a notification expires.
+ The default value is taken from `rgw_topic_persistency_max_retries`;
+ providing a value here overrides the global setting.
+ A value of zero means an unlimited number of retries.
+- retry_sleep_duration: The minimum time (in seconds) between delivery retries,
+ which controls how frequently notifications are retried.
+ The default value is taken from `rgw_topic_persistency_sleep_duration`;
+ providing a value here overrides the global setting.
+ A value of zero means there is no delay between retries.
+ (See the example request below.)
- HTTP endpoint
information. In this case, the request must be made over HTTPS. The "topic
get" request will otherwise be rejected.
- Persistent: This is "true" if the topic is persistent.
+ - TimeToLive: The time (in seconds) for which undelivered notifications are retained before they expire.
+ - MaxRetries: The maximum number of delivery retries before a notification expires.
+ - RetrySleepDuration: The minimum time (in seconds) between delivery retries.
- TopicArn: topic `ARN
<https://docs.aws.amazon.com/general/latest/gr/aws-arns-and-namespaces.html>`_.
- OpaqueData: The opaque data set on the topic.
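+ For example, assuming an HTTP endpoint listening at http://myserver:8080, a persistent
+ topic whose undelivered notifications expire after 300 seconds, are retried at most 10
+ times, and are retried no more often than every 5 seconds could be created with a
+ request of the following form (the endpoint address, topic name, and attribute indexes
+ are illustrative)::
+
+    POST
+    Action=CreateTopic
+    &Name=mytopic
+    &Attributes.entry.1.key=push-endpoint&Attributes.entry.1.value=http://myserver:8080
+    &Attributes.entry.2.key=persistent&Attributes.entry.2.value=true
+    &Attributes.entry.3.key=time_to_live&Attributes.entry.3.value=300
+    &Attributes.entry.4.key=max_retries&Attributes.entry.4.value=10
+    &Attributes.entry.5.key=retry_sleep_duration&Attributes.entry.5.value=5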
std::string push_endpoint_args;
std::string arn_topic;
ceph::coarse_real_time creation_time;
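+ // retry-control settings copied from the topic; DEFAULT_GLOBAL_VALUE means "use the global rgw_topic_persistency_* configuration"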
+ uint32_t time_to_live = DEFAULT_GLOBAL_VALUE;
+ uint32_t max_retries = DEFAULT_GLOBAL_VALUE;
+ uint32_t retry_sleep_duration = DEFAULT_GLOBAL_VALUE;
void encode(bufferlist& bl) const {
- ENCODE_START(2, 1, bl);
+ ENCODE_START(3, 1, bl);
encode(event, bl);
encode(push_endpoint, bl);
encode(push_endpoint_args, bl);
encode(arn_topic, bl);
encode(creation_time, bl);
+ encode(time_to_live, bl);
+ encode(max_retries, bl);
+ encode(retry_sleep_duration, bl);
ENCODE_FINISH(bl);
}
} else {
creation_time = ceph::coarse_real_clock::zero();
}
+ if (struct_v > 2) {
+ decode(time_to_live, bl);
+ decode(max_retries, bl);
+ decode(retry_sleep_duration, bl);
+ }
DECODE_FINISH(bl);
}
};
struct persistency_tracker {
ceph::coarse_real_time last_retry_time {ceph::coarse_real_clock::zero()};
- uint64_t retires_num {0};
+ uint32_t retires_num {0};
};
using queues_t = std::set<std::string>;
return EntryProcessingResult::Failure;
}
- const auto topic_persistency_ttl = conf->rgw_topic_persistency_time_to_live;
- const auto topic_persistency_max_retries = conf->rgw_topic_persistency_max_retries;
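+ // values recorded in the event entry override the global rgw_topic_persistency_* settings; DEFAULT_GLOBAL_VALUE falls back to the global value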
+ const auto topic_persistency_ttl = event_entry.time_to_live != DEFAULT_GLOBAL_VALUE ?
+ event_entry.time_to_live : conf->rgw_topic_persistency_time_to_live;
+ const auto topic_persistency_max_retries = event_entry.max_retries != DEFAULT_GLOBAL_VALUE ?
+ event_entry.max_retries : conf->rgw_topic_persistency_max_retries;
+ const auto topic_persistency_sleep_duration = event_entry.retry_sleep_duration != DEFAULT_GLOBAL_VALUE ?
+ event_entry.retry_sleep_duration : conf->rgw_topic_persistency_sleep_duration;
const auto time_now = ceph::coarse_real_clock::now();
if ( (topic_persistency_ttl != 0 && event_entry.creation_time != ceph::coarse_real_clock::zero() &&
time_now - event_entry.creation_time > std::chrono::seconds(topic_persistency_ttl))
<< event_entry.creation_time << " time_now:" << time_now << dendl;
return EntryProcessingResult::Expired;
}
- if (time_now - entry_persistency_tracker.last_retry_time
- < std::chrono::seconds(conf->rgw_topic_persistency_sleep_duration) ) {
+ if (time_now - entry_persistency_tracker.last_retry_time < std::chrono::seconds(topic_persistency_sleep_duration) ) {
return EntryProcessingResult::Sleeping;
}
// TODO: write back the entry with creation time as if now
std::move(topic.cfg.dest.push_endpoint_args);
event_entry.arn_topic = topic.cfg.dest.arn_topic;
event_entry.creation_time = ceph::coarse_real_clock::now();
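+ // record the topic's retry-control settings in the event entry so they govern this notification's retries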
+ event_entry.time_to_live = topic.cfg.dest.time_to_live;
+ event_entry.max_retries = topic.cfg.dest.max_retries;
+ event_entry.retry_sleep_duration = topic.cfg.dest.retry_sleep_duration;
bufferlist bl;
encode(event_entry, bl);
const auto& queue_name = topic.cfg.dest.arn_topic;
void rgw_pubsub_dest::dump(Formatter *f) const
{
+ using rgw::notify::DEFAULT_GLOBAL_VALUE;
+ using rgw::notify::DEFAULT_CONFIG;
encode_json("push_endpoint", push_endpoint, f);
encode_json("push_endpoint_args", push_endpoint_args, f);
encode_json("push_endpoint_topic", arn_topic, f);
encode_json("stored_secret", stored_secret, f);
encode_json("persistent", persistent, f);
+ encode_json("time_to_live", time_to_live!=DEFAULT_GLOBAL_VALUE? std::to_string(time_to_live): DEFAULT_CONFIG, f);
+ encode_json("max_retries", max_retries!=DEFAULT_GLOBAL_VALUE? std::to_string(max_retries): DEFAULT_CONFIG, f);
+ encode_json("retry_sleep_duration", retry_sleep_duration!=DEFAULT_GLOBAL_VALUE? std::to_string(retry_sleep_duration): DEFAULT_CONFIG, f);
}
void rgw_pubsub_dest::dump_xml(Formatter *f) const
{
+ using rgw::notify::DEFAULT_GLOBAL_VALUE;
+ using rgw::notify::DEFAULT_CONFIG;
encode_xml("EndpointAddress", push_endpoint, f);
encode_xml("EndpointArgs", push_endpoint_args, f);
encode_xml("EndpointTopic", arn_topic, f);
encode_xml("HasStoredSecret", stored_secret, f);
encode_xml("Persistent", persistent, f);
+ encode_xml("TimeToLive", time_to_live!=DEFAULT_GLOBAL_VALUE? std::to_string(time_to_live): DEFAULT_CONFIG, f);
+ encode_xml("MaxRetries", max_retries!=DEFAULT_GLOBAL_VALUE? std::to_string(max_retries): DEFAULT_CONFIG, f);
+ encode_xml("RetrySleepDuration", retry_sleep_duration!=DEFAULT_GLOBAL_VALUE? std::to_string(retry_sleep_duration): DEFAULT_CONFIG, f);
}
std::string rgw_pubsub_dest::to_json_str() const
{
+ using rgw::notify::DEFAULT_GLOBAL_VALUE;
+ using rgw::notify::DEFAULT_CONFIG;
JSONFormatter f;
f.open_object_section("");
encode_json("EndpointAddress", push_endpoint, &f);
encode_json("EndpointTopic", arn_topic, &f);
encode_json("HasStoredSecret", stored_secret, &f);
encode_json("Persistent", persistent, &f);
+ encode_json("TimeToLive", time_to_live!=DEFAULT_GLOBAL_VALUE? std::to_string(time_to_live): DEFAULT_CONFIG, &f);
+ encode_json("MaxRetries", max_retries!=DEFAULT_GLOBAL_VALUE? std::to_string(max_retries): DEFAULT_CONFIG, &f);
+ encode_json("RetrySleepDuration", retry_sleep_duration!=DEFAULT_GLOBAL_VALUE? std::to_string(retry_sleep_duration): DEFAULT_CONFIG, &f);
f.close_section();
std::stringstream ss;
f.flush(ss);
std::string arn_topic;
bool stored_secret = false;
bool persistent = false;
+ uint32_t time_to_live;
+ uint32_t max_retries;
+ uint32_t retry_sleep_duration;
void encode(bufferlist& bl) const {
- ENCODE_START(5, 1, bl);
+ ENCODE_START(6, 1, bl);
encode("", bl);
encode("", bl);
encode(push_endpoint, bl);
encode(arn_topic, bl);
encode(stored_secret, bl);
encode(persistent, bl);
+ encode(time_to_live, bl);
+ encode(max_retries, bl);
+ encode(retry_sleep_duration, bl);
ENCODE_FINISH(bl);
}
if (struct_v >= 5) {
decode(persistent, bl);
}
+ if (struct_v >= 6) {
+ decode(time_to_live, bl);
+ decode(max_retries, bl);
+ decode(retry_sleep_duration, bl);
+ }
DECODE_FINISH(bl);
}
int remove_topic(const DoutPrefixProvider *dpp, const std::string& name, optional_yield y) const;
};
+namespace rgw::notify {
+
+ // Denotes that the topic has not overridden the global configurations for (time_to_live / max_retries / retry_sleep_duration)
+ // defaults: (rgw_topic_persistency_time_to_live / rgw_topic_persistency_max_retries / rgw_topic_persistency_sleep_duration)
+ constexpr uint32_t DEFAULT_GLOBAL_VALUE = UINT32_MAX;
+ // Used in case the topic is using the default global value for dumping in a formatter
+ constexpr std::string_view DEFAULT_CONFIG{"None"};
+}
\ No newline at end of file
dest.push_endpoint = s->info.args.get("push-endpoint");
s->info.args.get_bool("persistent", &dest.persistent, false);
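+ // optional per-topic retry-control overrides; a missing parameter leaves the value at DEFAULT_GLOBAL_VALUE (fall back to the global configuration)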
+ s->info.args.get_int("time_to_live", reinterpret_cast<int *>(&dest.time_to_live), rgw::notify::DEFAULT_GLOBAL_VALUE);
+ s->info.args.get_int("max_retries", reinterpret_cast<int *>(&dest.max_retries), rgw::notify::DEFAULT_GLOBAL_VALUE);
+ s->info.args.get_int("retry_sleep_duration", reinterpret_cast<int *>(&dest.retry_sleep_duration), rgw::notify::DEFAULT_GLOBAL_VALUE);
if (!validate_and_update_endpoint_secret(dest, s->cct, *(s->info.env))) {
return -EINVAL;
f->dump_string(name, val);
}
+void encode_xml(const char *name, const std::string_view& val, Formatter *f)
+{
+ f->dump_string(name, val);
+}
+
void encode_xml(const char *name, const char *val, Formatter *f)
{
f->dump_string(name, val);
}
void encode_xml(const char *name, const std::string& val, ceph::Formatter *f);
+void encode_xml(const char *name, const std::string_view& val, ceph::Formatter *f);
void encode_xml(const char *name, const char *val, ceph::Formatter *f);
void encode_xml(const char *name, bool val, ceph::Formatter *f);
void encode_xml(const char *name, int val, ceph::Formatter *f);
time.sleep(delay)
http_server.close()
+def ps_s3_persistent_topic_configs(persistency_time, config_dict):
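+ """ helper: create a persistent topic with the given retry-control configuration, verify it via 'topic get', and check that undelivered notifications expire within persistency_time seconds """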
+ conn = connection()
+ zonegroup = 'default'
+
+ # create random port for the http server
+ host = get_ip()
+ http_port = random.randint(10000, 20000)
+
+ # start an http server in a separate thread
+ http_server = StreamingHTTPServer(host, http_port, num_workers=10, delay=0.5)
+
+ # create bucket
+ bucket_name = gen_bucket_name()
+ bucket = conn.create_bucket(bucket_name)
+ topic_name = bucket_name + TOPIC_SUFFIX
+
+ # create s3 topic
+ endpoint_address = 'http://'+host+':'+str(http_port)
+ endpoint_args = 'push-endpoint='+endpoint_address+'&persistent=true&'+create_persistency_config_string(config_dict)
+ topic_conf = PSTopicS3(conn, topic_name, zonegroup, endpoint_args=endpoint_args)
+ topic_arn = topic_conf.set_config()
+ # create s3 notification
+ notification_name = bucket_name + NOTIFICATION_SUFFIX
+ topic_conf_list = [{'Id': notification_name, 'TopicArn': topic_arn,
+ 'Events': []
+ }]
+
+ s3_notification_conf = PSNotificationS3(conn, bucket_name, topic_conf_list)
+ response, status = s3_notification_conf.set_config()
+ assert_equal(status/100, 2)
+
+ delay = 20
+ time.sleep(delay)
+ http_server.close()
+ # topic get
+ result = admin(['topic', 'get', '--topic', topic_name])
+ parsed_result = json.loads(result[0])
+ parsed_result_dest = parsed_result["dest"]
+ for key, value in config_dict.items():
+ assert_equal(parsed_result_dest[key], str(value))
+ assert_equal(result[1], 0)
+
+ # topic stats
+ result = admin(['topic', 'stats', '--topic', topic_name])
+ parsed_result = json.loads(result[0])
+ assert_equal(parsed_result['Topic Stats']['Entries'], 0)
+ assert_equal(result[1], 0)
+
+ # create objects in the bucket (async)
+ number_of_objects = 10
+ client_threads = []
+ start_time = time.time()
+ for i in range(number_of_objects):
+ key = bucket.new_key('key-'+str(i))
+ content = str(os.urandom(1024*1024))
+ thr = threading.Thread(target = set_contents_from_string, args=(key, content,))
+ thr.start()
+ client_threads.append(thr)
+ [thr.join() for thr in client_threads]
+ time_diff = time.time() - start_time
+ print('average time for creation + async http notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds')
+
+ # topic stats
+ result = admin(['topic', 'stats', '--topic', topic_name])
+ parsed_result = json.loads(result[0])
+ assert_equal(parsed_result['Topic Stats']['Entries'], number_of_objects)
+ assert_equal(result[1], 0)
+
+ # wait as much as ttl and check if the persistent topics have expired
+ time.sleep(persistency_time)
+ result = admin(['topic', 'stats', '--topic', topic_name])
+ parsed_result = json.loads(result[0])
+ assert_equal(parsed_result['Topic Stats']['Entries'], 0)
+ assert_equal(result[1], 0)
+
+ # delete objects from the bucket
+ client_threads = []
+ start_time = time.time()
+ count = 0
+ for key in bucket.list():
+ count += 1
+ thr = threading.Thread(target = key.delete, args=())
+ thr.start()
+ client_threads.append(thr)
+ if count%100 == 0:
+ [thr.join() for thr in client_threads]
+ time_diff = time.time() - start_time
+ print('average time for deletion + async http notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds')
+ client_threads = []
+ start_time = time.time()
+
+ # topic stats
+ result = admin(['topic', 'stats', '--topic', topic_name])
+ parsed_result = json.loads(result[0])
+ assert_equal(parsed_result['Topic Stats']['Entries'], number_of_objects)
+ assert_equal(result[1], 0)
+
+ # wait as much as ttl and check if the persistent topics have expired
+ time.sleep(persistency_time)
+ result = admin(['topic', 'stats', '--topic', topic_name])
+ parsed_result = json.loads(result[0])
+ assert_equal(parsed_result['Topic Stats']['Entries'], 0)
+ assert_equal(result[1], 0)
+
+ # cleanup
+ s3_notification_conf.del_config()
+ topic_conf.del_config()
+ # delete the bucket
+ conn.delete_bucket(bucket_name)
+ time.sleep(delay)
+
+def create_persistency_config_string(config_dict):
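+ """ build the query-string fragment for the retry-control attributes, e.g. 'time_to_live=30&retry_sleep_duration=1', skipping entries whose value is "None" """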
+ str_ret = ""
+ for key, value in config_dict.items():
+ if value != "None":
+ str_ret += key + "=" + str(value) + '&'
+
+ return str_ret[:-1]
+
+@attr('basic_test')
+def test_ps_s3_persistent_topic_configs_ttl():
+ """ test persistent topic configurations with time_to_live """
+ config_dict = {"time_to_live": 30, "max_retries": "None", "retry_sleep_duration": "None"}
+ buffer = 10
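+ # notifications should expire once time_to_live seconds have passed; the buffer allows for scheduling slack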
+ persistency_time = config_dict["time_to_live"] + buffer
+
+ ps_s3_persistent_topic_configs(persistency_time, config_dict)
+
+@attr('basic_test')
+def test_ps_s3_persistent_topic_configs_max_retries():
+ """ test persistent topic configurations with max_retries and retry_sleep_duration """
+ config_dict = {"time_to_live": "None", "max_retries": 30, "retry_sleep_duration": 1}
+ buffer = 10
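+ # all retries should be exhausted after max_retries * retry_sleep_duration seconds; the buffer allows for scheduling slack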
+ persistency_time = config_dict["max_retries"]*config_dict["retry_sleep_duration"] + buffer
+
+ ps_s3_persistent_topic_configs(persistency_time, config_dict)
+
@attr('manual_test')
def test_ps_s3_persistent_notification_pushback():
""" test pushing persistent notification pushback """