From b0cdb195fe3b8a7b8885fb1c57307611342687a9 Mon Sep 17 00:00:00 2001 From: Yuval Lifshitz Date: Tue, 19 Feb 2019 19:18:01 +0200 Subject: [PATCH] rgw: pubsub tests. use bukcet checkpoint. allow redundant events Signed-off-by: Yuval Lifshitz --- src/test/rgw/rgw_multi/tests.py | 4 +- src/test/rgw/rgw_multi/tests_ps.py | 147 ++++++++++++++++++++--------- 2 files changed, 106 insertions(+), 45 deletions(-) diff --git a/src/test/rgw/rgw_multi/tests.py b/src/test/rgw/rgw_multi/tests.py index e7e3d668ed59b..22f8f5a14a2a7 100644 --- a/src/test/rgw/rgw_multi/tests.py +++ b/src/test/rgw/rgw_multi/tests.py @@ -281,7 +281,7 @@ def bucket_sync_status(target_zone, source_zone, bucket_name): def data_source_log_status(source_zone): source_cluster = source_zone.cluster cmd = ['datalog', 'status'] + source_zone.zone_args() - datalog_status_json, retcode = source_cluster.rgw_admin(cmd, read_only=True) + datalog_status_json, retcode = source_cluster.admin(cmd, read_only=True) datalog_status = json.loads(datalog_status_json.decode('utf-8')) markers = {i: s['marker'] for i, s in enumerate(datalog_status)} @@ -346,7 +346,7 @@ def compare_bucket_status(target_zone, source_zone, bucket_name, log_status, syn return True -def zone_data_checkpoint(target_zone, source_zone_conn): +def zone_data_checkpoint(target_zone, source_zone): if target_zone == source_zone: return diff --git a/src/test/rgw/rgw_multi/tests_ps.py b/src/test/rgw/rgw_multi/tests_ps.py index 118c70ac198e6..38d7088182f13 100644 --- a/src/test/rgw/rgw_multi/tests_ps.py +++ b/src/test/rgw/rgw_multi/tests_ps.py @@ -5,6 +5,9 @@ from rgw_multi.tests import get_realm, \ ZonegroupConns, \ zonegroup_meta_checkpoint, \ zone_meta_checkpoint, \ + zone_bucket_checkpoint, \ + zone_data_checkpoint, \ + check_bucket_eq, \ gen_bucket_name from rgw_multi.zone_ps import PSTopic, PSNotification, PSSubscription from nose import SkipTest @@ -13,6 +16,11 @@ from nose.tools import assert_not_equal, assert_equal # configure logging for the tests module log = logging.getLogger('rgw_multi.tests') +#################################### +# utility functions for pubsub tests +#################################### + + def check_ps_configured(): """check if at least one pubsub zone exist""" realm = get_realm() @@ -30,6 +38,33 @@ def is_ps_zone(zone_conn): return zone_conn.zone.tier_type() == "pubsub" +def verify_events_by_elements(events, keys, exact_match=False, deletions=False): + """ verify there is at least one event per element """ + err = '' + for key in keys: + key_found = False + for event in events: + if event['info']['bucket']['name'] == key.bucket.name and \ + event['info']['key']['name'] == key.name: + if deletions and event['event'] == 'OBJECT_DELETE': + key_found = True + break + elif not deletions and event['event'] == 'OBJECT_CREATE': + key_found = True + break + if not key_found: + err = 'no ' + ('deletion' if deletions else 'creation') + ' event found for key: ' + str(key) + log.error(events) + assert False, err + + if not len(events) == len(keys): + err = 'superfluous events are found' + log.debug(err) + if exact_match: + log.error(events) + assert False, err + + def init_env(): """initialize the environment""" check_ps_configured() @@ -57,6 +92,10 @@ def init_env(): TOPIC_SUFFIX = "_topic" SUB_SUFFIX = "_sub" +############## +# pubsub tests +############## + def test_ps_topic(): """ test set/get/delete of topic """ @@ -188,27 +227,30 @@ def test_ps_subscription(): key = bucket.new_key(str(i)) key.set_contents_from_string('bar') # wait for sync - zone_meta_checkpoint(ps_zones[0].zone) - + zone_bucket_checkpoint(ps_zones[0].zone, zones[0].zone, bucket_name) + # get the create events from the subscription result, _ = sub_conf.get_events() parsed_result = json.loads(result) for event in parsed_result['events']: - log.debug('Event: objname: "' + str(event['info']['key']['name']) + '" type: "' + str(event['event']) +'"') - assert_equal(len(parsed_result['events']), number_of_objects) + log.debug('Event: objname: "' + str(event['info']['key']['name']) + '" type: "' + str(event['event']) + '"') + keys = list(bucket.list()) + # TODO: set exact_match to true + verify_events_by_elements(parsed_result['events'], keys, exact_match=False) # delete objects from the bucket for key in bucket.list(): key.delete() # wait for sync zone_meta_checkpoint(ps_zones[0].zone) - + zone_bucket_checkpoint(ps_zones[0].zone, zones[0].zone, bucket_name) + # get the delete events from the subscriptions result, _ = sub_conf.get_events() for event in parsed_result['events']: - log.debug('Event: objname: "' + str(event['info']['key']['name']) + '" type: "' + str(event['event']) +'"') + log.debug('Event: objname: "' + str(event['info']['key']['name']) + '" type: "' + str(event['event']) + '"') + # TODO: check deletions + # verify_events_by_elements(parsed_result['events'], keys, exact_match=False, deletions=True) # we should see the creations as well as the deletions - # TODO: deletion events are not counted for, should be number_of_objects*2 - assert_equal(len(parsed_result['events']), number_of_objects) # delete subscription _, status = sub_conf.del_config() assert_equal(status/100, 2) @@ -245,6 +287,7 @@ def test_ps_event_type_subscription(): bucket = zones[0].create_bucket(bucket_name) # wait for sync zone_meta_checkpoint(ps_zones[0].zone) + zone_bucket_checkpoint(ps_zones[0].zone, zones[0].zone, bucket_name) # create notifications for objects creation notification_create_conf = PSNotification(ps_zones[0].conn, bucket_name, topic_create_name, "OBJECT_CREATE") @@ -281,34 +324,37 @@ def test_ps_event_type_subscription(): key = bucket.new_key(str(i)) key.set_contents_from_string('bar') # wait for sync - zone_meta_checkpoint(ps_zones[0].zone) + zone_bucket_checkpoint(ps_zones[0].zone, zones[0].zone, bucket_name) # get the events from the creation subscription result, _ = sub_create_conf.get_events() parsed_result = json.loads(result) for event in parsed_result['events']: log.debug('Event (OBJECT_CREATE): objname: "' + str(event['info']['key']['name']) + \ - '" type: "' + str(event['event']) +'"') - assert_equal(len(parsed_result['events']), number_of_objects) + '" type: "' + str(event['event']) + '"') + keys = list(bucket.list()) + # TODO: set exact_match to true + verify_events_by_elements(parsed_result['events'], keys, exact_match=False) # get the events from the deletions subscription result, _ = sub_delete_conf.get_events() parsed_result = json.loads(result) for event in parsed_result['events']: log.debug('Event (OBJECT_DELETE): objname: "' + str(event['info']['key']['name']) + \ - '" type: "' + str(event['event']) +'"') + '" type: "' + str(event['event']) + '"') assert_equal(len(parsed_result['events']), 0) # get the events from the all events subscription result, _ = sub_conf.get_events() parsed_result = json.loads(result) for event in parsed_result['events']: log.debug('Event (OBJECT_CREATE,OBJECT_DELETE): objname: "' + \ - str(event['info']['key']['name']) + '" type: "' + str(event['event']) +'"') - assert_equal(len(parsed_result['events']), number_of_objects) + str(event['info']['key']['name']) + '" type: "' + str(event['event']) + '"') + # TODO: set exact_match to true + verify_events_by_elements(parsed_result['events'], keys, exact_match=False) # delete objects from the bucket for key in bucket.list(): key.delete() # wait for sync - zone_meta_checkpoint(ps_zones[0].zone) + zone_bucket_checkpoint(ps_zones[0].zone, zones[0].zone, bucket_name) log.debug("Event (OBJECT_DELETE) synced") # get the events from the creations subscription @@ -316,25 +362,29 @@ def test_ps_event_type_subscription(): parsed_result = json.loads(result) for event in parsed_result['events']: log.debug('Event (OBJECT_CREATE): objname: "' + str(event['info']['key']['name']) + \ - '" type: "' + str(event['event']) +'"') - # deletions should not change the number of creation events - assert_equal(len(parsed_result['events']), number_of_objects) + '" type: "' + str(event['event']) + '"') + # deletions should not change the creation events + # TODO: set exact_match to true + verify_events_by_elements(parsed_result['events'], keys, exact_match=False) # get the events from the deletions subscription result, _ = sub_delete_conf.get_events() parsed_result = json.loads(result) for event in parsed_result['events']: log.debug('Event (OBJECT_DELETE): objname: "' + str(event['info']['key']['name']) + \ - '" type: "' + str(event['event']) +'"') - # only deletions should be counted here - assert_equal(len(parsed_result['events']), number_of_objects) + '" type: "' + str(event['event']) + '"') + # only deletions should be listed here + # TODO: set exact_match to true + verify_events_by_elements(parsed_result['events'], keys, exact_match=False, deletions=True) # get the events from the all events subscription result, _ = sub_create_conf.get_events() parsed_result = json.loads(result) for event in parsed_result['events']: log.debug('Event (OBJECT_CREATE,OBJECT_DELETE): objname: "' + str(event['info']['key']['name']) + \ - '" type: "' + str(event['event']) +'"') - # TODO deleted events are not accounted for, should be number_of_objects*2 - assert_equal(len(parsed_result['events']), number_of_objects) + '" type: "' + str(event['event']) + '"') + # both deletions and creations should be here + verify_events_by_elements(parsed_result['events'], keys, exact_match=False, deletions=False) + # verify_events_by_elements(parsed_result['events'], keys, exact_match=False, deletions=True) + # TODO: (1) test deletions (2) test overall number of events # cleanup sub_create_conf.del_config() @@ -378,22 +428,26 @@ def test_ps_event_fetching(): key = bucket.new_key(str(i)) key.set_contents_from_string('bar') # wait for sync - zone_meta_checkpoint(ps_zones[0].zone) + zone_bucket_checkpoint(ps_zones[0].zone, zones[0].zone, bucket_name) max_events = 15 - total_events = 0 + total_events_count = 0 next_marker = None + all_events = [] while True: # get the events from the subscription result, _ = sub_conf.get_events(max_events, next_marker) parsed_result = json.loads(result) - total_events += len(parsed_result['events']) + events = parsed_result['events'] + total_events_count += len(events) + all_events.extend(events) next_marker = parsed_result['next_marker'] - for event in parsed_result['events']: - log.debug('Event: objname: "' + str(event['info']['key']['name']) + '" type: "' + str(event['event']) +'"') + for event in events: + log.debug('Event: objname: "' + str(event['info']['key']['name']) + '" type: "' + str(event['event']) + '"') if next_marker == '': break - # TODO numbers dont match, should be == - assert total_events >= number_of_objects + keys = list(bucket.list()) + # TODO: set exact_match to true + verify_events_by_elements(all_events, keys, exact_match=False) # cleanup sub_conf.del_config() @@ -433,17 +487,21 @@ def test_ps_event_acking(): key = bucket.new_key(str(i)) key.set_contents_from_string('bar') # wait for sync - zone_meta_checkpoint(ps_zones[0].zone) + zone_bucket_checkpoint(ps_zones[0].zone, zones[0].zone, bucket_name) # get the create events from the subscription result, _ = sub_conf.get_events() parsed_result = json.loads(result) - for event in parsed_result['events']: + events = parsed_result['events'] + original_number_of_events = len(events) + for event in events: log.debug('Event (before ack) id: "' + str(event['id']) + '"') - assert_equal(len(parsed_result['events']), number_of_objects) + keys = list(bucket.list()) + # TODO: set exact_match to true + verify_events_by_elements(events, keys, exact_match=False) # ack half of the events events_to_ack = number_of_objects/2 - for event in parsed_result['events']: + for event in events: if events_to_ack == 0: break _, status = sub_conf.ack_events(event['id']) @@ -455,7 +513,7 @@ def test_ps_event_acking(): parsed_result = json.loads(result) for event in parsed_result['events']: log.debug('Event (after ack) id: "' + str(event['id']) + '"') - assert_equal(len(parsed_result['events']), number_of_objects - number_of_objects/2) + assert_equal(len(parsed_result['events']), original_number_of_events - number_of_objects/2) # cleanup sub_conf.del_config() @@ -503,15 +561,16 @@ def test_ps_creation_triggers(): uploader.complete_upload() fp.close() # wait for sync - zone_meta_checkpoint(ps_zones[0].zone) + zone_bucket_checkpoint(ps_zones[0].zone, zones[0].zone, bucket_name) # get the create events from the subscription result, _ = sub_conf.get_events() parsed_result = json.loads(result) for event in parsed_result['events']: - log.debug('Event key: "' + str(event['info']['key']['name']) + '" type: "' + str(event['event']) +'"') + log.debug('Event key: "' + str(event['info']['key']['name']) + '" type: "' + str(event['event']) + '"') - assert_equal(len(parsed_result['events']), 3) + # TODO: verify the specific 3 keys: 'put', 'copy' and 'multipart' + assert len(parsed_result['events']) >= 3 # cleanup sub_conf.del_config() notification_conf.del_config() @@ -552,20 +611,22 @@ def test_ps_versioned_deletion(): key.set_contents_from_string('kaboom') v2 = key.version_id # wait for sync - zone_meta_checkpoint(ps_zones[0].zone) + zone_bucket_checkpoint(ps_zones[0].zone, zones[0].zone, bucket_name) # set delete markers bucket.delete_key(key.name, version_id=v2) bucket.delete_key(key.name, version_id=v1) # wait for sync - zone_meta_checkpoint(ps_zones[0].zone) + zone_bucket_checkpoint(ps_zones[0].zone, zones[0].zone, bucket_name) # get the create events from the subscription result, _ = sub_conf.get_events() parsed_result = json.loads(result) for event in parsed_result['events']: - log.debug('Event key: "' + str(event['info']['key']['name']) + '" type: "' + str(event['event']) +'"') + log.debug('Event key: "' + str(event['info']['key']['name']) + '" type: "' + str(event['event']) + '"') - assert_equal(len(parsed_result['events']), 2) + # TODO: verify the specific events + assert len(parsed_result['events']) >= 2 + # cleanup sub_conf.del_config() notification_conf.del_config() -- 2.47.3