#include "ProtocolV2.h"
#include "AsyncMessenger.h"
+#include "common/EventTrace.h"
#include "common/errno.h"
#include "include/random.h"
<< " l=" << connection->policy.lossy << ").";
}
+const int ASYNC_COALESCE_THRESHOLD = 256;
+
#define WRITE(B, C) write(CONTINUATION(C), B)
#define READ(L, C) read(CONTINUATION(C), L)
using CtPtr = Ct<ProtocolV2> *;
+static void alloc_aligned_buffer(bufferlist &data, unsigned len, unsigned off) {
+ // create a buffer to read into that matches the data alignment
+ unsigned alloc_len = 0;
+ unsigned left = len;
+ unsigned head = 0;
+ if (off & ~CEPH_PAGE_MASK) {
+ // head
+ alloc_len += CEPH_PAGE_SIZE;
+ head = std::min<uint64_t>(CEPH_PAGE_SIZE - (off & ~CEPH_PAGE_MASK), left);
+ left -= head;
+ }
+ alloc_len += left;
+ bufferptr ptr(buffer::create_small_page_aligned(alloc_len));
+ if (head) ptr.set_offset(CEPH_PAGE_SIZE - head);
+ data.push_back(std::move(ptr));
+}
+
ProtocolV2::ProtocolV2(AsyncConnection *connection)
: Protocol(2, connection),
temp_buffer(nullptr),
state(NONE),
peer_required_features(0),
cookie(0),
+ message_seq(0),
+ can_write(false),
bannerExchangeCallback(nullptr),
- next_frame_len(0) {
+ next_frame_len(0),
+ keepalive(false) {
temp_buffer = new char[4096];
}
void ProtocolV2::accept() { state = START_ACCEPT; }
-bool ProtocolV2::is_connected() { return false; }
+bool ProtocolV2::is_connected() { return can_write; }
+
+/*
+ * Tears down the message queues, and removes them from the
+ * DispatchQueue Must hold write_lock prior to calling.
+ */
+void ProtocolV2::discard_out_queue() {
+ ldout(cct, 10) << __func__ << " started" << dendl;
+
+ for (list<Message *>::iterator p = sent.begin(); p != sent.end(); ++p) {
+ ldout(cct, 20) << __func__ << " discard " << *p << dendl;
+ (*p)->put();
+ }
+ sent.clear();
+ for (map<int, list<pair<bufferlist, Message *> > >::iterator p =
+ out_queue.begin();
+ p != out_queue.end(); ++p) {
+ for (list<pair<bufferlist, Message *> >::iterator r = p->second.begin();
+ r != p->second.end(); ++r) {
+ ldout(cct, 20) << __func__ << " discard " << r->second << dendl;
+ r->second->put();
+ }
+ }
+ out_queue.clear();
+}
void ProtocolV2::stop() {
ldout(cct, 2) << __func__ << dendl;
if (connection->delay_state) connection->delay_state->flush();
+ std::lock_guard<std::mutex> l(connection->write_lock);
+
+ discard_out_queue();
+
connection->_stop();
+ can_write = false;
state = CLOSED;
}
}
}
-void ProtocolV2::send_message(Message *m) {}
+void ProtocolV2::prepare_send_message(uint64_t features, Message *m,
+ bufferlist &bl) {
+ ldout(cct, 20) << __func__ << " m=" << *m << dendl;
+
+ // associate message with Connection (for benefit of encode_payload)
+ if (m->empty_payload()) {
+ ldout(cct, 20) << __func__ << " encoding features " << features << " " << m
+ << " " << *m << dendl;
+ } else {
+ ldout(cct, 20) << __func__ << " half-reencoding features " << features
+ << " " << m << " " << *m << dendl;
+ }
-void ProtocolV2::send_keepalive() {}
+ // encode and copy out of *m
+ m->encode(features, messenger->crcflags);
+
+ bl.append(m->get_payload());
+ bl.append(m->get_middle());
+ bl.append(m->get_data());
+}
+
+void ProtocolV2::send_message(Message *m) {
+ bufferlist bl;
+ uint64_t f = connection->get_features();
+
+ // TODO: Currently not all messages supports reencode like MOSDMap, so here
+ // only let fast dispatch support messages prepare message
+ bool can_fast_prepare = messenger->ms_can_fast_dispatch(m);
+ if (can_fast_prepare) {
+ prepare_send_message(f, m, bl);
+ }
+
+ std::lock_guard<std::mutex> l(connection->write_lock);
+ // "features" changes will change the payload encoding
+ if (can_fast_prepare && (!can_write || connection->get_features() != f)) {
+ // ensure the correctness of message encoding
+ bl.clear();
+ m->clear_payload();
+ ldout(cct, 5) << __func__ << " clear encoded buffer previous " << f
+ << " != " << connection->get_features() << dendl;
+ }
+ if (state == CLOSED) {
+ ldout(cct, 10) << __func__ << " connection closed."
+ << " Drop message " << m << dendl;
+ m->put();
+ } else {
+ m->trace.event("async enqueueing message");
+ out_queue[m->get_priority()].emplace_back(std::move(bl), m);
+ ldout(cct, 15) << __func__ << " inline write is denied, reschedule m=" << m
+ << dendl;
+ if (can_write) {
+ connection->center->dispatch_event_external(connection->write_handler);
+ }
+ }
+}
+
+void ProtocolV2::send_keepalive() {
+ ldout(cct, 10) << __func__ << dendl;
+ std::lock_guard<std::mutex> l(connection->write_lock);
+ if (can_write) {
+ keepalive = true;
+ connection->center->dispatch_event_external(connection->write_handler);
+ }
+}
void ProtocolV2::read_event() {
ldout(cct, 20) << __func__ << dendl;
case START_ACCEPT:
CONTINUATION_RUN(CONTINUATION(start_server_banner_exchange));
break;
+ case READY:
+ CONTINUATION_RUN(CONTINUATION(read_frame));
+ break;
+ case THROTTLE_MESSAGE:
+ CONTINUATION_RUN(CONTINUATION(throttle_message));
+ break;
+ case THROTTLE_BYTES:
+ CONTINUATION_RUN(CONTINUATION(throttle_bytes));
+ break;
+ case THROTTLE_DISPATCH_QUEUE:
+ CONTINUATION_RUN(CONTINUATION(throttle_dispatch_queue));
+ break;
default:
break;
}
}
-void ProtocolV2::write_event() {}
+Message *ProtocolV2::_get_next_outgoing(bufferlist *bl) {
+ Message *m = 0;
+ if (!out_queue.empty()) {
+ map<int, list<pair<bufferlist, Message *> > >::reverse_iterator it =
+ out_queue.rbegin();
+ ceph_assert(!it->second.empty());
+ list<pair<bufferlist, Message *> >::iterator p = it->second.begin();
+ m = p->second;
+ if (bl) {
+ bl->swap(p->first);
+ }
+ it->second.erase(p);
+ if (it->second.empty()) {
+ out_queue.erase(it->first);
+ }
+ }
+ return m;
+}
+
+ssize_t ProtocolV2::write_message(Message *m, bufferlist &bl, bool more) {
+ FUNCTRACE(cct);
+ ceph_assert(connection->center->in_thread());
+ m->set_seq(++out_seq);
+
+ connection->lock.lock();
+ uint64_t ack_seq = in_seq;
+ ack_left = 0;
+ connection->lock.unlock();
+
+ MessageFrame message(m, bl, ack_seq, messenger->crcflags & MSG_CRC_HEADER);
+
+ ldout(cct, 20) << __func__ << " sending message type=" << message.header2.type
+ << " src " << entity_name_t(messenger->get_myname())
+ << " front=" << message.header2.front_len
+ << " data=" << message.header2.data_len << " off "
+ << message.header2.data_off << dendl;
+
+ bufferlist &msg_bl = message.get_buffer();
+ connection->outcoming_bl.claim_append(msg_bl);
+
+ m->trace.event("async writing message");
+ ldout(cct, 20) << __func__ << " sending " << m->get_seq() << " " << m
+ << dendl;
+ ssize_t total_send_size = connection->outcoming_bl.length();
+ ssize_t rc = connection->_try_send(more);
+ if (rc < 0) {
+ ldout(cct, 1) << __func__ << " error sending " << m << ", "
+ << cpp_strerror(rc) << dendl;
+ } else {
+ connection->logger->inc(
+ l_msgr_send_bytes, total_send_size - connection->outcoming_bl.length());
+ ldout(cct, 10) << __func__ << " sending " << m
+ << (rc ? " continuely." : " done.") << dendl;
+ }
+ if (m->get_type() == CEPH_MSG_OSD_OP)
+ OID_EVENT_TRACE_WITH_MSG(m, "SEND_MSG_OSD_OP_END", false);
+ else if (m->get_type() == CEPH_MSG_OSD_OPREPLY)
+ OID_EVENT_TRACE_WITH_MSG(m, "SEND_MSG_OSD_OPREPLY_END", false);
+ m->put();
+
+ return rc;
+}
+
+void ProtocolV2::append_keepalive() {
+ ldout(cct, 10) << __func__ << dendl;
+ KeepAliveFrame keepalive_frame;
+ connection->outcoming_bl.claim_append(keepalive_frame.get_buffer());
+}
+
+void ProtocolV2::append_keepalive_ack(utime_t ×tamp) {
+ struct ceph_timespec ts;
+ timestamp.encode_timeval(&ts);
+ KeepAliveFrame keepalive_ack_frame(ts);
+ connection->outcoming_bl.claim_append(keepalive_ack_frame.get_buffer());
+}
+
+void ProtocolV2::handle_message_ack(uint64_t seq) {
+ if (connection->policy.lossy) { // lossy connections don't keep sent messages
+ return;
+ }
+
+ ldout(cct, 15) << __func__ << " seq=" << seq << dendl;
+
+ // trim sent list
+ static const int max_pending = 128;
+ int i = 0;
+ Message *pending[max_pending];
+ connection->write_lock.lock();
+ while (!sent.empty() && sent.front()->get_seq() <= seq && i < max_pending) {
+ Message *m = sent.front();
+ sent.pop_front();
+ pending[i++] = m;
+ ldout(cct, 10) << __func__ << " got ack seq " << seq
+ << " >= " << m->get_seq() << " on " << m << " " << *m
+ << dendl;
+ }
+ connection->write_lock.unlock();
+ for (int k = 0; k < i; k++) {
+ pending[k]->put();
+ }
+}
+
+void ProtocolV2::write_event() {
+ ldout(cct, 10) << __func__ << dendl;
+ ssize_t r = 0;
+
+ connection->write_lock.lock();
+ if (can_write) {
+ if (keepalive) {
+ append_keepalive();
+ keepalive = false;
+ }
+
+ auto start = ceph::mono_clock::now();
+ bool more;
+ do {
+ bufferlist data;
+ Message *m = _get_next_outgoing(&data);
+ if (!m) {
+ break;
+ }
+
+ if (!connection->policy.lossy) {
+ // put on sent list
+ sent.push_back(m);
+ m->get();
+ }
+ more = !out_queue.empty();
+ connection->write_lock.unlock();
+
+ // send_message or requeue messages may not encode message
+ if (!data.length()) {
+ prepare_send_message(connection->get_features(), m, data);
+ }
+
+ r = write_message(m, data, more);
+
+ connection->write_lock.lock();
+ if (r == 0) {
+ ;
+ } else if (r < 0) {
+ ldout(cct, 1) << __func__ << " send msg failed" << dendl;
+ break;
+ } else if (r > 0)
+ break;
+ } while (can_write);
+ connection->write_lock.unlock();
+
+ // if r > 0 mean data still lefted, so no need _try_send.
+ if (r == 0) {
+ uint64_t left = ack_left;
+ if (left) {
+ ceph_le64 s;
+ s = in_seq;
+ AckFrame ack(in_seq);
+ connection->outcoming_bl.claim_append(ack.get_buffer());
+ ldout(cct, 10) << __func__ << " try send msg ack, acked " << left
+ << " messages" << dendl;
+ ack_left -= left;
+ left = ack_left;
+ r = connection->_try_send(left);
+ } else if (is_queued()) {
+ r = connection->_try_send();
+ }
+ }
+
+ connection->logger->tinc(l_msgr_running_send_time,
+ ceph::mono_clock::now() - start);
+ if (r < 0) {
+ ldout(cct, 1) << __func__ << " send msg failed" << dendl;
+ connection->lock.lock();
+ fault();
+ connection->lock.unlock();
+ return;
+ }
+ } else {
+ connection->write_lock.unlock();
+ connection->lock.lock();
+ connection->write_lock.lock();
+ // if (state == STANDBY && !connection->policy.server && is_queued()) {
+ // ldout(cct, 10) << __func__ << " policy.server is false" << dendl;
+ // connection->_connect();
+ // } else
+ if (connection->cs && state != NONE && state != CLOSED &&
+ state != START_CONNECT) {
+ r = connection->_try_send();
+ if (r < 0) {
+ ldout(cct, 1) << __func__ << " send outcoming bl failed" << dendl;
+ connection->write_lock.unlock();
+ fault();
+ connection->lock.unlock();
+ return;
+ }
+ }
+ connection->write_lock.unlock();
+ connection->lock.unlock();
+ }
+}
bool ProtocolV2::is_queued() { return false; }
}
CtPtr ProtocolV2::read_frame() {
+ if (state == CLOSED) {
+ return nullptr;
+ }
+
ldout(cct, 20) << __func__ << dendl;
- return READ(sizeof(__le32), handle_read_frame_length);
+ return READ(sizeof(__le32) * 2, handle_read_frame_length_and_tag);
}
-CtPtr ProtocolV2::handle_read_frame_length(char *buffer, int r) {
+CtPtr ProtocolV2::handle_read_frame_length_and_tag(char *buffer, int r) {
ldout(cct, 20) << __func__ << " r=" << r << dendl;
if (r < 0) {
- ldout(cct, 1) << __func__ << " read frame length failed r=" << r << " ("
- << cpp_strerror(r) << ")" << dendl;
+ ldout(cct, 1) << __func__ << " read frame length and tag failed r=" << r
+ << " (" << cpp_strerror(r) << ")" << dendl;
return _fault();
}
- next_frame_len = *(__le32 *)buffer;
+ next_frame_len = *(uint32_t *)buffer - sizeof(uint32_t);
+ next_tag = static_cast<Tag>(*(uint32_t *)(buffer + sizeof(uint32_t)));
- return READ(next_frame_len, handle_frame);
+ ldout(cct, 10) << __func__ << " next frame_len=" << next_frame_len
+ << " tag=" << static_cast<uint32_t>(next_tag) << dendl;
+
+ switch (next_tag) {
+ case Tag::AUTH_REQUEST:
+ case Tag::AUTH_BAD_METHOD:
+ case Tag::AUTH_BAD_AUTH:
+ case Tag::AUTH_MORE:
+ case Tag::AUTH_DONE:
+ case Tag::IDENT:
+ case Tag::IDENT_MISSING_FEATURES:
+ case Tag::KEEPALIVE2:
+ case Tag::KEEPALIVE2_ACK:
+ case Tag::ACK:
+ return READ(next_frame_len, handle_frame_payload);
+ case Tag::MESSAGE:
+ return handle_message();
+ }
+
+ return nullptr;
}
-CtPtr ProtocolV2::handle_frame(char *buffer, int r) {
+CtPtr ProtocolV2::handle_frame_payload(char *buffer, int r) {
ldout(cct, 20) << __func__ << " r=" << r << dendl;
if (r < 0) {
return _fault();
}
- Tag tag = static_cast<Tag>(*(__le32 *)buffer);
- buffer += sizeof(__le32);
- uint32_t payload_len = next_frame_len - sizeof(__le32);
-
- ldout(cct, 10) << __func__ << " tag=" << static_cast<uint32_t>(tag) << dendl;
-
- switch (tag) {
+ switch (next_tag) {
case Tag::AUTH_REQUEST:
- return handle_auth_request(buffer, payload_len);
+ return handle_auth_request(buffer, next_frame_len);
case Tag::AUTH_BAD_METHOD:
- return handle_auth_bad_method(buffer, payload_len);
+ return handle_auth_bad_method(buffer, next_frame_len);
case Tag::AUTH_BAD_AUTH:
- return handle_auth_bad_auth(buffer, payload_len);
+ return handle_auth_bad_auth(buffer, next_frame_len);
case Tag::AUTH_MORE:
- return handle_auth_more(buffer, payload_len);
+ return handle_auth_more(buffer, next_frame_len);
case Tag::AUTH_DONE:
- return handle_auth_done(buffer, payload_len);
+ return handle_auth_done(buffer, next_frame_len);
case Tag::IDENT:
- return handle_ident(buffer, payload_len);
+ return handle_ident(buffer, next_frame_len);
case Tag::IDENT_MISSING_FEATURES:
- return handle_ident_missing_features(buffer, payload_len);
+ return handle_ident_missing_features(buffer, next_frame_len);
+ case Tag::KEEPALIVE2:
+ return handle_keepalive2(buffer, next_frame_len);
+ case Tag::KEEPALIVE2_ACK:
+ return handle_keepalive2_ack(buffer, next_frame_len);
+ case Tag::ACK:
+ return handle_message_ack(buffer, next_frame_len);
default:
ceph_abort();
}
encode(hello, auth_bl, 0);
/* END TO REMOVE */
AuthMoreFrame more(auth_bl);
- bufferlist bl = more.to_bufferlist();
+ bufferlist &bl = more.get_buffer();
return WRITE(bl, handle_auth_more_write);
}
/* END TO REMOVE */
AuthDoneFrame auth_done(0);
- auto bl = auth_done.to_bufferlist();
+ bufferlist &bl = auth_done.get_buffer();
return WRITE(bl, handle_auth_done_write);
}
ceph_abort("wrong state at handle_ident");
}
+CtPtr ProtocolV2::ready() {
+ ldout(cct, 25) << __func__ << dendl;
+
+ // make sure no pending tick timer
+ if (connection->last_tick_id) {
+ connection->center->delete_time_event(connection->last_tick_id);
+ }
+ connection->last_tick_id = connection->center->create_time_event(
+ connection->inactive_timeout_us, connection->tick_handler);
+
+ {
+ std::lock_guard<std::mutex> l(connection->write_lock);
+ can_write = true;
+ if (!out_queue.empty()) {
+ connection->center->dispatch_event_external(connection->write_handler);
+ }
+ }
+
+ state = READY;
+
+ return CONTINUE(read_frame);
+}
+
+CtPtr ProtocolV2::handle_message() {
+ ldout(cct, 20) << __func__ << dendl;
+
+ ceph_assert(state == READY);
+
+ return READ(sizeof(ceph_msg_header2), handle_message_header);
+}
+
+CtPtr ProtocolV2::handle_message_header(char *buffer, int r) {
+ ldout(cct, 20) << __func__ << " r=" << r << dendl;
+
+ if (r < 0) {
+ ldout(cct, 1) << __func__ << " read message header failed" << dendl;
+ return _fault();
+ }
+
+ ceph_msg_header2 header;
+ header = *((ceph_msg_header2 *)buffer);
+
+ entity_name_t src(connection->peer_type, connection->peer_global_id);
+
+ ldout(cct, 20) << __func__ << " got envelope type=" << header.type << " src "
+ << src << " front=" << header.front_len
+ << " data=" << header.data_len << " off " << header.data_off
+ << dendl;
+
+ if (messenger->crcflags & MSG_CRC_HEADER) {
+ __u32 header_crc = 0;
+ header_crc = ceph_crc32c(0, (unsigned char *)&header,
+ sizeof(header) - sizeof(header.header_crc));
+ // verify header crc
+ if (header_crc != header.header_crc) {
+ ldout(cct, 0) << __func__ << " got bad header crc " << header_crc
+ << " != " << header.header_crc << dendl;
+ return _fault();
+ }
+ }
+
+ // Reset state
+ data_buf.clear();
+ front.clear();
+ middle.clear();
+ data.clear();
+ current_header = header;
+
+ state = THROTTLE_MESSAGE;
+ return CONTINUE(throttle_message);
+}
+
+CtPtr ProtocolV2::throttle_message() {
+ ldout(cct, 20) << __func__ << dendl;
+
+ if (connection->policy.throttler_messages) {
+ ldout(cct, 10) << __func__ << " wants " << 1
+ << " message from policy throttler "
+ << connection->policy.throttler_messages->get_current()
+ << "/" << connection->policy.throttler_messages->get_max()
+ << dendl;
+ if (!connection->policy.throttler_messages->get_or_fail()) {
+ ldout(cct, 10) << __func__ << " wants 1 message from policy throttle "
+ << connection->policy.throttler_messages->get_current()
+ << "/" << connection->policy.throttler_messages->get_max()
+ << " failed, just wait." << dendl;
+ // following thread pool deal with th full message queue isn't a
+ // short time, so we can wait a ms.
+ if (connection->register_time_events.empty()) {
+ connection->register_time_events.insert(
+ connection->center->create_time_event(1000,
+ connection->wakeup_handler));
+ }
+ return nullptr;
+ }
+ }
+
+ state = THROTTLE_BYTES;
+ return CONTINUE(throttle_bytes);
+}
+
+CtPtr ProtocolV2::throttle_bytes() {
+ ldout(cct, 20) << __func__ << dendl;
+
+ uint32_t cur_msg_size = current_header.front_len + current_header.middle_len +
+ current_header.data_len;
+ if (cur_msg_size) {
+ if (connection->policy.throttler_bytes) {
+ ldout(cct, 10) << __func__ << " wants " << cur_msg_size
+ << " bytes from policy throttler "
+ << connection->policy.throttler_bytes->get_current() << "/"
+ << connection->policy.throttler_bytes->get_max() << dendl;
+ if (!connection->policy.throttler_bytes->get_or_fail(cur_msg_size)) {
+ ldout(cct, 10) << __func__ << " wants " << cur_msg_size
+ << " bytes from policy throttler "
+ << connection->policy.throttler_bytes->get_current()
+ << "/" << connection->policy.throttler_bytes->get_max()
+ << " failed, just wait." << dendl;
+ // following thread pool deal with th full message queue isn't a
+ // short time, so we can wait a ms.
+ if (connection->register_time_events.empty()) {
+ connection->register_time_events.insert(
+ connection->center->create_time_event(
+ 1000, connection->wakeup_handler));
+ }
+ return nullptr;
+ }
+ }
+ }
+
+ state = THROTTLE_DISPATCH_QUEUE;
+ return CONTINUE(throttle_dispatch_queue);
+}
+
+CtPtr ProtocolV2::throttle_dispatch_queue() {
+ ldout(cct, 20) << __func__ << dendl;
+
+ uint32_t cur_msg_size = current_header.front_len + current_header.middle_len +
+ current_header.data_len;
+
+ if (cur_msg_size) {
+ if (!connection->dispatch_queue->dispatch_throttler.get_or_fail(
+ cur_msg_size)) {
+ ldout(cct, 10)
+ << __func__ << " wants " << cur_msg_size
+ << " bytes from dispatch throttle "
+ << connection->dispatch_queue->dispatch_throttler.get_current() << "/"
+ << connection->dispatch_queue->dispatch_throttler.get_max()
+ << " failed, just wait." << dendl;
+ // following thread pool deal with th full message queue isn't a
+ // short time, so we can wait a ms.
+ if (connection->register_time_events.empty()) {
+ connection->register_time_events.insert(
+ connection->center->create_time_event(1000,
+ connection->wakeup_handler));
+ }
+ return nullptr;
+ }
+ }
+
+ throttle_stamp = ceph_clock_now();
+
+ state = READ_MESSAGE_FRONT;
+ return read_message_front();
+}
+
+CtPtr ProtocolV2::read_message_front() {
+ ldout(cct, 20) << __func__ << dendl;
+
+ unsigned front_len = current_header.front_len;
+ if (front_len) {
+ if (!front.length()) {
+ front.push_back(buffer::create(front_len));
+ }
+ return READB(front_len, front.c_str(), handle_message_front);
+ }
+ return read_message_middle();
+}
+
+CtPtr ProtocolV2::handle_message_front(char *buffer, int r) {
+ ldout(cct, 20) << __func__ << " r=" << r << dendl;
+
+ if (r < 0) {
+ ldout(cct, 1) << __func__ << " read message front failed" << dendl;
+ return _fault();
+ }
+
+ ldout(cct, 20) << __func__ << " got front " << front.length() << dendl;
+
+ return read_message_middle();
+}
+
+CtPtr ProtocolV2::read_message_middle() {
+ ldout(cct, 20) << __func__ << dendl;
+
+ if (current_header.middle_len) {
+ if (!middle.length()) {
+ middle.push_back(buffer::create(current_header.middle_len));
+ }
+ return READB(current_header.middle_len, middle.c_str(),
+ handle_message_middle);
+ }
+
+ return read_message_data_prepare();
+}
+
+CtPtr ProtocolV2::handle_message_middle(char *buffer, int r) {
+ ldout(cct, 20) << __func__ << " r" << r << dendl;
+
+ if (r < 0) {
+ ldout(cct, 1) << __func__ << " read message middle failed" << dendl;
+ return _fault();
+ }
+
+ ldout(cct, 20) << __func__ << " got middle " << middle.length() << dendl;
+
+ return read_message_data_prepare();
+}
+
+CtPtr ProtocolV2::read_message_data_prepare() {
+ ldout(cct, 20) << __func__ << dendl;
+
+ unsigned data_len = le32_to_cpu(current_header.data_len);
+ unsigned data_off = le32_to_cpu(current_header.data_off);
+
+ if (data_len) {
+ // get a buffer
+ map<ceph_tid_t, pair<bufferlist, int> >::iterator p =
+ connection->rx_buffers.find(current_header.tid);
+ if (p != connection->rx_buffers.end()) {
+ ldout(cct, 10) << __func__ << " seleting rx buffer v " << p->second.second
+ << " at offset " << data_off << " len "
+ << p->second.first.length() << dendl;
+ data_buf = p->second.first;
+ // make sure it's big enough
+ if (data_buf.length() < data_len)
+ data_buf.push_back(buffer::create(data_len - data_buf.length()));
+ data_blp = data_buf.begin();
+ } else {
+ ldout(cct, 20) << __func__ << " allocating new rx buffer at offset "
+ << data_off << dendl;
+ alloc_aligned_buffer(data_buf, data_len, data_off);
+ data_blp = data_buf.begin();
+ }
+ }
+
+ msg_left = data_len;
+
+ return CONTINUE(read_message_data);
+}
+
+CtPtr ProtocolV2::read_message_data() {
+ ldout(cct, 20) << __func__ << " msg_left=" << msg_left << dendl;
+
+ if (msg_left > 0) {
+ bufferptr bp = data_blp.get_current_ptr();
+ unsigned read_len = std::min(bp.length(), msg_left);
+
+ return READB(read_len, bp.c_str(), handle_message_data);
+ }
+
+ state = READ_MESSAGE_COMPLETE;
+ return handle_message_complete();
+}
+
+CtPtr ProtocolV2::handle_message_data(char *buffer, int r) {
+ ldout(cct, 20) << __func__ << " r=" << r << dendl;
+
+ if (r < 0) {
+ ldout(cct, 1) << __func__ << " read data error " << dendl;
+ return _fault();
+ }
+
+ bufferptr bp = data_blp.get_current_ptr();
+ unsigned read_len = std::min(bp.length(), msg_left);
+ ceph_assert(read_len < std::numeric_limits<int>::max());
+ data_blp.advance(read_len);
+ data.append(bp, 0, read_len);
+ msg_left -= read_len;
+
+ return CONTINUE(read_message_data);
+}
+
+CtPtr ProtocolV2::handle_message_complete() {
+ ldout(cct, 20) << __func__ << dendl;
+
+ ldout(cct, 20) << __func__ << " got " << front.length() << " + "
+ << middle.length() << " + " << data.length() << " byte message"
+ << dendl;
+
+ ceph_msg_header header{
+ current_header.seq,
+ current_header.tid,
+ current_header.type,
+ current_header.priority,
+ current_header.version,
+ current_header.front_len,
+ current_header.middle_len,
+ current_header.data_len,
+ current_header.data_off,
+ entity_name_t(connection->peer_type, connection->peer_global_id),
+ current_header.compat_version,
+ current_header.reserved,
+ 0};
+ ceph_msg_footer footer{current_header.front_crc, current_header.middle_crc,
+ current_header.data_crc, 0, current_header.flags};
+
+ Message *message = decode_message(cct, messenger->crcflags, header, footer,
+ front, middle, data, connection);
+ if (!message) {
+ ldout(cct, 1) << __func__ << " decode message failed " << dendl;
+ return _fault();
+ }
+
+ //
+ // Check the signature if one should be present. A zero return indicates
+ // success. PLR
+ //
+
+ // if (session_security.get() == NULL) {
+ // ldout(cct, 10) << __func__ << " no session security set" << dendl;
+ // } else {
+ // if (session_security->check_message_signature(message)) {
+ // ldout(cct, 0) << __func__ << " Signature check failed" << dendl;
+ // message->put();
+ // return _fault();
+ // }
+ // }
+ message->set_byte_throttler(connection->policy.throttler_bytes);
+ message->set_message_throttler(connection->policy.throttler_messages);
+
+ uint32_t cur_msg_size = current_header.front_len + current_header.middle_len +
+ current_header.data_len;
+
+ // store reservation size in message, so we don't get confused
+ // by messages entering the dispatch queue through other paths.
+ message->set_dispatch_throttle_size(cur_msg_size);
+
+ message->set_recv_stamp(recv_stamp);
+ message->set_throttle_stamp(throttle_stamp);
+ message->set_recv_complete_stamp(ceph_clock_now());
+
+ // check received seq#. if it is old, drop the message.
+ // note that incoming messages may skip ahead. this is convenient for the
+ // client side queueing because messages can't be renumbered, but the (kernel)
+ // client will occasionally pull a message out of the sent queue to send
+ // elsewhere. in that case it doesn't matter if we "got" it or not.
+ uint64_t cur_seq = in_seq;
+ if (message->get_seq() <= cur_seq) {
+ ldout(cct, 0) << __func__ << " got old message " << message->get_seq()
+ << " <= " << cur_seq << " " << message << " " << *message
+ << ", discarding" << dendl;
+ message->put();
+ if (connection->has_feature(CEPH_FEATURE_RECONNECT_SEQ) &&
+ cct->_conf->ms_die_on_old_message) {
+ ceph_assert(0 == "old msgs despite reconnect_seq feature");
+ }
+ return nullptr;
+ }
+ if (message->get_seq() > cur_seq + 1) {
+ ldout(cct, 0) << __func__ << " missed message? skipped from seq "
+ << cur_seq << " to " << message->get_seq() << dendl;
+ if (cct->_conf->ms_die_on_skipped_message) {
+ ceph_assert(0 == "skipped incoming seq");
+ }
+ }
+
+ message->set_connection(connection);
+
+#if defined(WITH_LTTNG) && defined(WITH_EVENTTRACE)
+ if (message->get_type() == CEPH_MSG_OSD_OP ||
+ message->get_type() == CEPH_MSG_OSD_OPREPLY) {
+ utime_t ltt_processed_stamp = ceph_clock_now();
+ double usecs_elapsed =
+ (ltt_processed_stamp.to_nsec() - ltt_recv_stamp.to_nsec()) / 1000;
+ ostringstream buf;
+ if (message->get_type() == CEPH_MSG_OSD_OP)
+ OID_ELAPSED_WITH_MSG(message, usecs_elapsed, "TIME_TO_DECODE_OSD_OP",
+ false);
+ else
+ OID_ELAPSED_WITH_MSG(message, usecs_elapsed, "TIME_TO_DECODE_OSD_OPREPLY",
+ false);
+ }
+#endif
+
+ // note last received message.
+ in_seq = message->get_seq();
+ ldout(cct, 5) << " rx " << message->get_source() << " seq "
+ << message->get_seq() << " " << message << " " << *message
+ << dendl;
+
+ bool need_dispatch_writer = true;
+ if (!connection->policy.lossy) {
+ ack_left++;
+ need_dispatch_writer = true;
+ }
+
+ state = READY;
+
+ connection->logger->inc(l_msgr_recv_messages);
+ connection->logger->inc(
+ l_msgr_recv_bytes,
+ cur_msg_size + sizeof(ceph_msg_header) + sizeof(ceph_msg_footer));
+
+ messenger->ms_fast_preprocess(message);
+ auto fast_dispatch_time = ceph::mono_clock::now();
+ connection->logger->tinc(l_msgr_running_recv_time,
+ fast_dispatch_time - connection->recv_start_time);
+ if (connection->delay_state) {
+ double delay_period = 0;
+ if (rand() % 10000 < cct->_conf->ms_inject_delay_probability * 10000.0) {
+ delay_period =
+ cct->_conf->ms_inject_delay_max * (double)(rand() % 10000) / 10000.0;
+ ldout(cct, 1) << "queue_received will delay after "
+ << (ceph_clock_now() + delay_period) << " on " << message
+ << " " << *message << dendl;
+ }
+ connection->delay_state->queue(delay_period, message);
+ } else if (messenger->ms_can_fast_dispatch(message)) {
+ connection->lock.unlock();
+ connection->dispatch_queue->fast_dispatch(message);
+ connection->recv_start_time = ceph::mono_clock::now();
+ connection->logger->tinc(l_msgr_running_fast_dispatch_time,
+ connection->recv_start_time - fast_dispatch_time);
+ connection->lock.lock();
+ } else {
+ connection->dispatch_queue->enqueue(message, message->get_priority(),
+ connection->conn_id);
+ }
+
+ handle_message_ack(current_header.ack_seq);
+
+ // clean up local buffer references
+ data_buf.clear();
+ front.clear();
+ middle.clear();
+ data.clear();
+
+ if (need_dispatch_writer && connection->is_connected()) {
+ connection->center->dispatch_event_external(connection->write_handler);
+ }
+
+ return CONTINUE(read_frame);
+}
+
+CtPtr ProtocolV2::handle_keepalive2(char *payload, uint32_t length) {
+ ldout(cct, 20) << __func__ << " payload_len=" << length << dendl;
+
+ KeepAliveFrame keepalive_frame(payload, length);
+
+ ldout(cct, 30) << __func__ << " got KEEPALIVE2 tag ..." << dendl;
+
+ utime_t kp_t = utime_t(keepalive_frame.timestamp);
+ connection->write_lock.lock();
+ append_keepalive_ack(kp_t);
+ connection->write_lock.unlock();
+
+ ldout(cct, 20) << __func__ << " got KEEPALIVE2 " << kp_t << dendl;
+ connection->set_last_keepalive(ceph_clock_now());
+
+ if (is_connected()) {
+ connection->center->dispatch_event_external(connection->write_handler);
+ }
+
+ return CONTINUE(read_frame);
+}
+
+CtPtr ProtocolV2::handle_keepalive2_ack(char *payload, uint32_t length) {
+ ldout(cct, 20) << __func__ << " payload_len=" << length << dendl;
+
+ KeepAliveFrame keepalive_ack_frame(payload, length);
+ connection->set_last_keepalive_ack(utime_t(keepalive_ack_frame.timestamp));
+ ldout(cct, 20) << __func__ << " got KEEPALIVE_ACK" << dendl;
+
+ return CONTINUE(read_frame);
+}
+
+CtPtr ProtocolV2::handle_message_ack(char *payload, uint32_t length) {
+ ldout(cct, 20) << __func__ << " payload_len=" << length << dendl;
+
+ AckFrame ack(payload, length);
+ handle_message_ack(ack.seq);
+ return CONTINUE(read_frame);
+}
+
/* Client Protocol Methods */
CtPtr ProtocolV2::start_client_banner_exchange() {
}
AuthRequestFrame authFrame(method, auth_bl);
- bufferlist bl = authFrame.to_bufferlist();
+ bufferlist &bl = authFrame.get_buffer();
return WRITE(bl, handle_auth_request_write);
}
<< " flags: " << ident.flags << " cookie: " << std::dec
<< ident.cookie << dendl;
- bufferlist bl = ident.to_bufferlist();
+ bufferlist &bl = ident.get_buffer();
return WRITE(bl, handle_client_ident_write);
}
connection->set_features(server_ident.required_features &
connection->policy.features_supported);
- state = READY;
-
- return CONTINUE(read_frame);
+ return ready();
}
/* Server Protocol Methods */
ldout(cct, 1) << __func__ << " auth method=" << auth_request.method
<< " not allowed" << dendl;
AuthBadMethodFrame bad_method(auth_request.method, allowed_methods);
- bufferlist bl = bad_method.to_bufferlist();
+ bufferlist &bl = bad_method.get_buffer();
return WRITE(bl, handle_auth_bad_method_write);
}
if (!valid) {
AuthBadAuthFrame bad_auth(12, "Permission denied");
- bufferlist bl = bad_auth.to_bufferlist();
+ bufferlist &bl = bad_auth.get_buffer();
return WRITE(bl, handle_auth_bad_auth_write);
}
encode(hello, auth_bl, 0);
/* END TO REMOVE */
AuthMoreFrame more(auth_bl);
- bufferlist bl = more.to_bufferlist();
+ bufferlist &bl = more.get_buffer();
return WRITE(bl, handle_auth_more_write);
}
<< feat_missing << std::dec << dendl;
IdentMissingFeaturesFrame ident_missing_features(feat_missing);
- bufferlist bl;
- bl = ident_missing_features.to_bufferlist();
+ bufferlist &bl = ident_missing_features.get_buffer();
return WRITE(bl, handle_ident_missing_features_write);
}
<< " flags: " << ident.flags << " cookie: " << std::dec
<< ident.cookie << dendl;
- bufferlist bl = ident.to_bufferlist();
+ connection->lock.unlock();
+ // Because "replacing" will prevent other connections preempt this addr,
+ // it's safe that here we don't acquire Connection's lock
+ ssize_t r = messenger->accept_conn(connection);
+
+ connection->inject_delay();
+
+ connection->lock.lock();
+
+ if (r < 0) {
+ ldout(cct, 1) << __func__ << " existing race replacing process for addr = "
+ << connection->peer_addrs.msgr2_addr()
+ << " just fail later one(this)" << dendl;
+ ldout(cct, 10) << "accept fault after register" << dendl;
+ connection->inject_delay();
+ return _fault();
+ }
+ if (state != ACCEPTING) {
+ ldout(cct, 1) << __func__
+ << " state changed while accept_conn, it must be mark_down"
+ << dendl;
+ ceph_assert(state == CLOSED || state == NONE);
+ ldout(cct, 10) << "accept fault after register" << dendl;
+ connection->inject_delay();
+ return _fault();
+ }
+
+ bufferlist &bl = ident.get_buffer();
return WRITE(bl, handle_send_server_ident_write);
}
return _fault();
}
+ // notify
+ connection->dispatch_queue->queue_accept(connection);
+ messenger->ms_deliver_handle_fast_accept(connection);
+
return CONTINUE(read_frame);
}
return _fault();
}
- state = READY;
-
- return CONTINUE(read_frame);
+ return ready();
}
START_ACCEPT,
ACCEPTING,
READY,
+ THROTTLE_MESSAGE,
+ THROTTLE_BYTES,
+ THROTTLE_DISPATCH_QUEUE,
+ READ_MESSAGE_FRONT,
+ READ_MESSAGE_COMPLETE,
CLOSED
};
static const char *get_state_name(int state) {
- const char *const statenames[] = {"NONE", "START_CONNECT",
- "CONNECTING", "START_ACCEPT",
- "ACCEPTING", "CLOSED"};
+ const char *const statenames[] = {"NONE",
+ "START_CONNECT",
+ "CONNECTING",
+ "START_ACCEPT",
+ "ACCEPTING",
+ "READY",
+ "THROTTLE_MESSAGE",
+ "THROTTLE_BYTES",
+ "THROTTLE_DISPATCH_QUEUE",
+ "READ_MESSAGE_FRONT",
+ "READ_MESSAGE_COMPLETE",
+ "CLOSED"};
return statenames[state];
}
- enum class Tag : __le32 {
+ enum class Tag : uint32_t {
AUTH_REQUEST,
AUTH_BAD_METHOD,
AUTH_BAD_AUTH,
AUTH_DONE,
IDENT,
IDENT_MISSING_FEATURES,
+ MESSAGE,
+ KEEPALIVE2,
+ KEEPALIVE2_ACK,
+ ACK
};
struct Frame {
- __le32 frame_len;
- __le32 tag;
+ uint32_t tag;
bufferlist payload;
+ bufferlist frame_buffer;
- Frame(Tag tag, __le32 payload_len)
- : frame_len(sizeof(__le32) + payload_len),
- tag(static_cast<__le32>(tag)) {}
+ Frame(Tag tag) : tag(static_cast<uint32_t>(tag)) {
+ encode(this->tag, payload, 0);
+ }
- bufferlist to_bufferlist() {
- ceph_assert(payload.length() == (frame_len - sizeof(__le32)));
- bufferlist bl;
- encode(frame_len, bl, 0);
- encode(tag, bl, 0);
- bl.claim_append(payload);
- return bl;
+ Frame() {}
+
+ bufferlist &get_buffer() {
+ if (frame_buffer.length()) {
+ return frame_buffer;
+ }
+ encode((uint32_t)payload.length(), frame_buffer, 0);
+ frame_buffer.claim_append(payload);
+ return frame_buffer;
}
};
struct SignedEncryptedFrame : public Frame {
- SignedEncryptedFrame(Tag tag, __le32 payload_len)
- : Frame(tag, payload_len) {}
- bufferlist to_bufferlist() { return Frame::to_bufferlist(); }
+ SignedEncryptedFrame(Tag tag) : Frame(tag) {}
+ SignedEncryptedFrame() : Frame() {}
+ bufferlist &get_buffer() { return Frame::get_buffer(); }
};
struct AuthRequestFrame : public Frame {
- __le32 method;
- __le32 len;
+ uint32_t method;
+ uint32_t len;
bufferlist auth_payload;
- AuthRequestFrame(__le32 method, bufferlist &auth_payload)
- : Frame(Tag::AUTH_REQUEST, sizeof(__le32) * 2 + auth_payload.length()),
+ AuthRequestFrame(uint32_t method, bufferlist &auth_payload)
+ : Frame(Tag::AUTH_REQUEST),
method(method),
len(auth_payload.length()),
auth_payload(auth_payload) {
payload.claim_append(auth_payload);
}
- AuthRequestFrame(char *payload, uint32_t length)
- : Frame(Tag::AUTH_REQUEST, length) {
- method = *(__le32 *)payload;
- len = *(__le32 *)(payload + sizeof(__le32));
- ceph_assert((length - (sizeof(__le32) * 2)) == len);
- auth_payload.append((payload + (sizeof(__le32) * 2)), len);
+ AuthRequestFrame(char *payload, uint32_t length) : Frame() {
+ method = *(uint32_t *)payload;
+ len = *(uint32_t *)(payload + sizeof(uint32_t));
+ ceph_assert((length - (sizeof(uint32_t) * 2)) == len);
+ auth_payload.append((payload + (sizeof(uint32_t) * 2)), len);
}
};
struct AuthBadMethodFrame : public Frame {
- __le32 method;
+ uint32_t method;
std::vector<__u32> allowed_methods;
- AuthBadMethodFrame(__le32 method, std::vector<__u32> methods)
- : Frame(Tag::AUTH_BAD_METHOD, sizeof(__le32) * (2 + methods.size())),
+ AuthBadMethodFrame(uint32_t method, std::vector<__u32> methods)
+ : Frame(Tag::AUTH_BAD_METHOD),
method(method),
allowed_methods(methods) {
encode(method, payload, 0);
- encode((__le32)allowed_methods.size(), payload, 0);
+ encode((uint32_t)allowed_methods.size(), payload, 0);
for (const auto &a_meth : allowed_methods) {
encode(a_meth, payload, 0);
}
}
- AuthBadMethodFrame(char *payload, uint32_t length)
- : Frame(Tag::AUTH_BAD_METHOD, length) {
- method = *(__le32 *)payload;
- __le32 num_methods = *(__le32 *)(payload + sizeof(__le32));
- payload += sizeof(__le32) * 2;
+ AuthBadMethodFrame(char *payload, uint32_t length) : Frame() {
+ method = *(uint32_t *)payload;
+ uint32_t num_methods = *(uint32_t *)(payload + sizeof(uint32_t));
+ payload += sizeof(uint32_t) * 2;
for (unsigned i = 0; i < num_methods; ++i) {
- allowed_methods.push_back(*(__le32 *)(payload + sizeof(__le32) * i));
+ allowed_methods.push_back(
+ *(uint32_t *)(payload + sizeof(uint32_t) * i));
}
}
};
struct AuthBadAuthFrame : public Frame {
- __le32 error_code;
+ uint32_t error_code;
std::string error_msg;
- AuthBadAuthFrame(__le32 error_code, std::string error_msg)
- : Frame(Tag::AUTH_BAD_AUTH, sizeof(__le32) * 2 + error_msg.size()),
+ AuthBadAuthFrame(uint32_t error_code, std::string error_msg)
+ : Frame(Tag::AUTH_BAD_AUTH),
error_code(error_code),
error_msg(error_msg) {
encode(error_code, payload, 0);
encode(error_msg, payload, 0);
}
- AuthBadAuthFrame(char *payload, uint32_t length)
- : Frame(Tag::AUTH_BAD_AUTH, length) {
- error_code = *(__le32 *)payload;
- __le32 len = *(__le32 *)(payload + sizeof(__le32));
- error_msg = std::string(payload + sizeof(__le32) * 2, len);
+ AuthBadAuthFrame(char *payload, uint32_t length) : Frame() {
+ error_code = *(uint32_t *)payload;
+ uint32_t len = *(uint32_t *)(payload + sizeof(uint32_t));
+ error_msg = std::string(payload + sizeof(uint32_t) * 2, len);
}
};
struct AuthMoreFrame : public Frame {
- __le32 len;
+ uint32_t len;
bufferlist auth_payload;
AuthMoreFrame(bufferlist &auth_payload)
- : Frame(Tag::AUTH_MORE, sizeof(__le32) + auth_payload.length()),
+ : Frame(Tag::AUTH_MORE),
len(auth_payload.length()),
auth_payload(auth_payload) {
encode(len, payload, 0);
payload.claim_append(auth_payload);
}
- AuthMoreFrame(char *payload, uint32_t length)
- : Frame(Tag::AUTH_BAD_AUTH, length) {
- len = *(__le32 *)payload;
- ceph_assert((length - sizeof(__le32)) == len);
- auth_payload.append(payload + sizeof(__le32), len);
+ AuthMoreFrame(char *payload, uint32_t length) : Frame() {
+ len = *(uint32_t *)payload;
+ ceph_assert((length - sizeof(uint32_t)) == len);
+ auth_payload.append(payload + sizeof(uint32_t), len);
}
};
struct AuthDoneFrame : public Frame {
- __le64 flags;
+ uint64_t flags;
- AuthDoneFrame(uint64_t flags)
- : Frame(Tag::AUTH_DONE, sizeof(__le64)), flags(flags) {
+ AuthDoneFrame(uint64_t flags) : Frame(Tag::AUTH_DONE), flags(flags) {
encode(flags, payload, 0);
}
- AuthDoneFrame(char *payload, uint32_t length)
- : Frame(Tag::AUTH_DONE, length) {
- flags = *(__le64 *)payload;
+ AuthDoneFrame(char *payload, uint32_t length) : Frame() {
+ flags = *(uint64_t *)payload;
}
};
IdentFrame(entity_addrvec_t addrs, int64_t gid, uint64_t supported_features,
uint64_t required_features, uint64_t flags, uint64_t cookie)
- : SignedEncryptedFrame(Tag::IDENT, 0),
+ : SignedEncryptedFrame(Tag::IDENT),
addrs(addrs),
gid(gid),
supported_features(supported_features),
encode(required_features, payload, -1ll);
encode(flags, payload, -1ll);
encode(cookie, payload, -1ll);
- frame_len = sizeof(uint32_t) + payload.length();
}
- IdentFrame(char *payload, uint32_t length)
- : SignedEncryptedFrame(Tag::IDENT, length) {
+ IdentFrame(char *payload, uint32_t length) : SignedEncryptedFrame() {
bufferlist bl;
- bl.append(payload, length);
+ bl.push_back(buffer::create_static(length, payload));
try {
auto ti = bl.cbegin();
decode(addrs, ti);
__le64 features;
IdentMissingFeaturesFrame(uint64_t features)
- : SignedEncryptedFrame(Tag::IDENT_MISSING_FEATURES, sizeof(uint64_t)),
+ : SignedEncryptedFrame(Tag::IDENT_MISSING_FEATURES),
features(features) {
encode(features, payload, -1ll);
}
IdentMissingFeaturesFrame(char *payload, uint32_t length)
- : SignedEncryptedFrame(Tag::IDENT_MISSING_FEATURES, length) {
+ : SignedEncryptedFrame() {
features = *(uint64_t *)payload;
}
};
+ struct MessageFrame : public SignedEncryptedFrame {
+ const unsigned int ASYNC_COALESCE_THRESHOLD = 256;
+
+ ceph_msg_header2 header2;
+
+ MessageFrame(Message *msg, bufferlist &data, uint64_t ack_seq,
+ bool calc_crc)
+ : SignedEncryptedFrame(Tag::MESSAGE) {
+ ceph_msg_header &header = msg->get_header();
+ ceph_msg_footer &footer = msg->get_footer();
+
+ header2 = ceph_msg_header2{header.seq, header.tid,
+ header.type, header.priority,
+ header.version, header.front_len,
+ header.middle_len, 0,
+ header.data_len, header.data_off,
+ ack_seq, footer.front_crc,
+ footer.middle_crc, footer.data_crc,
+ footer.flags, header.compat_version,
+ header.reserved, 0};
+
+ if (calc_crc) {
+ header2.header_crc =
+ ceph_crc32c(0, (unsigned char *)&header2,
+ sizeof(header2) - sizeof(header2.header_crc));
+ }
+
+ payload.append((char *)&header2, sizeof(header2));
+ if ((data.length() <= ASYNC_COALESCE_THRESHOLD) &&
+ (data.buffers().size() > 1)) {
+ for (const auto &pb : data.buffers()) {
+ payload.append((char *)pb.c_str(), pb.length());
+ }
+ } else {
+ payload.claim_append(data);
+ }
+ }
+ };
+
+ struct KeepAliveFrame : public SignedEncryptedFrame {
+ struct ceph_timespec timestamp;
+
+ KeepAliveFrame() : SignedEncryptedFrame(Tag::KEEPALIVE2) {
+ struct ceph_timespec ts;
+ utime_t t = ceph_clock_now();
+ t.encode_timeval(&ts);
+ payload.append((char *)&ts, sizeof(ts));
+ }
+
+ KeepAliveFrame(struct ceph_timespec ×tamp)
+ : SignedEncryptedFrame(Tag::KEEPALIVE2_ACK) {
+ payload.append((char *)×tamp, sizeof(timestamp));
+ }
+
+ KeepAliveFrame(char *payload, uint32_t length) : SignedEncryptedFrame() {
+ ceph_assert(length == sizeof(timestamp));
+ timestamp = *(struct ceph_timespec *)payload;
+ }
+ };
+
+ struct AckFrame : public SignedEncryptedFrame {
+ uint64_t seq;
+
+ AckFrame(uint64_t seq) : SignedEncryptedFrame(Tag::ACK) {
+ encode(seq, payload, 0);
+ }
+
+ AckFrame(char *payload, uint32_t length) : SignedEncryptedFrame() {
+ seq = *(uint64_t *)payload;
+ }
+ };
+
char *temp_buffer;
State state;
uint64_t peer_required_features;
uint64_t cookie;
+ uint64_t message_seq;
+ bool can_write;
+ std::map<int, std::list<std::pair<bufferlist, Message *>>> out_queue;
+ std::list<Message *> sent;
+ std::atomic<uint64_t> out_seq{0};
+ std::atomic<uint64_t> in_seq{0};
+ std::atomic<uint64_t> ack_left{0};
using ProtFuncPtr = void (ProtocolV2::*)();
Ct<ProtocolV2> *bannerExchangeCallback;
+ uint32_t next_frame_len;
+ Tag next_tag;
+ ceph_msg_header2 current_header;
+ utime_t backoff; // backoff time
+ utime_t recv_stamp;
+ utime_t throttle_stamp;
+ unsigned msg_left;
+ bufferlist data_buf;
+ bufferlist::iterator data_blp;
+ bufferlist front, middle, data;
+
+ bool keepalive;
+
ostream &_conn_prefix(std::ostream *_dout);
Ct<ProtocolV2> *read(CONTINUATION_PARAM(next, ProtocolV2, char *, int),
return nullptr;
}
+ void discard_out_queue();
+ void prepare_send_message(uint64_t features, Message *m, bufferlist &bl);
+ Message *_get_next_outgoing(bufferlist *bl);
+ ssize_t write_message(Message *m, bufferlist &bl, bool more);
+ void append_keepalive();
+ void append_keepalive_ack(utime_t ×tamp);
+ void handle_message_ack(uint64_t seq);
+
WRITE_HANDLER_CONTINUATION_DECL(ProtocolV2, _banner_exchange_handle_write);
READ_HANDLER_CONTINUATION_DECL(ProtocolV2,
_banner_exchange_handle_peer_banner);
Ct<ProtocolV2> *_banner_exchange_handle_write(int r);
Ct<ProtocolV2> *_banner_exchange_handle_peer_banner(char *buffer, int r);
- uint32_t next_frame_len;
CONTINUATION_DECL(ProtocolV2, read_frame);
- READ_HANDLER_CONTINUATION_DECL(ProtocolV2, handle_read_frame_length);
- READ_HANDLER_CONTINUATION_DECL(ProtocolV2, handle_frame);
+ READ_HANDLER_CONTINUATION_DECL(ProtocolV2, handle_read_frame_length_and_tag);
+ READ_HANDLER_CONTINUATION_DECL(ProtocolV2, handle_frame_payload);
WRITE_HANDLER_CONTINUATION_DECL(ProtocolV2, handle_auth_more_write);
+ READ_HANDLER_CONTINUATION_DECL(ProtocolV2, handle_message_header);
+ CONTINUATION_DECL(ProtocolV2, throttle_message);
+ CONTINUATION_DECL(ProtocolV2, throttle_bytes);
+ CONTINUATION_DECL(ProtocolV2, throttle_dispatch_queue);
+ READ_HANDLER_CONTINUATION_DECL(ProtocolV2, handle_message_front);
+ READ_HANDLER_CONTINUATION_DECL(ProtocolV2, handle_message_middle);
+ CONTINUATION_DECL(ProtocolV2, read_message_data);
+ READ_HANDLER_CONTINUATION_DECL(ProtocolV2, handle_message_data);
Ct<ProtocolV2> *read_frame();
- Ct<ProtocolV2> *handle_read_frame_length(char *buffer, int r);
- Ct<ProtocolV2> *handle_frame(char *buffer, int r);
+ Ct<ProtocolV2> *handle_read_frame_length_and_tag(char *buffer, int r);
+ Ct<ProtocolV2> *handle_frame_payload(char *buffer, int r);
+
Ct<ProtocolV2> *handle_auth_more(char *payload, uint32_t length);
Ct<ProtocolV2> *handle_auth_more_write(int r);
+
Ct<ProtocolV2> *handle_ident(char *payload, uint32_t length);
+ Ct<ProtocolV2> *ready();
+
+ Ct<ProtocolV2> *handle_message();
+ Ct<ProtocolV2> *handle_message_header(char *buffer, int r);
+ Ct<ProtocolV2> *throttle_message();
+ Ct<ProtocolV2> *throttle_bytes();
+ Ct<ProtocolV2> *throttle_dispatch_queue();
+ Ct<ProtocolV2> *read_message_front();
+ Ct<ProtocolV2> *handle_message_front(char *buffer, int r);
+ Ct<ProtocolV2> *read_message_middle();
+ Ct<ProtocolV2> *handle_message_middle(char *buffer, int r);
+ Ct<ProtocolV2> *read_message_data_prepare();
+ Ct<ProtocolV2> *read_message_data();
+ Ct<ProtocolV2> *handle_message_data(char *buffer, int r);
+ Ct<ProtocolV2> *handle_message_complete();
+
+ Ct<ProtocolV2> *handle_keepalive2(char *payload, uint32_t length);
+ Ct<ProtocolV2> *handle_keepalive2_ack(char *payload, uint32_t length);
+
+ Ct<ProtocolV2> *handle_message_ack(char *payload, uint32_t length);
+
public:
ProtocolV2(AsyncConnection *connection);
virtual ~ProtocolV2();