req->releases.clear();
}
if (queue_replay) {
+ req->mark_queued_for_replay();
mds->enqueue_replay(new C_MDS_RetryMessage(mds, m));
return;
}
wait_for_active = false;
} else if (m->get_type() == CEPH_MSG_CLIENT_REQUEST) {
MClientRequest *req = static_cast<MClientRequest*>(m);
- if (req->is_replay()) {
+ if (req->is_queued_for_replay()) {
wait_for_active = false;
- } else {
- Session *session = get_session(req);
- if (session && session->have_completed_request(req->get_reqid().tid, NULL))
- wait_for_active = false;
}
}
}
mdr->committing = true;
submit_mdlog_entry(le, fin, mdr, __func__);
- if (mdr->client_request && mdr->client_request->is_replay()) {
+ if (mdr->client_request && mdr->client_request->is_queued_for_replay()) {
if (mds->queue_one_replay()) {
dout(10) << " queued next replay op" << dendl;
} else {
req->get_connection()->send_message(reply);
}
- const bool completed = mdr->has_completed;
+ if (req->is_queued_for_replay() &&
+ (mdr->has_completed || reply->get_result() < 0)) {
+ if (reply->get_result() < 0) {
+ int r = reply->get_result();
+ derr << "reply_client_request: failed to replay " << *req
+ << " error " << r << " (" << cpp_strerror(r) << ")" << dendl;
+ mds->clog->warn() << "failed to replay " << req->get_reqid() << " error " << r;
+ }
+ mds->queue_one_replay();
+ }
// clean up request
mdcache->request_finish(mdr);
tracedn->get_projected_linkage()->is_remote()) {
mdcache->eval_remote(tracedn);
}
-
- // Advance clientreplay process if we're in it
- if (completed && mds->is_clientreplay()) {
- mds->queue_one_replay();
- }
}
session = get_session(req);
if (!session) {
dout(5) << "no session for " << req->get_source() << ", dropping" << dendl;
- req->put();
- return;
- }
- if (session->is_closed() ||
- session->is_closing() ||
- session->is_killing()) {
+ } else if (session->is_closed() ||
+ session->is_closing() ||
+ session->is_killing()) {
dout(5) << "session closed|closing|killing, dropping" << dendl;
+ session = NULL;
+ }
+ if (!session) {
+ if (req->is_queued_for_replay())
+ mds->queue_one_replay();
req->put();
return;
}
}
req->get_connection()->send_message(reply);
- if (mds->is_clientreplay())
+ if (req->is_queued_for_replay())
mds->queue_one_replay();
req->put();
filepath path, path2;
vector<uint64_t> gid_list;
-
+ bool queued_for_replay = false;
public:
// cons
int get_dentry_wanted() { return get_flags() & CEPH_MDS_FLAG_WANT_DENTRY; }
+ void mark_queued_for_replay() { queued_for_replay = true; }
+ bool is_queued_for_replay() { return queued_for_replay; }
+
void decode_payload() override {
bufferlist::iterator p = payload.begin();
out << " RETRY=" << (int)head.num_retry;
if (get_flags() & CEPH_MDS_FLAG_REPLAY)
out << " REPLAY";
+ if (queued_for_replay)
+ out << " QUEUED_FOR_REPLAY";
out << " caller_uid=" << head.caller_uid
<< ", caller_gid=" << head.caller_gid
<< '{';