]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
librbd: simplify IO flush handling
authorJason Dillaman <dillaman@redhat.com>
Fri, 31 Jul 2015 02:31:55 +0000 (22:31 -0400)
committerJason Dillaman <dillaman@redhat.com>
Tue, 17 Nov 2015 15:22:24 +0000 (10:22 -0500)
Add a new convenience method to ImageCtx for handling flush
requests and cleanup flush handling with dealing with the cache.

Signed-off-by: Jason Dillaman <dillaman@redhat.com>
src/librbd/AioImageRequest.cc
src/librbd/AsyncOperation.cc
src/librbd/ImageCtx.cc
src/librbd/ImageCtx.h
src/librbd/ImageWatcher.cc
src/librbd/internal.cc
src/librbd/internal.h

index f594f61f9cad0b649c297577d61d9eb32ae9c7f8..49632b6d68dbf53595c53b22274432d04d5f2091 100644 (file)
@@ -432,22 +432,10 @@ void AioImageFlush::send_request() {
     }
   }
 
-  // TODO race condition between registering op and submitting to cache
-  //      (might not be flushed -- backport needed)
-  C_AioRequest *flush_ctx = new C_AioRequest(cct, m_aio_comp);
-  m_image_ctx.flush_async_operations(flush_ctx);
-
-  m_aio_comp->start_op(&m_image_ctx, AIO_TYPE_FLUSH);
   C_AioRequest *req_comp = new C_AioRequest(cct, m_aio_comp);
-  if (m_image_ctx.object_cacher != NULL) {
-    m_image_ctx.flush_cache_aio(req_comp);
-  } else {
-    librados::AioCompletion *rados_completion =
-      librados::Rados::aio_create_completion(req_comp, NULL, rados_ctx_cb);
-    m_image_ctx.data_ctx.aio_flush_async(rados_completion);
-    rados_completion->release();
-  }
+  m_image_ctx.flush(req_comp);
 
+  m_aio_comp->start_op(&m_image_ctx, AIO_TYPE_FLUSH);
   m_aio_comp->finish_adding_requests(cct);
   m_aio_comp->put();
 
index 3114d549f77156c245149b1dfe9e4fb774c9f92c..7cfc2d74f7fa60dbed8766e5686e79b53fe36bae 100644 (file)
@@ -3,6 +3,7 @@
 #include "librbd/AsyncOperation.h"
 #include "librbd/ImageCtx.h"
 #include "common/dout.h"
+#include "common/WorkQueue.h"
 #include "include/assert.h"
 
 #define dout_subsys ceph_subsys_rbd
 
 namespace librbd {
 
+namespace {
+
+struct C_CompleteFlushes : public Context {
+  ImageCtx *image_ctx;
+  std::list<Context *> flush_contexts;
+
+  C_CompleteFlushes(ImageCtx *image_ctx, std::list<Context *> &&flush_contexts)
+    : image_ctx(image_ctx), flush_contexts(std::move(flush_contexts)) {
+  }
+  virtual void finish(int r) {
+    RWLock::RLocker owner_locker(image_ctx->owner_lock);
+    while (!flush_contexts.empty()) {
+      Context *flush_ctx = flush_contexts.front();
+      flush_contexts.pop_front();
+
+      ldout(image_ctx->cct, 20) << "completed flush: " << flush_ctx << dendl;
+      flush_ctx->complete(0);
+    }
+  }
+};
+
+} // anonymous namespace
+
 void AsyncOperation::start_op(ImageCtx &image_ctx) {
   assert(m_image_ctx == NULL);
   m_image_ctx = &image_ctx;
@@ -39,13 +63,9 @@ void AsyncOperation::finish_op() {
     }
   }
 
-  while (!m_flush_contexts.empty()) {
-    Context *flush_ctx = m_flush_contexts.front();
-    m_flush_contexts.pop_front();
-
-    ldout(m_image_ctx->cct, 20) << "completed flush: " << flush_ctx << dendl;
-    flush_ctx->complete(0);
-  }
+  C_CompleteFlushes *ctx = new C_CompleteFlushes(m_image_ctx,
+                                                 std::move(m_flush_contexts));
+  m_image_ctx->op_work_queue->queue(ctx);
 }
 
 void AsyncOperation::add_flush_context(Context *on_finish) {
index a7ce73a168b6ca33f1cda4076a705dcfd312667f..8fd13e4f742319e76cf9d8f35dcefa3c0fcf034e 100644 (file)
@@ -53,6 +53,65 @@ public:
   }
 };
 
