]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
librbd: simplify state machine lock assumptions
authorJason Dillaman <dillaman@redhat.com>
Tue, 11 Aug 2015 15:42:02 +0000 (11:42 -0400)
committerJason Dillaman <dillaman@redhat.com>
Tue, 17 Nov 2015 20:05:38 +0000 (15:05 -0500)
Use asynchronous callbacks to ensure that no locks are held when
re-entering a state machine.  This helps to avoid odd edge conditions
where locks may or may not be held depending on whether a AIO
operation was required.

Signed-off-by: Jason Dillaman <dillaman@redhat.com>
src/librbd/AsyncObjectThrottle.cc
src/librbd/AsyncObjectThrottle.h
src/librbd/FlattenRequest.cc
src/librbd/RebuildObjectMapRequest.cc
src/librbd/TrimRequest.cc

index 59b3a1f6021cd381ef2287aee81c4586ab1174d2..e4b07faf364d226390e0adc77e5a13c15ab20a9a 100644 (file)
@@ -39,16 +39,17 @@ void AsyncObjectThrottle<T>::start_ops(uint64_t max_concurrent) {
     complete = (m_current_ops == 0);
   }
   if (complete) {
-    m_ctx->complete(m_ret);
+    // avoid re-entrant callback
+    m_image_ctx.op_work_queue->queue(m_ctx, m_ret);
     delete this;
   }
 }
 
 template <typename T>
 void AsyncObjectThrottle<T>::finish_op(int r) {
-  assert(m_image_ctx.owner_lock.is_locked());
   bool complete;
   {
+    RWLock::RLocker owner_locker(m_image_ctx.owner_lock);
     Mutex::Locker locker(m_lock);
     --m_current_ops;
     if (r < 0 && r != -ENOENT && m_ret == 0) {
index a83105145a49e38e4cbf24a838bccfda48184425..08f0199b5efb916cba8e86a830d6e5e57447a87e 100644 (file)
@@ -36,7 +36,6 @@ protected:
   ImageCtxT &m_image_ctx;
 
   virtual void finish(int r) {
-    RWLock::RLocker locker(m_image_ctx.owner_lock);
     m_finisher.finish_op(r);
   }
 
index 40e09af5aa0eb9b44cad3675e763fb7e82e2759c..5b3e4a2e01fef8cb4dd74ba341c6f0cb759676db 100644 (file)
 
 namespace librbd {
 
-class AsyncFlattenObjectContext : public C_AsyncObjectThrottle<> {
+class C_FlattenObject : public C_AsyncObjectThrottle<> {
 public:
-  AsyncFlattenObjectContext(AsyncObjectThrottle<> &throttle,
-                            ImageCtx *image_ctx, uint64_t object_size,
-                            ::SnapContext snapc, uint64_t object_no)
+  C_FlattenObject(AsyncObjectThrottle<> &throttle, ImageCtx *image_ctx,
+                  uint64_t object_size, ::SnapContext snapc, uint64_t object_no)
     : C_AsyncObjectThrottle(throttle, *image_ctx), m_object_size(object_size),
       m_snapc(snapc), m_object_no(object_no)
   {
@@ -67,6 +66,7 @@ bool FlattenRequest::should_complete(int r) {
     return true;
   }
 
+  RWLock::RLocker owner_locker(m_image_ctx.owner_lock);
   switch (m_state) {
   case STATE_FLATTEN_OBJECTS:
     ldout(cct, 5) << "FLATTEN_OBJECTS" << dendl;
@@ -95,7 +95,7 @@ void FlattenRequest::send() {
 
   m_state = STATE_FLATTEN_OBJECTS;
   AsyncObjectThrottle<>::ContextFactory context_factory(
-    boost::lambda::bind(boost::lambda::new_ptr<AsyncFlattenObjectContext>(),
+    boost::lambda::bind(boost::lambda::new_ptr<C_FlattenObject>(),
       boost::lambda::_1, &m_image_ctx, m_object_size, m_snapc,
       boost::lambda::_2));
   AsyncObjectThrottle<> *throttle = new AsyncObjectThrottle<>(
@@ -143,10 +143,9 @@ bool FlattenRequest::send_update_header() {
 }
 
 bool FlattenRequest::send_update_children() {
+  assert(m_image_ctx.owner_lock.is_locked());
   CephContext *cct = m_image_ctx.cct;
 
-  RWLock::RLocker owner_locker(m_image_ctx.owner_lock);
-
   // should have been canceled prior to releasing lock
   assert(!m_image_ctx.image_watcher->is_lock_supported() ||
          m_image_ctx.image_watcher->is_lock_owner());
index 9b2039b0e3b030368a257c507d2f0a249f14bb89..5d5a8f00b729136846c079d0fc7fb8659499747e 100644 (file)
@@ -165,17 +165,16 @@ bool RebuildObjectMapRequest::should_complete(int r) {
   CephContext *cct = m_image_ctx.cct;
   ldout(cct, 5) << this << " should_complete: " << " r=" << r << dendl;
 
+  RWLock::RLocker owner_lock(m_image_ctx.owner_lock);
   switch (m_state) {
   case STATE_RESIZE_OBJECT_MAP:
     ldout(cct, 5) << "RESIZE_OBJECT_MAP" << dendl;
     if (r == -ESTALE && !m_attempted_trim) {
       // objects are still flagged as in-use -- delete them
       m_attempted_trim = true;
-      RWLock::RLocker owner_lock(m_image_ctx.owner_lock);
       send_trim_image();
       return false;
     } else if (r == 0) {
-      RWLock::RLocker owner_lock(m_image_ctx.owner_lock);
       send_verify_objects();
     }
     break;
@@ -183,7 +182,6 @@ bool RebuildObjectMapRequest::should_complete(int r) {
   case STATE_TRIM_IMAGE:
     ldout(cct, 5) << "TRIM_IMAGE" << dendl;
     if (r == 0) {
-      RWLock::RLocker owner_lock(m_image_ctx.owner_lock);
       send_resize_object_map();
     }
     break;
@@ -191,7 +189,6 @@ bool RebuildObjectMapRequest::should_complete(int r) {
   case STATE_VERIFY_OBJECTS:
     ldout(cct, 5) << "VERIFY_OBJECTS" << dendl;
     if (r == 0) {
-      assert(m_image_ctx.owner_lock.is_locked());
       send_save_object_map();
     }
     break;
@@ -199,7 +196,6 @@ bool RebuildObjectMapRequest::should_complete(int r) {
   case STATE_SAVE_OBJECT_MAP:
     ldout(cct, 5) << "SAVE_OBJECT_MAP" << dendl;
     if (r == 0) {
-      RWLock::RLocker owner_lock(m_image_ctx.owner_lock);
       send_update_header();
     }
     break;
index a690ce5feeb9b5f7df10619eb3e7769da98a0c48..afef4d9da337936368f57aec0bb7263bb5a287fb 100644 (file)
@@ -113,6 +113,7 @@ bool TrimRequest::should_complete(int r)
     return true;
   }
 
+  RWLock::RLocker owner_lock(m_image_ctx.owner_lock);
   switch (m_state) {
   case STATE_COPYUP_OBJECTS:
     ldout(cct, 5) << " COPYUP_OBJECTS" << dendl;
@@ -121,10 +122,7 @@ bool TrimRequest::should_complete(int r)
 
   case STATE_PRE_REMOVE:
     ldout(cct, 5) << " PRE_REMOVE" << dendl;
-    {
-      RWLock::RLocker owner_lock(m_image_ctx.owner_lock);
-      send_remove_objects();
-    }
+    send_remove_objects();
     break;
 
   case STATE_REMOVE_OBJECTS:
@@ -134,10 +132,7 @@ bool TrimRequest::should_complete(int r)
 
   case STATE_POST_REMOVE:
     ldout(cct, 5) << " POST_OBJECTS" << dendl;
-    {
-      RWLock::RLocker owner_lock(m_image_ctx.owner_lock);
-      send_clean_boundary();
-    }
+    send_clean_boundary();
     break;
 
   case STATE_CLEAN_BOUNDARY: