]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
ObjectCacher: optionally make writex always non-blocking
authorJosh Durgin <josh.durgin@inktank.com>
Wed, 13 Mar 2013 16:37:21 +0000 (09:37 -0700)
committerJosh Durgin <josh.durgin@inktank.com>
Thu, 28 Mar 2013 17:46:58 +0000 (10:46 -0700)
Add a callback argument to writex, and a finisher to run the
callbacks. Move the check for dirty+tx > max_dirty into a helper that
can be called from a wrapper around the callbacks from writex, or from
the current place in _wait_for_write().

Signed-off-by: Josh Durgin <josh.durgin@inktank.com>
src/client/Client.cc
src/librbd/ImageCtx.cc
src/osdc/ObjectCacher.cc
src/osdc/ObjectCacher.h
src/test/osdc/object_cacher_stress.cc

index d1f8564aa8f6e5864b19b38f9068e21ad561e85f..1f5302ca580bfb810605db460888bf50350b2519 100644 (file)
@@ -185,7 +185,8 @@ Client::Client(Messenger *m, MonClient *mc)
                                  cct->_conf->client_oc_max_objects,
                                  cct->_conf->client_oc_max_dirty,
                                  cct->_conf->client_oc_target_dirty,
-                                 cct->_conf->client_oc_max_dirty_age);
+                                 cct->_conf->client_oc_max_dirty_age,
+                                 true);
   filer = new Filer(objecter);
 }
 
index add9a98ac4310ff29cd31928a42eb5b67bd646da..0423190fd6fef5be2f297d83856a18ead4897d48 100644 (file)
@@ -89,7 +89,8 @@ namespace librbd {
                                       10,  /* reset this in init */
                                       init_max_dirty,
                                       cct->_conf->rbd_cache_target_dirty,
-                                      cct->_conf->rbd_cache_max_dirty_age);
+                                      cct->_conf->rbd_cache_max_dirty_age,
+                                      true);
       object_set = new ObjectCacher::ObjectSet(NULL, data_ctx.get_id(), 0);
       object_set->return_enoent = true;
       object_cacher->start();
@@ -483,7 +484,7 @@ namespace librbd {
     wr->extents.push_back(extent);
     {
       Mutex::Locker l(cache_lock);
-      object_cacher->writex(wr, object_set, cache_lock);
+      object_cacher->writex(wr, object_set, cache_lock, NULL);
     }
   }
 
index 84079438f0fcc4aa09ca3f0d1ff431bb80362eae..3c4bc3c66f6fcc69dcbd6d1fe08b20f426b5c30c 100644 (file)
@@ -492,22 +492,26 @@ ObjectCacher::ObjectCacher(CephContext *cct_, string name, WritebackHandler& wb,
                           flush_set_callback_t flush_callback,
                           void *flush_callback_arg,
                           uint64_t max_bytes, uint64_t max_objects,
-                          uint64_t max_dirty, uint64_t target_dirty, double max_dirty_age)
+                          uint64_t max_dirty, uint64_t target_dirty,
+                          double max_dirty_age, bool block_writes_upfront)
   : perfcounter(NULL),
     cct(cct_), writeback_handler(wb), name(name), lock(l),
     max_dirty(max_dirty), target_dirty(target_dirty),
     max_size(max_bytes), max_objects(max_objects),
+    block_writes_upfront(block_writes_upfront),
     flush_set_callback(flush_callback), flush_set_callback_arg(flush_callback_arg),
-    flusher_stop(false), flusher_thread(this),
+    flusher_stop(false), flusher_thread(this), finisher(cct),
     stat_clean(0), stat_zero(0), stat_dirty(0), stat_rx(0), stat_tx(0), stat_missing(0),
     stat_error(0), stat_dirty_waiting(0)
 {
   this->max_dirty_age.set_from_double(max_dirty_age);
   perf_start();
+  finisher.start();
 }
 
 ObjectCacher::~ObjectCacher()
 {
+  finisher.stop();
   perf_stop();
   // we should be empty.
   for (vector<hash_map<sobject_t, Object *> >::iterator i = objects.begin();
@@ -1206,7 +1210,8 @@ int ObjectCacher::_readx(OSDRead *rd, ObjectSet *oset, Context *onfinish,
 }
 
 
-int ObjectCacher::writex(OSDWrite *wr, ObjectSet *oset, Mutex& wait_on_lock)
+int ObjectCacher::writex(OSDWrite *wr, ObjectSet *oset, Mutex& wait_on_lock,
+                        Context *onfreespace)
 {
   assert(lock.is_locked());
   utime_t now = ceph_clock_now(cct);
@@ -1273,53 +1278,83 @@ int ObjectCacher::writex(OSDWrite *wr, ObjectSet *oset, Mutex& wait_on_lock)
     }
   }
 
-  int r = _wait_for_write(wr, bytes_written, oset, wait_on_lock);
-
+  int r = _wait_for_write(wr, bytes_written, oset, wait_on_lock, onfreespace);
   delete wr;
 
   //verify_stats();
   trim();
   return r;
 }
 
