rgw/notifications: add cloudevents support to HTTP endpoint
author Yuval Lifshitz <ylifshit@redhat.com>
Mon, 13 Dec 2021 18:56:20 +0000 (20:56 +0200)
committer Yuval Lifshitz <ylifshit@redhat.com>
Fri, 7 Jan 2022 07:56:06 +0000 (09:56 +0200)
following the cloudevents HTTP spec:
https://github.com/cloudevents/spec/blob/v1.0.1/http-protocol-binding.md
and more specifically this aws-s3 spec:
https://github.com/cloudevents/spec/blob/main/cloudevents/adapters/aws-s3.md
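For illustration, in "Binary Content Mode" the notification body stays the
usual JSON records document, while the CloudEvents attributes travel as
ce-* HTTP headers. A minimal sketch of the headers added by the endpoint
(all angle-bracket values below are hypothetical placeholders taken from
the event record):

    # illustrative only: ce-* headers sent in "Binary Content Mode";
    # angle-bracket values are placeholders, not literal strings
    headers = {
        "Content-Type": "application/json",
        "ce-specversion": "1.0",
        "ce-type": "com.amazonaws.<eventName>",
        "ce-time": "<eventTime in ISO8601, also RFC3339 compatible>",
        "ce-id": "<x-amz-request-id>.<x-amz-id-2>",
        "ce-source": "<eventSource>.<awsRegion>.<bucket name>",
        "ce-subject": "<object key>",
    }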

Signed-off-by: Yuval Lifshitz <ylifshit@redhat.com>
doc/radosgw/notifications.rst
src/rgw/rgw_pubsub_push.cc
src/test/rgw/bucket_notification/requirements.txt
src/test/rgw/bucket_notification/test_bn.py

diff --git a/doc/radosgw/notifications.rst b/doc/radosgw/notifications.rst
index 897ce81a5ba2851879b02b72f364c6e990c701f8..f9fc8173910d9536baa88a6251dba65f347b1a25 100644
--- a/doc/radosgw/notifications.rst
+++ b/doc/radosgw/notifications.rst
@@ -122,6 +122,7 @@ To update a topic, use the same command used for topic creation, with the topic
    [&Attributes.entry.7.key=OpaqueData&Attributes.entry.7.value=<opaque data>]
    [&Attributes.entry.8.key=push-endpoint&Attributes.entry.8.value=<endpoint>]
    [&Attributes.entry.9.key=persistent&Attributes.entry.9.value=true|false]
+   [&Attributes.entry.10.key=cloudevents&Attributes.entry.10.value=true|false]
 
 Request parameters:
 
@@ -134,6 +135,7 @@ Request parameters:
 - URI: ``http[s]://<fqdn>[:<port>]``
 - port defaults to: 80/443 for HTTP/S respectively
  - verify-ssl: indicate whether the server certificate is validated by the client or not ("true" by default)
+ - cloudevents: indicate whether the HTTP headers should contain attributes according to the `S3 CloudEvents Spec`_ ("false" by default)
 
 - AMQP0.9.1 endpoint
 
@@ -450,3 +452,4 @@ pushed or pulled using the pubsub sync module. For example:
 .. _S3 Notification Compatibility: ../s3-notification-compatibility
 .. _AWS Create Topic: https://docs.aws.amazon.com/sns/latest/api/API_CreateTopic.html
 .. _Bucket Operations: ../s3/bucketops
+.. _S3 CloudEvents Spec: https://github.com/cloudevents/spec/blob/main/cloudevents/adapters/aws-s3.md
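As a usage sketch (not part of the diff): enabling the new attribute when
creating a topic through the SNS-compatible API, assuming an RGW endpoint
at http://localhost:8000; the credentials and receiver URL are placeholders:

    # sketch: create a topic whose HTTP push endpoint emits CloudEvents headers
    import boto3

    sns = boto3.client('sns',
                       endpoint_url='http://localhost:8000',  # assumed RGW address
                       region_name='default',
                       aws_access_key_id='<access key>',
                       aws_secret_access_key='<secret key>')
    sns.create_topic(Name='mytopic', Attributes={
        'push-endpoint': 'http://receiver.example.com:8080',
        'cloudevents': 'true',  # the attribute added by this commit
    })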
diff --git a/src/rgw/rgw_pubsub_push.cc b/src/rgw/rgw_pubsub_push.cc
index 7aff6eb443b230f0123031f9a992884afa389872..79fa736e9250421ff87bf85a4c5877759dbb6874 100644
--- a/src/rgw/rgw_pubsub_push.cc
+++ b/src/rgw/rgw_pubsub_push.cc
@@ -7,6 +7,7 @@
 #include <algorithm>
 #include "include/buffer_fwd.h"
 #include "common/Formatter.h"
+#include "common/iso_8601.h"
 #include "common/async/completion.h"
 #include "rgw_common.h"
 #include "rgw_data_sync.h"
@@ -39,14 +40,26 @@ std::string json_format_pubsub_event(const EventType& event) {
   f.flush(ss);
   return ss.str();
 }
+
+bool get_bool(const RGWHTTPArgs& args, const std::string& name, bool default_value) {
+  bool value;
+  bool exists;
+  if (args.get_bool(name.c_str(), &value, &exists) == -EINVAL) {
+    throw RGWPubSubEndpoint::configuration_error("invalid boolean value for " + name);
+  }
+  if (!exists) {
+    return default_value;
+  }
+  return value;
+}
 
 class RGWPubSubHTTPEndpoint : public RGWPubSubEndpoint {
 private:
   const std::string endpoint;
-  std::string str_ack_level;
   typedef unsigned ack_level_t;
   ack_level_t ack_level; // TODO: not used for now
-  bool verify_ssl;
+  const bool verify_ssl;
+  const bool cloudevents;
   static const ack_level_t ACK_LEVEL_ANY = 0;
   static const ack_level_t ACK_LEVEL_NON_ERROR = 1;
 
@@ -98,11 +111,11 @@ private:
   };
 
 public:
-  RGWPubSubHTTPEndpoint(const std::string& _endpoint, 
-    const RGWHTTPArgs& args) : endpoint(_endpoint) {
+  RGWPubSubHTTPEndpoint(const std::string& _endpoint, const RGWHTTPArgs& args) : 
+    endpoint(_endpoint), verify_ssl(get_bool(args, "verify-ssl", true)), cloudevents(get_bool(args, "cloudevents", false)) 
+  {
     bool exists;
-
-    str_ack_level = args.get("http-ack-level", &exists);
+    const auto& str_ack_level = args.get("http-ack-level", &exists);
     if (!exists || str_ack_level == "any") {
       // "any" is default
       ack_level = ACK_LEVEL_ANY;
@@ -114,17 +127,6 @@ public:
         throw configuration_error("HTTP/S: invalid http-ack-level: " + str_ack_level);
       }
     }
-
-    auto str_verify_ssl = args.get("verify-ssl", &exists);
-    boost::algorithm::to_lower(str_verify_ssl);
-    // verify server certificate by default
-    if (!exists || str_verify_ssl == "true") {
-      verify_ssl = true;
-    } else if (str_verify_ssl == "false") {
-      verify_ssl = false;
-    } else {
-        throw configuration_error("HTTP/S: verify-ssl must be true/false, not: " + str_verify_ssl);
-    }
   }
 
   RGWCoroutine* send_to_completion_async(const rgw_pubsub_event& event, RGWDataSyncEnv* env) override {
@@ -139,6 +141,17 @@ public:
     bufferlist read_bl;
     RGWPostHTTPData request(cct, "POST", endpoint, &read_bl, verify_ssl);
     const auto post_data = json_format_pubsub_event(event);
+    if (cloudevents) {
+      // following: https://github.com/cloudevents/spec/blob/v1.0.1/http-protocol-binding.md
+      // using "Binary Content Mode"
+      request.append_header("ce-specversion", "1.0");
+      request.append_header("ce-type", "com.amazonaws." + event.eventName);
+      request.append_header("ce-time", to_iso_8601(event.eventTime)); 
+      // default output of iso8601 is also RFC3339 compatible
+      request.append_header("ce-id", event.x_amz_request_id + "." + event.x_amz_id_2);
+      request.append_header("ce-source", event.eventSource + "." + event.awsRegion + "." + event.bucket_name);
+      request.append_header("ce-subject", event.object_key);
+    }
     request.set_post_data(post_data);
     request.set_send_length(post_data.length());
     request.append_header("Content-Type", "application/json");
@@ -152,10 +165,8 @@ public:
   std::string to_str() const override {
     std::string str("HTTP/S Endpoint");
     str += "\nURI: " + endpoint;
-    str += "\nAck Level: " + str_ack_level;
     str += (verify_ssl ? "\nverify SSL" : "\ndon't verify SSL");
     return str;
-
   }
 };
 
@@ -440,44 +451,10 @@ private:
   kafka::connection_ptr_t conn;
   const ack_level_t ack_level;
 
-  bool get_verify_ssl(const RGWHTTPArgs& args) {
-    bool exists;
-    auto str_verify_ssl = args.get("verify-ssl", &exists);
-    if (!exists) {
-      // verify server certificate by default
-      return true;
-    }
-    boost::algorithm::to_lower(str_verify_ssl);
-    if (str_verify_ssl == "true") {
-      return true;
-    }
-    if (str_verify_ssl == "false") {
-      return false;
-    }
-    throw configuration_error("'verify-ssl' must be true/false, not: " + str_verify_ssl);
-  }
-
-  bool get_use_ssl(const RGWHTTPArgs& args) {
-    bool exists;
-    auto str_use_ssl = args.get("use-ssl", &exists);
-    if (!exists) {
-      // by default ssl not used
-      return false;
-    }
-    boost::algorithm::to_lower(str_use_ssl);
-    if (str_use_ssl == "true") {
-      return true;
-    }
-    if (str_use_ssl == "false") {
-      return false;
-    }
-    throw configuration_error("'use-ssl' must be true/false, not: " + str_use_ssl);
-  }
 
   ack_level_t get_ack_level(const RGWHTTPArgs& args) {
     bool exists;
-    // get ack level
-    const auto str_ack_level = args.get("kafka-ack-level", &exists);
+    const auto& str_ack_level = args.get("kafka-ack-level", &exists);
     if (!exists || str_ack_level == "broker") {
       // "broker" is default
       return ack_level_t::Broker;
@@ -584,7 +561,7 @@ public:
       CephContext* _cct) : 
         cct(_cct),
         topic(_topic),
-        conn(kafka::connect(_endpoint, get_use_ssl(args), get_verify_ssl(args), args.get_optional("ca-location"))) ,
+        conn(kafka::connect(_endpoint, get_bool(args, "use-ssl", false), get_bool(args, "verify-ssl", true), args.get_optional("ca-location"))) ,
         ack_level(get_ack_level(args)) {
     if (!conn) { 
       throw configuration_error("Kafka: failed to create connection to: " + _endpoint);
diff --git a/src/test/rgw/bucket_notification/requirements.txt b/src/test/rgw/bucket_notification/requirements.txt
index 0027741063e0be547dd881c8878367ccd81cc5df..a3cff2bedabb333094172eba9ce56eed380cc5ce 100644
--- a/src/test/rgw/bucket_notification/requirements.txt
+++ b/src/test/rgw/bucket_notification/requirements.txt
@@ -3,3 +3,6 @@ boto >=2.6.0
 boto3 >=1.0.0
 configparser >=5.0.0
 kafka-python >=2.0.0
+pika
+cloudevents
+xmltodict
diff --git a/src/test/rgw/bucket_notification/test_bn.py b/src/test/rgw/bucket_notification/test_bn.py
index 12113295dc8a7cdb1f4db3054634c760479978a7..0f74ae991ad99e0fda2f8d4e0f6d25adf31c3cdd 100644
--- a/src/test/rgw/bucket_notification/test_bn.py
+++ b/src/test/rgw/bucket_notification/test_bn.py
@@ -15,6 +15,8 @@ import hashlib
 from nose.plugins.attrib import attr
 import boto3
 import datetime
+from cloudevents.http import from_http
+from dateutil import parser
 
 from boto.s3.connection import S3Connection
 
@@ -66,43 +68,48 @@ class HTTPPostHandler(http_server.BaseHTTPRequestHandler):
     """HTTP POST hanler class storing the received events in its http server"""
     def do_POST(self):
         """implementation of POST handler"""
-        try:
-            content_length = int(self.headers['Content-Length'])
-            body = self.rfile.read(content_length)
-            log.info('HTTP Server (%d) received event: %s', self.server.worker_id, str(body))
-            self.server.append(json.loads(body))
-        except:
-            log.error('HTTP Server received empty event')
-            self.send_response(400)
+        content_length = int(self.headers['Content-Length'])
+        body = self.rfile.read(content_length)
+        if self.server.cloudevents:
+            event = from_http(self.headers, body) 
+            record = json.loads(body)['Records'][0]
+            assert_equal(event['specversion'], '1.0')
+            assert_equal(event['id'], record['responseElements']['x-amz-request-id'] + '.' + record['responseElements']['x-amz-id-2'])
+            assert_equal(event['source'], 'ceph:s3.' + record['awsRegion'] + '.' + record['s3']['bucket']['name'])
+            assert_equal(event['type'], 'com.amazonaws.' + record['eventName'])
+            assert_equal(event['datacontenttype'], 'application/json') 
+            assert_equal(event['subject'], record['s3']['object']['key'])
+            assert_equal(parser.parse(event['time']), parser.parse(record['eventTime']))
+        log.info('HTTP Server (%d) received event: %s', self.server.worker_id, str(body))
+        self.server.append(json.loads(body))
+        if self.headers.get('Expect') == '100-continue':
+            self.send_response(100)
         else:
-            if self.headers.get('Expect') == '100-continue':
-                self.send_response(100)
-            else:
-                self.send_response(200)
-        finally:
-            if self.server.delay > 0:
-                time.sleep(self.server.delay)
-            self.end_headers()
+            self.send_response(200)
+        if self.server.delay > 0:
+            time.sleep(self.server.delay)
+        self.end_headers()
 
 
 class HTTPServerWithEvents(http_server.HTTPServer):
     """HTTP server used by the handler to store events"""
-    def __init__(self, addr, handler, worker_id, delay=0):
+    def __init__(self, addr, handler, worker_id, delay=0, cloudevents=False):
         http_server.HTTPServer.__init__(self, addr, handler, False)
         self.worker_id = worker_id
         self.events = []
         self.delay = delay
+        self.cloudevents = cloudevents
 
     def append(self, event):
         self.events.append(event)
 
 class HTTPServerThread(threading.Thread):
     """thread for running the HTTP server. reusing the same socket for all threads"""
-    def __init__(self, i, sock, addr, delay=0):
+    def __init__(self, i, sock, addr, delay=0, cloudevents=False):
         threading.Thread.__init__(self)
         self.i = i
         self.daemon = True
-        self.httpd = HTTPServerWithEvents(addr, HTTPPostHandler, i, delay)
+        self.httpd = HTTPServerWithEvents(addr, HTTPPostHandler, i, delay, cloudevents)
         self.httpd.socket = sock
         # prevent the HTTP server from re-binding every handler
         self.httpd.server_bind = self.server_close = lambda self: None
@@ -129,13 +136,13 @@ class HTTPServerThread(threading.Thread):
 class StreamingHTTPServer:
     """multi-threaded http server class also holding list of events received into the handler
     each thread has its own server, and all servers share the same socket"""
-    def __init__(self, host, port, num_workers=100, delay=0):
+    def __init__(self, host, port, num_workers=100, delay=0, cloudevents=False):
         addr = (host, port)
         self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
         self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
         self.sock.bind(addr)
         self.sock.listen(num_workers)
-        self.workers = [HTTPServerThread(i, self.sock, addr, delay) for i in range(num_workers)]
+        self.workers = [HTTPServerThread(i, self.sock, addr, delay, cloudevents) for i in range(num_workers)]
 
     def verify_s3_events(self, keys, exact_match=False, deletions=False, expected_sizes={}):
         """verify stored s3 records agains a list of keys"""
@@ -1500,6 +1507,90 @@ def test_ps_s3_notification_push_http_on_master():
     http_server.close()
 
 
+@attr('http_test')
+def test_ps_s3_notification_push_cloudevents_on_master():
+    """ test pushing cloudevents notification on master """
+    hostname = get_ip_http()
+    conn = connection()
+    zonegroup = 'default'
+
+    # create random port for the http server
+    host = get_ip()
+    port = random.randint(10000, 20000)
+    # start an http server in a separate thread
+    number_of_objects = 10
+    http_server = StreamingHTTPServer(host, port, num_workers=number_of_objects, cloudevents=True)
+
+    # create bucket
+    bucket_name = gen_bucket_name()
+    bucket = conn.create_bucket(bucket_name)
+    topic_name = bucket_name + TOPIC_SUFFIX
+
+    # create s3 topic
+    endpoint_address = 'http://'+host+':'+str(port)
+    endpoint_args = 'push-endpoint='+endpoint_address+'&cloudevents=true'
+    topic_conf = PSTopicS3(conn, topic_name, zonegroup, endpoint_args=endpoint_args)
+    topic_arn = topic_conf.set_config()
+    # create s3 notification
+    notification_name = bucket_name + NOTIFICATION_SUFFIX
+    topic_conf_list = [{'Id': notification_name,
+                        'TopicArn': topic_arn,
+                        'Events': []
+                       }]
+    s3_notification_conf = PSNotificationS3(conn, bucket_name, topic_conf_list)
+    response, status = s3_notification_conf.set_config()
+    assert_equal(status/100, 2)
+
+    # create objects in the bucket
+    client_threads = []
+    objects_size = {}
+    start_time = time.time()
+    for i in range(number_of_objects):
+        content = str(os.urandom(randint(1, 1024)))
+        object_size = len(content)
+        key = bucket.new_key(str(i))
+        objects_size[key.name] = object_size
+        thr = threading.Thread(target = set_contents_from_string, args=(key, content,))
+        thr.start()
+        client_threads.append(thr)
+    [thr.join() for thr in client_threads]
+
+    time_diff = time.time() - start_time
+    print('average time for creation + http notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds')
+
+    print('wait for 5sec for the messages...')
+    time.sleep(5)
+
+    # check http receiver
+    keys = list(bucket.list())
+    http_server.verify_s3_events(keys, exact_match=True, deletions=False, expected_sizes=objects_size)
+
+    # delete objects from the bucket
+    client_threads = []
+    start_time = time.time()
+    for key in bucket.list():
+        thr = threading.Thread(target = key.delete, args=())
+        thr.start()
+        client_threads.append(thr)
+    [thr.join() for thr in client_threads]
+
+    time_diff = time.time() - start_time
+    print('average time for deletion + http notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds')
+
+    print('wait for 5sec for the messages...')
+    time.sleep(5)
+
+    # check http receiver
+    http_server.verify_s3_events(keys, exact_match=True, deletions=True, expected_sizes=objects_size)
+
+    # cleanup
+    topic_conf.del_config()
+    s3_notification_conf.del_config(notification=notification_name)
+    # delete the bucket
+    conn.delete_bucket(bucket_name)
+    http_server.close()
+
+
 @attr('http_test')
 def test_ps_s3_opaque_data_on_master():
     """ test that opaque id set in topic, is sent in notification on master """