+++ /dev/null
-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
-// vim: ts=8 sw=2 smarttab
-
-#include "gtest/gtest.h"
-#include "include/Context.h"
-#include "include/buffer_fwd.h"
-#include "common/Mutex.h"
-#include "common/Cond.h"
-#include "global/global_init.h"
-#include "common/ceph_argparse.h"
-#include "global/global_context.h"
-#include <experimental/filesystem>
-
-#include "tools/immutable_object_cache/ObjectCacheFile.h"
-
-using namespace ceph::immutable_obj_cache;
-namespace efs = std::experimental::filesystem;
-
-class TestObjectCacheFile :public ::testing::Test {
-public:
- std::string m_cache_root_dir;
-
- TestObjectCacheFile(){}
- ~TestObjectCacheFile(){}
- static void SetUpTestCase() {}
- static void TearDownTestCase() {}
-
- void SetUp() override {
- m_cache_root_dir = g_ceph_context->_conf.get_val<std::string>("immutable_object_cache_path")
- + "/ceph_immutable_obj_cache/";
-
- if (efs::exists(m_cache_root_dir)) {
- efs::remove_all(m_cache_root_dir);
- }
- efs::create_directories(m_cache_root_dir);
- }
-
- void TearDown() override {
- efs::remove_all(m_cache_root_dir);
- }
-
-};
-
-TEST_F(TestObjectCacheFile, test_write_object_to_file) {
- ObjectCacheFile* m_cache_file_1 = new ObjectCacheFile(g_ceph_context, "test_sync_file_1");
- ObjectCacheFile* m_cache_file_2 = new ObjectCacheFile(g_ceph_context, "test_sync_file_2");
- ObjectCacheFile* m_cache_file_3 = new ObjectCacheFile(g_ceph_context, "test_sync_file_3");
- ASSERT_TRUE(m_cache_file_1->get_file_size() == -1);
- ASSERT_TRUE(m_cache_file_2->get_file_size() == -1);
- ASSERT_TRUE(m_cache_file_3->get_file_size() == -1);
-
- bufferlist* buf_1 = new ceph::bufferlist();
- bufferlist* buf_2 = new ceph::bufferlist();
- bufferlist* buf_3 = new ceph::bufferlist();
- buf_1->append(std::string(1024, '0'));
- buf_2->append(std::string(4096, '0'));
- buf_3->append(std::string(0, '0'));
-
- ASSERT_TRUE(m_cache_file_1->write_object_to_file(*buf_1, 1024) == 1024);
- ASSERT_TRUE(m_cache_file_2->write_object_to_file(*buf_2, 4096) == 4096);
- ASSERT_TRUE(m_cache_file_3->write_object_to_file(*buf_3, 0) == 0);
- ASSERT_TRUE(m_cache_file_1->get_file_size() == 1024);
- ASSERT_TRUE(m_cache_file_2->get_file_size() == 4096);
- ASSERT_TRUE(m_cache_file_3->get_file_size() == 0);
-
- delete m_cache_file_1;
- delete m_cache_file_2;
- delete m_cache_file_3;
- delete buf_1;
- delete buf_2;
- delete buf_3;
-}
-
-TEST_F(TestObjectCacheFile, test_read_object_from_file) {
- ObjectCacheFile* m_cache_file_1 = new ObjectCacheFile(g_ceph_context, "test_sync_file_1");
- ObjectCacheFile* m_cache_file_2 = new ObjectCacheFile(g_ceph_context, "test_sync_file_2");
- bufferlist* buf_1 = new ceph::bufferlist();
- bufferlist* buf_2 = new ceph::bufferlist();
-
- ASSERT_TRUE(m_cache_file_1->get_file_size() == -1);
- ASSERT_TRUE(m_cache_file_2->get_file_size() == -1);
- ASSERT_EQ(m_cache_file_1->read_object_from_file(buf_1, 0, 1024), -1);
- ASSERT_EQ(m_cache_file_2->read_object_from_file(buf_2, 0, 1024), -1);
-
- buf_1->append(std::string("helloworld"));
- ASSERT_TRUE(m_cache_file_1->write_object_to_file(*buf_1, 10) == 10);
- ASSERT_TRUE(m_cache_file_1->get_file_size() == 10);
-
- bufferlist* buf_3 = new ceph::bufferlist();
- bufferlist* buf_4 = new ceph::bufferlist();
- bufferlist* buf_5 = new ceph::bufferlist();
- bufferlist* buf_6 = new ceph::bufferlist();
-
- ASSERT_EQ(m_cache_file_1->read_object_from_file(buf_3, 0, 10), 10);
- ASSERT_EQ(10, buf_3->length());
- ASSERT_EQ(0, (strncmp(buf_1->c_str(), buf_3->c_str(), 10)));
-
- ASSERT_EQ(m_cache_file_1->read_object_from_file(buf_4, 0, 4096), 10);
- ASSERT_EQ(10, buf_4->length());
- ASSERT_EQ(0, (strncmp(buf_1->c_str(), buf_4->c_str(), 10)));
-
- ASSERT_EQ(m_cache_file_1->read_object_from_file(buf_5, 2, 4), 4);
- ASSERT_EQ(4, buf_5->length());
- bufferlist sub_bl;
- sub_bl.substr_of(*buf_1, 2, 4);
- ASSERT_EQ(0, (strncmp(sub_bl.c_str(), buf_5->c_str(), 4)));
-
- ASSERT_EQ(m_cache_file_1->read_object_from_file(buf_6, 12, 4), 0);
- ASSERT_EQ(0, buf_6->length());
-
-
- delete m_cache_file_1;
- delete m_cache_file_2;
- delete buf_1;
- delete buf_2;
- delete buf_3;
- delete buf_4;
- delete buf_5;
- delete buf_6;
-}
// TODO : configure it.
m_use_dedicated_worker = true;
m_worker_thread_num = 2;
- if(m_use_dedicated_worker) {
+ if (m_use_dedicated_worker) {
m_worker = new boost::asio::io_service();
m_worker_io_service_work = new boost::asio::io_service::work(*m_worker);
for(uint64_t i = 0; i < m_worker_thread_num; i++) {
m_session_work.store(false);
m_io_service.stop();
- if(m_io_thread != nullptr) {
+ if (m_io_thread != nullptr) {
m_io_thread->join();
}
- if(m_use_dedicated_worker) {
+ if (m_use_dedicated_worker) {
m_worker->stop();
for(auto thd : m_worker_threads) {
thd->join();
m_session_work.store(false);
boost::system::error_code close_ec;
m_dm_socket.close(close_ec);
- if(close_ec) {
+ if (close_ec) {
ldout(cct, 20) << "close: " << close_ec.message() << dendl;
}
}
int CacheClient::connect() {
boost::system::error_code ec;
m_dm_socket.connect(m_ep, ec);
- if(ec) {
+ if (ec) {
fault(ASIO_ERROR_CONNECT, ec);
return -1;
}
}
void CacheClient::try_send() {
- if(!m_writing.load()) {
+ if (!m_writing.load()) {
m_writing.store(true);
send_message();
}
{
Mutex::Locker locker(m_lock);
- if(m_outcoming_bl.length() == 0) {
+ if (m_outcoming_bl.length() == 0) {
m_writing.store(false);
return;
}
}
void CacheClient::try_receive() {
- if(!m_reading.load()) {
+ if (!m_reading.load()) {
m_reading.store(true);
receive_message();
}
void CacheClient::read_reply_header() {
- /* one head buffer for all arrived reply. */
- // bufferptr bp_head(buffer::create_static(sizeof(ObjectCacheMsgHeader), m_header_buffer));
-
/* create new head buffer for every reply */
bufferptr bp_head(buffer::create(sizeof(ObjectCacheMsgHeader)));
auto raw_ptr = bp_head.c_str();
void CacheClient::handle_reply_header(bufferptr bp_head,
const boost::system::error_code& ec,
size_t bytes_transferred) {
- if(ec || bytes_transferred != sizeof(ObjectCacheMsgHeader)) {
+ if (ec || bytes_transferred != sizeof(ObjectCacheMsgHeader)) {
fault(ASIO_ERROR_READ, ec);
return;
}
{
Mutex::Locker locker(m_lock);
- if(m_seq_to_req.size() == 0 && m_outcoming_bl.length()) {
+ if (m_seq_to_req.size() == 0 && m_outcoming_bl.length()) {
m_reading.store(false);
return;
}
}
- if(is_session_work()) {
+ if (is_session_work()) {
receive_message();
}
ceph_assert(current_request != nullptr);
auto process_reply = new FunctionContext([this, current_request, reply]
(bool dedicated) {
- if(dedicated) {
+ if (dedicated) {
// dedicated thread to execute this context.
}
current_request->m_process_msg->complete(reply);
delete reply;
});
- if(m_use_dedicated_worker) {
+ if (m_use_dedicated_worker) {
m_worker->post([process_reply]() {
process_reply->complete(true);
});
void CacheClient::fault(const int err_type, const boost::system::error_code& ec) {
ldout(cct, 20) << "fault." << ec.message() << dendl;
- if(err_type == ASIO_ERROR_CONNECT) {
+ if (err_type == ASIO_ERROR_CONNECT) {
ceph_assert(!m_session_work.load());
- if(ec == boost::asio::error::connection_refused) {
+ if (ec == boost::asio::error::connection_refused) {
ldout(cct, 20) << "Connecting RO daenmon fails : "<< ec.message()
<< ". Immutable-object-cache daemon is down ? "
<< "Data will be read from ceph cluster " << dendl;
ldout(cct, 20) << "Connecting RO daemon fails : " << ec.message() << dendl;
}
- if(m_dm_socket.is_open()) {
+ if (m_dm_socket.is_open()) {
// Set to indicate what error occurred, if any.
// Note that, even if the function indicates an error,
// the underlying descriptor is closed.
boost::system::error_code close_ec;
m_dm_socket.close(close_ec);
- if(close_ec) {
+ if (close_ec) {
ldout(cct, 20) << "close: " << close_ec.message() << dendl;
}
}
return;
}
- if(!m_session_work.load()) {
+ if (!m_session_work.load()) {
return;
}
// make sure only one thread executes the code below.
m_session_work.store(false);
- if(err_type == ASIO_ERROR_MSG_INCOMPLETE) {
+ if (err_type == ASIO_ERROR_MSG_INCOMPLETE) {
ldout(cct, 20) << "ASIO In-complete message." << ec.message() << dendl;
ceph_assert(0);
}
- if(err_type == ASIO_ERROR_READ) {
+ if (err_type == ASIO_ERROR_READ) {
ldout(cct, 20) << "ASIO async read fails : " << ec.message() << dendl;
}
- if(err_type == ASIO_ERROR_WRITE) {
+ if (err_type == ASIO_ERROR_WRITE) {
ldout(cct, 20) << "ASIO asyn write fails : " << ec.message() << dendl;
// CacheClient should not occur this error.
ceph_assert(0);
<< ec.message() << dendl;
}
-
- // TODO : use async + wait_event
- // TODO : accept one parameter : ObjectCacheRequest
int CacheClient::register_client(Context* on_finish) {
ObjectCacheRequest* message = new ObjectCacheRequest();
message->m_head.version = 0;
ret = boost::asio::write(m_dm_socket,
boost::asio::buffer(bl.c_str(), bl.length()), ec);
- if(ec || ret != bl.length()) {
+ if (ec || ret != bl.length()) {
fault(ASIO_ERROR_WRITE, ec);
return -1;
}
ret = boost::asio::read(m_dm_socket,
boost::asio::buffer(m_header_buffer, sizeof(ObjectCacheMsgHeader)), ec);
- if(ec || ret != sizeof(ObjectCacheMsgHeader)) {
+ if (ec || ret != sizeof(ObjectCacheMsgHeader)) {
fault(ASIO_ERROR_READ, ec);
return -1;
}
bufferptr bp_data(buffer::create(data_len));
ret = boost::asio::read(m_dm_socket, boost::asio::buffer(bp_data.c_str(), data_len), ec);
- if(ec || ret != data_len) {
+ if (ec || ret != data_len) {
fault(ASIO_ERROR_READ, ec);
return -1;
}
ldout(cct, 20) << dendl;
int ret = start_accept();
- if(ret != 0) {
+ if (ret != 0) {
return ret;
}
boost::system::error_code ec;
ret = m_io_service.run(ec);
- if(ec) {
+ if (ec) {
ldout(cct, 1) << "m_io_service run fails: " << ec.message() << dendl;
return -1;
}
boost::system::error_code ec;
m_acceptor.open(m_local_path.protocol(), ec);
- if(ec) {
+ if (ec) {
ldout(cct, 1) << "m_acceptor open fails: " << ec.message() << dendl;
return -1;
}
m_acceptor.bind(m_local_path, ec);
- if(ec) {
+ if (ec) {
ldout(cct, 1) << "m_acceptor bind fails: " << ec.message() << dendl;
return -1;
}
m_acceptor.listen(boost::asio::socket_base::max_connections, ec);
- if(ec) {
+ if (ec) {
ldout(cct, 1) << "m_acceptor listen fails: " << ec.message() << dendl;
return -1;
}
}
void CacheSession::close() {
- if(m_dm_socket.is_open()) {
+ if (m_dm_socket.is_open()) {
boost::system::error_code close_ec;
m_dm_socket.close(close_ec);
- if(close_ec) {
+ if (close_ec) {
ldout(cct, 20) << "close: " << close_ec.message() << dendl;
}
}
void CacheSession::handle_request_header(const boost::system::error_code& err,
size_t bytes_transferred) {
ldout(cct, 20) << dendl;
- if(err || bytes_transferred != sizeof(ObjectCacheMsgHeader)) {
+ if (err || bytes_transferred != sizeof(ObjectCacheMsgHeader)) {
fault();
return;
}
ObjectCacheMsgHeader* head = (ObjectCacheMsgHeader*)(m_head_buffer);
ceph_assert(head->version == 0);
ceph_assert(head->reserved == 0);
- ceph_assert(head->type == RBDSC_REGISTER || head->type == RBDSC_READ ||
- head->type == RBDSC_LOOKUP);
+ ceph_assert(head->type == RBDSC_REGISTER || head->type == RBDSC_READ);
read_request_data(head->data_len);
}
const boost::system::error_code& err,
size_t bytes_transferred) {
ldout(cct, 20) << dendl;
- if(err || bytes_transferred != data_len) {
+ if (err || bytes_transferred != data_len) {
fault();
return;
}
boost::asio::buffer(bl.c_str(), bl.length()),
boost::asio::transfer_exactly(bl.length()),
[this, bl, reply](const boost::system::error_code& err, size_t bytes_transferred) {
- if(err || bytes_transferred != bl.length()) {
+ if (err || bytes_transferred != bl.length()) {
fault();
return;
}
ldout(m_cct, 20) << dendl;
int ret = m_rados->init_with_context(m_cct);
- if(ret < 0) {
+ if (ret < 0) {
lderr(m_cct) << "fail to init Ceph context" << dendl;
return ret;
}
ret = m_rados->connect();
- if(ret < 0 ) {
+ if (ret < 0) {
lderr(m_cct) << "fail to connect to cluster" << dendl;
return ret;
}
ldout(m_cct, 20) << " cache_file_name: " << cache_file_name << dendl;
// rados read error
- if(ret != -ENOENT && ret < 0) {
+ if (ret != -ENOENT && ret < 0) {
lderr(m_cct) << "fail to read from rados" << dendl;
m_policy->update_status(cache_file_name, OBJ_CACHE_NONE);
ret = 0;
}
- uint32_t file_size = ret;
-
std::string cache_file_path = std::move(generate_cache_file_path(cache_file_name));
ret = read_buf->write_file(cache_file_path.c_str());
librados::AioCompletion* read_completion = create_rados_callback(on_finish);
// issue a zero-sized read req to get full obj
int ret = ioctx->aio_read(object_name, read_completion, read_buf, 0, 0);
- if(ret < 0) {
+ if (ret < 0) {
lderr(m_cct) << "failed to read from rados" << dendl;
}
read_completion->release();
std::string cache_file_dir = "";
- if(m_dir_num > 0) {
+ if (m_dir_num > 0) {
auto const pos = cache_file_name.find_last_of('.');
cache_file_dir = std::to_string(stoul(cache_file_name.substr(pos+1)) % m_dir_num);
}
RWLock::RLocker locker(m_cache_map_lock);
auto entry_it = m_cache_map.find(file_name);
- if(entry_it == m_cache_map.end()) {
+ if (entry_it == m_cache_map.end()) {
return OBJ_CACHE_NONE;
}
static const int RBDSC_REGISTER = 0X11;
static const int RBDSC_READ = 0X12;
-static const int RBDSC_LOOKUP = 0X13;
-static const int RBDSC_REGISTER_REPLY = 0X14;
-static const int RBDSC_READ_REPLY = 0X15;
-static const int RBDSC_READ_RADOS = 0X16;
+static const int RBDSC_REGISTER_REPLY = 0X13;
+static const int RBDSC_READ_REPLY = 0X14;
+static const int RBDSC_READ_RADOS = 0X15;
static const int ASIO_ERROR_READ = 0X01;
static const int ASIO_ERROR_WRITE = 0X02;