]> git.apps.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
RGW: added a per topic configuration to control the notification persistency
authorAli Masarwa <ali.saed.masarwa@gmail.com>
Wed, 9 Aug 2023 11:53:04 +0000 (14:53 +0300)
committerAli Masarwa <ali.saed.masarwa@gmail.com>
Mon, 4 Sep 2023 07:20:55 +0000 (10:20 +0300)
Signed-off-by: Ali Masarwa <ali.saed.masarwa@gmail.com>
doc/radosgw/notifications.rst
src/rgw/driver/rados/rgw_notify.cc
src/rgw/rgw_pubsub.cc
src/rgw/rgw_pubsub.h
src/rgw/rgw_rest_pubsub.cc
src/rgw/rgw_xml.cc
src/rgw/rgw_xml.h
src/test/rgw/bucket_notification/test_bn.py

index 1cc538432f3dce16d2ffd241c8d6e53250b5e3ee..52c7e1b1adcb336b9f7908555b20b75c636197c7 100644 (file)
@@ -63,6 +63,8 @@ added when the notification is committed to persistent storage.
 
 .. note:: If the notification fails with an error, cannot be delivered, or
    times out, it is retried until it is successfully acknowledged.
+   You can control its retry with time_to_live/max_retries to have a time/retry limit and
+   control the retry frequency with retry_sleep_duration
 
 .. tip:: To minimize the latency added by asynchronous notification, we 
    recommended placing the "log" pool on fast media.
@@ -154,6 +156,9 @@ updating, use the name of an existing topic and different endpoint values).
    [&Attributes.entry.9.key=persistent&Attributes.entry.9.value=true|false]
    [&Attributes.entry.10.key=cloudevents&Attributes.entry.10.value=true|false]
    [&Attributes.entry.11.key=mechanism&Attributes.entry.11.value=<mechanism>]
+   [&Attributes.entry.12.key=time_to_live&Attributes.entry.12.value=<seconds to live>]
+   [&Attributes.entry.13.key=max_retries&Attributes.entry.13.value=<retries number>]
+   [&Attributes.entry.14.key=retry_sleep_duration&Attributes.entry.14.value=<sleep seconds>]
 
 Request parameters:
 
@@ -162,6 +167,18 @@ Request parameters:
   notifications that are triggered by the topic.
 - persistent: This indicates whether notifications to this endpoint are
   persistent (=asynchronous) or not persistent. (This is "false" by default.)
+- time_to_live: This will limit the time (in seconds) to retain the notifications.
+  default value is taken from `rgw_topic_persistency_time_to_live`.
+  providing a value overrides the global value.
+  zero value means infinite time to live.
+- max_retries: This will limit the max retries before expiring notifications.
+  default value is taken from `rgw_topic_persistency_max_retries`.
+  providing a value overrides the global value.
+  zero value means infinite retries.
+- retry_sleep_duration: This will control the frequency of retrying the notifications.
+  default value is taken from `rgw_topic_persistency_sleep_duration`.
+  providing a value overrides the global value.
+  zero value mean there is no delay between retries.
 
 - HTTP endpoint
 
@@ -317,6 +334,9 @@ The response has the following format:
      information. In this case, the request must be made over HTTPS. The "topic
      get" request will otherwise be rejected.
    - Persistent: This is "true" if the topic is persistent.
+   - TimeToLive: This will limit the time (in seconds) to retain the notifications.
+   - MaxRetries: This will limit the max retries before expiring notifications.
+   - RetrySleepDuration: This will control the frequency of retrying the notifications.
 - TopicArn: topic `ARN
   <https://docs.aws.amazon.com/general/latest/gr/aws-arns-and-namespaces.html>`_.
 - OpaqueData: The opaque data set on the topic.
