]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
osdc/Journaler: use finisher for public callbacks
authorJohn Spray <john.spray@redhat.com>
Thu, 7 Aug 2014 14:52:58 +0000 (15:52 +0100)
committerJohn Spray <john.spray@redhat.com>
Mon, 25 Aug 2014 00:34:18 +0000 (01:34 +0100)
This is needed because of occasional lock cycles with
external callers doing e.g. write_head.

We do get some weird-looking multiply-nested
C_OnFinisher(C_OnFinisher(...)) from this approach,
where one finisher exists to protect journaler from
lock cycles wrt objecter, and the other exists
to protect the MDS from lock cycles wrt journaler.

Signed-off-by: John Spray <john.spray@redhat.com>
src/osdc/Journaler.cc
src/osdc/Journaler.h

index 521c603505d1c6ea0f8c9474fe23a22df9580509..0585e73bb246735affdbcf594e3c9a447796247c 100644 (file)
@@ -128,10 +128,10 @@ public:
 
 class Journaler::C_ReProbe : public Context {
   Journaler *ls;
-  Context *onfinish;
+  C_OnFinisher *onfinish;
 public:
   uint64_t end;
-  C_ReProbe(Journaler *l, Context *onfinish_) :
+  C_ReProbe(Journaler *l, C_OnFinisher *onfinish_) :
     ls(l), onfinish(onfinish_), end(0) {}
   void finish(int r) {
     ls->_finish_reprobe(r, end, onfinish);
@@ -157,22 +157,23 @@ void Journaler::recover(Context *onread)
   ldout(cct, 1) << "read_head" << dendl;
   state = STATE_READHEAD;
   C_ReadHead *fin = new C_ReadHead(this);
-  read_head(fin, &fin->bl);
+  _read_head(fin, &fin->bl);
 }
 
-void Journaler::read_head(Context *on_finish, bufferlist *bl)
+void Journaler::_read_head(Context *on_finish, bufferlist *bl)
 {
+  assert(lock.is_locked_by_me());
   assert(state == STATE_READHEAD || state == STATE_REREADHEAD);
 
   object_t oid = file_object_t(ino, 0);
   object_locator_t oloc(pg_pool);
-  objecter->read_full(oid, oloc, CEPH_NOSNAP, bl, 0, new C_OnFinisher(on_finish, finisher));
+  objecter->read_full(oid, oloc, CEPH_NOSNAP, bl, 0, wrap_finisher(on_finish));
 }
 
 void Journaler::reread_head(Context *onfinish)
 {
   Mutex::Locker l(lock);
-  _reread_head(onfinish);
+  _reread_head(wrap_finisher(onfinish));
 }
 
 /**
@@ -190,7 +191,7 @@ void Journaler::_reread_head(Context *onfinish)
 
   state = STATE_REREADHEAD;
   C_RereadHead *fin = new C_RereadHead(this, onfinish);
-  read_head(fin, &fin->bl);
+  _read_head(fin, &fin->bl);
 }
 
 void Journaler::_finish_reread_head(int r, bufferlist& bl, Context *finish)
@@ -280,10 +281,10 @@ void Journaler::_probe(Context *finish, uint64_t *end)
   assert(state == STATE_PROBING || state == STATE_REPROBING);
   // probe the log
   filer.probe(ino, &layout, CEPH_NOSNAP,
-             write_pos, end, 0, true, 0, new C_OnFinisher(finish, finisher));
+             write_pos, end, 0, true, 0, wrap_finisher(finish));
 }
 
-void Journaler::_reprobe(Context *finish)
+void Journaler::_reprobe(C_OnFinisher *finish)
 {
   ldout(cct, 10) << "reprobe" << dendl;
   assert(state == STATE_ACTIVE);
@@ -294,7 +295,7 @@ void Journaler::_reprobe(Context *finish)
 }
 
 
-void Journaler::_finish_reprobe(int r, uint64_t new_end, Context *onfinish)
+void Journaler::_finish_reprobe(int r, uint64_t new_end, C_OnFinisher *onfinish)
 {
   Mutex::Locker l(lock);
 
@@ -342,9 +343,9 @@ out:
 class Journaler::C_RereadHeadProbe : public Context
 {
   Journaler *ls;
-  Context *final_finish;
+  C_OnFinisher *final_finish;
 public:
-  C_RereadHeadProbe(Journaler *l, Context *finish) :
+  C_RereadHeadProbe(Journaler *l, C_OnFinisher *finish) :
     ls(l), final_finish(finish) {}
   void finish(int r) {
     ls->_finish_reread_head_and_probe(r, final_finish);
@@ -356,10 +357,10 @@ void Journaler::reread_head_and_probe(Context *onfinish)
   Mutex::Locker l(lock);
 
   assert(state == STATE_ACTIVE);
-  _reread_head(new C_RereadHeadProbe(this, onfinish));
+  _reread_head(new C_RereadHeadProbe(this, wrap_finisher(onfinish)));
 }
 
-void Journaler::_finish_reread_head_and_probe(int r, Context *onfinish)
+void Journaler::_finish_reread_head_and_probe(int r, C_OnFinisher *onfinish)
 {
   // Expect to be called back from finish_reread_head, which already takes lock
   assert(lock.is_locked_by_me());
@@ -375,8 +376,8 @@ class Journaler::C_WriteHead : public Context {
 public:
   Journaler *ls;
   Header h;
-  Context *oncommit;
-  C_WriteHead(Journaler *l, Header& h_, Context *c) : ls(l), h(h_), oncommit(c) {}
+  C_OnFinisher *oncommit;
+  C_WriteHead(Journaler *l, Header& h_, C_OnFinisher *c) : ls(l), h(h_), oncommit(c) {}
   void finish(int r) {
     ls->_finish_write_head(r, h, oncommit);
   }
@@ -414,10 +415,10 @@ void Journaler::_write_head(Context *oncommit)
   object_locator_t oloc(pg_pool);
   objecter->write_full(oid, oloc, snapc, bl, ceph_clock_now(cct), 0, 
                       NULL, 
-                      new C_OnFinisher(new C_WriteHead(this, last_written, oncommit), finisher));
+                      wrap_finisher(new C_WriteHead(this, last_written, wrap_finisher(oncommit))));
 }
 
-void Journaler::_finish_write_head(int r, Header &wrote, Context *oncommit)
+void Journaler::_finish_write_head(int r, Header &wrote, C_OnFinisher *oncommit)
 {
   Mutex::Locker l(lock);
 
@@ -606,7 +607,7 @@ void Journaler::_do_flush(unsigned amount)
   filer.write(ino, &layout, snapc,
              flush_pos, len, write_bl, ceph_clock_now(cct),
              0,
-             NULL, new C_OnFinisher(onsafe, finisher));
+             NULL, wrap_finisher(onsafe));
 
   flush_pos += len;
   assert(write_buf.length() == write_pos - flush_pos);
@@ -641,17 +642,17 @@ void Journaler::_wait_for_flush(Context *onsafe)
 
   // queue waiter
   if (onsafe) {
-    waitfor_safe[write_pos].push_back(new C_OnFinisher(onsafe, finisher));
+    waitfor_safe[write_pos].push_back(wrap_finisher(onsafe));
   }
 }  
 
 void Journaler::flush(Context *onsafe)
 {
   Mutex::Locker l(lock);
-  _flush(onsafe);
+  _flush(wrap_finisher(onsafe));
 }
 
-void Journaler::_flush(Context *onsafe)
+void Journaler::_flush(C_OnFinisher *onsafe)
 {
   assert(!readonly);
 
@@ -728,7 +729,7 @@ void Journaler::_issue_prezero()
       ldout(cct, 10) << "_issue_prezero zeroing " << prezeroing_pos << "~" << len << " (partial period)" << dendl;
     }
     SnapContext snapc;
-    Context *c = new C_OnFinisher(new C_Journaler_Prezero(this, prezeroing_pos, len), finisher);
+    Context *c = wrap_finisher(new C_Journaler_Prezero(this, prezeroing_pos, len));
     filer.zero(ino, &layout, snapc, prezeroing_pos, len, ceph_clock_now(cct), 0, NULL, c);
     prezeroing_pos += len;
   }
@@ -807,7 +808,7 @@ void Journaler::_finish_read(int r, uint64_t offset, bufferlist& bl)
     ldout(cct, 0) << "_finish_read got error " << r << dendl;
     error = r;
     if (on_readable) {
-      Context *f = on_readable;
+      C_OnFinisher *f = on_readable;
       on_readable = 0;
       f->complete(r);
     }
@@ -854,7 +855,7 @@ void Journaler::_assimilate_prefetch()
     // readable!
     ldout(cct, 10) << "_finish_read now readable (or at journal end)" << dendl;
     if (on_readable) {
-      Context *f = on_readable;
+      C_OnFinisher *f = on_readable;
       on_readable = 0;
       f->complete(0);
     }
@@ -903,7 +904,7 @@ void Journaler::_issue_read(uint64_t len)
     if (l > len)
       l = len;
     C_Read *c = new C_Read(this, requested_pos);
-    filer.read(ino, &layout, CEPH_NOSNAP, requested_pos, l, &c->bl, 0, new C_OnFinisher(c, finisher));
+    filer.read(ino, &layout, CEPH_NOSNAP, requested_pos, l, &c->bl, 0, wrap_finisher(c));
     requested_pos += l;
     len -= l;
   }
@@ -999,9 +1000,9 @@ bool Journaler::is_readable()
 
 class Journaler::C_EraseFinish : public Context {
   Journaler *journaler;
-  Context *completion;
+  C_OnFinisher *completion;
   public:
-  C_EraseFinish(Journaler *j, Context *c) : journaler(j), completion(c) {}
+  C_EraseFinish(Journaler *j, C_OnFinisher *c) : journaler(j), completion(c) {}
   void finish(int r) {
     journaler->_finish_erase(r, completion);
   }
@@ -1019,21 +1020,21 @@ void Journaler::erase(Context *completion)
   uint64_t first = trimmed_pos / get_layout_period();
   uint64_t num = (write_pos - trimmed_pos) / get_layout_period() + 2;
   filer.purge_range(ino, &layout, SnapContext(), first, num, ceph_clock_now(cct), 0,
-      new C_OnFinisher(new C_EraseFinish(this, completion), finisher));
+      wrap_finisher(new C_EraseFinish(this, wrap_finisher(completion))));
 
   // We will not start the operation to delete the header until _finish_erase has
   // seen the data deletion succeed: otherwise if there was an error deleting data
   // we might prematurely delete the header thereby lose our reference to the data.
 }
 
-void Journaler::_finish_erase(int data_result, Context *completion)
+void Journaler::_finish_erase(int data_result, C_OnFinisher *completion)
 {
   Mutex::Locker l(lock);
 
   if (data_result == 0) {
     // Async delete the journal header
     filer.purge_range(ino, &layout, SnapContext(), 0, 1, ceph_clock_now(cct), 0,
-        new C_OnFinisher(completion, finisher));
+        wrap_finisher(completion));
   } else {
     lderr(cct) << "Failed to delete journal " << ino << " data: " << cpp_strerror(data_result) << dendl;
     completion->complete(data_result);
@@ -1072,7 +1073,7 @@ void Journaler::wait_for_readable(Context *onreadable)
   ldout(cct, 10) << "wait_for_readable at " << read_pos << " onreadable " << onreadable << dendl;
   assert(!_is_readable());
   assert(on_readable == 0);
-  on_readable = onreadable;
+  on_readable = wrap_finisher(onreadable);
 }
 
 
@@ -1132,7 +1133,7 @@ void Journaler::_trim()
   uint64_t num = (trim_to - trimming_pos) / period;
   SnapContext snapc;
   filer.purge_range(ino, &layout, snapc, first, num, ceph_clock_now(cct), 0, 
-                   new C_OnFinisher(new C_Trim(this, trim_to), finisher));
+                   wrap_finisher(new C_Trim(this, trim_to)));
   trimming_pos = trim_to;  
 }
 
@@ -1295,3 +1296,38 @@ size_t JournalStream::write(bufferlist &entry, bufferlist *to, uint64_t const &s
   }
 }
 
+/**
+ * set write error callback
+ *
+ * Set a callback/context to trigger if we get a write error from
+ * the objecter.  This may be from an explicit request (e.g., flush)
+ * or something async the journaler did on its own (e.g., journal
+ * header update).
+ *
+ * It is only used once; if the caller continues to use the
+ * Journaler and wants to hear about errors, it needs to reset the
+ * error_handler.
+ *
+ * @param c callback/context to trigger on error
+ */
+void Journaler::set_write_error_handler(Context *c) {
+  Mutex::Locker l(lock);
+  assert(!on_write_error);
+  on_write_error = wrap_finisher(c);
+}
+
+
+/**
+ * Wrap a context in a C_OnFinisher, if it is non-NULL
+ *
+ * Utility function to avoid lots of error-prone and verbose
+ * NULL checking on contexts passed in.
+ */
+C_OnFinisher *Journaler::wrap_finisher(Context *c)
+{
+  if (c != NULL) {
+    return new C_OnFinisher(c, finisher);
+  } else {
+    return NULL;
+  }
+}
index 365cee5b2b0c94897d6b70270e0bf77c0082b7b8..375c5f9ffb9fe2ba23fa6b242f324a700edb5bbb 100644 (file)
@@ -60,6 +60,7 @@ class CephContext;
 class Context;
 class PerfCounters;
 class Finisher;
+class C_OnFinisher;
 
 typedef __u8 stream_format_t;
 
@@ -259,21 +260,21 @@ private:
 
   // header
   utime_t last_wrote_head;
-  void _finish_write_head(int r, Header &wrote, Context *oncommit);
+  void _finish_write_head(int r, Header &wrote, C_OnFinisher *oncommit);
   class C_WriteHead;
   friend class C_WriteHead;
 
   void _reread_head(Context *onfinish);
   void _set_layout(ceph_file_layout const *l);
   list<Context*> waitfor_recover;
-  void read_head(Context *on_finish, bufferlist *bl);
+  void _read_head(Context *on_finish, bufferlist *bl);
   void _finish_read_head(int r, bufferlist& bl);
   void _finish_reread_head(int r, bufferlist& bl, Context *finish);
   void _probe(Context *finish, uint64_t *end);
   void _finish_probe_end(int r, uint64_t end);
-  void _reprobe(Context *onfinish);
-  void _finish_reprobe(int r, uint64_t end, Context *onfinish);
-  void _finish_reread_head_and_probe(int r, Context *onfinish);
+  void _reprobe(C_OnFinisher *onfinish);
+  void _finish_reprobe(int r, uint64_t end, C_OnFinisher *onfinish);
+  void _finish_reread_head_and_probe(int r, C_OnFinisher *onfinish);
   class C_ReadHead;
   friend class C_ReadHead;
   class C_ProbeEnd;
@@ -296,9 +297,11 @@ private:
   bool waiting_for_zero;
   interval_set<uint64_t> pending_zero;  // non-contig bits we've zeroed
   std::set<uint64_t> pending_safe;
+  // XXX this would be C_OnFinisher reather than Context if it weren't for use of C_RetryRead here
+  // FIXME but oh dear, these are called back inside lock and RetryRead takes the lock!!!
   std::map<uint64_t, std::list<Context*> > waitfor_safe; // when safe through given offset
 
-  void _flush(Context *onsafe);
+  void _flush(C_OnFinisher *onsafe);
   void _do_flush(unsigned amount=0);
   void _finish_flush(int r, uint64_t start, utime_t stamp);
   class C_Flush;
@@ -316,9 +319,8 @@ private:
   uint64_t temp_fetch_len;
 
   // for wait_for_readable()
-  Context    *on_readable;
-
-  Context    *on_write_error;
+  C_OnFinisher    *on_readable;
+  C_OnFinisher    *on_write_error;
 
   void _finish_read(int r, uint64_t offset, bufferlist &bl); // read completion callback
   void _finish_retry_read(int r);
@@ -362,10 +364,12 @@ private:
 
   bool _is_readable();
 
-  void _finish_erase(int data_result, Context *completion);
+  void _finish_erase(int data_result, C_OnFinisher *completion);
   class C_EraseFinish;
   friend class C_EraseFinish;
 
+  C_OnFinisher *wrap_finisher(Context *c);
+
 public:
   Journaler(inodeno_t ino_, int64_t pool, const char *mag, Objecter *obj, PerfCounters *l, int lkey, SafeTimer *tim, Finisher *f=NULL) : 
     last_committed(mag),
@@ -460,25 +464,8 @@ public:
     assert(!readonly);
     _issue_prezero();
   }
-  /**
-   * set write error callback
-   *
-   * Set a callback/context to trigger if we get a write error from
-   * the objecter.  This may be from an explicit request (e.g., flush)
-   * or something async the journaler did on its own (e.g., journal
-   * header update).
-   *
-   * It is only used once; if the caller continues to use the
-   * Journaler and wants to hear about errors, it needs to reset the
-   * error_handler.
-   *
-   * @param c callback/context to trigger on error
-   */
-  void set_write_error_handler(Context *c) {
-    Mutex::Locker l(lock);
-    assert(!on_write_error);
-    on_write_error = c;
-  }
+
+  void set_write_error_handler(Context *c);
 
   // Synchronous getters
   // ===================