]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
qa/d4n: Update D4N suite 62982/head
authorSamarah <samarah.uriarte@ibm.com>
Tue, 24 Jun 2025 15:39:33 +0000 (15:39 +0000)
committerSamarah <samarah.uriarte@ibm.com>
Mon, 14 Jul 2025 13:15:26 +0000 (13:15 +0000)
Signed-off-by: Samarah <samarah.uriarte@ibm.com>
qa/workunits/rgw/test_rgw_d4n.py

index c9e08bd439c8900654fa957defd28ffa663fe65c..2f84662ac216eb0ca0057bf64bf959a957dfaa26 100644 (file)
@@ -1,19 +1,29 @@
 #!/usr/bin/python3
 
 '''
-This workunits tests the functionality of the D4N read workflow on a small object of size 4.
+This workunit tests the functionality of the D4N read workflow on a small object of size 4 and 
+a multipart object of a randomly generated size. Each test runs the following workflow:
+
+1. Upload the object
+2. Perform a GET call (object should be retrieved from backend)
+3. Compare the cached object's contents to the original object
+4. Check the directory contents
+5. Perform another GET call (object should be retrieved from datacache)
+6. Compare the cached object's contents to the original object
+7. Check the directory contents once more
 '''
 
 import logging as log
 from configobj import ConfigObj
+import botocore
 import boto3
 import redis
 import subprocess
-import json
 import os
 import hashlib
 import string
 import random
+import time
 
 log.basicConfig(level=log.DEBUG)
 
@@ -138,7 +148,8 @@ def get_body(response):
         got = got.decode()
     return got
 
-def test_small_object(r, client, obj):
+def test_small_object(r, client, s3):
+    obj = s3.Object(bucket_name='bkt', key='test.txt')
     test_txt = 'test'
 
     response_put = obj.put(Body=test_txt)
@@ -156,65 +167,65 @@ def test_small_object(r, client, obj):
     body = get_body(response_get)
     assert(body == "test")
 
-    data = subprocess.check_output(['ls', '/tmp/rgw_d4n_datacache/'])
-    data = data.decode('latin-1').strip()
-    output = subprocess.check_output(['md5sum', '/tmp/rgw_d4n_datacache/' + data]).decode('latin-1')
-
+    bucketID = subprocess.check_output(['ls', '/tmp/rgw_d4n_datacache/']).decode('latin-1').strip()
+    datacache_path = '/tmp/rgw_d4n_datacache/' + bucketID + '/test.txt/'
+    datacache = subprocess.check_output(['ls', '-a', datacache_path])
+    datacache = datacache.decode('latin-1').strip().splitlines()
+    if '#' in datacache[3]: # datablock key
+      datacache = datacache[3]
+    else:
+      datacache = datacache[2]
+    output = subprocess.check_output(['md5sum', datacache_path + datacache]).decode('latin-1')
     assert(output.splitlines()[0].split()[0] == hashlib.md5("test".encode('utf-8')).hexdigest())
 
-    data = r.hgetall('bkt_test.txt_0_4')
-    output = subprocess.check_output(['radosgw-admin', 'object', 'stat', '--bucket=bkt', '--object=test.txt'])
-    attrs = json.loads(output.decode('latin-1'))
-
-    # directory entry comparisons
-    assert(data.get('blockID') == '0')
-    assert(data.get('version') == attrs.get('tag'))
-    assert(data.get('size') == '4')
-    assert(data.get('globalWeight') == '0')
-    assert(data.get('blockHosts') == '127.0.0.1:6379')
-    assert(data.get('objName') == 'test.txt')
-    assert(data.get('bucketName') == 'bkt')
-    assert(data.get('creationTime') == attrs.get('mtime'))
-    assert(data.get('dirty') == '0')
-    assert(data.get('objHosts') == '')
-
-    # repopulate cache
-    response_put = obj.put(Body=test_txt)
-    assert(response_put.get('ResponseMetadata').get('HTTPStatusCode') == 200)
+    data = {}
+    for entry in r.scan_iter("*_test.txt_0_4"):
+        data = r.hgetall(entry)
+
+        # directory entry comparisons
+        assert(data.get('blockID') == '0')
+        assert(data.get('deleteMarker') == '0')
+        assert(data.get('size') == '4')
+        assert(data.get('globalWeight') == '0')
+        assert(data.get('objName') == 'test.txt')
+        assert(data.get('bucketName') == bucketID)
+        assert(data.get('dirty') == '0')
+        assert(data.get('hosts') == '127.0.0.1:6379')
 
     # second get call
     response_get = obj.get()
     assert(response_get.get('ResponseMetadata').get('HTTPStatusCode') == 200)
 
     # check logs to ensure object was retrieved from cache