index 1d9f69498a018d07960f0c0b37e0ff1d3a123d45..205db107fa19633d154e1d9aea9ce33dcb14b0b8 100644 (file)
@@ -25,14 +25,20 @@ struct event_entry_t {
   std::string push_endpoint_args;
   std::string arn_topic;
   ceph::coarse_real_time creation_time;
+  uint32_t time_to_live = DEFAULT_GLOBAL_VALUE;
+  uint32_t max_retries = DEFAULT_GLOBAL_VALUE;
+  uint32_t retry_sleep_duration = DEFAULT_GLOBAL_VALUE;
   
   void encode(bufferlist& bl) const {
-    ENCODE_START(2, 1, bl);
+    ENCODE_START(3, 1, bl);
     encode(event, bl);
     encode(push_endpoint, bl);
     encode(push_endpoint_args, bl);
     encode(arn_topic, bl);
     encode(creation_time, bl);
+    encode(time_to_live, bl);
+    encode(max_retries, bl);
+    encode(retry_sleep_duration, bl);
     ENCODE_FINISH(bl);
   }
 
@@ -47,6 +53,11 @@ struct event_entry_t {
     } else {
       creation_time = ceph::coarse_real_clock::zero();
     }
+    if (struct_v > 2) {
+      decode(time_to_live, bl);
+      decode(max_retries, bl);
+      decode(retry_sleep_duration, bl);
+    }
     DECODE_FINISH(bl);
   }
 };
@@ -55,7 +66,7 @@ WRITE_CLASS_ENCODER(event_entry_t)
 
 struct persistency_tracker {
   ceph::coarse_real_time last_retry_time {ceph::coarse_real_clock::zero()};
-  uint64_t retires_num {0};
+  uint32_t retires_num {0};
 };
 
 using queues_t = std::set<std::string>;
@@ -195,8 +206,12 @@ private:
       return EntryProcessingResult::Failure;
     }
 
-    const auto topic_persistency_ttl = conf->rgw_topic_persistency_time_to_live;
-    const auto topic_persistency_max_retries = conf->rgw_topic_persistency_max_retries;
+    const auto topic_persistency_ttl = event_entry.time_to_live != DEFAULT_GLOBAL_VALUE ?
+        event_entry.time_to_live : conf->rgw_topic_persistency_time_to_live;
+    const auto topic_persistency_max_retries = event_entry.max_retries != DEFAULT_GLOBAL_VALUE ?
+        event_entry.max_retries : conf->rgw_topic_persistency_max_retries;
+    const auto topic_persistency_sleep_duration = event_entry.retry_sleep_duration != DEFAULT_GLOBAL_VALUE ?
+        event_entry.retry_sleep_duration : conf->rgw_topic_persistency_sleep_duration;
     const auto time_now = ceph::coarse_real_clock::now();
     if ( (topic_persistency_ttl != 0 && event_entry.creation_time != ceph::coarse_real_clock::zero() &&
          time_now - event_entry.creation_time > std::chrono::seconds(topic_persistency_ttl))
@@ -205,8 +220,7 @@ private:
                           << event_entry.creation_time << " time_now:" << time_now << dendl;
       return EntryProcessingResult::Expired;
     }
