};
const std::string topic;
const ack_level_t ack_level;
- std::string conn_name;
-
+ kafka::connection_id_t conn_id;
ack_level_t get_ack_level(const RGWHTTPArgs& args) {
bool exists;
const RGWHTTPArgs& args) :
topic(_topic),
ack_level(get_ack_level(args)) {
- if (!kafka::connect(conn_name, _endpoint,
- get_bool(args, "use-ssl", false),
- get_bool(args, "verify-ssl", true),
- args.get_optional("ca-location"),
- args.get_optional("mechanism"),
- args.get_optional("user-name"),
- args.get_optional("password"))) {
- throw configuration_error("Kafka: failed to create connection to: " + _endpoint);
- }
- }
+ if (!kafka::connect(
+ conn_id, _endpoint, get_bool(args, "use-ssl", false),
+ get_bool(args, "verify-ssl", true), args.get_optional("ca-location"),
+ args.get_optional("mechanism"), args.get_optional("user-name"),
+ args.get_optional("password"))) {
+ throw configuration_error("Kafka: failed to create connection to: " +
+ _endpoint);
+ }
+ }
int send(const rgw_pubsub_s3_event& event, optional_yield y) override {
if (ack_level == ack_level_t::None) {
- return kafka::publish(conn_name, topic, json_format_pubsub_event(event));
+ return kafka::publish(conn_id, topic, json_format_pubsub_event(event));
} else {
auto w = std::make_unique<Waiter>();
- const auto rc = kafka::publish_with_confirm(conn_name,
- topic,
- json_format_pubsub_event(event),
- [wp = w.get()](int r) { wp->finish(r); }
- );
+ const auto rc = kafka::publish_with_confirm(
+ conn_id, topic, json_format_pubsub_event(event),
+ [wp = w.get()](int r) { wp->finish(r); });
if (rc < 0) {
      // failed to publish, so we do not wait for a reply
return rc;
std::string to_str() const override {
std::string str("Kafka Endpoint");
- str += "\nBroker: " + conn_name;
+ str += "\nBroker: " + to_string(conn_id);
str += "\nTopic: " + topic;
return str;
}
#include <thread>
#include <atomic>
#include <mutex>
+#include <boost/functional/hash.hpp>
#include <boost/lockfree/queue.hpp>
#include "common/dout.h"
}
}
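+// note: in the constructor below, an unset optional ca-location/mechanism is
+// stored as an empty string, so "unset" and "explicitly empty" compare equal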
+connection_id_t::connection_id_t(
+ const std::string& _broker,
+ const std::string& _user,
+ const std::string& _password,
+ const boost::optional<const std::string&>& _ca_location,
+ const boost::optional<const std::string&>& _mechanism,
+ bool _ssl)
+ : broker(_broker), user(_user), password(_password), ssl(_ssl) {
+ if (_ca_location.has_value()) {
+ ca_location = _ca_location.get();
+ }
+ if (_mechanism.has_value()) {
+ mechanism = _mechanism.get();
+ }
+}
+
+// equality operator and hasher functor are needed
+// so that connection_id_t can be used as a key in an unordered_map
+bool operator==(const connection_id_t& lhs, const connection_id_t& rhs) {
+ return lhs.broker == rhs.broker && lhs.user == rhs.user &&
+ lhs.password == rhs.password && lhs.ca_location == rhs.ca_location &&
+ lhs.mechanism == rhs.mechanism && lhs.ssl == rhs.ssl;
+}
+
+struct connection_id_hasher {
+ std::size_t operator()(const connection_id_t& k) const {
+ std::size_t h = 0;
+ boost::hash_combine(h, k.broker);
+ boost::hash_combine(h, k.user);
+ boost::hash_combine(h, k.password);
+ boost::hash_combine(h, k.ca_location);
+ boost::hash_combine(h, k.mechanism);
+ boost::hash_combine(h, k.ssl);
+ return h;
+ }
+};
+
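+// printable form of the connection id ("<broker>:<user>"); omits the password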
+std::string to_string(const connection_id_t& id) {
+ return id.broker + ":" + id.user;
+}
+
// convert int status to errno - both RGW and librdkafka values
inline int status_to_errno(int s) {
if (s == 0) return 0;
ldout(conn->cct, 20) << "Kafka run: ack received with result=" <<
rd_kafka_err2str(result) << dendl;
} else {
- ldout(conn->cct, 1) << "Kafka run: nack received with result=" <<
- rd_kafka_err2str(result) << dendl;
+ ldout(conn->cct, 1) << "Kafka run: nack received with result="
+ << rd_kafka_err2str(result)
+ << " for broker: " << conn->broker << dendl;
}
if (!rkmessage->_private) {
// struct used for holding messages in the message queue
struct message_wrapper_t {
- std::string conn_name;
+ connection_id_t conn_id;
std::string topic;
std::string message;
const reply_callback_t cb;
-
- message_wrapper_t(const std::string& _conn_name,
- const std::string& _topic,
- const std::string& _message,
- reply_callback_t _cb) : conn_name(_conn_name), topic(_topic), message(_message), cb(_cb) {}
+
+ message_wrapper_t(const connection_id_t& _conn_id,
+ const std::string& _topic,
+ const std::string& _message,
+ reply_callback_t _cb)
+ : conn_id(_conn_id), topic(_topic), message(_message), cb(_cb) {}
};
-typedef std::unordered_map<std::string, connection_t_ptr> ConnectionList;
+typedef std::unordered_map<connection_id_t, connection_t_ptr,
+                           connection_id_hasher>
+    ConnectionList;
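+// connections are keyed by their full set of parameters, so connections that
+// differ in any credential or TLS setting are created and cached separately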
typedef boost::lockfree::queue<message_wrapper_t*, boost::lockfree::fixed_sized<true>> MessageQueue;
class Manager {
// TODO use rd_kafka_produce_batch for better performance
void publish_internal(message_wrapper_t* message) {
const std::unique_ptr<message_wrapper_t> msg_deleter(message);
- const auto conn_it = connections.find(message->conn_name);
+ const auto conn_it = connections.find(message->conn_id);
if (conn_it == connections.end()) {
ldout(cct, 1) << "Kafka publish: connection was deleted while message was in the queue" << dendl;
if (message->cb) {
tag);
if (rc == -1) {
const auto err = rd_kafka_last_error();
- ldout(conn->cct, 1) << "Kafka publish: failed to produce: " << rd_kafka_err2str(err) << dendl;
+    ldout(conn->cct, 1) << "Kafka publish: failed to produce for topic: "
+                        << message->topic
+                        << " with error: " << rd_kafka_err2str(err) << dendl;
    // immediately invoke the callback on error, if needed
if (message->cb) {
message->cb(-rd_kafka_err2errno(err));
}
// connect to a broker, or reuse an existing connection if already connected
- bool connect(std::string& broker,
- const std::string& url,
- bool use_ssl,
- bool verify_ssl,
- boost::optional<const std::string&> ca_location,
- boost::optional<const std::string&> mechanism,
- boost::optional<const std::string&> topic_user_name,
- boost::optional<const std::string&> topic_password) {
+ bool connect(connection_id_t& conn_id,
+ const std::string& url,
+ bool use_ssl,
+ bool verify_ssl,
+ boost::optional<const std::string&> ca_location,
+ boost::optional<const std::string&> mechanism,
+ boost::optional<const std::string&> topic_user_name,
+ boost::optional<const std::string&> topic_password) {
if (stopped) {
ldout(cct, 1) << "Kafka connect: manager is stopped" << dendl;
return false;
std::string user;
std::string password;
+ std::string broker;
if (!parse_url_authority(url, broker, user, password)) {
// TODO: increment counter
ldout(cct, 1) << "Kafka connect: URL parsing failed" << dendl;
ldout(cct, 1) << "Kafka connect: user/password are only allowed over secure connection" << dendl;
return false;
}
-
+ connection_id_t tmp_id(broker, user, password, ca_location, mechanism,
+ use_ssl);
std::lock_guard lock(connections_lock);
- const auto it = connections.find(broker);
+ const auto it = connections.find(tmp_id);
  // note that ssl and non-ssl connections to the same host are two separate connections
if (it != connections.end()) {
// connection found - return even if non-ok
- ldout(cct, 20) << "Kafka connect: connection found" << dendl;
- return it->second.get();
+ ldout(cct, 20) << "Kafka connect: connection found: " << to_string(tmp_id)
+ << dendl;
+ conn_id = std::move(tmp_id);
+ return true;
}
// connection not found, creating a new one
return false;
}
++connection_count;
- connections.emplace(broker, std::move(conn));
+ connections.emplace(tmp_id, std::move(conn));
- ldout(cct, 10) << "Kafka connect: new connection is created. Total connections: " << connection_count << dendl;
+    ldout(cct, 10) << "Kafka connect: new connection is created: "
+                   << to_string(tmp_id)
+                   << ". Total connections: " << connection_count << dendl;
+ conn_id = std::move(tmp_id);
return true;
}
  // TODO: publish with confirm is needed in the "none" case as well; cb should be invoked when publish is ok (no ack)
- int publish(const std::string& conn_name,
- const std::string& topic,
- const std::string& message) {
+ int publish(const connection_id_t& conn_id,
+ const std::string& topic,
+ const std::string& message) {
if (stopped) {
return -ESRCH;
}
- auto message_wrapper = std::make_unique<message_wrapper_t>(conn_name, topic, message, nullptr);
+ auto message_wrapper =
+ std::make_unique<message_wrapper_t>(conn_id, topic, message, nullptr);
if (messages.push(message_wrapper.get())) {
std::ignore = message_wrapper.release();
++queued;
}
return -EBUSY;
}
-
- int publish_with_confirm(const std::string& conn_name,
- const std::string& topic,
- const std::string& message,
- reply_callback_t cb) {
+
+ int publish_with_confirm(const connection_id_t& conn_id,
+ const std::string& topic,
+ const std::string& message,
+ reply_callback_t cb) {
if (stopped) {
return -ESRCH;
}
- auto message_wrapper = std::make_unique<message_wrapper_t>(conn_name, topic, message, cb);
+ auto message_wrapper =
+ std::make_unique<message_wrapper_t>(conn_id, topic, message, cb);
if (messages.push(message_wrapper.get())) {
std::ignore = message_wrapper.release();
++queued;
s_manager = nullptr;
}
-bool connect(std::string& broker, const std::string& url, bool use_ssl, bool verify_ssl,
- boost::optional<const std::string&> ca_location,
- boost::optional<const std::string&> mechanism,
- boost::optional<const std::string&> user_name,
- boost::optional<const std::string&> password) {
+bool connect(connection_id_t& conn_id,
+ const std::string& url,
+ bool use_ssl,
+ bool verify_ssl,
+ boost::optional<const std::string&> ca_location,
+ boost::optional<const std::string&> mechanism,
+ boost::optional<const std::string&> user_name,
+ boost::optional<const std::string&> password) {
std::shared_lock lock(s_manager_mutex);
if (!s_manager) return false;
- return s_manager->connect(broker, url, use_ssl, verify_ssl, ca_location, mechanism, user_name, password);
+ return s_manager->connect(conn_id, url, use_ssl, verify_ssl, ca_location,
+ mechanism, user_name, password);
}
-int publish(const std::string& conn_name,
- const std::string& topic,
- const std::string& message) {
+int publish(const connection_id_t& conn_id,
+ const std::string& topic,
+ const std::string& message) {
std::shared_lock lock(s_manager_mutex);
if (!s_manager) return -ESRCH;
- return s_manager->publish(conn_name, topic, message);
+ return s_manager->publish(conn_id, topic, message);
}
-int publish_with_confirm(const std::string& conn_name,
- const std::string& topic,
- const std::string& message,
- reply_callback_t cb) {
+int publish_with_confirm(const connection_id_t& conn_id,
+ const std::string& topic,
+ const std::string& message,
+ reply_callback_t cb) {
std::shared_lock lock(s_manager_mutex);
if (!s_manager) return -ESRCH;
- return s_manager->publish_with_confirm(conn_name, topic, message, cb);
+ return s_manager->publish_with_confirm(conn_id, topic, message, cb);
}
size_t get_connection_count() {
// shutdown the kafka manager
void shutdown();
+// key class for the connection list
+struct connection_id_t {
+ std::string broker;
+ std::string user;
+ std::string password;
+ std::string ca_location;
+ std::string mechanism;
+ bool ssl = false;
+ connection_id_t() = default;
+ connection_id_t(const std::string& _broker,
+ const std::string& _user,
+ const std::string& _password,
+ const boost::optional<const std::string&>& _ca_location,
+ const boost::optional<const std::string&>& _mechanism,
+ bool _ssl);
+};
+
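+// string representation of a connection id, mainly for logging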
+std::string to_string(const connection_id_t& id);
+
// connect to a kafka endpoint
-bool connect(std::string& broker,
- const std::string& url,
- bool use_ssl,
- bool verify_ssl,
- boost::optional<const std::string&> ca_location,
- boost::optional<const std::string&> mechanism,
- boost::optional<const std::string&> user_name,
- boost::optional<const std::string&> password);
+bool connect(connection_id_t& conn_id,
+ const std::string& url,
+ bool use_ssl,
+ bool verify_ssl,
+ boost::optional<const std::string&> ca_location,
+ boost::optional<const std::string&> mechanism,
+ boost::optional<const std::string&> user_name,
+ boost::optional<const std::string&> password);
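+// a minimal usage sketch (the broker URL and topic are hypothetical):
+//   kafka::connection_id_t id;
+//   if (kafka::connect(id, "kafka://localhost:9092",
+//                      false /*use-ssl*/, true /*verify-ssl*/,
+//                      boost::none, boost::none, boost::none, boost::none)) {
+//     kafka::publish(id, "my-topic", "hello");
+//   }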
// publish a message over a connection that was already created
-int publish(const std::string& conn_name,
- const std::string& topic,
- const std::string& message);
+int publish(const connection_id_t& conn_id,
+ const std::string& topic,
+ const std::string& message);
// publish a message over a connection that was already created
// and pass a callback that will be invoked (async) when broker confirms
// receiving the message
-int publish_with_confirm(const std::string& conn_name,
- const std::string& topic,
- const std::string& message,
- reply_callback_t cb);
+int publish_with_confirm(const connection_id_t& conn_id,
+ const std::string& topic,
+ const std::string& message,
+ reply_callback_t cb);
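+// e.g. (sketch, assuming the 0-on-ack/negative-errno-on-error convention
+// used elsewhere in this API):
+//   kafka::publish_with_confirm(id, "my-topic", "hello",
+//       [](int r) { /* r == 0 means the broker acked the message */ });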
// convert the integer status returned from the "publish" function to a string
std::string status_to_string(int s);
conn.delete_bucket(bucket_name)
if receiver is not None:
stop_kafka_receiver(receiver, task)
+
+
+@attr('kafka_test')
+def test_connection_caching():
+ """ test connection caching """
+ conn = connection()
+ # create bucket
+ bucket_name = gen_bucket_name()
+ bucket = conn.create_bucket(bucket_name)
+ topic_name_1 = bucket_name + TOPIC_SUFFIX + "-without-ssl"
+ topic_name_2 = bucket_name + TOPIC_SUFFIX + "-with-ssl"
+
+    # start kafka receivers
+ task_1, receiver_1 = create_kafka_receiver_thread(topic_name_1)
+ task_1.start()
+ task_2, receiver_2 = create_kafka_receiver_thread(topic_name_2)
+ task_2.start()
+ endpoint_address = 'kafka://' + kafka_server
+ endpoint_args = 'push-endpoint=' + endpoint_address + '&kafka-ack-level=broker&use-ssl=true' + '&persistent=true'
+
+ # initially create both s3 topics with `use-ssl=true`
+ zonegroup = get_config_zonegroup()
+ topic_conf_1 = PSTopicS3(conn, topic_name_1, zonegroup,
+ endpoint_args=endpoint_args)
+ topic_arn_1 = topic_conf_1.set_config()
+ topic_conf_2 = PSTopicS3(conn, topic_name_2, zonegroup,
+ endpoint_args=endpoint_args)
+ topic_arn_2 = topic_conf_2.set_config()
+ # create s3 notification
+ notification_name = bucket_name + NOTIFICATION_SUFFIX
+ topic_conf_list = [{'Id': notification_name + '_1', 'TopicArn': topic_arn_1,
+ 'Events': []
+ },
+ {'Id': notification_name + '_2', 'TopicArn': topic_arn_2,
+ 'Events': []}]
+
+ s3_notification_conf = PSNotificationS3(conn, bucket_name, topic_conf_list)
+ response, status = s3_notification_conf.set_config()
+ assert_equal(status / 100, 2)
+
+ # create objects in the bucket (async)
+ number_of_objects = 10
+ client_threads = []
+ start_time = time.time()
+ for i in range(number_of_objects):
+ key = bucket.new_key(str(i))
+ content = str(os.urandom(1024))
+ thr = threading.Thread(target=set_contents_from_string,
+ args=(key, content,))
+ thr.start()
+ client_threads.append(thr)
+ [thr.join() for thr in client_threads]
+
+ time_diff = time.time() - start_time
+ print('average time for creation + async notification is: ' + str(
+ time_diff * 1000 / number_of_objects) + ' milliseconds')
+
+    # capture the object keys before deletion, so the deletion events can be
+    # verified after the queue drains
+    keys = list(bucket.list())
+
+    # delete objects from the bucket
+    client_threads = []
+ start_time = time.time()
+ for key in bucket.list():
+ thr = threading.Thread(target=key.delete, args=())
+ thr.start()
+ client_threads.append(thr)
+ [thr.join() for thr in client_threads]
+
+ time_diff = time.time() - start_time
+ print('average time for deletion + async notification is: ' + str(
+ time_diff * 1000 / number_of_objects) + ' milliseconds')
+
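+    # wait for the events to accumulate in the persistent queues; delivery is
+    # expected to fail while use-ssl=true is set, leaving the events queued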
+ time.sleep(30)
+ # topic stats
+ result = admin(['topic', 'stats', '--topic', topic_name_1],
+ get_config_cluster())
+ assert_equal(result[1], 0)
+ parsed_result = json.loads(result[0])
+ assert_equal(parsed_result['Topic Stats']['Entries'], 2 * number_of_objects)
+
+    # disable ssl on topic1 and update the topic, so that a new,
+    # non-ssl connection is created for it
+ endpoint_address = 'kafka://' + kafka_server
+ topic_conf_1.set_attributes(attribute_name="use-ssl",
+ attribute_val="false")
+ wait_for_queue_to_drain(topic_name_1)
+ receiver_1.verify_s3_events(keys, deletions=True)
+    # the 2nd topic still uses ssl, so its entries should remain queued
+ result = admin(['topic', 'stats', '--topic', topic_name_2],
+ get_config_cluster())
+ assert_equal(result[1], 0)
+ parsed_result = json.loads(result[0])
+ assert_equal(parsed_result['Topic Stats']['Entries'], 2 * number_of_objects)
+
+ # cleanup
+ s3_notification_conf.del_config()
+ topic_conf_1.del_config()
+ topic_conf_2.del_config()
+ # delete the bucket
+ conn.delete_bucket(bucket_name)
+ if receiver_1 is not None:
+ stop_kafka_receiver(receiver_1, task_1)
+ if receiver_2 is not None:
+ stop_kafka_receiver(receiver_2, task_2)