// active?
if (!mds->is_active() &&
!(mds->is_stopping() && m->get_source().is_mds())) {
- if ((mds->is_reconnect() || mds->get_want_state() == CEPH_MDS_STATE_RECONNECT) &&
- m->get_type() == CEPH_MSG_CLIENT_REQUEST &&
- (static_cast<MClientRequest*>(m))->is_replay()) {
- dout(3) << "queuing replayed op" << dendl;
- mds->enqueue_replay(new C_MDS_RetryMessage(mds, m));
- return;
- } else if (mds->is_clientreplay() &&
- // session open requests need to be handled during replay,
- // close requests need to be delayed
- ((m->get_type() == CEPH_MSG_CLIENT_SESSION &&
- (static_cast<MClientSession*>(m))->get_op() != CEPH_SESSION_REQUEST_CLOSE) ||
- (m->get_type() == CEPH_MSG_CLIENT_REQUEST &&
- (static_cast<MClientRequest*>(m))->is_replay()))) {
- // replaying!
- } else if (m->get_type() == MSG_MDS_SLAVE_REQUEST) {
+ if (m->get_type() == CEPH_MSG_CLIENT_REQUEST &&
+ (mds->is_reconnect() || mds->get_want_state() == CEPH_MDS_STATE_RECONNECT)) {
+ MClientRequest *req = static_cast<MClientRequest*>(m);
+ bool queue_replay = false;
+ if (req->is_replay()) {
+ dout(3) << "queuing replayed op" << dendl;
+ queue_replay = true;
+ } else if (req->get_retry_attempt()) {
+ // process completed request in clientreplay stage. The completed request
+ // might have created new file/directorie. This guarantees MDS sends a reply
+ // to client before other request modifies the new file/directorie.
+ Session *session = get_session(req);
+ if (session && session->have_completed_request(req->get_reqid().tid, NULL)) {
+ dout(3) << "queuing completed op" << dendl;
+ queue_replay = true;
+ }
+ }
+ if (queue_replay) {
+ mds->enqueue_replay(new C_MDS_RetryMessage(mds, m));
+ return;
+ }
+ }
+
+ bool wait_for_active = true;
+ if (m->get_type() == MSG_MDS_SLAVE_REQUEST) {
// handle_slave_request() will wait if necessary
- } else {
+ wait_for_active = false;
+ } else if (mds->is_clientreplay()) {
+ // session open requests need to be handled during replay,
+ // close requests need to be delayed
+ if ((m->get_type() == CEPH_MSG_CLIENT_SESSION &&
+ (static_cast<MClientSession*>(m))->get_op() != CEPH_SESSION_REQUEST_CLOSE)) {
+ wait_for_active = false;
+ } else if (m->get_type() == CEPH_MSG_CLIENT_REQUEST) {
+ MClientRequest *req = static_cast<MClientRequest*>(m);
+ if (req->is_replay()) {
+ wait_for_active = false;
+ } else {
+ Session *session = get_session(req);
+ if (session && session->have_completed_request(req->get_reqid().tid, NULL))
+ wait_for_active = false;
+ }
+ }
+ }
+ if (wait_for_active) {
dout(3) << "not active yet, waiting" << dendl;
mds->wait_for_active(new C_MDS_RetryMessage(mds, m));
return;