}
reset_beacon_killer();
+ } else {
+ dout(10) << "handle_mds_beacon " << ceph_mds_state_name(m->get_state())
+ << " seq " << m->get_seq() << " dne" << dendl;
}
- delete m;
+ m->put();
}
void MDS::reset_beacon_killer()
cap->get_cap_id(), cap->get_last_seq(),
cap->pending(), cap->wanted(), 0,
cap->get_mseq());
- mds->send_message_client(m, it->first);
+ mds->send_message_client_counted(m, it->first);
}
- in->clear_client_caps();
+ in->clear_client_caps_after_export();
}
void Migrator::finish_export_inode(CInode *in, utime_t now, list<Context*>& finished)
delay -= reconnect_start;
dout(10) << " reconnect_start " << reconnect_start << " delay " << delay << dendl;
- if (!mds->is_reconnect() || session->is_closed()) {
- if (!mds->is_reconnect()) {
- // XXX maybe in the future we can do better than this?
- dout(1) << " no longer in reconnect state, ignoring reconnect, sending close" << dendl;
- ss << "denied reconnect attempt (mds is " << ceph_mds_state_name(mds->get_state())
- << ") from " << m->get_source_inst();
- } else if (!session) {
- dout(1) << " no session for " << m->get_source() << ", ignoring reconnect, sending close" << dendl;
- ss << "denied reconnect attempt from " << m->get_source_inst() << " (no session)";
- } else if (session->is_closed()) {
- dout(1) << " no session for " << m->get_source() << ", ignoring reconnect, sending close" << dendl;
- ss << "denied reconnect attempt from " << m->get_source_inst() << " (session closed)";
- } else
- assert(0);
- ss << " after " << delay << " (allowed interval " << g_conf.mds_reconnect_timeout << ")";
+ if (!mds->is_reconnect()) {
+ // XXX maybe in the future we can do better than this?
+ dout(1) << " no longer in reconnect state, ignoring reconnect, sending close" << dendl;
+ ss << "denied reconnect attempt (mds is " << ceph_mds_state_name(mds->get_state())
+ << ") from " << m->get_source_inst()
+ << " after " << delay << " (allowed interval " << g_conf.mds_reconnect_timeout << ")";
mds->logclient.log(LOG_INFO, ss);
- mds->messenger->send_message(new MClientSession(CEPH_SESSION_CLOSE), m->get_source_inst());
- delete m;
+ mds->messenger->send_message(new MClientSession(CEPH_SESSION_CLOSE), m->get_connection());
+ m->put();
return;
}
// no need to respond to client: they're telling us they have no session
} else {
// notify client of success with an OPEN
- mds->messenger->send_message(new MClientSession(CEPH_SESSION_OPEN), m->get_source_inst());
+ mds->messenger->send_message(new MClientSession(CEPH_SESSION_OPEN), m->get_connection());
- if (session->is_new()) {
- dout(10) << " session is new, will make best effort to reconnect "
+ if (session->is_closed()) {
+ dout(10) << " session is closed, will make best effort to reconnect "
<< m->get_source_inst() << dendl;
mds->sessionmap.set_state(session, Session::STATE_OPENING);
version_t pv = ++mds->sessionmap.projected;
session = get_session(req);
if (!session) {
dout(5) << "no session for " << req->get_source() << ", dropping" << dendl;
- delete req;
+ req->put();
return;
}
- if (session->is_closed() || session->is_closing() || session->is_stale_purging() ||
- session->is_stale_closing()) {
- dout(5) << "session closed|closing|stale_purging|stale_closing, dropping" << dendl;
+ if (session->is_closed() ||
+ session->is_closing() ||
+ session->is_killing()) {
+ dout(5) << "session closed|closing|killing, dropping" << dendl;
- delete req;
+ req->put();
return;
}
}
header.data_off = 0;
memset(&footer, 0, sizeof(footer));
}
+ protected:
virtual ~Message() {
- assert(nref.test() == 0);
+ assert(nref.read() == 0);
if (connection)
connection->put();
}
void queue_received(Message *m) {
m->set_recv_stamp(g_clock.now());
- assert(m->nref.read() == 0);
- assert(m->nref.test() == 1); //this is just to make sure that a changeset
- //is working properly; if you start using the refcounting more and have multiple
- //people hanging on to a message, ditch the assert!
++
++ // this is just to make sure that a changeset is working
++ // properly; if you start using the refcounting more and have
++ // multiple people hanging on to a message, ditch the assert!
++ assert(m->nref.read() == 1);
+
queue_received(m, m->get_priority());
}
// -- don't need lock --
case CEPH_MSG_PING:
- ALLOW_MESSAGES_FROM(CEPH_ENTITY_TYPE_ANY);
dout(10) << "ping from " << m->get_source() << dendl;
- delete m;
+ m->put();
break;
// -- don't need OSDMap --
// osd
case CEPH_MSG_SHUTDOWN:
- ALLOW_MESSAGES_FROM(CEPH_ENTITY_TYPE_MON);
- shutdown();
+ session = (Session *)m->get_connection()->get_priv();
+ if (!session ||
+ session->caps.is_mon() ||
+ session->caps.is_osd()) shutdown();
+ else dout(0) << "shutdown message from connection with insufficient privs!"
+ << m->get_connection() << dendl;
- delete m;
- if (session) session->put();
+ m->put();
++ if (session)
++ session->put();
break;
case MSG_PGSTATSACK:
void OSD::handle_scrub(MOSDScrub *m)
{
dout(10) << "handle_scrub " << *m << dendl;
-
+ if (!require_mon_peer(m))
+ return;
if (ceph_fsid_compare(&m->fsid, &monc->get_fsid())) {
dout(0) << "handle_scrub fsid " << m->fsid << " != " << monc->get_fsid() << dendl;
- delete m;
+ m->put();
return;
}
return;
}
- delete m;
+ Session *session = (Session *)m->get_connection()->get_priv();
+ if (session && !(session->caps.is_mon() || session->caps.is_osd())) {
+ //not enough perms!
++ m->put();
+ session->put();
+ return;
+ }
+ if(session) session->put();
+
if (osdmap) {
dout(3) << "handle_osd_map epochs ["
<< m->get_first() << "," << m->get_last()
- delete m;
+bool OSD::require_mon_peer(Message *m)
+{
+ if (!m->get_connection()->peer_is_mon()) {
+ dout(0) << "require_mon_peer received from non-mon " << m->get_connection()->get_peer_addr()
+ << " " << *m << dendl;
++ m->put();
+ return false;
+ }
+ return true;
+}
- delete m;
+bool OSD::require_osd_peer(Message *m)
+{
+ if (!m->get_connection()->peer_is_osd()) {
+ dout(0) << "require_osd_peer received from non-osd " << m->get_connection()->get_peer_addr()
+ << " " << *m << dendl;
++ m->put();
+ return false;
+ }
+ return true;
+}
bool OSD::require_current_map(Message *m, epoch_t ep)
{
do_infos(info_map);
kick_pg_split_queue();
- if (to_peer.size())
+ if (num_created)
update_heartbeat_peers();
- delete m;
+ m->put();
}