write_event();
}
+void Protocol::notify_ack()
+{
+ if (!conn.policy.lossy) {
+ ++ack_left;
+ write_event();
+ }
+}
+
void Protocol::requeue_sent()
{
assert(write_state != write_state_t::open);
conn.sent.clear();
need_keepalive = false;
keepalive_ack = std::nullopt;
+ ack_left = 0;
}
void Protocol::ack_writes(seq_num_t seq)
conn.pending_q.begin(),
conn.pending_q.end());
}
+ auto acked = ack_left;
+ assert(acked == 0 || conn.in_seq > 0);
// sweep all pending writes with the concrete Protocol
return socket->write(do_sweep_messages(
- conn.pending_q, num_msgs, need_keepalive, keepalive_ack))
- .then([this, prv_keepalive_ack=keepalive_ack] {
+ conn.pending_q, num_msgs, need_keepalive, keepalive_ack, acked > 0)
+ ).then([this, prv_keepalive_ack=keepalive_ack, acked] {
need_keepalive = false;
if (keepalive_ack == prv_keepalive_ack) {
keepalive_ack = std::nullopt;
}
+ assert(ack_left >= acked);
+ ack_left -= acked;
if (!is_queued()) {
// good, we have nothing pending to send now.
return socket->flush().then([this] {
const std::deque<MessageRef>& msgs,
size_t num_msgs,
bool require_keepalive,
- std::optional<utime_t> keepalive_ack) = 0;
+ std::optional<utime_t> keepalive_ack,
+ bool require_ack) = 0;
public:
const proto_t proto_type;
void notify_keepalive_ack(utime_t keepalive_ack);
+ void notify_ack();
+
void requeue_up_to(seq_num_t seq);
void requeue_sent();
bool is_queued() const {
return (!conn.out_q.empty() ||
+ ack_left > 0 ||
need_keepalive ||
keepalive_ack.has_value());
}
bool need_keepalive = false;
std::optional<utime_t> keepalive_ack = std::nullopt;
+ uint64_t ack_left = 0;
bool write_dispatching = false;
// Indicate if we are in the middle of writing.
bool open_write = false;
const std::deque<MessageRef>& msgs,
size_t num_msgs,
bool require_keepalive,
- std::optional<utime_t> _keepalive_ack)
+ std::optional<utime_t> _keepalive_ack,
+ bool require_ack)
{
static const size_t RESERVE_MSG_SIZE = sizeof(CEPH_MSGR_TAG_MSG) +
sizeof(ceph_msg_header) +
bl.append(create_static(k.ack));
}
+ if (require_ack) {
+ // XXX: we decided not to support lossless connection in v1. as the
+ // client's default policy is
+ // Messenger::Policy::lossy_client(CEPH_FEATURE_OSDREPLYMUX) which is
+ // lossy. And by the time of crimson-osd's GA, the in-cluster communication
+ // will all be performed using v2 protocol.
+ ceph_abort("lossless policy not supported for v1");
+ }
+
std::for_each(msgs.begin(), msgs.begin()+num_msgs, [this, &bl](const MessageRef& msg) {
ceph_assert(!msg->get_seq() && "message already has seq");
msg->set_seq(++conn.out_seq);
const std::deque<MessageRef>& msgs,
size_t num_msgs,
bool require_keepalive,
- std::optional<utime_t> keepalive_ack) override;
+ std::optional<utime_t> keepalive_ack,
+ bool require_ack) override;
private:
SocketMessenger &messenger;
const std::deque<MessageRef>& msgs,
size_t num_msgs,
bool require_keepalive,
- std::optional<utime_t> _keepalive_ack)
+ std::optional<utime_t> _keepalive_ack,
+ bool require_ack)
{
ceph::bufferlist bl;
bl.append(keepalive_ack_frame.get_buffer(session_stream_handlers));
}
+ if (require_ack && !num_msgs) {
+ auto ack_frame = AckFrame::Encode(conn.in_seq);
+ bl.append(ack_frame.get_buffer(session_stream_handlers));
+ }
+
std::for_each(msgs.begin(), msgs.begin()+num_msgs, [this, &bl](const MessageRef& msg) {
// TODO: move to common code
// set priority
ceph_assert(!msg->get_seq() && "message already has seq");
msg->set_seq(++conn.out_seq);
- uint64_t ack_seq = conn.in_seq;
- // ack_left = 0;
ceph_msg_header &header = msg->get_header();
ceph_msg_footer &footer = msg->get_footer();
header.type, header.priority,
header.version,
0, header.data_off,
- ack_seq,
+ conn.in_seq,
footer.flags, header.compat_version,
header.reserved};
conn.in_seq = message->get_seq();
logger().debug("{} <== #{} === {} ({})",
conn, message->get_seq(), *message, message->get_type());
- if (!conn.policy.lossy) {
- // ++ack_left;
- }
+ notify_ack();
ack_writes(current_header.ack_seq);
// TODO: change MessageRef with seastar::shared_ptr
const std::deque<MessageRef>& msgs,
size_t num_msgs,
bool require_keepalive,
- std::optional<utime_t> keepalive_ack) override;
+ std::optional<utime_t> keepalive_ack,
+ bool require_ack) override;
private:
SocketMessenger &messenger;