}
};
+static inline std::ostream& operator<<(std::ostream& out,
+ const event_entry_t& e) {
+ return out << "notification id: '" << e.event.configurationId
+ << "', topic: '" << e.arn_topic
+ << "', endpoint: '" << e.push_endpoint
+ << "', bucket_owner: '" << e.event.bucket_ownerIdentity
+ << "', bucket: '" << e.event.bucket_name
+ << "', object: '" << e.event.object_key
+ << "', event type: '" << e.event.eventName << "'";
+}
+
+
class Manager : public DoutPrefixProvider {
bool shutdown = false;
const uint32_t queues_update_period_ms;
// processing of a specific entry
// return whether processing was successful (true) or not (false)
- EntryProcessingResult process_entry(const ConfigProxy& conf, persistency_tracker& entry_persistency_tracker,
- const cls_queue_entry& entry, spawn::yield_context yield) {
+ EntryProcessingResult process_entry(
+ const ConfigProxy& conf,
+ persistency_tracker& entry_persistency_tracker,
+ const cls_queue_entry& entry,
+ RGWPubSubEndpoint* const push_endpoint,
+ const rgw_pubsub_topic& topic,
+ spawn::yield_context yield) {
event_entry_t event_entry;
auto iter = entry.data.cbegin();
try {
if (event_entry.creation_time == ceph::coarse_real_clock::zero()) {
return EntryProcessingResult::Migrating;
}
-
+ // overwrite the event entry values with those from the fetched topic.
+ event_entry.event.opaque_data = topic.opaque_data;
+ event_entry.arn_topic = topic.dest.arn_topic;
+ event_entry.time_to_live = topic.dest.time_to_live;
+ event_entry.max_retries = topic.dest.max_retries;
+ event_entry.retry_sleep_duration = topic.dest.retry_sleep_duration;
const auto topic_persistency_ttl = event_entry.time_to_live != DEFAULT_GLOBAL_VALUE ?
event_entry.time_to_live : conf->rgw_topic_persistency_time_to_live;
const auto topic_persistency_max_retries = event_entry.max_retries != DEFAULT_GLOBAL_VALUE ?
++entry_persistency_tracker.retires_num;
entry_persistency_tracker.last_retry_time = time_now;
- ldpp_dout(this, 20) << "Processing entry retry_number=" << entry_persistency_tracker.retires_num << " time=" << dendl;
- try {
- // TODO move endpoint creation to queue level
- const auto push_endpoint = RGWPubSubEndpoint::create(event_entry.push_endpoint, event_entry.arn_topic,
- RGWHTTPArgs(event_entry.push_endpoint_args, this),
- cct);
- ldpp_dout(this, 20) << "INFO: push endpoint created: " << event_entry.push_endpoint <<
- " for entry: " << entry.marker << dendl;
- const auto ret = push_endpoint->send(event_entry.event, optional_yield(io_context, yield));
- if (ret < 0) {
- ldpp_dout(this, 5) << "WARNING: push entry: " << entry.marker << " to endpoint: " << event_entry.push_endpoint
- << " failed. error: " << ret << " (will retry)" << dendl;
- return EntryProcessingResult::Failure;
- } else {
- ldpp_dout(this, 20) << "INFO: push entry: " << entry.marker << " to endpoint: " << event_entry.push_endpoint
- << " ok" << dendl;
- if (perfcounter) perfcounter->inc(l_rgw_pubsub_push_ok);
- return EntryProcessingResult::Successful;
- }
- } catch (const RGWPubSubEndpoint::configuration_error& e) {
- ldpp_dout(this, 5) << "WARNING: failed to create push endpoint: "
- << event_entry.push_endpoint << " for entry: " << entry.marker << ". error: " << e.what() << " (will retry) " << dendl;
+ ldpp_dout(this, 20) << "Processing event entry with " << event_entry
+ << " retry_number: "
+ << entry_persistency_tracker.retires_num
+ << " current time: " << time_now << dendl;
+ const auto ret = push_endpoint->send(event_entry.event, optional_yield{io_context, yield});
+ if (ret < 0) {
+ ldpp_dout(this, 5) << "WARNING: push entry marker: " << entry.marker
+ << " failed. error: " << ret
+ << " (will retry) for event with " << event_entry
+ << dendl;
return EntryProcessingResult::Failure;
}
+ ldpp_dout(this, 5) << "INFO: push entry marker: " << entry.marker
+ << " ok for event with " << event_entry << dendl;
+ if (perfcounter)
+ perfcounter->inc(l_rgw_pubsub_push_ok);
+ return EntryProcessingResult::Successful;
}
// clean stale reservation from queue
return ret;
}
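+ // fetch the topic info for the given queue, falling back to the attributes
+ // cached in the queue entry if the topic cannot be fetched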
+ int get_topic_info(const std::string& queue_name,
+ const cls_queue_entry& queue_entry,
+ rgw_pubsub_topic& topic,
+ spawn::yield_context yield) {
+ std::string queue_topic_tenant;
+ std::string queue_topic_name;
+ parse_topic_metadata_key(queue_name, queue_topic_tenant, queue_topic_name);
+ rgw_pubsub_topic topic_info;
+ RGWPubSub ps(&rados_store, queue_topic_tenant, site);
+ int ret = ps.get_topic(this, queue_topic_name, topic_info, optional_yield{io_context, yield}, nullptr);
+ if (ret < 0) {
+ ldpp_dout(this, 1) << "WARNING: failed to fetch topic: "
+ << queue_topic_name << " error: " << ret
+ << ". using cached topic attributes!" << dendl;
+ event_entry_t event_entry;
+ auto iter = queue_entry.data.cbegin();
+ try {
+ decode(event_entry, iter);
+ } catch (buffer::error& err) {
+ ldpp_dout(this, 1) << "ERROR: failed to decode entry. error: "
+ << err.what() << dendl;
+ return -EIO;
+ }
+ topic_info.dest.push_endpoint = event_entry.push_endpoint;
+ topic_info.dest.push_endpoint_args = event_entry.push_endpoint_args;
+ topic_info.dest.arn_topic = event_entry.arn_topic;
+ topic_info.dest.time_to_live = event_entry.time_to_live;
+ topic_info.dest.max_retries = event_entry.max_retries;
+ topic_info.dest.retry_sleep_duration = event_entry.retry_sleep_duration;
+ topic_info.opaque_data = event_entry.event.opaque_data;
+ }
+ topic = std::move(topic_info);
+ return 0;
+ }
+
// processing of a specific queue
void process_queue(const std::string& queue_name, spawn::yield_context yield) {
constexpr auto max_elements = 1024;
// log when queue is not idle
ldpp_dout(this, 20) << "INFO: found: " << total_entries << " entries in: " << queue_name <<
". end marker is: " << end_marker << dendl;
-
+ rgw_pubsub_topic topic_info;
+ if (get_topic_info(queue_name, entries.front(), topic_info, yield) < 0) {
+ continue;
+ }
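+ // create the push endpoint once per batch of queue entries, instead of per entry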
+ RGWPubSubEndpoint::Ptr push_endpoint;
+ try {
+ push_endpoint = RGWPubSubEndpoint::create(
+ topic_info.dest.push_endpoint, topic_info.dest.arn_topic,
+ RGWHTTPArgs(topic_info.dest.push_endpoint_args, this), cct);
+ ldpp_dout(this, 20)
+ << "INFO: push endpoint created: " << topic_info.dest.push_endpoint
+ << dendl;
+ } catch (const RGWPubSubEndpoint::configuration_error& e) {
+ ldpp_dout(this, 5) << "WARNING: failed to create push endpoint: "
+ << topic_info.dest.push_endpoint
+ << ". error: " << e.what()
+ << " (will retry sending events) " << dendl;
+ continue;
+ }
is_idle = false;
auto has_error = false;
auto remove_entries = false;
}
entries_persistency_tracker& notifs_persistency_tracker = topics_persistency_tracker[queue_name];
- spawn::spawn(yield, [this, &notifs_persistency_tracker, &queue_name, entry_idx, total_entries, &end_marker,
- &remove_entries, &has_error, &waiter, &entry, &needs_migration_vector](spawn::yield_context yield) {
+ spawn::spawn(yield, [this, &notifs_persistency_tracker, &queue_name, entry_idx,
+ total_entries, &end_marker, &remove_entries, &has_error, &waiter,
+ &entry, &needs_migration_vector,
+ push_endpoint = push_endpoint.get(),
+ &topic_info](spawn::yield_context yield) {
const auto token = waiter.make_token();
auto& persistency_tracker = notifs_persistency_tracker[entry.marker];
- auto result = process_entry(this->get_cct()->_conf, persistency_tracker, entry, yield);
+ auto result =
+ process_entry(this->get_cct()->_conf, persistency_tracker,
+ entry, push_endpoint, topic_info, yield);
if (result == EntryProcessingResult::Successful || result == EntryProcessingResult::Expired
|| result == EntryProcessingResult::Migrating) {
ldpp_dout(this, 20) << "INFO: processing of entry: " << entry.marker
for conn, bucket in zip(connections_list, buckets_list):
conn.delete_bucket(bucket.name)
+
+@attr('kafka_test')
+def test_notification_caching():
+ """ test notification caching """
+ conn = connection()
+ # create bucket
+ bucket_name = gen_bucket_name()
+ bucket = conn.create_bucket(bucket_name)
+ topic_name = bucket_name + TOPIC_SUFFIX
+
+ # start kafka receiver
+ task, receiver = create_kafka_receiver_thread(topic_name)
+ task.start()
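+ # point the topic at an incorrect port so that pushes fail and the
+ # persistent queue accumulates the events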
+ incorrect_port = 8080
+ endpoint_address = 'kafka://' + kafka_server + ':' + str(incorrect_port)
+ endpoint_args = 'push-endpoint=' + endpoint_address + '&kafka-ack-level=broker' + '&persistent=true'
+
+ # create s3 topic
+ zonegroup = get_config_zonegroup()
+ topic_conf = PSTopicS3(conn, topic_name, zonegroup,
+ endpoint_args=endpoint_args)
+ topic_arn = topic_conf.set_config()
+ # create s3 notification
+ notification_name = bucket_name + NOTIFICATION_SUFFIX
+ topic_conf_list = [{'Id': notification_name, 'TopicArn': topic_arn,
+ 'Events': []
+ }]
+
+ s3_notification_conf = PSNotificationS3(conn, bucket_name, topic_conf_list)
+ response, status = s3_notification_conf.set_config()
+ assert_equal(status / 100, 2)
+
+ # create objects in the bucket (async)
+ number_of_objects = 10
+ client_threads = []
+ start_time = time.time()
+ for i in range(number_of_objects):
+ key = bucket.new_key(str(i))
+ content = str(os.urandom(1024 * 1024))
+ thr = threading.Thread(target=set_contents_from_string,
+ args=(key, content,))
+ thr.start()
+ client_threads.append(thr)
+ [thr.join() for thr in client_threads]
+
+ time_diff = time.time() - start_time
+ print('average time for creation + async notification is: ' + str(
+ time_diff * 1000 / number_of_objects) + ' milliseconds')
+
+ # capture the keys before deletion, so the received events can be verified
+ keys = list(bucket.list())
+
+ # delete objects from the bucket
+ client_threads = []
+ start_time = time.time()
+ for key in bucket.list():
+ thr = threading.Thread(target=key.delete, args=())
+ thr.start()
+ client_threads.append(thr)
+ [thr.join() for thr in client_threads]
+
+ time_diff = time.time() - start_time
+ print('average time for deletion + async notification is: ' + str(
+ time_diff * 1000 / number_of_objects) + ' milliseconds')
+
+ time.sleep(30)
+ # topic stats
+ result = admin(['topic', 'stats', '--topic', topic_name],
+ get_config_cluster())
+ assert_equal(result[1], 0)
+ parsed_result = json.loads(result[0])
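+ # each object generates one creation and one deletion event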
+ assert_equal(parsed_result['Topic Stats']['Entries'], 2 * number_of_objects)
+
+ # remove the incorrect port and update the topic so it points to the correct endpoint
+ endpoint_address = 'kafka://' + kafka_server
+ # update s3 topic
+ topic_conf.set_attributes(attribute_name="push-endpoint",
+ attribute_val=endpoint_address)
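+ # the queued events should now be pushed to the updated endpoint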
+ wait_for_queue_to_drain(topic_name)
+ receiver.verify_s3_events(keys, deletions=True)
+
+ # cleanup
+ s3_notification_conf.del_config()
+ topic_conf.del_config()
+ # delete the bucket
+ conn.delete_bucket(bucket_name)
+ if receiver is not None:
+ stop_kafka_receiver(receiver, task)