if (mdr->client_request && mdr->client_request->get_source().is_client()) {
dout(7) << "request_forward " << *mdr << " to mds." << who << " req "
<< *mdr->client_request << dendl;
- mds->forward_message_mds(mdr->release_client_request(), who);
+ if (mdr->is_batch_head) {
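+ // a batch head drags its followers along: find the batch op registered
+ // on the target inode/dentry so the whole batch can be forwarded at once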
+ BatchOp* bop = nullptr;
+ int mask = mdr->client_request->head.args.getattr.mask;
+
+ switch (mdr->client_request->get_op()) {
+ case CEPH_MDS_OP_GETATTR:
+ {
+ CInode* in = mdr->in[0];
+ if (in) {
+ auto it = in->batch_ops.find(mask);
+ if (it != in->batch_ops.end()) {
+ bop = it->second;
+ }
+ }
+ break;
+ }
+ case CEPH_MDS_OP_LOOKUP:
+ {
+ CDentry* dn = mdr->dn[0].empty() ? nullptr : mdr->dn[0].back();
+ if (dn) {
+ auto it = dn->batch_ops.find(mask);
+ if (it != dn->batch_ops.end()) {
+ bop = it->second;
+ }
+ }
+ break;
+ }
+ default:
+ ceph_abort();
+ }
+ if (bop) {
+ dout(20) << __func__ << ": forwarding other batch ops (GETATTR/LOOKUP) to "
+ << who << ":" << port << ", too. " << *mdr << dendl;
+ bop->forward_all(who);
+ } else {
+ // defensive fallback: no batch op is registered for this head,
+ // so forward its request on its own
+ mds->forward_message_mds(mdr->release_client_request(), who);
+ }
+ } else {
+ mds->forward_message_mds(mdr->release_client_request(), who);
+ }
if (mds->logger) mds->logger->inc(l_mds_forward);
} else if (mdr->internal_op >= 0) {
dout(10) << "request_forward on internal op; cancelling" << dendl;
}
}
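+// Batch_Getattr_Lookup gathers concurrent GETATTR/LOOKUP requests that hit
+// the same inode/dentry with the same getattr mask. The first request in
+// becomes the batch head; later ones queue on the head's batch_reqs and are
+// replied to (or forwarded) together with the head.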
+class Batch_Getattr_Lookup : public BatchOp {
+protected:
+ Server* server;
+ MDRequestRef mdr;
+ MDCache* mdcache;
+ int res = 0;
+public:
+ Batch_Getattr_Lookup(Server* s, MDRequestRef& r, MDCache* mdc) : server(s), mdr(r), mdcache(mdc) {}
+ void add_request(const MDRequestRef& m) override {
+ mdr->batch_reqs.push_back(m);
+ }
+ void set_request(const MDRequestRef& m) override {
+ mdr = m;
+ }
+ void forward_all(mds_rank_t t) override {
+ mdcache->mds->forward_message_mds(mdr->release_client_request(), t);
+ mdr->set_mds_stamp(ceph_clock_now());
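+ // the head's own message was forwarded above; now re-forward each
+ // follower as an ordinary, non-head request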
+ for (auto& m : mdr->batch_reqs) {
+ if (!m->killed)
+ mdcache->request_forward(m, t);
+ }
+ mdr->batch_reqs.clear();
+ }
+ void respond_all(int r) override {
+ mdr->set_mds_stamp(ceph_clock_now());
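+ // hand the head's reply trace to each follower so respond_to_request
+ // can build a complete reply for it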
+ for (auto& m : mdr->batch_reqs) {
+ if (!m->killed) {
+ m->tracei = mdr->tracei;
+ m->tracedn = mdr->tracedn;
+ server->respond_to_request(m, r);
+ }
+ }
+ mdr->batch_reqs.clear();
+ server->reply_client_request(mdr, make_message<MClientReply>(*mdr->client_request, r));
+ }
+};
+
class ServerLogContext : public MDSLogContextBase {
protected:
Server *server;
void Server::respond_to_request(MDRequestRef& mdr, int r)
{
if (mdr->client_request) {
- reply_client_request(mdr, make_message<MClientReply>(*mdr->client_request, r));
+ if (mdr->is_batch_op() && mdr->is_batch_head) {
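+ // this request is the batch head: fan the result out to all queued
+ // followers through the batch op registered on the inode/dentry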
+ int mask = mdr->client_request->head.args.getattr.mask;
+
+ BatchOp* fin = nullptr;
+ if (mdr->client_request->get_op() == CEPH_MDS_OP_GETATTR) {
+ dout(20) << __func__ << ": respond other getattr ops. " << *mdr << dendl;
+ fin = mdr->in[0]->batch_ops[mask];
+ } else {
+ dout(20) << __func__ << ": respond other lookup ops. " << *mdr << dendl;
+ fin = mdr->dn[0].back()->batch_ops[mask];
+ }
+
+ fin->respond_all(r);
+
+ if (mdr->client_request->get_op() == CEPH_MDS_OP_GETATTR) {
+ mdr->in[0]->batch_ops.erase(mask);
+ } else {
+ mdr->dn[0].back()->batch_ops.erase(mask);
+ }
+ // batch_ops stores raw pointers, so free the batch op once it has replied
+ delete fin;
+ } else {
+ reply_client_request(mdr, make_message<MClientReply>(*mdr->client_request, r));
+ }
} else if (mdr->internal_op > -1) {
dout(10) << "respond_to_request on internal request " << mdr << dendl;
if (!mdr->internal_op_finish)
});
}
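+// Tear down (and free) the batch op registered for this request's getattr
+// mask; called when a killed batch head has no follower left to take over.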
+void Server::clear_batch_ops(const MDRequestRef& mdr)
+{
+ int mask = mdr->client_request->head.args.getattr.mask;
+ if (mdr->client_request->get_op() == CEPH_MDS_OP_GETATTR && mdr->in[0]) {
+ auto it = mdr->in[0]->batch_ops.find(mask);
+ if (it != mdr->in[0]->batch_ops.end()) {
+ auto bocp = it->second;
+ mdr->in[0]->batch_ops.erase(it);
+ delete bocp;
+ }
+ } else if (mdr->client_request->get_op() == CEPH_MDS_OP_LOOKUP && mdr->dn[0].size()) {
+ auto it = mdr->dn[0].back()->batch_ops.find(mask);
+ if (it != mdr->dn[0].back()->batch_ops.end()) {
+ auto bocp = it->second;
+ mdr->dn[0].back()->batch_ops.erase(it);
+ delete bocp;
+ }
+ }
+}
+
void Server::dispatch_client_request(MDRequestRef& mdr)
{
// we shouldn't be waiting on anyone.
if (mdr->killed) {
dout(10) << "request " << *mdr << " was killed" << dendl;
- return;
+ // If the mdr is a batch head and it has followers, promote one follower
+ // to be the new batch head and continue processing with it.
+ if (mdr->is_batch_op() && mdr->is_batch_head) {
+ if (!mdr->batch_reqs.empty()) {
+ MDRequestRef new_batch_head;
+ for (auto itr = mdr->batch_reqs.cbegin(); itr != mdr->batch_reqs.cend();) {
+ auto req = *itr;
+ itr = mdr->batch_reqs.erase(itr);
+ if (!req->killed) {
+ new_batch_head = req;
+ break;
+ }
+ }
+
+ if (!new_batch_head) {
+ clear_batch_ops(mdr);
+ return;
+ }
+
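+ // the surviving followers move over to the new head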
+ new_batch_head->batch_reqs = std::move(mdr->batch_reqs);
+
+ mdr = new_batch_head;
+ mdr->is_batch_head = true;
+ int mask = mdr->client_request->head.args.getattr.mask;
+ if (mdr->client_request->get_op() == CEPH_MDS_OP_GETATTR) {
+ auto fin = mdr->in[0]->batch_ops[mask];
+ fin->set_request(new_batch_head);
+ } else if (mdr->client_request->get_op() == CEPH_MDS_OP_LOOKUP) {
+ auto fin = mdr->dn[0].back()->batch_ops[mask];
+ fin->set_request(new_batch_head);
+ }
+ } else {
+ clear_batch_ops(mdr);
+ return;
+ }
+ } else {
+ return;
+ }
} else if (mdr->aborted) {
mdr->aborted = false;
mdcache->request_kill(mdr);
!is_lookup);
if (!ref) return;
+ mdr->getattr_caps = mask;
+
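+ // Batch identical in-flight requests: the first GETATTR/LOOKUP for a given
+ // mask becomes the batch head; later ones join the registered batch op and
+ // wait for the head's result.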
+ if (!mdr->is_batch_head && mdr->is_batch_op()) {
+ if (!is_lookup) {
+ if (ref->batch_ops.count(mask)) {
+ dout(20) << __func__ << ": GETATTR op, waiting for an in-flight getattr with the same mask to respond. " << *mdr << dendl;
+ ref->batch_ops[mask]->add_request(mdr);
+ return;
+ } else
+ ref->batch_ops[mask] = new Batch_Getattr_Lookup(this, mdr, mdcache);
+ } else {
+ CDentry* dn = mdr->dn[0].back();
+ if (dn->batch_ops.count(mask)) {
+ dout(20) << __func__ << ": LOOKUP op, waiting for an in-flight lookup with the same mask to respond. " << *mdr << dendl;
+ dn->batch_ops[mask]->add_request(mdr);
+ return;
+ } else
+ dn->batch_ops[mask] = new Batch_Getattr_Lookup(this, mdr, mdcache);
+ }
+ // this request got here first, so it becomes the batch head
+ mdr->is_batch_head = true;
+ }
+
/*
* if client currently holds the EXCL cap on a field, do not rdlock
* it; client's stat() will result in valid info if _either_ EXCL