namespace immutable_obj_cache {
CacheClient::CacheClient(const std::string& file, CephContext* ceph_ctx)
- : cct(ceph_ctx), m_io_service_work(m_io_service),
+ : m_cct(ceph_ctx), m_io_service_work(m_io_service),
m_dm_socket(m_io_service), m_ep(stream_protocol::endpoint(file)),
m_io_thread(nullptr), m_session_work(false), m_writing(false),
m_reading(false), m_sequence_id(0),
m_lock("ceph::cache::cacheclient::m_lock") {
// TODO(dehao) : configure it.
- m_use_dedicated_worker = true;
- m_worker_thread_num = 2;
- if (m_use_dedicated_worker) {
+ m_worker_thread_num =
+ m_cct->_conf.get_val<uint64_t>(
+ "immutable_object_cache_client_dedicated_thread_num");
+
+ if (m_worker_thread_num != 0) {
m_worker = new boost::asio::io_service();
m_worker_io_service_work = new boost::asio::io_service::work(*m_worker);
for (uint64_t i = 0; i < m_worker_thread_num; i++) {
if (m_io_thread != nullptr) {
m_io_thread->join();
}
- if (m_use_dedicated_worker) {
+ if (m_worker_thread_num != 0) {
m_worker->stop();
for (auto thd : m_worker_threads) {
thd->join();
boost::system::error_code close_ec;
m_dm_socket.close(close_ec);
if (close_ec) {
- ldout(cct, 20) << "close: " << close_ec.message() << dendl;
+ ldout(m_cct, 20) << "close: " << close_ec.message() << dendl;
}
}
fault(ASIO_ERROR_CONNECT, ec);
return -1;
}
- ldout(cct, 20) <<"connect success"<< dendl;
+ ldout(m_cct, 20) <<"connect success"<< dendl;
return 0;
}
delete reply;
});
- if (m_use_dedicated_worker) {
+ if (m_worker_thread_num != 0) {
m_worker->post([process_reply]() {
process_reply->complete(true);
});
// if there is one request fails, just execute fault, then shutdown RO.
void CacheClient::fault(const int err_type,
const boost::system::error_code& ec) {
- ldout(cct, 20) << "fault." << ec.message() << dendl;
+ ldout(m_cct, 20) << "fault." << ec.message() << dendl;
if (err_type == ASIO_ERROR_CONNECT) {
ceph_assert(!m_session_work.load());
if (ec == boost::asio::error::connection_refused) {
- ldout(cct, 20) << "Connecting RO daenmon fails : "<< ec.message()
+ ldout(m_cct, 20) << "Connecting RO daenmon fails : "<< ec.message()
<< ". Immutable-object-cache daemon is down ? "
<< "Data will be read from ceph cluster " << dendl;
} else {
- ldout(cct, 20) << "Connecting RO daemon fails : "
+ ldout(m_cct, 20) << "Connecting RO daemon fails : "
<< ec.message() << dendl;
}
boost::system::error_code close_ec;
m_dm_socket.close(close_ec);
if (close_ec) {
- ldout(cct, 20) << "close: " << close_ec.message() << dendl;
+ ldout(m_cct, 20) << "close: " << close_ec.message() << dendl;
}
}
return;
m_session_work.store(false);
if (err_type == ASIO_ERROR_MSG_INCOMPLETE) {
- ldout(cct, 20) << "ASIO In-complete message." << ec.message() << dendl;
+ ldout(m_cct, 20) << "ASIO In-complete message." << ec.message() << dendl;
ceph_assert(0);
}
if (err_type == ASIO_ERROR_READ) {
- ldout(cct, 20) << "ASIO async read fails : " << ec.message() << dendl;
+ ldout(m_cct, 20) << "ASIO async read fails : " << ec.message() << dendl;
}
if (err_type == ASIO_ERROR_WRITE) {
- ldout(cct, 20) << "ASIO asyn write fails : " << ec.message() << dendl;
+ ldout(m_cct, 20) << "ASIO asyn write fails : " << ec.message() << dendl;
// CacheClient should not occur this error.
ceph_assert(0);
}
m_seq_to_req.clear();
}
- ldout(cct, 20) << "Because ASIO domain socket fails, just shutdown RO.\
+ ldout(m_cct, 20) << "Because ASIO domain socket fails, just shutdown RO.\
Later all reading will be re-dispatched RADOS layer"
<< ec.message() << dendl;
}