From: Sage Weil Date: Mon, 4 Jun 2018 20:00:57 +0000 (-0500) Subject: msg/async: multiple listening ServerSockets X-Git-Tag: v14.0.1~951^2~28 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=6625caa7ff3739a3b8bbae1d22cb08e02d7a1af4;p=ceph.git msg/async: multiple listening ServerSockets No functional change yet. Signed-off-by: Sage Weil --- diff --git a/src/msg/async/AsyncMessenger.cc b/src/msg/async/AsyncMessenger.cc index f34e66357ab..352b907d26e 100644 --- a/src/msg/async/AsyncMessenger.cc +++ b/src/msg/async/AsyncMessenger.cc @@ -92,6 +92,7 @@ int Processor::bind(const entity_addr_t &bind_addr, /* bind to port */ int r = -1; + listen_sockets.resize(1); for (int i = 0; i < conf->ms_bind_retry_count; i++) { if (i > 0) { @@ -102,7 +103,7 @@ int Processor::bind(const entity_addr_t &bind_addr, if (listen_addr.get_port()) { worker->center.submit_to(worker->center.get_id(), [this, &listen_addr, &opts, &r]() { - r = worker->listen(listen_addr, opts, &listen_socket); + r = worker->listen(listen_addr, opts, &listen_sockets[0]); }, false); if (r < 0) { lderr(msgr->cct) << __func__ << " unable to bind to " << listen_addr @@ -117,7 +118,7 @@ int Processor::bind(const entity_addr_t &bind_addr, listen_addr.set_port(port); worker->center.submit_to(worker->center.get_id(), [this, &listen_addr, &opts, &r]() { - r = worker->listen(listen_addr, opts, &listen_socket); + r = worker->listen(listen_addr, opts, &listen_sockets[0]); }, false); if (r == 0) break; @@ -152,50 +153,58 @@ void Processor::start() ldout(msgr->cct, 1) << __func__ << dendl; // start thread - if (listen_socket) { - worker->center.submit_to(worker->center.get_id(), [this]() { - worker->center.create_file_event(listen_socket.fd(), EVENT_READABLE, listen_handler); }, false); - } + worker->center.submit_to(worker->center.get_id(), [this]() { + for (auto& l : listen_sockets) { + if (l) { + worker->center.create_file_event(l.fd(), EVENT_READABLE, + listen_handler); } + } + }, false); } void Processor::accept() { - ldout(msgr->cct, 10) << __func__ << " listen_fd=" << listen_socket.fd() << dendl; SocketOptions opts; opts.nodelay = msgr->cct->_conf->ms_tcp_nodelay; opts.rcbuf_size = msgr->cct->_conf->ms_tcp_rcvbuf; opts.priority = msgr->get_socket_priority(); - while (true) { - entity_addr_t addr; - ConnectedSocket cli_socket; - Worker *w = worker; - if (!msgr->get_stack()->support_local_listen_table()) - w = msgr->get_stack()->get_worker(); - else - ++w->references; - int r = listen_socket.accept(&cli_socket, opts, &addr, w); - if (r == 0) { - ldout(msgr->cct, 10) << __func__ << " accepted incoming on sd " << cli_socket.fd() << dendl; - - msgr->add_accept(w, std::move(cli_socket), addr); - continue; - } else { - if (r == -EINTR) { - continue; - } else if (r == -EAGAIN) { - break; - } else if (r == -EMFILE || r == -ENFILE) { - lderr(msgr->cct) << __func__ << " open file descriptions limit reached sd = " << listen_socket.fd() - << " errno " << r << " " << cpp_strerror(r) << dendl; - break; - } else if (r == -ECONNABORTED) { - ldout(msgr->cct, 0) << __func__ << " it was closed because of rst arrived sd = " << listen_socket.fd() - << " errno " << r << " " << cpp_strerror(r) << dendl; - continue; + + for (auto& listen_socket : listen_sockets) { + ldout(msgr->cct, 10) << __func__ << " listen_fd=" << listen_socket.fd() + << dendl; + while (true) { + entity_addr_t addr; + ConnectedSocket cli_socket; + Worker *w = worker; + if (!msgr->get_stack()->support_local_listen_table()) + w = msgr->get_stack()->get_worker(); + else + ++w->references; + int r = listen_socket.accept(&cli_socket, opts, &addr, w); + if (r == 0) { + ldout(msgr->cct, 10) << __func__ << " accepted incoming on sd " + << cli_socket.fd() << dendl; + + msgr->add_accept(w, std::move(cli_socket), addr); + continue; } else { - lderr(msgr->cct) << __func__ << " no incoming connection?" - << " errno " << r << " " << cpp_strerror(r) << dendl; - break; + if (r == -EINTR) { + continue; + } else if (r == -EAGAIN) { + break; + } else if (r == -EMFILE || r == -ENFILE) { + lderr(msgr->cct) << __func__ << " open file descriptions limit reached sd = " << listen_socket.fd() + << " errno " << r << " " << cpp_strerror(r) << dendl; + break; + } else if (r == -ECONNABORTED) { + ldout(msgr->cct, 0) << __func__ << " it was closed because of rst arrived sd = " << listen_socket.fd() + << " errno " << r << " " << cpp_strerror(r) << dendl; + continue; + } else { + lderr(msgr->cct) << __func__ << " no incoming connection?" + << " errno " << r << " " << cpp_strerror(r) << dendl; + break; + } } } } @@ -205,12 +214,14 @@ void Processor::stop() { ldout(msgr->cct,10) << __func__ << dendl; - if (listen_socket) { - worker->center.submit_to(worker->center.get_id(), [this]() { - worker->center.delete_file_event(listen_socket.fd(), EVENT_READABLE); - listen_socket.abort_accept(); + worker->center.submit_to(worker->center.get_id(), [this]() { + for (auto& listen_socket : listen_sockets) { + if (listen_socket) { + worker->center.delete_file_event(listen_socket.fd(), EVENT_READABLE); + listen_socket.abort_accept(); + } + } }, false); - } } diff --git a/src/msg/async/AsyncMessenger.h b/src/msg/async/AsyncMessenger.h index 698d5e85f03..0d4301e8942 100644 --- a/src/msg/async/AsyncMessenger.h +++ b/src/msg/async/AsyncMessenger.h @@ -47,7 +47,7 @@ class Processor { AsyncMessenger *msgr; NetHandler net; Worker *worker; - ServerSocket listen_socket; + vector listen_sockets; EventCallbackRef listen_handler; class C_processor_accept;