import subprocess
import json
import boto3
+from pprint import pprint
+import re
"""
Rgw manual and dynamic resharding testing against a running instance
#
#
-log.basicConfig(level=log.DEBUG)
+log.basicConfig(format = '%(message)s', level=log.DEBUG)
log.getLogger('botocore').setLevel(log.CRITICAL)
log.getLogger('boto3').setLevel(log.CRITICAL)
log.getLogger('urllib3').setLevel(log.CRITICAL)
DISPLAY_NAME = 'Testing'
ACCESS_KEY = 'NX5QOQKC6BH2IDN8HC7A'
SECRET_KEY = 'LnEsqNNqZIpkzauboDcLXLcYaWwLQ3Kop0zAnKIn'
-BUCKET_NAME1 = 'myfoo'
-BUCKET_NAME2 = 'mybar'
+BUCKET_NAME1 = 'a-bucket'
+BUCKET_NAME2 = 'b-bucket'
+BUCKET_NAME3 = 'c-bucket'
+BUCKET_NAME4 = 'd-bucket'
+BUCKET_NAME5 = 'e-bucket'
VER_BUCKET_NAME = 'myver'
-
def exec_cmd(cmd):
try:
proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True)
out, err = proc.communicate()
+ log.info(proc.args)
if proc.returncode == 0:
- log.info('command succeeded')
- if out is not None: log.info(out)
+ if out is not None: log.debug('{} \n {}'.format(out.decode('utf-8'), err.decode('utf-8')))
return out
else:
- raise Exception("error: %s \nreturncode: %s" % (err, proc.returncode))
+ raise Exception("error: {} \nreturncode: {}".format(err, proc.returncode))
except Exception as e:
- log.error('command failed')
+ log.error('command failed\n')
log.error(e)
return False
def get_radosgw_port():
out = exec_cmd('sudo netstat -nltp | grep radosgw')
- log.debug('output: %s' % out)
+ log.debug('output: {}'.format(out))
x = out.decode('utf8').split(" ")
port = [i for i in x if ':' in i][0].split(':')[1]
- log.info('radosgw port: %s' % port)
+ log.info('radosgw port: {}'.format(port))
return port
"""
function to get bucket stats
"""
- cmd = exec_cmd("radosgw-admin bucket stats --bucket %s" % bucket_name)
+ cmd = exec_cmd("radosgw-admin bucket stats --bucket {}".format(bucket_name))
json_op = json.loads(cmd)
+ #print(json.dumps(json_op, indent = 4, sort_keys=True))
bucket_id = json_op['id']
- num_shards_op = json_op['num_shards']
+ num_shards = json_op['num_shards']
if len(json_op['usage']) > 0:
num_objects = json_op['usage']['rgw.main']['num_objects']
size_kb = json_op['usage']['rgw.main']['size_kb']
else:
num_objects = 0
size_kb = 0
- log.debug("bucket %s id %s num_objects %d size_kb %d num_shards %d", bucket_name, bucket_id,
- num_objects, size_kb, num_shards_op)
- return BucketStats(bucket_name, bucket_id, num_objects, size_kb, num_shards_op)
+ log.debug(" \nBUCKET_STATS: \nbucket: {} id: {} num_objects: {} size_kb: {} num_shards: {}\n".format(bucket_name, bucket_id,
+ num_objects, size_kb, num_shards))
+ return BucketStats(bucket_name, bucket_id, num_objects, size_kb, num_shards)
def get_bucket_num_shards(bucket_name, bucket_id):
function to get bucket num shards
"""
metadata = 'bucket.instance:' + bucket_name + ':' + bucket_id
- log.debug("metadata %s", metadata)
- cmd = exec_cmd('radosgw-admin metadata get %s' % metadata)
+ cmd = exec_cmd('radosgw-admin metadata get {}'.format(metadata))
json_op = json.loads(cmd)
num_shards = json_op['data']['bucket_info']['num_shards']
- log.debug("bucket %s id %s num_shards %d", bucket_name, bucket_id, num_shards)
return num_shards
+def run_bucket_reshard_cmd(bucket_name, num_shards, **location):
+
+ # TODO: Get rid of duplication. use list
+ if ('error_location' in location):
+ return exec_cmd('radosgw-admin bucket reshard --bucket {} --num-shards {} --inject-error-at {}'.format(bucket_name,
+ num_shards, location.get('error_location', None)))
+ elif ('abort_location' in location):
+ return exec_cmd('radosgw-admin bucket reshard --bucket {} --num-shards {} --inject-abort-at {}'.format(bucket_name,
+ num_shards, location.get('abort_location', None)))
+ else:
+ return exec_cmd('radosgw-admin bucket reshard --bucket {} --num-shards {}'.format(bucket_name, num_shards))
def main():
"""
execute manual and dynamic resharding commands
"""
# create user
- exec_cmd('radosgw-admin user create --uid %s --display-name %s --access-key %s --secret %s'
- % (USER, DISPLAY_NAME, ACCESS_KEY, SECRET_KEY))
+ exec_cmd('radosgw-admin user create --uid {} --display-name {} --access-key {} --secret {}'.format(USER, DISPLAY_NAME, ACCESS_KEY, SECRET_KEY))
def boto_connect(portnum, ssl, proto):
endpoint = proto + '://localhost:' + portnum
port = get_radosgw_port()
- if port == '80':
- connection = boto_connect(port, ssl=False, proto='http')
- elif port == '443':
- connection = boto_connect(port, ssl=True, proto='https')
+ proto = ('https' if port == '443' else 'http')
+ connection = boto_connect(port, ssl=False, proto='http')
# create a bucket
bucket1 = connection.create_bucket(Bucket=BUCKET_NAME1)
bucket2 = connection.create_bucket(Bucket=BUCKET_NAME2)
+ bucket3 = connection.create_bucket(Bucket=BUCKET_NAME3)
+ bucket4 = connection.create_bucket(Bucket=BUCKET_NAME4)
+ bucket5 = connection.create_bucket(Bucket=BUCKET_NAME5)
ver_bucket = connection.create_bucket(Bucket=VER_BUCKET_NAME)
connection.BucketVersioning('ver_bucket')
- bucket_stats1 = get_bucket_stats(BUCKET_NAME1)
- bucket_stats2 = get_bucket_stats(BUCKET_NAME2)
- ver_bucket_stats = get_bucket_stats(VER_BUCKET_NAME)
-
bucket1_acl = connection.BucketAcl(BUCKET_NAME1).load()
bucket2_acl = connection.BucketAcl(BUCKET_NAME2).load()
ver_bucket_acl = connection.BucketAcl(VER_BUCKET_NAME).load()
# TESTCASE 'reshard-add','reshard','add','add bucket to resharding queue','succeeds'
- log.debug(' test: reshard add')
- num_shards_expected = bucket_stats1.num_shards + 1
- cmd = exec_cmd('radosgw-admin reshard add --bucket %s --num-shards %s' % (BUCKET_NAME1, num_shards_expected))
+ log.debug('TEST: reshard add\n')
+
+ num_shards_expected = get_bucket_stats(BUCKET_NAME1).num_shards + 1
+ cmd = exec_cmd('radosgw-admin reshard add --bucket {} --num-shards {}'.format(BUCKET_NAME1, num_shards_expected))
cmd = exec_cmd('radosgw-admin reshard list')
json_op = json.loads(cmd)
- log.debug('bucket name %s', json_op[0]['bucket_name'])
+ log.debug('bucket name {}'.format(json_op[0]['bucket_name']))
assert json_op[0]['bucket_name'] == BUCKET_NAME1
assert json_op[0]['new_num_shards'] == num_shards_expected
# TESTCASE 'reshard-process','reshard','','process bucket resharding','succeeds'
- log.debug(' test: reshard process')
+ log.debug('TEST: reshard process\n')
cmd = exec_cmd('radosgw-admin reshard process')
time.sleep(5)
# check bucket shards num
bucket_stats1 = get_bucket_stats(BUCKET_NAME1)
- bucket_stats1.get_num_shards()
if bucket_stats1.num_shards != num_shards_expected:
- log.error("Resharding failed on bucket %s. Expected number of shards are not created" % BUCKET_NAME1)
+ log.error("Resharding failed on bucket {}. Expected number of shards are not created\n".format(BUCKET_NAME1))
# TESTCASE 'reshard-add','reshard','add','add non empty bucket to resharding queue','succeeds'
- log.debug(' test: reshard add non empty bucket')
+ log.debug('TEST: reshard add non empty bucket\n')
# create objs
num_objs = 8
for i in range(0, num_objs):
connection.Object(BUCKET_NAME1, ('key'+str(i))).put(Body=b"some_data")
- num_shards_expected = bucket_stats1.num_shards + 1
- cmd = exec_cmd('radosgw-admin reshard add --bucket %s --num-shards %s' % (BUCKET_NAME1, num_shards_expected))
+ num_shards_expected = get_bucket_stats(BUCKET_NAME1).num_shards + 1
+ cmd = exec_cmd('radosgw-admin reshard add --bucket {} --num-shards {}'.format(BUCKET_NAME1, num_shards_expected))
cmd = exec_cmd('radosgw-admin reshard list')
json_op = json.loads(cmd)
- log.debug('bucket name %s', json_op[0]['bucket_name'])
assert json_op[0]['bucket_name'] == BUCKET_NAME1
assert json_op[0]['new_num_shards'] == num_shards_expected
# TESTCASE 'reshard process ,'reshard','process','reshard non empty bucket','succeeds'
- log.debug(' test: reshard process non empty bucket')
+ log.debug(' TEST: reshard process non empty bucket\n')
cmd = exec_cmd('radosgw-admin reshard process')
# check bucket shards num
bucket_stats1 = get_bucket_stats(BUCKET_NAME1)
- bucket_stats1.get_num_shards()
if bucket_stats1.num_shards != num_shards_expected:
- log.error("Resharding failed on bucket %s. Expected number of shards are not created" % BUCKET_NAME1)
+ log.error("Resharding failed on bucket {}. Expected number of shards are not created\n".format(BUCKET_NAME1))
- # TESTCASE 'manual resharding','bucket', 'reshard','','manual bucket resharding','succeeds'
- log.debug(' test: manual reshard bucket')
+ # TESTCASE 'manual bucket resharding','inject error','fail','check bucket accessibility', 'retry reshard'
+ log.debug('TEST: reshard bucket with error injection\n')
# create objs
num_objs = 11
for i in range(0, num_objs):
connection.Object(BUCKET_NAME2, ('key' + str(i))).put(Body=b"some_data")
+ time.sleep(5)
+ old_shard_count = get_bucket_stats(BUCKET_NAME2).num_shards
+ num_shards_expected = old_shard_count + 1
+ run_bucket_reshard_cmd(BUCKET_NAME2, num_shards_expected, error_location = "before_target_shard_entry")
+
+ # check bucket shards num
+ cur_shard_count = get_bucket_stats(BUCKET_NAME2).num_shards
+ assert(cur_shard_count == old_shard_count)
+
+ #verify if bucket is accessible
+ log.info('List objects from bucket: {}'.format(BUCKET_NAME2))
+ for key in bucket2.objects.all():
+ print(key.key)
+
+ #verify if new objects can be added
+ num_objs = 5
+ for i in range(0, num_objs):
+ connection.Object(BUCKET_NAME2, ('key' + str(i))).put(Body=b"some_data")
+
+ #retry reshard
+ run_bucket_reshard_cmd(BUCKET_NAME2, num_shards_expected)
+
+ #TESTCASE 'manual bucket resharding','inject error','fail','check bucket accessibility', 'retry reshard'
+ log.debug('TEST: reshard bucket with error injection')
+ # create objs
+ num_objs = 11
+ for i in range(0, num_objs):
+ connection.Object(BUCKET_NAME3, ('key' + str(i))).put(Body=b"some_data")
+
+ time.sleep(5)
+ old_shard_count = get_bucket_stats(BUCKET_NAME3).num_shards
+ num_shards_expected = old_shard_count + 1
+ run_bucket_reshard_cmd(BUCKET_NAME3, num_shards_expected, error_location = "after_target_shard_entry")
+ # check bucket shards num
+ bucket_stats3 = get_bucket_stats(BUCKET_NAME3)
+ cur_shard_count = bucket_stats3.num_shards
+ assert(cur_shard_count == old_shard_count)
+
+ #verify if bucket is accessible
+ log.info('List objects from bucket: {}'.format(BUCKET_NAME3))
+ for key in bucket3.objects.all():
+ print(key.key)
+
+ #verify if new objects can be added
+ num_objs = 5
+ for i in range(0, num_objs):
+ connection.Object(BUCKET_NAME3, ('key' + str(i))).put(Body=b"some_data")
+
+ #retry reshard
+ run_bucket_reshard_cmd(BUCKET_NAME3, num_shards_expected)
+
+ #TESTCASE 'manual bucket resharding','inject error','fail','check bucket accessibility', 'retry reshard'
+ log.debug('TEST: reshard bucket with error injection')
+ # create objs
+ num_objs = 11
+ for i in range(0, num_objs):
+ connection.Object(BUCKET_NAME4, ('key' + str(i))).put(Body=b"some_data")
+
time.sleep(10)
- num_shards_expected = bucket_stats2.num_shards + 1
- cmd = exec_cmd('radosgw-admin bucket reshard --bucket %s --num-shards %s' % (BUCKET_NAME2,
- num_shards_expected))
+ old_shard_count = get_bucket_stats(BUCKET_NAME4).num_shards
+ num_shards_expected = old_shard_count + 1
+ run_bucket_reshard_cmd(BUCKET_NAME4, num_shards_expected, error_location = "before_layout_overwrite")
+
# check bucket shards num
- bucket_stats2 = get_bucket_stats(BUCKET_NAME2)
- bucket_stats2.get_num_shards()
- if bucket_stats2.num_shards != num_shards_expected:
- log.error("Resharding failed on bucket %s. Expected number of shards are not created" % BUCKET_NAME2)
+ cur_shard_count = get_bucket_stats(BUCKET_NAME4).num_shards
+ assert(cur_shard_count == old_shard_count)
+
+ #verify if bucket is accessible
+ log.info('List objects from bucket: {}'.format(BUCKET_NAME3))
+ for key in bucket4.objects.all():
+ print(key.key)
+
+ #verify if new objects can be added
+ num_objs = 5
+ for i in range(0, num_objs):
+ connection.Object(BUCKET_NAME4, ('key' + str(i))).put(Body=b"some_data")
+
+ #retry reshard
+ run_bucket_reshard_cmd(BUCKET_NAME4, num_shards_expected)
+
+ # TESTCASE 'manual bucket resharding','inject crash','fail','check bucket accessibility', 'retry reshard'
+ log.debug('TEST: reshard bucket with crash injection\n')
+
+ old_shard_count = get_bucket_stats(BUCKET_NAME2).num_shards
+ num_shards_expected = old_shard_count + 1
+ run_bucket_reshard_cmd(BUCKET_NAME2, num_shards_expected, abort_location = "before_target_shard_entry")
+
+ # check bucket shards num
+ cur_shard_count = get_bucket_stats(BUCKET_NAME2).num_shards
+ assert(cur_shard_count == old_shard_count)
+
+ #verify if bucket is accessible
+ log.info('List objects from bucket: {}'.format(BUCKET_NAME2))
+ for key in bucket2.objects.all():
+ print(key.key)
+
+ #verify if new objects can be added
+ num_objs = 5
+ for i in range(0, num_objs):
+ connection.Object(BUCKET_NAME2, ('key' + str(i))).put(Body=b"some_data")
+
+ #retry reshard
+ run_bucket_reshard_cmd(BUCKET_NAME2, num_shards_expected)
+
+ #TESTCASE 'manual bucket resharding','inject crash','fail','check bucket accessibility', 'retry reshard'
+ log.debug('TEST: reshard bucket with error injection')
+
+ old_shard_count = get_bucket_stats(BUCKET_NAME3).num_shards
+ num_shards_expected = old_shard_count + 1
+ run_bucket_reshard_cmd(BUCKET_NAME3, num_shards_expected, abort_location = "after_target_shard_entry")
+
+ # check bucket shards num
+ cur_shard_count = get_bucket_stats(BUCKET_NAME3).num_shards
+ assert(cur_shard_count == old_shard_count)
+
+ #verify if bucket is accessible
+ log.info('List objects from bucket: {}'.format(BUCKET_NAME3))
+ for key in bucket3.objects.all():
+ print(key.key)
+
+ #verify if new objects can be added
+ num_objs = 5
+ for i in range(0, num_objs):
+ connection.Object(BUCKET_NAME3, ('key' + str(i))).put(Body=b"some_data")
+
+ #retry reshard
+ run_bucket_reshard_cmd(BUCKET_NAME3, num_shards_expected)
+
+ #TESTCASE 'manual bucket resharding','inject error','fail','check bucket accessibility', 'retry reshard'
+ log.debug('TEST: reshard bucket with error injection')
+
+ old_shard_count = get_bucket_stats(BUCKET_NAME4).num_shards
+ num_shards_expected = old_shard_count + 1
+ run_bucket_reshard_cmd(BUCKET_NAME4, num_shards_expected, abort_location = "before_layout_overwrite")
+ # check bucket shards num
+ bucket_stats4 = get_bucket_stats(BUCKET_NAME4)
+ cur_shard_count = bucket_stats4.num_shards
+ assert(cur_shard_count == old_shard_count)
+
+ #verify if bucket is accessible
+ log.info('List objects from bucket: {}'.format(BUCKET_NAME3))
+ for key in bucket4.objects.all():
+ print(key.key)
+
+ #verify if new objects can be added
+ num_objs = 5
+ for i in range(0, num_objs):
+ connection.Object(BUCKET_NAME4, ('key' + str(i))).put(Body=b"some_data")
+
+ #retry reshard
+ run_bucket_reshard_cmd(BUCKET_NAME4, num_shards_expected)
# TESTCASE 'versioning reshard-','bucket', reshard','versioning reshard','succeeds'
log.debug(' test: reshard versioned bucket')
- num_shards_expected = ver_bucket_stats.num_shards + 1
- cmd = exec_cmd('radosgw-admin bucket reshard --bucket %s --num-shards %s' % (VER_BUCKET_NAME,
+ num_shards_expected = get_bucket_stats(VER_BUCKET_NAME).num_shards + 1
+ cmd = exec_cmd('radosgw-admin bucket reshard --bucket {} --num-shards {}'.format(VER_BUCKET_NAME,
num_shards_expected))
# check bucket shards num
ver_bucket_stats = get_bucket_stats(VER_BUCKET_NAME)
assert new_ver_bucket_acl == ver_bucket_acl
# Clean up
- log.debug("Deleting bucket %s", BUCKET_NAME1)
+ log.debug("Deleting bucket {}".format(BUCKET_NAME1))
bucket1.objects.all().delete()
bucket1.delete()
- log.debug("Deleting bucket %s", BUCKET_NAME2)
+ log.debug("Deleting bucket {}".format(BUCKET_NAME2))
bucket2.objects.all().delete()
bucket2.delete()
- log.debug("Deleting bucket %s", VER_BUCKET_NAME)
+ log.debug("Deleting bucket {}".format(VER_BUCKET_NAME))
ver_bucket.delete()