]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
librbd: track simple maintenance ops to coordinate replay
author Jason Dillaman <dillaman@redhat.com>
Tue, 22 Dec 2015 15:18:33 +0000 (10:18 -0500)
committer Jason Dillaman <dillaman@redhat.com>
Fri, 15 Jan 2016 15:40:29 +0000 (10:40 -0500)
Signed-off-by: Jason Dillaman <dillaman@redhat.com>
src/librbd/Journal.cc
src/librbd/journal/Replay.cc
src/librbd/journal/Replay.h

index b30a34a5ef5d5d9b9fc1e7a88728d3a90fdc22d8..745c04df643949bfadd990a4b5e5957d58c5d8c3 100644 (file)
@@ -579,8 +579,9 @@ template <typename I>
 void Journal<I>::handle_replay_complete(int r) {
   CephContext *cct = m_image_ctx.cct;
 
-  Mutex::Locker locker(m_lock);
+  m_lock.Lock();
   if (m_state != STATE_REPLAYING) {
+    m_lock.Unlock();
     return;
   }
 
@@ -588,11 +589,13 @@ void Journal<I>::handle_replay_complete(int r) {
   m_journaler->stop_replay();
   if (r < 0) {
     transition_state(STATE_FLUSHING_RESTART, r);
+    m_lock.Unlock();
 
     m_journal_replay->flush(create_context_callback<
       Journal<I>, &Journal<I>::handle_flushing_restart>(this));
   } else {
     transition_state(STATE_FLUSHING_REPLAY, 0);
+    m_lock.Unlock();
 
     m_journal_replay->flush(create_context_callback<
       Journal<I>, &Journal<I>::handle_flushing_replay>(this));
index 95a412f519d7f128166380888fa2de1510ea7986..bd645e09e421cf445487c1cedbc6f2d17af76a81 100644 (file)
@@ -38,7 +38,7 @@ Replay<I>::~Replay() {
   assert(m_in_flight_aio == 0);
   assert(m_aio_modify_unsafe_contexts.empty());
   assert(m_aio_modify_safe_contexts.empty());
-  assert(m_op_contexts.empty());
+  assert(m_op_events.empty());
 }
 
 template <typename I>
@@ -56,7 +56,6 @@ void Replay<I>::process(bufferlist::iterator *it, Context *on_ready,
     return;
   }
 
-  Mutex::Locker locker(m_lock);
   RWLock::RLocker owner_lock(m_image_ctx.owner_lock);
   boost::apply_visitor(EventVisitor(this, on_ready, on_safe),
                        event_entry.event);
@@ -67,22 +66,35 @@ void Replay<I>::flush(Context *on_finish) {
   CephContext *cct = m_image_ctx.cct;
   ldout(cct, 20) << this << " " << __func__ << dendl;
 
+  OpTids cancel_op_tids;
   on_finish = util::create_async_context_callback(
     m_image_ctx, on_finish);
+
   {
+    RWLock::RLocker owner_locker(m_image_ctx.owner_lock);
     Mutex::Locker locker(m_lock);
-    assert(m_flush_ctx == nullptr);
-    m_flush_ctx = on_finish;
 
+    // safely commit any remaining AIO modify operations
     if (m_in_flight_aio != 0) {
       flush_aio();
     }
 
-    if (!m_op_contexts.empty() || m_in_flight_aio != 0) {
-      return;
+    for (auto &op_event_pair : m_op_events) {
+      cancel_op_tids.push_back(op_event_pair.first);
+    }
+
+    assert(m_flush_ctx == nullptr);
+    if (!m_op_events.empty() || m_in_flight_aio != 0) {
+      std::swap(m_flush_ctx, on_finish);
     }
   }
-  on_finish->complete(0);
+
+  for (auto op_tid : cancel_op_tids) {
+    handle_op_complete(op_tid, -ERESTART);
+  }
+  if (on_finish != nullptr) {
+    on_finish->complete(0);
+  }
 }
 
 template <typename I>
@@ -97,6 +109,7 @@ void Replay<I>::handle_event(const journal::AioDiscardEvent &event,
   AioImageRequest<I>::aio_discard(&m_image_ctx, aio_comp, event.offset,
                                   event.length);
   if (flush_required) {
+    Mutex::Locker locker(m_lock);
     flush_aio();
   }
 }
@@ -114,6 +127,7 @@ void Replay<I>::handle_event(const journal::AioWriteEvent &event,
   AioImageRequest<I>::aio_write(&m_image_ctx, aio_comp, event.offset,
                                 event.length, data.c_str(), 0);
   if (flush_required) {
+    Mutex::Locker locker(m_lock);
     flush_aio();
   }
 }
@@ -124,7 +138,11 @@ void Replay<I>::handle_event(const journal::AioFlushEvent &event,
   CephContext *cct = m_image_ctx.cct;
   ldout(cct, 20) << this << " " << __func__ << ": AIO flush event" << dendl;
 
-  AioCompletion *aio_comp = create_aio_flush_completion(on_ready, on_safe);
+  AioCompletion *aio_comp;
+  {
+    Mutex::Locker locker(m_lock);
+    aio_comp = create_aio_flush_completion(on_ready, on_safe);
+  }
   AioImageRequest<I>::aio_flush(&m_image_ctx, aio_comp);
 }
 
@@ -132,7 +150,40 @@ template <typename I>
 void Replay<I>::handle_event(const journal::OpFinishEvent &event,
                              Context *on_ready, Context *on_safe) {
   CephContext *cct = m_image_ctx.cct;
-  ldout(cct, 20) << this << " " << __func__ << ": Op finish event" << dendl;
+  ldout(cct, 20) << this << " " << __func__ << ": Op finish event: "
+                 << "op_tid=" << event.op_tid << dendl;
+
+  Context *on_op_finish_event = nullptr;
+  {
+    Mutex::Locker locker(m_lock);
+    auto op_it = m_op_events.find(event.op_tid);
+    if (op_it == m_op_events.end()) {
+      ldout(cct, 10) << "unable to locate associated op: assuming previously "
+                     << "committed." << dendl;
+      on_ready->complete(0);
+      m_image_ctx.op_work_queue->queue(on_safe, 0);
+      return;
+    }
+
+    OpEvent &op_event = op_it->second;
+    assert(op_event.on_finish_safe == nullptr);
+    op_event.on_finish_ready = on_ready;
+    op_event.on_finish_safe = on_safe;
+    std::swap(on_op_finish_event, op_event.on_op_finish_event);
+  }
+
+  if (event.r < 0) {
+    // TODO handle snap create / resize
+
+    // journal recorded failure of op -- no-op the operation
+    delete on_op_finish_event;
+    handle_op_complete(event.op_tid, 0);
+    return;
+  }
+
+  // apply the op now -- each op is responsible for filtering the
+  // recorded result to know if the op completed successfully
+  on_op_finish_event->complete(0);
 }
 
 template <typename I>
@@ -142,8 +193,11 @@ void Replay<I>::handle_event(const journal::SnapCreateEvent &event,
   ldout(cct, 20) << this << " " << __func__ << ": Snap create event" << dendl;
 
   // TODO not-ready until state machine lets us know
-  Context *on_finish = create_op_context_callback(on_safe);
-  m_image_ctx.operations->snap_create(event.snap_name.c_str(), on_finish);
+  OpEvent *op_event;
+  Context *on_op_complete = create_op_context_callback(event.op_tid, on_safe,
+                                                       &op_event);
+
+  m_image_ctx.operations->snap_create(event.snap_name.c_str(), on_op_complete);
 }
 
 template <typename I>
@@ -152,8 +206,16 @@ void Replay<I>::handle_event(const journal::SnapRemoveEvent &event,
   CephContext *cct = m_image_ctx.cct;
   ldout(cct, 20) << this << " " << __func__ << ": Snap remove event" << dendl;
 
-  Context *on_finish = create_op_context_callback(on_safe);
-  m_image_ctx.operations->snap_remove(event.snap_name.c_str(), on_finish);
+  Mutex::Locker locker(m_lock);
+  OpEvent *op_event;
+  Context *on_op_complete = create_op_context_callback(event.op_tid, on_safe,
+                                                       &op_event);
+  op_event->on_op_finish_event = new FunctionContext(
+    [this, event, on_op_complete](int r) {
+      m_image_ctx.operations->snap_remove(event.snap_name.c_str(),
+                                          on_op_complete);
+    });
+
   on_ready->complete(0);
 }
 
@@ -163,9 +225,17 @@ void Replay<I>::handle_event(const journal::SnapRenameEvent &event,
   CephContext *cct = m_image_ctx.cct;
   ldout(cct, 20) << this << " " << __func__ << ": Snap rename event" << dendl;
 
-  Context *on_finish = create_op_context_callback(on_safe);
-  m_image_ctx.operations->snap_rename(event.snap_id, event.snap_name.c_str(),
-                                      on_finish);
+  Mutex::Locker locker(m_lock);
+  OpEvent *op_event;
+  Context *on_op_complete = create_op_context_callback(event.op_tid, on_safe,
+                                                       &op_event);
+  op_event->on_op_finish_event = new FunctionContext(
+    [this, event, on_op_complete](int r) {
+      m_image_ctx.operations->snap_rename(event.snap_id,
+                                          event.snap_name.c_str(),
+                                          on_op_complete);
+    });
+
   on_ready->complete(0);
 }
 
@@ -175,8 +245,16 @@ void Replay<I>::handle_event(const journal::SnapProtectEvent &event,
   CephContext *cct = m_image_ctx.cct;
   ldout(cct, 20) << this << " " << __func__ << ": Snap protect event" << dendl;
 
-  Context *on_finish = create_op_context_callback(on_safe);
-  m_image_ctx.operations->snap_protect(event.snap_name.c_str(), on_finish);
+  Mutex::Locker locker(m_lock);
+  OpEvent *op_event;
+  Context *on_op_complete = create_op_context_callback(event.op_tid, on_safe,
+                                                       &op_event);
+  op_event->on_op_finish_event = new FunctionContext(
+    [this, event, on_op_complete](int r) {
+      m_image_ctx.operations->snap_protect(event.snap_name.c_str(),
+                                           on_op_complete);
+    });
+
   on_ready->complete(0);
 }
 
@@ -187,8 +265,16 @@ void Replay<I>::handle_event(const journal::SnapUnprotectEvent &event,
   ldout(cct, 20) << this << " " << __func__ << ": Snap unprotect event"
                  << dendl;
 
-  Context *on_finish = create_op_context_callback(on_safe);
-  m_image_ctx.operations->snap_unprotect(event.snap_name.c_str(), on_finish);
+  Mutex::Locker locker(m_lock);
+  OpEvent *op_event;
+  Context *on_op_complete = create_op_context_callback(event.op_tid, on_safe,
+                                                       &op_event);
+  op_event->on_op_finish_event = new FunctionContext(
+    [this, event, on_op_complete](int r) {
+      m_image_ctx.operations->snap_unprotect(event.snap_name.c_str(),
+                                             on_op_complete);
+    });
+
   on_ready->complete(0);
 }
 
@@ -199,9 +285,17 @@ void Replay<I>::handle_event(const journal::SnapRollbackEvent &event,
   ldout(cct, 20) << this << " " << __func__ << ": Snap rollback start event"
                  << dendl;
 
-  Context *on_finish = create_op_context_callback(on_safe);
-  m_image_ctx.operations->snap_rollback(event.snap_name.c_str(),
-                                        no_op_progress_callback, on_finish);
+  Mutex::Locker locker(m_lock);
+  OpEvent *op_event;
+  Context *on_op_complete = create_op_context_callback(event.op_tid, on_safe,
+                                                       &op_event);
+  op_event->on_op_finish_event = new FunctionContext(
+    [this, event, on_op_complete](int r) {
+      m_image_ctx.operations->snap_rollback(event.snap_name.c_str(),
+                                            no_op_progress_callback,
+                                            on_op_complete);
+    });
+
   on_ready->complete(0);
 }
 
@@ -211,8 +305,15 @@ void Replay<I>::handle_event(const journal::RenameEvent &event,
   CephContext *cct = m_image_ctx.cct;
   ldout(cct, 20) << this << " " << __func__ << ": Rename event" << dendl;
 
-  Context *on_finish = create_op_context_callback(on_safe);
-  m_image_ctx.operations->rename(event.image_name.c_str(), on_finish);
+  Mutex::Locker locker(m_lock);
+  OpEvent *op_event;
+  Context *on_op_complete = create_op_context_callback(event.op_tid, on_safe,
+                                                       &op_event);
+  op_event->on_op_finish_event = new FunctionContext(
+    [this, event, on_op_complete](int r) {
+      m_image_ctx.operations->rename(event.image_name.c_str(), on_op_complete);
+    });
+
   on_ready->complete(0);
 }
 
@@ -223,9 +324,12 @@ void Replay<I>::handle_event(const journal::ResizeEvent &event,
   ldout(cct, 20) << this << " " << __func__ << ": Resize start event" << dendl;
 
   // TODO not-ready until state machine lets us know
-  Context *on_finish = create_op_context_callback(on_safe);
+  OpEvent *op_event;
+  Context *on_op_complete = create_op_context_callback(event.op_tid, on_safe,
+                                                       &op_event);
+
   m_image_ctx.operations->resize(event.size, no_op_progress_callback,
-                                 on_finish);
+                                 on_op_complete);
 }
 
 template <typename I>
@@ -234,8 +338,15 @@ void Replay<I>::handle_event(const journal::FlattenEvent &event,
   CephContext *cct = m_image_ctx.cct;
   ldout(cct, 20) << this << " " << __func__ << ": Flatten start event" << dendl;
 
-  Context *on_finish = create_op_context_callback(on_safe);
-  m_image_ctx.operations->flatten(no_op_progress_callback, on_finish);
+  Mutex::Locker locker(m_lock);
+  OpEvent *op_event;
+  Context *on_op_complete = create_op_context_callback(event.op_tid, on_safe,
+                                                       &op_event);
+  op_event->on_op_finish_event = new FunctionContext(
+    [this, event, on_op_complete](int r) {
+      m_image_ctx.operations->flatten(no_op_progress_callback, on_op_complete);
+    });
+
   on_ready->complete(0);
 }
 
@@ -291,7 +402,7 @@ void Replay<I>::handle_aio_flush_complete(Context *on_flush_safe,
     m_in_flight_aio -= on_safe_ctxs.size();
 
     std::swap(on_aio_ready, m_on_aio_ready);
-    if (m_op_contexts.empty() && m_in_flight_aio == 0) {
+    if (m_op_events.empty() && m_in_flight_aio == 0) {
       on_flush = m_flush_ctx;
     }
 
@@ -321,34 +432,53 @@ void Replay<I>::handle_aio_flush_complete(Context *on_flush_safe,
 }
 
 template <typename I>
-Context *Replay<I>::create_op_context_callback(Context *on_safe) {
+Context *Replay<I>::create_op_context_callback(uint64_t op_tid,
+                                               Context *on_safe,
+                                               OpEvent **op_event) {
   assert(m_lock.is_locked());
 
-  C_OpOnFinish *on_finish;
-  {
-    on_finish = new C_OpOnFinish(this);
-    m_op_contexts[on_finish] = on_safe;
-  }
-  return on_finish;
+  *op_event = &m_op_events[op_tid];
+  (*op_event)->on_start_safe = on_safe;
+  return new C_OpOnComplete(this, op_tid);
 }
 
 template <typename I>
-void Replay<I>::handle_op_context_callback(Context *on_op_finish, int r) {
-  Context *on_safe = nullptr;
+void Replay<I>::handle_op_complete(uint64_t op_tid, int r) {
+  CephContext *cct = m_image_ctx.cct;
+  ldout(cct, 20) << this << " " << __func__ << ": op_tid=" << op_tid << ", "
+                 << "r=" << r << dendl;
+
+  OpEvent op_event;
   Context *on_flush = nullptr;
   {
     Mutex::Locker locker(m_lock);
-    auto it = m_op_contexts.find(on_op_finish);
-    assert(it != m_op_contexts.end());
+    auto op_it = m_op_events.find(op_tid);
+    assert(op_it != m_op_events.end());
+
+    op_event = std::move(op_it->second);
+    m_op_events.erase(op_it);
 
-    on_safe = it->second;
-    m_op_contexts.erase(it);
-    if (m_op_contexts.empty() && m_in_flight_aio == 0) {
+    // TODO handle paused snap create / resize
+
+    if (m_op_events.empty() && m_in_flight_aio == 0) {
       on_flush = m_flush_ctx;
     }
   }
 
-  on_safe->complete(r);
+  assert((op_event.on_finish_ready != nullptr &&
+          op_event.on_finish_safe != nullptr) || r == -ERESTART);
+
+  // skipped upon error -- so clean up if non-null
+  delete op_event.on_op_finish_event;
+
+  if (op_event.on_finish_ready != nullptr) {
+    op_event.on_finish_ready->complete(0);
+  }
+
+  op_event.on_start_safe->complete(r);
+  if (op_event.on_finish_safe != nullptr) {
+    op_event.on_finish_safe->complete(r);
+  }
   if (on_flush != nullptr) {
     on_flush->complete(0);
   }
@@ -358,8 +488,8 @@ template <typename I>
 AioCompletion *Replay<I>::create_aio_modify_completion(Context *on_ready,
                                                        Context *on_safe,
                                                        bool *flush_required) {
+  Mutex::Locker locker(m_lock);
   CephContext *cct = m_image_ctx.cct;
-  assert(m_lock.is_locked());
   assert(m_on_aio_ready == nullptr);
 
   ++m_in_flight_aio;
@@ -400,6 +530,8 @@ AioCompletion *Replay<I>::create_aio_modify_completion(Context *on_ready,
 template <typename I>
 AioCompletion *Replay<I>::create_aio_flush_completion(Context *on_ready,
                                                       Context *on_safe) {
+  assert(m_lock.is_locked());
+
   // associate all prior write/discard ops to this flush request
   AioCompletion *aio_comp = AioCompletion::create<Context>(
       new C_AioFlushComplete(this, on_safe,
index c71157da04dc0e0d2d593f236846a3483b838ec9..19956c9178edc384e586494ca9ad2c2ea909ddde 100644 (file)
@@ -7,13 +7,14 @@
 #include "include/int_types.h"
 #include "include/buffer_fwd.h"
 #include "include/Context.h"
-#include "include/unordered_set.h"
-#include "include/unordered_map.h"
 #include "include/rbd/librbd.hpp"
 #include "common/Mutex.h"
 #include "librbd/journal/Entries.h"
 #include <boost/variant.hpp>
 #include <list>
+#include <set>
+#include <unordered_set>
+#include <unordered_map>
 
 namespace librbd {
 
@@ -36,16 +37,26 @@ public:
   void flush(Context *on_finish);
 
 private:
+  struct OpEvent {
+    Context *on_op_finish_event = nullptr;
+    Context *on_start_safe = nullptr;
+    Context *on_finish_ready = nullptr;
+    Context *on_finish_safe = nullptr;
+  };
+
+  typedef std::list<uint64_t> OpTids;
   typedef std::list<Context *> Contexts;
-  typedef ceph::unordered_set<Context *> ContextSet;
-  typedef ceph::unordered_map<Context *, Context *> OpContexts;
+  typedef std::unordered_set<Context *> ContextSet;
+  typedef std::unordered_map<uint64_t, OpEvent> OpEvents;
 
-  struct C_OpOnFinish : public Context {
+  struct C_OpOnComplete : public Context {
     Replay *replay;
-    C_OpOnFinish(Replay *replay) : replay(replay) {
+    uint64_t op_tid;
+    C_OpOnComplete(Replay *replay, uint64_t op_tid)
+      : replay(replay), op_tid(op_tid) {
     }
     virtual void finish(int r) override {
-      replay->handle_op_context_callback(this, r);
+      replay->handle_op_complete(op_tid, r);
     }
   };
 
@@ -97,7 +108,7 @@ private:
   Contexts m_aio_modify_unsafe_contexts;
   ContextSet m_aio_modify_safe_contexts;
 
-  OpContexts m_op_contexts;
+  OpEvents m_op_events;
 
   Context *m_flush_ctx = nullptr;
   Context *m_on_aio_ready = nullptr;
@@ -136,8 +147,9 @@ private:
   void handle_aio_flush_complete(Context *on_flush_safe, Contexts &on_safe_ctxs,
                                  int r);
 
-  Context *create_op_context_callback(Context *on_safe);
-  void handle_op_context_callback(Context *on_op_finish, int r);
+  Context *create_op_context_callback(uint64_t op_tid, Context *on_safe,
+                                      OpEvent **op_event);
+  void handle_op_complete(uint64_t op_tid, int r);
 
   AioCompletion *create_aio_modify_completion(Context *on_ready,
                                               Context *on_safe,