]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
tools: refactor asio domain socket of RO
authorshangdehao1 <dehao.shang@intel.com>
Sun, 13 Jan 2019 22:46:19 +0000 (06:46 +0800)
committerYuan Zhou <yuan.zhou@intel.com>
Thu, 21 Mar 2019 16:16:27 +0000 (00:16 +0800)
Signed-off-by: Dehao Shang <dehao.shang@intel.com>
Signed-off-by: Yuan Zhou <yuan.zhou@intel.com>
14 files changed:
src/test/immutable_object_cache/CMakeLists.txt
src/test/immutable_object_cache/test_message.cc
src/tools/immutable_object_cache/CacheClient.cc
src/tools/immutable_object_cache/CacheClient.h
src/tools/immutable_object_cache/CacheController.cc
src/tools/immutable_object_cache/CacheController.h
src/tools/immutable_object_cache/CacheServer.cc
src/tools/immutable_object_cache/CacheServer.h
src/tools/immutable_object_cache/CacheSession.cc
src/tools/immutable_object_cache/CacheSession.h
src/tools/immutable_object_cache/ObjectCacheFile.cc
src/tools/immutable_object_cache/SocketCommon.h
src/tools/immutable_object_cache/Types.cc
src/tools/immutable_object_cache/Types.h

index aade49afba88bfb5862e1518e27ab8fe8917d472..fa2601da49be503906dc21c3d11c142b38eaf2d7 100644 (file)
@@ -3,8 +3,8 @@ add_executable(unittest_ceph_immutable_obj_cache
   test_main.cc
   test_SimplePolicy.cc
   test_sync_file.cc
-  test_DomainSocket.cc
-  test_multi_session.cc
+  #test_DomainSocket.cc // TODO
+  #test_multi_session.cc // TODO
   test_object_store.cc
   test_message.cc
   )
index 4d00007e6ff45bb290e282ec5e2f5fb2a1df8323..c8360ec0f5c147e96e2a340cf3261d3b6bb23f1c 100644 (file)
@@ -13,51 +13,46 @@ TEST(test_for_message, test_1)
   header.seq = 1;
   header.type = 2;
   header.version =3;
-  header.mid_len =4;
   header.data_len = 5;
   header.reserved = 6;
 
   ObjectCacheRequest req;
 
   ASSERT_EQ(req.m_head_buffer.length(), 0);
-  ASSERT_EQ(req.m_mid_buffer.length(), 0);
   ASSERT_EQ(req.m_data_buffer.length(), 0);
 
   req.m_head = header;
 
-  req.m_mid.m_image_size = 111111;
-  req.m_mid.m_read_offset = 222222;
-  req.m_mid.m_read_len = 333333;
-  req.m_mid.m_pool_name = pool_name;
-  req.m_mid.m_image_name = image_name;
-  req.m_mid.m_oid = oid_name;
+  req.m_data.m_image_size = 111111;
+  req.m_data.m_read_offset = 222222;
+  req.m_data.m_read_len = 333333;
+  req.m_data.m_pool_name = pool_name;
+  req.m_data.m_image_name = image_name;
+  req.m_data.m_oid = oid_name;
 
   req.encode();
 
   ASSERT_EQ(req.m_head_buffer.length(), sizeof(req.m_head));
-  ASSERT_EQ(req.m_data_buffer.length(), 0);
 
 
   ObjectCacheRequest* req_decode;
 
   auto x = req.get_head_buffer();
-  auto y = req.get_mid_buffer();
   auto z = req.get_data_buffer();
 
-  req_decode = decode_object_cache_request(x, y, z);
+  req_decode = decode_object_cache_request(x, z);
 
   ASSERT_EQ(req_decode->m_head.seq, header.seq);
   ASSERT_EQ(req_decode->m_head.type, header.type);
   ASSERT_EQ(req_decode->m_head.version, header.version);
-  ASSERT_EQ(req_decode->m_head.mid_len, req.m_mid_buffer.length());
   ASSERT_EQ(req_decode->m_head.data_len, req.m_data_buffer.length());
   ASSERT_EQ(req_decode->m_head.reserved, header.reserved);
 
 
-  ASSERT_EQ(req_decode->m_mid.m_image_size, 111111);
-  ASSERT_EQ(req_decode->m_mid.m_read_offset, 222222);
-  ASSERT_EQ(req_decode->m_mid.m_read_len, 333333);
-  ASSERT_EQ(req_decode->m_mid.m_pool_name, pool_name);
-  ASSERT_EQ(req_decode->m_mid.m_image_name, image_name);
-  ASSERT_EQ(req_decode->m_mid.m_oid, oid_name);
+  ASSERT_EQ(req_decode->m_data.m_image_size, 111111);
+  ASSERT_EQ(req_decode->m_data.m_read_offset, 222222);
+  ASSERT_EQ(req_decode->m_data.m_read_len, 333333);
+  ASSERT_EQ(req_decode->m_data.m_pool_name, pool_name);
+  ASSERT_EQ(req_decode->m_data.m_image_name, image_name);
+  ASSERT_EQ(req_decode->m_data.m_oid, oid_name);
 }
