return seastar::now();
}
auto peer = mgrmap.get_active_addrs().front();
- return msgr.connect(peer, CEPH_ENTITY_TYPE_MGR).then(
- [this](auto _conn) {
- conn = _conn;
- // ask for the mgrconfigure message
- auto m = ceph::make_message<MMgrOpen>();
- m->daemon_name = local_conf()->name.get_id();
- return conn->send(std::move(m));
- });
+ conn = msgr.connect(peer, CEPH_ENTITY_TYPE_MGR);
+ // ask for the mgrconfigure message
+ auto m = ceph::make_message<MMgrOpen>();
+ m->daemon_name = local_conf()->name.get_id();
+ return conn->send(std::move(m));
});
}
#warning fixme
auto peer = monmap.get_addrs(rank).front();
logger().info("connecting to mon.{}", rank);
- return msgr.connect(peer, CEPH_ENTITY_TYPE_MON).then(
- [this] (auto conn) -> seastar::future<Connection::AuthResult> {
+ return seastar::futurize_apply(
+ [peer, this] () -> seastar::future<Connection::AuthResult> {
+ auto conn = msgr.connect(peer, CEPH_ENTITY_TYPE_MON);
auto& mc = pending_conns.emplace_back(
std::make_unique<Connection>(auth_registry, conn, &keyring));
if (conn->get_peer_addr().is_msgr2()) {
/// either return an existing connection to the peer,
/// or a new pending connection
- virtual seastar::future<ConnectionRef>
+ virtual ConnectionRef
connect(const entity_addr_t& peer_addr,
const entity_type_t& peer_type) = 0;
return seastar::now();
}
-seastar::future<crimson::net::ConnectionRef>
+crimson::net::ConnectionRef
SocketMessenger::connect(const entity_addr_t& peer_addr, const entity_type_t& peer_type)
{
assert(seastar::engine().cpu_id() == master_sid);
ceph_assert(peer_addr.get_port() > 0);
if (auto found = lookup_conn(peer_addr); found) {
- return seastar::make_ready_future<ConnectionRef>(found->shared_from_this());
+ return found->shared_from_this();
}
SocketConnectionRef conn = seastar::make_shared<SocketConnection>(
*this, *dispatcher, peer_addr.is_msgr2());
conn->start_connect(peer_addr, peer_type);
- return seastar::make_ready_future<ConnectionRef>(conn->shared_from_this());
+ return conn->shared_from_this();
}
seastar::future<> SocketMessenger::shutdown()
seastar::future<> start(Dispatcher *dispatcher) override;
- seastar::future<ConnectionRef> connect(const entity_addr_t& peer_addr,
- const entity_type_t& peer_type) override;
+ ConnectionRef connect(const entity_addr_t& peer_addr,
+ const entity_type_t& peer_type) override;
// can only wait once
seastar::future<> wait() override {
assert(seastar::engine().cpu_id() == master_sid);
}
}
-seastar::future<> Heartbeat::add_peer(osd_id_t peer, epoch_t epoch)
+void Heartbeat::add_peer(osd_id_t peer, epoch_t epoch)
{
auto [peer_info, added] = peers.try_emplace(peer);
auto& info = peer_info->second;
logger().info("add_peer({})", peer);
auto osdmap = service.get_osdmap_service().get_map();
// TODO: use addrs
- return seastar::when_all_succeed(
- front_msgr->connect(osdmap->get_hb_front_addrs(peer).front(),
- CEPH_ENTITY_TYPE_OSD),
- back_msgr->connect(osdmap->get_hb_back_addrs(peer).front(),
- CEPH_ENTITY_TYPE_OSD))
- .then([&info=peer_info->second] (auto con_front, auto con_back) {
- info.con_front = con_front;
- info.con_back = con_back;
- });
- } else {
- return seastar::now();
+  info.con_front = front_msgr->connect(
+    osdmap->get_hb_front_addrs(peer).front(), CEPH_ENTITY_TYPE_OSD);
+  info.con_back = back_msgr->connect(
+    osdmap->get_hb_back_addrs(peer).front(), CEPH_ENTITY_TYPE_OSD);
}
}
});
}
-seastar::future<> Heartbeat::add_reporter_peers(int whoami)
+void Heartbeat::add_reporter_peers(int whoami)
{
auto osdmap = service.get_osdmap_service().get_map();
// include next and previous up osds to ensure we have a fully-connected set
auto subtree = local_conf().get_val<string>("mon_osd_reporter_subtree_level");
osdmap->get_random_up_osds_by_subtree(
whoami, subtree, min_down, want, &want);
- return seastar::parallel_for_each(
- std::move(want),
- [epoch=osdmap->get_epoch(), this](int osd) {
- return add_peer(osd, epoch);
- });
+ auto epoch = osdmap->get_epoch();
+ for (int osd : want) {
+ add_peer(osd, epoch);
+  }
}
seastar::future<> Heartbeat::update_peers(int whoami)
{
const auto min_peers = static_cast<size_t>(
local_conf().get_val<int64_t>("osd_heartbeat_min_peers"));
- return add_reporter_peers(whoami).then([this] {
- return remove_down_peers();
- }).then([=](osds_t&& extra) {
+ add_reporter_peers(whoami);
+ return remove_down_peers().then([=](osds_t&& extra) {
// too many?
struct iteration_state {
osds_t::const_iterator where;
next = osdmap->get_next_up_osd_after(next)) {
want.push_back(next);
}
- return seastar::parallel_for_each(
- std::move(want),
- [epoch=osdmap->get_epoch(), this](int osd) {
- return add_peer(osd, epoch);
- });
+ auto epoch = osdmap->get_epoch();
+ for (int osd : want) {
+ add_peer(osd, epoch);
+ }
});
}
const auto peer = found->first;
const auto epoch = found->second.epoch;
return remove_peer(peer).then([peer, epoch, this] {
- return add_peer(peer, epoch);
+ add_peer(peer, epoch);
});
}
entity_addrvec_t back);
seastar::future<> stop();
- seastar::future<> add_peer(osd_id_t peer, epoch_t epoch);
+ void add_peer(osd_id_t peer, epoch_t epoch);
seastar::future<> update_peers(int whoami);
seastar::future<> remove_peer(osd_id_t peer);
/// @return peers not needed in this epoch
seastar::future<osds_t> remove_down_peers();
/// add enough reporters for fast failure detection
- seastar::future<> add_reporter_peers(int whoami);
+ void add_reporter_peers(int whoami);
seastar::future<> start_messenger(crimson::net::Messenger& msgr,
const entity_addrvec_t& addrs);
if (!state.is_active()) {
return seastar::now();
}
- return seastar::parallel_for_each(
- pg_map.get_pgs(),
- [this](auto& pg) {
- vector<int> up, acting;
- osdmap->pg_to_up_acting_osds(pg.first.pgid,
- &up, nullptr,
- &acting, nullptr);
- return seastar::parallel_for_each(
- boost::join(up, acting),
- [this](int osd) {
- if (osd == CRUSH_ITEM_NONE || osd == whoami) {
- return seastar::now();
- } else {
- return heartbeat->add_peer(osd, osdmap->get_epoch());
- }
- });
- }).then([this] {
- return heartbeat->update_peers(whoami);
- });
+ for (auto& pg : pg_map.get_pgs()) {
+ vector<int> up, acting;
+ osdmap->pg_to_up_acting_osds(pg.first.pgid,
+ &up, nullptr,
+ &acting, nullptr);
+ for (int osd : boost::join(up, acting)) {
+ if (osd == CRUSH_ITEM_NONE || osd == whoami) {
+ continue;
+ } else {
+ heartbeat->add_peer(osd, osdmap->get_epoch());
+ }
+ }
+ }
+ return heartbeat->update_peers(whoami);
}
seastar::future<> OSD::handle_peering_op(
osdmap->get_info(peer).up_from, from_epoch);
return seastar::now();
} else {
- return cluster_msgr.connect(osdmap->get_cluster_addrs(peer).front(),
- CEPH_ENTITY_TYPE_OSD)
- .then([m, this] (auto conn) {
- return conn->send(m);
- });
+ auto conn = cluster_msgr.connect(
+ osdmap->get_cluster_addrs(peer).front(), CEPH_ENTITY_TYPE_OSD);
+ return conn->send(m);
}
}
client.msgr->set_require_authorizer(false);
client.msgr->set_auth_client(&client.dummy_auth);
client.msgr->set_auth_server(&client.dummy_auth);
- return client.msgr->start(&client.dispatcher).then([addr, &client] {
- return client.msgr->connect(addr, entity_name_t::TYPE_OSD);
- }).then([&disp=client.dispatcher, count](crimson::net::ConnectionRef conn) {
+ return client.msgr->start(&client.dispatcher).then(
+ [addr, &client, &disp=client.dispatcher, count] {
+ auto conn = client.msgr->connect(addr, entity_name_t::TYPE_OSD);
return seastar::do_until(
[&disp,count] { return disp.count >= count; },
[&disp,conn] {
seastar::future<> dispatch_pingpong(const entity_addr_t& peer_addr) {
mono_time start_time = mono_clock::now();
- return msgr->connect(peer_addr, entity_name_t::TYPE_OSD
- ).then([this, start_time](auto conn) {
- return seastar::futurize_apply([this, conn] {
- return do_dispatch_pingpong(conn.get());
- }).finally([this, conn, start_time] {
- auto session = find_session(conn.get());
- std::chrono::duration<double> dur_handshake = session->connected_time - start_time;
- std::chrono::duration<double> dur_pingpong = session->finish_time - session->connected_time;
- logger().info("{}: handshake {}, pingpong {}",
- *conn, dur_handshake.count(), dur_pingpong.count());
- });
+ auto conn = msgr->connect(peer_addr, entity_name_t::TYPE_OSD);
+ return seastar::futurize_apply([this, conn] {
+ return do_dispatch_pingpong(conn.get());
+ }).finally([this, conn, start_time] {
+ auto session = find_session(conn.get());
+ std::chrono::duration<double> dur_handshake = session->connected_time - start_time;
+ std::chrono::duration<double> dur_pingpong = session->finish_time - session->connected_time;
+ logger().info("{}: handshake {}, pingpong {}",
+ *conn, dur_handshake.count(), dur_pingpong.count());
});
}
server->init(entity_name_t::OSD(4), "server3", 5, addr),
client->init(entity_name_t::OSD(5), "client3", 6)
).then([server, client] {
- return client->msgr->connect(server->msgr->get_myaddr(),
- entity_name_t::TYPE_OSD);
- }).then([](crimson::net::ConnectionRef conn) {
+ auto conn = client->msgr->connect(server->msgr->get_myaddr(),
+ entity_name_t::TYPE_OSD);
// send two messages
return conn->send(make_message<MPing>()).then([conn] {
return conn->send(make_message<MPing>());
msgr->set_auth_server(&dummy_auth);
return msgr->start(this);
}
- seastar::future<> send_pings(const entity_addr_t& addr) {
- return msgr->connect(addr, entity_name_t::TYPE_OSD
- ).then([this](crimson::net::ConnectionRef conn) {
- // forwarded to stopped_send_promise
- (void) seastar::do_until(
- [this] { return stop_send; },
- [this, conn] {
- return conn->send(make_message<MPing>()).then([] {
- return seastar::sleep(0ms);
- });
- }
- ).then_wrapped([this, conn] (auto fut) {
- fut.forward_to(std::move(stopped_send_promise));
- });
+ void send_pings(const entity_addr_t& addr) {
+ auto conn = msgr->connect(addr, entity_name_t::TYPE_OSD);
+ // forwarded to stopped_send_promise
+ (void) seastar::do_until(
+ [this] { return stop_send; },
+ [this, conn] {
+ return conn->send(make_message<MPing>()).then([] {
+ return seastar::sleep(0ms);
+ });
+ }
+ ).then_wrapped([this, conn] (auto fut) {
+ fut.forward_to(std::move(stopped_send_promise));
});
}
seastar::future<> shutdown() {
server->init(entity_name_t::OSD(6), "server4", 7, addr),
client->init(entity_name_t::OSD(7), "client4", 8)
).then([server, client] {
- return client->send_pings(server->get_addr());
- }).then([] {
+ client->send_pings(server->get_addr());
return seastar::sleep(100ms);
}).then([client] {
logger().info("client shutdown...");
public:
seastar::future<> connect_peer() {
logger().info("[Test] connect_peer({})", test_peer_addr);
- return test_msgr->connect(test_peer_addr, entity_name_t::TYPE_OSD
- ).then([this] (auto conn) {
- auto result = interceptor.find_result(conn);
- ceph_assert(result != nullptr);
-
- if (tracked_conn) {
- if (tracked_conn->is_closed()) {
- ceph_assert(tracked_conn != conn);
- logger().info("[Test] this is a new session replacing an closed one");
- } else {
- ceph_assert(tracked_index == result->index);
- ceph_assert(tracked_conn == conn);
- logger().info("[Test] this is not a new session");
- }
+ auto conn = test_msgr->connect(test_peer_addr, entity_name_t::TYPE_OSD);
+ auto result = interceptor.find_result(conn);
+ ceph_assert(result != nullptr);
+
+ if (tracked_conn) {
+ if (tracked_conn->is_closed()) {
+ ceph_assert(tracked_conn != conn);
+ logger().info("[Test] this is a new session replacing an closed one");
} else {
- logger().info("[Test] this is a new session");
+ ceph_assert(tracked_index == result->index);
+ ceph_assert(tracked_conn == conn);
+ logger().info("[Test] this is not a new session");
}
- tracked_index = result->index;
- tracked_conn = conn;
+ } else {
+ logger().info("[Test] this is a new session");
+ }
+ tracked_index = result->index;
+ tracked_conn = conn;
- return flush_pending_send();
- });
+ return flush_pending_send();
}
seastar::future<> send_peer() {
cmd_msgr->set_auth_server(&dummy_auth);
return cmd_msgr->start(this).then([this, cmd_peer_addr] {
logger().info("CmdCli connect to CmdSrv({}) ...", cmd_peer_addr);
- return cmd_msgr->connect(cmd_peer_addr, entity_name_t::TYPE_OSD);
- }).then([this] (auto conn) {
- cmd_conn = conn;
+ cmd_conn = cmd_msgr->connect(cmd_peer_addr, entity_name_t::TYPE_OSD);
return pingpong();
});
}
seastar::future<> connect_peer(entity_addr_t addr) {
logger().info("[TestPeer] connect_peer({})", addr);
- return peer_msgr->connect(addr, entity_name_t::TYPE_OSD
- ).then([this] (auto conn) {
- auto new_tracked_conn = conn;
- if (tracked_conn) {
- if (tracked_conn->is_closed()) {
- ceph_assert(tracked_conn != new_tracked_conn);
- logger().info("[TestPeer] this is a new session"
- " replacing an closed one");
- } else {
- ceph_assert(tracked_conn == new_tracked_conn);
- logger().info("[TestPeer] this is not a new session");
- }
+ auto new_tracked_conn = peer_msgr->connect(addr, entity_name_t::TYPE_OSD);
+ if (tracked_conn) {
+ if (tracked_conn->is_closed()) {
+ ceph_assert(tracked_conn != new_tracked_conn);
+ logger().info("[TestPeer] this is a new session"
+ " replacing an closed one");
} else {
- logger().info("[TestPeer] this is a new session");
+ ceph_assert(tracked_conn == new_tracked_conn);
+ logger().info("[TestPeer] this is not a new session");
}
- tracked_conn = new_tracked_conn;
- return flush_pending_send();
- });
+ } else {
+ logger().info("[TestPeer] this is a new session");
+ }
+ tracked_conn = new_tracked_conn;
+ return flush_pending_send();
}
seastar::future<> send_peer() {
// start clients in active cores (#1 ~ #jobs)
if (client.is_active()) {
mono_time start_time = mono_clock::now();
- return client.msgr->connect(peer_addr, entity_name_t::TYPE_OSD
- ).then([&client] (auto conn) {
- client.active_conn = conn;
- // make sure handshake won't hurt the performance
- return seastar::sleep(1s);
- }).then([&client, start_time] {
+ client.active_conn = client.msgr->connect(peer_addr, entity_name_t::TYPE_OSD);
+ // make sure handshake won't hurt the performance
+ return seastar::sleep(1s).then([&client, start_time] {
if (client.conn_stats.connected_time == mono_clock::zero()) {
logger().error("\n{} not connected after 1s!\n", client.lname);
ceph_assert(false);