&push-endpoint=<endpoint>
[&Attributes.entry.1.key=amqp-exchange&Attributes.entry.1.value=<exchange>]
[&Attributes.entry.2.key=amqp-ack-level&Attributes.entry.2.value=none|broker]
- [&Attributes.entry.3.key=verify-sll&Attributes.entry.3.value=true|false]
+ [&Attributes.entry.3.key=verify-ssl&Attributes.entry.3.value=true|false]
[&Attributes.entry.4.key=kafka-ack-level&Attributes.entry.4.value=none|broker]
+ [&Attributes.entry.5.key=use-ssl&Attributes.entry.5.value=true|false]
+ [&Attributes.entry.6.key=ca-location&Attributes.entry.6.value=<file path>]
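+
+For example, a request creating a topic that pushes to a SASL-authenticated Kafka broker over SSL might look like this (host, credentials and CA path are illustrative only)::
+
+   POST
+   Action=CreateTopic&Name=mytopic
+   &push-endpoint=kafka://myuser:mypassword@kafka.example.com:9093
+   &Attributes.entry.1.key=use-ssl&Attributes.entry.1.value=true
+   &Attributes.entry.2.key=ca-location&Attributes.entry.2.value=/etc/ssl/certs/rootCA.crt
+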
Request parameters:
- AMQP0.9.1 endpoint
- URI: ``amqp://[<user>:<password>@]<fqdn>[:<port>][/<vhost>]``
- - user/password defaults to : guest/guest
+ - user/password defaults to: guest/guest
+ - user/password may only be provided over HTTPS. Topic creation request will be rejected if not
- port defaults to: 5672
- vhost defaults to: "/"
- amqp-exchange: the exchange must exist and be able to route messages based on topics (mandatory parameter for AMQP0.9.1)
- Kafka endpoint
- - URI: ``kafka://<fqdn>[:<port]``
+ - URI: ``kafka://[<user>:<password>@]<fqdn>[:<port>]``
+ - if ``use-ssl`` is set to "true", secure connection will be used for connecting with the broker ("false" by default)
+ - if ``ca-location`` is provided, and secure connection is used, the specified CA will be used, instead of the default one, to authenticate the broker
+ - user/password may only be provided over HTTPS. Topic creation request will be rejected if not
+ - user/password may only be provided together with ``use-ssl``; connection to the broker will fail if not
- port defaults to: 9092
- kafka-ack-level: no end2end acking is required, as messages may persist in the broker before being delivered to their final destination. Two ack methods exist:
- User: name of the user that created the topic
- Name: name of the topic
- EndpointAddress: the push-endpoint URL
+- if the endpoint URL contains user/password information, the request must be made over HTTPS. Topic get request will be rejected if not
- EndPointArgs: the push-endpoint args
- EndpointTopic: the topic name that should be sent to the endpoint (may be different from the above topic name)
- TopicArn: topic ARN
</ResponseMetadata>
</ListTopicsResponse>
+- if the endpoint URL of any of the topics contains user/password information, the request must be made over HTTPS. Topic list request will be rejected if not
+
Notifications
~~~~~~~~~~~~~
::
- PUT /topics/<topic-name>[?push-endpoint=<endpoint>[&amqp-exchange=<exchange>][&amqp-ack-level=none|broker][&verify-ssl=true|false][&kafka-ack-level=none|broker]]
+ PUT /topics/<topic-name>[?push-endpoint=<endpoint>[&amqp-exchange=<exchange>][&amqp-ack-level=none|broker][&verify-ssl=true|false][&kafka-ack-level=none|broker][&use-ssl=true|false][&ca-location=<file path>]]
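+
+ For example (illustrative values), a topic pushing to a Kafka broker over SSL could be created with::
+
+   PUT /topics/mytopic?push-endpoint=kafka://kafka.example.com:9093&use-ssl=true&ca-location=/etc/ssl/certs/rootCA.crt
+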
Request parameters:
- AMQP0.9.1 endpoint
- URI: ``amqp://[<user>:<password>@]<fqdn>[:<port>][/<vhost>]``
- - user/password defaults to : guest/guest
+ - user/password defaults to: guest/guest
+ - user/password may only be provided over HTTPS. Topic creation request will be rejected if not
- port defaults to: 5672
- vhost defaults to: "/"
- amqp-exchange: the exchange must exist and be able to route messages based on topics (mandatory parameter for AMQP0.9.1)
- Kafka endpoint
- - URI: ``kafka://<fqdn>[:<port]``
+ - URI: ``kafka://[<user>:<password>@]<fqdn>[:<port>]``
+ - if ``use-ssl`` is set to "true", secure connection will be used for connecting with the broker ("false" by default)
+ - if ``ca-location`` is provided, and secure connection is used, the specified CA will be used, instead of the default one, to authenticate the broker
+ - user/password may only be provided over HTTPS. Topic creation request will be rejected if not
+ - user/password may only be provided together with ``use-ssl``; connection to the broker will fail if not
- port defaults to: 9092
- kafka-ack-level: no end2end acking is required, as messages may persist in the broker before being delivered to their final destination. Two ack methods exist:
"bucket_name":"",
"oid_prefix":"",
"push_endpoint":"",
- "push_endpoint_args":""
+ "push_endpoint_args":"",
+ "push_endpoint_topic":""
},
"arn":""
},
- dest.bucket_name: not used
- dest.oid_prefix: not used
- dest.push_endpoint: in case of S3-compliant notifications, this value will be used as the push-endpoint URL
+- if the push-endpoint URL contains user/password information, the request must be made over HTTPS. Topic get request will be rejected if not
- dest.push_endpoint_args: in case of S3-compliant notifications, this value will be used as the push-endpoint args
+- dest.push_endpoint_topic: in case of S3-compliant notifications, this value will hold the topic name as sent to the endpoint (may be different than the internal topic name)
- topic.arn: topic ARN
- subs: list of subscriptions associated with this topic
GET /topics
+- if the push-endpoint URL of any of the topics contains user/password information, the request must be made over HTTPS. Topic list request will be rejected if not
+
S3-Compliant Notifications
~~~~~~~~~~~~~~~~~~~~~~~~~~
"bucket_name":"",
"oid_prefix":"",
"push_endpoint":"",
- "push_endpoint_args":""
+ "push_endpoint_args":"",
+ "push_endpoint_topic":""
}
"arn":""
},
::
- PUT /subscriptions/<sub-name>?topic=<topic-name>[?push-endpoint=<endpoint>[&amqp-exchange=<exchange>][&amqp-ack-level=none|broker][&verify-ssl=true|false][&kafka-ack-level=none|broker]]
+ PUT /subscriptions/<sub-name>?topic=<topic-name>[?push-endpoint=<endpoint>[&amqp-exchange=<exchange>][&amqp-ack-level=none|broker][&verify-ssl=true|false][&kafka-ack-level=none|broker][&ca-location=<file path>]]
Request parameters:
- Kafka endpoint
- - URI: ``kafka://<fqdn>[:<port]``
+ - URI: ``kafka://[<user>:<password>@]<fqdn>[:<port>]``
+ - if ``ca-location`` is provided, secure connection will be used for connecting with the broker
+ - user/password may only be provided over HTTPS. Topic creation request will be rejected if not
+ - user/password may only be provided together with ``ca-location``. Topic creation request will be rejected if not
- port defaults to: 9092
- kafka-ack-level: no end2end acking is required, as messages may persist in the broker before being delivered to their final destination. Two ack methods exist:
"bucket_name":"",
"oid_prefix":"",
"push_endpoint":"",
- "push_endpoint_args":""
+ "push_endpoint_args":"",
+ "push_endpoint_topic":""
}
"s3_id":""
}
- user: name of the user that created the subscription
- name: name of the subscription
- topic: name of the topic the subscription is associated with
+- dest.bucket_name: name of the bucket storing the events
+- dest.oid_prefix: oid prefix for the events stored in the bucket
+- dest.push_endpoint: in case of S3-compliant notifications, this value will be used as the push-endpoint URL
+- if the push-endpoint URL contains user/password information, the request must be made over HTTPS. Subscription get request will be rejected if not
+- dest.push_endpoint_args: in case of S3-compliant notifications, this value will be used as the push-endpoint args
+- dest.push_endpoint_topic: in case of S3-compliant notifications, this value will hold the topic name as sent to the endpoint (may be different than the internal topic name)
+- s3_id: in case of S3-compliant notifications, this will hold the notification name that created the subscription
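+
+For illustration, a subscription created by an S3-compliant notification named "mynotif" on topic "mytopic" might look like this (values are illustrative)::
+
+   "dest":{
+      "bucket_name":"pubsub-myuser-mytopic",
+      "oid_prefix":"mysub/",
+      "push_endpoint":"kafka://kafka.example.com:9093",
+      "push_endpoint_args":"use-ssl=true&kafka-ack-level=broker",
+      "push_endpoint_topic":"mytopic"
+   },
+   "s3_id":"mynotif"
+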
Delete Subscription
```````````````````
set -e
rgw_frontend=${RGW_FRONTEND:-"beast"}
-script_root=`dirname $0`
-script_root=`(cd $script_root;pwd)`
+script_root=$(dirname "$0")
+script_root=$(cd "$script_root" && pwd)
[ -z "$BUILD_DIR" ] && BUILD_DIR=build
if [ -e CMakeCache.txt ]; then
script_root=$PWD
-elif [ -e $script_root/../${BUILD_DIR}/CMakeCache.txt ]; then
- cd $script_root/../${BUILD_DIR}
+elif [ -e "$script_root"/../${BUILD_DIR}/CMakeCache.txt ]; then
+ cd "$script_root"/../${BUILD_DIR}
script_root=$PWD
fi
-ceph_bin=$script_root/bin
-vstart_path=`dirname $0`
+#ceph_bin=$script_root/bin
+vstart_path=$(dirname "$0")
-[ "$#" -lt 2 ] && echo "usage: $0 <name> <port> [params...]" && exit 1
+[ "$#" -lt 3 ] && echo "usage: $0 <name> <port> <ssl-port> [params...]" && exit 1
name=$1
port=$2
+ssl_port=$3
+cert_param=""
+port_param="port=$port"
+
+if [ "$ssl_port" -gt 0 ]; then
+ cert_param="ssl_certificate=./cert.pem"
+ if [ "$rgw_frontend" = "civetweb" ]; then
+ port_param="port=${port} port=${ssl_port}s"
+ else
+ port_param="port=${port} ssl_port=${ssl_port}"
+ fi
+fi
-if [ ! -z "$RGW_FRONTEND_THREADS" ]; then
+if [ -n "$RGW_FRONTEND_THREADS" ]; then
set_frontend_threads="num_threads=$RGW_FRONTEND_THREADS"
fi
-shift 2
+shift 3
run_root=$script_root/run/$name
pidfile=$run_root/out/radosgw.${port}.pid
asokfile=$run_root/out/radosgw.${port}.asok
logfile=$run_root/out/radosgw.${port}.log
-$vstart_path/mstop.sh $name radosgw $port
+"$vstart_path"/mstop.sh "$name" radosgw "$port"
-$vstart_path/mrun $name ceph -c $run_root/ceph.conf \
- -k $run_root/keyring auth get-or-create client.rgw.$port mon \
- 'allow rw' osd 'allow rwx' mgr 'allow rw' >> $run_root/keyring
+"$vstart_path"/mrun "$name" ceph -c "$run_root"/ceph.conf \
+ -k "$run_root"/keyring auth get-or-create client.rgw."$port" mon \
+ 'allow rw' osd 'allow rwx' mgr 'allow rw' >> "$run_root"/keyring
-$vstart_path/mrun $name radosgw --rgw-frontends="$rgw_frontend port=$port $set_frontend_threads" \
- -n client.rgw.$port --pid-file=$pidfile \
- --admin-socket=$asokfile "$@" --log-file=$logfile
+"$vstart_path"/mrun "$name" radosgw --rgw-frontends="$rgw_frontend $port_param $set_frontend_threads $cert_param" \
+ -n client.rgw."$port" --pid-file="$pidfile" \
+ --admin-socket="$asokfile" "$@" --log-file="$logfile"
rgw_perf_counters.cc
rgw_rest_iam.cc
rgw_object_lock.cc
- rgw_kms.cc)
+ rgw_kms.cc
+ rgw_url.cc)
if(WITH_RADOSGW_AMQP_ENDPOINT)
list(APPEND librgw_common_srcs rgw_amqp.cc)
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab ft=cpp
-//#include "include/compat.h"
#include "rgw_kafka.h"
+#include "rgw_url.h"
#include <librdkafka/rdkafka.h>
#include "include/ceph_assert.h"
#include <sstream>
#include <cstring>
-#include <regex>
#include <unordered_map>
#include <string>
#include <vector>
namespace rgw::kafka {
// status codes for publishing
+// TODO: use the actual error code (when conn exists) instead of STATUS_CONNECTION_CLOSED when replying to client
static const int STATUS_CONNECTION_CLOSED = -0x1002;
static const int STATUS_QUEUE_FULL = -0x1003;
static const int STATUS_MAX_INFLIGHT = -0x1004;
static const int STATUS_MANAGER_STOPPED = -0x1005;
// status code for connection opening
-static const int STATUS_CONF_ALLOC_FAILED = -0x2001;
-static const int STATUS_GET_BROKER_LIST_FAILED = -0x2002;
-static const int STATUS_CREATE_PRODUCER_FAILED = -0x2003;
+static const int STATUS_CONF_ALLOC_FAILED = -0x2001;
-static const int STATUS_CREATE_TOPIC_FAILED = -0x3008;
-static const int NO_REPLY_CODE = 0x0;
static const int STATUS_OK = 0x0;
// struct for holding the callback and its tag in the callback list
uint64_t delivery_tag = 1;
int status;
mutable std::atomic<int> ref_count = 0;
- CephContext* cct = nullptr;
+ CephContext* const cct;
CallbackList callbacks;
+ const std::string broker;
+ const bool use_ssl;
+ const bool verify_ssl; // TODO currently ignored, not supported in librdkafka v0.11.6
+ const boost::optional<std::string> ca_location;
+ const std::string user;
+ const std::string password;
// cleanup of all internal connection resource
// the object can still remain, and internal connection
return (producer != nullptr && !marked_for_deletion);
}
+ // ctor for setting immutable values
+ connection_t(CephContext* _cct, const std::string& _broker, bool _use_ssl, bool _verify_ssl,
+ const boost::optional<const std::string&>& _ca_location,
+ const std::string& _user, const std::string& _password) :
+ cct(_cct), broker(_broker), use_ssl(_use_ssl), verify_ssl(_verify_ssl), ca_location(_ca_location), user(_user), password(_password) {}
+
// dtor also destroys the internals
~connection_t() {
destroy(STATUS_CONNECTION_CLOSED);
friend void intrusive_ptr_release(const connection_t* p);
};
+std::string to_string(const connection_ptr_t& conn) {
+ std::string str;
+ str += "\nBroker: " + conn->broker;
+ str += conn->use_ssl ? "\nUse SSL" : "";
+ str += conn->ca_location ? "\nCA Location: " + *(conn->ca_location) : "";
+ return str;
+}
// these are required interfaces so that connection_t could be used inside boost::intrusive_ptr
void intrusive_ptr_add_ref(const connection_t* p) {
++p->ref_count;
// convert int status to string - including RGW specific values
std::string status_to_string(int s) {
switch (s) {
+ case STATUS_OK:
+ return "STATUS_OK";
case STATUS_CONNECTION_CLOSED:
return "RGW_KAFKA_STATUS_CONNECTION_CLOSED";
case STATUS_QUEUE_FULL:
return "RGW_KAFKA_STATUS_MANAGER_STOPPED";
case STATUS_CONF_ALLOC_FAILED:
return "RGW_KAFKA_STATUS_CONF_ALLOC_FAILED";
- case STATUS_CREATE_PRODUCER_FAILED:
- return "STATUS_CREATE_PRODUCER_FAILED";
- case STATUS_CREATE_TOPIC_FAILED:
- return "STATUS_CREATE_TOPIC_FAILED";
}
- // TODO: how to handle "s" in this case?
- return std::string(rd_kafka_err2str(rd_kafka_last_error()));
+ return std::string(rd_kafka_err2str((rd_kafka_resp_err_t)s));
}
void message_callback(rd_kafka_t* rk, const rd_kafka_message_t* rkmessage, void* opaque) {
const auto tag_it = std::find(callbacks_begin, callbacks_end, *tag);
if (tag_it != callbacks_end) {
ldout(conn->cct, 20) << "Kafka run: n/ack received, invoking callback with tag=" <<
- *tag << " and result=" << result << dendl;
+ *tag << " and result=" << rd_kafka_err2str(result) << dendl;
tag_it->cb(result);
conn->callbacks.erase(tag_it);
} else {
}
// utility function to create a connection, when the connection object already exists
-connection_ptr_t& create_connection(connection_ptr_t& conn, const std::string& broker) {
+connection_ptr_t& create_connection(connection_ptr_t& conn) {
// pointer must be valid and not marked for deletion
ceph_assert(conn && !conn->marked_for_deletion);
// reset all status codes
conn->status = STATUS_OK;
-
- char errstr[512];
+ char errstr[512] = {0};
conn->temp_conf = rd_kafka_conf_new();
if (!conn->temp_conf) {
}
// get list of brokers based on the bootstrap broker
- if (rd_kafka_conf_set(conn->temp_conf, "bootstrap.servers", broker.c_str(), errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {
- conn->status = STATUS_GET_BROKER_LIST_FAILED;
- // TODO: use errstr
- return conn;
+ if (rd_kafka_conf_set(conn->temp_conf, "bootstrap.servers", conn->broker.c_str(), errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) goto conf_error;
+
+ if (conn->use_ssl) {
+ if (!conn->user.empty()) {
+ // use SSL+SASL
+ if (rd_kafka_conf_set(conn->temp_conf, "security.protocol", "SASL_SSL", errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK ||
+ rd_kafka_conf_set(conn->temp_conf, "sasl.mechanism", "PLAIN", errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK ||
+ rd_kafka_conf_set(conn->temp_conf, "sasl.username", conn->user.c_str(), errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK ||
+ rd_kafka_conf_set(conn->temp_conf, "sasl.password", conn->password.c_str(), errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) goto conf_error;
+ ldout(conn->cct, 20) << "Kafka connect: successfully configured SSL+SASL security" << dendl;
+ } else {
+ // use only SSL
+ if (rd_kafka_conf_set(conn->temp_conf, "security.protocol", "SSL", errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) goto conf_error;
+ ldout(conn->cct, 20) << "Kafka connect: successfully configured SSL security" << dendl;
+ }
+ if (conn->ca_location) {
+ if (rd_kafka_conf_set(conn->temp_conf, "ssl.ca.location", conn->ca_location->c_str(), errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) goto conf_error;
+ ldout(conn->cct, 20) << "Kafka connect: successfully configured CA location" << dendl;
+ } else {
+ ldout(conn->cct, 20) << "Kafka connect: using default CA location" << dendl;
+ }
+ // Note: when librdkafka.1.0 is available the following line could be uncommented instead of the callback setting call
+ // if (rd_kafka_conf_set(conn->temp_conf, "enable.ssl.certificate.verification", "0", errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) goto conf_error;
+
+ ldout(conn->cct, 20) << "Kafka connect: successfully configured security" << dendl;
}
// set the global callback for delivery success/fail
rd_kafka_conf_set_dr_msg_cb(conn->temp_conf, message_callback);
// set the global opaque pointer to be the connection itself
- rd_kafka_conf_set_opaque (conn->temp_conf, conn.get());
+ rd_kafka_conf_set_opaque(conn->temp_conf, conn.get());
// create the producer
conn->producer = rd_kafka_new(RD_KAFKA_PRODUCER, conn->temp_conf, errstr, sizeof(errstr));
- if (conn->producer) {
- conn->status = STATUS_CREATE_PRODUCER_FAILED;
- // TODO: use errstr
+ if (!conn->producer) {
+ conn->status = rd_kafka_last_error();
+ ldout(conn->cct, 1) << "Kafka connect: failed to create producer: " << errstr << dendl;
return conn;
}
+ ldout(conn->cct, 20) << "Kafka connect: successfully created new producer" << dendl;
// conf ownership passed to producer
conn->temp_conf = nullptr;
return conn;
-}
+conf_error:
+ conn->status = rd_kafka_last_error();
+ ldout(conn->cct, 1) << "Kafka connect: configuration failed: " << errstr << dendl;
+ return conn;
+}
// utility function to create a new connection
-connection_ptr_t create_new_connection(const std::string& broker, CephContext* cct) {
+connection_ptr_t create_new_connection(const std::string& broker, CephContext* cct,
+ bool use_ssl,
+ bool verify_ssl,
+ boost::optional<const std::string&> ca_location,
+ const std::string& user,
+ const std::string& password) {
// create connection state
- connection_ptr_t conn = new connection_t;
- conn->cct = cct;
- return create_connection(conn, broker);
+ connection_ptr_t conn(new connection_t(cct, broker, use_ssl, verify_ssl, ca_location, user, password));
+ return create_connection(conn);
}
/// struct used for holding messages in the message queue
reply_callback_t _cb) : conn(_conn), topic(_topic), message(_message), cb(_cb) {}
};
-// parse a URL of the form: kafka://<host>[:port]
-// to a: host[:port]
-int parse_url(const std::string& url, std::string& broker) {
- std::regex url_regex (
- R"(^(([^:\/?#]+):)?(//([^\/?#]*))?([^?#]*)(\?([^#]*))?(#(.*))?)",
- std::regex::extended
- );
- const auto HOST_AND_PORT = 4;
- std::smatch url_match_result;
- if (std::regex_match(url, url_match_result, url_regex)) {
- broker = url_match_result[HOST_AND_PORT];
- return 0;
- }
- return -1;
-}
-
typedef std::unordered_map<std::string, connection_ptr_t> ConnectionList;
typedef boost::lockfree::queue<message_wrapper_t*, boost::lockfree::fixed_sized<true>> MessageQueue;
if (!conn->is_ok()) {
// connection had an issue while message was in the queue
// TODO add error stats
- ldout(conn->cct, 1) << "Kafka publish: connection had an issue while message was in the queue" << dendl;
+ ldout(conn->cct, 1) << "Kafka publish: connection had an issue while message was in the queue. error: " << status_to_string(conn->status) << dendl;
if (message->cb) {
- message->cb(STATUS_CONNECTION_CLOSED);
+ message->cb(conn->status);
}
return;
}
if (topic_it == conn->topics.end()) {
topic = rd_kafka_topic_new(conn->producer, message->topic.c_str(), nullptr);
if (!topic) {
- ldout(conn->cct, 1) << "Kafka publish: failed to create topic: " << message->topic << dendl;
+ const auto err = rd_kafka_last_error();
+ ldout(conn->cct, 1) << "Kafka publish: failed to create topic: " << message->topic << " error: " << status_to_string(err) << dendl;
if (message->cb) {
- message->cb(STATUS_CREATE_TOPIC_FAILED);
+ message->cb(err);
}
- conn->destroy(STATUS_CREATE_TOPIC_FAILED);
+ conn->destroy(err);
return;
}
// TODO use the topics list as an LRU cache
tag);
if (rc == -1) {
const auto err = rd_kafka_last_error();
- ldout(conn->cct, 1) << "Kafka publish: failed to produce: " << rd_kafka_err2str(err) << dendl;
- // TODO: dont error on full queue, retry instead
+ ldout(conn->cct, 10) << "Kafka publish: failed to produce: " << rd_kafka_err2str(err) << dendl;
+ // TODO: don't error on full queue, and don't destroy the connection; retry instead
// immediately invoke callback on error if needed
if (message->cb) {
message->cb(err);
}
+ conn->destroy(err);
delete tag;
}
ldout(conn->cct, 20) << "Kafka publish (with callback, tag=" << *tag << "): OK. Queue has: " << q_len << " callbacks" << dendl;
conn->callbacks.emplace_back(*tag, message->cb);
} else {
- // immediately invoke callback with error
+ // immediately invoke callback with error - this is not a connection error
ldout(conn->cct, 1) << "Kafka publish (with callback): failed with error: callback queue full" << dendl;
message->cb(STATUS_MAX_INFLIGHT);
// tag will be deleted when the global callback is invoked
// try to reconnect the connection if it has an error
if (!conn->is_ok()) {
+ ldout(conn->cct, 10) << "Kafka run: connection status is: " << status_to_string(conn->status) << dendl;
const auto& broker = conn_it->first;
ldout(conn->cct, 20) << "Kafka run: retry connection" << dendl;
- if (create_connection(conn, broker)->is_ok() == false) {
+ if (create_connection(conn)->is_ok() == false) {
ldout(conn->cct, 10) << "Kafka run: connection (" << broker << ") retry failed" << dendl;
// TODO: add error counter for failed retries
// TODO: add exponential backoff for retries
}
// connect to a broker, or reuse an existing connection if already connected
- connection_ptr_t connect(const std::string& url) {
+ connection_ptr_t connect(const std::string& url,
+ bool use_ssl,
+ bool verify_ssl,
+ boost::optional<const std::string&> ca_location) {
if (stopped) {
// TODO: increment counter
ldout(cct, 1) << "Kafka connect: manager is stopped" << dendl;
}
std::string broker;
- if (0 != parse_url(url, broker)) {
+ std::string user;
+ std::string password;
+ if (!parse_url_authority(url, broker, user, password)) {
// TODO: increment counter
ldout(cct, 1) << "Kafka connect: URL parsing failed" << dendl;
return nullptr;
}
+ // this should be validated by the regex in parse_url_authority()
+ ceph_assert(user.empty() == password.empty());
+
+ if (!user.empty() && !use_ssl) {
+ ldout(cct, 1) << "Kafka connect: user/password are only allowed over secure connection" << dendl;
+ return nullptr;
+ }
+
std::lock_guard lock(connections_lock);
const auto it = connections.find(broker);
+ // note that ssl and non-ssl connections to the same host are two separate connections
if (it != connections.end()) {
if (it->second->marked_for_deletion) {
// TODO: increment counter
ldout(cct, 1) << "Kafka connect: max connections exceeded" << dendl;
return nullptr;
}
- const auto conn = create_new_connection(broker, cct);
+ const auto conn = create_new_connection(broker, cct, use_ssl, verify_ssl, ca_location, user, password);
// create_new_connection must always return a connection object
// even if error occurred during creation.
// in such a case the creation will be retried in the main thread
ceph_assert(conn);
++connection_count;
ldout(cct, 10) << "Kafka connect: new connection is created. Total connections: " << connection_count << dendl;
- ldout(cct, 10) << "Kafka connect: new connection status is: " << status_to_string(conn->status) << dendl;
return connections.emplace(broker, conn).first->second;
}
s_manager = nullptr;
}
-connection_ptr_t connect(const std::string& url) {
+connection_ptr_t connect(const std::string& url, bool use_ssl, bool verify_ssl,
+ boost::optional<const std::string&> ca_location) {
if (!s_manager) return nullptr;
- return s_manager->connect(url);
+ return s_manager->connect(url, use_ssl, verify_ssl, ca_location);
}
int publish(connection_ptr_t& conn,
#include <string>
#include <functional>
#include <boost/smart_ptr/intrusive_ptr.hpp>
+#include <boost/optional.hpp>
class CephContext;
void shutdown();
// connect to a kafka endpoint
-connection_ptr_t connect(const std::string& url);
+connection_ptr_t connect(const std::string& url, bool use_ssl, bool verify_ssl, boost::optional<const std::string&> ca_location);
// publish a message over a connection that was already created
int publish(connection_ptr_t& conn,
// disconnect from a kafka broker
bool disconnect(connection_ptr_t& conn);
+// display connection as string
+std::string to_string(const connection_ptr_t& conn);
+
}
std::string push_endpoint;
std::string push_endpoint_args;
std::string arn_topic;
+ bool stored_secret = false;
void encode(bufferlist& bl) const {
- ENCODE_START(3, 1, bl);
+ ENCODE_START(4, 1, bl);
encode(bucket_name, bl);
encode(oid_prefix, bl);
encode(push_endpoint, bl);
encode(push_endpoint_args, bl);
encode(arn_topic, bl);
+ encode(stored_secret, bl);
ENCODE_FINISH(bl);
}
void decode(bufferlist::const_iterator& bl) {
- DECODE_START(3, bl);
+ DECODE_START(4, bl);
decode(bucket_name, bl);
decode(oid_prefix, bl);
decode(push_endpoint, bl);
if (struct_v >= 3) {
decode(arn_topic, bl);
}
+ if (struct_v >= 4) {
+ decode(stored_secret, bl);
+ }
DECODE_FINISH(bl);
}
const std::string topic;
amqp::connection_ptr_t conn;
const std::string message;
- const ack_level_t ack_level; // TODO not used for now
+ [[maybe_unused]] const ack_level_t ack_level; // TODO not used for now
public:
AckPublishCR(CephContext* cct,
static const std::string AMQP_SCHEMA("amqp");
#endif // ifdef WITH_RADOSGW_AMQP_ENDPOINT
+
#ifdef WITH_RADOSGW_KAFKA_ENDPOINT
class RGWPubSubKafkaEndpoint : public RGWPubSubEndpoint {
private:
Broker,
};
CephContext* const cct;
- const std::string endpoint;
const std::string topic;
kafka::connection_ptr_t conn;
- ack_level_t ack_level;
- std::string str_ack_level;
+ const ack_level_t ack_level;
+
+ static bool get_verify_ssl(const RGWHTTPArgs& args) {
+ bool exists;
+ auto str_verify_ssl = args.get("verify-ssl", &exists);
+ if (!exists) {
+ // verify server certificate by default
+ return true;
+ }
+ boost::algorithm::to_lower(str_verify_ssl);
+ if (str_verify_ssl == "true") {
+ return true;
+ }
+ if (str_verify_ssl == "false") {
+ return false;
+ }
+ throw configuration_error("'verify-ssl' must be true/false, not: " + str_verify_ssl);
+ }
+
+ static bool get_use_ssl(const RGWHTTPArgs& args) {
+ bool exists;
+ auto str_use_ssl = args.get("use-ssl", &exists);
+ if (!exists) {
+ // by default ssl not used
+ return false;
+ }
+ boost::algorithm::to_lower(str_use_ssl);
+ if (str_use_ssl == "true") {
+ return true;
+ }
+ if (str_use_ssl == "false") {
+ return false;
+ }
+ throw configuration_error("'use-ssl' must be true/false, not: " + str_use_ssl);
+ }
+
+ static ack_level_t get_ack_level(const RGWHTTPArgs& args) {
+ bool exists;
+ // get ack level
+ const auto str_ack_level = args.get("kafka-ack-level", &exists);
+ if (!exists || str_ack_level == "broker") {
+ // "broker" is default
+ return ack_level_t::Broker;
+ }
+ if (str_ack_level == "none") {
+ return ack_level_t::None;
+ }
+ throw configuration_error("Kafka: invalid kafka-ack-level: " + str_ack_level);
+ }
// NoAckPublishCR implements async kafka publishing via coroutine
// This coroutine ends when it sends the message and does not wait for an ack
const RGWHTTPArgs& args,
CephContext* _cct) :
cct(_cct),
- endpoint(_endpoint),
topic(_topic),
- conn(kafka::connect(endpoint)) {
+ conn(kafka::connect(_endpoint, get_use_ssl(args), get_verify_ssl(args), args.get_optional("ca-location"))),
+ ack_level(get_ack_level(args)) {
if (!conn) {
- throw configuration_error("Kafka: failed to create connection to: " + endpoint);
- }
- bool exists;
- // get ack level
- str_ack_level = args.get("kafka-ack-level", &exists);
- if (!exists || str_ack_level == "broker") {
- // "broker" is default
- ack_level = ack_level_t::Broker;
- } else if (str_ack_level == "none") {
- ack_level = ack_level_t::None;
- } else {
- throw configuration_error("Kafka: invalid kafka-ack-level: " + str_ack_level);
+ throw configuration_error("Kafka: failed to create connection to: " + _endpoint);
}
}
std::string to_str() const override {
std::string str("Kafka Endpoint");
- str += "\nURI: " + endpoint;
+ str += kafka::to_string(conn);
str += "\nTopic: " + topic;
- str += "\nAck Level: " + str_ack_level;
return str;
}
};
#define dout_context g_ceph_context
#define dout_subsys ceph_subsys_rgw
+
// command (AWS compliant):
// POST
// Action=CreateTopic&Name=<topic-name>[&push-endpoint=<endpoint>[&<arg1>=<value1>]]
int get_params() override {
topic_name = s->info.args.get("Name");
if (topic_name.empty()) {
- ldout(s->cct, 1) << "CreateTopic Action 'Name' argument is missing" << dendl;
- return -EINVAL;
+ ldout(s->cct, 1) << "CreateTopic Action 'Name' argument is missing" << dendl;
+ return -EINVAL;
}
dest.push_endpoint = s->info.args.get("push-endpoint");
+
+ if (!validate_and_update_endpoint_secret(dest, s->cct, *(s->info.env))) {
+ return -EINVAL;
+ }
for (const auto param : s->info.args.get_params()) {
- if (param.first == "Action" || param.first == "Name" || param.first == "PayloadHash") {
- continue;
- }
- dest.push_endpoint_args.append(param.first+"="+param.second+"&");
+ if (param.first == "Action" || param.first == "Name" || param.first == "PayloadHash") {
+ continue;
+ }
+ dest.push_endpoint_args.append(param.first+"="+param.second+"&");
}
if (!dest.push_endpoint_args.empty()) {
- // remove last separator
- dest.push_endpoint_args.pop_back();
+ // remove last separator
+ dest.push_endpoint_args.pop_back();
}
// dest object only stores endpoint info
const auto topic_arn = rgw::ARN::parse((s->info.args.get("TopicArn")));
if (!topic_arn || topic_arn->resource.empty()) {
- ldout(s->cct, 1) << "DeleteTopic Action 'TopicArn' argument is missing or invalid" << dendl;
- return -EINVAL;
+ ldout(s->cct, 1) << "DeleteTopic Action 'TopicArn' argument is missing or invalid" << dendl;
+ return -EINVAL;
}
topic_name = topic_arn->resource;
// ctor and set are done according to the "type" argument
// if type is not "key" or "value" its a no-op
class Attribute {
- std::string key;
- std::string value;
+ std::string key;
+ std::string value;
public:
- Attribute(const std::string& type, const std::string& key_or_value) {
- set(type, key_or_value);
- }
- void set(const std::string& type, const std::string& key_or_value) {
- if (type == "key") {
- key = key_or_value;
- } else if (type == "value") {
- value = key_or_value;
- }
+ Attribute(const std::string& type, const std::string& key_or_value) {
+ set(type, key_or_value);
+ }
+ void set(const std::string& type, const std::string& key_or_value) {
+ if (type == "key") {
+ key = key_or_value;
+ } else if (type == "value") {
+ value = key_or_value;
}
- const std::string& get_key() const { return key; }
- const std::string& get_value() const { return value; }
+ }
+ const std::string& get_key() const { return key; }
+ const std::string& get_value() const { return value; }
};
using AttributeMap = std::map<unsigned, Attribute>;
if (store->getRados()->get_sync_module()) {
const auto psmodule = dynamic_cast<RGWPSSyncModuleInstance*>(store->getRados()->get_sync_module().get());
if (psmodule) {
- const auto& conf = psmodule->get_effective_conf();
- data_bucket_prefix = conf["data_bucket_prefix"];
- data_oid_prefix = conf["data_oid_prefix"];
- // TODO: allow "push-only" on PS zone as well
- push_only = false;
+ const auto& conf = psmodule->get_effective_conf();
+ data_bucket_prefix = conf["data_bucket_prefix"];
+ data_oid_prefix = conf["data_oid_prefix"];
+ // TODO: allow "push-only" on PS zone as well
+ push_only = false;
}
}
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
+#include "rgw_common.h"
#include "rgw_rest_pubsub_common.h"
#include "common/dout.h"
+#include "rgw_url.h"
#define dout_context g_ceph_context
#define dout_subsys ceph_subsys_rgw
+bool validate_and_update_endpoint_secret(rgw_pubsub_sub_dest& dest, CephContext *cct, const RGWEnv& env) {
+ if (dest.push_endpoint.empty()) {
+ return true;
+ }
+ std::string user;
+ std::string password;
+ if (!rgw::parse_url_userinfo(dest.push_endpoint, user, password)) {
+ ldout(cct, 1) << "endpoint validation error: malformed endpoint URL:" << dest.push_endpoint << dendl;
+ return false;
+ }
+ // this should be verified inside parse_url_userinfo()
+ ceph_assert(user.empty() == password.empty());
+ if (!user.empty()) {
+ dest.stored_secret = true;
+ if (!rgw_transport_is_secure(cct, env)) {
+ ldout(cct, 1) << "endpoint validation error: sending password over insecure transport" << dendl;
+ return false;
+ }
+ }
+ return true;
+}
+
+bool subscription_has_endpoint_secret(const rgw_pubsub_sub_config& sub) {
+ return sub.dest.stored_secret;
+}
+
+bool topic_has_endpoint_secret(const rgw_pubsub_topic_subs& topic) {
+ return topic.topic.dest.stored_secret;
+}
+
+bool topics_has_endpoint_secret(const rgw_pubsub_user_topics& topics) {
+ for (const auto& topic : topics.topics) {
+ if (topic_has_endpoint_secret(topic.second)) return true;
+ }
+ return false;
+}
void RGWPSCreateTopicOp::execute() {
op_ret = get_params();
if (op_ret < 0) {
void RGWPSListTopicsOp::execute() {
ups.emplace(store, s->owner.get_id());
op_ret = ups->get_user_topics(&result);
+ // if there are no topics it is not considered an error
+ op_ret = op_ret == -ENOENT ? 0 : op_ret;
if (op_ret < 0) {
ldout(s->cct, 1) << "failed to get topics, ret=" << op_ret << dendl;
return;
}
+ if (topics_has_endpoint_secret(result) && !rgw_transport_is_secure(s->cct, *(s->info.env))) {
+ ldout(s->cct, 1) << "topics contain secret and cannot be sent over insecure transport" << dendl;
+ op_ret = -EPERM;
+ return;
+ }
ldout(s->cct, 20) << "successfully got topics" << dendl;
}
}
ups.emplace(store, s->owner.get_id());
op_ret = ups->get_topic(topic_name, &result);
+ if (topic_has_endpoint_secret(result) && !rgw_transport_is_secure(s->cct, *(s->info.env))) {
+ ldout(s->cct, 1) << "topic '" << topic_name << "' contain secret and cannot be sent over insecure transport" << dendl;
+ op_ret = -EPERM;
+ return;
+ }
if (op_ret < 0) {
ldout(s->cct, 1) << "failed to get topic '" << topic_name << "', ret=" << op_ret << dendl;
return;
ups.emplace(store, s->owner.get_id());
auto sub = ups->get_sub(sub_name);
op_ret = sub->get_conf(&result);
+ if (subscription_has_endpoint_secret(result) && !rgw_transport_is_secure(s->cct, *(s->info.env))) {
+ ldout(s->cct, 1) << "subscription '" << sub_name << "' contain secret and cannot be sent over insecure transport" << dendl;
+ op_ret = -EPERM;
+ return;
+ }
if (op_ret < 0) {
ldout(s->cct, 1) << "failed to get subscription '" << sub_name << "', ret=" << op_ret << dendl;
return;
}
if (bucket_info.owner != s->owner.get_id()) {
- ldout(s->cct, 1) << "user doesn't own bucket, cannot get topic list" << dendl;
+ ldout(s->cct, 1) << "user doesn't own bucket, cannot get notification list" << dendl;
return -EPERM;
}
#include "rgw_op.h"
#include "rgw_pubsub.h"
+// make sure that endpoint is a valid URL
+// make sure that if user/password are passed inside URL, it is over secure connection
+// update rgw_pubsub_sub_dest to indicate that a password is stored in the URL
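+// e.g. (illustrative) a push-endpoint of "kafka://user:password@kafka.example.com:9093" will set stored_secret to true,
+// and will be rejected unless the request arrived over a secure transport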
+bool validate_and_update_endpoint_secret(rgw_pubsub_sub_dest& dest, CephContext *cct, const RGWEnv& env);
+
// create a topic
class RGWPSCreateTopicOp : public RGWDefaultResponseOp {
protected:
topic_name = s->object.name;
dest.push_endpoint = s->info.args.get("push-endpoint");
+
+ if (!validate_and_update_endpoint_secret(dest, s->cct, *(s->info.env))) {
+ return -EINVAL;
+ }
dest.push_endpoint_args = s->info.args.get_str();
// dest object only stores endpoint info
// bucket to store events/records will be set only when subscription is created
const auto& conf = psmodule->get_effective_conf();
dest.push_endpoint = s->info.args.get("push-endpoint");
+ if (!validate_and_update_endpoint_secret(dest, s->cct, *(s->info.env))) {
+ return -EINVAL;
+ }
+ dest.push_endpoint_args = s->info.args.get_str();
dest.bucket_name = string(conf["data_bucket_prefix"]) + s->owner.get_id().to_str() + "-" + topic_name;
dest.oid_prefix = string(conf["data_oid_prefix"]) + sub_name + "/";
- dest.push_endpoint_args = s->info.args.get_str();
dest.arn_topic = topic_name;
return 0;
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include <string>
+#include <regex>
+
+namespace rgw {
+
+namespace {
+ const auto USER_GROUP_IDX = 3;
+ const auto PASSWORD_GROUP_IDX = 4;
+ const auto HOST_GROUP_IDX = 5;
+
+ const std::string schema_re = "([[:alpha:]]+:\\/\\/)";
+ const std::string user_pass_re = "(([^:\\s]+):([^@\\s]+)@)?";
+ const std::string host_port_re = "([[:alnum:].:-]+)";
+}
+
+bool parse_url_authority(const std::string& url, std::string& host, std::string& user, std::string& password) {
+ const std::string re = schema_re + user_pass_re + host_port_re;
+ const std::regex url_regex(re, std::regex::icase);
+ std::smatch url_match_result;
+
+ if (std::regex_match(url, url_match_result, url_regex)) {
+ host = url_match_result[HOST_GROUP_IDX];
+ user = url_match_result[USER_GROUP_IDX];
+ password = url_match_result[PASSWORD_GROUP_IDX];
+ return true;
+ }
+
+ return false;
+}
+
+bool parse_url_userinfo(const std::string& url, std::string& user, std::string& password) {
+ const std::string re = schema_re + user_pass_re + host_port_re;
+ const std::regex url_regex(re);
+ std::smatch url_match_result;
+
+ if (std::regex_match(url, url_match_result, url_regex)) {
+ user = url_match_result[USER_GROUP_IDX];
+ password = url_match_result[PASSWORD_GROUP_IDX];
+ return true;
+ }
+
+ return false;
+}
+}
+
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#pragma once
+
+#include <string>
+namespace rgw {
+// parse a URL of the form: http|https|amqp|amqps|kafka://[user:password@]<host>[:port]
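+// e.g. "kafka://alice:secret@localhost:9094" yields host="localhost:9094", user="alice", password="secret" (illustrative)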
+bool parse_url_authority(const std::string& url, std::string& host, std::string& user, std::string& password);
+bool parse_url_userinfo(const std::string& url, std::string& user, std::string& password);
+}
+
target_link_libraries(unittest_rgw_kms ${rgw_libs})
+# unittest_rgw_url
+add_executable(unittest_rgw_url test_rgw_url.cc)
+add_ceph_unittest(unittest_rgw_url)
+
+target_link_libraries(unittest_rgw_url ${rgw_libs})
+
add_executable(ceph_test_rgw_gc_log test_rgw_gc_log.cc $<TARGET_OBJECTS:unit-main>)
target_link_libraries(ceph_test_rgw_gc_log ${rgw_libs} radostest-cxx)
install(TARGETS ceph_test_rgw_gc_log DESTINATION ${CMAKE_INSTALL_BINDIR})
calling_format = boto.s3.connection.OrdinaryCallingFormat())
return gateway.connection
+def get_gateway_secure_connection(gateway, credentials):
+ """ secure connect to the given gateway """
+ if gateway.ssl_port == 0:
+ return None
+ if gateway.secure_connection is None:
+ gateway.secure_connection = boto.connect_s3(
+ aws_access_key_id = credentials.access_key,
+ aws_secret_access_key = credentials.secret,
+ host = gateway.host,
+ port = gateway.ssl_port,
+ is_secure = True,
+ validate_certs=False,
+ calling_format = boto.s3.connection.OrdinaryCallingFormat())
+ return gateway.secure_connection
import json
-from .conn import get_gateway_connection
+from .conn import get_gateway_connection, get_gateway_secure_connection
class Cluster:
""" interface to run commands against a distinct ceph cluster """
""" interface to control a single radosgw instance """
__metaclass__ = ABCMeta
- def __init__(self, host = None, port = None, cluster = None, zone = None, proto = 'http', connection = None):
+ def __init__(self, host = None, port = None, cluster = None, zone = None, ssl_port = 0):
self.host = host
self.port = port
self.cluster = cluster
self.zone = zone
- self.proto = proto
- self.connection = connection
+ self.connection = None
+ self.secure_connection = None
+ self.ssl_port = ssl_port
@abstractmethod
def start(self, args = []):
pass
def endpoint(self):
- return '%s://%s:%d' % (self.proto, self.host, self.port)
+ return 'http://%s:%d' % (self.host, self.port)
class SystemObject:
""" interface for system objects, represented in json format and
if self.zone.gateways is not None:
self.conn = get_gateway_connection(self.zone.gateways[0], self.credentials)
+ self.secure_conn = get_gateway_secure_connection(self.zone.gateways[0], self.credentials)
def get_connection(self):
return self.conn
num_buckets = 0
run_prefix=''.join(random.choice(string.ascii_lowercase) for _ in range(6))
-def get_gateway_connection(gateway, credentials):
- """ connect to the given gateway """
- if gateway.connection is None:
- gateway.connection = boto.connect_s3(
- aws_access_key_id = credentials.access_key,
- aws_secret_access_key = credentials.secret,
- host = gateway.host,
- port = gateway.port,
- is_secure = False,
- calling_format = boto.s3.connection.OrdinaryCallingFormat())
- return gateway.connection
-
def get_zone_connection(zone, credentials):
""" connect to the zone's first gateway """
if isinstance(credentials, list):
class KafkaReceiver(object):
"""class for receiving and storing messages on a topic from the kafka broker"""
- def __init__(self, topic):
+ def __init__(self, topic, security_type):
from kafka import KafkaConsumer
remaining_retries = 10
+ port = 9092
+ if security_type != 'PLAINTEXT':
+ security_type = 'SSL'
+ port = 9093
while remaining_retries > 0:
try:
- self.consumer = KafkaConsumer(topic, bootstrap_servers = kafka_server)
+ self.consumer = KafkaConsumer(topic, bootstrap_servers = kafka_server+':'+str(port), security_protocol=security_type)
print('Kafka consumer created on topic: '+topic)
break
except Exception as error:
try:
log.info('Kafka receiver started')
print('Kafka receiver started')
- for msg in receiver.consumer:
- if receiver.stop:
- break
- receiver.events.append(json.loads(msg.value))
+ while not receiver.stop:
+ for msg in receiver.consumer:
+ receiver.events.append(json.loads(msg.value))
+ time.sleep(0.1)
log.info('Kafka receiver ended')
print('Kafka receiver ended')
except Exception as error:
print('Kafka receiver ended unexpectedly: ' + str(error))
-def create_kafka_receiver_thread(topic):
+def create_kafka_receiver_thread(topic, security_type='PLAINTEXT'):
"""create kafka receiver and thread"""
- receiver = KafkaReceiver(topic)
+ receiver = KafkaReceiver(topic, security_type)
task = threading.Thread(target=kafka_receiver_thread_runner, args=(receiver,))
task.daemon = True
return task, receiver
def stop_kafka_receiver(receiver, task):
"""stop the receiver thread and wait for it to finis"""
receiver.stop = True
- task.join(5)
+ task.join(1)
try:
receiver.consumer.close()
except Exception as error:
log.info('failed to gracefully stop Kafka receiver: %s', str(error))
+# follow the instruction here to create and sign a broker certificate:
+# https://github.com/edenhill/librdkafka/wiki/Using-SSL-with-librdkafka
+
+# the generated broker certificate should be stored in the java keystore for the use of the server
+# assuming the jks files were copied to $KAFKA_DIR and broker name is "localhost"
+# following lines must be added to $KAFKA_DIR/config/server.properties
+# listeners=PLAINTEXT://localhost:9092,SSL://localhost:9093,SASL_SSL://localhost:9094
+# sasl.enabled.mechanisms=PLAIN
+# ssl.keystore.location = $KAFKA_DIR/server.keystore.jks
+# ssl.keystore.password = abcdefgh
+# ssl.key.password = abcdefgh
+# ssl.truststore.location = $KAFKA_DIR/server.truststore.jks
+# ssl.truststore.password = abcdefgh
+
+# notes:
+# (1) we don't test client authentication, hence no need to generate client keys
+# (2) our client is not using the keystore, and the "rootCA.crt" file generated in the process above
+# should be copied to: $KAFKA_DIR
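+# (3) for SASL/PLAIN, the broker also needs a JAAS config, referenced below via KAFKA_OPTS; a minimal (illustrative)
+#     $KAFKA_DIR/config/kafka_server_jaas.conf, matching the alice/alice-secret credentials used in the SSL_SASL test, could be:
+#     KafkaServer {
+#       org.apache.kafka.common.security.plain.PlainLoginModule required
+#       username="admin"
+#       password="admin-secret"
+#       user_admin="admin-secret"
+#       user_alice="alice-secret";
+#     };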
+
def init_kafka():
""" start kafka/zookeeper """
- KAFKA_DIR = os.environ['KAFKA_DIR']
+ try:
+ KAFKA_DIR = os.environ['KAFKA_DIR']
+ except:
+ KAFKA_DIR = ''
+
if KAFKA_DIR == '':
log.info('KAFKA_DIR must be set to where kafka is installed')
print('KAFKA_DIR must be set to where kafka is installed')
print('Starting kafka...')
kafka_log = open('./kafka.log', 'w')
try:
+ kafka_env = os.environ.copy()
+ kafka_env['KAFKA_OPTS']='-Djava.security.auth.login.config='+KAFKA_DIR+'config/kafka_server_jaas.conf'
kafka_proc = subprocess.Popen([
KAFKA_DIR+'bin/kafka-server-start.sh',
KAFKA_DIR+'config/server.properties'],
- stdout=kafka_log)
+ stdout=kafka_log,
+ env=kafka_env)
except Exception as error:
log.info('failed to execute kafka: %s', str(error))
print('failed to execute kafka: %s' % str(error))
""" stop kafka/zookeeper """
try:
kafka_log.close()
+ print('Shutdown Kafka...')
kafka_proc.terminate()
+ time.sleep(5)
+ if kafka_proc.poll() is None:
+ print('Failed to shutdown Kafka... killing')
+ kafka_proc.kill()
+ print('Shutdown zookeeper...')
zk_proc.terminate()
+ time.sleep(5)
+ if zk_proc.poll() is None:
+ print('Failed to shutdown zookeeper... killing')
+ zk_proc.kill()
except:
log.info('kafka/zookeeper already terminated')
# delete the bucket
zones[0].delete_bucket(bucket_name)
+
def test_ps_s3_topic_on_master():
- """ test s3 notification set/get/delete on master """
+ """ test s3 topics set/get/delete on master """
zones, _ = init_env(require_ps=False)
realm = get_realm()
zonegroup = realm.master_zonegroup()
topic_name = bucket_name + TOPIC_SUFFIX
# clean all topics
- delete_all_s3_topics(zones[0].conn, zonegroup.name)
+ delete_all_s3_topics(zones[0], zonegroup.name)
# create s3 topics
endpoint_address = 'amqp://127.0.0.1:7001'
assert_equal(status, 404)
# get the remaining 2 topics
- result = topic_conf1.get_list()
- assert_equal(len(result['Topics']), 2)
+ result, status = topic_conf1.get_list()
+ assert_equal(status, 200)
+ assert_equal(len(result['ListTopicsResponse']['ListTopicsResult']['Topics']['member']), 2)
# delete topics
result = topic_conf2.del_config()
# assert_equal(status, 200)
# get topic list, make sure it is empty
- result = topic_conf1.get_list()
- assert_equal(len(result['Topics']), 0)
+ result, status = topic_conf1.get_list()
+ assert_equal(result['ListTopicsResponse']['ListTopicsResult']['Topics'], None)
+
+
+def test_ps_s3_topic_with_secret_on_master():
+ """ test s3 topics with secret set/get/delete on master """
+ zones, _ = init_env(require_ps=False)
+ if zones[0].secure_conn is None:
+ return SkipTest('secure connection is needed to test topic with secrets')
+
+ realm = get_realm()
+ zonegroup = realm.master_zonegroup()
+ bucket_name = gen_bucket_name()
+ topic_name = bucket_name + TOPIC_SUFFIX
+
+ # clean all topics
+ delete_all_s3_topics(zones[0], zonegroup.name)
+
+ # create s3 topics
+ endpoint_address = 'amqp://user:password@127.0.0.1:7001'
+ endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange=amqp.direct&amqp-ack-level=none'
+ bad_topic_conf = PSTopicS3(zones[0].conn, topic_name, zonegroup.name, endpoint_args=endpoint_args)
+ try:
+ result = bad_topic_conf.set_config()
+ except Exception as err:
+ print 'Error is expected: ' + str(err)
+ else:
+ assert False, 'user password configuration set allowed only over HTTPS'
+
+ topic_conf = PSTopicS3(zones[0].secure_conn, topic_name, zonegroup.name, endpoint_args=endpoint_args)
+ topic_arn = topic_conf.set_config()
+
+ assert_equal(topic_arn,
+ 'arn:aws:sns:' + zonegroup.name + ':' + get_tenant() + ':' + topic_name)
+
+ _, status = bad_topic_conf.get_config()
+ assert_equal(status/100, 4)
+
+ # get topic
+ result, status = topic_conf.get_config()
+ assert_equal(status, 200)
+ assert_equal(topic_arn, result['GetTopicResponse']['GetTopicResult']['Topic']['TopicArn'])
+ assert_equal(endpoint_address, result['GetTopicResponse']['GetTopicResult']['Topic']['EndPoint']['EndpointAddress'])
+
+ _, status = bad_topic_conf.get_config()
+ assert_equal(status/100, 4)
+
+ _, status = topic_conf.get_list()
+ assert_equal(status/100, 2)
+
+ # delete topics
+ result = topic_conf.del_config()
def test_ps_s3_notification_on_master():
clean_kafka(kafka_proc, zk_proc, kafka_log)
+def kafka_security(security_type):
+ """ test pushing kafka s3 notification on master """
+ if skip_push_tests:
+ return SkipTest("PubSub push tests don't run in teuthology")
+ zones, _ = init_env(require_ps=False)
+ if security_type == 'SSL_SASL' and zones[0].secure_conn is None:
+ return SkipTest("secure connection is needed to test SASL_SSL security")
+ kafka_proc, zk_proc, kafka_log = init_kafka()
+ if kafka_proc is None or zk_proc is None:
+ return SkipTest('end2end kafka tests require kafka/zookeeper installed')
+ realm = get_realm()
+ zonegroup = realm.master_zonegroup()
+
+ # create bucket
+ bucket_name = gen_bucket_name()
+ bucket = zones[0].create_bucket(bucket_name)
+ # name is constant for manual testing
+ topic_name = bucket_name+'_topic'
+ # create consumer on the topic
+ task, receiver = create_kafka_receiver_thread(topic_name)
+ task.start()
+
+ # create s3 topic
+ if security_type == 'SSL_SASL':
+ endpoint_address = 'kafka://alice:alice-secret@' + kafka_server + ':9094'
+ else:
+ # ssl only
+ endpoint_address = 'kafka://' + kafka_server + ':9093'
+
+ KAFKA_DIR = os.environ['KAFKA_DIR']
+
+ # without acks from broker, with root CA
+ endpoint_args = 'push-endpoint='+endpoint_address+'&kafka-ack-level=none&use-ssl=true&ca-location='+KAFKA_DIR+'rootCA.crt'
+
+ if security_type == 'SSL_SASL':
+ topic_conf = PSTopicS3(zones[0].secure_conn, topic_name, zonegroup.name, endpoint_args=endpoint_args)
+ else:
+ topic_conf = PSTopicS3(zones[0].conn, topic_name, zonegroup.name, endpoint_args=endpoint_args)
+
+ topic_arn = topic_conf.set_config()
+ # create s3 notification
+ notification_name = bucket_name + NOTIFICATION_SUFFIX
+ topic_conf_list = [{'Id': notification_name, 'TopicArn': topic_arn,
+ 'Events': []
+ }]
+
+ s3_notification_conf = PSNotificationS3(zones[0].conn, bucket_name, topic_conf_list)
+ s3_notification_conf.set_config()
+
+ # create objects in the bucket (async)
+ number_of_objects = 10
+ client_threads = []
+ start_time = time.time()
+ for i in range(number_of_objects):
+ key = bucket.new_key(str(i))
+ content = str(os.urandom(1024*1024))
+ thr = threading.Thread(target = set_contents_from_string, args=(key, content,))
+ thr.start()
+ client_threads.append(thr)
+ [thr.join() for thr in client_threads]
+
+ time_diff = time.time() - start_time
+ print 'average time for creation + kafka notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds'
+
+ try:
+ print 'wait for 5sec for the messages...'
+ time.sleep(5)
+ keys = list(bucket.list())
+ receiver.verify_s3_events(keys, exact_match=True)
+
+ # delete objects from the bucket
+ client_threads = []
+ start_time = time.time()
+ for key in bucket.list():
+ thr = threading.Thread(target = key.delete, args=())
+ thr.start()
+ client_threads.append(thr)
+ [thr.join() for thr in client_threads]
+
+ time_diff = time.time() - start_time
+ print 'average time for deletion + kafka notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds'
+
+ print 'wait for 5sec for the messages...'
+ time.sleep(5)
+ receiver.verify_s3_events(keys, exact_match=True, deletions=True)
+ except Exception as err:
+ assert False, str(err)
+ finally:
+ # cleanup
+ s3_notification_conf.del_config()
+ topic_conf.del_config()
+ # delete the bucket
+ for key in bucket.list():
+ key.delete()
+ zones[0].delete_bucket(bucket_name)
+ stop_kafka_receiver(receiver, task)
+ clean_kafka(kafka_proc, zk_proc, kafka_log)
+
+
+def test_ps_s3_notification_push_kafka_security_ssl():
+ kafka_security('SSL')
+
+def test_ps_s3_notification_push_kafka_security_ssl_sasl():
+ kafka_security('SSL_SASL')
+
+
def test_ps_s3_notification_push_http_on_master():
""" test pushing http s3 notification on master """
if skip_push_tests:
import logging
import httplib
+import ssl
import urllib
import urlparse
import hmac
return self.send_request('GET', get_list=True)
-def delete_all_s3_topics(conn, region):
+def delete_all_s3_topics(zone, region):
try:
+ conn = zone.secure_conn if zone.secure_conn is not None else zone.conn
+ protocol = 'https' if conn.is_secure else 'http'
client = boto3.client('sns',
- endpoint_url='http://'+conn.host+':'+str(conn.port),
- aws_access_key_id=conn.aws_access_key_id,
- aws_secret_access_key=conn.aws_secret_access_key,
- region_name=region,
- config=Config(signature_version='s3'))
+ endpoint_url=protocol+'://'+conn.host+':'+str(conn.port),
+ aws_access_key_id=conn.aws_access_key_id,
+ aws_secret_access_key=conn.aws_secret_access_key,
+ region_name=region,
+ verify='./cert.pem',
+ config=Config(signature_version='s3'))
topics = client.list_topics()['Topics']
for topic in topics:
print('topic cleanup, deleting: ' + topic['TopicArn'])
assert client.delete_topic(TopicArn=topic['TopicArn'])['ResponseMetadata']['HTTPStatusCode'] == 200
- except:
- print('failed to do topic cleanup. if there are topics '
- 'they may need to be manually deleted')
+ except Exception as err:
+ print 'failed to do topic cleanup: ' + str(err)
class PSTopicS3:
self.attributes = {}
if endpoint_args is not None:
self.attributes = {nvp[0] : nvp[1] for nvp in urlparse.parse_qsl(endpoint_args, keep_blank_values=True)}
- self.client = boto3.client('sns',
- endpoint_url='http://'+conn.host+':'+str(conn.port),
- aws_access_key_id=conn.aws_access_key_id,
- aws_secret_access_key=conn.aws_secret_access_key,
- region_name=region,
- config=Config(signature_version='s3'))
+ protocol = 'https' if conn.is_secure else 'http'
+ self.client = boto3.client('sns',
+ endpoint_url=protocol+'://'+conn.host+':'+str(conn.port),
+ aws_access_key_id=conn.aws_access_key_id,
+ aws_secret_access_key=conn.aws_secret_access_key,
+ region_name=region,
+ verify='./cert.pem',
+ config=Config(signature_version='s3'))
def get_config(self):
'Date': string_date,
'Host': self.conn.host+':'+str(self.conn.port),
'Content-Type': content_type}
- http_conn = httplib.HTTPConnection(self.conn.host, self.conn.port)
- if log.getEffectiveLevel() <= 10:
- http_conn.set_debuglevel(5)
+ if self.conn.is_secure:
+ http_conn = httplib.HTTPSConnection(self.conn.host, self.conn.port,
+ context=ssl.create_default_context(cafile='./cert.pem'))
+ else:
+ http_conn = httplib.HTTPConnection(self.conn.host, self.conn.port)
http_conn.request(method, resource, body, headers)
response = http_conn.getresponse()
data = response.read()
def get_list(self):
"""list all topics"""
- return self.client.list_topics()
+ # note that boto3 supports list_topics(), however, the result only show ARNs
+ parameters = {'Action': 'ListTopics'}
+ body = urllib.urlencode(parameters)
+ string_date = strftime("%a, %d %b %Y %H:%M:%S +0000", gmtime())
+ content_type = 'application/x-www-form-urlencoded; charset=utf-8'
+ resource = '/'
+ method = 'POST'
+ string_to_sign = method + '\n\n' + content_type + '\n' + string_date + '\n' + resource
+ log.debug('StringTosign: %s', string_to_sign)
+ signature = base64.b64encode(hmac.new(self.conn.aws_secret_access_key,
+ string_to_sign.encode('utf-8'),
+ hashlib.sha1).digest())
+ headers = {'Authorization': 'AWS '+self.conn.aws_access_key_id+':'+signature,
+ 'Date': string_date,
+ 'Host': self.conn.host+':'+str(self.conn.port),
+ 'Content-Type': content_type}
+ if self.conn.is_secure:
+ http_conn = httplib.HTTPSConnection(self.conn.host, self.conn.port,
+ context=ssl.create_default_context(cafile='./cert.pem'))
+ else:
+ http_conn = httplib.HTTPConnection(self.conn.host, self.conn.port)
+ http_conn.request(method, resource, body, headers)
+ response = http_conn.getresponse()
+ data = response.read()
+ status = response.status
+ http_conn.close()
+ dict_response = xmltodict.parse(data)
+ return dict_response, status
class PSNotification:
# e.g. RGW_FRONTEND=civetweb
# to run test under valgrind memcheck, set RGW_VALGRIND to 'yes'
# e.g. RGW_VALGRIND=yes
- cmd = [mstart_path + 'mrgw.sh', self.cluster.cluster_id, str(self.port)]
+ cmd = [mstart_path + 'mrgw.sh', self.cluster.cluster_id, str(self.port), str(self.ssl_port)]
if self.id:
cmd += ['-i', self.id]
cmd += ['--debug-rgw=20', '--debug-ms=1']
'checkpoint_retries': 60,
'checkpoint_delay': 5,
'reconfigure_delay': 5,
+ 'use_ssl': 'false',
})
try:
path = os.environ['RGW_MULTI_TEST_CONF']
parser.add_argument('--checkpoint-delay', type=int, default=cfg.getint(section, 'checkpoint_delay'))
parser.add_argument('--reconfigure-delay', type=int, default=cfg.getint(section, 'reconfigure_delay'))
parser.add_argument('--num-ps-zones', type=int, default=cfg.getint(section, 'num_ps_zones'))
+ parser.add_argument('--use-ssl', type=bool, default=cfg.getboolean(section, 'use_ssl'))
es_cfg = []
num_az_zones = cfg.getint(section, 'num_az_zones')
num_ps_zones = args.num_ps_zones if num_ps_zones_from_conf == 0 else num_ps_zones_from_conf
- print('num_ps_zones = ' + str(num_ps_zones))
num_zones = args.num_zones + num_es_zones + num_cloud_zones + num_ps_zones + num_az_zones
+ use_ssl = cfg.getboolean(section, 'use_ssl')
+
+ if use_ssl and bootstrap:
+ cmd = ['openssl', 'req',
+ '-x509',
+ '-newkey', 'rsa:4096',
+ '-sha256',
+ '-nodes',
+ '-keyout', 'key.pem',
+ '-out', 'cert.pem',
+ '-subj', '/CN=localhost',
+ '-days', '3650']
+ bash(cmd)
+ # append key to cert
+ fkey = open('./key.pem', 'r')
+ if fkey.mode == 'r':
+ fcert = open('./cert.pem', 'a')
+ fcert.write(fkey.read())
+ fcert.close()
+ fkey.close()
+
for zg in range(0, args.num_zonegroups):
zonegroup = multisite.ZoneGroup(zonegroup_name(zg), period)
period.zonegroups.append(zonegroup)
if bootstrap:
period.update(zone, commit=True)
+ ssl_port_offset = 1000
# start the gateways
for g in range(0, args.gateways_per_zone):
port = gateway_port(zg, g + z * args.gateways_per_zone)
client_id = gateway_name(zg, z, g)
- gateway = Gateway(client_id, 'localhost', port, cluster, zone)
+ gateway = Gateway(client_id, 'localhost', port, cluster, zone,
+ ssl_port = port+ssl_port_offset if use_ssl else 0)
if bootstrap:
gateway.start()
zone.gateways.append(gateway)
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "rgw/rgw_url.h"
+#include <string>
+#include <gtest/gtest.h>
+
+using namespace rgw;
+
+TEST(TestURL, SimpleAuthority)
+{
+ std::string host;
+ std::string user;
+ std::string password;
+ const std::string url = "http://example.com";
+ ASSERT_TRUE(parse_url_authority(url, host, user, password));
+ ASSERT_TRUE(user.empty());
+ ASSERT_TRUE(password.empty());
+ EXPECT_STREQ(host.c_str(), "example.com");
+}
+
+TEST(TestURL, IPAuthority)
+{
+ std::string host;
+ std::string user;
+ std::string password;
+ const std::string url = "http://1.2.3.4";
+ ASSERT_TRUE(parse_url_authority(url, host, user, password));
+ ASSERT_TRUE(user.empty());
+ ASSERT_TRUE(password.empty());
+ EXPECT_STREQ(host.c_str(), "1.2.3.4");
+}
+
+TEST(TestURL, IPv6Authority)
+{
+ std::string host;
+ std::string user;
+ std::string password;
+ const std::string url = "http://FE80:CD00:0000:0CDE:1257:0000:211E:729C";
+ ASSERT_TRUE(parse_url_authority(url, host, user, password));
+ ASSERT_TRUE(user.empty());
+ ASSERT_TRUE(password.empty());
+ EXPECT_STREQ(host.c_str(), "FE80:CD00:0000:0CDE:1257:0000:211E:729C");
+}
+
+TEST(TestURL, AuthorityWithUserinfo)
+{
+ std::string host;
+ std::string user;
+ std::string password;
+ const std::string url = "http://user:password@example.com";
+ ASSERT_TRUE(parse_url_authority(url, host, user, password));
+ EXPECT_STREQ(host.c_str(), "example.com");
+ EXPECT_STREQ(user.c_str(), "user");
+ EXPECT_STREQ(password.c_str(), "password");
+}
+
+TEST(TestURL, AuthorityWithPort)
+{
+ std::string host;
+ std::string user;
+ std::string password;
+ const std::string url = "http://user:password@example.com:1234";
+ ASSERT_TRUE(parse_url_authority(url, host, user, password));
+ EXPECT_STREQ(host.c_str(), "example.com:1234");
+ EXPECT_STREQ(user.c_str(), "user");
+ EXPECT_STREQ(password.c_str(), "password");
+}
+
+TEST(TestURL, DifferentSchema)
+{
+ std::string host;
+ std::string user;
+ std::string password;
+ const std::string url = "kafka://example.com";
+ ASSERT_TRUE(parse_url_authority(url, host, user, password));
+ ASSERT_TRUE(user.empty());
+ ASSERT_TRUE(password.empty());
+ EXPECT_STREQ(host.c_str(), "example.com");
+}
+
+TEST(TestURL, InvalidHost)
+{
+ std::string host;
+ std::string user;
+ std::string password;
+ const std::string url = "http://exa_mple.com";
+ ASSERT_FALSE(parse_url_authority(url, host, user, password));
+}
+