]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
uclient: resend cap flushes on mds restart
authorSage Weil <sage@newdream.net>
Thu, 9 Jul 2009 18:17:39 +0000 (11:17 -0700)
committerSage Weil <sage@newdream.net>
Thu, 9 Jul 2009 18:17:39 +0000 (11:17 -0700)
Fuse doesn't implement sync_fs, so this is difficult to test. :/

src/client/Client.cc
src/client/Client.h

index d3de095241f55f9c049a4513763890f3aee3e716..d0d1dcb8ca8d3c33b6268e65b198beb9a4ba0532 100644 (file)
@@ -122,6 +122,8 @@ Client::Client(Messenger *m, MonClient *mc) : timer(client_lock), client_lock("C
   // 
   root = 0;
 
+  num_flushing_caps = 0;
+
   lru.lru_set_max(g_conf.client_cache_size);
 
   // file handles
@@ -1158,26 +1160,31 @@ void Client::handle_mds_map(MMDSMap* m)
   }  
 
   dout(1) << "handle_mds_map epoch " << m->get_epoch() << dendl;
+
+  MDSMap *oldmap = mdsmap;
+  mdsmap = new MDSMap;
   mdsmap->decode(m->get_encoded());
 
   // reset session
   for (map<int,MDSSession>::iterator p = mds_sessions.begin();
        p != mds_sessions.end();
-       p++)
+       p++) {
+    int oldstate = oldmap->get_state(p->first);
+    int newstate = mdsmap->get_state(p->first);
     if (!mdsmap->is_up(p->first) ||
-       mdsmap->get_inst(p->first) != p->second.inst)
+       mdsmap->get_inst(p->first) != p->second.inst) {
       messenger->mark_down(p->second.inst.addr);
-  
-  // send reconnect?
-  if (frommds >= 0 && 
-      mdsmap->get_state(frommds) == MDSMap::STATE_RECONNECT) {
-    send_reconnect(frommds);
-  }
+    } else if (oldstate == newstate)
+      continue;  // no change
+    
+    if (newstate == MDSMap::STATE_RECONNECT)
+      send_reconnect(p->first);
 
-  // kick requests?
-  if (frommds >= 0 &&
-      mdsmap->get_state(frommds) == MDSMap::STATE_ACTIVE) {
-    kick_requests(frommds, false);
+    if (oldstate < MDSMap::STATE_ACTIVE &&
+       newstate >= MDSMap::STATE_ACTIVE) {
+      kick_requests(p->first, false);
+      kick_flushing_caps(p->first);
+    }
   }
 
   // kick any waiting threads
@@ -1185,6 +1192,7 @@ void Client::handle_mds_map(MMDSMap* m)
   ls.swap(waiting_for_mdsmap);
   signal_cond_list(ls);
 
+  delete oldmap;
   delete m;
 }
 
@@ -1263,7 +1271,8 @@ void Client::kick_requests(int mds, bool signal)
     }
 }
 
