redis_exec_cp(dpp, redis_pool, ec, req, resp, y);
}
+int BucketDirectory::exist_key(const DoutPrefixProvider* dpp, const std::string& bucket_id, optional_yield y) {
+  // Probe the directory for a bucket entry. Returns the EXISTS reply
+  // (1 if the key is present, 0 if not) or a negative error code on failure.
+  response<int> reply;
+
+  try {
+    request cmd;
+    cmd.push("EXISTS", bucket_id);
+
+    boost::system::error_code err;
+    redis_exec_connection_pool(dpp, redis_pool, conn, err, cmd, reply, y);
+
+    if (err) {
+      ldpp_dout(dpp, 0) << "BucketDirectory::" << __func__ << "() ERROR: " << err.what() << dendl;
+      return -err.value();
+    }
+  } catch (std::exception &e) {
+    ldpp_dout(dpp, 0) << "BucketDirectory::" << __func__ << "() ERROR: " << e.what() << dendl;
+    return -EINVAL;
+  }
+
+  return std::get<0>(reply).value();
+}
+
int BucketDirectory::zadd(const DoutPrefixProvider* dpp, const std::string& bucket_id, double score, const std::string& member, optional_yield y, Pipeline* pipeline)
{
try {
return 0;
}
+int BucketDirectory::del(const DoutPrefixProvider* dpp, const std::string& bucket_id, optional_yield y)
+{
+  // Remove the bucket's directory entry via UNLINK (asynchronous delete).
+  // Returns 0 on success, -ENOENT if no entry was removed, or a negative
+  // error code otherwise.
+  try {
+    request cmd;
+    cmd.push("UNLINK", bucket_id);
+
+    boost::system::error_code err;
+    response<int> reply;
+    redis_exec_connection_pool(dpp, redis_pool, conn, err, cmd, reply, y);
+
+    if (err) {
+      ldpp_dout(dpp, 0) << "BucketDirectory::" << __func__ << "() ERROR: " << err.what() << dendl;
+      return -err.value();
+    }
+
+    // UNLINK reports how many keys it removed; anything other than one
+    // means the entry was not present.
+    if (std::get<0>(reply).value() != 1) {
+      ldpp_dout(dpp, 10) << "BucketDirectory::" << __func__ << "() Response is: " << std::get<0>(reply).value() << dendl;
+      return -ENOENT;
+    }
+  } catch (std::exception &e) {
+    ldpp_dout(dpp, 0) << "BucketDirectory::" << __func__ << "() ERROR: " << e.what() << dendl;
+    return -EINVAL;
+  }
+
+  return 0;
+}
+
std::string ObjectDirectory::build_index(CacheObj* object)
{
return object->bucketName + "_" + object->objName;
}
}
+int ObjectDirectory::del(const DoutPrefixProvider* dpp, std::vector<CacheObj>& objects, optional_yield y)
+{
+  // Batch-delete object directory entries with one pipelined UNLINK request.
+  // Returns 0 on success or a negative error code.
+  request req;
+  // Iterate by reference: the loop only reads each entry, and copying a
+  // whole CacheObj per iteration is wasted work (build_index also needs a
+  // pointer to the real element, not to a temporary copy).
+  for (auto& object : objects) {
+    std::string key = build_index(&object);
+    ldpp_dout(dpp, 10) << "ObjectDirectory::" << __func__ << "(): index is: " << key << dendl;
+
+    req.push("UNLINK", key);
+  }
+
+  try {
+    boost::system::error_code ec;
+    boost::redis::generic_response resp;
+
+    redis_exec_connection_pool(dpp, redis_pool, conn, ec, req, resp, y);
+
+    if (ec) {
+      ldpp_dout(dpp, 0) << "ObjectDirectory::" << __func__ << "() ERROR: " << ec.what() << dendl;
+      return -ec.value();
+    }
+  } catch (std::exception &e) {
+    ldpp_dout(dpp, 0) << "ObjectDirectory::" << __func__ << "() ERROR: " << e.what() << dendl;
+    return -EINVAL;
+  }
+
+  return 0;
+}
+
int ObjectDirectory::del(const DoutPrefixProvider* dpp, CacheObj* object, optional_yield y)
{
std::string key = build_index(object);
boost::system::error_code ec;
response<int> resp;
request req;
- req.push("DEL", key);
+ req.push("UNLINK", key);
redis_exec_connection_pool(dpp, redis_pool, conn, ec, req, resp, y);
if (ec) {
ldpp_dout(dpp, 0) << "BlockDirectory::" << __func__ << "() ERROR: " << ec.what() << dendl;
- return false;
+ return -ec.value();
}
} catch (std::exception &e) {
- ldpp_dout(dpp, 0) << "ObjectDirectory::" << __func__ << "() ERROR: " << e.what() << dendl;
+ ldpp_dout(dpp, 0) << "BlockDirectory::" << __func__ << "() ERROR: " << e.what() << dendl;
return -EINVAL;
}
}
}
+int BlockDirectory::del(const DoutPrefixProvider* dpp, std::vector<CacheBlock>& blocks, optional_yield y)
+{
+  // Batch-delete block directory entries with one pipelined UNLINK request.
+  // Returns 0 on success or a negative error code.
+  request req;
+  // Iterate by reference: copying a whole CacheBlock per iteration is wasted
+  // work, and build_index needs a pointer to the real element anyway.
+  for (auto& block : blocks) {
+    std::string key = build_index(&block);
+    ldpp_dout(dpp, 10) << "BlockDirectory::" << __func__ << "(): index is: " << key << dendl;
+
+    req.push("UNLINK", key);
+  }
+
+  try {
+    boost::system::error_code ec;
+    boost::redis::generic_response resp;
+    redis_exec_connection_pool(dpp, redis_pool, conn, ec, req, resp, y);
+
+    if (ec) {
+      ldpp_dout(dpp, 0) << "BlockDirectory::" << __func__ << "() ERROR: " << ec.what() << dendl;
+      return -ec.value();
+    }
+  } catch (std::exception &e) {
+    ldpp_dout(dpp, 0) << "BlockDirectory::" << __func__ << "() ERROR: " << e.what() << dendl;
+    return -EINVAL;
+  }
+
+  return 0;
+}
+
int BlockDirectory::del(const DoutPrefixProvider* dpp, CacheBlock* block, optional_yield y)
{
std::string key = build_index(block);
try {
boost::system::error_code ec;
request req;
- req.push("DEL", key);
+ req.push("UNLINK", key);
response<int> resp;
redis_exec_connection_pool(dpp, redis_pool, conn, ec, req, resp, y);
if (!std::get<0>(resp).value()) {
} //end - else
rgw::d4n::BlockDirectory* blockDir = this->filter->get_block_dir();
+ std::vector<std::string> dir_blocks_objNames;
int remainder_size = entries.size();
size_t j = 0, start_j = 0;
while (remainder_size > 0) {
}
blocks[i].cacheObj.bucketName = this->get_bucket_id();
ldpp_dout(dpp, 20) << "D4NFilterBucket::" << __func__ << " blocks[i].cacheObj.objName: " << blocks[i].cacheObj.objName << dendl;
+
+ if (return_blocks) {
+ dir_blocks_objNames.push_back(blocks[i].cacheObj.objName);
+ }
j++;
}
auto ret = blockDir->get(dpp, blocks, y);
return ret;
}
+ unsigned int i = 0;
for (auto block : blocks) {
+ if (return_blocks) {
+ if (i >= dir_blocks_objNames.size() || i >= block.cacheObj.objName.size()) {
+ ldpp_dout(dpp, 0) << "D4NFilterBucket::" << __func__ << " Unable to construct list of cached blocks." << dendl;
+ return -EINVAL;
+ }
+ if (dir_blocks_objNames[i] == block.cacheObj.objName) {
+ dir_blocks.insert(std::make_pair(dir_blocks_objNames[i], block));
+ } else {
+ ldpp_dout(dpp, 0) << "D4NFilterBucket::" << __func__ << " Error constructing list of cached blocks while handling object " << dir_blocks_objNames[i] << dendl;
+ return -EINVAL;
+ }
+ }
+ i++;
+
if (block.cacheObj.objName.empty()) {
start_j++;
continue;
return 0;
}
+// Listing batch size, and the pipelined-command ceiling suggested by the
+// redis documentation (~10k commands per pipeline).
+constexpr int OBJECT_LIST_VAL = 1000;
+constexpr int PIPELINE_MAX = 10000;
+// Delete every cached directory entry for this bucket (head objects,
+// versioned head objects and data blocks), then the bucket entry itself,
+// and finally forward the removal to the next driver in the chain.
+// Returns 0 on success or a negative error code.
+int D4NFilterBucket::remove(const DoutPrefixProvider* dpp,
+                            bool delete_children,
+                            optional_yield y)
+{
+  ListParams params;
+  params.list_versions = true;
+  ListResults results;
+  int ret;
+
+  // Ask list() to also hand back the directory blocks it retrieved (via
+  // dir_blocks) so dirty cached objects can be invalidated below.
+  return_blocks = true;
+  auto blockDir = this->filter->get_block_dir();
+  auto objDir = this->filter->get_obj_dir();
+  std::vector<rgw::d4n::CacheBlock> blocks;
+  std::vector<rgw::d4n::CacheObj> objects;
+
+  do {
+    results.objs.clear();
+
+    ret = list(dpp, params, OBJECT_LIST_VAL, results, y);
+    if (ret < 0) {
+      return ret;
+    }
+
+    if (!results.objs.empty() && !delete_children) {
+      ldpp_dout(dpp, 10) << "ERROR: could not remove non-empty bucket " << this->get_name() << dendl;
+      return -ENOTEMPTY;
+    }
+
+    for (const auto& obj : results.objs) {
+      // Flush pending deletes whenever this object's entries could push the
+      // pipeline past PIPELINE_MAX.
+      if (((PIPELINE_MAX - blocks.size()) <= OBJECT_LIST_VAL) || (blocks.size() > (PIPELINE_MAX - 1000))) {
+        // NOTE: parenthesize the assignment before comparing; the previous
+        // form `(ret = del(...) < 0)` assigned the boolean result of the
+        // comparison to ret, returning 1 instead of the negative error code.
+        if ((ret = blockDir->del(dpp, blocks, y)) < 0) {
+          ldpp_dout(dpp, 10) << "D4NFilterBucket::" << __func__ << "(): Failed to delete cached object in block directory, ret=" << ret << dendl;
+          return ret;
+        }
+        blocks.clear();
+      }
+
+      // Handle head objects
+      std::unique_ptr<rgw::sal::Object> c_obj = this->get_object(obj.key);
+      ldpp_dout(dpp, 20) << "D4NFilterBucket::" << __func__ << "(): handling object=" << obj.key << dendl;
+
+      rgw::d4n::CacheObj object = rgw::d4n::CacheObj{
+        .objName = c_obj->get_name(),
+        .bucketName = this->get_bucket_id(),
+      };
+
+      rgw::d4n::CacheBlock block = rgw::d4n::CacheBlock{
+        .cacheObj = object,
+        .blockID = 0,
+        .size = 0,
+      };
+
+      blocks.push_back(block);
+      objects.push_back(object);
+
+      // "null" is used as the version tag for non-versioned head objects.
+      std::string oid_version;
+      if (c_obj->have_instance()) {
+        oid_version = c_obj->get_instance();
+      } else {
+        oid_version = "null";
+      }
+      off_t lst = obj.meta.size;
+      ldpp_dout(dpp, 20) << "D4NFilterBucket::" << __func__ << "(): Obj size=" << lst << dendl;
+      block.cacheObj.objName = "_:" + oid_version + "_" + block.cacheObj.objName;
+      auto it = dir_blocks.find(block.cacheObj.objName);
+      if (it != dir_blocks.end() && it->second.cacheObj.dirty) {
+        if (!this->filter->get_policy_driver()->get_cache_policy()->invalidate_dirty_object(dpp, get_cache_block_prefix(c_obj.get(), it->second.version))) {
+          ldpp_dout(dpp, 10) << "D4NFilterBucket::" << __func__ << "(): Failed to invalidate obj=" << c_obj->get_name() << " in cache" << dendl;
+          return -EINVAL;
+        }
+        /* For clean objects in the cache, inline deletes are avoided in favor of lazy deletes that occur through
+         * later eviction calls. */
+      } else {
+        ldpp_dout(dpp, 20) << "D4NFilterBucket::" << __func__ << "(): Listing retrieved from backend for object " << c_obj->get_name() << dendl;
+      }
+
+      // Handle versioned head objects
+      ldpp_dout(dpp, 20) << "D4NFilterBucket::" << __func__ << "(): versioned oid: " << block.cacheObj.objName << dendl;
+      blocks.push_back(block);
+
+      // Handle data blocks
+      ldpp_dout(dpp, 20) << "D4NFilterBucket::" << __func__ << "(): Object size=" << lst << dendl;
+      off_t fst = 0;
+      do {
+        /* The addition of data blocks to the blocks structure may push its size over PIPELINE_MAX, so
+         * pipelined calls must also made during this loop. */
+        if (((PIPELINE_MAX - blocks.size()) <= OBJECT_LIST_VAL) || (blocks.size() > (PIPELINE_MAX - 1000))) {
+          if ((ret = blockDir->del(dpp, blocks, y)) < 0) {
+            ldpp_dout(dpp, 10) << "D4NFilterBucket::" << __func__ << "(): Failed to delete cached object in block directory, ret=" << ret << dendl;
+            return ret;
+          }
+          blocks.clear();
+        }
+
+        ldpp_dout(dpp, 20) << "D4NFilterBucket::" << __func__ << "(): handling object=" << obj.key << dendl;
+        rgw::d4n::CacheBlock data_block;
+        if (fst >= lst) {
+          break;
+        }
+        // Enumerate the object's data blocks in rgw_max_chunk_size strides.
+        off_t cur_size = std::min<off_t>(fst + dpp->get_cct()->_conf->rgw_max_chunk_size, lst);
+        off_t cur_len = cur_size - fst;
+        data_block.cacheObj.bucketName = this->get_bucket_id();
+        data_block.cacheObj.objName = c_obj->get_oid();
+        ldpp_dout(dpp, 20) << "D4NFilterBucket::" << __func__ << "(): data_block=" << data_block.cacheObj.objName << dendl;
+        data_block.size = cur_len;
+        data_block.blockID = fst;
+
+        fst += cur_len;
+        blocks.push_back(data_block);
+
+      } while (fst < lst); // end - do
+    }
+
+    /* Use pipelining for batches of ~10k commands since that is the max suggested
+     * in redis docs */
+    if (((PIPELINE_MAX - blocks.size()) <= OBJECT_LIST_VAL) || (blocks.size() > (PIPELINE_MAX - 1000))) {
+      if ((ret = blockDir->del(dpp, blocks, y)) < 0) {
+        ldpp_dout(dpp, 10) << "D4NFilterBucket::" << __func__ << "(): Failed to delete cached object in block directory, ret=" << ret << dendl;
+        return ret;
+      }
+      blocks.clear();
+    }
+    if ((PIPELINE_MAX - objects.size()) <= OBJECT_LIST_VAL) {
+      if ((ret = objDir->del(dpp, objects, y)) < 0) {
+        ldpp_dout(dpp, 10) << "D4NFilterBucket::" << __func__ << "(): Failed to delete bucket in bucket directory, ret=" << ret << dendl;
+        return ret;
+      }
+      objects.clear();
+    }
+  } while (results.is_truncated);
+
+  // One more delete to clean up remaining blocks if present
+  if (blocks.size()) {
+    if ((ret = blockDir->del(dpp, blocks, y)) < 0) {
+      ldpp_dout(dpp, 10) << "D4NFilterBucket::" << __func__ << "(): Failed to delete cached object in block directory, ret=" << ret << dendl;
+      return ret;
+    }
+  }
+  if (objects.size()) {
+    if ((ret = objDir->del(dpp, objects, y)) < 0) {
+      ldpp_dout(dpp, 10) << "D4NFilterBucket::" << __func__ << "(): Failed to delete bucket in bucket directory, ret=" << ret << dendl;
+      return ret;
+    }
+  }
+  // A missing bucket entry (-ENOENT) is not an error here: nothing may have
+  // been cached for this bucket.
+  if ((ret = this->filter->get_bucket_dir()->del(dpp, this->get_bucket_id(), y)) < 0 && (ret != -ENOENT)) {
+    ldpp_dout(dpp, 10) << "D4NFilterBucket::" << __func__ << "(): Failed to delete bucket in bucket directory, ret=" << ret << dendl;
+    return ret;
+  }
+
+  ldpp_dout(dpp, 20) << "D4NFilterBucket::" << __func__ << "(): calling next->remove" << dendl;
+  return next->remove(dpp, delete_children, y);
+}
+
+int D4NFilterBucket::check_empty(const DoutPrefixProvider* dpp, optional_yield y)
+{
+  // If the bucket still has an entry in the bucket directory, objects remain
+  // in the local cache and the bucket cannot be empty. exist_key() returns
+  // 1 if the entry exists, 0 if not, or a negative error code.
+  int ret;
+  if ((ret = this->filter->get_bucket_dir()->exist_key(dpp, this->get_bucket_id(), y)) < 0) {
+    ldpp_dout(dpp, 10) << "D4NFilterBucket::" << __func__ << "(): Failed to retrieve bucket in bucket directory, ret=" << ret << dendl;
+    return ret;
+  } else if (ret == 0) {
+    // No cached entries: defer the final emptiness check to the next driver.
+    // (The original `} if (...)` only worked because the branch above
+    // returns; make the chain explicit.)
+    ldpp_dout(dpp, 20) << "D4NFilterBucket::" << __func__ << "(): calling next->check_empty" << dendl;
+    return next->check_empty(dpp, y);
+  } else {
+    return -ENOTEMPTY;
+  }
+}
+
std::unique_ptr<MultipartUpload> D4NFilterBucket::get_multipart_upload(
const std::string& oid,
std::optional<std::string> upload_id,
fs::remove_all(TEST_DIR);
fs::create_directory(TEST_DIR);
+ env->cct->_conf->rgw_redis_connection_pool_size = 1;
env->cct->_conf->rgw_d4n_cache_cleaning_interval = 1;
rgw_user uid{"test_tenant", "test_filter"};
owner = uid;
conn->async_exec(req, resp, yield[ec]);
ASSERT_EQ((bool)ec, false);
- //EXPECT_EQ((int)std::get<0>(resp).value(), 0); // TODO: Object entry is not deleted
+ EXPECT_EQ((int)std::get<0>(resp).value(), 0);
EXPECT_EQ((int)std::get<1>(resp).value(), 0);
EXPECT_EQ((int)std::get<2>(resp).value(), 0);
EXPECT_EQ((int)std::get<3>(resp).value(), 0);
io.run();
}
+// Remove a bucket while its write-cache entries are still dirty (the cache
+// cleaning cycle has not yet run): every directory entry must be unlinked,
+// the cached head blocks must carry RGW_CACHE_ATTR_INVALID=1 for later lazy
+// eviction, and check_empty() must then report the bucket as empty.
+TEST_F(D4NFilterFixture, BucketRemoveBeforeCleaning)
+{
+  env->cct->_conf->d4n_writecache_enabled = true;
+  const std::string testName = "PutObjectWrite";
+  const std::string testName_1 = "PutObjectWrite_1";
+  const std::string testName_2 = "PutObjectWrite_2";
+  const std::string bucketName = "/tmp/d4n_filter_tests/dbstore-default_ns.1";
+  std::string instance;
+
+  net::spawn(io, [this, &testName, &testName_1, &testName_2, &bucketName, &instance] (net::yield_context yield) {
+    init_driver(yield);
+    create_bucket(testName, yield);
+    testBucket->get_info().bucket.bucket_id = bucketName;
+    // One plain object, one under versioning, one with versioning suspended.
+    put_object(testName, yield);
+    put_version_enabled_object(testName_1, instance, yield);
+    put_version_suspended_object(testName_2, yield);
+
+    EXPECT_EQ(testBucket->check_empty(env->dpp, yield), -ENOTEMPTY);
+    std::string version, version_1, version_2;
+
+    // Capture each object's head-block version so the cache files can be
+    // located after the bucket is removed.
+    {
+      boost::system::error_code ec;
+      request req;
+      req.push("HGET", bucketName + "_" + TEST_OBJ + testName + "_0_0", "version");
+      req.push("HGET", bucketName + "_" + TEST_OBJ + testName_1 + "_0_0", "version");
+      req.push("HGET", bucketName + "_" + TEST_OBJ + testName_2 + "_0_0", "version");
+
+      response<std::string, std::string, std::string> resp;
+
+      conn->async_exec(req, resp, yield[ec]);
+
+      ASSERT_EQ((bool)ec, false);
+      version = std::get<0>(resp).value();
+      version_1 = std::get<1>(resp).value();
+      version_2 = std::get<2>(resp).value();
+    }
+
+    EXPECT_EQ(testBucket->remove(env->dpp, true, yield), 0);
+
+    // Every bucket, object, head-block (plain and versioned) and data-block
+    // directory key must be gone after remove().
+    {
+      boost::system::error_code ec;
+      request req;
+      req.push("EXISTS", bucketName);
+      req.push("EXISTS", bucketName + "_" + TEST_OBJ + testName);
+      req.push("EXISTS", bucketName + "_" + TEST_OBJ + testName_1);
+      req.push("EXISTS", bucketName + "_" + TEST_OBJ + testName_2);
+      req.push("EXISTS", bucketName + "_" + TEST_OBJ + testName + "_0_0");
+      req.push("EXISTS", bucketName + "__:null" + TEST_OBJ + testName + "_0_0");
+      req.push("EXISTS", bucketName + "_" + TEST_OBJ + testName_1 + "_0_0");
+      req.push("EXISTS", bucketName + "__:" + version_1 + TEST_OBJ + testName_1 + "_0_0");
+      req.push("EXISTS", bucketName + "_" + TEST_OBJ + testName_2 + "_0_0");
+      req.push("EXISTS", bucketName + "__:null" + TEST_OBJ + testName_2 + "_0_0");
+      req.push("EXISTS", bucketName + "_" + TEST_OBJ + testName + "_0_" + std::to_string(ofs));
+      req.push("EXISTS", bucketName + "__:" + version_1 + TEST_OBJ + testName_1 + "_0_" + std::to_string(ofs));
+      req.push("EXISTS", bucketName + "_" + TEST_OBJ + testName_2 + "_0_" + std::to_string(ofs));
+
+      response<int, int, int, int, int, int,
+               int, int, int, int, int, int, int > resp;
+
+      conn->async_exec(req, resp, yield[ec]);
+
+      ASSERT_EQ((bool)ec, false);
+      EXPECT_EQ(std::get<0>(resp).value(), 0);
+      EXPECT_EQ(std::get<1>(resp).value(), 0);
+      EXPECT_EQ(std::get<2>(resp).value(), 0);
+      EXPECT_EQ(std::get<3>(resp).value(), 0);
+      EXPECT_EQ(std::get<4>(resp).value(), 0);
+      EXPECT_EQ(std::get<5>(resp).value(), 0);
+      EXPECT_EQ(std::get<6>(resp).value(), 0);
+      EXPECT_EQ(std::get<7>(resp).value(), 0);
+      EXPECT_EQ(std::get<8>(resp).value(), 0);
+      EXPECT_EQ(std::get<9>(resp).value(), 0);
+      EXPECT_EQ(std::get<10>(resp).value(), 0);
+      EXPECT_EQ(std::get<11>(resp).value(), 0);
+      EXPECT_EQ(std::get<12>(resp).value(), 0);
+    }
+
+    // The cached data itself is not deleted inline; it must be flagged
+    // invalid so eviction can reclaim it later.
+    std::string attr_val;
+    std::string location = CACHE_DIR + "/" + url_encode(bucketName, true) + "/" + TEST_OBJ + testName + "/" + version;
+    EXPECT_EQ(d4nFilter->get_cache_driver()->get_attr(env->dpp, location, RGW_CACHE_ATTR_INVALID, attr_val, optional_yield({yield})), 0);
+    EXPECT_EQ(attr_val, "1");
+
+    location = CACHE_DIR + "/" + url_encode(bucketName, true) + "/" + TEST_OBJ + testName_1 + "/" + version_1;
+    EXPECT_EQ(d4nFilter->get_cache_driver()->get_attr(env->dpp, location, RGW_CACHE_ATTR_INVALID, attr_val, optional_yield({yield})), 0);
+    EXPECT_EQ(attr_val, "1");
+
+    location = CACHE_DIR + "/" + url_encode(bucketName, true) + "/" + TEST_OBJ + testName_2 + "/" + version_2;
+    EXPECT_EQ(d4nFilter->get_cache_driver()->get_attr(env->dpp, location, RGW_CACHE_ATTR_INVALID, attr_val, optional_yield({yield})), 0);
+    EXPECT_EQ(attr_val, "1");
+
+    EXPECT_EQ(testBucket->check_empty(env->dpp, yield), 0);
+
+    conn->cancel();
+    driver->shutdown();
+    DriverDestructor driver_destructor(static_cast<rgw::sal::D4NFilterDriver*>(driver));
+  }, rethrow);
+
+  io.run();
+}
+
+// Remove a bucket after the cache cleaning cycle has flushed the dirty
+// entries (cleaning interval set to 0 and the io context run for 2 s):
+// directory entries must be gone, and the leftover cache blocks must be
+// clean (RGW_CACHE_ATTR_DIRTY == "0") rather than invalidated, since
+// eviction will lazily delete them.
+TEST_F(D4NFilterFixture, BucketRemoveAfterCleaning)
+{
+  env->cct->_conf->d4n_writecache_enabled = true;
+  env->cct->_conf->rgw_d4n_cache_cleaning_interval = 0;
+  const std::string testName = "PutObjectWrite";
+  const std::string testName_1 = "PutObjectWrite_1";
+  const std::string testName_2 = "PutObjectWrite_2";
+  const std::string bucketName = "/tmp/d4n_filter_tests/dbstore-default_ns.1";
+  std::string instance;
+  std::string version, version_1, version_2;
+
+  net::spawn(io, [this, &testName, &testName_1, &testName_2, &bucketName, &instance, &version, &version_1, &version_2] (net::yield_context yield) {
+    init_driver(yield);
+    create_bucket(testName, yield);
+    testBucket->get_info().bucket.bucket_id = bucketName;
+    put_object(testName, yield);
+    put_version_enabled_object(testName_1, instance, yield);
+    put_version_suspended_object(testName_2, yield);
+
+    // Capture each object's head-block version before cleaning runs.
+    {
+      boost::system::error_code ec;
+      request req;
+      req.push("HGET", bucketName + "_" + TEST_OBJ + testName + "_0_0", "version");
+      req.push("HGET", bucketName + "_" + TEST_OBJ + testName_1 + "_0_0", "version");
+      req.push("HGET", bucketName + "_" + TEST_OBJ + testName_2 + "_0_0", "version");
+
+      response<std::string, std::string, std::string> resp;
+
+      conn->async_exec(req, resp, yield[ec]);
+
+      ASSERT_EQ((bool)ec, false);
+      version = std::get<0>(resp).value();
+      version_1 = std::get<1>(resp).value();
+      version_2 = std::get<2>(resp).value();
+    }
+
+
+    // Hand the policy a yield context so the cleaning coroutine can run.
+    dynamic_cast<rgw::d4n::LFUDAPolicy*>(d4nFilter->get_policy_driver()->get_cache_policy())->save_y(null_yield);
+  }, rethrow);
+
+  io.run_for(std::chrono::seconds(2)); // Allow cleaning cycle to complete
+
+  net::spawn(io, [this, &testName, &testName_1, &testName_2, &bucketName, &version, &version_1, &version_2] (net::yield_context yield) {
+    dynamic_cast<rgw::d4n::LFUDAPolicy*>(d4nFilter->get_policy_driver()->get_cache_policy())->save_y(optional_yield{yield});
+
+    EXPECT_EQ(testBucket->check_empty(env->dpp, yield), -ENOTEMPTY);
+    EXPECT_EQ(testBucket->remove(env->dpp, true, yield), 0);
+
+    // Every bucket, object, head-block and data-block directory key must be
+    // gone after remove().
+    {
+      boost::system::error_code ec;
+      request req;
+      req.push("EXISTS", bucketName);
+      req.push("EXISTS", bucketName + "_" + TEST_OBJ + testName);
+      req.push("EXISTS", bucketName + "_" + TEST_OBJ + testName_1);
+      req.push("EXISTS", bucketName + "_" + TEST_OBJ + testName_2);
+      req.push("EXISTS", bucketName + "_" + TEST_OBJ + testName + "_0_0");
+      req.push("EXISTS", bucketName + "__:null" + TEST_OBJ + testName + "_0_0");
+      req.push("EXISTS", bucketName + "_" + TEST_OBJ + testName_1 + "_0_0");
+      req.push("EXISTS", bucketName + "__:" + version_1 + TEST_OBJ + testName_1 + "_0_0");
+      req.push("EXISTS", bucketName + "_" + TEST_OBJ + testName_2 + "_0_0");
+      req.push("EXISTS", bucketName + "__:null" + TEST_OBJ + testName_2 + "_0_0");
+      req.push("EXISTS", bucketName + "_" + TEST_OBJ + testName + "_0_" + std::to_string(ofs));
+      req.push("EXISTS", bucketName + "__:" + version_1 + TEST_OBJ + testName_1 + "_0_" + std::to_string(ofs));
+      req.push("EXISTS", bucketName + "_" + TEST_OBJ + testName_2 + "_0_" + std::to_string(ofs));
+
+      response<int, int, int, int, int, int,
+               int, int, int, int, int, int, int > resp;
+
+      conn->async_exec(req, resp, yield[ec]);
+
+      ASSERT_EQ((bool)ec, false);
+      EXPECT_EQ(std::get<0>(resp).value(), 0);
+      EXPECT_EQ(std::get<1>(resp).value(), 0);
+      EXPECT_EQ(std::get<2>(resp).value(), 0);
+      EXPECT_EQ(std::get<3>(resp).value(), 0);
+      EXPECT_EQ(std::get<4>(resp).value(), 0);
+      EXPECT_EQ(std::get<5>(resp).value(), 0);
+      EXPECT_EQ(std::get<6>(resp).value(), 0);
+      EXPECT_EQ(std::get<7>(resp).value(), 0);
+      EXPECT_EQ(std::get<8>(resp).value(), 0);
+      EXPECT_EQ(std::get<9>(resp).value(), 0);
+      EXPECT_EQ(std::get<10>(resp).value(), 0);
+      EXPECT_EQ(std::get<11>(resp).value(), 0);
+      EXPECT_EQ(std::get<12>(resp).value(), 0);
+    }
+
+    /* Eviction will eventually lazily delete leftover cache blocks, so simply ensure
+     * they are no longer dirty */
+    std::string attr_val;
+    std::string location = CACHE_DIR + "/" + url_encode(bucketName, true) + "/" + TEST_OBJ + testName + "/" + version;
+    EXPECT_EQ(d4nFilter->get_cache_driver()->get_attr(env->dpp, location, RGW_CACHE_ATTR_DIRTY, attr_val, optional_yield({yield})), 0);
+    EXPECT_EQ(attr_val, "0");
+
+    location = CACHE_DIR + "/" + url_encode(bucketName, true) + "/" + TEST_OBJ + testName_1 + "/" + version_1;
+    EXPECT_EQ(d4nFilter->get_cache_driver()->get_attr(env->dpp, location, RGW_CACHE_ATTR_DIRTY, attr_val, optional_yield({yield})), 0);
+    EXPECT_EQ(attr_val, "0");
+
+    location = CACHE_DIR + "/" + url_encode(bucketName, true) + "/" + TEST_OBJ + testName_2 + "/" + version_2;
+    EXPECT_EQ(d4nFilter->get_cache_driver()->get_attr(env->dpp, location, RGW_CACHE_ATTR_DIRTY, attr_val, optional_yield({yield})), 0);
+    EXPECT_EQ(attr_val, "0");
+
+    EXPECT_EQ(testBucket->check_empty(env->dpp, yield), 0);
+
+    conn->cancel();
+    driver->shutdown();
+    DriverDestructor driver_destructor(static_cast<rgw::sal::D4NFilterDriver*>(driver));
+  }, rethrow);
+
+  io.run();
+}
+
+// Remove a bucket whose versioned object carries a delete marker: remove()
+// must unlink the directory entries for both the object version and the
+// delete marker, and the cached blocks must be flagged per their kind
+// (INVALID for the data version, DELETE_MARKER for the marker).
+TEST_F(D4NFilterFixture, BucketRemoveDeleteMarker)
+{
+  env->cct->_conf->d4n_writecache_enabled = true;
+  const std::string testName = "PutObjectWrite";
+  const std::string bucketName = "/tmp/d4n_filter_tests/dbstore-default_ns.1";
+  std::string instance_1, instance_2, instance_3;
+
+  net::spawn(io, [this, &testName, &bucketName, &instance_1, &instance_2, &instance_3] (net::yield_context yield) {
+    init_driver(yield);
+    create_bucket(testName, yield);
+    testBucket->get_info().bucket.bucket_id = bucketName;
+    put_version_enabled_object(testName, instance_1, yield);
+    // Deleting with an empty instance on a versioned object creates a
+    // delete marker rather than removing the version.
+    std::unique_ptr<rgw::sal::Object::DeleteOp> del_op = objEnabled->get_delete_op();
+    objEnabled->set_instance("");
+    EXPECT_EQ(del_op->delete_obj(env->dpp, optional_yield{yield}, rgw::sal::FLAG_LOG_OP), 0);
+
+    EXPECT_EQ(testBucket->check_empty(env->dpp, yield), -ENOTEMPTY);
+    std::string version, delete_marker;
+
+    // ZREVRANGE lists versions newest-first: the delete marker precedes the
+    // original object version.
+    {
+      boost::system::error_code ec;
+      request req;
+      req.push("ZREVRANGE", bucketName + "_" + TEST_OBJ + testName, "0", "-1");
+
+      response< std::vector<std::string> > resp;
+
+      conn->async_exec(req, resp, yield[ec]);
+
+      ASSERT_EQ((bool)ec, false);
+      delete_marker = std::get<0>(resp).value()[0];
+      version = std::get<0>(resp).value()[1];
+    }
+
+    EXPECT_EQ(testBucket->remove(env->dpp, true, yield), 0);
+
+    // Directory keys for the bucket, object, head blocks (both versions)
+    // and the data block must all be gone.
+    {
+      boost::system::error_code ec;
+      request req;
+      req.push("EXISTS", bucketName);
+      req.push("EXISTS", bucketName + "_" + TEST_OBJ + testName);
+      req.push("EXISTS", bucketName + "_" + TEST_OBJ + testName + "_0_0");
+      req.push("EXISTS", bucketName + "__:" + version + TEST_OBJ + testName + "_0_0");
+      req.push("EXISTS", bucketName + "__:" + delete_marker + TEST_OBJ + testName + "_0_0");
+      req.push("EXISTS", bucketName + "__:" + version + TEST_OBJ + testName + "_0_" + std::to_string(ofs));
+
+      response<int, int, int,
+               int, int, int > resp;
+
+      conn->async_exec(req, resp, yield[ec]);
+
+      ASSERT_EQ((bool)ec, false);
+      EXPECT_EQ(std::get<0>(resp).value(), 0);
+      EXPECT_EQ(std::get<1>(resp).value(), 0);
+      EXPECT_EQ(std::get<2>(resp).value(), 0);
+      EXPECT_EQ(std::get<3>(resp).value(), 0);
+      EXPECT_EQ(std::get<4>(resp).value(), 0);
+      EXPECT_EQ(std::get<5>(resp).value(), 0);
+    }
+
+    std::string attr_val;
+    std::string location = CACHE_DIR + "/" + url_encode(bucketName, true) + "/" + TEST_OBJ + testName + "/" + version;
+    EXPECT_EQ(d4nFilter->get_cache_driver()->get_attr(env->dpp, location, RGW_CACHE_ATTR_INVALID, attr_val, optional_yield({yield})), 0);
+    EXPECT_EQ(attr_val, "1");
+
+    location = CACHE_DIR + "/" + url_encode(bucketName, true) + "/" + TEST_OBJ + testName + "/" + delete_marker;
+    EXPECT_EQ(d4nFilter->get_cache_driver()->get_attr(env->dpp, location, RGW_CACHE_ATTR_DELETE_MARKER, attr_val, optional_yield({yield})), 0);
+    EXPECT_EQ(attr_val, "1");
+
+    EXPECT_EQ(testBucket->check_empty(env->dpp, yield), 0);
+    conn->cancel();
+    driver->shutdown();
+    DriverDestructor driver_destructor(static_cast<rgw::sal::D4NFilterDriver*>(driver));
+  }, rethrow);
+
+  io.run();
+}
+
int main(int argc, char *argv[]) {
::testing::InitGoogleTest(&argc, argv);