]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
mds: make sure table request id unique
authorYan, Zheng <zheng.z.yan@intel.com>
Sat, 16 Mar 2013 00:02:18 +0000 (08:02 +0800)
committerGreg Farnum <greg@inktank.com>
Mon, 1 Apr 2013 16:16:48 +0000 (09:16 -0700)
When a MDS becomes active, the table server re-sends 'agree' messages
for old prepared request. If the recoverd MDS starts a new table request
at the same time, The new request's ID can happen to be the same as old
prepared request's ID, because current table client code assigns request
ID from zero after MDS restarts.

This patch make table server send 'ready' messages when table clients
become active or itself becomes active. The 'ready' message updates
table client's last_reqid to avoid request ID collision. The message
also replaces the roles of finish_recovery() and handle_mds_recovery()
callbacks for table client.

Signed-off-by: Yan, Zheng <zheng.z.yan@intel.com>
Reviewed-by: Greg Farnum <greg@inktank.com>
src/mds/MDS.cc
src/mds/MDSTableClient.cc
src/mds/MDSTableClient.h
src/mds/MDSTableServer.cc
src/mds/MDSTableServer.h
src/mds/mds_table_types.h

index 3b9c8e98201915c2c1d33090677a81bb1a38418a..32bb06456d4e00dae6920fdc67a22a52487e00b0 100644 (file)
@@ -1514,14 +1514,14 @@ void MDS::recovery_done()
   
   // kick anchortable (resent AGREEs)
   if (mdsmap->get_tableserver() == whoami) {
-    anchorserver->finish_recovery();
-    snapserver->finish_recovery();
+    set<int> active;
+    mdsmap->get_mds_set(active, MDSMap::STATE_CLIENTREPLAY);
+    mdsmap->get_mds_set(active, MDSMap::STATE_ACTIVE);
+    mdsmap->get_mds_set(active, MDSMap::STATE_STOPPING);
+    anchorserver->finish_recovery(active);
+    snapserver->finish_recovery(active);
   }
-  
-  // kick anchorclient (resent COMMITs)
-  anchorclient->finish_recovery();
-  snapclient->finish_recovery();
-  
+
   mdcache->start_recovered_truncates();
   mdcache->do_file_recover();
 
@@ -1539,13 +1539,11 @@ void MDS::handle_mds_recovery(int who)
   
   mdcache->handle_mds_recovery(who);
 
-  if (anchorserver) {
+  if (mdsmap->get_tableserver() == whoami) {
     anchorserver->handle_mds_recovery(who);
     snapserver->handle_mds_recovery(who);
   }
-  anchorclient->handle_mds_recovery(who);
-  snapclient->handle_mds_recovery(who);
-  
+
   queue_waiters(waiting_for_active_peer[who]);
   waiting_for_active_peer.erase(who);
 }
index ea021f5d320701288fdbdc4e680f0a95e8e1314a..12331f910c313d285653437e2d371cacc584340a 100644 (file)
@@ -101,6 +101,15 @@ void MDSTableClient::handle_request(class MMDSTableRequest *m)
     }
     break;
 
+  case TABLESERVER_OP_SERVER_READY:
+    if (last_reqid == ~0ULL)
+      last_reqid = reqid;
+
+    resend_queries();
+    resend_prepares();
+    resend_commits();
+    break;
+
   default:
     assert(0);
   }
@@ -126,6 +135,12 @@ void MDSTableClient::_logged_ack(version_t tid)
 void MDSTableClient::_prepare(bufferlist& mutation, version_t *ptid, bufferlist *pbl,
                              Context *onfinish)
 {
+  if (last_reqid == ~0ULL) {
+    dout(10) << "tableserver is not ready yet, waiting for request id" << dendl;
+    waiting_for_reqid.push_back(_pending_prepare(onfinish, ptid, pbl, mutation));
+    return;
+  }
+
   uint64_t reqid = ++last_reqid;
   dout(10) << "_prepare " << reqid << dendl;
 
@@ -176,6 +191,7 @@ void MDSTableClient::got_journaled_agree(version_t tid, LogSegment *ls)
   ls->pending_commit_tids[table].insert(tid);
   pending_commit[tid] = ls;
 }
+
 void MDSTableClient::got_journaled_ack(version_t tid)
 {
   dout(10) << "got_journaled_ack " << tid << dendl;
@@ -185,12 +201,6 @@ void MDSTableClient::got_journaled_ack(version_t tid)
   }
 }
 
-void MDSTableClient::finish_recovery()
-{
-  dout(7) << "finish_recovery" << dendl;
-  resend_commits();
-}
-
 void MDSTableClient::resend_commits()
 {
   for (map<version_t,LogSegment*>::iterator p = pending_commit.begin();
@@ -202,24 +212,19 @@ void MDSTableClient::resend_commits()
   }
 }
 
-void MDSTableClient::handle_mds_recovery(int who)
+void MDSTableClient::resend_prepares()
 {
-  dout(7) << "handle_mds_recovery mds." << who << dendl;
-
-  if (who != mds->mdsmap->get_tableserver()) 
-    return; // do nothing.
+  while (!waiting_for_reqid.empty()) {
+    pending_prepare[++last_reqid] = waiting_for_reqid.front();
+    waiting_for_reqid.pop_front();
+  }
 
-  resend_queries();
-  
-  // prepares.
   for (map<uint64_t, _pending_prepare>::iterator p = pending_prepare.begin();
        p != pending_prepare.end();
        ++p) {
-    dout(10) << "resending " << p->first << dendl;
+    dout(10) << "resending prepare on " << p->first << dendl;
     MMDSTableRequest *req = new MMDSTableRequest(table, TABLESERVER_OP_PREPARE, p->first);
     req->bl = p->second.mutation;
     mds->send_message_mds(req, mds->mdsmap->get_tableserver());
-  } 
-
-  resend_commits();
+  }
 }
