From: Haomai Wang Date: Wed, 13 Jul 2016 07:59:12 +0000 (+0800) Subject: msg/async/AsyncConnection: support NetworkStack api instead of posix X-Git-Tag: ses5-milestone5~107^2~12 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=d0cd88b3ee805854c6c0c5272f14793e95211b43;p=ceph.git msg/async/AsyncConnection: support NetworkStack api instead of posix 1. replace sd to ConnectedSocket 2. Replace WorkerPool with Stack 3. Use Stack worker Signed-off-by: Haomai Wang --- diff --git a/src/common/config_opts.h b/src/common/config_opts.h index 72cee895d64..b42bab3ffdd 100644 --- a/src/common/config_opts.h +++ b/src/common/config_opts.h @@ -197,8 +197,9 @@ OPTION(ms_inject_delay_probability, OPT_DOUBLE, 0) // range [0, 1] OPTION(ms_inject_internal_delays, OPT_DOUBLE, 0) // seconds OPTION(ms_dump_on_send, OPT_BOOL, false) // hexdump msg to log on send OPTION(ms_dump_corrupt_message_level, OPT_INT, 1) // debug level to hexdump undecodeable messages at -OPTION(ms_async_op_threads, OPT_INT, 3) // number of worker processing threads for async messenger created on init -OPTION(ms_async_max_op_threads, OPT_INT, 5) // max number of worker processing threads for async messenger +OPTION(ms_async_transport_type, OPT_STR, "posix") +OPTION(ms_async_op_threads, OPT_U64, 3) // number of worker processing threads for async messenger created on init +OPTION(ms_async_max_op_threads, OPT_U64, 5) // max number of worker processing threads for async messenger OPTION(ms_async_set_affinity, OPT_BOOL, true) // example: ms_async_affinity_cores = 0,1 // The number of coreset is expected to equal to ms_async_op_threads, otherwise diff --git a/src/msg/async/AsyncConnection.cc b/src/msg/async/AsyncConnection.cc index 80756a0e278..5012e92df03 100644 --- a/src/msg/async/AsyncConnection.cc +++ b/src/msg/async/AsyncConnection.cc @@ -14,8 +14,6 @@ * */ -#include -#include #include #include "include/Context.h" @@ -23,8 +21,6 @@ #include "AsyncMessenger.h" #include 
"AsyncConnection.h" -#include "include/sock_compat.h" - // Constant to limit starting sequence number to 2^31. Nothing special about it, just a big number. PLR #define SEQ_MASK 0x7fffffff @@ -32,8 +28,9 @@ #undef dout_prefix #define dout_prefix _conn_prefix(_dout) ostream& AsyncConnection::_conn_prefix(std::ostream *_dout) { + int fd = cs ? cs.fd() : -1; return *_dout << "-- " << async_msgr->get_myinst().addr << " >> " << peer_addr << " conn(" << this - << " sd=" << sd << " :" << port + << " sd=" << fd << " :" << port << " s=" << get_state_name(state) << " pgs=" << peer_global_seq << " cs=" << connect_seq @@ -122,7 +119,7 @@ AsyncConnection::AsyncConnection(CephContext *cct, AsyncMessenger *m, DispatchQu Worker *w) : Connection(cct, m), delay_state(NULL), async_msgr(m), conn_id(q->get_id()), logger(w->get_perf_counter()), global_seq(0), connect_seq(0), peer_global_seq(0), - out_seq(0), ack_left(0), in_seq(0), state(STATE_NONE), state_after_send(STATE_NONE), sd(-1), port(-1), + out_seq(0), ack_left(0), in_seq(0), state(STATE_NONE), state_after_send(STATE_NONE), port(-1), dispatch_queue(q), can_write(WriteStatus::NOWRITE), open_write(false), keepalive(false), recv_buf(NULL), recv_max_prefetch(MAX(msgr->cct->_conf->ms_tcp_prefetch_max_size, TCP_PREFETCH_MIN_SIZE)), @@ -130,7 +127,7 @@ AsyncConnection::AsyncConnection(CephContext *cct, AsyncMessenger *m, DispatchQu last_active(ceph::coarse_mono_clock::now()), inactive_timeout_us(cct->_conf->ms_tcp_read_timeout*1000*1000), got_bad_auth(false), authorizer(NULL), replacing(false), - is_reset_from_peer(false), once_ready(false), state_buffer(NULL), state_offset(0), net(cct), + is_reset_from_peer(false), once_ready(false), state_buffer(NULL), state_offset(0), worker(w), center(&w->center) { read_handler = new C_handle_read(this); @@ -167,203 +164,52 @@ void AsyncConnection::maybe_start_delay_thread() /* return -1 means `fd` occurs error or closed, it should be closed * return 0 means EAGAIN or EINTR */ -ssize_t 
AsyncConnection::read_bulk(int fd, char *buf, unsigned len) +ssize_t AsyncConnection::read_bulk(char *buf, unsigned len) { ssize_t nread; again: - nread = ::read(fd, buf, len); - if (nread == -1) { - if (errno == EAGAIN) { + nread = cs.read(buf, len); + if (nread < 0) { + if (nread == -EAGAIN) { nread = 0; - } else if (errno == EINTR) { + } else if (nread == -EINTR) { goto again; } else { - ldout(async_msgr->cct, 1) << __func__ << " reading from fd=" << fd - << " : "<< strerror(errno) << dendl; + ldout(async_msgr->cct, 1) << __func__ << " reading from fd=" << cs.fd() + << " : "<< strerror(nread) << dendl; return -1; } } else if (nread == 0) { ldout(async_msgr->cct, 1) << __func__ << " peer close file descriptor " - << fd << dendl; + << cs.fd() << dendl; return -1; } return nread; } -/* - SIGPIPE suppression - for platforms without SO_NOSIGPIPE or MSG_NOSIGNAL - http://krokisplace.blogspot.in/2010/02/suppressing-sigpipe-in-library.html - http://www.microhowto.info/howto/ignore_sigpipe_without_affecting_other_threads_in_a_process.html -*/ -void AsyncConnection::suppress_sigpipe() -{ -#if !defined(MSG_NOSIGNAL) && !defined(SO_NOSIGPIPE) - /* - We want to ignore possible SIGPIPE that we can generate on write. - SIGPIPE is delivered *synchronously* and *only* to the thread - doing the write. So if it is reported as already pending (which - means the thread blocks it), then we do nothing: if we generate - SIGPIPE, it will be merged with the pending one (there's no - queuing), and that suits us well. If it is not pending, we block - it in this thread (and we avoid changing signal action, because it - is per-process). - */ - sigset_t pending; - sigemptyset(&pending); - sigpending(&pending); - sigpipe_pending = sigismember(&pending, SIGPIPE); - if (!sigpipe_pending) { - sigset_t blocked; - sigemptyset(&blocked); - pthread_sigmask(SIG_BLOCK, &sigpipe_mask, &blocked); - - /* Maybe is was blocked already? */ - sigpipe_unblock = ! 
sigismember(&blocked, SIGPIPE); - } -#endif /* !defined(MSG_NOSIGNAL) && !defined(SO_NOSIGPIPE) */ -} - - -void AsyncConnection::restore_sigpipe() -{ -#if !defined(MSG_NOSIGNAL) && !defined(SO_NOSIGPIPE) - /* - If SIGPIPE was pending already we do nothing. Otherwise, if it - become pending (i.e., we generated it), then we sigwait() it (thus - clearing pending status). Then we unblock SIGPIPE, but only if it - were us who blocked it. - */ - if (!sigpipe_pending) { - sigset_t pending; - sigemptyset(&pending); - sigpending(&pending); - if (sigismember(&pending, SIGPIPE)) { - /* - Protect ourselves from a situation when SIGPIPE was sent - by the user to the whole process, and was delivered to - other thread before we had a chance to wait for it. - */ - static const struct timespec nowait = { 0, 0 }; - TEMP_FAILURE_RETRY(sigtimedwait(&sigpipe_mask, NULL, &nowait)); - } - - if (sigpipe_unblock) - pthread_sigmask(SIG_UNBLOCK, &sigpipe_mask, NULL); - } -#endif /* !defined(MSG_NOSIGNAL) && !defined(SO_NOSIGPIPE) */ -} - -// return the length of msg needed to be sent, -// < 0 means error occured -ssize_t AsyncConnection::do_sendmsg(struct msghdr &msg, unsigned len, bool more) -{ - suppress_sigpipe(); - - while (len > 0) { - ssize_t r; -#if defined(MSG_NOSIGNAL) - r = ::sendmsg(sd, &msg, MSG_NOSIGNAL | (more ? MSG_MORE : 0)); -#else - r = ::sendmsg(sd, &msg, (more ? MSG_MORE : 0)); -#endif /* defined(MSG_NOSIGNAL) */ - - if (r == 0) { - ldout(async_msgr->cct, 10) << __func__ << " sendmsg got r==0!" << dendl; - } else if (r < 0) { - if (errno == EINTR) { - continue; - } else if (errno == EAGAIN) { - break; - } else { - ldout(async_msgr->cct, 1) << __func__ << " sendmsg error: " << cpp_strerror(errno) << dendl; - restore_sigpipe(); - return r; - } - } - - len -= r; - if (len == 0) break; - - // hrmph. drain r bytes from the front of our message. 
- ldout(async_msgr->cct, 20) << __func__ << " short write did " << r << ", still have " << len << dendl; - while (r > 0) { - if (msg.msg_iov[0].iov_len <= (size_t)r) { - // drain this whole item - r -= msg.msg_iov[0].iov_len; - msg.msg_iov++; - msg.msg_iovlen--; - } else { - msg.msg_iov[0].iov_base = (char *)msg.msg_iov[0].iov_base + r; - msg.msg_iov[0].iov_len -= r; - break; - } - } - } - restore_sigpipe(); - return (ssize_t)len; -} - // return the remaining bytes, it may larger than the length of ptr // else return < 0 means error ssize_t AsyncConnection::_try_send(bool more) { - if (async_msgr->cct->_conf->ms_inject_socket_failures && sd >= 0) { + if (async_msgr->cct->_conf->ms_inject_socket_failures && cs) { if (rand() % async_msgr->cct->_conf->ms_inject_socket_failures == 0) { ldout(async_msgr->cct, 0) << __func__ << " injecting socket failure" << dendl; - ::shutdown(sd, SHUT_RDWR); - } - } - - uint64_t sent_bytes = 0; - list::const_iterator pb = outcoming_bl.buffers().begin(); - uint64_t left_pbrs = outcoming_bl.buffers().size(); - while (left_pbrs) { - struct msghdr msg; - uint64_t size = MIN(left_pbrs, ASYNC_IOV_MAX); - left_pbrs -= size; - memset(&msg, 0, sizeof(msg)); - msg.msg_iovlen = 0; - msg.msg_iov = msgvec; - unsigned msglen = 0; - while (size > 0) { - msgvec[msg.msg_iovlen].iov_base = (void*)(pb->c_str()); - msgvec[msg.msg_iovlen].iov_len = pb->length(); - msg.msg_iovlen++; - msglen += pb->length(); - ++pb; - size--; + cs.shutdown(); } - - ssize_t r = do_sendmsg(msg, msglen, left_pbrs || more); - if (r < 0) - return r; - - // "r" is the remaining length - sent_bytes += msglen - r; - if (r > 0) { - ldout(async_msgr->cct, 5) << __func__ << " remaining " << r - << " needed to be sent, creating event for writing" - << dendl; - break; - } - // only "r" == 0 continue } - // trim already sent for outcoming_bl - if (sent_bytes) { - if (sent_bytes < outcoming_bl.length()) { - outcoming_bl.splice(0, sent_bytes); - } else { - outcoming_bl.clear(); - } + 
ssize_t r = cs.send(outcoming_bl, more); + if (r < 0) { + ldout(async_msgr->cct, 1) << __func__ << " send error: " << cpp_strerror(r) << dendl; + return r; } - ldout(async_msgr->cct, 20) << __func__ << " sent bytes " << sent_bytes + ldout(async_msgr->cct, 10) << __func__ << " sent bytes " << r << " remaining bytes " << outcoming_bl.length() << dendl; if (!open_write && is_queued()) { if (center->in_thread()) { - center->create_file_event(sd, EVENT_WRITABLE, write_handler); + center->create_file_event(cs.fd(), EVENT_WRITABLE, write_handler); open_write = true; } else { center->dispatch_event_external(write_handler); @@ -372,7 +218,7 @@ ssize_t AsyncConnection::_try_send(bool more) if (open_write && !is_queued()) { if (center->in_thread()) { - center->delete_file_event(sd, EVENT_WRITABLE); + center->delete_file_event(cs.fd(), EVENT_WRITABLE); open_write = false; } else { center->dispatch_event_external(write_handler); @@ -398,10 +244,10 @@ ssize_t AsyncConnection::read_until(unsigned len, char *p) ldout(async_msgr->cct, 25) << __func__ << " len is " << len << " state_offset is " << state_offset << dendl; - if (async_msgr->cct->_conf->ms_inject_socket_failures && sd >= 0) { + if (async_msgr->cct->_conf->ms_inject_socket_failures && cs) { if (rand() % async_msgr->cct->_conf->ms_inject_socket_failures == 0) { ldout(async_msgr->cct, 0) << __func__ << " injecting socket failure" << dendl; - ::shutdown(sd, SHUT_RDWR); + cs.shutdown(); } } @@ -426,7 +272,7 @@ ssize_t AsyncConnection::read_until(unsigned len, char *p) if (len > recv_max_prefetch) { /* this was a large read, we don't prefetch for these */ do { - r = read_bulk(sd, p+state_offset, left); + r = read_bulk(p+state_offset, left); ldout(async_msgr->cct, 25) << __func__ << " read_bulk left is " << left << " got " << r << dendl; if (r < 0) { ldout(async_msgr->cct, 1) << __func__ << " read failed" << dendl; @@ -440,7 +286,7 @@ ssize_t AsyncConnection::read_until(unsigned len, char *p) } while (r > 0); } else { do { - r 
= read_bulk(sd, recv_buf+recv_end, recv_max_prefetch); + r = read_bulk(recv_buf+recv_end, recv_max_prefetch); ldout(async_msgr->cct, 25) << __func__ << " read_bulk recv_end is " << recv_end << " left is " << left << " got " << r << dendl; if (r < 0) { @@ -1011,35 +857,34 @@ ssize_t AsyncConnection::_process_connection() global_seq = async_msgr->get_global_seq(); // close old socket. this is safe because we stopped the reader thread above. - if (sd >= 0) { - center->delete_file_event(sd, EVENT_READABLE|EVENT_WRITABLE); - ::close(sd); + if (cs) { + center->delete_file_event(cs.fd(), EVENT_READABLE|EVENT_WRITABLE); + cs.close(); } - sd = net.nonblock_connect(get_peer_addr()); - if (sd < 0) { + SocketOptions opts; + r = worker->connect(get_peer_addr(), opts, &cs); + if (r < 0) goto fail; - } - center->create_file_event(sd, EVENT_READABLE, read_handler); + center->create_file_event(cs.fd(), EVENT_READABLE, read_handler); state = STATE_CONNECTING_RE; break; } case STATE_CONNECTING_RE: { - r = net.reconnect(get_peer_addr(), sd); + r = cs.is_connected(); if (r < 0) { ldout(async_msgr->cct, 1) << __func__ << " reconnect failed " << dendl; goto fail; - } else if (r > 0) { + } else if (r == 0) { ldout(async_msgr->cct, 10) << __func__ << " nonblock connect inprogress" << dendl; - center->create_file_event(sd, EVENT_WRITABLE, read_handler); + center->create_file_event(cs.fd(), EVENT_WRITABLE, read_handler); break; } - center->delete_file_event(sd, EVENT_WRITABLE); - net.set_priority(sd, async_msgr->get_socket_priority()); + center->delete_file_event(cs.fd(), EVENT_WRITABLE); ldout(async_msgr->cct, 10) << __func__ << " connect successfully, ready to send banner" << dendl; bufferlist bl; @@ -1092,7 +937,7 @@ ssize_t AsyncConnection::_process_connection() goto fail; } ldout(async_msgr->cct, 20) << __func__ << " connect read peer addr " - << paddr << " on socket " << sd << dendl; + << paddr << " on socket " << cs.fd() << dendl; if (peer_addr != paddr) { if (paddr.is_blank_ip() && 
peer_addr.get_port() == paddr.get_port() && peer_addr.get_nonce() == paddr.get_nonce()) { @@ -1139,7 +984,7 @@ ssize_t AsyncConnection::_process_connection() << async_msgr->get_myaddr() << dendl; } else { ldout(async_msgr->cct, 2) << __func__ << " connect couldn't write my addr, " - << cpp_strerror(errno) << dendl; + << cpp_strerror(r) << dendl; goto fail; } @@ -1185,7 +1030,7 @@ ssize_t AsyncConnection::_process_connection() ldout(async_msgr->cct, 10) << __func__ << " continue send reply " << dendl; } else { ldout(async_msgr->cct, 2) << __func__ << " connect couldn't send reply " - << cpp_strerror(errno) << dendl; + << cpp_strerror(r) << dendl; goto fail; } @@ -1340,30 +1185,14 @@ ssize_t AsyncConnection::_process_connection() case STATE_ACCEPTING: { bufferlist bl; - - if (net.set_nonblock(sd) < 0) - goto fail; - - net.set_socket_options(sd, async_msgr->cct->_conf->ms_tcp_nodelay, async_msgr->cct->_conf->ms_tcp_rcvbuf); - net.set_priority(sd, async_msgr->get_socket_priority()); - center->create_file_event(sd, EVENT_READABLE, read_handler); + center->create_file_event(cs.fd(), EVENT_READABLE, read_handler); bl.append(CEPH_BANNER, strlen(CEPH_BANNER)); ::encode(async_msgr->get_myaddr(), bl, 0); // legacy port = async_msgr->get_myaddr().get_port(); - // and peer's socket addr (they might not know their ip) - sockaddr_storage ss; - socklen_t len = sizeof(ss); - r = ::getpeername(sd, (sockaddr*)&ss, &len); - if (r < 0) { - ldout(async_msgr->cct, 0) << __func__ << " failed to getpeername " - << cpp_strerror(errno) << dendl; - goto fail; - } - socket_addr.set_sockaddr((sockaddr*)&ss); ::encode(socket_addr, bl, 0); // legacy - ldout(async_msgr->cct, 1) << __func__ << " sd=" << sd << " " << socket_addr << dendl; + ldout(async_msgr->cct, 1) << __func__ << " sd=" << cs.fd() << " " << socket_addr << dendl; r = try_send(bl); if (r == 0) { @@ -1813,7 +1642,7 @@ ssize_t AsyncConnection::handle_connect_msg(ceph_msg_connect &connect, bufferlis existing->is_reset_from_peer = true; 
} - center->delete_file_event(sd, EVENT_READABLE|EVENT_WRITABLE); + center->delete_file_event(cs.fd(), EVENT_READABLE|EVENT_WRITABLE); // Clean up output buffer existing->outcoming_bl.clear(); @@ -1824,13 +1653,13 @@ ssize_t AsyncConnection::handle_connect_msg(ceph_msg_connect &connect, bufferlis existing->requeue_sent(); existing->reset_recv_state(); - int new_fd = sd; + auto temp_cs = std::move(cs); EventCenter *new_center = center; Worker *new_worker = worker; // avoid _stop shutdown replacing socket - sd = -1; // queue a reset on the new connection, which we're dumping for the old _stop(); + dispatch_queue->queue_reset(this); ldout(async_msgr->cct, 1) << __func__ << " stop myself to swap existing" << dendl; existing->can_write = WriteStatus::REPLACING; @@ -1844,25 +1673,14 @@ ssize_t AsyncConnection::handle_connect_msg(ceph_msg_connect &connect, bufferlis // there shouldn't exist any buffer assert(recv_start == recv_end); - existing->write_lock.unlock(); - // new sd now isn't registered any event while origin events - // have been deleted. - // previous existing->sd now is still open, event will continue to - // notify previous existing->center from now. - // From now, no one will dispatch event to `existing` - // Note: we must use async dispatch instead of execute this inline - // even existing->center == center. 
Because we must ensure below - // event executed after all pending external events like - // "dispatch_state->queue" - existing->center->submit_to( - existing->center->get_id(), - [existing, new_fd, new_worker, new_center, connect, reply, authorizer_reply]() mutable { + auto deactivate_existing = std::bind( + [existing, new_worker, new_center, connect, reply, authorizer_reply](ConnectedSocket &cs) mutable { // we need to delete time event in original thread { std::lock_guard l(existing->lock); if (existing->state == STATE_NONE) { existing->shutdown_socket(); - existing->sd = new_fd; + existing->cs = std::move(cs); existing->worker->references--; new_worker->references++; existing->logger = new_worker->get_perf_counter(); @@ -1871,7 +1689,7 @@ ssize_t AsyncConnection::handle_connect_msg(ceph_msg_connect &connect, bufferlis if (existing->delay_state) existing->delay_state->set_center(new_center); } else if (existing->state == STATE_CLOSED) { - ::close(new_fd); + cs.close(); return ; } else { assert(0); @@ -1881,15 +1699,14 @@ ssize_t AsyncConnection::handle_connect_msg(ceph_msg_connect &connect, bufferlis // Before changing existing->center, it may already exists some events in existing->center's queue. // Then if we mark down `existing`, it will execute in another thread and clean up connection. 
// Previous event will result in segment fault - auto transfer_existing = [existing, new_fd, connect, reply, authorizer_reply]() mutable { + auto transfer_existing = [existing, connect, reply, authorizer_reply]() mutable { std::lock_guard l(existing->lock); if (existing->state == STATE_CLOSED) return ; - assert(new_fd == existing->sd); assert(existing->state == STATE_NONE); - + existing->state = STATE_ACCEPTING_WAIT_CONNECT_MSG; - existing->center->create_file_event(existing->sd, EVENT_READABLE, existing->read_handler); + existing->center->create_file_event(existing->cs.fd(), EVENT_READABLE, existing->read_handler); reply.global_seq = existing->peer_global_seq; if (existing->_reply_accept(CEPH_MSGR_TAG_RETRY_GLOBAL, connect, reply, authorizer_reply) < 0) { // handle error @@ -1901,8 +1718,11 @@ ssize_t AsyncConnection::handle_connect_msg(ceph_msg_connect &connect, bufferlis else existing->center->submit_to( existing->center->get_id(), std::move(transfer_existing), true); - }, true); + }, std::move(temp_cs)); + existing->center->submit_to( + existing->center->get_id(), std::move(deactivate_existing), true); + existing->write_lock.unlock(); existing->lock.unlock(); return 0; } @@ -2012,13 +1832,14 @@ void AsyncConnection::_connect() center->dispatch_event_external(read_handler); } -void AsyncConnection::accept(int incoming) +void AsyncConnection::accept(ConnectedSocket socket, entity_addr_t &addr) { - ldout(async_msgr->cct, 10) << __func__ << " sd=" << incoming << dendl; - assert(sd < 0); + ldout(async_msgr->cct, 10) << __func__ << " sd=" << socket.fd() << dendl; + assert(socket.fd() > 0); std::lock_guard l(lock); - sd = incoming; + cs = std::move(socket); + socket_addr = addr; state = STATE_ACCEPTING; // rescheduler connection in order to avoid lock dep center->dispatch_event_external(read_handler); @@ -2407,7 +2228,7 @@ ssize_t AsyncConnection::write_message(Message *m, bufferlist& bl, bool more) ssize_t rc = _try_send(more); if (rc < 0) { ldout(async_msgr->cct, 1) 
<< __func__ << " error sending " << m << ", " - << cpp_strerror(errno) << dendl; + << cpp_strerror(rc) << dendl; } else if (rc == 0) { ldout(async_msgr->cct, 10) << __func__ << " sending " << m << " done." << dendl; } else { @@ -2617,7 +2438,7 @@ void AsyncConnection::handle_write() if (state == STATE_STANDBY && !policy.server && is_queued()) { ldout(async_msgr->cct, 10) << __func__ << " policy.server is false" << dendl; _connect(); - } else if (sd >= 0 && state != STATE_NONE && state != STATE_CONNECTING && state != STATE_CONNECTING_RE && state != STATE_CLOSED) { + } else if (cs && state != STATE_NONE && state != STATE_CONNECTING && state != STATE_CONNECTING_RE && state != STATE_CLOSED) { r = _try_send(); if (r < 0) { ldout(async_msgr->cct, 1) << __func__ << " send outcoming bl failed" << dendl; diff --git a/src/msg/async/AsyncConnection.h b/src/msg/async/AsyncConnection.h index 3988bf9a744..9d6c3435eef 100644 --- a/src/msg/async/AsyncConnection.h +++ b/src/msg/async/AsyncConnection.h @@ -34,7 +34,7 @@ using namespace std; #include "msg/Messenger.h" #include "Event.h" -#include "net_handler.h" +#include "Stack.h" class AsyncMessenger; class Worker; @@ -50,9 +50,7 @@ static const int ASYNC_IOV_MAX = (IOV_MAX >= 1024 ? 
IOV_MAX / 4 : IOV_MAX); */ class AsyncConnection : public Connection { - ssize_t read_bulk(int fd, char *buf, unsigned len); - void suppress_sigpipe(); - void restore_sigpipe(); + ssize_t read_bulk(char *buf, unsigned len); ssize_t do_sendmsg(struct msghdr &msg, unsigned len, bool more); ssize_t try_send(bufferlist &bl, bool more=false) { std::lock_guard l(write_lock); @@ -110,11 +108,10 @@ class AsyncConnection : public Connection { center->delete_time_event(last_tick_id); last_tick_id = 0; } - if (sd >= 0) { - center->delete_file_event(sd, EVENT_READABLE|EVENT_WRITABLE); - ::shutdown(sd, SHUT_RDWR); - ::close(sd); - sd = -1; + if (cs) { + center->delete_file_event(cs.fd(), EVENT_READABLE|EVENT_WRITABLE); + cs.shutdown(); + cs.close(); } } Message *_get_next_outgoing(bufferlist *bl) { @@ -209,7 +206,7 @@ class AsyncConnection : public Connection { _connect(); } // Only call when AsyncConnection first construct - void accept(int sd); + void accept(ConnectedSocket socket, entity_addr_t &addr); int send_message(Message *m) override; void send_keepalive() override; @@ -303,7 +300,7 @@ class AsyncConnection : public Connection { atomic64_t ack_left, in_seq; int state; int state_after_send; - int sd; + ConnectedSocket cs; int port; Messenger::Policy policy; @@ -372,17 +369,10 @@ class AsyncConnection : public Connection { char *state_buffer; // used only by "read_until" uint64_t state_offset; - NetHandler net; Worker *worker; EventCenter *center; ceph::shared_ptr session_security; -#if !defined(MSG_NOSIGNAL) && !defined(SO_NOSIGPIPE) - sigset_t sigpipe_mask; - bool sigpipe_pending; - bool sigpipe_unblock; -#endif - public: // used by eventcallback void handle_write(); diff --git a/src/msg/async/AsyncMessenger.cc b/src/msg/async/AsyncMessenger.cc index 53eff733e27..95ec0ebd140 100644 --- a/src/msg/async/AsyncMessenger.cc +++ b/src/msg/async/AsyncMessenger.cc @@ -16,19 +16,14 @@ #include "acconfig.h" -#include #include #include #include "AsyncMessenger.h" -#include 
"include/str_list.h" -#include "common/strtol.h" #include "common/config.h" #include "common/Timer.h" #include "common/errno.h" -#include "auth/Crypto.h" -#include "include/Spinlock.h" #define dout_subsys ceph_subsys_ms #undef dout_prefix @@ -41,14 +36,6 @@ static ostream& _prefix(std::ostream *_dout, Processor *p) { return *_dout << " Processor -- "; } -static ostream& _prefix(std::ostream *_dout, Worker *w) { - return *_dout << " Worker -- "; -} - -static ostream& _prefix(std::ostream *_dout, WorkerPool *p) { - return *_dout << " WorkerPool -- "; -} - /******************* * Processor @@ -64,13 +51,9 @@ class Processor::C_processor_accept : public EventCallback { } }; -Processor::Processor(AsyncMessenger *r, CephContext *c, uint64_t n) - : msgr(r), - net(c), - worker(NULL), - listen_sd(-1), - nonce(n), - listen_handler(new C_processor_accept(this)) {} +Processor::Processor(AsyncMessenger *r, Worker *w, CephContext *c, uint64_t n) + : msgr(r), net(c), worker(w), nonce(n), + listen_handler(new C_processor_accept(this)) {} int Processor::bind(const entity_addr_t &bind_addr, const set& avoid_ports) { @@ -90,30 +73,16 @@ int Processor::bind(const entity_addr_t &bind_addr, const set& avoid_ports) family = conf->ms_bind_ipv6 ? 
AF_INET6 : AF_INET; } - /* socket creation */ - listen_sd = ::socket(family, SOCK_STREAM, 0); - if (listen_sd < 0) { - lderr(msgr->cct) << __func__ << " unable to create socket: " - << cpp_strerror(errno) << dendl; - return -errno; - } - - int r = net.set_nonblock(listen_sd); - if (r < 0) { - ::close(listen_sd); - listen_sd = -1; - return r; - } - net.set_close_on_exec(listen_sd); - net.set_socket_options(listen_sd, msgr->cct->_conf->ms_tcp_nodelay, msgr->cct->_conf->ms_tcp_rcvbuf); + SocketOptions opts; + opts.nodelay = msgr->cct->_conf->ms_tcp_nodelay; + opts.rcbuf_size = msgr->cct->_conf->ms_tcp_rcvbuf; // use whatever user specified (if anything) entity_addr_t listen_addr = bind_addr; listen_addr.set_family(family); /* bind to port */ - int rc = -1; - r = -1; + int r = -1; for (int i = 0; i < conf->ms_bind_retry_count; i++) { if (i > 0) { @@ -123,22 +92,12 @@ int Processor::bind(const entity_addr_t &bind_addr, const set& avoid_ports) } if (listen_addr.get_port()) { - // specific port - // reuse addr+port when possible - int on = 1; - rc = ::setsockopt(listen_sd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on)); - if (rc < 0) { - lderr(msgr->cct) << __func__ << " unable to setsockopt: " << cpp_strerror(errno) << dendl; - r = -errno; - continue; - } - - rc = ::bind(listen_sd, listen_addr.get_sockaddr(), - listen_addr.get_sockaddr_len()); - if (rc < 0) { + worker->center.submit_to(worker->center.get_id(), [this, &listen_addr, &opts, &r]() { + r = worker->listen(listen_addr, opts, &listen_socket); + }, false); + if (r < 0) { lderr(msgr->cct) << __func__ << " unable to bind to " << listen_addr - << ": " << cpp_strerror(errno) << dendl; - r = -errno; + << ": " << cpp_strerror(r) << dendl; continue; } } else { @@ -148,60 +107,34 @@ int Processor::bind(const entity_addr_t &bind_addr, const set& avoid_ports) continue; listen_addr.set_port(port); - rc = ::bind(listen_sd, listen_addr.get_sockaddr(), - listen_addr.get_sockaddr_len()); - if (rc == 0) + 
worker->center.submit_to(worker->center.get_id(), [this, &listen_addr, &opts, &r]() { + r = worker->listen(listen_addr, opts, &listen_socket); + }, false); + if (r == 0) break; } - if (rc < 0) { + if (r < 0) { lderr(msgr->cct) << __func__ << " unable to bind to " << listen_addr << " on any port in range " << msgr->cct->_conf->ms_bind_port_min << "-" << msgr->cct->_conf->ms_bind_port_max << ": " - << cpp_strerror(errno) << dendl; - r = -errno; + << cpp_strerror(r) << dendl; listen_addr.set_port(0); // Clear port before retry, otherwise we shall fail again. continue; } ldout(msgr->cct, 10) << __func__ << " bound on random port " << listen_addr << dendl; } - if (rc == 0) + if (r == 0) break; } // It seems that binding completely failed, return with that exit status - if (rc < 0) { + if (r < 0) { lderr(msgr->cct) << __func__ << " was unable to bind after " << conf->ms_bind_retry_count - << " attempts: " << cpp_strerror(errno) << dendl; - ::close(listen_sd); - listen_sd = -1; + << " attempts: " << cpp_strerror(r) << dendl; return r; } - // what port did we get? - sockaddr_storage ss; - socklen_t llen = sizeof(ss); - rc = getsockname(listen_sd, (sockaddr*)&ss, &llen); - if (rc < 0) { - rc = -errno; - lderr(msgr->cct) << __func__ << " failed getsockname: " << cpp_strerror(rc) << dendl; - ::close(listen_sd); - listen_sd = -1; - return rc; - } - listen_addr.set_sockaddr((sockaddr*)&ss); - ldout(msgr->cct, 10) << __func__ << " bound to " << listen_addr << dendl; - // listen! 
- rc = ::listen(listen_sd, 128); - if (rc < 0) { - rc = -errno; - lderr(msgr->cct) << __func__ << " unable to listen on " << listen_addr - << ": " << cpp_strerror(rc) << dendl; - ::close(listen_sd); - listen_sd = -1; - return rc; - } - msgr->set_myaddr(bind_addr); if (bind_addr != entity_addr_t()) msgr->learned_addr(bind_addr); @@ -237,47 +170,48 @@ int Processor::rebind(const set& avoid_ports) return bind(addr, new_avoid); } -void Processor::start(Worker *w) +void Processor::start() { - ldout(msgr->cct, 1) << __func__ << " " << dendl; + ldout(msgr->cct, 1) << __func__ << dendl; // start thread - if (listen_sd >= 0) { - worker = w; + if (listen_socket) { worker->center.submit_to(worker->center.get_id(), [this]() { - worker->center.create_file_event(listen_sd, EVENT_READABLE, listen_handler); }); + worker->center.create_file_event(listen_socket.fd(), EVENT_READABLE, listen_handler); }, false); } } void Processor::accept() { - ldout(msgr->cct, 10) << __func__ << " listen_sd=" << listen_sd << dendl; + ldout(msgr->cct, 10) << __func__ << " listen_fd=" << listen_socket.fd() << dendl; + SocketOptions opts; + opts.nodelay = msgr->cct->_conf->ms_tcp_nodelay; + opts.rcbuf_size = msgr->cct->_conf->ms_tcp_rcvbuf; while (true) { - sockaddr_storage ss; - socklen_t slen = sizeof(ss); - int sd = ::accept(listen_sd, (sockaddr*)&ss, &slen); - if (sd >= 0) { - net.set_close_on_exec(sd); - ldout(msgr->cct, 10) << __func__ << " accepted incoming on sd " << sd << dendl; - - msgr->add_accept(sd); + entity_addr_t addr; + ConnectedSocket cli_socket; + int r = listen_socket.accept(&cli_socket, opts, &addr); + if (r == 0) { + ldout(msgr->cct, 10) << __func__ << " accepted incoming on sd " << cli_socket.fd() << dendl; + + msgr->add_accept(worker, std::move(cli_socket), addr); continue; } else { - if (errno == EINTR) { + if (r == -EINTR) { continue; - } else if (errno == EAGAIN) { + } else if (r == -EAGAIN) { break; - } else if (errno == EMFILE || errno == ENFILE) { - lderr(msgr->cct) << 
__func__ << " open file descriptions limit reached sd = " << sd - << " errno " << errno << " " << cpp_strerror(errno) << dendl; + } else if (r == -EMFILE || r == -ENFILE) { + lderr(msgr->cct) << __func__ << " open file descriptions limit reached sd = " << listen_socket.fd() + << " errno " << r << " " << cpp_strerror(r) << dendl; break; - } else if (errno == ECONNABORTED) { - ldout(msgr->cct, 0) << __func__ << " it was closed because of rst arrived sd = " << sd - << " errno " << errno << " " << cpp_strerror(errno) << dendl; + } else if (r == -ECONNABORTED) { + ldout(msgr->cct, 0) << __func__ << " it was closed because of rst arrived sd = " << listen_socket.fd() + << " errno " << r << " " << cpp_strerror(r) << dendl; continue; } else { - lderr(msgr->cct) << __func__ << " no incoming connection? sd = " << sd - << " errno " << errno << " " << cpp_strerror(errno) << dendl; + lderr(msgr->cct) << __func__ << " no incoming connection?" + << " errno " << r << " " << cpp_strerror(r) << dendl; break; } } @@ -288,208 +222,25 @@ void Processor::stop() { ldout(msgr->cct,10) << __func__ << dendl; - if (listen_sd >= 0) { + if (listen_socket) { worker->center.submit_to(worker->center.get_id(), [this]() { - worker->center.delete_file_event(listen_sd, EVENT_READABLE); - ::shutdown(listen_sd, SHUT_RDWR); - ::close(listen_sd); - listen_sd = -1; - }); - } -} - -void Worker::stop() -{ - ldout(cct, 10) << __func__ << dendl; - done = true; - center.wakeup(); -} - -class WorkerPool { - CephContext *cct; - vector workers; - vector coreids; - // Used to indicate whether thread started - bool started; - Mutex barrier_lock; - Cond barrier_cond; - atomic_t barrier_count; - simple_spinlock_t pool_spin = SIMPLE_SPINLOCK_INITIALIZER; - - class C_barrier : public EventCallback { - WorkerPool *pool; - public: - explicit C_barrier(WorkerPool *p): pool(p) {} - void do_request(int id) { - Mutex::Locker l(pool->barrier_lock); - pool->barrier_count.dec(); - pool->barrier_cond.Signal(); - delete this; - } 
- }; - friend class C_barrier; - public: - std::atomic_uint pending; - explicit WorkerPool(CephContext *c); - WorkerPool(const WorkerPool &) = delete; - WorkerPool& operator=(const WorkerPool &) = delete; - virtual ~WorkerPool(); - void start(); - Worker *get_worker(); - int get_cpuid(int id) { - if (coreids.empty()) - return -1; - return coreids[id % coreids.size()]; - } - void barrier(); -}; - -void *Worker::entry() -{ - ldout(cct, 10) << __func__ << " starting" << dendl; - if (cct->_conf->ms_async_set_affinity) { - int cid = pool->get_cpuid(id); - if (cid >= 0 && set_affinity(cid)) { - ldout(cct, 0) << __func__ << " sched_setaffinity failed: " - << cpp_strerror(errno) << dendl; - } - } - - center.set_owner(); - pool->pending--; - while (!done) { - ldout(cct, 20) << __func__ << " calling event process" << dendl; - - int r = center.process_events(EventMaxWaitUs); - if (r < 0) { - ldout(cct, 20) << __func__ << " process events failed: " - << cpp_strerror(errno) << dendl; - // TODO do something? 
- } - } - - return 0; -} - -/******************* - * WorkerPool - *******************/ -WorkerPool::WorkerPool(CephContext *c): cct(c), started(false), - barrier_lock("WorkerPool::WorkerPool::barrier_lock"), - barrier_count(0), pending(0) -{ - assert(cct->_conf->ms_async_op_threads > 0); - // make sure user won't try to force some crazy number of worker threads - assert(cct->_conf->ms_async_max_op_threads >= cct->_conf->ms_async_op_threads && - cct->_conf->ms_async_op_threads <= 32); - for (int i = 0; i < cct->_conf->ms_async_op_threads; ++i) { - Worker *w = new Worker(cct, this, i); - workers.push_back(w); - } - vector corestrs; - get_str_vec(cct->_conf->ms_async_affinity_cores, corestrs); - for (vector::iterator it = corestrs.begin(); - it != corestrs.end(); ++it) { - string err; - int coreid = strict_strtol(it->c_str(), 10, &err); - if (err == "") - coreids.push_back(coreid); - else - lderr(cct) << __func__ << " failed to parse " << *it << " in " << cct->_conf->ms_async_affinity_cores << dendl; - } - -} - -WorkerPool::~WorkerPool() -{ - for (uint64_t i = 0; i < workers.size(); ++i) { - if (workers[i]->is_started()) { - workers[i]->stop(); - workers[i]->join(); - } - delete workers[i]; - } -} - -void WorkerPool::start() -{ - if (!started) { - for (uint64_t i = 0; i < workers.size(); ++i) { - pending++; - workers[i]->create("ms_async_worker"); - } - started = true; + worker->center.delete_file_event(listen_socket.fd(), EVENT_READABLE); + listen_socket.abort_accept(); + }, false); } - while (pending) - usleep(50); } -Worker* WorkerPool::get_worker() -{ - ldout(cct, 10) << __func__ << dendl; - - // start with some reasonably large number - unsigned min_load = std::numeric_limits::max(); - Worker* current_best = nullptr; - - simple_spin_lock(&pool_spin); - // find worker with least references - // tempting case is returning on references == 0, but in reality - // this will happen so rarely that there's no need for special case. 
- for (auto p = workers.begin(); p != workers.end(); ++p) { - unsigned worker_load = (*p)->references.load(); - ldout(cct, 20) << __func__ << " Worker " << *p << " load: " << worker_load << dendl; - if (worker_load < min_load) { - current_best = *p; - min_load = worker_load; - } - } - // if minimum load exceeds amount of workers, make a new worker - // logic behind this is that we're not going to create new worker - // just because others have *some* load, we'll defer worker creation - // until others have *plenty* of load. This will cause new worker - // to get assigned to all new connections *unless* one or more - // of workers get their load reduced - in that case, this worker - // will be assigned to new connection. - // TODO: add more logic and heuristics, so connections known to be - // of light workload (heartbeat service, etc.) won't overshadow - // heavy workload (clients, etc). - if (!current_best || ((workers.size() < (unsigned)cct->_conf->ms_async_max_op_threads) - && (min_load > workers.size()))) { - ldout(cct, 20) << __func__ << " creating worker" << dendl; - current_best = new Worker(cct, this, workers.size()); - workers.push_back(current_best); - pending++; - current_best->create("ms_async_worker"); - } else { - ldout(cct, 20) << __func__ << " picked " << current_best - << " as best worker with load " << min_load << dendl; +struct StackSingleton { + std::shared_ptr stack; + StackSingleton(CephContext *c) { + stack = NetworkStack::create(c, c->_conf->ms_async_transport_type); } - - ++current_best->references; - simple_spin_unlock(&pool_spin); - - while (pending) - usleep(50); - assert(current_best); - return current_best; -} - -void WorkerPool::barrier() -{ - ldout(cct, 10) << __func__ << " started." 
<< dendl; - for (vector::iterator it = workers.begin(); it != workers.end(); ++it) { - barrier_count.inc(); - (*it)->center.dispatch_event_external(EventCallbackRef(new C_barrier(this))); + ~StackSingleton() { + stack->stop(); } - ldout(cct, 10) << __func__ << " wait for " << barrier_count.read() << " barrier" << dendl; - Mutex::Locker l(barrier_lock); - while (barrier_count.read()) - barrier_cond.Wait(barrier_lock); +}; - ldout(cct, 10) << __func__ << " end." << dendl; -} class C_handle_reap : public EventCallback { AsyncMessenger *msgr; @@ -509,7 +260,6 @@ class C_handle_reap : public EventCallback { AsyncMessenger::AsyncMessenger(CephContext *cct, entity_name_t name, string mname, uint64_t _nonce, uint64_t features) : SimplePolicyMessenger(cct, name,mname, _nonce), - processor(this, cct, _nonce), dispatch_queue(cct, this, mname), lock("AsyncMessenger::lock"), nonce(_nonce), need_addr(true), did_bind(false), @@ -517,13 +267,20 @@ AsyncMessenger::AsyncMessenger(CephContext *cct, entity_name_t name, cluster_protocol(0), stopped(true) { ceph_spin_init(&global_seq_lock); - // uniq name for CephContext to distinguish from other objects - cct->lookup_or_create_singleton_object(pool, "AsyncMessenger::WorkerPool"); - local_worker = pool->get_worker(); + StackSingleton *single; + cct->lookup_or_create_singleton_object(single, "AsyncMessenger::NetworkStack"); + stack = single->stack.get(); + stack->start(); + local_worker = stack->get_worker(); local_connection = new AsyncConnection(cct, this, &dispatch_queue, local_worker); local_features = features; init_local_connection(); reap_handler = new C_handle_reap(this); + unsigned processor_num = 1; + if (stack->support_local_listen_table()) + processor_num = stack->get_num_worker(); + for (unsigned i = 0; i < processor_num; ++i) + processors.push_back(new Processor(this, stack->get_worker(i), cct, _nonce)); } /** @@ -535,6 +292,8 @@ AsyncMessenger::~AsyncMessenger() delete reap_handler; assert(!did_bind); // either we didn't 
bind or we shut down the Processor local_connection->mark_down(); + for (auto &&p : processors) + delete p; } void AsyncMessenger::ready() @@ -542,9 +301,8 @@ void AsyncMessenger::ready() ldout(cct,10) << __func__ << " " << get_myaddr() << dendl; Mutex::Locker l(lock); - pool->start(); - Worker *w = pool->get_worker(); - processor.start(w); + for (auto &&p : processors) + p->start(); dispatch_queue.start(); } @@ -552,17 +310,18 @@ int AsyncMessenger::shutdown() { ldout(cct,10) << __func__ << " " << get_myaddr() << dendl; + // done! clean up. + for (auto &&p : processors) + p->stop(); mark_down_all(); // break ref cycles on the loopback connection local_connection->set_priv(NULL); - // done! clean up. - processor.stop(); did_bind = false; lock.Lock(); stop_cond.Signal(); stopped = true; lock.Unlock(); - pool->barrier(); + stack->drain(); return 0; } @@ -580,7 +339,25 @@ int AsyncMessenger::bind(const entity_addr_t &bind_addr) // bind to a socket set avoid_ports; - int r = processor.bind(bind_addr, avoid_ports); + int r = 0; + unsigned i = 0; + for (auto &&p : processors) { + r = p->bind(bind_addr, avoid_ports); + if (r < 0) { + // Note: this is related to the local tcp listen table problem. + // Posix (the default kernel implementation) backend shares the listen table + // in the kernel, so all threads can use the same listen table naturally + // and only one thread needs to bind. But other backends (like dpdk) use a local + // listen table, so we need to bind/listen the tcp port for each worker. So if the + // first worker failed to bind, it can be treated as a normal error and handled, + // like the port-already-in-use case. 
But if the first worker binds successfully + but the second worker fails, that is not expected and we need to assert + here + assert(i == 0); + break; + } + ++i; + } if (r >= 0) did_bind = true; return r; } @@ -591,12 +368,20 @@ int AsyncMessenger::rebind(const set& avoid_ports) ldout(cct,1) << __func__ << " rebind avoid " << avoid_ports << dendl; assert(did_bind); - processor.stop(); + for (auto &&p : processors) + p->stop(); mark_down_all(); - int r = processor.rebind(avoid_ports); - if (r == 0) { - Worker *w = pool->get_worker(); - processor.start(w); + unsigned i = 0; + int r = 0; + for (auto &&p : processors) { + r = p->rebind(avoid_ports); + if (r == 0) { + p->start(); + } else { + assert(i == 0); + break; + } + i++; } return r; } @@ -644,19 +429,20 @@ void AsyncMessenger::wait() // close all connections shutdown_connections(false); - pool->barrier(); + stack->drain(); ldout(cct, 10) << __func__ << ": done." << dendl; ldout(cct, 1) << __func__ << " complete." << dendl; started = false; } -AsyncConnectionRef AsyncMessenger::add_accept(int sd) +AsyncConnectionRef AsyncMessenger::add_accept(Worker *w, ConnectedSocket cli_socket, entity_addr_t &addr) { lock.Lock(); - Worker *w = pool->get_worker(); + if (!stack->support_local_listen_table()) + w = stack->get_worker(); AsyncConnectionRef conn = new AsyncConnection(cct, this, &dispatch_queue, w); - conn->accept(sd); + conn->accept(std::move(cli_socket), addr); accepting_conns.insert(conn); lock.Unlock(); return conn; } @@ -671,7 +457,7 @@ AsyncConnectionRef AsyncMessenger::create_connect(const entity_addr_t& addr, int << ", creating connection and registering" << dendl; // create connection - Worker *w = pool->get_worker(); + Worker *w = stack->get_worker(); AsyncConnectionRef conn = new AsyncConnection(cct, this, &dispatch_queue, w); conn->connect(addr, type); assert(!conns.count(addr)); diff --git a/src/msg/async/AsyncMessenger.h b/src/msg/async/AsyncMessenger.h index 8125cff2f91..47898f49d91 100644 --- 
a/src/msg/async/AsyncMessenger.h +++ b/src/msg/async/AsyncMessenger.h @@ -36,70 +36,9 @@ using namespace std; #include "include/assert.h" #include "AsyncConnection.h" #include "Event.h" -#include "common/simple_spin.h" class AsyncMessenger; -class WorkerPool; - -enum { - l_msgr_first = 94000, - l_msgr_recv_messages, - l_msgr_send_messages, - l_msgr_send_messages_inline, - l_msgr_recv_bytes, - l_msgr_send_bytes, - l_msgr_created_connections, - l_msgr_active_connections, - l_msgr_last, -}; - - -class Worker : public Thread { - static const uint64_t InitEventNumber = 5000; - static const uint64_t EventMaxWaitUs = 30000000; - CephContext *cct; - WorkerPool *pool; - bool done; - int id; - PerfCounters *perf_logger; - - public: - EventCenter center; - std::atomic_uint references; - Worker(CephContext *c, WorkerPool *p, int i) - : cct(c), pool(p), done(false), id(i), perf_logger(NULL), center(c), references(0) { - center.init(InitEventNumber, i); - char name[128]; - sprintf(name, "AsyncMessenger::Worker-%d", id); - // initialize perf_logger - PerfCountersBuilder plb(cct, name, l_msgr_first, l_msgr_last); - - plb.add_u64_counter(l_msgr_recv_messages, "msgr_recv_messages", "Network received messages"); - plb.add_u64_counter(l_msgr_send_messages, "msgr_send_messages", "Network sent messages"); - plb.add_u64_counter(l_msgr_send_messages_inline, "msgr_send_messages_inline", "Network sent inline messages"); - plb.add_u64_counter(l_msgr_recv_bytes, "msgr_recv_bytes", "Network received bytes"); - plb.add_u64_counter(l_msgr_send_bytes, "msgr_send_bytes", "Network received bytes"); - plb.add_u64_counter(l_msgr_created_connections, "msgr_created_connections", "Created connection number"); - plb.add_u64_counter(l_msgr_active_connections, "msgr_active_connections", "Active connection number"); - - perf_logger = plb.create_perf_counters(); - cct->get_perfcounters_collection()->add(perf_logger); - } - ~Worker() { - if (perf_logger) { - 
cct->get_perfcounters_collection()->remove(perf_logger); - delete perf_logger; - } - } - void *entry(); - void stop(); - PerfCounters *get_perf_counter() { return perf_logger; } - void release_worker() { - int oldref = references.fetch_sub(1); - assert(oldref > 0); - } -}; /** * If the Messenger binds to a specific address, the Processor runs @@ -109,20 +48,20 @@ class Processor { AsyncMessenger *msgr; NetHandler net; Worker *worker; - int listen_sd; + ServerSocket listen_socket; uint64_t nonce; EventCallbackRef listen_handler; class C_processor_accept; public: - Processor(AsyncMessenger *r, CephContext *c, uint64_t n); + Processor(AsyncMessenger *r, Worker *w, CephContext *c, uint64_t n); ~Processor() { delete listen_handler; }; void stop(); int bind(const entity_addr_t &bind_addr, const set& avoid_ports); int rebind(const set& avoid_port); - void start(Worker *w); + void start(); void accept(); }; @@ -276,9 +215,8 @@ private: private: static const uint64_t ReapDeadConnectionThreshold = 5; - WorkerPool *pool; - - Processor processor; + NetworkStack *stack; + std::vector processors; friend class Processor; DispatchQueue dispatch_queue; @@ -413,7 +351,7 @@ public: } void learned_addr(const entity_addr_t &peer_addr_for_me); - AsyncConnectionRef add_accept(int sd); + AsyncConnectionRef add_accept(Worker *w, ConnectedSocket cli_socket, entity_addr_t &addr); /** * This wraps ms_deliver_get_authorizer. We use it for AsyncConnection.