]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
tools: adding ceph level immutable obj cache daemon
author: Yuan Zhou <yuan.zhou@intel.com>
Thu, 13 Dec 2018 21:36:36 +0000 (05:36 +0800)
committer: Yuan Zhou <yuan.zhou@intel.com>
Thu, 21 Mar 2019 16:16:25 +0000 (00:16 +0800)
The daemon is built for future integration with both RBD and RGW cache.
The key components are:
- domain socket based simple IPC
- simple LRU policy based promotion/demotion for the cache
- simple file based caching store for RADOS objs with sync IO interface
- systemd service/target files for the daemon

Signed-off-by: Dehao Shang <dehao.shang@intel.com>
Signed-off-by: Yuan Zhou <yuan.zhou@intel.com>
29 files changed:
src/common/options.cc
src/common/subsys.h
src/test/CMakeLists.txt
src/test/ceph_immutable_object_cache/CMakeLists.txt [new file with mode: 0644]
src/test/ceph_immutable_object_cache/test_DomainSocket.cc [new file with mode: 0644]
src/test/ceph_immutable_object_cache/test_SimplePolicy.cc [new file with mode: 0644]
src/test/ceph_immutable_object_cache/test_main.cc [new file with mode: 0644]
src/test/ceph_immutable_object_cache/test_sync_file.cc [new file with mode: 0644]
src/tools/CMakeLists.txt
src/tools/ceph_immutable_object_cache/CMakeLists.txt [new file with mode: 0644]
src/tools/ceph_immutable_object_cache/CacheClient.cc [new file with mode: 0644]
src/tools/ceph_immutable_object_cache/CacheClient.h [new file with mode: 0644]
src/tools/ceph_immutable_object_cache/CacheController.cc [new file with mode: 0644]
src/tools/ceph_immutable_object_cache/CacheController.h [new file with mode: 0644]
src/tools/ceph_immutable_object_cache/CacheServer.cc [new file with mode: 0644]
src/tools/ceph_immutable_object_cache/CacheServer.h [new file with mode: 0644]
src/tools/ceph_immutable_object_cache/CacheSession.cc [new file with mode: 0644]
src/tools/ceph_immutable_object_cache/CacheSession.h [new file with mode: 0644]
src/tools/ceph_immutable_object_cache/ObjectCacheFile.cc [new file with mode: 0644]
src/tools/ceph_immutable_object_cache/ObjectCacheFile.h [new file with mode: 0644]
src/tools/ceph_immutable_object_cache/ObjectCacheStore.cc [new file with mode: 0644]
src/tools/ceph_immutable_object_cache/ObjectCacheStore.h [new file with mode: 0644]
src/tools/ceph_immutable_object_cache/Policy.h [new file with mode: 0644]
src/tools/ceph_immutable_object_cache/SimplePolicy.cc [new file with mode: 0644]
src/tools/ceph_immutable_object_cache/SimplePolicy.h [new file with mode: 0644]
src/tools/ceph_immutable_object_cache/SocketCommon.h [new file with mode: 0644]
src/tools/ceph_immutable_object_cache/main.cc [new file with mode: 0644]
systemd/ceph-immutable-object-cache.target [new file with mode: 0644]
systemd/ceph-immutable-object-cache@.service.in [new file with mode: 0644]

