]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
client: fix simultaneous readdirs race 9655/head
authorYan, Zheng <zyan@redhat.com>
Mon, 9 May 2016 14:12:48 +0000 (22:12 +0800)
committerGreg Farnum <gfarnum@redhat.com>
Sun, 12 Jun 2016 21:10:32 +0000 (14:10 -0700)
Current readdir code uses list to track the order of the dentries
in readdir replies.  When handling a readdir reply, it pushes the
resulting dentries to the back of directory's dentry_list. After
readdir finishes, the dentry_list reflects how MDS sorts dentries.

This method is racy when there are simultaneous readdirs. The fix
is use vector instead of list to trace how dentries are sorted in
its parent directory. As long as shared_gen doesn't change, each
dentry is at fixed position of the vector. So cocurrent readdirs
do not affect each other.

Fixes: http://tracker.ceph.com/issues/15508
Signed-off-by: Yan, Zheng <zyan@redhat.com>
(cherry picked from commit 9d297c5e98f814b282dadc379ab70dfa678db73e)

Signed-off-by: Greg Farnum <gfarnum@redhat.com
src/client/Client.cc
src/client/Client.h
src/client/Dentry.h
src/client/Dir.h
src/client/Inode.h

index 476a62f93ed71d89f76d68d2e39e3d6e7dc85f45..69c6de5a2de1c3401d7a76ede73110a12afafe7a 100644 (file)
@@ -175,7 +175,7 @@ bool Client::CommandHook::call(std::string command, cmdmap_t& cmdmap,
 
 dir_result_t::dir_result_t(Inode *in)
   : inode(in), owner_uid(-1), owner_gid(-1), offset(0), next_offset(2),
-    release_count(0), ordered_count(0), start_shared_gen(0)
+    release_count(0), ordered_count(0), cache_index(0), start_shared_gen(0)
   { }
 
 void Client::_reset_faked_inos()
@@ -715,10 +715,7 @@ void Client::trim_dentry(Dentry *dn)
   if (dn->inode) {
     Inode *diri = dn->dir->parent_inode;
     diri->dir_release_count++;
-    if (diri->flags & I_COMPLETE) {
-      ldout(cct, 10) << " clearing (I_COMPLETE|I_DIR_ORDERED) on " << *diri << dendl;
-      diri->flags &= ~(I_COMPLETE | I_DIR_ORDERED);
-    }
+    clear_dir_complete_and_ordered(diri, true);
   }
   unlink(dn, false, false);  // drop dir, drop dentry
 }
@@ -954,13 +951,14 @@ Inode * Client::add_update_inode(InodeStat *st, utime_t from,
     in->flags |= I_COMPLETE | I_DIR_ORDERED;
     if (in->dir) {
       ldout(cct, 10) << " dir is open on empty dir " << in->ino << " with "
-                    << in->dir->dentry_list.size() << " entries, marking all dentries null" << dendl;
-      for (xlist<Dentry*>::iterator p = in->dir->dentry_list.begin();
-          !p.end();
+                    << in->dir->dentries.size() << " entries, marking all dentries null" << dendl;
+      in->dir->readdir_cache.clear();
+      for (auto p = in->dir->dentries.begin();
+          p != in->dir->dentries.end();
           ++p) {
-       unlink(*p, true, true);  // keep dir, keep dentry
+       unlink(p->second, true, true);  // keep dir, keep dentry
       }
-      if (in->dir->dentry_list.empty())
+      if (in->dir->dentries.empty())
        close_dir(in->dir);
     }
   }
@@ -1004,19 +1002,13 @@ Dentry *Client::insert_dentry_inode(Dir *dir, const string& dname, LeaseStat *dl
       if (old_dentry->dir != dir) {
        Inode *old_diri = old_dentry->dir->parent_inode;
        old_diri->dir_ordered_count++;
-       if (old_diri->flags & I_DIR_ORDERED) {
-         ldout(cct, 10) << " clearing I_DIR_ORDERED on " << *old_diri << dendl;
-         old_diri->flags &= ~I_DIR_ORDERED;
-       }
+       clear_dir_complete_and_ordered(old_diri, false);
       }
       unlink(old_dentry, dir == old_dentry->dir, false);  // drop dentry, keep dir open if its the same dir
     }
     Inode *diri = dir->parent_inode;
     diri->dir_ordered_count++;
-    if (diri->flags & I_DIR_ORDERED) {
-       ldout(cct, 10) << " clearing I_DIR_ORDERED on " << *diri << dendl;
-       diri->flags &= ~I_DIR_ORDERED;
-    }
+    clear_dir_complete_and_ordered(diri, false);
     dn = link(dir, dname, in, dn);
   }
 
