git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
test: enable asio tests for immutable obj cache controller
author: Yuan Zhou <yuan.zhou@intel.com>
Thu, 17 Jan 2019 05:47:31 +0000 (13:47 +0800)
committer: Yuan Zhou <yuan.zhou@intel.com>
Thu, 21 Mar 2019 16:16:27 +0000 (00:16 +0800)
cleanups for immutable obj cache controller

Signed-off-by: Yuan Zhou <yuan.zhou@intel.com>
src/test/immutable_object_cache/CMakeLists.txt
src/test/immutable_object_cache/test_DomainSocket.cc
src/test/immutable_object_cache/test_multi_session.cc
src/tools/immutable_object_cache/CacheClient.cc
src/tools/immutable_object_cache/CacheClient.h
src/tools/immutable_object_cache/CacheServer.cc
src/tools/immutable_object_cache/SocketCommon.h
src/tools/immutable_object_cache/Types.h

index fa2601da49be503906dc21c3d11c142b38eaf2d7..aade49afba88bfb5862e1518e27ab8fe8917d472 100644 (file)
@@ -3,8 +3,8 @@ add_executable(unittest_ceph_immutable_obj_cache
   test_main.cc
   test_SimplePolicy.cc
   test_sync_file.cc
-  #test_DomainSocket.cc // TODO
-  #test_multi_session.cc // TODO
+  test_DomainSocket.cc
+  test_multi_session.cc
   test_object_store.cc
   test_message.cc
   )
index bbac47044fcbec24090f110854944ede6071ee6c..944e535e574e6a94dd7acaea98593920e03ac10e 100644 (file)
@@ -29,7 +29,8 @@ public:
   unordered_set<std::string> m_hit_entry_set;
 
   TestCommunication()
-    : m_cache_server(nullptr), m_cache_client(nullptr), m_local_path("/tmp/ceph_test_domain_socket"),
+    : m_cache_server(nullptr), m_cache_client(nullptr),
+      m_local_path("/tmp/ceph_test_domain_socket"),
       m_send_request_index(0), m_recv_ack_index(0)
     {}
 
