]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw/pubsub: add kafka notification endpoint
authorYuval Lifshitz <yuvalif@yahoo.com>
Thu, 10 Oct 2019 13:27:35 +0000 (16:27 +0300)
committerYuval Lifshitz <yuvalif@yahoo.com>
Tue, 5 Nov 2019 17:25:03 +0000 (19:25 +0200)
Signed-off-by: Yuval Lifshitz <yuvalif@yahoo.com>
21 files changed:
CMakeLists.txt
ceph.spec.in
cmake/modules/FindRDKafka.cmake [new file with mode: 0644]
debian/control
do_cmake.sh
do_freebsd.sh
doc/radosgw/notifications.rst
doc/radosgw/pubsub-module.rst
src/include/config-h.in.cmake
src/rgw/CMakeLists.txt
src/rgw/rgw_amqp.cc
src/rgw/rgw_kafka.cc [new file with mode: 0644]
src/rgw/rgw_kafka.h [new file with mode: 0644]
src/rgw/rgw_main.cc
src/rgw/rgw_pubsub_push.cc
src/rgw/rgw_sync_module_pubsub.cc
src/test/CMakeLists.txt
src/test/rgw/CMakeLists.txt
src/test/rgw/kafka_stub.cc [new file with mode: 0644]
src/test/rgw/rgw_multi/tests_ps.py
src/tools/ceph-dencoder/CMakeLists.txt

index 48ff49df83ab3a82fb33d83dc702dde7bf20fe9f..603004e02e9c2afff1bcc1b896bc668768461fd4 100644 (file)
@@ -346,6 +346,7 @@ option(WITH_RADOSGW_FCGI_FRONTEND "Rados Gateway's FCGI frontend is enabled" OFF
 option(WITH_RADOSGW_BEAST_FRONTEND "Rados Gateway's Beast frontend is enabled" ON)
 option(WITH_RADOSGW_BEAST_OPENSSL "Rados Gateway's Beast frontend uses OpenSSL" ON)
 option(WITH_RADOSGW_AMQP_ENDPOINT "Rados Gateway's pubsub support for AMQP push endpoint" ON)
+option(WITH_RADOSGW_KAFKA_ENDPOINT "Rados Gateway's pubsub support for Kafka push endpoint" ON)
 
 if(WITH_RADOSGW)
   find_package(EXPAT REQUIRED)
index 519f0d257b57d54520ac45536798380f06df28f1..1e9c942d6acd94754e9e90b1650df666c884893e 100644 (file)
@@ -36,6 +36,7 @@
 %bcond_without cephfs_java
 %endif
 %bcond_without amqp_endpoint
+%bcond_without kafka_endpoint
 %bcond_without lttng
 %bcond_without libradosstriper
 %bcond_without ocf
@@ -45,6 +46,7 @@
 %bcond_with selinux
 %bcond_with cephfs_java
 %bcond_with amqp_endpoint
+%bcond_with kafka_endpoint
 #Compat macro for new _fillupdir macro introduced in Nov 2017
 %if ! %{defined _fillupdir}
 %global _fillupdir /var/adm/fillup-templates
@@ -200,6 +202,9 @@ BuildRequires:      yasm
 %if 0%{with amqp_endpoint}
 BuildRequires:  librabbitmq-devel
 %endif
+%if 0%{with kafka_endpoint}
+BuildRequires:  librdkafka-devel
+%endif
 %if 0%{with make_check}
 BuildRequires:  jq
 BuildRequires: libuuid-devel
@@ -1296,6 +1301,11 @@ ${CMAKE} .. \
     -DWITH_RADOSGW_AMQP_ENDPOINT=ON \
 %else
     -DWITH_RADOSGW_AMQP_ENDPOINT=OFF \
+%endif
+%if 0%{with kafka_endpoint}
+    -DWITH_RADOSGW_KAFKA_ENDPOINT=ON \
+%else
+    -DWITH_RADOSGW_KAFKA_ENDPOINT=OFF \
 %endif
     -DBOOST_J=$CEPH_SMP_NCPUS \
     -DWITH_GRAFANA=ON
diff --git a/cmake/modules/FindRDKafka.cmake b/cmake/modules/FindRDKafka.cmake
new file mode 100644 (file)
index 0000000..0d2f3b7
--- /dev/null
@@ -0,0 +1,19 @@
+find_path(rdkafka_INCLUDE_DIR
+  NAMES librdkafka/rdkafka.h)
+
+find_library(rdkafka_LIBRARY
+  NAMES rdkafka)
+
+include(FindPackageHandleStandardArgs)
+
+find_package_handle_standard_args(RDKafka DEFAULT_MSG
+  rdkafka_INCLUDE_DIR
+  rdkafka_LIBRARY)
+
+if(RDKafka_FOUND AND NOT (TARGET RDKafka::RDKafka))
+  add_library(RDKafka::RDKafka UNKNOWN IMPORTED)
+  set_target_properties(RDKafka::RDKafka PROPERTIES
+    INTERFACE_INCLUDE_DIRECTORIES "${rdkafka_INCLUDE_DIR}"
+    IMPORTED_LINK_INTERFACE_LANGUAGES "C"
+    IMPORTED_LOCATION "${rdkafka_LIBRARY}")
+endif()
index 3f43f25e776e38ccf23e8f83da5c617e40a73744..d1e549ea2b8df79945e4b8622c4ae530e001d804 100644 (file)
@@ -59,6 +59,7 @@ Build-Depends: cmake (>= 3.5),
                libxml2-dev,
 # Crimson      libyaml-cpp-dev,
                librabbitmq-dev,
+               librdkafka-dev,
 # Make-Check   libxmlsec1,
 # Make-Check   libxmlsec1-nss,
 # Make-Check   libxmlsec1-openssl,
index ad5d41f330c8c8d096eed93739111974cf89c6e9..45ea033186ffc3b7ce4b1f2135d47b49bfd63e7e 100755 (executable)
@@ -28,11 +28,13 @@ if [ -r /etc/os-release ]; then
       opensuse*|suse|sles)
           PYBUILD="3"
           ARGS+=" -DWITH_RADOSGW_AMQP_ENDPOINT=OFF"
+          ARGS+=" -DWITH_RADOSGW_KAFKA_ENDPOINT=OFF"
           ;;
   esac
 elif [ "$(uname)" == FreeBSD ] ; then
   PYBUILD="3"
   ARGS+=" -DWITH_RADOSGW_AMQP_ENDPOINT=OFF"
+  ARGS+=" -DWITH_RADOSGW_KAFKA_ENDPOINT=OFF"
 else
   echo Unknown release
   exit 1
index 509384bee579ee9c2433521d050e0fac9d63d323..4c2422a69d8cab36e7481c88802d2784a2c2c319 100755 (executable)
@@ -43,6 +43,7 @@ mkdir ${BUILD_DIR}
        -D CMAKE_C_FLAGS_DEBUG="$CMAKE_C_FLAGS_DEBUG" \
        -D ENABLE_GIT_VERSION=OFF \
        -D WITH_RADOSGW_AMQP_ENDPOINT=OFF \
+       -D WITH_RADOSGW_KAFKA_ENDPOINT=OFF \
        -D WITH_SYSTEM_BOOST=ON \
        -D WITH_SYSTEM_NPM=ON \
        -D WITH_LTTNG=OFF \
index d5bd59d632a0e41a87138f0a83225669a0f38a7b..fee5eb0259f2dd209b04c6c0dc49f045e62fc7eb 100644 (file)
@@ -7,7 +7,7 @@ Bucket Notifications
 .. contents::
 
 Bucket notifications provide a mechanism for sending information out of the radosgw when certain events are happening on the bucket.
-Currently, notifications could be sent to HTTP and AMQP0.9.1 endpoints.
+Currently, notifications could be sent to: HTTP, AMQP0.9.1 and Kafka endpoints.
 
 Note, that if the events should be stored in Ceph, in addition, or instead of being pushed to an endpoint, 
 the `PubSub Module`_ should be used instead of the bucket notification mechanism.
@@ -67,24 +67,39 @@ To update a topic, use the same command used for topic creation, with the topic
    &Name=<topic-name>
    &push-endpoint=<endpoint>
    [&Attributes.entry.1.key=amqp-exchange&Attributes.entry.1.value=<exchange>]
-   [&Attributes.entry.2.key=amqp-sck-level&Attributes.entry.2.value=ack-level]
-   &Attributes.entry.3.key=verify-sll&Attributes.entry.3.value=true|false]
+   [&Attributes.entry.2.key=amqp-ack-level&Attributes.entry.2.value=none|broker]
+   [&Attributes.entry.3.key=verify-sll&Attributes.entry.3.value=true|false]
+   [&Attributes.entry.4.key=kafka-ack-level&Attributes.entry.4.value=none|broker]
 
 Request parameters:
 
 - push-endpoint: URI of an endpoint to send push notification to
+- HTTP endpoint 
 
- - URI schema is: ``http[s]|amqp://[<user>:<password>@]<fqdn>[:<port>][/<amqp-vhost>]``
- - Same schema is used for HTTP and AMQP endpoints (except amqp-vhost which is specific to AMQP)
- - Default values for HTTP/S: no user/password, port 80/443
- - Default values for AMQP: user/password=guest/guest, port 5672, amqp-vhost is "/"
+ - URI: ``http[s]://<fqdn>[:<port]``
+ - port defaults to: 80/443 for HTTP/S accordingly
+ - verify-ssl: indicate whether the server certificate is validated by the client or not ("true" by default)
 
-- verify-ssl: can be used with https endpoints (ignored for other endpoints), indicate whether the server certificate is validated or not ("true" by default)
-- amqp-exchange: mandatory parameter for AMQP endpoint. The exchanges must exist and be able to route messages based on topics
-- amqp-ack-level: No end2end acking is required, as messages may persist in the broker before delivered into their final destination. 2 ack methods exist:
+- AMQP0.9.1 endpoint
 
- - "none" - message is considered "delivered" if sent to broker
- - "broker" message is considered "delivered" if acked by broker
+ - URI: ``amqp://[<user>:<password>@]<fqdn>[:<port>][/<vhost>]``
+ - user/password defaults to : guest/guest
+ - port defaults to: 5672
+ - vhost defaults to: "/"
+ - amqp-exchange: the exchanges must exist and be able to route messages based on topics (mandatory parameter for AMQP0.9.1)
+ - amqp-ack-level: no end2end acking is required, as messages may persist in the broker before delivered into their final destination. Two ack methods exist:
+
+  - "none": message is considered "delivered" if sent to broker
+  - "broker": message is considered "delivered" if acked by broker (default)
+
+- Kafka endpoint 
+
+ - URI: ``kafka://<fqdn>[:<port]``
+ - port defaults to: 9092
+ - kafka-ack-level: no end2end acking is required, as messages may persist in the broker before delivered into their final destination. Two ack methods exist:
+
+  - "none": message is considered "delivered" if sent to broker
+  - "broker": message is considered "delivered" if acked by broker (default)
 
 .. note:: 
 
