]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
tools: refactor ObjectCacheRequest of RO
authorshangdehao1 <dehao.shang@intel.com>
Tue, 26 Feb 2019 17:18:37 +0000 (01:18 +0800)
committerYuan Zhou <yuan.zhou@intel.com>
Thu, 21 Mar 2019 16:16:30 +0000 (00:16 +0800)
Signed-off-by: Dehao Shang <dehao.shang@intel.com>
src/test/immutable_object_cache/test_DomainSocket.cc
src/test/immutable_object_cache/test_message.cc
src/test/immutable_object_cache/test_multi_session.cc
src/tools/immutable_object_cache/CacheClient.cc
src/tools/immutable_object_cache/CacheController.cc
src/tools/immutable_object_cache/CacheSession.cc
src/tools/immutable_object_cache/Types.cc
src/tools/immutable_object_cache/Types.h

index 17a442d13ff62bdcc9746edbf9fbc5f1613b2f0b..af6b7f91fe5340af6cfaf020d4334211efd88bf2 100644 (file)
@@ -79,28 +79,21 @@ public:
 
   void handle_request(uint64_t session_id, ObjectCacheRequest* req) {
 
-    switch (req->type) {
+    switch (req->get_request_type()) {
       case RBDSC_REGISTER: {
-        ObjectCacheRegReplyData* data = new ObjectCacheRegReplyData();
-        data->type = RBDSC_REGISTER_REPLY;
-        req = encode_object_cache_request(data, RBDSC_REGISTER_REPLY);
-        m_cache_server->send(session_id, req);
+        ObjectCacheRequest* reply = new ObjectCacheRegReplyData(RBDSC_REGISTER_REPLY, req->seq);
+        m_cache_server->send(session_id, reply);
         break;
       }
       case RBDSC_READ: {
-        ObjectCacheReadData* req_data = (ObjectCacheReadData*)req->m_data;
-        if (m_hit_entry_set.find(req_data->m_oid) == m_hit_entry_set.end()) {
-          ObjectCacheReadRadosData* data = new ObjectCacheReadRadosData();
-          data->type = RBDSC_READ_RADOS;
-          data->seq = req_data->seq;
-          req = encode_object_cache_request(data, RBDSC_READ_RADOS);
+        ObjectCacheReadData* read_req = (ObjectCacheReadData*)req;
+        ObjectCacheRequest* reply = nullptr;
+        if (m_hit_entry_set.find(read_req->m_oid) == m_hit_entry_set.end()) {
+          reply = new ObjectCacheReadRadosData(RBDSC_READ_RADOS, req->seq);
         } else {
-          ObjectCacheReadReplyData* data = new ObjectCacheReadReplyData();
-          data->type = RBDSC_READ_REPLY;
-          data->seq = req_data->seq;
-          req = encode_object_cache_request(data, RBDSC_READ_REPLY);
+          reply = new ObjectCacheReadReplyData(RBDSC_READ_REPLY, req->seq, "/temp/cache/path");
         }
-        m_cache_server->send(session_id, req);
+        m_cache_server->send(session_id, reply);
         break;
       }
     }
index fe27cec0efc9c4b8bd2fe50edeef5bc145243aa2..0d3a31f23b87c5ba8d3b8105a8846a81f22f1e50 100644 (file)
@@ -10,39 +10,38 @@ TEST(test_for_message, test_1)
   std::string oid_name("this is a oid name");
   std::string cache_file_path("/temp/ceph_immutable_object_cache");
 
-  ObjectCacheReadData data;
-
-  data.seq = 1UL;
-  data.type = RBDSC_READ;
-  data.m_read_offset = 222222;
-  data.m_read_len = 333333;
-  data.m_pool_id = 444444;
-  data.m_snap_id = 555555;
-  data.m_oid = oid_name;
-  data.m_pool_namespace = pool_nspace;
+  uint16_t type = RBDSC_READ;
+  uint64_t seq = 123456UL;
+  uint64_t read_offset = 222222UL;
+  uint64_t read_len = 333333UL;
+  uint64_t pool_id = 444444UL;
+  uint64_t snap_id = 555555UL;
 
   // ObjectRequest --> bufferlist
-  ObjectCacheRequest* req = encode_object_cache_request(&data, RBDSC_READ);
+  ObjectCacheRequest* req = new ObjectCacheReadData(type, seq, read_offset, read_len,
+                                    pool_id, snap_id, oid_name, pool_nspace);
+  req->encode();
+  auto payload_bl = req->get_payload_bufferlist();
 
-  auto data_bl = req->get_data_buffer();
-  uint32_t data_len = get_data_len(data_bl.c_str());
-  ASSERT_EQ(data_bl.length(), data_len + get_header_size());
-  ASSERT_TRUE(data_bl.c_str() != nullptr);
+  uint32_t data_len = get_data_len(payload_bl.c_str());
+  ASSERT_EQ(payload_bl.length(), data_len + get_header_size());
+  ASSERT_TRUE(payload_bl.c_str() != nullptr);
 
   // bufferlist --> ObjectCacheRequest
-  ObjectCacheRequest* req_decode = decode_object_cache_request(data_bl);
-  ObjectCacheReadData* reply_data = (ObjectCacheReadData*)(req_decode->m_data);
+  ObjectCacheRequest* req_decode = decode_object_cache_request(payload_bl);
 
-  ASSERT_EQ(req_decode->type, RBDSC_READ);
+  ASSERT_EQ(req_decode->get_request_type(), RBDSC_READ);
 
-  ASSERT_EQ(reply_data->seq, 1UL);
-  ASSERT_EQ(reply_data->type, RBDSC_READ);
-  ASSERT_EQ(reply_data->m_read_offset, 222222UL);
-  ASSERT_EQ(reply_data->m_read_len, 333333UL);
-  ASSERT_EQ(reply_data->m_pool_id, 444444UL);
-  ASSERT_EQ(reply_data->m_snap_id, 555555UL);
-  ASSERT_EQ(reply_data->m_pool_namespace, pool_nspace);
-  ASSERT_EQ(reply_data->m_oid, oid_name);
+  ASSERT_EQ(req_decode->type, RBDSC_READ);
+  ASSERT_EQ(req_decode->seq, 123456UL);
+  ASSERT_EQ(((ObjectCacheReadData*)req_decode)->type, RBDSC_READ);
+  ASSERT_EQ(((ObjectCacheReadData*)req_decode)->seq, 123456UL);
+  ASSERT_EQ(((ObjectCacheReadData*)req_decode)->m_read_offset, 222222UL);
+  ASSERT_EQ(((ObjectCacheReadData*)req_decode)->m_read_len, 333333UL);
+  ASSERT_EQ(((ObjectCacheReadData*)req_decode)->m_pool_id, 444444UL);
+  ASSERT_EQ(((ObjectCacheReadData*)req_decode)->m_snap_id, 555555UL);
+  ASSERT_EQ(((ObjectCacheReadData*)req_decode)->m_pool_namespace, pool_nspace);
+  ASSERT_EQ(((ObjectCacheReadData*)req_decode)->m_oid, oid_name);
 
   delete req;
   delete req_decode;
index 008a3ba6d054dd2840e44d84d7e945faac944c9a..3ebdd4583ee8a3450555e49e3f0577cd2c68fde6 100644 (file)
@@ -87,20 +87,17 @@ public:
 
   void server_handle_request(uint64_t session_id, ObjectCacheRequest* req) {
 
-    switch (req->type) {
+    switch (req->get_request_type()) {
       case RBDSC_REGISTER: {
-        ObjectCacheRegReplyData* data = new ObjectCacheRegReplyData();
-        data->type = RBDSC_REGISTER_REPLY;
-        req = encode_object_cache_request(data, RBDSC_REGISTER_REPLY);
-        m_cache_server->send(session_id, req);
+        ObjectCacheRequest* reply = new ObjectCacheRegReplyData(RBDSC_REGISTER_REPLY,
+                                                                req->seq);
+        m_cache_server->send(session_id, reply);
         break;
       }
       case RBDSC_READ: {
-        ObjectCacheReadReplyData* data = new ObjectCacheReadReplyData();
-        data->type = RBDSC_READ_REPLY;
-        data->seq = req->seq;
-        req = encode_object_cache_request(data, RBDSC_READ_REPLY);
-        m_cache_server->send(session_id, req);
+        ObjectCacheRequest* reply = new ObjectCacheReadReplyData(RBDSC_READ_REPLY,
+                                                                req->seq);
+        m_cache_server->send(session_id, reply);
         break;
       }
     }
index 20d5426b37f1b922326e6f6014ca1c801b3910bd..e2ab8e6c0394ebda30a0b729764869af1ec19d2a 100644 (file)
@@ -25,7 +25,7 @@ namespace immutable_obj_cache {
     if (m_use_dedicated_worker) {
       m_worker = new boost::asio::io_service();
       m_worker_io_service_work = new boost::asio::io_service::work(*m_worker);
-      for(uint64_t i = 0; i < m_worker_thread_num; i++) {
+      for (uint64_t i = 0; i < m_worker_thread_num; i++) {
         std::thread* thd = new std::thread([this](){m_worker->run();});
         m_worker_threads.push_back(thd);
       }
@@ -55,7 +55,7 @@ namespace immutable_obj_cache {
     }
     if (m_use_dedicated_worker) {
       m_worker->stop();
-      for(auto thd : m_worker_threads) {
+      for (auto thd : m_worker_threads) {
         thd->join();
         delete thd;
       }
@@ -88,22 +88,16 @@ namespace immutable_obj_cache {
   void CacheClient::lookup_object(std::string pool_nspace, uint64_t pool_id, uint64_t snap_id,
                                   std::string oid, GenContext<ObjectCacheRequest*>* on_finish) {
 
-    ObjectCacheReadData data;
-    data.type = RBDSC_READ;
-    data.seq = ++m_sequence_id;
-
-    data.m_pool_id = pool_id;
-    data.m_snap_id = snap_id;
-    data.m_pool_namespace = pool_nspace;
-    data.m_oid = oid;
-    ObjectCacheRequest* req = encode_object_cache_request(&data, RBDSC_READ);
+    ObjectCacheRequest* req = new ObjectCacheReadData(RBDSC_READ, ++m_sequence_id, 0, 0,
+                                                      pool_id, snap_id, oid, pool_nspace);
     req->m_process_msg = on_finish;
+    req->encode();
 
     {
       Mutex::Locker locker(m_lock);
-      m_outcoming_bl.append(req->get_data_buffer());
-      ceph_assert(m_seq_to_req.find(data.seq) == m_seq_to_req.end());
-      m_seq_to_req[data.seq] = req;
+      m_outcoming_bl.append(req->get_payload_bufferlist());
+      ceph_assert(m_seq_to_req.find(req->seq) == m_seq_to_req.end());
+      m_seq_to_req[req->seq] = req;
     }
 
     // try to send message to server.
@@ -330,7 +324,7 @@ namespace immutable_obj_cache {
     // all pending request, which have entered into ASIO, will be re-dispatched to RADOS.
     {
       Mutex::Locker locker(m_lock);
-      for(auto it : m_seq_to_req) {
+      for (auto it : m_seq_to_req) {
         it.second->type = RBDSC_READ_RADOS;
         it.second->m_process_msg->complete(it.second);
       }
@@ -343,13 +337,11 @@ namespace immutable_obj_cache {
   }
 
   int CacheClient::register_client(Context* on_finish) {
-    ObjectCacheRegData data;
-    data.seq = m_sequence_id++;
-    data.type = RBDSC_REGISTER;
-    ObjectCacheRequest* reg_req = encode_object_cache_request(&data, RBDSC_REGISTER);
+    ObjectCacheRequest* reg_req = new ObjectCacheRegData(RBDSC_REGISTER, m_sequence_id++);
+    reg_req->encode();
 
     bufferlist bl;
-    bl.append(reg_req->get_data_buffer());
+    bl.append(reg_req->get_payload_bufferlist());
 
     uint64_t ret;
     boost::system::error_code ec;
index 213b2d36b50078ecd04af330fc14818c8af34a67..319631d34635289a1333fe5a9628677317736a54 100644 (file)
@@ -83,39 +83,30 @@ void CacheController::run() {
 void CacheController::handle_request(uint64_t session_id, ObjectCacheRequest* req){
   ldout(m_cct, 20) << dendl;
 
-  switch (req->type) {
+  switch (req->get_request_type()) {
     case RBDSC_REGISTER: {
       // TODO(): skip register and allow clients to lookup directly
 
-      ObjectCacheRegReplyData data;
-      data.type = RBDSC_REGISTER_REPLY;
-      data.seq = req->seq;
-      req = encode_object_cache_request(&data, RBDSC_REGISTER_REPLY);
-      m_cache_server->send(session_id, req);
+      ObjectCacheRequest* reply = new ObjectCacheRegReplyData(RBDSC_REGISTER_REPLY, req->seq);
+      m_cache_server->send(session_id, reply);
       break;
     }
     case RBDSC_READ: {
       // lookup object in local cache store
-      ObjectCacheReadData* data = (ObjectCacheReadData*)(req->m_data);
       std::string cache_path;
-      int ret = m_object_cache_store->lookup_object(data->m_pool_namespace,
-                                                    data->m_pool_id,
-                                                    data->m_snap_id,
-                                                    data->m_oid,
+      ObjectCacheReadData* req_read_data = (ObjectCacheReadData*)req;
+      int ret = m_object_cache_store->lookup_object(req_read_data->m_pool_namespace,
+                                                    req_read_data->m_pool_id,
+                                                    req_read_data->m_snap_id,
+                                                    req_read_data->m_oid,
                                                     cache_path);
+      ObjectCacheRequest* reply = nullptr;
       if (ret != OBJ_CACHE_PROMOTED) {
-        ObjectCacheReadRadosData reply_data;
-        reply_data.type = RBDSC_READ_RADOS;
-        reply_data.seq = req->seq;
-        req = encode_object_cache_request(&reply_data, RBDSC_READ_RADOS);
+        reply = new ObjectCacheReadRadosData(RBDSC_READ_RADOS, req->seq);
       } else {
-        ObjectCacheReadReplyData reply_data;
-        reply_data.m_cache_path = cache_path;
-        reply_data.type = RBDSC_READ_REPLY;
-        reply_data.seq = req->seq;
-        req = encode_object_cache_request(&reply_data, RBDSC_READ_REPLY);
+        reply = new ObjectCacheReadReplyData(RBDSC_READ_REPLY, req->seq, cache_path);
       }
-      m_cache_server->send(session_id, req);
+      m_cache_server->send(session_id, reply);
       break;
     }
     default:
index c72305944fc478de14b423c35e7520baa190b7a5..e4f76a7c806bbbdcbe009f8956eb59f5d72c0993 100644 (file)
@@ -94,6 +94,7 @@ void CacheSession::handle_request_data(bufferptr bp, uint64_t data_len,
 
   ObjectCacheRequest* req = decode_object_cache_request(bl_data);
   process(req);
+  delete req;
   read_request_header();
 }
 
@@ -105,7 +106,8 @@ void CacheSession::process(ObjectCacheRequest* req) {
 void CacheSession::send(ObjectCacheRequest* reply) {
   ldout(cct, 20) << dendl;
   bufferlist bl;
-  bl.append(reply->get_data_buffer());
+  reply->encode();
+  bl.append(reply->get_payload_bufferlist());
 
   boost::asio::async_write(m_dm_socket,
         boost::asio::buffer(bl.c_str(), bl.length()),
index b1a00a5a21d8cd65bc091b91db341100b77383a1..2e547b11b5e0d0989d8996243b8f145c083bbe62 100644 (file)
 namespace ceph {
 namespace immutable_obj_cache {
 
-void ObjectCacheRegData::encode(bufferlist& bl) {
-  ENCODE_START(1, 1, bl);
-  ceph::encode(type, bl);
-  ceph::encode(seq, bl);
-  ENCODE_FINISH(bl);
+ObjectCacheRequest::ObjectCacheRequest(){}
+ObjectCacheRequest::ObjectCacheRequest(uint16_t t, uint64_t s)
+  : type(t), seq(s) {}
+ObjectCacheRequest::~ObjectCacheRequest(){}
+
+void ObjectCacheRequest::encode() {
+  ENCODE_START(1, 1, m_payload);
+  ceph::encode(type, m_payload);
+  ceph::encode(seq, m_payload);
+  if (!payload_empty()) {
+    encode_payload();
+  }
+  ENCODE_FINISH(m_payload);
 }
 
-void ObjectCacheRegData::decode(bufferlist& bl) {
+void ObjectCacheRequest::decode(bufferlist& bl) {
   auto i = bl.cbegin();
   DECODE_START(1, i);
   ceph::decode(type, i);
   ceph::decode(seq, i);
+  if (!payload_empty()) {
+    decode_payload(i);
+  }
   DECODE_FINISH(i);
 }
 
-void ObjectCacheRegReplyData::encode(bufferlist& bl) {
-  ENCODE_START(1, 1, bl);
-  ceph::encode(type, bl);
-  ceph::encode(seq, bl);
-  ENCODE_FINISH(bl);
-}
+ObjectCacheRegData::ObjectCacheRegData() {}
+ObjectCacheRegData::ObjectCacheRegData(uint16_t t, uint64_t s)
+  : ObjectCacheRequest(t, s) {}
 
-void ObjectCacheRegReplyData::decode(bufferlist& bl) {
-  auto i = bl.cbegin();
-  DECODE_START(1, i);
-  ceph::decode(type, i);
-  ceph::decode(seq, i);
-  DECODE_FINISH(i);
-}
+ObjectCacheRegData::~ObjectCacheRegData() {}
+
+void ObjectCacheRegData::encode_payload() {}
+
+void ObjectCacheRegData::decode_payload(bufferlist::const_iterator i) {}
+
+ObjectCacheRegReplyData::ObjectCacheRegReplyData() {}
+ObjectCacheRegReplyData::ObjectCacheRegReplyData(uint16_t t, uint64_t s)
+  : ObjectCacheRequest(t, s) {}
+
+ObjectCacheRegReplyData::~ObjectCacheRegReplyData() {}
+
+void ObjectCacheRegReplyData::encode_payload() {}
+
+void ObjectCacheRegReplyData::decode_payload(bufferlist::const_iterator bl) {}
+
+ObjectCacheReadData::ObjectCacheReadData(uint16_t t, uint64_t s,
+                                         uint64_t read_offset, uint64_t read_len,
+                                         uint64_t pool_id, uint64_t snap_id,
+                                         std::string oid, std::string pool_namespace)
+  : ObjectCacheRequest(t, s), m_read_offset(read_offset),
+    m_read_len(read_len), m_pool_id(pool_id), m_snap_id(snap_id),
+    m_oid(oid), m_pool_namespace(pool_namespace)
+{}
+
+ObjectCacheReadData::ObjectCacheReadData(uint16_t t, uint64_t s)
+  : ObjectCacheRequest(t, s) {}
+
+ObjectCacheReadData::~ObjectCacheReadData() {}
 
-void ObjectCacheReadData::encode(bufferlist& bl) {
-  ENCODE_START(1, 1, bl);
-  ceph::encode(type, bl);
-  ceph::encode(seq, bl);
-  ceph::encode(m_read_offset, bl);
-  ceph::encode(m_read_len, bl);
-  ceph::encode(m_pool_id, bl);
-  ceph::encode(m_snap_id, bl);
-  ceph::encode(m_oid, bl);
-  ceph::encode(m_pool_namespace, bl);
-  ENCODE_FINISH(bl);
+void ObjectCacheReadData::encode_payload() {
+  ceph::encode(m_read_offset, m_payload);
+  ceph::encode(m_read_len, m_payload);
+  ceph::encode(m_pool_id, m_payload);
+  ceph::encode(m_snap_id, m_payload);
+  ceph::encode(m_oid, m_payload);
+  ceph::encode(m_pool_namespace, m_payload);
 }
 
-void ObjectCacheReadData::decode(bufferlist& bl) {
-  auto i = bl.cbegin();
-  DECODE_START(1, i);
-  ceph::decode(type, i);
-  ceph::decode(seq, i);
+void ObjectCacheReadData::decode_payload(bufferlist::const_iterator i) {
   ceph::decode(m_read_offset, i);
   ceph::decode(m_read_len, i);
   ceph::decode(m_pool_id, i);
   ceph::decode(m_snap_id, i);
   ceph::decode(m_oid, i);
   ceph::decode(m_pool_namespace, i);
-  DECODE_FINISH(i);
 }
 
-void ObjectCacheReadReplyData::encode(bufferlist& bl) {
-  ENCODE_START(1, 1, bl);
-  ceph::encode(type, bl);
-  ceph::encode(seq, bl);
-  ceph::encode(m_cache_path, bl);
-  ENCODE_FINISH(bl);
+ObjectCacheReadReplyData::ObjectCacheReadReplyData(uint16_t t, uint64_t s, string cache_path)
+  : ObjectCacheRequest(t, s), m_cache_path(cache_path) {}
+ObjectCacheReadReplyData::ObjectCacheReadReplyData(uint16_t t, uint64_t s)
+  : ObjectCacheRequest(t, s) {}
+
+ObjectCacheReadReplyData::~ObjectCacheReadReplyData() {}
+
+void ObjectCacheReadReplyData::encode_payload() {
+  ceph::encode(m_cache_path, m_payload);
 }
 
-void ObjectCacheReadReplyData::decode(bufferlist& bl) {
-  auto i = bl.cbegin();
-  DECODE_START(1, i);
-  ceph::decode(type, i);
-  ceph::decode(seq, i);
+void ObjectCacheReadReplyData::decode_payload(bufferlist::const_iterator i) {
   ceph::decode(m_cache_path, i);
-  DECODE_FINISH(i);
 }
 
-void ObjectCacheReadRadosData::encode(bufferlist& bl) {
-  ENCODE_START(1, 1, bl);
-  ceph::encode(type, bl);
-  ceph::encode(seq, bl);
-  ENCODE_FINISH(bl);
-}
+ObjectCacheReadRadosData::ObjectCacheReadRadosData() {}
+ObjectCacheReadRadosData::ObjectCacheReadRadosData(uint16_t t, uint64_t s)
+  : ObjectCacheRequest(t, s) {}
 
-void ObjectCacheReadRadosData::decode(bufferlist& bl) {
-  auto i = bl.cbegin();
-  DECODE_START(1, i);
-  ceph::decode(type, i);
-  ceph::decode(seq, i);
-  DECODE_FINISH(i);
-}
+ObjectCacheReadRadosData::~ObjectCacheReadRadosData() {}
 
-uint8_t get_header_size() {
-  return 6; //uint8_t + uint8_t + uint32_t
-}
+void ObjectCacheReadRadosData::encode_payload() {}
 
-struct encode_header{
-  uint8_t v;
-  uint8_t c_v;
-  uint32_t len;
-}__attribute__((packed));
+void ObjectCacheReadRadosData::decode_payload(bufferlist::const_iterator i) {}
 
-uint32_t get_data_len(char* buf) {
-  encode_header* header = (encode_header*)buf;
-  return header->len;
-}
+ObjectCacheRequest* decode_object_cache_request(bufferlist payload_buffer) 
+{
+  ObjectCacheRequest* req = nullptr;
 
-uint16_t get_data_type(bufferlist buf) {
   uint16_t type;
-  auto i = buf.cbegin();
+  uint64_t seq;
+  auto i = payload_buffer.cbegin();
   DECODE_START(1, i);
-  decode(type, i);
+  ceph::decode(type, i);
+  ceph::decode(seq, i);
   DECODE_FINISH(i);
-  return type;
-}
-
-bufferlist ObjectCacheRequest::get_data_buffer() {
-  return m_data_buffer;
-}
-
-ObjectCacheRequest* encode_object_cache_request(void* m_data, uint16_t type) {
-  ObjectCacheRequest* req = new ObjectCacheRequest();
-
-  switch(type) {
-    case RBDSC_REGISTER: {
-      ObjectCacheRegData* data = (ObjectCacheRegData*)m_data;
-      data->encode(req->m_data_buffer);
-      break;
-    }
-    case RBDSC_REGISTER_REPLY: {
-      ObjectCacheRegReplyData* data = (ObjectCacheRegReplyData*)m_data;
-      data->encode(req->m_data_buffer);
-      break;
-    }
-    case RBDSC_READ: {
-      ObjectCacheReadData* data = (ObjectCacheReadData*)m_data;
-      data->encode(req->m_data_buffer);
-      break;
-    }
-    case RBDSC_READ_RADOS: {
-      ObjectCacheReadRadosData* data = (ObjectCacheReadRadosData*)m_data;
-      data->encode(req->m_data_buffer);
-      break;
-    }
-    case RBDSC_READ_REPLY: {
-      ObjectCacheReadReplyData* data = (ObjectCacheReadReplyData*)m_data;
-      data->encode(req->m_data_buffer);
-      break;
-    }
-    default:
-      ceph_assert(0);
-  }
-
-  req->type = type;
-  return req;
-}
-
-ObjectCacheRequest* decode_object_cache_request(bufferlist data_buffer) {
-  ObjectCacheRequest* req = new ObjectCacheRequest();
-  uint16_t type = get_data_type(data_buffer);
-  uint64_t seq;
 
   switch(type) {
     case RBDSC_REGISTER: {
-      ObjectCacheRegData* data = new ObjectCacheRegData();
-      data->decode(data_buffer);
-      seq = data->seq;
-      req->m_data = data;
+      req = new ObjectCacheRegData(type, seq);
       break;
     }
     case RBDSC_READ: {
-      ObjectCacheReadData* data = new ObjectCacheReadData();
-      data->decode(data_buffer);
-      seq = data->seq;
-      req->m_data = data;
+      req = new ObjectCacheReadData(type, seq);
       break;
     }
     case RBDSC_REGISTER_REPLY: {
-      ObjectCacheRegReplyData* data = new ObjectCacheRegReplyData();
-      data->decode(data_buffer);
-      seq = data->seq;
-      req->m_data = data;
+      req = new ObjectCacheRegReplyData(type, seq);
       break;
     }
     case RBDSC_READ_REPLY: {
-      ObjectCacheReadReplyData* data = new ObjectCacheReadReplyData();
-      data->decode(data_buffer);
-      seq = data->seq;
-      req->m_data = data;
+      req = new ObjectCacheReadReplyData(type, seq);
       break;
     }
     case RBDSC_READ_RADOS: {
-      ObjectCacheReadRadosData* data = new ObjectCacheReadRadosData();
-      data->decode(data_buffer);
-      seq = data->seq;
-      req->m_data = data;
+      req = new ObjectCacheReadRadosData(type, seq);
       break;
     }
     default:
       ceph_assert(0);
   }
 
-  req->type = type;
-  req->seq = seq;
+  req->decode(payload_buffer);
+
   return req;
 }
 
index 4d03dfa199323802b4f1d63e667227393a946121..99b7b8ae97fb151b50841fac684cb98fd474878e 100644 (file)
 
 #include "include/encoding.h"
 #include "include/Context.h"
+#include "SocketCommon.h"
 
 namespace ceph {
 namespace immutable_obj_cache {
 
-class ObjectCacheRegData {
+namespace {
+struct HeaderHelper {
+  uint8_t v;
+  uint8_t c_v;
+  uint32_t len;
+}__attribute__((packed));
+
+inline uint8_t get_header_size() {
+  return sizeof(HeaderHelper);
+}
+
+inline uint32_t get_data_len(char* buf) {
+  HeaderHelper* header = (HeaderHelper*)buf;
+  return header->len;
+}
+}
+
+class ObjectCacheRequest {
 public:
   uint16_t type;
   uint64_t seq;
 
-  void encode(bufferlist& bl);
+  bufferlist m_payload;
+
+  GenContext<ObjectCacheRequest*>* m_process_msg;
+
+  ObjectCacheRequest();
+  ObjectCacheRequest(uint16_t type, uint64_t seq);
+  virtual ~ObjectCacheRequest();
+
+  // encode consists of two steps
+  // step 1 : directly encode common bits using encode method of base classs.
+  // step 2 : according to payload_empty, determine whether addtional bits need to
+  //          be encoded which be implements by child class.
+  void encode();
   void decode(bufferlist& bl);
+  bufferlist get_payload_bufferlist() { return m_payload; }
+
+  virtual void encode_payload() = 0;
+  virtual void decode_payload(bufferlist::const_iterator bl_it) = 0;
+  virtual uint16_t get_request_type() = 0;
+  virtual bool payload_empty() = 0;
 };
 
-class ObjectCacheRegReplyData {
+class ObjectCacheRegData : public ObjectCacheRequest {
 public:
-  uint16_t type;
-  uint64_t seq;
+  ObjectCacheRegData();
+  ObjectCacheRegData(uint16_t t, uint64_t s);
+  ~ObjectCacheRegData() override;
+  void encode_payload() override;
+  void decode_payload(bufferlist::const_iterator bl) override;
+  uint16_t get_request_type() override { return RBDSC_REGISTER; }
+  bool payload_empty() override { return true; }
+};
 
-  void encode(bufferlist& bl);
-  void decode(bufferlist& bl);
+class ObjectCacheRegReplyData : public ObjectCacheRequest {
+public:
+  ObjectCacheRegReplyData();
+  ObjectCacheRegReplyData(uint16_t t, uint64_t s);
+  ~ObjectCacheRegReplyData() override;
+  void encode_payload() override;
+  void decode_payload(bufferlist::const_iterator iter) override;
+  uint16_t get_request_type() override { return RBDSC_REGISTER_REPLY; }
+  bool payload_empty() override { return true; }
 };
 
-class ObjectCacheReadData {
+class ObjectCacheReadData : public ObjectCacheRequest {
 public:
-  uint16_t type;
-  uint64_t seq;
   uint64_t m_read_offset;
   uint64_t m_read_len;
   uint64_t m_pool_id;
   uint64_t m_snap_id;
   std::string m_oid;
   std::string m_pool_namespace;
-
-  void encode(bufferlist& bl);
-  void decode(bufferlist& bl);
+  ObjectCacheReadData(uint16_t t, uint64_t s, uint64_t read_offset, uint64_t read_len, uint64_t pool_id,
+                      uint64_t snap_id, std::string oid, std::string pool_namespace );
+  ObjectCacheReadData(uint16_t t, uint64_t s);
+  ~ObjectCacheReadData() override;
+  void encode_payload() override;
+  void decode_payload(bufferlist::const_iterator bl) override;
+  uint16_t get_request_type() override { return RBDSC_READ; }
+  bool payload_empty() override { return false; }
 };
 
-class ObjectCacheReadReplyData {
+class ObjectCacheReadReplyData : public ObjectCacheRequest {
 public:
-  uint16_t type;
-  uint64_t seq;
   std::string m_cache_path;
-
-  void encode(bufferlist& bl);
-  void decode(bufferlist& bl);
+  ObjectCacheReadReplyData(uint16_t t, uint64_t s, std::string cache_path);
+  ObjectCacheReadReplyData(uint16_t t, uint64_t s);
+  ~ObjectCacheReadReplyData() override;
+  void encode_payload() override;
+  void decode_payload(bufferlist::const_iterator bl) override;
+  uint16_t get_request_type() override { return RBDSC_READ_REPLY; }
+  bool payload_empty() override { return false; }
 };
 
-class ObjectCacheReadRadosData {
+class ObjectCacheReadRadosData : public ObjectCacheRequest {
 public:
-  uint16_t type;
-  uint64_t seq;
-
-  void encode(bufferlist& bl);
-  void decode(bufferlist& bl);
+  ObjectCacheReadRadosData();
+  ObjectCacheReadRadosData(uint16_t t, uint64_t s);
+  ~ObjectCacheReadRadosData() override;
+  void encode_payload() override;
+  void decode_payload(bufferlist::const_iterator bl) override;
+  uint16_t get_request_type() override { return RBDSC_READ_RADOS; }
+  bool payload_empty() override { return true; }
 };
 
-class ObjectCacheRequest {
-public:
-    uint64_t seq;
-    uint16_t type;
-    void* m_data;
-    bufferlist m_data_buffer;
-    GenContext<ObjectCacheRequest*>* m_process_msg;
-
-    bufferlist get_data_buffer();
-};
-
-uint8_t get_header_size();
-uint32_t get_data_len(char* buf);
-
-ObjectCacheRequest* encode_object_cache_request(void* data, uint16_t type);
-ObjectCacheRequest* decode_object_cache_request(bufferlist data_buffer);
+ObjectCacheRequest* decode_object_cache_request(bufferlist payload_buffer);
 
 } // namespace immutable_obj_cache
 } // namespace ceph