*
* @return 0 on success, or -errno on failure.
*/
- virtual int send_message(Message *m, const entity_inst_t& dest) = 0;
+ virtual int send_message(Message *m, const entity_inst_t& dest) {
+ return send_to(m, dest.name.type(), entity_addrvec_t(dest.addr));
+ }
virtual int send_to(
Message *m,
int type,
- const entity_addrvec_t& addr) {
- // temporary
- return send_message(m, entity_inst_t(entity_name_t(type, -1),
- addr.legacy_addr()));
- }
+ const entity_addrvec_t& addr) = 0;
int send_to_mon(
Message *m, const entity_addrvec_t& addrs) {
return send_to(m, CEPH_ENTITY_TYPE_MON, addrs);
return local_connection;
}
-int AsyncMessenger::_send_message(Message *m, const entity_inst_t& dest)
+int AsyncMessenger::_send_to(Message *m, int type, const entity_addrvec_t& addrs)
{
FUNCTRACE(cct);
assert(m);
else if (m->get_type() == CEPH_MSG_OSD_OPREPLY)
OID_EVENT_TRACE(((MOSDOpReply *)m)->get_oid().name.c_str(), "SEND_MSG_OSD_OP_REPLY");
- ldout(cct, 1) << __func__ << "--> " << dest.name << " "
- << dest.addr << " -- " << *m << " -- ?+"
+ ldout(cct, 1) << __func__ << "--> " << ceph_entity_type_name(type) << " "
+ << addrs << " -- " << *m << " -- ?+"
<< m->get_data().length() << " " << m << dendl;
- if (dest.addr == entity_addr_t()) {
+ if (addrs.empty()) {
ldout(cct,0) << __func__ << " message " << *m
- << " with empty dest " << dest.addr << dendl;
+ << " with empty dest " << addrs << dendl;
m->put();
return -EINVAL;
}
- AsyncConnectionRef conn = _lookup_conn(entity_addrvec_t(dest.addr));
- submit_message(m, conn, dest.addr, dest.name.type());
+ AsyncConnectionRef conn = _lookup_conn(addrs);
+ submit_message(m, conn, addrs, type);
return 0;
}
void AsyncMessenger::submit_message(Message *m, AsyncConnectionRef con,
- const entity_addr_t& dest_addr, int dest_type)
+ const entity_addrvec_t& dest_addrs,
+ int dest_type)
{
if (cct->_conf->ms_dump_on_send) {
m->encode(-1, MSG_CRC_ALL);
}
// local?
- if (my_addrs->legacy_addr() == dest_addr) {
+ if (*my_addrs == dest_addrs) {
// local
local_connection->send_message(m);
return ;
// remote, no existing connection.
const Policy& policy = get_policy(dest_type);
if (policy.server) {
- ldout(cct, 20) << __func__ << " " << *m << " remote, " << dest_addr
+ ldout(cct, 20) << __func__ << " " << *m << " remote, " << dest_addrs
<< ", lossy server for target type "
<< ceph_entity_type_name(dest_type) << ", no session, dropping." << dendl;
m->put();
} else {
- ldout(cct,20) << __func__ << " " << *m << " remote, " << dest_addr << ", new connection." << dendl;
- con = create_connect(entity_addrvec_t(dest_addr), dest_type);
+ ldout(cct,20) << __func__ << " " << *m << " remote, " << dest_addrs
+ << ", new connection." << dendl;
+ con = create_connect(dest_addrs, dest_type);
con->send_message(m);
}
}
* @defgroup Messaging
* @{
*/
- int send_message(Message *m, const entity_inst_t& dest) override {
+ int send_to(Message *m, int type, const entity_addrvec_t& addrs) override {
Mutex::Locker l(lock);
- return _send_message(m, dest);
+ return _send_to(m, type, addrs);
}
/** @} // Messaging */
* just drop silently under failure.
*/
void submit_message(Message *m, AsyncConnectionRef con,
- const entity_addr_t& dest_addr, int dest_type);
+ const entity_addrvec_t& dest_addrs, int dest_type);
- int _send_message(Message *m, const entity_inst_t& dest);
+ int _send_to(Message *m, int type, const entity_addrvec_t& addrs);
void _finish_bind(const entity_addrvec_t& bind_addrs,
const entity_addrvec_t& listen_addrs);