]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
* client idempotent ops
authorsageweil <sageweil@29311d96-e01e-0410-9327-a35deaab8ce9>
Wed, 4 Apr 2007 20:41:09 +0000 (20:41 +0000)
committersageweil <sageweil@29311d96-e01e-0410-9327-a35deaab8ce9>
Wed, 4 Apr 2007 20:41:09 +0000 (20:41 +0000)
git-svn-id: https://ceph.svn.sf.net/svnroot/ceph@1337 29311d96-e01e-0410-9327-a35deaab8ce9

branches/sage/cephmds2/TODO
branches/sage/cephmds2/client/Client.cc
branches/sage/cephmds2/mds/ClientMap.h
branches/sage/cephmds2/mds/Server.cc
branches/sage/cephmds2/mds/journal.cc
branches/sage/cephmds2/messages/MClientRequest.h

index 3ef48543d45bb477c8f0e162d517c4fc9a73c579..787de608a803b624674116f6cf7b126acf86eb23 100644 (file)
@@ -59,11 +59,12 @@ mds
   - surviving mds rejoins replicas from a recovering mds
     - will tell auth if it holds an xlock
 - recovering open files
-  - need to journal EOpen
-  - track openness in hierarchy, so we can tell when to expire vs rejournal EOpen metablobs
+/  - need to journal EOpen
+/  - track openness in hierarchy, so we can tell when to expire vs rejournal EOpen metablobs
   - need to batch EOpen events when rejournaling to avoid looping
   - recovery will either have inode (from EOpen), or will provide path+cap to reassert open state.
     - path+cap window will require fetching and metadata from disk before doing the rejoin
+- client reconnect after resolve, before rejoin.
 
 - clientmap request history
   - hmm, so we need completion codes, but only after recovery.
@@ -71,6 +72,10 @@ mds
   - list of active tids < max
   - journaled bit in MDRequest
   - journal 
+  - complete request list in MClientMap
+    - populated on replay
+    - periodic client->mds MClientLastRetry -> trim request list
+
 
 /untested- truncate...
 
@@ -198,12 +203,8 @@ rados snapshots
 
 - how to get usage feedback to monitor?
 
-- change messenger entity_inst_t
- - no more rank!  make it a uniquish nonce?
-
 - clean up mds caps release in exporter
 - figure out client failure modes
-- clean up messenger failure modes.  
 - add connection retry.
 
 
@@ -227,7 +228,8 @@ osd/rados
 
 simplemessenger
 - close idle connections
-- retry, timeout on connection or transmission failure
+- buffer sent messages until a receive is acknowledged (handshake!)
+  - retry, timeout on connection or transmission failure
 - exponential backoff on monitor resend attempts (actually, this should go outside the messenger!)
 
 objectcacher
index 65aca4101db396a4271a6b899a84aa77aab6a240..9be84a296b82822e1c621ca0b8ac20c3f3c4b90e 100644 (file)
@@ -585,6 +585,8 @@ MClientReply* Client::sendrecv(MClientRequest *req, int mds)
   // assign a unique tid
   tid_t tid = ++last_tid;
   req->set_tid(tid);
+  if (!mds_requests.empty()) 
+    req->set_oldest_client_tid(mds_requests.begin()->first);
 
   // make note
   MetaRequest request(req, tid);
