From a45ab45f74042617ec382f3d50f4d45d883042f5 Mon Sep 17 00:00:00 2001 From: Marcus Watts Date: Fri, 4 Aug 2017 20:01:32 -0400 Subject: [PATCH] Test bytes_sent bugs. Rearrange logic to make it easier to measure accumulation. Instrument the boto request/response loop to count bytes in and out. Accumulate byte counts in usage like structure. Compare actual usage reported by ceph against local usage measured. Report and assert if there are any short-comings. Remove zone placement rule that was newly added at end: tests should be rerunable. Nit: the logic to wait for "delete_obj" is not quite right. Fixes: http://tracker.ceph.com/issues/19870 Signed-off-by: Marcus Watts --- qa/tasks/radosgw_admin.py | 425 +++++++++++++++++++++++++++++++++----- 1 file changed, 370 insertions(+), 55 deletions(-) diff --git a/qa/tasks/radosgw_admin.py b/qa/tasks/radosgw_admin.py index 86903476434fc..8e744e3b6628e 100644 --- a/qa/tasks/radosgw_admin.py +++ b/qa/tasks/radosgw_admin.py @@ -6,18 +6,26 @@ Rgw admin testing against a running instance # # grep '^ *# TESTCASE' | sed 's/^ *# TESTCASE //' # +# to run this standalone: +# python qa/tasks/radosgw_admin.py [USER] HOSTNAME +# import copy import json import logging import time import datetime +import Queue +import bunch + +import sys from cStringIO import StringIO import boto.exception import boto.s3.connection import boto.s3.acl +from boto.utils import RequestHook import httplib2 @@ -27,6 +35,197 @@ from util.rgw import rgwadmin, get_user_summary, get_user_successful_ops log = logging.getLogger(__name__) +def usage_acc_findentry2(entries, user, add=True): + for e in entries: + if e['user'] == user: + return e + if not add: + return None + e = {'user': user, 'buckets': []} + entries.append(e) + return e +def usage_acc_findsum2(summaries, user, add=True): + for e in summaries: + if e['user'] == user: + return e + if not add: + return None + e = {'user': user, 'categories': [], + 'total': {'bytes_received': 0, + 'bytes_sent': 0, 'ops': 0, 'successful_ops': 0 }} + summaries.append(e) + return e +def usage_acc_update2(x, out, b_in, err): + x['bytes_sent'] += b_in + x['bytes_received'] += out + x['ops'] += 1 + if not err: + x['successful_ops'] += 1 +def usage_acc_validate_fields(r, x, x2, what): + q=[] + for field in ['bytes_sent', 'bytes_received', 'ops', 'successful_ops']: + try: + if x2[field] < x[field]: + q.append("field %s: %d < %d" % (field, x2[field], x[field])) + except Exception as ex: + r.append( "missing/bad field " + field + " in " + what + " " + str(ex)) + return + if len(q) > 0: + r.append("incomplete counts in " + what + ": " + ", ".join(q)) +class usage_acc: + def __init__(self): + self.results = {'entries': [], 'summary': []} + def findentry(self, user): + return usage_acc_findentry2(self.results['entries'], user) + def findsum(self, user): + return usage_acc_findsum2(self.results['summary'], user) + def e2b(self, e, bucket, add=True): + for b in e['buckets']: + if b['bucket'] == bucket: + return b + if not add: + return None + b = {'bucket': bucket, 'categories': []} + e['buckets'].append(b) + return b + def c2x(self, c, cat, add=True): + for x in c: + if x['category'] == cat: + return x + if not add: + return None + x = {'bytes_received': 0, 'category': cat, + 'bytes_sent': 0, 'ops': 0, 'successful_ops': 0 } + c.append(x) + return x + def update(self, c, cat, user, out, b_in, err): + x = self.c2x(c, cat) + usage_acc_update2(x, out, b_in, err) + if not err and cat == 'create_bucket' and not x.has_key('owner'): + x['owner'] = user + def make_entry(self, cat, bucket, user, out, b_in, err): + if cat == 'create_bucket' and err: + return + e = self.findentry(user) + b = self.e2b(e, bucket) + self.update(b['categories'], cat, user, out, b_in, err) + s = self.findsum(user) + x = self.c2x(s['categories'], cat) + usage_acc_update2(x, out, b_in, err) + x = s['total'] + usage_acc_update2(x, out, b_in, err) + def generate_make_entry(self): + return lambda cat,bucket,user,out,b_in,err: self.make_entry(cat, bucket, user, out, b_in, err) + def get_usage(self): + return self.results + def compare_results(self, results): + if not results.has_key('entries') or not results.has_key('summary'): + return ['Missing entries or summary'] + r = [] + for e in self.results['entries']: + try: + e2 = usage_acc_findentry2(results['entries'], e['user'], False) + except Exception as ex: + r.append("malformed entry looking for user " + + e['user'] + " " + str(ex)) + break + if e2 == None: + r.append("missing entry for user " + e['user']) + continue + for b in e['buckets']: + c = b['categories'] + if b['bucket'] == 'nosuchbucket': + print "got here" + try: + b2 = self.e2b(e2, b['bucket'], False) + if b2 != None: + c2 = b2['categories'] + except Exception as ex: + r.append("malformed entry looking for bucket " + + b['bucket'] + " in user " + e['user'] + " " + str(ex)) + break + if b2 == None: + r.append("can't find bucket " + b['bucket'] + + " in user " + e['user']) + continue + for x in c: + try: + x2 = self.c2x(c2, x['category'], False) + except Exception as ex: + r.append("malformed entry looking for " + + x['category'] + " in bucket " + b['bucket'] + + " user " + e['user'] + " " + str(ex)) + break + usage_acc_validate_fields(r, x, x2, "entry: category " + + x['category'] + " bucket " + b['bucket'] + + " in user " + e['user']) + for s in self.results['summary']: + c = s['categories'] + try: + s2 = usage_acc_findsum2(results['summary'], s['user'], False) + except Exception as ex: + r.append("malformed summary looking for user " + e['user'] + + " " + str(ex)) + break + if s2 == None: + r.append("missing summary for user " + e['user'] + " " + str(ex)) + continue + try: + c2 = s2['categories'] + except Exception as ex: + r.append("malformed summary missing categories for user " + + e['user'] + " " + str(ex)) + break + for x in c: + try: + x2 = self.c2x(c2, x['category'], False) + except Exception as ex: + r.append("malformed summary looking for " + + x['category'] + " user " + e['user'] + " " + str(ex)) + break + usage_acc_validate_fields(r, x, x2, "summary: category " + + x['category'] + " in user " + e['user']) + x = s['total'] + try: + x2 = s2['total'] + except Exception as ex: + r.append("malformed summary looking for totals for user " + + e['user'] + " " + str(ex)) + break + usage_acc_validate_fields(r, x, x2, "summary: totals for user" + e['user']) + return r + +def ignore_this_entry(cat, bucket, user, out, b_in, err): + pass +class requestlog_queue(): + def __init__(self, add): + self.q = Queue.Queue(1000) + self.adder = add + def handle_request_data(self, request, response, error=False): + now = datetime.datetime.now() + if error: + pass + elif response.status < 200 or response.status >= 400: + error = True + self.q.put(bunch.Bunch({'t': now, 'o': request, 'i': response, 'e': error})) + def clear(self): + with self.q.mutex: + self.q.queue.clear() + def log_and_clear(self, cat, bucket, user, add_entry = None): + while not self.q.empty(): + j = self.q.get() + bytes_out = 0 + if 'Content-Length' in j.o.headers: + bytes_out = int(j.o.headers['Content-Length']) + bytes_in = 0 + if 'content-length' in j.i.msg.dict: + bytes_in = int(j.i.msg.dict['content-length']) + log.info('RL: %s %s %s bytes_out=%d bytes_in=%d failed=%r' + % (cat, bucket, user, bytes_out, bytes_in, j.e)) + if add_entry == None: + add_entry = self.adder + add_entry(cat, bucket, user, bytes_out, bytes_in, j.e) + def create_presigned_url(conn, method, bucket_name, key_name, expiration): return conn.generate_url(expires_in=expiration, method=method, @@ -119,8 +318,17 @@ def task(ctx, config): calling_format=boto.s3.connection.OrdinaryCallingFormat(), ) + acc = usage_acc() + rl = requestlog_queue(acc.generate_make_entry()) + connection.set_request_hook(rl) + connection2.set_request_hook(rl) + # legend (test cases can be easily grep-ed out) # TESTCASE 'testname','object','method','operation','assertion' + + # TESTCASE 'usage-show0' 'usage' 'show' 'all usage' 'succeeds' + (err, summary0) = rgwadmin(ctx, client, ['usage', 'show'], check_status=True) + # TESTCASE 'info-nosuch','user','info','non-existent user','fails' (err, out) = rgwadmin(ctx, client, ['user', 'info', '--uid', user1]) assert err @@ -266,11 +474,19 @@ def task(ctx, config): # create a first bucket bucket = connection.create_bucket(bucket_name) + rl.log_and_clear("create_bucket", bucket_name, user1) + # TESTCASE 'bucket-list','bucket','list','one bucket','succeeds, expected list' (err, out) = rgwadmin(ctx, client, ['bucket', 'list', '--uid', user1], check_status=True) assert len(out) == 1 assert out[0] == bucket_name + bucket_list = connection.get_all_buckets() + assert len(bucket_list) == 1 + assert bucket_list[0].name == bucket_name + + rl.log_and_clear("list_buckets", '', user1) + # TESTCASE 'bucket-list-all','bucket','list','all buckets','succeeds, expected list' (err, out) = rgwadmin(ctx, client, ['bucket', 'list'], check_status=True) assert len(out) >= 1 @@ -278,8 +494,11 @@ def task(ctx, config): # TESTCASE 'max-bucket-limit,'bucket','create','4 buckets','5th bucket fails due to max buckets == 4' bucket2 = connection.create_bucket(bucket_name + '2') + rl.log_and_clear("create_bucket", bucket_name + '2', user1) bucket3 = connection.create_bucket(bucket_name + '3') + rl.log_and_clear("create_bucket", bucket_name + '3', user1) bucket4 = connection.create_bucket(bucket_name + '4') + rl.log_and_clear("create_bucket", bucket_name + '4', user1) # the 5th should fail. failed = False try: @@ -287,11 +506,15 @@ def task(ctx, config): except Exception: failed = True assert failed + rl.log_and_clear("create_bucket", bucket_name + '5', user1) # delete the buckets bucket2.delete() + rl.log_and_clear("delete_bucket", bucket_name + '2', user1) bucket3.delete() + rl.log_and_clear("delete_bucket", bucket_name + '3', user1) bucket4.delete() + rl.log_and_clear("delete_bucket", bucket_name + '4', user1) # TESTCASE 'bucket-stats3','bucket','stats','new empty bucket','succeeds, empty list' (err, out) = rgwadmin(ctx, client, [ @@ -307,6 +530,7 @@ def task(ctx, config): # use some space key = boto.s3.key.Key(bucket) key.set_contents_from_string('one') + rl.log_and_clear("put_obj", bucket_name, user1) # TESTCASE 'bucket-stats5','bucket','stats','after creating key','succeeds, lists one non-empty object' (err, out) = rgwadmin(ctx, client, [ @@ -317,6 +541,7 @@ def task(ctx, config): # reclaim it key.delete() + rl.log_and_clear("delete_obj", bucket_name, user1) # TESTCASE 'bucket unlink', 'bucket', 'unlink', 'unlink bucket from user', 'fails', 'access denied error' (err, out) = rgwadmin(ctx, client, @@ -344,9 +569,11 @@ def task(ctx, config): denied = True assert not denied + rl.log_and_clear("put_obj", bucket_name, user1) # delete the object key.delete() + rl.log_and_clear("delete_obj", bucket_name, user1) # link the bucket to another user (err, out) = rgwadmin(ctx, client, ['metadata', 'get', 'bucket:{n}'.format(n=bucket_name)], @@ -383,6 +610,17 @@ def task(ctx, config): object_name = 'four' key = boto.s3.key.Key(bucket, object_name) key.set_contents_from_string(object_name) + rl.log_and_clear("put_obj", bucket_name, user1) + + # fetch it too (for usage stats presently) + s = key.get_contents_as_string() + rl.log_and_clear("get_obj", bucket_name, user1) + assert s == object_name + # list bucket too (for usage stats presently) + keys = list(bucket.list()) + rl.log_and_clear("list_bucket", bucket_name, user1) + assert len(keys) == 1 + assert keys[0].name == object_name # now delete it (err, out) = rgwadmin(ctx, client, @@ -428,72 +666,22 @@ def task(ctx, config): # TODO: show log by bucket+date - # need to wait for all usage data to get flushed, should take up to 30 seconds - timestamp = time.time() - while time.time() - timestamp <= (20 * 60): # wait up to 20 minutes - (err, out) = rgwadmin(ctx, client, ['usage', 'show', '--categories', 'delete_obj']) # last operation we did is delete obj, wait for it to flush - if get_user_successful_ops(out, user1) > 0: - break - time.sleep(1) - - assert time.time() - timestamp <= (20 * 60) - - # TESTCASE 'usage-show' 'usage' 'show' 'all usage' 'succeeds' - (err, out) = rgwadmin(ctx, client, ['usage', 'show'], check_status=True) - assert len(out['entries']) > 0 - assert len(out['summary']) > 0 - - user_summary = get_user_summary(out, user1) - - total = user_summary['total'] - assert total['successful_ops'] > 0 - - # TESTCASE 'usage-show2' 'usage' 'show' 'user usage' 'succeeds' - (err, out) = rgwadmin(ctx, client, ['usage', 'show', '--uid', user1], - check_status=True) - assert len(out['entries']) > 0 - assert len(out['summary']) > 0 - user_summary = out['summary'][0] - for entry in user_summary['categories']: - assert entry['successful_ops'] > 0 - assert user_summary['user'] == user1 - - # TESTCASE 'usage-show3' 'usage' 'show' 'user usage categories' 'succeeds' - test_categories = ['create_bucket', 'put_obj', 'delete_obj', 'delete_bucket'] - for cat in test_categories: - (err, out) = rgwadmin(ctx, client, ['usage', 'show', '--uid', user1, '--categories', cat], - check_status=True) - assert len(out['summary']) > 0 - user_summary = out['summary'][0] - assert user_summary['user'] == user1 - assert len(user_summary['categories']) == 1 - entry = user_summary['categories'][0] - assert entry['category'] == cat - assert entry['successful_ops'] > 0 - - # the usage flush interval is 30 seconds, wait that much an then some - # to make sure everything has been flushed - time.sleep(35) - - # TESTCASE 'usage-trim' 'usage' 'trim' 'user usage' 'succeeds, usage removed' - (err, out) = rgwadmin(ctx, client, ['usage', 'trim', '--uid', user1], - check_status=True) - (err, out) = rgwadmin(ctx, client, ['usage', 'show', '--uid', user1], - check_status=True) - assert len(out['entries']) == 0 - assert len(out['summary']) == 0 - # TESTCASE 'user-suspend2','user','suspend','existing user','succeeds' (err, out) = rgwadmin(ctx, client, ['user', 'suspend', '--uid', user1], check_status=True) # TESTCASE 'user-suspend3','user','suspend','suspended user','cannot write objects' + denied = False try: key = boto.s3.key.Key(bucket) key.set_contents_from_string('five') except boto.exception.S3ResponseError as e: + denied = True assert e.status == 403 + assert denied + rl.log_and_clear("put_obj", bucket_name, user1) + # TESTCASE 'user-renable2','user','enable','suspended user','succeeds' (err, out) = rgwadmin(ctx, client, ['user', 'enable', '--uid', user1], check_status=True) @@ -501,6 +689,7 @@ def task(ctx, config): # TESTCASE 'user-renable3','user','enable','reenabled user','can write objects' key = boto.s3.key.Key(bucket) key.set_contents_from_string('six') + rl.log_and_clear("put_obj", bucket_name, user1) # TESTCASE 'gc-list', 'gc', 'list', 'get list of objects ready for garbage collection' @@ -509,9 +698,11 @@ def task(ctx, config): big_key = boto.s3.key.Key(bucket) big_key.set_contents_from_string(test_string) + rl.log_and_clear("put_obj", bucket_name, user1) # now delete the head big_key.delete() + rl.log_and_clear("delete_obj", bucket_name, user1) # wait a bit to give the garbage collector time to cycle time.sleep(15) @@ -537,45 +728,56 @@ def task(ctx, config): bucket.delete() except boto.exception.S3ResponseError as e: assert e.status == 409 + rl.log_and_clear("delete_bucket", bucket_name, user1) key.delete() + rl.log_and_clear("delete_obj", bucket_name, user1) bucket.delete() + rl.log_and_clear("delete_bucket", bucket_name, user1) # TESTCASE 'policy', 'bucket', 'policy', 'get bucket policy', 'returns S3 policy' bucket = connection.create_bucket(bucket_name) + rl.log_and_clear("create_bucket", bucket_name, user1) # create an object key = boto.s3.key.Key(bucket) key.set_contents_from_string('seven') + rl.log_and_clear("put_obj", bucket_name, user1) # should be private already but guarantee it key.set_acl('private') + rl.log_and_clear("put_acls", bucket_name, user1) (err, out) = rgwadmin(ctx, client, ['policy', '--bucket', bucket.name, '--object', key.key], check_status=True, format='xml') acl = get_acl(key) + rl.log_and_clear("get_acls", bucket_name, user1) assert acl == out.strip('\n') # add another grantee by making the object public read key.set_acl('public-read') + rl.log_and_clear("put_acls", bucket_name, user1) (err, out) = rgwadmin(ctx, client, ['policy', '--bucket', bucket.name, '--object', key.key], check_status=True, format='xml') acl = get_acl(key) + rl.log_and_clear("get_acls", bucket_name, user1) assert acl == out.strip('\n') # TESTCASE 'rm-bucket', 'bucket', 'rm', 'bucket with objects', 'succeeds' bucket = connection.create_bucket(bucket_name) + rl.log_and_clear("create_bucket", bucket_name, user1) key_name = ['eight', 'nine', 'ten', 'eleven'] for i in range(4): key = boto.s3.key.Key(bucket) key.set_contents_from_string(key_name[i]) + rl.log_and_clear("put_obj", bucket_name, user1) (err, out) = rgwadmin(ctx, client, ['bucket', 'rm', '--bucket', bucket_name, '--purge-objects'], @@ -594,6 +796,7 @@ def task(ctx, config): # TESTCASE 'rm-user','user','rm','existing user','fails, still has buckets' bucket = connection.create_bucket(bucket_name) + rl.log_and_clear("create_bucket", bucket_name, user1) key = boto.s3.key.Key(bucket) (err, out) = rgwadmin(ctx, client, ['user', 'rm', '--uid', user1]) @@ -601,8 +804,78 @@ def task(ctx, config): # TESTCASE 'rm-user2', 'user', 'rm', 'user with data', 'succeeds' bucket = connection.create_bucket(bucket_name) + rl.log_and_clear("create_bucket", bucket_name, user1) key = boto.s3.key.Key(bucket) key.set_contents_from_string('twelve') + rl.log_and_clear("put_obj", bucket_name, user1) + + time.sleep(35) + + # need to wait for all usage data to get flushed, should take up to 30 seconds + timestamp = time.time() + while time.time() - timestamp <= (2 * 60): # wait up to 20 minutes + (err, out) = rgwadmin(ctx, client, ['usage', 'show', '--categories', 'delete_obj']) # one of the operations we did is delete_obj, should be present. + if get_user_successful_ops(out, user1) > 0: + break + time.sleep(1) + + assert time.time() - timestamp <= (20 * 60) + + # TESTCASE 'usage-show' 'usage' 'show' 'all usage' 'succeeds' + (err, out) = rgwadmin(ctx, client, ['usage', 'show'], check_status=True) + assert len(out['entries']) > 0 + assert len(out['summary']) > 0 + + r = acc.compare_results(out) + if len(r) != 0: + sys.stderr.write(("\n".join(r))+"\n") + assert(len(r) == 0) + + user_summary = get_user_summary(out, user1) + + total = user_summary['total'] + assert total['successful_ops'] > 0 + + # TESTCASE 'usage-show2' 'usage' 'show' 'user usage' 'succeeds' + (err, out) = rgwadmin(ctx, client, ['usage', 'show', '--uid', user1], + check_status=True) + assert len(out['entries']) > 0 + assert len(out['summary']) > 0 + user_summary = out['summary'][0] + for entry in user_summary['categories']: + assert entry['successful_ops'] > 0 + assert user_summary['user'] == user1 + + # TESTCASE 'usage-show3' 'usage' 'show' 'user usage categories' 'succeeds' + test_categories = ['create_bucket', 'put_obj', 'delete_obj', 'delete_bucket'] + for cat in test_categories: + (err, out) = rgwadmin(ctx, client, ['usage', 'show', '--uid', user1, '--categories', cat], + check_status=True) + assert len(out['summary']) > 0 + user_summary = out['summary'][0] + assert user_summary['user'] == user1 + assert len(user_summary['categories']) == 1 + entry = user_summary['categories'][0] + assert entry['category'] == cat + assert entry['successful_ops'] > 0 + + # should be all through with connection. (anything using connection + # should be BEFORE the usage stuff above.) + rl.log_and_clear("(before-close)", '-', '-', ignore_this_entry) + connection.close() + connection = None + + # the usage flush interval is 30 seconds, wait that much an then some + # to make sure everything has been flushed + time.sleep(35) + + # TESTCASE 'usage-trim' 'usage' 'trim' 'user usage' 'succeeds, usage removed' + (err, out) = rgwadmin(ctx, client, ['usage', 'trim', '--uid', user1], + check_status=True) + (err, out) = rgwadmin(ctx, client, ['usage', 'show', '--uid', user1], + check_status=True) + assert len(out['entries']) == 0 + assert len(out['summary']) == 0 (err, out) = rgwadmin(ctx, client, ['user', 'rm', '--uid', user1, '--purge-data' ], @@ -638,3 +911,45 @@ def task(ctx, config): (err, out) = rgwadmin(ctx, client, ['zone', 'get','--rgw-zone','default']) assert len(out) > 0 assert len(out['placement_pools']) == orig_placement_pools + 1 + + zonecmd = ['zone', 'placement', 'rm', + '--rgw-zone', 'default', + '--placement-id', 'new-placement'] + + (err, out) = rgwadmin(ctx, client, zonecmd, check_status=True) + +import sys +from tasks.radosgw_admin import task +from teuthology.config import config +from teuthology.orchestra import cluster, remote +import argparse; + +def main(): + if len(sys.argv) == 3: + user = sys.argv[1] + "@" + host = sys.argv[2] + elif len(sys.argv) == 2: + user = "" + host = sys.argv[1] + else: + sys.stderr.write("usage: radosgw_admin.py [user] host\n") + exit(1) + client0 = remote.Remote(user + host) + ctx = config + ctx.cluster=cluster.Cluster(remotes=[(client0, + [ 'ceph.client.rgw.%s' % (host), ]),]) + + ctx.rgw = argparse.Namespace() + endpoints = {} + endpoints['ceph.client.rgw.%s' % host] = (host, 80) + ctx.rgw.role_endpoints = endpoints + ctx.rgw.realm = None + ctx.rgw.regions = {'region0': { 'api name': 'api1', + 'is master': True, 'master zone': 'r0z0', + 'zones': ['r0z0', 'r0z1'] }} + ctx.rgw.config = {'ceph.client.rgw.%s' % host: {'system user': {'name': '%s-system-user' % host}}} + task(config, None) + exit() + +if __name__ == '__main__': + main() -- 2.39.5