index e15837f26a198df8645210d68adf59169b05fe4b..934f5fea5ebc0625e41d4a5d3ba8483cff1b6f11 100644 (file)
@@ -38,9 +38,12 @@ protected:
     bufferlist mutation;
 
     _pending_prepare() : onfinish(0), ptid(0), pbl(0) {}
+    _pending_prepare(Context *c, version_t *pt, bufferlist *pb, bufferlist& m) :
+      onfinish(c), ptid(pt), pbl(pb), mutation(m) {}
   };
 
   map<uint64_t, _pending_prepare> pending_prepare;
+  list<_pending_prepare> waiting_for_reqid;
 
   // pending commits
   map<version_t, LogSegment*> pending_commit;
@@ -60,7 +63,7 @@ protected:
   void _logged_ack(version_t tid);
 
 public:
-  MDSTableClient(MDS *m, int tab) : mds(m), table(tab), last_reqid(0) {}
+  MDSTableClient(MDS *m, int tab) : mds(m), table(tab), last_reqid(~0ULL) {}
   virtual ~MDSTableClient() {}
 
   void handle_request(MMDSTableRequest *m);
@@ -68,9 +71,8 @@ public:
   void _prepare(bufferlist& mutation, version_t *ptid, bufferlist *pbl, Context *onfinish);
   void commit(version_t tid, LogSegment *ls);
 
-  // for recovery (by other nodes)
-  void handle_mds_recovery(int mds); // called when someone else recovers
   void resend_commits();
+  void resend_prepares();
 
   // for recovery (by me)
   void got_journaled_agree(version_t tid, LogSegment *ls);
@@ -82,7 +84,6 @@ public:
   void wait_for_ack(version_t tid, Context *c) {
     ack_waiters[tid].push_back(c);
   }
-  void finish_recovery();                // called when i recover and go active
 
   void send_to_tableserver(MMDSTableRequest *req);
 
index 4f86ff1dacbdb0e6ccec86a1e97cd05b033dbbdb..00bea5e14f19ea08078cf303501ffe627fe84ba6 100644 (file)
@@ -144,24 +144,30 @@ void MDSTableServer::do_server_update(bufferlist& bl)
 
 // recovery
 
-void MDSTableServer::finish_recovery()
+void MDSTableServer::finish_recovery(set<int>& active)
 {
   dout(7) << "finish_recovery" << dendl;
-  handle_mds_recovery(-1);  // resend agrees for everyone.
+  for (set<int>::iterator p = active.begin(); p != active.end(); ++p)
+    handle_mds_recovery(*p);  // resend agrees for everyone.
 }
 
 void MDSTableServer::handle_mds_recovery(int who)
 {
-  if (who >= 0)
-    dout(7) << "handle_mds_recovery mds." << who << dendl;
-  
+  dout(7) << "handle_mds_recovery mds." << who << dendl;
+
+  uint64_t next_reqid = 0;
   // resend agrees for recovered mds
   for (map<version_t,mds_table_pending_t>::iterator p = pending_for_mds.begin();
        p != pending_for_mds.end();
        ++p) {
-    if (who >= 0 && p->second.mds != who)
+    if (p->second.mds != who)
       continue;
+    if (p->second.reqid >= next_reqid)
+      next_reqid = p->second.reqid + 1;
     MMDSTableRequest *reply = new MMDSTableRequest(table, TABLESERVER_OP_AGREE, p->second.reqid, p->second.tid);
-    mds->send_message_mds(reply, p->second.mds);
+    mds->send_message_mds(reply, who);
   }
+
+  MMDSTableRequest *reply = new MMDSTableRequest(table, TABLESERVER_OP_SERVER_READY, next_reqid);
+  mds->send_message_mds(reply, who);
 }
index 26cd5944844b2e787ab693d684f6893f1d4d46ab..55827e7c0b97adb42f020901cec707fab37406ff 100644 (file)
@@ -90,7 +90,7 @@ private:
   }
 
   // recovery
-  void finish_recovery();
+  void finish_recovery(set<int>& active);
   void handle_mds_recovery(int who);
 };
 
index b094c752565e5a6fbdfde9a725773a19a57f889e..c08519a81b834a11df856da73fb84da42c849d21 100644 (file)
@@ -39,6 +39,7 @@ enum {
   TABLESERVER_OP_ACK          = -6,
   TABLESERVER_OP_ROLLBACK     =  7,
   TABLESERVER_OP_SERVER_UPDATE = 8,
+  TABLESERVER_OP_SERVER_READY = -9,
 };
 
 inline const char *get_mdstableserver_opname(int op) {
@@ -51,6 +52,7 @@ inline const char *get_mdstableserver_opname(int op) {
   case TABLESERVER_OP_ACK: return "ack";
   case TABLESERVER_OP_ROLLBACK: return "rollback";
   case TABLESERVER_OP_SERVER_UPDATE: return "server_update";
+  case TABLESERVER_OP_SERVER_READY: return "server_ready";
   default: assert(0); return 0;
   }
 };