]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
client: introduce a new flag indicating if dentries in directory are sorted
authorYan, Zheng <ukernel@gmail.com>
Tue, 9 Sep 2014 06:06:06 +0000 (14:06 +0800)
committerYan, Zheng <zyan@redhat.com>
Fri, 7 Nov 2014 10:32:34 +0000 (18:32 +0800)
When creating a file, Client::insert_dentry_inode() set the dentry's offset
based on directory's max offset. The offset does not reflect the real
postion of the dentry in directory. Later readdir reply from real postion
of the dentry in directory. Later readdir reply from MDS may change the
dentry's position/offset. This inconsistency can cause missing/duplicate
entries in readdir result if readdir is partly satisfied by dcache_readdir().

The fix is introduce a new flag indicating if dentries in directory are
sorted. We use _readdir_cache_cb() to handle readdir only when the flag is
set, clear the flag after creating/deleting/renaming file.

Fixes: #9178
Signed-off-by: Yan, Zheng <zyan@redhat.com>
(cherry picked from commit 600af25493947871c38214aa370e2544a7fea399)

src/client/Client.cc
src/client/Client.h
src/client/Dir.h
src/client/Inode.h

index ec3a2321f9a4750135587892319122f38c17a4e7..0db6ee260ec4460a7838e485906eb937e0bfd67b 100644 (file)
@@ -512,8 +512,8 @@ void Client::trim_dentry(Dentry *dn)
                 << " in dir " << hex << dn->dir->parent_inode->ino 
                 << dendl;
   if (dn->dir->parent_inode->flags & I_COMPLETE) {
-    ldout(cct, 10) << " clearing I_COMPLETE on " << *dn->dir->parent_inode << dendl;
-    dn->dir->parent_inode->flags &= ~I_COMPLETE;
+    ldout(cct, 10) << " clearing (I_COMPLETE|I_COMPLETE_ORDERED) on " << *dn->dir->parent_inode << dendl;
+    dn->dir->parent_inode->flags &= ~(I_COMPLETE | I_COMPLETE_ORDERED);
     dn->dir->release_count++;
   }
   unlink(dn, false, false);  // drop dir, drop dentry
