]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
tools: make dedicated thread configurable for RO
authorshangdehao1 <dehao.shang@intel.com>
Wed, 6 Mar 2019 23:11:53 +0000 (07:11 +0800)
committerYuan Zhou <yuan.zhou@intel.com>
Thu, 21 Mar 2019 16:16:30 +0000 (00:16 +0800)
At cache client side of RO, make dedicated woker thread configurable
Also, cleanup useless member of CacheClient class

Signed-off-by: Dehao Shang <dehao.shang@intel.com>
src/common/options.cc
src/tools/immutable_object_cache/CacheClient.cc
src/tools/immutable_object_cache/CacheClient.h

index 1e9dbc1e4f610800ce6afae48c8338a62cef6326..59ed0e7f429c52aaec1e4702156b23af535158f1 100644 (file)
@@ -7405,6 +7405,10 @@ static std::vector<Option> get_immutable_object_cache_options() {
     Option("immutable_object_cache_max_inflight_ops", Option::TYPE_UINT, Option::LEVEL_ADVANCED)
     .set_default(128)
     .set_description("max immutable object caching inflight ops"),
+
+    Option("immutable_object_cache_client_dedicated_thread_num", Option::TYPE_UINT, Option::LEVEL_ADVANCED)
+    .set_default(2)
+    .set_description("immutable object cache client dedicated thread number"),
   });
 }
 
index 58083bbb9ae797d370acf80c55c403f6c5c2edb7..c56f98931ecc2e5c708b86d49716a18c5c556e56 100644 (file)
@@ -13,15 +13,17 @@ namespace ceph {
 namespace immutable_obj_cache {
 
   CacheClient::CacheClient(const std::string& file, CephContext* ceph_ctx)
-    : cct(ceph_ctx), m_io_service_work(m_io_service),
+    : m_cct(ceph_ctx), m_io_service_work(m_io_service),
       m_dm_socket(m_io_service), m_ep(stream_protocol::endpoint(file)),
       m_io_thread(nullptr), m_session_work(false), m_writing(false),
       m_reading(false), m_sequence_id(0),
       m_lock("ceph::cache::cacheclient::m_lock") {
     // TODO(dehao) : configure it.
-    m_use_dedicated_worker = true;
-    m_worker_thread_num = 2;
-    if (m_use_dedicated_worker) {
+    m_worker_thread_num =
+      m_cct->_conf.get_val<uint64_t>(
+        "immutable_object_cache_client_dedicated_thread_num");
+
+    if (m_worker_thread_num != 0) {
       m_worker = new boost::asio::io_service();
       m_worker_io_service_work = new boost::asio::io_service::work(*m_worker);
       for (uint64_t i = 0; i < m_worker_thread_num; i++) {
@@ -51,7 +53,7 @@ namespace immutable_obj_cache {
     if (m_io_thread != nullptr) {
       m_io_thread->join();
     }
-    if (m_use_dedicated_worker) {
+    if (m_worker_thread_num != 0) {
       m_worker->stop();
       for (auto thd : m_worker_threads) {
         thd->join();
@@ -68,7 +70,7 @@ namespace immutable_obj_cache {
     boost::system::error_code close_ec;
     m_dm_socket.close(close_ec);
     if (close_ec) {
-       ldout(cct, 20) << "close: " << close_ec.message() << dendl;
+       ldout(m_cct, 20) << "close: " << close_ec.message() << dendl;
     }
   }
 
@@ -79,7 +81,7 @@ namespace immutable_obj_cache {
       fault(ASIO_ERROR_CONNECT, ec);
       return -1;
     }
-    ldout(cct, 20) <<"connect success"<< dendl;
+    ldout(m_cct, 20) <<"connect success"<< dendl;
     return 0;
   }
 
@@ -253,7 +255,7 @@ namespace immutable_obj_cache {
        delete reply;
     });
 
-    if (m_use_dedicated_worker) {
+    if (m_worker_thread_num != 0) {
       m_worker->post([process_reply]() {
         process_reply->complete(true);
       });
@@ -265,16 +267,16 @@ namespace immutable_obj_cache {
   // if there is one request fails, just execute fault, then shutdown RO.
   void CacheClient::fault(const int err_type,
                           const boost::system::error_code& ec) {
-    ldout(cct, 20) << "fault." << ec.message() << dendl;
+    ldout(m_cct, 20) << "fault." << ec.message() << dendl;
 
     if (err_type == ASIO_ERROR_CONNECT) {
        ceph_assert(!m_session_work.load());
        if (ec == boost::asio::error::connection_refused) {
-         ldout(cct, 20) << "Connecting RO daenmon fails : "<< ec.message()
+         ldout(m_cct, 20) << "Connecting RO daenmon fails : "<< ec.message()
                         << ". Immutable-object-cache daemon is down ? "
                         << "Data will be read from ceph cluster " << dendl;
        } else {
-         ldout(cct, 20) << "Connecting RO daemon fails : "
+         ldout(m_cct, 20) << "Connecting RO daemon fails : "
                         << ec.message() << dendl;
        }
 
@@ -285,7 +287,7 @@ namespace immutable_obj_cache {
          boost::system::error_code close_ec;
          m_dm_socket.close(close_ec);
          if (close_ec) {
-           ldout(cct, 20) << "close: " << close_ec.message() << dendl;
+           ldout(m_cct, 20) << "close: " << close_ec.message() << dendl;
          }
       }
       return;
@@ -303,16 +305,16 @@ namespace immutable_obj_cache {
     m_session_work.store(false);
 
     if (err_type == ASIO_ERROR_MSG_INCOMPLETE) {
-       ldout(cct, 20) << "ASIO In-complete message." << ec.message() << dendl;
+       ldout(m_cct, 20) << "ASIO In-complete message." << ec.message() << dendl;
        ceph_assert(0);
     }
 
     if (err_type == ASIO_ERROR_READ) {
-       ldout(cct, 20) << "ASIO async read fails : " << ec.message() << dendl;
+       ldout(m_cct, 20) << "ASIO async read fails : " << ec.message() << dendl;
     }
 
     if (err_type == ASIO_ERROR_WRITE) {
-       ldout(cct, 20) << "ASIO asyn write fails : " << ec.message() << dendl;
+       ldout(m_cct, 20) << "ASIO asyn write fails : " << ec.message() << dendl;
        // CacheClient should not occur this error.
        ceph_assert(0);
     }
@@ -331,7 +333,7 @@ namespace immutable_obj_cache {
       m_seq_to_req.clear();
     }
 
-    ldout(cct, 20) << "Because ASIO domain socket fails, just shutdown RO.\
+    ldout(m_cct, 20) << "Because ASIO domain socket fails, just shutdown RO.\
                        Later all reading will be re-dispatched RADOS layer"
                        << ec.message() << dendl;
   }
index 47a39cb1432191b17487a3ff470653999f6136cc..3d0706ea4d9fdca4482aee190ef4431a2ea3ecf1 100644 (file)
@@ -55,7 +55,7 @@ class CacheClient {
                         size_t bytes_transferred);
 
  private:
-  CephContext* cct;
+  CephContext* m_cct;
   boost::asio::io_service m_io_service;
   boost::asio::io_service::work m_io_service_work;
   stream_protocol::socket m_dm_socket;
@@ -63,7 +63,6 @@ class CacheClient {
   std::shared_ptr<std::thread> m_io_thread;
   std::atomic<bool> m_session_work;
 
-  bool m_use_dedicated_worker;
   uint64_t m_worker_thread_num;
   boost::asio::io_service* m_worker;
   std::vector<std::thread*> m_worker_threads;