]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
journal: added additional callbacks for async ops
authorJason Dillaman <dillaman@redhat.com>
Sat, 11 Jul 2015 02:56:27 +0000 (22:56 -0400)
committerJason Dillaman <dillaman@redhat.com>
Fri, 6 Nov 2015 01:42:41 +0000 (20:42 -0500)
Signed-off-by: Jason Dillaman <dillaman@redhat.com>
src/journal/JournalMetadata.cc
src/journal/JournalMetadata.h
src/journal/Journaler.cc
src/journal/Journaler.h

index e5087e5ebc161d3c7c12e071c87031129dbe3e95..499100e356e007fcb0851afc952e161145215302 100644 (file)
@@ -43,40 +43,27 @@ JournalMetadata::~JournalMetadata() {
   rados.watch_flush();
 }
 
-int JournalMetadata::init() {
+void JournalMetadata::init(Context *on_init) {
   assert(!m_initialized);
   m_initialized = true;
 
-  int r = client::get_immutable_metadata(m_ioctx, m_oid, &m_order,
-                                         &m_splay_width);
-  if (r < 0) {
-    lderr(m_cct) << __func__ << ": failed to retrieve journal metadata: "
-                 << cpp_strerror(r) << dendl;
-    return r;
-  }
-
   m_timer = new SafeTimer(m_cct, m_timer_lock, false);
   m_timer->init();
 
-  r = m_ioctx.watch2(m_oid, &m_watch_handle, &m_watch_ctx);
+  int r = m_ioctx.watch2(m_oid, &m_watch_handle, &m_watch_ctx);
   if (r < 0) {
     lderr(m_cct) << __func__ << ": failed to watch journal"
                  << cpp_strerror(r) << dendl;
-    return r;
+    on_init->complete(r);
+    return;
   }
 
-  C_SaferCond cond;
-  refresh(&cond);
-  r = cond.wait();
-  if (r < 0) {
-    return r;
-  }
-  return 0;
+  C_ImmutableMetadata *ctx = new C_ImmutableMetadata(this, on_init);
+  client::get_immutable_metadata(m_ioctx, m_oid, &m_order, &m_splay_width,
+                                 ctx);
 }
 
 int JournalMetadata::register_client(const std::string &description) {
-  assert(!m_client_id.empty());
-
   ldout(m_cct, 10) << __func__ << ": " << m_client_id << dendl;
   int r = client::client_register(m_ioctx, m_oid, m_client_id, description);
   if (r < 0) {
@@ -211,8 +198,20 @@ bool JournalMetadata::get_last_allocated_tid(const std::string &tag,
   return true;
 }
 
+void JournalMetadata::handle_immutable_metadata(int r, Context *on_init) {
+  if (r < 0) {
+    lderr(m_cct) << "failed to initialize immutable metadata: "
+                 << cpp_strerror(r) << dendl;
+    on_init->complete(r);
+    return;
+  }
+
+  ldout(m_cct, 10) << "initialized immutable metadata" << dendl;
+  refresh(on_init);
+}
+
 void JournalMetadata::refresh(Context *on_complete) {
-  ldout(m_cct, 10) << "refreshing journal metadata" << dendl;
+  ldout(m_cct, 10) << "refreshing mutable metadata" << dendl;
   C_Refresh *refresh = new C_Refresh(this, on_complete);
   client::get_mutable_metadata(m_ioctx, m_oid, &refresh->minimum_set,
                                &refresh->active_set,
@@ -220,7 +219,7 @@ void JournalMetadata::refresh(Context *on_complete) {
 }
 
 void JournalMetadata::handle_refresh_complete(C_Refresh *refresh, int r) {
-  ldout(m_cct, 10) << "refreshed journal metadata: r=" << r << dendl;
+  ldout(m_cct, 10) << "refreshed mutable metadata: r=" << r << dendl;
   if (r == 0) {
     Mutex::Locker locker(m_lock);
 
index 81a539576f4a852aa20983a046d931c8f324d7a8..8763edd3edb00237a130e84cdc7c78588c72ab44 100644 (file)
@@ -42,7 +42,7 @@ public:
                   const std::string &client_id, double commit_interval);
   ~JournalMetadata();
 
-  int init();
+  void init(Context *on_init);
 
   void add_listener(Listener *listener);
   void remove_listener(Listener *listener);
@@ -159,6 +159,19 @@ private:
     }
   };
 
+  struct C_ImmutableMetadata : public Context {
+    JournalMetadataPtr journal_metadata;
+    Context *on_finish;
+
+    C_ImmutableMetadata(JournalMetadata *_journal_metadata, Context *_on_finish)
+      : journal_metadata(_journal_metadata), on_finish(_on_finish) {
+    }
+
+    virtual void finish(int r) {
+      journal_metadata->handle_immutable_metadata(r, on_finish);
+    }
+
+  };
   struct C_Refresh : public Context {
     JournalMetadataPtr journal_metadata;
     uint64_t minimum_set;
@@ -209,6 +222,8 @@ private:
   ObjectSetPosition m_commit_position;
   Context *m_commit_position_ctx;
 
+  void handle_immutable_metadata(int r, Context *on_init);
+
   void refresh(Context *on_finish);
   void handle_refresh_complete(C_Refresh *refresh, int r);
 
index 82b2ef926c7a5126a502e1e2a9812789c17aef6c..efca93d688aff3d0f76bdd291b287ecb3d1e3a21 100644 (file)
@@ -31,7 +31,8 @@ static const std::string JOURNAL_OBJECT_PREFIX = "journal_data.";
 using namespace cls::journal;
 
 Journaler::Journaler(librados::IoCtx &header_ioctx, librados::IoCtx &data_ioctx,
-                     uint64_t journal_id, const std::string &client_id)
+                     const std::string &journal_id,
+                     const std::string &client_id)
   : m_client_id(client_id), m_metadata(NULL), m_player(NULL), m_recorder(NULL),
     m_trimmer(NULL)
 {
@@ -39,8 +40,8 @@ Journaler::Journaler(librados::IoCtx &header_ioctx, librados::IoCtx &data_ioctx,
   m_data_ioctx.dup(data_ioctx);
   m_cct = reinterpret_cast<CephContext *>(m_header_ioctx.cct());
 
-  m_header_oid = JOURNAL_HEADER_PREFIX + stringify(journal_id);
-  m_object_oid_prefix = JOURNAL_OBJECT_PREFIX + stringify(journal_id) + ".";
+  m_header_oid = JOURNAL_HEADER_PREFIX + journal_id;
+  m_object_oid_prefix = JOURNAL_OBJECT_PREFIX + journal_id + ".";
 
   // TODO configurable commit interval
   m_metadata = new JournalMetadata(m_header_ioctx, m_header_oid, m_client_id,
@@ -60,8 +61,8 @@ Journaler::~Journaler() {
   assert(m_recorder == NULL);
 }
 
-int Journaler::init() {
-  return m_metadata->init();
+void Journaler::init(Context *on_init) {
+  m_metadata->init(on_init);
 }
 
 int Journaler::create(uint8_t order, uint8_t splay_width) {
@@ -136,17 +137,30 @@ void Journaler::start_append() {
                                    m_metadata, 0, 0, 0);
 }
 
-void Journaler::stop_append() {
+int Journaler::stop_append() {
   assert(m_recorder != NULL);
-  m_recorder->flush();
+
+  C_SaferCond cond;
+  flush(&cond);
+  int r = cond.wait();
+  if (r < 0) {
+    return r;
+  }
+
   delete m_recorder;
   m_recorder = NULL;
+  return 0;
 }
 
 Future Journaler::append(const std::string &tag, const bufferlist &payload_bl) {
   return m_recorder->append(tag, payload_bl);
 }
 
+void Journaler::flush(Context *on_safe) {
+  // TODO pass ctx
+  m_recorder->flush();
+}
+
 void Journaler::create_player(ReplayHandler *replay_handler) {
   assert(m_player == NULL);
   m_player = new JournalPlayer(m_data_ioctx, m_object_oid_prefix, m_metadata,
index 7de2bbcf741862f0636a23d3ddbc3caddbc843e4..c6abf711007c5d3508adb2362ece3a12b0d8ff1d 100644 (file)
@@ -13,6 +13,7 @@
 #include <map>
 #include "include/assert.h"
 
+class Context;
 class SafeTimer;
 
 namespace journal {
@@ -26,13 +27,13 @@ class ReplayHandler;
 class Journaler {
 public:
   Journaler(librados::IoCtx &header_ioctx, librados::IoCtx &data_ioctx,
-            uint64_t journal_id, const std::string &client_id);
+            const std::string &journal_id, const std::string &client_id);
   ~Journaler();
 
-  int init();
-
   int create(uint8_t order, uint8_t splay_width);
 
+  void init(Context *on_init);
+
   int register_client(const std::string &description);
   int unregister_client();
 
@@ -45,7 +46,8 @@ public:
 
   void start_append();
   Future append(const std::string &tag, const bufferlist &bl);
-  void stop_append();
+  void flush(Context *on_safe);
+  int stop_append();
 
 private:
   librados::IoCtx m_header_ioctx;