-    res = subprocess.call(['grep', '"SSDCache: get_async(): ::aio_read(), ret=0"', '/var/log/ceph/rgw.ceph.client.0.log'])
+    oid_in_cache = bucketID + "#" + data.get('version') + "test.txt#0" + data.get('size')
+    res = subprocess.call(['grep', '"D4NFilterObject::iterate:: iterate(): READ FROM CACHE: oid="' + oid_in_cache, '/var/log/ceph/rgw.ceph.client.0.log'])
     assert(res >= 1)
 
     # retrieve and compare cache contents
     body = get_body(response_get)
     assert(body == "test")
 
-    data = subprocess.check_output(['ls', '/tmp/rgw_d4n_datacache/'])
-    data = data.decode('latin-1').strip()
-    output = subprocess.check_output(['md5sum', '/tmp/rgw_d4n_datacache/' + data]).decode('latin-1')
-
+    datacache = subprocess.check_output(['ls', '-a', datacache_path])
+    datacache = datacache.decode('latin-1').strip().splitlines()
+    if '#' in datacache[3]: # datablock key
+      datacache = datacache[3]
+    else:
+      datacache = datacache[2]
+    output = subprocess.check_output(['md5sum', datacache_path + datacache]).decode('latin-1')
     assert(output.splitlines()[0].split()[0] == hashlib.md5("test".encode('utf-8')).hexdigest())
 
-    data = r.hgetall('bkt_test.txt_0_4')
-    output = subprocess.check_output(['radosgw-admin', 'object', 'stat', '--bucket=bkt', '--object=test.txt'])
-    attrs = json.loads(output.decode('latin-1'))
-
-    # directory entries should remain consistent
-    assert(data.get('blockID') == '0')
-    assert(data.get('version') == attrs.get('tag'))
-    assert(data.get('size') == '4')
-    assert(data.get('globalWeight') == '0')
-    assert(data.get('blockHosts') == '127.0.0.1:6379')
-    assert(data.get('objName') == 'test.txt')
-    assert(data.get('bucketName') == 'bkt')
-    assert(data.get('creationTime') == attrs.get('mtime'))
-    assert(data.get('dirty') == '0')
-    assert(data.get('objHosts') == '')
+    for entry in r.scan_iter("*_test.txt_0_4"):
+        data = r.hgetall(entry)
+
+        # directory entries should remain consistent
+        assert(data.get('blockID') == '0')
+        assert(data.get('deleteMarker') == '0')
+        assert(data.get('size') == '4')
+        assert(data.get('globalWeight') == '0')
+        assert(data.get('objName') == 'test.txt')
+        assert(data.get('bucketName') == bucketID)
+        assert(data.get('dirty') == '0')
+        assert(data.get('hosts') == '127.0.0.1:6379')
 
     r.flushall()
 
@@ -225,13 +236,17 @@ def test_large_object(r, client, s3):
     objlen = 30 * 1024 * 1024
     metadata = {'foo': 'bar'}
 
-    (upload_id, data, parts) = _multipart_upload(bucket_name=bucket_name, key=key, size=objlen, client=client, content_type=content_type, metadata=metadata)
+    (upload_id, multipart_data, parts) = _multipart_upload(bucket_name=bucket_name, key=key, size=objlen, client=client, content_type=content_type, metadata=metadata)
     client.complete_multipart_upload(Bucket=bucket_name, Key=key, UploadId=upload_id, MultipartUpload={'Parts': parts})
 
     file_path = os.path.dirname(__file__)+'mymultipart'
 
     # first get
-    s3.Object(bucket_name, key).download_file(file_path)
+    try:
+        s3.Object(bucket_name, key).download_file(file_path)
+    except botocore.exceptions.ClientError as e:
+        log.error("ERROR: " + e)
+        raise
 
     # check logs to ensure object was retrieved from storage backend
     res = subprocess.call(['grep', '"D4NFilterObject::iterate:: iterate(): Fetching object from backend store"', '/var/log/ceph/rgw.ceph.client.0.log'])
@@ -239,80 +254,103 @@ def test_large_object(r, client, s3):
 
     # retrieve and compare cache contents
     with open(file_path, 'r') as body:
