"""
Test radosgw-admin functionality against a running rgw instance.
"""
+ global log
assert config is None or isinstance(config, list) \
or isinstance(config, dict), \
"task s3tests only supports a list or dictionary for configuration"
assert not out['suspended']
# compare the metadata between different regions, make sure it matches
+ log.debug('compare the metadata between different regions, make sure it matches')
for agent_client, c_config in ctx.radosgw_agent.config.iteritems():
source_client = c_config['src']
dest_client = c_config['dest']
assert out1 == out2
# suspend a user on the master, then check the status on the destination
+ log.debug('suspend a user on the master, then check the status on the destination')
for agent_client, c_config in ctx.radosgw_agent.config.iteritems():
source_client = c_config['src']
dest_client = c_config['dest']
assert out['suspended']
# delete a user on the master, then check that it's gone on the destination
+ log.debug('delete a user on the master, then check that it\'s gone on the destination')
for agent_client, c_config in ctx.radosgw_agent.config.iteritems():
source_client = c_config['src']
dest_client = c_config['dest']
check_status=True)
# now do the multi-region bucket tests
+ log.debug('now do the multi-region bucket tests')
# Create a second user for the following tests
+ log.debug('Create a second user for the following tests')
(err, out) = rgwadmin(ctx, client, [
'user', 'create',
'--uid', user2,
assert out is not None
# create a bucket and do a sync
+ log.debug('create a bucket and do a sync')
bucket = connection.create_bucket(bucket_name2)
rgw_utils.radosgw_agent_sync_all(ctx)
# compare the metadata for the bucket between different regions, make sure it matches
+ log.debug('compare the metadata for the bucket between different regions, make sure it matches')
for agent_client, c_config in ctx.radosgw_agent.config.iteritems():
source_client = c_config['src']
dest_client = c_config['dest']
dest_client = c_config['dest']
# Attempt to create a new connection with user1 to the destination RGW
+ log.debug('Attempt to create a new connection with user1 to the destination RGW')
# and use that to attempt a delete (that should fail)
exception_encountered = False
try:
assert exception_encountered
# now delete the bucket on the source RGW and do another sync
+ log.debug('now delete the bucket on the source RGW and do another sync')
bucket.delete()
rgw_utils.radosgw_agent_sync_all(ctx)
# make sure that the bucket no longer exists in either region
+ log.debug('make sure that the bucket no longer exists in either region')
for agent_client, c_config in ctx.radosgw_agent.config.iteritems():
source_client = c_config['src']
dest_client = c_config['dest']
assert err2
# create a bucket and then sync it
+ log.debug('create a bucket and then sync it')
bucket = connection.create_bucket(bucket_name2)
rgw_utils.radosgw_agent_sync_all(ctx)
# compare the metadata for the bucket between different regions, make sure it matches
+ log.debug('compare the metadata for the bucket between different regions, make sure it matches')
for agent_client, c_config in ctx.radosgw_agent.config.iteritems():
source_client = c_config['src']
dest_client = c_config['dest']
assert out1 == out2
# Now delete the bucket and recreate it with a different user
+ log.debug('Now delete the bucket and recreate it with a different user')
# within the same window of time and then sync.
bucket.delete()
bucket = connection2.create_bucket(bucket_name2)
rgw_utils.radosgw_agent_sync_all(ctx)
# compare the metadata for the bucket between different regions, make sure it matches
+ log.debug('compare the metadata for the bucket between different regions, make sure it matches')
# user2 should own the bucket in both regions
for agent_client, c_config in ctx.radosgw_agent.config.iteritems():
source_client = c_config['src']
assert out1['data']['owner'] != user1
# now we're going to use this bucket to test meta-data update propagation
+ log.debug('now we\'re going to use this bucket to test meta-data update propagation')
for agent_client, c_config in ctx.radosgw_agent.config.iteritems():
source_client = c_config['src']
dest_client = c_config['dest']
# get the metadata so we can tweak it
+ log.debug('get the metadata so we can tweak it')
(err, orig_data) = rgwadmin(ctx, source_client,
['metadata', 'get', 'bucket:{bucket_name}'.format(bucket_name=bucket_name2)],
check_status=True)
# manually edit mtime for this bucket to be 300 seconds in the past
+ log.debug('manually edit mtime for this bucket to be 300 seconds in the past')
new_data = copy.deepcopy(orig_data)
new_data['mtime'] = orig_data['mtime'] - 300
assert new_data != orig_data
check_status=True)
# get the metadata and make sure that the 'put' worked
+ log.debug('get the metadata and make sure that the \'put\' worked')
(err, out) = rgwadmin(ctx, source_client,
['metadata', 'get', 'bucket:{bucket_name}'.format(bucket_name=bucket_name2)],
check_status=True)
assert out == new_data
# sync to propagate the new metadata
+ log.debug('sync to propagate the new metadata')
rgw_utils.radosgw_agent_sync_all(ctx)
# get the metadata from the dest and compare it to what we just set
+ log.debug('get the metadata from the dest and compare it to what we just set')
# and what the source region has.
(err1, out1) = rgwadmin(ctx, source_client,
['metadata', 'get', 'bucket:{bucket_name}'.format(bucket_name=bucket_name2)],
assert out1 == new_data
# now we delete the bucket
+ log.debug('now we delete the bucket')
bucket.delete()
# Delete user2 as later tests do not expect it to exist.
if obj[:4] == 'meta' or obj[:4] == 'data':
continue
- (err, log) = rgwadmin(ctx, client, ['log', 'show', '--object', obj],
+ (err, rgwlog) = rgwadmin(ctx, client, ['log', 'show', '--object', obj],
check_status=True)
- assert len(log) > 0
+ assert len(rgwlog) > 0
# exempt bucket_name2 from checking as it was only used for multi-region tests
- assert log['bucket'].find(bucket_name) == 0 or log['bucket'].find(bucket_name2) == 0
- assert log['bucket'] != bucket_name or log['bucket_id'] == bucket_id
- assert log['bucket_owner'] == user1 or log['bucket'] == bucket_name + '5' or log['bucket'] == bucket_name2
- for entry in log['log_entries']:
- assert entry['bucket'] == log['bucket']
- assert entry['user'] == user1 or log['bucket'] == bucket_name + '5' or log['bucket'] == bucket_name2
+ assert rgwlog['bucket'].find(bucket_name) == 0 or rgwlog['bucket'].find(bucket_name2) == 0
+ assert rgwlog['bucket'] != bucket_name or rgwlog['bucket_id'] == bucket_id
+ assert rgwlog['bucket_owner'] == user1 or rgwlog['bucket'] == bucket_name + '5' or rgwlog['bucket'] == bucket_name2
+ for entry in rgwlog['log_entries']:
+ log.debug('checking log entry: ', entry)
+ assert entry['bucket'] == rgwlog['bucket']
+ possible_buckets = [bucket_name + '5', bucket_name2]
+ user = entry['user']
+ assert user == user1 or user.endswith('system-user') or \
+ rgwlog['bucket'] in possible_buckets
# TESTCASE 'log-rm','log','rm','delete log objects','succeeds'
(err, out) = rgwadmin(ctx, client, ['log', 'rm', '--object', obj],