rank.bind();
cout << "starting mds? at " << rank.get_rank_addr() << std::endl;
rank.start();
+
+ rank.set_policy(entity_name_t::TYPE_MON, Rank::Policy::fast_fail());
+ //rank.set_policy(entity_name_t::TYPE_OSD, Rank::Policy::retry_forever());
// start mds
Messenger *m = rank.register_entity(entity_name_t::MDS(whoami));
osd_age_time: 0,
osd_heartbeat_interval: 1,
osd_heartbeat_grace: 30,
+ osd_failure_report_interval: 10,
osd_pg_stats_interval: 5,
osd_replay_window: 5,
osd_max_pull: 2,
int osd_age_time;
int osd_heartbeat_interval;
int osd_heartbeat_grace;
+ double osd_failure_report_interval;
int osd_pg_stats_interval;
int osd_replay_window;
int osd_max_pull;
rank.start();
+ rank.set_policy(entity_name_t::TYPE_MON, Rank::Policy::fast_fail());
+ rank.set_policy(entity_name_t::TYPE_OSD, Rank::Policy::retry_forever());
+
// start osd
Messenger *m = rank.register_entity(entity_name_t::OSD(whoami));
assert(m);
cout << "starting csyn at " << rank.get_rank_addr() << std::endl;
rank.start();
- Rank::Policy client_policy;
- client_policy.fail_interval = 0;
- client_policy.drop_msg_callback = false;
- rank.set_policy(entity_name_t::TYPE_CLIENT, client_policy);
- rank.set_policy(entity_name_t::TYPE_MON, client_policy);
- rank.set_policy(entity_name_t::TYPE_MDS, client_policy);
- rank.set_policy(entity_name_t::TYPE_OSD, client_policy);
+ rank.set_policy(entity_name_t::TYPE_CLIENT, Rank::Policy::retry_forever());
+ rank.set_policy(entity_name_t::TYPE_MON, Rank::Policy::fast_fail());
+ rank.set_policy(entity_name_t::TYPE_MDS, Rank::Policy::retry_forever());
+ rank.set_policy(entity_name_t::TYPE_OSD, Rank::Policy::retry_forever());
list<Client*> clients;
list<SyntheticClient*> synclients;
void _encode(bufferlist &bl) {
::_encode(version, bl);
::_encode(pg_stat, bl);
+ ::_encode(osd_stat, bl);
}
void _decode(bufferlist& bl, int& off) {
::_decode(version, bl, off);
::_decode(pg_stat, bl, off);
+ ::_decode(osd_stat, bl, off);
stat_zero();
for (hash_map<pg_t,pg_stat_t>::iterator p = pg_stat.begin();
p != pg_stat.end();
::close(sd);
sd = -1;
+ // lossy channel?
+ if (policy.retry_interval == 0) {
+ fail();
+ return;
+ }
+
if (q.empty()) {
if (state == STATE_CLOSING || onconnect) {
dout(10) << "fault on connect, or already closing, and q empty: setting closed." << dendl;
class Rank {
public:
struct Policy {
- float retry_interval; // (initial)
- float fail_interval; // before we call ms_handle_failure
+ float retry_interval; // (initial). 0 => lossy channel, fail immediately.
+ float fail_interval; // before we call ms_handle_failure 0 => retry forever.
bool drop_msg_callback;
bool fail_callback;
bool remote_reset_callback;
drop_msg_callback(true),
fail_callback(true),
remote_reset_callback(true) {}
+
+ Policy(float r, float f, bool dmc, bool fc, bool rrc) :
+ retry_interval(r), fail_interval(f),
+ drop_msg_callback(dmc),
+ fail_callback(fc),
+ remote_reset_callback(rrc) {}
+
+ static Policy fast_fail() { return Policy(0, 0, true, true, true); }
+ static Policy fail_after(float f) { return Policy(MIN(g_conf.ms_retry_interval, f), f, true, true, true); }
+ static Policy retry_forever() { return Policy(g_conf.ms_retry_interval, 0, false, true, true); }
};
if (heartbeat_from_stamp[*p] < grace) {
dout(0) << "no heartbeat from osd" << *p << " since " << heartbeat_from_stamp[*p]
<< " (cutoff " << grace << ")" << dendl;
- int mon = monmap->pick_mon();
- messenger->send_message(new MOSDFailure(monmap->fsid, messenger->get_myinst(), osdmap->get_inst(*p), osdmap->get_epoch()),
- monmap->get_inst(mon));
+ queue_failure(*p);
}
} else
heartbeat_from_stamp[*p] = now; // fake initial
}
-
+ maybe_report_failures();
if (logger) logger->set("hbto", heartbeat_to.size());
if (logger) logger->set("hbfrom", heartbeat_from.size());
timer.add_event_after(wait, new C_Heartbeat(this));
}
+void OSD::maybe_report_failures()
+{
+ if (pending_failures.empty())
+ return; // nothing to report
+
+ utime_t now = g_clock.now();
+ if (last_failure_report + g_conf.osd_failure_report_interval > now)
+ return; // not yet
+
+ last_failure_report = now;
+ int mon = monmap->pick_mon();
+ for (set<int>::iterator p = pending_failures.begin();
+ p != pending_failures.end();
+ p++)
+ messenger->send_message(new MOSDFailure(monmap->fsid, messenger->get_myinst(),
+ osdmap->get_inst(*p), osdmap->get_epoch()),
+ monmap->get_inst(mon));
+ pending_failures.clear();
+}
void OSD::send_pg_stats()
{
return;
}
- if (dest.is_osd()) {
- // failed osd. drop message, report to mon.
- if (!osdmap->have_inst(dest.num()) ||
- (osdmap->get_inst(dest.num()) != inst)) {
- dout(1) << "ms_handle_failure " << inst
- << ", already dropped/changed in osdmap, dropping " << *m
- << dendl;
- } else {
- int mon = monmap->pick_mon();
- dout(1) << "ms_handle_failure " << inst
- << ", dropping and reporting to mon" << mon
- << " " << *m
- << dendl;
- messenger->send_message(new MOSDFailure(monmap->fsid, messenger->get_myinst(), inst,
- osdmap->get_epoch()),
- monmap->get_inst(mon));
+ if (dest.is_mon()) {
+ if (m->get_type() == MSG_PGSTATS) {
+ MPGStats *pgstats = (MPGStats*)m;
+ dout(10) << "ms_handle_failure on " << *m << ", requeuing pg stats" << dendl;
+ pg_stat_queue_lock.Lock();
+ for (map<pg_t,pg_stat_t>::iterator p = pgstats->pg_stat.begin();
+ p != pgstats->pg_stat.end();
+ p++)
+ pg_stat_queue.insert(p->first);
+ pg_stat_queue_lock.Unlock();
}
- delete m;
- } else if (dest.is_mon()) {
- // resend to a different monitor.
- int mon = monmap->pick_mon(true);
- dout(1) << "ms_handle_failure " << inst
- << ", resending to mon" << mon
- << " " << *m
- << dendl;
- messenger->send_message(m, monmap->get_inst(mon));
- }
- else {
- // client?
- dout(1) << "ms_handle_failure " << inst
- << ", dropping " << *m << dendl;
- delete m;
}
+
+ dout(1) << "ms_handle_failure " << inst
+ << ", dropping " << *m << dendl;
+ delete m;
}
i++) {
int osd = i->first;
if (osd == whoami) continue;
+ pending_failures.erase(i->first);
messenger->mark_down(osdmap->get_addr(i->first));
peer_map_epoch.erase(entity_name_t::OSD(i->first));
}
void send_pg_stats();
+ // -- failures --
+ set<int> pending_failures;
+ utime_t last_failure_report;
+ void queue_failure(int n) {
+ pending_failures.insert(n);
+ }
+ void maybe_report_failures();
+
// -- tids --
// for ops i issue
tid_t last_tid;