]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
* some idempotent client ops groundwork
authorsageweil <sageweil@29311d96-e01e-0410-9327-a35deaab8ce9>
Wed, 21 Mar 2007 22:15:01 +0000 (22:15 +0000)
committersageweil <sageweil@29311d96-e01e-0410-9327-a35deaab8ce9>
Wed, 21 Mar 2007 22:15:01 +0000 (22:15 +0000)
* tested/fixed up a few AnchorTable failure cases
* fixed remote_ino linking bug in Server.cc

git-svn-id: https://ceph.svn.sf.net/svnroot/ceph@1280 29311d96-e01e-0410-9327-a35deaab8ce9

29 files changed:
branches/sage/cephmds2/TODO
branches/sage/cephmds2/client/Client.cc
branches/sage/cephmds2/client/Client.h
branches/sage/cephmds2/client/SyntheticClient.cc
branches/sage/cephmds2/client/fuse.cc
branches/sage/cephmds2/doc/anchortable.txt
branches/sage/cephmds2/include/types.h
branches/sage/cephmds2/mds/AnchorClient.cc
branches/sage/cephmds2/mds/AnchorClient.h
branches/sage/cephmds2/mds/AnchorTable.cc
branches/sage/cephmds2/mds/AnchorTable.h
branches/sage/cephmds2/mds/CDentry.h
branches/sage/cephmds2/mds/CDir.cc
branches/sage/cephmds2/mds/CDir.h
branches/sage/cephmds2/mds/Locker.cc
branches/sage/cephmds2/mds/Locker.h
branches/sage/cephmds2/mds/LogEvent.cc
branches/sage/cephmds2/mds/LogEvent.h
branches/sage/cephmds2/mds/MDCache.cc
branches/sage/cephmds2/mds/MDS.cc
branches/sage/cephmds2/mds/MDSMap.h
branches/sage/cephmds2/mds/Server.cc
branches/sage/cephmds2/mds/events/EAnchor.h
branches/sage/cephmds2/mds/events/EAnchorClient.h [new file with mode: 0644]
branches/sage/cephmds2/mds/events/EMetaBlob.h
branches/sage/cephmds2/mds/journal.cc
branches/sage/cephmds2/messages/MClientRequest.h
branches/sage/cephmds2/messages/MClientRequestForward.h
branches/sage/cephmds2/msg/SimpleMessenger.h

index 0587e97928078d0839b69b462bc01e871dc69962..e4f4b7711e6d8ba8dd122601ceafbcfd4abcca1d 100644 (file)
@@ -33,19 +33,22 @@ mds
   - DOCUMENT.
 
 - discover
-  - hard link dentries
+/  - hard link dentries
 
 - rejoin and replicas that are not in recovered node's cache...  fetch storm?
 
 - locking madness
 /  - request_auth_pin, request_drop_auth_pins, and _link/_unlink_local should pre-pin dn dir and targeti.
-  - move auth_pinning _out_ of locking _start and _finish methods
-  - clean up multi-auth_pin code paths (e.g. link_local)
+/  - move auth_pinning _out_ of locking _start and _finish methods
+/  - clean up multi-auth_pin code paths (e.g. link_local)
 
 
 - mds failure vs clients
-  - clean up client op redirection
+/  - clean up client op redirection
   - idempotent ops
+    - how to track in memory?
+    - how to trim?
+    - EMetablob replay, expire logic
 - journal+recovery
   - test anchortable, anchorclient
 /  - local link
index 9885bab970eb9706fc7a5a3275fec66dc4623d59..0884ecadcad9c8640775f6586507a3139ae2f846 100644 (file)
@@ -543,8 +543,7 @@ MClientReply *Client::make_request(MClientRequest *req,
   if (op == MDS_OP_STAT ||
       op == MDS_OP_LSTAT ||
       op == MDS_OP_READDIR ||
-      op == MDS_OP_OPEN ||
-      op == MDS_OP_RELEASE)
+      op == MDS_OP_OPEN)
     nojournal = true;
 
   MClientReply *reply = sendrecv(req, mds);
@@ -592,6 +591,9 @@ MClientReply* Client::sendrecv(MClientRequest *req, int mds)
   // encode payload now, in case we have to resend (in case of mds failure)
   req->encode_payload();
   request.request_payload = req->get_payload();
+
+  // note idempotency
+  request.idempotent = req->is_idempotent();
   
   // send initial request.
   send_request(&request, mds);
@@ -648,24 +650,38 @@ void Client::handle_client_request_forward(MClientRequestForward *fwd)
   MetaRequest *request = mds_requests[tid];
   assert(request);
 
