From e362c8e6c7dbf53c610d23a3822459db5835833f Mon Sep 17 00:00:00 2001 From: Yuan Zhou Date: Mon, 11 Feb 2019 20:30:37 +0800 Subject: [PATCH] tools: cleanup IPC message for immutable obj cache daemon using ceph serialization(encode/decode) framework instead of building new data structure Signed-off-by: Yuan Zhou --- .../test_DomainSocket.cc | 10 +-- .../immutable_object_cache/test_message.cc | 41 ++-------- .../test_multi_session.cc | 6 +- .../immutable_object_cache/CacheClient.cc | 75 ++++++++----------- .../immutable_object_cache/CacheClient.h | 6 +- .../immutable_object_cache/CacheController.cc | 8 +- .../immutable_object_cache/CacheSession.cc | 29 +++---- .../immutable_object_cache/CacheSession.h | 3 +- src/tools/immutable_object_cache/Types.cc | 53 +++++-------- src/tools/immutable_object_cache/Types.h | 24 ++---- 10 files changed, 90 insertions(+), 165 deletions(-) diff --git a/src/test/immutable_object_cache/test_DomainSocket.cc b/src/test/immutable_object_cache/test_DomainSocket.cc index 3113bfabffd..0ed5b160304 100644 --- a/src/test/immutable_object_cache/test_DomainSocket.cc +++ b/src/test/immutable_object_cache/test_DomainSocket.cc @@ -79,17 +79,17 @@ public: void handle_request(uint64_t session_id, ObjectCacheRequest* req) { - switch (req->m_head.type) { + switch (req->m_data.type) { case RBDSC_REGISTER: { - req->m_head.type = RBDSC_REGISTER_REPLY; + req->m_data.type = RBDSC_REGISTER_REPLY; m_cache_server->send(session_id, req); break; } case RBDSC_READ: { if (m_hit_entry_set.find(req->m_data.m_oid) == m_hit_entry_set.end()) { - req->m_head.type = RBDSC_READ_RADOS; + req->m_data.type = RBDSC_READ_RADOS; } else { - req->m_head.type = RBDSC_READ_REPLY; + req->m_data.type = RBDSC_READ_REPLY; } m_cache_server->send(session_id, req); break; @@ -130,7 +130,7 @@ public: bool hit; auto ctx = new LambdaGenContext, ObjectCacheRequest*>([this, &hit](ObjectCacheRequest* ack){ - hit = ack->m_head.type == RBDSC_READ_REPLY; + hit = ack->m_data.type == RBDSC_READ_REPLY; m_wait_event.signal(); }); m_cache_client->lookup_object(pool_nspace, 1, 2, object_id, ctx); diff --git a/src/test/immutable_object_cache/test_message.cc b/src/test/immutable_object_cache/test_message.cc index 12435242305..2d17f394025 100644 --- a/src/test/immutable_object_cache/test_message.cc +++ b/src/test/immutable_object_cache/test_message.cc @@ -9,20 +9,10 @@ TEST(test_for_message, test_1) std::string oid_name("this is a oid name"); std::string cache_file_path("/temp/ceph_immutable_object_cache"); - ObjectCacheMsgHeader header; - header.seq = 1; - header.type = 2; - header.version =3; - header.data_len = 0; - header.reserved = 5; - ObjectCacheRequest req; - ASSERT_EQ(req.m_head_buffer.length(), 0); - ASSERT_EQ(req.m_data_buffer.length(), 0); - - req.m_head = header; - + req.m_data.seq = 1; + req.m_data.type = 2; req.m_data.m_read_offset = 222222; req.m_data.m_read_len = 333333; req.m_data.m_pool_id = 444444; @@ -34,36 +24,19 @@ TEST(test_for_message, test_1) // ObjectRequest --> bufferlist req.encode(); - ASSERT_EQ(req.m_head_buffer.length(), sizeof(req.m_head)); - - ObjectCacheRequest* req_decode; - - auto head_bl = req.get_head_buffer(); - auto data_bl = req.get_data_buffer(); // bufferlist --> ObjectCacheRequest - req_decode = decode_object_cache_request(head_bl, data_bl); + auto data_bl = req.get_data_buffer(); + uint32_t data_len = get_data_len(data_bl.c_str()); + ASSERT_EQ(data_bl.length(), data_len + get_header_size()); - ASSERT_EQ(req_decode->m_head.seq, header.seq); - ASSERT_EQ(req_decode->m_head.seq, 1); - ASSERT_EQ(req_decode->m_head.type, header.type); - ASSERT_EQ(req_decode->m_head.type, 2); - ASSERT_EQ(req_decode->m_head.version, header.version); - ASSERT_EQ(req_decode->m_head.version, 3); - ASSERT_EQ(req_decode->m_head.data_len, req.m_data_buffer.length()); - ASSERT_EQ(req_decode->m_head.data_len, data_bl.length()); - ASSERT_EQ(req_decode->m_head.reserved, header.reserved); - ASSERT_EQ(req_decode->m_head.reserved, 5); + ObjectCacheRequest* req_decode = decode_object_cache_request(data_bl); - ASSERT_EQ(req_decode->m_data.m_read_offset, req.m_data.m_read_offset); + ASSERT_EQ(req_decode->m_data.seq, 1); ASSERT_EQ(req_decode->m_data.m_read_offset, 222222); - ASSERT_EQ(req_decode->m_data.m_read_len, req.m_data.m_read_len); ASSERT_EQ(req_decode->m_data.m_read_len, 333333); - ASSERT_EQ(req_decode->m_data.m_pool_namespace, req.m_data.m_pool_namespace); ASSERT_EQ(req_decode->m_data.m_pool_namespace, pool_nspace); - ASSERT_EQ(req_decode->m_data.m_cache_path, req.m_data.m_cache_path); ASSERT_EQ(req_decode->m_data.m_cache_path, cache_file_path); - ASSERT_EQ(req_decode->m_data.m_oid, req.m_data.m_oid); ASSERT_EQ(req_decode->m_data.m_oid, oid_name); delete req_decode; diff --git a/src/test/immutable_object_cache/test_multi_session.cc b/src/test/immutable_object_cache/test_multi_session.cc index 6f23d184218..e0f6d953591 100644 --- a/src/test/immutable_object_cache/test_multi_session.cc +++ b/src/test/immutable_object_cache/test_multi_session.cc @@ -87,14 +87,14 @@ public: void server_handle_request(uint64_t session_id, ObjectCacheRequest* req) { - switch (req->m_head.type) { + switch (req->m_data.type) { case RBDSC_REGISTER: { - req->m_head.type = RBDSC_REGISTER_REPLY; + req->m_data.type = RBDSC_REGISTER_REPLY; m_cache_server->send(session_id, req); break; } case RBDSC_READ: { - req->m_head.type = RBDSC_READ_REPLY; + req->m_data.type = RBDSC_READ_REPLY; m_cache_server->send(session_id, req); break; } diff --git a/src/tools/immutable_object_cache/CacheClient.cc b/src/tools/immutable_object_cache/CacheClient.cc index 4dca9188ad2..67ce958f232 100644 --- a/src/tools/immutable_object_cache/CacheClient.cc +++ b/src/tools/immutable_object_cache/CacheClient.cc @@ -17,8 +17,7 @@ namespace immutable_obj_cache { m_dm_socket(m_io_service), m_ep(stream_protocol::endpoint(file)), m_io_thread(nullptr), m_session_work(false), m_writing(false), m_reading(false), m_sequence_id(0), - m_lock("ceph::cache::cacheclient::m_lock"), - m_header_buffer(new char[sizeof(ObjectCacheMsgHeader)]) + m_lock("ceph::cache::cacheclient::m_lock") { // TODO : configure it. m_use_dedicated_worker = true; @@ -31,11 +30,12 @@ namespace immutable_obj_cache { m_worker_threads.push_back(thd); } } + m_bp_header = buffer::create(get_header_size()); + } CacheClient::~CacheClient() { stop(); - delete m_header_buffer; } void CacheClient::run(){ @@ -89,11 +89,8 @@ namespace immutable_obj_cache { std::string oid, GenContext* on_finish) { ObjectCacheRequest* req = new ObjectCacheRequest(); - req->m_head.version = 0; - req->m_head.reserved = 0; - req->m_head.type = RBDSC_READ; - req->m_head.padding = 0; - req->m_head.seq = ++m_sequence_id; + req->m_data.type = RBDSC_READ; + req->m_data.seq = ++m_sequence_id; req->m_data.m_pool_id = pool_id; req->m_data.m_snap_id = snap_id; @@ -102,15 +99,11 @@ namespace immutable_obj_cache { req->m_process_msg = on_finish; req->encode(); - ceph_assert(req->get_head_buffer().length() == sizeof(ObjectCacheMsgHeader)); - ceph_assert(req->get_data_buffer().length() == req->m_head.data_len); - - { + { Mutex::Locker locker(m_lock); - m_outcoming_bl.append(req->get_head_buffer()); m_outcoming_bl.append(req->get_data_buffer()); - ceph_assert(m_seq_to_req.find(req->m_head.seq) == m_seq_to_req.end()); - m_seq_to_req[req->m_head.seq] = req; + ceph_assert(m_seq_to_req.find(req->m_data.seq) == m_seq_to_req.end()); + m_seq_to_req[req->m_data.seq] = req; } // try to send message to server. @@ -175,12 +168,12 @@ namespace immutable_obj_cache { void CacheClient::read_reply_header() { /* create new head buffer for every reply */ - bufferptr bp_head(buffer::create(sizeof(ObjectCacheMsgHeader))); + bufferptr bp_head(buffer::create(get_header_size())); auto raw_ptr = bp_head.c_str(); boost::asio::async_read(m_dm_socket, - boost::asio::buffer(raw_ptr, sizeof(ObjectCacheMsgHeader)), - boost::asio::transfer_exactly(sizeof(ObjectCacheMsgHeader)), + boost::asio::buffer(raw_ptr, get_header_size()), + boost::asio::transfer_exactly(get_header_size()), boost::bind(&CacheClient::handle_reply_header, this, bp_head, boost::asio::placeholders::error, @@ -190,56 +183,51 @@ namespace immutable_obj_cache { void CacheClient::handle_reply_header(bufferptr bp_head, const boost::system::error_code& ec, size_t bytes_transferred) { - if (ec || bytes_transferred != sizeof(ObjectCacheMsgHeader)) { + if (ec || bytes_transferred != get_header_size()) { fault(ASIO_ERROR_READ, ec); return; } ceph_assert(bytes_transferred == bp_head.length()); - ObjectCacheMsgHeader* head = (ObjectCacheMsgHeader*)bp_head.c_str(); - uint64_t data_len = head->data_len; - uint64_t seq_id = head->seq; - ceph_assert(m_seq_to_req.find(seq_id) != m_seq_to_req.end()); + uint32_t data_len = get_data_len(bp_head.c_str()); bufferptr bp_data(buffer::create(data_len)); - read_reply_data(std::move(bp_head), std::move(bp_data), data_len, seq_id); + read_reply_data(std::move(bp_head), std::move(bp_data), data_len); } void CacheClient::read_reply_data(bufferptr&& bp_head, bufferptr&& bp_data, - const uint64_t data_len, const uint64_t seq_id) { + const uint64_t data_len) { auto raw_ptr = bp_data.c_str(); boost::asio::async_read(m_dm_socket, boost::asio::buffer(raw_ptr, data_len), boost::asio::transfer_exactly(data_len), boost::bind(&CacheClient::handle_reply_data, - this, std::move(bp_head), std::move(bp_data), data_len, seq_id, + this, std::move(bp_head), std::move(bp_data), data_len, boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred)); } void CacheClient::handle_reply_data(bufferptr bp_head, bufferptr bp_data, - const uint64_t data_len, const uint64_t seq_id, + const uint64_t data_len, const boost::system::error_code& ec, size_t bytes_transferred) { if (ec || bytes_transferred != data_len) { fault(ASIO_ERROR_WRITE, ec); return; } + ceph_assert(bp_data.length() == data_len); - bufferlist head_buffer; bufferlist data_buffer; - head_buffer.append(std::move(bp_head)); + data_buffer.append(std::move(bp_head)); data_buffer.append(std::move(bp_data)); - ceph_assert(head_buffer.length() == sizeof(ObjectCacheMsgHeader)); - ceph_assert(data_buffer.length() == data_len); - ObjectCacheRequest* reply = decode_object_cache_request(head_buffer, data_buffer); + ObjectCacheRequest* reply = decode_object_cache_request(data_buffer); data_buffer.clear(); ceph_assert(data_buffer.length() == 0); - process(reply, seq_id); + process(reply, reply->m_data.seq); { Mutex::Locker locker(m_lock); @@ -343,7 +331,7 @@ namespace immutable_obj_cache { { Mutex::Locker locker(m_lock); for(auto it : m_seq_to_req) { - it.second->m_head.type = RBDSC_READ_RADOS; + it.second->m_data.type = RBDSC_READ_RADOS; it.second->m_process_msg->complete(it.second); } m_seq_to_req.clear(); @@ -356,14 +344,11 @@ namespace immutable_obj_cache { int CacheClient::register_client(Context* on_finish) { ObjectCacheRequest* message = new ObjectCacheRequest(); - message->m_head.version = 0; - message->m_head.seq = m_sequence_id++; - message->m_head.type = RBDSC_REGISTER; - message->m_head.reserved = 0; + message->m_data.seq = m_sequence_id++; + message->m_data.type = RBDSC_REGISTER; message->encode(); bufferlist bl; - bl.append(message->get_head_buffer()); bl.append(message->get_data_buffer()); uint64_t ret; @@ -378,14 +363,13 @@ namespace immutable_obj_cache { } ret = boost::asio::read(m_dm_socket, - boost::asio::buffer(m_header_buffer, sizeof(ObjectCacheMsgHeader)), ec); - if (ec || ret != sizeof(ObjectCacheMsgHeader)) { + boost::asio::buffer(m_bp_header.c_str(), get_header_size()), ec); + if (ec || ret != get_header_size()) { fault(ASIO_ERROR_READ, ec); return -1; } - ObjectCacheMsgHeader* head = (ObjectCacheMsgHeader*)m_header_buffer; - uint64_t data_len = head->data_len; + uint64_t data_len = get_data_len(m_bp_header.c_str()); bufferptr bp_data(buffer::create(data_len)); ret = boost::asio::read(m_dm_socket, boost::asio::buffer(bp_data.c_str(), data_len), ec); @@ -395,9 +379,10 @@ namespace immutable_obj_cache { } bufferlist data_buffer; + data_buffer.append(m_bp_header); data_buffer.append(std::move(bp_data)); - ObjectCacheRequest* req = decode_object_cache_request(head, data_buffer); - if (req->m_head.type == RBDSC_REGISTER_REPLY) { + ObjectCacheRequest* req = decode_object_cache_request(data_buffer); + if (req->m_data.type == RBDSC_REGISTER_REPLY) { on_finish->complete(true); } else { on_finish->complete(false); diff --git a/src/tools/immutable_object_cache/CacheClient.h b/src/tools/immutable_object_cache/CacheClient.h index 56ef42321fa..8917062286c 100644 --- a/src/tools/immutable_object_cache/CacheClient.h +++ b/src/tools/immutable_object_cache/CacheClient.h @@ -47,9 +47,9 @@ private: void handle_reply_header(bufferptr bp_head, const boost::system::error_code& ec, size_t bytes_transferred); void read_reply_data(bufferptr&& bp_head, bufferptr&& bp_data, - const uint64_t data_len, const uint64_t seq_id); + const uint64_t data_len); void handle_reply_data(bufferptr bp_head, bufferptr bp_data, - const uint64_t data_len, const uint64_t seq_id, + const uint64_t data_len, const boost::system::error_code& ec, size_t bytes_transferred); private: @@ -73,7 +73,7 @@ private: Mutex m_lock; std::map m_seq_to_req; bufferlist m_outcoming_bl; - char* m_header_buffer; + bufferptr m_bp_header; }; } // namespace immutable_obj_cache diff --git a/src/tools/immutable_object_cache/CacheController.cc b/src/tools/immutable_object_cache/CacheController.cc index b62cdea724a..e7b06c4d9f3 100644 --- a/src/tools/immutable_object_cache/CacheController.cc +++ b/src/tools/immutable_object_cache/CacheController.cc @@ -83,10 +83,10 @@ void CacheController::run() { void CacheController::handle_request(uint64_t session_id, ObjectCacheRequest* req){ ldout(m_cct, 20) << dendl; - switch (req->m_head.type) { + switch (req->m_data.type) { case RBDSC_REGISTER: { // TODO(): skip register and allow clients to lookup directly - req->m_head.type = RBDSC_REGISTER_REPLY; + req->m_data.type = RBDSC_REGISTER_REPLY; m_cache_server->send(session_id, req); break; @@ -99,9 +99,9 @@ void CacheController::handle_request(uint64_t session_id, ObjectCacheRequest* re req->m_data.m_oid, req->m_data.m_cache_path); if (ret < 0) { - req->m_head.type = RBDSC_READ_RADOS; + req->m_data.type = RBDSC_READ_RADOS; } else { - req->m_head.type = RBDSC_READ_REPLY; + req->m_data.type = RBDSC_READ_REPLY; } m_cache_server->send(session_id, req); diff --git a/src/tools/immutable_object_cache/CacheSession.cc b/src/tools/immutable_object_cache/CacheSession.cc index 87528ac48f5..c188e8e0b61 100644 --- a/src/tools/immutable_object_cache/CacheSession.cc +++ b/src/tools/immutable_object_cache/CacheSession.cc @@ -18,13 +18,12 @@ namespace immutable_obj_cache { CacheSession::CacheSession(uint64_t session_id, io_service& io_service, ProcessMsg processmsg, CephContext* cct) : m_session_id(session_id), m_dm_socket(io_service), - m_head_buffer(new char[sizeof(ObjectCacheMsgHeader)]), - m_server_process_msg(processmsg), cct(cct) - {} + m_server_process_msg(processmsg), cct(cct) { + m_bp_header = buffer::create(get_header_size()); +} CacheSession::~CacheSession() { close(); - delete[] m_head_buffer; } stream_protocol::socket& CacheSession::socket() { @@ -48,8 +47,8 @@ void CacheSession::start() { void CacheSession::read_request_header() { ldout(cct, 20) << dendl; boost::asio::async_read(m_dm_socket, - boost::asio::buffer(m_head_buffer, sizeof(ObjectCacheMsgHeader)), - boost::asio::transfer_exactly(sizeof(ObjectCacheMsgHeader)), + boost::asio::buffer(m_bp_header.c_str(), get_header_size()), + boost::asio::transfer_exactly(get_header_size()), boost::bind(&CacheSession::handle_request_header, shared_from_this(), boost::asio::placeholders::error, @@ -59,17 +58,12 @@ void CacheSession::read_request_header() { void CacheSession::handle_request_header(const boost::system::error_code& err, size_t bytes_transferred) { ldout(cct, 20) << dendl; - if (err || bytes_transferred != sizeof(ObjectCacheMsgHeader)) { + if (err || bytes_transferred != get_header_size()) { fault(); return; } - ObjectCacheMsgHeader* head = (ObjectCacheMsgHeader*)(m_head_buffer); - ceph_assert(head->version == 0); - ceph_assert(head->reserved == 0); - ceph_assert(head->type == RBDSC_REGISTER || head->type == RBDSC_READ); - - read_request_data(head->data_len); + read_request_data(get_data_len(m_bp_header.c_str())); } void CacheSession::read_request_data(uint64_t data_len) { @@ -94,25 +88,24 @@ void CacheSession::handle_request_data(bufferptr bp, uint64_t data_len, } bufferlist bl_data; + + bl_data.append(m_bp_header); bl_data.append(std::move(bp)); - ObjectCacheRequest* req = decode_object_cache_request( - (ObjectCacheMsgHeader*)m_head_buffer, bl_data); + ObjectCacheRequest* req = decode_object_cache_request(bl_data); process(req); read_request_header(); } void CacheSession::process(ObjectCacheRequest* req) { ldout(cct, 20) << dendl; - m_server_process_msg(m_session_id, req); + m_server_process_msg(m_session_id, req); } void CacheSession::send(ObjectCacheRequest* reply) { ldout(cct, 20) << dendl; - reply->m_head_buffer.clear(); reply->m_data_buffer.clear(); reply->encode(); bufferlist bl; - bl.append(reply->get_head_buffer()); bl.append(reply->get_data_buffer()); boost::asio::async_write(m_dm_socket, diff --git a/src/tools/immutable_object_cache/CacheSession.h b/src/tools/immutable_object_cache/CacheSession.h index c6761d48df7..5f5cd1be9bd 100644 --- a/src/tools/immutable_object_cache/CacheSession.h +++ b/src/tools/immutable_object_cache/CacheSession.h @@ -36,9 +36,10 @@ public: private: uint64_t m_session_id; stream_protocol::socket m_dm_socket; - char* m_head_buffer; ProcessMsg m_server_process_msg; CephContext* cct; + + bufferptr m_bp_header; }; typedef std::shared_ptr CacheSessionPtr; diff --git a/src/tools/immutable_object_cache/Types.cc b/src/tools/immutable_object_cache/Types.cc index 61af325810d..bd904449113 100644 --- a/src/tools/immutable_object_cache/Types.cc +++ b/src/tools/immutable_object_cache/Types.cc @@ -10,26 +10,10 @@ namespace ceph { namespace immutable_obj_cache { -void ObjectCacheMsgHeader::encode(bufferlist& bl) const { - ceph::encode(seq, bl); - ceph::encode(type, bl); - ceph::encode(version, bl); - ceph::encode(padding, bl); - ceph::encode(data_len, bl); - ceph::encode(reserved, bl); -} - -void ObjectCacheMsgHeader::decode(bufferlist::const_iterator& it) { - ceph::decode(seq, it); - ceph::decode(type, it); - ceph::decode(version, it); - ceph::decode(padding, it); - ceph::decode(data_len, it); - ceph::decode(reserved, it); -} - void ObjectCacheMsgData::encode(bufferlist& bl) { ENCODE_START(1, 1, bl); + ceph::encode(seq, bl); + ceph::encode(type, bl); ceph::encode(m_read_offset, bl); ceph::encode(m_read_len, bl); ceph::encode(m_pool_id, bl); @@ -43,6 +27,8 @@ void ObjectCacheMsgData::encode(bufferlist& bl) { void ObjectCacheMsgData::decode(bufferlist& bl) { auto i = bl.cbegin(); DECODE_START(1, i); + ceph::decode(seq, i); + ceph::decode(type, i); ceph::decode(m_read_offset, i); ceph::decode(m_read_len, i); ceph::decode(m_pool_id, i); @@ -55,32 +41,33 @@ void ObjectCacheMsgData::decode(bufferlist& bl) { void ObjectCacheRequest::encode() { m_data.encode(m_data_buffer); - m_head.data_len = m_data_buffer.length(); - ceph_assert(m_head_buffer.length() == 0); - m_head.encode(m_head_buffer); - ceph_assert(sizeof(ObjectCacheMsgHeader) == m_head_buffer.length()); } -bufferlist ObjectCacheRequest::get_head_buffer() { - return m_head_buffer; +uint8_t get_header_size() { + //return sizeof(ObjectCacheMsgHeader); + return 6; +} + +struct encode_header{ + uint8_t v; + uint8_t c_v; + uint32_t len; +}__attribute__((packed)); + +uint32_t get_data_len(char* buf) { + encode_header* header = (encode_header*)buf; + return header->len; } + bufferlist ObjectCacheRequest::get_data_buffer() { return m_data_buffer; } -ObjectCacheRequest* decode_object_cache_request( - ObjectCacheMsgHeader* head, bufferlist data_buffer) { +ObjectCacheRequest* decode_object_cache_request(bufferlist data_buffer) { ObjectCacheRequest* req = new ObjectCacheRequest(); - req->m_head = *head; - ceph_assert(req->m_head.data_len == data_buffer.length()); req->m_data.decode(data_buffer); return req; } -ObjectCacheRequest* decode_object_cache_request( - bufferlist head_buffer, bufferlist data_buffer) { - return decode_object_cache_request((ObjectCacheMsgHeader*)(head_buffer.c_str()), data_buffer); -} - } // namespace immutable_obj_cache } // namespace ceph diff --git a/src/tools/immutable_object_cache/Types.h b/src/tools/immutable_object_cache/Types.h index d6c1c01792f..ef487e4f215 100644 --- a/src/tools/immutable_object_cache/Types.h +++ b/src/tools/immutable_object_cache/Types.h @@ -10,20 +10,10 @@ namespace ceph { namespace immutable_obj_cache { -struct ObjectCacheMsgHeader { - uint64_t seq; /* sequence id */ - uint16_t type; /* msg type */ - uint16_t version; /* object cache version */ - uint32_t padding; - uint32_t data_len; - uint32_t reserved; - - void encode(bufferlist& bl) const; - void decode(bufferlist::const_iterator& it); -}; - class ObjectCacheMsgData { public: + uint64_t seq; /* sequence id */ + uint16_t type; /* msg type */ uint64_t m_read_offset; uint64_t m_read_len; uint64_t m_pool_id; @@ -38,23 +28,19 @@ public: class ObjectCacheRequest { public: - ObjectCacheMsgHeader m_head; ObjectCacheMsgData m_data; - bufferlist m_head_buffer; bufferlist m_data_buffer; GenContext* m_process_msg; void encode(); - bufferlist get_head_buffer(); bufferlist get_data_buffer(); }; -ObjectCacheRequest* decode_object_cache_request( - ObjectCacheMsgHeader* head, bufferlist data_buffer); +uint8_t get_header_size(); +uint32_t get_data_len(char* buf); -ObjectCacheRequest* decode_object_cache_request( - bufferlist head_buffer, bufferlist data_buffer); +ObjectCacheRequest* decode_object_cache_request(bufferlist data_buffer); } // namespace immutable_obj_cache } // namespace ceph -- 2.39.5