]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
librbd: Create image exclusive lock watch/notify handler
authorJason Dillaman <dillaman@redhat.com>
Wed, 8 Oct 2014 12:20:47 +0000 (08:20 -0400)
committerJason Dillaman <dillaman@redhat.com>
Tue, 13 Jan 2015 00:01:07 +0000 (19:01 -0500)
The new watch/notify handler replaces the existing header
update watch/notify handler and adds support for managing
image exclusive lock leadership.

Signed-off-by: Jason Dillaman <dillaman@redhat.com>
src/librbd/ImageCtx.cc
src/librbd/ImageCtx.h
src/librbd/ImageWatcher.cc [new file with mode: 0644]
src/librbd/ImageWatcher.h [new file with mode: 0644]
src/librbd/Makefile.am
src/librbd/WatchCtx.cc [deleted file]
src/librbd/WatchCtx.h [deleted file]
src/librbd/internal.cc
src/librbd/internal.h

index d444492ae55700f25e49e755503e4d5cdf3c923e..da3e22ec4a70012683503fbcd8c0ee39014a9690 100644 (file)
@@ -8,9 +8,9 @@
 #include "common/perf_counters.h"
 
 #include "librbd/internal.h"
-#include "librbd/WatchCtx.h"
 
 #include "librbd/ImageCtx.h"
+#include "librbd/ImageWatcher.h"
 
 #define dout_subsys ceph_subsys_rbd
 #undef dout_prefix
