]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
client: flush commits for caching mode behaves?
authorSage Weil <sage@newdream.net>
Thu, 15 May 2008 18:25:36 +0000 (11:25 -0700)
committerSage Weil <sage@newdream.net>
Thu, 15 May 2008 18:25:36 +0000 (11:25 -0700)
src/client/Client.cc
src/client/Client.h

index 18590797fa0e49f817ba7e6726552c9191af4a2a..abc9c39af3bb0c4514cd3d03fa150e796d151e64 100644 (file)
@@ -900,10 +900,7 @@ void Client::handle_client_session(MClientSession *m)
   }
 
   // kick waiting threads
-  for (list<Cond*>::iterator p = waiting_for_session[from].begin();
-       p != waiting_for_session[from].end();
-       ++p)
-    (*p)->Signal();
+  signal_cond_list(waiting_for_session[from]);
   waiting_for_session.erase(from);
 
   delete m;
@@ -1142,8 +1139,7 @@ void Client::handle_mds_map(MMDSMap* m)
   // kick any waiting threads
   list<Cond*> ls;
   ls.swap(waiting_for_mdsmap);
-  for (list<Cond*>::iterator p = ls.begin(); p != ls.end(); ++p)
-    (*p)->Signal();
+  signal_cond_list(ls);
 
   delete m;
 }
@@ -1393,17 +1389,6 @@ void Client::check_caps(Inode *in)
 }
 
 
-class C_Client_ImplementedCaps : public Context {
-  Client *client;
-  MClientFileCaps *msg;
-  Inode *in;
-public:
-  C_Client_ImplementedCaps(Client *c, MClientFileCaps *m, Inode *i) : client(c), msg(m), in(i) {}
-  void finish(int r) {
-    client->implemented_caps(msg,in);
-  }
-};
-
 
 void Client::wait_on_list(list<Cond*>& ls)
 {
@@ -1420,6 +1405,38 @@ void Client::signal_cond_list(list<Cond*>& ls)
 }
 
 