index 2e7609007f30c20fcd915f6917027a138046a8bc..648102d4a0a91e2e8994c61b7d96e5ab116e9a97 100644 (file)
@@ -18,11 +18,30 @@ namespace immutable_obj_cache {
       m_ep(stream_protocol::endpoint(file)),
       m_io_thread(nullptr),
       m_session_work(false),
+      m_writing(false),
+      m_lock("ceph::cache::cacheclient::m_lock"),
+      m_map_lock("ceph::cache::cacheclient::m_map_lock"),
+      m_sequence_id(0),
+      m_header_buffer(new char[sizeof(ObjectCacheMsgHeader)]),
       cct(ceph_ctx)
-  {}
+  {
+    // TODO : release these resources.
+    m_use_dedicated_worker = true;
+    // TODO : configure it.
+    m_worker_thread_num = 2;
+    if(m_use_dedicated_worker) {
+      m_worker = new boost::asio::io_service();
+      m_worker_io_service_work = new boost::asio::io_service::work(*m_worker);
+      for(int i = 0; i < m_worker_thread_num; i++) {
+        std::thread* thd = new std::thread([this](){m_worker->run();});
+        m_worker_threads.push_back(thd);
+      }
+    }
+  }
 
   CacheClient::~CacheClient() {
     stop();
+    delete m_header_buffer;
   }
 
   void CacheClient::run(){
@@ -85,147 +104,274 @@ namespace immutable_obj_cache {
     return 0;
   }
 
+  void CacheClient::lookup_object(ObjectCacheRequest* req) {
+
+    req->m_head.seq = m_sequence_id++;
+    req->encode();
+
+    ceph_assert(req->get_head_buffer().length() == sizeof(ObjectCacheMsgHeader));
+    ceph_assert(req->get_data_buffer().length() == req->m_head.data_len);
+
+    {
+      Mutex::Locker locker(m_lock);
+      m_outcoming_bl.append(req->get_head_buffer());
+      m_outcoming_bl.append(req->get_data_buffer());
+    }
+
+    {
+      Mutex::Locker locker(m_map_lock);
+      ceph_assert(m_seq_to_req.find(req->m_head.seq) == m_seq_to_req.end());
+      m_seq_to_req[req->m_head.seq] = req;
+    }
+
+    // try to send message to server.
+    try_send();
+
+    // try to receive ack from server.
+    try_receive();
+  }
+
+  void CacheClient::try_send() {
+    if(!m_writing.load()) {
+      m_writing.store(true);
+      send_message();
+    }
+  }
+
+  void CacheClient::send_message() {
+    bufferlist bl;
+    {
+      Mutex::Locker locker(m_lock);
+      bl.swap(m_outcoming_bl);
+      ceph_assert(m_outcoming_bl.length() == 0);
+    }
+
+    // send bytes as many as possible.
+    boost::asio::async_write(m_dm_socket,
+        boost::asio::buffer(bl.c_str(), bl.length()),
+        boost::asio::transfer_exactly(bl.length()),
+        [this, bl](const boost::system::error_code& err, size_t cb) {
+        if(err || cb != bl.length()) {
+           fault(ASIO_ERROR_WRITE, err);
+           return;
+        }
+        ceph_assert(cb == bl.length());
+
+        {
+           Mutex::Locker locker(m_lock);
+           if(m_outcoming_bl.length() == 0) {
+             m_writing.store(false);
+             return;
+           }
+        }
+
+        // still have left bytes, continue to send.
+        send_message();
+    });
+    try_receive();
+  }
+
+  void CacheClient::try_receive() {
+    if(!m_reading.load()) {
+      m_reading.store(true);
+      receive_message();
+    }
+  }
+
+  void CacheClient::receive_message() {
+    ceph_assert(m_reading.load());
+
+    /* one head buffer for all arrived reply. */
+    // bufferptr bp_head(buffer::create_static(sizeof(ObjectCacheMsgHeader), m_header_buffer));
+
+    /* create new head buffer for every reply */
+    bufferptr bp_head(buffer::create(sizeof(ObjectCacheMsgHeader)));
+
+    boost::asio::async_read(m_dm_socket,
+      boost::asio::buffer(bp_head.c_str(), sizeof(ObjectCacheMsgHeader)),
+      boost::asio::transfer_exactly(sizeof(ObjectCacheMsgHeader)),
+      [this, bp_head](const boost::system::error_code& err, size_t cb) {
+        if(err || cb != sizeof(ObjectCacheMsgHeader)) {
+          fault(ASIO_ERROR_READ, err);
+          return;
+        }
+
+        ObjectCacheMsgHeader* head = (ObjectCacheMsgHeader*)bp_head.c_str();
+        uint64_t data_len = head->data_len;
+        uint64_t seq_id = head->seq;
+        bufferptr bp_data(buffer::create(data_len));
+
+        boost::asio::async_read(m_dm_socket,
+          boost::asio::buffer(bp_data.c_str(), data_len),
+          boost::asio::transfer_exactly(data_len),
+          [this, bp_data, bp_head, data_len, seq_id](const boost::system::error_code& err, size_t cb) {
+            if(err || cb != data_len) {
+              fault(ASIO_ERROR_READ, err);
+              return;
+            }
+
+            bufferlist head_buffer;
+            head_buffer.append(std::move(bp_head));
+            bufferlist data_buffer;
+            data_buffer.append(std::move(bp_data));
+
+            ceph_assert(head_buffer.length() == sizeof(ObjectCacheMsgHeader));
+            ceph_assert(data_buffer.length() == data_len);
+
+            // create reply message which have been decoded according to bufferlist
+            ObjectCacheRequest* ack = decode_object_cache_request(head_buffer, data_buffer);
+
+            data_buffer.clear();
+            ceph_assert(data_buffer.length() == 0);
+
+            auto process_current_reply = m_seq_to_req[ack->m_head.seq]->m_process_msg;
+
+            // if hit, this context will read file from local cache.
+            auto user_ctx = new FunctionContext([this, ack, process_current_reply]
+               (bool dedicated) {
+              if(dedicated) {
+                // current thread belog to worker.
+              }
+
+              process_current_reply->complete(ack);
+              delete ack;
+            });
+
+            // Because user_ctx will read file and execute their callback, we think it hold on current thread
+            // for long time, then defer read/write message from/to socket.
+            // if want to use dedicated thread to execute this context, enable it.
+            if(m_use_dedicated_worker) {
+              m_worker->post([user_ctx](){
+                user_ctx->complete(true);
+              });
+            } else {
+              // use read/write thread to execute this context.
+              user_ctx->complete(false);
+            }
+
+            {
+              Mutex::Locker locker(m_map_lock);
+              ceph_assert(m_seq_to_req.find(seq_id) != m_seq_to_req.end());
+              m_seq_to_req.erase(seq_id);
+              if(m_seq_to_req.size() == 0) {
+                m_reading.store(false);
+                return;
+              }
+            }
+
+            receive_message();
+        });
+    });
+  }
+
+
+  void CacheClient::fault(const int err_type, const boost::system::error_code& ec) {
+    ldout(cct, 20) << "fault." << ec.message() << dendl;
+    // if one request fails, just call its callback, then close this socket.
+    if(!m_session_work.load()) {
+      return;
+    }
+
+    // when current session don't work, ASIO will don't receive any new request from hook.
+    // On the other hand, for pending request of ASIO, cancle these request, then call their callback.
+    // there request which are cancled by fault, will be re-dispatched to RADOS layer.
+    //
+    // make sure just have one thread to modify execute below code.
+    m_session_work.store(false);
+
+    if(err_type == ASIO_ERROR_MSG_INCOMPLETE) {
+       ldout(cct, 20) << "ASIO In-complete message." << ec.message() << dendl;
+       ceph_assert(0);
+    }
+
+    if(err_type == ASIO_ERROR_READ) {
+       ldout(cct, 20) << "ASIO async read fails : " << ec.message() << dendl;
+    }
+
+    if(err_type == ASIO_ERROR_WRITE) {
+       ldout(cct, 20) << "ASIO asyn write fails : " << ec.message() << dendl;
+       // should not occur this error.
+       ceph_assert(0);
+    }
+
+    if(err_type == ASIO_ERROR_CONNECT) {
+       ldout(cct, 20) << "ASIO async connect fails : " << ec.message() << dendl;
+    }
+
+    // TODO : currently, for any asio error, just shutdown RO.
+    // TODO : re-write close / shutdown/
+    //close();
+
+    // all pending request, which have entered into ASIO, will be re-dispatched to RADOS.
+    {
+      Mutex::Locker locker(m_map_lock);
+      for(auto it : m_seq_to_req) {
+        it.second->m_head.type = RBDSC_READ_RADOS;
+        it.second->m_process_msg->complete(it.second);
+      }
+      m_seq_to_req.clear();
+    }
+
+    //m_outcoming_bl.clear();
+  }
+
+
+  // TODO : use async + wait_event
+  // TODO : accept one parameter : ObjectCacheRequest
   int CacheClient::register_client(Context* on_finish) {
-    // cache controller will init layout
-    rbdsc_req_type_t *message = new rbdsc_req_type_t();
-    message->type = RBDSC_REGISTER;
+    ObjectCacheRequest* message = new ObjectCacheRequest();
+    message->m_head.version = 0;
+    message->m_head.seq = m_sequence_id++;
+    message->m_head.type = RBDSC_REGISTER;
+    message->m_head.reserved = 0;
+    message->encode();
+
+    bufferlist bl;
+    bl.append(message->get_head_buffer());
+    bl.append(message->get_data_buffer());
 
     uint64_t ret;
     boost::system::error_code ec;
 
     ret = boost::asio::write(m_dm_socket,
-      boost::asio::buffer((char*)message, message->size()), ec);
+      boost::asio::buffer(bl.c_str(), bl.length()), ec);
 
-    if(ec) {
-      ldout(cct, 20) << "write fails : " << ec.message() << dendl;
+    if(ec || ret != bl.length()) {
+      fault(ASIO_ERROR_WRITE, ec);
       return -1;
     }
 
-    if(ret != message->size()) {
-      ldout(cct, 20) << "write fails : ret != send_bytes " << dendl;
-      return -1;
-    }
-
-    // hard code TODO
     ret = boost::asio::read(m_dm_socket,
-      boost::asio::buffer(m_recv_buffer, RBDSC_MSG_LEN), ec);
-
-    if(ec == boost::asio::error::eof) {
-      ldout(cct, 20) << "recv eof" << dendl;
+      boost::asio::buffer(m_header_buffer, sizeof(ObjectCacheMsgHeader)), ec);
+    if(ec || ret != sizeof(ObjectCacheMsgHeader)) {
+      fault(ASIO_ERROR_READ, ec);
       return -1;
     }
 
-    if(ec) {
-      ldout(cct, 20) << "write fails : " << ec.message() << dendl;
-      return -1;
-    }
+    ObjectCacheMsgHeader* head = (ObjectCacheMsgHeader*)m_header_buffer;
+    uint64_t data_len = head->data_len;
+    bufferptr bp_data(buffer::create(data_len));
 
-    if(ret != RBDSC_MSG_LEN) {
-      ldout(cct, 20) << "write fails : ret != receive bytes " << dendl;
+    ret = boost::asio::read(m_dm_socket, boost::asio::buffer(bp_data.c_str(), data_len), ec);
+    if(ec || ret != data_len) {
+      fault(ASIO_ERROR_READ, ec);
       return -1;
     }
 
-    std::string reply_msg(m_recv_buffer, ret);
-    rbdsc_req_type_t *io_ctx = (rbdsc_req_type_t*)(reply_msg.c_str());
-
-    if (io_ctx->type == RBDSC_REGISTER_REPLY) {
+    bufferlist data_buffer;
+    data_buffer.append(std::move(bp_data));
+    ObjectCacheRequest* req = decode_object_cache_request(head, data_buffer);
+    if (req->m_head.type == RBDSC_REGISTER_REPLY) {
       on_finish->complete(true);
     } else {
       on_finish->complete(false);
     }
 
-    delete message;
-
-    ldout(cct, 20) << "register volume success" << dendl;
-
-    // TODO
+    delete req;
     m_session_work.store(true);
 
     return 0;
   }
 
-  // if occur any error, we just return false. Then read from rados.
-  int CacheClient::lookup_object(std::string pool_name, std::string object_id,
-                                 Context* on_finish) {
-    rbdsc_req_type_t *message = new rbdsc_req_type_t();
-    message->type = RBDSC_READ;
-    memcpy(message->pool_name, pool_name.c_str(), pool_name.size());
-    memcpy(message->oid, object_id.c_str(), object_id.size());
-    message->vol_size = 0;
-    message->offset = 0;
-    message->length = 0;
-
-    boost::asio::async_write(m_dm_socket,
-      boost::asio::buffer((char*)message, message->size()),
-      boost::asio::transfer_exactly(RBDSC_MSG_LEN),
-      [this, on_finish, message](const boost::system::error_code& err, size_t cb) {
-          delete message;
-          if(err) {
-            ldout(cct, 20) << "async_write failed"
-                           << err.message() << dendl;
-            close();
-            on_finish->complete(false);
-            return;
-          }
-          if(cb != RBDSC_MSG_LEN) {
-            ldout(cct, 20) << "async_write failed in-complete request" << dendl;
-            close();
-            on_finish->complete(false);
-            return;
-          }
-          get_result(on_finish);
-    });
-
-    return 0;
-  }
-
-  void CacheClient::get_result(Context* on_finish) {
-    char* lookup_result = new char[RBDSC_MSG_LEN + 1];
-    boost::asio::async_read(m_dm_socket,
-      boost::asio::buffer(lookup_result, RBDSC_MSG_LEN),
-      boost::asio::transfer_exactly(RBDSC_MSG_LEN),
-      [this, lookup_result, on_finish](const boost::system::error_code& err,
-                                       size_t cb) {
-          if(err == boost::asio::error::eof ||
-            err == boost::asio::error::connection_reset ||
-            err == boost::asio::error::operation_aborted ||
-            err == boost::asio::error::bad_descriptor) {
-            ldout(cct, 20) << "fail to read lookup result"
-                           << err.message() << dendl;
-            close();
-            on_finish->complete(false);
-            delete lookup_result;
-            return;
-          }
-
-          if(err) {
-            ldout(cct, 1) << "fail to read lookup result"
-                          << err.message() << dendl;
-            close();
-            on_finish->complete(false);
-            delete lookup_result;
-            return;
-          }
-
-          if (cb != RBDSC_MSG_LEN) {
-            ldout(cct, 1) << "incomplete lookup result" << dendl;
-            close();
-            on_finish->complete(false);
-            delete lookup_result;
-            return;
-          }
-
-         rbdsc_req_type_t *io_ctx = (rbdsc_req_type_t*)(lookup_result);
-
-          if (io_ctx->type == RBDSC_READ_REPLY) {
-           on_finish->complete(true);
-          } else {
-           on_finish->complete(false);
-          }
-          delete lookup_result;
-          return;
-    });
-  }
-
 } // namespace immutable_obj_cache
 } // namespace ceph
index 8bf76dd4909e36bbf0c81475c90aa5405bf926ce..7ecf7c851b4987cc98804be32379ef04bd6b3728 100644 (file)
@@ -11,6 +11,8 @@
 #include <boost/algorithm/string.hpp>
 #include "include/ceph_assert.h"
 #include "include/Context.h"
+#include "common/Mutex.h"
+#include "Types.h"
 #include "SocketCommon.h"
 
 
@@ -30,9 +32,13 @@ public:
   int stop();
   int connect();
 
+  void lookup_object(ObjectCacheRequest* req);
+  void send_message();
+  void try_send();
+  void fault(const int err_type, const boost::system::error_code& err);
+  void try_receive();
+  void receive_message();
   int register_client(Context* on_finish);
-  int lookup_object(std::string pool_name, std::string object_id, Context* on_finish);
-  void get_result(Context* on_finish);
 
 private:
   boost::asio::io_service m_io_service;
@@ -40,14 +46,24 @@ private:
   stream_protocol::socket m_dm_socket;
   ClientProcessMsg m_client_process_msg;
   stream_protocol::endpoint m_ep;
-  char m_recv_buffer[1024];
   std::shared_ptr<std::thread> m_io_thread;
-
-  // atomic modfiy for this variable.
-  // thread 1 : asio callback thread modify it.
-  // thread 2 : librbd read it.
   std::atomic<bool> m_session_work;
   CephContext* cct;
+
+  bool m_use_dedicated_worker;
+  int m_worker_thread_num;
+  boost::asio::io_service* m_worker;
+  std::vector<std::thread*> m_worker_threads;
+  boost::asio::io_service::work* m_worker_io_service_work;
+
+  char* m_header_buffer;
+  std::atomic<bool> m_writing;
+  std::atomic<bool> m_reading;
+  std::atomic<uint64_t> m_sequence_id;
+  Mutex m_lock;
+  bufferlist m_outcoming_bl;
+  Mutex m_map_lock;
+  std::map<uint64_t, ObjectCacheRequest*> m_seq_to_req;
 };
 
 } // namespace immutable_obj_cache