+struct C_FlushCache : public Context {
+  ImageCtx *image_ctx;
+  Context *on_safe;
+
+  C_FlushCache(ImageCtx *_image_ctx, Context *_on_safe)
+    : image_ctx(_image_ctx), on_safe(_on_safe) {
+  }
+  virtual void finish(int r) {
+    // successful cache flush indicates all IO is now safe
+    assert(image_ctx->owner_lock.is_locked());
+    image_ctx->flush_cache(on_safe);
+  }
+};
+
+struct C_InvalidateCache : public Context {
+  ImageCtx *image_ctx;
+  bool purge_on_error;
+  bool reentrant_safe;
+  Context *on_finish;
+
+  C_InvalidateCache(ImageCtx *_image_ctx, bool _purge_on_error,
+                    bool _reentrant_safe, Context *_on_finish)
+    : image_ctx(_image_ctx), purge_on_error(_purge_on_error),
+      reentrant_safe(_reentrant_safe), on_finish(_on_finish) {
+  }
+  virtual void finish(int r) {
+    assert(image_ctx->cache_lock.is_locked());
+    CephContext *cct = image_ctx->cct;
+
+    if (r == -EBLACKLISTED) {
+      lderr(cct) << "Blacklisted during flush!  Purging cache..." << dendl;
+      image_ctx->object_cacher->purge_set(image_ctx->object_set);
+    } else if (r != 0 && purge_on_error) {
+      lderr(cct) << "invalidate cache encountered error "
+                 << cpp_strerror(r) << " !Purging cache..." << dendl;
+      image_ctx->object_cacher->purge_set(image_ctx->object_set);
+    } else if (r != 0) {
+      lderr(cct) << "flush_cache returned " << r << dendl;
+    }
+
+    loff_t unclean = image_ctx->object_cacher->release_set(
+      image_ctx->object_set);
+    if (unclean == 0) {
+      r = 0;
+    } else {
+      lderr(cct) << "could not release all objects from cache: "
+                 << unclean << " bytes remain" << dendl;
+      r = -EBUSY;
+    }
+
+    if (reentrant_safe) {
+      on_finish->complete(r);
+    } else {
+      image_ctx->op_work_queue->queue(on_finish, r);
+    }
+  }
+
+};
+
 } // anonymous namespace
 
   const string ImageCtx::METADATA_CONF_PREFIX = "conf_";
@@ -663,30 +722,24 @@ public:
     }
   }
 
