From: Sage Weil Date: Wed, 13 Mar 2013 23:06:02 +0000 (-0700) Subject: client: validate/lookup mds session in each message handler X-Git-Tag: v0.60~81^2~4 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=bc2a2eb9105f37d922892805056f8a208105fa08;p=ceph.git client: validate/lookup mds session in each message handler For every message handler, look up the MetaSession by int mds and verify that the Connection* matches properly. If so, proceed; otherwise, discard the message. In the future, we probably want to link the MetaSession to the Connection's priv field, but that can come later. Clean up a bunch of submethods that take int mds while we're here. Signed-off-by: Sage Weil --- diff --git a/src/client/Client.cc b/src/client/Client.cc index e39615226e35..b3c934f064ee 100644 --- a/src/client/Client.cc +++ b/src/client/Client.cc @@ -1461,6 +1461,16 @@ bool Client::have_open_session(int mds) mds_sessions[mds]->state == MetaSession::STATE_OPEN; } +MetaSession *Client::_get_mds_session(int mds, Connection *con) +{ + if (mds_sessions.count(mds) == 0) + return NULL; + MetaSession *s = mds_sessions[mds]; + if (s->con != con) + return NULL; + return s; +} + MetaSession *Client::_get_or_open_mds_session(int mds) { if (mds_sessions.count(mds)) @@ -1508,12 +1518,12 @@ void Client::handle_client_session(MClientSession *m) int from = m->get_source().num(); ldout(cct, 10) << "handle_client_session " << *m << " from mds." << from << dendl; - if (mds_sessions.count(from) == 0) { + MetaSession *session = _get_mds_session(from, m->get_connection()); + if (!session) { ldout(cct, 10) << " discarding session message from sessionless mds " << m->get_source_inst() << dendl; m->put(); return; } - MetaSession *session = mds_sessions[from]; switch (m->get_op()) { case CEPH_SESSION_OPEN: @@ -1622,6 +1632,12 @@ MClientRequest* Client::build_client_request(MetaRequest *request) void Client::handle_client_request_forward(MClientRequestForward *fwd) { + int mds = fwd->get_source().num(); + MetaSession *session = _get_mds_session(mds, fwd->get_connection()); + if (!session) { + fwd->put(); + return; + } tid_t tid = fwd->get_tid(); if (mds_requests.count(tid) == 0) { @@ -1654,6 +1670,13 @@ void Client::handle_client_request_forward(MClientRequestForward *fwd) void Client::handle_client_reply(MClientReply *reply) { + int mds_num = reply->get_source().num(); + MetaSession *session = _get_mds_session(mds_num, reply->get_connection()); + if (!session) { + reply->put(); + return; + } + tid_t tid = reply->get_tid(); bool is_safe = reply->is_safe(); @@ -1665,8 +1688,7 @@ void Client::handle_client_reply(MClientReply *reply) } ldout(cct, 20) << "handle_client_reply got a reply. Safe:" << is_safe - << " tid " << tid << dendl; - int mds_num = reply->get_source().num(); + << " tid " << tid << dendl; MetaRequest *request = mds_requests[tid]; assert(request); @@ -1964,12 +1986,10 @@ void Client::resend_unsafe_requests(MetaSession *session) * leases */ -void Client::got_mds_push(int mds) +void Client::got_mds_push(MetaSession *s) { - MetaSession *s = mds_sessions[mds]; - s->seq++; - ldout(cct, 10) << " mds." << mds << " seq now " << s->seq << dendl; + ldout(cct, 10) << " mds." << s->mds_num << " seq now " << s->seq << dendl; if (s->state == MetaSession::STATE_CLOSING) { messenger->send_message(new MClientSession(CEPH_SESSION_REQUEST_CLOSE, s->seq), s->con); @@ -1983,7 +2003,13 @@ void Client::handle_lease(MClientLease *m) assert(m->get_action() == CEPH_MDS_LEASE_REVOKE); int mds = m->get_source().num(); - got_mds_push(mds); + MetaSession *session = _get_mds_session(mds, m->get_connection()); + if (!session) { + m->put(); + return; + } + + got_mds_push(session); ceph_seq_t seq = m->get_seq(); @@ -3167,7 +3193,13 @@ void Client::handle_snap(MClientSnap *m) { ldout(cct, 10) << "handle_snap " << *m << dendl; int mds = m->get_source().num(); - got_mds_push(mds); + MetaSession *session = _get_mds_session(mds, m->get_connection()); + if (!session) { + m->put(); + return; + } + + got_mds_push(session); list to_move; SnapRealm *realm = 0; @@ -3237,11 +3269,15 @@ void Client::handle_snap(MClientSnap *m) void Client::handle_caps(MClientCaps *m) { int mds = m->get_source().num(); + MetaSession *session = _get_mds_session(mds, m->get_connection()); + if (!session) { + m->put(); + return; + } + got_mds_push(session); m->clear_payload(); // for if/when we send back to MDS - got_mds_push(mds); - Inode *in = 0; vinodeno_t vino(m->get_ino(), CEPH_NOSNAP); if (inode_map.count(vino)) @@ -3249,7 +3285,6 @@ void Client::handle_caps(MClientCaps *m) if (!in) { if (m->get_op() == CEPH_CAP_OP_IMPORT) { ldout(cct, 5) << "handle_caps don't have vino " << vino << " on IMPORT, immediately releasing" << dendl; - MetaSession *session = mds_sessions[mds]; if (!session->release) session->release = new MClientCapRelease; ceph_mds_cap_item i; @@ -3269,9 +3304,9 @@ void Client::handle_caps(MClientCaps *m) } switch (m->get_op()) { - case CEPH_CAP_OP_IMPORT: return handle_cap_import(in, m); - case CEPH_CAP_OP_EXPORT: return handle_cap_export(in, m); - case CEPH_CAP_OP_FLUSHSNAP_ACK: return handle_cap_flushsnap_ack(in, m); + case CEPH_CAP_OP_IMPORT: return handle_cap_import(session, in, m); + case CEPH_CAP_OP_EXPORT: return handle_cap_export(session, in, m); + case CEPH_CAP_OP_FLUSHSNAP_ACK: return handle_cap_flushsnap_ack(session, in, m); } if (in->caps.count(mds) == 0) { @@ -3283,19 +3318,18 @@ void Client::handle_caps(MClientCaps *m) Cap *cap = in->caps[mds]; switch (m->get_op()) { - case CEPH_CAP_OP_TRUNC: return handle_cap_trunc(in, m); + case CEPH_CAP_OP_TRUNC: return handle_cap_trunc(session, in, m); case CEPH_CAP_OP_REVOKE: - case CEPH_CAP_OP_GRANT: return handle_cap_grant(in, mds, cap, m); - case CEPH_CAP_OP_FLUSH_ACK: return handle_cap_flush_ack(in, mds, cap, m); + case CEPH_CAP_OP_GRANT: return handle_cap_grant(session, in, cap, m); + case CEPH_CAP_OP_FLUSH_ACK: return handle_cap_flush_ack(session, in, cap, m); default: m->put(); } } -void Client::handle_cap_import(Inode *in, MClientCaps *m) +void Client::handle_cap_import(MetaSession *session, Inode *in, MClientCaps *m) { - int mds = m->get_source().num(); - MetaSession *session = mds_sessions[mds]; + int mds = session->mds_num; // add/update it update_snap_trace(m->snapbl); @@ -3328,9 +3362,9 @@ void Client::handle_cap_import(Inode *in, MClientCaps *m) m->put(); } -void Client::handle_cap_export(Inode *in, MClientCaps *m) +void Client::handle_cap_export(MetaSession *session, Inode *in, MClientCaps *m) { - int mds = m->get_source().num(); + int mds = session->mds_num; Cap *cap = NULL; // note? @@ -3373,9 +3407,9 @@ void Client::handle_cap_export(Inode *in, MClientCaps *m) m->put(); } -void Client::handle_cap_trunc(Inode *in, MClientCaps *m) +void Client::handle_cap_trunc(MetaSession *session, Inode *in, MClientCaps *m) { - int mds = m->get_source().num(); + int mds = session->mds_num; assert(in->caps[mds]); ldout(cct, 10) << "handle_cap_trunc on ino " << *in @@ -3391,8 +3425,9 @@ void Client::handle_cap_trunc(Inode *in, MClientCaps *m) m->put(); } -void Client::handle_cap_flush_ack(Inode *in, int mds, Cap *cap, MClientCaps *m) +void Client::handle_cap_flush_ack(MetaSession *session, Inode *in, Cap *cap, MClientCaps *m) { + int mds = session->mds_num; int dirty = m->get_dirty(); int cleaned = 0; for (int i = 0; i < CEPH_CAP_BITS; ++i) { @@ -3428,9 +3463,9 @@ void Client::handle_cap_flush_ack(Inode *in, int mds, Cap *cap, MClientCaps *m) } -void Client::handle_cap_flushsnap_ack(Inode *in, MClientCaps *m) +void Client::handle_cap_flushsnap_ack(MetaSession *session, Inode *in, MClientCaps *m) { - int mds = m->get_source().num(); + int mds = session->mds_num; assert(in->caps[mds]); snapid_t follows = m->get_snap_follows(); @@ -3456,8 +3491,9 @@ void Client::handle_cap_flushsnap_ack(Inode *in, MClientCaps *m) } -void Client::handle_cap_grant(Inode *in, int mds, Cap *cap, MClientCaps *m) +void Client::handle_cap_grant(MetaSession *session, Inode *in, Cap *cap, MClientCaps *m) { + int mds = session->mds_num; int used = in->caps_used(); const int old_caps = cap->issued; diff --git a/src/client/Client.h b/src/client/Client.h index ab63ebb23b5d..1d257073a7ea 100644 --- a/src/client/Client.h +++ b/src/client/Client.h @@ -232,7 +232,8 @@ public: list waiting_for_mdsmap; bool have_open_session(int mds); - void got_mds_push(int mds); + void got_mds_push(MetaSession *s); + MetaSession *_get_mds_session(int mds, Connection *con); ///< return session for mds *and* con; null otherwise MetaSession *_get_or_open_mds_session(int mds); MetaSession *_open_mds_session(int mds); void _close_mds_session(MetaSession *s); @@ -432,12 +433,12 @@ protected: void handle_snap(class MClientSnap *m); void handle_caps(class MClientCaps *m); - void handle_cap_import(Inode *in, class MClientCaps *m); - void handle_cap_export(Inode *in, class MClientCaps *m); - void handle_cap_trunc(Inode *in, class MClientCaps *m); - void handle_cap_flush_ack(Inode *in, int mds, Cap *cap, class MClientCaps *m); - void handle_cap_flushsnap_ack(Inode *in, class MClientCaps *m); - void handle_cap_grant(Inode *in, int mds, Cap *cap, class MClientCaps *m); + void handle_cap_import(MetaSession *session, Inode *in, class MClientCaps *m); + void handle_cap_export(MetaSession *session, Inode *in, class MClientCaps *m); + void handle_cap_trunc(MetaSession *session, Inode *in, class MClientCaps *m); + void handle_cap_flush_ack(MetaSession *session, Inode *in, Cap *cap, class MClientCaps *m); + void handle_cap_flushsnap_ack(MetaSession *session, Inode *in, class MClientCaps *m); + void handle_cap_grant(MetaSession *session, Inode *in, Cap *cap, class MClientCaps *m); void cap_delay_requeue(Inode *in); void send_cap(Inode *in, MetaSession *session, Cap *cap, int used, int want, int retain, int flush);