From 3f521074c59e34d3a96e6eb710b5d2a03b2388ec Mon Sep 17 00:00:00 2001 From: shangdehao1 Date: Wed, 27 Feb 2019 01:18:37 +0800 Subject: [PATCH] tools: refactor ObjectCacheRequest of RO Signed-off-by: Dehao Shang --- .../test_DomainSocket.cc | 25 +- .../immutable_object_cache/test_message.cc | 51 ++-- .../test_multi_session.cc | 17 +- .../immutable_object_cache/CacheClient.cc | 32 +-- .../immutable_object_cache/CacheController.cc | 33 +-- .../immutable_object_cache/CacheSession.cc | 4 +- src/tools/immutable_object_cache/Types.cc | 233 +++++++----------- src/tools/immutable_object_cache/Types.h | 123 ++++++--- 8 files changed, 237 insertions(+), 281 deletions(-) diff --git a/src/test/immutable_object_cache/test_DomainSocket.cc b/src/test/immutable_object_cache/test_DomainSocket.cc index 17a442d13ff..af6b7f91fe5 100644 --- a/src/test/immutable_object_cache/test_DomainSocket.cc +++ b/src/test/immutable_object_cache/test_DomainSocket.cc @@ -79,28 +79,21 @@ public: void handle_request(uint64_t session_id, ObjectCacheRequest* req) { - switch (req->type) { + switch (req->get_request_type()) { case RBDSC_REGISTER: { - ObjectCacheRegReplyData* data = new ObjectCacheRegReplyData(); - data->type = RBDSC_REGISTER_REPLY; - req = encode_object_cache_request(data, RBDSC_REGISTER_REPLY); - m_cache_server->send(session_id, req); + ObjectCacheRequest* reply = new ObjectCacheRegReplyData(RBDSC_REGISTER_REPLY, req->seq); + m_cache_server->send(session_id, reply); break; } case RBDSC_READ: { - ObjectCacheReadData* req_data = (ObjectCacheReadData*)req->m_data; - if (m_hit_entry_set.find(req_data->m_oid) == m_hit_entry_set.end()) { - ObjectCacheReadRadosData* data = new ObjectCacheReadRadosData(); - data->type = RBDSC_READ_RADOS; - data->seq = req_data->seq; - req = encode_object_cache_request(data, RBDSC_READ_RADOS); + ObjectCacheReadData* read_req = (ObjectCacheReadData*)req; + ObjectCacheRequest* reply = nullptr; + if (m_hit_entry_set.find(read_req->m_oid) == m_hit_entry_set.end()) { + reply = new ObjectCacheReadRadosData(RBDSC_READ_RADOS, req->seq); } else { - ObjectCacheReadReplyData* data = new ObjectCacheReadReplyData(); - data->type = RBDSC_READ_REPLY; - data->seq = req_data->seq; - req = encode_object_cache_request(data, RBDSC_READ_REPLY); + reply = new ObjectCacheReadReplyData(RBDSC_READ_REPLY, req->seq, "/temp/cache/path"); } - m_cache_server->send(session_id, req); + m_cache_server->send(session_id, reply); break; } } diff --git a/src/test/immutable_object_cache/test_message.cc b/src/test/immutable_object_cache/test_message.cc index fe27cec0efc..0d3a31f23b8 100644 --- a/src/test/immutable_object_cache/test_message.cc +++ b/src/test/immutable_object_cache/test_message.cc @@ -10,39 +10,38 @@ TEST(test_for_message, test_1) std::string oid_name("this is a oid name"); std::string cache_file_path("/temp/ceph_immutable_object_cache"); - ObjectCacheReadData data; - - data.seq = 1UL; - data.type = RBDSC_READ; - data.m_read_offset = 222222; - data.m_read_len = 333333; - data.m_pool_id = 444444; - data.m_snap_id = 555555; - data.m_oid = oid_name; - data.m_pool_namespace = pool_nspace; + uint16_t type = RBDSC_READ; + uint64_t seq = 123456UL; + uint64_t read_offset = 222222UL; + uint64_t read_len = 333333UL; + uint64_t pool_id = 444444UL; + uint64_t snap_id = 555555UL; // ObjectRequest --> bufferlist - ObjectCacheRequest* req = encode_object_cache_request(&data, RBDSC_READ); + ObjectCacheRequest* req = new ObjectCacheReadData(type, seq, read_offset, read_len, + pool_id, snap_id, oid_name, pool_nspace); + req->encode(); + auto payload_bl = req->get_payload_bufferlist(); - auto data_bl = req->get_data_buffer(); - uint32_t data_len = get_data_len(data_bl.c_str()); - ASSERT_EQ(data_bl.length(), data_len + get_header_size()); - ASSERT_TRUE(data_bl.c_str() != nullptr); + uint32_t data_len = get_data_len(payload_bl.c_str()); + ASSERT_EQ(payload_bl.length(), data_len + get_header_size()); + ASSERT_TRUE(payload_bl.c_str() != nullptr); // bufferlist --> ObjectCacheRequest - ObjectCacheRequest* req_decode = decode_object_cache_request(data_bl); - ObjectCacheReadData* reply_data = (ObjectCacheReadData*)(req_decode->m_data); + ObjectCacheRequest* req_decode = decode_object_cache_request(payload_bl); - ASSERT_EQ(req_decode->type, RBDSC_READ); + ASSERT_EQ(req_decode->get_request_type(), RBDSC_READ); - ASSERT_EQ(reply_data->seq, 1UL); - ASSERT_EQ(reply_data->type, RBDSC_READ); - ASSERT_EQ(reply_data->m_read_offset, 222222UL); - ASSERT_EQ(reply_data->m_read_len, 333333UL); - ASSERT_EQ(reply_data->m_pool_id, 444444UL); - ASSERT_EQ(reply_data->m_snap_id, 555555UL); - ASSERT_EQ(reply_data->m_pool_namespace, pool_nspace); - ASSERT_EQ(reply_data->m_oid, oid_name); + ASSERT_EQ(req_decode->type, RBDSC_READ); + ASSERT_EQ(req_decode->seq, 123456UL); + ASSERT_EQ(((ObjectCacheReadData*)req_decode)->type, RBDSC_READ); + ASSERT_EQ(((ObjectCacheReadData*)req_decode)->seq, 123456UL); + ASSERT_EQ(((ObjectCacheReadData*)req_decode)->m_read_offset, 222222UL); + ASSERT_EQ(((ObjectCacheReadData*)req_decode)->m_read_len, 333333UL); + ASSERT_EQ(((ObjectCacheReadData*)req_decode)->m_pool_id, 444444UL); + ASSERT_EQ(((ObjectCacheReadData*)req_decode)->m_snap_id, 555555UL); + ASSERT_EQ(((ObjectCacheReadData*)req_decode)->m_pool_namespace, pool_nspace); + ASSERT_EQ(((ObjectCacheReadData*)req_decode)->m_oid, oid_name); delete req; delete req_decode; diff --git a/src/test/immutable_object_cache/test_multi_session.cc b/src/test/immutable_object_cache/test_multi_session.cc index 008a3ba6d05..3ebdd4583ee 100644 --- a/src/test/immutable_object_cache/test_multi_session.cc +++ b/src/test/immutable_object_cache/test_multi_session.cc @@ -87,20 +87,17 @@ public: void server_handle_request(uint64_t session_id, ObjectCacheRequest* req) { - switch (req->type) { + switch (req->get_request_type()) { case RBDSC_REGISTER: { - ObjectCacheRegReplyData* data = new ObjectCacheRegReplyData(); - data->type = RBDSC_REGISTER_REPLY; - req = encode_object_cache_request(data, RBDSC_REGISTER_REPLY); - m_cache_server->send(session_id, req); + ObjectCacheRequest* reply = new ObjectCacheRegReplyData(RBDSC_REGISTER_REPLY, + req->seq); + m_cache_server->send(session_id, reply); break; } case RBDSC_READ: { - ObjectCacheReadReplyData* data = new ObjectCacheReadReplyData(); - data->type = RBDSC_READ_REPLY; - data->seq = req->seq; - req = encode_object_cache_request(data, RBDSC_READ_REPLY); - m_cache_server->send(session_id, req); + ObjectCacheRequest* reply = new ObjectCacheReadReplyData(RBDSC_READ_REPLY, + req->seq); + m_cache_server->send(session_id, reply); break; } } diff --git a/src/tools/immutable_object_cache/CacheClient.cc b/src/tools/immutable_object_cache/CacheClient.cc index 20d5426b37f..e2ab8e6c039 100644 --- a/src/tools/immutable_object_cache/CacheClient.cc +++ b/src/tools/immutable_object_cache/CacheClient.cc @@ -25,7 +25,7 @@ namespace immutable_obj_cache { if (m_use_dedicated_worker) { m_worker = new boost::asio::io_service(); m_worker_io_service_work = new boost::asio::io_service::work(*m_worker); - for(uint64_t i = 0; i < m_worker_thread_num; i++) { + for (uint64_t i = 0; i < m_worker_thread_num; i++) { std::thread* thd = new std::thread([this](){m_worker->run();}); m_worker_threads.push_back(thd); } @@ -55,7 +55,7 @@ namespace immutable_obj_cache { } if (m_use_dedicated_worker) { m_worker->stop(); - for(auto thd : m_worker_threads) { + for (auto thd : m_worker_threads) { thd->join(); delete thd; } @@ -88,22 +88,16 @@ namespace immutable_obj_cache { void CacheClient::lookup_object(std::string pool_nspace, uint64_t pool_id, uint64_t snap_id, std::string oid, GenContext* on_finish) { - ObjectCacheReadData data; - data.type = RBDSC_READ; - data.seq = ++m_sequence_id; - - data.m_pool_id = pool_id; - data.m_snap_id = snap_id; - data.m_pool_namespace = pool_nspace; - data.m_oid = oid; - ObjectCacheRequest* req = encode_object_cache_request(&data, RBDSC_READ); + ObjectCacheRequest* req = new ObjectCacheReadData(RBDSC_READ, ++m_sequence_id, 0, 0, + pool_id, snap_id, oid, pool_nspace); req->m_process_msg = on_finish; + req->encode(); { Mutex::Locker locker(m_lock); - m_outcoming_bl.append(req->get_data_buffer()); - ceph_assert(m_seq_to_req.find(data.seq) == m_seq_to_req.end()); - m_seq_to_req[data.seq] = req; + m_outcoming_bl.append(req->get_payload_bufferlist()); + ceph_assert(m_seq_to_req.find(req->seq) == m_seq_to_req.end()); + m_seq_to_req[req->seq] = req; } // try to send message to server. @@ -330,7 +324,7 @@ namespace immutable_obj_cache { // all pending request, which have entered into ASIO, will be re-dispatched to RADOS. { Mutex::Locker locker(m_lock); - for(auto it : m_seq_to_req) { + for (auto it : m_seq_to_req) { it.second->type = RBDSC_READ_RADOS; it.second->m_process_msg->complete(it.second); } @@ -343,13 +337,11 @@ namespace immutable_obj_cache { } int CacheClient::register_client(Context* on_finish) { - ObjectCacheRegData data; - data.seq = m_sequence_id++; - data.type = RBDSC_REGISTER; - ObjectCacheRequest* reg_req = encode_object_cache_request(&data, RBDSC_REGISTER); + ObjectCacheRequest* reg_req = new ObjectCacheRegData(RBDSC_REGISTER, m_sequence_id++); + reg_req->encode(); bufferlist bl; - bl.append(reg_req->get_data_buffer()); + bl.append(reg_req->get_payload_bufferlist()); uint64_t ret; boost::system::error_code ec; diff --git a/src/tools/immutable_object_cache/CacheController.cc b/src/tools/immutable_object_cache/CacheController.cc index 213b2d36b50..319631d3463 100644 --- a/src/tools/immutable_object_cache/CacheController.cc +++ b/src/tools/immutable_object_cache/CacheController.cc @@ -83,39 +83,30 @@ void CacheController::run() { void CacheController::handle_request(uint64_t session_id, ObjectCacheRequest* req){ ldout(m_cct, 20) << dendl; - switch (req->type) { + switch (req->get_request_type()) { case RBDSC_REGISTER: { // TODO(): skip register and allow clients to lookup directly - ObjectCacheRegReplyData data; - data.type = RBDSC_REGISTER_REPLY; - data.seq = req->seq; - req = encode_object_cache_request(&data, RBDSC_REGISTER_REPLY); - m_cache_server->send(session_id, req); + ObjectCacheRequest* reply = new ObjectCacheRegReplyData(RBDSC_REGISTER_REPLY, req->seq); + m_cache_server->send(session_id, reply); break; } case RBDSC_READ: { // lookup object in local cache store - ObjectCacheReadData* data = (ObjectCacheReadData*)(req->m_data); std::string cache_path; - int ret = m_object_cache_store->lookup_object(data->m_pool_namespace, - data->m_pool_id, - data->m_snap_id, - data->m_oid, + ObjectCacheReadData* req_read_data = (ObjectCacheReadData*)req; + int ret = m_object_cache_store->lookup_object(req_read_data->m_pool_namespace, + req_read_data->m_pool_id, + req_read_data->m_snap_id, + req_read_data->m_oid, cache_path); + ObjectCacheRequest* reply = nullptr; if (ret != OBJ_CACHE_PROMOTED) { - ObjectCacheReadRadosData reply_data; - reply_data.type = RBDSC_READ_RADOS; - reply_data.seq = req->seq; - req = encode_object_cache_request(&reply_data, RBDSC_READ_RADOS); + reply = new ObjectCacheReadRadosData(RBDSC_READ_RADOS, req->seq); } else { - ObjectCacheReadReplyData reply_data; - reply_data.m_cache_path = cache_path; - reply_data.type = RBDSC_READ_REPLY; - reply_data.seq = req->seq; - req = encode_object_cache_request(&reply_data, RBDSC_READ_REPLY); + reply = new ObjectCacheReadReplyData(RBDSC_READ_REPLY, req->seq, cache_path); } - m_cache_server->send(session_id, req); + m_cache_server->send(session_id, reply); break; } default: diff --git a/src/tools/immutable_object_cache/CacheSession.cc b/src/tools/immutable_object_cache/CacheSession.cc index c72305944fc..e4f76a7c806 100644 --- a/src/tools/immutable_object_cache/CacheSession.cc +++ b/src/tools/immutable_object_cache/CacheSession.cc @@ -94,6 +94,7 @@ void CacheSession::handle_request_data(bufferptr bp, uint64_t data_len, ObjectCacheRequest* req = decode_object_cache_request(bl_data); process(req); + delete req; read_request_header(); } @@ -105,7 +106,8 @@ void CacheSession::process(ObjectCacheRequest* req) { void CacheSession::send(ObjectCacheRequest* reply) { ldout(cct, 20) << dendl; bufferlist bl; - bl.append(reply->get_data_buffer()); + reply->encode(); + bl.append(reply->get_payload_bufferlist()); boost::asio::async_write(m_dm_socket, boost::asio::buffer(bl.c_str(), bl.length()), diff --git a/src/tools/immutable_object_cache/Types.cc b/src/tools/immutable_object_cache/Types.cc index b1a00a5a21d..2e547b11b5e 100644 --- a/src/tools/immutable_object_cache/Types.cc +++ b/src/tools/immutable_object_cache/Types.cc @@ -11,207 +11,148 @@ namespace ceph { namespace immutable_obj_cache { -void ObjectCacheRegData::encode(bufferlist& bl) { - ENCODE_START(1, 1, bl); - ceph::encode(type, bl); - ceph::encode(seq, bl); - ENCODE_FINISH(bl); +ObjectCacheRequest::ObjectCacheRequest(){} +ObjectCacheRequest::ObjectCacheRequest(uint16_t t, uint64_t s) + : type(t), seq(s) {} +ObjectCacheRequest::~ObjectCacheRequest(){} + +void ObjectCacheRequest::encode() { + ENCODE_START(1, 1, m_payload); + ceph::encode(type, m_payload); + ceph::encode(seq, m_payload); + if (!payload_empty()) { + encode_payload(); + } + ENCODE_FINISH(m_payload); } -void ObjectCacheRegData::decode(bufferlist& bl) { +void ObjectCacheRequest::decode(bufferlist& bl) { auto i = bl.cbegin(); DECODE_START(1, i); ceph::decode(type, i); ceph::decode(seq, i); + if (!payload_empty()) { + decode_payload(i); + } DECODE_FINISH(i); } -void ObjectCacheRegReplyData::encode(bufferlist& bl) { - ENCODE_START(1, 1, bl); - ceph::encode(type, bl); - ceph::encode(seq, bl); - ENCODE_FINISH(bl); -} +ObjectCacheRegData::ObjectCacheRegData() {} +ObjectCacheRegData::ObjectCacheRegData(uint16_t t, uint64_t s) + : ObjectCacheRequest(t, s) {} -void ObjectCacheRegReplyData::decode(bufferlist& bl) { - auto i = bl.cbegin(); - DECODE_START(1, i); - ceph::decode(type, i); - ceph::decode(seq, i); - DECODE_FINISH(i); -} +ObjectCacheRegData::~ObjectCacheRegData() {} + +void ObjectCacheRegData::encode_payload() {} + +void ObjectCacheRegData::decode_payload(bufferlist::const_iterator i) {} + +ObjectCacheRegReplyData::ObjectCacheRegReplyData() {} +ObjectCacheRegReplyData::ObjectCacheRegReplyData(uint16_t t, uint64_t s) + : ObjectCacheRequest(t, s) {} + +ObjectCacheRegReplyData::~ObjectCacheRegReplyData() {} + +void ObjectCacheRegReplyData::encode_payload() {} + +void ObjectCacheRegReplyData::decode_payload(bufferlist::const_iterator bl) {} + +ObjectCacheReadData::ObjectCacheReadData(uint16_t t, uint64_t s, + uint64_t read_offset, uint64_t read_len, + uint64_t pool_id, uint64_t snap_id, + std::string oid, std::string pool_namespace) + : ObjectCacheRequest(t, s), m_read_offset(read_offset), + m_read_len(read_len), m_pool_id(pool_id), m_snap_id(snap_id), + m_oid(oid), m_pool_namespace(pool_namespace) +{} + +ObjectCacheReadData::ObjectCacheReadData(uint16_t t, uint64_t s) + : ObjectCacheRequest(t, s) {} + +ObjectCacheReadData::~ObjectCacheReadData() {} -void ObjectCacheReadData::encode(bufferlist& bl) { - ENCODE_START(1, 1, bl); - ceph::encode(type, bl); - ceph::encode(seq, bl); - ceph::encode(m_read_offset, bl); - ceph::encode(m_read_len, bl); - ceph::encode(m_pool_id, bl); - ceph::encode(m_snap_id, bl); - ceph::encode(m_oid, bl); - ceph::encode(m_pool_namespace, bl); - ENCODE_FINISH(bl); +void ObjectCacheReadData::encode_payload() { + ceph::encode(m_read_offset, m_payload); + ceph::encode(m_read_len, m_payload); + ceph::encode(m_pool_id, m_payload); + ceph::encode(m_snap_id, m_payload); + ceph::encode(m_oid, m_payload); + ceph::encode(m_pool_namespace, m_payload); } -void ObjectCacheReadData::decode(bufferlist& bl) { - auto i = bl.cbegin(); - DECODE_START(1, i); - ceph::decode(type, i); - ceph::decode(seq, i); +void ObjectCacheReadData::decode_payload(bufferlist::const_iterator i) { ceph::decode(m_read_offset, i); ceph::decode(m_read_len, i); ceph::decode(m_pool_id, i); ceph::decode(m_snap_id, i); ceph::decode(m_oid, i); ceph::decode(m_pool_namespace, i); - DECODE_FINISH(i); } -void ObjectCacheReadReplyData::encode(bufferlist& bl) { - ENCODE_START(1, 1, bl); - ceph::encode(type, bl); - ceph::encode(seq, bl); - ceph::encode(m_cache_path, bl); - ENCODE_FINISH(bl); +ObjectCacheReadReplyData::ObjectCacheReadReplyData(uint16_t t, uint64_t s, string cache_path) + : ObjectCacheRequest(t, s), m_cache_path(cache_path) {} +ObjectCacheReadReplyData::ObjectCacheReadReplyData(uint16_t t, uint64_t s) + : ObjectCacheRequest(t, s) {} + +ObjectCacheReadReplyData::~ObjectCacheReadReplyData() {} + +void ObjectCacheReadReplyData::encode_payload() { + ceph::encode(m_cache_path, m_payload); } -void ObjectCacheReadReplyData::decode(bufferlist& bl) { - auto i = bl.cbegin(); - DECODE_START(1, i); - ceph::decode(type, i); - ceph::decode(seq, i); +void ObjectCacheReadReplyData::decode_payload(bufferlist::const_iterator i) { ceph::decode(m_cache_path, i); - DECODE_FINISH(i); } -void ObjectCacheReadRadosData::encode(bufferlist& bl) { - ENCODE_START(1, 1, bl); - ceph::encode(type, bl); - ceph::encode(seq, bl); - ENCODE_FINISH(bl); -} +ObjectCacheReadRadosData::ObjectCacheReadRadosData() {} +ObjectCacheReadRadosData::ObjectCacheReadRadosData(uint16_t t, uint64_t s) + : ObjectCacheRequest(t, s) {} -void ObjectCacheReadRadosData::decode(bufferlist& bl) { - auto i = bl.cbegin(); - DECODE_START(1, i); - ceph::decode(type, i); - ceph::decode(seq, i); - DECODE_FINISH(i); -} +ObjectCacheReadRadosData::~ObjectCacheReadRadosData() {} -uint8_t get_header_size() { - return 6; //uint8_t + uint8_t + uint32_t -} +void ObjectCacheReadRadosData::encode_payload() {} -struct encode_header{ - uint8_t v; - uint8_t c_v; - uint32_t len; -}__attribute__((packed)); +void ObjectCacheReadRadosData::decode_payload(bufferlist::const_iterator i) {} -uint32_t get_data_len(char* buf) { - encode_header* header = (encode_header*)buf; - return header->len; -} +ObjectCacheRequest* decode_object_cache_request(bufferlist payload_buffer) +{ + ObjectCacheRequest* req = nullptr; -uint16_t get_data_type(bufferlist buf) { uint16_t type; - auto i = buf.cbegin(); + uint64_t seq; + auto i = payload_buffer.cbegin(); DECODE_START(1, i); - decode(type, i); + ceph::decode(type, i); + ceph::decode(seq, i); DECODE_FINISH(i); - return type; -} - -bufferlist ObjectCacheRequest::get_data_buffer() { - return m_data_buffer; -} - -ObjectCacheRequest* encode_object_cache_request(void* m_data, uint16_t type) { - ObjectCacheRequest* req = new ObjectCacheRequest(); - - switch(type) { - case RBDSC_REGISTER: { - ObjectCacheRegData* data = (ObjectCacheRegData*)m_data; - data->encode(req->m_data_buffer); - break; - } - case RBDSC_REGISTER_REPLY: { - ObjectCacheRegReplyData* data = (ObjectCacheRegReplyData*)m_data; - data->encode(req->m_data_buffer); - break; - } - case RBDSC_READ: { - ObjectCacheReadData* data = (ObjectCacheReadData*)m_data; - data->encode(req->m_data_buffer); - break; - } - case RBDSC_READ_RADOS: { - ObjectCacheReadRadosData* data = (ObjectCacheReadRadosData*)m_data; - data->encode(req->m_data_buffer); - break; - } - case RBDSC_READ_REPLY: { - ObjectCacheReadReplyData* data = (ObjectCacheReadReplyData*)m_data; - data->encode(req->m_data_buffer); - break; - } - default: - ceph_assert(0); - } - - req->type = type; - return req; -} - -ObjectCacheRequest* decode_object_cache_request(bufferlist data_buffer) { - ObjectCacheRequest* req = new ObjectCacheRequest(); - uint16_t type = get_data_type(data_buffer); - uint64_t seq; switch(type) { case RBDSC_REGISTER: { - ObjectCacheRegData* data = new ObjectCacheRegData(); - data->decode(data_buffer); - seq = data->seq; - req->m_data = data; + req = new ObjectCacheRegData(type, seq); break; } case RBDSC_READ: { - ObjectCacheReadData* data = new ObjectCacheReadData(); - data->decode(data_buffer); - seq = data->seq; - req->m_data = data; + req = new ObjectCacheReadData(type, seq); break; } case RBDSC_REGISTER_REPLY: { - ObjectCacheRegReplyData* data = new ObjectCacheRegReplyData(); - data->decode(data_buffer); - seq = data->seq; - req->m_data = data; + req = new ObjectCacheRegReplyData(type, seq); break; } case RBDSC_READ_REPLY: { - ObjectCacheReadReplyData* data = new ObjectCacheReadReplyData(); - data->decode(data_buffer); - seq = data->seq; - req->m_data = data; + req = new ObjectCacheReadReplyData(type, seq); break; } case RBDSC_READ_RADOS: { - ObjectCacheReadRadosData* data = new ObjectCacheReadRadosData(); - data->decode(data_buffer); - seq = data->seq; - req->m_data = data; + req = new ObjectCacheReadRadosData(type, seq); break; } default: ceph_assert(0); } - req->type = type; - req->seq = seq; + req->decode(payload_buffer); + return req; } diff --git a/src/tools/immutable_object_cache/Types.h b/src/tools/immutable_object_cache/Types.h index 4d03dfa1993..99b7b8ae97f 100644 --- a/src/tools/immutable_object_cache/Types.h +++ b/src/tools/immutable_object_cache/Types.h @@ -6,78 +6,119 @@ #include "include/encoding.h" #include "include/Context.h" +#include "SocketCommon.h" namespace ceph { namespace immutable_obj_cache { -class ObjectCacheRegData { +namespace { +struct HeaderHelper { + uint8_t v; + uint8_t c_v; + uint32_t len; +}__attribute__((packed)); + +inline uint8_t get_header_size() { + return sizeof(HeaderHelper); +} + +inline uint32_t get_data_len(char* buf) { + HeaderHelper* header = (HeaderHelper*)buf; + return header->len; +} +} + +class ObjectCacheRequest { public: uint16_t type; uint64_t seq; - void encode(bufferlist& bl); + bufferlist m_payload; + + GenContext* m_process_msg; + + ObjectCacheRequest(); + ObjectCacheRequest(uint16_t type, uint64_t seq); + virtual ~ObjectCacheRequest(); + + // encode consists of two steps + // step 1 : directly encode common bits using encode method of base classs. + // step 2 : according to payload_empty, determine whether addtional bits need to + // be encoded which be implements by child class. + void encode(); void decode(bufferlist& bl); + bufferlist get_payload_bufferlist() { return m_payload; } + + virtual void encode_payload() = 0; + virtual void decode_payload(bufferlist::const_iterator bl_it) = 0; + virtual uint16_t get_request_type() = 0; + virtual bool payload_empty() = 0; }; -class ObjectCacheRegReplyData { +class ObjectCacheRegData : public ObjectCacheRequest { public: - uint16_t type; - uint64_t seq; + ObjectCacheRegData(); + ObjectCacheRegData(uint16_t t, uint64_t s); + ~ObjectCacheRegData() override; + void encode_payload() override; + void decode_payload(bufferlist::const_iterator bl) override; + uint16_t get_request_type() override { return RBDSC_REGISTER; } + bool payload_empty() override { return true; } +}; - void encode(bufferlist& bl); - void decode(bufferlist& bl); +class ObjectCacheRegReplyData : public ObjectCacheRequest { +public: + ObjectCacheRegReplyData(); + ObjectCacheRegReplyData(uint16_t t, uint64_t s); + ~ObjectCacheRegReplyData() override; + void encode_payload() override; + void decode_payload(bufferlist::const_iterator iter) override; + uint16_t get_request_type() override { return RBDSC_REGISTER_REPLY; } + bool payload_empty() override { return true; } }; -class ObjectCacheReadData { +class ObjectCacheReadData : public ObjectCacheRequest { public: - uint16_t type; - uint64_t seq; uint64_t m_read_offset; uint64_t m_read_len; uint64_t m_pool_id; uint64_t m_snap_id; std::string m_oid; std::string m_pool_namespace; - - void encode(bufferlist& bl); - void decode(bufferlist& bl); + ObjectCacheReadData(uint16_t t, uint64_t s, uint64_t read_offset, uint64_t read_len, uint64_t pool_id, + uint64_t snap_id, std::string oid, std::string pool_namespace ); + ObjectCacheReadData(uint16_t t, uint64_t s); + ~ObjectCacheReadData() override; + void encode_payload() override; + void decode_payload(bufferlist::const_iterator bl) override; + uint16_t get_request_type() override { return RBDSC_READ; } + bool payload_empty() override { return false; } }; -class ObjectCacheReadReplyData { +class ObjectCacheReadReplyData : public ObjectCacheRequest { public: - uint16_t type; - uint64_t seq; std::string m_cache_path; - - void encode(bufferlist& bl); - void decode(bufferlist& bl); + ObjectCacheReadReplyData(uint16_t t, uint64_t s, std::string cache_path); + ObjectCacheReadReplyData(uint16_t t, uint64_t s); + ~ObjectCacheReadReplyData() override; + void encode_payload() override; + void decode_payload(bufferlist::const_iterator bl) override; + uint16_t get_request_type() override { return RBDSC_READ_REPLY; } + bool payload_empty() override { return false; } }; -class ObjectCacheReadRadosData { +class ObjectCacheReadRadosData : public ObjectCacheRequest { public: - uint16_t type; - uint64_t seq; - - void encode(bufferlist& bl); - void decode(bufferlist& bl); + ObjectCacheReadRadosData(); + ObjectCacheReadRadosData(uint16_t t, uint64_t s); + ~ObjectCacheReadRadosData() override; + void encode_payload() override; + void decode_payload(bufferlist::const_iterator bl) override; + uint16_t get_request_type() override { return RBDSC_READ_RADOS; } + bool payload_empty() override { return true; } }; -class ObjectCacheRequest { -public: - uint64_t seq; - uint16_t type; - void* m_data; - bufferlist m_data_buffer; - GenContext* m_process_msg; - - bufferlist get_data_buffer(); -}; - -uint8_t get_header_size(); -uint32_t get_data_len(char* buf); - -ObjectCacheRequest* encode_object_cache_request(void* data, uint16_t type); -ObjectCacheRequest* decode_object_cache_request(bufferlist data_buffer); +ObjectCacheRequest* decode_object_cache_request(bufferlist payload_buffer); } // namespace immutable_obj_cache } // namespace ceph -- 2.39.5