bool OSDService::ObjecterDispatcher::ms_dispatch(Message *m)
{
- osd->objecter->dispatch(m);
+ if (!osd->objecter->ms_dispatch(m))
+ m->put();
return true;
}
AuthAuthorizer **authorizer,
bool force_new)
{
- if (dest_type == CEPH_ENTITY_TYPE_MON)
- return true;
- *authorizer = osd->monc->auth->build_authorizer(dest_type);
- return *authorizer != NULL;
+ return osd->objecter->ms_get_authorizer(dest_type, authorizer, force_new);
}
-void Objecter::dispatch(Message *m)
+bool Objecter::ms_dispatch(Message *m)
{
+ ldout(cct, 10) << __func__ << " " << cct << " " << *m << dendl;
switch (m->get_type()) {
+ // these we exlusively handle
case CEPH_MSG_OSD_OPREPLY:
handle_osd_op_reply(static_cast<MOSDOpReply*>(m));
- break;
-
- case CEPH_MSG_OSD_MAP:
- handle_osd_map(static_cast<MOSDMap*>(m));
- break;
+ return true;
+
+ case MSG_COMMAND_REPLY:
+ handle_command_reply(static_cast<MCommandReply*>(m));
+ return true;
case MSG_GETPOOLSTATSREPLY:
handle_get_pool_stats_reply(static_cast<MGetPoolStatsReply*>(m));
- break;
-
- case CEPH_MSG_STATFS_REPLY:
- handle_fs_stats_reply(static_cast<MStatfsReply*>(m));
- break;
+ return true;
case CEPH_MSG_POOLOP_REPLY:
handle_pool_op_reply(static_cast<MPoolOpReply*>(m));
- break;
+ return true;
- case MSG_COMMAND_REPLY:
- handle_command_reply(static_cast<MCommandReply*>(m));
- break;
+ // these we give others a chance to inspect
+
+ // MDS, OSD
+ case CEPH_MSG_OSD_MAP:
+ m->get();
+ handle_osd_map(static_cast<MOSDMap*>(m));
+ return false;
- default:
- ldout(cct, 0) << "don't know message type " << m->get_type() << dendl;
- assert(0);
+ // Client
+ case CEPH_MSG_STATFS_REPLY:
+ m->get();
+ handle_fs_stats_reply(static_cast<MStatfsReply*>(m));
+ return false;
}
+ return false;
}
void Objecter::_scan_requests(OSDSession *s,
resend_mon_ops();
}
-void Objecter::ms_handle_reset(Connection *con)
+bool Objecter::ms_handle_reset(Connection *con)
{
if (con->get_peer_type() == CEPH_ENTITY_TYPE_OSD) {
//
} else {
ldout(cct, 10) << "ms_handle_reset on unknown osd addr " << con->get_peer_addr() << dendl;
}
+ return true;
}
+ return false;
}
void Objecter::ms_handle_remote_reset(Connection *con)
ms_handle_reset(con);
}
+bool Objecter::ms_get_authorizer(int dest_type,
+ AuthAuthorizer **authorizer,
+ bool force_new)
+{
+ if (dest_type == CEPH_ENTITY_TYPE_MON)
+ return true;
+ *authorizer = monc->auth->build_authorizer(dest_type);
+ return *authorizer != NULL;
+}
+
void Objecter::op_target_t::dump(Formatter *f) const
{
// ----------------
-class Objecter : public md_config_obs_t {
+class Objecter : public md_config_obs_t, public Dispatcher {
public:
// config observer bits
virtual const char** get_tracked_conf_keys() const;
Objecter(CephContext *cct_, Messenger *m, MonClient *mc,
OSDMap *om, double mon_timeout,
double osd_timeout) :
+ Dispatcher(cct),
messenger(m), monc(mc), osdmap(om), cct(cct_),
initialized(false),
last_tid(0), client_inc(-1), max_linger_id(0),
// messages
public:
- void dispatch(Message *m);
+ bool ms_dispatch(Message *m);
void handle_osd_op_reply(class MOSDOpReply *m);
void handle_osd_map(class MOSDMap *m);
void wait_for_osd_map();
}
void ms_handle_connect(Connection *con);
- void ms_handle_reset(Connection *con);
+ bool ms_handle_reset(Connection *con);
void ms_handle_remote_reset(Connection *con);
+ bool ms_get_authorizer(int dest_type,
+ AuthAuthorizer **authorizer,
+ bool force_new);
+
void blacklist_self(bool set);
};