dout(10) << "handle_open_ino " << *m << " err " << err << dendl;
+ auto from = mds_rank_t(m->get_source().num());
inodeno_t ino = m->ino;
MMDSOpenInoReply::ref reply;
CInode *in = get_inode(ino);
return;
reply = MMDSOpenInoReply::create(m->get_tid(), ino, hint, ret);
}
- m->get_connection()->send_message2(reply); /* FIXME, why not send_client? */
+ mds->send_message_mds(reply, from);
}
void MDCache::handle_open_ino_reply(const MMDSOpenInoReply::const_ref &m)
in->make_path(r->path);
dout(10) << " have " << r->path << " " << *in << dendl;
}
- m->get_connection()->send_message2(r);
+ mds->send_message_mds(r, mds_rank_t(m->get_source().num()));
}
break;
case CEPH_SESSION_REQUEST_RENEWCAPS:
- if (session->is_open() ||
- session->is_stale()) {
+ if (session->is_open() || session->is_stale()) {
mds->sessionmap.touch_session(session);
if (session->is_stale()) {
mds->sessionmap.set_state(session, Session::STATE_OPEN);
mds->locker->resume_stale_caps(session);
mds->sessionmap.touch_session(session);
}
- m->get_connection()->send_message2(MClientSession::create(CEPH_SESSION_RENEWCAPS, m->get_seq()));
+ auto reply = MClientSession::create(CEPH_SESSION_RENEWCAPS, m->get_seq());
+ mds->send_message_client(reply, session);
} else {
dout(10) << "ignoring renewcaps on non open|stale session (" << session->get_state_name() << ")" << dendl;
}
auto reply = MClientSession::create(CEPH_SESSION_OPEN);
if (session->info.has_feature(CEPHFS_FEATURE_MIMIC))
reply->supported_features = supported_features;
- session->get_connection()->send_message2(reply);
- if (mdcache->is_readonly())
- session->get_connection()->send_message2(MClientSession::create(CEPH_SESSION_FORCE_RO));
+ mds->send_message_client(reply, session);
+ if (mdcache->is_readonly()) {
+ auto m = MClientSession::create(CEPH_SESSION_FORCE_RO);
+ mds->send_message_client(m, session);
+ }
} else if (session->is_closing() ||
session->is_killing()) {
// kill any lingering capabilities, leases, requests
}
if (deny) {
- m->get_connection()->send_message2(MClientSession::create(CEPH_SESSION_CLOSE));
+ auto m = MClientSession::create(CEPH_SESSION_CLOSE);
+ mds->send_message_client(m, session);
if (session->is_open())
kill_session(session, nullptr);
return;
auto reply = MClientSession::create(CEPH_SESSION_OPEN);
if (session->info.has_feature(CEPHFS_FEATURE_MIMIC))
reply->supported_features = supported_features;
- m->get_connection()->send_message2(reply);
+ mds->send_message_client(reply, session);
session->last_cap_renew = clock::now();
mds->clog->debug() << "reconnect by " << session->info.inst << " after " << delay;
}
reply->set_extra_bl(mdr->reply_extra_bl);
- req->get_connection()->send_message2(reply);
+ mds->send_message_client(reply, mdr->session);
mdr->did_early_reply = true;
reply->set_extra_bl(mdr->reply_extra_bl);
reply->set_mdsmap_epoch(mds->mdsmap->get_epoch());
- req->get_connection()->send_message2(reply);
+ mds->send_message_client(reply, session);
}
if (req->is_queued_for_replay() &&
encode(created, extra);
reply->set_extra_bl(extra);
}
- req->get_connection()->send_message2(reply);
+ mds->send_message_client(reply, session);
if (req->is_queued_for_replay())
mds->queue_one_replay();