]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
msg/async/AsyncConnection: support NetworkStack api instead of posix
authorHaomai Wang <haomai@xsky.com>
Wed, 13 Jul 2016 07:59:12 +0000 (15:59 +0800)
committerHaomai Wang <haomai@xsky.com>
Tue, 16 Aug 2016 15:17:55 +0000 (23:17 +0800)
1. replace sd to ConnectedSocket
2. Replace WorkerPool with Stack
3. Use Stack worker

Signed-off-by: Haomai Wang <haomai@xsky.com>
src/common/config_opts.h
src/msg/async/AsyncConnection.cc
src/msg/async/AsyncConnection.h
src/msg/async/AsyncMessenger.cc
src/msg/async/AsyncMessenger.h

index 72cee895d64e742dbed67d7158f52a59977b3e9f..b42bab3ffdd1ce429aab8cf92d3a41a094bfb726 100644 (file)
@@ -197,8 +197,9 @@ OPTION(ms_inject_delay_probability, OPT_DOUBLE, 0) // range [0, 1]
 OPTION(ms_inject_internal_delays, OPT_DOUBLE, 0)   // seconds
 OPTION(ms_dump_on_send, OPT_BOOL, false)           // hexdump msg to log on send
 OPTION(ms_dump_corrupt_message_level, OPT_INT, 1)  // debug level to hexdump undecodeable messages at
-OPTION(ms_async_op_threads, OPT_INT, 3)            // number of worker processing threads for async messenger created on init
-OPTION(ms_async_max_op_threads, OPT_INT, 5)        // max number of worker processing threads for async messenger
+OPTION(ms_async_transport_type, OPT_STR, "posix")
+OPTION(ms_async_op_threads, OPT_U64, 3)            // number of worker processing threads for async messenger created on init
+OPTION(ms_async_max_op_threads, OPT_U64, 5)        // max number of worker processing threads for async messenger
 OPTION(ms_async_set_affinity, OPT_BOOL, true)
 // example: ms_async_affinity_cores = 0,1
 // The number of coreset is expected to equal to ms_async_op_threads, otherwise
index 80756a0e2782593ea17af4a760f7628c94ee3ff6..5012e92df0328b6164c6d93677ed9b4385038125 100644 (file)
@@ -14,8 +14,6 @@
  *
  */
 
-#include <sys/types.h>
-#include <sys/socket.h>
 #include <unistd.h>
 
 #include "include/Context.h"
@@ -23,8 +21,6 @@
 #include "AsyncMessenger.h"
 #include "AsyncConnection.h"
 
-#include "include/sock_compat.h"
-
 // Constant to limit starting sequence number to 2^31.  Nothing special about it, just a big number.  PLR
 #define SEQ_MASK  0x7fffffff 
 