@@ -735,8 +735,8 @@ Inode * Client::add_update_inode(InodeStat *st, utime_t from,
       (issued & CEPH_CAP_FILE_EXCL) == 0 &&
       in->dirstat.nfiles == 0 &&
       in->dirstat.nsubdirs == 0) {
-    ldout(cct, 10) << " marking I_COMPLETE on empty dir " << *in << dendl;
-    in->flags |= I_COMPLETE;
+    ldout(cct, 10) << " marking (I_COMPLETE|I_COMPLETE_ORDERED) on empty dir " << *in << dendl;
+    in->flags |= I_COMPLETE | I_COMPLETE_ORDERED;
     if (in->dir) {
       ldout(cct, 10) << " dir is open on empty dir " << in->ino << " with "
                     << in->dir->dentry_map.size() << " entries, marking all dentries null" << dendl;
@@ -758,7 +758,7 @@ Inode * Client::add_update_inode(InodeStat *st, utime_t from,
  * insert_dentry_inode - insert + link a single dentry + inode into the metadata cache.
  */
 Dentry *Client::insert_dentry_inode(Dir *dir, const string& dname, LeaseStat *dlease, 
-                                   Inode *in, utime_t from, MetaSession *session, bool set_offset,
+                                   Inode *in, utime_t from, MetaSession *session,
                                    Dentry *old_dentry)
 {
   Dentry *dn = NULL;
@@ -785,14 +785,20 @@ Dentry *Client::insert_dentry_inode(Dir *dir, const string& dname, LeaseStat *dl
   
   if (!dn || dn->inode == 0) {
     in->get();
-    if (old_dentry)
+    if (old_dentry) {
+      if (old_dentry->dir->parent_inode->flags & I_COMPLETE_ORDERED) {
+       ldout(cct, 10) << " clearing I_COMPLETE_ORDERED on "
+                      << *old_dentry->dir->parent_inode << dendl;
+       old_dentry->dir->parent_inode->flags &= ~I_COMPLETE_ORDERED;
+      }
       unlink(old_dentry, dir == old_dentry->dir, false);  // drop dentry, keep dir open if its the same dir
+    }
+    if (dir->parent_inode->flags & I_COMPLETE_ORDERED) {
+       ldout(cct, 10) << " clearing I_COMPLETE_ORDERED on " << *dir->parent_inode << dendl;
+       dir->parent_inode->flags &= ~I_COMPLETE_ORDERED;
+    }
     dn = link(dir, dname, in, dn);
     put_inode(in);
-    if (set_offset) {
-      ldout(cct, 15) << " setting dn offset to " << dir->max_offset << dendl;
-      dn->offset = dir->max_offset++;
-    }
   }
 
   update_dentry_lease(dn, dlease, from, session);
@@ -1017,8 +1023,8 @@ Inode* Client::insert_trace(MetaRequest *request, MetaSession *session)
     Dentry *d = request->dentry();
     if (d && d->dir &&
        (d->dir->parent_inode->flags & I_COMPLETE)) {
-      ldout(cct, 10) << " clearing I_COMPLETE on " << *d->dir->parent_inode << dendl;
-      d->dir->parent_inode->flags &= ~I_COMPLETE;
+      ldout(cct, 10) << " clearing (I_COMPLETE|I_COMPLETE_ORDERED) on " << *d->dir->parent_inode << dendl;
+      d->dir->parent_inode->flags &= ~(I_COMPLETE | I_COMPLETE_ORDERED);
       d->dir->release_count++;
     }
 
@@ -1078,14 +1084,19 @@ Inode* Client::insert_trace(MetaRequest *request, MetaSession *session)
 
     if (in) {
       Dir *dir = diri->open_dir();
-      insert_dentry_inode(dir, dname, &dlease, in, request->sent_stamp, session, true,
+      insert_dentry_inode(dir, dname, &dlease, in, request->sent_stamp, session,
                           ((request->head.op == CEPH_MDS_OP_RENAME) ?
                                         request->old_dentry() : NULL));
     } else {
       if (diri->dir && diri->dir->dentries.count(dname)) {
        Dentry *dn = diri->dir->dentries[dname];
-       if (dn->inode)
+       if (dn->inode) {
+         if (diri->flags & I_COMPLETE_ORDERED) {
+           ldout(cct, 10) << " clearing I_COMPLETE_ORDERED on " << *diri << dendl;
+           diri->flags &= ~I_COMPLETE_ORDERED;
+         }
          unlink(dn, true, true);  // keep dir, dentry
+       }
       }
     }
   } else if (reply->head.op == CEPH_MDS_OP_LOOKUPSNAP ||
@@ -1104,7 +1115,7 @@ Inode* Client::insert_trace(MetaRequest *request, MetaSession *session)
 
     if (in) {
       Dir *dir = diri->open_dir();
-      insert_dentry_inode(dir, dname, &dlease, in, request->sent_stamp, session, true);
+      insert_dentry_inode(dir, dname, &dlease, in, request->sent_stamp, session);
     } else {
       if (diri->dir && diri->dir->dentries.count(dname)) {
        Dentry *dn = diri->dir->dentries[dname];
@@ -3029,8 +3040,8 @@ void Client::check_cap_issue(Inode *in, Cap *cap, unsigned issued)
     in->shared_gen++;
 
     if (in->is_dir() && (in->flags & I_COMPLETE)) {
-      ldout(cct, 10) << " clearing I_COMPLETE on " << *in << dendl;
-      in->flags &= ~I_COMPLETE;
+      ldout(cct, 10) << " clearing (I_COMPLETE|I_COMPLETE_ORDERED) on " << *in << dendl;
+      in->flags &= ~(I_COMPLETE | I_COMPLETE_ORDERED);
     }
   }
 }
@@ -5530,12 +5541,13 @@ int Client::readdir_r_cb(dir_result_t *d, add_dirent_cb_t cb, void *p)
 
   // can we read from our cache?
   ldout(cct, 10) << "offset " << hex << dirp->offset << dec << " at_cache_name " << dirp->at_cache_name
-          << " snapid " << dirp->inode->snapid << " complete " << (bool)(dirp->inode->flags & I_COMPLETE)
+          << " snapid " << dirp->inode->snapid << " I_COMPLETE_ORDERED "
+          << (bool)(dirp->inode->flags & I_COMPLETE_ORDERED)
           << " issued " << ccap_string(dirp->inode->caps_issued())
           << dendl;
   if ((dirp->offset == 2 || dirp->at_cache_name.length()) &&
       dirp->inode->snapid != CEPH_SNAPDIR &&
-      (dirp->inode->flags & I_COMPLETE) &&
+      (dirp->inode->flags & I_COMPLETE_ORDERED) &&
       dirp->inode->caps_issued_mask(CEPH_CAP_FILE_SHARED)) {
     int err = _readdir_cache_cb(dirp, cb, p);
     if (err != -EAGAIN)
@@ -5604,10 +5616,8 @@ int Client::readdir_r_cb(dir_result_t *d, add_dirent_cb_t cb, void *p)
     if (diri->dir &&
        diri->dir->release_count == dirp->release_count &&
        diri->shared_gen == dirp->start_shared_gen) {
-      ldout(cct, 10) << " marking I_COMPLETE on " << *diri << dendl;
-      diri->flags |= I_COMPLETE;
-      if (diri->dir)
-       diri->dir->max_offset = dirp->offset;
+      ldout(cct, 10) << " marking (I_COMPLETE|I_COMPLETE_ORDERED) on " << *diri << dendl;
+      diri->flags |= I_COMPLETE | I_COMPLETE_ORDERED;
     }
 
     dirp->set_end();
index 1ac9754e767506515b949a08f78bce88e5ec2b9b..db509a5ffc8c852fbd6f280c7031d0baf87a66f8 100644 (file)
@@ -534,7 +534,7 @@ protected:
                              int issued);
   Inode *add_update_inode(InodeStat *st, utime_t ttl, MetaSession *session);
   Dentry *insert_dentry_inode(Dir *dir, const string& dname, LeaseStat *dlease, 
-                             Inode *in, utime_t from, MetaSession *session, bool set_offset,
+                             Inode *in, utime_t from, MetaSession *session,
                              Dentry *old_dentry = NULL);
   void update_dentry_lease(Dentry *dn, LeaseStat *dlease, utime_t from, MetaSession *session);
 
index ad4ca52bdbe0cdb9f0c868c99d8214c089e2754a..4ec59b31c05e5c238698c382d7e61b4c768944c6 100644 (file)
@@ -9,9 +9,8 @@ class Dir {
   ceph::unordered_map<string, Dentry*> dentries;
   map<string, Dentry*> dentry_map;
   uint64_t release_count;
-  uint64_t max_offset;
 
-  Dir(Inode* in) : release_count(0), max_offset(2) { parent_inode = in; }
+  Dir(Inode* in) : release_count(0) { parent_inode = in; }
 
   bool is_empty() {  return dentries.empty(); }
 };
index f0c3b328ea697527b28284c2d8cdb9ff512be8fa..0e6cec07880866d4251af5dbe00beb8c281a9672 100644 (file)
@@ -70,6 +70,7 @@ struct CapSnap {
 
 // inode flags
 #define I_COMPLETE 1
+#define I_COMPLETE_ORDERED 2
 
 struct Inode {
   CephContext *cct;