]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
journal: added interface to flush pending commit position updates
authorJason Dillaman <dillaman@redhat.com>
Fri, 19 Feb 2016 01:41:52 +0000 (20:41 -0500)
committerJason Dillaman <dillaman@redhat.com>
Fri, 19 Feb 2016 03:53:10 +0000 (22:53 -0500)
Signed-off-by: Jason Dillaman <dillaman@redhat.com>
src/journal/JournalMetadata.cc
src/journal/JournalMetadata.h
src/journal/Journaler.cc
src/journal/Journaler.h

index f8d55742892ef20cdccbac8fb2ffcd74d4c29618..d13bbed9bc3d53d11251aeef01a0a1ee4d884a3b 100644 (file)
@@ -243,6 +243,21 @@ struct C_GetTags : public Context {
   }
 };
 
+struct C_FlushCommitPosition : public Context {
+  Context *commit_position_ctx;
+  Context *on_finish;
+
+  C_FlushCommitPosition(Context *commit_position_ctx, Context *on_finish)
+    : commit_position_ctx(commit_position_ctx), on_finish(on_finish) {
+  }
+  virtual void finish(int r) override {
+    if (commit_position_ctx != nullptr) {
+      commit_position_ctx->complete(r);
+    }
+    on_finish->complete(r);
+  }
+};
+
 } // anonymous namespace
 
 JournalMetadata::JournalMetadata(librados::IoCtx &ioctx,
@@ -434,19 +449,32 @@ void JournalMetadata::set_active_set(uint64_t object_set) {
 }
 
 void JournalMetadata::flush_commit_position() {
-
   ldout(m_cct, 20) << __func__ << dendl;
 
-  {
-    Mutex::Locker timer_locker(m_timer_lock);
-    Mutex::Locker locker(m_lock);
-    if (m_commit_position_task_ctx == NULL) {
-      return;
-    }
+  Mutex::Locker timer_locker(m_timer_lock);
+  Mutex::Locker locker(m_lock);
+  if (m_commit_position_ctx == nullptr) {
+    return;
+  }
+
+  cancel_commit_task();
+  handle_commit_position_task();
+}
 
-    m_timer->cancel_event(m_commit_position_task_ctx);
-    m_commit_position_task_ctx = NULL;
+void JournalMetadata::flush_commit_position(Context *on_safe) {
+  ldout(m_cct, 20) << __func__ << dendl;
+
+  Mutex::Locker timer_locker(m_timer_lock);
+  Mutex::Locker locker(m_lock);
+  if (m_commit_position_ctx == nullptr) {
+    // nothing to flush
+    m_finisher->queue(on_safe, 0);
+    return;
   }
+
+  m_commit_position_ctx = new C_FlushCommitPosition(
+    m_commit_position_ctx, on_safe);
+  cancel_commit_task();
   handle_commit_position_task();
 }
 
@@ -554,13 +582,24 @@ void JournalMetadata::handle_refresh_complete(C_Refresh *refresh, int r) {
   }
 }
 
-void JournalMetadata::schedule_commit_task() {
-
+void JournalMetadata::cancel_commit_task() {
   ldout(m_cct, 20) << __func__ << dendl;
 
   assert(m_timer_lock.is_locked());
   assert(m_lock.is_locked());
+  assert(m_commit_position_ctx != nullptr);
+  assert(m_commit_position_task_ctx != nullptr);
+
+  m_timer->cancel_event(m_commit_position_task_ctx);
+  m_commit_position_task_ctx = NULL;
+}
 
+void JournalMetadata::schedule_commit_task() {
+  ldout(m_cct, 20) << __func__ << dendl;
+
+  assert(m_timer_lock.is_locked());
+  assert(m_lock.is_locked());
+  assert(m_commit_position_ctx != nullptr);
   if (m_commit_position_task_ctx == NULL) {
     m_commit_position_task_ctx = new C_CommitPositionTask(this);
     m_timer->add_event_after(m_commit_interval, m_commit_position_task_ctx);
@@ -568,11 +607,10 @@ void JournalMetadata::schedule_commit_task() {
 }
 
 void JournalMetadata::handle_commit_position_task() {
-
+  assert(m_timer_lock.is_locked());
+  assert(m_lock.is_locked());
   ldout(m_cct, 20) << __func__ << dendl;
 
-  Mutex::Locker locker(m_lock);
-
   librados::ObjectWriteOperation op;
   client::client_commit(&op, m_client_id, m_commit_position);
 
index 450169b61d6e16bb10d1b57822bba4321dd745c7..f3898dab132a57ab38cc80500a8fed1342cdc4fc 100644 (file)
@@ -99,6 +99,7 @@ public:
   }
 
   void flush_commit_position();
+  void flush_commit_position(Context *on_safe);
   void set_commit_position(const ObjectSetPosition &commit_position,
                            Context *on_safe);
   void get_commit_position(ObjectSetPosition *commit_position) const {
@@ -185,6 +186,7 @@ private:
       journal_metadata->m_async_op_tracker.finish_op();
     }
     virtual void finish(int r) {
+      Mutex::Locker locker(journal_metadata->m_lock);
       journal_metadata->handle_commit_position_task();
     };
   };
@@ -309,6 +311,7 @@ private:
   void refresh(Context *on_finish);
   void handle_refresh_complete(C_Refresh *refresh, int r);
 
+  void cancel_commit_task();
   void schedule_commit_task();
   void handle_commit_position_task();
 
index 1eb7e2c2cb6d07ccc2c516f123cb6fdd4cffcc72..d3fb225b1d55d18fc9935457cdb25cd4685a3eef 100644 (file)
@@ -159,6 +159,10 @@ int Journaler::remove(bool force) {
   return 0;
 }
 
+void Journaler::flush_commit_position(Context *on_safe) {
+  m_metadata->flush_commit_position(on_safe);
+}
+
 int Journaler::register_client(const bufferlist &data) {
   return m_metadata->register_client(data);
 }
@@ -240,7 +244,7 @@ void Journaler::start_append(int flush_interval, uint64_t flush_bytes,
 void Journaler::stop_append(Context *on_safe) {
   assert(m_recorder != NULL);
 
-  flush(new C_DeleteRecorder(m_recorder, on_safe));
+  flush_append(new C_DeleteRecorder(m_recorder, on_safe));
   m_recorder = NULL;
 }
 
@@ -248,7 +252,7 @@ Future Journaler::append(uint64_t tag_tid, const bufferlist &payload_bl) {
   return m_recorder->append(tag_tid, payload_bl);
 }
 
-void Journaler::flush(Context *on_safe) {
+void Journaler::flush_append(Context *on_safe) {
   m_recorder->flush(on_safe);
 }
 
index 6702eb4dffb4b2cbdb839975a2165c8f6747c361..324dbbc72bb6c463711069992e9c88ff5dbb77c0 100644 (file)
@@ -48,6 +48,8 @@ public:
   int register_client(const bufferlist &data);
   int unregister_client();
 
+  void flush_commit_position(Context *on_safe);
+
   void allocate_tag(const bufferlist &data, cls::journal::Tag *tag,
                     Context *on_finish);
   void allocate_tag(uint64_t tag_class, const bufferlist &data,
@@ -61,7 +63,7 @@ public:
 
   void start_append(int flush_interval, uint64_t flush_bytes, double flush_age);
   Future append(uint64_t tag_tid, const bufferlist &bl);
-  void flush(Context *on_safe);
+  void flush_append(Context *on_safe);
   void stop_append(Context *on_safe);
 
   void committed(const ReplayEntry &replay_entry);