-    if (time_now - entry_persistency_tracker.last_retry_time
-        < std::chrono::seconds(conf->rgw_topic_persistency_sleep_duration) ) {
+    if (time_now - entry_persistency_tracker.last_retry_time < std::chrono::seconds(topic_persistency_sleep_duration) ) {
       return EntryProcessingResult::Sleeping;
     }
     // TODO: write back the entry with creation time as if now
@@ -926,6 +940,9 @@ int publish_commit(rgw::sal::Object* obj,
        std::move(topic.cfg.dest.push_endpoint_args);
       event_entry.arn_topic = topic.cfg.dest.arn_topic;
       event_entry.creation_time = ceph::coarse_real_clock::now();
+      event_entry.time_to_live = topic.cfg.dest.time_to_live;
+      event_entry.max_retries = topic.cfg.dest.max_retries;
+      event_entry.retry_sleep_duration = topic.cfg.dest.retry_sleep_duration;
       bufferlist bl;
       encode(event_entry, bl);
       const auto& queue_name = topic.cfg.dest.arn_topic;
index 36bef95e68c4306d0bdef034617a701e20da4dec..51c8b0281c68bccdb8283b7fe4eb487e0b014c9f 100644 (file)
@@ -413,24 +413,36 @@ void rgw_pubsub_topics::dump_xml(Formatter *f) const
 
 void rgw_pubsub_dest::dump(Formatter *f) const
 {
+  using rgw::notify::DEFAULT_GLOBAL_VALUE;
+  using rgw::notify::DEFAULT_CONFIG;
   encode_json("push_endpoint", push_endpoint, f);
   encode_json("push_endpoint_args", push_endpoint_args, f);
   encode_json("push_endpoint_topic", arn_topic, f);
   encode_json("stored_secret", stored_secret, f);
   encode_json("persistent", persistent, f);
+  encode_json("time_to_live", time_to_live!=DEFAULT_GLOBAL_VALUE? std::to_string(time_to_live): DEFAULT_CONFIG, f);
+  encode_json("max_retries", max_retries!=DEFAULT_GLOBAL_VALUE? std::to_string(max_retries): DEFAULT_CONFIG, f);
+  encode_json("retry_sleep_duration", retry_sleep_duration!=DEFAULT_GLOBAL_VALUE? std::to_string(retry_sleep_duration): DEFAULT_CONFIG, f);
 }
 
 void rgw_pubsub_dest::dump_xml(Formatter *f) const
 {
+  using rgw::notify::DEFAULT_GLOBAL_VALUE;
+  using rgw::notify::DEFAULT_CONFIG;
   encode_xml("EndpointAddress", push_endpoint, f);
   encode_xml("EndpointArgs", push_endpoint_args, f);
   encode_xml("EndpointTopic", arn_topic, f);
   encode_xml("HasStoredSecret", stored_secret, f);
   encode_xml("Persistent", persistent, f);
+  encode_xml("TimeToLive", time_to_live!=DEFAULT_GLOBAL_VALUE? std::to_string(time_to_live): DEFAULT_CONFIG, f);
+  encode_xml("MaxRetries", max_retries!=DEFAULT_GLOBAL_VALUE? std::to_string(max_retries): DEFAULT_CONFIG, f);
+  encode_xml("RetrySleepDuration", retry_sleep_duration!=DEFAULT_GLOBAL_VALUE? std::to_string(retry_sleep_duration): DEFAULT_CONFIG, f);
 }
 
 std::string rgw_pubsub_dest::to_json_str() const
 {
+  using rgw::notify::DEFAULT_GLOBAL_VALUE;
+  using rgw::notify::DEFAULT_CONFIG;
   JSONFormatter f;
   f.open_object_section("");
   encode_json("EndpointAddress", push_endpoint, &f);
@@ -438,6 +450,9 @@ std::string rgw_pubsub_dest::to_json_str() const
   encode_json("EndpointTopic", arn_topic, &f);
   encode_json("HasStoredSecret", stored_secret, &f);
   encode_json("Persistent", persistent, &f);
+  encode_json("TimeToLive", time_to_live!=DEFAULT_GLOBAL_VALUE? std::to_string(time_to_live): DEFAULT_CONFIG, &f);
+  encode_json("MaxRetries", max_retries!=DEFAULT_GLOBAL_VALUE? std::to_string(max_retries): DEFAULT_CONFIG, &f);
+  encode_json("RetrySleepDuration", retry_sleep_duration!=DEFAULT_GLOBAL_VALUE? std::to_string(retry_sleep_duration): DEFAULT_CONFIG, &f);
   f.close_section();
   std::stringstream ss;
   f.flush(ss);
index afab95ba378259de039675acb2aeaa3ffd5da88e..3a87d777d6cbbafe81a6a6ca945fde1d5ad14e28 100644 (file)
@@ -341,9 +341,12 @@ struct rgw_pubsub_dest {
   std::string arn_topic;
   bool stored_secret = false;
   bool persistent = false;
+  uint32_t time_to_live;
+  uint32_t max_retries;
+  uint32_t retry_sleep_duration;
 
   void encode(bufferlist& bl) const {
-    ENCODE_START(5, 1, bl);
+    ENCODE_START(6, 1, bl);
     encode("", bl);
     encode("", bl);
     encode(push_endpoint, bl);
@@ -351,6 +354,9 @@ struct rgw_pubsub_dest {
     encode(arn_topic, bl);
     encode(stored_secret, bl);
     encode(persistent, bl);
+    encode(time_to_live, bl);
+    encode(max_retries, bl);
+    encode(retry_sleep_duration, bl);
     ENCODE_FINISH(bl);
   }
 
@@ -372,6 +378,11 @@ struct rgw_pubsub_dest {
     if (struct_v >= 5) {
         decode(persistent, bl);
     }
+    if (struct_v >= 6) {
+      decode(time_to_live, bl);
+      decode(max_retries, bl);
+      decode(retry_sleep_duration, bl);
+    }
     DECODE_FINISH(bl);
   }
 
@@ -625,3 +636,11 @@ public:
   int remove_topic(const DoutPrefixProvider *dpp, const std::string& name, optional_yield y) const;
 };
 
+namespace rgw::notify {
+
+  // Denotes that the topic has not overridden the global configurations for (time_to_live / max_retries / retry_sleep_duration)
+  // defaults: (rgw_topic_persistency_time_to_live / rgw_topic_persistency_max_retries / rgw_topic_persistency_sleep_duration)
+  constexpr uint32_t DEFAULT_GLOBAL_VALUE = UINT32_MAX;
+  // Used in case the topic is using the default global value for dumping in a formatter
+  constexpr static const std::string_view DEFAULT_CONFIG{"None"};
+}
\ No newline at end of file
index de898a09317219eea5518e051f9a5f80408870bd..98ecd52e3ad91bd4448cfb6b9fd189f5bd831f89 100644 (file)
@@ -88,6 +88,9 @@ class RGWPSCreateTopicOp : public RGWOp {
 
     dest.push_endpoint = s->info.args.get("push-endpoint");
     s->info.args.get_bool("persistent", &dest.persistent, false);
+    s->info.args.get_int("time_to_live", reinterpret_cast<int *>(&dest.time_to_live), rgw::notify::DEFAULT_GLOBAL_VALUE);
+    s->info.args.get_int("max_retries", reinterpret_cast<int *>(&dest.max_retries), rgw::notify::DEFAULT_GLOBAL_VALUE);
+    s->info.args.get_int("retry_sleep_duration", reinterpret_cast<int *>(&dest.retry_sleep_duration), rgw::notify::DEFAULT_GLOBAL_VALUE);
 
     if (!validate_and_update_endpoint_secret(dest, s->cct, *(s->info.env))) {
       return -EINVAL;
index 22a62ac480a5a3ea94b8790bd452ddc36ad77c16..1bcbcdad2457b491412a6212801e46830725b2e3 100644 (file)
@@ -436,6 +436,11 @@ void encode_xml(const char *name, const string& val, Formatter *f)
   f->dump_string(name, val);
 }
 
+void encode_xml(const char *name, const string_view & val, Formatter *f)
+{
+  f->dump_string(name, val);
+}
+
 void encode_xml(const char *name, const char *val, Formatter *f)
 {
   f->dump_string(name, val);
index 74a8c27a095de1e1e3a563d1c8a217d43b399a88..5d3e727895232f8f28261bae26d120f76420689f 100644 (file)
@@ -331,6 +331,7 @@ static void encode_xml(const char *name, const char *ns, const T& val, ceph::For
 }
 
 void encode_xml(const char *name, const std::string& val, ceph::Formatter *f);
+void encode_xml(const char *name, const std::string_view& val, ceph::Formatter *f);
 void encode_xml(const char *name, const char *val, ceph::Formatter *f);
 void encode_xml(const char *name, bool val, ceph::Formatter *f);
 void encode_xml(const char *name, int val, ceph::Formatter *f);
index b7fe128cf2dfb4b0a1b230abfb3d278cc4d62d9f..d7e46a914aad07a726d6da5d788251bc5d161238 100644 (file)
@@ -3131,6 +3131,143 @@ def test_ps_s3_persistent_topic_stats():
     time.sleep(delay)
     http_server.close()
 
+def ps_s3_persistent_topic_configs(persistency_time, config_dict):
+    conn = connection()
+    zonegroup = 'default'
+
+    # create random port for the http server
+    host = get_ip()
+    http_port = random.randint(10000, 20000)
+
+    # start an http server in a separate thread
+    http_server = StreamingHTTPServer(host, http_port, num_workers=10, delay=0.5)
+
+    # create bucket
+    bucket_name = gen_bucket_name()
+    bucket = conn.create_bucket(bucket_name)
+    topic_name = bucket_name + TOPIC_SUFFIX
+
+    # create s3 topic
+    endpoint_address = 'http://'+host+':'+str(http_port)
+    endpoint_args = 'push-endpoint='+endpoint_address+'&persistent=true&'+create_persistency_config_string(config_dict)
+    topic_conf = PSTopicS3(conn, topic_name, zonegroup, endpoint_args=endpoint_args)
+    topic_arn = topic_conf.set_config()
+    # create s3 notification
+    notification_name = bucket_name + NOTIFICATION_SUFFIX
+    topic_conf_list = [{'Id': notification_name, 'TopicArn': topic_arn,
+                        'Events': []
+                        }]
+
+    s3_notification_conf = PSNotificationS3(conn, bucket_name, topic_conf_list)
+    response, status = s3_notification_conf.set_config()
+    assert_equal(status/100, 2)
+
+    delay = 20
+    time.sleep(delay)
+    http_server.close()
+    # topic get
+    result = admin(['topic', 'get', '--topic', topic_name])
+    parsed_result = json.loads(result[0])
+    parsed_result_dest = parsed_result["dest"]
+    for key, value in config_dict.items():
+        assert_equal(parsed_result_dest[key], str(value))
+    assert_equal(result[1], 0)
+
+    # topic stats
+    result = admin(['topic', 'stats', '--topic', topic_name])
+    parsed_result = json.loads(result[0])
+    assert_equal(parsed_result['Topic Stats']['Entries'], 0)
+    assert_equal(result[1], 0)
+
+    # create objects in the bucket (async)
+    number_of_objects = 10
+    client_threads = []
+    start_time = time.time()
+    for i in range(number_of_objects):
+        key = bucket.new_key('key-'+str(i))
+        content = str(os.urandom(1024*1024))
+        thr = threading.Thread(target = set_contents_from_string, args=(key, content,))
+        thr.start()
+        client_threads.append(thr)
+    [thr.join() for thr in client_threads]
+    time_diff = time.time() - start_time
+    print('average time for creation + async http notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds')
+
+    # topic stats
+    result = admin(['topic', 'stats', '--topic', topic_name])
+    parsed_result = json.loads(result[0])
+    assert_equal(parsed_result['Topic Stats']['Entries'], number_of_objects)
+    assert_equal(result[1], 0)
+
+    # wait as much as ttl and check if the persistent topics have expired
+    time.sleep(persistency_time)
+    result = admin(['topic', 'stats', '--topic', topic_name])
+    parsed_result = json.loads(result[0])
+    assert_equal(parsed_result['Topic Stats']['Entries'], 0)
+    assert_equal(result[1], 0)
+
+    # delete objects from the bucket
+    client_threads = []
+    start_time = time.time()
+    count = 0
+    for key in bucket.list():
+        count += 1
+        thr = threading.Thread(target = key.delete, args=())
+        thr.start()
+        client_threads.append(thr)
+        if count%100 == 0:
+            [thr.join() for thr in client_threads]
+            time_diff = time.time() - start_time
+            print('average time for deletion + async http notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds')
+            client_threads = []
+            start_time = time.time()
+
+    # topic stats
+    result = admin(['topic', 'stats', '--topic', topic_name])
+    parsed_result = json.loads(result[0])
+    assert_equal(parsed_result['Topic Stats']['Entries'], number_of_objects)
+    assert_equal(result[1], 0)
+
+    # wait as much as ttl and check if the persistent topics have expired
+    time.sleep(persistency_time)
+    result = admin(['topic', 'stats', '--topic', topic_name])
+    parsed_result = json.loads(result[0])
+    assert_equal(parsed_result['Topic Stats']['Entries'], 0)
+    assert_equal(result[1], 0)
+
+    # cleanup
+    s3_notification_conf.del_config()
+    topic_conf.del_config()
+    # delete the bucket
+    conn.delete_bucket(bucket_name)
+    time.sleep(delay)
+
+def create_persistency_config_string(config_dict):
+    str_ret = ""
+    for key, value in config_dict.items():
+        if key != "None":
+            str_ret += key + "=" + str(value) + '&'
+
+    return str_ret[:-1]
+
+@attr('basic_test')
+def test_ps_s3_persistent_topic_configs_ttl():
+    """ test persistent topic configurations with time_to_live """
+    config_dict = {"time_to_live": 30, "max_retries": "None", "retry_sleep_duration": "None"}
+    buffer = 10
+    persistency_time =config_dict["time_to_live"] + buffer
+
+    ps_s3_persistent_topic_configs(persistency_time, config_dict)
+
+@attr('basic_test')
+def test_ps_s3_persistent_topic_configs_max_retries():
+    """ test persistent topic configurations with max_retries and retry_sleep_duration """
+    config_dict = {"time_to_live": "None", "max_retries": 30, "retry_sleep_duration": 1}
+    buffer = 10
+    persistency_time = config_dict["max_retries"]*config_dict["retry_sleep_duration"] + buffer
+
+    ps_s3_persistent_topic_configs(persistency_time, config_dict)
+
 @attr('manual_test')
 def test_ps_s3_persistent_notification_pushback():
     """ test pushing persistent notification pushback """