#include "crimson/net/Errors.h"
#include "crimson/net/chained_dispatchers.h"
#include "crimson/net/SocketConnection.h"
+#include "crimson/net/SocketMessenger.h"
#include "msg/Message.h"
namespace {
- seastar::logger& logger() {
- return crimson::get_logger(ceph_subsys_ms);
- }
+
+seastar::logger& logger() {
+ return crimson::get_logger(ceph_subsys_ms);
}
+} // namespace anonymous
+
namespace crimson::net {
Protocol::Protocol(ChainedDispatchers& dispatchers,
}
ceph::bufferlist Protocol::sweep_out_pending_msgs_to_sent(
- size_t num_msgs,
- bool require_keepalive,
- std::optional<utime_t> maybe_keepalive_ack,
- bool require_ack)
+ bool require_keepalive,
+ std::optional<utime_t> maybe_keepalive_ack,
+ bool require_ack)
{
- ceph::bufferlist bl = do_sweep_messages(out_pending_msgs,
- num_msgs,
- require_keepalive,
- maybe_keepalive_ack,
- require_ack);
+ std::size_t num_msgs = out_pending_msgs.size();
+ ceph::bufferlist bl;
+
+ if (unlikely(require_keepalive)) {
+ auto keepalive_frame = KeepAliveFrame::Encode();
+ bl.append(frame_assembler.get_buffer(keepalive_frame));
+ }
+
+ if (unlikely(maybe_keepalive_ack.has_value())) {
+ auto keepalive_ack_frame = KeepAliveFrameAck::Encode(*maybe_keepalive_ack);
+ bl.append(frame_assembler.get_buffer(keepalive_ack_frame));
+ }
+
+ if (require_ack && num_msgs == 0u) {
+ auto ack_frame = AckFrame::Encode(get_in_seq());
+ bl.append(frame_assembler.get_buffer(ack_frame));
+ }
+
+ std::for_each(
+ out_pending_msgs.begin(),
+ out_pending_msgs.begin()+num_msgs,
+ [this, &bl](const MessageURef& msg) {
+ // set priority
+ msg->get_header().src = conn.messenger.get_myname();
+
+ msg->encode(conn.features, 0);
+
+ ceph_assert(!msg->get_seq() && "message already has seq");
+ msg->set_seq(++out_seq);
+
+ ceph_msg_header &header = msg->get_header();
+ ceph_msg_footer &footer = msg->get_footer();
+
+ ceph_msg_header2 header2{header.seq, header.tid,
+ header.type, header.priority,
+ header.version,
+ ceph_le32(0), header.data_off,
+ ceph_le64(get_in_seq()),
+ footer.flags, header.compat_version,
+ header.reserved};
+
+ auto message = MessageFrame::Encode(header2,
+ msg->get_payload(), msg->get_middle(), msg->get_data());
+ logger().debug("{} --> #{} === {} ({})",
+ conn, msg->get_seq(), *msg, msg->get_type());
+ bl.append(frame_assembler.get_buffer(message));
+ });
+
if (!conn.policy.lossy) {
out_sent_msgs.insert(
out_sent_msgs.end(),
return seastar::now();
}
+void Protocol::set_out_state(
+ const Protocol::out_state_t &new_state)
+{
+ ceph_assert_always(!(
+ (new_state == out_state_t::none && out_state != out_state_t::none) ||
+ (new_state == out_state_t::open && out_state == out_state_t::open) ||
+ (new_state != out_state_t::drop && out_state == out_state_t::drop)
+ ));
+
+ if (out_state != out_state_t::open &&
+ new_state == out_state_t::open) {
+ // to open
+ ceph_assert_always(frame_assembler.is_socket_valid());
+ } else if (out_state == out_state_t::open &&
+ new_state != out_state_t::open) {
+ // from open
+ if (out_dispatching) {
+ ceph_assert_always(!out_exit_dispatching.has_value());
+ out_exit_dispatching = seastar::shared_promise<>();
+ }
+ }
+
+ if (out_state != new_state) {
+ out_state = new_state;
+ out_state_changed.set_value();
+ out_state_changed = seastar::shared_promise<>();
+ }
+}
+
void Protocol::notify_keepalive_ack(utime_t keepalive_ack)
{
logger().trace("{} got keepalive ack {}", conn, keepalive_ack);
return seastar::repeat([this] {
switch (out_state) {
case out_state_t::open: {
- size_t num_msgs = out_pending_msgs.size();
bool still_queued = is_out_queued();
if (unlikely(!still_queued)) {
return try_exit_out_dispatch();
// sweep all pending out with the concrete Protocol
return frame_assembler.write(
sweep_out_pending_msgs_to_sent(
- num_msgs, need_keepalive, next_keepalive_ack, to_ack > 0)
+ need_keepalive, next_keepalive_ack, to_ack > 0)
).then([this, prv_keepalive_ack=next_keepalive_ack, to_ack] {
need_keepalive = false;
if (next_keepalive_ack == prv_keepalive_ack) {
ceph_abort();
}
- std::exception_ptr eptr;
- try {
- throw e;
- } catch(...) {
- eptr = std::current_exception();
- }
- notify_out_fault(eptr);
-
if (out_state == out_state_t::open) {
logger().info("{} do_out_dispatch(): fault at {}, going to delay -- {}",
conn, out_state, e);
- out_state = out_state_t::delay;
+ std::exception_ptr eptr;
+ try {
+ throw e;
+ } catch(...) {
+ eptr = std::current_exception();
+ }
+ set_out_state(out_state_t::delay);
+ notify_out_fault(eptr);
} else {
logger().info("{} do_out_dispatch(): fault at {} -- {}",
conn, out_state, e);
}
+
return do_out_dispatch();
});
}
Protocol(ChainedDispatchers& dispatchers,
SocketConnection& conn);
- virtual ceph::bufferlist do_sweep_messages(
- const std::deque<MessageURef>& msgs,
- size_t num_msgs,
- bool require_keepalive,
- std::optional<utime_t> maybe_keepalive_ack,
- bool require_ack) = 0;
-
virtual void notify_out() = 0;
virtual void notify_out_fault(std::exception_ptr) = 0;
open,
drop
};
-
friend class fmt::formatter<out_state_t>;
- void set_out_state(const out_state_t& state) {
- if (out_state == out_state_t::open &&
- state != out_state_t::open &&
- out_dispatching) {
- out_exit_dispatching = seastar::shared_promise<>();
- }
- out_state = state;
- out_state_changed.set_value();
- out_state_changed = seastar::shared_promise<>();
- }
+
+ void set_out_state(const out_state_t &new_state);
seastar::future<> wait_out_exit_dispatching() {
if (out_exit_dispatching) {
in_seq = _in_seq;
}
- seq_num_t increment_out_seq() {
- return ++out_seq;
- }
-
ChainedDispatchers& dispatchers;
SocketConnection &conn;
seastar::future<> do_out_dispatch();
ceph::bufferlist sweep_out_pending_msgs_to_sent(
- size_t num_msgs,
bool require_keepalive,
std::optional<utime_t> maybe_keepalive_ack,
bool require_ack);
// READY state
-ceph::bufferlist ProtocolV2::do_sweep_messages(
- const std::deque<MessageURef>& msgs,
- size_t num_msgs,
- bool require_keepalive,
- std::optional<utime_t> maybe_keepalive_ack,
- bool require_ack)
-{
- ceph::bufferlist bl;
-
- if (unlikely(require_keepalive)) {
- auto keepalive_frame = KeepAliveFrame::Encode();
- bl.append(frame_assembler.get_buffer(keepalive_frame));
- }
-
- if (unlikely(maybe_keepalive_ack.has_value())) {
- auto keepalive_ack_frame = KeepAliveFrameAck::Encode(*maybe_keepalive_ack);
- bl.append(frame_assembler.get_buffer(keepalive_ack_frame));
- }
-
- if (require_ack && num_msgs == 0u) {
- auto ack_frame = AckFrame::Encode(get_in_seq());
- bl.append(frame_assembler.get_buffer(ack_frame));
- }
-
- std::for_each(msgs.begin(), msgs.begin()+num_msgs, [this, &bl](const MessageURef& msg) {
- // TODO: move to common code
- // set priority
- msg->get_header().src = messenger.get_myname();
-
- msg->encode(conn.features, 0);
-
- ceph_assert(!msg->get_seq() && "message already has seq");
- msg->set_seq(increment_out_seq());
-
- ceph_msg_header &header = msg->get_header();
- ceph_msg_footer &footer = msg->get_footer();
-
- ceph_msg_header2 header2{header.seq, header.tid,
- header.type, header.priority,
- header.version,
- ceph_le32(0), header.data_off,
- ceph_le64(get_in_seq()),
- footer.flags, header.compat_version,
- header.reserved};
-
- auto message = MessageFrame::Encode(header2,
- msg->get_payload(), msg->get_middle(), msg->get_data());
- logger().debug("{} --> #{} === {} ({})",
- conn, msg->get_seq(), *msg, msg->get_type());
- bl.append(frame_assembler.get_buffer(message));
- });
-
- return bl;
-}
-
void ProtocolV2::notify_out_fault(std::exception_ptr eptr)
{
fault(state_t::READY, "notify_out_fault", eptr);