libcephclient_so_a_CXXFLAGS = ${AM_CXXFLAGS} -fPIC
libcephclient_so_a_CFLAGS = ${AM_CFLAGS} -fPIC
+#BUILT_SOURCES += libcephclient_so.a
+
#libcephclient.so: libcephclient_so.a libcrush_so.a
# ${CXX} -I. -fPIC -shared -Wl,-soname,$@.1 ${AM_CXXFLAGS} ${LIBS} $^ -o $@
#BUILT_SOURCES += libcephclient.so
noinst_LIBRARIES = \
libcommon.a libcrush.a \
libmon.a libmds.a libosdc.a libosd.a libclient.a \
- libos.a libebofs.a \
- libcrush_so.a libcephclient_so.a
+ libos.a libebofs.a
+
+noinst_LIBRARIES += libcrush_so.a libcephclient_so.a
# extra bits
EXTRA_DIST = start.sh stop.sh crushtool.pl
- transaction prepare/commit
- rollback
- rollback logging (to fix slow prepare vs rollback race)
-- read+floor_lockout for clean STOGITH-like/fencing semantics after failover.
+
+- a more general fencing mechanism? per-object granularity isn't usually a good match.
- consider implications of nvram writeahead logs
-- clean shutdown?
-- pgmonitor should supplement failure detection
- flag missing log entries on crash recovery --> WRNOOP? or WRLOST?
CEPH_OSD_OP_ACK = 1, /* want (or is) "ack" ack */
CEPH_OSD_OP_SAFE = 2, /* want (or is) "safe" ack */
CEPH_OSD_OP_RETRY = 4, /* resend attempt */
- CEPH_OSD_OP_INC_LOCK = 8, /* acquire/require incarnation lock */
+ CEPH_OSD_OP_INCLOCK_FAIL = 8, /* fail on inclock collision */
CEPH_OSD_OP_BALANCE_READS = 16
};
dout(10) << "_replay_thread start" << dendl;
// loop
+ int r = 0;
off_t new_expire_pos = journaler->get_expire_pos();
while (1) {
// wait for read?
while (!journaler->is_readable() &&
- journaler->get_read_pos() < journaler->get_write_pos()) {
+ journaler->get_read_pos() < journaler->get_write_pos() &&
+ !journaler->get_error()) {
journaler->wait_for_readable(new C_MDL_Replay(this));
replay_cond.Wait(mds->mds_lock);
}
+ if (journaler->get_error()) {
+ r = journaler->get_error();
+ dout(0) << "_replay journaler got error " << r << ", aborting" << dendl;
+ break;
+ }
if (!journaler->is_readable() &&
journaler->get_read_pos() == journaler->get_write_pos())
}
// done!
- assert(journaler->get_read_pos() == journaler->get_write_pos());
- dout(10) << "_replay - complete, " << num_events << " events, new read/expire pos is " << new_expire_pos << dendl;
-
- // move read pointer _back_ to first subtree map we saw, for eventual trimming
- journaler->set_read_pos(new_expire_pos);
- journaler->set_expire_pos(new_expire_pos);
- logger->set("expos", new_expire_pos);
-
+ if (r == 0) {
+ assert(journaler->get_read_pos() == journaler->get_write_pos());
+ dout(10) << "_replay - complete, " << num_events << " events, new read/expire pos is " << new_expire_pos << dendl;
+
+ // move read pointer _back_ to first subtree map we saw, for eventual trimming
+ journaler->set_read_pos(new_expire_pos);
+ journaler->set_expire_pos(new_expire_pos);
+ logger->set("expos", new_expire_pos);
+ }
+
// kick waiter(s)
list<Context*> ls;
ls.swap(waitfor_replay);
- finish_contexts(ls,0);
+ finish_contexts(ls, r);
dout(10) << "_replay_thread finish" << dendl;
mds->mds_lock.Unlock();
int nextstep;
public:
C_MDS_BootStart(MDS *m, int n) : mds(m), nextstep(n) {}
- void finish(int r) { mds->boot_start(nextstep); }
+ void finish(int r) { mds->boot_start(nextstep, r); }
};
-void MDS::boot_start(int step)
+void MDS::boot_start(int step, int r)
{
+ if (r < 0) {
+ dout(0) << "boot_start encountered an error, failing" << dendl;
+ suicide();
+ return;
+ }
+
switch (step) {
case 0:
step = 1; // fall-thru.
void boot();
void boot_create(); // i am new mds.
- void boot_start(int step=0); // starting|replay
+ void boot_start(int step=0, int r=0); // starting|replay
void replay_start();
void creating_done();
int Filer::probe_fwd(inode_t& inode,
off_t start_from,
off_t *end,
+ int flags,
Context *onfinish)
{
dout(10) << "probe_fwd " << hex << inode.ino << dec << " starting from " << start_from << dendl;
- Probe *probe = new Probe(inode, start_from, end, onfinish);
+ Probe *probe = new Probe(inode, start_from, end, flags, onfinish);
// period (bytes before we jump unto a new set of object(s))
off_t period = ceph_file_layout_period(inode.layout);
p++) {
dout(10) << "_probe probing " << p->oid << dendl;
C_Probe *c = new C_Probe(this, probe, p->oid);
- probe->ops[p->oid] = objecter->stat(p->oid, &c->size, p->layout, 0, c);
+ probe->ops[p->oid] = objecter->stat(p->oid, &c->size, p->layout, probe->flags, c);
}
}
inode_t inode;
off_t from;
off_t *end;
+ int flags;
Context *onfinish;
list<ObjectExtent> probing;
map<object_t, off_t> known;
map<object_t, tid_t> ops;
- Probe(inode_t &i, off_t f, off_t *e, Context *c) :
- inode(i), from(f), end(e), onfinish(c), probing_len(0) {}
+ Probe(inode_t &i, off_t f, off_t *e, int fl, Context *c) :
+ inode(i), from(f), end(e), flags(fl), onfinish(c), probing_len(0) {}
};
class C_Probe;
int probe_fwd(inode_t& inode,
off_t start_from,
off_t *end,
+ int flags,
Context *onfinish);
dout(1) << "read_head" << dendl;
state = STATE_READHEAD;
C_ReadHead *fin = new C_ReadHead(this);
- filer.read(inode, 0, sizeof(Header), &fin->bl, 0, fin);
+ filer.read(inode, 0, sizeof(Header), &fin->bl, CEPH_OSD_OP_INCLOCK_FAIL, fin);
}
void Journaler::_finish_read_head(int r, bufferlist& bl)
// probe the log
state = STATE_PROBING;
C_ProbeEnd *fin = new C_ProbeEnd(this);
- filer.probe_fwd(inode, h.write_pos, &fin->end, fin);
+ filer.probe_fwd(inode, h.write_pos, &fin->end, CEPH_OSD_OP_INCLOCK_FAIL, fin);
}
void Journaler::_finish_probe_end(int r, off_t end)
bufferlist bl;
bl.append((char*)&last_written, sizeof(last_written));
- filer.write(inode, 0, bl.length(), bl, 0,
+ filer.write(inode, 0, bl.length(), bl, CEPH_OSD_OP_INCLOCK_FAIL,
NULL,
new C_WriteHead(this, last_written, oncommit));
}
// submit write for anything pending
// flush _start_ pos to _finish_flush
- filer.write(inode, flush_pos, len, write_buf, 0,
+ filer.write(inode, flush_pos, len, write_buf, CEPH_OSD_OP_INCLOCK_FAIL,
g_conf.journaler_safe ? 0:new C_Flush(this, flush_pos), // on ACK
g_conf.journaler_safe ? new C_Flush(this, flush_pos):0); // on COMMIT
pending_flush[flush_pos] = g_clock.now();
void Journaler::_finish_read(int r)
{
+ if (r < 0) {
+ dout(0) << "_finish_read got error " << r << dendl;
+ error = r;
+ if (on_readable) {
+ Context *f = on_readable;
+ on_readable = 0;
+ f->finish(0);
+ delete f;
+ }
+ return;
+ }
assert(r>=0);
dout(10) << "_finish_read got " << received_pos << "~" << reading_buf.length() << dendl;
<< ", read pointers " << read_pos << "/" << received_pos << "/" << (requested_pos+len)
<< dendl;
- filer.read(inode, requested_pos, len, &reading_buf, 0,
+ filer.read(inode, requested_pos, len, &reading_buf, CEPH_OSD_OP_INCLOCK_FAIL,
new C_Read(this));
requested_pos += len;
}
<< trimmed_pos << "/" << trimming_pos << "/" << expire_pos
<< dendl;
- filer.remove(inode, trimming_pos, trim_to-trimming_pos,
- 0, NULL, new C_Trim(this, trim_to));
+ filer.remove(inode, trimming_pos, trim_to-trimming_pos, CEPH_OSD_OP_INCLOCK_FAIL,
+ NULL, new C_Trim(this, trim_to));
trimming_pos = trim_to;
}
static const int STATE_ACTIVE = 2;
int state;
+ int error;
// header
utime_t last_wrote_head;
Journaler(inode_t& inode_, Objecter *obj, Logger *l, Mutex *lk, off_t fl=0, off_t pff=0) :
inode(inode_), objecter(obj), filer(objecter), logger(l),
lock(lk), timer(*lk), delay_flush_event(0),
- state(STATE_UNDEF),
+ state(STATE_UNDEF), error(0),
write_pos(0), flush_pos(0), ack_pos(0),
read_pos(0), requested_pos(0), received_pos(0),
fetch_len(fl), prefetch_from(pff),
void write_head(Context *onsave=0);
bool is_active() { return state == STATE_ACTIVE; }
+ int get_error() { return error; }
off_t get_write_pos() const { return write_pos; }
off_t get_write_ack_pos() const { return ack_pos; }
if (pg.active_tids.empty()) close_pg( m->get_pg() );
// success?
- if (m->get_result() == -EINCLOCKED) {
+ if (m->get_result() == -EINCLOCKED &&
+ st->flags & CEPH_OSD_OP_INCLOCK_FAIL == 0) {
dout(7) << " got -EINCLOCKED, resubmitting" << dendl;
stat_submit(st);
delete m;
delete m;
return;
}
- //assert(m->get_result() >= 0);
// ok!
if (m->get_result() < 0) {
- *st->size = -1;
+ *st->size = -1;
} else {
- *st->size = m->get_length();
+ *st->size = m->get_length();
}
// finish, clean up
// done
delete st;
if (onfinish) {
- onfinish->finish(m->get_result());
- delete onfinish;
+ onfinish->finish(m->get_result());
+ delete onfinish;
}
delete m;
// our op finished
rd->ops.erase(tid);
+ // fail?
+ if (m->get_result() == -EINCLOCKED &&
+ rd->flags & CEPH_OSD_OP_INCLOCK_FAIL) {
+ dout(7) << " got -EINCLOCKED, failing" << dendl;
+ if (rd->onfinish) {
+ rd->onfinish->finish(-EINCLOCKED);
+ delete rd->onfinish;
+ }
+ delete rd;
+ delete m;
+ return;
+ }
+
// success?
if (m->get_result() == -EAGAIN ||
m->get_result() == -EINCLOCKED) {
delete m;
return;
}
- //assert(m->get_result() >= 0);
// what buffer offset are we?
dout(7) << " got frag from " << m->get_oid() << " "
delete m;
return;
}
+
+ int rc = 0;
+ if (m->get_result() == -EINCLOCKED && wr->flags & CEPH_OSD_OP_INCLOCK_FAIL) {
+ dout(7) << " got -EINCLOCKED, failing" << dendl;
+ rc = -EINCLOCKED;
+ if (wr->onack) {
+ onack = wr->onack;
+ wr->onack = 0;
+ num_unacked--;
+ }
+ if (wr->oncommit) {
+ oncommit = wr->oncommit;
+ wr->oncommit = 0;
+ num_uncommitted--;
+ }
+ goto done;
+ }
+
if (m->get_result() == -EAGAIN ||
m->get_result() == -EINCLOCKED) {
dout(7) << " got -EAGAIN or -EINCLOCKED, resubmitting" << dendl;
}
// done?
+ done:
if (wr->onack == 0 && wr->oncommit == 0) {
// remove from tid/osd maps
assert(pg.active_tids.count(tid));
// do callbacks
if (onack) {
- onack->finish(0);
+ onack->finish(rc);
delete onack;
}
if (oncommit) {
- oncommit->finish(0);
+ oncommit->finish(rc);
delete oncommit;
}