We use a pair<pg_notify_t,PastIntervals> everywhere a pg_notify_t is used.
This is silly; just make it a member instead.
Include some minor compat cruft so we can speak to pre-octopus OSDs.
Signed-off-by: Sage Weil <sage@redhat.com>
std::vector<OperationRef> ret;
ret.reserve(m->get_pg_list().size());
const int from = m->get_source().num();
- for (auto &p : m->get_pg_list()) {
- auto& [pg_notify, past_intervals] = p;
+ for (auto& pg_notify : m->get_pg_list()) {
spg_t pgid{pg_notify.info.pgid.pgid, pg_notify.to};
MNotifyRec notify{pgid,
pg_shard_t{from, pg_notify.from},
pg_notify,
- 0, // the features is not used
- past_intervals};
+ 0}; // the features is not used
logger().debug("handle_pg_notify on {} from {}", pgid.pgid, from);
auto create_info = new PGCreateInfo{
pgid,
pg_notify.query_epoch,
pg_notify.info.history,
- past_intervals,
+ pg_notify.past_intervals,
false};
auto op = osd.get_shard_services().start_operation<PeeringSubEvent>(
state,
std::vector<OperationRef> ret;
ret.reserve(m->pg_list.size());
const int from = m->get_source().num();
- for (auto &p : m->pg_list) {
- auto& pg_notify = p.first;
+ for (auto& pg_notify : m->pg_list) {
spg_t pgid{pg_notify.info.pgid.pgid, pg_notify.to};
logger().debug("handle_pg_info on {} from {}", pgid.pgid, from);
MInfoRec info{pg_shard_t{from, pg_notify.from},
from.shard, pgid.shard,
evt.get_epoch_sent(),
osd.get_shard_services().get_osdmap()->get_epoch(),
- empty),
- PastIntervals());
+ empty,
+ PastIntervals()));
}
};
class MOSDPGInfo : public Message {
private:
- static constexpr int HEAD_VERSION = 5;
+ static constexpr int HEAD_VERSION = 6;
static constexpr int COMPAT_VERSION = 5;
epoch_t epoch = 0;
public:
- using pg_list_t = std::vector<std::pair<pg_notify_t,PastIntervals>>;
+ using pg_list_t = std::vector<pg_notify_t>;
pg_list_t pg_list;
epoch_t get_epoch() const { return epoch; }
++i) {
if (i != pg_list.begin())
out << " ";
- out << i->first << "=" << i->second;
+ out << *i;
}
out << " epoch " << epoch
<< ")";
void encode_payload(uint64_t features) override {
using ceph::encode;
+ header.version = HEAD_VERSION;
encode(epoch, payload);
+ if (!HAVE_FEATURE(features, SERVER_OCTOPUS)) {
+ // pretend to be vector<pair<pg_notify_t,PastIntervals>>
+ header.version = 5;
+ encode((uint32_t)pg_list.size(), payload);
+ for (auto& i : pg_list) {
+ encode(i, payload); // this embeds a dup (ignored) PastIntervals
+ encode(i.past_intervals, payload);
+ }
+ return;
+ }
encode(pg_list, payload);
}
void decode_payload() override {
auto p = payload.cbegin();
decode(epoch, p);
+ if (header.version == 5) {
+ // decode legacy vector<pair<pg_notify_t,PastIntervals>>
+ uint32_t num;
+ decode(num, p);
+ pg_list.resize(num);
+ for (unsigned i = 0; i < num; ++i) {
+ decode(pg_list[i], p);
+ decode(pg_list[i].past_intervals, p);
+ }
+ return;
+ }
decode(pg_list, p);
}
private:
class MOSDPGNotify : public Message {
private:
- static constexpr int HEAD_VERSION = 6;
+ static constexpr int HEAD_VERSION = 7;
static constexpr int COMPAT_VERSION = 6;
epoch_t epoch = 0;
/// the current epoch if this is not being sent in response to a
/// query. This allows the recipient to disregard responses to old
/// queries.
- using pg_list_t = std::vector<std::pair<pg_notify_t,PastIntervals>>;
- pg_list_t pg_list; // pgid -> version
+ using pg_list_t = std::vector<pg_notify_t>;
+ pg_list_t pg_list;
public:
version_t get_epoch() const { return epoch; }
void encode_payload(uint64_t features) override {
using ceph::encode;
+ header.version = HEAD_VERSION;
encode(epoch, payload);
+ if (!HAVE_FEATURE(features, SERVER_OCTOPUS)) {
+ // pretend to be vector<pair<pg_notify_t,PastIntervals>>
+ header.version = 6;
+ encode((uint32_t)pg_list.size(), payload);
+ for (auto& i : pg_list) {
+ encode(i, payload); // this embeds a dup (ignored) PastIntervals
+ encode(i.past_intervals, payload);
+ }
+ return;
+ }
encode(pg_list, payload);
}
void decode_payload() override {
auto p = payload.cbegin();
decode(epoch, p);
+ if (header.version == 6) {
+ // decode legacy vector<pair<pg_notify_t,PastIntervals>>
+ uint32_t num;
+ decode(num, p);
+ pg_list.resize(num);
+ for (unsigned i = 0; i < num; ++i) {
+ decode(pg_list[i], p);
+ decode(pg_list[i].past_intervals, p);
+ }
+ return;
+ }
decode(pg_list, p);
}
void print(ostream& out) const override {
++i) {
if (i != pg_list.begin())
out << " ";
- out << i->first << "=" << i->second;
+ out << *i;
}
out << " epoch " << epoch
<< ")";
*/
void OSD::do_notifies(
- map<int,vector<pair<pg_notify_t,PastIntervals> > >& notify_list,
+ map<int,vector<pg_notify_t>>& notify_list,
OSDMapRef curmap)
{
- for (map<int,
- vector<pair<pg_notify_t,PastIntervals> > >::iterator it =
- notify_list.begin();
- it != notify_list.end();
- ++it) {
- if (!curmap->is_up(it->first)) {
- dout(20) << __func__ << " skipping down osd." << it->first << dendl;
+ for (auto& [osd, notifies] : notify_list) {
+ if (!curmap->is_up(osd)) {
+ dout(20) << __func__ << " skipping down osd." << osd << dendl;
continue;
}
ConnectionRef con = service.get_con_osd_cluster(
- it->first, curmap->get_epoch());
+ osd, curmap->get_epoch());
if (!con) {
- dout(20) << __func__ << " skipping osd." << it->first
- << " (NULL con)" << dendl;
+ dout(20) << __func__ << " skipping osd." << osd << " (NULL con)" << dendl;
continue;
}
service.maybe_share_map(con.get(), curmap);
- dout(7) << __func__ << " osd." << it->first
- << " on " << it->second.size() << " PGs" << dendl;
+ dout(7) << __func__ << " osd." << osd
+ << " on " << notifies.size() << " PGs" << dendl;
MOSDPGNotify *m = new MOSDPGNotify(curmap->get_epoch(),
- std::move(it->second));
+ std::move(notifies));
con->send_message(m);
}
}
}
-void OSD::do_infos(map<int,
- vector<pair<pg_notify_t, PastIntervals> > >& info_map,
+void OSD::do_infos(map<int,vector<pg_notify_t>>& info_map,
OSDMapRef curmap)
{
- for (map<int,
- vector<pair<pg_notify_t, PastIntervals> > >::iterator p =
- info_map.begin();
- p != info_map.end();
- ++p) {
- if (!curmap->is_up(p->first)) {
- dout(20) << __func__ << " skipping down osd." << p->first << dendl;
+ for (auto& [osd, notifies] : info_map) {
+ if (!curmap->is_up(osd)) {
+ dout(20) << __func__ << " skipping down osd." << osd << dendl;
continue;
}
- for (vector<pair<pg_notify_t,PastIntervals> >::iterator i = p->second.begin();
- i != p->second.end();
- ++i) {
- dout(20) << __func__ << " sending info " << i->first.info
- << " to shard " << p->first << dendl;
+ for (auto& i : notifies) {
+ dout(20) << __func__ << " sending info " << i.info
+ << " to osd " << osd << dendl;
}
ConnectionRef con = service.get_con_osd_cluster(
- p->first, curmap->get_epoch());
+ osd, curmap->get_epoch());
if (!con) {
- dout(20) << __func__ << " skipping osd." << p->first
- << " (NULL con)" << dendl;
+ dout(20) << __func__ << " skipping osd." << osd << " (NULL con)" << dendl;
continue;
}
service.maybe_share_map(con.get(), curmap);
MOSDPGInfo *m = new MOSDPGInfo(curmap->get_epoch());
- m->pg_list = p->second;
+ m->pg_list = std::move(notifies);
con->send_message(m);
}
info_map.clear();
}
int from = m->get_source().num();
for (auto& p : m->get_pg_list()) {
- spg_t pgid(p.first.info.pgid.pgid, p.first.to);
+ spg_t pgid(p.info.pgid.pgid, p.to);
enqueue_peering_evt(
pgid,
PGPeeringEventRef(
std::make_shared<PGPeeringEvent>(
- p.first.epoch_sent,
- p.first.query_epoch,
+ p.epoch_sent,
+ p.query_epoch,
MNotifyRec(
- pgid, pg_shard_t(from, p.first.from),
- p.first,
- m->get_connection()->get_features(),
- p.second),
+ pgid, pg_shard_t(from, p.from),
+ p,
+ m->get_connection()->get_features()),
true,
new PGCreateInfo(
pgid,
- p.first.query_epoch,
- p.first.info.history,
- p.second,
+ p.query_epoch,
+ p.info.history,
+ p.past_intervals,
false)
)));
}
int from = m->get_source().num();
for (auto& p : m->pg_list) {
enqueue_peering_evt(
- spg_t(p.first.info.pgid.pgid, p.first.to),
+ spg_t(p.info.pgid.pgid, p.to),
PGPeeringEventRef(
std::make_shared<PGPeeringEvent>(
- p.first.epoch_sent, p.first.query_epoch,
+ p.epoch_sent, p.query_epoch,
MInfoRec(
- pg_shard_t(from, p.first.from),
- p.first.info,
- p.first.epoch_sent)))
+ pg_shard_t(from, p.from),
+ p.info,
+ p.epoch_sent)))
);
}
m->put();
osdmap->get_epoch(), empty,
q.query.epoch_sent);
} else {
- vector<pair<pg_notify_t,PastIntervals>> ls;
+ vector<pg_notify_t> ls;
ls.push_back(
- make_pair(
- pg_notify_t(
- q.query.from, q.query.to,
- q.query.epoch_sent,
- osdmap->get_epoch(),
- empty),
+ pg_notify_t(
+ q.query.from, q.query.to,
+ q.query.epoch_sent,
+ osdmap->get_epoch(),
+ empty,
PastIntervals()));
m = new MOSDPGNotify(osdmap->get_epoch(), std::move(ls));
}
void dispatch_context_transaction(PeeringCtx &ctx, PG *pg,
ThreadPool::TPHandle *handle = NULL);
void discard_context(PeeringCtx &ctx);
- void do_notifies(map<int,
- vector<pair<pg_notify_t, PastIntervals> > >&
- notify_list,
+ void do_notifies(map<int,vector<pg_notify_t>>& notify_list,
OSDMapRef map);
void do_queries(map<int, map<spg_t,pg_query_t> >& query_map,
OSDMapRef map);
- void do_infos(map<int,
- vector<pair<pg_notify_t, PastIntervals> > >& info_map,
+ void do_infos(map<int,vector<pg_notify_t>>& info_map,
OSDMapRef map);
bool require_mon_peer(const Message *m);
pg_shard_t from;
pg_notify_t notify;
uint64_t features;
- PastIntervals past_intervals;
- MNotifyRec(spg_t p, pg_shard_t from, const pg_notify_t ¬ify, uint64_t f,
- const PastIntervals& pi)
- : pgid(p), from(from), notify(notify), features(f), past_intervals(pi) {}
+ MNotifyRec(spg_t p, pg_shard_t from, const pg_notify_t ¬ify, uint64_t f)
+ : pgid(p), from(from), notify(notify), features(f) {}
void print(std::ostream *out) const {
*out << "MNotifyRec " << pgid << " from " << from << " notify: " << notify
- << " features: 0x" << std::hex << features << std::dec
- << " " << past_intervals;
+ << " features: 0x" << std::hex << features << std::dec;
}
};
from.shard, pg_whoami.shard,
get_osdmap_epoch(),
get_osdmap_epoch(),
- tinfo),
- past_intervals);
+ tinfo, past_intervals));
}
return found_missing;
}
ObjectStore::Transaction& t,
epoch_t activation_epoch,
map<int, map<spg_t,pg_query_t> >& query_map,
- map<int,
- vector<
- pair<pg_notify_t, PastIntervals> > > *activator_map,
+ map<int,vector<pg_notify_t>> *activator_map,
PeeringCtxWrapper &ctx)
{
ceph_assert(!is_peered());
peer.shard, pg_whoami.shard,
get_osdmap_epoch(),
get_osdmap_epoch(),
- info),
- past_intervals);
+ info,
+ past_intervals));
} else {
psdout(10) << "activate peer osd." << peer
<< " is up to date, but sending pg_log anyway" << dendl;
pg_shard.shard, pg_whoami.shard,
get_osdmap_epoch(),
get_osdmap_epoch(),
- info),
- past_intervals);
+ info,
+ past_intervals));
pl->send_cluster_message(pg_shard.osd, m, get_osdmap_epoch());
}
}
notify_info.first.shard, pg_whoami.shard,
query.query_epoch,
get_osdmap_epoch(),
- notify_info.second),
- past_intervals);
+ notify_info.second,
+ past_intervals));
} else {
update_history(query.query.history);
fulfill_log(query.from, query.query, query.query_epoch);
ps->get_primary().shard, ps->pg_whoami.shard,
ps->get_osdmap_epoch(),
ps->get_osdmap_epoch(),
- ps->info),
- ps->past_intervals);
+ ps->info,
+ ps->past_intervals));
}
ps->update_heartbeat_peers();
ps->get_primary().shard, ps->pg_whoami.shard,
ps->get_osdmap_epoch(),
ps->get_osdmap_epoch(),
- ps->info);
+ ps->info,
+ PastIntervals());
i.info.history.last_epoch_started = evt.activation_epoch;
i.info.history.last_interval_started = i.info.history.same_interval_since;
ps->state_set(PG_STATE_PEERED);
}
- m->pg_list.emplace_back(i, PastIntervals());
+ m->pg_list.emplace_back(i);
pl->send_cluster_message(
ps->get_primary().osd,
m,
ps->get_primary().shard, ps->pg_whoami.shard,
ps->get_osdmap_epoch(),
ps->get_osdmap_epoch(),
- ps->info),
- ps->past_intervals);
+ ps->info,
+ ps->past_intervals));
}
return discard_event();
}
ps->get_primary().shard, ps->pg_whoami.shard,
ps->get_osdmap_epoch(),
ps->get_osdmap_epoch(),
- ps->info),
- ps->past_intervals);
+ ps->info,
+ ps->past_intervals));
}
return discard_event();
}
// [primary only] content recovery state
struct BufferedRecoveryMessages {
map<int, map<spg_t, pg_query_t> > query_map;
- map<int, vector<pair<pg_notify_t, PastIntervals> > > info_map;
- map<int, vector<pair<pg_notify_t, PastIntervals> > > notify_list;
+ map<int, vector<pg_notify_t>> info_map;
+ map<int, vector<pg_notify_t>> notify_list;
BufferedRecoveryMessages() = default;
BufferedRecoveryMessages(PeeringCtx &);
struct PeeringCtxWrapper {
utime_t start_time;
map<int, map<spg_t, pg_query_t> > &query_map;
- map<int, vector<pair<pg_notify_t, PastIntervals> > > &info_map;
- map<int, vector<pair<pg_notify_t, PastIntervals> > > ¬ify_list;
+ map<int, vector<pg_notify_t>> &info_map;
+ map<int, vector<pg_notify_t>> ¬ify_list;
ObjectStore::Transaction &transaction;
HBHandle * const handle = nullptr;
PeeringCtxWrapper(PeeringCtxWrapper &&ctx) = default;
- void send_notify(pg_shard_t to,
- const pg_notify_t &info, const PastIntervals &pi) {
- notify_list[to.osd].emplace_back(info, pi);
+ void send_notify(pg_shard_t to, const pg_notify_t &n) {
+ notify_list[to.osd].emplace_back(n);
}
};
public:
return state->rctx->query_map;
}
- map<int, vector<pair<pg_notify_t, PastIntervals> > > &get_info_map() {
+ map<int, vector<pg_notify_t>> &get_info_map() {
ceph_assert(state->rctx);
return state->rctx->info_map;
}
return *(state->rctx);
}
- void send_notify(pg_shard_t to,
- const pg_notify_t &info, const PastIntervals &pi) {
+ void send_notify(pg_shard_t to, const pg_notify_t &n) {
ceph_assert(state->rctx);
- state->rctx->send_notify(to, info, pi);
+ state->rctx->send_notify(to, n);
}
};
friend class PeeringMachine;
ObjectStore::Transaction& t,
epoch_t activation_epoch,
map<int, map<spg_t,pg_query_t> >& query_map,
- map<int, vector<pair<pg_notify_t, PastIntervals> > > *activator_map,
+ map<int, vector<pg_notify_t>> *activator_map,
PeeringCtxWrapper &ctx);
void rewind_divergent_log(ObjectStore::Transaction& t, eversion_t newhead);
// -- pg_notify_t --
void pg_notify_t::encode(ceph::buffer::list &bl) const
{
- ENCODE_START(2, 2, bl);
+ ENCODE_START(3, 2, bl);
encode(query_epoch, bl);
encode(epoch_sent, bl);
encode(info, bl);
encode(to, bl);
encode(from, bl);
+ encode(past_intervals, bl);
ENCODE_FINISH(bl);
}
void pg_notify_t::decode(ceph::buffer::list::const_iterator &bl)
{
- DECODE_START(2, bl);
+ DECODE_START(3, bl);
decode(query_epoch, bl);
decode(epoch_sent, bl);
decode(info, bl);
decode(to, bl);
decode(from, bl);
+ if (struct_v >= 3) {
+ decode(past_intervals, bl);
+ }
DECODE_FINISH(bl);
}
info.dump(f);
f->close_section();
}
+ f->dump_object("past_intervals", past_intervals);
}
void pg_notify_t::generate_test_instances(list<pg_notify_t*>& o)
{
- o.push_back(new pg_notify_t(shard_id_t(3), shard_id_t::NO_SHARD, 1, 1, pg_info_t()));
- o.push_back(new pg_notify_t(shard_id_t(0), shard_id_t(0), 3, 10, pg_info_t()));
+ o.push_back(new pg_notify_t(shard_id_t(3), shard_id_t::NO_SHARD, 1, 1,
+ pg_info_t(), PastIntervals()));
+ o.push_back(new pg_notify_t(shard_id_t(0), shard_id_t(0), 3, 10,
+ pg_info_t(), PastIntervals()));
}
ostream &operator<<(ostream &lhs, const pg_notify_t ¬ify)
notify.to != shard_id_t::NO_SHARD)
lhs << " " << (unsigned)notify.from
<< "->" << (unsigned)notify.to;
+ lhs << " " << notify.past_intervals;
return lhs << ")";
}
WRITE_CLASS_ENCODER(pg_fast_info_t)
-struct pg_notify_t {
- epoch_t query_epoch;
- epoch_t epoch_sent;
- pg_info_t info;
- shard_id_t to;
- shard_id_t from;
- pg_notify_t() :
- query_epoch(0), epoch_sent(0), to(shard_id_t::NO_SHARD),
- from(shard_id_t::NO_SHARD) {}
- pg_notify_t(
- shard_id_t to,
- shard_id_t from,
- epoch_t query_epoch,
- epoch_t epoch_sent,
- const pg_info_t &info)
- : query_epoch(query_epoch),
- epoch_sent(epoch_sent),
- info(info), to(to), from(from) {
- ceph_assert(from == info.pgid.shard);
- }
- void encode(ceph::buffer::list &bl) const;
- void decode(ceph::buffer::list::const_iterator &p);
- void dump(ceph::Formatter *f) const;
- static void generate_test_instances(std::list<pg_notify_t*> &o);
-};
-WRITE_CLASS_ENCODER(pg_notify_t)
-std::ostream &operator<<(std::ostream &lhs, const pg_notify_t ¬ify);
-
-
class OSDMap;
/**
* PastIntervals -- information needed to determine the PriorSet and
<< dendl;
}
+struct pg_notify_t {
+ epoch_t query_epoch;
+ epoch_t epoch_sent;
+ pg_info_t info;
+ shard_id_t to;
+ shard_id_t from;
+ PastIntervals past_intervals;
+ pg_notify_t() :
+ query_epoch(0), epoch_sent(0), to(shard_id_t::NO_SHARD),
+ from(shard_id_t::NO_SHARD) {}
+ pg_notify_t(
+ shard_id_t to,
+ shard_id_t from,
+ epoch_t query_epoch,
+ epoch_t epoch_sent,
+ const pg_info_t &info,
+ const PastIntervals& pi)
+ : query_epoch(query_epoch),
+ epoch_sent(epoch_sent),
+ info(info), to(to), from(from),
+ past_intervals(pi) {
+ ceph_assert(from == info.pgid.shard);
+ }
+ void encode(ceph::buffer::list &bl) const;
+ void decode(ceph::buffer::list::const_iterator &p);
+ void dump(ceph::Formatter *f) const;
+ static void generate_test_instances(std::list<pg_notify_t*> &o);
+};
+WRITE_CLASS_ENCODER(pg_notify_t)
+std::ostream &operator<<(std::ostream &lhs, const pg_notify_t ¬ify);
+
+
/**
* pg_query_t - used to ask a peer for information about a pg.
*