git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw/pubsub: fix records/event json format to match documentation
author: Yuval Lifshitz <yuvalif@yahoo.com>
Fri, 29 Nov 2019 05:31:07 +0000 (07:31 +0200)
committer: Nathan Cutler <ncutler@suse.com>
Fri, 24 Jan 2020 15:10:49 +0000 (16:10 +0100)
Signed-off-by: Yuval Lifshitz <yuvalif@yahoo.com>
(cherry picked from commit 11e66bd16acdda0f2a097e94c66cc25f4550b99c)

src/rgw/rgw_pubsub.cc
src/rgw/rgw_pubsub.h
src/rgw/rgw_pubsub_push.cc
src/rgw/rgw_sync_module_pubsub.cc
src/test/rgw/rgw_multi/tests_ps.py

index ed9da7eec1caa16180ab5a7b68a87f83f37bbc3f..bb53dc9aaad4c1bf06caa6e0afc9ea13580924b4 100644 (file)
@@ -732,7 +732,7 @@ void RGWUserPubSub::SubWithEvents<EventType>::list_events_result::dump(Formatter
 
   Formatter::ArraySection s(*f, EventType::json_type_plural);
   for (auto& event : events) {
-    encode_json(EventType::json_type_single, event, f);
+    encode_json("", event, f);
   }
 }
 
