From d09d28c5f1fd1a7d6e701fa663671c747b7d7ed0 Mon Sep 17 00:00:00 2001 From: Haomai Wang Date: Fri, 12 Sep 2014 15:51:18 +0800 Subject: [PATCH] Add AsyncMessenger support Signed-off-by: Haomai Wang --- src/common/config_opts.h | 3 + src/msg/AsyncConnection.cc | 1906 ++++++++++++++++++++++++++++++++ src/msg/AsyncConnection.h | 221 ++++ src/msg/AsyncMessenger.cc | 635 +++++++++++ src/msg/AsyncMessenger.h | 391 +++++++ src/msg/Event.cc | 116 ++ src/msg/Event.h | 162 +++ src/msg/EventEpoll.cc | 118 ++ src/msg/EventEpoll.h | 37 + src/msg/Makefile.am | 10 +- src/msg/net_handler.cc | 128 +++ src/msg/net_handler.h | 23 + src/msg/simple/DispatchQueue.h | 4 +- 13 files changed, 3751 insertions(+), 3 deletions(-) create mode 100644 src/msg/AsyncConnection.cc create mode 100644 src/msg/AsyncConnection.h create mode 100644 src/msg/AsyncMessenger.cc create mode 100644 src/msg/AsyncMessenger.h create mode 100644 src/msg/Event.cc create mode 100644 src/msg/Event.h create mode 100644 src/msg/EventEpoll.cc create mode 100644 src/msg/EventEpoll.h create mode 100644 src/msg/net_handler.cc create mode 100644 src/msg/net_handler.h diff --git a/src/common/config_opts.h b/src/common/config_opts.h index 38c25a2bef7..9376b759041 100644 --- a/src/common/config_opts.h +++ b/src/common/config_opts.h @@ -139,6 +139,9 @@ OPTION(ms_inject_delay_probability, OPT_DOUBLE, 0) // range [0, 1] OPTION(ms_inject_internal_delays, OPT_DOUBLE, 0) // seconds OPTION(ms_dump_on_send, OPT_BOOL, false) // hexdump msg to log on send OPTION(ms_dump_corrupt_message_level, OPT_INT, 1) // debug level to hexdump undecodeable messages at +OPTION(ms_event_op_threads, OPT_INT, 2) +OPTION(ms_event_thread_timeout, OPT_INT, 15) +OPTION(ms_event_thread_suicide_timeout, OPT_INT, 180) OPTION(inject_early_sigterm, OPT_BOOL, false) diff --git a/src/msg/AsyncConnection.cc b/src/msg/AsyncConnection.cc new file mode 100644 index 00000000000..296c4451357 --- /dev/null +++ b/src/msg/AsyncConnection.cc @@ -0,0 +1,1906 @@ +// -*- 
mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +#include +#include +#include "include/Context.h" +#include "common/errno.h" +#include "AsyncMessenger.h" +#include "AsyncConnection.h" + +// Constant to limit starting sequence number to 2^31. Nothing special about it, just a big number. PLR +#define SEQ_MASK 0x7fffffff + +#define dout_subsys ceph_subsys_ms +#undef dout_prefix +#define dout_prefix _conn_prefix(_dout) +ostream& AsyncConnection::_conn_prefix(std::ostream *_dout) { + return *_dout << "-- " << async_msgr->get_myinst().addr << " >> " << peer_addr << " conn(" << this + << " sd=" << sd << " :" << port + << " s=" << state + << " pgs=" << peer_global_seq + << " cs=" << connect_seq + << " l=" << policy.lossy + << ")."; +} + +class C_handle_read : public EventCallback { + AsyncConnection *conn; + + public: + C_handle_read(AsyncConnection *c): conn(c) {} + void do_request(int fd, int mask) { + conn->process(); + } +}; + +class C_handle_write : public EventCallback { + AsyncConnection *conn; + + public: + C_handle_write(AsyncConnection *c): conn(c) {} + void do_request(int fd, int mask) { + conn->handle_write(); + } +}; + +static void alloc_aligned_buffer(bufferlist& data, unsigned len, unsigned off) +{ + // create a buffer to read into that matches the data alignment + unsigned left = len; + if (off & ~CEPH_PAGE_MASK) { + // head + unsigned head = 0; + head = MIN(CEPH_PAGE_SIZE - (off & ~CEPH_PAGE_MASK), left); + bufferptr bp = buffer::create(head); + data.push_back(bp); + left -= head; + } + unsigned middle = left & CEPH_PAGE_MASK; + if (middle > 0) { + bufferptr bp = buffer::create_page_aligned(middle); + data.push_back(bp); + left -= middle; + } + if (left) { + bufferptr bp = buffer::create(left); + data.push_back(bp); + } +} + +AsyncConnection::AsyncConnection(CephContext *cct, AsyncMessenger *m) + : Connection(cct, m), async_msgr(m), global_seq(0), connect_seq(0), out_seq(0), in_seq(0), in_seq_acked(0), + 
state(STATE_NONE), state_after_send(0), sd(-1), conn_id(m->dispatch_queue.get_id()), + in_q(&(m->dispatch_queue)), lock("AsyncConnection::lock"), backoff(0), + got_bad_auth(false), authorizer(NULL), state_offset(0), net(cct), center(&m->center) { } + +AsyncConnection::~AsyncConnection() +{ + assert(!authorizer); +} + +/* return -1 means `fd` occurs error or closed, it should be closed + * return 0 means EAGAIN or EINTR */ +int AsyncConnection::read_bulk(int fd, char *buf, int len) +{ + int nread = ::read(fd, buf, len); + if (nread == -1) { + if (errno == EAGAIN || errno == EINTR) { + nread = 0; + } else { + ldout(async_msgr->cct, 1) << __func__ << " Reading from fd %d " << fd + << " : "<< strerror(errno) << dendl; + return -1; + } + } else if (nread == 0) { + ldout(async_msgr->cct, 1) << __func__ << " Peer close file descriptor %d " + << fd << dendl; + return -1; + } + return nread; +} + +// return the length of msg needed to be sent, +// < 0 means error occured +int AsyncConnection::do_sendmsg(struct msghdr &msg, int len, bool more) +{ + while (len > 0) { + int r = ::sendmsg(sd, &msg, MSG_NOSIGNAL | (more ? MSG_MORE : 0)); + + if (r == 0) { + ldout(async_msgr->cct, 10) << __func__ << " sendmsg got r==0!" << dendl; + } else if (r < 0) { + if (errno == EAGAIN || errno == EINTR) { + r = len; + } else { + ldout(async_msgr->cct, 1) << __func__ << " sendmsg error: " << cpp_strerror(errno) << dendl; + } + + return r; + } + + len -= r; + if (len == 0) break; + + // hrmph. trim r bytes off the front of our message. 
+ ldout(async_msgr->cct, 20) << __func__ << " short write did " << r << ", still have " << len << dendl; + while (r > 0) { + if (msg.msg_iov[0].iov_len <= (size_t)r) { + // lose this whole item + r -= msg.msg_iov[0].iov_len; + msg.msg_iov++; + msg.msg_iovlen--; + } else { + msg.msg_iov[0].iov_base = (char *)msg.msg_iov[0].iov_base + r; + msg.msg_iov[0].iov_len -= r; + break; + } + } + } + return 0; +} + +// return the remaining bytes, it may larger than the length of ptr +// else return < 0 means error +int AsyncConnection::_try_send(bufferlist send_bl, bool send) +{ + if (send_bl.length()) + outcoming_bl.claim_append(send_bl); + + if (!send) + return 0; + + if (state == STATE_CLOSED) { + ldout(async_msgr->cct, 1) << __func__ << " connection is closed" << dendl; + return -EINTR; + } + + int r = 0; + uint64_t sended = 0; + list::const_iterator pb = outcoming_bl.buffers().begin(); + while (outcoming_bl.length() > sended) { + struct msghdr msg; + int size = MIN(outcoming_bl.buffers().size(), IOV_MAX); + struct iovec *msgvec = new iovec[size]; + memset(&msg, 0, sizeof(msg)); + msg.msg_iovlen = 0; + int msglen = 0; + while (size > 0) { + msgvec[msg.msg_iovlen].iov_base = (void*)(pb->c_str()); + msgvec[msg.msg_iovlen].iov_len = pb->length(); + msg.msg_iovlen++; + msglen += pb->length(); + pb++; + size--; + } + + r = do_sendmsg(msg, msglen, false); + if (r < 0) + return r; + + // "r" is the remaining length + sended += msglen - r; + if (r > 0) { + center->create_event(sd, EVENT_WRITABLE, new C_handle_write(this)); + ldout(async_msgr->cct, 5) << __func__ << " remaining " << r + << " needed to be sent, creating event for writing" + << dendl; + break; + } + // only "r" == 0 continue + } + + // trim already sent for outcoming_bl + if (sended) { + bufferlist bl; + bl.splice(sended, outcoming_bl.length()-sended, &outcoming_bl); + bl.swap(outcoming_bl); + } + + if (!outcoming_bl.length()) + center->delete_event(sd, EVENT_WRITABLE); + + return outcoming_bl.length(); +} + +// 
Because this func will be called multi times to populate +// the needed buffer, so the passed in bufferptr must be the same. +// Normally, only "read_message" will pass existing bufferptr in +// +// return the remaining bytes, 0 means this buffer is finished +// else return < 0 means error +int AsyncConnection::read_until(uint64_t needed, bufferptr &p) +{ + assert(needed); + int offset = state_offset; + int left = needed - offset; + int r; + do { + r = read_bulk(sd, p.c_str()+offset, left); + if (r < 0) { + ldout(async_msgr->cct, 1) << __func__ << " read failed, state is " << state << dendl; + return -1; + } else if (r == left) { + state_offset = 0; + return 0; + } + left -= r; + offset += r; + } while (r > 0); + + state_offset = offset; + ldout(async_msgr->cct, 20) << __func__ << " read " << r << " bytes, state is " + << state << dendl; + return needed - offset; +} + +void AsyncConnection::process() +{ + int r = 0; + int prev_state; + assert(!lock.is_locked()); + Mutex::Locker l(lock); + do { + prev_state = state; + switch (state) { + case STATE_OPEN: + { + char tag = -1; + r = read_bulk(sd, &tag, sizeof(tag)); + if (r < 0) { + ldout(async_msgr->cct, 1) << __func__ << " read tag failed, state is " << state << dendl; + goto fail; + } else if (r == 0) { + break; + } + assert(r == 1); + + if (tag == CEPH_MSGR_TAG_KEEPALIVE) { + ldout(async_msgr->cct, 20) << __func__ << " got KEEPALIVE" << dendl; + } else if (tag == CEPH_MSGR_TAG_KEEPALIVE2) { + state = STATE_OPEN_KEEPALIVE2; + } else if (tag == CEPH_MSGR_TAG_KEEPALIVE2_ACK) { + state = STATE_OPEN_KEEPALIVE2_ACK; + } else if (tag == CEPH_MSGR_TAG_ACK) { + state = STATE_OPEN_TAG_ACK; + } else if (tag == CEPH_MSGR_TAG_MSG) { + state = STATE_OPEN_MESSAGE_HEADER; + } else if (tag == CEPH_MSGR_TAG_CLOSE) { + state = STATE_OPEN_TAG_CLOSE; + } else { + ldout(async_msgr->cct, 0) << __func__ << " bad tag " << (int)tag << dendl; + goto fail; + } + + break; + } + + case STATE_OPEN_KEEPALIVE2: + { + ceph_timespec *t; + r = 
read_until(sizeof(*t), state_buffer); + if (r < 0) { + ldout(async_msgr->cct, 1) << __func__ << " read keeplive timespec failed" << dendl; + goto fail; + } else if (r > 0) { + break; + } + + ldout(async_msgr->cct, 30) << __func__ << " got KEEPALIVE2 tag ..." << dendl; + t = (ceph_timespec*)(state_buffer.c_str()); + utime_t kp_t = utime_t(*t); + _send_keepalive_or_ack(true, &kp_t); + ldout(async_msgr->cct, 20) << __func__ << " got KEEPALIVE2 " << kp_t << dendl; + state = STATE_OPEN; + break; + } + + case STATE_OPEN_KEEPALIVE2_ACK: + { + ceph_timespec *t; + r = read_until(sizeof(*t), state_buffer); + if (r < 0) { + ldout(async_msgr->cct, 1) << __func__ << " read keeplive timespec failed" << dendl; + goto fail; + } else if (r > 0) { + break; + } + + t = (ceph_timespec*)(state_buffer.c_str()); + last_keepalive_ack = utime_t(*t); + ldout(async_msgr->cct, 20) << __func__ << " got KEEPALIVE_ACK" << dendl; + state = STATE_OPEN; + break; + } + + case STATE_OPEN_TAG_ACK: + { + ceph_le64 *seq; + r = read_until(sizeof(seq), state_buffer); + if (r < 0) { + ldout(async_msgr->cct, 1) << __func__ << " read ack seq failed" << dendl; + goto fail; + } else if (r > 0) { + break; + } + + seq = (ceph_le64*)(state_buffer.c_str()); + ldout(async_msgr->cct, 20) << __func__ << " got ACK" << dendl; + handle_ack(*seq); + state = STATE_OPEN; + break; + } + + case STATE_OPEN_MESSAGE_HEADER: + { + ldout(async_msgr->cct, 20) << __func__ << " begin MSG" << dendl; + ceph_msg_header header; + ceph_msg_header_old oldheader; + __u32 header_crc; + int len; + if (has_feature(CEPH_FEATURE_NOSRCADDR)) + len = sizeof(header); + else + len = sizeof(oldheader); + + r = read_until(len, state_buffer); + if (r < 0) { + ldout(async_msgr->cct, 1) << __func__ << " read message header failed" << dendl; + goto fail; + } else if (r > 0) { + break; + } + + ldout(async_msgr->cct, 20) << __func__ << " got MSG header" << dendl; + + if (has_feature(CEPH_FEATURE_NOSRCADDR)) { + header = 
*((ceph_msg_header*)state_buffer.c_str()); + header_crc = ceph_crc32c(0, (unsigned char *)&header, + sizeof(header) - sizeof(header.crc)); + } else { + oldheader = *((ceph_msg_header_old*)state_buffer.c_str()); + // this is fugly + memcpy(&header, &oldheader, sizeof(header)); + header.src = oldheader.src.name; + header.reserved = oldheader.reserved; + header.crc = oldheader.crc; + header_crc = ceph_crc32c(0, (unsigned char *)&oldheader, sizeof(oldheader) - sizeof(oldheader.crc)); + } + + ldout(async_msgr->cct, 20) << __func__ << " got envelope type=" << header.type + << " src " << entity_name_t(header.src) + << " front=" << header.front_len + << " data=" << header.data_len + << " off " << header.data_off << dendl; + + // verify header crc + if (header_crc != header.crc) { + ldout(async_msgr->cct,0) << __func__ << "reader got bad header crc " + << header_crc << " != " << header.crc << dendl; + goto fail; + } + + // Reset state + front.clear(); + middle.clear(); + data.clear(); + memset(&connect_msg, 0, sizeof(connect_msg)); + memset(&connect_reply, 0, sizeof(connect_reply)); + recv_stamp = ceph_clock_now(async_msgr->cct); + current_header = header; + state = STATE_OPEN_MESSAGE_THROTTLE_MESSAGE; + break; + } + + case STATE_OPEN_MESSAGE_THROTTLE_MESSAGE: + { + if (policy.throttler_messages) { + ldout(async_msgr->cct,10) << __func__ << " wants " << 1 << " message from policy throttler " + << policy.throttler_messages->get_current() << "/" + << policy.throttler_messages->get_max() << dendl; + // FIXME: when to try it again? 
+ if (policy.throttler_messages->get_or_fail()) + state = STATE_OPEN_MESSAGE_THROTTLE_BYTES; + } + + break; + } + + case STATE_OPEN_MESSAGE_THROTTLE_BYTES: + { + uint64_t message_size = current_header.front_len + current_header.middle_len + current_header.data_len; + if (message_size) { + if (policy.throttler_bytes) { + ldout(async_msgr->cct,10) << __func__ << " wants " << message_size << " bytes from policy throttler " + << policy.throttler_bytes->get_current() << "/" + << policy.throttler_bytes->get_max() << dendl; + // FIXME: when to try it again? + if (policy.throttler_bytes->get_or_fail(message_size)) + state = STATE_OPEN_MESSAGE_THROTTLE_DISPATCH; + } + } + + break; + } + + case STATE_OPEN_MESSAGE_THROTTLE_DISPATCH: + { + uint64_t message_size = current_header.front_len + current_header.middle_len + current_header.data_len; + if (message_size) { + // throttle total bytes waiting for dispatch. do this _after_ the + // policy throttle, as this one does not deadlock (unless dispatch + // blocks indefinitely, which it shouldn't). in contrast, the + // policy throttle carries for the lifetime of the message. 
+ ldout(async_msgr->cct,10) << __func__ << " wants " << message_size << " from dispatch throttler " + << async_msgr->dispatch_throttler.get_current() << "/" + << async_msgr->dispatch_throttler.get_max() << dendl; + if (async_msgr->dispatch_throttler.get_or_fail(message_size)) { + state = STATE_OPEN_MESSAGE_READ_FRONT; + throttle_stamp = ceph_clock_now(async_msgr->cct); + } + } + + break; + } + case STATE_OPEN_MESSAGE_READ_FRONT: + { + // read front + int front_len = current_header.front_len; + if (front_len) { + bufferptr ptr = buffer::create(front_len); + r = read_until(front_len, ptr); + if (r < 0) { + ldout(async_msgr->cct, 1) << __func__ << " read message front failed" << dendl; + goto fail; + } else if (r > 0) { + break; + } + + front.push_back(ptr); + ldout(async_msgr->cct, 20) << __func__ << " got front " << front.length() << dendl; + } + state = STATE_OPEN_MESSAGE_READ_MIDDLE; + break; + } + + case STATE_OPEN_MESSAGE_READ_MIDDLE: + { + // read middle + int middle_len = current_header.middle_len; + if (middle_len) { + bufferptr ptr = buffer::create(middle_len); + r = read_until(middle_len, ptr); + if (r < 0) { + ldout(async_msgr->cct, 1) << __func__ << " read message middle failed" << dendl; + goto fail; + } else if (r > 0) { + break; + } + middle.push_back(ptr); + ldout(async_msgr->cct, 20) << __func__ << " got middle " << middle.length() << dendl; + } + + state = STATE_OPEN_MESSAGE_READ_DATA_PREPARE; + break; + } + + case STATE_OPEN_MESSAGE_READ_DATA_PREPARE: + { + // read data + uint64_t data_len = le32_to_cpu(current_header.data_len); + int data_off = le32_to_cpu(current_header.data_off); + bufferlist bl; + if (data_len) { + // get a buffer + lock.Lock(); + map >::iterator p = rx_buffers.find(current_header.tid); + if (p != rx_buffers.end()) { + ldout(async_msgr->cct,10) << __func__ << " seleting rx buffer v " << p->second.second + << " at offset " << data_off + << " len " << p->second.first.length() << dendl; + bl = p->second.first; + // make sure it's 
big enough + if (bl.length() < data_len) + bl.push_back(buffer::create(data_len - bl.length())); + data_blp = bl.begin(); + } else { + ldout(async_msgr->cct,20) << __func__ << " allocating new rx buffer at offset " << data_off << dendl; + alloc_aligned_buffer(bl, data_len, data_off); + data_blp = bl.begin(); + } + lock.Unlock(); + } + + msg_left = data_len; + state = STATE_OPEN_MESSAGE_READ_DATA; + break; + } + + case STATE_OPEN_MESSAGE_READ_DATA: + { + do { + bufferptr bp = data_blp.get_current_ptr(); + uint64_t read = MIN(bp.length(), msg_left); + r = read_until(read, bp); + if (r < 0) { + ldout(async_msgr->cct, 1) << __func__ << " read data error " << dendl; + goto fail; + } else if (r > 0) { + break; + } + + data_blp.advance(read); + data.append(bp, 0, read); + } while (msg_left > 0); + + if (msg_left == 0) + state = STATE_OPEN_MESSAGE_READ_FOOTER_AND_DISPATCH; + + break; + } + + case STATE_OPEN_MESSAGE_READ_FOOTER_AND_DISPATCH: + { + ceph_msg_footer footer; + ceph_msg_footer_old old_footer; + int len; + // footer + if (has_feature(CEPH_FEATURE_MSG_AUTH)) + len = sizeof(footer); + else + len = sizeof(old_footer); + + r = read_until(len, state_buffer); + if (r < 0) { + ldout(async_msgr->cct, 1) << __func__ << " read footer data error " << dendl; + goto fail; + } else if (r > 0) { + break; + } + + if (has_feature(CEPH_FEATURE_MSG_AUTH)) { + footer = *((ceph_msg_footer*)state_buffer.c_str()); + } else { + old_footer = *((ceph_msg_footer_old*)state_buffer.c_str()); + footer.front_crc = old_footer.front_crc; + footer.middle_crc = old_footer.middle_crc; + footer.data_crc = old_footer.data_crc; + footer.sig = 0; + footer.flags = old_footer.flags; + } + int aborted = (footer.flags & CEPH_MSG_FOOTER_COMPLETE) == 0; + ldout(async_msgr->cct, 10) << __func__ << " aborted = " << aborted << dendl; + if (aborted) { + ldout(async_msgr->cct, 0) << __func__ << " got " << front.length() << " + " << middle.length() << " + " << data.length() + << " byte message.. 
ABORTED" << dendl; + goto fail; + } + + ldout(async_msgr->cct, 20) << __func__ << " got " << front.length() << " + " << middle.length() + << " + " << data.length() << " byte message" << dendl; + Message *message = decode_message(async_msgr->cct, current_header, footer, front, middle, data); + if (!message) { + ldout(async_msgr->cct, 1) << __func__ << " decode message failed " << dendl; + goto fail; + } + + // + // Check the signature if one should be present. A zero return indicates success. PLR + // + + ceph::shared_ptr auth_handler = session_security; + if (auth_handler == NULL) { + ldout(async_msgr->cct, 10) << __func__ << " No session security set" << dendl; + } else { + if (auth_handler->check_message_signature(message)) { + ldout(async_msgr->cct, 0) << __func__ << "Signature check failed" << dendl; + goto fail; + } + } + message->set_byte_throttler(policy.throttler_bytes); + message->set_message_throttler(policy.throttler_messages); + + // store reservation size in message, so we don't get confused + // by messages entering the dispatch queue through other paths. + uint64_t message_size = current_header.front_len + current_header.middle_len + current_header.data_len; + message->set_dispatch_throttle_size(message_size); + + message->set_recv_stamp(recv_stamp); + message->set_throttle_stamp(throttle_stamp); + message->set_recv_complete_stamp(ceph_clock_now(async_msgr->cct)); + + // check received seq#. if it is old, drop the message. + // note that incoming messages may skip ahead. this is convenient for the client + // side queueing because messages can't be renumbered, but the (kernel) client will + // occasionally pull a message out of the sent queue to send elsewhere. in that case + // it doesn't matter if we "got" it or not. 
+ if (message->get_seq() <= in_seq) { + ldout(async_msgr->cct,0) << __func__ << " got old message " + << message->get_seq() << " <= " << in_seq << " " << message << " " << *message + << ", discarding" << dendl; + async_msgr->dispatch_throttle_release(message->get_dispatch_throttle_size()); + message->put(); + if (has_feature(CEPH_FEATURE_RECONNECT_SEQ) && async_msgr->cct->_conf->ms_die_on_old_message) + assert(0 == "old msgs despite reconnect_seq feature"); + goto fail; + } + get(); + message->set_connection(this); + + // note last received message. + in_seq = message->get_seq(); + ldout(async_msgr->cct, 10) << __func__ << " got message " << message->get_seq() + << " " << message << " " << *message << dendl; + in_q->fast_preprocess(message); + if (in_q->can_fast_dispatch(message)) { + in_q->fast_dispatch(message); + } else { + in_q->enqueue(message, message->get_priority(), conn_id); + } + + state = STATE_OPEN; + break; + } + + case STATE_OPEN_TAG_CLOSE: + { + ldout(async_msgr->cct,20) << __func__ << " got CLOSE" << dendl; + _stop(); + break; + } + + case STATE_CLOSED: + { + ldout(async_msgr->cct, 20) << __func__ << " socket closed" << dendl; + center->delete_event(sd, EVENT_READABLE|EVENT_WRITABLE); + break; + } + + case STATE_FAULT: + { + ldout(async_msgr->cct, 20) << __func__ << " socket is in error" << dendl; + break; + } + + default: + { + if (_process_connection() < 0) + goto fail; + } + } + +fail: + // clean up state internal variables and states + if (state >= STATE_CONNECTING_SEND_CONNECT_MSG && + state <= STATE_CONNECTING_READY) { + delete authorizer; + authorizer = NULL; + got_bad_auth = false; + } + + if (state > STATE_OPEN_MESSAGE_THROTTLE_MESSAGE && + state <= STATE_OPEN_MESSAGE_READ_FOOTER_AND_DISPATCH + && policy.throttler_messages) { + ldout(async_msgr->cct,10) << __func__ << " releasing " << 1 + << " message to policy throttler " + << policy.throttler_messages->get_current() << "/" + << policy.throttler_messages->get_max() << dendl; + 
policy.throttler_messages->put(); + } + if (state > STATE_OPEN_MESSAGE_THROTTLE_BYTES && + state <= STATE_OPEN_MESSAGE_READ_FOOTER_AND_DISPATCH) { + uint64_t message_size = current_header.front_len + current_header.middle_len + current_header.data_len; + if (policy.throttler_bytes) { + ldout(async_msgr->cct,10) << __func__ << " releasing " << message_size + << " bytes to policy throttler " + << policy.throttler_bytes->get_current() << "/" + << policy.throttler_bytes->get_max() << dendl; + policy.throttler_bytes->put(message_size); + } + + if (state > STATE_OPEN_MESSAGE_THROTTLE_DISPATCH && + state <= STATE_OPEN_MESSAGE_READ_FOOTER_AND_DISPATCH) { + async_msgr->dispatch_throttle_release(message_size); + } + } + fault(); + state = STATE_FAULT; + } while (prev_state != state); +} + +int AsyncConnection::_process_connection() +{ + int r = 0; + + switch(state) { + case STATE_WAIT_SEND: + { + if (!outcoming_bl.length()) { + assert(state_after_send); + state = state_after_send; + state_after_send = 0; + } + break; + } + + case STATE_CONNECTING: + { + global_seq = async_msgr->get_global_seq(); + // close old socket. this is safe because we stopped the reader thread above. 
+ if (sd >= 0) + ::close(sd); + + sd = net.connect(get_peer_addr()); + if (sd < 0) { + goto fail; + } + r = net.set_nonblock(sd); + if (r < 0) { + goto fail; + } + net.set_socket_options(sd); + + center->create_event(sd, EVENT_READABLE, new C_handle_read(this)); + state = STATE_CONNECTING_WAIT_BANNER; + break; + } + + case STATE_CONNECTING_WAIT_BANNER: + { + r = read_until(strlen(CEPH_BANNER), state_buffer); + if (r < 0) { + ldout(async_msgr->cct, 1) << __func__ << " read banner failed" << dendl; + goto fail; + } else if (r > 0) { + break; + } + + if (memcmp(state_buffer.c_str(), CEPH_BANNER, strlen(CEPH_BANNER))) { + ldout(async_msgr->cct, 0) << __func__ << " connect protocol error (bad banner) on peer " + << get_peer_addr() << dendl; + goto fail; + } + + bufferlist bl; + bl.append(state_buffer.c_str(), strlen(CEPH_BANNER)); + r = _try_send(bl); + if (r == 0) { + state = STATE_CONNECTING_WAIT_IDENTIFY_PEER; + ldout(async_msgr->cct, 10) << __func__ << " connect write banner done: " + << get_peer_addr() << dendl; + } else if (r > 0) { + state = STATE_WAIT_SEND; + state_after_send = STATE_CONNECTING_WAIT_IDENTIFY_PEER; + ldout(async_msgr->cct, 10) << __func__ << " connect wait for write banner: " + << get_peer_addr() << dendl; + } else { + goto fail; + } + break; + } + + case STATE_CONNECTING_WAIT_IDENTIFY_PEER: + { + entity_addr_t paddr, peer_addr_for_me; + int port; + bufferlist myaddrbl; + + r = read_until(sizeof(paddr)*2, state_buffer); + if (r < 0) { + ldout(async_msgr->cct, 1) << __func__ << " read identify peeraddr failed" << dendl; + goto fail; + } else if (r > 0) { + break; + } + + bufferlist bl; + bl.append(state_buffer); + bufferlist::iterator p = bl.begin(); + try { + ::decode(paddr, p); + ::decode(peer_addr_for_me, p); + } catch (const buffer::error& e) { + lderr(async_msgr->cct) << __func__ << " decode peer addr failed " << dendl; + goto fail; + } + port = peer_addr_for_me.get_port(); + ldout(async_msgr->cct, 20) << __func__ << " connect read peer addr 
" + << paddr << " on socket " << sd << dendl; + if (peer_addr != paddr) { + if (paddr.is_blank_ip() && peer_addr.get_port() == paddr.get_port() && + peer_addr.get_nonce() == paddr.get_nonce()) { + ldout(async_msgr->cct, 0) << __func__ << " connect claims to be " << paddr + << " not " << peer_addr + << " - presumably this is the same node!" << dendl; + } else { + ldout(async_msgr->cct, 0) << __func__ << " connect claims to be " + << paddr << " not " << peer_addr << " - wrong node!" << dendl; + goto fail; + } + } + + ldout(async_msgr->cct, 20) << __func__ << " connect peer addr for me is " << peer_addr_for_me << dendl; + async_msgr->learned_addr(peer_addr_for_me); + ::encode(async_msgr->get_myaddr(), myaddrbl); + r = _try_send(myaddrbl); + if (r == 0) { + state = STATE_CONNECTING_SEND_CONNECT_MSG; + ldout(async_msgr->cct, 10) << __func__ << " connect sent my addr " + << async_msgr->get_myaddr() << dendl; + } else if (r > 0) { + state = STATE_WAIT_SEND; + state_after_send = STATE_CONNECTING_SEND_CONNECT_MSG; + ldout(async_msgr->cct, 10) << __func__ << " connect send my addr done: " + << async_msgr->get_myaddr() << dendl; + } else { + ldout(async_msgr->cct, 2) << __func__ << " connect couldn't write my addr, " + << cpp_strerror(errno) << dendl; + goto fail; + } + + break; + } + + case STATE_CONNECTING_SEND_CONNECT_MSG: + { + if (!got_bad_auth) { + delete authorizer; + authorizer = async_msgr->get_authorizer(peer_type, false); + } + assert(authorizer); + bufferlist bl; + + ceph_msg_connect connect; + connect.features = policy.features_supported; + connect.host_type = async_msgr->get_myinst().name.type(); + connect.global_seq = global_seq; + connect.connect_seq = connect_seq; + connect.protocol_version = async_msgr->get_proto_version(peer_type, true); + connect.authorizer_protocol = authorizer ? authorizer->protocol : 0; + connect.authorizer_len = authorizer ? 
authorizer->bl.length() : 0; + if (authorizer) + ldout(async_msgr->cct, 10) << __func__ << "connect.authorizer_len=" + << connect.authorizer_len << " protocol=" + << connect.authorizer_protocol << dendl; + connect.flags = 0; + if (policy.lossy) + connect.flags |= CEPH_MSG_CONNECT_LOSSY; // this is fyi, actually, server decides! + bl.append((char*)&connect, sizeof(connect)); + if (authorizer) { + bl.append(authorizer->bl.c_str(), authorizer->bl.length()); + } + ldout(async_msgr->cct, 10) << __func__ << " connect sending gseq=" << global_seq << " cseq=" + << connect_seq << " proto=" << connect.protocol_version << dendl; + + r = _try_send(bl); + if (r == 0) { + state = STATE_CONNECTING_WAIT_CONNECT_REPLY; + ldout(async_msgr->cct,20) << __func__ << "connect wrote (self +) cseq, waiting for reply" << dendl; + } else if (r > 0) { + state = STATE_WAIT_SEND; + state_after_send = STATE_CONNECTING_WAIT_CONNECT_REPLY; + ldout(async_msgr->cct, 10) << __func__ << " continue send reply " << dendl; + } else { + ldout(async_msgr->cct, 2) << __func__ << " connect couldn't send reply " + << cpp_strerror(errno) << dendl; + goto fail; + } + + break; + } + + case STATE_CONNECTING_WAIT_CONNECT_REPLY: + { + r = read_until(sizeof(connect_reply), state_buffer); + if (r < 0) { + ldout(async_msgr->cct, 1) << __func__ << " read connect reply failed" << dendl; + goto fail; + } else if (r > 0) { + break; + } + + connect_reply = *((ceph_msg_connect_reply*)state_buffer.c_str()); + connect_reply.features = ceph_sanitize_features(connect_reply.features); + + ldout(async_msgr->cct, 20) << __func__ << " connect got reply tag " << (int)connect_reply.tag + << " connect_seq " << connect_reply.connect_seq << " global_seq " + << connect_reply.global_seq << " proto " << connect_reply.protocol_version + << " flags " << (int)connect_reply.flags << " features " + << connect_reply.features << dendl; + state = STATE_CONNECTING_WAIT_CONNECT_REPLY_AUTH; + + break; + } + + case 
STATE_CONNECTING_WAIT_CONNECT_REPLY_AUTH: + { + bufferlist authorizer_reply; + if (connect_reply.authorizer_len) { + ldout(async_msgr->cct, 10) << __func__ << " reply.authorizer_len=" << connect_reply.authorizer_len << dendl; + r = read_until(connect_reply.authorizer_len, state_buffer); + if (r < 0) { + ldout(async_msgr->cct, 1) << __func__ << " read connect reply authorizer failed" << dendl; + goto fail; + } else if (r > 0) { + break; + } + + authorizer_reply.push_back(state_buffer); + bufferlist::iterator iter = authorizer_reply.begin(); + if (!authorizer->verify_reply(iter)) { + ldout(async_msgr->cct, 0) << __func__ << " failed verifying authorize reply" << dendl; + goto fail; + } + } + r = handle_connect_reply(connect_msg, connect_reply); + if (r < 0) + goto fail; + + // state must be changed! + assert(state != STATE_CONNECTING_WAIT_CONNECT_REPLY_AUTH); + break; + } + + case STATE_CONNECTING_WAIT_ACK_SEQ: + { + uint64_t newly_acked_seq = 0; + bufferlist bl; + + r = read_until(sizeof(newly_acked_seq), state_buffer); + if (r < 0) { + ldout(async_msgr->cct, 1) << __func__ << " read connect ack seq failed" << dendl; + goto fail; + } else if (r > 0) { + break; + } + + newly_acked_seq = *((uint64_t*)state_buffer.c_str()); + ldout(async_msgr->cct, 2) << __func__ << " got newly_acked_seq " << newly_acked_seq + << " vs out_seq " << out_seq << dendl; + while (newly_acked_seq > out_seq) { + Message *m = _get_next_outgoing(); + assert(m); + ldout(async_msgr->cct, 2) << __func__ << " discarding previously sent " << m->get_seq() + << " " << *m << dendl; + assert(m->get_seq() <= newly_acked_seq); + m->put(); + ++out_seq; + } + + bl.append((char*)&in_seq, sizeof(in_seq)); + r = _try_send(bl); + if (r == 0) { + state = STATE_CONNECTING_READY; + ldout(async_msgr->cct, 10) << __func__ << " send in_seq done " << dendl; + } else if (r > 0) { + state_after_send = STATE_CONNECTING_READY; + state = STATE_WAIT_SEND; + ldout(async_msgr->cct, 10) << __func__ << " continue send in_seq " 
<< dendl; + } else { + goto fail; + } + break; + } + + case STATE_CONNECTING_READY: + { + // hooray! + peer_global_seq = connect_reply.global_seq; + policy.lossy = connect_reply.flags & CEPH_MSG_CONNECT_LOSSY; + state = STATE_OPEN; + connect_seq += 1; + assert(connect_seq == connect_reply.connect_seq); + backoff = utime_t(); + set_features((uint64_t)connect_reply.features & (uint64_t)connect_msg.features); + ldout(async_msgr->cct, 10) << __func__ << "connect success " << connect_seq + << ", lossy = " << policy.lossy << ", features " + << get_features() << dendl; + + // If we have an authorizer, get a new AuthSessionHandler to deal with ongoing security of the + // connection. PLR + if (authorizer != NULL) { + session_security.reset( + get_auth_session_handler(async_msgr->cct, + authorizer->protocol, + authorizer->session_key, + get_features())); + } else { + // We have no authorizer, so we shouldn't be applying security to messages in this AsyncConnection. PLR + session_security.reset(); + } + + get(); + async_msgr->dispatch_queue.queue_connect(this); + get(); + async_msgr->ms_deliver_handle_fast_connect(this); + + // reset connect state variables + got_bad_auth = false; + delete authorizer; + authorizer = NULL; + return 0; + } + + case STATE_ACCEPTING: + { + bufferlist bl; + + if (net.set_nonblock(sd) < 0) + goto fail; + + net.set_socket_options(sd); + + bl.append(CEPH_BANNER, strlen(CEPH_BANNER)); + + ::encode(async_msgr->get_myaddr(), bl); + port = async_msgr->get_myaddr().get_port(); + // and peer's socket addr (they might not know their ip) + socklen_t len = sizeof(socket_addr.ss_addr()); + r = ::getpeername(sd, (sockaddr*)&socket_addr.ss_addr(), &len); + if (r < 0) { + ldout(async_msgr->cct, 0) << __func__ << " failed to getpeername " + << cpp_strerror(errno) << dendl; + goto fail; + } + ::encode(socket_addr, bl); + ldout(async_msgr->cct, 1) << __func__ << " sd=" << sd << " " << socket_addr << dendl; + + r = _try_send(bl); + if (r == 0) { + state = 
STATE_ACCEPTING_WAIT_BANNER_ADDR; + ldout(async_msgr->cct, 10) << __func__ << " write banner and addr done: " + << get_peer_addr() << dendl; + } else if (r > 0) { + state = STATE_WAIT_SEND; + state_after_send = STATE_ACCEPTING_WAIT_BANNER_ADDR; + ldout(async_msgr->cct, 10) << __func__ << " wait for write banner and addr: " + << get_peer_addr() << dendl; + } else { + goto fail; + } + + break; + } + case STATE_ACCEPTING_WAIT_BANNER_ADDR: + { + bufferlist addr_bl; + entity_addr_t peer_addr; + + r = read_until(strlen(CEPH_BANNER) + sizeof(peer_addr), state_buffer); + if (r < 0) { + ldout(async_msgr->cct, 1) << __func__ << " read peer banner and addr failed" << dendl; + goto fail; + } else if (r > 0) { + break; + } + + if (memcmp(state_buffer.c_str(), CEPH_BANNER, strlen(CEPH_BANNER))) { + ldout(async_msgr->cct, 1) << __func__ << " accept peer sent bad banner '" << state_buffer.c_str() + << "' (should be '" << CEPH_BANNER << "')" << dendl; + goto fail; + } + + addr_bl.append(state_buffer, strlen(CEPH_BANNER), sizeof(peer_addr)); + { + bufferlist::iterator ti = addr_bl.begin(); + ::decode(peer_addr, ti); + } + + ldout(async_msgr->cct, 10) << __func__ << " accept peer addr is " << peer_addr << dendl; + if (peer_addr.is_blank_ip()) { + // peer apparently doesn't know what ip they have; figure it out for them. 
+ int port = peer_addr.get_port(); + peer_addr.addr = socket_addr.addr; + peer_addr.set_port(port); + ldout(async_msgr->cct, 0) << __func__ << " accept peer addr is really " << peer_addr + << " (socket is " << socket_addr << ")" << dendl; + } + set_peer_addr(peer_addr); // so that connection_state gets set up + state = STATE_ACCEPTING_WAIT_CONNECT_MSG; + break; + } + + case STATE_ACCEPTING_WAIT_CONNECT_MSG: + { + r = read_until(sizeof(connect_msg), state_buffer); + if (r < 0) { + ldout(async_msgr->cct, 1) << __func__ << " read connect msg failed" << dendl; + goto fail; + } else if (r > 0) { + break; + } + + connect_msg = *((ceph_msg_connect*)state_buffer.c_str()); + // sanitize features + connect_msg.features = ceph_sanitize_features(connect_msg.features); + state = STATE_ACCEPTING_WAIT_CONNECT_MSG_AUTH; + break; + } + + case STATE_ACCEPTING_WAIT_CONNECT_MSG_AUTH: + { + bufferlist authorizer_bl, authorizer_reply; + + if (connect_msg.authorizer_len) { + r = read_until(connect_msg.authorizer_len, state_buffer); + if (r < 0) { + ldout(async_msgr->cct, 1) << __func__ << " read connect msg failed" << dendl; + goto fail; + } else if (r > 0) { + break; + } + authorizer_bl.push_back(state_buffer); + } + + ldout(async_msgr->cct, 20) << __func__ << " accept got peer connect_seq " + << connect_msg.connect_seq << " global_seq " + << connect_msg.global_seq << dendl; + set_peer_type(connect_msg.host_type); + policy = async_msgr->get_policy(connect_msg.host_type); + ldout(async_msgr->cct, 10) << __func__ << " accept of host_type " << connect_msg.host_type + << ", policy.lossy=" << policy.lossy << " policy.server=" + << policy.server << " policy.standby=" << policy.standby + << " policy.resetcheck=" << policy.resetcheck << dendl; + + r = handle_connect_msg(connect_msg, authorizer_bl, authorizer_reply); + if (r < 0) + goto fail; + + // state is changed by "handle_connect_msg" + assert(state != STATE_ACCEPTING_WAIT_CONNECT_MSG_AUTH); + break; + } + + case STATE_ACCEPTING_WAIT_SEQ: + 
{ + uint64_t newly_acked_seq; + r = read_until(sizeof(newly_acked_seq), state_buffer); + if (r < 0) { + ldout(async_msgr->cct, 1) << __func__ << " read ack seq failed" << dendl; + goto fail; + } else if (r > 0) { + break; + } + + newly_acked_seq = *((uint64_t*)state_buffer.c_str()); + ldout(async_msgr->cct, 2) << __func__ << " accept get newly_acked_seq " << newly_acked_seq << dendl; + discard_requeued_up_to(newly_acked_seq); + state = STATE_ACCEPTING_READY; + break; + } + + case STATE_ACCEPTING_READY: + { + ldout(async_msgr->cct, 20) << __func__ << " accept done" << dendl; + state = STATE_OPEN; + break; + } + + default: + { + lderr(async_msgr->cct) << __func__ << " bad state" << state << dendl; + assert(0); + } + } + + return 0; + +fail: + return -1; +} + +int AsyncConnection::handle_connect_reply(ceph_msg_connect &connect, ceph_msg_connect_reply &reply) +{ + uint64_t feat_missing; + if (reply.tag == CEPH_MSGR_TAG_FEATURES) { + ldout(async_msgr->cct, 0) << __func__ << " connect protocol feature mismatch, my " + << std::hex << connect.features << " < peer " + << reply.features << " missing " + << (reply.features & ~policy.features_supported) + << std::dec << dendl; + goto fail; + } + + if (reply.tag == CEPH_MSGR_TAG_BADPROTOVER) { + ldout(async_msgr->cct, 0) << __func__ << " connect protocol version mismatch, my " + << connect.protocol_version << " != " << reply.protocol_version + << dendl; + goto fail; + } + + if (reply.tag == CEPH_MSGR_TAG_BADAUTHORIZER) { + ldout(async_msgr->cct,0) << __func__ << " connect got BADAUTHORIZER" << dendl; + if (got_bad_auth) + goto fail; + got_bad_auth = true; + assert(authorizer); + delete authorizer; + authorizer = async_msgr->get_authorizer(peer_type, true); // try harder + state = STATE_CONNECTING_SEND_CONNECT_MSG; + } + if (reply.tag == CEPH_MSGR_TAG_RESETSESSION) { + ldout(async_msgr->cct, 0) << __func__ << "connect got RESETSESSION" << dendl; + was_session_reset(); + state = STATE_CONNECTING_SEND_CONNECT_MSG; + } + if 
(reply.tag == CEPH_MSGR_TAG_RETRY_GLOBAL) { + global_seq = async_msgr->get_global_seq(reply.global_seq); + ldout(async_msgr->cct, 10) << __func__ << " connect got RETRY_GLOBAL " + << reply.global_seq << " chose new " + << global_seq << dendl; + state = STATE_CONNECTING_SEND_CONNECT_MSG; + } + if (reply.tag == CEPH_MSGR_TAG_RETRY_SESSION) { + assert(reply.connect_seq > connect_seq); + connect_seq = reply.connect_seq; + ldout(async_msgr->cct, 10) << __func__ << " connect got RETRY_SESSION " + << connect_seq << " -> " + << reply.connect_seq << dendl; + state = STATE_CONNECTING_SEND_CONNECT_MSG; + } + if (reply.tag == CEPH_MSGR_TAG_WAIT) { + ldout(async_msgr->cct, 3) << __func__ << " connect got WAIT (connection race)" << dendl; + state = STATE_WAIT; + } + + feat_missing = policy.features_required & ~(uint64_t)connect_reply.features; + if (feat_missing) { + ldout(async_msgr->cct, 1) << __func__ << " missing required features " << std::hex + << feat_missing << std::dec << dendl; + goto fail; + } + + if (reply.tag == CEPH_MSGR_TAG_SEQ) { + ldout(async_msgr->cct, 10) << __func__ << "got CEPH_MSGR_TAG_SEQ, reading acked_seq and writing in_seq" << dendl; + state = STATE_CONNECTING_WAIT_ACK_SEQ; + } + if (reply.tag == CEPH_MSGR_TAG_READY) { + ldout(async_msgr->cct, 10) << __func__ << "got CEPH_MSGR_TAG_READY " << dendl; + state = STATE_CONNECTING_READY; + } + + return 0; + + fail: + return -1; +}
+
+/*
+ * Server side of the connect handshake: validate the peer's connect
+ * message, resolve it against any connection already registered for
+ * peer_addr, and queue the reply.
+ *
+ * connect           the ceph_msg_connect read from the peer
+ * authorizer_bl     authorizer payload that followed the connect message
+ * authorizer_reply  filled in by verify_authorizer(); appended to the reply
+ *
+ * Returns 0 when a reply was queued (state has been advanced, either by
+ * _reply_accept() or to next_state / STATE_WAIT_SEND below), -1 when
+ * sending failed.
+ *
+ * The function releases and re-takes `lock` in the replace path, so it
+ * is written for a caller that holds `lock`.
+ * NOTE(review): _reply_accept() goes through try_send(), which takes
+ * `lock` again; confirm the mutex is recursive or switch to _try_send().
+ */
+int AsyncConnection::handle_connect_msg(ceph_msg_connect &connect, bufferlist &authorizer_bl,
+                                        bufferlist &authorizer_reply)
+{
+  int r;
+  ceph_msg_connect_reply reply;
+  bufferlist reply_bl;
+  uint64_t existing_seq = -1;
+  bool is_reset_from_peer = false;
+  // 0 means "no special tag chosen"; the open: path then falls back to
+  // CEPH_MSGR_TAG_READY.  It was previously read uninitialized whenever
+  // the replace: path did not assign it (e.g. a brand new session).
+  char reply_tag = 0;
+
+  memset(&reply, 0, sizeof(reply));
+  reply.protocol_version = async_msgr->get_proto_version(peer_type, false);
+
+  // mismatch?
+  ldout(async_msgr->cct,10) << __func__ << "accept my proto " << reply.protocol_version
+                            << ", their proto " << connect.protocol_version << dendl;
+  if (connect.protocol_version != reply.protocol_version) {
+    return _reply_accept(CEPH_MSGR_TAG_BADPROTOVER, connect, reply, authorizer_reply);
+  }
+  // require signatures for cephx?
+  if (connect.authorizer_protocol == CEPH_AUTH_CEPHX) {
+    if (peer_type == CEPH_ENTITY_TYPE_OSD ||
+        peer_type == CEPH_ENTITY_TYPE_MDS) {
+      if (async_msgr->cct->_conf->cephx_require_signatures ||
+          async_msgr->cct->_conf->cephx_cluster_require_signatures) {
+        ldout(async_msgr->cct, 10) << __func__ << " using cephx, requiring MSG_AUTH feature bit for cluster" << dendl;
+        policy.features_required |= CEPH_FEATURE_MSG_AUTH;
+      }
+    } else {
+      if (async_msgr->cct->_conf->cephx_require_signatures ||
+          async_msgr->cct->_conf->cephx_service_require_signatures) {
+        ldout(async_msgr->cct, 10) << __func__ << " using cephx, requiring MSG_AUTH feature bit for service" << dendl;
+        policy.features_required |= CEPH_FEATURE_MSG_AUTH;
+      }
+    }
+  }
+  uint64_t feat_missing = policy.features_required & ~(uint64_t)connect.features;
+  if (feat_missing) {
+    ldout(async_msgr->cct, 1) << __func__ << "peer missing required features "
+                              << std::hex << feat_missing << std::dec << dendl;
+    return _reply_accept(CEPH_MSGR_TAG_FEATURES, connect, reply, authorizer_reply);
+  }
+
+  bool authorizer_valid = false;
+  // NOTE(review): this get() has no matching put() in this function;
+  // confirm verify_authorizer() consumes the reference, otherwise every
+  // accept leaks one ref on the connection.
+  get();
+  if (!async_msgr->verify_authorizer(this, peer_type, connect.authorizer_protocol, authorizer_bl,
+                                     authorizer_reply, authorizer_valid, session_key) || !authorizer_valid) {
+    ldout(async_msgr->cct,0) << __func__ << ": got bad authorizer" << dendl;
+    session_security.reset();
+    return _reply_accept(CEPH_MSGR_TAG_BADAUTHORIZER, connect, reply, authorizer_reply);
+  }
+
+  // We've verified the authorizer for this AsyncConnection, so set up the session security structure.  PLR
+  ldout(async_msgr->cct, 10) << __func__ << " accept:  setting up session_security." << dendl;
+
+  // existing?
+  AsyncConnection *existing = async_msgr->lookup_conn(peer_addr);
+  if (existing) {
+    if (connect.global_seq < existing->peer_global_seq) {
+      ldout(async_msgr->cct, 10) << __func__ << " accept existing " << existing
+                                 << ".gseq " << existing->peer_global_seq << " > "
+                                 << connect.global_seq << ", RETRY_GLOBAL" << dendl;
+      reply.global_seq = existing->peer_global_seq;  // so we can send it below..
+      return _reply_accept(CEPH_MSGR_TAG_RETRY_GLOBAL, connect, reply, authorizer_reply);
+    } else {
+      ldout(async_msgr->cct, 10) << __func__ << " accept existing " << existing
+                                 << ".gseq " << existing->peer_global_seq
+                                 << " <= " << connect.global_seq << ", looks ok" << dendl;
+    }
+
+    if (existing->policy.lossy) {
+      ldout(async_msgr->cct, 0) << __func__ << " accept replacing existing (lossy) channel (new one lossy="
+                                << policy.lossy << ")" << dendl;
+      existing->was_session_reset();
+      goto replace;
+    }
+
+    ldout(async_msgr->cct, 0) << __func__ << "accept connect_seq " << connect.connect_seq
+                              << " vs existing " << existing->connect_seq
+                              << " state " << existing->state << dendl;
+
+    if (connect.connect_seq == 0 && existing->connect_seq > 0) {
+      ldout(async_msgr->cct,0) << __func__ << " accept peer reset, then tried to connect to us, replacing" << dendl;
+      // this is a hard reset from peer
+      is_reset_from_peer = true;
+      if (policy.resetcheck)
+        existing->was_session_reset(); // this resets out_queue, msg_ and connect_seq #'s
+      goto replace;
+    }
+
+    if (connect.connect_seq < existing->connect_seq) {
+      // old attempt, or we sent READY but they didn't get it.
+      ldout(async_msgr->cct, 10) << __func__ << "accept existing " << existing << ".cseq "
+                                 << existing->connect_seq << " > " << connect.connect_seq
+                                 << ", RETRY_SESSION" << dendl;
+      reply.connect_seq = existing->connect_seq + 1;
+      return _reply_accept(CEPH_MSGR_TAG_RETRY_SESSION, connect, reply, authorizer_reply);
+    }
+
+    if (connect.connect_seq == existing->connect_seq) {
+      // if the existing connection successfully opened, and/or
+      // subsequently went to standby, then the peer should bump
+      // their connect_seq and retry: this is not a connection race
+      // we need to resolve here.
+      if (existing->state == STATE_OPEN ||
+          existing->state == STATE_STANDBY) {
+        ldout(async_msgr->cct, 10) << __func__ << " accept connection race, existing " << existing
+                                   << ".cseq " << existing->connect_seq << " == "
+                                   << connect.connect_seq << ", OPEN|STANDBY, RETRY_SESSION" << dendl;
+        reply.connect_seq = existing->connect_seq + 1;
+        return _reply_accept(CEPH_MSGR_TAG_RETRY_SESSION, connect, reply, authorizer_reply);
+      }
+
+      // connection race?
+      if (peer_addr < async_msgr->get_myaddr() || existing->policy.server) {
+        // incoming wins
+        ldout(async_msgr->cct, 10) << __func__ << " accept connection race, existing " << existing
+                                   << ".cseq " << existing->connect_seq << " == " << connect.connect_seq
+                                   << ", or we are server, replacing my attempt" << dendl;
+        goto replace;
+      } else {
+        // our existing outgoing wins
+        ldout(async_msgr->cct,10) << __func__ << "accept connection race, existing "
+                                  << existing << ".cseq " << existing->connect_seq
+                                  << " == " << connect.connect_seq << ", sending WAIT" << dendl;
+        assert(peer_addr > async_msgr->get_myaddr());
+        // make sure our outgoing connection will follow through
+        existing->_send_keepalive_or_ack();
+        return _reply_accept(CEPH_MSGR_TAG_WAIT, connect, reply, authorizer_reply);
+      }
+    }
+
+    assert(connect.connect_seq > existing->connect_seq);
+    assert(connect.global_seq >= existing->peer_global_seq);
+    if (policy.resetcheck &&   // RESETSESSION only used by servers; peers do not reset each other
+        existing->connect_seq == 0) {
+      ldout(async_msgr->cct, 0) << __func__ << "accept we reset (peer sent cseq "
+                                << connect.connect_seq << ", " << existing << ".cseq = "
+                                << existing->connect_seq << "), sending RESETSESSION" << dendl;
+      return _reply_accept(CEPH_MSGR_TAG_RESETSESSION, connect, reply, authorizer_reply);
+    }
+
+    // reconnect
+    ldout(async_msgr->cct, 10) << __func__ << " accept peer sent cseq " << connect.connect_seq
+                               << " > " << existing->connect_seq << dendl;
+    goto replace;
+  } // existing
+  else if (policy.resetcheck && connect.connect_seq > 0) {
+    // we reset, and they are opening a new session
+    ldout(async_msgr->cct, 0) << __func__ << "accept we reset (peer sent cseq "
+                              << connect.connect_seq << "), sending RESETSESSION" << dendl;
+    return _reply_accept(CEPH_MSGR_TAG_RESETSESSION, connect, reply, authorizer_reply);
+  } else {
+    // new session
+    ldout(async_msgr->cct,10) << __func__ << "accept new session" << dendl;
+    existing = NULL;
+    goto open;
+  }
+  assert(0);
+
+ replace:
+  // if it is a hard reset from peer, we don't need a round-trip to negotiate in/out sequence
+  if ((connect.features & CEPH_FEATURE_RECONNECT_SEQ) && !is_reset_from_peer) {
+    reply_tag = CEPH_MSGR_TAG_SEQ;
+    existing_seq = existing->in_seq;
+  }
+  ldout(async_msgr->cct, 10) << __func__ << " accept replacing " << existing << dendl;
+  // NOTE(review): mark_down() runs existing->_stop(), which discards its
+  // queues and sets STATE_CLOSED, yet the non-lossy branch below goes on
+  // to reuse `existing`; confirm this is intended.
+  existing->mark_down();
+
+  // To avoid deadlock, both connection locks are taken in a fixed order
+  // (the connection with the larger sd first).  Another thread could
+  // touch this connection between the unlock and the re-lock; we rely on
+  // EventCenter to guarantee that only one thread handles a connection.
+  lock.Unlock();
+  if (existing->sd > sd) {
+    existing->lock.Lock();
+    lock.Lock();
+  } else {
+    lock.Lock();
+    // Was Unlock(): that released a mutex we never took and left
+    // `existing` unprotected for everything below.
+    existing->lock.Lock();
+  }
+  if (existing->policy.lossy) {
+    // disconnect from the Connection
+    existing->get();
+    async_msgr->dispatch_queue.queue_reset(existing);
+  } else {
+    // queue a reset on the new connection, which we're dumping for the old
+    get();
+    async_msgr->dispatch_queue.queue_reset(this);
+
+    // reset the in_seq if this is a hard reset from peer,
+    // otherwise we respect our original connection's value
+    if (is_reset_from_peer)
+      existing->in_seq = 0;
+
+    // Clean up output buffer
+    existing->outcoming_bl.clear();
+    existing->requeue_sent();
+    reply.connect_seq = existing->connect_seq + 1;
+    if (_reply_accept(CEPH_MSGR_TAG_RETRY_SESSION, connect, reply, authorizer_reply) < 0) {
+      // do not leave the other connection's lock held on the error path
+      existing->lock.Unlock();
+      goto fail;
+    }
+
+    // hand our socket over to the surviving connection (sd is an int)
+    int s = existing->sd;
+    existing->sd = sd;
+    sd = s;
+    existing->state = STATE_ACCEPTING_WAIT_CONNECT_MSG;
+    _stop();
+    existing->lock.Unlock();
+    return 0;
+  }
+  existing->lock.Unlock();
+
+ open:
+  connect_seq = connect.connect_seq + 1;
+  peer_global_seq = connect.global_seq;
+  ldout(async_msgr->cct, 10) << __func__ << " accept success, connect_seq = "
+                             << connect_seq << ", sending READY" << dendl;
+
+  // send READY reply
+  reply.tag = (reply_tag ? reply_tag : CEPH_MSGR_TAG_READY);
+  reply.features = policy.features_supported;
+  reply.global_seq = async_msgr->get_global_seq();
+  reply.connect_seq = connect_seq;
+  reply.flags = 0;
+  reply.authorizer_len = authorizer_reply.length();
+  if (policy.lossy)
+    reply.flags = reply.flags | CEPH_MSG_CONNECT_LOSSY;
+
+  set_features((uint64_t)reply.features & (uint64_t)connect.features);
+  ldout(async_msgr->cct, 10) << __func__ << " accept features " << get_features() << dendl;
+
+  session_security.reset(
+      get_auth_session_handler(async_msgr->cct, connect.authorizer_protocol,
+                               session_key, get_features()));
+
+  // notify
+  get();
+  async_msgr->dispatch_queue.queue_accept(this);
+  get();
+  async_msgr->ms_deliver_handle_fast_accept(this);
+
+  // ok!
+  async_msgr->accept_conn(this);
+
+  reply_bl.append((char*)&reply, sizeof(reply));
+
+  if (reply.authorizer_len)
+    reply_bl.append(authorizer_reply.c_str(), authorizer_reply.length());
+
+  int next_state;
+
+  if (reply_tag == CEPH_MSGR_TAG_SEQ) {
+    // Send the bytes of existing_seq.  The old cast, (char*)existing_seq,
+    // turned the sequence number itself into an address and read 8 bytes
+    // from there.
+    reply_bl.append((char*)&existing_seq, sizeof(existing_seq));
+    next_state = STATE_ACCEPTING_WAIT_SEQ;
+  } else {
+    next_state = STATE_ACCEPTING_READY;
+    discard_requeued_up_to(0);
+  }
+
+  r = _try_send(reply_bl);
+  if (r < 0) {
+    goto fail;
+  }
+
+  if (r == 0) {
+    state = next_state;
+    ldout(async_msgr->cct, 2) << __func__ << " accept write reply msg done" << dendl;
+  } else {
+    // partial write: resume in next_state once the send buffer drains
+    state = STATE_WAIT_SEND;
+    state_after_send = next_state;
+  }
+
+  return 0;
+
+ fail:
+  return -1;
+}
+
+void AsyncConnection::_connect() +{ + ldout(async_msgr->cct, 10) << __func__ << " " << connect_seq << dendl; + + state = STATE_CONNECTING; + process(); +} + +void AsyncConnection::accept(int incoming) +{ + ldout(async_msgr->cct, 10) << __func__ << " " << incoming << dendl; + assert(sd < 0); + + sd = incoming; + center->create_event(sd, EVENT_READABLE, new C_handle_read(this)); + process(); +} + +void AsyncConnection::requeue_sent() +{ + if (sent.empty()) + return; + + list& rq =
out_q[CEPH_MSG_PRIO_HIGHEST]; + while (!sent.empty()) { + Message *m = sent.back(); + sent.pop_back(); + ldout(async_msgr->cct, 10) << __func__ << " " << *m << " for resend seq " << out_seq + << " (" << m->get_seq() << ")" << dendl; + rq.push_front(m); + out_seq--; + } +} + +void AsyncConnection::discard_requeued_up_to(uint64_t seq) +{ + ldout(async_msgr->cct, 10) << __func__ << " " << seq << dendl; + if (out_q.count(CEPH_MSG_PRIO_HIGHEST) == 0) + return; + list& rq = out_q[CEPH_MSG_PRIO_HIGHEST]; + while (!rq.empty()) { + Message *m = rq.front(); + if (m->get_seq() == 0 || m->get_seq() > seq) + break; + ldout(async_msgr->cct, 10) << __func__ << " " << *m << " for resend seq " << out_seq + << " <= " << seq << ", discarding" << dendl; + m->put(); + rq.pop_front(); + out_seq++; + } + if (rq.empty()) + out_q.erase(CEPH_MSG_PRIO_HIGHEST); +} + +/* + * Tears down the AsyncConnection's message queues, and removes them from the DispatchQueue + * Must hold pipe_lock prior to calling. + */ +void AsyncConnection::discard_out_queue() +{ + ldout(async_msgr->cct, 10) << __func__ << " " << dendl; + + for (list::iterator p = sent.begin(); p != sent.end(); ++p) { + ldout(async_msgr->cct, 20) << __func__ << " discard " << *p << dendl; + (*p)->put(); + } + sent.clear(); + for (map >::iterator p = out_q.begin(); p != out_q.end(); ++p) + for (list::iterator r = p->second.begin(); r != p->second.end(); ++r) { + ldout(async_msgr->cct, 20) << __func__ << " discard " << *r << dendl; + (*r)->put(); + } + out_q.clear(); +} + +int AsyncConnection::randomize_out_seq() +{ + if (get_features() & CEPH_FEATURE_MSG_AUTH) { + // Set out_seq to a random value, so CRC won't be predictable. Don't bother checking seq_error + // here. We'll check it on the call. 
PLR + int seq_error = get_random_bytes((char *)&out_seq, sizeof(out_seq)); + out_seq &= SEQ_MASK; + lsubdout(async_msgr->cct, ms, 10) << __func__ << "randomize_out_seq " << out_seq << dendl; + return seq_error; + } else { + // previously, seq #'s always started at 0. + out_seq = 0; + return 0; + } +} + +void AsyncConnection::fault() +{ + if (state == STATE_CLOSED) { + ldout(async_msgr->cct, 10) << __func__ << " state is already STATE_CLOSED" << dendl; + return ; + } + + if (policy.lossy && state != STATE_CONNECTING) { + ldout(async_msgr->cct, 10) << __func__ << " on lossy channel, failing" << dendl; + in_q->discard_queue(conn_id); + _stop(); + return ; + } + + shutdown_socket(); + center->delete_event(sd, EVENT_READABLE|EVENT_WRITABLE); + + // requeue sent items + requeue_sent(); + if (policy.standby && !is_queued()) { + ldout(async_msgr->cct,0) << __func__ << " with nothing to send, going to standby" << dendl; + state = STATE_STANDBY; + return; + } + + //TODO we need to rescheduler connect again!!! + if (state != STATE_CONNECTING) { + if (policy.server) { + ldout(async_msgr->cct, 0) << __func__ << " server, going to standby" << dendl; + state = STATE_STANDBY; + } else { + ldout(async_msgr->cct,0) << __func__ << " initiating reconnect" << dendl; + connect_seq++; + state = STATE_CONNECTING; + } + backoff = utime_t(); + ldout(async_msgr->cct,0) << __func__ << dendl; + } else { + if (backoff == utime_t()) { + backoff.set_from_double(async_msgr->cct->_conf->ms_initial_backoff); + } else { + backoff += backoff; + if (backoff > async_msgr->cct->_conf->ms_max_backoff) + backoff.set_from_double(async_msgr->cct->_conf->ms_max_backoff); + } + ldout(async_msgr->cct, 10) << __func__ << " waiting " << backoff << dendl; + // TODO wait!!!!! 
+ ldout(async_msgr->cct, 10) << __func__ << " done waiting or woke up" << dendl; + } +} + +void AsyncConnection::was_session_reset() +{ + ldout(async_msgr->cct,10) << __func__ << "was_session_reset" << dendl; + in_q->discard_queue(conn_id); + discard_out_queue(); + outcoming_bl.clear(); + + get(); + async_msgr->dispatch_queue.queue_remote_reset(this); + + if (randomize_out_seq()) { + lsubdout(async_msgr->cct,ms,15) << __func__ << " Could not get random bytes to set seq number for session reset; set seq number to " << out_seq << dendl; + } + + in_seq = 0; + connect_seq = 0; +} + +// Who call "_stop(): +// 1. receive STATE_OPEN_TAG_CLOSE +// 2. fault when policy.lossy +// 3. mark_down +// 4. caller from Messenger +void AsyncConnection::_stop() +{ + ldout(async_msgr->cct, 10) << __func__ << dendl; + get(); + async_msgr->dispatch_queue.queue_reset(this); + shutdown_socket(); + discard_out_queue(); + outcoming_bl.clear(); + state = STATE_CLOSED; +} + +int AsyncConnection::_send(Message *m) +{ + m->set_seq(++out_seq); + if (!policy.lossy) { + // put on sent list + sent.push_back(m); + m->get(); + } + + // associate message with Connection (for benefit of encode_payload) + get(); + m->set_connection(this); + + uint64_t features = get_features(); + if (m->empty_payload()) + ldout(async_msgr->cct, 20) << __func__ << " encoding " << m->get_seq() << " features " << features + << " " << m << " " << *m << dendl; + else + ldout(async_msgr->cct, 20) << __func__ << " half-reencoding " << m->get_seq() << " features " + << features << " " << m << " " << *m << dendl; + + // encode and copy out of *m + m->encode(features, !async_msgr->cct->_conf->ms_nocrc); + + // prepare everything + ceph_msg_header& header = m->get_header(); + ceph_msg_footer& footer = m->get_footer(); + + // Now that we have all the crcs calculated, handle the + // digital signature for the message, if the AsyncConnection has session + // security set up. 
Some session security options do not + // actually calculate and check the signature, but they should + // handle the calls to sign_message and check_signature. PLR + if (session_security.get() == NULL) { + ldout(async_msgr->cct, 20) << __func__ << " no session security" << dendl; + } else { + if (session_security->sign_message(m)) { + ldout(async_msgr->cct, 20) << __func__ << " failed to sign seq # " + << header.seq << "): sig = " << footer.sig << dendl; + } else { + ldout(async_msgr->cct, 20) << __func__ << " signed seq # " << header.seq + << "): sig = " << footer.sig << dendl; + } + } + + bufferlist blist = m->get_payload(); + blist.append(m->get_middle()); + blist.append(m->get_data()); + + ldout(async_msgr->cct, 20) << __func__ << " sending " << m->get_seq() + << " " << m << dendl; + int rc = write_message(header, footer, blist); + + if (rc < 0) { + ldout(async_msgr->cct, 1) << __func__ << " error sending " << m << ", " + << cpp_strerror(errno) << dendl; + } else if (rc == 0) { + ldout(async_msgr->cct, 10) << __func__ << " sending " << m << " done." << dendl; + } else { + ldout(async_msgr->cct, 10) << __func__ << " sending " << m << " continuely." 
<< dendl; + } + m->put(); + + return rc; +} + +int AsyncConnection::write_message(ceph_msg_header& header, ceph_msg_footer& footer, + bufferlist& blist) +{ + bufferlist bl; + int ret; + + // send tag + char tag = CEPH_MSGR_TAG_MSG; + bl.append(&tag, sizeof(tag)); + + // send envelope + ceph_msg_header_old oldheader; + if (has_feature(CEPH_FEATURE_NOSRCADDR)) { + bl.append((char*)&header, sizeof(header)); + } else { + memcpy(&oldheader, &header, sizeof(header)); + oldheader.src.name = header.src; + oldheader.src.addr = get_peer_addr(); + oldheader.orig_src = oldheader.src; + oldheader.reserved = header.reserved; + oldheader.crc = ceph_crc32c(0, (unsigned char*)&oldheader, + sizeof(oldheader) - sizeof(oldheader.crc)); + bl.append((char*)&oldheader, sizeof(oldheader)); + } + + bl.claim_append(blist); + + // send footer; if receiver doesn't support signatures, use the old footer format + ceph_msg_footer_old old_footer; + if (has_feature(CEPH_FEATURE_MSG_AUTH)) { + bl.append((char*)&footer, sizeof(footer)); + } else { + old_footer.front_crc = footer.front_crc; + old_footer.middle_crc = footer.middle_crc; + old_footer.data_crc = footer.data_crc; + old_footer.flags = footer.flags; + bl.append((char*)&old_footer, sizeof(old_footer)); + } + + // send + ret = _try_send(bl); + if (ret < 0) + return ret; + + return ret; +} + +void AsyncConnection::handle_ack(uint64_t seq) +{ + lsubdout(async_msgr->cct, ms, 15) << __func__ << " got ack seq " << seq << dendl; + // trim sent list + while (!sent.empty() && sent.front()->get_seq() <= seq) { + Message *m = sent.front(); + sent.pop_front(); + lsubdout(async_msgr->cct, ms, 10) << __func__ << "reader got ack seq " + << seq << " >= " << m->get_seq() << " on " + << m << " " << *m << dendl; + m->put(); + } +} + +void AsyncConnection::_send_keepalive_or_ack(bool ack, utime_t *tp) +{ + bufferlist bl; + + utime_t t = ceph_clock_now(async_msgr->cct); + struct ceph_timespec ts; + t.encode_timeval(&ts); + if (ack) { + assert(tp); + 
tp->encode_timeval(&ts); + bl.append(CEPH_MSGR_TAG_KEEPALIVE2_ACK); + bl.append((char*)&ts, sizeof(ts)); + } else if (has_feature(CEPH_FEATURE_MSGR_KEEPALIVE2)) { + struct ceph_timespec ts; + t.encode_timeval(&ts); + bl.append(CEPH_MSGR_TAG_KEEPALIVE2); + bl.append((char*)&ts, sizeof(ts)); + } else { + bl.append(CEPH_MSGR_TAG_KEEPALIVE); + } + + try_send(bl, false); +}
+
+/*
+ * Write-side event handler (invoked through C_handle_write).
+ *
+ * Under `lock`: queue an ACK for any received-but-unacknowledged
+ * messages, push the pending output through _try_send(), then keep
+ * sending queued messages until the queue is empty or a send returns
+ * > 0.  Any negative return from _try_send()/_send() ends in fault().
+ */
+void AsyncConnection::handle_write()
+{
+  ldout(async_msgr->cct, 10) << __func__ << " started." << dendl;
+  Mutex::Locker l(lock);
+  bufferlist bl;
+  if (in_seq > in_seq_acked) {
+    ceph_le64 s;
+    s = in_seq;
+    bl.append(CEPH_MSGR_TAG_ACK);
+    bl.append((char*)&s, sizeof(s));
+    // Remember what has been acknowledged; without this the condition
+    // above stays true and the same ACK is queued on every call.
+    // NOTE(review): no other writer of in_seq_acked is visible in this
+    // part of the file; confirm none exists elsewhere.
+    in_seq_acked = in_seq;
+  }
+
+  int r = _try_send(bl);
+  if (r < 0) {
+    fault();
+    return ;
+  }
+  if (r > 0)
+    return ;
+
+  while (Message *m = _get_next_outgoing()) {
+    r = _send(m);
+    if (r < 0) {
+      fault();
+      return ;
+    }
+    if (r > 0)
+      break;
+  }
+}
diff --git a/src/msg/AsyncConnection.h b/src/msg/AsyncConnection.h new file mode 100644 index 00000000000..b97622613ba --- /dev/null +++ b/src/msg/AsyncConnection.h @@ -0,0 +1,221 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +#ifndef CEPH_MSG_ASYNCCONNECTION_H +#define CEPH_MSG_ASYNCCONNECTION_H + +#include +#include +using namespace std; + +#include "common/Mutex.h" +#include "include/buffer.h" + +#include "auth/AuthSessionHandler.h" +#include "include/buffer.h" +#include "Connection.h" +#include "net_handler.h" +#include "Event.h" +#include "Messenger.h" + +class AsyncMessenger; + +class AsyncConnection : public Connection { + const static uint64_t send_threshold = 16 * 1024 * 1024; + + int read_bulk(int fd, char *buf, int len); + int do_sendmsg(struct msghdr &msg, int len, bool more); + // if "send" is false, it will only append bl to send buffer + // the main usage is avoid error happen outside messenger threads + int _try_send(bufferlist bl, bool send=true); + int _send(Message *m); + int
read_until(uint64_t needed, bufferptr &p); + int _process_connection(); + void _connect(); + void _stop(); + int handle_connect_reply(ceph_msg_connect &connect, ceph_msg_connect_reply &r); + int handle_connect_msg(ceph_msg_connect &m, bufferlist &aubl, bufferlist &bl); + void was_session_reset(); + void fault(); + void discard_out_queue(); + void discard_requeued_up_to(uint64_t seq); + void requeue_sent(); + int randomize_out_seq(); + void handle_ack(uint64_t seq); + void _send_keepalive_or_ack(bool ack=false, utime_t *t=NULL); + int write_message(ceph_msg_header& header, ceph_msg_footer& footer, bufferlist& blist); + int _reply_accept(char tag, ceph_msg_connect &connect, ceph_msg_connect_reply &reply, + bufferlist authorizer_reply) { + bufferlist reply_bl; + reply.tag = tag; + reply.features = ((uint64_t)connect.features & policy.features_supported) | policy.features_required; + reply.authorizer_len = authorizer_reply.length(); + reply_bl.append((char*)&reply, sizeof(reply)); + if (reply.authorizer_len) { + reply_bl.append(authorizer_reply.c_str(), authorizer_reply.length()); + } + int r = try_send(reply_bl); + if (r < 0) + return -1; + + state = STATE_ACCEPTING_WAIT_CONNECT_MSG; + return 0; + } + bool _can_prepare_send() { + if (outcoming_bl.length() > send_threshold) + return false; + return true; + } + bool is_queued() { + return !out_q.empty() || outcoming_bl.length(); + } + void shutdown_socket() { + if (sd >= 0) + ::shutdown(sd, SHUT_RDWR); + } + Message *_get_next_outgoing() { + Message *m = 0; + while (!m && !out_q.empty()) { + map >::reverse_iterator p = out_q.rbegin(); + if (!p->second.empty()) { + m = p->second.front(); + p->second.pop_front(); + } + if (p->second.empty()) + out_q.erase(p->first); + } + return m; + } + + public: + AsyncConnection(CephContext *cct, AsyncMessenger *m); + ~AsyncConnection(); + + ostream& _conn_prefix(std::ostream *_dout); + + bool is_connected() { + Mutex::Locker l(lock); + return state != STATE_CLOSED; + } + + // Only 
call when AsyncConnection first construct + void connect(const entity_addr_t& addr, int type) { + set_peer_type(type); + set_peer_addr(addr); + policy = msgr->get_policy(type); + _connect(); + } + // Only call when AsyncConnection first construct + void accept(int sd); + int send_message(Message *m) { + Mutex::Locker l(lock); + out_q[m->get_priority()].push_back(m); + return 0; + } + + void send_keepalive() { + Mutex::Locker l(lock); + _send_keepalive_or_ack(); + } + void mark_down() { + Mutex::Locker l(lock); + _stop(); + } + void mark_disposable() { + Mutex::Locker l(lock); + policy.lossy = true; + } + + int try_send(bufferlist bl, bool send=true) { + Mutex::Locker l(lock); + return _try_send(bl, send); + } + + void handle_write(); + void process(); + + private: + enum { + STATE_NONE, + STATE_OPEN, + STATE_OPEN_KEEPALIVE2, + STATE_OPEN_KEEPALIVE2_ACK, + STATE_OPEN_TAG_ACK, + STATE_OPEN_MESSAGE_HEADER, + STATE_OPEN_MESSAGE_THROTTLE_MESSAGE, + STATE_OPEN_MESSAGE_THROTTLE_BYTES, + STATE_OPEN_MESSAGE_THROTTLE_DISPATCH, + STATE_OPEN_MESSAGE_READ_FRONT, + STATE_OPEN_MESSAGE_READ_MIDDLE, + STATE_OPEN_MESSAGE_READ_DATA_PREPARE, + STATE_OPEN_MESSAGE_READ_DATA, + STATE_OPEN_MESSAGE_READ_FOOTER_AND_DISPATCH, + STATE_OPEN_TAG_CLOSE, + STATE_WAIT_SEND, + STATE_CONNECTING, + STATE_CONNECTING_WAIT_BANNER, + STATE_CONNECTING_WAIT_IDENTIFY_PEER, + STATE_CONNECTING_SEND_CONNECT_MSG, + STATE_CONNECTING_WAIT_CONNECT_REPLY, + STATE_CONNECTING_WAIT_CONNECT_REPLY_AUTH, + STATE_CONNECTING_WAIT_ACK_SEQ, + STATE_CONNECTING_READY, + STATE_ACCEPTING, + STATE_ACCEPTING_HANDLE_CONNECT, + STATE_ACCEPTING_WAIT_BANNER_ADDR, + STATE_ACCEPTING_WAIT_CONNECT_MSG, + STATE_ACCEPTING_WAIT_CONNECT_MSG_AUTH, + STATE_ACCEPTING_WAIT_SEQ, + STATE_ACCEPTING_READY, + STATE_STANDBY, + STATE_CLOSED, + STATE_WAIT, // just wait for racing connection + STATE_FAULT + }; + + CephContext *cc; + AsyncMessenger *async_msgr; + int global_seq; + __u32 connect_seq, peer_global_seq; + uint64_t out_seq; + uint64_t in_seq, 
in_seq_acked; + int state; + int state_after_send; + int sd; + int port; + uint64_t conn_id; + Messenger::Policy policy; + map > out_q; // priority queue for outbound msgs + DispatchQueue *in_q; + list sent; + Mutex lock; + utime_t backoff; // backoff time + + // Tis section are temp variables used by state transition + + // Open state + utime_t recv_stamp; + utime_t throttle_stamp; + uint64_t msg_left; + ceph_msg_header current_header; + bufferlist::iterator data_blp; + bufferlist front, middle, data; + ceph_msg_connect connect_msg; + ceph_msg_connect_reply connect_reply; + // Connecting state + bool got_bad_auth; + AuthAuthorizer *authorizer; + // Accepting state + entity_addr_t socket_addr; + CryptoKey session_key; + + // used only for local state, it will be overwrite when state transition + bufferptr state_buffer; + // used only by "read_until" + uint64_t state_offset; + bufferlist outcoming_bl; + NetHandler net; + EventCenter *center; + ceph::shared_ptr session_security; +}; /* AsyncConnection */ + +#endif diff --git a/src/msg/AsyncMessenger.cc b/src/msg/AsyncMessenger.cc new file mode 100644 index 00000000000..7bc8e4cba36 --- /dev/null +++ b/src/msg/AsyncMessenger.cc @@ -0,0 +1,635 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include +#include +#include + + +#include "AsyncMessenger.h" + +#include "common/config.h" +#include "common/Timer.h" +#include "common/errno.h" +#include "auth/Crypto.h" +#include "include/Spinlock.h" + +#define dout_subsys ceph_subsys_ms +#undef dout_prefix +#define dout_prefix _prefix(_dout, this) +static ostream& _prefix(std::ostream *_dout, AsyncMessenger *m) { + return *_dout << "-- " << m->get_myaddr() << " "; +} + +static ostream& _prefix(std::ostream *_dout, Processor *p) { + return *_dout << "-- "; +} + +/******************* + * EventCallBack + */ + +class C_handle_accept : public EventCallback { + Processor *p; + + public: + C_handle_accept(Processor *p): p(p) {} + 
void do_request(int fd, int mask) { + p->accept(); + } +}; + + +/******************* + * Processor + */ + +int Processor::bind(const entity_addr_t &bind_addr, const set& avoid_ports) +{ + const md_config_t *conf = msgr->cct->_conf; + // bind to a socket + ldout(msgr->cct, 10) << __func__ << dendl; + + int family, flags; + switch (bind_addr.get_family()) { + case AF_INET: + case AF_INET6: + family = bind_addr.get_family(); + break; + + default: + // bind_addr is empty + family = conf->ms_bind_ipv6 ? AF_INET6 : AF_INET; + } + + /* socket creation */ + listen_sd = ::socket(family, SOCK_STREAM, 0); + if (listen_sd < 0) { + lderr(msgr->cct) << __func__ << " unable to create socket: " + << cpp_strerror(errno) << dendl; + return -errno; + } + + // use whatever user specified (if anything) + entity_addr_t listen_addr = bind_addr; + listen_addr.set_family(family); + + /* bind to port */ + int rc = -1; + if (listen_addr.get_port()) { + // specific port + + // reuse addr+port when possible + int on = 1; + rc = ::setsockopt(listen_sd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on)); + if (rc < 0) { + lderr(msgr->cct) << __func__ << " unable to setsockopt: " + << cpp_strerror(errno) << dendl; + return -errno; + } + + rc = ::bind(listen_sd, (struct sockaddr *) &listen_addr.ss_addr(), listen_addr.addr_size()); + if (rc < 0) { + lderr(msgr->cct) << __func__ << " unable to bind to " << listen_addr.ss_addr() + << ": " << cpp_strerror(errno) << dendl; + return -errno; + } + } else { + // try a range of ports + for (int port = msgr->cct->_conf->ms_bind_port_min; port <= msgr->cct->_conf->ms_bind_port_max; port++) { + if (avoid_ports.count(port)) + continue; + listen_addr.set_port(port); + rc = ::bind(listen_sd, (struct sockaddr *) &listen_addr.ss_addr(), listen_addr.addr_size()); + if (rc == 0) + break; + } + if (rc < 0) { + lderr(msgr->cct) << __func__ << " unable to bind to " << listen_addr.ss_addr() + << " on any port in range " << msgr->cct->_conf->ms_bind_port_min + << "-" << 
msgr->cct->_conf->ms_bind_port_max + << ": " << cpp_strerror(errno) << dendl; + return -errno; + } + ldout(msgr->cct,10) << __func__ << " bound on random port " << listen_addr << dendl; + } + + // what port did we get? + socklen_t llen = sizeof(listen_addr.ss_addr()); + rc = getsockname(listen_sd, (sockaddr*)&listen_addr.ss_addr(), &llen); + if (rc < 0) { + rc = -errno; + lderr(msgr->cct) << __func__ << " failed getsockname: " << cpp_strerror(rc) << dendl; + return rc; + } + + ldout(msgr->cct, 10) << __func__ << " bound to " << listen_addr << dendl; + + // listen! + rc = ::listen(listen_sd, 128); + if (rc < 0) { + rc = -errno; + lderr(msgr->cct) << __func__ << " unable to listen on " << listen_addr + << ": " << cpp_strerror(rc) << dendl; + return rc; + } + + if ((flags = fcntl(listen_sd, F_GETFL, 0)) < 0 || + fcntl(listen_sd, F_SETFL, flags | O_NONBLOCK) < 0) { + rc = -errno; + lderr(msgr->cct) << __func__ << " unable to setnonblock on " << listen_addr + << ": " << cpp_strerror(rc) << dendl; + return rc; + } + + msgr->set_myaddr(bind_addr); + if (bind_addr != entity_addr_t()) + msgr->learned_addr(bind_addr); + + if (msgr->get_myaddr().get_port() == 0) { + msgr->set_myaddr(listen_addr); + } + entity_addr_t addr = msgr->get_myaddr(); + addr.nonce = nonce; + msgr->set_myaddr(addr); + + msgr->init_local_connection(); + + ldout(msgr->cct,1) << __func__ << " bind my_inst.addr is " << msgr->get_myaddr() << dendl; + return 0; +} + +int Processor::rebind(const set& avoid_ports) +{ + ldout(msgr->cct, 1) << __func__ << " rebind avoid " << avoid_ports << dendl; + + entity_addr_t addr = msgr->get_myaddr(); + set new_avoid = avoid_ports; + new_avoid.insert(addr.get_port()); + addr.set_port(0); + + // adjust the nonce; we want our entity_addr_t to be truly unique. 
+ nonce += 1000000; + msgr->my_inst.addr.nonce = nonce; + ldout(msgr->cct, 10) << __func__ << " new nonce " << nonce << " and inst " << msgr->my_inst << dendl; + + ldout(msgr->cct, 10) << __func__ << " will try " << addr << " and avoid ports " << new_avoid << dendl; + int r = bind(addr, new_avoid); + if (r == 0) + start(); + return r; +} + +int Processor::start() +{ + ldout(msgr->cct, 1) << __func__ << " start" << dendl; + + // start thread + create(); + center->create_event(listen_sd, EVENT_READABLE, new C_handle_accept(this)); + + return 0; +} + +void Processor::accept() +{ + ldout(msgr->cct, 10) << __func__ << " starting" << dendl; + // accept + entity_addr_t addr; + socklen_t slen = sizeof(addr.ss_addr()); + int sd = ::accept(listen_sd, (sockaddr*)&addr.ss_addr(), &slen); + if (sd >= 0) { + ldout(msgr->cct,10) << __func__ << " incoming on sd " << sd << dendl; + + msgr->add_accept(sd); + } else { + ldout(msgr->cct,0) << __func__ << " no incoming connection? sd = " << sd + << " errno " << errno << " " << cpp_strerror(errno) << dendl; + } +} + +void *Processor::entry() +{ + ldout(msgr->cct, 10) << __func__ << " starting" << dendl; + int r; + + while (!done) { + ldout(msgr->cct,20) << __func__ << " calling poll" << dendl; + + r = center->process_events(500); + if (r < 0) { + ldout(msgr->cct,20) << __func__ << " process events failed: " + << cpp_strerror(errno) << dendl; + // TODO do something? + } + } + + ldout(msgr->cct,20) << __func__ << " closing" << dendl; + // don't close socket, in case we start up again? blech. 
+  if (listen_sd >= 0) {
+    ::close(listen_sd);
+    listen_sd = -1;
+  }
+  ldout(msgr->cct,10) << __func__ << " stopping" << dendl;
+  return 0;
+}
+
+/**
+ * Stop the accept thread and release the listening socket.
+ *
+ * Sets `done` so entry() leaves its loop, unregisters the listen fd from
+ * the event center, shuts the socket down, joins the thread and finally
+ * closes the socket. `done` is reset afterwards so start() can be called
+ * again (rebind() relies on this).
+ */
+void Processor::stop()
+{
+  done = true;
+  ldout(msgr->cct, 10) << __func__ << " processor" << dendl;
+
+  // only touch the event center / socket if we actually own a listen fd
+  if (listen_sd >= 0) {
+    center->delete_event(listen_sd, EVENT_READABLE);
+    ::shutdown(listen_sd, SHUT_RDWR);
+  }
+
+  // wait for thread to stop before closing the socket, to avoid
+  // racing against fd re-use.
+  if (is_started()) {
+    join();
+  }
+
+  // entry() closes the socket on its way out and sets listen_sd to -1;
+  // this only fires when the thread was never started.
+  if (listen_sd >= 0) {
+    ::close(listen_sd);
+    listen_sd = -1;
+  }
+  done = false;
+}
+
+
+/*******************
+ * AsyncMessenger
+ */
+
+/**
+ * Build an AsyncMessenger.
+ *
+ * @param cct    The CephContext to use
+ * @param name   The entity name to assign ourselves
+ * @param mname  Messenger name, used to name the dispatch throttler
+ * @param _nonce Nonce stored in the messenger and handed to the Processor
+ */
+AsyncMessenger::AsyncMessenger(CephContext *cct, entity_name_t name,
+                               string mname, uint64_t _nonce)
+  : SimplePolicyMessenger(cct, name,mname, _nonce),
+    // only the address of `center` is stored here; Processor does not
+    // dereference it in its constructor
+    processor(this, _nonce, &center),
+    nonce(_nonce),
+    lock("AsyncMessenger::lock"),
+    center(cct), did_bind(false),
+    global_seq(0),
+    cluster_protocol(0),
+    local_connection(new AsyncConnection(cct, this)),
+    dispatch_throttler(cct, string("msgr_dispatch_throttler-") + mname,
+                       cct->_conf->ms_dispatch_throttle_bytes),
+    dispatch_queue(cct, this)
+{
+  ceph_spin_init(&global_seq_lock);
+  // _init_local_connection() asserts that `lock` is held
+  Mutex::Locker l(lock);
+  _init_local_connection();
+}
+
+/**
+ * Destroy the AsyncMessenger. Pretty simple since all the work is done
+ * elsewhere.
+ */ +AsyncMessenger::~AsyncMessenger() +{ + assert(!did_bind); // either we didn't bind or we shut down the Processor +} + +void AsyncMessenger::ready() +{ + ldout(cct,10) << __func__ << " " << get_myaddr() << dendl; + dispatch_queue.start(); + + lock.Lock(); + if (did_bind) + processor.start(); + lock.Unlock(); +} + +int AsyncMessenger::shutdown() +{ + ldout(cct,10) << __func__ << "shutdown " << get_myaddr() << dendl; + mark_down_all(); + dispatch_queue.shutdown(); + + // break ref cycles on the loopback connection + local_connection->set_priv(NULL); + return 0; +} + + +int AsyncMessenger::bind(const entity_addr_t &bind_addr) +{ + lock.Lock(); + if (started) { + ldout(cct,10) << __func__ << " already started" << dendl; + lock.Unlock(); + return -1; + } + ldout(cct,10) << __func__ << " bind " << bind_addr << dendl; + lock.Unlock(); + + // bind to a socket + set avoid_ports; + int r = processor.bind(bind_addr, avoid_ports); + if (r >= 0) + did_bind = true; + return r; +} + +int AsyncMessenger::rebind(const set& avoid_ports) +{ + ldout(cct,1) << __func__ << " rebind avoid " << avoid_ports << dendl; + assert(did_bind); + processor.stop(); + mark_down_all(); + return processor.rebind(avoid_ports); +} + +int AsyncMessenger::start() +{ + lock.Lock(); + ldout(cct,1) << __func__ << " start" << dendl; + + // register at least one entity, first! + assert(my_inst.name.type() >= 0); + + assert(!started); + started = true; + + if (!did_bind) { + my_inst.addr.nonce = nonce; + _init_local_connection(); + } + + lock.Unlock(); + + // FIXME + center.init(5000); + return 0; +} + +void AsyncMessenger::wait() +{ + lock.Lock(); + if (!started) { + lock.Unlock(); + return; + } + lock.Unlock(); + + if(dispatch_queue.is_started()) { + ldout(cct,10) << __func__ << ": waiting for dispatch queue" << dendl; + dispatch_queue.wait(); + ldout(cct,10) << __func__ << ": dispatch queue is stopped" << dendl; + } + + // done! clean up. 
+ if (did_bind) { + ldout(cct,20) << __func__ << ": stopping processor thread" << dendl; + processor.stop(); + did_bind = false; + ldout(cct,20) << __func__ << ": stopped processor thread" << dendl; + } + + // close all pipes + lock.Lock(); + { + ldout(cct, 10) << __func__ << ": closing pipes" << dendl; + + while (!conns.empty()) { + AsyncConnection *p = conns.begin()->second; + _stop_conn(p); + } + } + lock.Unlock(); + + ldout(cct, 10) << __func__ << ": done." << dendl; + ldout(cct, 1) << __func__ << " complete." << dendl; + started = false; +} + +AsyncConnection *AsyncMessenger::add_accept(int sd) +{ + lock.Lock(); + AsyncConnection *conn = new AsyncConnection(cct, this); + conn->accept(sd); + accepting_conns.insert(conn); + lock.Unlock(); + return conn; +} + +AsyncConnection *AsyncMessenger::create_connect(const entity_addr_t& addr, int type) +{ + assert(lock.is_locked()); + assert(addr != my_inst.addr); + + ldout(cct, 10) << __func__ << " " << addr + << ", creating connection and registering" << dendl; + + // create connection + AsyncConnection *conn = new AsyncConnection(cct, this); + conn->connect(addr, type); + assert(!conns.count(addr)); + conns[addr] = conn; + + return conn; +} + +ConnectionRef AsyncMessenger::get_connection(const entity_inst_t& dest) +{ + Mutex::Locker l(lock); + if (my_inst.addr == dest.addr) { + // local + return local_connection; + } + + AsyncConnection *conn = _lookup_conn(dest.addr); + if (conn) { + ldout(cct, 10) << __func__ << " " << dest << " existing " << conn << dendl; + } else { + conn = create_connect(dest.addr, dest.name.type()); + ldout(cct, 10) << __func__ << " " << dest << " new " << conn << dendl; + } + + return conn; +} + +ConnectionRef AsyncMessenger::get_loopback_connection() +{ + return local_connection; +} + +int AsyncMessenger::_send_message(Message *m, const entity_inst_t& dest) +{ + // set envelope + m->get_header().src = get_myname(); + + if (!m->get_priority()) + m->set_priority(get_default_send_priority()); + + 
ldout(cct, 1) << __func__ << "--> " << dest.name << " " + << dest.addr << " -- " << *m << " -- ?+" + << m->get_data().length() << " " << m << dendl; + + if (dest.addr == entity_addr_t()) { + ldout(cct,0) << __func__ << " message " << *m + << " with empty dest " << dest.addr << dendl; + m->put(); + return -EINVAL; + } + + AsyncConnection *conn = _lookup_conn(dest.addr); + submit_message(m, conn, dest.addr, dest.name.type()); + return 0; +} + +void AsyncMessenger::submit_message(Message *m, AsyncConnection *con, + const entity_addr_t& dest_addr, int dest_type) +{ + if (cct->_conf->ms_dump_on_send) { + m->encode(-1, true); + ldout(cct, 0) << __func__ << "submit_message " << *m << "\n"; + m->get_payload().hexdump(*_dout); + if (m->get_data().length() > 0) { + *_dout << " data:\n"; + m->get_data().hexdump(*_dout); + } + *_dout << dendl; + m->clear_payload(); + } + + // existing connection? + if (con) { + con->send_message(m); + return ; + } + + // local? + if (my_inst.addr == dest_addr) { + // local + ldout(cct, 20) << __func__ << " " << *m << " local" << dendl; + dispatch_queue.local_delivery(m, m->get_priority()); + return; + } + + // remote, no existing pipe. + const Policy& policy = get_policy(dest_type); + if (policy.server) { + ldout(cct, 20) << __func__ << " " << *m << " remote, " << dest_addr + << ", lossy server for target type " + << ceph_entity_type_name(dest_type) << ", no session, dropping." << dendl; + m->put(); + } else { + ldout(cct,20) << __func__ << " " << *m << " remote, " << dest_addr << ", new pipe." << dendl; + } +} + +/** + * If my_inst.addr doesn't have an IP set, this function + * will fill it in from the passed addr. Otherwise it does nothing and returns. 
+ */ +void AsyncMessenger::set_addr_unknowns(entity_addr_t &addr) +{ + Mutex::Locker l(lock); + if (my_inst.addr.is_blank_ip()) { + int port = my_inst.addr.get_port(); + my_inst.addr.addr = addr.addr; + my_inst.addr.set_port(port); + _init_local_connection(); + } +} + +int AsyncMessenger::send_keepalive(Connection *con) +{ + con->send_keepalive(); + return 0; +} + +void AsyncMessenger::mark_down_all() +{ + ldout(cct,1) << __func__ << " " << dendl; + lock.Lock(); + for (set::iterator q = accepting_conns.begin(); + q != accepting_conns.end(); ++q) { + AsyncConnection *p = *q; + ldout(cct, 5) << __func__ << " accepting_conn " << p << dendl; + p->mark_down(); + p->get(); + dispatch_queue.queue_reset(p); + } + accepting_conns.clear(); + + while (!conns.empty()) { + ceph::unordered_map::iterator it = conns.begin(); + AsyncConnection *p = it->second; + ldout(cct, 5) << __func__ << " " << it->first << " " << p << dendl; + conns.erase(it); + p->mark_down(); + p->get(); + dispatch_queue.queue_reset(p); + } + lock.Unlock(); +} + +void AsyncMessenger::mark_down(const entity_addr_t& addr) +{ + lock.Lock(); + AsyncConnection *p = _lookup_conn(addr); + if (p) { + ldout(cct, 1) << __func__ << " " << addr << " -- " << p << dendl; + _stop_conn(p); + p->get(); + dispatch_queue.queue_reset(p); + } else { + ldout(cct, 1) << __func__ << " " << addr << " -- pipe dne" << dendl; + } + lock.Unlock(); +} + +int AsyncMessenger::get_proto_version(int peer_type, bool connect) +{ + int my_type = my_inst.name.type(); + + // set reply protocol version + if (peer_type == my_type) { + // internal + return cluster_protocol; + } else { + // public + if (connect) { + switch (peer_type) { + case CEPH_ENTITY_TYPE_OSD: return CEPH_OSDC_PROTOCOL; + case CEPH_ENTITY_TYPE_MDS: return CEPH_MDSC_PROTOCOL; + case CEPH_ENTITY_TYPE_MON: return CEPH_MONC_PROTOCOL; + } + } else { + switch (my_type) { + case CEPH_ENTITY_TYPE_OSD: return CEPH_OSDC_PROTOCOL; + case CEPH_ENTITY_TYPE_MDS: return CEPH_MDSC_PROTOCOL; + 
case CEPH_ENTITY_TYPE_MON: return CEPH_MONC_PROTOCOL; + } + } + } + return 0; +} + +void AsyncMessenger::learned_addr(const entity_addr_t &peer_addr_for_me) +{ + // be careful here: multiple threads may block here, and readers of + // my_inst.addr do NOT hold any lock. + + // this always goes from true -> false under the protection of the + // mutex. if it is already false, we need not retake the mutex at + // all. + lock.Lock(); + entity_addr_t t = peer_addr_for_me; + t.set_port(my_inst.addr.get_port()); + my_inst.addr.addr = t.addr; + ldout(cct, 1) << __func__ << " learned my addr " << my_inst.addr << dendl; + _init_local_connection(); + lock.Unlock(); +} + +void AsyncMessenger::dispatch_throttle_release(uint64_t msize) { + if (msize) { + ldout(cct,10) << __func__ << " " << msize << " to dispatch throttler " + << dispatch_throttler.get_current() << "/" + << dispatch_throttler.get_max() << dendl; + dispatch_throttler.put(msize); + } +} diff --git a/src/msg/AsyncMessenger.h b/src/msg/AsyncMessenger.h new file mode 100644 index 00000000000..55a21eecf42 --- /dev/null +++ b/src/msg/AsyncMessenger.h @@ -0,0 +1,391 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +#ifndef CEPH_ASYNCMESSENGER_H +#define CEPH_ASYNCMESSENGER_H + +#include "include/types.h" +#include "include/xlist.h" + +#include +#include +using namespace std; +#include "include/unordered_map.h" +#include "include/unordered_set.h" + +#include "common/Mutex.h" +#include "include/atomic.h" +#include "common/Cond.h" +#include "common/Thread.h" +#include "common/Throttle.h" + +#include "SimplePolicyMessenger.h" +#include "include/assert.h" +#include "DispatchQueue.h" +#include "AsyncConnection.h" +#include "Event.h" + + +class AsyncMessenger; + +/** + * If the Messenger binds to a specific address, the Processor runs + * and listens for incoming connections. 
+ */ +class Processor : public Thread { + AsyncMessenger *msgr; + bool done; + int listen_sd; + uint64_t nonce; + EventCenter *center; + + public: + Processor(AsyncMessenger *r, uint64_t n, EventCenter *c) : msgr(r), done(false), listen_sd(-1), nonce(n), center(c) {} + + void *entry(); + void stop(); + int bind(const entity_addr_t &bind_addr, const set& avoid_ports); + int rebind(const set& avoid_port); + int start(); + void accept(); +}; + + +/* + * This class handles transmission and reception of messages. Generally + * speaking, there are several major components: + * + * - Connection + * Each logical session is associated with a Connection. + * - AsyncConnection + * Each network connection is handled through a AsyncConnection, which handles + * the input and output of each message. There is normally a 1:1 + * relationship between AsyncConnection and Connection, but logical sessions may + * get handed off between AsyncConnection when sockets reconnect or during + * connection races. + * - IncomingQueue + * Incoming messages are associated with an IncomingQueue, and there + * is one such queue associated with each AsyncConnection. + * - DispatchQueue + * IncomingQueues get queued in the DispatchQueue, which is responsible + * for doing a round-robin sweep and processing them via a worker thread. + * - AsyncMessenger + * It's the exterior class passed to the external message handler and + * most of the API details. + * + * Lock ordering: + * + * AsyncMessenger::lock + * Pipe::pipe_lock + * DispatchQueue::lock + * IncomingQueue::lock + */ + +class AsyncMessenger : public SimplePolicyMessenger { + // First we have the public Messenger interface implementation... +public: + /** + * Initialize the AsyncMessenger! + * + * @param cct The CephContext to use + * @param name The name to assign ourselves + * _nonce A unique ID to use for this AsyncMessenger. It should not + * be a value that will be repeated if the daemon restarts. 
+ */ + AsyncMessenger(CephContext *cct, entity_name_t name, + string mname, uint64_t _nonce); + + /** + * Destroy the AsyncMessenger. Pretty simple since all the work is done + * elsewhere. + */ + virtual ~AsyncMessenger(); + + /** @defgroup Accessors + * @{ + */ + void set_addr_unknowns(entity_addr_t& addr); + + int get_dispatch_queue_len() { + return dispatch_queue.get_queue_len(); + } + + double get_dispatch_queue_max_age(utime_t now) { + return dispatch_queue.get_max_age(now); + } + /** @} Accessors */ + + /** + * @defgroup Configuration functions + * @{ + */ + void set_cluster_protocol(int p) { + assert(!started && !did_bind); + cluster_protocol = p; + } + + int bind(const entity_addr_t& bind_addr); + int rebind(const set& avoid_ports); + + /** @} Configuration functions */ + + /** + * @defgroup Startup/Shutdown + * @{ + */ + virtual int start(); + virtual void wait(); + virtual int shutdown(); + + /** @} // Startup/Shutdown */ + + /** + * @defgroup Messaging + * @{ + */ + virtual int send_message(Message *m, const entity_inst_t& dest) { + return _send_message(m, dest); + } + + /** @} // Messaging */ + + /** + * @defgroup Connection Management + * @{ + */ + virtual ConnectionRef get_connection(const entity_inst_t& dest); + virtual ConnectionRef get_loopback_connection(); + int send_keepalive(Connection *con); + virtual void mark_down(const entity_addr_t& addr); + virtual void mark_down_all(); + /** @} // Connection Management */ + + /** + * @defgroup Inner classes + * @{ + */ + + Connection *create_anon_connection() { + return new AsyncConnection(cct, this); + } + + /** + * @} // Inner classes + */ + +protected: + /** + * @defgroup Messenger Interfaces + * @{ + */ + /** + * Start up the DispatchQueue thread once we have somebody to dispatch to. + */ + virtual void ready(); + /** @} // Messenger Interfaces */ + +private: + + /** + * @defgroup Utility functions + * @{ + */ + + /** + * Create a connection associated with the given entity (of the given type). 
+ * Initiate the connection. (This function returning does not guarantee + * connection success.) + * + * @param addr The address of the entity to connect to. + * @param type The peer type of the entity at the address. + * @param con An existing Connection to associate with the new connection. If + * NULL, it creates a new Connection. + * @param msg an initial message to queue on the new connection + * + * @return a pointer to the newly-created connection. Caller does not own a + * reference; take one if you need it. + */ + AsyncConnection *create_connect(const entity_addr_t& addr, int type); + + /** + * Queue up a Message for delivery to the entity specified + * by addr and dest_type. + * submit_message() is responsible for creating + * new AsyncConnection (and closing old ones) as necessary. + * + * @param m The Message to queue up. This function eats a reference. + * @param con The existing Connection to use, or NULL if you don't know of one. + * @param addr The address to send the Message to. + * @param dest_type The peer type of the address we're sending to + * just drop silently under failure. + */ + void submit_message(Message *m, AsyncConnection *con, + const entity_addr_t& dest_addr, int dest_type); + + int _send_message(Message *m, const entity_inst_t& dest); + + private: + Processor processor; + friend class Processor; + + /// overall lock used for AsyncMessenger data structures + Mutex lock; + // AsyncMessenger stuff + /// approximately unique ID set by the Constructor for use in entity_addr_t + uint64_t nonce; + + /** + * The following aren't lock-protected since you shouldn't be able to race + * the only writers. + */ + + int listen_sd; + /** + * false; set to true if the AsyncMessenger bound to a specific address; + * and set false again by Accepter::stop(). 
+   */
+  bool did_bind;
+  /// counter for the global seq our connection protocol uses
+  __u32 global_seq;
+  /// lock to protect the global_seq
+  ceph_spinlock_t global_seq_lock;
+
+  /**
+   * hash map of addresses to AsyncConnection
+   *
+   * NOTE: an AsyncConnection* with state CLOSED may still be in the map but is considered
+   * invalid and can be replaced by anyone holding the msgr lock
+   */
+  ceph::unordered_map<entity_addr_t, AsyncConnection*> conns;
+
+  /**
+   * set of connections that are in the process of accepting
+   *
+   * These are not yet in the conns map.
+   */
+  set<AsyncConnection*> accepting_conns;
+
+  /// internal cluster protocol version, if any, for talking to entities of the same type.
+  int cluster_protocol;
+
+  /**
+   * Look up the connection registered for address k.
+   * Caller must hold `lock`.
+   *
+   * @return the connection, or NULL if none is registered or the
+   *         registered one is not connected.
+   */
+  AsyncConnection *_lookup_conn(const entity_addr_t& k) {
+    assert(lock.is_locked());
+    ceph::unordered_map<entity_addr_t, AsyncConnection*>::iterator p = conns.find(k);
+    if (p == conns.end())
+      return NULL;
+    if (!p->second->is_connected()) {
+      // FIXME
+      // NOTE(review): this drops a reference but leaves the pointer in
+      // `conns`; confirm the entry cannot dangle after the put().
+      p->second->put();
+      return NULL;
+    }
+    return p->second;
+  }
+
+  /**
+   * Mark connection c down and remove it from `conns` (keyed by its
+   * peer_addr). Caller must hold `lock`. A NULL c is ignored.
+   *
+   * @return always NULL; the `void *` return type is kept for existing
+   *         callers, which ignore the value.
+   */
+  void *_stop_conn(AsyncConnection *c) {
+    assert(lock.is_locked());
+    if (c) {
+      c->mark_down();
+      conns.erase(c->peer_addr);
+    }
+    // flowing off the end of a non-void function is undefined behaviour
+    return NULL;
+  }
+
+  /**
+   * Copy our own address and entity type onto the loopback connection and
+   * announce it through ms_deliver_handle_fast_connect().
+   * Caller must hold `lock`.
+   */
+  void _init_local_connection() {
+    assert(lock.is_locked());
+    local_connection->peer_addr = my_inst.addr;
+    local_connection->peer_type = my_inst.name.type();
+    ms_deliver_handle_fast_connect(local_connection.get());
+  }
+
+
+public:
+
+  /// con used for sending messages to ourselves
+  ConnectionRef local_connection;
+
+  /// Throttle preventing us from building up a big backlog waiting for dispatch
+  Throttle dispatch_throttler;
+
+  EventCenter center;
+  DispatchQueue dispatch_queue;
+
+  /**
+   * @defgroup AsyncMessenger internals
+   * @{
+   */
+  /**
+   * This wraps _lookup_conn.
+ */ + AsyncConnection *lookup_conn(const entity_addr_t& k) { + Mutex::Locker l(lock); + return _lookup_conn(k); + } + + void accept_conn(AsyncConnection *conn) { + Mutex::Locker l(lock); + if (conns.count(conn->get_peer_addr())) + delete conns[conn->get_peer_addr()]; + conns[conn->peer_addr] = conn; + accepting_conns.erase(conn); + } + + void learned_addr(const entity_addr_t &peer_addr_for_me); + AsyncConnection *add_accept(int sd); + + /** + * This wraps ms_deliver_get_authorizer. We use it for AsyncConnection. + */ + AuthAuthorizer *get_authorizer(int peer_type, bool force_new) { + return ms_deliver_get_authorizer(peer_type, force_new); + } + + /** + * This wraps ms_deliver_verify_authorizer; we use it for AsyncConnection. + */ + bool verify_authorizer(Connection *con, int peer_type, int protocol, bufferlist& auth, bufferlist& auth_reply, + bool& isvalid, CryptoKey& session_key) { + return ms_deliver_verify_authorizer(con, peer_type, protocol, auth, + auth_reply, isvalid, session_key); + } + /** + * Increment the global sequence for this AsyncMessenger and return it. + * This is for the connect protocol, although it doesn't hurt if somebody + * else calls it. + * + * @return a global sequence ID that nobody else has seen. + */ + __u32 get_global_seq(__u32 old=0) { + ceph_spin_lock(&global_seq_lock); + if (old > global_seq) + global_seq = old; + __u32 ret = ++global_seq; + ceph_spin_unlock(&global_seq_lock); + return ret; + } + /** + * Get the protocol version we support for the given peer type: either + * a peer protocol (if it matches our own), the protocol version for the + * peer (if we're connecting), or our protocol version (if we're accepting). + */ + int get_proto_version(int peer_type, bool connect); + + /** + * Fill in the address and peer type for the local connection, which + * is used for delivering messages back to ourself. 
+   */
+  void init_local_connection() {
+    Mutex::Locker l(lock);
+    _init_local_connection();
+  }
+
+  /**
+   * Release memory accounting back to the dispatch throttler.
+   *
+   * @param msize The amount of memory to release.
+   */
+  void dispatch_throttle_release(uint64_t msize);
+
+  /**
+   * @} // AsyncMessenger Internals
+   */
+} ;
+
+#endif /* CEPH_ASYNCMESSENGER_H */
diff --git a/src/msg/Event.cc b/src/msg/Event.cc
new file mode 100644
index 00000000000..d7979712a0a
--- /dev/null
+++ b/src/msg/Event.cc
@@ -0,0 +1,116 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+#include "common/errno.h"
+
+// Event.h defines HAVE_EPOLL / HAVE_KQUEUE, so it has to be seen before
+// the driver selection below.
+#include "Event.h"
+
+#ifdef HAVE_EPOLL
+#include "EventEpoll.h"
+#else
+#ifdef HAVE_KQUEUE
+#include "EventKqueue.h"
+#else
+#include "EventSelect.h"
+#endif
+#endif
+
+/**
+ * Create and initialise the platform event driver and start the worker
+ * thread pool.
+ *
+ * @param n capacity handed to the driver's init()
+ * @return 0 on success, a negative value on failure
+ */
+int EventCenter::init(int n)
+{
+  // can't init multi times
+  assert(nevent == 0);
+#ifdef HAVE_EPOLL
+  driver = new EpollDriver(cct);
+#else
+#ifdef HAVE_KQUEUE
+  driver = new KqueueDriver(cct);
+#else
+  driver = new SelectDriver(cct);
+#endif
+#endif
+
+  if (!driver) {
+    lderr(cct) << __func__ << " failed to create event driver " << dendl;
+    return -1;
+  }
+
+  int r = driver->init(n);
+  if (r < 0) {
+    lderr(cct) << __func__ << " failed to init event driver." << dendl;
+    // nevent stays 0, so a later init() would otherwise leak this driver
+    delete driver;
+    driver = NULL;
+    return r;
+  }
+
+  nevent = n;
+  event_tp.start();
+  return 0;
+}
+
+EventCenter::~EventCenter()
+{
+  if (driver)
+    delete driver;
+}
+
+/**
+ * Register interest in `mask` (EVENT_READABLE and/or EVENT_WRITABLE) on fd.
+ *
+ * `ctxt` is stored as the read and/or write callback for fd, replacing
+ * whatever was stored for that direction before. delete_event() deletes
+ * the stored callbacks.
+ *
+ * @return 0 on success, -ERANGE when more than `nevent` fds are already
+ *         tracked, or the driver's negative error code.
+ */
+int EventCenter::create_event(int fd, int mask, EventCallback *ctxt)
+{
+  Mutex::Locker l(lock);
+  if (events.size() > static_cast<size_t>(nevent)) {
+    lderr(cct) << __func__ << " event count is exceed." << dendl;
+    return -ERANGE;
+  }
+
+  int r = driver->add_event(fd, mask);
+  if (r < 0)
+    return r;
+
+  if (events.find(fd) == events.end()) {
+    events[fd] = EventCenter::Event();
+  }
+
+  EventCenter::Event *event = &events[fd];
+
+  event->mask |= mask;
+  if (mask & EVENT_READABLE)
+    event->read_cb = ctxt;
+  if (mask & EVENT_WRITABLE)
+    event->write_cb = ctxt;
+  return 0;
+}
+
+/**
+ * Stop watching `mask` on fd and delete the callbacks registered for the
+ * removed directions. The entry for fd is dropped once no direction is
+ * left. Unknown fds are ignored.
+ *
+ * @return always 0
+ */
+int EventCenter::delete_event(int fd, int mask)
+{
+  Mutex::Locker l(lock);
+  if (events.find(fd) == events.end())
+    return 0;
+
+  EventCenter::Event *event = &events[fd];
+  if (event->mask == EVENT_NONE)
+    return 0;
+
+  driver->del_event(fd, event->mask, mask);
+
+  // Detach the callbacks for the removed directions first ...
+  EventCallback *rcb = NULL;
+  EventCallback *wcb = NULL;
+  if (mask & EVENT_READABLE) {
+    rcb = event->read_cb;
+    event->read_cb = NULL;
+  }
+  if (mask & EVENT_WRITABLE) {
+    wcb = event->write_cb;
+    event->write_cb = NULL;
+  }
+  // ... then free them. create_event() may store one object for both
+  // directions: delete it only once, and not while the other direction
+  // still points at it.
+  if (rcb == wcb)
+    wcb = NULL;
+  if (rcb && rcb != event->write_cb)
+    delete rcb;
+  if (wcb && wcb != event->read_cb)
+    delete wcb;
+
+  event->mask = event->mask & (~mask);
+  if (event->mask == EVENT_NONE)
+    events.erase(fd);
+  return 0;
+}
+
+/**
+ * Wait up to timeout_milliseconds for events and queue every fired event
+ * on the work queue. A timeout <= 0 is passed to the driver as zero.
+ *
+ * @return the number of fired events that were queued
+ */
+int EventCenter::process_events(int timeout_milliseconds)
+{
+  struct timeval tv;
+
+  if (timeout_milliseconds > 0) {
+    tv.tv_sec = timeout_milliseconds / 1000;
+    tv.tv_usec = (timeout_milliseconds % 1000) * 1000;
+  }
+  else {
+    tv.tv_sec = 0;
+    tv.tv_usec = 0;
+  }
+
+  vector<FiredEvent> fired_events;
+  int numevents = driver->event_wait(fired_events, &tv);
+  for (int j = 0; j < numevents; j++)
+    event_wq.queue(fired_events[j]);
+
+  return numevents;
+}
diff --git a/src/msg/Event.h b/src/msg/Event.h
new file mode 100644
index 00000000000..ac58f0b4638
--- /dev/null
+++ b/src/msg/Event.h
@@ -0,0 +1,162 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+#ifndef CEPH_MSG_EVENT_H
+#define CEPH_MSG_EVENT_H
+
+#ifdef __APPLE__
+#include <AvailabilityMacros.h>
+#endif
+
+// We use epoll, kqueue, evport, select in descending order by performance.
+#ifdef __linux__
+#define HAVE_EPOLL 1
+#endif
+
+#if (defined(__APPLE__) && defined(MAC_OS_X_VERSION_10_6)) || defined(__FreeBSD__) || defined(__OpenBSD__) || defined (__NetBSD__)
+#define HAVE_KQUEUE 1
+#endif
+
+#ifdef __sun
+#include <sys/feature_tests.h>
+#ifdef _DTRACE_VERSION
+#define HAVE_EVPORT 1
+#endif
+#endif
+
+#include "include/Context.h"
+#include "common/WorkQueue.h"
+
+#define EVENT_NONE 0
+#define EVENT_READABLE 1
+#define EVENT_WRITABLE 2
+
+class EventCenter;
+
+// Attention:
+// This event library uses the file descriptor as the index to look up the
+// corresponding event in `events` and `fired_events`. So it's important to
+// estimate a suitable capacity when calling EventCenter::init(capacity).
+
+/// One (fd, mask) pair reported by EventDriver::event_wait().
+struct FiredEvent {
+  int mask;
+  int fd;
+
+  FiredEvent(): mask(0), fd(0) {}
+};
+
+/// Interface every platform poller (epoll, kqueue, select) implements.
+class EventDriver {
+ public:
+  virtual ~EventDriver() {}       // we want a virtual destructor!!!
+  /// Prepare the driver; nevent is the capacity requested by EventCenter.
+  virtual int init(int nevent) = 0;
+  /// Start watching `mask` on fd, in addition to anything already watched.
+  virtual int add_event(int fd, int mask) = 0;
+  /// Stop watching del_mask on fd; cur_mask is what was watched so far.
+  virtual void del_event(int fd, int cur_mask, int del_mask) = 0;
+  /// Wait for events, fill fired_events, return how many fired.
+  virtual int event_wait(vector<FiredEvent> &fired_events, struct timeval *tp) = 0;
+};
+
+/// Callback invoked by EventCenter when a watched fd fires.
+class EventCallback {
+
+ public:
+  virtual void do_request(int fd, int mask) = 0;
+  virtual ~EventCallback() {}       // we want a virtual destructor!!!
+};
+
+/**
+ * Owns the event driver, the per-fd callback table and the thread pool
+ * that runs the callbacks.
+ */
+class EventCenter {
+  struct Event {
+    int mask;
+    EventCallback *read_cb;
+    EventCallback *write_cb;
+    Event(): mask(0), read_cb(NULL), write_cb(NULL) {}
+  };
+
+  Mutex lock;
+  // fd -> registered mask and callbacks
+  map<int, Event> events;
+  EventDriver *driver;
+  CephContext *cct;
+  int nevent;
+  ThreadPool event_tp;
+
+  /// Return the Event registered for fd, or NULL. The pointer refers into
+  /// `events` and is returned after `lock` has been released.
+  Event *get_event(int fd) {
+    Mutex::Locker l(lock);
+    map<int, Event>::iterator it = events.find(fd);
+    if (it != events.end()) {
+      return &it->second;
+    }
+
+    return NULL;
+  }
+  struct EventWQ : public ThreadPool::WorkQueueVal<FiredEvent> {
+    EventCenter *center;
+    // In order to ensure the file descriptor is unique in conn_queue,
+    // pending is introduced to check
+    //
+    // fds waiting to be processed, in arrival order
+    deque<int> conn_queue;
+    // fd -> accumulated mask for every fd currently in conn_queue
+    map<int, int> pending;
+
+    EventWQ(EventCenter *c, time_t timeout, time_t suicide_timeout, ThreadPool *tp)
+      : ThreadPool::WorkQueueVal<FiredEvent>("Event::EventWQ", timeout, suicide_timeout, tp), center(c) {}
+
+    void _enqueue(FiredEvent e) {
+      // Ensure only one thread process one file descriptor
+      map<int, int>::iterator it = pending.find(e.fd);
+      if (it != pending.end()) {
+        // already queued: just merge the new mask
+        it->second |= e.mask;
+      } else {
+        pending[e.fd] = e.mask;
+        // without this the fd is never handed to _dequeue()
+        conn_queue.push_back(e.fd);
+      }
+    }
+    void _enqueue_front(FiredEvent e) {
+      assert(0);
+    }
+    void _dequeue(FiredEvent c) {
+      assert(0);
+    }
+    bool _empty() {
+      return conn_queue.empty();
+    }
+    FiredEvent _dequeue() {
+      assert(!conn_queue.empty());
+      FiredEvent e;
+      e.fd = conn_queue.front();
+      conn_queue.pop_front();
+      assert(pending.count(e.fd));
+      e.mask = pending[e.fd];
+      pending.erase(e.fd);
+      return e;
+    }
+    void _process(FiredEvent e, ThreadPool::TPHandle &handle) {
+      int rfired = 0;
+      Event *event = center->get_event(e.fd);
+      if (!event)
+        return ;
+
+      /* note the event->mask & mask & ... code: maybe an already processed
+       * event removed an element that fired and we still didn't
+       * processed, so we check if the event is still valid. */
+      if (event->mask & e.mask & EVENT_READABLE) {
+        rfired = 1;
+        event->read_cb->do_request(e.fd, e.mask);
+      }
+      if (event->mask & e.mask & EVENT_WRITABLE) {
+        // a callback shared by both directions runs only once
+        if (!rfired || event->read_cb != event->write_cb)
+          event->write_cb->do_request(e.fd, e.mask);
+      }
+    }
+    void _clear() {
+      assert(conn_queue.empty());
+    }
+  } event_wq;
+
+ public:
+  EventCenter(CephContext *c):
+    lock("EventCenter::lock"), driver(NULL), cct(c), nevent(0),
+    event_tp(c, "EventCenter::event_tp", c->_conf->ms_event_op_threads, "eventcenter_op_threads"),
+    event_wq(this, c->_conf->ms_event_thread_timeout, c->_conf->ms_event_thread_suicide_timeout, &event_tp) {}
+  ~EventCenter();
+  int init(int nevent);
+  int create_event(int fd, int mask, EventCallback *ctxt);
+  int delete_event(int fd, int mask);
+  int process_events(int timeout_milliseconds);
+};
+
+#endif
diff --git a/src/msg/EventEpoll.cc b/src/msg/EventEpoll.cc
new file mode 100644
index 00000000000..ef6825bc68e
--- /dev/null
+++ b/src/msg/EventEpoll.cc
@@ -0,0 +1,118 @@
+#include <errno.h>
+#include <stdlib.h>
+#include <string.h>
+
+#include "common/errno.h"
+#include "EventEpoll.h"
+
+#define dout_subsys ceph_subsys_ms
+
+#undef dout_prefix
+#define dout_prefix *_dout << "EpollDriver."
+
+/**
+ * Allocate the buffer epoll_wait() fills and create the epoll instance.
+ *
+ * @param nevent maximum number of events returned by one event_wait()
+ * @return 0 on success, negative errno on failure
+ */
+int EpollDriver::init(int nevent)
+{
+  // epoll_wait() rejects a maxevents value that is not positive
+  if (nevent <= 0)
+    return -EINVAL;
+
+  events = (struct epoll_event*)malloc(sizeof(struct epoll_event)*nevent);
+  if (!events) {
+    int err = errno;  // logging may clobber errno
+    lderr(cct) << __func__ << " unable to malloc memory: "
+               << cpp_strerror(err) << dendl;
+    return -err;
+  }
+  memset(events, 0, sizeof(struct epoll_event)*nevent);
+
+  epfd = epoll_create(1024); /* 1024 is just an hint for the kernel */
+  if (epfd == -1) {
+    int err = errno;
+    lderr(cct) << __func__ << " unable to do epoll_create: "
+               << cpp_strerror(err) << dendl;
+    return -err;
+  }
+
+  size = nevent;
+  return 0;
+}
+
+/**
+ * Watch `mask` on fd in addition to whatever is already watched for it.
+ *
+ * @return 0 on success, negative errno if epoll_ctl() fails
+ */
+int EpollDriver::add_event(int fd, int mask)
+{
+  struct epoll_event ee;
+  /* If the fd was already monitored for some event, we need a MOD
+   * operation. Otherwise we need an ADD operation. */
+  int cur_mask = EVENT_NONE;
+  map<int, int>::iterator it = fds.find(fd);
+  if (it != fds.end())
+    cur_mask = it->second;
+  int op = cur_mask == EVENT_NONE ? EPOLL_CTL_ADD : EPOLL_CTL_MOD;
+
+  ee.events = 0;
+  mask |= cur_mask; /* Merge old events */
+  if (mask & EVENT_READABLE)
+    ee.events |= EPOLLIN;
+  if (mask & EVENT_WRITABLE)
+    ee.events |= EPOLLOUT;
+  ee.data.u64 = 0; /* avoid valgrind warning */
+  ee.data.fd = fd;
+  if (epoll_ctl(epfd, op, fd, &ee) == -1) {
+    int err = errno;
+    lderr(cct) << __func__ << " unable to add event: "
+               << cpp_strerror(err) << dendl;
+    return -err;
+  }
+  // remember what is registered only once the kernel accepted it
+  fds[fd] = mask;
+  return 0;
+}
+
+/**
+ * Stop watching delmask on fd. cur_mask is the mask the caller believes
+ * is registered; the fd is removed from the epoll set once nothing is
+ * left. Unknown fds are ignored.
+ */
+void EpollDriver::del_event(int fd, int cur_mask, int delmask)
+{
+  struct epoll_event ee;
+  map<int, int>::iterator it = fds.find(fd);
+  if (it == fds.end())
+    return;
+
+  int mask = cur_mask & (~delmask);
+
+  ee.events = 0;
+  if (mask & EVENT_READABLE) ee.events |= EPOLLIN;
+  if (mask & EVENT_WRITABLE) ee.events |= EPOLLOUT;
+  ee.data.u64 = 0; /* avoid valgrind warning */
+  ee.data.fd = fd;
+  if (mask != EVENT_NONE) {
+    if (epoll_ctl(epfd, EPOLL_CTL_MOD, fd, &ee) == -1) {
+      lderr(cct) << __func__ << " unable to modify event: "
+                 << cpp_strerror(errno) << dendl;
+    }
+    it->second = mask;
+  } else {
+    /* Note, Kernel < 2.6.9 requires a non null event pointer even for
+     * EPOLL_CTL_DEL. */
+    if (epoll_ctl(epfd, EPOLL_CTL_DEL, fd, &ee) == -1) {
+      lderr(cct) << __func__ << " unable to delete event: "
+                 << cpp_strerror(errno) << dendl;
+    }
+    fds.erase(it);
+  }
+}
+
+/**
+ * Wait for events and translate them into FiredEvent entries.
+ *
+ * @param fired_events resized to the number of fired events and filled in
+ * @param tvp          timeout; NULL blocks indefinitely
+ * @return number of fired events (0 on timeout or epoll_wait() failure)
+ */
+int EpollDriver::event_wait(vector<FiredEvent> &fired_events, struct timeval *tvp)
+{
+  int retval, numevents = 0;
+
+  // `size` is the capacity of `events` allocated in init()
+  retval = epoll_wait(epfd, events, size,
+                      tvp ? (tvp->tv_sec*1000 + tvp->tv_usec/1000) : -1);
+  if (retval > 0) {
+    int j;
+
+    numevents = retval;
+    fired_events.resize(numevents);
+    for (j = 0; j < numevents; j++) {
+      int mask = 0;
+      struct epoll_event *e = events + j;
+
+      if (e->events & EPOLLIN) mask |= EVENT_READABLE;
+      if (e->events & EPOLLOUT) mask |= EVENT_WRITABLE;
+      // errors and hang-ups are reported as writable
+      if (e->events & EPOLLERR) mask |= EVENT_WRITABLE;
+      if (e->events & EPOLLHUP) mask |= EVENT_WRITABLE;
+      fired_events[j].fd = e->data.fd;
+      fired_events[j].mask = mask;
+    }
+  }
+  return numevents;
+}
diff --git a/src/msg/EventEpoll.h b/src/msg/EventEpoll.h
new file mode 100644
index 00000000000..4513113c866
--- /dev/null
+++ b/src/msg/EventEpoll.h
@@ -0,0 +1,37 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+#ifndef CEPH_MSG_EVENTEPOLL_H
+#define CEPH_MSG_EVENTEPOLL_H
+
+#include <unistd.h>
+#include <sys/epoll.h>
+
+#include "Event.h"
+
+/// EventDriver implementation on top of Linux epoll.
+class EpollDriver : public EventDriver {
+  int epfd;
+  // map "fd" to the event mask currently registered for it
+  map<int, int> fds;
+  // capacity of "events", set by init()
+  int size;
+  struct epoll_event *events;
+  CephContext *cct;
+
+ public:
+  EpollDriver(CephContext *c): epfd(-1), size(0), events(NULL), cct(c) {}
+  virtual ~EpollDriver() {
+    if (epfd != -1)
+      close(epfd);
+
+    if (events)
+      free(events);
+  }
+
+  int init(int nevent);
+  int add_event(int fd, int mask);
+  void del_event(int fd, int cur_mask, int del_mask);
+  int event_wait(vector<FiredEvent> &fired_events, struct timeval *tp);
+};
+
+#endif
diff --git a/src/msg/Makefile.am b/src/msg/Makefile.am
index 189ecfa9661..ebf698388b4 100644
--- a/src/msg/Makefile.am
+++ b/src/msg/Makefile.am
@@ -8,8 +8,16 @@ noinst_HEADERS += \
 	msg/Dispatcher.h \
 	msg/Message.h \
 	msg/Messenger.h \
+	msg/Pipe.h \
+	msg/PipeConnection.h \
+	msg/AsyncConnection.h \
+	msg/SimpleMessenger.h \
+	msg/AsyncMessenger.h \
 	msg/SimplePolicyMessenger.h \
-	msg/msg_types.h
+	msg/msg_types.h \
+	msg/Event.h \
+	msg/EventEpoll.h \
+	msg/net_handler.h
 
 # simple
libmsg_la_SOURCES += \ diff --git a/src/msg/net_handler.cc b/src/msg/net_handler.cc new file mode 100644 index 00000000000..e51b19e9391 --- /dev/null +++ b/src/msg/net_handler.cc @@ -0,0 +1,128 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +#include +#include +#include +#include + +#include "net_handler.h" +#include "common/errno.h" +#include "common/debug.h" + +#define dout_subsys ceph_subsys_ms +#undef dout_prefix +#define dout_prefix *_dout << "net_handler: " + +namespace ceph{ + +int NetHandler::create_socket(int domain, bool reuse_addr) +{ + int s, on = 1; + + if ((s = ::socket(domain, SOCK_STREAM, 0)) == -1) { + lderr(cct) << __func__ << " couldn't created socket " << cpp_strerror(errno) << dendl; + return -errno; + } + + /* Make sure connection-intensive things like the benckmark + * will be able to close/open sockets a zillion of times */ + if (reuse_addr) { + if (::setsockopt(s, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on)) == -1) { + lderr(cct) << __func__ << " setsockopt SO_REUSEADDR failed: %s" + << strerror(errno) << dendl; + return -errno; + } + } + + return s; +} + +int NetHandler::set_nonblock(int sd) +{ + int flags; + + /* Set the socket nonblocking. + * Note that fcntl(2) for F_GETFL and F_SETFL can't be + * interrupted by a signal. */ + if ((flags = fcntl(sd, F_GETFL)) < 0 ) { + lderr(cct) << __func__ << " fcntl(F_GETFL) failed: %s" << strerror(errno) << dendl; + return -errno; + } + if (fcntl(sd, F_SETFL, flags | O_NONBLOCK) < 0) { + lderr(cct) << __func__ << " fcntl(F_SETFL,O_NONBLOCK): %s" << strerror(errno) << dendl; + return -errno; + } + + return 0; +} + +void NetHandler::set_socket_options(int sd) +{ + // disable Nagle algorithm? 
+ if (cct->_conf->ms_tcp_nodelay) { + int flag = 1; + int r = ::setsockopt(sd, IPPROTO_TCP, TCP_NODELAY, (char*)&flag, sizeof(flag)); + if (r < 0) { + r = -errno; + ldout(cct, 0) << "couldn't set TCP_NODELAY: " << cpp_strerror(r) << dendl; + } + } + if (cct->_conf->ms_tcp_rcvbuf) { + int size = cct->_conf->ms_tcp_rcvbuf; + int r = ::setsockopt(sd, SOL_SOCKET, SO_RCVBUF, (void*)&size, sizeof(size)); + if (r < 0) { + r = -errno; + ldout(cct, 0) << "couldn't set SO_RCVBUF to " << size << ": " << cpp_strerror(r) << dendl; + } + } + + // block ESIGPIPE +#ifdef CEPH_USE_SO_NOSIGPIPE + int val = 1; + int r = ::setsockopt(sd, SOL_SOCKET, SO_NOSIGPIPE, (void*)&val, sizeof(val)); + if (r) { + r = -errno; + ldout(cct,0) << "couldn't set SO_NOSIGPIPE: " << cpp_strerror(r) << dendl; + } +#endif +} + +int NetHandler::generic_connect(const entity_addr_t& addr, bool nonblock) +{ + int ret; + int s = create_socket(addr.get_family()); + if (s < 0) + return s; + + if (nonblock) { + ret = set_nonblock(s); + if (ret < 0) + return ret; + } + ret = ::connect(s, (sockaddr*)&addr.addr, addr.addr_size()); + if (ret < 0) { + if (errno == EINPROGRESS && nonblock) + return s; + + lderr(cct) << __func__ << " connect: %s " << strerror(errno) << dendl; + close(s); + return -errno; + } + + set_socket_options(s); + + return s; +} + +int NetHandler::connect(const entity_addr_t &addr) +{ + return generic_connect(addr, false); +} + +int NetHandler::nonblock_connect(const entity_addr_t &addr) +{ + return generic_connect(addr, true); +} + + +} diff --git a/src/msg/net_handler.h b/src/msg/net_handler.h new file mode 100644 index 00000000000..bc8487af6f5 --- /dev/null +++ b/src/msg/net_handler.h @@ -0,0 +1,23 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +#ifndef CEPH_COMMON_NET_UTILS_H +#define CEPH_COMMON_NET_UTILS_H +#include "common/config.h" + +namespace ceph { + class NetHandler { + private: + int create_socket(int domain, bool 
reuse_addr=false); + int generic_connect(const entity_addr_t& addr, bool nonblock); + + CephContext *cct; + public: + NetHandler(CephContext *c): cct(c) {} + int set_nonblock(int sd); + void set_socket_options(int sd); + int connect(const entity_addr_t &addr); + int nonblock_connect(const entity_addr_t &addr); + }; +} + +#endif diff --git a/src/msg/simple/DispatchQueue.h b/src/msg/simple/DispatchQueue.h index 5fe17dcf593..006938107c2 100644 --- a/src/msg/simple/DispatchQueue.h +++ b/src/msg/simple/DispatchQueue.h @@ -65,7 +65,7 @@ class DispatchQueue { }; CephContext *cct; - SimpleMessenger *msgr; + Messenger *msgr; Mutex lock; Cond cond; @@ -191,7 +191,7 @@ class DispatchQueue { void shutdown(); bool is_started() {return dispatch_thread.is_started();} - DispatchQueue(CephContext *cct, SimpleMessenger *msgr) + DispatchQueue(CephContext *cct, Messenger *msgr) : cct(cct), msgr(msgr), lock("SimpleMessenger::DispatchQeueu::lock"), mqueue(cct->_conf->ms_pq_max_tokens_per_priority, -- 2.47.3