]> git.apps.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
tools: adjust code style of RO
authorshangdehao1 <dehao.shang@intel.com>
Wed, 6 Mar 2019 22:24:16 +0000 (06:24 +0800)
committerYuan Zhou <yuan.zhou@intel.com>
Thu, 21 Mar 2019 16:16:30 +0000 (00:16 +0800)
Signed-off-by: Dehao Shang <dehao.shang@intel.com>
18 files changed:
src/tools/immutable_object_cache/CacheClient.cc
src/tools/immutable_object_cache/CacheClient.h
src/tools/immutable_object_cache/CacheController.cc
src/tools/immutable_object_cache/CacheController.h
src/tools/immutable_object_cache/CacheServer.cc
src/tools/immutable_object_cache/CacheServer.h
src/tools/immutable_object_cache/CacheSession.cc
src/tools/immutable_object_cache/CacheSession.h
src/tools/immutable_object_cache/ObjectCacheStore.cc
src/tools/immutable_object_cache/ObjectCacheStore.h
src/tools/immutable_object_cache/Policy.h
src/tools/immutable_object_cache/SimplePolicy.cc
src/tools/immutable_object_cache/SimplePolicy.h
src/tools/immutable_object_cache/SocketCommon.h
src/tools/immutable_object_cache/Types.cc
src/tools/immutable_object_cache/Types.h
src/tools/immutable_object_cache/Utils.h
src/tools/immutable_object_cache/main.cc

