in->dirstat = st->dirstat;
in->rstat = st->rstat;
+ in->quota = st->quota;
+ in->layout = st->layout;
if (in->is_dir()) {
in->dir_layout = st->dir_layout;
ldout(cct, 20) << " dir hash is " << (int)in->dir_layout.dl_dir_hash << dendl;
}
- if (st->quota.is_enable() ^ in->quota.is_enable())
- invalidate_quota_tree(in);
- in->quota = st->quota;
-
- in->layout = st->layout;
-
update_inode_file_bits(in, st->truncate_seq, st->truncate_size, st->size,
st->time_warp_seq, st->ctime, st->mtime, st->atime,
st->inline_version, st->inline_data,
Inode* Client::insert_trace(MetaRequest *request, MetaSession *session)
{
MClientReply *reply = request->reply;
+ int op = request->get_op();
ldout(cct, 10) << "insert_trace from " << request->sent_stamp << " mds." << session->mds_num
<< " is_target=" << (int)reply->head.is_target
}
if (d && reply->get_result() == 0) {
- if (request->head.op == CEPH_MDS_OP_RENAME) {
+ if (op == CEPH_MDS_OP_RENAME) {
// rename
Dentry *od = request->old_dentry();
ldout(cct, 10) << " unlinking rename src dn " << od << " for traceless reply" << dendl;
assert(od);
unlink(od, true, true); // keep dir, dentry
- } else if (request->head.op == CEPH_MDS_OP_RMDIR ||
- request->head.op == CEPH_MDS_OP_UNLINK) {
+ } else if (op == CEPH_MDS_OP_RMDIR ||
+ op == CEPH_MDS_OP_UNLINK) {
// unlink, rmdir
ldout(cct, 10) << " unlinking unlink/rmdir dn " << d << " for traceless reply" << dendl;
unlink(d, true, true); // keep dir, dentry
if (reply->head.is_target) {
ist.decode(p, features);
if (cct->_conf->client_debug_getattr_caps) {
- int op = request->get_op();
unsigned wanted = 0;
if (op == CEPH_MDS_OP_GETATTR || op == CEPH_MDS_OP_LOOKUP)
wanted = request->head.args.getattr.mask;
in = add_update_inode(&ist, request->sent_stamp, session);
}
+ Inode *diri = NULL;
if (reply->head.is_dentry) {
- Inode *diri = add_update_inode(&dirst, request->sent_stamp, session);
+ diri = add_update_inode(&dirst, request->sent_stamp, session);
update_dir_dist(diri, &dst); // dir stat info is attached to ..
if (in) {
Dir *dir = diri->open_dir();
insert_dentry_inode(dir, dname, &dlease, in, request->sent_stamp, session,
- ((request->head.op == CEPH_MDS_OP_RENAME) ?
- request->old_dentry() : NULL));
+ (op == CEPH_MDS_OP_RENAME) ? request->old_dentry() : NULL);
} else {
if (diri->dir && diri->dir->dentries.count(dname)) {
Dentry *dn = diri->dir->dentries[dname];
}
}
}
- } else if (reply->head.op == CEPH_MDS_OP_LOOKUPSNAP ||
- reply->head.op == CEPH_MDS_OP_MKSNAP) {
+ } else if (op == CEPH_MDS_OP_LOOKUPSNAP ||
+ op == CEPH_MDS_OP_MKSNAP) {
ldout(cct, 10) << " faking snap lookup weirdness" << dendl;
// fake it for snap lookup
vinodeno_t vino = ist.vino;
vino.snapid = CEPH_SNAPDIR;
assert(inode_map.count(vino));
- Inode *diri = inode_map[vino];
+ diri = inode_map[vino];
string dname = request->path.last_dentry();
}
if (in) {
- if (reply->head.op == CEPH_MDS_OP_READDIR ||
- reply->head.op == CEPH_MDS_OP_LSSNAP)
+ if (op == CEPH_MDS_OP_READDIR ||
+ op == CEPH_MDS_OP_LSSNAP) {
insert_readdir_results(request, session, in);
+ } else if (op == CEPH_MDS_OP_LOOKUPNAME) {
+ // hack: return parent inode instead
+ in = diri;
+ }
if (request->dentry() == NULL && in != request->inode()) {
// pin the target inode if its parent dentry is not pinned
ldout(cct, 10) << "put_inode deleting " << *in << dendl;
bool unclean = objectcacher->release_set(&in->oset);
assert(!unclean);
- put_qtree(in);
inode_map.erase(in->vino());
if (use_faked_inos())
_release_faked_ino(in);
// unlink from inode
if (in) {
- invalidate_quota_tree(in.get());
if (in->is_dir()) {
if (in->dir)
dn->put(); // dir -> dn pin
in = inode_map[vino];
if (in) {
- if (in->quota.is_enable() ^ m->quota.is_enable())
- invalidate_quota_tree(in);
in->quota = m->quota;
in->rstat = m->rstat;
}
// quota but we can see a parent of it that does have a quota, we'll
// respect that one instead.
assert(root != nullptr);
- Inode *quota_root = get_quota_root(root);
+ Inode *quota_root = root->quota.is_enable() ? root : get_quota_root(root);
// get_quota_root should always give us something if client quotas are
// enabled
return true;
}
-void Client::put_qtree(Inode *in)
-{
- QuotaTree *qtree = in->qtree;
- if (qtree) {
- qtree->invalidate();
- in->qtree = NULL;
- }
-}
-
-void Client::invalidate_quota_tree(Inode *in)
-{
- QuotaTree *qtree = in->qtree;
- if (qtree) {
- ldout(cct, 10) << "invalidate quota tree node " << *in << dendl;
- if (qtree->parent_ref()) {
- assert(in->is_dir());
- ldout(cct, 15) << "invalidate quota tree ancestor " << *in << dendl;
- Inode *ancestor = qtree->ancestor()->in();
- if (ancestor)
- put_qtree(ancestor);
- }
- put_qtree(in);
- }
-}
-
Inode *Client::get_quota_root(Inode *in)
{
if (!cct->_conf->client_quota)
return NULL;
- QuotaTree *ancestor = NULL;
- QuotaTree *parent = NULL;
+ Inode *cur = in;
+ utime_t now = ceph_clock_now(cct);
- vector<Inode*> inode_list;
- while (in) {
- if (in->qtree && in->qtree->ancestor()->in()) {
- ancestor = in->qtree->ancestor();
- parent = in->qtree;
+ while (cur) {
+ if (cur != in && cur->quota.is_enable())
break;
- }
-
- inode_list.push_back(in);
- if (!in->dn_set.empty())
- in = in->get_first_parent()->dir->parent_inode;
- else if (root_parents.count(in))
- in = root_parents[in].get();
- else
- in = NULL;
- }
-
- if (!in) {
- assert(!parent && !ancestor);
- assert(root_ancestor->qtree == NULL);
- root_ancestor->qtree = ancestor = new QuotaTree(root_ancestor);
- ancestor->set_ancestor(ancestor);
- parent = ancestor;
- }
- assert(parent && ancestor);
-
- for (vector<Inode*>::reverse_iterator iter = inode_list.rbegin();
- iter != inode_list.rend(); ++iter) {
- Inode *cur = *iter;
+ Inode *parent_in = NULL;
+ if (!cur->dn_set.empty()) {
+ for (auto p = cur->dn_set.begin(); p != cur->dn_set.end(); ++p) {
+ Dentry *dn = *p;
+ if (dn->lease_mds >= 0 &&
+ dn->lease_ttl > now &&
+ mds_sessions.count(dn->lease_mds)) {
+ parent_in = dn->dir->parent_inode;
+ } else {
+ Inode *diri = dn->dir->parent_inode;
+ if (diri->caps_issued_mask(CEPH_CAP_FILE_SHARED) &&
+ diri->shared_gen == dn->cap_shared_gen) {
+ parent_in = dn->dir->parent_inode;
+ }
+ }
+ if (parent_in)
+ break;
+ }
+ } else if (root_parents.count(cur)) {
+ parent_in = root_parents[cur].get();
+ }
- if (!cur->qtree)
- cur->qtree = new QuotaTree(cur);
+ if (parent_in) {
+ cur = parent_in;
+ continue;
+ }
- cur->qtree->set_parent(parent);
- if (parent->in()->quota.is_enable())
- ancestor = parent;
- cur->qtree->set_ancestor(ancestor);
+ if (cur == root_ancestor)
+ break;
- ldout(cct, 20) << "link quota tree " << cur->ino
- << " to parent (" << parent->in()->ino << ")"
- << " ancestor (" << ancestor->in()->ino << ")" << dendl;
+ MetaRequest *req = new MetaRequest(CEPH_MDS_OP_LOOKUPNAME);
+ filepath path(cur->ino);
+ req->set_filepath(path);
+ req->set_inode(cur);
+
+ InodeRef parent_ref;
+ int ret = make_request(req, -1, -1, &parent_ref);
+ if (ret < 0) {
+ ldout(cct, 1) << __func__ << " " << in->vino()
+ << " failed to find parent of " << cur->vino()
+ << " err " << ret << dendl;
+ // FIXME: what to do?
+ cur = root_ancestor;
+ break;
+ }
- parent = cur->qtree;
- if (cur->quota.is_enable())
- ancestor = cur->qtree;
+ now = ceph_clock_now(cct);
+ if (cur == in)
+ cur = parent_ref.get();
+ else
+ cur = in; // start over
}
- return ancestor->in();
+ ldout(cct, 10) << __func__ << " " << in->vino() << " -> " << cur->vino() << dendl;
+ return cur;
}
/**
void dump(Formatter *f) const;
};
-class QuotaTree {
-private:
- Inode *_in;
-
- int _ancestor_ref;
- QuotaTree *_ancestor;
- int _parent_ref;
- QuotaTree *_parent;
-
- void _put()
- {
- if (!_in && !_ancestor_ref && !_parent_ref) {
- set_parent(NULL);
- set_ancestor(NULL);
- delete this;
- }
- }
- ~QuotaTree() {}
-public:
- explicit QuotaTree(Inode *i) :
- _in(i),
- _ancestor_ref(0),
- _ancestor(NULL),
- _parent_ref(0),
- _parent(NULL)
- { assert(i); }
-
- Inode *in() { return _in; }
-
- int ancestor_ref() { return _ancestor_ref; }
- int parent_ref() { return _parent_ref; }
-
- QuotaTree *ancestor() { return _ancestor; }
- void set_ancestor(QuotaTree *ancestor)
- {
- if (ancestor == _ancestor)
- return;
-
- if (_ancestor) {
- --_ancestor->_ancestor_ref;
- _ancestor->_put();
- }
- _ancestor = ancestor;
- if (_ancestor)
- ++_ancestor->_ancestor_ref;
- }
-
- QuotaTree *parent() { return _parent; }
- void set_parent(QuotaTree *parent)
- {
- if (parent == _parent)
- return;
-
- if (_parent) {
- --_parent->_parent_ref;
- _parent->_put();
- }
- _parent = parent;
- if (parent)
- ++_parent->_parent_ref;
- }
-
- void invalidate()
- {
- if (!_in)
- return;
-
- _in = NULL;
- set_ancestor(NULL);
- set_parent(NULL);
- _put();
- }
-};
-
// inode flags
#define I_COMPLETE 1
#define I_DIR_ORDERED 2
unsigned flags;
quota_info_t quota;
- QuotaTree* qtree;
bool is_complete_and_ordered() {
static const unsigned wants = I_COMPLETE | I_DIR_ORDERED;
rdev(0), mode(0), uid(0), gid(0), nlink(0),
size(0), truncate_seq(1), truncate_size(-1),
time_warp_seq(0), max_size(0), version(0), xattr_version(0),
- inline_version(0), flags(0), qtree(NULL),
+ inline_version(0), flags(0),
dir(0), dir_release_count(1), dir_ordered_count(1),
dir_hashed(false), dir_replicated(false), auth_cap(NULL),
cap_dirtier_uid(-1), cap_dirtier_gid(-1),