From c21250406eced8e5c467f492a2148c57978634f4 Mon Sep 17 00:00:00 2001 From: Josh Durgin Date: Wed, 13 Mar 2013 09:37:21 -0700 Subject: [PATCH] ObjectCacher: optionally make writex always non-blocking Add a callback argument to writex, and a finisher to run the callbacks. Move the check for dirty+tx > max_dirty into a helper that can be called from a wrapper around the callbacks from writex, or from the current place in _wait_for_write(). Signed-off-by: Josh Durgin --- src/client/Client.cc | 3 +- src/librbd/ImageCtx.cc | 5 +- src/osdc/ObjectCacher.cc | 99 +++++++++++++++++---------- src/osdc/ObjectCacher.h | 30 ++++++-- src/test/osdc/object_cacher_stress.cc | 5 +- 5 files changed, 96 insertions(+), 46 deletions(-) diff --git a/src/client/Client.cc b/src/client/Client.cc index d1f8564aa8f6e..1f5302ca580bf 100644 --- a/src/client/Client.cc +++ b/src/client/Client.cc @@ -185,7 +185,8 @@ Client::Client(Messenger *m, MonClient *mc) cct->_conf->client_oc_max_objects, cct->_conf->client_oc_max_dirty, cct->_conf->client_oc_target_dirty, - cct->_conf->client_oc_max_dirty_age); + cct->_conf->client_oc_max_dirty_age, + true); filer = new Filer(objecter); } diff --git a/src/librbd/ImageCtx.cc b/src/librbd/ImageCtx.cc index add9a98ac4310..0423190fd6fef 100644 --- a/src/librbd/ImageCtx.cc +++ b/src/librbd/ImageCtx.cc @@ -89,7 +89,8 @@ namespace librbd { 10, /* reset this in init */ init_max_dirty, cct->_conf->rbd_cache_target_dirty, - cct->_conf->rbd_cache_max_dirty_age); + cct->_conf->rbd_cache_max_dirty_age, + true); object_set = new ObjectCacher::ObjectSet(NULL, data_ctx.get_id(), 0); object_set->return_enoent = true; object_cacher->start(); @@ -483,7 +484,7 @@ namespace librbd { wr->extents.push_back(extent); { Mutex::Locker l(cache_lock); - object_cacher->writex(wr, object_set, cache_lock); + object_cacher->writex(wr, object_set, cache_lock, NULL); } } diff --git a/src/osdc/ObjectCacher.cc b/src/osdc/ObjectCacher.cc index 84079438f0fcc..3c4bc3c66f6fc 100644 --- a/src/osdc/ObjectCacher.cc +++ b/src/osdc/ObjectCacher.cc @@ -492,22 +492,26 @@ ObjectCacher::ObjectCacher(CephContext *cct_, string name, WritebackHandler& wb, flush_set_callback_t flush_callback, void *flush_callback_arg, uint64_t max_bytes, uint64_t max_objects, - uint64_t max_dirty, uint64_t target_dirty, double max_dirty_age) + uint64_t max_dirty, uint64_t target_dirty, + double max_dirty_age, bool block_writes_upfront) : perfcounter(NULL), cct(cct_), writeback_handler(wb), name(name), lock(l), max_dirty(max_dirty), target_dirty(target_dirty), max_size(max_bytes), max_objects(max_objects), + block_writes_upfront(block_writes_upfront), flush_set_callback(flush_callback), flush_set_callback_arg(flush_callback_arg), - flusher_stop(false), flusher_thread(this), + flusher_stop(false), flusher_thread(this), finisher(cct), stat_clean(0), stat_zero(0), stat_dirty(0), stat_rx(0), stat_tx(0), stat_missing(0), stat_error(0), stat_dirty_waiting(0) { this->max_dirty_age.set_from_double(max_dirty_age); perf_start(); + finisher.start(); } ObjectCacher::~ObjectCacher() { + finisher.stop(); perf_stop(); // we should be empty. for (vector >::iterator i = objects.begin(); @@ -1206,7 +1210,8 @@ int ObjectCacher::_readx(OSDRead *rd, ObjectSet *oset, Context *onfinish, } -int ObjectCacher::writex(OSDWrite *wr, ObjectSet *oset, Mutex& wait_on_lock) +int ObjectCacher::writex(OSDWrite *wr, ObjectSet *oset, Mutex& wait_on_lock, + Context *onfreespace) { assert(lock.is_locked()); utime_t now = ceph_clock_now(cct); @@ -1273,53 +1278,83 @@ int ObjectCacher::writex(OSDWrite *wr, ObjectSet *oset, Mutex& wait_on_lock) } } - int r = _wait_for_write(wr, bytes_written, oset, wait_on_lock); - + int r = _wait_for_write(wr, bytes_written, oset, wait_on_lock, onfreespace); delete wr; //verify_stats(); trim(); return r; } - -// blocking wait for write. -int ObjectCacher::_wait_for_write(OSDWrite *wr, uint64_t len, ObjectSet *oset, Mutex& lock) +void ObjectCacher::C_WaitForWrite::finish(int r) +{ + Mutex::Locker l(m_oc->lock); + m_oc->maybe_wait_for_writeback(m_len); + m_onfinish->complete(r); +} + +void ObjectCacher::maybe_wait_for_writeback(uint64_t len) { assert(lock.is_locked()); - int blocked = 0; utime_t start = ceph_clock_now(cct); + int blocked = 0; + // wait for writeback? + // - wait for dirty and tx bytes (relative to the max_dirty threshold) + // - do not wait for bytes other waiters are waiting on. this means that + // threads do not wait for each other. this effectively allows the cache + // size to balloon proportional to the data that is in flight. + while (get_stat_dirty() + get_stat_tx() >= max_dirty + get_stat_dirty_waiting()) { + ldout(cct, 10) << __func__ << " waiting for dirty|tx " + << (get_stat_dirty() + get_stat_tx()) << " >= max " + << max_dirty << " + dirty_waiting " + << get_stat_dirty_waiting() << dendl; + flusher_cond.Signal(); + stat_dirty_waiting += len; + stat_cond.Wait(lock); + stat_dirty_waiting -= len; + ++blocked; + ldout(cct, 10) << __func__ << " woke up" << dendl; + } + if (blocked && perfcounter) { + perfcounter->inc(l_objectcacher_write_ops_blocked); + perfcounter->inc(l_objectcacher_write_bytes_blocked, len); + utime_t blocked = ceph_clock_now(cct) - start; + perfcounter->tinc(l_objectcacher_write_time_blocked, blocked); + } +} + +// blocking wait for write. +int ObjectCacher::_wait_for_write(OSDWrite *wr, uint64_t len, ObjectSet *oset, Mutex& lock, Context *onfreespace) +{ + assert(lock.is_locked()); int ret = 0; if (max_dirty > 0) { - // wait for writeback? - // - wait for dirty and tx bytes (relative to the max_dirty threshold) - // - do not wait for bytes other waiters are waiting on. this means that - // threads do not wait for each other. this effectively allows the cache size - // to balloon proportional to the data that is in flight. - while (get_stat_dirty() + get_stat_tx() >= max_dirty + get_stat_dirty_waiting()) { - ldout(cct, 10) << "wait_for_write waiting on " << len << ", dirty|tx " - << (get_stat_dirty() + get_stat_tx()) - << " >= max " << max_dirty << " + dirty_waiting " << get_stat_dirty_waiting() - << dendl; - flusher_cond.Signal(); - stat_dirty_waiting += len; - stat_cond.Wait(lock); - stat_dirty_waiting -= len; - blocked++; - ldout(cct, 10) << "wait_for_write woke up" << dendl; + if (block_writes_upfront) { + maybe_wait_for_writeback(len); + if (onfreespace) + onfreespace->complete(0); + } else { + assert(onfreespace); + finisher.queue(new C_WaitForWrite(this, len, onfreespace)); } } else { // write-thru! flush what we just wrote. Cond cond; bool done; - C_Cond *fin = new C_Cond(&cond, &done, &ret); + Context *fin = block_writes_upfront ? + new C_Cond(&cond, &done, &ret) : onfreespace; + assert(fin); bool flushed = flush_set(oset, wr->extents, fin); assert(!flushed); // we just dirtied it, and didn't drop our lock! ldout(cct, 10) << "wait_for_write waiting on write-thru of " << len << " bytes" << dendl; - while (!done) - cond.Wait(lock); - ldout(cct, 10) << "wait_for_write woke up, ret " << ret << dendl; + if (block_writes_upfront) { + while (!done) + cond.Wait(lock); + ldout(cct, 10) << "wait_for_write woke up, ret " << ret << dendl; + if (onfreespace) + onfreespace->complete(ret); + } } // start writeback anyway? @@ -1328,12 +1363,6 @@ int ObjectCacher::_wait_for_write(OSDWrite *wr, uint64_t len, ObjectSet *oset, M << target_dirty << ", nudging flusher" << dendl; flusher_cond.Signal(); } - if (blocked && perfcounter) { - perfcounter->inc(l_objectcacher_write_ops_blocked); - perfcounter->inc(l_objectcacher_write_bytes_blocked, len); - utime_t blocked = ceph_clock_now(cct) - start; - perfcounter->tinc(l_objectcacher_write_time_blocked, blocked); - } return ret; } diff --git a/src/osdc/ObjectCacher.h b/src/osdc/ObjectCacher.h index 401045238b941..681b02406fa36 100644 --- a/src/osdc/ObjectCacher.h +++ b/src/osdc/ObjectCacher.h @@ -9,6 +9,7 @@ #include "include/xlist.h" #include "common/Cond.h" +#include "common/Finisher.h" #include "common/Thread.h" #include "Objecter.h" @@ -327,6 +328,7 @@ class ObjectCacher { int64_t max_dirty, target_dirty, max_size, max_objects; utime_t max_dirty_age; + bool block_writes_upfront; flush_set_callback_t flush_set_callback; void *flush_set_callback_arg; @@ -349,7 +351,8 @@ class ObjectCacher { return 0; } } flusher_thread; - + + Finisher finisher; // objects Object *get_object_maybe(sobject_t oid, object_locator_t &l) { @@ -495,6 +498,17 @@ class ObjectCacher { } }; + class C_WaitForWrite : public Context { + public: + C_WaitForWrite(ObjectCacher *oc, uint64_t len, Context *onfinish) : + m_oc(oc), m_len(len), m_onfinish(onfinish) {} + void finish(int r); + private: + ObjectCacher *m_oc; + uint64_t m_len; + Context *m_onfinish; + }; + void perf_start(); void perf_stop(); @@ -504,7 +518,8 @@ class ObjectCacher { flush_set_callback_t flush_callback, void *flush_callback_arg, uint64_t max_bytes, uint64_t max_objects, - uint64_t max_dirty, uint64_t target_dirty, double max_age); + uint64_t max_dirty, uint64_t target_dirty, double max_age, + bool block_writes_upfront); ~ObjectCacher(); void start() { @@ -549,13 +564,16 @@ class ObjectCacher { * the return value is total bytes read */ int readx(OSDRead *rd, ObjectSet *oset, Context *onfinish); - int writex(OSDWrite *wr, ObjectSet *oset, Mutex& wait_on_lock); + int writex(OSDWrite *wr, ObjectSet *oset, Mutex& wait_on_lock, + Context *onfreespace); bool is_cached(ObjectSet *oset, vector& extents, snapid_t snapid); private: // write blocking - int _wait_for_write(OSDWrite *wr, uint64_t len, ObjectSet *oset, Mutex& lock); - + int _wait_for_write(OSDWrite *wr, uint64_t len, ObjectSet *oset, Mutex& lock, + Context *onfreespace); + void maybe_wait_for_writeback(uint64_t len); + public: bool set_is_cached(ObjectSet *oset); bool set_is_dirty_or_committing(ObjectSet *oset); @@ -624,7 +642,7 @@ public: Mutex& wait_on_lock) { OSDWrite *wr = prepare_write(snapc, bl, mtime, flags); Striper::file_to_extents(cct, oset->ino, layout, offset, len, wr->extents); - return writex(wr, oset, wait_on_lock); + return writex(wr, oset, wait_on_lock, NULL); } bool file_flush(ObjectSet *oset, ceph_file_layout *layout, const SnapContext& snapc, diff --git a/src/test/osdc/object_cacher_stress.cc b/src/test/osdc/object_cacher_stress.cc index 230ebf244aea9..e2c58e8da9ebd 100644 --- a/src/test/osdc/object_cacher_stress.cc +++ b/src/test/osdc/object_cacher_stress.cc @@ -61,7 +61,8 @@ int stress_test(uint64_t num_ops, uint64_t num_objs, g_conf->client_oc_max_objects, g_conf->client_oc_max_dirty, g_conf->client_oc_target_dirty, - g_conf->client_oc_max_dirty_age); + g_conf->client_oc_max_dirty_age, + true); obc.start(); atomic_t outstanding_reads; @@ -110,7 +111,7 @@ int stress_test(uint64_t num_ops, uint64_t num_objs, ObjectCacher::OSDWrite *wr = obc.prepare_write(snapc, bl, utime_t(), 0); wr->extents.push_back(op->extent); lock.Lock(); - obc.writex(wr, &object_set, lock); + obc.writex(wr, &object_set, lock, NULL); lock.Unlock(); } } -- 2.39.5