]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
tools: eliminate session_map race between CacheSession
authorshangdehao1 <dehao.shang@intel.com>
Thu, 7 Mar 2019 18:35:15 +0000 (02:35 +0800)
committerYuan Zhou <yuan.zhou@intel.com>
Thu, 21 Mar 2019 16:16:31 +0000 (00:16 +0800)
Signed-off-by: Dehao Shang <dehao.shang@intel.com>
src/test/immutable_object_cache/test_DomainSocket.cc
src/test/immutable_object_cache/test_multi_session.cc
src/tools/immutable_object_cache/CacheController.cc
src/tools/immutable_object_cache/CacheController.h
src/tools/immutable_object_cache/CacheServer.cc
src/tools/immutable_object_cache/CacheServer.h
src/tools/immutable_object_cache/CacheSession.cc
src/tools/immutable_object_cache/SocketCommon.h

index 0b6720e9ad32cdd6b1893afdd341059214cec00e..647c65f2e72a8eef33dc1ccff1b11935e4ef4ef1 100644 (file)
@@ -42,7 +42,7 @@ public:
   void SetUp() override {
     std::remove(m_local_path.c_str());
     m_cache_server = new CacheServer(g_ceph_context, m_local_path,
-      [this](uint64_t sid, ObjectCacheRequest* req){
+      [this](CacheSession* sid, ObjectCacheRequest* req){
         handle_request(sid, req);
     });
     ASSERT_TRUE(m_cache_server != nullptr);
@@ -77,12 +77,12 @@ public:
     delete srv_thd;
   }
 
-  void handle_request(uint64_t session_id, ObjectCacheRequest* req) {
+  void handle_request(CacheSession* session_id, ObjectCacheRequest* req) {
 
     switch (req->get_request_type()) {
       case RBDSC_REGISTER: {
         ObjectCacheRequest* reply = new ObjectCacheRegReplyData(RBDSC_REGISTER_REPLY, req->seq);
-        m_cache_server->send(session_id, reply);
+        session_id->send(reply);
         break;
       }
       case RBDSC_READ: {
@@ -93,7 +93,7 @@ public:
         } else {
           reply = new ObjectCacheReadReplyData(RBDSC_READ_REPLY, req->seq, "/fakepath");
         }
-        m_cache_server->send(session_id, reply);
+        session_id->send(reply);
         break;
       }
     }
index 3ebdd4583ee8a3450555e49e3f0577cd2c68fde6..cdbb17fb16b60607a25094e9602a8bf6bb7d9a13 100644 (file)
@@ -39,7 +39,7 @@ public:
   void SetUp() override {
     std::remove(m_local_path.c_str());
     m_cache_server = new CacheServer(g_ceph_context, m_local_path,
-      [this](uint64_t session_id, ObjectCacheRequest* req){
+      [this](CacheSession* session_id, ObjectCacheRequest* req){
         server_handle_request(session_id, req);
     });
     ASSERT_TRUE(m_cache_server != nullptr);
@@ -85,19 +85,19 @@ public:
     return cache_client;
   }
 
-  void server_handle_request(uint64_t session_id, ObjectCacheRequest* req) {
+  void server_handle_request(CacheSession* session_id, ObjectCacheRequest* req) {
 
     switch (req->get_request_type()) {
       case RBDSC_REGISTER: {
         ObjectCacheRequest* reply = new ObjectCacheRegReplyData(RBDSC_REGISTER_REPLY,
                                                                 req->seq);
-        m_cache_server->send(session_id, reply);
+        session_id->send(reply);
         break;
       }
       case RBDSC_READ: {
         ObjectCacheRequest* reply = new ObjectCacheReadReplyData(RBDSC_READ_REPLY,
                                                                 req->seq);
-        m_cache_server->send(session_id, reply);
+        session_id->send(reply);
         break;
       }
     }
index 4fc2dce910ce05868806e665419e76e13f99aaf8..4e406bd54c0cc5e32e93a1e0d7e5a6e4cefd4e7b 100644 (file)
@@ -71,7 +71,7 @@ void CacheController::run() {
     std::remove(controller_path.c_str());
 
     m_cache_server = new CacheServer(m_cct, controller_path,
-      ([&](uint64_t p, ObjectCacheRequest* s){handle_request(p, s);}));
+      ([&](CacheSession* p, ObjectCacheRequest* s){handle_request(p, s);}));
 
     int ret = m_cache_server->run();
     if (ret != 0) {
@@ -82,7 +82,7 @@ void CacheController::run() {
   }
 }
 
-void CacheController::handle_request(uint64_t session_id,
+void CacheController::handle_request(CacheSession* session,
                                      ObjectCacheRequest* req) {
   ldout(m_cct, 20) << dendl;
 
@@ -92,7 +92,7 @@ void CacheController::handle_request(uint64_t session_id,
 
       ObjectCacheRequest* reply = new ObjectCacheRegReplyData(
         RBDSC_REGISTER_REPLY, req->seq);
-      m_cache_server->send(session_id, reply);
+      session->send(reply);
       break;
     }
     case RBDSC_READ: {
@@ -109,7 +109,7 @@ void CacheController::handle_request(uint64_t session_id,
         reply = new ObjectCacheReadReplyData(RBDSC_READ_REPLY,
                                              req->seq, cache_path);
       }
-      m_cache_server->send(session_id, reply);
+      session->send(reply);
       break;
     }
     default:
index ef8495f767ef34a0f21adfb2006b968a419b37d3..6c46b37b207a632e4cdca8c6e6b578fa3eab8af6 100644 (file)
@@ -26,6 +26,7 @@ class CacheController {
   void run();
 
   void handle_request(uint64_t sesstion_id, ObjectCacheRequest* msg);
+  void handle_request(CacheSession* session, ObjectCacheRequest* msg);
 
  private:
   CacheServer *m_cache_server;
index 0df58d55f50afde8bf738db1373476e0bc3b3816..d6fecb079d4e306f5d61ecdc9bb28e09bd846539 100644 (file)
@@ -102,17 +102,5 @@ void CacheServer::handle_accept(CacheSessionPtr new_session,
   accept();
 }
 
-void CacheServer::send(uint64_t session_id, ObjectCacheRequest* msg) {
-  ldout(cct, 20) << dendl;
-
-  auto it = m_session_map.find(session_id);
-  if (it != m_session_map.end()) {
-    it->second->send(msg);
-  } else {
-    ldout(cct, 20) << "missing reply session id" << dendl;
-    ceph_assert(0);
-  }
-}
-
 }  // namespace immutable_obj_cache
 }  // namespace ceph
index 84cdd89e5e1f0f9644635997fd19ec0943d08c08..045669921786a3f2d8e968cf640881fa0e554776 100644 (file)
@@ -25,7 +25,6 @@ class CacheServer {
   int run();
   int start_accept();
   int stop();
-  void send(uint64_t session_id, ObjectCacheRequest* msg);
 
  private:
   void accept();
@@ -39,7 +38,6 @@ class CacheServer {
   stream_protocol::endpoint m_local_path;
   stream_protocol::acceptor m_acceptor;
   uint64_t m_session_id = 1;
-  // TODO(dehao) : need to lock it.
   std::map<uint64_t, CacheSessionPtr> m_session_map;
 };
 
index 6069f811259821286cb6de4c699cbef60b82052c..e567f43a7c60e7b670ad6591a1220717589d3443 100644 (file)
@@ -99,7 +99,7 @@ void CacheSession::handle_request_data(bufferptr bp, uint64_t data_len,
 
 void CacheSession::process(ObjectCacheRequest* req) {
   ldout(cct, 20) << dendl;
-  m_server_process_msg(m_session_id, req);
+  m_server_process_msg(this, req);
 }
 
 void CacheSession::send(ObjectCacheRequest* reply) {
index 9cd108ca06d213099834bc73bc46f06c87fa77da..4bbc2f611e68da14a7006b9d56e4232f71fc048b 100644 (file)
@@ -20,8 +20,9 @@ static const int ASIO_ERROR_ACCEPT = 0X04;
 static const int ASIO_ERROR_MSG_INCOMPLETE = 0X05;
 
 class ObjectCacheRequest;
+class CacheSession;
 
-typedef std::function<void(uint64_t, ObjectCacheRequest*)> ProcessMsg;
+typedef std::function<void(CacheSession*, ObjectCacheRequest*)> ProcessMsg;
 
 }  // namespace immutable_obj_cache
 }  // namespace ceph