-void Client::resend_unsafe_requests(int mds_num) {
+void Client::resend_unsafe_requests(int mds_num)
+{
   MDSSession& mds = mds_sessions[mds_num];
   for (xlist<MetaRequest*>::iterator iter = mds.unsafe_requests.begin();
        !iter.end();
@@ -1271,6 +1280,9 @@ void Client::resend_unsafe_requests(int mds_num) {
     send_request(*iter, mds_num);
 }
 
+
+
+
 /************
  * leases
  */
@@ -1573,6 +1585,12 @@ void Client::check_caps(Inode *in, bool is_delayed)
   ack:
     if (cap == in->auth_cap) {
       flush = in->dirty_caps;
+      if (flush && !in->flushing_caps) {
+       dout(10) << " " << *in << " flushing" << dendl;
+       mds_sessions[mds].flushing_caps.push_back(&in->flushing_cap_item);
+       in->get();
+       num_flushing_caps++;
+      }
       in->flushing_caps |= flush;
       in->dirty_caps = 0;
       dout(10) << " flushing " << ccap_string(flush) << dendl;
@@ -1895,6 +1913,43 @@ void Client::mark_caps_dirty(Inode *in, int caps)
   in->dirty_caps |= caps;
 }
 
+void Client::flush_caps()
+{
+  dout(10) << "flush_caps" << dendl;
+  xlist<Inode*>::iterator p = delayed_caps.begin();
+  while (!p.end()) {
+    Inode *in = *p;
+    ++p;
+    delayed_caps.pop_front();
+    check_caps(in, true);
+  }
+
+  // other caps, too
+  p = cap_list.begin();
+  while (!p.end()) {
+    Inode *in = *p;
+    ++p;
+    check_caps(in, true);
+  }
+}
+
+void Client::kick_flushing_caps(int mds)
+{
+  dout(10) << "kick_flushing_caps" << dendl;
+  MDSSession *session = &mds_sessions[mds];
+
+  for (xlist<Inode*>::iterator p = session->flushing_caps.begin(); !p.end(); ++p) {
+    Inode *in = *p;
+    dout(20) << " reflushing caps on " << *in << " to mds" << mds << dendl;
+    InodeCap *cap = in->auth_cap;
+    assert(cap->session == session);
+    send_cap(in, mds, cap, in->caps_used(), in->caps_wanted(), 
+            cap->issued | cap->implemented,
+            in->flushing_caps);
+  }
+}
+
+
 void SnapRealm::build_snap_context()
 {
   set<snapid_t> snaps;
@@ -2242,6 +2297,16 @@ void Client::handle_cap_flush_ack(Inode *in, int mds, InodeCap *cap, MClientCaps
     dout(5) << "  flushing_caps " << ccap_string(in->flushing_caps)
            << " -> " << ccap_string(in->flushing_caps & ~cleaned) << dendl;
     in->flushing_caps &= ~cleaned;
+    if (in->flushing_caps == 0) {
+      dout(10) << " " << *in << " !flushing" << dendl;
+      in->flushing_cap_item.remove_myself();
+      num_flushing_caps--;
+      put_inode(in);
+      if (!num_flushing_caps)
+       sync_cond.Signal();
+      else
+       dout(20) << " still " << num_flushing_caps << " more flushing caps" << dendl;
+    }
     if (!in->caps_dirty())
       put_inode(in);
   }
@@ -2475,22 +2540,7 @@ int Client::unmount()
     }
   }
 
-  // flush delayed caps
-  xlist<Inode*>::iterator p = delayed_caps.begin();
-  while (!p.end()) {
-    Inode *in = *p;
-    ++p;
-    delayed_caps.pop_front();
-    check_caps(in, true);
-  }
-
-  // other caps, too
-  p = cap_list.begin();
-  while (!p.end()) {
-    Inode *in = *p;
-    ++p;
-    check_caps(in, true);
-  }
+  flush_caps();
 
   //if (0) {// hack
   while (lru.lru_get_size() > 0 || 
@@ -4152,6 +4202,21 @@ int Client::ll_statfs(vinodeno_t vino, struct statvfs *stbuf)
 int Client::_sync_fs()
 {
   dout(10) << "_sync_fs" << dendl;
+
+  // wait for unsafe mds requests
+  // FIXME
+  
+  // flush caps
+  flush_caps();
+  while (num_flushing_caps) {
+    // FIXME: starvation
+    dout(10) << "waiting on " << num_flushing_caps << " flushing caps" << dendl;
+    sync_cond.Wait(client_lock);
+  }
+
+  // flush file data
+  // FIXME
+
   return 0;
 }
 
index ebf681389f0fb3b5eb3f46112a6ac8e9348eea3d..a0a7718efabffae5a5a14b3585348c42d8653b61 100644 (file)
@@ -143,6 +143,7 @@ struct MDSSession {
   bool was_stale;
 
   xlist<InodeCap*> caps;
+  xlist<Inode*> flushing_caps;
   xlist<MetaRequest*> unsafe_requests;
 
   MClientCapRelease *release;
@@ -280,7 +281,7 @@ class Inode {
   int exporting_mds;
   ceph_seq_t exporting_mseq;
   utime_t hold_caps_until;
-  xlist<Inode*>::item cap_item;
+  xlist<Inode*>::item cap_item, flushing_cap_item;
 
   SnapRealm *snaprealm;
   xlist<Inode*>::item snaprealm_item;
@@ -362,7 +363,7 @@ class Inode {
     dirty_caps(0), flushing_caps(0), shared_gen(0), cache_gen(0),
     snap_caps(0), snap_cap_refs(0),
     exporting_issued(0), exporting_mds(-1), exporting_mseq(0),
-    cap_item(this),
+    cap_item(this), flushing_cap_item(this),
     snaprealm(0), snaprealm_item(this), snapdir_parent(0),
     reported_size(0), wanted_max_size(0), requested_max_size(0),
     ref(0), ll_ref(0), 
@@ -725,6 +726,7 @@ protected:
 
   // all inodes with caps sit on either cap_list or delayed_caps.
   xlist<Inode*> delayed_caps, cap_list;
+  int num_flushing_caps;
   hash_map<inodeno_t,SnapRealm*> snap_realms;
 
   SnapRealm *get_snap_realm(inodeno_t r) {
@@ -887,7 +889,7 @@ protected:
   ofstream traceout;
 
 
-  Cond mount_cond;
+  Cond mount_cond, sync_cond;
 
 
   // friends
@@ -920,6 +922,8 @@ protected:
   void remove_session_caps(int mds_num);
   void trim_caps(int mds, int max);
   void mark_caps_dirty(Inode *in, int caps);
+  void flush_caps();
+  void kick_flushing_caps(int mds);
 
   void maybe_update_snaprealm(SnapRealm *realm, snapid_t snap_created, snapid_t snap_highwater, 
                              vector<snapid_t>& snaps);