thread/Throttle.cc)
add_library(crimson STATIC
${crimson_auth_srcs}
- # TODO: fix crimson_mon_client with the new design
- # ${crimson_mon_srcs}
+ ${crimson_mon_srcs}
${crimson_net_srcs}
${crimson_thread_srcs}
${CMAKE_SOURCE_DIR}/src/common/buffer_seastar.cc)
#warning fixme
auto peer = monmap.get_addrs(rank).legacy_addr();
logger().info("connecting to mon.{}", rank);
- auto conn = msgr.connect(peer, CEPH_ENTITY_TYPE_MON);
- auto& mc = pending_conns.emplace_back(conn, &keyring);
- return mc.authenticate(
- monmap.get_epoch(), entity_name,
- *auth_methods, want_keys).handle_exception([conn](auto ep) {
- return conn->close().then([ep = std::move(ep)] {
- std::rethrow_exception(ep);
+ return msgr.connect(peer, CEPH_ENTITY_TYPE_MON)
+ .then([this] (auto xconn) {
+      // sharded-messenger compatible mode assumes all connections run
+      // in one shard.
+ ceph_assert((*xconn)->shard_id() == seastar::engine().cpu_id());
+ ceph::net::ConnectionRef conn = xconn->release();
+ auto& mc = pending_conns.emplace_back(conn, &keyring);
+ return mc.authenticate(
+ monmap.get_epoch(), entity_name,
+ *auth_methods, want_keys).handle_exception([conn](auto ep) {
+ return conn->close().then([ep = std::move(ep)] {
+ std::rethrow_exception(ep);
+ });
});
}).then([peer, this] {
if (!is_hunting()) {
#include "crimson/common/config_proxy.h"
#include "crimson/net/Connection.h"
-#include "crimson/net/SocketMessenger.h"
+#include "crimson/net/Messenger.h"
#include "crimson/osd/osdmap_service.h"
#include "crimson/mon/MonClient.h"
uint32_t nonce,
const OSDMapService& service,
ceph::mon::Client& monc)
- : front_msgr{new ceph::net::SocketMessenger{entity_name_t::OSD(whoami),
- "hb_front", nonce}},
- back_msgr{new ceph::net::SocketMessenger{entity_name_t::OSD(whoami),
- "hb_back", nonce}},
+ : whoami{whoami},
+ nonce{nonce},
service{service},
monc{monc},
timer{[this] {send_heartbeats();}}
for (auto& addr : boost::join(front_addrs.v, back_addrs.v)) {
addr.set_port(0);
}
- front_msgr->try_bind(front_addrs,
- local_conf()->ms_bind_port_min,
- local_conf()->ms_bind_port_max);
- back_msgr->try_bind(front_addrs,
- local_conf()->ms_bind_port_min,
- local_conf()->ms_bind_port_max);
- return seastar::when_all_succeed(front_msgr->start(this),
- back_msgr->start(this)).then([this] {
- timer.arm_periodic(
- std::chrono::seconds(local_conf()->osd_heartbeat_interval));
- });
+ return seastar::when_all_succeed(
+ ceph::net::Messenger::create(entity_name_t::OSD(whoami),
+ "hb_front",
+ nonce,
+ seastar::engine().cpu_id())
+ .then([this, front_addrs] (auto msgr) {
+ front_msgr = msgr;
+ return front_msgr->try_bind(front_addrs,
+ local_conf()->ms_bind_port_min,
+ local_conf()->ms_bind_port_max);
+ }).then([this] { return front_msgr->start(this); }),
+ ceph::net::Messenger::create(entity_name_t::OSD(whoami),
+ "hb_back",
+ nonce,
+ seastar::engine().cpu_id())
+ .then([this, back_addrs] (auto msgr) {
+ back_msgr = msgr;
+ return back_msgr->try_bind(back_addrs,
+ local_conf()->ms_bind_port_min,
+ local_conf()->ms_bind_port_max);
+ }).then([this] { return back_msgr->start(this); }))
+ .then([this] {
+ timer.arm_periodic(
+ std::chrono::seconds(local_conf()->osd_heartbeat_interval));
+ });
}
seastar::future<> Heartbeat::stop()
return back_msgr->get_myaddrs();
}
-void Heartbeat::add_peer(osd_id_t peer, epoch_t epoch)
+seastar::future<> Heartbeat::add_peer(osd_id_t peer, epoch_t epoch)
{
auto found = peers.find(peer);
if (found == peers.end()) {
logger().info("add_peer({})", peer);
- PeerInfo info;
auto osdmap = service.get_map();
// TODO: msgr v2
- info.con_front =
- front_msgr->connect(osdmap->get_hb_front_addrs(peer).legacy_addr(),
- CEPH_ENTITY_TYPE_OSD);
- info.con_back =
- back_msgr->connect(osdmap->get_hb_back_addrs(peer).legacy_addr(),
- CEPH_ENTITY_TYPE_OSD);
- info.epoch = epoch;
- peers.emplace(peer, std::move(info));
+ return seastar::when_all_succeed(
+ front_msgr->connect(osdmap->get_hb_front_addrs(peer).legacy_addr(),
+ CEPH_ENTITY_TYPE_OSD),
+ back_msgr->connect(osdmap->get_hb_back_addrs(peer).legacy_addr(),
+ CEPH_ENTITY_TYPE_OSD))
+ .then([this, peer, epoch] (auto xcon_front, auto xcon_back) {
+ PeerInfo info;
+ // sharded-messenger compatible mode
+ info.con_front = xcon_front->release();
+ info.con_back = xcon_back->release();
+ info.epoch = epoch;
+ peers.emplace(peer, std::move(info));
+ });
} else {
found->second.epoch = epoch;
+ return seastar::now();
}
}
entity_addrvec_t back);
seastar::future<> stop();
- void add_peer(osd_id_t peer, epoch_t epoch);
+ seastar::future<> add_peer(osd_id_t peer, epoch_t epoch);
seastar::future<> update_peers(int whoami);
seastar::future<> remove_peer(osd_id_t peer);
void add_reporter_peers(int whoami);
private:
- std::unique_ptr<ceph::net::Messenger> front_msgr;
- std::unique_ptr<ceph::net::Messenger> back_msgr;
+ const int whoami;
+ const uint32_t nonce;
+ ceph::net::Messenger* front_msgr = nullptr;
+ ceph::net::Messenger* back_msgr = nullptr;
const OSDMapService& service;
ceph::mon::Client& monc;
#include "messages/MOSDBoot.h"
#include "messages/MOSDMap.h"
#include "crimson/net/Connection.h"
-#include "crimson/net/SocketMessenger.h"
+#include "crimson/net/Messenger.h"
#include "crimson/os/cyan_collection.h"
#include "crimson/os/cyan_object.h"
#include "crimson/os/cyan_store.h"
OSD::OSD(int id, uint32_t nonce)
: whoami{id},
- cluster_msgr{new ceph::net::SocketMessenger{entity_name_t::OSD(whoami),
- "cluster", nonce}},
- public_msgr{new ceph::net::SocketMessenger{entity_name_t::OSD(whoami),
- "client", nonce}},
- monc{*public_msgr},
- heartbeat{new Heartbeat{whoami, nonce, *this, monc}},
- heartbeat_timer{[this] { update_heartbeat_peers(); }}
-{
- for (auto msgr : {cluster_msgr.get(), public_msgr.get()}) {
- if (local_conf()->ms_crc_data) {
- msgr->set_crc_data();
- }
- if (local_conf()->ms_crc_header) {
- msgr->set_crc_header();
- }
- }
- dispatchers.push_front(this);
- dispatchers.push_front(&monc);
- osdmaps[0] = seastar::make_lw_shared<OSDMap>();
- beacon_timer.set_callback([this] {
- send_beacon();
- });
-}
+ nonce{nonce}
+{}
OSD::~OSD() = default;
seastar::future<> OSD::start()
{
logger().info("start");
- const auto data_path = local_conf().get_val<std::string>("osd_data");
- store = std::make_unique<ceph::os::CyanStore>(data_path);
- return store->mount().then([this] {
+
+ return seastar::when_all_succeed(
+ ceph::net::Messenger::create(entity_name_t::OSD(whoami),
+ "cluster",
+ nonce,
+ seastar::engine().cpu_id())
+ .then([this] (auto msgr) { cluster_msgr = msgr; }),
+ ceph::net::Messenger::create(entity_name_t::OSD(whoami),
+ "client",
+ nonce,
+ seastar::engine().cpu_id())
+ .then([this] (auto msgr) { public_msgr = msgr; }))
+ .then([this] {
+ monc.reset(new ceph::mon::Client{*public_msgr});
+ heartbeat.reset(new Heartbeat{whoami, nonce, *this, *monc});
+
+ for (auto msgr : {cluster_msgr, public_msgr}) {
+ if (local_conf()->ms_crc_data) {
+ msgr->set_crc_data();
+ }
+ if (local_conf()->ms_crc_header) {
+ msgr->set_crc_header();
+ }
+ }
+ dispatchers.push_front(this);
+ dispatchers.push_front(monc.get());
+ osdmaps[0] = seastar::make_lw_shared<OSDMap>();
+
+ const auto data_path = local_conf().get_val<std::string>("osd_data");
+ store = std::make_unique<ceph::os::CyanStore>(data_path);
+ return store->mount();
+ }).then([this] {
meta_coll = make_unique<OSDMeta>(store->open_collection(coll_t::meta()),
store.get());
return meta_coll->load_superblock();
osdmap = std::move(map);
return load_pgs();
}).then([this] {
- cluster_msgr->try_bind(pick_addresses(CEPH_PICK_ADDRESS_CLUSTER),
- local_conf()->ms_bind_port_min,
- local_conf()->ms_bind_port_max);
- public_msgr->try_bind(pick_addresses(CEPH_PICK_ADDRESS_PUBLIC),
- local_conf()->ms_bind_port_min,
- local_conf()->ms_bind_port_max);
- return seastar::when_all_succeed(cluster_msgr->start(&dispatchers),
- public_msgr->start(&dispatchers));
+ return seastar::when_all_succeed(
+ cluster_msgr->try_bind(pick_addresses(CEPH_PICK_ADDRESS_CLUSTER),
+ local_conf()->ms_bind_port_min,
+ local_conf()->ms_bind_port_max)
+ .then([this] { return cluster_msgr->start(&dispatchers); }),
+ public_msgr->try_bind(pick_addresses(CEPH_PICK_ADDRESS_PUBLIC),
+ local_conf()->ms_bind_port_min,
+ local_conf()->ms_bind_port_max)
+ .then([this] { return public_msgr->start(&dispatchers); }));
}).then([this] {
- return monc.start();
+ return monc->start();
}).then([this] {
- monc.sub_want("osd_pg_creates", last_pg_create_epoch, 0);
- monc.sub_want("mgrmap", 0, 0);
- monc.sub_want("osdmap", 0, 0);
- return monc.renew_subs();
+ monc->sub_want("osd_pg_creates", last_pg_create_epoch, 0);
+ monc->sub_want("mgrmap", 0, 0);
+ monc->sub_want("osdmap", 0, 0);
+ return monc->renew_subs();
}).then([this] {
return heartbeat->start(public_msgr->get_myaddrs(),
cluster_msgr->get_myaddrs());
}).then([this] {
+ beacon_timer.set_callback([this] { send_beacon(); });
+ heartbeat_timer.set_callback([this] { update_heartbeat_peers(); });
return start_boot();
});
}
seastar::future<> OSD::start_boot()
{
state.set_preboot();
- return monc.get_version("osdmap").then([this](version_t newest, version_t oldest) {
+ return monc->get_version("osdmap").then([this](version_t newest, version_t oldest) {
return _preboot(newest, oldest);
});
}
heartbeat->get_front_addrs(),
cluster_msgr->get_myaddrs(),
CEPH_FEATURES_ALL);
- return monc.send_message(m);
+ return monc->send_message(m);
}
seastar::future<> OSD::stop()
return gate.close().then([this] {
return heartbeat->stop();
}).then([this] {
- return monc.stop();
+ return monc->stop();
}).then([this] {
return public_msgr->shutdown();
+ }).then([this] {
+ return cluster_msgr->shutdown();
});
}
seastar::future<> OSD::osdmap_subscribe(version_t epoch, bool force_request)
{
logger().info("{}({})", __func__, epoch);
- if (monc.sub_want_increment("osdmap", epoch, CEPH_SUBSCRIBE_ONETIME) ||
+ if (monc->sub_want_increment("osdmap", epoch, CEPH_SUBSCRIBE_ONETIME) ||
force_request) {
- return monc.renew_subs();
+ return monc->renew_subs();
} else {
return seastar::now();
}
[=](auto& t) {
return store_maps(t, start, m).then([=, &t] {
// even if this map isn't from a mon, we may have satisfied our subscription
- monc.sub_got("osdmap", last);
+ monc->sub_got("osdmap", last);
if (!superblock.oldest_map || skip_maps) {
superblock.oldest_map = first;
}
epoch_t min_last_epoch_clean = osdmap->get_epoch();
auto m = make_message<MOSDBeacon>(osdmap->get_epoch(),
min_last_epoch_clean);
- return monc.send_message(m);
+ return monc->send_message(m);
}
void OSD::update_heartbeat_peers()
seastar::gate gate;
seastar::timer<seastar::lowres_clock> beacon_timer;
const int whoami;
+ const uint32_t nonce;
// talk with osd
- std::unique_ptr<ceph::net::Messenger> cluster_msgr;
+ ceph::net::Messenger* cluster_msgr = nullptr;
// talk with client/mon/mgr
- std::unique_ptr<ceph::net::Messenger> public_msgr;
+ ceph::net::Messenger* public_msgr = nullptr;
ChainedDispatchers dispatchers;
- ceph::mon::Client monc;
+ std::unique_ptr<ceph::mon::Client> monc;
std::unique_ptr<Heartbeat> heartbeat;
seastar::timer<seastar::lowres_clock> heartbeat_timer;
add_ceph_unittest(unittest_seastar_messenger)
target_link_libraries(unittest_seastar_messenger ceph-common crimson)
-# TODO: fix unittest_seastar_echo with the new design
-#add_executable(unittest_seastar_echo
-# test_alien_echo.cc)
-#target_link_libraries(unittest_seastar_echo ceph-common global crimson)
+add_executable(unittest_seastar_echo
+ test_alien_echo.cc)
+target_link_libraries(unittest_seastar_echo ceph-common global crimson)
add_executable(unittest_seastar_thread_pool
test_thread_pool.cc)
test_config.cc)
target_link_libraries(unittest_seastar_config crimson)
-# TODO: fix unittest_seastar_monc with the new design
-#add_executable(unittest_seastar_monc
-# test_monc.cc)
-#target_link_libraries(unittest_seastar_monc crimson)
+add_executable(unittest_seastar_monc
+ test_monc.cc)
+target_link_libraries(unittest_seastar_monc crimson)
add_executable(unittest_seastar_perfcounters
test_perfcounters.cc)
#include "msg/Messenger.h"
#include "crimson/net/Connection.h"
#include "crimson/net/Dispatcher.h"
-#include "crimson/net/SocketMessenger.h"
+#include "crimson/net/Messenger.h"
#include "crimson/net/Config.h"
#include "crimson/thread/Condition.h"
#include "crimson/thread/Throttle.h"
struct Server {
ceph::thread::Throttle byte_throttler;
- static constexpr int64_t server_num = 0;
- ceph::net::SocketMessenger msgr{entity_name_t::OSD(server_num), "server", 0};
+ ceph::net::Messenger& msgr;
struct ServerDispatcher : ceph::net::Dispatcher {
unsigned count = 0;
seastar::condition_variable on_reply;
new DummyAuthAuthorizer{});
}
} dispatcher;
- Server()
- : byte_throttler(ceph::net::conf.osd_client_message_size_cap)
+ Server(ceph::net::Messenger& msgr)
+ : byte_throttler(ceph::net::conf.osd_client_message_size_cap),
+ msgr{msgr}
{
msgr.set_crc_header();
msgr.set_crc_data();
struct Client {
ceph::thread::Throttle byte_throttler;
- static constexpr int64_t client_num = 1;
- ceph::net::SocketMessenger msgr{entity_name_t::OSD(client_num), "client", 0};
+ ceph::net::Messenger& msgr;
struct ClientDispatcher : ceph::net::Dispatcher {
unsigned count = 0;
seastar::condition_variable on_reply;
return seastar::now();
}
} dispatcher;
- Client()
- : byte_throttler(ceph::net::conf.osd_client_message_size_cap)
+ Client(ceph::net::Messenger& msgr)
+ : byte_throttler(ceph::net::conf.osd_client_message_size_cap),
+ msgr{msgr}
{
msgr.set_crc_header();
msgr.set_crc_data();
{
std::cout << "seastar/";
if (role == echo_role::as_server) {
- return seastar::do_with(seastar_pingpong::Server{},
- [&addr, count](auto& server) mutable {
- std::cout << "server listening at " << addr << std::endl;
- // bind the server
- server.msgr.set_policy_throttler(entity_name_t::TYPE_OSD,
- &server.byte_throttler);
- server.msgr.bind(entity_addrvec_t{addr});
- return server.msgr.start(&server.dispatcher)
- .then([&dispatcher=server.dispatcher, count] {
- return dispatcher.on_reply.wait([&dispatcher, count] {
- return dispatcher.count >= count;
- });
- }).finally([&server] {
- std::cout << "server shutting down" << std::endl;
- return server.msgr.shutdown();
+ return ceph::net::Messenger::create(entity_name_t::OSD(0), "server", 0,
+ seastar::engine().cpu_id())
+ .then([&addr, count] (auto msgr) {
+ return seastar::do_with(seastar_pingpong::Server{*msgr},
+ [&addr, count](auto& server) mutable {
+ std::cout << "server listening at " << addr << std::endl;
+ // bind the server
+ server.msgr.set_policy_throttler(entity_name_t::TYPE_OSD,
+ &server.byte_throttler);
+ return server.msgr.bind(entity_addrvec_t{addr})
+ .then([&server] {
+ return server.msgr.start(&server.dispatcher);
+ }).then([&dispatcher=server.dispatcher, count] {
+ return dispatcher.on_reply.wait([&dispatcher, count] {
+ return dispatcher.count >= count;
+ });
+ }).finally([&server] {
+ std::cout << "server shutting down" << std::endl;
+ return server.msgr.shutdown();
+ });
});
});
} else {
- return seastar::do_with(seastar_pingpong::Client{},
- [&addr, count](auto& client) {
- std::cout << "client sending to " << addr << std::endl;
- client.msgr.set_policy_throttler(entity_name_t::TYPE_OSD,
- &client.byte_throttler);
- return client.msgr.start(&client.dispatcher)
- .then([&] {
- return client.msgr.connect(addr, entity_name_t::TYPE_OSD);
- }).then([&disp=client.dispatcher, count](ceph::net::ConnectionRef conn) {
- return seastar::do_until(
- [&disp,count] { return disp.count >= count; },
- [&disp,conn] { return conn->send(MessageRef{new MPing(), false})
- .then([&] { return disp.on_reply.wait(); });
- });
- }).finally([&client] {
- std::cout << "client shutting down" << std::endl;
- return client.msgr.shutdown();
+ return ceph::net::Messenger::create(entity_name_t::OSD(1), "client", 1,
+ seastar::engine().cpu_id())
+ .then([&addr, count] (auto msgr) {
+ return seastar::do_with(seastar_pingpong::Client{*msgr},
+ [&addr, count](auto& client) {
+ std::cout << "client sending to " << addr << std::endl;
+ client.msgr.set_policy_throttler(entity_name_t::TYPE_OSD,
+ &client.byte_throttler);
+ return client.msgr.start(&client.dispatcher)
+ .then([&] {
+ return client.msgr.connect(addr, entity_name_t::TYPE_OSD);
+ }).then([&disp=client.dispatcher, count](ceph::net::ConnectionXRef conn) {
+ return seastar::do_until(
+ [&disp,count] { return disp.count >= count; },
+ [&disp,conn] { return (*conn)->send(MessageRef{new MPing(), false})
+ .then([&] { return disp.on_reply.wait(); });
+ });
+ }).finally([&client] {
+ std::cout << "client shutting down" << std::endl;
+ return client.msgr.shutdown();
+ });
});
});
}
#include "crimson/common/config_proxy.h"
#include "crimson/mon/MonClient.h"
#include "crimson/net/Connection.h"
-#include "crimson/net/SocketMessenger.h"
+#include "crimson/net/Messenger.h"
using Config = ceph::common::ConfigProxy;
using MonClient = ceph::mon::Client;
}).then([] {
return ceph::common::sharded_perf_coll().start();
}).then([] {
- return seastar::do_with(ceph::net::SocketMessenger{entity_name_t::OSD(0), "monc", 0},
- [](ceph::net::Messenger& msgr) {
+ return ceph::net::Messenger::create(entity_name_t::OSD(0), "monc", 0,
+ seastar::engine().cpu_id())
+ .then([] (ceph::net::Messenger *msgr) {
auto& conf = ceph::common::local_conf();
if (conf->ms_crc_data) {
- msgr.set_crc_data();
+ msgr->set_crc_data();
}
if (conf->ms_crc_header) {
- msgr.set_crc_header();
+ msgr->set_crc_header();
}
- return seastar::do_with(MonClient{msgr},
- [&msgr](auto& monc) {
- return msgr.start(&monc).then([&monc] {
+ return seastar::do_with(MonClient{*msgr},
+ [msgr](auto& monc) {
+ return msgr->start(&monc).then([&monc] {
return seastar::with_timeout(
seastar::lowres_clock::now() + std::chrono::seconds{10},
monc.start());
}).then([&monc] {
return monc.stop();
});
- }).finally([&msgr] {
- return msgr.shutdown();
+ }).finally([msgr] {
+ return msgr->shutdown();
});
});
}).finally([] {