-        assert(body.read() == data)
+        assert(body.read() == multipart_data)
 
-    datacache_path = '/tmp/rgw_d4n_datacache/'
-    datacache = subprocess.check_output(['ls', datacache_path])
-    datacache = datacache.decode('latin-1').splitlines()
+    time.sleep(0.1)
+    bucketID = subprocess.check_output(['ls', '/tmp/rgw_d4n_datacache/']).decode('latin-1').strip()
+    datacache_path = '/tmp/rgw_d4n_datacache/' + bucketID + '/mymultipart/'
+    datacache = subprocess.check_output(['ls', '-a', datacache_path])
+    datacache = datacache.decode('latin-1').splitlines()[2:]
 
     for file in datacache:
-        ofs = int(file.split("_")[3])
-        size = int(file.split("_")[4])
-        output = subprocess.check_output(['md5sum', datacache_path + file]).decode('latin-1')
-        assert(output.splitlines()[0].split()[0] == hashlib.md5(data[ofs:ofs+size].encode('utf-8')).hexdigest())
+        if '#' in file: # data blocks
+            ofs = int(file.split("#")[1])
+            size = file.split("#")[2]
+            if '_' in file: # account for temp files
+                size = size.split("_")[0]
 
-    output = subprocess.check_output(['radosgw-admin', 'object', 'stat', '--bucket=bkt', '--object=mymultipart'])
-    attrs = json.loads(output.decode('latin-1'))
+            output = subprocess.check_output(['md5sum', datacache_path + file]).decode('latin-1')
+            assert(output.splitlines()[0].split()[0] == hashlib.md5(multipart_data[ofs:ofs+int(size)].encode('utf-8')).hexdigest())
 
-    for entry in r.scan_iter("bkt_mymultipart_*"):
+    data = {}
+    for entry in r.scan_iter("*_mymultipart_*"):
         data = r.hgetall(entry)
-        name = entry.split("_")
+        entry_name = entry.split("_")
 
         # directory entry comparisons
-        assert(data.get('blockID') == name[2])
-        assert(data.get('version') == attrs.get('tag'))
-        assert(data.get('size') == name[3])
+        if len(entry_name) == 6: # versioned block
+            assert(data.get('blockID') == entry_name[4])
+            assert(data.get('deleteMarker') == '0')
+            assert(data.get('size') == entry_name[5])
+            assert(data.get('globalWeight') == '0')
+            assert(data.get('objName') == '_:null_mymultipart')
+            assert(data.get('bucketName') == bucketID)
+            assert(data.get('dirty') == '0')
+            assert(data.get('hosts') == '127.0.0.1:6379')
+            continue
+
+        assert(data.get('blockID') == entry_name[2])
+        assert(data.get('deleteMarker') == '0')
+        assert(data.get('size') == entry_name[3])
         assert(data.get('globalWeight') == '0')
-        assert(data.get('blockHosts') == '127.0.0.1:6379')
         assert(data.get('objName') == 'mymultipart')
-        assert(data.get('bucketName') == 'bkt')
-        assert(data.get('creationTime') == attrs.get('mtime'))
+        assert(data.get('bucketName') == bucketID)
         assert(data.get('dirty') == '0')
-        assert(data.get('objHosts') == '')
-
-    # repopulate cache
-    (upload_id, data, parts) = _multipart_upload(bucket_name=bucket_name, key=key, size=objlen, client=client, content_type=content_type, metadata=metadata)
-    client.complete_multipart_upload(Bucket=bucket_name, Key=key, UploadId=upload_id, MultipartUpload={'Parts': parts})
+        assert(data.get('hosts') == '127.0.0.1:6379')
 
-    #second get
-    s3.Object(bucket_name, key).download_file(file_path)
+    # second get
+    try:
+        s3.Object(bucket_name, key).download_file(file_path)
+    except botocore.exceptions.ClientError as e:
+        log.error("ERROR: " + e)
+        raise
 
     # check logs to ensure object was retrieved from cache
-    res = subprocess.call(['grep', '"SSDCache: get_async(): ::aio_read(), ret=0"', '/var/log/ceph/rgw.ceph.client.0.log'])
+    oid_in_cache = bucketID + "#" + data.get('version') + "mymultipart#0" + data.get('size')
+    res = subprocess.call(['grep', '"D4NFilterObject::iterate:: iterate(): READ FROM CACHE: oid="' + oid_in_cache, '/var/log/ceph/rgw.ceph.client.0.log'])
     assert(res >= 1)
 
     # retrieve and compare cache contents
     with open(file_path, 'r') as body:
