messenger.unregister_conn(seastar::static_pointer_cast<SocketConnection>(
conn.shared_from_this()));
}
- if (h.backoff.count()) {
- h.backoff += h.backoff;
- } else {
- h.backoff = conf.ms_initial_backoff;
- }
- if (h.backoff > conf.ms_max_backoff) {
- h.backoff = conf.ms_max_backoff;
- }
- return seastar::sleep(h.backoff);
+ // XXX: we decided not to support lossless connection in v1. as the
+ // client's default policy is
+ // Messenger::Policy::lossy_client(CEPH_FEATURE_OSDREPLYMUX) which is
+ // lossy. And by the time of crimson-osd's GA, the in-cluster communication
+ // will all be performed using v2 protocol.
+ ceph_abort("lossless policy not supported for v1");
+ return seastar::now();
}
} // namespace ceph::net
namespace ceph::net {
+seastar::future<> ProtocolV2::Timer::backoff(double seconds)
+{
+ logger().warn("{} waiting {} seconds ...", conn, seconds);
+ cancel();
+ last_dur_ = seconds;
+ as = seastar::abort_source();
+ auto dur = std::chrono::duration_cast<seastar::lowres_clock::duration>(
+ std::chrono::duration<double>(seconds));
+ return seastar::sleep_abortable(dur, *as
+ ).handle_exception_type([this] (const seastar::sleep_aborted& e) {
+ logger().warn("{} wait aborted", conn);
+ abort_protocol();
+ });
+}
+
ProtocolV2::ProtocolV2(Dispatcher& dispatcher,
SocketConnection& conn,
SocketMessenger& messenger)
- : Protocol(proto_t::v2, dispatcher, conn), messenger{messenger} {}
+ : Protocol(proto_t::v2, dispatcher, conn),
+ messenger{messenger},
+ protocol_timer{conn}
+{}
ProtocolV2::~ProtocolV2() {}
connect_seq = 0;
server_cookie = 0;
}
- // TODO: backoff = utime_t();
return dispatcher.ms_handle_connect(
seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this()))
logger().debug("{} GOT ReconnectOkFrame: msg_seq={}",
conn, reconnect_ok.msg_seq());
requeue_up_to(reconnect_ok.msg_seq());
- // TODO
- // backoff = utime_t();
return dispatcher.ms_handle_connect(
seastar::static_pointer_cast<SocketConnection>(
conn.shared_from_this()))
break;
}
case next_step_t::wait: {
- execute_wait();
+ execute_wait(true);
break;
}
default: {
{
trigger_state(state_t::READY, write_state_t::open, false);
execution_done = seastar::with_gate(pending_dispatch, [this] {
+ protocol_timer.cancel();
return seastar::keep_doing([this] {
return read_main_preamble()
.then([this] (Tag tag) {
// WAIT state
-void ProtocolV2::execute_wait()
+void ProtocolV2::execute_wait(bool max_backoff)
{
- // TODO not implemented
- // trigger_state(state_t::WAIT, write_state_t::delay, false);
- ceph_assert(false);
+ trigger_state(state_t::WAIT, write_state_t::delay, true);
+ if (socket) {
+ socket->shutdown();
+ }
+ execution_done = seastar::with_gate(pending_dispatch,
+ [this, max_backoff] {
+ double backoff = protocol_timer.last_dur();
+ if (max_backoff) {
+ backoff = conf.ms_max_backoff;
+ } else if (backoff > 0) {
+ backoff = std::min(conf.ms_max_backoff, 2 * backoff);
+ } else {
+ backoff = conf.ms_initial_backoff;
+ }
+ return protocol_timer.backoff(backoff).then([this] {
+ execute_connecting();
+ }).handle_exception([this] (std::exception_ptr eptr) {
+ logger().debug("{} execute_wait(): got exception {} at state {}, abort",
+ conn, eptr, get_state_name(state));
+ assert(state == state_t::REPLACING ||
+ state == state_t::CLOSING);
+ });
+ });
}
// SERVER_WAIT state
ceph_assert(false);
}
+ protocol_timer.cancel();
+
if (!socket) {
ceph_assert(state == state_t::CONNECTING);
}
#pragma once
+#include <seastar/core/sleep.hh>
+
#include "Protocol.h"
#include "msg/async/frames_v2.h"
#include "msg/async/crypto_onwire.h"
CONNECTING,
READY,
STANDBY,
- WAIT, // ? CLIENT_WAIT
+ WAIT,
REPLACING, // ?
CLOSING
};
"CONNECTING",
"READY",
"STANDBY",
- "WAIT", // ? CLIENT_WAIT
+ "WAIT",
"REPLACING", // ?
"CLOSING"};
return statenames[static_cast<int>(state)];
seastar::future<> execution_done = seastar::now();
+ class Timer {
+ double last_dur_ = 0.0;
+ const SocketConnection& conn;
+ std::optional<seastar::abort_source> as;
+ public:
+ Timer(SocketConnection& conn) : conn(conn) {}
+ double last_dur() const { return last_dur_; }
+ seastar::future<> backoff(double seconds);
+ void cancel() {
+ last_dur_ = 0.0;
+ if (as) {
+ as->request_abort();
+ as = std::nullopt;
+ }
+ }
+ };
+ Timer protocol_timer;
+
// TODO: Frame related implementations, probably to a separate class.
private:
bool record_io = false;
void execute_standby();
// WAIT
- void execute_wait();
+ void execute_wait(bool max_backoff);
// SERVER_WAIT
void execute_server_wait();