seastar::future<Messenger*>
Messenger::create(const entity_name_t& name,
const std::string& lname,
- const uint64_t nonce)
+ const uint64_t nonce,
+ const int master_sid)
{
- return create_sharded<SocketMessenger>(name, lname, nonce)
+ return create_sharded<SocketMessenger>(name, lname, nonce, master_sid)
.then([](Messenger *msgr) {
return msgr;
});
virtual void print(ostream& out) const = 0;
static seastar::future<Messenger*>
- create(const entity_name_t& name, const std::string& lname, const uint64_t nonce);
+ create(const entity_name_t& name,
+ const std::string& lname,
+ const uint64_t nonce,
+ const int master_sid=-1);
};
inline ostream& operator<<(ostream& out, const Messenger& msgr) {
SocketMessenger::SocketMessenger(const entity_name_t& myname,
const std::string& logic_name,
- uint32_t nonce)
+ uint32_t nonce,
+ int master_sid)
: Messenger{myname},
+ master_sid{master_sid},
sid{seastar::engine().cpu_id()},
logic_name{logic_name},
nonce{nonce}
seastar::shard_id SocketMessenger::locate_shard(const entity_addr_t& addr)
{
ceph_assert(addr.get_family() == AF_INET);
+ if (master_sid >= 0) {
+ return master_sid;
+ }
std::size_t seed = 0;
boost::hash_combine(seed, addr.u.sin.sin_addr.s_addr);
//boost::hash_combine(seed, addr.u.sin.sin_port);
void SocketMessenger::register_conn(SocketConnectionRef conn)
{
+ if (master_sid >= 0) {
+ ceph_assert(static_cast<int>(sid) == master_sid);
+ }
auto [i, added] = connections.emplace(conn->get_peer_addr(), conn);
std::ignore = i;
ceph_assert(added);
using SocketPolicy = ceph::net::Policy<ceph::thread::Throttle>;
class SocketMessenger final : public Messenger, public seastar::peering_sharded_service<SocketMessenger> {
+ const int master_sid;
const seastar::shard_id sid;
seastar::promise<> shutdown_promise;
const entity_type_t& peer_type);
seastar::future<> do_shutdown();
// conn sharding options:
- // 1. Simplest: sharded by ip only
- // 2. Balanced: sharded by ip + port + nonce,
+ // 0. Compatible (master_sid >= 0): place all connections to one master shard
+ // 1. Simplest (master_sid < 0): sharded by ip only
+ // 2. Balanced (not implemented): sharded by ip + port + nonce,
// but, need to move SocketConnection between cores.
seastar::shard_id locate_shard(const entity_addr_t& addr);
public:
SocketMessenger(const entity_name_t& myname,
const std::string& logic_name,
- uint32_t nonce);
+ uint32_t nonce,
+ int master_sid);
seastar::future<> set_myaddrs(const entity_addrvec_t& addr) override;