]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
librbd: Protect object map updates with a lock assertion
authorJason Dillaman <dillaman@redhat.com>
Tue, 18 Nov 2014 11:24:33 +0000 (06:24 -0500)
committerJason Dillaman <dillaman@redhat.com>
Thu, 29 Jan 2015 02:12:52 +0000 (21:12 -0500)
Use the new assert_locked functionality to protect object
map updates to only the single client that holds the lock.
This is to protect against possible race conditions during
lock ownership transitions.

Signed-off-by: Jason Dillaman <dillaman@redhat.com>
src/librbd/ImageCtx.cc
src/librbd/ImageCtx.h
src/librbd/ImageWatcher.cc

index 9f1150828cc35f3e7d8855c383b17d20196a6b43..0eb2b2cd4ce2e67f3832687af96561a5898deb61 100644 (file)
@@ -7,6 +7,8 @@
 #include "common/errno.h"
 #include "common/perf_counters.h"
 
+#include "cls/lock/cls_lock_client.h"
+
 #include "librbd/internal.h"
 
 #include "librbd/ImageCtx.h"
@@ -685,6 +687,77 @@ namespace librbd {
     }
   }
 
+  int ImageCtx::lock_object_map()
+  {
+    if ((features & RBD_FEATURE_OBJECT_MAP) == 0) {
+      return 0;
+    }
+
+    int r;
+    bool broke_lock = false;
+    while (true) {
+      ldout(cct, 10) << "locking object map" << dendl;
+      r = rados::cls::lock::lock(&md_ctx, object_map_name(id), RBD_LOCK_NAME,
+                                 LOCK_EXCLUSIVE, "", "", "", utime_t(), 0);
+      if (r == 0) {
+        break;
+      } else if (broke_lock || r != -EBUSY) {
+        lderr(cct) << "failed to lock object map: " << cpp_strerror(r) << dendl;
+        return r;
+      }
+
+      typedef std::map<rados::cls::lock::locker_id_t,
+                       rados::cls::lock::locker_info_t> lockers_t;
+      lockers_t lockers;
+      ClsLockType lock_type;
+      std::string lock_tag;
+      int r = rados::cls::lock::get_lock_info(&md_ctx, object_map_name(id),
+                                              RBD_LOCK_NAME, &lockers,
+                                              &lock_type, &lock_tag);
+      if (r == -ENOENT) {
+        continue;
+      } else if (r < 0) {
+        lderr(cct) << "failed to list object map locks: " << cpp_strerror(r)
+                   << dendl;
+        return r;
+      }
+
+      ldout(cct, 10) << "breaking current object map lock" << dendl;
+      for (lockers_t::iterator it = lockers.begin();
+           it != lockers.end(); ++it) {
+        const rados::cls::lock::locker_id_t &locker = it->first;
+        r = rados::cls::lock::break_lock(&md_ctx, object_map_name(id),
+                                         RBD_LOCK_NAME, locker.cookie,
+                                         locker.locker);
+        if (r < 0 && r != -ENOENT) {
+          lderr(cct) << "failed to break object map lock: " << cpp_strerror(r)
+                     << dendl;
+          return r;
+        }
+      }
+
+
+
+      broke_lock = true;
+    }
+    return 0;
+  }
+
+  int ImageCtx::unlock_object_map()
+  {
+    if ((features & RBD_FEATURE_OBJECT_MAP) == 0) {
+      return 0;
+    }
+
+    int r = rados::cls::lock::unlock(&md_ctx, object_map_name(id),
+                                     RBD_LOCK_NAME, "");
+    if (r < 0 && r != -ENOENT) {
+      lderr(cct) << "failed to release object map lock: " << cpp_strerror(r)
+                 << dendl;
+    }
+    return r;
+  }
+
   bool ImageCtx::object_may_exist(uint64_t object_no) const
   {
     // Fall back to default logic if object map is disabled or invalid
@@ -740,9 +813,13 @@ namespace librbd {
     uint64_t num_objs = Striper::get_num_objects(layout, get_current_size());
     ldout(cct, 20) << "resizing object map: " << num_objs << dendl;
     librados::ObjectWriteOperation op;
+    rados::cls::lock::assert_locked(&op, RBD_LOCK_NAME, LOCK_EXCLUSIVE, "", "");
     cls_client::object_map_resize(&op, num_objs, default_object_state);
     int r = data_ctx.operate(object_map_name(id), &op);
-    if (r < 0) {
+    if (r == -EBUSY) {
+      lderr(cct) << "object map lock not owned by client" << dendl;
+      return r;
+    } else if (r < 0) {
       lderr(cct) << "error resizing object map: size=" << num_objs << ", "
                  << "state=" << default_object_state << ", "
                  << "error=" << cpp_strerror(r) << dendl;
@@ -800,10 +877,14 @@ namespace librbd {
                   << static_cast<uint32_t>(new_state) << dendl;
 
     librados::ObjectWriteOperation op;
+    rados::cls::lock::assert_locked(&op, RBD_LOCK_NAME, LOCK_EXCLUSIVE, "", "");
     cls_client::object_map_update(&op, start_object_no, end_object_no,
                                   new_state, current_state);
     int r = data_ctx.operate(object_map_name(id), &op);
-    if (r < 0) {
+    if (r == -EBUSY) {
+      lderr(cct) << "object map lock not owned by client" << dendl;
+      return r;
+    } else if (r < 0) {
       lderr(cct) << "object map update failed: " << cpp_strerror(r) << dendl;
       invalidate_object_map();
     } else {
@@ -814,7 +895,7 @@ namespace librbd {
         }
       }
     }
-    return r;
+    return 0;
   }
 
   void ImageCtx::invalidate_object_map()
index 6c0977ee964332cef0a907c0ed5b4bfcb666b860..5ece66d828386cbfea9ae7347b6cf6191b49053b 100644 (file)
@@ -183,6 +183,8 @@ namespace librbd {
     void wait_for_pending_copyup();
 
     /* object map */
+    int lock_object_map();
+    int unlock_object_map();
     bool object_may_exist(uint64_t object_no) const;
     int refresh_object_map();
     int resize_object_map(uint8_t default_object_state);
index 34d485914d1780c3cb86ce9cfb1aae0dfc398ce9..6d248d899ed70767c8dd4b1018a01a5c47a43beb 100644 (file)
@@ -320,11 +320,12 @@ int ImageWatcher::lock() {
   ldout(m_image_ctx.cct, 20) << "acquired exclusive lock" << dendl;
   m_lock_owner_state = LOCK_OWNER_STATE_LOCKED;
 
-  r = m_image_ctx.refresh_object_map();
-  if (r < 0) {
+  r = m_image_ctx.lock_object_map();
+  if (r < 0 && r != -ENOENT) {
     unlock();
     return r;
   }
+  m_image_ctx.refresh_object_map();
 
   bufferlist bl;
   ENCODE_START(NOTIFY_VERSION, NOTIFY_VERSION, bl);
@@ -356,6 +357,7 @@ int ImageWatcher::unlock()
     return r;
   }
 
+  m_image_ctx.unlock_object_map();
   notify_released_lock();
   return 0;
 }
@@ -1001,7 +1003,6 @@ void ImageWatcher::reregister_watch() {
         lderr(m_image_ctx.cct) << "failed to re-register image watch: "
                                << cpp_strerror(m_watch_error) << dendl;
        schedule_retry_aio_requests();
-        cancel_async_requests(m_watch_error);
         return;
       }
     }