center->dispatch_event_external(read_handler);
}
-void AsyncConnection::accept(ConnectedSocket socket, entity_addr_t &addr)
+void AsyncConnection::accept(ConnectedSocket socket,
+ const entity_addr_t &listen_addr,
+ const entity_addr_t &peer_addr)
{
ldout(async_msgr->cct, 10) << __func__ << " sd=" << socket.fd()
- << " on " << addr << dendl;
+ << " listen_addr " << listen_addr
+ << " peer_addr " << peer_addr << dendl;
ceph_assert(socket.fd() >= 0);
std::lock_guard<std::mutex> l(lock);
cs = std::move(socket);
- socket_addr = addr;
- target_addr = addr; // until we know better
+ socket_addr = listen_addr;
+ target_addr = peer_addr; // until we know better
state = STATE_ACCEPTING;
protocol->accept();
// rescheduler connection in order to avoid lock dep
1) << "-- " << async_msgr->get_myaddrs() << " --> "
<< get_peer_addrs() << " -- "
<< *m << " -- " << m << " con "
- << m->get_connection().get()
+ << this
<< dendl;
// optimistic think it's ok to encode(actually may broken now)
void connect(const entity_addrvec_t& addrs, int type, entity_addr_t& target);
// Only call when AsyncConnection first construct
- void accept(ConnectedSocket socket, entity_addr_t &addr);
+ void accept(ConnectedSocket socket,
+ const entity_addr_t &listen_addr,
+ const entity_addr_t &peer_addr);
int send_message(Message *m) override;
void send_keepalive() override;
// Accepting state
bool msgr2 = false;
- entity_addr_t socket_addr;
- entity_addr_t target_addr; // which of the peer_addrs we're using
+ entity_addr_t socket_addr; ///< local socket addr
+ entity_addr_t target_addr; ///< which of the peer_addrs we're using
// used only by "read_until"
uint64_t state_offset;
}
if (listen_addr.get_port()) {
- worker->center.submit_to(worker->center.get_id(),
- [this, k, &listen_addr, &opts, &r]() {
- r = worker->listen(listen_addr, opts, &listen_sockets[k]);
+ worker->center.submit_to(
+ worker->center.get_id(),
+ [this, k, &listen_addr, &opts, &r]() {
+ r = worker->listen(listen_addr, k, opts, &listen_sockets[k]);
}, false);
if (r < 0) {
lderr(msgr->cct) << __func__ << " unable to bind to " << listen_addr
continue;
listen_addr.set_port(port);
- worker->center.submit_to(worker->center.get_id(),
- [this, k, &listen_addr, &opts, &r]() {
- r = worker->listen(listen_addr, opts, &listen_sockets[k]);
+ worker->center.submit_to(
+ worker->center.get_id(),
+ [this, k, &listen_addr, &opts, &r]() {
+ r = worker->listen(listen_addr, k, opts, &listen_sockets[k]);
}, false);
if (r == 0)
break;
ldout(msgr->cct, 10) << __func__ << " accepted incoming on sd "
<< cli_socket.fd() << dendl;
- msgr->add_accept(w, std::move(cli_socket), addr);
+ msgr->add_accept(
+ w, std::move(cli_socket),
+ msgr->get_myaddrs().v[listen_socket.get_addr_slot()],
+ addr);
accept_error_num = 0;
continue;
} else {
started = false;
}
-void AsyncMessenger::add_accept(Worker *w, ConnectedSocket cli_socket, entity_addr_t &addr)
+void AsyncMessenger::add_accept(Worker *w, ConnectedSocket cli_socket,
+ const entity_addr_t &listen_addr,
+ const entity_addr_t &peer_addr)
{
lock.Lock();
AsyncConnectionRef conn = new AsyncConnection(cct, this, &dispatch_queue, w,
- addr.is_msgr2(), false);
- conn->accept(std::move(cli_socket), addr);
+ listen_addr.is_msgr2(), false);
+ conn->accept(std::move(cli_socket), listen_addr, peer_addr);
accepting_conns.insert(conn);
lock.Unlock();
}
}
void learned_addr(const entity_addr_t &peer_addr_for_me);
- void add_accept(Worker *w, ConnectedSocket cli_socket, entity_addr_t &addr);
+ void add_accept(Worker *w, ConnectedSocket cli_socket,
+ const entity_addr_t &listen_addr,
+ const entity_addr_t &peer_addr);
NetworkStack *get_stack() {
return stack;
}
int _fd;
public:
- explicit PosixServerSocketImpl(NetHandler &h, int f, int type)
- : ServerSocketImpl(type),
+ explicit PosixServerSocketImpl(NetHandler &h, int f,
+ const entity_addr_t& listen_addr, unsigned slot)
+ : ServerSocketImpl(listen_addr.get_type(), slot),
handler(h), _fd(f) {}
int accept(ConnectedSocket *sock, const SocketOptions &opts, entity_addr_t *out, Worker *w) override;
void abort_accept() override {
{
}
-int PosixWorker::listen(entity_addr_t &sa, const SocketOptions &opt,
+int PosixWorker::listen(entity_addr_t &sa,
+ unsigned addr_slot,
+ const SocketOptions &opt,
ServerSocket *sock)
{
int listen_sd = net.create_socket(sa.get_family(), true);
*sock = ServerSocket(
std::unique_ptr<PosixServerSocketImpl>(
- new PosixServerSocketImpl(net, listen_sd, sa.get_type())));
+ new PosixServerSocketImpl(net, listen_sd, sa, addr_slot)));
return 0;
}
public:
PosixWorker(CephContext *c, unsigned i)
: Worker(c, i), net(c) {}
- int listen(entity_addr_t &sa, const SocketOptions &opt,
- ServerSocket *socks) override;
+ int listen(entity_addr_t &sa,
+ unsigned addr_slot,
+ const SocketOptions &opt,
+ ServerSocket *socks) override;
int connect(const entity_addr_t &addr, const SocketOptions &opts, ConnectedSocket *socket) override;
};
auto legacy = messenger->get_myaddrs().legacy_addr();
encode(legacy, bl, 0); // legacy
connection->port = legacy.get_port();
- encode(connection->socket_addr, bl, 0); // legacy
+ encode(connection->target_addr, bl, 0); // legacy
- ldout(cct, 1) << __func__ << " sd=" << connection->cs.fd() << " "
- << connection->socket_addr << dendl;
+ ldout(cct, 1) << __func__ << " sd=" << connection->cs.fd()
+ << " legacy " << legacy
+ << " socket_addr " << connection->socket_addr
+ << " target_addr " << connection->target_addr
+ << dendl;
return WRITE(bl, handle_server_banner_write);
}
if (peer_addr.is_blank_ip()) {
// peer apparently doesn't know what ip they have; figure it out for them.
int port = peer_addr.get_port();
- peer_addr.u = connection->socket_addr.u;
+ peer_addr.u = connection->target_addr.u;
peer_addr.set_port(port);
ldout(cct, 0) << __func__ << " accept peer addr is really " << peer_addr
/// \cond internal
class ServerSocketImpl {
public:
- int addr_type = 0;
- ServerSocketImpl(int t) : addr_type(t) {}
+ unsigned addr_type; ///< entity_addr_t::TYPE_*
+ unsigned addr_slot; ///< position of our addr in myaddrs().v
+ ServerSocketImpl(unsigned type, unsigned slot)
+ : addr_type(type), addr_slot(slot) {}
virtual ~ServerSocketImpl() {}
virtual int accept(ConnectedSocket *sock, const SocketOptions &opt, entity_addr_t *out, Worker *w) = 0;
virtual void abort_accept() = 0;
return _ssi->fd();
}
+ /// get listen/bind addr
+ unsigned get_addr_slot() {
+ return _ssi->addr_slot;
+ }
+
explicit operator bool() const {
return _ssi.get();
}
}
}
- virtual int listen(entity_addr_t &addr,
+ virtual int listen(entity_addr_t &addr, unsigned addr_slot,
const SocketOptions &opts, ServerSocket *) = 0;
virtual int connect(const entity_addr_t &addr,
const SocketOptions &opts, ConnectedSocket *socket) = 0;
#undef dout_prefix
#define dout_prefix *_dout << " RDMAIWARPServerSocketImpl "
-RDMAIWARPServerSocketImpl::RDMAIWARPServerSocketImpl(CephContext *cct, Infiniband* i,
- RDMADispatcher *s, RDMAWorker *w, entity_addr_t& a)
- : RDMAServerSocketImpl(cct, i, s, w, a)
+RDMAIWARPServerSocketImpl::RDMAIWARPServerSocketImpl(
+ CephContext *cct, Infiniband* i,
+ RDMADispatcher *s, RDMAWorker *w, entity_addr_t& a, unsigned addr_slot)
+ : RDMAServerSocketImpl(cct, i, s, w, a, addr_slot)
{
}
-int RDMAIWARPServerSocketImpl::listen(entity_addr_t &sa, const SocketOptions &opt)
+int RDMAIWARPServerSocketImpl::listen(entity_addr_t &sa,
+ const SocketOptions &opt)
{
ldout(cct, 20) << __func__ << " bind to rdma point" << dendl;
cm_channel = rdma_create_event_channel();
RDMAServerSocketImpl::RDMAServerSocketImpl(
CephContext *cct, Infiniband* i, RDMADispatcher *s, RDMAWorker *w,
- entity_addr_t& a)
- : ServerSocketImpl(a.get_type()),
+ entity_addr_t& a, unsigned slot)
+ : ServerSocketImpl(a.get_type(), slot),
cct(cct), net(cct), server_setup_socket(-1), infiniband(i),
dispatcher(s), worker(w), sa(a)
{
}
}
-int RDMAWorker::listen(entity_addr_t &sa, const SocketOptions &opt,ServerSocket *sock)
+int RDMAWorker::listen(entity_addr_t &sa, unsigned addr_slot,
+ const SocketOptions &opt,ServerSocket *sock)
{
get_stack()->get_infiniband().init();
dispatcher->polling_start();
RDMAServerSocketImpl *p;
if (cct->_conf->ms_async_rdma_type == "iwarp") {
- p = new RDMAIWARPServerSocketImpl(cct, &get_stack()->get_infiniband(), &get_stack()->get_dispatcher(), this, sa);
+ p = new RDMAIWARPServerSocketImpl(
+ cct, &get_stack()->get_infiniband(), &get_stack()->get_dispatcher(), this,
+ sa, addr_slot);
} else {
- p = new RDMAServerSocketImpl(cct, &get_stack()->get_infiniband(), &get_stack()->get_dispatcher(), this, sa);
+ p = new RDMAServerSocketImpl(cct, &get_stack()->get_infiniband(),
+ &get_stack()->get_dispatcher(), this, sa,
+ addr_slot);
}
int r = p->listen(sa, opt);
if (r < 0) {
PerfCounters *perf_logger;
explicit RDMAWorker(CephContext *c, unsigned i);
virtual ~RDMAWorker();
- virtual int listen(entity_addr_t &addr, const SocketOptions &opts, ServerSocket *) override;
+ virtual int listen(entity_addr_t &addr,
+ unsigned addr_slot,
+ const SocketOptions &opts, ServerSocket *) override;
virtual int connect(const entity_addr_t &addr, const SocketOptions &opts, ConnectedSocket *socket) override;
virtual void initialize() override;
RDMAStack *get_stack() { return stack; }
public:
RDMAServerSocketImpl(CephContext *cct, Infiniband* i, RDMADispatcher *s,
- RDMAWorker *w, entity_addr_t& a);
+ RDMAWorker *w, entity_addr_t& a, unsigned slot);
virtual int listen(entity_addr_t &sa, const SocketOptions &opt);
virtual int accept(ConnectedSocket *s, const SocketOptions &opts, entity_addr_t *out, Worker *w) override;
class RDMAIWARPServerSocketImpl : public RDMAServerSocketImpl {
public:
- RDMAIWARPServerSocketImpl(CephContext *cct, Infiniband *i, RDMADispatcher *s, RDMAWorker *w, entity_addr_t& a);
+ RDMAIWARPServerSocketImpl(
+ CephContext *cct, Infiniband *i, RDMADispatcher *s, RDMAWorker *w,
+ entity_addr_t& addr, unsigned addr_slot);
virtual int listen(entity_addr_t &sa, const SocketOptions &opt) override;
virtual int accept(ConnectedSocket *s, const SocketOptions &opts, entity_addr_t *out, Worker *w) override;
virtual void abort_accept() override;
EventCenter *center = &worker->center;
ssize_t r = 0;
if (stack->support_local_listen_table() || worker->id == 0)
- r = worker->listen(bind_addr, options, &bind_socket);
+ r = worker->listen(bind_addr, 0, options, &bind_socket);
ASSERT_EQ(0, r);
ConnectedSocket cli_socket, srv_socket;
ServerSocket bind_socket;
int r = 0;
if (stack->support_local_listen_table() || worker->id == 0)
- r = worker->listen(bind_addr, options, &bind_socket);
+ r = worker->listen(bind_addr, 0, options, &bind_socket);
ASSERT_EQ(0, r);
ConnectedSocket cli_socket1, cli_socket2;
ASSERT_TRUE(bind_addr.parse(get_addr().c_str()));
SocketOptions options;
ServerSocket bind_socket1, bind_socket2;
- int r = worker->listen(bind_addr, options, &bind_socket1);
+ int r = worker->listen(bind_addr, 0, options, &bind_socket1);
ASSERT_EQ(0, r);
- r = worker->listen(bind_addr, options, &bind_socket2);
+ r = worker->listen(bind_addr, 0, options, &bind_socket2);
ASSERT_EQ(-EADDRINUSE, r);
}
{
ServerSocket bind_socket;
if (stack->support_local_listen_table() || worker->id == 0)
- r = worker->listen(bind_addr, options, &bind_socket);
+ r = worker->listen(bind_addr, 0, options, &bind_socket);
ASSERT_EQ(0, r);
ConnectedSocket srv_socket, cli_socket;
ServerSocket bind_socket;
int r = 0;
if (stack->support_local_listen_table() || worker->id == 0) {
- r = worker->listen(bind_addr, options, &bind_socket);
+ r = worker->listen(bind_addr, 0, options, &bind_socket);
ASSERT_EQ(0, r);
*listen_p = true;
}
t_data.worker = worker;
ServerSocket bind_socket;
if (stack->support_local_listen_table() || worker->id == 0) {
- r = worker->listen(bind_addr, options, &bind_socket);
+ r = worker->listen(bind_addr, 0, options, &bind_socket);
ASSERT_EQ(0, r);
already_bind = true;
}