From 59022ad43ed19795ac93b143beffa6012058f5e8 Mon Sep 17 00:00:00 2001 From: shangdehao1 Date: Mon, 21 Jan 2019 02:19:04 +0800 Subject: [PATCH] tool: break down receive_message implements Also, enable fault at CacheClient side. Signed-off-by: Dehao Shang --- .../immutable_object_cache/CacheClient.cc | 204 ++++++++++-------- .../immutable_object_cache/CacheClient.h | 20 +- 2 files changed, 130 insertions(+), 94 deletions(-) diff --git a/src/tools/immutable_object_cache/CacheClient.cc b/src/tools/immutable_object_cache/CacheClient.cc index 8017afb3289..de373f93d96 100644 --- a/src/tools/immutable_object_cache/CacheClient.cc +++ b/src/tools/immutable_object_cache/CacheClient.cc @@ -18,12 +18,11 @@ namespace immutable_obj_cache { m_io_thread(nullptr), m_session_work(false), m_writing(false), m_reading(false), m_sequence_id(0), m_lock("ceph::cache::cacheclient::m_lock"), - m_map_lock("ceph::cache::cacheclient::m_map_lock"), m_header_buffer(new char[sizeof(ObjectCacheMsgHeader)]) { // TODO : release these resources. - m_use_dedicated_worker = true; // TODO : configure it. + m_use_dedicated_worker = true; m_worker_thread_num = 2; if(m_use_dedicated_worker) { m_worker = new boost::asio::io_service(); @@ -107,24 +106,21 @@ namespace immutable_obj_cache { req->m_head.version = 0; req->m_head.reserved = 0; req->m_head.type = RBDSC_READ; + req->m_head.padding = 0; + req->m_head.seq = ++m_sequence_id; + req->m_data.m_pool_name = pool_name; req->m_data.m_oid = oid; req->m_process_msg = on_finish; - - req->m_head.seq = m_sequence_id++; req->encode(); ceph_assert(req->get_head_buffer().length() == sizeof(ObjectCacheMsgHeader)); ceph_assert(req->get_data_buffer().length() == req->m_head.data_len); - { + { Mutex::Locker locker(m_lock); m_outcoming_bl.append(req->get_head_buffer()); m_outcoming_bl.append(req->get_data_buffer()); - } - - { - Mutex::Locker locker(m_map_lock); ceph_assert(m_seq_to_req.find(req->m_head.seq) == m_seq_to_req.end()); m_seq_to_req[req->m_head.seq] = req; } @@ -183,94 +179,124 @@ namespace immutable_obj_cache { } } - //TODO(): split into smaller functions void CacheClient::receive_message() { ceph_assert(m_reading.load()); + read_reply_header(); + } + + void CacheClient::read_reply_header() { /* one head buffer for all arrived reply. */ // bufferptr bp_head(buffer::create_static(sizeof(ObjectCacheMsgHeader), m_header_buffer)); /* create new head buffer for every reply */ bufferptr bp_head(buffer::create(sizeof(ObjectCacheMsgHeader))); + auto raw_ptr = bp_head.c_str(); boost::asio::async_read(m_dm_socket, - boost::asio::buffer(bp_head.c_str(), sizeof(ObjectCacheMsgHeader)), + boost::asio::buffer(raw_ptr, sizeof(ObjectCacheMsgHeader)), boost::asio::transfer_exactly(sizeof(ObjectCacheMsgHeader)), - [this, bp_head](const boost::system::error_code& err, size_t cb) { - if(err || cb != sizeof(ObjectCacheMsgHeader)) { - fault(ASIO_ERROR_READ, err); - return; - } + boost::bind(&CacheClient::handle_reply_header, + this, bp_head, + boost::asio::placeholders::error, + boost::asio::placeholders::bytes_transferred)); + } + + void CacheClient::handle_reply_header(bufferptr bp_head, + const boost::system::error_code& ec, + size_t bytes_transferred) { + if(ec || bytes_transferred != sizeof(ObjectCacheMsgHeader)) { + fault(ASIO_ERROR_READ, ec); + return; + } + + ceph_assert(bytes_transferred == bp_head.length()); + + ObjectCacheMsgHeader* head = (ObjectCacheMsgHeader*)bp_head.c_str(); + uint64_t data_len = head->data_len; + uint64_t seq_id = head->seq; + ceph_assert(m_seq_to_req.find(seq_id) != m_seq_to_req.end()); + + bufferptr bp_data(buffer::create(data_len)); + read_reply_data(std::move(bp_head), std::move(bp_data), data_len, seq_id); + } + + void CacheClient::read_reply_data(bufferptr&& bp_head, bufferptr&& bp_data, + const uint64_t data_len, const uint64_t seq_id) { + + auto raw_ptr = bp_data.c_str(); + boost::asio::async_read(m_dm_socket, boost::asio::buffer(raw_ptr, data_len), + boost::asio::transfer_exactly(data_len), + boost::bind(&CacheClient::handle_reply_data, + this, std::move(bp_head), std::move(bp_data), data_len, seq_id, + boost::asio::placeholders::error, + boost::asio::placeholders::bytes_transferred)); + + } + + void CacheClient::handle_reply_data(bufferptr bp_head, bufferptr bp_data, + const uint64_t data_len, const uint64_t seq_id, + const boost::system::error_code& ec, + size_t bytes_transferred) { + if (ec || bytes_transferred != data_len) { + fault(ASIO_ERROR_WRITE, ec); + return; + } + + bufferlist head_buffer; + bufferlist data_buffer; + head_buffer.append(std::move(bp_head)); + data_buffer.append(std::move(bp_data)); + ceph_assert(head_buffer.length() == sizeof(ObjectCacheMsgHeader)); + ceph_assert(data_buffer.length() == data_len); + + ObjectCacheRequest* reply = decode_object_cache_request(head_buffer, data_buffer); + data_buffer.clear(); + ceph_assert(data_buffer.length() == 0); + + process(reply, seq_id); + + { + Mutex::Locker locker(m_lock); + if(m_seq_to_req.size() == 0 && m_outcoming_bl.length()) { + m_reading.store(false); + return; + } + } + if(is_session_work()) { + receive_message(); + } - ObjectCacheMsgHeader* head = (ObjectCacheMsgHeader*)bp_head.c_str(); - uint64_t data_len = head->data_len; - uint64_t seq_id = head->seq; - bufferptr bp_data(buffer::create(data_len)); - - boost::asio::async_read(m_dm_socket, - boost::asio::buffer(bp_data.c_str(), data_len), - boost::asio::transfer_exactly(data_len), - [this, bp_data, bp_head, data_len, seq_id](const boost::system::error_code& err, size_t cb) { - if(err || cb != data_len) { - fault(ASIO_ERROR_READ, err); - return; - } - - bufferlist head_buffer; - head_buffer.append(std::move(bp_head)); - bufferlist data_buffer; - data_buffer.append(std::move(bp_data)); - - ceph_assert(head_buffer.length() == sizeof(ObjectCacheMsgHeader)); - ceph_assert(data_buffer.length() == data_len); - - // create reply message which have been decoded according to bufferlist - ObjectCacheRequest* ack = decode_object_cache_request(head_buffer, data_buffer); - - data_buffer.clear(); - ceph_assert(data_buffer.length() == 0); - - auto process_current_reply = m_seq_to_req[ack->m_head.seq]->m_process_msg; - - // if hit, this context will read file from local cache. - auto user_ctx = new FunctionContext([this, ack, process_current_reply] - (bool dedicated) { - if(dedicated) { - // current thread belog to worker. - } - - process_current_reply->complete(ack); - delete ack; - }); - - // Because user_ctx will read file and execute their callback, we think it hold on current thread - // for long time, then defer read/write message from/to socket. - // if want to use dedicated thread to execute this context, enable it. - if(m_use_dedicated_worker) { - m_worker->post([user_ctx](){ - user_ctx->complete(true); - }); - } else { - // use read/write thread to execute this context. - user_ctx->complete(false); - } - - { - Mutex::Locker locker(m_map_lock); - ceph_assert(m_seq_to_req.find(seq_id) != m_seq_to_req.end()); - delete m_seq_to_req[seq_id]; - m_seq_to_req.erase(seq_id); - if(m_seq_to_req.size() == 0) { - m_reading.store(false); - return; - } - } - - receive_message(); - }); - }); } + void CacheClient::process(ObjectCacheRequest* reply, uint64_t seq_id) { + ObjectCacheRequest* current_request = nullptr; + { + Mutex::Locker locker(m_lock); + ceph_assert(m_seq_to_req.find(seq_id) != m_seq_to_req.end()); + current_request = m_seq_to_req[seq_id]; + m_seq_to_req.erase(seq_id); + } + + ceph_assert(current_request != nullptr); + auto process_reply = new FunctionContext([this, current_request, reply] + (bool dedicated) { + if(dedicated) { + // dedicated thrad to execute this context. + } + current_request->m_process_msg->complete(reply); + delete current_request; + delete reply; + }); + + if(m_use_dedicated_worker) { + m_worker->post([process_reply]() { + process_reply->complete(true); + }); + } else { + process_reply->complete(false); + } + } void CacheClient::fault(const int err_type, const boost::system::error_code& ec) { ldout(cct, 20) << "fault." << ec.message() << dendl; @@ -297,7 +323,7 @@ namespace immutable_obj_cache { if(err_type == ASIO_ERROR_WRITE) { ldout(cct, 20) << "ASIO asyn write fails : " << ec.message() << dendl; - // should not occur this error. + // CacheClient should not occur this error. ceph_assert(0); } @@ -305,13 +331,12 @@ namespace immutable_obj_cache { ldout(cct, 20) << "ASIO async connect fails : " << ec.message() << dendl; } - // TODO : currently, for any asio error, just shutdown RO. - // TODO : re-write close / shutdown/ - //close(); + // currently, for any asio error, just shutdown RO. + close(); // all pending request, which have entered into ASIO, will be re-dispatched to RADOS. { - Mutex::Locker locker(m_map_lock); + Mutex::Locker locker(m_lock); for(auto it : m_seq_to_req) { it.second->m_head.type = RBDSC_READ_RADOS; it.second->m_process_msg->complete(it.second); @@ -319,7 +344,8 @@ namespace immutable_obj_cache { m_seq_to_req.clear(); } - //m_outcoming_bl.clear(); + ldout(cct, 20) << "Because ASIO fails, just shutdown RO. Later all reading \ + will be re-dispatched RADOS layer" << ec.message() << dendl; } diff --git a/src/tools/immutable_object_cache/CacheClient.h b/src/tools/immutable_object_cache/CacheClient.h index 8cb7f320e32..a30ce155847 100644 --- a/src/tools/immutable_object_cache/CacheClient.h +++ b/src/tools/immutable_object_cache/CacheClient.h @@ -23,24 +23,35 @@ namespace immutable_obj_cache { class CacheClient { public: + CacheClient(const std::string& file, CephContext* ceph_ctx); ~CacheClient(); void run(); bool is_session_work(); - void close(); int stop(); int connect(); - void lookup_object(std::string pool_name, std::string oid, GenContext* on_finish); + int register_client(Context* on_finish); + +private: + void send_message(); void try_send(); void fault(const int err_type, const boost::system::error_code& err); void try_receive(); void receive_message(); - int register_client(Context* on_finish); - + void process(ObjectCacheRequest* reply, uint64_t seq_id); + void read_reply_header(); + void handle_reply_header(bufferptr bp_head, + const boost::system::error_code& ec, size_t bytes_transferred); + void read_reply_data(bufferptr&& bp_head, bufferptr&& bp_data, + const uint64_t data_len, const uint64_t seq_id); + void handle_reply_data(bufferptr bp_head, bufferptr bp_data, + const uint64_t data_len, const uint64_t seq_id, + const boost::system::error_code& ec, size_t bytes_transferred); private: + CephContext* cct; boost::asio::io_service m_io_service; boost::asio::io_service::work m_io_service_work; @@ -59,7 +70,6 @@ private: std::atomic m_reading; std::atomic m_sequence_id; Mutex m_lock; - Mutex m_map_lock; std::map m_seq_to_req; bufferlist m_outcoming_bl; char* m_header_buffer; -- 2.39.5