]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
objecter and journaler error paths for inc_lock
authorSage Weil <sage@newdream.net>
Mon, 24 Mar 2008 19:53:26 +0000 (12:53 -0700)
committerSage Weil <sage@newdream.net>
Mon, 24 Mar 2008 19:53:26 +0000 (12:53 -0700)
src/Makefile.am
src/TODO
src/include/ceph_fs.h
src/mds/MDLog.cc
src/mds/MDS.cc
src/mds/MDS.h
src/osdc/Filer.cc
src/osdc/Filer.h
src/osdc/Journaler.cc
src/osdc/Journaler.h
src/osdc/Objecter.cc

index 4b6e98b634cf45739787f983aaecb849c9169359..851bb8705395bff373b5f035ac066543f0ee018c 100644 (file)
@@ -102,6 +102,8 @@ libcephclient_so_a_SOURCES = \
 libcephclient_so_a_CXXFLAGS = ${AM_CXXFLAGS} -fPIC
 libcephclient_so_a_CFLAGS = ${AM_CFLAGS} -fPIC
 
+#BUILT_SOURCES += libcephclient_so.a
+
 #libcephclient.so: libcephclient_so.a libcrush_so.a
 #      ${CXX} -I. -fPIC -shared -Wl,-soname,$@.1 ${AM_CXXFLAGS} ${LIBS} $^ -o $@
 #BUILT_SOURCES += libcephclient.so
@@ -132,8 +134,9 @@ bin_PROGRAMS = \
 noinst_LIBRARIES = \
        libcommon.a libcrush.a \
        libmon.a libmds.a libosdc.a libosd.a libclient.a \
-       libos.a libebofs.a \
-       libcrush_so.a libcephclient_so.a
+       libos.a libebofs.a 
+
+noinst_LIBRARIES += libcrush_so.a libcephclient_so.a
 
 # extra bits
 EXTRA_DIST = start.sh stop.sh crushtool.pl
index 0aac7ea4c275d38a0b9297ded4918a2a415285ba..daea10fddb036d47fcf8d90f9d709fa2c53657b0 100644 (file)
--- a/src/TODO
+++ b/src/TODO
@@ -168,11 +168,10 @@ osd/rados
 - transaction prepare/commit
   - rollback
   - rollback logging (to fix slow prepare vs rollback race)
-- read+floor_lockout for clean STOGITH-like/fencing semantics after failover.
+
+- a more general fencing mechanism?  per-object granularity isn't usually a good match.
 
 - consider implications of nvram writeahead logs
-- clean shutdown?
-- pgmonitor should supplement failure detection
 
 - flag missing log entries on crash recovery  --> WRNOOP? or WRLOST?
 
index 2a5bace55935a17394d71b67a77a70f5672702eb..c2b68cfc9828989a450c608e71d53a7546a9644f 100644 (file)
@@ -510,7 +510,7 @@ enum {
        CEPH_OSD_OP_ACK = 1,          /* want (or is) "ack" ack */
        CEPH_OSD_OP_SAFE = 2,         /* want (or is) "safe" ack */
        CEPH_OSD_OP_RETRY = 4,        /* resend attempt */
-       CEPH_OSD_OP_INC_LOCK = 8,     /* acquire/require incarnation lock */
+       CEPH_OSD_OP_INCLOCK_FAIL = 8, /* fail on inclock collision */
        CEPH_OSD_OP_BALANCE_READS = 16
 };
 
index e950214ce526f883702ce9f9778fff55480aaf60..3c080c871d0fffe911a77d6c96a153e86a9226f1 100644 (file)
@@ -433,14 +433,21 @@ void MDLog::_replay_thread()
   dout(10) << "_replay_thread start" << dendl;
 
   // loop
+  int r = 0;
   off_t new_expire_pos = journaler->get_expire_pos();
   while (1) {
     // wait for read?
     while (!journaler->is_readable() &&
-          journaler->get_read_pos() < journaler->get_write_pos()) {
+          journaler->get_read_pos() < journaler->get_write_pos() &&
+          !journaler->get_error()) {
       journaler->wait_for_readable(new C_MDL_Replay(this));
       replay_cond.Wait(mds->mds_lock);
     }
+    if (journaler->get_error()) {
+      r = journaler->get_error();
+      dout(0) << "_replay journaler got error " << r << ", aborting" << dendl;
+      break;
+    }
     
     if (!journaler->is_readable() &&
        journaler->get_read_pos() == journaler->get_write_pos())
@@ -490,18 +497,20 @@ void MDLog::_replay_thread()
   }
 
   // done!
