m_dm_socket(m_io_service), m_ep(stream_protocol::endpoint(file)),
m_io_thread(nullptr), m_session_work(false), m_writing(false),
m_reading(false), m_sequence_id(0),
- m_lock("ceph::cache::cacheclient::m_lock")
- {
- // TODO : configure it.
+ m_lock("ceph::cache::cacheclient::m_lock") {
+ // TODO(dehao): configure it.
m_use_dedicated_worker = true;
m_worker_thread_num = 2;
if (m_use_dedicated_worker) {
}
}
m_bp_header = buffer::create(get_header_size());
-
}
CacheClient::~CacheClient() {
stop();
}
- void CacheClient::run(){
+ void CacheClient::run() {
m_io_thread.reset(new std::thread([this](){m_io_service.run(); }));
}
return 0;
}
- void CacheClient::lookup_object(std::string pool_nspace, uint64_t pool_id, uint64_t snap_id,
- std::string oid, GenContext<ObjectCacheRequest*>* on_finish) {
-
- ObjectCacheRequest* req = new ObjectCacheReadData(RBDSC_READ, ++m_sequence_id, 0, 0,
- pool_id, snap_id, oid, pool_nspace);
+ void CacheClient::lookup_object(std::string pool_nspace, uint64_t pool_id,
+ uint64_t snap_id, std::string oid,
+ GenContext<ObjectCacheRequest*>* on_finish) {
+ ObjectCacheRequest* req = new ObjectCacheReadData(RBDSC_READ,
+ ++m_sequence_id, 0, 0,
+ pool_id, snap_id, oid, pool_nspace);
req->process_msg = on_finish;
req->encode();
}
void CacheClient::read_reply_header() {
-
/* create new head buffer for every reply */
bufferptr bp_head(buffer::create(get_header_size()));
auto raw_ptr = bp_head.c_str();
}
void CacheClient::handle_reply_header(bufferptr bp_head,
- const boost::system::error_code& ec,
- size_t bytes_transferred) {
+ const boost::system::error_code& ec,
+ size_t bytes_transferred) {
if (ec || bytes_transferred != get_header_size()) {
fault(ASIO_ERROR_READ, ec);
return;
read_reply_data(std::move(bp_head), std::move(bp_data), data_len);
}
- void CacheClient::read_reply_data(bufferptr&& bp_head, bufferptr&& bp_data,
+ void CacheClient::read_reply_data(bufferptr&& bp_head,
+ bufferptr&& bp_data,
const uint64_t data_len) {
-
auto raw_ptr = bp_data.c_str();
boost::asio::async_read(m_dm_socket, boost::asio::buffer(raw_ptr, data_len),
boost::asio::transfer_exactly(data_len),
this, std::move(bp_head), std::move(bp_data), data_len,
boost::asio::placeholders::error,
boost::asio::placeholders::bytes_transferred));
-
}
- void CacheClient::handle_reply_data(bufferptr bp_head, bufferptr bp_data,
+ void CacheClient::handle_reply_data(bufferptr bp_head,
+ bufferptr bp_data,
const uint64_t data_len,
const boost::system::error_code& ec,
size_t bytes_transferred) {
if (is_session_work()) {
receive_message();
}
-
}
void CacheClient::process(ObjectCacheRequest* reply, uint64_t seq_id) {
}
// if any request fails, just execute fault and then shut down RO.
- void CacheClient::fault(const int err_type, const boost::system::error_code& ec) {
+ void CacheClient::fault(const int err_type,
+ const boost::system::error_code& ec) {
ldout(cct, 20) << "fault." << ec.message() << dendl;
if (err_type == ASIO_ERROR_CONNECT) {
<< ". Immutable-object-cache daemon is down ? "
<< "Data will be read from ceph cluster " << dendl;
} else {
- ldout(cct, 20) << "Connecting RO daemon fails : " << ec.message() << dendl;
+ ldout(cct, 20) << "Connecting RO daemon fails : "
+ << ec.message() << dendl;
}
if (m_dm_socket.is_open()) {
return;
}
- // when current session don't work, ASIO will don't receive any new request from hook.
- // On the other hand, for pending request of ASIO, cancle these request, then call their callback.
- // these request which are cancled by this method, will be re-dispatched to RADOS layer.
- //
- // make sure just have one thread to modify execute below code.
+ /* When the current session doesn't work, ASIO will not receive any new
+  * requests from the hook. For pending requests already handed to ASIO,
+  * cancel them and then invoke their callbacks; requests cancelled by
+  * this method will be re-dispatched to the RADOS layer.
+  * Make sure only one thread executes the code below. */
m_session_work.store(false);
if (err_type == ASIO_ERROR_MSG_INCOMPLETE) {
// currently, for any asio error, just shutdown RO.
close();
- // all pending request, which have entered into ASIO, will be re-dispatched to RADOS.
+ /* all pending requests that have already entered ASIO
+  * will be re-dispatched to RADOS. */
{
Mutex::Locker locker(m_lock);
for (auto it : m_seq_to_req) {
}
int CacheClient::register_client(Context* on_finish) {
- ObjectCacheRequest* reg_req = new ObjectCacheRegData(RBDSC_REGISTER, m_sequence_id++);
+ ObjectCacheRequest* reg_req = new ObjectCacheRegData(RBDSC_REGISTER,
+ m_sequence_id++);
reg_req->encode();
bufferlist bl;
uint64_t data_len = get_data_len(m_bp_header.c_str());
bufferptr bp_data(buffer::create(data_len));
- ret = boost::asio::read(m_dm_socket, boost::asio::buffer(bp_data.c_str(), data_len), ec);
+ ret = boost::asio::read(m_dm_socket, boost::asio::buffer(bp_data.c_str(),
+ data_len), ec);
if (ec || ret != data_len) {
fault(ASIO_ERROR_READ, ec);
return -1;
return 0;
}
-} // namespace immutable_obj_cache
-} // namespace ceph
+} // namespace immutable_obj_cache
+} // namespace ceph
#include <boost/bind.hpp>
#include <boost/asio/error.hpp>
#include <boost/algorithm/string.hpp>
+
#include "include/ceph_assert.h"
#include "include/Context.h"
#include "common/Mutex.h"
namespace immutable_obj_cache {
class CacheClient {
-public:
-
+ public:
CacheClient(const std::string& file, CephContext* ceph_ctx);
~CacheClient();
void run();
void close();
int stop();
int connect();
- void lookup_object(std::string pool_nspace, uint64_t pool_id, uint64_t snap_id,
- std::string oid, GenContext<ObjectCacheRequest*>* on_finish);
+ void lookup_object(std::string pool_nspace, uint64_t pool_id,
+ uint64_t snap_id, std::string oid,
+ GenContext<ObjectCacheRequest*>* on_finish);
int register_client(Context* on_finish);
-private:
-
+ private:
void send_message();
void try_send();
void fault(const int err_type, const boost::system::error_code& err);
void process(ObjectCacheRequest* reply, uint64_t seq_id);
void read_reply_header();
void handle_reply_header(bufferptr bp_head,
- const boost::system::error_code& ec, size_t bytes_transferred);
+ const boost::system::error_code& ec,
+ size_t bytes_transferred);
void read_reply_data(bufferptr&& bp_head, bufferptr&& bp_data,
const uint64_t data_len);
void handle_reply_data(bufferptr bp_head, bufferptr bp_data,
const uint64_t data_len,
- const boost::system::error_code& ec, size_t bytes_transferred);
-private:
+ const boost::system::error_code& ec,
+ size_t bytes_transferred);
+ private:
CephContext* cct;
boost::asio::io_service m_io_service;
boost::asio::io_service::work m_io_service_work;
bufferptr m_bp_header;
};
-} // namespace immutable_obj_cache
-} // namespace ceph
+} // namespace immutable_obj_cache
+} // namespace ceph
#endif
namespace ceph {
namespace immutable_obj_cache {
-CacheController::CacheController(CephContext *cct, const std::vector<const char*> &args):
+CacheController::CacheController(CephContext *cct,
+ const std::vector<const char*> &args):
m_args(args), m_cct(cct) {
ldout(m_cct, 20) << dendl;
}
ldout(m_cct, 20) << dendl;
m_object_cache_store = new ObjectCacheStore(m_cct);
- //TODO(): make this configurable
+ // TODO(dehao): make this configurable
int r = m_object_cache_store->init(true);
if (r < 0) {
lderr(m_cct) << "init error\n" << dendl;
void CacheController::run() {
try {
- std::string controller_path = m_cct->_conf.get_val<std::string>("immutable_object_cache_sock");
+ std::string controller_path =
+ m_cct->_conf.get_val<std::string>("immutable_object_cache_sock");
std::remove(controller_path.c_str());
m_cache_server = new CacheServer(m_cct, controller_path,
}
}
-void CacheController::handle_request(uint64_t session_id, ObjectCacheRequest* req){
+void CacheController::handle_request(uint64_t session_id,
+ ObjectCacheRequest* req) {
ldout(m_cct, 20) << dendl;
switch (req->get_request_type()) {
case RBDSC_REGISTER: {
- // TODO(): skip register and allow clients to lookup directly
+ // TODO(dehao): skip register and allow clients to lookup directly
- ObjectCacheRequest* reply = new ObjectCacheRegReplyData(RBDSC_REGISTER_REPLY, req->seq);
+ ObjectCacheRequest* reply = new ObjectCacheRegReplyData(
+ RBDSC_REGISTER_REPLY, req->seq);
m_cache_server->send(session_id, reply);
break;
}
// lookup object in local cache store
std::string cache_path;
ObjectCacheReadData* req_read_data = (ObjectCacheReadData*)req;
- int ret = m_object_cache_store->lookup_object(req_read_data->pool_namespace,
- req_read_data->pool_id,
- req_read_data->snap_id,
- req_read_data->oid,
- cache_path);
+ int ret = m_object_cache_store->lookup_object(
+ req_read_data->pool_namespace, req_read_data->pool_id,
+ req_read_data->snap_id, req_read_data->oid, cache_path);
ObjectCacheRequest* reply = nullptr;
if (ret != OBJ_CACHE_PROMOTED) {
reply = new ObjectCacheReadRadosData(RBDSC_READ_RADOS, req->seq);
} else {
- reply = new ObjectCacheReadReplyData(RBDSC_READ_REPLY, req->seq, cache_path);
+ reply = new ObjectCacheReadReplyData(RBDSC_READ_REPLY,
+ req->seq, cache_path);
}
m_cache_server->send(session_id, reply);
break;
}
}
-} // namespace immutable_obj_cache
-} // namespace ceph
+} // namespace immutable_obj_cache
+} // namespace ceph
ObjectCacheStore *m_object_cache_store;
};
-} // namespace immutable_obj_cache
-} // namespace ceph
+} // namespace immutable_obj_cache
+} // namespace ceph
#endif
void CacheServer::accept() {
CacheSessionPtr new_session = nullptr;
- new_session.reset(new CacheSession(m_session_id, m_io_service, m_server_process_msg, cct));
+ new_session.reset(new CacheSession(m_session_id, m_io_service,
+ m_server_process_msg, cct));
m_acceptor.async_accept(new_session->socket(),
boost::bind(&CacheServer::handle_accept, this, new_session,
}
m_session_map.emplace(m_session_id, new_session);
- // TODO : session setting
+ // TODO(dehao): session setting
new_session->start();
m_session_id++;
}
}
-} // namespace immutable_obj_cache
-} // namespace ceph
+} // namespace immutable_obj_cache
+} // namespace ceph
private:
void accept();
- void handle_accept(CacheSessionPtr new_session, const boost::system::error_code& error);
+ void handle_accept(CacheSessionPtr new_session,
+ const boost::system::error_code& error);
private:
CephContext* cct;
stream_protocol::endpoint m_local_path;
stream_protocol::acceptor m_acceptor;
uint64_t m_session_id = 1;
- // TODO : need to lock it.
+ // TODO(dehao): need to lock it.
std::map<uint64_t, CacheSessionPtr> m_session_map;
};
-} // namespace immutable_obj_cache
-} // namespace ceph
+} // namespace immutable_obj_cache
+} // namespace ceph
#endif
void CacheSession::read_request_header() {
ldout(cct, 20) << dendl;
boost::asio::async_read(m_dm_socket,
- boost::asio::buffer(m_bp_header.c_str(), get_header_size()),
- boost::asio::transfer_exactly(get_header_size()),
- boost::bind(&CacheSession::handle_request_header,
- shared_from_this(),
- boost::asio::placeholders::error,
- boost::asio::placeholders::bytes_transferred));
+ boost::asio::buffer(m_bp_header.c_str(), get_header_size()),
+ boost::asio::transfer_exactly(get_header_size()),
+ boost::bind(&CacheSession::handle_request_header,
+ shared_from_this(), boost::asio::placeholders::error,
+ boost::asio::placeholders::bytes_transferred));
}
void CacheSession::handle_request_header(const boost::system::error_code& err,
ldout(cct, 20) << dendl;
bufferptr bp_data(buffer::create(data_len));
boost::asio::async_read(m_dm_socket,
- boost::asio::buffer(bp_data.c_str(), bp_data.length()),
- boost::asio::transfer_exactly(data_len),
- boost::bind(&CacheSession::handle_request_data,
- shared_from_this(), bp_data, data_len,
- boost::asio::placeholders::error,
- boost::asio::placeholders::bytes_transferred));
+ boost::asio::buffer(bp_data.c_str(), bp_data.length()),
+ boost::asio::transfer_exactly(data_len),
+ boost::bind(&CacheSession::handle_request_data,
+ shared_from_this(), bp_data, data_len,
+ boost::asio::placeholders::error,
+ boost::asio::placeholders::bytes_transferred));
}
void CacheSession::handle_request_data(bufferptr bp, uint64_t data_len,
boost::asio::async_write(m_dm_socket,
boost::asio::buffer(bl.c_str(), bl.length()),
boost::asio::transfer_exactly(bl.length()),
- [this, bl, reply](const boost::system::error_code& err, size_t bytes_transferred) {
+ [this, bl, reply](const boost::system::error_code& err,
+ size_t bytes_transferred) {
if (err || bytes_transferred != bl.length()) {
fault();
return;
void CacheSession::fault() {
ldout(cct, 20) << dendl;
- // TODO
+ // TODO(dehao)
}
-} // namespace immutable_obj_cache
-} // namespace ceph
+} // namespace immutable_obj_cache
+} // namespace ceph
namespace immutable_obj_cache {
class CacheSession : public std::enable_shared_from_this<CacheSession> {
-public:
- CacheSession(uint64_t session_id, io_service& io_service, ProcessMsg process_msg, CephContext* ctx);
+ public:
+ CacheSession(uint64_t session_id, io_service& io_service,
+ ProcessMsg process_msg, CephContext* ctx);
~CacheSession();
stream_protocol::socket& socket();
void close();
void start();
void read_request_header();
- void handle_request_header(const boost::system::error_code& err, size_t bytes_transferred);
+ void handle_request_header(const boost::system::error_code& err,
+ size_t bytes_transferred);
void read_request_data(uint64_t data_len);
void handle_request_data(bufferptr bp, uint64_t data_len,
- const boost::system::error_code& err, size_t bytes_transferred);
+ const boost::system::error_code& err,
+ size_t bytes_transferred);
void process(ObjectCacheRequest* req);
void fault();
void send(ObjectCacheRequest* msg);
-private:
+ private:
uint64_t m_session_id;
stream_protocol::socket m_dm_socket;
ProcessMsg m_server_process_msg;
typedef std::shared_ptr<CacheSession> CacheSessionPtr;
-} // namespace immutable_obj_cache
-} // namespace ceph
+} // namespace immutable_obj_cache
+} // namespace ceph
#endif
ObjectCacheStore::ObjectCacheStore(CephContext *cct)
: m_cct(cct), m_rados(new librados::Rados()),
m_ioctx_map_lock("ceph::cache::ObjectCacheStore::m_ioctx_map_lock") {
-
object_cache_max_size =
m_cct->_conf.get_val<Option::size_t>("immutable_object_cache_max_size");
m_cache_root_dir += "/";
}
- //TODO(): allow to set cache level
+ // TODO(dehao): allow to set cache level
m_policy = new SimplePolicy(m_cct, object_cache_max_size, 0.1);
}
}
ret = m_rados->connect();
- if (ret < 0 ) {
+ if (ret < 0) {
lderr(m_cct) << "fail to connect to cluster" << dendl;
return ret;
}
- //TODO(): fsck and reuse existing cache objects
+ // TODO(dehao): fsck and reuse existing cache objects
if (reset) {
std::error_code ec;
if (efs::exists(m_cache_root_dir)) {
int dir = m_dir_num - 1;
while (dir >= 0) {
- if (!efs::remove_all(m_cache_root_dir + "/" + std::to_string(dir), ec)) {
+ if (!efs::remove_all(
+ m_cache_root_dir + "/" + std::to_string(dir), ec)) {
lderr(m_cct) << "fail to remove old cache store: " << ec << dendl;
- return ec.value();
+ return ec.value();
}
- dir --;
+ dir--;
}
} else {
if (!efs::create_directories(m_cache_root_dir, ec)) {
int dir = m_dir_num - 1;
while (dir >= 0) {
efs::create_directories(cache_dir + "/" + std::to_string(dir));
- dir --;
+ dir--;
}
return 0;
}
librados::bufferlist* read_buf = new librados::bufferlist();
auto ctx = new FunctionContext([this, read_buf, cache_file_name](int ret) {
- handle_promote_callback(ret, read_buf, cache_file_name);
- });
+ handle_promote_callback(ret, read_buf, cache_file_name);
+ });
- return promote_object(&ioctx, object_name, read_buf, ctx);
+ return promote_object(&ioctx, object_name, read_buf, ctx);
}
int ObjectCacheStore::handle_promote_callback(int ret, bufferlist* read_buf,
cache_status_t ret = m_policy->lookup_object(cache_file_name);
- switch(ret) {
+ switch (ret) {
case OBJ_CACHE_NONE: {
pret = do_promote(pool_nspace, pool_id, snap_id, object_name);
if (pret < 0) {
std::list<std::string> obj_list;
m_policy->get_evict_list(&obj_list);
- for (auto& obj: obj_list) {
+ for (auto& obj : obj_list) {
do_evict(obj);
}
}
ldout(m_cct, 20) << "evict cache: " << cache_file_path << dendl;
- // TODO(): possible race on read?
+ // TODO(dehao): possible race on read?
int ret = std::remove(cache_file_path.c_str());
- // evict metadata
+ // evict metadata
if (ret == 0) {
m_policy->update_status(cache_file, OBJ_CACHE_SKIP);
m_policy->evict_entry(cache_file);
}
std::string ObjectCacheStore::get_cache_file_path(std::string cache_file_name) {
-
std::string cache_file_dir = "";
if (m_dir_num > 0) {
uint32_t crc = 0;
- crc = ceph_crc32c(0, (unsigned char *)cache_file_name.c_str(), cache_file_name.length());
+ crc = ceph_crc32c(0, (unsigned char *)cache_file_name.c_str(),
+ cache_file_name.length());
cache_file_dir = std::to_string(crc % m_dir_num);
}
return m_cache_root_dir + cache_file_dir + "/" + cache_file_name;
}
-} // namespace immutable_obj_cache
-} // namespace ceph
+} // namespace immutable_obj_cache
+} // namespace ceph
typedef shared_ptr<librados::Rados> RadosRef;
typedef shared_ptr<librados::IoCtx> IoCtxRef;
-class ObjectCacheStore
-{
- public:
- ObjectCacheStore(CephContext *cct);
- ~ObjectCacheStore();
- int init(bool reset);
- int shutdown();
- int init_cache();
- int lookup_object(std::string pool_nspace,
- uint64_t pool_id, uint64_t snap_id,
- std::string object_name,
- std::string& target_cache_file_path);
-
- private:
- std::string get_cache_file_name(std::string pool_nspace, uint64_t pool_id,
- uint64_t snap_id, std::string oid);
- std::string get_cache_file_path(std::string cache_file_name);
- int evict_objects();
- int do_promote(std::string pool_nspace, uint64_t pool_id,
- uint64_t snap_id, std::string object_name);
- int promote_object(librados::IoCtx*, std::string object_name,
- librados::bufferlist* read_buf,
- Context* on_finish);
- int handle_promote_callback(int, bufferlist*, std::string);
- int do_evict(std::string cache_file);
-
- CephContext *m_cct;
- RadosRef m_rados;
- std::map<uint64_t, librados::IoCtx> m_ioctx_map;
- Mutex m_ioctx_map_lock;
- Policy* m_policy;
- //TODO(): make this configurable
- int m_dir_num = 10;
- uint64_t object_cache_max_size;
- std::string m_cache_root_dir;
+class ObjectCacheStore {
+ public:
+ ObjectCacheStore(CephContext *cct);
+ ~ObjectCacheStore();
+ int init(bool reset);
+ int shutdown();
+ int init_cache();
+ int lookup_object(std::string pool_nspace,
+ uint64_t pool_id, uint64_t snap_id,
+ std::string object_name,
+ std::string& target_cache_file_path);
+
+ private:
+ std::string get_cache_file_name(std::string pool_nspace, uint64_t pool_id,
+ uint64_t snap_id, std::string oid);
+ std::string get_cache_file_path(std::string cache_file_name);
+ int evict_objects();
+ int do_promote(std::string pool_nspace, uint64_t pool_id,
+ uint64_t snap_id, std::string object_name);
+ int promote_object(librados::IoCtx*, std::string object_name,
+ librados::bufferlist* read_buf,
+ Context* on_finish);
+ int handle_promote_callback(int, bufferlist*, std::string);
+ int do_evict(std::string cache_file);
+
+ CephContext *m_cct;
+ RadosRef m_rados;
+ std::map<uint64_t, librados::IoCtx> m_ioctx_map;
+ Mutex m_ioctx_map_lock;
+ Policy* m_policy;
+ // TODO(dehao): make this configurable
+ int m_dir_num = 10;
+ uint64_t object_cache_max_size;
+ std::string m_cache_root_dir;
};
-} // namespace ceph
-} // namespace immutable_obj_cache
+} // namespace immutable_obj_cache
+} // namespace ceph
#endif
OBJ_CACHE_SKIP,
} cache_status_t;
-
class Policy {
-public:
- Policy(){}
- virtual ~Policy(){};
+ public:
+ Policy() {}
+ virtual ~Policy() {}
virtual cache_status_t lookup_object(std::string) = 0;
virtual int evict_entry(std::string) = 0;
- virtual void update_status(std::string, cache_status_t, uint64_t size=0) = 0;
+ virtual void update_status(std::string, cache_status_t,
+ uint64_t size = 0) = 0;
virtual cache_status_t get_status(std::string) = 0;
virtual void get_evict_list(std::list<std::string>* obj_list) = 0;
};
-} // namespace immutable_obj_cache
-} // namespace ceph
+} // namespace immutable_obj_cache
+} // namespace ceph
#endif
: cct(cct), m_watermark(watermark), m_max_cache_size(cache_size),
m_cache_map_lock("rbd::cache::SimplePolicy::m_cache_map_lock") {
ldout(cct, 20) << dendl;
- m_max_inflight_ops = cct->_conf.get_val<uint64_t>("immutable_object_cache_max_inflight_ops");
+ m_max_inflight_ops = cct->_conf.get_val<uint64_t>(
+ "immutable_object_cache_max_inflight_ops");
m_cache_size = 0;
}
SimplePolicy::~SimplePolicy() {
ldout(cct, 20) << dendl;
- for (auto it: m_cache_map) {
+ for (auto it : m_cache_map) {
Entry* entry = (it.second);
delete entry;
}
-
-
}
cache_status_t SimplePolicy::alloc_entry(std::string file_name) {
return OBJ_CACHE_SKIP;
}
- if ((m_cache_size < m_max_cache_size) && (inflight_ops < m_max_inflight_ops)) {
+ if ((m_cache_size < m_max_cache_size) &&
+ (inflight_ops < m_max_inflight_ops)) {
Entry* entry = new Entry();
ceph_assert(entry != nullptr);
m_cache_map[file_name] = entry;
wlocker.unlock();
update_status(file_name, OBJ_CACHE_SKIP);
- return OBJ_CACHE_NONE; // start promotion request
+ return OBJ_CACHE_NONE; // start promotion request
}
// if there's no free entry, return skip to read from rados
m_cache_size -= size;
return;
}
-
}
int SimplePolicy::evict_entry(std::string file_name) {
RWLock::WLocker locker(m_cache_map_lock);
// check free ratio, pop entries from LRU
if ((double)m_cache_size / m_max_cache_size > (1 - m_watermark)) {
- int evict_num = m_cache_map.size() * 0.1; //TODO(): make this configurable
+ // TODO(dehao): make this configurable
+ int evict_num = m_cache_map.size() * 0.1;
for (int i = 0; i < evict_num; i++) {
Entry* entry = reinterpret_cast<Entry*>(m_promoted_lru.lru_expire());
if (entry == nullptr) {
}
std::string file_name = entry->file_name;
obj_list->push_back(file_name);
-
}
}
}
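For intuition, a worked example with assumed numbers (none of these values come from the patch): with m_watermark = 0.1 and m_max_cache_size = 1 GiB, the check (double)m_cache_size / m_max_cache_size > (1 - m_watermark) fires once usage exceeds 0.9 GiB, and each eviction pass then expires roughly m_cache_map.size() * 0.1 entries from the LRU, e.g. about 100 entries when 1000 objects are cached.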
return entry->file_name;
}
-} // namespace immutable_obj_cache
-} // namespace ceph
+} // namespace immutable_obj_cache
+} // namespace ceph
namespace immutable_obj_cache {
class SimplePolicy : public Policy {
-public:
+ public:
SimplePolicy(CephContext *cct, uint64_t block_num, float watermark);
- ~SimplePolicy() ;
+ ~SimplePolicy();
cache_status_t lookup_object(std::string file_name);
cache_status_t get_status(std::string file_name);
void update_status(std::string file_name,
- cache_status_t new_status, uint64_t size=0);
+ cache_status_t new_status,
+ uint64_t size = 0);
int evict_entry(std::string file_name);
uint64_t get_promoted_entry_num();
std::string get_evict_entry();
-private:
+ private:
cache_status_t alloc_entry(std::string file_name);
class Entry : public LRUObject {
- public:
- cache_status_t status;
- Entry() : status(OBJ_CACHE_NONE){}
- std::string file_name;
- uint64_t size;
+ public:
+ cache_status_t status;
+ Entry() : status(OBJ_CACHE_NONE) {}
+ std::string file_name;
+ uint64_t size;
};
CephContext* cct;
std::unordered_map<std::string, Entry*> m_cache_map;
RWLock m_cache_map_lock;
-
std::deque<Entry*> m_free_list;
std::atomic<uint64_t> m_cache_size;
LRU m_promoted_lru;
};
-} // namespace immutable_obj_cache
-} // namespace ceph
-#endif // CEPH_CACHE_SIMPLE_POLICY_H
+} // namespace immutable_obj_cache
+} // namespace ceph
+#endif // CEPH_CACHE_SIMPLE_POLICY_H
typedef std::function<void(uint64_t, ObjectCacheRequest*)> ProcessMsg;
-} // namespace immutable_obj_cache
-} // namespace ceph
+} // namespace immutable_obj_cache
+} // namespace ceph
#endif
namespace ceph {
namespace immutable_obj_cache {
-ObjectCacheRequest::ObjectCacheRequest(){}
+ObjectCacheRequest::ObjectCacheRequest() {}
ObjectCacheRequest::ObjectCacheRequest(uint16_t t, uint64_t s)
: type(t), seq(s) {}
-ObjectCacheRequest::~ObjectCacheRequest(){}
+ObjectCacheRequest::~ObjectCacheRequest() {}
void ObjectCacheRequest::encode() {
ENCODE_START(1, 1, payload);
void ObjectCacheRegReplyData::decode_payload(bufferlist::const_iterator bl) {}
ObjectCacheReadData::ObjectCacheReadData(uint16_t t, uint64_t s,
- uint64_t read_offset, uint64_t read_len,
+ uint64_t read_offset,
+ uint64_t read_len,
uint64_t pool_id, uint64_t snap_id,
- std::string oid, std::string pool_namespace)
+ std::string oid,
+ std::string pool_namespace)
: ObjectCacheRequest(t, s), read_offset(read_offset),
read_len(read_len), pool_id(pool_id), snap_id(snap_id),
oid(oid), pool_namespace(pool_namespace)
ceph::decode(pool_namespace, i);
}
-ObjectCacheReadReplyData::ObjectCacheReadReplyData(uint16_t t, uint64_t s, string cache_path)
+ObjectCacheReadReplyData::ObjectCacheReadReplyData(uint16_t t, uint64_t s,
+ string cache_path)
: ObjectCacheRequest(t, s), cache_path(cache_path) {}
ObjectCacheReadReplyData::ObjectCacheReadReplyData(uint16_t t, uint64_t s)
: ObjectCacheRequest(t, s) {}
void ObjectCacheReadRadosData::decode_payload(bufferlist::const_iterator i) {}
-ObjectCacheRequest* decode_object_cache_request(bufferlist payload_buffer)
-{
+ObjectCacheRequest* decode_object_cache_request(bufferlist payload_buffer) {
ObjectCacheRequest* req = nullptr;
uint16_t type;
ceph::decode(seq, i);
DECODE_FINISH(i);
- switch(type) {
+ switch (type) {
case RBDSC_REGISTER: {
req = new ObjectCacheRegData(type, seq);
break;
return req;
}
-} // namespace immutable_obj_cache
-} // namespace ceph
+} // namespace immutable_obj_cache
+} // namespace ceph
HeaderHelper* header = (HeaderHelper*)buf;
return header->len;
}
-}
+} // namespace
class ObjectCacheRequest {
-public:
+ public:
uint16_t type;
uint64_t seq;
// encode consists of two steps
  // step 1: directly encode common bits using encode method of base class.
- // step 2 : according to payload_empty, determine whether addtional bits need to
- // be encoded which be implements by child class.
+ // step 2: according to payload_empty, determine whether additional bits
+ // need to be encoded; this is implemented by the child class.
void encode();
void decode(bufferlist& bl);
bufferlist get_payload_bufferlist() { return payload; }
};
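A minimal sketch of step 2 in the comment above, assuming the ObjectCacheReadData fields declared below and the shared payload bufferlist; the real implementation in the .cc file may differ:

  void ObjectCacheReadData::encode_payload() {
    // Append the request-specific fields to the payload bufferlist that
    // already carries the common bits encoded by the base class.
    ceph::encode(read_offset, payload);
    ceph::encode(read_len, payload);
    ceph::encode(pool_id, payload);
    ceph::encode(snap_id, payload);
    ceph::encode(oid, payload);
    ceph::encode(pool_namespace, payload);
  }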
class ObjectCacheRegData : public ObjectCacheRequest {
-public:
+ public:
ObjectCacheRegData();
ObjectCacheRegData(uint16_t t, uint64_t s);
~ObjectCacheRegData() override;
};
class ObjectCacheRegReplyData : public ObjectCacheRequest {
-public:
+ public:
ObjectCacheRegReplyData();
ObjectCacheRegReplyData(uint16_t t, uint64_t s);
~ObjectCacheRegReplyData() override;
};
class ObjectCacheReadData : public ObjectCacheRequest {
-public:
+ public:
uint64_t read_offset;
uint64_t read_len;
uint64_t pool_id;
uint64_t snap_id;
std::string oid;
std::string pool_namespace;
- ObjectCacheReadData(uint16_t t, uint64_t s, uint64_t read_offset, uint64_t read_len, uint64_t pool_id,
- uint64_t snap_id, std::string oid, std::string pool_namespace );
+ ObjectCacheReadData(uint16_t t, uint64_t s, uint64_t read_offset,
+ uint64_t read_len, uint64_t pool_id,
+ uint64_t snap_id, std::string oid,
+ std::string pool_namespace);
ObjectCacheReadData(uint16_t t, uint64_t s);
~ObjectCacheReadData() override;
void encode_payload() override;
};
class ObjectCacheReadReplyData : public ObjectCacheRequest {
-public:
+ public:
std::string cache_path;
ObjectCacheReadReplyData(uint16_t t, uint64_t s, std::string cache_path);
ObjectCacheReadReplyData(uint16_t t, uint64_t s);
};
class ObjectCacheReadRadosData : public ObjectCacheRequest {
-public:
+ public:
ObjectCacheReadRadosData();
ObjectCacheReadRadosData(uint16_t t, uint64_t s);
~ObjectCacheReadRadosData() override;
ObjectCacheRequest* decode_object_cache_request(bufferlist payload_buffer);
-} // namespace immutable_obj_cache
-} // namespace ceph
+} // namespace immutable_obj_cache
+} // namespace ceph
#endif
#include "include/rados/librados.hpp"
#include "include/Context.h"
-
namespace ceph {
namespace immutable_obj_cache {
namespace detail {
(obj->*MF)(r);
}
-} // namespace detail
+} // namespace detail
template <typename T, void(T::*MF)(int)=&T::complete>
librados::AioCompletion *create_rados_callback(T *obj) {
obj, &detail::rados_callback<T, MF>, nullptr);
}
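A brief usage sketch with a hypothetical caller type and names (nothing here is part of this change): the helper binds a member-function callback to a librados AioCompletion, which can be released right after the asynchronous read is queued.

  struct PromoteContext {
    void handle_read(int r) {
      // Inspect r and consume the bufferlist filled by the read.
    }
  };

  void example_async_read(librados::IoCtx* ioctx, const std::string& oid,
                          uint64_t read_len, librados::bufferlist* read_buf,
                          PromoteContext* ctx) {
    librados::AioCompletion* comp =
        create_rados_callback<PromoteContext, &PromoteContext::handle_read>(ctx);
    ioctx->aio_read(oid, comp, read_buf, read_len, 0);
    comp->release();  // the in-flight operation keeps its own reference
  }

Releasing the completion immediately after queueing the operation mirrors the pattern commonly used elsewhere in Ceph; the callback still fires when the read completes.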
-} // namespace immutable_obj_cache
-} // namespace ceph
+} // namespace immutable_obj_cache
+} // namespace ceph
#endif
std::cout << "usage: cache controller [options...]" << std::endl;
std::cout << "options:\n";
std::cout << " -m monaddress[:port] connect to specified monitor\n";
- std::cout << " --keyring=<path> path to keyring for local cluster\n";
+ std::cout << " --keyring=<path> path to keyring for local "
+ << "cluster\n";
std::cout << " --log-file=<logfile> file to log debug output\n";
- std::cout << " --debug-immutable-obj-cache=<log-level>/<memory-level> set debug level\n";
+ std::cout << " --debug-immutable-obj-cache=<log-level>/<memory-level> "
+ << "set debug level\n";
generic_server_usage();
}
-static void handle_signal(int signum)
-{
+static void handle_signal(int signum) {
if (cachectl)
cachectl->handle_signal(signum);
}
-int main(int argc, const char **argv)
-{
+int main(int argc, const char **argv) {
std::vector<const char*> args;
env_to_vec(args);
argv_to_vec(argc, argv, args);
}
auto cct = global_init(nullptr, args, CEPH_ENTITY_TYPE_CLIENT,
- CODE_ENVIRONMENT_DAEMON,
- CINIT_FLAG_UNPRIVILEGED_DAEMON_DEFAULTS);
+ CODE_ENVIRONMENT_DAEMON,
+ CINIT_FLAG_UNPRIVILEGED_DAEMON_DEFAULTS);
if (g_conf()->daemonize) {
global_init_daemonize(g_ceph_context);