-  // note new mds set.
-  // there are now exactly two mds's whose failure should trigger a resend
-  // of this request.
-  if (request->num_fwd < fwd->get_num_fwd()) {
+  if (request->idempotent) {
+    // note new mds set.
+    // there are now exactly two mds's whose failure should trigger a resend
+    // of this request.
+    if (request->num_fwd < fwd->get_num_fwd()) {
+      request->mds.clear();
+      request->mds.insert(fwd->get_source().num());
+      request->mds.insert(fwd->get_dest_mds());
+      request->num_fwd = fwd->get_num_fwd();
+      dout(10) << "handle_client_request tid " << tid
+              << " fwd " << fwd->get_num_fwd() 
+              << " to mds" << fwd->get_dest_mds() 
+              << ", mds set now " << request->mds
+              << endl;
+    } else {
+      dout(10) << "handle_client_request tid " << tid
+              << " previously forwarded to mds" << fwd->get_dest_mds() 
+              << ", mds still " << request->mds
+              << endl;
+    }
+  } else {
     request->mds.clear();
-    request->mds.insert(fwd->get_source().num());
     request->mds.insert(fwd->get_dest_mds());
     request->num_fwd = fwd->get_num_fwd();
+    
     dout(10) << "handle_client_request tid " << tid
             << " fwd " << fwd->get_num_fwd() 
             << " to mds" << fwd->get_dest_mds() 
-            << ", mds set now " << request->mds
-            << endl;
-  } else {
-    dout(10) << "handle_client_request tid " << tid
-            << " previously forwarded to mds" << fwd->get_dest_mds() 
-            << ", mds still " << request->mds
+            << ", non-idempotent, resending to " << fwd->get_dest_mds()
             << endl;
+    
+    send_request(request, fwd->get_dest_mds());
   }
 
   delete fwd;
@@ -2107,7 +2123,7 @@ struct dirent_lite *Client::readdirlite(DIR *dirp)
 
 /****** file i/o **********/
 
-int Client::open(const char *relpath, int flags) 
+int Client::open(const char *relpath, int flags, mode_t mode
 {
   client_lock.Lock();
 
@@ -2120,27 +2136,14 @@ int Client::open(const char *relpath, int flags)
   tout << path << endl;
   tout << flags << endl;
 
-  int cmode = 0;
-  bool tryauth = false;
-  if (flags & O_LAZY) 
-    cmode = FILE_MODE_LAZY;
-  else if (flags & O_WRONLY) {
-    cmode = FILE_MODE_W;
-    tryauth = true;
-  } else if (flags & O_RDWR) {
-    cmode = FILE_MODE_RW;
-    tryauth = true;
-  } else if (flags & O_APPEND) {
-    cmode = FILE_MODE_W;
-    tryauth = true;
-  } else
-    cmode = FILE_MODE_R;
-
   // go
   MClientRequest *req = new MClientRequest(MDS_OP_OPEN, messenger->get_myinst());
   req->set_path(path); 
   req->args.open.flags = flags;
-  req->args.open.mode = cmode;
+  req->args.open.mode = mode;
+
+  int cmode = req->get_open_file_mode();
+  bool tryauth = !req->open_file_mode_is_readonly();
 
   // FIXME where does FUSE maintain user information
   req->set_caller_uid(getuid());
index 82902c621ff5bda21ed5a7d034250108808eaab2..8be11a38cdccb2d8a878fbeaf9349ab0a28eb1e4 100644 (file)
@@ -44,8 +44,6 @@ using namespace std;
 #include <ext/hash_map>
 using namespace __gnu_cxx;
 
-#define O_LAZY 01000000
-
 
 class MClientRequest;
 class MClientRequestForward;
@@ -321,6 +319,7 @@ class Client : public Dispatcher {
     MClientRequest *request;    
     bufferlist request_payload;  // in case i have to retry
 
+    bool     idempotent;         // is request idempotent?
     set<int> mds;                // who i am asking
     int      num_fwd;            // # of times i've been forwarded
 
@@ -331,7 +330,7 @@ class Client : public Dispatcher {
 
     MetaRequest(MClientRequest *req, tid_t t) : 
       tid(t), request(req), 
-      num_fwd(0), 
+      idempotent(false), num_fwd(0), 
       reply(0), 
       caller_cond(0), dispatch_cond(0) { }
   };
@@ -593,7 +592,7 @@ protected:
   
   // file ops
   int mknod(const char *path, mode_t mode);
-  int open(const char *path, int mode);
+  int open(const char *path, int flags, mode_t mode=0);
   int close(fh_t fh);
   off_t lseek(fh_t fh, off_t offset, int whence);
   int read(fh_t fh, char *buf, off_t size, off_t offset=-1);
index 0c8f7081abbfc5873cbccde2f4b905082a841562..1bdbe934586516481aac21864e8d8857d046827e 100644 (file)
@@ -1317,14 +1317,12 @@ void SyntheticClient::make_dir_mess(const char *basedir, int n)
 void SyntheticClient::foo()
 {
   // link fun
-  /*
   client->mknod("one", 0755);
   client->mknod("two", 0755);
   client->link("one", "three");
   client->mkdir("dir", 0755);
   client->link("two", "/dir/twolink");
   client->link("dir/twolink", "four");
-  */
   
   // unlink fun
   client->mknod("a", 0644);
index 2feb7472d1c7b7828073c30c9f766314ec796ac8..b142609ccda73e7e57ad8399ead5162131eb0df6 100644 (file)
@@ -157,7 +157,7 @@ static int ceph_open(const char *path, struct fuse_file_info *fi)
 {
   int res;
   
-  res = client->open(path, fi->flags);
+  res = client->open(path, fi->flags, 0);
   if (res < 0) return res;
   fi->fh = res;
   return 0;  // fuse wants 0 onsucess
index ab036a14877f76cea2edc6726926a9a91ba8651f..d9c0fefc31e08842e53494be58dcf772ab6306f3 100644 (file)
@@ -36,7 +36,8 @@ INITIATING MDS FAILURE
 If the MDS fails before the metadata update has been journaled, no
 action is taken, since nothing is known about the previously proposed
 transaction.  If an AGREE message is received and there is no
-corresponding PREPARE state, and ROLLBACK is sent to the anchor table.
+corresponding PREPARE or pending-commit state, and ROLLBACK is sent to
+the anchor table.
 
 If the MDS fails after journaling the metadata update but before
 journaling the ACK, it resends COMMIT to the anchor table.  If it
index 2c4ef21a6b793dbe634079c690195a0bb0b6599b..177a81c25aec63f796036bed83bef45fa134cef3 100644 (file)
@@ -129,6 +129,8 @@ typedef __uint32_t epoch_t;       // map epoch  (32bits -> 13 epochs/second for
 
 
 
+#define O_LAZY 01000000
+
 
 
 /** object layout
index c28231ca9b1d0441e98713b3012f53ba598400a5..d1c2da0b870ffde2e8441c2430216e26486f9ee3 100644 (file)
@@ -26,13 +26,13 @@ using std::endl;
 #include "MDS.h"
 #include "MDLog.h"
 
-#include "events/EAnchor.h"
+#include "events/EAnchorClient.h"
 #include "messages/MAnchor.h"
 
 #include "config.h"
 #undef dout
-#define dout(x)  if (x <= g_conf.debug) cout << g_clock.now() << " " << mds->messenger->get_myaddr() << ".anchorclient "
-#define derr(x)  if (x <= g_conf.debug) cout << g_clock.now() << " " << mds->messenger->get_myaddr() << ".anchorclient "
+#define dout(x)  if (x <= g_conf.debug_mds) cout << g_clock.now() << " " << mds->messenger->get_myaddr() << ".anchorclient "
+#define derr(x)  if (x <= g_conf.debug_mds) cout << g_clock.now() << " " << mds->messenger->get_myaddr() << ".anchorclient "
 
 
 void AnchorClient::dispatch(Message *m)
@@ -52,6 +52,8 @@ void AnchorClient::handle_anchor_reply(class MAnchor *m)
   inodeno_t ino = m->get_ino();
   version_t atid = m->get_atid();
 
+  dout(10) << "handle_anchor_reply " << *m << endl;
+
   switch (m->get_op()) {
 
     // lookup
@@ -83,7 +85,18 @@ void AnchorClient::handle_anchor_reply(class MAnchor *m)
         onfinish->finish(0);
         delete onfinish;
       }
-    } else {
+    } 
+    else if (pending_commit.count(atid)) {
+      dout(10) << "stray create_agree on " << ino
+              << " atid " << atid
+              << ", already committing, resending COMMIT"
+              << endl;      
+      MAnchor *req = new MAnchor(ANCHOR_OP_COMMIT, 0, atid);
+      mds->messenger->send_message(req, 
+                                  mds->mdsmap->get_inst(mds->mdsmap->get_anchortable()),
+                                  MDS_PORT_ANCHORTABLE, MDS_PORT_ANCHORCLIENT);
+    }
+    else {
       dout(10) << "stray create_agree on " << ino
               << " atid " << atid
               << ", sending ROLLBACK"
@@ -108,7 +121,18 @@ void AnchorClient::handle_anchor_reply(class MAnchor *m)
         onfinish->finish(0);
         delete onfinish;
       }
-    } else {
+    } 
+    else if (pending_commit.count(atid)) {
+      dout(10) << "stray destroy_agree on " << ino
+              << " atid " << atid
+              << ", already committing, resending COMMIT"
+              << endl;      
+      MAnchor *req = new MAnchor(ANCHOR_OP_COMMIT, 0, atid);
+      mds->messenger->send_message(req, 
+                                  mds->mdsmap->get_inst(mds->mdsmap->get_anchortable()),
+                                  MDS_PORT_ANCHORTABLE, MDS_PORT_ANCHORCLIENT);
+    }
+    else {
       dout(10) << "stray destroy_agree on " << ino
               << " atid " << atid
               << ", sending ROLLBACK"
@@ -133,7 +157,18 @@ void AnchorClient::handle_anchor_reply(class MAnchor *m)
         onfinish->finish(0);
         delete onfinish;
       }
-    } else {
+    }
+    else if (pending_commit.count(atid)) {
+      dout(10) << "stray update_agree on " << ino
+              << " atid " << atid
+              << ", already committing, resending COMMIT"
+              << endl;      
+      MAnchor *req = new MAnchor(ANCHOR_OP_COMMIT, 0, atid);
+      mds->messenger->send_message(req, 
+                                  mds->mdsmap->get_inst(mds->mdsmap->get_anchortable()),
+                                  MDS_PORT_ANCHORTABLE, MDS_PORT_ANCHORCLIENT);
+    }
+    else {
       dout(10) << "stray update_agree on " << ino
               << " atid " << atid
               << ", sending ROLLBACK"
@@ -155,7 +190,7 @@ void AnchorClient::handle_anchor_reply(class MAnchor *m)
       pending_commit.erase(atid);
 
       // log ACK.
-      mds->mdlog->submit_entry(new EAnchor(ANCHOR_OP_ACK, 0, atid));  // ino doesn't matter.
+      mds->mdlog->submit_entry(new EAnchorClient(ANCHOR_OP_ACK, atid));
 
       // kick any waiters
       if (ack_waiters.count(atid)) {
@@ -194,9 +229,9 @@ void AnchorClient::lookup(inodeno_t ino, vector<Anchor>& trace, Context *onfinis
   pending_lookup[ino].onfinish = onfinish;
   pending_lookup[ino].trace = &trace;
 
-  mds->messenger->send_message(req, 
-                         mds->mdsmap->get_inst(mds->mdsmap->get_anchortable()),
-                         MDS_PORT_ANCHORTABLE, MDS_PORT_ANCHORCLIENT);
+  mds->send_message_mds(req, 
+                       mds->mdsmap->get_anchortable(),
+                       MDS_PORT_ANCHORTABLE, MDS_PORT_ANCHORCLIENT);
 }
 
 
@@ -213,9 +248,9 @@ void AnchorClient::prepare_create(inodeno_t ino, vector<Anchor>& trace,
   pending_create_prepare[ino].patid = patid;
   pending_create_prepare[ino].onfinish = onfinish;
 
-  mds->messenger->send_message(req, 
-                         mds->mdsmap->get_inst(mds->mdsmap->get_anchortable()),
-                         MDS_PORT_ANCHORTABLE, MDS_PORT_ANCHORCLIENT);
+  mds->send_message_mds(req, 
+                       mds->mdsmap->get_anchortable(),
+                       MDS_PORT_ANCHORTABLE, MDS_PORT_ANCHORCLIENT);
 }
 
 void AnchorClient::prepare_destroy(inodeno_t ino, 
@@ -270,16 +305,61 @@ void AnchorClient::commit(version_t atid)
 
 void AnchorClient::finish_recovery()
 {
-  dout(7) << "finish_recovery - sending COMMIT on un-ACKed atids" << endl;
+  dout(7) << "finish_recovery" << endl;
+
+  resend_commits();
+}
 
+void AnchorClient::resend_commits()
+{
   for (set<version_t>::iterator p = pending_commit.begin();
        p != pending_commit.end();
        ++p) {
-    dout(10) << " sending COMMIT on " << *p << endl;
+    dout(10) << "resending commit on " << *p << endl;
     MAnchor *req = new MAnchor(ANCHOR_OP_COMMIT, 0, *p);
-    mds->messenger->send_message(req, 
-                                mds->mdsmap->get_inst(mds->mdsmap->get_anchortable()),
-                                MDS_PORT_ANCHORTABLE, MDS_PORT_ANCHORCLIENT);
+    mds->send_message_mds(req, 
+                         mds->mdsmap->get_anchortable(),
+                         MDS_PORT_ANCHORTABLE, MDS_PORT_ANCHORCLIENT);
   }
 }
 
+void AnchorClient::resend_prepares(hash_map<inodeno_t, _pending_prepare>& prepares, int op)
+{
+  for (hash_map<inodeno_t, _pending_prepare>::iterator p = prepares.begin();
+       p != prepares.end();
+       p++) {
+    dout(10) << "resending " << get_anchor_opname(op) << " on " << p->first << endl;
+    MAnchor *req = new MAnchor(op, p->first);
+    req->set_trace(p->second.trace);
+    mds->send_message_mds(req, 
+                         mds->mdsmap->get_anchortable(),
+                         MDS_PORT_ANCHORTABLE, MDS_PORT_ANCHORCLIENT);
+  } 
+}
+
+
+void AnchorClient::handle_mds_recovery(int who)
+{
+  dout(7) << "handle_mds_recovery mds" << who << endl;
+
+  if (who != mds->mdsmap->get_anchortable()) 
+    return; // do nothing.
+
+  // resend any pending lookups.
+  for (hash_map<inodeno_t, _pending_lookup>::iterator p = pending_lookup.begin();
+       p != pending_lookup.end();
+       p++) {
+    dout(10) << "resending lookup on " << p->first << endl;
+    mds->send_message_mds(new MAnchor(ANCHOR_OP_LOOKUP, p->first),
+                         mds->mdsmap->get_anchortable(),
+                         MDS_PORT_ANCHORTABLE, MDS_PORT_ANCHORCLIENT);
+  }
+  
+  // resend any pending prepares.
+  resend_prepares(pending_create_prepare, ANCHOR_OP_CREATE_PREPARE);
+  resend_prepares(pending_update_prepare, ANCHOR_OP_UPDATE_PREPARE);
+  resend_prepares(pending_destroy_prepare, ANCHOR_OP_DESTROY_PREPARE);
+
+  // resend any pending commits.
+  resend_commits();
+}
index 6f036f27ab830a8153413d3fade4bd0c74c90115..ae62608ce29822874de8d6ec6221ec6096499efa 100644 (file)
@@ -70,6 +70,9 @@ public:
   // for recovery (by other nodes)
   void handle_mds_recovery(int mds); // called when someone else recovers
 
+  void resend_commits();
+  void resend_prepares(hash_map<inodeno_t, _pending_prepare>& prepares, int op);
+
   // for recovery (by me)
   void got_journaled_agree(version_t atid) {
     pending_commit.insert(atid);
index c20daf79c5288b62e240913078567a73f25432a9..d9ef7d7d5f02460c54d4fb185a83003716f61fc7 100644 (file)
@@ -145,7 +145,7 @@ void AnchorTable::handle_lookup(MAnchor *req)
 
 // MIDLEVEL
 
-void AnchorTable::create_prepare(inodeno_t ino, vector<Anchor>& trace)
+void AnchorTable::create_prepare(inodeno_t ino, vector<Anchor>& trace, int reqmds)
 {
   // make sure trace is in table
   for (unsigned i=0; i<trace.size(); i++) 
@@ -154,21 +154,24 @@ void AnchorTable::create_prepare(inodeno_t ino, vector<Anchor>& trace)
 
   version++;
   pending_create[version] = ino;  // so we can undo
+  pending_reqmds[version] = reqmds;
   //dump();
 }
 
-void AnchorTable::destroy_prepare(inodeno_t ino)
+void AnchorTable::destroy_prepare(inodeno_t ino, int reqmds)
 {
   version++;
   pending_destroy[version] = ino;
+  pending_reqmds[version] = reqmds;
   //dump();
 }
 
-void AnchorTable::update_prepare(inodeno_t ino, vector<Anchor>& trace)
+void AnchorTable::update_prepare(inodeno_t ino, vector<Anchor>& trace, int reqmds)
 {
   version++;
   pending_update[version].first = ino;
   pending_update[version].second = trace;
+  pending_reqmds[version] = reqmds;
   //dump();
 }
 
@@ -207,6 +210,8 @@ void AnchorTable::commit(version_t atid)
   else
     assert(0);
 
+  pending_reqmds.erase(atid);
+
   // bump version.
   version++;
   //dump();
@@ -235,6 +240,8 @@ void AnchorTable::rollback(version_t atid)
   else
     assert(0);
 
+  pending_reqmds.erase(atid);
+
   // bump version.
   version++;
   //dump();
@@ -264,10 +271,10 @@ void AnchorTable::handle_create_prepare(MAnchor *req)
 
   dout(7) << "handle_create_prepare " << ino << endl;
   
-  create_prepare(ino, trace);
+  create_prepare(ino, trace, req->get_source().num());
 
   // log it
-  EAnchor *le = new EAnchor(ANCHOR_OP_CREATE_PREPARE, ino, version);
+  EAnchor *le = new EAnchor(ANCHOR_OP_CREATE_PREPARE, ino, version, req->get_source().num());
   le->set_trace(trace);
   mds->mdlog->submit_entry(le, 
                           new C_AT_CreatePrepare(this, req, version));
@@ -307,9 +314,9 @@ void AnchorTable::handle_destroy_prepare(MAnchor *req)
   inodeno_t ino = req->get_ino();
   dout(7) << "handle_destroy_prepare " << ino << endl;
 
-  destroy_prepare(ino);
+  destroy_prepare(ino, req->get_source().num());
 
-  mds->mdlog->submit_entry(new EAnchor(ANCHOR_OP_DESTROY_PREPARE, ino, version),
+  mds->mdlog->submit_entry(new EAnchor(ANCHOR_OP_DESTROY_PREPARE, ino, version, req->get_source().num()),
                           new C_AT_DestroyPrepare(this, req, version));
 }
 
@@ -347,10 +354,10 @@ void AnchorTable::handle_update_prepare(MAnchor *req)
 
   dout(7) << "handle_update_prepare " << ino << endl;
   
-  update_prepare(ino, trace);
+  update_prepare(ino, trace, req->get_source().num());
 
   // log it
-  EAnchor *le = new EAnchor(ANCHOR_OP_UPDATE_PREPARE, ino, version);
+  EAnchor *le = new EAnchor(ANCHOR_OP_UPDATE_PREPARE, ino, version, req->get_source().num());
   le->set_trace(trace);
   mds->mdlog->submit_entry(le, 
                           new C_AT_UpdatePrepare(this, req, version));
@@ -467,6 +474,8 @@ void AnchorTable::handle_anchor_request(class MAnchor *req)
     return;
   }
 
+  dout(10) << "handle_anchor_request " << *req << endl;
+
   // go
   switch (req->get_op()) {
 
@@ -514,7 +523,10 @@ public:
 void AnchorTable::save(Context *onfinish)
 {
   dout(7) << "save v " << version << endl;
-  if (!opened) return;
+  if (!opened) {
+    assert(!onfinish);
+    return;
+  }
   
   if (onfinish)
     waiting_for_save[version].push_back(onfinish);
@@ -544,6 +556,7 @@ void AnchorTable::save(Context *onfinish)
   }
 
   // pending
+  ::_encode(pending_reqmds, bl);
   ::_encode(pending_create, bl);
   ::_encode(pending_destroy, bl);
   
@@ -620,6 +633,7 @@ void AnchorTable::_loaded(bufferlist& bl)
     dout(15) << "load_2 decoded " << a << endl;
   }
 
+  ::_decode(pending_reqmds, bl, off);
   ::_decode(pending_create, bl, off);
   ::_decode(pending_destroy, bl, off);
 
@@ -647,3 +661,31 @@ void AnchorTable::_loaded(bufferlist& bl)
   finish_contexts(waiting_for_open);
 }
 
+
+//////
+
+void AnchorTable::finish_recovery()
+{
+  dout(7) << "finish_recovery" << endl;
+  
+  for (map<version_t, inodeno_t>::iterator p = pending_create.begin();
+       p != pending_create.end();
+       ++p) {
+    MAnchor *reply = new MAnchor(ANCHOR_OP_CREATE_AGREE, p->second, p->first);
+    mds->messenger->send_message(reply, mds->mdsmap->get_inst(pending_reqmds[p->first]), MDS_PORT_ANCHORCLIENT);
+  }
+  for (map<version_t, inodeno_t>::iterator p = pending_destroy.begin();
+       p != pending_destroy.end();
+       ++p) {
+    MAnchor *reply = new MAnchor(ANCHOR_OP_DESTROY_AGREE, p->second, p->first);
+    mds->messenger->send_message(reply, mds->mdsmap->get_inst(pending_reqmds[p->first]), MDS_PORT_ANCHORCLIENT);
+  }
+  for (map<version_t, pair<inodeno_t, vector<Anchor> > >::iterator p = pending_update.begin();
+       p != pending_update.end();
+       ++p) {
+    MAnchor *reply = new MAnchor(ANCHOR_OP_UPDATE_AGREE, p->second.first, p->first);
+    mds->messenger->send_message(reply, mds->mdsmap->get_inst(pending_reqmds[p->first]), MDS_PORT_ANCHORCLIENT);
+  }
+}
+
+
index fd188483ca7d5917658432d9ded6d0d6e4882a70..24b5575f1a0536f53661878dc5977bc560d6f22e 100644 (file)
@@ -31,6 +31,7 @@ class AnchorTable {
   hash_map<inodeno_t, Anchor>  anchor_map;
 
   // uncommitted operations
+  map<version_t, int> pending_reqmds;
   map<version_t, inodeno_t> pending_create;
   map<version_t, inodeno_t> pending_destroy;
   map<version_t, pair<inodeno_t, vector<Anchor> > > pending_update;
@@ -54,9 +55,9 @@ protected:
   void dec(inodeno_t ino);
 
   // mid-level
-  void create_prepare(inodeno_t ino, vector<Anchor>& trace);
-  void destroy_prepare(inodeno_t ino);
-  void update_prepare(inodeno_t ino, vector<Anchor>& trace);
+  void create_prepare(inodeno_t ino, vector<Anchor>& trace, int reqmds);
+  void destroy_prepare(inodeno_t ino, int reqmds);
+  void update_prepare(inodeno_t ino, vector<Anchor>& trace, int reqmds);
   void commit(version_t atid);
   void rollback(version_t atid);
   friend class EAnchor;  // used for journal replay.
@@ -115,6 +116,8 @@ public:
   void load(Context *onfinish);
   void _loaded(bufferlist& bl);
 
+  // recovery
+  void finish_recovery();
 
 };
 
index a0f283b6466d06ed55d3b61d2d9e49fa5dab8826..c9fbcf2188e6b82c8f42716546748ce5e2995c99 100644 (file)
@@ -309,9 +309,16 @@ public:
 
   string& get_dname() { return dname; }
   int get_nonce() { return replica_nonce; }
+  bool is_remote() { return remote_ino ? true:false; }
+  inodeno_t get_remote_ino() { return remote_ino; }
 
   void update_dentry(CDentry *dn) {
     dn->set_replica_nonce( replica_nonce );
+    if (remote_ino)
+      dn->set_remote_ino(remote_ino);
+  }
+  void update_new_dentry(CDentry *dn) {
+    update_dentry(dn);
     dn->set_lockstate( lockstate );
   }
 
index 46c9b729a7de2fccf296f017f481950e979c6544..3ceb5c6f934741c6abb17fa51f5571fe2b613362 100644 (file)
@@ -242,6 +242,7 @@ void CDir::link_inode( CDentry *dn, inodeno_t ino)
   //assert(null_items.count(dn->name) == 1);
   //null_items.erase(dn->name);
   nnull--;
+  assert(nnull + nitems == items.size());
 }
 
 void CDir::link_inode( CDentry *dn, CInode *in )
index 3e2b947461656790fea771c17680353cd8ee2398..d0c7a46d8a32c0db183768fbdae5556e128281de 100644 (file)
@@ -529,7 +529,6 @@ class CDirDiscover {
 class CDirExport {
   struct {
     dirfrag_t   dirfrag;
-    long        nitems; // actual real entries
     long        nden;   // num dentries (including null ones)
     version_t   version;
     version_t   committed_version;
@@ -549,7 +548,6 @@ class CDirExport {
     assert(dir->get_version() == dir->get_projected_version());
 
     st.dirfrag = dir->dirfrag();
-    st.nitems = dir->nitems;
     st.nden = dir->items.size();
     st.version = dir->version;
     st.committed_version = dir->committed_version;
@@ -571,8 +569,6 @@ class CDirExport {
   void update_dir(CDir *dir) {
     assert(dir->dirfrag() == st.dirfrag);
 
-    //dir->nitems = st.nitems;
-
     // set committed_version at old version
     dir->committing_version = dir->committed_version = st.committed_version;
     dir->projected_version = dir->version = st.version;
index ebb78b14c8459dece0033aa97a3849a8eefa67be..b5e999a6c030addf73c9650f844d87a78b79e510 100644 (file)
@@ -544,7 +544,7 @@ bool Locker::inode_hard_read_try(CInode *in, Context *con)
   return false;
 }
 
-bool Locker::inode_hard_read_start(CInode *in, MClientRequest *m)
+bool Locker::inode_hard_read_start(CInode *in, MClientRequest *m, CInode *ref)
 {
   dout(7) << "inode_hard_read_start  on " << *in << endl;  
 
@@ -559,7 +559,7 @@ bool Locker::inode_hard_read_start(CInode *in, MClientRequest *m)
 
   // wait!
   dout(7) << "inode_hard_read_start waiting on " << *in << endl;
-  in->add_waiter(CInode::WAIT_HARDR, new C_MDS_RetryRequest(mds, m, in));
+  in->add_waiter(CInode::WAIT_HARDR, new C_MDS_RetryRequest(mds, m, ref));
   return false;
 }
 
@@ -576,7 +576,7 @@ void Locker::inode_hard_read_finish(CInode *in)
 }
 
 
-bool Locker::inode_hard_write_start(CInode *in, MClientRequest *m)
+bool Locker::inode_hard_write_start(CInode *in, MClientRequest *m, CInode *ref)
 {
   dout(7) << "inode_hard_write_start  on " << *in << endl;
 
@@ -604,7 +604,7 @@ bool Locker::inode_hard_write_start(CInode *in, MClientRequest *m)
     }
     
     dout(7) << "inode_hard_write_start waiting on " << *in << endl;
-    in->add_waiter(CInode::WAIT_HARDW, new C_MDS_RetryRequest(mds, m, in));
+    in->add_waiter(CInode::WAIT_HARDW, new C_MDS_RetryRequest(mds, m, ref));
 
     return false;
   } else {
@@ -850,7 +850,7 @@ void Locker::handle_lock_inode_hard(MLock *m)
 // soft inode metadata
 
 
-bool Locker::inode_file_read_start(CInode *in, MClientRequest *m)
+bool Locker::inode_file_read_start(CInode *in, MClientRequest *m, CInode *ref)
 {
   dout(7) << "inode_file_read_start " << *in << " filelock=" << in->filelock << endl;  
 
@@ -883,7 +883,7 @@ bool Locker::inode_file_read_start(CInode *in, MClientRequest *m)
         }
       } else {
         dout(7) << "inode_file_read_start waiting until stable on " << *in << ", filelock=" << in->filelock << endl;
-        in->add_waiter(CInode::WAIT_FILESTABLE, new C_MDS_RetryRequest(mds, m, in));
+        in->add_waiter(CInode::WAIT_FILESTABLE, new C_MDS_RetryRequest(mds, m, ref));
         return false;
       }
     } else {
@@ -900,7 +900,7 @@ bool Locker::inode_file_read_start(CInode *in, MClientRequest *m)
       } else {
         // wait until stable
         dout(7) << "inode_file_read_start waiting until stable on " << *in << ", filelock=" << in->filelock << endl;
-        in->add_waiter(CInode::WAIT_FILESTABLE, new C_MDS_RetryRequest(mds, m, in));
+        in->add_waiter(CInode::WAIT_FILESTABLE, new C_MDS_RetryRequest(mds, m, ref));
         return false;
       }
     }
@@ -908,7 +908,7 @@ bool Locker::inode_file_read_start(CInode *in, MClientRequest *m)
 
   // wait
   dout(7) << "inode_file_read_start waiting on " << *in << ", filelock=" << in->filelock << endl;
-  in->add_waiter(CInode::WAIT_FILER, new C_MDS_RetryRequest(mds, m, in));
+  in->add_waiter(CInode::WAIT_FILER, new C_MDS_RetryRequest(mds, m, ref));
         
   return false;
 }
@@ -929,7 +929,7 @@ void Locker::inode_file_read_finish(CInode *in)
 }
 
 
-bool Locker::inode_file_write_start(CInode *in, MClientRequest *m)
+bool Locker::inode_file_write_start(CInode *in, MClientRequest *m, CInode *ref)
 {
   dout(7) << "inode_file_write_start on " << *in << endl;
 
@@ -942,7 +942,7 @@ bool Locker::inode_file_write_start(CInode *in, MClientRequest *m)
       if (!in->filelock.can_write_soon(in->is_auth())) {
        if (!in->filelock.is_stable()) {
          dout(7) << "inode_file_write_start on auth, waiting for stable on " << *in << endl;
-         in->add_waiter(CInode::WAIT_FILESTABLE, new C_MDS_RetryRequest(mds, m, in));
+         in->add_waiter(CInode::WAIT_FILESTABLE, new C_MDS_RetryRequest(mds, m, ref));
          return false;
        }
        
@@ -970,7 +970,7 @@ bool Locker::inode_file_write_start(CInode *in, MClientRequest *m)
     return true;
   } else {
     dout(7) << "inode_file_write_start on auth, waiting for write on " << *in << endl;
-    in->add_waiter(CInode::WAIT_FILEW, new C_MDS_RetryRequest(mds, m, in));
+    in->add_waiter(CInode::WAIT_FILEW, new C_MDS_RetryRequest(mds, m, ref));
     return false;
   }
 }
index 6146a6ccff27aeec8b1537500ab55dedf7c13842..9b7b1b569dec5a5eeb6c78987509b0700f0aa9ce 100644 (file)
@@ -62,13 +62,13 @@ private:
   // high level interface
  public:
   bool inode_hard_read_try(CInode *in, Context *con);
-  bool inode_hard_read_start(CInode *in, MClientRequest *m);
+  bool inode_hard_read_start(CInode *in, MClientRequest *m, CInode *ref);
   void inode_hard_read_finish(CInode *in);
-  bool inode_hard_write_start(CInode *in, MClientRequest *m);
+  bool inode_hard_write_start(CInode *in, MClientRequest *m, CInode *ref);
   void inode_hard_write_finish(CInode *in);
-  bool inode_file_read_start(CInode *in, MClientRequest *m);
+  bool inode_file_read_start(CInode *in, MClientRequest *m, CInode *ref);
   void inode_file_read_finish(CInode *in);
-  bool inode_file_write_start(CInode *in, MClientRequest *m);
+  bool inode_file_write_start(CInode *in, MClientRequest *m, CInode *ref);
   void inode_file_write_finish(CInode *in);
 
   void inode_hard_eval(CInode *in);
index 6052ca5f29e9077cafda37d6d2f00780e4791e6d..4fc9101548f11f601a489d83363a111130770970 100644 (file)
@@ -24,6 +24,7 @@
 #include "events/EMount.h"
 #include "events/EClientMap.h"
 #include "events/EAnchor.h"
+#include "events/EAnchorClient.h"
 #include "events/EAlloc.h"
 #include "events/EPurgeFinish.h"
 #include "events/EExport.h"
@@ -54,6 +55,7 @@ LogEvent *LogEvent::decode(bufferlist& bl)
   case EVENT_MOUNT: le = new EMount(); break;
   case EVENT_CLIENTMAP: le = new EClientMap(); break;
   case EVENT_ANCHOR: le = new EAnchor(); break;
+  case EVENT_ANCHORCLIENT: le = new EAnchorClient(); break;
   case EVENT_ALLOC: le = new EAlloc(); break;
   case EVENT_EXPORT: le = new EExport; break;
   case EVENT_IMPORTSTART: le = new EImportStart; break;
index e057c1485c90657eb757b710fc2457b3e964339d..d50edd720207cda7a2a6e1d9e24f22532d9d0f33 100644 (file)
@@ -26,6 +26,7 @@
 #define EVENT_CLIENTMAP    7
 
 #define EVENT_ANCHOR       8
+#define EVENT_ANCHORCLIENT 9
 
 #define EVENT_ALLOC        10
 #define EVENT_MKNOD        11
index 10a00a64b945655ca28d8070b97759e3b44997e8..e5a60583272a51f687757eaf61d855da8f63518d 100644 (file)
@@ -1846,7 +1846,7 @@ void MDCache::trim_non_auth()
     }
   }
 
-  if (lru.lru_get_size() == 0) {
+  if (lru.lru_get_size() == 0 && root) {
     list<CDir*> ls;
     root->get_dirfrags(ls);
     for (list<CDir*>::iterator p = ls.begin();
@@ -2537,45 +2537,44 @@ int MDCache::path_traverse(filepath& origpath,
           dout(10) << "traverse: relative symlink, path now " << path << " depth " << depth << endl;
         }
         continue;        
-      } else {
-        // keep going.
+      }
 
-        // forwarder wants replicas?
-        if (is_client_req && ((MClientRequest*)req)->get_mds_wants_replica_in_dirino()) {
-          dout(30) << "traverse: REP is here, " << ((MClientRequest*)req)->get_mds_wants_replica_in_dirino() << " vs " << curdir->dirfrag() << endl;
-          
-          if (((MClientRequest*)req)->get_mds_wants_replica_in_dirino() == curdir->ino() &&
-              curdir->is_auth() && 
-              curdir->is_rep() &&
-              curdir->is_replica(req->get_source().num()) &&
-              dn->is_auth()
-              ) {
-            assert(req->get_source().is_mds());
-            int from = req->get_source().num();
-            
-            if (dn->is_replica(from)) {
-              dout(15) << "traverse: REP would replicate to mds" << from << ", but already cached_by " 
-                       << req->get_source() << " dn " << *dn << endl; 
-            } else {
-              dout(10) << "traverse: REP replicating to " << req->get_source() << " dn " << *dn << endl;
-              MDiscoverReply *reply = new MDiscoverReply(curdir->ino());
-              reply->add_dentry( dn->replicate_to( from ) );
-             if (dn->is_primary())
-               reply->add_inode( dn->inode->replicate_to( from ) );
-              mds->send_message_mds(reply, req->get_source().num(), MDS_PORT_CACHE);
-            }
-          }
-        }
-            
-        trace.push_back(dn);
-        cur = dn->inode;
-        touch_inode(cur);
-        depth++;
-        continue;
+      // forwarder wants replicas?
+      if (is_client_req && ((MClientRequest*)req)->get_mds_wants_replica_in_dirino()) {
+       dout(30) << "traverse: REP is here, " << ((MClientRequest*)req)->get_mds_wants_replica_in_dirino() << " vs " << curdir->dirfrag() << endl;
+       
+       if (((MClientRequest*)req)->get_mds_wants_replica_in_dirino() == curdir->ino() &&
+           curdir->is_auth() && 
+           curdir->is_rep() &&
+           curdir->is_replica(req->get_source().num()) &&
+           dn->is_auth()
+           ) {
+         assert(req->get_source().is_mds());
+         int from = req->get_source().num();
+         
+         if (dn->is_replica(from)) {
+           dout(15) << "traverse: REP would replicate to mds" << from << ", but already cached_by " 
+                    << req->get_source() << " dn " << *dn << endl; 
+         } else {
+           dout(10) << "traverse: REP replicating to " << req->get_source() << " dn " << *dn << endl;
+           MDiscoverReply *reply = new MDiscoverReply(curdir->ino());
+           reply->add_dentry( dn->replicate_to( from ) );
+           if (dn->is_primary())
+             reply->add_inode( dn->inode->replicate_to( from ) );
+           mds->send_message_mds(reply, req->get_source().num(), MDS_PORT_CACHE);
+         }
+       }
       }
+      
+      // add to trace, continue.
+      trace.push_back(dn);
+      cur = dn->inode;
+      touch_inode(cur);
+      depth++;
+      continue;
     }
     
-    // MISS.  don't have it.
+    // MISS.  dentry doesn't exist.
     dout(12) << "traverse: miss on dentry " << path[depth] << " in " << *curdir << endl;
     
     if (curdir->is_auth()) {
@@ -3649,10 +3648,10 @@ void MDCache::handle_discover_reply(MDiscoverReply *m)
       
       if (dn) {
         dout(7) << "had " << *dn << endl;
-       dn->replica_nonce = m->get_dentry(i).get_nonce();  // fix nonce.
+       m->get_dentry(i).update_dentry(dn);
       } else {
         dn = curdir->add_dentry( m->get_dentry(i).get_dname(), 0, false );
-       m->get_dentry(i).update_dentry(dn);
+       m->get_dentry(i).update_new_dentry(dn);
         dout(7) << "added " << *dn << endl;
       }
 
@@ -4197,6 +4196,11 @@ void MDCache::show_subtrees(int dbl)
   if (dbl > g_conf.debug && dbl > g_conf.debug_mds) 
     return;  // i won't print anything.
 
+  if (!root) {
+    dout(dbl) << "no subtrees" << endl;
+    return;
+  }
+
   list<pair<CDir*,int> > q;
   string indent;
 
index 1c3d972e9d82805f04931ca34244c6fee3016950..8d20a1dc908fd27c85c53f0279a5bee438f43e95 100644 (file)
@@ -222,11 +222,15 @@ void MDS::forward_message_mds(Message *req, int mds, int port)
 {
   // client request?
   if (req->get_type() == MSG_CLIENT_REQUEST) {
-    // tell the client
     MClientRequest *creq = (MClientRequest*)req;
     creq->inc_num_fwd();    // inc forward counter
+
+    // tell the client
     messenger->send_message(new MClientRequestForward(creq->get_tid(), mds, creq->get_num_fwd()),
                            creq->get_client_inst());
+    
+    if (!creq->is_idempotent()) 
+      return;  // don't forward if non-idempotent
   }
   
   // forward
@@ -529,7 +533,11 @@ void MDS::handle_mds_map(MMDSMap *m)
       // did i just recover?
       if (oldstate == MDSMap::STATE_REJOIN) {
        dout(1) << "successful recovery!" << endl;
-       
+
+       // kick anchortable (resent AGREEs)
+       if (mdsmap->get_anchortable() == whoami) 
+         anchortable->finish_recovery();
+
        // kick anchorclient (resent COMMITs)
        anchorclient->finish_recovery();
 
@@ -625,6 +633,7 @@ void MDS::handle_mds_map(MMDSMap *m)
       if (*p == whoami) continue;         // not me
       if (oldactive.count(*p)) continue;  // newly so?
       mdcache->handle_mds_recovery(*p);
+      anchorclient->handle_mds_recovery(*p);
     }
   }
 
index f19ee448acfc2dfc6af6cae0939ff2a96fe57047..2c6d79736e8cf51b21b33793d72290de113f76de 100644 (file)
@@ -80,7 +80,7 @@ class MDSMap {
   friend class MDSMonitor;
 
  public:
-  MDSMap() : epoch(0), same_inst_since(0), anchortable(0), root(0) {}
+  MDSMap() : epoch(0), same_inst_since(0), anchortable(1), root(0) {}
 
   epoch_t get_epoch() const { return epoch; }
   void inc_epoch() { epoch++; }
index 3041505ce6655ec33792ae6fad2d53dddb157ce9..16cc509857acb4d0a0b10dccabf5cfe803fbbe8b 100644 (file)
@@ -307,7 +307,6 @@ void Server::handle_client_request(MClientRequest *req)
   case MDS_OP_TRUNCATE:
     if (!req->args.truncate.ino) break;   // can be called w/ either fh OR path
     
-  case MDS_OP_RELEASE:
   case MDS_OP_FSYNC:
     ref = mdcache->get_inode(req->args.fsync.ino);   // fixme someday no ino needed?
 
@@ -593,7 +592,7 @@ void Server::handle_client_stat(MClientRequest *req,
   int mask = req->args.stat.mask;
   if (mask & (INODE_MASK_SIZE|INODE_MASK_MTIME)) {
     // yes.  do a full stat.
-    if (!mds->locker->inode_file_read_start(ref, req))
+    if (!mds->locker->inode_file_read_start(ref, req, ref))
       return;  // syncing
     mds->locker->inode_file_read_finish(ref);
   } else {
@@ -662,7 +661,7 @@ void Server::handle_client_utime(MClientRequest *req,
   mdcache->request_auth_pin(req, cur);
 
   // write
-  if (!mds->locker->inode_file_write_start(cur, req))
+  if (!mds->locker->inode_file_write_start(cur, req, cur))
     return;  // fw or (wait for) sync
 
   mds->balancer->hit_inode(cur, META_POP_IWR);   
@@ -676,6 +675,7 @@ void Server::handle_client_utime(MClientRequest *req,
 
   // log + wait
   EUpdate *le = new EUpdate("utime");
+  le->metablob.add_client_req(req->get_reqid());
   le->metablob.add_dir_context(cur->get_parent_dir());
   inode_t *pi = le->metablob.add_dentry(cur->parent, true);
   pi->mtime = mtime;
@@ -735,7 +735,7 @@ void Server::handle_client_chmod(MClientRequest *req,
   mdcache->request_auth_pin(req, cur);
 
   // write
-  if (!mds->locker->inode_hard_write_start(cur, req))
+  if (!mds->locker->inode_hard_write_start(cur, req, cur))
     return;  // fw or (wait for) lock
 
   mds->balancer->hit_inode(cur, META_POP_IWR);   
@@ -748,6 +748,7 @@ void Server::handle_client_chmod(MClientRequest *req,
 
   // log + wait
   EUpdate *le = new EUpdate("chmod");
+  le->metablob.add_client_req(req->get_reqid());
   le->metablob.add_dir_context(cur->get_parent_dir());
   inode_t *pi = le->metablob.add_dentry(cur->parent, true);
   pi->mode = mode;
@@ -801,7 +802,7 @@ void Server::handle_client_chown(MClientRequest *req,
   mdcache->request_auth_pin(req, cur);
 
   // write
-  if (!mds->locker->inode_hard_write_start(cur, req))
+  if (!mds->locker->inode_hard_write_start(cur, req, cur))
     return;  // fw or (wait for) lock
 
   mds->balancer->hit_inode(cur, META_POP_IWR);   
@@ -815,6 +816,7 @@ void Server::handle_client_chown(MClientRequest *req,
 
   // log + wait
   EUpdate *le = new EUpdate("chown");
+  le->metablob.add_client_req(req->get_reqid());
   le->metablob.add_dir_context(cur->get_parent_dir());
   inode_t *pi = le->metablob.add_dentry(cur->parent, true);
   if (uid >= 0) pi->uid = uid;
@@ -894,7 +896,7 @@ void Server::handle_client_readdir(MClientRequest *req,
   assert(dir->is_auth());
 
   // check perm
-  if (!mds->locker->inode_hard_read_start(diri,req))
+  if (!mds->locker->inode_hard_read_start(diri, req, diri))
     return;
   mds->locker->inode_hard_read_finish(diri);
 
@@ -993,6 +995,7 @@ void Server::handle_client_mknod(MClientRequest *req, CInode *diri)
   // prepare finisher
   C_MDS_mknod_finish *fin = new C_MDS_mknod_finish(mds, req, dn, newi);
   EUpdate *le = new EUpdate("mknod");
+  le->metablob.add_client_req(req->get_reqid());
   le->metablob.add_dir_context(dir);
   inode_t *pi = le->metablob.add_primary_dentry(dn, true, newi);
   pi->version = dn->get_projected_version();
@@ -1171,6 +1174,7 @@ void Server::handle_client_mkdir(MClientRequest *req, CInode *diri)
   // prepare finisher
   C_MDS_mknod_finish *fin = new C_MDS_mknod_finish(mds, req, dn, newi);
   EUpdate *le = new EUpdate("mkdir");
+  le->metablob.add_client_req(req->get_reqid());
   le->metablob.add_dir_context(dir);
   inode_t *pi = le->metablob.add_primary_dentry(dn, true, newi);
   pi->version = dn->get_projected_version();
@@ -1227,6 +1231,7 @@ void Server::handle_client_symlink(MClientRequest *req, CInode *diri)
   // prepare finisher
   C_MDS_mknod_finish *fin = new C_MDS_mknod_finish(mds, req, dn, newi);
   EUpdate *le = new EUpdate("symlink");
+  le->metablob.add_client_req(req->get_reqid());
   le->metablob.add_dir_context(dir);
   inode_t *pi = le->metablob.add_primary_dentry(dn, true, newi);
   pi->version = dn->get_projected_version();
@@ -1385,17 +1390,15 @@ void Server::_link_local(MClientRequest *req, CInode *diri,
   mdcache->request_auth_pin(req, targeti);
   
   // sweet.  let's get our locks.
-  // lock dentry
-  if (!mds->locker->dentry_xlock_start(dn, req, diri))
-    return;
-
-  // lock target inode
-  if (!mds->locker->inode_hard_write_start(targeti, req))
+  // lock dentry, target inode
+  if (!mds->locker->dentry_xlock_start(dn, req, diri) ||
+      !mds->locker->inode_hard_write_start(targeti, req, diri))
     return;
 
   // ok, let's do it.
   // prepare log entry
   EUpdate *le = new EUpdate("link_local");
+  le->metablob.add_client_req(req->get_reqid());
 
   // predirty
   dn->pre_dirty();
@@ -1426,7 +1429,7 @@ void Server::_link_local_finish(MClientRequest *req, CDentry *dn, CInode *target
   dout(10) << "_link_local_finish " << *dn << " to " << *targeti << endl;
 
   // link and unlock the new dentry
-  dn->set_remote_ino(targeti->ino());
+  dn->dir->link_inode(dn, targeti->ino());
   dn->set_version(dpv);
   dn->mark_dirty(dpv);
 
@@ -1713,9 +1716,8 @@ void Server::_unlink_local(MClientRequest *req, CDentry *dn, CInode *in)
     mdcache->request_auth_pin(req, in);
 
     // lock
-    if (!mds->locker->dentry_xlock_start(dn, req, dn->get_dir()->get_inode()))
-      return;
-    if (!mds->locker->inode_hard_write_start(in, req))
+    if (!mds->locker->dentry_xlock_start(dn, req, dn->get_dir()->get_inode()) ||
+       !mds->locker->inode_hard_write_start(in, req, dn->get_dir()->get_inode()))
       return;
   } else {
     // the inode will go away.
@@ -1729,6 +1731,7 @@ void Server::_unlink_local(MClientRequest *req, CDentry *dn, CInode *in)
   // ok, let's do it.
   // prepare log entry
   EUpdate *le = new EUpdate("unlink_local");
+  le->metablob.add_client_req(req->get_reqid());
 
   // predirty
   version_t ipv = in->pre_dirty();
@@ -2411,7 +2414,7 @@ void Server::handle_client_truncate(MClientRequest *req, CInode *cur)
   mdcache->request_auth_pin(req, cur);
 
   // write
-  if (!mds->locker->inode_file_write_start(cur, req))
+  if (!mds->locker->inode_file_write_start(cur, req, cur))
     return;  // fw or (wait for) lock
 
   // check permissions
@@ -2440,20 +2443,20 @@ void Server::handle_client_truncate(MClientRequest *req, CInode *cur)
 void Server::handle_client_open(MClientRequest *req, CInode *cur)
 {
   int flags = req->args.open.flags;
-  int mode = req->args.open.mode;
+  int cmode = req->get_open_file_mode();
 
   dout(7) << "open " << flags << " on " << *cur << endl;
-  dout(10) << "open flags = " << flags << "  mode = " << mode << endl;
+  dout(10) << "open flags = " << flags << "  filemode = " << cmode << endl;
 
   // is it a file?
-  if (!(cur->inode.mode & INODE_MODE_FILE)) {
+  if (!(cmode & INODE_MODE_FILE)) {
     dout(7) << "not a regular file" << endl;
     reply_request(req, -EINVAL);                 // FIXME what error do we want?
     return;
   }
 
   // auth for write access
-  if (mode != FILE_MODE_R && mode != FILE_MODE_LAZY &&
+  if (cmode != FILE_MODE_R && cmode != FILE_MODE_LAZY &&
       !cur->is_auth()) {
     int auth = cur->authority().first;
     assert(auth != mds->get_nodeid());
@@ -2474,7 +2477,7 @@ void Server::handle_client_open(MClientRequest *req, CInode *cur)
     mdcache->request_auth_pin(req, cur);
 
     // write
-    if (!mds->locker->inode_file_write_start(cur, req))
+    if (!mds->locker->inode_file_write_start(cur, req, cur))
       return;  // fw or (wait for) lock
     
     // do update
@@ -2490,7 +2493,7 @@ void Server::handle_client_open(MClientRequest *req, CInode *cur)
 
   // can we issue the caps they want?
   version_t fdv = mds->locker->issue_file_data_version(cur);
-  Capability *cap = mds->locker->issue_new_caps(cur, mode, req);
+  Capability *cap = mds->locker->issue_new_caps(cur, cmode, req);
   if (!cap) return; // can't issue (yet), so wait!
 
   dout(12) << "open gets caps " << cap_string(cap->pending()) << " for " << req->get_source() << " on " << *cur << endl;
@@ -2571,6 +2574,7 @@ void Server::handle_client_openc(MClientRequest *req, CInode *diri)
     // prepare finisher
     C_MDS_openc_finish *fin = new C_MDS_openc_finish(mds, req, dn, in);
     EUpdate *le = new EUpdate("openc");
+    le->metablob.add_client_req(req->get_reqid());
     le->metablob.add_dir_context(dir);
     inode_t *pi = le->metablob.add_primary_dentry(dn, true, in);
     pi->version = dn->get_projected_version();
index cd0098df7c7bce8b801ac484d4665d91cf182114..f65ebfb1f8ff4bb41ec228eb412ff84931ccd638 100644 (file)
@@ -28,15 +28,16 @@ protected:
   version_t atid; 
   vector<Anchor> trace;
   version_t version;    // anchor table version
+  int reqmds;
 
  public:
   EAnchor() : LogEvent(EVENT_ANCHOR) { }
-  EAnchor(int o, inodeno_t i, version_t v) :
+  EAnchor(int o, inodeno_t i, version_t v, int rm) :
     LogEvent(EVENT_ANCHOR),
-    op(o), ino(i), atid(0), version(v) { }
-  EAnchor(int o, version_t a, version_t v=0) :
+    op(o), ino(i), atid(0), version(v), reqmds(rm) { }
+  EAnchor(int o, version_t a, version_t v) :
     LogEvent(EVENT_ANCHOR),
-    op(o), atid(a), version(v) { }
+    op(o), atid(a), version(v), reqmds(-1) { }
 
   void set_trace(vector<Anchor>& t) { trace = t; }
   vector<Anchor>& get_trace() { return trace; }
@@ -47,6 +48,7 @@ protected:
     bl.append((char*)&atid, sizeof(atid));
     ::_encode(trace, bl);
     bl.append((char*)&version, sizeof(version));
+    bl.append((char*)&reqmds, sizeof(reqmds));
   }
   void decode_payload(bufferlist& bl, int& off) {
     bl.copy(off, sizeof(op), (char*)&op);
@@ -58,6 +60,8 @@ protected:
     ::_decode(trace, bl, off);
     bl.copy(off, sizeof(version), (char*)&version);
     off += sizeof(version);
+    bl.copy(off, sizeof(reqmds), (char*)&reqmds);
+    off += sizeof(reqmds);
   }
 
   void print(ostream& out) {
@@ -65,6 +69,7 @@ protected:
     if (ino) out << " " << ino;
     if (atid) out << " atid " << atid;
     if (version) out << " v " << version;
+    if (reqmds >= 0) out << " by mds" << reqmds;
   }  
 
   bool has_expired(MDS *mds);
diff --git a/branches/sage/cephmds2/mds/events/EAnchorClient.h b/branches/sage/cephmds2/mds/events/EAnchorClient.h
new file mode 100644 (file)
index 0000000..111a341
--- /dev/null
@@ -0,0 +1,57 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- 
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software 
+ * Foundation.  See file COPYING.
+ * 
+ */
+
+#ifndef __MDS_EANCHORCLIENT_H
+#define __MDS_EANCHORCLIENT_H
+
+#include <assert.h>
+#include "config.h"
+#include "include/types.h"
+
+#include "../LogEvent.h"
+#include "../Anchor.h"
+
+class EAnchorClient : public LogEvent {
+protected:
+  int op;
+  version_t atid; 
+
+ public:
+  EAnchorClient() : LogEvent(EVENT_ANCHORCLIENT) { }
+  EAnchorClient(int o, version_t at) :
+    LogEvent(EVENT_ANCHORCLIENT),
+    op(o), atid(at) { }
+
+  void encode_payload(bufferlist& bl) {
+    bl.append((char*)&op, sizeof(op));
+    bl.append((char*)&atid, sizeof(atid));
+  }
+  void decode_payload(bufferlist& bl, int& off) {
+    bl.copy(off, sizeof(op), (char*)&op);
+    off += sizeof(op);
+    bl.copy(off, sizeof(atid), (char*)&atid);
+    off += sizeof(atid);
+  }
+
+  void print(ostream& out) {
+    out << "EAnchorClient " << get_anchor_opname(op);
+    if (atid) out << " atid " << atid;
+  }  
+
+  bool has_expired(MDS *mds);
+  void expire(MDS *mds, Context *c);
+  void replay(MDS *mds);
+  
+};
+
+#endif
index 1286202777a44f295151a6042dc13b49f97bf8bc..df83df56e113f8f35a0cab5fd8b2498dac6a3917 100644 (file)
@@ -21,7 +21,7 @@ using namespace std;
 #include "../CInode.h"
 #include "../CDir.h"
 #include "../CDentry.h"
-
+#include "include/reqid.h"
 
 class MDS;
 
@@ -215,8 +215,15 @@ class EMetaBlob {
   // inodes i've destroyed.
   list<inode_t>           destroyed_inodes;
 
+  // idempotent op(s)
+  list<reqid_t> client_reqs;
+
  public:
 
+  void add_client_req(reqid_t r) {
+    client_reqs.push_back(r);
+  }
+
   void add_anchor_transaction(version_t atid) {
     atids.push_back(atid);
   }  
@@ -338,6 +345,7 @@ class EMetaBlob {
     }
     ::_encode(atids, bl);
     ::_encode(destroyed_inodes, bl);
+    ::_encode(client_reqs, bl);
   } 
   void _decode(bufferlist& bl, int& off) {
     int n;
@@ -352,6 +360,7 @@ class EMetaBlob {
     }
     ::_decode(atids, bl, off);
     ::_decode(destroyed_inodes, bl, off);
+    ::_decode(client_reqs, bl, off);
   }
   
   void print(ostream& out) const {
index 00a9655726a381c0e4b9253747bc43a954f03ede..6cee1656dfeebc6f27bb12569314c61f0417a9a7 100644 (file)
@@ -16,6 +16,7 @@
 #include "events/EMetaBlob.h"
 #include "events/EAlloc.h"
 #include "events/EAnchor.h"
+#include "events/EAnchorClient.h"
 #include "events/EUpdate.h"
 #include "events/EImportMap.h"
 
@@ -528,23 +529,18 @@ void EAnchor::replay(MDS *mds)
     switch (op) {
       // anchortable
     case ANCHOR_OP_CREATE_PREPARE:
-      mds->anchortable->create_prepare(ino, trace);
+      mds->anchortable->create_prepare(ino, trace, reqmds);
       break;
     case ANCHOR_OP_DESTROY_PREPARE:
-      mds->anchortable->destroy_prepare(ino);
+      mds->anchortable->destroy_prepare(ino, reqmds);
       break;
     case ANCHOR_OP_UPDATE_PREPARE:
-      mds->anchortable->update_prepare(ino, trace);
+      mds->anchortable->update_prepare(ino, trace, reqmds);
       break;
     case ANCHOR_OP_COMMIT:
       mds->anchortable->commit(atid);
       break;
 
-      // anchorclient
-    case ANCHOR_OP_ACK:
-      mds->anchorclient->got_journaled_ack(atid);
-      break;
-
     default:
       assert(0);
     }
@@ -554,6 +550,34 @@ void EAnchor::replay(MDS *mds)
 }
 
 
+// EAnchorClient
+
+bool EAnchorClient::has_expired(MDS *mds) 
+{
+  return true;
+}
+
+void EAnchorClient::expire(MDS *mds, Context *c)
+{
+  assert(0);
+}
+
+void EAnchorClient::replay(MDS *mds)
+{
+  dout(10) << " EAnchorClient.replay op " << op << " atid " << atid << endl;
+    
+  switch (op) {
+    // anchorclient
+  case ANCHOR_OP_ACK:
+    mds->anchorclient->got_journaled_ack(atid);
+    break;
+    
+  default:
+    assert(0);
+  }
+}
+
+
 // -----------------------
 // EUpdate
 
index 59d12ef9d4994acd39ad48a4496c7fe07f33dfdb..d1282efad184eeba53cc8e7637b8573f95076211 100644 (file)
@@ -40,6 +40,7 @@
 #include <utime.h>
 #include <sys/stat.h>
 #include <sys/types.h>
+#include <fcntl.h>
 
 
 // md ops
 
 #define MDS_OP_STAT     100
 #define MDS_OP_LSTAT    101
-#define MDS_OP_UTIME    102
-#define MDS_OP_CHMOD    103
-#define MDS_OP_CHOWN    104  
+#define MDS_OP_UTIME    1102
+#define MDS_OP_CHMOD    1103
+#define MDS_OP_CHOWN    1104  
 
 
 #define MDS_OP_READDIR  200
-#define MDS_OP_MKNOD    201
-#define MDS_OP_LINK     202
-#define MDS_OP_UNLINK   203
-#define MDS_OP_RENAME   204
+#define MDS_OP_MKNOD    1201
+#define MDS_OP_LINK     1202
+#define MDS_OP_UNLINK   1203
+#define MDS_OP_RENAME   1204
 
-#define MDS_OP_MKDIR    220
-#define MDS_OP_RMDIR    221
-#define MDS_OP_SYMLINK  222
+#define MDS_OP_MKDIR    1220
+#define MDS_OP_RMDIR    1221
+#define MDS_OP_SYMLINK  1222
 
 #define MDS_OP_OPEN     301
-#define MDS_OP_TRUNCATE 306
+#define MDS_OP_TRUNCATE 1306
 #define MDS_OP_FSYNC    307
-#define MDS_OP_RELEASE  308
+
+#define MDS_OP_RELEASE  308  // used only by SyntheticClient op_dist thinger
 
 
 class MClientRequest : public Message {
@@ -132,6 +134,31 @@ class MClientRequest : public Message {
     this->st.client_inst = ci;
   }
 
+  reqid_t get_reqid() {
+    // FIXME: for now, assume clients always have 1 incarnation
+    return reqid_t(st.client_inst.name, 1, st.tid); 
+  }
+
+  int get_open_file_mode() {
+    if (args.open.flags & O_LAZY) 
+      return FILE_MODE_LAZY;
+    if (args.open.flags & O_WRONLY) 
+      return FILE_MODE_W;
+    if (args.open.flags & O_RDWR) 
+      return FILE_MODE_RW;
+    if (args.open.flags & O_APPEND) 
+      return FILE_MODE_W;
+    return FILE_MODE_R;
+  }
+  bool open_file_mode_is_readonly() {
+    return get_open_file_mode() == FILE_MODE_R;
+  }
+  bool is_idempotent() {
+    if (st.op == MDS_OP_OPEN) 
+      return open_file_mode_is_readonly();
+    return (st.op < 1000);
+  }
+
   // normal fields
   void set_tid(long t) { st.tid = t; }
   void inc_num_fwd() { st.num_fwd++; }
@@ -219,8 +246,8 @@ class MClientRequest : public Message {
       out << "truncate"; break;
     case MDS_OP_FSYNC: 
       out << "fsync"; break;
-    case MDS_OP_RELEASE: 
-      out << "release"; break;
+      //    case MDS_OP_RELEASE: 
+      //out << "release"; break;
     default: 
       out << "unknown=" << get_op();
     }
index 14c76fa26a7f7de6eaa88d43667850fff82c3f10..69e2b889c6d22058fe30739dbe5acb160750751a 100644 (file)
@@ -34,7 +34,8 @@ class MClientRequestForward : public Message {
   void print(ostream& o) {
     o << "client_request_forward(" << tid
       << " to " << dest_mds
-      << " num_fwd=" << num_fwd << ")";
+      << " num_fwd=" << num_fwd
+      << ")";
   }
 
   void encode_payload() {
index 6803f74d80539d28aaa04896c787c691a6b631a2..1839b26683cae313cca9d3d04e6aabcda7cf4cfc 100644 (file)
@@ -137,8 +137,8 @@ private:
 
     void close();
     void join() {
-      writer_thread.join();
-      reader_thread.join();
+      if (writer_thread.is_started()) writer_thread.join();
+      if (reader_thread.is_started()) reader_thread.join();
     }
 
     void send(Message *m) {