ZonegroupConns, \
zonegroup_meta_checkpoint, \
zone_meta_checkpoint, \
+ zone_bucket_checkpoint, \
+ zone_data_checkpoint, \
+ check_bucket_eq, \
gen_bucket_name
from rgw_multi.zone_ps import PSTopic, PSNotification, PSSubscription
from nose import SkipTest
# configure logging for the tests module
log = logging.getLogger('rgw_multi.tests')
+####################################
+# utility functions for pubsub tests
+####################################
+
+
def check_ps_configured():
"""check if at least one pubsub zone exist"""
realm = get_realm()
return zone_conn.zone.tier_type() == "pubsub"
+def verify_events_by_elements(events, keys, exact_match=False, deletions=False):
+ """ verify there is at least one event per element """
+ err = ''
+ for key in keys:
+ key_found = False
+ for event in events:
+ if event['info']['bucket']['name'] == key.bucket.name and \
+ event['info']['key']['name'] == key.name:
+ if deletions and event['event'] == 'OBJECT_DELETE':
+ key_found = True
+ break
+ elif not deletions and event['event'] == 'OBJECT_CREATE':
+ key_found = True
+ break
+ if not key_found:
+ err = 'no ' + ('deletion' if deletions else 'creation') + ' event found for key: ' + str(key)
+ log.error(events)
+ assert False, err
+
+ if not len(events) == len(keys):
+ err = 'superfluous events are found'
+ log.debug(err)
+ if exact_match:
+ log.error(events)
+ assert False, err
+
+
def init_env():
"""initialize the environment"""
check_ps_configured()
TOPIC_SUFFIX = "_topic"
SUB_SUFFIX = "_sub"
+##############
+# pubsub tests
+##############
+
def test_ps_topic():
""" test set/get/delete of topic """
key = bucket.new_key(str(i))
key.set_contents_from_string('bar')
# wait for sync
- zone_meta_checkpoint(ps_zones[0].zone)
-
+ zone_bucket_checkpoint(ps_zones[0].zone, zones[0].zone, bucket_name)
+
# get the create events from the subscription
result, _ = sub_conf.get_events()
parsed_result = json.loads(result)
for event in parsed_result['events']:
- log.debug('Event: objname: "' + str(event['info']['key']['name']) + '" type: "' + str(event['event']) +'"')
- assert_equal(len(parsed_result['events']), number_of_objects)
+ log.debug('Event: objname: "' + str(event['info']['key']['name']) + '" type: "' + str(event['event']) + '"')
+ keys = list(bucket.list())
+ # TODO: set exact_match to true
+ verify_events_by_elements(parsed_result['events'], keys, exact_match=False)
# delete objects from the bucket
for key in bucket.list():
key.delete()
# wait for sync
zone_meta_checkpoint(ps_zones[0].zone)
-
+ zone_bucket_checkpoint(ps_zones[0].zone, zones[0].zone, bucket_name)
+
# get the delete events from the subscriptions
result, _ = sub_conf.get_events()
for event in parsed_result['events']:
- log.debug('Event: objname: "' + str(event['info']['key']['name']) + '" type: "' + str(event['event']) +'"')
+ log.debug('Event: objname: "' + str(event['info']['key']['name']) + '" type: "' + str(event['event']) + '"')
+ # TODO: check deletions
+ # verify_events_by_elements(parsed_result['events'], keys, exact_match=False, deletions=True)
# we should see the creations as well as the deletions
- # TODO: deletion events are not counted for, should be number_of_objects*2
- assert_equal(len(parsed_result['events']), number_of_objects)
# delete subscription
_, status = sub_conf.del_config()
assert_equal(status/100, 2)
bucket = zones[0].create_bucket(bucket_name)
# wait for sync
zone_meta_checkpoint(ps_zones[0].zone)
+ zone_bucket_checkpoint(ps_zones[0].zone, zones[0].zone, bucket_name)
# create notifications for objects creation
notification_create_conf = PSNotification(ps_zones[0].conn, bucket_name,
topic_create_name, "OBJECT_CREATE")
key = bucket.new_key(str(i))
key.set_contents_from_string('bar')
# wait for sync
- zone_meta_checkpoint(ps_zones[0].zone)
+ zone_bucket_checkpoint(ps_zones[0].zone, zones[0].zone, bucket_name)
# get the events from the creation subscription
result, _ = sub_create_conf.get_events()
parsed_result = json.loads(result)
for event in parsed_result['events']:
log.debug('Event (OBJECT_CREATE): objname: "' + str(event['info']['key']['name']) + \
- '" type: "' + str(event['event']) +'"')
- assert_equal(len(parsed_result['events']), number_of_objects)
+ '" type: "' + str(event['event']) + '"')
+ keys = list(bucket.list())
+ # TODO: set exact_match to true
+ verify_events_by_elements(parsed_result['events'], keys, exact_match=False)
# get the events from the deletions subscription
result, _ = sub_delete_conf.get_events()
parsed_result = json.loads(result)
for event in parsed_result['events']:
log.debug('Event (OBJECT_DELETE): objname: "' + str(event['info']['key']['name']) + \
- '" type: "' + str(event['event']) +'"')
+ '" type: "' + str(event['event']) + '"')
assert_equal(len(parsed_result['events']), 0)
# get the events from the all events subscription
result, _ = sub_conf.get_events()
parsed_result = json.loads(result)
for event in parsed_result['events']:
log.debug('Event (OBJECT_CREATE,OBJECT_DELETE): objname: "' + \
- str(event['info']['key']['name']) + '" type: "' + str(event['event']) +'"')
- assert_equal(len(parsed_result['events']), number_of_objects)
+ str(event['info']['key']['name']) + '" type: "' + str(event['event']) + '"')
+ # TODO: set exact_match to true
+ verify_events_by_elements(parsed_result['events'], keys, exact_match=False)
# delete objects from the bucket
for key in bucket.list():
key.delete()
# wait for sync
- zone_meta_checkpoint(ps_zones[0].zone)
+ zone_bucket_checkpoint(ps_zones[0].zone, zones[0].zone, bucket_name)
log.debug("Event (OBJECT_DELETE) synced")
# get the events from the creations subscription
parsed_result = json.loads(result)
for event in parsed_result['events']:
log.debug('Event (OBJECT_CREATE): objname: "' + str(event['info']['key']['name']) + \
- '" type: "' + str(event['event']) +'"')
- # deletions should not change the number of creation events
- assert_equal(len(parsed_result['events']), number_of_objects)
+ '" type: "' + str(event['event']) + '"')
+ # deletions should not change the creation events
+ # TODO: set exact_match to true
+ verify_events_by_elements(parsed_result['events'], keys, exact_match=False)
# get the events from the deletions subscription
result, _ = sub_delete_conf.get_events()
parsed_result = json.loads(result)
for event in parsed_result['events']:
log.debug('Event (OBJECT_DELETE): objname: "' + str(event['info']['key']['name']) + \
- '" type: "' + str(event['event']) +'"')
- # only deletions should be counted here
- assert_equal(len(parsed_result['events']), number_of_objects)
+ '" type: "' + str(event['event']) + '"')
+ # only deletions should be listed here
+ # TODO: set exact_match to true
+ verify_events_by_elements(parsed_result['events'], keys, exact_match=False, deletions=True)
# get the events from the all events subscription
result, _ = sub_create_conf.get_events()
parsed_result = json.loads(result)
for event in parsed_result['events']:
log.debug('Event (OBJECT_CREATE,OBJECT_DELETE): objname: "' + str(event['info']['key']['name']) + \
- '" type: "' + str(event['event']) +'"')
- # TODO deleted events are not accounted for, should be number_of_objects*2
- assert_equal(len(parsed_result['events']), number_of_objects)
+ '" type: "' + str(event['event']) + '"')
+ # both deletions and creations should be here
+ verify_events_by_elements(parsed_result['events'], keys, exact_match=False, deletions=False)
+ # verify_events_by_elements(parsed_result['events'], keys, exact_match=False, deletions=True)
+ # TODO: (1) test deletions (2) test overall number of events
# cleanup
sub_create_conf.del_config()
key = bucket.new_key(str(i))
key.set_contents_from_string('bar')
# wait for sync
- zone_meta_checkpoint(ps_zones[0].zone)
+ zone_bucket_checkpoint(ps_zones[0].zone, zones[0].zone, bucket_name)
max_events = 15
- total_events = 0
+ total_events_count = 0
next_marker = None
+ all_events = []
while True:
# get the events from the subscription
result, _ = sub_conf.get_events(max_events, next_marker)
parsed_result = json.loads(result)
- total_events += len(parsed_result['events'])
+ events = parsed_result['events']
+ total_events_count += len(events)
+ all_events.extend(events)
next_marker = parsed_result['next_marker']
- for event in parsed_result['events']:
- log.debug('Event: objname: "' + str(event['info']['key']['name']) + '" type: "' + str(event['event']) +'"')
+ for event in events:
+ log.debug('Event: objname: "' + str(event['info']['key']['name']) + '" type: "' + str(event['event']) + '"')
if next_marker == '':
break
- # TODO numbers dont match, should be ==
- assert total_events >= number_of_objects
+ keys = list(bucket.list())
+ # TODO: set exact_match to true
+ verify_events_by_elements(all_events, keys, exact_match=False)
# cleanup
sub_conf.del_config()
key = bucket.new_key(str(i))
key.set_contents_from_string('bar')
# wait for sync
- zone_meta_checkpoint(ps_zones[0].zone)
+ zone_bucket_checkpoint(ps_zones[0].zone, zones[0].zone, bucket_name)
# get the create events from the subscription
result, _ = sub_conf.get_events()
parsed_result = json.loads(result)
- for event in parsed_result['events']:
+ events = parsed_result['events']
+ original_number_of_events = len(events)
+ for event in events:
log.debug('Event (before ack) id: "' + str(event['id']) + '"')
- assert_equal(len(parsed_result['events']), number_of_objects)
+ keys = list(bucket.list())
+ # TODO: set exact_match to true
+ verify_events_by_elements(events, keys, exact_match=False)
# ack half of the events
events_to_ack = number_of_objects/2
- for event in parsed_result['events']:
+ for event in events:
if events_to_ack == 0:
break
_, status = sub_conf.ack_events(event['id'])
parsed_result = json.loads(result)
for event in parsed_result['events']:
log.debug('Event (after ack) id: "' + str(event['id']) + '"')
- assert_equal(len(parsed_result['events']), number_of_objects - number_of_objects/2)
+ assert_equal(len(parsed_result['events']), original_number_of_events - number_of_objects/2)
# cleanup
sub_conf.del_config()
uploader.complete_upload()
fp.close()
# wait for sync
- zone_meta_checkpoint(ps_zones[0].zone)
+ zone_bucket_checkpoint(ps_zones[0].zone, zones[0].zone, bucket_name)
# get the create events from the subscription
result, _ = sub_conf.get_events()
parsed_result = json.loads(result)
for event in parsed_result['events']:
- log.debug('Event key: "' + str(event['info']['key']['name']) + '" type: "' + str(event['event']) +'"')
+ log.debug('Event key: "' + str(event['info']['key']['name']) + '" type: "' + str(event['event']) + '"')
- assert_equal(len(parsed_result['events']), 3)
+ # TODO: verify the specific 3 keys: 'put', 'copy' and 'multipart'
+ assert len(parsed_result['events']) >= 3
# cleanup
sub_conf.del_config()
notification_conf.del_config()
key.set_contents_from_string('kaboom')
v2 = key.version_id
# wait for sync
- zone_meta_checkpoint(ps_zones[0].zone)
+ zone_bucket_checkpoint(ps_zones[0].zone, zones[0].zone, bucket_name)
# set delete markers
bucket.delete_key(key.name, version_id=v2)
bucket.delete_key(key.name, version_id=v1)
# wait for sync
- zone_meta_checkpoint(ps_zones[0].zone)
+ zone_bucket_checkpoint(ps_zones[0].zone, zones[0].zone, bucket_name)
# get the create events from the subscription
result, _ = sub_conf.get_events()
parsed_result = json.loads(result)
for event in parsed_result['events']:
- log.debug('Event key: "' + str(event['info']['key']['name']) + '" type: "' + str(event['event']) +'"')
+ log.debug('Event key: "' + str(event['info']['key']['name']) + '" type: "' + str(event['event']) + '"')
- assert_equal(len(parsed_result['events']), 2)
+ # TODO: verify the specific events
+ assert len(parsed_result['events']) >= 2
+
# cleanup
sub_conf.del_config()
notification_conf.del_config()