-  void ImageCtx::flush_cache_aio(Context *onfinish) {
+  int ImageCtx::flush_cache() {
+    C_SaferCond cond_ctx;
+    flush_cache(&cond_ctx);
+
+    ldout(cct, 20) << "waiting for cache to be flushed" << dendl;
+    int r = cond_ctx.wait();
+    ldout(cct, 20) << "finished flushing cache" << dendl;
+
+    return r;
+  }
+
+  void ImageCtx::flush_cache(Context *onfinish) {
     assert(owner_lock.is_locked());
     cache_lock.Lock();
     object_cacher->flush_set(object_set, onfinish);
     cache_lock.Unlock();
   }
 
-  int ImageCtx::flush_cache() {
-    int r = 0;
-    Mutex mylock("librbd::ImageCtx::flush_cache");
-    Cond cond;
-    bool done;
-    Context *onfinish = new C_SafeCond(&mylock, &cond, &done, &r);
-    flush_cache_aio(onfinish);
-    mylock.Lock();
-    while (!done) {
-      ldout(cct, 20) << "waiting for cache to be flushed" << dendl;
-      cond.Wait(mylock);
-    }
-    mylock.Unlock();
-    ldout(cct, 20) << "finished flushing cache" << dendl;
-    return r;
-  }
-
   int ImageCtx::shutdown_cache() {
     flush_async_operations();
 
@@ -697,21 +750,19 @@ public:
   }
 
   int ImageCtx::invalidate_cache(bool purge_on_error) {
-    int result;
-    C_SaferCond ctx;
-    invalidate_cache(&ctx);
-    result = ctx.wait();
-    ldout(cct, 20) << "finished invalidating cache" << dendl;
-
-    if (result && purge_on_error) {
-      cache_lock.Lock();
-      if (object_cacher != NULL) {
-       lderr(cct) << "invalidate cache met error " << cpp_strerror(result) << " !Purging cache..." << dendl;
-       object_cacher->purge_set(object_set);
-      }
-      cache_lock.Unlock();
+    flush_async_operations();
+    if (object_cacher == NULL) {
+      return 0;
     }
 
+    cache_lock.Lock();
+    object_cacher->release_set(object_set);
+    cache_lock.Unlock();
+
+    C_SaferCond ctx;
+    flush_cache(new C_InvalidateCache(this, purge_on_error, true, &ctx));
+
+    int result = ctx.wait();
     return result;
   }
 
@@ -725,29 +776,7 @@ public:
     object_cacher->release_set(object_set);
     cache_lock.Unlock();
 
-    flush_cache_aio(new FunctionContext(boost::bind(
-      &ImageCtx::invalidate_cache_completion, this, _1, on_finish)));
-  }
-
-  void ImageCtx::invalidate_cache_completion(int r, Context *on_finish) {
-    assert(cache_lock.is_locked());
-    if (r == -EBLACKLISTED) {
-      lderr(cct) << "Blacklisted during flush!  Purging cache..." << dendl;
-      object_cacher->purge_set(object_set);
-    } else if (r != 0) {
-      lderr(cct) << "flush_cache returned " << r << dendl;
-    }
-
-    loff_t unclean = object_cacher->release_set(object_set);
-    if (unclean == 0) {
-      r = 0;
-    } else {
-      lderr(cct) << "could not release all objects from cache: "
-                 << unclean << " bytes remain" << dendl;
-      r = -EBUSY;
-    }
-
-    op_work_queue->queue(on_finish, r);
+    flush_cache(new C_InvalidateCache(this, false, false, on_finish));
   }
 
   void ImageCtx::clear_nonexistence_cache() {
@@ -800,15 +829,31 @@ public:
   }
 
   void ImageCtx::flush_async_operations(Context *on_finish) {
-    Mutex::Locker l(async_ops_lock);
-    if (async_ops.empty()) {
-      on_finish->complete(0);
-      return;
+    {
+      Mutex::Locker l(async_ops_lock);
+      if (!async_ops.empty()) {
+        ldout(cct, 20) << "flush async operations: " << on_finish << " "
+                       << "count=" << async_ops.size() << dendl;
+        async_ops.front()->add_flush_context(on_finish);
+        return;
+      }
     }
+    on_finish->complete(0);
+  }
 
-    ldout(cct, 20) << "flush async operations: " << on_finish << " "
-                   << "count=" << async_ops.size() << dendl;
-    async_ops.front()->add_flush_context(on_finish);
+  int ImageCtx::flush() {
+    C_SaferCond cond_ctx;
+    flush(&cond_ctx);
+    return cond_ctx.wait();
+  }
+
+  void ImageCtx::flush(Context *on_safe) {
+    assert(owner_lock.is_locked());
+    if (object_cacher != NULL) {
+      // flush cache after completing all in-flight AIO ops
+      on_safe = new C_FlushCache(this, on_safe);
+    }
+    flush_async_operations(on_safe);
   }
 
   void ImageCtx::cancel_async_requests() {
index df3784a770cd1e60c82a297a2f122ba86af5c281..f61929c1a6dff36c7f80ecd702743890a2f6bb37 100644 (file)
@@ -229,12 +229,11 @@ namespace librbd {
                        uint64_t off, Context *onfinish, int fadvise_flags,
                         uint64_t journal_tid);
     void user_flushed();
-    void flush_cache_aio(Context *onfinish);
     int flush_cache();
+    void flush_cache(Context *onfinish);
     int shutdown_cache();
     int invalidate_cache(bool purge_on_error=false);
     void invalidate_cache(Context *on_finish);
-    void invalidate_cache_completion(int r, Context *on_finish);
     void clear_nonexistence_cache();
     int register_watch();
     void unregister_watch();
@@ -244,6 +243,9 @@ namespace librbd {
     void flush_async_operations();
     void flush_async_operations(Context *on_finish);
 
+    int flush();
+    void flush(Context *on_safe);
+
     void cancel_async_requests();
     void apply_metadata_confs();
 
index bedc5a50c6537e80af4c363d32bcd57a2a4b747b..014ce9407217ee5c7cb97df5efc69a02612e9e09 100644 (file)
@@ -403,7 +403,7 @@ int ImageWatcher::release_lock()
     notify_listeners_updated_lock(LOCK_UPDATE_STATE_RELEASING);
 
     RWLock::WLocker md_locker(m_image_ctx.md_lock);
-    r = librbd::_flush(&m_image_ctx);
+    r = m_image_ctx.flush();
     if (r < 0) {
       lderr(cct) << this << " failed to flush: " << cpp_strerror(r) << dendl;
       goto err_cancel_unlock;
index d4e1e6ed8169b5208d5b31cd965a68fb723e1822..25d99f43f31884da249904cb67b3673f9f86f5b3 100644 (file)
@@ -775,7 +775,7 @@ int invoke_async_request(ImageCtx *ictx, const std::string& request_type,
     }
 
     RWLock::WLocker md_locker(ictx->md_lock);
-    r = _flush(ictx);
+    r = ictx->flush();
     if (r < 0) {
       return r;
     }
@@ -1843,7 +1843,7 @@ reprotect_and_return_err:
 
     RWLock::RLocker owner_locker(ictx->owner_lock);
     RWLock::WLocker md_locker(ictx->md_lock);
-    r = _flush(ictx);
+    r = ictx->flush();
     if (r < 0) {
       return r;
     }
@@ -2779,7 +2779,7 @@ reprotect_and_return_err:
     }
 
     if (new_snap) {
-      _flush(ictx);
+      ictx->flush();
     }
     return 0;
   }
@@ -2834,7 +2834,6 @@ reprotect_and_return_err:
       // writes might create new snapshots. Rolling back will replace
       // the current version, so we have to invalidate that too.
       RWLock::WLocker md_locker(ictx->md_lock);
-      ictx->flush_async_operations();
       r = ictx->invalidate_cache();
       if (r < 0) {
        return r;
@@ -3079,14 +3078,9 @@ reprotect_and_return_err:
       }
 
       ictx->cancel_async_requests();
-      ictx->flush_async_operations();
-
-      if (ictx->object_cacher) {
+      {
         RWLock::RLocker owner_locker(ictx->owner_lock);
-        r = _flush(ictx);
-        if (r < 0) {
-          return r;
-        }
+        r = ictx->flush();
       }
 
       {
@@ -3191,18 +3185,14 @@ reprotect_and_return_err:
     ictx->flush_async_operations();
     ictx->readahead.wait_for_pending();
 
-    int flush_r;
     if (ictx->object_cacher) {
-      flush_r = ictx->shutdown_cache(); // implicitly flushes
-    } else {
-      RWLock::RLocker owner_locker(ictx->owner_lock);
-      flush_r = _flush(ictx);
-    }
-    if (flush_r< 0) {
-      lderr(ictx->cct) << "error flushing IO: " << cpp_strerror(flush_r)
-                       << dendl;
-      if (r == 0) {
-        r = flush_r;
+      int flush_r = ictx->shutdown_cache(); // implicitly flushes
+      if (flush_r < 0) {
+        lderr(ictx->cct) << "error flushing IO: " << cpp_strerror(flush_r)
+                         << dendl;
+        if (r == 0) {
+          r = flush_r;
+        }
       }
     }
 
@@ -3628,7 +3618,7 @@ reprotect_and_return_err:
     // ensure previous writes are visible to listsnaps
     {
       RWLock::RLocker owner_locker(ictx->owner_lock);
-      _flush(ictx);
+      ictx->flush();
     }
 
     int r = ictx_check(ictx);
@@ -3699,31 +3689,12 @@ reprotect_and_return_err:
     ictx->user_flushed();
     {
       RWLock::RLocker owner_locker(ictx->owner_lock);
-      r = _flush(ictx);
+      r = ictx->flush();
     }
     ictx->perfcounter->inc(l_librbd_flush);
     return r;
   }
 
-  int _flush(ImageCtx *ictx)
-  {
-    assert(ictx->owner_lock.is_locked());
-    CephContext *cct = ictx->cct;
-    int r;
-    // flush any outstanding writes
-    if (ictx->object_cacher) {
-      r = ictx->flush_cache();
-    } else {
-      r = ictx->data_ctx.aio_flush();
-      ictx->flush_async_operations();
-    }
-
-    if (r)
-      lderr(cct) << "_flush " << ictx << " r = " << r << dendl;
-
-    return r;
-  }
-
   int invalidate_cache(ImageCtx *ictx)
   {
     CephContext *cct = ictx->cct;
@@ -3734,8 +3705,6 @@ reprotect_and_return_err:
       return r;
     }
 
-    ictx->flush_async_operations();
-
     RWLock::RLocker owner_locker(ictx->owner_lock);
     RWLock::WLocker md_locker(ictx->md_lock);
     r = ictx->invalidate_cache();
index e94cc2b0accb80b050c34e70df2c4ae9f252050b..3de90a66fd03ac3c36a89ff002c1c3e760fd3e70 100644 (file)
@@ -212,7 +212,6 @@ namespace librbd {
                                ProgressContext &prog_ctx);
 
   int flush(ImageCtx *ictx);
-  int _flush(ImageCtx *ictx);
   int invalidate_cache(ImageCtx *ictx);
   int metadata_list(ImageCtx *ictx, const string &last, uint64_t max, map<string, bufferlist> *pairs);
   int metadata_get(ImageCtx *ictx, const std::string &key, std::string *value);