void MDCache::dispatch_request(const MDRequestRef& mdr)
{
if (mdr->dead) {
- dout(20) << __func__ << ": dead " << *mdr << dendl;
+ dout(10) << __func__ << ": dead " << *mdr << dendl;
+ //if the mdr is a "batch_op" and it has followers, pick a follower as
+ //the new "head of the batch ops" and go on processing the new one.
+ if (mdr->client_request && mdr->is_batch_head()) {
+ int mask = mdr->client_request->head.args.getattr.mask;
+ auto it = mdr->batch_op_map->find(mask);
+ auto new_batch_head = it->second->find_new_head();
+ if (!new_batch_head) {
+ mdr->batch_op_map->erase(it);
+ dout(10) << __func__ << ": mask '" << mask
+ << "' batch head is dead and there is no follower" << dendl;
+ return;
+ }
+ dout(10) << __func__ << ": mask '" << mask
+ << "' batch head is dead and queue a new one "
+ << *new_batch_head << dendl;
+ mds->finisher->queue(new C_MDS_RetryRequest(this, new_batch_head));
+ return;
+ }
return;
}
if (mdr->client_request) {
auto it = mdr->batch_op_map->find(mask);
auto new_batch_head = it->second->find_new_head();
if (!new_batch_head) {
- mdr->batch_op_map->erase(it);
- return;
+ mdr->batch_op_map->erase(it);
+ dout(10) << __func__ << ": mask '" << mask
+ << "' batch head is killed and there is no follower" << dendl;
+ return;
}
+ dout(10) << __func__ << ": mask '" << mask
+ << "' batch head is killed and queue a new one "
+ << *new_batch_head << dendl;
mds->finisher->queue(new C_MDS_RetryRequest(mdcache, new_batch_head));
return;
} else {