}
mon->store->put_int(osdmap.epoch, "osdmap_full","last_epoch");
- // mkpg flag set?
- if (osdmap.is_creating_pgs()) {
- mon->pgmon->register_new_pgs();
- mon->pgmon->send_pg_creates();
- }
+ // kick pgmon, in case there are pg creations going on
+ mon->pgmon->register_new_pgs();
+ mon->pgmon->send_pg_creates();
// new map!
bcast_latest_mds();
};
*/
-void OSDMonitor::try_clear_mkpg_flag()
-{
- if (paxos->is_writeable()) {
- dout(10) << "clear_mkpg_flag" << dendl;
- pending_inc.mkpg |= OSDMap::Incremental::MKPG_FINISH;
- //} else if (!mon->is_peon()) {
- //dout(10) << "clear_mkpg_flag -- waiting for writeable" << dendl;
- //paxos->wait_for_writeable(new RetryClearMkpg(this));
- }
-}
-
void OSDMonitor::encode_pending(bufferlist &bl)
{
dout(10) << "encode_pending e " << pending_inc.epoch
// finalize up pending_inc
pending_inc.ctime = g_clock.now();
- if ((pending_inc.mkpg & OSDMap::Incremental::MKPG_START) == 0 &&
- (pending_inc.crush.length() ||
- pending_inc.fullmap.length() ||
- pending_inc.new_pg_num ||
- pending_inc.new_localized_pg_num)) {
- dout(2) << " setting mkpg flag" << dendl;
- pending_inc.mkpg |= OSDMap::Incremental::MKPG_START;
- }
-
+
// tell me about it
for (map<int32_t,uint8_t>::iterator i = pending_inc.new_down.begin();
i != pending_inc.new_down.end();
void mark_all_down();
- void try_clear_mkpg_flag();
-
void send_latest(entity_inst_t i, epoch_t start=0);
void fake_osd_failure(int osd, bool down);
public:
// the map
version_t version;
- epoch_t last_mkpg_scan; // osdmap epoch
+ epoch_t last_pg_scan; // osdmap epoch
hash_map<pg_t,pg_stat_t> pg_stat;
hash_map<int,osd_stat_t> osd_stat;
version_t version;
map<pg_t,pg_stat_t> pg_stat_updates;
map<int,osd_stat_t> osd_stat_updates;
- epoch_t mkpg_scan; // osdmap epoch
+ epoch_t pg_scan; // osdmap epoch
void _encode(bufferlist &bl) {
::_encode(version, bl);
::_encode(pg_stat_updates, bl);
::_encode(osd_stat_updates, bl);
- ::_encode(mkpg_scan, bl);
+ ::_encode(pg_scan, bl);  // renamed from mkpg_scan; still written last, the same position _decode reads it from
}
void _decode(bufferlist& bl, int& off) {
::_decode(version, bl, off);
::_decode(pg_stat_updates, bl, off);
::_decode(osd_stat_updates, bl, off);
- ::_decode(mkpg_scan, bl, off);
+ ::_decode(pg_scan, bl, off);  // renamed from mkpg_scan; read last, mirroring the field order used by _encode
}
- Incremental() : version(0), mkpg_scan(0) {}
+ Incremental() : version(0), pg_scan(0) {}  // pg_scan == 0 is treated as "no scan recorded": apply_incremental only copies a non-zero value
};
void apply_incremental(Incremental& inc) {
osd_stat[p->first] = p->second;
stat_osd_add(p->second);
}
- if (inc.mkpg_scan)
- last_mkpg_scan = inc.mkpg_scan;
+ if (inc.pg_scan)
+ last_pg_scan = inc.pg_scan;
}
// aggregate stats (soft state)
int64_t total_osd_num_blocks_avail;
int64_t total_osd_num_objects;
- set<pg_t> creating_pgs;
+ set<pg_t> creating_pgs; // lru: front = new additions, back = recently pinged
void stat_zero() {
num_pg = 0;
}
PGMap() : version(0),
- last_mkpg_scan(0),
+ last_pg_scan(0),
num_pg(0),
total_pg_num_bytes(0),
total_pg_num_blocks(0),
pg_map._encode(bl);
mon->store->put_bl_ss(bl, "pgmap", "latest");
- // not creating pgs?
- if (mon->osdmon->osdmap.is_creating_pgs() &&
- pg_map.creating_pgs.empty()) {
- // this may not work if osdmap not currently writeable.. but we'll get it eventually!
- mon->osdmon->try_clear_mkpg_flag();
- } else {
- send_pg_creates();
- }
+ send_pg_creates();
return true;
}
void PGMonitor::register_new_pgs()
{
- if (mon->is_peon()) return; // whatever.
+ if (mon->is_peon())
+ return; // whatever.
+
if (!mon->osdmon->paxos->is_readable()) {
dout(10) << "register_new_pgs -- osdmap not readable, waiting" << dendl;
mon->osdmon->paxos->wait_for_readable(new RetryRegisterNewPgs(this));
return;
}
+
if (!paxos->is_writeable()) {
dout(10) << "register_new_pgs -- pgmap not writeable, waiting" << dendl;
paxos->wait_for_writeable(new RetryRegisterNewPgs(this));
return;
}
- dout(10) << "map pg_creating_from = " << mon->osdmon->osdmap.get_creating_pgs_from()
- << " vs last_mkpg_scan " << pg_map.last_mkpg_scan
- << dendl;
- if (mon->osdmon->osdmap.get_creating_pgs_from() <=
- pg_map.last_mkpg_scan) {
+ dout(10) << "osdmap last_pg_change " << mon->osdmon->osdmap.get_last_pg_change()
+ << ", pgmap last_pg_scan " << pg_map.last_pg_scan << dendl;
+ if (mon->osdmon->osdmap.get_last_pg_change() <= pg_map.last_pg_scan ||
+ mon->osdmon->osdmap.get_last_pg_change() <= pending_inc.pg_scan) {
dout(10) << "register_new_pgs -- i've already scanned pg space since last significant osdmap update" << dendl;
return;
}
}
dout(10) << "register_new_pgs registered " << created << " new pgs" << dendl;
if (created) {
- last_pg_create.clear(); // reset pg_create throttle timer
- pending_inc.mkpg_scan = epoch;
+ last_sent_pg_create.clear(); // reset pg_create throttle timer
+ pending_inc.pg_scan = epoch;
propose_pending();
}
}
-void PGMonitor::send_pg_creates(int onlyosd)
+void PGMonitor::send_pg_creates()
{
dout(10) << "send_pg_creates to " << pg_map.creating_pgs.size() << " pgs" << dendl;
+
map<int, MOSDPGCreate*> msg;
+ utime_t now = g_clock.now();
for (set<pg_t>::iterator p = pg_map.creating_pgs.begin();
p != pg_map.creating_pgs.end();
pg_t pgid = *p;
vector<int> acting;
int nrep = mon->osdmon->osdmap.pg_to_acting_osds(pgid, acting);
- if (!nrep) {
- dout(20) << "send_pg_creates " << pgid << " -> no up devices, skipping" << dendl;
+ if (!nrep)
continue; // blarney!
- }
int osd = acting[0];
- if (onlyosd >= 0 && osd != onlyosd) continue;
- dout(20) << "send_pg_creates " << pgid << " -> osd" << osd << dendl;
+
+ // throttle?
+ if (last_sent_pg_create.count(osd) &&
+ now - g_conf.mon_pg_create_interval < last_sent_pg_create[osd])
+ continue;
+
+ dout(20) << "send_pg_creates " << pgid << " -> osd" << osd
+ << " in epoch " << pg_map.pg_stat[pgid].created << dendl;
if (msg.count(osd) == 0)
msg[osd] = new MOSDPGCreate(mon->osdmon->osdmap.get_epoch());
msg[osd]->mkpg[pgid] = pg_map.pg_stat[pgid].created;
}
- utime_t now = g_clock.now();
for (map<int, MOSDPGCreate*>::iterator p = msg.begin();
p != msg.end();
p++) {
- if (now - g_conf.mon_pg_create_interval < last_pg_create[p->first]) {
- dout(10) << "NOT sending pg_create to osd" << p->first << dendl;
- continue; // throttle
- } else {
- dout(10) << "sending pg_create to osd" << p->first << dendl;
- mon->messenger->send_message(p->second, mon->osdmon->osdmap.get_inst(p->first));
- last_pg_create[p->first] = g_clock.now();
- }
+ dout(10) << "sending pg_create to osd" << p->first << dendl;
+ mon->messenger->send_message(p->second, mon->osdmon->osdmap.get_inst(p->first));
+ last_sent_pg_create[p->first] = g_clock.now();
}
}
void handle_statfs(MStatfs *statfs);
bool handle_pg_stats(MPGStats *stats);
- map<int,utime_t> last_pg_create; // per osd throttle
+ map<int,utime_t> last_sent_pg_create; // per osd throttle
public:
PGMonitor(Monitor *mn, Paxos *p) : PaxosService(mn, p) { }
void register_new_pgs();
- void send_pg_creates(int onlyosd=-1);
+ void send_pg_creates();
};
dout(10) << "mkpg " << pgid << " creating now" << dendl;
try_create_pg(pgid, t);
} else {
- dout(10) << "mkpg " << pgid << " querying priors" << dendl;
+ dout(10) << "mkpg " << pgid << " e " << created << " : querying priors" << dendl;
assert(0);
}
}
epoch_t epoch; // new epoch; we are a diff from epoch-1 to epoch
utime_t ctime;
+ bool is_pg_change() {  // true when any of the four fields below is non-empty / non-zero; OSDMap::apply_incremental uses this to stamp last_pg_change
+ return (fullmap.length() ||  // incremental carries a full map
+ crush.length() ||  // incremental carries a crush map
+ new_pg_num ||  // non-zero new_pg_num
+ new_localized_pg_num);  // non-zero new_localized_pg_num
+ }
+
// full (rare)
bufferlist fullmap; // in leiu of below.
bufferlist crush;
map<pg_t,uint32_t> new_pg_swap_primary;
list<pg_t> old_pg_swap_primary;
- static const __u8 MKPG_START = 1;
- static const __u8 MKPG_FINISH = 2;
- __u8 mkpg;
-
void encode(bufferlist& bl) {
::_encode(fsid, bl);
::_encode(epoch, bl);
::_encode(new_offload, bl);
::_encode(new_pg_swap_primary, bl);
::_encode(old_pg_swap_primary, bl);
- ::_encode(mkpg, bl);
}
void decode(bufferlist& bl, int& off) {
::_decode(fsid, bl, off);
::_decode(new_offload, bl, off);
::_decode(new_pg_swap_primary, bl, off);
::_decode(old_pg_swap_primary, bl, off);
- ::_decode(mkpg, bl, off);
}
- Incremental(epoch_t e=0) : epoch(e), new_max_osd(-1), new_pg_num(0), new_localized_pg_num(0), mkpg(false) {
+ Incremental(epoch_t e=0) : epoch(e), new_max_osd(-1), new_pg_num(0), new_localized_pg_num(0) {  // mkpg flag member dropped; zero pg_num fields do not trigger is_pg_change() (fullmap/crush presumably start empty -- confirm)
fsid.major = fsid.minor = cpu_to_le64(0);
}
};
int32_t localized_pg_num_mask; // ditto
// new pgs
- epoch_t creating_pgs_from; // most recent epoch initiating possible pg creation
+ epoch_t last_pg_change; // most recent epoch initiating possible pg creation
int32_t max_osd;
vector<uint8_t> osd_state;
public:
OSDMap() : epoch(0),
pg_num(0), localized_pg_num(0),
- creating_pgs_from(0),
+ last_pg_change(0),
max_osd(0) {
fsid.major = fsid.minor = cpu_to_le64(0);
calc_pg_masks();
const utime_t& get_ctime() const { return ctime; }
const utime_t& get_mtime() const { return mtime; }
- bool is_creating_pgs() const {
- return creating_pgs_from;
- }
- epoch_t get_creating_pgs_from() const {
- return creating_pgs_from;
+ epoch_t get_last_pg_change() const {  // accessor replacing is_creating_pgs()/get_creating_pgs_from()
+ return last_pg_change;  // assigned in apply_incremental when inc.is_pg_change() holds, or read back by decode
}
/***** cluster state *****/
ctime = inc.ctime;
// full map?
- if (inc.fullmap.length())
+ if (inc.fullmap.length())
decode(inc.fullmap);
-
- // mkpg flags?
- if (inc.mkpg & Incremental::MKPG_FINISH)
- creating_pgs_from = 0;
- if (inc.mkpg & Incremental::MKPG_START)
- creating_pgs_from = epoch;
+
+ if (inc.is_pg_change())
+ last_pg_change = epoch;
if (inc.fullmap.length())
return;
::_encode(mtime, blist);
::_encode(pg_num, blist);
::_encode(localized_pg_num, blist);
- ::_encode(creating_pgs_from, blist);
+ ::_encode(last_pg_change, blist);
::_encode(max_osd, blist);
::_encode(osd_state, blist);
::_decode(localized_pg_num, blist, off);
calc_pg_masks();
- ::_decode(creating_pgs_from, blist, off);
+ ::_decode(last_pg_change, blist, off);
::_decode(max_osd, blist, off);
::_decode(osd_state, blist, off);
ARGS="-d --bind $IP -o out --debug_ms 1"
# start monitor
-$CEPH_BIN/cmon $ARGS mondata/mon0 --debug_mon 10 --debug_ms 1
+$CEPH_BIN/cmon $ARGS mondata/mon0 --debug_mon 20 --debug_ms 1
# build and inject an initial osd map
$CEPH_BIN/osdmaptool --clobber --createsimple .ceph_monmap 4 --print .ceph_osdmap