]> git.apps.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
tools: use specific message for different ops in immutable obj cache daemon
authorYuan Zhou <yuan.zhou@intel.com>
Wed, 13 Feb 2019 14:24:58 +0000 (22:24 +0800)
committerYuan Zhou <yuan.zhou@intel.com>
Thu, 21 Mar 2019 16:16:29 +0000 (00:16 +0800)
use different types of message for different ops to be more efficient

Signed-off-by: Yuan Zhou <yuan.zhou@intel.com>
src/test/immutable_object_cache/test_DomainSocket.cc
src/test/immutable_object_cache/test_message.cc
src/test/immutable_object_cache/test_multi_session.cc
src/tools/immutable_object_cache/CacheClient.cc
src/tools/immutable_object_cache/CacheController.cc
src/tools/immutable_object_cache/CacheSession.cc
src/tools/immutable_object_cache/Types.cc
src/tools/immutable_object_cache/Types.h

index 0ed5b160304db07c38c3aebd809ce8c09e542bf0..17a442d13ff62bdcc9746edbf9fbc5f1613b2f0b 100644 (file)
@@ -79,17 +79,26 @@ public:
 
   void handle_request(uint64_t session_id, ObjectCacheRequest* req) {
 
-    switch (req->m_data.type) {
+    switch (req->type) {
       case RBDSC_REGISTER: {
-        req->m_data.type = RBDSC_REGISTER_REPLY;
+        ObjectCacheRegReplyData* data = new ObjectCacheRegReplyData();
+        data->type = RBDSC_REGISTER_REPLY;
+        req = encode_object_cache_request(data, RBDSC_REGISTER_REPLY);
         m_cache_server->send(session_id, req);
         break;
       }
       case RBDSC_READ: {
-        if (m_hit_entry_set.find(req->m_data.m_oid) == m_hit_entry_set.end()) {
-          req->m_data.type = RBDSC_READ_RADOS;
+        ObjectCacheReadData* req_data = (ObjectCacheReadData*)req->m_data;
+        if (m_hit_entry_set.find(req_data->m_oid) == m_hit_entry_set.end()) {
+          ObjectCacheReadRadosData* data = new ObjectCacheReadRadosData();
+          data->type = RBDSC_READ_RADOS;
+          data->seq = req_data->seq;
+          req = encode_object_cache_request(data, RBDSC_READ_RADOS);
         } else {
-          req->m_data.type = RBDSC_READ_REPLY;
+          ObjectCacheReadReplyData* data = new ObjectCacheReadReplyData();
+          data->type = RBDSC_READ_REPLY;
+          data->seq = req_data->seq;
+          req = encode_object_cache_request(data, RBDSC_READ_REPLY);
         }
         m_cache_server->send(session_id, req);
         break;
@@ -130,7 +139,7 @@ public:
     bool hit;
     auto ctx = new LambdaGenContext<std::function<void(ObjectCacheRequest*)>,
         ObjectCacheRequest*>([this, &hit](ObjectCacheRequest* ack){
-       hit = ack->m_data.type == RBDSC_READ_REPLY;
+       hit = ack->type == RBDSC_READ_REPLY;
        m_wait_event.signal();
     });
     m_cache_client->lookup_object(pool_nspace, 1, 2, object_id, ctx);
index 2d17f3940254fb50cf98a87e0825a43b5a83ebae..fe27cec0efc9c4b8bd2fe50edeef5bc145243aa2 100644 (file)
@@ -1,5 +1,6 @@
 #include "gtest/gtest.h"
 #include "tools/immutable_object_cache/Types.h"
+#include "tools/immutable_object_cache/SocketCommon.h"
 
 using namespace ceph::immutable_obj_cache;
 
@@ -9,35 +10,40 @@ TEST(test_for_message, test_1)
   std::string oid_name("this is a oid name");
   std::string cache_file_path("/temp/ceph_immutable_object_cache");
 
-  ObjectCacheRequest req;
+  ObjectCacheReadData data;
 
-  req.m_data.seq = 1;
-  req.m_data.type = 2;
-  req.m_data.m_read_offset = 222222;
-  req.m_data.m_read_len = 333333;
-  req.m_data.m_pool_id = 444444;
-  req.m_data.m_snap_id = 555555;
-  req.m_data.m_oid = oid_name;
-  req.m_data.m_pool_namespace = pool_nspace;
-  req.m_data.m_cache_path = cache_file_path;
+  data.seq = 1UL;
+  data.type = RBDSC_READ;
+  data.m_read_offset = 222222;
+  data.m_read_len = 333333;
+  data.m_pool_id = 444444;
+  data.m_snap_id = 555555;
+  data.m_oid = oid_name;
+  data.m_pool_namespace = pool_nspace;
 
   // ObjectRequest --> bufferlist
-  req.encode();
+  ObjectCacheRequest* req = encode_object_cache_request(&data, RBDSC_READ);
 
-
-  // bufferlist --> ObjectCacheRequest
-  auto data_bl = req.get_data_buffer();
+  auto data_bl = req->get_data_buffer();
   uint32_t data_len = get_data_len(data_bl.c_str());
   ASSERT_EQ(data_bl.length(), data_len + get_header_size());
+  ASSERT_TRUE(data_bl.c_str() != nullptr);
 
+  // bufferlist --> ObjectCacheRequest
   ObjectCacheRequest* req_decode = decode_object_cache_request(data_bl);
+  ObjectCacheReadData* reply_data = (ObjectCacheReadData*)(req_decode->m_data);
+
+  ASSERT_EQ(req_decode->type, RBDSC_READ);
 
-  ASSERT_EQ(req_decode->m_data.seq, 1);
-  ASSERT_EQ(req_decode->m_data.m_read_offset, 222222);
-  ASSERT_EQ(req_decode->m_data.m_read_len, 333333);
-  ASSERT_EQ(req_decode->m_data.m_pool_namespace, pool_nspace);
-  ASSERT_EQ(req_decode->m_data.m_cache_path, cache_file_path);
-  ASSERT_EQ(req_decode->m_data.m_oid, oid_name);
+  ASSERT_EQ(reply_data->seq, 1UL);
+  ASSERT_EQ(reply_data->type, RBDSC_READ);
+  ASSERT_EQ(reply_data->m_read_offset, 222222UL);
+  ASSERT_EQ(reply_data->m_read_len, 333333UL);
+  ASSERT_EQ(reply_data->m_pool_id, 444444UL);
+  ASSERT_EQ(reply_data->m_snap_id, 555555UL);
+  ASSERT_EQ(reply_data->m_pool_namespace, pool_nspace);
+  ASSERT_EQ(reply_data->m_oid, oid_name);
 
+  delete req;
   delete req_decode;
 }
index e0f6d9535911c4013b0bd088be6028b8d20da935..008a3ba6d054dd2840e44d84d7e945faac944c9a 100644 (file)
@@ -87,14 +87,19 @@ public:
 
   void server_handle_request(uint64_t session_id, ObjectCacheRequest* req) {
 
-    switch (req->m_data.type) {
+    switch (req->type) {
       case RBDSC_REGISTER: {
-        req->m_data.type = RBDSC_REGISTER_REPLY;
+        ObjectCacheRegReplyData* data = new ObjectCacheRegReplyData();
+        data->type = RBDSC_REGISTER_REPLY;
+        req = encode_object_cache_request(data, RBDSC_REGISTER_REPLY);
         m_cache_server->send(session_id, req);
         break;
       }
       case RBDSC_READ: {
-        req->m_data.type = RBDSC_READ_REPLY;
+        ObjectCacheReadReplyData* data = new ObjectCacheReadReplyData();
+        data->type = RBDSC_READ_REPLY;
+        data->seq = req->seq;
+        req = encode_object_cache_request(data, RBDSC_READ_REPLY);
         m_cache_server->send(session_id, req);
         break;
       }
index 67ce958f232bcc086b64433214d927aa584a4324..20d5426b37f1b922326e6f6014ca1c801b3910bd 100644 (file)
@@ -88,22 +88,22 @@ namespace immutable_obj_cache {
   void CacheClient::lookup_object(std::string pool_nspace, uint64_t pool_id, uint64_t snap_id,
                                   std::string oid, GenContext<ObjectCacheRequest*>* on_finish) {
 
-    ObjectCacheRequest* req = new ObjectCacheRequest();
-    req->m_data.type = RBDSC_READ;
-    req->m_data.seq = ++m_sequence_id;
-
-    req->m_data.m_pool_id = pool_id;
-    req->m_data.m_snap_id = snap_id;
-    req->m_data.m_pool_namespace = pool_nspace;
-    req->m_data.m_oid = oid;
+    ObjectCacheReadData data;
+    data.type = RBDSC_READ;
+    data.seq = ++m_sequence_id;
+
+    data.m_pool_id = pool_id;
+    data.m_snap_id = snap_id;
+    data.m_pool_namespace = pool_nspace;
+    data.m_oid = oid;
+    ObjectCacheRequest* req = encode_object_cache_request(&data, RBDSC_READ);
     req->m_process_msg = on_finish;
-    req->encode();
 
     {
       Mutex::Locker locker(m_lock);
       m_outcoming_bl.append(req->get_data_buffer());
-      ceph_assert(m_seq_to_req.find(req->m_data.seq) == m_seq_to_req.end());
-      m_seq_to_req[req->m_data.seq] = req;
+      ceph_assert(m_seq_to_req.find(data.seq) == m_seq_to_req.end());
+      m_seq_to_req[data.seq] = req;
     }
 
     // try to send message to server.
@@ -227,7 +227,7 @@ namespace immutable_obj_cache {
     data_buffer.clear();
     ceph_assert(data_buffer.length() == 0);
 
-    process(reply, reply->m_data.seq);
+    process(reply, reply->seq);
 
     {
       Mutex::Locker locker(m_lock);
@@ -331,7 +331,7 @@ namespace immutable_obj_cache {
     {
       Mutex::Locker locker(m_lock);
       for(auto it : m_seq_to_req) {
-        it.second->m_data.type = RBDSC_READ_RADOS;
+        it.second->type = RBDSC_READ_RADOS;
         it.second->m_process_msg->complete(it.second);
       }
       m_seq_to_req.clear();
@@ -343,13 +343,13 @@ namespace immutable_obj_cache {
   }
 
   int CacheClient::register_client(Context* on_finish) {
-    ObjectCacheRequest* message = new ObjectCacheRequest();
-    message->m_data.seq = m_sequence_id++;
-    message->m_data.type = RBDSC_REGISTER;
-    message->encode();
+    ObjectCacheRegData data;
+    data.seq = m_sequence_id++;
+    data.type = RBDSC_REGISTER;
+    ObjectCacheRequest* reg_req = encode_object_cache_request(&data, RBDSC_REGISTER);
 
     bufferlist bl;
-    bl.append(message->get_data_buffer());
+    bl.append(reg_req->get_data_buffer());
 
     uint64_t ret;
     boost::system::error_code ec;
@@ -361,6 +361,7 @@ namespace immutable_obj_cache {
       fault(ASIO_ERROR_WRITE, ec);
       return -1;
     }
+    delete reg_req;
 
     ret = boost::asio::read(m_dm_socket,
       boost::asio::buffer(m_bp_header.c_str(), get_header_size()), ec);
@@ -382,7 +383,7 @@ namespace immutable_obj_cache {
     data_buffer.append(m_bp_header);
     data_buffer.append(std::move(bp_data));
     ObjectCacheRequest* req = decode_object_cache_request(data_buffer);
-    if (req->m_data.type == RBDSC_REGISTER_REPLY) {
+    if (req->type == RBDSC_REGISTER_REPLY) {
       on_finish->complete(true);
     } else {
       on_finish->complete(false);
index e7b06c4d9f3278ec4eaf11f842f2bdfebb507730..dd4ef3f3ed1359dfceba17fda517d9739cf1b314 100644 (file)
@@ -83,28 +83,39 @@ void CacheController::run() {
 void CacheController::handle_request(uint64_t session_id, ObjectCacheRequest* req){
   ldout(m_cct, 20) << dendl;
 
-  switch (req->m_data.type) {
+  switch (req->type) {
     case RBDSC_REGISTER: {
       // TODO(): skip register and allow clients to lookup directly
-      req->m_data.type = RBDSC_REGISTER_REPLY;
-      m_cache_server->send(session_id, req);
 
+      ObjectCacheRegReplyData data;
+      data.type = RBDSC_REGISTER_REPLY;
+      data.seq = req->seq;
+      req = encode_object_cache_request(&data, RBDSC_REGISTER_REPLY);
+      m_cache_server->send(session_id, req);
       break;
     }
     case RBDSC_READ: {
       // lookup object in local cache store
-      int ret = m_object_cache_store->lookup_object(req->m_data.m_pool_namespace,
-                                                    req->m_data.m_pool_id,
-                                                    req->m_data.m_snap_id,
-                                                    req->m_data.m_oid,
-                                                    req->m_data.m_cache_path);
+      ObjectCacheReadData* data = (ObjectCacheReadData*)(req->m_data);
+      std::string cache_path;
+      int ret = m_object_cache_store->lookup_object(data->m_pool_namespace,
+                                                    data->m_pool_id,
+                                                    data->m_snap_id,
+                                                    data->m_oid,
+                                                    cache_path);
       if (ret < 0) {
-        req->m_data.type = RBDSC_READ_RADOS;
+        ObjectCacheReadRadosData reply_data;
+        reply_data.type = RBDSC_READ_RADOS;
+        reply_data.seq = req->seq;
+        req = encode_object_cache_request(&reply_data, RBDSC_READ_RADOS);
       } else {
-        req->m_data.type = RBDSC_READ_REPLY;
+        ObjectCacheReadReplyData reply_data;
+        reply_data.m_cache_path = cache_path;
+        reply_data.type = RBDSC_READ_REPLY;
+        reply_data.seq = req->seq;
+        req = encode_object_cache_request(&reply_data, RBDSC_READ_REPLY);
       }
       m_cache_server->send(session_id, req);
-
       break;
     }
     default:
index c188e8e0b612158117533d1f00873a74eee17599..c72305944fc478de14b423c35e7520baa190b7a5 100644 (file)
@@ -91,6 +91,7 @@ void CacheSession::handle_request_data(bufferptr bp, uint64_t data_len,
 
   bl_data.append(m_bp_header);
   bl_data.append(std::move(bp));
+
   ObjectCacheRequest* req = decode_object_cache_request(bl_data);
   process(req);
   read_request_header();
@@ -103,8 +104,6 @@ void CacheSession::process(ObjectCacheRequest* req) {
 
 void CacheSession::send(ObjectCacheRequest* reply) {
   ldout(cct, 20) << dendl;
-  reply->m_data_buffer.clear();
-  reply->encode();
   bufferlist bl;
   bl.append(reply->get_data_buffer());
 
index bd904449113da2f46706c6cdc25a905c128b7edc..b1a00a5a21d8cd65bc091b91db341100b77383a1 100644 (file)
@@ -2,6 +2,7 @@
 // vim: ts=8 sw=2 smarttab
 
 #include "Types.h"
+#include "SocketCommon.h"
 
 #define dout_subsys ceph_subsys_immutable_obj_cache
 #undef dout_prefix
 namespace ceph {
 namespace immutable_obj_cache {
 
-void ObjectCacheMsgData::encode(bufferlist& bl) {
+void ObjectCacheRegData::encode(bufferlist& bl) {
   ENCODE_START(1, 1, bl);
+  ceph::encode(type, bl);
   ceph::encode(seq, bl);
+  ENCODE_FINISH(bl);
+}
+
+void ObjectCacheRegData::decode(bufferlist& bl) {
+  auto i = bl.cbegin();
+  DECODE_START(1, i);
+  ceph::decode(type, i);
+  ceph::decode(seq, i);
+  DECODE_FINISH(i);
+}
+
+void ObjectCacheRegReplyData::encode(bufferlist& bl) {
+  ENCODE_START(1, 1, bl);
   ceph::encode(type, bl);
+  ceph::encode(seq, bl);
+  ENCODE_FINISH(bl);
+}
+
+void ObjectCacheRegReplyData::decode(bufferlist& bl) {
+  auto i = bl.cbegin();
+  DECODE_START(1, i);
+  ceph::decode(type, i);
+  ceph::decode(seq, i);
+  DECODE_FINISH(i);
+}
+
+void ObjectCacheReadData::encode(bufferlist& bl) {
+  ENCODE_START(1, 1, bl);
+  ceph::encode(type, bl);
+  ceph::encode(seq, bl);
   ceph::encode(m_read_offset, bl);
   ceph::encode(m_read_len, bl);
   ceph::encode(m_pool_id, bl);
   ceph::encode(m_snap_id, bl);
   ceph::encode(m_oid, bl);
   ceph::encode(m_pool_namespace, bl);
-  ceph::encode(m_cache_path, bl);
   ENCODE_FINISH(bl);
 }
 
-void ObjectCacheMsgData::decode(bufferlist& bl) {
+void ObjectCacheReadData::decode(bufferlist& bl) {
   auto i = bl.cbegin();
   DECODE_START(1, i);
-  ceph::decode(seq, i);
   ceph::decode(type, i);
+  ceph::decode(seq, i);
   ceph::decode(m_read_offset, i);
   ceph::decode(m_read_len, i);
   ceph::decode(m_pool_id, i);
   ceph::decode(m_snap_id, i);
   ceph::decode(m_oid, i);
   ceph::decode(m_pool_namespace, i);
+  DECODE_FINISH(i);
+}
+
+void ObjectCacheReadReplyData::encode(bufferlist& bl) {
+  ENCODE_START(1, 1, bl);
+  ceph::encode(type, bl);
+  ceph::encode(seq, bl);
+  ceph::encode(m_cache_path, bl);
+  ENCODE_FINISH(bl);
+}
+
+void ObjectCacheReadReplyData::decode(bufferlist& bl) {
+  auto i = bl.cbegin();
+  DECODE_START(1, i);
+  ceph::decode(type, i);
+  ceph::decode(seq, i);
   ceph::decode(m_cache_path, i);
   DECODE_FINISH(i);
 }
 
-void ObjectCacheRequest::encode() {
-  m_data.encode(m_data_buffer);
+void ObjectCacheReadRadosData::encode(bufferlist& bl) {
+  ENCODE_START(1, 1, bl);
+  ceph::encode(type, bl);
+  ceph::encode(seq, bl);
+  ENCODE_FINISH(bl);
+}
+
+void ObjectCacheReadRadosData::decode(bufferlist& bl) {
+  auto i = bl.cbegin();
+  DECODE_START(1, i);
+  ceph::decode(type, i);
+  ceph::decode(seq, i);
+  DECODE_FINISH(i);
 }
 
 uint8_t get_header_size() {
-  //return sizeof(ObjectCacheMsgHeader);
-  return 6;
+  return 6; //uint8_t + uint8_t + uint32_t
 }
 
 struct encode_header{
@@ -59,13 +115,103 @@ uint32_t get_data_len(char* buf) {
   return header->len;
 }
 
+uint16_t get_data_type(bufferlist buf) {
+  uint16_t type;
+  auto i = buf.cbegin();
+  DECODE_START(1, i);
+  decode(type, i);
+  DECODE_FINISH(i);
+  return type;
+}
+
 bufferlist ObjectCacheRequest::get_data_buffer() {
   return m_data_buffer;
 }
 
+ObjectCacheRequest* encode_object_cache_request(void* m_data, uint16_t type) {
+  ObjectCacheRequest* req = new ObjectCacheRequest();
+
+  switch(type) {
+    case RBDSC_REGISTER: {
+      ObjectCacheRegData* data = (ObjectCacheRegData*)m_data;
+      data->encode(req->m_data_buffer);
+      break;
+    }
+    case RBDSC_REGISTER_REPLY: {
+      ObjectCacheRegReplyData* data = (ObjectCacheRegReplyData*)m_data;
+      data->encode(req->m_data_buffer);
+      break;
+    }
+    case RBDSC_READ: {
+      ObjectCacheReadData* data = (ObjectCacheReadData*)m_data;
+      data->encode(req->m_data_buffer);
+      break;
+    }
+    case RBDSC_READ_RADOS: {
+      ObjectCacheReadRadosData* data = (ObjectCacheReadRadosData*)m_data;
+      data->encode(req->m_data_buffer);
+      break;
+    }
+    case RBDSC_READ_REPLY: {
+      ObjectCacheReadReplyData* data = (ObjectCacheReadReplyData*)m_data;
+      data->encode(req->m_data_buffer);
+      break;
+    }
+    default:
+      ceph_assert(0);
+  }
+
+  req->type = type;
+  return req;
+}
+
 ObjectCacheRequest* decode_object_cache_request(bufferlist data_buffer) {
   ObjectCacheRequest* req = new ObjectCacheRequest();
-  req->m_data.decode(data_buffer);
+  uint16_t type = get_data_type(data_buffer);
+  uint64_t seq;
+
+  switch(type) {
+    case RBDSC_REGISTER: {
+      ObjectCacheRegData* data = new ObjectCacheRegData();
+      data->decode(data_buffer);
+      seq = data->seq;
+      req->m_data = data;
+      break;
+    }
+    case RBDSC_READ: {
+      ObjectCacheReadData* data = new ObjectCacheReadData();
+      data->decode(data_buffer);
+      seq = data->seq;
+      req->m_data = data;
+      break;
+    }
+    case RBDSC_REGISTER_REPLY: {
+      ObjectCacheRegReplyData* data = new ObjectCacheRegReplyData();
+      data->decode(data_buffer);
+      seq = data->seq;
+      req->m_data = data;
+      break;
+    }
+    case RBDSC_READ_REPLY: {
+      ObjectCacheReadReplyData* data = new ObjectCacheReadReplyData();
+      data->decode(data_buffer);
+      seq = data->seq;
+      req->m_data = data;
+      break;
+    }
+    case RBDSC_READ_RADOS: {
+      ObjectCacheReadRadosData* data = new ObjectCacheReadRadosData();
+      data->decode(data_buffer);
+      seq = data->seq;
+      req->m_data = data;
+      break;
+    }
+    default:
+      ceph_assert(0);
+  }
+
+  req->type = type;
+  req->seq = seq;
   return req;
 }
 
index ef487e4f2159d9fbcba947e9fb56f15d9424725e..4d03dfa199323802b4f1d63e667227393a946121 100644 (file)
 namespace ceph {
 namespace immutable_obj_cache {
 
-class ObjectCacheMsgData {
+class ObjectCacheRegData {
 public:
-  uint64_t seq;                         /* sequence id */
-  uint16_t type;                        /* msg type */
+  uint16_t type;
+  uint64_t seq;
+
+  void encode(bufferlist& bl);
+  void decode(bufferlist& bl);
+};
+
+class ObjectCacheRegReplyData {
+public:
+  uint16_t type;
+  uint64_t seq;
+
+  void encode(bufferlist& bl);
+  void decode(bufferlist& bl);
+};
+
+class ObjectCacheReadData {
+public:
+  uint16_t type;
+  uint64_t seq;
   uint64_t m_read_offset;
   uint64_t m_read_len;
   uint64_t m_pool_id;
   uint64_t m_snap_id;
   std::string m_oid;
   std::string m_pool_namespace;
+
+  void encode(bufferlist& bl);
+  void decode(bufferlist& bl);
+};
+
+class ObjectCacheReadReplyData {
+public:
+  uint16_t type;
+  uint64_t seq;
   std::string m_cache_path;
 
   void encode(bufferlist& bl);
   void decode(bufferlist& bl);
 };
 
+class ObjectCacheReadRadosData {
+public:
+  uint16_t type;
+  uint64_t seq;
+
+  void encode(bufferlist& bl);
+  void decode(bufferlist& bl);
+};
+
 class ObjectCacheRequest {
 public:
-    ObjectCacheMsgData m_data;
+    uint64_t seq;
+    uint16_t type;
+    void* m_data;
     bufferlist m_data_buffer;
     GenContext<ObjectCacheRequest*>* m_process_msg;
 
-    void encode();
     bufferlist get_data_buffer();
-
 };
 
 uint8_t get_header_size();
 uint32_t get_data_len(char* buf);
 
+ObjectCacheRequest* encode_object_cache_request(void* data, uint16_t type);
 ObjectCacheRequest* decode_object_cache_request(bufferlist data_buffer);
 
 } // namespace immutable_obj_cache