From 11e66bd16acdda0f2a097e94c66cc25f4550b99c Mon Sep 17 00:00:00 2001 From: Yuval Lifshitz Date: Fri, 29 Nov 2019 07:31:07 +0200 Subject: [PATCH] rgw/pubsub: fix recerds/event json format to match documentation Signed-off-by: Yuval Lifshitz --- src/rgw/rgw_pubsub.cc | 2 +- src/rgw/rgw_pubsub.h | 4 +- src/rgw/rgw_pubsub_push.cc | 8 +- src/rgw/rgw_sync_module_pubsub.cc | 2 +- src/test/rgw/rgw_multi/tests_ps.py | 207 +++++++++++++++++------------ 5 files changed, 129 insertions(+), 94 deletions(-) diff --git a/src/rgw/rgw_pubsub.cc b/src/rgw/rgw_pubsub.cc index 6b439eb6ad1..6c2f6cd74cc 100644 --- a/src/rgw/rgw_pubsub.cc +++ b/src/rgw/rgw_pubsub.cc @@ -738,7 +738,7 @@ void RGWUserPubSub::SubWithEvents::list_events_result::dump(Formatter Formatter::ArraySection s(*f, EventType::json_type_plural); for (auto& event : events) { - encode_json(EventType::json_type_single, event, f); + encode_json("", event, f); } } diff --git a/src/rgw/rgw_pubsub.h b/src/rgw/rgw_pubsub.h index 36edcd36cae..1270124c6dc 100644 --- a/src/rgw/rgw_pubsub.h +++ b/src/rgw/rgw_pubsub.h @@ -194,9 +194,8 @@ struct rgw_pubsub_s3_notifications { }*/ struct rgw_pubsub_s3_record { - constexpr static const char* const json_type_single = "Record"; constexpr static const char* const json_type_plural = "Records"; - // 2.1 + // 2.2 std::string eventVersion; // aws:s3 std::string eventSource; @@ -304,7 +303,6 @@ struct rgw_pubsub_s3_record { WRITE_CLASS_ENCODER(rgw_pubsub_s3_record) struct rgw_pubsub_event { - constexpr static const char* const json_type_single = "event"; constexpr static const char* const json_type_plural = "events"; std::string id; std::string event_name; diff --git a/src/rgw/rgw_pubsub_push.cc b/src/rgw/rgw_pubsub_push.cc index cef5ae6318f..4d0496687ad 100644 --- a/src/rgw/rgw_pubsub_push.cc +++ b/src/rgw/rgw_pubsub_push.cc @@ -29,7 +29,13 @@ template std::string json_format_pubsub_event(const EventType& event) { std::stringstream ss; JSONFormatter f(false); - encode_json(EventType::json_type_single, event, &f); + { + Formatter::ObjectSection s(f, EventType::json_type_plural); + { + Formatter::ArraySection s(f, EventType::json_type_plural); + encode_json("", event, &f); + } + } f.flush(ss); return ss.str(); } diff --git a/src/rgw/rgw_sync_module_pubsub.cc b/src/rgw/rgw_sync_module_pubsub.cc index 4ad59837d4a..598eee6e41c 100644 --- a/src/rgw/rgw_sync_module_pubsub.cc +++ b/src/rgw/rgw_sync_module_pubsub.cc @@ -484,7 +484,7 @@ public: PSEvent(const EventRef& _event) : event(_event) {} void format(bufferlist *bl) const { - bl->append(json_str(EventType::json_type_single, *event)); + bl->append(json_str("", *event)); } void encode_event(bufferlist& bl) const { diff --git a/src/test/rgw/rgw_multi/tests_ps.py b/src/test/rgw/rgw_multi/tests_ps.py index ebac97d3dce..bedd189bc63 100644 --- a/src/test/rgw/rgw_multi/tests_ps.py +++ b/src/test/rgw/rgw_multi/tests_ps.py @@ -28,7 +28,7 @@ from nose.tools import assert_not_equal, assert_equal # configure logging for the tests module log = logging.getLogger(__name__) -skip_push_tests = False +skip_push_tests = True #################################### # utility functions for pubsub tests @@ -247,15 +247,30 @@ def verify_events_by_elements(events, keys, exact_match=False, deletions=False): err = '' for key in keys: key_found = False - for event in events: - if event['info']['bucket']['name'] == key.bucket.name and \ - event['info']['key']['name'] == key.name: - if deletions and event['event'] == 'OBJECT_DELETE': - key_found = True - break - elif not deletions and event['event'] == 'OBJECT_CREATE': - key_found = True + if type(events) is list: + for event_list in events: + if key_found: break + for event in event_list['events']: + if event['info']['bucket']['name'] == key.bucket.name and \ + event['info']['key']['name'] == key.name: + if deletions and event['event'] == 'OBJECT_DELETE': + key_found = True + break + elif not deletions and event['event'] == 'OBJECT_CREATE': + key_found = True + break + else: + for event in events['events']: + if event['info']['bucket']['name'] == key.bucket.name and \ + event['info']['key']['name'] == key.name: + if deletions and event['event'] == 'OBJECT_DELETE': + key_found = True + break + elif not deletions and event['event'] == 'OBJECT_CREATE': + key_found = True + break + if not key_found: err = 'no ' + ('deletion' if deletions else 'creation') + ' event found for key: ' + str(key) log.error(events) @@ -274,27 +289,44 @@ def verify_s3_records_by_elements(records, keys, exact_match=False, deletions=Fa err = '' for key in keys: key_found = False - for record in records: - if record['s3']['bucket']['name'] == key.bucket.name and \ - record['s3']['object']['key'] == key.name: - if deletions and 'ObjectRemoved' in record['eventName']: - key_found = True - break - elif not deletions and 'ObjectCreated' in record['eventName']: - key_found = True + if type(records) is list: + for record_list in records: + if key_found: break + for record in record_list['Records']: + if record['s3']['bucket']['name'] == key.bucket.name and \ + record['s3']['object']['key'] == key.name: + if deletions and 'ObjectRemoved' in record['eventName']: + key_found = True + break + elif not deletions and 'ObjectCreated' in record['eventName']: + key_found = True + break + else: + for record in records['Records']: + if record['s3']['bucket']['name'] == key.bucket.name and \ + record['s3']['object']['key'] == key.name: + if deletions and 'ObjectRemoved' in record['eventName']: + key_found = True + break + elif not deletions and 'ObjectCreated' in record['eventName']: + key_found = True + break + if not key_found: err = 'no ' + ('deletion' if deletions else 'creation') + ' event found for key: ' + str(key) - for record in records: - log.error(str(record['s3']['bucket']['name']) + ',' + str(record['s3']['object']['key'])) + for record_list in records: + for record in record_list['Records']: + log.error(str(record['s3']['bucket']['name']) + ',' + str(record['s3']['object']['key'])) assert False, err if not len(records) == len(keys): err = 'superfluous records are found' log.warning(err) if exact_match: - for record in records: - log.error(str(record['s3']['bucket']['name']) + ',' + str(record['s3']['object']['key'])) + for record_list in records: + for record in record_list['Records']: + log.error(str(record['s3']['bucket']['name']) + ',' + str(record['s3']['object']['key'])) assert False, err @@ -654,12 +686,12 @@ def test_ps_s3_notification_records(): # get the events from the subscription result, _ = sub_conf.get_events() - parsed_result = json.loads(result) - for record in parsed_result['Records']: + records = json.loads(result) + for record in records['Records']: log.debug(record) keys = list(bucket.list()) # TODO: use exact match - verify_s3_records_by_elements(parsed_result['Records'], keys, exact_match=False) + verify_s3_records_by_elements(records, keys, exact_match=False) # cleanup _, status = s3_notification_conf.del_config() @@ -1021,8 +1053,8 @@ def ps_s3_notification_filter(on_master): found_in4 = [] for event in receiver.get_and_reset_events(): - notif_id = event['s3']['configurationId'] - key_name = event['s3']['object']['key'] + notif_id = event['Records'][0]['s3']['configurationId'] + key_name = event['Records'][0]['s3']['object']['key'] if notif_id == notification_name+'_1': found_in1.append(key_name) elif notif_id == notification_name+'_2': @@ -1721,12 +1753,12 @@ def test_ps_subscription(): # get the create events from the subscription result, _ = sub_conf.get_events() - parsed_result = json.loads(result) - for event in parsed_result['events']: + events = json.loads(result) + for event in events['events']: log.debug('Event: objname: "' + str(event['info']['key']['name']) + '" type: "' + str(event['event']) + '"') keys = list(bucket.list()) # TODO: use exact match - verify_events_by_elements(parsed_result['events'], keys, exact_match=False) + verify_events_by_elements(events, keys, exact_match=False) # delete objects from the bucket for key in bucket.list(): key.delete() @@ -1736,11 +1768,11 @@ def test_ps_subscription(): # get the delete events from the subscriptions result, _ = sub_conf.get_events() - for event in parsed_result['events']: + for event in events['events']: log.debug('Event: objname: "' + str(event['info']['key']['name']) + '" type: "' + str(event['event']) + '"') # TODO: check deletions # TODO: use exact match - # verify_events_by_elements(parsed_result['events'], keys, exact_match=False, deletions=True) + # verify_events_by_elements(events, keys, exact_match=False, deletions=True) # we should see the creations as well as the deletions # delete subscription _, status = sub_conf.del_config() @@ -1819,28 +1851,28 @@ def test_ps_event_type_subscription(): # get the events from the creation subscription result, _ = sub_create_conf.get_events() - parsed_result = json.loads(result) - for event in parsed_result['events']: + events = json.loads(result) + for event in events['events']: log.debug('Event (OBJECT_CREATE): objname: "' + str(event['info']['key']['name']) + '" type: "' + str(event['event']) + '"') keys = list(bucket.list()) # TODO: use exact match - verify_events_by_elements(parsed_result['events'], keys, exact_match=False) + verify_events_by_elements(events, keys, exact_match=False) # get the events from the deletions subscription result, _ = sub_delete_conf.get_events() - parsed_result = json.loads(result) - for event in parsed_result['events']: + events = json.loads(result) + for event in events['events']: log.debug('Event (OBJECT_DELETE): objname: "' + str(event['info']['key']['name']) + '" type: "' + str(event['event']) + '"') - assert_equal(len(parsed_result['events']), 0) + assert_equal(len(events['events']), 0) # get the events from the all events subscription result, _ = sub_conf.get_events() - parsed_result = json.loads(result) - for event in parsed_result['events']: + events = json.loads(result) + for event in events['events']: log.debug('Event (OBJECT_CREATE,OBJECT_DELETE): objname: "' + str(event['info']['key']['name']) + '" type: "' + str(event['event']) + '"') # TODO: use exact match - verify_events_by_elements(parsed_result['events'], keys, exact_match=False) + verify_events_by_elements(events, keys, exact_match=False) # delete objects from the bucket for key in bucket.list(): key.delete() @@ -1850,32 +1882,32 @@ def test_ps_event_type_subscription(): # get the events from the creations subscription result, _ = sub_create_conf.get_events() - parsed_result = json.loads(result) - for event in parsed_result['events']: + events = json.loads(result) + for event in events['events']: log.debug('Event (OBJECT_CREATE): objname: "' + str(event['info']['key']['name']) + '" type: "' + str(event['event']) + '"') # deletions should not change the creation events # TODO: use exact match - verify_events_by_elements(parsed_result['events'], keys, exact_match=False) + verify_events_by_elements(events, keys, exact_match=False) # get the events from the deletions subscription result, _ = sub_delete_conf.get_events() - parsed_result = json.loads(result) - for event in parsed_result['events']: + events = json.loads(result) + for event in events['events']: log.debug('Event (OBJECT_DELETE): objname: "' + str(event['info']['key']['name']) + '" type: "' + str(event['event']) + '"') # only deletions should be listed here # TODO: use exact match - verify_events_by_elements(parsed_result['events'], keys, exact_match=False, deletions=True) + verify_events_by_elements(events, keys, exact_match=False, deletions=True) # get the events from the all events subscription result, _ = sub_create_conf.get_events() - parsed_result = json.loads(result) - for event in parsed_result['events']: + events = json.loads(result) + for event in events['events']: log.debug('Event (OBJECT_CREATE,OBJECT_DELETE): objname: "' + str(event['info']['key']['name']) + '" type: "' + str(event['event']) + '"') # both deletions and creations should be here # TODO: use exact match - verify_events_by_elements(parsed_result['events'], keys, exact_match=False, deletions=False) - # verify_events_by_elements(parsed_result['events'], keys, exact_match=False, deletions=True) + verify_events_by_elements(events, keys, exact_match=False, deletions=False) + # verify_events_by_elements(events, keys, exact_match=False, deletions=True) # TODO: (1) test deletions (2) test overall number of events # test subscription deletion when topic is specified @@ -1933,18 +1965,17 @@ def test_ps_event_fetching(): while True: # get the events from the subscription result, _ = sub_conf.get_events(max_events, next_marker) - parsed_result = json.loads(result) - events = parsed_result['events'] - total_events_count += len(events) - all_events.extend(events) - next_marker = parsed_result['next_marker'] - for event in events: + events = json.loads(result) + total_events_count += len(events['events']) + all_events.extend(events['events']) + next_marker = events['next_marker'] + for event in events['events']: log.debug('Event: objname: "' + str(event['info']['key']['name']) + '" type: "' + str(event['event']) + '"') if next_marker == '': break keys = list(bucket.list()) # TODO: use exact match - verify_events_by_elements(all_events, keys, exact_match=False) + verify_events_by_elements({'events': all_events}, keys, exact_match=False) # cleanup sub_conf.del_config() @@ -1988,17 +2019,16 @@ def test_ps_event_acking(): # get the create events from the subscription result, _ = sub_conf.get_events() - parsed_result = json.loads(result) - events = parsed_result['events'] + events = json.loads(result) original_number_of_events = len(events) - for event in events: + for event in events['events']: log.debug('Event (before ack) id: "' + str(event['id']) + '"') keys = list(bucket.list()) # TODO: use exact match verify_events_by_elements(events, keys, exact_match=False) # ack half of the events events_to_ack = number_of_objects/2 - for event in events: + for event in events['events']: if events_to_ack == 0: break _, status = sub_conf.ack_events(event['id']) @@ -2007,10 +2037,10 @@ def test_ps_event_acking(): # verify that acked events are gone result, _ = sub_conf.get_events() - parsed_result = json.loads(result) - for event in parsed_result['events']: + events = json.loads(result) + for event in events['events']: log.debug('Event (after ack) id: "' + str(event['id']) + '"') - assert len(parsed_result['events']) >= (original_number_of_events - number_of_objects/2) + assert len(events) >= (original_number_of_events - number_of_objects/2) # cleanup sub_conf.del_config() @@ -2063,12 +2093,12 @@ def test_ps_creation_triggers(): # get the create events from the subscription result, _ = sub_conf.get_events() - parsed_result = json.loads(result) - for event in parsed_result['events']: + events = json.loads(result) + for event in events['events']: log.debug('Event key: "' + str(event['info']['key']['name']) + '" type: "' + str(event['event']) + '"') # TODO: verify the specific 3 keys: 'put', 'copy' and 'multipart' - assert len(parsed_result['events']) >= 3 + assert len(events['events']) >= 3 # cleanup sub_conf.del_config() notification_conf.del_config() @@ -2219,13 +2249,13 @@ def test_ps_s3_multipart_on_master(): events = receiver2.get_and_reset_events() assert_equal(len(events), 1) - assert_equal(events[0]['eventName'], 's3:ObjectCreated:Post') - assert_equal(events[0]['s3']['configurationId'], notification_name+'_2') + assert_equal(events[0]['Records'][0]['eventName'], 's3:ObjectCreated:Post') + assert_equal(events[0]['Records'][0]['s3']['configurationId'], notification_name+'_2') events = receiver3.get_and_reset_events() assert_equal(len(events), 1) - assert_equal(events[0]['eventName'], 's3:ObjectCreated:CompleteMultipartUpload') - assert_equal(events[0]['s3']['configurationId'], notification_name+'_3') + assert_equal(events[0]['Records'][0]['eventName'], 's3:ObjectCreated:CompleteMultipartUpload') + assert_equal(events[0]['Records'][0]['s3']['configurationId'], notification_name+'_3') # cleanup stop_amqp_receiver(receiver1, task1) @@ -2310,14 +2340,14 @@ def test_ps_versioned_deletion(): # get the delete events from the subscription result, _ = sub_conf1.get_events() - parsed_result = json.loads(result) - for event in parsed_result['events']: + events = json.loads(result) + for event in events['events']: log.debug('Event key: "' + str(event['info']['key']['name']) + '" type: "' + str(event['event']) + '"') assert_equal(str(event['event']), event_type1) result, _ = sub_conf2.get_events() - parsed_result = json.loads(result) - for event in parsed_result['events']: + events = json.loads(result) + for event in events['events']: log.debug('Event key: "' + str(event['info']['key']['name']) + '" type: "' + str(event['event']) + '"') assert_equal(str(event['event']), event_type2) @@ -2465,13 +2495,14 @@ def test_ps_s3_versioned_deletion_on_master(): events = receiver.get_and_reset_events() delete_events = 0 delete_marker_create_events = 0 - for event in events: - if event['eventName'] == 's3:ObjectRemoved:Delete': - delete_events += 1 - assert event['s3']['configurationId'] in [notification_name+'_1', notification_name+'_3'] - if event['eventName'] == 's3:ObjectRemoved:DeleteMarkerCreated': - delete_marker_create_events += 1 - assert event['s3']['configurationId'] in [notification_name+'_1', notification_name+'_2'] + for event_list in events: + for event in event_list['Records']: + if event['eventName'] == 's3:ObjectRemoved:Delete': + delete_events += 1 + assert event['s3']['configurationId'] in [notification_name+'_1', notification_name+'_3'] + if event['eventName'] == 's3:ObjectRemoved:DeleteMarkerCreated': + delete_marker_create_events += 1 + assert event['s3']['configurationId'] in [notification_name+'_1', notification_name+'_2'] # 3 key versions were deleted (v1, v2 and the deletion marker) # notified over the same topic via 2 notifications (1,3) @@ -2799,9 +2830,9 @@ def test_ps_delete_bucket(): sub_conf = PSSubscription(ps_zones[0].conn, notification_name, topic_name) result, _ = sub_conf.get_events() - parsed_result = json.loads(result) + records = json.loads(result) # TODO: use exact match - verify_s3_records_by_elements(parsed_result['Records'], keys, exact_match=False) + verify_s3_records_by_elements(records, keys, exact_match=False) # s3 notification is deleted with bucket _, status = s3_notification_conf.get_config(notification=notification_name) @@ -3150,12 +3181,12 @@ def test_ps_s3_multiple_topics_notification(): # get the events from both of the subscription result, _ = sub_conf1.get_events() - parsed_result = json.loads(result) - for record in parsed_result['Records']: + records = json.loads(result) + for record in records['Records']: log.debug(record) keys = list(bucket.list()) # TODO: use exact match - verify_s3_records_by_elements(parsed_result['Records'], keys, exact_match=False) + verify_s3_records_by_elements(records, keys, exact_match=False) receiver.verify_s3_events(keys, exact_match=False) result, _ = sub_conf2.get_events() @@ -3164,7 +3195,7 @@ def test_ps_s3_multiple_topics_notification(): log.debug(record) keys = list(bucket.list()) # TODO: use exact match - verify_s3_records_by_elements(parsed_result['Records'], keys, exact_match=False) + verify_s3_records_by_elements(records, keys, exact_match=False) http_server.verify_s3_events(keys, exact_match=False) # cleanup -- 2.39.5