import subprocess
import json
import boto3
-from pprint import pprint
-import re
"""
Rgw manual and dynamic resharding testing against a running instance
_, ret = exec_cmd('radosgw-admin user create --uid {} --display-name {} --access-key {} --secret {}'.format(USER, DISPLAY_NAME, ACCESS_KEY, SECRET_KEY), check_retcode=False)
assert(ret == 0 or errno.EEXIST)
- def boto_connect(portnum, ssl, proto):
- endpoint = proto + '://localhost:' + portnum
- conn = boto3.resource('s3',
- aws_access_key_id=ACCESS_KEY,
- aws_secret_access_key=SECRET_KEY,
- use_ssl=ssl,
- endpoint_url=endpoint,
- verify=False,
- config=None,
- )
- return conn
-
port = get_radosgw_port()
-
- proto = ('https' if port == '443' else 'http')
- connection = boto_connect(port, ssl=False, proto='http')
+ secure = (port == '443')
+ proto = ('https' if secure else 'http')
+ endpoint = proto + '://localhost:' + port
+ connection = boto3.resource('s3',
+ aws_access_key_id=ACCESS_KEY,
+ aws_secret_access_key=SECRET_KEY,
+ use_ssl=secure,
+ endpoint_url=endpoint,
+ verify=False)
# create a bucket
bucket = connection.create_bucket(Bucket=BUCKET_NAME)