<< " l=" << connection->policy.lossy << ").";
}
-#define WRITE(B, F) \
- connection->write(B, std::bind(F, this, std::placeholders::_1))
+#define WRITE(B, C) write(CONTINUATION(C), B)
-#define READ(L, F) \
- connection->read( \
- L, temp_buffer, \
- std::bind(F, this, std::placeholders::_1, std::placeholders::_2))
+#define READ(L, C) read(CONTINUATION(C), L)
-#define READB(L, B, F) \
- connection->read( \
- L, B, std::bind(F, this, std::placeholders::_1, std::placeholders::_2))
+#define READB(L, B, C) read(CONTINUATION(C), L, B)
// Constant to limit starting sequence number to 2^31. Nothing special about
// it, just a big number. PLR
ldout(cct, 20) << __func__ << dendl;
switch (state) {
case START_CONNECT:
- send_client_banner();
+ CONTINUATION_RUN(CONTINUATION(send_client_banner));
break;
case START_ACCEPT:
- send_server_banner();
+ CONTINUATION_RUN(CONTINUATION(send_server_banner));
break;
case OPENED:
- wait_message();
+ CONTINUATION_RUN(CONTINUATION(wait_message));
break;
case THROTTLE_MESSAGE:
- throttle_message();
+ CONTINUATION_RUN(CONTINUATION(throttle_message));
break;
case THROTTLE_BYTES:
- throttle_bytes();
+ CONTINUATION_RUN(CONTINUATION(throttle_bytes));
break;
case THROTTLE_DISPATCH_QUEUE:
- throttle_dispatch_queue();
+ CONTINUATION_RUN(CONTINUATION(throttle_dispatch_queue));
break;
default:
break;
bool ProtocolV1::is_queued() { return !out_q.empty(); }
-void ProtocolV1::ready() {
+void ProtocolV1::run_continuation(CtPtr continuation) {
+ CONTINUATION_RUN(continuation);
+}
+
+CtPtr ProtocolV1::read(CONTINUATION_PARAM(next, ProtocolV1, char *, int),
+ int len, char *buffer) {
+ if (!buffer) {
+ buffer = temp_buffer;
+ }
+ ssize_t r = connection->read(len, buffer,
+ [CONTINUATION(next), this](char *buffer, int r) {
+ CONTINUATION(next)->setParams(buffer, r);
+ CONTINUATION_RUN(CONTINUATION(next));
+ });
+ if (r <= 0) {
+ return CONTINUE(next, buffer, r);
+ }
+
+ return nullptr;
+}
+
+CtPtr ProtocolV1::write(CONTINUATION_PARAM(next, ProtocolV1, int),
+ bufferlist &buffer) {
+ ssize_t r = connection->write(buffer, [CONTINUATION(next), this](int r) {
+ CONTINUATION(next)->setParams(r);
+ CONTINUATION_RUN(CONTINUATION(next));
+ });
+ if (r <= 0) {
+ return CONTINUE(next, r);
+ }
+
+ return nullptr;
+}
+
+CtPtr ProtocolV1::ready() {
ldout(cct, 25) << __func__ << dendl;
// make sure no pending tick timer
connection->maybe_start_delay_thread();
state = OPENED;
- wait_message();
+ return wait_message();
}
-void ProtocolV1::wait_message() {
+CtPtr ProtocolV1::wait_message() {
if (state != OPENED) { // must have changed due to a replace
- return;
+ return nullptr;
}
ldout(cct, 20) << __func__ << dendl;
- READ(sizeof(char), &ProtocolV1::handle_message);
+ return READ(sizeof(char), handle_message);
}
-void ProtocolV1::handle_message(char *buffer, int r) {
+CtPtr ProtocolV1::handle_message(char *buffer, int r) {
ldout(cct, 20) << __func__ << " r=" << r << dendl;
if (r < 0) {
ldout(cct, 1) << __func__ << " read tag failed" << dendl;
- fault();
- return;
+ return _fault();
}
char tag = buffer[0];
ldout(cct, 20) << __func__ << " got KEEPALIVE" << dendl;
connection->set_last_keepalive(ceph_clock_now());
} else if (tag == CEPH_MSGR_TAG_KEEPALIVE2) {
- READ(sizeof(ceph_timespec), &ProtocolV1::handle_keepalive2);
+ return READ(sizeof(ceph_timespec), handle_keepalive2);
} else if (tag == CEPH_MSGR_TAG_KEEPALIVE2_ACK) {
- READ(sizeof(ceph_timespec), &ProtocolV1::handle_keepalive2_ack);
+ return READ(sizeof(ceph_timespec), handle_keepalive2_ack);
} else if (tag == CEPH_MSGR_TAG_ACK) {
- READ(sizeof(ceph_le64), &ProtocolV1::handle_tag_ack);
+ return READ(sizeof(ceph_le64), handle_tag_ack);
} else if (tag == CEPH_MSGR_TAG_MSG) {
#if defined(WITH_LTTNG) && defined(WITH_EVENTTRACE)
ltt_recv_stamp = ceph_clock_now();
#endif
recv_stamp = ceph_clock_now();
ldout(cct, 20) << __func__ << " begin MSG" << dendl;
- READ(sizeof(ceph_msg_header), &ProtocolV1::handle_message_header);
+ return READ(sizeof(ceph_msg_header), handle_message_header);
} else if (tag == CEPH_MSGR_TAG_CLOSE) {
ldout(cct, 20) << __func__ << " got CLOSE" << dendl;
stop();
} else {
ldout(cct, 0) << __func__ << " bad tag " << (int)tag << dendl;
- fault();
+ return _fault();
}
+ return nullptr;
}
-void ProtocolV1::handle_keepalive2(char *buffer, int r) {
+CtPtr ProtocolV1::handle_keepalive2(char *buffer, int r) {
ldout(cct, 20) << __func__ << " r=" << r << dendl;
if (r < 0) {
ldout(cct, 1) << __func__ << " read keeplive timespec failed" << dendl;
- fault();
- return;
+ return _fault();
}
ldout(cct, 30) << __func__ << " got KEEPALIVE2 tag ..." << dendl;
connection->center->dispatch_event_external(connection->write_handler);
}
- wait_message();
+ return CONTINUE(wait_message);
}
void ProtocolV1::append_keepalive_or_ack(bool ack, utime_t *tp) {
}
}
-void ProtocolV1::handle_keepalive2_ack(char *buffer, int r) {
+CtPtr ProtocolV1::handle_keepalive2_ack(char *buffer, int r) {
ldout(cct, 20) << __func__ << " r=" << r << dendl;
if (r < 0) {
ldout(cct, 1) << __func__ << " read keeplive timespec failed" << dendl;
- fault();
- return;
+ return _fault();
}
ceph_timespec *t;
connection->set_last_keepalive_ack(utime_t(*t));
ldout(cct, 20) << __func__ << " got KEEPALIVE_ACK" << dendl;
- wait_message();
+ return CONTINUE(wait_message);
}
-void ProtocolV1::handle_tag_ack(char *buffer, int r) {
+CtPtr ProtocolV1::handle_tag_ack(char *buffer, int r) {
ldout(cct, 20) << __func__ << " r=" << r << dendl;
if (r < 0) {
ldout(cct, 1) << __func__ << " read ack seq failed" << dendl;
- fault();
- return;
+ return _fault();
}
ceph_le64 seq;
pending[k]->put();
}
- wait_message();
+ return CONTINUE(wait_message);
}
-void ProtocolV1::handle_message_header(char *buffer, int r) {
+CtPtr ProtocolV1::handle_message_header(char *buffer, int r) {
ldout(cct, 20) << __func__ << " r=" << r << dendl;
if (r < 0) {
ldout(cct, 1) << __func__ << " read message header failed" << dendl;
- fault();
- return;
+ return _fault();
}
ldout(cct, 20) << __func__ << " got MSG header" << dendl;
if (header_crc != header.crc) {
ldout(cct, 0) << __func__ << " got bad header crc " << header_crc
<< " != " << header.crc << dendl;
- fault();
- return;
+ return _fault();
}
}
current_header = header;
state = THROTTLE_MESSAGE;
- throttle_message();
+ return CONTINUE(throttle_message);
}
-void ProtocolV1::throttle_message() {
+CtPtr ProtocolV1::throttle_message() {
ldout(cct, 20) << __func__ << dendl;
if (connection->policy.throttler_messages) {
connection->center->create_time_event(1000,
connection->wakeup_handler));
}
- return;
+ return nullptr;
}
}
state = THROTTLE_BYTES;
- throttle_bytes();
+ return CONTINUE(throttle_bytes);
}
-void ProtocolV1::throttle_bytes() {
+CtPtr ProtocolV1::throttle_bytes() {
ldout(cct, 20) << __func__ << dendl;
cur_msg_size = current_header.front_len + current_header.middle_len +
connection->center->create_time_event(
1000, connection->wakeup_handler));
}
- return;
+ return nullptr;
}
}
}
state = THROTTLE_DISPATCH_QUEUE;
- throttle_dispatch_queue();
+ return CONTINUE(throttle_dispatch_queue);
}
-void ProtocolV1::throttle_dispatch_queue() {
+CtPtr ProtocolV1::throttle_dispatch_queue() {
ldout(cct, 20) << __func__ << dendl;
if (cur_msg_size) {
connection->center->create_time_event(1000,
connection->wakeup_handler));
}
- return;
+ return nullptr;
}
}
throttle_stamp = ceph_clock_now();
state = READ_MESSAGE_FRONT;
- read_message_front();
+ return read_message_front();
}
-void ProtocolV1::read_message_front() {
+CtPtr ProtocolV1::read_message_front() {
ldout(cct, 20) << __func__ << dendl;
unsigned front_len = current_header.front_len;
if (!front.length()) {
front.push_back(buffer::create(front_len));
}
- READB(front_len, front.c_str(), &ProtocolV1::handle_message_front);
- } else {
- read_message_middle();
+ return READB(front_len, front.c_str(), handle_message_front);
}
+ return read_message_middle();
}
-void ProtocolV1::handle_message_front(char *buffer, int r) {
+CtPtr ProtocolV1::handle_message_front(char *buffer, int r) {
ldout(cct, 20) << __func__ << " r=" << r << dendl;
if (r < 0) {
ldout(cct, 1) << __func__ << " read message front failed" << dendl;
- fault();
- return;
+ return _fault();
}
ldout(cct, 20) << __func__ << " got front " << front.length() << dendl;
- read_message_middle();
+ return read_message_middle();
}
-void ProtocolV1::read_message_middle() {
+CtPtr ProtocolV1::read_message_middle() {
ldout(cct, 20) << __func__ << dendl;
if (current_header.middle_len) {
if (!middle.length()) {
middle.push_back(buffer::create(current_header.middle_len));
}
- READB(current_header.middle_len, middle.c_str(),
- &ProtocolV1::handle_message_middle);
- } else {
- read_message_data_prepare();
+ return READB(current_header.middle_len, middle.c_str(),
+ handle_message_middle);
}
+
+ return read_message_data_prepare();
}
-void ProtocolV1::handle_message_middle(char *buffer, int r) {
+CtPtr ProtocolV1::handle_message_middle(char *buffer, int r) {
ldout(cct, 20) << __func__ << " r" << r << dendl;
if (r < 0) {
ldout(cct, 1) << __func__ << " read message middle failed" << dendl;
- fault();
- return;
+ return _fault();
}
ldout(cct, 20) << __func__ << " got middle " << middle.length() << dendl;
- read_message_data_prepare();
+ return read_message_data_prepare();
}
-void ProtocolV1::read_message_data_prepare() {
+CtPtr ProtocolV1::read_message_data_prepare() {
ldout(cct, 20) << __func__ << dendl;
unsigned data_len = le32_to_cpu(current_header.data_len);
msg_left = data_len;
- read_message_data();
+ return CONTINUE(read_message_data);
}
-void ProtocolV1::read_message_data() {
+CtPtr ProtocolV1::read_message_data() {
ldout(cct, 20) << __func__ << " msg_left=" << msg_left << dendl;
if (msg_left > 0) {
bufferptr bp = data_blp.get_current_ptr();
unsigned read_len = std::min(bp.length(), msg_left);
- READB(read_len, bp.c_str(), &ProtocolV1::handle_message_data);
- } else {
- read_message_footer();
+ return READB(read_len, bp.c_str(), handle_message_data);
}
+
+ return read_message_footer();
}
-void ProtocolV1::handle_message_data(char *buffer, int r) {
+CtPtr ProtocolV1::handle_message_data(char *buffer, int r) {
ldout(cct, 20) << __func__ << " r=" << r << dendl;
if (r < 0) {
ldout(cct, 1) << __func__ << " read data error " << dendl;
- fault();
- return;
+ return _fault();
}
bufferptr bp = data_blp.get_current_ptr();
data.append(bp, 0, read_len);
msg_left -= read_len;
- read_message_data();
+ return CONTINUE(read_message_data);
}
-void ProtocolV1::read_message_footer() {
+CtPtr ProtocolV1::read_message_footer() {
ldout(cct, 20) << __func__ << dendl;
state = READ_FOOTER_AND_DISPATCH;
len = sizeof(ceph_msg_footer_old);
}
- READ(len, &ProtocolV1::handle_message_footer);
+ return READ(len, handle_message_footer);
}
-void ProtocolV1::handle_message_footer(char *buffer, int r) {
+CtPtr ProtocolV1::handle_message_footer(char *buffer, int r) {
ldout(cct, 20) << __func__ << " r=" << r << dendl;
if (r < 0) {
ldout(cct, 1) << __func__ << " read footer data error " << dendl;
- fault();
- return;
+ return _fault();
}
ceph_msg_footer footer;
ldout(cct, 0) << __func__ << " got " << front.length() << " + "
<< middle.length() << " + " << data.length()
<< " byte message.. ABORTED" << dendl;
- fault();
- return;
+ return _fault();
}
ldout(cct, 20) << __func__ << " got " << front.length() << " + "
footer, front, middle, data, connection);
if (!message) {
ldout(cct, 1) << __func__ << " decode message failed " << dendl;
- fault();
- return;
+ return _fault();
}
//
if (session_security->check_message_signature(message)) {
ldout(cct, 0) << __func__ << " Signature check failed" << dendl;
message->put();
- fault();
- return;
+ return _fault();
}
}
message->set_byte_throttler(connection->policy.throttler_bytes);
cct->_conf->ms_die_on_old_message) {
ceph_assert(0 == "old msgs despite reconnect_seq feature");
}
- return;
+ return nullptr;
}
if (message->get_seq() > cur_seq + 1) {
ldout(cct, 0) << __func__ << " missed message? skipped from seq "
connection->center->dispatch_event_external(connection->write_handler);
}
- wait_message();
+ return CONTINUE(wait_message);
}
void ProtocolV1::session_reset() {
* Client Protocol V1
**/
-void ProtocolV1::send_client_banner() {
+CtPtr ProtocolV1::send_client_banner() {
ldout(cct, 20) << __func__ << dendl;
state = CONNECTING;
bufferlist bl;
bl.append(CEPH_BANNER, strlen(CEPH_BANNER));
- WRITE(bl, &ProtocolV1::handle_client_banner_write);
+ return WRITE(bl, handle_client_banner_write);
}
-void ProtocolV1::handle_client_banner_write(int r) {
+CtPtr ProtocolV1::handle_client_banner_write(int r) {
ldout(cct, 20) << __func__ << " r=" << r << dendl;
if (r < 0) {
ldout(cct, 1) << __func__ << " write client banner failed" << dendl;
- fault();
- return;
+ return _fault();
}
ldout(cct, 10) << __func__ << " connect write banner done: "
<< connection->get_peer_addr() << dendl;
- wait_server_banner();
+ return wait_server_banner();
}
-void ProtocolV1::wait_server_banner() {
+CtPtr ProtocolV1::wait_server_banner() {
state = CONNECTING_WAIT_BANNER_AND_IDENTIFY;
ldout(cct, 20) << __func__ << dendl;
bufferlist myaddrbl;
unsigned banner_len = strlen(CEPH_BANNER);
unsigned need_len = banner_len + sizeof(ceph_entity_addr) * 2;
- READ(need_len, &ProtocolV1::handle_server_banner_and_identify);
+ return READ(need_len, handle_server_banner_and_identify);
}
-void ProtocolV1::handle_server_banner_and_identify(char *buffer, int r) {
+CtPtr ProtocolV1::handle_server_banner_and_identify(char *buffer, int r) {
ldout(cct, 20) << __func__ << " r=" << r << dendl;
if (r < 0) {
ldout(cct, 1) << __func__ << " read banner and identify addresses failed"
<< dendl;
- fault();
- return;
+ return _fault();
}
unsigned banner_len = strlen(CEPH_BANNER);
if (memcmp(buffer, CEPH_BANNER, banner_len)) {
ldout(cct, 0) << __func__ << " connect protocol error (bad banner) on peer "
<< connection->get_peer_addr() << dendl;
- fault();
- return;
+ return _fault();
}
bufferlist bl;
decode(peer_addr_for_me, p);
} catch (const buffer::error &e) {
lderr(cct) << __func__ << " decode peer addr failed " << dendl;
- fault();
- return;
+ return _fault();
}
ldout(cct, 20) << __func__ << " connect read peer addr " << paddr
<< " on socket " << connection->cs.fd() << dendl;
} else {
ldout(cct, 10) << __func__ << " connect claims to be " << paddr << " not "
<< peer_addr << dendl;
- fault();
- return;
+ return _fault();
}
}
ldout(cct, 1) << __func__
<< " state changed while learned_addr, mark_down or "
<< " replacing must be happened just now" << dendl;
- return;
+ return nullptr;
}
bufferlist myaddrbl;
encode(messenger->get_myaddrs().legacy_addr(), myaddrbl, 0); // legacy
- WRITE(myaddrbl, &ProtocolV1::handle_my_addr_write);
+ return WRITE(myaddrbl, handle_my_addr_write);
}
-void ProtocolV1::handle_my_addr_write(int r) {
+CtPtr ProtocolV1::handle_my_addr_write(int r) {
ldout(cct, 20) << __func__ << " r=" << r << dendl;
if (r < 0) {
ldout(cct, 2) << __func__ << " connect couldn't write my addr, "
<< cpp_strerror(r) << dendl;
- fault();
- return;
+ return _fault();
}
ldout(cct, 10) << __func__ << " connect sent my addr "
<< messenger->get_myaddrs().legacy_addr() << dendl;
- send_connect_message();
+ return CONTINUE(send_connect_message);
}
-void ProtocolV1::send_connect_message() {
+CtPtr ProtocolV1::send_connect_message() {
state = CONNECTING_SEND_CONNECT_MSG;
ldout(cct, 20) << __func__ << dendl;
<< " cseq=" << connect_seq
<< " proto=" << connect.protocol_version << dendl;
- WRITE(bl, &ProtocolV1::handle_connect_message_write);
+ return WRITE(bl, handle_connect_message_write);
}
-void ProtocolV1::handle_connect_message_write(int r) {
+CtPtr ProtocolV1::handle_connect_message_write(int r) {
ldout(cct, 20) << __func__ << " r=" << r << dendl;
if (r < 0) {
ldout(cct, 2) << __func__ << " connect couldn't send reply "
<< cpp_strerror(r) << dendl;
- fault();
- return;
+ return _fault();
}
ldout(cct, 20) << __func__
<< " connect wrote (self +) cseq, waiting for reply" << dendl;
- wait_connect_reply();
+ return wait_connect_reply();
}
-void ProtocolV1::wait_connect_reply() {
+CtPtr ProtocolV1::wait_connect_reply() {
ldout(cct, 20) << __func__ << dendl;
memset(&connect_reply, 0, sizeof(connect_reply));
- READ(sizeof(connect_reply), &ProtocolV1::handle_connect_reply_1);
+ return READ(sizeof(connect_reply), handle_connect_reply_1);
}
-void ProtocolV1::handle_connect_reply_1(char *buffer, int r) {
+CtPtr ProtocolV1::handle_connect_reply_1(char *buffer, int r) {
ldout(cct, 20) << __func__ << " r=" << r << dendl;
if (r < 0) {
ldout(cct, 1) << __func__ << " read connect reply failed" << dendl;
- fault();
- return;
+ return _fault();
}
connect_reply = *((ceph_msg_connect_reply *)buffer);
<< connect_reply.features << dendl;
if (connect_reply.authorizer_len) {
- wait_connect_reply_auth();
- return;
+ return wait_connect_reply_auth();
}
- handle_connect_reply_2();
+ return handle_connect_reply_2();
}
-void ProtocolV1::wait_connect_reply_auth() {
+CtPtr ProtocolV1::wait_connect_reply_auth() {
ldout(cct, 20) << __func__ << dendl;
ldout(cct, 10) << __func__
ceph_assert(connect_reply.authorizer_len < 4096);
- READ(connect_reply.authorizer_len, &ProtocolV1::handle_connect_reply_auth);
+ return READ(connect_reply.authorizer_len, handle_connect_reply_auth);
}
-void ProtocolV1::handle_connect_reply_auth(char *buffer, int r) {
+CtPtr ProtocolV1::handle_connect_reply_auth(char *buffer, int r) {
ldout(cct, 20) << __func__ << " r=" << r << dendl;
if (r < 0) {
ldout(cct, 1) << __func__ << " read connect reply authorizer failed"
<< dendl;
- fault();
- return;
+ return _fault();
}
bufferlist authorizer_reply;
if (connect_reply.tag == CEPH_MSGR_TAG_CHALLENGE_AUTHORIZER) {
ldout(cct, 10) << __func__ << " connect got auth challenge" << dendl;
authorizer->add_challenge(cct, authorizer_reply);
- send_connect_message();
- return;
+ return CONTINUE(send_connect_message);
}
auto iter = authorizer_reply.cbegin();
if (authorizer && !authorizer->verify_reply(iter)) {
ldout(cct, 0) << __func__ << " failed verifying authorize reply" << dendl;
- fault();
- return;
+ return _fault();
}
- handle_connect_reply_2();
+ return handle_connect_reply_2();
}
-void ProtocolV1::handle_connect_reply_2() {
+CtPtr ProtocolV1::handle_connect_reply_2() {
ldout(cct, 20) << __func__ << dendl;
if (connect_reply.tag == CEPH_MSGR_TAG_FEATURES) {
<< (connect_reply.features &
~connection->policy.features_supported)
<< std::dec << dendl;
- fault();
- return;
+ return _fault();
}
if (connect_reply.tag == CEPH_MSGR_TAG_BADPROTOVER) {
ldout(cct, 0) << __func__ << " connect protocol version mismatch, my "
<< messenger->get_proto_version(connection->peer_type, true)
<< " != " << connect_reply.protocol_version << dendl;
- fault();
- return;
+ return _fault();
}
if (connect_reply.tag == CEPH_MSGR_TAG_BADAUTHORIZER) {
ldout(cct, 0) << __func__ << " connect got BADAUTHORIZER" << dendl;
if (got_bad_auth) {
- fault();
- return;
+ return _fault();
}
got_bad_auth = true;
delete authorizer;
authorizer =
messenger->get_authorizer(connection->peer_type, true); // try harder
- send_connect_message();
- return;
+ return CONTINUE(send_connect_message);
}
if (connect_reply.tag == CEPH_MSGR_TAG_RESETSESSION) {
// see session_reset
connection->outcoming_bl.clear();
- send_connect_message();
- return;
+ return CONTINUE(send_connect_message);
}
if (connect_reply.tag == CEPH_MSGR_TAG_RETRY_GLOBAL) {
ldout(cct, 5) << __func__ << " connect got RETRY_GLOBAL "
<< connect_reply.global_seq << " chose new " << global_seq
<< dendl;
- send_connect_message();
- return;
+ return CONTINUE(send_connect_message);
}
if (connect_reply.tag == CEPH_MSGR_TAG_RETRY_SESSION) {
ldout(cct, 5) << __func__ << " connect got RETRY_SESSION " << connect_seq
<< " -> " << connect_reply.connect_seq << dendl;
connect_seq = connect_reply.connect_seq;
- send_connect_message();
- return;
+ return CONTINUE(send_connect_message);
}
if (connect_reply.tag == CEPH_MSGR_TAG_WAIT) {
ldout(cct, 1) << __func__ << " connect got WAIT (connection race)" << dendl;
state = WAIT;
- fault();
- return;
+ return _fault();
}
uint64_t feat_missing;
if (feat_missing) {
ldout(cct, 1) << __func__ << " missing required features " << std::hex
<< feat_missing << std::dec << dendl;
- fault();
- return;
+ return _fault();
}
if (connect_reply.tag == CEPH_MSGR_TAG_SEQ) {
<< " got CEPH_MSGR_TAG_SEQ, reading acked_seq and writing in_seq"
<< dendl;
- wait_ack_seq();
- return;
+ return wait_ack_seq();
}
if (connect_reply.tag == CEPH_MSGR_TAG_READY) {
ldout(cct, 10) << __func__ << " got CEPH_MSGR_TAG_READY " << dendl;
}
- client_ready();
+ return client_ready();
}
-void ProtocolV1::wait_ack_seq() {
+CtPtr ProtocolV1::wait_ack_seq() {
ldout(cct, 20) << __func__ << dendl;
- READ(sizeof(uint64_t), &ProtocolV1::handle_ack_seq);
+ return READ(sizeof(uint64_t), handle_ack_seq);
}
-void ProtocolV1::handle_ack_seq(char *buffer, int r) {
+CtPtr ProtocolV1::handle_ack_seq(char *buffer, int r) {
ldout(cct, 20) << __func__ << " r=" << r << dendl;
if (r < 0) {
ldout(cct, 1) << __func__ << " read connect ack seq failed" << dendl;
- fault();
- return;
+ return _fault();
}
uint64_t newly_acked_seq = 0;
uint64_t s = in_seq;
bl.append((char *)&s, sizeof(s));
- WRITE(bl, &ProtocolV1::handle_in_seq_write);
+ return WRITE(bl, handle_in_seq_write);
}
-void ProtocolV1::handle_in_seq_write(int r) {
+CtPtr ProtocolV1::handle_in_seq_write(int r) {
ldout(cct, 20) << __func__ << " r=" << r << dendl;
if (r < 0) {
ldout(cct, 10) << __func__ << " failed to send in_seq " << dendl;
- fault();
- return;
+ return _fault();
}
ldout(cct, 10) << __func__ << " send in_seq done " << dendl;
- client_ready();
+ return client_ready();
}
-void ProtocolV1::client_ready() {
+CtPtr ProtocolV1::client_ready() {
ldout(cct, 20) << __func__ << dendl;
// hooray!
connection->dispatch_queue->queue_connect(connection);
messenger->ms_deliver_handle_fast_connect(connection);
- ready();
+ return ready();
}
/**
* Server Protocol V1
**/
-void ProtocolV1::send_server_banner() {
+CtPtr ProtocolV1::send_server_banner() {
ldout(cct, 20) << __func__ << dendl;
state = ACCEPTING;
ldout(cct, 1) << __func__ << " sd=" << connection->cs.fd() << " "
<< connection->socket_addr << dendl;
- WRITE(bl, &ProtocolV1::handle_server_banner_write);
+ return WRITE(bl, handle_server_banner_write);
}
-void ProtocolV1::handle_server_banner_write(int r) {
+CtPtr ProtocolV1::handle_server_banner_write(int r) {
ldout(cct, 20) << __func__ << " r=" << r << dendl;
if (r < 0) {
ldout(cct, 1) << " write server banner failed" << dendl;
- fault();
- return;
+ return _fault();
}
ldout(cct, 10) << __func__ << " write banner and addr done: "
<< connection->get_peer_addr() << dendl;
- wait_client_banner();
+ return wait_client_banner();
}
-void ProtocolV1::wait_client_banner() {
+CtPtr ProtocolV1::wait_client_banner() {
ldout(cct, 20) << __func__ << dendl;
- READ(strlen(CEPH_BANNER) + sizeof(ceph_entity_addr),
- &ProtocolV1::handle_client_banner);
+ return READ(strlen(CEPH_BANNER) + sizeof(ceph_entity_addr),
+ handle_client_banner);
}
-void ProtocolV1::handle_client_banner(char *buffer, int r) {
+CtPtr ProtocolV1::handle_client_banner(char *buffer, int r) {
ldout(cct, 20) << __func__ << " r=" << r << dendl;
if (r < 0) {
ldout(cct, 1) << __func__ << " read peer banner and addr failed" << dendl;
- fault();
- return;
+ return _fault();
}
if (memcmp(buffer, CEPH_BANNER, strlen(CEPH_BANNER))) {
ldout(cct, 1) << __func__ << " accept peer sent bad banner '" << buffer
<< "' (should be '" << CEPH_BANNER << "')" << dendl;
- fault();
- return;
+ return _fault();
}
bufferlist addr_bl;
decode(peer_addr, ti);
} catch (const buffer::error &e) {
lderr(cct) << __func__ << " decode peer_addr failed " << dendl;
- fault();
- return;
+ return _fault();
}
ldout(cct, 10) << __func__ << " accept peer addr is " << peer_addr << dendl;
connection->set_peer_addr(peer_addr); // so that connection_state gets set up
connection->target_addr = peer_addr;
- wait_connect_message();
+ return CONTINUE(wait_connect_message);
}
-void ProtocolV1::wait_connect_message() {
+CtPtr ProtocolV1::wait_connect_message() {
ldout(cct, 20) << __func__ << dendl;
memset(&connect_msg, 0, sizeof(connect_msg));
- READ(sizeof(connect_msg), &ProtocolV1::handle_connect_message_1);
+ return READ(sizeof(connect_msg), handle_connect_message_1);
}
-void ProtocolV1::handle_connect_message_1(char *buffer, int r) {
+CtPtr ProtocolV1::handle_connect_message_1(char *buffer, int r) {
ldout(cct, 20) << __func__ << " r=" << r << dendl;
if (r < 0) {
ldout(cct, 1) << __func__ << " read connect msg failed" << dendl;
- fault();
- return;
+ return _fault();
}
connect_msg = *((ceph_msg_connect *)buffer);
state = ACCEPTING_WAIT_CONNECT_MSG_AUTH;
if (connect_msg.authorizer_len) {
- wait_connect_message_auth();
- return;
+ return wait_connect_message_auth();
}
- handle_connect_message_2();
+ return handle_connect_message_2();
}
-void ProtocolV1::wait_connect_message_auth() {
+CtPtr ProtocolV1::wait_connect_message_auth() {
ldout(cct, 20) << __func__ << dendl;
if (!authorizer_buf.length()) {
authorizer_buf.push_back(buffer::create(connect_msg.authorizer_len));
}
- READB(connect_msg.authorizer_len, authorizer_buf.c_str(),
- &ProtocolV1::handle_connect_message_auth);
+ return READB(connect_msg.authorizer_len, authorizer_buf.c_str(),
+ handle_connect_message_auth);
}
-void ProtocolV1::handle_connect_message_auth(char *buffer, int r) {
+CtPtr ProtocolV1::handle_connect_message_auth(char *buffer, int r) {
ldout(cct, 20) << __func__ << " r=" << r << dendl;
if (r < 0) {
ldout(cct, 1) << __func__ << " read connect authorizer failed" << dendl;
- fault();
- return;
+ return _fault();
}
- handle_connect_message_2();
+ return handle_connect_message_2();
}
-void ProtocolV1::handle_connect_message_2() {
+CtPtr ProtocolV1::handle_connect_message_2() {
ldout(cct, 20) << __func__ << dendl;
ldout(cct, 20) << __func__ << " accept got peer connect_seq "
<< ", their proto " << connect_msg.protocol_version << dendl;
if (connect_msg.protocol_version != reply.protocol_version) {
- send_connect_message_reply(CEPH_MSGR_TAG_BADPROTOVER, reply,
- authorizer_reply);
- return;
+ return send_connect_message_reply(CEPH_MSGR_TAG_BADPROTOVER, reply,
+ authorizer_reply);
}
// require signatures for cephx?
if (feat_missing) {
ldout(cct, 1) << __func__ << " peer missing required features " << std::hex
<< feat_missing << std::dec << dendl;
- send_connect_message_reply(CEPH_MSGR_TAG_FEATURES, reply, authorizer_reply);
- return;
+ return send_connect_message_reply(CEPH_MSGR_TAG_FEATURES, reply,
+ authorizer_reply);
}
connection->lock.unlock();
if (need_challenge && !had_challenge && authorizer_challenge) {
ldout(cct, 10) << __func__ << ": challenging authorizer" << dendl;
ceph_assert(authorizer_reply.length());
- send_connect_message_reply(CEPH_MSGR_TAG_CHALLENGE_AUTHORIZER, reply,
- authorizer_reply);
- return;
+ return send_connect_message_reply(CEPH_MSGR_TAG_CHALLENGE_AUTHORIZER,
+ reply, authorizer_reply);
} else {
ldout(cct, 0) << __func__ << ": got bad authorizer, auth_reply_len="
<< authorizer_reply.length() << dendl;
session_security.reset();
- send_connect_message_reply(CEPH_MSGR_TAG_BADAUTHORIZER, reply,
- authorizer_reply);
- return;
+ return send_connect_message_reply(CEPH_MSGR_TAG_BADAUTHORIZER, reply,
+ authorizer_reply);
}
}
<< " state changed while accept, it must be mark_down"
<< dendl;
ceph_assert(state == CLOSED);
- fault();
- return;
+ return _fault();
}
if (existing == connection) {
existing->lock.unlock();
existing = nullptr;
- open(reply, authorizer_reply);
- return;
+ return open(reply, authorizer_reply);
}
if (exproto->replacing) {
<< connection->get_state_name(existing->state) << dendl;
reply.global_seq = exproto->peer_global_seq;
existing->lock.unlock();
- send_connect_message_reply(CEPH_MSGR_TAG_RETRY_GLOBAL, reply,
- authorizer_reply);
- return;
+ return send_connect_message_reply(CEPH_MSGR_TAG_RETRY_GLOBAL, reply,
+ authorizer_reply);
}
if (connect_msg.global_seq < exproto->peer_global_seq) {
<< connect_msg.global_seq << ", RETRY_GLOBAL" << dendl;
reply.global_seq = exproto->peer_global_seq; // so we can send it below..
existing->lock.unlock();
- send_connect_message_reply(CEPH_MSGR_TAG_RETRY_GLOBAL, reply,
- authorizer_reply);
- return;
+ return send_connect_message_reply(CEPH_MSGR_TAG_RETRY_GLOBAL, reply,
+ authorizer_reply);
} else {
ldout(cct, 10) << __func__ << " accept existing " << existing << ".gseq "
<< exproto->peer_global_seq
<< " accept replacing existing (lossy) channel (new one lossy="
<< connection->policy.lossy << ")" << dendl;
exproto->session_reset();
- replace(existing, reply, authorizer_reply);
- return;
+ return replace(existing, reply, authorizer_reply);
}
ldout(cct, 1) << __func__ << " accept connect_seq "
exproto->session_reset(); // this resets out_queue, msg_ and
// connect_seq #'s
}
- replace(existing, reply, authorizer_reply);
- return;
+ return replace(existing, reply, authorizer_reply);
}
if (connect_msg.connect_seq < exproto->connect_seq) {
<< ", RETRY_SESSION" << dendl;
reply.connect_seq = exproto->connect_seq + 1;
existing->lock.unlock();
- send_connect_message_reply(CEPH_MSGR_TAG_RETRY_SESSION, reply,
- authorizer_reply);
- return;
+ return send_connect_message_reply(CEPH_MSGR_TAG_RETRY_SESSION, reply,
+ authorizer_reply);
}
if (connect_msg.connect_seq == exproto->connect_seq) {
// if connect_seq both zero, dont stuck into dead lock. it's ok to
// replace
if (connection->policy.resetcheck && exproto->connect_seq == 0) {
- replace(existing, reply, authorizer_reply);
- return;
+ return replace(existing, reply, authorizer_reply);
}
reply.connect_seq = exproto->connect_seq + 1;
existing->lock.unlock();
- send_connect_message_reply(CEPH_MSGR_TAG_RETRY_SESSION, reply,
- authorizer_reply);
- return;
+ return send_connect_message_reply(CEPH_MSGR_TAG_RETRY_SESSION, reply,
+ authorizer_reply);
}
// connection race?
<< existing << ".cseq " << exproto->connect_seq
<< " == " << connect_msg.connect_seq
<< ", or we are server, replacing my attempt" << dendl;
- replace(existing, reply, authorizer_reply);
- return;
+ return replace(existing, reply, authorizer_reply);
} else {
// our existing outgoing wins
ldout(messenger->cct, 10)
ceph_assert(connection->peer_addrs.legacy_addr() >
messenger->get_myaddr());
existing->lock.unlock();
- send_connect_message_reply(CEPH_MSGR_TAG_WAIT, reply, authorizer_reply);
- return;
+ return send_connect_message_reply(CEPH_MSGR_TAG_WAIT, reply,
+ authorizer_reply);
}
}
<< ".cseq = " << exproto->connect_seq
<< "), sending RESETSESSION " << dendl;
existing->lock.unlock();
- send_connect_message_reply(CEPH_MSGR_TAG_RESETSESSION, reply,
- authorizer_reply);
- return;
+ return send_connect_message_reply(CEPH_MSGR_TAG_RESETSESSION, reply,
+ authorizer_reply);
}
// reconnect
ldout(cct, 10) << __func__ << " accept peer sent cseq "
<< connect_msg.connect_seq << " > " << exproto->connect_seq
<< dendl;
- replace(existing, reply, authorizer_reply);
- return;
+ return replace(existing, reply, authorizer_reply);
} // existing
else if (!replacing && connect_msg.connect_seq > 0) {
// we reset, and they are opening a new session
ldout(cct, 0) << __func__ << " accept we reset (peer sent cseq "
<< connect_msg.connect_seq << "), sending RESETSESSION"
<< dendl;
- send_connect_message_reply(CEPH_MSGR_TAG_RESETSESSION, reply,
- authorizer_reply);
- return;
+ return send_connect_message_reply(CEPH_MSGR_TAG_RESETSESSION, reply,
+ authorizer_reply);
} else {
// new session
ldout(cct, 10) << __func__ << " accept new session" << dendl;
existing = nullptr;
- open(reply, authorizer_reply);
- return;
+ return open(reply, authorizer_reply);
}
}
-void ProtocolV1::send_connect_message_reply(char tag,
- ceph_msg_connect_reply &reply,
- bufferlist &authorizer_reply) {
+CtPtr ProtocolV1::send_connect_message_reply(char tag,
+ ceph_msg_connect_reply &reply,
+ bufferlist &authorizer_reply) {
ldout(cct, 20) << __func__ << dendl;
bufferlist reply_bl;
reply.tag = tag;
authorizer_reply.clear();
}
- WRITE(reply_bl, &ProtocolV1::handle_connect_message_reply_write);
+ return WRITE(reply_bl, handle_connect_message_reply_write);
}
-void ProtocolV1::handle_connect_message_reply_write(int r) {
+CtPtr ProtocolV1::handle_connect_message_reply_write(int r) {
ldout(cct, 20) << __func__ << " r=" << r << dendl;
if (r < 0) {
ldout(cct, 1) << " write connect message reply failed" << dendl;
connection->inject_delay();
- fault();
- return;
+ return _fault();
}
- wait_connect_message();
+ return CONTINUE(wait_connect_message);
}
-void ProtocolV1::replace(AsyncConnectionRef existing,
- ceph_msg_connect_reply &reply,
- bufferlist &authorizer_reply) {
+CtPtr ProtocolV1::replace(AsyncConnectionRef existing,
+ ceph_msg_connect_reply &reply,
+ bufferlist &authorizer_reply) {
ldout(cct, 10) << __func__ << " accept replacing " << existing << dendl;
connection->inject_delay();
existing->center->create_file_event(
existing->cs.fd(), EVENT_READABLE, existing->read_handler);
reply.global_seq = exproto->peer_global_seq;
- exproto->send_connect_message_reply(CEPH_MSGR_TAG_RETRY_GLOBAL,
- reply, authorizer_reply);
+ exproto->run_continuation(exproto->send_connect_message_reply(
+ CEPH_MSGR_TAG_RETRY_GLOBAL, reply, authorizer_reply));
};
if (existing->center->in_thread())
transfer_existing();
std::move(deactivate_existing), true);
existing->write_lock.unlock();
existing->lock.unlock();
- return;
+ return nullptr;
}
existing->lock.unlock();
- open(reply, authorizer_reply);
+ return open(reply, authorizer_reply);
}
-void ProtocolV1::open(ceph_msg_connect_reply &reply,
- bufferlist &authorizer_reply) {
+CtPtr ProtocolV1::open(ceph_msg_connect_reply &reply,
+ bufferlist &authorizer_reply) {
ldout(cct, 20) << __func__ << dendl;
connect_seq = connect_msg.connect_seq + 1;
<< " just fail later one(this)" << dendl;
ldout(cct, 10) << "accept fault after register" << dendl;
connection->inject_delay();
- fault();
- return;
+ return _fault();
}
if (state != ACCEPTING_WAIT_CONNECT_MSG_AUTH) {
ldout(cct, 1) << __func__
ceph_assert(state == CLOSED || state == NONE);
ldout(cct, 10) << "accept fault after register" << dendl;
connection->inject_delay();
- fault();
- return;
+ return _fault();
}
- WRITE(reply_bl, &ProtocolV1::handle_ready_connect_message_reply_write);
+ return WRITE(reply_bl, handle_ready_connect_message_reply_write);
}
-void ProtocolV1::handle_ready_connect_message_reply_write(int r) {
+CtPtr ProtocolV1::handle_ready_connect_message_reply_write(int r) {
ldout(cct, 20) << __func__ << " r=" << r << dendl;
if (r < 0) {
ldout(cct, 1) << __func__ << " write ready connect message reply failed"
<< dendl;
- fault();
- return;
+ return _fault();
}
// notify
state = ACCEPTING_HANDLED_CONNECT_MSG;
if (wait_for_seq) {
- wait_seq();
- } else {
- server_ready();
+ return wait_seq();
}
+
+ return server_ready();
}
-void ProtocolV1::wait_seq() {
+CtPtr ProtocolV1::wait_seq() {
ldout(cct, 20) << __func__ << dendl;
- READ(sizeof(uint64_t), &ProtocolV1::handle_seq);
+ return READ(sizeof(uint64_t), handle_seq);
}
-void ProtocolV1::handle_seq(char *buffer, int r) {
+CtPtr ProtocolV1::handle_seq(char *buffer, int r) {
ldout(cct, 20) << __func__ << " r=" << r << dendl;
if (r < 0) {
ldout(cct, 1) << __func__ << " read ack seq failed" << dendl;
- fault();
- return;
+ return _fault();
}
uint64_t newly_acked_seq = *(uint64_t *)buffer;
<< dendl;
out_seq = discard_requeued_up_to(out_seq, newly_acked_seq);
- server_ready();
+ return server_ready();
}
-void ProtocolV1::server_ready() {
+CtPtr ProtocolV1::server_ready() {
ldout(cct, 20) << __func__ << dendl;
ldout(cct, 20) << __func__ << " accept done" << dendl;
ceph_assert(connection->delay_state->ready());
}
- ready();
+ return ready();
}
#include "include/buffer.h"
#include "include/msgr.h"
+/*
+ * Continuation Helper Classes
+ */
+
+#include <memory>
+#include <tuple>
+
+template <class C>
+class Ct {
+public:
+ virtual ~Ct() {}
+ virtual Ct<C> *call(C *foo) const = 0;
+};
+
+template <class C, typename... Args>
+class CtFun : public Ct<C> {
+private:
+ using fn = Ct<C> *(C::*)(Args...);
+ fn _f;
+ std::tuple<Args...> _params;
+
+ template <std::size_t... Is>
+ inline Ct<C> *_call(C *foo, std::index_sequence<Is...>) const {
+ return (foo->*_f)(std::get<Is>(_params)...);
+ }
+
+public:
+ CtFun(fn f) : _f(f) {}
+
+ inline void setParams(Args... args) { _params = std::make_tuple(args...); }
+ inline Ct<C> *call(C *foo) const override {
+ return _call(foo, std::index_sequence_for<Args...>());
+ }
+};
+
+#define CONTINUATION_DECL(C, F, ...) \
+ std::unique_ptr<CtFun<C, ##__VA_ARGS__>> F##_cont_ = \
+ std::make_unique<CtFun<C, ##__VA_ARGS__>>(&C::F); \
+ CtFun<C, ##__VA_ARGS__> *F##_cont = F##_cont_.get()
+
+#define CONTINUATION_PARAM(V, C, ...) CtFun<C, ##__VA_ARGS__> *V##_cont
+
+#define CONTINUATION(F) F##_cont
+#define CONTINUE(F, ...) F##_cont->setParams(__VA_ARGS__), F##_cont
+
+#define CONTINUATION_RUN(CT) \
+ { \
+ Ct<std::remove_reference<decltype(*this)>::type> *_cont = CT; \
+ while (_cont) { \
+ _cont = _cont->call(this); \
+ } \
+ }
+
+//////////////////////////////////////////////////////////////////////
+
class AsyncMessenger;
class Protocol {
virtual bool is_queued() = 0;
};
+class ProtocolV1;
+using CtPtr = Ct<ProtocolV1>*;
+#define READ_HANDLER_CONTINUATION_DECL(C, F) CONTINUATION_DECL(C, F, char*, int)
+#define WRITE_HANDLER_CONTINUATION_DECL(C, F) CONTINUATION_DECL(C, F, int)
+
class ProtocolV1 : public Protocol {
/*
* ProtocolV1 State Machine
*/
protected:
+
enum State {
NONE = 0,
START_CONNECT,
State state;
- void ready();
- void wait_message();
- void handle_message(char *buffer, int r);
+ void run_continuation(CtPtr continuation);
+ CtPtr read(CONTINUATION_PARAM(next, ProtocolV1, char *, int), int len,
+ char *buffer = nullptr);
+ CtPtr write(CONTINUATION_PARAM(next, ProtocolV1, int), bufferlist &bl);
+ inline CtPtr _fault() { // helper fault method that stops continuation
+ fault();
+ return nullptr;
+ }
- void handle_keepalive2(char *buffer, int r);
+ CONTINUATION_DECL(ProtocolV1, wait_message);
+ READ_HANDLER_CONTINUATION_DECL(ProtocolV1, handle_message);
+ READ_HANDLER_CONTINUATION_DECL(ProtocolV1, handle_keepalive2);
+ READ_HANDLER_CONTINUATION_DECL(ProtocolV1, handle_keepalive2_ack);
+ READ_HANDLER_CONTINUATION_DECL(ProtocolV1, handle_tag_ack);
+ READ_HANDLER_CONTINUATION_DECL(ProtocolV1, handle_message_header);
+ CONTINUATION_DECL(ProtocolV1, throttle_message);
+ CONTINUATION_DECL(ProtocolV1, throttle_bytes);
+ CONTINUATION_DECL(ProtocolV1, throttle_dispatch_queue);
+ READ_HANDLER_CONTINUATION_DECL(ProtocolV1, handle_message_front);
+ READ_HANDLER_CONTINUATION_DECL(ProtocolV1, handle_message_middle);
+ CONTINUATION_DECL(ProtocolV1, read_message_data);
+ READ_HANDLER_CONTINUATION_DECL(ProtocolV1, handle_message_data);
+ READ_HANDLER_CONTINUATION_DECL(ProtocolV1, handle_message_footer);
+
+ CtPtr ready();
+ CtPtr wait_message();
+ CtPtr handle_message(char *buffer, int r);
+
+ CtPtr handle_keepalive2(char *buffer, int r);
void append_keepalive_or_ack(bool ack = false, utime_t *t = nullptr);
- void handle_keepalive2_ack(char *buffer, int r);
- void handle_tag_ack(char *buffer, int r);
-
- void handle_message_header(char *buffer, int r);
- void throttle_message();
- void throttle_bytes();
- void throttle_dispatch_queue();
- void read_message_front();
- void handle_message_front(char *buffer, int r);
- void read_message_middle();
- void handle_message_middle(char *buffer, int r);
- void read_message_data_prepare();
- void read_message_data();
- void handle_message_data(char *buffer, int r);
- void read_message_footer();
- void handle_message_footer(char *buffer, int r);
+ CtPtr handle_keepalive2_ack(char *buffer, int r);
+ CtPtr handle_tag_ack(char *buffer, int r);
+
+ CtPtr handle_message_header(char *buffer, int r);
+ CtPtr throttle_message();
+ CtPtr throttle_bytes();
+ CtPtr throttle_dispatch_queue();
+ CtPtr read_message_front();
+ CtPtr handle_message_front(char *buffer, int r);
+ CtPtr read_message_middle();
+ CtPtr handle_message_middle(char *buffer, int r);
+ CtPtr read_message_data_prepare();
+ CtPtr read_message_data();
+ CtPtr handle_message_data(char *buffer, int r);
+ CtPtr read_message_footer();
+ CtPtr handle_message_footer(char *buffer, int r);
void session_reset();
void randomize_out_seq();
bool got_bad_auth;
AuthAuthorizer *authorizer;
- void send_client_banner();
- void handle_client_banner_write(int r);
- void wait_server_banner();
- void handle_server_banner_and_identify(char *buffer, int r);
- void handle_my_addr_write(int r);
- void send_connect_message();
- void handle_connect_message_write(int r);
- void wait_connect_reply();
- void handle_connect_reply_1(char *buffer, int r);
- void wait_connect_reply_auth();
- void handle_connect_reply_auth(char *buffer, int r);
- void handle_connect_reply_2();
- void wait_ack_seq();
- void handle_ack_seq(char *buffer, int r);
- void handle_in_seq_write(int r);
- void client_ready();
+ CONTINUATION_DECL(ProtocolV1, send_client_banner);
+ WRITE_HANDLER_CONTINUATION_DECL(ProtocolV1, handle_client_banner_write);
+ READ_HANDLER_CONTINUATION_DECL(ProtocolV1, handle_server_banner_and_identify);
+ WRITE_HANDLER_CONTINUATION_DECL(ProtocolV1, handle_my_addr_write);
+ CONTINUATION_DECL(ProtocolV1, send_connect_message);
+ WRITE_HANDLER_CONTINUATION_DECL(ProtocolV1, handle_connect_message_write);
+ READ_HANDLER_CONTINUATION_DECL(ProtocolV1, handle_connect_reply_1);
+ READ_HANDLER_CONTINUATION_DECL(ProtocolV1, handle_connect_reply_auth);
+ READ_HANDLER_CONTINUATION_DECL(ProtocolV1, handle_ack_seq);
+ WRITE_HANDLER_CONTINUATION_DECL(ProtocolV1, handle_in_seq_write);
+
+ CtPtr send_client_banner();
+ CtPtr handle_client_banner_write(int r);
+ CtPtr wait_server_banner();
+ CtPtr handle_server_banner_and_identify(char *buffer, int r);
+ CtPtr handle_my_addr_write(int r);
+ CtPtr send_connect_message();
+ CtPtr handle_connect_message_write(int r);
+ CtPtr wait_connect_reply();
+ CtPtr handle_connect_reply_1(char *buffer, int r);
+ CtPtr wait_connect_reply_auth();
+ CtPtr handle_connect_reply_auth(char *buffer, int r);
+ CtPtr handle_connect_reply_2();
+ CtPtr wait_ack_seq();
+ CtPtr handle_ack_seq(char *buffer, int r);
+ CtPtr handle_in_seq_write(int r);
+ CtPtr client_ready();
// Server Protocol
-private:
+protected:
bool wait_for_seq;
- void send_server_banner();
- void handle_server_banner_write(int r);
- void wait_client_banner();
- void handle_client_banner(char *buffer, int r);
- void wait_connect_message();
- void handle_connect_message_1(char *buffer, int r);
- void wait_connect_message_auth();
- void handle_connect_message_auth(char *buffer, int r);
- void handle_connect_message_2();
- void send_connect_message_reply(char tag, ceph_msg_connect_reply &reply,
- bufferlist &authorizer_reply);
- void handle_connect_message_reply_write(int r);
- void replace(AsyncConnectionRef existing, ceph_msg_connect_reply &reply,
- bufferlist &authorizer_reply);
- void open(ceph_msg_connect_reply &reply, bufferlist &authorizer_reply);
- void handle_ready_connect_message_reply_write(int r);
- void wait_seq();
- void handle_seq(char *buffer, int r);
- void server_ready();
+ CONTINUATION_DECL(ProtocolV1, send_server_banner);
+ WRITE_HANDLER_CONTINUATION_DECL(ProtocolV1, handle_server_banner_write);
+ READ_HANDLER_CONTINUATION_DECL(ProtocolV1, handle_client_banner);
+ CONTINUATION_DECL(ProtocolV1, wait_connect_message);
+ READ_HANDLER_CONTINUATION_DECL(ProtocolV1, handle_connect_message_1);
+ READ_HANDLER_CONTINUATION_DECL(ProtocolV1, handle_connect_message_auth);
+ WRITE_HANDLER_CONTINUATION_DECL(ProtocolV1,
+ handle_connect_message_reply_write);
+ WRITE_HANDLER_CONTINUATION_DECL(ProtocolV1,
+ handle_ready_connect_message_reply_write);
+ READ_HANDLER_CONTINUATION_DECL(ProtocolV1, handle_seq);
+
+ CtPtr send_server_banner();
+ CtPtr handle_server_banner_write(int r);
+ CtPtr wait_client_banner();
+ CtPtr handle_client_banner(char *buffer, int r);
+ CtPtr wait_connect_message();
+ CtPtr handle_connect_message_1(char *buffer, int r);
+ CtPtr wait_connect_message_auth();
+ CtPtr handle_connect_message_auth(char *buffer, int r);
+ CtPtr handle_connect_message_2();
+ CtPtr send_connect_message_reply(char tag, ceph_msg_connect_reply &reply,
+ bufferlist &authorizer_reply);
+ CtPtr handle_connect_message_reply_write(int r);
+ CtPtr replace(AsyncConnectionRef existing, ceph_msg_connect_reply &reply,
+ bufferlist &authorizer_reply);
+ CtPtr open(ceph_msg_connect_reply &reply, bufferlist &authorizer_reply);
+ CtPtr handle_ready_connect_message_reply_write(int r);
+ CtPtr wait_seq();
+ CtPtr handle_seq(char *buffer, int r);
+ CtPtr server_ready();
};
class LoopbackProtocolV1 : public ProtocolV1 {