#!/usr/bin/python3
+'''
+This workunit tests the functionality of the D4N read workflow on a small object of size 4 bytes.
+'''
+
import logging as log
from configobj import ConfigObj
import boto3
import redis
import subprocess
import json
+import os
+import hashlib
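+# Assumes a D4N-enabled RGW logging to /var/log/ceph/rgw.ceph.client.0.log,
+# Redis on localhost:6379, and an SSD cache backed by /tmp/rgw_d4n_datacache/.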
log.basicConfig(level=log.DEBUG)
out = out.strip('\n')
return out
-def test_directory_methods(r, client, obj):
- test_txt = b'test'
+def get_body(response):
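+    '''Return the body of an S3 GET response as a str, decoding bytes if necessary.'''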
+ body = response['Body']
+ got = body.read()
+    if isinstance(got, bytes):
+ got = got.decode()
+ return got
- # setValue call
- response_put = obj.put(Body=test_txt)
+def test_small_object(r, client, obj):
+ test_txt = 'test'
+ response_put = obj.put(Body=test_txt)
assert(response_put.get('ResponseMetadata').get('HTTPStatusCode') == 200)
- data = r.hgetall('rgw-object:test.txt:directory')
-
- assert(data.get('key') == 'rgw-object:test.txt:directory')
- assert(data.get('size') == '4')
- assert(data.get('bucket_name') == 'bkt')
- assert(data.get('obj_name') == 'test.txt')
- assert(data.get('hosts') == '127.0.0.1:6379')
-
- # getValue call
+ # first get call
response_get = obj.get()
-
assert(response_get.get('ResponseMetadata').get('HTTPStatusCode') == 200)
- data = r.hgetall('rgw-object:test.txt:directory')
+ # check logs to ensure object was retrieved from storage backend
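+    # grep exits 0 when the message is found in the log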
res = subprocess.call(['grep', 'D4NFilterObject::iterate:: iterate(): Fetching object from backend store', '/var/log/ceph/rgw.ceph.client.0.log'])
assert(res == 0)
- assert(data.get('key') == 'rgw-object:test.txt:directory')
- assert(data.get('size') == '4')
- assert(data.get('bucket_name') == 'bkt')
- assert(data.get('obj_name') == 'test.txt')
- assert(data.get('hosts') == '127.0.0.1:6379')
-
- # delValue call
- response_del = obj.delete()
+ # retrieve and compare cache contents
+ body = get_body(response_get)
+ assert(body == "test")
- assert(response_del.get('ResponseMetadata').get('HTTPStatusCode') == 204)
- assert(r.exists('rgw-object:test.txt:directory') == False)
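+    # the cache should hold a single block file whose md5 matches the original payload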
+ data = subprocess.check_output(['ls', '/tmp/rgw_d4n_datacache/'])
+ data = data.decode('latin-1').strip()
+ output = subprocess.check_output(['md5sum', '/tmp/rgw_d4n_datacache/' + data]).decode('latin-1')
- r.flushall()
+ assert(output.splitlines()[0].split()[0] == hashlib.md5("test".encode('utf-8')).hexdigest())
-def test_cache_methods(r, client, obj):
- test_txt = b'test'
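+    # block directory entries appear to be keyed as <bucket>_<objName>_<blockID>_<size>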
+ data = r.hgetall('bkt_test.txt_0_4')
+ output = subprocess.check_output(['radosgw-admin', 'object', 'stat', '--bucket=bkt', '--object=test.txt'])
+ attrs = json.loads(output.decode('latin-1'))
- # setObject call
+ # directory entry comparisons
+ assert(data.get('blockID') == '0')
+ assert(data.get('version') == attrs.get('tag'))
+ assert(data.get('size') == '4')
+ assert(data.get('globalWeight') == '0')
+ assert(data.get('blockHosts') == '127.0.0.1:6379')
+ assert(data.get('objName') == 'test.txt')
+ assert(data.get('bucketName') == 'bkt')
+ assert(data.get('creationTime') == attrs.get('mtime'))
+ assert(data.get('dirty') == '0')
+ assert(data.get('objHosts') == '')
+
+ # repopulate cache
response_put = obj.put(Body=test_txt)
-
assert(response_put.get('ResponseMetadata').get('HTTPStatusCode') == 200)
- data = r.hgetall('rgw-object:test.txt:cache')
- output = subprocess.check_output(['radosgw-admin', 'object', 'stat', '--bucket=bkt', '--object=test.txt'])
- attrs = json.loads(output.decode('latin-1'))
-
- assert((data.get(b'user.rgw.tail_tag')) == attrs.get('attrs').get('user.rgw.tail_tag').encode("latin-1") + b'\x00')
- assert((data.get(b'user.rgw.idtag')) == attrs.get('tag').encode("latin-1") + b'\x00')
- assert((data.get(b'user.rgw.etag')) == attrs.get('etag').encode("latin-1"))
- assert((data.get(b'user.rgw.x-amz-content-sha256')) == attrs.get('attrs').get('user.rgw.x-amz-content-sha256').encode("latin-1") + b'\x00')
- assert((data.get(b'user.rgw.x-amz-date')) == attrs.get('attrs').get('user.rgw.x-amz-date').encode("latin-1") + b'\x00')
-
- tmp1 = '\x08\x06L\x01\x00\x00\x04\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x06\x06\x84\x00\x00\x00\n\nj\x00\x00\x00\x03\x00\x00\x00bkt+\x00\x00\x00'
- tmp2 = '+\x00\x00\x00'
- tmp3 = '\x00\x00\x00\x00\x00\x00\x00\x00\x00\b\x00\x00\x00test.txt\x00\x00\x00\x00\x04\x00\x00\x00\x00\x00\x00\x00\x00\x00@\x00\x00\x00\x00\x00!\x00\x00\x00'
- tmp4 = '\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x02\x01 \x00\x00\x00\x00\x00\x00\x00\x00\x00@\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00' \
- '\x00\x00@\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x11\x00\x00\x00default-placement\x11\x00\x00\x00default-placement\x00\x00\x00\x00\x02\x02\x18' \
- '\x00\x00\x00\x04\x00\x00\x00none\x01\x01\t\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'
- assert(data.get(b'user.rgw.manifest') == tmp1.encode("latin-1") + attrs.get('manifest').get('tail_placement').get('bucket').get('bucket_id').encode("utf-8")
- + tmp2.encode("latin-1") + attrs.get('manifest').get('tail_placement').get('bucket').get('bucket_id').encode("utf-8")
- + tmp3.encode("latin-1") + attrs.get('manifest').get('prefix').encode("utf-8")
- + tmp4.encode("latin-1"))
-
- tmp5 = '\x02\x02\x81\x00\x00\x00\x03\x02\x11\x00\x00\x00\x06\x00\x00\x00s3main\x03\x00\x00\x00Foo\x04\x03d\x00\x00\x00\x01\x01\x00\x00\x00\x06\x00\x00' \
- '\x00s3main\x0f\x00\x00\x00\x01\x00\x00\x00\x06\x00\x00\x00s3main\x05\x035\x00\x00\x00\x02\x02\x04\x00\x00\x00\x00\x00\x00\x00\x06\x00\x00\x00s3main' \
- '\x00\x00\x00\x00\x00\x00\x00\x00\x02\x02\x04\x00\x00\x00\x0f\x00\x00\x00\x03\x00\x00\x00Foo\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00' \
- '\x00\x00\x00'
- assert((data.get(b'user.rgw.acl')) == tmp5.encode("latin-1"))
-
- # getObject call
+ # second get call
response_get = obj.get()
-
assert(response_get.get('ResponseMetadata').get('HTTPStatusCode') == 200)
- # Copy to new object with 'COPY' directive; metadata value should not change
- obj.metadata.update({'test':'value'})
- m = obj.metadata
- m['test'] = 'value_replace'
-
- # copyObject call
- client.copy_object(Bucket='bkt', Key='test_copy.txt', CopySource='bkt/test.txt', Metadata = m, MetadataDirective='COPY')
-
- assert(r.hexists('rgw-object:test_copy.txt:cache', b'user.rgw.x-amz-meta-test') == 0)
+ # check logs to ensure object was retrieved from cache
res = subprocess.call(['grep', 'SSDCache: get_async(): ::aio_read(), ret=0', '/var/log/ceph/rgw.ceph.client.0.log'])
assert(res == 0)
- # Update object with 'REPLACE' directive; metadata value should change
- client.copy_object(Bucket='bkt', Key='test.txt', CopySource='bkt/test.txt', Metadata = m, MetadataDirective='REPLACE')
+ # retrieve and compare cache contents
+ body = get_body(response_get)
+ assert(body == "test")
- data = r.hget('rgw-object:test.txt:cache', b'user.rgw.x-amz-meta-test')
+ data = subprocess.check_output(['ls', '/tmp/rgw_d4n_datacache/'])
+ data = data.decode('latin-1').strip()
+ output = subprocess.check_output(['md5sum', '/tmp/rgw_d4n_datacache/' + data]).decode('latin-1')
- assert(data == b'value_replace\x00')
+ assert(output.splitlines()[0].split()[0] == hashlib.md5("test".encode('utf-8')).hexdigest())
- # Ensure cache entry exists in cache before deletion
- assert(r.exists('rgw-object:test.txt:cache') == True)
-
- # delObject call
- response_del = obj.delete()
+ data = r.hgetall('bkt_test.txt_0_4')
+ output = subprocess.check_output(['radosgw-admin', 'object', 'stat', '--bucket=bkt', '--object=test.txt'])
+ attrs = json.loads(output.decode('latin-1'))
- assert(response_del.get('ResponseMetadata').get('HTTPStatusCode') == 204)
- assert(r.exists('rgw-object:test.txt:cache') == False)
+ # directory entries should remain consistent
+ assert(data.get('blockID') == '0')
+ assert(data.get('version') == attrs.get('tag'))
+ assert(data.get('size') == '4')
+ assert(data.get('globalWeight') == '0')
+ assert(data.get('blockHosts') == '127.0.0.1:6379')
+ assert(data.get('objName') == 'test.txt')
+ assert(data.get('bucketName') == 'bkt')
+ assert(data.get('creationTime') == attrs.get('mtime'))
+ assert(data.get('dirty') == '0')
+ assert(data.get('objHosts') == '')
r.flushall()
r = redis.Redis(host='localhost', port=6379, db=0, decode_responses=True)
- test_directory_methods(r, client, obj)
-
- # Responses should not be decoded
- r = redis.Redis(host='localhost', port=6379, db=0)
-
- test_cache_methods(r, client, obj)
+ # Run small object test
+ test_small_object(r, client, obj)
- log.info("D4NFilterTest successfully completed.")
+ log.info("D4NFilterTest completed.")
main()
log.info("Completed D4N tests")
#endif
#include <filesystem>
+#include <errno.h>
+#include <string.h>
+#include <sys/stat.h>
+#include <unistd.h>
namespace efs = std::filesystem;
namespace rgw { namespace cache {
if(partition_info.location.back() != '/') {
partition_info.location += "/";
}
+
try {
if (efs::exists(partition_info.location)) {
if (dpp->get_cct()->_conf->rgw_d4n_l1_evict_cache_on_start) {
ldpp_dout(dpp, 5) << "initialize: evicting the persistent storage directory on start" << dendl;
+
+ uid_t uid = dpp->get_cct()->get_set_uid();
+ gid_t gid = dpp->get_cct()->get_set_gid();
+
+ ldpp_dout(dpp, 5) << "initialize:: uid is " << uid << " and gid is " << gid << dendl;
+      ldpp_dout(dpp, 5) << "initialize:: changing ownership and permissions of the datacache directory." << dendl;
+
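+      // chown/chmod only when a setuser uid is configured (uid != 0), i.e. RGW drops root privileges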
+ if (uid) {
+ if (chown(partition_info.location.c_str(), uid, gid) == -1) {
+          ldpp_dout(dpp, 5) << "initialize: chown returned error: " << strerror(errno) << dendl;
+ }
+
+ if (chmod(partition_info.location.c_str(), S_IRWXU|S_IRWXG|S_IRWXO) == -1) {
+          ldpp_dout(dpp, 5) << "initialize: chmod returned error: " << strerror(errno) << dendl;
+ }
+ }
+
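+      // clear out any cache blocks left over from a previous run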
for (auto& p : efs::directory_iterator(partition_info.location)) {
efs::remove_all(p.path());
}
}
} else {
- ldpp_dout(dpp, 5) << "initialize:: creating the persistent storage directory on start" << dendl;
- efs::create_directories(partition_info.location);
+ ldpp_dout(dpp, 5) << "initialize:: creating the persistent storage directory on start: " << partition_info.location << dendl;
+ std::error_code ec;
+ if (!efs::create_directories(partition_info.location, ec)) {
+          ldpp_dout(dpp, 0) << "initialize::: ERROR creating the cache storage directory: '" << partition_info.location <<
+                            "' : " << ec.message() << dendl;
+ } else {
+ uid_t uid = dpp->get_cct()->get_set_uid();
+ gid_t gid = dpp->get_cct()->get_set_gid();
+
+ ldpp_dout(dpp, 5) << "initialize:: uid is " << uid << " and gid is " << gid << dendl;
+        ldpp_dout(dpp, 5) << "initialize:: changing ownership and permissions of the datacache directory." << dendl;
+
+ if (uid) {
+ if (chown(partition_info.location.c_str(), uid, gid) == -1) {
+          ldpp_dout(dpp, 5) << "initialize: chown returned error: " << strerror(errno) << dendl;
+ }
+
+ if (chmod(partition_info.location.c_str(), S_IRWXU|S_IRWXG|S_IRWXO) == -1) {
+          ldpp_dout(dpp, 5) << "initialize: chmod returned error: " << strerror(errno) << dendl;
+ }
+ }
+ }
}
} catch (const efs::filesystem_error& e) {
ldpp_dout(dpp, 0) << "initialize::: ERROR initializing the cache storage directory '" << partition_info.location <<
} else {
(void)p.release();
}
+  ldpp_dout(dpp, 20) << "SSDCache: " << __func__ << "(): before init.result.get(), ret=" << ret << dendl;
return init.result.get();
}