test_main.cc
test_SimplePolicy.cc
test_sync_file.cc
- #test_DomainSocket.cc // TODO
- #test_multi_session.cc // TODO
+ test_DomainSocket.cc
+ test_multi_session.cc
test_object_store.cc
test_message.cc
)
unordered_set<std::string> m_hit_entry_set;
TestCommunication()
- : m_cache_server(nullptr), m_cache_client(nullptr), m_local_path("/tmp/ceph_test_domain_socket"),
+ : m_cache_server(nullptr), m_cache_client(nullptr),
+ m_local_path("/tmp/ceph_test_domain_socket"),
m_send_request_index(0), m_recv_ack_index(0)
{}
void SetUp() override {
std::remove(m_local_path.c_str());
- m_cache_server = new CacheServer(g_ceph_context, m_local_path, [this](uint64_t xx, std::string yy ){
- handle_request(xx, yy);
+ m_cache_server = new CacheServer(g_ceph_context, m_local_path,
+ [this](uint64_t sid, ObjectCacheRequest* req){
+ handle_request(sid, req);
});
ASSERT_TRUE(m_cache_server != nullptr);
srv_thd = new std::thread([this]() {m_cache_server->run();});
ASSERT_TRUE(m_cache_client != nullptr);
m_cache_client->run();
- while(true) {
+ while (true) {
if (0 == m_cache_client->connect()) {
break;
}
delete srv_thd;
}
- void handle_request(uint64_t session_id, std::string msg){
- rbdsc_req_type_t *io_ctx = (rbdsc_req_type_t*)(msg.c_str());
+ void handle_request(uint64_t session_id, ObjectCacheRequest* req) {
- switch (io_ctx->type) {
+ switch (req->m_head.type) {
case RBDSC_REGISTER: {
- io_ctx->type = RBDSC_REGISTER_REPLY;
- m_cache_server->send(session_id, std::string((char*)io_ctx, msg.size()));
+ req->m_head.type = RBDSC_REGISTER_REPLY;
+ m_cache_server->send(session_id, req);
break;
}
case RBDSC_READ: {
- if (m_hit_entry_set.find(io_ctx->oid) == m_hit_entry_set.end()) {
- io_ctx->type = RBDSC_READ_RADOS;
+ if (m_hit_entry_set.find(req->m_data.m_oid) == m_hit_entry_set.end()) {
+ req->m_head.type = RBDSC_READ_RADOS;
} else {
- io_ctx->type = RBDSC_READ_REPLY;
+ req->m_head.type = RBDSC_READ_REPLY;
}
- m_cache_server->send(session_id, std::string((char*)io_ctx, msg.size()));
+ m_cache_server->send(session_id, req);
break;
}
}
}
- // times: message number
- // queue_deqth : imitate message queue depth
- // thinking : imitate handing message time
- void startup_pingpong_testing(uint64_t times, uint64_t queue_depth, int thinking) {
- m_send_request_index.store(0);
- m_recv_ack_index.store(0);
- for (uint64_t index = 0; index < times; index++) {
- auto ctx = new FunctionContext([this, thinking, times](bool req){
- if (thinking != 0) {
- usleep(thinking); // handling message
- }
- m_recv_ack_index++;
- if (m_recv_ack_index == times) {
- m_wait_event.signal();
- }
- });
-
- // simple queue depth
- while (m_send_request_index - m_recv_ack_index > queue_depth) {
- usleep(1);
- }
-
- m_cache_client->lookup_object("test_pool", "123456", ctx);
- m_send_request_index++;
- }
- m_wait_event.wait();
- }
+ // times: message number
+ // queue_depth : imitate message queue depth
+ // thinking : imitate handing message time
+ void startup_pingpong_testing(uint64_t times, uint64_t queue_depth, int thinking) {
+ m_send_request_index.store(0);
+ m_recv_ack_index.store(0);
+ for (uint64_t index = 0; index < times; index++) {
+ auto ctx = new LambdaGenContext<std::function<void(ObjectCacheRequest*)>,
+ ObjectCacheRequest*>([this, thinking, times](ObjectCacheRequest* ack){
+ if (thinking != 0) {
+ usleep(thinking); // handling message
+ }
+ m_recv_ack_index++;
+ if (m_recv_ack_index == times) {
+ m_wait_event.signal();
+ }
+ });
+
+ // simple queue depth
+ while (m_send_request_index - m_recv_ack_index > queue_depth) {
+ usleep(1);
+ }
+
+ m_cache_client->lookup_object("test_pool", "123456", ctx);
+ m_send_request_index++;
+ }
+ m_wait_event.wait();
+ }
bool startup_lookupobject_testing(std::string pool_name, std::string object_id) {
bool hit;
- auto ctx = new FunctionContext([this, &hit](bool req){
- hit = req;
+ auto ctx = new LambdaGenContext<std::function<void(ObjectCacheRequest*)>,
+ ObjectCacheRequest*>([this, &hit](ObjectCacheRequest* ack){
+ hit = ack->m_head.type == RBDSC_READ_REPLY;
m_wait_event.signal();
});
m_cache_client->lookup_object(pool_name, object_id, ctx);
ASSERT_TRUE(m_send_request_index == m_recv_ack_index);
startup_pingpong_testing(200, 128, 0);
ASSERT_TRUE(m_send_request_index == m_recv_ack_index);
- startup_pingpong_testing(10000, 512, 0);
- ASSERT_TRUE(m_send_request_index == m_recv_ack_index);
}
TEST_F(TestCommunication, test_lookup_object) {
void SetUp() override {
std::remove(m_local_path.c_str());
- m_cache_server = new CacheServer(g_ceph_context, m_local_path, [this](uint64_t session_id, std::string request ){
- server_handle_request(session_id, request);
+ m_cache_server = new CacheServer(g_ceph_context, m_local_path,
+ [this](uint64_t session_id, ObjectCacheRequest* req){
+ server_handle_request(session_id, req);
});
ASSERT_TRUE(m_cache_server != nullptr);
}
void TearDown() override {
- for(uint64_t i = 0; i < m_session_num; i++) {
- if(m_cache_client_vec[i] != nullptr) {
+ for (uint64_t i = 0; i < m_session_num; i++) {
+ if (m_cache_client_vec[i] != nullptr) {
m_cache_client_vec[i]->close();
delete m_cache_client_vec[i];
}
}
m_cache_server->stop();
- if(m_cache_server_thread->joinable()) {
+ if (m_cache_server_thread->joinable()) {
m_cache_server_thread->join();
}
delete m_cache_server;
return cache_client;
}
- void server_handle_request(uint64_t session_id, std::string request) {
- rbdsc_req_type_t *io_ctx = (rbdsc_req_type_t*)(request.c_str());
+ void server_handle_request(uint64_t session_id, ObjectCacheRequest* req) {
- switch (io_ctx->type) {
+ switch (req->m_head.type) {
case RBDSC_REGISTER: {
-
- io_ctx->type = RBDSC_REGISTER_REPLY;
- m_cache_server->send(session_id, std::string((char*)io_ctx, request.size()));
+ req->m_head.type = RBDSC_REGISTER_REPLY;
+ m_cache_server->send(session_id, req);
break;
}
case RBDSC_READ: {
- io_ctx->type = RBDSC_READ_REPLY;
- m_cache_server->send(session_id, std::string((char*)io_ctx, request.size()));
+ req->m_head.type = RBDSC_READ_REPLY;
+ m_cache_server->send(session_id, req);
break;
}
}
void test_lookup_object(std::string pool, uint64_t index, uint64_t request_num, bool is_last) {
for (uint64_t i = 0; i < request_num; i++) {
- auto ctx = new FunctionContext([this](bool ret) {
+ auto ctx = new LambdaGenContext<std::function<void(ObjectCacheRequest*)>,
+ ObjectCacheRequest*>([this](ObjectCacheRequest* ack) {
m_recv_ack_index++;
});
m_send_request_index++;
m_cache_client_vec[index]->lookup_object(pool, "1234", ctx);
}
- if(is_last) {
+ if (is_last) {
while(m_send_request_index != m_recv_ack_index) {
usleep(1);
}
uint64_t test_times = 1000;
uint64_t test_session_num = 100;
- for(uint64_t i = 0; i <= test_times; i++) {
+ for (uint64_t i = 0; i <= test_times; i++) {
uint64_t random_index = random() % test_session_num;
if (m_cache_client_vec[random_index] == nullptr) {
test_register_client(random_index);
namespace immutable_obj_cache {
CacheClient::CacheClient(const std::string& file, CephContext* ceph_ctx)
- : m_io_service_work(m_io_service),
- m_dm_socket(m_io_service),
- m_ep(stream_protocol::endpoint(file)),
- m_io_thread(nullptr),
- m_session_work(false),
- m_writing(false),
+ : cct(ceph_ctx), m_io_service_work(m_io_service),
+ m_dm_socket(m_io_service), m_ep(stream_protocol::endpoint(file)),
+ m_io_thread(nullptr), m_session_work(false), m_writing(false),
+ m_reading(false), m_sequence_id(0),
m_lock("ceph::cache::cacheclient::m_lock"),
m_map_lock("ceph::cache::cacheclient::m_map_lock"),
- m_sequence_id(0),
- m_header_buffer(new char[sizeof(ObjectCacheMsgHeader)]),
- cct(ceph_ctx)
+ m_header_buffer(new char[sizeof(ObjectCacheMsgHeader)])
{
// TODO : release these resources.
m_use_dedicated_worker = true;
if(m_use_dedicated_worker) {
m_worker = new boost::asio::io_service();
m_worker_io_service_work = new boost::asio::io_service::work(*m_worker);
- for(int i = 0; i < m_worker_thread_num; i++) {
+ for(uint64_t i = 0; i < m_worker_thread_num; i++) {
std::thread* thd = new std::thread([this](){m_worker->run();});
m_worker_threads.push_back(thd);
}
return 0;
}
- void CacheClient::lookup_object(ObjectCacheRequest* req) {
+ void CacheClient::lookup_object(std::string pool_name, std::string oid,
+ GenContext<ObjectCacheRequest*>* on_finish) {
+
+ ObjectCacheRequest* req = new ObjectCacheRequest();
+ req->m_head.version = 0;
+ req->m_head.reserved = 0;
+ req->m_head.type = RBDSC_READ;
+ req->m_data.m_pool_name = pool_name;
+ req->m_data.m_oid = oid;
+ req->m_process_msg = on_finish;
req->m_head.seq = m_sequence_id++;
req->encode();
boost::asio::buffer(bl.c_str(), bl.length()),
boost::asio::transfer_exactly(bl.length()),
[this, bl](const boost::system::error_code& err, size_t cb) {
- if(err || cb != bl.length()) {
+ if (err || cb != bl.length()) {
fault(ASIO_ERROR_WRITE, err);
return;
}
}
}
+ //TODO(): split into smaller functions
void CacheClient::receive_message() {
ceph_assert(m_reading.load());
{
Mutex::Locker locker(m_map_lock);
ceph_assert(m_seq_to_req.find(seq_id) != m_seq_to_req.end());
+ delete m_seq_to_req[seq_id];
m_seq_to_req.erase(seq_id);
if(m_seq_to_req.size() == 0) {
m_reading.store(false);
int stop();
int connect();
- void lookup_object(ObjectCacheRequest* req);
+ void lookup_object(std::string pool_name, std::string oid, GenContext<ObjectCacheRequest*>* on_finish);
void send_message();
void try_send();
void fault(const int err_type, const boost::system::error_code& err);
int register_client(Context* on_finish);
private:
+ CephContext* cct;
boost::asio::io_service m_io_service;
boost::asio::io_service::work m_io_service_work;
stream_protocol::socket m_dm_socket;
- ClientProcessMsg m_client_process_msg;
stream_protocol::endpoint m_ep;
std::shared_ptr<std::thread> m_io_thread;
std::atomic<bool> m_session_work;
- CephContext* cct;
bool m_use_dedicated_worker;
- int m_worker_thread_num;
+ uint64_t m_worker_thread_num;
boost::asio::io_service* m_worker;
std::vector<std::thread*> m_worker_threads;
boost::asio::io_service::work* m_worker_io_service_work;
- char* m_header_buffer;
std::atomic<bool> m_writing;
std::atomic<bool> m_reading;
std::atomic<uint64_t> m_sequence_id;
Mutex m_lock;
- bufferlist m_outcoming_bl;
Mutex m_map_lock;
std::map<uint64_t, ObjectCacheRequest*> m_seq_to_req;
+ bufferlist m_outcoming_bl;
+ char* m_header_buffer;
};
} // namespace immutable_obj_cache
void CacheServer::handle_accept(CacheSessionPtr new_session,
const boost::system::error_code& error) {
ldout(cct, 20) << dendl;
- std::cout << "new session arrived....." << std::endl;
if (error) {
// operation_absort
lderr(cct) << "async accept fails : " << error.message() << dendl;
class ObjectCacheRequest;
typedef std::function<void(uint64_t, ObjectCacheRequest*)> ProcessMsg;
-typedef std::function<void(std::string)> ClientProcessMsg;
} // namespace immutable_obj_cache
} // namespace ceph
ObjectCacheMsgData m_data;
bufferlist m_head_buffer;
bufferlist m_data_buffer;
- Context* m_on_finish;
GenContext<ObjectCacheRequest*>* m_process_msg;
ObjectCacheRequest() {}