git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
journal: flush commit positions should wait for refresh 21206/head
author Jason Dillaman <dillaman@redhat.com>
Tue, 13 Feb 2018 15:05:01 +0000 (10:05 -0500)
committer Nathan Cutler <ncutler@suse.com>
Tue, 3 Apr 2018 09:17:25 +0000 (11:17 +0200)
Fixes: http://tracker.ceph.com/issues/22945
Signed-off-by: Jason Dillaman <dillaman@redhat.com>
(cherry picked from commit 24df022e0b34788953734c2e622c336077958e78)

Conflicts:
src/journal/JournalMetadata.cc - trivial resolution

src/journal/JournalMetadata.cc
src/journal/JournalMetadata.h

index 3675e31c545ec5ff7c029a2a8210131b59938e02..50c8c3688f7e351d8824bc5275998cd783f873a7 100644 (file)
@@ -673,14 +673,9 @@ void JournalMetadata::assert_active_tag(uint64_t tag_tid, Context *on_finish) {
 void JournalMetadata::flush_commit_position() {
   ldout(m_cct, 20) << __func__ << dendl;
 
-  Mutex::Locker timer_locker(*m_timer_lock);
-  Mutex::Locker locker(m_lock);
-  if (m_commit_position_ctx == nullptr) {
-    return;
-  }
-
-  cancel_commit_task();
-  handle_commit_position_task();
+  C_SaferCond ctx;
+  flush_commit_position(&ctx);
+  ctx.wait();
 }
 
 void JournalMetadata::flush_commit_position(Context *on_safe) {
@@ -688,7 +683,7 @@ void JournalMetadata::flush_commit_position(Context *on_safe) {
 
   Mutex::Locker timer_locker(*m_timer_lock);
   Mutex::Locker locker(m_lock);
-  if (m_commit_position_ctx == nullptr) {
+  if (m_commit_position_ctx == nullptr && m_flush_commits_in_progress == 0) {
     // nothing to flush
     if (on_safe != nullptr) {
       m_work_queue->queue(on_safe, 0);
@@ -697,9 +692,12 @@ void JournalMetadata::flush_commit_position(Context *on_safe) {
   }
 
   if (on_safe != nullptr) {
-    m_commit_position_ctx = new C_FlushCommitPosition(
-      m_commit_position_ctx, on_safe);
+    m_flush_commit_position_ctxs.push_back(on_safe);
   }
+  if (m_commit_position_ctx == nullptr) {
+    return;
+  }
+
   cancel_commit_task();
   handle_commit_position_task();
 }
@@ -807,7 +805,6 @@ void JournalMetadata::cancel_commit_task() {
   assert(m_lock.is_locked());
   assert(m_commit_position_ctx != nullptr);
   assert(m_commit_position_task_ctx != nullptr);
-
   m_timer->cancel_event(m_commit_position_task_ctx);
   m_commit_position_task_ctx = NULL;
 }
@@ -818,7 +815,7 @@ void JournalMetadata::schedule_commit_task() {
   assert(m_timer_lock->is_locked());
   assert(m_lock.is_locked());
   assert(m_commit_position_ctx != nullptr);
-  if (m_commit_position_task_ctx == NULL) {
+  if (m_commit_position_task_ctx == nullptr) {
     m_commit_position_task_ctx = new C_CommitPositionTask(this);
     m_timer->add_event_after(m_settings.commit_interval,
                              m_commit_position_task_ctx);
@@ -832,22 +829,51 @@ void JournalMetadata::handle_commit_position_task() {
                    << "client_id=" << m_client_id << ", "
                    << "commit_position=" << m_commit_position << dendl;
 
-  librados::ObjectWriteOperation op;
-  client::client_commit(&op, m_client_id, m_commit_position);
+  m_commit_position_task_ctx = nullptr;
+  Context* commit_position_ctx = nullptr;
+  std::swap(commit_position_ctx, m_commit_position_ctx);
 
-  Context *ctx = new C_NotifyUpdate(this, m_commit_position_ctx);
-  m_commit_position_ctx = NULL;
+  m_async_op_tracker.start_op();
+  ++m_flush_commits_in_progress;
 
-  ctx = schedule_laggy_clients_disconnect(ctx);
+  Context* ctx = new FunctionContext([this, commit_position_ctx](int r) {
+      Contexts flush_commit_position_ctxs;
+      m_lock.Lock();
+      assert(m_flush_commits_in_progress > 0);
+      --m_flush_commits_in_progress;
+      if (m_flush_commits_in_progress == 0) {
+        std::swap(flush_commit_position_ctxs, m_flush_commit_position_ctxs);
+      }
+      m_lock.Unlock();
 
-  librados::AioCompletion *comp =
-    librados::Rados::aio_create_completion(ctx, NULL,
-                                           utils::rados_ctx_callback);
+      commit_position_ctx->complete(0);
+      for (auto ctx : flush_commit_position_ctxs) {
+        ctx->complete(0);
+      }
+      m_async_op_tracker.finish_op();
+    });
+  ctx = new C_NotifyUpdate(this, ctx);
+  ctx = new FunctionContext([this, ctx](int r) {
+      // manually kick off a refresh in case the notification is missed
+      // and ignore the next notification that we are about to send
+      m_lock.Lock();
+      ++m_ignore_watch_notifies;
+      m_lock.Unlock();
+
+      refresh(ctx);
+    });
+  ctx = new FunctionContext([this, ctx](int r) {
+      schedule_laggy_clients_disconnect(ctx);
+    });
+
+  librados::ObjectWriteOperation op;
+  client::client_commit(&op, m_client_id, m_commit_position);
+
+  auto comp = librados::Rados::aio_create_completion(ctx, nullptr,
+                                                     utils::rados_ctx_callback);
   int r = m_ioctx.aio_operate(m_oid, comp, &op);
   assert(r == 0);
   comp->release();
-
-  m_commit_position_task_ctx = NULL;
 }
 
 void JournalMetadata::schedule_watch_reset() {
@@ -884,6 +910,14 @@ void JournalMetadata::handle_watch_notify(uint64_t notify_id, uint64_t cookie) {
   bufferlist bl;
   m_ioctx.notify_ack(m_oid, notify_id, cookie, bl);
 
+  {
+    Mutex::Locker locker(m_lock);
+    if (m_ignore_watch_notifies > 0) {
+      --m_ignore_watch_notifies;
+      return;
+    }
+  }
+
   refresh(NULL);
 }
 
@@ -1060,57 +1094,55 @@ void JournalMetadata::handle_notified(int r) {
   ldout(m_cct, 10) << "notified journal header update: r=" << r << dendl;
 }
 
-Context *JournalMetadata::schedule_laggy_clients_disconnect(Context *on_finish) {
-  assert(m_lock.is_locked());
-
+void JournalMetadata::schedule_laggy_clients_disconnect(Context *on_finish) {
   ldout(m_cct, 20) << __func__ << dendl;
-
   if (m_settings.max_concurrent_object_sets <= 0) {
-    return on_finish;
+    on_finish->complete(0);
+    return;
   }
 
   Context *ctx = on_finish;
+  {
+    Mutex::Locker locker(m_lock);
+    for (auto &c : m_registered_clients) {
+      if (c.state == cls::journal::CLIENT_STATE_DISCONNECTED ||
+          c.id == m_client_id ||
+          m_settings.whitelisted_laggy_clients.count(c.id) > 0) {
+        continue;
+      }
+      const std::string &client_id = c.id;
+      uint64_t object_set = 0;
+      if (!c.commit_position.object_positions.empty()) {
+        auto &position = *(c.commit_position.object_positions.begin());
+        object_set = position.object_number / m_splay_width;
+      }
 
-  for (auto &c : m_registered_clients) {
-    if (c.state == cls::journal::CLIENT_STATE_DISCONNECTED ||
-       c.id == m_client_id ||
-       m_settings.whitelisted_laggy_clients.count(c.id) > 0) {
-      continue;
-    }
-    const std::string &client_id = c.id;
-    uint64_t object_set = 0;
-    if (!c.commit_position.object_positions.empty()) {
-      auto &position = *(c.commit_position.object_positions.begin());
-      object_set = position.object_number / m_splay_width;
-    }
-
-    if (m_active_set > object_set + m_settings.max_concurrent_object_sets) {
-      ldout(m_cct, 1) << __func__ << ": " << client_id
-                     << ": scheduling disconnect" << dendl;
+      if (m_active_set > object_set + m_settings.max_concurrent_object_sets) {
+        ldout(m_cct, 1) << __func__ << ": " << client_id
+                        << ": scheduling disconnect" << dendl;
 
-      ctx = new FunctionContext([this, client_id, ctx](int r1) {
-          ldout(m_cct, 10) << __func__ << ": " << client_id
-                           << ": flagging disconnected" << dendl;
+        ctx = new FunctionContext([this, client_id, ctx](int r1) {
+            ldout(m_cct, 10) << __func__ << ": " << client_id
+                             << ": flagging disconnected" << dendl;
 
-          librados::ObjectWriteOperation op;
-          client::client_update_state(&op, client_id,
-                                      cls::journal::CLIENT_STATE_DISCONNECTED);
+            librados::ObjectWriteOperation op;
+            client::client_update_state(
+              &op, client_id, cls::journal::CLIENT_STATE_DISCONNECTED);
 
-          librados::AioCompletion *comp =
-              librados::Rados::aio_create_completion(ctx, nullptr,
-                                                     utils::rados_ctx_callback);
-          int r = m_ioctx.aio_operate(m_oid, comp, &op);
-          assert(r == 0);
-          comp->release();
-       });
+            auto comp = librados::Rados::aio_create_completion(
+              ctx, nullptr, utils::rados_ctx_callback);
+            int r = m_ioctx.aio_operate(m_oid, comp, &op);
+            assert(r == 0);
+            comp->release();
+          });
+      }
     }
   }
 
   if (ctx == on_finish) {
     ldout(m_cct, 20) << __func__ << ": no laggy clients to disconnect" << dendl;
   }
-
-  return ctx;
+  ctx->complete(0);
 }
 
 std::ostream &operator<<(std::ostream &os,
index 011ab87108a7762bd40a35dd447f6e7d3b79e916..05e6bcc43ec45227ba6461cbb2d2e793a48fdbe7 100644 (file)
@@ -333,6 +333,7 @@ private:
   size_t m_update_notifications;
   Cond m_update_cond;
 
+  size_t m_ignore_watch_notifies = 0;
   size_t m_refreshes_in_progress = 0;
   Contexts m_refresh_ctxs;
 
@@ -341,6 +342,9 @@ private:
   Context *m_commit_position_ctx;
   Context *m_commit_position_task_ctx;
 
+  size_t m_flush_commits_in_progress = 0;
+  Contexts m_flush_commit_position_ctxs;
+
   AsyncOpTracker m_async_op_tracker;
 
   void handle_immutable_metadata(int r, Context *on_init);
@@ -358,7 +362,7 @@ private:
   void handle_watch_error(int err);
   void handle_notified(int r);
 
-  Context *schedule_laggy_clients_disconnect(Context *on_finish);
+  void schedule_laggy_clients_disconnect(Context *on_finish);
 
   friend std::ostream &operator<<(std::ostream &os,
                                  const JournalMetadata &journal_metadata);