@@ -37,9 +37,10 @@ namespace librbd {
       flush_encountered(false),
       exclusive_locked(false),
       name(image_name),
-      wctx(NULL),
+      image_watcher(NULL),
       refresh_seq(0),
       last_refresh(0),
+      owner_lock("librbd::ImageCtx::owner_lock"),
       md_lock("librbd::ImageCtx::md_lock"),
       cache_lock("librbd::ImageCtx::cache_lock"),
       snap_lock("librbd::ImageCtx::snap_lock"),
@@ -606,17 +607,16 @@ namespace librbd {
   }
 
   int ImageCtx::register_watch() {
-    assert(!wctx);
-    wctx = new WatchCtx(this);
-    return md_ctx.watch(header_oid, 0, &(wctx->cookie), wctx);
+    assert(image_watcher == NULL);
+    image_watcher = new ImageWatcher(*this);
+    return image_watcher->register_watch();
   }
 
   void ImageCtx::unregister_watch() {
-    assert(wctx);
-    wctx->invalidate();
-    md_ctx.unwatch(header_oid, wctx->cookie);
-    delete wctx;
-    wctx = NULL;
+    assert(image_watcher != NULL);
+    image_watcher->unregister_watch();
+    delete image_watcher;
+    image_watcher = NULL;
   }
 
   size_t ImageCtx::parent_io_len(uint64_t offset, size_t length,
index 75d176de0b5a00399c8952e3664d6fab51b52428..8412223154d40221b2219809b509a0533ea1b0d0 100644 (file)
@@ -31,7 +31,7 @@ class PerfCounters;
 
 namespace librbd {
 
-  class WatchCtx;
+  class ImageWatcher;
 
   struct ImageCtx {
     CephContext *cct;
@@ -56,15 +56,16 @@ namespace librbd {
     std::string name;
     std::string snap_name;
     IoCtx data_ctx, md_ctx;
-    WatchCtx *wctx;
+    ImageWatcher *image_watcher;
     int refresh_seq;    ///< sequence for refresh requests
     int last_refresh;   ///< last completed refresh
 
     /**
      * Lock ordering:
-     * md_lock, cache_lock, snap_lock, parent_lock, refresh_lock,
+     * owner_lock, md_lock, cache_lock, snap_lock, parent_lock, refresh_lock,
      * aio_lock
      */
+    RWLock owner_lock; // protects exclusive lock leadership updates
     RWLock md_lock; // protects access to the mutable image metadata that
                    // isn't guarded by other locks below
                    // (size, features, image locks, etc)
diff --git a/src/librbd/ImageWatcher.cc b/src/librbd/ImageWatcher.cc
new file mode 100644 (file)
index 0000000..91db833
--- /dev/null
@@ -0,0 +1,643 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+#include "librbd/ImageWatcher.h"
+#include "librbd/AioCompletion.h"
+#include "librbd/ImageCtx.h"
+#include "cls/lock/cls_lock_client.h"
+#include "cls/lock/cls_lock_types.h"
+#include "include/encoding.h"
+#include "include/stringify.h"
+#include "common/errno.h"
+#include <sstream>
+#include <boost/bind.hpp>
+#include <boost/function.hpp>
+
+#define dout_subsys ceph_subsys_rbd
+#undef dout_prefix
+#define dout_prefix *_dout << "librbd::ImageWatcher: "
+
+namespace librbd {
+
+static const std::string WATCHER_LOCK_TAG = "internal";
+static const std::string WATCHER_LOCK_COOKIE_PREFIX = "auto";
+
+static const uint64_t  NOTIFY_TIMEOUT = 5000;
+static const uint8_t   NOTIFY_VERSION = 1;
+
+enum {
+  NOTIFY_OP_ACQUIRED_LOCK = 0,
+  NOTIFY_OP_RELEASED_LOCK = 1,
+  NOTIFY_OP_REQUEST_LOCK  = 2,
+  NOTIFY_OP_HEADER_UPDATE = 3
+};
+
+class FunctionContext : public Context {
+public:
+  FunctionContext(const boost::function<void()> &callback)
+    : m_callback(callback)
+  {
+  }
+
+  virtual void finish(int r) {
+    m_callback();
+  }
+private:
+  boost::function<void()> m_callback;
+};
+
+
+ImageWatcher::ImageWatcher(ImageCtx &image_ctx)
+  : m_image_ctx(image_ctx), m_watch_ctx(*this), m_handle(0),
+    m_lock_owner_state(LOCK_OWNER_STATE_NOT_LOCKED),
+    m_finisher(new Finisher(image_ctx.cct)),
+    m_watch_lock("librbd::ImageWatcher::m_watch_lock"), m_watch_error(0),
+    m_aio_request_lock("librbd::ImageWatcher::m_aio_request_lock"),
+    m_retrying_aio_requests(false)
+{
+  m_finisher->start();
+}
+
+ImageWatcher::~ImageWatcher()
+{
+  m_finisher->stop();
+  delete m_finisher;
+}
+
+bool ImageWatcher::is_lock_supported() const {
+  assert(m_image_ctx.owner_lock.is_locked());
+  return ((m_image_ctx.features & RBD_FEATURE_EXCLUSIVE_LOCK) != 0 &&
+         !m_image_ctx.read_only && m_image_ctx.snap_id == CEPH_NOSNAP);
+}
+
+bool ImageWatcher::is_lock_owner() const {
+  // TODO issue #8903 will address lost notification handling
+  // in cases where the lock was broken
+  assert(m_image_ctx.owner_lock.is_locked());
+  return m_lock_owner_state == LOCK_OWNER_STATE_LOCKED;
+}
+
+int ImageWatcher::register_watch() {
+  ldout(m_image_ctx.cct, 20) << "registering image watcher" << dendl;
+
+  RWLock::WLocker l(m_watch_lock);
+  m_watch_error = m_image_ctx.md_ctx.watch2(m_image_ctx.header_oid, &m_handle,
+                                           &m_watch_ctx);
+  return m_watch_error;
+}
+
+int ImageWatcher::get_watch_error() {
+  RWLock::RLocker l(m_watch_lock);
+  return m_watch_error;
+}
+
+int ImageWatcher::unregister_watch() {
+  ldout(m_image_ctx.cct, 20)  << "unregistering image watcher" << dendl;
+
+  {
+    Mutex::Locker l(m_aio_request_lock);
+    assert(m_aio_requests.empty());
+  }
+
+  RWLock::WLocker l(m_watch_lock);
+  return m_image_ctx.md_ctx.unwatch2(m_handle);
+}
+
+void ImageWatcher::flush_aio_operations() {
+  Mutex::Locker l(m_aio_request_lock);
+  while (m_retrying_aio_requests || !m_aio_requests.empty()) {
+    ldout(m_image_ctx.cct, 20)  << "flushing aio operations: "
+                               << "retrying=" << m_retrying_aio_requests << ","
+                               << "count=" << m_aio_requests.size() << dendl;
+    m_aio_request_cond.Wait(m_aio_request_lock);
+  }
+}
+
+int ImageWatcher::try_lock() {
+  assert(m_image_ctx.owner_lock.is_wlocked());
+  assert(m_lock_owner_state == LOCK_OWNER_STATE_NOT_LOCKED);
+
+  while (true) {
+    int r = lock();
+    if (r != -EBUSY) {
+      return r;
+    }
+
+    // determine if the current lock holder is still alive
+    entity_name_t locker;
+    std::string locker_cookie;
+    std::string locker_address;
+    uint64_t locker_handle;
+    r = get_lock_owner_info(&locker, &locker_cookie, &locker_address,
+                           &locker_handle);
+    if (r < 0) {
+      return r;
+    }
+    if (locker_cookie.empty() || locker_address.empty()) {
+      // lock is now unlocked ... try again
+      continue;
+    }
+
+    std::list<obj_watch_t> watchers;
+    r = m_image_ctx.md_ctx.list_watchers(m_image_ctx.header_oid, &watchers);
+    if (r < 0) {
+      return r;
+    }
+
+    for (std::list<obj_watch_t>::iterator iter = watchers.begin();
+        iter != watchers.end(); ++iter) {
+      if ((strncmp(locker_address.c_str(),
+                   iter->addr, sizeof(iter->addr)) == 0) &&
+         (locker_handle == iter->cookie)) {
+       return 0;
+      }
+    }
+
+    ldout(m_image_ctx.cct, 1) << "breaking exclusive lock: " << locker << dendl;
+    r = rados::cls::lock::break_lock(&m_image_ctx.md_ctx,
+                                     m_image_ctx.header_oid, RBD_LOCK_NAME,
+                                     locker_cookie, locker);
+    if (r < 0 && r != -ENOENT) {
+      return r;
+    }
+  }
+  return 0;
+}
+
+int ImageWatcher::request_lock(
+    const boost::function<int(AioCompletion*)>& restart_op, AioCompletion* c) {
+  assert(m_image_ctx.owner_lock.is_locked());
+  assert(m_lock_owner_state == LOCK_OWNER_STATE_NOT_LOCKED);
+
+  {
+    Mutex::Locker l(m_aio_request_lock);
+    bool request_pending = !m_aio_requests.empty();
+    ldout(m_image_ctx.cct, 10) << "queuing aio request: " << c
+                              << dendl;
+    m_aio_requests.push_back(std::make_pair(restart_op, c));
+    if (request_pending) {
+      return 0;
+    }
+  }
+
+  // run notify request in finisher to avoid blocking aio path
+  FunctionContext *ctx = new FunctionContext(
+    boost::bind(&ImageWatcher::notify_request_lock, this));
+  m_finisher->queue(ctx);
+  ldout(m_image_ctx.cct, 5) << "requesting exclusive lock" << dendl;
+  return 0;
+}
+
+bool ImageWatcher::try_request_lock() {
+  int r = try_lock();
+  if (r < 0) {
+    ldout(m_image_ctx.cct, 5) << "failed to acquire exclusive lock:"
+                             << cpp_strerror(r) << dendl;
+    cancel_aio_requests(-EROFS);
+    return true;
+  }
+
+  if (is_lock_owner()) {
+    ldout(m_image_ctx.cct, 5) << "successfully acquired exclusive lock"
+                             << dendl;
+  } else {
+    ldout(m_image_ctx.cct, 5) << "unable to acquire exclusive lock, retrying"
+                             << dendl;
+  }
+  return is_lock_owner();
+}
+
+void ImageWatcher::finalize_request_lock() {
+  {
+    RWLock::WLocker l(m_image_ctx.owner_lock);
+    try_request_lock();
+  }
+  retry_aio_requests();
+}
+
+int ImageWatcher::get_lock_owner_info(entity_name_t *locker, std::string *cookie,
+                                     std::string *address, uint64_t *handle) {
+  std::map<rados::cls::lock::locker_id_t,
+          rados::cls::lock::locker_info_t> lockers;
+  ClsLockType lock_type;
+  std::string lock_tag;
+  int r = rados::cls::lock::get_lock_info(&m_image_ctx.md_ctx,
+                                         m_image_ctx.header_oid,
+                                         RBD_LOCK_NAME, &lockers, &lock_type,
+                                         &lock_tag);
+  if (r < 0) {
+    return r;
+  }
+
+  if (lockers.empty()) {
+    ldout(m_image_ctx.cct, 20) << "no lockers detected" << dendl;
+    return 0;
+  }
+
+  if (lock_tag != WATCHER_LOCK_TAG) {
+    ldout(m_image_ctx.cct, 10) << "locked by external mechanism: tag="
+                              << lock_tag << dendl;
+    return -EBUSY;
+  }
+
+  if (lock_type == LOCK_SHARED) {
+    ldout(m_image_ctx.cct, 10) << "shared lock type detected" << dendl;
+    return -EBUSY;
+  }
+
+  std::map<rados::cls::lock::locker_id_t,
+           rados::cls::lock::locker_info_t>::iterator iter = lockers.begin();
+  if (!decode_lock_cookie(iter->first.cookie, handle)) {
+    ldout(m_image_ctx.cct, 10) << "locked by external mechanism: cookie="
+                              << iter->first.cookie << dendl;
+    return -EBUSY;
+  }
+
+  *locker = iter->first.locker;
+  *cookie = iter->first.cookie;
+  *address = stringify(iter->second.addr);
+  ldout(m_image_ctx.cct, 10) << "retrieved exclusive locker: " << *locker
+                            << "@" << *address << dendl;
+  return 0;
+}
+
+int ImageWatcher::lock() {
+  int r = rados::cls::lock::lock(&m_image_ctx.md_ctx, m_image_ctx.header_oid,
+                                RBD_LOCK_NAME, LOCK_EXCLUSIVE,
+                                encode_lock_cookie(), WATCHER_LOCK_TAG, "",
+                                utime_t(), 0);
+  if (r < 0) {
+    return r;
+  }
+
+  ldout(m_image_ctx.cct, 20) << "acquired exclusive lock" << dendl;
+  m_lock_owner_state = LOCK_OWNER_STATE_LOCKED;
+
+  bufferlist bl;
+  ENCODE_START(NOTIFY_VERSION, NOTIFY_VERSION, bl);
+  ::encode(NOTIFY_OP_ACQUIRED_LOCK, bl);
+  ENCODE_FINISH(bl);
+
+  // send the notification when we aren't holding locks
+  FunctionContext *ctx = new FunctionContext(
+    boost::bind(&IoCtx::notify2, &m_image_ctx.md_ctx, m_image_ctx.header_oid,
+               bl, NOTIFY_TIMEOUT, reinterpret_cast<bufferlist *>(NULL)));
+  m_finisher->queue(ctx);
+  return 0;
+}
+
+int ImageWatcher::unlock()
+{
+  assert(m_image_ctx.owner_lock.is_wlocked());
+  if (m_lock_owner_state == LOCK_OWNER_STATE_NOT_LOCKED) {
+    return 0;
+  }
+
+  ldout(m_image_ctx.cct, 20) << "releasing exclusive lock" << dendl;
+  m_lock_owner_state = LOCK_OWNER_STATE_NOT_LOCKED;
+  int r = rados::cls::lock::unlock(&m_image_ctx.md_ctx, m_image_ctx.header_oid,
+                                  RBD_LOCK_NAME, encode_lock_cookie());
+  if (r < 0 && r != -ENOENT) {
+    lderr(m_image_ctx.cct) << "failed to release exclusive lock: "
+                          << cpp_strerror(r) << dendl;
+    return r;
+  }
+
+  notify_released_lock();
+  return 0;
+}
+
+void ImageWatcher::release_lock()
+{
+  RWLock::WLocker l(m_image_ctx.owner_lock);
+  {
+    RWLock::WLocker l2(m_image_ctx.md_lock);
+    m_image_ctx.flush_cache();
+  }
+  m_image_ctx.data_ctx.aio_flush();
+
+  unlock();
+}
+
+void ImageWatcher::notify_header_update(librados::IoCtx &io_ctx,
+                                       const std::string &oid)
+{
+  // supports legacy (empty buffer) clients
+  bufferlist bl;
+  ENCODE_START(NOTIFY_VERSION, NOTIFY_VERSION, bl);
+  ::encode(NOTIFY_OP_HEADER_UPDATE, bl);
+  ENCODE_FINISH(bl);
+
+  io_ctx.notify2(oid, bl, NOTIFY_TIMEOUT, NULL);
+}
+
+std::string ImageWatcher::encode_lock_cookie() const {
+  std::ostringstream ss;
+  ss << WATCHER_LOCK_COOKIE_PREFIX << " " << m_handle;
+  return ss.str();
+}
+
+bool ImageWatcher::decode_lock_cookie(const std::string &tag,
+                                     uint64_t *handle) {
+  std::string prefix;
+  std::istringstream ss(tag);
+  if (!(ss >> prefix >> *handle) || prefix != WATCHER_LOCK_COOKIE_PREFIX) {
+    return false;
+  }
+  return true;
+}
+
+void ImageWatcher::retry_aio_requests() {
+  std::vector<AioRequest> lock_request_restarts;
+  {
+    Mutex::Locker l(m_aio_request_lock);
+    assert(!m_retrying_aio_requests);
+    lock_request_restarts.swap(m_aio_requests);
+    m_retrying_aio_requests = true;
+  }
+
+  for (std::vector<AioRequest>::iterator iter = lock_request_restarts.begin();
+       iter != lock_request_restarts.end(); ++iter) {
+    ldout(m_image_ctx.cct, 10) << "retrying aio request: " << iter->second
+                              << dendl;
+    iter->first(iter->second);
+  }
+
+  Mutex::Locker l(m_aio_request_lock);
+  m_retrying_aio_requests = false;
+  m_aio_request_cond.Signal();
+}
+
+void ImageWatcher::cancel_aio_requests(int result) {
+  Mutex::Locker l(m_aio_request_lock);
+  for (std::vector<AioRequest>::iterator iter = m_aio_requests.begin();
+       iter != m_aio_requests.end(); ++iter) {
+    AioCompletion *c = iter->second;
+    c->get();
+    c->lock.Lock();
+    c->rval = result;
+    c->lock.Unlock();
+    c->finish_adding_requests(m_image_ctx.cct);
+    c->put();
+  }
+  m_aio_requests.clear();
+  m_aio_request_cond.Signal();
+}
+
+int ImageWatcher::decode_response_code(bufferlist &bl) {
+  int r;
+  bufferlist::iterator iter = bl.begin();
+  DECODE_START(NOTIFY_VERSION, iter);
+  ::decode(r, iter);
+  DECODE_FINISH(iter);
+  return r;
+}
+
+void ImageWatcher::notify_released_lock() {
+  bufferlist bl;
+  ENCODE_START(NOTIFY_VERSION, NOTIFY_VERSION, bl);
+  ::encode(NOTIFY_OP_RELEASED_LOCK, bl);
+  ENCODE_FINISH(bl);
+  m_image_ctx.md_ctx.notify2(m_image_ctx.header_oid, bl, NOTIFY_TIMEOUT, NULL);
+}
+
+void ImageWatcher::notify_request_lock() {
+  bool try_lock_complete;
+  {
+    // try to lock now that we know we are not in a rados callback
+    RWLock::WLocker l(m_image_ctx.owner_lock);
+    try_lock_complete = try_request_lock();
+  }
+  if (try_lock_complete) {
+    retry_aio_requests();
+    return;
+  }
+
+  bufferlist bl;
+  ENCODE_START(NOTIFY_VERSION, NOTIFY_VERSION, bl);
+  ::encode(NOTIFY_OP_REQUEST_LOCK, bl);
+  ENCODE_FINISH(bl);
+
+  bufferlist response;
+  int r = notify_lock_owner(bl, response);
+  if (r == -ETIMEDOUT) {
+    ldout(m_image_ctx.cct, 5) << "timed out requesting lock: retrying" << dendl;
+    retry_aio_requests();
+  } else if (r < 0) {
+    lderr(m_image_ctx.cct) << "error requesting lock: " << cpp_strerror(r)
+                          << dendl;
+    cancel_aio_requests(-EROFS);
+  }
+}
+
+int ImageWatcher::notify_lock_owner(bufferlist &bl, bufferlist& response) {
+  bufferlist response_bl;
+  int r = m_image_ctx.md_ctx.notify2(m_image_ctx.header_oid, bl, NOTIFY_TIMEOUT,
+                                    &response_bl);
+  if (r < 0 && r != -ETIMEDOUT) {
+    lderr(m_image_ctx.cct) << "lock owner notification failed: "
+                          << cpp_strerror(r) << dendl;
+    return r;
+  }
+
+  typedef std::map<std::pair<uint64_t, uint64_t>, bufferlist> responses_t;
+  responses_t responses;
+  if (response_bl.length() > 0) {
+    try {
+      bufferlist::iterator iter = response_bl.begin();
+      ::decode(responses, iter);
+    } catch (const buffer::error &err) {
+      lderr(m_image_ctx.cct) << "failed to decode response" << dendl;
+      return -EINVAL;
+    }
+  }
+
+  bool lock_owner_responded = false;
+  for (responses_t::iterator i = responses.begin(); i != responses.end(); ++i) {
+    if (i->second.length() > 0) {
+      if (lock_owner_responded) {
+       lderr(m_image_ctx.cct) << "duplicate lock owners detected" << dendl;
+       return -EIO;
+      }
+      lock_owner_responded = true;
+      response.claim(i->second);
+    }
+  }
+
+  if (!lock_owner_responded) {
+    lderr(m_image_ctx.cct) << "no lock owners detected" << dendl;
+    return -ETIMEDOUT;
+  }
+  return 0;
+}
+
+void ImageWatcher::handle_header_update() {
+  ldout(m_image_ctx.cct, 1) << "image header updated" << dendl;
+
+  Mutex::Locker lictx(m_image_ctx.refresh_lock);
+  ++m_image_ctx.refresh_seq;
+  m_image_ctx.perfcounter->inc(l_librbd_notify);
+}
+
+void ImageWatcher::handle_acquired_lock() {
+  ldout(m_image_ctx.cct, 1) << "image exclusively locked announcement" << dendl;
+}
+
+void ImageWatcher::handle_released_lock() {
+  ldout(m_image_ctx.cct, 20) << "exclusive lock released" << dendl;
+
+  Mutex::Locker l(m_aio_request_lock);
+  if (!m_aio_requests.empty()) {
+    ldout(m_image_ctx.cct, 20) << "queuing lock request" << dendl;
+    FunctionContext *ctx = new FunctionContext(
+      boost::bind(&ImageWatcher::finalize_request_lock, this));
+    m_finisher->queue(ctx);
+  }
+}
+
+void ImageWatcher::handle_request_lock(bufferlist *out) {
+  RWLock::WLocker l(m_image_ctx.owner_lock);
+  if (is_lock_owner()) {
+    m_lock_owner_state = LOCK_OWNER_STATE_RELEASING;
+
+    // need to send something back so the client can detect a missing leader
+    ENCODE_START(NOTIFY_VERSION, NOTIFY_VERSION, *out);
+    ::encode(0, *out);
+    ENCODE_FINISH(*out);
+
+    ldout(m_image_ctx.cct, 5) << "exclusive lock requested, releasing" << dendl;
+    FunctionContext *ctx = new FunctionContext(
+      boost::bind(&ImageWatcher::release_lock, this));
+    m_finisher->queue(ctx);
+  }
+}
+
+void ImageWatcher::handle_unknown_op(bufferlist *out) {
+  RWLock::RLocker l(m_image_ctx.owner_lock);
+  if (is_lock_owner()) {
+    ENCODE_START(NOTIFY_VERSION, NOTIFY_VERSION, *out);
+    ::encode(-EOPNOTSUPP, *out);
+    ENCODE_FINISH(*out);
+  }
+}
+
+void ImageWatcher::handle_notify(uint64_t notify_id, uint64_t handle,
+                                bufferlist &bl) {
+  if (bl.length() == 0) {
+    // legacy notification for header updates
+    bufferlist out;
+    acknowledge_notify(notify_id, handle, out);
+    handle_header_update();
+    return;
+  }
+
+  bufferlist::iterator iter = bl.begin();
+  try {
+    DECODE_START(NOTIFY_VERSION, iter);
+    int op;
+    ::decode(op, iter);
+
+    bufferlist out;
+    switch (op) {
+    // client ops
+    case NOTIFY_OP_ACQUIRED_LOCK:
+      acknowledge_notify(notify_id, handle, out);
+      handle_acquired_lock();
+      break;
+    case NOTIFY_OP_RELEASED_LOCK:
+      acknowledge_notify(notify_id, handle, out);
+      handle_released_lock();
+      break;
+    case NOTIFY_OP_HEADER_UPDATE:
+      acknowledge_notify(notify_id, handle, out);
+      handle_header_update();
+      break;
+
+    // lock owner-only ops
+    case NOTIFY_OP_REQUEST_LOCK:
+      handle_request_lock(&out);
+      acknowledge_notify(notify_id, handle, out);
+      break;
+
+    default:
+      handle_unknown_op(&out);
+      acknowledge_notify(notify_id, handle, out);
+      break;
+    }
+    DECODE_FINISH(iter);
+  } catch (const buffer::error &err) {
+    lderr(m_image_ctx.cct) << "error decoding image notification" << dendl;
+  }
+}
+
+void ImageWatcher::handle_error(uint64_t handle, int err) {
+  lderr(m_image_ctx.cct) << "image watch failed: " << handle << ", "
+                         << cpp_strerror(err) << dendl;
+  FunctionContext *ctx = new FunctionContext(
+    boost::bind(&ImageWatcher::reregister_watch, this));
+  m_finisher->queue(ctx);
+}
+
+void ImageWatcher::acknowledge_notify(uint64_t notify_id, uint64_t handle,
+                                     bufferlist &out) {
+  m_image_ctx.md_ctx.notify_ack(m_image_ctx.header_oid, notify_id, handle, out);
+}
+
+void ImageWatcher::reregister_watch() {
+  ldout(m_image_ctx.cct, 10) << "re-registering image watch" << dendl;
+
+  {
+    RWLock::WLocker l(m_image_ctx.owner_lock);
+    bool lock_owner = (m_lock_owner_state == LOCK_OWNER_STATE_LOCKED);
+    int r;
+    if (lock_owner) {
+      unlock();
+    }
+
+    {
+      RWLock::WLocker l(m_watch_lock);
+      m_image_ctx.md_ctx.unwatch2(m_handle);
+      m_watch_error = m_image_ctx.md_ctx.watch2(m_image_ctx.header_oid,
+                                                &m_handle, &m_watch_ctx);
+      if (m_watch_error < 0) {
+        lderr(m_image_ctx.cct) << "failed to re-register image watch: "
+                               << cpp_strerror(m_watch_error) << dendl;
+        cancel_aio_requests(m_watch_error);
+        return;
+      }
+    }
+
+    if (lock_owner) {
+      r = try_lock();
+      if (r == -EBUSY) {
+        ldout(m_image_ctx.cct, 5) << "lost image lock while re-registering "
+                                  << "image watch" << dendl;
+      } else if (r < 0) {
+        lderr(m_image_ctx.cct) << "failed to lock image while re-registering "
+                               << "image watch" << cpp_strerror(r) << dendl;
+      }
+    }
+  }
+
+  retry_aio_requests();
+}
+
+void ImageWatcher::WatchCtx::handle_notify(uint64_t notify_id,
+                                          uint64_t handle,
+                                           uint64_t notifier_id,
+                                          bufferlist& bl) {
+  image_watcher.handle_notify(notify_id, handle, bl);
+}
+
+void ImageWatcher::WatchCtx::handle_failed_notify(uint64_t notify_id,
+                                                  uint64_t handle,
+                                                  uint64_t notifier_id) {
+  lderr(image_watcher.m_image_ctx.cct) << "notify ack failed: " << notify_id
+                                       << ", " << handle << ", " << notifier_id
+                                       << dendl;
+}
+
+void ImageWatcher::WatchCtx::handle_error(uint64_t handle, int err) {
+  image_watcher.handle_error(handle, err);
+}
+
+}
diff --git a/src/librbd/ImageWatcher.h b/src/librbd/ImageWatcher.h
new file mode 100644 (file)
index 0000000..57bf56d
--- /dev/null
@@ -0,0 +1,123 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+#ifndef CEPH_LIBRBD_IMAGE_WATCHER_H
+#define CEPH_LIBRBD_IMAGE_WATCHER_H
+
+#include "common/Cond.h"
+#include "common/Mutex.h"
+#include "common/RWLock.h"
+#include "include/rados/librados.hpp"
+#include <string>
+#include <utility>
+#include <vector>
+#include <boost/function.hpp>
+#include "include/assert.h"
+
+class entity_name_t;
+class Context;
+class Finisher;
+
+namespace librbd {
+
+  class AioCompletion;
+  class ImageCtx;
+
+  class ImageWatcher {
+  public:
+
+    ImageWatcher(ImageCtx& image_ctx);
+    ~ImageWatcher();
+
+    bool is_lock_supported() const;
+    bool is_lock_owner() const;
+
+    int register_watch();
+    int unregister_watch();
+    int get_watch_error();
+
+    void flush_aio_operations();
+
+    int try_lock();
+    int request_lock(const boost::function<int(AioCompletion*)>& restart_op,
+                    AioCompletion* c);
+    int unlock();
+
+    static void notify_header_update(librados::IoCtx &io_ctx,
+                                    const std::string &oid);
+
+  private:
+
+    enum LockOwnerState {
+      LOCK_OWNER_STATE_NOT_LOCKED,
+      LOCK_OWNER_STATE_LOCKED,
+      LOCK_OWNER_STATE_RELEASING
+    };
+
+    typedef std::pair<boost::function<int(AioCompletion *)>,
+                     AioCompletion *> AioRequest;
+
+    struct WatchCtx : public librados::WatchCtx2 {
+      ImageWatcher &image_watcher;
+
+      WatchCtx(ImageWatcher &parent) : image_watcher(parent) {}
+
+      virtual void handle_notify(uint64_t notify_id,
+                                 uint64_t handle,
+                                uint64_t notifier_id,
+                                 bufferlist& bl);
+      virtual void handle_failed_notify(uint64_t notify_id,
+                                        uint64_t handle,
+                                        uint64_t notifier_id);
+      virtual void handle_error(uint64_t handle, int err);
+    };
+
+    ImageCtx &m_image_ctx;
+
+    WatchCtx m_watch_ctx;
+    uint64_t m_handle;
+
+    LockOwnerState m_lock_owner_state;
+
+    Finisher *m_finisher;
+
+    RWLock m_watch_lock;
+    int m_watch_error;
+
+    Mutex m_aio_request_lock;
+    Cond m_aio_request_cond;
+    std::vector<AioRequest> m_aio_requests;
+    bool m_retrying_aio_requests;
+
+    std::string encode_lock_cookie() const;
+    static bool decode_lock_cookie(const std::string &cookie, uint64_t *handle);
+
+    int get_lock_owner_info(entity_name_t *locker, std::string *cookie,
+                           std::string *address, uint64_t *handle);
+    int lock();
+    void release_lock();
+    bool try_request_lock();
+    void finalize_request_lock();
+
+    void retry_aio_requests();
+    void cancel_aio_requests(int result);
+    static int decode_response_code(bufferlist &bl);
+
+    void notify_released_lock();
+    void notify_request_lock();
+    int notify_lock_owner(bufferlist &bl, bufferlist &response);
+
+    void handle_header_update();
+    void handle_acquired_lock();
+    void handle_released_lock();
+    void handle_request_lock(bufferlist *out);
+    void handle_unknown_op(bufferlist *out);
+    void handle_notify(uint64_t notify_id, uint64_t handle, bufferlist &bl);
+    void handle_error(uint64_t cookie, int err);
+    void acknowledge_notify(uint64_t notify_id, uint64_t handle,
+                           bufferlist &out);
+    void reregister_watch();
+  };
+
+} // namespace librbd
+
+#endif // CEPH_LIBRBD_IMAGE_WATCHER_H
index 4274eb1c84f2c879bd26973499561c176db365f8..a1f9032262f7661f99c901667aacafb7b0070746 100644 (file)
@@ -3,9 +3,9 @@ librbd_la_SOURCES = \
        librbd/AioCompletion.cc \
        librbd/AioRequest.cc \
        librbd/ImageCtx.cc \
+       librbd/ImageWatcher.cc \
        librbd/internal.cc \
-       librbd/LibrbdWriteback.cc \
-       librbd/WatchCtx.cc
+       librbd/LibrbdWriteback.cc
 librbd_la_LIBADD = \
        $(LIBRADOS) $(LIBCOMMON) $(LIBOSDC) \
        librados_internal.la \
@@ -28,8 +28,8 @@ noinst_HEADERS += \
        librbd/AioCompletion.h \
        librbd/AioRequest.h \
        librbd/ImageCtx.h \
+       librbd/ImageWatcher.h \
        librbd/internal.h \
        librbd/LibrbdWriteback.h \
        librbd/parent_types.h \
-       librbd/SnapInfo.h \
-       librbd/WatchCtx.h
+       librbd/SnapInfo.h
diff --git a/src/librbd/WatchCtx.cc b/src/librbd/WatchCtx.cc
deleted file mode 100644 (file)
index b64e8ef..0000000
+++ /dev/null
@@ -1,36 +0,0 @@
-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
-// vim: ts=8 sw=2 smarttab
-
-#include "common/ceph_context.h"
-#include "common/dout.h"
-#include "common/perf_counters.h"
-
-#include "librbd/ImageCtx.h"
-#include "librbd/internal.h"
-
-#include "librbd/WatchCtx.h"
-
-#define dout_subsys ceph_subsys_rbd
-#undef dout_prefix
-#define dout_prefix *_dout << "librbd::WatchCtx: "
-
-namespace librbd {
-
-  void WatchCtx::invalidate()
-  {
-    Mutex::Locker l(lock);
-    valid = false;
-  }
-
-  void WatchCtx::notify(uint8_t opcode, uint64_t ver, bufferlist& bl)
-  {
-    Mutex::Locker l(lock);
-    ldout(ictx->cct, 1) <<  " got notification opcode=" << (int)opcode
-                       << " ver=" << ver << " cookie=" << cookie << dendl;
-    if (valid) {
-      Mutex::Locker lictx(ictx->refresh_lock);
-      ++ictx->refresh_seq;
-      ictx->perfcounter->inc(l_librbd_notify);
-    }
-  }
-}
diff --git a/src/librbd/WatchCtx.h b/src/librbd/WatchCtx.h
deleted file mode 100644 (file)
index 9872c84..0000000
+++ /dev/null
@@ -1,32 +0,0 @@
-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
-// vim: ts=8 sw=2 smarttab
-#ifndef CEPH_LIBRBD_WATCHCTX_H
-#define CEPH_LIBRBD_WATCHCTX_H
-
-#include "include/int_types.h"
-
-#include "common/Mutex.h"
-#include "include/buffer.h"
-#include "include/rados/librados.hpp"
-
-class ImageCtx;
-
-namespace librbd {
-
-  class WatchCtx : public librados::WatchCtx {
-    ImageCtx *ictx;
-    bool valid;
-    Mutex lock;
-  public:
-    uint64_t cookie;
-    WatchCtx(ImageCtx *ctx) : ictx(ctx),
-                             valid(true),
-                             lock("librbd::WatchCtx"),
-                             cookie(0) {}
-    virtual ~WatchCtx() {}
-    void invalidate();
-    virtual void notify(uint8_t opcode, uint64_t ver, ceph::bufferlist& bl);
-  };
-}
-
-#endif
index 0250e9cbbe762dc11f3c476657e01b952fc4705e..3eb17601466e928a1958df00e9d0ee53e2c35d01 100644 (file)
@@ -17,6 +17,7 @@
 #include "librbd/AioCompletion.h"
 #include "librbd/AioRequest.h"
 #include "librbd/ImageCtx.h"
+#include "librbd/ImageWatcher.h"
 
 #include "librbd/internal.h"
 #include "librbd/parent_types.h"
@@ -256,11 +257,8 @@ namespace librbd {
     return 0;
   }
 
-  int notify_change(IoCtx& io_ctx, const string& oid, uint64_t *pver,
-                   ImageCtx *ictx)
+  int notify_change(IoCtx& io_ctx, const string& oid, ImageCtx *ictx)
   {
-    uint64_t ver;
-
     if (ictx) {
       ictx->refresh_lock.Lock();
       ldout(ictx->cct, 20) << "notify_change refresh_seq = " << ictx->refresh_seq
@@ -269,12 +267,7 @@ namespace librbd {
       ictx->refresh_lock.Unlock();
     }
 
-    if (pver)
-      ver = *pver;
-    else
-      ver = io_ctx.get_last_version();
-    bufferlist bl;
-    io_ctx.notify(oid, ver, bl);
+    ImageWatcher::notify_header_update(io_ctx, oid);
     return 0;
   }
 
@@ -297,7 +290,7 @@ namespace librbd {
     bufferlist bl;
     int r = io_ctx.write(header_oid, header, header.length(), 0);
 
-    notify_change(io_ctx, header_oid, NULL, NULL);
+    notify_change(io_ctx, header_oid, NULL);
 
     return r;
   }
@@ -489,7 +482,7 @@ namespace librbd {
     if (r < 0)
       return r;
 
-    notify_change(ictx->md_ctx, ictx->header_oid, NULL, ictx);
+    notify_change(ictx->md_ctx, ictx->header_oid, ictx);
 
     ictx->perfcounter->inc(l_librbd_snap_create);
     return 0;
@@ -561,7 +554,7 @@ namespace librbd {
     if (r < 0)
       return r;
 
-    notify_change(ictx->md_ctx, ictx->header_oid, NULL, ictx);
+    notify_change(ictx->md_ctx, ictx->header_oid, ictx);
 
     ictx->perfcounter->inc(l_librbd_snap_remove);
     return 0;
@@ -606,7 +599,7 @@ namespace librbd {
                                          RBD_PROTECTION_STATUS_PROTECTED);
     if (r < 0)
       return r;
-    notify_change(ictx->md_ctx, ictx->header_oid, NULL, ictx);
+    notify_change(ictx->md_ctx, ictx->header_oid, ictx);
     return 0;
   }
 
@@ -652,7 +645,7 @@ namespace librbd {
                                          RBD_PROTECTION_STATUS_UNPROTECTING);
     if (r < 0)
       return r;
-    notify_change(ictx->md_ctx, ictx->header_oid, NULL, ictx);
+    notify_change(ictx->md_ctx, ictx->header_oid, ictx);
 
     parent_spec pspec(ictx->md_ctx.get_id(), ictx->id, snap_id);
     // search all pools for children depending on this snapshot
@@ -720,7 +713,7 @@ namespace librbd {
                       << dendl;
       goto reprotect_and_return_err;
     }
-    notify_change(ictx->md_ctx, ictx->header_oid, NULL, ictx);
+    notify_change(ictx->md_ctx, ictx->header_oid, ictx);
     return 0;
 
 reprotect_and_return_err:
@@ -731,7 +724,7 @@ reprotect_and_return_err:
     if (proterr < 0) {
       lderr(ictx->cct) << "snap_unprotect: can't reprotect image" << dendl;
     }
-    notify_change(ictx->md_ctx, ictx->header_oid, NULL, ictx);
+    notify_change(ictx->md_ctx, ictx->header_oid, ictx);
     return r;
   }
 
@@ -1219,7 +1212,7 @@ reprotect_and_return_err:
     }
 
     if (old_format) {
-      notify_change(io_ctx, old_header_name(srcname), NULL, NULL);
+      notify_change(io_ctx, old_header_name(srcname), NULL);
     }
 
     return 0;
@@ -1552,7 +1545,7 @@ reprotect_and_return_err:
       lderr(cct) << "error writing header: " << cpp_strerror(-r) << dendl;
       return r;
     } else {
-      notify_change(ictx->md_ctx, ictx->header_oid, NULL, ictx);
+      notify_change(ictx->md_ctx, ictx->header_oid, ictx);
     }
 
     return 0;
@@ -1943,7 +1936,7 @@ reprotect_and_return_err:
       return r;
     }
 
-    notify_change(ictx->md_ctx, ictx->header_oid, NULL, ictx);
+    notify_change(ictx->md_ctx, ictx->header_oid, ictx);
 
     ictx->perfcounter->inc(l_librbd_snap_rollback);
     return r;
@@ -2194,8 +2187,9 @@ reprotect_and_return_err:
       ictx->parent = NULL;
     }
 
-    if (ictx->wctx)
+    if (ictx->image_watcher) {
       ictx->unregister_watch();
+    }
 
     delete ictx;
   }
@@ -2309,7 +2303,7 @@ reprotect_and_return_err:
       }
     }
     ictx->snap_lock.put_read();
