]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
tearing up client cap code
authorSage Weil <sage@newdream.net>
Thu, 15 May 2008 02:26:07 +0000 (19:26 -0700)
committerSage Weil <sage@newdream.net>
Thu, 15 May 2008 02:26:07 +0000 (19:26 -0700)
src/client/Client.cc
src/client/Client.h
src/client/FileCache.cc
src/osdc/ObjectCacher.cc
src/osdc/ObjectCacher.h

index a7723312d7e805d97c88d3f430618eb3e55b584f..9c5cc3b203b4ac95a1f5ddc7e18d9eb1d075d991 100644 (file)
@@ -317,7 +317,7 @@ void Client::update_inode(Inode *in, InodeStat *st, LeaseStat *lease, utime_t fr
   dout(12) << "update_inode mask " << lease->mask << " ttl " << ttl << dendl;
 
   if (lease->mask & CEPH_STAT_MASK_INODE) {
-    int issued = in->file_caps();
+    int issued = in->caps_issued();
     
     in->inode.ino = st->ino;
     in->inode.layout = st->layout;
@@ -1150,13 +1150,13 @@ void Client::send_reconnect(int mds)
         p++) {
       if (p->second->caps.count(mds)) {
        dout(10) << " caps on " << p->first
-                << " " << cap_string(p->second->caps[mds].caps)
-                << " wants " << cap_string(p->second->file_caps_wanted())
+                << " " << cap_string(p->second->caps[mds].issued)
+                << " wants " << cap_string(p->second->caps_wanted())
                 << dendl;
        p->second->caps[mds].seq = 0;  // reset seq.
        m->add_inode_caps(p->first,    // ino
-                         p->second->file_caps_wanted(), // wanted
-                         p->second->caps[mds].caps,     // issued
+                         p->second->caps_wanted(), // wanted
+                         p->second->caps[mds].issued,     // issued
                          p->second->inode.size, p->second->inode.mtime, p->second->inode.atime);
        filepath path;
        p->second->make_path(path);
@@ -1273,6 +1273,112 @@ void Client::release_lease(Inode *in, Dentry *dn, int mask)
  * caps
  */
 
+
+
+void Inode::get_cap_ref(int cap)
+{
+  int n = 0;
+  while (cap) {
+    if (cap & 1) {
+      int c = 1 << n;
+      cap_refs[c]++;
+      cout << "inode " << inode.ino << " get " << cap_string(c) << " "
+          << (cap_refs[c]-1) << " -> " << cap_refs[c] << std::endl;
+    }
+    cap >>= 1;
+    n++;
+  }
+}
+
+bool Inode::put_cap_ref(int cap)
+{
+  bool last;
+  int n = 0;
+  while (cap) {
+    if (cap & 1) {
+      int c = 1 << n;
+      if (--cap_refs[c] == 0)
+       last = true;      
+      cout << "inode " << inode.ino << " put " << cap_string(c) << " "
+          << (cap_refs[c]+1) << " -> " << cap_refs[c] << std::endl;
+    }
+    cap >>= 1;
+    n++;
+  }
+  return last;
+}
+
+void Client::put_cap_ref(Inode *in, int cap)
+{
+  if (in->put_cap_ref(cap))
+    check_caps(in);
+}
+
+void Client::check_caps(Inode *in)
+{
+  int wanted = in->caps_wanted();
+  int used = in->caps_used();
+
+  dout(10) << "check_caps on " << in->inode.ino 
+          << " wanted " << cap_string(wanted)
+          << " used " << cap_string(used)
+          << dendl;
+  
+  for (map<int,InodeCap>::iterator it = in->caps.begin();
+       it != in->caps.end();
+       it++) {
+    InodeCap &cap = it->second;
+    int revoking = cap.implemented & ~cap.issued;
+    
+    if (in->wanted_max_size > in->inode.max_size &&
+       in->wanted_max_size > in->requested_max_size)
+      goto ack;
+
+    /* completed revocation? */
+    if (revoking && (revoking && used) == 0) {
+      dout(10) << "completed revocation of " << (cap.implemented & ~cap.issued) << dendl;
+      goto ack;
+    }
+
+    /* approaching file_max? */
+    if ((cap.issued & CEPH_CAP_WR) &&
+       (in->inode.size << 1) >= in->inode.max_size) {
+       //(in->i_reported_size << 1) < in->inode.max_size) {
+       dout(10) << "size approaching max_size" << dendl;
+      goto ack;
+    }
+
+    if ((cap.issued & ~wanted) == 0)
+      continue;     /* nothing extra, all good */
+
+    /*
+    if (time_before(jiffies, ci->i_hold_caps_until)) {
+    // delaying cap release for a bit
+    dout(30, "delaying cap release\n");
+    continue;
+    }
+    */
+    
+  ack:
+    MClientFileCaps *m = new MClientFileCaps(CEPH_CAP_OP_ACK,
+                                            in->inode, 
+                                             it->second.seq,
+                                             it->second.issued,
+                                             wanted);
+    m->set_max_size(in->wanted_max_size);
+    in->requested_max_size = in->wanted_max_size;
+    messenger->send_message(m, mdsmap->get_inst(it->first));
+    if (wanted == 0)
+      mds_sessions[it->first].num_caps--;
+  }
+
+  if (wanted == 0 && !in->caps.empty()) {
+    in->caps.clear();
+    put_inode(in);
+  }
+}
+
+
 class C_Client_ImplementedCaps : public Context {
   Client *client;
   MClientFileCaps *msg;
@@ -1284,6 +1390,15 @@ public:
   }
 };
 
+
+void signal_cond_list(list<Cond*>& ls)
+{
+  for (list<Cond*>::iterator it = ls.begin(); it != ls.end(); it++)
+    (*it)->Signal();
+  ls.clear();
+}
+
+
 /** handle_file_caps
  * handle caps update from mds.  including mds to mds caps transitions.
  * do not block.
@@ -1335,7 +1450,7 @@ void Client::handle_file_caps(MClientFileCaps *m)
        mds_sessions[mds].num_caps++;
         if (in->caps.empty()) in->get();
         in->caps[mds].seq = m->get_seq();
-        in->caps[mds].caps = m->get_caps();
+        in->caps[mds].issued = m->get_caps();
       }
       
       assert(in->stale_caps.count(other));
@@ -1410,9 +1525,12 @@ void Client::handle_file_caps(MClientFileCaps *m)
     delete m;
     return;
   }
+  
+  InodeCap &cap = in->caps[mds];
+
 
   // don't want?
-  int wanted = in->file_caps_wanted();
+  int wanted = in->caps_wanted();
   if (wanted == 0) {
     dout(5) << "handle_file_caps on ino " << m->get_ino() 
             << " seq " << m->get_seq() 
@@ -1425,11 +1543,11 @@ void Client::handle_file_caps(MClientFileCaps *m)
     return;
   }
 
+  int used = in->caps_used();
+
   // update per-mds caps
-  const int old_caps = in->caps[mds].caps;
+  const int old_caps = cap.issued;
   const int new_caps = m->get_caps();
-  in->caps[mds].caps = new_caps;
-  in->caps[mds].seq = m->get_seq();
   dout(5) << "handle_file_caps on in " << m->get_ino() 
           << " mds" << mds << " seq " << m->get_seq() 
           << " caps now " << cap_string(new_caps) 
@@ -1446,6 +1564,7 @@ void Client::handle_file_caps(MClientFileCaps *m)
     in->inode.time_warp_seq = m->get_time_warp_seq();
   }
 
+  bool kick_writers = false;
   if (m->get_max_size() != in->inode.max_size) {
     dout(10) << "max_size " << in->inode.max_size << " -> " << m->get_max_size() << dendl;
     in->inode.max_size = m->get_max_size();
@@ -1453,71 +1572,56 @@ void Client::handle_file_caps(MClientFileCaps *m)
       in->wanted_max_size = 0;
       in->requested_max_size = 0;
     }
+    kick_writers = true;
+  }
+
+  // update caps
+  cap.seq = m->get_seq();
+
+  bool ack = false;
+  bool invalidate = false;
+  bool writeback = false;
+
+  if (old_caps & ~new_caps) { 
+    dout(10) << "  revocation of " << cap_string(~new_caps & old_caps) << dendl;
+    cap.issued = new_caps;
+
+    if ((cap.issued & ~new_caps) & CEPH_CAP_RDCACHE)
+      invalidate = true;
+    if ((used & ~new_caps) & CEPH_CAP_WRBUFFER)
+      writeback = true;
+    else {
+      ack = true;
+      cap.implemented = new_caps;
+
+      // share our (possibly newer) file size, mtime, atime
+      m->set_size(in->inode.size);
+      m->set_max_size(0);  // dont re-request
+      m->set_mtime(in->inode.mtime);
+      m->set_atime(in->inode.atime);
+      m->set_wanted(wanted);
+    }
+  } else if (old_caps == new_caps) {
+    dout(10) << "  caps unchanged at " << cap_string(old_caps) << dendl;
+  } else {
+    dout(10) << "  grant, new caps are " << cap_string(new_caps & ~old_caps) << dendl;
+    cap.issued = cap.implemented = new_caps;
   }
 
-  // share our (possibly newer) file size, mtime, atime
-  m->set_size(in->inode.size);
-  m->set_mtime(in->inode.mtime);
-  m->set_atime(in->inode.atime);
+  // wake up waiters
+  if (new_caps & CEPH_CAP_RD) signal_cond_list(in->waitfor_read);
+  if ((new_caps & CEPH_CAP_WR) || kick_writers) signal_cond_list(in->waitfor_write);
+  if ((new_caps & CEPH_CAP_LAZYIO) || kick_writers) signal_cond_list(in->waitfor_lazy);
 
-  if (g_conf.client_oc) {
-    // caching on, use FileCache.
-    Context *onimplement = 0;
-    if (old_caps & ~new_caps) {     // this mds is revoking caps
-      if (in->fc.get_caps() & ~(in->file_caps()))   // net revocation
-        onimplement = new C_Client_ImplementedCaps(this, m, in);
-      else {
-        implemented_caps(m, in);        // ack now.
-      }
-    }
-    in->fc.set_caps(new_caps, onimplement);
-  } else {
-    // caching off.
+  if (ack)
+    messenger->send_message(m, m->get_source_inst());
 
-    // wake up waiters?
-    if (new_caps & CEPH_CAP_RD) {
-      for (list<Cond*>::iterator it = in->waitfor_read.begin();
-           it != in->waitfor_read.end();
-           it++) {
-        dout(5) << "signaling read waiter " << *it << dendl;
-        (*it)->Signal();
-      }
-      in->waitfor_read.clear();
-    }
-    if (new_caps & CEPH_CAP_WR) {
-      for (list<Cond*>::iterator it = in->waitfor_write.begin();
-           it != in->waitfor_write.end();
-           it++) {
-        dout(5) << "signaling write waiter " << *it << dendl;
-        (*it)->Signal();
-      }
-      in->waitfor_write.clear();
-    }
-    if (new_caps & CEPH_CAP_LAZYIO) {
-      for (list<Cond*>::iterator it = in->waitfor_lazy.begin();
-           it != in->waitfor_lazy.end();
-           it++) {
-        dout(5) << "signaling lazy waiter " << *it << dendl;
-        (*it)->Signal();
-      }
-      in->waitfor_lazy.clear();
-    }
+  if (invalidate)
+    in->fc.release_clean();
+
+  if (writeback)
+    in->fc.flush_dirty();
 
-    // ack?
-    if (old_caps & ~new_caps) {
-      if (in->sync_writes) {
-        // wait for sync writes to finish
-        dout(5) << "sync writes in progress, will ack on finish" << dendl;
-        in->waitfor_no_write.push_back(new C_Client_ImplementedCaps(this, m, in));
-      } else {
-        // ok now
-        implemented_caps(m, in);
-      }
-    } else {
-      // discard
-      delete m;
-    }
-  }
 }
 
 void Client::implemented_caps(MClientFileCaps *m, Inode *in)
@@ -1529,69 +1633,6 @@ void Client::implemented_caps(MClientFileCaps *m, Inode *in)
 }
 
 
-void Client::release_caps(Inode *in,
-                          int retain)
-{
-  int wanted = in->file_caps_wanted();
-  dout(5) << "releasing caps on ino " << in->inode.ino << dec
-          << " had " << cap_string(in->file_caps())
-          << " retaining " << cap_string(retain) 
-         << " want " << cap_string(wanted)
-          << dendl;
-  
-  for (map<int,InodeCap>::iterator it = in->caps.begin();
-       it != in->caps.end();
-       it++) {
-    //if (it->second.caps & ~retain) {
-    if (1) {
-      // release (some of?) these caps
-      it->second.caps = retain & it->second.caps;
-      // note: tell mds _full_ wanted; it'll filter/behave based on what it is allowed to do
-      MClientFileCaps *m = new MClientFileCaps(CEPH_CAP_OP_ACK,
-                                              in->inode, 
-                                               it->second.seq,
-                                               it->second.caps,
-                                               wanted);
-      messenger->send_message(m, mdsmap->get_inst(it->first));
-    }
-    if (wanted == 0)
-      mds_sessions[it->first].num_caps--;
-  }
-  if (wanted == 0 && !in->caps.empty()) {
-    in->caps.clear();
-    put_inode(in);
-  }
-}
-
-void Client::update_caps_wanted(Inode *in)
-{
-  int wanted = in->file_caps_wanted();
-  dout(5) << "updating caps wanted on ino " << in->inode.ino 
-          << " to " << cap_string(wanted)
-         << " max_size " << in->wanted_max_size
-          << dendl;
-  
-  // FIXME: pick a single mds and let the others off the hook..
-  for (map<int,InodeCap>::iterator it = in->caps.begin();
-       it != in->caps.end();
-       it++) {
-    MClientFileCaps *m = new MClientFileCaps(CEPH_CAP_OP_ACK,
-                                            in->inode, 
-                                             it->second.seq,
-                                             it->second.caps,
-                                             wanted);
-    m->set_max_size(in->wanted_max_size);
-    in->requested_max_size = in->wanted_max_size;
-    messenger->send_message(m, mdsmap->get_inst(it->first));
-    if (wanted == 0)
-      mds_sessions[it->first].num_caps--;
-  }
-  if (wanted == 0 && !in->caps.empty()) {
-    in->caps.clear();
-    put_inode(in);
-  }
-}
-  
 
 
 // -------------------
@@ -1746,7 +1787,7 @@ int Client::unmount()
           in->fc.empty(new C_Client_CloseRelease(this, in));
         } else {
           dout(10) << "unmount residual caps on " << in->ino()  << ", releasing" << dendl;
-          release_caps(in);
+          check_caps(in);
         }
       }
     }
@@ -2295,7 +2336,7 @@ int Client::_utimes(const filepath &path, utime_t mtime, utime_t atime, bool fol
   Dentry *dn = lookup(path);
   int want = CEPH_CAP_WR|CEPH_CAP_WRBUFFER|CEPH_CAP_EXCL;
   if (dn && dn->inode &&
-      (dn->inode->file_caps() & want) == want) {
+      (dn->inode->caps_issued() & want) == want) {
     dout(5) << " have WR and EXCL caps, just updating our m/atime" << dendl;
     dn->inode->inode.time_warp_seq++;
     dn->inode->inode.mtime = mtime;
@@ -2714,7 +2755,7 @@ int Client::_open(const filepath &path, int flags, mode_t mode, Fh **fhp, int ui
   Inode *in = 0;
   if (dn) {
     in = dn->inode;
-    in->add_open(cmode);  // make note of pending open, since it effects _wanted_ caps.
+    in->get_open_ref(cmode);  // make note of pending open, since it effects _wanted_ caps.
   }
   
   in = 0;
@@ -2737,7 +2778,7 @@ int Client::_open(const filepath &path, int flags, mode_t mode, Fh **fhp, int ui
     f->inode = in;
     f->inode->get();
     if (!dn)
-      in->add_open(f->mode);  // i may have alrady added it above!
+      in->get_open_ref(f->mode);  // i may have alrady added it above!
 
     dout(10) << in->inode.ino << " wr " << in->num_open_wr << " rd " << in->num_open_rd
             << " dirty " << in->fc.is_dirty() << " cached " << in->fc.is_cached() << dendl;
@@ -2753,11 +2794,11 @@ int Client::_open(const filepath &path, int flags, mode_t mode, Fh **fhp, int ui
     if (in->caps.count(mds) == 0)
       mds_sessions[mds].num_caps++;
 
-    int new_caps = reply->get_file_caps();
+    int new_caps = reply->get_caps_issued();
 
-    assert(reply->get_file_caps_seq() >= in->caps[mds].seq);
-    if (reply->get_file_caps_seq() > in->caps[mds].seq) {   
-      int old_caps = in->caps[mds].caps;
+    assert(reply->get_file_caps_seq() >= cap.seq);
+    if (reply->get_file_caps_seq() > cap.seq) {   
+      int old_caps = cap.caps;
 
       dout(7) << "open got caps " << cap_string(new_caps)
              << " (had " << cap_string(old_caps) << ")"
@@ -2766,12 +2807,12 @@ int Client::_open(const filepath &path, int flags, mode_t mode, Fh **fhp, int ui
               << " from mds" << mds 
              << dendl;
 
-      in->caps[mds].caps = new_caps;
-      in->caps[mds].seq = reply->get_file_caps_seq();
+      cap.caps = new_caps;
+      cap.seq = reply->get_file_caps_seq();
       
       // we shouldn't ever lose caps at this point.
       // actually, we might...?
-      assert((old_caps & ~in->caps[mds].caps) == 0);
+      assert((old_caps & ~cap.caps) == 0);
 
       if (g_conf.client_oc)
         in->fc.set_caps(new_caps);
@@ -2784,7 +2825,7 @@ int Client::_open(const filepath &path, int flags, mode_t mode, Fh **fhp, int ui
              << dendl;
     }
     
-    dout(5) << "open success, fh is " << f << " combined caps " << cap_string(in->file_caps()) << dendl;
+    dout(5) << "open success, fh is " << f << " combined caps " << cap_string(in->caps_issued()) << dendl;
   }
 
   delete reply;
@@ -2846,15 +2887,15 @@ int Client::_release(Fh *f)
   Inode *in = f->inode;
 
   // update inode rd/wr counts
-  int before = in->file_caps_wanted();
-  in->sub_open(f->mode);
-  int after = in->file_caps_wanted();
+  int before = in->caps_wanted();
+  in->put_open_ref(f->mode);
+  int after = in->caps_wanted();
 
   delete f;
 
   // does this change what caps we want?
   if (before != after && after)
-    update_caps_wanted(in);
+    check_caps(in);
 
   // release caps right away?
   dout(10) << "num_open_rd " << in->num_open_rd << "  num_open_wr " << in->num_open_wr << dendl;
@@ -2998,7 +3039,7 @@ int Client::_read(Fh *f, __s64 offset, __u64 size, bufferlist *bl)
 
     if (lazy) {
       // wait for lazy cap
-      if ((in->file_caps() & CEPH_CAP_LAZYIO) == 0) {
+      if ((in->caps_issued() & CEPH_CAP_LAZYIO) == 0) {
        dout(7) << " don't have lazy cap, waiting" << dendl;
        Cond cond;
        in->waitfor_lazy.push_back(&cond);
@@ -3007,14 +3048,14 @@ int Client::_read(Fh *f, __s64 offset, __u64 size, bufferlist *bl)
       }
     } else {
       // wait for RD cap?
-      while ((in->file_caps() & CEPH_CAP_RD) == 0) {
+      while ((in->caps_issued() & CEPH_CAP_RD) == 0) {
        dout(7) << " don't have read cap, waiting" << dendl;
        goto wait;
       }
     }
     
     // async i/o?
-    if ((in->file_caps() & (CEPH_CAP_WRBUFFER|CEPH_CAP_RDCACHE))) {
+    if ((in->caps_issued() & (CEPH_CAP_WRBUFFER|CEPH_CAP_RDCACHE))) {
 
       // FIXME: this logic needs to move info FileCache!
 
@@ -3109,14 +3150,7 @@ void Client::sync_write_commit(Inode *in)
   unsafe_sync_write--;
   in->uncommitted_writes--;
 
-  if (in->uncommitted_writes == 0) {
-    dout(15) << "sync_write_commit all sync writes committed on ino " << in->inode.ino << dendl;
-    for (list<Cond*>::iterator p = in->waitfor_commit.begin();
-        p != in->waitfor_commit.end();
-        p++)
-      (*p)->Signal();
-    in->waitfor_commit.clear();
-  }
+  cl->put_cap_ref(in, CEPH_CAP_WRBUFFER);
 
   dout(15) << "sync_write_commit unsafe_sync_write = " << unsafe_sync_write << dendl;
   if (unsafe_sync_write == 0 && unmounting) {
@@ -3177,80 +3211,54 @@ int Client::_write(Fh *f, __s64 offset, __u64 size, const char *buf)
   bufferlist blist;
   blist.push_back( bp );
 
-  if (g_conf.client_oc) { // buffer cache ON?
-    assert(objectcacher);
-
-    // write (this may block!)
-    in->fc.write(offset, size, blist, client_lock);
-
-  } else {
-    // legacy, inconsistent synchronous write.
-    dout(7) << "synchronous write" << dendl;
-
-    // do we have write file cap?
-    __u64 endoff = offset + size;
-
-    if ((endoff >= in->inode.max_size ||
-        endoff > (in->inode.size << 1)) &&
-       endoff > in->wanted_max_size) {
-      dout(10) << "wanted_max_size " << in->wanted_max_size << " -> " << endoff << dendl;
-      in->wanted_max_size = endoff;
-      update_caps_wanted(in);
-    }
-
-    while (!lazy &&
-          ((in->file_caps() & CEPH_CAP_WR) == 0 ||
-           endoff > in->inode.max_size)) {
-      dout(7) << " don't have write cap for endoff " << endoff
-             << " (max " << in->inode.max_size << "), waiting" << dendl;
-      Cond cond;
-      in->waitfor_write.push_back(&cond);
-      cond.Wait(client_lock);
-    }
-    while (lazy && (in->file_caps() & CEPH_CAP_LAZYIO) == 0) {
-      dout(7) << " don't have lazy cap, waiting" << dendl;
-      Cond cond;
+  // request larger max_size?
+  __u64 endoff = offset + size;
+  if ((endoff >= in->inode.max_size ||
+       endoff > (in->inode.size << 1)) &&
+      endoff > in->wanted_max_size) {
+    dout(10) << "wanted_max_size " << in->wanted_max_size << " -> " << endoff << dendl;
+    in->wanted_max_size = endoff;
+    check_caps(in);
+  }
+  
+  // wait for caps, max_size
+  while ((lazy && (in->caps_issued() & CEPH_CAP_LAZYIO) == 0) ||
+        (!lazy && (in->caps_issued() & CEPH_CAP_WR) == 0) ||
+        endoff > in->inode.max_size) {
+    dout(7) << "missing wr|lazy cap OR endoff " << endoff
+           << " > max_size " << in->inode.max_size 
+           << ", waiting" << dendl;
+    Cond cond;
+    if (lazy)
       in->waitfor_lazy.push_back(&cond);
-      cond.Wait(client_lock);
-    }
+    else 
+      in->waitfor_write.push_back(&cond);
+    cond.Wait(client_lock);
+  }
 
-    // avoid livelock with fsync
-    if (in->uncommitted_writes > 0 &&
-       !in->waitfor_commit.empty())
-      _fsync(f, true);
+  in->get_cap_ref(CEPH_CAP_WR);
 
-    // prepare write
+  // avoid livelock with fsync?
+
+  if (g_conf.client_oc) {
+    // buffer cache
+    in->fc.write(offset, size, blist, client_lock);
+  } else {
+    // simple, non-atomic sync write
     Cond cond;
     bool done = false;
     Context *onfinish = new C_Cond(&cond, &done);
-    in->get();
     Context *onsafe = new C_Client_SyncCommit(this, in);
+
     unsafe_sync_write++;
-    in->sync_writes++;
-    
-    dout(20) << " sync write start " << onfinish << dendl;
+    in->get();
+    in->get_cap_ref(CEPH_CAP_WRBUFFER);
     
     filer->write(in->inode, offset, size, blist, 0, 
-                 onfinish, onsafe
-                //, 1+((int)g_clock.now()) / 10 //f->pos // hack hack test osd revision snapshots
-                ); 
+                 onfinish, onsafe);
     
-    while (!done) {
+    while (!done)
       cond.Wait(client_lock);
-      dout(20) << " sync write bump " << onfinish << dendl;
-    }
-
-    in->sync_writes--;
-    if (in->sync_writes == 0 &&
-        !in->waitfor_no_write.empty()) {
-      for (list<Context*>::iterator i = in->waitfor_no_write.begin();
-           i != in->waitfor_no_write.end();
-           i++)
-        (*i)->finish(0);
-      in->waitfor_no_write.clear();
-    }
-
-    dout(20) << " sync write done " << onfinish << dendl;
   }
 
   // time
@@ -3275,6 +3283,8 @@ int Client::_write(Fh *f, __s64 offset, __u64 size, const char *buf)
   // mtime
   in->inode.mtime = g_clock.real_now();
 
+  put_cap_ref(in, CEPH_CAP_WR);
+  
   // ok!
   return totalwritten;  
 }
@@ -3482,7 +3492,7 @@ int Client::lazyio_propogate(int fd, off_t offset, size_t count)
 
   if (f->mode & CEPH_FILE_MODE_LAZY) {
     // wait for lazy cap
-    while ((in->file_caps() & CEPH_CAP_LAZYIO) == 0) {
+    while ((in->caps_issued() & CEPH_CAP_LAZYIO) == 0) {
       dout(7) << " don't have lazy cap, waiting" << dendl;
       Cond cond;
       in->waitfor_lazy.push_back(&cond);
@@ -3518,7 +3528,7 @@ int Client::lazyio_synchronize(int fd, off_t offset, size_t count)
   
   if (f->mode & CEPH_FILE_MODE_LAZY) {
     // wait for lazy cap
-    while ((in->file_caps() & CEPH_CAP_LAZYIO) == 0) {
+    while ((in->caps_issued() & CEPH_CAP_LAZYIO) == 0) {
       dout(7) << " don't have lazy cap, waiting" << dendl;
       Cond cond;
       in->waitfor_lazy.push_back(&cond);
index 0a9f28f634618a2f40d2f6cee1a370bba41f7db9..561c2d359ab182831124dc5795378711865bc4d8 100644 (file)
@@ -127,9 +127,10 @@ class Dir {
 
 class InodeCap {
  public:
-  int  caps;
+  unsigned issued;
+  unsigned implemented;
   unsigned seq;
-  InodeCap() : caps(0), seq(0) {}
+  InodeCap() : issued(0), implemented(0), seq(0) {}
 };
 
 
@@ -148,7 +149,9 @@ class Inode {
   map<int,InodeCap> caps;            // mds -> InodeCap
   map<int,InodeCap> stale_caps;      // mds -> cap .. stale
 
-  int       num_open_rd, num_open_wr, num_open_lazy;  // num readers, writers
+  int open_by_mode[CEPH_FILE_MODE_NUM];
+  map<int,int> cap_refs;
+
   __u64     wanted_max_size, requested_max_size;
 
   int       ref;      // ref count. 1 for each dentry, fh that links to me.
@@ -162,10 +165,6 @@ class Inode {
   // for caching i/o mode
   FileCache fc;
 
-  // for sync i/o mode
-  int       sync_reads;   // sync reads in progress
-  int       sync_writes;  // sync writes in progress
-  int       uncommitted_writes;  // sync writes missing commits
 
   list<Cond*>       waitfor_write;
   list<Cond*>       waitfor_read;
@@ -207,14 +206,13 @@ class Inode {
     //inode(_inode),
     lease_mask(0), lease_mds(-1),
     dir_auth(-1), dir_hashed(false), dir_replicated(false), 
-    num_open_rd(0), num_open_wr(0), num_open_lazy(0),
     wanted_max_size(0), requested_max_size(0),
     ref(0), ll_ref(0), 
     dir(0), dn(0), symlink(0),
     fc(_oc, ino, layout),
-    sync_reads(0), sync_writes(0), uncommitted_writes(0),
     hack_balance_reads(false)
   {
+    memset(open_by_mode, 0, sizeof(int)*CEPH_FILE_MODE_NUM);
     inode.ino = ino;
   }
   ~Inode() {
@@ -225,34 +223,60 @@ class Inode {
 
   bool is_dir() { return inode.is_dir(); }
 
-  int file_caps() {
+
+  // CAPS --------
+  void get_open_ref(int mode) {
+    open_by_mode[mode]++;
+  }
+  bool put_open_ref(int mode) {
+    if (--open_by_mode[mode] == 0)
+      return true;
+    return false;
+  }
+
+  void get_cap_ref(int cap);
+  bool put_cap_ref(int cap);
+
+  int caps_issued() {
     int c = 0;
     for (map<int,InodeCap>::iterator it = caps.begin();
          it != caps.end();
          it++)
-      c |= it->second.caps;
+      c |= it->second.issued;
     for (map<int,InodeCap>::iterator it = stale_caps.begin();
          it != stale_caps.end();
          it++)
-      c |= it->second.caps;
+      c |= it->second.issued;
     return c;
   }
 
-  int file_caps_wanted() {
+  int caps_used() {
     int w = 0;
-    if (num_open_rd) w |= CEPH_CAP_RD|CEPH_CAP_RDCACHE;
-    if (num_open_wr) w |= CEPH_CAP_WR|CEPH_CAP_WRBUFFER|CEPH_CAP_EXCL;
-    if (num_open_lazy) w |= CEPH_CAP_LAZYIO;
-    if (fc.is_dirty()) w |= CEPH_CAP_WRBUFFER|CEPH_CAP_EXCL;
-    if (fc.is_cached()) w |= CEPH_CAP_RDCACHE;
+    for (map<int,int>::iterator p = cap_refs.begin();
+        p != cap_refs.end();
+        p++)
+      w |= p->first;
     return w;
   }
+  int caps_file_wanted() {
+    int want = 0;
+    for (int mode = 0; mode < 4; mode++)
+      if (open_by_mode[mode])
+       want |= ceph_caps_for_mode(mode);
+    return want;
+  }
+  int caps_wanted() {
+    int want = caps_file_wanted() | caps_used();
+    if (want & CEPH_CAP_WRBUFFER)
+      want |= CEPH_CAP_EXCL;
+    return want;
+  }
 
   int get_effective_lease_mask(utime_t now) {
     int havemask = 0;
     if (now < lease_ttl && lease_mds >= 0)
       havemask |= lease_mask;
-    if (file_caps() & CEPH_CAP_EXCL) 
+    if (caps_issued() & CEPH_CAP_EXCL) 
       havemask |= CEPH_LOCK_ICONTENT;
     if (havemask & CEPH_LOCK_ICONTENT)
       havemask |= CEPH_LOCK_ICONTENT;   // hack: if we have one, we have both, for the purposes of below
@@ -261,9 +285,9 @@ class Inode {
 
   bool have_valid_size() {
     // RD+RDCACHE or WR+WRBUFFER => valid size
-    if ((file_caps() & (CEPH_CAP_RD|CEPH_CAP_RDCACHE)) == (CEPH_CAP_RD|CEPH_CAP_RDCACHE))
+    if ((caps_issued() & (CEPH_CAP_RD|CEPH_CAP_RDCACHE)) == (CEPH_CAP_RD|CEPH_CAP_RDCACHE))
       return true;
-    if ((file_caps() & (CEPH_CAP_WR|CEPH_CAP_WRBUFFER)) == (CEPH_CAP_WR|CEPH_CAP_WRBUFFER))
+    if ((caps_issued() & (CEPH_CAP_WR|CEPH_CAP_WRBUFFER)) == (CEPH_CAP_WR|CEPH_CAP_WRBUFFER))
       return true;
     // otherwise, look for lease or EXCL...
     if (get_effective_lease_mask(g_clock.now()) & CEPH_LOCK_ICONTENT)
@@ -271,17 +295,7 @@ class Inode {
     return false;
   }
 
-  void add_open(int cmode) {
-    if (cmode & CEPH_FILE_MODE_RD) num_open_rd++;
-    if (cmode & CEPH_FILE_MODE_WR) num_open_wr++;
-    if (cmode & CEPH_FILE_MODE_LAZY) num_open_lazy++;
-  }
-  void sub_open(int cmode) {
-    if (cmode & CEPH_FILE_MODE_RD) num_open_rd--;
-    if (cmode & CEPH_FILE_MODE_WR) num_open_wr--;
-    if (cmode & CEPH_FILE_MODE_LAZY) num_open_lazy--;
-  }
-  
   int authority(const string& dname) {
     if (!dirfragtree.empty()) {
       __gnu_cxx::hash<string> H;
@@ -728,9 +742,14 @@ protected:
 
   // file caps
   void handle_file_caps(class MClientFileCaps *m);
+  void check_caps(Inode *in);
+  void put_cap_ref(Inode *in, int cap);
+
+
   void implemented_caps(class MClientFileCaps *m, Inode *in);
   void release_caps(Inode *in, int retain=0);
-  void update_caps_wanted(Inode *in);
+
+
 
   void close_release(Inode *in);
   void close_safe(Inode *in);
index 3f6a345f592d9c8aa9c9f727cc2f106af22f5e7b..ff0c38c7a3391a62441445b8ffe059d816567b59 100644 (file)
@@ -220,15 +220,6 @@ int FileCache::read(off_t offset, size_t size, bufferlist& blist, Mutex& client_
 
 void FileCache::write(off_t offset, size_t size, bufferlist& blist, Mutex& client_lock)
 {
-  // can i write
-  while ((latest_caps & CEPH_CAP_WR) == 0) {
-    dout(10) << "write doesn't have WR cap, blocking" << dendl;
-    Cond c;
-    waitfor_write.insert(&c);
-    c.Wait(client_lock);
-    waitfor_write.erase(&c);
-  }
-
   // inc writing counter
   num_writing++;
 
index 2b6e67dfc9a396ee67145ec5b8e69d42d7571e41..4f611298f93fcb9db452d7c257fd2601c0ccc355 100644 (file)
@@ -915,8 +915,9 @@ int ObjectCacher::writex(Objecter::OSDWrite *wr, inodeno_t ino)
  
 
 // blocking wait for write.
-void ObjectCacher::wait_for_write(size_t len, Mutex& lock)
+bool ObjectCacher::wait_for_write(size_t len, Mutex& lock)
 {
+  int blocked = 0;
   while (get_stat_dirty() + get_stat_tx() >= g_conf.client_oc_max_dirty) {
     dout(10) << "wait_for_write waiting on " << len << ", dirty|tx " 
             << (get_stat_dirty() + get_stat_tx()) 
@@ -926,8 +927,10 @@ void ObjectCacher::wait_for_write(size_t len, Mutex& lock)
     stat_waiter++;
     stat_cond.Wait(lock);
     stat_waiter--;
+    blocked++;
     dout(10) << "wait_for_write woke up" << dendl;
   }
+  return blocked;
 }
 
 void ObjectCacher::flusher_entry()
index 196687f92094e197aa911e10b5c2816a6842312d..2f45ba7c903206c5554b31c6c55c3215e59d4f78 100644 (file)
@@ -456,7 +456,7 @@ class ObjectCacher {
   int writex(Objecter::OSDWrite *wr, inodeno_t ino);
 
   // write blocking
-  void wait_for_write(size_t len, Mutex& lock);
+  bool wait_for_write(size_t len, Mutex& lock);
   
   // blocking.  atomic+sync.
   int atomic_sync_readx(Objecter::OSDRead *rd, inodeno_t ino, Mutex& lock);