template <typename I>
ParentCacheObjectDispatch<I>::ParentCacheObjectDispatch(
I* image_ctx) : m_image_ctx(image_ctx), m_cache_client(nullptr),
- m_initialized(false), m_object_store(nullptr) {
+ m_initialized(false), m_object_store(nullptr), m_re_connecting(false) {
std::string controller_path =
((CephContext*)(m_image_ctx->cct))->_conf.get_val<std::string>("immutable_object_cache_sock");
m_cache_client = new CacheClient(controller_path.c_str(), m_image_ctx->cct);
delete m_cache_client;
}
-// TODO if connect fails, init will return error to high layer.
template <typename I>
void ParentCacheObjectDispatch<I>::init() {
auto cct = m_image_ctx->cct;
return;
}
- ldout(cct, 5) << "parent image: setup SRO cache client" << dendl;
+ C_SaferCond* cond = new C_SaferCond();
+ Context* create_session_ctx = new FunctionContext([cond](int ret) {
+ cond->complete(0);
+ });
- m_cache_client->run();
+ create_cache_session(create_session_ctx, false);
+ cond->wait();
- int ret = m_cache_client->connect();
- if (ret < 0) {
- ldout(cct, 5) << "SRO cache client fail to connect with local controller: "
- << "please start ceph-immutable-object-cache daemon"
- << dendl;
- } else {
- ldout(cct, 5) << "SRO cache client to register volume "
- << "name = " << m_image_ctx->id
- << " on ceph-immutable-object-cache daemon"
- << dendl;
-
- auto ctx = new FunctionContext([this](bool reg) {
- handle_register_client(reg);
- });
-
- ret = m_cache_client->register_client(ctx);
-
- if (ret >= 0) {
- // add ourself to the IO object dispatcher chain
- m_image_ctx->io_object_dispatcher->register_object_dispatch(this);
- m_initialized = true;
- }
- }
+ m_image_ctx->io_object_dispatcher->register_object_dispatch(this);
+ m_initialized = true;
}
template <typename I>
auto cct = m_image_ctx->cct;
ldout(cct, 20) << "object_no=" << object_no << " " << object_off << "~"
<< object_len << dendl;
+ ceph_assert(m_initialized);
+
+ /* if RO daemon still don't startup, or RO daemon crash,
+ * or session have any error, try to re-connect daemon.*/
+ if (!m_cache_client->is_session_work()) {
+ if(!m_re_connecting.load()) {
+ ldout(cct, 20) << "try to re-connct RO daemon. " << dendl;
+ m_re_connecting.store(true);
+
+ Context* on_finish = new FunctionContext([this](int ret) {
+ m_re_connecting.store(false);
+ });
+ create_cache_session(on_finish, true);
+ }
- // if any failse, reads will go to rados
- if(m_cache_client == nullptr || !m_cache_client->is_session_work() ||
- m_object_store == nullptr || !m_initialized) {
- ldout(cct, 5) << "SRO cache client session failed " << dendl;
+ ldout(cct, 5) << "session don't work, dispatch current request to lower object layer " << dendl;
return false;
}
+ ceph_assert(m_cache_client->is_session_work());
+
CacheGenContextURef ctx = make_gen_lambda_context<ObjectCacheRequest*,
std::function<void(ObjectCacheRequest*)>>
([this, snap_id, read_data, dispatch_result, on_dispatched,
ldout(cct, 20) << dendl;
if (reg) {
- ldout(cct, 20) << "SRO cache client open cache handler" << dendl;
+ ldout(cct, 20) << "Parent cache open cache handler" << dendl;
m_object_store = new SharedPersistentObjectCacher<I>(m_image_ctx, m_image_ctx->shared_cache_path);
}
return 0;
}
+template <typename I>
+int ParentCacheObjectDispatch<I>::create_cache_session(Context* on_finish, bool is_reconnect) {
+ auto cct = m_image_ctx->cct;
+ ldout(cct, 20) << dendl;
+
+ Context* register_ctx = new FunctionContext([this, cct, on_finish](int ret) {
+ if (ret < 0) {
+ ldout(cct, 20) << "Parent cache fail to register client." << dendl;
+ handle_register_client(false);
+ on_finish->complete(-1);
+ return;
+ }
+ ceph_assert(m_cache_client->is_session_work());
+
+ handle_register_client(true);
+ on_finish->complete(0);
+ });
+
+ Context* connect_ctx = new FunctionContext(
+ [this, cct, register_ctx](int ret) {
+ if (ret < 0) {
+ ldout(cct, 20) << "Parent cache fail to connect RO daeomn." << dendl;
+ register_ctx->complete(-1);
+ return;
+ }
+
+ ldout(cct, 20) << "Parent cache connected to RO daemon." << dendl;
+
+ m_cache_client->register_client(register_ctx);
+ });
+
+ if (m_cache_client != nullptr && is_reconnect) {
+ // CacheClient's destruction will cleanup all details on old session.
+ delete m_cache_client;
+
+ // create new CacheClient to connect RO daemon.
+ std::string controller_path =
+ ((CephContext*)(m_image_ctx->cct))->_conf.get_val<std::string>("immutable_object_cache_sock");
+ m_cache_client = new CacheClient(controller_path.c_str(), m_image_ctx->cct);
+ }
+
+ m_cache_client->run();
+
+ m_cache_client->connect(connect_ctx);
+ return 0;
+}
+
} // namespace cache
} // namespace librbd
io::DispatchResult* dispatch_result,
Context* on_dispatched);
int handle_register_client(bool reg);
+ int create_cache_session(Context* on_finish, bool is_reconnect);
CacheClient *m_cache_client = nullptr;
ImageCtxT* m_image_ctx;
SharedPersistentObjectCacher<ImageCtxT> *m_object_store = nullptr;
bool m_initialized;
+ std::atomic<bool> m_re_connecting;
};
} // namespace cache
MOCK_METHOD0(close, void());
MOCK_METHOD0(stop, void());
MOCK_METHOD0(connect, int());
+ MOCK_METHOD1(connect, void(Context*));
void lookup_object(std::string pool_nspace, uint64_t pool_id, uint64_t snap_id,
std::string oid, CacheGenContextURef&& on_finish) {
// gmock don't support move
})));
}
+ void expect_cache_async_connect(MockParentImageCache& mparent_image_cache, int ret_val,
+ Context* on_finish) {
+ auto& expect = EXPECT_CALL(*(mparent_image_cache.get_cache_client()), connect(_));
+
+ expect.WillOnce(WithArg<0>(Invoke([on_finish, ret_val](Context* ctx) {
+ ctx->complete(ret_val);
+ on_finish->complete(ret_val);
+ })));
+ }
+
void expect_cache_lookup_object(MockParentImageCache& mparent_image_cache,
Context* on_finish) {
auto& expect = EXPECT_CALL(*(mparent_image_cache.get_cache_client()),
auto mock_parent_image_cache = MockParentImageCache::create(&mock_image_ctx);
expect_cache_run(*mock_parent_image_cache, 0);
- expect_cache_connect(*mock_parent_image_cache, 0);
+ C_SaferCond cond;
+ Context* handle_connect = new FunctionContext([&cond](int ret) {
+ ASSERT_EQ(ret, 0);
+ cond.complete(0);
+ });
+ expect_cache_async_connect(*mock_parent_image_cache, 0, handle_connect);
Context* ctx = new FunctionContext([](bool reg) {
ASSERT_EQ(reg, true);
});
expect_cache_stop(*mock_parent_image_cache, 0);
mock_parent_image_cache->init();
+ cond.wait();
ASSERT_EQ(mock_parent_image_cache->get_object_dispatch_layer(),
io::OBJECT_DISPATCH_LAYER_PARENT_CACHE);
ASSERT_EQ(mock_parent_image_cache->get_state(), true);
+ expect_cache_session_state(*mock_parent_image_cache, true);
ASSERT_EQ(mock_parent_image_cache->get_cache_client()->is_session_work(), true);
mock_parent_image_cache->get_cache_client()->close();
auto mock_parent_image_cache = MockParentImageCache::create(&mock_image_ctx);
expect_cache_run(*mock_parent_image_cache, 0);
- expect_cache_connect(*mock_parent_image_cache, -1);
+ C_SaferCond cond;
+ Context* handle_connect = new FunctionContext([&cond](int ret) {
+ ASSERT_EQ(ret, -1);
+ cond.complete(0);
+ });
+ expect_cache_async_connect(*mock_parent_image_cache, -1, handle_connect);
+ expect_io_object_dispatcher_register_state(*mock_parent_image_cache, 0);
expect_cache_session_state(*mock_parent_image_cache, false);
expect_cache_close(*mock_parent_image_cache, 0);
expect_cache_stop(*mock_parent_image_cache, 0);
// initialization fails.
ASSERT_EQ(mock_parent_image_cache->get_object_dispatch_layer(),
io::OBJECT_DISPATCH_LAYER_PARENT_CACHE);
- ASSERT_EQ(mock_parent_image_cache->get_state(), false);
+ ASSERT_EQ(mock_parent_image_cache->get_state(), true);
ASSERT_EQ(mock_parent_image_cache->get_cache_client()->is_session_work(), false);
mock_parent_image_cache->get_cache_client()->close();
auto mock_parent_image_cache = MockParentImageCache::create(&mock_image_ctx);
expect_cache_run(*mock_parent_image_cache, 0);
- expect_cache_connect(*mock_parent_image_cache, 0);
+ C_SaferCond cond;
+ Context* handle_connect = new FunctionContext([&cond](int ret) {
+ ASSERT_EQ(ret, 0);
+ cond.complete(0);
+ });
+ expect_cache_async_connect(*mock_parent_image_cache, 0, handle_connect);
Context* ctx = new FunctionContext([](bool reg) {
ASSERT_EQ(reg, false);
});
expect_cache_register(*mock_parent_image_cache, ctx, -1);
- expect_cache_session_state(*mock_parent_image_cache, false);
+ expect_cache_session_state(*mock_parent_image_cache, true);
+ expect_io_object_dispatcher_register_state(*mock_parent_image_cache, 0);
expect_cache_close(*mock_parent_image_cache, 0);
expect_cache_stop(*mock_parent_image_cache, 0);
mock_parent_image_cache->init();
+ cond.wait();
ASSERT_EQ(mock_parent_image_cache->get_object_dispatch_layer(),
io::OBJECT_DISPATCH_LAYER_PARENT_CACHE);
- ASSERT_EQ(mock_parent_image_cache->get_state(), false);
- ASSERT_EQ(mock_parent_image_cache->get_cache_client()->is_session_work(), false);
+ ASSERT_EQ(mock_parent_image_cache->get_state(), true);
+ expect_cache_session_state(*mock_parent_image_cache, true);
+ ASSERT_EQ(mock_parent_image_cache->get_cache_client()->is_session_work(), true);
mock_parent_image_cache->get_cache_client()->close();
mock_parent_image_cache->get_cache_client()->stop();
auto mock_parent_image_cache = MockParentImageCache::create(&mock_image_ctx);
expect_cache_run(*mock_parent_image_cache, 0);
- expect_cache_connect(*mock_parent_image_cache, 0);
+ C_SaferCond conn_cond;
+ Context* handle_connect = new FunctionContext([&conn_cond](int ret) {
+ ASSERT_EQ(ret, 0);
+ conn_cond.complete(0);
+ });
+ expect_cache_async_connect(*mock_parent_image_cache, 0, handle_connect);
Context* ctx = new FunctionContext([](bool reg) {
ASSERT_EQ(reg, true);
});
expect_cache_stop(*mock_parent_image_cache, 0);
mock_parent_image_cache->init();
+ conn_cond.wait();
ASSERT_EQ(mock_parent_image_cache->get_object_dispatch_layer(),
io::OBJECT_DISPATCH_LAYER_PARENT_CACHE);
ASSERT_EQ(mock_parent_image_cache->get_state(), true);
+ expect_cache_session_state(*mock_parent_image_cache, true);
ASSERT_EQ(mock_parent_image_cache->get_cache_client()->is_session_work(), true);
C_SaferCond cond;
Context* on_finish = &cond;
auto& expect = EXPECT_CALL(*(mock_parent_image_cache->get_cache_client()), is_session_work())
+ .WillOnce(Return(true))
.WillOnce(Return(true));
expect_cache_lookup_object(*mock_parent_image_cache, on_finish);
void CacheClient::handle_connect(Context* on_finish,
const boost::system::error_code& err) {
if (err) {
- ldout(m_cct, 20) << "fails to connect to cache server." << dendl;
+ ldout(m_cct, 20) << "fails to connect to cache server. error : "
+ << err.message() << dendl;
fault(ASIO_ERROR_CONNECT, err);
on_finish->complete(-1);
return;
void CacheClient::lookup_object(std::string pool_nspace, uint64_t pool_id,
uint64_t snap_id, std::string oid,
CacheGenContextURef&& on_finish) {
+ ldout(m_cct, 20) << dendl;
ObjectCacheRequest* req = new ObjectCacheReadData(RBDSC_READ,
++m_sequence_id, 0, 0,
pool_id, snap_id, oid, pool_nspace);
}
void CacheClient::try_send() {
+ ldout(m_cct, 20) << dendl;
if (!m_writing.load()) {
m_writing.store(true);
send_message();
}
void CacheClient::send_message() {
+ ldout(m_cct, 20) << dendl;
bufferlist bl;
{
Mutex::Locker locker(m_lock);
fault(ASIO_ERROR_WRITE, err);
return;
}
+
ceph_assert(cb == bl.length());
{
}
void CacheClient::try_receive() {
+ ldout(m_cct, 20) << dendl;
if (!m_reading.load()) {
m_reading.store(true);
receive_message();
}
void CacheClient::receive_message() {
+ ldout(m_cct, 20) << dendl;
ceph_assert(m_reading.load());
read_reply_header();
}
void CacheClient::read_reply_header() {
+ ldout(m_cct, 20) << dendl;
/* create new head buffer for every reply */
bufferptr bp_head(buffer::create(get_header_size()));
auto raw_ptr = bp_head.c_str();
void CacheClient::handle_reply_header(bufferptr bp_head,
const boost::system::error_code& ec,
size_t bytes_transferred) {
+ ldout(m_cct, 20) << dendl;
if (ec || bytes_transferred != get_header_size()) {
fault(ASIO_ERROR_READ, ec);
return;
void CacheClient::read_reply_data(bufferptr&& bp_head,
bufferptr&& bp_data,
const uint64_t data_len) {
+ ldout(m_cct, 20) << dendl;
auto raw_ptr = bp_data.c_str();
boost::asio::async_read(m_dm_socket, boost::asio::buffer(raw_ptr, data_len),
boost::asio::transfer_exactly(data_len),
const uint64_t data_len,
const boost::system::error_code& ec,
size_t bytes_transferred) {
+ ldout(m_cct, 20) << dendl;
if (ec || bytes_transferred != data_len) {
fault(ASIO_ERROR_WRITE, ec);
return;
}
void CacheClient::process(ObjectCacheRequest* reply, uint64_t seq_id) {
+ ldout(m_cct, 20) << dendl;
ObjectCacheRequest* current_request = nullptr;
{
Mutex::Locker locker(m_lock);
<< ec.message() << dendl;
}
+ // TODO : re-implement this method
int CacheClient::register_client(Context* on_finish) {
ObjectCacheRequest* reg_req = new ObjectCacheRegData(RBDSC_REGISTER,
m_sequence_id++);
data_buffer.append(std::move(bp_data));
ObjectCacheRequest* req = decode_object_cache_request(data_buffer);
if (req->type == RBDSC_REGISTER_REPLY) {
- on_finish->complete(true);
+ m_session_work.store(true);
+ on_finish->complete(0);
} else {
- on_finish->complete(false);
+ on_finish->complete(-1);
}
delete req;
- m_session_work.store(true);
-
return 0;
}
size_t bytes_transferred) {
ldout(m_cct, 20) << dendl;
if (err || bytes_transferred != get_header_size()) {
- fault();
+ fault(err);
return;
}
size_t bytes_transferred) {
ldout(m_cct, 20) << dendl;
if (err || bytes_transferred != data_len) {
- fault();
+ fault(err);
return;
}
bl_data.append(std::move(bp));
ObjectCacheRequest* req = decode_object_cache_request(bl_data);
+
process(req);
delete req;
read_request_header();
size_t bytes_transferred) {
delete reply;
if (err || bytes_transferred != bl.length()) {
- fault();
+ fault(err);
return;
}
});
}
-void CacheSession::fault() {
- ldout(m_cct, 20) << dendl;
- // TODO(dehao)
+void CacheSession::fault(const boost::system::error_code& ec) {
+ ldout(m_cct, 20) << "session fault : " << ec.message() << dendl;
}
} // namespace immutable_obj_cache
const boost::system::error_code& err,
size_t bytes_transferred);
void process(ObjectCacheRequest* req);
- void fault();
+ void fault(const boost::system::error_code& ec);
void send(ObjectCacheRequest* msg);
private: