m_io_thread(nullptr), m_session_work(false), m_writing(false),
m_reading(false), m_sequence_id(0),
m_lock("ceph::cache::cacheclient::m_lock"),
- m_map_lock("ceph::cache::cacheclient::m_map_lock"),
m_header_buffer(new char[sizeof(ObjectCacheMsgHeader)])
{
// TODO : release these resources.
- m_use_dedicated_worker = true;
// TODO : configure it.
+ m_use_dedicated_worker = true;
m_worker_thread_num = 2;
if(m_use_dedicated_worker) {
m_worker = new boost::asio::io_service();
req->m_head.version = 0;
req->m_head.reserved = 0;
req->m_head.type = RBDSC_READ;
+ req->m_head.padding = 0;
+ req->m_head.seq = ++m_sequence_id;
+
req->m_data.m_pool_name = pool_name;
req->m_data.m_oid = oid;
req->m_process_msg = on_finish;
-
- req->m_head.seq = m_sequence_id++;
req->encode();
ceph_assert(req->get_head_buffer().length() == sizeof(ObjectCacheMsgHeader));
ceph_assert(req->get_data_buffer().length() == req->m_head.data_len);
- {
+ {
Mutex::Locker locker(m_lock);
m_outcoming_bl.append(req->get_head_buffer());
m_outcoming_bl.append(req->get_data_buffer());
- }
-
- {
- Mutex::Locker locker(m_map_lock);
ceph_assert(m_seq_to_req.find(req->m_head.seq) == m_seq_to_req.end());
m_seq_to_req[req->m_head.seq] = req;
}
}
}
- //TODO(): split into smaller functions
void CacheClient::receive_message() {
ceph_assert(m_reading.load());
+ read_reply_header();
+ }
+
+ void CacheClient::read_reply_header() {
/* one head buffer for all arrived reply. */
// bufferptr bp_head(buffer::create_static(sizeof(ObjectCacheMsgHeader), m_header_buffer));
/* create new head buffer for every reply */
bufferptr bp_head(buffer::create(sizeof(ObjectCacheMsgHeader)));
+ auto raw_ptr = bp_head.c_str();
boost::asio::async_read(m_dm_socket,
- boost::asio::buffer(bp_head.c_str(), sizeof(ObjectCacheMsgHeader)),
+ boost::asio::buffer(raw_ptr, sizeof(ObjectCacheMsgHeader)),
boost::asio::transfer_exactly(sizeof(ObjectCacheMsgHeader)),
- [this, bp_head](const boost::system::error_code& err, size_t cb) {
- if(err || cb != sizeof(ObjectCacheMsgHeader)) {
- fault(ASIO_ERROR_READ, err);
- return;
- }
+ boost::bind(&CacheClient::handle_reply_header,
+ this, bp_head,
+ boost::asio::placeholders::error,
+ boost::asio::placeholders::bytes_transferred));
+ }
+
+ void CacheClient::handle_reply_header(bufferptr bp_head,
+ const boost::system::error_code& ec,
+ size_t bytes_transferred) {
+ if(ec || bytes_transferred != sizeof(ObjectCacheMsgHeader)) {
+ fault(ASIO_ERROR_READ, ec);
+ return;
+ }
+
+ ceph_assert(bytes_transferred == bp_head.length());
+
+ ObjectCacheMsgHeader* head = (ObjectCacheMsgHeader*)bp_head.c_str();
+ uint64_t data_len = head->data_len;
+ uint64_t seq_id = head->seq;
+ ceph_assert(m_seq_to_req.find(seq_id) != m_seq_to_req.end());
+
+ bufferptr bp_data(buffer::create(data_len));
+ read_reply_data(std::move(bp_head), std::move(bp_data), data_len, seq_id);
+ }
+
+ void CacheClient::read_reply_data(bufferptr&& bp_head, bufferptr&& bp_data,
+ const uint64_t data_len, const uint64_t seq_id) {
+
+ auto raw_ptr = bp_data.c_str();
+ boost::asio::async_read(m_dm_socket, boost::asio::buffer(raw_ptr, data_len),
+ boost::asio::transfer_exactly(data_len),
+ boost::bind(&CacheClient::handle_reply_data,
+ this, std::move(bp_head), std::move(bp_data), data_len, seq_id,
+ boost::asio::placeholders::error,
+ boost::asio::placeholders::bytes_transferred));
+
+ }
+
+ void CacheClient::handle_reply_data(bufferptr bp_head, bufferptr bp_data,
+ const uint64_t data_len, const uint64_t seq_id,
+ const boost::system::error_code& ec,
+ size_t bytes_transferred) {
+ if (ec || bytes_transferred != data_len) {
+ fault(ASIO_ERROR_WRITE, ec);
+ return;
+ }
+
+ bufferlist head_buffer;
+ bufferlist data_buffer;
+ head_buffer.append(std::move(bp_head));
+ data_buffer.append(std::move(bp_data));
+ ceph_assert(head_buffer.length() == sizeof(ObjectCacheMsgHeader));
+ ceph_assert(data_buffer.length() == data_len);
+
+ ObjectCacheRequest* reply = decode_object_cache_request(head_buffer, data_buffer);
+ data_buffer.clear();
+ ceph_assert(data_buffer.length() == 0);
+
+ process(reply, seq_id);
+
+ {
+ Mutex::Locker locker(m_lock);
+ if(m_seq_to_req.size() == 0 && m_outcoming_bl.length()) {
+ m_reading.store(false);
+ return;
+ }
+ }
+ if(is_session_work()) {
+ receive_message();
+ }
- ObjectCacheMsgHeader* head = (ObjectCacheMsgHeader*)bp_head.c_str();
- uint64_t data_len = head->data_len;
- uint64_t seq_id = head->seq;
- bufferptr bp_data(buffer::create(data_len));
-
- boost::asio::async_read(m_dm_socket,
- boost::asio::buffer(bp_data.c_str(), data_len),
- boost::asio::transfer_exactly(data_len),
- [this, bp_data, bp_head, data_len, seq_id](const boost::system::error_code& err, size_t cb) {
- if(err || cb != data_len) {
- fault(ASIO_ERROR_READ, err);
- return;
- }
-
- bufferlist head_buffer;
- head_buffer.append(std::move(bp_head));
- bufferlist data_buffer;
- data_buffer.append(std::move(bp_data));
-
- ceph_assert(head_buffer.length() == sizeof(ObjectCacheMsgHeader));
- ceph_assert(data_buffer.length() == data_len);
-
- // create reply message which have been decoded according to bufferlist
- ObjectCacheRequest* ack = decode_object_cache_request(head_buffer, data_buffer);
-
- data_buffer.clear();
- ceph_assert(data_buffer.length() == 0);
-
- auto process_current_reply = m_seq_to_req[ack->m_head.seq]->m_process_msg;
-
- // if hit, this context will read file from local cache.
- auto user_ctx = new FunctionContext([this, ack, process_current_reply]
- (bool dedicated) {
- if(dedicated) {
- // current thread belog to worker.
- }
-
- process_current_reply->complete(ack);
- delete ack;
- });
-
- // Because user_ctx will read file and execute their callback, we think it hold on current thread
- // for long time, then defer read/write message from/to socket.
- // if want to use dedicated thread to execute this context, enable it.
- if(m_use_dedicated_worker) {
- m_worker->post([user_ctx](){
- user_ctx->complete(true);
- });
- } else {
- // use read/write thread to execute this context.
- user_ctx->complete(false);
- }
-
- {
- Mutex::Locker locker(m_map_lock);
- ceph_assert(m_seq_to_req.find(seq_id) != m_seq_to_req.end());
- delete m_seq_to_req[seq_id];
- m_seq_to_req.erase(seq_id);
- if(m_seq_to_req.size() == 0) {
- m_reading.store(false);
- return;
- }
- }
-
- receive_message();
- });
- });
}
+ void CacheClient::process(ObjectCacheRequest* reply, uint64_t seq_id) {
+ ObjectCacheRequest* current_request = nullptr;
+ {
+ Mutex::Locker locker(m_lock);
+ ceph_assert(m_seq_to_req.find(seq_id) != m_seq_to_req.end());
+ current_request = m_seq_to_req[seq_id];
+ m_seq_to_req.erase(seq_id);
+ }
+
+ ceph_assert(current_request != nullptr);
+ auto process_reply = new FunctionContext([this, current_request, reply]
+ (bool dedicated) {
+ if(dedicated) {
+ // dedicated thrad to execute this context.
+ }
+ current_request->m_process_msg->complete(reply);
+ delete current_request;
+ delete reply;
+ });
+
+ if(m_use_dedicated_worker) {
+ m_worker->post([process_reply]() {
+ process_reply->complete(true);
+ });
+ } else {
+ process_reply->complete(false);
+ }
+ }
void CacheClient::fault(const int err_type, const boost::system::error_code& ec) {
ldout(cct, 20) << "fault." << ec.message() << dendl;
if(err_type == ASIO_ERROR_WRITE) {
ldout(cct, 20) << "ASIO asyn write fails : " << ec.message() << dendl;
- // should not occur this error.
+ // CacheClient should not occur this error.
ceph_assert(0);
}
ldout(cct, 20) << "ASIO async connect fails : " << ec.message() << dendl;
}
- // TODO : currently, for any asio error, just shutdown RO.
- // TODO : re-write close / shutdown/
- //close();
+ // currently, for any asio error, just shutdown RO.
+ close();
// all pending request, which have entered into ASIO, will be re-dispatched to RADOS.
{
- Mutex::Locker locker(m_map_lock);
+ Mutex::Locker locker(m_lock);
for(auto it : m_seq_to_req) {
it.second->m_head.type = RBDSC_READ_RADOS;
it.second->m_process_msg->complete(it.second);
m_seq_to_req.clear();
}
- //m_outcoming_bl.clear();
+ ldout(cct, 20) << "Because ASIO fails, just shutdown RO. Later all reading \
+ will be re-dispatched RADOS layer" << ec.message() << dendl;
}