<< " l=" << connection->policy.lossy << ").";
}
+// TODO: REMOVE THIS
+void ProtocolV2::log(const std::string message, uint64_t val, uint64_t val2) {
+ ldout(cct, 1) << __func__ << " " << message << val << ":" << val2 << dendl;
+}
+
+using CtPtr = Ct<ProtocolV2> *;
+
+void ProtocolV2::run_continuation(CtPtr continuation) {
+ try {
+ CONTINUATION_RUN(continuation)
+ } catch (const buffer::error &e) {
+ lderr(cct) << __func__ << " failed decoding of frame header: " << e
+ << dendl;
+ _fault();
+ }
+}
+
const int ASYNC_COALESCE_THRESHOLD = 256;
-#define WRITE(B, C) write(CONTINUATION(C), B)
+#define WRITE(B, D, C) write(D, CONTINUATION(C), B)
#define READ(L, C) read(CONTINUATION(C), L)
#define READB(L, B, C) read(CONTINUATION(C), L, B)
-using CtPtr = Ct<ProtocolV2> *;
-
static void alloc_aligned_buffer(bufferlist &data, unsigned len, unsigned off) {
// create a buffer to read into that matches the data alignment
unsigned alloc_len = 0;
data.push_back(std::move(ptr));
}
+/**
+ * Protocol V2 Frame Structures
+ **/
+
+template <class T>
+struct Frame {
+protected:
+ bufferlist payload;
+ bufferlist frame_buffer;
+
+public:
+ Frame() {}
+
+ bufferlist &get_buffer() {
+ if (frame_buffer.length()) {
+ return frame_buffer;
+ }
+ encode((uint32_t)(payload.length() + sizeof(uint32_t)), frame_buffer, -1ll);
+ uint32_t tag = static_cast<uint32_t>(static_cast<T *>(this)->tag);
+ ceph_assert(tag != 0);
+ encode(tag, frame_buffer, -1ll);
+ frame_buffer.claim_append(payload);
+ return frame_buffer;
+ }
+
+ void decode_frame(char *payload, uint32_t length) {
+ bufferlist bl;
+ bl.push_back(buffer::create_static(length, payload));
+ auto ti = bl.cbegin();
+ static_cast<T *>(this)->decode_payload(ti);
+ }
+
+ void decode_payload(bufferlist::const_iterator &ti) {}
+};
+
+template <class C, typename... Args>
+struct PayloadFrame : public Frame<C> {
+ std::tuple<Args...> _values;
+
+ template <typename T>
+ inline void _encode_payload_each(T &t) {
+ if constexpr (std::is_same<T, bufferlist const>()) {
+ encode((uint32_t)t.length(), this->payload, -1ll);
+ this->payload.claim_append((bufferlist &)t);
+ } else if constexpr (std::is_same<T, std::vector<uint32_t> const>()) {
+ encode((uint32_t)t.size(), this->payload, -1ll);
+ for (const auto &elem : t) {
+ encode(elem, this->payload, 0);
+ }
+ } else {
+ encode(t, this->payload, -1ll);
+ }
+ }
+
+ PayloadFrame(const Args &... args) { (_encode_payload_each(args), ...); }
+
+ PayloadFrame(char *payload, uint32_t length) {
+ this->decode_frame(payload, length);
+ }
+
+ template <typename T>
+ inline void _decode_payload_each(T &t, bufferlist::const_iterator &ti) const {
+ if constexpr (std::is_same<T, bufferlist>()) {
+ uint32_t len;
+ decode(len, ti);
+ ceph_assert(len == ti.get_remaining());
+ if (len) {
+ t.append(ti.get_current_ptr());
+ }
+ } else if constexpr (std::is_same<T, std::vector<uint32_t>>()) {
+ uint32_t size;
+ decode(size, ti);
+ for (uint32_t i = 0; i < size; ++i) {
+ decode(t[i], ti);
+ }
+ } else {
+ decode(t, ti);
+ }
+ }
+
+ template <std::size_t... Is>
+ inline void _decode_payload(bufferlist::const_iterator &ti,
+ std::index_sequence<Is...>) const {
+ (_decode_payload_each((Args &)std::get<Is>(_values), ti), ...);
+ }
+
+ void decode_payload(bufferlist::const_iterator &ti) {
+ _decode_payload(ti, std::index_sequence_for<Args...>());
+ }
+
+ template <std::size_t N>
+ inline decltype(auto) get_val() {
+ return std::get<N>(_values);
+ }
+};
+
+struct AuthRequestFrame
+ : public PayloadFrame<AuthRequestFrame, uint32_t, bufferlist> {
+ const ProtocolV2::Tag tag = ProtocolV2::Tag::AUTH_REQUEST;
+ using PayloadFrame::PayloadFrame;
+
+ AuthRequestFrame(uint32_t method) : AuthRequestFrame(method, bufferlist()) {}
+
+ inline uint32_t &method() { return get_val<0>(); }
+ inline bufferlist &auth_payload() { return get_val<1>(); }
+};
+
+struct AuthBadMethodFrame
+ : public PayloadFrame<AuthBadMethodFrame, uint32_t, std::vector<uint32_t>> {
+ const ProtocolV2::Tag tag = ProtocolV2::Tag::AUTH_BAD_METHOD;
+ using PayloadFrame::PayloadFrame;
+
+ inline uint32_t &method() { return get_val<0>(); }
+ inline std::vector<uint32_t> &allowed_methods() { return get_val<1>(); }
+};
+
+struct AuthBadAuthFrame
+ : public PayloadFrame<AuthBadAuthFrame, uint32_t, std::string> {
+ const ProtocolV2::Tag tag = ProtocolV2::Tag::AUTH_BAD_AUTH;
+ using PayloadFrame::PayloadFrame;
+
+ inline uint32_t &error_code() { return get_val<0>(); }
+ inline std::string &error_msg() { return get_val<1>(); }
+};
+
+struct AuthMoreFrame : public PayloadFrame<AuthMoreFrame, bufferlist> {
+ const ProtocolV2::Tag tag = ProtocolV2::Tag::AUTH_MORE;
+ using PayloadFrame::PayloadFrame;
+
+ inline bufferlist &auth_payload() { return get_val<0>(); }
+};
+
+struct AuthDoneFrame
+ : public PayloadFrame<AuthDoneFrame, uint64_t, bufferlist> {
+ const ProtocolV2::Tag tag = ProtocolV2::Tag::AUTH_DONE;
+ using PayloadFrame::PayloadFrame;
+
+ inline uint64_t &flags() { return get_val<0>(); }
+ inline bufferlist &auth_payload() { return get_val<1>(); }
+};
+
+template <class T, typename... Args>
+struct SignedEncryptedFrame : public PayloadFrame<T, Args...> {
+protected:
+ ProtocolV2 *protocol;
+
+public:
+ SignedEncryptedFrame(ProtocolV2 *protocol, const Args &... args)
+ : PayloadFrame<T, Args...>(args...), protocol(protocol) {}
+
+ SignedEncryptedFrame(ProtocolV2 *protocol, char *payload, uint32_t length)
+ : PayloadFrame<T, Args...>(payload, length), protocol(protocol) {}
+
+ bufferlist &get_buffer() {
+ if (this->frame_buffer.length()) {
+ return this->frame_buffer;
+ }
+
+ bufferlist signature;
+ if (protocol->session_security) {
+ protocol->session_security->sign_bufferlist(this->payload, signature);
+ protocol->log("payload signature=", signature.length(), this->payload.length());
+ }
+
+ encode((uint32_t)(this->payload.length() + sizeof(uint32_t)),
+ this->frame_buffer, -1ll);
+ uint32_t tag = static_cast<uint32_t>(static_cast<T *>(this)->tag);
+ ceph_assert(tag != 0);
+ encode(tag, this->frame_buffer, -1ll);
+ this->frame_buffer.claim_append(this->payload);
+ return this->frame_buffer;
+ }
+};
+
+struct ClientIdentFrame
+ : public SignedEncryptedFrame<ClientIdentFrame, entity_addrvec_t, int64_t,
+ uint64_t, uint64_t, uint64_t, uint64_t> {
+ const ProtocolV2::Tag tag = ProtocolV2::Tag::IDENT;
+ using SignedEncryptedFrame::SignedEncryptedFrame;
+
+ inline entity_addrvec_t &addrs() { return get_val<0>(); }
+ inline int64_t &gid() { return get_val<1>(); }
+ inline uint64_t &global_seq() { return get_val<2>(); }
+ inline uint64_t &supported_features() { return get_val<3>(); }
+ inline uint64_t &required_features() { return get_val<4>(); }
+ inline uint64_t &flags() { return get_val<5>(); }
+};
+
+struct ServerIdentFrame
+ : public SignedEncryptedFrame<ServerIdentFrame, entity_addrvec_t, int64_t,
+ uint64_t, uint64_t, uint64_t, uint64_t,
+ uint64_t> {
+ const ProtocolV2::Tag tag = ProtocolV2::Tag::IDENT;
+ using SignedEncryptedFrame::SignedEncryptedFrame;
+
+ inline entity_addrvec_t &addrs() { return get_val<0>(); }
+ inline int64_t &gid() { return get_val<1>(); }
+ inline uint64_t &global_seq() { return get_val<2>(); }
+ inline uint64_t &supported_features() { return get_val<3>(); }
+ inline uint64_t &required_features() { return get_val<4>(); }
+ inline uint64_t &flags() { return get_val<5>(); }
+ inline uint64_t &cookie() { return get_val<6>(); }
+};
+
+struct ReconnectFrame
+ : public SignedEncryptedFrame<ReconnectFrame, entity_addrvec_t, uint64_t,
+ uint64_t, uint64_t, uint64_t> {
+ const ProtocolV2::Tag tag = ProtocolV2::Tag::SESSION_RECONNECT;
+ using SignedEncryptedFrame::SignedEncryptedFrame;
+
+ inline entity_addrvec_t &addrs() { return get_val<0>(); }
+ inline uint64_t &cookie() { return get_val<1>(); }
+ inline uint64_t &global_seq() { return get_val<2>(); }
+ inline uint64_t &connect_seq() { return get_val<3>(); }
+ inline uint64_t &msg_seq() { return get_val<4>(); }
+};
+
+struct ResetFrame : public Frame<ResetFrame> {
+ const ProtocolV2::Tag tag = ProtocolV2::Tag::SESSION_RESET;
+};
+
+struct RetryFrame : public SignedEncryptedFrame<RetryFrame, uint64_t> {
+ const ProtocolV2::Tag tag = ProtocolV2::Tag::SESSION_RETRY;
+ using SignedEncryptedFrame::SignedEncryptedFrame;
+
+ uint64_t connect_seq() { return get_val<0>(); }
+};
+
+struct RetryGlobalFrame
+ : public SignedEncryptedFrame<RetryGlobalFrame, uint64_t> {
+ const ProtocolV2::Tag tag = ProtocolV2::Tag::SESSION_RETRY_GLOBAL;
+ using SignedEncryptedFrame::SignedEncryptedFrame;
+
+ inline uint64_t &global_seq() { return get_val<0>(); }
+};
+
+struct WaitFrame : public Frame<WaitFrame> {
+ const ProtocolV2::Tag tag = ProtocolV2::Tag::WAIT;
+};
+
+struct ReconnectOkFrame
+ : public SignedEncryptedFrame<ReconnectOkFrame, uint64_t> {
+ const ProtocolV2::Tag tag = ProtocolV2::Tag::SESSION_RECONNECT_OK;
+ using SignedEncryptedFrame::SignedEncryptedFrame;
+
+ inline uint64_t &msg_seq() { return get_val<0>(); }
+};
+
+struct IdentMissingFeaturesFrame
+ : public SignedEncryptedFrame<IdentMissingFeaturesFrame, uint64_t> {
+ const ProtocolV2::Tag tag = ProtocolV2::Tag::IDENT_MISSING_FEATURES;
+ using SignedEncryptedFrame::SignedEncryptedFrame;
+
+ inline uint64_t &features() { return get_val<0>(); }
+};
+
+struct MessageFrame : public Frame<MessageFrame> {
+ const ProtocolV2::Tag tag = ProtocolV2::Tag::MESSAGE;
+ const unsigned int ASYNC_COALESCE_THRESHOLD = 256;
+
+ ceph_msg_header2 header2;
+
+ MessageFrame(Message *msg, bufferlist &data, uint64_t ack_seq,
+ bool calc_crc) {
+ ceph_msg_header &header = msg->get_header();
+ ceph_msg_footer &footer = msg->get_footer();
+
+ header2 = ceph_msg_header2{header.seq, header.tid,
+ header.type, header.priority,
+ header.version, header.front_len,
+ header.middle_len, 0,
+ header.data_len, header.data_off,
+ ack_seq, footer.front_crc,
+ footer.middle_crc, footer.data_crc,
+ footer.flags, header.compat_version,
+ header.reserved, 0};
+
+ if (calc_crc) {
+ header2.header_crc =
+ ceph_crc32c(0, (unsigned char *)&header2,
+ sizeof(header2) - sizeof(header2.header_crc));
+ }
+
+ payload.append((char *)&header2, sizeof(header2));
+ if ((data.length() <= ASYNC_COALESCE_THRESHOLD) &&
+ (data.buffers().size() > 1)) {
+ for (const auto &pb : data.buffers()) {
+ payload.append((char *)pb.c_str(), pb.length());
+ }
+ } else {
+ payload.claim_append(data);
+ }
+ }
+};
+
+struct KeepAliveFrame : public SignedEncryptedFrame<KeepAliveFrame, utime_t> {
+ const ProtocolV2::Tag tag = ProtocolV2::Tag::KEEPALIVE2;
+ using SignedEncryptedFrame::SignedEncryptedFrame;
+
+ KeepAliveFrame(ProtocolV2 *protocol)
+ : KeepAliveFrame(protocol, ceph_clock_now()) {}
+
+ inline utime_t ×tamp() { return get_val<0>(); }
+};
+
+struct AckFrame : public SignedEncryptedFrame<AckFrame, uint64_t> {
+ const ProtocolV2::Tag tag = ProtocolV2::Tag::ACK;
+ using SignedEncryptedFrame::SignedEncryptedFrame;
+
+ inline uint64_t &seq() { return get_val<0>(); }
+};
+
ProtocolV2::ProtocolV2(AsyncConnection *connection)
: Protocol(2, connection),
temp_buffer(nullptr),
peer_required_features(0),
authorizer(nullptr),
got_bad_auth(false),
+ got_bad_method(0),
+ auth_flags(0),
cookie(0),
+ global_seq(0),
connect_seq(0),
peer_global_seq(0),
message_seq(0),
replacing(false),
can_write(false),
bannerExchangeCallback(nullptr),
- next_frame_len(0),
+ next_payload_len(0),
keepalive(false) {
temp_buffer = new char[4096];
}
state = START_CONNECT;
got_bad_auth = false;
+ got_bad_method = 0;
if (authorizer) {
delete authorizer;
authorizer = nullptr;
}
- global_seq = messenger->get_global_seq();
}
void ProtocolV2::accept() { state = START_ACCEPT; }
(*p)->put();
}
sent.clear();
- for (map<int, list<pair<bufferlist, Message *> > >::iterator p =
+ for (map<int, list<pair<bufferlist, Message *>>>::iterator p =
out_queue.begin();
p != out_queue.end(); ++p) {
- for (list<pair<bufferlist, Message *> >::iterator r = p->second.begin();
+ for (list<pair<bufferlist, Message *>>::iterator r = p->second.begin();
r != p->second.end(); ++r) {
ldout(cct, 20) << __func__ << " discard " << r->second << dendl;
r->second->put();
return;
}
- list<pair<bufferlist, Message *> > &rq = out_queue[CEPH_MSG_PRIO_HIGHEST];
+ list<pair<bufferlist, Message *>> &rq = out_queue[CEPH_MSG_PRIO_HIGHEST];
out_seq -= sent.size();
while (!sent.empty()) {
Message *m = sent.back();
if (out_queue.count(CEPH_MSG_PRIO_HIGHEST) == 0) {
return seq;
}
- list<pair<bufferlist, Message *> > &rq = out_queue[CEPH_MSG_PRIO_HIGHEST];
+ list<pair<bufferlist, Message *>> &rq = out_queue[CEPH_MSG_PRIO_HIGHEST];
uint64_t count = out_seq;
while (!rq.empty()) {
pair<bufferlist, Message *> p = rq.front();
}
authorizer = nullptr;
got_bad_auth = false;
+ got_bad_method = 0;
}
// clean read and write callbacks
switch (state) {
case START_CONNECT:
- CONTINUATION_RUN(CONTINUATION(start_client_banner_exchange));
+ run_continuation(CONTINUATION(start_client_banner_exchange));
break;
case START_ACCEPT:
- CONTINUATION_RUN(CONTINUATION(start_server_banner_exchange));
+ run_continuation(CONTINUATION(start_server_banner_exchange));
break;
case READY:
- CONTINUATION_RUN(CONTINUATION(read_frame));
+ run_continuation(CONTINUATION(read_frame));
break;
case THROTTLE_MESSAGE:
- CONTINUATION_RUN(CONTINUATION(throttle_message));
+ run_continuation(CONTINUATION(throttle_message));
break;
case THROTTLE_BYTES:
- CONTINUATION_RUN(CONTINUATION(throttle_bytes));
+ run_continuation(CONTINUATION(throttle_bytes));
break;
case THROTTLE_DISPATCH_QUEUE:
- CONTINUATION_RUN(CONTINUATION(throttle_dispatch_queue));
+ run_continuation(CONTINUATION(throttle_dispatch_queue));
break;
default:
break;
Message *ProtocolV2::_get_next_outgoing(bufferlist *bl) {
Message *m = 0;
if (!out_queue.empty()) {
- map<int, list<pair<bufferlist, Message *> > >::reverse_iterator it =
+ map<int, list<pair<bufferlist, Message *>>>::reverse_iterator it =
out_queue.rbegin();
ceph_assert(!it->second.empty());
- list<pair<bufferlist, Message *> >::iterator p = it->second.begin();
+ list<pair<bufferlist, Message *>>::iterator p = it->second.begin();
m = p->second;
if (bl) {
bl->swap(p->first);
void ProtocolV2::append_keepalive() {
ldout(cct, 10) << __func__ << dendl;
- KeepAliveFrame keepalive_frame;
+ KeepAliveFrame keepalive_frame(this);
connection->outcoming_bl.claim_append(keepalive_frame.get_buffer());
}
void ProtocolV2::append_keepalive_ack(utime_t ×tamp) {
- struct ceph_timespec ts;
- timestamp.encode_timeval(&ts);
- KeepAliveFrame keepalive_ack_frame(ts);
+ KeepAliveFrame keepalive_ack_frame(this, timestamp);
connection->outcoming_bl.claim_append(keepalive_ack_frame.get_buffer());
}
if (left) {
ceph_le64 s;
s = in_seq;
- AckFrame ack(in_seq);
+ AckFrame ack(this, in_seq);
connection->outcoming_bl.claim_append(ack.get_buffer());
ldout(cct, 10) << __func__ << " try send msg ack, acked " << left
<< " messages" << dendl;
ssize_t r = connection->read(len, buffer,
[CONTINUATION(next), this](char *buffer, int r) {
CONTINUATION(next)->setParams(buffer, r);
- CONTINUATION_RUN(CONTINUATION(next));
+ run_continuation(CONTINUATION(next));
});
if (r <= 0) {
return CONTINUE(next, buffer, r);
return nullptr;
}
-CtPtr ProtocolV2::write(CONTINUATION_PARAM(next, ProtocolV2, int),
+CtPtr ProtocolV2::write(const std::string &desc,
+ CONTINUATION_PARAM(next, ProtocolV2),
bufferlist &buffer) {
- ssize_t r = connection->write(buffer, [CONTINUATION(next), this](int r) {
- CONTINUATION(next)->setParams(r);
- CONTINUATION_RUN(CONTINUATION(next));
- });
+ ssize_t r =
+ connection->write(buffer, [CONTINUATION(next), desc, this](int r) {
+ if (r < 0) {
+ ldout(cct, 1) << __func__ << " " << desc << " write failed r=" << r
+ << " (" << cpp_strerror(r) << ")" << dendl;
+ connection->inject_delay();
+ _fault();
+ }
+ run_continuation(CONTINUATION(next));
+ });
+
if (r <= 0) {
- return CONTINUE(next, r);
+ if (r < 0) {
+ ldout(cct, 1) << __func__ << " " << desc << " write failed r=" << r
+ << " (" << cpp_strerror(r) << ")" << dendl;
+ return _fault();
+ }
+ return CONTINUE(next);
}
return nullptr;
bufferlist bl;
bl.append(banner, banner_len);
- return WRITE(bl, _banner_exchange_handle_write);
+ return WRITE(bl, "banner", _wait_for_peer_banner);
}
-CtPtr ProtocolV2::_banner_exchange_handle_write(int r) {
- ldout(cct, 20) << __func__ << " r=" << r << dendl;
- if (r < 0) {
- ldout(cct, 1) << __func__ << " write banner failed r=" << r << " ("
- << cpp_strerror(r) << ")" << dendl;
- return _fault();
- }
-
+CtPtr ProtocolV2::_wait_for_peer_banner() {
unsigned banner_len =
strlen(CEPH_BANNER_V2_PREFIX) + sizeof(uint8_t) + 2 * sizeof(__le64);
return READ(banner_len, _banner_exchange_handle_peer_banner);
if (connection->get_peer_type() != peer_type) {
ldout(cct, 1) << __func__ << " connection peer type does not match what"
<< " peer advertises " << connection->get_peer_type()
- << " != " << peer_type << dendl;
+ << " != " << (int)peer_type << dendl;
stop();
connection->dispatch_queue->queue_reset(connection);
return nullptr;
return _fault();
}
- next_frame_len = *(uint32_t *)buffer - sizeof(uint32_t);
- next_tag = static_cast<Tag>(*(uint32_t *)(buffer + sizeof(uint32_t)));
+ bufferlist bl;
+ bl.push_back(buffer::create_static(sizeof(uint32_t) * 2, buffer));
+ try {
+ auto ti = bl.cbegin();
+ uint32_t frame_len;
+ decode(frame_len, ti);
+ next_payload_len = frame_len - sizeof(uint32_t);
+ uint32_t tag;
+ decode(tag, ti);
+ next_tag = static_cast<Tag>(tag);
+ } catch (const buffer::error &e) {
+ lderr(cct) << __func__ << " failed decoding of frame header: " << e
+ << dendl;
+ return _fault();
+ }
- ldout(cct, 10) << __func__ << " next frame_len=" << next_frame_len
+ ldout(cct, 10) << __func__ << " next payload_len=" << next_payload_len
<< " tag=" << static_cast<uint32_t>(next_tag) << dendl;
switch (next_tag) {
case Tag::KEEPALIVE2:
case Tag::KEEPALIVE2_ACK:
case Tag::ACK:
- return READ(next_frame_len, handle_frame_payload);
+ return READ(next_payload_len, handle_frame_payload);
case Tag::SESSION_RESET:
return handle_session_reset();
case Tag::WAIT:
return handle_wait();
case Tag::MESSAGE:
return handle_message();
+ default: {
+ lderr(cct) << __func__
+ << " received unknown tag=" << static_cast<uint32_t>(next_tag)
+ << dendl;
+ ceph_abort();
+ }
}
return nullptr;
switch (next_tag) {
case Tag::AUTH_REQUEST:
- return handle_auth_request(buffer, next_frame_len);
+ return handle_auth_request(buffer, next_payload_len);
case Tag::AUTH_BAD_METHOD:
- return handle_auth_bad_method(buffer, next_frame_len);
+ return handle_auth_bad_method(buffer, next_payload_len);
case Tag::AUTH_BAD_AUTH:
- return handle_auth_bad_auth(buffer, next_frame_len);
+ return handle_auth_bad_auth(buffer, next_payload_len);
case Tag::AUTH_MORE:
- return handle_auth_more(buffer, next_frame_len);
+ return handle_auth_more(buffer, next_payload_len);
case Tag::AUTH_DONE:
- return handle_auth_done(buffer, next_frame_len);
+ return handle_auth_done(buffer, next_payload_len);
case Tag::IDENT:
- return handle_ident(buffer, next_frame_len);
+ return handle_ident(buffer, next_payload_len);
case Tag::IDENT_MISSING_FEATURES:
- return handle_ident_missing_features(buffer, next_frame_len);
+ return handle_ident_missing_features(buffer, next_payload_len);
case Tag::SESSION_RECONNECT:
- return handle_reconnect(buffer, next_frame_len);
+ return handle_reconnect(buffer, next_payload_len);
case Tag::SESSION_RETRY:
- return handle_session_retry(buffer, next_frame_len);
+ return handle_session_retry(buffer, next_payload_len);
case Tag::SESSION_RETRY_GLOBAL:
- return handle_session_retry_global(buffer, next_frame_len);
+ return handle_session_retry_global(buffer, next_payload_len);
case Tag::SESSION_RECONNECT_OK:
- return handle_reconnect_ok(buffer, next_frame_len);
+ return handle_reconnect_ok(buffer, next_payload_len);
case Tag::KEEPALIVE2:
- return handle_keepalive2(buffer, next_frame_len);
+ return handle_keepalive2(buffer, next_payload_len);
case Tag::KEEPALIVE2_ACK:
- return handle_keepalive2_ack(buffer, next_frame_len);
+ return handle_keepalive2_ack(buffer, next_payload_len);
case Tag::ACK:
- return handle_message_ack(buffer, next_frame_len);
+ return handle_message_ack(buffer, next_payload_len);
default:
ceph_abort();
}
AuthMoreFrame auth_more(payload, length);
ldout(cct, 1) << __func__
- << " auth more len=" << auth_more.auth_payload.length()
+ << " auth more len=" << auth_more.auth_payload().length()
<< dendl;
if (state == CONNECTING) {
ldout(cct, 10) << __func__ << " connect got auth challenge" << dendl;
-
- ceph_assert(authorizer);
- authorizer->add_challenge(cct, auth_more.auth_payload);
- AuthMoreFrame more_reply(authorizer->bl);
- return WRITE(more_reply.get_buffer(), handle_auth_more_write);
-
+ if (auth_method == CEPH_AUTH_CEPHX) {
+ ceph_assert(authorizer);
+ authorizer->add_challenge(cct, auth_more.auth_payload());
+ AuthMoreFrame more_reply(authorizer->bl);
+ return WRITE(more_reply.get_buffer(), "auth more", read_frame);
+ } else {
+ ceph_abort("Auth method %d not implemented", auth_method);
+ }
} else if (state == ACCEPTING) {
- connection->lock.unlock();
- bool authorizer_valid;
- bufferlist authorizer_reply;
- ceph_assert((bool)authorizer_challenge);
- if (!messenger->ms_deliver_verify_authorizer(
- connection, connection->peer_type, auth_method,
- auth_more.auth_payload, authorizer_reply, authorizer_valid,
- session_key, &authorizer_challenge) ||
- !authorizer_valid) {
- connection->lock.lock();
-
- ldout(cct, 0) << __func__ << " got bad authorizer, auth_reply_len="
- << authorizer_reply.length() << dendl;
- session_security.reset();
- AuthBadAuthFrame bad_auth(EPERM, "Bad Authorizer");
- bufferlist &bl = bad_auth.get_buffer();
- return WRITE(bl, handle_auth_bad_auth_write);
+ if (auth_method == CEPH_AUTH_CEPHX) {
+ return handle_cephx_auth(auth_more.auth_payload());
+ } else {
+ ceph_abort("Auth method %d not implemented", auth_method);
}
-
- connection->lock.lock();
-
- session_security.reset(get_auth_session_handler(
- cct, auth_method, session_key,
- CEPH_FEATURE_MSG_AUTH | CEPH_FEATURE_CEPHX_V2));
-
- AuthDoneFrame auth_done(0, authorizer_reply);
- bufferlist &bl = auth_done.get_buffer();
- return WRITE(bl, handle_auth_done_write);
} else {
ceph_abort();
}
return nullptr;
}
-CtPtr ProtocolV2::handle_auth_more_write(int r) {
- ldout(cct, 20) << __func__ << " r=" << r << dendl;
-
- if (r < 0) {
- ldout(cct, 1) << __func__ << " auth more write failed r=" << r << " ("
- << cpp_strerror(r) << ")" << dendl;
- return _fault();
- }
-
- return CONTINUE(read_frame);
-}
-
CtPtr ProtocolV2::handle_ident(char *payload, uint32_t length) {
if (state == CONNECTING) {
return handle_server_ident(payload, length);
if (data_len) {
// get a buffer
- map<ceph_tid_t, pair<bufferlist, int> >::iterator p =
+ map<ceph_tid_t, pair<bufferlist, int>>::iterator p =
connection->rx_buffers.find(current_header.tid);
if (p != connection->rx_buffers.end()) {
ldout(cct, 10) << __func__ << " seleting rx buffer v " << p->second.second
CtPtr ProtocolV2::handle_keepalive2(char *payload, uint32_t length) {
ldout(cct, 20) << __func__ << " payload_len=" << length << dendl;
- KeepAliveFrame keepalive_frame(payload, length);
+ KeepAliveFrame keepalive_frame(this, payload, length);
ldout(cct, 30) << __func__ << " got KEEPALIVE2 tag ..." << dendl;
- utime_t kp_t = utime_t(keepalive_frame.timestamp);
connection->write_lock.lock();
- append_keepalive_ack(kp_t);
+ append_keepalive_ack(keepalive_frame.timestamp());
connection->write_lock.unlock();
- ldout(cct, 20) << __func__ << " got KEEPALIVE2 " << kp_t << dendl;
+ ldout(cct, 20) << __func__ << " got KEEPALIVE2 "
+ << keepalive_frame.timestamp() << dendl;
connection->set_last_keepalive(ceph_clock_now());
if (is_connected()) {
CtPtr ProtocolV2::handle_keepalive2_ack(char *payload, uint32_t length) {
ldout(cct, 20) << __func__ << " payload_len=" << length << dendl;
- KeepAliveFrame keepalive_ack_frame(payload, length);
- connection->set_last_keepalive_ack(utime_t(keepalive_ack_frame.timestamp));
+ KeepAliveFrame keepalive_ack_frame(this, payload, length);
+ connection->set_last_keepalive_ack(keepalive_ack_frame.timestamp());
ldout(cct, 20) << __func__ << " got KEEPALIVE_ACK" << dendl;
return CONTINUE(read_frame);
CtPtr ProtocolV2::handle_message_ack(char *payload, uint32_t length) {
ldout(cct, 20) << __func__ << " payload_len=" << length << dendl;
- AckFrame ack(payload, length);
- handle_message_ack(ack.seq);
+ AckFrame ack(this, payload, length);
+ handle_message_ack(ack.seq());
return CONTINUE(read_frame);
}
ldout(cct, 20) << __func__ << dendl;
state = CONNECTING;
+ global_seq = messenger->get_global_seq();
+
return _banner_exchange(CONTINUATION(post_client_banner_exchange));
}
return send_auth_request();
}
-CtPtr ProtocolV2::send_auth_request(std::vector<uint32_t> allowed_methods) {
+CtPtr ProtocolV2::send_auth_request(std::vector<uint32_t> &allowed_methods) {
ldout(cct, 20) << __func__ << dendl;
if (!authorizer) {
<< connection->peer_type << dendl;
AuthRequestFrame authFrame(auth_method);
bufferlist &bl = authFrame.get_buffer();
- return WRITE(bl, handle_auth_request_write);
+ return WRITE(bl, "auth request", read_frame);
}
auth_method = authorizer->protocol;
AuthRequestFrame authFrame(auth_method, authorizer->bl);
bufferlist &bl = authFrame.get_buffer();
- return WRITE(bl, handle_auth_request_write);
-}
-
-CtPtr ProtocolV2::handle_auth_request_write(int r) {
- ldout(cct, 20) << __func__ << " r=" << r << dendl;
-
- if (r < 0) {
- ldout(cct, 1) << __func__ << " auth request write failed r=" << r << " ("
- << cpp_strerror(r) << ")" << dendl;
- return _fault();
- }
-
- return CONTINUE(read_frame);
+ return WRITE(bl, "auth request", read_frame);
}
CtPtr ProtocolV2::handle_auth_bad_method(char *payload, uint32_t length) {
ldout(cct, 20) << __func__ << " payload_len=" << length << dendl;
AuthBadMethodFrame bad_method(payload, length);
- ldout(cct, 1) << __func__ << " auth method=" << bad_method.method
- << " rejected, allowed methods=" << bad_method.allowed_methods
+ ldout(cct, 1) << __func__ << " auth method=" << bad_method.method()
+ << " rejected, allowed methods=" << bad_method.allowed_methods()
<< dendl;
- return send_auth_request(bad_method.allowed_methods);
+ if (got_bad_method == bad_method.allowed_methods().size()) {
+ ldout(cct, 1) << __func__ << " too many attempts, closing connection"
+ << dendl;
+ return _fault();
+ }
+ got_bad_method++;
+
+ return send_auth_request(bad_method.allowed_methods());
}
CtPtr ProtocolV2::handle_auth_bad_auth(char *payload, uint32_t length) {
AuthBadAuthFrame bad_auth(payload, length);
ldout(cct, 1) << __func__ << " authentication failed"
- << " error code=" << bad_auth.error_code
- << " error message=" << bad_auth.error_msg << dendl;
+ << " error code=" << bad_auth.error_code()
+ << " error message=" << bad_auth.error_msg() << dendl;
if (got_bad_auth) {
+ ldout(cct, 1) << __func__ << " too many attempts, closing connection"
+ << dendl;
return _fault();
}
AuthRequestFrame authFrame(auth_method, authorizer->bl);
bufferlist &bl = authFrame.get_buffer();
- return WRITE(bl, handle_auth_request_write);
+ return WRITE(bl, "auth request", read_frame);
}
CtPtr ProtocolV2::handle_auth_done(char *payload, uint32_t length) {
AuthDoneFrame auth_done(payload, length);
if (authorizer) {
- auto iter = auth_done.auth_payload.cbegin();
+ auto iter = auth_done.auth_payload().cbegin();
if (!authorizer->verify_reply(iter)) {
ldout(cct, 0) << __func__ << " failed verifying authorize reply" << dendl;
return _fault();
}
ldout(cct, 1) << __func__ << " authentication done,"
- << " flags=" << auth_done.flags << dendl;
+ << " flags=" << std::hex << auth_done.flags() << std::dec
+ << dendl;
if (authorizer) {
ldout(cct, 10) << __func__ << " setting up session_security with auth "
session_security.reset();
}
+ auth_flags = auth_done.flags();
+
if (!cookie) {
ceph_assert(connect_seq == 0);
return send_client_ident();
flags |= CEPH_MSG_CONNECT_LOSSY;
}
- ClientIdentFrame client_ident(messenger->get_myaddrs(),
+ ClientIdentFrame client_ident(this, messenger->get_myaddrs(),
messenger->get_myname().num(), global_seq,
connection->policy.features_supported,
connection->policy.features_required, flags);
<< " flags=" << flags << std::dec << dendl;
bufferlist &bl = client_ident.get_buffer();
- return WRITE(bl, handle_client_ident_write);
-}
-
-CtPtr ProtocolV2::handle_client_ident_write(int r) {
- ldout(cct, 20) << __func__ << " r=" << r << dendl;
-
- if (r < 0) {
- ldout(cct, 1) << __func__ << " client ident write failed r=" << r << " ("
- << cpp_strerror(r) << ")" << dendl;
- return _fault();
- }
-
- return CONTINUE(read_frame);
+ return WRITE(bl, "client ident", read_frame);
}
CtPtr ProtocolV2::send_reconnect() {
ldout(cct, 20) << __func__ << dendl;
- ReconnectFrame reconnect(messenger->get_myaddrs(), cookie, global_seq,
+ ReconnectFrame reconnect(this, messenger->get_myaddrs(), cookie, global_seq,
connect_seq, in_seq);
ldout(cct, 5) << __func__ << " reconnect to session: cookie=" << cookie
<< " gs=" << global_seq << " cs=" << connect_seq
<< " ms=" << in_seq << dendl;
bufferlist &bl = reconnect.get_buffer();
- return WRITE(bl, handle_reconnect_write);
-}
-
-CtPtr ProtocolV2::handle_reconnect_write(int r) {
- ldout(cct, 20) << __func__ << " r=" << r << dendl;
-
- if (r < 0) {
- ldout(cct, 1) << __func__ << " reconnect write failed r=" << r << " ("
- << cpp_strerror(r) << ")" << dendl;
- return _fault();
- }
-
- return CONTINUE(read_frame);
+ return WRITE(bl, "reconnect", read_frame);
}
CtPtr ProtocolV2::handle_ident_missing_features(char *payload,
uint32_t length) {
ldout(cct, 20) << __func__ << " payload_len=" << length << dendl;
- IdentMissingFeaturesFrame ident_missing(payload, length);
+ IdentMissingFeaturesFrame ident_missing(this, payload, length);
lderr(cct) << __func__
<< " client does not support all server features: " << std::hex
- << ident_missing.features << std::dec << dendl;
+ << ident_missing.features() << std::dec << dendl;
return _fault();
}
CtPtr ProtocolV2::handle_session_retry(char *payload, uint32_t length) {
ldout(cct, 20) << __func__ << " payload_len=" << length << dendl;
- RetryFrame retry(payload, length);
- connect_seq = retry.connect_seq + 1;
+ RetryFrame retry(this, payload, length);
+ connect_seq = retry.connect_seq() + 1;
ldout(cct, 1) << __func__
- << " received session retry connect_seq=" << retry.connect_seq
+ << " received session retry connect_seq=" << retry.connect_seq()
<< ", inc to cs=" << connect_seq << dendl;
return send_reconnect();
CtPtr ProtocolV2::handle_session_retry_global(char *payload, uint32_t length) {
ldout(cct, 20) << __func__ << " payload_len=" << length << dendl;
- RetryGlobalFrame retry(payload, length);
- global_seq = messenger->get_global_seq(retry.global_seq);
+ RetryGlobalFrame retry(this, payload, length);
+ global_seq = messenger->get_global_seq(retry.global_seq());
ldout(cct, 1) << __func__ << " received session retry global global_seq="
- << retry.global_seq << ", choose new gs=" << global_seq
+ << retry.global_seq() << ", choose new gs=" << global_seq
<< dendl;
return send_reconnect();
CtPtr ProtocolV2::handle_reconnect_ok(char *payload, uint32_t length) {
ldout(cct, 20) << __func__ << " payload_len=" << length << dendl;
- ReconnectOkFrame reconnect_ok(payload, length);
+ ReconnectOkFrame reconnect_ok(this, payload, length);
ldout(cct, 5) << __func__
- << " reconnect accepted: sms=" << reconnect_ok.msg_seq << dendl;
+ << " reconnect accepted: sms=" << reconnect_ok.msg_seq()
+ << dendl;
- out_seq = discard_requeued_up_to(out_seq, reconnect_ok.msg_seq);
+ out_seq = discard_requeued_up_to(out_seq, reconnect_ok.msg_seq());
backoff = utime_t();
ldout(cct, 10) << __func__ << " reconnect success " << connect_seq
CtPtr ProtocolV2::handle_server_ident(char *payload, uint32_t length) {
ldout(cct, 20) << __func__ << " payload_len=" << length << dendl;
- ServerIdentFrame server_ident(payload, length);
+ ServerIdentFrame server_ident(this, payload, length);
ldout(cct, 5) << __func__ << " received server identification: "
- << "addrs=" << server_ident.addrs << " gid=" << server_ident.gid
- << " global_seq=" << server_ident.global_seq
+ << "addrs=" << server_ident.addrs()
+ << " gid=" << server_ident.gid()
+ << " global_seq=" << server_ident.global_seq()
<< " features_supported=" << std::hex
- << server_ident.supported_features
- << " features_required=" << server_ident.required_features
- << " flags=" << server_ident.flags << " cookie=" << std::dec
- << server_ident.cookie << dendl;
+ << server_ident.supported_features()
+ << " features_required=" << server_ident.required_features()
+ << " flags=" << server_ident.flags() << " cookie=" << std::dec
+ << server_ident.cookie() << dendl;
- cookie = server_ident.cookie;
+ cookie = server_ident.cookie();
- connection->set_peer_addrs(server_ident.addrs);
- connection->peer_global_id = server_ident.gid;
- connection->set_features(server_ident.supported_features &
+ connection->set_peer_addrs(server_ident.addrs());
+ connection->peer_global_id = server_ident.gid();
+ connection->set_features(server_ident.supported_features() &
connection->policy.features_supported);
- peer_global_seq = server_ident.global_seq;
+ peer_global_seq = server_ident.global_seq();
- connection->policy.lossy = server_ident.flags & CEPH_MSG_CONNECT_LOSSY;
+ connection->policy.lossy = server_ident.flags() & CEPH_MSG_CONNECT_LOSSY;
backoff = utime_t();
ldout(cct, 10) << __func__ << " connect success " << connect_seq
return CONTINUE(read_frame);
}
-CtPtr ProtocolV2::handle_auth_request(char *payload, uint32_t length) {
- ldout(cct, 20) << __func__ << " payload_len=" << length << dendl;
-
- AuthRequestFrame auth_request(payload, length);
-
- ldout(cct, 10) << __func__ << " AuthRequest(method=" << auth_request.method
- << ", auth_len=" << auth_request.len << ")" << dendl;
-
- std::vector<uint32_t> allowed_methods;
- messenger->ms_deliver_get_auth_allowed_methods(connection->peer_type,
- allowed_methods);
-
- bool found = std::find(allowed_methods.begin(), allowed_methods.end(),
- auth_request.method) != allowed_methods.end();
- if (!found) {
- ldout(cct, 1) << __func__ << " auth method=" << auth_request.method
- << " not allowed" << dendl;
- AuthBadMethodFrame bad_method(auth_request.method, allowed_methods);
- bufferlist &bl = bad_method.get_buffer();
- return WRITE(bl, handle_auth_bad_method_write);
- }
-
- ldout(cct, 10) << __func__ << " auth method=" << auth_request.method
- << " accepted" << dendl;
-
- auth_method = auth_request.method;
+CtPtr ProtocolV2::handle_cephx_auth(bufferlist &auth_payload) {
+ ldout(cct, 20) << __func__ << dendl;
- if (auth_request.method == CEPH_AUTH_NONE) {
- ldout(cct, 1) << __func__ << " proceeding without authentication" << dendl;
+ ceph_assert(auth_method == CEPH_AUTH_CEPHX);
- session_security.reset();
- bufferlist empty_bl;
- AuthDoneFrame auth_done(0, empty_bl);
- bufferlist &bl = auth_done.get_buffer();
- return WRITE(bl, handle_auth_done_write);
- }
+ ldout(cct, 15) << __func__
+ << " authorizer payload len=" << auth_payload.length()
+ << dendl;
- connection->lock.unlock();
bool authorizer_valid;
bufferlist authorizer_reply;
bool had_challenge = (bool)authorizer_challenge;
+
+ connection->lock.unlock();
if (!messenger->ms_deliver_verify_authorizer(
- connection, connection->peer_type, auth_request.method,
- auth_request.auth_payload, authorizer_reply, authorizer_valid,
- session_key, &authorizer_challenge) ||
+ connection, connection->peer_type, auth_method, auth_payload,
+ authorizer_reply, authorizer_valid, session_key,
+ &authorizer_challenge) ||
!authorizer_valid) {
connection->lock.lock();
ldout(cct, 10) << __func__ << " challenging authorizer" << dendl;
ceph_assert(authorizer_reply.length());
AuthMoreFrame more(authorizer_reply);
- return WRITE(more.get_buffer(), handle_auth_more_write);
+ return WRITE(more.get_buffer(), "auth more", read_frame);
} else {
ldout(cct, 0) << __func__ << " got bad authorizer, auth_reply_len="
<< authorizer_reply.length() << dendl;
session_security.reset();
AuthBadAuthFrame bad_auth(EPERM, "Bad Authorizer");
- bufferlist &bl = bad_auth.get_buffer();
- return WRITE(bl, handle_auth_bad_auth_write);
+ return WRITE(bad_auth.get_buffer(), "bad auth", read_frame);
}
}
get_auth_session_handler(cct, auth_method, session_key,
CEPH_FEATURE_MSG_AUTH | CEPH_FEATURE_CEPHX_V2));
- AuthDoneFrame auth_done(0, authorizer_reply);
- bufferlist &bl = auth_done.get_buffer();
- return WRITE(bl, handle_auth_done_write);
+ if (cct->_conf.get_val<bool>("ms_msgr2_sign_messages")) {
+ auth_flags |= static_cast<uint64_t>(AuthFlag::SIGNED);
+ }
+ if (cct->_conf.get_val<bool>("ms_msgr2_encrypt_messages")) {
+ auth_flags |= static_cast<uint64_t>(AuthFlag::ENCRYPTED);
+ }
+
+ ldout(cct, 1) << __func__ << " authentication done,"
+ << " flags=" << std::hex << auth_flags << std::dec << dendl;
+
+ AuthDoneFrame auth_done(auth_flags, authorizer_reply);
+ return WRITE(auth_done.get_buffer(), "auth done", read_frame);
}
-CtPtr ProtocolV2::handle_auth_bad_method_write(int r) {
- ldout(cct, 20) << __func__ << " r=" << r << dendl;
+CtPtr ProtocolV2::handle_auth_request(char *payload, uint32_t length) {
+ ldout(cct, 20) << __func__ << " payload_len=" << length << dendl;
- if (r < 0) {
- ldout(cct, 1) << __func__ << " auth bad method write failed r=" << r << " ("
- << cpp_strerror(r) << ")" << dendl;
- return _fault();
- }
+ AuthRequestFrame auth_request(payload, length);
- return CONTINUE(read_frame);
-}
+ ldout(cct, 10) << __func__ << " AuthRequest(method=" << auth_request.method()
+ << ", auth_len=" << auth_request.auth_payload().length() << ")"
+ << dendl;
-CtPtr ProtocolV2::handle_auth_bad_auth_write(int r) {
- ldout(cct, 20) << __func__ << " r=" << r << dendl;
+ std::vector<uint32_t> allowed_methods;
+ messenger->ms_deliver_get_auth_allowed_methods(connection->peer_type,
+ allowed_methods);
- if (r < 0) {
- ldout(cct, 1) << __func__ << " auth bad auth write failed r=" << r << " ("
- << cpp_strerror(r) << ")" << dendl;
- return _fault();
+ bool found = std::find(allowed_methods.begin(), allowed_methods.end(),
+ auth_request.method()) != allowed_methods.end();
+ if (!found) {
+ ldout(cct, 1) << __func__ << " auth method=" << auth_request.method()
+ << " not allowed" << dendl;
+ AuthBadMethodFrame bad_method(auth_request.method(), allowed_methods);
+ bufferlist &bl = bad_method.get_buffer();
+ return WRITE(bl, "bad auth method", read_frame);
}
- return CONTINUE(read_frame);
-}
+ ldout(cct, 10) << __func__ << " auth method=" << auth_request.method()
+ << " accepted" << dendl;
-CtPtr ProtocolV2::handle_auth_done_write(int r) {
- ldout(cct, 20) << __func__ << " r=" << r << dendl;
+ auth_method = auth_request.method();
- if (r < 0) {
- ldout(cct, 1) << __func__ << " auth done write failed r=" << r << " ("
- << cpp_strerror(r) << ")" << dendl;
- return _fault();
+ if (auth_method == CEPH_AUTH_NONE) {
+ ldout(cct, 1) << __func__ << " proceeding without authentication" << dendl;
+
+ session_security.reset();
+ bufferlist empty_bl;
+ AuthDoneFrame auth_done(0, empty_bl);
+ return WRITE(auth_done.get_buffer(), "auth done", read_frame);
}
- return CONTINUE(read_frame);
+ if (auth_method == CEPH_AUTH_CEPHX) {
+ return handle_cephx_auth(auth_request.auth_payload());
+ }
+
+ lderr(cct) << __func__ << " auth method " << auth_method << " not implemented"
+ << dendl;
+ ceph_abort();
+ return nullptr;
}
CtPtr ProtocolV2::handle_client_ident(char *payload, uint32_t length) {
ldout(cct, 20) << __func__ << " payload_len=" << std::dec << length << dendl;
- ClientIdentFrame client_ident(payload, length);
+ ClientIdentFrame client_ident(this, payload, length);
ldout(cct, 5) << __func__ << " received client identification: "
- << "addrs=" << client_ident.addrs << " gid=" << client_ident.gid
- << " global_seq=" << client_ident.global_seq
+ << "addrs=" << client_ident.addrs()
+ << " gid=" << client_ident.gid()
+ << " global_seq=" << client_ident.global_seq()
<< " features_supported=" << std::hex
- << client_ident.supported_features
- << " features_required=" << client_ident.required_features
- << " flags=" << client_ident.flags << std::dec << dendl;
+ << client_ident.supported_features()
+ << " features_required=" << client_ident.required_features()
+ << " flags=" << client_ident.flags() << std::dec << dendl;
- if (client_ident.addrs.empty()) {
+ if (client_ident.addrs().empty()) {
connection->set_peer_addr(connection->target_addr);
} else {
// Should we check if one of the ident.addrs match connection->target_addr
// as we do in ProtocolV1?
- connection->set_peer_addrs(client_ident.addrs);
- connection->target_addr = client_ident.addrs.msgr2_addr();
+ connection->set_peer_addrs(client_ident.addrs());
+ connection->target_addr = client_ident.addrs().msgr2_addr();
}
uint64_t feat_missing = connection->policy.features_required &
- ~(uint64_t)client_ident.supported_features;
+ ~(uint64_t)client_ident.supported_features();
if (feat_missing) {
ldout(cct, 1) << __func__ << " peer missing required features " << std::hex
<< feat_missing << std::dec << dendl;
- IdentMissingFeaturesFrame ident_missing_features(feat_missing);
+ IdentMissingFeaturesFrame ident_missing_features(this, feat_missing);
bufferlist &bl = ident_missing_features.get_buffer();
- return WRITE(bl, handle_ident_missing_features_write);
+ return WRITE(bl, "ident missing features", read_frame);
}
connection_features =
- client_ident.supported_features & connection->policy.features_supported;
+ client_ident.supported_features() & connection->policy.features_supported;
state = ACCEPTING_SESSION;
- peer_global_seq = client_ident.global_seq;
+ peer_global_seq = client_ident.global_seq();
// Looks good so far, let's check if there is already an existing connection
// to this peer.
return send_server_ident();
}
-CtPtr ProtocolV2::handle_ident_missing_features_write(int r) {
- ldout(cct, 20) << __func__ << " r=" << r << dendl;
-
- if (r < 0) {
- ldout(cct, 1) << __func__ << " ident missing features write failed r=" << r
- << " (" << cpp_strerror(r) << ")" << dendl;
- return _fault();
- }
-
- return CONTINUE(read_frame);
-}
-
CtPtr ProtocolV2::handle_reconnect(char *payload, uint32_t length) {
ldout(cct, 20) << __func__ << " payload_len=" << std::dec << length << dendl;
- ReconnectFrame reconnect(payload, length);
+ ReconnectFrame reconnect(this, payload, length);
ldout(cct, 5) << __func__
- << " received reconnect: cookie=" << reconnect.cookie
- << " gs=" << reconnect.global_seq
- << " cs=" << reconnect.connect_seq
- << " ms=" << reconnect.msg_seq << dendl;
+ << " received reconnect: cookie=" << reconnect.cookie()
+ << " gs=" << reconnect.global_seq()
+ << " cs=" << reconnect.connect_seq()
+ << " ms=" << reconnect.msg_seq() << dendl;
- if (reconnect.addrs.empty()) {
+ if (reconnect.addrs().empty()) {
connection->set_peer_addr(connection->target_addr);
} else {
// Should we check if one of the ident.addrs match connection->target_addr
// as we do in ProtocolV1?
- connection->set_peer_addrs(reconnect.addrs);
- connection->target_addr = reconnect.addrs.msgr2_addr();
+ connection->set_peer_addrs(reconnect.addrs());
+ connection->target_addr = reconnect.addrs().msgr2_addr();
}
connection->lock.unlock();
// session
ldout(cct, 0) << __func__
<< " no existing connection exists, reseting client" << dendl;
- return WRITE(bl, handle_session_reset_write);
+ return WRITE(bl, "session reset", read_frame);
}
std::lock_guard<std::mutex> l(existing->lock);
if (exproto->state == CLOSED) {
ldout(cct, 5) << __func__ << " existing " << existing
<< " already closed. Reseting client" << dendl;
- return WRITE(bl, handle_session_reset_write);
+ return WRITE(bl, "session reset", read_frame);
}
if (exproto->replacing) {
ldout(cct, 1) << __func__
<< " existing racing replace happened while replacing."
<< " existing=" << existing << dendl;
- RetryGlobalFrame retry(exproto->peer_global_seq);
+ RetryGlobalFrame retry(this, exproto->peer_global_seq);
bufferlist &bl = retry.get_buffer();
- return WRITE(bl, handle_session_retry_write);
+ return WRITE(bl, "session retry", read_frame);
}
if (!exproto->cookie) {
// server connection was reseted, reset client
ldout(cct, 5) << __func__ << " no cookie set, reseting client" << dendl;
- return WRITE(bl, handle_session_reset_write);
- } else if (exproto->cookie != reconnect.cookie) {
+ return WRITE(bl, "session reset", read_frame);
+ } else if (exproto->cookie != reconnect.cookie()) {
ldout(cct, 5) << __func__ << " cookie mismatch sc=" << exproto->cookie
- << " cc=" << reconnect.cookie << ", reseting client" << dendl;
- return WRITE(bl, handle_session_reset_write);
+ << " cc=" << reconnect.cookie() << ", reseting client"
+ << dendl;
+ return WRITE(bl, "session reset", read_frame);
}
- if (exproto->peer_global_seq > reconnect.global_seq) {
+ if (exproto->peer_global_seq > reconnect.global_seq()) {
ldout(cct, 5) << __func__
<< " stale global_seq: sgs=" << exproto->peer_global_seq
- << " cgs=" << reconnect.global_seq
+ << " cgs=" << reconnect.global_seq()
<< ", ask client to retry global" << dendl;
- RetryGlobalFrame retry(exproto->peer_global_seq);
+ RetryGlobalFrame retry(this, exproto->peer_global_seq);
bufferlist &bl = retry.get_buffer();
- return WRITE(bl, handle_session_retry_write);
+ return WRITE(bl, "session retry", read_frame);
}
- if (exproto->connect_seq >= reconnect.connect_seq) {
+ if (exproto->connect_seq >= reconnect.connect_seq()) {
ldout(cct, 5) << __func__
<< " stale connect_seq scs=" << exproto->connect_seq
- << " ccs=" << reconnect.connect_seq
+ << " ccs=" << reconnect.connect_seq()
<< " , ask client to retry" << dendl;
- RetryFrame retry(exproto->connect_seq);
+ RetryFrame retry(this, exproto->connect_seq);
bufferlist &bl = retry.get_buffer();
- return WRITE(bl, handle_session_retry_write);
+ return WRITE(bl, "session retry", read_frame);
}
// everything looks good
- exproto->connect_seq = reconnect.connect_seq;
- exproto->message_seq = reconnect.msg_seq;
+ exproto->connect_seq = reconnect.connect_seq();
+ exproto->message_seq = reconnect.msg_seq();
return reuse_connection(existing, exproto, true);
}
-CtPtr ProtocolV2::handle_session_reset_write(int r) {
- ldout(cct, 20) << __func__ << " r=" << r << dendl;
-
- if (r < 0) {
- ldout(cct, 1) << __func__ << " session reset write failed r=" << r << " ("
- << cpp_strerror(r) << ")" << dendl;
- return _fault();
- }
-
- return CONTINUE(read_frame);
-}
-
-CtPtr ProtocolV2::handle_session_retry_write(int r) {
- ldout(cct, 20) << __func__ << " r=" << r << dendl;
-
- if (r < 0) {
- ldout(cct, 1) << __func__ << " session retry write failed r=" << r << " ("
- << cpp_strerror(r) << ")" << dendl;
- return _fault();
- }
-
- return CONTINUE(read_frame);
-}
-
CtPtr ProtocolV2::handle_existing_connection(AsyncConnectionRef existing) {
ldout(cct, 20) << __func__ << " existing=" << existing << dendl;
<< " existing=" << existing << dendl;
WaitFrame wait;
bufferlist &bl = wait.get_buffer();
- return WRITE(bl, handle_wait_write);
+ return WRITE(bl, "wait", read_frame);
}
if (existing->policy.lossy) {
messenger->get_myaddrs().msgr2_addr());
WaitFrame wait;
bufferlist &bl = wait.get_buffer();
- return WRITE(bl, handle_wait_write);
- }
-}
-
-CtPtr ProtocolV2::handle_wait_write(int r) {
- ldout(cct, 20) << __func__ << " r=" << r << dendl;
-
- if (r < 0) {
- ldout(cct, 1) << __func__ << " wait write failed r=" << r << " ("
- << cpp_strerror(r) << ")" << dendl;
- return _fault();
+ return WRITE(bl, "wait", read_frame);
}
-
- return CONTINUE(read_frame);
}
CtPtr ProtocolV2::reuse_connection(AsyncConnectionRef existing,
<< dendl;
exproto->can_write = false;
exproto->replacing = true;
+ exproto->session_security = session_security;
+ exproto->auth_method = auth_method;
+ exproto->session_key = session_key;
+ exproto->authorizer_challenge = std::move(authorizer_challenge);
existing->state_offset = 0;
// avoid previous thread modify event
exproto->state = NONE;
existing->center->create_file_event(existing->cs.fd(), EVENT_READABLE,
existing->read_handler);
if (!reconnect) {
- CONTINUATION_RUN2(exproto, exproto->send_server_ident())
+ exproto->run_continuation(exproto->send_server_ident());
} else {
- CONTINUATION_RUN2(exproto, exproto->send_reconnect_ok())
+ exproto->run_continuation(exproto->send_reconnect_ok());
}
};
if (existing->center->in_thread())
}
uint64_t gs = messenger->get_global_seq();
- ServerIdentFrame server_ident(
+ ServerIdentFrame server_ident(this,
messenger->get_myaddrs(), messenger->get_myname().num(), gs,
connection->policy.features_supported,
connection->policy.features_required, flags, cookie);
messenger->ms_deliver_handle_fast_accept(connection);
bufferlist &bl = server_ident.get_buffer();
- return WRITE(bl, handle_server_ident_write);
+ return WRITE(bl, "server ident", server_ready);
}
-CtPtr ProtocolV2::handle_server_ident_write(int r) {
- ldout(cct, 20) << __func__ << " r=" << r << dendl;
-
- if (r < 0) {
- ldout(cct, 1) << __func__ << " server ident write failed r=" << r << " ("
- << cpp_strerror(r) << ")" << dendl;
- connection->inject_delay();
- return _fault();
- }
+CtPtr ProtocolV2::server_ready() {
+ ldout(cct, 20) << __func__ << dendl;
if (connection->delay_state) {
ceph_assert(connection->delay_state->ready());
out_seq = discard_requeued_up_to(out_seq, message_seq);
uint64_t ms = in_seq;
- ReconnectOkFrame reconnect_ok(ms);
+ ReconnectOkFrame reconnect_ok(this, ms);
ldout(cct, 5) << __func__ << " sending reconnect_ok: msg_seq=" << ms << dendl;
messenger->ms_deliver_handle_fast_accept(connection);
bufferlist &bl = reconnect_ok.get_buffer();
- return WRITE(bl, handle_reconnect_ok_write);
-}
-
-CtPtr ProtocolV2::handle_reconnect_ok_write(int r) {
- ldout(cct, 20) << __func__ << " r=" << r << dendl;
-
- if (r < 0) {
- ldout(cct, 1) << __func__ << " reconnect ok write failed r=" << r << " ("
- << cpp_strerror(r) << ")" << dendl;
- connection->inject_delay();
- return _fault();
- }
-
- if (connection->delay_state) {
- ceph_assert(connection->delay_state->ready());
- }
-
- return ready();
+ return WRITE(bl, "reconnect ok", server_ready);
}
return statenames[state];
}
+public:
enum class Tag : uint32_t {
- AUTH_REQUEST,
+ AUTH_REQUEST = 1,
AUTH_BAD_METHOD,
AUTH_BAD_AUTH,
AUTH_MORE,
ACK
};
- struct Frame {
- uint32_t tag;
- bufferlist payload;
- bufferlist frame_buffer;
-
- Frame(Tag tag) : tag(static_cast<uint32_t>(tag)) {
- encode(this->tag, payload, 0);
- }
-
- Frame() {}
-
- bufferlist &get_buffer() {
- if (frame_buffer.length()) {
- return frame_buffer;
- }
- encode((uint32_t)payload.length(), frame_buffer, 0);
- frame_buffer.claim_append(payload);
- return frame_buffer;
- }
- };
-
- struct SignedEncryptedFrame : public Frame {
- SignedEncryptedFrame(Tag tag) : Frame(tag) {}
- SignedEncryptedFrame() : Frame() {}
- bufferlist &get_buffer() { return Frame::get_buffer(); }
- };
-
- struct AuthRequestFrame : public Frame {
- uint32_t method;
- uint32_t len;
- bufferlist auth_payload;
-
- AuthRequestFrame(uint32_t method, bufferlist &auth_payload)
- : Frame(Tag::AUTH_REQUEST) {
- encode(method, payload, 0);
- encode(auth_payload.length(), payload, 0);
- payload.claim_append(auth_payload);
- }
-
- AuthRequestFrame(uint32_t method) : Frame(Tag::AUTH_REQUEST) {
- encode(method, payload, 0);
- encode((uint32_t)0, payload, 0);
- }
-
- AuthRequestFrame(char *payload, uint32_t length) : Frame() {
- method = *(uint32_t *)payload;
- len = *(uint32_t *)(payload + sizeof(uint32_t));
- ceph_assert((length - (sizeof(uint32_t) * 2)) == len);
- auth_payload.append((payload + (sizeof(uint32_t) * 2)), len);
- }
- };
-
- struct AuthBadMethodFrame : public Frame {
- uint32_t method;
- std::vector<uint32_t> allowed_methods;
-
- AuthBadMethodFrame(uint32_t method, std::vector<uint32_t> methods)
- : Frame(Tag::AUTH_BAD_METHOD) {
- encode(method, payload, 0);
- encode((uint32_t)methods.size(), payload, 0);
- for (const auto &a_meth : methods) {
- encode(a_meth, payload, 0);
- }
- }
-
- AuthBadMethodFrame(char *payload, uint32_t length) : Frame() {
- method = *(uint32_t *)payload;
- uint32_t num_methods = *(uint32_t *)(payload + sizeof(uint32_t));
- payload += sizeof(uint32_t) * 2;
- for (unsigned i = 0; i < num_methods; ++i) {
- allowed_methods.push_back(
- *(uint32_t *)(payload + sizeof(uint32_t) * i));
- }
- }
- };
-
- struct AuthBadAuthFrame : public Frame {
- uint32_t error_code;
- std::string error_msg;
-
- AuthBadAuthFrame(uint32_t error_code, std::string error_msg)
- : Frame(Tag::AUTH_BAD_AUTH) {
- encode(error_code, payload, 0);
- encode(error_msg, payload, 0);
- }
-
- AuthBadAuthFrame(char *payload, uint32_t length) : Frame() {
- error_code = *(uint32_t *)payload;
- uint32_t len = *(uint32_t *)(payload + sizeof(uint32_t));
- error_msg = std::string(payload + sizeof(uint32_t) * 2, len);
- }
- };
-
- struct AuthMoreFrame : public Frame {
- bufferlist auth_payload;
-
- AuthMoreFrame(bufferlist &auth_payload) : Frame(Tag::AUTH_MORE) {
- encode(auth_payload.length(), payload, 0);
- payload.claim_append(auth_payload);
- }
-
- AuthMoreFrame(char *payload, uint32_t length) : Frame() {
- uint32_t len = *(uint32_t *)payload;
- ceph_assert((length - sizeof(uint32_t)) == len);
- auth_payload.append(payload + sizeof(uint32_t), len);
- }
- };
-
- struct AuthDoneFrame : public Frame {
- uint64_t flags;
- bufferlist auth_payload;
-
- AuthDoneFrame(uint64_t flags, bufferlist &auth_payload)
- : Frame(Tag::AUTH_DONE) {
- encode(flags, payload, 0);
- encode(auth_payload.length(), payload, 0);
- payload.claim_append(auth_payload);
- }
-
- AuthDoneFrame(char *payload, uint32_t length) : Frame() {
- flags = *(uint64_t *)payload;
- payload += sizeof(uint64_t);
- uint32_t len = *(uint32_t *)payload;
- ceph_assert((length - sizeof(uint32_t) - sizeof(uint64_t)) == len);
- auth_payload.append(payload + sizeof(uint32_t), len);
- }
- };
-
- struct ClientIdentFrame : public SignedEncryptedFrame {
- entity_addrvec_t addrs;
- int64_t gid;
- uint64_t global_seq;
- uint64_t supported_features; // CEPH_FEATURE_*
- uint64_t required_features; // CEPH_FEATURE_*
- uint64_t flags; // CEPH_MSG_CONNECT_*
-
- ClientIdentFrame(const entity_addrvec_t &addrs, int64_t gid,
- uint64_t global_seq, uint64_t supported_features,
- uint64_t required_features, uint64_t flags)
- : SignedEncryptedFrame(Tag::IDENT) {
- encode(addrs, payload, -1ll);
- encode(gid, payload, -1ll);
- encode(global_seq, payload, -1ll);
- encode(supported_features, payload, -1ll);
- encode(required_features, payload, -1ll);
- encode(flags, payload, -1ll);
- }
-
- ClientIdentFrame(char *payload, uint32_t length) : SignedEncryptedFrame() {
- bufferlist bl;
- bl.push_back(buffer::create_static(length, payload));
- try {
- auto ti = bl.cbegin();
- decode_frame(ti);
- } catch (const buffer::error &e) {
- }
- }
-
- ClientIdentFrame() : SignedEncryptedFrame() {}
-
- protected:
- void decode_frame(ceph::buffer::list::const_iterator &ti) {
- decode(addrs, ti);
- decode(gid, ti);
- decode(global_seq, ti);
- decode(supported_features, ti);
- decode(required_features, ti);
- decode(flags, ti);
- }
- };
-
- struct ServerIdentFrame : public ClientIdentFrame {
- uint64_t cookie;
-
- ServerIdentFrame(const entity_addrvec_t &addrs, int64_t gid,
- uint64_t global_seq, uint64_t supported_features,
- uint64_t required_features, uint64_t flags,
- uint64_t cookie)
- : ClientIdentFrame(addrs, gid, global_seq, supported_features,
- required_features, flags) {
- encode(cookie, payload, -1ll);
- }
-
- ServerIdentFrame(char *payload, uint32_t length) : ClientIdentFrame() {
- bufferlist bl;
- bl.push_back(buffer::create_static(length, payload));
- try {
- auto ti = bl.cbegin();
- ClientIdentFrame::decode_frame(ti);
- decode(cookie, ti);
- } catch (const buffer::error &e) {
- }
- }
- };
-
- struct ReconnectFrame : public SignedEncryptedFrame {
- entity_addrvec_t addrs;
- uint64_t cookie;
- uint64_t global_seq;
- uint64_t connect_seq;
- uint64_t msg_seq;
-
- ReconnectFrame(const entity_addrvec_t &addrs, uint64_t cookie,
- uint64_t global_seq, uint64_t connect_seq, uint64_t msg_seq)
- : SignedEncryptedFrame(Tag::SESSION_RECONNECT) {
- encode(addrs, payload, -1ll);
- encode(cookie, payload, 0);
- encode(global_seq, payload, 0);
- encode(connect_seq, payload, 0);
- encode(msg_seq, payload, 0);
- }
-
- ReconnectFrame(char *payload, uint32_t length) : SignedEncryptedFrame() {
- bufferlist bl;
- bl.push_back(buffer::create_static(length, payload));
- try {
- auto ti = bl.cbegin();
- decode(addrs, ti);
- decode(cookie, ti);
- decode(global_seq, ti);
- decode(connect_seq, ti);
- decode(msg_seq, ti);
- } catch (const buffer::error &e) {
- }
- }
- };
-
- struct ResetFrame : public SignedEncryptedFrame {
- ResetFrame() : SignedEncryptedFrame(Tag::SESSION_RESET) {}
- };
-
- struct RetryFrame : public SignedEncryptedFrame {
- uint64_t connect_seq;
-
- RetryFrame(uint64_t connect_seq)
- : SignedEncryptedFrame(Tag::SESSION_RETRY) {
- encode(connect_seq, payload);
- }
-
- RetryFrame(char *payload, uint32_t length) : SignedEncryptedFrame() {
- bufferlist bl;
- bl.push_back(buffer::create_static(length, payload));
- try {
- auto ti = bl.cbegin();
- decode(connect_seq, ti);
- } catch (const buffer::error &e) {
- }
- }
- };
-
- struct RetryGlobalFrame : public SignedEncryptedFrame {
- uint64_t global_seq;
-
- RetryGlobalFrame(uint64_t global_seq)
- : SignedEncryptedFrame(Tag::SESSION_RETRY_GLOBAL) {
- encode(global_seq, payload);
- }
-
- RetryGlobalFrame(char *payload, uint32_t length) : SignedEncryptedFrame() {
- bufferlist bl;
- bl.push_back(buffer::create_static(length, payload));
- try {
- auto ti = bl.cbegin();
- decode(global_seq, ti);
- } catch (const buffer::error &e) {
- }
- }
- };
-
- struct WaitFrame : public SignedEncryptedFrame {
- WaitFrame() : SignedEncryptedFrame(Tag::WAIT) {}
- };
-
- struct ReconnectOkFrame : public SignedEncryptedFrame {
- uint64_t msg_seq;
-
- ReconnectOkFrame(uint64_t msg_seq)
- : SignedEncryptedFrame(Tag::SESSION_RECONNECT_OK) {
- encode(msg_seq, payload, 0);
- }
-
- ReconnectOkFrame(char *payload, uint32_t length) : SignedEncryptedFrame() {
- bufferlist bl;
- bl.push_back(buffer::create_static(length, payload));
- try {
- auto ti = bl.cbegin();
- decode(msg_seq, ti);
- } catch (const buffer::error &e) {
- }
- }
- };
-
- struct IdentMissingFeaturesFrame : public SignedEncryptedFrame {
- __le64 features;
-
- IdentMissingFeaturesFrame(uint64_t features)
- : SignedEncryptedFrame(Tag::IDENT_MISSING_FEATURES),
- features(features) {
- encode(features, payload, -1ll);
- }
-
- IdentMissingFeaturesFrame(char *payload, uint32_t length)
- : SignedEncryptedFrame() {
- features = *(uint64_t *)payload;
- }
- };
-
- struct MessageFrame : public SignedEncryptedFrame {
- const unsigned int ASYNC_COALESCE_THRESHOLD = 256;
-
- ceph_msg_header2 header2;
-
- MessageFrame(Message *msg, bufferlist &data, uint64_t ack_seq,
- bool calc_crc)
- : SignedEncryptedFrame(Tag::MESSAGE) {
- ceph_msg_header &header = msg->get_header();
- ceph_msg_footer &footer = msg->get_footer();
-
- header2 = ceph_msg_header2{header.seq, header.tid,
- header.type, header.priority,
- header.version, header.front_len,
- header.middle_len, 0,
- header.data_len, header.data_off,
- ack_seq, footer.front_crc,
- footer.middle_crc, footer.data_crc,
- footer.flags, header.compat_version,
- header.reserved, 0};
-
- if (calc_crc) {
- header2.header_crc =
- ceph_crc32c(0, (unsigned char *)&header2,
- sizeof(header2) - sizeof(header2.header_crc));
- }
-
- payload.append((char *)&header2, sizeof(header2));
- if ((data.length() <= ASYNC_COALESCE_THRESHOLD) &&
- (data.buffers().size() > 1)) {
- for (const auto &pb : data.buffers()) {
- payload.append((char *)pb.c_str(), pb.length());
- }
- } else {
- payload.claim_append(data);
- }
- }
- };
-
- struct KeepAliveFrame : public SignedEncryptedFrame {
- struct ceph_timespec timestamp;
-
- KeepAliveFrame() : SignedEncryptedFrame(Tag::KEEPALIVE2) {
- struct ceph_timespec ts;
- utime_t t = ceph_clock_now();
- t.encode_timeval(&ts);
- payload.append((char *)&ts, sizeof(ts));
- }
-
- KeepAliveFrame(struct ceph_timespec ×tamp)
- : SignedEncryptedFrame(Tag::KEEPALIVE2_ACK) {
- payload.append((char *)×tamp, sizeof(timestamp));
- }
-
- KeepAliveFrame(char *payload, uint32_t length) : SignedEncryptedFrame() {
- ceph_assert(length == sizeof(timestamp));
- timestamp = *(struct ceph_timespec *)payload;
- }
- };
-
- struct AckFrame : public SignedEncryptedFrame {
- uint64_t seq;
-
- AckFrame(uint64_t seq) : SignedEncryptedFrame(Tag::ACK) {
- encode(seq, payload, 0);
- }
-
- AckFrame(char *payload, uint32_t length) : SignedEncryptedFrame() {
- seq = *(uint64_t *)payload;
- }
- };
+private:
+ enum class AuthFlag : uint64_t { ENCRYPTED = 1, SIGNED = 2 };
char *temp_buffer;
State state;
AuthAuthorizer *authorizer;
uint32_t auth_method;
bool got_bad_auth;
+ uint32_t got_bad_method;
CryptoKey session_key;
std::shared_ptr<AuthSessionHandler> session_security;
std::unique_ptr<AuthAuthorizerChallenge> authorizer_challenge;
+ uint64_t auth_flags;
uint64_t connection_features;
uint64_t cookie;
uint64_t global_seq;
using ProtFuncPtr = void (ProtocolV2::*)();
Ct<ProtocolV2> *bannerExchangeCallback;
- uint32_t next_frame_len;
+ uint32_t next_payload_len;
Tag next_tag;
ceph_msg_header2 current_header;
utime_t backoff; // backoff time
bool keepalive;
ostream &_conn_prefix(std::ostream *_dout);
+ inline void run_continuation(Ct<ProtocolV2> *continuation);
Ct<ProtocolV2> *read(CONTINUATION_PARAM(next, ProtocolV2, char *, int),
int len, char *buffer = nullptr);
- Ct<ProtocolV2> *write(CONTINUATION_PARAM(next, ProtocolV2, int),
- bufferlist &bl);
+ Ct<ProtocolV2> *write(const std::string &desc,
+ CONTINUATION_PARAM(next, ProtocolV2),
+ bufferlist &buffer);
void requeue_sent();
uint64_t discard_requeued_up_to(uint64_t out_seq, uint64_t seq);
void append_keepalive_ack(utime_t ×tamp);
void handle_message_ack(uint64_t seq);
- WRITE_HANDLER_CONTINUATION_DECL(ProtocolV2, _banner_exchange_handle_write);
+ CONTINUATION_DECL(ProtocolV2, _wait_for_peer_banner);
READ_HANDLER_CONTINUATION_DECL(ProtocolV2,
_banner_exchange_handle_peer_banner);
Ct<ProtocolV2> *_banner_exchange(Ct<ProtocolV2> *callback);
- Ct<ProtocolV2> *_banner_exchange_handle_write(int r);
+ Ct<ProtocolV2> *_wait_for_peer_banner();
Ct<ProtocolV2> *_banner_exchange_handle_peer_banner(char *buffer, int r);
CONTINUATION_DECL(ProtocolV2, read_frame);
READ_HANDLER_CONTINUATION_DECL(ProtocolV2, handle_read_frame_length_and_tag);
READ_HANDLER_CONTINUATION_DECL(ProtocolV2, handle_frame_payload);
- WRITE_HANDLER_CONTINUATION_DECL(ProtocolV2, handle_auth_more_write);
READ_HANDLER_CONTINUATION_DECL(ProtocolV2, handle_message_header);
CONTINUATION_DECL(ProtocolV2, throttle_message);
CONTINUATION_DECL(ProtocolV2, throttle_bytes);
Ct<ProtocolV2> *read_frame();
Ct<ProtocolV2> *handle_read_frame_length_and_tag(char *buffer, int r);
Ct<ProtocolV2> *handle_frame_payload(char *buffer, int r);
-
Ct<ProtocolV2> *handle_auth_more(char *payload, uint32_t length);
- Ct<ProtocolV2> *handle_auth_more_write(int r);
-
Ct<ProtocolV2> *handle_ident(char *payload, uint32_t length);
Ct<ProtocolV2> *ready();
// Client Protocol
CONTINUATION_DECL(ProtocolV2, start_client_banner_exchange);
CONTINUATION_DECL(ProtocolV2, post_client_banner_exchange);
- WRITE_HANDLER_CONTINUATION_DECL(ProtocolV2, handle_auth_request_write);
- WRITE_HANDLER_CONTINUATION_DECL(ProtocolV2, handle_client_ident_write);
- WRITE_HANDLER_CONTINUATION_DECL(ProtocolV2, handle_reconnect_write);
Ct<ProtocolV2> *start_client_banner_exchange();
Ct<ProtocolV2> *post_client_banner_exchange();
- Ct<ProtocolV2> *send_auth_request(std::vector<uint32_t> allowed_methods = {});
- Ct<ProtocolV2> *handle_auth_request_write(int r);
+ inline Ct<ProtocolV2> *send_auth_request() {
+ std::vector<uint32_t> empty;
+ return send_auth_request(empty);
+ }
+ Ct<ProtocolV2> *send_auth_request(std::vector<uint32_t> &allowed_methods);
Ct<ProtocolV2> *handle_auth_bad_method(char *payload, uint32_t length);
Ct<ProtocolV2> *handle_auth_bad_auth(char *payload, uint32_t length);
Ct<ProtocolV2> *handle_auth_done(char *payload, uint32_t length);
Ct<ProtocolV2> *send_client_ident();
- Ct<ProtocolV2> *handle_client_ident_write(int r);
Ct<ProtocolV2> *send_reconnect();
- Ct<ProtocolV2> *handle_reconnect_write(int r);
Ct<ProtocolV2> *handle_ident_missing_features(char *payload, uint32_t length);
Ct<ProtocolV2> *handle_session_reset();
Ct<ProtocolV2> *handle_session_retry(char *payload, uint32_t length);
// Server Protocol
CONTINUATION_DECL(ProtocolV2, start_server_banner_exchange);
CONTINUATION_DECL(ProtocolV2, post_server_banner_exchange);
- WRITE_HANDLER_CONTINUATION_DECL(ProtocolV2, handle_auth_bad_method_write);
- WRITE_HANDLER_CONTINUATION_DECL(ProtocolV2, handle_auth_bad_auth_write);
- WRITE_HANDLER_CONTINUATION_DECL(ProtocolV2, handle_auth_done_write);
- WRITE_HANDLER_CONTINUATION_DECL(ProtocolV2,
- handle_ident_missing_features_write);
- WRITE_HANDLER_CONTINUATION_DECL(ProtocolV2, handle_session_reset_write);
- WRITE_HANDLER_CONTINUATION_DECL(ProtocolV2, handle_session_retry_write);
- WRITE_HANDLER_CONTINUATION_DECL(ProtocolV2, handle_wait_write);
- WRITE_HANDLER_CONTINUATION_DECL(ProtocolV2, handle_server_ident_write);
- WRITE_HANDLER_CONTINUATION_DECL(ProtocolV2, handle_reconnect_ok_write);
+ CONTINUATION_DECL(ProtocolV2, server_ready);
Ct<ProtocolV2> *start_server_banner_exchange();
Ct<ProtocolV2> *post_server_banner_exchange();
+ Ct<ProtocolV2> *handle_cephx_auth(bufferlist &auth_payload);
Ct<ProtocolV2> *handle_auth_request(char *payload, uint32_t length);
- Ct<ProtocolV2> *handle_auth_bad_method_write(int r);
- Ct<ProtocolV2> *handle_auth_bad_auth_write(int r);
- Ct<ProtocolV2> *handle_auth_done_write(int r);
Ct<ProtocolV2> *handle_client_ident(char *payload, uint32_t length);
Ct<ProtocolV2> *handle_ident_missing_features_write(int r);
Ct<ProtocolV2> *handle_reconnect(char *payload, uint32_t length);
- Ct<ProtocolV2> *handle_session_reset_write(int r);
- Ct<ProtocolV2> *handle_session_retry_write(int r);
Ct<ProtocolV2> *handle_existing_connection(AsyncConnectionRef existing);
- Ct<ProtocolV2> *handle_wait_write(int r);
Ct<ProtocolV2> *reuse_connection(AsyncConnectionRef existing,
ProtocolV2 *exproto, bool reconnect);
Ct<ProtocolV2> *send_server_ident();
- Ct<ProtocolV2> *handle_server_ident_write(int r);
Ct<ProtocolV2> *send_reconnect_ok();
- Ct<ProtocolV2> *handle_reconnect_ok_write(int r);
+ Ct<ProtocolV2> *server_ready();
+
+public:
+ template <class T, typename... Args>
+ friend struct SignedEncryptedFrame;
+
+ // TODO: REMOVE THIS
+ void log(const std::string message, uint64_t val, uint64_t val2);
};
#endif /* _MSG_ASYNC_PROTOCOL_V2_ */