-  assert(journaler->get_read_pos() == journaler->get_write_pos());
-  dout(10) << "_replay - complete, " << num_events << " events, new read/expire pos is " << new_expire_pos << dendl;
-  
-  // move read pointer _back_ to first subtree map we saw, for eventual trimming
-  journaler->set_read_pos(new_expire_pos);
-  journaler->set_expire_pos(new_expire_pos);
-  logger->set("expos", new_expire_pos);
-  
+  if (r == 0) {
+    assert(journaler->get_read_pos() == journaler->get_write_pos());
+    dout(10) << "_replay - complete, " << num_events << " events, new read/expire pos is " << new_expire_pos << dendl;
+    
+    // move read pointer _back_ to first subtree map we saw, for eventual trimming
+    journaler->set_read_pos(new_expire_pos);
+    journaler->set_expire_pos(new_expire_pos);
+    logger->set("expos", new_expire_pos);
+  }
+
   // kick waiter(s)
   list<Context*> ls;
   ls.swap(waitfor_replay);
-  finish_contexts(ls,0);  
+  finish_contexts(ls, r);  
   
   dout(10) << "_replay_thread finish" << dendl;
   mds->mds_lock.Unlock();
index f6337d3a13ab1c31a4aec8f6c687378c4ba073fe..3757cc52803d3bff50198d13156b02c87e274c98 100644 (file)
@@ -778,11 +778,17 @@ class C_MDS_BootStart : public Context {
   int nextstep;
 public:
   C_MDS_BootStart(MDS *m, int n) : mds(m), nextstep(n) {}
-  void finish(int r) { mds->boot_start(nextstep); }
+  void finish(int r) { mds->boot_start(nextstep, r); }
 };
 
