From 18ac05dff865e40a70ac918de207d69060773279 Mon Sep 17 00:00:00 2001 From: Sage Weil Date: Mon, 24 Mar 2008 12:53:26 -0700 Subject: [PATCH] objecter and journaler error paths for inc_lock --- src/Makefile.am | 7 +++++-- src/TODO | 5 ++--- src/include/ceph_fs.h | 2 +- src/mds/MDLog.cc | 29 ++++++++++++++++--------- src/mds/MDS.cc | 10 +++++++-- src/mds/MDS.h | 2 +- src/osdc/Filer.cc | 5 +++-- src/osdc/Filer.h | 6 ++++-- src/osdc/Journaler.cc | 25 +++++++++++++++------- src/osdc/Journaler.h | 4 +++- src/osdc/Objecter.cc | 49 +++++++++++++++++++++++++++++++++++-------- 11 files changed, 104 insertions(+), 40 deletions(-) diff --git a/src/Makefile.am b/src/Makefile.am index 4b6e98b634cf4..851bb8705395b 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -102,6 +102,8 @@ libcephclient_so_a_SOURCES = \ libcephclient_so_a_CXXFLAGS = ${AM_CXXFLAGS} -fPIC libcephclient_so_a_CFLAGS = ${AM_CFLAGS} -fPIC +#BUILT_SOURCES += libcephclient_so.a + #libcephclient.so: libcephclient_so.a libcrush_so.a # ${CXX} -I. -fPIC -shared -Wl,-soname,$@.1 ${AM_CXXFLAGS} ${LIBS} $^ -o $@ #BUILT_SOURCES += libcephclient.so @@ -132,8 +134,9 @@ bin_PROGRAMS = \ noinst_LIBRARIES = \ libcommon.a libcrush.a \ libmon.a libmds.a libosdc.a libosd.a libclient.a \ - libos.a libebofs.a \ - libcrush_so.a libcephclient_so.a + libos.a libebofs.a + +noinst_LIBRARIES += libcrush_so.a libcephclient_so.a # extra bits EXTRA_DIST = start.sh stop.sh crushtool.pl diff --git a/src/TODO b/src/TODO index 0aac7ea4c275d..daea10fddb036 100644 --- a/src/TODO +++ b/src/TODO @@ -168,11 +168,10 @@ osd/rados - transaction prepare/commit - rollback - rollback logging (to fix slow prepare vs rollback race) -- read+floor_lockout for clean STOGITH-like/fencing semantics after failover. + +- a more general fencing mechanism? per-object granularity isn't usually a good match. - consider implications of nvram writeahead logs -- clean shutdown? -- pgmonitor should supplement failure detection - flag missing log entries on crash recovery --> WRNOOP? or WRLOST? diff --git a/src/include/ceph_fs.h b/src/include/ceph_fs.h index 2a5bace55935a..c2b68cfc98289 100644 --- a/src/include/ceph_fs.h +++ b/src/include/ceph_fs.h @@ -510,7 +510,7 @@ enum { CEPH_OSD_OP_ACK = 1, /* want (or is) "ack" ack */ CEPH_OSD_OP_SAFE = 2, /* want (or is) "safe" ack */ CEPH_OSD_OP_RETRY = 4, /* resend attempt */ - CEPH_OSD_OP_INC_LOCK = 8, /* acquire/require incarnation lock */ + CEPH_OSD_OP_INCLOCK_FAIL = 8, /* fail on inclock collision */ CEPH_OSD_OP_BALANCE_READS = 16 }; diff --git a/src/mds/MDLog.cc b/src/mds/MDLog.cc index e950214ce526f..3c080c871d0ff 100644 --- a/src/mds/MDLog.cc +++ b/src/mds/MDLog.cc @@ -433,14 +433,21 @@ void MDLog::_replay_thread() dout(10) << "_replay_thread start" << dendl; // loop + int r = 0; off_t new_expire_pos = journaler->get_expire_pos(); while (1) { // wait for read? while (!journaler->is_readable() && - journaler->get_read_pos() < journaler->get_write_pos()) { + journaler->get_read_pos() < journaler->get_write_pos() && + !journaler->get_error()) { journaler->wait_for_readable(new C_MDL_Replay(this)); replay_cond.Wait(mds->mds_lock); } + if (journaler->get_error()) { + r = journaler->get_error(); + dout(0) << "_replay journaler got error " << r << ", aborting" << dendl; + break; + } if (!journaler->is_readable() && journaler->get_read_pos() == journaler->get_write_pos()) @@ -490,18 +497,20 @@ void MDLog::_replay_thread() } // done! - assert(journaler->get_read_pos() == journaler->get_write_pos()); - dout(10) << "_replay - complete, " << num_events << " events, new read/expire pos is " << new_expire_pos << dendl; - - // move read pointer _back_ to first subtree map we saw, for eventual trimming - journaler->set_read_pos(new_expire_pos); - journaler->set_expire_pos(new_expire_pos); - logger->set("expos", new_expire_pos); - + if (r == 0) { + assert(journaler->get_read_pos() == journaler->get_write_pos()); + dout(10) << "_replay - complete, " << num_events << " events, new read/expire pos is " << new_expire_pos << dendl; + + // move read pointer _back_ to first subtree map we saw, for eventual trimming + journaler->set_read_pos(new_expire_pos); + journaler->set_expire_pos(new_expire_pos); + logger->set("expos", new_expire_pos); + } + // kick waiter(s) list ls; ls.swap(waitfor_replay); - finish_contexts(ls,0); + finish_contexts(ls, r); dout(10) << "_replay_thread finish" << dendl; mds->mds_lock.Unlock(); diff --git a/src/mds/MDS.cc b/src/mds/MDS.cc index f6337d3a13ab1..3757cc52803d3 100644 --- a/src/mds/MDS.cc +++ b/src/mds/MDS.cc @@ -778,11 +778,17 @@ class C_MDS_BootStart : public Context { int nextstep; public: C_MDS_BootStart(MDS *m, int n) : mds(m), nextstep(n) {} - void finish(int r) { mds->boot_start(nextstep); } + void finish(int r) { mds->boot_start(nextstep, r); } }; -void MDS::boot_start(int step) +void MDS::boot_start(int step, int r) { + if (r < 0) { + dout(0) << "boot_start encountered an error, failing" << dendl; + suicide(); + return; + } + switch (step) { case 0: step = 1; // fall-thru. diff --git a/src/mds/MDS.h b/src/mds/MDS.h index f5971130df383..442bcacad6c99 100644 --- a/src/mds/MDS.h +++ b/src/mds/MDS.h @@ -237,7 +237,7 @@ class MDS : public Dispatcher { void boot(); void boot_create(); // i am new mds. - void boot_start(int step=0); // starting|replay + void boot_start(int step=0, int r=0); // starting|replay void replay_start(); void creating_done(); diff --git a/src/osdc/Filer.cc b/src/osdc/Filer.cc index 000c6374a5b05..a5ee5389a7518 100644 --- a/src/osdc/Filer.cc +++ b/src/osdc/Filer.cc @@ -50,11 +50,12 @@ public: int Filer::probe_fwd(inode_t& inode, off_t start_from, off_t *end, + int flags, Context *onfinish) { dout(10) << "probe_fwd " << hex << inode.ino << dec << " starting from " << start_from << dendl; - Probe *probe = new Probe(inode, start_from, end, onfinish); + Probe *probe = new Probe(inode, start_from, end, flags, onfinish); // period (bytes before we jump unto a new set of object(s)) off_t period = ceph_file_layout_period(inode.layout); @@ -80,7 +81,7 @@ void Filer::_probe(Probe *probe) p++) { dout(10) << "_probe probing " << p->oid << dendl; C_Probe *c = new C_Probe(this, probe, p->oid); - probe->ops[p->oid] = objecter->stat(p->oid, &c->size, p->layout, 0, c); + probe->ops[p->oid] = objecter->stat(p->oid, &c->size, p->layout, probe->flags, c); } } diff --git a/src/osdc/Filer.h b/src/osdc/Filer.h index 86fb663fa2991..6a094035b1056 100644 --- a/src/osdc/Filer.h +++ b/src/osdc/Filer.h @@ -53,6 +53,7 @@ class Filer { inode_t inode; off_t from; off_t *end; + int flags; Context *onfinish; list probing; @@ -61,8 +62,8 @@ class Filer { map known; map ops; - Probe(inode_t &i, off_t f, off_t *e, Context *c) : - inode(i), from(f), end(e), onfinish(c), probing_len(0) {} + Probe(inode_t &i, off_t f, off_t *e, int fl, Context *c) : + inode(i), from(f), end(e), flags(fl), onfinish(c), probing_len(0) {} }; class C_Probe; @@ -137,6 +138,7 @@ class Filer { int probe_fwd(inode_t& inode, off_t start_from, off_t *end, + int flags, Context *onfinish); diff --git a/src/osdc/Journaler.cc b/src/osdc/Journaler.cc index db0267b9c8ee0..78425679b1b4c 100644 --- a/src/osdc/Journaler.cc +++ b/src/osdc/Journaler.cc @@ -81,7 +81,7 @@ void Journaler::recover(Context *onread) dout(1) << "read_head" << dendl; state = STATE_READHEAD; C_ReadHead *fin = new C_ReadHead(this); - filer.read(inode, 0, sizeof(Header), &fin->bl, 0, fin); + filer.read(inode, 0, sizeof(Header), &fin->bl, CEPH_OSD_OP_INCLOCK_FAIL, fin); } void Journaler::_finish_read_head(int r, bufferlist& bl) @@ -112,7 +112,7 @@ void Journaler::_finish_read_head(int r, bufferlist& bl) // probe the log state = STATE_PROBING; C_ProbeEnd *fin = new C_ProbeEnd(this); - filer.probe_fwd(inode, h.write_pos, &fin->end, fin); + filer.probe_fwd(inode, h.write_pos, &fin->end, CEPH_OSD_OP_INCLOCK_FAIL, fin); } void Journaler::_finish_probe_end(int r, off_t end) @@ -168,7 +168,7 @@ void Journaler::write_head(Context *oncommit) bufferlist bl; bl.append((char*)&last_written, sizeof(last_written)); - filer.write(inode, 0, bl.length(), bl, 0, + filer.write(inode, 0, bl.length(), bl, CEPH_OSD_OP_INCLOCK_FAIL, NULL, new C_WriteHead(this, last_written, oncommit)); } @@ -300,7 +300,7 @@ void Journaler::_do_flush() // submit write for anything pending // flush _start_ pos to _finish_flush - filer.write(inode, flush_pos, len, write_buf, 0, + filer.write(inode, flush_pos, len, write_buf, CEPH_OSD_OP_INCLOCK_FAIL, g_conf.journaler_safe ? 0:new C_Flush(this, flush_pos), // on ACK g_conf.journaler_safe ? new C_Flush(this, flush_pos):0); // on COMMIT pending_flush[flush_pos] = g_clock.now(); @@ -380,6 +380,17 @@ public: void Journaler::_finish_read(int r) { + if (r < 0) { + dout(0) << "_finish_read got error " << r << dendl; + error = r; + if (on_readable) { + Context *f = on_readable; + on_readable = 0; + f->finish(0); + delete f; + } + return; + } assert(r>=0); dout(10) << "_finish_read got " << received_pos << "~" << reading_buf.length() << dendl; @@ -459,7 +470,7 @@ void Journaler::_issue_read(off_t len) << ", read pointers " << read_pos << "/" << received_pos << "/" << (requested_pos+len) << dendl; - filer.read(inode, requested_pos, len, &reading_buf, 0, + filer.read(inode, requested_pos, len, &reading_buf, CEPH_OSD_OP_INCLOCK_FAIL, new C_Read(this)); requested_pos += len; } @@ -637,8 +648,8 @@ void Journaler::trim() << trimmed_pos << "/" << trimming_pos << "/" << expire_pos << dendl; - filer.remove(inode, trimming_pos, trim_to-trimming_pos, - 0, NULL, new C_Trim(this, trim_to)); + filer.remove(inode, trimming_pos, trim_to-trimming_pos, CEPH_OSD_OP_INCLOCK_FAIL, + NULL, new C_Trim(this, trim_to)); trimming_pos = trim_to; } diff --git a/src/osdc/Journaler.h b/src/osdc/Journaler.h index 7f7a5753ad708..1aae44b2528eb 100644 --- a/src/osdc/Journaler.h +++ b/src/osdc/Journaler.h @@ -101,6 +101,7 @@ class Journaler { static const int STATE_ACTIVE = 2; int state; + int error; // header utime_t last_wrote_head; @@ -173,7 +174,7 @@ public: Journaler(inode_t& inode_, Objecter *obj, Logger *l, Mutex *lk, off_t fl=0, off_t pff=0) : inode(inode_), objecter(obj), filer(objecter), logger(l), lock(lk), timer(*lk), delay_flush_event(0), - state(STATE_UNDEF), + state(STATE_UNDEF), error(0), write_pos(0), flush_pos(0), ack_pos(0), read_pos(0), requested_pos(0), received_pos(0), fetch_len(fl), prefetch_from(pff), @@ -203,6 +204,7 @@ public: void write_head(Context *onsave=0); bool is_active() { return state == STATE_ACTIVE; } + int get_error() { return error; } off_t get_write_pos() const { return write_pos; } off_t get_write_ack_pos() const { return ack_pos; } diff --git a/src/osdc/Objecter.cc b/src/osdc/Objecter.cc index c72377df3655f..90be9b57a394b 100644 --- a/src/osdc/Objecter.cc +++ b/src/osdc/Objecter.cc @@ -409,7 +409,8 @@ void Objecter::handle_osd_stat_reply(MOSDOpReply *m) if (pg.active_tids.empty()) close_pg( m->get_pg() ); // success? - if (m->get_result() == -EINCLOCKED) { + if (m->get_result() == -EINCLOCKED && + st->flags & CEPH_OSD_OP_INCLOCK_FAIL == 0) { dout(7) << " got -EINCLOCKED, resubmitting" << dendl; stat_submit(st); delete m; @@ -421,13 +422,12 @@ void Objecter::handle_osd_stat_reply(MOSDOpReply *m) delete m; return; } - //assert(m->get_result() >= 0); // ok! if (m->get_result() < 0) { - *st->size = -1; + *st->size = -1; } else { - *st->size = m->get_length(); + *st->size = m->get_length(); } // finish, clean up @@ -436,8 +436,8 @@ void Objecter::handle_osd_stat_reply(MOSDOpReply *m) // done delete st; if (onfinish) { - onfinish->finish(m->get_result()); - delete onfinish; + onfinish->finish(m->get_result()); + delete onfinish; } delete m; @@ -548,6 +548,19 @@ void Objecter::handle_osd_read_reply(MOSDOpReply *m) // our op finished rd->ops.erase(tid); + // fail? + if (m->get_result() == -EINCLOCKED && + rd->flags & CEPH_OSD_OP_INCLOCK_FAIL) { + dout(7) << " got -EINCLOCKED, failing" << dendl; + if (rd->onfinish) { + rd->onfinish->finish(-EINCLOCKED); + delete rd->onfinish; + } + delete rd; + delete m; + return; + } + // success? if (m->get_result() == -EAGAIN || m->get_result() == -EINCLOCKED) { @@ -556,7 +569,6 @@ void Objecter::handle_osd_read_reply(MOSDOpReply *m) delete m; return; } - //assert(m->get_result() >= 0); // what buffer offset are we? dout(7) << " got frag from " << m->get_oid() << " " @@ -862,6 +874,24 @@ void Objecter::handle_osd_modify_reply(MOSDOpReply *m) delete m; return; } + + int rc = 0; + if (m->get_result() == -EINCLOCKED && wr->flags & CEPH_OSD_OP_INCLOCK_FAIL) { + dout(7) << " got -EINCLOCKED, failing" << dendl; + rc = -EINCLOCKED; + if (wr->onack) { + onack = wr->onack; + wr->onack = 0; + num_unacked--; + } + if (wr->oncommit) { + oncommit = wr->oncommit; + wr->oncommit = 0; + num_uncommitted--; + } + goto done; + } + if (m->get_result() == -EAGAIN || m->get_result() == -EINCLOCKED) { dout(7) << " got -EAGAIN or -EINCLOCKED, resubmitting" << dendl; @@ -919,6 +949,7 @@ void Objecter::handle_osd_modify_reply(MOSDOpReply *m) } // done? + done: if (wr->onack == 0 && wr->oncommit == 0) { // remove from tid/osd maps assert(pg.active_tids.count(tid)); @@ -935,11 +966,11 @@ void Objecter::handle_osd_modify_reply(MOSDOpReply *m) // do callbacks if (onack) { - onack->finish(0); + onack->finish(rc); delete onack; } if (oncommit) { - oncommit->finish(0); + oncommit->finish(rc); delete oncommit; } -- 2.39.5