index ba226dd34cf1aab643d8fc535885027ce79412b4..58083bbb9ae797d370acf80c55c403f6c5c2edb7 100644 (file)
@@ -17,9 +17,8 @@ namespace immutable_obj_cache {
       m_dm_socket(m_io_service), m_ep(stream_protocol::endpoint(file)),
       m_io_thread(nullptr), m_session_work(false), m_writing(false),
       m_reading(false), m_sequence_id(0),
-      m_lock("ceph::cache::cacheclient::m_lock")
-  {
-    // TODO : configure it.
+      m_lock("ceph::cache::cacheclient::m_lock") {
+    // TODO(dehao) : configure it.
     m_use_dedicated_worker = true;
     m_worker_thread_num = 2;
     if (m_use_dedicated_worker) {
@@ -31,14 +30,13 @@ namespace immutable_obj_cache {
       }
     }
     m_bp_header = buffer::create(get_header_size());
-
   }
 
   CacheClient::~CacheClient() {
     stop();
   }
 
-  void CacheClient::run(){
+  void CacheClient::run() {
      m_io_thread.reset(new std::thread([this](){m_io_service.run(); }));
   }
 
@@ -85,11 +83,12 @@ namespace immutable_obj_cache {
     return 0;
   }
 
-  void CacheClient::lookup_object(std::string pool_nspace, uint64_t pool_id, uint64_t snap_id,
-                                  std::string oid, GenContext<ObjectCacheRequest*>* on_finish) {
-
-    ObjectCacheRequest* req = new ObjectCacheReadData(RBDSC_READ, ++m_sequence_id, 0, 0,
-                                                      pool_id, snap_id, oid, pool_nspace);
+  void CacheClient::lookup_object(std::string pool_nspace, uint64_t pool_id,
+                                  uint64_t snap_id, std::string oid,
+                                  GenContext<ObjectCacheRequest*>* on_finish) {
+    ObjectCacheRequest* req = new ObjectCacheReadData(RBDSC_READ,
+                                    ++m_sequence_id, 0, 0,
+                                    pool_id, snap_id, oid, pool_nspace);
     req->process_msg = on_finish;
     req->encode();
 
@@ -160,7 +159,6 @@ namespace immutable_obj_cache {
   }
 
   void CacheClient::read_reply_header() {
-
     /* create new head buffer for every reply */
     bufferptr bp_head(buffer::create(get_header_size()));
     auto raw_ptr = bp_head.c_str();
@@ -175,8 +173,8 @@ namespace immutable_obj_cache {
   }
 
   void CacheClient::handle_reply_header(bufferptr bp_head,
-                                        const boost::system::error_code& ec,
-                                        size_t bytes_transferred) {
+         const boost::system::error_code& ec,
+         size_t bytes_transferred) {
     if (ec || bytes_transferred != get_header_size()) {
       fault(ASIO_ERROR_READ, ec);
       return;
@@ -190,9 +188,9 @@ namespace immutable_obj_cache {
     read_reply_data(std::move(bp_head), std::move(bp_data), data_len);
   }
 
-  void CacheClient::read_reply_data(bufferptr&& bp_head, bufferptr&& bp_data,
+  void CacheClient::read_reply_data(bufferptr&& bp_head,
+                                    bufferptr&& bp_data,
                                     const uint64_t data_len) {
-
     auto raw_ptr = bp_data.c_str();
     boost::asio::async_read(m_dm_socket, boost::asio::buffer(raw_ptr, data_len),
       boost::asio::transfer_exactly(data_len),
@@ -200,10 +198,10 @@ namespace immutable_obj_cache {
                   this, std::move(bp_head), std::move(bp_data), data_len,
                   boost::asio::placeholders::error,
                   boost::asio::placeholders::bytes_transferred));
-
   }
 
-  void CacheClient::handle_reply_data(bufferptr bp_head, bufferptr bp_data,
+  void CacheClient::handle_reply_data(bufferptr bp_head,
+                                      bufferptr bp_data,
                                       const uint64_t data_len,
                                       const boost::system::error_code& ec,
                                       size_t bytes_transferred) {
@@ -233,7 +231,6 @@ namespace immutable_obj_cache {
     if (is_session_work()) {
       receive_message();
     }
-
   }
 
   void CacheClient::process(ObjectCacheRequest* reply, uint64_t seq_id) {
@@ -266,7 +263,8 @@ namespace immutable_obj_cache {
   }
 
   // if there is one request fails, just execute fault, then shutdown RO.
-  void CacheClient::fault(const int err_type, const boost::system::error_code& ec) {
+  void CacheClient::fault(const int err_type,
+                          const boost::system::error_code& ec) {
     ldout(cct, 20) << "fault." << ec.message() << dendl;
 
     if (err_type == ASIO_ERROR_CONNECT) {
@@ -276,7 +274,8 @@ namespace immutable_obj_cache {
                         << ". Immutable-object-cache daemon is down ? "
                         << "Data will be read from ceph cluster " << dendl;
        } else {
-         ldout(cct, 20) << "Connecting RO daemon fails : " << ec.message() << dendl;
+         ldout(cct, 20) << "Connecting RO daemon fails : "
+                        << ec.message() << dendl;
        }
 
        if (m_dm_socket.is_open()) {
@@ -296,11 +295,11 @@ namespace immutable_obj_cache {
       return;
     }
 
-    // when current session don't work, ASIO will don't receive any new request from hook.
-    // On the other hand, for pending request of ASIO, cancle these request, then call their callback.
-    // these request which are cancled by this method, will be re-dispatched to RADOS layer.
-    //
-    // make sure just have one thread to modify execute below code.
+    /* when current session don't work, ASIO will don't receive any new request from hook.
+     * On the other hand, for pending request of ASIO, cancle these request,
+     * then call their callback. these request which are cancled by this method,
+     * will be re-dispatched to RADOS layer.
+     * make sure just have one thread to modify execute below code. */
     m_session_work.store(false);
 
     if (err_type == ASIO_ERROR_MSG_INCOMPLETE) {
@@ -321,7 +320,8 @@ namespace immutable_obj_cache {
     // currently, for any asio error, just shutdown RO.
     close();
 
-    // all pending request, which have entered into ASIO, will be re-dispatched to RADOS.
+    /* all pending request, which have entered into ASIO,
+     * will be re-dispatched to RADOS.*/
     {
       Mutex::Locker locker(m_lock);
       for (auto it : m_seq_to_req) {
@@ -337,7 +337,8 @@ namespace immutable_obj_cache {
   }
 
   int CacheClient::register_client(Context* on_finish) {
-    ObjectCacheRequest* reg_req = new ObjectCacheRegData(RBDSC_REGISTER, m_sequence_id++);
+    ObjectCacheRequest* reg_req = new ObjectCacheRegData(RBDSC_REGISTER,
+                                                         m_sequence_id++);
     reg_req->encode();
 
     bufferlist bl;
@@ -365,7 +366,8 @@ namespace immutable_obj_cache {
     uint64_t data_len = get_data_len(m_bp_header.c_str());
     bufferptr bp_data(buffer::create(data_len));
 
-    ret = boost::asio::read(m_dm_socket, boost::asio::buffer(bp_data.c_str(), data_len), ec);
+    ret = boost::asio::read(m_dm_socket, boost::asio::buffer(bp_data.c_str(),
+                            data_len), ec);
     if (ec || ret != data_len) {
       fault(ASIO_ERROR_READ, ec);
       return -1;
@@ -387,5 +389,5 @@ namespace immutable_obj_cache {
     return 0;
   }
 
-} // namespace immutable_obj_cache
-} // namespace ceph
+}  // namespace immutable_obj_cache
+}  // namespace ceph
index 8917062286c18a771a3f450782a442f8a84f770d..47a39cb1432191b17487a3ff470653999f6136cc 100644 (file)
@@ -9,6 +9,7 @@
 #include <boost/bind.hpp>
 #include <boost/asio/error.hpp>
 #include <boost/algorithm/string.hpp>
+
 #include "include/ceph_assert.h"
 #include "include/Context.h"
 #include "common/Mutex.h"
@@ -22,8 +23,7 @@ namespace ceph {
 namespace immutable_obj_cache {
 
 class CacheClient {
-public:
-
+ public:
   CacheClient(const std::string& file, CephContext* ceph_ctx);
   ~CacheClient();
   void run();
@@ -31,12 +31,12 @@ public:
   void close();
   int stop();
   int connect();
-  void lookup_object(std::string pool_nspace, uint64_t pool_id, uint64_t snap_id,
-                     std::string oid, GenContext<ObjectCacheRequest*>* on_finish);
+  void lookup_object(std::string pool_nspace, uint64_t pool_id,
+                     uint64_t snap_id, std::string oid,
+                     GenContext<ObjectCacheRequest*>* on_finish);
   int register_client(Context* on_finish);
 
-private:
-
+ private:
   void send_message();
   void try_send();
   void fault(const int err_type, const boost::system::error_code& err);
@@ -45,14 +45,16 @@ private:
   void process(ObjectCacheRequest* reply, uint64_t seq_id);
   void read_reply_header();
   void handle_reply_header(bufferptr bp_head,
-                           const boost::system::error_code& ec, size_t bytes_transferred);
+                           const boost::system::error_code& ec,
+                           size_t bytes_transferred);
   void read_reply_data(bufferptr&& bp_head, bufferptr&& bp_data,
                        const uint64_t data_len);
   void handle_reply_data(bufferptr bp_head, bufferptr bp_data,
                         const uint64_t data_len,
-                        const boost::system::error_code& ec, size_t bytes_transferred);
-private:
+                        const boost::system::error_code& ec,
+                        size_t bytes_transferred);
 
+ private:
   CephContext* cct;
   boost::asio::io_service m_io_service;
   boost::asio::io_service::work m_io_service_work;
@@ -76,6 +78,6 @@ private:
   bufferptr m_bp_header;
 };
 
-} // namespace immutable_obj_cache
-} // namespace ceph
+}  // namespace immutable_obj_cache
+}  // namespace ceph
 #endif
index 7648a3a6d05d2e1eb0c42c4a045a2a3d35d07909..4fc2dce910ce05868806e665419e76e13f99aaf8 100644 (file)
@@ -12,7 +12,8 @@
 namespace ceph {
 namespace immutable_obj_cache {
 
-CacheController::CacheController(CephContext *cct, const std::vector<const char*> &args):
+CacheController::CacheController(CephContext *cct,
+                                 const std::vector<const char*> &args):
   m_args(args), m_cct(cct) {
   ldout(m_cct, 20) << dendl;
 }
@@ -26,7 +27,7 @@ int CacheController::init() {
   ldout(m_cct, 20) << dendl;
 
   m_object_cache_store = new ObjectCacheStore(m_cct);
-  //TODO(): make this configurable
+  // TODO(dehao): make this configurable
   int r = m_object_cache_store->init(true);
   if (r < 0) {
     lderr(m_cct) << "init error\n" << dendl;
@@ -65,7 +66,8 @@ void CacheController::handle_signal(int signum) {
 
 void CacheController::run() {
   try {
-    std::string controller_path = m_cct->_conf.get_val<std::string>("immutable_object_cache_sock");
+    std::string controller_path =
+      m_cct->_conf.get_val<std::string>("immutable_object_cache_sock");
     std::remove(controller_path.c_str());
 
     m_cache_server = new CacheServer(m_cct, controller_path,
@@ -80,14 +82,16 @@ void CacheController::run() {
   }
 }
 
-void CacheController::handle_request(uint64_t session_id, ObjectCacheRequest* req){
+void CacheController::handle_request(uint64_t session_id,
+                                     ObjectCacheRequest* req) {
   ldout(m_cct, 20) << dendl;
 
   switch (req->get_request_type()) {
     case RBDSC_REGISTER: {
-      // TODO(): skip register and allow clients to lookup directly
+      // TODO(dehao): skip register and allow clients to lookup directly
 
-      ObjectCacheRequest* reply = new ObjectCacheRegReplyData(RBDSC_REGISTER_REPLY, req->seq);
+      ObjectCacheRequest* reply = new ObjectCacheRegReplyData(
+        RBDSC_REGISTER_REPLY, req->seq);
       m_cache_server->send(session_id, reply);
       break;
     }
@@ -95,16 +99,15 @@ void CacheController::handle_request(uint64_t session_id, ObjectCacheRequest* re
       // lookup object in local cache store
       std::string cache_path;
       ObjectCacheReadData* req_read_data = (ObjectCacheReadData*)req;
-      int ret = m_object_cache_store->lookup_object(req_read_data->pool_namespace,
-                                                    req_read_data->pool_id,
-                                                    req_read_data->snap_id,
-                                                    req_read_data->oid,
-                                                    cache_path);
+      int ret = m_object_cache_store->lookup_object(
+        req_read_data->pool_namespace, req_read_data->pool_id,
+        req_read_data->snap_id, req_read_data->oid, cache_path);
       ObjectCacheRequest* reply = nullptr;
       if (ret != OBJ_CACHE_PROMOTED) {
         reply = new ObjectCacheReadRadosData(RBDSC_READ_RADOS, req->seq);
       } else {
-        reply = new ObjectCacheReadReplyData(RBDSC_READ_REPLY, req->seq, cache_path);
+        reply = new ObjectCacheReadReplyData(RBDSC_READ_REPLY,
+                                             req->seq, cache_path);
       }
       m_cache_server->send(session_id, reply);
       break;
@@ -115,5 +118,5 @@ void CacheController::handle_request(uint64_t session_id, ObjectCacheRequest* re
   }
 }
 
-} // namespace immutable_obj_cache
-} // namespace ceph
+}  // namespace immutable_obj_cache
+}  // namespace ceph
index 79b3b9525de1b8e4fd0a1b05c5a1ccd63ab6f328..ef8495f767ef34a0f21adfb2006b968a419b37d3 100644 (file)
@@ -34,7 +34,7 @@ class CacheController {
   ObjectCacheStore *m_object_cache_store;
 };
 
-} // namespace immutable_obj_cache
-} // namespace ceph
+}  // namespace immutable_obj_cache
+}  // namespace ceph
 
 #endif
index e10db19c44a0e307fcd9f9ef77e4f84f6a3d1805..0df58d55f50afde8bf738db1373476e0bc3b3816 100644 (file)
@@ -76,7 +76,8 @@ int CacheServer::start_accept() {
 void CacheServer::accept() {
   CacheSessionPtr new_session = nullptr;
 
-  new_session.reset(new CacheSession(m_session_id, m_io_service, m_server_process_msg, cct));
+  new_session.reset(new CacheSession(m_session_id, m_io_service,
+                    m_server_process_msg, cct));
 
   m_acceptor.async_accept(new_session->socket(),
       boost::bind(&CacheServer::handle_accept, this, new_session,
@@ -93,7 +94,7 @@ void CacheServer::handle_accept(CacheSessionPtr new_session,
   }
 
   m_session_map.emplace(m_session_id, new_session);
-  // TODO : session setting
+  // TODO(dehao) : session setting
   new_session->start();
   m_session_id++;
 
@@ -113,5 +114,5 @@ void CacheServer::send(uint64_t session_id, ObjectCacheRequest* msg) {
   }
 }
 
-} // namespace immutable_obj_cache
-} // namespace ceph
+}  // namespace immutable_obj_cache
+}  // namespace ceph
index 4646a15952b6fbf100d4da35bd899eb2a1982f3f..84cdd89e5e1f0f9644635997fd19ec0943d08c08 100644 (file)
@@ -29,7 +29,8 @@ class CacheServer {
 
  private:
   void accept();
-  void handle_accept(CacheSessionPtr new_session, const boost::system::error_code& error);
+  void handle_accept(CacheSessionPtr new_session,
+                     const boost::system::error_code& error);
 
  private:
   CephContext* cct;
@@ -38,11 +39,11 @@ class CacheServer {
   stream_protocol::endpoint m_local_path;
   stream_protocol::acceptor m_acceptor;
   uint64_t m_session_id = 1;
-  // TODO : need to lock it.
+  // TODO(dehao) : need to lock it.
   std::map<uint64_t, CacheSessionPtr> m_session_map;
 };
 
-} // namespace immutable_obj_cache
-} // namespace ceph
+}  // namespace immutable_obj_cache
+}  // namespace ceph
 
 #endif
index e4f76a7c806bbbdcbe009f8956eb59f5d72c0993..6069f811259821286cb6de4c699cbef60b82052c 100644 (file)
@@ -47,12 +47,11 @@ void CacheSession::start() {
 void CacheSession::read_request_header() {
   ldout(cct, 20) << dendl;
   boost::asio::async_read(m_dm_socket,
-                          boost::asio::buffer(m_bp_header.c_str(), get_header_size()),
-                          boost::asio::transfer_exactly(get_header_size()),
-                          boost::bind(&CacheSession::handle_request_header,
-                                      shared_from_this(),
-                                      boost::asio::placeholders::error,
-                                      boost::asio::placeholders::bytes_transferred));
+    boost::asio::buffer(m_bp_header.c_str(), get_header_size()),
+    boost::asio::transfer_exactly(get_header_size()),
+    boost::bind(&CacheSession::handle_request_header,
+     shared_from_this(), boost::asio::placeholders::error,
+     boost::asio::placeholders::bytes_transferred));
 }
 
 void CacheSession::handle_request_header(const boost::system::error_code& err,
@@ -70,12 +69,12 @@ void CacheSession::read_request_data(uint64_t data_len) {
   ldout(cct, 20) << dendl;
   bufferptr bp_data(buffer::create(data_len));
   boost::asio::async_read(m_dm_socket,
-                          boost::asio::buffer(bp_data.c_str(), bp_data.length()),
-                          boost::asio::transfer_exactly(data_len),
-                          boost::bind(&CacheSession::handle_request_data,
-                                      shared_from_this(), bp_data, data_len,
-                                      boost::asio::placeholders::error,
-                                      boost::asio::placeholders::bytes_transferred));
+    boost::asio::buffer(bp_data.c_str(), bp_data.length()),
+    boost::asio::transfer_exactly(data_len),
+    boost::bind(&CacheSession::handle_request_data,
+      shared_from_this(), bp_data, data_len,
+      boost::asio::placeholders::error,
+      boost::asio::placeholders::bytes_transferred));
 }
 
 void CacheSession::handle_request_data(bufferptr bp, uint64_t data_len,
@@ -112,7 +111,8 @@ void CacheSession::send(ObjectCacheRequest* reply) {
   boost::asio::async_write(m_dm_socket,
         boost::asio::buffer(bl.c_str(), bl.length()),
         boost::asio::transfer_exactly(bl.length()),
-        [this, bl, reply](const boost::system::error_code& err, size_t bytes_transferred) {
+        [this, bl, reply](const boost::system::error_code& err,
+          size_t bytes_transferred) {
           if (err || bytes_transferred != bl.length()) {
             fault();
             return;
@@ -123,8 +123,8 @@ void CacheSession::send(ObjectCacheRequest* reply) {
 
 void CacheSession::fault() {
   ldout(cct, 20) << dendl;
-  // TODO
+  // TODO(dehao)
 }
 
-} // namespace immutable_obj_cache
-} // namespace ceph
+}  // namespace immutable_obj_cache
+}  // namespace ceph
index 5f5cd1be9bd4d14b68351d78547d4216d7377a70..36bd5dbaad9f9120300bd9071b99ad6a67f667c0 100644 (file)
@@ -18,22 +18,25 @@ namespace ceph {
 namespace immutable_obj_cache {
 
 class CacheSession : public std::enable_shared_from_this<CacheSession> {
-public:
-  CacheSession(uint64_t session_id, io_service& io_service, ProcessMsg process_msg, CephContext* ctx);
+ public:
+  CacheSession(uint64_t session_id, io_service& io_service,
+               ProcessMsg process_msg, CephContext* ctx);
   ~CacheSession();
   stream_protocol::socket& socket();
   void close();
   void start();
   void read_request_header();
-  void handle_request_header(const boost::system::error_code& err, size_t bytes_transferred);
+  void handle_request_header(const boost::system::error_code& err,
+                             size_t bytes_transferred);
   void read_request_data(uint64_t data_len);
   void handle_request_data(bufferptr bp, uint64_t data_len,
-                          const boost::system::error_code& err, size_t bytes_transferred);
+                          const boost::system::error_code& err,
+                          size_t bytes_transferred);
   void process(ObjectCacheRequest* req);
   void fault();
   void send(ObjectCacheRequest* msg);
 
-private:
+ private:
   uint64_t m_session_id;
   stream_protocol::socket m_dm_socket;
   ProcessMsg m_server_process_msg;
@@ -44,7 +47,7 @@ private:
 
 typedef std::shared_ptr<CacheSession> CacheSessionPtr;
 
-} // namespace immutable_obj_cache
-} // namespace ceph
+}  // namespace immutable_obj_cache
+}  // namespace ceph
 
 #endif
index cafe4a731e8a8a3afa7b8a654dcee12e3f4fdd57..f0b6eb1a5ab90d22e47960c296438d0c8e759c13 100644 (file)
@@ -19,7 +19,6 @@ namespace immutable_obj_cache {
 ObjectCacheStore::ObjectCacheStore(CephContext *cct)
       : m_cct(cct), m_rados(new librados::Rados()),
         m_ioctx_map_lock("ceph::cache::ObjectCacheStore::m_ioctx_map_lock") {
-
   object_cache_max_size =
     m_cct->_conf.get_val<Option::size_t>("immutable_object_cache_max_size");
 
@@ -30,7 +29,7 @@ ObjectCacheStore::ObjectCacheStore(CephContext *cct)
     m_cache_root_dir += "/";
   }
 
-  //TODO(): allow to set cache level
+  // TODO(dehao): allow to set cache level
   m_policy = new SimplePolicy(m_cct, object_cache_max_size, 0.1);
 }
 
@@ -48,22 +47,23 @@ int ObjectCacheStore::init(bool reset) {
   }
 
   ret = m_rados->connect();
-  if (ret < 0 ) {
+  if (ret < 0) {
     lderr(m_cct) << "fail to connect to cluster" << dendl;
     return ret;
   }
 
-  //TODO(): fsck and reuse existing cache objects
+  // TODO(dehao): fsck and reuse existing cache objects
   if (reset) {
     std::error_code ec;
     if (efs::exists(m_cache_root_dir)) {
        int dir = m_dir_num - 1;
        while (dir >= 0) {
-         if (!efs::remove_all(m_cache_root_dir + "/" + std::to_string(dir), ec)) {
+         if (!efs::remove_all(
+           m_cache_root_dir + "/" + std::to_string(dir), ec)) {
            lderr(m_cct) << "fail to remove old cache store: " << ec << dendl;
-          return ec.value();
+           return ec.value();
          }
-         dir --;
+         dir--;
        }
     } else {
       if (!efs::create_directories(m_cache_root_dir, ec)) {
@@ -89,7 +89,7 @@ int ObjectCacheStore::init_cache() {
   int dir = m_dir_num - 1;
   while (dir >= 0) {
     efs::create_directories(cache_dir + "/" + std::to_string(dir));
-    dir --;
+    dir--;
   }
   return 0;
 }
@@ -126,10 +126,10 @@ int ObjectCacheStore::do_promote(std::string pool_nspace,
   librados::bufferlist* read_buf = new librados::bufferlist();
 
   auto ctx = new FunctionContext([this, read_buf, cache_file_name](int ret) {
-      handle_promote_callback(ret, read_buf, cache_file_name);
-   });
+    handle_promote_callback(ret, read_buf, cache_file_name);
+  });
 
-   return promote_object(&ioctx, object_name, read_buf, ctx);
+  return promote_object(&ioctx, object_name, read_buf, ctx);
 }
 
 int ObjectCacheStore::handle_promote_callback(int ret, bufferlist* read_buf,
@@ -186,7 +186,7 @@ int ObjectCacheStore::lookup_object(std::string pool_nspace,
 
   cache_status_t ret = m_policy->lookup_object(cache_file_name);
 
-  switch(ret) {
+  switch (ret) {
     case OBJ_CACHE_NONE: {
       pret = do_promote(pool_nspace, pool_id, snap_id, object_name);
       if (pret < 0) {
@@ -227,7 +227,7 @@ int ObjectCacheStore::evict_objects() {
 
   std::list<std::string> obj_list;
   m_policy->get_evict_list(&obj_list);
-  for (auto& obj: obj_list) {
+  for (auto& obj : obj_list) {
     do_evict(obj);
   }
 }
@@ -243,9 +243,9 @@ int ObjectCacheStore::do_evict(std::string cache_file) {
 
   ldout(m_cct, 20) << "evict cache: " << cache_file_path << dendl;
 
-  // TODO(): possible race on read?
+  // TODO(dehao): possible race on read?
   int ret = std::remove(cache_file_path.c_str());
-   // evict metadata
+  // evict metadata
   if (ret == 0) {
     m_policy->update_status(cache_file, OBJ_CACHE_SKIP);
     m_policy->evict_entry(cache_file);
@@ -263,16 +263,16 @@ std::string ObjectCacheStore::get_cache_file_name(std::string pool_nspace,
 }
 
 std::string ObjectCacheStore::get_cache_file_path(std::string cache_file_name) {
-
   std::string cache_file_dir = "";
   if (m_dir_num > 0) {
     uint32_t crc = 0;
-    crc = ceph_crc32c(0, (unsigned char *)cache_file_name.c_str(), cache_file_name.length());
+    crc = ceph_crc32c(0, (unsigned char *)cache_file_name.c_str(),
+                     cache_file_name.length());
     cache_file_dir = std::to_string(crc % m_dir_num);
   }
 
   return m_cache_root_dir + cache_file_dir + "/" + cache_file_name;
 }
 
-} // namespace immutable_obj_cache
-} // namespace ceph
+}  // namespace immutable_obj_cache
+}  // namespace ceph
index 935e2970babca864a15807ea73a480820309c2ac..fa86aca4032cc823a2c6d48629fb4fab2c4c0802 100644 (file)
@@ -21,43 +21,42 @@ namespace immutable_obj_cache {
 typedef shared_ptr<librados::Rados> RadosRef;
 typedef shared_ptr<librados::IoCtx> IoCtxRef;
 
-class ObjectCacheStore
-{
-  public:
-    ObjectCacheStore(CephContext *cct);
-    ~ObjectCacheStore();
-    int init(bool reset);
-    int shutdown();
-    int init_cache();
-    int lookup_object(std::string pool_nspace,
-                      uint64_t pool_id, uint64_t snap_id,
-                      std::string object_name,
-                      std::string& target_cache_file_path);
-
-  private:
-    std::string get_cache_file_name(std::string pool_nspace, uint64_t pool_id,
-                                         uint64_t snap_id, std::string oid);
-    std::string get_cache_file_path(std::string cache_file_name);
-    int evict_objects();
-    int do_promote(std::string pool_nspace, uint64_t pool_id,
-                    uint64_t snap_id, std::string object_name);
-    int promote_object(librados::IoCtx*, std::string object_name,
-                       librados::bufferlist* read_buf,
-                       Context* on_finish);
-   int handle_promote_callback(int, bufferlist*, std::string);
-   int do_evict(std::string cache_file);
-
-    CephContext *m_cct;
-    RadosRef m_rados;
-    std::map<uint64_t, librados::IoCtx> m_ioctx_map;
-    Mutex m_ioctx_map_lock;
-    Policy* m_policy;
-    //TODO(): make this configurable
-    int m_dir_num = 10;
-    uint64_t object_cache_max_size;
-    std::string m_cache_root_dir;
+class ObjectCacheStore {
+ public:
+  ObjectCacheStore(CephContext *cct);
+  ~ObjectCacheStore();
+  int init(bool reset);
+  int shutdown();
+  int init_cache();
+  int lookup_object(std::string pool_nspace,
+                    uint64_t pool_id, uint64_t snap_id,
+                    std::string object_name,
+                    std::string& target_cache_file_path);
+
+ private:
+  std::string get_cache_file_name(std::string pool_nspace, uint64_t pool_id,
+                                  uint64_t snap_id, std::string oid);
+  std::string get_cache_file_path(std::string cache_file_name);
+  int evict_objects();
+  int do_promote(std::string pool_nspace, uint64_t pool_id,
+                 uint64_t snap_id, std::string object_name);
+  int promote_object(librados::IoCtx*, std::string object_name,
+                     librados::bufferlist* read_buf,
+                     Context* on_finish);
+  int handle_promote_callback(int, bufferlist*, std::string);
+  int do_evict(std::string cache_file);
+
+  CephContext *m_cct;
+  RadosRef m_rados;
+  std::map<uint64_t, librados::IoCtx> m_ioctx_map;
+  Mutex m_ioctx_map_lock;
+  Policy* m_policy;
+  // TODO(dehao): make this configurable
+  int m_dir_num = 10;
+  uint64_t object_cache_max_size;
+  std::string m_cache_root_dir;
 };
 
-} // namespace ceph
-} // namespace immutable_obj_cache
+}  // namespace ceph
+}  // namespace immutable_obj_cache
 #endif
index cda8e69c917d67517c491b9c446bd60e0a82bea4..3e3a0ef8787087c73e2079f557672670520b5972 100644 (file)
@@ -16,18 +16,18 @@ typedef enum {
   OBJ_CACHE_SKIP,
 } cache_status_t;
 
-
 class Policy {
-public:
-  Policy(){}
-  virtual ~Policy(){};
+ public:
+  Policy() {}
+  virtual ~Policy() {}
   virtual cache_status_t lookup_object(std::string) = 0;
   virtual int evict_entry(std::string) = 0;
-  virtual void update_status(std::string, cache_status_t, uint64_t size=0) = 0;
+  virtual void update_status(std::string, cache_status_t,
+                             uint64_t size = 0) = 0;
   virtual cache_status_t get_status(std::string) = 0;
   virtual void get_evict_list(std::list<std::string>* obj_list) = 0;
 };
 
-} // namespace immutable_obj_cache
-} // namespace ceph
+}  // namespace immutable_obj_cache
+}  // namespace ceph
 #endif
index 8fe70b4e00d5ff1ab8bf2a98f078821936357e1d..bb76f95c32193a893ee9e2db8984c3ea00c56cf3 100644 (file)
@@ -18,19 +18,18 @@ SimplePolicy::SimplePolicy(CephContext *cct, uint64_t cache_size,
   : cct(cct), m_watermark(watermark), m_max_cache_size(cache_size),
     m_cache_map_lock("rbd::cache::SimplePolicy::m_cache_map_lock") {
   ldout(cct, 20) << dendl;
-  m_max_inflight_ops = cct->_conf.get_val<uint64_t>("immutable_object_cache_max_inflight_ops");
+  m_max_inflight_ops = cct->_conf.get_val<uint64_t>(
+    "immutable_object_cache_max_inflight_ops");
   m_cache_size = 0;
 }
 
 SimplePolicy::~SimplePolicy() {
   ldout(cct, 20) << dendl;
 
-  for (auto it: m_cache_map) {
+  for (auto it : m_cache_map) {
     Entry* entry = (it.second);
     delete entry;
   }
-
-
 }
 
 cache_status_t SimplePolicy::alloc_entry(std::string file_name) {
@@ -44,13 +43,14 @@ cache_status_t SimplePolicy::alloc_entry(std::string file_name) {
     return OBJ_CACHE_SKIP;
   }
 
-  if ((m_cache_size < m_max_cache_size) && (inflight_ops < m_max_inflight_ops)) {
+  if ((m_cache_size < m_max_cache_size) &&
+      (inflight_ops < m_max_inflight_ops)) {
     Entry* entry = new Entry();
     ceph_assert(entry != nullptr);
     m_cache_map[file_name] = entry;
     wlocker.unlock();
     update_status(file_name, OBJ_CACHE_SKIP);
-    return OBJ_CACHE_NONE; // start promotion request
+    return OBJ_CACHE_NONE;  // start promotion request
   }
 
   // if there's no free entry, return skip to read from rados
@@ -136,7 +136,6 @@ void SimplePolicy::update_status(std::string file_name,
     m_cache_size -= size;
     return;
   }
-
 }
 
 int SimplePolicy::evict_entry(std::string file_name) {
@@ -165,7 +164,8 @@ void SimplePolicy::get_evict_list(std::list<std::string>* obj_list) {
   RWLock::WLocker locker(m_cache_map_lock);
   // check free ratio, pop entries from LRU
   if ((double)m_cache_size / m_max_cache_size > (1 - m_watermark)) {
-    int evict_num = m_cache_map.size() * 0.1; //TODO(): make this configurable
+    // TODO(dehao): make this configurable
+    int evict_num = m_cache_map.size() * 0.1;
     for (int i = 0; i < evict_num; i++) {
       Entry* entry = reinterpret_cast<Entry*>(m_promoted_lru.lru_expire());
       if (entry == nullptr) {
@@ -173,7 +173,6 @@ void SimplePolicy::get_evict_list(std::list<std::string>* obj_list) {
       }
       std::string file_name = entry->file_name;
       obj_list->push_back(file_name);
-
     }
   }
 }
@@ -206,5 +205,5 @@ std::string SimplePolicy::get_evict_entry() {
   return entry->file_name;
 }
 
-} // namespace immutable_obj_cache
-} // namespace ceph
+}  // namespace immutable_obj_cache
+}  // namespace ceph
index 7d97bdc9a15998436d8e092f6c795d5ab4efacf4..9ff00b26a520258d9de27878ed06b4e7a4d91871 100644 (file)
@@ -17,15 +17,16 @@ namespace ceph {
 namespace immutable_obj_cache {
 
 class SimplePolicy : public Policy {
-public:
+ public:
   SimplePolicy(CephContext *cct, uint64_t block_num, float watermark);
-  ~SimplePolicy() ;
+  ~SimplePolicy();
 
   cache_status_t lookup_object(std::string file_name);
   cache_status_t get_status(std::string file_name);
 
   void update_status(std::string file_name,
-                     cache_status_t new_status, uint64_t size=0);
+                     cache_status_t new_status,
+                     uint64_t size = 0);
 
   int evict_entry(std::string file_name);
 
@@ -36,15 +37,15 @@ public:
   uint64_t get_promoted_entry_num();
   std::string get_evict_entry();
 
-private:
+ private:
   cache_status_t alloc_entry(std::string file_name);
 
   class Entry : public LRUObject {
-    public:
-      cache_status_t status;
-      Entry() : status(OBJ_CACHE_NONE){}
-      std::string file_name;
-      uint64_t size;
+   public:
+    cache_status_t status;
+    Entry() : status(OBJ_CACHE_NONE) {}
+    std::string file_name;
+    uint64_t size;
   };
 
   CephContext* cct;
@@ -57,7 +58,6 @@ private:
   std::unordered_map<std::string, Entry*> m_cache_map;
   RWLock m_cache_map_lock;
 
-
   std::deque<Entry*> m_free_list;
 
   std::atomic<uint64_t> m_cache_size;
@@ -65,6 +65,6 @@ private:
   LRU m_promoted_lru;
 };
 
-} // namespace immutable_obj_cache
-} // namespace ceph
-#endif // CEPH_CACHE_SIMPLE_POLICY_H
+}  // namespace immutable_obj_cache
+}  // namespace ceph
+#endif  // CEPH_CACHE_SIMPLE_POLICY_H
index db78e42791232cd4c0b78b216bc7ae1bb2b2a477..9cd108ca06d213099834bc73bc46f06c87fa77da 100644 (file)
@@ -23,6 +23,6 @@ class ObjectCacheRequest;
 
 typedef std::function<void(uint64_t, ObjectCacheRequest*)> ProcessMsg;
 
-} // namespace immutable_obj_cache
-} // namespace ceph
+}  // namespace immutable_obj_cache
+}  // namespace ceph
 #endif
index beb63c3e4ef68375c223cbcd5072247e26f3519d..a0a4c6352aa5382cf729f34d46948cbd7aad0419 100644 (file)
 namespace ceph {
 namespace immutable_obj_cache {
 
-ObjectCacheRequest::ObjectCacheRequest(){}
+ObjectCacheRequest::ObjectCacheRequest() {}
 ObjectCacheRequest::ObjectCacheRequest(uint16_t t, uint64_t s)
   : type(t), seq(s) {}
-ObjectCacheRequest::~ObjectCacheRequest(){}
+ObjectCacheRequest::~ObjectCacheRequest() {}
 
 void ObjectCacheRequest::encode() {
   ENCODE_START(1, 1, payload);
@@ -58,9 +58,11 @@ void ObjectCacheRegReplyData::encode_payload() {}
 void ObjectCacheRegReplyData::decode_payload(bufferlist::const_iterator bl) {}
 
 ObjectCacheReadData::ObjectCacheReadData(uint16_t t, uint64_t s,
-                                         uint64_t read_offset, uint64_t read_len,
+                                         uint64_t read_offset,
+                                         uint64_t read_len,
                                          uint64_t pool_id, uint64_t snap_id,
-                                         std::string oid, std::string pool_namespace)
+                                         std::string oid,
+                                         std::string pool_namespace)
   : ObjectCacheRequest(t, s), read_offset(read_offset),
     read_len(read_len), pool_id(pool_id), snap_id(snap_id),
     oid(oid), pool_namespace(pool_namespace)
@@ -89,7 +91,8 @@ void ObjectCacheReadData::decode_payload(bufferlist::const_iterator i) {
   ceph::decode(pool_namespace, i);
 }
 
-ObjectCacheReadReplyData::ObjectCacheReadReplyData(uint16_t t, uint64_t s, string cache_path)
+ObjectCacheReadReplyData::ObjectCacheReadReplyData(uint16_t t, uint64_t s,
+                                                   string cache_path)
   : ObjectCacheRequest(t, s), cache_path(cache_path) {}
 ObjectCacheReadReplyData::ObjectCacheReadReplyData(uint16_t t, uint64_t s)
   : ObjectCacheRequest(t, s) {}
@@ -114,8 +117,7 @@ void ObjectCacheReadRadosData::encode_payload() {}
 
 void ObjectCacheReadRadosData::decode_payload(bufferlist::const_iterator i) {}
 
-ObjectCacheRequest* decode_object_cache_request(bufferlist payload_buffer) 
-{
+ObjectCacheRequest* decode_object_cache_request(bufferlist payload_buffer) {
   ObjectCacheRequest* req = nullptr;
 
   uint16_t type;
@@ -126,7 +128,7 @@ ObjectCacheRequest* decode_object_cache_request(bufferlist payload_buffer)
   ceph::decode(seq, i);
   DECODE_FINISH(i);
 
-  switch(type) {
+  switch (type) {
     case RBDSC_REGISTER: {
       req = new ObjectCacheRegData(type, seq);
       break;
@@ -156,5 +158,5 @@ ObjectCacheRequest* decode_object_cache_request(bufferlist payload_buffer)
   return req;
 }
 
-} // namespace immutable_obj_cache
-} // namespace ceph
+}  // namespace immutable_obj_cache
+}  // namespace ceph
index 6518d13ea4dc5cb3a970e906c4bd2c1c733315a5..470f764e0b54417599fa81c6685d06efe7e156a2 100644 (file)
@@ -26,10 +26,10 @@ inline uint32_t get_data_len(char* buf) {
   HeaderHelper* header = (HeaderHelper*)buf;
   return header->len;
 }
-}
+}  //  namespace
 
 class ObjectCacheRequest {
-public:
+ public:
   uint16_t type;
   uint64_t seq;
 
@@ -43,8 +43,8 @@ public:
 
   // encode consists of two steps
   // step 1 : directly encode common bits using encode method of base classs.
-  // step 2 : according to payload_empty, determine whether addtional bits need to
-  //          be encoded which be implements by child class.
+  // step 2 : according to payload_empty, determine whether addtional bits
+  //          need to be encoded which be implements by child class.
   void encode();
   void decode(bufferlist& bl);
   bufferlist get_payload_bufferlist() { return payload; }
@@ -56,7 +56,7 @@ public:
 };
 
 class ObjectCacheRegData : public ObjectCacheRequest {
-public:
+ public:
   ObjectCacheRegData();
   ObjectCacheRegData(uint16_t t, uint64_t s);
   ~ObjectCacheRegData() override;
@@ -67,7 +67,7 @@ public:
 };
 
 class ObjectCacheRegReplyData : public ObjectCacheRequest {
-public:
+ public:
   ObjectCacheRegReplyData();
   ObjectCacheRegReplyData(uint16_t t, uint64_t s);
   ~ObjectCacheRegReplyData() override;
@@ -78,15 +78,17 @@ public:
 };
 
 class ObjectCacheReadData : public ObjectCacheRequest {
-public:
+ public:
   uint64_t read_offset;
   uint64_t read_len;
   uint64_t pool_id;
   uint64_t snap_id;
   std::string oid;
   std::string pool_namespace;
-  ObjectCacheReadData(uint16_t t, uint64_t s, uint64_t read_offset, uint64_t read_len, uint64_t pool_id,
-                      uint64_t snap_id, std::string oid, std::string pool_namespace );
+  ObjectCacheReadData(uint16_t t, uint64_t s, uint64_t read_offset,
+                      uint64_t read_len, uint64_t pool_id,
+                      uint64_t snap_id, std::string oid,
+                      std::string pool_namespace);
   ObjectCacheReadData(uint16_t t, uint64_t s);
   ~ObjectCacheReadData() override;
   void encode_payload() override;
@@ -96,7 +98,7 @@ public:
 };
 
 class ObjectCacheReadReplyData : public ObjectCacheRequest {
-public:
+ public:
   std::string cache_path;
   ObjectCacheReadReplyData(uint16_t t, uint64_t s, std::string cache_path);
   ObjectCacheReadReplyData(uint16_t t, uint64_t s);
@@ -108,7 +110,7 @@ public:
 };
 
 class ObjectCacheReadRadosData : public ObjectCacheRequest {
-public:
+ public:
   ObjectCacheReadRadosData();
   ObjectCacheReadRadosData(uint16_t t, uint64_t s);
   ~ObjectCacheReadRadosData() override;
@@ -120,6 +122,6 @@ public:
 
 ObjectCacheRequest* decode_object_cache_request(bufferlist payload_buffer);
 
-} // namespace immutable_obj_cache
-} // namespace ceph
+}  // namespace immutable_obj_cache
+}  // namespace ceph
 #endif
index daa2bffda8bf9efcc09ad1abc0ad9d728d58051d..71e716bfc2211d0f5f14240e24248bffe88ae0c2 100644 (file)
@@ -7,7 +7,6 @@
 #include "include/rados/librados.hpp"
 #include "include/Context.h"
 
-
 namespace ceph {
 namespace immutable_obj_cache {
 namespace detail {
@@ -19,7 +18,7 @@ void rados_callback(rados_completion_t c, void *arg) {
   (obj->*MF)(r);
 }
 
-} // namespace detail
+}  // namespace detail
 
 template <typename T, void(T::*MF)(int)=&T::complete>
 librados::AioCompletion *create_rados_callback(T *obj) {
@@ -27,6 +26,6 @@ librados::AioCompletion *create_rados_callback(T *obj) {
     obj, &detail::rados_callback<T, MF>, nullptr);
 }
 
-} // namespace immutable_obj_cache
-} // namespace ceph
+}  // namespace immutable_obj_cache
+}  // namespace ceph
 #endif
index dffae87acc95a3b1166fe4fefcd894ad00ee9c20..bfe49df6323894fce40afe5cb801a040dc543c9b 100644 (file)
@@ -17,20 +17,20 @@ void usage() {
   std::cout << "usage: cache controller [options...]" << std::endl;
   std::cout << "options:\n";
   std::cout << "  -m monaddress[:port]      connect to specified monitor\n";
-  std::cout << "  --keyring=<path>          path to keyring for local cluster\n";
+  std::cout << "  --keyring=<path>          path to keyring for local "
+            << "cluster\n";
   std::cout << "  --log-file=<logfile>       file to log debug output\n";
-  std::cout << "  --debug-immutable-obj-cache=<log-level>/<memory-level>  set debug level\n";
+  std::cout << "  --debug-immutable-obj-cache=<log-level>/<memory-level>  "
+            << "set debug level\n";
   generic_server_usage();
 }
 
-static void handle_signal(int signum)
-{
+static void handle_signal(int signum) {
   if (cachectl)
     cachectl->handle_signal(signum);
 }
 
-int main(int argc, const char **argv)
-{
+int main(int argc, const char **argv) {
   std::vector<const char*> args;
   env_to_vec(args);
   argv_to_vec(argc, argv, args);
@@ -41,8 +41,8 @@ int main(int argc, const char **argv)
   }
 
   auto cct = global_init(nullptr, args, CEPH_ENTITY_TYPE_CLIENT,
-                        CODE_ENVIRONMENT_DAEMON,
-                        CINIT_FLAG_UNPRIVILEGED_DAEMON_DEFAULTS);
+                         CODE_ENVIRONMENT_DAEMON,
+                         CINIT_FLAG_UNPRIVILEGED_DAEMON_DEFAULTS);
 
   if (g_conf()->daemonize) {
     global_init_daemonize(g_ceph_context);