From 142ffde01cf8436b8062ddcd34d2d0309bfe5f4a Mon Sep 17 00:00:00 2001 From: Samarah Date: Thu, 30 Jan 2025 16:50:06 +0000 Subject: [PATCH] test/d4n: Add D4N filter test program Signed-off-by: Samarah --- src/test/rgw/CMakeLists.txt | 15 + src/test/rgw/test_d4n_filter.cc | 3488 +++++++++++++++++++++++++++++++ 2 files changed, 3503 insertions(+) create mode 100755 src/test/rgw/test_d4n_filter.cc diff --git a/src/test/rgw/CMakeLists.txt b/src/test/rgw/CMakeLists.txt index f98fedd36c0..2c6c45ad99c 100644 --- a/src/test/rgw/CMakeLists.txt +++ b/src/test/rgw/CMakeLists.txt @@ -21,6 +21,21 @@ if(WITH_JAEGER) endif() if(WITH_RADOSGW_D4N) +add_executable(ceph_test_rgw_d4n_filter + test_d4n_filter.cc + ) +target_include_directories(ceph_test_rgw_d4n_filter + SYSTEM PRIVATE "${CMAKE_SOURCE_DIR}/src/rgw/driver/d4n") +target_link_libraries(ceph_test_rgw_d4n_filter PRIVATE + rgw_common + librados + ceph-common + ${rgw_libs} + ${UNITTEST_LIBS} + ${EXTRALIBS} + ) +install(TARGETS ceph_test_rgw_d4n_filter DESTINATION ${CMAKE_INSTALL_BINDIR}) + add_executable(ceph_test_rgw_d4n_directory test_d4n_directory.cc ) diff --git a/src/test/rgw/test_d4n_filter.cc b/src/test/rgw/test_d4n_filter.cc new file mode 100755 index 00000000000..84de5141cca --- /dev/null +++ b/src/test/rgw/test_d4n_filter.cc @@ -0,0 +1,3488 @@ +#include +#include +#include + +#include "common/async/context_pool.h" + +#include +#include +#include "gtest/gtest.h" +#include "common/ceph_argparse.h" +#include "rgw_auth_registry.h" +#include "rgw_aio_throttle.h" +#include "rgw_sal.h" +#include "rgw_sal_store.h" +#include "driver/dbstore/common/dbstore.h" +#include "rgw_sal_d4n.h" +#include "rgw_sal_filter.h" + +#define dout_subsys ceph_subsys_rgw + +const static std::string TEST_DIR = "d4n_filter_tests"; +const static std::string CACHE_DIR = "/tmp/rgw_d4n_datacache"; +const static std::string TEST_BUCKET = "test_bucket_"; +const static std::string TEST_OBJ = "test_object_"; +uint64_t ofs; + +extern "C" { +extern rgw::sal::Driver* newD4NFilter(rgw::sal::Driver* next, boost::asio::io_context& io_context); +} + +namespace fs = std::filesystem; +namespace net = boost::asio; +using boost::redis::config; +using boost::redis::connection; +using boost::redis::request; +using boost::redis::response; + +std::string getTestDir() { + auto test_dir = fs::temp_directory_path() / TEST_DIR; + return test_dir.string(); +} + +void rethrow(std::exception_ptr eptr) { + if (eptr) std::rethrow_exception(eptr); +} + +class Environment* env; + +class Environment : public ::testing::Environment { + public: + Environment() {} + + virtual ~Environment() {} + + void SetUp() override { + std::vector args; + cct = global_init(nullptr, args, CEPH_ENTITY_TYPE_CLIENT, + CODE_ENVIRONMENT_UTILITY, + CINIT_FLAG_NO_DEFAULT_CONFIG_FILE); + + env->cct.get()->_conf.set_val_or_die("dbstore_db_dir", getTestDir()); + common_init_finish(g_ceph_context); + + dpp = new DoutPrefix(cct->get(), dout_subsys, "D4N Object Directory Test: "); + + redisHost = cct->_conf->rgw_d4n_address; + } + + virtual void TearDown() { + delete dpp; + fs::remove_all(TEST_DIR); + } + + std::string redisHost; + boost::intrusive_ptr cct; + DoutPrefixProvider* dpp; +}; + +class Read_CB : public RGWGetDataCB +{ +public: + bufferlist *save_bl; + explicit Read_CB(bufferlist *_bl) : save_bl(_bl) {} + ~Read_CB() override {} + + int handle_data(bufferlist& bl, off_t bl_ofs, off_t bl_len) override { + save_bl->append(bl); + return 0; + } +}; + +class D4NFilterFixture: public ::testing::Test { + protected: + virtual void SetUp() { + fs::current_path(fs::temp_directory_path()); + fs::remove_all(TEST_DIR); + fs::create_directory(TEST_DIR); + + env->cct->_conf->rgw_d4n_cache_cleaning_interval = 1; + rgw_user uid{"test_tenant", "test_filter"}; + owner = uid; + acl_owner.id = owner; + + conn = new connection{net::make_strand(io)}; + ASSERT_NE(conn, nullptr); + + /* Run fixture's connection */ + config conf; + conf.addr.host = env->redisHost.substr(0, env->redisHost.find(":")); + conf.addr.port = env->redisHost.substr(env->redisHost.find(":") + 1, env->redisHost.length()); + + conn->async_run(conf, {}, net::detached); + + const rgw::SiteConfig site_config; + DriverManager::Config cfg = DriverManager::get_config(true, g_ceph_context); + cfg.store_name = "dbstore"; + cfg.filter_name = "d4n"; + + auto filterDriver = DriverManager::get_raw_storage(env->dpp, g_ceph_context, + cfg, io, site_config); + + rgw::sal::Driver* next = filterDriver; + driver = newD4NFilter(next, io); + d4nFilter = dynamic_cast(driver); + + /* Reset Redis state */ + net::spawn(io, [this] (net::yield_context yield) { + boost::system::error_code ec; + request req; + req.push("FLUSHALL"); + response resp; + conn->async_exec(req, resp, yield[ec]); + }, rethrow); + } + + virtual void TearDown() { + delete conn; + } + + void init_driver(net::yield_context yield) { + dynamic_cast(d4nFilter->get_policy_driver()->get_cache_policy())->save_y(optional_yield{yield}); + driver->initialize(env->cct.get(), env->dpp); + + ASSERT_NE(driver, nullptr); + } + + void create_user(net::yield_context yield) { + rgw_user u("test_tenant", "test_user", "ns"); + + testUser = driver->get_user(u); + testUser->get_info().user_id = u; + + ASSERT_EQ(testUser->store_user(env->dpp, optional_yield{yield}, false), 0); + } + + void create_bucket(std::string name, net::yield_context yield) { + rgw::sal::Bucket::CreateParams createParams; + rgw_bucket b; + init_bucket(&b, "test_tenant", "test_name", "test_data_pool", "test_index_pool", "test_marker", "test_id"); + + EXPECT_EQ(driver->load_bucket(env->dpp, b, &testBucket, optional_yield{yield}), -2); + ASSERT_EQ(testBucket->create(env->dpp, createParams, optional_yield{yield}), 0); + testBucket->get_info().bucket.bucket_id = "test_bucket_" + name; + } + + void put_object(std::string name, net::yield_context yield) { + std::string object_name = "test_object_" + name; + obj = testBucket->get_object(rgw_obj_key(object_name)); + ASSERT_NE(obj.get(), nullptr); + obj.get()->set_obj_size(9); + + testWriter = driver->get_atomic_writer(env->dpp, + optional_yield{yield}, + obj.get(), + acl_owner, + nullptr, + 0, + "test_filter"); + + const req_context rctx{env->dpp, optional_yield{yield}, nullptr}; + ceph::real_time mtime; + + buffer::list bl; + bl.append("test_version\0", 13); + /* DBStore does not provide the RGW_ATTR_ID_TAG attr, so it is being manually written + here since D4N cache backend versioning relies on it. */ + rgw::sal::Attrs attrs{{RGW_ATTR_ID_TAG, std::move(bl)}}; + bl.append("test_etag"); + attrs.insert({RGW_ATTR_ETAG, std::move(bl)}); + + bl.append("test data", 9); + ASSERT_EQ(testWriter->prepare(optional_yield{yield}), 0); + ASSERT_EQ(testWriter->process(std::move(bl), 0), 0); + ASSERT_EQ(testWriter->complete(ofs, etag, + &mtime, real_time(), + attrs, std::nullopt, + real_time(), + nullptr, nullptr, nullptr, + nullptr, nullptr, rctx, 0), 0); + } + + void put_version_enabled_object(std::string name, std::string& instance, net::yield_context yield) { + testBucket->get_info().flags |= BUCKET_VERSIONED; + std::string object_name = "test_object_" + name; + objEnabled = testBucket->get_object(rgw_obj_key(object_name)); + ASSERT_NE(objEnabled.get(), nullptr); + objEnabled.get()->set_obj_size(9); + objEnabled->gen_rand_obj_instance_name(); + instance = objEnabled->get_instance(); + + testWriter = driver->get_atomic_writer(env->dpp, + optional_yield{yield}, + objEnabled.get(), + acl_owner, + nullptr, + 0, + "test_filter"); + + const req_context rctx{env->dpp, optional_yield{yield}, nullptr}; + ceph::real_time mtime; + + buffer::list bl; + bl.append("test_version\0", 13); + rgw::sal::Attrs attrs{{RGW_ATTR_ID_TAG, std::move(bl)}}; + bl.append("test_etag"); + attrs.insert({RGW_ATTR_ETAG, std::move(bl)}); + + bl.append("test data", 9); + ASSERT_EQ(testWriter->prepare(optional_yield{yield}), 0); + ASSERT_EQ(testWriter->process(std::move(bl), 0), 0); + ASSERT_EQ(testWriter->complete(ofs, etag, + &mtime, real_time(), + attrs, std::nullopt, + real_time(), + nullptr, nullptr, nullptr, + nullptr, nullptr, rctx, 0), 0); + } + + void put_version_suspended_object(std::string name, net::yield_context yield) { + testBucket->get_info().flags |= BUCKET_VERSIONS_SUSPENDED; + std::string object_name = "test_object_" + name; + objSuspended = testBucket->get_object(rgw_obj_key(object_name)); + ASSERT_NE(objSuspended.get(), nullptr); + objSuspended.get()->set_obj_size(9); + + testWriter = driver->get_atomic_writer(env->dpp, + optional_yield{yield}, + objSuspended.get(), + acl_owner, + nullptr, + 0, + "test_filter"); + + const req_context rctx{env->dpp, optional_yield{yield}, nullptr}; + ceph::real_time mtime; + + buffer::list bl; + bl.append("test_version\0", 13); + rgw::sal::Attrs attrs{{RGW_ATTR_ID_TAG, std::move(bl)}}; + bl.append("test_etag"); + attrs.insert({RGW_ATTR_ETAG, std::move(bl)}); + + bl.append("test data", 9); + ASSERT_EQ(testWriter->prepare(optional_yield{yield}), 0); + ASSERT_EQ(testWriter->process(std::move(bl), 0), 0); + ASSERT_EQ(testWriter->complete(ofs, etag, + &mtime, real_time(), + attrs, std::nullopt, + real_time(), + nullptr, nullptr, nullptr, + nullptr, nullptr, rctx, 0), 0); + } + + + rgw_owner owner; + ACLOwner acl_owner; + size_t ofs = 9; + std::string etag = "test_etag"; + + net::io_context io; + connection* conn; + + rgw::sal::Driver* driver; + rgw::sal::D4NFilterDriver* d4nFilter; + std::unique_ptr obj; + std::unique_ptr objEnabled; + std::unique_ptr objSuspended; + std::unique_ptr testUser = nullptr; + std::unique_ptr testBucket = nullptr; + std::unique_ptr testWriter = nullptr; +}; + +class DriverDestructor { + rgw::sal::Driver* driver; + +public: + explicit DriverDestructor(rgw::sal::D4NFilterDriver* _s) : driver(_s) {} + ~DriverDestructor() { + DriverManager::close_storage(driver); + } +}; + +// Read cache tests, unversioned +TEST_F(D4NFilterFixture, PutObjectRead) +{ + const std::string testName = "PutObjectRead"; + + net::spawn(io, [this, &testName] (net::yield_context yield) { + init_driver(yield); + create_bucket(testName, yield); + put_object(testName, yield); + + // Check directory values + boost::system::error_code ec; + request req; + req.push("EXISTS", TEST_BUCKET + testName + "_" + TEST_OBJ + testName + "_0_0"); + req.push("EXISTS", TEST_BUCKET + testName + "__:null_" + TEST_OBJ + testName + "_0_0"); + req.push("HGETALL", TEST_BUCKET + testName + "_" + TEST_OBJ + testName + "_0_0"); + req.push("HGETALL", TEST_BUCKET + testName + "__:null_" + TEST_OBJ + testName + "_0_0"); + req.push("HGET", TEST_BUCKET + testName + "_" + TEST_OBJ + testName + "_0_0", "version"); // To check cache block(s) + + response< int, int, + std::map, + std::map, + std::string > resp; + + conn->async_exec(req, resp, yield[ec]); + + ASSERT_EQ((bool)ec, false); + EXPECT_EQ((int)std::get<0>(resp).value(), 1); + EXPECT_EQ((int)std::get<1>(resp).value(), 1); + EXPECT_EQ(std::get<2>(resp).value().size(), 14); + EXPECT_EQ(std::get<3>(resp).value().size(), 14); + + std::string version = std::get<4>(resp).value(); + std::error_code err; + + EXPECT_EQ(fs::exists(CACHE_DIR + "/" + TEST_BUCKET + testName + "/" + TEST_OBJ + testName + "/" + version, err), true); + + conn->cancel(); + testBucket->remove(env->dpp, true, optional_yield{yield}); + DriverDestructor driver_destructor(static_cast(driver)); + }, rethrow); + + io.run(); +} + +TEST_F(D4NFilterFixture, GetObjectRead) +{ + const std::string testName = "GetObjectRead"; + + net::spawn(io, [this, &testName] (net::yield_context yield) { + init_driver(yield); + create_bucket(testName, yield); + put_object(testName, yield); + + bufferlist bl; + Read_CB cb(&bl); + std::unique_ptr read_op(obj->get_read_op()); + EXPECT_EQ(read_op->prepare(optional_yield{yield}, env->dpp), 0); + EXPECT_EQ(read_op->iterate(env->dpp, 0, ofs, &cb, optional_yield{yield}), 0); + + boost::system::error_code ec; + request req; + req.push("EXISTS", TEST_BUCKET + testName + "_" + TEST_OBJ + testName + "_0_0"); + req.push("EXISTS", TEST_BUCKET + testName + "__:null_" + TEST_OBJ + testName + "_0_0"); + req.push("EXISTS", TEST_BUCKET + testName + "_" + TEST_OBJ + testName + "_0_" + std::to_string(ofs)); // Data block entry + req.push("HGETALL", TEST_BUCKET + testName + "_" + TEST_OBJ + testName + "_0_0"); + req.push("HGETALL", TEST_BUCKET + testName + "__:null_" + TEST_OBJ + testName + "_0_0"); + req.push("HGETALL", TEST_BUCKET + testName + "_" + TEST_OBJ + testName + "_0_" + std::to_string(ofs)); + req.push("HGET", TEST_BUCKET + testName + "_" + TEST_OBJ + testName + "_0_0", "version"); // To check cache contents + + response< int, int, int, + std::map, + std::map, + std::map, + std::string > resp; + + conn->async_exec(req, resp, yield[ec]); + + ASSERT_EQ((bool)ec, false); + EXPECT_EQ((int)std::get<0>(resp).value(), 1); + EXPECT_EQ((int)std::get<1>(resp).value(), 1); + EXPECT_EQ((int)std::get<2>(resp).value(), 1); + EXPECT_EQ(std::get<3>(resp).value().size(), 14); + EXPECT_EQ(std::get<4>(resp).value().size(), 14); + EXPECT_EQ(std::get<5>(resp).value().size(), 14); + + std::string version = std::get<6>(resp).value(); + std::error_code err; + std::string testData; + std::ifstream testFile; + + // Check cache contents + EXPECT_EQ(fs::exists(CACHE_DIR + "/" + TEST_BUCKET + testName + "/" + TEST_OBJ + testName + "/" + version, err), true); + std::string oid = version + "#0#" + std::to_string(ofs); + EXPECT_EQ(fs::exists(CACHE_DIR + "/" + TEST_BUCKET + testName + "/" + TEST_OBJ + testName + "/" + oid, err), true); + testFile.open(CACHE_DIR + "/" + TEST_BUCKET + testName + "/" + TEST_OBJ + testName + "/" + oid); + ASSERT_EQ(testFile.is_open(), true); + getline(testFile, testData); + ASSERT_EQ(testData.empty(), false); + EXPECT_EQ(testData, "test data"); + + // Ensure data returned from GET op matches cached data + EXPECT_EQ(testData, cb.save_bl->to_str()); + + testFile.close(); + conn->cancel(); + testBucket->remove(env->dpp, true, optional_yield{yield}); + DriverDestructor driver_destructor(static_cast(driver)); + }, rethrow); + + io.run(); +} + +TEST_F(D4NFilterFixture, CopyNoneObjectRead) +{ + const std::string testName = "CopyNoneObjectRead"; + + net::spawn(io, [this, &testName] (net::yield_context yield) { + init_driver(yield); + create_bucket(testName, yield); + put_object(testName, yield); + + RGWEnv rgw_env; + req_info info(env->cct.get(), &rgw_env); + rgw_zone_id zone; + rgw_placement_rule placement; + ceph::real_time mtime; + + buffer::list bl; + bl.append("dest_object_version\0", 20); + rgw::sal::Attrs attrs{{RGW_ATTR_ID_TAG, std::move(bl)}}; + + std::string tag; + + std::string destName = "dest_object"; + std::unique_ptr destObj = testBucket->get_object(rgw_obj_key(destName)); + EXPECT_NE(destObj.get(), nullptr); + + int ret = obj->copy_object(acl_owner, + std::get(owner), + &info, + zone, + destObj.get(), + testBucket.get(), + testBucket.get(), + placement, + &mtime, + &mtime, + nullptr, + nullptr, + false, + nullptr, + nullptr, + rgw::sal::ATTRSMOD_NONE, + false, + attrs, + RGWObjCategory::Main, + 0, + boost::none, + nullptr, + &tag, + &tag, + nullptr, + nullptr, + env->dpp, + optional_yield({yield})); + EXPECT_EQ(ret, 0); + + boost::system::error_code ec; + request req; + req.push("EXISTS", TEST_BUCKET + testName + "_" + destName + "_0_0"); + req.push("EXISTS", TEST_BUCKET + testName + "__:null_" + destName + "_0_0"); + req.push("HGETALL", TEST_BUCKET + testName + "_" + destName + "_0_0"); + req.push("HGETALL", TEST_BUCKET + testName + "__:null_" + destName + "_0_0"); + + response< int, int, + std::map, + std::map > resp; + + conn->async_exec(req, resp, yield[ec]); + + ASSERT_EQ((bool)ec, false); + EXPECT_EQ((int)std::get<0>(resp).value(), 1); + EXPECT_EQ((int)std::get<1>(resp).value(), 1); + EXPECT_EQ(std::get<2>(resp).value().size(), 14); + EXPECT_EQ(std::get<3>(resp).value().size(), 14); + + std::error_code err; + std::string version = "test_version"; // Expected version for copy object + EXPECT_EQ(fs::exists(CACHE_DIR + "/" + TEST_BUCKET + testName + "/" + destName + "/" + version, err), true); + + /* TODO: DBStore has no copy_object implementation, so the below code will fail if uncommented. Once it is implemented, the following + should be uncommented and added to the other copy_object unit tests for the read cache. */ + /* + // Read copy object + Read_CB cb(&bl); + std::unique_ptr read_op(destObj->get_read_op()); + EXPECT_EQ(read_op->prepare(optional_yield{yield}, env->dpp), 0); + EXPECT_EQ(read_op->iterate(env->dpp, 0, ofs, &cb, optional_yield{yield}), 0); + + // Check cache contents for copy object + std::string testData; + std::ifstream testFile; + std::string oid = version + "#0#" + std::to_string(ofs); + EXPECT_EQ(fs::exists(CACHE_DIR + "/" + TEST_BUCKET + testName + "/" + TEST_OBJ + destName + "/" + oid, err), true); + testFile.open(CACHE_DIR + "/" + TEST_BUCKET + testName + "/" + TEST_OBJ + destName + "/" + oid); + ASSERT_EQ(testFile.is_open(), true); + getline(testFile, testData); + ASSERT_EQ(testData.empty(), false); + EXPECT_EQ(testData, "test data"); + + // Ensure data returned from GET op matches copy object's cached data + EXPECT_EQ(testData, cb.save_bl->to_str()); + */ + + conn->cancel(); + testBucket->remove(env->dpp, true, optional_yield{yield}); + DriverDestructor driver_destructor(static_cast(driver)); + }, rethrow); + + io.run(); +} + +TEST_F(D4NFilterFixture, CopyMergeObjectRead) +{ + const std::string testName = "CopyMergeObjectRead"; + + net::spawn(io, [this, &testName] (net::yield_context yield) { + init_driver(yield); + create_bucket(testName, yield); + put_object(testName, yield); + + RGWEnv rgw_env; + req_info info(env->cct.get(), &rgw_env); + rgw_zone_id zone; + rgw_placement_rule placement; + ceph::real_time mtime; + + buffer::list bl; + bl.append("dest_object_version\0", 20); + rgw::sal::Attrs attrs{{RGW_ATTR_ID_TAG, std::move(bl)}}; + + std::string tag; + + std::string destName = "dest_object"; + std::unique_ptr destObj = testBucket->get_object(rgw_obj_key(destName)); + EXPECT_NE(destObj.get(), nullptr); + + int ret = obj->copy_object(acl_owner, + std::get(owner), + &info, + zone, + destObj.get(), + testBucket.get(), + testBucket.get(), + placement, + &mtime, + &mtime, + nullptr, + nullptr, + false, + nullptr, + nullptr, + rgw::sal::ATTRSMOD_MERGE, + false, + attrs, + RGWObjCategory::Main, + 0, + boost::none, + nullptr, + &tag, + &tag, + nullptr, + nullptr, + env->dpp, + optional_yield({yield})); + EXPECT_EQ(ret, 0); + + boost::system::error_code ec; + request req; + req.push("EXISTS", TEST_BUCKET + testName + "_" + destName + "_0_0"); + req.push("EXISTS", TEST_BUCKET + testName + "__:null_" + destName + "_0_0"); + req.push("HGETALL", TEST_BUCKET + testName + "_" + destName + "_0_0"); + req.push("HGETALL", TEST_BUCKET + testName + "__:null_" + destName + "_0_0"); + + response< int, int, + std::map, + std::map > resp; + + conn->async_exec(req, resp, yield[ec]); + + ASSERT_EQ((bool)ec, false); + EXPECT_EQ((int)std::get<0>(resp).value(), 1); + EXPECT_EQ((int)std::get<1>(resp).value(), 1); + EXPECT_EQ(std::get<2>(resp).value().size(), 14); + EXPECT_EQ(std::get<3>(resp).value().size(), 14); + + std::error_code err; + std::string version = "dest_object_version"; // Expected version for copy object + EXPECT_EQ(fs::exists(CACHE_DIR + "/" + TEST_BUCKET + testName + "/" + destName + "/" + version, err), true); + + conn->cancel(); + testBucket->remove(env->dpp, true, optional_yield{yield}); + DriverDestructor driver_destructor(static_cast(driver)); + }, rethrow); + + io.run(); +} + +TEST_F(D4NFilterFixture, CopyReplaceObjectRead) +{ + const std::string testName = "CopyReplaceObjectRead"; + + net::spawn(io, [this, &testName] (net::yield_context yield) { + init_driver(yield); + create_bucket(testName, yield); + put_object(testName, yield); + + RGWEnv rgw_env; + req_info info(env->cct.get(), &rgw_env); + rgw_zone_id zone; + rgw_placement_rule placement; + ceph::real_time mtime; + + buffer::list bl; + bl.append("dest_object_version\0", 20); + rgw::sal::Attrs attrs{{RGW_ATTR_ID_TAG, std::move(bl)}}; + + std::string tag; + + std::string destName = "dest_object"; + std::unique_ptr destObj = testBucket->get_object(rgw_obj_key(destName)); + EXPECT_NE(destObj.get(), nullptr); + + int ret = obj->copy_object(acl_owner, + std::get(owner), + &info, + zone, + destObj.get(), + testBucket.get(), + testBucket.get(), + placement, + &mtime, + &mtime, + nullptr, + nullptr, + false, + nullptr, + nullptr, + rgw::sal::ATTRSMOD_REPLACE, + false, + attrs, + RGWObjCategory::Main, + 0, + boost::none, + nullptr, + &tag, + &tag, + nullptr, + nullptr, + env->dpp, + optional_yield({yield})); + EXPECT_EQ(ret, 0); + + boost::system::error_code ec; + request req; + req.push("EXISTS", TEST_BUCKET + testName + "_" + destName + "_0_0"); + req.push("EXISTS", TEST_BUCKET + testName + "__:null_" + destName + "_0_0"); + req.push("HGETALL", TEST_BUCKET + testName + "_" + destName + "_0_0"); + req.push("HGETALL", TEST_BUCKET + testName + "__:null_" + destName + "_0_0"); + + response< int, int, + std::map, + std::map > resp; + + conn->async_exec(req, resp, yield[ec]); + + ASSERT_EQ((bool)ec, false); + EXPECT_EQ((int)std::get<0>(resp).value(), 1); + EXPECT_EQ((int)std::get<1>(resp).value(), 1); + EXPECT_EQ(std::get<2>(resp).value().size(), 14); + EXPECT_EQ(std::get<3>(resp).value().size(), 14); + + std::error_code err; + std::string version = "dest_object_version"; // Expected version for copy object + EXPECT_EQ(fs::exists(CACHE_DIR + "/" + TEST_BUCKET + testName + "/" + destName + "/" + version, err), true); + + conn->cancel(); + testBucket->remove(env->dpp, true, optional_yield{yield}); + DriverDestructor driver_destructor(static_cast(driver)); + }, rethrow); + + io.run(); +} + +TEST_F(D4NFilterFixture, DeleteObjectRead) +{ + const std::string testName = "DeleteObjectRead"; + + net::spawn(io, [this, &testName] (net::yield_context yield) { + init_driver(yield); + create_bucket(testName, yield); + put_object(testName, yield); + + std::unique_ptr read_op(obj->get_read_op()); + ASSERT_EQ(read_op->prepare(optional_yield{yield}, env->dpp), 0); + ASSERT_EQ(read_op->iterate(env->dpp, 0, ofs, nullptr, optional_yield{yield}), 0); + + std::string version; + { + boost::system::error_code ec; + request req; + req.push("EXISTS", TEST_BUCKET + testName + "_" + TEST_OBJ + testName + "_0_0"); + req.push("EXISTS", TEST_BUCKET + testName + "__:null_" + TEST_OBJ + testName + "_0_0"); + req.push("EXISTS", TEST_BUCKET + testName + "_" + TEST_OBJ + testName + "_0_" + std::to_string(ofs)); + req.push("HGET", TEST_BUCKET + testName + "_" + TEST_OBJ + testName + "_0_0", "version"); + + response< int, int, int, std::string > resp; + + conn->async_exec(req, resp, yield[ec]); + ASSERT_EQ((bool)ec, false); + EXPECT_EQ((int)std::get<0>(resp).value(), 1); + EXPECT_EQ((int)std::get<1>(resp).value(), 1); + EXPECT_EQ((int)std::get<2>(resp).value(), 1); + + version = std::get<3>(resp).value(); + } + + std::error_code err; + EXPECT_EQ(fs::exists(CACHE_DIR + "/" + TEST_BUCKET + testName + "/" + TEST_OBJ + testName + "/" + version, err), true); + std::string oid = version + "#0#" + std::to_string(ofs); + EXPECT_EQ(fs::exists(CACHE_DIR + "/" + TEST_BUCKET + testName + "/" + TEST_OBJ + testName + "/" + oid, err), true); + + std::unique_ptr del_op = obj->get_delete_op(); + EXPECT_EQ(del_op->delete_obj(env->dpp, optional_yield{yield}, rgw::sal::FLAG_LOG_OP), 0); + + { + boost::system::error_code ec; + request req; + req.push("EXISTS", TEST_BUCKET + testName + "_" + TEST_OBJ + testName + "_0_0"); + req.push("EXISTS", TEST_BUCKET + testName + "__:null_" + TEST_OBJ + testName + "_0_0"); + req.push("EXISTS", TEST_BUCKET + testName + "_" + TEST_OBJ + testName + "_0_" + std::to_string(ofs)); + + response< int, int, int > resp; + + conn->async_exec(req, resp, yield[ec]); + ASSERT_EQ((bool)ec, false); + EXPECT_EQ((int)std::get<0>(resp).value(), 0); + EXPECT_EQ((int)std::get<1>(resp).value(), 0); + EXPECT_EQ((int)std::get<2>(resp).value(), 0); + } + + /* TODO: Eviction cycle to delete cache blocks + EXPECT_EQ(fs::exists(CACHE_DIR + "/" + TEST_BUCKET + testName + "/" + TEST_OBJ + testName + "/" + version, err), false); + EXPECT_EQ(fs::exists(CACHE_DIR + "/" + TEST_BUCKET + testName + "/" + TEST_OBJ + testName + "/" + oid, err), false); + */ + + conn->cancel(); + testBucket->remove(env->dpp, true, optional_yield{yield}); + DriverDestructor driver_destructor(static_cast(driver)); + }, rethrow); + + io.run(); +} + +// Read cache tests, versioned +TEST_F(D4NFilterFixture, PutVersionedObjectRead) +{ + const std::string testName = "PutVersionedObjectRead"; + + net::spawn(io, [this, &testName] (net::yield_context yield) { + init_driver(yield); + create_bucket(testName, yield); + std::string instance; + put_version_enabled_object(testName, instance, yield); + put_version_suspended_object(testName, yield); + + boost::system::error_code ec; + request req; + req.push("EXISTS", TEST_BUCKET + testName + "__:" + instance + "_" + TEST_OBJ + testName + "_0_0"); + req.push("EXISTS", TEST_BUCKET + testName + "__:null_" + TEST_OBJ + testName + "_0_0"); + req.push("HGETALL", TEST_BUCKET + testName + "__:" + instance + "_" + TEST_OBJ + testName + "_0_0"); + req.push("HGETALL", TEST_BUCKET + testName + "__:null_" + TEST_OBJ + testName + "_0_0"); + + response< int, int, + std::map, + std::map > resp; + + conn->async_exec(req, resp, yield[ec]); + + ASSERT_EQ((bool)ec, false); + EXPECT_EQ((int)std::get<0>(resp).value(), 1); + EXPECT_EQ((int)std::get<1>(resp).value(), 1); + EXPECT_EQ(std::get<2>(resp).value().size(), 14); + EXPECT_EQ(std::get<3>(resp).value().size(), 14); + + conn->cancel(); + testBucket->remove(env->dpp, true, optional_yield{yield}); + DriverDestructor driver_destructor(static_cast(driver)); + }, rethrow); + + io.run(); +} + +TEST_F(D4NFilterFixture, GetVersionedObjectRead) +{ + const std::string testName = "GetVersionedObjectRead"; + + net::spawn(io, [this, &testName] (net::yield_context yield) { + init_driver(yield); + create_bucket(testName, yield); + std::string instance; + put_version_enabled_object(testName, instance, yield); + put_version_suspended_object(testName, yield); + + // For version enabled object + bufferlist blEnabled; + Read_CB cbEnabled(&blEnabled); + std::unique_ptr read_op_enabled(objEnabled->get_read_op()); + EXPECT_EQ(read_op_enabled->prepare(optional_yield{yield}, env->dpp), 0); + EXPECT_EQ(read_op_enabled->iterate(env->dpp, 0, ofs, &cbEnabled, optional_yield{yield}), 0); + + // For version suspended object + bufferlist blSuspended; + Read_CB cbSuspended(&blSuspended); + std::unique_ptr read_op_suspended(objSuspended->get_read_op()); + EXPECT_EQ(read_op_suspended->prepare(optional_yield{yield}, env->dpp), 0); + EXPECT_EQ(read_op_suspended->iterate(env->dpp, 0, ofs, &cbSuspended, optional_yield{yield}), 0); + + boost::system::error_code ec; + request req; + req.push("EXISTS", TEST_BUCKET + testName + "__:" + instance + "_" + TEST_OBJ + testName + "_0_0"); + req.push("EXISTS", TEST_BUCKET + testName + "__:" + instance + "_" + TEST_OBJ + testName + "_0_" + std::to_string(ofs)); + req.push("EXISTS", TEST_BUCKET + testName + "__:null_" + TEST_OBJ + testName + "_0_0"); + req.push("HGETALL", TEST_BUCKET + testName + "_" + TEST_OBJ + testName + "_0_" + std::to_string(ofs)); + req.push("HGET", TEST_BUCKET + testName + "__:null_" + TEST_OBJ + testName + "_0_0", "version"); + + response< int, int, int, + std::map, + std::string > resp; + + conn->async_exec(req, resp, yield[ec]); + + ASSERT_EQ((bool)ec, false); + EXPECT_EQ((int)std::get<0>(resp).value(), 1); + EXPECT_EQ((int)std::get<1>(resp).value(), 1); + EXPECT_EQ((int)std::get<2>(resp).value(), 1); + EXPECT_EQ(std::get<3>(resp).value().size(), 14); + + std::string version = std::get<4>(resp).value(); + std::error_code err; + std::string testData; + std::ifstream testFile; + + EXPECT_EQ(fs::exists(CACHE_DIR + "/" + TEST_BUCKET + testName + "/" + TEST_OBJ + testName + "/" + instance, err), true); + std::string oid = instance + "#0#" + std::to_string(ofs); + EXPECT_EQ(fs::exists(CACHE_DIR + "/" + TEST_BUCKET + testName + "/" + TEST_OBJ + testName + "/" + oid, err), true); + testFile.open(CACHE_DIR + "/" + TEST_BUCKET + testName + "/" + TEST_OBJ + testName + "/" + oid); + ASSERT_EQ(testFile.is_open(), true); + getline(testFile, testData); + ASSERT_EQ(testData.empty(), false); + EXPECT_EQ(testData, "test data"); + + // Ensure data returned from GET op matches cached data + EXPECT_EQ(testData, cbEnabled.save_bl->to_str()); + + testFile.close(); + + EXPECT_EQ(fs::exists(CACHE_DIR + "/" + TEST_BUCKET + testName + "/" + TEST_OBJ + testName + "/" + version, err), true); + oid = version + "#0#" + std::to_string(ofs); + EXPECT_EQ(fs::exists(CACHE_DIR + "/" + TEST_BUCKET + testName + "/" + TEST_OBJ + testName + "/" + oid, err), true); + testFile.open(CACHE_DIR + "/" + TEST_BUCKET + testName + "/" + TEST_OBJ + testName + "/" + oid); + ASSERT_EQ(testFile.is_open(), true); + getline(testFile, testData); + ASSERT_EQ(testData.empty(), false); + EXPECT_EQ(testData, "test data"); + + // Ensure data returned from GET op matches cached data + EXPECT_EQ(testData, cbSuspended.save_bl->to_str()); + + testFile.close(); + conn->cancel(); + testBucket->remove(env->dpp, true, optional_yield{yield}); + DriverDestructor driver_destructor(static_cast(driver)); + }, rethrow); + + io.run(); +} + +TEST_F(D4NFilterFixture, CopyNoneVersionedObjectRead) +{ + const std::string testName = "CopyNoneVersionedObjectRead"; + + net::spawn(io, [this, &testName] (net::yield_context yield) { + init_driver(yield); + create_bucket(testName, yield); + std::string instance; + put_version_enabled_object(testName, instance, yield); + + RGWEnv rgw_env; + req_info info(env->cct.get(), &rgw_env); + rgw_zone_id zone; + rgw_placement_rule placement; + ceph::real_time mtime; + + buffer::list bl; + bl.append("dest_object_version\0", 20); + rgw::sal::Attrs attrs{{RGW_ATTR_ID_TAG, std::move(bl)}}; + + std::string tag; + + { + std::string destNameEnabled = "dest_object_enabled"; + std::unique_ptr destObjEnabled = testBucket->get_object(rgw_obj_key(destNameEnabled)); + EXPECT_NE(destObjEnabled.get(), nullptr); + destObjEnabled->gen_rand_obj_instance_name(); + instance = destObjEnabled->get_instance(); + + int ret = objEnabled->copy_object(acl_owner, + std::get(owner), + &info, + zone, + destObjEnabled.get(), + testBucket.get(), + testBucket.get(), + placement, + &mtime, + &mtime, + nullptr, + nullptr, + false, + nullptr, + nullptr, + rgw::sal::ATTRSMOD_NONE, + false, + attrs, + RGWObjCategory::Main, + 0, + boost::none, + nullptr, + &tag, + &tag, + nullptr, + nullptr, + env->dpp, + optional_yield({yield})); + EXPECT_EQ(ret, 0); + + boost::system::error_code ec; + request req; + req.push("EXISTS", TEST_BUCKET + testName + "__:" + instance + "_" + destNameEnabled + "_0_0"); + req.push("HGETALL", TEST_BUCKET + testName + "__:" + instance + "_" + destNameEnabled + "_0_0"); + + response< int, std::map > resp; + + conn->async_exec(req, resp, yield[ec]); + + ASSERT_EQ((bool)ec, false); + EXPECT_EQ((int)std::get<0>(resp).value(), 1); + EXPECT_EQ(std::get<1>(resp).value().size(), 14); + + std::error_code err; + EXPECT_EQ(fs::exists(CACHE_DIR + "/" + TEST_BUCKET + testName + "/" + destNameEnabled + "/" + instance, err), true); + } + + put_version_suspended_object(testName, yield); + + { + std::string destNameSuspended = "dest_object_suspended"; + std::unique_ptr destObjSuspended = testBucket->get_object(rgw_obj_key(destNameSuspended)); + EXPECT_NE(destObjSuspended.get(), nullptr); + + int ret = objSuspended->copy_object(acl_owner, + std::get(owner), + &info, + zone, + destObjSuspended.get(), + testBucket.get(), + testBucket.get(), + placement, + &mtime, + &mtime, + nullptr, + nullptr, + false, + nullptr, + nullptr, + rgw::sal::ATTRSMOD_NONE, + false, + attrs, + RGWObjCategory::Main, + 0, + boost::none, + nullptr, + &tag, + &tag, + nullptr, + nullptr, + env->dpp, + optional_yield({yield})); + EXPECT_EQ(ret, 0); + + boost::system::error_code ec; + request req; + req.push("EXISTS", TEST_BUCKET + testName + "__:null_" + destNameSuspended + "_0_0"); + req.push("HGETALL", TEST_BUCKET + testName + "__:null_" + destNameSuspended + "_0_0"); + + response< int, std::map > resp; + + conn->async_exec(req, resp, yield[ec]); + + ASSERT_EQ((bool)ec, false); + EXPECT_EQ((int)std::get<0>(resp).value(), 1); + EXPECT_EQ(std::get<1>(resp).value().size(), 14); + + std::error_code err; + EXPECT_EQ(fs::exists(CACHE_DIR + "/" + TEST_BUCKET + testName + "/" + destNameSuspended + "/test_version", err), true); + } + + conn->cancel(); + testBucket->remove(env->dpp, true, optional_yield{yield}); + DriverDestructor driver_destructor(static_cast(driver)); + }, rethrow); + + io.run(); +} + +TEST_F(D4NFilterFixture, CopyMergeVersionedObjectRead) +{ + const std::string testName = "CopyMergeVersionedObjectRead"; + + net::spawn(io, [this, &testName] (net::yield_context yield) { + init_driver(yield); + create_bucket(testName, yield); + std::string instance; + put_version_enabled_object(testName, instance, yield); + + RGWEnv rgw_env; + req_info info(env->cct.get(), &rgw_env); + rgw_zone_id zone; + rgw_placement_rule placement; + ceph::real_time mtime; + + buffer::list bl; + bl.append("dest_object_version\0", 20); + rgw::sal::Attrs attrs{{RGW_ATTR_ID_TAG, std::move(bl)}}; + + std::string tag; + + { + std::string destNameEnabled = "dest_object_enabled"; + std::unique_ptr destObjEnabled = testBucket->get_object(rgw_obj_key(destNameEnabled)); + EXPECT_NE(destObjEnabled.get(), nullptr); + destObjEnabled->gen_rand_obj_instance_name(); + instance = destObjEnabled->get_instance(); + + int ret = objEnabled->copy_object(acl_owner, + std::get(owner), + &info, + zone, + destObjEnabled.get(), + testBucket.get(), + testBucket.get(), + placement, + &mtime, + &mtime, + nullptr, + nullptr, + false, + nullptr, + nullptr, + rgw::sal::ATTRSMOD_MERGE, + false, + attrs, + RGWObjCategory::Main, + 0, + boost::none, + nullptr, + &tag, + &tag, + nullptr, + nullptr, + env->dpp, + optional_yield({yield})); + EXPECT_EQ(ret, 0); + + boost::system::error_code ec; + request req; + req.push("EXISTS", TEST_BUCKET + testName + "__:" + instance + "_" + destNameEnabled + "_0_0"); + req.push("HGETALL", TEST_BUCKET + testName + "__:" + instance + "_" + destNameEnabled + "_0_0"); + + response< int, std::map > resp; + + conn->async_exec(req, resp, yield[ec]); + + ASSERT_EQ((bool)ec, false); + EXPECT_EQ((int)std::get<0>(resp).value(), 1); + EXPECT_EQ(std::get<1>(resp).value().size(), 14); + + std::error_code err; + EXPECT_EQ(fs::exists(CACHE_DIR + "/" + TEST_BUCKET + testName + "/" + destNameEnabled + "/" + instance, err), true); + } + + put_version_suspended_object(testName, yield); + + { + std::string destNameSuspended = "dest_object_suspended"; + std::unique_ptr destObjSuspended = testBucket->get_object(rgw_obj_key(destNameSuspended)); + EXPECT_NE(destObjSuspended.get(), nullptr); + + int ret = objEnabled->copy_object(acl_owner, + std::get(owner), + &info, + zone, + destObjSuspended.get(), + testBucket.get(), + testBucket.get(), + placement, + &mtime, + &mtime, + nullptr, + nullptr, + false, + nullptr, + nullptr, + rgw::sal::ATTRSMOD_MERGE, + false, + attrs, + RGWObjCategory::Main, + 0, + boost::none, + nullptr, + &tag, + &tag, + nullptr, + nullptr, + env->dpp, + optional_yield({yield})); + EXPECT_EQ(ret, 0); + + boost::system::error_code ec; + request req; + req.push("EXISTS", TEST_BUCKET + testName + "__:null_" + destNameSuspended + "_0_0"); + req.push("HGETALL", TEST_BUCKET + testName + "__:null_" + destNameSuspended + "_0_0"); + + response< int, std::map > resp; + + conn->async_exec(req, resp, yield[ec]); + + ASSERT_EQ((bool)ec, false); + EXPECT_EQ((int)std::get<0>(resp).value(), 1); + EXPECT_EQ(std::get<1>(resp).value().size(), 14); + + std::error_code err; + EXPECT_EQ(fs::exists(CACHE_DIR + "/" + TEST_BUCKET + testName + "/" + destNameSuspended + "/dest_object_version", err), true); + } + + conn->cancel(); + testBucket->remove(env->dpp, true, optional_yield{yield}); + DriverDestructor driver_destructor(static_cast(driver)); + }, rethrow); + + io.run(); +} + +TEST_F(D4NFilterFixture, CopyReplaceVersionedObjectRead) +{ + const std::string testName = "CopyReplaceVersionedObjectRead"; + + net::spawn(io, [this, &testName] (net::yield_context yield) { + init_driver(yield); + create_bucket(testName, yield); + std::string instance; + put_version_enabled_object(testName, instance, yield); + + RGWEnv rgw_env; + req_info info(env->cct.get(), &rgw_env); + rgw_zone_id zone; + rgw_placement_rule placement; + ceph::real_time mtime; + + buffer::list bl; + bl.append("dest_object_version\0", 20); + rgw::sal::Attrs attrs{{RGW_ATTR_ID_TAG, std::move(bl)}}; + + std::string tag; + + { + std::string destNameEnabled = "dest_object_enabled"; + std::unique_ptr destObjEnabled = testBucket->get_object(rgw_obj_key(destNameEnabled)); + EXPECT_NE(destObjEnabled.get(), nullptr); + destObjEnabled->gen_rand_obj_instance_name(); + instance = destObjEnabled->get_instance(); + + int ret = objEnabled->copy_object(acl_owner, + std::get(owner), + &info, + zone, + destObjEnabled.get(), + testBucket.get(), + testBucket.get(), + placement, + &mtime, + &mtime, + nullptr, + nullptr, + false, + nullptr, + nullptr, + rgw::sal::ATTRSMOD_REPLACE, + false, + attrs, + RGWObjCategory::Main, + 0, + boost::none, + nullptr, + &tag, + &tag, + nullptr, + nullptr, + env->dpp, + optional_yield({yield})); + EXPECT_EQ(ret, 0); + + boost::system::error_code ec; + request req; + req.push("EXISTS", TEST_BUCKET + testName + "__:" + instance + "_" + destNameEnabled + "_0_0"); + req.push("HGETALL", TEST_BUCKET + testName + "__:" + instance + "_" + destNameEnabled + "_0_0"); + + response< int, std::map > resp; + + conn->async_exec(req, resp, yield[ec]); + + ASSERT_EQ((bool)ec, false); + EXPECT_EQ((int)std::get<0>(resp).value(), 1); + EXPECT_EQ(std::get<1>(resp).value().size(), 14); + + std::error_code err; + EXPECT_EQ(fs::exists(CACHE_DIR + "/" + TEST_BUCKET + testName + "/" + destNameEnabled + "/" + instance, err), true); + } + + put_version_suspended_object(testName, yield); + + { + std::string destNameSuspended = "dest_object_suspended"; + std::unique_ptr destObjSuspended = testBucket->get_object(rgw_obj_key(destNameSuspended)); + EXPECT_NE(destObjSuspended.get(), nullptr); + + int ret = objEnabled->copy_object(acl_owner, + std::get(owner), + &info, + zone, + destObjSuspended.get(), + testBucket.get(), + testBucket.get(), + placement, + &mtime, + &mtime, + nullptr, + nullptr, + false, + nullptr, + nullptr, + rgw::sal::ATTRSMOD_REPLACE, + false, + attrs, + RGWObjCategory::Main, + 0, + boost::none, + nullptr, + &tag, + &tag, + nullptr, + nullptr, + env->dpp, + optional_yield({yield})); + EXPECT_EQ(ret, 0); + + boost::system::error_code ec; + request req; + req.push("EXISTS", TEST_BUCKET + testName + "__:null_" + destNameSuspended + "_0_0"); + req.push("HGETALL", TEST_BUCKET + testName + "__:null_" + destNameSuspended + "_0_0"); + + response< int, std::map > resp; + + conn->async_exec(req, resp, yield[ec]); + + ASSERT_EQ((bool)ec, false); + EXPECT_EQ((int)std::get<0>(resp).value(), 1); + EXPECT_EQ(std::get<1>(resp).value().size(), 14); + + std::error_code err; + EXPECT_EQ(fs::exists(CACHE_DIR + "/" + TEST_BUCKET + testName + "/" + destNameSuspended + "/dest_object_version", err), true); + } + + conn->cancel(); + testBucket->remove(env->dpp, true, optional_yield{yield}); + DriverDestructor driver_destructor(static_cast(driver)); + }, rethrow); + + io.run(); +} + +TEST_F(D4NFilterFixture, DeleteVersionedObjectRead) +{ + const std::string testName = "DeleteVersionedObjectRead"; + + net::spawn(io, [this, &testName] (net::yield_context yield) { + init_driver(yield); + create_bucket(testName, yield); + std::string instance; + put_version_enabled_object(testName, instance, yield); + put_version_suspended_object(testName, yield); + + std::unique_ptr read_op_enabled(objEnabled->get_read_op()); + EXPECT_EQ(read_op_enabled->prepare(optional_yield{yield}, env->dpp), 0); + EXPECT_EQ(read_op_enabled->iterate(env->dpp, 0, ofs, nullptr, optional_yield{yield}), 0); + + std::unique_ptr read_op_suspended(objSuspended->get_read_op()); + EXPECT_EQ(read_op_suspended->prepare(optional_yield{yield}, env->dpp), 0); + EXPECT_EQ(read_op_suspended->iterate(env->dpp, 0, ofs, nullptr, optional_yield{yield}), 0); + std::string version; + { + boost::system::error_code ec; + request req; + req.push("EXISTS", TEST_BUCKET + testName + "__:" + instance + "_" + TEST_OBJ + testName + "_0_0"); + req.push("EXISTS", TEST_BUCKET + testName + "__:" + instance + "_" + TEST_OBJ + testName + "_0_" + std::to_string(ofs)); + req.push("EXISTS", TEST_BUCKET + testName + "__:null_" + TEST_OBJ + testName + "_0_0"); + req.push("HGET", TEST_BUCKET + testName + "__:null_" + TEST_OBJ + testName + "_0_0", "version"); + + response< int, int, int, std::string > resp; + + conn->async_exec(req, resp, yield[ec]); + ASSERT_EQ((bool)ec, false); + EXPECT_EQ((int)std::get<0>(resp).value(), 1); + EXPECT_EQ((int)std::get<1>(resp).value(), 1); + EXPECT_EQ((int)std::get<2>(resp).value(), 1); + + version = std::get<3>(resp).value(); + } + + std::error_code err; + EXPECT_EQ(fs::exists(CACHE_DIR + "/" + TEST_BUCKET + testName + "/" + TEST_OBJ + testName + "/" + version, err), true); + EXPECT_EQ(fs::exists(CACHE_DIR + "/" + TEST_BUCKET + testName + "/" + TEST_OBJ + testName + "/" + instance, err), true); + std::string oid = version + "#0#" + std::to_string(ofs); + EXPECT_EQ(fs::exists(CACHE_DIR + "/" + TEST_BUCKET + testName + "/" + TEST_OBJ + testName + "/" + oid, err), true); + oid = instance + "#0#" + std::to_string(ofs); + EXPECT_EQ(fs::exists(CACHE_DIR + "/" + TEST_BUCKET + testName + "/" + TEST_OBJ + testName + "/" + oid, err), true); + + // For version enabled object + std::unique_ptr del_op_enabled = objEnabled->get_delete_op(); + EXPECT_EQ(del_op_enabled->delete_obj(env->dpp, optional_yield{yield}, rgw::sal::FLAG_LOG_OP), 0); + + // For version suspended object + std::unique_ptr del_op_suspended = objSuspended->get_delete_op(); + EXPECT_EQ(del_op_suspended->delete_obj(env->dpp, optional_yield{yield}, rgw::sal::FLAG_LOG_OP), 0); + + { + boost::system::error_code ec; + request req; + req.push("EXISTS", TEST_BUCKET + testName + "_" + TEST_OBJ + testName + "_0_0"); + req.push("EXISTS", TEST_BUCKET + testName + "_" + TEST_OBJ + testName + "_0_" + std::to_string(ofs)); + req.push("EXISTS", TEST_BUCKET + testName + "__:" + instance + "_" + TEST_OBJ + testName + "_0_0"); + req.push("EXISTS", TEST_BUCKET + testName + "__:" + instance + "_" + TEST_OBJ + testName + "_0_" + std::to_string(ofs)); + req.push("EXISTS", TEST_BUCKET + testName + "__:null_" + TEST_OBJ + testName + "_0_0"); + + response< int, int, int, int, int > resp; + + conn->async_exec(req, resp, yield[ec]); + ASSERT_EQ((bool)ec, false); + EXPECT_EQ((int)std::get<0>(resp).value(), 0); + EXPECT_EQ((int)std::get<1>(resp).value(), 0); + EXPECT_EQ((int)std::get<2>(resp).value(), 0); + EXPECT_EQ((int)std::get<3>(resp).value(), 0); + EXPECT_EQ((int)std::get<4>(resp).value(), 0); + } + + /* TODO: Eviction cycle to delete cache blocks + EXPECT_EQ(fs::exists(CACHE_DIR + "/" + TEST_BUCKET + testName + "/" + TEST_OBJ + testName + "/" + instance, err), false); + EXPECT_EQ(fs::exists(CACHE_DIR + "/" + TEST_BUCKET + testName + "/" + TEST_OBJ + testName + "/" + oid, err), false); + EXPECT_EQ(fs::exists(CACHE_DIR + "/" + TEST_BUCKET + testName + "/" + TEST_OBJ + testName + "/" + version, err), false); + oid = version + "#0#" + std::to_string(ofs); + EXPECT_EQ(fs::exists(CACHE_DIR + "/" + TEST_BUCKET + testName + "/" + TEST_OBJ + testName + "/" + oid, err), false); + */ + + conn->cancel(); + testBucket->remove(env->dpp, true, optional_yield{yield}); + DriverDestructor driver_destructor(static_cast(driver)); + }, rethrow); + + io.run(); +} + +// Write cache tests, unversioned +TEST_F(D4NFilterFixture, PutObjectWrite) +{ + env->cct->_conf->d4n_writecache_enabled = true; + env->cct->_conf->rgw_d4n_cache_cleaning_interval = 0.5; + const std::string testName = "PutObjectWrite"; + const std::string bucketName = "/tmp/d4n_filter_tests/dbstore-default_ns.1"; + std::string version; + + net::spawn(io, [this, &testName, &bucketName, &version] (net::yield_context yield) { + init_driver(yield); + create_bucket(testName, yield); + // Overwrite testName because the cleaning method derives the bucket name differently from other ops. + testBucket->get_info().bucket.bucket_id = bucketName; + put_object(testName, yield); + + { + boost::system::error_code ec; + request req; + req.push("EXISTS", bucketName + "_" + TEST_OBJ + testName); // obj dir entry + req.push("EXISTS", bucketName + "_" + TEST_OBJ + testName + "_0_0"); + req.push("EXISTS", bucketName + "__:null_" + TEST_OBJ + testName + "_0_0"); + req.push("EXISTS", bucketName + "_" + TEST_OBJ + testName + "_0_" + std::to_string(ofs)); + req.push("ZREVRANGE", bucketName + "_" + TEST_OBJ + testName, "0", "-1"); + req.push("HGETALL", bucketName + "_" + TEST_OBJ + testName + "_0_0"); + req.push("HGETALL", bucketName + "__:null_" + TEST_OBJ + testName + "_0_0"); + req.push("HGETALL", bucketName + "_" + TEST_OBJ + testName + "_0_" + std::to_string(ofs)); + req.push("HGET", bucketName + "_" + TEST_OBJ + testName + "_0_0", "dirty"); + req.push("HGET", bucketName + "_" + TEST_OBJ + testName + "_0_0", "version"); + + response< int, int, int, int, + std::vector, + std::map, + std::map, + std::map, + std::string, std::string > resp; + + conn->async_exec(req, resp, yield[ec]); + + ASSERT_EQ((bool)ec, false); + EXPECT_EQ((int)std::get<0>(resp).value(), 1); + EXPECT_EQ((int)std::get<1>(resp).value(), 1); + EXPECT_EQ((int)std::get<2>(resp).value(), 1); + EXPECT_EQ((int)std::get<3>(resp).value(), 1); + EXPECT_EQ(std::get<4>(resp).value()[0], "null"); + EXPECT_EQ(std::get<5>(resp).value().size(), 14); + EXPECT_EQ(std::get<6>(resp).value().size(), 14); + EXPECT_EQ(std::get<7>(resp).value().size(), 14); + EXPECT_EQ(std::get<8>(resp).value(), "1"); + + version = std::get<9>(resp).value(); + } + + std::error_code err; + std::string testData; + std::ifstream testFile; + + EXPECT_EQ(fs::exists(CACHE_DIR + "/" + url_encode(bucketName, true) + "/" + TEST_OBJ + testName + "/" + version, err), true); + std::string oid = version + "#0#" + std::to_string(ofs); + std::string location = CACHE_DIR + "/" + url_encode(bucketName, true) + "/" + TEST_OBJ + testName + "/" + oid; + EXPECT_EQ(fs::exists(location, err), true); + + std::string attr_val; + EXPECT_EQ(d4nFilter->get_cache_driver()->get_attr(env->dpp, location, RGW_CACHE_ATTR_DIRTY, attr_val, optional_yield({yield})), 0); + EXPECT_EQ(attr_val, "1"); + + testFile.open(CACHE_DIR + "/" + url_encode(bucketName, true) + "/" + TEST_OBJ + testName + "/" + oid); + ASSERT_EQ(testFile.is_open(), true); + getline(testFile, testData); + ASSERT_EQ(testData.empty(), false); + EXPECT_EQ(testData, "test data"); + testFile.close(); + + // The cleaning method must use a yield that is not out of scope, so set back to null_yield before exiting this coroutine + dynamic_cast(d4nFilter->get_policy_driver()->get_cache_policy())->save_y(null_yield); + }, rethrow); + + io.run_for(std::chrono::seconds(2)); // Allow cleaning cycle to complete + + net::spawn(io, [this, &testName, &bucketName, &version] (net::yield_context yield) { + dynamic_cast(d4nFilter->get_policy_driver()->get_cache_policy())->save_y(optional_yield{yield}); + + { + boost::system::error_code ec; + request req; + req.push("HGET", bucketName + "_" + TEST_OBJ + testName + "_0_0", "dirty"); + + response< std::string > resp; + + conn->async_exec(req, resp, yield[ec]); + + ASSERT_EQ((bool)ec, false); + EXPECT_EQ(std::get<0>(resp).value(), "0"); + } + + // Ensure object is written to backend + auto next = dynamic_cast(obj.get())->get_next(); + EXPECT_EQ(next->load_obj_state(env->dpp, optional_yield{yield}), 0); + EXPECT_EQ(next->exists(), 1); + + std::string oid = version + "#0#" + std::to_string(ofs); + std::string location = CACHE_DIR + "/" + url_encode(bucketName, true) + "/" + TEST_OBJ + testName + "/" + oid; + std::string attr_val; + EXPECT_EQ(d4nFilter->get_cache_driver()->get_attr(env->dpp, location, RGW_CACHE_ATTR_DIRTY, attr_val, optional_yield({yield})), 0); + EXPECT_EQ(attr_val, "0"); + + conn->cancel(); + testBucket->remove(env->dpp, true, optional_yield{yield}); + DriverDestructor driver_destructor(static_cast(driver)); + }, rethrow); + + io.run(); +} + +TEST_F(D4NFilterFixture, GetObjectWrite) +{ + env->cct->_conf->d4n_writecache_enabled = true; + const std::string testName = "GetObjectWrite"; + + net::spawn(io, [this, &testName] (net::yield_context yield) { + init_driver(yield); + create_bucket(testName, yield); + put_object(testName, yield); + + bufferlist bl; + Read_CB cb(&bl); + std::unique_ptr read_op(obj->get_read_op()); + EXPECT_EQ(read_op->prepare(optional_yield{yield}, env->dpp), 0); + EXPECT_EQ(read_op->iterate(env->dpp, 0, ofs, &cb, optional_yield{yield}), 0); + + boost::system::error_code ec; + request req; + req.push("EXISTS", TEST_BUCKET + testName + "_" + TEST_OBJ + testName); + req.push("EXISTS", TEST_BUCKET + testName + "_" + TEST_OBJ + testName + "_0_0"); + req.push("EXISTS", TEST_BUCKET + testName + "__:null_" + TEST_OBJ + testName + "_0_0"); + req.push("EXISTS", TEST_BUCKET + testName + "_" + TEST_OBJ + testName + "_0_" + std::to_string(ofs)); + req.push("ZREVRANGE", TEST_BUCKET + testName + "_" + TEST_OBJ + testName, "0", "-1"); + req.push("HGETALL", TEST_BUCKET + testName + "_" + TEST_OBJ + testName + "_0_0"); + req.push("HGETALL", TEST_BUCKET + testName + "__:null_" + TEST_OBJ + testName + "_0_0"); + req.push("HGETALL", TEST_BUCKET + testName + "_" + TEST_OBJ + testName + "_0_" + std::to_string(ofs)); + req.push("HGET", TEST_BUCKET + testName + "_" + TEST_OBJ + testName + "_0_0", "version"); + + response< int, int, int, int, + std::vector, + std::map, + std::map, + std::map, + std::string > resp; + + conn->async_exec(req, resp, yield[ec]); + + ASSERT_EQ((bool)ec, false); + EXPECT_EQ((int)std::get<0>(resp).value(), 1); + EXPECT_EQ((int)std::get<1>(resp).value(), 1); + EXPECT_EQ((int)std::get<2>(resp).value(), 1); + EXPECT_EQ((int)std::get<3>(resp).value(), 1); + EXPECT_EQ(std::get<4>(resp).value()[0], "null"); + EXPECT_EQ(std::get<5>(resp).value().size(), 14); + EXPECT_EQ(std::get<6>(resp).value().size(), 14); + EXPECT_EQ(std::get<7>(resp).value().size(), 14); + + std::string version = std::get<8>(resp).value(); + std::error_code err; + std::string testData; + std::ifstream testFile; + + EXPECT_EQ(fs::exists(CACHE_DIR + "/" + TEST_BUCKET + testName + "/" + TEST_OBJ + testName + "/" + version, err), true); + std::string oid = version + "#0#" + std::to_string(ofs); + EXPECT_EQ(fs::exists(CACHE_DIR + "/" + TEST_BUCKET + testName + "/" + TEST_OBJ + testName + "/" + oid, err), true); + testFile.open(CACHE_DIR + "/" + TEST_BUCKET + testName + "/" + TEST_OBJ + testName + "/" + oid); + ASSERT_EQ(testFile.is_open(), true); + getline(testFile, testData); + EXPECT_EQ(testData, "test data"); + + // Ensure data returned from GET op matches cached data + EXPECT_EQ(testData, cb.save_bl->c_str()); + + testFile.close(); + conn->cancel(); + testBucket->remove(env->dpp, true, optional_yield{yield}); + DriverDestructor driver_destructor(static_cast(driver)); + }, rethrow); + + io.run(); +} + +TEST_F(D4NFilterFixture, CopyNoneObjectWrite) +{ + env->cct->_conf->d4n_writecache_enabled = true; + const std::string testName = "CopyNoneObjectWrite"; + + net::spawn(io, [this, &testName] (net::yield_context yield) { + init_driver(yield); + create_bucket(testName, yield); + put_object(testName, yield); + + RGWEnv rgw_env; + req_info info(env->cct.get(), &rgw_env); + rgw_zone_id zone; + rgw_placement_rule placement; + ceph::real_time mtime; + + buffer::list bl; + { + bl.append("test_value", 10); + rgw::sal::Attrs attrs{{"user.rgw.test_attr", std::move(bl)}}; + obj->set_obj_attrs(env->dpp, &attrs, nullptr, optional_yield{yield}, rgw::sal::FLAG_LOG_OP); + } + bl.append("copy_value", 10); + rgw::sal::Attrs attrs{{"user.rgw.test_attr", std::move(bl)}}; + + std::string tag; + + std::string destName = "dest_object"; + std::unique_ptr destObj = testBucket->get_object(rgw_obj_key(destName)); + EXPECT_NE(destObj.get(), nullptr); + + int ret = obj->copy_object(acl_owner, + std::get(owner), + &info, + zone, + destObj.get(), + testBucket.get(), + testBucket.get(), + placement, + &mtime, + &mtime, + nullptr, + nullptr, + false, + nullptr, + nullptr, + rgw::sal::ATTRSMOD_NONE, + false, + attrs, + RGWObjCategory::Main, + 0, + boost::none, + nullptr, + &tag, + &tag, + nullptr, + nullptr, + env->dpp, + optional_yield({yield})); + EXPECT_EQ(ret, 0); + + boost::system::error_code ec; + request req; + req.push("EXISTS", TEST_BUCKET + testName + "_" + destName); + req.push("EXISTS", TEST_BUCKET + testName + "_" + destName + "_0_0"); + req.push("EXISTS", TEST_BUCKET + testName + "__:null_" + destName + "_0_0"); + req.push("EXISTS", TEST_BUCKET + testName + "_" + destName + "_0_" + std::to_string(ofs)); + req.push("ZREVRANGE", TEST_BUCKET + testName + "_" + destName, "0", "-1"); + req.push("HGETALL", TEST_BUCKET + testName + "_" + destName + "_0_0"); + req.push("HGETALL", TEST_BUCKET + testName + "__:null_" + destName + "_0_0"); + req.push("HGETALL", TEST_BUCKET + testName + "_" + destName + "_0_" + std::to_string(ofs)); + req.push("HGET", TEST_BUCKET + testName + "_" + destName + "_0_0", "version"); + + response< int, int, int, int, + std::vector, + std::map, + std::map, + std::map, + std::string > resp; + + conn->async_exec(req, resp, yield[ec]); + + ASSERT_EQ((bool)ec, false); + EXPECT_EQ((int)std::get<0>(resp).value(), 1); + EXPECT_EQ((int)std::get<1>(resp).value(), 1); + EXPECT_EQ((int)std::get<2>(resp).value(), 1); + EXPECT_EQ((int)std::get<3>(resp).value(), 1); + EXPECT_EQ(std::get<4>(resp).value()[0], "null"); + EXPECT_EQ(std::get<5>(resp).value().size(), 14); + EXPECT_EQ(std::get<6>(resp).value().size(), 14); + EXPECT_EQ(std::get<7>(resp).value().size(), 14); + + std::string version = std::get<8>(resp).value(); + std::error_code err; + std::string testData; + std::ifstream testFile; + + EXPECT_EQ(fs::exists(CACHE_DIR + "/" + TEST_BUCKET + testName + "/" + destName + "/" + version, err), true); + std::string oid = version + "#0#" + std::to_string(ofs); + EXPECT_EQ(fs::exists(CACHE_DIR + "/" + TEST_BUCKET + testName + "/" + destName + "/" + oid, err), true); + testFile.open(CACHE_DIR + "/" + TEST_BUCKET + testName + "/" + destName + "/" + oid); + ASSERT_EQ(testFile.is_open(), true); + getline(testFile, testData); + EXPECT_EQ(testData, "test data"); + + // Ensure attr is not modified + rgw_obj copyObj = destObj->get_obj(); + ASSERT_EQ(destObj->get_obj_attrs(optional_yield{yield}, env->dpp, ©Obj), 0); + rgw::sal::Attrs copyAttrs = destObj->get_attrs(); + buffer::list val = copyAttrs["user.rgw.test_attr"]; + EXPECT_EQ(val.to_str(), "test_value"); + + testFile.close(); + conn->cancel(); + testBucket->remove(env->dpp, true, optional_yield{yield}); + DriverDestructor driver_destructor(static_cast(driver)); + }, rethrow); + + io.run(); +} + +TEST_F(D4NFilterFixture, CopyMergeObjectWrite) +{ + env->cct->_conf->d4n_writecache_enabled = true; + const std::string testName = "CopyMergeObjectWrite"; + + net::spawn(io, [this, &testName] (net::yield_context yield) { + init_driver(yield); + create_bucket(testName, yield); + put_object(testName, yield); + + RGWEnv rgw_env; + req_info info(env->cct.get(), &rgw_env); + rgw_zone_id zone; + rgw_placement_rule placement; + ceph::real_time mtime; + + buffer::list bl; + { + bl.append("test_value", 10); + rgw::sal::Attrs attrs{{"user.rgw.test_attr", std::move(bl)}}; + obj->set_obj_attrs(env->dpp, &attrs, nullptr, optional_yield{yield}, rgw::sal::FLAG_LOG_OP); + } + bl.append("copy_value", 10); + rgw::sal::Attrs attrs{{"user.rgw.test_attr", std::move(bl)}}; + + std::string tag; + + std::string destName = "dest_object"; + std::unique_ptr destObj = testBucket->get_object(rgw_obj_key(destName)); + EXPECT_NE(destObj.get(), nullptr); + + int ret = obj->copy_object(acl_owner, + std::get(owner), + &info, + zone, + destObj.get(), + testBucket.get(), + testBucket.get(), + placement, + &mtime, + &mtime, + nullptr, + nullptr, + false, + nullptr, + nullptr, + rgw::sal::ATTRSMOD_MERGE, + false, + attrs, + RGWObjCategory::Main, + 0, + boost::none, + nullptr, + &tag, + &tag, + nullptr, + nullptr, + env->dpp, + optional_yield({yield})); + EXPECT_EQ(ret, 0); + + boost::system::error_code ec; + request req; + req.push("EXISTS", TEST_BUCKET + testName + "_" + destName); + req.push("EXISTS", TEST_BUCKET + testName + "_" + destName + "_0_0"); + req.push("EXISTS", TEST_BUCKET + testName + "__:null_" + destName + "_0_0"); + req.push("EXISTS", TEST_BUCKET + testName + "_" + destName + "_0_" + std::to_string(ofs)); + req.push("ZREVRANGE", TEST_BUCKET + testName + "_" + destName, "0", "-1"); + req.push("HGETALL", TEST_BUCKET + testName + "_" + destName + "_0_0"); + req.push("HGETALL", TEST_BUCKET + testName + "__:null_" + destName + "_0_0"); + req.push("HGETALL", TEST_BUCKET + testName + "_" + destName + "_0_" + std::to_string(ofs)); + req.push("HGET", TEST_BUCKET + testName + "_" + destName + "_0_0", "version"); + + response< int, int, int, int, + std::vector, + std::map, + std::map, + std::map, + std::string > resp; + + conn->async_exec(req, resp, yield[ec]); + + ASSERT_EQ((bool)ec, false); + EXPECT_EQ((int)std::get<0>(resp).value(), 1); + EXPECT_EQ((int)std::get<1>(resp).value(), 1); + EXPECT_EQ((int)std::get<2>(resp).value(), 1); + EXPECT_EQ((int)std::get<3>(resp).value(), 1); + EXPECT_EQ(std::get<4>(resp).value()[0], "null"); + EXPECT_EQ(std::get<5>(resp).value().size(), 14); + EXPECT_EQ(std::get<6>(resp).value().size(), 14); + EXPECT_EQ(std::get<7>(resp).value().size(), 14); + + std::string version = std::get<8>(resp).value(); + std::error_code err; + std::string testData; + std::ifstream testFile; + + EXPECT_EQ(fs::exists(CACHE_DIR + "/" + TEST_BUCKET + testName + "/" + destName + "/" + version, err), true); + std::string oid = version + "#0#" + std::to_string(ofs); + EXPECT_EQ(fs::exists(CACHE_DIR + "/" + TEST_BUCKET + testName + "/" + destName + "/" + oid, err), true); + testFile.open(CACHE_DIR + "/" + TEST_BUCKET + testName + "/" + destName + "/" + oid); + ASSERT_EQ(testFile.is_open(), true); + getline(testFile, testData); + EXPECT_EQ(testData, "test data"); + + // Ensure attr is merged + rgw_obj copyObj = destObj->get_obj(); + ASSERT_EQ(destObj->get_obj_attrs(optional_yield{yield}, env->dpp, ©Obj), 0); + rgw::sal::Attrs copyAttrs = destObj->get_attrs(); + buffer::list val = copyAttrs["user.rgw.test_attr"]; + EXPECT_EQ(val.to_str(), "copy_value"); + + testFile.close(); + conn->cancel(); + testBucket->remove(env->dpp, true, optional_yield{yield}); + DriverDestructor driver_destructor(static_cast(driver)); + }, rethrow); + + io.run(); +} + +TEST_F(D4NFilterFixture, CopyReplaceObjectWrite) +{ + env->cct->_conf->d4n_writecache_enabled = true; + const std::string testName = "CopyReplaceObjectWrite"; + + net::spawn(io, [this, &testName] (net::yield_context yield) { + init_driver(yield); + create_bucket(testName, yield); + put_object(testName, yield); + + RGWEnv rgw_env; + req_info info(env->cct.get(), &rgw_env); + rgw_zone_id zone; + rgw_placement_rule placement; + ceph::real_time mtime; + + buffer::list bl; + { + bl.append("test_value", 10); + rgw::sal::Attrs attrs{{"user.rgw.test_attr", std::move(bl)}}; + obj->set_obj_attrs(env->dpp, &attrs, nullptr, optional_yield{yield}, rgw::sal::FLAG_LOG_OP); + } + bl.append("copy_value", 10); + rgw::sal::Attrs attrs{{"user.rgw.test_attr", std::move(bl)}}; + + std::string tag; + + std::string destName = "dest_object"; + std::unique_ptr destObj = testBucket->get_object(rgw_obj_key(destName)); + EXPECT_NE(destObj.get(), nullptr); + + int ret = obj->copy_object(acl_owner, + std::get(owner), + &info, + zone, + destObj.get(), + testBucket.get(), + testBucket.get(), + placement, + &mtime, + &mtime, + nullptr, + nullptr, + false, + nullptr, + nullptr, + rgw::sal::ATTRSMOD_REPLACE, + false, + attrs, + RGWObjCategory::Main, + 0, + boost::none, + nullptr, + &tag, + &tag, + nullptr, + nullptr, + env->dpp, + optional_yield({yield})); + EXPECT_EQ(ret, 0); + + boost::system::error_code ec; + request req; + req.push("EXISTS", TEST_BUCKET + testName + "_" + destName); + req.push("EXISTS", TEST_BUCKET + testName + "_" + destName + "_0_0"); + req.push("EXISTS", TEST_BUCKET + testName + "__:null_" + destName + "_0_0"); + req.push("EXISTS", TEST_BUCKET + testName + "_" + destName + "_0_" + std::to_string(ofs)); + req.push("ZREVRANGE", TEST_BUCKET + testName + "_" + destName, "0", "-1"); + req.push("HGETALL", TEST_BUCKET + testName + "_" + destName + "_0_0"); + req.push("HGETALL", TEST_BUCKET + testName + "__:null_" + destName + "_0_0"); + req.push("HGETALL", TEST_BUCKET + testName + "_" + destName + "_0_" + std::to_string(ofs)); + req.push("HGET", TEST_BUCKET + testName + "_" + destName + "_0_0", "version"); + + response< int, int, int, int, + std::vector, + std::map, + std::map, + std::map, + std::string > resp; + + conn->async_exec(req, resp, yield[ec]); + + ASSERT_EQ((bool)ec, false); + EXPECT_EQ((int)std::get<0>(resp).value(), 1); + EXPECT_EQ((int)std::get<1>(resp).value(), 1); + EXPECT_EQ((int)std::get<2>(resp).value(), 1); + EXPECT_EQ((int)std::get<3>(resp).value(), 1); + EXPECT_EQ(std::get<4>(resp).value()[0], "null"); + EXPECT_EQ(std::get<5>(resp).value().size(), 14); + EXPECT_EQ(std::get<6>(resp).value().size(), 14); + EXPECT_EQ(std::get<7>(resp).value().size(), 14); + + std::string version = std::get<8>(resp).value(); + std::error_code err; + std::string testData; + std::ifstream testFile; + + EXPECT_EQ(fs::exists(CACHE_DIR + "/" + TEST_BUCKET + testName + "/" + destName + "/" + version, err), true); + std::string oid = version + "#0#" + std::to_string(ofs); + EXPECT_EQ(fs::exists(CACHE_DIR + "/" + TEST_BUCKET + testName + "/" + destName + "/" + oid, err), true); + testFile.open(CACHE_DIR + "/" + TEST_BUCKET + testName + "/" + destName + "/" + oid); + ASSERT_EQ(testFile.is_open(), true); + getline(testFile, testData); + EXPECT_EQ(testData, "test data"); + + // Ensure attr is replaced + rgw_obj copyObj = destObj->get_obj(); + ASSERT_EQ(destObj->get_obj_attrs(optional_yield{yield}, env->dpp, ©Obj), 0); + rgw::sal::Attrs copyAttrs = destObj->get_attrs(); + buffer::list val = copyAttrs["user.rgw.test_attr"]; + EXPECT_EQ(val.to_str(), "copy_value"); + + testFile.close(); + conn->cancel(); + testBucket->remove(env->dpp, true, optional_yield{yield}); + DriverDestructor driver_destructor(static_cast(driver)); + }, rethrow); + + io.run(); +} + +TEST_F(D4NFilterFixture, DeleteObjectWrite) +{ + env->cct->_conf->d4n_writecache_enabled = true; + env->cct->_conf->rgw_d4n_cache_cleaning_interval = 0.5; + const std::string testName = "DeleteObjectWrite"; + std::string version; + + net::spawn(io, [this, &testName, &version] (net::yield_context yield) { + init_driver(yield); + create_bucket(testName, yield); + put_object(testName, yield); + + { + boost::system::error_code ec; + request req; + req.push("EXISTS", TEST_BUCKET + testName + "_" + TEST_OBJ + testName); + req.push("EXISTS", TEST_BUCKET + testName + "_" + TEST_OBJ + testName + "_0_0"); + req.push("EXISTS", TEST_BUCKET + testName + "__:null_" + TEST_OBJ + testName + "_0_0"); + req.push("EXISTS", TEST_BUCKET + testName + "_" + TEST_OBJ + testName + "_0_" + std::to_string(ofs)); + req.push("HGET", TEST_BUCKET + testName + "_" + TEST_OBJ + testName + "_0_0", "version"); + + response< int, int, int, int, std::string > resp; + + conn->async_exec(req, resp, yield[ec]); + ASSERT_EQ((bool)ec, false); + EXPECT_EQ((int)std::get<0>(resp).value(), 1); + EXPECT_EQ((int)std::get<1>(resp).value(), 1); + EXPECT_EQ((int)std::get<2>(resp).value(), 1); + EXPECT_EQ((int)std::get<3>(resp).value(), 1); + + version = std::get<4>(resp).value(); + } + + std::error_code err; + EXPECT_EQ(fs::exists(CACHE_DIR + "/" + TEST_BUCKET + testName + "/" + TEST_OBJ + testName + "/" + version, err), true); + std::string oid = version + "#0#" + std::to_string(ofs); + EXPECT_EQ(fs::exists(CACHE_DIR + "/" + TEST_BUCKET + testName + "/" + TEST_OBJ + testName + "/" + oid, err), true); + + std::unique_ptr del_op = obj->get_delete_op(); + EXPECT_EQ(del_op->delete_obj(env->dpp, optional_yield{yield}, rgw::sal::FLAG_LOG_OP), 0); + + dynamic_cast(d4nFilter->get_policy_driver()->get_cache_policy())->save_y(null_yield); + }, rethrow); + + io.run_for(std::chrono::seconds(2)); // Allow cleaning cycle to complete + + net::spawn(io, [this, &testName, &version] (net::yield_context yield) { + dynamic_cast(d4nFilter->get_policy_driver()->get_cache_policy())->save_y(optional_yield{yield}); + + { + boost::system::error_code ec; + request req; + req.push("EXISTS", TEST_BUCKET + testName + "_" + TEST_OBJ + testName); + req.push("EXISTS", TEST_BUCKET + testName + "_" + TEST_OBJ + testName + "_0_0"); + req.push("EXISTS", TEST_BUCKET + testName + "__:null_" + TEST_OBJ + testName + "_0_0"); + req.push("EXISTS", TEST_BUCKET + testName + "_" + TEST_OBJ + testName + "_0_" + std::to_string(ofs)); + + response< int, int, int, int > resp; + + conn->async_exec(req, resp, yield[ec]); + ASSERT_EQ((bool)ec, false); + EXPECT_EQ((int)std::get<0>(resp).value(), 0); + EXPECT_EQ((int)std::get<1>(resp).value(), 0); + EXPECT_EQ((int)std::get<2>(resp).value(), 0); + EXPECT_EQ((int)std::get<3>(resp).value(), 0); + } + + std::error_code err; + EXPECT_EQ(fs::exists(CACHE_DIR + "/" + TEST_BUCKET + testName + "/" + TEST_OBJ + testName + "/" + version, err), false); + std::string oid = version + "#0#" + std::to_string(ofs); + EXPECT_EQ(fs::exists(CACHE_DIR + "/" + TEST_BUCKET + testName + "/" + TEST_OBJ + testName + "/" + oid, err), false); + + conn->cancel(); + testBucket->remove(env->dpp, true, optional_yield{yield}); + DriverDestructor driver_destructor(static_cast(driver)); + }, rethrow); + + io.run(); +} + +// Write cache tests, versioned +TEST_F(D4NFilterFixture, PutVersionedObjectWrite) +{ + env->cct->_conf->d4n_writecache_enabled = true; + env->cct->_conf->rgw_d4n_cache_cleaning_interval = 0.5; + const std::string testName = "PutVersionedObjectWrite"; + const std::string bucketName = "/tmp/d4n_filter_tests/dbstore-default_ns.1"; + std::string version, instance; + + net::spawn(io, [this, &testName, &bucketName, &version, &instance] (net::yield_context yield) { + init_driver(yield); + create_bucket(testName, yield); + testBucket->get_info().bucket.bucket_id = bucketName; + put_version_enabled_object(testName, instance, yield); + put_version_suspended_object(testName, yield); + + { + boost::system::error_code ec; + request req; + req.push("EXISTS", bucketName + "_" + TEST_OBJ + testName); + req.push("EXISTS", bucketName + "_" + TEST_OBJ + testName + "_0_0"); + req.push("EXISTS", bucketName + "_" + TEST_OBJ + testName + "_0_" + std::to_string(ofs)); + req.push("EXISTS", bucketName + "__:" + instance + "_" + TEST_OBJ + testName + "_0_0"); + req.push("EXISTS", bucketName + "__:" + instance + "_" + TEST_OBJ + testName + "_0_" + std::to_string(ofs)); + req.push("EXISTS", bucketName + "__:null_" + TEST_OBJ + testName + "_0_0"); + req.push("ZREVRANGE", bucketName + "_" + TEST_OBJ + testName, "0", "-1"); + req.push("HGETALL", bucketName + "_" + TEST_OBJ + testName + "_0_0"); + req.push("HGETALL", bucketName + "_" + TEST_OBJ + testName + "_0_" + std::to_string(ofs)); + req.push("HGETALL", bucketName + "__:" + instance + "_" + TEST_OBJ + testName + "_0_0"); + req.push("HGETALL", bucketName + "__:" + instance + "_" + TEST_OBJ + testName + "_0_" + std::to_string(ofs)); + req.push("HGETALL", bucketName + "__:null_" + TEST_OBJ + testName + "_0_0"); + req.push("HGET", bucketName + "__:" + instance + "_" + TEST_OBJ + testName + "_0_0", "dirty"); + req.push("HGET", bucketName + "__:null_" + TEST_OBJ + testName + "_0_0", "dirty"); + req.push("HGET", bucketName + "__:null_" + TEST_OBJ + testName + "_0_0", "version"); + + response< int, int, int, int, int, int, + std::vector, + std::map, + std::map, + std::map, + std::map, + std::map, + std::string, std::string, + std::string > resp; + + conn->async_exec(req, resp, yield[ec]); + + ASSERT_EQ((bool)ec, false); + EXPECT_EQ((int)std::get<0>(resp).value(), 1); + EXPECT_EQ((int)std::get<1>(resp).value(), 1); + EXPECT_EQ((int)std::get<2>(resp).value(), 1); + EXPECT_EQ((int)std::get<3>(resp).value(), 1); + EXPECT_EQ((int)std::get<4>(resp).value(), 1); + EXPECT_EQ((int)std::get<5>(resp).value(), 1); + EXPECT_EQ(std::get<6>(resp).value()[0], "null"); + EXPECT_EQ(std::get<6>(resp).value()[1], instance); + EXPECT_EQ(std::get<7>(resp).value().size(), 14); + EXPECT_EQ(std::get<8>(resp).value().size(), 14); + EXPECT_EQ(std::get<9>(resp).value().size(), 14); + EXPECT_EQ(std::get<10>(resp).value().size(), 14); + EXPECT_EQ(std::get<11>(resp).value().size(), 14); + EXPECT_EQ(std::get<12>(resp).value(), "1"); + EXPECT_EQ(std::get<13>(resp).value(), "1"); + + version = std::get<14>(resp).value(); + } + + std::error_code err; + std::string testData; + std::ifstream testFile; + std::string attr_val; + + EXPECT_EQ(fs::exists(CACHE_DIR + "/" + url_encode(bucketName, true) + "/" + TEST_OBJ + testName + "/" + instance, err), true); + std::string oid = instance + "#0#" + std::to_string(ofs); + std::string location = CACHE_DIR + "/" + url_encode(bucketName, true) + "/" + TEST_OBJ + testName + "/" + oid; + EXPECT_EQ(fs::exists(location, err), true); + + EXPECT_EQ(d4nFilter->get_cache_driver()->get_attr(env->dpp, location, RGW_CACHE_ATTR_DIRTY, attr_val, optional_yield({yield})), 0); + EXPECT_EQ(attr_val, "1"); + + testFile.open(CACHE_DIR + "/" + url_encode(bucketName, true) + "/" + TEST_OBJ + testName + "/" + oid); + ASSERT_EQ(testFile.is_open(), true); + getline(testFile, testData); + EXPECT_EQ(testData, "test data"); + testFile.close(); + + EXPECT_EQ(fs::exists(CACHE_DIR + "/" + url_encode(bucketName, true) + "/" + TEST_OBJ + testName + "/" + version, err), true); + oid = version + "#0#" + std::to_string(ofs); + location = CACHE_DIR + "/" + url_encode(bucketName, true) + "/" + TEST_OBJ + testName + "/" + oid; + EXPECT_EQ(fs::exists(location, err), true); + + EXPECT_EQ(d4nFilter->get_cache_driver()->get_attr(env->dpp, location, RGW_CACHE_ATTR_DIRTY, attr_val, optional_yield({yield})), 0); + EXPECT_EQ(attr_val, "1"); + + testFile.open(CACHE_DIR + "/" + url_encode(bucketName, true) + "/" + TEST_OBJ + testName + "/" + oid); + ASSERT_EQ(testFile.is_open(), true); + getline(testFile, testData); + EXPECT_EQ(testData, "test data"); + testFile.close(); + + dynamic_cast(d4nFilter->get_policy_driver()->get_cache_policy())->save_y(null_yield); + }, rethrow); + + io.run_for(std::chrono::seconds(2)); // Allow cleaning cycle to complete + + net::spawn(io, [this, &testName, &bucketName, &version, &instance] (net::yield_context yield) { + dynamic_cast(d4nFilter->get_policy_driver()->get_cache_policy())->save_y(optional_yield{yield}); + + { + boost::system::error_code ec; + request req; + req.push("HGET", bucketName + "__:" + instance + "_" + TEST_OBJ + testName + "_0_0", "dirty"); + req.push("HGET", bucketName + "__:null_" + TEST_OBJ + testName + "_0_0", "dirty"); + + response< std::string, std::string > resp; + + conn->async_exec(req, resp, yield[ec]); + + ASSERT_EQ((bool)ec, false); + EXPECT_EQ(std::get<0>(resp).value(), "0"); + EXPECT_EQ(std::get<1>(resp).value(), "0"); + } + + // Ensure object with versioning enabled is written to backend + { + auto next = dynamic_cast(objEnabled.get())->get_next(); + EXPECT_EQ(next->load_obj_state(env->dpp, optional_yield{yield}), 0); + EXPECT_EQ(next->exists(), 1); + } + + // Ensure object with versioning suspended is written to backend + { + auto next = dynamic_cast(objSuspended.get())->get_next(); + EXPECT_EQ(next->load_obj_state(env->dpp, optional_yield{yield}), 0); + EXPECT_EQ(next->exists(), 1); + } + + std::string attr_val; + std::string oid = instance + "#0#" + std::to_string(ofs); + std::string location = CACHE_DIR + "/" + url_encode(bucketName, true) + "/" + TEST_OBJ + testName + "/" + oid; + EXPECT_EQ(d4nFilter->get_cache_driver()->get_attr(env->dpp, location, RGW_CACHE_ATTR_DIRTY, attr_val, optional_yield({yield})), 0); + EXPECT_EQ(attr_val, "0"); + + oid = version + "#0#" + std::to_string(ofs); + location = CACHE_DIR + "/" + url_encode(bucketName, true) + "/" + TEST_OBJ + testName + "/" + oid; + EXPECT_EQ(d4nFilter->get_cache_driver()->get_attr(env->dpp, location, RGW_CACHE_ATTR_DIRTY, attr_val, optional_yield({yield})), 0); + EXPECT_EQ(attr_val, "0"); + + conn->cancel(); + testBucket->remove(env->dpp, true, optional_yield{yield}); + DriverDestructor driver_destructor(static_cast(driver)); + }, rethrow); + + io.run(); +} + +TEST_F(D4NFilterFixture, GetVersionedObjectWrite) +{ + env->cct->_conf->d4n_writecache_enabled = true; + const std::string testName = "GetVersionedObjectWrite"; + + net::spawn(io, [this, &testName] (net::yield_context yield) { + init_driver(yield); + create_bucket(testName, yield); + std::string instance; + put_version_enabled_object(testName, instance, yield); + + { + bufferlist bl; + Read_CB cb(&bl); + std::unique_ptr read_op_enabled(objEnabled->get_read_op()); + EXPECT_EQ(read_op_enabled->prepare(optional_yield{yield}, env->dpp), 0); + EXPECT_EQ(read_op_enabled->iterate(env->dpp, 0, ofs, &cb, optional_yield{yield}), 0); + + boost::system::error_code ec; + request req; + req.push("EXISTS", TEST_BUCKET + testName + "_" + TEST_OBJ + testName); + req.push("EXISTS", TEST_BUCKET + testName + "_" + TEST_OBJ + testName + "_0_0"); + req.push("EXISTS", TEST_BUCKET + testName + "__:" + instance + "_" + TEST_OBJ + testName + "_0_0"); + req.push("EXISTS", TEST_BUCKET + testName + "__:" + instance + "_" + TEST_OBJ + testName + "_0_" + std::to_string(ofs)); + req.push("ZREVRANGE", TEST_BUCKET + testName + "_" + TEST_OBJ + testName, "0", "-1"); + req.push("HGETALL", TEST_BUCKET + testName + "_" + TEST_OBJ + testName + "_0_0"); + req.push("HGETALL", TEST_BUCKET + testName + "__:" + instance + "_" + TEST_OBJ + testName + "_0_0"); + req.push("HGETALL", TEST_BUCKET + testName + "__:" + instance + "_" + TEST_OBJ + testName + "_0_" + std::to_string(ofs)); + + response< int, int, int, int, + std::vector, + std::map, + std::map, + std::map > resp; + + conn->async_exec(req, resp, yield[ec]); + + ASSERT_EQ((bool)ec, false); + EXPECT_EQ((int)std::get<0>(resp).value(), 1); + EXPECT_EQ((int)std::get<1>(resp).value(), 1); + EXPECT_EQ((int)std::get<2>(resp).value(), 1); + EXPECT_EQ((int)std::get<3>(resp).value(), 1); + EXPECT_EQ(std::get<4>(resp).value()[0], instance); + EXPECT_EQ(std::get<5>(resp).value().size(), 14); + EXPECT_EQ(std::get<6>(resp).value().size(), 14); + EXPECT_EQ(std::get<7>(resp).value().size(), 14); + + std::error_code err; + std::string testData; + std::ifstream testFile; + + EXPECT_EQ(fs::exists(CACHE_DIR + "/" + TEST_BUCKET + testName + "/" + TEST_OBJ + testName + "/" + instance, err), true); + std::string oid = instance + "#0#" + std::to_string(ofs); + std::string location = CACHE_DIR + "/" + TEST_BUCKET + testName + "/" + TEST_OBJ + testName + "/" + oid; + EXPECT_EQ(fs::exists(location, err), true); + + testFile.open(CACHE_DIR + "/" + TEST_BUCKET + testName + "/" + TEST_OBJ + testName + "/" + oid); + ASSERT_EQ(testFile.is_open(), true); + getline(testFile, testData); + EXPECT_EQ(testData, "test data"); + + // Ensure data returned from GET op matches cached data + EXPECT_EQ(testData, cb.save_bl->c_str()); + + testFile.close(); + } + + // Remove object entry so the sorted set order will not interfere with the checks for objSuspended + { + boost::system::error_code ec; + request req; + req.push("DEL", TEST_BUCKET + testName + "_" + TEST_OBJ + testName); + response resp; + conn->async_exec(req, resp, yield[ec]); + ASSERT_EQ((bool)ec, false); + EXPECT_EQ((int)std::get<0>(resp).value(), 1); + } + + put_version_suspended_object(testName, yield); + + { + bufferlist bl; + Read_CB cb(&bl); + std::unique_ptr read_op_suspended(objSuspended->get_read_op()); + EXPECT_EQ(read_op_suspended->prepare(optional_yield{yield}, env->dpp), 0); + EXPECT_EQ(read_op_suspended->iterate(env->dpp, 0, ofs, &cb, optional_yield{yield}), 0); + + boost::system::error_code ec; + request req; + req.push("EXISTS", TEST_BUCKET + testName + "_" + TEST_OBJ + testName); + req.push("EXISTS", TEST_BUCKET + testName + "_" + TEST_OBJ + testName + "_0_0"); + req.push("EXISTS", TEST_BUCKET + testName + "_" + TEST_OBJ + testName + "_0_" + std::to_string(ofs)); + req.push("EXISTS", TEST_BUCKET + testName + "__:null_" + TEST_OBJ + testName + "_0_0"); + req.push("ZREVRANGE", TEST_BUCKET + testName + "_" + TEST_OBJ + testName, "0", "-1"); + req.push("HGETALL", TEST_BUCKET + testName + "_" + TEST_OBJ + testName + "_0_0"); + req.push("HGETALL", TEST_BUCKET + testName + "_" + TEST_OBJ + testName + "_0_" + std::to_string(ofs)); + req.push("HGETALL", TEST_BUCKET + testName + "__:null_" + TEST_OBJ + testName + "_0_0"); + req.push("HGET", TEST_BUCKET + testName + "__:null_" + TEST_OBJ + testName + "_0_0", "version"); + + response< int, int, int, int, + std::vector, + std::map, + std::map, + std::map, + std::string > resp; + + conn->async_exec(req, resp, yield[ec]); + + ASSERT_EQ((bool)ec, false); + EXPECT_EQ((int)std::get<0>(resp).value(), 1); + EXPECT_EQ((int)std::get<1>(resp).value(), 1); + EXPECT_EQ((int)std::get<2>(resp).value(), 1); + EXPECT_EQ((int)std::get<3>(resp).value(), 1); + EXPECT_EQ(std::get<4>(resp).value()[0], "null"); + EXPECT_EQ(std::get<5>(resp).value().size(), 14); + EXPECT_EQ(std::get<6>(resp).value().size(), 14); + EXPECT_EQ(std::get<7>(resp).value().size(), 14); + + std::string version = std::get<8>(resp).value(); + + std::error_code err; + std::string testData; + std::ifstream testFile; + + EXPECT_EQ(fs::exists(CACHE_DIR + "/" + TEST_BUCKET + testName + "/" + TEST_OBJ + testName + "/" + instance, err), true); + + EXPECT_EQ(fs::exists(CACHE_DIR + "/" + TEST_BUCKET + testName + "/" + TEST_OBJ + testName + "/" + version, err), true); + std::string oid = version + "#0#" + std::to_string(ofs); + std::string location = CACHE_DIR + "/" + TEST_BUCKET + testName + "/" + TEST_OBJ + testName + "/" + oid; + EXPECT_EQ(fs::exists(location, err), true); + + testFile.open(CACHE_DIR + "/" + TEST_BUCKET + testName + "/" + TEST_OBJ + testName + "/" + oid); + ASSERT_EQ(testFile.is_open(), true); + getline(testFile, testData); + EXPECT_EQ(testData, "test data"); + + // Ensure data returned from GET op matches cached data + EXPECT_EQ(testData, cb.save_bl->c_str()); + + testFile.close(); + } + + conn->cancel(); + testBucket->remove(env->dpp, true, optional_yield{yield}); + DriverDestructor driver_destructor(static_cast(driver)); + }, rethrow); + + io.run(); +} + +TEST_F(D4NFilterFixture, CopyNoneVersionedObjectWrite) +{ + env->cct->_conf->d4n_writecache_enabled = true; + const std::string testName = "CopyNoneVersionedObjectWrite"; + + net::spawn(io, [this, &testName] (net::yield_context yield) { + init_driver(yield); + create_bucket(testName, yield); + std::string instance; + put_version_enabled_object(testName, instance, yield); + + RGWEnv rgw_env; + req_info info(env->cct.get(), &rgw_env); + rgw_zone_id zone; + rgw_placement_rule placement; + ceph::real_time mtime; + + buffer::list bl; + { + bl.append("test_value", 10); + rgw::sal::Attrs attrs{{"user.rgw.test_attr", std::move(bl)}}; + objEnabled->set_obj_attrs(env->dpp, &attrs, nullptr, optional_yield{yield}, rgw::sal::FLAG_LOG_OP); + } + bl.append("copy_value", 10); + rgw::sal::Attrs attrs{{"user.rgw.test_attr", std::move(bl)}}; + + std::string tag; + + { + std::string destNameEnabled = "dest_object_enabled"; + std::unique_ptr destObjEnabled = testBucket->get_object(rgw_obj_key(destNameEnabled)); + EXPECT_NE(destObjEnabled.get(), nullptr); + destObjEnabled->gen_rand_obj_instance_name(); + instance = destObjEnabled->get_instance(); + + int ret = objEnabled->copy_object(acl_owner, + std::get(owner), + &info, + zone, + destObjEnabled.get(), + testBucket.get(), + testBucket.get(), + placement, + &mtime, + &mtime, + nullptr, + nullptr, + false, + nullptr, + nullptr, + rgw::sal::ATTRSMOD_NONE, + false, + attrs, + RGWObjCategory::Main, + 0, + boost::none, + nullptr, + &tag, + &tag, + nullptr, + nullptr, + env->dpp, + optional_yield({yield})); + EXPECT_EQ(ret, 0); + + boost::system::error_code ec; + request req; + req.push("EXISTS", TEST_BUCKET + testName + "_" + destNameEnabled); + req.push("EXISTS", TEST_BUCKET + testName + "_" + destNameEnabled + "_0_0"); + req.push("EXISTS", TEST_BUCKET + testName + "__:" + instance + "_" + destNameEnabled + "_0_0"); + req.push("EXISTS", TEST_BUCKET + testName + "__:" + instance + "_" + destNameEnabled + "_0_" + std::to_string(ofs)); + req.push("ZREVRANGE", TEST_BUCKET + testName + "_" + destNameEnabled, "0", "-1"); + req.push("HGETALL", TEST_BUCKET + testName + "_" + destNameEnabled + "_0_0"); + req.push("HGETALL", TEST_BUCKET + testName + "__:" + instance + "_" + destNameEnabled + "_0_0"); + req.push("HGETALL", TEST_BUCKET + testName + "__:" + instance + "_" + destNameEnabled + "_0_" + std::to_string(ofs)); + + response< int, int, int, int, + std::vector, + std::map, + std::map, + std::map > resp; + + conn->async_exec(req, resp, yield[ec]); + + ASSERT_EQ((bool)ec, false); + EXPECT_EQ((int)std::get<0>(resp).value(), 1); + EXPECT_EQ((int)std::get<1>(resp).value(), 1); + EXPECT_EQ((int)std::get<2>(resp).value(), 1); + EXPECT_EQ((int)std::get<3>(resp).value(), 1); + EXPECT_EQ(std::get<4>(resp).value()[0], instance); + EXPECT_EQ(std::get<5>(resp).value().size(), 14); + EXPECT_EQ(std::get<6>(resp).value().size(), 14); + EXPECT_EQ(std::get<7>(resp).value().size(), 14); + + std::error_code err; + std::string testData; + std::ifstream testFile; + + EXPECT_EQ(fs::exists(CACHE_DIR + "/" + TEST_BUCKET + testName + "/" + destNameEnabled + "/" + instance, err), true); + std::string oid = instance + "#0#" + std::to_string(ofs); + EXPECT_EQ(fs::exists(CACHE_DIR + "/" + TEST_BUCKET + testName + "/" + destNameEnabled + "/" + oid, err), true); + + // Ensure attr is not modified + rgw_obj copyObj = destObjEnabled->get_obj(); + ASSERT_EQ(destObjEnabled->get_obj_attrs(optional_yield{yield}, env->dpp, ©Obj), 0); + rgw::sal::Attrs copyAttrs = destObjEnabled->get_attrs(); + buffer::list val = copyAttrs["user.rgw.test_attr"]; + EXPECT_EQ(val.to_str(), "test_value"); + + testFile.open(CACHE_DIR + "/" + TEST_BUCKET + testName + "/" + destNameEnabled + "/" + oid); + ASSERT_EQ(testFile.is_open(), true); + getline(testFile, testData); + EXPECT_EQ(testData, "test data"); + + testFile.close(); + } + + put_version_suspended_object(testName, yield); + bl.append("test_value", 10); + rgw::sal::Attrs testAttrs{{"user.rgw.test_attr", std::move(bl)}}; + objSuspended->set_obj_attrs(env->dpp, &testAttrs, nullptr, optional_yield{yield}, rgw::sal::FLAG_LOG_OP); + + { + std::string destNameSuspended = "dest_object_suspended"; + std::unique_ptr destObjSuspended = testBucket->get_object(rgw_obj_key(destNameSuspended)); + EXPECT_NE(destObjSuspended.get(), nullptr); + + int ret = objSuspended->copy_object(acl_owner, + std::get(owner), + &info, + zone, + destObjSuspended.get(), + testBucket.get(), + testBucket.get(), + placement, + &mtime, + &mtime, + nullptr, + nullptr, + false, + nullptr, + nullptr, + rgw::sal::ATTRSMOD_NONE, + false, + attrs, + RGWObjCategory::Main, + 0, + boost::none, + nullptr, + &tag, + &tag, + nullptr, + nullptr, + env->dpp, + optional_yield({yield})); + EXPECT_EQ(ret, 0); + + boost::system::error_code ec; + request req; + req.push("EXISTS", TEST_BUCKET + testName + "_" + destNameSuspended); + req.push("EXISTS", TEST_BUCKET + testName + "_" + destNameSuspended + "_0_0"); + req.push("EXISTS", TEST_BUCKET + testName + "__:null_" + destNameSuspended + "_0_0"); + req.push("EXISTS", TEST_BUCKET + testName + "_" + destNameSuspended + "_0_" + std::to_string(ofs)); + req.push("ZREVRANGE", TEST_BUCKET + testName + "_" + destNameSuspended, "0", "-1"); + req.push("HGETALL", TEST_BUCKET + testName + "_" + destNameSuspended + "_0_0"); + req.push("HGETALL", TEST_BUCKET + testName + "__:null_" + destNameSuspended + "_0_0"); + req.push("HGETALL", TEST_BUCKET + testName + "_" + destNameSuspended + "_0_" + std::to_string(ofs)); + req.push("HGET", TEST_BUCKET + testName + "__:null_" + destNameSuspended + "_0_0", "version"); + + response< int, int, int, int, + std::vector, + std::map, + std::map, + std::map, + std::string > resp; + + conn->async_exec(req, resp, yield[ec]); + + ASSERT_EQ((bool)ec, false); + EXPECT_EQ((int)std::get<0>(resp).value(), 1); + EXPECT_EQ((int)std::get<1>(resp).value(), 1); + EXPECT_EQ((int)std::get<2>(resp).value(), 1); + EXPECT_EQ((int)std::get<3>(resp).value(), 1); + EXPECT_EQ(std::get<4>(resp).value()[0], "null"); + EXPECT_EQ(std::get<5>(resp).value().size(), 14); + EXPECT_EQ(std::get<6>(resp).value().size(), 14); + EXPECT_EQ(std::get<7>(resp).value().size(), 14); + + std::string version = std::get<8>(resp).value(); + std::error_code err; + std::string testData; + std::ifstream testFile; + + EXPECT_EQ(fs::exists(CACHE_DIR + "/" + TEST_BUCKET + testName + "/" + destNameSuspended + "/" + version, err), true); + std::string oid = version + "#0#" + std::to_string(ofs); + EXPECT_EQ(fs::exists(CACHE_DIR + "/" + TEST_BUCKET + testName + "/" + destNameSuspended + "/" + oid, err), true); + + // Ensure attr is not modified + rgw_obj copyObj = destObjSuspended->get_obj(); + ASSERT_EQ(destObjSuspended->get_obj_attrs(optional_yield{yield}, env->dpp, ©Obj), 0); + rgw::sal::Attrs copyAttrs = destObjSuspended->get_attrs(); + buffer::list val = copyAttrs["user.rgw.test_attr"]; + EXPECT_EQ(val.to_str(), "test_value"); + + testFile.open(CACHE_DIR + "/" + TEST_BUCKET + testName + "/" + destNameSuspended + "/" + oid); + ASSERT_EQ(testFile.is_open(), true); + getline(testFile, testData); + EXPECT_EQ(testData, "test data"); + + testFile.close(); + } + + conn->cancel(); + testBucket->remove(env->dpp, true, optional_yield{yield}); + DriverDestructor driver_destructor(static_cast(driver)); + }, rethrow); + + io.run(); +} + +TEST_F(D4NFilterFixture, CopyMergeVersionedObjectWrite) +{ + env->cct->_conf->d4n_writecache_enabled = true; + const std::string testName = "CopyMergeVersionedObjectWrite"; + + net::spawn(io, [this, &testName] (net::yield_context yield) { + init_driver(yield); + create_bucket(testName, yield); + std::string instance; + put_version_enabled_object(testName, instance, yield); + + RGWEnv rgw_env; + req_info info(env->cct.get(), &rgw_env); + rgw_zone_id zone; + rgw_placement_rule placement; + ceph::real_time mtime; + + buffer::list bl; + { + bl.append("test_value", 10); + rgw::sal::Attrs attrs{{"user.rgw.test_attr", std::move(bl)}}; + objEnabled->set_obj_attrs(env->dpp, &attrs, nullptr, optional_yield{yield}, rgw::sal::FLAG_LOG_OP); + } + bl.append("copy_value", 10); + rgw::sal::Attrs attrs{{"user.rgw.test_attr", std::move(bl)}}; + + std::string tag; + + { + std::string destNameEnabled = "dest_object_enabled"; + std::unique_ptr destObjEnabled = testBucket->get_object(rgw_obj_key(destNameEnabled)); + EXPECT_NE(destObjEnabled.get(), nullptr); + destObjEnabled->gen_rand_obj_instance_name(); + instance = destObjEnabled->get_instance(); + + int ret = objEnabled->copy_object(acl_owner, + std::get(owner), + &info, + zone, + destObjEnabled.get(), + testBucket.get(), + testBucket.get(), + placement, + &mtime, + &mtime, + nullptr, + nullptr, + false, + nullptr, + nullptr, + rgw::sal::ATTRSMOD_MERGE, + false, + attrs, + RGWObjCategory::Main, + 0, + boost::none, + nullptr, + &tag, + &tag, + nullptr, + nullptr, + env->dpp, + optional_yield({yield})); + EXPECT_EQ(ret, 0); + + boost::system::error_code ec; + request req; + req.push("EXISTS", TEST_BUCKET + testName + "_" + destNameEnabled); + req.push("EXISTS", TEST_BUCKET + testName + "_" + destNameEnabled + "_0_0"); + req.push("EXISTS", TEST_BUCKET + testName + "__:" + instance + "_" + destNameEnabled + "_0_0"); + req.push("EXISTS", TEST_BUCKET + testName + "__:" + instance + "_" + destNameEnabled + "_0_" + std::to_string(ofs)); + req.push("ZREVRANGE", TEST_BUCKET + testName + "_" + destNameEnabled, "0", "-1"); + req.push("HGETALL", TEST_BUCKET + testName + "_" + destNameEnabled + "_0_0"); + req.push("HGETALL", TEST_BUCKET + testName + "__:" + instance + "_" + destNameEnabled + "_0_0"); + req.push("HGETALL", TEST_BUCKET + testName + "__:" + instance + "_" + destNameEnabled + "_0_" + std::to_string(ofs)); + + response< int, int, int, int, + std::vector, + std::map, + std::map, + std::map > resp; + + conn->async_exec(req, resp, yield[ec]); + + ASSERT_EQ((bool)ec, false); + EXPECT_EQ((int)std::get<0>(resp).value(), 1); + EXPECT_EQ((int)std::get<1>(resp).value(), 1); + EXPECT_EQ((int)std::get<2>(resp).value(), 1); + EXPECT_EQ((int)std::get<3>(resp).value(), 1); + EXPECT_EQ(std::get<4>(resp).value()[0], instance); + EXPECT_EQ(std::get<5>(resp).value().size(), 14); + EXPECT_EQ(std::get<6>(resp).value().size(), 14); + EXPECT_EQ(std::get<7>(resp).value().size(), 14); + + std::error_code err; + std::string testData; + std::ifstream testFile; + + EXPECT_EQ(fs::exists(CACHE_DIR + "/" + TEST_BUCKET + testName + "/" + destNameEnabled + "/" + instance, err), true); + std::string oid = instance + "#0#" + std::to_string(ofs); + EXPECT_EQ(fs::exists(CACHE_DIR + "/" + TEST_BUCKET + testName + "/" + destNameEnabled + "/" + oid, err), true); + + // Ensure attr is merged + rgw_obj copyObj = destObjEnabled->get_obj(); + ASSERT_EQ(destObjEnabled->get_obj_attrs(optional_yield{yield}, env->dpp, ©Obj), 0); + rgw::sal::Attrs copyAttrs = destObjEnabled->get_attrs(); + buffer::list val = copyAttrs["user.rgw.test_attr"]; + EXPECT_EQ(val.to_str(), "copy_value"); + + testFile.open(CACHE_DIR + "/" + TEST_BUCKET + testName + "/" + destNameEnabled + "/" + oid); + ASSERT_EQ(testFile.is_open(), true); + getline(testFile, testData); + EXPECT_EQ(testData, "test data"); + + testFile.close(); + } + + put_version_suspended_object(testName, yield); + bl.append("test_value", 10); + rgw::sal::Attrs testAttrs{{"user.rgw.test_attr", std::move(bl)}}; + objSuspended->set_obj_attrs(env->dpp, &testAttrs, nullptr, optional_yield{yield}, rgw::sal::FLAG_LOG_OP); + + { + std::string destNameSuspended = "dest_object_suspended"; + std::unique_ptr destObjSuspended = testBucket->get_object(rgw_obj_key(destNameSuspended)); + EXPECT_NE(destObjSuspended.get(), nullptr); + + int ret = objSuspended->copy_object(acl_owner, + std::get(owner), + &info, + zone, + destObjSuspended.get(), + testBucket.get(), + testBucket.get(), + placement, + &mtime, + &mtime, + nullptr, + nullptr, + false, + nullptr, + nullptr, + rgw::sal::ATTRSMOD_MERGE, + false, + attrs, + RGWObjCategory::Main, + 0, + boost::none, + nullptr, + &tag, + &tag, + nullptr, + nullptr, + env->dpp, + optional_yield({yield})); + EXPECT_EQ(ret, 0); + + boost::system::error_code ec; + request req; + req.push("EXISTS", TEST_BUCKET + testName + "_" + destNameSuspended); + req.push("EXISTS", TEST_BUCKET + testName + "_" + destNameSuspended + "_0_0"); + req.push("EXISTS", TEST_BUCKET + testName + "__:null_" + destNameSuspended + "_0_0"); + req.push("EXISTS", TEST_BUCKET + testName + "_" + destNameSuspended + "_0_" + std::to_string(ofs)); + req.push("ZREVRANGE", TEST_BUCKET + testName + "_" + destNameSuspended, "0", "-1"); + req.push("HGETALL", TEST_BUCKET + testName + "_" + destNameSuspended + "_0_0"); + req.push("HGETALL", TEST_BUCKET + testName + "__:null_" + destNameSuspended + "_0_0"); + req.push("HGETALL", TEST_BUCKET + testName + "_" + destNameSuspended + "_0_" + std::to_string(ofs)); + req.push("HGET", TEST_BUCKET + testName + "__:null_" + destNameSuspended + "_0_0", "version"); + + response< int, int, int, int, + std::vector, + std::map, + std::map, + std::map, + std::string > resp; + + conn->async_exec(req, resp, yield[ec]); + + ASSERT_EQ((bool)ec, false); + EXPECT_EQ((int)std::get<0>(resp).value(), 1); + EXPECT_EQ((int)std::get<1>(resp).value(), 1); + EXPECT_EQ((int)std::get<2>(resp).value(), 1); + EXPECT_EQ((int)std::get<3>(resp).value(), 1); + EXPECT_EQ(std::get<4>(resp).value()[0], "null"); + EXPECT_EQ(std::get<5>(resp).value().size(), 14); + EXPECT_EQ(std::get<6>(resp).value().size(), 14); + EXPECT_EQ(std::get<7>(resp).value().size(), 14); + + std::string version = std::get<8>(resp).value(); + std::error_code err; + std::string testData; + std::ifstream testFile; + + EXPECT_EQ(fs::exists(CACHE_DIR + "/" + TEST_BUCKET + testName + "/" + destNameSuspended + "/" + version, err), true); + std::string oid = version + "#0#" + std::to_string(ofs); + EXPECT_EQ(fs::exists(CACHE_DIR + "/" + TEST_BUCKET + testName + "/" + destNameSuspended + "/" + oid, err), true); + + // Ensure attr is merged + rgw_obj copyObj = destObjSuspended->get_obj(); + ASSERT_EQ(destObjSuspended->get_obj_attrs(optional_yield{yield}, env->dpp, ©Obj), 0); + rgw::sal::Attrs copyAttrs = destObjSuspended->get_attrs(); + buffer::list val = copyAttrs["user.rgw.test_attr"]; + EXPECT_EQ(val.to_str(), "copy_value"); + + testFile.open(CACHE_DIR + "/" + TEST_BUCKET + testName + "/" + destNameSuspended + "/" + oid); + ASSERT_EQ(testFile.is_open(), true); + getline(testFile, testData); + EXPECT_EQ(testData, "test data"); + + testFile.close(); + } + + conn->cancel(); + testBucket->remove(env->dpp, true, optional_yield{yield}); + DriverDestructor driver_destructor(static_cast(driver)); + }, rethrow); + + io.run(); +} + +TEST_F(D4NFilterFixture, CopyReplaceVersionedObjectWrite) +{ + env->cct->_conf->d4n_writecache_enabled = true; + const std::string testName = "CopyReplaceVersionedObjectWrite"; + + net::spawn(io, [this, &testName] (net::yield_context yield) { + init_driver(yield); + create_bucket(testName, yield); + std::string instance; + put_version_enabled_object(testName, instance, yield); + + RGWEnv rgw_env; + req_info info(env->cct.get(), &rgw_env); + rgw_zone_id zone; + rgw_placement_rule placement; + ceph::real_time mtime; + + buffer::list bl; + { + bl.append("test_value", 10); + rgw::sal::Attrs attrs{{"user.rgw.test_attr", std::move(bl)}}; + objEnabled->set_obj_attrs(env->dpp, &attrs, nullptr, optional_yield{yield}, rgw::sal::FLAG_LOG_OP); + } + bl.append("copy_value", 10); + rgw::sal::Attrs attrs{{"user.rgw.test_attr", std::move(bl)}}; + + std::string tag; + + { + std::string destNameEnabled = "dest_object_enabled"; + std::unique_ptr destObjEnabled = testBucket->get_object(rgw_obj_key(destNameEnabled)); + EXPECT_NE(destObjEnabled.get(), nullptr); + destObjEnabled->gen_rand_obj_instance_name(); + instance = destObjEnabled->get_instance(); + + int ret = objEnabled->copy_object(acl_owner, + std::get(owner), + &info, + zone, + destObjEnabled.get(), + testBucket.get(), + testBucket.get(), + placement, + &mtime, + &mtime, + nullptr, + nullptr, + false, + nullptr, + nullptr, + rgw::sal::ATTRSMOD_REPLACE, + false, + attrs, + RGWObjCategory::Main, + 0, + boost::none, + nullptr, + &tag, + &tag, + nullptr, + nullptr, + env->dpp, + optional_yield({yield})); + EXPECT_EQ(ret, 0); + + boost::system::error_code ec; + request req; + req.push("EXISTS", TEST_BUCKET + testName + "_" + destNameEnabled); + req.push("EXISTS", TEST_BUCKET + testName + "_" + destNameEnabled + "_0_0"); + req.push("EXISTS", TEST_BUCKET + testName + "__:" + instance + "_" + destNameEnabled + "_0_0"); + req.push("EXISTS", TEST_BUCKET + testName + "__:" + instance + "_" + destNameEnabled + "_0_" + std::to_string(ofs)); + req.push("ZREVRANGE", TEST_BUCKET + testName + "_" + destNameEnabled, "0", "-1"); + req.push("HGETALL", TEST_BUCKET + testName + "_" + destNameEnabled + "_0_0"); + req.push("HGETALL", TEST_BUCKET + testName + "__:" + instance + "_" + destNameEnabled + "_0_0"); + req.push("HGETALL", TEST_BUCKET + testName + "__:" + instance + "_" + destNameEnabled + "_0_" + std::to_string(ofs)); + + response< int, int, int, int, + std::vector, + std::map, + std::map, + std::map > resp; + + conn->async_exec(req, resp, yield[ec]); + + ASSERT_EQ((bool)ec, false); + EXPECT_EQ((int)std::get<0>(resp).value(), 1); + EXPECT_EQ((int)std::get<1>(resp).value(), 1); + EXPECT_EQ((int)std::get<2>(resp).value(), 1); + EXPECT_EQ((int)std::get<3>(resp).value(), 1); + EXPECT_EQ(std::get<4>(resp).value()[0], instance); + EXPECT_EQ(std::get<5>(resp).value().size(), 14); + EXPECT_EQ(std::get<6>(resp).value().size(), 14); + EXPECT_EQ(std::get<7>(resp).value().size(), 14); + + std::error_code err; + std::string testData; + std::ifstream testFile; + + EXPECT_EQ(fs::exists(CACHE_DIR + "/" + TEST_BUCKET + testName + "/" + destNameEnabled + "/" + instance, err), true); + std::string oid = instance + "#0#" + std::to_string(ofs); + EXPECT_EQ(fs::exists(CACHE_DIR + "/" + TEST_BUCKET + testName + "/" + destNameEnabled + "/" + oid, err), true); + + // Ensure attr is replaced + rgw_obj copyObj = destObjEnabled->get_obj(); + ASSERT_EQ(destObjEnabled->get_obj_attrs(optional_yield{yield}, env->dpp, ©Obj), 0); + rgw::sal::Attrs copyAttrs = destObjEnabled->get_attrs(); + buffer::list val = copyAttrs["user.rgw.test_attr"]; + EXPECT_EQ(val.to_str(), "copy_value"); + + testFile.open(CACHE_DIR + "/" + TEST_BUCKET + testName + "/" + destNameEnabled + "/" + oid); + ASSERT_EQ(testFile.is_open(), true); + getline(testFile, testData); + EXPECT_EQ(testData, "test data"); + + testFile.close(); + } + + put_version_suspended_object(testName, yield); + bl.append("test_value", 10); + rgw::sal::Attrs testAttrs{{"user.rgw.test_attr", std::move(bl)}}; + objSuspended->set_obj_attrs(env->dpp, &testAttrs, nullptr, optional_yield{yield}, rgw::sal::FLAG_LOG_OP); + + { + std::string destNameSuspended = "dest_object_suspended"; + std::unique_ptr destObjSuspended = testBucket->get_object(rgw_obj_key(destNameSuspended)); + EXPECT_NE(destObjSuspended.get(), nullptr); + + int ret = objSuspended->copy_object(acl_owner, + std::get(owner), + &info, + zone, + destObjSuspended.get(), + testBucket.get(), + testBucket.get(), + placement, + &mtime, + &mtime, + nullptr, + nullptr, + false, + nullptr, + nullptr, + rgw::sal::ATTRSMOD_REPLACE, + false, + attrs, + RGWObjCategory::Main, + 0, + boost::none, + nullptr, + &tag, + &tag, + nullptr, + nullptr, + env->dpp, + optional_yield({yield})); + EXPECT_EQ(ret, 0); + + boost::system::error_code ec; + request req; + req.push("EXISTS", TEST_BUCKET + testName + "_" + destNameSuspended); + req.push("EXISTS", TEST_BUCKET + testName + "_" + destNameSuspended + "_0_0"); + req.push("EXISTS", TEST_BUCKET + testName + "__:null_" + destNameSuspended + "_0_0"); + req.push("EXISTS", TEST_BUCKET + testName + "_" + destNameSuspended + "_0_" + std::to_string(ofs)); + req.push("ZREVRANGE", TEST_BUCKET + testName + "_" + destNameSuspended, "0", "-1"); + req.push("HGETALL", TEST_BUCKET + testName + "_" + destNameSuspended + "_0_0"); + req.push("HGETALL", TEST_BUCKET + testName + "__:null_" + destNameSuspended + "_0_0"); + req.push("HGETALL", TEST_BUCKET + testName + "_" + destNameSuspended + "_0_" + std::to_string(ofs)); + req.push("HGET", TEST_BUCKET + testName + "__:null_" + destNameSuspended + "_0_0", "version"); + + response< int, int, int, int, + std::vector, + std::map, + std::map, + std::map, + std::string > resp; + + conn->async_exec(req, resp, yield[ec]); + + ASSERT_EQ((bool)ec, false); + EXPECT_EQ((int)std::get<0>(resp).value(), 1); + EXPECT_EQ((int)std::get<1>(resp).value(), 1); + EXPECT_EQ((int)std::get<2>(resp).value(), 1); + EXPECT_EQ((int)std::get<3>(resp).value(), 1); + EXPECT_EQ(std::get<4>(resp).value()[0], "null"); + EXPECT_EQ(std::get<5>(resp).value().size(), 14); + EXPECT_EQ(std::get<6>(resp).value().size(), 14); + EXPECT_EQ(std::get<7>(resp).value().size(), 14); + + std::string version = std::get<8>(resp).value(); + std::error_code err; + std::string testData; + std::ifstream testFile; + + EXPECT_EQ(fs::exists(CACHE_DIR + "/" + TEST_BUCKET + testName + "/" + destNameSuspended + "/" + version, err), true); + std::string oid = version + "#0#" + std::to_string(ofs); + EXPECT_EQ(fs::exists(CACHE_DIR + "/" + TEST_BUCKET + testName + "/" + destNameSuspended + "/" + oid, err), true); + + // Ensure attr is replaced + rgw_obj copyObj = destObjSuspended->get_obj(); + ASSERT_EQ(destObjSuspended->get_obj_attrs(optional_yield{yield}, env->dpp, ©Obj), 0); + rgw::sal::Attrs copyAttrs = destObjSuspended->get_attrs(); + buffer::list val = copyAttrs["user.rgw.test_attr"]; + EXPECT_EQ(val.to_str(), "copy_value"); + + testFile.open(CACHE_DIR + "/" + TEST_BUCKET + testName + "/" + destNameSuspended + "/" + oid); + ASSERT_EQ(testFile.is_open(), true); + getline(testFile, testData); + EXPECT_EQ(testData, "test data"); + + testFile.close(); + } + + conn->cancel(); + testBucket->remove(env->dpp, true, optional_yield{yield}); + DriverDestructor driver_destructor(static_cast(driver)); + }, rethrow); + + io.run(); +} + +TEST_F(D4NFilterFixture, DeleteVersionedObjectWrite) +{ + env->cct->_conf->d4n_writecache_enabled = true; + env->cct->_conf->rgw_d4n_cache_cleaning_interval = 0.5; + const std::string testName = "DeleteVersionedObjectRead"; + const std::string bucketName = "/tmp/d4n_filter_tests/dbstore-default_ns.1"; + std::string version, instance; + + net::spawn(io, [this, &testName, &bucketName, &version, &instance] (net::yield_context yield) { + init_driver(yield); + create_bucket(testName, yield); + testBucket->get_info().bucket.bucket_id = bucketName; + put_version_enabled_object(testName, instance, yield); + put_version_suspended_object(testName, yield); + + boost::system::error_code ec; + request req; + req.push("EXISTS", bucketName + "_" + TEST_OBJ + testName); + req.push("EXISTS", bucketName + "_" + TEST_OBJ + testName + "_0_0"); + req.push("EXISTS", bucketName + "__:" + instance + "_" + TEST_OBJ + testName + "_0_0"); + req.push("EXISTS", bucketName + "__:" + instance + "_" + TEST_OBJ + testName + "_0_" + std::to_string(ofs)); + req.push("EXISTS", bucketName + "__:null_" + TEST_OBJ + testName + "_0_0"); + req.push("EXISTS", bucketName + "_" + TEST_OBJ + testName + "_0_" + std::to_string(ofs)); + req.push("HGET", bucketName + "__:null_" + TEST_OBJ + testName + "_0_0", "version"); + + response< int, int, int, int, + int, int, std::string > resp; + + conn->async_exec(req, resp, yield[ec]); + + ASSERT_EQ((bool)ec, false); + EXPECT_EQ((int)std::get<0>(resp).value(), 1); + EXPECT_EQ((int)std::get<1>(resp).value(), 1); + EXPECT_EQ((int)std::get<2>(resp).value(), 1); + EXPECT_EQ((int)std::get<3>(resp).value(), 1); + EXPECT_EQ((int)std::get<4>(resp).value(), 1); + EXPECT_EQ((int)std::get<5>(resp).value(), 1); + + version = std::get<6>(resp).value(); + + std::error_code err; + EXPECT_EQ(fs::exists(CACHE_DIR + "/" + url_encode(bucketName, true) + "/" + TEST_OBJ + testName + "/" + instance, err), true); + std::string oid = instance + "#0#" + std::to_string(ofs); + EXPECT_EQ(fs::exists(CACHE_DIR + "/" + url_encode(bucketName, true) + "/" + TEST_OBJ + testName + "/" + oid, err), true); + + EXPECT_EQ(fs::exists(CACHE_DIR + "/" + url_encode(bucketName, true) + "/" + TEST_OBJ + testName + "/" + version, err), true); + oid = version + "#0#" + std::to_string(ofs); + EXPECT_EQ(fs::exists(CACHE_DIR + "/" + url_encode(bucketName, true) + "/" + TEST_OBJ + testName + "/" + oid, err), true); + + // For version enabled object + std::unique_ptr del_op_enabled = objEnabled->get_delete_op(); + EXPECT_EQ(del_op_enabled->delete_obj(env->dpp, optional_yield{yield}, rgw::sal::FLAG_LOG_OP), 0); + + // For version suspended object + std::unique_ptr del_op_suspended = objSuspended->get_delete_op(); + EXPECT_EQ(del_op_suspended->delete_obj(env->dpp, optional_yield{yield}, rgw::sal::FLAG_LOG_OP), 0); + + dynamic_cast(d4nFilter->get_policy_driver()->get_cache_policy())->save_y(null_yield); + }, rethrow); + + io.run_for(std::chrono::seconds(2)); // Allow cleaning cycle to complete + + net::spawn(io, [this, &testName, &bucketName, &version, &instance] (net::yield_context yield) { + dynamic_cast(d4nFilter->get_policy_driver()->get_cache_policy())->save_y(optional_yield{yield}); + + boost::system::error_code ec; + request req; + req.push("EXISTS", bucketName + "_" + TEST_OBJ + testName); + req.push("EXISTS", bucketName + "_" + TEST_OBJ + testName + "_0_0"); + req.push("EXISTS", bucketName + "__:" + instance + "_" + TEST_OBJ + testName + "_0_0"); + req.push("EXISTS", bucketName + "__:" + instance + "_" + TEST_OBJ + testName + "_0_" + std::to_string(ofs)); + req.push("EXISTS", bucketName + "__:null_" + TEST_OBJ + testName + "_0_0"); + req.push("EXISTS", bucketName + "_" + TEST_OBJ + testName + "_0_" + std::to_string(ofs)); + + response< int, int, int, int, + int, int, std::string > resp; + + conn->async_exec(req, resp, yield[ec]); + + ASSERT_EQ((bool)ec, false); + //EXPECT_EQ((int)std::get<0>(resp).value(), 0); // TODO: Object entry is not deleted + EXPECT_EQ((int)std::get<1>(resp).value(), 0); + EXPECT_EQ((int)std::get<2>(resp).value(), 0); + EXPECT_EQ((int)std::get<3>(resp).value(), 0); + EXPECT_EQ((int)std::get<4>(resp).value(), 0); + EXPECT_EQ((int)std::get<5>(resp).value(), 0); + + std::error_code err; + EXPECT_EQ(fs::exists(CACHE_DIR + "/" + url_encode(bucketName, true) + "/" + TEST_OBJ + testName + "/" + instance, err), false); + std::string oid = instance + "#0#" + std::to_string(ofs); + EXPECT_EQ(fs::exists(CACHE_DIR + "/" + url_encode(bucketName, true) + "/" + TEST_OBJ + testName + "/" + oid, err), false); + + EXPECT_EQ(fs::exists(CACHE_DIR + "/" + url_encode(bucketName, true) + "/" + TEST_OBJ + testName + "/" + version, err), false); + oid = version + "#0#" + std::to_string(ofs); + EXPECT_EQ(fs::exists(CACHE_DIR + "/" + url_encode(bucketName, true) + "/" + TEST_OBJ + testName + "/" + oid, err), false); + + conn->cancel(); + testBucket->remove(env->dpp, true, optional_yield{yield}); + DriverDestructor driver_destructor(static_cast(driver)); + }, rethrow); + + io.run(); +} + +// Additional workflow tests +TEST_F(D4NFilterFixture, SimpleDeleteBeforeCleaning) +{ + env->cct->_conf->d4n_writecache_enabled = true; + env->cct->_conf->rgw_d4n_cache_cleaning_interval = 0.5; + const std::string testName = "SimpleDeleteBeforeCleaning"; + const std::string bucketName = "/tmp/d4n_filter_tests/dbstore-default_ns.1"; + std::vector instances; + std::string deleteMarker, location; + + net::spawn(io, [this, &testName, &bucketName, &instances, &deleteMarker, &location] (net::yield_context yield) { + init_driver(yield); + create_bucket(testName, yield); + testBucket->get_info().bucket.bucket_id = bucketName; + + for (int i = 0; i <= 1; ++i) { + std::string instance; + put_version_enabled_object(testName, instance, yield); // Upload two versions + instances.push_back(instance); + } + + std::unique_ptr del_op_enabled = objEnabled->get_delete_op(); + objEnabled->set_instance(""); // Simple delete + EXPECT_EQ(del_op_enabled->delete_obj(env->dpp, optional_yield{yield}, rgw::sal::FLAG_LOG_OP), 0); + + boost::system::error_code ec; + request req; + req.push("HGET", bucketName + "_" + TEST_OBJ + testName + "_0_0", "deleteMarker"); + req.push("HGET", bucketName + "_" + TEST_OBJ + testName + "_0_0", "version"); + + response< int, std::string > resp; + + conn->async_exec(req, resp, yield[ec]); + + ASSERT_EQ((bool)ec, false); + EXPECT_EQ((int)std::get<0>(resp).value(), 1); + deleteMarker = std::get<1>(resp).value(); + + // Simple delete's head object in cache + std::error_code err; + location = CACHE_DIR + "/" + url_encode(bucketName, true) + "/" + TEST_OBJ + testName + "/" + deleteMarker; + EXPECT_EQ(fs::exists(location, err), true); + + std::string attr_val; + EXPECT_EQ(d4nFilter->get_cache_driver()->get_attr(env->dpp, location, RGW_CACHE_ATTR_DIRTY, attr_val, optional_yield({yield})), 0); + EXPECT_EQ(attr_val, "1"); + + /* TODO: The following code allows the DB::Object::Delete::delete_obj op to succeed in the cleaning method by removing the instance value. + However, this causes the wrong head_oid_in_cache key to be generated since it uses the empty version rather than the delete marker. As + a result, the head block for the delete marker does not get cleaned. If this code is not used, then the DBStore delete_obj method returns + -ENOENT, which also prevents the head block from being cleaned. + objEnabled->set_instance(""); + std::string key = url_encode(bucketName, true) + "#" + deleteMarker + "#" + TEST_OBJ + testName; + std::string etag = "test_etag"; + auto creationTime = ceph::real_clock::to_time_t(objEnabled->get_mtime()); + d4nFilter->get_policy_driver()->get_cache_policy()->erase_dirty_object(env->dpp, key, optional_yield{yield}); + d4nFilter->get_policy_driver()->get_cache_policy()->update_dirty_object(env->dpp, key, "", true, objEnabled->get_accounted_size(), creationTime, + std::get(objEnabled->get_bucket()->get_owner()), etag, + objEnabled->get_bucket()->get_name(), objEnabled->get_bucket()->get_bucket_id(), + objEnabled->get_key(), rgw::d4n::REFCOUNT_NOOP, optional_yield{yield});*/ + dynamic_cast(d4nFilter->get_policy_driver()->get_cache_policy())->save_y(null_yield); + }, rethrow); + + io.run_for(std::chrono::seconds(2)); // Allow cleaning cycle to complete + + net::spawn(io, [this, &testName, &bucketName, &deleteMarker, &location] (net::yield_context yield) { + dynamic_cast(d4nFilter->get_policy_driver()->get_cache_policy())->save_y(optional_yield{yield}); + + std::unique_ptr read_op(objEnabled->get_read_op()); + EXPECT_EQ(read_op->prepare(optional_yield{yield}, env->dpp), -2); // Simple read; should return -ENOENT + EXPECT_EQ(read_op->iterate(env->dpp, 0, ofs, nullptr, optional_yield{yield}), -2); + + /* TODO: + std::string attr_val; + EXPECT_EQ(d4nFilter->get_cache_driver()->get_attr(env->dpp, location, RGW_CACHE_ATTR_DIRTY, attr_val, optional_yield({yield})), 0); + EXPECT_EQ(attr_val, "0");*/ + + conn->cancel(); + testBucket->remove(env->dpp, true, optional_yield{yield}); + DriverDestructor driver_destructor(static_cast(driver)); + }, rethrow); + + io.run(); +} + +TEST_F(D4NFilterFixture, VersionedDeleteBeforeCleaning) +{ + env->cct->_conf->d4n_writecache_enabled = true; + env->cct->_conf->rgw_d4n_cache_cleaning_interval = 0.5; + const std::string testName = "VersionedDeleteBeforeCleaning"; + const std::string bucketName = "/tmp/d4n_filter_tests/dbstore-default_ns.1"; + std::vector instances; + + net::spawn(io, [this, &testName, &bucketName, &instances] (net::yield_context yield) { + init_driver(yield); + create_bucket(testName, yield); + testBucket->get_info().bucket.bucket_id = bucketName; + + for (int i = 0; i <= 1; ++i) { + std::string instance; + put_version_enabled_object(testName, instance, yield); // Upload two versions + instances.push_back(instance); + } + + { + boost::system::error_code ec; + request req; + req.push("ZREVRANGE", bucketName + "_" + TEST_OBJ + testName, "0", "-1"); + + response< std::vector > resp; + + conn->async_exec(req, resp, yield[ec]); + + ASSERT_EQ((bool)ec, false); + EXPECT_EQ(std::get<0>(resp).value().size(), 2); + EXPECT_EQ(std::get<0>(resp).value()[0], instances[1]); + EXPECT_EQ(std::get<0>(resp).value()[1], instances[0]); + } + + std::error_code err; + EXPECT_EQ(fs::exists(CACHE_DIR + "/" + url_encode(bucketName, true) + "/" + TEST_OBJ + testName + "/" + instances[1], err), true); + std::string oid = instances[1] + "#0#" + std::to_string(ofs); + EXPECT_EQ(fs::exists(CACHE_DIR + "/" + url_encode(bucketName, true) + "/" + TEST_OBJ + testName + "/" + oid, err), true); + + std::unique_ptr del_op_enabled = objEnabled->get_delete_op(); + objEnabled->set_instance(instances[1]); // Latest version + EXPECT_EQ(del_op_enabled->delete_obj(env->dpp, optional_yield{yield}, rgw::sal::FLAG_LOG_OP), 0); + + { + boost::system::error_code ec; + request req; + req.push("ZREVRANGE", bucketName + "_" + TEST_OBJ + testName, "0", "-1"); + + response< std::vector > resp; + + conn->async_exec(req, resp, yield[ec]); + + ASSERT_EQ((bool)ec, false); + EXPECT_EQ(std::get<0>(resp).value().size(), 1); + EXPECT_EQ(std::get<0>(resp).value()[0], instances[0]); + } + + objEnabled->set_instance(""); // Simple get + + std::unique_ptr read_op(objEnabled->get_read_op()); + EXPECT_EQ(read_op->prepare(optional_yield{yield}, env->dpp), 0); + EXPECT_EQ(read_op->iterate(env->dpp, 0, ofs, nullptr, optional_yield{yield}), 0); + EXPECT_EQ(objEnabled->get_instance(), instances[0]); // Next latest version + + objEnabled->set_instance(instances[0]); + dynamic_cast(d4nFilter->get_policy_driver()->get_cache_policy())->save_y(null_yield); + }, rethrow); + + io.run_for(std::chrono::seconds(2)); // Allow cleaning cycle to complete + + net::spawn(io, [this, &testName, &bucketName, &instances] (net::yield_context yield) { + dynamic_cast(d4nFilter->get_policy_driver()->get_cache_policy())->save_y(optional_yield{yield}); + + std::error_code err; + EXPECT_EQ(fs::exists(CACHE_DIR + "/" + url_encode(bucketName, true) + "/" + TEST_OBJ + testName + "/" + instances[1], err), false); + std::string oid = instances[1] + "#0#" + std::to_string(ofs); + EXPECT_EQ(fs::exists(CACHE_DIR + "/" + url_encode(bucketName, true) + "/" + TEST_OBJ + testName + "/" + oid, err), false); + + objEnabled->set_instance(instances[1]); + + // Make sure deleted instance isn't written to backend + { + auto next = dynamic_cast(objEnabled.get())->get_next(); + EXPECT_EQ(next->load_obj_state(env->dpp, optional_yield{yield}), -2); + } + + objEnabled->set_instance(instances[0]); + + // First instance should still be available in backend + { + auto next = dynamic_cast(objEnabled.get())->get_next(); + EXPECT_EQ(next->load_obj_state(env->dpp, optional_yield{yield}), 0); + EXPECT_EQ(next->exists(), 1); + + bufferlist bl; + Read_CB cb(&bl); + next->set_instance(""); + std::unique_ptr read_op(next->get_read_op()); + EXPECT_EQ(read_op->prepare(optional_yield{yield}, env->dpp), 0); + EXPECT_EQ(read_op->iterate(env->dpp, 0, ofs, &cb, optional_yield{yield}), 0); + EXPECT_EQ(next->get_instance(), instances[0]); // Next latest version + } + + conn->cancel(); + testBucket->remove(env->dpp, true, optional_yield{yield}); + DriverDestructor driver_destructor(static_cast(driver)); + }, rethrow); + + io.run(); +} + +TEST_F(D4NFilterFixture, SimpleDeleteAfterCleaning) +{ + env->cct->_conf->d4n_writecache_enabled = true; + env->cct->_conf->rgw_d4n_cache_cleaning_interval = 0.5; + const std::string testName = "SimpleDeleteAfterCleaning"; + const std::string bucketName = "/tmp/d4n_filter_tests/dbstore-default_ns.1"; + std::vector instances; + + net::spawn(io, [this, &testName, &bucketName, &instances] (net::yield_context yield) { + init_driver(yield); + create_bucket(testName, yield); + testBucket->get_info().bucket.bucket_id = bucketName; + + for (int i = 0; i <= 1; ++i) { + std::string instance; + put_version_enabled_object(testName, instance, yield); // Upload two versions + instances.push_back(instance); + } + + dynamic_cast(d4nFilter->get_policy_driver()->get_cache_policy())->save_y(null_yield); + }, rethrow); + + io.run_for(std::chrono::seconds(2)); // Allow cleaning cycle to complete + + net::spawn(io, [this, &testName, &bucketName] (net::yield_context yield) { + dynamic_cast(d4nFilter->get_policy_driver()->get_cache_policy())->save_y(optional_yield{yield}); + + std::unique_ptr del_op_enabled = objEnabled->get_delete_op(); + objEnabled->set_instance(""); // Simple delete + EXPECT_EQ(del_op_enabled->delete_obj(env->dpp, optional_yield{yield}, rgw::sal::FLAG_LOG_OP), 0); + + // Retrieve delete marker + std::string deleteMarker = objEnabled->get_instance(); + + // Ensure delete marker head block is written to cache + std::error_code err; + EXPECT_EQ(fs::exists(CACHE_DIR + "/" + url_encode(bucketName, true) + "/" + TEST_OBJ + testName + "/" + deleteMarker, err), true); + + // Make sure delete marker is written to backend + objEnabled->set_instance(deleteMarker); + auto next = dynamic_cast(objEnabled.get())->get_next(); + EXPECT_EQ(next->load_obj_state(env->dpp, optional_yield{yield}), 0); + EXPECT_EQ(next->exists(), 1); + + conn->cancel(); + testBucket->remove(env->dpp, true, optional_yield{yield}); + DriverDestructor driver_destructor(static_cast(driver)); + }, rethrow); + + io.run(); +} + +TEST_F(D4NFilterFixture, VersionedDeleteAfterCleaning) +{ + env->cct->_conf->d4n_writecache_enabled = true; + env->cct->_conf->rgw_d4n_cache_cleaning_interval = 0.5; + const std::string testName = "VersionedDeleteAfterCleaning"; + const std::string bucketName = "/tmp/d4n_filter_tests/dbstore-default_ns.1"; + std::vector instances; + + net::spawn(io, [this, &testName, &bucketName, &instances] (net::yield_context yield) { + init_driver(yield); + create_bucket(testName, yield); + testBucket->get_info().bucket.bucket_id = bucketName; + + for (int i = 0; i <= 1; ++i) { + std::string instance; + put_version_enabled_object(testName, instance, yield); // Upload two versions + instances.push_back(instance); + } + + dynamic_cast(d4nFilter->get_policy_driver()->get_cache_policy())->save_y(null_yield); + }, rethrow); + + io.run_for(std::chrono::seconds(2)); // Allow cleaning cycle to complete + + net::spawn(io, [this, &testName, &bucketName, &instances] (net::yield_context yield) { + dynamic_cast(d4nFilter->get_policy_driver()->get_cache_policy())->save_y(optional_yield{yield}); + + std::error_code err; + std::string location = CACHE_DIR + "/" + url_encode(bucketName, true) + "/" + TEST_OBJ + testName + "/" + instances[1]; + EXPECT_EQ(fs::exists(location, err), true); + std::string oid = "#0#" + std::to_string(ofs); + EXPECT_EQ(fs::exists(location + oid, err), true); + + { + std::string attr_val; + EXPECT_EQ(d4nFilter->get_cache_driver()->get_attr(env->dpp, location, RGW_CACHE_ATTR_DIRTY, attr_val, optional_yield({yield})), 0); + EXPECT_EQ(attr_val, "0"); + } + + { + std::string attr_val; + EXPECT_EQ(d4nFilter->get_cache_driver()->get_attr(env->dpp, location + oid, RGW_CACHE_ATTR_DIRTY, attr_val, optional_yield({yield})), 0); + EXPECT_EQ(attr_val, "0"); + } + + std::unique_ptr del_op_enabled = objEnabled->get_delete_op(); + objEnabled->set_instance(instances[1]); // Latest version + EXPECT_EQ(del_op_enabled->delete_obj(env->dpp, optional_yield{yield}, rgw::sal::FLAG_LOG_OP), 0); + + // Make sure deleted instance is deleted from backend + { + auto next = dynamic_cast(objEnabled.get())->get_next(); + EXPECT_EQ(next->load_obj_state(env->dpp, optional_yield{yield}), -2); + } + + objEnabled->set_instance(instances[0]); + + // First instance should still be available in backend + { + auto next = dynamic_cast(objEnabled.get())->get_next(); + EXPECT_EQ(next->load_obj_state(env->dpp, optional_yield{yield}), 0); + EXPECT_EQ(next->exists(), 1); + + bufferlist bl; + Read_CB cb(&bl); + next->set_instance(""); + std::unique_ptr read_op(next->get_read_op()); + EXPECT_EQ(read_op->prepare(optional_yield{yield}, env->dpp), 0); + EXPECT_EQ(read_op->iterate(env->dpp, 0, ofs, &cb, optional_yield{yield}), 0); + EXPECT_EQ(next->get_instance(), instances[0]); // Next latest version + } + + dynamic_cast(d4nFilter->get_policy_driver()->get_cache_policy())->save_y(null_yield); + }, rethrow); + + io.run_for(std::chrono::seconds(2)); + + net::spawn(io, [this, &testName, &bucketName, &instances] (net::yield_context yield) { + dynamic_cast(d4nFilter->get_policy_driver()->get_cache_policy())->save_y(optional_yield{yield}); + + /* TODO: Cleaning method removes head object, so delete_obj calls after a cleaning cycle will not find the head object and will immediately call the backend's delete_obj, + resulting in the cache block not getting deleted properly + std::error_code err; + EXPECT_EQ(fs::exists(CACHE_DIR + "/" + url_encode(bucketName, true) + "/" + TEST_OBJ + testName + "/" + instances[1], err), false); + std::string oid = instances[1] + "#0#" + std::to_string(ofs); + EXPECT_EQ(fs::exists(CACHE_DIR + "/" + url_encode(bucketName, true) + "/" + TEST_OBJ + testName + "/" + oid, err), false); + */ + + conn->cancel(); + testBucket->remove(env->dpp, true, optional_yield{yield}); + DriverDestructor driver_destructor(static_cast(driver)); + }, rethrow); + + io.run(); +} + +TEST_F(D4NFilterFixture, ListObjectVersions) +{ + env->cct->_conf->d4n_writecache_enabled = true; + env->cct->_conf->rgw_d4n_cache_cleaning_interval = 0.5; + const std::string testName = "ListObjectVersions"; + const std::string bucketName = "/tmp/d4n_filter_tests/dbstore-default_ns.1"; + std::string instance; + + net::spawn(io, [this, &testName, &bucketName, &instance] (net::yield_context yield) { + init_driver(yield); + create_bucket(testName, yield); + testBucket->get_info().bucket.bucket_id = bucketName; + put_object(testName, yield); + put_version_enabled_object(testName, instance, yield); + put_version_suspended_object(testName, yield); + + dynamic_cast(d4nFilter->get_policy_driver()->get_cache_policy())->save_y(null_yield); + }, rethrow); + + io.run_for(std::chrono::seconds(2)); // Allow cleaning cycle to complete + + net::spawn(io, [this, &instance] (net::yield_context yield) { + dynamic_cast(d4nFilter->get_policy_driver()->get_cache_policy())->save_y(optional_yield{yield}); + + // Ensure versions are correctly ordered in backend + size_t max = env->dpp->get_cct()->_conf->rgw_list_buckets_max_chunk; + rgw::sal::Bucket::ListParams params; + params.list_versions = true; + rgw::sal::Bucket::ListResults results; + + EXPECT_EQ(testBucket->list(env->dpp, params, max, results, optional_yield{yield}), 0); + EXPECT_EQ(results.objs[0].key.instance, "null"); + EXPECT_EQ(results.objs[1].key.instance, instance); + + conn->cancel(); + testBucket->remove(env->dpp, true, optional_yield{yield}); + DriverDestructor driver_destructor(static_cast(driver)); + }, rethrow); + + io.run(); +} + +int main(int argc, char *argv[]) { + ::testing::InitGoogleTest(&argc, argv); + + env = new Environment(); + ::testing::AddGlobalTestEnvironment(env); + + return RUN_ALL_TESTS(); +} -- 2.39.5