git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
librbd: simplify IO flush handling
author: Jason Dillaman <dillaman@redhat.com>
Fri, 31 Jul 2015 02:31:55 +0000 (22:31 -0400)
committer: Jason Dillaman <dillaman@redhat.com>
Wed, 3 Feb 2016 03:00:46 +0000 (22:00 -0500)
Add a new convenience method to ImageCtx for handling flush
requests, and clean up flush handling when dealing with the cache.

Signed-off-by: Jason Dillaman <dillaman@redhat.com>
(cherry picked from commit e8d8096babaf15fe8af717297d512f580aff6c18)

src/librbd/AsyncOperation.cc
src/librbd/ImageCtx.cc
src/librbd/ImageCtx.h
src/librbd/ImageWatcher.cc
src/librbd/internal.cc
src/librbd/internal.h

index dfb1e61a10095b095f2ff8b99bd76ad6d0125fd9..2402b48795246733f9c4c787d8f70fed89a60ce7 100644 (file)
@@ -3,6 +3,7 @@
 #include "librbd/AsyncOperation.h"
 #include "librbd/ImageCtx.h"
 #include "common/dout.h"
+#include "common/WorkQueue.h"
 #include "include/assert.h"
 
 #define dout_subsys ceph_subsys_rbd
index 1574c56896fbfd404e4761a556f2ef71f1a75f89..c8df238bf150dc6fd2f34b0c7a1a14605df16795 100644 (file)
@@ -48,6 +48,89 @@ public:
   }
 };
 
+struct C_FlushCache : public Context {
+  ImageCtx *image_ctx;
+  Context *on_safe;
+
+  C_FlushCache(ImageCtx *_image_ctx, Context *_on_safe)
+    : image_ctx(_image_ctx), on_safe(_on_safe) {
+  }
+  virtual void finish(int r) {
+    // successful cache flush indicates all IO is now safe
+    RWLock::RLocker owner_locker(image_ctx->owner_lock);
+    image_ctx->flush_cache(on_safe);
+  }
+};
+
+struct C_InvalidateCache : public Context {
+  ImageCtx *image_ctx;
+  bool purge_on_error;
+  bool reentrant_safe;
+  Context *on_finish;
+
+  C_InvalidateCache(ImageCtx *_image_ctx, bool _purge_on_error,
+                    bool _reentrant_safe, Context *_on_finish)
+    : image_ctx(_image_ctx), purge_on_error(_purge_on_error),
+      reentrant_safe(_reentrant_safe), on_finish(_on_finish) {
+  }
+  virtual void finish(int r) {
+    assert(image_ctx->cache_lock.is_locked());
+    CephContext *cct = image_ctx->cct;
+
+    if (r == -EBLACKLISTED) {
+      lderr(cct) << "Blacklisted during flush!  Purging cache..." << dendl;
+      image_ctx->object_cacher->purge_set(image_ctx->object_set);
+    } else if (r != 0 && purge_on_error) {
+      lderr(cct) << "invalidate cache encountered error "
+                 << cpp_strerror(r) << " !Purging cache..." << dendl;
+      image_ctx->object_cacher->purge_set(image_ctx->object_set);
+    } else if (r != 0) {
+      lderr(cct) << "flush_cache returned " << r << dendl;
+    }
+
+    loff_t unclean = image_ctx->object_cacher->release_set(
+      image_ctx->object_set);
+    if (unclean == 0) {
+      r = 0;
+    } else {
+      lderr(cct) << "could not release all objects from cache: "
+                 << unclean << " bytes remain" << dendl;
+      r = -EBUSY;
+    }
+
+    if (reentrant_safe) {
+      on_finish->complete(r);
+    } else {
+      image_ctx->op_work_queue->queue(on_finish, r);
+    }
+  }
+
+};
+
+struct C_AsyncCallback : public Context {
+  ImageCtx *image_ctx;
+  Context *on_finish;
+  C_AsyncCallback(ImageCtx *image_ctx, Context *on_finish)
+    : image_ctx(image_ctx), on_finish(on_finish) {
+  }
+  virtual void finish(int r) {
+    image_ctx->op_work_queue->queue(on_finish, r);
+  }
+};
+
+void _flush_async_operations(ImageCtx *ictx, Context *on_finish) {
+  {
+    Mutex::Locker async_ops_locker(ictx->async_ops_lock);
+    if (!ictx->async_ops.empty()) {
+      ldout(ictx->cct, 20) << "flush async operations: " << on_finish << " "
+                           << "count=" << ictx->async_ops.size() << dendl;
+      ictx->async_ops.front()->add_flush_context(on_finish);
+      return;
+    }
+  }
+  on_finish->complete(0);
+}
+
 } // anonymous namespace
 
   ImageCtx::ImageCtx(const string &image_name, const string &image_id,
@@ -655,53 +738,47 @@ public:
     }
   }
 