index 9be303b857263bc2491f52c0fd1322a179843066..473a9c032d254b92451bc199fbbcd3b2558c12ab 100644 (file)
@@ -11,8 +11,8 @@ events. Events are published into predefined topics. Topics can be subscribed to
 can be pulled from them. Events need to be acked. Also, events will expire and disappear
 after a period of time. 
 
-A push notification mechanism exists too, currently supporting HTTP and
-AMQP0.9.1 endpoints, on top of storing the events in Ceph. If events should only be pushed to an endpoint
+A push notification mechanism exists too, currently supporting HTTP,
+AMQP0.9.1 and Kafka endpoints. In this case, the events are pushed to an endpoint on top of storing them in Ceph. If events should only be pushed to an endpoint
 and do not need to be stored in Ceph, the `Bucket Notification`_ mechanism should be used instead of pubsub sync module. 
 
 A user can create different topics. A topic entity is defined by its user and its name. A
@@ -150,23 +150,40 @@ To update a topic, use the same command used for topic creation, with the topic
 
 ::
 
-   PUT /topics/<topic-name>[?push-endpoint=<endpoint>[&amqp-exchange=<exchange>][&amqp-ack-level=<level>][&verify-ssl=true|false]]
+   PUT /topics/<topic-name>[?push-endpoint=<endpoint>[&amqp-exchange=<exchange>][&amqp-ack-level=none|broker][&verify-ssl=true|false][&kafka-ack-level=none|broker]]
 
 Request parameters:
 
-- push-endpoint: URI of endpoint to send push notification to
+- push-endpoint: URI of an endpoint to send push notification to
+
+The endpoint URI may include parameters depending with the type of endpoint:
+
+- HTTP endpoint 
+
+ - URI: ``http[s]://<fqdn>[:<port]``
+ - port defaults to: 80/443 for HTTP/S accordingly
+ - verify-ssl: indicate whether the server certificate is validated by the client or not ("true" by default)
+
+- AMQP0.9.1 endpoint
+
+ - URI: ``amqp://[<user>:<password>@]<fqdn>[:<port>][/<vhost>]``
+ - user/password defaults to : guest/guest
+ - port defaults to: 5672
+ - vhost defaults to: "/"
+ - amqp-exchange: the exchanges must exist and be able to route messages based on topics (mandatory parameter for AMQP0.9.1)
+ - amqp-ack-level: no end2end acking is required, as messages may persist in the broker before delivered into their final destination. Two ack methods exist:
+
+  - "none": message is considered "delivered" if sent to broker
+  - "broker": message is considered "delivered" if acked by broker (default)
 
- - URI schema is: ``http[s]|amqp://[<user>:<password>@]<fqdn>[:<port>][/<amqp-vhost>]``
- - Same schema is used for HTTP and AMQP endpoints (except amqp-vhost which is specific to AMQP)
- - Default values for HTTP/S: no user/password, port 80/443
- - Default values for AMQP: user/password=guest/guest, port 5672, amqp-vhost is "/"
+- Kafka endpoint 
 
-- verify-ssl: can be used with https endpoints (ignored for other endpoints), indicate whether the server certificate is validated or not ("true" by default)
-- amqp-exchange: mandatory parameter for AMQP endpoint. The exchanges must exist and be able to route messages based on topics
-- amqp-ack-level: No end2end acking is required, as messages may persist in the broker before delivered into their final destination. 2 ack methods exist:
+ - URI: ``kafka://<fqdn>[:<port]``
+ - port defaults to: 9092
+ - kafka-ack-level: no end2end acking is required, as messages may persist in the broker before delivered into their final destination. Two ack methods exist:
 
- "none" - message is considered "delivered" if sent to broker
- - "broker" message is considered "delivered" if acked by broker
 - "none": message is considered "delivered" if sent to broker
+  - "broker": message is considered "delivered" if acked by broker (default)
 
 The topic ARN in the response will have the following format:
 
@@ -317,24 +334,42 @@ Creates a new subscription.
 
 ::
 
-   PUT /subscriptions/<sub-name>?topic=<topic-name>[&push-endpoint=<endpoint>[&amqp-exchange=<exchange>][&amqp-ack-level=<level>][&verify-ssl=true|false]]
+   PUT /subscriptions/<sub-name>?topic=<topic-name>[?push-endpoint=<endpoint>[&amqp-exchange=<exchange>][&amqp-ack-level=none|broker][&verify-ssl=true|false][&kafka-ack-level=none|broker]]
 
 Request parameters:
 
 - topic-name: name of topic
 - push-endpoint: URI of endpoint to send push notification to
 
- - URI schema is: ``http[s]|amqp://[<user>:<password>@]<fqdn>[:<port>][/<amqp-vhost>]``
- - Same schema is used for HTTP and AMQP endpoints (except amqp-vhost which is specific to AMQP)
- - Default values for HTTP/S: no user/password, port 80/443
- - Default values for AMQP: user/password=guest/guest, port 5672, amqp-vhost is "/"
+The endpoint URI may include parameters depending with the type of endpoint:
+
+- HTTP endpoint 
+
+ - URI: ``http[s]://<fqdn>[:<port]``
+ - port defaults to: 80/443 for HTTP/S accordingly
+ - verify-ssl: indicate whether the server certificate is validated by the client or not ("true" by default)
+
+- AMQP0.9.1 endpoint
+
+ - URI: ``amqp://[<user>:<password>@]<fqdn>[:<port>][/<vhost>]``
+ - user/password defaults to : guest/guest
+ - port defaults to: 5672
+ - vhost defaults to: "/"
+ - amqp-exchange: the exchanges must exist and be able to route messages based on topics (mandatory parameter for AMQP0.9.1)
+ - amqp-ack-level: no end2end acking is required, as messages may persist in the broker before delivered into their final destination. Two ack methods exist:
+
+  - "none": message is considered "delivered" if sent to broker
+  - "broker": message is considered "delivered" if acked by broker (default)
+
+- Kafka endpoint 
+
+ - URI: ``kafka://<fqdn>[:<port]``
+ - port defaults to: 9092
+ - kafka-ack-level: no end2end acking is required, as messages may persist in the broker before delivered into their final destination. Two ack methods exist:
 
-- verify-ssl: can be used with https endpoints (ignored for other endpoints), indicate whether the server certificate is validated or not ("true" by default)
-- amqp-exchange: mandatory parameter for AMQP endpoint. The exchanges must exist and be able to route messages based on topics
-- amqp-ack-level: No end2end acking is required, as messages may persist in the broker before delivered into their final destination. 2 ack methods exist:
+  - "none": message is considered "delivered" if sent to broker
+  - "broker": message is considered "delivered" if acked by broker (default)
 
- - "none": message is considered "delivered" if sent to broker
- - "broker": message is considered "delivered" if acked by broker
 
 Get Subscription Information
 ````````````````````````````
index 4bd92e5db9281d37218ea4c50c35aeb2045c8a38..b9ca57d57dacee5080c3099004bb6d2873daaeef 100644 (file)
 /* Defined if rabbitmq-c is available for rgw amqp push endpoint */
 #cmakedefine WITH_RADOSGW_AMQP_ENDPOINT
 
+/* Defined if libedkafka is available for rgw kafka push endpoint */
+#cmakedefine WITH_RADOSGW_KAFKA_ENDPOINT
+
 /* Defined if std::map::merge() is supported */
 #cmakedefine HAVE_STDLIB_MAP_SPLICING
 
