From bbc5d486dbddf01cab98f5dbef88ac4500733482 Mon Sep 17 00:00:00 2001 From: Yehuda Sadeh Date: Wed, 26 Apr 2017 13:02:54 -0700 Subject: [PATCH] test/rgw/test_multi: initial es functional tests Signed-off-by: Yehuda Sadeh --- src/test/rgw/rgw_multi/tools.py | 82 +++++++++++++++++++++++++ src/test/rgw/rgw_multi/zone_es.py | 99 ++++++++++++++++++++----------- 2 files changed, 145 insertions(+), 36 deletions(-) create mode 100644 src/test/rgw/rgw_multi/tools.py diff --git a/src/test/rgw/rgw_multi/tools.py b/src/test/rgw/rgw_multi/tools.py new file mode 100644 index 000000000000..da32516435e4 --- /dev/null +++ b/src/test/rgw/rgw_multi/tools.py @@ -0,0 +1,82 @@ +import json +import boto + +def append_attr_value(d, attr, attrv): + if attrv and len(str(attrv)) > 0: + d[attr] = attrv + +def append_attr(d, k, attr): + try: + attrv = getattr(k, attr) + except: + return + append_attr_value(d, attr, attrv) + +def get_attrs(k, attrs): + d = {} + for a in attrs: + append_attr(d, k, a) + + return d + +def append_query_arg(s, n, v): + if not v: + return s + nv = '{n}={v}'.format(n=n, v=v) + if not s: + return nv + return '{s}&{nv}'.format(s=s, nv=nv) + +class KeyJSONEncoder(boto.s3.key.Key): + @staticmethod + def default(k, versioned=False): + attrs = ['bucket', 'name', 'size', 'last_modified', 'metadata', 'cache_control', + 'content_type', 'content_disposition', 'content_language', + 'owner', 'storage_class', 'md5', 'version_id', 'encrypted', + 'delete_marker', 'expiry_date', 'VersionedEpoch', 'RgwxTag'] + d = get_attrs(k, attrs) + d['etag'] = k.etag[1:-1] + if versioned: + d['is_latest'] = k.is_latest + return d + +class DeleteMarkerJSONEncoder(boto.s3.key.Key): + @staticmethod + def default(k): + attrs = ['name', 'version_id', 'last_modified', 'owner'] + d = get_attrs(k, attrs) + d['delete_marker'] = True + d['is_latest'] = k.is_latest + return d + +class UserJSONEncoder(boto.s3.user.User): + @staticmethod + def default(k): + attrs = ['id', 'display_name'] + return get_attrs(k, attrs) + +class BucketJSONEncoder(boto.s3.bucket.Bucket): + @staticmethod + def default(k): + attrs = ['name', 'creation_date'] + return get_attrs(k, attrs) + +class BotoJSONEncoder(json.JSONEncoder): + def default(self, obj): + if isinstance(obj, boto.s3.key.Key): + return KeyJSONEncoder.default(obj) + if isinstance(obj, boto.s3.deletemarker.DeleteMarker): + return DeleteMarkerJSONEncoder.default(obj) + if isinstance(obj, boto.s3.user.User): + return UserJSONEncoder.default(obj) + if isinstance(obj, boto.s3.prefix.Prefix): + return (lambda x: {'prefix': x.name})(obj) + if isinstance(obj, boto.s3.bucket.Bucket): + return BucketJSONEncoder.default(obj) + return json.JSONEncoder.default(self, obj) + + +def dump_json(o, cls=BotoJSONEncoder): + return json.dumps(o, cls=cls, indent=4) + + diff --git a/src/test/rgw/rgw_multi/zone_es.py b/src/test/rgw/rgw_multi/zone_es.py index edb08cfee1a0..662f377ca2d0 100644 --- a/src/test/rgw/rgw_multi/zone_es.py +++ b/src/test/rgw/rgw_multi/zone_es.py @@ -5,6 +5,8 @@ import logging import boto import boto.s3.connection +import dateutil.parser + from nose.tools import eq_ as eq try: from itertools import izip_longest as zip_longest @@ -12,9 +14,15 @@ except ImportError: from itertools import zip_longest from rgw_multi.multisite import * +from rgw_multi.tools import * log = logging.getLogger(__name__) +def get_key_ver(k): + if not k.version_id: + return 'null' + return k.version_id + def check_object_eq(k1, k2, check_extra = True): assert k1 assert k2 @@ -27,13 +35,15 @@ def check_object_eq(k1, k2, check_extra = True): # eq(k1.content_disposition, k2.content_disposition) # eq(k1.content_language, k2.content_language) eq(k1.etag, k2.etag) - eq(k1.last_modified, k2.last_modified) + mtime1 = dateutil.parser.parse(k1.last_modified) + mtime2 = dateutil.parser.parse(k2.last_modified) + assert abs((mtime1 - mtime2).total_seconds()) < 1 # handle different time resolution if check_extra: eq(k1.owner.id, k2.owner.id) eq(k1.owner.display_name, k2.owner.display_name) # eq(k1.storage_class, k2.storage_class) eq(k1.size, k2.size) - eq(k1.version_id, k2.version_id) + eq(get_key_ver(k1), get_key_ver(k2)) # eq(k1.encrypted, k2.encrypted) def make_request(conn, method, bucket, key, query_args, headers): @@ -42,9 +52,6 @@ def make_request(conn, method, bucket, key, query_args, headers): raise boto.exception.S3ResponseError(result.status, result.reason, result.read()) return result -def dump_json(o): - return json.dumps(o, indent=4) - def append_query_arg(s, n, v): if not v: return s @@ -57,12 +64,16 @@ class MDSearch: def __init__(self, conn, bucket_name, query, query_args = None, marker = None): self.conn = conn self.bucket_name = bucket_name or '' + if bucket_name: + self.bucket = boto.s3.bucket.Bucket(name=bucket_name) + else: + self.bucket = None self.query = query self.query_args = query_args self.max_keys = None self.marker = marker - def search(self): + def raw_search(self): q = self.query or '' query_args = append_query_arg(self.query_args, 'query', urllib.quote_plus(q)) if self.max_keys is not None: @@ -75,52 +86,68 @@ class MDSearch: headers = {} result = make_request(self.conn, "GET", bucket=self.bucket_name, key='', query_args=query_args, headers=headers) - return json.loads(result.read()) + l = [] -class ESZoneBucket: - def __init__(self, zone_conn, name, conn): - self.zone_conn = zone_conn - self.name = name - self.conn = conn + result_dict = json.loads(result.read()) - self.bucket = boto.s3.bucket.Bucket(name=name) + for entry in result_dict['Objects']: + k = boto.s3.key.Key(self.bucket, entry['Key']) - def get_all_versions(self): + k.version_id = entry['Instance'] + k.etag = entry['ETag'] + k.owner = boto.s3.user.User(id=entry['Owner']['ID'], display_name=entry['Owner']['DisplayName']) + k.last_modified = entry['LastModified'] + k.size = entry['Size'] + k.content_type = entry['ContentType'] + k.versioned_epoch = entry['VersionedEpoch'] - marker = None - is_done = False + k.metadata = {} + for e in entry['CustomMetadata']: + k.metadata[e['Name']] = e['Value'] + + l.append(k) + return result_dict, l + + def search(self, drain = True, sort = True, sort_key = None): l = [] + is_done = False + while not is_done: - req = MDSearch(self.conn, self.name, 'bucket == ' + self.name, marker=marker) + result, result_keys = self.raw_search() + + l = l + result_keys - result = req.search() + is_done = not (drain and (result['IsTruncated'] == "true")) + marker = result['Marker'] - for entry in result['Objects']: - k = boto.s3.key.Key(self.bucket, entry['Key']) + if sort: + if not sort_key: + sort_key = lambda k: (k.name, -k.versioned_epoch) + l.sort(key = sort_key) - k.version_id = entry['Instance'] - k.etag = entry['ETag'] - k.owner = boto.s3.user.User(id=entry['Owner']['ID'], display_name=entry['Owner']['DisplayName']) - k.last_modified = entry['LastModified'] - k.size = entry['Size'] - k.content_type = entry['ContentType'] - k.versioned_epoch = entry['VersionedEpoch'] + return l - k.metadata = {} - for e in entry['CustomMetadata']: - k.metadata[e['Name']] = e['Value'] - l.append(k) - is_done = (result['IsTruncated'] == "false") - marker = result['Marker'] +class ESZoneBucket: + def __init__(self, zone_conn, name, conn): + self.zone_conn = zone_conn + self.name = name + self.conn = conn + + self.bucket = boto.s3.bucket.Bucket(name=name) + + def get_all_versions(self): + + marker = None + is_done = False - l.sort(key = lambda l: (l.name, -l.versioned_epoch)) + req = MDSearch(self.conn, self.name, 'bucket == ' + self.name, marker=marker) - for k in l: + for k in req.search(): yield k @@ -180,7 +207,7 @@ class ESZone(Zone): for k1, k2 in zip_longest(b1.get_all_versions(), b2.get_all_versions()): if k1 is None: - log.critical('key=%s is missing from zone=%s', k2.name, self.self.name) + log.critical('key=%s is missing from zone=%s', k2.name, self.name) assert False if k2 is None: log.critical('key=%s is missing from zone=%s', k1.name, zone_conn.name) -- 2.47.3