-void MDS::boot_start(int step)
+void MDS::boot_start(int step, int r)
 {
+  if (r < 0) {
+    dout(0) << "boot_start encountered an error, failing" << dendl;
+    suicide();
+    return;
+  }
+
   switch (step) {
   case 0:
     step = 1;  // fall-thru.
index f5971130df383d35f315581d36b280dbbe93ae0c..442bcacad6c994341145a2cd6811d10950403a88 100644 (file)
@@ -237,7 +237,7 @@ class MDS : public Dispatcher {
 
   void boot();
   void boot_create();             // i am new mds.
-  void boot_start(int step=0);    // starting|replay
+  void boot_start(int step=0, int r=0);    // starting|replay
 
   void replay_start();
   void creating_done();
index 000c6374a5b05e2d09d33cfda37a2c84ec80a928..a5ee5389a7518d345d2bd5f430948195d87d9b82 100644 (file)
@@ -50,11 +50,12 @@ public:
 int Filer::probe_fwd(inode_t& inode,
                     off_t start_from,
                     off_t *end,
+                    int flags,
                     Context *onfinish) 
 {
   dout(10) << "probe_fwd " << hex << inode.ino << dec << " starting from " << start_from << dendl;
 
-  Probe *probe = new Probe(inode, start_from, end, onfinish);
+  Probe *probe = new Probe(inode, start_from, end, flags, onfinish);
 
   // period (bytes before we jump unto a new set of object(s))
   off_t period = ceph_file_layout_period(inode.layout);
@@ -80,7 +81,7 @@ void Filer::_probe(Probe *probe)
        p++) {
     dout(10) << "_probe  probing " << p->oid << dendl;
     C_Probe *c = new C_Probe(this, probe, p->oid);
-    probe->ops[p->oid] = objecter->stat(p->oid, &c->size, p->layout, 0, c);
+    probe->ops[p->oid] = objecter->stat(p->oid, &c->size, p->layout, probe->flags, c);
   }
 }
 
index 86fb663fa2991a0610526e4d07c9251cbabec91c..6a094035b10568b792fe67d6a27bc764792227bb 100644 (file)
@@ -53,6 +53,7 @@ class Filer {
     inode_t inode;
     off_t from;
     off_t *end;
+    int flags;
     Context *onfinish;
     
     list<ObjectExtent> probing;
@@ -61,8 +62,8 @@ class Filer {
     map<object_t, off_t> known;
     map<object_t, tid_t> ops;
 
-    Probe(inode_t &i, off_t f, off_t *e, Context *c) : 
-      inode(i), from(f), end(e), onfinish(c), probing_len(0) {}
+    Probe(inode_t &i, off_t f, off_t *e, int fl, Context *c) : 
+      inode(i), from(f), end(e), flags(fl), onfinish(c), probing_len(0) {}
   };
   
   class C_Probe;
@@ -137,6 +138,7 @@ class Filer {
   int probe_fwd(inode_t& inode,
                off_t start_from,
                off_t *end,
+               int flags,
                Context *onfinish);
 
 
index db0267b9c8ee0609a27c0bf56a622fb080745e5b..78425679b1b4cf039571bb2e322ed714097fee6e 100644 (file)
@@ -81,7 +81,7 @@ void Journaler::recover(Context *onread)
   dout(1) << "read_head" << dendl;
   state = STATE_READHEAD;
   C_ReadHead *fin = new C_ReadHead(this);
-  filer.read(inode, 0, sizeof(Header), &fin->bl, 0, fin);
+  filer.read(inode, 0, sizeof(Header), &fin->bl, CEPH_OSD_OP_INCLOCK_FAIL, fin);
 }
 
 void Journaler::_finish_read_head(int r, bufferlist& bl)
@@ -112,7 +112,7 @@ void Journaler::_finish_read_head(int r, bufferlist& bl)
   // probe the log
   state = STATE_PROBING;
   C_ProbeEnd *fin = new C_ProbeEnd(this);
-  filer.probe_fwd(inode, h.write_pos, &fin->end, fin);
+  filer.probe_fwd(inode, h.write_pos, &fin->end, CEPH_OSD_OP_INCLOCK_FAIL, fin);
 }
 
 void Journaler::_finish_probe_end(int r, off_t end)
@@ -168,7 +168,7 @@ void Journaler::write_head(Context *oncommit)
 
   bufferlist bl;
   bl.append((char*)&last_written, sizeof(last_written));
-  filer.write(inode, 0, bl.length(), bl, 0
+  filer.write(inode, 0, bl.length(), bl, CEPH_OSD_OP_INCLOCK_FAIL
              NULL, 
              new C_WriteHead(this, last_written, oncommit));
 }
@@ -300,7 +300,7 @@ void Journaler::_do_flush()
   
   // submit write for anything pending
   // flush _start_ pos to _finish_flush
-  filer.write(inode, flush_pos, len, write_buf, 0,
+  filer.write(inode, flush_pos, len, write_buf, CEPH_OSD_OP_INCLOCK_FAIL,
              g_conf.journaler_safe ? 0:new C_Flush(this, flush_pos),  // on ACK
              g_conf.journaler_safe ?   new C_Flush(this, flush_pos):0); // on COMMIT
   pending_flush[flush_pos] = g_clock.now();
@@ -380,6 +380,17 @@ public:
 
 void Journaler::_finish_read(int r)
 {
+  if (r < 0) {
+    dout(0) << "_finish_read got error " << r << dendl;
+    error = r;
+    if (on_readable) {
+      Context *f = on_readable;
+      on_readable = 0;
+      f->finish(0);
+      delete f;
+    }
+    return;
+  }
   assert(r>=0);
 
   dout(10) << "_finish_read got " << received_pos << "~" << reading_buf.length() << dendl;
@@ -459,7 +470,7 @@ void Journaler::_issue_read(off_t len)
           << ", read pointers " << read_pos << "/" << received_pos << "/" << (requested_pos+len)
           << dendl;
   
-  filer.read(inode, requested_pos, len, &reading_buf, 0,
+  filer.read(inode, requested_pos, len, &reading_buf, CEPH_OSD_OP_INCLOCK_FAIL,
             new C_Read(this));
   requested_pos += len;
 }
@@ -637,8 +648,8 @@ void Journaler::trim()
           << trimmed_pos << "/" << trimming_pos << "/" << expire_pos
           << dendl;
   
-  filer.remove(inode, trimming_pos, trim_to-trimming_pos, 
-              0, NULL, new C_Trim(this, trim_to));
+  filer.remove(inode, trimming_pos, trim_to-trimming_pos, CEPH_OSD_OP_INCLOCK_FAIL, 
+              NULL, new C_Trim(this, trim_to));
   trimming_pos = trim_to;  
 }
 
index 7f7a5753ad708f5ecf592c9421e644ab29d499e6..1aae44b2528eb65dcc8f8ac7f2f5c28c4efc8d3e 100644 (file)
@@ -101,6 +101,7 @@ class Journaler {
   static const int STATE_ACTIVE = 2;
 
   int state;
+  int error;
 
   // header
   utime_t last_wrote_head;
@@ -173,7 +174,7 @@ public:
   Journaler(inode_t& inode_, Objecter *obj, Logger *l, Mutex *lk, off_t fl=0, off_t pff=0) : 
     inode(inode_), objecter(obj), filer(objecter), logger(l), 
     lock(lk), timer(*lk), delay_flush_event(0),
-    state(STATE_UNDEF),
+    state(STATE_UNDEF), error(0),
     write_pos(0), flush_pos(0), ack_pos(0),
     read_pos(0), requested_pos(0), received_pos(0),
     fetch_len(fl), prefetch_from(pff),
@@ -203,6 +204,7 @@ public:
   void write_head(Context *onsave=0);
 
   bool is_active() { return state == STATE_ACTIVE; }
+  int get_error() { return error; }
 
   off_t get_write_pos() const { return write_pos; }
   off_t get_write_ack_pos() const { return ack_pos; }
index c72377df3655f89a371bd413a9790dde4270c456..90be9b57a394b4b3bb2862245c5ac6075b8e27e6 100644 (file)
@@ -409,7 +409,8 @@ void Objecter::handle_osd_stat_reply(MOSDOpReply *m)
   if (pg.active_tids.empty()) close_pg( m->get_pg() );
   
   // success?
-  if (m->get_result() == -EINCLOCKED) {
+  if (m->get_result() == -EINCLOCKED &&
+      st->flags & CEPH_OSD_OP_INCLOCK_FAIL == 0) {
     dout(7) << " got -EINCLOCKED, resubmitting" << dendl;
     stat_submit(st);
     delete m;
@@ -421,13 +422,12 @@ void Objecter::handle_osd_stat_reply(MOSDOpReply *m)
     delete m;
     return;
   }
-  //assert(m->get_result() >= 0);
 
   // ok!
   if (m->get_result() < 0) {
-       *st->size = -1;
+    *st->size = -1;
   } else {
-       *st->size = m->get_length();
+    *st->size = m->get_length();
   }
 
   // finish, clean up
@@ -436,8 +436,8 @@ void Objecter::handle_osd_stat_reply(MOSDOpReply *m)
   // done
   delete st;
   if (onfinish) {
-       onfinish->finish(m->get_result());
-       delete onfinish;
+    onfinish->finish(m->get_result());
+    delete onfinish;
   }
 
   delete m;
@@ -548,6 +548,19 @@ void Objecter::handle_osd_read_reply(MOSDOpReply *m)
   // our op finished
   rd->ops.erase(tid);
 
+  // fail?
+  if (m->get_result() == -EINCLOCKED &&
+      rd->flags & CEPH_OSD_OP_INCLOCK_FAIL) {
+    dout(7) << " got -EINCLOCKED, failing" << dendl;
+    if (rd->onfinish) {
+      rd->onfinish->finish(-EINCLOCKED);
+      delete rd->onfinish;
+    }
+    delete rd;
+    delete m;
+    return;
+  }
+
   // success?
   if (m->get_result() == -EAGAIN ||
       m->get_result() == -EINCLOCKED) {
@@ -556,7 +569,6 @@ void Objecter::handle_osd_read_reply(MOSDOpReply *m)
     delete m;
     return;
   }
-  //assert(m->get_result() >= 0);
 
   // what buffer offset are we?
   dout(7) << " got frag from " << m->get_oid() << " "
@@ -862,6 +874,24 @@ void Objecter::handle_osd_modify_reply(MOSDOpReply *m)
     delete m;
     return;
   }
+  
+  int rc = 0;
+  if (m->get_result() == -EINCLOCKED && wr->flags & CEPH_OSD_OP_INCLOCK_FAIL) {
+    dout(7) << " got -EINCLOCKED, failing" << dendl;
+    rc = -EINCLOCKED;
+    if (wr->onack) {
+      onack = wr->onack;
+      wr->onack = 0;
+      num_unacked--;
+    }
+    if (wr->oncommit) {
+      oncommit = wr->oncommit;
+      wr->oncommit = 0;
+      num_uncommitted--;
+    }
+    goto done;
+  }
+
   if (m->get_result() == -EAGAIN ||
       m->get_result() == -EINCLOCKED) {
     dout(7) << " got -EAGAIN or -EINCLOCKED, resubmitting" << dendl;
@@ -919,6 +949,7 @@ void Objecter::handle_osd_modify_reply(MOSDOpReply *m)
   }
 
   // done?
+ done:
   if (wr->onack == 0 && wr->oncommit == 0) {
     // remove from tid/osd maps
     assert(pg.active_tids.count(tid));
@@ -935,11 +966,11 @@ void Objecter::handle_osd_modify_reply(MOSDOpReply *m)
 
   // do callbacks
   if (onack) {
-    onack->finish(0);
+    onack->finish(rc);
     delete onack;
   }
   if (oncommit) {
-    oncommit->finish(0);
+    oncommit->finish(rc);
     delete oncommit;
   }