]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
osd, pg: ignore responses to obsolete queries
authorJosh Durgin <josh.durgin@dreamhost.com>
Thu, 6 Oct 2011 00:07:07 +0000 (17:07 -0700)
committerJosh Durgin <josh.durgin@dreamhost.com>
Thu, 6 Oct 2011 22:45:36 +0000 (15:45 -0700)
This adds a query_epoch to notify and log messages, which are
sent in response to queries from the primary during peering. To
guarantee we don't try to process old logs and notifies after
restarting peering, query_epoch is set to the epoch at which the
query was sent. If query_epoch is less than last_peering_reset,
the primary discards the message.

This caused a "bad state machine event" crash in the following
scenario:

1. Primary tells a stray to generate a backlog at epoch 199.
2. The up set changes because a stray goes up.
3. Primary restarts peering at epoch 200.
4. Stray gets new map for epoch 200, sees that acting set did not
change, and sends log to primary.
5. Primary crashes.

Related to #1403, #1449
Signed-off-by: Josh Durgin <josh.durgin@dreamhost.com>
src/messages/MOSDPGLog.h
src/messages/MOSDPGNotify.h
src/osd/OSD.cc
src/osd/OSD.h
src/osd/PG.cc
src/osd/PG.h

index b733b8d18846949695d7a635551f768f04bde883..d328db1d8d477c7a6be4c46afb1773319c5b4ecd 100644 (file)
 
 class MOSDPGLog : public Message {
   epoch_t epoch;
+  /// query_epoch is the epoch of the query being responded to, or
+  /// the current epoch if this is not being sent in response to a
+  /// query. This allows the recipient to disregard responses to old
+  /// queries.
+  epoch_t query_epoch;
 
 public:
   PG::Info info;
@@ -28,25 +33,32 @@ public:
 
   epoch_t get_epoch() { return epoch; }
   pg_t get_pgid() { return info.pgid; }
+  epoch_t get_query_epoch() { return query_epoch; }
 
   MOSDPGLog() {}
   MOSDPGLog(version_t mv, PG::Info& i) :
     Message(MSG_OSD_PG_LOG),
-    epoch(mv), info(i) { }
+    epoch(mv), query_epoch(mv), info(i)  { }
+  MOSDPGLog(version_t mv, PG::Info& i, epoch_t query_epoch) :
+    Message(MSG_OSD_PG_LOG),
+    epoch(mv), query_epoch(query_epoch), info(i)  { }
 private:
   ~MOSDPGLog() {}
 
 public:
   const char *get_type_name() { return "PGlog"; }
   void print(ostream& out) {
-    out << "pg_log(" << info.pgid << " e" << epoch << ")";
+    out << "pg_log(" << info.pgid << " e" << epoch
+       << " query epoch" << query_epoch << ")";
   }
 
   void encode_payload(CephContext *cct) {
+    header.version = 2;
     ::encode(epoch, payload);
     ::encode(info, payload);
     ::encode(log, payload);
     ::encode(missing, payload);
+    ::encode(query_epoch, payload);
   }
   void decode_payload(CephContext *cct) {
     bufferlist::iterator p = payload.begin();
@@ -54,6 +66,9 @@ public:
     ::decode(info, p);
     ::decode(log, p);
     ::decode(missing, p);
+    if (header.version > 1) {
+      ::decode(query_epoch, p);
+    }
   }
 };
 
