From: Yan, Zheng Date: Mon, 7 Dec 2015 08:14:43 +0000 (+0800) Subject: objectcacher: coalesce dirty buffers in one object into one IO X-Git-Tag: v10.0.3~48^2~2^2~1 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=60d205f4821ad6eecf9583de309b35ab37e25f3b;p=ceph.git objectcacher: coalesce dirty buffers in one object into one IO Signed-off-by: Yan, Zheng Update for ceph::real_time Signed-off-by: Greg Farnum --- diff --git a/src/osdc/ObjectCacher.cc b/src/osdc/ObjectCacher.cc index 0324fcc00663..662208d025c3 100644 --- a/src/osdc/ObjectCacher.cc +++ b/src/osdc/ObjectCacher.cc @@ -582,6 +582,7 @@ ObjectCacher::ObjectCacher(CephContext *cct_, string name, { perf_start(); finisher.start(); + scattered_write = writeback_handler.can_scattered_write(); } ObjectCacher::~ObjectCacher() @@ -891,6 +892,105 @@ void ObjectCacher::bh_read_finish(int64_t poolid, sobject_t oid, read_cond.Signal(); } +/* Write back bh together with the other dirty buffers of the same object whose last_write is not newer than cutoff, as a single bh_write_scattered() call. dirty_or_tx_bh is scanned forward from bh and then backward from it; each scan stops at the first buffer of a different object, or once count exceeds max_count or total_len exceeds max_amount (a NULL limit pointer disables that limit). Non-NULL limits are decremented by what was collected, so they can end up negative. */ void ObjectCacher::bh_write_adjacencies(BufferHead *bh, ceph::real_time cutoff, + int64_t *max_amount, int *max_count) +{ + list blist; + + int count = 0; + int64_t total_len = 0; + set::iterator it = dirty_or_tx_bh.find(bh); + assert(it != dirty_or_tx_bh.end()); + for (set::iterator p = it; + p != dirty_or_tx_bh.end(); + ++p) { + BufferHead *obh = *p; + if (obh->ob != bh->ob) + break; + if (obh->is_dirty() && obh->last_write <= cutoff) { /* <= rather than <: flush(loff_t) only stops at last_write > cutoff, so bh itself may have last_write == cutoff and must still land in blist */ + blist.push_back(obh); + ++count; + total_len += obh->length(); + if ((max_count && count > *max_count) || + (max_amount && total_len > *max_amount)) + break; + } + } + + while (it != dirty_or_tx_bh.begin()) { + --it; + BufferHead *obh = *it; + if (obh->ob != bh->ob) + break; + if (obh->is_dirty() && obh->last_write <= cutoff) { /* same eligibility test as the forward scan */ + blist.push_front(obh); + ++count; + total_len += obh->length(); + if ((max_count && count > *max_count) || + (max_amount && total_len > *max_amount)) + break; + } + } + if (max_count) + *max_count -= count; + if (max_amount) + *max_amount -= total_len; + + bh_write_scattered(blist); +} + +void 
ObjectCacher::bh_write_scattered(list& blist) +{ /* Submit every buffer in blist as one writeback_handler.write() call. Expects lock to be held and blist to be non-empty with all buffers on the same object (front() is dereferenced; the rest is asserted). Takes an object reference via ob->get(), records each buffer's (start, length) for the C_WriteCommit completion, then stamps the returned tid on the object and on every buffer and marks the buffers tx. */ + assert(lock.is_locked()); + + Object *ob = blist.front()->ob; + ob->get(); + + ceph::real_time last_write; + SnapContext snapc; + vector > ranges; + vector > io_vec; + + ranges.reserve(blist.size()); + io_vec.reserve(blist.size()); + + uint64_t total_len = 0; + for (list::iterator p = blist.begin(); p != blist.end(); ++p) { + BufferHead *bh = *p; + ldout(cct, 7) << "bh_write_scattered " << *bh << dendl; + assert(bh->ob == ob); + assert(bh->bl.length() == bh->length()); + ranges.push_back(pair(bh->start(), bh->length())); + + int n = io_vec.size(); + io_vec.resize(n + 1); + io_vec[n].first = bh->start(); + io_vec[n].second = bh->bl; + + total_len += bh->length(); + if (bh->snapc.seq > snapc.seq) + snapc = bh->snapc; + if (bh->last_write > last_write) + last_write = bh->last_write; /* remember the newest last_write; it is passed to write() below */ + } + + C_WriteCommit *oncommit = new C_WriteCommit(this, ob->oloc.pool, ob->get_soid(), ranges); + + ceph_tid_t tid = writeback_handler.write(ob->get_oid(), ob->get_oloc(), + io_vec, snapc, last_write, + ob->truncate_size, ob->truncate_seq, + oncommit); + oncommit->tid = tid; + ob->last_write_tid = tid; + for (list::iterator p = blist.begin(); p != blist.end(); ++p) { + BufferHead *bh = *p; + bh->last_write_tid = tid; + mark_tx(bh); + } + + if (perfcounter) + perfcounter->inc(l_objectcacher_data_flushed, total_len); +} void ObjectCacher::bh_write(BufferHead *bh) { @@ -925,20 +1025,27 @@ void ObjectCacher::bh_write(BufferHead *bh) mark_tx(bh); } -void ObjectCacher::bh_write_commit(int64_t poolid, sobject_t oid, loff_t start, - uint64_t length, ceph_tid_t tid, int r) +void ObjectCacher::bh_write_commit(int64_t poolid, sobject_t oid, + vector >& ranges, + ceph_tid_t tid, int r) { assert(lock.is_locked()); ldout(cct, 7) << "bh_write_commit " << oid << " tid " << tid - << " " << start << "~" << length << " returned " << r - << dendl; + << " ranges " << ranges << " returned " << r << dendl; if (objects[poolid].count(oid) == 0) { ldout(cct, 
7) << "bh_write_commit no object cache" << dendl; - } else { - Object *ob = objects[poolid][oid]; - int was_dirty_or_tx = ob->oset->dirty_or_tx; + return; + } + + Object *ob = objects[poolid][oid]; + int was_dirty_or_tx = ob->oset->dirty_or_tx; + for (vector >::iterator p = ranges.begin(); + p != ranges.end(); + ++p) { + loff_t start = p->first; + uint64_t length = p->second; if (!ob->exists) { ldout(cct, 10) << "bh_write_commit marking exists on " << *ob << dendl; ob->exists = true; @@ -1002,32 +1109,31 @@ void ObjectCacher::bh_write_commit(int64_t poolid, sobject_t oid, loff_t start, assert(*bh); ob->try_merge_bh(*bh); } + } - // update last_commit. - assert(ob->last_commit_tid < tid); - ob->last_commit_tid = tid; - - // waiters? - list ls; - if (ob->waitfor_commit.count(tid)) { - ls.splice(ls.begin(), ob->waitfor_commit[tid]); - ob->waitfor_commit.erase(tid); - } + // update last_commit. + assert(ob->last_commit_tid < tid); + ob->last_commit_tid = tid; - // is the entire object set now clean and fully committed? - ObjectSet *oset = ob->oset; - ob->put(); + // waiters? + list ls; + if (ob->waitfor_commit.count(tid)) { + ls.splice(ls.begin(), ob->waitfor_commit[tid]); + ob->waitfor_commit.erase(tid); + } - if (flush_set_callback && - was_dirty_or_tx > 0 && - oset->dirty_or_tx == 0) { - // nothing dirty/tx - flush_set_callback(flush_set_callback_arg, oset); - } + // is the entire object set now clean and fully committed? + ObjectSet *oset = ob->oset; + ob->put(); - if (!ls.empty()) - finish_contexts(cct, ls, r); + if (flush_set_callback && + was_dirty_or_tx > 0 && + oset->dirty_or_tx == 0) { // nothing dirty/tx + flush_set_callback(flush_set_callback_arg, oset); } + + if (!ls.empty()) + finish_contexts(cct, ls, r); } void ObjectCacher::flush(loff_t amount) @@ -1043,16 +1149,20 @@ void ObjectCacher::flush(loff_t amount) * to the other LRU, so that we can call * lru_dirty.lru_get_next_expire() again. 
*/ - loff_t did = 0; - while (amount == 0 || did < amount) { + int64_t left = amount; + while (amount == 0 || left > 0) { BufferHead *bh = static_cast( bh_lru_dirty.lru_get_next_expire()); if (!bh) break; if (bh->last_write > cutoff) break; - did += bh->length(); - bh_write(bh); - } + if (scattered_write) { + bh_write_adjacencies(bh, cutoff, amount > 0 ? &left : NULL, NULL); + } else { + left -= bh->length(); + bh_write(bh); + } + } } @@ -1173,6 +1283,7 @@ int ObjectCacher::_readx(OSDRead *rd, ObjectSet *oset, Context *onfinish, ex_it->length, soid.snap)) { ldout(cct, 20) << "readx may copy on write" << dendl; bool wait = false; + list blist; for (map::iterator bh_it = o->data.begin(); bh_it != o->data.end(); ++bh_it) { @@ -1180,10 +1291,16 @@ int ObjectCacher::_readx(OSDRead *rd, ObjectSet *oset, Context *onfinish, if (bh->is_dirty() || bh->is_tx()) { ldout(cct, 10) << "readx flushing " << *bh << dendl; wait = true; - if (bh->is_dirty()) - bh_write(bh); + if (bh->is_dirty()) { + if (scattered_write) + blist.push_back(bh); + else + bh_write(bh); + } } } + if (scattered_write && !blist.empty()) + bh_write_scattered(blist); if (wait) { ldout(cct, 10) << "readx waiting on tid " << o->last_write_tid << " on " << *o << dendl; @@ -1651,9 +1768,14 @@ void ObjectCacher::flusher_entry() while ((bh = static_cast(bh_lru_dirty. 
lru_get_next_expire())) != 0 && bh->last_write < cutoff && - --max > 0) { + max > 0) { ldout(cct, 10) << "flusher flushing aged dirty bh " << *bh << dendl; - bh_write(bh); + if (scattered_write) { + bh_write_adjacencies(bh, cutoff, NULL, &max); /* subtracts the number of buffers it queued from max */ + } else { + bh_write(bh); + --max; + } } - if (!max) { + if (max <= 0) { /* bh_write_adjacencies() can take max below zero, so an exhausted budget is not always exactly 0 */ // back off the lock to avoid starving other threads @@ -1769,6 +1891,7 @@ void ObjectCacher::purge(Object *ob) bool ObjectCacher::flush(Object *ob, loff_t offset, loff_t length) { assert(lock.is_locked()); + list blist; bool clean = true; ldout(cct, 10) << "flush " << *ob << " " << offset << "~" << length << dendl; for (map::iterator p = ob->data_lower_bound(offset); @@ -1786,9 +1909,16 @@ bool ObjectCacher::flush(Object *ob, loff_t offset, loff_t length) if (!bh->is_dirty()) { continue; } - bh_write(bh); + + if (scattered_write) + blist.push_back(bh); + else + bh_write(bh); clean = false; } + if (scattered_write && !blist.empty()) + bh_write_scattered(blist); + return clean; } @@ -1825,6 +1955,8 @@ bool ObjectCacher::flush_set(ObjectSet *oset, Context *onfinish) C_GatherBuilder gather(cct); set waitfor_commit; + list blist; + Object *last_ob = NULL; set::iterator it, p, q; // Buffer heads in dirty_or_tx_bh are sorted in ObjectSet/Object/offset @@ -1846,8 +1978,20 @@ bool ObjectCacher::flush_set(ObjectSet *oset, Context *onfinish) if (bh->ob->oset != oset) break; waitfor_commit.insert(bh->ob); - if (bh->is_dirty()) - bh_write(bh); + if (bh->is_dirty()) { + if (scattered_write) { + if (last_ob != bh->ob) { + if (!blist.empty()) { + bh_write_scattered(blist); + blist.clear(); + } + last_ob = bh->ob; + } + blist.push_back(bh); + } else { + bh_write(bh); + } + } } if (backwards) { @@ -1860,13 +2004,28 @@ bool ObjectCacher::flush_set(ObjectSet *oset, Context *onfinish) if (bh->ob->oset != oset) break; waitfor_commit.insert(bh->ob); - if (bh->is_dirty()) - bh_write(bh); + if (bh->is_dirty()) { + if (scattered_write) { + if (last_ob != bh->ob) { + if (!blist.empty()) { + 
bh_write_scattered(blist); + blist.clear(); + } + last_ob = bh->ob; + } + blist.push_front(bh); + } else { + bh_write(bh); + } + } if (!backwards) break; } } + if (scattered_write && !blist.empty()) + bh_write_scattered(blist); + for (set::iterator i = waitfor_commit.begin(); i != waitfor_commit.end(); ++i) { Object *ob = *i; @@ -1935,6 +2094,8 @@ bool ObjectCacher::flush_all(Context *onfinish) C_GatherBuilder gather(cct); set waitfor_commit; + list blist; + Object *last_ob = NULL; set::iterator next, it; next = it = dirty_or_tx_bh.begin(); while (it != dirty_or_tx_bh.end()) { @@ -1942,12 +2103,27 @@ bool ObjectCacher::flush_all(Context *onfinish) BufferHead *bh = *it; waitfor_commit.insert(bh->ob); - if (bh->is_dirty()) - bh_write(bh); + if (bh->is_dirty()) { + if (scattered_write) { + if (last_ob != bh->ob) { + if (!blist.empty()) { + bh_write_scattered(blist); + blist.clear(); + } + last_ob = bh->ob; + } + blist.push_back(bh); + } else { + bh_write(bh); + } + } it = next; } + if (scattered_write && !blist.empty()) + bh_write_scattered(blist); + for (set::iterator i = waitfor_commit.begin(); i != waitfor_commit.end(); ++i) { diff --git a/src/osdc/ObjectCacher.h b/src/osdc/ObjectCacher.h index d5abb5434065..791412ee7f19 100644 --- a/src/osdc/ObjectCacher.h +++ b/src/osdc/ObjectCacher.h @@ -395,6 +395,7 @@ class ObjectCacher { // ObjectCacher fields private: WritebackHandler& writeback_handler; + bool scattered_write; string name; Mutex& lock; @@ -521,6 +522,9 @@ class ObjectCacher { // io void bh_read(BufferHead *bh, int op_flags); void bh_write(BufferHead *bh); + void bh_write_scattered(list& blist); + void bh_write_adjacencies(BufferHead *bh, ceph::real_time cutoff, + int64_t *amount, int *max_count); void trim(); void flush(loff_t amount=0); @@ -552,8 +556,10 @@ class ObjectCacher { loff_t offset, uint64_t length, bufferlist &bl, int r, bool trust_enoent); - void bh_write_commit(int64_t poolid, sobject_t oid, loff_t offset, - uint64_t length, ceph_tid_t t, int 
r); + void bh_write_commit(int64_t poolid, sobject_t oid, + vector >& ranges, + ceph_tid_t t, int r); /* ranges: the (start, length) pairs covered by the committed write */ + class C_ReadFinish : public Context { ObjectCacher *oc; @@ -592,17 +598,23 @@ class ObjectCacher { ObjectCacher *oc; int64_t poolid; sobject_t oid; - loff_t start; - uint64_t length; + vector > ranges; /* (start, length) of every range this write covers */ public: ceph_tid_t tid; C_WriteCommit(ObjectCacher *c, int64_t _poolid, sobject_t o, loff_t s, uint64_t l) : - oc(c), poolid(_poolid), oid(o), start(s), length(l), tid(0) {} + oc(c), poolid(_poolid), oid(o), tid(0) { + ranges.push_back(make_pair(s, l)); /* single-range form: stored as a one-element ranges vector */ + } + C_WriteCommit(ObjectCacher *c, int64_t _poolid, sobject_t o, + vector >& _ranges) : + oc(c), poolid(_poolid), oid(o), tid(0) { + ranges.swap(_ranges); /* takes over the caller's vector contents instead of copying; _ranges is left holding the empty vector */ + } void finish(int r) { - oc->bh_write_commit(poolid, oid, start, length, tid, r); + oc->bh_write_commit(poolid, oid, ranges, tid, r); /* forwards the stored ranges, tid and result code to the cacher */ } - }; + }; class C_WaitForWrite : public Context { public: diff --git a/src/osdc/WritebackHandler.h b/src/osdc/WritebackHandler.h index fdafeaad3a3a..f0efd2007841 100644 --- a/src/osdc/WritebackHandler.h +++ b/src/osdc/WritebackHandler.h @@ -39,6 +39,15 @@ class WritebackHandler { virtual void overwrite_extent(const object_t& oid, uint64_t off, uint64_t len, ceph_tid_t journal_tid) {} + virtual bool can_scattered_write() { return false; } /* default answer is false; the ObjectCacher constructor stores this result in scattered_write and uses it to choose between bh_write() and the multi-buffer write() below */ + virtual ceph_tid_t write(const object_t& oid, const object_locator_t& oloc, + vector >& io_vec, + const SnapContext& snapc, ceph::real_time mtime, + uint64_t trunc_size, __u32 trunc_seq, + Context *oncommit) { + return 0; /* default stub: ignores every argument, including oncommit, and performs no I/O */ + } + virtual void get_client_lock() {} virtual void put_client_lock() {} };