osd_pg_bits: 6, // bits per osd
osd_object_layout: CEPH_OBJECT_LAYOUT_HASHINO,//LINEAR,//HASHINO,
osd_pg_layout: CEPH_PG_LAYOUT_CRUSH,//LINEAR,//CRUSH,
+ osd_min_rep: 2,
osd_max_rep: 4,
osd_min_raid_width: 4,
osd_max_raid_width: 3, //6,
osd_age_time: 0,
osd_heartbeat_interval: 1,
osd_heartbeat_grace: 30,
- osd_failure_report_interval: 10,
- osd_pg_stats_interval: 5,
+ osd_mon_report_interval: 5, // pg stats, failures, up_thru, boot.
osd_replay_window: 5,
osd_max_pull: 2,
osd_pad_pg_log: false,
int osd_pg_bits;
int osd_object_layout;
int osd_pg_layout;
+ int osd_min_rep;
int osd_max_rep;
int osd_min_raid_width;
int osd_max_raid_width;
int osd_age_time;
int osd_heartbeat_interval;
int osd_heartbeat_grace;
- double osd_failure_report_interval;
- int osd_pg_stats_interval;
+ int osd_mon_report_interval;
int osd_replay_window;
int osd_max_pull;
bool osd_pad_pg_log;
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#ifndef __MPGSTATSACK_H
+#define __MPGSTATSACK_H
+
+#include "osd/osd_types.h"
+
+class MPGStatsAck : public Message {
+public:
+ map<pg_t,eversion_t> pg_stat;
+
+ MPGStatsAck() : Message(MSG_PGSTATSACK) {}
+
+ const char *get_type_name() { return "pg_stats_ack"; }
+ void print(ostream& out) {
+ out << "pg_stats_ack";
+ }
+
+ void encode_payload() {
+ ::encode(pg_stat, payload);
+ }
+ void decode_payload() {
+ bufferlist::iterator p = payload.begin();
+ ::decode(pg_stat, p);
+ }
+};
+
+#endif
#include "MonitorStore.h"
#include "messages/MPGStats.h"
+#include "messages/MPGStatsAck.h"
#include "messages/MStatfs.h"
#include "messages/MStatfsReply.h"
case MSG_PGSTATS:
return prepare_pg_stats((MPGStats*)m);
+ case MSG_MON_COMMAND:
+ delete m;
+ return false;
+
default:
assert(0);
delete m;
pg_map.stat_osd_add(stats->osd_stat);
// pg stats
+ MPGStatsAck *ack = new MPGStatsAck;
for (map<pg_t,pg_stat_t>::iterator p = stats->pg_stat.begin();
p != stats->pg_stat.end();
p++) {
pg_t pgid = p->first;
+ ack->pg_stat[pgid] = p->second.reported;
+
if ((pg_map.pg_stat.count(pgid) &&
pg_map.pg_stat[pgid].reported > p->second.reported)) {
dout(15) << " had " << pgid << " from " << pg_map.pg_stat[pgid].reported << dendl;
pg_map.stat_pg_add(pgid, pg_map.pg_stat[pgid]);
}
+ paxos->wait_for_commit(new C_Stats(this, ack, stats->get_source_inst()));
+ delete stats;
return true;
}
+void PGMonitor::_updated_stats(MPGStatsAck *ack, entity_inst_t who)
+{
+ dout(7) << "_updated_stats for " << who << dendl;
+ mon->messenger->send_message(ack, who);
+}
int nrep = mon->osdmon->osdmap.pg_to_acting_osds(on, acting);
if (!nrep) {
dout(20) << "send_pg_creates " << pgid << " -> no osds in epoch "
- << pg_map.pg_stat[pgid].created << ", skipping" << dendl;
+ << mon->osdmon->osdmap.get_epoch() << ", skipping" << dendl;
continue; // blarney!
}
int osd = acting[0];
ss << "got pgmap version " << pg_map.version;
r = 0;
}
+ else if (m->cmd[1] == "send_pg_creates") {
+ send_pg_creates();
+ ss << "sent pg creates ";
+ r = 0;
+ }
}
if (r != -1) {
#include "PGMap.h"
class MPGStats;
+class MPGStatsAck;
class MStatfs;
class MMonCommand;
bool prepare_update(Message *m);
bool prepare_pg_stats(MPGStats *stats);
+ void _updated_stats(MPGStatsAck *ack, entity_inst_t who);
+
+ struct C_Stats : public Context {
+ PGMonitor *pgmon;
+ MPGStatsAck *ack;
+ entity_inst_t who;
+ C_Stats(PGMonitor *p, MPGStatsAck *a, entity_inst_t w) : pgmon(p), ack(a), who(w) {}
+ void finish(int r) {
+ pgmon->_updated_stats(ack, who);
+ }
+ };
void handle_statfs(MStatfs *statfs);
#include "messages/MGenericMessage.h"
#include "messages/MPGStats.h"
+#include "messages/MPGStatsAck.h"
#include "messages/MStatfs.h"
#include "messages/MStatfsReply.h"
case MSG_PGSTATS:
m = new MPGStats;
break;
+ case MSG_PGSTATSACK:
+ m = new MPGStatsAck;
+ break;
case CEPH_MSG_STATFS:
m = new MStatfs;
#define MSG_OSD_PG_REMOVE 84
#define MSG_OSD_PG_INFO 85
-// CEPH_MSG_PGSTATS 87
+#define MSG_PGSTATS 86
+#define MSG_PGSTATSACK 87
+
#define MSG_OSD_PG_CREATE 88
-#define MSG_PGSTATS 86
#include "messages/MOSDAlive.h"
#include "messages/MPGStats.h"
+#include "messages/MPGStatsAck.h"
#include "common/Logger.h"
#include "common/LogType.h"
memset(&my_stat, 0, sizeof(my_stat));
- last_sent_alive = 0;
+ booting = boot_pending = false;
+ up_thru_wanted = up_thru_pending = 0;
+ osd_stat_updated = osd_stat_pending = false;
stat_ops = 0;
stat_qlen = 0;
messenger->set_dispatcher(this);
// announce to monitor i exist and have booted.
- int mon = monmap->pick_mon();
- messenger->send_message(new MOSDBoot(messenger->get_myinst(), superblock), monmap->get_inst(mon));
+ booting = true;
+ do_mon_report(); // start mon report timer
- // start the heart
+ // start the heartbeat
timer.add_event_after(g_conf.osd_heartbeat_interval, new C_Heartbeat(this));
- // and stat beacon
- timer.add_event_after(g_conf.osd_pg_stats_interval, new C_Stats(this));
-
return 0;
}
}
-void OSD::send_alive(epoch_t need)
-{
- if (need > last_sent_alive) {
- last_sent_alive = osdmap->get_epoch();
- /* AAHHH FIXME, may need to retry */
- int mon = monmap->pick_mon();
- messenger->send_message(new MOSDAlive(osdmap->get_epoch()),
- monmap->get_inst(mon));
- }
-}
// -------------------------------------
} else
heartbeat_from_stamp[*p] = now; // fake initial
}
- maybe_report_failures();
if (logger) logger->set("hbto", heartbeat_to.size());
if (logger) logger->set("hbfrom", heartbeat_from.size());
timer.add_event_after(wait, new C_Heartbeat(this));
}
-void OSD::maybe_report_failures()
+
+
+// =========================================
+
+void OSD::do_mon_report()
{
- if (pending_failures.empty())
- return; // nothing to report
+ dout(7) << "do_mon_report" << dendl;
- utime_t now = g_clock.now();
- if (last_failure_report + g_conf.osd_failure_report_interval > now)
- return; // not yet
+ last_mon_report = g_clock.now();
+
+ // are prior reports still pending?
+ bool retry = false;
+ if (boot_pending) {
+ dout(10) << "boot still pending" << dendl;
+ retry = true;
+ }
+ if (osdmap->exists(whoami) &&
+ up_thru_pending > osdmap->get_up_thru(whoami)) {
+ dout(10) << "up_thru_pending " << up_thru_pending << " > " << osdmap->get_up_thru(whoami)
+ << " -- still pending" << dendl;
+ retry = true;
+ }
+ pg_stat_queue_lock.Lock();
+ if (!pg_stat_pending.empty() || osd_stat_pending) {
+ dout(10) << "requeueing pg_stat_pending" << dendl;
+ retry = true;
+ osd_stat_updated = osd_stat_updated || osd_stat_pending;
+ osd_stat_pending = false;
+ for (map<pg_t,eversion_t>::iterator p = pg_stat_pending.begin();
+ p != pg_stat_pending.end();
+ p++)
+ if (pg_stat_queue.count(p->first) == 0) // _queue value will always be >= _pending
+ pg_stat_queue[p->first] = p->second;
+ pg_stat_pending.clear();
+ }
+ pg_stat_queue_lock.Unlock();
+
+ if (retry) {
+ int oldmon = monmap->pick_mon();
+ messenger->mark_down(monmap->get_inst(oldmon).addr);
+ int mon = monmap->pick_mon(true);
+ dout(10) << "marked down old mon" << oldmon << ", chose new mon" << mon << dendl;
+ }
+
+ // do any pending reports
+ if (booting)
+ send_boot();
+ send_alive();
+ send_failures();
+ send_pg_stats();
- last_failure_report = now;
+ // reschedule
+ timer.add_event_after(g_conf.osd_mon_report_interval, new C_MonReport(this));
+}
+void OSD::send_boot()
+{
int mon = monmap->pick_mon();
- for (set<int>::iterator p = pending_failures.begin();
- p != pending_failures.end();
- p++)
+ dout(10) << "send_boot to mon" << mon << dendl;
+ messenger->send_message(new MOSDBoot(messenger->get_myinst(), superblock),
+ monmap->get_inst(mon));
+}
+
+void OSD::queue_want_up_thru(epoch_t want)
+{
+ if (want > up_thru_wanted) {
+ up_thru_wanted = want;
+
+ // expedite, a bit. WARNING this will somewhat delay other mon queries.
+ last_mon_report = g_clock.now();
+ send_alive();
+ }
+}
+
+void OSD::send_alive()
+{
+ if (!osdmap->exists(whoami))
+ return;
+ epoch_t up_thru = osdmap->get_up_thru(whoami);
+ if (up_thru_wanted > up_thru) {
+ up_thru_pending = up_thru_wanted;
+ int mon = monmap->pick_mon();
+ dout(10) << "send_alive to mon" << mon << " (want " << up_thru_wanted << ")" << dendl;
+ messenger->send_message(new MOSDAlive(osdmap->get_epoch()),
+ monmap->get_inst(mon));
+ }
+}
+
+void OSD::send_failures()
+{
+ int mon = monmap->pick_mon();
+ while (!failure_queue.empty()) {
+ int osd = *failure_queue.begin();
messenger->send_message(new MOSDFailure(monmap->fsid, messenger->get_myinst(),
- osdmap->get_inst(*p), osdmap->get_epoch()),
+ osdmap->get_inst(osd), osdmap->get_epoch()),
monmap->get_inst(mon));
- pending_failures.clear();
+ failure_queue.erase(osd);
+ }
}
void OSD::send_pg_stats()
{
- //dout(-10) << "send_pg_stats" << dendl;
- bool updated;
+ dout(10) << "send_pg_stats" << dendl;
// grab queue
- set<pg_t> q;
+ assert(pg_stat_pending.empty());
pg_stat_queue_lock.Lock();
- q.swap(pg_stat_queue);
- updated = osd_stat_updated;
+ pg_stat_pending.swap(pg_stat_queue);
+ osd_stat_pending = osd_stat_updated;
osd_stat_updated = false;
pg_stat_queue_lock.Unlock();
-
- if (!q.empty() || osd_stat_updated) {
- dout(1) << "send_pg_stats - " << q.size() << " pgs updated" << dendl;
+
+ if (!pg_stat_pending.empty() || osd_stat_pending) {
+ dout(1) << "send_pg_stats - " << pg_stat_pending.size() << " pgs updated" << dendl;
MPGStats *m = new MPGStats;
- while (!q.empty()) {
- pg_t pgid = *q.begin();
- q.erase(q.begin());
+ for (map<pg_t,eversion_t>::iterator p = pg_stat_pending.begin();
+ p != pg_stat_pending.end();
+ p++) {
+ pg_t pgid = p->first;
- if (!pg_map.count(pgid)) continue;
+ if (!pg_map.count(pgid))
+ continue;
PG *pg = pg_map[pgid];
pg->pg_stats_lock.Lock();
m->pg_stat[pgid] = pg->pg_stats;
int mon = monmap->pick_mon();
messenger->send_message(m, monmap->get_inst(mon));
}
+}
- // reschedule
- timer.add_event_after(g_conf.osd_pg_stats_interval, new C_Stats(this));
+void OSD::handle_pgstats_ack(MPGStatsAck *ack)
+{
+ dout(10) << "handle_pgstats_ack " << dendl;
+
+ for (map<pg_t,eversion_t>::iterator p = ack->pg_stat.begin();
+ p != ack->pg_stat.end();
+ p++) {
+ if (pg_stat_pending.count(p->first) == 0) {
+ dout(10) << "ignoring " << p->first << " " << p->second << dendl;
+ } else if (pg_stat_pending[p->first] <= p->second) {
+ dout(10) << "ack on " << p->first << " " << p->second << dendl;
+ pg_stat_pending.erase(p->first);
+ } else {
+ dout(10) << "still pending " << p->first << " " << pg_stat_pending[p->first]
+ << " > acked " << p->second << dendl;
+ }
+ }
+
+ if (pg_stat_pending.empty()) {
+ dout(10) << "clearing osd_stat_pending" << dendl;
+ osd_stat_pending = false;
+ }
+
+ delete ack;
}
shutdown();
delete m;
break;
+
+ case MSG_PGSTATSACK:
+ handle_pgstats_ack((MPGStatsAck*)m);
+ break;
return;
}
- if (dest.is_mon()) {
- if (m->get_type() == MSG_PGSTATS) {
- MPGStats *pgstats = (MPGStats*)m;
- dout(10) << "ms_handle_failure on " << *m << ", requeuing pg stats" << dendl;
- pg_stat_queue_lock.Lock();
- for (map<pg_t,pg_stat_t>::iterator p = pgstats->pg_stat.begin();
- p != pgstats->pg_stat.end();
- p++)
- pg_stat_queue.insert(p->first);
- pg_stat_queue_lock.Unlock();
- }
- }
-
dout(1) << "ms_handle_failure " << inst
<< ", dropping " << *m << dendl;
delete m;
{
messenger->mark_down(osdmap->get_addr(osd));
peer_map_epoch.erase(entity_name_t::OSD(osd));
- pending_failures.erase(osd);
+ failure_queue.erase(osd);
+ failure_pending.erase(osd);
heartbeat_from_stamp.erase(osd);
}
void OSD::note_up_osd(int osd)
return;
}
+ booting = boot_pending = false;
+
wait_for_no_ops();
assert(osd_lock.is_locked());
};
- // -- alive --
- epoch_t last_sent_alive;
- void send_alive(epoch_t need);
-
// -- stats --
DecayCounter stat_oprate;
int stat_ops; // ops since last heartbeat
void split_pg(PG *parent, map<pg_t,PG*>& children, ObjectStore::Transaction &t);
- // -- pg stats --
- Mutex pg_stat_queue_lock;
- set<pg_t> pg_stat_queue;
- bool osd_stat_updated;
+ // == monitor interaction ==
+ utime_t last_mon_report;
+
+ void do_mon_report();
- class C_Stats : public Context {
+ struct C_MonReport : public Context {
OSD *osd;
- public:
- C_Stats(OSD *o) : osd(o) {}
- void finish(int r) {
- osd->send_pg_stats();
+ C_MonReport(OSD *o) : osd(o) {}
+ void finish(int r) {
+ osd->do_mon_report();
}
};
- void send_pg_stats();
+ // -- boot --
+ bool booting, boot_pending;
+
+ void send_boot();
+
+ // -- alive --
+ epoch_t up_thru_wanted;
+ epoch_t up_thru_pending;
+
+ void queue_want_up_thru(epoch_t want);
+ void send_alive();
// -- failures --
- set<int> pending_failures;
- utime_t last_failure_report;
+ set<int> failure_queue;
+ set<int> failure_pending;
+
void queue_failure(int n) {
- pending_failures.insert(n);
+ failure_queue.insert(n);
}
- void maybe_report_failures();
+ void send_failures();
+ void handle_pgstats_ack(class MPGStatsAck *ack);
+
+ // -- pg stats --
+ Mutex pg_stat_queue_lock;
+ map<pg_t,eversion_t> pg_stat_queue;
+ map<pg_t,eversion_t> pg_stat_pending;
+ bool osd_stat_updated;
+ bool osd_stat_pending;
+
+ void send_pg_stats();
+
+
// -- tids --
// for ops i issue
crush.set_type_name(1, "domain");
crush.set_type_name(2, "pool");
+ int minrep = g_conf.osd_min_rep;
int ndom = MAX(g_conf.osd_max_rep, g_conf.osd_max_raid_width);
- if (num_osd >= ndom*2) {
+ if (num_osd >= ndom*3 &&
+ num_osd > 8) {
int ritems[ndom];
int rweights[ndom];
// rules
// replication
for (int pool=0; pool<1; pool++) {
- // size 1..ndom
- crush_rule *rule = crush_make_rule(4, pool, CEPH_PG_TYPE_REP, 1, ndom);
+ // size minrep..ndom
+ crush_rule *rule = crush_make_rule(4, pool, CEPH_PG_TYPE_REP, minrep, ndom);
crush_rule_set_step(rule, 0, CRUSH_RULE_TAKE, rootid, 0);
crush_rule_set_step(rule, 1, CRUSH_RULE_CHOOSE_FIRSTN, CRUSH_CHOOSE_N, 1); // choose N domains
crush_rule_set_step(rule, 2, CRUSH_RULE_CHOOSE_FIRSTN, 1, 0); // and 1 device in each
// replication
for (int pool=0; pool<1; pool++) {
- crush_rule *rule = crush_make_rule(3, pool, CEPH_PG_TYPE_REP, 1, g_conf.osd_max_rep);
+ crush_rule *rule = crush_make_rule(3, pool, CEPH_PG_TYPE_REP, g_conf.osd_min_rep, g_conf.osd_max_rep);
crush_rule_set_step(rule, 0, CRUSH_RULE_TAKE, rootid, 0);
crush_rule_set_step(rule, 1, CRUSH_RULE_CHOOSE_FIRSTN, CRUSH_CHOOSE_N, 0);
crush_rule_set_step(rule, 2, CRUSH_RULE_EMIT, 0, 0);
dout(10) << "up_thru " << osd->osdmap->get_up_thru(osd->whoami)
<< " < same_since " << info.history.same_since
<< ", must notify monitor" << dendl;
- osd->send_alive(info.history.same_since);
+ osd->queue_want_up_thru(info.history.same_since);
return;
} else {
dout(10) << "up_thru " << osd->osdmap->get_up_thru(osd->whoami)
// put in osd stat_queue
osd->pg_stat_queue_lock.Lock();
if (is_primary())
- osd->pg_stat_queue.insert(info.pgid);
+ osd->pg_stat_queue[info.pgid] = info.last_update;
osd->osd_stat_updated = true;
osd->pg_stat_queue_lock.Unlock();
}
ARGS="-d"
# start monitor
-$CEPH_BIN/cmon $ARGS mondata/mon0 --debug_mon 10 --debug_ms 1
+$CEPH_BIN/cmon $ARGS mondata/mon0 --debug_mon 20 --debug_ms 1
# build and inject an initial osd map
-$CEPH_BIN/osdmaptool --clobber --createsimple .ceph_monmap 4 --print .ceph_osdmap
+$CEPH_BIN/osdmaptool --clobber --createsimple .ceph_monmap 8 --print .ceph_osdmap --pgbits 2
$CEPH_BIN/cmonctl osd setmap -i .ceph_osdmap
-for osd in 0 1 #2 3
+for osd in 0 1 2 3 #4 5 6 7
do
$CEPH_BIN/cosd --mkfs_for_osd $osd dev/osd$osd # initialize empty object store
$CEPH_BIN/cosd $ARGS dev/osd$osd --debug_ms 1 --debug_osd 20 --debug_fakestore 10 #--debug_osd 40