#include <algorithm>
#include "include/buffer_fwd.h"
#include "common/Formatter.h"
+#include "common/iso_8601.h"
#include "common/async/completion.h"
#include "rgw_common.h"
#include "rgw_data_sync.h"
f.flush(ss);
return ss.str();
}
+
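+// helper: parse an optional boolean argument ("true"/"false");
+// returns 'default_value' when the argument is absent, and throws
+// configuration_error when the value is not a valid boolean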
+bool get_bool(const RGWHTTPArgs& args, const std::string& name, bool default_value) {
+ bool value;
+ bool exists;
+ if (args.get_bool(name.c_str(), &value, &exists) == -EINVAL) {
+ throw RGWPubSubEndpoint::configuration_error("invalid boolean value for " + name);
+ }
+ if (!exists) {
+ return default_value;
+ }
+ return value;
+}
class RGWPubSubHTTPEndpoint : public RGWPubSubEndpoint {
private:
const std::string endpoint;
- std::string str_ack_level;
typedef unsigned ack_level_t;
ack_level_t ack_level; // TODO: not used for now
- bool verify_ssl;
+ const bool verify_ssl;
+ const bool cloudevents;
static const ack_level_t ACK_LEVEL_ANY = 0;
static const ack_level_t ACK_LEVEL_NON_ERROR = 1;
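+ // "any": any HTTP response counts as an ack; "non-error": only non-error (2xx/3xx) responses do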
public:
- RGWPubSubHTTPEndpoint(const std::string& _endpoint,
- const RGWHTTPArgs& args) : endpoint(_endpoint) {
+ RGWPubSubHTTPEndpoint(const std::string& _endpoint, const RGWHTTPArgs& args) :
+ endpoint(_endpoint), verify_ssl(get_bool(args, "verify-ssl", true)), cloudevents(get_bool(args, "cloudevents", false))
+ {
bool exists;
-
- str_ack_level = args.get("http-ack-level", &exists);
+ const auto& str_ack_level = args.get("http-ack-level", &exists);
if (!exists || str_ack_level == "any") {
// "any" is default
ack_level = ACK_LEVEL_ANY;
} else if (str_ack_level == "non-error") {
ack_level = ACK_LEVEL_NON_ERROR;
} else {
throw configuration_error("HTTP/S: invalid http-ack-level: " + str_ack_level);
}
}
-
- auto str_verify_ssl = args.get("verify-ssl", &exists);
- boost::algorithm::to_lower(str_verify_ssl);
- // verify server certificate by default
- if (!exists || str_verify_ssl == "true") {
- verify_ssl = true;
- } else if (str_verify_ssl == "false") {
- verify_ssl = false;
- } else {
- throw configuration_error("HTTP/S: verify-ssl must be true/false, not: " + str_verify_ssl);
- }
}
RGWCoroutine* send_to_completion_async(const rgw_pubsub_event& event, RGWDataSyncEnv* env) override {
bufferlist read_bl;
RGWPostHTTPData request(cct, "POST", endpoint, &read_bl, verify_ssl);
const auto post_data = json_format_pubsub_event(event);
+ if (cloudevents) {
+ // following: https://github.com/cloudevents/spec/blob/v1.0.1/http-protocol-binding.md
+ // using "Binary Content Mode"
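+ // in binary content mode the JSON payload travels unchanged in the HTTP body,
+ // while each CloudEvents attribute is mapped to a "ce-"-prefixed header,
+ // e.g. "ce-type: com.amazonaws.s3:ObjectCreated:Put" (illustrative value)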
+ request.append_header("ce-specversion", "1.0");
+ request.append_header("ce-type", "com.amazonaws." + event.eventName);
+ request.append_header("ce-time", to_iso_8601(event.eventTime));
+ // the default to_iso_8601() output is also RFC 3339 compatible
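+ // x-amz-request-id and x-amz-id-2 together uniquely identify the request,
+ // giving the unique event "id" the CloudEvents spec requires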
+ request.append_header("ce-id", event.x_amz_request_id + "." + event.x_amz_id_2);
+ request.append_header("ce-source", event.eventSource + "." + event.awsRegion + "." + event.bucket_name);
+ request.append_header("ce-subject", event.object_key);
+ }
request.set_post_data(post_data);
request.set_send_length(post_data.length());
request.append_header("Content-Type", "application/json");
std::string to_str() const override {
std::string str("HTTP/S Endpoint");
str += "\nURI: " + endpoint;
- str += "\nAck Level: " + str_ack_level;
str += (verify_ssl ? "\nverify SSL" : "\ndon't verify SSL");
return str;
-
}
};
kafka::connection_ptr_t conn;
const ack_level_t ack_level;
- bool get_verify_ssl(const RGWHTTPArgs& args) {
- bool exists;
- auto str_verify_ssl = args.get("verify-ssl", &exists);
- if (!exists) {
- // verify server certificate by default
- return true;
- }
- boost::algorithm::to_lower(str_verify_ssl);
- if (str_verify_ssl == "true") {
- return true;
- }
- if (str_verify_ssl == "false") {
- return false;
- }
- throw configuration_error("'verify-ssl' must be true/false, not: " + str_verify_ssl);
- }
-
- bool get_use_ssl(const RGWHTTPArgs& args) {
- bool exists;
- auto str_use_ssl = args.get("use-ssl", &exists);
- if (!exists) {
- // by default ssl not used
- return false;
- }
- boost::algorithm::to_lower(str_use_ssl);
- if (str_use_ssl == "true") {
- return true;
- }
- if (str_use_ssl == "false") {
- return false;
- }
- throw configuration_error("'use-ssl' must be true/false, not: " + str_use_ssl);
- }
ack_level_t get_ack_level(const RGWHTTPArgs& args) {
bool exists;
- // get ack level
- const auto str_ack_level = args.get("kafka-ack-level", &exists);
+ const auto& str_ack_level = args.get("kafka-ack-level", &exists);
if (!exists || str_ack_level == "broker") {
// "broker" is default
return ack_level_t::Broker;
CephContext* _cct) :
cct(_cct),
topic(_topic),
- conn(kafka::connect(_endpoint, get_use_ssl(args), get_verify_ssl(args), args.get_optional("ca-location"))) ,
+ conn(kafka::connect(_endpoint, get_bool(args, "use-ssl", false), get_bool(args, "verify-ssl", true), args.get_optional("ca-location"))),
ack_level(get_ack_level(args)) {
if (!conn) {
throw configuration_error("Kafka: failed to create connection to: " + _endpoint);
from nose.plugins.attrib import attr
import boto3
import datetime
+from cloudevents.http import from_http
+from dateutil import parser
from boto.s3.connection import S3Connection
"""HTTP POST hanler class storing the received events in its http server"""
def do_POST(self):
"""implementation of POST handler"""
- try:
- content_length = int(self.headers['Content-Length'])
- body = self.rfile.read(content_length)
- log.info('HTTP Server (%d) received event: %s', self.server.worker_id, str(body))
- self.server.append(json.loads(body))
- except:
- log.error('HTTP Server received empty event')
- self.send_response(400)
+ content_length = int(self.headers['Content-Length'])
+ body = self.rfile.read(content_length)
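+ # in cloudevents mode, verify the binary content mode "ce-*" attributes
+ # against the S3 record carried in the body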
+ if self.server.cloudevents:
+ event = from_http(self.headers, body)
+ record = json.loads(body)['Records'][0]
+ assert_equal(event['specversion'], '1.0')
+ assert_equal(event['id'], record['responseElements']['x-amz-request-id'] + '.' + record['responseElements']['x-amz-id-2'])
+ assert_equal(event['source'], 'ceph:s3.' + record['awsRegion'] + '.' + record['s3']['bucket']['name'])
+ assert_equal(event['type'], 'com.amazonaws.' + record['eventName'])
+ assert_equal(event['datacontenttype'], 'application/json')
+ assert_equal(event['subject'], record['s3']['object']['key'])
+ assert_equal(parser.parse(event['time']), parser.parse(record['eventTime']))
+ log.info('HTTP Server (%d) received event: %s', self.server.worker_id, str(body))
+ self.server.append(json.loads(body))
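+ # answer 100 Continue when the client requested it, otherwise 200 OK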
+ if self.headers.get('Expect') == '100-continue':
+ self.send_response(100)
else:
- if self.headers.get('Expect') == '100-continue':
- self.send_response(100)
- else:
- self.send_response(200)
- finally:
- if self.server.delay > 0:
- time.sleep(self.server.delay)
- self.end_headers()
+ self.send_response(200)
+ if self.server.delay > 0:
+ time.sleep(self.server.delay)
+ self.end_headers()
class HTTPServerWithEvents(http_server.HTTPServer):
"""HTTP server used by the handler to store events"""
- def __init__(self, addr, handler, worker_id, delay=0):
+ def __init__(self, addr, handler, worker_id, delay=0, cloudevents=False):
http_server.HTTPServer.__init__(self, addr, handler, False)
self.worker_id = worker_id
self.events = []
self.delay = delay
+ self.cloudevents = cloudevents
def append(self, event):
self.events.append(event)
class HTTPServerThread(threading.Thread):
"""thread for running the HTTP server. reusing the same socket for all threads"""
- def __init__(self, i, sock, addr, delay=0):
+ def __init__(self, i, sock, addr, delay=0, cloudevents=False):
threading.Thread.__init__(self)
self.i = i
self.daemon = True
- self.httpd = HTTPServerWithEvents(addr, HTTPPostHandler, i, delay)
+ self.httpd = HTTPServerWithEvents(addr, HTTPPostHandler, i, delay, cloudevents)
self.httpd.socket = sock
# prevent the HTTP server from re-binding every handler
self.httpd.server_bind = self.server_close = lambda self: None
class StreamingHTTPServer:
"""multi-threaded http server class also holding list of events received into the handler
each thread has its own server, and all servers share the same socket"""
- def __init__(self, host, port, num_workers=100, delay=0):
+ def __init__(self, host, port, num_workers=100, delay=0, cloudevents=False):
addr = (host, port)
self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
self.sock.bind(addr)
self.sock.listen(num_workers)
- self.workers = [HTTPServerThread(i, self.sock, addr, delay) for i in range(num_workers)]
+ self.workers = [HTTPServerThread(i, self.sock, addr, delay, cloudevents) for i in range(num_workers)]
def verify_s3_events(self, keys, exact_match=False, deletions=False, expected_sizes={}):
"""verify stored s3 records agains a list of keys"""
http_server.close()
+@attr('http_test')
+def test_ps_s3_notification_push_cloudevents_on_master():
+ """ test pushing cloudevents notification on master """
+ conn = connection()
+ zonegroup = 'default'
+
+ # create random port for the http server
+ host = get_ip()
+ port = random.randint(10000, 20000)
+ # start an http server in a separate thread
+ number_of_objects = 10
+ http_server = StreamingHTTPServer(host, port, num_workers=number_of_objects, cloudevents=True)
+
+ # create bucket
+ bucket_name = gen_bucket_name()
+ bucket = conn.create_bucket(bucket_name)
+ topic_name = bucket_name + TOPIC_SUFFIX
+
+ # create s3 topic
+ endpoint_address = 'http://'+host+':'+str(port)
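+ # 'cloudevents=true' switches the endpoint to CloudEvents binary content mode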
+ endpoint_args = 'push-endpoint='+endpoint_address+'&cloudevents=true'
+ topic_conf = PSTopicS3(conn, topic_name, zonegroup, endpoint_args=endpoint_args)
+ topic_arn = topic_conf.set_config()
+ # create s3 notification
+ notification_name = bucket_name + NOTIFICATION_SUFFIX
+ topic_conf_list = [{'Id': notification_name,
+ 'TopicArn': topic_arn,
+ 'Events': []
+ }]
+ s3_notification_conf = PSNotificationS3(conn, bucket_name, topic_conf_list)
+ response, status = s3_notification_conf.set_config()
+ assert_equal(status/100, 2)
+
+ # create objects in the bucket
+ client_threads = []
+ objects_size = {}
+ start_time = time.time()
+ for i in range(number_of_objects):
+ content = str(os.urandom(randint(1, 1024)))
+ object_size = len(content)
+ key = bucket.new_key(str(i))
+ objects_size[key.name] = object_size
+ thr = threading.Thread(target = set_contents_from_string, args=(key, content,))
+ thr.start()
+ client_threads.append(thr)
+ [thr.join() for thr in client_threads]
+
+ time_diff = time.time() - start_time
+ print('average time for creation + http notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds')
+
+ print('wait for 5sec for the messages...')
+ time.sleep(5)
+
+ # check http receiver
+ keys = list(bucket.list())
+ http_server.verify_s3_events(keys, exact_match=True, deletions=False, expected_sizes=objects_size)
+
+ # delete objects from the bucket
+ client_threads = []
+ start_time = time.time()
+ for key in bucket.list():
+ thr = threading.Thread(target = key.delete, args=())
+ thr.start()
+ client_threads.append(thr)
+ [thr.join() for thr in client_threads]
+
+ time_diff = time.time() - start_time
+ print('average time for deletion + http notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds')
+
+ print('wait for 5sec for the messages...')
+ time.sleep(5)
+
+ # check http receiver
+ http_server.verify_s3_events(keys, exact_match=True, deletions=True, expected_sizes=objects_size)
+
+ # cleanup
+ topic_conf.del_config()
+ s3_notification_conf.del_config(notification=notification_name)
+ # delete the bucket
+ conn.delete_bucket(bucket_name)
+ http_server.close()
+
+
@attr('http_test')
def test_ps_s3_opaque_data_on_master():
""" test that opaque id set in topic, is sent in notification on master """