]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
client: fix I_COMPLETE_ORDERED checking 2813/head
authorYan, Zheng <zyan@redhat.com>
Mon, 27 Oct 2014 20:57:16 +0000 (13:57 -0700)
committerYan, Zheng <zyan@redhat.com>
Mon, 27 Oct 2014 22:27:37 +0000 (15:27 -0700)
Current code marks a directory inode as complete and ordered when readdir
finishes, but it does not check if the directory was modified in the middle
of readdir. This is wrong, directory inode should not be marked as ordered
if it was modified during readddir

The fix is introduce a new counter to the inode data struct, we increase
the counter each time the directory is modified. When readdir finishes, we
check the counter to decide if the directory should be marked as ordered.

Fixes: #9894
Signed-off-by: Yan, Zheng <zyan@redhat.com>
src/client/Client.cc
src/client/Client.h
src/client/Dir.h
src/client/Inode.h

index 18dcd5945a6d8f34e1180ce8fe5d7cf7a440b60f..205b4709cef6e1d4ebc1f56cf65aadc0826be50f 100644 (file)
@@ -143,7 +143,7 @@ bool Client::CommandHook::call(std::string command, cmdmap_t& cmdmap,
 
 dir_result_t::dir_result_t(Inode *in)
   : inode(in), offset(0), this_offset(2), next_offset(2),
-    release_count(0), start_shared_gen(0),
+    release_count(0), ordered_count(0), start_shared_gen(0),
     buffer(0) {
   inode->get();
 }
@@ -551,10 +551,10 @@ void Client::trim_dentry(Dentry *dn)
   ldout(cct, 15) << "trim_dentry unlinking dn " << dn->name 
                 << " in dir " << hex << dn->dir->parent_inode->ino 
                 << dendl;
+  dn->dir->release_count++;
   if (dn->dir->parent_inode->flags & I_COMPLETE) {
-    ldout(cct, 10) << " clearing (I_COMPLETE|I_COMPLETE_ORDERED) on " << *dn->dir->parent_inode << dendl;
-    dn->dir->parent_inode->flags &= ~(I_COMPLETE | I_COMPLETE_ORDERED);
-    dn->dir->release_count++;
+    ldout(cct, 10) << " clearing (I_COMPLETE|I_DIR_ORDERED) on " << *dn->dir->parent_inode << dendl;
+    dn->dir->parent_inode->flags &= ~(I_COMPLETE | I_DIR_ORDERED);
   }
   unlink(dn, false, false);  // drop dir, drop dentry
 }