-    notify_change(ictx->md_ctx, ictx->header_oid, NULL, ictx);
+    notify_change(ictx->md_ctx, ictx->header_oid, ictx);
 
     ldout(cct, 20) << "finished flattening" << dendl;
 
@@ -2374,7 +2368,7 @@ reprotect_and_return_err:
                               cookie, tag, "", utime_t(), 0);
     if (r < 0)
       return r;
-    notify_change(ictx->md_ctx, ictx->header_oid, NULL, ictx);
+    notify_change(ictx->md_ctx, ictx->header_oid, ictx);
     return 0;
   }
 
@@ -2393,7 +2387,7 @@ reprotect_and_return_err:
                                 RBD_LOCK_NAME, cookie);
     if (r < 0)
       return r;
-    notify_change(ictx->md_ctx, ictx->header_oid, NULL, ictx);
+    notify_change(ictx->md_ctx, ictx->header_oid, ictx);
     return 0;
   }
 
@@ -2418,7 +2412,7 @@ reprotect_and_return_err:
                                     RBD_LOCK_NAME, cookie, lock_client);
     if (r < 0)
       return r;
-    notify_change(ictx->md_ctx, ictx->header_oid, NULL, ictx);
+    notify_change(ictx->md_ctx, ictx->header_oid, ictx);
     return 0;
   }
 
index d9aa1b5ce9af8307bd2c912f1f1e8d7a7d93c831..417efc0ab6a018725f87078807a914b5879b530a 100644 (file)
@@ -152,7 +152,7 @@ namespace librbd {
   int read_header_bl(librados::IoCtx& io_ctx, const std::string& md_oid,
                     ceph::bufferlist& header, uint64_t *ver);
   int notify_change(librados::IoCtx& io_ctx, const std::string& oid,
-                   uint64_t *pver, ImageCtx *ictx);
+                   ImageCtx *ictx);
   int read_header(librados::IoCtx& io_ctx, const std::string& md_oid,
                  struct rbd_obj_header_ondisk *header, uint64_t *ver);
   int write_header(librados::IoCtx& io_ctx, const std::string& md_oid,