From: Jason Dillaman Date: Wed, 15 Jul 2015 16:20:42 +0000 (-0400) Subject: osdc: track journal commit tid within ObjectCacher for writes X-Git-Tag: v10.0.1~52^2~18 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=b10398b08f96e3ab8dc2f562393f773b621cd262;p=ceph.git osdc: track journal commit tid within ObjectCacher for writes Writebacks from the journal will provide the associated journal commit tid so that writebacks can be delayed until after the journal entry is safe on disk. This allows asynchronously submitting an event to the journal and submitting the write operation to the ObjectCacher. Signed-off-by: Jason Dillaman --- diff --git a/src/client/ObjecterWriteback.h b/src/client/ObjecterWriteback.h index 8c05e969076d..69a9806df57b 100644 --- a/src/client/ObjecterWriteback.h +++ b/src/client/ObjecterWriteback.h @@ -30,9 +30,10 @@ class ObjecterWriteback : public WritebackHandler { } virtual ceph_tid_t write(const object_t& oid, const object_locator_t& oloc, - uint64_t off, uint64_t len, const SnapContext& snapc, - const bufferlist &bl, utime_t mtime, uint64_t trunc_size, - __u32 trunc_seq, Context *oncommit) { + uint64_t off, uint64_t len, const SnapContext& snapc, + const bufferlist &bl, utime_t mtime, + uint64_t trunc_size, __u32 trunc_seq, + ceph_tid_t journal_tid, Context *oncommit) { return m_objecter->write_trunc(oid, oloc, off, len, snapc, bl, mtime, 0, trunc_size, trunc_seq, NULL, new C_OnFinisher(new C_Lock(m_lock, oncommit), diff --git a/src/librbd/ImageCtx.cc b/src/librbd/ImageCtx.cc index eb815283ef02..86cd49e88bab 100644 --- a/src/librbd/ImageCtx.cc +++ b/src/librbd/ImageCtx.cc @@ -628,7 +628,9 @@ public: int fadvise_flags) { snap_lock.get_read(); ObjectCacher::OSDWrite *wr = object_cacher->prepare_write(snapc, bl, - utime_t(), fadvise_flags); + utime_t(), + fadvise_flags, + 0); snap_lock.put_read(); ObjectExtent extent(o, 0, off, len, 0); extent.oloc.pool = data_ctx.get_id(); diff --git a/src/librbd/LibrbdWriteback.cc b/src/librbd/LibrbdWriteback.cc index d240c97f2437..b55eb3ffa203 100644 --- a/src/librbd/LibrbdWriteback.cc +++ b/src/librbd/LibrbdWriteback.cc @@ -160,7 +160,7 @@ namespace librbd { const SnapContext& snapc, const bufferlist &bl, utime_t mtime, uint64_t trunc_size, __u32 trunc_seq, - Context *oncommit) + ceph_tid_t journal_tid, Context *oncommit) { assert(m_ictx->owner_lock.is_locked()); uint64_t object_no = oid_to_object_no(oid.name, m_ictx->object_prefix); @@ -175,6 +175,15 @@ namespace librbd { return ++m_tid; } + + void LibrbdWriteback::overwrite_extent(const object_t& oid, uint64_t off, + uint64_t len, ceph_tid_t journal_tid) { + assert(journal_tid != 0); + + // TODO inform the journal that we no longer expect to receive writebacks + // for the specified extent + } + void LibrbdWriteback::get_client_lock() { m_ictx->owner_lock.get_read(); } diff --git a/src/librbd/LibrbdWriteback.h b/src/librbd/LibrbdWriteback.h index b5578ae62f5c..b7574ae5dd2d 100644 --- a/src/librbd/LibrbdWriteback.h +++ b/src/librbd/LibrbdWriteback.h @@ -34,9 +34,14 @@ namespace librbd { // Note that oloc, trunc_size, and trunc_seq are ignored virtual ceph_tid_t write(const object_t& oid, const object_locator_t& oloc, - uint64_t off, uint64_t len, const SnapContext& snapc, - const bufferlist &bl, utime_t mtime, uint64_t trunc_size, - __u32 trunc_seq, Context *oncommit); + uint64_t off, uint64_t len, + const SnapContext& snapc, const bufferlist &bl, + utime_t mtime, uint64_t trunc_size, + __u32 trunc_seq, ceph_tid_t journal_tid, + Context *oncommit); + + virtual void overwrite_extent(const object_t& oid, uint64_t off, + uint64_t len, ceph_tid_t journal_tid); virtual void get_client_lock(); virtual void put_client_lock(); diff --git a/src/osdc/ObjectCacher.cc b/src/osdc/ObjectCacher.cc index d3aeb5a372cc..060b7326d24b 100644 --- a/src/osdc/ObjectCacher.cc +++ b/src/osdc/ObjectCacher.cc @@ -40,6 +40,7 @@ ObjectCacher::BufferHead *ObjectCacher::Object::split(BufferHead *left, loff_t o right->last_read_tid = left->last_read_tid; right->set_state(left->get_state()); right->snapc = left->snapc; + right->set_journal_tid(left->journal_tid); loff_t newleftlen = off - left->start(); right->set_start(off); @@ -88,8 +89,14 @@ void ObjectCacher::Object::merge_left(BufferHead *left, BufferHead *right) assert(oc->lock.is_locked()); assert(left->end() == right->start()); assert(left->get_state() == right->get_state()); + assert(left->can_merge_journal(right)); ldout(oc->cct, 10) << "merge_left " << *left << " + " << *right << dendl; + if (left->get_journal_tid() == 0) { + left->set_journal_tid(right->get_journal_tid()); + } + right->set_journal_tid(0); + oc->bh_remove(this, right); oc->bh_stat_sub(left); left->set_length(left->length() + right->length()); @@ -97,8 +104,8 @@ void ObjectCacher::Object::merge_left(BufferHead *left, BufferHead *right) // data left->bl.claim_append(right->bl); - - // version + + // version // note: this is sorta busted, but should only be used for dirty buffers left->last_write_tid = MAX( left->last_write_tid, right->last_write_tid ); left->last_write = MAX( left->last_write, right->last_write ); @@ -134,7 +141,8 @@ void ObjectCacher::Object::try_merge_bh(BufferHead *bh) if (p != data.begin()) { --p; if (p->second->end() == bh->start() && - p->second->get_state() == bh->get_state()) { + p->second->get_state() == bh->get_state() && + p->second->can_merge_journal(bh)) { merge_left(p->second, bh); bh = p->second; } else { @@ -146,7 +154,8 @@ void ObjectCacher::Object::try_merge_bh(BufferHead *bh) ++p; if (p != data.end() && p->second->start() == bh->end() && - p->second->get_state() == bh->get_state()) + p->second->get_state() == bh->get_state() && + p->second->can_merge_journal(bh)) merge_left(bh, p->second); } @@ -363,6 +372,7 @@ ObjectCacher::BufferHead *ObjectCacher::Object::map_write(OSDWrite *wr) oc->bh_add(this, final); ldout(oc->cct, 10) << "map_write adding trailing bh " << *final << dendl; } else { + replace_journal_tid(final, wr->journal_tid); oc->bh_stat_sub(final); final->set_length(final->length() + max); oc->bh_stat_add(final); @@ -371,14 +381,14 @@ ObjectCacher::BufferHead *ObjectCacher::Object::map_write(OSDWrite *wr) cur += max; continue; } - + ldout(oc->cct, 10) << "cur is " << cur << ", p is " << *p->second << dendl; //oc->verify_stats(); if (p->first <= cur) { BufferHead *bh = p->second; ldout(oc->cct, 10) << "map_write bh " << *bh << " intersected" << dendl; - + if (p->first < cur) { assert(final == 0); if (cur + max >= bh->end()) { @@ -406,24 +416,26 @@ ObjectCacher::BufferHead *ObjectCacher::Object::map_write(OSDWrite *wr) oc->mark_dirty(final); --p; // move iterator back to final assert(p->second == final); + replace_journal_tid(bh, 0); merge_left(final, bh); } else { final = bh; } } - + // keep going. loff_t lenfromcur = final->end() - cur; cur += lenfromcur; left -= lenfromcur; ++p; - continue; + continue; } else { // gap! loff_t next = p->first; loff_t glen = MIN(next - cur, max); ldout(oc->cct, 10) << "map_write gap " << cur << "~" << glen << dendl; if (final) { + replace_journal_tid(final, wr->journal_tid); oc->bh_stat_sub(final); final->set_length(final->length() + glen); oc->bh_stat_add(final); @@ -433,21 +445,34 @@ ObjectCacher::BufferHead *ObjectCacher::Object::map_write(OSDWrite *wr) final->set_length( glen ); oc->bh_add(this, final); } - + cur += glen; left -= glen; continue; // more? } } } - - // set versoin + + // set version assert(final); + replace_journal_tid(final, wr->journal_tid); ldout(oc->cct, 10) << "map_write final is " << *final << dendl; return final; } +void ObjectCacher::Object::replace_journal_tid(BufferHead *bh, ceph_tid_t tid) { + ceph_tid_t bh_tid = bh->get_journal_tid(); + + assert(tid == 0 || bh_tid <= tid); + if (bh_tid != 0 && bh_tid != tid) { + // inform journal that it should not expect a writeback from this extent + oc->writeback_handler.overwrite_extent(get_oid(), bh->start(), bh->length(), + bh_tid); + } + bh->set_journal_tid(tid); +} + void ObjectCacher::Object::truncate(loff_t s) { assert(oc->lock.is_locked()); @@ -467,6 +492,7 @@ void ObjectCacher::Object::truncate(loff_t s) // remove bh entirely assert(bh->start() >= s); assert(bh->waitfor_read.empty()); + replace_journal_tid(bh, 0); oc->bh_remove(this, bh); delete bh; } @@ -507,6 +533,7 @@ void ObjectCacher::Object::discard(loff_t off, loff_t len) ++p; ldout(oc->cct, 10) << "discard " << *this << " bh " << *bh << dendl; assert(bh->waitfor_read.empty()); + replace_journal_tid(bh, 0); oc->bh_remove(this, bh); delete bh; } @@ -843,10 +870,11 @@ void ObjectCacher::bh_write(BufferHead *bh) bh->ob->get_soid(), bh->start(), bh->length()); // go ceph_tid_t tid = writeback_handler.write(bh->ob->get_oid(), bh->ob->get_oloc(), - bh->start(), bh->length(), - bh->snapc, bh->bl, bh->last_write, - bh->ob->truncate_size, bh->ob->truncate_seq, - oncommit); + bh->start(), bh->length(), + bh->snapc, bh->bl, bh->last_write, + bh->ob->truncate_size, + bh->ob->truncate_seq, + bh->journal_tid, oncommit); ldout(cct, 20) << " tid " << tid << " on " << bh->ob->get_oid() << dendl; // set bh last_write_tid @@ -920,6 +948,7 @@ void ObjectCacher::bh_write_commit(int64_t poolid, sobject_t oid, loff_t start, if (r >= 0) { // ok! mark bh clean and error-free mark_clean(bh); + bh->set_journal_tid(0); if (bh->get_nocache()) bh_lru_rest.lru_bottouch(bh); hit.push_back(bh); @@ -2192,6 +2221,7 @@ void ObjectCacher::bh_add(Object *ob, BufferHead *bh) void ObjectCacher::bh_remove(Object *ob, BufferHead *bh) { assert(lock.is_locked()); + assert(bh->get_journal_tid() == 0); ldout(cct, 30) << "bh_remove " << *ob << " " << *bh << dendl; ob->remove_bh(bh); if (bh->is_dirty()) { diff --git a/src/osdc/ObjectCacher.h b/src/osdc/ObjectCacher.h index c3b3b639c222..87fe351b4698 100644 --- a/src/osdc/ObjectCacher.h +++ b/src/osdc/ObjectCacher.h @@ -71,13 +71,16 @@ class ObjectCacher { bufferlist bl; utime_t mtime; int fadvise_flags; - OSDWrite(const SnapContext& sc, const bufferlist& b, utime_t mt, int f) - : snapc(sc), bl(b), mtime(mt), fadvise_flags(f) {} + ceph_tid_t journal_tid; + OSDWrite(const SnapContext& sc, const bufferlist& b, utime_t mt, int f, + ceph_tid_t _journal_tid) + : snapc(sc), bl(b), mtime(mt), fadvise_flags(f), + journal_tid(_journal_tid) {} }; OSDWrite *prepare_write(const SnapContext& sc, const bufferlist &b, - utime_t mt, int f) { - return new OSDWrite(sc, b, mt, f); + utime_t mt, int f, ceph_tid_t journal_tid) { + return new OSDWrite(sc, b, mt, f, journal_tid); } @@ -111,6 +114,7 @@ class ObjectCacher { ceph_tid_t last_read_tid; // tid of last read op (if any) utime_t last_write; SnapContext snapc; + ceph_tid_t journal_tid; int error; // holds return value for failed reads map< loff_t, list > waitfor_read; @@ -124,6 +128,7 @@ class ObjectCacher { ob(o), last_write_tid(0), last_read_tid(0), + journal_tid(0), error(0) { ex.start = ex.length = 0; } @@ -143,7 +148,15 @@ class ObjectCacher { state = s; } int get_state() const { return state; } - + + inline ceph_tid_t get_journal_tid() const { + return journal_tid; + } + inline void set_journal_tid(ceph_tid_t _journal_tid) { + + journal_tid = _journal_tid; + } + bool is_missing() { return state == STATE_MISSING; } bool is_dirty() { return state == STATE_DIRTY; } bool is_clean() { return state == STATE_CLEAN; } @@ -178,6 +191,11 @@ class ObjectCacher { bool get_nocache() { return nocache; } + + inline bool can_merge_journal(BufferHead *bh) const { + return (get_journal_tid() == 0 || bh->get_journal_tid() == 0 || + get_journal_tid() == bh->get_journal_tid()); + } }; // ******* Object ********* @@ -308,7 +326,8 @@ class ObjectCacher { map& rx, map& errors); BufferHead *map_write(OSDWrite *wr); - + + void replace_journal_tid(BufferHead *bh, ceph_tid_t tid); void truncate(loff_t s); void discard(loff_t off, loff_t len); @@ -687,7 +706,7 @@ public: int file_write(ObjectSet *oset, ceph_file_layout *layout, const SnapContext& snapc, loff_t offset, uint64_t len, bufferlist& bl, utime_t mtime, int flags) { - OSDWrite *wr = prepare_write(snapc, bl, mtime, flags); + OSDWrite *wr = prepare_write(snapc, bl, mtime, flags, 0); Striper::file_to_extents(cct, oset->ino, layout, offset, len, oset->truncate_size, wr->extents); return writex(wr, oset, NULL); } @@ -708,6 +727,9 @@ inline ostream& operator<<(ostream& out, ObjectCacher::BufferHead &bh) << " " << bh.ob << " (" << bh.bl.length() << ")" << " v " << bh.last_write_tid; + if (bh.get_journal_tid() != 0) { + out << " j " << bh.get_journal_tid(); + } if (bh.is_tx()) out << " tx"; if (bh.is_rx()) out << " rx"; if (bh.is_dirty()) out << " dirty"; diff --git a/src/osdc/WritebackHandler.h b/src/osdc/WritebackHandler.h index cbcf20dbcdc7..0e51cb003c21 100644 --- a/src/osdc/WritebackHandler.h +++ b/src/osdc/WritebackHandler.h @@ -32,7 +32,10 @@ class WritebackHandler { uint64_t off, uint64_t len, const SnapContext& snapc, const bufferlist &bl, utime_t mtime, uint64_t trunc_size, __u32 trunc_seq, - Context *oncommit) = 0; + ceph_tid_t journal_tid, Context *oncommit) = 0; + + virtual void overwrite_extent(const object_t& oid, uint64_t off, uint64_t len, + ceph_tid_t journal_tid) {} virtual void get_client_lock() {} virtual void put_client_lock() {} diff --git a/src/test/osdc/FakeWriteback.cc b/src/test/osdc/FakeWriteback.cc index 69f32991b89e..32ae4f52a58b 100644 --- a/src/test/osdc/FakeWriteback.cc +++ b/src/test/osdc/FakeWriteback.cc @@ -74,7 +74,7 @@ ceph_tid_t FakeWriteback::write(const object_t& oid, const SnapContext& snapc, const bufferlist &bl, utime_t mtime, uint64_t trunc_size, __u32 trunc_seq, - Context *oncommit) + ceph_tid_t journal_tid, Context *oncommit) { C_Delay *wrapper = new C_Delay(m_cct, oncommit, m_lock, off, NULL, m_delay_ns); m_finisher->queue(wrapper, 0); diff --git a/src/test/osdc/FakeWriteback.h b/src/test/osdc/FakeWriteback.h index 9b9598ed9480..351d521073a9 100644 --- a/src/test/osdc/FakeWriteback.h +++ b/src/test/osdc/FakeWriteback.h @@ -26,7 +26,8 @@ public: uint64_t off, uint64_t len, const SnapContext& snapc, const bufferlist &bl, utime_t mtime, uint64_t trunc_size, - __u32 trunc_seq, Context *oncommit); + __u32 trunc_seq, ceph_tid_t journal_tid, + Context *oncommit); virtual bool may_copy_on_write(const object_t&, uint64_t, uint64_t, snapid_t); private: diff --git a/src/test/osdc/object_cacher_stress.cc b/src/test/osdc/object_cacher_stress.cc index 39aabfdeb225..30438b77f2b0 100644 --- a/src/test/osdc/object_cacher_stress.cc +++ b/src/test/osdc/object_cacher_stress.cc @@ -109,7 +109,8 @@ int stress_test(uint64_t num_ops, uint64_t num_objs, else assert(r == 0); } else { - ObjectCacher::OSDWrite *wr = obc.prepare_write(snapc, bl, utime_t(), 0); + ObjectCacher::OSDWrite *wr = obc.prepare_write(snapc, bl, utime_t(), 0, + random()); wr->extents.push_back(op->extent); lock.Lock(); obc.writex(wr, &object_set, NULL);