@@ -775,8 +775,8 @@ Inode * Client::add_update_inode(InodeStat *st, utime_t from,
       (issued & CEPH_CAP_FILE_EXCL) == 0 &&
       in->dirstat.nfiles == 0 &&
       in->dirstat.nsubdirs == 0) {
-    ldout(cct, 10) << " marking (I_COMPLETE|I_COMPLETE_ORDERED) on empty dir " << *in << dendl;
-    in->flags |= I_COMPLETE | I_COMPLETE_ORDERED;
+    ldout(cct, 10) << " marking (I_COMPLETE|I_DIR_ORDERED) on empty dir " << *in << dendl;
+    in->flags |= I_COMPLETE | I_DIR_ORDERED;
     if (in->dir) {
       ldout(cct, 10) << " dir is open on empty dir " << in->ino << " with "
                     << in->dir->dentry_list.size() << " entries, marking all dentries null" << dendl;
@@ -826,16 +826,20 @@ Dentry *Client::insert_dentry_inode(Dir *dir, const string& dname, LeaseStat *dl
   if (!dn || dn->inode == 0) {
     in->get();
     if (old_dentry) {
-      if (old_dentry->dir->parent_inode->flags & I_COMPLETE_ORDERED) {
-       ldout(cct, 10) << " clearing I_COMPLETE_ORDERED on "
-                      << *old_dentry->dir->parent_inode << dendl;
-       old_dentry->dir->parent_inode->flags &= ~I_COMPLETE_ORDERED;
+      if (old_dentry->dir != dir) {
+       old_dentry->dir->ordered_count++;
+       if (old_dentry->dir->parent_inode->flags & I_DIR_ORDERED) {
+         ldout(cct, 10) << " clearing I_DIR_ORDERED on "
+                        << *old_dentry->dir->parent_inode << dendl;
+         old_dentry->dir->parent_inode->flags &= ~I_DIR_ORDERED;
+       }
       }
       unlink(old_dentry, dir == old_dentry->dir, false);  // drop dentry, keep dir open if its the same dir
     }
-    if (dir->parent_inode->flags & I_COMPLETE_ORDERED) {
-       ldout(cct, 10) << " clearing I_COMPLETE_ORDERED on " << *dir->parent_inode << dendl;
-       dir->parent_inode->flags &= ~I_COMPLETE_ORDERED;
+    dir->ordered_count++;
+    if (dir->parent_inode->flags & I_DIR_ORDERED) {
+       ldout(cct, 10) << " clearing I_DIR_ORDERED on " << *dir->parent_inode << dendl;
+       dir->parent_inode->flags &= ~I_DIR_ORDERED;
     }
     dn = link(dir, dname, in, dn);
     put_inode(in);
@@ -1022,11 +1026,12 @@ Inode* Client::insert_trace(MetaRequest *request, MetaSession *session)
     ldout(cct, 10) << "insert_trace -- no trace" << dendl;
 
     Dentry *d = request->dentry();
-    if (d && d->dir &&
-       (d->dir->parent_inode->flags & I_COMPLETE)) {
-      ldout(cct, 10) << " clearing (I_COMPLETE|I_COMPLETE_ORDERED) on " << *d->dir->parent_inode << dendl;
-      d->dir->parent_inode->flags &= ~(I_COMPLETE | I_COMPLETE_ORDERED);
+    if (d && d->dir) {
       d->dir->release_count++;
+      if (d->dir->parent_inode->flags & I_COMPLETE) {
+       ldout(cct, 10) << " clearing (I_COMPLETE|I_DIR_ORDERED) on " << *d->dir->parent_inode << dendl;
+       d->dir->parent_inode->flags &= ~(I_COMPLETE | I_DIR_ORDERED);
+      }
     }
 
     if (d && reply->get_result() == 0) {
@@ -1092,9 +1097,10 @@ Inode* Client::insert_trace(MetaRequest *request, MetaSession *session)
       if (diri->dir && diri->dir->dentries.count(dname)) {
        Dentry *dn = diri->dir->dentries[dname];
        if (dn->inode) {
-         if (diri->flags & I_COMPLETE_ORDERED) {
-           ldout(cct, 10) << " clearing I_COMPLETE_ORDERED on " << *diri << dendl;
-           diri->flags &= ~I_COMPLETE_ORDERED;
+         diri->dir->ordered_count++;
+         if (diri->flags & I_DIR_ORDERED) {
+           ldout(cct, 10) << " clearing I_DIR_ORDERED on " << *diri << dendl;
+           diri->flags &= ~I_DIR_ORDERED;
          }
          unlink(dn, true, true);  // keep dir, dentry
        }
@@ -3092,8 +3098,8 @@ void Client::check_cap_issue(Inode *in, Cap *cap, unsigned issued)
     in->shared_gen++;
 
     if (in->is_dir() && (in->flags & I_COMPLETE)) {
-      ldout(cct, 10) << " clearing (I_COMPLETE|I_COMPLETE_ORDERED) on " << *in << dendl;
-      in->flags &= ~(I_COMPLETE | I_COMPLETE_ORDERED);
+      ldout(cct, 10) << " clearing (I_COMPLETE|I_DIR_ORDERED) on " << *in << dendl;
+      in->flags &= ~(I_COMPLETE | I_DIR_ORDERED);
     }
   }
 }
@@ -5456,8 +5462,10 @@ int Client::_opendir(Inode *in, dir_result_t **dirpp, int uid, int gid)
     return -ENOTDIR;
   *dirpp = new dir_result_t(in);
   (*dirpp)->set_frag(in->dirfragtree[0]);
-  if (in->dir)
+  if (in->dir) {
     (*dirpp)->release_count = in->dir->release_count;
+    (*dirpp)->ordered_count = in->dir->ordered_count;
+  }
   (*dirpp)->start_shared_gen = in->shared_gen;
   ldout(cct, 10) << "_opendir " << in->ino << ", our cache says the first dirfrag is " << (*dirpp)->frag() << dendl;
   ldout(cct, 3) << "_opendir(" << in->ino << ") = " << 0 << " (" << *dirpp << ")" << dendl;
@@ -5814,13 +5822,13 @@ int Client::readdir_r_cb(dir_result_t *d, add_dirent_cb_t cb, void *p)
 
   // can we read from our cache?
   ldout(cct, 10) << "offset " << hex << dirp->offset << dec << " at_cache_name " << dirp->at_cache_name
-          << " snapid " << dirp->inode->snapid << " I_COMPLETE_ORDERED "
-          << (bool)(dirp->inode->flags & I_COMPLETE_ORDERED)
+          << " snapid " << dirp->inode->snapid << " (complete && ordered) "
+          << dirp->inode->is_complete_and_ordered()
           << " issued " << ccap_string(dirp->inode->caps_issued())
           << dendl;
   if ((dirp->offset == 2 || dirp->at_cache_name.length()) &&
       dirp->inode->snapid != CEPH_SNAPDIR &&
-      (dirp->inode->flags & I_COMPLETE_ORDERED) &&
+      dirp->inode->is_complete_and_ordered() &&
       dirp->inode->caps_issued_mask(CEPH_CAP_FILE_SHARED)) {
     int err = _readdir_cache_cb(dirp, cb, p);
     if (err != -EAGAIN)
@@ -5887,10 +5895,15 @@ int Client::readdir_r_cb(dir_result_t *d, add_dirent_cb_t cb, void *p)
     }
 
     if (diri->dir &&
-       diri->dir->release_count == dirp->release_count &&
-       diri->shared_gen == dirp->start_shared_gen) {
-      ldout(cct, 10) << " marking (I_COMPLETE|I_COMPLETE_ORDERED) on " << *diri << dendl;
-      diri->flags |= I_COMPLETE | I_COMPLETE_ORDERED;
+       diri->shared_gen == dirp->start_shared_gen &&
+       diri->dir->release_count == dirp->release_count) {
+      if (diri->dir->ordered_count == dirp->ordered_count) {
+       ldout(cct, 10) << " marking (I_COMPLETE|I_DIR_ORDERED) on " << *diri << dendl;
+       diri->flags |= I_COMPLETE | I_DIR_ORDERED;
+      } else {
+       ldout(cct, 10) << " marking I_COMPLETE on " << *diri << dendl;
+       diri->flags |= I_COMPLETE;
+      }
     }
 
     dirp->set_end();
index 0922a0d473db9d751f4342b97d1ccaf2cd4f9b31..4afc206077d530150f2907fa2d4a8f84259c3f41 100644 (file)
@@ -168,6 +168,7 @@ struct dir_result_t {
   string last_name;      // last entry in previous chunk
 
   uint64_t release_count;
+  uint64_t ordered_count;
   int start_shared_gen;  // dir shared_gen at start of readdir
 
   frag_t buffer_frag;
index d7a99f7a618a8f194293a78160172fc57a99d9fc..0e6d28bbb298ff0a4702612dcf773b3eeefb1089 100644 (file)
@@ -9,8 +9,9 @@ class Dir {
   ceph::unordered_map<string, Dentry*> dentries;
   xlist<Dentry*> dentry_list;
   uint64_t release_count;
+  uint64_t ordered_count;
 
-  Dir(Inode* in) : release_count(0) { parent_inode = in; }
+  Dir(Inode* in) : release_count(0), ordered_count(0) { parent_inode = in; }
 
   bool is_empty() {  return dentries.empty(); }
 };
index daddfedd31450530c10e72f03543f70050bdcc03..62cc0d016714e57edd9ff951e29da1b101a356e9 100644 (file)
@@ -71,7 +71,7 @@ struct CapSnap {
 
 // inode flags
 #define I_COMPLETE 1
-#define I_COMPLETE_ORDERED 2
+#define I_DIR_ORDERED 2
 
 struct Inode {
   CephContext *cct;
@@ -136,6 +136,11 @@ struct Inode {
 
   unsigned flags;
 
+  bool is_complete_and_ordered() {
+    static const unsigned wants = I_COMPLETE | I_DIR_ORDERED;
+    return (flags & wants) == wants;
+  }
+
   // about the dir (if this is one!)
   set<int>  dir_contacts;
   bool      dir_hashed, dir_replicated;