From f08f70985086ce6c76fe0d2386531724128a22b5 Mon Sep 17 00:00:00 2001
From: Samarah
Date: Tue, 24 Jun 2025 15:39:33 +0000
Subject: [PATCH] qa/d4n: Update D4N suite

Signed-off-by: Samarah
---
 qa/workunits/rgw/test_rgw_d4n.py | 240 ++++++++++++++++++-------------
 1 file changed, 141 insertions(+), 99 deletions(-)

diff --git a/qa/workunits/rgw/test_rgw_d4n.py b/qa/workunits/rgw/test_rgw_d4n.py
index c9e08bd439c89..2f84662ac216e 100644
--- a/qa/workunits/rgw/test_rgw_d4n.py
+++ b/qa/workunits/rgw/test_rgw_d4n.py
@@ -1,19 +1,29 @@
 #!/usr/bin/python3

 '''
-This workunits tests the functionality of the D4N read workflow on a small object of size 4.
+This workunit tests the functionality of the D4N read workflow on a small object of size 4 and
+a multipart object of a randomly generated size. Each test runs the following workflow:
+
+1. Upload the object
+2. Perform a GET call (object should be retrieved from backend)
+3. Compare the cached object's contents to the original object
+4. Check the directory contents
+5. Perform another GET call (object should be retrieved from datacache)
+6. Compare the cached object's contents to the original object
+7. Check the directory contents once more
 '''

 import logging as log
 from configobj import ConfigObj
+import botocore
 import boto3
 import redis
 import subprocess
-import json
 import os
 import hashlib
 import string
 import random
+import time

 log.basicConfig(level=log.DEBUG)

@@ -138,7 +148,8 @@ def get_body(response):
         got = got.decode()
     return got

-def test_small_object(r, client, obj):
+def test_small_object(r, client, s3):
+    obj = s3.Object(bucket_name='bkt', key='test.txt')
     test_txt = 'test'

     response_put = obj.put(Body=test_txt)
@@ -156,65 +167,65 @@ def test_small_object(r, client, obj):
     body = get_body(response_get)
     assert(body == "test")

-    data = subprocess.check_output(['ls', '/tmp/rgw_d4n_datacache/'])
-    data = data.decode('latin-1').strip()
-    output = subprocess.check_output(['md5sum', '/tmp/rgw_d4n_datacache/' + data]).decode('latin-1')
-
+    bucketID = subprocess.check_output(['ls', '/tmp/rgw_d4n_datacache/']).decode('latin-1').strip()
+    datacache_path = '/tmp/rgw_d4n_datacache/' + bucketID + '/test.txt/'
+    datacache = subprocess.check_output(['ls', '-a', datacache_path])
+    datacache = datacache.decode('latin-1').strip().splitlines()
+    if '#' in datacache[3]: # datablock key
+        datacache = datacache[3]
+    else:
+        datacache = datacache[2]
+    output = subprocess.check_output(['md5sum', datacache_path + datacache]).decode('latin-1')
     assert(output.splitlines()[0].split()[0] == hashlib.md5("test".encode('utf-8')).hexdigest())

-    data = r.hgetall('bkt_test.txt_0_4')
-    output = subprocess.check_output(['radosgw-admin', 'object', 'stat', '--bucket=bkt', '--object=test.txt'])
-    attrs = json.loads(output.decode('latin-1'))
-
-    # directory entry comparisons
-    assert(data.get('blockID') == '0')
-    assert(data.get('version') == attrs.get('tag'))
-    assert(data.get('size') == '4')
-    assert(data.get('globalWeight') == '0')
-    assert(data.get('blockHosts') == '127.0.0.1:6379')
-    assert(data.get('objName') == 'test.txt')
-    assert(data.get('bucketName') == 'bkt')
-    assert(data.get('creationTime') == attrs.get('mtime'))
-    assert(data.get('dirty') == '0')
-    assert(data.get('objHosts') == '')
-
-    # repopulate cache
-    response_put = obj.put(Body=test_txt)
-    assert(response_put.get('ResponseMetadata').get('HTTPStatusCode') == 200)
+    data = {}
+    for entry in r.scan_iter("*_test.txt_0_4"):
+        data = r.hgetall(entry)
+
+        # directory entry comparisons
+        assert(data.get('blockID') == '0')
+        assert(data.get('deleteMarker') == '0')
+        assert(data.get('size') == '4')
+        assert(data.get('globalWeight') == '0')
+        assert(data.get('objName') == 'test.txt')
+        assert(data.get('bucketName') == bucketID)
+        assert(data.get('dirty') == '0')
+        assert(data.get('hosts') == '127.0.0.1:6379')

     # second get call
     response_get = obj.get()
     assert(response_get.get('ResponseMetadata').get('HTTPStatusCode') == 200)

     # check logs to ensure object was retrieved from cache
-    res = subprocess.call(['grep', '"SSDCache: get_async(): ::aio_read(), ret=0"', '/var/log/ceph/rgw.ceph.client.0.log'])
+    oid_in_cache = bucketID + "#" + data.get('version') + "#test.txt#0#" + data.get('size')
+    res = subprocess.call(['grep', '"D4NFilterObject::iterate:: iterate(): READ FROM CACHE: oid="' + oid_in_cache, '/var/log/ceph/rgw.ceph.client.0.log'])
     assert(res >= 1)

     # retrieve and compare cache contents
     body = get_body(response_get)
     assert(body == "test")

-    data = subprocess.check_output(['ls', '/tmp/rgw_d4n_datacache/'])
-    data = data.decode('latin-1').strip()
-    output = subprocess.check_output(['md5sum', '/tmp/rgw_d4n_datacache/' + data]).decode('latin-1')
-
+    datacache = subprocess.check_output(['ls', '-a', datacache_path])
+    datacache = datacache.decode('latin-1').strip().splitlines()
+    if '#' in datacache[3]: # datablock key
+        datacache = datacache[3]
+    else:
+        datacache = datacache[2]
+    output = subprocess.check_output(['md5sum', datacache_path + datacache]).decode('latin-1')
     assert(output.splitlines()[0].split()[0] == hashlib.md5("test".encode('utf-8')).hexdigest())

-    data = r.hgetall('bkt_test.txt_0_4')
-    output = subprocess.check_output(['radosgw-admin', 'object', 'stat', '--bucket=bkt', '--object=test.txt'])
-    attrs = json.loads(output.decode('latin-1'))
-
-    # directory entries should remain consistent
-    assert(data.get('blockID') == '0')
-    assert(data.get('version') == attrs.get('tag'))
-    assert(data.get('size') == '4')
-    assert(data.get('globalWeight') == '0')
-    assert(data.get('blockHosts') == '127.0.0.1:6379')
-    assert(data.get('objName') == 'test.txt')
-    assert(data.get('bucketName') == 'bkt')
-    assert(data.get('creationTime') == attrs.get('mtime'))
-    assert(data.get('dirty') == '0')
-    assert(data.get('objHosts') == '')
+    for entry in r.scan_iter("*_test.txt_0_4"):
+        data = r.hgetall(entry)
+
+        # directory entries should remain consistent
+        assert(data.get('blockID') == '0')
+        assert(data.get('deleteMarker') == '0')
+        assert(data.get('size') == '4')
+        assert(data.get('globalWeight') == '0')
+        assert(data.get('objName') == 'test.txt')
+        assert(data.get('bucketName') == bucketID)
+        assert(data.get('dirty') == '0')
+        assert(data.get('hosts') == '127.0.0.1:6379')

     r.flushall()

@@ -225,13 +236,17 @@ def test_large_object(r, client, s3):
     objlen = 30 * 1024 * 1024
     metadata = {'foo': 'bar'}

-    (upload_id, data, parts) = _multipart_upload(bucket_name=bucket_name, key=key, size=objlen, client=client, content_type=content_type, metadata=metadata)
+    (upload_id, multipart_data, parts) = _multipart_upload(bucket_name=bucket_name, key=key, size=objlen, client=client, content_type=content_type, metadata=metadata)
     client.complete_multipart_upload(Bucket=bucket_name, Key=key, UploadId=upload_id, MultipartUpload={'Parts': parts})

     file_path = os.path.dirname(__file__)+'mymultipart'

     # first get
-    s3.Object(bucket_name, key).download_file(file_path)
+    try:
+        s3.Object(bucket_name, key).download_file(file_path)
+    except botocore.exceptions.ClientError as e:
+        log.error("ERROR: " + str(e))
+        raise

     # check logs to ensure object was retrieved from storage backend
     res = subprocess.call(['grep', '"D4NFilterObject::iterate:: iterate(): Fetching object from backend store"', '/var/log/ceph/rgw.ceph.client.0.log'])
@@ -239,80 +254,103 @@
     # retrieve and compare cache contents
     with open(file_path, 'r') as body:
-        assert(body.read() == data)
+        assert(body.read() == multipart_data)

-    datacache_path = '/tmp/rgw_d4n_datacache/'
-    datacache = subprocess.check_output(['ls', datacache_path])
-    datacache = datacache.decode('latin-1').splitlines()
+    time.sleep(0.1)
+    bucketID = subprocess.check_output(['ls', '/tmp/rgw_d4n_datacache/']).decode('latin-1').strip()
+    datacache_path = '/tmp/rgw_d4n_datacache/' + bucketID + '/mymultipart/'
+    datacache = subprocess.check_output(['ls', '-a', datacache_path])
+    datacache = datacache.decode('latin-1').splitlines()[2:]

     for file in datacache:
-        ofs = int(file.split("_")[3])
-        size = int(file.split("_")[4])
-        output = subprocess.check_output(['md5sum', datacache_path + file]).decode('latin-1')
-        assert(output.splitlines()[0].split()[0] == hashlib.md5(data[ofs:ofs+size].encode('utf-8')).hexdigest())
+        if '#' in file: # data blocks
+            ofs = int(file.split("#")[1])
+            size = file.split("#")[2]
+            if '_' in file: # account for temp files
+                size = size.split("_")[0]

-    output = subprocess.check_output(['radosgw-admin', 'object', 'stat', '--bucket=bkt', '--object=mymultipart'])
-    attrs = json.loads(output.decode('latin-1'))
+            output = subprocess.check_output(['md5sum', datacache_path + file]).decode('latin-1')
+            assert(output.splitlines()[0].split()[0] == hashlib.md5(multipart_data[ofs:ofs+int(size)].encode('utf-8')).hexdigest())

-    for entry in r.scan_iter("bkt_mymultipart_*"):
+    data = {}
+    for entry in r.scan_iter("*_mymultipart_*"):
         data = r.hgetall(entry)
-        name = entry.split("_")
+        entry_name = entry.split("_")

         # directory entry comparisons
-        assert(data.get('blockID') == name[2])
-        assert(data.get('version') == attrs.get('tag'))
-        assert(data.get('size') == name[3])
+        if len(entry_name) == 6: # versioned block
+            assert(data.get('blockID') == entry_name[4])
+            assert(data.get('deleteMarker') == '0')
+            assert(data.get('size') == entry_name[5])
+            assert(data.get('globalWeight') == '0')
+            assert(data.get('objName') == '_:null_mymultipart')
+            assert(data.get('bucketName') == bucketID)
+            assert(data.get('dirty') == '0')
+            assert(data.get('hosts') == '127.0.0.1:6379')
+            continue
+
+        assert(data.get('blockID') == entry_name[2])
+        assert(data.get('deleteMarker') == '0')
+        assert(data.get('size') == entry_name[3])
         assert(data.get('globalWeight') == '0')
-        assert(data.get('blockHosts') == '127.0.0.1:6379')
         assert(data.get('objName') == 'mymultipart')
-        assert(data.get('bucketName') == 'bkt')
-        assert(data.get('creationTime') == attrs.get('mtime'))
+        assert(data.get('bucketName') == bucketID)
         assert(data.get('dirty') == '0')
-        assert(data.get('objHosts') == '')
-
-    # repopulate cache
-    (upload_id, data, parts) = _multipart_upload(bucket_name=bucket_name, key=key, size=objlen, client=client, content_type=content_type, metadata=metadata)
-    client.complete_multipart_upload(Bucket=bucket_name, Key=key, UploadId=upload_id, MultipartUpload={'Parts': parts})
+        assert(data.get('hosts') == '127.0.0.1:6379')

-    #second get
-    s3.Object(bucket_name, key).download_file(file_path)
+    # second get
+    try:
+        s3.Object(bucket_name, key).download_file(file_path)
+    except botocore.exceptions.ClientError as e:
+        log.error("ERROR: " + str(e))
+        raise

     # check logs to ensure object was retrieved from cache
-    res = subprocess.call(['grep', '"SSDCache: get_async(): ::aio_read(), ret=0"', '/var/log/ceph/rgw.ceph.client.0.log'])
+    oid_in_cache = bucketID + "#" + data.get('version') + "#mymultipart#0#" + data.get('size')
+    res = subprocess.call(['grep', '"D4NFilterObject::iterate:: iterate(): READ FROM CACHE: oid="' + oid_in_cache, '/var/log/ceph/rgw.ceph.client.0.log'])
     assert(res >= 1)

     # retrieve and compare cache contents
     with open(file_path, 'r') as body:
-        assert(body.read() == data)
+        assert(body.read() == multipart_data)

-    datacache_path = '/tmp/rgw_d4n_datacache/'
-    datacache = subprocess.check_output(['ls', datacache_path])
-    datacache = datacache.decode('latin-1').splitlines()
+    datacache = subprocess.check_output(['ls', '-a', datacache_path])
+    datacache = datacache.decode('latin-1').splitlines()[2:]

     for file in datacache:
-        ofs = int(file.split("_")[3])
-        size = int(file.split("_")[4])
-        output = subprocess.check_output(['md5sum', datacache_path + file]).decode('latin-1')
-        assert(output.splitlines()[0].split()[0] == hashlib.md5(data[ofs:ofs+size].encode('utf-8')).hexdigest())
-
-    output = subprocess.check_output(['radosgw-admin', 'object', 'stat', '--bucket=bkt', '--object=mymultipart'])
-    attrs = json.loads(output.decode('latin-1'))
+        if '#' in file: # data blocks
+            ofs = int(file.split("#")[1])
+            size = file.split("#")[2]
+            if '_' in file: # account for temp files
+                size = size.split("_")[0]

-    for key in r.scan_iter("bkt_mymultipart_*"):
-        data = r.hgetall(key)
-        name = key.split("_")
+            output = subprocess.check_output(['md5sum', datacache_path + file]).decode('latin-1')
+            assert(output.splitlines()[0].split()[0] == hashlib.md5(multipart_data[ofs:ofs+int(size)].encode('utf-8')).hexdigest())

-        # directory entry comparisons
-        assert(data.get('blockID') == name[2])
-        assert(data.get('version') == attrs.get('tag'))
-        assert(data.get('size') == name[3])
+    for entry in r.scan_iter("*_mymultipart_*"):
+        data = r.hgetall(entry)
+        entry_name = entry.split("_")
+
+        # directory entries should remain consistent
+        if len(entry_name) == 6: # versioned block
+            assert(data.get('blockID') == entry_name[4])
+            assert(data.get('deleteMarker') == '0')
+            assert(data.get('size') == entry_name[5])
+            assert(data.get('globalWeight') == '0')
+            assert(data.get('objName') == '_:null_mymultipart')
+            assert(data.get('bucketName') == bucketID)
+            assert(data.get('dirty') == '0')
+            assert(data.get('hosts') == '127.0.0.1:6379')
+            continue
+
+        assert(data.get('blockID') == entry_name[2])
+        assert(data.get('deleteMarker') == '0')
+        assert(data.get('size') == entry_name[3])
         assert(data.get('globalWeight') == '0')
-        assert(data.get('blockHosts') == '127.0.0.1:6379')
         assert(data.get('objName') == 'mymultipart')
-        assert(data.get('bucketName') == 'bkt')
-        assert(data.get('creationTime') == attrs.get('mtime'))
+        assert(data.get('bucketName') == bucketID)
         assert(data.get('dirty') == '0')
-        assert(data.get('objHosts') == '')
+        assert(data.get('hosts') == '127.0.0.1:6379')

     r.flushall()

@@ -346,14 +384,13 @@ def main():

     bucket = s3.Bucket('bkt')
     bucket.create()
-    obj = s3.Object(bucket_name='bkt', key='test.txt')

     # Check for Redis instance
     try:
         connection = redis.Redis(host='localhost', port=6379, db=0, decode_responses=True)
         connection.ping()
     except:
-        log.debug("ERROR: Redis instance not running.")
+        log.error("ERROR: Redis instance not running.")
         raise

     # Create s3cmd config
@@ -363,10 +400,15 @@ def main():
     r = redis.Redis(host='localhost', port=6379, db=0, decode_responses=True)

     # Run small object test
-    test_small_object(r, client, obj)
+    test_small_object(r, client, s3)

     # Run large object test
     test_large_object(r, client, s3)
+
+    # close filter client
+    filter_client = [client for client in r.client_list()
+                     if client.get('name') in ['D4N.Filter']]
+    r.client_kill_filter(_id=filter_client[0].get('id'))

     log.info("D4NFilterTest completed.")
-- 
2.39.5
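
---

For reviewers who want to exercise the read path outside a teuthology run, the workflow both tests follow can be reproduced with a short standalone script. This is a minimal sketch, not part of the patch: the endpoint URL and the access/secret keys are assumptions that must match the local RGW-with-D4N setup, while the bucket name, object key, Redis address, key pattern, and directory-entry fields mirror the workunit above.

    #!/usr/bin/python3
    # Sketch of the D4N read-twice check. Assumed (not from the patch):
    # RGW with the D4N filter listens on http://localhost:8000 and accepts
    # the placeholder credentials below.
    import boto3
    import redis

    s3 = boto3.resource('s3',
                        endpoint_url='http://localhost:8000',  # assumed endpoint
                        aws_access_key_id='test',              # assumed key
                        aws_secret_access_key='test')          # assumed secret

    s3.Bucket('bkt').create()
    obj = s3.Object(bucket_name='bkt', key='test.txt')
    obj.put(Body='test')

    # First GET should be served from the backend store, the second from
    # the D4N datacache.
    for _ in range(2):
        assert obj.get()['Body'].read().decode() == 'test'

    # Dump the block directory entries the filter wrote into Redis; the key
    # pattern and hash fields are the ones the workunit asserts on.
    r = redis.Redis(host='localhost', port=6379, db=0, decode_responses=True)
    for entry in r.scan_iter('*_test.txt_0_4'):
        data = r.hgetall(entry)
        print(entry, {k: data.get(k) for k in
                      ('blockID', 'version', 'size', 'dirty', 'hosts')})

Unlike the workunit, the sketch only prints the directory entries instead of asserting on them, since the expected bucket ID is only known at runtime.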