]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
osdc: Use a finisher from Journaler
authorJohn Spray <john.spray@redhat.com>
Wed, 23 Jul 2014 16:32:57 +0000 (17:32 +0100)
committerJohn Spray <john.spray@redhat.com>
Mon, 25 Aug 2014 00:34:03 +0000 (01:34 +0100)
Completions from I/O operations (i.e. the objecter) hop
through the finisher twice, because of the three layers of
locking (MDS::mds_lock -> Journaler::lock -> Objecter osd session lock)

Because on the way "right" we take the locks in that order, to avoid
deadlock we can't take the locks in the opposite order on the way
"left", hence the finishers.

Signed-off-by: John Spray <john.spray@redhat.com>
src/osdc/Journaler.cc
src/osdc/Journaler.h

index b4b51b19a7fd38b21e63a8ed3f79f79de329fc92..4b5824f96b6c529e085b68fd70d48bab6d1d3026 100644 (file)
@@ -19,6 +19,7 @@
 #include "osdc/Journaler.h"
 #include "common/errno.h"
 #include "include/assert.h"
+#include "common/Finisher.h"
 
 #define dout_subsys ceph_subsys_journaler
 #undef dout_prefix
@@ -165,7 +166,7 @@ void Journaler::read_head(Context *on_finish, bufferlist *bl)
 
   object_t oid = file_object_t(ino, 0);
   object_locator_t oloc(pg_pool);
-  objecter->read_full(oid, oloc, CEPH_NOSNAP, bl, 0, on_finish);
+  objecter->read_full(oid, oloc, CEPH_NOSNAP, bl, 0, new C_OnFinisher(on_finish, finisher));
 }
 
 void Journaler::reread_head(Context *onfinish)
@@ -269,28 +270,27 @@ void Journaler::_finish_read_head(int r, bufferlist& bl)
   ldout(cct, 1) << "_finish_read_head " << h << ".  probing for end of log (from " << write_pos << ")..." << dendl;
   C_ProbeEnd *fin = new C_ProbeEnd(this);
   state = STATE_PROBING;
-  probe(fin, &fin->end);
+  _probe(fin, &fin->end);
 }
 
-void Journaler::probe(Context *finish, uint64_t *end)
+void Journaler::_probe(Context *finish, uint64_t *end)
 {
+  assert(lock.is_locked_by_me());
   ldout(cct, 1) << "probing for end of the log" << dendl;
   assert(state == STATE_PROBING || state == STATE_REPROBING);
   // probe the log
   filer.probe(ino, &layout, CEPH_NOSNAP,
-             write_pos, end, 0, true, 0, finish);
+             write_pos, end, 0, true, 0, new C_OnFinisher(finish, finisher));
 }
 
-void Journaler::reprobe(Context *finish)
+void Journaler::_reprobe(Context *finish)
 {
-  Mutex::Locker l(lock);
-
   ldout(cct, 10) << "reprobe" << dendl;
   assert(state == STATE_ACTIVE);
 
   state = STATE_REPROBING;
   C_ReProbe *fin = new C_ReProbe(this, finish);
-  probe(fin, &fin->end);
+  _probe(fin, &fin->end);
 }
 
 
@@ -365,7 +365,7 @@ void Journaler::_finish_reread_head_and_probe(int r, Context *onfinish)
   assert(lock.is_locked_by_me());
 
   assert(!r); //if we get an error, we're boned
-  reprobe(onfinish);
+  _reprobe(onfinish);
 }
 
 
@@ -414,7 +414,7 @@ void Journaler::_write_head(Context *oncommit)
   object_locator_t oloc(pg_pool);
   objecter->write_full(oid, oloc, snapc, bl, ceph_clock_now(cct), 0, 
                       NULL, 
-                      new C_WriteHead(this, last_written, oncommit));
+                      new C_OnFinisher(new C_WriteHead(this, last_written, oncommit), finisher));
 }
 
 void Journaler::_finish_write_head(int r, Header &wrote, Context *oncommit)
@@ -606,7 +606,7 @@ void Journaler::_do_flush(unsigned amount)
   filer.write(ino, &layout, snapc,
              flush_pos, len, write_bl, ceph_clock_now(cct),
              0,
-             NULL, onsafe);
+             NULL, new C_OnFinisher(onsafe, finisher));
 
   flush_pos += len;
   assert(write_buf.length() == write_pos - flush_pos);
@@ -634,15 +634,15 @@ void Journaler::_wait_for_flush(Context *onsafe)
     ldout(cct, 10) << "flush nothing to flush, (prezeroing/prezero)/write/flush/safe pointers at " 
             << "(" << prezeroing_pos << "/" << prezero_pos << ")/" << write_pos << "/" << flush_pos << "/" << safe_pos << dendl;
     if (onsafe) {
-      onsafe->complete(0);
-      onsafe = 0;
+      finisher->queue(onsafe, 0);
     }
     return;
   }
 
   // queue waiter
-  if (onsafe) 
-    waitfor_safe[write_pos].push_back(onsafe);
+  if (onsafe) {
+    waitfor_safe[write_pos].push_back(new C_OnFinisher(onsafe, finisher));
+  }
 }  
 
 void Journaler::flush(Context *onsafe)
@@ -728,12 +728,15 @@ void Journaler::_issue_prezero()
       ldout(cct, 10) << "_issue_prezero zeroing " << prezeroing_pos << "~" << len << " (partial period)" << dendl;
     }
     SnapContext snapc;
