}
auto peer = mgrmap.get_active_addrs().front();
return msgr.connect(peer, CEPH_ENTITY_TYPE_MGR).then(
- [this](auto xconn) {
- conn = xconn->release();
+ [this](auto _conn) {
+ conn = _conn;
// ask for the mgrconfigure message
auto m = ceph::make_message<MMgrOpen>();
m->daemon_name = local_conf()->name.get_id();
auto peer = monmap.get_addrs(rank).front();
logger().info("connecting to mon.{}", rank);
return msgr.connect(peer, CEPH_ENTITY_TYPE_MON).then(
- [this] (auto xconn) -> seastar::future<Connection::AuthResult> {
- // sharded-messenger compatible mode assumes all connections running
- // in one shard.
- ceph_assert((*xconn)->shard_id() == seastar::engine().cpu_id());
- crimson::net::ConnectionRef conn = xconn->release();
+ [this] (auto conn) -> seastar::future<Connection::AuthResult> {
auto& mc = pending_conns.emplace_back(
std::make_unique<Connection>(auth_registry, conn, &keyring));
if (conn->get_peer_addr().is_msgr2()) {
// will wait for all connections closed
virtual seastar::future<> close() = 0;
- /// which shard id the connection lives
- virtual seastar::shard_id shard_id() const = 0;
-
virtual void print(ostream& out) const = 0;
void set_last_keepalive(clock_t::time_point when) {
bufferlist&) {
return seastar::make_ready_future<msgr_tag_t, bufferlist>(0, bufferlist{});
}
-
- // get the local dispatcher shard if it is accessed by another core
- virtual Dispatcher* get_local_shard() {
- return this;
- }
};
} // namespace crimson::net
class Connection;
using ConnectionRef = seastar::shared_ptr<Connection>;
-// NOTE: ConnectionXRef should only be used in seastar world, because
-// lw_shared_ptr<> is not safe to be accessed by unpinned alien threads.
-using ConnectionXRef = seastar::lw_shared_ptr<seastar::foreign_ptr<ConnectionRef>>;
class Dispatcher;
class Messenger;
-
-template <typename T, typename... Args>
-seastar::future<T*> create_sharded(Args... args) {
- // seems we should only construct/stop shards on #0
- return seastar::smp::submit_to(0, [=] {
- auto sharded_obj = seastar::make_lw_shared<seastar::sharded<T>>();
- return sharded_obj->start(args...).then([sharded_obj]() {
- seastar::engine().at_exit([sharded_obj]() {
- return sharded_obj->stop().finally([sharded_obj] {});
- });
- return sharded_obj.get();
- });
- }).then([] (seastar::sharded<T> *ptr_shard) {
- // return the pointer valid for the caller CPU
- return &ptr_shard->local();
- });
-}
+using MessengerRef = seastar::shared_ptr<Messenger>;
} // namespace crimson::net
namespace crimson::net {
-seastar::future<Messenger*>
+MessengerRef
Messenger::create(const entity_name_t& name,
const std::string& lname,
-                 const uint64_t nonce,
-                 const int master_sid)
+                 const uint64_t nonce)
{
- // enforce the messenger to a specific core (master_sid)
- // TODO: drop the cross-core feature and cleanup the related interfaces in
- // the future.
- ceph_assert(master_sid >= 0);
- return create_sharded<SocketMessenger>(
- name, lname, nonce, static_cast<seastar::shard_id>(master_sid)
- ).then([](Messenger *msgr) {
- return msgr;
- });
+ // The factory is now synchronous: the messenger is owned by a
+ // seastar::shared_ptr and pinned to the shard that constructs it,
+ // so the async cross-core sharded construction is no longer needed.
+ return seastar::make_shared<SocketMessenger>(name, lname, nonce);
}
} // namespace crimson::net
/// either return an existing connection to the peer,
/// or a new pending connection
- virtual seastar::future<ConnectionXRef>
+ virtual seastar::future<ConnectionRef>
connect(const entity_addr_t& peer_addr,
const entity_type_t& peer_type) = 0;
auth_server = as;
}
- // get the local messenger shard if it is accessed by another core
- virtual Messenger* get_local_shard() {
- return this;
- }
-
virtual void print(ostream& out) const = 0;
virtual SocketPolicy get_policy(entity_type_t peer_type) const = 0;
void set_require_authorizer(bool r) {
require_authorizer = r;
}
- static seastar::future<Messenger*>
+ static MessengerRef
create(const entity_name_t& name,
const std::string& lname,
- const uint64_t nonce,
- const int master_sid=-1);
+ const uint64_t nonce);
};
inline ostream& operator<<(ostream& out, const Messenger& msgr) {
bool is_msgr2)
: messenger(messenger)
{
- ceph_assert(&messenger.container().local() == &messenger);
if (is_msgr2) {
protocol = std::make_unique<ProtocolV2>(dispatcher, *this, messenger);
} else {
bool SocketConnection::is_connected() const
{
- ceph_assert(seastar::engine().cpu_id() == shard_id());
+ // debug-only check (assert, not ceph_assert): the connection must be
+ // accessed from its owning shard
+ assert(seastar::engine().cpu_id() == shard_id());
return protocol->is_connected();
}
#ifdef UNIT_TESTS_BUILT
bool SocketConnection::is_closed() const
{
- ceph_assert(seastar::engine().cpu_id() == shard_id());
+ // debug-only shard-ownership check, consistent with is_connected()
+ assert(seastar::engine().cpu_id() == shard_id());
return protocol->is_closed();
}
seastar::future<> SocketConnection::send(MessageRef msg)
{
- // Cannot send msg from another core now, its ref counter can be contaminated!
- ceph_assert(seastar::engine().cpu_id() == shard_id());
- return seastar::smp::submit_to(shard_id(), [this, msg=std::move(msg)] {
-   return protocol->send(std::move(msg));
- });
+ // single-shard messenger: the caller is already on the owning core,
+ // so the cross-core smp::submit_to hop is dropped
+ assert(seastar::engine().cpu_id() == shard_id());
+ return protocol->send(std::move(msg));
}
seastar::future<> SocketConnection::keepalive()
{
- return seastar::smp::submit_to(shard_id(), [this] {
-   return protocol->keepalive();
- });
+ // like send(): must already run on the connection's shard, so call
+ // the protocol directly instead of submitting to another core
+ assert(seastar::engine().cpu_id() == shard_id());
+ return protocol->keepalive();
}
seastar::future<> SocketConnection::close()
{
- ceph_assert(seastar::engine().cpu_id() == shard_id());
+ // debug-only shard-ownership check, consistent with the other accessors
+ assert(seastar::engine().cpu_id() == shard_id());
return protocol->close();
}
// messages sent, but not yet acked by peer
std::deque<MessageRef> sent;
+ seastar::shard_id shard_id() const;
+
public:
SocketConnection(SocketMessenger& messenger,
Dispatcher& dispatcher,
seastar::future<> close() override;
- seastar::shard_id shard_id() const override;
-
void print(ostream& out) const override;
/// start a handshake from the client's perspective,
SocketMessenger::SocketMessenger(const entity_name_t& myname,
const std::string& logic_name,
- uint32_t nonce,
- seastar::shard_id master_sid)
+ uint32_t nonce)
: Messenger{myname},
- master_sid{master_sid},
+ // pin the messenger to the shard that constructed it instead of
+ // taking an externally supplied master shard id
+ master_sid{seastar::engine().cpu_id()},
logic_name{logic_name},
nonce{nonce}
{}
seastar::future<> SocketMessenger::start(Dispatcher *disp) {
assert(seastar::engine().cpu_id() == master_sid);
- dispatcher = disp->get_local_shard();
+ dispatcher = disp;
if (listener) {
// make sure we have already bound to a valid address
ceph_assert(get_myaddr().is_legacy() || get_myaddr().is_msgr2());
return seastar::now();
}
-seastar::future<crimson::net::ConnectionXRef>
+seastar::future<crimson::net::ConnectionRef>
SocketMessenger::connect(const entity_addr_t& peer_addr, const entity_type_t& peer_type)
{
assert(seastar::engine().cpu_id() == master_sid);
ceph_assert(peer_addr.is_legacy() || peer_addr.is_msgr2());
ceph_assert(peer_addr.get_port() > 0);
- // TODO: use ConnectionRef
if (auto found = lookup_conn(peer_addr); found) {
- return seastar::make_ready_future<ConnectionXRef>(
- seastar::make_lw_shared<seastar::foreign_ptr<ConnectionRef>>(
- seastar::make_foreign(found->shared_from_this())));
+ // ConnectionRef is a plain seastar::shared_ptr now; the
+ // lw_shared_ptr<foreign_ptr<>> wrapping is gone with the
+ // single-shard design
+ return seastar::make_ready_future<ConnectionRef>(found->shared_from_this());
}
SocketConnectionRef conn = seastar::make_shared<SocketConnection>(
*this, *dispatcher, peer_addr.is_msgr2());
conn->start_connect(peer_addr, peer_type);
- return seastar::make_ready_future<ConnectionXRef>(
- seastar::make_lw_shared<seastar::foreign_ptr<ConnectionRef>>(
- seastar::make_foreign(conn->shared_from_this())));
+ // same simplification for the freshly created pending connection
+ return seastar::make_ready_future<ConnectionRef>(conn->shared_from_this());
}
seastar::future<> SocketMessenger::shutdown()
class FixedCPUServerSocket;
-class SocketMessenger final : public Messenger, public seastar::peering_sharded_service<SocketMessenger> {
+class SocketMessenger final : public Messenger {
const seastar::shard_id master_sid;
seastar::promise<> shutdown_promise;
public:
SocketMessenger(const entity_name_t& myname,
const std::string& logic_name,
- uint32_t nonce,
- seastar::shard_id master_sid);
+ uint32_t nonce);
~SocketMessenger() override { ceph_assert(!listener); }
seastar::future<> set_myaddrs(const entity_addrvec_t& addr) override;
seastar::future<> start(Dispatcher *dispatcher) override;
- seastar::future<ConnectionXRef> connect(const entity_addr_t& peer_addr,
- const entity_type_t& peer_type) override;
+ seastar::future<ConnectionRef> connect(const entity_addr_t& peer_addr,
+ const entity_type_t& peer_type) override;
// can only wait once
seastar::future<> wait() override {
assert(seastar::engine().cpu_id() == master_sid);
seastar::future<> shutdown() override;
- Messenger* get_local_shard() override {
- return &container().local();
- }
-
void print(ostream& out) const override {
out << get_myname()
<< "(" << logic_name
Heartbeat::Heartbeat(const crimson::osd::ShardServices& service,
crimson::mon::Client& monc,
- crimson::net::Messenger& front_msgr,
- crimson::net::Messenger& back_msgr)
+ crimson::net::MessengerRef front_msgr,
+ crimson::net::MessengerRef back_msgr)
: service{service},
monc{monc},
front_msgr{front_msgr},
}
using crimson::net::SocketPolicy;
- front_msgr.set_policy(entity_name_t::TYPE_OSD,
+ front_msgr->set_policy(entity_name_t::TYPE_OSD,
+ SocketPolicy::stateless_server(0));
+ back_msgr->set_policy(entity_name_t::TYPE_OSD,
SocketPolicy::stateless_server(0));
- back_msgr.set_policy(entity_name_t::TYPE_OSD,
- SocketPolicy::stateless_server(0));
- return seastar::when_all_succeed(start_messenger(front_msgr, front_addrs),
- start_messenger(back_msgr, back_addrs))
+ return seastar::when_all_succeed(start_messenger(*front_msgr, front_addrs),
+ start_messenger(*back_msgr, back_addrs))
.then([this] {
timer.arm_periodic(
std::chrono::seconds(local_conf()->osd_heartbeat_interval));
seastar::future<> Heartbeat::stop()
{
- return seastar::when_all_succeed(front_msgr.shutdown(),
- back_msgr.shutdown());
+ // messengers are held by MessengerRef (shared_ptr), hence '->'
+ return seastar::when_all_succeed(front_msgr->shutdown(),
+ back_msgr->shutdown());
}
const entity_addrvec_t& Heartbeat::get_front_addrs() const
{
- return front_msgr.get_myaddrs();
+ // front_msgr is a MessengerRef (shared_ptr) now
+ return front_msgr->get_myaddrs();
}
const entity_addrvec_t& Heartbeat::get_back_addrs() const
{
- return back_msgr.get_myaddrs();
+ // back_msgr is a MessengerRef (shared_ptr) now
+ return back_msgr->get_myaddrs();
}
void Heartbeat::set_require_authorizer(bool require_authorizer)
{
- if (front_msgr.get_require_authorizer() != require_authorizer) {
- front_msgr.set_require_authorizer(require_authorizer);
- back_msgr.set_require_authorizer(require_authorizer);
+ // only touch the messengers when the flag actually changes; front and
+ // back are kept in sync (front's current value is the check)
+ if (front_msgr->get_require_authorizer() != require_authorizer) {
+ front_msgr->set_require_authorizer(require_authorizer);
+ back_msgr->set_require_authorizer(require_authorizer);
}
}
auto osdmap = service.get_osdmap_service().get_map();
// TODO: use addrs
return seastar::when_all_succeed(
- front_msgr.connect(osdmap->get_hb_front_addrs(peer).front(),
- CEPH_ENTITY_TYPE_OSD),
- back_msgr.connect(osdmap->get_hb_back_addrs(peer).front(),
- CEPH_ENTITY_TYPE_OSD))
- .then([&info=peer_info->second] (auto xcon_front, auto xcon_back) {
- // sharded-messenger compatible mode
- info.con_front = xcon_front->release();
- info.con_back = xcon_back->release();
+ front_msgr->connect(osdmap->get_hb_front_addrs(peer).front(),
+ CEPH_ENTITY_TYPE_OSD),
+ back_msgr->connect(osdmap->get_hb_back_addrs(peer).front(),
+ CEPH_ENTITY_TYPE_OSD))
+ .then([&info=peer_info->second] (auto con_front, auto con_back) {
+ info.con_front = con_front;
+ info.con_back = con_back;
});
} else {
return seastar::now();
Heartbeat(const crimson::osd::ShardServices& service,
crimson::mon::Client& monc,
- crimson::net::Messenger& front_msgr,
- crimson::net::Messenger& back_msgr);
+ crimson::net::MessengerRef front_msgr,
+ crimson::net::MessengerRef back_msgr);
seastar::future<> start(entity_addrvec_t front,
entity_addrvec_t back);
private:
const crimson::osd::ShardServices& service;
crimson::mon::Client& monc;
- crimson::net::Messenger& front_msgr;
- crimson::net::Messenger& back_msgr;
+ crimson::net::MessengerRef front_msgr;
+ crimson::net::MessengerRef back_msgr;
seastar::timer<seastar::lowres_clock> timer;
// use real_clock so it can be converted to utime_t
#include "common/ceph_argparse.h"
#include "crimson/common/buffer_io.h"
#include "crimson/common/config_proxy.h"
-#include "crimson/net/SocketMessenger.h"
+#include "crimson/net/Messenger.h"
#include "global/pidfile.h"
#include "osd.h"
&cluster_name,
&conf_file_list);
seastar::sharded<crimson::osd::OSD> osd;
- seastar::sharded<crimson::net::SocketMessenger> cluster_msgr, client_msgr;
- seastar::sharded<crimson::net::SocketMessenger> hb_front_msgr, hb_back_msgr;
using crimson::common::sharded_conf;
using crimson::common::sharded_perf_coll;
try {
pidfile_write(local_conf()->pid_file);
const int whoami = std::stoi(local_conf()->name.get_id());
const auto nonce = static_cast<uint32_t>(getpid());
+ crimson::net::MessengerRef cluster_msgr, client_msgr;
+ crimson::net::MessengerRef hb_front_msgr, hb_back_msgr;
for (auto [msgr, name] : {make_pair(std::ref(cluster_msgr), "cluster"s),
make_pair(std::ref(client_msgr), "client"s),
make_pair(std::ref(hb_front_msgr), "hb_front"s),
make_pair(std::ref(hb_back_msgr), "hb_back"s)}) {
- const auto shard = seastar::engine().cpu_id();
- msgr.start(entity_name_t::OSD(whoami), name, nonce, shard).get();
+ msgr = crimson::net::Messenger::create(entity_name_t::OSD(whoami), name, nonce);
if (local_conf()->ms_crc_data) {
- msgr.local().set_crc_data();
+ msgr->set_crc_data();
}
if (local_conf()->ms_crc_header) {
- msgr.local().set_crc_header();
+ msgr->set_crc_header();
}
}
osd.start_single(whoami, nonce,
- reference_wrapper<crimson::net::Messenger>(cluster_msgr.local()),
- reference_wrapper<crimson::net::Messenger>(client_msgr.local()),
- reference_wrapper<crimson::net::Messenger>(hb_front_msgr.local()),
- reference_wrapper<crimson::net::Messenger>(hb_back_msgr.local())).get();
- seastar::engine().at_exit([&] {
- return seastar::when_all_succeed(cluster_msgr.stop(),
- client_msgr.stop(),
- hb_front_msgr.stop(),
- hb_back_msgr.stop());
- });
+ cluster_msgr, client_msgr,
+ hb_front_msgr, hb_back_msgr).get();
if (config.count("mkkey")) {
make_keyring().handle_exception([](std::exception_ptr) {
seastar::engine().exit(1);
namespace crimson::osd {
OSD::OSD(int id, uint32_t nonce,
- crimson::net::Messenger& cluster_msgr,
- crimson::net::Messenger& public_msgr,
- crimson::net::Messenger& hb_front_msgr,
- crimson::net::Messenger& hb_back_msgr)
+ crimson::net::MessengerRef cluster_msgr,
+ crimson::net::MessengerRef public_msgr,
+ crimson::net::MessengerRef hb_front_msgr,
+ crimson::net::MessengerRef hb_back_msgr)
: whoami{id},
nonce{nonce},
// do this in background
beacon_timer{[this] { (void)send_beacon(); }},
cluster_msgr{cluster_msgr},
public_msgr{public_msgr},
- monc{new crimson::mon::Client{public_msgr, *this}},
- mgrc{new crimson::mgr::Client{public_msgr, *this}},
+ monc{new crimson::mon::Client{*public_msgr, *this}},
+ mgrc{new crimson::mgr::Client{*public_msgr, *this}},
store{crimson::os::FuturizedStore::create(
local_conf().get_val<std::string>("osd_objectstore"),
local_conf().get_val<std::string>("osd_data"))},
- shard_services{*this, cluster_msgr, public_msgr, *monc, *mgrc, *store},
+ shard_services{*this, *cluster_msgr, *public_msgr, *monc, *mgrc, *store},
heartbeat{new Heartbeat{shard_services, *monc, hb_front_msgr, hb_back_msgr}},
// do this in background
heartbeat_timer{[this] { (void)update_heartbeat_peers(); }},
osdmaps[0] = boost::make_local_shared<OSDMap>();
for (auto msgr : {std::ref(cluster_msgr), std::ref(public_msgr),
std::ref(hb_front_msgr), std::ref(hb_back_msgr)}) {
- msgr.get().set_auth_server(monc.get());
- msgr.get().set_auth_client(monc.get());
+ msgr.get()->set_auth_server(monc.get());
+ msgr.get()->set_auth_client(monc.get());
}
if (local_conf()->osd_open_classes_on_start) {
CEPH_FEATURE_OSDENC;
using crimson::net::SocketPolicy;
- public_msgr.set_default_policy(SocketPolicy::stateless_server(0));
- public_msgr.set_policy(entity_name_t::TYPE_MON,
- SocketPolicy::lossy_client(osd_required));
- public_msgr.set_policy(entity_name_t::TYPE_MGR,
- SocketPolicy::lossy_client(osd_required));
- public_msgr.set_policy(entity_name_t::TYPE_OSD,
- SocketPolicy::stateless_server(0));
-
- cluster_msgr.set_default_policy(SocketPolicy::stateless_server(0));
- cluster_msgr.set_policy(entity_name_t::TYPE_MON,
- SocketPolicy::lossy_client(0));
- cluster_msgr.set_policy(entity_name_t::TYPE_OSD,
- SocketPolicy::lossless_peer(osd_required));
- cluster_msgr.set_policy(entity_name_t::TYPE_CLIENT,
+ public_msgr->set_default_policy(SocketPolicy::stateless_server(0));
+ public_msgr->set_policy(entity_name_t::TYPE_MON,
+ SocketPolicy::lossy_client(osd_required));
+ public_msgr->set_policy(entity_name_t::TYPE_MGR,
+ SocketPolicy::lossy_client(osd_required));
+ public_msgr->set_policy(entity_name_t::TYPE_OSD,
SocketPolicy::stateless_server(0));
+ cluster_msgr->set_default_policy(SocketPolicy::stateless_server(0));
+ cluster_msgr->set_policy(entity_name_t::TYPE_MON,
+ SocketPolicy::lossy_client(0));
+ cluster_msgr->set_policy(entity_name_t::TYPE_OSD,
+ SocketPolicy::lossless_peer(osd_required));
+ cluster_msgr->set_policy(entity_name_t::TYPE_CLIENT,
+ SocketPolicy::stateless_server(0));
+
dispatchers.push_front(this);
dispatchers.push_front(monc.get());
dispatchers.push_front(mgrc.get());
return seastar::when_all_succeed(
- cluster_msgr.try_bind(pick_addresses(CEPH_PICK_ADDRESS_CLUSTER),
+ cluster_msgr->try_bind(pick_addresses(CEPH_PICK_ADDRESS_CLUSTER),
+ local_conf()->ms_bind_port_min,
+ local_conf()->ms_bind_port_max)
+ .then([this] { return cluster_msgr->start(&dispatchers); }),
+ public_msgr->try_bind(pick_addresses(CEPH_PICK_ADDRESS_PUBLIC),
local_conf()->ms_bind_port_min,
local_conf()->ms_bind_port_max)
- .then([this] { return cluster_msgr.start(&dispatchers); }),
- public_msgr.try_bind(pick_addresses(CEPH_PICK_ADDRESS_PUBLIC),
- local_conf()->ms_bind_port_min,
- local_conf()->ms_bind_port_max)
- .then([this] { return public_msgr.start(&dispatchers); }));
+ .then([this] { return public_msgr->start(&dispatchers); }));
}).then([this] {
return seastar::when_all_succeed(monc->start(),
mgrc->start());
return monc->renew_subs();
}).then([this] {
if (auto [addrs, changed] =
- replace_unknown_addrs(cluster_msgr.get_myaddrs(),
- public_msgr.get_myaddrs()); changed) {
- return cluster_msgr.set_myaddrs(addrs);
+ replace_unknown_addrs(cluster_msgr->get_myaddrs(),
+ public_msgr->get_myaddrs()); changed) {
+ return cluster_msgr->set_myaddrs(addrs);
} else {
return seastar::now();
}
}).then([this] {
- return heartbeat->start(public_msgr.get_myaddrs(),
- cluster_msgr.get_myaddrs());
+ return heartbeat->start(public_msgr->get_myaddrs(),
+ cluster_msgr->get_myaddrs());
}).then([this] {
return start_boot();
});
logger().info("hb_back_msgr: {}", heartbeat->get_back_addrs());
logger().info("hb_front_msgr: {}", heartbeat->get_front_addrs());
- logger().info("cluster_msgr: {}", cluster_msgr.get_myaddr());
+ logger().info("cluster_msgr: {}", cluster_msgr->get_myaddr());
auto m = make_message<MOSDBoot>(superblock,
osdmap->get_epoch(),
osdmap->get_epoch(),
heartbeat->get_back_addrs(),
heartbeat->get_front_addrs(),
- cluster_msgr.get_myaddrs(),
+ cluster_msgr->get_myaddrs(),
CEPH_FEATURES_ALL);
return monc->send_message(m);
}
return monc->stop();
}).then([this] {
return when_all_succeed(
- public_msgr.shutdown(),
- cluster_msgr.shutdown());
+ public_msgr->shutdown(),
+ cluster_msgr->shutdown());
}).then([this] {
return store->umount();
}).handle_exception([](auto ep) {
shard_services.update_map(osdmap);
if (up_epoch != 0 &&
osdmap->is_up(whoami) &&
- osdmap->get_addrs(whoami) == public_msgr.get_myaddrs()) {
+ osdmap->get_addrs(whoami) == public_msgr->get_myaddrs()) {
up_epoch = osdmap->get_epoch();
if (!boot_epoch) {
boot_epoch = osdmap->get_epoch();
});
}).then([m, this] {
if (osdmap->is_up(whoami) &&
- osdmap->get_addrs(whoami) == public_msgr.get_myaddrs() &&
+ osdmap->get_addrs(whoami) == public_msgr->get_myaddrs() &&
bind_epoch < osdmap->get_up_from(whoami)) {
if (state.is_booting()) {
logger().info("osd.{}: activating...", whoami);
logger().info("map e {} marked osd.{} down",
osdmap->get_epoch(), whoami);
return true;
- } else if (osdmap->get_addrs(whoami) != public_msgr.get_myaddrs()) {
+ } else if (osdmap->get_addrs(whoami) != public_msgr->get_myaddrs()) {
logger().error("map e {} had wrong client addr ({} != my {})",
osdmap->get_epoch(),
osdmap->get_addrs(whoami),
- public_msgr.get_myaddrs());
+ public_msgr->get_myaddrs());
return true;
- } else if (osdmap->get_cluster_addrs(whoami) != cluster_msgr.get_myaddrs()) {
+ } else if (osdmap->get_cluster_addrs(whoami) != cluster_msgr->get_myaddrs()) {
logger().error("map e {} had wrong cluster addr ({} != my {})",
osdmap->get_epoch(),
osdmap->get_cluster_addrs(whoami),
- cluster_msgr.get_myaddrs());
+ cluster_msgr->get_myaddrs());
return true;
} else {
return false;
const uint32_t nonce;
seastar::timer<seastar::lowres_clock> beacon_timer;
// talk with osd
- crimson::net::Messenger& cluster_msgr;
+ crimson::net::MessengerRef cluster_msgr;
// talk with client/mon/mgr
- crimson::net::Messenger& public_msgr;
+ crimson::net::MessengerRef public_msgr;
ChainedDispatchers dispatchers;
std::unique_ptr<crimson::mon::Client> monc;
std::unique_ptr<crimson::mgr::Client> mgrc;
public:
OSD(int id, uint32_t nonce,
- crimson::net::Messenger& cluster_msgr,
- crimson::net::Messenger& client_msgr,
- crimson::net::Messenger& hb_front_msgr,
- crimson::net::Messenger& hb_back_msgr);
+ crimson::net::MessengerRef cluster_msgr,
+ crimson::net::MessengerRef client_msgr,
+ crimson::net::MessengerRef hb_front_msgr,
+ crimson::net::MessengerRef hb_back_msgr);
~OSD() final;
seastar::future<> mkfs(uuid_d osd_uuid, uuid_d cluster_fsid);
} else {
return cluster_msgr.connect(osdmap->get_cluster_addrs(peer).front(),
CEPH_ENTITY_TYPE_OSD)
- .then([m, this] (auto xconn) {
- return (*xconn)->send(m);
+ .then([m, this] (auto conn) {
+ return conn->send(m);
});
}
}
struct Server {
crimson::thread::Throttle byte_throttler;
- crimson::net::Messenger& msgr;
+ crimson::net::MessengerRef msgr;
crimson::auth::DummyAuthClientServer dummy_auth;
struct ServerDispatcher : crimson::net::Dispatcher {
unsigned count = 0;
0, bufferlist{});
}
} dispatcher;
- Server(crimson::net::Messenger& msgr)
+ Server(crimson::net::MessengerRef msgr)
: byte_throttler(crimson::net::conf.osd_client_message_size_cap),
msgr{msgr}
{
- msgr.set_crc_header();
- msgr.set_crc_data();
+ msgr->set_crc_header();
+ msgr->set_crc_data();
}
};
struct Client {
crimson::thread::Throttle byte_throttler;
- crimson::net::Messenger& msgr;
+ crimson::net::MessengerRef msgr;
crimson::auth::DummyAuthClientServer dummy_auth;
struct ClientDispatcher : crimson::net::Dispatcher {
unsigned count = 0;
return seastar::now();
}
} dispatcher;
- Client(crimson::net::Messenger& msgr)
+ Client(crimson::net::MessengerRef msgr)
: byte_throttler(crimson::net::conf.osd_client_message_size_cap),
msgr{msgr}
{
- msgr.set_crc_header();
- msgr.set_crc_data();
+ msgr->set_crc_header();
+ msgr->set_crc_data();
}
};
} // namespace seastar_pingpong
{
std::cout << "seastar/";
if (role == echo_role::as_server) {
- return crimson::net::Messenger::create(entity_name_t::OSD(0), "server",
- addr.get_nonce(), 0)
- .then([addr, count] (auto msgr) {
- return seastar::do_with(seastar_pingpong::Server{*msgr},
- [addr, count](auto& server) mutable {
- std::cout << "server listening at " << addr << std::endl;
- // bind the server
- server.msgr.set_default_policy(crimson::net::SocketPolicy::stateless_server(0));
- server.msgr.set_policy_throttler(entity_name_t::TYPE_OSD,
- &server.byte_throttler);
- server.msgr.set_require_authorizer(false);
- server.msgr.set_auth_client(&server.dummy_auth);
- server.msgr.set_auth_server(&server.dummy_auth);
- return server.msgr.bind(entity_addrvec_t{addr})
- .then([&server] {
- return server.msgr.start(&server.dispatcher);
- }).then([&dispatcher=server.dispatcher, count] {
- return dispatcher.on_reply.wait([&dispatcher, count] {
- return dispatcher.count >= count;
- });
- }).finally([&server] {
- std::cout << "server shutting down" << std::endl;
- return server.msgr.shutdown();
- });
- });
+ return seastar::do_with(
+ seastar_pingpong::Server{crimson::net::Messenger::create(
+ entity_name_t::OSD(0), "server", addr.get_nonce())},
+ [addr, count](auto& server) mutable {
+ std::cout << "server listening at " << addr << std::endl;
+ // bind the server
+ server.msgr->set_default_policy(crimson::net::SocketPolicy::stateless_server(0));
+ server.msgr->set_policy_throttler(entity_name_t::TYPE_OSD,
+ &server.byte_throttler);
+ server.msgr->set_require_authorizer(false);
+ server.msgr->set_auth_client(&server.dummy_auth);
+ server.msgr->set_auth_server(&server.dummy_auth);
+ return server.msgr->bind(entity_addrvec_t{addr}
+ ).then([&server] {
+ return server.msgr->start(&server.dispatcher);
+ }).then([&dispatcher=server.dispatcher, count] {
+ return dispatcher.on_reply.wait([&dispatcher, count] {
+ return dispatcher.count >= count;
+ });
+ }).finally([&server] {
+ std::cout << "server shutting down" << std::endl;
+ return server.msgr->shutdown();
});
+ });
} else {
- return crimson::net::Messenger::create(entity_name_t::OSD(1), "client",
- addr.get_nonce(), 0)
- .then([addr, count] (auto msgr) {
- return seastar::do_with(seastar_pingpong::Client{*msgr},
- [addr, count](auto& client) {
- std::cout << "client sending to " << addr << std::endl;
- client.msgr.set_default_policy(crimson::net::SocketPolicy::lossy_client(0));
- client.msgr.set_policy_throttler(entity_name_t::TYPE_OSD,
- &client.byte_throttler);
- client.msgr.set_require_authorizer(false);
- client.msgr.set_auth_client(&client.dummy_auth);
- client.msgr.set_auth_server(&client.dummy_auth);
- return client.msgr.start(&client.dispatcher)
- .then([addr, &client] {
- return client.msgr.connect(addr, entity_name_t::TYPE_OSD);
- }).then([&disp=client.dispatcher, count](crimson::net::ConnectionXRef conn) {
- return seastar::do_until(
- [&disp,count] { return disp.count >= count; },
- [&disp,conn] { return (*conn)->send(make_message<MPing>())
- .then([&] { return disp.on_reply.wait(); });
- });
- }).finally([&client] {
- std::cout << "client shutting down" << std::endl;
- return client.msgr.shutdown();
- });
- });
+ return seastar::do_with(
+ seastar_pingpong::Client{crimson::net::Messenger::create(
+ entity_name_t::OSD(1), "client", addr.get_nonce())},
+ [addr, count](auto& client) {
+ std::cout << "client sending to " << addr << std::endl;
+ client.msgr->set_default_policy(crimson::net::SocketPolicy::lossy_client(0));
+ client.msgr->set_policy_throttler(entity_name_t::TYPE_OSD,
+ &client.byte_throttler);
+ client.msgr->set_require_authorizer(false);
+ client.msgr->set_auth_client(&client.dummy_auth);
+ client.msgr->set_auth_server(&client.dummy_auth);
+ return client.msgr->start(&client.dispatcher).then([addr, &client] {
+ return client.msgr->connect(addr, entity_name_t::TYPE_OSD);
+ }).then([&disp=client.dispatcher, count](crimson::net::ConnectionRef conn) {
+ return seastar::do_until(
+ [&disp,count] { return disp.count >= count; },
+ [&disp,conn] {
+ return conn->send(make_message<MPing>()).then([&] {
+ return disp.on_reply.wait();
+ });
+ }
+ );
+ }).finally([&client] {
+ std::cout << "client shutting down" << std::endl;
+ return client.msgr->shutdown();
});
+ });
}
}
{
struct test_state {
struct Server final
- : public crimson::net::Dispatcher,
- public seastar::peering_sharded_service<Server> {
- crimson::net::Messenger *msgr = nullptr;
+ : public crimson::net::Dispatcher {
+ crimson::net::MessengerRef msgr;
crimson::auth::DummyAuthClientServer dummy_auth;
- Dispatcher* get_local_shard() override {
- return &(container().local());
- }
- seastar::future<> stop() {
- return seastar::make_ready_future<>();
- }
seastar::future<> ms_dispatch(crimson::net::Connection* c,
MessageRef m) override {
if (verbose) {
const std::string& lname,
const uint64_t nonce,
const entity_addr_t& addr) {
- auto&& fut = crimson::net::Messenger::create(name, lname, nonce, 0);
- return fut.then([this, addr](crimson::net::Messenger *messenger) {
- return container().invoke_on_all([messenger](auto& server) {
- server.msgr = messenger->get_local_shard();
- server.msgr->set_default_policy(crimson::net::SocketPolicy::stateless_server(0));
- server.msgr->set_require_authorizer(false);
- server.msgr->set_auth_client(&server.dummy_auth);
- server.msgr->set_auth_server(&server.dummy_auth);
- }).then([messenger, addr] {
- return messenger->bind(entity_addrvec_t{addr});
- }).then([this, messenger] {
- return messenger->start(this);
- });
- });
+ msgr = crimson::net::Messenger::create(name, lname, nonce);
+ msgr->set_default_policy(crimson::net::SocketPolicy::stateless_server(0));
+ msgr->set_require_authorizer(false);
+ msgr->set_auth_client(&dummy_auth);
+ msgr->set_auth_server(&dummy_auth);
+ return msgr->bind(entity_addrvec_t{addr}).then([this] {
+ return msgr->start(this);
+ });
}
seastar::future<> shutdown() {
ceph_assert(msgr);
};
struct Client final
- : public crimson::net::Dispatcher,
- public seastar::peering_sharded_service<Client> {
-
+ : public crimson::net::Dispatcher {
struct PingSession : public seastar::enable_shared_from_this<PingSession> {
unsigned count = 0u;
mono_time connected_time;
unsigned rounds;
std::bernoulli_distribution keepalive_dist;
- crimson::net::Messenger *msgr = nullptr;
+ crimson::net::MessengerRef msgr;
std::map<crimson::net::Connection*, seastar::promise<>> pending_conns;
std::map<crimson::net::Connection*, PingSessionRef> sessions;
crimson::auth::DummyAuthClientServer dummy_auth;
return found->second;
}
- Dispatcher* get_local_shard() override {
- return &(container().local());
- }
- seastar::future<> stop() {
- return seastar::now();
- }
seastar::future<> ms_handle_connect(crimson::net::ConnectionRef conn) override {
auto session = seastar::make_shared<PingSession>();
auto [i, added] = sessions.emplace(conn.get(), session);
if (session->count == rounds) {
logger().info("{}: finished receiving {} pongs", *c, session->count);
session->finish_time = mono_clock::now();
- return container().invoke_on_all([c](auto &client) {
- auto found = client.pending_conns.find(c);
- ceph_assert(found != client.pending_conns.end());
- found->second.set_value();
- });
- } else {
- return seastar::now();
+ auto found = pending_conns.find(c);
+ ceph_assert(found != pending_conns.end());
+ found->second.set_value();
}
+ return seastar::now();
}
seastar::future<> init(const entity_name_t& name,
const std::string& lname,
const uint64_t nonce) {
- return crimson::net::Messenger::create(name, lname, nonce, 0)
- .then([this](crimson::net::Messenger *messenger) {
- return container().invoke_on_all([messenger](auto& client) {
- client.msgr = messenger->get_local_shard();
- client.msgr->set_default_policy(crimson::net::SocketPolicy::lossy_client(0));
- client.msgr->set_auth_client(&client.dummy_auth);
- client.msgr->set_auth_server(&client.dummy_auth);
- }).then([this, messenger] {
- return messenger->start(this);
- });
- });
+ msgr = crimson::net::Messenger::create(name, lname, nonce);
+ msgr->set_default_policy(crimson::net::SocketPolicy::lossy_client(0));
+ msgr->set_auth_client(&dummy_auth);
+ msgr->set_auth_server(&dummy_auth);
+ return msgr->start(this);
}
seastar::future<> shutdown() {
return msgr->shutdown();
}
- // Note: currently we don't support foreign dispatch a message because:
- // 1. it is not effecient because each ref-count modification needs
- // a cross-core jump, so it should be discouraged.
- // 2. messenger needs to be modified to hold a wrapper for the sending
- // message because it can be a nested seastar smart ptr or not.
- // 3. in 1:1 mapping OSD, there is no need to do foreign dispatch.
- seastar::future<> dispatch_pingpong(const entity_addr_t& peer_addr,
- bool foreign_dispatch) {
-#ifndef CRIMSON_MSGR_SEND_FOREIGN
- ceph_assert(!foreign_dispatch);
-#endif
+ seastar::future<> dispatch_pingpong(const entity_addr_t& peer_addr) {
mono_time start_time = mono_clock::now();
- return msgr->connect(peer_addr, entity_name_t::TYPE_OSD)
- .then([this, foreign_dispatch, start_time](auto conn) {
- return seastar::futurize_apply([this, conn, foreign_dispatch] {
- if (foreign_dispatch) {
- return do_dispatch_pingpong(&**conn);
- } else {
- // NOTE: this could be faster if we don't switch cores in do_dispatch_pingpong().
- return container().invoke_on(conn->get()->shard_id(), [conn = &**conn](auto &client) {
- return client.do_dispatch_pingpong(conn);
- });
- }
- }).finally([this, conn, start_time] {
- return container().invoke_on(conn->get()->shard_id(), [conn, start_time](auto &client) {
- auto session = client.find_session(&**conn);
- std::chrono::duration<double> dur_handshake = session->connected_time - start_time;
- std::chrono::duration<double> dur_pingpong = session->finish_time - session->connected_time;
- logger().info("{}: handshake {}, pingpong {}",
- **conn, dur_handshake.count(), dur_pingpong.count());
- });
- });
+ return msgr->connect(peer_addr, entity_name_t::TYPE_OSD
+ ).then([this, start_time](auto conn) {
+ return seastar::futurize_apply([this, conn] {
+ return do_dispatch_pingpong(conn.get());
+ }).finally([this, conn, start_time] {
+ auto session = find_session(conn.get());
+ std::chrono::duration<double> dur_handshake = session->connected_time - start_time;
+ std::chrono::duration<double> dur_pingpong = session->finish_time - session->connected_time;
+ logger().info("{}: handshake {}, pingpong {}",
+ *conn, dur_handshake.count(), dur_pingpong.count());
});
+ });
}
private:
seastar::future<> do_dispatch_pingpong(crimson::net::Connection* conn) {
- return container().invoke_on_all([conn](auto& client) {
- auto [i, added] = client.pending_conns.emplace(conn, seastar::promise<>());
- std::ignore = i;
- ceph_assert(added);
- }).then([this, conn] {
- return seastar::do_with(0u, 0u,
- [this, conn](auto &count_ping, auto &count_keepalive) {
- return seastar::do_until(
- [this, conn, &count_ping, &count_keepalive] {
- bool stop = (count_ping == rounds);
- if (stop) {
- logger().info("{}: finished sending {} pings with {} keepalives",
- *conn, count_ping, count_keepalive);
- }
- return stop;
- },
- [this, conn, &count_ping, &count_keepalive] {
- return seastar::repeat([this, conn, &count_ping, &count_keepalive] {
- if (keepalive_dist(rng)) {
- return conn->keepalive()
- .then([&count_keepalive] {
- count_keepalive += 1;
- return seastar::make_ready_future<seastar::stop_iteration>(
- seastar::stop_iteration::no);
- });
- } else {
- return conn->send(make_message<MPing>())
- .then([&count_ping] {
- count_ping += 1;
- return seastar::make_ready_future<seastar::stop_iteration>(
- seastar::stop_iteration::yes);
- });
- }
+ auto [i, added] = pending_conns.emplace(conn, seastar::promise<>());
+ std::ignore = i;
+ ceph_assert(added);
+ return seastar::do_with(0u, 0u,
+ [this, conn](auto &count_ping, auto &count_keepalive) {
+ return seastar::do_until(
+ [this, conn, &count_ping, &count_keepalive] {
+ bool stop = (count_ping == rounds);
+ if (stop) {
+ logger().info("{}: finished sending {} pings with {} keepalives",
+ *conn, count_ping, count_keepalive);
+ }
+ return stop;
+ },
+ [this, conn, &count_ping, &count_keepalive] {
+ return seastar::repeat([this, conn, &count_ping, &count_keepalive] {
+ if (keepalive_dist(rng)) {
+ return conn->keepalive()
+ .then([&count_keepalive] {
+ count_keepalive += 1;
+ return seastar::make_ready_future<seastar::stop_iteration>(
+ seastar::stop_iteration::no);
});
- }).then([this, conn] {
- auto found = pending_conns.find(conn);
- return found->second.get_future();
- });
- });
- });
+ } else {
+ return conn->send(make_message<MPing>())
+ .then([&count_ping] {
+ count_ping += 1;
+ return seastar::make_ready_future<seastar::stop_iteration>(
+ seastar::stop_iteration::yes);
+ });
+ }
+ });
+ }).then([this, conn] {
+ auto found = pending_conns.find(conn);
+ return found->second.get_future();
+ }
+ );
+ });
}
};
};
logger().info("test_echo(rounds={}, keepalive_ratio={}, v2={}):",
rounds, keepalive_ratio, v2);
+ auto server1 = seastar::make_shared<test_state::Server>();
+ auto server2 = seastar::make_shared<test_state::Server>();
+ auto client1 = seastar::make_shared<test_state::Client>(rounds, keepalive_ratio);
+ auto client2 = seastar::make_shared<test_state::Client>(rounds, keepalive_ratio);
+ // start servers and clients
+ entity_addr_t addr1;
+ addr1.parse("127.0.0.1:9010", nullptr);
+ entity_addr_t addr2;
+ addr2.parse("127.0.0.1:9011", nullptr);
+ if (v2) {
+ addr1.set_type(entity_addr_t::TYPE_MSGR2);
+ addr2.set_type(entity_addr_t::TYPE_MSGR2);
+ } else {
+ addr1.set_type(entity_addr_t::TYPE_LEGACY);
+ addr2.set_type(entity_addr_t::TYPE_LEGACY);
+ }
return seastar::when_all_succeed(
- crimson::net::create_sharded<test_state::Server>(),
- crimson::net::create_sharded<test_state::Server>(),
- crimson::net::create_sharded<test_state::Client>(rounds, keepalive_ratio),
- crimson::net::create_sharded<test_state::Client>(rounds, keepalive_ratio))
- .then([rounds, keepalive_ratio, v2](test_state::Server *server1,
- test_state::Server *server2,
- test_state::Client *client1,
- test_state::Client *client2) {
- // start servers and clients
- entity_addr_t addr1;
- addr1.parse("127.0.0.1:9010", nullptr);
- entity_addr_t addr2;
- addr2.parse("127.0.0.1:9011", nullptr);
- if (v2) {
- addr1.set_type(entity_addr_t::TYPE_MSGR2);
- addr2.set_type(entity_addr_t::TYPE_MSGR2);
- } else {
- addr1.set_type(entity_addr_t::TYPE_LEGACY);
- addr2.set_type(entity_addr_t::TYPE_LEGACY);
- }
- return seastar::when_all_succeed(
- server1->init(entity_name_t::OSD(0), "server1", 1, addr1),
- server2->init(entity_name_t::OSD(1), "server2", 2, addr2),
- client1->init(entity_name_t::OSD(2), "client1", 3),
- client2->init(entity_name_t::OSD(3), "client2", 4))
- // dispatch pingpoing
- .then([client1, client2, server1, server2] {
- return seastar::when_all_succeed(
- // test connecting in parallel, accepting in parallel
-#ifdef CRIMSON_MSGR_SEND_FOREIGN
- // operate the connection reference from a foreign core
- client1->dispatch_pingpong(server1->msgr->get_myaddr(), true),
- client2->dispatch_pingpong(server2->msgr->get_myaddr(), true),
-#endif
- // operate the connection reference from a local core
- client1->dispatch_pingpong(server2->msgr->get_myaddr(), false),
- client2->dispatch_pingpong(server1->msgr->get_myaddr(), false));
- // shutdown
- }).finally([client1] {
- logger().info("client1 shutdown...");
- return client1->shutdown();
- }).finally([client2] {
- logger().info("client2 shutdown...");
- return client2->shutdown();
- }).finally([server1] {
- logger().info("server1 shutdown...");
- return server1->shutdown();
- }).finally([server2] {
- logger().info("server2 shutdown...");
- return server2->shutdown();
- }).finally([] {
- logger().info("test_echo() done!\n");
- });
- });
+ server1->init(entity_name_t::OSD(0), "server1", 1, addr1),
+ server2->init(entity_name_t::OSD(1), "server2", 2, addr2),
+ client1->init(entity_name_t::OSD(2), "client1", 3),
+ client2->init(entity_name_t::OSD(3), "client2", 4)
+    // dispatch pingpong
+ ).then([client1, client2, server1, server2] {
+ return seastar::when_all_succeed(
+ // test connecting in parallel, accepting in parallel
+ client1->dispatch_pingpong(server2->msgr->get_myaddr()),
+ client2->dispatch_pingpong(server1->msgr->get_myaddr()));
+ // shutdown
+ }).finally([client1] {
+ logger().info("client1 shutdown...");
+ return client1->shutdown();
+ }).finally([client2] {
+ logger().info("client2 shutdown...");
+ return client2->shutdown();
+ }).finally([server1] {
+ logger().info("server1 shutdown...");
+ return server1->shutdown();
+ }).finally([server2] {
+ logger().info("server2 shutdown...");
+ return server2->shutdown();
+ }).finally([server1, server2, client1, client2] {
+ logger().info("test_echo() done!\n");
+ });
}
static seastar::future<> test_concurrent_dispatch(bool v2)
{
struct test_state {
struct Server final
- : public crimson::net::Dispatcher,
- public seastar::peering_sharded_service<Server> {
- crimson::net::Messenger *msgr = nullptr;
+ : public crimson::net::Dispatcher {
+ crimson::net::MessengerRef msgr;
int count = 0;
seastar::promise<> on_second; // satisfied on second dispatch
seastar::promise<> on_done; // satisfied when first dispatch unblocks
switch (++count) {
case 1:
// block on the first request until we reenter with the second
- return on_second.get_future()
- .then([this] {
- return container().invoke_on_all([](Server& server) {
- server.on_done.set_value();
- });
- });
+ return on_second.get_future().then([this] {
+ on_done.set_value();
+ });
case 2:
on_second.set_value();
return seastar::now();
const std::string& lname,
const uint64_t nonce,
const entity_addr_t& addr) {
- return crimson::net::Messenger::create(name, lname, nonce, 0)
- .then([this, addr](crimson::net::Messenger *messenger) {
- return container().invoke_on_all([messenger](auto& server) {
- server.msgr = messenger->get_local_shard();
- server.msgr->set_default_policy(crimson::net::SocketPolicy::stateless_server(0));
- server.msgr->set_auth_client(&server.dummy_auth);
- server.msgr->set_auth_server(&server.dummy_auth);
- }).then([messenger, addr] {
- return messenger->bind(entity_addrvec_t{addr});
- }).then([this, messenger] {
- return messenger->start(this);
- });
- });
- }
-
- Dispatcher* get_local_shard() override {
- return &(container().local());
- }
- seastar::future<> stop() {
- return seastar::make_ready_future<>();
+ msgr = crimson::net::Messenger::create(name, lname, nonce);
+ msgr->set_default_policy(crimson::net::SocketPolicy::stateless_server(0));
+ msgr->set_auth_client(&dummy_auth);
+ msgr->set_auth_server(&dummy_auth);
+ return msgr->bind(entity_addrvec_t{addr}).then([this] {
+ return msgr->start(this);
+ });
}
};
struct Client final
- : public crimson::net::Dispatcher,
- public seastar::peering_sharded_service<Client> {
- crimson::net::Messenger *msgr = nullptr;
+ : public crimson::net::Dispatcher {
+ crimson::net::MessengerRef msgr;
crimson::auth::DummyAuthClientServer dummy_auth;
seastar::future<> init(const entity_name_t& name,
const std::string& lname,
const uint64_t nonce) {
- return crimson::net::Messenger::create(name, lname, nonce, 0)
- .then([this](crimson::net::Messenger *messenger) {
- return container().invoke_on_all([messenger](auto& client) {
- client.msgr = messenger->get_local_shard();
- client.msgr->set_default_policy(crimson::net::SocketPolicy::lossy_client(0));
- client.msgr->set_auth_client(&client.dummy_auth);
- client.msgr->set_auth_server(&client.dummy_auth);
- }).then([this, messenger] {
- return messenger->start(this);
- });
- });
- }
-
- Dispatcher* get_local_shard() override {
- return &(container().local());
- }
- seastar::future<> stop() {
- return seastar::make_ready_future<>();
+ msgr = crimson::net::Messenger::create(name, lname, nonce);
+ msgr->set_default_policy(crimson::net::SocketPolicy::lossy_client(0));
+ msgr->set_auth_client(&dummy_auth);
+ msgr->set_auth_server(&dummy_auth);
+ return msgr->start(this);
}
};
};
logger().info("test_concurrent_dispatch(v2={}):", v2);
+ auto server = seastar::make_shared<test_state::Server>();
+ auto client = seastar::make_shared<test_state::Client>();
+ entity_addr_t addr;
+ addr.parse("127.0.0.1:9010", nullptr);
+ if (v2) {
+ addr.set_type(entity_addr_t::TYPE_MSGR2);
+ } else {
+ addr.set_type(entity_addr_t::TYPE_LEGACY);
+ }
+ addr.set_family(AF_INET);
return seastar::when_all_succeed(
- crimson::net::create_sharded<test_state::Server>(),
- crimson::net::create_sharded<test_state::Client>())
- .then([v2](test_state::Server *server,
- test_state::Client *client) {
- entity_addr_t addr;
- addr.parse("127.0.0.1:9010", nullptr);
- if (v2) {
- addr.set_type(entity_addr_t::TYPE_MSGR2);
- } else {
- addr.set_type(entity_addr_t::TYPE_LEGACY);
- }
- addr.set_family(AF_INET);
- return seastar::when_all_succeed(
- server->init(entity_name_t::OSD(4), "server3", 5, addr),
- client->init(entity_name_t::OSD(5), "client3", 6))
- .then([server, client] {
- return client->msgr->connect(server->msgr->get_myaddr(),
- entity_name_t::TYPE_OSD);
- }).then([](crimson::net::ConnectionXRef conn) {
- // send two messages
- return (*conn)->send(make_message<MPing>()).then([conn] {
- return (*conn)->send(make_message<MPing>());
- });
- }).then([server] {
- return server->wait();
- }).finally([client] {
- logger().info("client shutdown...");
- return client->msgr->shutdown();
- }).finally([server] {
- logger().info("server shutdown...");
- return server->msgr->shutdown();
- }).finally([] {
- logger().info("test_concurrent_dispatch() done!\n");
- });
- });
+ server->init(entity_name_t::OSD(4), "server3", 5, addr),
+ client->init(entity_name_t::OSD(5), "client3", 6)
+ ).then([server, client] {
+ return client->msgr->connect(server->msgr->get_myaddr(),
+ entity_name_t::TYPE_OSD);
+ }).then([](crimson::net::ConnectionRef conn) {
+ // send two messages
+ return conn->send(make_message<MPing>()).then([conn] {
+ return conn->send(make_message<MPing>());
+ });
+ }).then([server] {
+ return server->wait();
+ }).finally([client] {
+ logger().info("client shutdown...");
+ return client->msgr->shutdown();
+ }).finally([server] {
+ logger().info("server shutdown...");
+ return server->msgr->shutdown();
+ }).finally([server, client] {
+ logger().info("test_concurrent_dispatch() done!\n");
+ });
}
seastar::future<> test_preemptive_shutdown(bool v2) {
struct test_state {
class Server final
- : public crimson::net::Dispatcher,
- public seastar::peering_sharded_service<Server> {
- crimson::net::Messenger *msgr = nullptr;
+ : public crimson::net::Dispatcher {
+ crimson::net::MessengerRef msgr;
crimson::auth::DummyAuthClientServer dummy_auth;
seastar::future<> ms_dispatch(crimson::net::Connection* c,
const std::string& lname,
const uint64_t nonce,
const entity_addr_t& addr) {
- return crimson::net::Messenger::create(name, lname, nonce, seastar::engine().cpu_id()
- ).then([this, addr](crimson::net::Messenger *messenger) {
- return container().invoke_on_all([messenger](auto& server) {
- server.msgr = messenger->get_local_shard();
- server.msgr->set_default_policy(crimson::net::SocketPolicy::stateless_server(0));
- server.msgr->set_auth_client(&server.dummy_auth);
- server.msgr->set_auth_server(&server.dummy_auth);
- }).then([messenger, addr] {
- return messenger->bind(entity_addrvec_t{addr});
- }).then([this, messenger] {
- return messenger->start(this);
- });
+ msgr = crimson::net::Messenger::create(name, lname, nonce);
+ msgr->set_default_policy(crimson::net::SocketPolicy::stateless_server(0));
+ msgr->set_auth_client(&dummy_auth);
+ msgr->set_auth_server(&dummy_auth);
+ return msgr->bind(entity_addrvec_t{addr}).then([this] {
+ return msgr->start(this);
});
}
entity_addr_t get_addr() const {
seastar::future<> shutdown() {
return msgr->shutdown();
}
- Dispatcher* get_local_shard() override {
- return &(container().local());
- }
- seastar::future<> stop() {
- return seastar::now();
- }
};
class Client final
- : public crimson::net::Dispatcher,
- public seastar::peering_sharded_service<Client> {
- crimson::net::Messenger *msgr = nullptr;
+ : public crimson::net::Dispatcher {
+ crimson::net::MessengerRef msgr;
crimson::auth::DummyAuthClientServer dummy_auth;
bool stop_send = false;
seastar::future<> init(const entity_name_t& name,
const std::string& lname,
const uint64_t nonce) {
- return crimson::net::Messenger::create(name, lname, nonce, seastar::engine().cpu_id()
- ).then([this](crimson::net::Messenger *messenger) {
- return container().invoke_on_all([messenger](auto& client) {
- client.msgr = messenger->get_local_shard();
- client.msgr->set_default_policy(crimson::net::SocketPolicy::lossy_client(0));
- client.msgr->set_auth_client(&client.dummy_auth);
- client.msgr->set_auth_server(&client.dummy_auth);
- }).then([this, messenger] {
- return messenger->start(this);
- });
- });
+ msgr = crimson::net::Messenger::create(name, lname, nonce);
+ msgr->set_default_policy(crimson::net::SocketPolicy::lossy_client(0));
+ msgr->set_auth_client(&dummy_auth);
+ msgr->set_auth_server(&dummy_auth);
+ return msgr->start(this);
}
seastar::future<> send_pings(const entity_addr_t& addr) {
return msgr->connect(addr, entity_name_t::TYPE_OSD
- ).then([this](crimson::net::ConnectionXRef conn) {
+ ).then([this](crimson::net::ConnectionRef conn) {
// forwarded to stopped_send_promise
(void) seastar::do_until(
[this] { return stop_send; },
- [this, conn = &**conn] {
+ [this, conn] {
return conn->send(make_message<MPing>()).then([] {
return seastar::sleep(0ms);
});
return stopped_send_promise.get_future();
});
}
- Dispatcher* get_local_shard() override {
- return &(container().local());
- }
- seastar::future<> stop() {
- return seastar::now();
- }
};
};
logger().info("test_preemptive_shutdown(v2={}):", v2);
+ auto server = seastar::make_shared<test_state::Server>();
+ auto client = seastar::make_shared<test_state::Client>();
+ entity_addr_t addr;
+ addr.parse("127.0.0.1:9010", nullptr);
+ if (v2) {
+ addr.set_type(entity_addr_t::TYPE_MSGR2);
+ } else {
+ addr.set_type(entity_addr_t::TYPE_LEGACY);
+ }
+ addr.set_family(AF_INET);
return seastar::when_all_succeed(
- crimson::net::create_sharded<test_state::Server>(),
- crimson::net::create_sharded<test_state::Client>()
- ).then([v2](test_state::Server *server,
- test_state::Client *client) {
- entity_addr_t addr;
- addr.parse("127.0.0.1:9010", nullptr);
- if (v2) {
- addr.set_type(entity_addr_t::TYPE_MSGR2);
- } else {
- addr.set_type(entity_addr_t::TYPE_LEGACY);
- }
- addr.set_family(AF_INET);
- return seastar::when_all_succeed(
- server->init(entity_name_t::OSD(6), "server4", 7, addr),
- client->init(entity_name_t::OSD(7), "client4", 8)
- ).then([server, client] {
- return client->send_pings(server->get_addr());
- }).then([] {
- return seastar::sleep(100ms);
- }).then([client] {
- logger().info("client shutdown...");
- return client->shutdown();
- }).finally([server] {
- logger().info("server shutdown...");
- return server->shutdown();
- }).finally([] {
- logger().info("test_preemptive_shutdown() done!\n");
- });
+ server->init(entity_name_t::OSD(6), "server4", 7, addr),
+ client->init(entity_name_t::OSD(7), "client4", 8)
+ ).then([server, client] {
+ return client->send_pings(server->get_addr());
+ }).then([] {
+ return seastar::sleep(100ms);
+ }).then([client] {
+ logger().info("client shutdown...");
+ return client->shutdown();
+ }).finally([server] {
+ logger().info("server shutdown...");
+ return server->shutdown();
+ }).finally([server, client] {
+ logger().info("test_preemptive_shutdown() done!\n");
});
}
using crimson::net::Dispatcher;
using crimson::net::Interceptor;
using crimson::net::Messenger;
+using crimson::net::MessengerRef;
using crimson::net::SocketPolicy;
using crimson::net::tag_bp_t;
using ceph::net::test::cmd_t;
class FailoverSuite : public Dispatcher {
crimson::auth::DummyAuthClientServer dummy_auth;
- Messenger& test_msgr;
+ MessengerRef test_msgr;
const entity_addr_t test_peer_addr;
TestInterceptor interceptor;
private:
seastar::future<> init(entity_addr_t addr, SocketPolicy policy) {
- test_msgr.set_default_policy(policy);
- test_msgr.set_auth_client(&dummy_auth);
- test_msgr.set_auth_server(&dummy_auth);
- test_msgr.interceptor = &interceptor;
- return test_msgr.bind(entity_addrvec_t{addr}).then([this] {
- return test_msgr.start(this);
+ test_msgr->set_default_policy(policy);
+ test_msgr->set_auth_client(&dummy_auth);
+ test_msgr->set_auth_server(&dummy_auth);
+ test_msgr->interceptor = &interceptor;
+ return test_msgr->bind(entity_addrvec_t{addr}).then([this] {
+ return test_msgr->start(this);
});
}
// called by FailoverTest
public:
- FailoverSuite(Messenger& test_msgr,
+ FailoverSuite(MessengerRef test_msgr,
entity_addr_t test_peer_addr,
const TestInterceptor& interceptor)
: test_msgr(test_msgr),
interceptor(interceptor) { }
entity_addr_t get_addr() const {
- return test_msgr.get_myaddr();
+ return test_msgr->get_myaddr();
}
seastar::future<> shutdown() {
- return test_msgr.shutdown();
+ return test_msgr->shutdown();
}
void needs_receive() {
SocketPolicy test_policy,
entity_addr_t test_peer_addr,
const TestInterceptor& interceptor) {
- return Messenger::create(entity_name_t::OSD(2), "Test", 2, 0
- ).then([test_addr,
- test_policy,
- test_peer_addr,
- interceptor] (Messenger* test_msgr) {
- auto suite = std::make_unique<FailoverSuite>(
- *test_msgr, test_peer_addr, interceptor);
- return suite->init(test_addr, test_policy
- ).then([suite = std::move(suite)] () mutable {
- return std::move(suite);
- });
+ auto suite = std::make_unique<FailoverSuite>(
+ Messenger::create(entity_name_t::OSD(2), "Test", 2),
+ test_peer_addr, interceptor);
+ return suite->init(test_addr, test_policy
+ ).then([suite = std::move(suite)] () mutable {
+ return std::move(suite);
});
}
public:
seastar::future<> connect_peer() {
logger().info("[Test] connect_peer({})", test_peer_addr);
- return test_msgr.connect(test_peer_addr, entity_name_t::TYPE_OSD
- ).then([this] (auto xconn) {
- auto conn = xconn->release();
+ return test_msgr->connect(test_peer_addr, entity_name_t::TYPE_OSD
+ ).then([this] (auto conn) {
auto result = interceptor.find_result(conn);
ceph_assert(result != nullptr);
class FailoverTest : public Dispatcher {
crimson::auth::DummyAuthClientServer dummy_auth;
- Messenger& cmd_msgr;
+ MessengerRef cmd_msgr;
ConnectionRef cmd_conn;
const entity_addr_t test_addr;
const entity_addr_t test_peer_addr;
}
seastar::future<> init(entity_addr_t cmd_peer_addr) {
- cmd_msgr.set_default_policy(SocketPolicy::lossy_client(0));
- cmd_msgr.set_auth_client(&dummy_auth);
- cmd_msgr.set_auth_server(&dummy_auth);
- return cmd_msgr.start(this).then([this, cmd_peer_addr] {
+ cmd_msgr->set_default_policy(SocketPolicy::lossy_client(0));
+ cmd_msgr->set_auth_client(&dummy_auth);
+ cmd_msgr->set_auth_server(&dummy_auth);
+ return cmd_msgr->start(this).then([this, cmd_peer_addr] {
logger().info("CmdCli connect to CmdSrv({}) ...", cmd_peer_addr);
- return cmd_msgr.connect(cmd_peer_addr, entity_name_t::TYPE_OSD);
+ return cmd_msgr->connect(cmd_peer_addr, entity_name_t::TYPE_OSD);
}).then([this] (auto conn) {
- cmd_conn = conn->release();
+ cmd_conn = conn;
return pingpong();
});
}
public:
- FailoverTest(Messenger& cmd_msgr,
+ FailoverTest(MessengerRef cmd_msgr,
entity_addr_t test_addr,
entity_addr_t test_peer_addr)
: cmd_msgr(cmd_msgr),
return cmd_conn->send(m).then([this] {
return seastar::sleep(200ms);
}).finally([this] {
- return cmd_msgr.shutdown();
+ return cmd_msgr->shutdown();
});
}
static seastar::future<seastar::lw_shared_ptr<FailoverTest>>
create(entity_addr_t cmd_peer_addr, entity_addr_t test_addr) {
- return Messenger::create(entity_name_t::OSD(1), "CmdCli", 1, 0
- ).then([cmd_peer_addr, test_addr] (Messenger* cmd_msgr) mutable {
- test_addr.set_nonce(2);
- cmd_peer_addr.set_nonce(3);
- entity_addr_t test_peer_addr = cmd_peer_addr;
- test_peer_addr.set_port(cmd_peer_addr.get_port() + 1);
- test_peer_addr.set_nonce(4);
- auto test = seastar::make_lw_shared<FailoverTest>(
- *cmd_msgr, test_addr, test_peer_addr);
- return test->init(cmd_peer_addr).then([test] {
- logger().info("CmdCli ready");
- return test;
- });
+ test_addr.set_nonce(2);
+ cmd_peer_addr.set_nonce(3);
+ entity_addr_t test_peer_addr = cmd_peer_addr;
+ test_peer_addr.set_port(cmd_peer_addr.get_port() + 1);
+ test_peer_addr.set_nonce(4);
+ auto test = seastar::make_lw_shared<FailoverTest>(
+ Messenger::create(entity_name_t::OSD(1), "CmdCli", 1),
+ test_addr, test_peer_addr);
+ return test->init(cmd_peer_addr).then([test] {
+ logger().info("CmdCli ready");
+ return test;
});
}
class FailoverSuitePeer : public Dispatcher {
using cb_t = std::function<seastar::future<>()>;
crimson::auth::DummyAuthClientServer dummy_auth;
- Messenger& peer_msgr;
+ MessengerRef peer_msgr;
cb_t op_callback;
ConnectionRef tracked_conn;
private:
seastar::future<> init(entity_addr_t addr, SocketPolicy policy) {
- peer_msgr.set_default_policy(policy);
- peer_msgr.set_auth_client(&dummy_auth);
- peer_msgr.set_auth_server(&dummy_auth);
- return peer_msgr.bind(entity_addrvec_t{addr}).then([this] {
- return peer_msgr.start(this);
+ peer_msgr->set_default_policy(policy);
+ peer_msgr->set_auth_client(&dummy_auth);
+ peer_msgr->set_auth_server(&dummy_auth);
+ return peer_msgr->bind(entity_addrvec_t{addr}).then([this] {
+ return peer_msgr->start(this);
});
}
}
public:
- FailoverSuitePeer(Messenger& peer_msgr, cb_t op_callback)
+ FailoverSuitePeer(MessengerRef peer_msgr, cb_t op_callback)
: peer_msgr(peer_msgr), op_callback(op_callback) { }
seastar::future<> shutdown() {
- return peer_msgr.shutdown();
+ return peer_msgr->shutdown();
}
seastar::future<> connect_peer(entity_addr_t addr) {
logger().info("[TestPeer] connect_peer({})", addr);
- return peer_msgr.connect(addr, entity_name_t::TYPE_OSD
- ).then([this] (auto xconn) {
- auto new_tracked_conn = xconn->release();
+ return peer_msgr->connect(addr, entity_name_t::TYPE_OSD
+ ).then([this] (auto conn) {
+ auto new_tracked_conn = conn;
if (tracked_conn) {
if (tracked_conn->is_closed()) {
ceph_assert(tracked_conn != new_tracked_conn);
static seastar::future<std::unique_ptr<FailoverSuitePeer>>
create(entity_addr_t addr, const SocketPolicy& policy, cb_t op_callback) {
- return Messenger::create(entity_name_t::OSD(4), "TestPeer", 4, 0
- ).then([addr, policy, op_callback] (Messenger* peer_msgr) {
- auto suite = std::make_unique<FailoverSuitePeer>(*peer_msgr, op_callback);
- return suite->init(addr, policy
- ).then([suite = std::move(suite)] () mutable {
- return std::move(suite);
- });
+ auto suite = std::make_unique<FailoverSuitePeer>(
+ Messenger::create(entity_name_t::OSD(4), "TestPeer", 4), op_callback);
+ return suite->init(addr, policy
+ ).then([suite = std::move(suite)] () mutable {
+ return std::move(suite);
});
}
};
class FailoverTestPeer : public Dispatcher {
crimson::auth::DummyAuthClientServer dummy_auth;
- Messenger& cmd_msgr;
+ MessengerRef cmd_msgr;
ConnectionRef cmd_conn;
const entity_addr_t test_peer_addr;
std::unique_ptr<FailoverSuitePeer> test_suite;
if (cmd == cmd_t::shutdown) {
logger().info("CmdSrv shutdown...");
// forwarded to FailoverTestPeer::wait()
- (void) cmd_msgr.shutdown();
+ (void) cmd_msgr->shutdown();
return seastar::now();
}
return handle_cmd(cmd, m_cmd).then([c] {
}
seastar::future<> init(entity_addr_t cmd_peer_addr) {
- cmd_msgr.set_default_policy(SocketPolicy::stateless_server(0));
- cmd_msgr.set_auth_client(&dummy_auth);
- cmd_msgr.set_auth_server(&dummy_auth);
- return cmd_msgr.bind(entity_addrvec_t{cmd_peer_addr}).then([this] {
- return cmd_msgr.start(this);
+ cmd_msgr->set_default_policy(SocketPolicy::stateless_server(0));
+ cmd_msgr->set_auth_client(&dummy_auth);
+ cmd_msgr->set_auth_server(&dummy_auth);
+ return cmd_msgr->bind(entity_addrvec_t{cmd_peer_addr}).then([this] {
+ return cmd_msgr->start(this);
});
}
public:
- FailoverTestPeer(Messenger& cmd_msgr,
+ FailoverTestPeer(MessengerRef cmd_msgr,
entity_addr_t test_peer_addr)
: cmd_msgr(cmd_msgr),
test_peer_addr(test_peer_addr) { }
seastar::future<> wait() {
- return cmd_msgr.wait();
+ return cmd_msgr->wait();
}
static seastar::future<std::unique_ptr<FailoverTestPeer>>
create(entity_addr_t cmd_peer_addr) {
- return Messenger::create(entity_name_t::OSD(3), "CmdSrv", 3, 0
- ).then([cmd_peer_addr] (Messenger* cmd_msgr) {
- // suite bind to cmd_peer_addr, with port + 1
- entity_addr_t test_peer_addr = cmd_peer_addr;
- test_peer_addr.set_port(cmd_peer_addr.get_port() + 1);
- auto test_peer = std::make_unique<FailoverTestPeer>(*cmd_msgr, test_peer_addr);
- return test_peer->init(cmd_peer_addr
- ).then([test_peer = std::move(test_peer)] () mutable {
- logger().info("CmdSrv ready");
- return std::move(test_peer);
- });
+ // suite bind to cmd_peer_addr, with port + 1
+ entity_addr_t test_peer_addr = cmd_peer_addr;
+ test_peer_addr.set_port(cmd_peer_addr.get_port() + 1);
+ auto test_peer = std::make_unique<FailoverTestPeer>(
+ Messenger::create(entity_name_t::OSD(3), "CmdSrv", 3), test_peer_addr);
+ return test_peer->init(cmd_peer_addr
+ ).then([test_peer = std::move(test_peer)] () mutable {
+ logger().info("CmdSrv ready");
+ return std::move(test_peer);
});
}
};
}).then([] {
return crimson::common::sharded_perf_coll().start();
}).then([] {
- return crimson::net::Messenger::create(entity_name_t::OSD(0), "monc", 0,
- seastar::engine().cpu_id())
- .then([] (crimson::net::Messenger *msgr) {
- auto& conf = crimson::common::local_conf();
- if (conf->ms_crc_data) {
- msgr->set_crc_data();
- }
- if (conf->ms_crc_header) {
- msgr->set_crc_header();
- }
- msgr->set_require_authorizer(false);
- return seastar::do_with(MonClient{*msgr, dummy_handler},
- [msgr](auto& monc) {
- return msgr->start(&monc).then([&monc] {
- return seastar::with_timeout(
- seastar::lowres_clock::now() + std::chrono::seconds{10},
- monc.start());
- }).then([&monc] {
- return monc.stop();
- });
- }).finally([msgr] {
- return msgr->shutdown();
+ auto msgr = crimson::net::Messenger::create(entity_name_t::OSD(0), "monc", 0);
+ auto& conf = crimson::common::local_conf();
+ if (conf->ms_crc_data) {
+ msgr->set_crc_data();
+ }
+ if (conf->ms_crc_header) {
+ msgr->set_crc_header();
+ }
+ msgr->set_require_authorizer(false);
+ return seastar::do_with(MonClient{*msgr, dummy_handler},
+ [msgr](auto& monc) {
+ return msgr->start(&monc).then([&monc] {
+ return seastar::with_timeout(
+ seastar::lowres_clock::now() + std::chrono::seconds{10},
+ monc.start());
+ }).then([&monc] {
+ return monc.stop();
});
+ }).finally([msgr] {
+ return msgr->shutdown();
});
}).finally([] {
return crimson::common::sharded_perf_coll().stop().then([] {
#include <seastar/core/reactor.hh>
#include <seastar/core/sleep.hh>
#include <seastar/core/semaphore.hh>
+#include <seastar/core/smp.hh>
#include "common/ceph_time.h"
#include "messages/MOSDOp.h"
return crimson::get_logger(ceph_subsys_ms);
}
+template <typename T, typename... Args>
+seastar::future<T*> create_sharded(Args... args) {
+ // seems we should only construct/stop shards on #0
+ return seastar::smp::submit_to(0, [=] {
+ auto sharded_obj = seastar::make_lw_shared<seastar::sharded<T>>();
+ return sharded_obj->start(args...).then([sharded_obj]() {
+ seastar::engine().at_exit([sharded_obj]() {
+ return sharded_obj->stop().finally([sharded_obj] {});
+ });
+ return sharded_obj.get();
+ });
+ }).then([] (seastar::sharded<T> *ptr_shard) {
+ // return the pointer valid for the caller CPU
+ return &ptr_shard->local();
+ });
+}
+
enum class perf_mode_t {
both,
client,
const server_config& server_conf)
{
struct test_state {
+ struct Server;
+ using ServerFRef = seastar::foreign_ptr<std::unique_ptr<Server>>;
+
struct Server final
- : public crimson::net::Dispatcher,
- public seastar::peering_sharded_service<Server> {
- crimson::net::Messenger *msgr = nullptr;
+ : public crimson::net::Dispatcher {
+ crimson::net::MessengerRef msgr;
crimson::auth::DummyAuthClientServer dummy_auth;
- const seastar::shard_id sid;
const seastar::shard_id msgr_sid;
std::string lname;
unsigned msg_len;
bufferlist msg_data;
- Server(unsigned msgr_core, unsigned msg_len)
- : sid{seastar::engine().cpu_id()},
- msgr_sid{msgr_core},
+ Server(unsigned msg_len)
+ : msgr_sid{seastar::engine().cpu_id()},
msg_len{msg_len} {
lname = "server#";
- lname += std::to_string(sid);
+ lname += std::to_string(msgr_sid);
msg_data.append_zero(msg_len);
}
- Dispatcher* get_local_shard() override {
- return &(container().local());
- }
- seastar::future<> stop() {
- return seastar::make_ready_future<>();
- }
seastar::future<> ms_dispatch(crimson::net::Connection* c,
MessageRef m) override {
ceph_assert(m->get_type() == CEPH_MSG_OSD_OP);
}
seastar::future<> init(bool v1_crc_enabled, const entity_addr_t& addr) {
- return container().invoke_on(msgr_sid, [v1_crc_enabled, addr] (auto& server) {
+ return seastar::smp::submit_to(msgr_sid, [v1_crc_enabled, addr, this] {
// server msgr is always with nonce 0
- auto&& fut = crimson::net::Messenger::create(entity_name_t::OSD(server.sid), server.lname, 0, server.sid);
- return fut.then(
- [&server, addr, v1_crc_enabled](crimson::net::Messenger *messenger) {
- return server.container().invoke_on_all(
- [messenger, v1_crc_enabled](auto& server) {
- server.msgr = messenger->get_local_shard();
- server.msgr->set_default_policy(crimson::net::SocketPolicy::stateless_server(0));
- server.msgr->set_auth_client(&server.dummy_auth);
- server.msgr->set_auth_server(&server.dummy_auth);
- if (v1_crc_enabled) {
- server.msgr->set_crc_header();
- server.msgr->set_crc_data();
- }
- }).then([messenger, addr] {
- return messenger->bind(entity_addrvec_t{addr});
- }).then([&server, messenger] {
- return messenger->start(&server);
- });
- });
+ msgr = crimson::net::Messenger::create(entity_name_t::OSD(msgr_sid), lname, 0);
+ msgr->set_default_policy(crimson::net::SocketPolicy::stateless_server(0));
+ msgr->set_auth_client(&dummy_auth);
+ msgr->set_auth_server(&dummy_auth);
+ if (v1_crc_enabled) {
+ msgr->set_crc_header();
+ msgr->set_crc_data();
+ }
+ return msgr->bind(entity_addrvec_t{addr}).then([this] {
+ return msgr->start(this);
+ });
});
}
seastar::future<> shutdown() {
logger().info("{} shutdown...", lname);
- return container().invoke_on(msgr_sid, [] (auto& server) {
- ceph_assert(server.msgr);
- return server.msgr->shutdown();
+ return seastar::smp::submit_to(msgr_sid, [this] {
+ ceph_assert(msgr);
+ return msgr->shutdown();
+ });
+ }
+ seastar::future<> wait() {
+ return seastar::smp::submit_to(msgr_sid, [this] {
+ ceph_assert(msgr);
+ return msgr->wait();
+ });
+ }
+
+ static seastar::future<ServerFRef> create(seastar::shard_id msgr_sid, unsigned msg_len) {
+ return seastar::smp::submit_to(msgr_sid, [msg_len] {
+ return seastar::make_foreign(std::make_unique<Server>(msg_len));
});
}
};
std::string lname;
const unsigned jobs;
- crimson::net::Messenger *msgr = nullptr;
+ crimson::net::MessengerRef msgr;
const unsigned msg_len;
bufferlist msg_data;
const unsigned nr_depth;
return nr_depth - depth.current();
}
- Dispatcher* get_local_shard() override {
- return &(container().local());
- }
- seastar::future<> stop() {
- return seastar::now();
- }
seastar::future<> ms_handle_connect(crimson::net::ConnectionRef conn) override {
conn_stats.connected_time = mono_clock::now();
return seastar::now();
seastar::future<> init(bool v1_crc_enabled) {
return container().invoke_on_all([v1_crc_enabled] (auto& client) {
if (client.is_active()) {
- return crimson::net::Messenger::create(entity_name_t::OSD(client.sid), client.lname, client.sid, client.sid)
- .then([&client, v1_crc_enabled] (crimson::net::Messenger *messenger) {
- client.msgr = messenger;
- client.msgr->set_default_policy(crimson::net::SocketPolicy::lossy_client(0));
- client.msgr->set_require_authorizer(false);
- client.msgr->set_auth_client(&client.dummy_auth);
- client.msgr->set_auth_server(&client.dummy_auth);
- if (v1_crc_enabled) {
- client.msgr->set_crc_header();
- client.msgr->set_crc_data();
- }
- return client.msgr->start(&client);
- });
+ client.msgr = crimson::net::Messenger::create(entity_name_t::OSD(client.sid), client.lname, client.sid);
+ client.msgr->set_default_policy(crimson::net::SocketPolicy::lossy_client(0));
+ client.msgr->set_require_authorizer(false);
+ client.msgr->set_auth_client(&client.dummy_auth);
+ client.msgr->set_auth_server(&client.dummy_auth);
+ if (v1_crc_enabled) {
+ client.msgr->set_crc_header();
+ client.msgr->set_crc_data();
+ }
+ return client.msgr->start(&client);
}
return seastar::now();
});
// start clients in active cores (#1 ~ #jobs)
if (client.is_active()) {
mono_time start_time = mono_clock::now();
- return client.msgr->connect(peer_addr, entity_name_t::TYPE_OSD)
- .then([&client] (auto conn) {
- client.active_conn = conn->release();
+ return client.msgr->connect(peer_addr, entity_name_t::TYPE_OSD
+ ).then([&client] (auto conn) {
+ client.active_conn = conn;
// make sure handshake won't hurt the performance
return seastar::sleep(1s);
}).then([&client, start_time] {
};
return seastar::when_all_succeed(
- crimson::net::create_sharded<test_state::Server>(server_conf.core, server_conf.block_size),
- crimson::net::create_sharded<test_state::Client>(client_conf.jobs,
- client_conf.block_size, client_conf.depth))
- .then([=](test_state::Server *server,
- test_state::Client *client) {
- if (mode == perf_mode_t::both) {
- logger().info("\nperf settings:\n {}\n {}\n",
- client_conf.str(), server_conf.str());
- ceph_assert(seastar::smp::count >= 1+client_conf.jobs);
- ceph_assert(client_conf.jobs > 0);
- ceph_assert(seastar::smp::count >= 1+server_conf.core);
- ceph_assert(server_conf.core == 0 || server_conf.core > client_conf.jobs);
- return seastar::when_all_succeed(
- server->init(server_conf.v1_crc_enabled, server_conf.addr),
- client->init(client_conf.v1_crc_enabled))
- .then([client, addr = client_conf.server_addr] {
- return client->connect_wait_verify(addr);
- }).then([client, ramptime = client_conf.ramptime,
- msgtime = client_conf.msgtime] {
- return client->dispatch_with_timer(ramptime, msgtime);
- }).finally([client] {
- return client->shutdown();
- }).finally([server] {
- return server->shutdown();
- });
- } else if (mode == perf_mode_t::client) {
- logger().info("\nperf settings:\n {}\n", client_conf.str());
- ceph_assert(seastar::smp::count >= 1+client_conf.jobs);
- ceph_assert(client_conf.jobs > 0);
- return client->init(client_conf.v1_crc_enabled)
- .then([client, addr = client_conf.server_addr] {
- return client->connect_wait_verify(addr);
- }).then([client, ramptime = client_conf.ramptime,
- msgtime = client_conf.msgtime] {
- return client->dispatch_with_timer(ramptime, msgtime);
- }).finally([client] {
- return client->shutdown();
- });
- } else { // mode == perf_mode_t::server
- ceph_assert(seastar::smp::count >= 1+server_conf.core);
- logger().info("\nperf settings:\n {}\n", server_conf.str());
- return server->init(server_conf.v1_crc_enabled, server_conf.addr)
- // dispatch ops
- .then([server] {
- return server->msgr->wait();
- // shutdown
- }).finally([server] {
- return server->shutdown();
- });
- }
- });
+ test_state::Server::create(server_conf.core, server_conf.block_size),
+ create_sharded<test_state::Client>(client_conf.jobs, client_conf.block_size, client_conf.depth)
+ ).then([=](test_state::ServerFRef fp_server,
+ test_state::Client *client) {
+ test_state::Server* server = fp_server.get();
+ if (mode == perf_mode_t::both) {
+ logger().info("\nperf settings:\n {}\n {}\n",
+ client_conf.str(), server_conf.str());
+ ceph_assert(seastar::smp::count >= 1+client_conf.jobs);
+ ceph_assert(client_conf.jobs > 0);
+ ceph_assert(seastar::smp::count >= 1+server_conf.core);
+ ceph_assert(server_conf.core == 0 || server_conf.core > client_conf.jobs);
+ return seastar::when_all_succeed(
+ server->init(server_conf.v1_crc_enabled, server_conf.addr),
+ client->init(client_conf.v1_crc_enabled)
+ ).then([client, addr = client_conf.server_addr] {
+ return client->connect_wait_verify(addr);
+ }).then([client, ramptime = client_conf.ramptime,
+ msgtime = client_conf.msgtime] {
+ return client->dispatch_with_timer(ramptime, msgtime);
+ }).finally([client] {
+ return client->shutdown();
+ }).finally([server, fp_server = std::move(fp_server)] () mutable {
+ return server->shutdown().then([cleanup = std::move(fp_server)] {});
+ });
+ } else if (mode == perf_mode_t::client) {
+ logger().info("\nperf settings:\n {}\n", client_conf.str());
+ ceph_assert(seastar::smp::count >= 1+client_conf.jobs);
+ ceph_assert(client_conf.jobs > 0);
+ return client->init(client_conf.v1_crc_enabled
+ ).then([client, addr = client_conf.server_addr] {
+ return client->connect_wait_verify(addr);
+ }).then([client, ramptime = client_conf.ramptime,
+ msgtime = client_conf.msgtime] {
+ return client->dispatch_with_timer(ramptime, msgtime);
+ }).finally([client] {
+ return client->shutdown();
+ });
+ } else { // mode == perf_mode_t::server
+ ceph_assert(seastar::smp::count >= 1+server_conf.core);
+ logger().info("\nperf settings:\n {}\n", server_conf.str());
+ return server->init(server_conf.v1_crc_enabled, server_conf.addr
+ // dispatch ops
+ ).then([server] {
+ return server->wait();
+ // shutdown
+ }).finally([server, fp_server = std::move(fp_server)] () mutable {
+ return server->shutdown().then([cleanup = std::move(fp_server)] {});
+ });
+ }
+ });
}
}