rgw/pubsub: add ssl+sasl security to kafka 31834/head
author Yuval Lifshitz <yuvalif@yahoo.com>
Sun, 3 Nov 2019 18:58:58 +0000 (20:58 +0200)
committer Yuval Lifshitz <yuvalif@yahoo.com>
Wed, 8 Jan 2020 12:11:53 +0000 (14:11 +0200)
Signed-off-by: Yuval Lifshitz <yuvalif@yahoo.com>
22 files changed:
doc/radosgw/notifications.rst
doc/radosgw/pubsub-module.rst
src/mrgw.sh
src/rgw/CMakeLists.txt
src/rgw/rgw_kafka.cc
src/rgw/rgw_kafka.h
src/rgw/rgw_pubsub.h
src/rgw/rgw_pubsub_push.cc
src/rgw/rgw_rest_pubsub.cc
src/rgw/rgw_rest_pubsub_common.cc
src/rgw/rgw_rest_pubsub_common.h
src/rgw/rgw_sync_module_pubsub_rest.cc
src/rgw/rgw_url.cc [new file with mode: 0644]
src/rgw/rgw_url.h [new file with mode: 0644]
src/test/rgw/CMakeLists.txt
src/test/rgw/rgw_multi/conn.py
src/test/rgw/rgw_multi/multisite.py
src/test/rgw/rgw_multi/tests.py
src/test/rgw/rgw_multi/tests_ps.py
src/test/rgw/rgw_multi/zone_ps.py
src/test/rgw/test_multi.py
src/test/rgw/test_rgw_url.cc [new file with mode: 0644]

index dd6762e10a56946c074ffed25f601318d0915ffa..43bcced787c7b614a142b3ec1685870dc9e64a29 100644 (file)
@@ -68,8 +68,10 @@ To update a topic, use the same command used for topic creation, with the topic
    &push-endpoint=<endpoint>
    [&Attributes.entry.1.key=amqp-exchange&Attributes.entry.1.value=<exchange>]
    [&Attributes.entry.2.key=amqp-ack-level&Attributes.entry.2.value=none|broker]
-   [&Attributes.entry.3.key=verify-sll&Attributes.entry.3.value=true|false]
+   [&Attributes.entry.3.key=verify-ssl&Attributes.entry.3.value=true|false]
    [&Attributes.entry.4.key=kafka-ack-level&Attributes.entry.4.value=none|broker]
+   [&Attributes.entry.5.key=use-ssl&Attributes.entry.5.value=true|false]
+   [&Attributes.entry.6.key=ca-location&Attributes.entry.6.value=<file path>]
 
 Request parameters:
 
@@ -83,7 +85,8 @@ Request parameters:
 - AMQP0.9.1 endpoint
 
  - URI: ``amqp://[<user>:<password>@]<fqdn>[:<port>][/<vhost>]``
- - user/password defaults to : guest/guest
+ - user/password defaults to: guest/guest
+ - user/password may only be provided over HTTPS; otherwise the topic creation request will be rejected
  - port defaults to: 5672
  - vhost defaults to: "/"
  - amqp-exchange: the exchanges must exist and be able to route messages based on topics (mandatory parameter for AMQP0.9.1)
@@ -94,7 +97,11 @@ Request parameters:
 
 - Kafka endpoint 
 
- - URI: ``kafka://<fqdn>[:<port]``
+ - URI: ``kafka://[<user>:<password>@]<fqdn>[:<port>]``
+ - if ``use-ssl`` is set to "true", a secure connection is used to connect with the broker ("false" by default)
+ - if ``ca-location`` is provided and a secure connection is used, the specified CA will be used, instead of the default one, to authenticate the broker
+ - user/password may only be provided over HTTPS; otherwise the topic creation request will be rejected
+ - user/password may only be provided together with ``use-ssl``; otherwise the connection to the broker will fail (see the connection sketch below)
  - port defaults to: 9092
  - kafka-ack-level: no end2end acking is required, as messages may persist in the broker before being delivered to their final destination. Two ack methods exist:
 
@@ -161,6 +168,7 @@ Response will have the following format:
 - User: name of the user that created the topic
 - Name: name of the topic
 - EndPointAddress: the push-endpoint URL
+- if the endpoint URL contains user/password information, the request must be made over HTTPS; otherwise the topic get request will be rejected
 - EndPointArgs: the push-endpoint args
 - EndpointTopic: the topic name that should be sent to the endpoint (may be different from the above topic name)
 - TopicArn: topic ARN
@@ -219,6 +227,8 @@ Response will have the following format:
         </ResponseMetadata>
     </ListTopicsResponse>    
 
+- if the endpoint URL of any of the topics contains user/password information, the request must be made over HTTPS; otherwise the topic list request will be rejected
+
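To make the new attributes concrete, here is a minimal, hypothetical sketch (not part of this commit's docs) of how a topic's endpoint settings map onto the updated ``rgw::kafka::connect()`` call introduced in ``rgw_kafka.h`` below; the broker address and credentials are placeholders:

```cpp
// hypothetical sketch: mapping the new topic attributes onto the updated
// connect() API from rgw_kafka.h; broker/user/password are placeholders
#include "rgw_kafka.h"
#include <boost/optional.hpp>

rgw::kafka::connection_ptr_t example_connect() {
  return rgw::kafka::connect(
      "kafka://myuser:mypassword@kafka.example.com:9093", // user/password inside the URL
      true,         // use-ssl=true
      true,         // verify-ssl (currently ignored, see rgw_kafka.cc)
      boost::none); // no ca-location: the default CA is used
}
// connect() returns nullptr when user/password are given without use-ssl
```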
 Notifications
 ~~~~~~~~~~~~~
 
index 32ff8bf7448a1dd775db9dd828baa0a108ba0e97..a727ad72bcc82e0abdb2b564a294bac100362fa3 100644 (file)
@@ -150,7 +150,7 @@ To update a topic, use the same command used for topic creation, with the topic
 
 ::
 
-   PUT /topics/<topic-name>[?push-endpoint=<endpoint>[&amqp-exchange=<exchange>][&amqp-ack-level=none|broker][&verify-ssl=true|false][&kafka-ack-level=none|broker]]
+   PUT /topics/<topic-name>[?push-endpoint=<endpoint>[&amqp-exchange=<exchange>][&amqp-ack-level=none|broker][&verify-ssl=true|false][&kafka-ack-level=none|broker][&use-ssl=true|false][&ca-location=<file path>]]
 
 Request parameters:
 
@@ -167,7 +167,8 @@ The endpoint URI may include parameters depending with the type of endpoint:
 - AMQP0.9.1 endpoint
 
  - URI: ``amqp://[<user>:<password>@]<fqdn>[:<port>][/<vhost>]``
- - user/password defaults to : guest/guest
+ - user/password defaults to: guest/guest
+ - user/password may only be provided over HTTPS; otherwise the topic creation request will be rejected
  - port defaults to: 5672
  - vhost defaults to: "/"
  - amqp-exchange: the exchanges must exist and be able to route messages based on topics (mandatory parameter for AMQP0.9.1)
@@ -178,7 +179,11 @@ The endpoint URI may include parameters depending with the type of endpoint:
 
 - Kafka endpoint 
 
- - URI: ``kafka://<fqdn>[:<port]``
+ - URI: ``kafka://[<user>:<password>@]<fqdn>[:<port>]``
+ - if ``use-ssl`` is set to "true", a secure connection is used to connect with the broker ("false" by default)
+ - if ``ca-location`` is provided and a secure connection is used, the specified CA will be used, instead of the default one, to authenticate the broker
+ - user/password may only be provided over HTTPS; otherwise the topic creation request will be rejected
+ - user/password may only be provided together with ``use-ssl``; otherwise the connection to the broker will fail
  - port defaults to: 9092
  - kafka-ack-level: no end2end acking is required, as messages may persist in the broker before being delivered to their final destination. Two ack methods exist:
 
@@ -212,7 +217,8 @@ Response will have the following format (JSON):
                "bucket_name":"",
                "oid_prefix":"",
                "push_endpoint":"",
-               "push_endpoint_args":""
+               "push_endpoint_args":"",
+               "push_endpoint_topic":""
            },
            "arn":""
        },
@@ -224,7 +230,9 @@ Response will have the following format (JSON):
 - dest.bucket_name: not used
 - dest.oid_prefix: not used
 - dest.push_endpoint: in case of S3-compliant notifications, this value will be used as the push-endpoint URL
+- if the push-endpoint URL contains user/password information, the request must be made over HTTPS; otherwise the topic get request will be rejected
 - dest.push_endpoint_args: in case of S3-compliant notifications, this value will be used as the push-endpoint args
+- dest.push_endpoint_topic: in case of S3-compliant notifications, this value will hold the topic name as sent to the endpoint (may be different than the internal topic name)
 - topic.arn: topic ARN
 - subs: list of subscriptions associated with this topic
 
@@ -246,6 +254,8 @@ List all topics that user defined.
 
    GET /topics
  
+- if the push-endpoint URL of any of the topics contains user/password information, the request must be made over HTTPS; otherwise the topic list request will be rejected
+
 S3-Compliant Notifications
 ~~~~~~~~~~~~~~~~~~~~~~~~~~
 
@@ -316,7 +326,8 @@ Response will have the following format (JSON):
                "bucket_name":"",
                "oid_prefix":"",
                "push_endpoint":"",
-               "push_endpoint_args":""
+               "push_endpoint_args":"",
+               "push_endpoint_topic":""
             }
             "arn":""
          },
@@ -334,7 +345,7 @@ Creates a new subscription.
 
 ::
 
-   PUT /subscriptions/<sub-name>?topic=<topic-name>[?push-endpoint=<endpoint>[&amqp-exchange=<exchange>][&amqp-ack-level=none|broker][&verify-ssl=true|false][&kafka-ack-level=none|broker]]
+   PUT /subscriptions/<sub-name>?topic=<topic-name>[?push-endpoint=<endpoint>[&amqp-exchange=<exchange>][&amqp-ack-level=none|broker][&verify-ssl=true|false][&kafka-ack-level=none|broker][&ca-location=<file path>]]
 
 Request parameters:
 
@@ -363,7 +374,10 @@ The endpoint URI may include parameters depending with the type of endpoint:
 
 - Kafka endpoint 
 
- - URI: ``kafka://<fqdn>[:<port]``
+ - URI: ``kafka://[<user>:<password>@]<fqdn>[:<port>]``
+ - if ``ca-location`` is provided, a secure connection will be used to connect with the broker
+ - user/password may only be provided over HTTPS; otherwise the subscription creation request will be rejected
+ - user/password may only be provided together with ``ca-location``; otherwise the subscription creation request will be rejected
  - port defaults to: 9092
  - kafka-ack-level: no end2end acking is required, as messages may persist in the broker before being delivered to their final destination. Two ack methods exist:
 
@@ -392,7 +406,8 @@ Response will have the following format (JSON):
            "bucket_name":"",
            "oid_prefix":"",
            "push_endpoint":"",
-           "push_endpoint_args":""
+           "push_endpoint_args":"",
+           "push_endpoint_topic":""
        }
        "s3_id":""
    }             
@@ -400,6 +415,13 @@ Response will have the following format (JSON):
 - user: name of the user that created the subscription
 - name: name of the subscription
 - topic: name of the topic the subscription is associated with
+- dest.bucket_name: name of the bucket storing the events
+- dest.oid_prefix: oid prefix for the events stored in the bucket
+- dest.push_endpoint: in case of S3-compliant notifications, this value will be used as the push-endpoint URL
+- if the push-endpoint URL contains user/password information, the request must be made over HTTPS; otherwise the subscription get request will be rejected
+- dest.push_endpoint_args: in case of S3-compliant notifications, this value will be used as the push-endpoint args
+- dest.push_endpoint_topic: in case of S3-compliant notifications, this value will hold the topic name as sent to the endpoint (may be different than the internal topic name)
+- s3_id: in case of S3-compliant notifications, this will hold the notification name that created the subscription
 
 Delete Subscription
 ```````````````````
index 972d8099c166e0fe41b3c4d91e23285ac7d93c5e..05739bf015ebc3b2204e394ee5aa252dc4e38756 100755 (executable)
@@ -3,40 +3,52 @@
 set -e
 
 rgw_frontend=${RGW_FRONTEND:-"beast"}
-script_root=`dirname $0`
-script_root=`(cd $script_root;pwd)`
+script_root=$(dirname "$0")
+script_root=$(cd "$script_root" && pwd)
 [ -z "$BUILD_DIR" ] && BUILD_DIR=build
 if [ -e CMakeCache.txt ]; then
     script_root=$PWD
-elif [ -e $script_root/../${BUILD_DIR}/CMakeCache.txt ]; then
-    cd $script_root/../${BUILD_DIR}
+elif [ -e "$script_root"/../${BUILD_DIR}/CMakeCache.txt ]; then
+    cd "$script_root"/../${BUILD_DIR}
     script_root=$PWD
 fi
-ceph_bin=$script_root/bin
-vstart_path=`dirname $0`
+#ceph_bin=$script_root/bin
+vstart_path=$(dirname "$0")
 
-[ "$#" -lt 2 ] && echo "usage: $0 <name> <port> [params...]" && exit 1
+[ "$#" -lt 3 ] && echo "usage: $0 <name> <port> <ssl-port> [params...]" && exit 1
 
 name=$1
 port=$2
+ssl_port=$3
+cert_param=""
+port_param="port=$port"
+
+if [ "$ssl_port" -gt 0 ]; then
+    cert_param="ssl_certificate=./cert.pem"
+    if [ "$rgw_frontend" = "civetweb" ]; then
+        port_param="port=${port} port=${ssl_port}s"
+    else
+        port_param="port=${port} ssl_port=${ssl_port}"
+    fi
+fi
 
-if [ ! -z "$RGW_FRONTEND_THREADS" ]; then
+if [ -n "$RGW_FRONTEND_THREADS" ]; then
     set_frontend_threads="num_threads=$RGW_FRONTEND_THREADS"
 fi
 
-shift 2
+shift 3
 
 run_root=$script_root/run/$name
 pidfile=$run_root/out/radosgw.${port}.pid
 asokfile=$run_root/out/radosgw.${port}.asok
 logfile=$run_root/out/radosgw.${port}.log
 
-$vstart_path/mstop.sh $name radosgw $port
+"$vstart_path"/mstop.sh "$name" radosgw "$port"
 
-$vstart_path/mrun $name ceph -c $run_root/ceph.conf \
-       -k $run_root/keyring auth get-or-create client.rgw.$port mon \
-       'allow rw' osd 'allow rwx' mgr 'allow rw' >> $run_root/keyring
+"$vstart_path"/mrun "$name" ceph -c "$run_root"/ceph.conf \
+       -k "$run_root"/keyring auth get-or-create client.rgw."$port" mon \
+       'allow rw' osd 'allow rwx' mgr 'allow rw' >> "$run_root"/keyring
 
-$vstart_path/mrun $name radosgw --rgw-frontends="$rgw_frontend port=$port $set_frontend_threads" \
-       -n client.rgw.$port --pid-file=$pidfile \
-       --admin-socket=$asokfile "$@" --log-file=$logfile
+"$vstart_path"/mrun "$name" radosgw --rgw-frontends="$rgw_frontend $port_param $set_frontend_threads $cert_param" \
+       -n client.rgw."$port" --pid-file="$pidfile" \
+       --admin-socket="$asokfile" "$@" --log-file="$logfile"
index d36d57adad604de456d20f0725fc0a389eb0aed5..1c95aed660d7e19d6626633dc0cb72644d29c0c5 100644 (file)
@@ -142,7 +142,8 @@ set(librgw_common_srcs
   rgw_perf_counters.cc
   rgw_rest_iam.cc
   rgw_object_lock.cc
-  rgw_kms.cc)
+  rgw_kms.cc
+  rgw_url.cc)
 
 if(WITH_RADOSGW_AMQP_ENDPOINT)
   list(APPEND librgw_common_srcs rgw_amqp.cc)
index 4f7751ae6c6b3906d3870a66122fd0182700fff2..dfaefdfb270710f9165b23f7fe872f30398b131d 100644 (file)
@@ -1,13 +1,12 @@
 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- 
 // vim: ts=8 sw=2 smarttab ft=cpp
 
-//#include "include/compat.h"
 #include "rgw_kafka.h"
+#include "rgw_url.h"
 #include <librdkafka/rdkafka.h>
 #include "include/ceph_assert.h"
 #include <sstream>
 #include <cstring>
-#include <regex>
 #include <unordered_map>
 #include <string>
 #include <vector>
@@ -32,17 +31,14 @@ bool operator==(const rd_kafka_topic_t* rkt, const std::string& name) {
 namespace rgw::kafka {
 
 // status codes for publishing
+// TODO: use the actual error code (when conn exists) instead of STATUS_CONNECTION_CLOSED when replying to client
 static const int STATUS_CONNECTION_CLOSED =      -0x1002;
 static const int STATUS_QUEUE_FULL =             -0x1003;
 static const int STATUS_MAX_INFLIGHT =           -0x1004;
 static const int STATUS_MANAGER_STOPPED =        -0x1005;
 // status code for connection opening
-static const int STATUS_CONF_ALLOC_FAILED =      -0x2001;
-static const int STATUS_GET_BROKER_LIST_FAILED = -0x2002;
-static const int STATUS_CREATE_PRODUCER_FAILED = -0x2003;
+static const int STATUS_CONF_ALLOC_FAILED      = -0x2001;
 
-static const int STATUS_CREATE_TOPIC_FAILED =    -0x3008;
-static const int NO_REPLY_CODE =                 0x0;
 static const int STATUS_OK =                     0x0;
 
 // struct for holding the callback and its tag in the callback list
@@ -70,8 +66,14 @@ struct connection_t {
   uint64_t delivery_tag = 1;
   int status;
   mutable std::atomic<int> ref_count = 0;
-  CephContext* cct = nullptr;
+  CephContext* const cct;
   CallbackList callbacks;
+  const std::string broker;
+  const bool use_ssl;
+  const bool verify_ssl; // TODO: currently ignored, not supported in librdkafka v0.11.6
+  const boost::optional<std::string> ca_location;
+  const std::string user;
+  const std::string password;
 
   // cleanup of all internal connection resource
   // the object can still remain, and internal connection
@@ -102,6 +104,12 @@ struct connection_t {
     return (producer != nullptr && !marked_for_deletion);
   }
 
+  // ctor for setting immutable values
+  connection_t(CephContext* _cct, const std::string& _broker, bool _use_ssl, bool _verify_ssl, 
+          const boost::optional<const std::string&>& _ca_location,
+          const std::string& _user, const std::string& _password) :
+      cct(_cct), broker(_broker), use_ssl(_use_ssl), verify_ssl(_verify_ssl), ca_location(_ca_location), user(_user), password(_password) {}
+
   // dtor also destroys the internals
   ~connection_t() {
     destroy(STATUS_CONNECTION_CLOSED);
@@ -111,6 +119,13 @@ struct connection_t {
   friend void intrusive_ptr_release(const connection_t* p);
 };
 
+std::string to_string(const connection_ptr_t& conn) {
+    std::string str;
+    str += "\nBroker: " + conn->broker; 
+    str += conn->use_ssl ? "\nUse SSL" : ""; 
+    str += conn->ca_location ? "\nCA Location: " + *(conn->ca_location) : "";
+    return str;
+}
 // these are required interfaces so that connection_t could be used inside boost::intrusive_ptr
 void intrusive_ptr_add_ref(const connection_t* p) {
   ++p->ref_count;
@@ -124,6 +139,8 @@ void intrusive_ptr_release(const connection_t* p) {
 // convert int status to string - including RGW specific values
 std::string status_to_string(int s) {
   switch (s) {
+    case STATUS_OK:
+        return "STATUS_OK";
     case STATUS_CONNECTION_CLOSED:
       return "RGW_KAFKA_STATUS_CONNECTION_CLOSED";
     case STATUS_QUEUE_FULL:
@@ -134,13 +151,8 @@ std::string status_to_string(int s) {
       return "RGW_KAFKA_STATUS_MANAGER_STOPPED";
     case STATUS_CONF_ALLOC_FAILED:
       return "RGW_KAFKA_STATUS_CONF_ALLOC_FAILED";
-    case STATUS_CREATE_PRODUCER_FAILED:
-      return "STATUS_CREATE_PRODUCER_FAILED";
-    case STATUS_CREATE_TOPIC_FAILED:
-      return "STATUS_CREATE_TOPIC_FAILED";
   }
-  // TODO: how to handle "s" in this case? 
-  return std::string(rd_kafka_err2str(rd_kafka_last_error()));
+  return std::string(rd_kafka_err2str((rd_kafka_resp_err_t)s));
 }
 
 void message_callback(rd_kafka_t* rk, const rd_kafka_message_t* rkmessage, void* opaque) {
@@ -160,7 +172,7 @@ void message_callback(rd_kafka_t* rk, const rd_kafka_message_t* rkmessage, void*
   const auto tag_it = std::find(callbacks_begin, callbacks_end, *tag);
   if (tag_it != callbacks_end) {
       ldout(conn->cct, 20) << "Kafka run: n/ack received, invoking callback with tag=" << 
-          *tag << " and result=" << result << dendl;
+          *tag << " and result=" << rd_kafka_err2str(result) << dendl;
       tag_it->cb(result);
       conn->callbacks.erase(tag_it);
   } else {
@@ -173,14 +185,13 @@ void message_callback(rd_kafka_t* rk, const rd_kafka_message_t* rkmessage, void*
 }
 
 // utility function to create a connection, when the connection object already exists
-connection_ptr_t& create_connection(connection_ptr_t& conn, const std::string& broker) {
+connection_ptr_t& create_connection(connection_ptr_t& conn) {
   // pointer must be valid and not marked for deletion
   ceph_assert(conn && !conn->marked_for_deletion);
   
   // reset all status codes
   conn->status = STATUS_OK; 
-
-  char errstr[512];
+  char errstr[512] = {0};
 
   conn->temp_conf = rd_kafka_conf_new();
   if (!conn->temp_conf) {
@@ -189,38 +200,68 @@ connection_ptr_t& create_connection(connection_ptr_t& conn, const std::string& b
   }
 
   // get list of brokers based on the bootstrap broker
-  if (rd_kafka_conf_set(conn->temp_conf, "bootstrap.servers", broker.c_str(), errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {
-    conn->status = STATUS_GET_BROKER_LIST_FAILED;
-    // TODO: use errstr
-    return conn;
+  if (rd_kafka_conf_set(conn->temp_conf, "bootstrap.servers", conn->broker.c_str(), errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) goto conf_error;
+
+  if (conn->use_ssl) {
+    if (!conn->user.empty()) {
+      // use SSL+SASL
+      if (rd_kafka_conf_set(conn->temp_conf, "security.protocol", "SASL_SSL", errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK ||
+              rd_kafka_conf_set(conn->temp_conf, "sasl.mechanism", "PLAIN", errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK ||
+              rd_kafka_conf_set(conn->temp_conf, "sasl.username", conn->user.c_str(), errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK ||
+              rd_kafka_conf_set(conn->temp_conf, "sasl.password", conn->password.c_str(), errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) goto conf_error;
+      ldout(conn->cct, 20) << "Kafka connect: successfully configured SSL+SASL security" << dendl;
+    } else {
+      // use only SSL
+      if (rd_kafka_conf_set(conn->temp_conf, "security.protocol", "SSL", errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) goto conf_error;
+      ldout(conn->cct, 20) << "Kafka connect: successfully configured SSL security" << dendl;
+    }
+    if (conn->ca_location) {
+      if (rd_kafka_conf_set(conn->temp_conf, "ssl.ca.location", conn->ca_location->c_str(), errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) goto conf_error;
+      ldout(conn->cct, 20) << "Kafka connect: successfully configured CA location" << dendl;
+    } else {
+      ldout(conn->cct, 20) << "Kafka connect: using default CA location" << dendl;
+    }
+    // Note: when librdkafka.1.0 is available the following line could be uncommented instead of the callback setting call
+    // if (rd_kafka_conf_set(conn->temp_conf, "enable.ssl.certificate.verification", "0", errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) goto conf_error;
+
+    ldout(conn->cct, 20) << "Kafka connect: successfully configured security" << dendl;
   }
 
   // set the global callback for delivery success/fail
   rd_kafka_conf_set_dr_msg_cb(conn->temp_conf, message_callback);
 
   // set the global opaque pointer to be the connection itself
-  rd_kafka_conf_set_opaque (conn->temp_conf, conn.get());
+  rd_kafka_conf_set_opaque(conn->temp_conf, conn.get());
 
   // create the producer
   conn->producer = rd_kafka_new(RD_KAFKA_PRODUCER, conn->temp_conf, errstr, sizeof(errstr));
-  if (conn->producer) {
-    conn->status = STATUS_CREATE_PRODUCER_FAILED;
-    // TODO: use errstr
+  if (!conn->producer) {
+    conn->status = rd_kafka_last_error();
+    ldout(conn->cct, 1) << "Kafka connect: failed to create producer: " << errstr << dendl;
     return conn;
   }
+  ldout(conn->cct, 20) << "Kafka connect: successfully created new producer" << dendl;
 
   // conf ownership passed to producer
   conn->temp_conf = nullptr;
   return conn;
-}
 
+conf_error:
+  conn->status = rd_kafka_last_error();
+  ldout(conn->cct, 1) << "Kafka connect: configuration failed: " << errstr << dendl;
+  return conn;
+}
 
 // utility function to create a new connection
-connection_ptr_t create_new_connection(const std::string& broker, CephContext* cct) { 
+connection_ptr_t create_new_connection(const std::string& broker, CephContext* cct,
+        bool use_ssl,
+        bool verify_ssl,
+        boost::optional<const std::string&> ca_location, 
+        const std::string& user, 
+        const std::string& password) { 
   // create connection state
-  connection_ptr_t conn = new connection_t;
-  conn->cct = cct;
-  return create_connection(conn, broker);
+  connection_ptr_t conn(new connection_t(cct, broker, use_ssl, verify_ssl, ca_location, user, password));
+  return create_connection(conn);
 }
 
 /// struct used for holding messages in the message queue
@@ -236,22 +277,6 @@ struct message_wrapper_t {
       reply_callback_t _cb) : conn(_conn), topic(_topic), message(_message), cb(_cb) {}
 };
 
-// parse a URL of the form: kafka://<host>[:port]
-// to a: host[:port]
-int parse_url(const std::string& url, std::string& broker) {
-  std::regex url_regex (
-    R"(^(([^:\/?#]+):)?(//([^\/?#]*))?([^?#]*)(\?([^#]*))?(#(.*))?)",
-    std::regex::extended
-  );
-  const auto HOST_AND_PORT = 4;
-  std::smatch url_match_result;
-  if (std::regex_match(url, url_match_result, url_regex)) {
-    broker = url_match_result[HOST_AND_PORT];
-       return 0;
-  }
-  return -1;
-}
-
 typedef std::unordered_map<std::string, connection_ptr_t> ConnectionList;
 typedef boost::lockfree::queue<message_wrapper_t*, boost::lockfree::fixed_sized<true>> MessageQueue;
 
@@ -290,9 +315,9 @@ private:
     if (!conn->is_ok()) {
       // connection had an issue while message was in the queue
       // TODO add error stats
-      ldout(conn->cct, 1) << "Kafka publish: connection had an issue while message was in the queue" << dendl;
+      ldout(conn->cct, 1) << "Kafka publish: connection had an issue while message was in the queue. error: " << status_to_string(conn->status) << dendl;
       if (message->cb) {
-        message->cb(STATUS_CONNECTION_CLOSED);
+        message->cb(conn->status);
       }
       return;
     }
@@ -303,11 +328,12 @@ private:
     if (topic_it == conn->topics.end()) {
       topic = rd_kafka_topic_new(conn->producer, message->topic.c_str(), nullptr);
       if (!topic) {
-        ldout(conn->cct, 1) << "Kafka publish: failed to create topic: " << message->topic << dendl;
+        const auto err = rd_kafka_last_error();
+        ldout(conn->cct, 1) << "Kafka publish: failed to create topic: " << message->topic << " error: " << status_to_string(err) << dendl;
         if (message->cb) {
-          message->cb(STATUS_CREATE_TOPIC_FAILED);
+          message->cb(err);
         }
-        conn->destroy(STATUS_CREATE_TOPIC_FAILED);
+        conn->destroy(err);
         return;
       }
       // TODO use the topics list as an LRU cache
@@ -337,12 +363,13 @@ private:
             tag);
     if (rc == -1) {
       const auto err = rd_kafka_last_error();
-      ldout(conn->cct, 1) << "Kafka publish: failed to produce: " << rd_kafka_err2str(err) << dendl;
-      // TODO: dont error on full queue, retry instead
+      ldout(conn->cct, 10) << "Kafka publish: failed to produce: " << rd_kafka_err2str(err) << dendl;
+      // TODO: don't error on full queue, and don't destroy the connection; retry instead
       // immediately invoke callback on error if needed
       if (message->cb) {
         message->cb(err);
       }
+      conn->destroy(err);
       delete tag;
     }
    
@@ -352,7 +379,7 @@ private:
         ldout(conn->cct, 20) << "Kafka publish (with callback, tag=" << *tag << "): OK. Queue has: " << q_len << " callbacks" << dendl;
         conn->callbacks.emplace_back(*tag, message->cb);
       } else {
-        // immediately invoke callback with error
+        // immediately invoke callback with error - this is not a connection error
         ldout(conn->cct, 1) << "Kafka publish (with callback): failed with error: callback queue full" << dendl;
         message->cb(STATUS_MAX_INFLIGHT);
         // tag will be deleted when the global callback is invoked
@@ -400,9 +427,10 @@ private:
 
         // try to reconnect the connection if it has an error
         if (!conn->is_ok()) {
+          ldout(conn->cct, 10) << "Kafka run: connection status is: " << status_to_string(conn->status) << dendl;
           const auto& broker = conn_it->first;
           ldout(conn->cct, 20) << "Kafka run: retry connection" << dendl;
-          if (create_connection(conn, broker)->is_ok() == false) {
+          if (create_connection(conn)->is_ok() == false) {
             ldout(conn->cct, 10) << "Kafka run: connection (" << broker << ") retry failed" << dendl;
             // TODO: add error counter for failed retries
             // TODO: add exponential backoff for retries
@@ -477,7 +505,10 @@ public:
   }
 
   // connect to a broker, or reuse an existing connection if already connected
-  connection_ptr_t connect(const std::string& url) {
+  connection_ptr_t connect(const std::string& url, 
+          bool use_ssl,
+          bool verify_ssl,
+          boost::optional<const std::string&> ca_location) {
     if (stopped) {
       // TODO: increment counter
       ldout(cct, 1) << "Kafka connect: manager is stopped" << dendl;
@@ -485,14 +516,25 @@ public:
     }
 
     std::string broker;
-    if (0 != parse_url(url, broker)) {
+    std::string user;
+    std::string password;
+    if (!parse_url_authority(url, broker, user, password)) {
       // TODO: increment counter
       ldout(cct, 1) << "Kafka connect: URL parsing failed" << dendl;
       return nullptr;
     }
 
+    // this should be validated by the regex in parse_url_authority()
+    ceph_assert(user.empty() == password.empty());
+
+    if (!user.empty() && !use_ssl) {
+      ldout(cct, 1) << "Kafka connect: user/password are only allowed over secure connection" << dendl;
+      return nullptr;
+    }
+
     std::lock_guard lock(connections_lock);
     const auto it = connections.find(broker);
+    // note that ssl and non-ssl connections to the same host are two separate connections
     if (it != connections.end()) {
       if (it->second->marked_for_deletion) {
         // TODO: increment counter
@@ -510,14 +552,13 @@ public:
       ldout(cct, 1) << "Kafka connect: max connections exceeded" << dendl;
       return nullptr;
     }
-    const auto conn = create_new_connection(broker, cct);
+    const auto conn = create_new_connection(broker, cct, use_ssl, verify_ssl, ca_location, user, password);
     // create_new_connection must always return a connection object
     // even if error occurred during creation. 
     // in such a case the creation will be retried in the main thread
     ceph_assert(conn);
     ++connection_count;
     ldout(cct, 10) << "Kafka connect: new connection is created. Total connections: " << connection_count << dendl;
-    ldout(cct, 10) << "Kafka connect: new connection status is: " << status_to_string(conn->status) << dendl;
     return connections.emplace(broker, conn).first->second;
   }
 
@@ -613,9 +654,10 @@ void shutdown() {
   s_manager = nullptr;
 }
 
-connection_ptr_t connect(const std::string& url) {
+connection_ptr_t connect(const std::string& url, bool use_ssl, bool verify_ssl,
+        boost::optional<const std::string&> ca_location) {
   if (!s_manager) return nullptr;
-  return s_manager->connect(url);
+  return s_manager->connect(url, use_ssl, verify_ssl, ca_location);
 }
 
 int publish(connection_ptr_t& conn, 
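The security setup above boils down to a handful of ``rd_kafka_conf_set()`` calls. A standalone sketch of the same SSL+SASL producer configuration, with placeholder broker and credentials, and plain error handling in place of the goto-based flow used in ``create_connection()``:

```cpp
// standalone sketch of the librdkafka SSL+SASL producer configuration
// performed in create_connection(); all values are placeholders
#include <librdkafka/rdkafka.h>
#include <cstdio>

rd_kafka_t* make_ssl_sasl_producer() {
  char errstr[512] = {0};
  rd_kafka_conf_t* conf = rd_kafka_conf_new();
  if (rd_kafka_conf_set(conf, "bootstrap.servers", "broker:9093", errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK ||
      rd_kafka_conf_set(conf, "security.protocol", "SASL_SSL", errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK ||
      rd_kafka_conf_set(conf, "sasl.mechanism", "PLAIN", errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK ||
      rd_kafka_conf_set(conf, "sasl.username", "myuser", errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK ||
      rd_kafka_conf_set(conf, "sasl.password", "mypassword", errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK ||
      rd_kafka_conf_set(conf, "ssl.ca.location", "/etc/ssl/certs/ca.pem", errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {
    std::fprintf(stderr, "conf failed: %s\n", errstr);
    rd_kafka_conf_destroy(conf);
    return nullptr;
  }
  // on success, ownership of conf passes to the producer
  rd_kafka_t* producer = rd_kafka_new(RD_KAFKA_PRODUCER, conf, errstr, sizeof(errstr));
  if (!producer) {
    std::fprintf(stderr, "producer creation failed: %s\n", errstr);
    rd_kafka_conf_destroy(conf);
    return nullptr;
  }
  return producer;
}
```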
index 7319679b09ceb6782647df616bbae9995c7bd270..cccdd65b6ab6487bb70bf51ce6b9ffc27bebd5af 100644 (file)
@@ -6,6 +6,7 @@
 #include <string>
 #include <functional>
 #include <boost/smart_ptr/intrusive_ptr.hpp>
+#include <boost/optional.hpp>
 
 class CephContext;
 
@@ -30,7 +31,7 @@ bool init(CephContext* cct);
 void shutdown();
 
 // connect to a kafka endpoint
-connection_ptr_t connect(const std::string& url);
+connection_ptr_t connect(const std::string& url, bool use_ssl, bool verify_ssl, boost::optional<const std::string&> ca_location);
 
 // publish a message over a connection that was already created
 int publish(connection_ptr_t& conn,
@@ -73,5 +74,8 @@ size_t get_max_queue();
 // disconnect from a kafka broker
 bool disconnect(connection_ptr_t& conn);
 
+// display connection as string
+std::string to_string(const connection_ptr_t& conn);
+
 }
 
index a6f97ea6ff1102a9efe65c851e87434942afa8f1..ee975700956f03147c36ab01bf1a74fbf45ae144 100644 (file)
@@ -341,19 +341,21 @@ struct rgw_pubsub_sub_dest {
   std::string push_endpoint;
   std::string push_endpoint_args;
   std::string arn_topic;
+  bool stored_secret = false;
 
   void encode(bufferlist& bl) const {
-    ENCODE_START(3, 1, bl);
+    ENCODE_START(4, 1, bl);
     encode(bucket_name, bl);
     encode(oid_prefix, bl);
     encode(push_endpoint, bl);
     encode(push_endpoint_args, bl);
     encode(arn_topic, bl);
+    encode(stored_secret, bl);
     ENCODE_FINISH(bl);
   }
 
   void decode(bufferlist::const_iterator& bl) {
-    DECODE_START(3, bl);
+    DECODE_START(4, bl);
     decode(bucket_name, bl);
     decode(oid_prefix, bl);
     decode(push_endpoint, bl);
@@ -363,6 +365,9 @@ struct rgw_pubsub_sub_dest {
     if (struct_v >= 3) {
         decode(arn_topic, bl);
     }
+    if (struct_v >= 4) {
+        decode(stored_secret, bl);
+    }
     DECODE_FINISH(bl);
   }
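The ``rgw_pubsub_sub_dest`` change above follows Ceph's standard versioned-encoding pattern: bump the version (3 to 4), append the new field at the end of the encoding, and gate its decode on ``struct_v`` so blobs written by older daemons still decode cleanly. A generic sketch of the pattern, on an illustrative struct and assuming the usual macros from ``include/encoding.h``:

```cpp
// illustrative struct showing the versioned encode/decode pattern
// (not the actual rgw code; assumes include/encoding.h macros)
#include "include/encoding.h"

struct example_dest_t {
  std::string push_endpoint;
  bool stored_secret = false;  // field appended in v2 of this example

  void encode(bufferlist& bl) const {
    ENCODE_START(2, 1, bl);    // current version 2, compatible back to 1
    encode(push_endpoint, bl);
    encode(stored_secret, bl); // new fields are always appended at the end
    ENCODE_FINISH(bl);
  }
  void decode(bufferlist::const_iterator& bl) {
    DECODE_START(2, bl);
    decode(push_endpoint, bl);
    if (struct_v >= 2) {       // older encodings simply lack the field
      decode(stored_secret, bl);
    }
    DECODE_FINISH(bl);
  }
};
WRITE_CLASS_ENCODER(example_dest_t)
```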
 
index 4d0496687ad0ff84da0e83077c5f64369c9133f2..6230330d4a6a32440a0bfe485b6b1c40670e6e7e 100644 (file)
@@ -220,7 +220,7 @@ private:
     const std::string topic;
     amqp::connection_ptr_t conn;
     const std::string message;
-    const ack_level_t ack_level; // TODO not used for now
+    [[maybe_unused]] const ack_level_t ack_level; // TODO not used for now
 
   public:
     AckPublishCR(CephContext* cct,
@@ -415,6 +415,7 @@ static const std::string AMQP_1_0("1-0");
 static const std::string AMQP_SCHEMA("amqp");
 #endif // ifdef WITH_RADOSGW_AMQP_ENDPOINT
 
+
 #ifdef WITH_RADOSGW_KAFKA_ENDPOINT
 class RGWPubSubKafkaEndpoint : public RGWPubSubEndpoint {
 private:
@@ -423,11 +424,57 @@ private:
     Broker,
   };
   CephContext* const cct;
-  const std::string endpoint;
   const std::string topic;
   kafka::connection_ptr_t conn;
-  ack_level_t ack_level;
-  std::string str_ack_level;
+  const ack_level_t ack_level;
+
+  static bool get_verify_ssl(const RGWHTTPArgs& args) {
+    bool exists;
+    auto str_verify_ssl = args.get("verify-ssl", &exists);
+    if (!exists) {
+      // verify server certificate by default
+      return true;
+    }
+    boost::algorithm::to_lower(str_verify_ssl);
+    if (str_verify_ssl == "true") {
+      return true;
+    }
+    if (str_verify_ssl == "false") {
+      return false;
+    }
+    throw configuration_error("'verify-ssl' must be true/false, not: " + str_verify_ssl);
+  }
+
+  static bool get_use_ssl(const RGWHTTPArgs& args) {
+    bool exists;
+    auto str_use_ssl = args.get("use-ssl", &exists);
+    if (!exists) {
+      // by default ssl not used
+      return false;
+    }
+    boost::algorithm::to_lower(str_use_ssl);
+    if (str_use_ssl == "true") {
+      return true;
+    }
+    if (str_use_ssl == "false") {
+      return false;
+    }
+    throw configuration_error("'use-ssl' must be true/false, not: " + str_use_ssl);
+  }
+
+  static ack_level_t get_ack_level(const RGWHTTPArgs& args) {
+    bool exists;
+    // get ack level
+    const auto str_ack_level = args.get("kafka-ack-level", &exists);
+    if (!exists || str_ack_level == "broker") {
+      // "broker" is default
+      return ack_level_t::Broker;
+    }
+    if (str_ack_level == "none") {
+      return ack_level_t::None;
+    }
+    throw configuration_error("Kafka: invalid kafka-ack-level: " + str_ack_level);
+  }
 
   // NoAckPublishCR implements async kafka publishing via coroutine
   // This coroutine ends when it send the message and does not wait for an ack
@@ -524,22 +571,11 @@ public:
       const RGWHTTPArgs& args,
       CephContext* _cct) : 
         cct(_cct),
-        endpoint(_endpoint), 
         topic(_topic),
-        conn(kafka::connect(endpoint)) {
+        conn(kafka::connect(_endpoint, get_use_ssl(args), get_verify_ssl(args), args.get_optional("ca-location"))),
+        ack_level(get_ack_level(args)) {
     if (!conn) { 
-      throw configuration_error("Kafka: failed to create connection to: " + endpoint);
-    }
-    bool exists;
-    // get ack level
-    str_ack_level = args.get("kafka-ack-level", &exists);
-    if (!exists || str_ack_level == "broker") {
-      // "broker" is default
-      ack_level = ack_level_t::Broker;
-    } else if (str_ack_level == "none") {
-      ack_level = ack_level_t::None;
-    } else {
-      throw configuration_error("Kafka: invalid kafka-ack-level: " + str_ack_level);
+      throw configuration_error("Kafka: failed to create connection to: " + _endpoint);
     }
   }
 
@@ -638,9 +674,8 @@ public:
 
   std::string to_str() const override {
     std::string str("Kafka Endpoint");
-    str += "\nURI: " + endpoint;
+    str += kafka::to_string(conn);
     str += "\nTopic: " + topic;
-    str += "\nAck Level: " + str_ack_level;
     return str;
   }
 };
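``get_verify_ssl()`` and ``get_use_ssl()`` above repeat the same true/false parsing with different argument names and defaults; a possible shared helper (a refactoring sketch, not part of the commit, built only from calls the code above already uses):

```cpp
// refactoring sketch: one helper for both boolean endpoint args;
// RGWHTTPArgs::get(name, &exists), boost::algorithm::to_lower() and
// configuration_error all appear in the code above
static bool get_bool_arg(const RGWHTTPArgs& args, const std::string& name,
                         bool default_value) {
  bool exists;
  auto str = args.get(name, &exists);
  if (!exists) {
    return default_value;
  }
  boost::algorithm::to_lower(str);
  if (str == "true") {
    return true;
  }
  if (str == "false") {
    return false;
  }
  throw configuration_error("'" + name + "' must be true/false, not: " + str);
}
// usage: get_bool_arg(args, "verify-ssl", true) / get_bool_arg(args, "use-ssl", false)
```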
index 08d8c544cf1589d0d26f0d0dd56e70cedb30e1f2..1f7bce65adf544d6e32a31accf1d1e383ff484d0 100644 (file)
@@ -19,6 +19,7 @@
 #define dout_context g_ceph_context
 #define dout_subsys ceph_subsys_rgw
 
+
 // command (AWS compliant): 
 // POST
 // Action=CreateTopic&Name=<topic-name>[&push-endpoint=<endpoint>[&<arg1>=<value1>]]
@@ -27,21 +28,25 @@ public:
   int get_params() override {
     topic_name = s->info.args.get("Name");
     if (topic_name.empty()) {
-        ldout(s->cct, 1) << "CreateTopic Action 'Name' argument is missing" << dendl;
-        return -EINVAL;
+      ldout(s->cct, 1) << "CreateTopic Action 'Name' argument is missing" << dendl;
+      return -EINVAL;
     }
 
     dest.push_endpoint = s->info.args.get("push-endpoint");
+
+    if (!validate_and_update_endpoint_secret(dest, s->cct, *(s->info.env))) {
+      return -EINVAL;
+    }
     for (const auto param : s->info.args.get_params()) {
-        if (param.first == "Action" || param.first == "Name" || param.first == "PayloadHash") {
-            continue;
-        }
-        dest.push_endpoint_args.append(param.first+"="+param.second+"&");
+      if (param.first == "Action" || param.first == "Name" || param.first == "PayloadHash") {
+        continue;
+      }
+      dest.push_endpoint_args.append(param.first+"="+param.second+"&");
     }
 
     if (!dest.push_endpoint_args.empty()) {
-        // remove last separator
-        dest.push_endpoint_args.pop_back();
+      // remove last separator
+      dest.push_endpoint_args.pop_back();
     }
     
     // dest object only stores endpoint info
@@ -160,8 +165,8 @@ public:
     const auto topic_arn = rgw::ARN::parse((s->info.args.get("TopicArn")));
 
     if (!topic_arn || topic_arn->resource.empty()) {
-        ldout(s->cct, 1) << "DeleteTopic Action 'TopicArn' argument is missing or invalid" << dendl;
-        return -EINVAL;
+      ldout(s->cct, 1) << "DeleteTopic Action 'TopicArn' argument is missing or invalid" << dendl;
+      return -EINVAL;
     }
 
     topic_name = topic_arn->resource;
@@ -199,21 +204,21 @@ namespace {
 // ctor and set are done according to the "type" argument
 // if type is not "key" or "value" its a no-op
 class Attribute {
-    std::string key;
-    std::string value;
+  std::string key;
+  std::string value;
 public:
-    Attribute(const std::string& type, const std::string& key_or_value) {
-        set(type, key_or_value);
-    }
-    void set(const std::string& type, const std::string& key_or_value) {
-        if (type == "key") {
-            key = key_or_value;
-        } else if (type == "value") {
-            value = key_or_value;
-        }
+  Attribute(const std::string& type, const std::string& key_or_value) {
+    set(type, key_or_value);
+  }
+  void set(const std::string& type, const std::string& key_or_value) {
+    if (type == "key") {
+      key = key_or_value;
+    } else if (type == "value") {
+      value = key_or_value;
     }
-    const std::string& get_key() const { return key; }
-    const std::string& get_value() const { return value; }
+  }
+  const std::string& get_key() const { return key; }
+  const std::string& get_value() const { return value; }
 };
 
 using AttributeMap = std::map<unsigned, Attribute>;
@@ -431,11 +436,11 @@ void RGWPSCreateNotif_ObjStore_S3::execute() {
   if (store->getRados()->get_sync_module()) {
     const auto psmodule = dynamic_cast<RGWPSSyncModuleInstance*>(store->getRados()->get_sync_module().get());
     if (psmodule) {
-        const auto& conf = psmodule->get_effective_conf();
-        data_bucket_prefix = conf["data_bucket_prefix"];
-        data_oid_prefix = conf["data_oid_prefix"];
-        // TODO: allow "push-only" on PS zone as well
-        push_only = false;
+      const auto& conf = psmodule->get_effective_conf();
+      data_bucket_prefix = conf["data_bucket_prefix"];
+      data_oid_prefix = conf["data_oid_prefix"];
+      // TODO: allow "push-only" on PS zone as well
+      push_only = false;
     }
   }
 
index 50f567f7fc1f4f903db6f4074570261b1736c8cb..30d058f7bdb13c0ca8aad821a4bc2447333e119e 100644 (file)
@@ -1,12 +1,50 @@
 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
 // vim: ts=8 sw=2 smarttab
 
+#include "rgw_common.h"
 #include "rgw_rest_pubsub_common.h"
 #include "common/dout.h"
+#include "rgw_url.h"
 
 #define dout_context g_ceph_context
 #define dout_subsys ceph_subsys_rgw
 
+bool validate_and_update_endpoint_secret(rgw_pubsub_sub_dest& dest, CephContext *cct, const RGWEnv& env) {
+  if (dest.push_endpoint.empty()) {
+      return true;
+  }
+  std::string user;
+  std::string password;
+  if (!rgw::parse_url_userinfo(dest.push_endpoint, user, password)) {
+    ldout(cct, 1) << "endpoint validation error: malformed endpoint URL:" << dest.push_endpoint << dendl;
+    return false;
+  }
+  // this should be verified inside parse_url_userinfo()
+  ceph_assert(user.empty() == password.empty());
+  if (!user.empty()) {
+      dest.stored_secret = true;
+      if (!rgw_transport_is_secure(cct, env)) {
+        ldout(cct, 1) << "endpoint validation error: sending password over insecure transport" << dendl;
+        return false;
+      }
+  }
+  return true;
+}
+
+bool subscription_has_endpoint_secret(const rgw_pubsub_sub_config& sub) {
+    return sub.dest.stored_secret;
+}
+
+bool topic_has_endpoint_secret(const rgw_pubsub_topic_subs& topic) {
+    return topic.topic.dest.stored_secret;
+}
+
+bool topics_has_endpoint_secret(const rgw_pubsub_user_topics& topics) {
+    for (const auto& topic : topics.topics) {
+        if (topic_has_endpoint_secret(topic.second)) return true;
+    }
+    return false;
+}
 void RGWPSCreateTopicOp::execute() {
   op_ret = get_params();
   if (op_ret < 0) {
@@ -25,10 +63,17 @@ void RGWPSCreateTopicOp::execute() {
 void RGWPSListTopicsOp::execute() {
   ups.emplace(store, s->owner.get_id());
   op_ret = ups->get_user_topics(&result);
+  // if there are no topics it is not considered an error
+  op_ret = op_ret == -ENOENT ? 0 : op_ret;
   if (op_ret < 0) {
     ldout(s->cct, 1) << "failed to get topics, ret=" << op_ret << dendl;
     return;
   }
+  if (topics_has_endpoint_secret(result) && !rgw_transport_is_secure(s->cct, *(s->info.env))) {
+    ldout(s->cct, 1) << "topics contain secret and cannot be sent over insecure transport" << dendl;
+    op_ret = -EPERM;
+    return;
+  }
   ldout(s->cct, 20) << "successfully got topics" << dendl;
 }
 
@@ -39,6 +84,11 @@ void RGWPSGetTopicOp::execute() {
   }
   ups.emplace(store, s->owner.get_id());
   op_ret = ups->get_topic(topic_name, &result);
+  if (topic_has_endpoint_secret(result) && !rgw_transport_is_secure(s->cct, *(s->info.env))) {
+    ldout(s->cct, 1) << "topic '" << topic_name << "' contain secret and cannot be sent over insecure transport" << dendl;
+    op_ret = -EPERM;
+    return;
+  }
   if (op_ret < 0) {
     ldout(s->cct, 1) << "failed to get topic '" << topic_name << "', ret=" << op_ret << dendl;
     return;
@@ -83,6 +133,11 @@ void RGWPSGetSubOp::execute() {
   ups.emplace(store, s->owner.get_id());
   auto sub = ups->get_sub(sub_name);
   op_ret = sub->get_conf(&result);
+  if (subscription_has_endpoint_secret(result) && !rgw_transport_is_secure(s->cct, *(s->info.env))) {
+    ldout(s->cct, 1) << "subscription '" << sub_name << "' contain secret and cannot be sent over insecure transport" << dendl;
+    op_ret = -EPERM;
+    return;
+  }
   if (op_ret < 0) {
     ldout(s->cct, 1) << "failed to get subscription '" << sub_name << "', ret=" << op_ret << dendl;
     return;
@@ -195,7 +250,7 @@ int RGWPSListNotifsOp::verify_permission() {
   }
 
   if (bucket_info.owner != s->owner.get_id()) {
-    ldout(s->cct, 1) << "user doesn't own bucket, cannot get topic list" << dendl;
+    ldout(s->cct, 1) << "user doesn't own bucket, cannot get notification list" << dendl;
     return -EPERM;
   }
 
index 6d78ce5ce1aa07ceb227a003f99ebce7e27787fc..f11c75658f52395d93443c70b5f14837fd341a1f 100644 (file)
@@ -6,6 +6,11 @@
 #include "rgw_op.h"
 #include "rgw_pubsub.h"
 
+// make sure that endpoint is a valid URL
+// make sure that if user/password are passed inside URL, it is over secure connection
+// update rgw_pubsub_sub_dest to indicate that a password is stored in the URL
+bool validate_and_update_endpoint_secret(rgw_pubsub_sub_dest& dest, CephContext *cct, const RGWEnv& env);
+
 // create a topic
 class RGWPSCreateTopicOp : public RGWDefaultResponseOp {
 protected:
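A hypothetical caller sketch showing how the REST ops in this commit are expected to use the helper before persisting a destination (the wrapper function below is illustrative, not part of the commit):

```cpp
// illustrative caller; mirrors how the get_params() overrides use the helper
#include "rgw_rest_pubsub_common.h"
#include <cerrno>

int set_push_endpoint(rgw_pubsub_sub_dest& dest, CephContext* cct,
                      const RGWEnv& env, const std::string& endpoint) {
  dest.push_endpoint = endpoint;
  // rejects malformed URLs, rejects user:password@ URLs arriving over an
  // insecure transport, and sets dest.stored_secret when credentials exist
  if (!validate_and_update_endpoint_secret(dest, cct, env)) {
    return -EINVAL;
  }
  return 0;
}
```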
index b198bb33b19f8e27daccfe5b274cabcb429bb14c..d95b264ea6a2c77f8d904ba8c54b4f604cfffe34 100644 (file)
@@ -26,6 +26,10 @@ public:
     topic_name = s->object.name;
 
     dest.push_endpoint = s->info.args.get("push-endpoint");
+    
+    if (!validate_and_update_endpoint_secret(dest, s->cct, *(s->info.env))) {
+      return -EINVAL;
+    }
     dest.push_endpoint_args = s->info.args.get_str();
     // dest object only stores endpoint info
     // bucket to store events/records will be set only when subscription is created
@@ -169,9 +173,12 @@ public:
     const auto& conf = psmodule->get_effective_conf();
 
     dest.push_endpoint = s->info.args.get("push-endpoint");
+    if (!validate_and_update_endpoint_secret(dest, s->cct, *(s->info.env))) {
+      return -EINVAL;
+    }
+    dest.push_endpoint_args = s->info.args.get_str();
     dest.bucket_name = string(conf["data_bucket_prefix"]) + s->owner.get_id().to_str() + "-" + topic_name;
     dest.oid_prefix = string(conf["data_oid_prefix"]) + sub_name + "/";
-    dest.push_endpoint_args = s->info.args.get_str();
     dest.arn_topic = topic_name;
 
     return 0;
diff --git a/src/rgw/rgw_url.cc b/src/rgw/rgw_url.cc
new file mode 100644 (file)
index 0000000..24c2537
--- /dev/null
@@ -0,0 +1,48 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include <string>
+#include <regex>
+
+namespace rgw {
+
+namespace {
+  const auto USER_GROUP_IDX = 3;
+  const auto PASSWORD_GROUP_IDX = 4;
+  const auto HOST_GROUP_IDX = 5;
+
+  const std::string schema_re = "([[:alpha:]]+:\\/\\/)";
+  const std::string user_pass_re = "(([^:\\s]+):([^@\\s]+)@)?";
+  const std::string host_port_re = "([[:alnum:].:-]+)";
+}
+
+bool parse_url_authority(const std::string& url, std::string& host, std::string& user, std::string& password) {
+  const std::string re = schema_re + user_pass_re + host_port_re;
+  const std::regex url_regex(re, std::regex::icase);
+  std::smatch url_match_result;
+
+  if (std::regex_match(url, url_match_result, url_regex)) {
+    host = url_match_result[HOST_GROUP_IDX];
+    user = url_match_result[USER_GROUP_IDX];
+    password = url_match_result[PASSWORD_GROUP_IDX];
+    return true;
+  }
+
+  return false;
+}
+
+bool parse_url_userinfo(const std::string& url, std::string& user, std::string& password) {
+  const std::string re = schema_re + user_pass_re + host_port_re;
+  const std::regex url_regex(re);
+  std::smatch url_match_result;
+
+  if (std::regex_match(url, url_match_result, url_regex)) {
+    user = url_match_result[USER_GROUP_IDX];
+    password = url_match_result[PASSWORD_GROUP_IDX];
+    return true;
+  }
+
+  return false;
+}
+}
+
diff --git a/src/rgw/rgw_url.h b/src/rgw/rgw_url.h
new file mode 100644 (file)
index 0000000..089401a
--- /dev/null
@@ -0,0 +1,12 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#pragma once
+
+#include <string>
+namespace rgw {
+// parse a URL of the form: http|https|amqp|amqps|kafka://[user:password@]<host>[:port]
+bool parse_url_authority(const std::string& url, std::string& host, std::string& user, std::string& password);
+bool parse_url_userinfo(const std::string& url, std::string& user, std::string& password);
+}
+
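A small usage sketch for the new URL helpers (the commit's real coverage lives in src/test/rgw/test_rgw_url.cc; the assertions below are illustrative):

```cpp
// illustrative usage of the new helpers; see test_rgw_url.cc for the real tests
#include "rgw_url.h"
#include <cassert>
#include <string>

int main() {
  std::string host, user, password;
  // authority with userinfo: host, user and password are all extracted
  assert(rgw::parse_url_authority("kafka://alice:secret@localhost:9093", host, user, password));
  assert(host == "localhost:9093" && user == "alice" && password == "secret");
  // userinfo is optional: the host is still extracted, user/password come back empty
  assert(rgw::parse_url_authority("kafka://localhost:9092", host, user, password));
  assert(user.empty() && password.empty());
  return 0;
}
```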
index 4093e4b01d129e263245dc001c7fc398a439c1aa..5e360fef5455a78aa71cfd9e211b72595f65fbb4 100644 (file)
@@ -187,6 +187,12 @@ add_ceph_unittest(unittest_rgw_kms)
 
 target_link_libraries(unittest_rgw_kms ${rgw_libs})
 
+# unittest_rgw_url
+add_executable(unittest_rgw_url test_rgw_url.cc)
+add_ceph_unittest(unittest_rgw_url)
+
+target_link_libraries(unittest_rgw_url ${rgw_libs})
+
 add_executable(ceph_test_rgw_gc_log test_rgw_gc_log.cc $<TARGET_OBJECTS:unit-main>)
 target_link_libraries(ceph_test_rgw_gc_log ${rgw_libs} radostest-cxx)
 install(TARGETS ceph_test_rgw_gc_log DESTINATION ${CMAKE_INSTALL_BINDIR})
index 1099664df201b55c09685bde9985a984127238fe..b03db36735f4d9699b71406546e621afe45391f4 100644 (file)
@@ -14,3 +14,17 @@ def get_gateway_connection(gateway, credentials):
                 calling_format = boto.s3.connection.OrdinaryCallingFormat())
     return gateway.connection
 
+def get_gateway_secure_connection(gateway, credentials):
+    """ secure connect to the given gateway """
+    if gateway.ssl_port == 0:
+        return None
+    if gateway.secure_connection is None:
+        gateway.secure_connection = boto.connect_s3(
+            aws_access_key_id = credentials.access_key,
+            aws_secret_access_key = credentials.secret,
+            host = gateway.host,
+            port = gateway.ssl_port,
+            is_secure = True,
+            validate_certs=False,
+            calling_format = boto.s3.connection.OrdinaryCallingFormat())
+    return gateway.secure_connection
index 47afe052048a2b5de77d5015c9d0f5e8901172ec..f189a507839715739338dca250fbf7ac90e54232 100644 (file)
@@ -3,7 +3,7 @@ from six import StringIO
 
 import json
 
-from .conn import get_gateway_connection
+from .conn import get_gateway_connection, get_gateway_secure_connection
 
 class Cluster:
     """ interface to run commands against a distinct ceph cluster """
@@ -18,13 +18,14 @@ class Gateway:
     """ interface to control a single radosgw instance """
     __metaclass__ = ABCMeta
 
-    def __init__(self, host = None, port = None, cluster = None, zone = None, proto = 'http', connection = None):
+    def __init__(self, host = None, port = None, cluster = None, zone = None, ssl_port = 0):
         self.host = host
         self.port = port
         self.cluster = cluster
         self.zone = zone
-        self.proto = proto
-        self.connection = connection
+        self.connection = None
+        self.secure_connection = None
+        self.ssl_port = ssl_port
 
     @abstractmethod
     def start(self, args = []):
@@ -37,7 +38,7 @@ class Gateway:
         pass
 
     def endpoint(self):
-        return '%s://%s:%d' % (self.proto, self.host, self.port)
+        return 'http://%s:%d' % (self.host, self.port)
 
 class SystemObject:
     """ interface for system objects, represented in json format and
@@ -181,6 +182,7 @@ class ZoneConn(object):
 
         if self.zone.gateways is not None:
             self.conn = get_gateway_connection(self.zone.gateways[0], self.credentials)
+            self.secure_conn = get_gateway_secure_connection(self.zone.gateways[0], self.credentials)
 
     def get_connection(self):
         return self.conn
index 7f013e788c46efb08f9b71b41fbf20cd09ee4778..31b8233c0316cdd30133f72165a601b0c04e2df8 100644 (file)
@@ -66,18 +66,6 @@ log = logging.getLogger('rgw_multi.tests')
 num_buckets = 0
 run_prefix=''.join(random.choice(string.ascii_lowercase) for _ in range(6))
 
-def get_gateway_connection(gateway, credentials):
-    """ connect to the given gateway """
-    if gateway.connection is None:
-        gateway.connection = boto.connect_s3(
-                aws_access_key_id = credentials.access_key,
-                aws_secret_access_key = credentials.secret,
-                host = gateway.host,
-                port = gateway.port,
-                is_secure = False,
-                calling_format = boto.s3.connection.OrdinaryCallingFormat())
-    return gateway.connection
-
 def get_zone_connection(zone, credentials):
     """ connect to the zone's first gateway """
     if isinstance(credentials, list):
index bedd189bc630804a47a49d04b659d0d49aa37ade..1470d9a6e54beca8deb8e04437d5f250db63a57e 100644 (file)
@@ -382,12 +382,16 @@ kafka_server = 'localhost'
 
 class KafkaReceiver(object):
     """class for receiving and storing messages on a topic from the kafka broker"""
-    def __init__(self, topic):
+    def __init__(self, topic, security_type):
         from kafka import KafkaConsumer
         remaining_retries = 10
+        port = 9092
+        if security_type != 'PLAINTEXT':
+            security_type = 'SSL'
+            port = 9093
         while remaining_retries > 0:
             try:
-                self.consumer = KafkaConsumer(topic, bootstrap_servers = kafka_server)
+                self.consumer = KafkaConsumer(topic, bootstrap_servers = kafka_server+':'+str(port), security_protocol=security_type)
                 print('Kafka consumer created on topic: '+topic)
                 break
             except Exception as error:
@@ -414,10 +418,10 @@ def kafka_receiver_thread_runner(receiver):
     try:
         log.info('Kafka receiver started')
         print('Kafka receiver started')
-        for msg in receiver.consumer:
-            if receiver.stop:
-                break
-            receiver.events.append(json.loads(msg.value))
+        while not receiver.stop:
+            for msg in receiver.consumer:
+                receiver.events.append(json.loads(msg.value))
+            time.sleep(0.1)
         log.info('Kafka receiver ended')
         print('Kafka receiver ended')
     except Exception as error:
@@ -425,9 +429,9 @@ def kafka_receiver_thread_runner(receiver):
         print('Kafka receiver ended unexpectedly: ' + str(error))
 
 
-def create_kafka_receiver_thread(topic):
+def create_kafka_receiver_thread(topic, security_type='PLAINTEXT'):
     """create kafka receiver and thread"""
-    receiver = KafkaReceiver(topic)
+    receiver = KafkaReceiver(topic, security_type)
     task = threading.Thread(target=kafka_receiver_thread_runner, args=(receiver,))
     task.daemon = True
     return task, receiver
@@ -435,16 +439,39 @@ def create_kafka_receiver_thread(topic):
 def stop_kafka_receiver(receiver, task):
     """stop the receiver thread and wait for it to finis"""
     receiver.stop = True
-    task.join(5)
+    task.join(1)
     try:
         receiver.consumer.close()
     except Exception as error:
         log.info('failed to gracefully stop Kafka receiver: %s', str(error))
 
 
+# follow the instruction here to create and sign a broker certificate:
+# https://github.com/edenhill/librdkafka/wiki/Using-SSL-with-librdkafka
+
+# the generated broker certificate should be stored in the java keystore for the use of the server
+# assuming the jks files were copied to $KAFKA_DIR and broker name is "localhost"
+# following lines must be added to $KAFKA_DIR/config/server.properties
+# listeners=PLAINTEXT://localhost:9092,SSL://localhost:9093,SASL_SSL://localhost:9094
+# sasl.enabled.mechanisms=PLAIN
+# ssl.keystore.location = $KAFKA_DIR/server.keystore.jks
+# ssl.keystore.password = abcdefgh
+# ssl.key.password = abcdefgh
+# ssl.truststore.location = $KAFKA_DIR/server.truststore.jks
+# ssl.truststore.password = abcdefgh
+
+# notes:
+# (1) we don't test client authentication, hence there is no need to generate client keys
+# (2) our client is not using the keystore, and the "rootCA.crt" file generated in the process above
+# should be copied to: $KAFKA_DIR
+
 def init_kafka():
     """ start kafka/zookeeper """
-    KAFKA_DIR = os.environ['KAFKA_DIR']
+    try:
+        KAFKA_DIR = os.environ['KAFKA_DIR']
+    except KeyError:
+        KAFKA_DIR = ''
+
     if KAFKA_DIR == '':
         log.info('KAFKA_DIR must be set to where kafka is installed')
         print('KAFKA_DIR must be set to where kafka is installed')
@@ -468,10 +495,13 @@ def init_kafka():
     print('Starting kafka...')
     kafka_log = open('./kafka.log', 'w')
     try:
+        kafka_env = os.environ.copy()
+        kafka_env['KAFKA_OPTS']='-Djava.security.auth.login.config='+KAFKA_DIR+'config/kafka_server_jaas.conf'
         kafka_proc = subprocess.Popen([
             KAFKA_DIR+'bin/kafka-server-start.sh', 
             KAFKA_DIR+'config/server.properties'], 
-            stdout=kafka_log)
+            stdout=kafka_log,
+            env=kafka_env)
     except Exception as error:
         log.info('failed to execute kafka: %s', str(error))
         print('failed to execute kafka: %s' % str(error))
@@ -495,8 +525,18 @@ def clean_kafka(kafka_proc, zk_proc, kafka_log):
     """ stop kafka/zookeeper """
     try:
         kafka_log.close()
+        print('Shutdown Kafka...')
         kafka_proc.terminate()
+        time.sleep(5)
+        if kafka_proc.poll() is None:
+            print('Failed to shutdown Kafka... killing')
+            kafka_proc.kill()
+        print('Shutdown zookeeper...')
         zk_proc.terminate()
+        time.sleep(5)
+        if zk_proc.poll() is None:
+            print('Failed to shutdown zookeeper... killing')
+            zk_proc.kill()
     except:
         log.info('kafka/zookeeper already terminated')
 
@@ -766,8 +806,9 @@ def test_ps_s3_notification():
     # delete the bucket
     zones[0].delete_bucket(bucket_name)
 
+
 def test_ps_s3_topic_on_master():
-    """ test s3 notification set/get/delete on master """
+    """ test s3 topics set/get/delete on master """
     zones, _  = init_env(require_ps=False)
     realm = get_realm()
     zonegroup = realm.master_zonegroup()
@@ -775,7 +816,7 @@ def test_ps_s3_topic_on_master():
     topic_name = bucket_name + TOPIC_SUFFIX
 
     # clean all topics
-    delete_all_s3_topics(zones[0].conn, zonegroup.name)
+    delete_all_s3_topics(zones[0], zonegroup.name)
    
     # create s3 topics
     endpoint_address = 'amqp://127.0.0.1:7001'
@@ -814,8 +855,9 @@ def test_ps_s3_topic_on_master():
     assert_equal(status, 404)
 
     # get the remaining 2 topics
-    result = topic_conf1.get_list()
-    assert_equal(len(result['Topics']), 2)
+    result, status = topic_conf1.get_list()
+    assert_equal(status, 200)
+    assert_equal(len(result['ListTopicsResponse']['ListTopicsResult']['Topics']['member']), 2)
     
     # delete topics
     result = topic_conf2.del_config()
@@ -826,8 +868,58 @@ def test_ps_s3_topic_on_master():
     # assert_equal(status, 200)
 
     # get topic list, make sure it is empty
-    result = topic_conf1.get_list()
-    assert_equal(len(result['Topics']), 0)
+    result, status = topic_conf1.get_list()
+    assert_equal(result['ListTopicsResponse']['ListTopicsResult']['Topics'], None)
+
+
+def test_ps_s3_topic_with_secret_on_master():
+    """ test s3 topics with secret set/get/delete on master """
+    zones, _  = init_env(require_ps=False)
+    if zones[0].secure_conn is None:
+        return SkipTest('secure connection is needed to test topic with secrets')
+
+    realm = get_realm()
+    zonegroup = realm.master_zonegroup()
+    bucket_name = gen_bucket_name()
+    topic_name = bucket_name + TOPIC_SUFFIX
+
+    # clean all topics
+    delete_all_s3_topics(zones[0], zonegroup.name)
+   
+    # create s3 topics
+    endpoint_address = 'amqp://user:password@127.0.0.1:7001'
+    endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange=amqp.direct&amqp-ack-level=none'
+    bad_topic_conf = PSTopicS3(zones[0].conn, topic_name, zonegroup.name, endpoint_args=endpoint_args)
+    try:
+        result = bad_topic_conf.set_config()
+    except Exception as err:
+        print 'Expected error: ' + str(err)
+    else:
+        assert False, 'setting a topic with user/password must only be allowed over HTTPS'
+    
+    topic_conf = PSTopicS3(zones[0].secure_conn, topic_name, zonegroup.name, endpoint_args=endpoint_args)
+    topic_arn = topic_conf.set_config()
+
+    assert_equal(topic_arn,
+                 'arn:aws:sns:' + zonegroup.name + ':' + get_tenant() + ':' + topic_name)
+
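+    # reading the topic back over plain HTTP must fail, since its endpoint carries credentials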
+    _, status = bad_topic_conf.get_config()
+    assert_equal(status/100, 4)
+
+    # get topic
+    result, status = topic_conf.get_config()
+    assert_equal(status, 200)
+    assert_equal(topic_arn, result['GetTopicResponse']['GetTopicResult']['Topic']['TopicArn'])
+    assert_equal(endpoint_address, result['GetTopicResponse']['GetTopicResult']['Topic']['EndPoint']['EndpointAddress'])
+    
+    _, status = bad_topic_conf.get_config()
+    assert_equal(status/100, 4)
+    
+    _, status = topic_conf.get_list()
+    assert_equal(status/100, 2)
+
+    # delete topics
+    result = topic_conf.del_config()
 
 
 def test_ps_s3_notification_on_master():
@@ -1505,6 +1597,112 @@ def test_ps_s3_notification_push_kafka_on_master():
     clean_kafka(kafka_proc, zk_proc, kafka_log)
 
 
+def kafka_security(security_type):
+    """ test pushing kafka s3 notifications on master over a secure connection """
+    if skip_push_tests:
+        return SkipTest("PubSub push tests don't run in teuthology")
+    zones, _  = init_env(require_ps=False)
+    if security_type == 'SSL_SASL' and zones[0].secure_conn is None:
+        return SkipTest("secure connection is needed to test SASL_SSL security")
+    kafka_proc, zk_proc, kafka_log = init_kafka()
+    if kafka_proc is None or zk_proc is None:
+        return SkipTest('end2end kafka tests require kafka/zookeeper installed')
+    realm = get_realm()
+    zonegroup = realm.master_zonegroup()
+    
+    # create bucket
+    bucket_name = gen_bucket_name()
+    bucket = zones[0].create_bucket(bucket_name)
+    # name is constant for manual testing
+    topic_name = bucket_name+'_topic'
+    # create consumer on the topic
+    task, receiver = create_kafka_receiver_thread(topic_name)
+    task.start()
+
+    # create s3 topic
+    if security_type == 'SSL_SASL':
+        endpoint_address = 'kafka://alice:alice-secret@' + kafka_server + ':9094'
+    else:
+        # ssl only
+        endpoint_address = 'kafka://' + kafka_server + ':9093'
+
+    KAFKA_DIR = os.environ['KAFKA_DIR']
+
+    # without acks from broker, with root CA
+    endpoint_args = 'push-endpoint='+endpoint_address+'&kafka-ack-level=none&use-ssl=true&ca-location='+KAFKA_DIR+'rootCA.crt'
+
+    if security_type == 'SSL_SASL':
+        topic_conf = PSTopicS3(zones[0].secure_conn, topic_name, zonegroup.name, endpoint_args=endpoint_args)
+    else:
+        topic_conf = PSTopicS3(zones[0].conn, topic_name, zonegroup.name, endpoint_args=endpoint_args)
+
+    topic_arn = topic_conf.set_config()
+    # create s3 notification
+    notification_name = bucket_name + NOTIFICATION_SUFFIX
+    topic_conf_list = [{'Id': notification_name, 'TopicArn': topic_arn,
+                         'Events': []
+                       }]
+
+    s3_notification_conf = PSNotificationS3(zones[0].conn, bucket_name, topic_conf_list)
+    s3_notification_conf.set_config()
+
+    # create objects in the bucket (async)
+    number_of_objects = 10
+    client_threads = []
+    start_time = time.time()
+    for i in range(number_of_objects):
+        key = bucket.new_key(str(i))
+        content = str(os.urandom(1024*1024))
+        thr = threading.Thread(target = set_contents_from_string, args=(key, content,))
+        thr.start()
+        client_threads.append(thr)
+    [thr.join() for thr in client_threads] 
+
+    time_diff = time.time() - start_time
+    print 'average time for creation + kafka notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds'
+
+    try:
+        print 'wait for 5sec for the messages...'
+        time.sleep(5)
+        keys = list(bucket.list())
+        receiver.verify_s3_events(keys, exact_match=True)
+
+        # delete objects from the bucket
+        client_threads = []
+        start_time = time.time()
+        for key in bucket.list():
+            thr = threading.Thread(target = key.delete, args=())
+            thr.start()
+            client_threads.append(thr)
+        [thr.join() for thr in client_threads] 
+        
+        time_diff = time.time() - start_time
+        print 'average time for deletion + kafka notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds'
+
+        print 'wait for 5sec for the messages...'
+        time.sleep(5)
+        receiver.verify_s3_events(keys, exact_match=True, deletions=True)
+    except Exception as err:
+        assert False, str(err)
+    finally: 
+        # cleanup
+        s3_notification_conf.del_config()
+        topic_conf.del_config()
+        # delete the bucket
+        for key in bucket.list():
+            key.delete()
+        zones[0].delete_bucket(bucket_name)
+        stop_kafka_receiver(receiver, task)
+        clean_kafka(kafka_proc, zk_proc, kafka_log)
+
+
+def test_ps_s3_notification_push_kafka_security_ssl():
+    kafka_security('SSL')
+
+def test_ps_s3_notification_push_kafka_security_ssl_sasl():
+    kafka_security('SSL_SASL')
+
+
 def test_ps_s3_notification_push_http_on_master():
     """ test pushing http s3 notification on master """
     if skip_push_tests:
index cfdf480ee0b5d1d5f52bfb3f2a9b3b1663522495..a67a2fee43b0dd9d1490f1dcb602c0e2199f1bd1 100644 (file)
@@ -1,5 +1,6 @@
 import logging
 import httplib
+import ssl
 import urllib
 import urlparse
 import hmac
@@ -130,22 +131,24 @@ class PSTopic:
         return self.send_request('GET', get_list=True)
 
 
-def delete_all_s3_topics(conn, region):
+def delete_all_s3_topics(zone, region):
     try:
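+        # prefer the secure connection when available, so that topics whose
+        # endpoint carries user/password can also be listed and deleted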
+        conn = zone.secure_conn if zone.secure_conn is not None else zone.conn
+        protocol = 'https' if conn.is_secure else 'http'
         client = boto3.client('sns',
-                              endpoint_url='http://'+conn.host+':'+str(conn.port),
-                              aws_access_key_id=conn.aws_access_key_id,
-                              aws_secret_access_key=conn.aws_secret_access_key,
-                              region_name=region,
-                              config=Config(signature_version='s3'))
+                endpoint_url=protocol+'://'+conn.host+':'+str(conn.port),
+                aws_access_key_id=conn.aws_access_key_id,
+                aws_secret_access_key=conn.aws_secret_access_key,
+                region_name=region,
+                verify='./cert.pem',
+                config=Config(signature_version='s3'))
 
         topics = client.list_topics()['Topics']
         for topic in topics:
             print('topic cleanup, deleting: ' + topic['TopicArn'])
             assert client.delete_topic(TopicArn=topic['TopicArn'])['ResponseMetadata']['HTTPStatusCode'] == 200
-    except:
-        print('failed to do topic cleanup. if there are topics '
-              'they may need to be manually deleted')
+    except Exception as err:
+        print 'failed to do topic cleanup: ' + str(err)
     
 
 class PSTopicS3:
@@ -163,12 +166,14 @@ class PSTopicS3:
         self.attributes = {}
         if endpoint_args is not None:
             self.attributes = {nvp[0] : nvp[1] for nvp in urlparse.parse_qsl(endpoint_args, keep_blank_values=True)}
-        self.client = boto3.client('sns',
-                                   endpoint_url='http://'+conn.host+':'+str(conn.port),
-                                   aws_access_key_id=conn.aws_access_key_id,
-                                   aws_secret_access_key=conn.aws_secret_access_key,
-                                   region_name=region,
-                                   config=Config(signature_version='s3'))
+        # the client must be created whether or not endpoint args were given
+        protocol = 'https' if conn.is_secure else 'http'
+        self.client = boto3.client('sns',
+                           endpoint_url=protocol+'://'+conn.host+':'+str(conn.port),
+                           aws_access_key_id=conn.aws_access_key_id,
+                           aws_secret_access_key=conn.aws_secret_access_key,
+                           region_name=region,
+                           verify='./cert.pem',
+                           config=Config(signature_version='s3'))
 
 
     def get_config(self):
@@ -188,9 +193,11 @@ class PSTopicS3:
                    'Date': string_date,
                    'Host': self.conn.host+':'+str(self.conn.port),
                    'Content-Type': content_type}
-        http_conn = httplib.HTTPConnection(self.conn.host, self.conn.port)
-        if log.getEffectiveLevel() <= 10:
-            http_conn.set_debuglevel(5)
+        if self.conn.is_secure:
+            http_conn = httplib.HTTPSConnection(self.conn.host, self.conn.port, 
+                    context=ssl.create_default_context(cafile='./cert.pem'))
+        else:
+            http_conn = httplib.HTTPConnection(self.conn.host, self.conn.port)
         http_conn.request(method, resource, body, headers)
         response = http_conn.getresponse()
         data = response.read()
@@ -212,7 +219,34 @@ class PSTopicS3:
     
     def get_list(self):
         """list all topics"""
-        return self.client.list_topics()
+        # note that boto3 supports list_topics(), however, its result only shows the ARNs;
+        # send a raw ListTopics request instead, so the full topic configuration is returned
+        parameters = {'Action': 'ListTopics'}
+        body = urllib.urlencode(parameters)
+        string_date = strftime("%a, %d %b %Y %H:%M:%S +0000", gmtime())
+        content_type = 'application/x-www-form-urlencoded; charset=utf-8'
+        resource = '/'
+        method = 'POST'
+        string_to_sign = method + '\n\n' + content_type + '\n' + string_date + '\n' + resource
+        log.debug('StringToSign: %s', string_to_sign)
+        signature = base64.b64encode(hmac.new(self.conn.aws_secret_access_key,
+                                     string_to_sign.encode('utf-8'),
+                                     hashlib.sha1).digest())
+        headers = {'Authorization': 'AWS '+self.conn.aws_access_key_id+':'+signature,
+                   'Date': string_date,
+                   'Host': self.conn.host+':'+str(self.conn.port),
+                   'Content-Type': content_type}
+        if self.conn.is_secure:
+            http_conn = httplib.HTTPSConnection(self.conn.host, self.conn.port, 
+                    context=ssl.create_default_context(cafile='./cert.pem'))
+        else:
+            http_conn = httplib.HTTPConnection(self.conn.host, self.conn.port)
+        http_conn.request(method, resource, body, headers)
+        response = http_conn.getresponse()
+        data = response.read()
+        status = response.status
+        http_conn.close()
+        dict_response = xmltodict.parse(data)
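+        # on success the parsed response is a nested dict of the form:
+        # {'ListTopicsResponse': {'ListTopicsResult': {'Topics': {'member': [...]}}}}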
+        return dict_response, status
 
 
 class PSNotification:
index 53ac815d942350a79b7537b1839dbea66add1cc7..af45f3ab000a668424470759595f5a38a0816255 100644 (file)
@@ -97,7 +97,7 @@ class Gateway(multisite.Gateway):
         # e.g. RGW_FRONTEND=civetweb
         # to run test under valgrind memcheck, set RGW_VALGRIND to 'yes'
         # e.g. RGW_VALGRIND=yes
-        cmd = [mstart_path + 'mrgw.sh', self.cluster.cluster_id, str(self.port)]
+        cmd = [mstart_path + 'mrgw.sh', self.cluster.cluster_id, str(self.port), str(self.ssl_port)]
         if self.id:
             cmd += ['-i', self.id]
         cmd += ['--debug-rgw=20', '--debug-ms=1']
@@ -183,6 +183,7 @@ def init(parse_args):
                                          'checkpoint_retries': 60,
                                          'checkpoint_delay': 5,
                                          'reconfigure_delay': 5,
+                                         'use_ssl': 'false',
                                          })
     try:
         path = os.environ['RGW_MULTI_TEST_CONF']
@@ -213,6 +214,7 @@ def init(parse_args):
     parser.add_argument('--checkpoint-delay', type=int, default=cfg.getint(section, 'checkpoint_delay'))
     parser.add_argument('--reconfigure-delay', type=int, default=cfg.getint(section, 'reconfigure_delay'))
     parser.add_argument('--num-ps-zones', type=int, default=cfg.getint(section, 'num_ps_zones'))
+    # note: argparse's type=bool would treat any non-empty string (including 'false') as True
+    parser.add_argument('--use-ssl', type=lambda s: s.lower() in ('true', '1', 'yes'), default=cfg.getboolean(section, 'use_ssl'))
 
 
     es_cfg = []
@@ -269,10 +271,30 @@ def init(parse_args):
     num_az_zones = cfg.getint(section, 'num_az_zones')
 
     num_ps_zones = args.num_ps_zones if num_ps_zones_from_conf == 0 else num_ps_zones_from_conf 
-    print('num_ps_zones = ' + str(num_ps_zones))
 
     num_zones = args.num_zones + num_es_zones + num_cloud_zones + num_ps_zones + num_az_zones
 
+    use_ssl = cfg.getboolean(section, 'use_ssl')
+
+    if use_ssl and bootstrap:
+        cmd = ['openssl', 'req', 
+                '-x509', 
+                '-newkey', 'rsa:4096', 
+                '-sha256', 
+                '-nodes', 
+                '-keyout', 'key.pem', 
+                '-out', 'cert.pem', 
+                '-subj', '/CN=localhost', 
+                '-days', '3650']
+        bash(cmd)
+        # append the key to the certificate, so a single combined PEM file can be used
+        with open('./key.pem', 'r') as fkey:
+            with open('./cert.pem', 'a') as fcert:
+                fcert.write(fkey.read())
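+        # the self-signed cert.pem acts as its own CA, so it is also what the test
+        # clients verify against (see zone_ps.py)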
+
     for zg in range(0, args.num_zonegroups):
         zonegroup = multisite.ZoneGroup(zonegroup_name(zg), period)
         period.zonegroups.append(zonegroup)
@@ -362,11 +384,13 @@ def init(parse_args):
             if bootstrap:
                 period.update(zone, commit=True)
 
+            ssl_port_offset = 1000
             # start the gateways
             for g in range(0, args.gateways_per_zone):
                 port = gateway_port(zg, g + z * args.gateways_per_zone)
                 client_id = gateway_name(zg, z, g)
-                gateway = Gateway(client_id, 'localhost', port, cluster, zone)
+                gateway = Gateway(client_id, 'localhost', port, cluster, zone,
+                                  ssl_port=port+ssl_port_offset if use_ssl else 0)
                 if bootstrap:
                     gateway.start()
                 zone.gateways.append(gateway)
diff --git a/src/test/rgw/test_rgw_url.cc b/src/test/rgw/test_rgw_url.cc
new file mode 100644 (file)
index 0000000..8422bca
--- /dev/null
@@ -0,0 +1,90 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "rgw/rgw_url.h"
+#include <string>
+#include <gtest/gtest.h>
+
+using namespace rgw;
+
+TEST(TestURL, SimpleAuthority)
+{
+    std::string host;
+    std::string user;
+    std::string password;
+    const std::string url = "http://example.com";
+    ASSERT_TRUE(parse_url_authority(url, host, user, password));
+    ASSERT_TRUE(user.empty());
+    ASSERT_TRUE(password.empty());
+    EXPECT_STREQ(host.c_str(), "example.com"); 
+}
+
+TEST(TestURL, IPAuthority)
+{
+    std::string host;
+    std::string user;
+    std::string password;
+    const std::string url = "http://1.2.3.4";
+    ASSERT_TRUE(parse_url_authority(url, host, user, password));
+    ASSERT_TRUE(user.empty());
+    ASSERT_TRUE(password.empty());
+    EXPECT_STREQ(host.c_str(), "1.2.3.4"); 
+}
+
+TEST(TestURL, IPv6Authority)
+{
+    std::string host;
+    std::string user;
+    std::string password;
+    const std::string url = "http://FE80:CD00:0000:0CDE:1257:0000:211E:729C";
+    ASSERT_TRUE(parse_url_authority(url, host, user, password));
+    ASSERT_TRUE(user.empty());
+    ASSERT_TRUE(password.empty());
+    EXPECT_STREQ(host.c_str(), "FE80:CD00:0000:0CDE:1257:0000:211E:729C"); 
+}
+
+TEST(TestURL, AuthorityWithUserinfo)
+{
+    std::string host;
+    std::string user;
+    std::string password;
+    const std::string url = "http://user:password@example.com";
+    ASSERT_TRUE(parse_url_authority(url, host, user, password));
+    EXPECT_STREQ(host.c_str(), "example.com"); 
+    EXPECT_STREQ(user.c_str(), "user"); 
+    EXPECT_STREQ(password.c_str(), "password"); 
+}
+
+TEST(TestURL, AuthorityWithPort)
+{
+    std::string host;
+    std::string user;
+    std::string password;
+    const std::string url = "http://user:password@example.com:1234";
+    ASSERT_TRUE(parse_url_authority(url, host, user, password));
+    EXPECT_STREQ(host.c_str(), "example.com:1234"); 
+    EXPECT_STREQ(user.c_str(), "user"); 
+    EXPECT_STREQ(password.c_str(), "password"); 
+}
+
+TEST(TestURL, DifferentScheme)
+{
+    std::string host;
+    std::string user;
+    std::string password;
+    const std::string url = "kafka://example.com";
+    ASSERT_TRUE(parse_url_authority(url, host, user, password));
+    ASSERT_TRUE(user.empty());
+    ASSERT_TRUE(password.empty());
+    EXPECT_STREQ(host.c_str(), "example.com"); 
+}
+
+TEST(TestURL, InvalidHost)
+{
+    std::string host;
+    std::string user;
+    std::string password;
+    const std::string url = "http://exa_mple.com";
+    ASSERT_FALSE(parse_url_authority(url, host, user, password));
+}
+
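+// a hypothetical extra case (not part of the original commit): following
+// AuthorityWithPort above, the port is assumed to stay part of the host
+// string when no userinfo is present
+TEST(TestURL, PortWithoutUserinfo)
+{
+    std::string host;
+    std::string user;
+    std::string password;
+    const std::string url = "http://example.com:1234";
+    ASSERT_TRUE(parse_url_authority(url, host, user, password));
+    ASSERT_TRUE(user.empty());
+    ASSERT_TRUE(password.empty());
+    EXPECT_STREQ(host.c_str(), "example.com:1234");
+}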