git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw: pubsub: add incremental event fetching test
author    Yuval Lifshitz <yuvalif@yahoo.com>
Wed, 6 Feb 2019 20:34:46 +0000 (22:34 +0200)
committer Yuval Lifshitz <yuvalif@yahoo.com>
Wed, 6 Feb 2019 20:34:46 +0000 (22:34 +0200)
Signed-off-by: Yuval Lifshitz <yuvalif@yahoo.com>
src/test/rgw/rgw_multi/tests_ps.py
src/test/rgw/rgw_multi/zone_ps.py
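
The new test added to tests_ps.py drains a subscription's events in pages via a marker. As a rough sketch of that pagination pattern (assuming a PSSubscription-like object whose get_events(max_entries, marker) returns a JSON body containing 'events' and 'next_marker', as zone_ps.py does), the loop looks roughly like:

    import json

    def fetch_all_events(sub_conf, page_size=15):
        """Drain a pubsub subscription page by page and return all events."""
        # sub_conf is assumed to behave like zone_ps.PSSubscription
        events = []
        marker = None
        while True:
            result, _ = sub_conf.get_events(page_size, marker)
            parsed = json.loads(result)
            events.extend(parsed['events'])
            marker = parsed['next_marker']
            # an empty marker means there are no more events to fetch
            if marker == '':
                break
        return events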

index 768996c3e6cbbb3f6babfe229958bbe216dd3312..7a6727f3ddaa83508375774d65d68231cc1f6eb1 100644 (file)
@@ -87,7 +87,7 @@ def test_ps_notification():
     zones, ps_zones = init_env()
     bucket_name = gen_bucket_name()
     topic_name = bucket_name+TOPIC_SUFFIX
-    
+
     # create topic
     topic_conf = PSTopic(ps_zones[0].conn, topic_name)
     topic_conf.set_config()
@@ -124,7 +124,7 @@ def test_ps_notification_events():
     zones, ps_zones = init_env()
     bucket_name = gen_bucket_name()
     topic_name = bucket_name+TOPIC_SUFFIX
-    
+
     # create topic
     topic_conf = PSTopic(ps_zones[0].conn, topic_name)
     topic_conf.set_config()
@@ -159,7 +159,7 @@ def test_ps_subscription():
     zones, ps_zones = init_env()
     bucket_name = gen_bucket_name()
     topic_name = bucket_name+TOPIC_SUFFIX
-    
+
     # create topic
     topic_conf = PSTopic(ps_zones[0].conn, topic_name)
     topic_conf.set_config()
@@ -188,33 +188,26 @@ def test_ps_subscription():
         key.set_contents_from_string('bar')
     # wait for sync
     zone_meta_checkpoint(ps_zones[0].zone)
-    log.debug("Event (OBJECT_CREATE) synced")
-    # get the events from the subscriptions
+
+    # get the create events from the subscription
     result, _ = sub_conf.get_events()
     parsed_result = json.loads(result)
     for event in parsed_result['events']:
-        log.debug("Event: " + str(event))
-    # TODO use the exact assert
-    assert_not_equal(len(parsed_result['events']), 0)
-    if len(parsed_result['events']) != number_of_objects:
-        log.error('wrong number of events: ' + str(len(parsed_result['events'])) + ' should be: ' + str(number_of_objects))
-    # assert_equal(len(parsed_result['events']), number_of_objects)
+        log.debug('Event: objname: "' + str(event['info']['key']['name']) + '" type: "' + str(event['event']) +'"')
+    assert_equal(len(parsed_result['events']), number_of_objects)
     # delete objects from the bucket
     for key in bucket.list():
         key.delete()
     # wait for sync
     zone_meta_checkpoint(ps_zones[0].zone)
-    log.debug("Event (OBJECT_DELETE) synced")
-    # get the events from the subscriptions
+
+    # get the delete events from the subscription
     result, _ = sub_conf.get_events()
     for event in parsed_result['events']:
-        log.debug("Event: " + str(event))
-    # TODO use the exact assert
-    assert_not_equal(len(parsed_result['events']), 0)
-    if len(parsed_result['events']) != 2*number_of_objects:
-        log.error('wrong number of events: ' + str(len(parsed_result['events'])) + ' should be: ' + str(2*number_of_objects))
+        log.debug('Event: objname: "' + str(event['info']['key']['name']) + '" type: "' + str(event['event']) +'"')
     # we should see the creations as well as the deletions
-    # assert_equal(len(parsed_result['events']), number_of_objects*2)
+    # TODO: deletion events are not accounted for, should be number_of_objects*2
+    assert_equal(len(parsed_result['events']), number_of_objects)
     # delete subscription
     _, status = sub_conf.del_config()
     assert_equal(status/100, 2)
@@ -234,7 +227,7 @@ def test_ps_event_type_subscription():
     """ test subscriptions for different events """
     zones, ps_zones = init_env()
     bucket_name = gen_bucket_name()
-    
+
     # create topic for objects creation
     topic_create_name = bucket_name+TOPIC_SUFFIX+'_create'
     topic_create_conf = PSTopic(ps_zones[0].conn, topic_create_name)
@@ -288,72 +281,59 @@ def test_ps_event_type_subscription():
         key.set_contents_from_string('bar')
     # wait for sync
     zone_meta_checkpoint(ps_zones[0].zone)
-    log.debug("Event (OBJECT_CREATE) synced")
-    # get the events from the creations subscription
+
+    # get the events from the creation subscription
     result, _ = sub_create_conf.get_events()
     parsed_result = json.loads(result)
     for event in parsed_result['events']:
-        log.debug("Event (OBJECT_CREATE): " + str(event))
-    # TODO use the exact assert
-    assert_not_equal(len(parsed_result['events']), 0)
-    if len(parsed_result['events']) != number_of_objects:
-        log.error('wrong number of OBJECT_CREATE events: ' + str(len(parsed_result['events'])) + ' should be: ' + str(number_of_objects))
-    # assert_equal(len(parsed_result['events']), number_of_objects)
+        log.debug('Event (OBJECT_CREATE): objname: "' + str(event['info']['key']['name']) + \
+                  '" type: "' + str(event['event']) +'"')
+    assert_equal(len(parsed_result['events']), number_of_objects)
     # get the events from the deletions subscription
     result, _ = sub_delete_conf.get_events()
     parsed_result = json.loads(result)
     for event in parsed_result['events']:
-        log.debug("Event (OBJECT_DELETE): " + str(event))
+        log.debug('Event (OBJECT_DELETE): objname: "' + str(event['info']['key']['name']) + \
+                  '" type: "' + str(event['event']) +'"')
     assert_equal(len(parsed_result['events']), 0)
     # get the events from the all events subscription
     result, _ = sub_conf.get_events()
     parsed_result = json.loads(result)
     for event in parsed_result['events']:
-        log.debug("Event (OBJECT_CREATE,OBJECT_DELETE): " + str(event))
-    # TODO use the exact assert
-    assert_not_equal(len(parsed_result['events']), 0)
-    if len(parsed_result['events']) != number_of_objects:
-        log.error('wrong number of OBJECT_CREATE,OBJECT_DELETE events: ' + str(len(parsed_result['events'])) + ' should be: ' + str(number_of_objects))
-    # assert_equal(len(parsed_result['events']), number_of_objects)
+        log.debug('Event (OBJECT_CREATE,OBJECT_DELETE): objname: "' + \
+                  str(event['info']['key']['name']) + '" type: "' + str(event['event']) +'"')
+    assert_equal(len(parsed_result['events']), number_of_objects)
     # delete objects from the bucket
     for key in bucket.list():
         key.delete()
     # wait for sync
     zone_meta_checkpoint(ps_zones[0].zone)
     log.debug("Event (OBJECT_DELETE) synced")
+
     # get the events from the creations subscription
     result, _ = sub_create_conf.get_events()
     parsed_result = json.loads(result)
     for event in parsed_result['events']:
-        log.debug("Event (OBJECT_CREATE): " + str(event))
-    # TODO use the exact assert
-    assert_not_equal(len(parsed_result['events']), 0)
-    if len(parsed_result['events']) != number_of_objects:
-        log.error('wrong number of OBJECT_CREATE events: ' + str(len(parsed_result['events'])) + ' should be: ' + str(number_of_objects))
-    # deletions should not change the number of events
-    # assert_equal(len(parsed_result['events']), number_of_objects)
+        log.debug('Event (OBJECT_CREATE): objname: "' + str(event['info']['key']['name']) + \
+                  '" type: "' + str(event['event']) +'"')
+    # deletions should not change the number of creation events
+    assert_equal(len(parsed_result['events']), number_of_objects)
     # get the events from the deletions subscription
     result, _ = sub_delete_conf.get_events()
     parsed_result = json.loads(result)
     for event in parsed_result['events']:
-        log.debug("Event (OBJECT_DELETE): " + str(event))
-    # TODO use the exact assert
-    assert_not_equal(len(parsed_result['events']), 0)
-    if len(parsed_result['events']) != number_of_objects:
-        log.error('wrong number of OBJECT_DELETE events: ' + str(len(parsed_result['events'])) + ' should be: ' + str(number_of_objects))
+        log.debug('Event (OBJECT_DELETE): objname: "' + str(event['info']['key']['name']) + \
+                  '" type: "' + str(event['event']) +'"')
     # only deletions should be counted here
-    assert_equal(len(parsed_result['events']), number_of_objects)
+    assert_equal(len(parsed_result['events']), number_of_objects)
     # get the events from the all events subscription
     result, _ = sub_create_conf.get_events()
     parsed_result = json.loads(result)
     for event in parsed_result['events']:
-        log.debug("Event (OBJECT_CREATE,OBJECT_DELETE): " + str(event))
-    # TODO use the exact assert
-    assert_not_equal(len(parsed_result['events']), 0)
-    if len(parsed_result['events']) != 2*number_of_objects:
-        log.error('wrong number of OBJECT_CREATE,OBJECT_DELETE events: ' + str(len(parsed_result['events'])) + ' should be: ' + str(2*number_of_objects))
-    # we should see the creations as well as the deletions
-    # assert_equal(len(parsed_result['events']), number_of_objects*2)
+        log.debug('Event (OBJECT_CREATE,OBJECT_DELETE): objname: "' + str(event['info']['key']['name']) + \
+                  '" type: "' + str(event['event']) +'"')
+    # TODO: deletion events are not accounted for, should be number_of_objects*2
+    assert_equal(len(parsed_result['events']), number_of_objects)
 
     # cleanup
     sub_create_conf.del_config()
@@ -366,3 +346,62 @@ def test_ps_event_type_subscription():
     topic_delete_conf.del_config()
     topic_conf.del_config()
     zones[0].delete_bucket(bucket_name)
+
+
+def test_ps_event_fetching():
+    """ test incremental fetching of events from a subscription """
+    zones, ps_zones = init_env()
+    bucket_name = gen_bucket_name()
+    topic_name = bucket_name+TOPIC_SUFFIX
+
+    # create topic
+    topic_conf = PSTopic(ps_zones[0].conn, topic_name)
+    topic_conf.set_config()
+    # create bucket on the first of the rados zones
+    bucket = zones[0].create_bucket(bucket_name)
+    # wait for sync
+    zone_meta_checkpoint(ps_zones[0].zone)
+    # create notifications
+    notification_conf = PSNotification(ps_zones[0].conn, bucket_name,
+                                       topic_name)
+    _, status = notification_conf.set_config()
+    assert_equal(status/100, 2)
+    # create subscription
+    sub_conf = PSSubscription(ps_zones[0].conn, bucket_name+SUB_SUFFIX,
+                              topic_name)
+    _, status = sub_conf.set_config()
+    assert_equal(status/100, 2)
+    # get the subscription
+    result, _ = sub_conf.get_config()
+    parsed_result = json.loads(result)
+    assert_equal(parsed_result['topic'], topic_name)
+    # create objects in the bucket
+    number_of_objects = 100
+    for i in range(number_of_objects):
+        key = bucket.new_key(str(i))
+        key.set_contents_from_string('bar')
+    # wait for sync
+    zone_meta_checkpoint(ps_zones[0].zone)
+    max_events = 15
+    total_events = 0
+    next_marker = None
+    while True:
+        # get the events from the subscription
+        result, _ = sub_conf.get_events(max_events, next_marker)
+        parsed_result = json.loads(result)
+        total_events += len(parsed_result['events'])
+        next_marker = parsed_result['next_marker']
+        for event in parsed_result['events']:
+            log.debug('Event: objname: "' + str(event['info']['key']['name']) + '" type: "' + str(event['event']) +'"')
+        if next_marker == '':
+            break
+    # TODO: numbers don't match, should be ==
+    assert total_events >= number_of_objects
+
+    # cleanup
+    sub_conf.del_config()
+    notification_conf.del_config()
+    topic_conf.del_config()
+    for key in bucket.list():
+        key.delete()
+    zones[0].delete_bucket(bucket_name)
index c0e3934564aa5226e3e90fe2803efe3193544e35..e720d10ee3eae2952e044daac3000dfa41bdd001 100644 (file)
@@ -1,3 +1,4 @@
+import logging
 import httplib
 import urllib
 import hmac
@@ -6,6 +7,8 @@ import base64
 from time import gmtime, strftime
 from multisite import Zone
 
+log = logging.getLogger('rgw_multi.tests')
+
 
 class PSZone(Zone):  # pylint: disable=too-many-ancestors
     """ PubSub zone class """
@@ -47,7 +50,8 @@ def make_request(conn, method, resource, parameters=None):
                'Date': string_date,
                'Host': conn.host+':'+str(conn.port)}
     http_conn = httplib.HTTPConnection(conn.host, conn.port)
-    http_conn.set_debuglevel(5)
+    # TODO set http log level from regular log level
+    # http_conn.set_debuglevel(log.getEffectiveLevel())
     http_conn.request(method, resource+url_params, NO_HTTP_BODY, headers)
     response = http_conn.getresponse()
     data = response.read()
@@ -154,7 +158,7 @@ class PSSubscription:
         if max_entries is not None:
             parameters['max-entries'] = max_entries
         if marker is not None:
-            parameters['market'] = marker
+            parameters['marker'] = marker
         return self.send_request('GET', parameters)
 
     def ack_events(self, event_id):