// kick dispatcher (we've got it!)
assert(request->dispatch_cond);
request->dispatch_cond->Signal();
- request->dispatch_cond = 0;
dout(20) << "sendrecv kickback on tid " << tid << " " << request->dispatch_cond << dendl;
+ request->dispatch_cond = 0;
// insert trace
utime_t from = request->sent_stamp;
r->decode_payload();
r->set_retry_attempt(request->retry_attempt);
r->set_dentry_wanted();
+ if(request->got_unsafe) r->set_replayed_op();
}
else
request->retry_attempt++;
void Client::handle_client_reply(MClientReply *reply)
{
tid_t tid = reply->get_tid();
+ bool is_safe = reply->is_safe();
if (mds_requests.count(tid) == 0) {
- dout(10) << "handle_client_reply no pending request on tid " << tid << dendl;
+ dout(10) << "handle_client_reply no pending request on tid " << tid
+ << " safe is:" << is_safe << dendl;
delete reply;
return;
}
- dout(20) << "handle_client_reply got a reply. Safe:" << reply->is_safe()
- << " tid:" << tid << dendl;
+ dout(20) << "handle_client_reply got a reply. Safe:" << is_safe
+ << " tid " << tid << dendl;
int mds_num = reply->get_source().num();
MetaRequest *request = mds_requests[tid];
assert(request);
- // store reply
- request->reply = reply;
- if ((request->got_unsafe && !reply->is_safe())
- || (request->got_safe && reply->is_safe())) {
+ if ((request->got_unsafe && !is_safe)
+ || (request->got_safe && is_safe)) {
//duplicate response
- dout(0) << "got a duplicate reply on " << tid << " from mds "
- << mds_num << " safe:" << reply->is_safe() << dendl;
+ dout(0) << "got a duplicate reply on tid " << tid << " from mds "
+ << mds_num << " safe:" << is_safe << dendl;
+ delete reply;
return;
}
request->got_unsafe = true;
mds_sessions[mds_num].unsafe_requests.push_back(&request->unsafe_item);
+ // store reply
+ request->reply = reply;
+
Cond cond;
request->dispatch_cond = &cond;
}
}
- if (reply->is_safe()) {
+ if (is_safe) {
// the filesystem change is committed to disk
request->got_safe = true;
if (request->got_unsafe) {
//we're done, clean up
request->unsafe_item.remove_myself();
- goto cleanup;
+ mds_requests.erase(tid);
+ request->put(); // for the dumb data structure
}
}
-
- cleanup:
- if (reply->is_safe()) {
- mds_requests.erase(tid);
- request->put();
- }
- return;
}
void Client::resend_unsafe_requests(int mds_num) {
MDSSession& mds = mds_sessions[mds_num];
- MetaRequest* current = mds.unsafe_requests.front();
- MClientRequest *m;
- while (current) {
- current->unsafe_item.remove_myself();
- m = new MClientRequest;
- m->copy_payload(current->request_payload);
- m->decode_payload();
- m->set_retry_attempt(current->retry_attempt);
- m->set_dentry_wanted();
- m->set_replayed_op();
- current->request = m;
- current->got_unsafe = false;
- current->got_safe = false;
- send_request(current, mds_num);
- current = mds.unsafe_requests.front();
- }
+ for (xlist<MetaRequest*>::iterator iter = mds.unsafe_requests.begin();
+ !iter.end();
+ ++iter)
+ send_request(*iter, mds_num);
}
/************