@@ -1080,6 +1072,23 @@ void Client::update_dir_dist(Inode *in, DirStat *dst)
   */
 }
 
+void Client::clear_dir_complete_and_ordered(Inode *diri, bool complete)
+{
+  if (diri->flags & I_COMPLETE) {
+    if (complete) {
+      ldout(cct, 10) << " clearing (I_COMPLETE|I_DIR_ORDERED) on " << *diri << dendl;
+      diri->flags &= ~(I_COMPLETE | I_DIR_ORDERED);
+    } else {
+      if (diri->flags & I_DIR_ORDERED) {
+       ldout(cct, 10) << " clearing I_DIR_ORDERED on " << *diri << dendl;
+       diri->flags &= ~I_DIR_ORDERED;
+      }
+    }
+    if (diri->dir)
+      diri->dir->readdir_cache.clear();
+  }
+}
+
 /*
  * insert results from readdir or lssnap into the metadata cache.
  */
@@ -1137,10 +1146,12 @@ void Client::insert_readdir_results(MetaRequest *request, MetaSession *session,
                   << ", hash_order=" << hash_order << ", offset " << readdir_offset
                   << ", readdir_start " << readdir_start << dendl;
 
-    if (fg.is_leftmost() && readdir_offset == 2) {
+    if (diri->snapid != CEPH_SNAPDIR &&
+       fg.is_leftmost() && readdir_offset == 2) {
       dirp->release_count = diri->dir_release_count;
       dirp->ordered_count = diri->dir_ordered_count;
       dirp->start_shared_gen = diri->shared_gen;
+      dirp->cache_index = 0;
     }
 
     dirp->buffer_frag = fg;
@@ -1170,7 +1181,6 @@ void Client::insert_readdir_results(MetaRequest *request, MetaSession *session,
          // keep existing dn
          dn = olddn;
          touch_dn(dn);
-         dn->item_dentry_list.move_to_back();
        }
       } else {
        // new dn
@@ -1187,6 +1197,26 @@ void Client::insert_readdir_results(MetaRequest *request, MetaSession *session,
       } else {
        dn->offset = dir_result_t::make_fpos(fg, readdir_offset++, false);
       }
+      // add to readdir cache
+      if (dirp->release_count == diri->dir_release_count &&
+         dirp->ordered_count == diri->dir_ordered_count &&
+         dirp->start_shared_gen == diri->shared_gen) {
+       if (dirp->cache_index == dir->readdir_cache.size()) {
+         if (i == 0) {
+           assert(!dirp->inode->is_complete_and_ordered());
+           dir->readdir_cache.reserve(dirp->cache_index + numdn);
+         }
+         dir->readdir_cache.push_back(dn);
+       } else if (dirp->cache_index < dir->readdir_cache.size()) {
+         if (dirp->inode->is_complete_and_ordered())
+           assert(dir->readdir_cache[dirp->cache_index] == dn);
+         else
+           dir->readdir_cache[dirp->cache_index] = dn;
+       } else {
+         assert(0 == "unexpected readdir buffer idx");
+       }
+       dirp->cache_index++;
+      }
       // add to cached result list
       dirp->buffer.push_back(dir_result_t::dentry(dn->offset, dname, in));
       ldout(cct, 15) << __func__ << "  " << hex << dn->offset << dec << ": '" << dname << "' -> " << in->ino << dendl;
@@ -1228,13 +1258,10 @@ Inode* Client::insert_trace(MetaRequest *request, MetaSession *session)
     ldout(cct, 10) << "insert_trace -- no trace" << dendl;
 
     Dentry *d = request->dentry();
-    if (d && d->dir) {
+    if (d) {
       Inode *diri = d->dir->parent_inode;
       diri->dir_release_count++;
-      if (diri->flags & I_COMPLETE) {
-       ldout(cct, 10) << " clearing (I_COMPLETE|I_DIR_ORDERED) on " << *diri << dendl;
-       diri->flags &= ~(I_COMPLETE | I_DIR_ORDERED);
-      }
+      clear_dir_complete_and_ordered(diri, true);
     }
 
     if (d && reply->get_result() == 0) {
@@ -1313,10 +1340,7 @@ Inode* Client::insert_trace(MetaRequest *request, MetaSession *session)
        Dentry *dn = diri->dir->dentries[dname];
        if (dn->inode) {
          diri->dir_ordered_count++;
-         if (diri->flags & I_DIR_ORDERED) {
-           ldout(cct, 10) << " clearing I_DIR_ORDERED on " << *diri << dendl;
-           diri->flags &= ~I_DIR_ORDERED;
-         }
+         clear_dir_complete_and_ordered(diri, false);
          unlink(dn, true, true);  // keep dir, dentry
        }
       }
@@ -2865,7 +2889,6 @@ Dentry* Client::link(Dir *dir, const string& name, Inode *in, Dentry *dn)
     // link to dir
     dn->dir = dir;
     dir->dentries[dn->name] = dn;
-    dir->dentry_list.push_back(&dn->item_dentry_list);
     lru.lru_insert_mid(dn);    // mid or top?
 
     ldout(cct, 15) << "link dir " << dir->parent_inode << " '" << name << "' to inode " << in
@@ -2873,7 +2896,6 @@ Dentry* Client::link(Dir *dir, const string& name, Inode *in, Dentry *dn)
   } else {
     ldout(cct, 15) << "link dir " << dir->parent_inode << " '" << name << "' to inode " << in
                   << " dn " << dn << " (old dn)" << dendl;
-    dn->item_dentry_list.move_to_back();
   }
 
   if (in) {    // link to inode
@@ -2891,6 +2913,9 @@ Dentry* Client::link(Dir *dir, const string& name, Inode *in, Dentry *dn)
     if (in->is_dir() && !in->dn_set.empty()) {
       Dentry *olddn = in->get_first_parent();
       assert(olddn->dir != dir || olddn->name != name);
+      Inode *old_diri = olddn->dir->parent_inode;
+      old_diri->dir_release_count++;
+      clear_dir_complete_and_ordered(old_diri, true);
       unlink(olddn, true, true);  // keep dir, dentry
     }
 
@@ -2931,7 +2956,6 @@ void Client::unlink(Dentry *dn, bool keepdir, bool keepdentry)
 
     // unlink from dir
     dn->dir->dentries.erase(dn->name);
-    dn->item_dentry_list.remove_myself();
     if (dn->dir->is_empty() && !keepdir)
       close_dir(dn->dir);
     dn->dir = 0;
@@ -3688,10 +3712,8 @@ void Client::check_cap_issue(Inode *in, Cap *cap, unsigned issued)
       !(had & CEPH_CAP_FILE_SHARED)) {
     in->shared_gen++;
 
-    if (in->is_dir() && (in->flags & I_COMPLETE)) {
-      ldout(cct, 10) << " clearing (I_COMPLETE|I_DIR_ORDERED) on " << *in << dendl;
-      in->flags &= ~(I_COMPLETE | I_DIR_ORDERED);
-    }
+    if (in->is_dir())
+      clear_dir_complete_and_ordered(in, true);
   }
 }
 
@@ -4753,10 +4775,10 @@ void Client::_try_to_trim_inode(Inode *in)
 {
   int ref = in->get_num_ref();
 
-  if (in->dir && !in->dir->dentry_list.empty()) {
-    for (xlist<Dentry*>::iterator p = in->dir->dentry_list.begin();
-       !p.end(); ) {
-      Dentry *dn = *p;
+  if (in->dir && !in->dir->dentries.empty()) {
+    for (auto p = in->dir->dentries.begin();
+        p != in->dir->dentries.end(); ) {
+      Dentry *dn = p->second;
       ++p;
       if (dn->lru_is_expireable())
        unlink(dn, false, false);  // close dir, drop dentry
@@ -6862,6 +6884,14 @@ void Client::seekdir(dir_result_t *dirp, loff_t offset)
 
   ldout(cct, 3) << "seekdir(" << dirp << ", " << offset << ")" << dendl;
 
+  if (offset == dirp->offset)
+    return;
+
+  if (offset > dirp->offset)
+    dirp->release_count = 0;   // bump if we do a forward seek
+  else
+    dirp->ordered_count = 0;   // disable filling readdir cache
+
   if (dirp->hash_order()) {
     if (dirp->offset > offset) {
       _readdir_drop_dirp_buffer(dirp);
@@ -6876,9 +6906,6 @@ void Client::seekdir(dir_result_t *dirp, loff_t offset)
     }
   }
 
-  if (offset > dirp->offset)
-    dirp->release_count--;   // bump if we do a forward seek
-
   dirp->offset = offset;
 }
 
@@ -7008,11 +7035,17 @@ int Client::_readdir_get_frag(dir_result_t *dirp)
   return res;
 }
 
+struct dentry_off_lt {
+  bool operator()(const Dentry* dn, int64_t off) const {
+    return dir_result_t::fpos_cmp(dn->offset, off) < 0;
+  }
+};
+
 int Client::_readdir_cache_cb(dir_result_t *dirp, add_dirent_cb_t cb, void *p)
 {
   assert(client_lock.is_locked());
   ldout(cct, 10) << "_readdir_cache_cb " << dirp << " on " << dirp->inode->ino
-          << " at_cache_name " << dirp->at_cache_name << " offset " << hex << dirp->offset << dec
+          << " last_name " << dirp->last_name << " offset " << hex << dirp->offset << dec
           << dendl;
   Dir *dir = dirp->inode->dir;
 
@@ -7022,25 +7055,15 @@ int Client::_readdir_cache_cb(dir_result_t *dirp, add_dirent_cb_t cb, void *p)
     return 0;
   }
 
-  xlist<Dentry*>::iterator pd = dir->dentry_list.begin();
-  if (dirp->at_cache_name.length()) {
-    ceph::unordered_map<string,Dentry*>::iterator it = dir->dentries.find(dirp->at_cache_name);
-    if (it == dir->dentries.end())
-      return -EAGAIN;
-    Dentry *dn = it->second;
-    assert(dir_result_t::fpos_cmp(dn->offset, dirp->offset) < 0);
-    pd = xlist<Dentry*>::iterator(&dn->item_dentry_list);
-    ++pd;
-    while (!pd.end() &&
-          dir_result_t::fpos_cmp((*pd)->offset, dirp->offset) < 0)
-      ++pd;
-  }
+  vector<Dentry*>::iterator pd = std::lower_bound(dir->readdir_cache.begin(),
+                                                 dir->readdir_cache.end(),
+                                                 dirp->offset, dentry_off_lt());
 
   string dn_name;
   while (true) {
     if (!dirp->inode->is_complete_and_ordered())
       return -EAGAIN;
-    if (pd.end())
+    if (pd == dir->readdir_cache.end())
       break;
     Dentry *dn = *pd;
     if (dn->inode == NULL) {
@@ -7060,7 +7083,7 @@ int Client::_readdir_cache_cb(dir_result_t *dirp, add_dirent_cb_t cb, void *p)
 
     uint64_t next_off = dn->offset + 1;
     ++pd;
-    if (pd.end())
+    if (pd == dir->readdir_cache.end())
       next_off = dir_result_t::END;
 
     fill_dirent(&de, dn->name.c_str(), st.st_mode, st.st_ino, next_off);
@@ -7081,7 +7104,7 @@ int Client::_readdir_cache_cb(dir_result_t *dirp, add_dirent_cb_t cb, void *p)
       dirp->next_offset = 2;
     else
       dirp->next_offset = dirp->offset_low();
-    dirp->at_cache_name = dn_name; // we successfully returned this one; update!
+    dirp->last_name = dn_name; // we successfully returned this one; update!
     if (r > 0)
       return r;
   }
@@ -7155,23 +7178,18 @@ int Client::readdir_r_cb(dir_result_t *d, add_dirent_cb_t cb, void *p)
   }
 
   // can we read from our cache?
-  ldout(cct, 10) << "offset " << hex << dirp->offset << dec << " at_cache_name " << dirp->at_cache_name
+  ldout(cct, 10) << "offset " << hex << dirp->offset
           << " snapid " << dirp->inode->snapid << " (complete && ordered) "
           << dirp->inode->is_complete_and_ordered()
           << " issued " << ccap_string(dirp->inode->caps_issued())
           << dendl;
-  if ((dirp->offset == 2 || dirp->at_cache_name.length()) &&
-      dirp->inode->snapid != CEPH_SNAPDIR &&
+  if (dirp->inode->snapid != CEPH_SNAPDIR &&
       dirp->inode->is_complete_and_ordered() &&
       dirp->inode->caps_issued_mask(CEPH_CAP_FILE_SHARED)) {
     int err = _readdir_cache_cb(dirp, cb, p);
     if (err != -EAGAIN)
       return err;
   }
-  if (dirp->at_cache_name.length()) {
-    dirp->last_name = dirp->at_cache_name;
-    dirp->at_cache_name.clear();
-  }
 
   while (1) {
     if (dirp->at_end())
@@ -7229,6 +7247,10 @@ int Client::readdir_r_cb(dir_result_t *d, add_dirent_cb_t cb, void *p)
        diri->dir_release_count == dirp->release_count) {
       if (diri->dir_ordered_count == dirp->ordered_count) {
        ldout(cct, 10) << " marking (I_COMPLETE|I_DIR_ORDERED) on " << *diri << dendl;
+       if (diri->dir) {
+         assert(diri->dir->readdir_cache.size() >= dirp->cache_index);
+         diri->dir->readdir_cache.resize(dirp->cache_index);
+       }
        diri->flags |= I_COMPLETE | I_DIR_ORDERED;
       } else {
        ldout(cct, 10) << " marking I_COMPLETE on " << *diri << dendl;
index 8ca18069fb54078c8eb8594452e0a9626ebef925..34d4fd66ec4b1450bd70d05fce7412d7e7c96419 100644 (file)
@@ -205,6 +205,7 @@ struct dir_result_t {
 
   uint64_t release_count;
   uint64_t ordered_count;
+  unsigned cache_index;
   int start_shared_gen;  // dir shared_gen at start of readdir
 
   frag_t buffer_frag;
@@ -224,8 +225,6 @@ struct dir_result_t {
   };
   vector<dentry> buffer;
 
-  string at_cache_name;  // last entry we successfully returned
-
   explicit dir_result_t(Inode *in);
 
   unsigned offset_high() { return fpos_high(offset); }
@@ -249,9 +248,10 @@ struct dir_result_t {
 
   void reset() {
     last_name.clear();
-    at_cache_name.clear();
     next_offset = 2;
     offset = 0;
+    ordered_count = 0;
+    cache_index = 0;
     buffer.clear();
   }
 };
@@ -696,6 +696,7 @@ protected:
   // metadata cache
   void update_dir_dist(Inode *in, DirStat *st);
 
+  void clear_dir_complete_and_ordered(Inode *diri, bool complete);
   void insert_readdir_results(MetaRequest *request, MetaSession *session, Inode *diri);
   Inode* insert_trace(MetaRequest *request, MetaSession *session);
   void update_inode_file_bits(Inode *in,
index 192ef01a80b4073fbd340d7709c5307f0c62bfe0..c2181956cbcd55b5504407d24021612b61380d77 100644 (file)
@@ -24,8 +24,6 @@ class Dentry : public LRUObject {
   ceph_seq_t lease_seq;
   int cap_shared_gen;
 
-  xlist<Dentry*>::item item_dentry_list;
-
   /*
    * ref==1 -> cached, unused
    * ref >1 -> pinned in lru
@@ -49,8 +47,8 @@ class Dentry : public LRUObject {
 
   Dentry() :
     dir(0), ref(1), offset(0),
-    lease_mds(-1), lease_gen(0), lease_seq(0), cap_shared_gen(0),
-    item_dentry_list(this)  { }
+    lease_mds(-1), lease_gen(0), lease_seq(0), cap_shared_gen(0)
+  { }
 private:
   ~Dentry() {
     assert(ref == 0);
index 608473beb961fde1abfcf7297f13001aa20974b6..e6d7f99c903f865bbe95a6f14e542376255c8c77 100644 (file)
@@ -7,7 +7,7 @@ class Dir {
  public:
   Inode    *parent_inode;  // my inode
   ceph::unordered_map<string, Dentry*> dentries;
-  xlist<Dentry*> dentry_list;
+  vector<Dentry*> readdir_cache;
 
   explicit Dir(Inode* in) { parent_inode = in; }
 
index 958e84911bb2766995a6d8529f0ce0ff8172ed51..969da137014d01d663efd4ced3542f826996d69a 100644 (file)
@@ -302,7 +302,7 @@ struct Inode {
       size(0), truncate_seq(1), truncate_size(-1),
       time_warp_seq(0), max_size(0), version(0), xattr_version(0),
       inline_version(0), flags(0), qtree(NULL),
-      dir(0), dir_release_count(0), dir_ordered_count(0),
+      dir(0), dir_release_count(1), dir_ordered_count(1),
       dir_hashed(false), dir_replicated(false), auth_cap(NULL),
       cap_dirtier_uid(-1), cap_dirtier_gid(-1),
       dirty_caps(0), flushing_caps(0), shared_gen(0), cache_gen(0),