option(WITH_RADOSGW_BEAST_FRONTEND "Rados Gateway's Beast frontend is enabled" ON)
option(WITH_RADOSGW_BEAST_OPENSSL "Rados Gateway's Beast frontend uses OpenSSL" ON)
option(WITH_RADOSGW_AMQP_ENDPOINT "Rados Gateway's pubsub support for AMQP push endpoint" ON)
+option(WITH_RADOSGW_KAFKA_ENDPOINT "Rados Gateway's pubsub support for Kafka push endpoint" ON)
if(WITH_RADOSGW)
find_package(EXPAT REQUIRED)
%bcond_without cephfs_java
%endif
%bcond_without amqp_endpoint
+%bcond_without kafka_endpoint
%bcond_without lttng
%bcond_without libradosstriper
%bcond_without ocf
%bcond_with selinux
%bcond_with cephfs_java
%bcond_with amqp_endpoint
+%bcond_with kafka_endpoint
#Compat macro for new _fillupdir macro introduced in Nov 2017
%if ! %{defined _fillupdir}
%global _fillupdir /var/adm/fillup-templates
%if 0%{with amqp_endpoint}
BuildRequires: librabbitmq-devel
%endif
+%if 0%{with kafka_endpoint}
+BuildRequires: librdkafka-devel
+%endif
%if 0%{with make_check}
BuildRequires: jq
BuildRequires: libuuid-devel
-DWITH_RADOSGW_AMQP_ENDPOINT=ON \
%else
-DWITH_RADOSGW_AMQP_ENDPOINT=OFF \
+%endif
+%if 0%{with kafka_endpoint}
+ -DWITH_RADOSGW_KAFKA_ENDPOINT=ON \
+%else
+ -DWITH_RADOSGW_KAFKA_ENDPOINT=OFF \
%endif
-DBOOST_J=$CEPH_SMP_NCPUS \
-DWITH_GRAFANA=ON
--- /dev/null
+find_path(rdkafka_INCLUDE_DIR
+ NAMES librdkafka/rdkafka.h)
+
+find_library(rdkafka_LIBRARY
+ NAMES rdkafka)
+
+include(FindPackageHandleStandardArgs)
+
+find_package_handle_standard_args(RDKafka DEFAULT_MSG
+ rdkafka_INCLUDE_DIR
+ rdkafka_LIBRARY)
+
+if(RDKafka_FOUND AND NOT (TARGET RDKafka::RDKafka))
+ add_library(RDKafka::RDKafka UNKNOWN IMPORTED)
+ set_target_properties(RDKafka::RDKafka PROPERTIES
+ INTERFACE_INCLUDE_DIRECTORIES "${rdkafka_INCLUDE_DIR}"
+ IMPORTED_LINK_INTERFACE_LANGUAGES "C"
+ IMPORTED_LOCATION "${rdkafka_LIBRARY}")
+endif()
libxml2-dev,
# Crimson libyaml-cpp-dev,
librabbitmq-dev,
+ librdkafka-dev,
# Make-Check libxmlsec1,
# Make-Check libxmlsec1-nss,
# Make-Check libxmlsec1-openssl,
opensuse*|suse|sles)
PYBUILD="3"
ARGS+=" -DWITH_RADOSGW_AMQP_ENDPOINT=OFF"
+ ARGS+=" -DWITH_RADOSGW_KAFKA_ENDPOINT=OFF"
;;
esac
elif [ "$(uname)" == FreeBSD ] ; then
PYBUILD="3"
ARGS+=" -DWITH_RADOSGW_AMQP_ENDPOINT=OFF"
+ ARGS+=" -DWITH_RADOSGW_KAFKA_ENDPOINT=OFF"
else
echo Unknown release
exit 1
-D CMAKE_C_FLAGS_DEBUG="$CMAKE_C_FLAGS_DEBUG" \
-D ENABLE_GIT_VERSION=OFF \
-D WITH_RADOSGW_AMQP_ENDPOINT=OFF \
+ -D WITH_RADOSGW_KAFKA_ENDPOINT=OFF \
-D WITH_SYSTEM_BOOST=ON \
-D WITH_SYSTEM_NPM=ON \
-D WITH_LTTNG=OFF \
.. contents::
Bucket notifications provide a mechanism for sending information out of the radosgw when certain events are happening on the bucket.
-Currently, notifications could be sent to HTTP and AMQP0.9.1 endpoints.
+Currently, notifications could be sent to: HTTP, AMQP0.9.1 and Kafka endpoints.
Note, that if the events should be stored in Ceph, in addition, or instead of being pushed to an endpoint,
the `PubSub Module`_ should be used instead of the bucket notification mechanism.
&Name=<topic-name>
&push-endpoint=<endpoint>
[&Attributes.entry.1.key=amqp-exchange&Attributes.entry.1.value=<exchange>]
- [&Attributes.entry.2.key=amqp-sck-level&Attributes.entry.2.value=ack-level]
- &Attributes.entry.3.key=verify-sll&Attributes.entry.3.value=true|false]
+ [&Attributes.entry.2.key=amqp-ack-level&Attributes.entry.2.value=none|broker]
+ [&Attributes.entry.3.key=verify-sll&Attributes.entry.3.value=true|false]
+ [&Attributes.entry.4.key=kafka-ack-level&Attributes.entry.4.value=none|broker]
Request parameters:
- push-endpoint: URI of an endpoint to send push notification to
+- HTTP endpoint
- - URI schema is: ``http[s]|amqp://[<user>:<password>@]<fqdn>[:<port>][/<amqp-vhost>]``
- - Same schema is used for HTTP and AMQP endpoints (except amqp-vhost which is specific to AMQP)
- - Default values for HTTP/S: no user/password, port 80/443
- - Default values for AMQP: user/password=guest/guest, port 5672, amqp-vhost is "/"
+ - URI: ``http[s]://<fqdn>[:<port]``
+ - port defaults to: 80/443 for HTTP/S accordingly
+ - verify-ssl: indicate whether the server certificate is validated by the client or not ("true" by default)
-- verify-ssl: can be used with https endpoints (ignored for other endpoints), indicate whether the server certificate is validated or not ("true" by default)
-- amqp-exchange: mandatory parameter for AMQP endpoint. The exchanges must exist and be able to route messages based on topics
-- amqp-ack-level: No end2end acking is required, as messages may persist in the broker before delivered into their final destination. 2 ack methods exist:
+- AMQP0.9.1 endpoint
- - "none" - message is considered "delivered" if sent to broker
- - "broker" message is considered "delivered" if acked by broker
+ - URI: ``amqp://[<user>:<password>@]<fqdn>[:<port>][/<vhost>]``
+ - user/password defaults to : guest/guest
+ - port defaults to: 5672
+ - vhost defaults to: "/"
+ - amqp-exchange: the exchanges must exist and be able to route messages based on topics (mandatory parameter for AMQP0.9.1)
+ - amqp-ack-level: no end2end acking is required, as messages may persist in the broker before delivered into their final destination. Two ack methods exist:
+
+ - "none": message is considered "delivered" if sent to broker
+ - "broker": message is considered "delivered" if acked by broker (default)
+
+- Kafka endpoint
+
+ - URI: ``kafka://<fqdn>[:<port]``
+ - port defaults to: 9092
+ - kafka-ack-level: no end2end acking is required, as messages may persist in the broker before delivered into their final destination. Two ack methods exist:
+
+ - "none": message is considered "delivered" if sent to broker
+ - "broker": message is considered "delivered" if acked by broker (default)
.. note::
can be pulled from them. Events need to be acked. Also, events will expire and disappear
after a period of time.
-A push notification mechanism exists too, currently supporting HTTP and
-AMQP0.9.1 endpoints, on top of storing the events in Ceph. If events should only be pushed to an endpoint
+A push notification mechanism exists too, currently supporting HTTP,
+AMQP0.9.1 and Kafka endpoints. In this case, the events are pushed to an endpoint on top of storing them in Ceph. If events should only be pushed to an endpoint
and do not need to be stored in Ceph, the `Bucket Notification`_ mechanism should be used instead of pubsub sync module.
A user can create different topics. A topic entity is defined by its user and its name. A
::
- PUT /topics/<topic-name>[?push-endpoint=<endpoint>[&amqp-exchange=<exchange>][&amqp-ack-level=<level>][&verify-ssl=true|false]]
+ PUT /topics/<topic-name>[?push-endpoint=<endpoint>[&amqp-exchange=<exchange>][&amqp-ack-level=none|broker][&verify-ssl=true|false][&kafka-ack-level=none|broker]]
Request parameters:
-- push-endpoint: URI of endpoint to send push notification to
+- push-endpoint: URI of an endpoint to send push notification to
+
+The endpoint URI may include parameters depending with the type of endpoint:
+
+- HTTP endpoint
+
+ - URI: ``http[s]://<fqdn>[:<port]``
+ - port defaults to: 80/443 for HTTP/S accordingly
+ - verify-ssl: indicate whether the server certificate is validated by the client or not ("true" by default)
+
+- AMQP0.9.1 endpoint
+
+ - URI: ``amqp://[<user>:<password>@]<fqdn>[:<port>][/<vhost>]``
+ - user/password defaults to : guest/guest
+ - port defaults to: 5672
+ - vhost defaults to: "/"
+ - amqp-exchange: the exchanges must exist and be able to route messages based on topics (mandatory parameter for AMQP0.9.1)
+ - amqp-ack-level: no end2end acking is required, as messages may persist in the broker before delivered into their final destination. Two ack methods exist:
+
+ - "none": message is considered "delivered" if sent to broker
+ - "broker": message is considered "delivered" if acked by broker (default)
- - URI schema is: ``http[s]|amqp://[<user>:<password>@]<fqdn>[:<port>][/<amqp-vhost>]``
- - Same schema is used for HTTP and AMQP endpoints (except amqp-vhost which is specific to AMQP)
- - Default values for HTTP/S: no user/password, port 80/443
- - Default values for AMQP: user/password=guest/guest, port 5672, amqp-vhost is "/"
+- Kafka endpoint
-- verify-ssl: can be used with https endpoints (ignored for other endpoints), indicate whether the server certificate is validated or not ("true" by default)
-- amqp-exchange: mandatory parameter for AMQP endpoint. The exchanges must exist and be able to route messages based on topics
-- amqp-ack-level: No end2end acking is required, as messages may persist in the broker before delivered into their final destination. 2 ack methods exist:
+ - URI: ``kafka://<fqdn>[:<port]``
+ - port defaults to: 9092
+ - kafka-ack-level: no end2end acking is required, as messages may persist in the broker before delivered into their final destination. Two ack methods exist:
- - "none" - message is considered "delivered" if sent to broker
- - "broker" message is considered "delivered" if acked by broker
+ - "none": message is considered "delivered" if sent to broker
+ - "broker": message is considered "delivered" if acked by broker (default)
The topic ARN in the response will have the following format:
::
- PUT /subscriptions/<sub-name>?topic=<topic-name>[&push-endpoint=<endpoint>[&amqp-exchange=<exchange>][&amqp-ack-level=<level>][&verify-ssl=true|false]]
+ PUT /subscriptions/<sub-name>?topic=<topic-name>[?push-endpoint=<endpoint>[&amqp-exchange=<exchange>][&amqp-ack-level=none|broker][&verify-ssl=true|false][&kafka-ack-level=none|broker]]
Request parameters:
- topic-name: name of topic
- push-endpoint: URI of endpoint to send push notification to
- - URI schema is: ``http[s]|amqp://[<user>:<password>@]<fqdn>[:<port>][/<amqp-vhost>]``
- - Same schema is used for HTTP and AMQP endpoints (except amqp-vhost which is specific to AMQP)
- - Default values for HTTP/S: no user/password, port 80/443
- - Default values for AMQP: user/password=guest/guest, port 5672, amqp-vhost is "/"
+The endpoint URI may include parameters depending with the type of endpoint:
+
+- HTTP endpoint
+
+ - URI: ``http[s]://<fqdn>[:<port]``
+ - port defaults to: 80/443 for HTTP/S accordingly
+ - verify-ssl: indicate whether the server certificate is validated by the client or not ("true" by default)
+
+- AMQP0.9.1 endpoint
+
+ - URI: ``amqp://[<user>:<password>@]<fqdn>[:<port>][/<vhost>]``
+ - user/password defaults to : guest/guest
+ - port defaults to: 5672
+ - vhost defaults to: "/"
+ - amqp-exchange: the exchanges must exist and be able to route messages based on topics (mandatory parameter for AMQP0.9.1)
+ - amqp-ack-level: no end2end acking is required, as messages may persist in the broker before delivered into their final destination. Two ack methods exist:
+
+ - "none": message is considered "delivered" if sent to broker
+ - "broker": message is considered "delivered" if acked by broker (default)
+
+- Kafka endpoint
+
+ - URI: ``kafka://<fqdn>[:<port]``
+ - port defaults to: 9092
+ - kafka-ack-level: no end2end acking is required, as messages may persist in the broker before delivered into their final destination. Two ack methods exist:
-- verify-ssl: can be used with https endpoints (ignored for other endpoints), indicate whether the server certificate is validated or not ("true" by default)
-- amqp-exchange: mandatory parameter for AMQP endpoint. The exchanges must exist and be able to route messages based on topics
-- amqp-ack-level: No end2end acking is required, as messages may persist in the broker before delivered into their final destination. 2 ack methods exist:
+ - "none": message is considered "delivered" if sent to broker
+ - "broker": message is considered "delivered" if acked by broker (default)
- - "none": message is considered "delivered" if sent to broker
- - "broker": message is considered "delivered" if acked by broker
Get Subscription Information
````````````````````````````
/* Defined if rabbitmq-c is available for rgw amqp push endpoint */
#cmakedefine WITH_RADOSGW_AMQP_ENDPOINT
+/* Defined if libedkafka is available for rgw kafka push endpoint */
+#cmakedefine WITH_RADOSGW_KAFKA_ENDPOINT
+
/* Defined if std::map::merge() is supported */
#cmakedefine HAVE_STDLIB_MAP_SPLICING
if(WITH_RADOSGW_AMQP_ENDPOINT)
list(APPEND librgw_common_srcs rgw_amqp.cc)
endif()
+if(WITH_RADOSGW_KAFKA_ENDPOINT)
+ list(APPEND librgw_common_srcs rgw_kafka.cc)
+endif()
add_library(rgw_common OBJECT ${librgw_common_srcs})
if(WITH_RADOSGW_AMQP_ENDPOINT)
find_package(RabbitMQ REQUIRED)
endif()
+if(WITH_RADOSGW_KAFKA_ENDPOINT)
+ find_package(RDKafka REQUIRED)
+endif()
target_link_libraries(rgw_a
PRIVATE
# used by rgw_amqp.cc
list(APPEND rgw_libs RabbitMQ::RabbitMQ)
endif()
+if(WITH_RADOSGW_KAFKA_ENDPOINT)
+ # used by rgw_kafka.cc
+ list(APPEND rgw_libs RDKafka::RDKafka)
+endif()
set(rgw_schedulers_srcs
rgw_dmclock_scheduler_ctx.cc
target_link_libraries(rgw PRIVATE RabbitMQ::RabbitMQ)
endif()
+if(WITH_RADOSGW_KAFKA_ENDPOINT)
+ target_link_libraries(rgw PRIVATE RDKafka::RDKafka)
+endif()
+
set_target_properties(rgw PROPERTIES OUTPUT_NAME rgw VERSION 2.0.0
SOVERSION 2)
install(TARGETS rgw DESTINATION ${CMAKE_INSTALL_LIBDIR})
if(WITH_RADOSGW_AMQP_ENDPOINT)
target_link_libraries(rgw_admin_user PRIVATE RabbitMQ::RabbitMQ)
endif()
+if(WITH_RADOSGW_KAFKA_ENDPOINT)
+ target_link_libraries(rgw_admin_user PRIVATE RDKafka::RDKafka)
+endif()
if(WITH_BOOST_CONTEXT)
target_link_libraries(rgw_admin_user PRIVATE Boost::coroutine Boost::context)
endif()
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab ft=cpp
-#include "include/compat.h"
#include "rgw_amqp.h"
#include <amqp.h>
#include <amqp_tcp_socket.h>
// key class for the connection list
struct connection_id_t {
- std::string host;
- int port;
- std::string vhost;
+ const std::string host;
+ const int port;
+ const std::string vhost;
// constructed from amqp_connection_info struct
connection_id_t(const amqp_connection_info& info)
: host(info.host), port(info.port), vhost(info.vhost) {}
};
std::string to_string(const connection_id_t& id) {
- return id.host+":"+"/"+id.vhost;
+ return id.host+":"+std::to_string(id.port)+"/"+id.vhost;
}
// connection_t state cleaner
{
// thread safe access to the connection list
// once the iterators are fetched they are guaranteed to remain valid
- std::lock_guard<std::mutex> lock(connections_lock);
+ std::lock_guard lock(connections_lock);
conn_it = connections.begin();
end_it = connections.end();
}
if (conn->marked_for_deletion) {
ldout(conn->cct, 10) << "AMQP run: connection is deleted" << dendl;
conn->destroy(RGW_AMQP_STATUS_CONNECTION_CLOSED);
- std::lock_guard<std::mutex> lock(connections_lock);
+ std::lock_guard lock(connections_lock);
// erase is safe - does not invalidate any other iterator
// lock so no insertion happens at the same time
ERASE_AND_CONTINUE(conn_it, connections);
}
const connection_id_t id(info);
- std::lock_guard<std::mutex> lock(connections_lock);
+ std::lock_guard lock(connections_lock);
const auto it = connections.find(id);
if (it != connections.end()) {
if (it->second->marked_for_deletion) {
// get the number of in-flight messages
size_t get_inflight() const {
size_t sum = 0;
- std::lock_guard<std::mutex> lock(connections_lock);
+ std::lock_guard lock(connections_lock);
std::for_each(connections.begin(), connections.end(), [&sum](auto& conn_pair) {
sum += conn_pair.second->callbacks.size();
});
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab ft=cpp
+
+//#include "include/compat.h"
+#include "rgw_kafka.h"
+#include <librdkafka/rdkafka.h>
+#include "include/ceph_assert.h"
+#include <sstream>
+#include <cstring>
+#include <regex>
+#include <unordered_map>
+#include <string>
+#include <vector>
+#include <thread>
+#include <atomic>
+#include <mutex>
+#include <boost/lockfree/queue.hpp>
+#include "common/dout.h"
+
+#define dout_subsys ceph_subsys_rgw
+
+// TODO investigation, not necessarily issues:
+// (1) in case of single threaded writer context use spsc_queue
+// (2) check performance of emptying queue to local list, and go over the list and publish
+// (3) use std::shared_mutex (c++17) or equivalent for the connections lock
+
+// cmparisson operator between topic pointer and name
+bool operator==(const rd_kafka_topic_t* rkt, const std::string& name) {
+ return name == std::string_view(rd_kafka_topic_name(rkt));
+}
+
+namespace rgw::kafka {
+
+// status codes for publishing
+static const int STATUS_CONNECTION_CLOSED = -0x1002;
+static const int STATUS_QUEUE_FULL = -0x1003;
+static const int STATUS_MAX_INFLIGHT = -0x1004;
+static const int STATUS_MANAGER_STOPPED = -0x1005;
+// status code for connection opening
+static const int STATUS_CONF_ALLOC_FAILED = -0x2001;
+static const int STATUS_GET_BROKER_LIST_FAILED = -0x2002;
+static const int STATUS_CREATE_PRODUCER_FAILED = -0x2003;
+
+static const int STATUS_CREATE_TOPIC_FAILED = -0x3008;
+static const int NO_REPLY_CODE = 0x0;
+static const int STATUS_OK = 0x0;
+
+// struct for holding the callback and its tag in the callback list
+struct reply_callback_with_tag_t {
+ uint64_t tag;
+ reply_callback_t cb;
+
+ reply_callback_with_tag_t(uint64_t _tag, reply_callback_t _cb) : tag(_tag), cb(_cb) {}
+
+ bool operator==(uint64_t rhs) {
+ return tag == rhs;
+ }
+};
+
+typedef std::vector<reply_callback_with_tag_t> CallbackList;
+
+// struct for holding the connection state object as well as list of topics
+// it is used inside an intrusive ref counted pointer (boost::intrusive_ptr)
+// since references to deleted objects may still exist in the calling code
+struct connection_t {
+ rd_kafka_t* producer = nullptr;
+ rd_kafka_conf_t* temp_conf = nullptr;
+ std::vector<rd_kafka_topic_t*> topics;
+ bool marked_for_deletion = false;
+ uint64_t delivery_tag = 1;
+ int status;
+ mutable std::atomic<int> ref_count = 0;
+ CephContext* cct = nullptr;
+ CallbackList callbacks;
+
+ // cleanup of all internal connection resource
+ // the object can still remain, and internal connection
+ // resources created again on successful reconnection
+ void destroy(int s) {
+ status = s;
+ // destroy temporary conf (if connection was never established)
+ if (temp_conf) {
+ rd_kafka_conf_destroy(temp_conf);
+ return;
+ }
+ // wait for all remaining acks/nacks
+ rd_kafka_flush(producer, 5*1000 /* wait for max 5 seconds */);
+ // destroy all topics
+ std::for_each(topics.begin(), topics.end(), [](auto topic) {rd_kafka_topic_destroy(topic);});
+ // destroy producer
+ rd_kafka_destroy(producer);
+ // fire all remaining callbacks (if not fired by rd_kafka_flush)
+ std::for_each(callbacks.begin(), callbacks.end(), [this](auto& cb_tag) {
+ cb_tag.cb(status);
+ ldout(cct, 20) << "Kafka destroy: invoking callback with tag=" << cb_tag.tag << dendl;
+ });
+ callbacks.clear();
+ delivery_tag = 1;
+ }
+
+ bool is_ok() const {
+ return (producer != nullptr && !marked_for_deletion);
+ }
+
+ // dtor also destroys the internals
+ ~connection_t() {
+ destroy(STATUS_CONNECTION_CLOSED);
+ }
+
+ friend void intrusive_ptr_add_ref(const connection_t* p);
+ friend void intrusive_ptr_release(const connection_t* p);
+};
+
+// these are required interfaces so that connection_t could be used inside boost::intrusive_ptr
+void intrusive_ptr_add_ref(const connection_t* p) {
+ ++p->ref_count;
+}
+void intrusive_ptr_release(const connection_t* p) {
+ if (--p->ref_count == 0) {
+ delete p;
+ }
+}
+
+// convert int status to string - including RGW specific values
+std::string status_to_string(int s) {
+ switch (s) {
+ case STATUS_CONNECTION_CLOSED:
+ return "RGW_KAFKA_STATUS_CONNECTION_CLOSED";
+ case STATUS_QUEUE_FULL:
+ return "RGW_KAFKA_STATUS_QUEUE_FULL";
+ case STATUS_MAX_INFLIGHT:
+ return "RGW_KAFKA_STATUS_MAX_INFLIGHT";
+ case STATUS_MANAGER_STOPPED:
+ return "RGW_KAFKA_STATUS_MANAGER_STOPPED";
+ case STATUS_CONF_ALLOC_FAILED:
+ return "RGW_KAFKA_STATUS_CONF_ALLOC_FAILED";
+ case STATUS_CREATE_PRODUCER_FAILED:
+ return "STATUS_CREATE_PRODUCER_FAILED";
+ case STATUS_CREATE_TOPIC_FAILED:
+ return "STATUS_CREATE_TOPIC_FAILED";
+ }
+ // TODO: how to handle "s" in this case?
+ return std::string(rd_kafka_err2str(rd_kafka_last_error()));
+}
+
+void message_callback(rd_kafka_t* rk, const rd_kafka_message_t* rkmessage, void* opaque) {
+ ceph_assert(opaque);
+
+ const auto conn = reinterpret_cast<connection_t*>(opaque);
+ const auto result = rkmessage->err;
+
+ if (!rkmessage->_private) {
+ ldout(conn->cct, 20) << "Kafka run: n/ack received, (no callback) with result=" << result << dendl;
+ return;
+ }
+
+ const auto tag = reinterpret_cast<uint64_t*>(rkmessage->_private);
+ const auto& callbacks_end = conn->callbacks.end();
+ const auto& callbacks_begin = conn->callbacks.begin();
+ const auto tag_it = std::find(callbacks_begin, callbacks_end, *tag);
+ if (tag_it != callbacks_end) {
+ ldout(conn->cct, 20) << "Kafka run: n/ack received, invoking callback with tag=" <<
+ *tag << " and result=" << result << dendl;
+ tag_it->cb(result);
+ conn->callbacks.erase(tag_it);
+ } else {
+ // TODO add counter for acks with no callback
+ ldout(conn->cct, 10) << "Kafka run: unsolicited n/ack received with tag=" <<
+ *tag << dendl;
+ }
+ delete tag;
+ // rkmessage is destroyed automatically by librdkafka
+}
+
+// utility function to create a connection, when the connection object already exists
+connection_ptr_t& create_connection(connection_ptr_t& conn, const std::string& broker) {
+ // pointer must be valid and not marked for deletion
+ ceph_assert(conn && !conn->marked_for_deletion);
+
+ // reset all status codes
+ conn->status = STATUS_OK;
+
+ char errstr[512];
+
+ conn->temp_conf = rd_kafka_conf_new();
+ if (!conn->temp_conf) {
+ conn->status = STATUS_CONF_ALLOC_FAILED;
+ return conn;
+ }
+
+ // get list of brokers based on the bootsrap broker
+ if (rd_kafka_conf_set(conn->temp_conf, "bootstrap.servers", broker.c_str(), errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {
+ conn->status = STATUS_GET_BROKER_LIST_FAILED;
+ // TODO: use errstr
+ return conn;
+ }
+
+ // set the global callback for delivery success/fail
+ rd_kafka_conf_set_dr_msg_cb(conn->temp_conf, message_callback);
+
+ // set the global opaque pointer to be the connection itself
+ rd_kafka_conf_set_opaque (conn->temp_conf, conn.get());
+
+ // create the producer
+ conn->producer = rd_kafka_new(RD_KAFKA_PRODUCER, conn->temp_conf, errstr, sizeof(errstr));
+ if (conn->producer) {
+ conn->status = STATUS_CREATE_PRODUCER_FAILED;
+ // TODO: use errstr
+ return conn;
+ }
+
+ // conf ownership passed to producer
+ conn->temp_conf = nullptr;
+ return conn;
+}
+
+
+// utility function to create a new connection
+connection_ptr_t create_new_connection(const std::string& broker, CephContext* cct) {
+ // create connection state
+ connection_ptr_t conn = new connection_t;
+ conn->cct = cct;
+ return create_connection(conn, broker);
+}
+
+/// struct used for holding messages in the message queue
+struct message_wrapper_t {
+ connection_ptr_t conn;
+ std::string topic;
+ std::string message;
+ reply_callback_t cb;
+
+ message_wrapper_t(connection_ptr_t& _conn,
+ const std::string& _topic,
+ const std::string& _message,
+ reply_callback_t _cb) : conn(_conn), topic(_topic), message(_message), cb(_cb) {}
+};
+
+// parse a URL of the form: kafka://<host>[:port]
+// to a: host[:port]
+int parse_url(const std::string& url, std::string& broker) {
+ std::regex url_regex (
+ R"(^(([^:\/?#]+):)?(//([^\/?#]*))?([^?#]*)(\?([^#]*))?(#(.*))?)",
+ std::regex::extended
+ );
+ const auto HOST_AND_PORT = 4;
+ std::smatch url_match_result;
+ if (std::regex_match(url, url_match_result, url_regex)) {
+ broker = url_match_result[HOST_AND_PORT];
+ return 0;
+ }
+ return -1;
+}
+
+typedef std::unordered_map<std::string, connection_ptr_t> ConnectionList;
+typedef boost::lockfree::queue<message_wrapper_t*, boost::lockfree::fixed_sized<true>> MessageQueue;
+
+// macros used inside a loop where an iterator is either incremented or erased
+#define INCREMENT_AND_CONTINUE(IT) \
+ ++IT; \
+ continue;
+
+#define ERASE_AND_CONTINUE(IT,CONTAINER) \
+ IT=CONTAINER.erase(IT); \
+ --connection_count; \
+ continue;
+
+class Manager {
+public:
+ const size_t max_connections;
+ const size_t max_inflight;
+ const size_t max_queue;
+private:
+ std::atomic<size_t> connection_count;
+ bool stopped;
+ int read_timeout_ms;
+ ConnectionList connections;
+ MessageQueue messages;
+ std::atomic<size_t> queued;
+ std::atomic<size_t> dequeued;
+ CephContext* const cct;
+ mutable std::mutex connections_lock;
+ std::thread runner;
+
+ // TODO use rd_kafka_produce_batch for better performance
+ void publish_internal(message_wrapper_t* message) {
+ const std::unique_ptr<message_wrapper_t> msg_owner(message);
+ auto& conn = message->conn;
+
+ if (!conn->is_ok()) {
+ // connection had an issue while message was in the queue
+ // TODO add error stats
+ ldout(conn->cct, 1) << "Kafka publish: connection had an issue while message was in the queue" << dendl;
+ if (message->cb) {
+ message->cb(STATUS_CONNECTION_CLOSED);
+ }
+ return;
+ }
+
+ // create a new topic unless it was already created
+ auto topic_it = std::find(conn->topics.begin(), conn->topics.end(), message->topic);
+ rd_kafka_topic_t* topic = nullptr;
+ if (topic_it == conn->topics.end()) {
+ topic = rd_kafka_topic_new(conn->producer, message->topic.c_str(), nullptr);
+ if (!topic) {
+ ldout(conn->cct, 1) << "Kafka publish: failed to create topic: " << message->topic << dendl;
+ if (message->cb) {
+ message->cb(STATUS_CREATE_TOPIC_FAILED);
+ }
+ conn->destroy(STATUS_CREATE_TOPIC_FAILED);
+ return;
+ }
+ // TODO use the topics list as an LRU cache
+ conn->topics.push_back(topic);
+ ldout(conn->cct, 20) << "Kafka publish: successfully created topic: " << message->topic << dendl;
+ } else {
+ topic = *topic_it;
+ ldout(conn->cct, 20) << "Kafka publish: reused existing topic: " << message->topic << dendl;
+ }
+
+ const auto tag = (message->cb == nullptr ? nullptr : new uint64_t(conn->delivery_tag++));
+ const auto rc = rd_kafka_produce(
+ topic,
+ // TODO: non builtin partitioning
+ RD_KAFKA_PARTITION_UA,
+ // make a copy of the payload
+ // so it is safe to pass the pointer from the string
+ RD_KAFKA_MSG_F_COPY,
+ message->message.data(),
+ message->message.length(),
+ // optional key and its length
+ nullptr,
+ 0,
+ // opaque data: tag, used in the global callback
+ // in order to invoke the real callback
+ // null if no callback exists
+ tag);
+ if (rc == -1) {
+ const auto err = rd_kafka_last_error();
+ ldout(conn->cct, 1) << "Kafka publish: failed to produce: " << rd_kafka_err2str(err) << dendl;
+ // TODO: dont error on full queue, retry instead
+ // immediatly invoke callback on error if needed
+ if (message->cb) {
+ message->cb(err);
+ }
+ delete tag;
+ }
+
+ if (tag) {
+ auto const q_len = conn->callbacks.size();
+ if (q_len < max_inflight) {
+ ldout(conn->cct, 20) << "Kafka publish (with callback, tag=" << *tag << "): OK. Queue has: " << q_len << " callbacks" << dendl;
+ conn->callbacks.emplace_back(*tag, message->cb);
+ } else {
+ // immediately invoke callback with error
+ ldout(conn->cct, 1) << "Kafka publish (with callback): failed with error: callback queue full" << dendl;
+ message->cb(STATUS_MAX_INFLIGHT);
+ // tag will be deleted when the global callback is invoked
+ }
+ } else {
+ ldout(conn->cct, 20) << "Kafka publish (no callback): OK" << dendl;
+ }
+ }
+
+ // the managers thread:
+ // (1) empty the queue of messages to be published
+ // (2) loop over all connections and read acks
+ // (3) manages deleted connections
+ // (4) TODO reconnect on connection errors
+ // (5) TODO cleanup timedout callbacks
+ void run() {
+ while (!stopped) {
+
+ // publish all messages in the queue
+ auto event_count = 0U;
+ const auto count = messages.consume_all(std::bind(&Manager::publish_internal, this, std::placeholders::_1));
+ dequeued += count;
+ ConnectionList::iterator conn_it;
+ ConnectionList::const_iterator end_it;
+ {
+ // thread safe access to the connection list
+ // once the iterators are fetched they are guaranteed to remain valid
+ std::lock_guard lock(connections_lock);
+ conn_it = connections.begin();
+ end_it = connections.end();
+ }
+ // loop over all connections to read acks
+ for (;conn_it != end_it;) {
+
+ auto& conn = conn_it->second;
+ // delete the connection if marked for deletion
+ if (conn->marked_for_deletion) {
+ ldout(conn->cct, 10) << "Kafka run: connection is deleted" << dendl;
+ conn->destroy(STATUS_CONNECTION_CLOSED);
+ std::lock_guard lock(connections_lock);
+ // erase is safe - does not invalidate any other iterator
+ // lock so no insertion happens at the same time
+ ERASE_AND_CONTINUE(conn_it, connections);
+ }
+
+ // try to reconnect the connection if it has an error
+ if (!conn->is_ok()) {
+ const auto& broker = conn_it->first;
+ ldout(conn->cct, 20) << "Kafka run: retry connection" << dendl;
+ if (create_connection(conn, broker)->is_ok() == false) {
+ ldout(conn->cct, 10) << "Kafka run: connection (" << broker << ") retry failed" << dendl;
+ // TODO: add error counter for failed retries
+ // TODO: add exponential backoff for retries
+ } else {
+ ldout(conn->cct, 10) << "Kafka run: connection (" << broker << ") retry successfull" << dendl;
+ }
+ INCREMENT_AND_CONTINUE(conn_it);
+ }
+
+ event_count += rd_kafka_poll(conn->producer, read_timeout_ms);
+
+ // just increment the iterator
+ ++conn_it;
+ }
+ // if no messages were received or published
+ // across all connection, sleep for 100ms
+ if (count == 0 && event_count) {
+ std::this_thread::sleep_for(std::chrono::milliseconds(100));
+ }
+ }
+ }
+
+ // used in the dtor for message cleanup
+ static void delete_message(const message_wrapper_t* message) {
+ delete message;
+ }
+
+public:
+ Manager(size_t _max_connections,
+ size_t _max_inflight,
+ size_t _max_queue,
+ int _read_timeout_ms,
+ CephContext* _cct) :
+ max_connections(_max_connections),
+ max_inflight(_max_inflight),
+ max_queue(_max_queue),
+ connection_count(0),
+ stopped(false),
+ read_timeout_ms(_read_timeout_ms),
+ connections(_max_connections),
+ messages(max_queue),
+ queued(0),
+ dequeued(0),
+ cct(_cct),
+ runner(&Manager::run, this) {
+ // The hashmap has "max connections" as the initial number of buckets,
+ // and allows for 10 collisions per bucket before rehash.
+ // This is to prevent rehashing so that iterators are not invalidated
+ // when a new connection is added.
+ connections.max_load_factor(10.0);
+ // give the runner thread a name for easier debugging
+ const auto rc = ceph_pthread_setname(runner.native_handle(), "kafka_manager");
+ ceph_assert(rc==0);
+ }
+
+ // non copyable
+ Manager(const Manager&) = delete;
+ const Manager& operator=(const Manager&) = delete;
+
+ // stop the main thread
+ void stop() {
+ stopped = true;
+ }
+
+ // disconnect from a broker
+ bool disconnect(connection_ptr_t& conn) {
+ if (!conn || stopped) {
+ return false;
+ }
+ conn->marked_for_deletion = true;
+ return true;
+ }
+
+ // connect to a broker, or reuse an existing connection if already connected
+ connection_ptr_t connect(const std::string& url) {
+ if (stopped) {
+ // TODO: increment counter
+ ldout(cct, 1) << "Kafka connect: manager is stopped" << dendl;
+ return nullptr;
+ }
+
+ std::string broker;
+ if (0 != parse_url(url, broker)) {
+ // TODO: increment counter
+ ldout(cct, 1) << "Kafka connect: URL parsing failed" << dendl;
+ return nullptr;
+ }
+
+ std::lock_guard lock(connections_lock);
+ const auto it = connections.find(broker);
+ if (it != connections.end()) {
+ if (it->second->marked_for_deletion) {
+ // TODO: increment counter
+ ldout(cct, 1) << "Kafka connect: endpoint marked for deletion" << dendl;
+ return nullptr;
+ }
+ // connection found - return even if non-ok
+ ldout(cct, 20) << "Kafka connect: connection found" << dendl;
+ return it->second;
+ }
+
+ // connection not found, creating a new one
+ if (connection_count >= max_connections) {
+ // TODO: increment counter
+ ldout(cct, 1) << "Kafka connect: max connections exceeded" << dendl;
+ return nullptr;
+ }
+ const auto conn = create_new_connection(broker, cct);
+ // create_new_connection must always return a connection object
+ // even if error occurred during creation.
+ // in such a case the creation will be retried in the main thread
+ ceph_assert(conn);
+ ++connection_count;
+ ldout(cct, 10) << "Kafka connect: new connection is created. Total connections: " << connection_count << dendl;
+ ldout(cct, 10) << "Kafka connect: new connection status is: " << status_to_string(conn->status) << dendl;
+ return connections.emplace(broker, conn).first->second;
+ }
+
+ // TODO publish with confirm is needed in "none" case as well, cb should be invoked publish is ok (no ack)
+ int publish(connection_ptr_t& conn,
+ const std::string& topic,
+ const std::string& message) {
+ if (stopped) {
+ return STATUS_MANAGER_STOPPED;
+ }
+ if (!conn || !conn->is_ok()) {
+ return STATUS_CONNECTION_CLOSED;
+ }
+ if (messages.push(new message_wrapper_t(conn, topic, message, nullptr))) {
+ ++queued;
+ return STATUS_OK;
+ }
+ return STATUS_QUEUE_FULL;
+ }
+
+ int publish_with_confirm(connection_ptr_t& conn,
+ const std::string& topic,
+ const std::string& message,
+ reply_callback_t cb) {
+ if (stopped) {
+ return STATUS_MANAGER_STOPPED;
+ }
+ if (!conn || !conn->is_ok()) {
+ return STATUS_CONNECTION_CLOSED;
+ }
+ if (messages.push(new message_wrapper_t(conn, topic, message, cb))) {
+ ++queued;
+ return STATUS_OK;
+ }
+ return STATUS_QUEUE_FULL;
+ }
+
+ // dtor wait for thread to stop
+ // then connection are cleaned-up
+ ~Manager() {
+ stopped = true;
+ runner.join();
+ messages.consume_all(delete_message);
+ }
+
+ // get the number of connections
+ size_t get_connection_count() const {
+ return connection_count;
+ }
+
+ // get the number of in-flight messages
+ size_t get_inflight() const {
+ size_t sum = 0;
+ std::lock_guard lock(connections_lock);
+ std::for_each(connections.begin(), connections.end(), [&sum](auto& conn_pair) {
+ sum += conn_pair.second->callbacks.size();
+ });
+ return sum;
+ }
+
+ // running counter of the queued messages
+ size_t get_queued() const {
+ return queued;
+ }
+
+ // running counter of the dequeued messages
+ size_t get_dequeued() const {
+ return dequeued;
+ }
+};
+
+// singleton manager
+// note that the manager itself is not a singleton, and multiple instances may co-exist
+// TODO make the pointer atomic in allocation and deallocation to avoid race conditions
+static Manager* s_manager = nullptr;
+
+static const size_t MAX_CONNECTIONS_DEFAULT = 256;
+static const size_t MAX_INFLIGHT_DEFAULT = 8192;
+static const size_t MAX_QUEUE_DEFAULT = 8192;
+static const int READ_TIMEOUT_MS_DEFAULT = 500;
+
+bool init(CephContext* cct) {
+ if (s_manager) {
+ return false;
+ }
+ // TODO: take conf from CephContext
+ s_manager = new Manager(MAX_CONNECTIONS_DEFAULT, MAX_INFLIGHT_DEFAULT, MAX_QUEUE_DEFAULT, READ_TIMEOUT_MS_DEFAULT, cct);
+ return true;
+}
+
+void shutdown() {
+ delete s_manager;
+ s_manager = nullptr;
+}
+
+connection_ptr_t connect(const std::string& url) {
+ if (!s_manager) return nullptr;
+ return s_manager->connect(url);
+}
+
+int publish(connection_ptr_t& conn,
+ const std::string& topic,
+ const std::string& message) {
+ if (!s_manager) return STATUS_MANAGER_STOPPED;
+ return s_manager->publish(conn, topic, message);
+}
+
+int publish_with_confirm(connection_ptr_t& conn,
+ const std::string& topic,
+ const std::string& message,
+ reply_callback_t cb) {
+ if (!s_manager) return STATUS_MANAGER_STOPPED;
+ return s_manager->publish_with_confirm(conn, topic, message, cb);
+}
+
+size_t get_connection_count() {
+ if (!s_manager) return 0;
+ return s_manager->get_connection_count();
+}
+
+size_t get_inflight() {
+ if (!s_manager) return 0;
+ return s_manager->get_inflight();
+}
+
+size_t get_queued() {
+ if (!s_manager) return 0;
+ return s_manager->get_queued();
+}
+
+size_t get_dequeued() {
+ if (!s_manager) return 0;
+ return s_manager->get_dequeued();
+}
+
+size_t get_max_connections() {
+ if (!s_manager) return MAX_CONNECTIONS_DEFAULT;
+ return s_manager->max_connections;
+}
+
+size_t get_max_inflight() {
+ if (!s_manager) return MAX_INFLIGHT_DEFAULT;
+ return s_manager->max_inflight;
+}
+
+size_t get_max_queue() {
+ if (!s_manager) return MAX_QUEUE_DEFAULT;
+ return s_manager->max_queue;
+}
+
+bool disconnect(connection_ptr_t& conn) {
+ if (!s_manager) return false;
+ return s_manager->disconnect(conn);
+}
+
+} // namespace kafka
+
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab ft=cpp
+
+#pragma once
+
+#include <string>
+#include <functional>
+#include <boost/smart_ptr/intrusive_ptr.hpp>
+
+class CephContext;
+
+namespace rgw::kafka {
+// forward declaration of connection object
+struct connection_t;
+
+typedef boost::intrusive_ptr<connection_t> connection_ptr_t;
+
+// required interfaces needed so that connection_t could be used inside boost::intrusive_ptr
+void intrusive_ptr_add_ref(const connection_t* p);
+void intrusive_ptr_release(const connection_t* p);
+
+// the reply callback is expected to get an integer parameter
+// indicating the result, and not to return anything
+typedef std::function<void(int)> reply_callback_t;
+
+// initialize the kafka manager
+bool init(CephContext* cct);
+
+// shutdown the kafka manager
+void shutdown();
+
+// connect to a kafka endpoint
+connection_ptr_t connect(const std::string& url);
+
+// publish a message over a connection that was already created
+int publish(connection_ptr_t& conn,
+ const std::string& topic,
+ const std::string& message);
+
+// publish a message over a connection that was already created
+// and pass a callback that will be invoked (async) when broker confirms
+// receiving the message
+int publish_with_confirm(connection_ptr_t& conn,
+ const std::string& topic,
+ const std::string& message,
+ reply_callback_t cb);
+
+// convert the integer status returned from the "publish" function to a string
+std::string status_to_string(int s);
+
+// number of connections
+size_t get_connection_count();
+
+// return the number of messages that were sent
+// to broker, but were not yet acked/nacked/timedout
+size_t get_inflight();
+
+// running counter of successfully queued messages
+size_t get_queued();
+
+// running counter of dequeued messages
+size_t get_dequeued();
+
+// number of maximum allowed connections
+size_t get_max_connections();
+
+// number of maximum allowed inflight messages
+size_t get_max_inflight();
+
+// maximum number of messages in the queue
+size_t get_max_queue();
+
+// disconnect from a kafka broker
+bool disconnect(connection_ptr_t& conn);
+
+}
+
#ifdef WITH_RADOSGW_AMQP_ENDPOINT
#include "rgw_amqp.h"
#endif
+#ifdef WITH_RADOSGW_KAFKA_ENDPOINT
+#include "rgw_kafka.h"
+#endif
#if defined(WITH_RADOSGW_BEAST_FRONTEND)
#include "rgw_asio_frontend.h"
#endif /* WITH_RADOSGW_BEAST_FRONTEND */
if (!rgw::amqp::init(cct.get())) {
dout(1) << "ERROR: failed to initialize AMQP manager" << dendl;
}
+#endif
+#ifdef WITH_RADOSGW_KAFKA_ENDPOINT
+ if (!rgw::kafka::init(cct.get())) {
+ dout(1) << "ERROR: failed to initialize Kafka manager" << dendl;
+ }
#endif
}
#ifdef WITH_RADOSGW_AMQP_ENDPOINT
rgw::amqp::shutdown();
#endif
+#ifdef WITH_RADOSGW_KAFKA_ENDPOINT
+ rgw::kafka::shutdown();
+#endif
rgw_perf_stop(g_ceph_context);
#ifdef WITH_RADOSGW_AMQP_ENDPOINT
#include "rgw_amqp.h"
#endif
+#ifdef WITH_RADOSGW_KAFKA_ENDPOINT
+#include "rgw_kafka.h"
+#endif
#include <boost/asio/yield.hpp>
#include <boost/algorithm/string.hpp>
#include <functional>
#ifdef WITH_RADOSGW_AMQP_ENDPOINT
class RGWPubSubAMQPEndpoint : public RGWPubSubEndpoint {
private:
- enum ack_level_t {
- ACK_LEVEL_NONE,
- ACK_LEVEL_BROKER,
- ACK_LEVEL_ROUTEABLE
+ enum class ack_level_t {
+ None,
+ Broker,
+ Routable
};
CephContext* const cct;
const std::string endpoint;
str_ack_level = args.get("amqp-ack-level", &exists);
if (!exists || str_ack_level == "broker") {
// "broker" is default
- ack_level = ACK_LEVEL_BROKER;
+ ack_level = ack_level_t::Broker;
} else if (str_ack_level == "none") {
- ack_level = ACK_LEVEL_NONE;
+ ack_level = ack_level_t::None;
} else if (str_ack_level == "routable") {
- ack_level = ACK_LEVEL_ROUTEABLE;
+ ack_level = ack_level_t::Routable;
} else {
throw configuration_error("AMQP: invalid amqp-ack-level: " + str_ack_level);
}
RGWCoroutine* send_to_completion_async(const rgw_pubsub_event& event, RGWDataSyncEnv* env) override {
ceph_assert(conn);
- if (ack_level == ACK_LEVEL_NONE) {
+ if (ack_level == ack_level_t::None) {
return new NoAckPublishCR(cct, topic, conn, json_format_pubsub_event(event));
} else {
// TODO: currently broker and routable are the same - this will require different flags
RGWCoroutine* send_to_completion_async(const rgw_pubsub_s3_record& record, RGWDataSyncEnv* env) override {
ceph_assert(conn);
- if (ack_level == ACK_LEVEL_NONE) {
+ if (ack_level == ack_level_t::None) {
return new NoAckPublishCR(cct, topic, conn, json_format_pubsub_event(record));
} else {
// TODO: currently broker and routable are the same - this will require different flags
int send_to_completion_async(CephContext* cct, const rgw_pubsub_s3_record& record, optional_yield y) override {
ceph_assert(conn);
- if (ack_level == ACK_LEVEL_NONE) {
+ if (ack_level == ack_level_t::None) {
return amqp::publish(conn, topic, json_format_pubsub_event(record));
} else {
// TODO: currently broker and routable are the same - this will require different flags but the same mechanism
static const std::string AMQP_SCHEMA("amqp");
#endif // ifdef WITH_RADOSGW_AMQP_ENDPOINT
+#ifdef WITH_RADOSGW_KAFKA_ENDPOINT
+class RGWPubSubKafkaEndpoint : public RGWPubSubEndpoint {
+private:
+ enum class ack_level_t {
+ None,
+ Broker,
+ };
+ CephContext* const cct;
+ const std::string endpoint;
+ const std::string topic;
+ kafka::connection_ptr_t conn;
+ ack_level_t ack_level;
+ std::string str_ack_level;
+
+ // NoAckPublishCR implements async kafka publishing via coroutine
+ // This coroutine ends when it send the message and does not wait for an ack
+ class NoAckPublishCR : public RGWCoroutine {
+ private:
+ const std::string topic;
+ kafka::connection_ptr_t conn;
+ const std::string message;
+
+ public:
+ NoAckPublishCR(CephContext* cct,
+ const std::string& _topic,
+ kafka::connection_ptr_t& _conn,
+ const std::string& _message) :
+ RGWCoroutine(cct),
+ topic(_topic), conn(_conn), message(_message) {}
+
+ // send message to endpoint, without waiting for reply
+ int operate() override {
+ reenter(this) {
+ const auto rc = kafka::publish(conn, topic, message);
+ if (rc < 0) {
+ return set_cr_error(rc);
+ }
+ return set_cr_done();
+ }
+ return 0;
+ }
+ };
+
+ // AckPublishCR implements async kafka publishing via coroutine
+ // This coroutine ends when an ack is received from the borker
+ // note that it does not wait for an ack fron the end client
+ class AckPublishCR : public RGWCoroutine, public RGWIOProvider {
+ private:
+ const std::string topic;
+ kafka::connection_ptr_t conn;
+ const std::string message;
+
+ public:
+ AckPublishCR(CephContext* cct,
+ const std::string& _topic,
+ kafka::connection_ptr_t& _conn,
+ const std::string& _message) :
+ RGWCoroutine(cct),
+ topic(_topic), conn(_conn), message(_message) {}
+
+ // send message to endpoint, waiting for reply
+ int operate() override {
+ reenter(this) {
+ yield {
+ init_new_io(this);
+ const auto rc = kafka::publish_with_confirm(conn,
+ topic,
+ message,
+ std::bind(&AckPublishCR::request_complete, this, std::placeholders::_1));
+ if (rc < 0) {
+ // failed to publish, does not wait for reply
+ return set_cr_error(rc);
+ }
+ // mark as blocked on the kafka answer
+ if (perfcounter) perfcounter->inc(l_rgw_pubsub_push_pending);
+ io_block();
+ return 0;
+ }
+ return set_cr_done();
+ }
+ return 0;
+ }
+
+ // callback invoked from the kafka manager thread when ack/nack is received
+ void request_complete(int status) {
+ ceph_assert(!is_done());
+ if (status != 0) {
+ // server replied with a nack
+ set_cr_error(status);
+ }
+ io_complete();
+ if (perfcounter) perfcounter->dec(l_rgw_pubsub_push_pending);
+ }
+
+ // TODO: why are these mandatory in RGWIOProvider?
+ void set_io_user_info(void *_user_info) override {
+ }
+
+ void *get_io_user_info() override {
+ return nullptr;
+ }
+ };
+
+public:
+ RGWPubSubKafkaEndpoint(const std::string& _endpoint,
+ const std::string& _topic,
+ const RGWHTTPArgs& args,
+ CephContext* _cct) :
+ cct(_cct),
+ endpoint(_endpoint),
+ topic(_topic),
+ conn(kafka::connect(endpoint)) {
+ if (!conn) {
+ throw configuration_error("Kafka: failed to create connection to: " + endpoint);
+ }
+ bool exists;
+ // get ack level
+ str_ack_level = args.get("kafka-ack-level", &exists);
+ if (!exists || str_ack_level == "broker") {
+ // "broker" is default
+ ack_level = ack_level_t::Broker;
+ } else if (str_ack_level == "none") {
+ ack_level = ack_level_t::None;
+ } else {
+ throw configuration_error("Kafka: invalid kafka-ack-level: " + str_ack_level);
+ }
+ }
+
+ RGWCoroutine* send_to_completion_async(const rgw_pubsub_event& event, RGWDataSyncEnv* env) override {
+ ceph_assert(conn);
+ if (ack_level == ack_level_t::None) {
+ return new NoAckPublishCR(cct, topic, conn, json_format_pubsub_event(event));
+ } else {
+ return new AckPublishCR(cct, topic, conn, json_format_pubsub_event(event));
+ }
+ }
+
+ RGWCoroutine* send_to_completion_async(const rgw_pubsub_s3_record& record, RGWDataSyncEnv* env) override {
+ ceph_assert(conn);
+ if (ack_level == ack_level_t::None) {
+ return new NoAckPublishCR(cct, topic, conn, json_format_pubsub_event(record));
+ } else {
+ return new AckPublishCR(cct, topic, conn, json_format_pubsub_event(record));
+ }
+ }
+
+ // this allows waiting untill "finish()" is called from a different thread
+ // waiting could be blocking the waiting thread or yielding, depending
+ // with compilation flag support and whether the optional_yield is set
+ class Waiter {
+ using Signature = void(boost::system::error_code);
+ using Completion = ceph::async::Completion<Signature>;
+ std::unique_ptr<Completion> completion = nullptr;
+ int ret;
+
+ mutable std::atomic<bool> done = false;
+ mutable std::mutex lock;
+ mutable std::condition_variable cond;
+
+ template <typename ExecutionContext, typename CompletionToken>
+ auto async_wait(ExecutionContext& ctx, CompletionToken&& token) {
+ boost::asio::async_completion<CompletionToken, Signature> init(token);
+ auto& handler = init.completion_handler;
+ {
+ std::unique_lock l{lock};
+ completion = Completion::create(ctx.get_executor(), std::move(handler));
+ }
+ return init.result.get();
+ }
+
+ public:
+ int wait(optional_yield y) {
+ if (done) {
+ return ret;
+ }
+#ifdef HAVE_BOOST_CONTEXT
+ if (y) {
+ auto& io_ctx = y.get_io_context();
+ auto& yield_ctx = y.get_yield_context();
+ boost::system::error_code ec;
+ async_wait(io_ctx, yield_ctx[ec]);
+ return -ec.value();
+ }
+#endif
+ std::unique_lock l(lock);
+ cond.wait(l, [this]{return (done==true);});
+ return ret;
+ }
+
+ void finish(int r) {
+ std::unique_lock l{lock};
+ ret = r;
+ done = true;
+ if (completion) {
+ boost::system::error_code ec(-ret, boost::system::system_category());
+ Completion::post(std::move(completion), ec);
+ } else {
+ cond.notify_all();
+ }
+ }
+ };
+
+ int send_to_completion_async(CephContext* cct, const rgw_pubsub_s3_record& record, optional_yield y) override {
+ ceph_assert(conn);
+ if (ack_level == ack_level_t::None) {
+ return kafka::publish(conn, topic, json_format_pubsub_event(record));
+ } else {
+ // note: dynamic allocation of Waiter is needed when this is invoked from a beast coroutine
+ auto w = std::unique_ptr<Waiter>(new Waiter);
+ const auto rc = kafka::publish_with_confirm(conn,
+ topic,
+ json_format_pubsub_event(record),
+ std::bind(&Waiter::finish, w.get(), std::placeholders::_1));
+ if (rc < 0) {
+ // failed to publish, does not wait for reply
+ return rc;
+ }
+ return w->wait(y);
+ }
+ }
+
+ std::string to_str() const override {
+ std::string str("Kafka Endpoint");
+ str += "\nURI: " + endpoint;
+ str += "\nTopic: " + topic;
+ str += "\nAck Level: " + str_ack_level;
+ return str;
+ }
+};
+
+static const std::string KAFKA_SCHEMA("kafka");
+#endif // ifdef WITH_RADOSGW_KAFKA_ENDPOINT
+
static const std::string WEBHOOK_SCHEMA("webhook");
static const std::string UNKNOWN_SCHEMA("unknown");
static const std::string NO_SCHEMA("");
#ifdef WITH_RADOSGW_AMQP_ENDPOINT
} else if (schema == "amqp") {
return AMQP_SCHEMA;
+#endif
+#ifdef WITH_RADOSGW_KAFKA_ENDPOINT
+ } else if (schema == "kafka") {
+ return KAFKA_SCHEMA;
#endif
}
return UNKNOWN_SCHEMA;
} else if (schema == "amqps") {
throw configuration_error("AMQP: ssl not supported");
return nullptr;
+#endif
+#ifdef WITH_RADOSGW_KAFKA_ENDPOINT
+ } else if (schema == KAFKA_SCHEMA) {
+ return Ptr(new RGWPubSubKafkaEndpoint(endpoint, topic, args, cct));
#endif
}
#ifdef WITH_RADOSGW_AMQP_ENDPOINT
#include "rgw_amqp.h"
#endif
+#ifdef WITH_RADOSGW_KAFKA_ENDPOINT
+#include "rgw_kafka.h"
+#endif
#include <boost/algorithm/hex.hpp>
#include <boost/asio/yield.hpp>
ldout(cct, 1) << "ERROR: failed to initialize AMQP manager in pubsub sync module" << dendl;
}
#endif
+#ifdef WITH_RADOSGW_KAFKA_ENDPOINT
+ if (!rgw::kafka::init(cct)) {
+ ldout(cct, 1) << "ERROR: failed to initialize Kafka manager in pubsub sync module" << dendl;
+ }
+#endif
}
RGWPSSyncModuleInstance::~RGWPSSyncModuleInstance() {
#ifdef WITH_RADOSGW_AMQP_ENDPOINT
rgw::amqp::shutdown();
#endif
+#ifdef WITH_RADOSGW_KAFKA_ENDPOINT
+ rgw::kafka::shutdown();
+#endif
}
RGWDataSyncModule *RGWPSSyncModuleInstance::get_data_handler()
if(WITH_RADOSGW_AMQP_ENDPOINT)
list(APPEND rgw_libs amqp_mock)
endif()
+ if(WITH_RADOSGW_KAFKA_ENDPOINT)
+ list(APPEND rgw_libs kafka_stub)
+ endif()
add_subdirectory(rgw)
endif(WITH_RADOSGW)
if(WITH_RBD)
add_library(amqp_mock STATIC ${amqp_mock_src})
endif()
+if(WITH_RADOSGW_KAFKA_ENDPOINT)
+ # kafka stub library
+ set(kafka_stub_src
+ kafka_stub.cc)
+ add_library(kafka_stub STATIC ${kafka_stub_src})
+endif()
+
#unittest_rgw_bencode
add_executable(unittest_rgw_bencode test_rgw_bencode.cc)
add_ceph_unittest(unittest_rgw_bencode)
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include <librdkafka/rdkafka.h>
+
+const char *rd_kafka_topic_name(const rd_kafka_topic_t *rkt) {
+ return "";
+}
+
+rd_kafka_resp_err_t rd_kafka_last_error() {
+ return rd_kafka_resp_err_t();
+}
+
+const char *rd_kafka_err2str(rd_kafka_resp_err_t err) {
+ return "";
+}
+
+rd_kafka_conf_t *rd_kafka_conf_new() {
+ return nullptr;
+}
+
+rd_kafka_conf_res_t rd_kafka_conf_set(rd_kafka_conf_t *conf,
+ const char *name,
+ const char *value,
+ char *errstr, size_t errstr_size) {
+ return rd_kafka_conf_res_t();
+}
+
+void rd_kafka_conf_set_dr_msg_cb(rd_kafka_conf_t *conf,
+ void (*dr_msg_cb) (rd_kafka_t *rk,
+ const rd_kafka_message_t *
+ rkmessage,
+ void *opaque)) {}
+
+void rd_kafka_conf_set_opaque(rd_kafka_conf_t *conf, void *opaque) {}
+
+rd_kafka_t *rd_kafka_new(rd_kafka_type_t type, rd_kafka_conf_t *conf,
+ char *errstr, size_t errstr_size) {
+ return nullptr;
+}
+
+void rd_kafka_conf_destroy(rd_kafka_conf_t *conf) {}
+
+rd_kafka_resp_err_t rd_kafka_flush (rd_kafka_t *rk, int timeout_ms) {
+ return rd_kafka_resp_err_t();
+}
+
+void rd_kafka_destroy(rd_kafka_t *rk) {}
+
+rd_kafka_topic_t *rd_kafka_topic_new(rd_kafka_t *rk, const char *topic,
+ rd_kafka_topic_conf_t *conf) {
+ return nullptr;
+}
+
+int rd_kafka_produce(rd_kafka_topic_t *rkt, int32_t partition,
+ int msgflags,
+ void *payload, size_t len,
+ const void *key, size_t keylen,
+ void *msg_opaque) {
+ return 0;
+}
+
+int rd_kafka_poll(rd_kafka_t *rk, int timeout_ms) {
+ return 0;
+}
+
+void rd_kafka_topic_destroy(rd_kafka_topic_t *rkt) {}
+
# log.info('rabbitmq directories already removed')
+# Kafka endpoint functions
+
+kafka_server = 'localhost'
+
+class KafkaReceiver(object):
+ """class for receiving and storing messages on a topic from the kafka broker"""
+ def __init__(self, topic):
+ from kafka import KafkaConsumer
+ remaining_retries = 10
+ while remaining_retries > 0:
+ try:
+ self.consumer = KafkaConsumer(topic, bootstrap_servers = kafka_server)
+ print('Kafka consumer created on topic: '+topic)
+ break
+ except Exception as error:
+ remaining_retries -= 1
+ print('failed to connect to kafka (remaining retries '
+ + str(remaining_retries) + '): ' + str(error))
+ time.sleep(1)
+
+ if remaining_retries == 0:
+ raise Exception('failed to connect to kafka - no retries left')
+
+ self.events = []
+ self.topic = topic
+ self.stop = False
+
+ def verify_s3_events(self, keys, exact_match=False, deletions=False):
+ """verify stored s3 records agains a list of keys"""
+ verify_s3_records_by_elements(self.events, keys, exact_match=exact_match, deletions=deletions)
+ self.events = []
+
+
+def kafka_receiver_thread_runner(receiver):
+ """main thread function for the kafka receiver"""
+ try:
+ log.info('Kafka receiver started')
+ print('Kafka receiver started')
+ for msg in receiver.consumer:
+ if receiver.stop:
+ break
+ receiver.events.append(json.loads(msg.value))
+ log.info('Kafka receiver ended')
+ print('Kafka receiver ended')
+ except Exception as error:
+ log.info('Kafka receiver ended unexpectedly: %s', str(error))
+ print('Kafka receiver ended unexpectedly: ' + str(error))
+
+
+def create_kafka_receiver_thread(topic):
+ """create kafka receiver and thread"""
+ receiver = KafkaReceiver(topic)
+ task = threading.Thread(target=kafka_receiver_thread_runner, args=(receiver,))
+ task.daemon = True
+ return task, receiver
+
+def stop_kafka_receiver(receiver, task):
+ """stop the receiver thread and wait for it to finis"""
+ receiver.stop = True
+ task.join(5)
+ try:
+ receiver.consumer.close()
+ except Exception as error:
+ log.info('failed to gracefuly stop Kafka receiver: %s', str(error))
+
+
+def init_kafka():
+ """ start kafka/zookeeper """
+ KAFKA_DIR = os.environ['KAFKA_DIR']
+ if KAFKA_DIR == '':
+ log.info('KAFKA_DIR must be set to where kafka is installed')
+ print('KAFKA_DIR must be set to where kafka is installed')
+ return None, None, None
+
+ DEVNULL = open(os.devnull, 'wb')
+
+ print('\nStarting zookeeper...')
+ try:
+ zk_proc = subprocess.Popen([KAFKA_DIR+'bin/zookeeper-server-start.sh', KAFKA_DIR+'config/zookeeper.properties'], stdout=DEVNULL)
+ except Exception as error:
+ log.info('failed to execute zookeeper: %s', str(error))
+ print('failed to execute zookeeper: %s' % str(error))
+ return None, None, None
+
+ time.sleep(5)
+ if zk_proc.poll() is not None:
+ print('zookeeper failed to start')
+ return None, None, None
+ print('Zookeeper started')
+ print('Starting kafka...')
+ kafka_log = open('./kafka.log', 'w')
+ try:
+ kafka_proc = subprocess.Popen([
+ KAFKA_DIR+'bin/kafka-server-start.sh',
+ KAFKA_DIR+'config/server.properties'],
+ stdout=kafka_log)
+ except Exception as error:
+ log.info('failed to execute kafka: %s', str(error))
+ print('failed to execute kafka: %s' % str(error))
+ zk_proc.terminate()
+ kafka_log.close()
+ return None, None, None
+
+ # TODO add kafka checkpoint instead of sleep
+ time.sleep(15)
+ if kafka_proc.poll() is not None:
+ zk_proc.terminate()
+ print('kafka failed to start. details in: ./kafka.log')
+ kafka_log.close()
+ return None, None, None
+
+ print('Kafka started')
+ return kafka_proc, zk_proc, kafka_log
+
+
+def clean_kafka(kafka_proc, zk_proc, kafka_log):
+ """ stop kafka/zookeeper """
+ try:
+ kafka_log.close()
+ kafka_proc.terminate()
+ zk_proc.terminate()
+ except:
+ log.info('kafka/zookeeper already terminated')
+
+
def init_env(require_ps=True):
"""initialize the environment"""
if require_ps:
[thr.join() for thr in client_threads]
time_diff = time.time() - start_time
- print('average time for creation + http notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds')
+ print 'average time for deletion + amqp notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds'
print('wait for 5sec for the messages...')
time.sleep(5)
err = 'amqp receiver 2 should have no deletions'
assert False, err
-
# cleanup
stop_amqp_receiver(receiver1, task1)
stop_amqp_receiver(receiver2, task2)
clean_rabbitmq(proc)
+def test_ps_s3_notification_push_kafka():
+ """ test pushing kafka s3 notification on master """
+ if skip_push_tests:
+ return SkipTest("PubSub push tests don't run in teuthology")
+ kafka_proc, zk_proc, kafka_log = init_kafka()
+ if kafka_proc is None or zk_proc is None:
+ return SkipTest('end2end kafka tests require kafka/zookeeper installed')
+
+ zones, ps_zones = init_env(require_ps=True)
+ realm = get_realm()
+ zonegroup = realm.master_zonegroup()
+
+ # create bucket
+ bucket_name = gen_bucket_name()
+ bucket = zones[0].create_bucket(bucket_name)
+ # wait for sync
+ zone_meta_checkpoint(ps_zones[0].zone)
+ # name is constant for manual testing
+ topic_name = bucket_name+'_topic'
+ # create consumer on the topic
+ task, receiver = create_kafka_receiver_thread(topic_name)
+ task.start()
+
+ # create topic
+ topic_conf = PSTopic(ps_zones[0].conn, topic_name,
+ endpoint='kafka://' + kafka_server,
+ endpoint_args='kafka-ack-level=broker')
+ result, status = topic_conf.set_config()
+ assert_equal(status/100, 2)
+ parsed_result = json.loads(result)
+ topic_arn = parsed_result['arn']
+ # create s3 notification
+ notification_name = bucket_name + NOTIFICATION_SUFFIX
+ topic_conf_list = [{'Id': notification_name, 'TopicArn': topic_arn,
+ 'Events': []
+ }]
+
+ s3_notification_conf = PSNotificationS3(ps_zones[0].conn, bucket_name, topic_conf_list)
+ response, status = s3_notification_conf.set_config()
+ assert_equal(status/100, 2)
+
+ # create objects in the bucket (async)
+ number_of_objects = 10
+ client_threads = []
+ for i in range(number_of_objects):
+ key = bucket.new_key(str(i))
+ content = str(os.urandom(1024*1024))
+ thr = threading.Thread(target = set_contents_from_string, args=(key, content,))
+ thr.start()
+ client_threads.append(thr)
+ [thr.join() for thr in client_threads]
+
+ # wait for sync
+ zone_bucket_checkpoint(ps_zones[0].zone, zones[0].zone, bucket_name)
+ keys = list(bucket.list())
+ receiver.verify_s3_events(keys, exact_match=True)
+
+ # delete objects from the bucket
+ client_threads = []
+ for key in bucket.list():
+ thr = threading.Thread(target = key.delete, args=())
+ thr.start()
+ client_threads.append(thr)
+ [thr.join() for thr in client_threads]
+
+ zone_bucket_checkpoint(ps_zones[0].zone, zones[0].zone, bucket_name)
+ receiver.verify_s3_events(keys, exact_match=True, deletions=True)
+
+ # cleanup
+ s3_notification_conf.del_config()
+ topic_conf.del_config()
+ # delete the bucket
+ zones[0].delete_bucket(bucket_name)
+ stop_kafka_receiver(receiver, task)
+ clean_kafka(kafka_proc, zk_proc, kafka_log)
+
+
+def test_ps_s3_notification_push_kafka_on_master():
+ """ test pushing kafka s3 notification on master """
+ if skip_push_tests:
+ return SkipTest("PubSub push tests don't run in teuthology")
+ kafka_proc, zk_proc, kafka_log = init_kafka()
+ if kafka_proc is None or zk_proc is None:
+ return SkipTest('end2end kafka tests require kafka/zookeeper installed')
+ zones, _ = init_env(require_ps=False)
+ realm = get_realm()
+ zonegroup = realm.master_zonegroup()
+
+ # create bucket
+ bucket_name = gen_bucket_name()
+ bucket = zones[0].create_bucket(bucket_name)
+ # name is constant for manual testing
+ topic_name = bucket_name+'_topic'
+ # create consumer on the topic
+ task, receiver = create_kafka_receiver_thread(topic_name+'_1')
+ task.start()
+
+ # create s3 topic
+ endpoint_address = 'kafka://' + kafka_server
+ # without acks from broker
+ endpoint_args = 'push-endpoint='+endpoint_address+'&kafka-ack-level=broker'
+ topic_conf1 = PSTopicS3(zones[0].conn, topic_name+'_1', zonegroup.name, endpoint_args=endpoint_args)
+ topic_arn1 = topic_conf1.set_config()
+ endpoint_args = 'push-endpoint='+endpoint_address+'&kafka-ack-level=none'
+ topic_conf2 = PSTopicS3(zones[0].conn, topic_name+'_2', zonegroup.name, endpoint_args=endpoint_args)
+ topic_arn2 = topic_conf2.set_config()
+ # create s3 notification
+ notification_name = bucket_name + NOTIFICATION_SUFFIX
+ topic_conf_list = [{'Id': notification_name + '_1', 'TopicArn': topic_arn1,
+ 'Events': []
+ },
+ {'Id': notification_name + '_2', 'TopicArn': topic_arn2,
+ 'Events': []
+ }]
+
+ s3_notification_conf = PSNotificationS3(zones[0].conn, bucket_name, topic_conf_list)
+ response, status = s3_notification_conf.set_config()
+ assert_equal(status/100, 2)
+
+ # create objects in the bucket (async)
+ number_of_objects = 10
+ client_threads = []
+ start_time = time.time()
+ for i in range(number_of_objects):
+ key = bucket.new_key(str(i))
+ content = str(os.urandom(1024*1024))
+ thr = threading.Thread(target = set_contents_from_string, args=(key, content,))
+ thr.start()
+ client_threads.append(thr)
+ [thr.join() for thr in client_threads]
+
+ time_diff = time.time() - start_time
+ print 'average time for creation + kafka notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds'
+
+ print 'wait for 5sec for the messages...'
+ time.sleep(5)
+ keys = list(bucket.list())
+ receiver.verify_s3_events(keys, exact_match=True)
+
+ # delete objects from the bucket
+ client_threads = []
+ start_time = time.time()
+ for key in bucket.list():
+ thr = threading.Thread(target = key.delete, args=())
+ thr.start()
+ client_threads.append(thr)
+ [thr.join() for thr in client_threads]
+
+ time_diff = time.time() - start_time
+ print 'average time for deletion + kafka notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds'
+
+ print 'wait for 5sec for the messages...'
+ time.sleep(5)
+ receiver.verify_s3_events(keys, exact_match=True, deletions=True)
+
+ # cleanup
+ s3_notification_conf.del_config()
+ topic_conf1.del_config()
+ topic_conf2.del_config()
+ # delete the bucket
+ zones[0].delete_bucket(bucket_name)
+ stop_kafka_receiver(receiver, task)
+ clean_kafka(kafka_proc, zk_proc, kafka_log)
+
+
def test_ps_s3_notification_push_http_on_master():
""" test pushing http s3 notification on master """
if skip_push_tests:
[thr.join() for thr in client_threads]
time_diff = time.time() - start_time
- print('average time for creation + http notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds')
+ print 'average time for deletion + http notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds'
print('wait for 5sec for the messages...')
time.sleep(5)
list(APPEND DENCODER_EXTRALIBS
rabbitmq)
endif()
+ if(WITH_RADOSGW_KAFKA_ENDPOINT)
+ list(APPEND DENCODER_EXTRALIBS
+ rdkafka)
+ endif()
endif()
if(WITH_RBD)