log = logging.getLogger(__name__)
def rgwadmin(ctx, client, cmd):
+ log.info('radosgw-admin: %s' % cmd)
pre = [
'LD_LIBRARY_PATH=/tmp/cephtest/binary/usr/local/lib',
'/tmp/cephtest/enable-coredump',
j = None
if not r and out != '':
j = json.loads(out)
- log.info(j)
+ log.info(' json result: %s' % j)
return (r, j)
def task(ctx, config):
auid='1234'
access_key='9te6NH5mcdcq0Tc5i8i1'
secret_key='Ny4IOauQoL18Gp2zM7lC1vLmoawgqcYP/YGcWfXu'
+ access_key2='p5YnriCv1nAtykxBrupQ'
+ secret_key2='Q8Tk6Q/27hfbFSYdSkPtUqhqx1GgzvpXa4WARozh'
bucket_name='myfoo'
assert out['user_id'] == user
assert out['email'] == email
assert out['display_name'] == display_name
+ assert len(out['keys']) == 1
assert out['keys'][0]['access_key'] == access_key
assert out['keys'][0]['secret_key'] == secret_key
assert not out['suspended']
assert not out['suspended']
# add key
+ (err, out) = rgwadmin(ctx, client, [
+ 'key', 'create', '--uid', user,
+ '--access-key', access_key2, '--secret', secret_key2,
+ ])
+ assert not err
+ (err, out) = rgwadmin(ctx, client, ['user', 'info', '--uid', user])
+ assert not err
+ assert len(out['keys']) == 2
+ assert out['keys'][0]['access_key'] == access_key2 or out['keys'][1]['access_key'] == access_key2
+ assert out['keys'][0]['secret_key'] == secret_key2 or out['keys'][1]['secret_key'] == secret_key2
# remove key
+ (err, out) = rgwadmin(ctx, client, [
+ 'key', 'rm', '--uid', user,
+ '--access-key', access_key2,
+ ])
+ assert not err
+ assert len(out['keys']) == 1
+ assert out['keys'][0]['access_key'] == access_key
+ assert out['keys'][0]['secret_key'] == secret_key
+ # no buckets yet
+ (err, out) = rgwadmin(ctx, client, ['bucket', 'stats', '--uid', user])
+ assert not err
+ assert len(out) == 0
+
# connect to rgw
(remote,) = ctx.cluster.only(client).remotes.iterkeys()
(remote_user, remote_host) = remote.name.split('@')
(err, out) = rgwadmin(ctx, client, [
'bucket', 'stats', '--bucket', bucket_name])
assert not err
+ assert out['owner'] == user
bucket_id = out['id']
+
+ # no buckets yet
+ (err, out) = rgwadmin(ctx, client, ['bucket', 'stats', '--uid', user])
+ assert not err
+ assert len(out) == 1
+ assert out[0]['id'] == bucket_id
# use some space
key = boto.s3.key.Key(bucket)
(err, out) = rgwadmin(ctx, client, [
'bucket', 'stats', '--bucket-id', '%d' % bucket_id])
assert not err
- assert out['id'] == '%d' % bucket_id
-
+ assert out['id'] == bucket_id
+ assert out['usage']['rgw.main']['num_objects'] == 1
+ assert out['usage']['rgw.main']['size_kb'] > 0
+
+ # reclaim it
+ key.delete()
+ (err, out) = rgwadmin(ctx, client, [
+ 'bucket', 'stats', '--bucket-id', '%d' % bucket_id])
+ assert not err
+ assert out['id'] == bucket_id
+ assert out['usage']['rgw.main']['num_objects'] == 0
+
+ # list log objects
+ (err, out) = rgwadmin(ctx, client, ['log', 'list'])
+ assert not err
+ assert len(out) > 0
+
+ for obj in out:
+ (err, log) = rgwadmin(ctx, client, ['log', 'show', '--object', obj])
+ assert not err
+ assert len(log) > 0
+ assert log['bucket'] == bucket_name
+ assert log['bucket_id'] == bucket_id
+ assert log['bucket_owner'] == user
+ for entry in log['log_entries']:
+ assert entry['bucket'] == bucket_name
+ assert entry['user'] == user
+
+ (err, out) = rgwadmin(ctx, client, ['log', 'rm', '--object', obj])
+ assert not err
+
+ # TODO: show log by bucket+date
+
# remove user
(err, out) = rgwadmin(ctx, client, ['user', 'rm', '--uid', user])
assert not err