-  void ImageCtx::flush_cache_aio(Context *onfinish) {
+  int ImageCtx::flush_cache() {
+    C_SaferCond cond_ctx;
+    flush_cache(&cond_ctx);
+
+    ldout(cct, 20) << "waiting for cache to be flushed" << dendl;
+    int r = cond_ctx.wait();
+    ldout(cct, 20) << "finished flushing cache" << dendl;
+
+    return r;
+  }
+
+  void ImageCtx::flush_cache(Context *onfinish) {
     assert(owner_lock.is_locked());
     cache_lock.Lock();
     object_cacher->flush_set(object_set, onfinish);
     cache_lock.Unlock();
   }
 
-  int ImageCtx::flush_cache() {
-    int r = 0;
-    Mutex mylock("librbd::ImageCtx::flush_cache");
-    Cond cond;
-    bool done;
-    Context *onfinish = new C_SafeCond(&mylock, &cond, &done, &r);
-    flush_cache_aio(onfinish);
-    mylock.Lock();
-    while (!done) {
-      ldout(cct, 20) << "waiting for cache to be flushed" << dendl;
-      cond.Wait(mylock);
-    }
-    mylock.Unlock();
-    ldout(cct, 20) << "finished flushing cache" << dendl;
-    return r;
-  }
-
-  void ImageCtx::shutdown_cache() {
+  int ImageCtx::shutdown_cache() {
     flush_async_operations();
 
     RWLock::RLocker owner_locker(owner_lock);
-    invalidate_cache(true);
+    int r = invalidate_cache(true);
     object_cacher->stop();
+    return r;
   }
 
   int ImageCtx::invalidate_cache(bool purge_on_error) {
-    int result;
-    C_SaferCond ctx;
-    invalidate_cache(&ctx);
-    result = ctx.wait();
-
-    if (result && purge_on_error) {
-      cache_lock.Lock();
-      if (object_cacher != NULL) {
-       lderr(cct) << "invalidate cache met error " << cpp_strerror(result) << " !Purging cache..." << dendl;
-       object_cacher->purge_set(object_set);
-      }
-      cache_lock.Unlock();
+    flush_async_operations();
+    if (object_cacher == NULL) {
+      return 0;
     }
 
+    cache_lock.Lock();
+    object_cacher->release_set(object_set);
+    cache_lock.Unlock();
+
+    C_SaferCond ctx;
+    flush_cache(new C_InvalidateCache(this, purge_on_error, true, &ctx));
+
+    int result = ctx.wait();
     return result;
   }
 
@@ -715,29 +792,7 @@ public:
     object_cacher->release_set(object_set);
     cache_lock.Unlock();
 
-    flush_cache_aio(new FunctionContext(boost::bind(
-      &ImageCtx::invalidate_cache_completion, this, _1, on_finish)));
-  }
-
-  void ImageCtx::invalidate_cache_completion(int r, Context *on_finish) {
-    assert(cache_lock.is_locked());
-    if (r == -EBLACKLISTED) {
-      lderr(cct) << "Blacklisted during flush!  Purging cache..." << dendl;
-      object_cacher->purge_set(object_set);
-    } else if (r != 0) {
-      lderr(cct) << "flush_cache returned " << r << dendl;
-    }
-
-    loff_t unclean = object_cacher->release_set(object_set);
-    if (unclean == 0) {
-      r = 0;
-    } else {
-      lderr(cct) << "could not release all objects from cache: "
-                 << unclean << " bytes remain" << dendl;
-      r = -EBUSY;
-    }
-
-    op_work_queue->queue(on_finish, r);
+    flush_cache(new C_InvalidateCache(this, false, false, on_finish));
   }
 
   void ImageCtx::clear_nonexistence_cache() {
@@ -800,20 +855,35 @@ public:
 
   void ImageCtx::flush_async_operations() {
     C_SaferCond ctx;
-    flush_async_operations(&ctx);
+    _flush_async_operations(this, &ctx);
     ctx.wait();
   }
 
   void ImageCtx::flush_async_operations(Context *on_finish) {
-    Mutex::Locker l(async_ops_lock);
-    if (async_ops.empty()) {
-      on_finish->complete(0);
-      return;
+    // complete context in clean thread context
+    _flush_async_operations(this, new C_AsyncCallback(this, on_finish));
+  }
+
+  int ImageCtx::flush() {
+    assert(owner_lock.is_locked());
+
+    flush_async_operations();
+    if (object_cacher != NULL) {
+      int r = flush_cache();
+      if (r < 0) {
+        return r;
+      }
     }
+    return 0;
+  }
 
-    ldout(cct, 20) << "flush async operations: " << on_finish << " "
-                   << "count=" << async_ops.size() << dendl;
-    async_ops.front()->add_flush_context(on_finish);
+  void ImageCtx::flush(Context *on_safe) {
+    assert(owner_lock.is_locked());
+    if (object_cacher != NULL) {
+      // flush cache after completing all in-flight AIO ops
+      on_safe = new C_FlushCache(this, on_safe);
+    }
+    flush_async_operations(on_safe);
   }
 
   void ImageCtx::cancel_async_requests() {
index 238b0ab6bd85775dc972207df627d2d316c51f32..5fa0ee6c488fe86cd9a46b7c59ba76ba42b50949 100644 (file)
@@ -192,12 +192,11 @@ namespace librbd {
     void write_to_cache(object_t o, const bufferlist& bl, size_t len,
                        uint64_t off, Context *onfinish, int fadvise_flags);
     void user_flushed();
-    void flush_cache_aio(Context *onfinish);
     int flush_cache();
-    void shutdown_cache();
+    void flush_cache(Context *onfinish);
+    int shutdown_cache();
     int invalidate_cache(bool purge_on_error=false);
     void invalidate_cache(Context *on_finish);
-    void invalidate_cache_completion(int r, Context *on_finish);
     void clear_nonexistence_cache();
     int register_watch();
     void unregister_watch();
@@ -209,6 +208,9 @@ namespace librbd {
     void flush_async_operations();
     void flush_async_operations(Context *on_finish);
 
+    int flush();
+    void flush(Context *on_safe);
+
     void cancel_async_requests();
   };
 }
index 790a03665a7559255e3ca925aff0033b33d0b91e..4c76dda8e957646e249ccf458db4eeec24c34f84 100644 (file)
@@ -391,7 +391,7 @@ bool ImageWatcher::release_lock()
   {
     RWLock::RLocker owner_locker(m_image_ctx.owner_lock);
     RWLock::WLocker md_locker(m_image_ctx.md_lock);
-    librbd::_flush(&m_image_ctx);
+    m_image_ctx.flush();
   }
 
   m_image_ctx.owner_lock.get_write();
index b4b4d9e48780d8986c9716da8730bb78092f8c18..8c6e1b8bd3b3cf2166deb05b1ba043e7c67babf7 100644 (file)
@@ -612,7 +612,7 @@ int validate_pool(IoCtx &io_ctx, CephContext *cct) {
     }
 
     RWLock::WLocker md_locker(ictx->md_lock);
-    r = _flush(ictx);
+    r = ictx->flush();
     if (r < 0) {
       return r;
     }
@@ -2164,7 +2164,7 @@ reprotect_and_return_err:
     } // release snap_lock and cache_lock
 
     if (new_snap) {
-      _flush(ictx);
+      ictx->flush();
     }
 
     ictx->refresh_lock.Lock();
@@ -2224,7 +2224,6 @@ reprotect_and_return_err:
       // writes might create new snapshots. Rolling back will replace
       // the current version, so we have to invalidate that too.
       RWLock::WLocker md_locker(ictx->md_lock);
-      ictx->flush_async_operations();
       r = ictx->invalidate_cache();
       if (r < 0) {
        return r;
@@ -2446,7 +2445,7 @@ reprotect_and_return_err:
       // get -EROFS for writes
       RWLock::RLocker owner_locker(ictx->owner_lock);
       RWLock::WLocker md_locker(ictx->md_lock);
-      ictx->flush_cache();
+      ictx->flush();
     }
     int r = _snap_set(ictx, snap_name);
     if (r < 0) {
@@ -2893,7 +2892,7 @@ reprotect_and_return_err:
     // ensure previous writes are visible to listsnaps
     {
       RWLock::RLocker owner_locker(ictx->owner_lock);
-      _flush(ictx);
+      ictx->flush();
     }
 
     int r = ictx_check(ictx);
@@ -3275,19 +3274,9 @@ reprotect_and_return_err:
 
     C_AioWrite *flush_ctx = new C_AioWrite(cct, c);
     c->add_request();
-    ictx->flush_async_operations(flush_ctx);
+    ictx->flush(flush_ctx);
 
     c->init_time(ictx, AIO_TYPE_FLUSH);
-    C_AioWrite *req_comp = new C_AioWrite(cct, c);
-    c->add_request();
-    if (ictx->object_cacher) {
-      ictx->flush_cache_aio(req_comp);
-    } else {
-      librados::AioCompletion *rados_completion =
-       librados::Rados::aio_create_completion(req_comp, NULL, rados_ctx_cb);
-      ictx->data_ctx.aio_flush_async(rados_completion);
-      rados_completion->release();
-    }
     c->finish_adding_requests(cct);
     c->put();
     ictx->perfcounter->inc(l_librbd_aio_flush);
@@ -3306,31 +3295,12 @@ reprotect_and_return_err:
     ictx->user_flushed();
     {
       RWLock::RLocker owner_locker(ictx->owner_lock);
-      r = _flush(ictx);
+      r = ictx->flush();
     }
     ictx->perfcounter->inc(l_librbd_flush);
     return r;
   }
 
-  int _flush(ImageCtx *ictx)
-  {
-    assert(ictx->owner_lock.is_locked());
-    CephContext *cct = ictx->cct;
-    int r;
-    // flush any outstanding writes
-    if (ictx->object_cacher) {
-      r = ictx->flush_cache();
-    } else {
-      r = ictx->data_ctx.aio_flush();
-      ictx->flush_async_operations();
-    }
-
-    if (r)
-      lderr(cct) << "_flush " << ictx << " r = " << r << dendl;
-
-    return r;
-  }
-
   int invalidate_cache(ImageCtx *ictx)
   {
     CephContext *cct = ictx->cct;
@@ -3341,8 +3311,6 @@ reprotect_and_return_err:
       return r;
     }
 
-    ictx->flush_async_operations();
-
     RWLock::RLocker owner_locker(ictx->owner_lock);
     RWLock::WLocker md_locker(ictx->md_lock);
     r = ictx->invalidate_cache();
index a633c9d2002692211e2ec1e3aaf9145364dce9b4..b0b882b4863fa523720c672b7aa6a1603b16d5ce 100644 (file)
@@ -207,7 +207,6 @@ namespace librbd {
                char *buf, bufferlist *pbl, AioCompletion *c, int op_flags);
   void aio_flush(ImageCtx *ictx, AioCompletion *c);
   int flush(ImageCtx *ictx);
-  int _flush(ImageCtx *ictx);
   int invalidate_cache(ImageCtx *ictx);
 
   ssize_t handle_sparse_read(CephContext *cct,