int AsyncMessenger::send_to(Message *m, int type, const entity_addrvec_t& addrs)
{
- std::lock_guard l{lock};
-
FUNCTRACE(cct);
ceph_assert(m);
return -EINVAL;
}
- auto av = _filter_addrs(addrs);
- const AsyncConnectionRef& conn = _lookup_conn(av);
- submit_message(m, conn, av, type);
+ if (cct->_conf->ms_dump_on_send) {
+ m->encode(-1, MSG_CRC_ALL);
+ ldout(cct, 0) << __func__ << " submit_message " << *m << "\n";
+ m->get_payload().hexdump(*_dout);
+ if (m->get_data().length() > 0) {
+ *_dout << " data:\n";
+ m->get_data().hexdump(*_dout);
+ }
+ *_dout << dendl;
+ m->clear_payload();
+ }
+
+ connect_to(type, addrs, false)->send_message(m);
return 0;
}
return conn;
}
-void AsyncMessenger::submit_message(Message *m, const AsyncConnectionRef& con,
- const entity_addrvec_t& dest_addrs,
- int dest_type)
-{
- if (cct->_conf->ms_dump_on_send) {
- m->encode(-1, MSG_CRC_ALL);
- ldout(cct, 0) << __func__ << " submit_message " << *m << "\n";
- m->get_payload().hexdump(*_dout);
- if (m->get_data().length() > 0) {
- *_dout << " data:\n";
- m->get_data().hexdump(*_dout);
- }
- *_dout << dendl;
- m->clear_payload();
- }
-
- // existing connection?
- if (con) {
- con->send_message(m);
- return ;
- }
-
- // local?
- if (*my_addrs == dest_addrs ||
- (dest_addrs.v.size() == 1 &&
- my_addrs->contains(dest_addrs.front()))) {
- // local
- local_connection->send_message(m);
- return ;
- }
-
- // remote, no existing connection.
- const Policy& policy = get_policy(dest_type);
- if (policy.server) {
- ldout(cct, 20) << __func__ << " " << *m << " remote, " << dest_addrs
- << ", lossy server for target type "
- << ceph_entity_type_name(dest_type) << ", no session, dropping." << dendl;
- m->put();
- } else {
- ldout(cct,20) << __func__ << " " << *m << " remote, " << dest_addrs
- << ", new connection." << dendl;
- auto&& new_con = create_connect(dest_addrs, dest_type, false);
- new_con->send_message(m);
- }
-}
-
/**
* If my_addr doesn't have an IP set, this function
* will fill it in from the passed addr. Otherwise it does nothing and returns.
AsyncConnectionRef create_connect(const entity_addrvec_t& addrs, int type,
bool anon);
- /**
- * Queue up a Message for delivery to the entity specified
- * by addr and dest_type.
- * submit_message() is responsible for creating
- * new AsyncConnection (and closing old ones) as necessary.
- *
- * @param m The Message to queue up. This function eats a reference.
- * @param con The existing Connection to use, or NULL if you don't know of one.
- * @param dest_addr The address to send the Message to.
- * @param dest_type The peer type of the address we're sending to
- * just drop silently under failure.
- */
- void submit_message(Message *m, const AsyncConnectionRef& con,
- const entity_addrvec_t& dest_addrs, int dest_type);
void _finish_bind(const entity_addrvec_t& bind_addrs,
const entity_addrvec_t& listen_addrs);