@@ -40,8 +41,9 @@ public:
 
   void SetUp() override {
     std::remove(m_local_path.c_str());
-    m_cache_server = new CacheServer(g_ceph_context, m_local_path, [this](uint64_t xx, std::string yy ){
-        handle_request(xx, yy);
+    m_cache_server = new CacheServer(g_ceph_context, m_local_path,
+      [this](uint64_t sid, ObjectCacheRequest* req){
+        handle_request(sid, req);
     });
     ASSERT_TRUE(m_cache_server != nullptr);
     srv_thd = new std::thread([this]() {m_cache_server->run();});
@@ -50,7 +52,7 @@ public:
     ASSERT_TRUE(m_cache_client != nullptr);
     m_cache_client->run();
 
-    while(true) {
+    while (true) {
       if (0 == m_cache_client->connect()) {
         break;
       }
@@ -75,59 +77,60 @@ public:
     delete srv_thd;
   }
 
-  void handle_request(uint64_t session_id, std::string msg){
-    rbdsc_req_type_t *io_ctx = (rbdsc_req_type_t*)(msg.c_str());
+  void handle_request(uint64_t session_id, ObjectCacheRequest* req) {
 
-    switch (io_ctx->type) {
+    switch (req->m_head.type) {
       case RBDSC_REGISTER: {
-        io_ctx->type = RBDSC_REGISTER_REPLY;
-        m_cache_server->send(session_id, std::string((char*)io_ctx, msg.size()));
+        req->m_head.type = RBDSC_REGISTER_REPLY;
+        m_cache_server->send(session_id, req);
         break;
       }
       case RBDSC_READ: {
-        if (m_hit_entry_set.find(io_ctx->oid) == m_hit_entry_set.end()) {
-          io_ctx->type = RBDSC_READ_RADOS;
+        if (m_hit_entry_set.find(req->m_data.m_oid) == m_hit_entry_set.end()) {
+          req->m_head.type = RBDSC_READ_RADOS;
         } else {
-          io_ctx->type = RBDSC_READ_REPLY;
+          req->m_head.type = RBDSC_READ_REPLY;
         }
-        m_cache_server->send(session_id, std::string((char*)io_ctx, msg.size()));
+        m_cache_server->send(session_id, req);
         break;
       }
     }
   }
 
-   // times: message number
-   // queue_deqth : imitate message queue depth
-   // thinking : imitate handing message time
-   void startup_pingpong_testing(uint64_t times, uint64_t queue_depth, int thinking) {
-     m_send_request_index.store(0);
-     m_recv_ack_index.store(0);
-     for (uint64_t index = 0; index < times; index++) {
-       auto ctx = new FunctionContext([this, thinking, times](bool req){
-          if (thinking != 0) {
-            usleep(thinking); // handling message
-          }
-          m_recv_ack_index++;
-          if (m_recv_ack_index == times) {
-            m_wait_event.signal();
-          }
-       });
-
-       // simple queue depth
-       while (m_send_request_index - m_recv_ack_index > queue_depth) {
-         usleep(1);
-       }
-
-       m_cache_client->lookup_object("test_pool", "123456", ctx);
-       m_send_request_index++;
-     }
-     m_wait_event.wait();
-   }
+  // times: message number
+  // queue_depth : imitate message queue depth
+  // thinking : imitate handing message time
+  void startup_pingpong_testing(uint64_t times, uint64_t queue_depth, int thinking) {
+    m_send_request_index.store(0);
+    m_recv_ack_index.store(0);
+    for (uint64_t index = 0; index < times; index++) {
+      auto ctx = new LambdaGenContext<std::function<void(ObjectCacheRequest*)>,
+       ObjectCacheRequest*>([this, thinking, times](ObjectCacheRequest* ack){
+         if (thinking != 0) {
+           usleep(thinking); // handling message
+         }
+         m_recv_ack_index++;
+         if (m_recv_ack_index == times) {
+           m_wait_event.signal();
+         }
+      });
+
+      // simple queue depth
+      while (m_send_request_index - m_recv_ack_index > queue_depth) {
+        usleep(1);
+      }
+
+      m_cache_client->lookup_object("test_pool", "123456", ctx);
+      m_send_request_index++;
+    }
+    m_wait_event.wait();
+  }
 
   bool startup_lookupobject_testing(std::string pool_name, std::string object_id) {
     bool hit;
-    auto ctx = new FunctionContext([this, &hit](bool req){
-       hit = req;
+    auto ctx = new LambdaGenContext<std::function<void(ObjectCacheRequest*)>,
+        ObjectCacheRequest*>([this, &hit](ObjectCacheRequest* ack){
+       hit = ack->m_head.type == RBDSC_READ_REPLY;
        m_wait_event.signal();
     });
     m_cache_client->lookup_object(pool_name, object_id, ctx);
@@ -148,8 +151,6 @@ TEST_F(TestCommunication, test_pingpong) {
   ASSERT_TRUE(m_send_request_index == m_recv_ack_index);
   startup_pingpong_testing(200, 128, 0);
   ASSERT_TRUE(m_send_request_index == m_recv_ack_index);
-  startup_pingpong_testing(10000, 512, 0);
-  ASSERT_TRUE(m_send_request_index == m_recv_ack_index);
 }
 
 TEST_F(TestCommunication, test_lookup_object) {
index af786dd93ecbea5f72aae24c1df998bdcd2790f3..d8cc2ce2391bdeb46a4fe141cc878b86cb0c3662 100644 (file)
@@ -38,8 +38,9 @@ public:
 
   void SetUp() override {
     std::remove(m_local_path.c_str());
-    m_cache_server = new CacheServer(g_ceph_context, m_local_path, [this](uint64_t session_id, std::string request ){
-        server_handle_request(session_id, request);
+    m_cache_server = new CacheServer(g_ceph_context, m_local_path,
+      [this](uint64_t session_id, ObjectCacheRequest* req){
+        server_handle_request(session_id, req);
     });
     ASSERT_TRUE(m_cache_server != nullptr);
 
@@ -56,14 +57,14 @@ public:
   }
 
   void TearDown() override {
-    for(uint64_t i = 0; i < m_session_num; i++) {
-      if(m_cache_client_vec[i] != nullptr) {
+    for (uint64_t i = 0; i < m_session_num; i++) {
+      if (m_cache_client_vec[i] != nullptr) {
         m_cache_client_vec[i]->close();
         delete m_cache_client_vec[i];
       }
     }
     m_cache_server->stop();
-    if(m_cache_server_thread->joinable()) {
+    if (m_cache_server_thread->joinable()) {
       m_cache_server_thread->join();
     }
     delete m_cache_server;
@@ -84,19 +85,17 @@ public:
     return cache_client;
   }
 
-  void server_handle_request(uint64_t session_id, std::string request) {
-    rbdsc_req_type_t *io_ctx = (rbdsc_req_type_t*)(request.c_str());
+  void server_handle_request(uint64_t session_id, ObjectCacheRequest* req) {
 
-    switch (io_ctx->type) {
+    switch (req->m_head.type) {
       case RBDSC_REGISTER: {
-
-        io_ctx->type = RBDSC_REGISTER_REPLY;
-        m_cache_server->send(session_id, std::string((char*)io_ctx, request.size()));
+        req->m_head.type = RBDSC_REGISTER_REPLY;
+        m_cache_server->send(session_id, req);
         break;
       }
       case RBDSC_READ: {
-        io_ctx->type = RBDSC_READ_REPLY;
-        m_cache_server->send(session_id, std::string((char*)io_ctx, request.size()));
+        req->m_head.type = RBDSC_READ_REPLY;
+        m_cache_server->send(session_id, req);
         break;
       }
     }
@@ -118,7 +117,8 @@ public:
   void test_lookup_object(std::string pool, uint64_t index, uint64_t request_num, bool is_last) {
 
     for (uint64_t i = 0; i < request_num; i++) {
-      auto ctx = new FunctionContext([this](bool ret) {
+      auto ctx = new LambdaGenContext<std::function<void(ObjectCacheRequest*)>,
+        ObjectCacheRequest*>([this](ObjectCacheRequest* ack) {
         m_recv_ack_index++;
       });
       m_send_request_index++;
@@ -126,7 +126,7 @@ public:
       m_cache_client_vec[index]->lookup_object(pool, "1234", ctx);
     }
 
-    if(is_last) {
+    if (is_last) {
       while(m_send_request_index != m_recv_ack_index) {
         usleep(1);
       }
@@ -141,7 +141,7 @@ TEST_F(TestMultiSession, test_multi_session) {
   uint64_t test_times = 1000;
   uint64_t test_session_num = 100;
 
-  for(uint64_t i = 0; i <= test_times; i++) {
+  for (uint64_t i = 0; i <= test_times; i++) {
     uint64_t random_index = random() % test_session_num;
     if (m_cache_client_vec[random_index] == nullptr) {
       test_register_client(random_index);
index 648102d4a0a91e2e8994c61b7d96e5ab116e9a97..8017afb3289eb36e9ebcbfd6260ac8e5c66d84fd 100644 (file)
@@ -13,17 +13,13 @@ namespace ceph {
 namespace immutable_obj_cache {
 
   CacheClient::CacheClient(const std::string& file, CephContext* ceph_ctx)
-    : m_io_service_work(m_io_service),
-      m_dm_socket(m_io_service),
-      m_ep(stream_protocol::endpoint(file)),
-      m_io_thread(nullptr),
-      m_session_work(false),
-      m_writing(false),
+    : cct(ceph_ctx), m_io_service_work(m_io_service),
+      m_dm_socket(m_io_service), m_ep(stream_protocol::endpoint(file)),
+      m_io_thread(nullptr), m_session_work(false), m_writing(false),
+      m_reading(false), m_sequence_id(0),
       m_lock("ceph::cache::cacheclient::m_lock"),
       m_map_lock("ceph::cache::cacheclient::m_map_lock"),
-      m_sequence_id(0),
-      m_header_buffer(new char[sizeof(ObjectCacheMsgHeader)]),
-      cct(ceph_ctx)
+      m_header_buffer(new char[sizeof(ObjectCacheMsgHeader)])
   {
     // TODO : release these resources.
     m_use_dedicated_worker = true;
@@ -32,7 +28,7 @@ namespace immutable_obj_cache {
     if(m_use_dedicated_worker) {
       m_worker = new boost::asio::io_service();
       m_worker_io_service_work = new boost::asio::io_service::work(*m_worker);
-      for(int i = 0; i < m_worker_thread_num; i++) {
+      for(uint64_t i = 0; i < m_worker_thread_num; i++) {
         std::thread* thd = new std::thread([this](){m_worker->run();});
         m_worker_threads.push_back(thd);
       }
@@ -104,7 +100,16 @@ namespace immutable_obj_cache {
     return 0;
   }
 
-  void CacheClient::lookup_object(ObjectCacheRequest* req) {
+  void CacheClient::lookup_object(std::string pool_name, std::string oid,
+                                  GenContext<ObjectCacheRequest*>* on_finish) {
+
+    ObjectCacheRequest* req = new ObjectCacheRequest();
+    req->m_head.version = 0;
+    req->m_head.reserved = 0;
+    req->m_head.type = RBDSC_READ;
+    req->m_data.m_pool_name = pool_name;
+    req->m_data.m_oid = oid;
+    req->m_process_msg = on_finish;
 
     req->m_head.seq = m_sequence_id++;
     req->encode();
@@ -151,7 +156,7 @@ namespace immutable_obj_cache {
         boost::asio::buffer(bl.c_str(), bl.length()),
         boost::asio::transfer_exactly(bl.length()),
         [this, bl](const boost::system::error_code& err, size_t cb) {
-        if(err || cb != bl.length()) {
+        if (err || cb != bl.length()) {
            fault(ASIO_ERROR_WRITE, err);
            return;
         }
@@ -178,6 +183,7 @@ namespace immutable_obj_cache {
     }
   }
 
+  //TODO(): split into smaller functions
   void CacheClient::receive_message() {
     ceph_assert(m_reading.load());
 
@@ -252,6 +258,7 @@ namespace immutable_obj_cache {
             {
               Mutex::Locker locker(m_map_lock);
               ceph_assert(m_seq_to_req.find(seq_id) != m_seq_to_req.end());
+              delete m_seq_to_req[seq_id];
               m_seq_to_req.erase(seq_id);
               if(m_seq_to_req.size() == 0) {
                 m_reading.store(false);
index 7ecf7c851b4987cc98804be32379ef04bd6b3728..8cb7f320e32b71048a314b2a8276e582e4d38272 100644 (file)
@@ -32,7 +32,7 @@ public:
   int stop();
   int connect();
 
-  void lookup_object(ObjectCacheRequest* req);
+  void lookup_object(std::string pool_name, std::string oid, GenContext<ObjectCacheRequest*>* on_finish);
   void send_message();
   void try_send();
   void fault(const int err_type, const boost::system::error_code& err);
@@ -41,29 +41,28 @@ public:
   int register_client(Context* on_finish);
 
 private:
+  CephContext* cct;
   boost::asio::io_service m_io_service;
   boost::asio::io_service::work m_io_service_work;
   stream_protocol::socket m_dm_socket;
-  ClientProcessMsg m_client_process_msg;
   stream_protocol::endpoint m_ep;
   std::shared_ptr<std::thread> m_io_thread;
   std::atomic<bool> m_session_work;
-  CephContext* cct;
 
   bool m_use_dedicated_worker;
-  int m_worker_thread_num;
+  uint64_t m_worker_thread_num;
   boost::asio::io_service* m_worker;
   std::vector<std::thread*> m_worker_threads;
   boost::asio::io_service::work* m_worker_io_service_work;
 
-  char* m_header_buffer;
   std::atomic<bool> m_writing;
   std::atomic<bool> m_reading;
   std::atomic<uint64_t> m_sequence_id;
   Mutex m_lock;
-  bufferlist m_outcoming_bl;
   Mutex m_map_lock;
   std::map<uint64_t, ObjectCacheRequest*> m_seq_to_req;
+  bufferlist m_outcoming_bl;
+  char* m_header_buffer;
 };
 
 } // namespace immutable_obj_cache
index a9f4ba44b518636add0b8539efead3833fe373e8..e2a1e4549aaa65586f4af711fe3f20f537139923 100644 (file)
@@ -86,7 +86,6 @@ void CacheServer::accept() {
 void CacheServer::handle_accept(CacheSessionPtr new_session,
                                 const boost::system::error_code& error) {
   ldout(cct, 20) << dendl;
-  std::cout << "new session arrived....." << std::endl;
   if (error) {
     // operation_absort
     lderr(cct) << "async accept fails : " << error.message() << dendl;
index 77f4d184739cadf756e440f96129a812bb84d133..7c914213d16ffc7aa2ed9efe6f508c5c551df8b2 100644 (file)
@@ -24,7 +24,6 @@ static const int ASIO_ERROR_MSG_INCOMPLETE = 0X05;
 class ObjectCacheRequest;
 
 typedef std::function<void(uint64_t, ObjectCacheRequest*)> ProcessMsg;
-typedef std::function<void(std::string)> ClientProcessMsg;
 
 } // namespace immutable_obj_cache
 } // namespace ceph
index f4b3c18b2cfd53314325f6f31e988330b15d031e..efd818177fbca6a97d265359c1400a5b88093381 100644 (file)
@@ -75,7 +75,6 @@ public:
     ObjectCacheMsgData m_data;
     bufferlist m_head_buffer;
     bufferlist m_data_buffer;
-    Context* m_on_finish;
     GenContext<ObjectCacheRequest*>* m_process_msg;
 
     ObjectCacheRequest() {}