-    Context *c = new C_Journaler_Prezero(this, prezeroing_pos, len);
+    Context *c = new C_OnFinisher(new C_Journaler_Prezero(this, prezeroing_pos, len), finisher);
     filer.zero(ino, &layout, snapc, prezeroing_pos, len, ceph_clock_now(cct), 0, NULL, c);
     prezeroing_pos += len;
   }
 }
 
+// Lock cycle because we get called out of objecter callback (holding
+// objecter read lock), but there are also cases where we take the journaler
+// lock before calling into objecter to do I/O.
 void Journaler::_finish_prezero(int r, uint64_t start, uint64_t len)
 {
   Mutex::Locker l(lock);
@@ -900,7 +903,7 @@ void Journaler::_issue_read(uint64_t len)
     if (l > len)
       l = len;
     C_Read *c = new C_Read(this, requested_pos);
-    filer.read(ino, &layout, CEPH_NOSNAP, requested_pos, l, &c->bl, 0, c);
+    filer.read(ino, &layout, CEPH_NOSNAP, requested_pos, l, &c->bl, 0, new C_OnFinisher(c, finisher));
     requested_pos += l;
     len -= l;
   }
@@ -1016,7 +1019,7 @@ void Journaler::erase(Context *completion)
   uint64_t first = trimmed_pos / get_layout_period();
   uint64_t num = (write_pos - trimmed_pos) / get_layout_period() + 2;
   filer.purge_range(ino, &layout, SnapContext(), first, num, ceph_clock_now(cct), 0,
-      new C_EraseFinish(this, completion));
+      new C_OnFinisher(new C_EraseFinish(this, completion), finisher));
 
   // We will not start the operation to delete the header until _finish_erase has
   // seen the data deletion succeed: otherwise if there was an error deleting data
@@ -1030,7 +1033,7 @@ void Journaler::_finish_erase(int data_result, Context *completion)
   if (data_result == 0) {
     // Async delete the journal header
     filer.purge_range(ino, &layout, SnapContext(), 0, 1, ceph_clock_now(cct), 0,
-        completion);
+        new C_OnFinisher(completion, finisher));
   } else {
     lderr(cct) << "Failed to delete journal " << ino << " data: " << cpp_strerror(data_result) << dendl;
     completion->complete(data_result);
@@ -1129,7 +1132,7 @@ void Journaler::_trim()
   uint64_t num = (trim_to - trimming_pos) / period;
   SnapContext snapc;
   filer.purge_range(ino, &layout, snapc, first, num, ceph_clock_now(cct), 0, 
-                   new C_Trim(this, trim_to));
+                   new C_OnFinisher(new C_Trim(this, trim_to), finisher));
   trimming_pos = trim_to;  
 }
 
index 76192353aea64cbdd7a19a8bbc1f369bfae5d8fe..3ee93343dd109a05f70dcb2a06875f026e665eb5 100644 (file)
@@ -59,6 +59,7 @@
 class CephContext;
 class Context;
 class PerfCounters;
+class Finisher;
 
 typedef __u8 stream_format_t;
 
@@ -194,12 +195,14 @@ public:
     return stream_format;
   }
 
+  Header last_committed;
+
 private:
   // me
   CephContext *cct;
   Mutex lock;
+  Finisher *finisher;
   Header last_written;
-  Header last_committed;
   inodeno_t ino;
   int64_t pg_pool;
   bool readonly;
@@ -266,8 +269,9 @@ private:
   void read_head(Context *on_finish, bufferlist *bl);
   void _finish_read_head(int r, bufferlist& bl);
   void _finish_reread_head(int r, bufferlist& bl, Context *finish);
-  void probe(Context *finish, uint64_t *end);
+  void _probe(Context *finish, uint64_t *end);
   void _finish_probe_end(int r, uint64_t end);
+  void _reprobe(Context *onfinish);
   void _finish_reprobe(int r, uint64_t end, Context *onfinish);
   void _finish_reread_head_and_probe(int r, Context *onfinish);
   class C_ReadHead;
@@ -364,9 +368,10 @@ private:
   friend class C_EraseFinish;
 
 public:
-  Journaler(inodeno_t ino_, int64_t pool, const char *mag, Objecter *obj, PerfCounters *l, int lkey, SafeTimer *tim) : 
-    cct(obj->cct), lock("Journaler"),
-    last_written(mag), last_committed(mag), 
+  Journaler(inodeno_t ino_, int64_t pool, const char *mag, Objecter *obj, PerfCounters *l, int lkey, SafeTimer *tim, Finisher *f=NULL) : 
+    last_committed(mag),
+    cct(obj->cct), lock("Journaler"), finisher(f),
+    last_written(mag),
     ino(ino_), pg_pool(pool), readonly(true),
     stream_format(-1), journal_stream(-1),
     magic(mag),
@@ -418,7 +423,6 @@ public:
   void create(ceph_file_layout *layout, stream_format_t const sf);
   void recover(Context *onfinish);
   void reread_head(Context *onfinish);
-  void reprobe(Context *onfinish);
   void reread_head_and_probe(Context *onfinish);
   void write_head(Context *onsave=0);
   void wait_for_flush(Context *onsafe = 0);
@@ -479,6 +483,7 @@ public:
 
   // Synchronous getters
   // ===================
+  // TODO: need some locks on reads for true safety
   uint64_t get_layout_period() const { return (uint64_t)layout.fl_stripe_count * (uint64_t)layout.fl_object_size; }
   ceph_file_layout& get_layout() { return layout; }
   bool is_active() { return state == STATE_ACTIVE; }