]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
uclient: DOESN'T WORK, but more ref counting stuff. Now attempts to resend unsafe...
authorGreg Farnum <gregf@hq.newdream.net>
Wed, 1 Jul 2009 15:10:12 +0000 (08:10 -0700)
committerGreg Farnum <gregf@hq.newdream.net>
Wed, 1 Jul 2009 15:33:48 +0000 (08:33 -0700)
src/client/Client.cc
src/client/Client.h
src/messages/MClientRequest.h

index f95232a0d3bb58c74976a50c522877edb809b851..2bc2c4944407c9c71ba58941f0b955da037f5e30 100644 (file)
@@ -770,7 +770,7 @@ MClientReply *Client::make_request(MClientRequest *req,
   req->set_caller_uid(uid);
   req->set_caller_gid(gid);
 
-  // encode payload now, in case we have to resend (in case of mds failure)
+  // encode payload now, in case we have to resend(in case of mds failure)
   req->encode_payload();
   request->request_payload = req->get_payload();
 
@@ -861,7 +861,7 @@ MClientReply *Client::make_request(MClientRequest *req,
 
   // insert trace
   utime_t from = request->sent_stamp;
-  Inode *target = insert_trace(request, from, mds);
+  Inode *target = insert_trace(request->get(), from, mds);
   if (ptarget)
     *ptarget = target;
 
@@ -1022,11 +1022,11 @@ void Client::handle_client_reply(MClientReply *reply)
   int mds_num = reply->get_source().num();
   MetaRequest *request = mds_requests[tid]->get();
   assert(request);
-
+  
   // store reply
-  if (!request->reply && !reply->is_safe()) //safe replies have no useful info
-    request->reply = reply;
-
+  //  if (!request->reply && !reply->is_safe()) //safe replies have no useful info
+  request->reply = reply;
+  
   if ((request->got_unsafe && !reply->is_safe())
       || (request->got_safe && reply->is_safe())) {
     //duplicate response
@@ -1035,21 +1035,23 @@ void Client::handle_client_reply(MClientReply *reply)
     request->put();
     return;
   }
-
-
+  
   if(reply->is_safe()) {
     //the filesystem change is committed to disk
     request->got_safe = true;
-    if (request->got_unsafe)
+    if (request->got_unsafe) {
       //we're done, clean up
+      request->remove_from_unsafe_list();
       goto cleanup;
+    }
   }
 
   if(!reply->is_safe()) {
     request->got_unsafe = true;
     if(request->got_safe)
-      //we already kicked, so don't do that, just clean up
+      //we already kicked, so just clean up
       goto cleanup;
+    mds_sessions[mds_num].unsafe_requests.push_back(request->get_meta_item());
   }
   if(request->got_safe ^ request->got_unsafe) {
     Cond cond;
@@ -1064,6 +1066,7 @@ void Client::handle_client_reply(MClientReply *reply)
       cond.Wait(client_lock);
     }
   }
+
  cleanup:
   request->put();
 }
@@ -1229,9 +1232,10 @@ void Client::send_reconnect(int mds)
       }
     }
 
-
     // reset my cap seq number
     mds_sessions[mds].seq = 0;
+
+    resend_unsafe_requests(mds);
   } else {
     dout(10) << " i had no session with this mds" << dendl;
     m->closed = true;
@@ -1253,12 +1257,31 @@ void Client::kick_requests(int mds, bool signal)
        p->second->caller_cond->Signal();
       }
       else {
-       send_request(p->second, mds);
+       send_request(p->second->get(), mds);
       }
     }
 }
 
-
+void Client::resend_unsafe_requests(int mds_num) {
+  MDSSession& mds = mds_sessions[mds_num];
+  MetaRequest* current = mds.unsafe_requests.front()->get();
+  MClientRequest *m;
+  while (current) {
+    current->remove_from_unsafe_list();
+    m = new MClientRequest;
+    m->copy_payload(current->request_payload);
+    m->decode_payload();
+    m->set_retry_attempt(current->retry_attempt);
+    m->set_dentry_wanted();
+    m->set_replayed_op();
+    current->request = m;
+    current->got_unsafe = false;
+    current->got_safe = false;
+    send_request(current->get(), mds_num);
+    current->put();
+    current = mds.unsafe_requests.front();
+  }
+}
 
 /************
  * leases
index 6430fa7055e3e5d4b0dacf9b09c208519f11f4e6..9bdec886659b465a02069014774fab6fb05ec9ba 100644 (file)
@@ -89,6 +89,69 @@ extern class Logger  *client_logger;
  
 */
 struct InodeCap;
