if (mdr->client_request && mdr->client_request->get_source().is_client()) {
dout(7) << "request_forward " << *mdr << " to mds." << who << " req "
<< *mdr->client_request << dendl;
- if (mdr->is_batch_head) {
- int mask = mdr->client_request->head.args.getattr.mask;
-
- switch (mdr->client_request->get_op()) {
- case CEPH_MDS_OP_GETATTR:
- {
- CInode* in = mdr->in[0];
- if (in) {
- auto it = in->batch_ops.find(mask);
- if (it != in->batch_ops.end()) {
- it->second->forward(who);
- in->batch_ops.erase(it);
- }
- }
- break;
- }
- case CEPH_MDS_OP_LOOKUP:
- {
- if (mdr->dn[0].size()) {
- CDentry* dn = mdr->dn[0].back();
- auto it = dn->batch_ops.find(mask);
- if (it != dn->batch_ops.end()) {
- it->second->forward(who);
- dn->batch_ops.erase(it);
- }
- }
- break;
- }
- default:
- ceph_abort();
- }
+ if (mdr->is_batch_head()) {
+ mdr->release_batch_op()->forward(who);
} else {
mds->forward_message_mds(mdr->release_client_request(), who);
}
protected:
Server* server;
ceph::ref_t<MDRequestImpl> mdr;
- MDCache* mdcache;
int res = 0;
public:
- Batch_Getattr_Lookup(Server* s, ceph::ref_t<MDRequestImpl> r, MDCache* mdc) : server(s), mdr(std::move(r)), mdcache(mdc) {}
+ Batch_Getattr_Lookup(Server* s, const ceph::ref_t<MDRequestImpl>& r)
+ : server(s), mdr(r) {
+ if (mdr->client_request->get_op() == CEPH_MDS_OP_LOOKUP)
+ mdr->batch_op_map = &mdr->dn[0].back()->batch_ops;
+ else
+ mdr->batch_op_map = &mdr->in[0]->batch_ops;
+ }
void add_request(const ceph::ref_t<MDRequestImpl>& m) override {
mdr->batch_reqs.push_back(m);
}
mdr = m;
}
void _forward(mds_rank_t t) override {
+ MDCache* mdcache = server->mdcache;
mdcache->mds->forward_message_mds(mdr->release_client_request(), t);
mdr->set_mds_stamp(ceph_clock_now());
for (auto& m : mdr->batch_reqs) {
void Server::respond_to_request(MDRequestRef& mdr, int r)
{
if (mdr->client_request) {
- if (mdr->is_batch_op() && mdr->is_batch_head) {
- int mask = mdr->client_request->head.args.getattr.mask;
-
- std::unique_ptr<BatchOp> bop;
- if (mdr->client_request->get_op() == CEPH_MDS_OP_GETATTR) {
- dout(20) << __func__ << ": respond other getattr ops. " << *mdr << dendl;
- auto it = mdr->in[0]->batch_ops.find(mask);
- bop = std::move(it->second);
- mdr->in[0]->batch_ops.erase(it);
- } else {
- dout(20) << __func__ << ": respond other lookup ops. " << *mdr << dendl;
- auto it = mdr->dn[0].back()->batch_ops.find(mask);
- bop = std::move(it->second);
- mdr->dn[0].back()->batch_ops.erase(it);
- }
-
- bop->respond(r);
+ if (mdr->is_batch_head()) {
+ dout(20) << __func__ << " batch head " << *mdr << dendl;
+ mdr->release_batch_op()->respond(r);
} else {
reply_client_request(mdr, make_message<MClientReply>(*mdr->client_request, r));
}
});
}
-void Server::clear_batch_ops(const MDRequestRef& mdr)
-{
- int mask = mdr->client_request->head.args.getattr.mask;
- if (mdr->client_request->get_op() == CEPH_MDS_OP_GETATTR && mdr->in[0]) {
- mdr->in[0]->batch_ops.erase(mask);
- } else if (mdr->client_request->get_op() == CEPH_MDS_OP_LOOKUP && mdr->dn[0].size()) {
- mdr->dn[0].back()->batch_ops.erase(mask);
- }
-}
-
void Server::dispatch_client_request(MDRequestRef& mdr)
{
// we shouldn't be waiting on anyone.
dout(10) << "request " << *mdr << " was killed" << dendl;
//if the mdr is a "batch_op" and it has followers, pick a follower as
//the new "head of the batch ops" and go on processing the new one.
- if (mdr->is_batch_op() && mdr->is_batch_head ) {
+ if (mdr->is_batch_head()) {
+ int mask = mdr->client_request->head.args.getattr.mask;
+ auto it = mdr->batch_op_map->find(mask);
+
if (!mdr->batch_reqs.empty()) {
MDRequestRef new_batch_head;
for (auto itr = mdr->batch_reqs.cbegin(); itr != mdr->batch_reqs.cend();) {
break;
}
}
-
if (!new_batch_head) {
- clear_batch_ops(mdr);
+ mdr->batch_op_map->erase(it);
return;
}
new_batch_head->batch_reqs = std::move(mdr->batch_reqs);
-
+ new_batch_head->batch_op_map = mdr->batch_op_map;
+ mdr->batch_op_map = nullptr;
+ it->second->set_request(new_batch_head);
mdr = new_batch_head;
- mdr->is_batch_head = true;
- int mask = mdr->client_request->head.args.getattr.mask;
- if (mdr->client_request->get_op() == CEPH_MDS_OP_GETATTR) {
- auto& fin = mdr->in[0]->batch_ops[mask];
- fin->set_request(new_batch_head);
- } else if (mdr->client_request->get_op() == CEPH_MDS_OP_LOOKUP) {
- auto& fin = mdr->dn[0].back()->batch_ops[mask];
- fin->set_request(new_batch_head);
- }
} else {
- clear_batch_ops(mdr);
+ mdr->batch_op_map->erase(it);
return;
}
} else {
mdr->getattr_caps = mask;
- if (mdr->snapid == CEPH_NOSNAP && !mdr->is_batch_head && mdr->is_batch_op()) {
+ if (mdr->snapid == CEPH_NOSNAP && !mdr->is_batch_head() && mdr->is_batch_op()) {
if (!is_lookup) {
+ mdr->pin(ref);
auto em = ref->batch_ops.emplace(std::piecewise_construct, std::forward_as_tuple(mask), std::forward_as_tuple());
if (em.second) {
- em.first->second = std::make_unique<Batch_Getattr_Lookup>(this, mdr, mdcache);
+ em.first->second = std::make_unique<Batch_Getattr_Lookup>(this, mdr);
} else {
dout(20) << __func__ << ": GETATTR op, wait for previous same getattr ops to respond. " << *mdr << dendl;
em.first->second->add_request(mdr);
}
} else {
CDentry* dn = mdr->dn[0].back();
+ mdr->pin(dn);
auto em = dn->batch_ops.emplace(std::piecewise_construct, std::forward_as_tuple(mask), std::forward_as_tuple());
if (em.second) {
- em.first->second = std::make_unique<Batch_Getattr_Lookup>(this, mdr, mdcache);
- mdr->pin(dn);
+ em.first->second = std::make_unique<Batch_Getattr_Lookup>(this, mdr);
} else {
dout(20) << __func__ << ": LOOKUP op, wait for previous same getattr ops to respond. " << *mdr << dendl;
em.first->second->add_request(mdr);
return;
}
}
- mdr->is_batch_head = true;
}
/*