git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
librbd: initial interface with journal library
author Jason Dillaman <dillaman@redhat.com>
Fri, 10 Jul 2015 01:25:33 +0000 (21:25 -0400)
committer Jason Dillaman <dillaman@redhat.com>
Fri, 13 Nov 2015 04:26:25 +0000 (23:26 -0500)
Rough draft of journal library integration within librbd. Non-cached
IO paths are now recorded to the journal.  Incoming IO ops are blocked
if the exclusive lock isn't held or if the journal hasn't been replayed.

Signed-off-by: Jason Dillaman <dillaman@redhat.com>
18 files changed:
src/librbd/AioCompletion.cc
src/librbd/AioCompletion.h
src/librbd/AioImageRequest.cc
src/librbd/AioImageRequest.h
src/librbd/AioImageRequestWQ.cc
src/librbd/AioImageRequestWQ.h
src/librbd/CopyupRequest.cc
src/librbd/ImageCtx.cc
src/librbd/ImageCtx.h
src/librbd/ImageWatcher.cc
src/librbd/ImageWatcher.h
src/librbd/Journal.cc [new file with mode: 0644]
src/librbd/Journal.h [new file with mode: 0644]
src/librbd/Makefile.am
src/librbd/ObjectMap.cc
src/librbd/ObjectMap.h
src/librbd/internal.cc
src/test/Makefile-client.am

index f3822379f096f1357fc8b41a978d3d8d3781ff06..982a03fc7f7982940d705e681dc1154ea7882a59 100644 (file)
@@ -6,9 +6,11 @@
 #include "common/ceph_context.h"
 #include "common/dout.h"
 #include "common/errno.h"
+#include "common/perf_counters.h"
 #include "common/WorkQueue.h"
 
 #include "librbd/AioObjectRequest.h"
+#include "librbd/ImageCtx.h"
 #include "librbd/internal.h"
 
 #include "librbd/AioCompletion.h"
@@ -99,6 +101,21 @@ namespace librbd {
     tracepoint(librbd, aio_complete_exit);
   }
 