index a6b0be73384ddd7b848729ddac19d61a9ed2b86a..be3d63fa113536516882d9dab6a79db21daac218 100644 (file)
@@ -7035,6 +7035,22 @@ static std::vector<Option> get_rbd_options() {
     .set_default(false)
     .set_description("whether to block writes to the cache before the aio_write call completes"),
 
+    Option("rbd_shared_cache_enabled", Option::TYPE_BOOL, Option::LEVEL_ADVANCED)
+    .set_default(false)
+    .set_description("whether to enable shared ssd caching"),
+
+    Option("rbd_shared_cache_path", Option::TYPE_STR, Option::LEVEL_ADVANCED)
+    .set_default("/tmp")
+    .set_description("shared ssd caching data dir"),
+
+    Option("rbd_shared_cache_sock", Option::TYPE_STR, Option::LEVEL_ADVANCED)
+    .set_default("/tmp/rbd_shared_ro_cache_sock")
+    .set_description("shared ssd caching domain socket"),
+
+    Option("rbd_shared_cache_entries", Option::TYPE_INT, Option::LEVEL_ADVANCED)
+    .set_default(4096)
+    .set_description("shared ssd caching data entries"),
+
     Option("rbd_concurrent_management_ops", Option::TYPE_UINT, Option::LEVEL_ADVANCED)
     .set_default(10)
     .set_min(1)
index bdd2d0e506d6c82c0871088ae7a9e4d27d03d665..2ebd8ef02d1deccfad12d4aac30129fc52c90ba3 100644 (file)
@@ -39,6 +39,7 @@ SUBSYS(rbd_mirror, 0, 5)
 SUBSYS(rbd_replay, 0, 5)
 SUBSYS(journaler, 0, 5)
 SUBSYS(objectcacher, 0, 5)
+SUBSYS(immutable_obj_cache, 0, 5)
 SUBSYS(client, 0, 5)
 SUBSYS(osd, 1, 5)
 SUBSYS(optracker, 0, 5)
index 20a64fd41b0445f554acfd1bb2f5a9bf7030fe61..607a08e08a1d39c58ea623a2c68e9e836f604ea9 100644 (file)
@@ -51,6 +51,7 @@ add_subdirectory(os)
 add_subdirectory(osd)
 add_subdirectory(osdc)
 add_subdirectory(pybind)
+add_subdirectory(immutable_object_cache)
 if(WITH_RADOSGW)
   set(rgw_libs rgw_a)
   if(WITH_RADOSGW_AMQP_ENDPOINT)
diff --git a/src/test/ceph_immutable_object_cache/CMakeLists.txt b/src/test/ceph_immutable_object_cache/CMakeLists.txt
new file mode 100644 (file)
index 0000000..7e8e3bd
--- /dev/null
@@ -0,0 +1,35 @@
+
+add_executable(unittest_ceph_immutable_obj_cache
+  test_main.cc
+  test_SimplePolicy.cc
+  test_sync_file.cc
+  test_DomainSocket.cc
+  )
+add_ceph_unittest(unittest_ceph_immutable_obj_cache)
+
+
+target_link_libraries(unittest_ceph_immutable_obj_cache
+  ceph_immutable_object_cache_lib
+  rados_test_stub
+  librados-cxx
+  global
+  radostest-cxx
+  stdc++fs
+  GTest::GTest
+  )
+
+
+add_executable(ceph_test_immutable_obj_cache
+  test_main.cc
+  )
+
+target_link_libraries(ceph_test_immutable_obj_cache
+  librados-cxx
+  radostest-cxx
+  ${UNITTEST_LIBS}
+  )
+
+
+install(TARGETS
+  ceph_test_immutable_obj_cache
+  DESTINATION ${CMAKE_INSTALL_BINDIR})
diff --git a/src/test/ceph_immutable_object_cache/test_DomainSocket.cc b/src/test/ceph_immutable_object_cache/test_DomainSocket.cc
new file mode 100644 (file)
index 0000000..e281090
--- /dev/null
@@ -0,0 +1,209 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include <iostream>
+#include <unistd.h>
+
+#include "gtest/gtest.h"
+#include "include/Context.h"
+#include "common/Mutex.h"
+#include "common/Cond.h"
+#include "global/global_init.h"
+#include "global/global_context.h"
+
+#include "tools/ceph_immutable_object_cache/CacheClient.h"
+#include "tools/ceph_immutable_object_cache/CacheServer.h"
+
+using namespace ceph::immutable_obj_cache;
+
+// sequentializes async operations so tests can wait for completion
+class WaitEvent {
+public:
+  WaitEvent() : m_signaled(false) {
+    pthread_mutex_init(&m_lock, NULL);
+    pthread_cond_init(&m_cond, NULL);
+  }
+
+  ~WaitEvent() {
+    pthread_mutex_destroy(&m_lock);
+    pthread_cond_destroy(&m_cond);
+  }
+
+  void wait() {
+    pthread_mutex_lock(&m_lock);
+    while (!m_signaled) {
+      pthread_cond_wait(&m_cond, &m_lock);
+    }
+    m_signaled = false;
+    pthread_mutex_unlock(&m_lock);
+   }
+
+  void signal() {
+    pthread_mutex_lock(&m_lock);
+    m_signaled = true;
+    pthread_cond_signal(&m_cond);
+    pthread_mutex_unlock(&m_lock);
+  }
+private:
+    pthread_mutex_t m_lock;
+    pthread_cond_t m_cond;
+    bool m_signaled;
+};
+
+class TestCommunication :public ::testing::Test {
+public:
+  CacheServer* m_cache_server;
+  std::thread* srv_thd;
+  CacheClient* m_cache_client;
+  std::string m_local_path;
+  pthread_mutex_t m_mutex;
+  pthread_cond_t m_cond;
+  std::atomic<uint64_t> m_send_request_index;
+  std::atomic<uint64_t> m_recv_ack_index;
+  WaitEvent m_wait_event;
+  unordered_set<std::string> m_hit_entry_set;
+
+  TestCommunication()
+    : m_cache_server(nullptr), m_cache_client(nullptr), m_local_path("/tmp/ceph_test_domain_socket"),
+      m_send_request_index(0), m_recv_ack_index(0)
+    {}
+
+  ~TestCommunication() {}
+
+  static void SetUpTestCase() {}
+  static void TearDownTestCase() {}
+
+  void SetUp() override {
+    std::remove(m_local_path.c_str());
+    m_cache_server = new CacheServer(g_ceph_context, m_local_path, [this](uint64_t xx, std::string yy ){
+        handle_request(xx, yy);
+    });
+    ASSERT_TRUE(m_cache_server != nullptr);
+    srv_thd = new std::thread([this]() {m_cache_server->run();});
+
+    m_cache_client = new CacheClient(m_local_path, g_ceph_context);
+    ASSERT_TRUE(m_cache_client != nullptr);
+    m_cache_client->run();
+
+    while(true) {
+      if (0 == m_cache_client->connect()) {
+        break;
+      }
+    }
+
+    auto ctx = new FunctionContext([this](bool reg) {
+      ASSERT_TRUE(reg);
+    });
+    m_cache_client->register_volume("pool_name", "rbd_name", 4096, ctx);
+    ASSERT_TRUE(m_cache_client->is_session_work());
+  }
+
+  void TearDown() override {
+
+    delete m_cache_client;
+    m_cache_server->stop();
+    if (srv_thd->joinable()) {
+      srv_thd->join();
+    }
+    delete m_cache_server;
+    std::remove(m_local_path.c_str());
+    delete srv_thd;
+  }
+
+  void handle_request(uint64_t session_id, std::string msg){
+    rbdsc_req_type_t *io_ctx = (rbdsc_req_type_t*)(msg.c_str());
+
+    switch (io_ctx->type) {
+      case RBDSC_REGISTER: {
+        io_ctx->type = RBDSC_REGISTER_REPLY;
+        m_cache_server->send(session_id, std::string((char*)io_ctx, msg.size()));
+        break;
+      }
+      case RBDSC_READ: {
+        if (m_hit_entry_set.find(io_ctx->oid) == m_hit_entry_set.end()) {
+          io_ctx->type = RBDSC_READ_RADOS;
+        } else {
+          io_ctx->type = RBDSC_READ_REPLY;
+        }
+        m_cache_server->send(session_id, std::string((char*)io_ctx, msg.size()));
+        break;
+      }
+    }
+  }
+
+   // times: number of messages to send
+   // queue_depth: imitated message queue depth
+   // thinking: imitated message handling time
+   void startup_pingpong_testing(uint64_t times, uint64_t queue_depth, int thinking) {
+     m_send_request_index.store(0);
+     m_recv_ack_index.store(0);
+     for (uint64_t index = 0; index < times; index++) {
+       auto ctx = new FunctionContext([this, thinking, times](bool req){
+          if (thinking != 0) {
+            usleep(thinking); // handling message
+          }
+          m_recv_ack_index++;
+          if (m_recv_ack_index == times) {
+            m_wait_event.signal();
+          }
+       });
+
+       // simple queue-depth throttling: wait until in-flight requests drop below queue_depth
+       while (m_send_request_index - m_recv_ack_index > queue_depth) {
+         usleep(1);
+       }
+
+       m_cache_client->lookup_object("test_pool", "test_rbd", "123456", ctx);
+       m_send_request_index++;
+     }
+     m_wait_event.wait();
+   }
+
+  bool startup_lookupobject_testing(std::string pool_name, std::string volume_name, std::string object_id) {
+    bool hit;
+    auto ctx = new FunctionContext([this, &hit](bool req){
+       hit = req;
+       m_wait_event.signal();
+    });
+    m_cache_client->lookup_object(pool_name, volume_name, object_id, ctx);
+    m_wait_event.wait();
+    return hit;
+  }
+
+  void set_hit_entry_in_fake_lru(std::string oid) {
+    if (m_hit_entry_set.find(oid) == m_hit_entry_set.end()) {
+      m_hit_entry_set.insert(oid);
+    }
+  }
+};
+
+TEST_F(TestCommunication, test_pingpong) {
+
+  startup_pingpong_testing(10, 16, 0);
+  ASSERT_TRUE(m_send_request_index == m_recv_ack_index);
+  startup_pingpong_testing(200, 128, 0);
+  ASSERT_TRUE(m_send_request_index == m_recv_ack_index);
+  startup_pingpong_testing(10000, 512, 0);
+  ASSERT_TRUE(m_send_request_index == m_recv_ack_index);
+}
+
+TEST_F(TestCommunication, test_lookup_object) {
+
+  m_hit_entry_set.clear();
+
+  srand(time(0));
+  uint64_t random_hit = random();
+
+  for (uint64_t i = 50; i < 100; i++) {
+    if ((random_hit % i) == 0) {
+      set_hit_entry_in_fake_lru(std::to_string(i));
+    }
+  }
+  for (uint64_t i = 50; i < 100; i++) {
+    if ((random_hit % i) != 0) {
+      ASSERT_FALSE(startup_lookupobject_testing("test_pool", "testing_volume", std::to_string(i)));
+    } else {
+      ASSERT_TRUE(startup_lookupobject_testing("test_pool", "testing_volume", std::to_string(i)));
+    }
+  }
+}
diff --git a/src/test/ceph_immutable_object_cache/test_SimplePolicy.cc b/src/test/ceph_immutable_object_cache/test_SimplePolicy.cc
new file mode 100644 (file)
index 0000000..f83a7e1
--- /dev/null
@@ -0,0 +1,232 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include <sstream>
+#include <list>
+#include <gtest/gtest.h>
+
+#include "tools/ceph_immutable_object_cache/SimplePolicy.h"
+
+using namespace ceph::immutable_obj_cache;
+
+std::string generate_file_name(uint64_t index) {
+  std::string pre_name("object_cache_file_");
+  std::ostringstream oss;
+  oss << index;
+  return pre_name + oss.str();
+}
+
+class TestSimplePolicy :public ::testing::Test {
+public:
+  SimplePolicy* m_simple_policy;
+  const uint64_t m_entry_num;
+  uint64_t m_entry_index;
+  std::vector<std::string> m_promoted_lru;
+  std::vector<std::string> m_promoting_lru;
+
+  TestSimplePolicy() : m_entry_num(100), m_entry_index(0) {}
+  ~TestSimplePolicy() {}
+  static void SetUpTestCase() {}
+  static void TearDownTestCase() {}
+  void SetUp() override {
+    m_simple_policy = new SimplePolicy(g_ceph_context, m_entry_num, 0.1);
+    for (uint64_t i = 0; i < m_entry_num / 2; i++, m_entry_index++) {
+      insert_entry_into_promoted_lru(generate_file_name(m_entry_index));
+    }
+  }
+  void TearDown() override {
+    while(m_promoted_lru.size()) {
+      ASSERT_TRUE(m_simple_policy->get_evict_entry() == m_promoted_lru.front());
+      m_simple_policy->evict_entry(m_simple_policy->get_evict_entry());
+      m_promoted_lru.erase(m_promoted_lru.begin());
+    }
+    delete m_simple_policy;
+  }
+
+  void insert_entry_into_promoted_lru(std::string cache_file_name) {
+    ASSERT_EQ(m_entry_num - m_promoted_lru.size() - m_promoting_lru.size(), m_simple_policy->get_free_entry_num());
+    ASSERT_EQ(m_promoting_lru.size(), m_simple_policy->get_promoting_entry_num());
+    ASSERT_EQ(m_promoted_lru.size(), m_simple_policy->get_promoted_entry_num());
+    ASSERT_EQ(OBJ_CACHE_NONE, m_simple_policy->get_status(cache_file_name));
+
+    m_simple_policy->lookup_object(cache_file_name);
+    ASSERT_EQ(OBJ_CACHE_SKIP, m_simple_policy->get_status(cache_file_name));
+    ASSERT_EQ(m_entry_num - m_promoted_lru.size() - m_promoting_lru.size() - 1, m_simple_policy->get_free_entry_num());
+    ASSERT_EQ(m_promoting_lru.size() + 1, m_simple_policy->get_promoting_entry_num());
+    ASSERT_EQ(m_promoted_lru.size(), m_simple_policy->get_promoted_entry_num());
+
+    m_simple_policy->update_status(cache_file_name, OBJ_CACHE_PROMOTED);
+    m_promoted_lru.push_back(cache_file_name);
+    ASSERT_EQ(OBJ_CACHE_PROMOTED, m_simple_policy->get_status(cache_file_name));
+
+    ASSERT_EQ(m_entry_num - m_promoted_lru.size() - m_promoting_lru.size(), m_simple_policy->get_free_entry_num());
+    ASSERT_EQ(m_promoting_lru.size(), m_simple_policy->get_promoting_entry_num());
+    ASSERT_EQ(m_promoted_lru.size(), m_simple_policy->get_promoted_entry_num());
+  }
+
+  void insert_entry_into_promoting_lru(std::string cache_file_name) {
+    ASSERT_EQ(m_entry_num - m_promoted_lru.size() - m_promoting_lru.size(), m_simple_policy->get_free_entry_num());
+    ASSERT_EQ(m_promoting_lru.size(), m_simple_policy->get_promoting_entry_num());
+    ASSERT_EQ(m_promoted_lru.size(), m_simple_policy->get_promoted_entry_num());
+    ASSERT_EQ(OBJ_CACHE_NONE, m_simple_policy->get_status(cache_file_name));
+
+    m_simple_policy->lookup_object(cache_file_name);
+    m_promoting_lru.push_back(cache_file_name);
+    ASSERT_EQ(OBJ_CACHE_SKIP, m_simple_policy->get_status(cache_file_name));
+    ASSERT_EQ(m_entry_num - m_promoted_lru.size() - m_promoting_lru.size(), m_simple_policy->get_free_entry_num());
+    ASSERT_EQ(m_promoting_lru.size(), m_simple_policy->get_promoting_entry_num());
+    ASSERT_EQ(m_promoted_lru.size(), m_simple_policy->get_promoted_entry_num());
+  }
+};
+
+TEST_F(TestSimplePolicy, test_lookup_miss_and_no_free) {
+  // exhaust cache space
+  uint64_t left_entry_num = m_entry_num - m_promoted_lru.size() - m_promoting_lru.size();
+  for(uint64_t i = 0; i < left_entry_num; i++, ++m_entry_index) {
+    insert_entry_into_promoted_lru(generate_file_name(m_entry_index));
+  }
+  ASSERT_TRUE(0 == m_simple_policy->get_free_entry_num());
+  ASSERT_TRUE(m_simple_policy->lookup_object("no_this_cache_file_name") == OBJ_CACHE_SKIP);
+}
+
+TEST_F(TestSimplePolicy, test_lookup_miss_and_free_space) {
+  ASSERT_TRUE(m_entry_num - m_promoting_lru.size() - m_promoted_lru.size() == m_simple_policy->get_free_entry_num());
+  ASSERT_TRUE(m_simple_policy->lookup_object("miss_but_have_free_space_file_name") == OBJ_CACHE_NONE);
+  ASSERT_TRUE(m_simple_policy->get_status("miss_but_have_free_space_file_name") == OBJ_CACHE_SKIP);
+}
+
+TEST_F(TestSimplePolicy, test_lookup_hit_and_promoting) {
+  ASSERT_TRUE(m_entry_num - m_promoting_lru.size() - m_promoted_lru.size() == m_simple_policy->get_free_entry_num());
+  insert_entry_into_promoting_lru("promoting_file_1");
+  insert_entry_into_promoting_lru("promoting_file_2");
+  insert_entry_into_promoted_lru(generate_file_name(++m_entry_index));
+  insert_entry_into_promoted_lru(generate_file_name(++m_entry_index));
+  insert_entry_into_promoting_lru("promoting_file_3");
+  insert_entry_into_promoting_lru("promoting_file_4");
+
+  ASSERT_TRUE(m_simple_policy->get_promoting_entry_num() == 4);
+  ASSERT_TRUE(m_simple_policy->get_status("promoting_file_1") == OBJ_CACHE_SKIP);
+  ASSERT_TRUE(m_simple_policy->get_status("promoting_file_2") == OBJ_CACHE_SKIP);
+  ASSERT_TRUE(m_simple_policy->get_status("promoting_file_3") == OBJ_CACHE_SKIP);
+  ASSERT_TRUE(m_simple_policy->get_status("promoting_file_4") == OBJ_CACHE_SKIP);
+  ASSERT_TRUE(m_simple_policy->lookup_object("promoting_file_1") == OBJ_CACHE_SKIP);
+  ASSERT_TRUE(m_simple_policy->lookup_object("promoting_file_2") == OBJ_CACHE_SKIP);
+  ASSERT_TRUE(m_simple_policy->lookup_object("promoting_file_3") == OBJ_CACHE_SKIP);
+  ASSERT_TRUE(m_simple_policy->lookup_object("promoting_file_4") == OBJ_CACHE_SKIP);
+}
+
+TEST_F(TestSimplePolicy, test_lookup_hit_and_promoted) {
+  ASSERT_TRUE(m_promoted_lru.size() == m_simple_policy->get_promoted_entry_num());
+  for(uint64_t index = 0; index < m_entry_index; index++) {
+    ASSERT_TRUE(m_simple_policy->get_status(generate_file_name(index)) == OBJ_CACHE_PROMOTED);
+  }
+}
+
+TEST_F(TestSimplePolicy, test_update_state_from_promoting_to_none) {
+  ASSERT_TRUE(m_entry_num - m_promoting_lru.size() - m_promoted_lru.size() == m_simple_policy->get_free_entry_num());
+  insert_entry_into_promoting_lru("promoting_to_none_file_1");
+  insert_entry_into_promoting_lru("promoting_to_none_file_2");
+  insert_entry_into_promoted_lru(generate_file_name(++m_entry_index));
+  insert_entry_into_promoting_lru("promoting_to_none_file_3");
+  insert_entry_into_promoting_lru("promoting_to_none_file_4");
+
+  ASSERT_TRUE(m_simple_policy->get_promoting_entry_num() == 4);
+  ASSERT_TRUE(m_simple_policy->get_status("promoting_to_none_file_1") == OBJ_CACHE_SKIP);
+  ASSERT_TRUE(m_simple_policy->get_status("promoting_to_none_file_2") == OBJ_CACHE_SKIP);
+  ASSERT_TRUE(m_simple_policy->get_status("promoting_to_none_file_3") == OBJ_CACHE_SKIP);
+  ASSERT_TRUE(m_simple_policy->get_status("promoting_to_none_file_4") == OBJ_CACHE_SKIP);
+
+  m_simple_policy->update_status("promoting_to_none_file_1", OBJ_CACHE_NONE);
+  ASSERT_TRUE(m_simple_policy->get_promoting_entry_num() == 3);
+  ASSERT_TRUE(m_simple_policy->get_status("promoting_to_none_file_1") == OBJ_CACHE_NONE);
+  ASSERT_TRUE(m_simple_policy->get_status("promoting_to_none_file_2") == OBJ_CACHE_SKIP);
+  ASSERT_TRUE(m_simple_policy->get_status("promoting_to_none_file_3") == OBJ_CACHE_SKIP);
+  ASSERT_TRUE(m_simple_policy->get_status("promoting_to_none_file_4") == OBJ_CACHE_SKIP);
+
+  m_simple_policy->update_status("promoting_to_none_file_2", OBJ_CACHE_NONE);
+  ASSERT_TRUE(m_simple_policy->get_promoting_entry_num() == 2);
+  ASSERT_TRUE(m_simple_policy->get_status("promoting_to_none_file_1") == OBJ_CACHE_NONE);
+  ASSERT_TRUE(m_simple_policy->get_status("promoting_to_none_file_2") == OBJ_CACHE_NONE);
+  ASSERT_TRUE(m_simple_policy->get_status("promoting_to_none_file_3") == OBJ_CACHE_SKIP);
+  ASSERT_TRUE(m_simple_policy->get_status("promoting_to_none_file_4") == OBJ_CACHE_SKIP);
+
+  m_simple_policy->update_status("promoting_to_none_file_3", OBJ_CACHE_NONE);
+  ASSERT_TRUE(m_simple_policy->get_promoting_entry_num() == 1);
+  ASSERT_TRUE(m_simple_policy->get_status("promoting_to_none_file_1") == OBJ_CACHE_NONE);
+  ASSERT_TRUE(m_simple_policy->get_status("promoting_to_none_file_2") == OBJ_CACHE_NONE);
+  ASSERT_TRUE(m_simple_policy->get_status("promoting_to_none_file_3") == OBJ_CACHE_NONE);
+  ASSERT_TRUE(m_simple_policy->get_status("promoting_to_none_file_4") == OBJ_CACHE_SKIP);
+
+  m_simple_policy->update_status("promoting_to_none_file_4", OBJ_CACHE_NONE);
+  ASSERT_TRUE(m_simple_policy->get_promoting_entry_num() == 0);
+  ASSERT_TRUE(m_simple_policy->get_status("promoting_to_none_file_1") == OBJ_CACHE_NONE);
+  ASSERT_TRUE(m_simple_policy->get_status("promoting_to_none_file_2") == OBJ_CACHE_NONE);
+  ASSERT_TRUE(m_simple_policy->get_status("promoting_to_none_file_3") == OBJ_CACHE_NONE);
+  ASSERT_TRUE(m_simple_policy->get_status("promoting_to_none_file_4") == OBJ_CACHE_NONE);
+}
+
+TEST_F(TestSimplePolicy, test_update_state_from_promoted_to_none) {
+  ASSERT_TRUE(m_promoted_lru.size() == m_simple_policy->get_promoted_entry_num());
+  for(uint64_t index = 0; index < m_entry_index; index++) {
+    ASSERT_TRUE(m_simple_policy->get_status(generate_file_name(index)) == OBJ_CACHE_PROMOTED);
+    m_simple_policy->update_status(generate_file_name(index), OBJ_CACHE_NONE);
+    ASSERT_TRUE(m_simple_policy->get_status(generate_file_name(index)) == OBJ_CACHE_NONE);
+    ASSERT_TRUE(m_simple_policy->get_promoted_entry_num() == m_promoted_lru.size() - index - 1);
+  }
+  m_promoted_lru.clear();
+}
+
+TEST_F(TestSimplePolicy, test_update_state_from_promoting_to_promoted) {
+  ASSERT_TRUE(m_entry_num - m_promoting_lru.size() - m_promoted_lru.size() == m_simple_policy->get_free_entry_num());
+  insert_entry_into_promoting_lru("promoting_to_promoted_file_1");
+  insert_entry_into_promoting_lru("promoting_to_promoted_file_2");
+  insert_entry_into_promoting_lru("promoting_to_promoted_file_3");
+  insert_entry_into_promoting_lru("promoting_to_promoted_file_4");
+  ASSERT_TRUE(4 == m_simple_policy->get_promoting_entry_num());
+
+  m_simple_policy->update_status("promoting_to_promoted_file_1", OBJ_CACHE_PROMOTED);
+  ASSERT_TRUE(3 == m_simple_policy->get_promoting_entry_num());
+  ASSERT_TRUE(m_simple_policy->get_status("promoting_to_promoted_file_1") == OBJ_CACHE_PROMOTED);
+
+  m_simple_policy->update_status("promoting_to_promoted_file_2", OBJ_CACHE_PROMOTED);
+  ASSERT_TRUE(2 == m_simple_policy->get_promoting_entry_num());
+  ASSERT_TRUE(m_simple_policy->get_status("promoting_to_promoted_file_2") == OBJ_CACHE_PROMOTED);
+
+  m_simple_policy->update_status("promoting_to_promoted_file_3", OBJ_CACHE_PROMOTED);
+  ASSERT_TRUE(1 == m_simple_policy->get_promoting_entry_num());
+  ASSERT_TRUE(m_simple_policy->get_status("promoting_to_promoted_file_3") == OBJ_CACHE_PROMOTED);
+
+  m_simple_policy->update_status("promoting_to_promoted_file_4", OBJ_CACHE_PROMOTED);
+  ASSERT_TRUE(0 == m_simple_policy->get_promoting_entry_num());
+  ASSERT_TRUE(m_simple_policy->get_status("promoting_to_promoted_file_4") == OBJ_CACHE_PROMOTED);
+
+  m_promoted_lru.push_back("promoting_to_promoted_file_1");
+  m_promoted_lru.push_back("promoting_to_promoted_file_2");
+  m_promoted_lru.push_back("promoting_to_promoted_file_3");
+  m_promoted_lru.push_back("promoting_to_promoted_file_4");
+}
+
+TEST_F(TestSimplePolicy, test_evict_list_0) {
+  std::list<std::string> evict_entry_list;
+  // 0.1 is watermark
+  ASSERT_TRUE((float)m_simple_policy->get_free_entry_num() > m_entry_num*0.1);
+  m_simple_policy->get_evict_list(&evict_entry_list);
+  ASSERT_TRUE(evict_entry_list.size() == 0);
+}
+
+TEST_F(TestSimplePolicy, test_evict_list_10) {
+  uint64_t left_entry_num = m_entry_num - m_promoted_lru.size() - m_promoting_lru.size();
+  for(uint64_t i = 0; i < left_entry_num; i++, ++m_entry_index) {
+    insert_entry_into_promoted_lru(generate_file_name(m_entry_index));
+  }
+  ASSERT_TRUE(0 == m_simple_policy->get_free_entry_num());
+  std::list<std::string> evict_entry_list;
+  m_simple_policy->get_evict_list(&evict_entry_list);
+  ASSERT_TRUE(10 == evict_entry_list.size());
+  ASSERT_TRUE(m_entry_num - 10  == m_simple_policy->get_promoted_entry_num());
+
+  for(auto it = evict_entry_list.begin(); it != evict_entry_list.end(); it++) {
+    ASSERT_TRUE(*it == m_promoted_lru.front());
+    m_promoted_lru.erase(m_promoted_lru.begin());
+  }
+}
diff --git a/src/test/ceph_immutable_object_cache/test_main.cc b/src/test/ceph_immutable_object_cache/test_main.cc
new file mode 100644 (file)
index 0000000..0c68e64
--- /dev/null
@@ -0,0 +1,30 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "include/rados/librados.hpp"
+#include "global/global_context.h"
+#include "test/librados/test_cxx.h"
+#include "gtest/gtest.h"
+#include <iostream>
+#include <string>
+
+int main(int argc, char **argv)
+{
+  ::testing::InitGoogleTest(&argc, argv);
+
+  librados::Rados rados;
+  std::string result = connect_cluster_pp(rados);
+  if (result != "" ) {
+    std::cerr << result << std::endl;
+    return 1;
+  }
+
+  g_ceph_context = reinterpret_cast<CephContext*>(rados.cct());
+
+  int r = rados.conf_set("lockdep", "true");
+  if (r < 0) {
+    std::cerr << "failed to enable lockdep" << std::endl;
+    return -r;
+  }
+  return RUN_ALL_TESTS();
+}
diff --git a/src/test/ceph_immutable_object_cache/test_sync_file.cc b/src/test/ceph_immutable_object_cache/test_sync_file.cc
new file mode 100644 (file)
index 0000000..19352d4
--- /dev/null
@@ -0,0 +1,110 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "gtest/gtest.h"
+#include "include/Context.h"
+#include "include/buffer_fwd.h"
+#include "common/Mutex.h"
+#include "common/Cond.h"
+#include "global/global_init.h"
+#include "common/ceph_argparse.h"
+#include "global/global_context.h"
+#include <experimental/filesystem>
+
+#include "tools/ceph_immutable_object_cache/ObjectCacheFile.h"
+
+using namespace ceph::immutable_obj_cache;
+namespace efs = std::experimental::filesystem;
+
+class TestSyncFile :public ::testing::Test {
+public:
+  std::string m_cache_root_dir;
+
+  TestSyncFile(){}
+  ~TestSyncFile(){}
+  static void SetUpTestCase() {}
+  static void TearDownTestCase() {}
+
+  void SetUp() override {
+    m_cache_root_dir = g_ceph_context->_conf.get_val<std::string>("rbd_shared_cache_path")
+      + "/ceph_immutable_obj_cache/";
+
+    if (efs::exists(m_cache_root_dir)) {
+      efs::remove_all(m_cache_root_dir);
+    }
+    efs::create_directories(m_cache_root_dir);
+  }
+
+  void TearDown() override {
+    efs::remove_all(m_cache_root_dir);
+  }
+
+};
+
+TEST_F(TestSyncFile, test_create_file) {
+  SyncFile* m_sync_file = new SyncFile(g_ceph_context, "test_sync_file");
+  ASSERT_TRUE(m_sync_file->create() >  0);
+  ASSERT_TRUE(m_sync_file->get_file_size() == 0);
+  delete m_sync_file;
+}
+
+
+TEST_F(TestSyncFile, test_open_file) {
+  SyncFile* m_sync_file = new SyncFile(g_ceph_context, "test_sync_file");
+  ASSERT_EQ(m_sync_file->open_file(), -1);
+  ASSERT_GT(m_sync_file->create(), 0);
+  ASSERT_GT(m_sync_file->open_file(), 0);
+  delete m_sync_file;
+}
+
+TEST_F(TestSyncFile, test_write_object_to_file) {
+  SyncFile* m_sync_file_1 = new SyncFile(g_ceph_context, "test_sync_file_1");
+  SyncFile* m_sync_file_2 = new SyncFile(g_ceph_context, "test_sync_file_2");
+  ASSERT_GT(m_sync_file_1->create(), 0);
+  ASSERT_GT(m_sync_file_2->create(), 0);
+  ASSERT_TRUE(m_sync_file_1->get_file_size() == 0);
+  ASSERT_TRUE(m_sync_file_2->get_file_size() == 0);
+  bufferlist* buf_1 = new ceph::bufferlist();
+  bufferlist* buf_2 = new ceph::bufferlist();
+  buf_1->append(std::string(1024, '0'));
+  buf_2->append(std::string(4096, '0'));
+  ASSERT_TRUE(m_sync_file_1->write_object_to_file(*buf_1, 1024) == 1024);
+  ASSERT_TRUE(m_sync_file_2->write_object_to_file(*buf_2, 4096) == 4096);
+  ASSERT_TRUE(m_sync_file_1->get_file_size() == 1024);
+  ASSERT_TRUE(m_sync_file_2->get_file_size() == 4096);
+  delete m_sync_file_1;
+  delete m_sync_file_2;
+  delete buf_1;
+  delete buf_2;
+}
+
+TEST_F(TestSyncFile, test_read_object_from_file) {
+  SyncFile* m_sync_file_1 = new SyncFile(g_ceph_context, "test_sync_file_1");
+  SyncFile* m_sync_file_2 = new SyncFile(g_ceph_context, "test_sync_file_2");
+  bufferlist* buf_1 = new ceph::bufferlist();
+  bufferlist* buf_2 = new ceph::bufferlist();
+
+  ASSERT_EQ(m_sync_file_1->read_object_from_file(buf_1, 0, 1024), -1);
+  ASSERT_EQ(m_sync_file_2->read_object_from_file(buf_2, 0, 1024), -1);
+
+  ASSERT_GT(m_sync_file_1->create(), 0);
+  ASSERT_GT(m_sync_file_2->create(), 0);
+  ASSERT_TRUE(m_sync_file_1->get_file_size() == 0);
+  ASSERT_TRUE(m_sync_file_2->get_file_size() == 0);
+  ASSERT_EQ(m_sync_file_1->read_object_from_file(buf_1, 0, 1024), 0);
+  ASSERT_EQ(m_sync_file_2->read_object_from_file(buf_2, 0, 1024), 0);
+
+  buf_1->append(std::string(1024, '0'));
+  buf_2->append(std::string(4096, '2'));
+  ASSERT_TRUE(m_sync_file_1->write_object_to_file(*buf_1, 1024) == 1024);
+  ASSERT_TRUE(m_sync_file_2->write_object_to_file(*buf_2, 4096) == 4096);
+  ASSERT_TRUE(m_sync_file_1->get_file_size() == 1024);
+  ASSERT_TRUE(m_sync_file_2->get_file_size() == 4096);
+  ASSERT_EQ(m_sync_file_1->read_object_from_file(buf_1, 0, 1024), 1024);
+  ASSERT_EQ(m_sync_file_2->read_object_from_file(buf_2, 0, 4096), 4096);
+
+  delete m_sync_file_1;
+  delete m_sync_file_2;
+  delete buf_1;
+  delete buf_2;
+}
index d31f3d240844abe4d5c9f6ba978d764febb50bfa..f8c53a3334d096f8c2beabc43ae7d0df660bf50f 100644 (file)
@@ -122,4 +122,5 @@ if(WITH_RBD)
   endif()
 endif(WITH_RBD)
 
+add_subdirectory(ceph_immutable_object_cache)
 add_subdirectory(ceph-dencoder)
diff --git a/src/tools/ceph_immutable_object_cache/CMakeLists.txt b/src/tools/ceph_immutable_object_cache/CMakeLists.txt
new file mode 100644 (file)
index 0000000..cf0decf
--- /dev/null
@@ -0,0 +1,23 @@
+set(ceph_immutable_object_cache_files
+  ObjectCacheFile.cc
+  ObjectCacheStore.cc
+  CacheController.cc
+  CacheServer.cc
+  CacheClient.cc
+  CacheSession.cc
+  SimplePolicy.cc)
+add_library(ceph_immutable_object_cache_lib STATIC ${ceph_immutable_object_cache_files})
+
+add_executable(ceph-immutable-object-cache
+  ObjectCacheFile.cc
+  ObjectCacheStore.cc
+  CacheController.cc
+  CacheServer.cc
+  CacheSession.cc
+  SimplePolicy.cc
+  main.cc)
+target_link_libraries(ceph-immutable-object-cache
+  librados-cxx
+  stdc++fs
+  global)
+install(TARGETS ceph-immutable-object-cache DESTINATION bin)
diff --git a/src/tools/ceph_immutable_object_cache/CacheClient.cc b/src/tools/ceph_immutable_object_cache/CacheClient.cc
new file mode 100644 (file)
index 0000000..7a54c70
--- /dev/null
@@ -0,0 +1,228 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "CacheClient.h"
+
+#define dout_context g_ceph_context
+#define dout_subsys ceph_subsys_immutable_obj_cache
+#undef dout_prefix
+#define dout_prefix *_dout << "ceph::cache::CacheControllerSocketClient: " << this << " " \
+                           << __func__ << ": "
+
+
+using boost::asio::local::stream_protocol;
+
+namespace ceph {
+namespace immutable_obj_cache {
+
+  // Build a client bound to the domain-socket path 'file'. No connection is
+  // made here; call connect() to attach and run() to start the io thread.
+  CacheClient::CacheClient(const std::string& file, CephContext* ceph_ctx)
+    : m_io_service_work(m_io_service),
+      m_dm_socket(m_io_service),
+      m_ep(stream_protocol::endpoint(file)),
+      m_io_thread(nullptr),
+      m_session_work(false),
+      cct(ceph_ctx)
+  {}
+
+  // Tears the session down via stop() (stops io_service, joins io thread).
+  CacheClient::~CacheClient() {
+    stop();
+  }
+
+  // Spawn the thread that drives the boost::asio io_service; the
+  // io_service::work member keeps run() from returning while idle.
+  void CacheClient::run(){
+     m_io_thread.reset(new std::thread([this](){m_io_service.run(); }));
+  }
+
+  // True while the session is usable; cleared by close()/stop().
+  bool CacheClient::is_session_work() {
+    return m_session_work.load() == true;
+  }
+
+  // Mark the session dead, stop the io_service and join the io thread.
+  // Always returns 0.
+  int CacheClient::stop() {
+    m_session_work.store(false);
+    m_io_service.stop();
+
+    if(m_io_thread != nullptr) {
+      m_io_thread->join();
+    }
+    return 0;
+  }
+
+  // just when error occur, call this method.
+  // Close the socket and flag the session unusable; callers are expected to
+  // fall back to reading directly from RADOS afterwards.
+  void CacheClient::close() {
+    m_session_work.store(false);
+    boost::system::error_code close_ec;
+    m_dm_socket.close(close_ec);
+    if(close_ec) {
+       ldout(cct, 20) << "close: " << close_ec.message() << dendl;
+    }
+    ldout(cct, 20) << "session don't work, later all request will be dispatched to rados layer" << dendl;
+  }
+
+  // Synchronously connect to the cache controller's domain socket.
+  // @return 0 on success, -1 on failure (any half-open socket is closed).
+  int CacheClient::connect() {
+    boost::system::error_code ec;
+    m_dm_socket.connect(m_ep, ec);
+    if(ec) {
+      // connection_refused most likely means the daemon is not running;
+      // that is a soft failure (reads go to the cluster instead).
+      if(ec == boost::asio::error::connection_refused) {
+        ldout(cct, 20) << ec.message() << " : maybe rbd-cache Controller don't startup. "
+                  << "Now data will be read from ceph cluster " << dendl;
+      } else {
+        ldout(cct, 20) << "connect: " << ec.message() << dendl;
+      }
+
+      if(m_dm_socket.is_open()) {
+        // Set to indicate what error occurred, if any.
+        // Note that, even if the function indicates an error,
+        // the underlying descriptor is closed.
+        boost::system::error_code close_ec;
+        m_dm_socket.close(close_ec);
+        if(close_ec) {
+          ldout(cct, 20) << "close: " << close_ec.message() << dendl;
+        }
+      }
+      return -1;
+    }
+
+    ldout(cct, 20) <<"connect success"<< dendl;
+
+    return 0;
+  }
+
+  int CacheClient::register_volume(std::string pool_name, std::string vol_name, uint64_t vol_size, Context* on_finish) {
+    // cache controller will init layout
+    rbdsc_req_type_t *message = new rbdsc_req_type_t();
+    message->type = RBDSC_REGISTER;
+    memcpy(message->pool_name, pool_name.c_str(), pool_name.size());
+    memcpy(message->vol_name, vol_name.c_str(), vol_name.size());
+    message->vol_size = vol_size;
+    message->offset = 0;
+    message->length = 0;
+
+    uint64_t ret;
+    boost::system::error_code ec;
+
+    ret = boost::asio::write(m_dm_socket, boost::asio::buffer((char*)message, message->size()), ec);
+    if(ec) {
+      ldout(cct, 20) << "write fails : " << ec.message() << dendl;
+      return -1;
+    }
+
+    if(ret != message->size()) {
+      ldout(cct, 20) << "write fails : ret != send_bytes " << dendl;
+      return -1;
+    }
+
+    // hard code TODO
+    ret = boost::asio::read(m_dm_socket, boost::asio::buffer(m_recv_buffer, RBDSC_MSG_LEN), ec);
+    if(ec == boost::asio::error::eof) {
+      ldout(cct, 20) << "recv eof" << dendl;
+      return -1;
+    }
+
+    if(ec) {
+      ldout(cct, 20) << "write fails : " << ec.message() << dendl;
+      return -1;
+    }
+
+    if(ret != RBDSC_MSG_LEN) {
+      ldout(cct, 20) << "write fails : ret != receive bytes " << dendl;
+      return -1;
+    }
+
+    std::string reply_msg(m_recv_buffer, ret);
+    rbdsc_req_type_t *io_ctx = (rbdsc_req_type_t*)(reply_msg.c_str());
+
+    if (io_ctx->type == RBDSC_REGISTER_REPLY) {
+      on_finish->complete(true);
+    } else {
+      on_finish->complete(false);
+    }
+
+    delete message;
+
+    ldout(cct, 20) << "register volume success" << dendl;
+
+    // TODO
+    m_session_work.store(true);
+
+    return 0;
+  }
+
+  // if occur any error, we just return false. Then read from rados.
+  // Asynchronously send a lookup request for one object; the reply is read
+  // by get_result(), which completes 'on_finish' (true = cache hit).
+  // Always returns 0; errors are reported through on_finish->complete(false).
+  int CacheClient::lookup_object(std::string pool_name, std::string vol_name, std::string object_id, Context* on_finish) {
+    rbdsc_req_type_t *message = new rbdsc_req_type_t();
+    message->type = RBDSC_READ;
+    memcpy(message->pool_name, pool_name.c_str(), pool_name.size());
+    memcpy(message->vol_name, vol_name.c_str(), vol_name.size());
+    memcpy(message->oid, object_id.c_str(), object_id.size());
+    message->vol_size = 0;
+    message->offset = 0;
+    message->length = 0;
+
+    // The request struct is owned by the completion lambda, which frees it
+    // on every path before returning.
+    boost::asio::async_write(m_dm_socket,
+                             boost::asio::buffer((char*)message, message->size()),
+                             boost::asio::transfer_exactly(RBDSC_MSG_LEN),
+        [this, on_finish, message](const boost::system::error_code& err, size_t cb) {
+          delete message;
+          if(err) {
+            ldout(cct, 20) << "lookup_object: async_write fails." << err.message() << dendl;
+            close();
+            on_finish->complete(false);
+            return;
+          }
+          if(cb != RBDSC_MSG_LEN) {
+            ldout(cct, 20) << "lookup_object: async_write fails. in-complete request" << dendl;
+            close();
+            on_finish->complete(false);
+            return;
+          }
+          get_result(on_finish);
+    });
+
+    return 0;
+  }
+
+  void CacheClient::get_result(Context* on_finish) {
+    char* lookup_result = new char[RBDSC_MSG_LEN + 1];
+    boost::asio::async_read(m_dm_socket, boost::asio::buffer(lookup_result, RBDSC_MSG_LEN),
+                            boost::asio::transfer_exactly(RBDSC_MSG_LEN),
+        [this, lookup_result, on_finish](const boost::system::error_code& err, size_t cb) {
+          if(err == boost::asio::error::eof ||
+            err == boost::asio::error::connection_reset ||
+            err == boost::asio::error::operation_aborted ||
+            err == boost::asio::error::bad_descriptor) {
+            ldout(cct, 20) << "fail to read lookup result" << err.message() << dendl;
+            close();
+            on_finish->complete(false);
+            delete lookup_result;
+            return;
+          }
+
+          if(err) {
+            ldout(cct, 1) << "fail to read lookup result" << err.message() << dendl;
+            close();
+            on_finish->complete(false);
+            delete lookup_result;
+            return;
+          }
+
+          if (cb != RBDSC_MSG_LEN) {
+            ldout(cct, 1) << "incomplete lookup result" << dendl;
+            close();
+            on_finish->complete(false);
+            delete lookup_result;
+            return;
+          }
+
+         rbdsc_req_type_t *io_ctx = (rbdsc_req_type_t*)(lookup_result);
+
+          if (io_ctx->type == RBDSC_READ_REPLY) {
+           on_finish->complete(true);
+          } else {
+           on_finish->complete(false);
+          }
+          delete lookup_result;
+          return;
+    });
+  }
+
+} // namespace immutable_obj_cache
+} // namespace ceph
diff --git a/src/tools/ceph_immutable_object_cache/CacheClient.h b/src/tools/ceph_immutable_object_cache/CacheClient.h
new file mode 100644 (file)
index 0000000..ad1f9f9
--- /dev/null
@@ -0,0 +1,56 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#ifndef CEPH_CACHE_CLIENT_H
+#define CEPH_CACHE_CLIENT_H
+
+#include <atomic>
+#include <boost/asio.hpp>
+#include <boost/bind.hpp>
+#include <boost/asio/error.hpp>
+#include <boost/algorithm/string.hpp>
+#include "librbd/ImageCtx.h"
+#include "include/ceph_assert.h"
+#include "include/Context.h"
+#include "SocketCommon.h"
+
+
+using boost::asio::local::stream_protocol;
+
+namespace ceph {
+namespace immutable_obj_cache {
+
+// Client side of the immutable-object-cache domain-socket IPC: registers a
+// volume with the cache controller and issues object lookup requests.
+class CacheClient {
+public:
+  CacheClient(const std::string& file, CephContext* ceph_ctx);
+  ~CacheClient();
+  void run();
+  bool is_session_work();
+
+  void close();
+  int stop();
+  int connect();
+
+  // Synchronous registration; on_finish->complete(true) on controller ack.
+  int register_volume(std::string pool_name, std::string vol_name, uint64_t vol_size, Context* on_finish);
+  // Async lookup; on_finish->complete(true) means the object is cached.
+  int lookup_object(std::string pool_name, std::string vol_name, std::string object_id, Context* on_finish);
+  void get_result(Context* on_finish);
+
+private:
+  boost::asio::io_service m_io_service;
+  // Keeps io_service::run() from returning while there is no pending work.
+  boost::asio::io_service::work m_io_service_work;
+  stream_protocol::socket m_dm_socket;
+  ClientProcessMsg m_client_process_msg;
+  stream_protocol::endpoint m_ep;
+  // Receive buffer for the synchronous register_volume() reply.
+  char m_recv_buffer[1024];
+  std::shared_ptr<std::thread> m_io_thread;
+
+  // atomic modify for this variable.
+  // thread 1 : asio callback thread modify it.
+  // thread 2 : librbd read it.
+  std::atomic<bool> m_session_work;
+  CephContext* cct;
+};
+
+} // namespace immutable_obj_cache
+} // namespace ceph
+#endif
diff --git a/src/tools/ceph_immutable_object_cache/CacheController.cc b/src/tools/ceph_immutable_object_cache/CacheController.cc
new file mode 100644 (file)
index 0000000..5b4652c
--- /dev/null
@@ -0,0 +1,95 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "CacheController.h"
+
+#define dout_context g_ceph_context
+#define dout_subsys ceph_subsys_immutable_obj_cache
+#undef dout_prefix
+#define dout_prefix *_dout << "ceph::cache::CacheController: " << this << " " \
+                           << __func__ << ": "
+
+namespace ceph {
+namespace immutable_obj_cache {
+
+// Construct the controller; heavy setup is deferred to init()/run().
+// Pointer members are null-initialized so the destructor's unconditional
+// deletes are safe even if init()/run() were never called (the original
+// left m_cache_server/m_object_cache_store uninitialized until then).
+CacheController::CacheController(CephContext *cct, const std::vector<const char*> &args):
+  m_cache_server(nullptr), m_args(args), m_cct(cct),
+  m_object_cache_store(nullptr), pcache_op_work_queue(nullptr) {
+  ldout(m_cct, 20) << dendl;
+}
+
+// NOTE(review): m_cache_server is only assigned inside run(); destroying a
+// controller that never ran deletes an uninitialized pointer — confirm that
+// the members are null-initialized before relying on this destructor.
+CacheController::~CacheController() {
+  delete m_cache_server;
+  delete m_object_cache_store;
+}
+
+// Create the object cache store and connect it to the cluster.
+// @return 0 on success, negative error code from the store otherwise.
+int CacheController::init() {
+  ldout(m_cct, 20) << dendl;
+
+  m_object_cache_store = new ObjectCacheStore(m_cct, pcache_op_work_queue);
+  //TODO(): make this configurable
+  int r = m_object_cache_store->init(true);
+  if (r < 0) {
+    lderr(m_cct) << "init error\n" << dendl;
+  }
+  return r;
+}
+
+// Shut down the backing store; returns its result code.
+int CacheController::shutdown() {
+  ldout(m_cct, 20) << dendl;
+
+  int r = m_object_cache_store->shutdown();
+  return r;
+}
+
+// Placeholder: signals are currently ignored.
+void CacheController::handle_signal(int signum){}
+
+// Main loop: remove any stale socket file, start the domain-socket server
+// and block in its io_service loop. Exceptions are logged, not rethrown.
+void CacheController::run() {
+  try {
+    std::string controller_path = m_cct->_conf.get_val<std::string>("rbd_shared_cache_sock");
+    std::remove(controller_path.c_str());
+
+    m_cache_server = new CacheServer(m_cct, controller_path,
+      ([&](uint64_t p, std::string s){handle_request(p, s);}));
+
+    int ret = m_cache_server->run();
+    if (ret != 0) {
+      throw std::runtime_error("io serivce run error");
+    }
+  } catch (std::exception& e) {
+    lderr(m_cct) << "Exception: " << e.what() << dendl;
+  }
+}
+
+void CacheController::handle_request(uint64_t session_id, std::string msg){
+  ldout(m_cct, 20) << dendl;
+
+  rbdsc_req_type_t *io_ctx = (rbdsc_req_type_t*)(msg.c_str());
+
+  switch (io_ctx->type) {
+    case RBDSC_REGISTER: {
+      // init cache layout for volume
+      m_object_cache_store->init_cache(io_ctx->pool_name, io_ctx->vol_name, io_ctx->vol_size);
+      io_ctx->type = RBDSC_REGISTER_REPLY;
+      m_cache_server->send(session_id, std::string((char*)io_ctx, msg.size()));
+
+      break;
+    }
+    case RBDSC_READ: {
+      // lookup object in local cache store
+      int ret = m_object_cache_store->lookup_object(io_ctx->pool_name, io_ctx->vol_name, io_ctx->oid);
+      if (ret < 0) {
+        io_ctx->type = RBDSC_READ_RADOS;
+      } else {
+        io_ctx->type = RBDSC_READ_REPLY;
+      }
+      m_cache_server->send(session_id, std::string((char*)io_ctx, msg.size()));
+
+      break;
+    }
+    ldout(m_cct, 5) << "can't recongize request" << dendl;
+    assert(0); // TODO replace it.
+  }
+}
+
+} // namespace immutable_obj_cache
+} // namespace ceph
diff --git a/src/tools/ceph_immutable_object_cache/CacheController.h b/src/tools/ceph_immutable_object_cache/CacheController.h
new file mode 100644 (file)
index 0000000..39a85ea
--- /dev/null
@@ -0,0 +1,53 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#ifndef CEPH_CACHE_CONTROLLER_H
+#define CEPH_CACHE_CONTROLLER_H
+
+#include "common/Formatter.h"
+#include "common/admin_socket.h"
+#include "common/debug.h"
+#include "common/errno.h"
+#include "common/ceph_context.h"
+#include "common/Mutex.h"
+#include "common/WorkQueue.h"
+#include "include/rados/librados.hpp"
+#include "include/rbd/librbd.h"
+#include "include/ceph_assert.h"
+#include "librbd/ImageCtx.h"
+#include "librbd/ImageState.h"
+#include "CacheServer.h"
+#include "ObjectCacheStore.h"
+
+#include <thread>
+
+namespace ceph {
+namespace immutable_obj_cache {
+
+// Daemon driver: owns the domain-socket server and the object cache store,
+// and dispatches client requests between them.
+class CacheController {
+ public:
+  CacheController(CephContext *cct, const std::vector<const char*> &args);
+  ~CacheController();
+
+  // Connect the store to the cluster; 0 on success, negative on error.
+  int init();
+
+  int shutdown();
+
+  // Currently a no-op placeholder for signal handling.
+  void handle_signal(int sinnum);
+
+  // Blocks running the domain-socket server loop.
+  void run();
+
+  // Callback invoked by the server for each complete request message.
+  void handle_request(uint64_t sesstion_id, std::string msg);
+
+ private:
+  CacheServer *m_cache_server;
+  std::vector<const char*> m_args;
+  CephContext *m_cct;
+  ObjectCacheStore *m_object_cache_store;
+  ContextWQ* pcache_op_work_queue;
+};
+
+} // namespace immutable_obj_cache
+} // namespace ceph
+
+#endif
diff --git a/src/tools/ceph_immutable_object_cache/CacheServer.cc b/src/tools/ceph_immutable_object_cache/CacheServer.cc
new file mode 100644 (file)
index 0000000..1b2fe2e
--- /dev/null
@@ -0,0 +1,114 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "common/debug.h"
+#include "common/ceph_context.h"
+#include "CacheServer.h"
+
+#define dout_context g_ceph_context
+#define dout_subsys ceph_subsys_immutable_obj_cache
+#undef dout_prefix
+#define dout_prefix *_dout << "ceph::cache::CacheControllerSocket: " << this << " " \
+                           << __func__ << ": "
+
+
+using boost::asio::local::stream_protocol;
+
+namespace ceph {
+namespace immutable_obj_cache {
+
+// Construct the server bound to domain-socket path 'file'; 'processmsg' is
+// invoked for every complete request received from any session.
+CacheServer::CacheServer(CephContext* cct, const std::string& file, ProcessMsg processmsg)
+  : cct(cct), m_server_process_msg(processmsg),
+    m_local_path(file), m_acceptor(m_io_service) {}
+
+CacheServer::~CacheServer() {
+  stop();
+}
+
+// Start accepting connections and block in the io_service loop.
+// @return 0 on clean exit, -1 (or start_accept's error) on failure.
+int CacheServer::run() {
+  ldout(cct, 20) << dendl;
+
+  int ret = start_accept();
+  if(ret != 0) {
+    return ret;
+  }
+
+  boost::system::error_code ec;
+  ret = m_io_service.run(ec);
+  if(ec) {
+    ldout(cct, 1) << "m_io_service run fails: " << ec.message() << dendl;
+    return -1;
+  }
+  return 0;
+}
+
+// Stop the io_service; pending handlers are abandoned. Always returns 0.
+int CacheServer::stop() {
+  m_io_service.stop();
+  return 0;
+}
+
+// Reply to the session identified by session_id. Aborts (assert) if the id
+// is unknown — a reply for a session that was never accepted is a bug.
+void CacheServer::send(uint64_t session_id, std::string msg) {
+  ldout(cct, 20) << dendl;
+
+  auto it = m_session_map.find(session_id);
+  if (it != m_session_map.end()) {
+    it->second->send(msg);
+  } else {
+    ldout(cct, 20) << "missing reply session id" << dendl;
+    assert(0);
+  }
+}
+
+// Open/bind/listen on the local endpoint and queue the first async accept.
+// @return 0 on success, -1 on any acceptor setup error.
+int CacheServer::start_accept() {
+  ldout(cct, 20) << dendl;
+
+  boost::system::error_code ec;
+  m_acceptor.open(m_local_path.protocol(), ec);
+  if(ec) {
+    ldout(cct, 1) << "m_acceptor open fails: " << ec.message() << dendl;
+    return -1;
+  }
+
+  m_acceptor.bind(m_local_path, ec);
+  if(ec) {
+    ldout(cct, 1) << "m_acceptor bind fails: " << ec.message() << dendl;
+    return -1;
+  }
+
+  m_acceptor.listen(boost::asio::socket_base::max_connections, ec);
+  if(ec) {
+    ldout(cct, 1) << "m_acceptor listen fails: " << ec.message() << dendl;
+    return -1;
+  }
+
+  accept();
+  return 0;
+}
+
+// Queue one async accept with a freshly allocated session; the session id
+// is assigned (and the counter advanced) in handle_accept on success.
+void CacheServer::accept() {
+
+  CacheSessionPtr new_session(new CacheSession(m_session_id, m_io_service, m_server_process_msg, cct));
+  m_acceptor.async_accept(new_session->socket(),
+      boost::bind(&CacheServer::handle_accept, this, new_session,
+        boost::asio::placeholders::error));
+}
+
+// Completion handler for accept(): register the session, start reading
+// from it, and immediately queue the next accept.
+void CacheServer::handle_accept(CacheSessionPtr new_session, const boost::system::error_code& error) {
+  ldout(cct, 20) << dendl;
+  if (error) {
+    // operation_aborted
+    lderr(cct) << "async accept fails : " << error.message() << dendl;
+    return;
+  }
+
+  m_session_map.emplace(m_session_id, new_session);
+  // TODO : session setting
+  new_session->start();
+  m_session_id++;
+
+  // launch next accept
+  accept();
+}
+
+} // namespace immutable_obj_cache
+} // namespace ceph
diff --git a/src/tools/ceph_immutable_object_cache/CacheServer.h b/src/tools/ceph_immutable_object_cache/CacheServer.h
new file mode 100644 (file)
index 0000000..05b31c5
--- /dev/null
@@ -0,0 +1,55 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#ifndef CEPH_CACHE_SERVER_H
+#define CEPH_CACHE_SERVER_H
+
+#include <cstdio>
+#include <iostream>
+#include <array>
+#include <memory>
+#include <string>
+#include <boost/bind.hpp>
+#include <boost/asio.hpp>
+#include <boost/asio/error.hpp>
+#include <boost/algorithm/string.hpp>
+
+#include "include/ceph_assert.h"
+#include "SocketCommon.h"
+#include "CacheSession.h"
+
+
+using boost::asio::local::stream_protocol;
+
+namespace ceph {
+namespace immutable_obj_cache {
+
+// Domain-socket server: accepts client connections, keeps one CacheSession
+// per connection and forwards complete messages to a ProcessMsg callback.
+class CacheServer {
+
+ public:
+  CacheServer(CephContext* cct, const std::string& file, ProcessMsg processmsg);
+  ~CacheServer();
+
+  // Blocks in the io_service loop; 0 on clean exit, -1 on error.
+  int run();
+  // Reply on the session identified by session_id (asserts if unknown).
+  void send(uint64_t session_id, std::string msg);
+  int start_accept();
+  int stop();
+
+ private:
+  void accept();
+  void handle_accept(CacheSessionPtr new_session, const boost::system::error_code& error);
+
+ private:
+  CephContext* cct;
+  boost::asio::io_service m_io_service; // TODO wrapper it.
+  ProcessMsg m_server_process_msg;
+  stream_protocol::endpoint m_local_path;
+  stream_protocol::acceptor m_acceptor;
+  // Next id to hand out; ids start at 1 and only grow.
+  uint64_t m_session_id = 1;
+  std::map<uint64_t, CacheSessionPtr> m_session_map;
+};
+
+} // namespace immutable_obj_cache
+} // namespace ceph
+
+#endif
diff --git a/src/tools/ceph_immutable_object_cache/CacheSession.cc b/src/tools/ceph_immutable_object_cache/CacheSession.cc
new file mode 100644 (file)
index 0000000..1c06bc7
--- /dev/null
@@ -0,0 +1,108 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "common/debug.h"
+#include "common/ceph_context.h"
+#include "CacheSession.h"
+
+#define dout_context g_ceph_context
+#define dout_subsys ceph_subsys_immutable_obj_cache
+#undef dout_prefix
+#define dout_prefix *_dout << "ceph::cache::CacheSession: " << this << " " \
+                           << __func__ << ": "
+
+
+namespace ceph {
+namespace immutable_obj_cache {
+
+// One accepted client connection; 'processmsg' receives each complete
+// request together with this session's id.
+CacheSession::CacheSession(uint64_t session_id, boost::asio::io_service& io_service, ProcessMsg processmsg, CephContext* cct)
+    : m_session_id(session_id), m_dm_socket(io_service), process_msg(processmsg), cct(cct)
+    {}
+
+CacheSession::~CacheSession() {
+  close();
+}
+
+// Expose the raw socket so the acceptor can accept into it.
+stream_protocol::socket& CacheSession::socket() {
+  return m_dm_socket;
+}
+
+// Close the socket if it is still open; close errors are only logged.
+void CacheSession::close() {
+  if(m_dm_socket.is_open()) {
+    boost::system::error_code close_ec;
+    m_dm_socket.close(close_ec);
+    if(close_ec) {
+       ldout(cct, 20) << "close: " << close_ec.message() << dendl;
+    }
+  }
+}
+
+// Begin serving this session by arming the first read.
+void CacheSession::start() {
+  handing_request();
+}
+
+// Arm an async read for exactly one fixed-size request message;
+// handle_read fires once RBDSC_MSG_LEN bytes have arrived (or on error).
+void CacheSession::handing_request() {
+  boost::asio::async_read(m_dm_socket, boost::asio::buffer(m_buffer, RBDSC_MSG_LEN),
+                          boost::asio::transfer_exactly(RBDSC_MSG_LEN),
+                          boost::bind(&CacheSession::handle_read,
+                                      shared_from_this(),
+                                      boost::asio::placeholders::error,
+                                      boost::asio::placeholders::bytes_transferred));
+}
+
+// Completion handler for one request read: validate, then hand the raw
+// message to process_msg. The next read is re-armed from handle_write
+// after the reply is sent.
+// NOTE(review): on a generic error or a short read this returns without
+// closing the socket or re-arming a read, silently stalling the session —
+// confirm whether close() is intended here.
+void CacheSession::handle_read(const boost::system::error_code& err, size_t bytes_transferred) {
+  if (err == boost::asio::error::eof ||
+     err == boost::asio::error::connection_reset ||
+     err == boost::asio::error::operation_aborted ||
+     err == boost::asio::error::bad_descriptor) {
+    ldout(cct, 20) << "fail to handle read : " << err.message() << dendl;
+    close();
+    return;
+  }
+
+  if(err) {
+    ldout(cct, 1) << "faile to handle read: " << err.message() << dendl;
+    return;
+  }
+
+  if(bytes_transferred != RBDSC_MSG_LEN) {
+    ldout(cct, 1) << "incomplete read" << dendl;
+    return;
+  }
+
+  process_msg(m_session_id, std::string(m_buffer, bytes_transferred));
+}
+
+// Completion handler for send(): asserts the reply went out completely,
+// then arms the read for the client's next request.
+void CacheSession::handle_write(const boost::system::error_code& error, size_t bytes_transferred) {
+  if (error) {
+    ldout(cct, 20) << "session: async_write fails: " << error.message() << dendl;
+    assert(0);
+  }
+
+  if(bytes_transferred != RBDSC_MSG_LEN) {
+    ldout(cct, 20) << "session : reply in-complete. "<<dendl;
+    assert(0);
+  }
+
+  boost::asio::async_read(m_dm_socket, boost::asio::buffer(m_buffer),
+                          boost::asio::transfer_exactly(RBDSC_MSG_LEN),
+                          boost::bind(&CacheSession::handle_read,
+                          shared_from_this(),
+                          boost::asio::placeholders::error,
+                          boost::asio::placeholders::bytes_transferred));
+
+}
+
+// Asynchronously write one fixed-size reply back to the client.
+// NOTE(review): 'msg' is captured by the buffer but not kept alive by the
+// async operation — presumably callers ensure it outlives the write; verify.
+void CacheSession::send(std::string msg) {
+    boost::asio::async_write(m_dm_socket,
+        boost::asio::buffer(msg.c_str(), msg.size()),
+        boost::asio::transfer_exactly(RBDSC_MSG_LEN),
+        boost::bind(&CacheSession::handle_write,
+                    shared_from_this(),
+                    boost::asio::placeholders::error,
+                    boost::asio::placeholders::bytes_transferred));
+
+}
+
+} // namespace immutable_obj_cache
+} // namespace ceph
diff --git a/src/tools/ceph_immutable_object_cache/CacheSession.h b/src/tools/ceph_immutable_object_cache/CacheSession.h
new file mode 100644 (file)
index 0000000..b45e878
--- /dev/null
@@ -0,0 +1,58 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#ifndef CEPH_CACHE_SESSION_H
+#define CEPH_CACHE_SESSION_H
+
+#include <iostream>
+#include <string>
+#include <boost/bind.hpp>
+#include <boost/asio.hpp>
+#include <boost/asio/error.hpp>
+#include <boost/algorithm/string.hpp>
+
+#include "include/ceph_assert.h"
+#include "SocketCommon.h"
+
+
+using boost::asio::local::stream_protocol;
+
+namespace ceph {
+namespace immutable_obj_cache {
+
+// One accepted domain-socket connection; reads fixed-size requests,
+// forwards them to a ProcessMsg callback and writes the replies back.
+// Lifetime is managed by shared_ptr (enable_shared_from_this keeps the
+// session alive while async operations are in flight).
+class CacheSession : public std::enable_shared_from_this<CacheSession> {
+public:
+  CacheSession(uint64_t session_id, boost::asio::io_service& io_service, ProcessMsg processmsg, CephContext* cct);
+  ~CacheSession();
+
+  // Raw socket, used by the acceptor to accept into this session.
+  stream_protocol::socket& socket();
+  void start();
+  void close();
+  void handing_request();
+
+private:
+
+  void handle_read(const boost::system::error_code& error, size_t bytes_transferred);
+
+  void handle_write(const boost::system::error_code& error, size_t bytes_transferred);
+
+public:
+  // Queue one fixed-size reply for this session.
+  void send(std::string msg);
+
+private:
+  uint64_t m_session_id;
+  stream_protocol::socket m_dm_socket;
+  ProcessMsg process_msg;
+  CephContext* cct;
+
+  // Buffer used to store data received from the client.
+  //std::array<char, 1024> data_;
+  char m_buffer[1024];
+};
+
+typedef std::shared_ptr<CacheSession> CacheSessionPtr;
+
+} // namespace immutable_obj_cache
+} // namespace ceph
+
+#endif
diff --git a/src/tools/ceph_immutable_object_cache/ObjectCacheFile.cc b/src/tools/ceph_immutable_object_cache/ObjectCacheFile.cc
new file mode 100644 (file)
index 0000000..9786be1
--- /dev/null
@@ -0,0 +1,111 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "ObjectCacheFile.h"
+#include "include/Context.h"
+#include "common/dout.h"
+#include "common/WorkQueue.h"
+#include "librbd/ImageCtx.h"
+#include <sys/types.h>
+#include <sys/stat.h>
+#include <aio.h>
+#include <errno.h>
+#include <fcntl.h>
+#include <utility>
+
+#define dout_subsys ceph_subsys_immutable_obj_cache
+#undef dout_prefix
+#define dout_prefix *_dout << "ceph::cache::ObjectCacheFile: " << this << " " \
+                           <<  __func__ << ": "
+
+namespace ceph {
+namespace immutable_obj_cache {
+
+// Build the on-disk path <rbd_shared_cache_path>/ceph_immutable_obj_cache/<name>;
+// the file itself is opened lazily via open_file() or create().
+SyncFile::SyncFile(CephContext *cct, const std::string &name)
+  : cct(cct), m_fd(-1) {
+  m_name = cct->_conf.get_val<std::string>("rbd_shared_cache_path") + "/ceph_immutable_obj_cache/" + name;
+  ldout(cct, 20) << "file path=" << m_name << dendl;
+}
+
+SyncFile::~SyncFile() {
+  // TODO force proper cleanup
+  if (m_fd != -1) {
+    ::close(m_fd);
+  }
+}
+
+// Open the existing cache file read-only.
+// @return the fd on success, -1 on failure (errno is logged).
+int SyncFile::open_file() {
+  m_fd = ::open(m_name.c_str(), O_RDONLY);
+  if(m_fd == -1) {
+    lderr(cct) << "open fails : " << std::strerror(errno) << dendl;
+  }
+  return m_fd;
+}
+
+// Create (or open) the cache file read-write with O_SYNC semantics.
+// @return the fd on success, -1 on failure (errno is logged).
+int SyncFile::create() {
+  m_fd = ::open(m_name.c_str(), O_CREAT | O_NOATIME | O_RDWR | O_SYNC,
+                  S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP);
+  if(m_fd == -1) {
+    lderr(cct) << "create fails : " << std::strerror(errno) << dendl;
+  }
+  return m_fd;
+}
+
+// Context-based wrapper: completes on_finish with read_object_from_file's
+// return value (bytes read, or negative on error).
+void SyncFile::read(uint64_t offset, uint64_t length, ceph::bufferlist *bl, Context *on_finish) {
+  on_finish->complete(read_object_from_file(bl, offset, length));
+}
+
+// Context-based wrapper around write_object_to_file. NOTE(review): both
+// 'offset' and 'fdatasync' are ignored — the write always lands at offset
+// 0 of the file; confirm this is intended.
+void SyncFile::write(uint64_t offset, ceph::bufferlist &&bl, bool fdatasync, Context *on_finish) {
+  on_finish->complete(write_object_to_file(bl, bl.length()));
+}
+
+// Write 'object_len' bytes of 'read_buf' at offset 0 of the cache file.
+// @return bytes written (pwrite's result) or a negative value on error.
+// NOTE(review): a short pwrite is not retried and is returned as success;
+// also the bufferlist is taken by value (one extra copy) — consider
+// const-ref. Confirm callers tolerate partial writes.
+int SyncFile::write_object_to_file(ceph::bufferlist read_buf, uint64_t object_len) {
+
+  ldout(cct, 20) << "cache file name:" << m_name
+                 << ", length:" << object_len <<  dendl;
+
+  // TODO(): aio
+  int ret = pwrite(m_fd, read_buf.c_str(), object_len, 0);
+  if(ret < 0) {
+    lderr(cct)<<"write file fail:" << std::strerror(errno) << dendl;
+    return ret;
+  }
+
+  return ret;
+}
+
+// Read 'object_len' bytes at 'object_off' and append them to *read_buf.
+// @return bytes read (pread's result; may be short at EOF) or negative
+// on error.
+// NOTE(review): on a short read the full-sized bufferptr is still appended,
+// so the tail of *read_buf beyond 'ret' bytes is uninitialized — verify.
+int SyncFile::read_object_from_file(ceph::bufferlist* read_buf, uint64_t object_off, uint64_t object_len) {
+
+  ldout(cct, 20) << "offset:" << object_off
+                 << ", length:" << object_len <<  dendl;
+
+  bufferptr buf(object_len);
+
+  // TODO(): aio
+  int ret = pread(m_fd, buf.c_str(), object_len, object_off);
+  if(ret < 0) {
+    lderr(cct)<<"read file fail:" << std::strerror(errno) << dendl;
+    return ret;
+  }
+  read_buf->append(std::move(buf));
+
+  return ret;
+}
+
+// Size of the (open) cache file via fstat; asserts if the file is not
+// open or fstat fails.
+uint64_t SyncFile::get_file_size() {
+  struct stat buf;
+  if(m_fd == -1) {
+    lderr(cct)<<"get_file_size fail: file is closed status." << dendl;
+    assert(0);
+  }
+  int ret = fstat(m_fd, &buf);
+  if(ret == -1) {
+    lderr(cct)<<"fstat fail:" << std::strerror(errno) << dendl;
+    assert(0);
+  }
+  return buf.st_size;
+}
+
+
+} // namespace immutable_obj_cache
+} // namespace ceph
diff --git a/src/tools/ceph_immutable_object_cache/ObjectCacheFile.h b/src/tools/ceph_immutable_object_cache/ObjectCacheFile.h
new file mode 100644 (file)
index 0000000..d29b441
--- /dev/null
@@ -0,0 +1,47 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#ifndef CEPH_CACHE_OBJECT_CACHE_SYNC_FILE_H
+#define CEPH_CACHE_OBJECT_CACHE_SYNC_FILE_H
+
+#include "include/Context.h"
+#include "include/buffer_fwd.h"
+#include "common/ceph_context.h"
+#include <sys/mman.h>
+#include <string>
+
+
+namespace ceph {
+namespace immutable_obj_cache {
+
+// File-backed store for one cached RADOS object, accessed with synchronous
+// pread/pwrite. The fd is owned by this object and closed in the dtor.
+class SyncFile {
+public:
+  SyncFile(CephContext *cct, const std::string &name);
+  ~SyncFile();
+
+  // TODO use IO queue instead of individual commands so operations can be
+  // submitted in batch
+
+  // TODO use scatter/gather API
+
+  // Both return the fd on success or -1 on failure.
+  int create();
+  int open_file();
+
+  // Context-completing wrappers around the *_object_*_file calls below.
+  void read(uint64_t offset, uint64_t length, ceph::bufferlist *bl, Context *on_finish);
+
+  void write(uint64_t offset, ceph::bufferlist &&bl, bool fdatasync, Context *on_finish);
+
+  // Return bytes transferred, or negative on error.
+  int write_object_to_file(ceph::bufferlist read_buf, uint64_t object_len);
+  int read_object_from_file(ceph::bufferlist* read_buf, uint64_t object_off, uint64_t object_len);
+  uint64_t get_file_size();
+
+private:
+  CephContext *cct;
+  std::string m_name;   // full on-disk path of the cache file
+  int m_fd;             // -1 while the file is not open
+};
+
+} // namespace immutable_obj_cache
+} // namespace ceph
+
+#endif // CEPH_CACHE_OBJECT_CACHE_SYNC_FILE_H
diff --git a/src/tools/ceph_immutable_object_cache/ObjectCacheStore.cc b/src/tools/ceph_immutable_object_cache/ObjectCacheStore.cc
new file mode 100644 (file)
index 0000000..7409e8f
--- /dev/null
@@ -0,0 +1,273 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "ObjectCacheStore.h"
+#include "include/Context.h"
+#include "librbd/Utils.h"
+
+#include <experimental/filesystem>
+
+#define dout_context g_ceph_context
+#define dout_subsys ceph_subsys_immutable_obj_cache
+#undef dout_prefix
+#define dout_prefix *_dout << "ceph::cache::ObjectCacheStore: " << this << " " \
+                           << __func__ << ": "
+
+namespace efs = std::experimental::filesystem;
+
+namespace ceph {
+namespace immutable_obj_cache {
+
+// Construct the store: read cache sizing/path from configuration and build
+// the promotion/eviction policy.  cct and work_queue are borrowed, not owned.
+ObjectCacheStore::ObjectCacheStore(CephContext *cct, ContextWQ* work_queue)
+      : m_cct(cct), m_work_queue(work_queue), m_rados(new librados::Rados()),
+        m_ioctxs_lock("ceph::cache::ObjectCacheStore::m_ioctxs_lock") {
+
+  // maximum number of cached objects, from config
+  object_cache_entries =
+    m_cct->_conf.get_val<int64_t>("rbd_shared_cache_entries");
+
+  std::string cache_path = m_cct->_conf.get_val<std::string>("rbd_shared_cache_path");
+  m_cache_root_dir = cache_path + "/ceph_immutable_obj_cache/";
+
+  //TODO(): allow to set cache level
+  // 0.1 is the free-entry watermark handed to the policy
+  m_policy = new SimplePolicy(m_cct, object_cache_entries, 0.1);
+}
+
+// Tear down the store: free the policy and the per-pool IoCtx objects that
+// do_promote() allocated with new (they were previously leaked).
+// NOTE(review): evict_thd is deliberately NOT deleted here -- it is only
+// assigned in init() and has no initializer, so deleting it when init() was
+// never called would be undefined behavior; it leaks instead (TODO).
+ObjectCacheStore::~ObjectCacheStore() {
+  delete m_policy;
+  for (auto &it : m_ioctxs) {
+    delete it.second;
+  }
+  m_ioctxs.clear();
+}
+
+int ObjectCacheStore::init(bool reset) {
+  ldout(m_cct, 20) << dendl;
+
+  int ret = m_rados->init_with_context(m_cct);
+  if(ret < 0) {
+    lderr(m_cct) << "fail to init Ceph context" << dendl;
+    return ret;
+  }
+
+  ret = m_rados->connect();
+  if(ret < 0 ) {
+    lderr(m_cct) << "fail to conect to cluster" << dendl;
+    return ret;
+  }
+
+  //TODO(): fsck and reuse existing cache objects
+  if (reset) {
+    if (efs::exists(m_cache_root_dir)) {
+      efs::remove_all(m_cache_root_dir);
+    }
+    efs::create_directories(m_cache_root_dir);
+  }
+
+  evict_thd = new std::thread([this]{this->evict_thread_body();});
+  return ret;
+}
+
+// Body of the background eviction thread started by init(); loops until
+// shutdown() clears m_evict_go.
+// NOTE(review): this is a busy-wait -- evict_objects() is called back-to-back
+// with no sleep or condition wait, so the thread spins; consider an interval
+// or condition variable.  Also m_evict_go is a plain bool written by
+// shutdown() from another thread without synchronization -- should be
+// std::atomic<bool>; confirm.
+void ObjectCacheStore::evict_thread_body() {
+  int ret;
+  while(m_evict_go) {
+    ret = evict_objects();
+  }
+}
+
+// Stop the eviction thread and disconnect from the cluster.  Always returns 0.
+// NOTE(review): evict_thd is dereferenced unconditionally, so calling
+// shutdown() without a prior successful init() would crash -- confirm callers
+// guarantee init() ran.
+int ObjectCacheStore::shutdown() {
+  ldout(m_cct, 20) << dendl;
+
+  m_evict_go = false;  // signal the eviction loop to stop...
+  evict_thd->join();   // ...and wait for it to exit
+  m_rados->shutdown();
+  return 0;
+}
+
+int ObjectCacheStore::init_cache(std::string pool_name, std::string vol_name, uint64_t vol_size) {
+  ldout(m_cct, 20) << "pool name = " << pool_name
+                   << " volume name = " << vol_name
+                   << " volume size = " << vol_size << dendl;
+
+  std::string vol_cache_dir = m_cache_root_dir + pool_name + "_" + vol_name;
+
+  int dir = dir_num - 1;
+  while (dir >= 0) {
+    efs::create_directories(vol_cache_dir + "/" + std::to_string(dir));
+    dir --;
+  }
+  return 0;
+}
+
+// Start asynchronous promotion of one RADOS object into the local cache:
+// ensure an IoCtx exists for the pool, then issue an async read whose
+// completion writes the data to a local cache file.
+int ObjectCacheStore::do_promote(std::string pool_name, std::string vol_name, std::string object_name) {
+  ldout(m_cct, 20) << "to promote object = "
+                   << object_name << " from pool: "
+                   << pool_name << dendl;
+
+  int ret = 0;
+  std::string cache_file_name =  pool_name + object_name;
+  std::string vol_cache_dir = pool_name + "_" + vol_name;
+  {
+    // lazily create one IoCtx per pool; kept in m_ioctxs for later promotions
+    Mutex::Locker _locker(m_ioctxs_lock);
+    if (m_ioctxs.find(pool_name) == m_ioctxs.end()) {
+      librados::IoCtx* io_ctx = new librados::IoCtx();
+      ret = m_rados->ioctx_create(pool_name.c_str(), *io_ctx);
+      if (ret < 0) {
+        lderr(m_cct) << "fail to create ioctx" << dendl;
+        return ret;
+      }
+      m_ioctxs.emplace(pool_name, io_ctx);
+    }
+  }
+
+  assert(m_ioctxs.find(pool_name) != m_ioctxs.end());
+
+  // NOTE(review): m_ioctxs is read here after dropping m_ioctxs_lock -- racy
+  // if another thread inserts concurrently; confirm the lookup path is safe.
+  librados::IoCtx* ioctx = m_ioctxs[pool_name];
+
+  // read_buf ownership passes to the completion chain; it is freed in
+  // handle_promote_callback()
+  librados::bufferlist* read_buf = new librados::bufferlist();
+  uint32_t object_size = 4096*1024; //TODO(): read config from image metadata
+
+  auto ctx = new FunctionContext([this, read_buf, vol_cache_dir, cache_file_name,
+    object_size](int ret) {
+      handle_promote_callback(ret, read_buf, vol_cache_dir, cache_file_name, object_size);
+   });
+
+   return promote_object(ioctx, object_name, read_buf, object_size, ctx);
+}
+
+int ObjectCacheStore::handle_promote_callback(int ret, bufferlist* read_buf,
+  std::string cache_dir, std::string cache_file_name, uint32_t object_size) {
+  ldout(m_cct, 20) << "cache dir: " << cache_dir
+                   << " cache_file_name: " << cache_file_name << dendl;
+
+  // rados read error
+  if(ret != -ENOENT && ret < 0) {
+    lderr(m_cct) << "fail to read from rados" << dendl;
+
+    m_policy->update_status(cache_file_name, OBJ_CACHE_NONE);
+    delete read_buf;
+    return ret;
+  }
+
+  if (ret == -ENOENT) {
+    // object is empty
+    ret = 0;
+  }
+
+  if (ret < object_size) {
+    // object is partial, fill with '0'
+    read_buf->append(std::string(object_size - ret, '0'));
+  }
+
+  if (dir_num > 0) {
+    auto const pos = cache_file_name.find_last_of('.');
+    cache_dir = cache_dir + "/" + std::to_string(stoul(cache_file_name.substr(pos+1)) % dir_num);
+  }
+  // write to cache
+  SyncFile cache_file(m_cct, cache_dir + "/" + cache_file_name);
+  cache_file.create();
+
+  ret = cache_file.write_object_to_file(*read_buf, object_size);
+  if (ret < 0) {
+    lderr(m_cct) << "fail to write cache file" << dendl;
+
+    m_policy->update_status(cache_file_name, OBJ_CACHE_NONE);
+    delete read_buf;
+    return ret;
+  }
+
+  // update metadata
+  assert(OBJ_CACHE_SKIP == m_policy->get_status(cache_file_name));
+  m_policy->update_status(cache_file_name, OBJ_CACHE_PROMOTED);
+  assert(OBJ_CACHE_PROMOTED == m_policy->get_status(cache_file_name));
+
+  delete read_buf;
+  return ret;
+
+  evict_objects();
+}
+
+// Look up an object via the policy.  Returns 0 on a cache hit (object fully
+// promoted); returns -1 when the caller should read from rados instead
+// (cache miss, promotion in flight, or a promotion was just started).
+int ObjectCacheStore::lookup_object(std::string pool_name,
+    std::string vol_name, std::string object_name) {
+  ldout(m_cct, 20) << "object name = " << object_name
+                   << " in pool: " << pool_name << dendl;
+
+  int pret = -1;
+  cache_status_t ret = m_policy->lookup_object(pool_name + object_name);
+
+  switch(ret) {
+    case OBJ_CACHE_NONE: {
+      // first sight of this object: start an async promotion, but tell the
+      // caller to read from rados this time
+      pret = do_promote(pool_name, vol_name, object_name);
+      if (pret < 0) {
+        lderr(m_cct) << "fail to start promote" << dendl;
+      }
+      return -1;
+    }
+    case OBJ_CACHE_PROMOTED:
+      return 0;
+    case OBJ_CACHE_SKIP:
+      // promotion still in flight (or no free entry) -- read from rados
+      return -1;
+    default:
+      lderr(m_cct) << "unrecognized object cache status." << dendl;
+      assert(0);
+  }
+}
+
+// Issue the async rados read that fetches the object data for promotion.
+// on_finish is completed by the rados callback with the read result.
+int ObjectCacheStore::promote_object(librados::IoCtx* ioctx, std::string object_name,
+                                     librados::bufferlist* read_buf, uint64_t read_len,
+                                     Context* on_finish) {
+  ldout(m_cct, 20) << "object name = " << object_name
+                   << " read len = " << read_len << dendl;
+
+  auto ctx = new FunctionContext([on_finish](int ret) {
+    on_finish->complete(ret);
+  });
+
+  librados::AioCompletion* read_completion = librbd::util::create_rados_callback(ctx);
+  // read the whole object starting at offset 0
+  int ret = ioctx->aio_read(object_name, read_completion, read_buf, read_len, 0);
+  if(ret < 0) {
+    lderr(m_cct) << "fail to read from rados" << dendl;
+    // NOTE(review): on this error path the aio never runs, so on_finish (and
+    // read_buf) appear to leak -- confirm and complete on_finish with ret.
+  }
+  read_completion->release();
+
+  return ret;
+}
+
+// Ask the policy for the current batch of evictable cache files and delete
+// each one.  Always returns 0.
+// BUGFIX: the previous version fell off the end of this non-void function,
+// which is undefined behavior.
+int ObjectCacheStore::evict_objects() {
+  ldout(m_cct, 20) << dendl;
+
+  std::list<std::string> obj_list;
+  m_policy->get_evict_list(&obj_list);
+  for (auto& obj: obj_list) {
+    do_evict(obj);
+  }
+  return 0;
+}
+
+// Delete one cached file from disk and, on success, drop its policy entry.
+// The pool name, volume id, and hash bucket are re-derived from the cache
+// file name itself (expected shape: "<pool>rbd_data.<vol>.<suffix>").
+// Returns the std::remove() result (0 on success).
+int ObjectCacheStore::do_evict(std::string cache_file) {
+  ldout(m_cct, 20) << "file = " << cache_file << dendl;
+
+  //TODO(): need a better way to get file path
+  std::string pool_name = "rbd";
+
+  // pool name is the prefix before "rbd_data"
+  size_t pos1 = cache_file.rfind("rbd_data");
+  pool_name = cache_file.substr(0, pos1);
+
+  // volume id sits between the first and the last '.'
+  pos1 = cache_file.find_first_of('.');
+  size_t pos2 = cache_file.find_last_of('.');
+  std::string vol_name = cache_file.substr(pos1+1, pos2-pos1-1);
+
+  std::string cache_dir = m_cache_root_dir + pool_name + "_" + vol_name;
+
+   if (dir_num > 0) {
+    // same bucket scheme as handle_promote_callback(): numeric suffix mod dir_num
+    auto const pos = cache_file.find_last_of('.');
+    cache_dir = cache_dir + "/" + std::to_string(stoul(cache_file.substr(pos+1)) % dir_num);
+  }
+  std::string cache_file_path = cache_dir + "/" + cache_file;
+
+  ldout(m_cct, 20) << "delete file: " << cache_file_path << dendl;
+  int ret = std::remove(cache_file_path.c_str());
+   // evict entry in policy
+  if (ret == 0) {
+    m_policy->evict_entry(cache_file);
+  }
+
+  return ret;
+}
+
+} // namespace immutable_obj_cache
+} // namespace ceph
diff --git a/src/tools/ceph_immutable_object_cache/ObjectCacheStore.h b/src/tools/ceph_immutable_object_cache/ObjectCacheStore.h
new file mode 100644 (file)
index 0000000..7db93b2
--- /dev/null
@@ -0,0 +1,77 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#ifndef CEPH_CACHE_OBJECT_CACHE_STORE_H
+#define CEPH_CACHE_OBJECT_CACHE_STORE_H
+
+#include "common/debug.h"
+#include "common/errno.h"
+#include "common/ceph_context.h"
+#include "common/Mutex.h"
+#include "include/rados/librados.hpp"
+#include "include/rbd/librbd.h"
+#include "librbd/ImageCtx.h"
+#include "librbd/ImageState.h"
+#include "ObjectCacheFile.h"
+#include "SimplePolicy.h"
+
+
+using librados::Rados;
+using librados::IoCtx;
+
+namespace ceph {
+namespace immutable_obj_cache {
+
+// NOTE(review): shared_ptr is used unqualified -- relies on a using
+// declaration pulled in by an included header; prefer std::shared_ptr.
+typedef shared_ptr<librados::Rados> RadosRef;
+typedef shared_ptr<librados::IoCtx> IoCtxRef;
+
+// Local object store of the immutable object cache daemon: promotes RADOS
+// objects into files on local disk and evicts them under policy control.
+class ObjectCacheStore
+{
+  public:
+    ObjectCacheStore(CephContext *cct, ContextWQ* work_queue);
+    ~ObjectCacheStore();
+
+    // Connect to the cluster and start the eviction thread; reset wipes and
+    // recreates the on-disk cache directory.
+    int init(bool reset);
+
+    // Stop the eviction thread and disconnect from the cluster.
+    int shutdown();
+
+    // Returns 0 on cache hit, -1 when the caller should read from rados.
+    int lookup_object(std::string pool_name, std::string vol_name, std::string object_name);
+
+    // Create the per-volume cache directory tree.
+    int init_cache(std::string pool_name, std::string vol_name, uint64_t vol_size);
+
+  private:
+    void evict_thread_body();
+    int evict_objects();
+
+    int do_promote(std::string pool_name, std::string vol_name, std::string object_name);
+
+    int promote_object(librados::IoCtx*, std::string object_name,
+                       librados::bufferlist* read_buf,
+                       uint64_t length, Context* on_finish);
+
+   int handle_promote_callback(int, bufferlist*, std::string, std::string, uint32_t);
+   int do_evict(std::string cache_file);
+
+    CephContext *m_cct;
+    ContextWQ* m_work_queue;
+    RadosRef m_rados;
+
+
+    // one lazily created IoCtx per pool, guarded by m_ioctxs_lock
+    std::map<std::string, librados::IoCtx*> m_ioctxs;
+    Mutex m_ioctxs_lock;
+
+    SyncFile *m_cache_file;
+
+    Policy* m_policy;
+    // NOTE(review): no initializer -- only assigned in init(); dereferencing
+    // it before a successful init() is undefined.
+    std::thread* evict_thd;
+    bool m_evict_go = false;  // eviction-loop run flag, cleared by shutdown()
+
+    //TODO(): make this configurable
+    int dir_num = 10;               // hash-bucket sub-directories per volume
+    uint64_t object_cache_entries;  // max cached objects (from config)
+    std::string m_cache_root_dir;   // "<rbd_shared_cache_path>/ceph_immutable_obj_cache/"
+};
+
+} // namespace immutable_obj_cache
+} // namespace ceph
+#endif
diff --git a/src/tools/ceph_immutable_object_cache/Policy.h b/src/tools/ceph_immutable_object_cache/Policy.h
new file mode 100644 (file)
index 0000000..b68e5fa
--- /dev/null
@@ -0,0 +1,33 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#ifndef CEPH_CACHE_POLICY_HPP
+#define CEPH_CACHE_POLICY_HPP
+
+#include <list>
+#include <string>
+
+namespace ceph {
+namespace immutable_obj_cache {
+
+// Lifecycle states of a cache entry.
+typedef enum {
+  OBJ_CACHE_NONE = 0,   // not cached / free entry
+  OBJ_CACHE_PROMOTED,   // data fully written to the local cache file
+  OBJ_CACHE_SKIP,       // promotion in flight -- callers read from rados
+} cache_status_t;
+
+
+// Abstract promotion/eviction policy interface, keyed by cache file name.
+class Policy {
+public:
+  Policy(){}
+  virtual ~Policy(){};
+  // Look up an entry's status.
+  virtual cache_status_t lookup_object(std::string) = 0;
+  // Transition an entry back to OBJ_CACHE_NONE.
+  virtual int evict_entry(std::string) = 0;
+  virtual void update_status(std::string, cache_status_t) = 0;
+  virtual cache_status_t get_status(std::string) = 0;
+  // Fill obj_list with cache files that should be evicted now.
+  virtual void get_evict_list(std::list<std::string>* obj_list) = 0;
+};
+
+} // namespace immutable_obj_cache
+} // namespace ceph
+#endif
diff --git a/src/tools/ceph_immutable_object_cache/SimplePolicy.cc b/src/tools/ceph_immutable_object_cache/SimplePolicy.cc
new file mode 100644 (file)
index 0000000..8177529
--- /dev/null
@@ -0,0 +1,219 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "SimplePolicy.h"
+
+#include <vector>
+#include <unordered_map>
+#include <string>
+
+#define dout_context g_ceph_context
+#define dout_subsys ceph_subsys_immutable_obj_cache
+#undef dout_prefix
+#define dout_prefix *_dout << "ceph::cache::SimplePolicy: " << this << " " \
+                           << __func__ << ": "
+
+namespace ceph {
+namespace immutable_obj_cache {
+
+// Pre-allocate the fixed pool of cache entries.  block_num is the total
+// entry count; watermark is the free-entry ratio below which eviction starts.
+SimplePolicy::SimplePolicy(CephContext *cct, uint64_t block_num, float watermark)
+  : cct(cct), m_watermark(watermark), m_entry_count(block_num), inflight_ops(0),
+    m_cache_map_lock("rbd::cache::SimplePolicy::m_cache_map_lock"),
+    m_free_list_lock("rbd::cache::SimplePolicy::m_free_list_lock") {
+  ldout(cct, 20) << dendl;
+  for(uint64_t i = 0; i < m_entry_count; i++) {
+    m_free_list.push_back(new Entry());
+  }
+}
+
+// Free every entry, whether tracked in the cache map or parked on the free
+// list (the two containers are disjoint, so nothing is freed twice).
+SimplePolicy::~SimplePolicy() {
+  ldout(cct, 20) << dendl;
+
+  // map values are already Entry* -- the reinterpret_cast the original used
+  // here was a no-op and has been dropped
+  for (auto it : m_cache_map) {
+    delete it.second;
+  }
+
+  for (auto entry : m_free_list) {
+    delete entry;
+  }
+}
+
+cache_status_t SimplePolicy::alloc_entry(std::string file_name) {
+  ldout(cct, 20) << "alloc entry for: " << file_name << dendl;
+
+  m_free_list_lock.lock();
+
+  //TODO(): make the max inflight ops configurable
+  if (m_free_list.size() && (inflight_ops < 128)) {
+    Entry* entry = m_free_list.front();
+    ceph_assert(entry != nullptr);
+    m_free_list.pop_front();
+    m_free_list_lock.unlock();
+
+    {
+      RWLock::WLocker wlocker(m_cache_map_lock);
+      m_cache_map[file_name] = entry;
+    }
+    update_status(file_name, OBJ_CACHE_SKIP);
+    return OBJ_CACHE_NONE;
+  }
+
+  m_free_list_lock.unlock();
+  // if there's no free entry, return skip to read from rados
+  return OBJ_CACHE_SKIP;
+}
+
+// Return the cache status of file_name, allocating an entry (and thereby
+// starting a promotion) on first lookup.  Promoted entries are bumped in the
+// LRU so frequently hit objects survive eviction.
+cache_status_t SimplePolicy::lookup_object(std::string file_name) {
+  ldout(cct, 20) << "lookup: " << file_name << dendl;
+
+  RWLock::RLocker rlocker(m_cache_map_lock);
+
+  auto entry_it = m_cache_map.find(file_name);
+  // simplely promote on first lookup
+  if (entry_it == m_cache_map.end()) {
+      // NOTE(review): the read lock is dropped before alloc_entry() retakes
+      // the write lock, so two threads could race to allocate the same name.
+      rlocker.unlock();
+      return alloc_entry(file_name);
+  }
+
+  Entry* entry = entry_it->second;
+
+  if (entry->status == OBJ_CACHE_PROMOTED) {
+    // bump pos in lru on hit
+    m_promoted_lru.lru_touch(entry);
+  }
+
+  return entry->status;
+}
+
+// Drive the per-entry state machine.  Legal transitions:
+//   NONE -> SKIP      : promotion started (entry bound to file_name)
+//   SKIP -> PROMOTED  : promotion finished, entry enters the LRU
+//   SKIP -> NONE      : promotion failed, entry returns to the free list
+//   PROMOTED -> NONE  : eviction, entry leaves the LRU and is freed
+// Any other combination is silently ignored, as is an unknown file_name.
+void SimplePolicy::update_status(std::string file_name, cache_status_t new_status) {
+  ldout(cct, 20) << "update status for: " << file_name
+                 << " new status = " << new_status << dendl;
+
+  RWLock::WLocker locker(m_cache_map_lock);
+
+  auto entry_it = m_cache_map.find(file_name);
+  if (entry_it == m_cache_map.end()) {
+    return;
+  }
+
+  ceph_assert(entry_it != m_cache_map.end());
+  Entry* entry = entry_it->second;
+
+  // to promote
+  if (entry->status == OBJ_CACHE_NONE && new_status== OBJ_CACHE_SKIP) {
+    entry->status = new_status;
+    entry->file_name = file_name;
+    inflight_ops++;
+    return;
+  }
+
+  // promoting done
+  if (entry->status == OBJ_CACHE_SKIP && new_status== OBJ_CACHE_PROMOTED) {
+    m_promoted_lru.lru_insert_top(entry);
+    entry->status = new_status;
+    inflight_ops--;
+    return;
+  }
+
+  // promoting failed
+  if (entry->status == OBJ_CACHE_SKIP && new_status== OBJ_CACHE_NONE) {
+    // mark this entry as free
+    entry->file_name = "";
+    entry->status = new_status;
+    {
+      Mutex::Locker free_list_locker(m_free_list_lock);
+      m_free_list.push_back(entry);
+    }
+    m_cache_map.erase(entry_it);
+    inflight_ops--;
+    return;
+  }
+
+  // to evict
+  if (entry->status == OBJ_CACHE_PROMOTED && new_status== OBJ_CACHE_NONE) {
+    // mark this entry as free
+    entry->file_name = "";
+    entry->status = new_status;
+    {
+      Mutex::Locker free_list_locker(m_free_list_lock);
+      m_free_list.push_back(entry);
+    }
+    m_promoted_lru.lru_remove(entry);
+    m_cache_map.erase(entry_it);
+    return;
+  }
+
+}
+
+// Evicting is just the PROMOTED -> NONE transition of the state machine in
+// update_status(): the entry leaves the LRU and returns to the free list.
+// Always returns 0.
+int SimplePolicy::evict_entry(std::string file_name) {
+  ldout(cct, 20) << "to evict: " << file_name << dendl;
+
+  update_status(file_name, OBJ_CACHE_NONE);
+  return 0;
+}
+
+// Current status of file_name; unknown names report OBJ_CACHE_NONE.
+cache_status_t SimplePolicy::get_status(std::string file_name) {
+  ldout(cct, 20) << file_name << dendl;
+
+  RWLock::RLocker locker(m_cache_map_lock);
+  auto it = m_cache_map.find(file_name);
+  return (it == m_cache_map.end()) ? OBJ_CACHE_NONE : it->second->status;
+}
+
+// When the free-entry ratio drops below the watermark, pop ~10% of the entry
+// budget off the cold end of the LRU and report their file names.  Entries
+// are removed from the LRU here; the caller deletes the files and calls
+// evict_entry() to finish the transition.
+// NOTE(review): m_free_list.size() is read under m_cache_map_lock but not
+// m_free_list_lock -- confirm this racy size read is acceptable.
+void SimplePolicy::get_evict_list(std::list<std::string>* obj_list) {
+  ldout(cct, 20) << dendl;
+
+  RWLock::WLocker locker(m_cache_map_lock);
+  // check free ratio, pop entries from LRU
+  if ((float)m_free_list.size() / m_entry_count < m_watermark) {
+    int evict_num = m_entry_count * 0.1; //TODO(): make this configurable
+    for (int i = 0; i < evict_num; i++) {
+      Entry* entry = reinterpret_cast<Entry*>(m_promoted_lru.lru_expire());
+      if (entry == nullptr) {
+        continue;
+      }
+      std::string file_name = entry->file_name;
+      obj_list->push_back(file_name);
+
+    }
+  }
+}
+
+// for unit test
+// Number of unused entries on the free list (unit-test helper).
+uint64_t SimplePolicy::get_free_entry_num() {
+  Mutex::Locker locker(m_free_list_lock);
+  return m_free_list.size();
+}
+
+// Number of entries whose promotion is still in flight (unit-test helper).
+uint64_t SimplePolicy::get_promoting_entry_num() {
+  RWLock::RLocker rlocker(m_cache_map_lock);
+  uint64_t count = 0;
+  for (auto &kv : m_cache_map) {
+    if (kv.second->status == OBJ_CACHE_SKIP) {
+      ++count;
+    }
+  }
+  return count;
+}
+
+// Number of fully promoted entries, i.e. the LRU population (unit-test helper).
+uint64_t SimplePolicy::get_promoted_entry_num() {
+  return m_promoted_lru.lru_get_size();
+}
+
+// File name of the next eviction candidate, or "" when the LRU is empty
+// (unit-test helper).  Does not remove the entry from the LRU.
+std::string SimplePolicy::get_evict_entry() {
+  auto entry = reinterpret_cast<Entry*>(m_promoted_lru.lru_get_next_expire());
+  return (entry == nullptr) ? std::string() : entry->file_name;
+}
+
+} // namespace immutable_obj_cache
+} // namespace ceph
diff --git a/src/tools/ceph_immutable_object_cache/SimplePolicy.h b/src/tools/ceph_immutable_object_cache/SimplePolicy.h
new file mode 100644 (file)
index 0000000..ac0fbf8
--- /dev/null
@@ -0,0 +1,66 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#ifndef CEPH_CACHE_SIMPLE_POLICY_HPP
+#define CEPH_CACHE_SIMPLE_POLICY_HPP
+
+#include "common/ceph_context.h"
+#include "common/RWLock.h"
+#include "common/Mutex.h"
+#include "common/debug.h"
+#include "include/lru.h"
+#include "Policy.h"
+
+#include <vector>
+#include <unordered_map>
+#include <string>
+
+namespace ceph {
+namespace immutable_obj_cache {
+
+// LRU-based implementation of Policy with a fixed pool of entries allocated
+// up front.  Thread safety: the cache map is guarded by m_cache_map_lock and
+// the free list by m_free_list_lock.
+class SimplePolicy : public Policy {
+public:
+  SimplePolicy(CephContext *cct, uint64_t block_num, float watermark);
+  ~SimplePolicy() ;
+
+  cache_status_t lookup_object(std::string file_name);
+  cache_status_t get_status(std::string file_name);
+
+  void update_status(std::string file_name, cache_status_t new_status);
+
+  int evict_entry(std::string file_name);
+
+  void get_evict_list(std::list<std::string>* obj_list);
+
+  // introspection helpers used by the unit tests
+  uint64_t get_free_entry_num();
+  uint64_t get_promoting_entry_num();
+  uint64_t get_promoted_entry_num();
+  std::string get_evict_entry();
+
+private:
+  // Reserve a free entry for file_name; see SimplePolicy.cc for the contract.
+  cache_status_t alloc_entry(std::string file_name);
+
+  // One cache slot: tracks the backing file name and its promotion state;
+  // inherits its LRU linkage from LRUObject.
+  class Entry : public LRUObject {
+    public:
+      cache_status_t status;
+      Entry() : status(OBJ_CACHE_NONE){}
+      std::string file_name;
+  };
+
+  CephContext* cct;
+  float m_watermark;        // free-entry ratio that triggers eviction
+  uint64_t m_entry_count;   // total number of entries
+  // NOTE(review): uint8_t caps at 255 -- fine for the current 128 limit in
+  // alloc_entry(), but fragile if that limit becomes configurable.
+  std::atomic<uint8_t> inflight_ops;
+
+  std::unordered_map<std::string, Entry*> m_cache_map;  // name -> live entry
+  RWLock m_cache_map_lock;
+
+  std::deque<Entry*> m_free_list;  // unused entries
+  Mutex m_free_list_lock;
+
+  LRU m_promoted_lru;  // promoted entries; eviction pops the cold end
+};
+
+} // namespace immutable_obj_cache
+} // namespace ceph
+#endif
diff --git a/src/tools/ceph_immutable_object_cache/SocketCommon.h b/src/tools/ceph_immutable_object_cache/SocketCommon.h
new file mode 100644 (file)
index 0000000..7bb3af0
--- /dev/null
@@ -0,0 +1,55 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#ifndef CEPH_CACHE_SOCKET_COMMON_H
+#define CEPH_CACHE_SOCKET_COMMON_H
+
+namespace ceph {
+namespace immutable_obj_cache {
+
+// Message opcodes exchanged over the domain socket between cache clients and
+// the cache controller.  (REPLY codes answer their same-named request;
+// RBDSC_READ_RADOS presumably tells the client to fall back to rados --
+// confirm against CacheClient/CacheServer.)
+static const int RBDSC_REGISTER        =  0X11;
+static const int RBDSC_READ            =  0X12;
+static const int RBDSC_LOOKUP          =  0X13;
+static const int RBDSC_REGISTER_REPLY  =  0X14;
+static const int RBDSC_READ_REPLY      =  0X15;
+static const int RBDSC_LOOKUP_REPLY    =  0X16;
+static const int RBDSC_READ_RADOS      =  0X17;
+
+
+
+// server-side and client-side message handler callbacks
+typedef std::function<void(uint64_t, std::string)> ProcessMsg;
+typedef std::function<void(std::string)> ClientProcessMsg;
+typedef uint8_t rbdsc_req_type;
+
+//TODO(): switch to bufferlist
+// Fixed-size request/reply frame sent over the domain socket.
+// NOTE(review): this header uses uint64_t/std::function/std::stringstream
+// but includes nothing itself -- it relies on its includers; consider adding
+// <cstdint>, <functional>, <sstream>, <string>.
+struct rbdsc_req_type_t {
+  rbdsc_req_type type;   // one of the RBDSC_* opcodes
+  uint64_t vol_size;
+  uint64_t offset;
+  uint64_t length;
+  char pool_name[256];
+  char vol_name[256];
+  char oid[256];
+
+  // wire size of a frame (the struct is sent as a raw byte copy)
+  uint64_t size() {
+    return sizeof(rbdsc_req_type_t);
+  }
+
+  // Stream-style rendering of the frame.
+  // NOTE(review): omits oid and uses operator<< with no separators, so it is
+  // not a reversible encoding -- confirm it is only used for debugging.
+  std::string to_buffer() {
+    std::stringstream ss;
+    ss << type;
+    ss << vol_size;
+    ss << offset;
+    ss << length;
+    ss << pool_name;
+    ss << vol_name;
+
+    return ss.str();
+  }
+};
+
+// size of one on-wire message
+static const int RBDSC_MSG_LEN = sizeof(rbdsc_req_type_t);
+
+} // namespace immutable_obj_cache
+} // namespace ceph
+#endif
diff --git a/src/tools/ceph_immutable_object_cache/main.cc b/src/tools/ceph_immutable_object_cache/main.cc
new file mode 100644 (file)
index 0000000..28b0a27
--- /dev/null
@@ -0,0 +1,84 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "common/ceph_argparse.h"
+#include "common/config.h"
+#include "common/debug.h"
+#include "common/errno.h"
+#include "global/global_init.h"
+#include "global/signal_handler.h"
+#include "CacheController.h"
+
+#include <vector>
+
+// global so the async signal handler can reach the controller instance
+ceph::immutable_obj_cache::CacheController *cachectl = nullptr;
+
+// Print command-line help for the immutable object cache daemon.
+void usage() {
+  std::cout << "usage: cache controller [options...]" << std::endl;
+  std::cout << "options:\n";
+  std::cout << "  -m monaddress[:port]      connect to specified monitor\n";
+  std::cout << "  --keyring=<path>          path to keyring for local cluster\n";
+  std::cout << "  --log-file=<logfile>      file to log debug output\n";
+  // BUGFIX: the help text previously said "set rbd-mirror debug level",
+  // a copy-paste from the rbd-mirror daemon
+  std::cout << "  --debug-rbd-cachecontroller=<log-level>/<memory-level>  set debug level\n";
+  generic_server_usage();
+}
+
+// Async-signal callback: forward SIGINT/SIGTERM to the controller, if one exists.
+static void handle_signal(int signum) {
+  if (cachectl != nullptr) {
+    cachectl->handle_signal(signum);
+  }
+}
+
+int main(int argc, const char **argv)
+{
+  std::vector<const char*> args;
+  env_to_vec(args);
+  argv_to_vec(argc, argv, args);
+
+  auto cct = global_init(nullptr, args, CEPH_ENTITY_TYPE_CLIENT,
+                        CODE_ENVIRONMENT_DAEMON,
+                        CINIT_FLAG_UNPRIVILEGED_DAEMON_DEFAULTS);
+
+  for (auto i = args.begin(); i != args.end(); ++i) {
+    if (ceph_argparse_flag(args, i, "-h", "--help", (char*)NULL)) {
+      usage();
+      return EXIT_SUCCESS;
+    }
+  }
+
+  if (g_conf()->daemonize) {
+    global_init_daemonize(g_ceph_context);
+  }
+
+  common_init_finish(g_ceph_context);
+
+  init_async_signal_handler();
+  register_async_signal_handler(SIGHUP, sighup_handler);
+  register_async_signal_handler_oneshot(SIGINT, handle_signal);
+  register_async_signal_handler_oneshot(SIGTERM, handle_signal);
+
+  std::vector<const char*> cmd_args;
+  argv_to_vec(argc, argv, cmd_args);
+
+  // disable unnecessary librbd cache
+  g_ceph_context->_conf.set_val_or_die("rbd_cache", "false");
+
+  cachectl = new ceph::immutable_obj_cache::CacheController(g_ceph_context, cmd_args);
+  int r = cachectl->init();
+  if (r < 0) {
+    std::cerr << "failed to initialize: " << cpp_strerror(r) << std::endl;
+    goto cleanup;
+  }
+
+  cachectl->run();
+
+ cleanup:
+  unregister_async_signal_handler(SIGHUP, sighup_handler);
+  unregister_async_signal_handler(SIGINT, handle_signal);
+  unregister_async_signal_handler(SIGTERM, handle_signal);
+  shutdown_async_signal_handler();
+
+  delete cachectl;
+
+  return r < 0 ? EXIT_SUCCESS : EXIT_FAILURE;
+}
diff --git a/systemd/ceph-immutable-object-cache.target b/systemd/ceph-immutable-object-cache.target
new file mode 100644 (file)
index 0000000..ed51295
--- /dev/null
@@ -0,0 +1,6 @@
+[Unit]
+Description=ceph target allowing to start/stop all ceph-immutable-object-cache@.service instances at once
+PartOf=ceph.target
+Before=ceph.target
+[Install]
+WantedBy=multi-user.target ceph.target
diff --git a/systemd/ceph-immutable-object-cache@.service.in b/systemd/ceph-immutable-object-cache@.service.in
new file mode 100644 (file)
index 0000000..b9f1111
--- /dev/null
@@ -0,0 +1,24 @@
+[Unit]
+Description=Ceph immutable object cache daemon
+After=network-online.target local-fs.target
+Wants=network-online.target local-fs.target
+PartOf=ceph-immutable-object-cache.target
+
+[Service]
+LimitNOFILE=1048576
+LimitNPROC=1048576
+EnvironmentFile=-@SYSTEMD_ENV_FILE@
+Environment=CLUSTER=ceph
+ExecStart=/usr/bin/ceph-immutable-object-cache -f --cluster ${CLUSTER} --id %i --setuser ceph --setgroup ceph
+ExecReload=/bin/kill -HUP $MAINPID
+PrivateDevices=yes
+ProtectHome=true
+ProtectSystem=full
+PrivateTmp=true
+Restart=on-failure
+StartLimitInterval=30min
+StartLimitBurst=3
+TasksMax=infinity
+
+[Install]
+WantedBy=ceph-immutable-object-cache.target