index 27279d24bb5ecee39600e6282334acdcfdc9e307..b5fc12e058652bfb1847d9559f9222bfc921e184 100644 (file)
  */
 
 class MOSDPGNotify : public Message {
-  epoch_t      epoch;
+  epoch_t epoch;
+  /// query_epoch is the epoch of the query being responded to, or
+  /// the current epoch if this is not being sent in response to a
+  /// query. This allows the recipient to disregard responses to old
+  /// queries.
+  epoch_t query_epoch;
   vector<PG::Info> pg_list;   // pgid -> version
 
  public:
   version_t get_epoch() { return epoch; }
   vector<PG::Info>& get_pg_list() { return pg_list; }
+  epoch_t get_query_epoch() { return query_epoch; }
 
   MOSDPGNotify() {}
-  MOSDPGNotify(epoch_t e, vector<PG::Info>& l) :
-    Message(MSG_OSD_PG_NOTIFY) {
-    this->epoch = e;
+  MOSDPGNotify(epoch_t e, vector<PG::Info>& l, epoch_t query_epoch) :
+    Message(MSG_OSD_PG_NOTIFY), epoch(e),
+    query_epoch(query_epoch) {
     pg_list.swap(l);
   }
 private:
@@ -44,16 +50,22 @@ public:
   const char *get_type_name() { return "PGnot"; }
 
   void encode_payload(CephContext *cct) {
+    header.version = 2;
     ::encode(epoch, payload);
     ::encode(pg_list, payload);
+    ::encode(query_epoch, payload);
   }
   void decode_payload(CephContext *cct) {
     bufferlist::iterator p = payload.begin();
     ::decode(epoch, p);
     ::decode(pg_list, p);
+    if (header.version > 1) {
+      ::decode(query_epoch, p);
+    }
   }
   void print(ostream& out) {
-    out << "osd pg notify(" << "epoch " << epoch << "; ";
+    out << "osd pg notify(" << "epoch " << epoch
+       << "query epoch " << query_epoch << "; ";
     for (vector<PG::Info>::iterator i = pg_list.begin();
          i != pg_list.end();
          ++i) {
index 823f0ce61739f18cdca9ac32c0cc2270119ccf0b..3cbfc9c4e6a081bda470d6eff30a6f7414098ba1 100644 (file)
@@ -3500,7 +3500,7 @@ void OSD::activate_map(ObjectStore::Transaction& t, list<Context*>& tfin)
     pg->unlock();
   }  
 
-  do_notifies(notify_list);  // notify? (residual|replica)
+  do_notifies(notify_list, osdmap->get_epoch());  // notify? (residual|replica)
   do_queries(query_map);
   do_infos(info_map);
 
@@ -4117,7 +4117,8 @@ void OSD::handle_pg_create(MOSDPGCreate *m)
  * content for, and they are primary for.
  */
 
-void OSD::do_notifies(map< int, vector<PG::Info> >& notify_list) 
+void OSD::do_notifies(map< int, vector<PG::Info> >& notify_list,
+                     epoch_t query_epoch)
 {
   for (map< int, vector<PG::Info> >::iterator it = notify_list.begin();
        it != notify_list.end();
@@ -4126,8 +4127,11 @@ void OSD::do_notifies(map< int, vector<PG::Info> >& notify_list)
       dout(7) << "do_notify osd." << it->first << " is self, skipping" << dendl;
       continue;
     }
-    dout(7) << "do_notify osd." << it->first << " on " << it->second.size() << " PGs" << dendl;
-    MOSDPGNotify *m = new MOSDPGNotify(osdmap->get_epoch(), it->second);
+    dout(7) << "do_notify osd." << it->first
+           << " on " << it->second.size() << " PGs" << dendl;
+    MOSDPGNotify *m = new MOSDPGNotify(osdmap->get_epoch(),
+                                      it->second,
+                                      query_epoch);
     _share_map_outgoing(osdmap->get_cluster_inst(it->first));
     cluster_messenger->send_message(m, osdmap->get_cluster_inst(it->first));
   }
@@ -4199,7 +4203,7 @@ void OSD::handle_pg_notify(MOSDPGNotify *m)
     if (!pg)
       continue;
 
-    if (pg->old_peering_msg(m->get_epoch())) {
+    if (pg->old_peering_msg(m->get_epoch(), m->get_query_epoch())) {
       dout(10) << "ignoring old peering message " << *m << dendl;
       pg->unlock();
       delete t;
@@ -4246,7 +4250,7 @@ void OSD::handle_pg_log(MOSDPGLog *m)
     return;
   }
 
-  if (pg->old_peering_msg(m->get_epoch())) {
+  if (pg->old_peering_msg(m->get_epoch(), m->get_query_epoch())) {
     dout(10) << "ignoring old peering message " << *m << dendl;
     pg->unlock();
     delete t;
@@ -4293,7 +4297,7 @@ void OSD::handle_pg_info(MOSDPGInfo *m)
     if (!pg)
       continue;
 
-    if (pg->old_peering_msg(m->get_epoch())) {
+    if (pg->old_peering_msg(m->get_epoch(), m->get_epoch())) {
       dout(10) << "ignoring old peering message " << *m << dendl;
       pg->unlock();
       delete t;
@@ -4434,7 +4438,8 @@ void OSD::handle_pg_query(MOSDPGQuery *m)
       if (it->second.type == PG::Query::LOG ||
          it->second.type == PG::Query::BACKLOG ||
          it->second.type == PG::Query::FULLLOG) {
-       MOSDPGLog *mlog = new MOSDPGLog(osdmap->get_epoch(), empty);
+       MOSDPGLog *mlog = new MOSDPGLog(osdmap->get_epoch(), empty,
+                                       m->get_epoch());
        _share_map_outgoing(osdmap->get_cluster_inst(from));
        cluster_messenger->send_message(mlog,
                                        osdmap->get_cluster_inst(from));
@@ -4453,7 +4458,7 @@ void OSD::handle_pg_query(MOSDPGQuery *m)
       continue;
     }
 
-    if (pg->old_peering_msg(m->get_epoch())) {
+    if (pg->old_peering_msg(m->get_epoch(), m->get_epoch())) {
       dout(10) << "ignoring old peering message " << *m << dendl;
       pg->unlock();
       continue;
@@ -4479,11 +4484,11 @@ void OSD::handle_pg_query(MOSDPGQuery *m)
 
     // ok, process query!
     PG::RecoveryCtx rctx(0, 0, &notify_list, 0, 0);
-    pg->handle_query(from, it->second, &rctx);
+    pg->handle_query(from, it->second, m->get_epoch(), &rctx);
     pg->unlock();
   }
   
-  do_notifies(notify_list);   
+  do_notifies(notify_list, m->get_epoch());
 
   m->put();
 }
index ae65b030c03fa23d5e5e98dc4e755e91dd472424..f24c609b35998893eb1716ad4c6a2dbf19acf5d8 100644 (file)
@@ -573,7 +573,8 @@ protected:
 
 
   // -- generic pg peering --
-  void do_notifies(map< int, vector<PG::Info> >& notify_list);
+  void do_notifies(map< int, vector<PG::Info> >& notify_list,
+                  epoch_t query_epoch);
   void do_queries(map< int, map<pg_t,PG::Query> >& query_map);
   void do_infos(map<int, MOSDPGInfo*>& info_map);
   void repeer(PG *pg, map< int, map<pg_t,PG::Query> >& query_map);
index d52b1b402034acb881a5538be099a72cb812181b..f664d76b2b4f265243eb1d48629cabd884007f5a 100644 (file)
@@ -3479,7 +3479,7 @@ void PG::fulfill_info(int from, const Query &query,
   notify_info = make_pair(from, info);
 }
 
-void PG::fulfill_log(int from, const Query &query)
+void PG::fulfill_log(int from, const Query &query, epoch_t query_epoch)
 {
   assert(!acting.empty());
   assert(from == acting[0]);
@@ -3488,9 +3488,10 @@ void PG::fulfill_log(int from, const Query &query)
       !log.backlog) {
     assert(0); // generated in the state machine
   } else {
-    MOSDPGLog *mlog = new MOSDPGLog(osd->osdmap->get_epoch(), info);
+    MOSDPGLog *mlog = new MOSDPGLog(osd->osdmap->get_epoch(),
+                                   info, query_epoch);
     mlog->missing = missing;
-       
+
     // primary -> other, when building master log
     if (query.type == PG::Query::LOG) {
       dout(10) << " sending info+missing+log since " << query.since
@@ -3512,9 +3513,9 @@ void PG::fulfill_log(int from, const Query &query)
       dout(10) << " sending info+missing+full log" << dendl;
       mlog->log.copy_non_backlog(log);
     }
-       
+
     dout(10) << " sending " << mlog->log << " " << mlog->missing << dendl;
-       
+
     osd->_share_map_outgoing(osd->osdmap->get_cluster_inst(from));
     osd->cluster_messenger->send_message(mlog, 
                                         osd->osdmap->get_cluster_inst(from));
@@ -3530,9 +3531,11 @@ bool PG::acting_up_affected(const vector<int>& newup, const vector<int>& newacti
   }
 }
 
-bool PG::old_peering_msg(const epoch_t &msg_epoch)
+bool PG::old_peering_msg(epoch_t reply_epoch,
+                        epoch_t query_epoch)
 {
-  return (last_peering_reset > msg_epoch);
+  return (last_peering_reset > reply_epoch ||
+         last_peering_reset > query_epoch);
 }
 
 void PG::set_last_peering_reset() {
@@ -4297,7 +4300,7 @@ boost::statechart::result
 PG::RecoveryState::ReplicaActive::react(const MQuery& query) {
   PG *pg = context< RecoveryMachine >().pg;
   assert(query.query.type == Query::MISSING);
-  pg->fulfill_log(query.from, query.query);
+  pg->fulfill_log(query.from, query.query, query.query_epoch);
   return discard_event();
 }
 
@@ -4353,11 +4356,11 @@ PG::RecoveryState::Stray::react(const BacklogComplete&) {
   PG *pg = context< RecoveryMachine >().pg;
   assert(backlog_requested);
   dout(10) << "BacklogComplete" << dendl;
-  for (map<int, Query>::iterator i = pending_queries.begin();
+  for (map<int, pair<Query, epoch_t> >::iterator i = pending_queries.begin();
        i != pending_queries.end();
        pending_queries.erase(i++)) {
     dout(10) << "sending log to " << i->first << dendl;
-    pg->fulfill_log(i->first, i->second);
+    pg->fulfill_log(i->first, i->second.first, i->second.second);
   }
   backlog_requested = false;
   return discard_event();
@@ -4370,7 +4373,8 @@ PG::RecoveryState::Stray::react(const MQuery& query) {
     if (!pg->log.backlog) {
       dout(10) << "Stray, need a backlog!" 
               << dendl;
-      pending_queries[query.from] = query.query;
+      pending_queries[query.from] = pair<Query, epoch_t>(query.query,
+                                                        query.query_epoch);
       if (!backlog_requested) {
        dout(10) << "Stray, generating a backlog!" 
                 << dendl;
@@ -4386,7 +4390,7 @@ PG::RecoveryState::Stray::react(const MQuery& query) {
     pg->fulfill_info(query.from, query.query, notify_info);
     context< RecoveryMachine >().send_notify(notify_info.first, notify_info.second);
   } else {
-    pg->fulfill_log(query.from, query.query);
+    pg->fulfill_log(query.from, query.query, query.query_epoch);
   }
   return discard_event();
 }
@@ -4775,11 +4779,12 @@ void PG::RecoveryState::handle_log(int from,
 }
 
 void PG::RecoveryState::handle_query(int from, const PG::Query& q,
+                                    epoch_t query_epoch,
                                     RecoveryCtx *rctx)
 {
   dout(10) << "handle_query " << q << " from osd." << from << dendl;
   start_handle(rctx);
-  machine.process_event(MQuery(from, q));
+  machine.process_event(MQuery(from, q, query_epoch));
   end_handle();
 }
 
index c1ecce0461a4bf66e935496493df490982682b2a..38e22d5d094fa2d42e54540422bcc85370852da5 100644 (file)
@@ -928,8 +928,9 @@ public:
     struct MQuery : boost::statechart::event< MQuery > {
       int from;
       const Query &query;
-      MQuery(int from, const Query &query):
-       from(from), query(query) {}
+      epoch_t query_epoch;
+      MQuery(int from, const Query &query, epoch_t query_epoch):
+       from(from), query(query), query_epoch(query_epoch) {}
     };
 
     struct AdvMap : boost::statechart::event< AdvMap > {
@@ -1178,7 +1179,7 @@ public:
 
     struct Stray : boost::statechart::state< Stray, Started >, NamedState {
       bool backlog_requested;
-      map<int, Query> pending_queries;
+      map<int, pair<Query, epoch_t> > pending_queries;
 
       Stray(my_context ctx);
       void exit();
@@ -1281,7 +1282,9 @@ public:
     void handle_log(int from,
                    MOSDPGLog *msg,
                    RecoveryCtx *ctx);
-    void handle_query(int from, const PG::Query& q, RecoveryCtx *ctx);
+    void handle_query(int from, const PG::Query& q,
+                     epoch_t query_epoch,
+                     RecoveryCtx *ctx);
     void handle_advance_map(OSDMap *osdmap, OSDMap *lastmap, 
                            vector<int>& newup, vector<int>& newacting, 
                            RecoveryCtx *ctx);
@@ -1601,9 +1604,9 @@ public:
 
   void fulfill_info(int from, const Query &query, 
                    pair<int, Info> &notify_info);
-  void fulfill_log(int from, const Query &query);
+  void fulfill_log(int from, const Query &query, epoch_t query_epoch);
   bool acting_up_affected(const vector<int>& newup, const vector<int>& newacting);
-  bool old_peering_msg(const epoch_t &msg_epoch);
+  bool old_peering_msg(epoch_t reply_epoch, epoch_t query_epoch);
 
   // recovery bits
   void handle_notify(int from, PG::Info& i, RecoveryCtx *rctx) {
@@ -1617,8 +1620,10 @@ public:
                  RecoveryCtx *rctx) {
     recovery_state.handle_log(from, msg, rctx);
   }
-  void handle_query(int from, const PG::Query& q, RecoveryCtx *rctx) {
-    recovery_state.handle_query(from, q, rctx);
+  void handle_query(int from, const PG::Query& q,
+                   epoch_t query_epoch,
+                   RecoveryCtx *rctx) {
+    recovery_state.handle_query(from, q, query_epoch, rctx);
   }
   void handle_advance_map(OSDMap *osdmap, OSDMap *lastmap, 
                          vector<int>& newup, vector<int>& newacting,