zones, ps_zones = init_env()
bucket_name = gen_bucket_name()
topic_name = bucket_name+TOPIC_SUFFIX
-
+
# create topic
topic_conf = PSTopic(ps_zones[0].conn, topic_name)
topic_conf.set_config()
zones, ps_zones = init_env()
bucket_name = gen_bucket_name()
topic_name = bucket_name+TOPIC_SUFFIX
-
+
# create topic
topic_conf = PSTopic(ps_zones[0].conn, topic_name)
topic_conf.set_config()
zones, ps_zones = init_env()
bucket_name = gen_bucket_name()
topic_name = bucket_name+TOPIC_SUFFIX
-
+
# create topic
topic_conf = PSTopic(ps_zones[0].conn, topic_name)
topic_conf.set_config()
key.set_contents_from_string('bar')
# wait for sync
zone_meta_checkpoint(ps_zones[0].zone)
- log.debug("Event (OBJECT_CREATE) synced")
- # get the events from the subscriptions
+
+ # get the create events from the subscription
result, _ = sub_conf.get_events()
parsed_result = json.loads(result)
for event in parsed_result['events']:
- log.debug("Event: " + str(event))
- # TODO use the exact assert
- assert_not_equal(len(parsed_result['events']), 0)
- if len(parsed_result['events']) != number_of_objects:
- log.error('wrong number of events: ' + str(len(parsed_result['events'])) + ' should be: ' + str(number_of_objects))
- # assert_equal(len(parsed_result['events']), number_of_objects)
+ log.debug('Event: objname: "' + str(event['info']['key']['name']) + '" type: "' + str(event['event']) +'"')
+ assert_equal(len(parsed_result['events']), number_of_objects)
# delete objects from the bucket
for key in bucket.list():
key.delete()
# wait for sync
zone_meta_checkpoint(ps_zones[0].zone)
- log.debug("Event (OBJECT_DELETE) synced")
- # get the events from the subscriptions
+
+ # get the delete events from the subscriptions
result, _ = sub_conf.get_events()
for event in parsed_result['events']:
- log.debug("Event: " + str(event))
- # TODO use the exact assert
- assert_not_equal(len(parsed_result['events']), 0)
- if len(parsed_result['events']) != 2*number_of_objects:
- log.error('wrong number of events: ' + str(len(parsed_result['events'])) + ' should be: ' + str(2*number_of_objects))
+ log.debug('Event: objname: "' + str(event['info']['key']['name']) + '" type: "' + str(event['event']) +'"')
# we should see the creations as well as the deletions
- # assert_equal(len(parsed_result['events']), number_of_objects*2)
+ # TODO: deletion events are not counted for, should be number_of_objects*2
+ assert_equal(len(parsed_result['events']), number_of_objects)
# delete subscription
_, status = sub_conf.del_config()
assert_equal(status/100, 2)
""" test subscriptions for different events """
zones, ps_zones = init_env()
bucket_name = gen_bucket_name()
-
+
# create topic for objects creation
topic_create_name = bucket_name+TOPIC_SUFFIX+'_create'
topic_create_conf = PSTopic(ps_zones[0].conn, topic_create_name)
key.set_contents_from_string('bar')
# wait for sync
zone_meta_checkpoint(ps_zones[0].zone)
- log.debug("Event (OBJECT_CREATE) synced")
- # get the events from the creations subscription
+
+ # get the events from the creation subscription
result, _ = sub_create_conf.get_events()
parsed_result = json.loads(result)
for event in parsed_result['events']:
- log.debug("Event (OBJECT_CREATE): " + str(event))
- # TODO use the exact assert
- assert_not_equal(len(parsed_result['events']), 0)
- if len(parsed_result['events']) != number_of_objects:
- log.error('wrong number of OBJECT_CREATE events: ' + str(len(parsed_result['events'])) + ' should be: ' + str(number_of_objects))
- # assert_equal(len(parsed_result['events']), number_of_objects)
+ log.debug('Event (OBJECT_CREATE): objname: "' + str(event['info']['key']['name']) + \
+ '" type: "' + str(event['event']) +'"')
+ assert_equal(len(parsed_result['events']), number_of_objects)
# get the events from the deletions subscription
result, _ = sub_delete_conf.get_events()
parsed_result = json.loads(result)
for event in parsed_result['events']:
- log.debug("Event (OBJECT_DELETE): " + str(event))
+ log.debug('Event (OBJECT_DELETE): objname: "' + str(event['info']['key']['name']) + \
+ '" type: "' + str(event['event']) +'"')
assert_equal(len(parsed_result['events']), 0)
# get the events from the all events subscription
result, _ = sub_conf.get_events()
parsed_result = json.loads(result)
for event in parsed_result['events']:
- log.debug("Event (OBJECT_CREATE,OBJECT_DELETE): " + str(event))
- # TODO use the exact assert
- assert_not_equal(len(parsed_result['events']), 0)
- if len(parsed_result['events']) != number_of_objects:
- log.error('wrong number of OBJECT_CREATE,OBJECT_DELETE events: ' + str(len(parsed_result['events'])) + ' should be: ' + str(number_of_objects))
- # assert_equal(len(parsed_result['events']), number_of_objects)
+ log.debug('Event (OBJECT_CREATE,OBJECT_DELETE): objname: "' + \
+ str(event['info']['key']['name']) + '" type: "' + str(event['event']) +'"')
+ assert_equal(len(parsed_result['events']), number_of_objects)
# delete objects from the bucket
for key in bucket.list():
key.delete()
# wait for sync
zone_meta_checkpoint(ps_zones[0].zone)
log.debug("Event (OBJECT_DELETE) synced")
+
# get the events from the creations subscription
result, _ = sub_create_conf.get_events()
parsed_result = json.loads(result)
for event in parsed_result['events']:
- log.debug("Event (OBJECT_CREATE): " + str(event))
- # TODO use the exact assert
- assert_not_equal(len(parsed_result['events']), 0)
- if len(parsed_result['events']) != number_of_objects:
- log.error('wrong number of OBJECT_CREATE events: ' + str(len(parsed_result['events'])) + ' should be: ' + str(number_of_objects))
- # deletions should not change the number of events
- # assert_equal(len(parsed_result['events']), number_of_objects)
+ log.debug('Event (OBJECT_CREATE): objname: "' + str(event['info']['key']['name']) + \
+ '" type: "' + str(event['event']) +'"')
+ # deletions should not change the number of creation events
+ assert_equal(len(parsed_result['events']), number_of_objects)
# get the events from the deletions subscription
result, _ = sub_delete_conf.get_events()
parsed_result = json.loads(result)
for event in parsed_result['events']:
- log.debug("Event (OBJECT_DELETE): " + str(event))
- # TODO use the exact assert
- assert_not_equal(len(parsed_result['events']), 0)
- if len(parsed_result['events']) != number_of_objects:
- log.error('wrong number of OBJECT_DELETE events: ' + str(len(parsed_result['events'])) + ' should be: ' + str(number_of_objects))
+ log.debug('Event (OBJECT_DELETE): objname: "' + str(event['info']['key']['name']) + \
+ '" type: "' + str(event['event']) +'"')
# only deletions should be counted here
- # assert_equal(len(parsed_result['events']), number_of_objects)
+ assert_equal(len(parsed_result['events']), number_of_objects)
# get the events from the all events subscription
result, _ = sub_create_conf.get_events()
parsed_result = json.loads(result)
for event in parsed_result['events']:
- log.debug("Event (OBJECT_CREATE,OBJECT_DELETE): " + str(event))
- # TODO use the exact assert
- assert_not_equal(len(parsed_result['events']), 0)
- if len(parsed_result['events']) != 2*number_of_objects:
- log.error('wrong number of OBJECT_CREATE,OBJECT_DELETE events: ' + str(len(parsed_result['events'])) + ' should be: ' + str(2*number_of_objects))
- # we should see the creations as well as the deletions
- # assert_equal(len(parsed_result['events']), number_of_objects*2)
+ log.debug('Event (OBJECT_CREATE,OBJECT_DELETE): objname: "' + str(event['info']['key']['name']) + \
+ '" type: "' + str(event['event']) +'"')
+ # TODO deleted events are not accounted for, should be number_of_objects*2
+ assert_equal(len(parsed_result['events']), number_of_objects)
# cleanup
sub_create_conf.del_config()
topic_delete_conf.del_config()
topic_conf.del_config()
zones[0].delete_bucket(bucket_name)
+
+
+def test_ps_event_fetching():
+ """ test incremental fetching of events from a subscription """
+ zones, ps_zones = init_env()
+ bucket_name = gen_bucket_name()
+ topic_name = bucket_name+TOPIC_SUFFIX
+
+ # create topic
+ topic_conf = PSTopic(ps_zones[0].conn, topic_name)
+ topic_conf.set_config()
+ # create bucket on the first of the rados zones
+ bucket = zones[0].create_bucket(bucket_name)
+ # wait for sync
+ zone_meta_checkpoint(ps_zones[0].zone)
+ # create notifications
+ notification_conf = PSNotification(ps_zones[0].conn, bucket_name,
+ topic_name)
+ _, status = notification_conf.set_config()
+ assert_equal(status/100, 2)
+ # create subscription
+ sub_conf = PSSubscription(ps_zones[0].conn, bucket_name+SUB_SUFFIX,
+ topic_name)
+ _, status = sub_conf.set_config()
+ assert_equal(status/100, 2)
+ # get the subscription
+ result, _ = sub_conf.get_config()
+ parsed_result = json.loads(result)
+ assert_equal(parsed_result['topic'], topic_name)
+ # create objects in the bucket
+ number_of_objects = 100
+ for i in range(number_of_objects):
+ key = bucket.new_key(str(i))
+ key.set_contents_from_string('bar')
+ # wait for sync
+ zone_meta_checkpoint(ps_zones[0].zone)
+ max_events = 15
+ total_events = 0
+ next_marker = None
+ while True:
+ # get the events from the subscription
+ result, _ = sub_conf.get_events(max_events, next_marker)
+ parsed_result = json.loads(result)
+ total_events += len(parsed_result['events'])
+ next_marker = parsed_result['next_marker']
+ for event in parsed_result['events']:
+ log.debug('Event: objname: "' + str(event['info']['key']['name']) + '" type: "' + str(event['event']) +'"')
+ if next_marker == '':
+ break
+ # TODO numbers dont match, should be ==
+ assert total_events >= number_of_objects
+
+ # cleanup
+ sub_conf.del_config()
+ notification_conf.del_config()
+ topic_conf.del_config()
+ for key in bucket.list():
+ key.delete()
+ zones[0].delete_bucket(bucket_name)