}
mon->store->put_int(osdmap.epoch, "osdmap_full","last_epoch");
- // kick pgmon, in case there are pg creations going on
- mon->pgmon->register_new_pgs();
- mon->pgmon->send_pg_creates();
+ // kick pgmon, make sure it's seen the latest map
+ mon->pgmon->check_osd_map(osdmap.epoch);
// new map!
bcast_latest_mds();
bool should_propose(double &delay);
// ...
- bool get_map_bl(epoch_t epoch, bufferlist &bl);
- bool get_inc_map_bl(epoch_t epoch, bufferlist &bl);
-
void send_to_waiting(); // send current map to waiters.
void send_full(entity_inst_t dest);
void send_incremental(entity_inst_t dest, epoch_t since);
public:
// the map
version_t version;
+ epoch_t last_osdmap_epoch; // last osdmap epoch i applied to the pgmap
epoch_t last_pg_scan; // osdmap epoch
hash_map<pg_t,pg_stat_t> pg_stat;
hash_map<int,osd_stat_t> osd_stat;
version_t version;
map<pg_t,pg_stat_t> pg_stat_updates;
map<int,osd_stat_t> osd_stat_updates;
+ set<int> osd_stat_rm;
+ epoch_t osdmap_epoch;
epoch_t pg_scan; // osdmap epoch
void _encode(bufferlist &bl) {
::_encode(version, bl);
::_encode(pg_stat_updates, bl);
::_encode(osd_stat_updates, bl);
+ ::_encode(osd_stat_rm, bl);
+ ::_encode(osdmap_epoch, bl);
::_encode(pg_scan, bl);
}
void _decode(bufferlist& bl, int& off) {
::_decode(version, bl, off);
::_decode(pg_stat_updates, bl, off);
::_decode(osd_stat_updates, bl, off);
+ ::_decode(osd_stat_rm, bl, off);
+ ::_decode(osdmap_epoch, bl, off);
::_decode(pg_scan, bl, off);
}
- Incremental() : version(0), pg_scan(0) {}
+ Incremental() : version(0), osdmap_epoch(0), pg_scan(0) {}
};
void apply_incremental(Incremental& inc) {
osd_stat[p->first] = p->second;
stat_osd_add(p->second);
}
+ for (set<int>::iterator p = inc.osd_stat_rm.begin();
+ p != inc.osd_stat_rm.end();
+ p++)
+ if (osd_stat.count(*p)) {
+ stat_osd_sub(osd_stat[*p]);
+ osd_stat.erase(*p);
+ }
+ if (inc.osdmap_epoch)
+ last_osdmap_epoch = inc.osdmap_epoch;
if (inc.pg_scan)
last_pg_scan = inc.pg_scan;
}
uint64_t total_used_kb() { return total_kb() - total_avail_kb(); }
PGMap() : version(0),
- last_pg_scan(0),
+ last_osdmap_epoch(0), last_pg_scan(0),
num_pg(0),
total_pg_num_bytes(0),
total_pg_num_blocks(0),
::_encode(version, bl);
::_encode(pg_stat, bl);
::_encode(osd_stat, bl);
+ ::_encode(last_osdmap_epoch, bl);
+ ::_encode(last_pg_scan, bl);
}
void _decode(bufferlist& bl, int& off) {
::_decode(version, bl, off);
::_decode(pg_stat, bl, off);
::_decode(osd_stat, bl, off);
+ ::_decode(last_osdmap_epoch, bl, off);
+ ::_decode(last_pg_scan, bl, off);
stat_zero();
for (hash_map<pg_t,pg_stat_t>::iterator p = pg_stat.begin();
p != pg_stat.end();
// ------------------------
-struct RetryRegisterNewPgs : public Context {
+struct RetryCheckOSDMap : public Context {
PGMonitor *pgmon;
- RetryRegisterNewPgs(PGMonitor *p) : pgmon(p) {}
+ epoch_t epoch;
+ RetryCheckOSDMap(PGMonitor *p, epoch_t e) : pgmon(p), epoch(e) {}
void finish(int r) {
- pgmon->register_new_pgs();
+ pgmon->check_osd_map(epoch);
}
};
-void PGMonitor::register_new_pgs()
+void PGMonitor::check_osd_map(epoch_t epoch)
{
if (mon->is_peon())
return; // whatever.
+ if (pg_map.last_osdmap_epoch >= epoch) {
+ dout(10) << "check_osd_map already seen " << pg_map.last_osdmap_epoch << " >= " << epoch << dendl;
+ return;
+ }
+
if (!mon->osdmon->paxos->is_readable()) {
dout(10) << "register_new_pgs -- osdmap not readable, waiting" << dendl;
- mon->osdmon->paxos->wait_for_readable(new RetryRegisterNewPgs(this));
+ mon->osdmon->paxos->wait_for_readable(new RetryCheckOSDMap(this, epoch));
return;
}
if (!paxos->is_writeable()) {
dout(10) << "register_new_pgs -- pgmap not writeable, waiting" << dendl;
- paxos->wait_for_writeable(new RetryRegisterNewPgs(this));
+ paxos->wait_for_writeable(new RetryCheckOSDMap(this, epoch));
return;
}
+ // apply latest map(s)
+ for (epoch_t e = pg_map.last_osdmap_epoch+1;
+ e <= epoch;
+ e++) {
+ dout(10) << "check_osd_map applying osdmap e" << e << " to pg_map" << dendl;
+ bufferlist bl;
+ mon->store->get_bl_sn(bl, "osdmap", e);
+ assert(bl.length());
+ OSDMap::Incremental inc;
+ int off = 0;
+ inc.decode(bl, off);
+ for (map<int32_t,uint32_t>::iterator p = inc.new_offload.begin();
+ p != inc.new_offload.end();
+ p++)
+ if (p->second == 0x10000) {
+ dout(10) << "check_osd_map osd" << p->first << " went OUT" << dendl;
+ pending_inc.osd_stat_rm.insert(p->first);
+ } else {
+ dout(10) << "check_osd_map osd" << p->first << " is IN" << dendl;
+ pending_inc.osd_stat_rm.erase(p->first);
+ pending_inc.osd_stat_updates[p->first];
+ }
+ }
+
+ bool propose = false;
+ if (pg_map.last_osdmap_epoch < epoch) {
+ pending_inc.osdmap_epoch = epoch;
+ propose = true;
+ }
+
+ // scan pg space?
+ if (register_new_pgs())
+ propose = true;
+
+ if (propose)
+ propose_pending();
+
+ send_pg_creates();
+}
+
+bool PGMonitor::register_new_pgs()
+{
+
+
dout(10) << "osdmap last_pg_change " << mon->osdmon->osdmap.get_last_pg_change()
<< ", pgmap last_pg_scan " << pg_map.last_pg_scan << dendl;
if (mon->osdmon->osdmap.get_last_pg_change() <= pg_map.last_pg_scan ||
mon->osdmon->osdmap.get_last_pg_change() <= pending_inc.pg_scan) {
dout(10) << "register_new_pgs -- i've already scanned pg space since last significant osdmap update" << dendl;
- return;
+ return false;
}
// iterate over crush mapspace
if (created) {
last_sent_pg_create.clear(); // reset pg_create throttle timer
pending_inc.pg_scan = epoch;
- propose_pending();
+ return true;
}
+ return false;
}
void PGMonitor::send_pg_creates()
map<int,utime_t> last_sent_pg_create; // per osd throttle
+ bool register_new_pgs();
+ void send_pg_creates();
+
public:
PGMonitor(Monitor *mn, Paxos *p) : PaxosService(mn, p) { }
void tick(); // check state, take actions
- void register_new_pgs();
- void send_pg_creates();
+ void check_osd_map(epoch_t epoch);