+
+// flush dirty data (from objectcache)
+
+struct C_Flush : public Context {
+  Client *client;
+  Inode *in;
+  C_Flush(Client *c, Inode *i) : client(c), in(i) {}
+  void finish(int r) {
+    client->_flushed(in);
+  }
+};
+
+void Client::_flush(Inode *in)
+{
+  dout(10) << "_flush " << *in << dendl;
+  if (in->cap_refs[CEPH_CAP_WRBUFFER] == 1) {
+    in->get_cap_ref(CEPH_CAP_WRBUFFER);  // for the (one!) waiter
+    in->fc.flush_dirty(0);
+    in->fc.add_safe_waiter(new C_Flush(this,  in));
+  }
+}
+
+void Client::_flushed(Inode *in)
+{
+  dout(10) << "_flushed " << *in << dendl;
+  assert(in->cap_refs[CEPH_CAP_WRBUFFER] == 2);
+  put_cap_ref(in, CEPH_CAP_WRBUFFER);
+  put_cap_ref(in, CEPH_CAP_WRBUFFER);
+}
+
+
+
 /** handle_file_caps
  * handle caps update from mds.  including mds to mds caps transitions.
  * do not block.
@@ -1600,17 +1617,16 @@ void Client::handle_file_caps(MClientFileCaps *m)
   cap.seq = m->get_seq();
 
   bool ack = false;
-  bool invalidate = false;
-  bool writeback = false;
-
+  
   if (old_caps & ~new_caps) { 
     dout(10) << "  revocation of " << cap_string(~new_caps & old_caps) << dendl;
     cap.issued = new_caps;
 
     if ((cap.issued & ~new_caps) & CEPH_CAP_RDCACHE)
-      invalidate = true;
+      in->fc.release_clean();
+
     if ((used & ~new_caps) & CEPH_CAP_WRBUFFER)
-      writeback = true;
+      _flush(in);
     else {
       ack = true;
       cap.implemented = new_caps;
@@ -1635,26 +1651,11 @@ void Client::handle_file_caps(MClientFileCaps *m)
 
   if (ack)
     messenger->send_message(m, m->get_source_inst());
-
-  if (invalidate)
-    in->fc.release_clean();
-
-  if (writeback)
-    in->fc.flush_dirty();
-
-}
-
-void Client::implemented_caps(MClientFileCaps *m, Inode *in)
-{
-  dout(5) << "implemented_caps " << cap_string(m->get_caps()) 
-          << ", acking to " << m->get_source() << dendl;
-
-  messenger->send_message(m, m->get_source_inst());
+  else
+    delete m;
 }
 
 
-
-
 // -------------------
 // MOUNT
 
@@ -3077,7 +3078,7 @@ int Client::_read(Fh *f, __s64 offset, __u64 size, bufferlist *bl)
     // do sync read
     Cond cond;
     bool done = false;
-    C_Cond *onfinish = new C_Cond(&cond, &done, &rvalue);
+    Context *onfinish = new C_SafeCond(&client_lock, &cond, &done, &rvalue);
 
     Objecter::OSDRead *rd = filer->prepare_read(in->inode, offset, size, bl, 0);
     if (in->hack_balance_reads || g_conf.client_hack_balance_reads)
@@ -3213,12 +3214,16 @@ int Client::_write(Fh *f, __s64 offset, __u64 size, const char *buf)
 
   if (g_conf.client_oc) {
     // buffer cache
+    
+    if (in->cap_refs[CEPH_CAP_WRBUFFER] == 0)
+      in->get_cap_ref(CEPH_CAP_WRBUFFER);
+
     in->fc.write(offset, size, blist, client_lock);
   } else {
     // simple, non-atomic sync write
     Cond cond;
     bool done = false;
-    Context *onfinish = new C_Cond(&cond, &done);
+    Context *onfinish = new C_SafeCond(&client_lock, &cond, &done);
     Context *onsafe = new C_Client_SyncCommit(this, in);
 
     unsafe_sync_write++;
@@ -3330,27 +3335,21 @@ int Client::_fsync(Fh *f, bool syncdataonly)
 
   Inode *in = f->inode;
 
+  dout(3) << "_fsync(" << f << ", " << (syndataonly ? "dataonly)":"data+metadata)") << dendl;
+  
   // metadata?
-  if (!syncdataonly) {
+  if (!syncdataonly)
     dout(0) << "fsync - not syncing metadata yet.. implement me" << dendl;
-  }
 
-  if (g_conf.client_oc) {
-    // data?
-    Cond cond;
-    bool done = false;
-    if (!objectcacher->commit_set(in->ino(),
-                                 new C_Cond(&cond, &done))) {
-      // wait for callback
-      while (!done) cond.Wait(client_lock);
-    }
-  } else {
-    while (in->cap_refs[CEPH_CAP_WRBUFFER] > 0) {
-      dout(10) << "ino " << in->inode.ino << " has " << in->cap_refs[CEPH_CAP_WRBUFFER] << " uncommitted, waiting" << dendl;
-      wait_on_list(in->waitfor_commit);
-    }    
-    dout(10) << "ino " << in->inode.ino << " has no uncommitted writes" << dendl;
-  }
+  if (g_conf.client_oc)
+    _flush(in);
+  
+  while (in->cap_refs[CEPH_CAP_WRBUFFER] > 0) {
+    dout(10) << "ino " << in->inode.ino << " has " << in->cap_refs[CEPH_CAP_WRBUFFER] << " uncommitted, waiting" << dendl;
+    wait_on_list(in->waitfor_commit);
+  }    
+  dout(10) << "ino " << in->inode.ino << " has no uncommitted writes" << dendl;
+
   return r;
 }
 
index e99c0d33910911a2e3c3e9577ac4aa257c9e9afc..348bad0eee646fb042e6a309f03732c96781c724 100644 (file)
@@ -749,12 +749,8 @@ protected:
   void handle_file_caps(class MClientFileCaps *m);
   void check_caps(Inode *in);
   void put_cap_ref(Inode *in, int cap);
-
-
-  void implemented_caps(class MClientFileCaps *m, Inode *in);
-  void release_caps(Inode *in, int retain=0);
-
-
+  void _flush(Inode *in);
+  void _flushed(Inode *in);
 
   void close_release(Inode *in);
   void close_safe(Inode *in);