+
+struct MetaRequest {
+  tid_t tid;
+  MClientRequest *request;    
+  bufferlist request_payload;  // in case i have to retry
+  
+  int uid, gid;
+  
+  utime_t  sent_stamp;
+  set<int> mds;                // who i am asking
+  int      resend_mds;         // someone wants you to (re)send the request here
+  int      num_fwd;            // # of times i've been forwarded
+  int      retry_attempt;
+  int      ref;
+  
+  MClientReply *reply;         // the reply
+  
+  //possible responses
+  bool got_safe;
+  bool got_unsafe;
+
+private:
+  xlist<MetaRequest*>::item unsafe_item;
+public:
+  Cond  *caller_cond;          // who to take up
+  Cond  *dispatch_cond;        // who to kick back
+
+  MetaRequest(MClientRequest *req, tid_t t) : 
+    tid(t), request(req), 
+    resend_mds(-1), num_fwd(0), retry_attempt(0),
+    ref(1), reply(0), 
+    got_safe(false), got_unsafe(false), unsafe_item(this),
+    caller_cond(0), dispatch_cond(0) { }
+
+  MetaRequest* get() {
+    ++ref;
+    cerr << "Get called on MetaRequest tid " << tid
+        << "Refcount is " << ref
+        << " Type is " << request->head.op << std::endl;
+    return this; }
+
+  void put() {
+    cerr << "Put called on MetaRequest tid " << tid;
+    if (--ref == 0) {
+      cerr << "MetaRequest tid" << tid << " deleting." << std::endl;
+      delete this;
+    }
+    cerr << "Refcount is " << ref
+        << " Type is " << request->head.op << std::endl;
+  }
+
+  xlist<MetaRequest*>::item * get_meta_item() {
+    get();
+    return &unsafe_item;
+  }
+
+  void remove_from_unsafe_list() {
+    unsafe_item.remove_myself();
+    put();
+  }
+};
+
+
 struct MDSSession {
   version_t seq;
   __u64 cap_gen;
@@ -99,6 +162,7 @@ struct MDSSession {
   bool was_stale;
 
   xlist<InodeCap*> caps;
+  xlist<MetaRequest*> unsafe_requests;
 
   MClientCapRelease *release;
   
@@ -614,52 +678,10 @@ public:
   void got_mds_push(int mds);
   void handle_client_session(MClientSession *m);
   void send_reconnect(int mds);
+  void resend_unsafe_requests(int mds);
 
   // mds requests
-  struct MetaRequest {
-    tid_t tid;
-    MClientRequest *request;    
-    bufferlist request_payload;  // in case i have to retry
-    
-    int uid, gid;
-
-    utime_t  sent_stamp;
-    set<int> mds;                // who i am asking
-    int      resend_mds;         // someone wants you to (re)send the request here
-    int      num_fwd;            // # of times i've been forwarded
-    int      retry_attempt;
-    int      ref;
-
-    MClientReply *reply;         // the reply
-
-    //possible responses
-    bool got_safe;
-    bool got_unsafe;
-
-    Cond  *caller_cond;          // who to take up
-    Cond  *dispatch_cond;        // who to kick back
-
-    MetaRequest(MClientRequest *req, tid_t t) : 
-      tid(t), request(req), 
-      resend_mds(-1), num_fwd(0), retry_attempt(0),
-      ref(1), reply(0), 
-      got_safe(false), got_unsafe(false),
-      caller_cond(0), dispatch_cond(0) { }
-
-    MetaRequest* get() {
-      ++ref;
-      dout(20) << "Get called on MetaRequest " << this << std::endl
-          << "Refcount is " << ref << dendl;
-      return this; }
-    void put() {
-      dout(20) << "Put called on MetaRequest " << this << dendl;
-      if (--ref == 0) {
-       dout(20) << "MetaRequest " << this << " deleting." << dendl;
-       delete this;
-      }
-      dout(20) << "Refcount is " << ref << dendl;
-    }
-  };
+
   tid_t last_tid;
   map<tid_t, MetaRequest*> mds_requests;
   set<int>                 failed_mds;
index cca6cc35a75354e5ed5202608bd2e88b5fc74c29..c7b56277144b8cc16bceef9b5f8901c9aa6c7958 100644 (file)
@@ -133,6 +133,9 @@ public:
   void set_dentry_wanted() {
     head.flags = head.flags | CEPH_MDS_FLAG_WANT_DENTRY;
   }
+  void set_replayed_op() {
+    head.flags = head.flags | CEPH_MDS_FLAG_REPLAY;
+  }
     
   tid_t get_tid() { return head.tid; }
   tid_t get_oldest_client_tid() { return head.oldest_client_tid; }