From 50df1e14438b3f7165fd1c6db3c744b35b068e94 Mon Sep 17 00:00:00 2001 From: Yuan Zhou Date: Thu, 17 Jan 2019 13:47:31 +0800 Subject: [PATCH] test: enable asio tests for immutable obj cache controller cleanups for immutable obj cache controller Signed-off-by: Yuan Zhou --- .../immutable_object_cache/CMakeLists.txt | 4 +- .../test_DomainSocket.cc | 89 ++++++++++--------- .../test_multi_session.cc | 32 +++---- .../immutable_object_cache/CacheClient.cc | 31 ++++--- .../immutable_object_cache/CacheClient.h | 11 ++- .../immutable_object_cache/CacheServer.cc | 1 - .../immutable_object_cache/SocketCommon.h | 1 - src/tools/immutable_object_cache/Types.h | 1 - 8 files changed, 87 insertions(+), 83 deletions(-) diff --git a/src/test/immutable_object_cache/CMakeLists.txt b/src/test/immutable_object_cache/CMakeLists.txt index fa2601da49be5..aade49afba88b 100644 --- a/src/test/immutable_object_cache/CMakeLists.txt +++ b/src/test/immutable_object_cache/CMakeLists.txt @@ -3,8 +3,8 @@ add_executable(unittest_ceph_immutable_obj_cache test_main.cc test_SimplePolicy.cc test_sync_file.cc - #test_DomainSocket.cc // TODO - #test_multi_session.cc // TODO + test_DomainSocket.cc + test_multi_session.cc test_object_store.cc test_message.cc ) diff --git a/src/test/immutable_object_cache/test_DomainSocket.cc b/src/test/immutable_object_cache/test_DomainSocket.cc index bbac47044fcbe..944e535e574e6 100644 --- a/src/test/immutable_object_cache/test_DomainSocket.cc +++ b/src/test/immutable_object_cache/test_DomainSocket.cc @@ -29,7 +29,8 @@ public: unordered_set m_hit_entry_set; TestCommunication() - : m_cache_server(nullptr), m_cache_client(nullptr), m_local_path("/tmp/ceph_test_domain_socket"), + : m_cache_server(nullptr), m_cache_client(nullptr), + m_local_path("/tmp/ceph_test_domain_socket"), m_send_request_index(0), m_recv_ack_index(0) {} @@ -40,8 +41,9 @@ public: void SetUp() override { std::remove(m_local_path.c_str()); - m_cache_server = new CacheServer(g_ceph_context, m_local_path, [this](uint64_t xx, std::string yy ){ - handle_request(xx, yy); + m_cache_server = new CacheServer(g_ceph_context, m_local_path, + [this](uint64_t sid, ObjectCacheRequest* req){ + handle_request(sid, req); }); ASSERT_TRUE(m_cache_server != nullptr); srv_thd = new std::thread([this]() {m_cache_server->run();}); @@ -50,7 +52,7 @@ public: ASSERT_TRUE(m_cache_client != nullptr); m_cache_client->run(); - while(true) { + while (true) { if (0 == m_cache_client->connect()) { break; } @@ -75,59 +77,60 @@ public: delete srv_thd; } - void handle_request(uint64_t session_id, std::string msg){ - rbdsc_req_type_t *io_ctx = (rbdsc_req_type_t*)(msg.c_str()); + void handle_request(uint64_t session_id, ObjectCacheRequest* req) { - switch (io_ctx->type) { + switch (req->m_head.type) { case RBDSC_REGISTER: { - io_ctx->type = RBDSC_REGISTER_REPLY; - m_cache_server->send(session_id, std::string((char*)io_ctx, msg.size())); + req->m_head.type = RBDSC_REGISTER_REPLY; + m_cache_server->send(session_id, req); break; } case RBDSC_READ: { - if (m_hit_entry_set.find(io_ctx->oid) == m_hit_entry_set.end()) { - io_ctx->type = RBDSC_READ_RADOS; + if (m_hit_entry_set.find(req->m_data.m_oid) == m_hit_entry_set.end()) { + req->m_head.type = RBDSC_READ_RADOS; } else { - io_ctx->type = RBDSC_READ_REPLY; + req->m_head.type = RBDSC_READ_REPLY; } - m_cache_server->send(session_id, std::string((char*)io_ctx, msg.size())); + m_cache_server->send(session_id, req); break; } } } - // times: message number - // queue_deqth : imitate message queue depth - // thinking : imitate handing message time - void startup_pingpong_testing(uint64_t times, uint64_t queue_depth, int thinking) { - m_send_request_index.store(0); - m_recv_ack_index.store(0); - for (uint64_t index = 0; index < times; index++) { - auto ctx = new FunctionContext([this, thinking, times](bool req){ - if (thinking != 0) { - usleep(thinking); // handling message - } - m_recv_ack_index++; - if (m_recv_ack_index == times) { - m_wait_event.signal(); - } - }); - - // simple queue depth - while (m_send_request_index - m_recv_ack_index > queue_depth) { - usleep(1); - } - - m_cache_client->lookup_object("test_pool", "123456", ctx); - m_send_request_index++; - } - m_wait_event.wait(); - } + // times: message number + // queue_depth : imitate message queue depth + // thinking : imitate handing message time + void startup_pingpong_testing(uint64_t times, uint64_t queue_depth, int thinking) { + m_send_request_index.store(0); + m_recv_ack_index.store(0); + for (uint64_t index = 0; index < times; index++) { + auto ctx = new LambdaGenContext, + ObjectCacheRequest*>([this, thinking, times](ObjectCacheRequest* ack){ + if (thinking != 0) { + usleep(thinking); // handling message + } + m_recv_ack_index++; + if (m_recv_ack_index == times) { + m_wait_event.signal(); + } + }); + + // simple queue depth + while (m_send_request_index - m_recv_ack_index > queue_depth) { + usleep(1); + } + + m_cache_client->lookup_object("test_pool", "123456", ctx); + m_send_request_index++; + } + m_wait_event.wait(); + } bool startup_lookupobject_testing(std::string pool_name, std::string object_id) { bool hit; - auto ctx = new FunctionContext([this, &hit](bool req){ - hit = req; + auto ctx = new LambdaGenContext, + ObjectCacheRequest*>([this, &hit](ObjectCacheRequest* ack){ + hit = ack->m_head.type == RBDSC_READ_REPLY; m_wait_event.signal(); }); m_cache_client->lookup_object(pool_name, object_id, ctx); @@ -148,8 +151,6 @@ TEST_F(TestCommunication, test_pingpong) { ASSERT_TRUE(m_send_request_index == m_recv_ack_index); startup_pingpong_testing(200, 128, 0); ASSERT_TRUE(m_send_request_index == m_recv_ack_index); - startup_pingpong_testing(10000, 512, 0); - ASSERT_TRUE(m_send_request_index == m_recv_ack_index); } TEST_F(TestCommunication, test_lookup_object) { diff --git a/src/test/immutable_object_cache/test_multi_session.cc b/src/test/immutable_object_cache/test_multi_session.cc index af786dd93ecbe..d8cc2ce2391bd 100644 --- a/src/test/immutable_object_cache/test_multi_session.cc +++ b/src/test/immutable_object_cache/test_multi_session.cc @@ -38,8 +38,9 @@ public: void SetUp() override { std::remove(m_local_path.c_str()); - m_cache_server = new CacheServer(g_ceph_context, m_local_path, [this](uint64_t session_id, std::string request ){ - server_handle_request(session_id, request); + m_cache_server = new CacheServer(g_ceph_context, m_local_path, + [this](uint64_t session_id, ObjectCacheRequest* req){ + server_handle_request(session_id, req); }); ASSERT_TRUE(m_cache_server != nullptr); @@ -56,14 +57,14 @@ public: } void TearDown() override { - for(uint64_t i = 0; i < m_session_num; i++) { - if(m_cache_client_vec[i] != nullptr) { + for (uint64_t i = 0; i < m_session_num; i++) { + if (m_cache_client_vec[i] != nullptr) { m_cache_client_vec[i]->close(); delete m_cache_client_vec[i]; } } m_cache_server->stop(); - if(m_cache_server_thread->joinable()) { + if (m_cache_server_thread->joinable()) { m_cache_server_thread->join(); } delete m_cache_server; @@ -84,19 +85,17 @@ public: return cache_client; } - void server_handle_request(uint64_t session_id, std::string request) { - rbdsc_req_type_t *io_ctx = (rbdsc_req_type_t*)(request.c_str()); + void server_handle_request(uint64_t session_id, ObjectCacheRequest* req) { - switch (io_ctx->type) { + switch (req->m_head.type) { case RBDSC_REGISTER: { - - io_ctx->type = RBDSC_REGISTER_REPLY; - m_cache_server->send(session_id, std::string((char*)io_ctx, request.size())); + req->m_head.type = RBDSC_REGISTER_REPLY; + m_cache_server->send(session_id, req); break; } case RBDSC_READ: { - io_ctx->type = RBDSC_READ_REPLY; - m_cache_server->send(session_id, std::string((char*)io_ctx, request.size())); + req->m_head.type = RBDSC_READ_REPLY; + m_cache_server->send(session_id, req); break; } } @@ -118,7 +117,8 @@ public: void test_lookup_object(std::string pool, uint64_t index, uint64_t request_num, bool is_last) { for (uint64_t i = 0; i < request_num; i++) { - auto ctx = new FunctionContext([this](bool ret) { + auto ctx = new LambdaGenContext, + ObjectCacheRequest*>([this](ObjectCacheRequest* ack) { m_recv_ack_index++; }); m_send_request_index++; @@ -126,7 +126,7 @@ public: m_cache_client_vec[index]->lookup_object(pool, "1234", ctx); } - if(is_last) { + if (is_last) { while(m_send_request_index != m_recv_ack_index) { usleep(1); } @@ -141,7 +141,7 @@ TEST_F(TestMultiSession, test_multi_session) { uint64_t test_times = 1000; uint64_t test_session_num = 100; - for(uint64_t i = 0; i <= test_times; i++) { + for (uint64_t i = 0; i <= test_times; i++) { uint64_t random_index = random() % test_session_num; if (m_cache_client_vec[random_index] == nullptr) { test_register_client(random_index); diff --git a/src/tools/immutable_object_cache/CacheClient.cc b/src/tools/immutable_object_cache/CacheClient.cc index 648102d4a0a91..8017afb3289eb 100644 --- a/src/tools/immutable_object_cache/CacheClient.cc +++ b/src/tools/immutable_object_cache/CacheClient.cc @@ -13,17 +13,13 @@ namespace ceph { namespace immutable_obj_cache { CacheClient::CacheClient(const std::string& file, CephContext* ceph_ctx) - : m_io_service_work(m_io_service), - m_dm_socket(m_io_service), - m_ep(stream_protocol::endpoint(file)), - m_io_thread(nullptr), - m_session_work(false), - m_writing(false), + : cct(ceph_ctx), m_io_service_work(m_io_service), + m_dm_socket(m_io_service), m_ep(stream_protocol::endpoint(file)), + m_io_thread(nullptr), m_session_work(false), m_writing(false), + m_reading(false), m_sequence_id(0), m_lock("ceph::cache::cacheclient::m_lock"), m_map_lock("ceph::cache::cacheclient::m_map_lock"), - m_sequence_id(0), - m_header_buffer(new char[sizeof(ObjectCacheMsgHeader)]), - cct(ceph_ctx) + m_header_buffer(new char[sizeof(ObjectCacheMsgHeader)]) { // TODO : release these resources. m_use_dedicated_worker = true; @@ -32,7 +28,7 @@ namespace immutable_obj_cache { if(m_use_dedicated_worker) { m_worker = new boost::asio::io_service(); m_worker_io_service_work = new boost::asio::io_service::work(*m_worker); - for(int i = 0; i < m_worker_thread_num; i++) { + for(uint64_t i = 0; i < m_worker_thread_num; i++) { std::thread* thd = new std::thread([this](){m_worker->run();}); m_worker_threads.push_back(thd); } @@ -104,7 +100,16 @@ namespace immutable_obj_cache { return 0; } - void CacheClient::lookup_object(ObjectCacheRequest* req) { + void CacheClient::lookup_object(std::string pool_name, std::string oid, + GenContext* on_finish) { + + ObjectCacheRequest* req = new ObjectCacheRequest(); + req->m_head.version = 0; + req->m_head.reserved = 0; + req->m_head.type = RBDSC_READ; + req->m_data.m_pool_name = pool_name; + req->m_data.m_oid = oid; + req->m_process_msg = on_finish; req->m_head.seq = m_sequence_id++; req->encode(); @@ -151,7 +156,7 @@ namespace immutable_obj_cache { boost::asio::buffer(bl.c_str(), bl.length()), boost::asio::transfer_exactly(bl.length()), [this, bl](const boost::system::error_code& err, size_t cb) { - if(err || cb != bl.length()) { + if (err || cb != bl.length()) { fault(ASIO_ERROR_WRITE, err); return; } @@ -178,6 +183,7 @@ namespace immutable_obj_cache { } } + //TODO(): split into smaller functions void CacheClient::receive_message() { ceph_assert(m_reading.load()); @@ -252,6 +258,7 @@ namespace immutable_obj_cache { { Mutex::Locker locker(m_map_lock); ceph_assert(m_seq_to_req.find(seq_id) != m_seq_to_req.end()); + delete m_seq_to_req[seq_id]; m_seq_to_req.erase(seq_id); if(m_seq_to_req.size() == 0) { m_reading.store(false); diff --git a/src/tools/immutable_object_cache/CacheClient.h b/src/tools/immutable_object_cache/CacheClient.h index 7ecf7c851b498..8cb7f320e32b7 100644 --- a/src/tools/immutable_object_cache/CacheClient.h +++ b/src/tools/immutable_object_cache/CacheClient.h @@ -32,7 +32,7 @@ public: int stop(); int connect(); - void lookup_object(ObjectCacheRequest* req); + void lookup_object(std::string pool_name, std::string oid, GenContext* on_finish); void send_message(); void try_send(); void fault(const int err_type, const boost::system::error_code& err); @@ -41,29 +41,28 @@ public: int register_client(Context* on_finish); private: + CephContext* cct; boost::asio::io_service m_io_service; boost::asio::io_service::work m_io_service_work; stream_protocol::socket m_dm_socket; - ClientProcessMsg m_client_process_msg; stream_protocol::endpoint m_ep; std::shared_ptr m_io_thread; std::atomic m_session_work; - CephContext* cct; bool m_use_dedicated_worker; - int m_worker_thread_num; + uint64_t m_worker_thread_num; boost::asio::io_service* m_worker; std::vector m_worker_threads; boost::asio::io_service::work* m_worker_io_service_work; - char* m_header_buffer; std::atomic m_writing; std::atomic m_reading; std::atomic m_sequence_id; Mutex m_lock; - bufferlist m_outcoming_bl; Mutex m_map_lock; std::map m_seq_to_req; + bufferlist m_outcoming_bl; + char* m_header_buffer; }; } // namespace immutable_obj_cache diff --git a/src/tools/immutable_object_cache/CacheServer.cc b/src/tools/immutable_object_cache/CacheServer.cc index a9f4ba44b5186..e2a1e4549aaa6 100644 --- a/src/tools/immutable_object_cache/CacheServer.cc +++ b/src/tools/immutable_object_cache/CacheServer.cc @@ -86,7 +86,6 @@ void CacheServer::accept() { void CacheServer::handle_accept(CacheSessionPtr new_session, const boost::system::error_code& error) { ldout(cct, 20) << dendl; - std::cout << "new session arrived....." << std::endl; if (error) { // operation_absort lderr(cct) << "async accept fails : " << error.message() << dendl; diff --git a/src/tools/immutable_object_cache/SocketCommon.h b/src/tools/immutable_object_cache/SocketCommon.h index 77f4d184739ca..7c914213d16ff 100644 --- a/src/tools/immutable_object_cache/SocketCommon.h +++ b/src/tools/immutable_object_cache/SocketCommon.h @@ -24,7 +24,6 @@ static const int ASIO_ERROR_MSG_INCOMPLETE = 0X05; class ObjectCacheRequest; typedef std::function ProcessMsg; -typedef std::function ClientProcessMsg; } // namespace immutable_obj_cache } // namespace ceph diff --git a/src/tools/immutable_object_cache/Types.h b/src/tools/immutable_object_cache/Types.h index f4b3c18b2cfd5..efd818177fbca 100644 --- a/src/tools/immutable_object_cache/Types.h +++ b/src/tools/immutable_object_cache/Types.h @@ -75,7 +75,6 @@ public: ObjectCacheMsgData m_data; bufferlist m_head_buffer; bufferlist m_data_buffer; - Context* m_on_finish; GenContext* m_process_msg; ObjectCacheRequest() {} -- 2.39.5