Connection(const AuthRegistry& auth_registry,
ceph::net::ConnectionRef conn,
KeyRing* keyring);
+ enum class AuthResult {
+ success = 0,
+ failure,
+ canceled
+ };
seastar::future<> handle_auth_reply(Ref<MAuthReply> m);
// v1
- seastar::future<> authenticate_v1(epoch_t epoch,
- const EntityName& name,
- uint32_t want_keys);
+ seastar::future<AuthResult> authenticate_v1(
+ epoch_t epoch,
+ const EntityName& name,
+ uint32_t want_keys);
// v2
- seastar::future<> authenticate_v2();
+ seastar::future<AuthResult> authenticate_v2();
auth::AuthClient::auth_request_t
get_auth_request(const EntityName& name,
uint32_t want_keys);
rotating,
general,
};
- seastar::future<bool> do_auth(request_t);
+ seastar::future<std::optional<AuthResult>> do_auth_single(request_t);
+ seastar::future<AuthResult> do_auth(request_t);
private:
bool closed = false;
using clock_t = seastar::lowres_system_clock;
clock_t::time_point auth_start;
ceph::auth::method_t auth_method = 0;
- seastar::promise<> auth_done;
+ seastar::promise<AuthResult> auth_done;
// v1 and v2
const AuthRegistry& auth_registry;
ceph::net::ConnectionRef conn;
seastar::future<> Connection::handle_auth_reply(Ref<MAuthReply> m)
{
reply.set_value(m);
+ reply = {};
return seastar::now();
}
seastar::future<> Connection::renew_tickets()
{
if (auth->need_tickets()) {
- return do_auth(request_t::general).then([](bool success) {
- if (!success) {
- throw std::system_error(make_error_code(
- ceph::net::error::negotiation_failure));
+ return do_auth(request_t::general).then([](AuthResult r) {
+ if (r != AuthResult::success) {
+ throw std::system_error(
+ make_error_code(
+ ceph::net::error::negotiation_failure));
}
});
}
return seastar::now();
}
last_rotating_renew_sent = now;
- return do_auth(request_t::rotating).then([](bool success) {
- if (!success) {
+ return do_auth(request_t::rotating).then([](AuthResult r) {
+ if (r != AuthResult::success) {
throw std::system_error(make_error_code(
ceph::net::error::negotiation_failure));
}
return conn->send(m);
}
-seastar::future<bool> Connection::do_auth(Connection::request_t what)
+seastar::future<std::optional<Connection::AuthResult>>
+Connection::do_auth_single(Connection::request_t what)
{
auto m = make_message<MAuth>();
m->protocol = auth->get_protocol();
logger().info("waiting");
return reply.get_future();
}).then([this] (Ref<MAuthReply> m) {
- logger().info("mon {} => {} returns {}: {}",
- conn->get_messenger()->get_myaddr(),
- conn->get_peer_addr(), *m, m->result);
- reply = {};
+ if (!m) {
+ ceph_assert(closed);
+ logger().info("do_auth: connection closed");
+ return seastar::make_ready_future<std::optional<Connection::AuthResult>>(
+ std::make_optional(AuthResult::canceled));
+ }
+ logger().info(
+ "do_auth: mon {} => {} returns {}: {}",
+ conn->get_messenger()->get_myaddr(),
+ conn->get_peer_addr(), *m, m->result);
auto p = m->result_bl.cbegin();
auto ret = auth->handle_response(m->result, p,
nullptr, nullptr);
if (ret != 0 && ret != -EAGAIN) {
- throw std::system_error(make_error_code(
- ceph::net::error::negotiation_failure));
+ logger().error(
+ "do_auth: got error {} on mon {}",
+ ret,
+ conn->get_peer_addr());
}
- return seastar::make_ready_future<bool>(ret == 0);
+ return seastar::make_ready_future<std::optional<Connection::AuthResult>>(
+ ret == -EAGAIN
+ ? std::nullopt
+ : std::make_optional(ret == 0
+ ? AuthResult::success
+ : AuthResult::failure
+ ));
});
}
-seastar::future<>
+seastar::future<Connection::AuthResult>
+Connection::do_auth(Connection::request_t what) {
+ return seastar::repeat_until_value([this, what]() {
+ return do_auth_single(what);
+ });
+}
+
+seastar::future<Connection::AuthResult>
Connection::authenticate_v1(epoch_t epoch,
const EntityName& name,
uint32_t want_keys)
}).then([this] {
return reply.get_future();
}).then([name, want_keys, this](Ref<MAuthReply> m) {
- reply = {};
+ if (!m) {
+ logger().error("authenticate_v1 canceled on {}", name);
+ return seastar::make_ready_future<AuthResult>(AuthResult::canceled);
+ }
global_id = m->global_id;
auth = create_auth(m->protocol, m->global_id, name, want_keys);
switch (auto p = m->result_bl.cbegin();
nullptr, nullptr)) {
case 0:
// none
- return seastar::now();
+ return seastar::make_ready_future<AuthResult>(AuthResult::success);
case -EAGAIN:
// cephx
- return seastar::repeat([this] {
- return do_auth(request_t::general).then([](bool success) {
- return seastar::make_ready_future<seastar::stop_iteration>(
- success ?
- seastar::stop_iteration::yes:
- seastar::stop_iteration::no);
- });
- });
+ return do_auth(request_t::general);
default:
ceph_assert_always(0);
}
+ }).handle_exception([](auto ep) {
+ logger().error("authenticate_v1 failed with {}", ep);
+ return seastar::make_ready_future<AuthResult>(AuthResult::canceled);
});
}
-seastar::future<> Connection::authenticate_v2()
+seastar::future<Connection::AuthResult> Connection::authenticate_v2()
{
auth_start = seastar::lowres_system_clock::now();
return conn->send(make_message<MMonGetMap>()).then([this] {
secret_t connection_secret;
int r = auth->handle_response(0, p, &session_key, &connection_secret);
conn->set_last_keepalive_ack(auth_start);
- auth_done.set_value();
+ auth_done.set_value(AuthResult::success);
return {session_key, connection_secret, r};
}
seastar::future<> Connection::close()
{
+ reply.set_value(Ref<MAuthReply>(nullptr));
+ reply = {};
+ auth_done.set_value(AuthResult::canceled);
+ auth_done = {};
if (conn && !std::exchange(closed, true)) {
return conn->close();
} else {
bool Connection::is_my_peer(const entity_addr_t& addr) const
{
+ ceph_assert(conn);
return conn->get_peer_addr() == addr;
}
{
auto found = std::find_if(pending_conns.begin(), pending_conns.end(),
[peer_addr = conn->get_peer_addr()](auto& mc) {
- return mc.is_my_peer(peer_addr);
+ return mc->is_my_peer(peer_addr);
});
if (found != pending_conns.end()) {
logger().warn("pending conn reset by {}", conn->get_peer_addr());
- return found->close();
+ return (*found)->close();
} else if (active_con && active_con->is_my_peer(conn->get_peer_addr())) {
logger().warn("active conn reset {}", conn->get_peer_addr());
active_con.reset();
if (con->get_peer_type() == CEPH_ENTITY_TYPE_MON) {
auto found = std::find_if(pending_conns.begin(), pending_conns.end(),
[peer_addr = con->get_peer_addr()](auto& mc) {
- return mc.is_my_peer(peer_addr);
+ return mc->is_my_peer(peer_addr);
});
if (found == pending_conns.end()) {
throw ceph::auth::error{"unknown connection"};
}
- return found->get_auth_request(entity_name, want_keys);
+ return (*found)->get_auth_request(entity_name, want_keys);
} else {
// generate authorizer
if (!active_con) {
if (conn->get_peer_type() == CEPH_ENTITY_TYPE_MON) {
auto found = std::find_if(pending_conns.begin(), pending_conns.end(),
[peer_addr = conn->get_peer_addr()](auto& mc) {
- return mc.is_my_peer(peer_addr);
+ return mc->is_my_peer(peer_addr);
});
if (found == pending_conns.end()) {
throw ceph::auth::error{"unknown connection"};
}
bufferlist reply;
tie(auth_meta->session_key, auth_meta->connection_secret, reply) =
- found->handle_auth_reply_more(bl);
+ (*found)->handle_auth_reply_more(bl);
return reply;
} else {
// authorizer challenges
if (conn->get_peer_type() == CEPH_ENTITY_TYPE_MON) {
auto found = std::find_if(pending_conns.begin(), pending_conns.end(),
[peer_addr = conn->get_peer_addr()](auto& mc) {
- return mc.is_my_peer(peer_addr);
+ return mc->is_my_peer(peer_addr);
});
if (found == pending_conns.end()) {
return -ENOENT;
}
int r = 0;
tie(auth_meta->session_key, auth_meta->connection_secret, r) =
- found->handle_auth_done(global_id, bl);
+ (*found)->handle_auth_done(global_id, bl);
return r;
} else {
// verify authorizer reply
if (conn->get_peer_type() == CEPH_ENTITY_TYPE_MON) {
auto found = std::find_if(pending_conns.begin(), pending_conns.end(),
[peer_addr = conn->get_peer_addr()](auto& mc) {
- return mc.is_my_peer(peer_addr);
+ return mc->is_my_peer(peer_addr);
});
if (found != pending_conns.end()) {
- return found->handle_auth_bad_method(old_auth_method, result,
- allowed_methods, allowed_modes);
+ return (*found)->handle_auth_bad_method(
+ old_auth_method, result,
+ allowed_methods, allowed_modes);
} else {
return -ENOENT;
}
if (monmap.get_addr_name(peer_addr, cur_mon)) {
if (active_con) {
- return seastar::when_all_succeed(active_con->renew_tickets(),
- active_con->renew_rotating_keyring());
+ logger().info("handle_monmap: renewing tickets");
+ return seastar::when_all_succeed(
+ active_con->renew_tickets(),
+ active_con->renew_rotating_keyring()).then([](){
+ logger().info("handle_mon_map: renewed tickets");
+ });
} else {
return seastar::now();
}
seastar::future<> Client::handle_auth_reply(ceph::net::Connection* conn,
Ref<MAuthReply> m)
{
- logger().info("mon {} => {} returns {}: {}",
- conn->get_messenger()->get_myaddr(),
- conn->get_peer_addr(), *m, m->result);
+ logger().info(
+ "handle_auth_reply mon {} => {} returns {}: {}",
+ conn->get_messenger()->get_myaddr(),
+ conn->get_peer_addr(), *m, m->result);
auto found = std::find_if(pending_conns.begin(), pending_conns.end(),
[peer_addr = conn->get_peer_addr()](auto& mc) {
- return mc.is_my_peer(peer_addr);
+ return mc->is_my_peer(peer_addr);
});
if (found != pending_conns.end()) {
- return found->handle_auth_reply(m);
+ return (*found)->handle_auth_reply(m);
} else if (active_con) {
return active_con->handle_auth_reply(m);
} else {
seastar::future<> Client::reopen_session(int rank)
{
+ logger().info("{} to mon.{}", __func__, rank);
vector<unsigned> mons;
if (rank >= 0) {
mons.push_back(rank);
#warning fixme
auto peer = monmap.get_addrs(rank).front();
logger().info("connecting to mon.{}", rank);
- return msgr.connect(peer, CEPH_ENTITY_TYPE_MON).then([this] (auto xconn) {
+ return msgr.connect(peer, CEPH_ENTITY_TYPE_MON).then(
+ [this] (auto xconn) -> seastar::future<Connection::AuthResult> {
// sharded-messenger compatible mode assumes all connections running
// in one shard.
ceph_assert((*xconn)->shard_id() == seastar::engine().cpu_id());
ceph::net::ConnectionRef conn = xconn->release();
- auto& mc = pending_conns.emplace_back(auth_registry, conn, &keyring);
+ auto& mc = pending_conns.emplace_back(
+ std::make_unique<Connection>(auth_registry, conn, &keyring));
if (conn->get_peer_addr().is_msgr2()) {
- return mc.authenticate_v2();
+ return mc->authenticate_v2();
} else {
- return mc.authenticate_v1(monmap.get_epoch(), entity_name, want_keys)
+ return mc->authenticate_v1(monmap.get_epoch(), entity_name, want_keys)
.handle_exception([conn](auto ep) {
- return conn->close().then([ep = std::move(ep)] {
- std::rethrow_exception(ep);
+ return conn->close().then([ep=std::move(ep)](){
+ return seastar::make_exception_future<Connection::AuthResult>(ep);
});
});
}
- }).then([peer, this] {
+ }).then([peer, this](auto result) {
+ if (result == Connection::AuthResult::canceled) {
+ return seastar::now();
+ }
+
if (!is_hunting()) {
return seastar::now();
}
logger().info("found mon.{}", monmap.get_name(peer));
- return seastar::parallel_for_each(pending_conns, [peer, this] (auto& conn) {
- if (conn.is_my_peer(peer)) {
- active_con.reset(new Connection{std::move(conn)});
- return seastar::now();
- } else {
- return conn.close();
- }
- });
+
+ auto found = std::find_if(
+ pending_conns.begin(), pending_conns.end(),
+ [peer](auto& conn) {
+ return conn->is_my_peer(peer);
+ });
+ if (found == pending_conns.end()) {
+ // Happens if another connection has won the race
+ ceph_assert(active_con && pending_conns.empty());
+ logger().info(
+ "no pending connection for mon.{}, peer {}",
+ monmap.get_name(peer),
+ peer);
+ return seastar::now();
+ }
+
+ ceph_assert(!active_con && !pending_conns.empty());
+ active_con = std::move(*found);
+ found->reset();
+ auto ret = seastar::do_with(
+ std::move(pending_conns),
+ [this](auto &pending_conns) {
+ return seastar::parallel_for_each(
+ pending_conns,
+ [this] (auto &conn) {
+ if (!conn) {
+ return seastar::now();
+ } else {
+ return conn->close();
+ }
+ });
+ });
+ pending_conns.clear();
+ return ret;
+ }).then([]() {
+ logger().debug("reopen_session mon connection attempts complete");
+ }).handle_exception([](auto ep) {
+ logger().error("mon connections failed with ep {}", ep);
+ return seastar::make_exception_future(ep);
});
}).then([this] {
- pending_conns.clear();
ceph_assert_always(active_con);
return active_con->renew_rotating_keyring();
});