From: Yuval Lifshitz Date: Fri, 29 Nov 2019 05:31:07 +0000 (+0200) Subject: rgw/pubsub: fix recerds/event json format to match documentation X-Git-Tag: v14.2.8~102^2~1^2~1 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=319543fb34bd595886ef3072e7223aa2f1a887fa;p=ceph.git rgw/pubsub: fix recerds/event json format to match documentation Signed-off-by: Yuval Lifshitz (cherry picked from commit 11e66bd16acdda0f2a097e94c66cc25f4550b99c) --- diff --git a/src/rgw/rgw_pubsub.cc b/src/rgw/rgw_pubsub.cc index ed9da7eec1c..bb53dc9aaad 100644 --- a/src/rgw/rgw_pubsub.cc +++ b/src/rgw/rgw_pubsub.cc @@ -732,7 +732,7 @@ void RGWUserPubSub::SubWithEvents::list_events_result::dump(Formatter Formatter::ArraySection s(*f, EventType::json_type_plural); for (auto& event : events) { - encode_json(EventType::json_type_single, event, f); + encode_json("", event, f); } } diff --git a/src/rgw/rgw_pubsub.h b/src/rgw/rgw_pubsub.h index e73fc8b46b0..8c8f0a7fc6d 100644 --- a/src/rgw/rgw_pubsub.h +++ b/src/rgw/rgw_pubsub.h @@ -194,9 +194,8 @@ struct rgw_pubsub_s3_notifications { }*/ struct rgw_pubsub_s3_record { - constexpr static const char* const json_type_single = "Record"; constexpr static const char* const json_type_plural = "Records"; - // 2.1 + // 2.2 std::string eventVersion; // aws:s3 std::string eventSource; @@ -304,7 +303,6 @@ struct rgw_pubsub_s3_record { WRITE_CLASS_ENCODER(rgw_pubsub_s3_record) struct rgw_pubsub_event { - constexpr static const char* const json_type_single = "event"; constexpr static const char* const json_type_plural = "events"; std::string id; std::string event_name; diff --git a/src/rgw/rgw_pubsub_push.cc b/src/rgw/rgw_pubsub_push.cc index 0c13886e01b..6b3ecef8df0 100644 --- a/src/rgw/rgw_pubsub_push.cc +++ b/src/rgw/rgw_pubsub_push.cc @@ -26,7 +26,13 @@ template std::string json_format_pubsub_event(const EventType& event) { std::stringstream ss; JSONFormatter f(false); - encode_json(EventType::json_type_single, event, &f); + { + Formatter::ObjectSection s(f, EventType::json_type_plural); + { + Formatter::ArraySection s(f, EventType::json_type_plural); + encode_json("", event, &f); + } + } f.flush(ss); return ss.str(); } diff --git a/src/rgw/rgw_sync_module_pubsub.cc b/src/rgw/rgw_sync_module_pubsub.cc index cf1d9543ac1..3fae724a78d 100644 --- a/src/rgw/rgw_sync_module_pubsub.cc +++ b/src/rgw/rgw_sync_module_pubsub.cc @@ -477,7 +477,7 @@ public: PSEvent(const EventRef& _event) : event(_event) {} void format(bufferlist *bl) const { - bl->append(json_str(EventType::json_type_single, *event)); + bl->append(json_str("", *event)); } void encode_event(bufferlist& bl) const { diff --git a/src/test/rgw/rgw_multi/tests_ps.py b/src/test/rgw/rgw_multi/tests_ps.py index bee36670eaf..9acf9e4ad92 100644 --- a/src/test/rgw/rgw_multi/tests_ps.py +++ b/src/test/rgw/rgw_multi/tests_ps.py @@ -247,15 +247,30 @@ def verify_events_by_elements(events, keys, exact_match=False, deletions=False): err = '' for key in keys: key_found = False - for event in events: - if event['info']['bucket']['name'] == key.bucket.name and \ - event['info']['key']['name'] == key.name: - if deletions and event['event'] == 'OBJECT_DELETE': - key_found = True - break - elif not deletions and event['event'] == 'OBJECT_CREATE': - key_found = True + if type(events) is list: + for event_list in events: + if key_found: break + for event in event_list['events']: + if event['info']['bucket']['name'] == key.bucket.name and \ + event['info']['key']['name'] == key.name: + if deletions and event['event'] == 'OBJECT_DELETE': + key_found = True + break + elif not deletions and event['event'] == 'OBJECT_CREATE': + key_found = True + break + else: + for event in events['events']: + if event['info']['bucket']['name'] == key.bucket.name and \ + event['info']['key']['name'] == key.name: + if deletions and event['event'] == 'OBJECT_DELETE': + key_found = True + break + elif not deletions and event['event'] == 'OBJECT_CREATE': + key_found = True + break + if not key_found: err = 'no ' + ('deletion' if deletions else 'creation') + ' event found for key: ' + str(key) log.error(events) @@ -274,27 +289,44 @@ def verify_s3_records_by_elements(records, keys, exact_match=False, deletions=Fa err = '' for key in keys: key_found = False - for record in records: - if record['s3']['bucket']['name'] == key.bucket.name and \ - record['s3']['object']['key'] == key.name: - if deletions and 'ObjectRemoved' in record['eventName']: - key_found = True - break - elif not deletions and 'ObjectCreated' in record['eventName']: - key_found = True + if type(records) is list: + for record_list in records: + if key_found: break + for record in record_list['Records']: + if record['s3']['bucket']['name'] == key.bucket.name and \ + record['s3']['object']['key'] == key.name: + if deletions and 'ObjectRemoved' in record['eventName']: + key_found = True + break + elif not deletions and 'ObjectCreated' in record['eventName']: + key_found = True + break + else: + for record in records['Records']: + if record['s3']['bucket']['name'] == key.bucket.name and \ + record['s3']['object']['key'] == key.name: + if deletions and 'ObjectRemoved' in record['eventName']: + key_found = True + break + elif not deletions and 'ObjectCreated' in record['eventName']: + key_found = True + break + if not key_found: err = 'no ' + ('deletion' if deletions else 'creation') + ' event found for key: ' + str(key) - for record in records: - log.error(str(record['s3']['bucket']['name']) + ',' + str(record['s3']['object']['key'])) + for record_list in records: + for record in record_list['Records']: + log.error(str(record['s3']['bucket']['name']) + ',' + str(record['s3']['object']['key'])) assert False, err if not len(records) == len(keys): err = 'superfluous records are found' log.warning(err) if exact_match: - for record in records: - log.error(str(record['s3']['bucket']['name']) + ',' + str(record['s3']['object']['key'])) + for record_list in records: + for record in record_list['Records']: + log.error(str(record['s3']['bucket']['name']) + ',' + str(record['s3']['object']['key'])) assert False, err @@ -529,12 +561,12 @@ def test_ps_s3_notification_records(): # get the events from the subscription result, _ = sub_conf.get_events() - parsed_result = json.loads(result) - for record in parsed_result['Records']: + records = json.loads(result) + for record in records['Records']: log.debug(record) keys = list(bucket.list()) # TODO: use exact match - verify_s3_records_by_elements(parsed_result['Records'], keys, exact_match=False) + verify_s3_records_by_elements(records, keys, exact_match=False) # cleanup _, status = s3_notification_conf.del_config() @@ -896,8 +928,8 @@ def ps_s3_notification_filter(on_master): found_in4 = [] for event in receiver.get_and_reset_events(): - notif_id = event['s3']['configurationId'] - key_name = event['s3']['object']['key'] + notif_id = event['Records'][0]['s3']['configurationId'] + key_name = event['Records'][0]['s3']['object']['key'] if notif_id == notification_name+'_1': found_in1.append(key_name) elif notif_id == notification_name+'_2': @@ -1432,12 +1464,12 @@ def test_ps_subscription(): # get the create events from the subscription result, _ = sub_conf.get_events() - parsed_result = json.loads(result) - for event in parsed_result['events']: + events = json.loads(result) + for event in events['events']: log.debug('Event: objname: "' + str(event['info']['key']['name']) + '" type: "' + str(event['event']) + '"') keys = list(bucket.list()) # TODO: use exact match - verify_events_by_elements(parsed_result['events'], keys, exact_match=False) + verify_events_by_elements(events, keys, exact_match=False) # delete objects from the bucket for key in bucket.list(): key.delete() @@ -1447,11 +1479,11 @@ def test_ps_subscription(): # get the delete events from the subscriptions result, _ = sub_conf.get_events() - for event in parsed_result['events']: + for event in events['events']: log.debug('Event: objname: "' + str(event['info']['key']['name']) + '" type: "' + str(event['event']) + '"') # TODO: check deletions # TODO: use exact match - # verify_events_by_elements(parsed_result['events'], keys, exact_match=False, deletions=True) + # verify_events_by_elements(events, keys, exact_match=False, deletions=True) # we should see the creations as well as the deletions # delete subscription _, status = sub_conf.del_config() @@ -1530,28 +1562,28 @@ def test_ps_event_type_subscription(): # get the events from the creation subscription result, _ = sub_create_conf.get_events() - parsed_result = json.loads(result) - for event in parsed_result['events']: + events = json.loads(result) + for event in events['events']: log.debug('Event (OBJECT_CREATE): objname: "' + str(event['info']['key']['name']) + '" type: "' + str(event['event']) + '"') keys = list(bucket.list()) # TODO: use exact match - verify_events_by_elements(parsed_result['events'], keys, exact_match=False) + verify_events_by_elements(events, keys, exact_match=False) # get the events from the deletions subscription result, _ = sub_delete_conf.get_events() - parsed_result = json.loads(result) - for event in parsed_result['events']: + events = json.loads(result) + for event in events['events']: log.debug('Event (OBJECT_DELETE): objname: "' + str(event['info']['key']['name']) + '" type: "' + str(event['event']) + '"') - assert_equal(len(parsed_result['events']), 0) + assert_equal(len(events['events']), 0) # get the events from the all events subscription result, _ = sub_conf.get_events() - parsed_result = json.loads(result) - for event in parsed_result['events']: + events = json.loads(result) + for event in events['events']: log.debug('Event (OBJECT_CREATE,OBJECT_DELETE): objname: "' + str(event['info']['key']['name']) + '" type: "' + str(event['event']) + '"') # TODO: use exact match - verify_events_by_elements(parsed_result['events'], keys, exact_match=False) + verify_events_by_elements(events, keys, exact_match=False) # delete objects from the bucket for key in bucket.list(): key.delete() @@ -1561,32 +1593,32 @@ def test_ps_event_type_subscription(): # get the events from the creations subscription result, _ = sub_create_conf.get_events() - parsed_result = json.loads(result) - for event in parsed_result['events']: + events = json.loads(result) + for event in events['events']: log.debug('Event (OBJECT_CREATE): objname: "' + str(event['info']['key']['name']) + '" type: "' + str(event['event']) + '"') # deletions should not change the creation events # TODO: use exact match - verify_events_by_elements(parsed_result['events'], keys, exact_match=False) + verify_events_by_elements(events, keys, exact_match=False) # get the events from the deletions subscription result, _ = sub_delete_conf.get_events() - parsed_result = json.loads(result) - for event in parsed_result['events']: + events = json.loads(result) + for event in events['events']: log.debug('Event (OBJECT_DELETE): objname: "' + str(event['info']['key']['name']) + '" type: "' + str(event['event']) + '"') # only deletions should be listed here # TODO: use exact match - verify_events_by_elements(parsed_result['events'], keys, exact_match=False, deletions=True) + verify_events_by_elements(events, keys, exact_match=False, deletions=True) # get the events from the all events subscription result, _ = sub_create_conf.get_events() - parsed_result = json.loads(result) - for event in parsed_result['events']: + events = json.loads(result) + for event in events['events']: log.debug('Event (OBJECT_CREATE,OBJECT_DELETE): objname: "' + str(event['info']['key']['name']) + '" type: "' + str(event['event']) + '"') # both deletions and creations should be here # TODO: use exact match - verify_events_by_elements(parsed_result['events'], keys, exact_match=False, deletions=False) - # verify_events_by_elements(parsed_result['events'], keys, exact_match=False, deletions=True) + verify_events_by_elements(events, keys, exact_match=False, deletions=False) + # verify_events_by_elements(events, keys, exact_match=False, deletions=True) # TODO: (1) test deletions (2) test overall number of events # test subscription deletion when topic is specified @@ -1644,18 +1676,17 @@ def test_ps_event_fetching(): while True: # get the events from the subscription result, _ = sub_conf.get_events(max_events, next_marker) - parsed_result = json.loads(result) - events = parsed_result['events'] - total_events_count += len(events) - all_events.extend(events) - next_marker = parsed_result['next_marker'] - for event in events: + events = json.loads(result) + total_events_count += len(events['events']) + all_events.extend(events['events']) + next_marker = events['next_marker'] + for event in events['events']: log.debug('Event: objname: "' + str(event['info']['key']['name']) + '" type: "' + str(event['event']) + '"') if next_marker == '': break keys = list(bucket.list()) # TODO: use exact match - verify_events_by_elements(all_events, keys, exact_match=False) + verify_events_by_elements({'events': all_events}, keys, exact_match=False) # cleanup sub_conf.del_config() @@ -1699,17 +1730,16 @@ def test_ps_event_acking(): # get the create events from the subscription result, _ = sub_conf.get_events() - parsed_result = json.loads(result) - events = parsed_result['events'] + events = json.loads(result) original_number_of_events = len(events) - for event in events: + for event in events['events']: log.debug('Event (before ack) id: "' + str(event['id']) + '"') keys = list(bucket.list()) # TODO: use exact match verify_events_by_elements(events, keys, exact_match=False) # ack half of the events events_to_ack = number_of_objects/2 - for event in events: + for event in events['events']: if events_to_ack == 0: break _, status = sub_conf.ack_events(event['id']) @@ -1718,10 +1748,10 @@ def test_ps_event_acking(): # verify that acked events are gone result, _ = sub_conf.get_events() - parsed_result = json.loads(result) - for event in parsed_result['events']: + events = json.loads(result) + for event in events['events']: log.debug('Event (after ack) id: "' + str(event['id']) + '"') - assert len(parsed_result['events']) >= (original_number_of_events - number_of_objects/2) + assert len(events) >= (original_number_of_events - number_of_objects/2) # cleanup sub_conf.del_config() @@ -1774,12 +1804,12 @@ def test_ps_creation_triggers(): # get the create events from the subscription result, _ = sub_conf.get_events() - parsed_result = json.loads(result) - for event in parsed_result['events']: + events = json.loads(result) + for event in events['events']: log.debug('Event key: "' + str(event['info']['key']['name']) + '" type: "' + str(event['event']) + '"') # TODO: verify the specific 3 keys: 'put', 'copy' and 'multipart' - assert len(parsed_result['events']) >= 3 + assert len(events['events']) >= 3 # cleanup sub_conf.del_config() notification_conf.del_config() @@ -1930,13 +1960,13 @@ def test_ps_s3_multipart_on_master(): events = receiver2.get_and_reset_events() assert_equal(len(events), 1) - assert_equal(events[0]['eventName'], 's3:ObjectCreated:Post') - assert_equal(events[0]['s3']['configurationId'], notification_name+'_2') + assert_equal(events[0]['Records'][0]['eventName'], 's3:ObjectCreated:Post') + assert_equal(events[0]['Records'][0]['s3']['configurationId'], notification_name+'_2') events = receiver3.get_and_reset_events() assert_equal(len(events), 1) - assert_equal(events[0]['eventName'], 's3:ObjectCreated:CompleteMultipartUpload') - assert_equal(events[0]['s3']['configurationId'], notification_name+'_3') + assert_equal(events[0]['Records'][0]['eventName'], 's3:ObjectCreated:CompleteMultipartUpload') + assert_equal(events[0]['Records'][0]['s3']['configurationId'], notification_name+'_3') # cleanup stop_amqp_receiver(receiver1, task1) @@ -2021,14 +2051,14 @@ def test_ps_versioned_deletion(): # get the delete events from the subscription result, _ = sub_conf1.get_events() - parsed_result = json.loads(result) - for event in parsed_result['events']: + events = json.loads(result) + for event in events['events']: log.debug('Event key: "' + str(event['info']['key']['name']) + '" type: "' + str(event['event']) + '"') assert_equal(str(event['event']), event_type1) result, _ = sub_conf2.get_events() - parsed_result = json.loads(result) - for event in parsed_result['events']: + events = json.loads(result) + for event in events['events']: log.debug('Event key: "' + str(event['info']['key']['name']) + '" type: "' + str(event['event']) + '"') assert_equal(str(event['event']), event_type2) @@ -2176,13 +2206,14 @@ def test_ps_s3_versioned_deletion_on_master(): events = receiver.get_and_reset_events() delete_events = 0 delete_marker_create_events = 0 - for event in events: - if event['eventName'] == 's3:ObjectRemoved:Delete': - delete_events += 1 - assert event['s3']['configurationId'] in [notification_name+'_1', notification_name+'_3'] - if event['eventName'] == 's3:ObjectRemoved:DeleteMarkerCreated': - delete_marker_create_events += 1 - assert event['s3']['configurationId'] in [notification_name+'_1', notification_name+'_2'] + for event_list in events: + for event in event_list['Records']: + if event['eventName'] == 's3:ObjectRemoved:Delete': + delete_events += 1 + assert event['s3']['configurationId'] in [notification_name+'_1', notification_name+'_3'] + if event['eventName'] == 's3:ObjectRemoved:DeleteMarkerCreated': + delete_marker_create_events += 1 + assert event['s3']['configurationId'] in [notification_name+'_1', notification_name+'_2'] # 3 key versions were deleted (v1, v2 and the deletion marker) # notified over the same topic via 2 notifications (1,3) @@ -2510,9 +2541,9 @@ def test_ps_delete_bucket(): sub_conf = PSSubscription(ps_zones[0].conn, notification_name, topic_name) result, _ = sub_conf.get_events() - parsed_result = json.loads(result) + records = json.loads(result) # TODO: use exact match - verify_s3_records_by_elements(parsed_result['Records'], keys, exact_match=False) + verify_s3_records_by_elements(records, keys, exact_match=False) # s3 notification is deleted with bucket _, status = s3_notification_conf.get_config(notification=notification_name) @@ -2861,12 +2892,12 @@ def test_ps_s3_multiple_topics_notification(): # get the events from both of the subscription result, _ = sub_conf1.get_events() - parsed_result = json.loads(result) - for record in parsed_result['Records']: + records = json.loads(result) + for record in records['Records']: log.debug(record) keys = list(bucket.list()) # TODO: use exact match - verify_s3_records_by_elements(parsed_result['Records'], keys, exact_match=False) + verify_s3_records_by_elements(records, keys, exact_match=False) receiver.verify_s3_events(keys, exact_match=False) result, _ = sub_conf2.get_events() @@ -2875,7 +2906,7 @@ def test_ps_s3_multiple_topics_notification(): log.debug(record) keys = list(bucket.list()) # TODO: use exact match - verify_s3_records_by_elements(parsed_result['Records'], keys, exact_match=False) + verify_s3_records_by_elements(records, keys, exact_match=False) http_server.verify_s3_events(keys, exact_match=False) # cleanup