Messenger *msgr = Messenger::create(g_ceph_context, g_conf->ms_type,
entity_name_t::MDS(-1), "mds",
- getpid());
+ getpid(), 0, Messenger::HAS_MANY_CONNECTIONS);
if (!msgr)
exit(1);
msgr->set_cluster_protocol(CEPH_MDS_PROTOCOL);
// bind
int rank = monmap.get_rank(g_conf->name.get_id());
Messenger *msgr = Messenger::create(g_ceph_context, g_conf->ms_type,
- entity_name_t::MON(rank),
- "mon",
- 0);
+ entity_name_t::MON(rank), "mon",
+ 0, 0, Messenger::HAS_MANY_CONNECTIONS);
if (!msgr)
exit(1);
msgr->set_cluster_protocol(CEPH_MON_PROTOCOL);
Messenger *ms_public = Messenger::create(g_ceph_context, g_conf->ms_type,
entity_name_t::OSD(whoami), "client",
- getpid());
+ getpid(), 0,
+ Messenger::HAS_HEAVY_TRAFFIC |
+ Messenger::HAS_MANY_CONNECTIONS);
Messenger *ms_cluster = Messenger::create(g_ceph_context, g_conf->ms_type,
entity_name_t::OSD(whoami), "cluster",
- getpid(), CEPH_FEATURES_ALL);
+ getpid(), CEPH_FEATURES_ALL,
+ Messenger::HAS_HEAVY_TRAFFIC |
+ Messenger::HAS_MANY_CONNECTIONS);
Messenger *ms_hbclient = Messenger::create(g_ceph_context, g_conf->ms_type,
entity_name_t::OSD(whoami), "hbclient",
- getpid());
+ getpid(), 0, Messenger::HEARTBEAT);
Messenger *ms_hb_back_server = Messenger::create(g_ceph_context, g_conf->ms_type,
entity_name_t::OSD(whoami), "hb_back_server",
- getpid());
+ getpid(), 0, Messenger::HEARTBEAT);
Messenger *ms_hb_front_server = Messenger::create(g_ceph_context, g_conf->ms_type,
entity_name_t::OSD(whoami), "hb_front_server",
- getpid());
+ getpid(), 0, Messenger::HEARTBEAT);
Messenger *ms_objecter = Messenger::create(g_ceph_context, g_conf->ms_type,
entity_name_t::OSD(whoami), "ms_objecter",
getpid());
Messenger *Messenger::create(CephContext *cct, const string &type,
entity_name_t name, string lname,
- uint64_t nonce, uint64_t features)
+ uint64_t nonce, uint64_t features, uint64_t cflags)
{
int r = -1;
if (type == "random") {
#ifdef HAVE_XIO
else if ((type == "xio") &&
cct->check_experimental_feature_enabled("ms-type-xio"))
- return new XioMessenger(cct, name, lname, nonce, features);
+ return new XioMessenger(cct, name, lname, nonce, features, cflags);
#endif
lderr(cct) << "unrecognized ms_type '" << type << "'" << dendl;
return nullptr;
int socket_priority;
public:
+ /**
+ * Workload-hint flags passed to Messenger::create(), letting each
+ * transport implementation tune its resources (thread/portal counts,
+ * connection pools) to the caller's expected traffic profile.
+ */
+ static const int HAS_HEAVY_TRAFFIC = 0x0001;
+ static const int HAS_MANY_CONNECTIONS = 0x0002;
+ static const int HEARTBEAT = 0x0004;
+
/**
* The CephContext this Messenger uses. Many other components initialize themselves
* from this value.
* @param lname logical name of the messenger in this process (e.g., "client")
* @param nonce nonce value to uniquely identify this instance on the current host
* @param features bits for the local connection
+ * @param cflags bitmask of workload hints (HAS_HEAVY_TRAFFIC,
+ *   HAS_MANY_CONNECTIONS, HEARTBEAT) used to tune transport resources
*/
static Messenger *create(CephContext *cct,
const string &type,
entity_name_t name,
string lname,
uint64_t nonce,
- uint64_t features = 0);
+ uint64_t features = 0,
+ uint64_t cflags = 0);
/**
* create a new messenger
XioMessenger::XioMessenger(CephContext *cct, entity_name_t name,
string mname, uint64_t _nonce, uint64_t features,
- DispatchStrategy *ds)
+ uint64_t cflags, DispatchStrategy *ds)
: SimplePolicyMessenger(cct, name, mname, _nonce),
XioInit(cct),
nsessions(0),
shutdown_called(false),
- portals(this, get_nportals(), get_nconns_per_portal()),
+ portals(this, get_nportals(cflags), get_nconns_per_portal(cflags)),
dispatch_strategy(ds),
loop_con(new XioLoopbackConnection(this)),
special_handling(0),
ldout(cct,2) << "Create msgr: " << this << " instance: "
<< nInstances.read() << " type: " << name.type_str()
- << " subtype: " << mname << " nportals: " << get_nportals()
- << " nconns_per_portal: " << get_nconns_per_portal() << " features: "
+ << " subtype: " << mname << " nportals: " << get_nportals(cflags)
+ << " nconns_per_portal: " << get_nconns_per_portal(cflags) << " features: "
<< features << dendl;
} /* ctor */
XMSG_MEMPOOL_QUANTUM, 0);
}
-int XioMessenger::get_nconns_per_portal()
+int XioMessenger::get_nconns_per_portal(uint64_t cflags)
{
- return max(cct->_conf->xio_max_conns_per_portal, 32);
+ const int XIO_DEFAULT_NUM_CONNS_PER_PORTAL = 8;
+ int nconns = XIO_DEFAULT_NUM_CONNS_PER_PORTAL;
+
+ if (cflags & Messenger::HAS_MANY_CONNECTIONS)
+ nconns = max(cct->_conf->xio_max_conns_per_portal, XIO_DEFAULT_NUM_CONNS_PER_PORTAL);
+ else if (cflags & Messenger::HEARTBEAT)
+ nconns = max(cct->_conf->osd_heartbeat_min_peers * 4, XIO_DEFAULT_NUM_CONNS_PER_PORTAL);
+
+ return nconns;
}
-int XioMessenger::get_nportals()
+int XioMessenger::get_nportals(uint64_t cflags)
{
- return max(cct->_conf->xio_portal_threads, 1);
+ int nportals = 1;
+
+ if (cflags & Messenger::HAS_HEAVY_TRAFFIC)
+ nportals = max(cct->_conf->xio_portal_threads, 1);
+
+ return nportals;
}
void XioMessenger::learned_addr(const entity_addr_t &peer_addr_for_me)
public:
XioMessenger(CephContext *cct, entity_name_t name,
string mname, uint64_t nonce, uint64_t features,
+ uint64_t cflags = 0,
DispatchStrategy* ds = new QueueStrategy(1));
virtual ~XioMessenger();
void learned_addr(const entity_addr_t& peer_addr_for_me);
private:
- int get_nconns_per_portal();
- int get_nportals();
+ int get_nconns_per_portal(uint64_t cflags);
+ int get_nportals(uint64_t cflags);
protected:
virtual void ready()
messenger = new XioMessenger(g_ceph_context,
entity_name_t::MON(-1),
"xio_client",
- 0 /* nonce */, XIO_ALL_FEATURES,
+ 0 /* nonce */, XIO_ALL_FEATURES, 0 /* cflags */,
dstrategy);
// enable timing prints
messenger = new XioMessenger(g_ceph_context,
entity_name_t::MON(-1),
"xio_server",
- 0 /* nonce */, XIO_ALL_FEATURES,
+ 0 /* nonce */, XIO_ALL_FEATURES, 0 /* cflags */,
dstrategy);
static_cast<XioMessenger*>(messenger)->set_magic(