test_main.cc
test_SimplePolicy.cc
test_sync_file.cc
- test_DomainSocket.cc
- test_multi_session.cc
+ #test_DomainSocket.cc # TODO
+ #test_multi_session.cc # TODO
test_object_store.cc
test_message.cc
)
header.seq = 1;
header.type = 2;
header.version = 3;
- header.mid_len =4;
header.data_len = 5;
header.reserved = 6;
ObjectCacheRequest req;
ASSERT_EQ(req.m_head_buffer.length(), 0);
- ASSERT_EQ(req.m_mid_buffer.length(), 0);
ASSERT_EQ(req.m_data_buffer.length(), 0);
req.m_head = header;
- req.m_mid.m_image_size = 111111;
- req.m_mid.m_read_offset = 222222;
- req.m_mid.m_read_len = 333333;
- req.m_mid.m_pool_name = pool_name;
- req.m_mid.m_image_name = image_name;
- req.m_mid.m_oid = oid_name;
+ req.m_data.m_image_size = 111111;
+ req.m_data.m_read_offset = 222222;
+ req.m_data.m_read_len = 333333;
+ req.m_data.m_pool_name = pool_name;
+ req.m_data.m_image_name = image_name;
+ req.m_data.m_oid = oid_name;
req.encode();
ASSERT_EQ(req.m_head_buffer.length(), sizeof(req.m_head));
- ASSERT_EQ(req.m_data_buffer.length(), 0);
ObjectCacheRequest* req_decode;
auto x = req.get_head_buffer();
- auto y = req.get_mid_buffer();
auto z = req.get_data_buffer();
- req_decode = decode_object_cache_request(x, y, z);
+ req_decode = decode_object_cache_request(x, z);
ASSERT_EQ(req_decode->m_head.seq, header.seq);
ASSERT_EQ(req_decode->m_head.type, header.type);
ASSERT_EQ(req_decode->m_head.version, header.version);
- ASSERT_EQ(req_decode->m_head.mid_len, req.m_mid_buffer.length());
ASSERT_EQ(req_decode->m_head.data_len, req.m_data_buffer.length());
ASSERT_EQ(req_decode->m_head.reserved, header.reserved);
- ASSERT_EQ(req_decode->m_mid.m_image_size, 111111);
- ASSERT_EQ(req_decode->m_mid.m_read_offset, 222222);
- ASSERT_EQ(req_decode->m_mid.m_read_len, 333333);
- ASSERT_EQ(req_decode->m_mid.m_pool_name, pool_name);
- ASSERT_EQ(req_decode->m_mid.m_image_name, image_name);
- ASSERT_EQ(req_decode->m_mid.m_oid, oid_name);
+ ASSERT_EQ(req_decode->m_data.m_image_size, 111111);
+ ASSERT_EQ(req_decode->m_data.m_read_offset, 222222);
+ ASSERT_EQ(req_decode->m_data.m_read_len, 333333);
+ ASSERT_EQ(req_decode->m_data.m_pool_name, pool_name);
+ ASSERT_EQ(req_decode->m_data.m_image_name, image_name);
+ ASSERT_EQ(req_decode->m_data.m_oid, oid_name);
}
m_ep(stream_protocol::endpoint(file)),
m_io_thread(nullptr),
m_session_work(false),
+ m_writing(false),
+ m_lock("ceph::cache::cacheclient::m_lock"),
+ m_map_lock("ceph::cache::cacheclient::m_map_lock"),
+ m_sequence_id(0),
+ m_header_buffer(new char[sizeof(ObjectCacheMsgHeader)]),
cct(ceph_ctx)
- {}
+ {
+ // TODO : release the worker resources below on shutdown.
+ m_use_dedicated_worker = true;
+ // TODO : make the worker thread count configurable.
+ m_worker_thread_num = 2;
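+ // the dedicated workers run user completion contexts so that a slow
+ // callback cannot stall the socket I/O thread (see receive_message()).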
+ if(m_use_dedicated_worker) {
+ m_worker = new boost::asio::io_service();
+ m_worker_io_service_work = new boost::asio::io_service::work(*m_worker);
+ for(int i = 0; i < m_worker_thread_num; i++) {
+ std::thread* thd = new std::thread([this](){m_worker->run();});
+ m_worker_threads.push_back(thd);
+ }
+ }
+ }
CacheClient::~CacheClient() {
stop();
+ delete [] m_header_buffer;
}
void CacheClient::run(){
}
+ void CacheClient::lookup_object(ObjectCacheRequest* req) {
+
+ req->m_head.seq = m_sequence_id++;
+ req->encode();
+
+ ceph_assert(req->get_head_buffer().length() == sizeof(ObjectCacheMsgHeader));
+ ceph_assert(req->get_data_buffer().length() == req->m_head.data_len);
+
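+ // wire format per request: the fixed-size ObjectCacheMsgHeader followed
+ // by data_len bytes of encoded ObjectCacheMsgData.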
+ {
+ Mutex::Locker locker(m_lock);
+ m_outcoming_bl.append(req->get_head_buffer());
+ m_outcoming_bl.append(req->get_data_buffer());
+ }
+
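+ // remember the request by sequence id so the reply handler can find its
+ // completion context when the ack arrives.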
+ {
+ Mutex::Locker locker(m_map_lock);
+ ceph_assert(m_seq_to_req.find(req->m_head.seq) == m_seq_to_req.end());
+ m_seq_to_req[req->m_head.seq] = req;
+ }
+
+ // try to send message to server.
+ try_send();
+
+ // try to receive ack from server.
+ try_receive();
+ }
+
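+ // m_writing guards the write loop: only the thread that flips it from
+ // false to true may start send_message(), and it stays true until
+ // m_outcoming_bl drains.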
+ void CacheClient::try_send() {
+ bool expected = false;
+ // atomically claim the writer role; a plain load-then-store would let
+ // two threads start the write loop at the same time.
+ if(m_writing.compare_exchange_strong(expected, true)) {
+ send_message();
+ }
+ }
+
+ void CacheClient::send_message() {
+ bufferlist bl;
+ {
+ Mutex::Locker locker(m_lock);
+ bl.swap(m_outcoming_bl);
+ ceph_assert(m_outcoming_bl.length() == 0);
+ }
+
+ // send as many bytes as possible.
+ boost::asio::async_write(m_dm_socket,
+ boost::asio::buffer(bl.c_str(), bl.length()),
+ boost::asio::transfer_exactly(bl.length()),
+ [this, bl](const boost::system::error_code& err, size_t cb) {
+ if(err || cb != bl.length()) {
+ fault(ASIO_ERROR_WRITE, err);
+ return;
+ }
+ ceph_assert(cb == bl.length());
+
+ {
+ Mutex::Locker locker(m_lock);
+ if(m_outcoming_bl.length() == 0) {
+ m_writing.store(false);
+ return;
+ }
+ }
+
+ // bytes are still pending; continue sending.
+ send_message();
+ });
+ try_receive();
+ }
+
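+ // m_reading mirrors m_writing on the receive side: only one chain of
+ // async reads runs at a time.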
+ void CacheClient::try_receive() {
+ bool expected = false;
+ if(m_reading.compare_exchange_strong(expected, true)) {
+ receive_message();
+ }
+ }
+
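+ // replies are framed like requests: a fixed-size header, then data_len
+ // bytes of payload. receive_message() chains async reads until no
+ // request remains outstanding in m_seq_to_req.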
+ void CacheClient::receive_message() {
+ ceph_assert(m_reading.load());
+
+ /* option: reuse one static head buffer for all arriving replies. */
+ // bufferptr bp_head(buffer::create_static(sizeof(ObjectCacheMsgHeader), m_header_buffer));
+
+ /* currently: allocate a fresh head buffer for every reply. */
+ bufferptr bp_head(buffer::create(sizeof(ObjectCacheMsgHeader)));
+
+ boost::asio::async_read(m_dm_socket,
+ boost::asio::buffer(bp_head.c_str(), sizeof(ObjectCacheMsgHeader)),
+ boost::asio::transfer_exactly(sizeof(ObjectCacheMsgHeader)),
+ [this, bp_head](const boost::system::error_code& err, size_t cb) {
+ if(err || cb != sizeof(ObjectCacheMsgHeader)) {
+ fault(ASIO_ERROR_READ, err);
+ return;
+ }
+
+ ObjectCacheMsgHeader* head = (ObjectCacheMsgHeader*)bp_head.c_str();
+ uint64_t data_len = head->data_len;
+ uint64_t seq_id = head->seq;
+ bufferptr bp_data(buffer::create(data_len));
+
+ boost::asio::async_read(m_dm_socket,
+ boost::asio::buffer(bp_data.c_str(), data_len),
+ boost::asio::transfer_exactly(data_len),
+ [this, bp_data, bp_head, data_len, seq_id](const boost::system::error_code& err, size_t cb) {
+ if(err || cb != data_len) {
+ fault(ASIO_ERROR_READ, err);
+ return;
+ }
+
+ bufferlist head_buffer;
+ head_buffer.append(std::move(bp_head));
+ bufferlist data_buffer;
+ data_buffer.append(std::move(bp_data));
+
+ ceph_assert(head_buffer.length() == sizeof(ObjectCacheMsgHeader));
+ ceph_assert(data_buffer.length() == data_len);
+
+ // decode the reply message from the received header and data buffers.
+ ObjectCacheRequest* ack = decode_object_cache_request(head_buffer, data_buffer);
+
+ data_buffer.clear();
+ ceph_assert(data_buffer.length() == 0);
+
+ // m_seq_to_req is shared with lookup_object(), so read it under m_map_lock.
+ GenContext<ObjectCacheRequest*>* process_current_reply;
+ {
+ Mutex::Locker locker(m_map_lock);
+ ceph_assert(m_seq_to_req.find(seq_id) != m_seq_to_req.end());
+ process_current_reply = m_seq_to_req[seq_id]->m_process_msg;
+ }
+
+ // on a cache hit, this context reads the object file from the local cache.
+ auto user_ctx = new FunctionContext([this, ack, process_current_reply]
+ (bool dedicated) {
+ if(dedicated) {
+ // the current thread belongs to the dedicated worker pool.
+ }
+
+ process_current_reply->complete(ack);
+ delete ack;
+ });
+
+ // user_ctx reads a file and runs the user callback, so it can occupy the
+ // current thread for a long time and delay socket reads/writes. Enable
+ // m_use_dedicated_worker to run this context on a dedicated worker thread.
+ if(m_use_dedicated_worker) {
+ m_worker->post([user_ctx](){
+ user_ctx->complete(true);
+ });
+ } else {
+ // otherwise run the context on the socket read/write thread.
+ user_ctx->complete(false);
+ }
+
+ {
+ Mutex::Locker locker(m_map_lock);
+ ceph_assert(m_seq_to_req.find(seq_id) != m_seq_to_req.end());
+ m_seq_to_req.erase(seq_id);
+ if(m_seq_to_req.size() == 0) {
+ m_reading.store(false);
+ return;
+ }
+ }
+
+ receive_message();
+ });
+ });
+ }
+
+
+ void CacheClient::fault(const int err_type, const boost::system::error_code& ec) {
+ ldout(cct, 20) << "fault." << ec.message() << dendl;
+ // if one request fails, just call its callback, then close this socket.
+ if(!m_session_work.load()) {
+ return;
+ }
+
+ // when current session don't work, ASIO will don't receive any new request from hook.
+ // On the other hand, for pending request of ASIO, cancle these request, then call their callback.
+ // there request which are cancled by fault, will be re-dispatched to RADOS layer.
+ //
+ // make sure just have one thread to modify execute below code.
+ m_session_work.store(false);
+
+ if(err_type == ASIO_ERROR_MSG_INCOMPLETE) {
+ ldout(cct, 20) << "ASIO In-complete message." << ec.message() << dendl;
+ ceph_assert(0);
+ }
+
+ if(err_type == ASIO_ERROR_READ) {
+ ldout(cct, 20) << "ASIO async read fails : " << ec.message() << dendl;
+ }
+
+ if(err_type == ASIO_ERROR_WRITE) {
+ ldout(cct, 20) << "ASIO asyn write fails : " << ec.message() << dendl;
+ // should not occur this error.
+ ceph_assert(0);
+ }
+
+ if(err_type == ASIO_ERROR_CONNECT) {
+ ldout(cct, 20) << "ASIO async connect fails : " << ec.message() << dendl;
+ }
+
+ // TODO : currently, for any ASIO error, just shut down RO.
+ // TODO : re-write close / shutdown.
+ //close();
+
+ // all pending requests that have entered ASIO are re-dispatched to RADOS.
+ {
+ Mutex::Locker locker(m_map_lock);
+ for(auto& it : m_seq_to_req) {
+ it.second->m_head.type = RBDSC_READ_RADOS;
+ it.second->m_process_msg->complete(it.second);
+ }
+ m_seq_to_req.clear();
+ }
+
+ //m_outcoming_bl.clear();
+ }
+
+
+ // TODO : use async + wait_event
+ // TODO : accept one parameter : ObjectCacheRequest
int CacheClient::register_client(Context* on_finish) {
- // cache controller will init layout
- rbdsc_req_type_t *message = new rbdsc_req_type_t();
- message->type = RBDSC_REGISTER;
+ ObjectCacheRequest* message = new ObjectCacheRequest();
+ message->m_head.version = 0;
+ message->m_head.seq = m_sequence_id++;
+ message->m_head.type = RBDSC_REGISTER;
+ message->m_head.reserved = 0;
+ message->encode();
+
+ bufferlist bl;
+ bl.append(message->get_head_buffer());
+ bl.append(message->get_data_buffer());
+ // bl now holds refs to the encoded buffers, so the request can be freed.
+ delete message;
uint64_t ret;
boost::system::error_code ec;
ret = boost::asio::write(m_dm_socket,
- boost::asio::buffer((char*)message, message->size()), ec);
+ boost::asio::buffer(bl.c_str(), bl.length()), ec);
- if(ec) {
- ldout(cct, 20) << "write fails : " << ec.message() << dendl;
+ if(ec || ret != bl.length()) {
+ fault(ASIO_ERROR_WRITE, ec);
return -1;
}
- if(ret != message->size()) {
- ldout(cct, 20) << "write fails : ret != send_bytes " << dendl;
- return -1;
- }
-
- // hard code TODO
ret = boost::asio::read(m_dm_socket,
- boost::asio::buffer(m_recv_buffer, RBDSC_MSG_LEN), ec);
-
- if(ec == boost::asio::error::eof) {
- ldout(cct, 20) << "recv eof" << dendl;
+ boost::asio::buffer(m_header_buffer, sizeof(ObjectCacheMsgHeader)), ec);
+ if(ec || ret != sizeof(ObjectCacheMsgHeader)) {
+ fault(ASIO_ERROR_READ, ec);
return -1;
}
- if(ec) {
- ldout(cct, 20) << "write fails : " << ec.message() << dendl;
- return -1;
- }
+ ObjectCacheMsgHeader* head = (ObjectCacheMsgHeader*)m_header_buffer;
+ uint64_t data_len = head->data_len;
+ bufferptr bp_data(buffer::create(data_len));
- if(ret != RBDSC_MSG_LEN) {
- ldout(cct, 20) << "write fails : ret != receive bytes " << dendl;
+ ret = boost::asio::read(m_dm_socket, boost::asio::buffer(bp_data.c_str(), data_len), ec);
+ if(ec || ret != data_len) {
+ fault(ASIO_ERROR_READ, ec);
return -1;
}
- std::string reply_msg(m_recv_buffer, ret);
- rbdsc_req_type_t *io_ctx = (rbdsc_req_type_t*)(reply_msg.c_str());
-
- if (io_ctx->type == RBDSC_REGISTER_REPLY) {
+ bufferlist data_buffer;
+ data_buffer.append(std::move(bp_data));
+ ObjectCacheRequest* req = decode_object_cache_request(head, data_buffer);
+ if (req->m_head.type == RBDSC_REGISTER_REPLY) {
on_finish->complete(true);
} else {
on_finish->complete(false);
}
- delete message;
-
- ldout(cct, 20) << "register volume success" << dendl;
-
- // TODO
+ delete req;
m_session_work.store(true);
return 0;
}
- // if occur any error, we just return false. Then read from rados.
- int CacheClient::lookup_object(std::string pool_name, std::string object_id,
- Context* on_finish) {
- rbdsc_req_type_t *message = new rbdsc_req_type_t();
- message->type = RBDSC_READ;
- memcpy(message->pool_name, pool_name.c_str(), pool_name.size());
- memcpy(message->oid, object_id.c_str(), object_id.size());
- message->vol_size = 0;
- message->offset = 0;
- message->length = 0;
-
- boost::asio::async_write(m_dm_socket,
- boost::asio::buffer((char*)message, message->size()),
- boost::asio::transfer_exactly(RBDSC_MSG_LEN),
- [this, on_finish, message](const boost::system::error_code& err, size_t cb) {
- delete message;
- if(err) {
- ldout(cct, 20) << "async_write failed"
- << err.message() << dendl;
- close();
- on_finish->complete(false);
- return;
- }
- if(cb != RBDSC_MSG_LEN) {
- ldout(cct, 20) << "async_write failed in-complete request" << dendl;
- close();
- on_finish->complete(false);
- return;
- }
- get_result(on_finish);
- });
-
- return 0;
- }
-
- void CacheClient::get_result(Context* on_finish) {
- char* lookup_result = new char[RBDSC_MSG_LEN + 1];
- boost::asio::async_read(m_dm_socket,
- boost::asio::buffer(lookup_result, RBDSC_MSG_LEN),
- boost::asio::transfer_exactly(RBDSC_MSG_LEN),
- [this, lookup_result, on_finish](const boost::system::error_code& err,
- size_t cb) {
- if(err == boost::asio::error::eof ||
- err == boost::asio::error::connection_reset ||
- err == boost::asio::error::operation_aborted ||
- err == boost::asio::error::bad_descriptor) {
- ldout(cct, 20) << "fail to read lookup result"
- << err.message() << dendl;
- close();
- on_finish->complete(false);
- delete lookup_result;
- return;
- }
-
- if(err) {
- ldout(cct, 1) << "fail to read lookup result"
- << err.message() << dendl;
- close();
- on_finish->complete(false);
- delete lookup_result;
- return;
- }
-
- if (cb != RBDSC_MSG_LEN) {
- ldout(cct, 1) << "incomplete lookup result" << dendl;
- close();
- on_finish->complete(false);
- delete lookup_result;
- return;
- }
-
- rbdsc_req_type_t *io_ctx = (rbdsc_req_type_t*)(lookup_result);
-
- if (io_ctx->type == RBDSC_READ_REPLY) {
- on_finish->complete(true);
- } else {
- on_finish->complete(false);
- }
- delete lookup_result;
- return;
- });
- }
-
} // namespace immutable_obj_cache
} // namespace ceph
#include <boost/algorithm/string.hpp>
#include "include/ceph_assert.h"
#include "include/Context.h"
+#include "common/Mutex.h"
+#include "Types.h"
#include "SocketCommon.h"
int stop();
int connect();
+ void lookup_object(ObjectCacheRequest* req);
+ void send_message();
+ void try_send();
+ void fault(const int err_type, const boost::system::error_code& err);
+ void try_receive();
+ void receive_message();
int register_client(Context* on_finish);
- int lookup_object(std::string pool_name, std::string object_id, Context* on_finish);
- void get_result(Context* on_finish);
private:
boost::asio::io_service m_io_service;
stream_protocol::socket m_dm_socket;
ClientProcessMsg m_client_process_msg;
stream_protocol::endpoint m_ep;
- char m_recv_buffer[1024];
std::shared_ptr<std::thread> m_io_thread;
-
- // atomic modfiy for this variable.
- // thread 1 : asio callback thread modify it.
- // thread 2 : librbd read it.
std::atomic<bool> m_session_work;
CephContext* cct;
+
+ bool m_use_dedicated_worker;
+ int m_worker_thread_num;
+ boost::asio::io_service* m_worker;
+ std::vector<std::thread*> m_worker_threads;
+ boost::asio::io_service::work* m_worker_io_service_work;
+
+ char* m_header_buffer;
+ std::atomic<bool> m_writing;
+ std::atomic<bool> m_reading;
+ std::atomic<uint64_t> m_sequence_id;
+ Mutex m_lock;
+ bufferlist m_outcoming_bl;
+ Mutex m_map_lock;
+ std::map<uint64_t, ObjectCacheRequest*> m_seq_to_req;
};
} // namespace immutable_obj_cache
std::remove(controller_path.c_str());
m_cache_server = new CacheServer(m_cct, controller_path,
- ([&](uint64_t p, std::string s){handle_request(p, s);}));
+ ([&](uint64_t p, ObjectCacheRequest* s){handle_request(p, s);}));
int ret = m_cache_server->run();
if (ret != 0) {
}
}
-void CacheController::handle_request(uint64_t session_id, std::string msg){
+void CacheController::handle_request(uint64_t session_id, ObjectCacheRequest* req){
ldout(m_cct, 20) << dendl;
- rbdsc_req_type_t *io_ctx = (rbdsc_req_type_t*)(msg.c_str());
-
- switch (io_ctx->type) {
+ switch (req->m_head.type) {
case RBDSC_REGISTER: {
// init cache layout for volume
m_object_cache_store->init_cache();
- io_ctx->type = RBDSC_REGISTER_REPLY;
- m_cache_server->send(session_id, std::string((char*)io_ctx, msg.size()));
+ req->m_head.type = RBDSC_REGISTER_REPLY;
+ m_cache_server->send(session_id, req);
break;
}
case RBDSC_READ: {
// lookup object in local cache store
- int ret = m_object_cache_store->lookup_object(io_ctx->pool_name, io_ctx->oid);
+ int ret = m_object_cache_store->lookup_object(req->m_data.m_pool_name, req->m_data.m_oid);
if (ret < 0) {
- io_ctx->type = RBDSC_READ_RADOS;
+ req->m_head.type = RBDSC_READ_RADOS;
} else {
- io_ctx->type = RBDSC_READ_REPLY;
+ req->m_head.type = RBDSC_READ_REPLY;
}
- m_cache_server->send(session_id, std::string((char*)io_ctx, msg.size()));
+ m_cache_server->send(session_id, req);
break;
}
ldout(m_cct, 5) << "can't recognize request" << dendl;
- assert(0); // TODO replace it.
+ ceph_assert(0); // TODO replace it.
}
}
void run();
- void handle_request(uint64_t sesstion_id, std::string msg);
+ void handle_request(uint64_t session_id, ObjectCacheRequest* msg);
private:
CacheServer *m_cache_server;
return 0;
}
-void CacheServer::send(uint64_t session_id, std::string msg) {
- ldout(cct, 20) << dendl;
-
- auto it = m_session_map.find(session_id);
- if (it != m_session_map.end()) {
- it->second->send(msg);
- } else {
- ldout(cct, 20) << "missing reply session id" << dendl;
- assert(0);
- }
-}
-
int CacheServer::start_accept() {
ldout(cct, 20) << dendl;
}
void CacheServer::accept() {
- CacheSessionPtr new_session(new CacheSession(m_session_id, m_io_service,
-   m_server_process_msg, cct));
+ CacheSessionPtr new_session = nullptr;
+
+ new_session.reset(new CacheSession(m_session_id, m_io_service, m_server_process_msg, cct));
m_acceptor.async_accept(new_session->socket(),
boost::bind(&CacheServer::handle_accept, this, new_session,
boost::asio::placeholders::error));
void CacheServer::handle_accept(CacheSessionPtr new_session,
const boost::system::error_code& error) {
ldout(cct, 20) << dendl;
+ ldout(cct, 20) << "new session arrived" << dendl;
if (error) {
// operation_aborted
lderr(cct) << "async accept fails : " << error.message() << dendl;
accept();
}
+void CacheServer::send(uint64_t session_id, ObjectCacheRequest* msg) {
+ ldout(cct, 20) << dendl;
+
+ auto it = m_session_map.find(session_id);
+ if (it != m_session_map.end()) {
+ it->second->send(msg);
+ } else {
+ ldout(cct, 20) << "missing reply session id" << dendl;
+ ceph_assert(0);
+ }
+}
+
} // namespace immutable_obj_cache
} // namespace ceph
#include <boost/asio.hpp>
#include <boost/asio/error.hpp>
+#include "Types.h"
#include "SocketCommon.h"
#include "CacheSession.h"
namespace immutable_obj_cache {
class CacheServer {
-
public:
CacheServer(CephContext* cct, const std::string& file, ProcessMsg processmsg);
~CacheServer();
int run();
- void send(uint64_t session_id, std::string msg);
int start_accept();
int stop();
+ void send(uint64_t session_id, ObjectCacheRequest* msg);
private:
void accept();
private:
CephContext* cct;
- boost::asio::io_service m_io_service; // TODO wrapper it.
+ boost::asio::io_service m_io_service;
ProcessMsg m_server_process_msg;
stream_protocol::endpoint m_local_path;
stream_protocol::acceptor m_acceptor;
uint64_t m_session_id = 1;
+ // TODO : need to lock it.
std::map<uint64_t, CacheSessionPtr> m_session_map;
};
CacheSession::CacheSession(uint64_t session_id, io_service& io_service,
ProcessMsg processmsg, CephContext* cct)
: m_session_id(session_id), m_dm_socket(io_service),
- process_msg(processmsg), cct(cct)
+ m_head_buffer(new char[sizeof(ObjectCacheMsgHeader)]),
+ m_server_process_msg(processmsg), cct(cct)
{}
CacheSession::~CacheSession() {
close();
+ delete[] m_head_buffer;
}
stream_protocol::socket& CacheSession::socket() {
}
void CacheSession::start() {
- handing_request();
+ read_request_header();
}
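+// each request is read in two phases: the fixed-size header first, then
+// data_len bytes of encoded ObjectCacheMsgData.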
-void CacheSession::handing_request() {
+void CacheSession::read_request_header() {
boost::asio::async_read(m_dm_socket,
- boost::asio::buffer(m_buffer, RBDSC_MSG_LEN),
- boost::asio::transfer_exactly(RBDSC_MSG_LEN),
- boost::bind(&CacheSession::handle_read,
+ boost::asio::buffer(m_head_buffer, sizeof(ObjectCacheMsgHeader)),
+ boost::asio::transfer_exactly(sizeof(ObjectCacheMsgHeader)),
+ boost::bind(&CacheSession::handle_request_header,
shared_from_this(),
boost::asio::placeholders::error,
boost::asio::placeholders::bytes_transferred));
}
-void CacheSession::handle_read(const boost::system::error_code& err,
+void CacheSession::handle_request_header(const boost::system::error_code& err,
size_t bytes_transferred) {
- if (err == boost::asio::error::eof ||
- err == boost::asio::error::connection_reset ||
- err == boost::asio::error::operation_aborted ||
- err == boost::asio::error::bad_descriptor) {
- ldout(cct, 20) << "fail to handle read : " << err.message() << dendl;
- close();
+ if(err || bytes_transferred != sizeof(ObjectCacheMsgHeader)) {
+ fault();
return;
}
- if(err) {
- ldout(cct, 1) << "faile to handle read: " << err.message() << dendl;
- return;
- }
+ ObjectCacheMsgHeader* head = (ObjectCacheMsgHeader*)(m_head_buffer);
+ ceph_assert(head->version == 0);
+ ceph_assert(head->reserved == 0);
+ ceph_assert(head->type == RBDSC_REGISTER || head->type == RBDSC_READ ||
+ head->type == RBDSC_LOOKUP);
- if(bytes_transferred != RBDSC_MSG_LEN) {
- ldout(cct, 1) << "incomplete read" << dendl;
- return;
- }
-
- process_msg(m_session_id, std::string(m_buffer, bytes_transferred));
+ read_request_data(head->data_len);
}
-void CacheSession::handle_write(const boost::system::error_code& error,
- size_t bytes_transferred) {
- if (error) {
- ldout(cct, 20) << "async_write failed: " << error.message() << dendl;
- assert(0);
- }
+void CacheSession::read_request_data(uint64_t data_len) {
+ bufferptr bp_data(buffer::create(data_len));
+ boost::asio::async_read(m_dm_socket,
+ boost::asio::buffer(bp_data.c_str(), bp_data.length()),
+ boost::asio::transfer_exactly(data_len),
+ boost::bind(&CacheSession::handle_request_data,
+ shared_from_this(), bp_data, data_len,
+ boost::asio::placeholders::error,
+ boost::asio::placeholders::bytes_transferred));
+}
- if(bytes_transferred != RBDSC_MSG_LEN) {
- ldout(cct, 20) << "reply in-complete. "<<dendl;
- assert(0);
+void CacheSession::handle_request_data(bufferptr bp, uint64_t data_len,
+ const boost::system::error_code& err,
+ size_t bytes_transferred) {
+ if(err || bytes_transferred != data_len) {
+ fault();
+ return;
}
- boost::asio::async_read(m_dm_socket, boost::asio::buffer(m_buffer),
- boost::asio::transfer_exactly(RBDSC_MSG_LEN),
- boost::bind(&CacheSession::handle_read,
- shared_from_this(),
- boost::asio::placeholders::error,
- boost::asio::placeholders::bytes_transferred));
+ bufferlist bl_data;
+ bl_data.append(std::move(bp));
+ ObjectCacheRequest* req = decode_object_cache_request(
+ (ObjectCacheMsgHeader*)m_head_buffer, bl_data);
+ process(req);
+ read_request_header();
+}
+void CacheSession::process(ObjectCacheRequest* req) {
+ m_server_process_msg(m_session_id, req);
}
-void CacheSession::send(std::string msg) {
- boost::asio::async_write(m_dm_socket,
- boost::asio::buffer(msg.c_str(), msg.size()),
- boost::asio::transfer_exactly(RBDSC_MSG_LEN),
- boost::bind(&CacheSession::handle_write,
- shared_from_this(),
- boost::asio::placeholders::error,
- boost::asio::placeholders::bytes_transferred));
+void CacheSession::send(ObjectCacheRequest* reply) {
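+ // the request object is reused as the reply, so drop its stale encoded
+ // buffers before re-encoding with the updated header type.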
+ reply->m_head_buffer.clear();
+ reply->m_data_buffer.clear();
+ reply->encode();
+ bufferlist bl;
+ bl.append(reply->get_head_buffer());
+ bl.append(reply->get_data_buffer());
+
+ boost::asio::async_write(m_dm_socket,
+ boost::asio::buffer(bl.c_str(), bl.length()),
+ boost::asio::transfer_exactly(bl.length()),
+ [this, bl, reply](const boost::system::error_code& err, size_t bytes_transferred) {
+ if(err || bytes_transferred != bl.length()) {
+ // don't leak the reply on a failed write.
+ delete reply;
+ fault();
+ return;
+ }
+ delete reply;
+ });
+}
+void CacheSession::fault() {
+ // TODO
}
} // namespace immutable_obj_cache
#include <boost/asio.hpp>
#include <boost/asio/error.hpp>
+#include "Types.h"
#include "SocketCommon.h"
using boost::asio::local::stream_protocol;
class CacheSession : public std::enable_shared_from_this<CacheSession> {
public:
- CacheSession(uint64_t session_id, io_service& io_service,
- ProcessMsg processmsg, CephContext* cct);
+ CacheSession(uint64_t session_id, io_service& io_service, ProcessMsg process_msg, CephContext* ctx);
~CacheSession();
-
stream_protocol::socket& socket();
- void start();
void close();
- void handing_request();
-
-private:
-
- void handle_read(const boost::system::error_code& error,
- size_t bytes_transferred);
-
- void handle_write(const boost::system::error_code& error,
- size_t bytes_transferred);
-
-public:
- void send(std::string msg);
+ void start();
+ void read_request_header();
+ void handle_request_header(const boost::system::error_code& err, size_t bytes_transferred);
+ void read_request_data(uint64_t data_len);
+ void handle_request_data(bufferptr bp, uint64_t data_len,
+ const boost::system::error_code& err, size_t bytes_transferred);
+ void process(ObjectCacheRequest* req);
+ void fault();
+ void send(ObjectCacheRequest* msg);
private:
uint64_t m_session_id;
stream_protocol::socket m_dm_socket;
- ProcessMsg process_msg;
+ char* m_head_buffer;
+ ProcessMsg m_server_process_msg;
CephContext* cct;
-
- // Buffer used to store data received from the client.
- //std::array<char, 1024> data_;
- char m_buffer[1024];
};
typedef std::shared_ptr<CacheSession> CacheSessionPtr;
bufferlist temp_bl;
std::string error_str;
- // TODO : optimization
+
+ // TODO : the current implementation reads the whole file at once, which severely hurts performance.
int ret = temp_bl.read_file(m_name.c_str(), &error_str);
if (ret < 0) {
lderr(cct)<<"read file fail:" << error_str << dendl;
#ifndef CEPH_CACHE_SOCKET_COMMON_H
#define CEPH_CACHE_SOCKET_COMMON_H
-#include "include/types.h"
-#include "include/int_types.h"
-
namespace ceph {
namespace immutable_obj_cache {
static const int RBDSC_LOOKUP_REPLY = 0X16;
static const int RBDSC_READ_RADOS = 0X17;
+static const int ASIO_ERROR_READ = 0X01;
+static const int ASIO_ERROR_WRITE = 0X02;
+static const int ASIO_ERROR_CONNECT = 0X03;
+static const int ASIO_ERROR_ACCEPT = 0X04;
+static const int ASIO_ERROR_MSG_INCOMPLETE = 0X05;
+class ObjectCacheRequest;
-typedef std::function<void(uint64_t, std::string)> ProcessMsg;
+typedef std::function<void(uint64_t, ObjectCacheRequest*)> ProcessMsg;
typedef std::function<void(std::string)> ClientProcessMsg;
-typedef uint8_t rbdsc_req_type;
-
-//TODO(): switch to bufferlist
-struct rbdsc_req_type_t {
- rbdsc_req_type type;
- uint64_t vol_size;
- uint64_t offset;
- uint64_t length;
- char pool_name[256];
- char vol_name[256];
- char oid[256];
-
- uint64_t size() {
- return sizeof(rbdsc_req_type_t);
- }
-
- std::string to_buffer() {
- std::stringstream ss;
- ss << type;
- ss << vol_size;
- ss << offset;
- ss << length;
- ss << pool_name;
- ss << vol_name;
-
- return ss.str();
- }
-};
-
-static const int RBDSC_MSG_LEN = sizeof(rbdsc_req_type_t);
} // namespace immutable_obj_cache
} // namespace ceph
namespace ceph {
namespace immutable_obj_cache {
+// TODO : fix compile issue. For now these definitions are inlined in Types.h.
+/*
void ObjectCacheMsgHeader::encode(bufferlist& bl) const {
- using ceph::encode;
- ::encode(seq, bl);
- ::encode(type, bl);
- ::encode(version, bl);
- ::encode(padding, bl);
- ::encode(mid_len, bl);
- ::encode(data_len, bl);
- ::encode(reserved, bl);
+ ceph::encode(seq, bl);
+ ceph::encode(type, bl);
+ ceph::encode(version, bl);
+ ceph::encode(padding, bl);
+ ceph::encode(data_len, bl);
+ ceph::encode(reserved, bl);
}
void ObjectCacheMsgHeader::decode(bufferlist::const_iterator& it) {
- using ceph::decode;
- ::decode(seq, it);
- ::decode(type, it);
- ::decode(version, it);
- ::decode(padding, it);
- ::decode(mid_len, it);
- ::decode(data_len, it);
- ::decode(reserved, it);
+ ceph::decode(seq, it);
+ ceph::decode(type, it);
+ ceph::decode(version, it);
+ ceph::decode(padding, it);
+ ceph::decode(data_len, it);
+ ceph::decode(reserved, it);
}
+*/
} // namespace immutable_obj_cache
} // namespace ceph
#define CEPH_CACHE_TYPES_H
#include "include/encoding.h"
+#include "include/Context.h"
namespace ceph {
namespace immutable_obj_cache {
uint16_t type; /* msg type */
uint16_t version; /* object cache version */
uint32_t padding;
- uint64_t mid_len;
uint32_t data_len;
uint32_t reserved;
- void encode(bufferlist& bl) const;
- void decode(bufferlist::const_iterator& it);
+ void encode(bufferlist& bl) const {
+ ceph::encode(seq, bl);
+ ceph::encode(type, bl);
+ ceph::encode(version, bl);
+ ceph::encode(padding, bl);
+ ceph::encode(data_len, bl);
+ ceph::encode(reserved, bl);
+ }
+
+ void decode(bufferlist::const_iterator& it) {
+ ceph::decode(seq, it);
+ ceph::decode(type, it);
+ ceph::decode(version, it);
+ ceph::decode(padding, it);
+ ceph::decode(data_len, it);
+ ceph::decode(reserved, it);
+ }
};
-class ObjectCacheMsgMiddle {
+class ObjectCacheMsgData {
public:
uint64_t m_image_size;
uint64_t m_read_offset;
std::string m_image_name;
std::string m_oid;
- ObjectCacheMsgMiddle(){}
- ~ObjectCacheMsgMiddle(){}
+ ObjectCacheMsgData(){}
+ ~ObjectCacheMsgData(){}
void encode(bufferlist& bl) {
ceph::encode(m_image_size, bl);
class ObjectCacheRequest {
public:
ObjectCacheMsgHeader m_head;
- ObjectCacheMsgMiddle m_mid;
-
+ ObjectCacheMsgData m_data;
bufferlist m_head_buffer;
- bufferlist m_mid_buffer;
bufferlist m_data_buffer;
+ Context* m_on_finish;
+ GenContext<ObjectCacheRequest*>* m_process_msg;
- ObjectCacheRequest() {}
+ ObjectCacheRequest() : m_on_finish(nullptr), m_process_msg(nullptr) {}
~ObjectCacheRequest() {}
-
void encode() {
- m_mid.encode(m_mid_buffer);
-
- m_head.mid_len = m_mid_buffer.length();
+ m_data.encode(m_data_buffer);
m_head.data_len = m_data_buffer.length();
-
assert(m_head_buffer.length() == 0);
m_head.encode(m_head_buffer);
assert(sizeof(ObjectCacheMsgHeader) == m_head_buffer.length());
}
-
bufferlist get_head_buffer() {
return m_head_buffer;
}
-
- bufferlist get_mid_buffer() {
- return m_mid_buffer;
- }
-
bufferlist get_data_buffer() {
return m_data_buffer;
}
};
-// currently, just use this interface.
inline ObjectCacheRequest* decode_object_cache_request(
- ObjectCacheMsgHeader* head, bufferlist mid_buffer)
-{
+ ObjectCacheMsgHeader* head, bufferlist data_buffer) {
ObjectCacheRequest* req = new ObjectCacheRequest();
-
- // head
req->m_head = *head;
- assert(req->m_head.mid_len == mid_buffer.length());
-
- // mid
- req->m_mid.decode(mid_buffer);
-
+ assert(req->m_head.data_len == data_buffer.length());
+ req->m_data.decode(data_buffer);
return req;
}
inline ObjectCacheRequest* decode_object_cache_request(
- ObjectCacheMsgHeader* head, bufferlist& mid_buffer,
- bufferlist& data_buffer)
-{
- ObjectCacheRequest* req = decode_object_cache_request(head, mid_buffer);
-
- // data
- if(data_buffer.length() != 0) {
- req->m_data_buffer = data_buffer;
- }
-
- return req;
+ bufferlist head_buffer, bufferlist data_buffer) {
+ return decode_object_cache_request((ObjectCacheMsgHeader*)(head_buffer.c_str()), data_buffer);
}
-inline ObjectCacheRequest* decode_object_cache_request(bufferlist& head,
- bufferlist& mid_buffer, bufferlist& data_buffer)
-{
- assert(sizeof(ObjectCacheMsgHeader) == head.length());
- return decode_object_cache_request((ObjectCacheMsgHeader*)(head.c_str()), mid_buffer, data_buffer);
-}
-
-
} // namespace immutable_obj_cache
} // namespace ceph
#endif