void handle_request(uint64_t session_id, ObjectCacheRequest* req) {
- switch (req->m_head.type) {
+ switch (req->m_data.type) {
case RBDSC_REGISTER: {
- req->m_head.type = RBDSC_REGISTER_REPLY;
+ req->m_data.type = RBDSC_REGISTER_REPLY;
m_cache_server->send(session_id, req);
break;
}
case RBDSC_READ: {
if (m_hit_entry_set.find(req->m_data.m_oid) == m_hit_entry_set.end()) {
- req->m_head.type = RBDSC_READ_RADOS;
+ req->m_data.type = RBDSC_READ_RADOS;
} else {
- req->m_head.type = RBDSC_READ_REPLY;
+ req->m_data.type = RBDSC_READ_REPLY;
}
m_cache_server->send(session_id, req);
break;
bool hit;
auto ctx = new LambdaGenContext<std::function<void(ObjectCacheRequest*)>,
ObjectCacheRequest*>([this, &hit](ObjectCacheRequest* ack){
- hit = ack->m_head.type == RBDSC_READ_REPLY;
+ hit = ack->m_data.type == RBDSC_READ_REPLY;
m_wait_event.signal();
});
m_cache_client->lookup_object(pool_nspace, 1, 2, object_id, ctx);
std::string oid_name("this is a oid name");
std::string cache_file_path("/temp/ceph_immutable_object_cache");
- ObjectCacheMsgHeader header;
- header.seq = 1;
- header.type = 2;
- header.version =3;
- header.data_len = 0;
- header.reserved = 5;
-
ObjectCacheRequest req;
- ASSERT_EQ(req.m_head_buffer.length(), 0);
- ASSERT_EQ(req.m_data_buffer.length(), 0);
-
- req.m_head = header;
-
+ req.m_data.seq = 1;
+ req.m_data.type = 2;
req.m_data.m_read_offset = 222222;
req.m_data.m_read_len = 333333;
req.m_data.m_pool_id = 444444;
// ObjectRequest --> bufferlist
req.encode();
- ASSERT_EQ(req.m_head_buffer.length(), sizeof(req.m_head));
-
- ObjectCacheRequest* req_decode;
-
- auto head_bl = req.get_head_buffer();
- auto data_bl = req.get_data_buffer();
// bufferlist --> ObjectCacheRequest
- req_decode = decode_object_cache_request(head_bl, data_bl);
+ auto data_bl = req.get_data_buffer();
+ uint32_t data_len = get_data_len(data_bl.c_str());
+ ASSERT_EQ(data_bl.length(), data_len + get_header_size());
- ASSERT_EQ(req_decode->m_head.seq, header.seq);
- ASSERT_EQ(req_decode->m_head.seq, 1);
- ASSERT_EQ(req_decode->m_head.type, header.type);
- ASSERT_EQ(req_decode->m_head.type, 2);
- ASSERT_EQ(req_decode->m_head.version, header.version);
- ASSERT_EQ(req_decode->m_head.version, 3);
- ASSERT_EQ(req_decode->m_head.data_len, req.m_data_buffer.length());
- ASSERT_EQ(req_decode->m_head.data_len, data_bl.length());
- ASSERT_EQ(req_decode->m_head.reserved, header.reserved);
- ASSERT_EQ(req_decode->m_head.reserved, 5);
+ ObjectCacheRequest* req_decode = decode_object_cache_request(data_bl);
- ASSERT_EQ(req_decode->m_data.m_read_offset, req.m_data.m_read_offset);
+ ASSERT_EQ(req_decode->m_data.seq, 1);
ASSERT_EQ(req_decode->m_data.m_read_offset, 222222);
- ASSERT_EQ(req_decode->m_data.m_read_len, req.m_data.m_read_len);
ASSERT_EQ(req_decode->m_data.m_read_len, 333333);
- ASSERT_EQ(req_decode->m_data.m_pool_namespace, req.m_data.m_pool_namespace);
ASSERT_EQ(req_decode->m_data.m_pool_namespace, pool_nspace);
- ASSERT_EQ(req_decode->m_data.m_cache_path, req.m_data.m_cache_path);
ASSERT_EQ(req_decode->m_data.m_cache_path, cache_file_path);
- ASSERT_EQ(req_decode->m_data.m_oid, req.m_data.m_oid);
ASSERT_EQ(req_decode->m_data.m_oid, oid_name);
delete req_decode;
void server_handle_request(uint64_t session_id, ObjectCacheRequest* req) {
- switch (req->m_head.type) {
+ switch (req->m_data.type) {
case RBDSC_REGISTER: {
- req->m_head.type = RBDSC_REGISTER_REPLY;
+ req->m_data.type = RBDSC_REGISTER_REPLY;
m_cache_server->send(session_id, req);
break;
}
case RBDSC_READ: {
- req->m_head.type = RBDSC_READ_REPLY;
+ req->m_data.type = RBDSC_READ_REPLY;
m_cache_server->send(session_id, req);
break;
}
m_dm_socket(m_io_service), m_ep(stream_protocol::endpoint(file)),
m_io_thread(nullptr), m_session_work(false), m_writing(false),
m_reading(false), m_sequence_id(0),
- m_lock("ceph::cache::cacheclient::m_lock"),
- m_header_buffer(new char[sizeof(ObjectCacheMsgHeader)])
+ m_lock("ceph::cache::cacheclient::m_lock")
{
// TODO : configure it.
m_use_dedicated_worker = true;
m_worker_threads.push_back(thd);
}
}
+ m_bp_header = buffer::create(get_header_size());
+
}
CacheClient::~CacheClient() {
stop();
- delete m_header_buffer;
}
void CacheClient::run(){
std::string oid, GenContext<ObjectCacheRequest*>* on_finish) {
ObjectCacheRequest* req = new ObjectCacheRequest();
- req->m_head.version = 0;
- req->m_head.reserved = 0;
- req->m_head.type = RBDSC_READ;
- req->m_head.padding = 0;
- req->m_head.seq = ++m_sequence_id;
+ req->m_data.type = RBDSC_READ;
+ req->m_data.seq = ++m_sequence_id;
req->m_data.m_pool_id = pool_id;
req->m_data.m_snap_id = snap_id;
req->m_process_msg = on_finish;
req->encode();
- ceph_assert(req->get_head_buffer().length() == sizeof(ObjectCacheMsgHeader));
- ceph_assert(req->get_data_buffer().length() == req->m_head.data_len);
-
- {
+ {
Mutex::Locker locker(m_lock);
- m_outcoming_bl.append(req->get_head_buffer());
m_outcoming_bl.append(req->get_data_buffer());
- ceph_assert(m_seq_to_req.find(req->m_head.seq) == m_seq_to_req.end());
- m_seq_to_req[req->m_head.seq] = req;
+ ceph_assert(m_seq_to_req.find(req->m_data.seq) == m_seq_to_req.end());
+ m_seq_to_req[req->m_data.seq] = req;
}
// try to send message to server.
void CacheClient::read_reply_header() {
/* create new head buffer for every reply */
- bufferptr bp_head(buffer::create(sizeof(ObjectCacheMsgHeader)));
+ bufferptr bp_head(buffer::create(get_header_size()));
auto raw_ptr = bp_head.c_str();
boost::asio::async_read(m_dm_socket,
- boost::asio::buffer(raw_ptr, sizeof(ObjectCacheMsgHeader)),
- boost::asio::transfer_exactly(sizeof(ObjectCacheMsgHeader)),
+ boost::asio::buffer(raw_ptr, get_header_size()),
+ boost::asio::transfer_exactly(get_header_size()),
boost::bind(&CacheClient::handle_reply_header,
this, bp_head,
boost::asio::placeholders::error,
void CacheClient::handle_reply_header(bufferptr bp_head,
const boost::system::error_code& ec,
size_t bytes_transferred) {
- if (ec || bytes_transferred != sizeof(ObjectCacheMsgHeader)) {
+ if (ec || bytes_transferred != get_header_size()) {
fault(ASIO_ERROR_READ, ec);
return;
}
ceph_assert(bytes_transferred == bp_head.length());
- ObjectCacheMsgHeader* head = (ObjectCacheMsgHeader*)bp_head.c_str();
- uint64_t data_len = head->data_len;
- uint64_t seq_id = head->seq;
- ceph_assert(m_seq_to_req.find(seq_id) != m_seq_to_req.end());
+ uint32_t data_len = get_data_len(bp_head.c_str());
bufferptr bp_data(buffer::create(data_len));
- read_reply_data(std::move(bp_head), std::move(bp_data), data_len, seq_id);
+ read_reply_data(std::move(bp_head), std::move(bp_data), data_len);
}
void CacheClient::read_reply_data(bufferptr&& bp_head, bufferptr&& bp_data,
- const uint64_t data_len, const uint64_t seq_id) {
+ const uint64_t data_len) {
auto raw_ptr = bp_data.c_str();
boost::asio::async_read(m_dm_socket, boost::asio::buffer(raw_ptr, data_len),
boost::asio::transfer_exactly(data_len),
boost::bind(&CacheClient::handle_reply_data,
- this, std::move(bp_head), std::move(bp_data), data_len, seq_id,
+ this, std::move(bp_head), std::move(bp_data), data_len,
boost::asio::placeholders::error,
boost::asio::placeholders::bytes_transferred));
}
void CacheClient::handle_reply_data(bufferptr bp_head, bufferptr bp_data,
- const uint64_t data_len, const uint64_t seq_id,
+ const uint64_t data_len,
const boost::system::error_code& ec,
size_t bytes_transferred) {
if (ec || bytes_transferred != data_len) {
fault(ASIO_ERROR_WRITE, ec);
return;
}
+ ceph_assert(bp_data.length() == data_len);
- bufferlist head_buffer;
bufferlist data_buffer;
- head_buffer.append(std::move(bp_head));
+ data_buffer.append(std::move(bp_head));
data_buffer.append(std::move(bp_data));
- ceph_assert(head_buffer.length() == sizeof(ObjectCacheMsgHeader));
- ceph_assert(data_buffer.length() == data_len);
- ObjectCacheRequest* reply = decode_object_cache_request(head_buffer, data_buffer);
+ ObjectCacheRequest* reply = decode_object_cache_request(data_buffer);
data_buffer.clear();
ceph_assert(data_buffer.length() == 0);
- process(reply, seq_id);
+ process(reply, reply->m_data.seq);
{
Mutex::Locker locker(m_lock);
{
Mutex::Locker locker(m_lock);
for(auto it : m_seq_to_req) {
- it.second->m_head.type = RBDSC_READ_RADOS;
+ it.second->m_data.type = RBDSC_READ_RADOS;
it.second->m_process_msg->complete(it.second);
}
m_seq_to_req.clear();
int CacheClient::register_client(Context* on_finish) {
ObjectCacheRequest* message = new ObjectCacheRequest();
- message->m_head.version = 0;
- message->m_head.seq = m_sequence_id++;
- message->m_head.type = RBDSC_REGISTER;
- message->m_head.reserved = 0;
+ message->m_data.seq = m_sequence_id++;
+ message->m_data.type = RBDSC_REGISTER;
message->encode();
bufferlist bl;
- bl.append(message->get_head_buffer());
bl.append(message->get_data_buffer());
uint64_t ret;
}
ret = boost::asio::read(m_dm_socket,
- boost::asio::buffer(m_header_buffer, sizeof(ObjectCacheMsgHeader)), ec);
- if (ec || ret != sizeof(ObjectCacheMsgHeader)) {
+ boost::asio::buffer(m_bp_header.c_str(), get_header_size()), ec);
+ if (ec || ret != get_header_size()) {
fault(ASIO_ERROR_READ, ec);
return -1;
}
- ObjectCacheMsgHeader* head = (ObjectCacheMsgHeader*)m_header_buffer;
- uint64_t data_len = head->data_len;
+ uint64_t data_len = get_data_len(m_bp_header.c_str());
bufferptr bp_data(buffer::create(data_len));
ret = boost::asio::read(m_dm_socket, boost::asio::buffer(bp_data.c_str(), data_len), ec);
}
bufferlist data_buffer;
+ data_buffer.append(m_bp_header);
data_buffer.append(std::move(bp_data));
- ObjectCacheRequest* req = decode_object_cache_request(head, data_buffer);
- if (req->m_head.type == RBDSC_REGISTER_REPLY) {
+ ObjectCacheRequest* req = decode_object_cache_request(data_buffer);
+ if (req->m_data.type == RBDSC_REGISTER_REPLY) {
on_finish->complete(true);
} else {
on_finish->complete(false);
void handle_reply_header(bufferptr bp_head,
const boost::system::error_code& ec, size_t bytes_transferred);
void read_reply_data(bufferptr&& bp_head, bufferptr&& bp_data,
- const uint64_t data_len, const uint64_t seq_id);
+ const uint64_t data_len);
void handle_reply_data(bufferptr bp_head, bufferptr bp_data,
- const uint64_t data_len, const uint64_t seq_id,
+ const uint64_t data_len,
const boost::system::error_code& ec, size_t bytes_transferred);
private:
Mutex m_lock;
std::map<uint64_t, ObjectCacheRequest*> m_seq_to_req;
bufferlist m_outcoming_bl;
- char* m_header_buffer;
+ bufferptr m_bp_header;
};
} // namespace immutable_obj_cache
void CacheController::handle_request(uint64_t session_id, ObjectCacheRequest* req){
ldout(m_cct, 20) << dendl;
- switch (req->m_head.type) {
+ switch (req->m_data.type) {
case RBDSC_REGISTER: {
// TODO(): skip register and allow clients to lookup directly
- req->m_head.type = RBDSC_REGISTER_REPLY;
+ req->m_data.type = RBDSC_REGISTER_REPLY;
m_cache_server->send(session_id, req);
break;
req->m_data.m_oid,
req->m_data.m_cache_path);
if (ret < 0) {
- req->m_head.type = RBDSC_READ_RADOS;
+ req->m_data.type = RBDSC_READ_RADOS;
} else {
- req->m_head.type = RBDSC_READ_REPLY;
+ req->m_data.type = RBDSC_READ_REPLY;
}
m_cache_server->send(session_id, req);
CacheSession::CacheSession(uint64_t session_id, io_service& io_service,
ProcessMsg processmsg, CephContext* cct)
: m_session_id(session_id), m_dm_socket(io_service),
- m_head_buffer(new char[sizeof(ObjectCacheMsgHeader)]),
- m_server_process_msg(processmsg), cct(cct)
- {}
+ m_server_process_msg(processmsg), cct(cct) {
+ m_bp_header = buffer::create(get_header_size());
+}
CacheSession::~CacheSession() {
close();
- delete[] m_head_buffer;
}
stream_protocol::socket& CacheSession::socket() {
void CacheSession::read_request_header() {
ldout(cct, 20) << dendl;
boost::asio::async_read(m_dm_socket,
- boost::asio::buffer(m_head_buffer, sizeof(ObjectCacheMsgHeader)),
- boost::asio::transfer_exactly(sizeof(ObjectCacheMsgHeader)),
+ boost::asio::buffer(m_bp_header.c_str(), get_header_size()),
+ boost::asio::transfer_exactly(get_header_size()),
boost::bind(&CacheSession::handle_request_header,
shared_from_this(),
boost::asio::placeholders::error,
void CacheSession::handle_request_header(const boost::system::error_code& err,
size_t bytes_transferred) {
ldout(cct, 20) << dendl;
- if (err || bytes_transferred != sizeof(ObjectCacheMsgHeader)) {
+ if (err || bytes_transferred != get_header_size()) {
fault();
return;
}
- ObjectCacheMsgHeader* head = (ObjectCacheMsgHeader*)(m_head_buffer);
- ceph_assert(head->version == 0);
- ceph_assert(head->reserved == 0);
- ceph_assert(head->type == RBDSC_REGISTER || head->type == RBDSC_READ);
-
- read_request_data(head->data_len);
+ read_request_data(get_data_len(m_bp_header.c_str()));
}
void CacheSession::read_request_data(uint64_t data_len) {
}
bufferlist bl_data;
+
+ bl_data.append(m_bp_header);
bl_data.append(std::move(bp));
- ObjectCacheRequest* req = decode_object_cache_request(
- (ObjectCacheMsgHeader*)m_head_buffer, bl_data);
+ ObjectCacheRequest* req = decode_object_cache_request(bl_data);
process(req);
read_request_header();
}
void CacheSession::process(ObjectCacheRequest* req) {
ldout(cct, 20) << dendl;
- m_server_process_msg(m_session_id, req);
+ m_server_process_msg(m_session_id, req);
}
void CacheSession::send(ObjectCacheRequest* reply) {
ldout(cct, 20) << dendl;
- reply->m_head_buffer.clear();
reply->m_data_buffer.clear();
reply->encode();
bufferlist bl;
- bl.append(reply->get_head_buffer());
bl.append(reply->get_data_buffer());
boost::asio::async_write(m_dm_socket,
private:
uint64_t m_session_id;
stream_protocol::socket m_dm_socket;
- char* m_head_buffer;
ProcessMsg m_server_process_msg;
CephContext* cct;
+
+ bufferptr m_bp_header;
};
typedef std::shared_ptr<CacheSession> CacheSessionPtr;
namespace ceph {
namespace immutable_obj_cache {
-void ObjectCacheMsgHeader::encode(bufferlist& bl) const {
- ceph::encode(seq, bl);
- ceph::encode(type, bl);
- ceph::encode(version, bl);
- ceph::encode(padding, bl);
- ceph::encode(data_len, bl);
- ceph::encode(reserved, bl);
-}
-
-void ObjectCacheMsgHeader::decode(bufferlist::const_iterator& it) {
- ceph::decode(seq, it);
- ceph::decode(type, it);
- ceph::decode(version, it);
- ceph::decode(padding, it);
- ceph::decode(data_len, it);
- ceph::decode(reserved, it);
-}
-
void ObjectCacheMsgData::encode(bufferlist& bl) {
ENCODE_START(1, 1, bl);
+ ceph::encode(seq, bl);
+ ceph::encode(type, bl);
ceph::encode(m_read_offset, bl);
ceph::encode(m_read_len, bl);
ceph::encode(m_pool_id, bl);
void ObjectCacheMsgData::decode(bufferlist& bl) {
auto i = bl.cbegin();
DECODE_START(1, i);
+ ceph::decode(seq, i);
+ ceph::decode(type, i);
ceph::decode(m_read_offset, i);
ceph::decode(m_read_len, i);
ceph::decode(m_pool_id, i);
void ObjectCacheRequest::encode() {
m_data.encode(m_data_buffer);
- m_head.data_len = m_data_buffer.length();
- ceph_assert(m_head_buffer.length() == 0);
- m_head.encode(m_head_buffer);
- ceph_assert(sizeof(ObjectCacheMsgHeader) == m_head_buffer.length());
}
-bufferlist ObjectCacheRequest::get_head_buffer() {
- return m_head_buffer;
+uint8_t get_header_size() {
+ //return sizeof(ObjectCacheMsgHeader);
+ return 6;
+}
+
+struct encode_header{
+ uint8_t v;
+ uint8_t c_v;
+ uint32_t len;
+}__attribute__((packed));
+
+uint32_t get_data_len(char* buf) {
+ encode_header* header = (encode_header*)buf;
+ return header->len;
}
+
bufferlist ObjectCacheRequest::get_data_buffer() {
return m_data_buffer;
}
-ObjectCacheRequest* decode_object_cache_request(
- ObjectCacheMsgHeader* head, bufferlist data_buffer) {
+ObjectCacheRequest* decode_object_cache_request(bufferlist data_buffer) {
ObjectCacheRequest* req = new ObjectCacheRequest();
- req->m_head = *head;
- ceph_assert(req->m_head.data_len == data_buffer.length());
req->m_data.decode(data_buffer);
return req;
}
-ObjectCacheRequest* decode_object_cache_request(
- bufferlist head_buffer, bufferlist data_buffer) {
- return decode_object_cache_request((ObjectCacheMsgHeader*)(head_buffer.c_str()), data_buffer);
-}
-
} // namespace immutable_obj_cache
} // namespace ceph
namespace ceph {
namespace immutable_obj_cache {
-struct ObjectCacheMsgHeader {
- uint64_t seq; /* sequence id */
- uint16_t type; /* msg type */
- uint16_t version; /* object cache version */
- uint32_t padding;
- uint32_t data_len;
- uint32_t reserved;
-
- void encode(bufferlist& bl) const;
- void decode(bufferlist::const_iterator& it);
-};
-
class ObjectCacheMsgData {
public:
+ uint64_t seq; /* sequence id */
+ uint16_t type; /* msg type */
uint64_t m_read_offset;
uint64_t m_read_len;
uint64_t m_pool_id;
class ObjectCacheRequest {
public:
- ObjectCacheMsgHeader m_head;
ObjectCacheMsgData m_data;
- bufferlist m_head_buffer;
bufferlist m_data_buffer;
GenContext<ObjectCacheRequest*>* m_process_msg;
void encode();
- bufferlist get_head_buffer();
bufferlist get_data_buffer();
};
-ObjectCacheRequest* decode_object_cache_request(
- ObjectCacheMsgHeader* head, bufferlist data_buffer);
+uint8_t get_header_size();
+uint32_t get_data_len(char* buf);
-ObjectCacheRequest* decode_object_cache_request(
- bufferlist head_buffer, bufferlist data_buffer);
+ObjectCacheRequest* decode_object_cache_request(bufferlist data_buffer);
} // namespace immutable_obj_cache
} // namespace ceph