return reply.get_future();
}).then([this] (Ref<MAuthReply> m) {
logger().info("mon {} => {} returns {}: {}",
- conn->get_my_addr(),
+ conn->get_messenger()->get_myaddr(),
conn->get_peer_addr(), *m, m->result);
reply = decltype(reply){};
auto p = m->result_bl.cbegin();
Ref<MAuthReply> m)
{
logger().info("mon {} => {} returns {}: {}",
- conn->get_my_addr(),
+ conn->get_messenger()->get_myaddr(),
conn->get_peer_addr(), *m, m->result);
auto found = std::find_if(pending_conns.begin(), pending_conns.end(),
[peer_addr = conn->get_peer_addr()](auto& mc) {
class Connection : public boost::intrusive_ref_counter<Connection,
boost::thread_unsafe_counter> {
protected:
- entity_addr_t my_addr;
entity_addr_t peer_addr;
peer_type_t peer_type = -1;
public:
- Connection(const entity_addr_t& my_addr)
- : my_addr(my_addr) {}
+ Connection() {}
virtual ~Connection() {}
virtual Messenger* get_messenger() const = 0;
- const entity_addr_t& get_my_addr() const { return my_addr; }
const entity_addr_t& get_peer_addr() const { return peer_addr; }
virtual int get_peer_type() const = 0;
const entity_name_t& get_myname() const { return my_name; }
const entity_addr_t& get_myaddr() const { return my_addr; }
- void set_myaddr(const entity_addr_t& addr) {
+ virtual void set_myaddr(const entity_addr_t& addr) {
my_addr = addr;
}
}
SocketConnection::SocketConnection(SocketMessenger& messenger,
- const entity_addr_t& my_addr,
Dispatcher& dispatcher)
- : Connection(my_addr),
- messenger(messenger),
+ : messenger(messenger),
dispatcher(dispatcher),
send_ready(h.promise.get_future())
{
h.reply.connect_seq = existing->connect_seq() + 1;
return send_connect_reply(CEPH_MSGR_TAG_RETRY_SESSION);
}
- } else if (peer_addr < my_addr ||
+ } else if (peer_addr < messenger.get_myaddr() ||
existing->is_server_side()) {
// incoming wins
return replace_existing(existing, std::move(authorizer_reply));
ceph_assert(p.end());
validate_peer_addr(saddr, peer_addr);
- if (my_addr != caddr) {
- // take peer's address for me, but preserve my nonce
- caddr.nonce = my_addr.nonce;
- my_addr = caddr;
- }
+ side = side_t::connector;
+ socket_port = caddr.get_port();
+ messenger.learned_addr(caddr);
+
// encode/send client's handshake header
bufferlist bl;
bl.append(buffer::create_static(banner_size, banner));
- ::encode(my_addr, bl, 0);
+ ::encode(messenger.get_myaddr(), bl, 0);
h.global_seq = messenger.get_global_seq();
return socket->write_flush(std::move(bl));
}).then([=] {
{
ceph_assert(state == state_t::none);
ceph_assert(!socket);
- peer_addr = _peer_addr;
+ peer_addr.u = _peer_addr.u;
+ peer_addr.set_port(0);
+ side = side_t::acceptor;
+ socket_port = _peer_addr.get_port();
socket.emplace(std::move(fd));
messenger.accept_conn(this);
logger().debug("{} trigger accepting, was {}", *this, static_cast<int>(state));
state = state_t::accepting;
- seastar::with_gate(pending_dispatch, [this] {
+ seastar::with_gate(pending_dispatch, [this, _peer_addr] {
// encode/send server's handshake header
bufferlist bl;
bl.append(buffer::create_static(banner_size, banner));
- ::encode(my_addr, bl, 0);
- ::encode(peer_addr, bl, 0);
+ ::encode(messenger.get_myaddr(), bl, 0);
+ ::encode(_peer_addr, bl, 0);
return socket->write_flush(std::move(bl))
.then([this] {
// read client's handshake header and connect request
entity_addr_t addr;
::decode(addr, p);
ceph_assert(p.end());
- if (!addr.is_blank_ip()) {
- peer_addr = addr;
- }
+ peer_addr.set_type(addr.get_type());
+ peer_addr.set_port(addr.get_port());
+ peer_addr.set_nonce(addr.get_nonce());
}).then([this] {
return seastar::repeat([this] {
return repeat_handle_connect();
void SocketConnection::print(ostream& out) const {
messenger.print(out);
- out << " >> " << peer_addr;
+ if (side == side_t::none) {
+ out << " >> " << peer_addr;
+ } else if (side == side_t::acceptor) {
+ out << " >> " << peer_addr
+ << "@" << socket_port;
+ } else { // side == side_t::connector
+ out << "@" << socket_port
+ << " >> " << peer_addr;
+ }
}
Dispatcher& dispatcher;
seastar::gate pending_dispatch;
+ // if acceptor side, socket_port is different from peer_addr.get_port();
+ // if connector side, socket_port is different from my_addr.get_port().
+ enum class side_t {
+ none,
+ acceptor,
+ connector
+ };
+ side_t side = side_t::none;
+ uint16_t socket_port = 0;
+
enum class state_t {
none,
accepting,
public:
SocketConnection(SocketMessenger& messenger,
- const entity_addr_t& my_addr,
Dispatcher& dispatcher);
~SocketConnection();
using namespace ceph::net;
SocketMessenger::SocketMessenger(const entity_name_t& myname,
- const std::string& logic_name)
- : Messenger{myname}, logic_name{logic_name}
+ const std::string& logic_name,
+ uint32_t nonce)
+ : Messenger{myname}, logic_name{logic_name}, nonce{nonce}
{}
+void SocketMessenger::set_myaddr(const entity_addr_t& addr)
+{
+ entity_addr_t my_addr = addr;
+ my_addr.nonce = nonce;
+ // TODO: propagate to all the cores of the Messenger
+ Messenger::set_myaddr(my_addr);
+}
+
void SocketMessenger::bind(const entity_addr_t& addr)
{
if (addr.get_family() != AF_INET) {
seastar::socket_address paddr) {
// allocate the connection
entity_addr_t peer_addr;
- peer_addr.set_type(entity_addr_t::TYPE_DEFAULT);
peer_addr.set_sockaddr(&paddr.as_posix_sockaddr());
- SocketConnectionRef conn = new SocketConnection(*this, get_myaddr(), *dispatcher);
+ SocketConnectionRef conn = new SocketConnection(*this, *dispatcher);
// don't wait before accepting another
conn->start_accept(std::move(socket), peer_addr);
});
if (auto found = lookup_conn(peer_addr); found) {
return found;
}
- SocketConnectionRef conn = new SocketConnection(*this, get_myaddr(), *dispatcher);
+ SocketConnectionRef conn = new SocketConnection(*this, *dispatcher);
conn->start_connect(peer_addr, peer_type);
return conn;
}
});
}
+void SocketMessenger::learned_addr(const entity_addr_t &peer_addr_for_me)
+{
+ if (!get_myaddr().is_blank_ip()) {
+ // already learned or binded
+ return;
+ }
+
+ // Only learn IP address if blank.
+ entity_addr_t addr = get_myaddr();
+ addr.u = peer_addr_for_me.u;
+ addr.set_type(peer_addr_for_me.get_type());
+ addr.set_port(get_myaddr().get_port());
+ set_myaddr(addr);
+}
+
void SocketMessenger::set_default_policy(const SocketPolicy& p)
{
policy_set.set_default(p);
ceph::net::PolicySet<Throttle> policy_set;
// Distinguish messengers with meaningful names for debugging
const std::string logic_name;
+ const uint32_t nonce;
seastar::future<> accept(seastar::connected_socket socket,
seastar::socket_address paddr);
public:
SocketMessenger(const entity_name_t& myname,
- const std::string& logic_name);
+ const std::string& logic_name,
+ uint32_t nonce);
+
+ void set_myaddr(const entity_addr_t& addr) override;
void bind(const entity_addr_t& addr) override;
}
public:
+ void learned_addr(const entity_addr_t &peer_addr_for_me);
void set_default_policy(const SocketPolicy& p);
void set_policy(entity_type_t peer_type, const SocketPolicy& p);
void set_policy_throttler(entity_type_t peer_type, Throttle* throttle);
struct Server {
ceph::thread::Throttle byte_throttler;
static constexpr int64_t server_num = 0;
- ceph::net::SocketMessenger msgr{entity_name_t::OSD(server_num), "server"};
+ ceph::net::SocketMessenger msgr{entity_name_t::OSD(server_num), "server", 0};
struct ServerDispatcher : ceph::net::Dispatcher {
unsigned count = 0;
seastar::condition_variable on_reply;
struct Client {
ceph::thread::Throttle byte_throttler;
static constexpr int64_t client_num = 1;
- ceph::net::SocketMessenger msgr{entity_name_t::OSD(client_num), "client"};
+ ceph::net::SocketMessenger msgr{entity_name_t::OSD(client_num), "client", 0};
struct ClientDispatcher : ceph::net::Dispatcher {
unsigned count = 0;
seastar::condition_variable on_reply;
entity_addr_t addr;
struct {
- ceph::net::SocketMessenger messenger{entity_name_t::OSD(1), "server1"};
+ ceph::net::SocketMessenger messenger{entity_name_t::OSD(1), "server1", 1};
struct ServerDispatcher : ceph::net::Dispatcher {
seastar::future<> ms_dispatch(ceph::net::ConnectionRef c,
MessageRef m) override {
struct {
unsigned rounds;
std::bernoulli_distribution keepalive_dist{};
- ceph::net::SocketMessenger messenger{entity_name_t::OSD(0), "client1"};
+ ceph::net::SocketMessenger messenger{entity_name_t::OSD(0), "client1", 2};
struct ClientDispatcher : ceph::net::Dispatcher {
seastar::promise<MessageRef> reply;
unsigned count = 0u;
return seastar::do_with(test_state{},
[rounds, keepalive_ratio] (test_state& t) {
// bind the server
+ t.addr.set_type(entity_addr_t::TYPE_LEGACY);
t.addr.set_family(AF_INET);
t.addr.set_port(9010);
+ t.addr.set_nonce(1);
t.server.messenger.bind(t.addr);
t.client.rounds = rounds;
entity_addr_t addr;
struct {
- ceph::net::SocketMessenger messenger{entity_name_t::OSD(1), "server2"};
+ ceph::net::SocketMessenger messenger{entity_name_t::OSD(1), "server2", 3};
class ServerDispatcher : public ceph::net::Dispatcher {
int count = 0;
seastar::promise<> on_second; // satisfied on second dispatch
} server;
struct {
- ceph::net::SocketMessenger messenger{entity_name_t::OSD(0), "client2"};
+ ceph::net::SocketMessenger messenger{entity_name_t::OSD(0), "client2", 4};
ceph::net::Dispatcher dispatcher;
} client;
};
return seastar::do_with(test_state{},
[] (test_state& t) {
// bind the server
+ t.addr.set_type(entity_addr_t::TYPE_LEGACY);
t.addr.set_family(AF_INET);
t.addr.set_port(9010);
+ t.addr.set_nonce(3);
t.server.messenger.bind(t.addr);
return t.server.messenger.start(&t.server.dispatcher)
conf->cluster = cluster;
return conf.parse_config_files(conf_file_list);
}).then([] {
- return seastar::do_with(ceph::net::SocketMessenger{entity_name_t::OSD(0), "monc"},
+ return seastar::do_with(ceph::net::SocketMessenger{entity_name_t::OSD(0), "monc", 0},
[](ceph::net::Messenger& msgr) {
auto& conf = ceph::common::local_conf();
if (conf->ms_crc_data) {