OPTION(ms_inject_internal_delays, OPT_DOUBLE, 0) // seconds
OPTION(ms_dump_on_send, OPT_BOOL, false) // hexdump msg to log on send
OPTION(ms_dump_corrupt_message_level, OPT_INT, 1) // debug level to hexdump undecodeable messages at
-OPTION(ms_async_op_threads, OPT_INT, 3) // number of worker processing threads for async messenger created on init
-OPTION(ms_async_max_op_threads, OPT_INT, 5) // max number of worker processing threads for async messenger
+OPTION(ms_async_transport_type, OPT_STR, "posix")
+OPTION(ms_async_op_threads, OPT_U64, 3) // number of worker processing threads for async messenger created on init
+OPTION(ms_async_max_op_threads, OPT_U64, 5) // max number of worker processing threads for async messenger
OPTION(ms_async_set_affinity, OPT_BOOL, true)
// example: ms_async_affinity_cores = 0,1
// The number of cores listed is expected to equal ms_async_op_threads; otherwise
*
*/
-#include <sys/types.h>
-#include <sys/socket.h>
#include <unistd.h>
#include "include/Context.h"
#include "AsyncMessenger.h"
#include "AsyncConnection.h"
-#include "include/sock_compat.h"
-
// Constant to limit starting sequence number to 2^31. Nothing special about it, just a big number. PLR
#define SEQ_MASK 0x7fffffff
#undef dout_prefix
#define dout_prefix _conn_prefix(_dout)
ostream& AsyncConnection::_conn_prefix(std::ostream *_dout) {
+ int fd = cs ? cs.fd() : -1;
return *_dout << "-- " << async_msgr->get_myinst().addr << " >> " << peer_addr << " conn(" << this
- << " sd=" << sd << " :" << port
+ << " sd=" << fd << " :" << port
<< " s=" << get_state_name(state)
<< " pgs=" << peer_global_seq
<< " cs=" << connect_seq
Worker *w)
: Connection(cct, m), delay_state(NULL), async_msgr(m), conn_id(q->get_id()),
logger(w->get_perf_counter()), global_seq(0), connect_seq(0), peer_global_seq(0),
- out_seq(0), ack_left(0), in_seq(0), state(STATE_NONE), state_after_send(STATE_NONE), sd(-1), port(-1),
+ out_seq(0), ack_left(0), in_seq(0), state(STATE_NONE), state_after_send(STATE_NONE), port(-1),
dispatch_queue(q), can_write(WriteStatus::NOWRITE),
open_write(false), keepalive(false), recv_buf(NULL),
recv_max_prefetch(MAX(msgr->cct->_conf->ms_tcp_prefetch_max_size, TCP_PREFETCH_MIN_SIZE)),
last_active(ceph::coarse_mono_clock::now()),
inactive_timeout_us(cct->_conf->ms_tcp_read_timeout*1000*1000),
got_bad_auth(false), authorizer(NULL), replacing(false),
- is_reset_from_peer(false), once_ready(false), state_buffer(NULL), state_offset(0), net(cct),
+ is_reset_from_peer(false), once_ready(false), state_buffer(NULL), state_offset(0),
worker(w), center(&w->center)
{
read_handler = new C_handle_read(this);
/* return -1 means the socket hit an error or was closed by the peer; the caller should close it
 * return 0 means EAGAIN or EINTR */
-ssize_t AsyncConnection::read_bulk(int fd, char *buf, unsigned len)
+ssize_t AsyncConnection::read_bulk(char *buf, unsigned len)
{
ssize_t nread;
again:
- nread = ::read(fd, buf, len);
- if (nread == -1) {
- if (errno == EAGAIN) {
+ nread = cs.read(buf, len);
+ if (nread < 0) {
+ if (nread == -EAGAIN) {
nread = 0;
- } else if (errno == EINTR) {
+ } else if (nread == -EINTR) {
goto again;
} else {
- ldout(async_msgr->cct, 1) << __func__ << " reading from fd=" << fd
- << " : "<< strerror(errno) << dendl;
+ ldout(async_msgr->cct, 1) << __func__ << " reading from fd=" << cs.fd()
+ << " : "<< strerror(nread) << dendl;
return -1;
}
} else if (nread == 0) {
ldout(async_msgr->cct, 1) << __func__ << " peer close file descriptor "
- << fd << dendl;
+ << cs.fd() << dendl;
return -1;
}
return nread;
}
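For orientation, a minimal sketch of how a caller is expected to drive this contract; read_fn stands in for the (private) read_bulk, and the other names are illustrative rather than taken from the patch:

#include <functional>

// Drive a read_bulk-style reader until `want` bytes arrive or the socket
// would block: -1 is fatal (error or peer close), 0 means wait for the next
// EVENT_READABLE, positive is bytes read.
ssize_t drain(std::function<ssize_t(char *, unsigned)> read_fn,
              char *buf, unsigned want)
{
  unsigned off = 0;
  while (off < want) {
    ssize_t r = read_fn(buf + off, want - off);
    if (r < 0)
      return -1;   // caller must tear the connection down
    if (r == 0)
      break;       // EAGAIN/EINTR already absorbed inside read_bulk
    off += r;
  }
  return off;      // may be short; read_until() resumes from its offset
}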
-/*
- SIGPIPE suppression - for platforms without SO_NOSIGPIPE or MSG_NOSIGNAL
- http://krokisplace.blogspot.in/2010/02/suppressing-sigpipe-in-library.html
- http://www.microhowto.info/howto/ignore_sigpipe_without_affecting_other_threads_in_a_process.html
-*/
-void AsyncConnection::suppress_sigpipe()
-{
-#if !defined(MSG_NOSIGNAL) && !defined(SO_NOSIGPIPE)
- /*
- We want to ignore possible SIGPIPE that we can generate on write.
- SIGPIPE is delivered *synchronously* and *only* to the thread
- doing the write. So if it is reported as already pending (which
- means the thread blocks it), then we do nothing: if we generate
- SIGPIPE, it will be merged with the pending one (there's no
- queuing), and that suits us well. If it is not pending, we block
- it in this thread (and we avoid changing signal action, because it
- is per-process).
- */
- sigset_t pending;
- sigemptyset(&pending);
- sigpending(&pending);
- sigpipe_pending = sigismember(&pending, SIGPIPE);
- if (!sigpipe_pending) {
- sigset_t blocked;
- sigemptyset(&blocked);
- pthread_sigmask(SIG_BLOCK, &sigpipe_mask, &blocked);
-
- /* Maybe is was blocked already? */
- sigpipe_unblock = ! sigismember(&blocked, SIGPIPE);
- }
-#endif /* !defined(MSG_NOSIGNAL) && !defined(SO_NOSIGPIPE) */
-}
-
-
-void AsyncConnection::restore_sigpipe()
-{
-#if !defined(MSG_NOSIGNAL) && !defined(SO_NOSIGPIPE)
- /*
- If SIGPIPE was pending already we do nothing. Otherwise, if it
- become pending (i.e., we generated it), then we sigwait() it (thus
- clearing pending status). Then we unblock SIGPIPE, but only if it
- were us who blocked it.
- */
- if (!sigpipe_pending) {
- sigset_t pending;
- sigemptyset(&pending);
- sigpending(&pending);
- if (sigismember(&pending, SIGPIPE)) {
- /*
- Protect ourselves from a situation when SIGPIPE was sent
- by the user to the whole process, and was delivered to
- other thread before we had a chance to wait for it.
- */
- static const struct timespec nowait = { 0, 0 };
- TEMP_FAILURE_RETRY(sigtimedwait(&sigpipe_mask, NULL, &nowait));
- }
-
- if (sigpipe_unblock)
- pthread_sigmask(SIG_UNBLOCK, &sigpipe_mask, NULL);
- }
-#endif /* !defined(MSG_NOSIGNAL) && !defined(SO_NOSIGPIPE) */
-}
-
-// return the length of msg needed to be sent,
-// < 0 means error occured
-ssize_t AsyncConnection::do_sendmsg(struct msghdr &msg, unsigned len, bool more)
-{
- suppress_sigpipe();
-
- while (len > 0) {
- ssize_t r;
-#if defined(MSG_NOSIGNAL)
- r = ::sendmsg(sd, &msg, MSG_NOSIGNAL | (more ? MSG_MORE : 0));
-#else
- r = ::sendmsg(sd, &msg, (more ? MSG_MORE : 0));
-#endif /* defined(MSG_NOSIGNAL) */
-
- if (r == 0) {
- ldout(async_msgr->cct, 10) << __func__ << " sendmsg got r==0!" << dendl;
- } else if (r < 0) {
- if (errno == EINTR) {
- continue;
- } else if (errno == EAGAIN) {
- break;
- } else {
- ldout(async_msgr->cct, 1) << __func__ << " sendmsg error: " << cpp_strerror(errno) << dendl;
- restore_sigpipe();
- return r;
- }
- }
-
- len -= r;
- if (len == 0) break;
-
- // hrmph. drain r bytes from the front of our message.
- ldout(async_msgr->cct, 20) << __func__ << " short write did " << r << ", still have " << len << dendl;
- while (r > 0) {
- if (msg.msg_iov[0].iov_len <= (size_t)r) {
- // drain this whole item
- r -= msg.msg_iov[0].iov_len;
- msg.msg_iov++;
- msg.msg_iovlen--;
- } else {
- msg.msg_iov[0].iov_base = (char *)msg.msg_iov[0].iov_base + r;
- msg.msg_iov[0].iov_len -= r;
- break;
- }
- }
- }
- restore_sigpipe();
- return (ssize_t)len;
-}
-
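With do_sendmsg() deleted, the iovec bookkeeping and SIGPIPE handling move behind the stack's send path. The contract _try_send() now assumes, inferred from the call sites below rather than quoted from Stack.h, is: send() writes what it can, removes exactly those bytes from the bufferlist, and returns the count (or a negative errno on hard failure). A posix-flavored sketch under that assumption:

#include <sys/socket.h>
#include <cerrno>

// MSG_NOSIGNAL replaces the old suppress_sigpipe()/restore_sigpipe() dance;
// the real posix backend may differ in detail.
ssize_t send_sketch(int fd, bufferlist &bl, bool more)
{
  size_t sent = 0;
  for (const auto &ptr : bl.buffers()) {
    const char *data = ptr.c_str();
    size_t left = ptr.length(), off = 0;
    while (left > 0) {
      ssize_t r = ::send(fd, data + off, left,
                         MSG_NOSIGNAL | (more ? MSG_MORE : 0));
      if (r < 0) {
        if (errno == EINTR)
          continue;          // retry the same chunk
        if (errno == EAGAIN)
          goto out;          // kernel buffer full: report progress so far
        return -errno;       // hard error
      }
      off += r; left -= r; sent += r;
    }
  }
out:
  if (sent)
    bl.splice(0, sent);      // drop what was written; the rest stays queued
  return sent;               // _try_send() logs the remaining length
}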
// return the remaining bytes; this may be larger than the length of ptr
// a return value < 0 means an error occurred
ssize_t AsyncConnection::_try_send(bool more)
{
- if (async_msgr->cct->_conf->ms_inject_socket_failures && sd >= 0) {
+ if (async_msgr->cct->_conf->ms_inject_socket_failures && cs) {
if (rand() % async_msgr->cct->_conf->ms_inject_socket_failures == 0) {
ldout(async_msgr->cct, 0) << __func__ << " injecting socket failure" << dendl;
- ::shutdown(sd, SHUT_RDWR);
- }
- }
-
- uint64_t sent_bytes = 0;
- list<bufferptr>::const_iterator pb = outcoming_bl.buffers().begin();
- uint64_t left_pbrs = outcoming_bl.buffers().size();
- while (left_pbrs) {
- struct msghdr msg;
- uint64_t size = MIN(left_pbrs, ASYNC_IOV_MAX);
- left_pbrs -= size;
- memset(&msg, 0, sizeof(msg));
- msg.msg_iovlen = 0;
- msg.msg_iov = msgvec;
- unsigned msglen = 0;
- while (size > 0) {
- msgvec[msg.msg_iovlen].iov_base = (void*)(pb->c_str());
- msgvec[msg.msg_iovlen].iov_len = pb->length();
- msg.msg_iovlen++;
- msglen += pb->length();
- ++pb;
- size--;
+ cs.shutdown();
}
-
- ssize_t r = do_sendmsg(msg, msglen, left_pbrs || more);
- if (r < 0)
- return r;
-
- // "r" is the remaining length
- sent_bytes += msglen - r;
- if (r > 0) {
- ldout(async_msgr->cct, 5) << __func__ << " remaining " << r
- << " needed to be sent, creating event for writing"
- << dendl;
- break;
- }
- // only "r" == 0 continue
}
- // trim already sent for outcoming_bl
- if (sent_bytes) {
- if (sent_bytes < outcoming_bl.length()) {
- outcoming_bl.splice(0, sent_bytes);
- } else {
- outcoming_bl.clear();
- }
+ ssize_t r = cs.send(outcoming_bl, more);
+ if (r < 0) {
+ ldout(async_msgr->cct, 1) << __func__ << " send error: " << cpp_strerror(r) << dendl;
+ return r;
}
- ldout(async_msgr->cct, 20) << __func__ << " sent bytes " << sent_bytes
+ ldout(async_msgr->cct, 10) << __func__ << " sent bytes " << r
<< " remaining bytes " << outcoming_bl.length() << dendl;
if (!open_write && is_queued()) {
if (center->in_thread()) {
- center->create_file_event(sd, EVENT_WRITABLE, write_handler);
+ center->create_file_event(cs.fd(), EVENT_WRITABLE, write_handler);
open_write = true;
} else {
center->dispatch_event_external(write_handler);
if (open_write && !is_queued()) {
if (center->in_thread()) {
- center->delete_file_event(sd, EVENT_WRITABLE);
+ center->delete_file_event(cs.fd(), EVENT_WRITABLE);
open_write = false;
} else {
center->dispatch_event_external(write_handler);
ldout(async_msgr->cct, 25) << __func__ << " len is " << len << " state_offset is "
<< state_offset << dendl;
- if (async_msgr->cct->_conf->ms_inject_socket_failures && sd >= 0) {
+ if (async_msgr->cct->_conf->ms_inject_socket_failures && cs) {
if (rand() % async_msgr->cct->_conf->ms_inject_socket_failures == 0) {
ldout(async_msgr->cct, 0) << __func__ << " injecting socket failure" << dendl;
- ::shutdown(sd, SHUT_RDWR);
+ cs.shutdown();
}
}
if (len > recv_max_prefetch) {
/* this was a large read, we don't prefetch for these */
do {
- r = read_bulk(sd, p+state_offset, left);
+ r = read_bulk(p+state_offset, left);
ldout(async_msgr->cct, 25) << __func__ << " read_bulk left is " << left << " got " << r << dendl;
if (r < 0) {
ldout(async_msgr->cct, 1) << __func__ << " read failed" << dendl;
} while (r > 0);
} else {
do {
- r = read_bulk(sd, recv_buf+recv_end, recv_max_prefetch);
+ r = read_bulk(recv_buf+recv_end, recv_max_prefetch);
ldout(async_msgr->cct, 25) << __func__ << " read_bulk recv_end is " << recv_end
<< " left is " << left << " got " << r << dendl;
if (r < 0) {
global_seq = async_msgr->get_global_seq();
// close old socket. this is safe because we stopped the reader thread above.
- if (sd >= 0) {
- center->delete_file_event(sd, EVENT_READABLE|EVENT_WRITABLE);
- ::close(sd);
+ if (cs) {
+ center->delete_file_event(cs.fd(), EVENT_READABLE|EVENT_WRITABLE);
+ cs.close();
}
- sd = net.nonblock_connect(get_peer_addr());
- if (sd < 0) {
+ SocketOptions opts;
+ r = worker->connect(get_peer_addr(), opts, &cs);
+ if (r < 0)
goto fail;
- }
- center->create_file_event(sd, EVENT_READABLE, read_handler);
+ center->create_file_event(cs.fd(), EVENT_READABLE, read_handler);
state = STATE_CONNECTING_RE;
break;
}
case STATE_CONNECTING_RE:
{
- r = net.reconnect(get_peer_addr(), sd);
+ r = cs.is_connected();
if (r < 0) {
ldout(async_msgr->cct, 1) << __func__ << " reconnect failed " << dendl;
goto fail;
- } else if (r > 0) {
+ } else if (r == 0) {
ldout(async_msgr->cct, 10) << __func__ << " nonblock connect inprogress" << dendl;
- center->create_file_event(sd, EVENT_WRITABLE, read_handler);
+ center->create_file_event(cs.fd(), EVENT_WRITABLE, read_handler);
break;
}
- center->delete_file_event(sd, EVENT_WRITABLE);
- net.set_priority(sd, async_msgr->get_socket_priority());
+ center->delete_file_event(cs.fd(), EVENT_WRITABLE);
ldout(async_msgr->cct, 10) << __func__ << " connect successfully, ready to send banner" << dendl;
bufferlist bl;
goto fail;
}
ldout(async_msgr->cct, 20) << __func__ << " connect read peer addr "
- << paddr << " on socket " << sd << dendl;
+ << paddr << " on socket " << cs.fd() << dendl;
if (peer_addr != paddr) {
if (paddr.is_blank_ip() && peer_addr.get_port() == paddr.get_port() &&
peer_addr.get_nonce() == paddr.get_nonce()) {
<< async_msgr->get_myaddr() << dendl;
} else {
ldout(async_msgr->cct, 2) << __func__ << " connect couldn't write my addr, "
- << cpp_strerror(errno) << dendl;
+ << cpp_strerror(r) << dendl;
goto fail;
}
ldout(async_msgr->cct, 10) << __func__ << " continue send reply " << dendl;
} else {
ldout(async_msgr->cct, 2) << __func__ << " connect couldn't send reply "
- << cpp_strerror(errno) << dendl;
+ << cpp_strerror(r) << dendl;
goto fail;
}
case STATE_ACCEPTING:
{
bufferlist bl;
-
- if (net.set_nonblock(sd) < 0)
- goto fail;
-
- net.set_socket_options(sd, async_msgr->cct->_conf->ms_tcp_nodelay, async_msgr->cct->_conf->ms_tcp_rcvbuf);
- net.set_priority(sd, async_msgr->get_socket_priority());
- center->create_file_event(sd, EVENT_READABLE, read_handler);
+ center->create_file_event(cs.fd(), EVENT_READABLE, read_handler);
bl.append(CEPH_BANNER, strlen(CEPH_BANNER));
::encode(async_msgr->get_myaddr(), bl, 0); // legacy
port = async_msgr->get_myaddr().get_port();
- // and peer's socket addr (they might not know their ip)
- sockaddr_storage ss;
- socklen_t len = sizeof(ss);
- r = ::getpeername(sd, (sockaddr*)&ss, &len);
- if (r < 0) {
- ldout(async_msgr->cct, 0) << __func__ << " failed to getpeername "
- << cpp_strerror(errno) << dendl;
- goto fail;
- }
- socket_addr.set_sockaddr((sockaddr*)&ss);
::encode(socket_addr, bl, 0); // legacy
- ldout(async_msgr->cct, 1) << __func__ << " sd=" << sd << " " << socket_addr << dendl;
+ ldout(async_msgr->cct, 1) << __func__ << " sd=" << cs.fd() << " " << socket_addr << dendl;
r = try_send(bl);
if (r == 0) {
existing->is_reset_from_peer = true;
}
- center->delete_file_event(sd, EVENT_READABLE|EVENT_WRITABLE);
+ center->delete_file_event(cs.fd(), EVENT_READABLE|EVENT_WRITABLE);
// Clean up output buffer
existing->outcoming_bl.clear();
existing->requeue_sent();
existing->reset_recv_state();
- int new_fd = sd;
+ auto temp_cs = std::move(cs);
EventCenter *new_center = center;
Worker *new_worker = worker;
// moving cs out above keeps _stop() from shutting down the replacing socket
- sd = -1;
// queue a reset on the new connection, which we're dumping for the old
_stop();
+
dispatch_queue->queue_reset(this);
ldout(async_msgr->cct, 1) << __func__ << " stop myself to swap existing" << dendl;
existing->can_write = WriteStatus::REPLACING;
// there shouldn't be any buffered data left
assert(recv_start == recv_end);
- existing->write_lock.unlock();
- // new sd now isn't registered any event while origin events
- // have been deleted.
- // previous existing->sd now is still open, event will continue to
- // notify previous existing->center from now.
- // From now, no one will dispatch event to `existing`
- // Note: we must use async dispatch instead of execute this inline
- // even existing->center == center. Because we must ensure below
- // event executed after all pending external events like
- // "dispatch_state->queue"
- existing->center->submit_to(
- existing->center->get_id(),
- [existing, new_fd, new_worker, new_center, connect, reply, authorizer_reply]() mutable {
+ auto deactivate_existing = std::bind(
+ [existing, new_worker, new_center, connect, reply, authorizer_reply](ConnectedSocket &cs) mutable {
// we need to delete the time event in the original thread
{
std::lock_guard<std::mutex> l(existing->lock);
if (existing->state == STATE_NONE) {
existing->shutdown_socket();
- existing->sd = new_fd;
+ existing->cs = std::move(cs);
existing->worker->references--;
new_worker->references++;
existing->logger = new_worker->get_perf_counter();
if (existing->delay_state)
existing->delay_state->set_center(new_center);
} else if (existing->state == STATE_CLOSED) {
- ::close(new_fd);
+ cs.close();
return ;
} else {
assert(0);
// Before changing existing->center, there may already be events queued in
// existing->center's queue. If we then mark `existing` down, cleanup runs in
// another thread, and a previously queued event would touch freed state and
// segfault.
- auto transfer_existing = [existing, new_fd, connect, reply, authorizer_reply]() mutable {
+ auto transfer_existing = [existing, connect, reply, authorizer_reply]() mutable {
std::lock_guard<std::mutex> l(existing->lock);
if (existing->state == STATE_CLOSED)
return ;
- assert(new_fd == existing->sd);
assert(existing->state == STATE_NONE);
-
+
existing->state = STATE_ACCEPTING_WAIT_CONNECT_MSG;
- existing->center->create_file_event(existing->sd, EVENT_READABLE, existing->read_handler);
+ existing->center->create_file_event(existing->cs.fd(), EVENT_READABLE, existing->read_handler);
reply.global_seq = existing->peer_global_seq;
if (existing->_reply_accept(CEPH_MSGR_TAG_RETRY_GLOBAL, connect, reply, authorizer_reply) < 0) {
// handle error
else
existing->center->submit_to(
existing->center->get_id(), std::move(transfer_existing), true);
- }, true);
+ }, std::move(temp_cs));
+ existing->center->submit_to(
+ existing->center->get_id(), std::move(deactivate_existing), true);
+ existing->write_lock.unlock();
existing->lock.unlock();
return 0;
}
center->dispatch_event_external(read_handler);
}
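The handover above leans on ConnectedSocket being move-only: a C++11 lambda cannot capture by move, so std::bind moves the socket into the callable that the target EventCenter runs on its own thread. A stripped-down sketch of the pattern (names are illustrative; submit stands in for EventCenter::submit_to):

#include <functional>
#include <utility>

struct MovableSock {                 // stands in for ConnectedSocket
  MovableSock() = default;
  MovableSock(MovableSock &&) = default;
  MovableSock(const MovableSock &) = delete;
};

template <typename Submit>
void hand_over(Submit &&submit, MovableSock sock)
{
  auto fn = std::bind(
      [](MovableSock &s) {
        // runs on the destination thread, which now owns the socket
      },
      std::move(sock));              // bind stores the socket by move
  submit(std::move(fn));             // queue onto the other EventCenter
}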
-void AsyncConnection::accept(int incoming)
+void AsyncConnection::accept(ConnectedSocket socket, entity_addr_t &addr)
{
- ldout(async_msgr->cct, 10) << __func__ << " sd=" << incoming << dendl;
- assert(sd < 0);
+ ldout(async_msgr->cct, 10) << __func__ << " sd=" << socket.fd() << dendl;
+ assert(socket.fd() > 0);
std::lock_guard<std::mutex> l(lock);
- sd = incoming;
+ cs = std::move(socket);
+ socket_addr = addr;
state = STATE_ACCEPTING;
// reschedule the connection in order to avoid a lock dependency
center->dispatch_event_external(read_handler);
ssize_t rc = _try_send(more);
if (rc < 0) {
ldout(async_msgr->cct, 1) << __func__ << " error sending " << m << ", "
- << cpp_strerror(errno) << dendl;
+ << cpp_strerror(rc) << dendl;
} else if (rc == 0) {
ldout(async_msgr->cct, 10) << __func__ << " sending " << m << " done." << dendl;
} else {
if (state == STATE_STANDBY && !policy.server && is_queued()) {
ldout(async_msgr->cct, 10) << __func__ << " policy.server is false" << dendl;
_connect();
- } else if (sd >= 0 && state != STATE_NONE && state != STATE_CONNECTING && state != STATE_CONNECTING_RE && state != STATE_CLOSED) {
+ } else if (cs && state != STATE_NONE && state != STATE_CONNECTING && state != STATE_CONNECTING_RE && state != STATE_CLOSED) {
r = _try_send();
if (r < 0) {
ldout(async_msgr->cct, 1) << __func__ << " send outcoming bl failed" << dendl;
#include "msg/Messenger.h"
#include "Event.h"
-#include "net_handler.h"
+#include "Stack.h"
class AsyncMessenger;
class Worker;
*/
class AsyncConnection : public Connection {
- ssize_t read_bulk(int fd, char *buf, unsigned len);
- void suppress_sigpipe();
- void restore_sigpipe();
+ ssize_t read_bulk(char *buf, unsigned len);
- ssize_t do_sendmsg(struct msghdr &msg, unsigned len, bool more);
ssize_t try_send(bufferlist &bl, bool more=false) {
std::lock_guard<std::mutex> l(write_lock);
center->delete_time_event(last_tick_id);
last_tick_id = 0;
}
- if (sd >= 0) {
- center->delete_file_event(sd, EVENT_READABLE|EVENT_WRITABLE);
- ::shutdown(sd, SHUT_RDWR);
- ::close(sd);
- sd = -1;
+ if (cs) {
+ center->delete_file_event(cs.fd(), EVENT_READABLE|EVENT_WRITABLE);
+ cs.shutdown();
+ cs.close();
}
}
Message *_get_next_outgoing(bufferlist *bl) {
_connect();
}
// Only call when the AsyncConnection is first constructed
- void accept(int sd);
+ void accept(ConnectedSocket socket, entity_addr_t &addr);
int send_message(Message *m) override;
void send_keepalive() override;
atomic64_t ack_left, in_seq;
int state;
int state_after_send;
- int sd;
+ ConnectedSocket cs;
int port;
Messenger::Policy policy;
char *state_buffer;
// used only by "read_until"
uint64_t state_offset;
- NetHandler net;
Worker *worker;
EventCenter *center;
ceph::shared_ptr<AuthSessionHandler> session_security;
-#if !defined(MSG_NOSIGNAL) && !defined(SO_NOSIGPIPE)
- sigset_t sigpipe_mask;
- bool sigpipe_pending;
- bool sigpipe_unblock;
-#endif
-
public:
// used by eventcallback
void handle_write();
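For reference, the ConnectedSocket surface this class now codes against, reconstructed from the call sites in this patch (a sketch, not the authoritative Stack.h declaration):

class ConnectedSocket_sketch {
 public:
  ConnectedSocket_sketch() = default;                          // empty socket
  ConnectedSocket_sketch(ConnectedSocket_sketch &&) = default; // move-only
  ConnectedSocket_sketch(const ConnectedSocket_sketch &) = delete;

  explicit operator bool() const;  // live socket: replaces the `sd >= 0` tests
  int fd() const;                  // raw fd for EventCenter file events
  ssize_t read(char *buf, size_t len);      // returns -errno, not errno
  ssize_t send(bufferlist &bl, bool more);  // consumes sent bytes from bl
  int is_connected();              // <0 error, 0 in progress, >0 connected
  int shutdown();                  // both directions, like ::shutdown
  int close();
};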
#include "acconfig.h"
-#include <errno.h>
#include <iostream>
#include <fstream>
#include "AsyncMessenger.h"
-#include "include/str_list.h"
-#include "common/strtol.h"
#include "common/config.h"
#include "common/Timer.h"
#include "common/errno.h"
-#include "auth/Crypto.h"
-#include "include/Spinlock.h"
#define dout_subsys ceph_subsys_ms
#undef dout_prefix
return *_dout << " Processor -- ";
}
-static ostream& _prefix(std::ostream *_dout, Worker *w) {
- return *_dout << " Worker -- ";
-}
-
-static ostream& _prefix(std::ostream *_dout, WorkerPool *p) {
- return *_dout << " WorkerPool -- ";
-}
-
/*******************
* Processor
}
};
-Processor::Processor(AsyncMessenger *r, CephContext *c, uint64_t n)
- : msgr(r),
- net(c),
- worker(NULL),
- listen_sd(-1),
- nonce(n),
- listen_handler(new C_processor_accept(this)) {}
+Processor::Processor(AsyncMessenger *r, Worker *w, CephContext *c, uint64_t n)
+ : msgr(r), net(c), worker(w), nonce(n),
+ listen_handler(new C_processor_accept(this)) {}
int Processor::bind(const entity_addr_t &bind_addr, const set<int>& avoid_ports)
{
family = conf->ms_bind_ipv6 ? AF_INET6 : AF_INET;
}
- /* socket creation */
- listen_sd = ::socket(family, SOCK_STREAM, 0);
- if (listen_sd < 0) {
- lderr(msgr->cct) << __func__ << " unable to create socket: "
- << cpp_strerror(errno) << dendl;
- return -errno;
- }
-
- int r = net.set_nonblock(listen_sd);
- if (r < 0) {
- ::close(listen_sd);
- listen_sd = -1;
- return r;
- }
- net.set_close_on_exec(listen_sd);
- net.set_socket_options(listen_sd, msgr->cct->_conf->ms_tcp_nodelay, msgr->cct->_conf->ms_tcp_rcvbuf);
+ SocketOptions opts;
+ opts.nodelay = msgr->cct->_conf->ms_tcp_nodelay;
+ opts.rcbuf_size = msgr->cct->_conf->ms_tcp_rcvbuf;
// use whatever user specified (if anything)
entity_addr_t listen_addr = bind_addr;
listen_addr.set_family(family);
/* bind to port */
- int rc = -1;
- r = -1;
+ int r = -1;
for (int i = 0; i < conf->ms_bind_retry_count; i++) {
if (i > 0) {
}
if (listen_addr.get_port()) {
- // specific port
- // reuse addr+port when possible
- int on = 1;
- rc = ::setsockopt(listen_sd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on));
- if (rc < 0) {
- lderr(msgr->cct) << __func__ << " unable to setsockopt: " << cpp_strerror(errno) << dendl;
- r = -errno;
- continue;
- }
-
- rc = ::bind(listen_sd, listen_addr.get_sockaddr(),
- listen_addr.get_sockaddr_len());
- if (rc < 0) {
+ worker->center.submit_to(worker->center.get_id(), [this, &listen_addr, &opts, &r]() {
+ r = worker->listen(listen_addr, opts, &listen_socket);
+ }, false);
+ if (r < 0) {
lderr(msgr->cct) << __func__ << " unable to bind to " << listen_addr
- << ": " << cpp_strerror(errno) << dendl;
- r = -errno;
+ << ": " << cpp_strerror(r) << dendl;
continue;
}
} else {
continue;
listen_addr.set_port(port);
- rc = ::bind(listen_sd, listen_addr.get_sockaddr(),
- listen_addr.get_sockaddr_len());
- if (rc == 0)
+ worker->center.submit_to(worker->center.get_id(), [this, &listen_addr, &opts, &r]() {
+ r = worker->listen(listen_addr, opts, &listen_socket);
+ }, false);
+ if (r == 0)
break;
}
- if (rc < 0) {
+ if (r < 0) {
lderr(msgr->cct) << __func__ << " unable to bind to " << listen_addr
<< " on any port in range " << msgr->cct->_conf->ms_bind_port_min
<< "-" << msgr->cct->_conf->ms_bind_port_max << ": "
- << cpp_strerror(errno) << dendl;
- r = -errno;
+ << cpp_strerror(r) << dendl;
listen_addr.set_port(0); // Clear port before retry, otherwise we shall fail again.
continue;
}
ldout(msgr->cct, 10) << __func__ << " bound on random port " << listen_addr << dendl;
}
- if (rc == 0)
+ if (r == 0)
break;
}
// It seems that binding completely failed, return with that exit status
- if (rc < 0) {
+ if (r < 0) {
lderr(msgr->cct) << __func__ << " was unable to bind after " << conf->ms_bind_retry_count
- << " attempts: " << cpp_strerror(errno) << dendl;
- ::close(listen_sd);
- listen_sd = -1;
+ << " attempts: " << cpp_strerror(r) << dendl;
return r;
}
- // what port did we get?
- sockaddr_storage ss;
- socklen_t llen = sizeof(ss);
- rc = getsockname(listen_sd, (sockaddr*)&ss, &llen);
- if (rc < 0) {
- rc = -errno;
- lderr(msgr->cct) << __func__ << " failed getsockname: " << cpp_strerror(rc) << dendl;
- ::close(listen_sd);
- listen_sd = -1;
- return rc;
- }
- listen_addr.set_sockaddr((sockaddr*)&ss);
-
ldout(msgr->cct, 10) << __func__ << " bound to " << listen_addr << dendl;
- // listen!
- rc = ::listen(listen_sd, 128);
- if (rc < 0) {
- rc = -errno;
- lderr(msgr->cct) << __func__ << " unable to listen on " << listen_addr
- << ": " << cpp_strerror(rc) << dendl;
- ::close(listen_sd);
- listen_sd = -1;
- return rc;
- }
-
msgr->set_myaddr(bind_addr);
if (bind_addr != entity_addr_t())
msgr->learned_addr(bind_addr);
return bind(addr, new_avoid);
}
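In bind() above, listen() has to run on the owning worker's thread, so the call trampolines through center.submit_to() and captures r by reference. The trailing bool appears to select always-async dispatch; passing false lets the submitter wait until the closure has run, which is what makes reading r immediately afterwards safe. Condensed:

int bind_on_worker(Worker *worker, const entity_addr_t &listen_addr,
                   const SocketOptions &opts, ServerSocket *listen_socket)
{
  int r = -1;
  worker->center.submit_to(
      worker->center.get_id(),
      [&]() { r = worker->listen(listen_addr, opts, listen_socket); },
      false /* wait for completion before returning */);
  return r;
}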
-void Processor::start(Worker *w)
+void Processor::start()
{
- ldout(msgr->cct, 1) << __func__ << " " << dendl;
+ ldout(msgr->cct, 1) << __func__ << dendl;
// start thread
- if (listen_sd >= 0) {
- worker = w;
+ if (listen_socket) {
worker->center.submit_to(worker->center.get_id(), [this]() {
- worker->center.create_file_event(listen_sd, EVENT_READABLE, listen_handler); });
+ worker->center.create_file_event(listen_socket.fd(), EVENT_READABLE, listen_handler); }, false);
}
}
void Processor::accept()
{
- ldout(msgr->cct, 10) << __func__ << " listen_sd=" << listen_sd << dendl;
+ ldout(msgr->cct, 10) << __func__ << " listen_fd=" << listen_socket.fd() << dendl;
+ SocketOptions opts;
+ opts.nodelay = msgr->cct->_conf->ms_tcp_nodelay;
+ opts.rcbuf_size = msgr->cct->_conf->ms_tcp_rcvbuf;
while (true) {
- sockaddr_storage ss;
- socklen_t slen = sizeof(ss);
- int sd = ::accept(listen_sd, (sockaddr*)&ss, &slen);
- if (sd >= 0) {
- net.set_close_on_exec(sd);
- ldout(msgr->cct, 10) << __func__ << " accepted incoming on sd " << sd << dendl;
-
- msgr->add_accept(sd);
+ entity_addr_t addr;
+ ConnectedSocket cli_socket;
+ int r = listen_socket.accept(&cli_socket, opts, &addr);
+ if (r == 0) {
+ ldout(msgr->cct, 10) << __func__ << " accepted incoming on sd " << cli_socket.fd() << dendl;
+
+ msgr->add_accept(worker, std::move(cli_socket), addr);
continue;
} else {
- if (errno == EINTR) {
+ if (r == -EINTR) {
continue;
- } else if (errno == EAGAIN) {
+ } else if (r == -EAGAIN) {
break;
- } else if (errno == EMFILE || errno == ENFILE) {
- lderr(msgr->cct) << __func__ << " open file descriptions limit reached sd = " << sd
- << " errno " << errno << " " << cpp_strerror(errno) << dendl;
+ } else if (r == -EMFILE || r == -ENFILE) {
+ lderr(msgr->cct) << __func__ << " open file descriptions limit reached sd = " << listen_socket.fd()
+ << " errno " << r << " " << cpp_strerror(r) << dendl;
break;
- } else if (errno == ECONNABORTED) {
- ldout(msgr->cct, 0) << __func__ << " it was closed because of rst arrived sd = " << sd
- << " errno " << errno << " " << cpp_strerror(errno) << dendl;
+ } else if (r == -ECONNABORTED) {
+ ldout(msgr->cct, 0) << __func__ << " it was closed because of rst arrived sd = " << listen_socket.fd()
+ << " errno " << r << " " << cpp_strerror(r) << dendl;
continue;
} else {
- lderr(msgr->cct) << __func__ << " no incoming connection? sd = " << sd
- << " errno " << errno << " " << cpp_strerror(errno) << dendl;
+ lderr(msgr->cct) << __func__ << " no incoming connection?"
+ << " errno " << r << " " << cpp_strerror(r) << dendl;
break;
}
}
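And the companion ServerSocket surface the Processor now depends on, again inferred from this patch rather than quoted from Stack.h:

class ServerSocket_sketch {
 public:
  explicit operator bool() const;  // listening: enables `if (listen_socket)`
  int fd() const;                  // registered for EVENT_READABLE above
  // 0 on success with *sock and *out filled in; otherwise a negative errno
  // (-EAGAIN once the backlog is drained, -EINTR/-ECONNABORTED to retry)
  int accept(ConnectedSocket *sock, const SocketOptions &opts,
             entity_addr_t *out);
  void abort_accept();             // stop listening and release the port
};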
{
ldout(msgr->cct,10) << __func__ << dendl;
- if (listen_sd >= 0) {
+ if (listen_socket) {
worker->center.submit_to(worker->center.get_id(), [this]() {
- worker->center.delete_file_event(listen_sd, EVENT_READABLE);
- ::shutdown(listen_sd, SHUT_RDWR);
- ::close(listen_sd);
- listen_sd = -1;
- });
- }
-}
-
-void Worker::stop()
-{
- ldout(cct, 10) << __func__ << dendl;
- done = true;
- center.wakeup();
-}
-
-class WorkerPool {
- CephContext *cct;
- vector<Worker*> workers;
- vector<int> coreids;
- // Used to indicate whether thread started
- bool started;
- Mutex barrier_lock;
- Cond barrier_cond;
- atomic_t barrier_count;
- simple_spinlock_t pool_spin = SIMPLE_SPINLOCK_INITIALIZER;
-
- class C_barrier : public EventCallback {
- WorkerPool *pool;
- public:
- explicit C_barrier(WorkerPool *p): pool(p) {}
- void do_request(int id) {
- Mutex::Locker l(pool->barrier_lock);
- pool->barrier_count.dec();
- pool->barrier_cond.Signal();
- delete this;
- }
- };
- friend class C_barrier;
- public:
- std::atomic_uint pending;
- explicit WorkerPool(CephContext *c);
- WorkerPool(const WorkerPool &) = delete;
- WorkerPool& operator=(const WorkerPool &) = delete;
- virtual ~WorkerPool();
- void start();
- Worker *get_worker();
- int get_cpuid(int id) {
- if (coreids.empty())
- return -1;
- return coreids[id % coreids.size()];
- }
- void barrier();
-};
-
-void *Worker::entry()
-{
- ldout(cct, 10) << __func__ << " starting" << dendl;
- if (cct->_conf->ms_async_set_affinity) {
- int cid = pool->get_cpuid(id);
- if (cid >= 0 && set_affinity(cid)) {
- ldout(cct, 0) << __func__ << " sched_setaffinity failed: "
- << cpp_strerror(errno) << dendl;
- }
- }
-
- center.set_owner();
- pool->pending--;
- while (!done) {
- ldout(cct, 20) << __func__ << " calling event process" << dendl;
-
- int r = center.process_events(EventMaxWaitUs);
- if (r < 0) {
- ldout(cct, 20) << __func__ << " process events failed: "
- << cpp_strerror(errno) << dendl;
- // TODO do something?
- }
- }
-
- return 0;
-}
-
-/*******************
- * WorkerPool
- *******************/
-WorkerPool::WorkerPool(CephContext *c): cct(c), started(false),
- barrier_lock("WorkerPool::WorkerPool::barrier_lock"),
- barrier_count(0), pending(0)
-{
- assert(cct->_conf->ms_async_op_threads > 0);
- // make sure user won't try to force some crazy number of worker threads
- assert(cct->_conf->ms_async_max_op_threads >= cct->_conf->ms_async_op_threads &&
- cct->_conf->ms_async_op_threads <= 32);
- for (int i = 0; i < cct->_conf->ms_async_op_threads; ++i) {
- Worker *w = new Worker(cct, this, i);
- workers.push_back(w);
- }
- vector<string> corestrs;
- get_str_vec(cct->_conf->ms_async_affinity_cores, corestrs);
- for (vector<string>::iterator it = corestrs.begin();
- it != corestrs.end(); ++it) {
- string err;
- int coreid = strict_strtol(it->c_str(), 10, &err);
- if (err == "")
- coreids.push_back(coreid);
- else
- lderr(cct) << __func__ << " failed to parse " << *it << " in " << cct->_conf->ms_async_affinity_cores << dendl;
- }
-
-}
-
-WorkerPool::~WorkerPool()
-{
- for (uint64_t i = 0; i < workers.size(); ++i) {
- if (workers[i]->is_started()) {
- workers[i]->stop();
- workers[i]->join();
- }
- delete workers[i];
- }
-}
-
-void WorkerPool::start()
-{
- if (!started) {
- for (uint64_t i = 0; i < workers.size(); ++i) {
- pending++;
- workers[i]->create("ms_async_worker");
- }
- started = true;
+ worker->center.delete_file_event(listen_socket.fd(), EVENT_READABLE);
+ listen_socket.abort_accept();
+ }, false);
}
- while (pending)
- usleep(50);
}
-Worker* WorkerPool::get_worker()
-{
- ldout(cct, 10) << __func__ << dendl;
-
- // start with some reasonably large number
- unsigned min_load = std::numeric_limits<int>::max();
- Worker* current_best = nullptr;
-
- simple_spin_lock(&pool_spin);
- // find worker with least references
- // tempting case is returning on references == 0, but in reality
- // this will happen so rarely that there's no need for special case.
- for (auto p = workers.begin(); p != workers.end(); ++p) {
- unsigned worker_load = (*p)->references.load();
- ldout(cct, 20) << __func__ << " Worker " << *p << " load: " << worker_load << dendl;
- if (worker_load < min_load) {
- current_best = *p;
- min_load = worker_load;
- }
- }
- // if minimum load exceeds amount of workers, make a new worker
- // logic behind this is that we're not going to create new worker
- // just because others have *some* load, we'll defer worker creation
- // until others have *plenty* of load. This will cause new worker
- // to get assigned to all new connections *unless* one or more
- // of workers get their load reduced - in that case, this worker
- // will be assigned to new connection.
- // TODO: add more logic and heuristics, so connections known to be
- // of light workload (heartbeat service, etc.) won't overshadow
- // heavy workload (clients, etc).
- if (!current_best || ((workers.size() < (unsigned)cct->_conf->ms_async_max_op_threads)
- && (min_load > workers.size()))) {
- ldout(cct, 20) << __func__ << " creating worker" << dendl;
- current_best = new Worker(cct, this, workers.size());
- workers.push_back(current_best);
- pending++;
- current_best->create("ms_async_worker");
- } else {
- ldout(cct, 20) << __func__ << " picked " << current_best
- << " as best worker with load " << min_load << dendl;
+struct StackSingleton {
+ std::shared_ptr<NetworkStack> stack;
+ StackSingleton(CephContext *c) {
+ stack = NetworkStack::create(c, c->_conf->ms_async_transport_type);
}
-
- ++current_best->references;
- simple_spin_unlock(&pool_spin);
-
- while (pending)
- usleep(50);
- assert(current_best);
- return current_best;
-}
-
-void WorkerPool::barrier()
-{
- ldout(cct, 10) << __func__ << " started." << dendl;
- for (vector<Worker*>::iterator it = workers.begin(); it != workers.end(); ++it) {
- barrier_count.inc();
- (*it)->center.dispatch_event_external(EventCallbackRef(new C_barrier(this)));
+ ~StackSingleton() {
+ stack->stop();
}
- ldout(cct, 10) << __func__ << " wait for " << barrier_count.read() << " barrier" << dendl;
- Mutex::Locker l(barrier_lock);
- while (barrier_count.read())
- barrier_cond.Wait(barrier_lock);
+};
- ldout(cct, 10) << __func__ << " end." << dendl;
-}
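StackSingleton gives every messenger in a CephContext the same NetworkStack: the lifetime is tied to the context, and stop() runs when the context destroys its singletons. A sketch of the lookup the constructor below performs (assuming, per that usage, that start() is safe to call more than once):

NetworkStack *get_shared_stack(CephContext *cct)
{
  StackSingleton *single = nullptr;
  cct->lookup_or_create_singleton_object<StackSingleton>(
      single, "AsyncMessenger::NetworkStack");  // same key as the ctor uses
  single->stack->start();  // every messenger calls this; assumed idempotent
  return single->stack.get();
}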
class C_handle_reap : public EventCallback {
AsyncMessenger *msgr;
AsyncMessenger::AsyncMessenger(CephContext *cct, entity_name_t name,
string mname, uint64_t _nonce, uint64_t features)
: SimplePolicyMessenger(cct, name,mname, _nonce),
- processor(this, cct, _nonce),
dispatch_queue(cct, this, mname),
lock("AsyncMessenger::lock"),
nonce(_nonce), need_addr(true), did_bind(false),
cluster_protocol(0), stopped(true)
{
ceph_spin_init(&global_seq_lock);
- // uniq name for CephContext to distinguish from other objects
- cct->lookup_or_create_singleton_object<WorkerPool>(pool, "AsyncMessenger::WorkerPool");
- local_worker = pool->get_worker();
+ StackSingleton *single;
+ cct->lookup_or_create_singleton_object<StackSingleton>(single, "AsyncMessenger::NetworkStack");
+ stack = single->stack.get();
+ stack->start();
+ local_worker = stack->get_worker();
local_connection = new AsyncConnection(cct, this, &dispatch_queue, local_worker);
local_features = features;
init_local_connection();
reap_handler = new C_handle_reap(this);
+ unsigned processor_num = 1;
+ if (stack->support_local_listen_table())
+ processor_num = stack->get_num_worker();
+ for (unsigned i = 0; i < processor_num; ++i)
+ processors.push_back(new Processor(this, stack->get_worker(i), cct, _nonce));
}
/**
delete reap_handler;
assert(!did_bind); // either we didn't bind or we shut down the Processor
local_connection->mark_down();
+ for (auto &&p : processors)
+ delete p;
}
void AsyncMessenger::ready()
ldout(cct,10) << __func__ << " " << get_myaddr() << dendl;
Mutex::Locker l(lock);
- pool->start();
- Worker *w = pool->get_worker();
- processor.start(w);
+ for (auto &&p : processors)
+ p->start();
dispatch_queue.start();
}
{
ldout(cct,10) << __func__ << " " << get_myaddr() << dendl;
+ // done! clean up.
+ for (auto &&p : processors)
+ p->stop();
mark_down_all();
// break ref cycles on the loopback connection
local_connection->set_priv(NULL);
- // done! clean up.
- processor.stop();
did_bind = false;
lock.Lock();
stop_cond.Signal();
stopped = true;
lock.Unlock();
- pool->barrier();
+ stack->drain();
return 0;
}
// bind to a socket
set<int> avoid_ports;
- int r = processor.bind(bind_addr, avoid_ports);
+ int r = 0;
+ unsigned i = 0;
+ for (auto &&p : processors) {
+ r = p->bind(bind_addr, avoid_ports);
+ if (r < 0) {
+ // Note: this is related to the local TCP listen-table problem.
+ // The posix backend (the default kernel implementation) shares the listen
+ // table inside the kernel, so all threads naturally use the same table and
+ // only one thread needs to bind. Other backends (like DPDK) keep a local,
+ // per-worker listen table, so we must bind/listen the TCP port for each
+ // worker. If the first worker fails to bind, that can be treated as a
+ // normal error (e.g. the port is already in use) and handled as such. But
+ // if the first worker binds successfully and a later worker fails, that is
+ // unexpected, so we assert here.
+ assert(i == 0);
+ break;
+ }
+ ++i;
+ }
if (r >= 0)
did_bind = true;
return r;
ldout(cct,1) << __func__ << " rebind avoid " << avoid_ports << dendl;
assert(did_bind);
- processor.stop();
+ for (auto &&p : processors)
+ p->stop();
mark_down_all();
- int r = processor.rebind(avoid_ports);
- if (r == 0) {
- Worker *w = pool->get_worker();
- processor.start(w);
+ unsigned i = 0;
+ int r = 0;
+ for (auto &&p : processors) {
+ r = p->rebind(avoid_ports);
+ if (r == 0) {
+ p->start();
+ } else {
+ assert(i == 0);
+ break;
+ }
+ i++;
}
return r;
}
// close all connections
shutdown_connections(false);
- pool->barrier();
+ stack->drain();
ldout(cct, 10) << __func__ << ": done." << dendl;
ldout(cct, 1) << __func__ << " complete." << dendl;
started = false;
}
-AsyncConnectionRef AsyncMessenger::add_accept(int sd)
+AsyncConnectionRef AsyncMessenger::add_accept(Worker *w, ConnectedSocket cli_socket, entity_addr_t &addr)
{
lock.Lock();
- Worker *w = pool->get_worker();
+ if (!stack->support_local_listen_table())
+ w = stack->get_worker();
AsyncConnectionRef conn = new AsyncConnection(cct, this, &dispatch_queue, w);
- conn->accept(sd);
+ conn->accept(std::move(cli_socket), addr);
accepting_conns.insert(conn);
lock.Unlock();
return conn;
<< ", creating connection and registering" << dendl;
// create connection
- Worker *w = pool->get_worker();
+ Worker *w = stack->get_worker();
AsyncConnectionRef conn = new AsyncConnection(cct, this, &dispatch_queue, w);
conn->connect(addr, type);
assert(!conns.count(addr));
#include "include/assert.h"
#include "AsyncConnection.h"
#include "Event.h"
-#include "common/simple_spin.h"
class AsyncMessenger;
-class WorkerPool;
-
-enum {
- l_msgr_first = 94000,
- l_msgr_recv_messages,
- l_msgr_send_messages,
- l_msgr_send_messages_inline,
- l_msgr_recv_bytes,
- l_msgr_send_bytes,
- l_msgr_created_connections,
- l_msgr_active_connections,
- l_msgr_last,
-};
-
-
-class Worker : public Thread {
- static const uint64_t InitEventNumber = 5000;
- static const uint64_t EventMaxWaitUs = 30000000;
- CephContext *cct;
- WorkerPool *pool;
- bool done;
- int id;
- PerfCounters *perf_logger;
-
- public:
- EventCenter center;
- std::atomic_uint references;
- Worker(CephContext *c, WorkerPool *p, int i)
- : cct(c), pool(p), done(false), id(i), perf_logger(NULL), center(c), references(0) {
- center.init(InitEventNumber, i);
- char name[128];
- sprintf(name, "AsyncMessenger::Worker-%d", id);
- // initialize perf_logger
- PerfCountersBuilder plb(cct, name, l_msgr_first, l_msgr_last);
-
- plb.add_u64_counter(l_msgr_recv_messages, "msgr_recv_messages", "Network received messages");
- plb.add_u64_counter(l_msgr_send_messages, "msgr_send_messages", "Network sent messages");
- plb.add_u64_counter(l_msgr_send_messages_inline, "msgr_send_messages_inline", "Network sent inline messages");
- plb.add_u64_counter(l_msgr_recv_bytes, "msgr_recv_bytes", "Network received bytes");
- plb.add_u64_counter(l_msgr_send_bytes, "msgr_send_bytes", "Network received bytes");
- plb.add_u64_counter(l_msgr_created_connections, "msgr_created_connections", "Created connection number");
- plb.add_u64_counter(l_msgr_active_connections, "msgr_active_connections", "Active connection number");
-
- perf_logger = plb.create_perf_counters();
- cct->get_perfcounters_collection()->add(perf_logger);
- }
- ~Worker() {
- if (perf_logger) {
- cct->get_perfcounters_collection()->remove(perf_logger);
- delete perf_logger;
- }
- }
- void *entry();
- void stop();
- PerfCounters *get_perf_counter() { return perf_logger; }
- void release_worker() {
- int oldref = references.fetch_sub(1);
- assert(oldref > 0);
- }
-};
/**
* If the Messenger binds to a specific address, the Processor runs
AsyncMessenger *msgr;
NetHandler net;
Worker *worker;
- int listen_sd;
+ ServerSocket listen_socket;
uint64_t nonce;
EventCallbackRef listen_handler;
class C_processor_accept;
public:
- Processor(AsyncMessenger *r, CephContext *c, uint64_t n);
+ Processor(AsyncMessenger *r, Worker *w, CephContext *c, uint64_t n);
~Processor() { delete listen_handler; };
void stop();
int bind(const entity_addr_t &bind_addr, const set<int>& avoid_ports);
int rebind(const set<int>& avoid_port);
- void start(Worker *w);
+ void start();
void accept();
};
private:
static const uint64_t ReapDeadConnectionThreshold = 5;
- WorkerPool *pool;
-
- Processor processor;
+ NetworkStack *stack;
+ std::vector<Processor*> processors;
friend class Processor;
DispatchQueue dispatch_queue;
}
void learned_addr(const entity_addr_t &peer_addr_for_me);
- AsyncConnectionRef add_accept(int sd);
+ AsyncConnectionRef add_accept(Worker *w, ConnectedSocket cli_socket, entity_addr_t &addr);
/**
* This wraps ms_deliver_get_authorizer. We use it for AsyncConnection.