]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
client: fix I_COMPLETE_ORDERED checking 2876/head
authorYan, Zheng <zyan@redhat.com>
Mon, 27 Oct 2014 20:57:16 +0000 (13:57 -0700)
committerYan, Zheng <zyan@redhat.com>
Fri, 7 Nov 2014 10:34:01 +0000 (18:34 +0800)
Current code marks a directory inode as complete and ordered when readdir
finishes, but it does not check if the directory was modified in the middle
of readdir. This is wrong, directory inode should not be marked as ordered
if it was modified during readddir

The fix is introduce a new counter to the inode data struct, we increase
the counter each time the directory is modified. When readdir finishes, we
check the counter to decide if the directory should be marked as ordered.

Fixes: #9894
Signed-off-by: Yan, Zheng <zyan@redhat.com>
(cherry picked from commit a4caed8a53d011b214ab516090676641f7c4699d)

src/client/Client.cc
src/client/Client.h
src/client/Dir.h
src/client/Inode.h

index 6617636846dc8f4fb017285afa86a742f2ea7c7f..d3630de5666cfcbdcd3de30edc4d6c4bd0b32ef2 100644 (file)
@@ -142,7 +142,7 @@ bool Client::CommandHook::call(std::string command, cmdmap_t& cmdmap,
 
 dir_result_t::dir_result_t(Inode *in)
   : inode(in), offset(0), this_offset(2), next_offset(2),
-    release_count(0), start_shared_gen(0),
+    release_count(0), ordered_count(0), start_shared_gen(0),
     buffer(0) {
   inode->get();
 }
@@ -511,10 +511,10 @@ void Client::trim_dentry(Dentry *dn)
   ldout(cct, 15) << "trim_dentry unlinking dn " << dn->name 
                 << " in dir " << hex << dn->dir->parent_inode->ino 
                 << dendl;
+  dn->dir->release_count++;
   if (dn->dir->parent_inode->flags & I_COMPLETE) {
-    ldout(cct, 10) << " clearing (I_COMPLETE|I_COMPLETE_ORDERED) on " << *dn->dir->parent_inode << dendl;
-    dn->dir->parent_inode->flags &= ~(I_COMPLETE | I_COMPLETE_ORDERED);
-    dn->dir->release_count++;
+    ldout(cct, 10) << " clearing (I_COMPLETE|I_DIR_ORDERED) on " << *dn->dir->parent_inode << dendl;
+    dn->dir->parent_inode->flags &= ~(I_COMPLETE | I_DIR_ORDERED);
   }
   unlink(dn, false, false);  // drop dir, drop dentry
 }