index c2de6a4d818bbadd48229bc44233bea76477053d..0a28f28933653463aa233af22a1a1b8c12d1a56a 100644 (file)
@@ -147,6 +147,9 @@ set(librgw_common_srcs
 if(WITH_RADOSGW_AMQP_ENDPOINT)
   list(APPEND librgw_common_srcs rgw_amqp.cc)
 endif()
+if(WITH_RADOSGW_KAFKA_ENDPOINT)
+  list(APPEND librgw_common_srcs rgw_kafka.cc)
+endif()
 
 add_library(rgw_common OBJECT ${librgw_common_srcs})
 
@@ -210,6 +213,9 @@ target_include_directories(rgw_a SYSTEM PUBLIC "../rapidjson/include")
 if(WITH_RADOSGW_AMQP_ENDPOINT)
   find_package(RabbitMQ REQUIRED)
 endif()
+if(WITH_RADOSGW_KAFKA_ENDPOINT)
+  find_package(RDKafka REQUIRED)
+endif()
 
 target_link_libraries(rgw_a
   PRIVATE
@@ -235,6 +241,10 @@ if(WITH_RADOSGW_AMQP_ENDPOINT)
   # used by rgw_amqp.cc
   list(APPEND rgw_libs RabbitMQ::RabbitMQ)
 endif()
+if(WITH_RADOSGW_KAFKA_ENDPOINT)
+  # used by rgw_kafka.cc
+  list(APPEND rgw_libs RDKafka::RDKafka)
+endif()
 
 set(rgw_schedulers_srcs
   rgw_dmclock_scheduler_ctx.cc
@@ -348,6 +358,10 @@ if(WITH_RADOSGW_AMQP_ENDPOINT)
   target_link_libraries(rgw PRIVATE RabbitMQ::RabbitMQ)
 endif()
 
+if(WITH_RADOSGW_KAFKA_ENDPOINT)
+  target_link_libraries(rgw PRIVATE RDKafka::RDKafka)
+endif()
+
 set_target_properties(rgw PROPERTIES OUTPUT_NAME rgw VERSION 2.0.0
   SOVERSION 2)
 install(TARGETS rgw DESTINATION ${CMAKE_INSTALL_LIBDIR})
@@ -384,6 +398,9 @@ install(TARGETS rgw_admin_user DESTINATION ${CMAKE_INSTALL_LIBDIR})
 if(WITH_RADOSGW_AMQP_ENDPOINT)
   target_link_libraries(rgw_admin_user PRIVATE RabbitMQ::RabbitMQ)
 endif()
+if(WITH_RADOSGW_KAFKA_ENDPOINT)
+  target_link_libraries(rgw_admin_user PRIVATE RDKafka::RDKafka)
+endif()
 if(WITH_BOOST_CONTEXT)
   target_link_libraries(rgw_admin_user PRIVATE Boost::coroutine Boost::context)
 endif()
index c013ca88202a79373c4b66265e16c87bffb15a5f..a5f6ca00d84f411f257607bcf6980ea01864fa22 100644 (file)
@@ -1,7 +1,6 @@
 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- 
 // vim: ts=8 sw=2 smarttab ft=cpp
 
-#include "include/compat.h"
 #include "rgw_amqp.h"
 #include <amqp.h>
 #include <amqp_tcp_socket.h>
@@ -50,9 +49,9 @@ static const int RGW_AMQP_NO_REPLY_CODE =                 0x0;
 
 // key class for the connection list
 struct connection_id_t {
-  std::string host;
-  int port;
-  std::string vhost;
+  const std::string host;
+  const int port;
+  const std::string vhost;
   // constructed from amqp_connection_info struct
   connection_id_t(const amqp_connection_info& info) 
     : host(info.host), port(info.port), vhost(info.vhost) {}
@@ -73,7 +72,7 @@ struct connection_id_t {
 };
 
 std::string to_string(const connection_id_t& id) {
-    return id.host+":"+"/"+id.vhost;
+    return id.host+":"+std::to_string(id.port)+"/"+id.vhost;
 }
 
 // connection_t state cleaner
@@ -633,7 +632,7 @@ private:
       {
         // thread safe access to the connection list
         // once the iterators are fetched they are guaranteed to remain valid
-        std::lock_guard<std::mutex> lock(connections_lock);
+        std::lock_guard lock(connections_lock);
         conn_it = connections.begin();
         end_it = connections.end();
       }
@@ -646,7 +645,7 @@ private:
         if (conn->marked_for_deletion) {
           ldout(conn->cct, 10) << "AMQP run: connection is deleted" << dendl;
           conn->destroy(RGW_AMQP_STATUS_CONNECTION_CLOSED);
-          std::lock_guard<std::mutex> lock(connections_lock);
+          std::lock_guard lock(connections_lock);
           // erase is safe - does not invalidate any other iterator
           // lock so no insertion happens at the same time
           ERASE_AND_CONTINUE(conn_it, connections);
@@ -847,7 +846,7 @@ public:
     }
 
     const connection_id_t id(info);
-    std::lock_guard<std::mutex> lock(connections_lock);
+    std::lock_guard lock(connections_lock);
     const auto it = connections.find(id);
     if (it != connections.end()) {
       if (it->second->marked_for_deletion) {
@@ -931,7 +930,7 @@ public:
   // get the number of in-flight messages
   size_t get_inflight() const {
     size_t sum = 0;
-    std::lock_guard<std::mutex> lock(connections_lock);
+    std::lock_guard lock(connections_lock);
     std::for_each(connections.begin(), connections.end(), [&sum](auto& conn_pair) {
         sum += conn_pair.second->callbacks.size();
       });
diff --git a/src/rgw/rgw_kafka.cc b/src/rgw/rgw_kafka.cc
new file mode 100644 (file)
index 0000000..b1b9749
--- /dev/null
@@ -0,0 +1,677 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- 
+// vim: ts=8 sw=2 smarttab ft=cpp
+
+//#include "include/compat.h"
+#include "rgw_kafka.h"
+#include <librdkafka/rdkafka.h>
+#include "include/ceph_assert.h"
+#include <sstream>
+#include <cstring>
+#include <regex>
+#include <unordered_map>
+#include <string>
+#include <vector>
+#include <thread>
+#include <atomic>
+#include <mutex>
+#include <boost/lockfree/queue.hpp>
+#include "common/dout.h"
+
+#define dout_subsys ceph_subsys_rgw
+
+// TODO investigation, not necessarily issues:
+// (1) in case of single threaded writer context use spsc_queue
+// (2) check performance of emptying queue to local list, and go over the list and publish
+// (3) use std::shared_mutex (c++17) or equivalent for the connections lock
+
+// cmparisson operator between topic pointer and name
+bool operator==(const rd_kafka_topic_t* rkt, const std::string& name) {
+    return name == std::string_view(rd_kafka_topic_name(rkt)); 
+}
+
+namespace rgw::kafka {
+
+// status codes for publishing
+static const int STATUS_CONNECTION_CLOSED =      -0x1002;
+static const int STATUS_QUEUE_FULL =             -0x1003;
+static const int STATUS_MAX_INFLIGHT =           -0x1004;
+static const int STATUS_MANAGER_STOPPED =        -0x1005;
+// status code for connection opening
+static const int STATUS_CONF_ALLOC_FAILED =      -0x2001;
+static const int STATUS_GET_BROKER_LIST_FAILED = -0x2002;
+static const int STATUS_CREATE_PRODUCER_FAILED = -0x2003;
+
+static const int STATUS_CREATE_TOPIC_FAILED =    -0x3008;
+static const int NO_REPLY_CODE =                 0x0;
+static const int STATUS_OK =                     0x0;
+
+// struct for holding the callback and its tag in the callback list
+struct reply_callback_with_tag_t {
+  uint64_t tag;
+  reply_callback_t cb;
+  
+  reply_callback_with_tag_t(uint64_t _tag, reply_callback_t _cb) : tag(_tag), cb(_cb) {}
+  
+  bool operator==(uint64_t rhs) {
+    return tag == rhs;
+  }
+};
+
+typedef std::vector<reply_callback_with_tag_t> CallbackList;
+
+// struct for holding the connection state object as well as list of topics
+// it is used inside an intrusive ref counted pointer (boost::intrusive_ptr)
+// since references to deleted objects may still exist in the calling code
+struct connection_t {
+  rd_kafka_t* producer = nullptr;
+  rd_kafka_conf_t* temp_conf = nullptr;
+  std::vector<rd_kafka_topic_t*> topics;
+  bool marked_for_deletion = false;
+  uint64_t delivery_tag = 1;
+  int status;
+  mutable std::atomic<int> ref_count = 0;
+  CephContext* cct = nullptr;
+  CallbackList callbacks;
+
+  // cleanup of all internal connection resource
+  // the object can still remain, and internal connection
+  // resources created again on successful reconnection
+  void destroy(int s) {
+    status = s;
+    // destroy temporary conf (if connection was never established)
+    if (temp_conf) {
+        rd_kafka_conf_destroy(temp_conf);
+        return;
+    }
+    // wait for all remaining acks/nacks
+    rd_kafka_flush(producer, 5*1000 /* wait for max 5 seconds */);
+    // destroy all topics
+    std::for_each(topics.begin(), topics.end(), [](auto topic) {rd_kafka_topic_destroy(topic);});
+    // destroy producer
+    rd_kafka_destroy(producer);
+    // fire all remaining callbacks (if not fired by rd_kafka_flush)
+    std::for_each(callbacks.begin(), callbacks.end(), [this](auto& cb_tag) {
+        cb_tag.cb(status);
+        ldout(cct, 20) << "Kafka destroy: invoking callback with tag=" << cb_tag.tag << dendl;
+      });
+    callbacks.clear();
+    delivery_tag = 1;
+  }
+
+  bool is_ok() const {
+    return (producer != nullptr && !marked_for_deletion);
+  }
+
+  // dtor also destroys the internals
+  ~connection_t() {
+    destroy(STATUS_CONNECTION_CLOSED);
+  }
+
+  friend void intrusive_ptr_add_ref(const connection_t* p);
+  friend void intrusive_ptr_release(const connection_t* p);
+};
+
+// these are required interfaces so that connection_t could be used inside boost::intrusive_ptr
+void intrusive_ptr_add_ref(const connection_t* p) {
+  ++p->ref_count;
+}
+void intrusive_ptr_release(const connection_t* p) {
+  if (--p->ref_count == 0) {
+    delete p;
+  }
+}
+
+// convert int status to string - including RGW specific values
+std::string status_to_string(int s) {
+  switch (s) {
+    case STATUS_CONNECTION_CLOSED:
+      return "RGW_KAFKA_STATUS_CONNECTION_CLOSED";
+    case STATUS_QUEUE_FULL:
+      return "RGW_KAFKA_STATUS_QUEUE_FULL";
+    case STATUS_MAX_INFLIGHT:
+      return "RGW_KAFKA_STATUS_MAX_INFLIGHT";
+    case STATUS_MANAGER_STOPPED:
+      return "RGW_KAFKA_STATUS_MANAGER_STOPPED";
+    case STATUS_CONF_ALLOC_FAILED:
+      return "RGW_KAFKA_STATUS_CONF_ALLOC_FAILED";
+    case STATUS_CREATE_PRODUCER_FAILED:
+      return "STATUS_CREATE_PRODUCER_FAILED";
+    case STATUS_CREATE_TOPIC_FAILED:
+      return "STATUS_CREATE_TOPIC_FAILED";
+  }
+  // TODO: how to handle "s" in this case? 
+  return std::string(rd_kafka_err2str(rd_kafka_last_error()));
+}
+
+void message_callback(rd_kafka_t* rk, const rd_kafka_message_t* rkmessage, void* opaque) {
+  ceph_assert(opaque);
+
+  const auto conn = reinterpret_cast<connection_t*>(opaque);
+  const auto result = rkmessage->err;
+
+  if (!rkmessage->_private) {
+    ldout(conn->cct, 20) << "Kafka run: n/ack received, (no callback) with result=" << result << dendl;
+    return;  
+  }
+
+  const auto tag = reinterpret_cast<uint64_t*>(rkmessage->_private);
+  const auto& callbacks_end = conn->callbacks.end();
+  const auto& callbacks_begin = conn->callbacks.begin();
+  const auto tag_it = std::find(callbacks_begin, callbacks_end, *tag);
+  if (tag_it != callbacks_end) {
+      ldout(conn->cct, 20) << "Kafka run: n/ack received, invoking callback with tag=" << 
+          *tag << " and result=" << result << dendl;
+      tag_it->cb(result);
+      conn->callbacks.erase(tag_it);
+  } else {
+    // TODO add counter for acks with no callback
+    ldout(conn->cct, 10) << "Kafka run: unsolicited n/ack received with tag=" << 
+        *tag << dendl;
+  }
+  delete tag;
+  // rkmessage is destroyed automatically by librdkafka
+}
+
+// utility function to create a connection, when the connection object already exists
+connection_ptr_t& create_connection(connection_ptr_t& conn, const std::string& broker) {
+  // pointer must be valid and not marked for deletion
+  ceph_assert(conn && !conn->marked_for_deletion);
+  
+  // reset all status codes
+  conn->status = STATUS_OK; 
+
+  char errstr[512];
+
+  conn->temp_conf = rd_kafka_conf_new();
+  if (!conn->temp_conf) {
+    conn->status = STATUS_CONF_ALLOC_FAILED;
+    return conn;
+  }
+
+  // get list of brokers based on the bootsrap broker
+  if (rd_kafka_conf_set(conn->temp_conf, "bootstrap.servers", broker.c_str(), errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {
+    conn->status = STATUS_GET_BROKER_LIST_FAILED;
+    // TODO: use errstr
+    return conn;
+  }
+
+  // set the global callback for delivery success/fail
+  rd_kafka_conf_set_dr_msg_cb(conn->temp_conf, message_callback);
+
+  // set the global opaque pointer to be the connection itself
+  rd_kafka_conf_set_opaque (conn->temp_conf, conn.get());
+
+  // create the producer
+  conn->producer = rd_kafka_new(RD_KAFKA_PRODUCER, conn->temp_conf, errstr, sizeof(errstr));
+  if (conn->producer) {
+    conn->status = STATUS_CREATE_PRODUCER_FAILED;
+    // TODO: use errstr
+    return conn;
+  }
+
+  // conf ownership passed to producer
+  conn->temp_conf = nullptr;
+  return conn;
+}
+
+
+// utility function to create a new connection
+connection_ptr_t create_new_connection(const std::string& broker, CephContext* cct) { 
+  // create connection state
+  connection_ptr_t conn = new connection_t;
+  conn->cct = cct;
+  return create_connection(conn, broker);
+}
+
+/// struct used for holding messages in the message queue
+struct message_wrapper_t {
+  connection_ptr_t conn; 
+  std::string topic;
+  std::string message;
+  reply_callback_t cb;
+  
+  message_wrapper_t(connection_ptr_t& _conn,
+      const std::string& _topic,
+      const std::string& _message,
+      reply_callback_t _cb) : conn(_conn), topic(_topic), message(_message), cb(_cb) {}
+};
+
+// parse a URL of the form: kafka://<host>[:port]
+// to a: host[:port]
+int parse_url(const std::string& url, std::string& broker) {
+  std::regex url_regex (
+    R"(^(([^:\/?#]+):)?(//([^\/?#]*))?([^?#]*)(\?([^#]*))?(#(.*))?)",
+    std::regex::extended
+  );
+  const auto HOST_AND_PORT = 4;
+  std::smatch url_match_result;
+  if (std::regex_match(url, url_match_result, url_regex)) {
+    broker = url_match_result[HOST_AND_PORT];
+       return 0;
+  }
+  return -1;
+}
+
+typedef std::unordered_map<std::string, connection_ptr_t> ConnectionList;
+typedef boost::lockfree::queue<message_wrapper_t*, boost::lockfree::fixed_sized<true>> MessageQueue;
+
+// macros used inside a loop where an iterator is either incremented or erased
+#define INCREMENT_AND_CONTINUE(IT) \
+          ++IT; \
+          continue;
+
+#define ERASE_AND_CONTINUE(IT,CONTAINER) \
+          IT=CONTAINER.erase(IT); \
+          --connection_count; \
+          continue;
+
+class Manager {
+public:
+  const size_t max_connections;
+  const size_t max_inflight;
+  const size_t max_queue;
+private:
+  std::atomic<size_t> connection_count;
+  bool stopped;
+  int read_timeout_ms;
+  ConnectionList connections;
+  MessageQueue messages;
+  std::atomic<size_t> queued;
+  std::atomic<size_t> dequeued;
+  CephContext* const cct;
+  mutable std::mutex connections_lock;
+  std::thread runner;
+
+  // TODO use rd_kafka_produce_batch for better performance
+  void publish_internal(message_wrapper_t* message) {
+    const std::unique_ptr<message_wrapper_t> msg_owner(message);
+    auto& conn = message->conn;
+
+    if (!conn->is_ok()) {
+      // connection had an issue while message was in the queue
+      // TODO add error stats
+      ldout(conn->cct, 1) << "Kafka publish: connection had an issue while message was in the queue" << dendl;
+      if (message->cb) {
+        message->cb(STATUS_CONNECTION_CLOSED);
+      }
+      return;
+    }
+
+    // create a new topic unless it was already created
+    auto topic_it = std::find(conn->topics.begin(), conn->topics.end(), message->topic);
+    rd_kafka_topic_t* topic = nullptr;
+    if (topic_it == conn->topics.end()) {
+      topic = rd_kafka_topic_new(conn->producer, message->topic.c_str(), nullptr);
+      if (!topic) {
+        ldout(conn->cct, 1) << "Kafka publish: failed to create topic: " << message->topic << dendl;
+        if (message->cb) {
+          message->cb(STATUS_CREATE_TOPIC_FAILED);
+        }
+        conn->destroy(STATUS_CREATE_TOPIC_FAILED);
+        return;
+      }
+      // TODO use the topics list as an LRU cache
+      conn->topics.push_back(topic);
+      ldout(conn->cct, 20) << "Kafka publish: successfully created topic: " << message->topic << dendl;
+    } else {
+        topic = *topic_it;
+        ldout(conn->cct, 20) << "Kafka publish: reused existing topic: " << message->topic << dendl;
+    }
+
+    const auto tag = (message->cb == nullptr ? nullptr : new uint64_t(conn->delivery_tag++));
+    const auto rc = rd_kafka_produce(
+            topic,
+            // TODO: non builtin partitioning
+            RD_KAFKA_PARTITION_UA,
+            // make a copy of the payload
+            // so it is safe to pass the pointer from the string
+            RD_KAFKA_MSG_F_COPY,
+            message->message.data(),
+            message->message.length(),
+            // optional key and its length
+            nullptr, 
+            0,
+            // opaque data: tag, used in the global callback
+            // in order to invoke the real callback
+            // null if no callback exists
+            tag);
+    if (rc == -1) {
+      const auto err = rd_kafka_last_error();
+      ldout(conn->cct, 1) << "Kafka publish: failed to produce: " << rd_kafka_err2str(err) << dendl;
+      // TODO: dont error on full queue, retry instead
+      // immediatly invoke callback on error if needed
+      if (message->cb) {
+        message->cb(err);
+      }
+      delete tag;
+    }
+   
+    if (tag) {
+      auto const q_len = conn->callbacks.size();
+      if (q_len < max_inflight) {
+        ldout(conn->cct, 20) << "Kafka publish (with callback, tag=" << *tag << "): OK. Queue has: " << q_len << " callbacks" << dendl;
+        conn->callbacks.emplace_back(*tag, message->cb);
+      } else {
+        // immediately invoke callback with error
+        ldout(conn->cct, 1) << "Kafka publish (with callback): failed with error: callback queue full" << dendl;
+        message->cb(STATUS_MAX_INFLIGHT);
+        // tag will be deleted when the global callback is invoked
+      }
+    } else {
+        ldout(conn->cct, 20) << "Kafka publish (no callback): OK" << dendl;
+    }
+  }
+
+  // the managers thread:
+  // (1) empty the queue of messages to be published
+  // (2) loop over all connections and read acks
+  // (3) manages deleted connections
+  // (4) TODO reconnect on connection errors
+  // (5) TODO cleanup timedout callbacks
+  void run() {
+    while (!stopped) {
+
+      // publish all messages in the queue
+      auto event_count = 0U;
+      const auto count = messages.consume_all(std::bind(&Manager::publish_internal, this, std::placeholders::_1));
+      dequeued += count;
+      ConnectionList::iterator conn_it;
+      ConnectionList::const_iterator end_it;
+      {
+        // thread safe access to the connection list
+        // once the iterators are fetched they are guaranteed to remain valid
+        std::lock_guard lock(connections_lock);
+        conn_it = connections.begin();
+        end_it = connections.end();
+      }
+      // loop over all connections to read acks
+      for (;conn_it != end_it;) {
+        
+        auto& conn = conn_it->second;
+        // delete the connection if marked for deletion
+        if (conn->marked_for_deletion) {
+          ldout(conn->cct, 10) << "Kafka run: connection is deleted" << dendl;
+          conn->destroy(STATUS_CONNECTION_CLOSED);
+          std::lock_guard lock(connections_lock);
+          // erase is safe - does not invalidate any other iterator
+          // lock so no insertion happens at the same time
+          ERASE_AND_CONTINUE(conn_it, connections);
+        }
+
+        // try to reconnect the connection if it has an error
+        if (!conn->is_ok()) {
+          const auto& broker = conn_it->first;
+          ldout(conn->cct, 20) << "Kafka run: retry connection" << dendl;
+          if (create_connection(conn, broker)->is_ok() == false) {
+            ldout(conn->cct, 10) << "Kafka run: connection (" << broker << ") retry failed" << dendl;
+            // TODO: add error counter for failed retries
+            // TODO: add exponential backoff for retries
+          } else {
+            ldout(conn->cct, 10) << "Kafka run: connection (" << broker << ") retry successfull" << dendl;
+          }
+          INCREMENT_AND_CONTINUE(conn_it);
+        }
+
+        event_count += rd_kafka_poll(conn->producer, read_timeout_ms);
+
+        // just increment the iterator
+        ++conn_it;
+      }
+      // if no messages were received or published
+      // across all connection, sleep for 100ms
+      if (count == 0 && event_count) {
+        std::this_thread::sleep_for(std::chrono::milliseconds(100));
+      }
+    }
+  }
+
+  // used in the dtor for message cleanup
+  static void delete_message(const message_wrapper_t* message) {
+    delete message;
+  }
+
+public:
+  Manager(size_t _max_connections,
+      size_t _max_inflight,
+      size_t _max_queue, 
+      int _read_timeout_ms,
+      CephContext* _cct) : 
+    max_connections(_max_connections),
+    max_inflight(_max_inflight),
+    max_queue(_max_queue),
+    connection_count(0),
+    stopped(false),
+    read_timeout_ms(_read_timeout_ms),
+    connections(_max_connections),
+    messages(max_queue),
+    queued(0),
+    dequeued(0),
+    cct(_cct),
+    runner(&Manager::run, this) {
+      // The hashmap has "max connections" as the initial number of buckets, 
+      // and allows for 10 collisions per bucket before rehash.
+      // This is to prevent rehashing so that iterators are not invalidated 
+      // when a new connection is added.
+      connections.max_load_factor(10.0);
+      // give the runner thread a name for easier debugging
+      const auto rc = ceph_pthread_setname(runner.native_handle(), "kafka_manager");
+      ceph_assert(rc==0);
+  }
+
+  // non copyable
+  Manager(const Manager&) = delete;
+  const Manager& operator=(const Manager&) = delete;
+
+  // stop the main thread
+  void stop() {
+    stopped = true;
+  }
+
+  // disconnect from a broker
+  bool disconnect(connection_ptr_t& conn) {
+    if (!conn || stopped) {
+      return false;
+    }
+    conn->marked_for_deletion = true;
+    return true;
+  }
+
+  // connect to a broker, or reuse an existing connection if already connected
+  connection_ptr_t connect(const std::string& url) {
+    if (stopped) {
+      // TODO: increment counter
+      ldout(cct, 1) << "Kafka connect: manager is stopped" << dendl;
+      return nullptr;
+    }
+
+    std::string broker;
+    if (0 != parse_url(url, broker)) {
+      // TODO: increment counter
+      ldout(cct, 1) << "Kafka connect: URL parsing failed" << dendl;
+      return nullptr;
+    }
+
+    std::lock_guard lock(connections_lock);
+    const auto it = connections.find(broker);
+    if (it != connections.end()) {
+      if (it->second->marked_for_deletion) {
+        // TODO: increment counter
+        ldout(cct, 1) << "Kafka connect: endpoint marked for deletion" << dendl;
+        return nullptr;
+      }
+      // connection found - return even if non-ok
+      ldout(cct, 20) << "Kafka connect: connection found" << dendl;
+      return it->second;
+    }
+
+    // connection not found, creating a new one
+    if (connection_count >= max_connections) {
+      // TODO: increment counter
+      ldout(cct, 1) << "Kafka connect: max connections exceeded" << dendl;
+      return nullptr;
+    }
+    const auto conn = create_new_connection(broker, cct);
+    // create_new_connection must always return a connection object
+    // even if error occurred during creation. 
+    // in such a case the creation will be retried in the main thread
+    ceph_assert(conn);
+    ++connection_count;
+    ldout(cct, 10) << "Kafka connect: new connection is created. Total connections: " << connection_count << dendl;
+    ldout(cct, 10) << "Kafka connect: new connection status is: " << status_to_string(conn->status) << dendl;
+    return connections.emplace(broker, conn).first->second;
+  }
+
+  // TODO publish with confirm is needed in "none" case as well, cb should be invoked publish is ok (no ack)
+  int publish(connection_ptr_t& conn, 
+    const std::string& topic,
+    const std::string& message) {
+    if (stopped) {
+      return STATUS_MANAGER_STOPPED;
+    }
+    if (!conn || !conn->is_ok()) {
+      return STATUS_CONNECTION_CLOSED;
+    }
+    if (messages.push(new message_wrapper_t(conn, topic, message, nullptr))) {
+      ++queued;
+      return STATUS_OK;
+    }
+    return STATUS_QUEUE_FULL;
+  }
+  
+  int publish_with_confirm(connection_ptr_t& conn, 
+    const std::string& topic,
+    const std::string& message,
+    reply_callback_t cb) {
+    if (stopped) {
+      return STATUS_MANAGER_STOPPED;
+    }
+    if (!conn || !conn->is_ok()) {
+      return STATUS_CONNECTION_CLOSED;
+    }
+    if (messages.push(new message_wrapper_t(conn, topic, message, cb))) {
+      ++queued;
+      return STATUS_OK;
+    }
+    return STATUS_QUEUE_FULL;
+  }
+
+  // dtor wait for thread to stop
+  // then connection are cleaned-up
+  ~Manager() {
+    stopped = true;
+    runner.join();
+    messages.consume_all(delete_message);
+  }
+
+  // get the number of connections
+  size_t get_connection_count() const {
+    return connection_count;
+  }
+  
+  // get the number of in-flight messages
+  size_t get_inflight() const {
+    size_t sum = 0;
+    std::lock_guard lock(connections_lock);
+    std::for_each(connections.begin(), connections.end(), [&sum](auto& conn_pair) {
+        sum += conn_pair.second->callbacks.size();
+      });
+    return sum;
+  }
+
+  // running counter of the queued messages
+  size_t get_queued() const {
+    return queued;
+  }
+
+  // running counter of the dequeued messages
+  size_t get_dequeued() const {
+    return dequeued;
+  }
+};
+
+// singleton manager
+// note that the manager itself is not a singleton, and multiple instances may co-exist
+// TODO make the pointer atomic in allocation and deallocation to avoid race conditions
+static Manager* s_manager = nullptr;
+
+static const size_t MAX_CONNECTIONS_DEFAULT = 256;
+static const size_t MAX_INFLIGHT_DEFAULT = 8192; 
+static const size_t MAX_QUEUE_DEFAULT = 8192;
+static const int READ_TIMEOUT_MS_DEFAULT = 500;
+
+bool init(CephContext* cct) {
+  if (s_manager) {
+    return false;
+  }
+  // TODO: take conf from CephContext
+  s_manager = new Manager(MAX_CONNECTIONS_DEFAULT, MAX_INFLIGHT_DEFAULT, MAX_QUEUE_DEFAULT, READ_TIMEOUT_MS_DEFAULT, cct);
+  return true;
+}
+
+void shutdown() {
+  delete s_manager;
+  s_manager = nullptr;
+}
+
+connection_ptr_t connect(const std::string& url) {
+  if (!s_manager) return nullptr;
+  return s_manager->connect(url);
+}
+
+int publish(connection_ptr_t& conn, 
+    const std::string& topic,
+    const std::string& message) {
+  if (!s_manager) return STATUS_MANAGER_STOPPED;
+  return s_manager->publish(conn, topic, message);
+}
+
+int publish_with_confirm(connection_ptr_t& conn, 
+    const std::string& topic,
+    const std::string& message,
+    reply_callback_t cb) {
+  if (!s_manager) return STATUS_MANAGER_STOPPED;
+  return s_manager->publish_with_confirm(conn, topic, message, cb);
+}
+
+size_t get_connection_count() {
+  if (!s_manager) return 0;
+  return s_manager->get_connection_count();
+}
+  
+size_t get_inflight() {
+  if (!s_manager) return 0;
+  return s_manager->get_inflight();
+}
+
+size_t get_queued() {
+  if (!s_manager) return 0;
+  return s_manager->get_queued();
+}
+
+size_t get_dequeued() {
+  if (!s_manager) return 0;
+  return s_manager->get_dequeued();
+}
+
+size_t get_max_connections() {
+  if (!s_manager) return MAX_CONNECTIONS_DEFAULT;
+  return s_manager->max_connections;
+}
+
+size_t get_max_inflight() {
+  if (!s_manager) return MAX_INFLIGHT_DEFAULT;
+  return s_manager->max_inflight;
+}
+
+size_t get_max_queue() {
+  if (!s_manager) return MAX_QUEUE_DEFAULT;
+  return s_manager->max_queue;
+}
+
+bool disconnect(connection_ptr_t& conn) {
+  if (!s_manager) return false;
+  return s_manager->disconnect(conn);
+}
+
+} // namespace kafka
+
diff --git a/src/rgw/rgw_kafka.h b/src/rgw/rgw_kafka.h
new file mode 100644 (file)
index 0000000..7319679
--- /dev/null
@@ -0,0 +1,77 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- 
+// vim: ts=8 sw=2 smarttab ft=cpp
+
+#pragma once
+
+#include <string>
+#include <functional>
+#include <boost/smart_ptr/intrusive_ptr.hpp>
+
+class CephContext;
+
+namespace rgw::kafka {
+// forward declaration of connection object
+struct connection_t;
+
+typedef boost::intrusive_ptr<connection_t> connection_ptr_t;
+
+// required interfaces needed so that connection_t could be used inside boost::intrusive_ptr
+void intrusive_ptr_add_ref(const connection_t* p);
+void intrusive_ptr_release(const connection_t* p);
+
+// the reply callback is expected to get an integer parameter
+// indicating the result, and not to return anything
+typedef std::function<void(int)> reply_callback_t;
+
+// initialize the kafka manager
+bool init(CephContext* cct);
+
+// shutdown the kafka manager
+void shutdown();
+
+// connect to a kafka endpoint
+connection_ptr_t connect(const std::string& url);
+
+// publish a message over a connection that was already created
+int publish(connection_ptr_t& conn,
+    const std::string& topic,
+    const std::string& message);
+
+// publish a message over a connection that was already created
+// and pass a callback that will be invoked (async) when broker confirms
+// receiving the message
+int publish_with_confirm(connection_ptr_t& conn, 
+    const std::string& topic,
+    const std::string& message,
+    reply_callback_t cb);
+
+// convert the integer status returned from the "publish" function to a string
+std::string status_to_string(int s);
+
+// number of connections
+size_t get_connection_count();
+  
+// return the number of messages that were sent
+// to broker, but were not yet acked/nacked/timedout
+size_t get_inflight();
+
+// running counter of successfully queued messages
+size_t get_queued();
+
+// running counter of dequeued messages
+size_t get_dequeued();
+
+// number of maximum allowed connections
+size_t get_max_connections();
+
+// number of maximum allowed inflight messages
+size_t get_max_inflight();
+
+// maximum number of messages in the queue
+size_t get_max_queue();
+
+// disconnect from a kafka broker
+bool disconnect(connection_ptr_t& conn);
+
+}
+
index d8afd86cdf1150111a40b75eb8e117ba7ea1258d..e348cf7b6cc00f0f59811e36f90b7afe716f26b2 100644 (file)
@@ -40,6 +40,9 @@
 #ifdef WITH_RADOSGW_AMQP_ENDPOINT
 #include "rgw_amqp.h"
 #endif
+#ifdef WITH_RADOSGW_KAFKA_ENDPOINT
+#include "rgw_kafka.h"
+#endif
 #if defined(WITH_RADOSGW_BEAST_FRONTEND)
 #include "rgw_asio_frontend.h"
 #endif /* WITH_RADOSGW_BEAST_FRONTEND */
@@ -375,6 +378,11 @@ int main(int argc, const char **argv)
     if (!rgw::amqp::init(cct.get())) {
         dout(1) << "ERROR: failed to initialize AMQP manager" << dendl;
     }
+#endif
+#ifdef WITH_RADOSGW_KAFKA_ENDPOINT
+    if (!rgw::kafka::init(cct.get())) {
+        dout(1) << "ERROR: failed to initialize Kafka manager" << dendl;
+    }
 #endif
   }
 
@@ -607,6 +615,9 @@ int main(int argc, const char **argv)
 #ifdef WITH_RADOSGW_AMQP_ENDPOINT
   rgw::amqp::shutdown();
 #endif
+#ifdef WITH_RADOSGW_KAFKA_ENDPOINT
+  rgw::kafka::shutdown();
+#endif
 
   rgw_perf_stop(g_ceph_context);
 
index 44292a3ce607b491902ba479218d5809e655e3b4..cef5ae6318f2890aa0b0deb2c33ed2c6a131ae17 100644 (file)
@@ -15,6 +15,9 @@
 #ifdef WITH_RADOSGW_AMQP_ENDPOINT
 #include "rgw_amqp.h"
 #endif
+#ifdef WITH_RADOSGW_KAFKA_ENDPOINT
+#include "rgw_kafka.h"
+#endif
 #include <boost/asio/yield.hpp>
 #include <boost/algorithm/string.hpp>
 #include <functional>
@@ -152,10 +155,10 @@ public:
 #ifdef WITH_RADOSGW_AMQP_ENDPOINT
 class RGWPubSubAMQPEndpoint : public RGWPubSubEndpoint {
 private:
-  enum ack_level_t {
-    ACK_LEVEL_NONE,
-    ACK_LEVEL_BROKER,
-    ACK_LEVEL_ROUTEABLE
+  enum class ack_level_t {
+    None,
+    Broker,
+    Routable
   };
   CephContext* const cct;
   const std::string endpoint;
@@ -283,11 +286,11 @@ public:
     str_ack_level = args.get("amqp-ack-level", &exists);
     if (!exists || str_ack_level == "broker") {
       // "broker" is default
-      ack_level = ACK_LEVEL_BROKER;
+      ack_level = ack_level_t::Broker;
     } else if (str_ack_level == "none") {
-      ack_level = ACK_LEVEL_NONE;
+      ack_level = ack_level_t::None;
     } else if (str_ack_level == "routable") {
-      ack_level = ACK_LEVEL_ROUTEABLE;
+      ack_level = ack_level_t::Routable;
     } else {
       throw configuration_error("AMQP: invalid amqp-ack-level: " + str_ack_level);
     }
@@ -295,7 +298,7 @@ public:
 
   RGWCoroutine* send_to_completion_async(const rgw_pubsub_event& event, RGWDataSyncEnv* env) override {
     ceph_assert(conn);
-    if (ack_level == ACK_LEVEL_NONE) {
+    if (ack_level == ack_level_t::None) {
       return new NoAckPublishCR(cct, topic, conn, json_format_pubsub_event(event));
     } else {
       // TODO: currently broker and routable are the same - this will require different flags
@@ -306,7 +309,7 @@ public:
   
   RGWCoroutine* send_to_completion_async(const rgw_pubsub_s3_record& record, RGWDataSyncEnv* env) override {
     ceph_assert(conn);
-    if (ack_level == ACK_LEVEL_NONE) {
+    if (ack_level == ack_level_t::None) {
       return new NoAckPublishCR(cct, topic, conn, json_format_pubsub_event(record));
     } else {
       // TODO: currently broker and routable are the same - this will require different flags
@@ -373,7 +376,7 @@ public:
 
   int send_to_completion_async(CephContext* cct, const rgw_pubsub_s3_record& record, optional_yield y) override {
     ceph_assert(conn);
-    if (ack_level == ACK_LEVEL_NONE) {
+    if (ack_level == ack_level_t::None) {
       return amqp::publish(conn, topic, json_format_pubsub_event(record));
     } else {
       // TODO: currently broker and routable are the same - this will require different flags but the same mechanism
@@ -406,6 +409,239 @@ static const std::string AMQP_1_0("1-0");
 static const std::string AMQP_SCHEMA("amqp");
 #endif // ifdef WITH_RADOSGW_AMQP_ENDPOINT
 
+#ifdef WITH_RADOSGW_KAFKA_ENDPOINT
+class RGWPubSubKafkaEndpoint : public RGWPubSubEndpoint {
+private:
+  enum class ack_level_t {
+    None,
+    Broker,
+  };
+  CephContext* const cct;
+  const std::string endpoint;
+  const std::string topic;
+  kafka::connection_ptr_t conn;
+  ack_level_t ack_level;
+  std::string str_ack_level;
+
+  // NoAckPublishCR implements async kafka publishing via coroutine
+  // This coroutine ends when it send the message and does not wait for an ack
+  class NoAckPublishCR : public RGWCoroutine {
+  private:
+    const std::string topic;
+    kafka::connection_ptr_t conn;
+    const std::string message;
+
+  public:
+    NoAckPublishCR(CephContext* cct,
+              const std::string& _topic,
+              kafka::connection_ptr_t& _conn,
+              const std::string& _message) :
+      RGWCoroutine(cct),
+      topic(_topic), conn(_conn), message(_message) {}
+
+    // send message to endpoint, without waiting for reply
+    int operate() override {
+      reenter(this) {
+        const auto rc = kafka::publish(conn, topic, message);
+        if (rc < 0) {
+          return set_cr_error(rc);
+        }
+        return set_cr_done();
+      }
+      return 0;
+    }
+  };
+
+  // AckPublishCR implements async kafka publishing via coroutine
+  // This coroutine ends when an ack is received from the borker 
+  // note that it does not wait for an ack fron the end client
+  class AckPublishCR : public RGWCoroutine, public RGWIOProvider {
+  private:
+    const std::string topic;
+    kafka::connection_ptr_t conn;
+    const std::string message;
+
+  public:
+    AckPublishCR(CephContext* cct,
+              const std::string& _topic,
+              kafka::connection_ptr_t& _conn,
+              const std::string& _message) :
+      RGWCoroutine(cct),
+      topic(_topic), conn(_conn), message(_message) {}
+
+    // send message to endpoint, waiting for reply
+    int operate() override {
+      reenter(this) {
+        yield {
+          init_new_io(this);
+          const auto rc = kafka::publish_with_confirm(conn, 
+              topic,
+              message,
+              std::bind(&AckPublishCR::request_complete, this, std::placeholders::_1));
+          if (rc < 0) {
+            // failed to publish, does not wait for reply
+            return set_cr_error(rc);
+          }
+          // mark as blocked on the kafka answer
+          if (perfcounter) perfcounter->inc(l_rgw_pubsub_push_pending);
+          io_block();
+          return 0;
+        }
+        return set_cr_done();
+      }
+      return 0;
+    }
+
+    // callback invoked from the kafka manager thread when ack/nack is received
+    void request_complete(int status) {
+      ceph_assert(!is_done());
+      if (status != 0) {
+        // server replied with a nack
+        set_cr_error(status);
+      }
+      io_complete();
+      if (perfcounter) perfcounter->dec(l_rgw_pubsub_push_pending);
+    }
+   
+    // TODO: why are these mandatory in RGWIOProvider?
+    void set_io_user_info(void *_user_info) override {
+    }
+
+    void *get_io_user_info() override {
+      return nullptr;
+    }
+  };
+
+public:
+  RGWPubSubKafkaEndpoint(const std::string& _endpoint,
+      const std::string& _topic,
+      const RGWHTTPArgs& args,
+      CephContext* _cct) : 
+        cct(_cct),
+        endpoint(_endpoint), 
+        topic(_topic),
+        conn(kafka::connect(endpoint)) {
+    if (!conn) { 
+      throw configuration_error("Kafka: failed to create connection to: " + endpoint);
+    }
+    bool exists;
+    // get ack level
+    str_ack_level = args.get("kafka-ack-level", &exists);
+    if (!exists || str_ack_level == "broker") {
+      // "broker" is default
+      ack_level = ack_level_t::Broker;
+    } else if (str_ack_level == "none") {
+      ack_level = ack_level_t::None;
+    } else {
+      throw configuration_error("Kafka: invalid kafka-ack-level: " + str_ack_level);
+    }
+  }
+
+  RGWCoroutine* send_to_completion_async(const rgw_pubsub_event& event, RGWDataSyncEnv* env) override {
+    ceph_assert(conn);
+    if (ack_level == ack_level_t::None) {
+      return new NoAckPublishCR(cct, topic, conn, json_format_pubsub_event(event));
+    } else {
+      return new AckPublishCR(cct, topic, conn, json_format_pubsub_event(event));
+    }
+  }
+  
+  RGWCoroutine* send_to_completion_async(const rgw_pubsub_s3_record& record, RGWDataSyncEnv* env) override {
+    ceph_assert(conn);
+    if (ack_level == ack_level_t::None) {
+      return new NoAckPublishCR(cct, topic, conn, json_format_pubsub_event(record));
+    } else {
+      return new AckPublishCR(cct, topic, conn, json_format_pubsub_event(record));
+    }
+  }
+
+  // this allows waiting untill "finish()" is called from a different thread
+  // waiting could be blocking the waiting thread or yielding, depending
+  // with compilation flag support and whether the optional_yield is set
+  class Waiter {
+    using Signature = void(boost::system::error_code);
+    using Completion = ceph::async::Completion<Signature>;
+    std::unique_ptr<Completion> completion = nullptr;
+    int ret;
+
+    mutable std::atomic<bool> done = false;
+    mutable std::mutex lock;
+    mutable std::condition_variable cond;
+
+    template <typename ExecutionContext, typename CompletionToken>
+    auto async_wait(ExecutionContext& ctx, CompletionToken&& token) {
+      boost::asio::async_completion<CompletionToken, Signature> init(token);
+      auto& handler = init.completion_handler;
+      {
+        std::unique_lock l{lock};
+        completion = Completion::create(ctx.get_executor(), std::move(handler));
+      }
+      return init.result.get();
+    }
+
+  public:
+    int wait(optional_yield y) {
+      if (done) {
+        return ret;
+      }
+#ifdef HAVE_BOOST_CONTEXT
+      if (y) {
+        auto& io_ctx = y.get_io_context();
+        auto& yield_ctx = y.get_yield_context();
+        boost::system::error_code ec;
+        async_wait(io_ctx, yield_ctx[ec]);
+        return -ec.value();
+      }
+#endif
+      std::unique_lock l(lock);
+      cond.wait(l, [this]{return (done==true);});
+      return ret;
+    }
+
+    void finish(int r) {
+      std::unique_lock l{lock};
+      ret = r;
+      done = true;
+      if (completion) {
+        boost::system::error_code ec(-ret, boost::system::system_category());
+        Completion::post(std::move(completion), ec);
+      } else {
+        cond.notify_all();
+      }
+    }
+  };
+
+  int send_to_completion_async(CephContext* cct, const rgw_pubsub_s3_record& record, optional_yield y) override {
+    ceph_assert(conn);
+    if (ack_level == ack_level_t::None) {
+      return kafka::publish(conn, topic, json_format_pubsub_event(record));
+    } else {
+      // note: dynamic allocation of Waiter is needed when this is invoked from a beast coroutine
+      auto w = std::unique_ptr<Waiter>(new Waiter);
+      const auto rc = kafka::publish_with_confirm(conn, 
+        topic,
+        json_format_pubsub_event(record),
+        std::bind(&Waiter::finish, w.get(), std::placeholders::_1));
+      if (rc < 0) {
+        // failed to publish, does not wait for reply
+        return rc;
+      }
+      return w->wait(y);
+    }
+  }
+
+  std::string to_str() const override {
+    std::string str("Kafka Endpoint");
+    str += "\nURI: " + endpoint;
+    str += "\nTopic: " + topic;
+    str += "\nAck Level: " + str_ack_level;
+    return str;
+  }
+};
+
+static const std::string KAFKA_SCHEMA("kafka");
+#endif // ifdef WITH_RADOSGW_KAFKA_ENDPOINT
+
 static const std::string WEBHOOK_SCHEMA("webhook");
 static const std::string UNKNOWN_SCHEMA("unknown");
 static const std::string NO_SCHEMA("");
@@ -424,6 +660,10 @@ const std::string& get_schema(const std::string& endpoint) {
 #ifdef WITH_RADOSGW_AMQP_ENDPOINT
   } else if (schema == "amqp") {
     return AMQP_SCHEMA;
+#endif
+#ifdef WITH_RADOSGW_KAFKA_ENDPOINT
+  } else if (schema == "kafka") {
+    return KAFKA_SCHEMA;
 #endif
   }
   return UNKNOWN_SCHEMA;
@@ -455,6 +695,10 @@ RGWPubSubEndpoint::Ptr RGWPubSubEndpoint::create(const std::string& endpoint,
   } else if (schema == "amqps") {
     throw configuration_error("AMQP: ssl not supported");
     return nullptr;
+#endif
+#ifdef WITH_RADOSGW_KAFKA_ENDPOINT
+  } else if (schema == KAFKA_SCHEMA) {
+      return Ptr(new RGWPubSubKafkaEndpoint(endpoint, topic, args, cct));
 #endif
   }
 
index 5d2c1482f5ce19a747361e722370b384fbe6c74b..4ad59837d4a562ba8361cab8be3b5134fbf75256 100644 (file)
@@ -20,6 +20,9 @@
 #ifdef WITH_RADOSGW_AMQP_ENDPOINT
 #include "rgw_amqp.h"
 #endif
+#ifdef WITH_RADOSGW_KAFKA_ENDPOINT
+#include "rgw_kafka.h"
+#endif
 
 #include <boost/algorithm/hex.hpp>
 #include <boost/asio/yield.hpp>
@@ -1552,12 +1555,20 @@ RGWPSSyncModuleInstance::RGWPSSyncModuleInstance(CephContext *cct, const JSONFor
     ldout(cct, 1) << "ERROR: failed to initialize AMQP manager in pubsub sync module" << dendl;
   }
 #endif
+#ifdef WITH_RADOSGW_KAFKA_ENDPOINT
+  if (!rgw::kafka::init(cct)) {
+    ldout(cct, 1) << "ERROR: failed to initialize Kafka manager in pubsub sync module" << dendl;
+  }
+#endif
 }
 
 RGWPSSyncModuleInstance::~RGWPSSyncModuleInstance() {
 #ifdef WITH_RADOSGW_AMQP_ENDPOINT
   rgw::amqp::shutdown();
 #endif
+#ifdef WITH_RADOSGW_KAFKA_ENDPOINT
+  rgw::kafka::shutdown();
+#endif
 }
 
 RGWDataSyncModule *RGWPSSyncModuleInstance::get_data_handler()
index 19ba0d53a2eba1804af6e68615eca10af94e5df8..c9bfe9399398d2246576969aed1ca3ef8f2c2a67 100644 (file)
@@ -57,6 +57,9 @@ if(WITH_RADOSGW)
   if(WITH_RADOSGW_AMQP_ENDPOINT)
     list(APPEND rgw_libs amqp_mock)
   endif()
+  if(WITH_RADOSGW_KAFKA_ENDPOINT)
+    list(APPEND rgw_libs kafka_stub)
+  endif()
   add_subdirectory(rgw)
 endif(WITH_RADOSGW)
 if(WITH_RBD)
index 4f068d6bbb449f420614d486dd25a93b44477411..4093e4b01d129e263245dc001c7fc398a439c1aa 100644 (file)
@@ -6,6 +6,13 @@ if(WITH_RADOSGW_AMQP_ENDPOINT)
   add_library(amqp_mock STATIC ${amqp_mock_src})
 endif()
 
+if(WITH_RADOSGW_KAFKA_ENDPOINT)
+  # kafka stub library
+  set(kafka_stub_src
+    kafka_stub.cc)
+  add_library(kafka_stub STATIC ${kafka_stub_src})
+endif()
+
 #unittest_rgw_bencode
 add_executable(unittest_rgw_bencode test_rgw_bencode.cc)
 add_ceph_unittest(unittest_rgw_bencode)
diff --git a/src/test/rgw/kafka_stub.cc b/src/test/rgw/kafka_stub.cc
new file mode 100644 (file)
index 0000000..6125a94
--- /dev/null
@@ -0,0 +1,68 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include <librdkafka/rdkafka.h>
+
+const char *rd_kafka_topic_name(const rd_kafka_topic_t *rkt) {
+    return "";
+}
+
+rd_kafka_resp_err_t rd_kafka_last_error() {
+    return rd_kafka_resp_err_t();
+}
+
+const char *rd_kafka_err2str(rd_kafka_resp_err_t err) {
+    return "";
+}
+
+rd_kafka_conf_t *rd_kafka_conf_new() {
+    return nullptr;
+}
+
+rd_kafka_conf_res_t rd_kafka_conf_set(rd_kafka_conf_t *conf,
+                                      const char *name,
+                                      const char *value,
+                                      char *errstr, size_t errstr_size) {
+    return rd_kafka_conf_res_t();
+}
+
+void rd_kafka_conf_set_dr_msg_cb(rd_kafka_conf_t *conf,
+                                  void (*dr_msg_cb) (rd_kafka_t *rk,
+                                                     const rd_kafka_message_t *
+                                                     rkmessage,
+                                                     void *opaque)) {}
+
+void rd_kafka_conf_set_opaque(rd_kafka_conf_t *conf, void *opaque) {}
+
+rd_kafka_t *rd_kafka_new(rd_kafka_type_t type, rd_kafka_conf_t *conf,
+                      char *errstr, size_t errstr_size) {
+    return nullptr;
+}
+
+void rd_kafka_conf_destroy(rd_kafka_conf_t *conf) {}
+
+rd_kafka_resp_err_t rd_kafka_flush (rd_kafka_t *rk, int timeout_ms) {
+    return rd_kafka_resp_err_t();
+}
+
+void rd_kafka_destroy(rd_kafka_t *rk) {}
+
+rd_kafka_topic_t *rd_kafka_topic_new(rd_kafka_t *rk, const char *topic,
+                              rd_kafka_topic_conf_t *conf) {
+    return nullptr;
+}
+
+int rd_kafka_produce(rd_kafka_topic_t *rkt, int32_t partition,
+                     int msgflags,
+                     void *payload, size_t len,
+                     const void *key, size_t keylen,
+                     void *msg_opaque) {
+    return 0;
+}
+
+int rd_kafka_poll(rd_kafka_t *rk, int timeout_ms) {
+    return 0;
+}
+
+void rd_kafka_topic_destroy(rd_kafka_topic_t *rkt) {}
+
index b2bdf3287c41953fa58a3d3422cde5b07a9d5580..ebac97d3dce047000fa1b359975ab99701fc60e2 100644 (file)
@@ -344,6 +344,131 @@ def clean_rabbitmq(proc): #, data_dir, log_dir)
     #    log.info('rabbitmq directories already removed')
 
 
+# Kafka endpoint functions
+
+kafka_server = 'localhost'
+
+class KafkaReceiver(object):
+    """class for receiving and storing messages on a topic from the kafka broker"""
+    def __init__(self, topic):
+        from kafka import KafkaConsumer
+        remaining_retries = 10
+        while remaining_retries > 0:
+            try:
+                self.consumer = KafkaConsumer(topic, bootstrap_servers = kafka_server)
+                print('Kafka consumer created on topic: '+topic)
+                break
+            except Exception as error:
+                remaining_retries -= 1
+                print('failed to connect to kafka (remaining retries '
+                    + str(remaining_retries) + '): ' + str(error))
+                time.sleep(1)
+
+        if remaining_retries == 0:
+            raise Exception('failed to connect to kafka - no retries left')
+
+        self.events = []
+        self.topic = topic
+        self.stop = False
+
+    def verify_s3_events(self, keys, exact_match=False, deletions=False):
+        """verify stored s3 records agains a list of keys"""
+        verify_s3_records_by_elements(self.events, keys, exact_match=exact_match, deletions=deletions)
+        self.events = []
+
+
+def kafka_receiver_thread_runner(receiver):
+    """main thread function for the kafka receiver"""
+    try:
+        log.info('Kafka receiver started')
+        print('Kafka receiver started')
+        for msg in receiver.consumer:
+            if receiver.stop:
+                break
+            receiver.events.append(json.loads(msg.value))
+        log.info('Kafka receiver ended')
+        print('Kafka receiver ended')
+    except Exception as error:
+        log.info('Kafka receiver ended unexpectedly: %s', str(error))
+        print('Kafka receiver ended unexpectedly: ' + str(error))
+
+
+def create_kafka_receiver_thread(topic):
+    """create kafka receiver and thread"""
+    receiver = KafkaReceiver(topic)
+    task = threading.Thread(target=kafka_receiver_thread_runner, args=(receiver,))
+    task.daemon = True
+    return task, receiver
+
+def stop_kafka_receiver(receiver, task):
+    """stop the receiver thread and wait for it to finis"""
+    receiver.stop = True
+    task.join(5)
+    try:
+        receiver.consumer.close()
+    except Exception as error:
+        log.info('failed to gracefuly stop Kafka receiver: %s', str(error))
+
+
+def init_kafka():
+    """ start kafka/zookeeper """
+    KAFKA_DIR = os.environ['KAFKA_DIR']
+    if KAFKA_DIR == '':
+        log.info('KAFKA_DIR must be set to where kafka is installed')
+        print('KAFKA_DIR must be set to where kafka is installed')
+        return None, None, None
+    
+    DEVNULL = open(os.devnull, 'wb')
+
+    print('\nStarting zookeeper...')
+    try:
+        zk_proc = subprocess.Popen([KAFKA_DIR+'bin/zookeeper-server-start.sh', KAFKA_DIR+'config/zookeeper.properties'], stdout=DEVNULL)
+    except Exception as error:
+        log.info('failed to execute zookeeper: %s', str(error))
+        print('failed to execute zookeeper: %s' % str(error))
+        return None, None, None
+
+    time.sleep(5)
+    if zk_proc.poll() is not None:
+        print('zookeeper failed to start')
+        return None, None, None
+    print('Zookeeper started')
+    print('Starting kafka...')
+    kafka_log = open('./kafka.log', 'w')
+    try:
+        kafka_proc = subprocess.Popen([
+            KAFKA_DIR+'bin/kafka-server-start.sh', 
+            KAFKA_DIR+'config/server.properties'], 
+            stdout=kafka_log)
+    except Exception as error:
+        log.info('failed to execute kafka: %s', str(error))
+        print('failed to execute kafka: %s' % str(error))
+        zk_proc.terminate()
+        kafka_log.close()
+        return None, None, None
+
+    # TODO add kafka checkpoint instead of sleep
+    time.sleep(15)
+    if kafka_proc.poll() is not None:
+        zk_proc.terminate()
+        print('kafka failed to start. details in: ./kafka.log')
+        kafka_log.close()
+        return None, None, None
+
+    print('Kafka started')
+    return kafka_proc, zk_proc, kafka_log
+
+
+def clean_kafka(kafka_proc, zk_proc, kafka_log):
+    """ stop kafka/zookeeper """
+    try:
+        kafka_log.close()
+        kafka_proc.terminate()
+        zk_proc.terminate()
+    except:
+        log.info('kafka/zookeeper already terminated')
+
+
 def init_env(require_ps=True):
     """initialize the environment"""
     if require_ps:
@@ -1156,7 +1281,7 @@ def test_ps_s3_notification_push_amqp_on_master():
     [thr.join() for thr in client_threads] 
     
     time_diff = time.time() - start_time
-    print('average time for creation + http notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds')
+    print 'average time for deletion + amqp notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds'
 
     print('wait for 5sec for the messages...')
     time.sleep(5)
@@ -1172,7 +1297,6 @@ def test_ps_s3_notification_push_amqp_on_master():
         err = 'amqp receiver 2 should have no deletions'
         assert False, err
 
-
     # cleanup
     stop_amqp_receiver(receiver1, task1)
     stop_amqp_receiver(receiver2, task2)
@@ -1184,6 +1308,171 @@ def test_ps_s3_notification_push_amqp_on_master():
     clean_rabbitmq(proc)
 
 
+def test_ps_s3_notification_push_kafka():
+    """ test pushing kafka s3 notification on master """
+    if skip_push_tests:
+        return SkipTest("PubSub push tests don't run in teuthology")
+    kafka_proc, zk_proc, kafka_log = init_kafka()
+    if kafka_proc is  None or zk_proc is None:
+        return SkipTest('end2end kafka tests require kafka/zookeeper installed')
+
+    zones, ps_zones  = init_env(require_ps=True)
+    realm = get_realm()
+    zonegroup = realm.master_zonegroup()
+    
+    # create bucket
+    bucket_name = gen_bucket_name()
+    bucket = zones[0].create_bucket(bucket_name)
+    # wait for sync
+    zone_meta_checkpoint(ps_zones[0].zone)
+    # name is constant for manual testing
+    topic_name = bucket_name+'_topic'
+    # create consumer on the topic
+    task, receiver = create_kafka_receiver_thread(topic_name)
+    task.start()
+
+    # create topic
+    topic_conf = PSTopic(ps_zones[0].conn, topic_name,
+                         endpoint='kafka://' + kafka_server,
+                         endpoint_args='kafka-ack-level=broker')
+    result, status = topic_conf.set_config()
+    assert_equal(status/100, 2)
+    parsed_result = json.loads(result)
+    topic_arn = parsed_result['arn']
+    # create s3 notification
+    notification_name = bucket_name + NOTIFICATION_SUFFIX
+    topic_conf_list = [{'Id': notification_name, 'TopicArn': topic_arn,
+                         'Events': []
+                       }]
+
+    s3_notification_conf = PSNotificationS3(ps_zones[0].conn, bucket_name, topic_conf_list)
+    response, status = s3_notification_conf.set_config()
+    assert_equal(status/100, 2)
+
+    # create objects in the bucket (async)
+    number_of_objects = 10
+    client_threads = []
+    for i in range(number_of_objects):
+        key = bucket.new_key(str(i))
+        content = str(os.urandom(1024*1024))
+        thr = threading.Thread(target = set_contents_from_string, args=(key, content,))
+        thr.start()
+        client_threads.append(thr)
+    [thr.join() for thr in client_threads] 
+
+    # wait for sync
+    zone_bucket_checkpoint(ps_zones[0].zone, zones[0].zone, bucket_name)
+    keys = list(bucket.list())
+    receiver.verify_s3_events(keys, exact_match=True)
+
+    # delete objects from the bucket
+    client_threads = []
+    for key in bucket.list():
+        thr = threading.Thread(target = key.delete, args=())
+        thr.start()
+        client_threads.append(thr)
+    [thr.join() for thr in client_threads] 
+    
+    zone_bucket_checkpoint(ps_zones[0].zone, zones[0].zone, bucket_name)
+    receiver.verify_s3_events(keys, exact_match=True, deletions=True)
+    
+    # cleanup
+    s3_notification_conf.del_config()
+    topic_conf.del_config()
+    # delete the bucket
+    zones[0].delete_bucket(bucket_name)
+    stop_kafka_receiver(receiver, task)
+    clean_kafka(kafka_proc, zk_proc, kafka_log)
+
+
+def test_ps_s3_notification_push_kafka_on_master():
+    """ test pushing kafka s3 notification on master """
+    if skip_push_tests:
+        return SkipTest("PubSub push tests don't run in teuthology")
+    kafka_proc, zk_proc, kafka_log = init_kafka()
+    if kafka_proc is None or zk_proc is None:
+        return SkipTest('end2end kafka tests require kafka/zookeeper installed')
+    zones, _  = init_env(require_ps=False)
+    realm = get_realm()
+    zonegroup = realm.master_zonegroup()
+    
+    # create bucket
+    bucket_name = gen_bucket_name()
+    bucket = zones[0].create_bucket(bucket_name)
+    # name is constant for manual testing
+    topic_name = bucket_name+'_topic'
+    # create consumer on the topic
+    task, receiver = create_kafka_receiver_thread(topic_name+'_1')
+    task.start()
+
+    # create s3 topic
+    endpoint_address = 'kafka://' + kafka_server
+    # without acks from broker
+    endpoint_args = 'push-endpoint='+endpoint_address+'&kafka-ack-level=broker'
+    topic_conf1 = PSTopicS3(zones[0].conn, topic_name+'_1', zonegroup.name, endpoint_args=endpoint_args)
+    topic_arn1 = topic_conf1.set_config()
+    endpoint_args = 'push-endpoint='+endpoint_address+'&kafka-ack-level=none'
+    topic_conf2 = PSTopicS3(zones[0].conn, topic_name+'_2', zonegroup.name, endpoint_args=endpoint_args)
+    topic_arn2 = topic_conf2.set_config()
+    # create s3 notification
+    notification_name = bucket_name + NOTIFICATION_SUFFIX
+    topic_conf_list = [{'Id': notification_name + '_1', 'TopicArn': topic_arn1,
+                         'Events': []
+                       },
+                       {'Id': notification_name + '_2', 'TopicArn': topic_arn2,
+                         'Events': []
+                       }]
+
+    s3_notification_conf = PSNotificationS3(zones[0].conn, bucket_name, topic_conf_list)
+    response, status = s3_notification_conf.set_config()
+    assert_equal(status/100, 2)
+
+    # create objects in the bucket (async)
+    number_of_objects = 10
+    client_threads = []
+    start_time = time.time()
+    for i in range(number_of_objects):
+        key = bucket.new_key(str(i))
+        content = str(os.urandom(1024*1024))
+        thr = threading.Thread(target = set_contents_from_string, args=(key, content,))
+        thr.start()
+        client_threads.append(thr)
+    [thr.join() for thr in client_threads] 
+
+    time_diff = time.time() - start_time
+    print 'average time for creation + kafka notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds'
+
+    print 'wait for 5sec for the messages...'
+    time.sleep(5)
+    keys = list(bucket.list())
+    receiver.verify_s3_events(keys, exact_match=True)
+
+    # delete objects from the bucket
+    client_threads = []
+    start_time = time.time()
+    for key in bucket.list():
+        thr = threading.Thread(target = key.delete, args=())
+        thr.start()
+        client_threads.append(thr)
+    [thr.join() for thr in client_threads] 
+    
+    time_diff = time.time() - start_time
+    print 'average time for deletion + kafka notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds'
+
+    print 'wait for 5sec for the messages...'
+    time.sleep(5)
+    receiver.verify_s3_events(keys, exact_match=True, deletions=True)
+    
+    # cleanup
+    s3_notification_conf.del_config()
+    topic_conf1.del_config()
+    topic_conf2.del_config()
+    # delete the bucket
+    zones[0].delete_bucket(bucket_name)
+    stop_kafka_receiver(receiver, task)
+    clean_kafka(kafka_proc, zk_proc, kafka_log)
+
+
 def test_ps_s3_notification_push_http_on_master():
     """ test pushing http s3 notification on master """
     if skip_push_tests:
@@ -1252,7 +1541,7 @@ def test_ps_s3_notification_push_http_on_master():
     [thr.join() for thr in client_threads] 
     
     time_diff = time.time() - start_time
-    print('average time for creation + http notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds')
+    print 'average time for deletion + http notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds'
 
     print('wait for 5sec for the messages...')
     time.sleep(5)
index 4de963ea262e86e504f752f33aa722eba8399bc1..5f6b426cae6c9e4e0d83a54cf9b369449892926f 100644 (file)
@@ -34,6 +34,10 @@ if(WITH_RADOSGW)
     list(APPEND DENCODER_EXTRALIBS
       rabbitmq)
   endif()
+  if(WITH_RADOSGW_KAFKA_ENDPOINT)
+    list(APPEND DENCODER_EXTRALIBS
+      rdkafka)
+  endif()
 endif()
 
 if(WITH_RBD)