mds_sessions[mds]->state == MetaSession::STATE_OPEN;
}
+MetaSession *Client::_get_mds_session(int mds, Connection *con)
+{
+ if (mds_sessions.count(mds) == 0)
+ return NULL;
+ MetaSession *s = mds_sessions[mds];
+ if (s->con != con)
+ return NULL;
+ return s;
+}
+
MetaSession *Client::_get_or_open_mds_session(int mds)
{
if (mds_sessions.count(mds))
int from = m->get_source().num();
ldout(cct, 10) << "handle_client_session " << *m << " from mds." << from << dendl;
- if (mds_sessions.count(from) == 0) {
+ MetaSession *session = _get_mds_session(from, m->get_connection());
+ if (!session) {
ldout(cct, 10) << " discarding session message from sessionless mds " << m->get_source_inst() << dendl;
m->put();
return;
}
- MetaSession *session = mds_sessions[from];
switch (m->get_op()) {
case CEPH_SESSION_OPEN:
void Client::handle_client_request_forward(MClientRequestForward *fwd)
{
+ int mds = fwd->get_source().num();
+ MetaSession *session = _get_mds_session(mds, fwd->get_connection());
+ if (!session) {
+ fwd->put();
+ return;
+ }
tid_t tid = fwd->get_tid();
if (mds_requests.count(tid) == 0) {
void Client::handle_client_reply(MClientReply *reply)
{
+ int mds_num = reply->get_source().num();
+ MetaSession *session = _get_mds_session(mds_num, reply->get_connection());
+ if (!session) {
+ reply->put();
+ return;
+ }
+
tid_t tid = reply->get_tid();
bool is_safe = reply->is_safe();
}
ldout(cct, 20) << "handle_client_reply got a reply. Safe:" << is_safe
- << " tid " << tid << dendl;
- int mds_num = reply->get_source().num();
+ << " tid " << tid << dendl;
MetaRequest *request = mds_requests[tid];
assert(request);
* leases
*/
-void Client::got_mds_push(int mds)
+void Client::got_mds_push(MetaSession *s)
{
- MetaSession *s = mds_sessions[mds];
-
s->seq++;
- ldout(cct, 10) << " mds." << mds << " seq now " << s->seq << dendl;
+ ldout(cct, 10) << " mds." << s->mds_num << " seq now " << s->seq << dendl;
if (s->state == MetaSession::STATE_CLOSING) {
messenger->send_message(new MClientSession(CEPH_SESSION_REQUEST_CLOSE, s->seq),
s->con);
assert(m->get_action() == CEPH_MDS_LEASE_REVOKE);
int mds = m->get_source().num();
- got_mds_push(mds);
+ MetaSession *session = _get_mds_session(mds, m->get_connection());
+ if (!session) {
+ m->put();
+ return;
+ }
+
+ got_mds_push(session);
ceph_seq_t seq = m->get_seq();
{
ldout(cct, 10) << "handle_snap " << *m << dendl;
int mds = m->get_source().num();
- got_mds_push(mds);
+ MetaSession *session = _get_mds_session(mds, m->get_connection());
+ if (!session) {
+ m->put();
+ return;
+ }
+
+ got_mds_push(session);
list<Inode*> to_move;
SnapRealm *realm = 0;
void Client::handle_caps(MClientCaps *m)
{
int mds = m->get_source().num();
+ MetaSession *session = _get_mds_session(mds, m->get_connection());
+ if (!session) {
+ m->put();
+ return;
+ }
+ got_mds_push(session);
m->clear_payload(); // for if/when we send back to MDS
- got_mds_push(mds);
-
Inode *in = 0;
vinodeno_t vino(m->get_ino(), CEPH_NOSNAP);
if (inode_map.count(vino))
if (!in) {
if (m->get_op() == CEPH_CAP_OP_IMPORT) {
ldout(cct, 5) << "handle_caps don't have vino " << vino << " on IMPORT, immediately releasing" << dendl;
- MetaSession *session = mds_sessions[mds];
if (!session->release)
session->release = new MClientCapRelease;
ceph_mds_cap_item i;
}
switch (m->get_op()) {
- case CEPH_CAP_OP_IMPORT: return handle_cap_import(in, m);
- case CEPH_CAP_OP_EXPORT: return handle_cap_export(in, m);
- case CEPH_CAP_OP_FLUSHSNAP_ACK: return handle_cap_flushsnap_ack(in, m);
+ case CEPH_CAP_OP_IMPORT: return handle_cap_import(session, in, m);
+ case CEPH_CAP_OP_EXPORT: return handle_cap_export(session, in, m);
+ case CEPH_CAP_OP_FLUSHSNAP_ACK: return handle_cap_flushsnap_ack(session, in, m);
}
if (in->caps.count(mds) == 0) {
Cap *cap = in->caps[mds];
switch (m->get_op()) {
- case CEPH_CAP_OP_TRUNC: return handle_cap_trunc(in, m);
+ case CEPH_CAP_OP_TRUNC: return handle_cap_trunc(session, in, m);
case CEPH_CAP_OP_REVOKE:
- case CEPH_CAP_OP_GRANT: return handle_cap_grant(in, mds, cap, m);
- case CEPH_CAP_OP_FLUSH_ACK: return handle_cap_flush_ack(in, mds, cap, m);
+ case CEPH_CAP_OP_GRANT: return handle_cap_grant(session, in, cap, m);
+ case CEPH_CAP_OP_FLUSH_ACK: return handle_cap_flush_ack(session, in, cap, m);
default:
m->put();
}
}
-void Client::handle_cap_import(Inode *in, MClientCaps *m)
+void Client::handle_cap_import(MetaSession *session, Inode *in, MClientCaps *m)
{
- int mds = m->get_source().num();
- MetaSession *session = mds_sessions[mds];
+ int mds = session->mds_num;
// add/update it
update_snap_trace(m->snapbl);
m->put();
}
-void Client::handle_cap_export(Inode *in, MClientCaps *m)
+void Client::handle_cap_export(MetaSession *session, Inode *in, MClientCaps *m)
{
- int mds = m->get_source().num();
+ int mds = session->mds_num;
Cap *cap = NULL;
// note?
m->put();
}
-void Client::handle_cap_trunc(Inode *in, MClientCaps *m)
+void Client::handle_cap_trunc(MetaSession *session, Inode *in, MClientCaps *m)
{
- int mds = m->get_source().num();
+ int mds = session->mds_num;
assert(in->caps[mds]);
ldout(cct, 10) << "handle_cap_trunc on ino " << *in
m->put();
}
-void Client::handle_cap_flush_ack(Inode *in, int mds, Cap *cap, MClientCaps *m)
+void Client::handle_cap_flush_ack(MetaSession *session, Inode *in, Cap *cap, MClientCaps *m)
{
+ int mds = session->mds_num;
int dirty = m->get_dirty();
int cleaned = 0;
for (int i = 0; i < CEPH_CAP_BITS; ++i) {
}
-void Client::handle_cap_flushsnap_ack(Inode *in, MClientCaps *m)
+void Client::handle_cap_flushsnap_ack(MetaSession *session, Inode *in, MClientCaps *m)
{
- int mds = m->get_source().num();
+ int mds = session->mds_num;
assert(in->caps[mds]);
snapid_t follows = m->get_snap_follows();
}
-void Client::handle_cap_grant(Inode *in, int mds, Cap *cap, MClientCaps *m)
+void Client::handle_cap_grant(MetaSession *session, Inode *in, Cap *cap, MClientCaps *m)
{
+ int mds = session->mds_num;
int used = in->caps_used();
const int old_caps = cap->issued;
list<Cond*> waiting_for_mdsmap;
bool have_open_session(int mds);
- void got_mds_push(int mds);
+ void got_mds_push(MetaSession *s);
+ MetaSession *_get_mds_session(int mds, Connection *con); ///< return session for mds *and* con; null otherwise
MetaSession *_get_or_open_mds_session(int mds);
MetaSession *_open_mds_session(int mds);
void _close_mds_session(MetaSession *s);
void handle_snap(class MClientSnap *m);
void handle_caps(class MClientCaps *m);
- void handle_cap_import(Inode *in, class MClientCaps *m);
- void handle_cap_export(Inode *in, class MClientCaps *m);
- void handle_cap_trunc(Inode *in, class MClientCaps *m);
- void handle_cap_flush_ack(Inode *in, int mds, Cap *cap, class MClientCaps *m);
- void handle_cap_flushsnap_ack(Inode *in, class MClientCaps *m);
- void handle_cap_grant(Inode *in, int mds, Cap *cap, class MClientCaps *m);
+ void handle_cap_import(MetaSession *session, Inode *in, class MClientCaps *m);
+ void handle_cap_export(MetaSession *session, Inode *in, class MClientCaps *m);
+ void handle_cap_trunc(MetaSession *session, Inode *in, class MClientCaps *m);
+ void handle_cap_flush_ack(MetaSession *session, Inode *in, Cap *cap, class MClientCaps *m);
+ void handle_cap_flushsnap_ack(MetaSession *session, Inode *in, class MClientCaps *m);
+ void handle_cap_grant(MetaSession *session, Inode *in, Cap *cap, class MClientCaps *m);
void cap_delay_requeue(Inode *in);
void send_cap(Inode *in, MetaSession *session, Cap *cap,
int used, int want, int retain, int flush);