From 1909fca65de8c2d1fce881186faddef9a5dcb13c Mon Sep 17 00:00:00 2001 From: Yuval Lifshitz Date: Wed, 6 Feb 2019 22:34:46 +0200 Subject: [PATCH] rgw: pubsub add incremental event fetching test Signed-off-by: Yuval Lifshitz --- src/test/rgw/rgw_multi/tests_ps.py | 149 ++++++++++++++++++----------- src/test/rgw/rgw_multi/zone_ps.py | 8 +- 2 files changed, 100 insertions(+), 57 deletions(-) diff --git a/src/test/rgw/rgw_multi/tests_ps.py b/src/test/rgw/rgw_multi/tests_ps.py index 768996c3e6cbb..7a6727f3ddaa8 100644 --- a/src/test/rgw/rgw_multi/tests_ps.py +++ b/src/test/rgw/rgw_multi/tests_ps.py @@ -87,7 +87,7 @@ def test_ps_notification(): zones, ps_zones = init_env() bucket_name = gen_bucket_name() topic_name = bucket_name+TOPIC_SUFFIX - + # create topic topic_conf = PSTopic(ps_zones[0].conn, topic_name) topic_conf.set_config() @@ -124,7 +124,7 @@ def test_ps_notification_events(): zones, ps_zones = init_env() bucket_name = gen_bucket_name() topic_name = bucket_name+TOPIC_SUFFIX - + # create topic topic_conf = PSTopic(ps_zones[0].conn, topic_name) topic_conf.set_config() @@ -159,7 +159,7 @@ def test_ps_subscription(): zones, ps_zones = init_env() bucket_name = gen_bucket_name() topic_name = bucket_name+TOPIC_SUFFIX - + # create topic topic_conf = PSTopic(ps_zones[0].conn, topic_name) topic_conf.set_config() @@ -188,33 +188,26 @@ def test_ps_subscription(): key.set_contents_from_string('bar') # wait for sync zone_meta_checkpoint(ps_zones[0].zone) - log.debug("Event (OBJECT_CREATE) synced") - # get the events from the subscriptions + + # get the create events from the subscription result, _ = sub_conf.get_events() parsed_result = json.loads(result) for event in parsed_result['events']: - log.debug("Event: " + str(event)) - # TODO use the exact assert - assert_not_equal(len(parsed_result['events']), 0) - if len(parsed_result['events']) != number_of_objects: - log.error('wrong number of events: ' + str(len(parsed_result['events'])) + ' should be: ' + str(number_of_objects)) - # assert_equal(len(parsed_result['events']), number_of_objects) + log.debug('Event: objname: "' + str(event['info']['key']['name']) + '" type: "' + str(event['event']) +'"') + assert_equal(len(parsed_result['events']), number_of_objects) # delete objects from the bucket for key in bucket.list(): key.delete() # wait for sync zone_meta_checkpoint(ps_zones[0].zone) - log.debug("Event (OBJECT_DELETE) synced") - # get the events from the subscriptions + + # get the delete events from the subscriptions result, _ = sub_conf.get_events() for event in parsed_result['events']: - log.debug("Event: " + str(event)) - # TODO use the exact assert - assert_not_equal(len(parsed_result['events']), 0) - if len(parsed_result['events']) != 2*number_of_objects: - log.error('wrong number of events: ' + str(len(parsed_result['events'])) + ' should be: ' + str(2*number_of_objects)) + log.debug('Event: objname: "' + str(event['info']['key']['name']) + '" type: "' + str(event['event']) +'"') # we should see the creations as well as the deletions - # assert_equal(len(parsed_result['events']), number_of_objects*2) + # TODO: deletion events are not counted for, should be number_of_objects*2 + assert_equal(len(parsed_result['events']), number_of_objects) # delete subscription _, status = sub_conf.del_config() assert_equal(status/100, 2) @@ -234,7 +227,7 @@ def test_ps_event_type_subscription(): """ test subscriptions for different events """ zones, ps_zones = init_env() bucket_name = gen_bucket_name() - + # create topic for objects creation topic_create_name = bucket_name+TOPIC_SUFFIX+'_create' topic_create_conf = PSTopic(ps_zones[0].conn, topic_create_name) @@ -288,72 +281,59 @@ def test_ps_event_type_subscription(): key.set_contents_from_string('bar') # wait for sync zone_meta_checkpoint(ps_zones[0].zone) - log.debug("Event (OBJECT_CREATE) synced") - # get the events from the creations subscription + + # get the events from the creation subscription result, _ = sub_create_conf.get_events() parsed_result = json.loads(result) for event in parsed_result['events']: - log.debug("Event (OBJECT_CREATE): " + str(event)) - # TODO use the exact assert - assert_not_equal(len(parsed_result['events']), 0) - if len(parsed_result['events']) != number_of_objects: - log.error('wrong number of OBJECT_CREATE events: ' + str(len(parsed_result['events'])) + ' should be: ' + str(number_of_objects)) - # assert_equal(len(parsed_result['events']), number_of_objects) + log.debug('Event (OBJECT_CREATE): objname: "' + str(event['info']['key']['name']) + \ + '" type: "' + str(event['event']) +'"') + assert_equal(len(parsed_result['events']), number_of_objects) # get the events from the deletions subscription result, _ = sub_delete_conf.get_events() parsed_result = json.loads(result) for event in parsed_result['events']: - log.debug("Event (OBJECT_DELETE): " + str(event)) + log.debug('Event (OBJECT_DELETE): objname: "' + str(event['info']['key']['name']) + \ + '" type: "' + str(event['event']) +'"') assert_equal(len(parsed_result['events']), 0) # get the events from the all events subscription result, _ = sub_conf.get_events() parsed_result = json.loads(result) for event in parsed_result['events']: - log.debug("Event (OBJECT_CREATE,OBJECT_DELETE): " + str(event)) - # TODO use the exact assert - assert_not_equal(len(parsed_result['events']), 0) - if len(parsed_result['events']) != number_of_objects: - log.error('wrong number of OBJECT_CREATE,OBJECT_DELETE events: ' + str(len(parsed_result['events'])) + ' should be: ' + str(number_of_objects)) - # assert_equal(len(parsed_result['events']), number_of_objects) + log.debug('Event (OBJECT_CREATE,OBJECT_DELETE): objname: "' + \ + str(event['info']['key']['name']) + '" type: "' + str(event['event']) +'"') + assert_equal(len(parsed_result['events']), number_of_objects) # delete objects from the bucket for key in bucket.list(): key.delete() # wait for sync zone_meta_checkpoint(ps_zones[0].zone) log.debug("Event (OBJECT_DELETE) synced") + # get the events from the creations subscription result, _ = sub_create_conf.get_events() parsed_result = json.loads(result) for event in parsed_result['events']: - log.debug("Event (OBJECT_CREATE): " + str(event)) - # TODO use the exact assert - assert_not_equal(len(parsed_result['events']), 0) - if len(parsed_result['events']) != number_of_objects: - log.error('wrong number of OBJECT_CREATE events: ' + str(len(parsed_result['events'])) + ' should be: ' + str(number_of_objects)) - # deletions should not change the number of events - # assert_equal(len(parsed_result['events']), number_of_objects) + log.debug('Event (OBJECT_CREATE): objname: "' + str(event['info']['key']['name']) + \ + '" type: "' + str(event['event']) +'"') + # deletions should not change the number of creation events + assert_equal(len(parsed_result['events']), number_of_objects) # get the events from the deletions subscription result, _ = sub_delete_conf.get_events() parsed_result = json.loads(result) for event in parsed_result['events']: - log.debug("Event (OBJECT_DELETE): " + str(event)) - # TODO use the exact assert - assert_not_equal(len(parsed_result['events']), 0) - if len(parsed_result['events']) != number_of_objects: - log.error('wrong number of OBJECT_DELETE events: ' + str(len(parsed_result['events'])) + ' should be: ' + str(number_of_objects)) + log.debug('Event (OBJECT_DELETE): objname: "' + str(event['info']['key']['name']) + \ + '" type: "' + str(event['event']) +'"') # only deletions should be counted here - # assert_equal(len(parsed_result['events']), number_of_objects) + assert_equal(len(parsed_result['events']), number_of_objects) # get the events from the all events subscription result, _ = sub_create_conf.get_events() parsed_result = json.loads(result) for event in parsed_result['events']: - log.debug("Event (OBJECT_CREATE,OBJECT_DELETE): " + str(event)) - # TODO use the exact assert - assert_not_equal(len(parsed_result['events']), 0) - if len(parsed_result['events']) != 2*number_of_objects: - log.error('wrong number of OBJECT_CREATE,OBJECT_DELETE events: ' + str(len(parsed_result['events'])) + ' should be: ' + str(2*number_of_objects)) - # we should see the creations as well as the deletions - # assert_equal(len(parsed_result['events']), number_of_objects*2) + log.debug('Event (OBJECT_CREATE,OBJECT_DELETE): objname: "' + str(event['info']['key']['name']) + \ + '" type: "' + str(event['event']) +'"') + # TODO deleted events are not accounted for, should be number_of_objects*2 + assert_equal(len(parsed_result['events']), number_of_objects) # cleanup sub_create_conf.del_config() @@ -366,3 +346,62 @@ def test_ps_event_type_subscription(): topic_delete_conf.del_config() topic_conf.del_config() zones[0].delete_bucket(bucket_name) + + +def test_ps_event_fetching(): + """ test incremental fetching of events from a subscription """ + zones, ps_zones = init_env() + bucket_name = gen_bucket_name() + topic_name = bucket_name+TOPIC_SUFFIX + + # create topic + topic_conf = PSTopic(ps_zones[0].conn, topic_name) + topic_conf.set_config() + # create bucket on the first of the rados zones + bucket = zones[0].create_bucket(bucket_name) + # wait for sync + zone_meta_checkpoint(ps_zones[0].zone) + # create notifications + notification_conf = PSNotification(ps_zones[0].conn, bucket_name, + topic_name) + _, status = notification_conf.set_config() + assert_equal(status/100, 2) + # create subscription + sub_conf = PSSubscription(ps_zones[0].conn, bucket_name+SUB_SUFFIX, + topic_name) + _, status = sub_conf.set_config() + assert_equal(status/100, 2) + # get the subscription + result, _ = sub_conf.get_config() + parsed_result = json.loads(result) + assert_equal(parsed_result['topic'], topic_name) + # create objects in the bucket + number_of_objects = 100 + for i in range(number_of_objects): + key = bucket.new_key(str(i)) + key.set_contents_from_string('bar') + # wait for sync + zone_meta_checkpoint(ps_zones[0].zone) + max_events = 15 + total_events = 0 + next_marker = None + while True: + # get the events from the subscription + result, _ = sub_conf.get_events(max_events, next_marker) + parsed_result = json.loads(result) + total_events += len(parsed_result['events']) + next_marker = parsed_result['next_marker'] + for event in parsed_result['events']: + log.debug('Event: objname: "' + str(event['info']['key']['name']) + '" type: "' + str(event['event']) +'"') + if next_marker == '': + break + # TODO numbers dont match, should be == + assert total_events >= number_of_objects + + # cleanup + sub_conf.del_config() + notification_conf.del_config() + topic_conf.del_config() + for key in bucket.list(): + key.delete() + zones[0].delete_bucket(bucket_name) diff --git a/src/test/rgw/rgw_multi/zone_ps.py b/src/test/rgw/rgw_multi/zone_ps.py index c0e3934564aa5..e720d10ee3eae 100644 --- a/src/test/rgw/rgw_multi/zone_ps.py +++ b/src/test/rgw/rgw_multi/zone_ps.py @@ -1,3 +1,4 @@ +import logging import httplib import urllib import hmac @@ -6,6 +7,8 @@ import base64 from time import gmtime, strftime from multisite import Zone +log = logging.getLogger('rgw_multi.tests') + class PSZone(Zone): # pylint: disable=too-many-ancestors """ PubSub zone class """ @@ -47,7 +50,8 @@ def make_request(conn, method, resource, parameters=None): 'Date': string_date, 'Host': conn.host+':'+str(conn.port)} http_conn = httplib.HTTPConnection(conn.host, conn.port) - http_conn.set_debuglevel(5) + # TODO set http log level from regular log level + # http_conn.set_debuglevel(log.getEffectiveLevel()) http_conn.request(method, resource+url_params, NO_HTTP_BODY, headers) response = http_conn.getresponse() data = response.read() @@ -154,7 +158,7 @@ class PSSubscription: if max_entries is not None: parameters['max-entries'] = max_entries if marker is not None: - parameters['market'] = marker + parameters['marker'] = marker return self.send_request('GET', parameters) def ack_events(self, event_id): -- 2.39.5