+  void AioCompletion::init_time(ImageCtx *i, aio_type_t t) {
+    if (ictx == NULL) {
+      ictx = i;
+      aio_type = t;
+      start_time = ceph_clock_now(ictx->cct);
+    }
+  }
+
+  void AioCompletion::start_op(ImageCtx *i, aio_type_t t) {
+    init_time(i, t);
+    if (!async_op.started()) {
+      async_op.start_op(*ictx);
+    }
+  }
+
   void AioCompletion::fail(CephContext *cct, int r)
   {
     lderr(cct) << "AioCompletion::fail() " << this << ": " << cpp_strerror(r)
index ba24c300bb1c6039e59287248e224e2c98859520..94ed682dd2e8bbea17e27c83cd2171ad2d18baab 100644 (file)
@@ -5,18 +5,16 @@
 
 #include "common/Cond.h"
 #include "common/Mutex.h"
-#include "common/ceph_context.h"
-#include "common/perf_counters.h"
 #include "include/Context.h"
 #include "include/utime.h"
 #include "include/rbd/librbd.hpp"
 
 #include "librbd/AsyncOperation.h"
-#include "librbd/ImageCtx.h"
-#include "librbd/internal.h"
 
 #include "osdc/Striper.h"
 
+class CephContext;
+
 namespace librbd {
 
   class AioObjectRead;
@@ -88,20 +86,8 @@ namespace librbd {
 
     void finish_adding_requests(CephContext *cct);
 
-    void init_time(ImageCtx *i, aio_type_t t) {
-      if (ictx == NULL) {
-        ictx = i;
-        aio_type = t;
-        start_time = ceph_clock_now(ictx->cct);
-      }
-    }
-    void start_op(ImageCtx *i, aio_type_t t) {
-      init_time(i, t);
-      if (!async_op.started()) {
-        async_op.start_op(*ictx);
-      }
-    }
-
+    void init_time(ImageCtx *i, aio_type_t t);
+    void start_op(ImageCtx *i, aio_type_t t);
     void fail(CephContext *cct, int r);
 
     void complete(CephContext *cct);
index 56fbc453ef31b99df9aa738f6280c600af2f5d8a..dc6d87f24fd5ea8b4522d28cc44472734277433c 100644 (file)
@@ -7,6 +7,8 @@
 #include "librbd/ImageCtx.h"
 #include "librbd/ImageWatcher.h"
 #include "librbd/internal.h"
+#include "librbd/Journal.h"
+#include "librbd/JournalTypes.h"
 #include "include/rados/librados.hpp"
 #include "osdc/Striper.h"
 
@@ -147,7 +149,11 @@ void AbstractAioImageWrite::send_request() {
 
   RWLock::RLocker md_locker(m_image_ctx.md_lock);
 
+  bool journaling = false;
+  uint64_t journal_tid = 0;
+
   uint64_t clip_len = m_len;
+  ObjectExtents object_extents;
   ::SnapContext snapc;
   {
     // prevent image size from changing between computing clip and recording
@@ -165,20 +171,33 @@ void AbstractAioImageWrite::send_request() {
     }
 
     snapc = m_image_ctx.snapc;
-    m_aio_comp->start_op(&m_image_ctx, AIO_TYPE_WRITE); // TODO use correct enum
+    m_aio_comp->start_op(&m_image_ctx, get_aio_type());
+
+    // map to object extents
+    if (clip_len > 0) {
+      Striper::file_to_extents(cct, m_image_ctx.format_string,
+                               &m_image_ctx.layout, m_off, clip_len, 0,
+                               object_extents);
+    }
+
+    journaling = (m_image_ctx.journal != NULL);
   }
 
   assert(!m_image_ctx.image_watcher->is_lock_supported() ||
           m_image_ctx.image_watcher->is_lock_owner());
 
-  // map to object extents
-  ObjectExtents extents;
-  if (clip_len > 0) {
-    Striper::file_to_extents(cct, m_image_ctx.format_string,
-                             &m_image_ctx.layout, m_off, clip_len, 0, extents);
+  AioObjectRequests requests;
+  send_object_requests(object_extents, snapc, (journaling ? &requests : NULL));
+
+  if (journaling) {
+    // in-flight ops are flushed prior to closing the journal
+    assert(m_image_ctx.journal != NULL);
+    journal_tid = append_journal_event(requests, m_synchronous);
   }
 
-  send_object_requests(extents, snapc);
+  if (m_image_ctx.object_cacher != NULL) {
+    send_cache_requests(object_extents, snapc, journal_tid);
+  }
   update_stats(clip_len);
 
   m_aio_comp->finish_adding_requests(cct);
@@ -186,86 +205,131 @@ void AbstractAioImageWrite::send_request() {
 }
 
 void AbstractAioImageWrite::send_object_requests(
-    const ObjectExtents &object_extents, const ::SnapContext &snapc) {
+    const ObjectExtents &object_extents, const ::SnapContext &snapc,
+    AioObjectRequests *aio_object_requests) {
   CephContext *cct = m_image_ctx.cct;
+
   for (ObjectExtents::const_iterator p = object_extents.begin();
        p != object_extents.end(); ++p) {
     ldout(cct, 20) << " oid " << p->oid << " " << p->offset << "~" << p->length
                    << " from " << p->buffer_extents << dendl;
-    send_object_request(*p, snapc);
+    C_AioRequest *req_comp = new C_AioRequest(cct, m_aio_comp);
+    AioObjectRequest *request = send_object_request(*p, snapc, req_comp);
+
+    // if journaling, stash the request for later; otherwise send
+    if (request != NULL) {
+      if (aio_object_requests != NULL) {
+        aio_object_requests->push_back(request);
+      } else {
+        request->send();
+      }
+    }
   }
 }
 
-void AioImageWrite::send_object_request(const ObjectExtent &object_extent,
-                                        const ::SnapContext &snapc) {
-  CephContext *cct = m_image_ctx.cct;
-
-  // assemble extent
-  bufferlist bl;
+void AioImageWrite::assemble_extent(const ObjectExtent &object_extent,
+                                    bufferlist *bl) {
   for (Extents::const_iterator q = object_extent.buffer_extents.begin();
        q != object_extent.buffer_extents.end(); ++q) {
-    bl.append(m_buf + q->first, q->second);
+    bl->append(m_buf + q->first, q->second);;
   }
+}
 
-  C_AioRequest *req_comp = new C_AioRequest(cct, m_aio_comp);
-  if (m_image_ctx.object_cacher) {
+uint64_t AioImageWrite::append_journal_event(
+    const AioObjectRequests &requests, bool synchronous) {
+  bufferlist bl;
+  bl.append(m_buf, m_len);
+
+  journal::EventEntry event_entry(journal::AioWriteEvent(m_off, m_len, bl));
+  return m_image_ctx.journal->append_event(m_aio_comp, event_entry, requests,
+                                           synchronous);
+}
+
+void AioImageWrite::send_cache_requests(const ObjectExtents &object_extents,
+                                        const ::SnapContext &snapc,
+                                        uint64_t journal_tid) {
+  CephContext *cct = m_image_ctx.cct;
+
+  for (ObjectExtents::const_iterator p = object_extents.begin();
+       p != object_extents.end(); ++p) {
+    const ObjectExtent &object_extent = *p;
+
+    bufferlist bl;
+    assemble_extent(object_extent, &bl);
+
+    // TODO pass journal_tid to object cacher
+    C_AioRequest *req_comp = new C_AioRequest(cct, m_aio_comp);
     m_image_ctx.write_to_cache(object_extent.oid, bl, object_extent.length,
                                object_extent.offset, req_comp, m_op_flags);
-  } else {
-    AioObjectWrite *req = new AioObjectWrite(&m_image_ctx,
-                                             object_extent.oid.name,
-                                             object_extent.objectno,
-                                             object_extent.offset, bl,
-                                             snapc, req_comp);
-
-    req->set_op_flags(m_op_flags);
-    req->send();
   }
 }
 
+AioObjectRequest *AioImageWrite::send_object_request(
+    const ObjectExtent &object_extent, const ::SnapContext &snapc,
+    Context *on_finish) {
+  if (m_image_ctx.object_cacher != NULL) {
+    return NULL;
+  }
+
+  bufferlist bl;
+  assemble_extent(object_extent, &bl);
+  AioObjectWrite *req = new AioObjectWrite(&m_image_ctx,
+                                           object_extent.oid.name,
+                                           object_extent.objectno,
+                                           object_extent.offset, bl,
+                                           snapc, on_finish);
+  req->set_op_flags(m_op_flags);
+  return req;
+}
 
 void AioImageWrite::update_stats(size_t length) {
   m_image_ctx.perfcounter->inc(l_librbd_wr);
   m_image_ctx.perfcounter->inc(l_librbd_wr_bytes, length);
 }
 
-void AioImageDiscard::send_object_requests(const ObjectExtents &object_extents,
-                                           const ::SnapContext &snapc) {
-  // discard from the cache first to ensure writeback won't recreate
-  if (m_image_ctx.object_cacher != NULL) {
-    Mutex::Locker cache_locker(m_image_ctx.cache_lock);
-    m_image_ctx.object_cacher->discard_set(m_image_ctx.object_set,
-                                           object_extents);
-  }
+uint64_t AioImageDiscard::append_journal_event(
+    const AioObjectRequests &requests, bool synchronous) {
+  journal::EventEntry event_entry(journal::AioDiscardEvent(m_off, m_len));
+  return m_image_ctx.journal->append_event(m_aio_comp, event_entry, requests,
+                                           synchronous);
+}
+
+void AioImageDiscard::send_cache_requests(const ObjectExtents &object_extents,
+                                          const ::SnapContext &snapc,
+                                          uint64_t journal_tid) {
+  // TODO need to have cache flag pending discard for writeback or need
+  // to delay cache update until after journal commits
+  Mutex::Locker cache_locker(m_image_ctx.cache_lock);
 
-  AbstractAioImageWrite::send_object_requests(object_extents, snapc);
+  // TODO pass journal_tid to object cacher
+  m_image_ctx.object_cacher->discard_set(m_image_ctx.object_set,
+                                         object_extents);
 }
 
-void AioImageDiscard::send_object_request(const ObjectExtent &object_extent,
-                                          const ::SnapContext &snapc) {
+AioObjectRequest *AioImageDiscard::send_object_request(
+    const ObjectExtent &object_extent, const ::SnapContext &snapc,
+    Context *on_finish) {
   CephContext *cct = m_image_ctx.cct;
 
-  C_AioRequest *req_comp = new C_AioRequest(cct, m_aio_comp);
-
   AioObjectRequest *req;
   if (object_extent.length == m_image_ctx.layout.fl_object_size) {
     req = new AioObjectRemove(&m_image_ctx, object_extent.oid.name,
-                              object_extent.objectno, snapc, req_comp);
+                              object_extent.objectno, snapc, on_finish);
   } else if (object_extent.offset + object_extent.length ==
                m_image_ctx.layout.fl_object_size) {
     req = new AioObjectTruncate(&m_image_ctx, object_extent.oid.name,
                                 object_extent.objectno, object_extent.offset,
-                                snapc, req_comp);
+                                snapc, on_finish);
   } else {
     if(cct->_conf->rbd_skip_partial_discard) {
-      delete req_comp;
-      return;
+      delete on_finish;
+      return NULL;
     }
     req = new AioObjectZero(&m_image_ctx, object_extent.oid.name,
                             object_extent.objectno, object_extent.offset,
-                            object_extent.length, snapc, req_comp);
+                            object_extent.length, snapc, on_finish);
   }
-  req->send();
+  return req;
 }
 
 void AioImageDiscard::update_stats(size_t length) {
@@ -276,6 +340,16 @@ void AioImageDiscard::update_stats(size_t length) {
 void AioImageFlush::send_request() {
   CephContext *cct = m_image_ctx.cct;
 
+  {
+    // journal the flush event
+    RWLock::RLocker snap_locker(m_image_ctx.snap_lock);
+    if (m_image_ctx.journal != NULL) {
+      m_image_ctx.journal->append_event(
+        m_aio_comp, journal::EventEntry(journal::AioFlushEvent()),
+        AioObjectRequests(), true);
+    }
+  }
+
   // TODO race condition between registering op and submitting to cache
   //      (might not be flushed -- backport needed)
   C_AioRequest *flush_ctx = new C_AioRequest(cct, m_aio_comp);
index db65839b5dd6cbd12f3f5364dec15532e824307a..ab8053a03f2937462a6c5bafa384499cc3cabfa5 100644 (file)
@@ -8,12 +8,14 @@
 #include "include/buffer.h"
 #include "common/snap_types.h"
 #include "osd/osd_types.h"
+#include "librbd/AioCompletion.h"
+#include <list>
 #include <utility>
 #include <vector>
 
 namespace librbd {
 
-class AioCompletion;
+class AioObjectRequest;
 class ImageCtx;
 
 class AioImageRequest {
@@ -40,6 +42,8 @@ public:
   void send();
 
 protected:
+  typedef std::list<AioObjectRequest *> AioObjectRequests;
+
   ImageCtx &m_image_ctx;
   AioCompletion *m_aio_comp;
 
@@ -84,25 +88,43 @@ public:
     return true;
   }
 
+  inline void flag_synchronous() {
+    m_synchronous = true;
+  }
+
 protected:
   typedef std::vector<ObjectExtent> ObjectExtents;
 
+  const uint64_t m_off;
+  const size_t m_len;
+
   AbstractAioImageWrite(ImageCtx &image_ctx, AioCompletion *aio_comp,
                         uint64_t off, size_t len)
-    : AioImageRequest(image_ctx, aio_comp), m_off(off), m_len(len) {
+    : AioImageRequest(image_ctx, aio_comp), m_off(off), m_len(len),
+      m_synchronous(false) {
   }
 
+  virtual aio_type_t get_aio_type() const = 0;
+
   virtual void send_request();
 
-  virtual void send_object_requests(const ObjectExtents &object_extents,
-                                    const ::SnapContext &snapc);
-  virtual void send_object_request(const ObjectExtent &object_extent,
-                                   const ::SnapContext &snapc) = 0;
+  virtual void send_cache_requests(const ObjectExtents &object_extents,
+                                   const ::SnapContext &snapc,
+                                   uint64_t journal_tid) = 0;
+
+  void send_object_requests(const ObjectExtents &object_extents,
+                            const ::SnapContext &snapc,
+                            AioObjectRequests *aio_object_requests);
+  virtual AioObjectRequest *send_object_request(
+      const ObjectExtent &object_extent, const ::SnapContext &snapc,
+      Context *on_finish) = 0;
+
+  virtual uint64_t append_journal_event(const AioObjectRequests &requests,
+                                        bool synchronous) = 0;
   virtual void update_stats(size_t length) = 0;
 
 private:
-  uint64_t m_off;
-  size_t m_len;
+  bool m_synchronous;
 };
 
 class AioImageWrite : public AbstractAioImageWrite {
@@ -114,12 +136,25 @@ public:
   }
 
 protected:
+  virtual aio_type_t get_aio_type() const {
+    return AIO_TYPE_WRITE;
+  }
   virtual const char *get_request_type() const {
     return "aio_write";
   }
 
-  virtual void send_object_request(const ObjectExtent &object_extent,
-                                   const ::SnapContext &snapc);
+  void assemble_extent(const ObjectExtent &object_extent, bufferlist *bl);
+
+  virtual void send_cache_requests(const ObjectExtents &object_extents,
+                                   const ::SnapContext &snapc,
+                                   uint64_t journal_tid);
+
+  virtual AioObjectRequest *send_object_request(
+      const ObjectExtent &object_extent, const ::SnapContext &snapc,
+      Context *on_finish);
+
+  virtual uint64_t append_journal_event(const AioObjectRequests &requests,
+                                        bool synchronous);
   virtual void update_stats(size_t length);
 private:
   const char *m_buf;
@@ -134,14 +169,23 @@ public:
   }
 
 protected:
+  virtual aio_type_t get_aio_type() const {
+    return AIO_TYPE_DISCARD;
+  }
   virtual const char *get_request_type() const {
     return "aio_discard";
   }
 
-  virtual void send_object_requests(const ObjectExtents &object_extents,
-                                    const ::SnapContext &snapc);
-  virtual void send_object_request(const ObjectExtent &object_extent,
-                                   const ::SnapContext &snapc);
+  virtual void send_cache_requests(const ObjectExtents &object_extents,
+                                   const ::SnapContext &snapc,
+                                   uint64_t journal_tid);
+
+  virtual AioObjectRequest *send_object_request(
+      const ObjectExtent &object_extent, const ::SnapContext &snapc,
+      Context *on_finish);
+
+  virtual uint64_t append_journal_event(const AioObjectRequests &requests,
+                                        bool synchronous);
   virtual void update_stats(size_t length);
 };
 
index 6d5fffde5a4ea8729799685f44a06c1b7001ad4c..6b58b0b728b0f6739973cddd0e5a3a2090a601d2 100644 (file)
@@ -5,7 +5,6 @@
 #include "librbd/AioCompletion.h"
 #include "librbd/AioImageRequest.h"
 #include "librbd/ImageCtx.h"
-#include "librbd/ImageWatcher.h"
 #include "librbd/internal.h"
 
 #define dout_subsys ceph_subsys_rbd
 
 namespace librbd {
 
+AioImageRequestWQ::AioImageRequestWQ(ImageCtx *image_ctx, const string &name,
+                                     time_t ti, ThreadPool *tp)
+  : ThreadPool::PointerWQ<AioImageRequest>(name, ti, 0, tp),
+    m_image_ctx(*image_ctx), m_lock("AioImageRequestWQ::m_lock"),
+    m_write_blockers(0), m_in_progress_writes(0), m_queued_writes(0),
+    m_lock_listener(this), m_blocking_writes(false) {
+
+  CephContext *cct = m_image_ctx.cct;
+  ldout(cct, 5) << this << " " << ": ictx=" << image_ctx << dendl;
+}
+
 ssize_t AioImageRequestWQ::read(uint64_t off, size_t len, char *buf,
                                 int op_flags) {
   CephContext *cct = m_image_ctx.cct;
@@ -86,8 +96,7 @@ void AioImageRequestWQ::aio_read(AioCompletion *c, uint64_t off, size_t len,
 
   RWLock::RLocker owner_locker(m_image_ctx.owner_lock);
   if (m_image_ctx.non_blocking_aio) {
-    queue(new AioImageRead(m_image_ctx, c, off, len, buf, pbl, op_flags),
-          false);
+    queue(new AioImageRead(m_image_ctx, c, off, len, buf, pbl, op_flags));
   } else {
     AioImageRequest::aio_read(&m_image_ctx, c, off, len, buf, pbl, op_flags);
   }
@@ -102,10 +111,9 @@ void AioImageRequestWQ::aio_write(AioCompletion *c, uint64_t off, size_t len,
                  << "len=" << len << ", flags=" << op_flags << dendl;
 
   RWLock::RLocker owner_locker(m_image_ctx.owner_lock);
-  bool lock_required = is_lock_required();
-  if (m_image_ctx.non_blocking_aio || lock_required) {
-    queue(new AioImageWrite(m_image_ctx, c, off, len, buf, op_flags),
-          lock_required);
+  if (m_image_ctx.non_blocking_aio || is_journal_required() ||
+      writes_blocked()) {
+    queue(new AioImageWrite(m_image_ctx, c, off, len, buf, op_flags));
   } else {
     AioImageRequest::aio_write(&m_image_ctx, c, off, len, buf, op_flags);
   }
@@ -120,10 +128,9 @@ void AioImageRequestWQ::aio_discard(AioCompletion *c, uint64_t off,
                  << dendl;
 
   RWLock::RLocker owner_locker(m_image_ctx.owner_lock);
-  bool lock_required = is_lock_required();
-  if (m_image_ctx.non_blocking_aio || lock_required) {
-    queue(new AioImageDiscard(m_image_ctx, c, off, len),
-          lock_required);
+  if (m_image_ctx.non_blocking_aio || is_journal_required() ||
+      writes_blocked()) {
+    queue(new AioImageDiscard(m_image_ctx, c, off, len));
   } else {
     AioImageRequest::aio_discard(&m_image_ctx, c, off, len);
   }
@@ -136,33 +143,51 @@ void AioImageRequestWQ::aio_flush(AioCompletion *c) {
                  << "completion=" << c << dendl;
 
   RWLock::RLocker owner_locker(m_image_ctx.owner_lock);
-  if (m_image_ctx.non_blocking_aio || !writes_empty()) {
-    queue(new AioImageFlush(m_image_ctx, c), false);
+  if (m_image_ctx.non_blocking_aio || is_journal_required() ||
+      writes_blocked() || !writes_empty()) {
+    queue(new AioImageFlush(m_image_ctx, c));
   } else {
     AioImageRequest::aio_flush(&m_image_ctx, c);
   }
 }
 
-void AioImageRequestWQ::suspend_writes() {
+void AioImageRequestWQ::block_writes() {
   CephContext *cct = m_image_ctx.cct;
-  ldout(cct, 5) << __func__ << ": " << &m_image_ctx << dendl;
 
   Mutex::Locker locker(m_lock);
-  m_writes_suspended = true;
-  while (m_in_progress_writes > 0) {
-    m_cond.Wait(m_lock);
+  ++m_write_blockers;
+  ldout(cct, 5) << __func__ << ": " << &m_image_ctx << ", "
+                << "num=" << m_write_blockers << dendl;
+  if (m_write_blockers == 1) {
+    while (m_in_progress_writes > 0) {
+      m_cond.Wait(m_lock);
+    }
   }
 }
 
-void AioImageRequestWQ::resume_writes() {
+void AioImageRequestWQ::unblock_writes() {
   CephContext *cct = m_image_ctx.cct;
-  ldout(cct, 5) << __func__ << ": " << &m_image_ctx << dendl;
 
+  bool wake_up = false;
   {
     Mutex::Locker locker(m_lock);
-    m_writes_suspended = false;
+    assert(m_write_blockers > 0);
+    --m_write_blockers;
+
+    ldout(cct, 5) << __func__ << ": " << &m_image_ctx << ", "
+                  << "num=" << m_write_blockers << dendl;
+    if (m_write_blockers == 0) {
+      wake_up = true;
+    }
   }
-  signal();
+
+  if (wake_up) {
+    signal();
+  }
+}
+
+void AioImageRequestWQ::register_lock_listener() {
+  m_image_ctx.image_watcher->register_listener(&m_lock_listener);
 }
 
 void *AioImageRequestWQ::_void_dequeue() {
@@ -174,7 +199,7 @@ void *AioImageRequestWQ::_void_dequeue() {
   {
     if (peek_item->is_write_op()) {
       Mutex::Locker locker(m_lock);
-      if (m_writes_suspended) {
+      if (m_write_blockers > 0) {
         return NULL;
       }
       ++m_in_progress_writes;
@@ -201,9 +226,7 @@ void AioImageRequestWQ::process(AioImageRequest *req) {
     Mutex::Locker locker(m_lock);
     if (req->is_write_op()) {
       assert(m_queued_writes > 0);
-      if (--m_queued_writes == 0) {
-        m_image_ctx.image_watcher->clear_aio_ops_pending();
-      }
+      --m_queued_writes;
 
       assert(m_in_progress_writes > 0);
       if (--m_in_progress_writes == 0) {
@@ -214,19 +237,25 @@ void AioImageRequestWQ::process(AioImageRequest *req) {
   delete req;
 }
 
-bool AioImageRequestWQ::is_lock_required() {
+bool AioImageRequestWQ::is_journal_required() const {
+  RWLock::RLocker snap_locker(m_image_ctx.snap_lock);
+  return (m_image_ctx.journal != NULL);
+}
+
+bool AioImageRequestWQ::is_lock_required() const {
   assert(m_image_ctx.owner_lock.is_locked());
   if (m_image_ctx.image_watcher == NULL) {
     return false;
   }
+
   return (m_image_ctx.image_watcher->is_lock_supported() &&
           !m_image_ctx.image_watcher->is_lock_owner());
 }
 
-void AioImageRequestWQ::queue(AioImageRequest *req, bool lock_required) {
+void AioImageRequestWQ::queue(AioImageRequest *req) {
   CephContext *cct = m_image_ctx.cct;
   ldout(cct, 20) << __func__ << ": ictx=" << &m_image_ctx << ", "
-                 << "req=" << req << ", lock_req=" << lock_required << dendl;
+                 << "req=" << req << dendl;
 
   assert(m_image_ctx.owner_lock.is_locked());
 
@@ -241,9 +270,38 @@ void AioImageRequestWQ::queue(AioImageRequest *req, bool lock_required) {
   }
   ThreadPool::PointerWQ<AioImageRequest>::queue(req);
 
-  if (first_write_op) {
-    m_image_ctx.image_watcher->flag_aio_ops_pending();
-    if (lock_required) {
+  if (is_lock_required() && first_write_op) {
+    m_image_ctx.image_watcher->request_lock();
+  }
+}
+
+void AioImageRequestWQ::handle_releasing_lock() {
+  assert(m_image_ctx.owner_lock.is_locked());
+
+  CephContext *cct = m_image_ctx.cct;
+  ldout(cct, 20) << __func__ << ": ictx=" << &m_image_ctx << dendl;
+
+  if (!m_blocking_writes) {
+    m_blocking_writes = true;
+    block_writes();
+  }
+}
+
+void AioImageRequestWQ::handle_lock_updated(bool lock_supported,
+                                            bool lock_owner) {
+  assert(m_image_ctx.owner_lock.is_locked());
+
+  CephContext *cct = m_image_ctx.cct;
+  ldout(cct, 20) << __func__ << ": ictx=" << &m_image_ctx << ", "
+                 << "lock_support=" << lock_supported << ", "
+                 << "owner=" << lock_owner << dendl;
+
+  if ((!lock_supported || lock_owner) && m_blocking_writes) {
+    m_blocking_writes = false;
+    unblock_writes();
+  } else if (lock_supported && !lock_owner) {
+    assert(writes_blocked());
+    if (!writes_empty()) {
       m_image_ctx.image_watcher->request_lock();
     }
   }
index c57715fc2393eeae5cf975dd62bf5cba8f1d1ba7..75ec889b53133b9812113c925858e9fda88c56ff 100644 (file)
@@ -4,8 +4,10 @@
 #ifndef CEPH_LIBRBD_AIO_IMAGE_REQUEST_WQ_H
 #define CEPH_LIBRBD_AIO_IMAGE_REQUEST_WQ_H
 
+#include "include/Context.h"
 #include "common/WorkQueue.h"
 #include "common/Mutex.h"
+#include "librbd/ImageWatcher.h"
 
 namespace librbd {
 
@@ -16,11 +18,7 @@ class ImageCtx;
 class AioImageRequestWQ : protected ThreadPool::PointerWQ<AioImageRequest> {
 public:
   AioImageRequestWQ(ImageCtx *image_ctx, const string &name, time_t ti,
-                    ThreadPool *tp)
-    : ThreadPool::PointerWQ<AioImageRequest>(name, ti, 0, tp),
-      m_image_ctx(*image_ctx), m_lock("AioImageRequestWQ::m_lock"),
-      m_writes_suspended(false), m_in_progress_writes(0), m_queued_writes(0) {
-  }
+                    ThreadPool *tp);
 
   ssize_t read(uint64_t off, size_t len, char *buf, int op_flags);
   ssize_t write(uint64_t off, size_t len, const char *buf, int op_flags);
@@ -40,28 +38,54 @@ public:
     return (m_queued_writes == 0);
   }
 
-  inline bool writes_suspended() const {
+  inline bool writes_blocked() const {
     Mutex::Locker locker(m_lock);
-    return m_writes_suspended;
+    return (m_write_blockers > 0);
   }
 
-  void suspend_writes();
-  void resume_writes();
+  void block_writes();
+  void unblock_writes();
+
+  void register_lock_listener();
 
 protected:
   virtual void *_void_dequeue();
   virtual void process(AioImageRequest *req);
 
 private:
+  struct LockListener : public ImageWatcher::Listener {
+    AioImageRequestWQ *aio_work_queue;
+    LockListener(AioImageRequestWQ *_aio_work_queue)
+      : aio_work_queue(_aio_work_queue) {
+    }
+
+    virtual bool handle_requested_lock() {
+      return true;
+    }
+    virtual void handle_releasing_lock() {
+      aio_work_queue->handle_releasing_lock();
+    }
+    virtual void handle_lock_updated(bool lock_supported, bool lock_owner) {
+      aio_work_queue->handle_lock_updated(lock_supported, lock_owner);
+    }
+  };
+
   ImageCtx &m_image_ctx;
   mutable Mutex m_lock;
   Cond m_cond;
-  bool m_writes_suspended;
+  uint32_t m_write_blockers;
   uint32_t m_in_progress_writes;
   uint32_t m_queued_writes;
 
-  bool is_lock_required();
-  void queue(AioImageRequest *req, bool lock_required);
+  LockListener m_lock_listener;
+  bool m_blocking_writes;
+
+  bool is_journal_required() const;
+  bool is_lock_required() const;
+  void queue(AioImageRequest *req);
+
+  void handle_releasing_lock();
+  void handle_lock_updated(bool lock_supported, bool lock_owner);
 };
 
 } // namespace librbd
index eff12ff645aa9d027606c1eb9c601c478ac0a00e..5c3973a2b97f53b2cfb571a9a54f46ed66dbef6a 100644 (file)
@@ -13,6 +13,7 @@
 #include "librbd/CopyupRequest.h"
 #include "librbd/ImageCtx.h"
 #include "librbd/ImageWatcher.h"
+#include "librbd/internal.h"
 #include "librbd/ObjectMap.h"
 
 #include <boost/bind.hpp>
index d93e149b24a827b8073d1049b9fc2161ff4590bd..eb815283ef0287939f77ce0a58ab78a7005a97f4 100644 (file)
@@ -17,6 +17,7 @@
 #include "librbd/internal.h"
 #include "librbd/ImageCtx.h"
 #include "librbd/ImageWatcher.h"
+#include "librbd/Journal.h"
 #include "librbd/LibrbdAdminSocketHook.h"
 #include "librbd/ObjectMap.h"
 
@@ -67,6 +68,7 @@ public:
       exclusive_locked(false),
       name(image_name),
       image_watcher(NULL),
+      journal(NULL),
       refresh_seq(0),
       last_refresh(0),
       owner_lock(unique_lock_name("librbd::ImageCtx::owner_lock", this)),
@@ -110,6 +112,7 @@ public:
   }
 
   ImageCtx::~ImageCtx() {
+    assert(journal == NULL);
     if (perfcounter) {
       perf_stop();
     }
@@ -755,6 +758,7 @@ public:
   int ImageCtx::register_watch() {
     assert(image_watcher == NULL);
     image_watcher = new ImageWatcher(*this);
+    aio_work_queue->register_lock_listener();
     return image_watcher->register_watch();
   }
 
@@ -938,4 +942,24 @@ public:
     ASSIGN_OPTION(request_timed_out_seconds);
     ASSIGN_OPTION(enable_alloc_hint);
   }
+
+  void ImageCtx::open_journal() {
+    assert(journal == NULL);
+    journal = new Journal(*this);
+  }
+
+  int ImageCtx::close_journal(bool force) {
+    assert(journal != NULL);
+    int r = journal->close();
+    if (r < 0) {
+      lderr(cct) << "failed to flush journal: " << cpp_strerror(r) << dendl;
+      if (!force) {
+        return r;
+      }
+    }
+
+    delete journal;
+    journal = NULL;
+    return r;
+  }
 }
index 9e21e45493c32946dc810255768b5216170ced13..2d41e01145078124957bba4ee61c16cb437238bc 100644 (file)
@@ -44,6 +44,7 @@ namespace librbd {
   class CopyupRequest;
   class LibrbdAdminSocketHook;
   class ImageWatcher;
+  class Journal;
 
   struct ImageCtx {
     CephContext *cct;
@@ -69,6 +70,7 @@ namespace librbd {
     std::string snap_name;
     IoCtx data_ctx, md_ctx;
     ImageWatcher *image_watcher;
+    Journal *journal;
     int refresh_seq;    ///< sequence for refresh requests
     int last_refresh;   ///< last completed refresh
 
@@ -243,6 +245,9 @@ namespace librbd {
 
     void cancel_async_requests();
     void apply_metadata_confs();
+
+    void open_journal();
+    int close_journal(bool force);
   };
 }
 
index 026432cd5f5951bdd85111f361d9b172da443194..69825a1df9141f9bed362072a49632cd6b059e28 100644 (file)
@@ -2,7 +2,6 @@
 // vim: ts=8 sw=2 smarttab
 #include "librbd/ImageWatcher.h"
 #include "librbd/AioCompletion.h"
-#include "librbd/AioImageRequestWQ.h"
 #include "librbd/ImageCtx.h"
 #include "librbd/internal.h"
 #include "librbd/ObjectMap.h"
@@ -35,8 +34,10 @@ ImageWatcher::ImageWatcher(ImageCtx &image_ctx)
   : m_image_ctx(image_ctx),
     m_watch_lock(unique_lock_name("librbd::ImageWatcher::m_watch_lock", this)),
     m_watch_ctx(*this), m_watch_handle(0),
-    m_watch_state(WATCH_STATE_UNREGISTERED), m_aio_ops_pending(false),
+    m_watch_state(WATCH_STATE_UNREGISTERED), m_lock_supported(false),
     m_lock_owner_state(LOCK_OWNER_STATE_NOT_LOCKED),
+    m_listeners_lock(unique_lock_name("librbd::ImageWatcher::m_listeners_lock", this)),
+    m_listeners_in_use(false),
     m_task_finisher(new TaskFinisher<Task>(*m_image_ctx.cct)),
     m_async_request_lock(unique_lock_name("librbd::ImageWatcher::m_async_request_lock", this)),
     m_owner_client_id_lock(unique_lock_name("librbd::ImageWatcher::m_owner_client_id_lock", this))
@@ -74,6 +75,20 @@ bool ImageWatcher::is_lock_owner() const {
           m_lock_owner_state == LOCK_OWNER_STATE_RELEASING);
 }
 
+void ImageWatcher::register_listener(Listener *listener) {
+  Mutex::Locker listeners_locker(m_listeners_lock);
+  m_listeners.push_back(listener);
+}
+
+void ImageWatcher::unregister_listener(Listener *listener) {
+  // TODO CoW listener list
+  Mutex::Locker listeners_locker(m_listeners_lock);
+  while (m_listeners_in_use) {
+    m_listeners_cond.Wait(m_listeners_lock);
+  }
+  m_listeners.remove(listener);
+}
+
 int ImageWatcher::register_watch() {
   ldout(m_image_ctx.cct, 10) << this << " registering image watcher" << dendl;
 
@@ -110,19 +125,41 @@ int ImageWatcher::unregister_watch() {
   return r;
 }
 
-void ImageWatcher::refresh() {
+int ImageWatcher::refresh() {
   assert(m_image_ctx.owner_lock.is_locked());
 
-  if (is_lock_supported() && !is_lock_owner()) {
-    m_image_ctx.aio_work_queue->suspend_writes();
-  } else if (!is_lock_supported()) {
-    m_image_ctx.aio_work_queue->resume_writes();
+  bool lock_support_changed = false;
+  {
+    RWLock::WLocker watch_locker(m_watch_lock);
+    if (m_lock_supported != is_lock_supported()) {
+      m_lock_supported = is_lock_supported();
+      lock_support_changed = true;
+    }
+  }
+
+  int r = 0;
+  if (lock_support_changed) {
+    if (is_lock_supported() && !is_lock_owner()) {
+      // image opened, exclusive lock dynamically enabled, or now HEAD
+      notify_listeners_releasing_lock();
+    } else if (!is_lock_supported() && is_lock_owner()) {
+      // exclusive lock dynamically disabled or now snapshot
+      m_image_ctx.owner_lock.put_read();
+      {
+        RWLock::WLocker owner_locker(m_image_ctx.owner_lock);
+        r = release_lock();
+      }
+      m_image_ctx.owner_lock.get_read();
+    }
+    notify_listeners_updated_lock();
   }
+  return r;
 }
 
 int ImageWatcher::try_lock() {
   assert(m_image_ctx.owner_lock.is_wlocked());
   assert(m_lock_owner_state == LOCK_OWNER_STATE_NOT_LOCKED);
+  assert(is_lock_supported());
 
   while (true) {
     int r = lock();
@@ -188,10 +225,6 @@ int ImageWatcher::try_lock() {
 }
 
 void ImageWatcher::request_lock() {
-  {
-    RWLock::WLocker watch_locker(m_watch_lock);
-    m_aio_ops_pending = true;
-  }
   schedule_request_lock(false);
 }
 
@@ -275,6 +308,9 @@ int ImageWatcher::get_lock_owner_info(entity_name_t *locker, std::string *cookie
 }
 
 int ImageWatcher::lock() {
+  assert(m_image_ctx.owner_lock.is_wlocked());
+  assert(m_lock_owner_state == LOCK_OWNER_STATE_NOT_LOCKED);
+
   int r = rados::cls::lock::lock(&m_image_ctx.md_ctx, m_image_ctx.header_oid,
                                 RBD_LOCK_NAME, LOCK_EXCLUSIVE,
                                 encode_lock_cookie(), WATCHER_LOCK_TAG, "",
@@ -302,39 +338,16 @@ int ImageWatcher::lock() {
     m_image_ctx.object_map.refresh(CEPH_NOSNAP);
   }
 
-  bufferlist bl;
-  ::encode(NotifyMessage(AcquiredLockPayload(get_client_id())), bl);
-
-  m_image_ctx.aio_work_queue->resume_writes();
-
   // send the notification when we aren't holding locks
   FunctionContext *ctx = new FunctionContext(
-    boost::bind(&IoCtx::notify2, &m_image_ctx.md_ctx, m_image_ctx.header_oid,
-               bl, NOTIFY_TIMEOUT, reinterpret_cast<bufferlist *>(0)));
+    boost::bind(&ImageWatcher::notify_acquired_lock, this));
   m_task_finisher->queue(TASK_CODE_ACQUIRED_LOCK, ctx);
   return 0;
 }
 
-void ImageWatcher::prepare_unlock() {
-  assert(m_image_ctx.owner_lock.is_wlocked());
-  if (m_lock_owner_state == LOCK_OWNER_STATE_LOCKED) {
-    m_lock_owner_state = LOCK_OWNER_STATE_RELEASING;
-  }
-}
-
-void ImageWatcher::cancel_unlock() {
-  assert(m_image_ctx.owner_lock.is_wlocked());
-  if (m_lock_owner_state == LOCK_OWNER_STATE_RELEASING) {
-    m_lock_owner_state = LOCK_OWNER_STATE_LOCKED;
-  }
-}
-
 int ImageWatcher::unlock()
 {
   assert(m_image_ctx.owner_lock.is_wlocked());
-  if (m_lock_owner_state == LOCK_OWNER_STATE_NOT_LOCKED) {
-    return 0;
-  }
 
   ldout(m_image_ctx.cct, 10) << this << " releasing exclusive lock" << dendl;
   m_lock_owner_state = LOCK_OWNER_STATE_NOT_LOCKED;
@@ -350,63 +363,77 @@ int ImageWatcher::unlock()
     m_image_ctx.object_map.unlock();
   }
 
-  if (is_lock_supported()) {
-    m_image_ctx.aio_work_queue->suspend_writes();
+  {
+    Mutex::Locker l(m_owner_client_id_lock);
+    set_owner_client_id(ClientId());
   }
 
-  Mutex::Locker l(m_owner_client_id_lock);
-  set_owner_client_id(ClientId());
-
   FunctionContext *ctx = new FunctionContext(
     boost::bind(&ImageWatcher::notify_released_lock, this));
   m_task_finisher->queue(TASK_CODE_RELEASED_LOCK, ctx);
   return 0;
 }
 
-bool ImageWatcher::release_lock()
+int ImageWatcher::release_lock()
 {
   assert(m_image_ctx.owner_lock.is_wlocked());
-  ldout(m_image_ctx.cct, 10) << this << " releasing exclusive lock by request"
-                             << dendl;
-  if (!is_lock_owner()) {
-    return false;
+
+  CephContext *cct = m_image_ctx.cct;
+  ldout(cct, 10) << this << " releasing exclusive lock by request" << dendl;
+  if (m_lock_owner_state != LOCK_OWNER_STATE_LOCKED) {
+    return 0;
   }
-  prepare_unlock();
+
+  m_lock_owner_state = LOCK_OWNER_STATE_RELEASING;
   m_image_ctx.owner_lock.put_write();
 
+  // ensure all maint operations are canceled
   m_image_ctx.cancel_async_requests();
   m_image_ctx.flush_async_operations();
-  m_image_ctx.aio_work_queue->suspend_writes();
 
+  int r;
   {
     RWLock::RLocker owner_locker(m_image_ctx.owner_lock);
+
+    // alert listeners that all incoming IO needs to be stopped since the
+    // lock is being released
+    notify_listeners_releasing_lock();
+
     RWLock::WLocker md_locker(m_image_ctx.md_lock);
-    librbd::_flush(&m_image_ctx);
+    r = librbd::_flush(&m_image_ctx);
+    if (r < 0) {
+      lderr(cct) << this << " failed to flush: " << cpp_strerror(r) << dendl;
+      goto err_cancel_unlock;
+    }
   }
 
   m_image_ctx.owner_lock.get_write();
-  if (!is_lock_owner()) {
-    return false;
-  }
+  assert(m_lock_owner_state == LOCK_OWNER_STATE_RELEASING);
+  r = unlock();
 
-  unlock();
-  return true;
-}
+  // notify listeners of the change w/ owner read locked
+  m_image_ctx.owner_lock.put_write();
+  {
+    RWLock::RLocker owner_lock(m_image_ctx.owner_lock);
+    if (m_lock_owner_state == LOCK_OWNER_STATE_NOT_LOCKED) {
+      notify_listeners_updated_lock();
+    }
+  }
+  m_image_ctx.owner_lock.get_write();
 
-void ImageWatcher::flag_aio_ops_pending() {
-  RWLock::WLocker watch_locker(m_watch_lock);
-  if (!m_aio_ops_pending) {
-    ldout(m_image_ctx.cct, 20) << this << " pending AIO ops" << dendl;
-    m_aio_ops_pending = true;
+  if (r < 0) {
+    lderr(cct) << this << " failed to unlock: " << cpp_strerror(r) << dendl;
+    return r;
   }
-}
 
-void ImageWatcher::clear_aio_ops_pending() {
-  RWLock::WLocker watch_locker(m_watch_lock);
-  if (m_aio_ops_pending) {
-    ldout(m_image_ctx.cct, 20) << this << " no pending AIO ops" << dendl;
-    m_aio_ops_pending = false;
+  return 0;
+
+err_cancel_unlock:
+  m_image_ctx.owner_lock.get_write();
+  if (m_lock_owner_state == LOCK_OWNER_STATE_RELEASING) {
+    m_lock_owner_state = LOCK_OWNER_STATE_LOCKED;
   }
+  return r;
 }
 
 void ImageWatcher::assert_header_locked(librados::ObjectWriteOperation *op) {
@@ -537,6 +564,21 @@ int ImageWatcher::notify_rebuild_object_map(uint64_t request_id,
   return notify_async_request(async_request_id, bl, prog_ctx);
 }
 
+// Re-broadcasts the AcquiredLockPayload notification on the image header
+// when this client is in LOCK_OWNER_STATE_LOCKED; otherwise does nothing.
+void ImageWatcher::notify_lock_state() {
+  RWLock::RLocker owner_locker(m_image_ctx.owner_lock);
+  if (m_lock_owner_state != LOCK_OWNER_STATE_LOCKED) {
+    return;
+  }
+
+  // re-send the acquired lock notification so that peers know they can now
+  // request the lock
+  ldout(m_image_ctx.cct, 10) << this << " notify lock state" << dendl;
+
+  bufferlist payload_bl;
+  ::encode(NotifyMessage(AcquiredLockPayload(get_client_id())), payload_bl);
+
+  m_image_ctx.md_ctx.notify2(m_image_ctx.header_oid, payload_bl,
+                             NOTIFY_TIMEOUT, NULL);
+}
+
 void ImageWatcher::notify_header_update(librados::IoCtx &io_ctx,
                                        const std::string &oid)
 {
@@ -592,6 +634,17 @@ ClientId ImageWatcher::get_client_id() {
   return ClientId(m_image_ctx.md_ctx.get_instance_id(), m_watch_handle);
 }
 
+// Runs from the task finisher after lock() succeeds: first tells the local
+// listeners about the new lock state (with owner_lock read-held), then
+// broadcasts AcquiredLockPayload on the image header.
+void ImageWatcher::notify_acquired_lock() {
+  ldout(m_image_ctx.cct, 10) << this << " notify acquired lock" << dendl;
+
+  RWLock::RLocker owner_locker(m_image_ctx.owner_lock);
+  notify_listeners_updated_lock();
+
+  bufferlist payload_bl;
+  ::encode(NotifyMessage(AcquiredLockPayload(get_client_id())), payload_bl);
+  m_image_ctx.md_ctx.notify2(m_image_ctx.header_oid, payload_bl,
+                             NOTIFY_TIMEOUT, NULL);
+}
+
 void ImageWatcher::notify_release_lock() {
   RWLock::WLocker owner_locker(m_image_ctx.owner_lock);
   release_lock();
@@ -609,7 +662,7 @@ void ImageWatcher::schedule_request_lock(bool use_timer, int timer_delay) {
   assert(m_lock_owner_state == LOCK_OWNER_STATE_NOT_LOCKED);
 
   RWLock::RLocker watch_locker(m_watch_lock);
-  if (m_watch_state == WATCH_STATE_REGISTERED && m_aio_ops_pending) {
+  if (m_watch_state == WATCH_STATE_REGISTERED) {
     ldout(m_image_ctx.cct, 15) << this << " requesting exclusive lock" << dendl;
 
     FunctionContext *ctx = new FunctionContext(
@@ -806,40 +859,48 @@ void ImageWatcher::handle_payload(const AcquiredLockPayload &payload,
                                   bufferlist *out) {
   ldout(m_image_ctx.cct, 10) << this << " image exclusively locked announcement"
                              << dendl;
+
+  bool cancel_async_requests = true;
   if (payload.client_id.is_valid()) {
     Mutex::Locker l(m_owner_client_id_lock);
     if (payload.client_id == m_owner_client_id) {
-      // we already know that the remote client is the owner
-      return;
+      cancel_async_requests = false;
     }
     set_owner_client_id(payload.client_id);
   }
 
   RWLock::RLocker owner_locker(m_image_ctx.owner_lock);
   if (m_lock_owner_state == LOCK_OWNER_STATE_NOT_LOCKED) {
-    schedule_cancel_async_requests();
-    schedule_request_lock(false);
+    if (cancel_async_requests) {
+      schedule_cancel_async_requests();
+    }
+    notify_listeners_updated_lock();
   }
 }
 
 void ImageWatcher::handle_payload(const ReleasedLockPayload &payload,
                                   bufferlist *out) {
   ldout(m_image_ctx.cct, 10) << this << " exclusive lock released" << dendl;
+
+  bool cancel_async_requests = true;
   if (payload.client_id.is_valid()) {
     Mutex::Locker l(m_owner_client_id_lock);
     if (payload.client_id != m_owner_client_id) {
       ldout(m_image_ctx.cct, 10) << this << " unexpected owner: "
                                  << payload.client_id << " != "
                                  << m_owner_client_id << dendl;
-      return;
+      cancel_async_requests = false;
+    } else {
+      set_owner_client_id(ClientId());
     }
-    set_owner_client_id(ClientId());
   }
 
   RWLock::RLocker owner_locker(m_image_ctx.owner_lock);
   if (m_lock_owner_state == LOCK_OWNER_STATE_NOT_LOCKED) {
-    schedule_cancel_async_requests();
-    schedule_request_lock(false);
+    if (cancel_async_requests) {
+      schedule_cancel_async_requests();
+    }
+    notify_listeners_updated_lock();
   }
 }
 
@@ -862,11 +923,25 @@ void ImageWatcher::handle_payload(const RequestLockPayload &payload,
       }
     }
 
-    ldout(m_image_ctx.cct, 10) << this << " queuing release of exclusive lock"
-                               << dendl;
-    FunctionContext *ctx = new FunctionContext(
-      boost::bind(&ImageWatcher::notify_release_lock, this));
-    m_task_finisher->queue(TASK_CODE_RELEASING_LOCK, ctx);
+    bool release_permitted = true;
+    {
+      Mutex::Locker listeners_locker(m_listeners_lock);
+      for (Listeners::iterator it = m_listeners.begin();
+           it != m_listeners.end(); ++it) {
+        if (!(*it)->handle_requested_lock()) {
+          release_permitted = false;
+          break;
+        }
+      }
+    }
+
+    if (release_permitted) {
+      ldout(m_image_ctx.cct, 10) << this << " queuing release of exclusive lock"
+                                 << dendl;
+      FunctionContext *ctx = new FunctionContext(
+        boost::bind(&ImageWatcher::notify_release_lock, this));
+      m_task_finisher->queue(TASK_CODE_RELEASING_LOCK, ctx);
+    }
   }
 }
 
@@ -1116,7 +1191,7 @@ void ImageWatcher::reregister_watch() {
   }
 
   if (m_lock_owner_state == LOCK_OWNER_STATE_NOT_LOCKED) {
-    schedule_request_lock(false);
+    notify_listeners_updated_lock();
   }
 }
 
@@ -1135,4 +1210,52 @@ void ImageWatcher::RemoteContext::finish(int r) {
   m_image_watcher.schedule_async_complete(m_async_request_id, r);
 }
 
+// Invokes handle_releasing_lock() on every registered listener.  Caller
+// must hold owner_lock.  The callbacks run on a snapshot of the listener
+// list and without m_listeners_lock held; m_listeners_in_use is set for the
+// duration so unregister_listener() waits until the pass has finished.
+void ImageWatcher::notify_listeners_releasing_lock() {
+  assert(m_image_ctx.owner_lock.is_locked());
+
+  Listeners listeners;
+  {
+    Mutex::Locker listeners_locker(m_listeners_lock);
+    m_listeners_in_use = true;
+    listeners = m_listeners;
+  }
+
+  for (Listeners::iterator it = listeners.begin();
+       it != listeners.end(); ++it) {
+    (*it)->handle_releasing_lock();
+  }
+
+  // NOTE(review): m_listeners_in_use is a single flag, so if two
+  // notification passes ever overlap, the first to finish clears it while
+  // the other is still iterating -- confirm callers serialize these passes.
+  Mutex::Locker listeners_locker(m_listeners_lock);
+  m_listeners_in_use = false;
+  m_listeners_cond.Signal();
+}
+
+// Invokes handle_lock_updated() on every registered listener, passing the
+// cached m_lock_supported value and whether this client is currently in
+// LOCK_OWNER_STATE_LOCKED.  Caller must hold owner_lock.  As with the
+// releasing notification, callbacks run on a snapshot of the listener list
+// while m_listeners_in_use is set.
+void ImageWatcher::notify_listeners_updated_lock() {
+  assert(m_image_ctx.owner_lock.is_locked());
+
+  Listeners listeners;
+  {
+    Mutex::Locker listeners_locker(m_listeners_lock);
+    m_listeners_in_use = true;
+    listeners = m_listeners;
+  }
+
+  // m_lock_supported is written under m_watch_lock by refresh()
+  bool lock_supported;
+  {
+    RWLock::RLocker watch_locker(m_watch_lock);
+    lock_supported = m_lock_supported;
+  }
+
+  // the lock may only be held (or be releasing) while locking is supported
+  assert(lock_supported || m_lock_owner_state == LOCK_OWNER_STATE_NOT_LOCKED);
+  for (Listeners::iterator it = listeners.begin();
+       it != listeners.end(); ++it) {
+    (*it)->handle_lock_updated(lock_supported,
+                               m_lock_owner_state == LOCK_OWNER_STATE_LOCKED);
+  }
+
+  // NOTE(review): m_listeners_in_use is a single flag shared with
+  // notify_listeners_releasing_lock(); overlapping passes would clear it
+  // early -- confirm callers serialize these passes.
+  Mutex::Locker listeners_locker(m_listeners_lock);
+  m_listeners_in_use = false;
+  m_listeners_cond.Signal();
+}
+
 }
index d6fda2b85c782a3ba172ef49edc0c1560cf98fcb..c2c0ce395a75e9ba6d8d753e32fd258e67c809bb 100644 (file)
@@ -3,6 +3,7 @@
 #ifndef CEPH_LIBRBD_IMAGE_WATCHER_H
 #define CEPH_LIBRBD_IMAGE_WATCHER_H
 
+#include "common/Cond.h"
 #include "common/Mutex.h"
 #include "common/RWLock.h"
 #include "include/Context.h"
@@ -25,6 +26,13 @@ template <typename T> class TaskFinisher;
 
 class ImageWatcher {
 public:
+  // Callback interface for components that need to react to exclusive lock
+  // events; instances are added via register_listener().
+  struct Listener {
+    virtual ~Listener() {}
+
+    // a peer requested the lock; returning false prevents the release from
+    // being queued
+    virtual bool handle_requested_lock() = 0;
+    // the lock is about to be released (or is not held)
+    virtual void handle_releasing_lock() = 0;
+    // the lock state changed; lock_owner is true only when the lock is held
+    virtual void handle_lock_updated(bool lock_supported, bool lock_owner) = 0;
+  };
 
   ImageWatcher(ImageCtx& image_ctx);
   ~ImageWatcher();
@@ -33,19 +41,17 @@ public:
   bool is_lock_supported(const RWLock &snap_lock) const;
   bool is_lock_owner() const;
 
+  void register_listener(Listener *listener);
+  void unregister_listener(Listener *listener);
+
   int register_watch();
   int unregister_watch();
 
-  void refresh();
+  int refresh();
 
   int try_lock();
   void request_lock();
-  void prepare_unlock();
-  void cancel_unlock();
-  int unlock();
-
-  void flag_aio_ops_pending();
-  void clear_aio_ops_pending();
+  int release_lock();
 
   void assert_header_locked(librados::ObjectWriteOperation *op);
 
@@ -59,6 +65,7 @@ public:
   int notify_rebuild_object_map(uint64_t request_id,
                                 ProgressContext &prog_ctx);
 
+  void notify_lock_state();
   static void notify_header_update(librados::IoCtx &io_ctx,
                                    const std::string &oid);
 
@@ -87,6 +94,7 @@ private:
     TASK_CODE_ASYNC_PROGRESS
   };
 
+  typedef std::list<Listener *> Listeners;
   typedef std::pair<Context *, ProgressContext *> AsyncRequest;
 
   class Task {
@@ -194,10 +202,16 @@ private:
   WatchCtx m_watch_ctx;
   uint64_t m_watch_handle;
   WatchState m_watch_state;
-  bool m_aio_ops_pending;
+
+  bool m_lock_supported;
 
   LockOwnerState m_lock_owner_state;
 
+  Mutex m_listeners_lock;
+  Cond m_listeners_cond;
+  Listeners m_listeners;
+  bool m_listeners_in_use;
+
   TaskFinisher<Task> *m_task_finisher;
 
   RWLock m_async_request_lock;
@@ -213,7 +227,7 @@ private:
   int get_lock_owner_info(entity_name_t *locker, std::string *cookie,
                           std::string *address, uint64_t *handle);
   int lock();
-  bool release_lock();
+  int unlock();
   bool try_request_lock();
 
   void schedule_cancel_async_requests();
@@ -222,6 +236,7 @@ private:
   void set_owner_client_id(const WatchNotify::ClientId &client_id);
   WatchNotify::ClientId get_client_id();
 
+  void notify_acquired_lock();
   void notify_release_lock();
   void notify_released_lock();
 
@@ -281,6 +296,9 @@ private:
   void acknowledge_notify(uint64_t notify_id, uint64_t handle, bufferlist &out);
 
   void reregister_watch();
+
+  void notify_listeners_releasing_lock();
+  void notify_listeners_updated_lock();
 };
 
 } // namespace librbd
diff --git a/src/librbd/Journal.cc b/src/librbd/Journal.cc
new file mode 100644 (file)
index 0000000..e7ec7e3
--- /dev/null
@@ -0,0 +1,419 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "librbd/Journal.h"
+#include "librbd/AioCompletion.h"
+#include "librbd/AioImageRequestWQ.h"
+#include "librbd/AioObjectRequest.h"
+#include "librbd/ImageCtx.h"
+#include "librbd/JournalTypes.h"
+#include "journal/Journaler.h"
+#include "journal/ReplayEntry.h"
+#include "common/errno.h"
+
+#define dout_subsys ceph_subsys_rbd
+#undef dout_prefix
+#define dout_prefix *_dout << "librbd::Journal: "
+
+namespace librbd {
+
+namespace {
+
+const std::string CLIENT_DESCRIPTION = "master image";
+
+} // anonymous namespace
+
+// Constructs an uninitialized journal (m_journaler starts out NULL),
+// registers its lock listener with the image watcher and blocks writes on
+// the AIO work queue.
+Journal::Journal(ImageCtx &image_ctx)
+  : m_image_ctx(image_ctx), m_journaler(NULL),
+    m_lock("Journal::m_lock"), m_state(STATE_UNINITIALIZED),
+    m_lock_listener(this), m_replay_handler(this), m_close_pending(false),
+    m_next_tid(0), m_blocking_writes(false) {
+
+  ldout(m_image_ctx.cct, 5) << this << ": ictx=" << &m_image_ctx << dendl;
+
+  m_image_ctx.image_watcher->register_listener(&m_lock_listener);
+
+  // writes stay blocked until unblock_writes() runs, which happens once
+  // replay completes or in the destructor
+  Mutex::Locker locker(m_lock);
+  block_writes();
+}
+
+// Requires that the journaler was already destroyed (e.g. via close()).
+// Unregisters the lock listener and lifts the write block if this journal
+// is still holding one.
+Journal::~Journal() {
+  assert(m_journaler == NULL);
+
+  m_image_ctx.image_watcher->unregister_listener(&m_lock_listener);
+
+  Mutex::Locker locker(m_lock);
+  unblock_writes();
+}
+
+// Returns true when the image has RBD_FEATURE_JOURNALING set, is not
+// read-only and its snap_id is CEPH_NOSNAP.  Caller must hold snap_lock.
+bool Journal::is_journal_supported(ImageCtx &image_ctx) {
+  assert(image_ctx.snap_lock.is_locked());
+  if ((image_ctx.features & RBD_FEATURE_JOURNALING) == 0) {
+    return false;
+  }
+  if (image_ctx.read_only) {
+    return false;
+  }
+  return image_ctx.snap_id == CEPH_NOSNAP;
+}
+
+// Creates the journal for image_id in io_ctx's pool and registers the
+// CLIENT_DESCRIPTION client with it.  Returns 0 on success or the negative
+// error code of the step that failed.
+int Journal::create(librados::IoCtx &io_ctx, const std::string &image_id) {
+  CephContext *cct = reinterpret_cast<CephContext *>(io_ctx.cct());
+  ldout(cct, 5) << __func__ << ": image=" << image_id << dendl;
+
+  // TODO configurable commit flush interval
+  ::journal::Journaler journaler(io_ctx, image_id, "", 5);
+
+  // TODO order / splay width via config / image metadata / data pool
+  const int create_r = journaler.create(24, 4, io_ctx.get_id());
+  if (create_r < 0) {
+    lderr(cct) << "failed to create journal: " << cpp_strerror(create_r)
+               << dendl;
+    return create_r;
+  }
+
+  const int register_r = journaler.register_client(CLIENT_DESCRIPTION);
+  if (register_r < 0) {
+    lderr(cct) << "failed to register client: " << cpp_strerror(register_r)
+               << dendl;
+    return register_r;
+  }
+  return 0;
+}
+
+// Placeholder for journal removal: currently only logs the request and
+// returns 0 without touching any journal objects.
+int Journal::remove(librados::IoCtx &io_ctx, const std::string &image_id) {
+  CephContext *cct = reinterpret_cast<CephContext *>(io_ctx.cct());
+  ldout(cct, 5) << __func__ << ": image=" << image_id << dendl;
+
+  return 0;
+}
+
+// Returns true once the journal has reached STATE_RECORDING.
+bool Journal::is_journal_ready() const {
+  Mutex::Locker locker(m_lock);
+  const bool recording = (m_state == STATE_RECORDING);
+  return recording;
+}
+
+// Starts opening the journal by creating the journaler; does nothing if a
+// journaler already exists.
+void Journal::open() {
+  Mutex::Locker locker(m_lock);
+  if (m_journaler == NULL) {
+    ldout(m_image_ctx.cct, 20) << this << " " << __func__ << dendl;
+    create_journaler();
+  }
+}
+
+// Shuts the journal down.  If initialization or replay is in progress it
+// requests an interrupt and waits for the state to change; if the journal
+// is recording it stops appending.  Returns 0 on success or the negative
+// error code from stop_recording().
+int Journal::close() {
+  CephContext *cct = m_image_ctx.cct;
+
+  // m_state is guarded by m_lock, so take the lock before logging it
+  Mutex::Locker locker(m_lock);
+  ldout(cct, 20) << this << " " << __func__ << ": state=" << m_state << dendl;
+  if (m_state == STATE_UNINITIALIZED) {
+    return 0;
+  }
+
+  int r;
+  bool done = false;
+  while (!done) {
+    switch (m_state) {
+    case STATE_UNINITIALIZED:
+      done = true;
+      break;
+    case STATE_INITIALIZING:
+    case STATE_REPLAYING:
+      // re-request the interrupt on every pass: re-creating the journaler
+      // after a failure clears m_close_pending
+      m_close_pending = true;
+      wait_for_state_transition();
+      break;
+    case STATE_RECORDING:
+      // drops m_lock while waiting for the append to stop and destroys the
+      // journaler whether or not the flush succeeded
+      r = stop_recording();
+      if (r < 0) {
+        return r;
+      }
+      done = true;
+      break;
+    default:
+      assert(false);
+    }
+  }
+
+  destroy_journaler();
+  return 0;
+}
+
+// Encodes event_entry, appends it to the journal and records it in
+// m_events under a newly allocated tid, which is returned.
+// handle_event_safe(r, tid) is invoked through a C_EventSafe context that
+// is handed to future.flush() when flush_entry is set and to future.wait()
+// otherwise.  Caller must hold owner_lock and the journal must be in
+// STATE_RECORDING.
+uint64_t Journal::append_event(AioCompletion *aio_comp,
+                               const journal::EventEntry &event_entry,
+                               const AioObjectRequests &requests,
+                               bool flush_entry) {
+  assert(m_image_ctx.owner_lock.is_locked());
+  // NOTE(review): m_state is read here without holding m_lock
+  assert(m_state == STATE_RECORDING);
+
+  bufferlist bl;
+  ::encode(event_entry, bl);
+
+  ::journal::Future future = m_journaler->append("", bl);
+  uint64_t tid;
+  {
+    // the event is registered before the completion callback is attached
+    // below, so handle_event_safe() can always find it
+    Mutex::Locker locker(m_lock);
+    tid = m_next_tid++;
+    m_events[tid] = Event(future, aio_comp, requests);
+  }
+
+  CephContext *cct = m_image_ctx.cct;
+  ldout(cct, 20) << this << " " << __func__ << ": "
+                 << "event=" << event_entry.get_event_type() << ", "
+                 << "new_reqs=" << requests.size() << ", "
+                 << "flush=" << flush_entry << ", tid=" << tid << dendl;
+
+  Context *on_safe = new C_EventSafe(this, tid);
+  if (flush_entry) {
+    future.flush(on_safe);
+  } else {
+    future.wait(on_safe);
+  }
+  return tid;
+}
+
+// Allocates the journaler and starts its initialization with a
+// C_InitJournal completion, which calls handle_initialized().  Requires
+// m_lock held and STATE_UNINITIALIZED; clears any pending close request and
+// moves the journal to STATE_INITIALIZING.
+void Journal::create_journaler() {
+  CephContext *cct = m_image_ctx.cct;
+  ldout(cct, 20) << this << " " << __func__ << dendl;
+
+  assert(m_lock.is_locked());
+  assert(m_state == STATE_UNINITIALIZED);
+
+  // TODO allow alternate pool for journal objects and commit flush interval
+  m_close_pending = false;
+  m_journaler = new ::journal::Journaler(m_image_ctx.md_ctx, m_image_ctx.id, "",
+                                         5);
+
+  m_journaler->init(new C_InitJournal(this));
+  transition_state(STATE_INITIALIZING);
+}
+
+// Deletes the journaler (a no-op delete if it is already NULL), clears any
+// pending close request and moves the journal to STATE_UNINITIALIZED,
+// waking state waiters.  Requires m_lock held.
+void Journal::destroy_journaler() {
+  CephContext *cct = m_image_ctx.cct;
+  ldout(cct, 20) << this << " " << __func__ << dendl;
+
+  assert(m_lock.is_locked());
+
+  m_close_pending = false;
+  delete m_journaler;
+  m_journaler = NULL;
+  transition_state(STATE_UNINITIALIZED);
+}
+
+// Completion of the journaler initialization started by create_journaler().
+// On failure the journaler is destroyed and re-created, which restarts
+// initialization.  On success replay is started, unless a close was
+// requested in the meantime, in which case the journaler is destroyed.
+void Journal::handle_initialized(int r) {
+  CephContext *cct = m_image_ctx.cct;
+  if (r < 0) {
+    lderr(cct) << this << " " << __func__ << ": r=" << r << dendl;
+    Mutex::Locker locker(m_lock);
+
+    // TODO: failed to open journal -- retry?
+    // NOTE(review): m_close_pending is not checked on this path and both
+    // calls below reset it, so a close requested during a failed init has
+    // to be requested again by the waiter.
+    destroy_journaler();
+    create_journaler();
+    return;
+  }
+
+  ldout(cct, 20) << this << " " << __func__ << dendl;
+  Mutex::Locker locker(m_lock);
+  if (m_close_pending) {
+    destroy_journaler();
+    return;
+  }
+
+  transition_state(STATE_REPLAYING);
+  m_journaler->start_replay(&m_replay_handler);
+}
+
+// Replay handler callback: drains the replay entries that are currently
+// available.  If a close was requested, replay is stopped and the journaler
+// destroyed instead.  Entry payloads are not processed yet (see TODO).
+void Journal::handle_replay_ready() {
+  CephContext *cct = m_image_ctx.cct;
+  ldout(cct, 20) << this << " " << __func__ << dendl;
+
+  Mutex::Locker locker(m_lock);
+  while (true) {
+    // re-checked on every pass because m_lock is dropped per entry below
+    if (m_close_pending) {
+      m_journaler->stop_replay();
+      destroy_journaler();
+      return;
+    }
+
+    ::journal::ReplayEntry replay_entry;
+    if (!m_journaler->try_pop_front(&replay_entry)) {
+      return;
+    }
+
+    // the payload is meant to be processed without m_lock held
+    m_lock.Unlock();
+    // TODO process the payload
+    m_lock.Lock();
+  }
+}
+
+// Replay handler callback: replay finished with result r.  On failure the
+// journaler is destroyed and re-created.  On success replay is stopped and,
+// unless a close is pending, appending is started, the journal moves to
+// STATE_RECORDING, writes are unblocked and the image watcher is asked to
+// re-announce the lock state.
+void Journal::handle_replay_complete(int r) {
+  CephContext *cct = m_image_ctx.cct;
+
+  {
+    Mutex::Locker locker(m_lock);
+    if (r < 0) {
+      lderr(cct) << this << " " << __func__ << ": r=" << r << dendl;
+
+      // TODO: failed to replay journal -- retry?
+      destroy_journaler();
+      create_journaler();
+      return;
+    }
+
+    ldout(cct, 20) << this << " " << __func__ << dendl;
+    m_journaler->stop_replay();
+
+    if (m_close_pending) {
+      destroy_journaler();
+      return;
+    }
+
+    // TODO configurable flush interval, flush bytes, and flush age
+    m_journaler->start_append(0, 0, 0);
+    transition_state(STATE_RECORDING);
+
+    unblock_writes();
+  }
+
+  // called after m_lock has been released
+  // kick peers to let them know they can re-request the lock now
+  m_image_ctx.image_watcher->notify_lock_state();
+}
+
+// Completion for a journal entry appended by append_event().  Removes the
+// event identified by tid from m_events and then either fails its
+// AioCompletion (r < 0) or sends the object requests that were waiting on
+// the entry.  Any on_safe contexts attached to the event are completed with
+// r in both cases.
+void Journal::handle_event_safe(int r, uint64_t tid) {
+  CephContext *cct = m_image_ctx.cct;
+  ldout(cct, 20) << this << " " << __func__ << ": r=" << r << ", "
+                 << "tid=" << tid << dendl;
+
+  RWLock::RLocker owner_locker(m_image_ctx.owner_lock);
+
+  AioCompletion *aio_comp;
+  AioObjectRequests aio_object_requests;
+  Contexts on_safe_contexts;
+  {
+    // detach the event's state under m_lock so the callbacks below run
+    // without it
+    Mutex::Locker locker(m_lock);
+    Events::iterator it = m_events.find(tid);
+    assert(it != m_events.end());
+
+    Event &event = it->second;
+    aio_comp = event.aio_comp;
+    aio_object_requests.swap(event.aio_object_requests);
+    on_safe_contexts.swap(event.on_safe_contexts);
+    m_events.erase(it);
+  }
+
+  ldout(cct, 20) << "completing tid=" << tid << dendl;
+
+  assert(m_image_ctx.image_watcher->is_lock_owner());
+
+  if (r < 0) {
+    // don't send aio requests if the journal fails -- bubble error up
+    // NOTE(review): the unsent requests in aio_object_requests are not
+    // released here -- confirm who owns them on this path.
+    aio_comp->fail(cct, r);
+  } else {
+    // send any waiting aio requests now that journal entry is safe
+    for (AioObjectRequests::iterator it = aio_object_requests.begin();
+         it != aio_object_requests.end(); ++it) {
+      (*it)->send();
+    }
+  }
+
+  // alert the cache about the journal event status
+  for (Contexts::iterator it = on_safe_contexts.begin();
+       it != on_safe_contexts.end(); ++it) {
+    (*it)->complete(r);
+  }
+}
+
+// Listener callback: a peer asked for the exclusive lock.  Returns false
+// (refusing the release) while the journal is initializing or replaying,
+// true otherwise.
+bool Journal::handle_requested_lock() {
+  Mutex::Locker locker(m_lock);
+
+  CephContext *cct = m_image_ctx.cct;
+  ldout(cct, 20) << this << " " << __func__ << ": " << "state=" << m_state
+                 << dendl;
+
+  // prevent peers from taking our lock while we are replaying
+  const bool open_in_progress = (m_state == STATE_INITIALIZING ||
+                                 m_state == STATE_REPLAYING);
+  return !open_in_progress;
+}
+
+// Listener callback: the exclusive lock is about to be released.
+// Interrupts any in-progress journal initialization or replay, waits for
+// it to stop, and then blocks new writes.
+void Journal::handle_releasing_lock() {
+  CephContext *cct = m_image_ctx.cct;
+  ldout(cct, 20) << this << " " << __func__ << dendl;
+
+  Mutex::Locker locker(m_lock);
+  while (m_state == STATE_INITIALIZING || m_state == STATE_REPLAYING) {
+    // wait for replay to successfully interrupt.  This loops (as close()
+    // does) because one state transition is not enough: a failed
+    // init/replay re-creates the journaler, which clears m_close_pending,
+    // after which the state can still advance to STATE_REPLAYING.
+    m_close_pending = true;
+    wait_for_state_transition();
+  }
+
+  if (m_state == STATE_UNINITIALIZED || m_state == STATE_RECORDING) {
+    // prevent new write ops but allow pending ops to flush to the journal
+    block_writes();
+  }
+}
+
+// Listener callback: the exclusive lock state changed.  When the lock was
+// gained and no journaler exists, opening the journal is started.  When the
+// lock was lost while a journaler exists, the journal must be recording
+// with no outstanding events, and appending is stopped.
+void Journal::handle_lock_updated(bool lock_owner) {
+
+  CephContext *cct = m_image_ctx.cct;
+  ldout(cct, 20) << this << " " << __func__ << ": "
+                 << "owner=" << lock_owner << dendl;
+
+  Mutex::Locker locker(m_lock);
+  if (lock_owner && m_state == STATE_UNINITIALIZED) {
+    create_journaler();
+  } else if (!lock_owner && m_state != STATE_UNINITIALIZED) {
+    assert(m_state == STATE_RECORDING);
+    assert(m_events.empty());
+    // stop_recording() temporarily drops m_lock while it waits
+    int r = stop_recording();
+    if (r < 0) {
+      // TODO handle failed journal writes
+      assert(false);
+    }
+  }
+}
+
+// Stops appending to the journal and destroys the journaler.  Must be
+// called with m_lock held; the lock is released while waiting for
+// stop_append() to complete and re-acquired afterwards.  The journaler is
+// destroyed whether or not the stop succeeded.  Returns 0 or the negative
+// error code reported by stop_append().
+int Journal::stop_recording() {
+  C_SaferCond cond;
+
+  m_journaler->stop_append(&cond);
+
+  m_lock.Unlock();
+  int r = cond.wait();
+  m_lock.Lock();
+
+  destroy_journaler();
+  if (r < 0) {
+    lderr(m_image_ctx.cct) << "failed to flush journal: " << cpp_strerror(r)
+                           << dendl;
+    return r;
+  }
+  return 0;
+}
+
+// Blocks writes on the AIO work queue unless this journal already holds a
+// block; m_blocking_writes keeps the request from being issued twice.
+// Requires m_lock held.
+void Journal::block_writes() {
+  assert(m_lock.is_locked());
+  if (m_blocking_writes) {
+    return;
+  }
+  m_blocking_writes = true;
+  m_image_ctx.aio_work_queue->block_writes();
+}
+
+// Lifts the write block on the AIO work queue if this journal is holding
+// one; otherwise does nothing.  Requires m_lock held.
+void Journal::unblock_writes() {
+  assert(m_lock.is_locked());
+  if (!m_blocking_writes) {
+    return;
+  }
+  m_blocking_writes = false;
+  m_image_ctx.aio_work_queue->unblock_writes();
+}
+
+// Sets m_state and signals m_cond so that a thread blocked in
+// wait_for_state_transition() re-checks the state.  Requires m_lock held.
+void Journal::transition_state(State state) {
+  CephContext *cct = m_image_ctx.cct;
+  ldout(cct, 20) << this << " " << __func__ << ": new state=" << state << dendl;
+  assert(m_lock.is_locked());
+  m_state = state;
+  m_cond.Signal();
+}
+
+// Blocks on m_cond until m_state differs from its value on entry.
+// Requires m_lock held; the lock is released while waiting.
+void Journal::wait_for_state_transition() {
+  assert(m_lock.is_locked());
+  const State entry_state = m_state;
+  do {
+    m_cond.Wait(m_lock);
+  } while (m_state == entry_state);
+}
+
+} // namespace librbd
diff --git a/src/librbd/Journal.h b/src/librbd/Journal.h
new file mode 100644 (file)
index 0000000..35c1738
--- /dev/null
@@ -0,0 +1,182 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#ifndef CEPH_LIBRBD_JOURNAL_H
+#define CEPH_LIBRBD_JOURNAL_H
+
+#include "include/int_types.h"
+#include "include/Context.h"
+#include "include/unordered_map.h"
+#include "include/rados/librados.hpp"
+#include "common/Mutex.h"
+#include "common/Cond.h"
+#include "journal/Future.h"
+#include "journal/ReplayHandler.h"
+#include "librbd/ImageWatcher.h"
+#include <list>
+#include <string>
+
+class Context;
+namespace journal {
+class Journaler;
+}
+
+namespace librbd {
+
+class AioCompletion;
+class AioObjectRequest;
+class ImageCtx;
+namespace journal {
+class EventEntry;
+}
+
+// Connects an image to the journal library: owns the journaler, tracks the
+// journal's lifecycle state and appends IO events to the journal.  It
+// reacts to exclusive lock changes through an ImageWatcher::Listener.
+class Journal {
+public:
+  typedef std::list<AioObjectRequest *> AioObjectRequests;
+
+  Journal(ImageCtx &image_ctx);
+  ~Journal();
+
+  static bool is_journal_supported(ImageCtx &image_ctx);
+  static int create(librados::IoCtx &io_ctx, const std::string &image_id);
+  static int remove(librados::IoCtx &io_ctx, const std::string &image_id);
+
+  // true once the journal is in STATE_RECORDING
+  bool is_journal_ready() const;
+
+  void open();
+  int close();
+
+  // appends an event to the journal and returns the tid it is tracked under
+  uint64_t append_event(AioCompletion *aio_comp,
+                        const journal::EventEntry &event_entry,
+                        const AioObjectRequests &requests,
+                        bool flush_entry);
+
+private:
+  typedef std::list<Context *> Contexts;
+
+  // lifecycle of the journaler; transitions happen under m_lock
+  enum State {
+    STATE_UNINITIALIZED,
+    STATE_INITIALIZING,
+    STATE_REPLAYING,
+    STATE_RECORDING,
+  };
+
+  // bookkeeping for one appended journal entry that has not completed yet
+  struct Event {
+    ::journal::Future future;
+    AioCompletion *aio_comp;
+    AioObjectRequests aio_object_requests;
+    Contexts on_safe_contexts;
+
+    Event() : aio_comp(NULL) {
+    }
+    Event(const ::journal::Future &_future, AioCompletion *_aio_comp,
+          const AioObjectRequests &_requests)
+      : future(_future), aio_comp(_aio_comp), aio_object_requests(_requests) {
+    }
+  };
+  typedef ceph::unordered_map<uint64_t, Event> Events;
+
+  // forwards exclusive lock callbacks from the image watcher to the journal
+  struct LockListener : public ImageWatcher::Listener {
+    Journal *journal;
+    LockListener(Journal *_journal) : journal(_journal) {
+    }
+
+    virtual bool handle_requested_lock() {
+      return journal->handle_requested_lock();
+    }
+    virtual void handle_releasing_lock() {
+      journal->handle_releasing_lock();
+    }
+    // lock_supported is not forwarded; the journal only uses lock_owner
+    virtual void handle_lock_updated(bool lock_supported, bool lock_owner) {
+      journal->handle_lock_updated(lock_owner);
+    }
+  };
+
+  // completion context for the journaler initialization
+  struct C_InitJournal : public Context {
+    Journal *journal;
+
+    C_InitJournal(Journal *_journal) : journal(_journal) {
+    }
+
+    virtual void finish(int r) {
+      journal->handle_initialized(r);
+    }
+  };
+
+  // completion context for a single appended event, identified by tid
+  struct C_EventSafe : public Context {
+    Journal *journal;
+    uint64_t tid;
+
+    C_EventSafe(Journal *_journal, uint64_t _tid)
+      : journal(_journal), tid(_tid) {
+    }
+
+    virtual void finish(int r) {
+      journal->handle_event_safe(r, tid);
+    }
+  };
+
+  // forwards journal replay callbacks to the journal
+  struct ReplayHandler : public ::journal::ReplayHandler {
+    Journal *journal;
+    ReplayHandler(Journal *_journal) : journal(_journal) {
+    }
+
+    virtual void get() {
+      // TODO
+    }
+    virtual void put() {
+      // TODO
+    }
+
+    virtual void handle_entries_available() {
+      journal->handle_replay_ready();
+    }
+    virtual void handle_complete(int r) {
+      journal->handle_replay_complete(r);
+    }
+  };
+
+  ImageCtx &m_image_ctx;
+
+  // allocated by create_journaler(), deleted by destroy_journaler()
+  ::journal::Journaler *m_journaler;
+
+  // m_cond is signaled on every state transition
+  mutable Mutex m_lock;
+  Cond m_cond;
+  State m_state;
+
+  LockListener m_lock_listener;
+
+  ReplayHandler m_replay_handler;
+  // set to request that an in-progress initialization/replay be interrupted
+  bool m_close_pending;
+
+  // next tid handed out by append_event(); m_events is keyed by tid
+  uint64_t m_next_tid;
+  Events m_events;
+
+  // true while this journal holds a write block on the AIO work queue
+  bool m_blocking_writes;
+
+  void create_journaler();
+  void destroy_journaler();
+
+  void handle_initialized(int r);
+
+  void handle_replay_ready();
+  void handle_replay_complete(int r);
+
+  void handle_event_safe(int r, uint64_t tid);
+
+  bool handle_requested_lock();
+  void handle_releasing_lock();
+  void handle_lock_updated(bool lock_owner);
+
+  int stop_recording();
+
+  void block_writes();
+  void unblock_writes();
+
+  void transition_state(State state);
+  void wait_for_state_transition();
+};
+
+} // namespace librbd
+
+#endif // CEPH_LIBRBD_JOURNAL_H
index 84fed189ffc1b259bc92890ee256f35f3a62fb6b..863531f786388586c745cb72c020537165aa7cc0 100644 (file)
@@ -23,6 +23,7 @@ librbd_internal_la_SOURCES = \
        librbd/ImageCtx.cc \
        librbd/ImageWatcher.cc \
        librbd/internal.cc \
+       librbd/Journal.cc \
        librbd/LibrbdAdminSocketHook.cc \
        librbd/LibrbdWriteback.cc \
        librbd/ObjectMap.cc \
@@ -36,11 +37,12 @@ noinst_LTLIBRARIES += librbd_api.la
 librbd_la_SOURCES = \
        librbd/librbd.cc
 librbd_la_LIBADD = \
-       librbd_internal.la $(LIBRBD_TYPES) \
+       librbd_internal.la $(LIBRBD_TYPES) libjournal.la \
        $(LIBRADOS) $(LIBCOMMON) $(LIBOSDC) \
        librados_internal.la \
        libcls_rbd_client.la \
        libcls_lock_client.la \
+       libcls_journal_client.la \
        $(PTHREAD_LIBS) $(EXTRALIBS)
 
 librbd_la_LDFLAGS = ${AM_LDFLAGS} -version-info 1:0:0
@@ -66,6 +68,7 @@ noinst_HEADERS += \
        librbd/ImageCtx.h \
        librbd/ImageWatcher.h \
        librbd/internal.h \
+       librbd/Journal.h \
        librbd/JournalTypes.h \
        librbd/LibrbdAdminSocketHook.h \
        librbd/LibrbdWriteback.h \
index d94780711d5308f4503e0b83aad55c14f9982e2d..e1f2e17796ffac22bb8b9f2ff55695762f7e0107 100644 (file)
@@ -21,6 +21,10 @@ ObjectMap::ObjectMap(ImageCtx &image_ctx)
 {
 }
 
+int ObjectMap::remove(librados::IoCtx &io_ctx, const std::string &image_id) {  // remove the CEPH_NOSNAP object map object of image_id
+  return io_ctx.remove(object_map_name(image_id, CEPH_NOSNAP));  // result of io_ctx.remove() is returned as-is
+}
+
 std::string ObjectMap::object_map_name(const std::string &image_id,
                                       uint64_t snap_id) {
   std::string oid(RBD_OBJECT_MAP_PREFIX + image_id);
index 797307f2820dd7bcc760e07d7ca6b484887f1aa8..1737e1246684daed5826afcf7d870195e7d0c6d8 100644 (file)
@@ -21,6 +21,7 @@ public:
 
   ObjectMap(ImageCtx &image_ctx);
 
+  static int remove(librados::IoCtx &io_ctx, const std::string &image_id);
   static std::string object_map_name(const std::string &image_id,
                                     uint64_t snap_id);
 
index 763cee170dad00b6129506ef9ccea0dc7968fd18..3a88b6ab78be97d61330ec926a6405e50e9fda04 100644 (file)
@@ -29,6 +29,7 @@
 #include "librbd/ImageCtx.h"
 #include "librbd/ImageWatcher.h"
 #include "librbd/internal.h"
+#include "librbd/Journal.h"
 #include "librbd/ObjectMap.h"
 #include "librbd/parent_types.h"
 #include "librbd/RebuildObjectMapRequest.h"
@@ -1301,13 +1302,37 @@ reprotect_and_return_err:
                                     OBJECT_NONEXISTENT);
       r = io_ctx.operate(ObjectMap::object_map_name(id, CEPH_NOSNAP), &op);
       if (r < 0) {
+        lderr(cct) << "error creating initial object map: "
+                   << cpp_strerror(r) << dendl;
         goto err_remove_header;
       }
     }
 
+    if ((features & RBD_FEATURE_JOURNALING) != 0) {
+      if ((features & RBD_FEATURE_EXCLUSIVE_LOCK) == 0) {
+        lderr(cct) << "cannot use journaling without exclusive lock" << dendl;
+        goto err_remove_object_map;
+      }
+
+      r = Journal::create(io_ctx, id);
+      if (r < 0) {
+        lderr(cct) << "error creating journal: " << cpp_strerror(r) << dendl;
+        goto err_remove_object_map;
+      }
+    }
+
     ldout(cct, 2) << "done." << dendl;
     return 0;
 
+  err_remove_object_map:
+    if ((features & RBD_FEATURE_OBJECT_MAP) != 0) {
+      remove_r = ObjectMap::remove(io_ctx, id);
+      if (remove_r < 0) {
+        lderr(cct) << "error cleaning up object map after creation failed: "
+                   << cpp_strerror(remove_r) << dendl;
+      }
+    }
+
   err_remove_header:
     remove_r = io_ctx.remove(header_oid);
     if (remove_r < 0) {
@@ -1816,6 +1841,13 @@ reprotect_and_return_err:
       return -EINVAL;
     }
 
+    RWLock::RLocker owner_locker(ictx->owner_lock);
+    RWLock::WLocker md_locker(ictx->md_lock);
+    r = _flush(ictx);
+    if (r < 0) {
+      return r;
+    }
+
     if ((features & RBD_FEATURES_MUTABLE) != features) {
       lderr(cct) << "cannot update immutable features" << dendl;
       return -EINVAL;
@@ -1824,13 +1856,17 @@ reprotect_and_return_err:
       return -EINVAL;
     }
 
-    RWLock::RLocker l(ictx->snap_lock);
-    uint64_t new_features = ictx->features | features;
-    if (!enabled) {
+    RWLock::WLocker snap_locker(ictx->snap_lock);
+    uint64_t new_features;
+    if (enabled) {
+      features &= ~ictx->features;
+      new_features = ictx->features | features;
+    } else {
+      features &= ictx->features;
       new_features = ictx->features & ~features;
     }
 
-    if (ictx->features == new_features) {
+    if (features == 0) {
       return 0;
     }
 
@@ -1861,6 +1897,13 @@ reprotect_and_return_err:
           return -EINVAL;
         }
         features_mask |= RBD_FEATURE_EXCLUSIVE_LOCK;
+
+        r = Journal::create(ictx->md_ctx, ictx->id);
+        if (r < 0) {
+          lderr(cct) << "error creating image journal: " << cpp_strerror(r)
+                     << dendl;
+          return r;
+        }
       }
 
       if (enable_flags != 0) {
@@ -1894,6 +1937,14 @@ reprotect_and_return_err:
       if ((features & RBD_FEATURE_FAST_DIFF) != 0) {
         disable_flags = RBD_FLAG_FAST_DIFF_INVALID;
       }
+      if ((features & RBD_FEATURE_JOURNALING) != 0) {
+        r = Journal::remove(ictx->md_ctx, ictx->id);
+        if (r < 0) {
+          lderr(cct) << "error removing image journal: " << cpp_strerror(r)
+                     << dendl;
+          return r;
+        }
+      }
     }
 
     ldout(cct, 10) << "update_features: features=" << new_features << ", mask="
@@ -1903,6 +1954,7 @@ reprotect_and_return_err:
     if (r < 0) {
       lderr(cct) << "failed to update features: " << cpp_strerror(r)
                  << dendl;
+      return r;
     }
     if (((ictx->features & RBD_FEATURE_OBJECT_MAP) == 0) &&
       ((features & RBD_FEATURE_OBJECT_MAP) != 0)) {
@@ -2186,9 +2238,16 @@ reprotect_and_return_err:
       }
     }
     if (!old_format) {
-      r = io_ctx.remove(ObjectMap::object_map_name(id, CEPH_NOSNAP));
+      r = Journal::remove(io_ctx, id);
+      if (r < 0 && r != -ENOENT) {
+        lderr(cct) << "error removing image journal" << dendl;
+        return r;
+      }
+
+      r = ObjectMap::remove(io_ctx, id);
       if (r < 0 && r != -ENOENT) {
        lderr(cct) << "error removing image object map" << dendl;
+        return r;
       }
 
       ldout(cct, 2) << "removing id object..." << dendl;
@@ -2705,6 +2764,17 @@ reprotect_and_return_err:
       ictx->object_map.refresh(ictx->snap_id);
 
       ictx->data_ctx.selfmanaged_snap_set_write_ctx(ictx->snapc.seq, ictx->snaps);
+
+      // dynamically enable/disable journaling support
+      if ((ictx->features & RBD_FEATURE_JOURNALING) != 0 &&
+          ictx->image_watcher != NULL && ictx->journal == NULL &&
+          ictx->snap_name.empty()) {
+        ictx->open_journal();
+      } else if ((ictx->features & RBD_FEATURE_JOURNALING) == 0 &&
+                 ictx->journal != NULL) {
+        // TODO journal needs to be disabled via proxied request to avoid race
+        //      between deleting journal and appending journal events
+      }
     } // release snap_lock and cache_lock
 
     if (ictx->image_watcher != NULL) {
@@ -2997,44 +3067,57 @@ reprotect_and_return_err:
     // snapshot and the user is trying to fix that
     ictx_check(ictx);
 
-    bool unlocking = false;
-    {
-      RWLock::WLocker l(ictx->owner_lock);
-      if (ictx->image_watcher != NULL && ictx->image_watcher->is_lock_owner() &&
-          snap_name != NULL && strlen(snap_name) != 0) {
-        // stop incoming requests since we will release the lock
-        ictx->image_watcher->prepare_unlock();
-        unlocking = true;
+    int r;
+    bool snapshot_mode = (snap_name != NULL && strlen(snap_name) != 0);
+    if (snapshot_mode) {
+      {
+        RWLock::WLocker owner_locker(ictx->owner_lock);
+        if (ictx->image_watcher != NULL &&
+            ictx->image_watcher->is_lock_owner()) {
+          r = ictx->image_watcher->release_lock();
+          if (r < 0) {
+            return r;
+          }
+        }
       }
-    }
 
-    ictx->cancel_async_requests();
-    ictx->flush_async_operations();
-    if (ictx->object_cacher) {
-      // complete pending writes before we're set to a snapshot and
-      // get -EROFS for writes
-      RWLock::RLocker owner_locker(ictx->owner_lock);
-      RWLock::WLocker md_locker(ictx->md_lock);
-      ictx->flush_cache();
+      ictx->cancel_async_requests();
+      ictx->flush_async_operations();
+
+      if (ictx->object_cacher) {
+        RWLock::RLocker owner_locker(ictx->owner_lock);
+        r = _flush(ictx);
+        if (r < 0) {
+          return r;
+        }
+      }
+
+      {
+        RWLock::WLocker snap_locker(ictx->snap_lock);
+        if (ictx->journal != NULL) {
+          r = ictx->close_journal(false);
+          if (r < 0) {
+            return r;
+          }
+        }
+      }
     }
-    int r = _snap_set(ictx, snap_name);
+
+    r = _snap_set(ictx, snap_name);
     if (r < 0) {
-      RWLock::WLocker l(ictx->owner_lock);
-      if (unlocking) {
-        ictx->image_watcher->cancel_unlock();
-      }
       return r;
     }
 
-    RWLock::WLocker l(ictx->owner_lock);
-    if (ictx->image_watcher != NULL) {
-      if (unlocking) {
-       r = ictx->image_watcher->unlock();
-       if (r < 0) {
-         lderr(ictx->cct) << "error unlocking image: " << cpp_strerror(r)
-                           << dendl;
-       }
+    {
+      RWLock::WLocker snap_locker(ictx->snap_lock);
+      if ((ictx->features & RBD_FEATURE_JOURNALING) != 0 &&
+          ictx->journal == NULL && !snapshot_mode) {
+        ictx->open_journal();
       }
+    }
+
+    RWLock::RLocker owner_locker(ictx->owner_lock);
+    if (ictx->image_watcher != NULL) {
       ictx->image_watcher->refresh();
     }
     return r;
@@ -3086,31 +3169,42 @@ reprotect_and_return_err:
   {
     ldout(ictx->cct, 20) << "close_image " << ictx << dendl;
 
+    // finish all incoming IO operations
+    ictx->aio_work_queue->drain();
+
+    int r = 0;
     {
-      RWLock::WLocker l(ictx->owner_lock);
+      // release the lock (and flush all in-flight IO)
+      RWLock::WLocker owner_locker(ictx->owner_lock);
       if (ictx->image_watcher != NULL && ictx->image_watcher->is_lock_owner()) {
-        // stop incoming requests
-        ictx->image_watcher->prepare_unlock();
+        r = ictx->image_watcher->release_lock();
+        if (r < 0) {
+          lderr(ictx->cct) << "error releasing image lock: " << cpp_strerror(r)
+                           << dendl;
+        }
       }
     }
 
-    assert(!ictx->aio_work_queue->writes_suspended() ||
+    assert(!ictx->aio_work_queue->writes_blocked() ||
            ictx->aio_work_queue->writes_empty());
-    ictx->aio_work_queue->drain();
+
     ictx->cancel_async_requests();
     ictx->flush_async_operations();
     ictx->readahead.wait_for_pending();
 
-    int r;
+    int flush_r;
     if (ictx->object_cacher) {
-      r = ictx->shutdown_cache(); // implicitly flushes
+      flush_r = ictx->shutdown_cache(); // implicitly flushes
     } else {
       RWLock::RLocker owner_locker(ictx->owner_lock);
-      r = _flush(ictx);
+      flush_r = _flush(ictx);
     }
-    if (r < 0) {
-      lderr(ictx->cct) << "error flushing IO: " << cpp_strerror(r)
+    if (flush_r < 0) {
+      lderr(ictx->cct) << "error flushing IO: " << cpp_strerror(flush_r)
                        << dendl;
+      if (r == 0) {
+        r = flush_r;
+      }
     }
 
     ictx->op_work_queue->drain();
@@ -3120,6 +3214,13 @@ reprotect_and_return_err:
       ictx->copyup_finisher->stop();
     }
 
+    if (ictx->journal != NULL) {
+      int close_r = ictx->close_journal(true);
+      if (close_r < 0 && r == 0) {
+        r = close_r;
+      }
+    }
+
     if (ictx->parent) {
       int close_r = close_image(ictx->parent);
       if (r == 0 && close_r < 0) {
@@ -3129,19 +3230,6 @@ reprotect_and_return_err:
     }
 
     if (ictx->image_watcher) {
-      {
-       RWLock::WLocker l(ictx->owner_lock);
-       if (ictx->image_watcher->is_lock_owner()) {
-         int unlock_r = ictx->image_watcher->unlock();
-         if (unlock_r < 0) {
-           lderr(ictx->cct) << "error unlocking image: "
-                             << cpp_strerror(unlock_r) << dendl;
-            if (r == 0) {
-              r = unlock_r;
-            }
-         }
-       }
-      }
       ictx->unregister_watch();
     }
 
index 9c7471117f531464be5eeef4f5a5f1fbcf611dac..cc96eaa8e430a3d491ed5640610a8dd58778efec 100644 (file)
@@ -355,6 +355,7 @@ unittest_librbd_CXXFLAGS = $(UNITTEST_CXXFLAGS) -DTEST_LIBRBD_INTERNALS
 unittest_librbd_LDADD = \
        librbd_test.la librbd_api.la librbd_internal.la $(LIBRBD_TYPES) \
        libcls_rbd_client.la libcls_lock_client.la \
+       libjournal.la libcls_journal_client.la \
        librados_test_stub.la librados_internal.la \
        $(LIBOSDC) $(UNITTEST_LDADD) \
        $(CEPH_GLOBAL) $(RADOS_TEST_LDADD)
@@ -367,6 +368,7 @@ ceph_test_librbd_CXXFLAGS = $(UNITTEST_CXXFLAGS) -DTEST_LIBRBD_INTERNALS
 ceph_test_librbd_LDADD = \
        librbd_test.la librbd_api.la librbd_internal.la $(LIBRBD_TYPES) \
        libcls_rbd_client.la libcls_lock_client.la \
+       libjournal.la libcls_journal_client.la \
        librados_api.la $(LIBRADOS_DEPS) $(UNITTEST_LDADD) \
        $(CEPH_GLOBAL) $(RADOS_TEST_LDADD)
 bin_DEBUGPROGRAMS += ceph_test_librbd