import json
import os
import hashlib
+import string
+import random
+import subprocess
log.basicConfig(level=log.DEBUG)
f.close()
log.info("s3cmd config written")
+def generate_random(size, part_size=5*1024*1024):
+    """
+    Generate the specified amount of random data.
+    (actually each part is a repetition of its first KB)
+    """
+    chunk = 1024
+    allowed = string.ascii_letters
+    for x in range(0, size, part_size):
+        # build a random 1 KB pattern, then repeat it to fill this part
+        strpart = ''.join(random.choice(allowed) for _ in range(chunk))
+        s = ''
+        this_part_size = min(size - x, part_size)
+        for _ in range(this_part_size // chunk):
+            s += strpart
+        # pad with a partial repetition if the part size is not a multiple of the chunk size
+        if this_part_size > len(s):
+            s += strpart[0:this_part_size - len(s)]
+        yield s
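+
+# Note: each yielded part is exactly part_size bytes (the final part may be
+# shorter), so the parts concatenate to exactly `size` bytes.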
+
+def _multipart_upload(bucket_name, key, size, part_size=5*1024*1024, client=None, content_type=None, metadata=None, resend_parts=None):
+    """
+    Generate a multipart upload of random data of the specified size;
+    resend any parts listed in resend_parts. Return the upload id, the
+    data that was uploaded, and the part list.
+    """
+
+    # only pass Metadata/ContentType when provided; boto3 rejects None values
+    kwargs = {'Bucket': bucket_name, 'Key': key}
+    if metadata is not None:
+        kwargs['Metadata'] = metadata
+    if content_type is not None:
+        kwargs['ContentType'] = content_type
+    response = client.create_multipart_upload(**kwargs)
+
+    upload_id = response['UploadId']
+    s = ''   # accumulates the full body that was uploaded, for later verification
+    parts = []
+    for i, part in enumerate(generate_random(size, part_size)):
+        # PartNumber must start at 1, while enumerate() starts i at 0
+        part_num = i + 1
+        s += part
+        response = client.upload_part(UploadId=upload_id, Bucket=bucket_name, Key=key, PartNumber=part_num, Body=part)
+        parts.append({'ETag': response['ETag'].strip('"'), 'PartNumber': part_num})
+        if resend_parts and i in resend_parts:
+            # upload the part a second time to exercise re-sending an existing part
+            client.upload_part(UploadId=upload_id, Bucket=bucket_name, Key=key, PartNumber=part_num, Body=part)
+
+    return (upload_id, s, parts)
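+
+# The returned `parts` list is already shaped for CompleteMultipartUpload:
+#   client.complete_multipart_upload(Bucket=..., Key=..., UploadId=upload_id,
+#                                    MultipartUpload={'Parts': parts})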
+
def get_cmd_output(cmd_out):
out = cmd_out.decode('utf8')
out = out.strip('\n')
r.flushall()
+def test_large_object(r, client, s3):
+    key = 'mymultipart'
+    bucket_name = 'bkt'
+    content_type = 'text/bla'
+    objlen = 30 * 1024 * 1024
+    metadata = {'foo': 'bar'}
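+    # 30 MiB with the default 5 MiB part size produces a six-part upload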
+
+    (upload_id, data, parts) = _multipart_upload(bucket_name=bucket_name, key=key, size=objlen, client=client, content_type=content_type, metadata=metadata)
+    client.complete_multipart_upload(Bucket=bucket_name, Key=key, UploadId=upload_id, MultipartUpload={'Parts': parts})
+
+    file_path = os.path.join(os.path.dirname(__file__), 'mymultipart')
+
+    # first get
+    s3.Object(bucket_name, key).download_file(file_path)
+
+    # check logs to ensure the object was retrieved from the storage backend
+    # (grep exits 0 when the pattern is found)
+    res = subprocess.call(['grep', '-q', 'D4NFilterObject::iterate:: iterate(): Fetching object from backend store', '/var/log/ceph/rgw.ceph.client.0.log'])
+    assert(res == 0)
+
+    # retrieve and compare cache contents
+    with open(file_path, 'r') as body:
+        assert(body.read() == data)
+
+    datacache_path = '/tmp/rgw_d4n_datacache/'
+    datacache = subprocess.check_output(['ls', datacache_path])
+    datacache = datacache.decode('latin-1').splitlines()
+
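+    # each cached block's filename encodes the block's byte offset and length,
+    # letting us md5 each file against the matching slice of the uploaded data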
+    for fname in datacache:
+        ofs = int(fname.split("_")[3])
+        size = int(fname.split("_")[4])
+        output = subprocess.check_output(['md5sum', datacache_path + fname]).decode('latin-1')
+        assert(output.splitlines()[0].split()[0] == hashlib.md5(data[ofs:ofs+size].encode('utf-8')).hexdigest())
+
+    output = subprocess.check_output(['radosgw-admin', 'object', 'stat', '--bucket=bkt', '--object=mymultipart'])
+    attrs = json.loads(output.decode('latin-1'))
+
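+    # D4N directory entries in Redis are keyed <bucket>_<object>_<blockID>_<size>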
+    for entry in r.scan_iter("bkt_mymultipart_*"):
+        entry_data = r.hgetall(entry)
+        name = entry.split("_")
+
+        # directory entry comparisons
+        assert(entry_data.get('blockID') == name[2])
+        assert(entry_data.get('version') == attrs.get('tag'))
+        assert(entry_data.get('size') == name[3])
+        assert(entry_data.get('globalWeight') == '0')
+        assert(entry_data.get('blockHosts') == '127.0.0.1:6379')
+        assert(entry_data.get('objName') == 'mymultipart')
+        assert(entry_data.get('bucketName') == 'bkt')
+        assert(entry_data.get('creationTime') == attrs.get('mtime'))
+        assert(entry_data.get('dirty') == '0')
+        assert(entry_data.get('objHosts') == '')
+
+    # repopulate cache
+    (upload_id, data, parts) = _multipart_upload(bucket_name=bucket_name, key=key, size=objlen, client=client, content_type=content_type, metadata=metadata)
+    client.complete_multipart_upload(Bucket=bucket_name, Key=key, UploadId=upload_id, MultipartUpload={'Parts': parts})
+
+    # second get
+    s3.Object(bucket_name, key).download_file(file_path)
+
+    # check logs to ensure the object was retrieved from the cache
+    # (grep exits 0 when the pattern is found)
+    res = subprocess.call(['grep', '-q', 'SSDCache: get_async(): ::aio_read(), ret=0', '/var/log/ceph/rgw.ceph.client.0.log'])
+    assert(res == 0)
+
+    # retrieve and compare cache contents
+    with open(file_path, 'r') as body:
+        assert(body.read() == data)
+
+    datacache_path = '/tmp/rgw_d4n_datacache/'
+    datacache = subprocess.check_output(['ls', datacache_path])
+    datacache = datacache.decode('latin-1').splitlines()
+
+    for fname in datacache:
+        ofs = int(fname.split("_")[3])
+        size = int(fname.split("_")[4])
+        output = subprocess.check_output(['md5sum', datacache_path + fname]).decode('latin-1')
+        assert(output.splitlines()[0].split()[0] == hashlib.md5(data[ofs:ofs+size].encode('utf-8')).hexdigest())
+
+    output = subprocess.check_output(['radosgw-admin', 'object', 'stat', '--bucket=bkt', '--object=mymultipart'])
+    attrs = json.loads(output.decode('latin-1'))
+
+    # use `entry`, not `key`, so the object key defined above is not shadowed
+    for entry in r.scan_iter("bkt_mymultipart_*"):
+        entry_data = r.hgetall(entry)
+        name = entry.split("_")
+
+        # directory entry comparisons
+        assert(entry_data.get('blockID') == name[2])
+        assert(entry_data.get('version') == attrs.get('tag'))
+        assert(entry_data.get('size') == name[3])
+        assert(entry_data.get('globalWeight') == '0')
+        assert(entry_data.get('blockHosts') == '127.0.0.1:6379')
+        assert(entry_data.get('objName') == 'mymultipart')
+        assert(entry_data.get('bucketName') == 'bkt')
+        assert(entry_data.get('creationTime') == attrs.get('mtime'))
+        assert(entry_data.get('dirty') == '0')
+        assert(entry_data.get('objHosts') == '')
+
+    r.flushall()
+
def main():
"""
execute the d4n test
# Run small object test
test_small_object(r, client, obj)
+    # Run large object test
+    test_large_object(r, client, s3)
+
log.info("D4NFilterTest completed.")
main()