From d12d92d6bf65be72a21ca98d04a143fe72c3e9f3 Mon Sep 17 00:00:00 2001 From: "Yan, Zheng" Date: Tue, 3 Feb 2015 15:31:11 +0800 Subject: [PATCH] mds: process completed requests in clientreplay stage Completed requests may have created new file/directorie. This guarantees completed requests are processed before any other client gets change to modify the new files/directorie. Signed-off-by: Yan, Zheng --- src/mds/Server.cc | 60 ++++++++++++++++++++++++++++++++++------------- 1 file changed, 44 insertions(+), 16 deletions(-) diff --git a/src/mds/Server.cc b/src/mds/Server.cc index e31da938a0e54..2dfe172770d4d 100644 --- a/src/mds/Server.cc +++ b/src/mds/Server.cc @@ -114,23 +114,51 @@ void Server::dispatch(Message *m) // active? if (!mds->is_active() && !(mds->is_stopping() && m->get_source().is_mds())) { - if ((mds->is_reconnect() || mds->get_want_state() == CEPH_MDS_STATE_RECONNECT) && - m->get_type() == CEPH_MSG_CLIENT_REQUEST && - (static_cast(m))->is_replay()) { - dout(3) << "queuing replayed op" << dendl; - mds->enqueue_replay(new C_MDS_RetryMessage(mds, m)); - return; - } else if (mds->is_clientreplay() && - // session open requests need to be handled during replay, - // close requests need to be delayed - ((m->get_type() == CEPH_MSG_CLIENT_SESSION && - (static_cast(m))->get_op() != CEPH_SESSION_REQUEST_CLOSE) || - (m->get_type() == CEPH_MSG_CLIENT_REQUEST && - (static_cast(m))->is_replay()))) { - // replaying! - } else if (m->get_type() == MSG_MDS_SLAVE_REQUEST) { + if (m->get_type() == CEPH_MSG_CLIENT_REQUEST && + (mds->is_reconnect() || mds->get_want_state() == CEPH_MDS_STATE_RECONNECT)) { + MClientRequest *req = static_cast(m); + bool queue_replay = false; + if (req->is_replay()) { + dout(3) << "queuing replayed op" << dendl; + queue_replay = true; + } else if (req->get_retry_attempt()) { + // process completed request in clientreplay stage. The completed request + // might have created new file/directorie. This guarantees MDS sends a reply + // to client before other request modifies the new file/directorie. + Session *session = get_session(req); + if (session && session->have_completed_request(req->get_reqid().tid, NULL)) { + dout(3) << "queuing completed op" << dendl; + queue_replay = true; + } + } + if (queue_replay) { + mds->enqueue_replay(new C_MDS_RetryMessage(mds, m)); + return; + } + } + + bool wait_for_active = true; + if (m->get_type() == MSG_MDS_SLAVE_REQUEST) { // handle_slave_request() will wait if necessary - } else { + wait_for_active = false; + } else if (mds->is_clientreplay()) { + // session open requests need to be handled during replay, + // close requests need to be delayed + if ((m->get_type() == CEPH_MSG_CLIENT_SESSION && + (static_cast(m))->get_op() != CEPH_SESSION_REQUEST_CLOSE)) { + wait_for_active = false; + } else if (m->get_type() == CEPH_MSG_CLIENT_REQUEST) { + MClientRequest *req = static_cast(m); + if (req->is_replay()) { + wait_for_active = false; + } else { + Session *session = get_session(req); + if (session && session->have_completed_request(req->get_reqid().tid, NULL)) + wait_for_active = false; + } + } + } + if (wait_for_active) { dout(3) << "not active yet, waiting" << dendl; mds->wait_for_active(new C_MDS_RetryMessage(mds, m)); return; -- 2.39.5