void SetUp() override {
std::remove(m_local_path.c_str());
m_cache_server = new CacheServer(g_ceph_context, m_local_path,
- [this](uint64_t sid, ObjectCacheRequest* req){
+ [this](CacheSession* sid, ObjectCacheRequest* req){
handle_request(sid, req);
});
ASSERT_TRUE(m_cache_server != nullptr);
delete srv_thd;
}
- void handle_request(uint64_t session_id, ObjectCacheRequest* req) {
+ void handle_request(CacheSession* session_id, ObjectCacheRequest* req) {
switch (req->get_request_type()) {
case RBDSC_REGISTER: {
ObjectCacheRequest* reply = new ObjectCacheRegReplyData(RBDSC_REGISTER_REPLY, req->seq);
- m_cache_server->send(session_id, reply);
+ session_id->send(reply);
break;
}
case RBDSC_READ: {
} else {
reply = new ObjectCacheReadReplyData(RBDSC_READ_REPLY, req->seq, "/fakepath");
}
- m_cache_server->send(session_id, reply);
+ session_id->send(reply);
break;
}
}
void SetUp() override {
std::remove(m_local_path.c_str());
m_cache_server = new CacheServer(g_ceph_context, m_local_path,
- [this](uint64_t session_id, ObjectCacheRequest* req){
+ [this](CacheSession* session_id, ObjectCacheRequest* req){
server_handle_request(session_id, req);
});
ASSERT_TRUE(m_cache_server != nullptr);
return cache_client;
}
- void server_handle_request(uint64_t session_id, ObjectCacheRequest* req) {
+ void server_handle_request(CacheSession* session_id, ObjectCacheRequest* req) {
switch (req->get_request_type()) {
case RBDSC_REGISTER: {
ObjectCacheRequest* reply = new ObjectCacheRegReplyData(RBDSC_REGISTER_REPLY,
req->seq);
- m_cache_server->send(session_id, reply);
+ session_id->send(reply);
break;
}
case RBDSC_READ: {
ObjectCacheRequest* reply = new ObjectCacheReadReplyData(RBDSC_READ_REPLY,
req->seq);
- m_cache_server->send(session_id, reply);
+ session_id->send(reply);
break;
}
}
std::remove(controller_path.c_str());
m_cache_server = new CacheServer(m_cct, controller_path,
- ([&](uint64_t p, ObjectCacheRequest* s){handle_request(p, s);}));
+ ([&](CacheSession* p, ObjectCacheRequest* s){handle_request(p, s);}));
int ret = m_cache_server->run();
if (ret != 0) {
}
}
-void CacheController::handle_request(uint64_t session_id,
+void CacheController::handle_request(CacheSession* session,
ObjectCacheRequest* req) {
ldout(m_cct, 20) << dendl;
ObjectCacheRequest* reply = new ObjectCacheRegReplyData(
RBDSC_REGISTER_REPLY, req->seq);
- m_cache_server->send(session_id, reply);
+ session->send(reply);
break;
}
case RBDSC_READ: {
reply = new ObjectCacheReadReplyData(RBDSC_READ_REPLY,
req->seq, cache_path);
}
- m_cache_server->send(session_id, reply);
+ session->send(reply);
break;
}
default:
void run();
void handle_request(uint64_t sesstion_id, ObjectCacheRequest* msg);
+ void handle_request(CacheSession* session, ObjectCacheRequest* msg);
private:
CacheServer *m_cache_server;
accept();
}
-void CacheServer::send(uint64_t session_id, ObjectCacheRequest* msg) {
- ldout(cct, 20) << dendl;
-
- auto it = m_session_map.find(session_id);
- if (it != m_session_map.end()) {
- it->second->send(msg);
- } else {
- ldout(cct, 20) << "missing reply session id" << dendl;
- ceph_assert(0);
- }
-}
-
} // namespace immutable_obj_cache
} // namespace ceph
int run();
int start_accept();
int stop();
- void send(uint64_t session_id, ObjectCacheRequest* msg);
private:
void accept();
stream_protocol::endpoint m_local_path;
stream_protocol::acceptor m_acceptor;
uint64_t m_session_id = 1;
- // TODO(dehao) : need to lock it.
std::map<uint64_t, CacheSessionPtr> m_session_map;
};
void CacheSession::process(ObjectCacheRequest* req) {
ldout(cct, 20) << dendl;
- m_server_process_msg(m_session_id, req);
+ m_server_process_msg(this, req);
}
void CacheSession::send(ObjectCacheRequest* reply) {
static const int ASIO_ERROR_MSG_INCOMPLETE = 0X05;
class ObjectCacheRequest;
+class CacheSession;
-typedef std::function<void(uint64_t, ObjectCacheRequest*)> ProcessMsg;
+typedef std::function<void(CacheSession*, ObjectCacheRequest*)> ProcessMsg;
} // namespace immutable_obj_cache
} // namespace ceph