]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
client: kill QuotaTree 9591/head
authorYan, Zheng <zyan@redhat.com>
Wed, 8 Jun 2016 09:47:58 +0000 (17:47 +0800)
committerYan, Zheng <zyan@redhat.com>
Tue, 14 Jun 2016 09:06:01 +0000 (17:06 +0800)
Multiple clients can modify cephfs at the same time. It is
very tricky to keep QuotaTree consistant with the global FS
hiberarchy. This patch kills the quota tree.

After removing the quota tree, we traverse inode's path to
find quota root.

Fixes: http://tracker.ceph.com/issues/16066
Fixes: http://tracker.ceph.com/issues/16067
Signed-off-by: Yan, Zheng <zyan@redhat.com>
src/client/Client.cc
src/client/Client.h
src/client/Inode.h

index cd69511832c94e120fe796961835d2ecfb453b2d..17b6c6d0f5637781f6f697a9faa47807f963ec42 100644 (file)
@@ -875,18 +875,14 @@ Inode * Client::add_update_inode(InodeStat *st, utime_t from,
 
     in->dirstat = st->dirstat;
     in->rstat = st->rstat;
+    in->quota = st->quota;
+    in->layout = st->layout;
 
     if (in->is_dir()) {
       in->dir_layout = st->dir_layout;
       ldout(cct, 20) << " dir hash is " << (int)in->dir_layout.dl_dir_hash << dendl;
     }
 
-    if (st->quota.is_enable() ^ in->quota.is_enable())
-      invalidate_quota_tree(in);
-    in->quota = st->quota;
-
-    in->layout = st->layout;
-
     update_inode_file_bits(in, st->truncate_seq, st->truncate_size, st->size,
                           st->time_warp_seq, st->ctime, st->mtime, st->atime,
                           st->inline_version, st->inline_data,
@@ -1218,6 +1214,7 @@ void Client::insert_readdir_results(MetaRequest *request, MetaSession *session,
 Inode* Client::insert_trace(MetaRequest *request, MetaSession *session)
 {
   MClientReply *reply = request->reply;
+  int op = request->get_op();
 
   ldout(cct, 10) << "insert_trace from " << request->sent_stamp << " mds." << session->mds_num
           << " is_target=" << (int)reply->head.is_target
@@ -1242,14 +1239,14 @@ Inode* Client::insert_trace(MetaRequest *request, MetaSession *session)
     }
 
     if (d && reply->get_result() == 0) {
-      if (request->head.op == CEPH_MDS_OP_RENAME) {
+      if (op == CEPH_MDS_OP_RENAME) {
        // rename
        Dentry *od = request->old_dentry();
        ldout(cct, 10) << " unlinking rename src dn " << od << " for traceless reply" << dendl;
        assert(od);
        unlink(od, true, true);  // keep dir, dentry
-      } else if (request->head.op == CEPH_MDS_OP_RMDIR ||
-                request->head.op == CEPH_MDS_OP_UNLINK) {
+      } else if (op == CEPH_MDS_OP_RMDIR ||
+                op == CEPH_MDS_OP_UNLINK) {
        // unlink, rmdir
        ldout(cct, 10) << " unlinking unlink/rmdir dn " << d << " for traceless reply" << dendl;
        unlink(d, true, true);  // keep dir, dentry
@@ -1288,7 +1285,6 @@ Inode* Client::insert_trace(MetaRequest *request, MetaSession *session)
   if (reply->head.is_target) {
     ist.decode(p, features);
     if (cct->_conf->client_debug_getattr_caps) {
-      int op = request->get_op();
       unsigned wanted = 0;
       if (op == CEPH_MDS_OP_GETATTR || op == CEPH_MDS_OP_LOOKUP)
        wanted = request->head.args.getattr.mask;
@@ -1303,15 +1299,15 @@ Inode* Client::insert_trace(MetaRequest *request, MetaSession *session)
     in = add_update_inode(&ist, request->sent_stamp, session);
   }
 
+  Inode *diri = NULL;
   if (reply->head.is_dentry) {
-    Inode *diri = add_update_inode(&dirst, request->sent_stamp, session);
+    diri = add_update_inode(&dirst, request->sent_stamp, session);
     update_dir_dist(diri, &dst);  // dir stat info is attached to ..
 
     if (in) {
       Dir *dir = diri->open_dir();
       insert_dentry_inode(dir, dname, &dlease, in, request->sent_stamp, session,
-                          ((request->head.op == CEPH_MDS_OP_RENAME) ?
-                                        request->old_dentry() : NULL));
+                          (op == CEPH_MDS_OP_RENAME) ? request->old_dentry() : NULL);
     } else {
       if (diri->dir && diri->dir->dentries.count(dname)) {
        Dentry *dn = diri->dir->dentries[dname];
@@ -1322,14 +1318,14 @@ Inode* Client::insert_trace(MetaRequest *request, MetaSession *session)
        }
       }
     }
-  } else if (reply->head.op == CEPH_MDS_OP_LOOKUPSNAP ||
-            reply->head.op == CEPH_MDS_OP_MKSNAP) {
+  } else if (op == CEPH_MDS_OP_LOOKUPSNAP ||
+            op == CEPH_MDS_OP_MKSNAP) {
     ldout(cct, 10) << " faking snap lookup weirdness" << dendl;
     // fake it for snap lookup
     vinodeno_t vino = ist.vino;
     vino.snapid = CEPH_SNAPDIR;
     assert(inode_map.count(vino));
-    Inode *diri = inode_map[vino];
+    diri = inode_map[vino];
     
     string dname = request->path.last_dentry();
     
@@ -1349,9 +1345,13 @@ Inode* Client::insert_trace(MetaRequest *request, MetaSession *session)
   }
 
   if (in) {
-    if (reply->head.op == CEPH_MDS_OP_READDIR ||
-       reply->head.op == CEPH_MDS_OP_LSSNAP)
+    if (op == CEPH_MDS_OP_READDIR ||
+       op == CEPH_MDS_OP_LSSNAP) {
       insert_readdir_results(request, session, in);
+    } else if (op == CEPH_MDS_OP_LOOKUPNAME) {
+      // hack: return parent inode instead
+      in = diri;
+    }
 
     if (request->dentry() == NULL && in != request->inode()) {
       // pin the target inode if its parent dentry is not pinned
@@ -2823,7 +2823,6 @@ void Client::put_inode(Inode *in, int n)
     ldout(cct, 10) << "put_inode deleting " << *in << dendl;
     bool unclean = objectcacher->release_set(&in->oset);
     assert(!unclean);
-    put_qtree(in);
     inode_map.erase(in->vino());
     if (use_faked_inos())
       _release_faked_ino(in);
@@ -2927,7 +2926,6 @@ void Client::unlink(Dentry *dn, bool keepdir, bool keepdentry)
 
   // unlink from inode
   if (in) {
-    invalidate_quota_tree(in.get());
     if (in->is_dir()) {
       if (in->dir)
        dn->put(); // dir -> dn pin
@@ -4475,8 +4473,6 @@ void Client::handle_quota(MClientQuota *m)
     in = inode_map[vino];
 
     if (in) {
-      if (in->quota.is_enable() ^ m->quota.is_enable())
-       invalidate_quota_tree(in);
       in->quota = m->quota;
       in->rstat = m->rstat;
     }
@@ -8899,7 +8895,7 @@ int Client::statfs(const char *path, struct statvfs *stbuf)
   // quota but we can see a parent of it that does have a quota, we'll
   // respect that one instead.
   assert(root != nullptr);
-  Inode *quota_root = get_quota_root(root);
+  Inode *quota_root = root->quota.is_enable() ? root : get_quota_root(root);
 
   // get_quota_root should always give us something if client quotas are
   // enabled
@@ -12081,88 +12077,73 @@ bool Client::ms_get_authorizer(int dest_type, AuthAuthorizer **authorizer, bool
   return true;
 }
 
-void Client::put_qtree(Inode *in)
-{
-  QuotaTree *qtree = in->qtree;
-  if (qtree) {
-    qtree->invalidate();
-    in->qtree = NULL;
-  }
-}
-
-void Client::invalidate_quota_tree(Inode *in)
-{
-  QuotaTree *qtree = in->qtree;
-  if (qtree) {
-    ldout(cct, 10) << "invalidate quota tree node " << *in << dendl;
-    if (qtree->parent_ref()) {
-      assert(in->is_dir());
-      ldout(cct, 15) << "invalidate quota tree ancestor " << *in << dendl;
-      Inode *ancestor = qtree->ancestor()->in();
-      if (ancestor)
-        put_qtree(ancestor);
-    }
-    put_qtree(in);
-  }
-}
-
 Inode *Client::get_quota_root(Inode *in)
 {
   if (!cct->_conf->client_quota)
     return NULL;
 
-  QuotaTree *ancestor = NULL;
-  QuotaTree *parent = NULL;
+  Inode *cur = in;
+  utime_t now = ceph_clock_now(cct);
 
-  vector<Inode*> inode_list;
-  while (in) {
-    if (in->qtree && in->qtree->ancestor()->in()) {
-      ancestor = in->qtree->ancestor();
-      parent = in->qtree;
+  while (cur) {
+    if (cur != in && cur->quota.is_enable())
       break;
-    }
-
-    inode_list.push_back(in);
 
-    if (!in->dn_set.empty())
-      in = in->get_first_parent()->dir->parent_inode;
-    else if (root_parents.count(in))
-      in = root_parents[in].get();
-    else
-      in = NULL;
-  }
-
-  if (!in) {
-    assert(!parent && !ancestor);
-    assert(root_ancestor->qtree == NULL);
-    root_ancestor->qtree = ancestor = new QuotaTree(root_ancestor);
-    ancestor->set_ancestor(ancestor);
-    parent = ancestor;
-  }
-  assert(parent && ancestor);
-
-  for (vector<Inode*>::reverse_iterator iter = inode_list.rbegin();
-       iter != inode_list.rend(); ++iter) {
-    Inode *cur = *iter;
+    Inode *parent_in = NULL;
+    if (!cur->dn_set.empty()) {
+      for (auto p = cur->dn_set.begin(); p != cur->dn_set.end(); ++p) {
+       Dentry *dn = *p;
+       if (dn->lease_mds >= 0 &&
+           dn->lease_ttl > now &&
+           mds_sessions.count(dn->lease_mds)) {
+         parent_in = dn->dir->parent_inode;
+       } else {
+         Inode *diri = dn->dir->parent_inode;
+         if (diri->caps_issued_mask(CEPH_CAP_FILE_SHARED) &&
+             diri->shared_gen == dn->cap_shared_gen) {
+           parent_in = dn->dir->parent_inode;
+         }
+       }
+       if (parent_in)
+         break;
+      }
+    } else if (root_parents.count(cur)) {
+      parent_in = root_parents[cur].get();
+    }
 
-    if (!cur->qtree)
-      cur->qtree = new QuotaTree(cur);
+    if (parent_in) {
+      cur = parent_in;
+      continue;
+    }
 
-    cur->qtree->set_parent(parent);
-    if (parent->in()->quota.is_enable())
-      ancestor = parent;
-    cur->qtree->set_ancestor(ancestor);
+    if (cur == root_ancestor)
+      break;
 
-    ldout(cct, 20) << "link quota tree " << cur->ino
-                   << " to parent (" << parent->in()->ino << ")"
-                   << " ancestor (" << ancestor->in()->ino << ")" << dendl;
+    MetaRequest *req = new MetaRequest(CEPH_MDS_OP_LOOKUPNAME);
+    filepath path(cur->ino);
+    req->set_filepath(path);
+    req->set_inode(cur);
+
+    InodeRef parent_ref;
+    int ret = make_request(req, -1, -1, &parent_ref);
+    if (ret < 0) {
+      ldout(cct, 1) << __func__ << " " << in->vino()
+                   << " failed to find parent of " << cur->vino()
+                   << " err " << ret <<  dendl;
+      // FIXME: what to do?
+      cur = root_ancestor;
+      break;
+    }
 
-    parent = cur->qtree;
-    if (cur->quota.is_enable())
-      ancestor = cur->qtree;
+    now = ceph_clock_now(cct);
+    if (cur == in)
+      cur = parent_ref.get();
+    else
+      cur = in; // start over
   }
 
-  return ancestor->in();
+  ldout(cct, 10) << __func__ << " " << in->vino() << " -> " << cur->vino() << dendl;
+  return cur;
 }
 
 /**
index 98cb089b141a781b6ee7e9d8cf91223618f84465..05a0b29affbedd8db6ab3e16f7398bf82ee9a1e4 100644 (file)
@@ -559,10 +559,7 @@ protected:
 
   int authenticate();
 
-  void put_qtree(Inode *in);
-  void invalidate_quota_tree(Inode *in);
   Inode* get_quota_root(Inode *in);
-
   bool check_quota_condition(
       Inode *in,
       std::function<bool (const Inode &)> test);
index b68a10a2eb209913901dd86c25b06ba68513abef..5398c68c41f60a91b525c47a30caec05ade90f44 100644 (file)
@@ -76,80 +76,6 @@ struct CapSnap {
   void dump(Formatter *f) const;
 };
 
-class QuotaTree {
-private:
-  Inode *_in;
-
-  int _ancestor_ref;
-  QuotaTree *_ancestor;
-  int _parent_ref;
-  QuotaTree *_parent;
-
-  void _put()
-  {
-    if (!_in && !_ancestor_ref && !_parent_ref) {
-      set_parent(NULL);
-      set_ancestor(NULL);
-      delete this;
-    }
-  }
-  ~QuotaTree() {}
-public:
-  explicit QuotaTree(Inode *i) :
-    _in(i),
-    _ancestor_ref(0),
-    _ancestor(NULL),
-    _parent_ref(0),
-    _parent(NULL)
-  { assert(i); }
-
-  Inode *in() { return _in; }
-
-  int ancestor_ref() { return _ancestor_ref; }
-  int parent_ref() { return _parent_ref; }
-
-  QuotaTree *ancestor() { return _ancestor; }
-  void set_ancestor(QuotaTree *ancestor)
-  {
-    if (ancestor == _ancestor)
-      return;
-
-    if (_ancestor) {
-      --_ancestor->_ancestor_ref;
-      _ancestor->_put();
-    }
-    _ancestor = ancestor;
-    if (_ancestor)
-      ++_ancestor->_ancestor_ref;
-  }
-
-  QuotaTree *parent() { return _parent; }
-  void set_parent(QuotaTree *parent)
-  {
-    if (parent == _parent)
-      return;
-
-    if (_parent) {
-      --_parent->_parent_ref;
-      _parent->_put();
-    }
-    _parent = parent;
-    if (parent)
-      ++_parent->_parent_ref;
-  }
-
-  void invalidate()
-  {
-    if (!_in)
-      return;
-
-    _in = NULL;
-    set_ancestor(NULL);
-    set_parent(NULL);
-    _put();
-  }
-};
-
 // inode flags
 #define I_COMPLETE     1
 #define I_DIR_ORDERED  2
@@ -219,7 +145,6 @@ struct Inode {
   unsigned flags;
 
   quota_info_t quota;
-  QuotaTree* qtree;
 
   bool is_complete_and_ordered() {
     static const unsigned wants = I_COMPLETE | I_DIR_ORDERED;
@@ -302,7 +227,7 @@ struct Inode {
       rdev(0), mode(0), uid(0), gid(0), nlink(0),
       size(0), truncate_seq(1), truncate_size(-1),
       time_warp_seq(0), max_size(0), version(0), xattr_version(0),
-      inline_version(0), flags(0), qtree(NULL),
+      inline_version(0), flags(0),
       dir(0), dir_release_count(1), dir_ordered_count(1),
       dir_hashed(false), dir_replicated(false), auth_cap(NULL),
       cap_dirtier_uid(-1), cap_dirtier_gid(-1),