-// blocking wait for write.
-int ObjectCacher::_wait_for_write(OSDWrite *wr, uint64_t len, ObjectSet *oset, Mutex& lock)
+void ObjectCacher::C_WaitForWrite::finish(int r)
+{
+  Mutex::Locker l(m_oc->lock);
+  m_oc->maybe_wait_for_writeback(m_len);
+  m_onfinish->complete(r);
+}
+
+void ObjectCacher::maybe_wait_for_writeback(uint64_t len)
 {
   assert(lock.is_locked());
-  int blocked = 0;
   utime_t start = ceph_clock_now(cct);
+  int blocked = 0;
+  // wait for writeback?
+  //  - wait for dirty and tx bytes (relative to the max_dirty threshold)
+  //  - do not wait for bytes other waiters are waiting on.  this means that
+  //    threads do not wait for each other.  this effectively allows the cache
+  //    size to balloon proportional to the data that is in flight.
+  while (get_stat_dirty() + get_stat_tx() >= max_dirty + get_stat_dirty_waiting()) {
+    ldout(cct, 10) << __func__ << " waiting for dirty|tx "
+                  << (get_stat_dirty() + get_stat_tx()) << " >= max "
+                  << max_dirty << " + dirty_waiting "
+                  << get_stat_dirty_waiting() << dendl;
+    flusher_cond.Signal();
+    stat_dirty_waiting += len;
+    stat_cond.Wait(lock);
+    stat_dirty_waiting -= len;
+    ++blocked;
+    ldout(cct, 10) << __func__ << " woke up" << dendl;
+  }
+  if (blocked && perfcounter) {
+    perfcounter->inc(l_objectcacher_write_ops_blocked);
+    perfcounter->inc(l_objectcacher_write_bytes_blocked, len);
+    utime_t blocked = ceph_clock_now(cct) - start;
+    perfcounter->tinc(l_objectcacher_write_time_blocked, blocked);
+  }
+}
+
+// blocking wait for write.
+int ObjectCacher::_wait_for_write(OSDWrite *wr, uint64_t len, ObjectSet *oset, Mutex& lock, Context *onfreespace)
+{
+  assert(lock.is_locked());
   int ret = 0;
 
   if (max_dirty > 0) {
-    // wait for writeback?
-    //  - wait for dirty and tx bytes (relative to the max_dirty threshold)
-    //  - do not wait for bytes other waiters are waiting on.  this means that
-    //    threads do not wait for each other.  this effectively allows the cache size
-    //    to balloon proportional to the data that is in flight.
-    while (get_stat_dirty() + get_stat_tx() >= max_dirty + get_stat_dirty_waiting()) {
-      ldout(cct, 10) << "wait_for_write waiting on " << len << ", dirty|tx " 
-                    << (get_stat_dirty() + get_stat_tx()) 
-                    << " >= max " << max_dirty << " + dirty_waiting " << get_stat_dirty_waiting()
-                    << dendl;
-      flusher_cond.Signal();
-      stat_dirty_waiting += len;
-      stat_cond.Wait(lock);
-      stat_dirty_waiting -= len;
-      blocked++;
-      ldout(cct, 10) << "wait_for_write woke up" << dendl;
+    if (block_writes_upfront) {
+      maybe_wait_for_writeback(len);
+      if (onfreespace)
+       onfreespace->complete(0);
+    } else {
+      assert(onfreespace);
+      finisher.queue(new C_WaitForWrite(this, len, onfreespace));
     }
   } else {
     // write-thru!  flush what we just wrote.
     Cond cond;
     bool done;
-    C_Cond *fin = new C_Cond(&cond, &done, &ret);
+    Context *fin = block_writes_upfront ?
+      new C_Cond(&cond, &done, &ret) : onfreespace;
+    assert(fin);
     bool flushed = flush_set(oset, wr->extents, fin);
     assert(!flushed);   // we just dirtied it, and didn't drop our lock!
     ldout(cct, 10) << "wait_for_write waiting on write-thru of " << len << " bytes" << dendl;
-    while (!done)
-      cond.Wait(lock);
-    ldout(cct, 10) << "wait_for_write woke up, ret " << ret << dendl;
+    if (block_writes_upfront) {
+      while (!done)
+       cond.Wait(lock);
+      ldout(cct, 10) << "wait_for_write woke up, ret " << ret << dendl;
+      if (onfreespace)
+       onfreespace->complete(ret);
+    }
   }
 
   // start writeback anyway?
@@ -1328,12 +1363,6 @@ int ObjectCacher::_wait_for_write(OSDWrite *wr, uint64_t len, ObjectSet *oset, M
                   << target_dirty << ", nudging flusher" << dendl;
     flusher_cond.Signal();
   }
-  if (blocked && perfcounter) {
-    perfcounter->inc(l_objectcacher_write_ops_blocked);
-    perfcounter->inc(l_objectcacher_write_bytes_blocked, len);
-    utime_t blocked = ceph_clock_now(cct) - start;
-    perfcounter->tinc(l_objectcacher_write_time_blocked, blocked);
-  }
   return ret;
 }
 
index 401045238b941a1be962172ccb81199b96f0ea4f..681b02406fa3601f6f4d25b093f8568d7a95fa09 100644 (file)
@@ -9,6 +9,7 @@
 #include "include/xlist.h"
 
 #include "common/Cond.h"
+#include "common/Finisher.h"
 #include "common/Thread.h"
 
 #include "Objecter.h"
@@ -327,6 +328,7 @@ class ObjectCacher {
   
   int64_t max_dirty, target_dirty, max_size, max_objects;
   utime_t max_dirty_age;
+  bool block_writes_upfront;
 
   flush_set_callback_t flush_set_callback;
   void *flush_set_callback_arg;
@@ -349,7 +351,8 @@ class ObjectCacher {
       return 0;
     }
   } flusher_thread;
-  
+
+  Finisher finisher;
 
   // objects
   Object *get_object_maybe(sobject_t oid, object_locator_t &l) {
@@ -495,6 +498,17 @@ class ObjectCacher {
     }
   };
 
+  class C_WaitForWrite : public Context {
+  public:
+    C_WaitForWrite(ObjectCacher *oc, uint64_t len, Context *onfinish) :
+      m_oc(oc), m_len(len), m_onfinish(onfinish) {}
+    void finish(int r);
+  private:
+    ObjectCacher *m_oc;
+    uint64_t m_len;
+    Context *m_onfinish;
+  };
+
   void perf_start();
   void perf_stop();
 
@@ -504,7 +518,8 @@ class ObjectCacher {
               flush_set_callback_t flush_callback,
               void *flush_callback_arg,
               uint64_t max_bytes, uint64_t max_objects,
-              uint64_t max_dirty, uint64_t target_dirty, double max_age);
+              uint64_t max_dirty, uint64_t target_dirty, double max_age,
+              bool block_writes_upfront);
   ~ObjectCacher();
 
   void start() {
@@ -549,13 +564,16 @@ class ObjectCacher {
    * the return value is total bytes read
    */
   int readx(OSDRead *rd, ObjectSet *oset, Context *onfinish);
-  int writex(OSDWrite *wr, ObjectSet *oset, Mutex& wait_on_lock);
+  int writex(OSDWrite *wr, ObjectSet *oset, Mutex& wait_on_lock,
+            Context *onfreespace);
   bool is_cached(ObjectSet *oset, vector<ObjectExtent>& extents, snapid_t snapid);
 
 private:
   // write blocking
-  int _wait_for_write(OSDWrite *wr, uint64_t len, ObjectSet *oset, Mutex& lock);
-  
+  int _wait_for_write(OSDWrite *wr, uint64_t len, ObjectSet *oset, Mutex& lock,
+                     Context *onfreespace);
+  void maybe_wait_for_writeback(uint64_t len);
+
 public:
   bool set_is_cached(ObjectSet *oset);
   bool set_is_dirty_or_committing(ObjectSet *oset);
@@ -624,7 +642,7 @@ public:
                 Mutex& wait_on_lock) {
     OSDWrite *wr = prepare_write(snapc, bl, mtime, flags);
     Striper::file_to_extents(cct, oset->ino, layout, offset, len, wr->extents);
-    return writex(wr, oset, wait_on_lock);
+    return writex(wr, oset, wait_on_lock, NULL);
   }
 
   bool file_flush(ObjectSet *oset, ceph_file_layout *layout, const SnapContext& snapc,
index 230ebf244aea96992e12664f87179130ffd5f62f..e2c58e8da9ebd38e632db93e113d1a5ba1dc20e5 100644 (file)
@@ -61,7 +61,8 @@ int stress_test(uint64_t num_ops, uint64_t num_objs,
                   g_conf->client_oc_max_objects,
                   g_conf->client_oc_max_dirty,
                   g_conf->client_oc_target_dirty,
-                  g_conf->client_oc_max_dirty_age);
+                  g_conf->client_oc_max_dirty_age,
+                  true);
   obc.start();
 
   atomic_t outstanding_reads;
@@ -110,7 +111,7 @@ int stress_test(uint64_t num_ops, uint64_t num_objs,
       ObjectCacher::OSDWrite *wr = obc.prepare_write(snapc, bl, utime_t(), 0);
       wr->extents.push_back(op->extent);
       lock.Lock();
-      obc.writex(wr, &object_set, lock);
+      obc.writex(wr, &object_set, lock, NULL);
       lock.Unlock();
     }
   }