import boto.s3.acl
import time
-import xml.etree.ElementTree as ET
from teuthology import misc as teuthology
out = proc.stdout.getvalue()
j = None
if not r and out != '':
- j = json.loads(out)
- log.info(' json result: %s' % j)
+ try:
+ j = json.loads(out)
+ log.info(' json result: %s' % j)
+ except ValueError:
+ j = out
+ log.info(' raw result: %s' % j)
return (r, j)
def task(ctx, config):
# upload an object
object_name = 'four'
- key = boto.s3.key.Key(bucket)
+ key = boto.s3.key.Key(bucket, object_name)
key.set_contents_from_string(object_name)
# now delete it
# create an object large enough to be split into multiple parts
test_string = 'foo'*10000000
- key = boto.s3.key.Key(bucket)
- key.set_contents_from_string(test_string)
+ big_key = boto.s3.key.Key(bucket)
+ big_key.set_contents_from_string(test_string)
# now delete the head
- key.delete()
+ big_key.delete()
# wait a bit to give the garbage collector time to cycle
time.sleep(15)
(err, out) = rgwadmin(ctx, client, ['user', 'rm', '--uid', user1])
assert err
- # delete should fail
+ # delete should fail because ``key`` still exists
fails = False
try:
bucket.delete()
- except:
- fails = True
- assert fails
+ except boto.exception.S3ResponseError as e:
+ assert e.status == 409
key.delete()
bucket.delete()
(err, out) = rgwadmin(ctx, client, ['policy', '--bucket', bucket.name, '--object', key.key])
- assert out
+ assert not err
- policy = ET.fromstring(out)
- assert len(policy.findall('Grantee')) == 1
+ acl = key.get_xml_acl()
+
+ assert acl == out.strip('\n')
# add another grantee by making the object public read
key.set_acl('public-read')
(err, out) = rgwadmin(ctx, client, ['policy', '--bucket', bucket.name, '--object', key.key])
- assert out
+ assert not err
- policy = ET.fromstring(out)
- assert len(policy.findall('Grantee')) == 2
+ acl = key.get_xml_acl()
+ assert acl == out.strip('\n')
# TESTCASE 'rm-bucket', 'bucket', 'rm', 'bucket with objects', 'succeeds'
bucket = connection.create_bucket(bucket_name)
caps='user=read'
(err, out) = rgwadmin(ctx, client, ['caps', 'add', '--uid', user1, '--caps', caps])
- assert out['caps']['perm'] == 'read'
+ assert out['caps'][0]['perm'] == 'read'
# TESTCASE 'caps-rm', 'caps', 'rm', 'remove existing cap from user', 'succeeds'
(err, out) = rgwadmin(ctx, client, ['caps', 'rm', '--uid', user1, '--caps', caps])
# TESTCASE 'pool-add', 'pool', 'add', 'make default pool available', 'succeeds'
(err, out) = rgwadmin(ctx, client, ['pool', 'add', '--pool', default_pool])
- assert out['name'] == default_pool
+ assert not err
+
+ (err, out) = rgwadmin(ctx, client, ['pools', 'list'])
+ assert out[0]['name'] == default_pool
# TESTCASE 'cluster-info', 'cluster', 'info', 'get cluser info', 'succeeds'
(err, out) = rgwadmin(ctx, client, ['cluster', 'info'])