From 529d5c63623a02c05ee70e9e8658b9ade05469ff Mon Sep 17 00:00:00 2001
From: Yuval Lifshitz
Date: Sun, 3 Nov 2019 20:58:58 +0200
Subject: [PATCH] rgw/pubsub: add ssl+sasl security to kafka

Signed-off-by: Yuval Lifshitz
---
 doc/radosgw/notifications.rst          |  16 +-
 doc/radosgw/pubsub-module.rst          |  38 +++-
 src/mrgw.sh                            |  44 +++--
 src/rgw/CMakeLists.txt                 |   3 +-
 src/rgw/rgw_kafka.cc                   | 166 +++++++++++-------
 src/rgw/rgw_kafka.h                    |   6 +-
 src/rgw/rgw_pubsub.h                   |   9 +-
 src/rgw/rgw_pubsub_push.cc             |  75 +++++---
 src/rgw/rgw_rest_pubsub.cc             |  61 ++++---
 src/rgw/rgw_rest_pubsub_common.cc      |  57 +++++-
 src/rgw/rgw_rest_pubsub_common.h       |   5 +
 src/rgw/rgw_sync_module_pubsub_rest.cc |   9 +-
 src/rgw/rgw_url.cc                     |  48 +++++
 src/rgw/rgw_url.h                      |  12 ++
 src/test/rgw/CMakeLists.txt            |   6 +
 src/test/rgw/rgw_multi/conn.py         |  14 ++
 src/test/rgw/rgw_multi/multisite.py    |  12 +-
 src/test/rgw/rgw_multi/tests.py        |  12 --
 src/test/rgw/rgw_multi/tests_ps.py     | 232 +++++++++++++++++++++++--
 src/test/rgw/rgw_multi/zone_ps.py      |  72 ++++++--
 src/test/rgw/test_multi.py             |  30 +++-
 src/test/rgw/test_rgw_url.cc           |  90 ++++++++++
 22 files changed, 818 insertions(+), 199 deletions(-)
 create mode 100644 src/rgw/rgw_url.cc
 create mode 100644 src/rgw/rgw_url.h
 create mode 100644 src/test/rgw/test_rgw_url.cc

diff --git a/doc/radosgw/notifications.rst b/doc/radosgw/notifications.rst
index dd6762e10a569..43bcced787c7b 100644
--- a/doc/radosgw/notifications.rst
+++ b/doc/radosgw/notifications.rst
@@ -68,8 +68,10 @@ To update a topic, use the same command used for topic creation, with the topic
    &push-endpoint=<endpoint>
    [&Attributes.entry.1.key=amqp-exchange&Attributes.entry.1.value=<exchange>]
    [&Attributes.entry.2.key=amqp-ack-level&Attributes.entry.2.value=none|broker]
-   [&Attributes.entry.3.key=verify-sll&Attributes.entry.3.value=true|false]
+   [&Attributes.entry.3.key=verify-ssl&Attributes.entry.3.value=true|false]
    [&Attributes.entry.4.key=kafka-ack-level&Attributes.entry.4.value=none|broker]
+   [&Attributes.entry.5.key=use-ssl&Attributes.entry.5.value=true|false]
+   [&Attributes.entry.6.key=ca-location&Attributes.entry.6.value=<file path>]
 
 Request parameters:
 
@@ -83,7 +85,8 @@ Request parameters:
 
 - AMQP0.9.1 endpoint
 
  - URI: ``amqp://[<user>:<password>@]<fqdn>[:<port>][/<vhost>]``
- - user/password defaults to : guest/guest
+ - user/password defaults to: guest/guest
+ - user/password may only be provided over HTTPS. Otherwise the topic creation request will be rejected
  - port defaults to: 5672
  - vhost defaults to: "/"
  - amqp-exchange: the exchanges must exist and be able to route messages based on topics (mandatory parameter for AMQP0.9.1)
@@ -94,7 +97,11 @@ Request parameters:
 
 - Kafka endpoint
 
- - URI: ``kafka://<fqdn>[:<port>]``
+ - URI: ``kafka://[<user>:<password>@]<fqdn>[:<port>]``
+ - if ``use-ssl`` is set to "true", a secure connection will be used for connecting with the broker ("false" by default)
+ - if ``ca-location`` is provided, and a secure connection is used, the specified CA will be used, instead of the default one, to authenticate the broker
+ - user/password may only be provided over HTTPS. Otherwise the topic creation request will be rejected
+ - user/password may only be provided together with ``use-ssl``. Otherwise the connection to the broker would fail
 - port defaults to: 9092
@@ ... @@
+- if the endpoint URL of any of the topics contains user/password information, the request must be made over HTTPS. Otherwise the topic list request will be rejected
+
 Notifications
 ~~~~~~~~~~~~~
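As an illustration of the attributes documented above, here is a minimal boto3 sketch of such a topic-creation request (the RGW endpoint, credentials, region name and CA paths are hypothetical; the RGW endpoint must be HTTPS because the push endpoint carries a user/password)::

    import boto3
    from botocore.client import Config

    # hypothetical HTTPS RGW endpoint and credentials
    client = boto3.client('sns',
                          endpoint_url='https://localhost:8443',
                          aws_access_key_id='access',
                          aws_secret_access_key='secret',
                          region_name='default',
                          verify='./cert.pem',
                          config=Config(signature_version='s3'))

    # kafka push endpoint secured with SSL+SASL, using the new attributes
    client.create_topic(Name='mytopic', Attributes={
        'push-endpoint': 'kafka://alice:alice-secret@localhost:9094',
        'use-ssl': 'true',
        'ca-location': '/etc/pki/rootCA.crt',  # hypothetical path
        'kafka-ack-level': 'broker',
    })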
diff --git a/doc/radosgw/pubsub-module.rst b/doc/radosgw/pubsub-module.rst
index 32ff8bf7448a1..a727ad72bcc82 100644
--- a/doc/radosgw/pubsub-module.rst
+++ b/doc/radosgw/pubsub-module.rst
@@ -150,7 +150,7 @@ To update a topic, use the same command used for topic creation, with the topic
 
 ::
 
-   PUT /topics/<topic-name>[?push-endpoint=<endpoint>[&amqp-exchange=<exchange>][&amqp-ack-level=none|broker][&verify-ssl=true|false][&kafka-ack-level=none|broker]]
+   PUT /topics/<topic-name>[?push-endpoint=<endpoint>[&amqp-exchange=<exchange>][&amqp-ack-level=none|broker][&verify-ssl=true|false][&kafka-ack-level=none|broker][&use-ssl=true|false][&ca-location=<file path>]]
 
 Request parameters:
 
@@ -167,7 +167,8 @@ The endpoint URI may include parameters depending on the type of endpoint:
 
 - AMQP0.9.1 endpoint
 
  - URI: ``amqp://[<user>:<password>@]<fqdn>[:<port>][/<vhost>]``
- - user/password defaults to : guest/guest
+ - user/password defaults to: guest/guest
+ - user/password may only be provided over HTTPS. Otherwise the topic creation request will be rejected
  - port defaults to: 5672
  - vhost defaults to: "/"
  - amqp-exchange: the exchanges must exist and be able to route messages based on topics (mandatory parameter for AMQP0.9.1)
@@ -178,7 +179,11 @@ The endpoint URI may include parameters depending on the type of endpoint:
 
 - Kafka endpoint
 
- - URI: ``kafka://<fqdn>[:<port>]``
+ - URI: ``kafka://[<user>:<password>@]<fqdn>[:<port>]``
+ - if ``use-ssl`` is set to "true", a secure connection will be used for connecting with the broker ("false" by default)
+ - if ``ca-location`` is provided, and a secure connection is used, the specified CA will be used, instead of the default one, to authenticate the broker
+ - user/password may only be provided over HTTPS. Otherwise the topic creation request will be rejected
+ - user/password may only be provided together with ``use-ssl``. Otherwise the connection to the broker would fail
 - port defaults to: 9092
@@ ... @@
 
-   PUT /subscriptions/<sub-name>?topic=<topic-name>[?push-endpoint=<endpoint>[&amqp-exchange=<exchange>][&amqp-ack-level=none|broker][&verify-ssl=true|false][&kafka-ack-level=none|broker]]
+   PUT /subscriptions/<sub-name>?topic=<topic-name>[?push-endpoint=<endpoint>[&amqp-exchange=<exchange>][&amqp-ack-level=none|broker][&verify-ssl=true|false][&kafka-ack-level=none|broker][&ca-location=<file path>]]
 
 Request parameters:
 
@@ -363,7 +374,10 @@ The endpoint URI may include parameters depending on the type of endpoint:
 
 - Kafka endpoint
 
- - URI: ``kafka://<fqdn>[:<port>]``
+ - URI: ``kafka://[<user>:<password>@]<fqdn>[:<port>]``
+ - if ``ca-location`` is provided, a secure connection will be used for connecting with the broker
+ - user/password may only be provided over HTTPS. Otherwise the subscription creation request will be rejected
 - port defaults to: 9092
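The pubsub-module REST API above takes the same parameters as plain query arguments. A condensed, hedged sketch of a signed topic-creation request over HTTPS, mirroring the V2-signing logic used by zone_ps.py later in this patch (host, port, keys and the push endpoint are hypothetical)::

    import base64, hashlib, hmac, httplib, ssl, urllib
    from time import gmtime, strftime

    access_key, secret_key = 'access', 'secret'
    host, port = 'localhost', 8443
    resource = '/topics/mytopic'
    # push endpoint with user/password - only accepted over HTTPS
    query = urllib.urlencode({'push-endpoint': 'kafka://alice:alice-secret@localhost:9094',
                              'use-ssl': 'true'})
    string_date = strftime("%a, %d %b %Y %H:%M:%S +0000", gmtime())
    string_to_sign = 'PUT\n\n\n' + string_date + '\n' + resource
    signature = base64.b64encode(hmac.new(secret_key, string_to_sign.encode('utf-8'),
                                          hashlib.sha1).digest())
    headers = {'Authorization': 'AWS ' + access_key + ':' + signature,
               'Date': string_date,
               'Host': host + ':' + str(port)}
    conn = httplib.HTTPSConnection(host, port,
                                   context=ssl.create_default_context(cafile='./cert.pem'))
    conn.request('PUT', resource + '?' + query, None, headers)
    print(conn.getresponse().status)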
-z "$RGW_FRONTEND_THREADS" ]; then +if [ -n "$RGW_FRONTEND_THREADS" ]; then set_frontend_threads="num_threads=$RGW_FRONTEND_THREADS" fi -shift 2 +shift 3 run_root=$script_root/run/$name pidfile=$run_root/out/radosgw.${port}.pid asokfile=$run_root/out/radosgw.${port}.asok logfile=$run_root/out/radosgw.${port}.log -$vstart_path/mstop.sh $name radosgw $port +"$vstart_path"/mstop.sh "$name" radosgw "$port" -$vstart_path/mrun $name ceph -c $run_root/ceph.conf \ - -k $run_root/keyring auth get-or-create client.rgw.$port mon \ - 'allow rw' osd 'allow rwx' mgr 'allow rw' >> $run_root/keyring +"$vstart_path"/mrun "$name" ceph -c "$run_root"/ceph.conf \ + -k "$run_root"/keyring auth get-or-create client.rgw."$port" mon \ + 'allow rw' osd 'allow rwx' mgr 'allow rw' >> "$run_root"/keyring -$vstart_path/mrun $name radosgw --rgw-frontends="$rgw_frontend port=$port $set_frontend_threads" \ - -n client.rgw.$port --pid-file=$pidfile \ - --admin-socket=$asokfile "$@" --log-file=$logfile +"$vstart_path"/mrun "$name" radosgw --rgw-frontends="$rgw_frontend $port_param $set_frontend_threads $cert_param" \ + -n client.rgw."$port" --pid-file="$pidfile" \ + --admin-socket="$asokfile" "$@" --log-file="$logfile" diff --git a/src/rgw/CMakeLists.txt b/src/rgw/CMakeLists.txt index d36d57adad604..1c95aed660d7e 100644 --- a/src/rgw/CMakeLists.txt +++ b/src/rgw/CMakeLists.txt @@ -142,7 +142,8 @@ set(librgw_common_srcs rgw_perf_counters.cc rgw_rest_iam.cc rgw_object_lock.cc - rgw_kms.cc) + rgw_kms.cc + rgw_url.cc) if(WITH_RADOSGW_AMQP_ENDPOINT) list(APPEND librgw_common_srcs rgw_amqp.cc) diff --git a/src/rgw/rgw_kafka.cc b/src/rgw/rgw_kafka.cc index 4f7751ae6c6b3..dfaefdfb27071 100644 --- a/src/rgw/rgw_kafka.cc +++ b/src/rgw/rgw_kafka.cc @@ -1,13 +1,12 @@ // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- // vim: ts=8 sw=2 smarttab ft=cpp -//#include "include/compat.h" #include "rgw_kafka.h" +#include "rgw_url.h" #include #include "include/ceph_assert.h" #include #include -#include #include #include #include @@ -32,17 +31,14 @@ bool operator==(const rd_kafka_topic_t* rkt, const std::string& name) { namespace rgw::kafka { // status codes for publishing +// TODO: use the actual error code (when conn exists) instead of STATUS_CONNECTION_CLOSED when replying to client static const int STATUS_CONNECTION_CLOSED = -0x1002; static const int STATUS_QUEUE_FULL = -0x1003; static const int STATUS_MAX_INFLIGHT = -0x1004; static const int STATUS_MANAGER_STOPPED = -0x1005; // status code for connection opening -static const int STATUS_CONF_ALLOC_FAILED = -0x2001; -static const int STATUS_GET_BROKER_LIST_FAILED = -0x2002; -static const int STATUS_CREATE_PRODUCER_FAILED = -0x2003; +static const int STATUS_CONF_ALLOC_FAILED = -0x2001; -static const int STATUS_CREATE_TOPIC_FAILED = -0x3008; -static const int NO_REPLY_CODE = 0x0; static const int STATUS_OK = 0x0; // struct for holding the callback and its tag in the callback list @@ -70,8 +66,14 @@ struct connection_t { uint64_t delivery_tag = 1; int status; mutable std::atomic ref_count = 0; - CephContext* cct = nullptr; + CephContext* const cct; CallbackList callbacks; + const std::string broker; + const bool use_ssl; + const bool verify_ssl; // TODO currently iognored, not supported in librdkafka v0.11.6 + const boost::optional ca_location; + const std::string user; + const std::string password; // cleanup of all internal connection resource // the object can still remain, and internal connection @@ -102,6 +104,12 @@ struct connection_t { return (producer != nullptr && 
diff --git a/src/rgw/rgw_kafka.cc b/src/rgw/rgw_kafka.cc
index 4f7751ae6c6b3..dfaefdfb27071 100644
--- a/src/rgw/rgw_kafka.cc
+++ b/src/rgw/rgw_kafka.cc
@@ -1,13 +1,12 @@
 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
 // vim: ts=8 sw=2 smarttab ft=cpp
 
-//#include "include/compat.h"
 #include "rgw_kafka.h"
+#include "rgw_url.h"
 #include <librdkafka/rdkafka.h>
 #include "include/ceph_assert.h"
 #include <sstream>
 #include <cstring>
-#include <regex>
 #include <unordered_map>
 #include <string>
 #include <vector>
@@ -32,17 +31,14 @@ bool operator==(const rd_kafka_topic_t* rkt, const std::string& name) {
 namespace rgw::kafka {
 
 // status codes for publishing
+// TODO: use the actual error code (when conn exists) instead of STATUS_CONNECTION_CLOSED when replying to client
 static const int STATUS_CONNECTION_CLOSED      = -0x1002;
 static const int STATUS_QUEUE_FULL             = -0x1003;
 static const int STATUS_MAX_INFLIGHT           = -0x1004;
 static const int STATUS_MANAGER_STOPPED        = -0x1005;
 // status code for connection opening
-static const int STATUS_CONF_ALLOC_FAILED      = -0x2001;
-static const int STATUS_GET_BROKER_LIST_FAILED = -0x2002;
-static const int STATUS_CREATE_PRODUCER_FAILED = -0x2003;
+static const int STATUS_CONF_ALLOC_FAILED      = -0x2001;
 
-static const int STATUS_CREATE_TOPIC_FAILED    = -0x3008;
-static const int NO_REPLY_CODE                 = 0x0;
 static const int STATUS_OK                     = 0x0;
 
 // struct for holding the callback and its tag in the callback list
@@ -70,8 +66,14 @@ struct connection_t {
   uint64_t delivery_tag = 1;
   int status;
   mutable std::atomic<int> ref_count = 0;
-  CephContext* cct = nullptr;
+  CephContext* const cct;
   CallbackList callbacks;
+  const std::string broker;
+  const bool use_ssl;
+  const bool verify_ssl; // TODO currently ignored, not supported in librdkafka v0.11.6
+  const boost::optional<const std::string> ca_location;
+  const std::string user;
+  const std::string password;
 
   // cleanup of all internal connection resources
   // the object can still remain, and internal connection
@@ -102,6 +104,12 @@ struct connection_t {
     return (producer != nullptr && !marked_for_deletion);
   }
 
+  // ctor for setting immutable values
+  connection_t(CephContext* _cct, const std::string& _broker, bool _use_ssl, bool _verify_ssl,
+      const boost::optional<const std::string&>& _ca_location,
+      const std::string& _user, const std::string& _password) :
+    cct(_cct), broker(_broker), use_ssl(_use_ssl), verify_ssl(_verify_ssl), ca_location(_ca_location), user(_user), password(_password) {}
+
   // dtor also destroys the internals
   ~connection_t() {
     destroy(STATUS_CONNECTION_CLOSED);
@@ -111,6 +119,13 @@ struct connection_t {
   friend void intrusive_ptr_release(const connection_t* p);
 };
 
+std::string to_string(const connection_ptr_t& conn) {
+    std::string str;
+    str += "\nBroker: " + conn->broker;
+    str += conn->use_ssl ? "\nUse SSL" : "";
+    str += conn->ca_location ? "\nCA Location: " + *(conn->ca_location) : "";
+    return str;
+}
 // these are required interfaces so that connection_t could be used inside boost::intrusive_ptr
 void intrusive_ptr_add_ref(const connection_t* p) {
   ++p->ref_count;
@@ -124,6 +139,8 @@ void intrusive_ptr_release(const connection_t* p) {
 // convert int status to string - including RGW specific values
 std::string status_to_string(int s) {
   switch (s) {
+    case STATUS_OK:
+      return "STATUS_OK";
     case STATUS_CONNECTION_CLOSED:
       return "RGW_KAFKA_STATUS_CONNECTION_CLOSED";
     case STATUS_QUEUE_FULL:
@@ -134,13 +151,8 @@ std::string status_to_string(int s) {
       return "RGW_KAFKA_STATUS_MANAGER_STOPPED";
     case STATUS_CONF_ALLOC_FAILED:
       return "RGW_KAFKA_STATUS_CONF_ALLOC_FAILED";
-    case STATUS_CREATE_PRODUCER_FAILED:
-      return "STATUS_CREATE_PRODUCER_FAILED";
-    case STATUS_CREATE_TOPIC_FAILED:
-      return "STATUS_CREATE_TOPIC_FAILED";
   }
-  // TODO: how to handle "s" in this case?
-  return std::string(rd_kafka_err2str(rd_kafka_last_error()));
+  return std::string(rd_kafka_err2str((rd_kafka_resp_err_t)s));
 }
 
 void message_callback(rd_kafka_t* rk, const rd_kafka_message_t* rkmessage, void* opaque) {
@@ -160,7 +172,7 @@ void message_callback(rd_kafka_t* rk, const rd_kafka_message_t* rkmessage, void*
     const auto tag_it = std::find(callbacks_begin, callbacks_end, *tag);
     if (tag_it != callbacks_end) {
       ldout(conn->cct, 20) << "Kafka run: n/ack received, invoking callback with tag=" <<
-        *tag << " and result=" << result << dendl;
+        *tag << " and result=" << rd_kafka_err2str(result) << dendl;
       tag_it->cb(result);
       conn->callbacks.erase(tag_it);
     } else {
@@ -173,14 +185,13 @@ void message_callback(rd_kafka_t* rk, const rd_kafka_message_t* rkmessage, void*
 }
 
 // utility function to create a connection, when the connection object already exists
-connection_ptr_t& create_connection(connection_ptr_t& conn, const std::string& broker) {
+connection_ptr_t& create_connection(connection_ptr_t& conn) {
   // pointer must be valid and not marked for deletion
   ceph_assert(conn && !conn->marked_for_deletion);
 
   // reset all status codes
   conn->status = STATUS_OK;
-
-  char errstr[512];
+  char errstr[512] = {0};
 
   conn->temp_conf = rd_kafka_conf_new();
   if (!conn->temp_conf) {
@@ -189,38 +200,68 @@ connection_ptr_t& create_connection(connection_ptr_t& conn, const std::string& b
   }
 
   // get list of brokers based on the bootstrap broker
-  if (rd_kafka_conf_set(conn->temp_conf, "bootstrap.servers", broker.c_str(), errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {
-    conn->status = STATUS_GET_BROKER_LIST_FAILED;
-    // TODO: use errstr
-    return conn;
-  }
+  if (rd_kafka_conf_set(conn->temp_conf, "bootstrap.servers", conn->broker.c_str(), errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) goto conf_error;
+
+  if (conn->use_ssl) {
+    if (!conn->user.empty()) {
+      // use SSL+SASL
+      if (rd_kafka_conf_set(conn->temp_conf, "security.protocol", "SASL_SSL", errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK ||
+              rd_kafka_conf_set(conn->temp_conf, "sasl.mechanism", "PLAIN", errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK ||
+              rd_kafka_conf_set(conn->temp_conf, "sasl.username", conn->user.c_str(), errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK ||
+              rd_kafka_conf_set(conn->temp_conf, "sasl.password", conn->password.c_str(), errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) goto conf_error;
+      ldout(conn->cct, 20) << "Kafka connect: successfully configured SSL+SASL security" << dendl;
+    } else {
+      // use only SSL
+      if (rd_kafka_conf_set(conn->temp_conf, "security.protocol", "SSL", errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) goto conf_error;
+      ldout(conn->cct, 20) << "Kafka connect: successfully configured SSL security" << dendl;
+    }
+    if (conn->ca_location) {
+      if (rd_kafka_conf_set(conn->temp_conf, "ssl.ca.location", conn->ca_location->c_str(), errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) goto conf_error;
+      ldout(conn->cct, 20) << "Kafka connect: successfully configured CA location" << dendl;
+    } else {
+      ldout(conn->cct, 20) << "Kafka connect: using default CA location" << dendl;
+    }
+    // Note: when librdkafka 1.0 is available, the following line could be uncommented instead of the callback setting call
+    // if (rd_kafka_conf_set(conn->temp_conf, "enable.ssl.certificate.verification", "0", errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) goto conf_error;
+
+    ldout(conn->cct, 20) << "Kafka connect: successfully configured security" << dendl;
   }
 
   // set the global callback for delivery success/fail
   rd_kafka_conf_set_dr_msg_cb(conn->temp_conf, message_callback);
 
   // set the global opaque pointer to be the connection itself
-  rd_kafka_conf_set_opaque (conn->temp_conf, conn.get());
+  rd_kafka_conf_set_opaque(conn->temp_conf, conn.get());
 
   // create the producer
   conn->producer = rd_kafka_new(RD_KAFKA_PRODUCER, conn->temp_conf, errstr, sizeof(errstr));
-  if (conn->producer) {
-    conn->status = STATUS_CREATE_PRODUCER_FAILED;
-    // TODO: use errstr
+  if (!conn->producer) {
+    conn->status = rd_kafka_last_error();
+    ldout(conn->cct, 1) << "Kafka connect: failed to create producer: " << errstr << dendl;
     return conn;
   }
+  ldout(conn->cct, 20) << "Kafka connect: successfully created new producer" << dendl;
 
   // conf ownership passed to producer
   conn->temp_conf = nullptr;
   return conn;
-}
 
+conf_error:
+  conn->status = rd_kafka_last_error();
+  ldout(conn->cct, 1) << "Kafka connect: configuration failed: " << errstr << dendl;
+  return conn;
+}
 
 // utility function to create a new connection
-connection_ptr_t create_new_connection(const std::string& broker, CephContext* cct) {
+connection_ptr_t create_new_connection(const std::string& broker, CephContext* cct,
+    bool use_ssl,
+    bool verify_ssl,
+    boost::optional<const std::string&> ca_location,
+    const std::string& user,
+    const std::string& password) {
   // create connection state
-  connection_ptr_t conn = new connection_t;
-  conn->cct = cct;
-  return create_connection(conn, broker);
+  connection_ptr_t conn(new connection_t(cct, broker, use_ssl, verify_ssl, ca_location, user, password));
+  return create_connection(conn);
 }
 
 /// struct used for holding messages in the message queue
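For reference, the librdkafka options configured above map one-to-one onto the options of other Kafka clients. A minimal kafka-python producer sketch with the same SSL+SASL configuration (broker address, credentials and CA path are hypothetical, matching the SASL_SSL listener used by the tests later in this patch)::

    from kafka import KafkaProducer  # same client library used by the tests

    producer = KafkaProducer(
        bootstrap_servers='localhost:9094',
        security_protocol='SASL_SSL',        # security.protocol
        sasl_mechanism='PLAIN',              # sasl.mechanism
        sasl_plain_username='alice',         # sasl.username
        sasl_plain_password='alice-secret',  # sasl.password
        ssl_cafile='./rootCA.crt')           # ssl.ca.location
    producer.send('mytopic', b'hello')
    producer.flush()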
R"(^(([^:\/?#]+):)?(//([^\/?#]*))?([^?#]*)(\?([^#]*))?(#(.*))?)", - std::regex::extended - ); - const auto HOST_AND_PORT = 4; - std::smatch url_match_result; - if (std::regex_match(url, url_match_result, url_regex)) { - broker = url_match_result[HOST_AND_PORT]; - return 0; - } - return -1; -} - typedef std::unordered_map ConnectionList; typedef boost::lockfree::queue> MessageQueue; @@ -290,9 +315,9 @@ private: if (!conn->is_ok()) { // connection had an issue while message was in the queue // TODO add error stats - ldout(conn->cct, 1) << "Kafka publish: connection had an issue while message was in the queue" << dendl; + ldout(conn->cct, 1) << "Kafka publish: connection had an issue while message was in the queue. error: " << status_to_string(conn->status) << dendl; if (message->cb) { - message->cb(STATUS_CONNECTION_CLOSED); + message->cb(conn->status); } return; } @@ -303,11 +328,12 @@ private: if (topic_it == conn->topics.end()) { topic = rd_kafka_topic_new(conn->producer, message->topic.c_str(), nullptr); if (!topic) { - ldout(conn->cct, 1) << "Kafka publish: failed to create topic: " << message->topic << dendl; + const auto err = rd_kafka_last_error(); + ldout(conn->cct, 1) << "Kafka publish: failed to create topic: " << message->topic << " error: " << status_to_string(err) << dendl; if (message->cb) { - message->cb(STATUS_CREATE_TOPIC_FAILED); + message->cb(err); } - conn->destroy(STATUS_CREATE_TOPIC_FAILED); + conn->destroy(err); return; } // TODO use the topics list as an LRU cache @@ -337,12 +363,13 @@ private: tag); if (rc == -1) { const auto err = rd_kafka_last_error(); - ldout(conn->cct, 1) << "Kafka publish: failed to produce: " << rd_kafka_err2str(err) << dendl; - // TODO: dont error on full queue, retry instead + ldout(conn->cct, 10) << "Kafka publish: failed to produce: " << rd_kafka_err2str(err) << dendl; + // TODO: dont error on full queue, and don't destroy connection, retry instead // immediatly invoke callback on error if needed if (message->cb) { message->cb(err); } + conn->destroy(err); delete tag; } @@ -352,7 +379,7 @@ private: ldout(conn->cct, 20) << "Kafka publish (with callback, tag=" << *tag << "): OK. 
Queue has: " << q_len << " callbacks" << dendl; conn->callbacks.emplace_back(*tag, message->cb); } else { - // immediately invoke callback with error + // immediately invoke callback with error - this is not a connection error ldout(conn->cct, 1) << "Kafka publish (with callback): failed with error: callback queue full" << dendl; message->cb(STATUS_MAX_INFLIGHT); // tag will be deleted when the global callback is invoked @@ -400,9 +427,10 @@ private: // try to reconnect the connection if it has an error if (!conn->is_ok()) { + ldout(conn->cct, 10) << "Kafka run: connection status is: " << status_to_string(conn->status) << dendl; const auto& broker = conn_it->first; ldout(conn->cct, 20) << "Kafka run: retry connection" << dendl; - if (create_connection(conn, broker)->is_ok() == false) { + if (create_connection(conn)->is_ok() == false) { ldout(conn->cct, 10) << "Kafka run: connection (" << broker << ") retry failed" << dendl; // TODO: add error counter for failed retries // TODO: add exponential backoff for retries @@ -477,7 +505,10 @@ public: } // connect to a broker, or reuse an existing connection if already connected - connection_ptr_t connect(const std::string& url) { + connection_ptr_t connect(const std::string& url, + bool use_ssl, + bool verify_ssl, + boost::optional ca_location) { if (stopped) { // TODO: increment counter ldout(cct, 1) << "Kafka connect: manager is stopped" << dendl; @@ -485,14 +516,25 @@ public: } std::string broker; - if (0 != parse_url(url, broker)) { + std::string user; + std::string password; + if (!parse_url_authority(url, broker, user, password)) { // TODO: increment counter ldout(cct, 1) << "Kafka connect: URL parsing failed" << dendl; return nullptr; } + // this should be validated by the regex in parse_url() + ceph_assert(user.empty() == password.empty()); + + if (!user.empty() && !use_ssl) { + ldout(cct, 1) << "Kafka connect: user/password are only allowed over secure connection" << dendl; + return nullptr; + } + std::lock_guard lock(connections_lock); const auto it = connections.find(broker); + // note that ssl vs. non-ssl connection to the same host are two separate conenctions if (it != connections.end()) { if (it->second->marked_for_deletion) { // TODO: increment counter @@ -510,14 +552,13 @@ public: ldout(cct, 1) << "Kafka connect: max connections exceeded" << dendl; return nullptr; } - const auto conn = create_new_connection(broker, cct); + const auto conn = create_new_connection(broker, cct, use_ssl, verify_ssl, ca_location, user, password); // create_new_connection must always return a connection object // even if error occurred during creation. // in such a case the creation will be retried in the main thread ceph_assert(conn); ++connection_count; ldout(cct, 10) << "Kafka connect: new connection is created. 
Total connections: " << connection_count << dendl; - ldout(cct, 10) << "Kafka connect: new connection status is: " << status_to_string(conn->status) << dendl; return connections.emplace(broker, conn).first->second; } @@ -613,9 +654,10 @@ void shutdown() { s_manager = nullptr; } -connection_ptr_t connect(const std::string& url) { +connection_ptr_t connect(const std::string& url, bool use_ssl, bool verify_ssl, + boost::optional ca_location) { if (!s_manager) return nullptr; - return s_manager->connect(url); + return s_manager->connect(url, use_ssl, verify_ssl, ca_location); } int publish(connection_ptr_t& conn, diff --git a/src/rgw/rgw_kafka.h b/src/rgw/rgw_kafka.h index 7319679b09ceb..cccdd65b6ab64 100644 --- a/src/rgw/rgw_kafka.h +++ b/src/rgw/rgw_kafka.h @@ -6,6 +6,7 @@ #include #include #include +#include class CephContext; @@ -30,7 +31,7 @@ bool init(CephContext* cct); void shutdown(); // connect to a kafka endpoint -connection_ptr_t connect(const std::string& url); +connection_ptr_t connect(const std::string& url, bool use_ssl, bool verify_ssl, boost::optional ca_location); // publish a message over a connection that was already created int publish(connection_ptr_t& conn, @@ -73,5 +74,8 @@ size_t get_max_queue(); // disconnect from a kafka broker bool disconnect(connection_ptr_t& conn); +// display connection as string +std::string to_string(const connection_ptr_t& conn); + } diff --git a/src/rgw/rgw_pubsub.h b/src/rgw/rgw_pubsub.h index a6f97ea6ff110..ee975700956f0 100644 --- a/src/rgw/rgw_pubsub.h +++ b/src/rgw/rgw_pubsub.h @@ -341,19 +341,21 @@ struct rgw_pubsub_sub_dest { std::string push_endpoint; std::string push_endpoint_args; std::string arn_topic; + bool stored_secret = false; void encode(bufferlist& bl) const { - ENCODE_START(3, 1, bl); + ENCODE_START(4, 1, bl); encode(bucket_name, bl); encode(oid_prefix, bl); encode(push_endpoint, bl); encode(push_endpoint_args, bl); encode(arn_topic, bl); + encode(stored_secret, bl); ENCODE_FINISH(bl); } void decode(bufferlist::const_iterator& bl) { - DECODE_START(3, bl); + DECODE_START(4, bl); decode(bucket_name, bl); decode(oid_prefix, bl); decode(push_endpoint, bl); @@ -363,6 +365,9 @@ struct rgw_pubsub_sub_dest { if (struct_v >= 3) { decode(arn_topic, bl); } + if (struct_v >= 4) { + decode(stored_secret, bl); + } DECODE_FINISH(bl); } diff --git a/src/rgw/rgw_pubsub_push.cc b/src/rgw/rgw_pubsub_push.cc index 4d0496687ad0f..6230330d4a6a3 100644 --- a/src/rgw/rgw_pubsub_push.cc +++ b/src/rgw/rgw_pubsub_push.cc @@ -220,7 +220,7 @@ private: const std::string topic; amqp::connection_ptr_t conn; const std::string message; - const ack_level_t ack_level; // TODO not used for now + [[maybe_unused]] const ack_level_t ack_level; // TODO not used for now public: AckPublishCR(CephContext* cct, @@ -415,6 +415,7 @@ static const std::string AMQP_1_0("1-0"); static const std::string AMQP_SCHEMA("amqp"); #endif // ifdef WITH_RADOSGW_AMQP_ENDPOINT + #ifdef WITH_RADOSGW_KAFKA_ENDPOINT class RGWPubSubKafkaEndpoint : public RGWPubSubEndpoint { private: @@ -423,11 +424,57 @@ private: Broker, }; CephContext* const cct; - const std::string endpoint; const std::string topic; kafka::connection_ptr_t conn; - ack_level_t ack_level; - std::string str_ack_level; + const ack_level_t ack_level; + + static bool get_verify_ssl(const RGWHTTPArgs& args) { + bool exists; + auto str_verify_ssl = args.get("verify-ssl", &exists); + if (!exists) { + // verify server certificate by default + return true; + } + boost::algorithm::to_lower(str_verify_ssl); + if (str_verify_ssl == 
"true") { + return true; + } + if (str_verify_ssl == "false") { + return false; + } + throw configuration_error("'verify-ssl' must be true/false, not: " + str_verify_ssl); + } + + static bool get_use_ssl(const RGWHTTPArgs& args) { + bool exists; + auto str_use_ssl = args.get("use-ssl", &exists); + if (!exists) { + // by default ssl not used + return false; + } + boost::algorithm::to_lower(str_use_ssl); + if (str_use_ssl == "true") { + return true; + } + if (str_use_ssl == "false") { + return false; + } + throw configuration_error("'use-ssl' must be true/false, not: " + str_use_ssl); + } + + static ack_level_t get_ack_level(const RGWHTTPArgs& args) { + bool exists; + // get ack level + const auto str_ack_level = args.get("kafka-ack-level", &exists); + if (!exists || str_ack_level == "broker") { + // "broker" is default + return ack_level_t::Broker; + } + if (str_ack_level == "none") { + return ack_level_t::None; + } + throw configuration_error("Kafka: invalid kafka-ack-level: " + str_ack_level); + } // NoAckPublishCR implements async kafka publishing via coroutine // This coroutine ends when it send the message and does not wait for an ack @@ -524,22 +571,11 @@ public: const RGWHTTPArgs& args, CephContext* _cct) : cct(_cct), - endpoint(_endpoint), topic(_topic), - conn(kafka::connect(endpoint)) { + conn(kafka::connect(_endpoint, get_use_ssl(args), get_verify_ssl(args), args.get_optional("ca-location"))) , + ack_level(get_ack_level(args)) { if (!conn) { - throw configuration_error("Kafka: failed to create connection to: " + endpoint); - } - bool exists; - // get ack level - str_ack_level = args.get("kafka-ack-level", &exists); - if (!exists || str_ack_level == "broker") { - // "broker" is default - ack_level = ack_level_t::Broker; - } else if (str_ack_level == "none") { - ack_level = ack_level_t::None; - } else { - throw configuration_error("Kafka: invalid kafka-ack-level: " + str_ack_level); + throw configuration_error("Kafka: failed to create connection to: " + _endpoint); } } @@ -638,9 +674,8 @@ public: std::string to_str() const override { std::string str("Kafka Endpoint"); - str += "\nURI: " + endpoint; + str += kafka::to_string(conn); str += "\nTopic: " + topic; - str += "\nAck Level: " + str_ack_level; return str; } }; diff --git a/src/rgw/rgw_rest_pubsub.cc b/src/rgw/rgw_rest_pubsub.cc index 08d8c544cf158..1f7bce65adf54 100644 --- a/src/rgw/rgw_rest_pubsub.cc +++ b/src/rgw/rgw_rest_pubsub.cc @@ -19,6 +19,7 @@ #define dout_context g_ceph_context #define dout_subsys ceph_subsys_rgw + // command (AWS compliant): // POST // Action=CreateTopic&Name=[&push-endpoint=[&=]] @@ -27,21 +28,25 @@ public: int get_params() override { topic_name = s->info.args.get("Name"); if (topic_name.empty()) { - ldout(s->cct, 1) << "CreateTopic Action 'Name' argument is missing" << dendl; - return -EINVAL; + ldout(s->cct, 1) << "CreateTopic Action 'Name' argument is missing" << dendl; + return -EINVAL; } dest.push_endpoint = s->info.args.get("push-endpoint"); + + if (!validate_and_update_endpoint_secret(dest, s->cct, *(s->info.env))) { + return -EINVAL; + } for (const auto param : s->info.args.get_params()) { - if (param.first == "Action" || param.first == "Name" || param.first == "PayloadHash") { - continue; - } - dest.push_endpoint_args.append(param.first+"="+param.second+"&"); + if (param.first == "Action" || param.first == "Name" || param.first == "PayloadHash") { + continue; + } + dest.push_endpoint_args.append(param.first+"="+param.second+"&"); } if (!dest.push_endpoint_args.empty()) { - // remove last 
separator - dest.push_endpoint_args.pop_back(); + // remove last separator + dest.push_endpoint_args.pop_back(); } // dest object only stores endpoint info @@ -160,8 +165,8 @@ public: const auto topic_arn = rgw::ARN::parse((s->info.args.get("TopicArn"))); if (!topic_arn || topic_arn->resource.empty()) { - ldout(s->cct, 1) << "DeleteTopic Action 'TopicArn' argument is missing or invalid" << dendl; - return -EINVAL; + ldout(s->cct, 1) << "DeleteTopic Action 'TopicArn' argument is missing or invalid" << dendl; + return -EINVAL; } topic_name = topic_arn->resource; @@ -199,21 +204,21 @@ namespace { // ctor and set are done according to the "type" argument // if type is not "key" or "value" its a no-op class Attribute { - std::string key; - std::string value; + std::string key; + std::string value; public: - Attribute(const std::string& type, const std::string& key_or_value) { - set(type, key_or_value); - } - void set(const std::string& type, const std::string& key_or_value) { - if (type == "key") { - key = key_or_value; - } else if (type == "value") { - value = key_or_value; - } + Attribute(const std::string& type, const std::string& key_or_value) { + set(type, key_or_value); + } + void set(const std::string& type, const std::string& key_or_value) { + if (type == "key") { + key = key_or_value; + } else if (type == "value") { + value = key_or_value; } - const std::string& get_key() const { return key; } - const std::string& get_value() const { return value; } + } + const std::string& get_key() const { return key; } + const std::string& get_value() const { return value; } }; using AttributeMap = std::map; @@ -431,11 +436,11 @@ void RGWPSCreateNotif_ObjStore_S3::execute() { if (store->getRados()->get_sync_module()) { const auto psmodule = dynamic_cast(store->getRados()->get_sync_module().get()); if (psmodule) { - const auto& conf = psmodule->get_effective_conf(); - data_bucket_prefix = conf["data_bucket_prefix"]; - data_oid_prefix = conf["data_oid_prefix"]; - // TODO: allow "push-only" on PS zone as well - push_only = false; + const auto& conf = psmodule->get_effective_conf(); + data_bucket_prefix = conf["data_bucket_prefix"]; + data_oid_prefix = conf["data_oid_prefix"]; + // TODO: allow "push-only" on PS zone as well + push_only = false; } } diff --git a/src/rgw/rgw_rest_pubsub_common.cc b/src/rgw/rgw_rest_pubsub_common.cc index 50f567f7fc1f4..30d058f7bdb13 100644 --- a/src/rgw/rgw_rest_pubsub_common.cc +++ b/src/rgw/rgw_rest_pubsub_common.cc @@ -1,12 +1,50 @@ // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- // vim: ts=8 sw=2 smarttab +#include "rgw_common.h" #include "rgw_rest_pubsub_common.h" #include "common/dout.h" +#include "rgw_url.h" #define dout_context g_ceph_context #define dout_subsys ceph_subsys_rgw +bool validate_and_update_endpoint_secret(rgw_pubsub_sub_dest& dest, CephContext *cct, const RGWEnv& env) { + if (dest.push_endpoint.empty()) { + return true; + } + std::string user; + std::string password; + if (!rgw::parse_url_userinfo(dest.push_endpoint, user, password)) { + ldout(cct, 1) << "endpoint validation error: malformed endpoint URL:" << dest.push_endpoint << dendl; + return false; + } + // this should be verified inside parse_url() + ceph_assert(user.empty() == password.empty()); + if (!user.empty()) { + dest.stored_secret = true; + if (!rgw_transport_is_secure(cct, env)) { + ldout(cct, 1) << "endpoint validation error: sending password over insecure transport" << dendl; + return false; + } + } + return true; +} + +bool 
subscription_has_endpoint_secret(const rgw_pubsub_sub_config& sub) {
+    return sub.dest.stored_secret;
+}
+
+bool topic_has_endpoint_secret(const rgw_pubsub_topic_subs& topic) {
+    return topic.topic.dest.stored_secret;
+}
+
+bool topics_has_endpoint_secret(const rgw_pubsub_user_topics& topics) {
+  for (const auto& topic : topics.topics) {
+    if (topic_has_endpoint_secret(topic.second)) return true;
+  }
+  return false;
+}
 
 void RGWPSCreateTopicOp::execute() {
   op_ret = get_params();
   if (op_ret < 0) {
@@ -25,10 +63,17 @@ void RGWPSCreateTopicOp::execute() {
 
 void RGWPSListTopicsOp::execute() {
   ups.emplace(store, s->owner.get_id());
   op_ret = ups->get_user_topics(&result);
+  // if there are no topics it is not considered an error
+  op_ret = op_ret == -ENOENT ? 0 : op_ret;
   if (op_ret < 0) {
     ldout(s->cct, 1) << "failed to get topics, ret=" << op_ret << dendl;
     return;
   }
+  if (topics_has_endpoint_secret(result) && !rgw_transport_is_secure(s->cct, *(s->info.env))) {
+    ldout(s->cct, 1) << "topics contain a secret and cannot be sent over insecure transport" << dendl;
+    op_ret = -EPERM;
+    return;
+  }
   ldout(s->cct, 20) << "successfully got topics" << dendl;
 }
 
@@ -39,6 +84,11 @@ void RGWPSGetTopicOp::execute() {
   }
   ups.emplace(store, s->owner.get_id());
   op_ret = ups->get_topic(topic_name, &result);
+  if (topic_has_endpoint_secret(result) && !rgw_transport_is_secure(s->cct, *(s->info.env))) {
+    ldout(s->cct, 1) << "topic '" << topic_name << "' contains a secret and cannot be sent over insecure transport" << dendl;
+    op_ret = -EPERM;
+    return;
+  }
   if (op_ret < 0) {
     ldout(s->cct, 1) << "failed to get topic '" << topic_name << "', ret=" << op_ret << dendl;
     return;
@@ -83,6 +133,11 @@ void RGWPSGetSubOp::execute() {
   ups.emplace(store, s->owner.get_id());
   auto sub = ups->get_sub(sub_name);
   op_ret = sub->get_conf(&result);
+  if (subscription_has_endpoint_secret(result) && !rgw_transport_is_secure(s->cct, *(s->info.env))) {
+    ldout(s->cct, 1) << "subscription '" << sub_name << "' contains a secret and cannot be sent over insecure transport" << dendl;
+    op_ret = -EPERM;
+    return;
+  }
   if (op_ret < 0) {
     ldout(s->cct, 1) << "failed to get subscription '" << sub_name << "', ret=" << op_ret << dendl;
     return;
@@ -195,7 +250,7 @@ int RGWPSListNotifsOp::verify_permission() {
   }
 
   if (bucket_info.owner != s->owner.get_id()) {
-    ldout(s->cct, 1) << "user doesn't own bucket, cannot get topic list" << dendl;
+    ldout(s->cct, 1) << "user doesn't own bucket, cannot get notification list" << dendl;
     return -EPERM;
   }
 
diff --git a/src/rgw/rgw_rest_pubsub_common.h b/src/rgw/rgw_rest_pubsub_common.h
index 6d78ce5ce1aa0..f11c75658f523 100644
--- a/src/rgw/rgw_rest_pubsub_common.h
+++ b/src/rgw/rgw_rest_pubsub_common.h
@@ -6,6 +6,11 @@
 #include "rgw_op.h"
 #include "rgw_pubsub.h"
 
+// make sure that the endpoint is a valid URL
+// make sure that if user/password are passed inside the URL, it is over a secure connection
+// update rgw_pubsub_sub_dest to indicate that a password is stored in the URL
+bool validate_and_update_endpoint_secret(rgw_pubsub_sub_dest& dest, CephContext *cct, const RGWEnv& env);
+
 // create a topic
 class RGWPSCreateTopicOp : public RGWDefaultResponseOp {
 protected:
diff --git a/src/rgw/rgw_sync_module_pubsub_rest.cc b/src/rgw/rgw_sync_module_pubsub_rest.cc
index b198bb33b19f8..d95b264ea6a2c 100644
--- a/src/rgw/rgw_sync_module_pubsub_rest.cc
+++ b/src/rgw/rgw_sync_module_pubsub_rest.cc
@@ -26,6 +26,10 @@ public:
     topic_name = s->object.name;
 
     dest.push_endpoint = s->info.args.get("push-endpoint");
+
+    if
(!validate_and_update_endpoint_secret(dest, s->cct, *(s->info.env))) {
+      return -EINVAL;
+    }
     dest.push_endpoint_args = s->info.args.get_str();
     // dest object only stores endpoint info
     // bucket to store events/records will be set only when subscription is created
@@ -169,9 +173,12 @@ public:
     const auto& conf = psmodule->get_effective_conf();
 
     dest.push_endpoint = s->info.args.get("push-endpoint");
+    if (!validate_and_update_endpoint_secret(dest, s->cct, *(s->info.env))) {
+      return -EINVAL;
+    }
+    dest.push_endpoint_args = s->info.args.get_str();
     dest.bucket_name = string(conf["data_bucket_prefix"]) + s->owner.get_id().to_str() + "-" + topic_name;
     dest.oid_prefix = string(conf["data_oid_prefix"]) + sub_name + "/";
-    dest.push_endpoint_args = s->info.args.get_str();
     dest.arn_topic = topic_name;
 
     return 0;
diff --git a/src/rgw/rgw_url.cc b/src/rgw/rgw_url.cc
new file mode 100644
index 0000000000000..24c25378239ff
--- /dev/null
+++ b/src/rgw/rgw_url.cc
@@ -0,0 +1,48 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include <string>
+#include <regex>
+
+namespace rgw {
+
+namespace {
+  const auto USER_GROUP_IDX = 3;
+  const auto PASSWORD_GROUP_IDX = 4;
+  const auto HOST_GROUP_IDX = 5;
+
+  const std::string schema_re = "([[:alpha:]]+:\\/\\/)";
+  const std::string user_pass_re = "(([^:\\s]+):([^@\\s]+)@)?";
+  const std::string host_port_re = "([[:alnum:].:-]+)";
+}
+
+bool parse_url_authority(const std::string& url, std::string& host, std::string& user, std::string& password) {
+  const std::string re = schema_re + user_pass_re + host_port_re;
+  const std::regex url_regex(re, std::regex::icase);
+  std::smatch url_match_result;
+
+  if (std::regex_match(url, url_match_result, url_regex)) {
+    host = url_match_result[HOST_GROUP_IDX];
+    user = url_match_result[USER_GROUP_IDX];
+    password = url_match_result[PASSWORD_GROUP_IDX];
+    return true;
+  }
+
+  return false;
+}
+
+bool parse_url_userinfo(const std::string& url, std::string& user, std::string& password) {
+  const std::string re = schema_re + user_pass_re + host_port_re;
+  const std::regex url_regex(re);
+  std::smatch url_match_result;
+
+  if (std::regex_match(url, url_match_result, url_regex)) {
+    user = url_match_result[USER_GROUP_IDX];
+    password = url_match_result[PASSWORD_GROUP_IDX];
+    return true;
+  }
+
+  return false;
+}
+}
+
diff --git a/src/rgw/rgw_url.h b/src/rgw/rgw_url.h
new file mode 100644
index 0000000000000..089401a49a814
--- /dev/null
+++ b/src/rgw/rgw_url.h
@@ -0,0 +1,12 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#pragma once
+
+#include <string>
+
+namespace rgw {
+// parse a URL of the form: http|https|amqp|amqps|kafka://[user:password@]<host>[:port]
+bool parse_url_authority(const std::string& url, std::string& host, std::string& user, std::string& password);
+bool parse_url_userinfo(const std::string& url, std::string& user, std::string& password);
+}
+
diff --git a/src/test/rgw/CMakeLists.txt b/src/test/rgw/CMakeLists.txt
index 4093e4b01d129..5e360fef5455a 100644
--- a/src/test/rgw/CMakeLists.txt
+++ b/src/test/rgw/CMakeLists.txt
@@ -187,6 +187,12 @@ add_ceph_unittest(unittest_rgw_kms)
 
 target_link_libraries(unittest_rgw_kms ${rgw_libs})
 
+# unittest_rgw_url
+add_executable(unittest_rgw_url test_rgw_url.cc)
+add_ceph_unittest(unittest_rgw_url)
+
+target_link_libraries(unittest_rgw_url ${rgw_libs})
+
 add_executable(ceph_test_rgw_gc_log test_rgw_gc_log.cc $<TARGET_OBJECTS:unit-main>)
 target_link_libraries(ceph_test_rgw_gc_log ${rgw_libs} radostest-cxx)
 install(TARGETS 
ceph_test_rgw_gc_log DESTINATION ${CMAKE_INSTALL_BINDIR}) diff --git a/src/test/rgw/rgw_multi/conn.py b/src/test/rgw/rgw_multi/conn.py index 1099664df201b..b03db36735f4d 100644 --- a/src/test/rgw/rgw_multi/conn.py +++ b/src/test/rgw/rgw_multi/conn.py @@ -14,3 +14,17 @@ def get_gateway_connection(gateway, credentials): calling_format = boto.s3.connection.OrdinaryCallingFormat()) return gateway.connection +def get_gateway_secure_connection(gateway, credentials): + """ secure connect to the given gateway """ + if gateway.ssl_port == 0: + return None + if gateway.secure_connection is None: + gateway.secure_connection = boto.connect_s3( + aws_access_key_id = credentials.access_key, + aws_secret_access_key = credentials.secret, + host = gateway.host, + port = gateway.ssl_port, + is_secure = True, + validate_certs=False, + calling_format = boto.s3.connection.OrdinaryCallingFormat()) + return gateway.secure_connection diff --git a/src/test/rgw/rgw_multi/multisite.py b/src/test/rgw/rgw_multi/multisite.py index 47afe052048a2..f189a50783971 100644 --- a/src/test/rgw/rgw_multi/multisite.py +++ b/src/test/rgw/rgw_multi/multisite.py @@ -3,7 +3,7 @@ from six import StringIO import json -from .conn import get_gateway_connection +from .conn import get_gateway_connection, get_gateway_secure_connection class Cluster: """ interface to run commands against a distinct ceph cluster """ @@ -18,13 +18,14 @@ class Gateway: """ interface to control a single radosgw instance """ __metaclass__ = ABCMeta - def __init__(self, host = None, port = None, cluster = None, zone = None, proto = 'http', connection = None): + def __init__(self, host = None, port = None, cluster = None, zone = None, ssl_port = 0): self.host = host self.port = port self.cluster = cluster self.zone = zone - self.proto = proto - self.connection = connection + self.connection = None + self.secure_connection = None + self.ssl_port = ssl_port @abstractmethod def start(self, args = []): @@ -37,7 +38,7 @@ class Gateway: pass def endpoint(self): - return '%s://%s:%d' % (self.proto, self.host, self.port) + return 'http://%s:%d' % (self.host, self.port) class SystemObject: """ interface for system objects, represented in json format and @@ -181,6 +182,7 @@ class ZoneConn(object): if self.zone.gateways is not None: self.conn = get_gateway_connection(self.zone.gateways[0], self.credentials) + self.secure_conn = get_gateway_secure_connection(self.zone.gateways[0], self.credentials) def get_connection(self): return self.conn diff --git a/src/test/rgw/rgw_multi/tests.py b/src/test/rgw/rgw_multi/tests.py index 7f013e788c46e..31b8233c0316c 100644 --- a/src/test/rgw/rgw_multi/tests.py +++ b/src/test/rgw/rgw_multi/tests.py @@ -66,18 +66,6 @@ log = logging.getLogger('rgw_multi.tests') num_buckets = 0 run_prefix=''.join(random.choice(string.ascii_lowercase) for _ in range(6)) -def get_gateway_connection(gateway, credentials): - """ connect to the given gateway """ - if gateway.connection is None: - gateway.connection = boto.connect_s3( - aws_access_key_id = credentials.access_key, - aws_secret_access_key = credentials.secret, - host = gateway.host, - port = gateway.port, - is_secure = False, - calling_format = boto.s3.connection.OrdinaryCallingFormat()) - return gateway.connection - def get_zone_connection(zone, credentials): """ connect to the zone's first gateway """ if isinstance(credentials, list): diff --git a/src/test/rgw/rgw_multi/tests_ps.py b/src/test/rgw/rgw_multi/tests_ps.py index bedd189bc6308..1470d9a6e54be 100644 --- a/src/test/rgw/rgw_multi/tests_ps.py 
+++ b/src/test/rgw/rgw_multi/tests_ps.py
@@ -382,12 +382,16 @@ kafka_server = 'localhost'
 
 class KafkaReceiver(object):
     """class for receiving and storing messages on a topic from the kafka broker"""
-    def __init__(self, topic):
+    def __init__(self, topic, security_type):
         from kafka import KafkaConsumer
         remaining_retries = 10
+        port = 9092
+        if security_type != 'PLAINTEXT':
+            security_type = 'SSL'
+            port = 9093
         while remaining_retries > 0:
             try:
-                self.consumer = KafkaConsumer(topic, bootstrap_servers = kafka_server)
+                self.consumer = KafkaConsumer(topic, bootstrap_servers = kafka_server+':'+str(port), security_protocol=security_type)
                 print('Kafka consumer created on topic: '+topic)
                 break
             except Exception as error:
@@ -414,10 +418,10 @@ def kafka_receiver_thread_runner(receiver):
     try:
         log.info('Kafka receiver started')
         print('Kafka receiver started')
-        for msg in receiver.consumer:
-            if receiver.stop:
-                break
-            receiver.events.append(json.loads(msg.value))
+        while not receiver.stop:
+            for msg in receiver.consumer:
+                receiver.events.append(json.loads(msg.value))
+            time.sleep(0.1)
         log.info('Kafka receiver ended')
         print('Kafka receiver ended')
     except Exception as error:
@@ -425,9 +429,9 @@ def kafka_receiver_thread_runner(receiver):
         print('Kafka receiver ended unexpectedly: ' + str(error))
 
 
-def create_kafka_receiver_thread(topic):
+def create_kafka_receiver_thread(topic, security_type='PLAINTEXT'):
     """create kafka receiver and thread"""
-    receiver = KafkaReceiver(topic)
+    receiver = KafkaReceiver(topic, security_type)
     task = threading.Thread(target=kafka_receiver_thread_runner, args=(receiver,))
     task.daemon = True
     return task, receiver
@@ -435,16 +439,39 @@ def create_kafka_receiver_thread(topic):
 def stop_kafka_receiver(receiver, task):
     """stop the receiver thread and wait for it to finish"""
     receiver.stop = True
-    task.join(5)
+    task.join(1)
     try:
         receiver.consumer.close()
     except Exception as error:
         log.info('failed to gracefully stop Kafka receiver: %s', str(error))
 
 
+# follow the instructions here to create and sign a broker certificate:
+# https://github.com/edenhill/librdkafka/wiki/Using-SSL-with-librdkafka
+
+# the generated broker certificate should be stored in the java keystore for the use of the server
+# assuming the jks files were copied to $KAFKA_DIR and the broker name is "localhost"
+# the following lines must be added to $KAFKA_DIR/config/server.properties
+# listeners=PLAINTEXT://localhost:9092,SSL://localhost:9093,SASL_SSL://localhost:9094
+# sasl.enabled.mechanisms=PLAIN
+# ssl.keystore.location = $KAFKA_DIR/server.keystore.jks
+# ssl.keystore.password = abcdefgh
+# ssl.key.password = abcdefgh
+# ssl.truststore.location = $KAFKA_DIR/server.truststore.jks
+# ssl.truststore.password = abcdefgh
+
+# notes:
+# (1) we don't test client authentication, hence, no need to generate client keys
+# (2) our client is not using the keystore, and the "rootCA.crt" file generated in the process above
+# should be copied to: $KAFKA_DIR
+
 def init_kafka():
     """ start kafka/zookeeper """
-    KAFKA_DIR = os.environ['KAFKA_DIR']
+    try:
+        KAFKA_DIR = os.environ['KAFKA_DIR']
+    except:
+        KAFKA_DIR = ''
+
     if KAFKA_DIR == '':
         log.info('KAFKA_DIR must be set to where kafka is installed')
         print('KAFKA_DIR must be set to where kafka is installed')
@@ -468,10 +495,13 @@ def init_kafka():
 
     print('Starting kafka...')
     kafka_log = open('./kafka.log', 'w')
     try:
+        kafka_env = os.environ.copy()
+        kafka_env['KAFKA_OPTS']='-Djava.security.auth.login.config='+KAFKA_DIR+'config/kafka_server_jaas.conf'
         kafka_proc = 
subprocess.Popen([ KAFKA_DIR+'bin/kafka-server-start.sh', KAFKA_DIR+'config/server.properties'], - stdout=kafka_log) + stdout=kafka_log, + env=kafka_env) except Exception as error: log.info('failed to execute kafka: %s', str(error)) print('failed to execute kafka: %s' % str(error)) @@ -495,8 +525,18 @@ def clean_kafka(kafka_proc, zk_proc, kafka_log): """ stop kafka/zookeeper """ try: kafka_log.close() + print('Shutdown Kafka...') kafka_proc.terminate() + time.sleep(5) + if kafka_proc.poll() is None: + print('Failed to shutdown Kafka... killing') + kafka_proc.kill() + print('Shutdown zookeeper...') zk_proc.terminate() + time.sleep(5) + if zk_proc.poll() is None: + print('Failed to shutdown zookeeper... killing') + zk_proc.kill() except: log.info('kafka/zookeeper already terminated') @@ -766,8 +806,9 @@ def test_ps_s3_notification(): # delete the bucket zones[0].delete_bucket(bucket_name) + def test_ps_s3_topic_on_master(): - """ test s3 notification set/get/delete on master """ + """ test s3 topics set/get/delete on master """ zones, _ = init_env(require_ps=False) realm = get_realm() zonegroup = realm.master_zonegroup() @@ -775,7 +816,7 @@ def test_ps_s3_topic_on_master(): topic_name = bucket_name + TOPIC_SUFFIX # clean all topics - delete_all_s3_topics(zones[0].conn, zonegroup.name) + delete_all_s3_topics(zones[0], zonegroup.name) # create s3 topics endpoint_address = 'amqp://127.0.0.1:7001' @@ -814,8 +855,9 @@ def test_ps_s3_topic_on_master(): assert_equal(status, 404) # get the remaining 2 topics - result = topic_conf1.get_list() - assert_equal(len(result['Topics']), 2) + result, status = topic_conf1.get_list() + assert_equal(status, 200) + assert_equal(len(result['ListTopicsResponse']['ListTopicsResult']['Topics']['member']), 2) # delete topics result = topic_conf2.del_config() @@ -826,8 +868,58 @@ def test_ps_s3_topic_on_master(): # assert_equal(status, 200) # get topic list, make sure it is empty - result = topic_conf1.get_list() - assert_equal(len(result['Topics']), 0) + result, status = topic_conf1.get_list() + assert_equal(result['ListTopicsResponse']['ListTopicsResult']['Topics'], None) + + +def test_ps_s3_topic_with_secret_on_master(): + """ test s3 topics with secret set/get/delete on master """ + zones, _ = init_env(require_ps=False) + if zones[0].secure_conn is None: + return SkipTest('secure connection is needed to test topic with secrets') + + realm = get_realm() + zonegroup = realm.master_zonegroup() + bucket_name = gen_bucket_name() + topic_name = bucket_name + TOPIC_SUFFIX + + # clean all topics + delete_all_s3_topics(zones[0], zonegroup.name) + + # create s3 topics + endpoint_address = 'amqp://user:password@127.0.0.1:7001' + endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange=amqp.direct&amqp-ack-level=none' + bad_topic_conf = PSTopicS3(zones[0].conn, topic_name, zonegroup.name, endpoint_args=endpoint_args) + try: + result = bad_topic_conf.set_config() + except Exception as err: + print 'Error is expected: ' + str(err) + else: + assert False, 'user password configuration set allowed only over HTTPS' + + topic_conf = PSTopicS3(zones[0].secure_conn, topic_name, zonegroup.name, endpoint_args=endpoint_args) + topic_arn = topic_conf.set_config() + + assert_equal(topic_arn, + 'arn:aws:sns:' + zonegroup.name + ':' + get_tenant() + ':' + topic_name) + + _, status = bad_topic_conf.get_config() + assert_equal(status/100, 4) + + # get topic + result, status = topic_conf.get_config() + assert_equal(status, 200) + assert_equal(topic_arn, 
result['GetTopicResponse']['GetTopicResult']['Topic']['TopicArn'])
+    assert_equal(endpoint_address,
+                 result['GetTopicResponse']['GetTopicResult']['Topic']['EndPoint']['EndpointAddress'])
+
+    _, status = bad_topic_conf.get_config()
+    assert_equal(status/100, 4)
+
+    _, status = topic_conf.get_list()
+    assert_equal(status/100, 2)
+
+    # delete topics
+    result = topic_conf.del_config()
 
 
 def test_ps_s3_notification_on_master():
@@ -1505,6 +1597,112 @@ def test_ps_s3_notification_push_kafka_on_master():
         clean_kafka(kafka_proc, zk_proc, kafka_log)
 
 
+def kafka_security(security_type):
+    """ test pushing kafka s3 notifications on master, with security """
+    if skip_push_tests:
+        return SkipTest("PubSub push tests don't run in teuthology")
+    zones, _  = init_env(require_ps=False)
+    if security_type == 'SSL_SASL' and zones[0].secure_conn is None:
+        return SkipTest("secure connection is needed to test SASL_SSL security")
+    kafka_proc, zk_proc, kafka_log = init_kafka()
+    if kafka_proc is None or zk_proc is None:
+        return SkipTest('end2end kafka tests require kafka/zookeeper installed')
+    realm = get_realm()
+    zonegroup = realm.master_zonegroup()
+
+    # create bucket
+    bucket_name = gen_bucket_name()
+    bucket = zones[0].create_bucket(bucket_name)
+    # name is constant for manual testing
+    topic_name = bucket_name+'_topic'
+    # create consumer on the topic
+    task, receiver = create_kafka_receiver_thread(topic_name)
+    task.start()
+
+    # create s3 topic
+    if security_type == 'SSL_SASL':
+        endpoint_address = 'kafka://alice:alice-secret@' + kafka_server + ':9094'
+    else:
+        # ssl only
+        endpoint_address = 'kafka://' + kafka_server + ':9093'
+
+    KAFKA_DIR = os.environ['KAFKA_DIR']
+
+    # without acks from broker, with root CA
+    endpoint_args = 'push-endpoint='+endpoint_address+'&kafka-ack-level=none&use-ssl=true&ca-location='+KAFKA_DIR+'rootCA.crt'
+
+    if security_type == 'SSL_SASL':
+        topic_conf = PSTopicS3(zones[0].secure_conn, topic_name, zonegroup.name, endpoint_args=endpoint_args)
+    else:
+        topic_conf = PSTopicS3(zones[0].conn, topic_name, zonegroup.name, endpoint_args=endpoint_args)
+
+    topic_arn = topic_conf.set_config()
+    # create s3 notification
+    notification_name = bucket_name + NOTIFICATION_SUFFIX
+    topic_conf_list = [{'Id': notification_name, 'TopicArn': topic_arn,
+                        'Events': []
+                       }]
+
+    s3_notification_conf = PSNotificationS3(zones[0].conn, bucket_name, topic_conf_list)
+    s3_notification_conf.set_config()
+
+    # create objects in the bucket (async)
+    number_of_objects = 10
+    client_threads = []
+    start_time = time.time()
+    for i in range(number_of_objects):
+        key = bucket.new_key(str(i))
+        content = str(os.urandom(1024*1024))
+        thr = threading.Thread(target = set_contents_from_string, args=(key, content,))
+        thr.start()
+        client_threads.append(thr)
+    [thr.join() for thr in client_threads]
+
+    time_diff = time.time() - start_time
+    print 'average time for creation + kafka notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds'
+
+    try:
+        print 'wait for 5sec for the messages...'
+ time.sleep(5) + keys = list(bucket.list()) + receiver.verify_s3_events(keys, exact_match=True) + + # delete objects from the bucket + client_threads = [] + start_time = time.time() + for key in bucket.list(): + thr = threading.Thread(target = key.delete, args=()) + thr.start() + client_threads.append(thr) + [thr.join() for thr in client_threads] + + time_diff = time.time() - start_time + print 'average time for deletion + kafka notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds' + + print 'wait for 5sec for the messages...' + time.sleep(5) + receiver.verify_s3_events(keys, exact_match=True, deletions=True) + except Exception as err: + assert False, str(err) + finally: + # cleanup + s3_notification_conf.del_config() + topic_conf.del_config() + # delete the bucket + for key in bucket.list(): + key.delete() + zones[0].delete_bucket(bucket_name) + stop_kafka_receiver(receiver, task) + clean_kafka(kafka_proc, zk_proc, kafka_log) + + +def test_ps_s3_notification_push_kafka_security_ssl(): + kafka_security('SSL') + +def test_ps_s3_notification_push_kafka_security_ssl_sasl(): + kafka_security('SSL_SASL') + + def test_ps_s3_notification_push_http_on_master(): """ test pushing http s3 notification on master """ if skip_push_tests: diff --git a/src/test/rgw/rgw_multi/zone_ps.py b/src/test/rgw/rgw_multi/zone_ps.py index cfdf480ee0b5d..a67a2fee43b0d 100644 --- a/src/test/rgw/rgw_multi/zone_ps.py +++ b/src/test/rgw/rgw_multi/zone_ps.py @@ -1,5 +1,6 @@ import logging import httplib +import ssl import urllib import urlparse import hmac @@ -130,22 +131,24 @@ class PSTopic: return self.send_request('GET', get_list=True) -def delete_all_s3_topics(conn, region): +def delete_all_s3_topics(zone, region): try: + conn = zone.secure_conn if zone.secure_conn is not None else zone.conn + protocol = 'https' if conn.is_secure else 'http' client = boto3.client('sns', - endpoint_url='http://'+conn.host+':'+str(conn.port), - aws_access_key_id=conn.aws_access_key_id, - aws_secret_access_key=conn.aws_secret_access_key, - region_name=region, - config=Config(signature_version='s3')) + endpoint_url=protocol+'://'+conn.host+':'+str(conn.port), + aws_access_key_id=conn.aws_access_key_id, + aws_secret_access_key=conn.aws_secret_access_key, + region_name=region, + verify='./cert.pem', + config=Config(signature_version='s3')) topics = client.list_topics()['Topics'] for topic in topics: print('topic cleanup, deleting: ' + topic['TopicArn']) assert client.delete_topic(TopicArn=topic['TopicArn'])['ResponseMetadata']['HTTPStatusCode'] == 200 - except: - print('failed to do topic cleanup. 
if there are topics ' - 'they may need to be manually deleted') + except Exception as err: + print 'failed to do topic cleanup: ' + str(err) class PSTopicS3: @@ -163,12 +166,14 @@ class PSTopicS3: self.attributes = {} if endpoint_args is not None: self.attributes = {nvp[0] : nvp[1] for nvp in urlparse.parse_qsl(endpoint_args, keep_blank_values=True)} - self.client = boto3.client('sns', - endpoint_url='http://'+conn.host+':'+str(conn.port), - aws_access_key_id=conn.aws_access_key_id, - aws_secret_access_key=conn.aws_secret_access_key, - region_name=region, - config=Config(signature_version='s3')) + protocol = 'https' if conn.is_secure else 'http' + self.client = boto3.client('sns', + endpoint_url=protocol+'://'+conn.host+':'+str(conn.port), + aws_access_key_id=conn.aws_access_key_id, + aws_secret_access_key=conn.aws_secret_access_key, + region_name=region, + verify='./cert.pem', + config=Config(signature_version='s3')) def get_config(self): @@ -188,9 +193,11 @@ class PSTopicS3: 'Date': string_date, 'Host': self.conn.host+':'+str(self.conn.port), 'Content-Type': content_type} - http_conn = httplib.HTTPConnection(self.conn.host, self.conn.port) - if log.getEffectiveLevel() <= 10: - http_conn.set_debuglevel(5) + if self.conn.is_secure: + http_conn = httplib.HTTPSConnection(self.conn.host, self.conn.port, + context=ssl.create_default_context(cafile='./cert.pem')) + else: + http_conn = httplib.HTTPConnection(self.conn.host, self.conn.port) http_conn.request(method, resource, body, headers) response = http_conn.getresponse() data = response.read() @@ -212,7 +219,34 @@ class PSTopicS3: def get_list(self): """list all topics""" - return self.client.list_topics() + # note that boto3 supports list_topics(), however, the result only show ARNs + parameters = {'Action': 'ListTopics'} + body = urllib.urlencode(parameters) + string_date = strftime("%a, %d %b %Y %H:%M:%S +0000", gmtime()) + content_type = 'application/x-www-form-urlencoded; charset=utf-8' + resource = '/' + method = 'POST' + string_to_sign = method + '\n\n' + content_type + '\n' + string_date + '\n' + resource + log.debug('StringTosign: %s', string_to_sign) + signature = base64.b64encode(hmac.new(self.conn.aws_secret_access_key, + string_to_sign.encode('utf-8'), + hashlib.sha1).digest()) + headers = {'Authorization': 'AWS '+self.conn.aws_access_key_id+':'+signature, + 'Date': string_date, + 'Host': self.conn.host+':'+str(self.conn.port), + 'Content-Type': content_type} + if self.conn.is_secure: + http_conn = httplib.HTTPSConnection(self.conn.host, self.conn.port, + context=ssl.create_default_context(cafile='./cert.pem')) + else: + http_conn = httplib.HTTPConnection(self.conn.host, self.conn.port) + http_conn.request(method, resource, body, headers) + response = http_conn.getresponse() + data = response.read() + status = response.status + http_conn.close() + dict_response = xmltodict.parse(data) + return dict_response, status class PSNotification: diff --git a/src/test/rgw/test_multi.py b/src/test/rgw/test_multi.py index 53ac815d94235..af45f3ab000a6 100644 --- a/src/test/rgw/test_multi.py +++ b/src/test/rgw/test_multi.py @@ -97,7 +97,7 @@ class Gateway(multisite.Gateway): # e.g. RGW_FRONTEND=civetweb # to run test under valgrind memcheck, set RGW_VALGRIND to 'yes' # e.g. 
RGW_VALGRIND=yes - cmd = [mstart_path + 'mrgw.sh', self.cluster.cluster_id, str(self.port)] + cmd = [mstart_path + 'mrgw.sh', self.cluster.cluster_id, str(self.port), str(self.ssl_port)] if self.id: cmd += ['-i', self.id] cmd += ['--debug-rgw=20', '--debug-ms=1'] @@ -183,6 +183,7 @@ def init(parse_args): 'checkpoint_retries': 60, 'checkpoint_delay': 5, 'reconfigure_delay': 5, + 'use_ssl': 'false', }) try: path = os.environ['RGW_MULTI_TEST_CONF'] @@ -213,6 +214,7 @@ def init(parse_args): parser.add_argument('--checkpoint-delay', type=int, default=cfg.getint(section, 'checkpoint_delay')) parser.add_argument('--reconfigure-delay', type=int, default=cfg.getint(section, 'reconfigure_delay')) parser.add_argument('--num-ps-zones', type=int, default=cfg.getint(section, 'num_ps_zones')) + parser.add_argument('--use-ssl', type=bool, default=cfg.getboolean(section, 'use_ssl')) es_cfg = [] @@ -269,10 +271,30 @@ def init(parse_args): num_az_zones = cfg.getint(section, 'num_az_zones') num_ps_zones = args.num_ps_zones if num_ps_zones_from_conf == 0 else num_ps_zones_from_conf - print('num_ps_zones = ' + str(num_ps_zones)) num_zones = args.num_zones + num_es_zones + num_cloud_zones + num_ps_zones + num_az_zones + use_ssl = cfg.getboolean(section, 'use_ssl') + + if use_ssl and bootstrap: + cmd = ['openssl', 'req', + '-x509', + '-newkey', 'rsa:4096', + '-sha256', + '-nodes', + '-keyout', 'key.pem', + '-out', 'cert.pem', + '-subj', '/CN=localhost', + '-days', '3650'] + bash(cmd) + # append key to cert + fkey = open('./key.pem', 'r') + if fkey.mode == 'r': + fcert = open('./cert.pem', 'a') + fcert.write(fkey.read()) + fcert.close() + fkey.close() + for zg in range(0, args.num_zonegroups): zonegroup = multisite.ZoneGroup(zonegroup_name(zg), period) period.zonegroups.append(zonegroup) @@ -362,11 +384,13 @@ def init(parse_args): if bootstrap: period.update(zone, commit=True) + ssl_port_offset = 1000 # start the gateways for g in range(0, args.gateways_per_zone): port = gateway_port(zg, g + z * args.gateways_per_zone) client_id = gateway_name(zg, z, g) - gateway = Gateway(client_id, 'localhost', port, cluster, zone) + gateway = Gateway(client_id, 'localhost', port, cluster, zone, + ssl_port = port+ssl_port_offset if use_ssl else 0) if bootstrap: gateway.start() zone.gateways.append(gateway) diff --git a/src/test/rgw/test_rgw_url.cc b/src/test/rgw/test_rgw_url.cc new file mode 100644 index 0000000000000..8422bca1b409c --- /dev/null +++ b/src/test/rgw/test_rgw_url.cc @@ -0,0 +1,90 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include "rgw/rgw_url.h" +#include +#include + +using namespace rgw; + +TEST(TestURL, SimpleAuthority) +{ + std::string host; + std::string user; + std::string password; + const std::string url = "http://example.com"; + ASSERT_TRUE(parse_url_authority(url, host, user, password)); + ASSERT_TRUE(user.empty()); + ASSERT_TRUE(password.empty()); + EXPECT_STREQ(host.c_str(), "example.com"); +} + +TEST(TestURL, IPAuthority) +{ + std::string host; + std::string user; + std::string password; + const std::string url = "http://1.2.3.4"; + ASSERT_TRUE(parse_url_authority(url, host, user, password)); + ASSERT_TRUE(user.empty()); + ASSERT_TRUE(password.empty()); + EXPECT_STREQ(host.c_str(), "1.2.3.4"); +} + +TEST(TestURL, IPv6Authority) +{ + std::string host; + std::string user; + std::string password; + const std::string url = "http://FE80:CD00:0000:0CDE:1257:0000:211E:729C"; + ASSERT_TRUE(parse_url_authority(url, host, user, password)); + 
ASSERT_TRUE(user.empty()); + ASSERT_TRUE(password.empty()); + EXPECT_STREQ(host.c_str(), "FE80:CD00:0000:0CDE:1257:0000:211E:729C"); +} + +TEST(TestURL, AuthorityWithUserinfo) +{ + std::string host; + std::string user; + std::string password; + const std::string url = "http://user:password@example.com"; + ASSERT_TRUE(parse_url_authority(url, host, user, password)); + EXPECT_STREQ(host.c_str(), "example.com"); + EXPECT_STREQ(user.c_str(), "user"); + EXPECT_STREQ(password.c_str(), "password"); +} + +TEST(TestURL, AuthorityWithPort) +{ + std::string host; + std::string user; + std::string password; + const std::string url = "http://user:password@example.com:1234"; + ASSERT_TRUE(parse_url_authority(url, host, user, password)); + EXPECT_STREQ(host.c_str(), "example.com:1234"); + EXPECT_STREQ(user.c_str(), "user"); + EXPECT_STREQ(password.c_str(), "password"); +} + +TEST(TestURL, DifferentSchema) +{ + std::string host; + std::string user; + std::string password; + const std::string url = "kafka://example.com"; + ASSERT_TRUE(parse_url_authority(url, host, user, password)); + ASSERT_TRUE(user.empty()); + ASSERT_TRUE(password.empty()); + EXPECT_STREQ(host.c_str(), "example.com"); +} + +TEST(TestURL, InvalidHost) +{ + std::string host; + std::string user; + std::string password; + const std::string url = "http://exa_mple.com"; + ASSERT_FALSE(parse_url_authority(url, host, user, password)); +} + -- 2.39.5
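As a cross-check of the regular expressions in rgw_url.cc, an equivalent Python transliteration reproduces the behavior the unit tests above expect (this is an illustration, not the shipped pattern)::

    import re

    # schema_re + user_pass_re + host_port_re, anchored like std::regex_match
    URL_RE = re.compile(r'^([a-zA-Z]+://)'          # schema
                        r'(([^:\s]+):([^@\s]+)@)?'  # optional user:password@
                        r'([a-zA-Z0-9.:-]+)$')      # host[:port]

    for url in ('http://example.com',
                'kafka://user:password@example.com:9094',
                'http://exa_mple.com'):  # '_' is not a valid host character
        m = URL_RE.match(url)
        print(url, '->', m.groups() if m else 'rejected')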