return client_request ? client_request->is_queued_for_replay() : false;
}
-bool MDRequestImpl::is_batch_op()
+bool MDRequestImpl::can_batch()
{
- return (client_request->get_op() == CEPH_MDS_OP_LOOKUP &&
- client_request->get_filepath().depth() == 1) ||
- (client_request->get_op() == CEPH_MDS_OP_GETATTR &&
- client_request->get_filepath().depth() == 0);
+ if (num_auth_pins || num_remote_auth_pins || lock_cache || !locks.empty())
+ return false;
+
+ auto op = client_request->get_op();
+ auto& path = client_request->get_filepath();
+ if (op == CEPH_MDS_OP_GETATTR) {
+ if (path.depth() == 0)
+ return true;
+ } else if (op == CEPH_MDS_OP_LOOKUP) {
+ if (path.depth() == 1 && !path.is_last_snap())
+ return true;
+ }
+
+ return false;
}
std::unique_ptr<BatchOp> MDRequestImpl::release_batch_op()
if (mask & CEPH_STAT_RSTAT)
want_auth = true; // set want_auth for CEPH_STAT_RSTAT mask
- CInode *ref = rdlock_path_pin_ref(mdr, want_auth, false);
- if (!ref)
- return;
+ if (!mdr->is_batch_head() && mdr->can_batch()) {
+ CF_MDS_MDRContextFactory cf(mdcache, mdr, false);
+ int r = mdcache->path_traverse(mdr, cf, mdr->get_filepath(),
+ (want_auth ? MDS_TRAVERSE_WANT_AUTH : 0),
+ &mdr->dn[0], &mdr->in[0]);
+ if (r > 0)
+ return; // delayed
- mdr->getattr_caps = mask;
-
- if (mdr->snapid == CEPH_NOSNAP && !mdr->is_batch_head() && mdr->is_batch_op()) {
- if (!is_lookup) {
- mdr->pin(ref);
- auto em = ref->batch_ops.emplace(std::piecewise_construct, std::forward_as_tuple(mask), std::forward_as_tuple());
+ if (r < 0) {
+ // fall-thru. let rdlock_path_pin_ref() check again.
+ } else if (is_lookup) {
+ CDentry* dn = mdr->dn[0].back();
+ mdr->pin(dn);
+ auto em = dn->batch_ops.emplace(std::piecewise_construct, std::forward_as_tuple(mask), std::forward_as_tuple());
if (em.second) {
em.first->second = std::make_unique<Batch_Getattr_Lookup>(this, mdr);
} else {
- dout(20) << __func__ << ": GETATTR op, wait for previous same getattr ops to respond. " << *mdr << dendl;
+ dout(20) << __func__ << ": LOOKUP op, wait for previous same getattr ops to respond. " << *mdr << dendl;
em.first->second->add_request(mdr);
return;
}
} else {
- CDentry* dn = mdr->dn[0].back();
- mdr->pin(dn);
- auto em = dn->batch_ops.emplace(std::piecewise_construct, std::forward_as_tuple(mask), std::forward_as_tuple());
+ CInode *in = mdr->in[0];
+ mdr->pin(in);
+ auto em = in->batch_ops.emplace(std::piecewise_construct, std::forward_as_tuple(mask), std::forward_as_tuple());
if (em.second) {
em.first->second = std::make_unique<Batch_Getattr_Lookup>(this, mdr);
} else {
- dout(20) << __func__ << ": LOOKUP op, wait for previous same getattr ops to respond. " << *mdr << dendl;
+ dout(20) << __func__ << ": GETATTR op, wait for previous same getattr ops to respond. " << *mdr << dendl;
em.first->second->add_request(mdr);
return;
}
}
}
+ CInode *ref = rdlock_path_pin_ref(mdr, want_auth, false);
+ if (!ref)
+ return;
+
+ mdr->getattr_caps = mask;
+
/*
* if client currently holds the EXCL cap on a field, do not rdlock
* it; client's stat() will result in valid info if _either_ EXCL