index 717852eef9166c872713c490dd75c5683f48b42b..b657a101655d09d5f2c0db49df23cc7d3ff0da6a 100644 (file)
@@ -49,7 +49,7 @@ void CacheController::run() {
     std::remove(controller_path.c_str());
 
     m_cache_server = new CacheServer(m_cct, controller_path,
-      ([&](uint64_t p, std::string s){handle_request(p, s);}));
+      ([&](uint64_t p, ObjectCacheRequest* s){handle_request(p, s);}));
 
     int ret = m_cache_server->run();
     if (ret != 0) {
@@ -60,34 +60,32 @@ void CacheController::run() {
   }
 }
 
-void CacheController::handle_request(uint64_t session_id, std::string msg){
+void CacheController::handle_request(uint64_t session_id, ObjectCacheRequest* req){
   ldout(m_cct, 20) << dendl;
 
-  rbdsc_req_type_t *io_ctx = (rbdsc_req_type_t*)(msg.c_str());
-
-  switch (io_ctx->type) {
+  switch (req->m_head.type) {
     case RBDSC_REGISTER: {
       // init cache layout for volume
       m_object_cache_store->init_cache();
-      io_ctx->type = RBDSC_REGISTER_REPLY;
-      m_cache_server->send(session_id, std::string((char*)io_ctx, msg.size()));
+      req->m_head.type = RBDSC_REGISTER_REPLY;
+      m_cache_server->send(session_id, req);
 
       break;
     }
     case RBDSC_READ: {
       // lookup object in local cache store
-      int ret = m_object_cache_store->lookup_object(io_ctx->pool_name, io_ctx->oid);
+      int ret = m_object_cache_store->lookup_object(req->m_data.m_pool_name, req->m_data.m_oid);
       if (ret < 0) {
-        io_ctx->type = RBDSC_READ_RADOS;
+        req->m_head.type = RBDSC_READ_RADOS;
       } else {
-        io_ctx->type = RBDSC_READ_REPLY;
+        req->m_head.type = RBDSC_READ_REPLY;
       }
-      m_cache_server->send(session_id, std::string((char*)io_ctx, msg.size()));
+      m_cache_server->send(session_id, req);
 
       break;
     }
     ldout(m_cct, 5) << "can't recongize request" << dendl;
-    assert(0); // TODO replace it.
+    ceph_assert(0); // TODO replace it.
   }
 }
 
index 427ad5cc80fac4e033e356dc940de7523ed96e05..4d112c027b52f455bbe594288cd4a9a3847d4142 100644 (file)
@@ -25,7 +25,7 @@ class CacheController {
 
   void run();
 
-  void handle_request(uint64_t sesstion_id, std::string msg);
+  void handle_request(uint64_t sesstion_id, ObjectCacheRequest* msg);
 
  private:
   CacheServer *m_cache_server;
index 3e0f4c8a34d5d5c89eaf5846ed8f3f1e1cebb9de..a9f4ba44b518636add0b8539efead3833fe373e8 100644 (file)
@@ -47,18 +47,6 @@ int CacheServer::stop() {
   return 0;
 }
 
-void CacheServer::send(uint64_t session_id, std::string msg) {
-  ldout(cct, 20) << dendl;
-
-  auto it = m_session_map.find(session_id);
-  if (it != m_session_map.end()) {
-    it->second->send(msg);
-  } else {
-    ldout(cct, 20) << "missing reply session id" << dendl;
-    assert(0);
-  }
-}
-
 int CacheServer::start_accept() {
   ldout(cct, 20) << dendl;
 
@@ -86,9 +74,10 @@ int CacheServer::start_accept() {
 }
 
 void CacheServer::accept() {
+  CacheSessionPtr new_session = nullptr;
+
+  new_session.reset(new CacheSession(m_session_id, m_io_service, m_server_process_msg, cct));
 
-  CacheSessionPtr new_session(new CacheSession(m_session_id, m_io_service,
-                                               m_server_process_msg, cct));
   m_acceptor.async_accept(new_session->socket(),
       boost::bind(&CacheServer::handle_accept, this, new_session,
         boost::asio::placeholders::error));
@@ -97,6 +86,7 @@ void CacheServer::accept() {
 void CacheServer::handle_accept(CacheSessionPtr new_session,
                                 const boost::system::error_code& error) {
   ldout(cct, 20) << dendl;
+  std::cout << "new session arrived....." << std::endl;
   if (error) {
     // operation_absort
     lderr(cct) << "async accept fails : " << error.message() << dendl;
@@ -112,5 +102,17 @@ void CacheServer::handle_accept(CacheSessionPtr new_session,
   accept();
 }
 
+void CacheServer::send(uint64_t session_id, ObjectCacheRequest* msg) {
+  ldout(cct, 20) << dendl;
+
+  auto it = m_session_map.find(session_id);
+  if (it != m_session_map.end()) {
+    it->second->send(msg);
+  } else {
+    ldout(cct, 20) << "missing reply session id" << dendl;
+    ceph_assert(0);
+  }
+}
+
 } // namespace immutable_obj_cache
 } // namespace ceph
index a0174451521bec3823e43bac4ce8170330002ab1..4646a15952b6fbf100d4da35bd899eb2a1982f3f 100644 (file)
@@ -7,6 +7,7 @@
 #include <boost/asio.hpp>
 #include <boost/asio/error.hpp>
 
+#include "Types.h"
 #include "SocketCommon.h"
 #include "CacheSession.h"
 
@@ -17,15 +18,14 @@ namespace ceph {
 namespace immutable_obj_cache {
 
 class CacheServer {
-
  public:
   CacheServer(CephContext* cct, const std::string& file, ProcessMsg processmsg);
   ~CacheServer();
 
   int run();
-  void send(uint64_t session_id, std::string msg);
   int start_accept();
   int stop();
+  void send(uint64_t session_id, ObjectCacheRequest* msg);
 
  private:
   void accept();
@@ -33,11 +33,12 @@ class CacheServer {
 
  private:
   CephContext* cct;
-  boost::asio::io_service m_io_service; // TODO wrapper it.
+  boost::asio::io_service m_io_service;
   ProcessMsg m_server_process_msg;
   stream_protocol::endpoint m_local_path;
   stream_protocol::acceptor m_acceptor;
   uint64_t m_session_id = 1;
+  // TODO : need to lock it.
   std::map<uint64_t, CacheSessionPtr> m_session_map;
 };
 
index 417d198a6fbb990f65d86c1949c3223659731fb1..52098f33cc1d2e1190971bf911e886331a172243 100644 (file)
@@ -18,11 +18,13 @@ namespace immutable_obj_cache {
 CacheSession::CacheSession(uint64_t session_id, io_service& io_service,
                            ProcessMsg processmsg, CephContext* cct)
     : m_session_id(session_id), m_dm_socket(io_service),
-      process_msg(processmsg), cct(cct)
+      m_head_buffer(new char[sizeof(ObjectCacheMsgHeader)]),
+      m_server_process_msg(processmsg), cct(cct)
     {}
 
 CacheSession::~CacheSession() {
   close();
+  delete[] m_head_buffer;
 }
 
 stream_protocol::socket& CacheSession::socket() {
@@ -40,73 +42,88 @@ void CacheSession::close() {
 }
 
 void CacheSession::start() {
-  handing_request();
+  read_request_header();
 }
 
-void CacheSession::handing_request() {
+void CacheSession::read_request_header() {
   boost::asio::async_read(m_dm_socket,
-                          boost::asio::buffer(m_buffer, RBDSC_MSG_LEN),
-                          boost::asio::transfer_exactly(RBDSC_MSG_LEN),
-                          boost::bind(&CacheSession::handle_read,
+                          boost::asio::buffer(m_head_buffer, sizeof(ObjectCacheMsgHeader)),
+                          boost::asio::transfer_exactly(sizeof(ObjectCacheMsgHeader)),
+                          boost::bind(&CacheSession::handle_request_header,
                                       shared_from_this(),
                                       boost::asio::placeholders::error,
                                       boost::asio::placeholders::bytes_transferred));
 }
 
-void CacheSession::handle_read(const boost::system::error_code& err,
+void CacheSession::handle_request_header(const boost::system::error_code& err,
                                size_t bytes_transferred) {
-  if (err == boost::asio::error::eof ||
-     err == boost::asio::error::connection_reset ||
-     err == boost::asio::error::operation_aborted ||
-     err == boost::asio::error::bad_descriptor) {
-    ldout(cct, 20) << "fail to handle read : " << err.message() << dendl;
-    close();
+  if(err || bytes_transferred != sizeof(ObjectCacheMsgHeader)) {
+    fault();
     return;
   }
 
-  if(err) {
-    ldout(cct, 1) << "faile to handle read: " << err.message() << dendl;
-    return;
-  }
+  ObjectCacheMsgHeader* head = (ObjectCacheMsgHeader*)(m_head_buffer);
+  ceph_assert(head->version == 0);
+  ceph_assert(head->reserved == 0);
+  ceph_assert(head->type == RBDSC_REGISTER || head->type == RBDSC_READ ||
+              head->type == RBDSC_LOOKUP);
 
-  if(bytes_transferred != RBDSC_MSG_LEN) {
-    ldout(cct, 1) << "incomplete read" << dendl;
-    return;
-  }
-
-  process_msg(m_session_id, std::string(m_buffer, bytes_transferred));
+  read_request_data(head->data_len);
 }
 
-void CacheSession::handle_write(const boost::system::error_code& error,
-                                size_t bytes_transferred) {
-  if (error) {
-    ldout(cct, 20) << "async_write failed: " << error.message() << dendl;
-    assert(0);
-  }
+void CacheSession::read_request_data(uint64_t data_len) {
+  bufferptr bp_data(buffer::create(data_len));
+  boost::asio::async_read(m_dm_socket,
+                          boost::asio::buffer(bp_data.c_str(), bp_data.length()),
+                          boost::asio::transfer_exactly(data_len),
+                          boost::bind(&CacheSession::handle_request_data,
+                                      shared_from_this(), bp_data, data_len,
+                                      boost::asio::placeholders::error,
+                                      boost::asio::placeholders::bytes_transferred));
+}
 
-  if(bytes_transferred != RBDSC_MSG_LEN) {
-    ldout(cct, 20) << "reply in-complete. "<<dendl;
-    assert(0);
+void CacheSession::handle_request_data(bufferptr bp, uint64_t data_len,
+                                      const boost::system::error_code& err,
+                                      size_t bytes_transferred) {
+  if(err || bytes_transferred != data_len) {
+    fault();
+    return;
   }
 
-  boost::asio::async_read(m_dm_socket, boost::asio::buffer(m_buffer),
-                          boost::asio::transfer_exactly(RBDSC_MSG_LEN),
-                          boost::bind(&CacheSession::handle_read,
-                          shared_from_this(),
-                          boost::asio::placeholders::error,
-                          boost::asio::placeholders::bytes_transferred));
+  bufferlist bl_data;
+  bl_data.append(std::move(bp));
+  ObjectCacheRequest* req = decode_object_cache_request(
+                               (ObjectCacheMsgHeader*)m_head_buffer, bl_data);
+  process(req);
+  read_request_header();
+}
 
+void CacheSession::process(ObjectCacheRequest* req) {
+   m_server_process_msg(m_session_id, req);
 }
 
-void CacheSession::send(std::string msg) {
-    boost::asio::async_write(m_dm_socket,
-        boost::asio::buffer(msg.c_str(), msg.size()),
-        boost::asio::transfer_exactly(RBDSC_MSG_LEN),
-        boost::bind(&CacheSession::handle_write,
-                    shared_from_this(),
-                    boost::asio::placeholders::error,
-                    boost::asio::placeholders::bytes_transferred));
+void CacheSession::send(ObjectCacheRequest* reply) {
+  reply->m_head_buffer.clear();
+  reply->m_data_buffer.clear();
+  reply->encode();
+  bufferlist bl;
+  bl.append(reply->get_head_buffer());
+  bl.append(reply->get_data_buffer());
+
+  boost::asio::async_write(m_dm_socket,
+        boost::asio::buffer(bl.c_str(), bl.length()),
+        boost::asio::transfer_exactly(bl.length()),
+        [this, bl, reply](const boost::system::error_code& err, size_t bytes_transferred) {
+          if(err || bytes_transferred != bl.length()) {
+            fault();
+            return;
+          }
+          delete reply;
+        });
+}
 
+void CacheSession::fault() {
+  // TODO
 }
 
 } // namespace immutable_obj_cache
index 989e1181ca29fdc223d94ad3ae537896de502525..c6761d48df7d066d18392d1002669d31daf34b25 100644 (file)
@@ -8,6 +8,7 @@
 #include <boost/asio.hpp>
 #include <boost/asio/error.hpp>
 
+#include "Types.h"
 #include "SocketCommon.h"
 
 using boost::asio::local::stream_protocol;
@@ -18,35 +19,26 @@ namespace immutable_obj_cache {
 
 class CacheSession : public std::enable_shared_from_this<CacheSession> {
 public:
-  CacheSession(uint64_t session_id, io_service& io_service,
-               ProcessMsg processmsg, CephContext* cct);
+  CacheSession(uint64_t session_id, io_service& io_service, ProcessMsg process_msg, CephContext* ctx);
   ~CacheSession();
-
   stream_protocol::socket& socket();
-  void start();
   void close();
-  void handing_request();
-
-private:
-
-  void handle_read(const boost::system::error_code& error,
-                   size_t bytes_transferred);
-
-  void handle_write(const boost::system::error_code& error,
-                    size_t bytes_transferred);
-
-public:
-  void send(std::string msg);
+  void start();
+  void read_request_header();
+  void handle_request_header(const boost::system::error_code& err, size_t bytes_transferred);
+  void read_request_data(uint64_t data_len);
+  void handle_request_data(bufferptr bp, uint64_t data_len,
+                          const boost::system::error_code& err, size_t bytes_transferred);
+  void process(ObjectCacheRequest* req);
+  void fault();
+  void send(ObjectCacheRequest* msg);
 
 private:
   uint64_t m_session_id;
   stream_protocol::socket m_dm_socket;
-  ProcessMsg process_msg;
+  char* m_head_buffer;
+  ProcessMsg m_server_process_msg;
   CephContext* cct;
-
-  // Buffer used to store data received from the client.
-  //std::array<char, 1024> data_;
-  char m_buffer[1024];
 };
 
 typedef std::shared_ptr<CacheSession> CacheSessionPtr;
index c83e1e6e7570891d5210ad2912e72f847b0d7bd8..47c9da68ba8b82592273677bec04ce789eee2ca8 100644 (file)
@@ -58,7 +58,8 @@ int ObjectCacheFile::read_object_from_file(ceph::bufferlist* read_buf, uint64_t
 
   bufferlist temp_bl;
   std::string error_str;
-  // TODO : optimization
+
+  // TODO : current implements will drop sharely performance.
   int ret = temp_bl.read_file(m_name.c_str(), &error_str);
   if (ret < 0) {
     lderr(cct)<<"read file fail:" << error_str << dendl;
index 0bbea734398e132bc37520f22ebf26404be4eee5..77f4d184739cadf756e440f96129a812bb84d133 100644 (file)
@@ -4,9 +4,6 @@
 #ifndef CEPH_CACHE_SOCKET_COMMON_H
 #define CEPH_CACHE_SOCKET_COMMON_H
 
-#include "include/types.h"
-#include "include/int_types.h"
-
 namespace ceph {
 namespace immutable_obj_cache {
 
@@ -18,40 +15,16 @@ static const int RBDSC_READ_REPLY      =  0X15;
 static const int RBDSC_LOOKUP_REPLY    =  0X16;
 static const int RBDSC_READ_RADOS      =  0X17;
 
+static const int ASIO_ERROR_READ = 0X01;
+static const int ASIO_ERROR_WRITE = 0X02;
+static const int ASIO_ERROR_CONNECT = 0X03;
+static const int ASIO_ERROR_ACCEPT = 0X04;
+static const int ASIO_ERROR_MSG_INCOMPLETE = 0X05;
 
+class ObjectCacheRequest;
 
-typedef std::function<void(uint64_t, std::string)> ProcessMsg;
+typedef std::function<void(uint64_t, ObjectCacheRequest*)> ProcessMsg;
 typedef std::function<void(std::string)> ClientProcessMsg;
-typedef uint8_t rbdsc_req_type;
-
-//TODO(): switch to bufferlist
-struct rbdsc_req_type_t {
-  rbdsc_req_type type;
-  uint64_t vol_size;
-  uint64_t offset;
-  uint64_t length;
-  char pool_name[256];
-  char vol_name[256];
-  char oid[256];
-
-  uint64_t size() {
-    return sizeof(rbdsc_req_type_t);
-  }
-
-  std::string to_buffer() {
-    std::stringstream ss;
-    ss << type;
-    ss << vol_size;
-    ss << offset;
-    ss << length;
-    ss << pool_name;
-    ss << vol_name;
-
-    return ss.str();
-  }
-};
-
-static const int RBDSC_MSG_LEN = sizeof(rbdsc_req_type_t);
 
 } // namespace immutable_obj_cache
 } // namespace ceph
index e59878730be1358afff82c5c5e606962278bf291..2f22e125527978c1a34841a73dc6cacf4627c0c3 100644 (file)
 namespace ceph {
 namespace immutable_obj_cache {
 
+// TODO : fix compile issue
+/*
 void ObjectCacheMsgHeader::encode(bufferlist& bl) const {
-  using ceph::encode;
-  ::encode(seq, bl);
-  ::encode(type, bl);
-  ::encode(version, bl);
-  ::encode(padding, bl);
-  ::encode(mid_len, bl);
-  ::encode(data_len, bl);
-  ::encode(reserved, bl);
+  ceph::encode(seq, bl);
+  ceph::encode(type, bl);
+  ceph::encode(version, bl);
+  ceph::encode(padding, bl);
+  ceph::encode(data_len, bl);
+  ceph::encode(reserved, bl);
 }
 
 void ObjectCacheMsgHeader::decode(bufferlist::const_iterator& it) {
-  using ceph::decode;
-  ::decode(seq, it);
-  ::decode(type, it);
-  ::decode(version, it);
-  ::decode(padding, it);
-  ::decode(mid_len, it);
-  ::decode(data_len, it);
-  ::decode(reserved, it);
+  ceph::decode(seq, it);
+  ceph::decode(type, it);
+  ceph::decode(version, it);
+  ceph::decode(padding, it);
+  ceph::decode(data_len, it);
+  ceph::decode(reserved, it);
 }
+*/
 
 } // namespace immutable_obj_cache
 } // namespace ceph
index c8ed029e9488db3437a2df4bbbbd708ce90ef675..f4b3c18b2cfd53314325f6f31e988330b15d031e 100644 (file)
@@ -5,6 +5,7 @@
 #define CEPH_CACHE_TYPES_H
 
 #include "include/encoding.h"
+#include "include/Context.h"
 
 namespace ceph {
 namespace immutable_obj_cache {
@@ -14,15 +15,29 @@ struct ObjectCacheMsgHeader {
     uint16_t type;                        /* msg type */
     uint16_t version;                     /* object cache version */
     uint32_t padding;
-    uint64_t mid_len;
     uint32_t data_len;
     uint32_t reserved;
 
-    void encode(bufferlist& bl) const;
-    void decode(bufferlist::const_iterator& it);
+    void encode(bufferlist& bl) const {
+      ceph::encode(seq, bl);
+      ceph::encode(type, bl);
+      ceph::encode(version, bl);
+      ceph::encode(padding, bl);
+      ceph::encode(data_len, bl);
+      ceph::encode(reserved, bl);
+    }
+
+    void decode(bufferlist::const_iterator& it) {
+      ceph::decode(seq, it);
+      ceph::decode(type, it);
+      ceph::decode(version, it);
+      ceph::decode(padding, it);
+      ceph::decode(data_len, it);
+      ceph::decode(reserved, it);
+    }
 };
 
-class ObjectCacheMsgMiddle {
+class ObjectCacheMsgData {
 public:
   uint64_t m_image_size;
   uint64_t m_read_offset;
@@ -31,8 +46,8 @@ public:
   std::string m_image_name;
   std::string m_oid;
 
-   ObjectCacheMsgMiddle(){}
-   ~ObjectCacheMsgMiddle(){}
+   ObjectCacheMsgData(){}
+   ~ObjectCacheMsgData(){}
 
    void encode(bufferlist& bl) {
      ceph::encode(m_image_size, bl);
@@ -57,77 +72,44 @@ public:
 class ObjectCacheRequest {
 public:
     ObjectCacheMsgHeader m_head;
-    ObjectCacheMsgMiddle m_mid;
-
+    ObjectCacheMsgData m_data;
     bufferlist m_head_buffer;
-    bufferlist m_mid_buffer;
     bufferlist m_data_buffer;
+    Context* m_on_finish;
+    GenContext<ObjectCacheRequest*>* m_process_msg;
 
     ObjectCacheRequest() {}
     ~ObjectCacheRequest() {}
-
     void encode() {
-      m_mid.encode(m_mid_buffer);
-
-      m_head.mid_len = m_mid_buffer.length();
+      m_data.encode(m_data_buffer);
+      m_head.data_len = m_data_buffer.length();
       m_head.data_len = m_data_buffer.length();
-
       assert(m_head_buffer.length() == 0);
       m_head.encode(m_head_buffer);
       assert(sizeof(ObjectCacheMsgHeader) == m_head_buffer.length());
     }
-
     bufferlist get_head_buffer() {
       return m_head_buffer;
     }
-
-    bufferlist get_mid_buffer() {
-      return m_mid_buffer;
-    }
-
     bufferlist get_data_buffer() {
       return m_data_buffer;
     }
 };
 
-// currently, just use this interface.
 inline ObjectCacheRequest* decode_object_cache_request(
-            ObjectCacheMsgHeader* head, bufferlist mid_buffer)
-{
+            ObjectCacheMsgHeader* head, bufferlist data_buffer) {
   ObjectCacheRequest* req = new ObjectCacheRequest();
-
-  // head
   req->m_head = *head;
-  assert(req->m_head.mid_len == mid_buffer.length());
-
-  // mid
-  req->m_mid.decode(mid_buffer);
-
+  assert(req->m_head.data_len == data_buffer.length());
+  req->m_data.decode(data_buffer);
   return req;
 }
 
 inline ObjectCacheRequest* decode_object_cache_request(
-             ObjectCacheMsgHeader* head, bufferlist& mid_buffer,
-             bufferlist& data_buffer)
-{
-  ObjectCacheRequest* req = decode_object_cache_request(head, mid_buffer);
-
-  // data
-  if(data_buffer.length() != 0) {
-    req->m_data_buffer = data_buffer;
-  }
-
-  return req;
+             bufferlist head_buffer, bufferlist data_buffer) {
+  return decode_object_cache_request((ObjectCacheMsgHeader*)(head_buffer.c_str()), data_buffer);
 }
 
-inline ObjectCacheRequest* decode_object_cache_request(bufferlist& head,
-                bufferlist& mid_buffer, bufferlist& data_buffer)
-{
-  assert(sizeof(ObjectCacheMsgHeader) == head.length());
-  return decode_object_cache_request((ObjectCacheMsgHeader*)(head.c_str()), mid_buffer, data_buffer);
-}
-
-
 } // namespace immutable_obj_cache
 } // namespace ceph
 #endif