(m->get_type() == CEPH_MSG_CLIENT_REQUEST &&
(static_cast<MClientRequest*>(m))->is_replay()))) {
// replaying!
- } else if (mds->is_clientreplay() && m->get_type() == MSG_MDS_SLAVE_REQUEST &&
- ((static_cast<MMDSSlaveRequest*>(m))->is_reply() ||
- !mds->mdsmap->is_active(m->get_source().num()))) {
- // slave reply or the master is also in the clientreplay stage
+ } else if (m->get_type() == MSG_MDS_SLAVE_REQUEST) {
+ // handle_slave_request() will wait if necessary
} else {
dout(3) << "not active yet, waiting" << dendl;
mds->wait_for_active(new C_MDS_RetryMessage(mds, m));
if (m->is_reply())
return handle_slave_request_reply(m);
+ CDentry *straydn = NULL;
+ if (m->stray.length() > 0) {
+ straydn = mdcache->add_replica_stray(m->stray, from);
+ assert(straydn);
+ m->stray.clear();
+ }
+
// am i a new slave?
MDRequest *mdr = NULL;
if (mdcache->have_request(m->get_reqid())) {
m->put();
return;
}
- mdr = mdcache->request_start_slave(m->get_reqid(), m->get_attempt(), m->get_source().num());
+ mdr = mdcache->request_start_slave(m->get_reqid(), m->get_attempt(), from);
}
assert(mdr->slave_request == 0); // only one at a time, please!
+
+ if (straydn) {
+ mdr->pin(straydn);
+ mdr->straydn = straydn;
+ }
+
+ if (!mds->is_clientreplay() && !mds->is_active() && !mds->is_stopping()) {
+ dout(3) << "not clientreplay|active yet, waiting" << dendl;
+ mds->wait_for_replay(new C_MDS_RetryMessage(mds, m));
+ return;
+ } else if (mds->is_clientreplay() && !mds->mdsmap->is_clientreplay(from) &&
+ mdr->locks.empty()) {
+ dout(3) << "not active yet, waiting" << dendl;
+ mds->wait_for_active(new C_MDS_RetryMessage(mds, m));
+ return;
+ }
+
mdr->slave_request = m;
dispatch_slave_request(mdr);
{
int from = m->get_source().num();
+ if (!mds->is_clientreplay() && !mds->is_active() && !mds->is_stopping()) {
+ dout(3) << "not clientreplay|active yet, waiting" << dendl;
+ mds->wait_for_replay(new C_MDS_RetryMessage(mds, m));
+ return;
+ }
+
if (m->get_op() == MMDSSlaveRequest::OP_COMMITTED) {
metareqid_t r = m->get_reqid();
mds->mdcache->committed_master_slave(r, from);
dout(10) << " dn " << *dn << dendl;
mdr->pin(dn);
- assert(mdr->slave_request->stray.length() > 0);
- CDentry *straydn = mdcache->add_replica_stray(mdr->slave_request->stray, mdr->slave_to_mds);
- assert(straydn);
- mdr->pin(straydn);
+ assert(mdr->straydn);
+ CDentry *straydn = mdr->straydn;
dout(10) << " straydn " << *straydn << dendl;
mdr->now = mdr->slave_request->now;
// done.
mdr->slave_request->put();
mdr->slave_request = 0;
+ mdr->straydn = 0;
}
void Server::handle_slave_rmdir_prep_ack(MDRequest *mdr, MMDSSlaveRequest *ack)
// stray?
bool linkmerge = (srcdnl->get_inode() == destdnl->get_inode() &&
(srcdnl->is_primary() || destdnl->is_primary()));
- CDentry *straydn = 0;
- if (destdnl->is_primary() && !linkmerge) {
- assert(mdr->slave_request->stray.length() > 0);
- straydn = mdcache->add_replica_stray(mdr->slave_request->stray, mdr->slave_to_mds);
+ CDentry *straydn = mdr->straydn;
+ if (destdnl->is_primary() && !linkmerge)
assert(straydn);
- mdr->pin(straydn);
- }
mdr->now = mdr->slave_request->now;
+ mdr->more()->srcdn_auth_mds = srcdn->authority().first;
// set up commit waiter (early, to clean up any freezing etc we do)
if (!mdr->more()->slave_commit)
// done.
mdr->slave_request->put();
mdr->slave_request = 0;
+ mdr->straydn = 0;
}
void Server::_commit_slave_rename(MDRequest *mdr, int r,