git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
librbd: add re-connect and in-flight enable feature
author: shangdehao1 <dehao.shang@intel.com>
Fri, 31 May 2019 01:47:31 +0000 (09:47 +0800)
committer: Jason Dillaman <dillaman@redhat.com>
Mon, 24 Jun 2019 21:35:40 +0000 (17:35 -0400)
- reconnect: when the session stops working or the RO daemon crashes,
  the parent cache will try to re-connect to the daemon.
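- in-flight enable of the RO daemon: if the daemon is not running yet,
  the parent cache tries to connect to it again on a later read.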

Signed-off-by: Dehao Shang <dehao.shang@intel.com>
src/librbd/cache/ParentCacheObjectDispatch.cc
src/librbd/cache/ParentCacheObjectDispatch.h
src/test/immutable_object_cache/MockCacheDaemon.h
src/test/librbd/cache/test_mock_ParentImageCache.cc
src/tools/immutable_object_cache/CacheClient.cc
src/tools/immutable_object_cache/CacheSession.cc
src/tools/immutable_object_cache/CacheSession.h

index 2581fd8852367595d477d724654f9b055cc095a2..af30d28d9f01c076cbb125c5fda7d6b0e7339b56 100644 (file)
@@ -27,7 +27,7 @@ namespace cache {
 template <typename I>
 ParentCacheObjectDispatch<I>::ParentCacheObjectDispatch(
     I* image_ctx) : m_image_ctx(image_ctx), m_cache_client(nullptr),
-    m_initialized(false), m_object_store(nullptr) {
+    m_initialized(false), m_object_store(nullptr), m_re_connecting(false) {
   std::string controller_path =
     ((CephContext*)(m_image_ctx->cct))->_conf.get_val<std::string>("immutable_object_cache_sock");
   m_cache_client = new CacheClient(controller_path.c_str(), m_image_ctx->cct);
@@ -39,7 +39,6 @@ ParentCacheObjectDispatch<I>::~ParentCacheObjectDispatch() {
     delete m_cache_client;
 }
 
-// TODO if connect fails, init will return error to high layer.
 template <typename I>
 void ParentCacheObjectDispatch<I>::init() {
   auto cct = m_image_ctx->cct;
@@ -50,33 +49,16 @@ void ParentCacheObjectDispatch<I>::init() {
     return;
   }
 
-  ldout(cct, 5) << "parent image: setup SRO cache client" << dendl;
+  C_SaferCond* cond = new C_SaferCond();
+  Context* create_session_ctx = new FunctionContext([cond](int ret) {
+    cond->complete(0);
+  });
 
-  m_cache_client->run();
+  create_cache_session(create_session_ctx, false);
+  cond->wait();
 
-  int ret = m_cache_client->connect();
-  if (ret < 0) {
-    ldout(cct, 5) << "SRO cache client fail to connect with local controller: "
-                  << "please start ceph-immutable-object-cache daemon"
-                 << dendl;
-  } else {
-    ldout(cct, 5) << "SRO cache client to register volume "
-                  << "name = " << m_image_ctx->id
-                  << " on ceph-immutable-object-cache daemon"
-                  << dendl;
-
-    auto ctx = new FunctionContext([this](bool reg) {
-      handle_register_client(reg);
-    });
-
-    ret = m_cache_client->register_client(ctx);
-
-    if (ret >= 0) {
-      // add ourself to the IO object dispatcher chain
-      m_image_ctx->io_object_dispatcher->register_object_dispatch(this);
-      m_initialized = true;
-    }
-  }
+  m_image_ctx->io_object_dispatcher->register_object_dispatch(this);
+  m_initialized = true;
 }
 
 template <typename I>
@@ -90,14 +72,27 @@ bool ParentCacheObjectDispatch<I>::read(
   auto cct = m_image_ctx->cct;
   ldout(cct, 20) << "object_no=" << object_no << " " << object_off << "~"
                  << object_len << dendl;
+  ceph_assert(m_initialized);
+
+  /* if RO daemon still don't startup, or RO daemon crash,
+   * or session have any error, try to re-connect daemon.*/
+  if (!m_cache_client->is_session_work()) {
+    if(!m_re_connecting.load()) {
+      ldout(cct, 20) << "try to re-connct RO daemon. " << dendl;
+      m_re_connecting.store(true);
+
+      Context* on_finish = new FunctionContext([this](int ret) {
+        m_re_connecting.store(false);
+      });
+      create_cache_session(on_finish, true);
+    }
 
-  // if any failse, reads will go to rados
-  if(m_cache_client == nullptr || !m_cache_client->is_session_work() ||
-     m_object_store == nullptr || !m_initialized) {
-    ldout(cct, 5) << "SRO cache client session failed " << dendl;
+    ldout(cct, 5) << "session don't work, dispatch current request to lower object layer " << dendl;
     return false;
   }
 
+  ceph_assert(m_cache_client->is_session_work());
+
   CacheGenContextURef ctx = make_gen_lambda_context<ObjectCacheRequest*,
                                      std::function<void(ObjectCacheRequest*)>>
    ([this, snap_id, read_data, dispatch_result, on_dispatched,
@@ -149,12 +144,59 @@ int ParentCacheObjectDispatch<I>::handle_register_client(bool reg) {
   ldout(cct, 20) << dendl;
 
   if (reg) {
-    ldout(cct, 20) << "SRO cache client open cache handler" << dendl;
+    ldout(cct, 20) << "Parent cache open cache handler" << dendl;
     m_object_store = new SharedPersistentObjectCacher<I>(m_image_ctx, m_image_ctx->shared_cache_path);
   }
   return 0;
 }
 
+template <typename I>
+int ParentCacheObjectDispatch<I>::create_cache_session(Context* on_finish, bool is_reconnect) {
+  auto cct = m_image_ctx->cct;
+  ldout(cct, 20) << dendl;
+
+  Context* register_ctx = new FunctionContext([this, cct, on_finish](int ret) {
+    if (ret < 0) {
+      ldout(cct, 20) << "Parent cache fail to register client." << dendl;
+      handle_register_client(false);
+      on_finish->complete(-1);
+      return;
+    }
+    ceph_assert(m_cache_client->is_session_work());
+
+    handle_register_client(true);
+    on_finish->complete(0);
+  });
+
+  Context* connect_ctx = new FunctionContext(
+    [this, cct, register_ctx](int ret) {
+    if (ret < 0) {
+      ldout(cct, 20) << "Parent cache fail to connect RO daeomn." << dendl;
+      register_ctx->complete(-1);
+      return;
+    }
+
+    ldout(cct, 20) << "Parent cache connected to RO daemon." << dendl;
+
+    m_cache_client->register_client(register_ctx);
+  });
+
+  if (m_cache_client != nullptr && is_reconnect) {
+    // CacheClient's destruction will cleanup all details on old session.
+    delete m_cache_client;
+
+    // create new CacheClient to connect RO daemon.
+    std::string controller_path =
+      ((CephContext*)(m_image_ctx->cct))->_conf.get_val<std::string>("immutable_object_cache_sock");
+    m_cache_client = new CacheClient(controller_path.c_str(), m_image_ctx->cct);
+  }
+
+  m_cache_client->run();
+
+  m_cache_client->connect(connect_ctx);
+  return 0;
+}
+
 } // namespace cache
 } // namespace librbd
 
index 9fcc1da5c56c314fc01d3aa3a2d9b5cf7c8fe0c9..3459f74f26af62b7135c74125be673ae0cba16a0 100644 (file)
@@ -127,11 +127,13 @@ private:
          io::DispatchResult* dispatch_result,
          Context* on_dispatched);
   int handle_register_client(bool reg);
+  int create_cache_session(Context* on_finish, bool is_reconnect);
 
   CacheClient *m_cache_client = nullptr;
   ImageCtxT* m_image_ctx;
   SharedPersistentObjectCacher<ImageCtxT> *m_object_store = nullptr;
   bool m_initialized;
+  std::atomic<bool> m_re_connecting;
 };
 
 } // namespace cache
index c189dd127de477d5769becc8cdc467efee9a7cb5..71fd062650b54deb3f3688715d364c4304c99360 100644 (file)
@@ -23,6 +23,7 @@ class MockCacheClient {
   MOCK_METHOD0(close, void());
   MOCK_METHOD0(stop, void());
   MOCK_METHOD0(connect, int());
+  MOCK_METHOD1(connect, void(Context*));
   void lookup_object(std::string pool_nspace, uint64_t pool_id, uint64_t snap_id,
               std::string oid, CacheGenContextURef&& on_finish) {
     // gmock don't support move
index b8916107e85c539abbdbfc77393e97a9d671d8bf..acfc91e464071f1f82a1b6fda3f0d5d4a5fe3563 100644 (file)
@@ -85,6 +85,16 @@ class TestMockParentImageCache : public TestMockFixture {
       })));
    }
 
+   void expect_cache_async_connect(MockParentImageCache& mparent_image_cache, int ret_val,
+                                   Context* on_finish) {
+     auto& expect = EXPECT_CALL(*(mparent_image_cache.get_cache_client()), connect(_));
+
+     expect.WillOnce(WithArg<0>(Invoke([on_finish, ret_val](Context* ctx) {
+       ctx->complete(ret_val);
+       on_finish->complete(ret_val);
+     })));
+   }
+
   void expect_cache_lookup_object(MockParentImageCache& mparent_image_cache,
                                   Context* on_finish) {
     auto& expect = EXPECT_CALL(*(mparent_image_cache.get_cache_client()),
@@ -143,7 +153,12 @@ TEST_F(TestMockParentImageCache, test_initialization_success) {
   auto mock_parent_image_cache = MockParentImageCache::create(&mock_image_ctx);
 
   expect_cache_run(*mock_parent_image_cache, 0);
-  expect_cache_connect(*mock_parent_image_cache, 0);
+  C_SaferCond cond;
+  Context* handle_connect = new FunctionContext([&cond](int ret) {
+    ASSERT_EQ(ret, 0);
+    cond.complete(0);
+  });
+  expect_cache_async_connect(*mock_parent_image_cache, 0, handle_connect);
   Context* ctx = new FunctionContext([](bool reg) {
     ASSERT_EQ(reg, true);
   });
@@ -154,10 +169,12 @@ TEST_F(TestMockParentImageCache, test_initialization_success) {
   expect_cache_stop(*mock_parent_image_cache, 0);
 
   mock_parent_image_cache->init();
+  cond.wait();
 
   ASSERT_EQ(mock_parent_image_cache->get_object_dispatch_layer(),
             io::OBJECT_DISPATCH_LAYER_PARENT_CACHE);
   ASSERT_EQ(mock_parent_image_cache->get_state(), true);
+  expect_cache_session_state(*mock_parent_image_cache, true);
   ASSERT_EQ(mock_parent_image_cache->get_cache_client()->is_session_work(), true);
 
   mock_parent_image_cache->get_cache_client()->close();
@@ -174,7 +191,13 @@ TEST_F(TestMockParentImageCache, test_initialization_fail_at_connect) {
   auto mock_parent_image_cache = MockParentImageCache::create(&mock_image_ctx);
 
   expect_cache_run(*mock_parent_image_cache, 0);
-  expect_cache_connect(*mock_parent_image_cache, -1);
+  C_SaferCond cond;
+  Context* handle_connect = new FunctionContext([&cond](int ret) {
+    ASSERT_EQ(ret, -1);
+    cond.complete(0);
+  });
+  expect_cache_async_connect(*mock_parent_image_cache, -1, handle_connect);
+  expect_io_object_dispatcher_register_state(*mock_parent_image_cache, 0);
   expect_cache_session_state(*mock_parent_image_cache, false);
   expect_cache_close(*mock_parent_image_cache, 0);
   expect_cache_stop(*mock_parent_image_cache, 0);
@@ -184,7 +207,7 @@ TEST_F(TestMockParentImageCache, test_initialization_fail_at_connect) {
   // initialization fails.
   ASSERT_EQ(mock_parent_image_cache->get_object_dispatch_layer(),
             io::OBJECT_DISPATCH_LAYER_PARENT_CACHE);
-  ASSERT_EQ(mock_parent_image_cache->get_state(), false);
+  ASSERT_EQ(mock_parent_image_cache->get_state(), true);
   ASSERT_EQ(mock_parent_image_cache->get_cache_client()->is_session_work(), false);
 
   mock_parent_image_cache->get_cache_client()->close();
@@ -202,21 +225,29 @@ TEST_F(TestMockParentImageCache, test_initialization_fail_at_register) {
   auto mock_parent_image_cache = MockParentImageCache::create(&mock_image_ctx);
 
   expect_cache_run(*mock_parent_image_cache, 0);
-  expect_cache_connect(*mock_parent_image_cache, 0);
+  C_SaferCond cond;
+  Context* handle_connect = new FunctionContext([&cond](int ret) {
+    ASSERT_EQ(ret, 0);
+    cond.complete(0);
+  });
+  expect_cache_async_connect(*mock_parent_image_cache, 0, handle_connect);
   Context* ctx = new FunctionContext([](bool reg) {
     ASSERT_EQ(reg, false);
   });
   expect_cache_register(*mock_parent_image_cache, ctx, -1);
-  expect_cache_session_state(*mock_parent_image_cache, false);
+  expect_cache_session_state(*mock_parent_image_cache, true);
+  expect_io_object_dispatcher_register_state(*mock_parent_image_cache, 0);
   expect_cache_close(*mock_parent_image_cache, 0);
   expect_cache_stop(*mock_parent_image_cache, 0);
 
   mock_parent_image_cache->init();
+  cond.wait();
 
   ASSERT_EQ(mock_parent_image_cache->get_object_dispatch_layer(),
             io::OBJECT_DISPATCH_LAYER_PARENT_CACHE);
-  ASSERT_EQ(mock_parent_image_cache->get_state(), false);
-  ASSERT_EQ(mock_parent_image_cache->get_cache_client()->is_session_work(), false);
+  ASSERT_EQ(mock_parent_image_cache->get_state(), true);
+  expect_cache_session_state(*mock_parent_image_cache, true);
+  ASSERT_EQ(mock_parent_image_cache->get_cache_client()->is_session_work(), true);
  
   mock_parent_image_cache->get_cache_client()->close();
   mock_parent_image_cache->get_cache_client()->stop();
@@ -272,7 +303,12 @@ TEST_F(TestMockParentImageCache, test_read) {
   auto mock_parent_image_cache = MockParentImageCache::create(&mock_image_ctx);
 
   expect_cache_run(*mock_parent_image_cache, 0);
-  expect_cache_connect(*mock_parent_image_cache, 0);
+  C_SaferCond conn_cond;
+  Context* handle_connect = new FunctionContext([&conn_cond](int ret) {
+    ASSERT_EQ(ret, 0);
+    conn_cond.complete(0);
+  });
+  expect_cache_async_connect(*mock_parent_image_cache, 0, handle_connect);
   Context* ctx = new FunctionContext([](bool reg) {
     ASSERT_EQ(reg, true);
   });
@@ -283,16 +319,19 @@ TEST_F(TestMockParentImageCache, test_read) {
   expect_cache_stop(*mock_parent_image_cache, 0);
 
   mock_parent_image_cache->init();
+  conn_cond.wait();
 
   ASSERT_EQ(mock_parent_image_cache->get_object_dispatch_layer(),
             io::OBJECT_DISPATCH_LAYER_PARENT_CACHE);
   ASSERT_EQ(mock_parent_image_cache->get_state(), true);
+  expect_cache_session_state(*mock_parent_image_cache, true);
   ASSERT_EQ(mock_parent_image_cache->get_cache_client()->is_session_work(), true);
 
   C_SaferCond cond;
   Context* on_finish = &cond;
 
   auto& expect = EXPECT_CALL(*(mock_parent_image_cache->get_cache_client()), is_session_work())
+                   .WillOnce(Return(true))
                    .WillOnce(Return(true));
   expect_cache_lookup_object(*mock_parent_image_cache, on_finish); 
 
index bba3820f48868745c1fe32b87c7c344ade9c5841..b50491ce8dd57ec3951bbe838d3f53cabfb15524 100644 (file)
@@ -98,7 +98,8 @@ namespace immutable_obj_cache {
   void CacheClient::handle_connect(Context* on_finish,
                                    const boost::system::error_code& err) {
     if (err) {
-      ldout(m_cct, 20) << "fails to connect to cache server." << dendl;
+      ldout(m_cct, 20) << "fails to connect to cache server. error : "
+                       << err.message() << dendl;
       fault(ASIO_ERROR_CONNECT, err);
       on_finish->complete(-1);
       return;
@@ -111,6 +112,7 @@ namespace immutable_obj_cache {
   void CacheClient::lookup_object(std::string pool_nspace, uint64_t pool_id,
                                   uint64_t snap_id, std::string oid,
                                   CacheGenContextURef&& on_finish) {
+    ldout(m_cct, 20) << dendl;
     ObjectCacheRequest* req = new ObjectCacheReadData(RBDSC_READ,
                                     ++m_sequence_id, 0, 0,
                                     pool_id, snap_id, oid, pool_nspace);
@@ -132,6 +134,7 @@ namespace immutable_obj_cache {
   }
 
   void CacheClient::try_send() {
+    ldout(m_cct, 20) << dendl;
     if (!m_writing.load()) {
       m_writing.store(true);
       send_message();
@@ -139,6 +142,7 @@ namespace immutable_obj_cache {
   }
 
   void CacheClient::send_message() {
+    ldout(m_cct, 20) << dendl;
     bufferlist bl;
     {
       Mutex::Locker locker(m_lock);
@@ -155,6 +159,7 @@ namespace immutable_obj_cache {
            fault(ASIO_ERROR_WRITE, err);
            return;
         }
+
         ceph_assert(cb == bl.length());
 
         {
@@ -172,6 +177,7 @@ namespace immutable_obj_cache {
   }
 
   void CacheClient::try_receive() {
+    ldout(m_cct, 20) << dendl;
     if (!m_reading.load()) {
       m_reading.store(true);
       receive_message();
@@ -179,11 +185,13 @@ namespace immutable_obj_cache {
   }
 
   void CacheClient::receive_message() {
+    ldout(m_cct, 20) << dendl;
     ceph_assert(m_reading.load());
     read_reply_header();
   }
 
   void CacheClient::read_reply_header() {
+    ldout(m_cct, 20) << dendl;
     /* create new head buffer for every reply */
     bufferptr bp_head(buffer::create(get_header_size()));
     auto raw_ptr = bp_head.c_str();
@@ -200,6 +208,7 @@ namespace immutable_obj_cache {
   void CacheClient::handle_reply_header(bufferptr bp_head,
          const boost::system::error_code& ec,
          size_t bytes_transferred) {
+    ldout(m_cct, 20) << dendl;
     if (ec || bytes_transferred != get_header_size()) {
       fault(ASIO_ERROR_READ, ec);
       return;
@@ -216,6 +225,7 @@ namespace immutable_obj_cache {
   void CacheClient::read_reply_data(bufferptr&& bp_head,
                                     bufferptr&& bp_data,
                                     const uint64_t data_len) {
+    ldout(m_cct, 20) << dendl;
     auto raw_ptr = bp_data.c_str();
     boost::asio::async_read(m_dm_socket, boost::asio::buffer(raw_ptr, data_len),
       boost::asio::transfer_exactly(data_len),
@@ -230,6 +240,7 @@ namespace immutable_obj_cache {
                                       const uint64_t data_len,
                                       const boost::system::error_code& ec,
                                       size_t bytes_transferred) {
+    ldout(m_cct, 20) << dendl;
     if (ec || bytes_transferred != data_len) {
       fault(ASIO_ERROR_WRITE, ec);
       return;
@@ -259,6 +270,7 @@ namespace immutable_obj_cache {
   }
 
   void CacheClient::process(ObjectCacheRequest* reply, uint64_t seq_id) {
+    ldout(m_cct, 20) << dendl;
     ObjectCacheRequest* current_request = nullptr;
     {
       Mutex::Locker locker(m_lock);
@@ -361,6 +373,7 @@ namespace immutable_obj_cache {
                        << ec.message() << dendl;
   }
 
+  // TODO : re-implement this method
   int CacheClient::register_client(Context* on_finish) {
     ObjectCacheRequest* reg_req = new ObjectCacheRegData(RBDSC_REGISTER,
                                                          m_sequence_id++);
@@ -403,14 +416,13 @@ namespace immutable_obj_cache {
     data_buffer.append(std::move(bp_data));
     ObjectCacheRequest* req = decode_object_cache_request(data_buffer);
     if (req->type == RBDSC_REGISTER_REPLY) {
-      on_finish->complete(true);
+      m_session_work.store(true);
+      on_finish->complete(0);
     } else {
-      on_finish->complete(false);
+      on_finish->complete(-1);
     }
 
     delete req;
-    m_session_work.store(true);
-
     return 0;
   }
 
index c5ec0342878e332e07aa626ff67ed98fb7d85d83..5bf730354ab3e6390667210611aedc41ee6efc31 100644 (file)
@@ -59,7 +59,7 @@ void CacheSession::handle_request_header(const boost::system::error_code& err,
                                          size_t bytes_transferred) {
   ldout(m_cct, 20) << dendl;
   if (err || bytes_transferred != get_header_size()) {
-    fault();
+    fault(err);
     return;
   }
 
@@ -83,7 +83,7 @@ void CacheSession::handle_request_data(bufferptr bp, uint64_t data_len,
                                       size_t bytes_transferred) {
   ldout(m_cct, 20) << dendl;
   if (err || bytes_transferred != data_len) {
-    fault();
+    fault(err);
     return;
   }
 
@@ -93,6 +93,7 @@ void CacheSession::handle_request_data(bufferptr bp, uint64_t data_len,
   bl_data.append(std::move(bp));
 
   ObjectCacheRequest* req = decode_object_cache_request(bl_data);
+
   process(req);
   delete req;
   read_request_header();
@@ -116,15 +117,14 @@ void CacheSession::send(ObjectCacheRequest* reply) {
           size_t bytes_transferred) {
           delete reply;
           if (err || bytes_transferred != bl.length()) {
-            fault();
+            fault(err);
             return;
           }
         });
 }
 
-void CacheSession::fault() {
-  ldout(m_cct, 20) << dendl;
-  // TODO(dehao)
+void CacheSession::fault(const boost::system::error_code& ec) {
+  ldout(m_cct, 20) << "session fault : " << ec.message() << dendl;
 }
 
 }  // namespace immutable_obj_cache
index 575dcf9221d22bfa94e679a4bd6dee6dd3ac9fa3..d530b529c8c22008d5aa746a05833bb55d6406f9 100644 (file)
@@ -33,7 +33,7 @@ class CacheSession : public std::enable_shared_from_this<CacheSession> {
                           const boost::system::error_code& err,
                           size_t bytes_transferred);
   void process(ObjectCacheRequest* req);
-  void fault();
+  void fault(const boost::system::error_code& ec);
   void send(ObjectCacheRequest* msg);
 
  private: