*p += 4; /* skip length field (should match max) */
ceph_decode_copy(p, map->osd_addr, map->max_osd*sizeof(*map->osd_addr));
- *p += sizeof(u32) + map->max_osd * sizeof(u32); /* osd_up_from */
- *p += sizeof(u32) + map->max_osd * sizeof(u32); /* osd_up_thru */
-
- /* ignore pg primary swapping */
- ceph_decode_32_safe(p, end, len, bad);
- p += len * (sizeof(u64) + sizeof(u32));
- if (len)
- derr(0, "WARNING: pg primary swaps in osdmap e%d unsupported\n",
- map->epoch);
-
- /* ignore max_snap, removed_snaps */
- *p += sizeof(u64);
- ceph_decode_32_safe(p, end, len, bad);
- *p += len * 2 * sizeof(u64);
-
/* crush */
ceph_decode_32_safe(p, end, len, bad);
dout(30, "osdmap_decode crush len %d from off 0x%x\n", len,
goto bad;
}
+ /* ignore the rest of the map */
+ *p = end;
+
dout(30, "osdmap_decode done %p %p\n", *p, end);
- if (*p != end)
- goto bad;
return map;
bad:
map->osd_weight[osd] = off;
}
- /* skip new_up_thru */
- ceph_decode_32_safe(p, end, len, bad);
- *p += len * 2 * sizeof(u32);
-
- /* skip old/new pg_swap stuff */
- ceph_decode_32_safe(p, end, len, bad);
- *p += len * (sizeof(u64) + sizeof(u32));
- if (len)
- derr(0, "WARNING: pg primary swaps in osdmap e%d unsupported\n",
- epoch);
- ceph_decode_32_safe(p, end, len, bad);
- *p += len * sizeof(u64);
- if (len)
- derr(0, "WARNING: pg primary swaps in osdmap e%d unsupported\n",
- epoch);
-
- /* skip new_max_snap, removed_snaps */
- *p += sizeof(u64);
- ceph_decode_32_safe(p, end, len, bad);
- *p += len * 2 * sizeof(u64);
-
- if (*p != end) {
- derr(10, "osdmap incremental has trailing gunk?\n");
- goto bad;
- }
+ /* ignore the rest */
+ *p = end;
return map;
bad:
map<int32_t,uint8_t> new_down;
map<int32_t,uint32_t> new_weight;
map<int32_t,epoch_t> new_up_thru;
+ map<int32_t,pair<epoch_t,epoch_t> > new_last_clean_interval;
map<pg_t,uint32_t> new_pg_swap_primary;
list<pg_t> old_pg_swap_primary;
interval_set<snapid_t> removed_snaps;
void encode(bufferlist& bl) {
+ // base
::encode(fsid, bl);
::encode(epoch, bl);
::encode(ctime, bl);
::encode(new_flags, bl);
::encode(fullmap, bl);
::encode(crush, bl);
+
::encode(new_max_osd, bl);
::encode(new_pg_num, bl);
::encode(new_pgp_num, bl);
::encode(new_up, bl);
::encode(new_down, bl);
::encode(new_weight, bl);
+
+ // extended
::encode(new_up_thru, bl);
+ ::encode(new_last_clean_interval, bl);
::encode(new_pg_swap_primary, bl);
::encode(old_pg_swap_primary, bl);
::encode(new_max_snap, bl);
::encode(removed_snaps.m, bl);
}
void decode(bufferlist::iterator &p) {
+ // base
::decode(fsid, p);
::decode(epoch, p);
::decode(ctime, p);
::decode(new_flags, p);
::decode(fullmap, p);
::decode(crush, p);
+
::decode(new_max_osd, p);
::decode(new_pg_num, p);
::decode(new_pgp_num, p);
::decode(new_up, p);
::decode(new_down, p);
::decode(new_weight, p);
+
+ // extended
::decode(new_up_thru, p);
+ ::decode(new_last_clean_interval, p);
::decode(new_pg_swap_primary, p);
::decode(old_pg_swap_primary, p);
::decode(new_max_snap, p);
vector<uint8_t> osd_state;
vector<entity_addr_t> osd_addr;
vector<__u32> osd_weight; // 16.16 fixed point, 0x10000 = "in", 0 = "out"
+
+ /*
+ * we track up to two intervals during which the osd was alive and
+ * healthy. the most recent is [up_from,up_thru), where up_thru is the
+ * last epoch the osd is known to have _started_. i.e., a lower bound on the
+ * actual osd death.
+ *
+ * the second is last_clean_interval [first,last]. in that case, the last
+ * interval is the last epoch known to have been either _finished_, or during
+ * which the osd cleanly shut down.
+ */
vector<epoch_t> osd_up_from; // when it went up
- vector<epoch_t> osd_up_thru; // lower bound on _actual_ osd death. bumped by osd before activating pgs with no replicas.
+ vector<epoch_t> osd_up_thru; // lower bound on _actual_ osd death.
+ vector<pair<epoch_t,epoch_t> > osd_last_clean_interval;
+
map<pg_t,uint32_t> pg_swap_primary; // force new osd to be pg primary (if already a member)
snapid_t max_snap;
interval_set<snapid_t> removed_snaps;
osd_state.resize(m);
osd_up_from.resize(m);
osd_up_thru.resize(m);
+ osd_last_clean_interval.resize(m);
osd_weight.resize(m);
for (; o<max_osd; o++) {
osd_state[o] = 0;
osd_up_from[o] = 0;
osd_up_thru[o] = 0;
+ osd_last_clean_interval[o].first = 0;
+ osd_last_clean_interval[o].second = 0;
osd_weight[o] = CEPH_OSD_OUT;
}
osd_addr.resize(m);
bool exists(int osd) { return osd < max_osd/* && osd_state[osd] & CEPH_OSD_EXISTS*/; }
bool is_up(int osd) { return exists(osd) && osd_state[osd] & CEPH_OSD_UP; }
bool is_down(int osd) { assert(exists(osd)); return !is_up(osd); }
- bool is_down_clean(int osd) {
- assert(exists(osd));
- return is_down(osd) && osd_state[osd] & CEPH_OSD_CLEAN;
- }
bool is_out(int osd) { return !exists(osd) || get_weight(osd) == CEPH_OSD_OUT; }
bool is_in(int osd) { return exists(osd) && !is_out(osd); }
assert(exists(osd));
return osd_up_thru[osd];
}
+ pair<epoch_t,epoch_t> get_last_clean_interval(int osd) {
+ assert(exists(osd));
+ return osd_last_clean_interval[osd];
+ }
int get_any_up_osd() {
for (int i=0; i<max_osd; i++)
i++)
osd_up_thru[i->first] = i->second;
+ for (map<int32_t,pair<epoch_t,epoch_t> >::iterator i = inc.new_last_clean_interval.begin();
+ i != inc.new_last_clean_interval.end();
+ i++)
+ osd_last_clean_interval[i->first] = i->second;
+
for (map<int32_t,entity_addr_t>::iterator i = inc.new_up.begin();
i != inc.new_up.end();
i++) {
// serialize, unserialize
void encode(bufferlist& blist) {
+ // base
::encode(fsid, blist);
::encode(epoch, blist);
::encode(ctime, blist);
::encode(osd_state, blist);
::encode(osd_weight, blist);
::encode(osd_addr, blist);
+
+ // crush
+ bufferlist cbl;
+ crush.encode(cbl);
+ ::encode(cbl, blist);
+
+ // extended
::encode(osd_up_from, blist);
::encode(osd_up_thru, blist);
+ ::encode(osd_last_clean_interval, blist);
::encode(pg_swap_primary, blist);
::encode(max_snap, blist);
::encode(removed_snaps.m, blist);
-
- bufferlist cbl;
- crush.encode(cbl);
- ::encode(cbl, blist);
}
void decode(bufferlist& blist) {
bufferlist::iterator p = blist.begin();
+ // base
::decode(fsid, p);
::decode(epoch, p);
::decode(ctime, p);
::decode(osd_state, p);
::decode(osd_weight, p);
::decode(osd_addr, p);
+
+ // crush
+ bufferlist cbl;
+ ::decode(cbl, p);
+ bufferlist::iterator cblp = cbl.begin();
+ crush.decode(cblp);
+
+ // extended
::decode(osd_up_from, p);
::decode(osd_up_thru, p);
+ ::decode(osd_last_clean_interval, p);
::decode(pg_swap_primary, p);
::decode(max_snap, p);
::decode(removed_snaps.m, p);
-
- bufferlist cbl;
- ::decode(cbl, p);
- bufferlist::iterator cblp = cbl.begin();
- crush.decode(cblp);
}