@@ -32,8 +28,9 @@
 #undef dout_prefix
 #define dout_prefix _conn_prefix(_dout)
 ostream& AsyncConnection::_conn_prefix(std::ostream *_dout) {
+  int fd = cs ? cs.fd() : -1;
   return *_dout << "-- " << async_msgr->get_myinst().addr << " >> " << peer_addr << " conn(" << this
-                << " sd=" << sd << " :" << port
+                << " sd=" << fd << " :" << port
                 << " s=" << get_state_name(state)
                 << " pgs=" << peer_global_seq
                 << " cs=" << connect_seq
@@ -122,7 +119,7 @@ AsyncConnection::AsyncConnection(CephContext *cct, AsyncMessenger *m, DispatchQu
                                  Worker *w)
   : Connection(cct, m), delay_state(NULL), async_msgr(m), conn_id(q->get_id()),
     logger(w->get_perf_counter()), global_seq(0), connect_seq(0), peer_global_seq(0),
-    out_seq(0), ack_left(0), in_seq(0), state(STATE_NONE), state_after_send(STATE_NONE), sd(-1), port(-1),
+    out_seq(0), ack_left(0), in_seq(0), state(STATE_NONE), state_after_send(STATE_NONE), port(-1),
     dispatch_queue(q), can_write(WriteStatus::NOWRITE),
     open_write(false), keepalive(false), recv_buf(NULL),
     recv_max_prefetch(MAX(msgr->cct->_conf->ms_tcp_prefetch_max_size, TCP_PREFETCH_MIN_SIZE)),
@@ -130,7 +127,7 @@ AsyncConnection::AsyncConnection(CephContext *cct, AsyncMessenger *m, DispatchQu
     last_active(ceph::coarse_mono_clock::now()),
     inactive_timeout_us(cct->_conf->ms_tcp_read_timeout*1000*1000),
     got_bad_auth(false), authorizer(NULL), replacing(false),
-    is_reset_from_peer(false), once_ready(false), state_buffer(NULL), state_offset(0), net(cct),
+    is_reset_from_peer(false), once_ready(false), state_buffer(NULL), state_offset(0),
     worker(w), center(&w->center)
 {
   read_handler = new C_handle_read(this);
@@ -167,203 +164,52 @@ void AsyncConnection::maybe_start_delay_thread()
 
 /* return -1 means `fd` occurs error or closed, it should be closed
  * return 0 means EAGAIN or EINTR */
-ssize_t AsyncConnection::read_bulk(int fd, char *buf, unsigned len)
+ssize_t AsyncConnection::read_bulk(char *buf, unsigned len)
 {
   ssize_t nread;
  again:
-  nread = ::read(fd, buf, len);
-  if (nread == -1) {
-    if (errno == EAGAIN) {
+  nread = cs.read(buf, len);
+  if (nread < 0) {
+    if (nread == -EAGAIN) {
       nread = 0;
-    } else if (errno == EINTR) {
+    } else if (nread == -EINTR) {
       goto again;
     } else {
-      ldout(async_msgr->cct, 1) << __func__ << " reading from fd=" << fd
-                          << " : "<< strerror(errno) << dendl;
+      ldout(async_msgr->cct, 1) << __func__ << " reading from fd=" << cs.fd()
+                          << " : "<< strerror(nread) << dendl;
       return -1;
     }
   } else if (nread == 0) {
     ldout(async_msgr->cct, 1) << __func__ << " peer close file descriptor "
-                              << fd << dendl;
+                              << cs.fd() << dendl;
     return -1;
   }
   return nread;
 }
 
-/* 
- SIGPIPE suppression - for platforms without SO_NOSIGPIPE or MSG_NOSIGNAL
-  http://krokisplace.blogspot.in/2010/02/suppressing-sigpipe-in-library.html 
-  http://www.microhowto.info/howto/ignore_sigpipe_without_affecting_other_threads_in_a_process.html 
-*/
-void AsyncConnection::suppress_sigpipe()
-{
-#if !defined(MSG_NOSIGNAL) && !defined(SO_NOSIGPIPE)
-  /*
-    We want to ignore possible SIGPIPE that we can generate on write.
-    SIGPIPE is delivered *synchronously* and *only* to the thread
-    doing the write.  So if it is reported as already pending (which
-    means the thread blocks it), then we do nothing: if we generate
-    SIGPIPE, it will be merged with the pending one (there's no
-    queuing), and that suits us well.  If it is not pending, we block
-    it in this thread (and we avoid changing signal action, because it
-    is per-process).
-  */
-  sigset_t pending;
-  sigemptyset(&pending);
-  sigpending(&pending);
-  sigpipe_pending = sigismember(&pending, SIGPIPE);
-  if (!sigpipe_pending) {
-    sigset_t blocked;
-    sigemptyset(&blocked);
-    pthread_sigmask(SIG_BLOCK, &sigpipe_mask, &blocked);
-
-    /* Maybe is was blocked already?  */
-    sigpipe_unblock = ! sigismember(&blocked, SIGPIPE);
-  }
-#endif /* !defined(MSG_NOSIGNAL) && !defined(SO_NOSIGPIPE) */
-}
-
-
-void AsyncConnection::restore_sigpipe()
-{
-#if !defined(MSG_NOSIGNAL) && !defined(SO_NOSIGPIPE)
-  /*
-    If SIGPIPE was pending already we do nothing.  Otherwise, if it
-    become pending (i.e., we generated it), then we sigwait() it (thus
-    clearing pending status).  Then we unblock SIGPIPE, but only if it
-    were us who blocked it.
-  */
-  if (!sigpipe_pending) {
-    sigset_t pending;
-    sigemptyset(&pending);
-    sigpending(&pending);
-    if (sigismember(&pending, SIGPIPE)) {
-      /*
-        Protect ourselves from a situation when SIGPIPE was sent
-        by the user to the whole process, and was delivered to
-        other thread before we had a chance to wait for it.
-      */
-      static const struct timespec nowait = { 0, 0 };
-      TEMP_FAILURE_RETRY(sigtimedwait(&sigpipe_mask, NULL, &nowait));
-    }
-
-    if (sigpipe_unblock)
-      pthread_sigmask(SIG_UNBLOCK, &sigpipe_mask, NULL);
-  }
-#endif  /* !defined(MSG_NOSIGNAL) && !defined(SO_NOSIGPIPE) */
-}
-
-// return the length of msg needed to be sent,
-// < 0 means error occured
-ssize_t AsyncConnection::do_sendmsg(struct msghdr &msg, unsigned len, bool more)
-{
-  suppress_sigpipe();
-
-  while (len > 0) {
-    ssize_t r;
-#if defined(MSG_NOSIGNAL)
-    r = ::sendmsg(sd, &msg, MSG_NOSIGNAL | (more ? MSG_MORE : 0));
-#else
-    r = ::sendmsg(sd, &msg, (more ? MSG_MORE : 0));
-#endif /* defined(MSG_NOSIGNAL) */
-
-    if (r == 0) {
-      ldout(async_msgr->cct, 10) << __func__ << " sendmsg got r==0!" << dendl;
-    } else if (r < 0) {
-      if (errno == EINTR) {
-        continue;
-      } else if (errno == EAGAIN) {
-        break;
-      } else {
-        ldout(async_msgr->cct, 1) << __func__ << " sendmsg error: " << cpp_strerror(errno) << dendl;
-        restore_sigpipe();
-        return r;
-      }
-    }
-
-    len -= r;
-    if (len == 0) break;
-
-    // hrmph. drain r bytes from the front of our message.
-    ldout(async_msgr->cct, 20) << __func__ << " short write did " << r << ", still have " << len << dendl;
-    while (r > 0) {
-      if (msg.msg_iov[0].iov_len <= (size_t)r) {
-        // drain this whole item
-        r -= msg.msg_iov[0].iov_len;
-        msg.msg_iov++;
-        msg.msg_iovlen--;
-      } else {
-        msg.msg_iov[0].iov_base = (char *)msg.msg_iov[0].iov_base + r;
-        msg.msg_iov[0].iov_len -= r;
-        break;
-      }
-    }
-  }
-  restore_sigpipe();
-  return (ssize_t)len;
-}
-
 // return the remaining bytes, it may larger than the length of ptr
 // else return < 0 means error
 ssize_t AsyncConnection::_try_send(bool more)
 {
-  if (async_msgr->cct->_conf->ms_inject_socket_failures && sd >= 0) {
+  if (async_msgr->cct->_conf->ms_inject_socket_failures && cs) {
     if (rand() % async_msgr->cct->_conf->ms_inject_socket_failures == 0) {
       ldout(async_msgr->cct, 0) << __func__ << " injecting socket failure" << dendl;
-      ::shutdown(sd, SHUT_RDWR);
-    }
-  }
-
-  uint64_t sent_bytes = 0;
-  list<bufferptr>::const_iterator pb = outcoming_bl.buffers().begin();
-  uint64_t left_pbrs = outcoming_bl.buffers().size();
-  while (left_pbrs) {
-    struct msghdr msg;
-    uint64_t size = MIN(left_pbrs, ASYNC_IOV_MAX);
-    left_pbrs -= size;
-    memset(&msg, 0, sizeof(msg));
-    msg.msg_iovlen = 0;
-    msg.msg_iov = msgvec;
-    unsigned msglen = 0;
-    while (size > 0) {
-      msgvec[msg.msg_iovlen].iov_base = (void*)(pb->c_str());
-      msgvec[msg.msg_iovlen].iov_len = pb->length();
-      msg.msg_iovlen++;
-      msglen += pb->length();
-      ++pb;
-      size--;
+      cs.shutdown();
     }
-
-    ssize_t r = do_sendmsg(msg, msglen, left_pbrs || more);
-    if (r < 0)
-      return r;
-
-    // "r" is the remaining length
-    sent_bytes += msglen - r;
-    if (r > 0) {
-      ldout(async_msgr->cct, 5) << __func__ << " remaining " << r
-                          << " needed to be sent, creating event for writing"
-                          << dendl;
-      break;
-    }
-    // only "r" == 0 continue
   }
 
-  // trim already sent for outcoming_bl
-  if (sent_bytes) {
-    if (sent_bytes < outcoming_bl.length()) {
-      outcoming_bl.splice(0, sent_bytes);
-    } else {
-      outcoming_bl.clear();
-    }
+  ssize_t r = cs.send(outcoming_bl, more);
+  if (r < 0) {
+    ldout(async_msgr->cct, 1) << __func__ << " send error: " << cpp_strerror(r) << dendl;
+    return r;
   }
 
-  ldout(async_msgr->cct, 20) << __func__ << " sent bytes " << sent_bytes
+  ldout(async_msgr->cct, 10) << __func__ << " sent bytes " << r
                              << " remaining bytes " << outcoming_bl.length() << dendl;
 
   if (!open_write && is_queued()) {
     if (center->in_thread()) {
-      center->create_file_event(sd, EVENT_WRITABLE, write_handler);
+      center->create_file_event(cs.fd(), EVENT_WRITABLE, write_handler);
       open_write = true;
     } else {
       center->dispatch_event_external(write_handler);
@@ -372,7 +218,7 @@ ssize_t AsyncConnection::_try_send(bool more)
 
   if (open_write && !is_queued()) {
     if (center->in_thread()) {
-      center->delete_file_event(sd, EVENT_WRITABLE);
+      center->delete_file_event(cs.fd(), EVENT_WRITABLE);
       open_write = false;
     } else {
       center->dispatch_event_external(write_handler);
@@ -398,10 +244,10 @@ ssize_t AsyncConnection::read_until(unsigned len, char *p)
   ldout(async_msgr->cct, 25) << __func__ << " len is " << len << " state_offset is "
                              << state_offset << dendl;
 
-  if (async_msgr->cct->_conf->ms_inject_socket_failures && sd >= 0) {
+  if (async_msgr->cct->_conf->ms_inject_socket_failures && cs) {
     if (rand() % async_msgr->cct->_conf->ms_inject_socket_failures == 0) {
       ldout(async_msgr->cct, 0) << __func__ << " injecting socket failure" << dendl;
-      ::shutdown(sd, SHUT_RDWR);
+      cs.shutdown();
     }
   }
 
@@ -426,7 +272,7 @@ ssize_t AsyncConnection::read_until(unsigned len, char *p)
   if (len > recv_max_prefetch) {
     /* this was a large read, we don't prefetch for these */
     do {
-      r = read_bulk(sd, p+state_offset, left);
+      r = read_bulk(p+state_offset, left);
       ldout(async_msgr->cct, 25) << __func__ << " read_bulk left is " << left << " got " << r << dendl;
       if (r < 0) {
         ldout(async_msgr->cct, 1) << __func__ << " read failed" << dendl;
@@ -440,7 +286,7 @@ ssize_t AsyncConnection::read_until(unsigned len, char *p)
     } while (r > 0);
   } else {
     do {
-      r = read_bulk(sd, recv_buf+recv_end, recv_max_prefetch);
+      r = read_bulk(recv_buf+recv_end, recv_max_prefetch);
       ldout(async_msgr->cct, 25) << __func__ << " read_bulk recv_end is " << recv_end
                                  << " left is " << left << " got " << r << dendl;
       if (r < 0) {
@@ -1011,35 +857,34 @@ ssize_t AsyncConnection::_process_connection()
 
         global_seq = async_msgr->get_global_seq();
         // close old socket.  this is safe because we stopped the reader thread above.
-        if (sd >= 0) {
-          center->delete_file_event(sd, EVENT_READABLE|EVENT_WRITABLE);
-          ::close(sd);
+        if (cs) {
+          center->delete_file_event(cs.fd(), EVENT_READABLE|EVENT_WRITABLE);
+          cs.close();
         }
 
-        sd = net.nonblock_connect(get_peer_addr());
-        if (sd < 0) {
+        SocketOptions opts;
+        r = worker->connect(get_peer_addr(), opts, &cs);
+        if (r < 0)
           goto fail;
-        }
 
-        center->create_file_event(sd, EVENT_READABLE, read_handler);
+        center->create_file_event(cs.fd(), EVENT_READABLE, read_handler);
         state = STATE_CONNECTING_RE;
         break;
       }
 
     case STATE_CONNECTING_RE:
       {
-        r = net.reconnect(get_peer_addr(), sd);
+        r = cs.is_connected();
         if (r < 0) {
           ldout(async_msgr->cct, 1) << __func__ << " reconnect failed " << dendl;
           goto fail;
-        } else if (r > 0) {
+        } else if (r == 0) {
           ldout(async_msgr->cct, 10) << __func__ << " nonblock connect inprogress" << dendl;
-          center->create_file_event(sd, EVENT_WRITABLE, read_handler);
+          center->create_file_event(cs.fd(), EVENT_WRITABLE, read_handler);
           break;
         }
 
-        center->delete_file_event(sd, EVENT_WRITABLE);
-        net.set_priority(sd, async_msgr->get_socket_priority());
+        center->delete_file_event(cs.fd(), EVENT_WRITABLE);
         ldout(async_msgr->cct, 10) << __func__ << " connect successfully, ready to send banner" << dendl;
 
         bufferlist bl;
@@ -1092,7 +937,7 @@ ssize_t AsyncConnection::_process_connection()
           goto fail;
         }
         ldout(async_msgr->cct, 20) << __func__ <<  " connect read peer addr "
-                             << paddr << " on socket " << sd << dendl;
+                             << paddr << " on socket " << cs.fd() << dendl;
         if (peer_addr != paddr) {
           if (paddr.is_blank_ip() && peer_addr.get_port() == paddr.get_port() &&
               peer_addr.get_nonce() == paddr.get_nonce()) {
@@ -1139,7 +984,7 @@ ssize_t AsyncConnection::_process_connection()
               << async_msgr->get_myaddr() << dendl;
         } else {
           ldout(async_msgr->cct, 2) << __func__ << " connect couldn't write my addr, "
-              << cpp_strerror(errno) << dendl;
+              << cpp_strerror(r) << dendl;
           goto fail;
         }
 
@@ -1185,7 +1030,7 @@ ssize_t AsyncConnection::_process_connection()
           ldout(async_msgr->cct, 10) << __func__ << " continue send reply " << dendl;
         } else {
           ldout(async_msgr->cct, 2) << __func__ << " connect couldn't send reply "
-              << cpp_strerror(errno) << dendl;
+              << cpp_strerror(r) << dendl;
           goto fail;
         }
 
@@ -1340,30 +1185,14 @@ ssize_t AsyncConnection::_process_connection()
     case STATE_ACCEPTING:
       {
         bufferlist bl;
-
-        if (net.set_nonblock(sd) < 0)
-          goto fail;
-
-        net.set_socket_options(sd, async_msgr->cct->_conf->ms_tcp_nodelay, async_msgr->cct->_conf->ms_tcp_rcvbuf);
-        net.set_priority(sd, async_msgr->get_socket_priority());
-        center->create_file_event(sd, EVENT_READABLE, read_handler);
+        center->create_file_event(cs.fd(), EVENT_READABLE, read_handler);
 
         bl.append(CEPH_BANNER, strlen(CEPH_BANNER));
 
         ::encode(async_msgr->get_myaddr(), bl, 0); // legacy
         port = async_msgr->get_myaddr().get_port();
-        // and peer's socket addr (they might not know their ip)
-       sockaddr_storage ss;
-        socklen_t len = sizeof(ss);
-        r = ::getpeername(sd, (sockaddr*)&ss, &len);
-        if (r < 0) {
-          ldout(async_msgr->cct, 0) << __func__ << " failed to getpeername "
-                              << cpp_strerror(errno) << dendl;
-          goto fail;
-        }
-       socket_addr.set_sockaddr((sockaddr*)&ss);
         ::encode(socket_addr, bl, 0); // legacy
-        ldout(async_msgr->cct, 1) << __func__ << " sd=" << sd << " " << socket_addr << dendl;
+        ldout(async_msgr->cct, 1) << __func__ << " sd=" << cs.fd() << " " << socket_addr << dendl;
 
         r = try_send(bl);
         if (r == 0) {
@@ -1813,7 +1642,7 @@ ssize_t AsyncConnection::handle_connect_msg(ceph_msg_connect &connect, bufferlis
       existing->is_reset_from_peer = true;
     }
 
-    center->delete_file_event(sd, EVENT_READABLE|EVENT_WRITABLE);
+    center->delete_file_event(cs.fd(), EVENT_READABLE|EVENT_WRITABLE);
 
     // Clean up output buffer
     existing->outcoming_bl.clear();
@@ -1824,13 +1653,13 @@ ssize_t AsyncConnection::handle_connect_msg(ceph_msg_connect &connect, bufferlis
     existing->requeue_sent();
     existing->reset_recv_state();
 
-    int new_fd = sd;
+    auto temp_cs = std::move(cs);
     EventCenter *new_center = center;
     Worker *new_worker = worker;
     // avoid _stop shutdown replacing socket
-    sd = -1;
     // queue a reset on the new connection, which we're dumping for the old
     _stop();
+
     dispatch_queue->queue_reset(this);
     ldout(async_msgr->cct, 1) << __func__ << " stop myself to swap existing" << dendl;
     existing->can_write = WriteStatus::REPLACING;
@@ -1844,25 +1673,14 @@ ssize_t AsyncConnection::handle_connect_msg(ceph_msg_connect &connect, bufferlis
     // there shouldn't exist any buffer
     assert(recv_start == recv_end);
 
-    existing->write_lock.unlock();
-    // new sd now isn't registered any event while origin events
-    // have been deleted.
-    // previous existing->sd now is still open, event will continue to
-    // notify previous existing->center from now.
-    // From now, no one will dispatch event to `existing`
-    // Note: we must use async dispatch instead of execute this inline
-    // even existing->center == center. Because we must ensure below
-    // event executed after all pending external events like
-    // "dispatch_state->queue"
-    existing->center->submit_to(
-        existing->center->get_id(),
-        [existing, new_fd, new_worker, new_center, connect, reply, authorizer_reply]() mutable {
+    auto deactivate_existing = std::bind(
+        [existing, new_worker, new_center, connect, reply, authorizer_reply](ConnectedSocket &cs) mutable {
       // we need to delete time event in original thread
       {
         std::lock_guard<std::mutex> l(existing->lock);
         if (existing->state == STATE_NONE) {
           existing->shutdown_socket();
-          existing->sd = new_fd;
+          existing->cs = std::move(cs);
           existing->worker->references--;
           new_worker->references++;
           existing->logger = new_worker->get_perf_counter();
@@ -1871,7 +1689,7 @@ ssize_t AsyncConnection::handle_connect_msg(ceph_msg_connect &connect, bufferlis
           if (existing->delay_state)
             existing->delay_state->set_center(new_center);
         } else if (existing->state == STATE_CLOSED) {
-          ::close(new_fd);
+          cs.close();
           return ;
         } else {
           assert(0);
@@ -1881,15 +1699,14 @@ ssize_t AsyncConnection::handle_connect_msg(ceph_msg_connect &connect, bufferlis
       // Before changing existing->center, it may already exists some events in existing->center's queue.
       // Then if we mark down `existing`, it will execute in another thread and clean up connection.
       // Previous event will result in segment fault
-      auto transfer_existing = [existing, new_fd, connect, reply, authorizer_reply]() mutable {
+      auto transfer_existing = [existing, connect, reply, authorizer_reply]() mutable {
         std::lock_guard<std::mutex> l(existing->lock);
         if (existing->state == STATE_CLOSED)
           return ;
-        assert(new_fd == existing->sd);
         assert(existing->state == STATE_NONE);
-
+  
         existing->state = STATE_ACCEPTING_WAIT_CONNECT_MSG;
-        existing->center->create_file_event(existing->sd, EVENT_READABLE, existing->read_handler);
+        existing->center->create_file_event(existing->cs.fd(), EVENT_READABLE, existing->read_handler);
         reply.global_seq = existing->peer_global_seq;
         if (existing->_reply_accept(CEPH_MSGR_TAG_RETRY_GLOBAL, connect, reply, authorizer_reply) < 0) {
           // handle error
@@ -1901,8 +1718,11 @@ ssize_t AsyncConnection::handle_connect_msg(ceph_msg_connect &connect, bufferlis
       else
         existing->center->submit_to(
             existing->center->get_id(), std::move(transfer_existing), true);
-    }, true);
+    }, std::move(temp_cs));
 
+    existing->center->submit_to(
+        existing->center->get_id(), std::move(deactivate_existing), true);
+    existing->write_lock.unlock();
     existing->lock.unlock();
     return 0;
   }
@@ -2012,13 +1832,14 @@ void AsyncConnection::_connect()
   center->dispatch_event_external(read_handler);
 }
 
-void AsyncConnection::accept(int incoming)
+void AsyncConnection::accept(ConnectedSocket socket, entity_addr_t &addr)
 {
-  ldout(async_msgr->cct, 10) << __func__ << " sd=" << incoming << dendl;
-  assert(sd < 0);
+  ldout(async_msgr->cct, 10) << __func__ << " sd=" << socket.fd() << dendl;
+  assert(socket.fd() > 0);
 
   std::lock_guard<std::mutex> l(lock);
-  sd = incoming;
+  cs = std::move(socket);
+  socket_addr = addr;
   state = STATE_ACCEPTING;
   // rescheduler connection in order to avoid lock dep
   center->dispatch_event_external(read_handler);
@@ -2407,7 +2228,7 @@ ssize_t AsyncConnection::write_message(Message *m, bufferlist& bl, bool more)
   ssize_t rc = _try_send(more);
   if (rc < 0) {
     ldout(async_msgr->cct, 1) << __func__ << " error sending " << m << ", "
-                              << cpp_strerror(errno) << dendl;
+                              << cpp_strerror(rc) << dendl;
   } else if (rc == 0) {
     ldout(async_msgr->cct, 10) << __func__ << " sending " << m << " done." << dendl;
   } else {
@@ -2617,7 +2438,7 @@ void AsyncConnection::handle_write()
     if (state == STATE_STANDBY && !policy.server && is_queued()) {
       ldout(async_msgr->cct, 10) << __func__ << " policy.server is false" << dendl;
       _connect();
-    } else if (sd >= 0 && state != STATE_NONE && state != STATE_CONNECTING && state != STATE_CONNECTING_RE && state != STATE_CLOSED) {
+    } else if (cs && state != STATE_NONE && state != STATE_CONNECTING && state != STATE_CONNECTING_RE && state != STATE_CLOSED) {
       r = _try_send();
       if (r < 0) {
         ldout(async_msgr->cct, 1) << __func__ << " send outcoming bl failed" << dendl;
index 3988bf9a7447b051a5d7507975be8cf4475f738f..9d6c3435eef64468ba05b17e4baa8261ddfb1a19 100644 (file)
@@ -34,7 +34,7 @@ using namespace std;
 #include "msg/Messenger.h"
 
 #include "Event.h"
-#include "net_handler.h"
+#include "Stack.h"
 
 class AsyncMessenger;
 class Worker;
@@ -50,9 +50,7 @@ static const int ASYNC_IOV_MAX = (IOV_MAX >= 1024 ? IOV_MAX / 4 : IOV_MAX);
  */
 class AsyncConnection : public Connection {
 
-  ssize_t read_bulk(int fd, char *buf, unsigned len);
-  void suppress_sigpipe();
-  void restore_sigpipe();
+  ssize_t read_bulk(char *buf, unsigned len);
   ssize_t do_sendmsg(struct msghdr &msg, unsigned len, bool more);
   ssize_t try_send(bufferlist &bl, bool more=false) {
     std::lock_guard<std::mutex> l(write_lock);
@@ -110,11 +108,10 @@ class AsyncConnection : public Connection {
       center->delete_time_event(last_tick_id);
       last_tick_id = 0;
     }
-    if (sd >= 0) {
-      center->delete_file_event(sd, EVENT_READABLE|EVENT_WRITABLE);
-      ::shutdown(sd, SHUT_RDWR);
-      ::close(sd);
-      sd = -1;
+    if (cs) {
+      center->delete_file_event(cs.fd(), EVENT_READABLE|EVENT_WRITABLE);
+      cs.shutdown();
+      cs.close();
     }
   }
   Message *_get_next_outgoing(bufferlist *bl) {
@@ -209,7 +206,7 @@ class AsyncConnection : public Connection {
     _connect();
   }
   // Only call when AsyncConnection first construct
-  void accept(int sd);
+  void accept(ConnectedSocket socket, entity_addr_t &addr);
   int send_message(Message *m) override;
 
   void send_keepalive() override;
@@ -303,7 +300,7 @@ class AsyncConnection : public Connection {
   atomic64_t ack_left, in_seq;
   int state;
   int state_after_send;
-  int sd;
+  ConnectedSocket cs;
   int port;
   Messenger::Policy policy;
 
@@ -372,17 +369,10 @@ class AsyncConnection : public Connection {
   char *state_buffer;
   // used only by "read_until"
   uint64_t state_offset;
-  NetHandler net;
   Worker *worker;
   EventCenter *center;
   ceph::shared_ptr<AuthSessionHandler> session_security;
 
-#if !defined(MSG_NOSIGNAL) && !defined(SO_NOSIGPIPE)
-  sigset_t sigpipe_mask;
-  bool sigpipe_pending;
-  bool sigpipe_unblock;
-#endif
-
  public:
   // used by eventcallback
   void handle_write();
index 53eff733e2703af7b7b48e60a260471544919db8..95ec0ebd1409f30ce42958f94e8f551d3d576087 100644 (file)
 
 #include "acconfig.h"
 
-#include <errno.h>
 #include <iostream>
 #include <fstream>
 
 #include "AsyncMessenger.h"
 
-#include "include/str_list.h"
-#include "common/strtol.h"
 #include "common/config.h"
 #include "common/Timer.h"
 #include "common/errno.h"
-#include "auth/Crypto.h"
-#include "include/Spinlock.h"
 
 #define dout_subsys ceph_subsys_ms
 #undef dout_prefix
@@ -41,14 +36,6 @@ static ostream& _prefix(std::ostream *_dout, Processor *p) {
   return *_dout << " Processor -- ";
 }
 
-static ostream& _prefix(std::ostream *_dout, Worker *w) {
-  return *_dout << " Worker -- ";
-}
-
-static ostream& _prefix(std::ostream *_dout, WorkerPool *p) {
-  return *_dout << " WorkerPool -- ";
-}
-
 
 /*******************
  * Processor
@@ -64,13 +51,9 @@ class Processor::C_processor_accept : public EventCallback {
   }
 };
 
-Processor::Processor(AsyncMessenger *r, CephContext *c, uint64_t n)
-  : msgr(r),
-  net(c),
-  worker(NULL),
-  listen_sd(-1),
-  nonce(n),
-  listen_handler(new C_processor_accept(this)) {}
+Processor::Processor(AsyncMessenger *r, Worker *w, CephContext *c, uint64_t n)
+  : msgr(r), net(c), worker(w), nonce(n),
+    listen_handler(new C_processor_accept(this)) {}
 
 int Processor::bind(const entity_addr_t &bind_addr, const set<int>& avoid_ports)
 {
@@ -90,30 +73,16 @@ int Processor::bind(const entity_addr_t &bind_addr, const set<int>& avoid_ports)
       family = conf->ms_bind_ipv6 ? AF_INET6 : AF_INET;
   }
 
-  /* socket creation */
-  listen_sd = ::socket(family, SOCK_STREAM, 0);
-  if (listen_sd < 0) {
-    lderr(msgr->cct) << __func__ << " unable to create socket: "
-        << cpp_strerror(errno) << dendl;
-    return -errno;
-  }
-
-  int r = net.set_nonblock(listen_sd);
-  if (r < 0) {
-    ::close(listen_sd);
-    listen_sd = -1;
-    return r;
-  }
-  net.set_close_on_exec(listen_sd);
-  net.set_socket_options(listen_sd, msgr->cct->_conf->ms_tcp_nodelay, msgr->cct->_conf->ms_tcp_rcvbuf);
+  SocketOptions opts;
+  opts.nodelay = msgr->cct->_conf->ms_tcp_nodelay;
+  opts.rcbuf_size = msgr->cct->_conf->ms_tcp_rcvbuf;
 
   // use whatever user specified (if anything)
   entity_addr_t listen_addr = bind_addr;
   listen_addr.set_family(family);
 
   /* bind to port */
-  int rc = -1;
-  r = -1;
+  int r = -1;
 
   for (int i = 0; i < conf->ms_bind_retry_count; i++) {
     if (i > 0) {
@@ -123,22 +92,12 @@ int Processor::bind(const entity_addr_t &bind_addr, const set<int>& avoid_ports)
     }
 
     if (listen_addr.get_port()) {
-      // specific port
-      // reuse addr+port when possible
-      int on = 1;
-      rc = ::setsockopt(listen_sd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on));
-      if (rc < 0) {
-        lderr(msgr->cct) << __func__ << " unable to setsockopt: " << cpp_strerror(errno) << dendl;
-        r = -errno;
-        continue;
-      }
-
-      rc = ::bind(listen_sd, listen_addr.get_sockaddr(),
-                 listen_addr.get_sockaddr_len());
-      if (rc < 0) {
+      worker->center.submit_to(worker->center.get_id(), [this, &listen_addr, &opts, &r]() {
+        r = worker->listen(listen_addr, opts, &listen_socket);
+      }, false);
+      if (r < 0) {
         lderr(msgr->cct) << __func__ << " unable to bind to " << listen_addr
-                         << ": " << cpp_strerror(errno) << dendl;
-        r = -errno;
+                         << ": " << cpp_strerror(r) << dendl;
         continue;
       }
     } else {
@@ -148,60 +107,34 @@ int Processor::bind(const entity_addr_t &bind_addr, const set<int>& avoid_ports)
           continue;
 
         listen_addr.set_port(port);
-        rc = ::bind(listen_sd, listen_addr.get_sockaddr(),
-                   listen_addr.get_sockaddr_len());
-        if (rc == 0)
+        worker->center.submit_to(worker->center.get_id(), [this, &listen_addr, &opts, &r]() {
+          r = worker->listen(listen_addr, opts, &listen_socket);
+        }, false);
+        if (r == 0)
           break;
       }
-      if (rc < 0) {
+      if (r < 0) {
         lderr(msgr->cct) << __func__ << " unable to bind to " << listen_addr
                          << " on any port in range " << msgr->cct->_conf->ms_bind_port_min
                          << "-" << msgr->cct->_conf->ms_bind_port_max << ": "
-                         << cpp_strerror(errno) << dendl;
-        r = -errno;
+                         << cpp_strerror(r) << dendl;
         listen_addr.set_port(0); // Clear port before retry, otherwise we shall fail again.
         continue;
       }
       ldout(msgr->cct, 10) << __func__ << " bound on random port " << listen_addr << dendl;
     }
-    if (rc == 0)
+    if (r == 0)
       break;
   }
   // It seems that binding completely failed, return with that exit status
-  if (rc < 0) {
+  if (r < 0) {
     lderr(msgr->cct) << __func__ << " was unable to bind after " << conf->ms_bind_retry_count
-                     << " attempts: " << cpp_strerror(errno) << dendl;
-    ::close(listen_sd);
-    listen_sd = -1;
+                     << " attempts: " << cpp_strerror(r) << dendl;
     return r;
   }
 
-  // what port did we get?
-  sockaddr_storage ss;
-  socklen_t llen = sizeof(ss);
-  rc = getsockname(listen_sd, (sockaddr*)&ss, &llen);
-  if (rc < 0) {
-    rc = -errno;
-    lderr(msgr->cct) << __func__ << " failed getsockname: " << cpp_strerror(rc) << dendl;
-    ::close(listen_sd);
-    listen_sd = -1;
-    return rc;
-  }
-  listen_addr.set_sockaddr((sockaddr*)&ss);
-
   ldout(msgr->cct, 10) << __func__ << " bound to " << listen_addr << dendl;
 
-  // listen!
-  rc = ::listen(listen_sd, 128);
-  if (rc < 0) {
-    rc = -errno;
-    lderr(msgr->cct) << __func__ << " unable to listen on " << listen_addr
-        << ": " << cpp_strerror(rc) << dendl;
-    ::close(listen_sd);
-    listen_sd = -1;
-    return rc;
-  }
-
   msgr->set_myaddr(bind_addr);
   if (bind_addr != entity_addr_t())
     msgr->learned_addr(bind_addr);
@@ -237,47 +170,48 @@ int Processor::rebind(const set<int>& avoid_ports)
   return bind(addr, new_avoid);
 }
 
-void Processor::start(Worker *w)
+void Processor::start()
 {
-  ldout(msgr->cct, 1) << __func__ << " " << dendl;
+  ldout(msgr->cct, 1) << __func__ << dendl;
 
   // start thread
-  if (listen_sd >= 0) {
-    worker = w;
+  if (listen_socket) {
     worker->center.submit_to(worker->center.get_id(), [this]() {
-      worker->center.create_file_event(listen_sd, EVENT_READABLE, listen_handler); });
+      worker->center.create_file_event(listen_socket.fd(), EVENT_READABLE, listen_handler); }, false);
   }
 }
 
 void Processor::accept()
 {
-  ldout(msgr->cct, 10) << __func__ << " listen_sd=" << listen_sd << dendl;
+  ldout(msgr->cct, 10) << __func__ << " listen_fd=" << listen_socket.fd() << dendl;
+  SocketOptions opts;
+  opts.nodelay = msgr->cct->_conf->ms_tcp_nodelay;
+  opts.rcbuf_size = msgr->cct->_conf->ms_tcp_rcvbuf;
   while (true) {
-    sockaddr_storage ss;
-    socklen_t slen = sizeof(ss);
-    int sd = ::accept(listen_sd, (sockaddr*)&ss, &slen);
-    if (sd >= 0) {
-      net.set_close_on_exec(sd);
-      ldout(msgr->cct, 10) << __func__ << " accepted incoming on sd " << sd << dendl;
-
-      msgr->add_accept(sd);
+    entity_addr_t addr;
+    ConnectedSocket cli_socket;
+    int r = listen_socket.accept(&cli_socket, opts, &addr);
+    if (r == 0) {
+      ldout(msgr->cct, 10) << __func__ << " accepted incoming on sd " << cli_socket.fd() << dendl;
+
+      msgr->add_accept(worker, std::move(cli_socket), addr);
       continue;
     } else {
-      if (errno == EINTR) {
+      if (r == -EINTR) {
         continue;
-      } else if (errno == EAGAIN) {
+      } else if (r == -EAGAIN) {
         break;
-      } else if (errno == EMFILE || errno == ENFILE) {
-        lderr(msgr->cct) << __func__ << " open file descriptions limit reached sd = " << sd
-                         << " errno " << errno << " " << cpp_strerror(errno) << dendl;
+      } else if (r == -EMFILE || r == -ENFILE) {
+        lderr(msgr->cct) << __func__ << " open file descriptions limit reached sd = " << listen_socket.fd()
+                         << " errno " << r << " " << cpp_strerror(r) << dendl;
         break;
-      } else if (errno == ECONNABORTED) {
-        ldout(msgr->cct, 0) << __func__ << " it was closed because of rst arrived sd = " << sd
-                            << " errno " << errno << " " << cpp_strerror(errno) << dendl;
+      } else if (r == -ECONNABORTED) {
+        ldout(msgr->cct, 0) << __func__ << " it was closed because of rst arrived sd = " << listen_socket.fd()
+                            << " errno " << r << " " << cpp_strerror(r) << dendl;
         continue;
       } else {
-        lderr(msgr->cct) << __func__ << " no incoming connection?  sd = " << sd
-                         << " errno " << errno << " " << cpp_strerror(errno) << dendl;
+        lderr(msgr->cct) << __func__ << " no incoming connection?"
+                         << " errno " << r << " " << cpp_strerror(r) << dendl;
         break;
       }
     }
@@ -288,208 +222,25 @@ void Processor::stop()
 {
   ldout(msgr->cct,10) << __func__ << dendl;
 
-  if (listen_sd >= 0) {
+  if (listen_socket) {
     worker->center.submit_to(worker->center.get_id(), [this]() {
-      worker->center.delete_file_event(listen_sd, EVENT_READABLE);
-      ::shutdown(listen_sd, SHUT_RDWR);
-      ::close(listen_sd);
-      listen_sd = -1;
-    });
-  }
-}
-
-void Worker::stop()
-{
-  ldout(cct, 10) << __func__ << dendl;
-  done = true;
-  center.wakeup();
-}
-
-class WorkerPool {
-  CephContext *cct;
-  vector<Worker*> workers;
-  vector<int> coreids;
-  // Used to indicate whether thread started
-  bool started;
-  Mutex barrier_lock;
-  Cond barrier_cond;
-  atomic_t barrier_count;
-  simple_spinlock_t pool_spin = SIMPLE_SPINLOCK_INITIALIZER;
-
-  class C_barrier : public EventCallback {
-    WorkerPool *pool;
-    public:
-    explicit C_barrier(WorkerPool *p): pool(p) {}
-    void do_request(int id) {
-      Mutex::Locker l(pool->barrier_lock);
-      pool->barrier_count.dec();
-      pool->barrier_cond.Signal();
-      delete this;
-    }
-  };
-  friend class C_barrier;
-  public:
-  std::atomic_uint pending;
-  explicit WorkerPool(CephContext *c);
-  WorkerPool(const WorkerPool &) = delete;
-  WorkerPool& operator=(const WorkerPool &) = delete;
-  virtual ~WorkerPool();
-  void start();
-  Worker *get_worker();
-  int get_cpuid(int id) {
-    if (coreids.empty())
-      return -1;
-    return coreids[id % coreids.size()];
-  }
-  void barrier();
-};
-
-void *Worker::entry()
-{
-  ldout(cct, 10) << __func__ << " starting" << dendl;
-  if (cct->_conf->ms_async_set_affinity) {
-    int cid = pool->get_cpuid(id);
-    if (cid >= 0 && set_affinity(cid)) {
-      ldout(cct, 0) << __func__ << " sched_setaffinity failed: "
-                    << cpp_strerror(errno) << dendl;
-    }
-  }
-
-  center.set_owner();
-  pool->pending--;
-  while (!done) {
-    ldout(cct, 20) << __func__ << " calling event process" << dendl;
-
-    int r = center.process_events(EventMaxWaitUs);
-    if (r < 0) {
-      ldout(cct, 20) << __func__ << " process events failed: "
-          << cpp_strerror(errno) << dendl;
-      // TODO do something?
-    }
-  }
-
-  return 0;
-}
-
-/*******************
- * WorkerPool
- *******************/
-WorkerPool::WorkerPool(CephContext *c): cct(c), started(false),
-                                        barrier_lock("WorkerPool::WorkerPool::barrier_lock"),
-                                        barrier_count(0), pending(0)
-{
-  assert(cct->_conf->ms_async_op_threads > 0);
-  // make sure user won't try to force some crazy number of worker threads
-  assert(cct->_conf->ms_async_max_op_threads >= cct->_conf->ms_async_op_threads && 
-         cct->_conf->ms_async_op_threads <= 32);
-  for (int i = 0; i < cct->_conf->ms_async_op_threads; ++i) {
-    Worker *w = new Worker(cct, this, i);
-    workers.push_back(w);
-  }
-  vector<string> corestrs;
-  get_str_vec(cct->_conf->ms_async_affinity_cores, corestrs);
-  for (vector<string>::iterator it = corestrs.begin();
-       it != corestrs.end(); ++it) {
-    string err;
-    int coreid = strict_strtol(it->c_str(), 10, &err);
-    if (err == "")
-      coreids.push_back(coreid);
-    else
-      lderr(cct) << __func__ << " failed to parse " << *it << " in " << cct->_conf->ms_async_affinity_cores << dendl;
-  }
-
-}
-
-WorkerPool::~WorkerPool()
-{
-  for (uint64_t i = 0; i < workers.size(); ++i) {
-    if (workers[i]->is_started()) {
-      workers[i]->stop();
-      workers[i]->join();
-    }
-    delete workers[i];
-  }
-}
-
-void WorkerPool::start()
-{
-  if (!started) {
-    for (uint64_t i = 0; i < workers.size(); ++i) {
-      pending++;
-      workers[i]->create("ms_async_worker");
-    }
-    started = true;
+      worker->center.delete_file_event(listen_socket.fd(), EVENT_READABLE);
+      listen_socket.abort_accept();
+    }, false);
   }
-  while (pending)
-    usleep(50);
 }
 
-Worker* WorkerPool::get_worker()
-{
-  ldout(cct, 10) << __func__ << dendl;
-
-   // start with some reasonably large number
-  unsigned min_load = std::numeric_limits<int>::max();
-  Worker* current_best = nullptr;
-
-  simple_spin_lock(&pool_spin);
-  // find worker with least references
-  // tempting case is returning on references == 0, but in reality
-  // this will happen so rarely that there's no need for special case.
-  for (auto p = workers.begin(); p != workers.end(); ++p) {
-    unsigned worker_load = (*p)->references.load();
-    ldout(cct, 20) << __func__ << " Worker " << *p << " load: " << worker_load << dendl;
-    if (worker_load < min_load) {
-      current_best = *p;
-      min_load = worker_load;
-    }
-  }
 
-  // if minimum load exceeds amount of workers, make a new worker
-  // logic behind this is that we're not going to create new worker
-  // just because others have *some* load, we'll defer worker creation
-  // until others have *plenty* of load. This will cause new worker
-  // to get assigned to all new connections *unless* one or more
-  // of workers get their load reduced - in that case, this worker
-  // will be assigned to new connection.
-  // TODO: add more logic and heuristics, so connections known to be
-  // of light workload (heartbeat service, etc.) won't overshadow
-  // heavy workload (clients, etc).
-  if (!current_best || ((workers.size() < (unsigned)cct->_conf->ms_async_max_op_threads)
-      && (min_load > workers.size()))) {
-     ldout(cct, 20) << __func__ << " creating worker" << dendl;
-     current_best = new Worker(cct, this, workers.size());
-     workers.push_back(current_best);
-     pending++;
-     current_best->create("ms_async_worker");
-  } else {
-    ldout(cct, 20) << __func__ << " picked " << current_best 
-                   << " as best worker with load " << min_load << dendl;
+struct StackSingleton {
+  std::shared_ptr<NetworkStack> stack;
+  StackSingleton(CephContext *c) {
+    stack = NetworkStack::create(c, c->_conf->ms_async_transport_type);
   }
-
-  ++current_best->references;
-  simple_spin_unlock(&pool_spin);
-
-  while (pending)
-    usleep(50);
-  assert(current_best);
-  return current_best;
-}
-
-void WorkerPool::barrier()
-{
-  ldout(cct, 10) << __func__ << " started." << dendl;
-  for (vector<Worker*>::iterator it = workers.begin(); it != workers.end(); ++it) {
-    barrier_count.inc();
-    (*it)->center.dispatch_event_external(EventCallbackRef(new C_barrier(this)));
+  ~StackSingleton() {
+    stack->stop();
   }
-  ldout(cct, 10) << __func__ << " wait for " << barrier_count.read() << " barrier" << dendl;
-  Mutex::Locker l(barrier_lock);
-  while (barrier_count.read())
-    barrier_cond.Wait(barrier_lock);
+};
 
-  ldout(cct, 10) << __func__ << " end." << dendl;
-}
 
 class C_handle_reap : public EventCallback {
   AsyncMessenger *msgr;
@@ -509,7 +260,6 @@ class C_handle_reap : public EventCallback {
 AsyncMessenger::AsyncMessenger(CephContext *cct, entity_name_t name,
                                string mname, uint64_t _nonce, uint64_t features)
   : SimplePolicyMessenger(cct, name,mname, _nonce),
-    processor(this, cct, _nonce),
     dispatch_queue(cct, this, mname),
     lock("AsyncMessenger::lock"),
     nonce(_nonce), need_addr(true), did_bind(false),
@@ -517,13 +267,20 @@ AsyncMessenger::AsyncMessenger(CephContext *cct, entity_name_t name,
     cluster_protocol(0), stopped(true)
 {
   ceph_spin_init(&global_seq_lock);
-  // uniq name for CephContext to distinguish from other objects
-  cct->lookup_or_create_singleton_object<WorkerPool>(pool, "AsyncMessenger::WorkerPool");
-  local_worker = pool->get_worker();
+  StackSingleton *single;
+  cct->lookup_or_create_singleton_object<StackSingleton>(single, "AsyncMessenger::NetworkStack");
+  stack = single->stack.get();
+  stack->start();
+  local_worker = stack->get_worker();
   local_connection = new AsyncConnection(cct, this, &dispatch_queue, local_worker);
   local_features = features;
   init_local_connection();
   reap_handler = new C_handle_reap(this);
+  unsigned processor_num = 1;
+  if (stack->support_local_listen_table())
+    processor_num = stack->get_num_worker();
+  for (unsigned i = 0; i < processor_num; ++i)
+    processors.push_back(new Processor(this, stack->get_worker(i), cct, _nonce));
 }
 
 /**
@@ -535,6 +292,8 @@ AsyncMessenger::~AsyncMessenger()
   delete reap_handler;
   assert(!did_bind); // either we didn't bind or we shut down the Processor
   local_connection->mark_down();
+  for (auto &&p : processors)
+    delete p;
 }
 
 void AsyncMessenger::ready()
@@ -542,9 +301,8 @@ void AsyncMessenger::ready()
   ldout(cct,10) << __func__ << " " << get_myaddr() << dendl;
 
   Mutex::Locker l(lock);
-  pool->start();
-  Worker *w = pool->get_worker();
-  processor.start(w);
+  for (auto &&p : processors)
+    p->start();
   dispatch_queue.start();
 }
 
@@ -552,17 +310,18 @@ int AsyncMessenger::shutdown()
 {
   ldout(cct,10) << __func__ << " " << get_myaddr() << dendl;
 
+  // done!  clean up.
+  for (auto &&p : processors)
+    p->stop();
   mark_down_all();
   // break ref cycles on the loopback connection
   local_connection->set_priv(NULL);
-   // done!  clean up.
-  processor.stop();
   did_bind = false;
   lock.Lock();
   stop_cond.Signal();
   stopped = true;
   lock.Unlock();
-  pool->barrier();
+  stack->drain();
   return 0;
 }
 
@@ -580,7 +339,25 @@ int AsyncMessenger::bind(const entity_addr_t &bind_addr)
 
   // bind to a socket
   set<int> avoid_ports;
-  int r = processor.bind(bind_addr, avoid_ports);
+  int r = 0;
+  unsigned i = 0;
+  for (auto &&p : processors) {
+    r = p->bind(bind_addr, avoid_ports);
+    if (r < 0) {
+      // Note: this is related to local tcp listen table problem.
+      // Posix(default kernel implementation) backend shares listen table
+      // in the kernel, so all threads can use the same listen table naturally
+      // and only one thread need to bind. But other backends(like dpdk) uses local
+      // listen table, we need to bind/listen tcp port for each worker. So if the
+      // first worker failed to bind, it could be think the normal error then handle
+      // it, like port is used case. But if the first worker successfully to bind
+      // but the second worker failed, it's not expected and we need to assert
+      // here
+      assert(i == 0);
+      break;
+    }
+    ++i;
+  }
   if (r >= 0)
     did_bind = true;
   return r;
@@ -591,12 +368,20 @@ int AsyncMessenger::rebind(const set<int>& avoid_ports)
   ldout(cct,1) << __func__ << " rebind avoid " << avoid_ports << dendl;
   assert(did_bind);
 
-  processor.stop();
+  for (auto &&p : processors)
+    p->stop();
   mark_down_all();
-  int r = processor.rebind(avoid_ports);
-  if (r == 0) {
-    Worker *w = pool->get_worker();
-    processor.start(w);
+  unsigned i = 0;
+  int r = 0;
+  for (auto &&p : processors) {
+    r = p->rebind(avoid_ports);
+    if (r == 0) {
+      p->start();
+    } else {
+      assert(i == 0);
+      break;
+    }
+    i++;
   }
   return r;
 }
@@ -644,19 +429,20 @@ void AsyncMessenger::wait()
 
   // close all connections
   shutdown_connections(false);
-  pool->barrier();
+  stack->drain();
 
   ldout(cct, 10) << __func__ << ": done." << dendl;
   ldout(cct, 1) << __func__ << " complete." << dendl;
   started = false;
 }
 
-AsyncConnectionRef AsyncMessenger::add_accept(int sd)
+AsyncConnectionRef AsyncMessenger::add_accept(Worker *w, ConnectedSocket cli_socket, entity_addr_t &addr)
 {
   lock.Lock();
-  Worker *w = pool->get_worker();
+  if (!stack->support_local_listen_table())
+    w = stack->get_worker();
   AsyncConnectionRef conn = new AsyncConnection(cct, this, &dispatch_queue, w);
-  conn->accept(sd);
+  conn->accept(std::move(cli_socket), addr);
   accepting_conns.insert(conn);
   lock.Unlock();
   return conn;
@@ -671,7 +457,7 @@ AsyncConnectionRef AsyncMessenger::create_connect(const entity_addr_t& addr, int
       << ", creating connection and registering" << dendl;
 
   // create connection
-  Worker *w = pool->get_worker();
+  Worker *w = stack->get_worker();
   AsyncConnectionRef conn = new AsyncConnection(cct, this, &dispatch_queue, w);
   conn->connect(addr, type);
   assert(!conns.count(addr));
index 8125cff2f91ced8551cd7e346c00d9b1172144c2..47898f49d91d450a6b42be6f0c287e0d4387ded8 100644 (file)
@@ -36,70 +36,9 @@ using namespace std;
 #include "include/assert.h"
 #include "AsyncConnection.h"
 #include "Event.h"
-#include "common/simple_spin.h"
 
 
 class AsyncMessenger;
-class WorkerPool;
-
-enum {
-  l_msgr_first = 94000,
-  l_msgr_recv_messages,
-  l_msgr_send_messages,
-  l_msgr_send_messages_inline,
-  l_msgr_recv_bytes,
-  l_msgr_send_bytes,
-  l_msgr_created_connections,
-  l_msgr_active_connections,
-  l_msgr_last,
-};
-
-
-class Worker : public Thread {
-  static const uint64_t InitEventNumber = 5000;
-  static const uint64_t EventMaxWaitUs = 30000000;
-  CephContext *cct;
-  WorkerPool *pool;
-  bool done;
-  int id;
-  PerfCounters *perf_logger;
-
- public:
-  EventCenter center;
-  std::atomic_uint references;
-  Worker(CephContext *c, WorkerPool *p, int i)
-    : cct(c), pool(p), done(false), id(i), perf_logger(NULL), center(c), references(0) {
-    center.init(InitEventNumber, i);
-    char name[128];
-    sprintf(name, "AsyncMessenger::Worker-%d", id);
-    // initialize perf_logger
-    PerfCountersBuilder plb(cct, name, l_msgr_first, l_msgr_last);
-
-    plb.add_u64_counter(l_msgr_recv_messages, "msgr_recv_messages", "Network received messages");
-    plb.add_u64_counter(l_msgr_send_messages, "msgr_send_messages", "Network sent messages");
-    plb.add_u64_counter(l_msgr_send_messages_inline, "msgr_send_messages_inline", "Network sent inline messages");
-    plb.add_u64_counter(l_msgr_recv_bytes, "msgr_recv_bytes", "Network received bytes");
-    plb.add_u64_counter(l_msgr_send_bytes, "msgr_send_bytes", "Network received bytes");
-    plb.add_u64_counter(l_msgr_created_connections, "msgr_created_connections", "Created connection number");
-    plb.add_u64_counter(l_msgr_active_connections, "msgr_active_connections", "Active connection number");
-
-    perf_logger = plb.create_perf_counters();
-    cct->get_perfcounters_collection()->add(perf_logger);
-  }
-  ~Worker() {
-    if (perf_logger) {
-      cct->get_perfcounters_collection()->remove(perf_logger);
-      delete perf_logger;
-    }
-  }
-  void *entry();
-  void stop();
-  PerfCounters *get_perf_counter() { return perf_logger; }
-  void release_worker() {
-    int oldref = references.fetch_sub(1);
-    assert(oldref > 0);
-  }
-};
 
 /**
  * If the Messenger binds to a specific address, the Processor runs
@@ -109,20 +48,20 @@ class Processor {
   AsyncMessenger *msgr;
   NetHandler net;
   Worker *worker;
-  int listen_sd;
+  ServerSocket listen_socket;
   uint64_t nonce;
   EventCallbackRef listen_handler;
 
   class C_processor_accept;
 
  public:
-  Processor(AsyncMessenger *r, CephContext *c, uint64_t n);
+  Processor(AsyncMessenger *r, Worker *w, CephContext *c, uint64_t n);
   ~Processor() { delete listen_handler; };
 
   void stop();
   int bind(const entity_addr_t &bind_addr, const set<int>& avoid_ports);
   int rebind(const set<int>& avoid_port);
-  void start(Worker *w);
+  void start();
   void accept();
 };
 
@@ -276,9 +215,8 @@ private:
  private:
   static const uint64_t ReapDeadConnectionThreshold = 5;
 
-  WorkerPool *pool;
-
-  Processor processor;
+  NetworkStack *stack;
+  std::vector<Processor*> processors;
   friend class Processor;
   DispatchQueue dispatch_queue;
 
@@ -413,7 +351,7 @@ public:
   }
 
   void learned_addr(const entity_addr_t &peer_addr_for_me);
-  AsyncConnectionRef add_accept(int sd);
+  AsyncConnectionRef add_accept(Worker *w, ConnectedSocket cli_socket, entity_addr_t &addr);
 
   /**
    * This wraps ms_deliver_get_authorizer. We use it for AsyncConnection.