From e1e28667d437b6b213f9a72cd4cd3ce0bea2622a Mon Sep 17 00:00:00 2001 From: John Spray Date: Thu, 7 Aug 2014 15:52:58 +0100 Subject: [PATCH] osdc/Journaler: use finisher for public callbacks This is needed because of occasional lock cycles with external callers doing e.g. write_head. We do get some weird-looking multiply-nested C_OnFinisher(C_OnFinisher(...)) from this approach, where one finisher exists to protect journaler from lock cycles wrt objecter, and the other exists to protect the MDS from lock cycles wrt journaler. Signed-off-by: John Spray --- src/osdc/Journaler.cc | 102 ++++++++++++++++++++++++++++-------------- src/osdc/Journaler.h | 45 +++++++------------ 2 files changed, 85 insertions(+), 62 deletions(-) diff --git a/src/osdc/Journaler.cc b/src/osdc/Journaler.cc index 521c603505d1c..0585e73bb2467 100644 --- a/src/osdc/Journaler.cc +++ b/src/osdc/Journaler.cc @@ -128,10 +128,10 @@ public: class Journaler::C_ReProbe : public Context { Journaler *ls; - Context *onfinish; + C_OnFinisher *onfinish; public: uint64_t end; - C_ReProbe(Journaler *l, Context *onfinish_) : + C_ReProbe(Journaler *l, C_OnFinisher *onfinish_) : ls(l), onfinish(onfinish_), end(0) {} void finish(int r) { ls->_finish_reprobe(r, end, onfinish); @@ -157,22 +157,23 @@ void Journaler::recover(Context *onread) ldout(cct, 1) << "read_head" << dendl; state = STATE_READHEAD; C_ReadHead *fin = new C_ReadHead(this); - read_head(fin, &fin->bl); + _read_head(fin, &fin->bl); } -void Journaler::read_head(Context *on_finish, bufferlist *bl) +void Journaler::_read_head(Context *on_finish, bufferlist *bl) { + assert(lock.is_locked_by_me()); assert(state == STATE_READHEAD || state == STATE_REREADHEAD); object_t oid = file_object_t(ino, 0); object_locator_t oloc(pg_pool); - objecter->read_full(oid, oloc, CEPH_NOSNAP, bl, 0, new C_OnFinisher(on_finish, finisher)); + objecter->read_full(oid, oloc, CEPH_NOSNAP, bl, 0, wrap_finisher(on_finish)); } void Journaler::reread_head(Context *onfinish) { Mutex::Locker l(lock); - _reread_head(onfinish); + _reread_head(wrap_finisher(onfinish)); } /** @@ -190,7 +191,7 @@ void Journaler::_reread_head(Context *onfinish) state = STATE_REREADHEAD; C_RereadHead *fin = new C_RereadHead(this, onfinish); - read_head(fin, &fin->bl); + _read_head(fin, &fin->bl); } void Journaler::_finish_reread_head(int r, bufferlist& bl, Context *finish) @@ -280,10 +281,10 @@ void Journaler::_probe(Context *finish, uint64_t *end) assert(state == STATE_PROBING || state == STATE_REPROBING); // probe the log filer.probe(ino, &layout, CEPH_NOSNAP, - write_pos, end, 0, true, 0, new C_OnFinisher(finish, finisher)); + write_pos, end, 0, true, 0, wrap_finisher(finish)); } -void Journaler::_reprobe(Context *finish) +void Journaler::_reprobe(C_OnFinisher *finish) { ldout(cct, 10) << "reprobe" << dendl; assert(state == STATE_ACTIVE); @@ -294,7 +295,7 @@ void Journaler::_reprobe(Context *finish) } -void Journaler::_finish_reprobe(int r, uint64_t new_end, Context *onfinish) +void Journaler::_finish_reprobe(int r, uint64_t new_end, C_OnFinisher *onfinish) { Mutex::Locker l(lock); @@ -342,9 +343,9 @@ out: class Journaler::C_RereadHeadProbe : public Context { Journaler *ls; - Context *final_finish; + C_OnFinisher *final_finish; public: - C_RereadHeadProbe(Journaler *l, Context *finish) : + C_RereadHeadProbe(Journaler *l, C_OnFinisher *finish) : ls(l), final_finish(finish) {} void finish(int r) { ls->_finish_reread_head_and_probe(r, final_finish); @@ -356,10 +357,10 @@ void Journaler::reread_head_and_probe(Context *onfinish) Mutex::Locker l(lock); assert(state == STATE_ACTIVE); - _reread_head(new C_RereadHeadProbe(this, onfinish)); + _reread_head(new C_RereadHeadProbe(this, wrap_finisher(onfinish))); } -void Journaler::_finish_reread_head_and_probe(int r, Context *onfinish) +void Journaler::_finish_reread_head_and_probe(int r, C_OnFinisher *onfinish) { // Expect to be called back from finish_reread_head, which already takes lock assert(lock.is_locked_by_me()); @@ -375,8 +376,8 @@ class Journaler::C_WriteHead : public Context { public: Journaler *ls; Header h; - Context *oncommit; - C_WriteHead(Journaler *l, Header& h_, Context *c) : ls(l), h(h_), oncommit(c) {} + C_OnFinisher *oncommit; + C_WriteHead(Journaler *l, Header& h_, C_OnFinisher *c) : ls(l), h(h_), oncommit(c) {} void finish(int r) { ls->_finish_write_head(r, h, oncommit); } @@ -414,10 +415,10 @@ void Journaler::_write_head(Context *oncommit) object_locator_t oloc(pg_pool); objecter->write_full(oid, oloc, snapc, bl, ceph_clock_now(cct), 0, NULL, - new C_OnFinisher(new C_WriteHead(this, last_written, oncommit), finisher)); + wrap_finisher(new C_WriteHead(this, last_written, wrap_finisher(oncommit)))); } -void Journaler::_finish_write_head(int r, Header &wrote, Context *oncommit) +void Journaler::_finish_write_head(int r, Header &wrote, C_OnFinisher *oncommit) { Mutex::Locker l(lock); @@ -606,7 +607,7 @@ void Journaler::_do_flush(unsigned amount) filer.write(ino, &layout, snapc, flush_pos, len, write_bl, ceph_clock_now(cct), 0, - NULL, new C_OnFinisher(onsafe, finisher)); + NULL, wrap_finisher(onsafe)); flush_pos += len; assert(write_buf.length() == write_pos - flush_pos); @@ -641,17 +642,17 @@ void Journaler::_wait_for_flush(Context *onsafe) // queue waiter if (onsafe) { - waitfor_safe[write_pos].push_back(new C_OnFinisher(onsafe, finisher)); + waitfor_safe[write_pos].push_back(wrap_finisher(onsafe)); } } void Journaler::flush(Context *onsafe) { Mutex::Locker l(lock); - _flush(onsafe); + _flush(wrap_finisher(onsafe)); } -void Journaler::_flush(Context *onsafe) +void Journaler::_flush(C_OnFinisher *onsafe) { assert(!readonly); @@ -728,7 +729,7 @@ void Journaler::_issue_prezero() ldout(cct, 10) << "_issue_prezero zeroing " << prezeroing_pos << "~" << len << " (partial period)" << dendl; } SnapContext snapc; - Context *c = new C_OnFinisher(new C_Journaler_Prezero(this, prezeroing_pos, len), finisher); + Context *c = wrap_finisher(new C_Journaler_Prezero(this, prezeroing_pos, len)); filer.zero(ino, &layout, snapc, prezeroing_pos, len, ceph_clock_now(cct), 0, NULL, c); prezeroing_pos += len; } @@ -807,7 +808,7 @@ void Journaler::_finish_read(int r, uint64_t offset, bufferlist& bl) ldout(cct, 0) << "_finish_read got error " << r << dendl; error = r; if (on_readable) { - Context *f = on_readable; + C_OnFinisher *f = on_readable; on_readable = 0; f->complete(r); } @@ -854,7 +855,7 @@ void Journaler::_assimilate_prefetch() // readable! ldout(cct, 10) << "_finish_read now readable (or at journal end)" << dendl; if (on_readable) { - Context *f = on_readable; + C_OnFinisher *f = on_readable; on_readable = 0; f->complete(0); } @@ -903,7 +904,7 @@ void Journaler::_issue_read(uint64_t len) if (l > len) l = len; C_Read *c = new C_Read(this, requested_pos); - filer.read(ino, &layout, CEPH_NOSNAP, requested_pos, l, &c->bl, 0, new C_OnFinisher(c, finisher)); + filer.read(ino, &layout, CEPH_NOSNAP, requested_pos, l, &c->bl, 0, wrap_finisher(c)); requested_pos += l; len -= l; } @@ -999,9 +1000,9 @@ bool Journaler::is_readable() class Journaler::C_EraseFinish : public Context { Journaler *journaler; - Context *completion; + C_OnFinisher *completion; public: - C_EraseFinish(Journaler *j, Context *c) : journaler(j), completion(c) {} + C_EraseFinish(Journaler *j, C_OnFinisher *c) : journaler(j), completion(c) {} void finish(int r) { journaler->_finish_erase(r, completion); } @@ -1019,21 +1020,21 @@ void Journaler::erase(Context *completion) uint64_t first = trimmed_pos / get_layout_period(); uint64_t num = (write_pos - trimmed_pos) / get_layout_period() + 2; filer.purge_range(ino, &layout, SnapContext(), first, num, ceph_clock_now(cct), 0, - new C_OnFinisher(new C_EraseFinish(this, completion), finisher)); + wrap_finisher(new C_EraseFinish(this, wrap_finisher(completion)))); // We will not start the operation to delete the header until _finish_erase has // seen the data deletion succeed: otherwise if there was an error deleting data // we might prematurely delete the header thereby lose our reference to the data. } -void Journaler::_finish_erase(int data_result, Context *completion) +void Journaler::_finish_erase(int data_result, C_OnFinisher *completion) { Mutex::Locker l(lock); if (data_result == 0) { // Async delete the journal header filer.purge_range(ino, &layout, SnapContext(), 0, 1, ceph_clock_now(cct), 0, - new C_OnFinisher(completion, finisher)); + wrap_finisher(completion)); } else { lderr(cct) << "Failed to delete journal " << ino << " data: " << cpp_strerror(data_result) << dendl; completion->complete(data_result); @@ -1072,7 +1073,7 @@ void Journaler::wait_for_readable(Context *onreadable) ldout(cct, 10) << "wait_for_readable at " << read_pos << " onreadable " << onreadable << dendl; assert(!_is_readable()); assert(on_readable == 0); - on_readable = onreadable; + on_readable = wrap_finisher(onreadable); } @@ -1132,7 +1133,7 @@ void Journaler::_trim() uint64_t num = (trim_to - trimming_pos) / period; SnapContext snapc; filer.purge_range(ino, &layout, snapc, first, num, ceph_clock_now(cct), 0, - new C_OnFinisher(new C_Trim(this, trim_to), finisher)); + wrap_finisher(new C_Trim(this, trim_to))); trimming_pos = trim_to; } @@ -1295,3 +1296,38 @@ size_t JournalStream::write(bufferlist &entry, bufferlist *to, uint64_t const &s } } +/** + * set write error callback + * + * Set a callback/context to trigger if we get a write error from + * the objecter. This may be from an explicit request (e.g., flush) + * or something async the journaler did on its own (e.g., journal + * header update). + * + * It is only used once; if the caller continues to use the + * Journaler and wants to hear about errors, it needs to reset the + * error_handler. + * + * @param c callback/context to trigger on error + */ +void Journaler::set_write_error_handler(Context *c) { + Mutex::Locker l(lock); + assert(!on_write_error); + on_write_error = wrap_finisher(c); +} + + +/** + * Wrap a context in a C_OnFinisher, if it is non-NULL + * + * Utility function to avoid lots of error-prone and verbose + * NULL checking on contexts passed in. + */ +C_OnFinisher *Journaler::wrap_finisher(Context *c) +{ + if (c != NULL) { + return new C_OnFinisher(c, finisher); + } else { + return NULL; + } +} diff --git a/src/osdc/Journaler.h b/src/osdc/Journaler.h index 365cee5b2b0c9..375c5f9ffb9fe 100644 --- a/src/osdc/Journaler.h +++ b/src/osdc/Journaler.h @@ -60,6 +60,7 @@ class CephContext; class Context; class PerfCounters; class Finisher; +class C_OnFinisher; typedef __u8 stream_format_t; @@ -259,21 +260,21 @@ private: // header utime_t last_wrote_head; - void _finish_write_head(int r, Header &wrote, Context *oncommit); + void _finish_write_head(int r, Header &wrote, C_OnFinisher *oncommit); class C_WriteHead; friend class C_WriteHead; void _reread_head(Context *onfinish); void _set_layout(ceph_file_layout const *l); list waitfor_recover; - void read_head(Context *on_finish, bufferlist *bl); + void _read_head(Context *on_finish, bufferlist *bl); void _finish_read_head(int r, bufferlist& bl); void _finish_reread_head(int r, bufferlist& bl, Context *finish); void _probe(Context *finish, uint64_t *end); void _finish_probe_end(int r, uint64_t end); - void _reprobe(Context *onfinish); - void _finish_reprobe(int r, uint64_t end, Context *onfinish); - void _finish_reread_head_and_probe(int r, Context *onfinish); + void _reprobe(C_OnFinisher *onfinish); + void _finish_reprobe(int r, uint64_t end, C_OnFinisher *onfinish); + void _finish_reread_head_and_probe(int r, C_OnFinisher *onfinish); class C_ReadHead; friend class C_ReadHead; class C_ProbeEnd; @@ -296,9 +297,11 @@ private: bool waiting_for_zero; interval_set pending_zero; // non-contig bits we've zeroed std::set pending_safe; + // XXX this would be C_OnFinisher reather than Context if it weren't for use of C_RetryRead here + // FIXME but oh dear, these are called back inside lock and RetryRead takes the lock!!! std::map > waitfor_safe; // when safe through given offset - void _flush(Context *onsafe); + void _flush(C_OnFinisher *onsafe); void _do_flush(unsigned amount=0); void _finish_flush(int r, uint64_t start, utime_t stamp); class C_Flush; @@ -316,9 +319,8 @@ private: uint64_t temp_fetch_len; // for wait_for_readable() - Context *on_readable; - - Context *on_write_error; + C_OnFinisher *on_readable; + C_OnFinisher *on_write_error; void _finish_read(int r, uint64_t offset, bufferlist &bl); // read completion callback void _finish_retry_read(int r); @@ -362,10 +364,12 @@ private: bool _is_readable(); - void _finish_erase(int data_result, Context *completion); + void _finish_erase(int data_result, C_OnFinisher *completion); class C_EraseFinish; friend class C_EraseFinish; + C_OnFinisher *wrap_finisher(Context *c); + public: Journaler(inodeno_t ino_, int64_t pool, const char *mag, Objecter *obj, PerfCounters *l, int lkey, SafeTimer *tim, Finisher *f=NULL) : last_committed(mag), @@ -460,25 +464,8 @@ public: assert(!readonly); _issue_prezero(); } - /** - * set write error callback - * - * Set a callback/context to trigger if we get a write error from - * the objecter. This may be from an explicit request (e.g., flush) - * or something async the journaler did on its own (e.g., journal - * header update). - * - * It is only used once; if the caller continues to use the - * Journaler and wants to hear about errors, it needs to reset the - * error_handler. - * - * @param c callback/context to trigger on error - */ - void set_write_error_handler(Context *c) { - Mutex::Locker l(lock); - assert(!on_write_error); - on_write_error = c; - } + + void set_write_error_handler(Context *c); // Synchronous getters // =================== -- 2.39.5