abort_in_fault();
}
+inline uint64_t generate_client_cookie() {
+ return ceph::util::generate_random_number<uint64_t>(
+ 1, std::numeric_limits<uint64_t>::max());
+}
+
} // namespace anonymous
namespace fmt {
void ProtocolV2::reset_session(bool full)
{
+ server_cookie = 0;
+ connect_seq = 0;
+ conn.in_seq = 0;
if (full) {
- server_cookie = 0;
- connect_seq = 0;
- conn.in_seq = 0;
- } else {
- conn.out_seq = 0;
- conn.in_seq = 0;
- client_cookie = 0;
- server_cookie = 0;
- connect_seq = 0;
+ client_cookie = generate_client_cookie();
peer_global_seq = 0;
+ conn.out_seq = 0;
// TODO:
// discard_out_queue();
// message_seq = 0;
ProtocolV2::client_connect()
{
// send_client_ident() logic
- if (!conn.policy.lossy && !client_cookie) {
- client_cookie = ceph::util::generate_random_number<uint64_t>(1, -1ll);
- }
-
uint64_t flags = 0;
if (conn.policy.lossy) {
flags |= CEPH_MSG_CONNECT_LOSSY;
conn.policy.features_supported);
peer_global_seq = server_ident.global_seq();
- // TODO: lossless policy
- ceph_assert(server_ident.flags() & CEPH_MSG_CONNECT_LOSSY);
- conn.policy.lossy = server_ident.flags() & CEPH_MSG_CONNECT_LOSSY;
+ bool lossy = server_ident.flags() & CEPH_MSG_CONNECT_LOSSY;
+ if (lossy != conn.policy.lossy) {
+ logger().warn("{} UPDATE Policy(lossy={}) from server flags", conn, lossy);
+ conn.policy.lossy = lossy;
+ }
+ if (lossy && (connect_seq != 0 || server_cookie != 0)) {
+ logger().warn("{} UPDATE cs=0({}) sc=0({}) for lossy policy",
+ conn, connect_seq, server_cookie);
+ connect_seq = 0;
+ server_cookie = 0;
+ }
// TODO: backoff = utime_t();
return dispatcher.ms_handle_connect(
conn.set_ephemeral_port(0, SocketConnection::side_t::none);
return messenger.get_global_seq().then([this] (auto gs) {
global_seq = gs;
+ if (!conn.policy.lossy && server_cookie != 0) {
+ assert(client_cookie != 0);
+ ++connect_seq;
+ logger().debug("{} UPDATE: gs={}, cs={} for reconnect",
+ conn, global_seq, connect_seq);
+ } else {
+ assert(connect_seq == 0);
+ assert(server_cookie == 0);
+ client_cookie = generate_client_cookie();
+ logger().debug("{} UPDATE: gs={}, cc={} for connect",
+ conn, global_seq, client_cookie);
+ }
enable_recording();
- logger().debug("{} UPDATE: gs={}", conn, global_seq);
return Socket::connect(conn.peer_addr);
}).then([this](SocketFRef sock) {
logger().debug("{} socket connected", conn);
}).then([this] {
return client_auth();
}).then([this] {
- if (!server_cookie) {
+ if (server_cookie == 0) {
ceph_assert(connect_seq == 0);
return client_connect();
} else {