- rejoin will need to explicitly resolve uncommitted items.
- fully implement link/unlink first, and use that as a model?
-monitor
-- finish generic paxos
-
osdmon
-- distribute w/ paxos framework
- allow fresh replacement osds. add osd_created in osdmap, probably
- monitor needs to monitor some osds...
- monitor pg states, notify on out?
- watch osd utilization; adjust overload in cluster map
-mdsmon
-- distribute w/ paxos framework
-
journaler
- fix up for large events (e.g. imports)
- use set_floor_and_read for safe takeover from possibly-not-quite-dead otherguy.
int r = monmap.read(".ceph_monmap");
assert(r >= 0);
+ // start up network
+ rank.start_rank();
+ messenger = rank.register_entity(entity_name_t(entity_name_t::TYPE_ADMIN));
+ messenger->set_dispatcher(&dispatcher);
+
// build command
- MMonCommand *m = new MMonCommand;
+ MMonCommand *m = new MMonCommand(messenger->get_myinst());
string cmd;
for (unsigned i=0; i<args.size(); i++) {
if (i) cmd += " ";
dout(0) << "mon" << mon << " <- '" << cmd << "'" << endl;
- // start up network
- rank.start_rank();
- messenger = rank.register_entity(entity_name_t(entity_name_t::TYPE_ADMIN));
- messenger->set_dispatcher(&dispatcher);
-
// send it
messenger->send_message(m, monmap.get_inst(mon));
mon_lease_ack_timeout: 10.0, // on leader, if lease isn't acked by all peons
mon_lease_timeout: 10.0, // on peon, if lease isn't extended
mon_accept_timeout: 10.0, // on leader, if paxos update isn't accepted
+ mon_stop_on_last_unmount: false,
mon_stop_with_last_mds: false,
// --- client ---
mds_trim_on_rejoin: true,
mds_commit_on_shutdown: true,
mds_shutdown_check: 0, //30,
- mds_shutdown_on_last_unmount: true,
mds_verify_export_dirauth: true,
g_conf.mds_commit_on_shutdown = atoi(args[++i]);
else if (strcmp(args[i], "--mds_shutdown_check") == 0)
g_conf.mds_shutdown_check = atoi(args[++i]);
- else if (strcmp(args[i], "--mds_shutdown_on_last_unmount") == 0)
- g_conf.mds_shutdown_on_last_unmount = atoi(args[++i]);
else if (strcmp(args[i], "--mds_log_flush_on_shutdown") == 0)
g_conf.mds_log_flush_on_shutdown = atoi(args[++i]);
else if (strcmp(args[i], "--mon_osd_down_out_interval") == 0)
g_conf.mon_osd_down_out_interval = atoi(args[++i]);
+ else if (strcmp(args[i], "--mon_stop_on_last_unmount") == 0)
+ g_conf.mon_stop_on_last_unmount = atoi(args[++i]);
else if (strcmp(args[i], "--mon_stop_with_last_mds") == 0)
g_conf.mon_stop_with_last_mds = atoi(args[++i]);
float mon_lease_ack_timeout;
float mon_lease_timeout;
float mon_accept_timeout;
+ bool mon_stop_on_last_unmount;
bool mon_stop_with_last_mds;
// client
bool mds_trim_on_rejoin;
bool mds_commit_on_shutdown;
int mds_shutdown_check;
- bool mds_shutdown_on_last_unmount;
bool mds_verify_export_dirauth; // debug flag
if (g_conf.clock_tare) g_clock.tare();
// osd specific args
- char *dev;
+ char *dev = 0;
+ char dev_default[20];
int whoami = -1;
for (unsigned i=0; i<args.size(); i++) {
if (strcmp(args[i],"--dev") == 0)
return -1;
}
}
+ if (whoami < 0) {
+ cerr << "must specify '--osd #' where # is the osd number" << endl;
+ }
+ if (!dev) {
+ sprintf(dev_default, "dev/osd%d", whoami);
+ dev = dev_default;
+ }
cout << "dev " << dev << endl;
beacon_seq_stamp[beacon_last_seq] = g_clock.now();
int mon = monmap->pick_mon();
- messenger->send_message(new MMDSBeacon(want_state, beacon_last_seq),
+ messenger->send_message(new MMDSBeacon(messenger->get_myinst(), want_state, beacon_last_seq),
monmap->get_inst(mon));
// schedule next sender
#include "mds/MDSMap.h"
class MMDSBeacon : public Message {
+ entity_inst_t inst;
int state;
version_t seq;
public:
MMDSBeacon() : Message(MSG_MDS_BEACON) {}
- MMDSBeacon(int st, version_t se) : Message(MSG_MDS_BEACON),
- state(st), seq(se) { }
+ MMDSBeacon(entity_inst_t i, int st, version_t se) :
+ Message(MSG_MDS_BEACON),
+ inst(i), state(st), seq(se) { }
+ entity_inst_t& get_mds_inst() { return inst; }
int get_state() { return state; }
version_t get_seq() { return seq; }
char *get_type_name() { return "mdsbeacon"; }
void print(ostream& out) {
- out << "mdsbeacon(" << MDSMap::get_state_name(state)
+ out << "mdsbeacon(" << inst
+ << " " << MDSMap::get_state_name(state)
<< " seq " << seq << ")";
}
void encode_payload() {
- payload.append((char*)&state, sizeof(state));
- payload.append((char*)&seq, sizeof(seq));
+ ::_encode(inst, payload);
+ ::_encode(state, payload);
+ ::_encode(seq, payload);
}
void decode_payload() {
int off = 0;
- payload.copy(off, sizeof(state), (char*)&state);
- off += sizeof(state);
- payload.copy(off, sizeof(seq), (char*)&seq);
- off += sizeof(seq);
+ ::_decode(inst, payload, off);
+ ::_decode(state, payload, off);
+ ::_decode(seq, payload, off);
}
};
class MMonCommand : public Message {
public:
+ entity_inst_t inst;
vector<string> cmd;
MMonCommand() : Message(MSG_MON_COMMAND) {}
+ MMonCommand(entity_inst_t &i) :
+ Message(MSG_MON_COMMAND),
+ inst(i) { }
virtual char *get_type_name() { return "mon_command"; }
void print(ostream& o) {
}
void encode_payload() {
+ ::_encode(inst, payload);
::_encode(cmd, payload);
}
void decode_payload() {
int off = 0;
+ ::_decode(inst, payload, off);
::_decode(cmd, payload, off);
}
};
class MOSDBoot : public Message {
public:
+ entity_inst_t inst;
OSDSuperblock sb;
MOSDBoot() {}
- MOSDBoot(OSDSuperblock& s) :
+ MOSDBoot(entity_inst_t i, OSDSuperblock& s) :
Message(MSG_OSD_BOOT),
+ inst(i),
sb(s) {
}
- char *get_type_name() { return "oboot"; }
+ char *get_type_name() { return "osd_boot"; }
+ void print(ostream& out) {
+ out << "osd_boot(" << inst << ")";
+ }
void encode_payload() {
- payload.append((char*)&sb, sizeof(sb));
+ ::_encode(inst, payload);
+ ::_encode(sb, payload);
}
void decode_payload() {
int off = 0;
- payload.copy(off, sizeof(sb), (char*)&sb);
- off += sizeof(sb);
+ ::_decode(inst, payload, off);
+ ::_decode(sb, payload, off);
}
};
class MOSDFailure : public Message {
public:
+ entity_inst_t from;
entity_inst_t failed;
epoch_t epoch;
MOSDFailure() {}
- MOSDFailure(entity_inst_t f, epoch_t e) :
+ MOSDFailure(entity_inst_t fr, entity_inst_t f, epoch_t e) :
Message(MSG_OSD_FAILURE),
- failed(f), epoch(e) {}
+ from(fr), failed(f), epoch(e) {}
+ entity_inst_t get_from() { return from; }
entity_inst_t get_failed() { return failed; }
epoch_t get_epoch() { return epoch; }
void decode_payload() {
int off = 0;
- payload.copy(off, sizeof(failed), (char*)&failed);
- off += sizeof(failed);
- payload.copy(off, sizeof(epoch), (char*)&epoch);
- off += sizeof(epoch);
+ ::_decode(from, payload, off);
+ ::_decode(failed, payload, off);
+ ::_decode(epoch, payload, off);
}
void encode_payload() {
- payload.append((char*)&failed, sizeof(failed));
- payload.append((char*)&epoch, sizeof(epoch));
+ ::_encode(from, payload);
+ ::_encode(failed, payload);
+ ::_encode(epoch, payload);
}
- virtual char *get_type_name() { return "osdfail"; }
+ virtual char *get_type_name() { return "osd_failure"; }
+ void print(ostream& out) {
+ out << "osd_failure(" << failed << " e" << epoch << ")";
+ }
};
#endif
mon->messenger->send_message(m, m->inst);
// auto-shutdown?
+ // (hack for fakesyn/newsyn, mostly)
if (mon->is_leader() &&
client_map.version > 1 &&
client_map.client_addr.empty() &&
- g_conf.mds_shutdown_on_last_unmount) {
+ g_conf.mon_stop_on_last_unmount) {
dout(1) << "last client unmounted" << endl;
mon->do_stop();
}
bool MDSMonitor::preprocess_beacon(MMDSBeacon *m)
{
dout(12) << "preprocess_beacon " << *m
- << " from " << m->get_source()
- << " " << m->get_source_inst()
+ << " from " << m->get_mds_inst()
<< endl;
// fw to leader?
}
// let's see.
- int from = m->get_source().num();
+ int from = m->get_mds_inst().name.num();
int state = m->get_state();
version_t seq = m->get_seq();
// boot?
if (state == MDSMap::STATE_BOOT) {
// already booted?
- int already = mdsmap.get_addr_rank(m->get_source_addr());
+ int already = mdsmap.get_addr_rank(m->get_mds_inst().addr);
if (already < 0)
return false; // need to update map
// reply to beacon?
if (state != MDSMap::STATE_OUT) {
last_beacon[from] = g_clock.now(); // note time
- mon->messenger->send_message(new MMDSBeacon(state, seq),
- m->get_source_inst());
+ mon->messenger->send_message(new MMDSBeacon(m->get_mds_inst(), state, seq),
+ m->get_mds_inst());
}
// is there a state change here?
{
// -- this is an update --
dout(12) << "handle_beacon " << *m
- << " from " << m->get_source()
- << " " << m->get_source_inst()
+ << " from " << m->get_mds_inst()
<< endl;
- int from = m->get_source().num();
+ int from = m->get_mds_inst().name.num();
int state = m->get_state();
version_t seq = m->get_seq();
} else if (mdsmap.is_out(from)) {
dout(10) << "mds_beacon boot: mds" << from << " was out, starting" << endl;
state = MDSMap::STATE_STARTING;
- } else if (mdsmap.get_inst(from) != m->get_source_inst()) {
+ } else if (!mdsmap.have_inst(from) || mdsmap.get_inst(from) != m->get_mds_inst()) {
dout(10) << "mds_beacon boot: mds" << from << " is someone else" << endl;
from = -1;
}
}
if (from < 0) {
- from = pending_mdsmap.get_addr_rank(m->get_source_addr());
+ from = pending_mdsmap.get_addr_rank(m->get_mds_inst().addr);
if (from >= 0) {
state = pending_mdsmap.mds_state[from];
dout(10) << "mds_beacon boot: already pending mds" << from
assert(state != MDSMap::STATE_BOOT);
// put it in the map.
- pending_mdsmap.mds_inst[from].addr = m->get_source_addr();
+ pending_mdsmap.mds_inst[from].addr = m->get_mds_inst().addr;
pending_mdsmap.mds_inst[from].name = MSG_ADDR_MDS(from);
pending_mdsmap.mds_inc[from]++;
// make sure mds's are still alive
utime_t now = g_clock.now();
+ // ...if i am an active leader
if (!mon->is_leader()) return;
if (!paxos->is_active()) return;
{
int badboy = m->get_failed().name.num();
+ // weird?
+ if (!osdmap.have_inst(badboy) ||
+ osdmap.get_inst(badboy) != m->get_failed()) {
+ dout(5) << "preprocess_failure weird: " << m->get_failed() << " from " << m->get_from() << endl;
+ send_incremental(m->get_epoch(), m->get_from());
+ return true;
+ }
+
// already reported?
- if (osdmap.is_down(badboy) &&
- (!osdmap.have_inst(badboy) ||
- osdmap.get_inst(badboy) == m->get_failed())) {
- dout(5) << "preprocess_failure dup: " << m->get_failed() << " from " << m->get_source() << endl;
- send_incremental(m->get_epoch(), m->get_source_inst());
+ if (osdmap.is_down(badboy)) {
+ dout(5) << "preprocess_failure dup: " << m->get_failed() << " from " << m->get_from() << endl;
+ send_incremental(m->get_epoch(), m->get_from());
return true;
}
- dout(10) << "preprocess_failure new: " << m->get_failed() << " from " << m->get_source() << endl;
+ dout(10) << "preprocess_failure new: " << m->get_failed() << " from " << m->get_from() << endl;
return false;
}
bool OSDMonitor::prepare_failure(MOSDFailure *m)
{
- dout(1) << "prepare_failure " << m->get_failed() << " from " << m->get_source() << endl;
+ dout(1) << "prepare_failure " << m->get_failed() << " from " << m->get_from() << endl;
// FIXME
// take their word for it
int badboy = m->get_failed().name.num();
- if (osdmap.is_up(badboy) &&
- (osdmap.osd_inst.count(badboy) == 0 ||
- osdmap.osd_inst[badboy] == m->get_failed())) {
-
- pending_inc.new_down[badboy] = m->get_failed();
-
- if (osdmap.is_in(badboy))
- down_pending_out[badboy] = g_clock.now();
- }
+ assert(osdmap.is_up(badboy));
+ assert(osdmap.osd_inst[badboy] == m->get_failed());
+
+ pending_inc.new_down[badboy] = m->get_failed();
+
+ if (osdmap.is_in(badboy))
+ down_pending_out[badboy] = g_clock.now();
+ paxos->wait_for_commit(new C_Reported(this, m));
+
delete m;
return true;
}
+void OSDMonitor::_reported_failure(MOSDFailure *m)
+{
+ dout(7) << "_reported_failure on " << m->get_failed() << ", telling " << m->get_from() << endl;
+ send_latest(m->get_epoch(), m->get_from());
+}
+
// boot --
bool OSDMonitor::preprocess_boot(MOSDBoot *m)
{
- assert(m->get_source().is_osd());
- int from = m->get_source().num();
+ assert(m->inst.name.is_osd());
+ int from = m->inst.name.num();
// already booted?
if (osdmap.is_up(from) &&
- osdmap.get_inst(from) == m->get_source_inst()) {
+ osdmap.get_inst(from) == m->inst) {
// yup.
- dout(7) << "preprocess_boot dup from " << m->get_source() << endl;
+ dout(7) << "preprocess_boot dup from " << m->inst << endl;
_booted(m);
return true;
}
- dout(10) << "preprocess_boot from " << m->get_source() << endl;
+ dout(10) << "preprocess_boot from " << m->inst << endl;
return false;
}
bool OSDMonitor::prepare_boot(MOSDBoot *m)
{
- dout(7) << "prepare_boot from " << m->get_source() << endl;
- assert(m->get_source().is_osd());
- int from = m->get_source().num();
+ dout(7) << "prepare_boot from " << m->inst << endl;
+ assert(m->inst.name.is_osd());
+ int from = m->inst.name.num();
// does this osd exist?
if (!osdmap.exists(from)) {
// already up? mark down first?
if (osdmap.is_up(from)) {
- assert(osdmap.get_inst(from) != m->get_source_inst()); // preproces should have caught it
+ assert(osdmap.get_inst(from) != m->inst); // preproces should have caught it
// mark previous guy down
pending_inc.new_down[from] = osdmap.osd_inst[from];
// mark new guy up.
down_pending_out.erase(from); // if any
- assert(osdmap.is_down(from));
- pending_inc.new_up[from] = m->get_source_inst();
+ pending_inc.new_up[from] = m->inst;
// mark in?
if (osdmap.out_osds.count(from))
void OSDMonitor::_booted(MOSDBoot *m)
{
- dout(7) << "_booted " << m->get_source() << endl;
- send_latest(m->sb.current_epoch, m->get_source_inst());
+ dout(7) << "_booted " << m->inst << endl;
+ send_latest(m->sb.current_epoch, m->inst);
delete m;
}
bool preprocess_failure(class MOSDFailure *m);
bool prepare_failure(class MOSDFailure *m);
+ void _reported_failure(MOSDFailure *m);
bool preprocess_boot(class MOSDBoot *m);
bool prepare_boot(class MOSDBoot *m);
cmon->dispatch((Message*)m);
}
};
+ class C_Reported : public Context {
+ OSDMonitor *cmon;
+ MOSDFailure *m;
+ public:
+ C_Reported(OSDMonitor *cm, MOSDFailure *m_) :
+ cmon(cm), m(m_) {}
+ void finish(int r) {
+ if (r >= 0)
+ cmon->_reported_failure(m);
+ else
+ cmon->dispatch((Message*)m);
+ }
+ };
bool preprocess_in(class MOSDIn *m);
bool prepare_in(class MOSDIn *m);
have_pending = false;
}
- if (g_conf.mkfs)
- _try_create_initial();
-
// make sure we update our state
if (paxos->is_active())
_active();
void PaxosService::_active()
{
dout(10) << "_active" << endl;
- update_from_paxos();
+ assert(paxos->is_active());
- if (mon->is_leader() &&
- !have_pending) {
- create_pending();
- have_pending = true;
- }
-}
+ // pull latest from paxos
+ update_from_paxos();
-void PaxosService::_try_create_initial()
-{
- if (mon->is_leader() &&
- paxos->get_version() == 0) {
-
- if (!paxos->is_writeable()) {
- dout(1) << "election_finished -- waiting for writeable to create initial state" << endl;
- paxos->wait_for_writeable(new C_CreateInitial(this));
- } else {
- // do it
- assert(have_pending == false);
+ // create pending state?
+ if (mon->is_leader()) {
+ if (!have_pending) {
create_pending();
have_pending = true;
+ }
+
+ if (g_conf.mkfs &&
+ paxos->get_version() == 0) {
create_initial();
propose_pending();
}
}
};
friend class C_Update;
- class C_CreateInitial : public Context {
- PaxosService *svc;
- public:
- C_CreateInitial(PaxosService *s) : svc(s) {}
- void finish(int r) {
- svc->_try_create_initial();
- }
- };
- friend class C_CreateInitial;
private:
bool have_pending;
void election_finished();
private:
- void _try_create_initial();
void _active();
void _commit();
// announce to monitor i exist and have booted.
int mon = monmap->pick_mon();
- messenger->send_message(new MOSDBoot(superblock), monmap->get_inst(mon));
+ messenger->send_message(new MOSDBoot(messenger->get_myinst(), superblock), monmap->get_inst(mon));
// start the heart
timer.add_event_after(g_conf.osd_heartbeat_interval, new C_Heartbeat(this));
<< ", dropping and reporting to mon" << mon
<< " " << *m
<< dendl;
- messenger->send_message(new MOSDFailure(inst, osdmap->get_epoch()),
+ messenger->send_message(new MOSDFailure(messenger->get_myinst(), inst, osdmap->get_epoch()),
monmap->get_inst(mon));
delete m;
} else if (dest.is_mon()) {
}
// nope, incremental.
- for (map<int,entity_inst_t>::iterator i = inc.new_up.begin();
- i != inc.new_up.end();
- i++) {
- assert(down_osds.count(i->first));
- down_osds.erase(i->first);
- assert(osd_inst.count(i->first) == 0);
- osd_inst[i->first] = i->second;
- //cout << "epoch " << epoch << " up osd" << i->first << endl;
- }
for (map<int,entity_inst_t>::iterator i = inc.new_down.begin();
i != inc.new_down.end();
i++) {
osd_inst.erase(i->first);
//cout << "epoch " << epoch << " down osd" << i->first << endl;
}
- for (list<int>::iterator i = inc.new_in.begin();
- i != inc.new_in.end();
- i++) {
- assert(out_osds.count(*i));
- out_osds.erase(*i);
- //cout << "epoch " << epoch << " in osd" << *i << endl;
- }
for (list<int>::iterator i = inc.new_out.begin();
i != inc.new_out.end();
i++) {
out_osds.insert(*i);
//cout << "epoch " << epoch << " out osd" << *i << endl;
}
- for (map<int,float>::iterator i = inc.new_overload.begin();
- i != inc.new_overload.end();
- i++) {
- overload_osds[i->first] = i->second;
- }
for (list<int>::iterator i = inc.old_overload.begin();
i != inc.old_overload.end();
i++) {
assert(overload_osds.count(*i));
overload_osds.erase(*i);
}
+
+ for (map<int,entity_inst_t>::iterator i = inc.new_up.begin();
+ i != inc.new_up.end();
+ i++) {
+ assert(down_osds.count(i->first));
+ down_osds.erase(i->first);
+ assert(osd_inst.count(i->first) == 0);
+ osd_inst[i->first] = i->second;
+ //cout << "epoch " << epoch << " up osd" << i->first << endl;
+ }
+ for (list<int>::iterator i = inc.new_in.begin();
+ i != inc.new_in.end();
+ i++) {
+ assert(out_osds.count(*i));
+ out_osds.erase(*i);
+ //cout << "epoch " << epoch << " in osd" << *i << endl;
+ }
+ for (map<int,float>::iterator i = inc.new_overload.begin();
+ i != inc.new_overload.end();
+ i++) {
+ overload_osds[i->first] = i->second;
+ }
}
// serialize, unserialize
dout(0) << "ms_handle_failure " << dest << " inst " << inst
<< ", dropping and reporting to mon" << mon
<< endl;
- messenger->send_message(new MOSDFailure(inst, osdmap->get_epoch()),
+ messenger->send_message(new MOSDFailure(messenger->get_myinst(), inst, osdmap->get_epoch()),
monmap->get_inst(mon));
delete m;
} else {