index a617c3cabab6ba12686ef48b2f95018360e3fa87..e51e4ed4bc4e901d2d93074b75ecea9f37f721d7 100644 (file)
@@ -34,26 +34,40 @@ using namespace __gnu_cxx;
  * contacted if necessary).
  */
 class ClientMap {
+private:
   version_t version;
-
   version_t projected;
   version_t committing;
   version_t committed;
   map<version_t, list<Context*> > commit_waiters;
 
+public:
+  ClientMap() : version(0), projected(0), committing(0), committed(0) {}
+
+  version_t get_version() { return version; }
+  version_t get_projected() { return projected; }
+  version_t get_committing() { return committing; }
+  version_t get_committed() { return committed; }
+
+  version_t inc_projected() { return ++projected; }
+  void reset_projected() { projected = version; }
+  void set_committing(version_t v) { committing = v; }
+  void set_committed(version_t v) { committed = v; }
+
+  void add_commit_waiter(Context *c) { 
+    commit_waiters[committing].push_back(c); 
+  }
+  void take_commit_waiters(version_t v, list<Context*>& ls) { 
+    ls.swap(commit_waiters[v]);
+    commit_waiters.erase(v);
+  }
+
+  // client mount, inst info
+private:
   hash_map<int,entity_inst_t> client_inst;
   set<int>           client_mount;
   hash_map<int, int> client_ref;
 
-  /*
-    hrm, we need completion codes, too.
-  struct active_tid_set_t {
-    tid_t      max;     // max tid we've journaled
-    set<tid_t> active;  // still-active tids that are < max
-  };
-  hash_map<int, active_tid_set_t> client_committed;
-  */
-  
   void inc_ref(int client, const entity_inst_t& inst) {
     if (client_inst.count(client)) {
       assert(client_inst[client] == inst);
@@ -72,28 +86,8 @@ class ClientMap {
       client_inst.erase(client);
     }
   }
-  
-public:
-  ClientMap() : version(0), projected(0), committing(0), committed(0) {}
-
-  version_t get_version() { return version; }
-  version_t get_projected() { return projected; }
-  version_t get_committing() { return committing; }
-  version_t get_committed() { return committed; }
-
-  version_t inc_projected() { return ++projected; }
-  void reset_projected() { projected = version; }
-  void set_committing(version_t v) { committing = v; }
-  void set_committed(version_t v) { committed = v; }
-
-  void add_commit_waiter(Context *c) { 
-    commit_waiters[committing].push_back(c); 
-  }
-  void take_commit_waiters(version_t v, list<Context*>& ls) { 
-    ls.swap(commit_waiters[v]);
-    commit_waiters.erase(v);
-  }
 
+public:
   bool empty() {
     return client_inst.empty() && client_mount.empty() && client_ref.empty();
   }
@@ -125,6 +119,31 @@ public:
     //version++;
   }
 
+
+private:
+  // -- completed requests --
+  // client id -> tid -> result code
+  map<int, set<tid_t> > completed_requests;  // completed client requests
+public:
+  void add_completed_request(metareqid_t ri) {
+    completed_requests[ri.client].insert(ri.tid);
+  }
+  void trim_completed_requests(int client, tid_t mintid) {
+    if (completed_requests.count(client) == 0) return;
+    set<tid_t>& ls = completed_requests[client];
+    while (!ls.empty() && *ls.begin() < mintid)
+      ls.erase(ls.begin());
+    if (ls.empty())
+      completed_requests.erase(client);
+  }
+  bool have_completed_request(metareqid_t ri) {
+    return completed_requests.count(ri.client) &&
+      completed_requests[ri.client].count(ri.tid);
+  }
+
+
+  // -- encoding --
   void encode(bufferlist& bl) {
     bl.append((char*)&version, sizeof(version));
     ::_encode(client_inst, bl);
index 7a84a758376f9e59bede423eca10325564ce839f..0729742f1b0e1f44240f90019c5acdbe4bbe4738 100644 (file)
@@ -177,14 +177,17 @@ void Server::reply_request(MDRequest *mdr, MClientReply *reply, CInode *tracei)
           << " (" << strerror(-reply->get_result())
           << ") " << *req << endl;
 
+  // note result code in clientmap?
+  if (!req->is_idempotent())
+    mds->clientmap.add_completed_request(mdr->reqid);
+
   // include trace
   if (tracei) {
     reply->set_trace_dist( tracei, mds->get_nodeid() );
   }
   
   // send reply
-  messenger->send_message(reply,
-                          req->get_client_inst());
+  messenger->send_message(reply, req->get_client_inst());
   
   // finish request
   mdcache->request_finish(mdr);
@@ -216,6 +219,21 @@ void Server::handle_client_request(MClientRequest *req)
   // okay, i want
   CInode           *ref = 0;
 
+  // retry?
+  if (req->get_retry_attempt()) {
+    if (mds->clientmap.have_completed_request(req->get_reqid())) {
+      dout(5) << "already completed " << req->get_reqid() << endl;
+      mds->messenger->send_message(new MClientReply(req, 0),
+                                  req->get_source_inst());
+      delete req;
+      return;
+    }
+  }
+  // trim completed_request list
+  if (req->get_oldest_client_tid())
+    mds->clientmap.trim_completed_requests(req->get_source().num(),
+                                          req->get_oldest_client_tid());
+
 
   // -----
   // some ops are on ino's
index 73e13dcab52d282a143fd25acbd10b4fa890964d..83bdc1d6536c9032a749d22e0b4de06023cb6ddd 100644 (file)
@@ -397,6 +397,12 @@ void EMetaBlob::replay(MDS *mds)
             << " to " << p->second << endl;
     mds->mdcache->add_recovered_purge(p->first, p->second);  
   }
+
+  // client requests
+  for (list<metareqid_t>::iterator p = client_reqs.begin();
+       p != client_reqs.end();
+       ++p)
+    mds->clientmap.add_completed_request(*p);
 }
 
 // -----------------------
@@ -681,7 +687,7 @@ void EOpen::replay(MDS *mds)
 
 
 // -----------------------
-// EUpdate
+// ESlaveUpdate
 
 bool ESlaveUpdate::has_expired(MDS *mds)
 {
index 72ea8fbcee252497834ac642abd839f61af31b8c..5fbd54b6428ce4d2fe0655f8344724b79e9b5ced 100644 (file)
@@ -73,7 +73,8 @@
 
 class MClientRequest : public Message {
   struct {
-    long tid;
+    tid_t tid;
+    tid_t oldest_client_tid;
     int num_fwd;
     int retry_attempt;
     inodeno_t  mds_wants_replica_in_dirino;
@@ -190,7 +191,8 @@ class MClientRequest : public Message {
 
 
   // normal fields
-  void set_tid(long t) { st.tid = t; }
+  void set_tid(tid_t t) { st.tid = t; }
+  void set_oldest_client_tid(tid_t t) { st.oldest_client_tid = t; }
   void inc_num_fwd() { st.num_fwd++; }
   void set_retry_attempt(int a) { st.retry_attempt = a; }
   void set_path(string& p) { path.set_path(p); }
@@ -207,7 +209,8 @@ class MClientRequest : public Message {
   const entity_inst_t& get_client_inst() { return st.client_inst; }
 
   int get_client() { return st.client_inst.name.num(); }
-  long get_tid() { return st.tid; }
+  tid_t get_tid() { return st.tid; }
+  tid_t get_oldest_client_tid() { return st.oldest_client_tid; }
   int get_num_fwd() { return st.num_fwd; }
   int get_retry_attempt() { return st.retry_attempt; }
   int get_op() { return st.op; }