From e35d511cb71ca1756386beb53e77cc042ba0877e Mon Sep 17 00:00:00 2001
From: yuval Lifshitz
Date: Mon, 13 Dec 2021 20:56:20 +0200
Subject: [PATCH] rgw/notifications: add cloudevents support to HTTP endpoint

Following the CloudEvents HTTP protocol binding spec:
https://github.com/cloudevents/spec/blob/v1.0.1/http-protocol-binding.md
and, more specifically, the aws-s3 adapter spec:
https://github.com/cloudevents/spec/blob/main/cloudevents/adapters/aws-s3.md

Signed-off-by: yuval Lifshitz
---
 doc/radosgw/notifications.rst                |   3 +
 src/rgw/rgw_pubsub_push.cc                   |  87 +++++-------
 .../rgw/bucket_notification/requirements.txt |   3 +
 src/test/rgw/bucket_notification/test_bn.py  | 133 +++++++++++++---
 4 files changed, 150 insertions(+), 76 deletions(-)

diff --git a/doc/radosgw/notifications.rst b/doc/radosgw/notifications.rst
index 897ce81a5ba..f9fc8173910 100644
--- a/doc/radosgw/notifications.rst
+++ b/doc/radosgw/notifications.rst
@@ -122,6 +122,7 @@ To update a topic, use the same command used for topic creation, with the topic
    [&Attributes.entry.7.key=OpaqueData&Attributes.entry.7.value=<opaque data>]
    [&Attributes.entry.8.key=push-endpoint&Attributes.entry.8.value=<endpoint>]
    [&Attributes.entry.9.key=persistent&Attributes.entry.9.value=true|false]
+   [&Attributes.entry.10.key=cloudevents&Attributes.entry.10.value=true|false]
 
 Request parameters:
 
@@ -134,6 +135,7 @@ Request parameters:
 - URI: ``http[s]://<fqdn>[:<port>]``
 - port defaults to: 80/443 for HTTP/S accordingly
 - verify-ssl: indicate whether the server certificate is validated by the client or not ("true" by default)
+- cloudevents: whether the HTTP header should contain attributes according to the S3 CloudEvents spec ("false" by default)

diff --git a/src/rgw/rgw_pubsub_push.cc b/src/rgw/rgw_pubsub_push.cc
--- a/src/rgw/rgw_pubsub_push.cc
+++ b/src/rgw/rgw_pubsub_push.cc
@@ -7,6 +7,7 @@
 #include <algorithm>
 #include "include/buffer_fwd.h"
 #include "common/Formatter.h"
+#include "common/iso_8601.h"
 #include "common/async/completion.h"
 #include "rgw_common.h"
 #include "rgw_data_sync.h"
@@ -39,14 +40,26 @@ std::string json_format_pubsub_event(const EventType& event) {
   f.flush(ss);
   return ss.str();
 }
+
+bool get_bool(const RGWHTTPArgs& args, const std::string& name, bool default_value) {
+  bool value;
+  bool exists;
+  if (args.get_bool(name.c_str(), &value, &exists) == -EINVAL) {
+    throw RGWPubSubEndpoint::configuration_error("invalid boolean value for " + name);
+  }
+  if (!exists) {
+    return default_value;
+  }
+  return value;
+}
 
 class RGWPubSubHTTPEndpoint : public RGWPubSubEndpoint {
 private:
   const std::string endpoint;
-  std::string str_ack_level;
   typedef unsigned ack_level_t;
   ack_level_t ack_level; // TODO: not used for now
-  bool verify_ssl;
+  const bool verify_ssl;
+  const bool cloudevents;
 
   static const ack_level_t ACK_LEVEL_ANY = 0;
   static const ack_level_t ACK_LEVEL_NON_ERROR = 1;
@@ -98,11 +111,11 @@ private:
   };
 
 public:
-  RGWPubSubHTTPEndpoint(const std::string& _endpoint,
-      const RGWHTTPArgs& args) : endpoint(_endpoint) {
+  RGWPubSubHTTPEndpoint(const std::string& _endpoint, const RGWHTTPArgs& args) :
+    endpoint(_endpoint), verify_ssl(get_bool(args, "verify-ssl", true)), cloudevents(get_bool(args, "cloudevents", false))
+  {
     bool exists;
-
-    str_ack_level = args.get("http-ack-level", &exists);
+    const auto& str_ack_level = args.get("http-ack-level", &exists);
     if (!exists || str_ack_level == "any") {
       // "any" is default
       ack_level = ACK_LEVEL_ANY;
@@ -114,17 +127,6 @@ public:
       throw configuration_error("HTTP/S: invalid http-ack-level: " + str_ack_level);
     }
   }
-
-    auto str_verify_ssl = args.get("verify-ssl", &exists);
-    boost::algorithm::to_lower(str_verify_ssl);
-    // verify server certificate by default
-    if (!exists || str_verify_ssl == "true") {
-      verify_ssl = true;
-    } else if (str_verify_ssl == "false") {
-      verify_ssl = false;
-    } else {
-      throw configuration_error("HTTP/S: verify-ssl must be true/false, not: " + str_verify_ssl);
-    }
-  }
 
   RGWCoroutine* send_to_completion_async(const rgw_pubsub_event& event, RGWDataSyncEnv* env) override {
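
For reference, the hunk below adds the "Binary Content Mode" headers; an equivalent request built by hand would look roughly like the following sketch (Python with the third-party requests package; the endpoint URL and all header and payload values are illustrative, not taken from the patch):

    import json

    import requests  # third-party HTTP client, assumed installed

    # Illustrative S3 event payload; RGW builds the real one with
    # json_format_pubsub_event(), which produces the full Records array.
    payload = json.dumps({"Records": [{"eventName": "ObjectCreated:Put"}]})

    # Binary content mode: event attributes travel as ce-* headers while
    # the event data is carried unchanged in the HTTP body.
    headers = {
        "Content-Type": "application/json",
        "ce-specversion": "1.0",
        "ce-type": "com.amazonaws.ObjectCreated:Put",  # "com.amazonaws." + eventName
        "ce-time": "2021-12-13T18:56:20.000Z",         # ISO 8601, also valid RFC 3339
        "ce-id": "REQUEST_ID.ID_2",                    # x_amz_request_id + "." + x_amz_id_2
        "ce-source": "ceph:s3.default.mybucket",       # eventSource + "." + awsRegion + "." + bucket_name
        "ce-subject": "mykey",                         # object_key
    }
    requests.post("http://localhost:10900", data=payload, headers=headers)
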
@@ -139,6 +141,17 @@ public:
       bufferlist read_bl;
       RGWPostHTTPData request(cct, "POST", endpoint, &read_bl, verify_ssl);
       const auto post_data = json_format_pubsub_event(event);
+      if (cloudevents) {
+        // following: https://github.com/cloudevents/spec/blob/v1.0.1/http-protocol-binding.md
+        // using "Binary Content Mode"
+        request.append_header("ce-specversion", "1.0");
+        request.append_header("ce-type", "com.amazonaws." + event.eventName);
+        request.append_header("ce-time", to_iso_8601(event.eventTime));
+        // default output of iso8601 is also RFC3339 compatible
+        request.append_header("ce-id", event.x_amz_request_id + "." + event.x_amz_id_2);
+        request.append_header("ce-source", event.eventSource + "." + event.awsRegion + "." + event.bucket_name);
+        request.append_header("ce-subject", event.object_key);
+      }
       request.set_post_data(post_data);
       request.set_send_length(post_data.length());
       request.append_header("Content-Type", "application/json");
@@ -152,10 +165,8 @@ public:
   std::string to_str() const override {
     std::string str("HTTP/S Endpoint");
     str += "\nURI: " + endpoint;
-    str += "\nAck Level: " + str_ack_level;
     str += (verify_ssl ? "\nverify SSL" : "\ndon't verify SSL");
     return str;
-
   }
 };
 
@@ -440,44 +451,10 @@ private:
   kafka::connection_ptr_t conn;
   const ack_level_t ack_level;
 
-  bool get_verify_ssl(const RGWHTTPArgs& args) {
-    bool exists;
-    auto str_verify_ssl = args.get("verify-ssl", &exists);
-    if (!exists) {
-      // verify server certificate by default
-      return true;
-    }
-    boost::algorithm::to_lower(str_verify_ssl);
-    if (str_verify_ssl == "true") {
-      return true;
-    }
-    if (str_verify_ssl == "false") {
-      return false;
-    }
-    throw configuration_error("'verify-ssl' must be true/false, not: " + str_verify_ssl);
-  }
-
-  bool get_use_ssl(const RGWHTTPArgs& args) {
-    bool exists;
-    auto str_use_ssl = args.get("use-ssl", &exists);
-    if (!exists) {
-      // by default ssl not used
-      return false;
-    }
-    boost::algorithm::to_lower(str_use_ssl);
-    if (str_use_ssl == "true") {
-      return true;
-    }
-    if (str_use_ssl == "false") {
-      return false;
-    }
-    throw configuration_error("'use-ssl' must be true/false, not: " + str_use_ssl);
-  }
-
   ack_level_t get_ack_level(const RGWHTTPArgs& args) {
     bool exists;
-    // get ack level
-    const auto str_ack_level = args.get("kafka-ack-level", &exists);
+    const auto& str_ack_level = args.get("kafka-ack-level", &exists);
     if (!exists || str_ack_level == "broker") {
       // "broker" is default
       return ack_level_t::Broker;
@@ -584,7 +561,7 @@ public:
       CephContext* _cct) :
     cct(_cct),
     topic(_topic),
-    conn(kafka::connect(_endpoint, get_use_ssl(args), get_verify_ssl(args), args.get_optional("ca-location"))) ,
+    conn(kafka::connect(_endpoint, get_bool(args, "use-ssl", false), get_bool(args, "verify-ssl", true), args.get_optional("ca-location"))) ,
     ack_level(get_ack_level(args)) {
     if (!conn) {
       throw configuration_error("Kafka: failed to create connection to: " + _endpoint);

diff --git a/src/test/rgw/bucket_notification/requirements.txt b/src/test/rgw/bucket_notification/requirements.txt
index 0027741063e..a3cff2bedab 100644
--- a/src/test/rgw/bucket_notification/requirements.txt
+++ b/src/test/rgw/bucket_notification/requirements.txt
@@ -3,3 +3,6 @@ boto >=2.6.0
 boto3 >=1.0.0
 configparser >=5.0.0
 kafka-python >=2.0.0
+pika
+cloudevents
+xmltodict
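
Of the new requirements, cloudevents is the Python SDK that the updated tests below use to decode binary-mode messages; pika and xmltodict are presumably pulled in for other tests in this suite. A minimal, self-contained decoding sketch, with made-up header and body values:

    from cloudevents.http import from_http

    # Headers as they arrive on the wire in binary content mode
    # (illustrative values, mirroring the ce-* headers set by the endpoint)
    headers = {
        "Content-Type": "application/json",
        "ce-specversion": "1.0",
        "ce-type": "com.amazonaws.ObjectCreated:Put",
        "ce-time": "2021-12-13T18:56:20.000Z",
        "ce-id": "REQUEST_ID.ID_2",
        "ce-source": "ceph:s3.default.mybucket",
        "ce-subject": "mykey",
    }
    body = b'{"Records": []}'

    event = from_http(headers, body)  # parses the ce-* headers into event attributes
    print(event["id"], event["source"], event["subject"])
    print(event.data)  # the payload, JSON-decoded by the default unmarshaller
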
diff --git a/src/test/rgw/bucket_notification/test_bn.py b/src/test/rgw/bucket_notification/test_bn.py
index 12113295dc8..0f74ae991ad 100644
--- a/src/test/rgw/bucket_notification/test_bn.py
+++ b/src/test/rgw/bucket_notification/test_bn.py
@@ -15,6 +15,8 @@ import hashlib
 from nose.plugins.attrib import attr
 import boto3
 import datetime
+from cloudevents.http import from_http
+from dateutil import parser
 
 from boto.s3.connection import S3Connection
 
@@ -66,43 +68,48 @@ class HTTPPostHandler(http_server.BaseHTTPRequestHandler):
     """HTTP POST handler class storing the received events in its http server"""
     def do_POST(self):
         """implementation of POST handler"""
-        try:
-            content_length = int(self.headers['Content-Length'])
-            body = self.rfile.read(content_length)
-            log.info('HTTP Server (%d) received event: %s', self.server.worker_id, str(body))
-            self.server.append(json.loads(body))
-        except:
-            log.error('HTTP Server received empty event')
-            self.send_response(400)
+        content_length = int(self.headers['Content-Length'])
+        body = self.rfile.read(content_length)
+        if self.server.cloudevents:
+            event = from_http(self.headers, body)
+            record = json.loads(body)['Records'][0]
+            assert_equal(event['specversion'], '1.0')
+            assert_equal(event['id'], record['responseElements']['x-amz-request-id'] + '.' + record['responseElements']['x-amz-id-2'])
+            assert_equal(event['source'], 'ceph:s3.' + record['awsRegion'] + '.' + record['s3']['bucket']['name'])
+            assert_equal(event['type'], 'com.amazonaws.' + record['eventName'])
+            assert_equal(event['datacontenttype'], 'application/json')
+            assert_equal(event['subject'], record['s3']['object']['key'])
+            assert_equal(parser.parse(event['time']), parser.parse(record['eventTime']))
+        log.info('HTTP Server (%d) received event: %s', self.server.worker_id, str(body))
+        self.server.append(json.loads(body))
+        if self.headers.get('Expect') == '100-continue':
+            self.send_response(100)
         else:
-            if self.headers.get('Expect') == '100-continue':
-                self.send_response(100)
-            else:
-                self.send_response(200)
-        finally:
-            if self.server.delay > 0:
-                time.sleep(self.server.delay)
-            self.end_headers()
+            self.send_response(200)
+        if self.server.delay > 0:
+            time.sleep(self.server.delay)
+        self.end_headers()
 
 
 class HTTPServerWithEvents(http_server.HTTPServer):
     """HTTP server used by the handler to store events"""
-    def __init__(self, addr, handler, worker_id, delay=0):
+    def __init__(self, addr, handler, worker_id, delay=0, cloudevents=False):
         http_server.HTTPServer.__init__(self, addr, handler, False)
         self.worker_id = worker_id
         self.events = []
         self.delay = delay
+        self.cloudevents = cloudevents
 
     def append(self, event):
         self.events.append(event)
 
 class HTTPServerThread(threading.Thread):
     """thread for running the HTTP server. reusing the same socket for all threads"""
-    def __init__(self, i, sock, addr, delay=0):
+    def __init__(self, i, sock, addr, delay=0, cloudevents=False):
         threading.Thread.__init__(self)
         self.i = i
         self.daemon = True
-        self.httpd = HTTPServerWithEvents(addr, HTTPPostHandler, i, delay)
+        self.httpd = HTTPServerWithEvents(addr, HTTPPostHandler, i, delay, cloudevents)
         self.httpd.socket = sock
         # prevent the HTTP server from re-binding every handler
         self.httpd.server_bind = self.server_close = lambda self: None
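
The assertions in do_POST() above encode the aws-s3 adapter mapping from an S3 event record to CloudEvents attributes. The same mapping, factored out as a plain function (a sketch for illustration, not part of the patch; record follows the S3 event format used in the test):

    def expected_cloudevents_attrs(record):
        """Derive the CloudEvents attributes expected for an S3 event record,
        mirroring the assertions in HTTPPostHandler.do_POST() above."""
        return {
            'specversion': '1.0',
            'id': record['responseElements']['x-amz-request-id'] + '.' +
                  record['responseElements']['x-amz-id-2'],
            'source': 'ceph:s3.' + record['awsRegion'] + '.' + record['s3']['bucket']['name'],
            'type': 'com.amazonaws.' + record['eventName'],
            'datacontenttype': 'application/json',
            'subject': record['s3']['object']['key'],
            # the test compares times after parsing, since the string
            # representations may differ while denoting the same instant
            'time': record['eventTime'],
        }
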
@@ -129,13 +136,13 @@ class HTTPServerThread(threading.Thread):
 
 class StreamingHTTPServer:
     """multi-threaded http server class also holding list of events received into the handler
     each thread has its own server, and all servers share the same socket"""
-    def __init__(self, host, port, num_workers=100, delay=0):
+    def __init__(self, host, port, num_workers=100, delay=0, cloudevents=False):
         addr = (host, port)
         self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
         self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
         self.sock.bind(addr)
         self.sock.listen(num_workers)
-        self.workers = [HTTPServerThread(i, self.sock, addr, delay) for i in range(num_workers)]
+        self.workers = [HTTPServerThread(i, self.sock, addr, delay, cloudevents) for i in range(num_workers)]
 
     def verify_s3_events(self, keys, exact_match=False, deletions=False, expected_sizes={}):
         """verify stored s3 records against a list of keys"""
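
The new test below configures the topic through the PSTopicS3 helper and an endpoint_args string. Against a running RGW, a roughly equivalent raw call through the SNS-compatible API would be (a sketch; endpoint URL and credentials are placeholders):

    import boto3

    # RGW exposes topic management through an SNS-compatible API
    # (placeholder endpoint and credentials)
    sns = boto3.client('sns',
                       endpoint_url='http://localhost:8000',
                       region_name='default',
                       aws_access_key_id='ACCESS_KEY',
                       aws_secret_access_key='SECRET_KEY')

    resp = sns.create_topic(
        Name='mytopic',
        Attributes={
            'push-endpoint': 'http://localhost:10900',
            'cloudevents': 'true',  # the attribute added by this patch
        })
    print(resp['TopicArn'])
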
@@ -1500,6 +1507,90 @@ def test_ps_s3_notification_push_http_on_master():
     http_server.close()
 
 
+@attr('http_test')
+def test_ps_s3_notification_push_cloudevents_on_master():
+    """ test pushing cloudevents notification on master """
+    hostname = get_ip_http()
+    conn = connection()
+    zonegroup = 'default'
+
+    # create random port for the http server
+    host = get_ip()
+    port = random.randint(10000, 20000)
+    # start an http server in a separate thread
+    number_of_objects = 10
+    http_server = StreamingHTTPServer(host, port, num_workers=number_of_objects, cloudevents=True)
+
+    # create bucket
+    bucket_name = gen_bucket_name()
+    bucket = conn.create_bucket(bucket_name)
+    topic_name = bucket_name + TOPIC_SUFFIX
+
+    # create s3 topic
+    endpoint_address = 'http://'+host+':'+str(port)
+    endpoint_args = 'push-endpoint='+endpoint_address+'&cloudevents=true'
+    topic_conf = PSTopicS3(conn, topic_name, zonegroup, endpoint_args=endpoint_args)
+    topic_arn = topic_conf.set_config()
+    # create s3 notification
+    notification_name = bucket_name + NOTIFICATION_SUFFIX
+    topic_conf_list = [{'Id': notification_name,
+                        'TopicArn': topic_arn,
+                        'Events': []
+                        }]
+    s3_notification_conf = PSNotificationS3(conn, bucket_name, topic_conf_list)
+    response, status = s3_notification_conf.set_config()
+    assert_equal(status/100, 2)
+
+    # create objects in the bucket
+    client_threads = []
+    objects_size = {}
+    start_time = time.time()
+    for i in range(number_of_objects):
+        content = str(os.urandom(randint(1, 1024)))
+        object_size = len(content)
+        key = bucket.new_key(str(i))
+        objects_size[key.name] = object_size
+        thr = threading.Thread(target = set_contents_from_string, args=(key, content,))
+        thr.start()
+        client_threads.append(thr)
+    [thr.join() for thr in client_threads]
+
+    time_diff = time.time() - start_time
+    print('average time for creation + http notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds')
+
+    print('wait for 5sec for the messages...')
+    time.sleep(5)
+
+    # check http receiver
+    keys = list(bucket.list())
+    http_server.verify_s3_events(keys, exact_match=True, deletions=False, expected_sizes=objects_size)
+
+    # delete objects from the bucket
+    client_threads = []
+    start_time = time.time()
+    for key in bucket.list():
+        thr = threading.Thread(target = key.delete, args=())
+        thr.start()
+        client_threads.append(thr)
+    [thr.join() for thr in client_threads]
+
+    time_diff = time.time() - start_time
+    print('average time for deletion + http notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds')
+
+    print('wait for 5sec for the messages...')
+    time.sleep(5)
+
+    # check http receiver
+    http_server.verify_s3_events(keys, exact_match=True, deletions=True, expected_sizes=objects_size)
+
+    # cleanup
+    topic_conf.del_config()
+    s3_notification_conf.del_config(notification=notification_name)
+    # delete the bucket
+    conn.delete_bucket(bucket_name)
+    http_server.close()
+
+
 @attr('http_test')
 def test_ps_s3_opaque_data_on_master():
     """ test that opaque id set in topic, is sent in notification on master """
-- 
2.39.5