From 939735842d0a70b20cbb6d2972b1e519afdc86fa Mon Sep 17 00:00:00 2001
From: Samarah
Date: Wed, 21 Feb 2024 15:55:26 +0000
Subject: [PATCH] qa/d4n: Add test for large object

Signed-off-by: Samarah
---
 qa/workunits/rgw/test_rgw_d4n.py | 149 +++++++++++++++++++++++++++++++
 1 file changed, 149 insertions(+)

diff --git a/qa/workunits/rgw/test_rgw_d4n.py b/qa/workunits/rgw/test_rgw_d4n.py
index 440938db7af..c9e08bd439c 100644
--- a/qa/workunits/rgw/test_rgw_d4n.py
+++ b/qa/workunits/rgw/test_rgw_d4n.py
@@ -12,6 +12,8 @@ import subprocess
 import json
 import os
 import hashlib
+import string
+import random
 
 log.basicConfig(level=log.DEBUG)
 
@@ -78,6 +80,52 @@ def create_s3cmd_config(path, proto):
     f.close()
     log.info("s3cmd config written")
 
+def generate_random(size, part_size=5*1024*1024):
+    """
+    Generate the specified number of bytes of random data.
+    (each part is actually a repetition of its first KB)
+    """
+    chunk = 1024
+    allowed = string.ascii_letters
+    for x in range(0, size, part_size):
+        strpart = ''.join([allowed[random.randint(0, len(allowed) - 1)] for _ in range(chunk)])
+        s = ''
+        left = size - x
+        this_part_size = min(left, part_size)
+        for y in range(this_part_size // chunk):
+            s = s + strpart
+        if this_part_size > len(s):
+            s = s + strpart[0:this_part_size - len(s)]
+        yield s
+        if (x == size):
+            return
+
+def _multipart_upload(bucket_name, key, size, part_size=5*1024*1024, client=None, content_type=None, metadata=None, resend_parts=[]):
+    """
+    Generate a multipart upload for a random file of the specified size.
+    If requested, resend the parts listed in resend_parts.
+    Return the upload descriptor.
+    """
+
+    if content_type is None and metadata is None:
+        response = client.create_multipart_upload(Bucket=bucket_name, Key=key)
+    else:
+        response = client.create_multipart_upload(Bucket=bucket_name, Key=key, Metadata=metadata, ContentType=content_type)
+
+    upload_id = response['UploadId']
+    s = ''
+    parts = []
+    for i, part in enumerate(generate_random(size, part_size)):
+        # part_num is necessary because PartNumber for upload_part and in parts must start at 1, while i starts at 0
+        part_num = i+1
+        s += part
+        response = client.upload_part(UploadId=upload_id, Bucket=bucket_name, Key=key, PartNumber=part_num, Body=part)
+        parts.append({'ETag': response['ETag'].strip('"'), 'PartNumber': part_num})
+        if i in resend_parts:
+            client.upload_part(UploadId=upload_id, Bucket=bucket_name, Key=key, PartNumber=part_num, Body=part)
+
+    return (upload_id, s, parts)
+
 def get_cmd_output(cmd_out):
     out = cmd_out.decode('utf8')
     out = out.strip('\n')
@@ -170,6 +218,104 @@ def test_small_object(r, client, obj):
 
     r.flushall()
 
+def test_large_object(r, client, s3):
+    key = "mymultipart"
+    bucket_name = "bkt"
+    content_type = 'text/bla'
+    objlen = 30 * 1024 * 1024
+    metadata = {'foo': 'bar'}
+
+    (upload_id, data, parts) = _multipart_upload(bucket_name=bucket_name, key=key, size=objlen, client=client, content_type=content_type, metadata=metadata)
+    client.complete_multipart_upload(Bucket=bucket_name, Key=key, UploadId=upload_id, MultipartUpload={'Parts': parts})
+
+    file_path = os.path.join(os.path.dirname(__file__), 'mymultipart')
+
+    # first get
+    s3.Object(bucket_name, key).download_file(file_path)
+
+    # check logs to ensure the object was retrieved from the backend store
+    res = subprocess.call(['grep', '"D4NFilterObject::iterate:: iterate(): Fetching object from backend store"', '/var/log/ceph/rgw.ceph.client.0.log'])
+    assert(res >= 1)
+
+    # retrieve and compare cache contents
+    with open(file_path, 'r') as body:
+        assert(body.read() == data)
+
+    datacache_path = '/tmp/rgw_d4n_datacache/'
+    datacache = subprocess.check_output(['ls', datacache_path])
+    datacache = datacache.decode('latin-1').splitlines()
+
+    for file in datacache:
+        ofs = int(file.split("_")[3])
+        size = int(file.split("_")[4])
+        output = subprocess.check_output(['md5sum', datacache_path + file]).decode('latin-1')
+        assert(output.splitlines()[0].split()[0] == hashlib.md5(data[ofs:ofs+size].encode('utf-8')).hexdigest())
+
+    output = subprocess.check_output(['radosgw-admin', 'object', 'stat', '--bucket=bkt', '--object=mymultipart'])
+    attrs = json.loads(output.decode('latin-1'))
+
+    for entry in r.scan_iter("bkt_mymultipart_*"):
+        data = r.hgetall(entry)
+        name = entry.split("_")
+
+        # directory entry comparisons
+        assert(data.get('blockID') == name[2])
+        assert(data.get('version') == attrs.get('tag'))
+        assert(data.get('size') == name[3])
+        assert(data.get('globalWeight') == '0')
+        assert(data.get('blockHosts') == '127.0.0.1:6379')
+        assert(data.get('objName') == 'mymultipart')
+        assert(data.get('bucketName') == 'bkt')
+        assert(data.get('creationTime') == attrs.get('mtime'))
+        assert(data.get('dirty') == '0')
+        assert(data.get('objHosts') == '')
+
+    # repopulate cache
+    (upload_id, data, parts) = _multipart_upload(bucket_name=bucket_name, key=key, size=objlen, client=client, content_type=content_type, metadata=metadata)
+    client.complete_multipart_upload(Bucket=bucket_name, Key=key, UploadId=upload_id, MultipartUpload={'Parts': parts})
+
+    # second get
+    s3.Object(bucket_name, key).download_file(file_path)
+
+    # check logs to ensure the object was retrieved from the cache
+    res = subprocess.call(['grep', '"SSDCache: get_async(): ::aio_read(), ret=0"', '/var/log/ceph/rgw.ceph.client.0.log'])
+    assert(res >= 1)
+
+    # retrieve and compare cache contents
+    with open(file_path, 'r') as body:
+        assert(body.read() == data)
+
+    datacache_path = '/tmp/rgw_d4n_datacache/'
+    datacache = subprocess.check_output(['ls', datacache_path])
+    datacache = datacache.decode('latin-1').splitlines()
+
+    for file in datacache:
+        ofs = int(file.split("_")[3])
+        size = int(file.split("_")[4])
+        output = subprocess.check_output(['md5sum', datacache_path + file]).decode('latin-1')
+        assert(output.splitlines()[0].split()[0] == hashlib.md5(data[ofs:ofs+size].encode('utf-8')).hexdigest())
+
+    output = subprocess.check_output(['radosgw-admin', 'object', 'stat', '--bucket=bkt', '--object=mymultipart'])
+    attrs = json.loads(output.decode('latin-1'))
+
+    for entry in r.scan_iter("bkt_mymultipart_*"):
+        data = r.hgetall(entry)
+        name = entry.split("_")
+
+        # directory entry comparisons
+        assert(data.get('blockID') == name[2])
+        assert(data.get('version') == attrs.get('tag'))
+        assert(data.get('size') == name[3])
+        assert(data.get('globalWeight') == '0')
+        assert(data.get('blockHosts') == '127.0.0.1:6379')
+        assert(data.get('objName') == 'mymultipart')
+        assert(data.get('bucketName') == 'bkt')
+        assert(data.get('creationTime') == attrs.get('mtime'))
+        assert(data.get('dirty') == '0')
+        assert(data.get('objHosts') == '')
+
+    r.flushall()
+
 def main():
     """
     execute the d4n test
@@ -219,6 +365,9 @@ def main():
     # Run small object test
     test_small_object(r, client, obj)
 
+    # Run large object test
+    test_large_object(r, client, s3)
+
     log.info("D4NFilterTest completed.")
 
 main()
-- 
2.39.5
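
A note for reviewers: the sketch below shows how the new _multipart_upload
helper could be driven on its own, outside of this workunit. It is a minimal
sketch, not part of the patch: the endpoint URL, the credentials, and the
assumption that test_rgw_d4n.py is importable from PYTHONPATH are all
illustrative; the workunit itself wires up its own boto3 client and s3
resource in main().

    import boto3
    from test_rgw_d4n import _multipart_upload  # assumes the module is on PYTHONPATH

    # Hypothetical local RGW endpoint and test credentials.
    client = boto3.client('s3',
                          endpoint_url='http://localhost:8000',
                          aws_access_key_id='test',
                          aws_secret_access_key='test')

    client.create_bucket(Bucket='bkt')

    # 30 MiB object uploaded in the default 5 MiB parts, mirroring test_large_object().
    upload_id, data, parts = _multipart_upload(bucket_name='bkt', key='mymultipart',
                                               size=30*1024*1024, client=client)
    client.complete_multipart_upload(Bucket='bkt', Key='mymultipart',
                                     UploadId=upload_id,
                                     MultipartUpload={'Parts': parts})

    # 'data' holds the exact bytes that were uploaded, so a subsequent GET can
    # be compared against it, exactly as test_large_object() does.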