From 61a69c80066ec2734be005b7cfed640695b86b9b Mon Sep 17 00:00:00 2001 From: John Spray Date: Wed, 23 Jul 2014 17:32:57 +0100 Subject: [PATCH] osdc: Use a finisher from Journaler Completions from I/O operations (i.e. the objecter) hop through the finisher twice, because of the three layers of locking (MDS::mds_lock -> Journaler::lock -> Objecter osd session lock) Because on the way "right" we take the locks in that order, to avoid deadlock we can't take the locks in the opposite order on the way "left", hence the finishers. Signed-off-by: John Spray --- src/osdc/Journaler.cc | 43 +++++++++++++++++++++++-------------------- src/osdc/Journaler.h | 17 +++++++++++------ 2 files changed, 34 insertions(+), 26 deletions(-) diff --git a/src/osdc/Journaler.cc b/src/osdc/Journaler.cc index b4b51b19a7fd3..4b5824f96b6c5 100644 --- a/src/osdc/Journaler.cc +++ b/src/osdc/Journaler.cc @@ -19,6 +19,7 @@ #include "osdc/Journaler.h" #include "common/errno.h" #include "include/assert.h" +#include "common/Finisher.h" #define dout_subsys ceph_subsys_journaler #undef dout_prefix @@ -165,7 +166,7 @@ void Journaler::read_head(Context *on_finish, bufferlist *bl) object_t oid = file_object_t(ino, 0); object_locator_t oloc(pg_pool); - objecter->read_full(oid, oloc, CEPH_NOSNAP, bl, 0, on_finish); + objecter->read_full(oid, oloc, CEPH_NOSNAP, bl, 0, new C_OnFinisher(on_finish, finisher)); } void Journaler::reread_head(Context *onfinish) @@ -269,28 +270,27 @@ void Journaler::_finish_read_head(int r, bufferlist& bl) ldout(cct, 1) << "_finish_read_head " << h << ". probing for end of log (from " << write_pos << ")..." << dendl; C_ProbeEnd *fin = new C_ProbeEnd(this); state = STATE_PROBING; - probe(fin, &fin->end); + _probe(fin, &fin->end); } -void Journaler::probe(Context *finish, uint64_t *end) +void Journaler::_probe(Context *finish, uint64_t *end) { + assert(lock.is_locked_by_me()); ldout(cct, 1) << "probing for end of the log" << dendl; assert(state == STATE_PROBING || state == STATE_REPROBING); // probe the log filer.probe(ino, &layout, CEPH_NOSNAP, - write_pos, end, 0, true, 0, finish); + write_pos, end, 0, true, 0, new C_OnFinisher(finish, finisher)); } -void Journaler::reprobe(Context *finish) +void Journaler::_reprobe(Context *finish) { - Mutex::Locker l(lock); - ldout(cct, 10) << "reprobe" << dendl; assert(state == STATE_ACTIVE); state = STATE_REPROBING; C_ReProbe *fin = new C_ReProbe(this, finish); - probe(fin, &fin->end); + _probe(fin, &fin->end); } @@ -365,7 +365,7 @@ void Journaler::_finish_reread_head_and_probe(int r, Context *onfinish) assert(lock.is_locked_by_me()); assert(!r); //if we get an error, we're boned - reprobe(onfinish); + _reprobe(onfinish); } @@ -414,7 +414,7 @@ void Journaler::_write_head(Context *oncommit) object_locator_t oloc(pg_pool); objecter->write_full(oid, oloc, snapc, bl, ceph_clock_now(cct), 0, NULL, - new C_WriteHead(this, last_written, oncommit)); + new C_OnFinisher(new C_WriteHead(this, last_written, oncommit), finisher)); } void Journaler::_finish_write_head(int r, Header &wrote, Context *oncommit) @@ -606,7 +606,7 @@ void Journaler::_do_flush(unsigned amount) filer.write(ino, &layout, snapc, flush_pos, len, write_bl, ceph_clock_now(cct), 0, - NULL, onsafe); + NULL, new C_OnFinisher(onsafe, finisher)); flush_pos += len; assert(write_buf.length() == write_pos - flush_pos); @@ -634,15 +634,15 @@ void Journaler::_wait_for_flush(Context *onsafe) ldout(cct, 10) << "flush nothing to flush, (prezeroing/prezero)/write/flush/safe pointers at " << "(" << prezeroing_pos << "/" << prezero_pos << ")/" << write_pos << "/" << flush_pos << "/" << safe_pos << dendl; if (onsafe) { - onsafe->complete(0); - onsafe = 0; + finisher->queue(onsafe, 0); } return; } // queue waiter - if (onsafe) - waitfor_safe[write_pos].push_back(onsafe); + if (onsafe) { + waitfor_safe[write_pos].push_back(new C_OnFinisher(onsafe, finisher)); + } } void Journaler::flush(Context *onsafe) @@ -728,12 +728,15 @@ void Journaler::_issue_prezero() ldout(cct, 10) << "_issue_prezero zeroing " << prezeroing_pos << "~" << len << " (partial period)" << dendl; } SnapContext snapc; - Context *c = new C_Journaler_Prezero(this, prezeroing_pos, len); + Context *c = new C_OnFinisher(new C_Journaler_Prezero(this, prezeroing_pos, len), finisher); filer.zero(ino, &layout, snapc, prezeroing_pos, len, ceph_clock_now(cct), 0, NULL, c); prezeroing_pos += len; } } +// Lock cycle because we get called out of objecter callback (holding +// objecter read lock), but there are also cases where we take the journaler +// lock before calling into objecter to do I/O. void Journaler::_finish_prezero(int r, uint64_t start, uint64_t len) { Mutex::Locker l(lock); @@ -900,7 +903,7 @@ void Journaler::_issue_read(uint64_t len) if (l > len) l = len; C_Read *c = new C_Read(this, requested_pos); - filer.read(ino, &layout, CEPH_NOSNAP, requested_pos, l, &c->bl, 0, c); + filer.read(ino, &layout, CEPH_NOSNAP, requested_pos, l, &c->bl, 0, new C_OnFinisher(c, finisher)); requested_pos += l; len -= l; } @@ -1016,7 +1019,7 @@ void Journaler::erase(Context *completion) uint64_t first = trimmed_pos / get_layout_period(); uint64_t num = (write_pos - trimmed_pos) / get_layout_period() + 2; filer.purge_range(ino, &layout, SnapContext(), first, num, ceph_clock_now(cct), 0, - new C_EraseFinish(this, completion)); + new C_OnFinisher(new C_EraseFinish(this, completion), finisher)); // We will not start the operation to delete the header until _finish_erase has // seen the data deletion succeed: otherwise if there was an error deleting data @@ -1030,7 +1033,7 @@ void Journaler::_finish_erase(int data_result, Context *completion) if (data_result == 0) { // Async delete the journal header filer.purge_range(ino, &layout, SnapContext(), 0, 1, ceph_clock_now(cct), 0, - completion); + new C_OnFinisher(completion, finisher)); } else { lderr(cct) << "Failed to delete journal " << ino << " data: " << cpp_strerror(data_result) << dendl; completion->complete(data_result); @@ -1129,7 +1132,7 @@ void Journaler::_trim() uint64_t num = (trim_to - trimming_pos) / period; SnapContext snapc; filer.purge_range(ino, &layout, snapc, first, num, ceph_clock_now(cct), 0, - new C_Trim(this, trim_to)); + new C_OnFinisher(new C_Trim(this, trim_to), finisher)); trimming_pos = trim_to; } diff --git a/src/osdc/Journaler.h b/src/osdc/Journaler.h index 76192353aea64..3ee93343dd109 100644 --- a/src/osdc/Journaler.h +++ b/src/osdc/Journaler.h @@ -59,6 +59,7 @@ class CephContext; class Context; class PerfCounters; +class Finisher; typedef __u8 stream_format_t; @@ -194,12 +195,14 @@ public: return stream_format; } + Header last_committed; + private: // me CephContext *cct; Mutex lock; + Finisher *finisher; Header last_written; - Header last_committed; inodeno_t ino; int64_t pg_pool; bool readonly; @@ -266,8 +269,9 @@ private: void read_head(Context *on_finish, bufferlist *bl); void _finish_read_head(int r, bufferlist& bl); void _finish_reread_head(int r, bufferlist& bl, Context *finish); - void probe(Context *finish, uint64_t *end); + void _probe(Context *finish, uint64_t *end); void _finish_probe_end(int r, uint64_t end); + void _reprobe(Context *onfinish); void _finish_reprobe(int r, uint64_t end, Context *onfinish); void _finish_reread_head_and_probe(int r, Context *onfinish); class C_ReadHead; @@ -364,9 +368,10 @@ private: friend class C_EraseFinish; public: - Journaler(inodeno_t ino_, int64_t pool, const char *mag, Objecter *obj, PerfCounters *l, int lkey, SafeTimer *tim) : - cct(obj->cct), lock("Journaler"), - last_written(mag), last_committed(mag), + Journaler(inodeno_t ino_, int64_t pool, const char *mag, Objecter *obj, PerfCounters *l, int lkey, SafeTimer *tim, Finisher *f=NULL) : + last_committed(mag), + cct(obj->cct), lock("Journaler"), finisher(f), + last_written(mag), ino(ino_), pg_pool(pool), readonly(true), stream_format(-1), journal_stream(-1), magic(mag), @@ -418,7 +423,6 @@ public: void create(ceph_file_layout *layout, stream_format_t const sf); void recover(Context *onfinish); void reread_head(Context *onfinish); - void reprobe(Context *onfinish); void reread_head_and_probe(Context *onfinish); void write_head(Context *onsave=0); void wait_for_flush(Context *onsafe = 0); @@ -479,6 +483,7 @@ public: // Synchronous getters // =================== + // TODO: need some locks on reads for true safety uint64_t get_layout_period() const { return (uint64_t)layout.fl_stripe_count * (uint64_t)layout.fl_object_size; } ceph_file_layout& get_layout() { return layout; } bool is_active() { return state == STATE_ACTIVE; } -- 2.39.5