@@ -735,8 +735,8 @@ Inode * Client::add_update_inode(InodeStat *st, utime_t from,
       (issued & CEPH_CAP_FILE_EXCL) == 0 &&
       in->dirstat.nfiles == 0 &&
       in->dirstat.nsubdirs == 0) {
-    ldout(cct, 10) << " marking (I_COMPLETE|I_COMPLETE_ORDERED) on empty dir " << *in << dendl;
-    in->flags |= I_COMPLETE | I_COMPLETE_ORDERED;
+    ldout(cct, 10) << " marking (I_COMPLETE|I_DIR_ORDERED) on empty dir " << *in << dendl;
+    in->flags |= I_COMPLETE | I_DIR_ORDERED;
     if (in->dir) {
       ldout(cct, 10) << " dir is open on empty dir " << in->ino << " with "
                     << in->dir->dentry_list.size() << " entries, marking all dentries null" << dendl;
@@ -786,16 +786,20 @@ Dentry *Client::insert_dentry_inode(Dir *dir, const string& dname, LeaseStat *dl
   if (!dn || dn->inode == 0) {
     in->get();
     if (old_dentry) {
-      if (old_dentry->dir->parent_inode->flags & I_COMPLETE_ORDERED) {
-       ldout(cct, 10) << " clearing I_COMPLETE_ORDERED on "
-                      << *old_dentry->dir->parent_inode << dendl;
-       old_dentry->dir->parent_inode->flags &= ~I_COMPLETE_ORDERED;
+      if (old_dentry->dir != dir) {
+       old_dentry->dir->ordered_count++;
+       if (old_dentry->dir->parent_inode->flags & I_DIR_ORDERED) {
+         ldout(cct, 10) << " clearing I_DIR_ORDERED on "
+                        << *old_dentry->dir->parent_inode << dendl;
+         old_dentry->dir->parent_inode->flags &= ~I_DIR_ORDERED;
+       }
       }
       unlink(old_dentry, dir == old_dentry->dir, false);  // drop dentry, keep dir open if its the same dir
     }
-    if (dir->parent_inode->flags & I_COMPLETE_ORDERED) {
-       ldout(cct, 10) << " clearing I_COMPLETE_ORDERED on " << *dir->parent_inode << dendl;
-       dir->parent_inode->flags &= ~I_COMPLETE_ORDERED;
+    dir->ordered_count++;
+    if (dir->parent_inode->flags & I_DIR_ORDERED) {
+       ldout(cct, 10) << " clearing I_DIR_ORDERED on " << *dir->parent_inode << dendl;
+       dir->parent_inode->flags &= ~I_DIR_ORDERED;
     }
     dn = link(dir, dname, in, dn);
     put_inode(in);
@@ -982,11 +986,12 @@ Inode* Client::insert_trace(MetaRequest *request, MetaSession *session)
     ldout(cct, 10) << "insert_trace -- no trace" << dendl;
 
     Dentry *d = request->dentry();
-    if (d && d->dir &&
-       (d->dir->parent_inode->flags & I_COMPLETE)) {
-      ldout(cct, 10) << " clearing (I_COMPLETE|I_COMPLETE_ORDERED) on " << *d->dir->parent_inode << dendl;
-      d->dir->parent_inode->flags &= ~(I_COMPLETE | I_COMPLETE_ORDERED);
+    if (d && d->dir) {
       d->dir->release_count++;
+      if (d->dir->parent_inode->flags & I_COMPLETE) {
+       ldout(cct, 10) << " clearing (I_COMPLETE|I_DIR_ORDERED) on " << *d->dir->parent_inode << dendl;
+       d->dir->parent_inode->flags &= ~(I_COMPLETE | I_DIR_ORDERED);
+      }
     }
 
     if (d && reply->get_result() == 0) {
@@ -1052,9 +1057,10 @@ Inode* Client::insert_trace(MetaRequest *request, MetaSession *session)
       if (diri->dir && diri->dir->dentries.count(dname)) {
        Dentry *dn = diri->dir->dentries[dname];
        if (dn->inode) {
-         if (diri->flags & I_COMPLETE_ORDERED) {
-           ldout(cct, 10) << " clearing I_COMPLETE_ORDERED on " << *diri << dendl;
-           diri->flags &= ~I_COMPLETE_ORDERED;
+         diri->dir->ordered_count++;
+         if (diri->flags & I_DIR_ORDERED) {
+           ldout(cct, 10) << " clearing I_DIR_ORDERED on " << *diri << dendl;
+           diri->flags &= ~I_DIR_ORDERED;
          }
          unlink(dn, true, true);  // keep dir, dentry
        }
@@ -3002,8 +3008,8 @@ void Client::check_cap_issue(Inode *in, Cap *cap, unsigned issued)
     in->shared_gen++;
 
     if (in->is_dir() && (in->flags & I_COMPLETE)) {
-      ldout(cct, 10) << " clearing (I_COMPLETE|I_COMPLETE_ORDERED) on " << *in << dendl;
-      in->flags &= ~(I_COMPLETE | I_COMPLETE_ORDERED);
+      ldout(cct, 10) << " clearing (I_COMPLETE|I_DIR_ORDERED) on " << *in << dendl;
+      in->flags &= ~(I_COMPLETE | I_DIR_ORDERED);
     }
   }
 }
@@ -5150,8 +5156,10 @@ int Client::_opendir(Inode *in, dir_result_t **dirpp, int uid, int gid)
     return -ENOTDIR;
   *dirpp = new dir_result_t(in);
   (*dirpp)->set_frag(in->dirfragtree[0]);
-  if (in->dir)
+  if (in->dir) {
     (*dirpp)->release_count = in->dir->release_count;
+    (*dirpp)->ordered_count = in->dir->ordered_count;
+  }
   (*dirpp)->start_shared_gen = in->shared_gen;
   ldout(cct, 10) << "_opendir " << in->ino << ", our cache says the first dirfrag is " << (*dirpp)->frag() << dendl;
   ldout(cct, 3) << "_opendir(" << in->ino << ") = " << 0 << " (" << *dirpp << ")" << dendl;
@@ -5508,13 +5516,13 @@ int Client::readdir_r_cb(dir_result_t *d, add_dirent_cb_t cb, void *p)
 
   // can we read from our cache?
   ldout(cct, 10) << "offset " << hex << dirp->offset << dec << " at_cache_name " << dirp->at_cache_name
-          << " snapid " << dirp->inode->snapid << " I_COMPLETE_ORDERED "
-          << (bool)(dirp->inode->flags & I_COMPLETE_ORDERED)
+          << " snapid " << dirp->inode->snapid << " (complete && ordered) "
+          << dirp->inode->is_complete_and_ordered()
           << " issued " << ccap_string(dirp->inode->caps_issued())
           << dendl;
   if ((dirp->offset == 2 || dirp->at_cache_name.length()) &&
       dirp->inode->snapid != CEPH_SNAPDIR &&
-      (dirp->inode->flags & I_COMPLETE_ORDERED) &&
+      dirp->inode->is_complete_and_ordered() &&
       dirp->inode->caps_issued_mask(CEPH_CAP_FILE_SHARED)) {
     int err = _readdir_cache_cb(dirp, cb, p);
     if (err != -EAGAIN)
@@ -5581,10 +5589,15 @@ int Client::readdir_r_cb(dir_result_t *d, add_dirent_cb_t cb, void *p)
     }
 
     if (diri->dir &&
-       diri->dir->release_count == dirp->release_count &&
-       diri->shared_gen == dirp->start_shared_gen) {
-      ldout(cct, 10) << " marking (I_COMPLETE|I_COMPLETE_ORDERED) on " << *diri << dendl;
-      diri->flags |= I_COMPLETE | I_COMPLETE_ORDERED;
+       diri->shared_gen == dirp->start_shared_gen &&
+       diri->dir->release_count == dirp->release_count) {
+      if (diri->dir->ordered_count == dirp->ordered_count) {
+       ldout(cct, 10) << " marking (I_COMPLETE|I_DIR_ORDERED) on " << *diri << dendl;
+       diri->flags |= I_COMPLETE | I_DIR_ORDERED;
+      } else {
+       ldout(cct, 10) << " marking I_COMPLETE on " << *diri << dendl;
+       diri->flags |= I_COMPLETE;
+      }
     }
 
     dirp->set_end();
index db509a5ffc8c852fbd6f280c7031d0baf87a66f8..54ccc175ec6af891485f136a08dabc106952571e 100644 (file)
@@ -155,6 +155,7 @@ struct dir_result_t {
   string last_name;      // last entry in previous chunk
 
   uint64_t release_count;
+  uint64_t ordered_count;
   int start_shared_gen;  // dir shared_gen at start of readdir
 
   frag_t buffer_frag;
index d7a99f7a618a8f194293a78160172fc57a99d9fc..0e6d28bbb298ff0a4702612dcf773b3eeefb1089 100644 (file)
@@ -9,8 +9,9 @@ class Dir {
   ceph::unordered_map<string, Dentry*> dentries;
   xlist<Dentry*> dentry_list;
   uint64_t release_count;
+  uint64_t ordered_count;
 
-  Dir(Inode* in) : release_count(0) { parent_inode = in; }
+  Dir(Inode* in) : release_count(0), ordered_count(0) { parent_inode = in; }
 
   bool is_empty() {  return dentries.empty(); }
 };
index 0e6cec07880866d4251af5dbe00beb8c281a9672..bb08030d53d6980e5b69d45415c3ee6970800c96 100644 (file)
@@ -70,7 +70,7 @@ struct CapSnap {
 
 // inode flags
 #define I_COMPLETE 1
-#define I_COMPLETE_ORDERED 2
+#define I_DIR_ORDERED 2
 
 struct Inode {
   CephContext *cct;
@@ -135,6 +135,11 @@ struct Inode {
 
   unsigned flags;
 
+  bool is_complete_and_ordered() {
+    static const unsigned wants = I_COMPLETE | I_DIR_ORDERED;
+    return (flags & wants) == wants;
+  }
+
   // about the dir (if this is one!)
   set<int>  dir_contacts;
   bool      dir_hashed, dir_replicated;