git.apps.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
tool: break down receive_message implementation
author shangdehao1 <dehao.shang@intel.com>
Sun, 20 Jan 2019 18:19:04 +0000 (02:19 +0800)
committer Yuan Zhou <yuan.zhou@intel.com>
Thu, 21 Mar 2019 16:16:27 +0000 (00:16 +0800)
Also, enable the fault path on the CacheClient side.

Signed-off-by: Dehao Shang <dehao.shang@intel.com>
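
The patch splits the old nested-lambda read loop into a chain of named handlers: receive_message() calls read_reply_header(), whose completion handler handle_reply_header() issues read_reply_data(), whose handler handle_reply_data() finally calls process(). Each step is scheduled through boost::bind with the Asio placeholders. The sketch below shows the same handler-chaining pattern in a minimal, self-contained form; Reader, MsgHeader and the other names are illustrative stand-ins rather than Ceph code: a fixed-size header carrying the payload length is read first, and its completion handler issues the second async_read for the body.

// Minimal sketch of the chained async_read pattern adopted here
// (illustrative names; not Ceph code).
#include <boost/asio.hpp>
#include <boost/bind.hpp>
#include <cstdint>
#include <cstring>
#include <iostream>
#include <string>
#include <vector>

namespace asio = boost::asio;
using asio::local::stream_protocol;

struct MsgHeader {
  uint64_t seq;
  uint64_t data_len;
};

class Reader {
 public:
  explicit Reader(stream_protocol::socket& socket) : m_socket(socket) {}

  void receive_message() { read_reply_header(); }

 private:
  void read_reply_header() {
    // Step 1: read exactly one fixed-size header.
    asio::async_read(m_socket,
        asio::buffer(&m_header, sizeof(m_header)),
        asio::transfer_exactly(sizeof(m_header)),
        boost::bind(&Reader::handle_reply_header, this,
                    asio::placeholders::error,
                    asio::placeholders::bytes_transferred));
  }

  void handle_reply_header(const boost::system::error_code& ec, size_t bytes) {
    if (ec || bytes != sizeof(m_header)) {
      return;  // fault path elided
    }
    m_data.resize(m_header.data_len);
    read_reply_data();
  }

  void read_reply_data() {
    // Step 2: read the payload whose length came from the header.
    asio::async_read(m_socket,
        asio::buffer(m_data.data(), m_data.size()),
        asio::transfer_exactly(m_data.size()),
        boost::bind(&Reader::handle_reply_data, this,
                    asio::placeholders::error,
                    asio::placeholders::bytes_transferred));
  }

  void handle_reply_data(const boost::system::error_code& ec, size_t bytes) {
    if (ec || bytes != m_data.size()) {
      return;  // fault path elided
    }
    std::cout << "seq " << m_header.seq << ": "
              << std::string(m_data.begin(), m_data.end()) << std::endl;
  }

  stream_protocol::socket& m_socket;
  MsgHeader m_header{};
  std::vector<char> m_data;
};

int main() {
  asio::io_service io;
  stream_protocol::socket client(io), server(io);
  asio::local::connect_pair(client, server);

  Reader reader(client);
  reader.receive_message();

  // Feed one framed message into the peer end of the socket pair.
  const std::string payload = "hello";
  MsgHeader header{1, payload.size()};
  std::vector<char> wire(sizeof(header) + payload.size());
  std::memcpy(wire.data(), &header, sizeof(header));
  std::memcpy(wire.data() + sizeof(header), payload.data(), payload.size());
  asio::write(server, asio::buffer(wire));

  io.run();
  return 0;
}

The fault paths are elided in the sketch; in CacheClient each handler calls fault(), which the second half of this patch turns into a real shutdown and RADOS re-dispatch.
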
src/tools/immutable_object_cache/CacheClient.cc
src/tools/immutable_object_cache/CacheClient.h

index 8017afb3289eb36e9ebcbfd6260ac8e5c66d84fd..de373f93d968d2a4aac4679d59efe63e9726be27 100644 (file)
@@ -18,12 +18,11 @@ namespace immutable_obj_cache {
       m_io_thread(nullptr), m_session_work(false), m_writing(false),
       m_reading(false), m_sequence_id(0),
       m_lock("ceph::cache::cacheclient::m_lock"),
-      m_map_lock("ceph::cache::cacheclient::m_map_lock"),
       m_header_buffer(new char[sizeof(ObjectCacheMsgHeader)])
   {
     // TODO : release these resources.
-    m_use_dedicated_worker = true;
     // TODO : configure it.
+    m_use_dedicated_worker = true;
     m_worker_thread_num = 2;
     if(m_use_dedicated_worker) {
       m_worker = new boost::asio::io_service();
@@ -107,24 +106,21 @@ namespace immutable_obj_cache {
     req->m_head.version = 0;
     req->m_head.reserved = 0;
     req->m_head.type = RBDSC_READ;
+    req->m_head.padding = 0;
+    req->m_head.seq = ++m_sequence_id;
+
     req->m_data.m_pool_name = pool_name;
     req->m_data.m_oid = oid;
     req->m_process_msg = on_finish;
-
-    req->m_head.seq = m_sequence_id++;
     req->encode();
 
     ceph_assert(req->get_head_buffer().length() == sizeof(ObjectCacheMsgHeader));
     ceph_assert(req->get_data_buffer().length() == req->m_head.data_len);
 
-    {
+    {
       Mutex::Locker locker(m_lock);
       m_outcoming_bl.append(req->get_head_buffer());
       m_outcoming_bl.append(req->get_data_buffer());
-    }
-
-    {
-      Mutex::Locker locker(m_map_lock);
       ceph_assert(m_seq_to_req.find(req->m_head.seq) == m_seq_to_req.end());
       m_seq_to_req[req->m_head.seq] = req;
     }
@@ -183,94 +179,124 @@ namespace immutable_obj_cache {
     }
   }
 
-  //TODO(): split into smaller functions
   void CacheClient::receive_message() {
     ceph_assert(m_reading.load());
+    read_reply_header();
+  }
+
+  void CacheClient::read_reply_header() {
 
     /* one head buffer for all arrived reply. */
     // bufferptr bp_head(buffer::create_static(sizeof(ObjectCacheMsgHeader), m_header_buffer));
 
     /* create new head buffer for every reply */
     bufferptr bp_head(buffer::create(sizeof(ObjectCacheMsgHeader)));
+    auto raw_ptr = bp_head.c_str();
 
     boost::asio::async_read(m_dm_socket,
-      boost::asio::buffer(bp_head.c_str(), sizeof(ObjectCacheMsgHeader)),
+      boost::asio::buffer(raw_ptr, sizeof(ObjectCacheMsgHeader)),
       boost::asio::transfer_exactly(sizeof(ObjectCacheMsgHeader)),
-      [this, bp_head](const boost::system::error_code& err, size_t cb) {
-        if(err || cb != sizeof(ObjectCacheMsgHeader)) {
-          fault(ASIO_ERROR_READ, err);
-          return;
-        }
+      boost::bind(&CacheClient::handle_reply_header,
+                  this, bp_head,
+                  boost::asio::placeholders::error,
+                  boost::asio::placeholders::bytes_transferred));
+  }
+
+  void CacheClient::handle_reply_header(bufferptr bp_head,
+                                        const boost::system::error_code& ec,
+                                        size_t bytes_transferred) {
+    if(ec || bytes_transferred != sizeof(ObjectCacheMsgHeader)) {
+      fault(ASIO_ERROR_READ, ec);
+      return;
+    }
+
+    ceph_assert(bytes_transferred == bp_head.length());
+
+    ObjectCacheMsgHeader* head = (ObjectCacheMsgHeader*)bp_head.c_str();
+    uint64_t data_len = head->data_len;
+    uint64_t seq_id = head->seq;
+    ceph_assert(m_seq_to_req.find(seq_id) != m_seq_to_req.end());
+
+    bufferptr bp_data(buffer::create(data_len));
+    read_reply_data(std::move(bp_head), std::move(bp_data), data_len, seq_id);
+  }
+
+  void CacheClient::read_reply_data(bufferptr&& bp_head, bufferptr&& bp_data,
+                                    const uint64_t data_len, const uint64_t seq_id) {
+
+    auto raw_ptr = bp_data.c_str();
+    boost::asio::async_read(m_dm_socket, boost::asio::buffer(raw_ptr, data_len),
+      boost::asio::transfer_exactly(data_len),
+      boost::bind(&CacheClient::handle_reply_data,
+                  this, std::move(bp_head), std::move(bp_data), data_len, seq_id,
+                  boost::asio::placeholders::error,
+                  boost::asio::placeholders::bytes_transferred));
+
+  }
+
+  void CacheClient::handle_reply_data(bufferptr bp_head, bufferptr bp_data,
+                                      const uint64_t data_len, const uint64_t seq_id,
+                                      const boost::system::error_code& ec,
+                                      size_t bytes_transferred) {
+    if (ec || bytes_transferred != data_len) {
+      fault(ASIO_ERROR_READ, ec);
+      return;
+    }
+
+    bufferlist head_buffer;
+    bufferlist data_buffer;
+    head_buffer.append(std::move(bp_head));
+    data_buffer.append(std::move(bp_data));
+    ceph_assert(head_buffer.length() == sizeof(ObjectCacheMsgHeader));
+    ceph_assert(data_buffer.length() == data_len);
+
+    ObjectCacheRequest* reply = decode_object_cache_request(head_buffer, data_buffer);
+    data_buffer.clear();
+    ceph_assert(data_buffer.length() == 0);
+
+    process(reply, seq_id);
+
+    {
+      Mutex::Locker locker(m_lock);
+      if(m_seq_to_req.size() == 0 && m_outcoming_bl.length()) {
+        m_reading.store(false);
+        return;
+      }
+    }
+    if(is_session_work()) {
+      receive_message();
+    }
 
-        ObjectCacheMsgHeader* head = (ObjectCacheMsgHeader*)bp_head.c_str();
-        uint64_t data_len = head->data_len;
-        uint64_t seq_id = head->seq;
-        bufferptr bp_data(buffer::create(data_len));
-
-        boost::asio::async_read(m_dm_socket,
-          boost::asio::buffer(bp_data.c_str(), data_len),
-          boost::asio::transfer_exactly(data_len),
-          [this, bp_data, bp_head, data_len, seq_id](const boost::system::error_code& err, size_t cb) {
-            if(err || cb != data_len) {
-              fault(ASIO_ERROR_READ, err);
-              return;
-            }
-
-            bufferlist head_buffer;
-            head_buffer.append(std::move(bp_head));
-            bufferlist data_buffer;
-            data_buffer.append(std::move(bp_data));
-
-            ceph_assert(head_buffer.length() == sizeof(ObjectCacheMsgHeader));
-            ceph_assert(data_buffer.length() == data_len);
-
-            // create reply message which have been decoded according to bufferlist
-            ObjectCacheRequest* ack = decode_object_cache_request(head_buffer, data_buffer);
-
-            data_buffer.clear();
-            ceph_assert(data_buffer.length() == 0);
-
-            auto process_current_reply = m_seq_to_req[ack->m_head.seq]->m_process_msg;
-
-            // if hit, this context will read file from local cache.
-            auto user_ctx = new FunctionContext([this, ack, process_current_reply]
-               (bool dedicated) {
-              if(dedicated) {
-                // current thread belog to worker.
-              }
-
-              process_current_reply->complete(ack);
-              delete ack;
-            });
-
-            // Because user_ctx will read file and execute their callback, we think it hold on current thread
-            // for long time, then defer read/write message from/to socket.
-            // if want to use dedicated thread to execute this context, enable it.
-            if(m_use_dedicated_worker) {
-              m_worker->post([user_ctx](){
-                user_ctx->complete(true);
-              });
-            } else {
-              // use read/write thread to execute this context.
-              user_ctx->complete(false);
-            }
-
-            {
-              Mutex::Locker locker(m_map_lock);
-              ceph_assert(m_seq_to_req.find(seq_id) != m_seq_to_req.end());
-              delete m_seq_to_req[seq_id];
-              m_seq_to_req.erase(seq_id);
-              if(m_seq_to_req.size() == 0) {
-                m_reading.store(false);
-                return;
-              }
-            }
-
-            receive_message();
-        });
-    });
   }
 
+  void CacheClient::process(ObjectCacheRequest* reply, uint64_t seq_id) {
+    ObjectCacheRequest* current_request = nullptr;
+    {
+      Mutex::Locker locker(m_lock);
+      ceph_assert(m_seq_to_req.find(seq_id) != m_seq_to_req.end());
+      current_request = m_seq_to_req[seq_id];
+      m_seq_to_req.erase(seq_id);
+    }
+
+    ceph_assert(current_request != nullptr);
+    auto process_reply = new FunctionContext([this, current_request, reply]
+      (bool dedicated) {
+       if(dedicated) {
+         // running on a dedicated worker thread.
+       }
+       current_request->m_process_msg->complete(reply);
+       delete current_request;
+       delete reply;
+    });
+
+    if(m_use_dedicated_worker) {
+      m_worker->post([process_reply]() {
+        process_reply->complete(true);
+      });
+    } else {
+      process_reply->complete(false);
+    }
+  }
 
   void CacheClient::fault(const int err_type, const boost::system::error_code& ec) {
     ldout(cct, 20) << "fault." << ec.message() << dendl;
@@ -297,7 +323,7 @@ namespace immutable_obj_cache {
 
     if(err_type == ASIO_ERROR_WRITE) {
        ldout(cct, 20) << "ASIO asyn write fails : " << ec.message() << dendl;
-       // should not occur this error.
+       // CacheClient should never hit this error.
        ceph_assert(0);
     }
 
@@ -305,13 +331,12 @@ namespace immutable_obj_cache {
        ldout(cct, 20) << "ASIO async connect fails : " << ec.message() << dendl;
     }
 
-    // TODO : currently, for any asio error, just shutdown RO.
-    // TODO : re-write close / shutdown/
-    //close();
+    // Currently, for any ASIO error, just shut down the RO session.
+    close();
 
     // all pending request, which have entered into ASIO, will be re-dispatched to RADOS.
     {
-      Mutex::Locker locker(m_map_lock);
+      Mutex::Locker locker(m_lock);
       for(auto it : m_seq_to_req) {
         it.second->m_head.type = RBDSC_READ_RADOS;
         it.second->m_process_msg->complete(it.second);
@@ -319,7 +344,8 @@ namespace immutable_obj_cache {
       m_seq_to_req.clear();
     }
 
-    //m_outcoming_bl.clear();
+    ldout(cct, 20) << "ASIO failure, shutting down RO session; all subsequent "
+                   << "reads will be re-dispatched to the RADOS layer: "
+                   << ec.message() << dendl;
   }
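
With close() re-enabled above, any Asio error now tears down the session and drains the pending map: each request still waiting for a reply is retagged RBDSC_READ_RADOS and its callback is completed, so the read falls back to RADOS. The following is a small self-contained sketch of that drain loop; PendingRead, ReadSource and the callback shape are hypothetical stand-ins for ObjectCacheRequest and its m_process_msg context.

// Sketch of the fault() fallback: every request still pending in the
// sequence map is marked "read from RADOS", completed exactly once,
// and then dropped from the map. Types and names are hypothetical.
#include <cstdint>
#include <functional>
#include <iostream>
#include <map>

enum class ReadSource { LocalCache, Rados };

struct PendingRead {
  uint64_t seq;
  ReadSource source;
  std::function<void(uint64_t, ReadSource)> on_finish;
};

int main() {
  std::map<uint64_t, PendingRead*> seq_to_req;

  // Shared completion callback standing in for m_process_msg.
  auto on_finish = [](uint64_t seq, ReadSource source) {
    std::cout << "request " << seq << " re-dispatched to "
              << (source == ReadSource::Rados ? "RADOS" : "the local cache")
              << std::endl;
  };

  for (uint64_t seq = 1; seq <= 3; ++seq) {
    seq_to_req[seq] = new PendingRead{seq, ReadSource::LocalCache, on_finish};
  }

  // fault(): the socket is gone, so every pending read is marked for RADOS,
  // completed, and removed so nothing is completed twice.
  for (auto& it : seq_to_req) {
    it.second->source = ReadSource::Rados;
    it.second->on_finish(it.second->seq, it.second->source);
    delete it.second;
  }
  seq_to_req.clear();
  return 0;
}
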
 
 
index 8cb7f320e32b71048a314b2a8276e582e4d38272..a30ce155847202cf1ecfba25cf5e8a10c8fdc625 100644 (file)
@@ -23,24 +23,35 @@ namespace immutable_obj_cache {
 
 class CacheClient {
 public:
+
   CacheClient(const std::string& file, CephContext* ceph_ctx);
   ~CacheClient();
   void run();
   bool is_session_work();
-
   void close();
   int stop();
   int connect();
-
   void lookup_object(std::string pool_name, std::string oid, GenContext<ObjectCacheRequest*>* on_finish);
+  int register_client(Context* on_finish);
+
+private:
+
   void send_message();
   void try_send();
   void fault(const int err_type, const boost::system::error_code& err);
   void try_receive();
   void receive_message();
-  int register_client(Context* on_finish);
-
+  void process(ObjectCacheRequest* reply, uint64_t seq_id);
+  void read_reply_header();
+  void handle_reply_header(bufferptr bp_head,
+                           const boost::system::error_code& ec, size_t bytes_transferred);
+  void read_reply_data(bufferptr&& bp_head, bufferptr&& bp_data,
+                       const uint64_t data_len, const uint64_t seq_id);
+  void handle_reply_data(bufferptr bp_head, bufferptr bp_data,
+                         const uint64_t data_len, const uint64_t seq_id,
+                         const boost::system::error_code& ec, size_t bytes_transferred);
 private:
+
   CephContext* cct;
   boost::asio::io_service m_io_service;
   boost::asio::io_service::work m_io_service_work;
@@ -59,7 +70,6 @@ private:
   std::atomic<bool> m_reading;
   std::atomic<uint64_t> m_sequence_id;
   Mutex m_lock;
-  Mutex m_map_lock;
   std::map<uint64_t, ObjectCacheRequest*> m_seq_to_req;
   bufferlist m_outcoming_bl;
   char* m_header_buffer;
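
One behaviour the refactor keeps from the old lambda body is the hand-off in process(): when m_use_dedicated_worker is set, the reply callback is posted to a separate boost::asio::io_service served by worker threads, so a callback that reads a cached file from disk cannot stall the socket's read/write thread. The sketch below shows that hand-off pattern in isolation; the names and the std::thread pool are illustrative, not the CacheClient code.

// Sketch of the dedicated-worker hand-off used by process(): the reply
// callback either runs inline on the caller's thread or is posted to a
// second io_service drained by dedicated worker threads.
#include <boost/asio.hpp>
#include <iostream>
#include <memory>
#include <thread>
#include <vector>

int main() {
  const bool use_dedicated_worker = true;
  const int worker_thread_num = 2;

  boost::asio::io_service worker;
  // Keep run() from returning while the queue is momentarily empty.
  auto keep_alive = std::make_unique<boost::asio::io_service::work>(worker);

  std::vector<std::thread> worker_threads;
  for (int i = 0; i < worker_thread_num; ++i) {
    worker_threads.emplace_back([&worker] { worker.run(); });
  }

  // Stand-in for the FunctionContext built in process().
  auto process_reply = [](bool dedicated) {
    std::cout << "reply completed on "
              << (dedicated ? "worker thread" : "socket thread") << std::endl;
  };

  if (use_dedicated_worker) {
    worker.post([process_reply] { process_reply(true); });
  } else {
    process_reply(false);
  }

  keep_alive.reset();  // let run() return once queued completions drain
  for (auto& t : worker_threads) {
    t.join();
  }
  return 0;
}

Resetting the work guard (rather than stopping the io_service) lets the worker threads finish any queued completions before they exit.
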