index e73fc8b46b0df5a96e1c8eaf83f8b8ef5d6ba167..8c8f0a7fc6da850e3efdad47b2e6dc5d74c2e414 100644 (file)
@@ -194,9 +194,8 @@ struct rgw_pubsub_s3_notifications {
 }*/
 
 struct rgw_pubsub_s3_record {
-  constexpr static const char* const json_type_single = "Record";
   constexpr static const char* const json_type_plural = "Records";
-  // 2.1
+  // 2.2
   std::string eventVersion;
   // aws:s3
   std::string eventSource;
@@ -304,7 +303,6 @@ struct rgw_pubsub_s3_record {
 WRITE_CLASS_ENCODER(rgw_pubsub_s3_record)
 
 struct rgw_pubsub_event {
-  constexpr static const char* const json_type_single = "event";
   constexpr static const char* const json_type_plural = "events";
   std::string id;
   std::string event_name;
index 0c13886e01b31b8bf8f39dac66d3048f3da1c949..6b3ecef8df0f29b79212f1371cf262f9e57194f4 100644 (file)
@@ -26,7 +26,13 @@ template<typename EventType>
 std::string json_format_pubsub_event(const EventType& event) {
   std::stringstream ss;
   JSONFormatter f(false);
-  encode_json(EventType::json_type_single, event, &f);
+  {
+    Formatter::ObjectSection s(f, EventType::json_type_plural);
+    {
+      Formatter::ArraySection s(f, EventType::json_type_plural);
+      encode_json("", event, &f);
+    }
+  }
   f.flush(ss);
   return ss.str();
 }
index cf1d9543ac139f487add36b60f1f33d22fdf2ed5..3fae724a78dba69bd6919e7d644b869e515f0a5d 100644 (file)
@@ -477,7 +477,7 @@ public:
   PSEvent(const EventRef<EventType>& _event) : event(_event) {}
 
   void format(bufferlist *bl) const {
-    bl->append(json_str(EventType::json_type_single, *event));
+    bl->append(json_str("", *event));
   }
 
   void encode_event(bufferlist& bl) const {
index bee36670eaf58240ec1395067ef164fedd1b529f..9acf9e4ad92cefd8276f95ec49181fb44f0972c4 100644 (file)
@@ -247,15 +247,30 @@ def verify_events_by_elements(events, keys, exact_match=False, deletions=False):
     err = ''
     for key in keys:
         key_found = False
-        for event in events:
-            if event['info']['bucket']['name'] == key.bucket.name and \
-                event['info']['key']['name'] == key.name:
-                if deletions and event['event'] == 'OBJECT_DELETE':
-                    key_found = True
-                    break
-                elif not deletions and event['event'] == 'OBJECT_CREATE':
-                    key_found = True
+        if type(events) is list:
+            for event_list in events: 
+                if key_found:
                     break
+                for event in event_list['events']:
+                    if event['info']['bucket']['name'] == key.bucket.name and \
+                        event['info']['key']['name'] == key.name:
+                        if deletions and event['event'] == 'OBJECT_DELETE':
+                            key_found = True
+                            break
+                        elif not deletions and event['event'] == 'OBJECT_CREATE':
+                            key_found = True
+                            break
+        else:
+            for event in events['events']:
+                if event['info']['bucket']['name'] == key.bucket.name and \
+                    event['info']['key']['name'] == key.name:
+                    if deletions and event['event'] == 'OBJECT_DELETE':
+                        key_found = True
+                        break
+                    elif not deletions and event['event'] == 'OBJECT_CREATE':
+                        key_found = True
+                        break
+
         if not key_found:
             err = 'no ' + ('deletion' if deletions else 'creation') + ' event found for key: ' + str(key)
             log.error(events)
@@ -274,27 +289,44 @@ def verify_s3_records_by_elements(records, keys, exact_match=False, deletions=Fa
     err = ''
     for key in keys:
         key_found = False
-        for record in records:
-            if record['s3']['bucket']['name'] == key.bucket.name and \
-                record['s3']['object']['key'] == key.name:
-                if deletions and 'ObjectRemoved' in record['eventName']:
-                    key_found = True
-                    break
-                elif not deletions and 'ObjectCreated' in record['eventName']:
-                    key_found = True
+        if type(records) is list:
+            for record_list in records:
+                if key_found:
                     break
+                for record in record_list['Records']:
+                    if record['s3']['bucket']['name'] == key.bucket.name and \
+                        record['s3']['object']['key'] == key.name:
+                        if deletions and 'ObjectRemoved' in record['eventName']:
+                            key_found = True
+                            break
+                        elif not deletions and 'ObjectCreated' in record['eventName']:
+                            key_found = True
+                            break
+        else:
+            for record in records['Records']:
+                if record['s3']['bucket']['name'] == key.bucket.name and \
+                    record['s3']['object']['key'] == key.name:
+                    if deletions and 'ObjectRemoved' in record['eventName']:
+                        key_found = True
+                        break
+                    elif not deletions and 'ObjectCreated' in record['eventName']:
+                        key_found = True
+                        break
+
         if not key_found:
             err = 'no ' + ('deletion' if deletions else 'creation') + ' event found for key: ' + str(key)
-            for record in records:
-                log.error(str(record['s3']['bucket']['name']) + ',' + str(record['s3']['object']['key']))
+            for record_list in records:
+                for record in record_list['Records']:
+                    log.error(str(record['s3']['bucket']['name']) + ',' + str(record['s3']['object']['key']))
             assert False, err
 
     if not len(records) == len(keys):
         err = 'superfluous records are found'
         log.warning(err)
         if exact_match:
-            for record in records:
-                log.error(str(record['s3']['bucket']['name']) + ',' + str(record['s3']['object']['key']))
+            for record_list in records:
+                for record in record_list['Records']:
+                    log.error(str(record['s3']['bucket']['name']) + ',' + str(record['s3']['object']['key']))
             assert False, err
 
 
@@ -529,12 +561,12 @@ def test_ps_s3_notification_records():
 
     # get the events from the subscription
     result, _ = sub_conf.get_events()
-    parsed_result = json.loads(result)
-    for record in parsed_result['Records']:
+    records = json.loads(result)
+    for record in records['Records']:
         log.debug(record)
     keys = list(bucket.list())
     # TODO: use exact match
-    verify_s3_records_by_elements(parsed_result['Records'], keys, exact_match=False)
+    verify_s3_records_by_elements(records, keys, exact_match=False)
 
     # cleanup
     _, status = s3_notification_conf.del_config()
@@ -896,8 +928,8 @@ def ps_s3_notification_filter(on_master):
     found_in4 = []
 
     for event in receiver.get_and_reset_events():
-        notif_id = event['s3']['configurationId']
-        key_name = event['s3']['object']['key']
+        notif_id = event['Records'][0]['s3']['configurationId']
+        key_name = event['Records'][0]['s3']['object']['key']
         if notif_id == notification_name+'_1':
             found_in1.append(key_name)
         elif notif_id == notification_name+'_2':
@@ -1432,12 +1464,12 @@ def test_ps_subscription():
 
     # get the create events from the subscription
     result, _ = sub_conf.get_events()
-    parsed_result = json.loads(result)
-    for event in parsed_result['events']:
+    events = json.loads(result)
+    for event in events['events']:
         log.debug('Event: objname: "' + str(event['info']['key']['name']) + '" type: "' + str(event['event']) + '"')
     keys = list(bucket.list())
     # TODO: use exact match
-    verify_events_by_elements(parsed_result['events'], keys, exact_match=False)
+    verify_events_by_elements(events, keys, exact_match=False)
     # delete objects from the bucket
     for key in bucket.list():
         key.delete()
@@ -1447,11 +1479,11 @@ def test_ps_subscription():
 
     # get the delete events from the subscriptions
     result, _ = sub_conf.get_events()
-    for event in parsed_result['events']:
+    for event in events['events']:
         log.debug('Event: objname: "' + str(event['info']['key']['name']) + '" type: "' + str(event['event']) + '"')
     # TODO: check deletions
     # TODO: use exact match
-    # verify_events_by_elements(parsed_result['events'], keys, exact_match=False, deletions=True)
+    # verify_events_by_elements(events, keys, exact_match=False, deletions=True)
     # we should see the creations as well as the deletions
     # delete subscription
     _, status = sub_conf.del_config()
@@ -1530,28 +1562,28 @@ def test_ps_event_type_subscription():
 
     # get the events from the creation subscription
     result, _ = sub_create_conf.get_events()
-    parsed_result = json.loads(result)
-    for event in parsed_result['events']:
+    events = json.loads(result)
+    for event in events['events']:
         log.debug('Event (OBJECT_CREATE): objname: "' + str(event['info']['key']['name']) +
                   '" type: "' + str(event['event']) + '"')
     keys = list(bucket.list())
     # TODO: use exact match
-    verify_events_by_elements(parsed_result['events'], keys, exact_match=False)
+    verify_events_by_elements(events, keys, exact_match=False)
     # get the events from the deletions subscription
     result, _ = sub_delete_conf.get_events()
-    parsed_result = json.loads(result)
-    for event in parsed_result['events']:
+    events = json.loads(result)
+    for event in events['events']:
         log.debug('Event (OBJECT_DELETE): objname: "' + str(event['info']['key']['name']) +
                   '" type: "' + str(event['event']) + '"')
-    assert_equal(len(parsed_result['events']), 0)
+    assert_equal(len(events['events']), 0)
     # get the events from the all events subscription
     result, _ = sub_conf.get_events()
-    parsed_result = json.loads(result)
-    for event in parsed_result['events']:
+    events = json.loads(result)
+    for event in events['events']:
         log.debug('Event (OBJECT_CREATE,OBJECT_DELETE): objname: "' +
                   str(event['info']['key']['name']) + '" type: "' + str(event['event']) + '"')
     # TODO: use exact match
-    verify_events_by_elements(parsed_result['events'], keys, exact_match=False)
+    verify_events_by_elements(events, keys, exact_match=False)
     # delete objects from the bucket
     for key in bucket.list():
         key.delete()
@@ -1561,32 +1593,32 @@ def test_ps_event_type_subscription():
 
     # get the events from the creations subscription
     result, _ = sub_create_conf.get_events()
-    parsed_result = json.loads(result)
-    for event in parsed_result['events']:
+    events = json.loads(result)
+    for event in events['events']:
         log.debug('Event (OBJECT_CREATE): objname: "' + str(event['info']['key']['name']) +
                   '" type: "' + str(event['event']) + '"')
     # deletions should not change the creation events
     # TODO: use exact match
-    verify_events_by_elements(parsed_result['events'], keys, exact_match=False)
+    verify_events_by_elements(events, keys, exact_match=False)
     # get the events from the deletions subscription
     result, _ = sub_delete_conf.get_events()
-    parsed_result = json.loads(result)
-    for event in parsed_result['events']:
+    events = json.loads(result)
+    for event in events['events']:
         log.debug('Event (OBJECT_DELETE): objname: "' + str(event['info']['key']['name']) +
                   '" type: "' + str(event['event']) + '"')
     # only deletions should be listed here
     # TODO: use exact match
-    verify_events_by_elements(parsed_result['events'], keys, exact_match=False, deletions=True)
+    verify_events_by_elements(events, keys, exact_match=False, deletions=True)
     # get the events from the all events subscription
     result, _ = sub_create_conf.get_events()
-    parsed_result = json.loads(result)
-    for event in parsed_result['events']:
+    events = json.loads(result)
+    for event in events['events']:
         log.debug('Event (OBJECT_CREATE,OBJECT_DELETE): objname: "' + str(event['info']['key']['name']) +
                   '" type: "' + str(event['event']) + '"')
     # both deletions and creations should be here
     # TODO: use exact match
-    verify_events_by_elements(parsed_result['events'], keys, exact_match=False, deletions=False)
-    # verify_events_by_elements(parsed_result['events'], keys, exact_match=False, deletions=True)
+    verify_events_by_elements(events, keys, exact_match=False, deletions=False)
+    # verify_events_by_elements(events, keys, exact_match=False, deletions=True)
     # TODO: (1) test deletions (2) test overall number of events
 
     # test subscription deletion when topic is specified
@@ -1644,18 +1676,17 @@ def test_ps_event_fetching():
     while True:
         # get the events from the subscription
         result, _ = sub_conf.get_events(max_events, next_marker)
-        parsed_result = json.loads(result)
-        events = parsed_result['events']
-        total_events_count += len(events)
-        all_events.extend(events)
-        next_marker = parsed_result['next_marker']
-        for event in events:
+        events = json.loads(result)
+        total_events_count += len(events['events'])
+        all_events.extend(events['events'])
+        next_marker = events['next_marker']
+        for event in events['events']:
             log.debug('Event: objname: "' + str(event['info']['key']['name']) + '" type: "' + str(event['event']) + '"')
         if next_marker == '':
             break
     keys = list(bucket.list())
     # TODO: use exact match
-    verify_events_by_elements(all_events, keys, exact_match=False)
+    verify_events_by_elements({'events': all_events}, keys, exact_match=False)
 
     # cleanup
     sub_conf.del_config()
@@ -1699,17 +1730,16 @@ def test_ps_event_acking():
 
     # get the create events from the subscription
     result, _ = sub_conf.get_events()
-    parsed_result = json.loads(result)
-    events = parsed_result['events']
+    events = json.loads(result)
     original_number_of_events = len(events)
-    for event in events:
+    for event in events['events']:
         log.debug('Event (before ack)  id: "' + str(event['id']) + '"')
     keys = list(bucket.list())
     # TODO: use exact match
     verify_events_by_elements(events, keys, exact_match=False)
     # ack half of the  events
     events_to_ack = number_of_objects/2
-    for event in events:
+    for event in events['events']:
         if events_to_ack == 0:
             break
         _, status = sub_conf.ack_events(event['id'])
@@ -1718,10 +1748,10 @@ def test_ps_event_acking():
 
     # verify that acked events are gone
     result, _ = sub_conf.get_events()
-    parsed_result = json.loads(result)
-    for event in parsed_result['events']:
+    events = json.loads(result)
+    for event in events['events']:
         log.debug('Event (after ack) id: "' + str(event['id']) + '"')
-    assert len(parsed_result['events']) >= (original_number_of_events - number_of_objects/2)
+    assert len(events) >= (original_number_of_events - number_of_objects/2)
 
     # cleanup
     sub_conf.del_config()
@@ -1774,12 +1804,12 @@ def test_ps_creation_triggers():
 
     # get the create events from the subscription
     result, _ = sub_conf.get_events()
-    parsed_result = json.loads(result)
-    for event in parsed_result['events']:
+    events = json.loads(result)
+    for event in events['events']:
         log.debug('Event key: "' + str(event['info']['key']['name']) + '" type: "' + str(event['event']) + '"')
 
     # TODO: verify the specific 3 keys: 'put', 'copy' and 'multipart'
-    assert len(parsed_result['events']) >= 3
+    assert len(events['events']) >= 3
     # cleanup
     sub_conf.del_config()
     notification_conf.del_config()
@@ -1930,13 +1960,13 @@ def test_ps_s3_multipart_on_master():
 
     events = receiver2.get_and_reset_events()
     assert_equal(len(events), 1)
-    assert_equal(events[0]['eventName'], 's3:ObjectCreated:Post')
-    assert_equal(events[0]['s3']['configurationId'], notification_name+'_2')
+    assert_equal(events[0]['Records'][0]['eventName'], 's3:ObjectCreated:Post')
+    assert_equal(events[0]['Records'][0]['s3']['configurationId'], notification_name+'_2')
 
     events = receiver3.get_and_reset_events()
     assert_equal(len(events), 1)
-    assert_equal(events[0]['eventName'], 's3:ObjectCreated:CompleteMultipartUpload')
-    assert_equal(events[0]['s3']['configurationId'], notification_name+'_3')
+    assert_equal(events[0]['Records'][0]['eventName'], 's3:ObjectCreated:CompleteMultipartUpload')
+    assert_equal(events[0]['Records'][0]['s3']['configurationId'], notification_name+'_3')
 
     # cleanup
     stop_amqp_receiver(receiver1, task1)
@@ -2021,14 +2051,14 @@ def test_ps_versioned_deletion():
 
     # get the delete events from the subscription
     result, _ = sub_conf1.get_events()
-    parsed_result = json.loads(result)
-    for event in parsed_result['events']:
+    events = json.loads(result)
+    for event in events['events']:
         log.debug('Event key: "' + str(event['info']['key']['name']) + '" type: "' + str(event['event']) + '"')
         assert_equal(str(event['event']), event_type1)
 
     result, _ = sub_conf2.get_events()
-    parsed_result = json.loads(result)
-    for event in parsed_result['events']:
+    events = json.loads(result)
+    for event in events['events']:
         log.debug('Event key: "' + str(event['info']['key']['name']) + '" type: "' + str(event['event']) + '"')
         assert_equal(str(event['event']), event_type2)
 
@@ -2176,13 +2206,14 @@ def test_ps_s3_versioned_deletion_on_master():
     events = receiver.get_and_reset_events()
     delete_events = 0
     delete_marker_create_events = 0
-    for event in events:
-        if event['eventName'] == 's3:ObjectRemoved:Delete':
-            delete_events += 1
-            assert event['s3']['configurationId'] in [notification_name+'_1', notification_name+'_3']
-        if event['eventName'] == 's3:ObjectRemoved:DeleteMarkerCreated':
-            delete_marker_create_events += 1
-            assert event['s3']['configurationId'] in [notification_name+'_1', notification_name+'_2']
+    for event_list in events:
+        for event in event_list['Records']:
+            if event['eventName'] == 's3:ObjectRemoved:Delete':
+                delete_events += 1
+                assert event['s3']['configurationId'] in [notification_name+'_1', notification_name+'_3']
+            if event['eventName'] == 's3:ObjectRemoved:DeleteMarkerCreated':
+                delete_marker_create_events += 1
+                assert event['s3']['configurationId'] in [notification_name+'_1', notification_name+'_2']
    
     # 3 key versions were deleted (v1, v2 and the deletion marker)
     # notified over the same topic via 2 notifications (1,3)
@@ -2510,9 +2541,9 @@ def test_ps_delete_bucket():
     sub_conf = PSSubscription(ps_zones[0].conn, notification_name,
                               topic_name)
     result, _ = sub_conf.get_events()
-    parsed_result = json.loads(result)
+    records = json.loads(result)
     # TODO: use exact match
-    verify_s3_records_by_elements(parsed_result['Records'], keys, exact_match=False)
+    verify_s3_records_by_elements(records, keys, exact_match=False)
 
     # s3 notification is deleted with bucket
     _, status = s3_notification_conf.get_config(notification=notification_name)
@@ -2861,12 +2892,12 @@ def test_ps_s3_multiple_topics_notification():
 
     # get the events from both of the subscription
     result, _ = sub_conf1.get_events()
-    parsed_result = json.loads(result)
-    for record in parsed_result['Records']:
+    records = json.loads(result)
+    for record in records['Records']:
         log.debug(record)
     keys = list(bucket.list())
     # TODO: use exact match
-    verify_s3_records_by_elements(parsed_result['Records'], keys, exact_match=False)
+    verify_s3_records_by_elements(records, keys, exact_match=False)
     receiver.verify_s3_events(keys, exact_match=False)
 
     result, _ = sub_conf2.get_events()
@@ -2875,7 +2906,7 @@ def test_ps_s3_multiple_topics_notification():
         log.debug(record)
     keys = list(bucket.list())
     # TODO: use exact match
-    verify_s3_records_by_elements(parsed_result['Records'], keys, exact_match=False)
+    verify_s3_records_by_elements(records, keys, exact_match=False)
     http_server.verify_s3_events(keys, exact_match=False)
 
     # cleanup