void handle_request(uint64_t session_id, ObjectCacheRequest* req) {
- switch (req->type) {
+ switch (req->get_request_type()) {
case RBDSC_REGISTER: {
- ObjectCacheRegReplyData* data = new ObjectCacheRegReplyData();
- data->type = RBDSC_REGISTER_REPLY;
- req = encode_object_cache_request(data, RBDSC_REGISTER_REPLY);
- m_cache_server->send(session_id, req);
+ ObjectCacheRequest* reply = new ObjectCacheRegReplyData(RBDSC_REGISTER_REPLY, req->seq);
+ m_cache_server->send(session_id, reply);
break;
}
case RBDSC_READ: {
- ObjectCacheReadData* req_data = (ObjectCacheReadData*)req->m_data;
- if (m_hit_entry_set.find(req_data->m_oid) == m_hit_entry_set.end()) {
- ObjectCacheReadRadosData* data = new ObjectCacheReadRadosData();
- data->type = RBDSC_READ_RADOS;
- data->seq = req_data->seq;
- req = encode_object_cache_request(data, RBDSC_READ_RADOS);
+ ObjectCacheReadData* read_req = (ObjectCacheReadData*)req;
+ ObjectCacheRequest* reply = nullptr;
+ if (m_hit_entry_set.find(read_req->m_oid) == m_hit_entry_set.end()) {
+ reply = new ObjectCacheReadRadosData(RBDSC_READ_RADOS, req->seq);
} else {
- ObjectCacheReadReplyData* data = new ObjectCacheReadReplyData();
- data->type = RBDSC_READ_REPLY;
- data->seq = req_data->seq;
- req = encode_object_cache_request(data, RBDSC_READ_REPLY);
+ reply = new ObjectCacheReadReplyData(RBDSC_READ_REPLY, req->seq, "/temp/cache/path");
}
- m_cache_server->send(session_id, req);
+ m_cache_server->send(session_id, reply);
break;
}
}
std::string oid_name("this is a oid name");
std::string cache_file_path("/temp/ceph_immutable_object_cache");
- ObjectCacheReadData data;
-
- data.seq = 1UL;
- data.type = RBDSC_READ;
- data.m_read_offset = 222222;
- data.m_read_len = 333333;
- data.m_pool_id = 444444;
- data.m_snap_id = 555555;
- data.m_oid = oid_name;
- data.m_pool_namespace = pool_nspace;
+ uint16_t type = RBDSC_READ;
+ uint64_t seq = 123456UL;
+ uint64_t read_offset = 222222UL;
+ uint64_t read_len = 333333UL;
+ uint64_t pool_id = 444444UL;
+ uint64_t snap_id = 555555UL;
// ObjectRequest --> bufferlist
- ObjectCacheRequest* req = encode_object_cache_request(&data, RBDSC_READ);
+ ObjectCacheRequest* req = new ObjectCacheReadData(type, seq, read_offset, read_len,
+ pool_id, snap_id, oid_name, pool_nspace);
+ req->encode();
+ auto payload_bl = req->get_payload_bufferlist();
- auto data_bl = req->get_data_buffer();
- uint32_t data_len = get_data_len(data_bl.c_str());
- ASSERT_EQ(data_bl.length(), data_len + get_header_size());
- ASSERT_TRUE(data_bl.c_str() != nullptr);
+ uint32_t data_len = get_data_len(payload_bl.c_str());
+ ASSERT_EQ(payload_bl.length(), data_len + get_header_size());
+ ASSERT_TRUE(payload_bl.c_str() != nullptr);
// bufferlist --> ObjectCacheRequest
- ObjectCacheRequest* req_decode = decode_object_cache_request(data_bl);
- ObjectCacheReadData* reply_data = (ObjectCacheReadData*)(req_decode->m_data);
+ ObjectCacheRequest* req_decode = decode_object_cache_request(payload_bl);
- ASSERT_EQ(req_decode->type, RBDSC_READ);
+ ASSERT_EQ(req_decode->get_request_type(), RBDSC_READ);
- ASSERT_EQ(reply_data->seq, 1UL);
- ASSERT_EQ(reply_data->type, RBDSC_READ);
- ASSERT_EQ(reply_data->m_read_offset, 222222UL);
- ASSERT_EQ(reply_data->m_read_len, 333333UL);
- ASSERT_EQ(reply_data->m_pool_id, 444444UL);
- ASSERT_EQ(reply_data->m_snap_id, 555555UL);
- ASSERT_EQ(reply_data->m_pool_namespace, pool_nspace);
- ASSERT_EQ(reply_data->m_oid, oid_name);
+ ASSERT_EQ(req_decode->type, RBDSC_READ);
+ ASSERT_EQ(req_decode->seq, 123456UL);
+ ASSERT_EQ(((ObjectCacheReadData*)req_decode)->type, RBDSC_READ);
+ ASSERT_EQ(((ObjectCacheReadData*)req_decode)->seq, 123456UL);
+ ASSERT_EQ(((ObjectCacheReadData*)req_decode)->m_read_offset, 222222UL);
+ ASSERT_EQ(((ObjectCacheReadData*)req_decode)->m_read_len, 333333UL);
+ ASSERT_EQ(((ObjectCacheReadData*)req_decode)->m_pool_id, 444444UL);
+ ASSERT_EQ(((ObjectCacheReadData*)req_decode)->m_snap_id, 555555UL);
+ ASSERT_EQ(((ObjectCacheReadData*)req_decode)->m_pool_namespace, pool_nspace);
+ ASSERT_EQ(((ObjectCacheReadData*)req_decode)->m_oid, oid_name);
delete req;
delete req_decode;
void server_handle_request(uint64_t session_id, ObjectCacheRequest* req) {
- switch (req->type) {
+ switch (req->get_request_type()) {
case RBDSC_REGISTER: {
- ObjectCacheRegReplyData* data = new ObjectCacheRegReplyData();
- data->type = RBDSC_REGISTER_REPLY;
- req = encode_object_cache_request(data, RBDSC_REGISTER_REPLY);
- m_cache_server->send(session_id, req);
+ ObjectCacheRequest* reply = new ObjectCacheRegReplyData(RBDSC_REGISTER_REPLY,
+ req->seq);
+ m_cache_server->send(session_id, reply);
break;
}
case RBDSC_READ: {
- ObjectCacheReadReplyData* data = new ObjectCacheReadReplyData();
- data->type = RBDSC_READ_REPLY;
- data->seq = req->seq;
- req = encode_object_cache_request(data, RBDSC_READ_REPLY);
- m_cache_server->send(session_id, req);
+ ObjectCacheRequest* reply = new ObjectCacheReadReplyData(RBDSC_READ_REPLY,
+ req->seq);
+ m_cache_server->send(session_id, reply);
break;
}
}
if (m_use_dedicated_worker) {
m_worker = new boost::asio::io_service();
m_worker_io_service_work = new boost::asio::io_service::work(*m_worker);
- for(uint64_t i = 0; i < m_worker_thread_num; i++) {
+ for (uint64_t i = 0; i < m_worker_thread_num; i++) {
std::thread* thd = new std::thread([this](){m_worker->run();});
m_worker_threads.push_back(thd);
}
}
if (m_use_dedicated_worker) {
m_worker->stop();
- for(auto thd : m_worker_threads) {
+ for (auto thd : m_worker_threads) {
thd->join();
delete thd;
}
void CacheClient::lookup_object(std::string pool_nspace, uint64_t pool_id, uint64_t snap_id,
std::string oid, GenContext<ObjectCacheRequest*>* on_finish) {
- ObjectCacheReadData data;
- data.type = RBDSC_READ;
- data.seq = ++m_sequence_id;
-
- data.m_pool_id = pool_id;
- data.m_snap_id = snap_id;
- data.m_pool_namespace = pool_nspace;
- data.m_oid = oid;
- ObjectCacheRequest* req = encode_object_cache_request(&data, RBDSC_READ);
+ ObjectCacheRequest* req = new ObjectCacheReadData(RBDSC_READ, ++m_sequence_id, 0, 0,
+ pool_id, snap_id, oid, pool_nspace);
req->m_process_msg = on_finish;
+ req->encode();
{
Mutex::Locker locker(m_lock);
- m_outcoming_bl.append(req->get_data_buffer());
- ceph_assert(m_seq_to_req.find(data.seq) == m_seq_to_req.end());
- m_seq_to_req[data.seq] = req;
+ m_outcoming_bl.append(req->get_payload_bufferlist());
+ ceph_assert(m_seq_to_req.find(req->seq) == m_seq_to_req.end());
+ m_seq_to_req[req->seq] = req;
}
// try to send message to server.
// all pending request, which have entered into ASIO, will be re-dispatched to RADOS.
{
Mutex::Locker locker(m_lock);
- for(auto it : m_seq_to_req) {
+ for (auto it : m_seq_to_req) {
it.second->type = RBDSC_READ_RADOS;
it.second->m_process_msg->complete(it.second);
}
}
int CacheClient::register_client(Context* on_finish) {
- ObjectCacheRegData data;
- data.seq = m_sequence_id++;
- data.type = RBDSC_REGISTER;
- ObjectCacheRequest* reg_req = encode_object_cache_request(&data, RBDSC_REGISTER);
+ ObjectCacheRequest* reg_req = new ObjectCacheRegData(RBDSC_REGISTER, m_sequence_id++);
+ reg_req->encode();
bufferlist bl;
- bl.append(reg_req->get_data_buffer());
+ bl.append(reg_req->get_payload_bufferlist());
uint64_t ret;
boost::system::error_code ec;
void CacheController::handle_request(uint64_t session_id, ObjectCacheRequest* req){
ldout(m_cct, 20) << dendl;
- switch (req->type) {
+ switch (req->get_request_type()) {
case RBDSC_REGISTER: {
// TODO(): skip register and allow clients to lookup directly
- ObjectCacheRegReplyData data;
- data.type = RBDSC_REGISTER_REPLY;
- data.seq = req->seq;
- req = encode_object_cache_request(&data, RBDSC_REGISTER_REPLY);
- m_cache_server->send(session_id, req);
+ ObjectCacheRequest* reply = new ObjectCacheRegReplyData(RBDSC_REGISTER_REPLY, req->seq);
+ m_cache_server->send(session_id, reply);
break;
}
case RBDSC_READ: {
// lookup object in local cache store
- ObjectCacheReadData* data = (ObjectCacheReadData*)(req->m_data);
std::string cache_path;
- int ret = m_object_cache_store->lookup_object(data->m_pool_namespace,
- data->m_pool_id,
- data->m_snap_id,
- data->m_oid,
+ ObjectCacheReadData* req_read_data = (ObjectCacheReadData*)req;
+ int ret = m_object_cache_store->lookup_object(req_read_data->m_pool_namespace,
+ req_read_data->m_pool_id,
+ req_read_data->m_snap_id,
+ req_read_data->m_oid,
cache_path);
+ ObjectCacheRequest* reply = nullptr;
if (ret != OBJ_CACHE_PROMOTED) {
- ObjectCacheReadRadosData reply_data;
- reply_data.type = RBDSC_READ_RADOS;
- reply_data.seq = req->seq;
- req = encode_object_cache_request(&reply_data, RBDSC_READ_RADOS);
+ reply = new ObjectCacheReadRadosData(RBDSC_READ_RADOS, req->seq);
} else {
- ObjectCacheReadReplyData reply_data;
- reply_data.m_cache_path = cache_path;
- reply_data.type = RBDSC_READ_REPLY;
- reply_data.seq = req->seq;
- req = encode_object_cache_request(&reply_data, RBDSC_READ_REPLY);
+ reply = new ObjectCacheReadReplyData(RBDSC_READ_REPLY, req->seq, cache_path);
}
- m_cache_server->send(session_id, req);
+ m_cache_server->send(session_id, reply);
break;
}
default:
ObjectCacheRequest* req = decode_object_cache_request(bl_data);
process(req);
+ delete req;
read_request_header();
}
void CacheSession::send(ObjectCacheRequest* reply) {
ldout(cct, 20) << dendl;
bufferlist bl;
- bl.append(reply->get_data_buffer());
+ reply->encode();
+ bl.append(reply->get_payload_bufferlist());
boost::asio::async_write(m_dm_socket,
boost::asio::buffer(bl.c_str(), bl.length()),
namespace ceph {
namespace immutable_obj_cache {
-void ObjectCacheRegData::encode(bufferlist& bl) {
- ENCODE_START(1, 1, bl);
- ceph::encode(type, bl);
- ceph::encode(seq, bl);
- ENCODE_FINISH(bl);
+ObjectCacheRequest::ObjectCacheRequest(){}
+ObjectCacheRequest::ObjectCacheRequest(uint16_t t, uint64_t s)
+ : type(t), seq(s) {}
+ObjectCacheRequest::~ObjectCacheRequest(){}
+
+void ObjectCacheRequest::encode() {
+ ENCODE_START(1, 1, m_payload);
+ ceph::encode(type, m_payload);
+ ceph::encode(seq, m_payload);
+ if (!payload_empty()) {
+ encode_payload();
+ }
+ ENCODE_FINISH(m_payload);
}
-void ObjectCacheRegData::decode(bufferlist& bl) {
+void ObjectCacheRequest::decode(bufferlist& bl) {
auto i = bl.cbegin();
DECODE_START(1, i);
ceph::decode(type, i);
ceph::decode(seq, i);
+ if (!payload_empty()) {
+ decode_payload(i);
+ }
DECODE_FINISH(i);
}
-void ObjectCacheRegReplyData::encode(bufferlist& bl) {
- ENCODE_START(1, 1, bl);
- ceph::encode(type, bl);
- ceph::encode(seq, bl);
- ENCODE_FINISH(bl);
-}
+ObjectCacheRegData::ObjectCacheRegData() {}
+ObjectCacheRegData::ObjectCacheRegData(uint16_t t, uint64_t s)
+ : ObjectCacheRequest(t, s) {}
-void ObjectCacheRegReplyData::decode(bufferlist& bl) {
- auto i = bl.cbegin();
- DECODE_START(1, i);
- ceph::decode(type, i);
- ceph::decode(seq, i);
- DECODE_FINISH(i);
-}
+ObjectCacheRegData::~ObjectCacheRegData() {}
+
+void ObjectCacheRegData::encode_payload() {}
+
+void ObjectCacheRegData::decode_payload(bufferlist::const_iterator i) {}
+
+ObjectCacheRegReplyData::ObjectCacheRegReplyData() {}
+ObjectCacheRegReplyData::ObjectCacheRegReplyData(uint16_t t, uint64_t s)
+ : ObjectCacheRequest(t, s) {}
+
+ObjectCacheRegReplyData::~ObjectCacheRegReplyData() {}
+
+void ObjectCacheRegReplyData::encode_payload() {}
+
+void ObjectCacheRegReplyData::decode_payload(bufferlist::const_iterator bl) {}
+
+ObjectCacheReadData::ObjectCacheReadData(uint16_t t, uint64_t s,
+ uint64_t read_offset, uint64_t read_len,
+ uint64_t pool_id, uint64_t snap_id,
+ std::string oid, std::string pool_namespace)
+ : ObjectCacheRequest(t, s), m_read_offset(read_offset),
+ m_read_len(read_len), m_pool_id(pool_id), m_snap_id(snap_id),
+ m_oid(oid), m_pool_namespace(pool_namespace)
+{}
+
+ObjectCacheReadData::ObjectCacheReadData(uint16_t t, uint64_t s)
+ : ObjectCacheRequest(t, s) {}
+
+ObjectCacheReadData::~ObjectCacheReadData() {}
-void ObjectCacheReadData::encode(bufferlist& bl) {
- ENCODE_START(1, 1, bl);
- ceph::encode(type, bl);
- ceph::encode(seq, bl);
- ceph::encode(m_read_offset, bl);
- ceph::encode(m_read_len, bl);
- ceph::encode(m_pool_id, bl);
- ceph::encode(m_snap_id, bl);
- ceph::encode(m_oid, bl);
- ceph::encode(m_pool_namespace, bl);
- ENCODE_FINISH(bl);
+void ObjectCacheReadData::encode_payload() {
+ ceph::encode(m_read_offset, m_payload);
+ ceph::encode(m_read_len, m_payload);
+ ceph::encode(m_pool_id, m_payload);
+ ceph::encode(m_snap_id, m_payload);
+ ceph::encode(m_oid, m_payload);
+ ceph::encode(m_pool_namespace, m_payload);
}
-void ObjectCacheReadData::decode(bufferlist& bl) {
- auto i = bl.cbegin();
- DECODE_START(1, i);
- ceph::decode(type, i);
- ceph::decode(seq, i);
+void ObjectCacheReadData::decode_payload(bufferlist::const_iterator i) {
ceph::decode(m_read_offset, i);
ceph::decode(m_read_len, i);
ceph::decode(m_pool_id, i);
ceph::decode(m_snap_id, i);
ceph::decode(m_oid, i);
ceph::decode(m_pool_namespace, i);
- DECODE_FINISH(i);
}
-void ObjectCacheReadReplyData::encode(bufferlist& bl) {
- ENCODE_START(1, 1, bl);
- ceph::encode(type, bl);
- ceph::encode(seq, bl);
- ceph::encode(m_cache_path, bl);
- ENCODE_FINISH(bl);
+ObjectCacheReadReplyData::ObjectCacheReadReplyData(uint16_t t, uint64_t s, string cache_path)
+ : ObjectCacheRequest(t, s), m_cache_path(cache_path) {}
+ObjectCacheReadReplyData::ObjectCacheReadReplyData(uint16_t t, uint64_t s)
+ : ObjectCacheRequest(t, s) {}
+
+ObjectCacheReadReplyData::~ObjectCacheReadReplyData() {}
+
+void ObjectCacheReadReplyData::encode_payload() {
+ ceph::encode(m_cache_path, m_payload);
}
-void ObjectCacheReadReplyData::decode(bufferlist& bl) {
- auto i = bl.cbegin();
- DECODE_START(1, i);
- ceph::decode(type, i);
- ceph::decode(seq, i);
+void ObjectCacheReadReplyData::decode_payload(bufferlist::const_iterator i) {
ceph::decode(m_cache_path, i);
- DECODE_FINISH(i);
}
-void ObjectCacheReadRadosData::encode(bufferlist& bl) {
- ENCODE_START(1, 1, bl);
- ceph::encode(type, bl);
- ceph::encode(seq, bl);
- ENCODE_FINISH(bl);
-}
+ObjectCacheReadRadosData::ObjectCacheReadRadosData() {}
+ObjectCacheReadRadosData::ObjectCacheReadRadosData(uint16_t t, uint64_t s)
+ : ObjectCacheRequest(t, s) {}
-void ObjectCacheReadRadosData::decode(bufferlist& bl) {
- auto i = bl.cbegin();
- DECODE_START(1, i);
- ceph::decode(type, i);
- ceph::decode(seq, i);
- DECODE_FINISH(i);
-}
+ObjectCacheReadRadosData::~ObjectCacheReadRadosData() {}
-uint8_t get_header_size() {
- return 6; //uint8_t + uint8_t + uint32_t
-}
+void ObjectCacheReadRadosData::encode_payload() {}
-struct encode_header{
- uint8_t v;
- uint8_t c_v;
- uint32_t len;
-}__attribute__((packed));
+void ObjectCacheReadRadosData::decode_payload(bufferlist::const_iterator i) {}
-uint32_t get_data_len(char* buf) {
- encode_header* header = (encode_header*)buf;
- return header->len;
-}
+ObjectCacheRequest* decode_object_cache_request(bufferlist payload_buffer)
+{
+ ObjectCacheRequest* req = nullptr;
-uint16_t get_data_type(bufferlist buf) {
uint16_t type;
- auto i = buf.cbegin();
+ uint64_t seq;
+ auto i = payload_buffer.cbegin();
DECODE_START(1, i);
- decode(type, i);
+ ceph::decode(type, i);
+ ceph::decode(seq, i);
DECODE_FINISH(i);
- return type;
-}
-
-bufferlist ObjectCacheRequest::get_data_buffer() {
- return m_data_buffer;
-}
-
-ObjectCacheRequest* encode_object_cache_request(void* m_data, uint16_t type) {
- ObjectCacheRequest* req = new ObjectCacheRequest();
-
- switch(type) {
- case RBDSC_REGISTER: {
- ObjectCacheRegData* data = (ObjectCacheRegData*)m_data;
- data->encode(req->m_data_buffer);
- break;
- }
- case RBDSC_REGISTER_REPLY: {
- ObjectCacheRegReplyData* data = (ObjectCacheRegReplyData*)m_data;
- data->encode(req->m_data_buffer);
- break;
- }
- case RBDSC_READ: {
- ObjectCacheReadData* data = (ObjectCacheReadData*)m_data;
- data->encode(req->m_data_buffer);
- break;
- }
- case RBDSC_READ_RADOS: {
- ObjectCacheReadRadosData* data = (ObjectCacheReadRadosData*)m_data;
- data->encode(req->m_data_buffer);
- break;
- }
- case RBDSC_READ_REPLY: {
- ObjectCacheReadReplyData* data = (ObjectCacheReadReplyData*)m_data;
- data->encode(req->m_data_buffer);
- break;
- }
- default:
- ceph_assert(0);
- }
-
- req->type = type;
- return req;
-}
-
-ObjectCacheRequest* decode_object_cache_request(bufferlist data_buffer) {
- ObjectCacheRequest* req = new ObjectCacheRequest();
- uint16_t type = get_data_type(data_buffer);
- uint64_t seq;
switch(type) {
case RBDSC_REGISTER: {
- ObjectCacheRegData* data = new ObjectCacheRegData();
- data->decode(data_buffer);
- seq = data->seq;
- req->m_data = data;
+ req = new ObjectCacheRegData(type, seq);
break;
}
case RBDSC_READ: {
- ObjectCacheReadData* data = new ObjectCacheReadData();
- data->decode(data_buffer);
- seq = data->seq;
- req->m_data = data;
+ req = new ObjectCacheReadData(type, seq);
break;
}
case RBDSC_REGISTER_REPLY: {
- ObjectCacheRegReplyData* data = new ObjectCacheRegReplyData();
- data->decode(data_buffer);
- seq = data->seq;
- req->m_data = data;
+ req = new ObjectCacheRegReplyData(type, seq);
break;
}
case RBDSC_READ_REPLY: {
- ObjectCacheReadReplyData* data = new ObjectCacheReadReplyData();
- data->decode(data_buffer);
- seq = data->seq;
- req->m_data = data;
+ req = new ObjectCacheReadReplyData(type, seq);
break;
}
case RBDSC_READ_RADOS: {
- ObjectCacheReadRadosData* data = new ObjectCacheReadRadosData();
- data->decode(data_buffer);
- seq = data->seq;
- req->m_data = data;
+ req = new ObjectCacheReadRadosData(type, seq);
break;
}
default:
ceph_assert(0);
}
- req->type = type;
- req->seq = seq;
+ req->decode(payload_buffer);
+
return req;
}
#include "include/encoding.h"
#include "include/Context.h"
+#include "SocketCommon.h"
namespace ceph {
namespace immutable_obj_cache {
-class ObjectCacheRegData {
+namespace {
+struct HeaderHelper {
+ uint8_t v;
+ uint8_t c_v;
+ uint32_t len;
+}__attribute__((packed));
+
+inline uint8_t get_header_size() {
+ return sizeof(HeaderHelper);
+}
+
+inline uint32_t get_data_len(char* buf) {
+ HeaderHelper* header = (HeaderHelper*)buf;
+ return header->len;
+}
+}
+
+class ObjectCacheRequest {
public:
uint16_t type;
uint64_t seq;
- void encode(bufferlist& bl);
+ bufferlist m_payload;
+
+ GenContext<ObjectCacheRequest*>* m_process_msg;
+
+ ObjectCacheRequest();
+ ObjectCacheRequest(uint16_t type, uint64_t seq);
+ virtual ~ObjectCacheRequest();
+
+ // encode consists of two steps
+ // step 1 : directly encode common bits using encode method of base classs.
+ // step 2 : according to payload_empty, determine whether addtional bits need to
+ // be encoded which be implements by child class.
+ void encode();
void decode(bufferlist& bl);
+ bufferlist get_payload_bufferlist() { return m_payload; }
+
+ virtual void encode_payload() = 0;
+ virtual void decode_payload(bufferlist::const_iterator bl_it) = 0;
+ virtual uint16_t get_request_type() = 0;
+ virtual bool payload_empty() = 0;
};
-class ObjectCacheRegReplyData {
+class ObjectCacheRegData : public ObjectCacheRequest {
public:
- uint16_t type;
- uint64_t seq;
+ ObjectCacheRegData();
+ ObjectCacheRegData(uint16_t t, uint64_t s);
+ ~ObjectCacheRegData() override;
+ void encode_payload() override;
+ void decode_payload(bufferlist::const_iterator bl) override;
+ uint16_t get_request_type() override { return RBDSC_REGISTER; }
+ bool payload_empty() override { return true; }
+};
- void encode(bufferlist& bl);
- void decode(bufferlist& bl);
+class ObjectCacheRegReplyData : public ObjectCacheRequest {
+public:
+ ObjectCacheRegReplyData();
+ ObjectCacheRegReplyData(uint16_t t, uint64_t s);
+ ~ObjectCacheRegReplyData() override;
+ void encode_payload() override;
+ void decode_payload(bufferlist::const_iterator iter) override;
+ uint16_t get_request_type() override { return RBDSC_REGISTER_REPLY; }
+ bool payload_empty() override { return true; }
};
-class ObjectCacheReadData {
+class ObjectCacheReadData : public ObjectCacheRequest {
public:
- uint16_t type;
- uint64_t seq;
uint64_t m_read_offset;
uint64_t m_read_len;
uint64_t m_pool_id;
uint64_t m_snap_id;
std::string m_oid;
std::string m_pool_namespace;
-
- void encode(bufferlist& bl);
- void decode(bufferlist& bl);
+ ObjectCacheReadData(uint16_t t, uint64_t s, uint64_t read_offset, uint64_t read_len, uint64_t pool_id,
+ uint64_t snap_id, std::string oid, std::string pool_namespace );
+ ObjectCacheReadData(uint16_t t, uint64_t s);
+ ~ObjectCacheReadData() override;
+ void encode_payload() override;
+ void decode_payload(bufferlist::const_iterator bl) override;
+ uint16_t get_request_type() override { return RBDSC_READ; }
+ bool payload_empty() override { return false; }
};
-class ObjectCacheReadReplyData {
+class ObjectCacheReadReplyData : public ObjectCacheRequest {
public:
- uint16_t type;
- uint64_t seq;
std::string m_cache_path;
-
- void encode(bufferlist& bl);
- void decode(bufferlist& bl);
+ ObjectCacheReadReplyData(uint16_t t, uint64_t s, std::string cache_path);
+ ObjectCacheReadReplyData(uint16_t t, uint64_t s);
+ ~ObjectCacheReadReplyData() override;
+ void encode_payload() override;
+ void decode_payload(bufferlist::const_iterator bl) override;
+ uint16_t get_request_type() override { return RBDSC_READ_REPLY; }
+ bool payload_empty() override { return false; }
};
-class ObjectCacheReadRadosData {
+class ObjectCacheReadRadosData : public ObjectCacheRequest {
public:
- uint16_t type;
- uint64_t seq;
-
- void encode(bufferlist& bl);
- void decode(bufferlist& bl);
+ ObjectCacheReadRadosData();
+ ObjectCacheReadRadosData(uint16_t t, uint64_t s);
+ ~ObjectCacheReadRadosData() override;
+ void encode_payload() override;
+ void decode_payload(bufferlist::const_iterator bl) override;
+ uint16_t get_request_type() override { return RBDSC_READ_RADOS; }
+ bool payload_empty() override { return true; }
};
-class ObjectCacheRequest {
-public:
- uint64_t seq;
- uint16_t type;
- void* m_data;
- bufferlist m_data_buffer;
- GenContext<ObjectCacheRequest*>* m_process_msg;
-
- bufferlist get_data_buffer();
-};
-
-uint8_t get_header_size();
-uint32_t get_data_len(char* buf);
-
-ObjectCacheRequest* encode_object_cache_request(void* data, uint16_t type);
-ObjectCacheRequest* decode_object_cache_request(bufferlist data_buffer);
+ObjectCacheRequest* decode_object_cache_request(bufferlist payload_buffer);
} // namespace immutable_obj_cache
} // namespace ceph