]> git.apps.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
qa/d4n: Add test for large object
authorSamarah <samarah.uriarte@ibm.com>
Wed, 21 Feb 2024 15:55:26 +0000 (15:55 +0000)
committerPritha Srivastava <prsrivas@redhat.com>
Tue, 2 Apr 2024 15:54:52 +0000 (21:24 +0530)
Signed-off-by: Samarah <samarah.uriarte@ibm.com>
qa/workunits/rgw/test_rgw_d4n.py

index 440938db7af64bfc10ff64f764f85de722fa5896..c9e08bd439c8900654fa957defd28ffa663fe65c 100644 (file)
@@ -12,6 +12,8 @@ import subprocess
 import json
 import os
 import hashlib
+import string
+import random
 
 log.basicConfig(level=log.DEBUG)
 
@@ -78,6 +80,52 @@ def create_s3cmd_config(path, proto):
     f.close()
     log.info("s3cmd config written")
 
+def generate_random(size, part_size=5*1024*1024):
+    """
+    Generate the specified number random data.
+    (actually each MB is a repetition of the first KB)
+    """
+    chunk = 1024
+    allowed = string.ascii_letters
+    for x in range(0, size, part_size):
+        strpart = ''.join([allowed[random.randint(0, len(allowed) - 1)] for _ in range(chunk)])
+        s = ''
+        left = size - x
+        this_part_size = min(left, part_size)
+        for y in range(this_part_size // chunk):
+            s = s + strpart
+        if this_part_size > len(s):
+            s = s + strpart[0:this_part_size - len(s)]
+        yield s
+        if (x == size):
+            return
+
+def _multipart_upload(bucket_name, key, size, part_size=5*1024*1024, client=None, content_type=None, metadata=None, resend_parts=[]):
+    """
+    generate a multi-part upload for a random file of specifed size,
+    if requested, generate a list of the parts
+    return the upload descriptor
+    """
+
+    if content_type == None and metadata == None:
+        response = client.create_multipart_upload(Bucket=bucket_name, Key=key)
+    else:
+        response = client.create_multipart_upload(Bucket=bucket_name, Key=key, Metadata=metadata, ContentType=content_type)
+
+    upload_id = response['UploadId']
+    s = ''
+    parts = []
+    for i, part in enumerate(generate_random(size, part_size)):
+        # part_num is necessary because PartNumber for upload_part and in parts must start at 1 and i starts at 0
+        part_num = i+1
+        s += part
+        response = client.upload_part(UploadId=upload_id, Bucket=bucket_name, Key=key, PartNumber=part_num, Body=part)
+        parts.append({'ETag': response['ETag'].strip('"'), 'PartNumber': part_num})
+        if i in resend_parts:
+            client.upload_part(UploadId=upload_id, Bucket=bucket_name, Key=key, PartNumber=part_num, Body=part)
+
+    return (upload_id, s, parts)
+
 def get_cmd_output(cmd_out):
     out = cmd_out.decode('utf8')
     out = out.strip('\n')
@@ -170,6 +218,104 @@ def test_small_object(r, client, obj):
 
     r.flushall()
 
+def test_large_object(r, client, s3):
+    key="mymultipart"
+    bucket_name="bkt"
+    content_type='text/bla'
+    objlen = 30 * 1024 * 1024
+    metadata = {'foo': 'bar'}
+
+    (upload_id, data, parts) = _multipart_upload(bucket_name=bucket_name, key=key, size=objlen, client=client, content_type=content_type, metadata=metadata)
+    client.complete_multipart_upload(Bucket=bucket_name, Key=key, UploadId=upload_id, MultipartUpload={'Parts': parts})
+
+    file_path = os.path.dirname(__file__)+'mymultipart'
+
+    # first get
+    s3.Object(bucket_name, key).download_file(file_path)
+
+    # check logs to ensure object was retrieved from storage backend
+    res = subprocess.call(['grep', '"D4NFilterObject::iterate:: iterate(): Fetching object from backend store"', '/var/log/ceph/rgw.ceph.client.0.log'])
+    assert(res >= 1)
+
+    # retrieve and compare cache contents
+    with open(file_path, 'r') as body:
+        assert(body.read() == data)
+
+    datacache_path = '/tmp/rgw_d4n_datacache/'
+    datacache = subprocess.check_output(['ls', datacache_path])
+    datacache = datacache.decode('latin-1').splitlines()
+
+    for file in datacache:
+        ofs = int(file.split("_")[3])
+        size = int(file.split("_")[4])
+        output = subprocess.check_output(['md5sum', datacache_path + file]).decode('latin-1')
+        assert(output.splitlines()[0].split()[0] == hashlib.md5(data[ofs:ofs+size].encode('utf-8')).hexdigest())
+
+    output = subprocess.check_output(['radosgw-admin', 'object', 'stat', '--bucket=bkt', '--object=mymultipart'])
+    attrs = json.loads(output.decode('latin-1'))
+
+    for entry in r.scan_iter("bkt_mymultipart_*"):
+        data = r.hgetall(entry)
+        name = entry.split("_")
+
+        # directory entry comparisons
+        assert(data.get('blockID') == name[2])
+        assert(data.get('version') == attrs.get('tag'))
+        assert(data.get('size') == name[3])
+        assert(data.get('globalWeight') == '0')
+        assert(data.get('blockHosts') == '127.0.0.1:6379')
+        assert(data.get('objName') == 'mymultipart')
+        assert(data.get('bucketName') == 'bkt')
+        assert(data.get('creationTime') == attrs.get('mtime'))
+        assert(data.get('dirty') == '0')
+        assert(data.get('objHosts') == '')
+
+    # repopulate cache
+    (upload_id, data, parts) = _multipart_upload(bucket_name=bucket_name, key=key, size=objlen, client=client, content_type=content_type, metadata=metadata)
+    client.complete_multipart_upload(Bucket=bucket_name, Key=key, UploadId=upload_id, MultipartUpload={'Parts': parts})
+
+    #second get
+    s3.Object(bucket_name, key).download_file(file_path)
+
+    # check logs to ensure object was retrieved from cache
+    res = subprocess.call(['grep', '"SSDCache: get_async(): ::aio_read(), ret=0"', '/var/log/ceph/rgw.ceph.client.0.log'])
+    assert(res >= 1)
+
+    # retrieve and compare cache contents
+    with open(file_path, 'r') as body:
+        assert(body.read() == data)
+
+    datacache_path = '/tmp/rgw_d4n_datacache/'
+    datacache = subprocess.check_output(['ls', datacache_path])
+    datacache = datacache.decode('latin-1').splitlines()
+
+    for file in datacache:
+        ofs = int(file.split("_")[3])
+        size = int(file.split("_")[4])
+        output = subprocess.check_output(['md5sum', datacache_path + file]).decode('latin-1')
+        assert(output.splitlines()[0].split()[0] == hashlib.md5(data[ofs:ofs+size].encode('utf-8')).hexdigest())
+
+    output = subprocess.check_output(['radosgw-admin', 'object', 'stat', '--bucket=bkt', '--object=mymultipart'])
+    attrs = json.loads(output.decode('latin-1'))
+
+    for key in r.scan_iter("bkt_mymultipart_*"):
+        data = r.hgetall(key)
+        name = key.split("_")
+
+        # directory entry comparisons
+        assert(data.get('blockID') == name[2])
+        assert(data.get('version') == attrs.get('tag'))
+        assert(data.get('size') == name[3])
+        assert(data.get('globalWeight') == '0')
+        assert(data.get('blockHosts') == '127.0.0.1:6379')
+        assert(data.get('objName') == 'mymultipart')
+        assert(data.get('bucketName') == 'bkt')
+        assert(data.get('creationTime') == attrs.get('mtime'))
+        assert(data.get('dirty') == '0')
+        assert(data.get('objHosts') == '')
+
+    r.flushall()
+
 def main():
     """
     execute the d4n test
@@ -219,6 +365,9 @@ def main():
     # Run small object test
     test_small_object(r, client, obj)
 
+    # Run large object test
+    test_large_object(r, client, s3)
+
     log.info("D4NFilterTest completed.")
 
 main()