]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
client: kill QuotaTree 10107/head
authorYan, Zheng <zyan@redhat.com>
Wed, 8 Jun 2016 09:47:58 +0000 (17:47 +0800)
committerXiaoxi Chen <xiaoxchen@ebay.com>
Sat, 2 Jul 2016 07:49:24 +0000 (00:49 -0700)
Multiple clients can modify cephfs at the same time. It is
very tricky to keep QuotaTree consistant with the global FS
hiberarchy. This patch kills the quota tree.

After removing the quota tree, we traverse inode's path to
find quota root.

Fixes: http://tracker.ceph.com/issues/16066
Fixes: http://tracker.ceph.com/issues/16067
Signed-off-by: Yan, Zheng <zyan@redhat.com>
(cherry picked from commit 161954bbfeb13d5588668637d5258221948128ea)

src/client/Client.cc
src/client/Client.h
src/client/Inode.h

index edbe073988e571636e30ba2dd5b51732482003ec..c8889f5f7fc88c8ca3554499410cac64123463b9 100644 (file)
@@ -898,18 +898,14 @@ Inode * Client::add_update_inode(InodeStat *st, utime_t from,
 
     in->dirstat = st->dirstat;
     in->rstat = st->rstat;
+    in->quota = st->quota;
+    in->layout = st->layout;
 
     if (in->is_dir()) {
       in->dir_layout = st->dir_layout;
       ldout(cct, 20) << " dir hash is " << (int)in->dir_layout.dl_dir_hash << dendl;
     }
 
-    if (st->quota.is_enable() ^ in->quota.is_enable())
-      invalidate_quota_tree(in);
-    in->quota = st->quota;
-
-    in->layout = st->layout;
-
     update_inode_file_bits(in, st->truncate_seq, st->truncate_size, st->size,
                           st->time_warp_seq, st->ctime, st->mtime, st->atime,
                           st->inline_version, st->inline_data,
@@ -1241,6 +1237,7 @@ void Client::insert_readdir_results(MetaRequest *request, MetaSession *session,
 Inode* Client::insert_trace(MetaRequest *request, MetaSession *session)
 {
   MClientReply *reply = request->reply;
+  int op = request->get_op();
 
   ldout(cct, 10) << "insert_trace from " << request->sent_stamp << " mds." << session->mds_num
           << " is_target=" << (int)reply->head.is_target
@@ -1265,14 +1262,14 @@ Inode* Client::insert_trace(MetaRequest *request, MetaSession *session)
     }
 
     if (d && reply->get_result() == 0) {
-      if (request->head.op == CEPH_MDS_OP_RENAME) {
+      if (op == CEPH_MDS_OP_RENAME) {
        // rename
        Dentry *od = request->old_dentry();
        ldout(cct, 10) << " unlinking rename src dn " << od << " for traceless reply" << dendl;
        assert(od);
        unlink(od, true, true);  // keep dir, dentry
-      } else if (request->head.op == CEPH_MDS_OP_RMDIR ||
-                request->head.op == CEPH_MDS_OP_UNLINK) {
+      } else if (op == CEPH_MDS_OP_RMDIR ||
+                op == CEPH_MDS_OP_UNLINK) {
        // unlink, rmdir
        ldout(cct, 10) << " unlinking unlink/rmdir dn " << d << " for traceless reply" << dendl;
        unlink(d, true, true);  // keep dir, dentry
@@ -1311,7 +1308,6 @@ Inode* Client::insert_trace(MetaRequest *request, MetaSession *session)
   if (reply->head.is_target) {
     ist.decode(p, features);
     if (cct->_conf->client_debug_getattr_caps) {
-      int op = request->get_op();
       unsigned wanted = 0;
       if (op == CEPH_MDS_OP_GETATTR || op == CEPH_MDS_OP_LOOKUP)
        wanted = request->head.args.getattr.mask;
@@ -1326,15 +1322,15 @@ Inode* Client::insert_trace(MetaRequest *request, MetaSession *session)
     in = add_update_inode(&ist, request->sent_stamp, session);
   }
 
+  Inode *diri = NULL;
   if (reply->head.is_dentry) {
-    Inode *diri = add_update_inode(&dirst, request->sent_stamp, session);
+    diri = add_update_inode(&dirst, request->sent_stamp, session);
     update_dir_dist(diri, &dst);  // dir stat info is attached to ..
 
     if (in) {
       Dir *dir = diri->open_dir();
       insert_dentry_inode(dir, dname, &dlease, in, request->sent_stamp, session,
-                          ((request->head.op == CEPH_MDS_OP_RENAME) ?
-                                        request->old_dentry() : NULL));
+                          (op == CEPH_MDS_OP_RENAME) ? request->old_dentry() : NULL);
     } else {
       if (diri->dir && diri->dir->dentries.count(dname)) {
        Dentry *dn = diri->dir->dentries[dname];
@@ -1345,14 +1341,14 @@ Inode* Client::insert_trace(MetaRequest *request, MetaSession *session)
        }
       }
     }
-  } else if (reply->head.op == CEPH_MDS_OP_LOOKUPSNAP ||
-            reply->head.op == CEPH_MDS_OP_MKSNAP) {
+  } else if (op == CEPH_MDS_OP_LOOKUPSNAP ||
+            op == CEPH_MDS_OP_MKSNAP) {
     ldout(cct, 10) << " faking snap lookup weirdness" << dendl;
     // fake it for snap lookup
     vinodeno_t vino = ist.vino;
     vino.snapid = CEPH_SNAPDIR;
     assert(inode_map.count(vino));
-    Inode *diri = inode_map[vino];
+    diri = inode_map[vino];
     
     string dname = request->path.last_dentry();
     
@@ -1372,9 +1368,13 @@ Inode* Client::insert_trace(MetaRequest *request, MetaSession *session)
   }
 
   if (in) {
-    if (reply->head.op == CEPH_MDS_OP_READDIR ||
-       reply->head.op == CEPH_MDS_OP_LSSNAP)
+    if (op == CEPH_MDS_OP_READDIR ||
+       op == CEPH_MDS_OP_LSSNAP) {
       insert_readdir_results(request, session, in);
+    } else if (op == CEPH_MDS_OP_LOOKUPNAME) {
+      // hack: return parent inode instead
+      in = diri;
+    }
 
     if (request->dentry() == NULL && in != request->inode()) {
       // pin the target inode if its parent dentry is not pinned
@@ -2832,7 +2832,6 @@ void Client::put_inode(Inode *in, int n)
     ldout(cct, 10) << "put_inode deleting " << *in << dendl;
     bool unclean = objectcacher->release_set(&in->oset);
     assert(!unclean);
-    put_qtree(in);
     inode_map.erase(in->vino());
     if (use_faked_inos())
       _release_faked_ino(in);
@@ -2936,7 +2935,6 @@ void Client::unlink(Dentry *dn, bool keepdir, bool keepdentry)
 
   // unlink from inode
   if (in) {
-    invalidate_quota_tree(in.get());
     if (in->is_dir()) {
       if (in->dir)
        dn->put(); // dir -> dn pin
@@ -4466,8 +4464,6 @@ void Client::handle_quota(MClientQuota *m)
     in = inode_map[vino];
 
     if (in) {
-      if (in->quota.is_enable() ^ m->quota.is_enable())
-       invalidate_quota_tree(in);
       in->quota = m->quota;
       in->rstat = m->rstat;
     }
@@ -8814,7 +8810,7 @@ int Client::statfs(const char *path, struct statvfs *stbuf)
   // quota but we can see a parent of it that does have a quota, we'll
   // respect that one instead.
   assert(root != nullptr);
-  Inode *quota_root = get_quota_root(root);
+  Inode *quota_root = root->quota.is_enable() ? root : get_quota_root(root);
 
   // get_quota_root should always give us something if client quotas are
   // enabled
@@ -11991,88 +11987,73 @@ bool Client::ms_get_authorizer(int dest_type, AuthAuthorizer **authorizer, bool
   return true;
 }
 
-void Client::put_qtree(Inode *in)
-{
-  QuotaTree *qtree = in->qtree;
-  if (qtree) {
-    qtree->invalidate();
-    in->qtree = NULL;
-  }
-}
-
-void Client::invalidate_quota_tree(Inode *in)
-{
-  QuotaTree *qtree = in->qtree;
-  if (qtree) {
-    ldout(cct, 10) << "invalidate quota tree node " << *in << dendl;
-    if (qtree->parent_ref()) {
-      assert(in->is_dir());
-      ldout(cct, 15) << "invalidate quota tree ancestor " << *in << dendl;
-      Inode *ancestor = qtree->ancestor()->in();
-      if (ancestor)
-        put_qtree(ancestor);
-    }
-    put_qtree(in);
-  }
-}
-
 Inode *Client::get_quota_root(Inode *in)
 {
   if (!cct->_conf->client_quota)
     return NULL;
 
-  QuotaTree *ancestor = NULL;
-  QuotaTree *parent = NULL;
+  Inode *cur = in;
+  utime_t now = ceph_clock_now(cct);
 
-  vector<Inode*> inode_list;
-  while (in) {
-    if (in->qtree && in->qtree->ancestor()->in()) {
-      ancestor = in->qtree->ancestor();
-      parent = in->qtree;
+  while (cur) {
+    if (cur != in && cur->quota.is_enable())
       break;
-    }
-
-    inode_list.push_back(in);
 
-    if (!in->dn_set.empty())
-      in = in->get_first_parent()->dir->parent_inode;
-    else if (root_parents.count(in))
-      in = root_parents[in].get();
-    else
-      in = NULL;
-  }
-
-  if (!in) {
-    assert(!parent && !ancestor);
-    assert(root_ancestor->qtree == NULL);
-    root_ancestor->qtree = ancestor = new QuotaTree(root_ancestor);
-    ancestor->set_ancestor(ancestor);
-    parent = ancestor;
-  }
-  assert(parent && ancestor);
-
-  for (vector<Inode*>::reverse_iterator iter = inode_list.rbegin();
-       iter != inode_list.rend(); ++iter) {
-    Inode *cur = *iter;
+    Inode *parent_in = NULL;
+    if (!cur->dn_set.empty()) {
+      for (auto p = cur->dn_set.begin(); p != cur->dn_set.end(); ++p) {
+       Dentry *dn = *p;
+       if (dn->lease_mds >= 0 &&
+           dn->lease_ttl > now &&
+           mds_sessions.count(dn->lease_mds)) {
+         parent_in = dn->dir->parent_inode;
+       } else {
+         Inode *diri = dn->dir->parent_inode;
+         if (diri->caps_issued_mask(CEPH_CAP_FILE_SHARED) &&
+             diri->shared_gen == dn->cap_shared_gen) {
+           parent_in = dn->dir->parent_inode;
+         }
+       }
+       if (parent_in)
+         break;
+      }
+    } else if (root_parents.count(cur)) {
+      parent_in = root_parents[cur].get();
+    }
 
-    if (!cur->qtree)
-      cur->qtree = new QuotaTree(cur);
+    if (parent_in) {
+      cur = parent_in;
+      continue;
+    }
 
-    cur->qtree->set_parent(parent);
-    if (parent->in()->quota.is_enable())
-      ancestor = parent;
-    cur->qtree->set_ancestor(ancestor);
+    if (cur == root_ancestor)
+      break;
 
-    ldout(cct, 20) << "link quota tree " << cur->ino
-                   << " to parent (" << parent->in()->ino << ")"
-                   << " ancestor (" << ancestor->in()->ino << ")" << dendl;
+    MetaRequest *req = new MetaRequest(CEPH_MDS_OP_LOOKUPNAME);
+    filepath path(cur->ino);
+    req->set_filepath(path);
+    req->set_inode(cur);
+
+    InodeRef parent_ref;
+    int ret = make_request(req, -1, -1, &parent_ref);
+    if (ret < 0) {
+      ldout(cct, 1) << __func__ << " " << in->vino()
+                   << " failed to find parent of " << cur->vino()
+                   << " err " << ret <<  dendl;
+      // FIXME: what to do?
+      cur = root_ancestor;
+      break;
+    }
 
-    parent = cur->qtree;
-    if (cur->quota.is_enable())
-      ancestor = cur->qtree;
+    now = ceph_clock_now(cct);
+    if (cur == in)
+      cur = parent_ref.get();
+    else
+      cur = in; // start over
   }
 
-  return ancestor->in();
+  ldout(cct, 10) << __func__ << " " << in->vino() << " -> " << cur->vino() << dendl;
+  return cur;
 }
 
 /**
index 647a122f90b8cbb88401a36437b7caff648de9cf..c9e4b7398a07ad5ca4728ffd17e163d583dbd2db 100644 (file)
@@ -563,10 +563,7 @@ protected:
 
   int authenticate();
 
-  void put_qtree(Inode *in);
-  void invalidate_quota_tree(Inode *in);
   Inode* get_quota_root(Inode *in);
-
   bool check_quota_condition(
       Inode *in,
       std::function<bool (const Inode &)> test);
index 969da137014d01d663efd4ced3542f826996d69a..f305780e7e008ee1becc3c2528a477f269b7830f 100644 (file)
@@ -76,80 +76,6 @@ struct CapSnap {
   void dump(Formatter *f) const;
 };
 
-class QuotaTree {
-private:
-  Inode *_in;
-
-  int _ancestor_ref;
-  QuotaTree *_ancestor;
-  int _parent_ref;
-  QuotaTree *_parent;
-
-  void _put()
-  {
-    if (!_in && !_ancestor_ref && !_parent_ref) {
-      set_parent(NULL);
-      set_ancestor(NULL);
-      delete this;
-    }
-  }
-  ~QuotaTree() {}
-public:
-  explicit QuotaTree(Inode *i) :
-    _in(i),
-    _ancestor_ref(0),
-    _ancestor(NULL),
-    _parent_ref(0),
-    _parent(NULL)
-  { assert(i); }
-
-  Inode *in() { return _in; }
-
-  int ancestor_ref() { return _ancestor_ref; }
-  int parent_ref() { return _parent_ref; }
-
-  QuotaTree *ancestor() { return _ancestor; }
-  void set_ancestor(QuotaTree *ancestor)
-  {
-    if (ancestor == _ancestor)
-      return;
-
-    if (_ancestor) {
-      --_ancestor->_ancestor_ref;
-      _ancestor->_put();
-    }
-    _ancestor = ancestor;
-    if (_ancestor)
-      ++_ancestor->_ancestor_ref;
-  }
-
-  QuotaTree *parent() { return _parent; }
-  void set_parent(QuotaTree *parent)
-  {
-    if (parent == _parent)
-      return;
-
-    if (_parent) {
-      --_parent->_parent_ref;
-      _parent->_put();
-    }
-    _parent = parent;
-    if (parent)
-      ++_parent->_parent_ref;
-  }
-
-  void invalidate()
-  {
-    if (!_in)
-      return;
-
-    _in = NULL;
-    set_ancestor(NULL);
-    set_parent(NULL);
-    _put();
-  }
-};
-
 // inode flags
 #define I_COMPLETE 1
 #define I_DIR_ORDERED 2
@@ -218,7 +144,6 @@ struct Inode {
   unsigned flags;
 
   quota_info_t quota;
-  QuotaTree* qtree;
 
   bool is_complete_and_ordered() {
     static const unsigned wants = I_COMPLETE | I_DIR_ORDERED;
@@ -301,7 +226,7 @@ struct Inode {
       rdev(0), mode(0), uid(0), gid(0), nlink(0),
       size(0), truncate_seq(1), truncate_size(-1),
       time_warp_seq(0), max_size(0), version(0), xattr_version(0),
-      inline_version(0), flags(0), qtree(NULL),
+      inline_version(0), flags(0),
       dir(0), dir_release_count(1), dir_ordered_count(1),
       dir_hashed(false), dir_replicated(false), auth_cap(NULL),
       cap_dirtier_uid(-1), cap_dirtier_gid(-1),