-        assert(body.read() == data)
+        assert(body.read() == multipart_data)
 
-    datacache_path = '/tmp/rgw_d4n_datacache/'
-    datacache = subprocess.check_output(['ls', datacache_path])
-    datacache = datacache.decode('latin-1').splitlines()
+    datacache = subprocess.check_output(['ls', '-a', datacache_path])
+    datacache = datacache.decode('latin-1').splitlines()[2:]
 
     for file in datacache:
-        ofs = int(file.split("_")[3])
-        size = int(file.split("_")[4])
-        output = subprocess.check_output(['md5sum', datacache_path + file]).decode('latin-1')
-        assert(output.splitlines()[0].split()[0] == hashlib.md5(data[ofs:ofs+size].encode('utf-8')).hexdigest())
-
-    output = subprocess.check_output(['radosgw-admin', 'object', 'stat', '--bucket=bkt', '--object=mymultipart'])
-    attrs = json.loads(output.decode('latin-1'))
+        if '#' in file: # data blocks
+            ofs = int(file.split("#")[1])
+            size = file.split("#")[2]
+            if '_' in file: # account for temp files
+                size = size.split("_")[0]
 
-    for key in r.scan_iter("bkt_mymultipart_*"):
-        data = r.hgetall(key)
-        name = key.split("_")
+            output = subprocess.check_output(['md5sum', datacache_path + file]).decode('latin-1')
+            assert(output.splitlines()[0].split()[0] == hashlib.md5(multipart_data[ofs:ofs+int(size)].encode('utf-8')).hexdigest())
 
-        # directory entry comparisons
-        assert(data.get('blockID') == name[2])
-        assert(data.get('version') == attrs.get('tag'))
-        assert(data.get('size') == name[3])
+    for entry in r.scan_iter("*_mymultipart_*"):
+        data = r.hgetall(entry)
+        entry_name = entry.split("_")
+
+        # directory entries should remain consistent
+        if len(entry_name) == 6: # versioned block
+            assert(data.get('blockID') == entry_name[4])
+            assert(data.get('deleteMarker') == '0')
+            assert(data.get('size') == entry_name[5])
+            assert(data.get('globalWeight') == '0')
+            assert(data.get('objName') == '_:null_mymultipart')
+            assert(data.get('bucketName') == bucketID)
+            assert(data.get('dirty') == '0')
+            assert(data.get('hosts') == '127.0.0.1:6379')
+            continue
+
+        assert(data.get('blockID') == entry_name[2])
+        assert(data.get('deleteMarker') == '0')
+        assert(data.get('size') == entry_name[3])
         assert(data.get('globalWeight') == '0')
-        assert(data.get('blockHosts') == '127.0.0.1:6379')
         assert(data.get('objName') == 'mymultipart')
-        assert(data.get('bucketName') == 'bkt')
-        assert(data.get('creationTime') == attrs.get('mtime'))
+        assert(data.get('bucketName') == bucketID)
         assert(data.get('dirty') == '0')
-        assert(data.get('objHosts') == '')
+        assert(data.get('hosts') == '127.0.0.1:6379')
 
     r.flushall()
 
@@ -346,14 +384,13 @@ def main():
 
     bucket = s3.Bucket('bkt')
     bucket.create()
-    obj = s3.Object(bucket_name='bkt', key='test.txt')
 
     # Check for Redis instance
     try:
         connection = redis.Redis(host='localhost', port=6379, db=0, decode_responses=True)
         connection.ping() 
     except:
-        log.debug("ERROR: Redis instance not running.")
+        log.error("ERROR: Redis instance not running.")
         raise
 
     # Create s3cmd config
@@ -363,10 +400,15 @@ def main():
     r = redis.Redis(host='localhost', port=6379, db=0, decode_responses=True)
 
     # Run small object test
-    test_small_object(r, client, obj)
+    test_small_object(r, client, s3)
 
     # Run large object test
     test_large_object(r, client, s3)
+    
+    # close filter client
+    filter_client = [client for client in r.client_list()
+                       if client.get('name') in ['D4N.Filter']]
+    r.client_kill_filter(_id=filter_client[0].get('id'))
 
     log.info("D4NFilterTest completed.")