rgw/notification: Use topic attributes from topic rados object to create Pushendpoint...
author kchheda3 <kchheda3@bloomberg.net>
Tue, 28 May 2024 20:50:33 +0000 (16:50 -0400)
committer Yuval Lifshitz <ylifshit@ibm.com>
Thu, 25 Jul 2024 13:43:32 +0000 (13:43 +0000)
Signed-off-by: kchheda3 <kchheda3@bloomberg.net>
(cherry picked from commit c8f7d311ecc24befe985dd39391b38f5f1b60510)

 Conflicts:
src/rgw/driver/rados/rgw_notify.cc

src/rgw/driver/rados/rgw_notify.cc
src/test/rgw/bucket_notification/test_bn.py
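
In short, this change moves push-endpoint creation from per-entry to per-queue scope: the topic's attributes (endpoint, ARN, time-to-live, retry limits, opaque data) are read from the topic rados object once per batch of queue entries, a single endpoint is created from them, and both are passed into process_entry(). When the topic cannot be fetched, the attributes cached in the queued event are used as a fallback. The following condensed sketch reuses identifiers from the diff below but omits the surrounding Manager class and error details, so it illustrates the control flow rather than compiling on its own:

    // Sketch of the new per-queue flow (identifiers as in the diff below;
    // class context omitted, so this is illustrative, not standalone code).
    rgw_pubsub_topic topic_info;
    if (get_topic_info(queue_name, entries.front(), topic_info, yield) < 0) {
      continue;  // topic attributes unavailable; retry on the next queue cycle
    }
    RGWPubSubEndpoint::Ptr push_endpoint;
    try {
      // one endpoint per batch instead of one per entry
      push_endpoint = RGWPubSubEndpoint::create(
          topic_info.dest.push_endpoint, topic_info.dest.arn_topic,
          RGWHTTPArgs(topic_info.dest.push_endpoint_args, this), cct);
    } catch (const RGWPubSubEndpoint::configuration_error&) {
      continue;  // misconfigured endpoint; entries stay queued and are retried
    }
    for (const auto& entry : entries) {
      // process_entry() overwrites the event's TTL/retry/opaque-data fields
      // from the freshly fetched topic before sending
      process_entry(conf, persistency_tracker, entry, push_endpoint.get(),
                    topic_info, yield);
    }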

index fd58079b90bf0a86d9f07888f825349d87241a34..87c0247cf0af883fe6c0386fa618d2b7caec7144 100644 (file)
@@ -61,6 +61,18 @@ void publish_commit_completion(rados_completion_t completion, void *arg) {
     }
 };
 
+static inline std::ostream& operator<<(std::ostream& out,
+                                       const event_entry_t& e) {
+  return out << "notification id: '" << e.event.configurationId
+             << "', topic: '" << e.arn_topic
+             << "', endpoint: '" << e.push_endpoint
+             << "', bucket_owner: '" << e.event.bucket_ownerIdentity
+             << "', bucket: '" << e.event.bucket_name
+             << "', object: '" << e.event.object_key
+             << "', event type: '" << e.event.eventName << "'";
+}
+
+
 class Manager : public DoutPrefixProvider {
   bool shutdown = false;
   const uint32_t queues_update_period_ms;
@@ -178,8 +190,13 @@ private:
 
   // processing of a specific entry
   // return whether processing was successful (true) or not (false)
-  EntryProcessingResult process_entry(const ConfigProxy& conf, persistency_tracker& entry_persistency_tracker,
-                                      const cls_queue_entry& entry, spawn::yield_context yield) {
+  EntryProcessingResult process_entry(
+      const ConfigProxy& conf,
+      persistency_tracker& entry_persistency_tracker,
+      const cls_queue_entry& entry,
+      RGWPubSubEndpoint* const push_endpoint,
+      const rgw_pubsub_topic& topic,
+      spawn::yield_context yield) {
     event_entry_t event_entry;
     auto iter = entry.data.cbegin();
     try {
@@ -192,7 +209,12 @@ private:
     if (event_entry.creation_time == ceph::coarse_real_clock::zero()) {
       return EntryProcessingResult::Migrating;
     }
-
+    // overwrite the event entry values with those from the fetched topic object.
+    event_entry.event.opaque_data = topic.opaque_data;
+    event_entry.arn_topic = topic.dest.arn_topic;
+    event_entry.time_to_live = topic.dest.time_to_live;
+    event_entry.max_retries = topic.dest.max_retries;
+    event_entry.retry_sleep_duration = topic.dest.retry_sleep_duration;
     const auto topic_persistency_ttl = event_entry.time_to_live != DEFAULT_GLOBAL_VALUE ?
         event_entry.time_to_live : conf->rgw_topic_persistency_time_to_live;
     const auto topic_persistency_max_retries = event_entry.max_retries != DEFAULT_GLOBAL_VALUE ?
@@ -220,30 +242,23 @@ private:
 
     ++entry_persistency_tracker.retires_num;
     entry_persistency_tracker.last_retry_time = time_now;
-    ldpp_dout(this, 20) << "Processing entry retry_number=" << entry_persistency_tracker.retires_num << " time=" << dendl;
-    try {
-      // TODO move endpoint creation to queue level
-      const auto push_endpoint = RGWPubSubEndpoint::create(event_entry.push_endpoint, event_entry.arn_topic,
-          RGWHTTPArgs(event_entry.push_endpoint_args, this), 
-          cct);
-      ldpp_dout(this, 20) << "INFO: push endpoint created: " << event_entry.push_endpoint <<
-        " for entry: " << entry.marker << dendl;
-      const auto ret = push_endpoint->send(event_entry.event, optional_yield(io_context, yield));
-      if (ret < 0) {
-        ldpp_dout(this, 5) << "WARNING: push entry: " << entry.marker << " to endpoint: " << event_entry.push_endpoint 
-          << " failed. error: " << ret << " (will retry)" << dendl;
-        return EntryProcessingResult::Failure;
-      } else {
-        ldpp_dout(this, 20) << "INFO: push entry: " << entry.marker << " to endpoint: " << event_entry.push_endpoint 
-          << " ok" <<  dendl;
-        if (perfcounter) perfcounter->inc(l_rgw_pubsub_push_ok);
-        return EntryProcessingResult::Successful;
-      }
-    } catch (const RGWPubSubEndpoint::configuration_error& e) {
-      ldpp_dout(this, 5) << "WARNING: failed to create push endpoint: " 
-          << event_entry.push_endpoint << " for entry: " << entry.marker << ". error: " << e.what() << " (will retry) " << dendl;
+    ldpp_dout(this, 20) << "Processing event entry with " << event_entry
+                        << " retry_number: "
+                        << entry_persistency_tracker.retires_num
+                        << " current time: " << time_now << dendl;
+    const auto ret = push_endpoint->send(event_entry.event, optional_yield{io_context, yield});
+    if (ret < 0) {
+      ldpp_dout(this, 5) << "WARNING: push entry marker: " << entry.marker
+                         << " failed. error: " << ret
+                         << " (will retry) for event with " << event_entry
+                         << dendl;
       return EntryProcessingResult::Failure;
     }
+    ldpp_dout(this, 5) << "INFO: push entry marker: " << entry.marker
+                       << " ok for event with " << event_entry << dendl;
+    if (perfcounter)
+      perfcounter->inc(l_rgw_pubsub_push_ok);
+    return EntryProcessingResult::Successful;
   }
 
   // clean stale reservation from queue
@@ -303,6 +318,42 @@ private:
     return ret;
   }
 
+  int get_topic_info(const std::string& queue_name,
+                     const cls_queue_entry& queue_entry,
+                     rgw_pubsub_topic& topic,
+                     spawn::yield_context yield) {
+    std::string queue_topic_tenant;
+    std::string queue_topic_name;
+    parse_topic_metadata_key(queue_name, queue_topic_tenant, queue_topic_name);
+    rgw_pubsub_topic topic_info;
+    RGWPubSub ps(&rados_store, queue_topic_tenant, site);
+    int ret = ps.get_topic(this, queue_topic_name, topic_info, optional_yield{io_context, yield}, nullptr);
+    if (ret < 0) {
+      ldpp_dout(this, 1) << "WARNING: failed to fetch topic: "
+                         << queue_topic_name << " error: " << ret
+                         << ". using cached topic attributes!" << dendl;
+      event_entry_t event_entry;
+      auto iter = queue_entry.data.cbegin();
+      try {
+        decode(event_entry, iter);
+      } catch (buffer::error& err) {
+        ldpp_dout(this, 1) << "ERROR: failed to decode entry. error: "
+                           << err.what() << dendl;
+        return -EIO;
+      }
+      topic_info.dest.push_endpoint = event_entry.push_endpoint;
+      topic_info.dest.push_endpoint_args = event_entry.push_endpoint_args;
+      topic_info.dest.arn_topic = event_entry.arn_topic;
+      topic_info.dest.time_to_live = event_entry.time_to_live;
+      topic_info.dest.max_retries = event_entry.max_retries;
+      topic_info.dest.retry_sleep_duration = event_entry.retry_sleep_duration;
+      topic_info.opaque_data = event_entry.event.opaque_data;
+    }
+    topic = std::move(topic_info);
+    return 0;
+  }
+
   // processing of a specific queue
   void process_queue(const std::string& queue_name, spawn::yield_context yield) {
     constexpr auto max_elements = 1024;
@@ -376,7 +427,25 @@ private:
       // log when queue is not idle
       ldpp_dout(this, 20) << "INFO: found: " << total_entries << " entries in: " << queue_name <<
         ". end marker is: " << end_marker << dendl;
-      
+      rgw_pubsub_topic topic_info;
+      if (get_topic_info(queue_name, entries.front(), topic_info, yield) < 0) {
+        continue;
+      }
+      RGWPubSubEndpoint::Ptr push_endpoint;
+      try {
+        push_endpoint = RGWPubSubEndpoint::create(
+            topic_info.dest.push_endpoint, topic_info.dest.arn_topic,
+            RGWHTTPArgs(topic_info.dest.push_endpoint_args, this), cct);
+        ldpp_dout(this, 20)
+            << "INFO: push endpoint created: " << topic_info.dest.push_endpoint
+            << dendl;
+      } catch (const RGWPubSubEndpoint::configuration_error& e) {
+        ldpp_dout(this, 5) << "WARNING: failed to create push endpoint: "
+                           << topic_info.dest.push_endpoint
+                           << ". error: " << e.what()
+                           << " (will retry sending events) " << dendl;
+        continue;
+      }
       is_idle = false;
       auto has_error = false;
       auto remove_entries = false;
@@ -390,11 +459,16 @@ private:
         }
 
         entries_persistency_tracker& notifs_persistency_tracker = topics_persistency_tracker[queue_name];
-        spawn::spawn(yield, [this, &notifs_persistency_tracker, &queue_name, entry_idx, total_entries, &end_marker,
-                             &remove_entries, &has_error, &waiter, &entry, &needs_migration_vector](spawn::yield_context yield) {
+        spawn::spawn(yield, [this, &notifs_persistency_tracker, &queue_name, entry_idx,
+           total_entries, &end_marker, &remove_entries, &has_error, &waiter,
+           &entry, &needs_migration_vector,
+           push_endpoint = push_endpoint.get(),
+           &topic_info](spawn::yield_context yield) {
             const auto token = waiter.make_token();
             auto& persistency_tracker = notifs_persistency_tracker[entry.marker];
-            auto result = process_entry(this->get_cct()->_conf, persistency_tracker, entry, yield);
+            auto result =
+                process_entry(this->get_cct()->_conf, persistency_tracker,
+                              entry, push_endpoint, topic_info, yield);
             if (result == EntryProcessingResult::Successful || result == EntryProcessingResult::Expired
                 || result == EntryProcessingResult::Migrating) {
               ldpp_dout(this, 20) << "INFO: processing of entry: " << entry.marker
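
A practical consequence of the queue-level endpoint is that a configuration error no longer costs one failed attempt per entry: the whole batch is skipped with a single warning and retried on the next queue cycle. It also means a topic update (for example, a corrected push-endpoint) takes effect for events that were queued before the update, which is what the new test below exercises.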
index 418d65dc4b43ff0ee2ef48a1657cae89458e5c3f..415cb9947938955480c9106ebc1190d7efb73861 100644 (file)
@@ -5420,3 +5420,89 @@ def test_ps_s3_data_path_v2_mixed_migration():
     for conn, bucket in zip(connections_list, buckets_list):
         conn.delete_bucket(bucket.name)
 
+
+@attr('kafka_test')
+def test_notification_caching():
+    """ test notification caching """
+    conn = connection()
+    # create bucket
+    bucket_name = gen_bucket_name()
+    bucket = conn.create_bucket(bucket_name)
+    topic_name = bucket_name + TOPIC_SUFFIX
+
+    # start kafka receiver
+    task, receiver = create_kafka_receiver_thread(topic_name)
+    task.start()
+    incorrect_port = 8080
+    endpoint_address = 'kafka://' + kafka_server + ':' + str(incorrect_port)
+    endpoint_args = 'push-endpoint=' + endpoint_address + '&kafka-ack-level=broker' + '&persistent=true'
+
+    # create s3 topic
+    zonegroup = get_config_zonegroup()
+    topic_conf = PSTopicS3(conn, topic_name, zonegroup,
+                           endpoint_args=endpoint_args)
+    topic_arn = topic_conf.set_config()
+    # create s3 notification
+    notification_name = bucket_name + NOTIFICATION_SUFFIX
+    topic_conf_list = [{'Id': notification_name, 'TopicArn': topic_arn,
+                        'Events': []
+                        }]
+
+    s3_notification_conf = PSNotificationS3(conn, bucket_name, topic_conf_list)
+    response, status = s3_notification_conf.set_config()
+    assert_equal(status / 100, 2)
+
+    # create objects in the bucket (async)
+    number_of_objects = 10
+    client_threads = []
+    start_time = time.time()
+    for i in range(number_of_objects):
+        key = bucket.new_key(str(i))
+        content = str(os.urandom(1024 * 1024))
+        thr = threading.Thread(target=set_contents_from_string,
+                               args=(key, content,))
+        thr.start()
+        client_threads.append(thr)
+    [thr.join() for thr in client_threads]
+
+    time_diff = time.time() - start_time
+    print('average time for creation + async notification is: ' + str(
+      time_diff * 1000 / number_of_objects) + ' milliseconds')
+
+    # delete objects from the bucket
+    client_threads = []
+    start_time = time.time()
+    for key in bucket.list():
+        thr = threading.Thread(target=key.delete, args=())
+        thr.start()
+        client_threads.append(thr)
+    [thr.join() for thr in client_threads]
+
+    time_diff = time.time() - start_time
+    print('average time for deletion + async notification is: ' + str(
+      time_diff * 1000 / number_of_objects) + ' milliseconds')
+
+    time.sleep(30)
+    # topic stats
+    result = admin(['topic', 'stats', '--topic', topic_name],
+                   get_config_cluster())
+    assert_equal(result[1], 0)
+    parsed_result = json.loads(result[0])
+    assert_equal(parsed_result['Topic Stats']['Entries'], 2 * number_of_objects)
+
+    # remove the incorrect port and update the topic so that it points to the correct endpoint.
+    endpoint_address = 'kafka://' + kafka_server
+    # update s3 topic
+    topic_conf.set_attributes(attribute_name="push-endpoint",
+                              attribute_val=endpoint_address)
+    keys = list(bucket.list())
+    wait_for_queue_to_drain(topic_name)
+    receiver.verify_s3_events(keys, deletions=True)
+
+    # cleanup
+    s3_notification_conf.del_config()
+    topic_conf.del_config()
+    # delete the bucket
+    conn.delete_bucket(bucket_name)
+    if receiver is not None:
+        stop_kafka_receiver(receiver, task)
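
The test depends on persistent notifications: while the topic points at the wrong port, every push fails and the creation and deletion events stay queued, which is why the topic stats are expected to show 2 * number_of_objects entries. After the push-endpoint attribute is corrected, the queue drains to the Kafka receiver without re-creating the notifications, confirming that delivery uses the topic's current attributes rather than those captured when the events were enqueued.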