From cda89f29c89c09fad924608a7e7ad5bc6ff2ec82 Mon Sep 17 00:00:00 2001 From: Samarah Date: Mon, 22 Sep 2025 15:16:40 +0000 Subject: [PATCH] rgw/d4n: Implement bucket check_empty and remove methods Signed-off-by: Samarah --- src/rgw/driver/d4n/d4n_directory.cc | 113 ++++++++++- src/rgw/driver/d4n/d4n_directory.h | 6 + src/rgw/driver/d4n/rgw_sal_d4n.cc | 190 ++++++++++++++++++ src/rgw/driver/d4n/rgw_sal_d4n.h | 5 + src/test/rgw/test_d4n_filter.cc | 291 +++++++++++++++++++++++++++- 5 files changed, 600 insertions(+), 5 deletions(-) diff --git a/src/rgw/driver/d4n/d4n_directory.cc b/src/rgw/driver/d4n/d4n_directory.cc index daadabed8db..f7c57c21644 100644 --- a/src/rgw/driver/d4n/d4n_directory.cc +++ b/src/rgw/driver/d4n/d4n_directory.cc @@ -159,6 +159,28 @@ void redis_exec_connection_pool(const DoutPrefixProvider* dpp, redis_exec_cp(dpp, redis_pool, ec, req, resp, y); } +int BucketDirectory::exist_key(const DoutPrefixProvider* dpp, const std::string& bucket_id, optional_yield y) { + response resp; + + try { + boost::system::error_code ec; + request req; + req.push("EXISTS", bucket_id); + + redis_exec_connection_pool(dpp, redis_pool, conn, ec, req, resp, y); + + if (ec) { + ldpp_dout(dpp, 0) << "BucketDirectory::" << __func__ << "() ERROR: " << ec.what() << dendl; + return -ec.value(); + } + } catch (std::exception &e) { + ldpp_dout(dpp, 0) << "BucketDirectory::" << __func__ << "() ERROR: " << e.what() << dendl; + return -EINVAL; + } + + return std::get<0>(resp).value(); +} + int BucketDirectory::zadd(const DoutPrefixProvider* dpp, const std::string& bucket_id, double score, const std::string& member, optional_yield y, Pipeline* pipeline) { try { @@ -323,6 +345,34 @@ int BucketDirectory::zrank(const DoutPrefixProvider* dpp, const std::string& buc return 0; } +int BucketDirectory::del(const DoutPrefixProvider* dpp, const std::string& bucket_id, optional_yield y) +{ + try { + boost::system::error_code ec; + request req; + + req.push("UNLINK", bucket_id); + + response resp; + redis_exec_connection_pool(dpp, redis_pool, conn, ec, req, resp, y); + + if (ec) { + ldpp_dout(dpp, 0) << "BucketDirectory::" << __func__ << "() ERROR: " << ec.what() << dendl; + return -ec.value(); + } + + if (std::get<0>(resp).value() != 1) { + ldpp_dout(dpp, 10) << "BucketDirectory::" << __func__ << "() Response is: " << std::get<0>(resp).value() << dendl; + return -ENOENT; + } + } catch (std::exception &e) { + ldpp_dout(dpp, 0) << "BucketDirectory::" << __func__ << "() ERROR: " << e.what() << dendl; + return -EINVAL; + } + + return 0; +} + std::string ObjectDirectory::build_index(CacheObj* object) { return object->bucketName + "_" + object->objName; @@ -511,6 +561,34 @@ int ObjectDirectory::copy(const DoutPrefixProvider* dpp, CacheObj* object, const } } +int ObjectDirectory::del(const DoutPrefixProvider* dpp, std::vector& objects, optional_yield y) +{ + request req; + for (auto object : objects) { + std::string key = build_index(&object); + ldpp_dout(dpp, 10) << "ObjectDirectory::" << __func__ << "(): index is: " << key << dendl; + + req.push("UNLINK", key); + } + + try { + boost::system::error_code ec; + boost::redis::generic_response resp; + + redis_exec_connection_pool(dpp, redis_pool, conn, ec, req, resp, y); + + if (ec) { + ldpp_dout(dpp, 0) << "ObjectDirectory::" << __func__ << "() ERROR: " << ec.what() << dendl; + return -ec.value(); + } + } catch (std::exception &e) { + ldpp_dout(dpp, 0) << "ObjectDirectory::" << __func__ << "() ERROR: " << e.what() << dendl; + return -EINVAL; + } + + return 0; +} + int ObjectDirectory::del(const DoutPrefixProvider* dpp, CacheObj* object, optional_yield y) { std::string key = build_index(object); @@ -520,7 +598,7 @@ int ObjectDirectory::del(const DoutPrefixProvider* dpp, CacheObj* object, option boost::system::error_code ec; response resp; request req; - req.push("DEL", key); + req.push("UNLINK", key); redis_exec_connection_pool(dpp, redis_pool, conn, ec, req, resp, y); @@ -826,10 +904,10 @@ int BlockDirectory::exist_key(const DoutPrefixProvider* dpp, CacheBlock* block, if (ec) { ldpp_dout(dpp, 0) << "BlockDirectory::" << __func__ << "() ERROR: " << ec.what() << dendl; - return false; + return -ec.value(); } } catch (std::exception &e) { - ldpp_dout(dpp, 0) << "ObjectDirectory::" << __func__ << "() ERROR: " << e.what() << dendl; + ldpp_dout(dpp, 0) << "BlockDirectory::" << __func__ << "() ERROR: " << e.what() << dendl; return -EINVAL; } @@ -1326,6 +1404,33 @@ int BlockDirectory::copy(const DoutPrefixProvider* dpp, CacheBlock* block, const } } +int BlockDirectory::del(const DoutPrefixProvider* dpp, std::vector& blocks, optional_yield y) +{ + request req; + for (auto block : blocks) { + std::string key = build_index(&block); + ldpp_dout(dpp, 10) << "BlockDirectory::" << __func__ << "(): index is: " << key << dendl; + + req.push("UNLINK", key); + } + + try { + boost::system::error_code ec; + boost::redis::generic_response resp; + redis_exec_connection_pool(dpp, redis_pool, conn, ec, req, resp, y); + + if (ec) { + ldpp_dout(dpp, 0) << "BlockDirectory::" << __func__ << "() ERROR: " << ec.what() << dendl; + return -ec.value(); + } + } catch (std::exception &e) { + ldpp_dout(dpp, 0) << "BlockDirectory::" << __func__ << "() ERROR: " << e.what() << dendl; + return -EINVAL; + } + + return 0; +} + int BlockDirectory::del(const DoutPrefixProvider* dpp, CacheBlock* block, optional_yield y) { std::string key = build_index(block); @@ -1334,7 +1439,7 @@ int BlockDirectory::del(const DoutPrefixProvider* dpp, CacheBlock* block, option try { boost::system::error_code ec; request req; - req.push("DEL", key); + req.push("UNLINK", key); response resp; redis_exec_connection_pool(dpp, redis_pool, conn, ec, req, resp, y); if (!std::get<0>(resp).value()) { diff --git a/src/rgw/driver/d4n/d4n_directory.h b/src/rgw/driver/d4n/d4n_directory.h index f20cb00d9b2..a52c931e588 100644 --- a/src/rgw/driver/d4n/d4n_directory.h +++ b/src/rgw/driver/d4n/d4n_directory.h @@ -174,11 +174,13 @@ class Pipeline { class BucketDirectory: public Directory { public: BucketDirectory(std::shared_ptr& conn) : conn(conn) {} + int exist_key(const DoutPrefixProvider* dpp, const std::string& bucket_id, optional_yield y); int zadd(const DoutPrefixProvider* dpp, const std::string& bucket_id, double score, const std::string& member, optional_yield y, Pipeline* pipeline=nullptr); int zrem(const DoutPrefixProvider* dpp, const std::string& bucket_id, const std::string& member, optional_yield y); int zrange(const DoutPrefixProvider* dpp, const std::string& bucket_id, const std::string& start, const std::string& stop, uint64_t offset, uint64_t count, std::vector& members, optional_yield y); int zscan(const DoutPrefixProvider* dpp, const std::string& bucket_id, uint64_t cursor, const std::string& pattern, uint64_t count, std::vector& members, uint64_t next_cursor, optional_yield y); int zrank(const DoutPrefixProvider* dpp, const std::string& bucket_id, const std::string& member, uint64_t& rank, optional_yield y); + int del(const DoutPrefixProvider* dpp, const std::string& bucket_id, optional_yield y); private: std::shared_ptr conn; @@ -193,6 +195,8 @@ class ObjectDirectory: public Directory { int set(const DoutPrefixProvider* dpp, CacheObj* object, optional_yield y); /* If nx is true, set only if key doesn't exist */ int get(const DoutPrefixProvider* dpp, CacheObj* object, optional_yield y); int copy(const DoutPrefixProvider* dpp, CacheObj* object, const std::string& copyName, const std::string& copyBucketName, optional_yield y); + //Pipelined version of del using boost::redis::generic_response for del bucket + int del(const DoutPrefixProvider* dpp, std::vector& objects, optional_yield y); int del(const DoutPrefixProvider* dpp, CacheObj* object, optional_yield y); int update_field(const DoutPrefixProvider* dpp, CacheObj* object, const std::string& field, std::string& value, optional_yield y); int zadd(const DoutPrefixProvider* dpp, CacheObj* object, double score, const std::string& member, optional_yield y, Pipeline* pipeline=nullptr); @@ -226,6 +230,8 @@ class BlockDirectory: public Directory { //Pipelined version of get using boost::redis::generic_response int get(const DoutPrefixProvider* dpp, std::vector& blocks, optional_yield y); int copy(const DoutPrefixProvider* dpp, CacheBlock* block, const std::string& copyName, const std::string& copyBucketName, optional_yield y); + //Pipelined version of del using boost::redis::generic_response for del bucket + int del(const DoutPrefixProvider* dpp, std::vector& blocks, optional_yield y); int del(const DoutPrefixProvider* dpp, CacheBlock* block, optional_yield y); int update_field(const DoutPrefixProvider* dpp, CacheBlock* block, const std::string& field, std::string& value, optional_yield y); int remove_host(const DoutPrefixProvider* dpp, CacheBlock* block, std::string& value, optional_yield y); diff --git a/src/rgw/driver/d4n/rgw_sal_d4n.cc b/src/rgw/driver/d4n/rgw_sal_d4n.cc index 59c9573715c..20c7e3fc93a 100644 --- a/src/rgw/driver/d4n/rgw_sal_d4n.cc +++ b/src/rgw/driver/d4n/rgw_sal_d4n.cc @@ -451,6 +451,7 @@ int D4NFilterBucket::list(const DoutPrefixProvider* dpp, ListParams& params, int } //end - else rgw::d4n::BlockDirectory* blockDir = this->filter->get_block_dir(); + std::vector dir_blocks_objNames; int remainder_size = entries.size(); size_t j = 0, start_j = 0; while (remainder_size > 0) { @@ -467,6 +468,10 @@ int D4NFilterBucket::list(const DoutPrefixProvider* dpp, ListParams& params, int } blocks[i].cacheObj.bucketName = this->get_bucket_id(); ldpp_dout(dpp, 20) << "D4NFilterBucket::" << __func__ << " blocks[i].cacheObj.objName: " << blocks[i].cacheObj.objName << dendl; + + if (return_blocks) { + dir_blocks_objNames.push_back(blocks[i].cacheObj.objName); + } j++; } auto ret = blockDir->get(dpp, blocks, y); @@ -475,7 +480,22 @@ int D4NFilterBucket::list(const DoutPrefixProvider* dpp, ListParams& params, int return ret; } + unsigned int i = 0; for (auto block : blocks) { + if (return_blocks) { + if (i >= dir_blocks_objNames.size() || i >= block.cacheObj.objName.size()) { + ldpp_dout(dpp, 0) << "D4NFilterBucket::" << __func__ << " Unable to construct list of cached blocks." << dendl; + return -EINVAL; + } + if (dir_blocks_objNames[i] == block.cacheObj.objName) { + dir_blocks.insert(std::make_pair(dir_blocks_objNames[i], block)); + } else { + ldpp_dout(dpp, 0) << "D4NFilterBucket::" << __func__ << " Error constructing list of cached blocks while handling object " << dir_blocks_objNames[i] << dendl; + return -EINVAL; + } + } + i++; + if (block.cacheObj.objName.empty()) { start_j++; continue; @@ -624,6 +644,176 @@ int D4NFilterBucket::list(const DoutPrefixProvider* dpp, ListParams& params, int return 0; } +constexpr int OBJECT_LIST_VAL = 1000; +constexpr int PIPELINE_MAX = 10000; +int D4NFilterBucket::remove(const DoutPrefixProvider* dpp, + bool delete_children, + optional_yield y) +{ + ListParams params; + params.list_versions = true; + ListResults results; + int ret; + + return_blocks = true; + auto blockDir = this->filter->get_block_dir(); + auto objDir = this->filter->get_obj_dir(); + std::vector blocks; + std::vector objects; + + do { + results.objs.clear(); + + ret = list(dpp, params, OBJECT_LIST_VAL, results, y); + if (ret < 0) { + return ret; + } + + if (!results.objs.empty() && !delete_children) { + ldpp_dout(dpp, 10) << "ERROR: could not remove non-empty bucket " << this->get_name() << dendl; + return -ENOTEMPTY; + } + + for (const auto& obj : results.objs) { + if (((PIPELINE_MAX - blocks.size()) <= OBJECT_LIST_VAL) || (blocks.size() > (PIPELINE_MAX - 1000))) { + if ((ret = blockDir->del(dpp, blocks, y) < 0)) { + ldpp_dout(dpp, 10) << "D4NFilterBucket::" << __func__ << "(): Failed to delete cached object in block directory, ret=" << ret << dendl; + return ret; + } + blocks.clear(); + } + + // Handle head objects + std::unique_ptr c_obj = this->get_object(obj.key); + ldpp_dout(dpp, 20) << "D4NFilterBucket::" << __func__ << "(): handling object=" << obj.key << dendl; + + rgw::d4n::CacheObj object = rgw::d4n::CacheObj{ + .objName = c_obj->get_name(), + .bucketName = this->get_bucket_id(), + }; + + rgw::d4n::CacheBlock block = rgw::d4n::CacheBlock{ + .cacheObj = object, + .blockID = 0, + .size = 0, + }; + + blocks.push_back(block); + objects.push_back(object); + + std::string oid_version; + if (c_obj->have_instance()) { + oid_version = c_obj->get_instance(); + } else { + oid_version = "null"; + } + off_t lst = obj.meta.size; + ldpp_dout(dpp, 20) << "D4NFilterBucket::" << __func__ << "(): Obj size=" << lst << dendl; + block.cacheObj.objName = "_:" + oid_version + "_" + block.cacheObj.objName; + auto it = dir_blocks.find(block.cacheObj.objName); + if (it != dir_blocks.end() && it->second.cacheObj.dirty) { + if (!this->filter->get_policy_driver()->get_cache_policy()->invalidate_dirty_object(dpp, get_cache_block_prefix(c_obj.get(), it->second.version))) { + ldpp_dout(dpp, 10) << "D4NFilterBucket::" << __func__ << "(): Failed to invalidate obj=" << c_obj->get_name() << " in cache" << dendl; + return -EINVAL; + } + /* For clean objects in the cache, inline deletes are avoided in favor of lazy deletes that occur through + * later eviction calls. */ + } else { + ldpp_dout(dpp, 20) << "D4NFilterBucket::" << __func__ << "(): Listing retrieved from backend for object " << c_obj->get_name() << dendl; + } + + // Handle versioned head objects + ldpp_dout(dpp, 20) << "D4NFilterBucket::" << __func__ << "(): versioned oid: " << block.cacheObj.objName << dendl; + blocks.push_back(block); + + // Handle data blocks + ldpp_dout(dpp, 20) << "D4NFilterBucket::" << __func__ << "(): Object size=" << lst << dendl; + off_t fst = 0; + do { + /* The addition of data blocks to the blocks structure may push its size over PIPELINE_MAX, so + * pipelined calls must also made during this loop. */ + if (((PIPELINE_MAX - blocks.size()) <= OBJECT_LIST_VAL) || (blocks.size() > (PIPELINE_MAX - 1000))) { + if ((ret = blockDir->del(dpp, blocks, y) < 0)) { + ldpp_dout(dpp, 10) << "D4NFilterBucket::" << __func__ << "(): Failed to delete cached object in block directory, ret=" << ret << dendl; + return ret; + } + blocks.clear(); + } + + ldpp_dout(dpp, 20) << "D4NFilterBucket::" << __func__ << "(): handling object=" << obj.key << dendl; + rgw::d4n::CacheBlock data_block; + if (fst >= lst) { + break; + } + off_t cur_size = std::min(fst + dpp->get_cct()->_conf->rgw_max_chunk_size, lst); + off_t cur_len = cur_size - fst; + data_block.cacheObj.bucketName = this->get_bucket_id(); + data_block.cacheObj.objName = c_obj->get_oid(); + ldpp_dout(dpp, 20) << "D4NFilterBucket::" << __func__ << "(): data_block=" << data_block.cacheObj.objName << dendl; + data_block.size = cur_len; + data_block.blockID = fst; + + fst += cur_len; + blocks.push_back(data_block); + + } while (fst < lst); // end - do + } + + /* Use pipelining for batches of ~10k commands since that is the max suggested + * in redis docs */ + if (((PIPELINE_MAX - blocks.size()) <= OBJECT_LIST_VAL) || (blocks.size() > (PIPELINE_MAX - 1000))) { + if ((ret = blockDir->del(dpp, blocks, y) < 0)) { + ldpp_dout(dpp, 10) << "D4NFilterBucket::" << __func__ << "(): Failed to delete cached object in block directory, ret=" << ret << dendl; + return ret; + } + blocks.clear(); + } + if ((PIPELINE_MAX - objects.size()) <= OBJECT_LIST_VAL) { + if ((ret = objDir->del(dpp, objects, y)) < 0) { + ldpp_dout(dpp, 10) << "D4NFilterBucket::" << __func__ << "(): Failed to delete bucket in bucket directory, ret=" << ret << dendl; + return ret; + } + objects.clear(); + } + } while (results.is_truncated); + + // One more delete to clean up remaining blocks if present + if (blocks.size()) { + if ((ret = blockDir->del(dpp, blocks, y) < 0)) { + ldpp_dout(dpp, 10) << "D4NFilterBucket::" << __func__ << "(): Failed to delete cached object in block directory, ret=" << ret << dendl; + return ret; + } + } + if (objects.size()) { + if ((ret = objDir->del(dpp, objects, y)) < 0) { + ldpp_dout(dpp, 10) << "D4NFilterBucket::" << __func__ << "(): Failed to delete bucket in bucket directory, ret=" << ret << dendl; + return ret; + } + } + if ((ret = this->filter->get_bucket_dir()->del(dpp, this->get_bucket_id(), y)) < 0 && (ret != -ENOENT)) { + ldpp_dout(dpp, 10) << "D4NFilterBucket::" << __func__ << "(): Failed to delete bucket in bucket directory, ret=" << ret << dendl; + return ret; + } + + ldpp_dout(dpp, 20) << "D4NFilterBucket::" << __func__ << "(): calling next->remove" << dendl; + return next->remove(dpp, delete_children, y); +} + +int D4NFilterBucket::check_empty(const DoutPrefixProvider* dpp, optional_yield y) +{ + // if the bucket exists in the bucket directory, then there are objects in the local cache + int ret; + if ((ret = this->filter->get_bucket_dir()->exist_key(dpp, this->get_bucket_id(), y)) < 0) { + ldpp_dout(dpp, 10) << "D4NFilterBucket::" << __func__ << "(): Failed to retrieve bucket in bucket directory, ret=" << ret << dendl; + return ret; + } if (ret == 0) { + ldpp_dout(dpp, 20) << "D4NFilterBucket::" << __func__ << "(): calling next->check_empty" << dendl; + return next->check_empty(dpp, y); + } else { + return -ENOTEMPTY; + } +} + std::unique_ptr D4NFilterBucket::get_multipart_upload( const std::string& oid, std::optional upload_id, diff --git a/src/rgw/driver/d4n/rgw_sal_d4n.h b/src/rgw/driver/d4n/rgw_sal_d4n.h index 9bf937d006c..373a01d7442 100644 --- a/src/rgw/driver/d4n/rgw_sal_d4n.h +++ b/src/rgw/driver/d4n/rgw_sal_d4n.h @@ -113,6 +113,8 @@ class D4NFilterBucket : public FilterBucket { }; D4NFilterDriver* filter; bool cache_request{false}; + bool return_blocks{false}; // indicates whether dir_blocks should be populated + std::unordered_map dir_blocks; // for use in bucket removal public: D4NFilterBucket(std::unique_ptr _next, D4NFilterDriver* _filter) : @@ -123,9 +125,12 @@ class D4NFilterBucket : public FilterBucket { virtual std::unique_ptr get_object(const rgw_obj_key& key) override; virtual int list(const DoutPrefixProvider* dpp, ListParams& params, int max, ListResults& results, optional_yield y) override; + virtual int remove(const DoutPrefixProvider* dpp, bool delete_children, + optional_yield y) override; virtual int create(const DoutPrefixProvider* dpp, const CreateParams& params, optional_yield y) override; + virtual int check_empty(const DoutPrefixProvider* dpp, optional_yield y) override; virtual std::unique_ptr get_multipart_upload( const std::string& oid, std::optional upload_id=std::nullopt, diff --git a/src/test/rgw/test_d4n_filter.cc b/src/test/rgw/test_d4n_filter.cc index 9523243c783..f78be728dfe 100755 --- a/src/test/rgw/test_d4n_filter.cc +++ b/src/test/rgw/test_d4n_filter.cc @@ -97,6 +97,7 @@ class D4NFilterFixture: public ::testing::Test { fs::remove_all(TEST_DIR); fs::create_directory(TEST_DIR); + env->cct->_conf->rgw_redis_connection_pool_size = 1; env->cct->_conf->rgw_d4n_cache_cleaning_interval = 1; rgw_user uid{"test_tenant", "test_filter"}; owner = uid; @@ -3109,7 +3110,7 @@ TEST_F(D4NFilterFixture, DeleteVersionedObjectWrite) conn->async_exec(req, resp, yield[ec]); ASSERT_EQ((bool)ec, false); - //EXPECT_EQ((int)std::get<0>(resp).value(), 0); // TODO: Object entry is not deleted + EXPECT_EQ((int)std::get<0>(resp).value(), 0); EXPECT_EQ((int)std::get<1>(resp).value(), 0); EXPECT_EQ((int)std::get<2>(resp).value(), 0); EXPECT_EQ((int)std::get<3>(resp).value(), 0); @@ -3526,6 +3527,294 @@ TEST_F(D4NFilterFixture, ListObjectVersions) io.run(); } +TEST_F(D4NFilterFixture, BucketRemoveBeforeCleaning) +{ + env->cct->_conf->d4n_writecache_enabled = true; + const std::string testName = "PutObjectWrite"; + const std::string testName_1 = "PutObjectWrite_1"; + const std::string testName_2 = "PutObjectWrite_2"; + const std::string bucketName = "/tmp/d4n_filter_tests/dbstore-default_ns.1"; + std::string instance; + + net::spawn(io, [this, &testName, &testName_1, &testName_2, &bucketName, &instance] (net::yield_context yield) { + init_driver(yield); + create_bucket(testName, yield); + testBucket->get_info().bucket.bucket_id = bucketName; + put_object(testName, yield); + put_version_enabled_object(testName_1, instance, yield); + put_version_suspended_object(testName_2, yield); + + EXPECT_EQ(testBucket->check_empty(env->dpp, yield), -ENOTEMPTY); + std::string version, version_1, version_2; + + { + boost::system::error_code ec; + request req; + req.push("HGET", bucketName + "_" + TEST_OBJ + testName + "_0_0", "version"); + req.push("HGET", bucketName + "_" + TEST_OBJ + testName_1 + "_0_0", "version"); + req.push("HGET", bucketName + "_" + TEST_OBJ + testName_2 + "_0_0", "version"); + + response resp; + + conn->async_exec(req, resp, yield[ec]); + + ASSERT_EQ((bool)ec, false); + version = std::get<0>(resp).value(); + version_1 = std::get<1>(resp).value(); + version_2 = std::get<2>(resp).value(); + } + + EXPECT_EQ(testBucket->remove(env->dpp, true, yield), 0); + + { + boost::system::error_code ec; + request req; + req.push("EXISTS", bucketName); + req.push("EXISTS", bucketName + "_" + TEST_OBJ + testName); + req.push("EXISTS", bucketName + "_" + TEST_OBJ + testName_1); + req.push("EXISTS", bucketName + "_" + TEST_OBJ + testName_2); + req.push("EXISTS", bucketName + "_" + TEST_OBJ + testName + "_0_0"); + req.push("EXISTS", bucketName + "__:null" + TEST_OBJ + testName + "_0_0"); + req.push("EXISTS", bucketName + "_" + TEST_OBJ + testName_1 + "_0_0"); + req.push("EXISTS", bucketName + "__:" + version_1 + TEST_OBJ + testName_1 + "_0_0"); + req.push("EXISTS", bucketName + "_" + TEST_OBJ + testName_2 + "_0_0"); + req.push("EXISTS", bucketName + "__:null" + TEST_OBJ + testName_2 + "_0_0"); + req.push("EXISTS", bucketName + "_" + TEST_OBJ + testName + "_0_" + std::to_string(ofs)); + req.push("EXISTS", bucketName + "__:" + version_1 + TEST_OBJ + testName_1 + "_0_" + std::to_string(ofs)); + req.push("EXISTS", bucketName + "_" + TEST_OBJ + testName_2 + "_0_" + std::to_string(ofs)); + + response resp; + + conn->async_exec(req, resp, yield[ec]); + + ASSERT_EQ((bool)ec, false); + EXPECT_EQ(std::get<0>(resp).value(), 0); + EXPECT_EQ(std::get<1>(resp).value(), 0); + EXPECT_EQ(std::get<2>(resp).value(), 0); + EXPECT_EQ(std::get<3>(resp).value(), 0); + EXPECT_EQ(std::get<4>(resp).value(), 0); + EXPECT_EQ(std::get<5>(resp).value(), 0); + EXPECT_EQ(std::get<6>(resp).value(), 0); + EXPECT_EQ(std::get<7>(resp).value(), 0); + EXPECT_EQ(std::get<8>(resp).value(), 0); + EXPECT_EQ(std::get<9>(resp).value(), 0); + EXPECT_EQ(std::get<10>(resp).value(), 0); + EXPECT_EQ(std::get<11>(resp).value(), 0); + EXPECT_EQ(std::get<12>(resp).value(), 0); + } + + std::string attr_val; + std::string location = CACHE_DIR + "/" + url_encode(bucketName, true) + "/" + TEST_OBJ + testName + "/" + version; + EXPECT_EQ(d4nFilter->get_cache_driver()->get_attr(env->dpp, location, RGW_CACHE_ATTR_INVALID, attr_val, optional_yield({yield})), 0); + EXPECT_EQ(attr_val, "1"); + + location = CACHE_DIR + "/" + url_encode(bucketName, true) + "/" + TEST_OBJ + testName_1 + "/" + version_1; + EXPECT_EQ(d4nFilter->get_cache_driver()->get_attr(env->dpp, location, RGW_CACHE_ATTR_INVALID, attr_val, optional_yield({yield})), 0); + EXPECT_EQ(attr_val, "1"); + + location = CACHE_DIR + "/" + url_encode(bucketName, true) + "/" + TEST_OBJ + testName_2 + "/" + version_2; + EXPECT_EQ(d4nFilter->get_cache_driver()->get_attr(env->dpp, location, RGW_CACHE_ATTR_INVALID, attr_val, optional_yield({yield})), 0); + EXPECT_EQ(attr_val, "1"); + + EXPECT_EQ(testBucket->check_empty(env->dpp, yield), 0); + + conn->cancel(); + driver->shutdown(); + DriverDestructor driver_destructor(static_cast(driver)); + }, rethrow); + + io.run(); +} + +TEST_F(D4NFilterFixture, BucketRemoveAfterCleaning) +{ + env->cct->_conf->d4n_writecache_enabled = true; + env->cct->_conf->rgw_d4n_cache_cleaning_interval = 0; + const std::string testName = "PutObjectWrite"; + const std::string testName_1 = "PutObjectWrite_1"; + const std::string testName_2 = "PutObjectWrite_2"; + const std::string bucketName = "/tmp/d4n_filter_tests/dbstore-default_ns.1"; + std::string instance; + std::string version, version_1, version_2; + + net::spawn(io, [this, &testName, &testName_1, &testName_2, &bucketName, &instance, &version, &version_1, &version_2] (net::yield_context yield) { + init_driver(yield); + create_bucket(testName, yield); + testBucket->get_info().bucket.bucket_id = bucketName; + put_object(testName, yield); + put_version_enabled_object(testName_1, instance, yield); + put_version_suspended_object(testName_2, yield); + + { + boost::system::error_code ec; + request req; + req.push("HGET", bucketName + "_" + TEST_OBJ + testName + "_0_0", "version"); + req.push("HGET", bucketName + "_" + TEST_OBJ + testName_1 + "_0_0", "version"); + req.push("HGET", bucketName + "_" + TEST_OBJ + testName_2 + "_0_0", "version"); + + response resp; + + conn->async_exec(req, resp, yield[ec]); + + ASSERT_EQ((bool)ec, false); + version = std::get<0>(resp).value(); + version_1 = std::get<1>(resp).value(); + version_2 = std::get<2>(resp).value(); + } + + + dynamic_cast(d4nFilter->get_policy_driver()->get_cache_policy())->save_y(null_yield); + }, rethrow); + + io.run_for(std::chrono::seconds(2)); // Allow cleaning cycle to complete + + net::spawn(io, [this, &testName, &testName_1, &testName_2, &bucketName, &version, &version_1, &version_2] (net::yield_context yield) { + dynamic_cast(d4nFilter->get_policy_driver()->get_cache_policy())->save_y(optional_yield{yield}); + + EXPECT_EQ(testBucket->check_empty(env->dpp, yield), -ENOTEMPTY); + EXPECT_EQ(testBucket->remove(env->dpp, true, yield), 0); + + { + boost::system::error_code ec; + request req; + req.push("EXISTS", bucketName); + req.push("EXISTS", bucketName + "_" + TEST_OBJ + testName); + req.push("EXISTS", bucketName + "_" + TEST_OBJ + testName_1); + req.push("EXISTS", bucketName + "_" + TEST_OBJ + testName_2); + req.push("EXISTS", bucketName + "_" + TEST_OBJ + testName + "_0_0"); + req.push("EXISTS", bucketName + "__:null" + TEST_OBJ + testName + "_0_0"); + req.push("EXISTS", bucketName + "_" + TEST_OBJ + testName_1 + "_0_0"); + req.push("EXISTS", bucketName + "__:" + version_1 + TEST_OBJ + testName_1 + "_0_0"); + req.push("EXISTS", bucketName + "_" + TEST_OBJ + testName_2 + "_0_0"); + req.push("EXISTS", bucketName + "__:null" + TEST_OBJ + testName_2 + "_0_0"); + req.push("EXISTS", bucketName + "_" + TEST_OBJ + testName + "_0_" + std::to_string(ofs)); + req.push("EXISTS", bucketName + "__:" + version_1 + TEST_OBJ + testName_1 + "_0_" + std::to_string(ofs)); + req.push("EXISTS", bucketName + "_" + TEST_OBJ + testName_2 + "_0_" + std::to_string(ofs)); + + response resp; + + conn->async_exec(req, resp, yield[ec]); + + ASSERT_EQ((bool)ec, false); + EXPECT_EQ(std::get<0>(resp).value(), 0); + EXPECT_EQ(std::get<1>(resp).value(), 0); + EXPECT_EQ(std::get<2>(resp).value(), 0); + EXPECT_EQ(std::get<3>(resp).value(), 0); + EXPECT_EQ(std::get<4>(resp).value(), 0); + EXPECT_EQ(std::get<5>(resp).value(), 0); + EXPECT_EQ(std::get<6>(resp).value(), 0); + EXPECT_EQ(std::get<7>(resp).value(), 0); + EXPECT_EQ(std::get<8>(resp).value(), 0); + EXPECT_EQ(std::get<9>(resp).value(), 0); + EXPECT_EQ(std::get<10>(resp).value(), 0); + EXPECT_EQ(std::get<11>(resp).value(), 0); + EXPECT_EQ(std::get<12>(resp).value(), 0); + } + + /* Eviction will eventually lazily delete leftover cache blocks, so simply ensure + * they are no longer dirty */ + std::string attr_val; + std::string location = CACHE_DIR + "/" + url_encode(bucketName, true) + "/" + TEST_OBJ + testName + "/" + version; + EXPECT_EQ(d4nFilter->get_cache_driver()->get_attr(env->dpp, location, RGW_CACHE_ATTR_DIRTY, attr_val, optional_yield({yield})), 0); + EXPECT_EQ(attr_val, "0"); + + location = CACHE_DIR + "/" + url_encode(bucketName, true) + "/" + TEST_OBJ + testName_1 + "/" + version_1; + EXPECT_EQ(d4nFilter->get_cache_driver()->get_attr(env->dpp, location, RGW_CACHE_ATTR_DIRTY, attr_val, optional_yield({yield})), 0); + EXPECT_EQ(attr_val, "0"); + + location = CACHE_DIR + "/" + url_encode(bucketName, true) + "/" + TEST_OBJ + testName_2 + "/" + version_2; + EXPECT_EQ(d4nFilter->get_cache_driver()->get_attr(env->dpp, location, RGW_CACHE_ATTR_DIRTY, attr_val, optional_yield({yield})), 0); + EXPECT_EQ(attr_val, "0"); + + EXPECT_EQ(testBucket->check_empty(env->dpp, yield), 0); + + conn->cancel(); + driver->shutdown(); + DriverDestructor driver_destructor(static_cast(driver)); + }, rethrow); + + io.run(); +} + +TEST_F(D4NFilterFixture, BucketRemoveDeleteMarker) +{ + env->cct->_conf->d4n_writecache_enabled = true; + const std::string testName = "PutObjectWrite"; + const std::string bucketName = "/tmp/d4n_filter_tests/dbstore-default_ns.1"; + std::string instance_1, instance_2, instance_3; + + net::spawn(io, [this, &testName, &bucketName, &instance_1, &instance_2, &instance_3] (net::yield_context yield) { + init_driver(yield); + create_bucket(testName, yield); + testBucket->get_info().bucket.bucket_id = bucketName; + put_version_enabled_object(testName, instance_1, yield); + std::unique_ptr del_op = objEnabled->get_delete_op(); + objEnabled->set_instance(""); + EXPECT_EQ(del_op->delete_obj(env->dpp, optional_yield{yield}, rgw::sal::FLAG_LOG_OP), 0); + + EXPECT_EQ(testBucket->check_empty(env->dpp, yield), -ENOTEMPTY); + std::string version, delete_marker; + + { + boost::system::error_code ec; + request req; + req.push("ZREVRANGE", bucketName + "_" + TEST_OBJ + testName, "0", "-1"); + + response< std::vector > resp; + + conn->async_exec(req, resp, yield[ec]); + + ASSERT_EQ((bool)ec, false); + delete_marker = std::get<0>(resp).value()[0]; + version = std::get<0>(resp).value()[1]; + } + + EXPECT_EQ(testBucket->remove(env->dpp, true, yield), 0); + + { + boost::system::error_code ec; + request req; + req.push("EXISTS", bucketName); + req.push("EXISTS", bucketName + "_" + TEST_OBJ + testName); + req.push("EXISTS", bucketName + "_" + TEST_OBJ + testName + "_0_0"); + req.push("EXISTS", bucketName + "__:" + version + TEST_OBJ + testName + "_0_0"); + req.push("EXISTS", bucketName + "__:" + delete_marker + TEST_OBJ + testName + "_0_0"); + req.push("EXISTS", bucketName + "__:" + version + TEST_OBJ + testName + "_0_" + std::to_string(ofs)); + + response resp; + + conn->async_exec(req, resp, yield[ec]); + + ASSERT_EQ((bool)ec, false); + EXPECT_EQ(std::get<0>(resp).value(), 0); + EXPECT_EQ(std::get<1>(resp).value(), 0); + EXPECT_EQ(std::get<2>(resp).value(), 0); + EXPECT_EQ(std::get<3>(resp).value(), 0); + EXPECT_EQ(std::get<4>(resp).value(), 0); + EXPECT_EQ(std::get<5>(resp).value(), 0); + } + + std::string attr_val; + std::string location = CACHE_DIR + "/" + url_encode(bucketName, true) + "/" + TEST_OBJ + testName + "/" + version; + EXPECT_EQ(d4nFilter->get_cache_driver()->get_attr(env->dpp, location, RGW_CACHE_ATTR_INVALID, attr_val, optional_yield({yield})), 0); + EXPECT_EQ(attr_val, "1"); + + location = CACHE_DIR + "/" + url_encode(bucketName, true) + "/" + TEST_OBJ + testName + "/" + delete_marker; + EXPECT_EQ(d4nFilter->get_cache_driver()->get_attr(env->dpp, location, RGW_CACHE_ATTR_DELETE_MARKER, attr_val, optional_yield({yield})), 0); + EXPECT_EQ(attr_val, "1"); + + EXPECT_EQ(testBucket->check_empty(env->dpp, yield), 0); + conn->cancel(); + driver->shutdown(); + DriverDestructor driver_destructor(static_cast(driver)); + }, rethrow); + + io.run(); +} + int main(int argc, char *argv[]) { ::testing::InitGoogleTest(&argc, argv); -- 2.47.3