class MOSDPGLog : public Message {
epoch_t epoch;
+ /// query_epoch is the epoch of the query being responded to, or
+ /// the current epoch if this is not being sent in response to a
+ /// query. This allows the recipient to disregard responses to old
+ /// queries.
+ epoch_t query_epoch;
public:
PG::Info info;
epoch_t get_epoch() { return epoch; }
pg_t get_pgid() { return info.pgid; }
+ epoch_t get_query_epoch() { return query_epoch; }
MOSDPGLog() {}
MOSDPGLog(version_t mv, PG::Info& i) :
Message(MSG_OSD_PG_LOG),
- epoch(mv), info(i) { }
+ epoch(mv), query_epoch(mv), info(i) { }
+ MOSDPGLog(version_t mv, PG::Info& i, epoch_t query_epoch) :
+ Message(MSG_OSD_PG_LOG),
+ epoch(mv), query_epoch(query_epoch), info(i) { }
private:
~MOSDPGLog() {}
public:
const char *get_type_name() { return "PGlog"; }
void print(ostream& out) {
- out << "pg_log(" << info.pgid << " e" << epoch << ")";
+ // Keep a space between the label and the value so the two epochs
+ // stay readable in log output.
+ out << "pg_log(" << info.pgid << " e" << epoch
+ << " query epoch " << query_epoch << ")";
}
void encode_payload(CephContext *cct) {
+ // Version 2 appends query_epoch after the version 1 fields.
+ header.version = 2;
::encode(epoch, payload);
::encode(info, payload);
::encode(log, payload);
::encode(missing, payload);
+ ::encode(query_epoch, payload);
}
void decode_payload(CephContext *cct) {
bufferlist::iterator p = payload.begin();
::decode(info, p);
::decode(log, p);
::decode(missing, p);
+ if (header.version > 1) {
+ ::decode(query_epoch, p);
+ } else {
+ // A version 1 payload carries no query_epoch. Fall back to the
+ // message epoch instead of leaving the member uninitialized, so
+ // get_query_epoch() returns the same value as get_epoch().
+ // NOTE(review): relies on epoch having been decoded earlier in this
+ // function; that line is outside the visible hunk context - confirm.
+ query_epoch = epoch;
+ }
}
};
*/
class MOSDPGNotify : public Message {
- epoch_t epoch;
+ epoch_t epoch;
+ /// query_epoch is the epoch of the query being responded to, or
+ /// the current epoch if this is not being sent in response to a
+ /// query. This allows the recipient to disregard responses to old
+ /// queries.
+ epoch_t query_epoch;
vector<PG::Info> pg_list; // pgid -> version
public:
version_t get_epoch() { return epoch; }
vector<PG::Info>& get_pg_list() { return pg_list; }
+ epoch_t get_query_epoch() { return query_epoch; }
MOSDPGNotify() {}
- MOSDPGNotify(epoch_t e, vector<PG::Info>& l) :
- Message(MSG_OSD_PG_NOTIFY) {
- this->epoch = e;
+ MOSDPGNotify(epoch_t e, vector<PG::Info>& l, epoch_t query_epoch) :
+ Message(MSG_OSD_PG_NOTIFY), epoch(e),
+ query_epoch(query_epoch) {
pg_list.swap(l);
}
private:
const char *get_type_name() { return "PGnot"; }
void encode_payload(CephContext *cct) {
+ // Version 2 appends query_epoch after the version 1 fields.
+ header.version = 2;
::encode(epoch, payload);
::encode(pg_list, payload);
+ ::encode(query_epoch, payload);
}
void decode_payload(CephContext *cct) {
bufferlist::iterator p = payload.begin();
::decode(epoch, p);
::decode(pg_list, p);
+ if (header.version > 1) {
+ ::decode(query_epoch, p);
+ } else {
+ // A version 1 payload carries no query_epoch. Fall back to the
+ // message epoch instead of leaving the member uninitialized, so
+ // get_query_epoch() returns the same value as get_epoch().
+ query_epoch = epoch;
+ }
}
void print(ostream& out) {
- out << "osd pg notify(" << "epoch " << epoch << "; ";
+ // Separate the two epochs; without the leading space the output
+ // reads "epoch <n>query epoch <m>".
+ out << "osd pg notify(" << "epoch " << epoch
+ << " query epoch " << query_epoch << "; ";
for (vector<PG::Info>::iterator i = pg_list.begin();
i != pg_list.end();
++i) {
pg->unlock();
}
- do_notifies(notify_list); // notify? (residual|replica)
+ do_notifies(notify_list, osdmap->get_epoch()); // notify? (residual|replica)
do_queries(query_map);
do_infos(info_map);
* content for, and they are primary for.
*/
-void OSD::do_notifies(map< int, vector<PG::Info> >& notify_list)
+void OSD::do_notifies(map< int, vector<PG::Info> >& notify_list,
+ epoch_t query_epoch)
{
for (map< int, vector<PG::Info> >::iterator it = notify_list.begin();
it != notify_list.end();
dout(7) << "do_notify osd." << it->first << " is self, skipping" << dendl;
continue;
}
- dout(7) << "do_notify osd." << it->first << " on " << it->second.size() << " PGs" << dendl;
- MOSDPGNotify *m = new MOSDPGNotify(osdmap->get_epoch(), it->second);
+ dout(7) << "do_notify osd." << it->first
+ << " on " << it->second.size() << " PGs" << dendl;
+ MOSDPGNotify *m = new MOSDPGNotify(osdmap->get_epoch(),
+ it->second,
+ query_epoch);
_share_map_outgoing(osdmap->get_cluster_inst(it->first));
cluster_messenger->send_message(m, osdmap->get_cluster_inst(it->first));
}
if (!pg)
continue;
- if (pg->old_peering_msg(m->get_epoch())) {
+ if (pg->old_peering_msg(m->get_epoch(), m->get_query_epoch())) {
dout(10) << "ignoring old peering message " << *m << dendl;
pg->unlock();
delete t;
return;
}
- if (pg->old_peering_msg(m->get_epoch())) {
+ if (pg->old_peering_msg(m->get_epoch(), m->get_query_epoch())) {
dout(10) << "ignoring old peering message " << *m << dendl;
pg->unlock();
delete t;
if (!pg)
continue;
- if (pg->old_peering_msg(m->get_epoch())) {
+ if (pg->old_peering_msg(m->get_epoch(), m->get_epoch())) {
dout(10) << "ignoring old peering message " << *m << dendl;
pg->unlock();
delete t;
if (it->second.type == PG::Query::LOG ||
it->second.type == PG::Query::BACKLOG ||
it->second.type == PG::Query::FULLLOG) {
- MOSDPGLog *mlog = new MOSDPGLog(osdmap->get_epoch(), empty);
+ MOSDPGLog *mlog = new MOSDPGLog(osdmap->get_epoch(), empty,
+ m->get_epoch());
_share_map_outgoing(osdmap->get_cluster_inst(from));
cluster_messenger->send_message(mlog,
osdmap->get_cluster_inst(from));
continue;
}
- if (pg->old_peering_msg(m->get_epoch())) {
+ if (pg->old_peering_msg(m->get_epoch(), m->get_epoch())) {
dout(10) << "ignoring old peering message " << *m << dendl;
pg->unlock();
continue;
// ok, process query!
PG::RecoveryCtx rctx(0, 0, &notify_list, 0, 0);
- pg->handle_query(from, it->second, &rctx);
+ pg->handle_query(from, it->second, m->get_epoch(), &rctx);
pg->unlock();
}
- do_notifies(notify_list);
+ do_notifies(notify_list, m->get_epoch());
m->put();
}
// -- generic pg peering --
- void do_notifies(map< int, vector<PG::Info> >& notify_list);
+ void do_notifies(map< int, vector<PG::Info> >& notify_list,
+ epoch_t query_epoch);
void do_queries(map< int, map<pg_t,PG::Query> >& query_map);
void do_infos(map<int, MOSDPGInfo*>& info_map);
void repeer(PG *pg, map< int, map<pg_t,PG::Query> >& query_map);
notify_info = make_pair(from, info);
}
-void PG::fulfill_log(int from, const Query &query)
+void PG::fulfill_log(int from, const Query &query, epoch_t query_epoch)
{
assert(!acting.empty());
assert(from == acting[0]);
!log.backlog) {
assert(0); // generated in the state machine
} else {
- MOSDPGLog *mlog = new MOSDPGLog(osd->osdmap->get_epoch(), info);
+ MOSDPGLog *mlog = new MOSDPGLog(osd->osdmap->get_epoch(),
+ info, query_epoch);
mlog->missing = missing;
-
+
// primary -> other, when building master log
if (query.type == PG::Query::LOG) {
dout(10) << " sending info+missing+log since " << query.since
dout(10) << " sending info+missing+full log" << dendl;
mlog->log.copy_non_backlog(log);
}
-
+
dout(10) << " sending " << mlog->log << " " << mlog->missing << dendl;
-
+
osd->_share_map_outgoing(osd->osdmap->get_cluster_inst(from));
osd->cluster_messenger->send_message(mlog,
osd->osdmap->get_cluster_inst(from));
}
}
-bool PG::old_peering_msg(const epoch_t &msg_epoch)
+/// Decide whether a peering message is stale relative to the last
+/// peering reset.
+///
+/// @param reply_epoch epoch carried by the message itself
+/// @param query_epoch epoch of the query the message answers
+/// @return true when either epoch is older than last_peering_reset
+bool PG::old_peering_msg(epoch_t reply_epoch,
+ epoch_t query_epoch)
{
- return (last_peering_reset > msg_epoch);
+ // The message is current only if neither epoch lags the last reset.
+ const bool current = (reply_epoch >= last_peering_reset &&
+ query_epoch >= last_peering_reset);
+ return !current;
}
void PG::set_last_peering_reset() {
PG::RecoveryState::ReplicaActive::react(const MQuery& query) {
PG *pg = context< RecoveryMachine >().pg;
assert(query.query.type == Query::MISSING);
- pg->fulfill_log(query.from, query.query);
+ pg->fulfill_log(query.from, query.query, query.query_epoch);
return discard_event();
}
PG *pg = context< RecoveryMachine >().pg;
assert(backlog_requested);
dout(10) << "BacklogComplete" << dendl;
- for (map<int, Query>::iterator i = pending_queries.begin();
+ for (map<int, pair<Query, epoch_t> >::iterator i = pending_queries.begin();
i != pending_queries.end();
pending_queries.erase(i++)) {
dout(10) << "sending log to " << i->first << dendl;
- pg->fulfill_log(i->first, i->second);
+ pg->fulfill_log(i->first, i->second.first, i->second.second);
}
backlog_requested = false;
return discard_event();
if (!pg->log.backlog) {
dout(10) << "Stray, need a backlog!"
<< dendl;
- pending_queries[query.from] = query.query;
+ pending_queries[query.from] = pair<Query, epoch_t>(query.query,
+ query.query_epoch);
if (!backlog_requested) {
dout(10) << "Stray, generating a backlog!"
<< dendl;
pg->fulfill_info(query.from, query.query, notify_info);
context< RecoveryMachine >().send_notify(notify_info.first, notify_info.second);
} else {
- pg->fulfill_log(query.from, query.query);
+ pg->fulfill_log(query.from, query.query, query.query_epoch);
}
return discard_event();
}
}
void PG::RecoveryState::handle_query(int from, const PG::Query& q,
+ epoch_t query_epoch,
RecoveryCtx *rctx)
{
dout(10) << "handle_query " << q << " from osd." << from << dendl;
start_handle(rctx);
- machine.process_event(MQuery(from, q));
+ // Build the event carrying the query and its epoch, then feed it to
+ // the state machine between start_handle() and end_handle().
+ MQuery query_event(from, q, query_epoch);
+ machine.process_event(query_event);
end_handle();
}
struct MQuery : boost::statechart::event< MQuery > {
int from;
const Query &query;
- MQuery(int from, const Query &query):
- from(from), query(query) {}
+ /// Epoch supplied alongside the query when the event is created.
+ epoch_t query_epoch;
+ /// The Query is held by reference, not copied: the referenced object
+ /// must stay alive for as long as this event is in use.
+ MQuery(int from_osd, const Query &q, epoch_t epoch_of_query)
+ : from(from_osd),
+ query(q),
+ query_epoch(epoch_of_query) {}
};
struct AdvMap : boost::statechart::event< AdvMap > {
struct Stray : boost::statechart::state< Stray, Started >, NamedState {
bool backlog_requested;
- map<int, Query> pending_queries;
+ map<int, pair<Query, epoch_t> > pending_queries;
Stray(my_context ctx);
void exit();
void handle_log(int from,
MOSDPGLog *msg,
RecoveryCtx *ctx);
- void handle_query(int from, const PG::Query& q, RecoveryCtx *ctx);
+ void handle_query(int from, const PG::Query& q,
+ epoch_t query_epoch,
+ RecoveryCtx *ctx);
void handle_advance_map(OSDMap *osdmap, OSDMap *lastmap,
vector<int>& newup, vector<int>& newacting,
RecoveryCtx *ctx);
void fulfill_info(int from, const Query &query,
pair<int, Info> &notify_info);
- void fulfill_log(int from, const Query &query);
+ void fulfill_log(int from, const Query &query, epoch_t query_epoch);
bool acting_up_affected(const vector<int>& newup, const vector<int>& newacting);
- bool old_peering_msg(const epoch_t &msg_epoch);
+ bool old_peering_msg(epoch_t reply_epoch, epoch_t query_epoch);
// recovery bits
void handle_notify(int from, PG::Info& i, RecoveryCtx *rctx) {
RecoveryCtx *rctx) {
recovery_state.handle_log(from, msg, rctx);
}
- void handle_query(int from, const PG::Query& q, RecoveryCtx *rctx) {
- recovery_state.handle_query(from, q, rctx);
+ void handle_query(int from, const PG::Query& q,
+ epoch_t query_epoch,
+ RecoveryCtx *rctx) {
+ recovery_state.handle_query(from, q, query_epoch, rctx);
}
void handle_advance_map(OSDMap *osdmap, OSDMap *lastmap,
vector<int>& newup, vector<int>& newacting,