#!/usr/bin/python3
'''
-This workunits tests the functionality of the D4N read workflow on a small object of size 4.
+This workunit tests the functionality of the D4N read workflow on a small object of size 4 bytes
+and a 30 MiB multipart object with randomly generated contents. Each test runs the following workflow:
+
+1. Upload the object
+2. Perform a GET call (object should be retrieved from backend)
+3. Compare the cached object's contents to the original object
+4. Check the directory contents
+5. Perform another GET call (object should be retrieved from datacache)
+6. Compare the cached object's contents to the original object
+7. Check the directory contents once more
'''
import logging as log
from configobj import ConfigObj
+import botocore
import boto3
import redis
import subprocess
-import json
import os
import hashlib
import string
import random
+import time
log.basicConfig(level=log.DEBUG)
got = got.decode()
return got
-def test_small_object(r, client, obj):
+def test_small_object(r, client, s3):
+ obj = s3.Object(bucket_name='bkt', key='test.txt')
test_txt = 'test'
response_put = obj.put(Body=test_txt)
body = get_body(response_get)
assert(body == "test")
- data = subprocess.check_output(['ls', '/tmp/rgw_d4n_datacache/'])
- data = data.decode('latin-1').strip()
- output = subprocess.check_output(['md5sum', '/tmp/rgw_d4n_datacache/' + data]).decode('latin-1')
-
+ bucketID = subprocess.check_output(['ls', '/tmp/rgw_d4n_datacache/']).decode('latin-1').strip()
+ datacache_path = '/tmp/rgw_d4n_datacache/' + bucketID + '/test.txt/'
+ datacache = subprocess.check_output(['ls', '-a', datacache_path])
+ datacache = datacache.decode('latin-1').strip().splitlines()
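+    # 'ls -a' lists '.' and '..' first, so the cached entries start at index 2;
+    # the entry whose name contains '#' is the data block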
+ if '#' in datacache[3]: # datablock key
+ datacache = datacache[3]
+ else:
+ datacache = datacache[2]
+ output = subprocess.check_output(['md5sum', datacache_path + datacache]).decode('latin-1')
assert(output.splitlines()[0].split()[0] == hashlib.md5("test".encode('utf-8')).hexdigest())
- data = r.hgetall('bkt_test.txt_0_4')
- output = subprocess.check_output(['radosgw-admin', 'object', 'stat', '--bucket=bkt', '--object=test.txt'])
- attrs = json.loads(output.decode('latin-1'))
-
- # directory entry comparisons
- assert(data.get('blockID') == '0')
- assert(data.get('version') == attrs.get('tag'))
- assert(data.get('size') == '4')
- assert(data.get('globalWeight') == '0')
- assert(data.get('blockHosts') == '127.0.0.1:6379')
- assert(data.get('objName') == 'test.txt')
- assert(data.get('bucketName') == 'bkt')
- assert(data.get('creationTime') == attrs.get('mtime'))
- assert(data.get('dirty') == '0')
- assert(data.get('objHosts') == '')
-
- # repopulate cache
- response_put = obj.put(Body=test_txt)
- assert(response_put.get('ResponseMetadata').get('HTTPStatusCode') == 200)
+ data = {}
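+    # block directory keys take the form <bucketName>_<objName>_<blockID>_<size>;
+    # scan with a wildcard since the bucket name segment is the generated bucket ID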
+ for entry in r.scan_iter("*_test.txt_0_4"):
+ data = r.hgetall(entry)
+
+ # directory entry comparisons
+ assert(data.get('blockID') == '0')
+ assert(data.get('deleteMarker') == '0')
+ assert(data.get('size') == '4')
+ assert(data.get('globalWeight') == '0')
+ assert(data.get('objName') == 'test.txt')
+ assert(data.get('bucketName') == bucketID)
+ assert(data.get('dirty') == '0')
+ assert(data.get('hosts') == '127.0.0.1:6379')
# second get call
response_get = obj.get()
assert(response_get.get('ResponseMetadata').get('HTTPStatusCode') == 200)
# check logs to ensure object was retrieved from cache
- res = subprocess.call(['grep', '"SSDCache: get_async(): ::aio_read(), ret=0"', '/var/log/ceph/rgw.ceph.client.0.log'])
+    # oid layout in the cache log: <bucketID>#<version>#<objName>#<blockID>#<size>
+    oid_in_cache = bucketID + "#" + data.get('version') + "#test.txt#0#" + data.get('size')
+    res = subprocess.call(['grep', 'D4NFilterObject::iterate:: iterate(): READ FROM CACHE: oid=' + oid_in_cache, '/var/log/ceph/rgw.ceph.client.0.log'])
-    assert(res >= 1)
+    assert(res == 0) # grep exits 0 only when the pattern is found
# retrieve and compare cache contents
body = get_body(response_get)
assert(body == "test")
- data = subprocess.check_output(['ls', '/tmp/rgw_d4n_datacache/'])
- data = data.decode('latin-1').strip()
- output = subprocess.check_output(['md5sum', '/tmp/rgw_d4n_datacache/' + data]).decode('latin-1')
-
+ datacache = subprocess.check_output(['ls', '-a', datacache_path])
+ datacache = datacache.decode('latin-1').strip().splitlines()
+ if '#' in datacache[3]: # datablock key
+ datacache = datacache[3]
+ else:
+ datacache = datacache[2]
+ output = subprocess.check_output(['md5sum', datacache_path + datacache]).decode('latin-1')
assert(output.splitlines()[0].split()[0] == hashlib.md5("test".encode('utf-8')).hexdigest())
- data = r.hgetall('bkt_test.txt_0_4')
- output = subprocess.check_output(['radosgw-admin', 'object', 'stat', '--bucket=bkt', '--object=test.txt'])
- attrs = json.loads(output.decode('latin-1'))
-
- # directory entries should remain consistent
- assert(data.get('blockID') == '0')
- assert(data.get('version') == attrs.get('tag'))
- assert(data.get('size') == '4')
- assert(data.get('globalWeight') == '0')
- assert(data.get('blockHosts') == '127.0.0.1:6379')
- assert(data.get('objName') == 'test.txt')
- assert(data.get('bucketName') == 'bkt')
- assert(data.get('creationTime') == attrs.get('mtime'))
- assert(data.get('dirty') == '0')
- assert(data.get('objHosts') == '')
+ for entry in r.scan_iter("*_test.txt_0_4"):
+ data = r.hgetall(entry)
+
+ # directory entries should remain consistent
+ assert(data.get('blockID') == '0')
+ assert(data.get('deleteMarker') == '0')
+ assert(data.get('size') == '4')
+ assert(data.get('globalWeight') == '0')
+ assert(data.get('objName') == 'test.txt')
+ assert(data.get('bucketName') == bucketID)
+ assert(data.get('dirty') == '0')
+ assert(data.get('hosts') == '127.0.0.1:6379')
r.flushall()
objlen = 30 * 1024 * 1024
metadata = {'foo': 'bar'}
- (upload_id, data, parts) = _multipart_upload(bucket_name=bucket_name, key=key, size=objlen, client=client, content_type=content_type, metadata=metadata)
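+    # _multipart_upload returns the upload ID, the generated object contents, and the part list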
+ (upload_id, multipart_data, parts) = _multipart_upload(bucket_name=bucket_name, key=key, size=objlen, client=client, content_type=content_type, metadata=metadata)
client.complete_multipart_upload(Bucket=bucket_name, Key=key, UploadId=upload_id, MultipartUpload={'Parts': parts})
-    file_path = os.path.dirname(__file__)+'mymultipart'
+    file_path = os.path.join(os.path.dirname(__file__), 'mymultipart')
# first get
- s3.Object(bucket_name, key).download_file(file_path)
+ try:
+ s3.Object(bucket_name, key).download_file(file_path)
+ except botocore.exceptions.ClientError as e:
+        log.error("ERROR: %s", e)
+ raise
# check logs to ensure object was retrieved from storage backend
-    res = subprocess.call(['grep', '"D4NFilterObject::iterate:: iterate(): Fetching object from backend store"', '/var/log/ceph/rgw.ceph.client.0.log'])
+    res = subprocess.call(['grep', 'D4NFilterObject::iterate:: iterate(): Fetching object from backend store', '/var/log/ceph/rgw.ceph.client.0.log'])
+    assert(res == 0) # the message must appear, confirming the first read hit the backend store
# retrieve and compare cache contents
with open(file_path, 'r') as body:
- assert(body.read() == data)
+ assert(body.read() == multipart_data)
- datacache_path = '/tmp/rgw_d4n_datacache/'
- datacache = subprocess.check_output(['ls', datacache_path])
- datacache = datacache.decode('latin-1').splitlines()
+    time.sleep(0.1) # brief pause to let cache writes settle before inspecting the datacache
+ bucketID = subprocess.check_output(['ls', '/tmp/rgw_d4n_datacache/']).decode('latin-1').strip()
+ datacache_path = '/tmp/rgw_d4n_datacache/' + bucketID + '/mymultipart/'
+ datacache = subprocess.check_output(['ls', '-a', datacache_path])
+    datacache = datacache.decode('latin-1').splitlines()[2:] # drop the '.' and '..' entries from 'ls -a'
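+    # data block file names embed their offset and size as ...#<offset>#<size>;
+    # temp files carry an extra _<suffix> after the size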
for file in datacache:
- ofs = int(file.split("_")[3])
- size = int(file.split("_")[4])
- output = subprocess.check_output(['md5sum', datacache_path + file]).decode('latin-1')
- assert(output.splitlines()[0].split()[0] == hashlib.md5(data[ofs:ofs+size].encode('utf-8')).hexdigest())
+ if '#' in file: # data blocks
+ ofs = int(file.split("#")[1])
+ size = file.split("#")[2]
+ if '_' in file: # account for temp files
+ size = size.split("_")[0]
- output = subprocess.check_output(['radosgw-admin', 'object', 'stat', '--bucket=bkt', '--object=mymultipart'])
- attrs = json.loads(output.decode('latin-1'))
+ output = subprocess.check_output(['md5sum', datacache_path + file]).decode('latin-1')
+ assert(output.splitlines()[0].split()[0] == hashlib.md5(multipart_data[ofs:ofs+int(size)].encode('utf-8')).hexdigest())
- for entry in r.scan_iter("bkt_mymultipart_*"):
+ data = {}
+ for entry in r.scan_iter("*_mymultipart_*"):
data = r.hgetall(entry)
- name = entry.split("_")
+ entry_name = entry.split("_")
# directory entry comparisons
- assert(data.get('blockID') == name[2])
- assert(data.get('version') == attrs.get('tag'))
- assert(data.get('size') == name[3])
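+        # null-version keys split on '_' into six segments,
+        # <bucketID>__:null_<objName>_<blockID>_<size>; plain keys split into four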
+ if len(entry_name) == 6: # versioned block
+ assert(data.get('blockID') == entry_name[4])
+ assert(data.get('deleteMarker') == '0')
+ assert(data.get('size') == entry_name[5])
+ assert(data.get('globalWeight') == '0')
+ assert(data.get('objName') == '_:null_mymultipart')
+ assert(data.get('bucketName') == bucketID)
+ assert(data.get('dirty') == '0')
+ assert(data.get('hosts') == '127.0.0.1:6379')
+ continue
+
+ assert(data.get('blockID') == entry_name[2])
+ assert(data.get('deleteMarker') == '0')
+ assert(data.get('size') == entry_name[3])
assert(data.get('globalWeight') == '0')
- assert(data.get('blockHosts') == '127.0.0.1:6379')
assert(data.get('objName') == 'mymultipart')
- assert(data.get('bucketName') == 'bkt')
- assert(data.get('creationTime') == attrs.get('mtime'))
+ assert(data.get('bucketName') == bucketID)
assert(data.get('dirty') == '0')
- assert(data.get('objHosts') == '')
-
- # repopulate cache
- (upload_id, data, parts) = _multipart_upload(bucket_name=bucket_name, key=key, size=objlen, client=client, content_type=content_type, metadata=metadata)
- client.complete_multipart_upload(Bucket=bucket_name, Key=key, UploadId=upload_id, MultipartUpload={'Parts': parts})
+ assert(data.get('hosts') == '127.0.0.1:6379')
- #second get
- s3.Object(bucket_name, key).download_file(file_path)
+ # second get
+ try:
+ s3.Object(bucket_name, key).download_file(file_path)
+ except botocore.exceptions.ClientError as e:
+        log.error("ERROR: %s", e)
+ raise
# check logs to ensure object was retrieved from cache
- res = subprocess.call(['grep', '"SSDCache: get_async(): ::aio_read(), ret=0"', '/var/log/ceph/rgw.ceph.client.0.log'])
+    oid_in_cache = bucketID + "#" + data.get('version') + "#mymultipart#0#" + data.get('size')
+    res = subprocess.call(['grep', 'D4NFilterObject::iterate:: iterate(): READ FROM CACHE: oid=' + oid_in_cache, '/var/log/ceph/rgw.ceph.client.0.log'])
-    assert(res >= 1)
+    assert(res == 0) # grep exits 0 only when the pattern is found
# retrieve and compare cache contents
with open(file_path, 'r') as body:
- assert(body.read() == data)
+ assert(body.read() == multipart_data)
- datacache_path = '/tmp/rgw_d4n_datacache/'
- datacache = subprocess.check_output(['ls', datacache_path])
- datacache = datacache.decode('latin-1').splitlines()
+ datacache = subprocess.check_output(['ls', '-a', datacache_path])
+ datacache = datacache.decode('latin-1').splitlines()[2:]
for file in datacache:
- ofs = int(file.split("_")[3])
- size = int(file.split("_")[4])
- output = subprocess.check_output(['md5sum', datacache_path + file]).decode('latin-1')
- assert(output.splitlines()[0].split()[0] == hashlib.md5(data[ofs:ofs+size].encode('utf-8')).hexdigest())
-
- output = subprocess.check_output(['radosgw-admin', 'object', 'stat', '--bucket=bkt', '--object=mymultipart'])
- attrs = json.loads(output.decode('latin-1'))
+ if '#' in file: # data blocks
+ ofs = int(file.split("#")[1])
+ size = file.split("#")[2]
+ if '_' in file: # account for temp files
+ size = size.split("_")[0]
- for key in r.scan_iter("bkt_mymultipart_*"):
- data = r.hgetall(key)
- name = key.split("_")
+ output = subprocess.check_output(['md5sum', datacache_path + file]).decode('latin-1')
+ assert(output.splitlines()[0].split()[0] == hashlib.md5(multipart_data[ofs:ofs+int(size)].encode('utf-8')).hexdigest())
- # directory entry comparisons
- assert(data.get('blockID') == name[2])
- assert(data.get('version') == attrs.get('tag'))
- assert(data.get('size') == name[3])
+ for entry in r.scan_iter("*_mymultipart_*"):
+ data = r.hgetall(entry)
+ entry_name = entry.split("_")
+
+ # directory entries should remain consistent
+ if len(entry_name) == 6: # versioned block
+ assert(data.get('blockID') == entry_name[4])
+ assert(data.get('deleteMarker') == '0')
+ assert(data.get('size') == entry_name[5])
+ assert(data.get('globalWeight') == '0')
+ assert(data.get('objName') == '_:null_mymultipart')
+ assert(data.get('bucketName') == bucketID)
+ assert(data.get('dirty') == '0')
+ assert(data.get('hosts') == '127.0.0.1:6379')
+ continue
+
+ assert(data.get('blockID') == entry_name[2])
+ assert(data.get('deleteMarker') == '0')
+ assert(data.get('size') == entry_name[3])
assert(data.get('globalWeight') == '0')
- assert(data.get('blockHosts') == '127.0.0.1:6379')
assert(data.get('objName') == 'mymultipart')
- assert(data.get('bucketName') == 'bkt')
- assert(data.get('creationTime') == attrs.get('mtime'))
+ assert(data.get('bucketName') == bucketID)
assert(data.get('dirty') == '0')
- assert(data.get('objHosts') == '')
+ assert(data.get('hosts') == '127.0.0.1:6379')
r.flushall()
bucket = s3.Bucket('bkt')
bucket.create()
- obj = s3.Object(bucket_name='bkt', key='test.txt')
# Check for Redis instance
try:
connection = redis.Redis(host='localhost', port=6379, db=0, decode_responses=True)
connection.ping()
except:
- log.debug("ERROR: Redis instance not running.")
+ log.error("ERROR: Redis instance not running.")
raise
# Create s3cmd config
r = redis.Redis(host='localhost', port=6379, db=0, decode_responses=True)
# Run small object test
- test_small_object(r, client, obj)
+ test_small_object(r, client, s3)
# Run large object test
test_large_object(r, client, s3)
+
+    # the D4N filter registers a Redis client named 'D4N.Filter'; kill it so the
+    # connection does not linger after the test
+    filter_client = [c for c in r.client_list()
+                     if c.get('name') == 'D4N.Filter']
+ r.client_kill_filter(_id=filter_client[0].get('id'))
log.info("D4NFilterTest completed.")