// bind
int rank = monmap.get_rank(g_conf->name.get_id());
- Messenger *msgr = Messenger::create(g_ceph_context, g_conf->ms_type,
+ std::string public_msgr_type = g_conf->ms_public_type.empty() ? g_conf->ms_type : g_conf->ms_public_type;
+ Messenger *msgr = Messenger::create(g_ceph_context, public_msgr_type,
entity_name_t::MON(rank), "mon",
0, Messenger::HAS_MANY_CONNECTIONS);
if (!msgr)
<< TEXT_NORMAL << dendl;
}
- Messenger *ms_public = Messenger::create(g_ceph_context, g_conf->ms_type,
+ std::string public_msgr_type = g_conf->ms_public_type.empty() ? g_conf->ms_type : g_conf->ms_public_type;
+ std::string cluster_msgr_type = g_conf->ms_cluster_type.empty() ? g_conf->ms_type : g_conf->ms_cluster_type;
+ Messenger *ms_public = Messenger::create(g_ceph_context, public_msgr_type,
entity_name_t::OSD(whoami), "client",
getpid(),
Messenger::HAS_HEAVY_TRAFFIC |
Messenger::HAS_MANY_CONNECTIONS);
- Messenger *ms_cluster = Messenger::create(g_ceph_context, g_conf->ms_type,
+ Messenger *ms_cluster = Messenger::create(g_ceph_context, cluster_msgr_type,
entity_name_t::OSD(whoami), "cluster",
getpid(),
Messenger::HAS_HEAVY_TRAFFIC |
Messenger::HAS_MANY_CONNECTIONS);
- Messenger *ms_hb_back_client = Messenger::create(g_ceph_context, g_conf->ms_type,
+ Messenger *ms_hb_back_client = Messenger::create(g_ceph_context, cluster_msgr_type,
entity_name_t::OSD(whoami), "hb_back_client",
getpid(), Messenger::HEARTBEAT);
- Messenger *ms_hb_front_client = Messenger::create(g_ceph_context, g_conf->ms_type,
+ Messenger *ms_hb_front_client = Messenger::create(g_ceph_context, public_msgr_type,
entity_name_t::OSD(whoami), "hb_front_client",
getpid(), Messenger::HEARTBEAT);
- Messenger *ms_hb_back_server = Messenger::create(g_ceph_context, g_conf->ms_type,
+ Messenger *ms_hb_back_server = Messenger::create(g_ceph_context, cluster_msgr_type,
entity_name_t::OSD(whoami), "hb_back_server",
getpid(), Messenger::HEARTBEAT);
- Messenger *ms_hb_front_server = Messenger::create(g_ceph_context, g_conf->ms_type,
+ Messenger *ms_hb_front_server = Messenger::create(g_ceph_context, public_msgr_type,
entity_name_t::OSD(whoami), "hb_front_server",
getpid(), Messenger::HEARTBEAT);
- Messenger *ms_objecter = Messenger::create(g_ceph_context, g_conf->ms_type,
+ Messenger *ms_objecter = Messenger::create(g_ceph_context, public_msgr_type,
entity_name_t::OSD(whoami), "ms_objecter",
getpid(), 0);
if (!ms_public || !ms_cluster || !ms_hb_front_client || !ms_hb_back_client || !ms_hb_back_server || !ms_hb_front_server || !ms_objecter)
OPTION(heartbeat_inject_failure, OPT_INT, 0) // force an unhealthy heartbeat for N seconds
OPTION(perf, OPT_BOOL, true) // enable internal perf counters
OPTION(ms_type, OPT_STR, "async+posix") // messenger backend, e.g. "async+posix"
OPTION(ms_public_type, OPT_STR, "") // messenger backend for the public network; empty = use ms_type
OPTION(ms_cluster_type, OPT_STR, "") // messenger backend for the cluster network; empty = use ms_type
OPTION(ms_tcp_nodelay, OPT_BOOL, true)
OPTION(ms_tcp_rcvbuf, OPT_INT, 0)
OPTION(ms_tcp_prefetch_max_size, OPT_INT, 4096) // max prefetch size, we limit this to avoid extra memcpy
OPTION(ms_inject_internal_delays, OPT_DOUBLE, 0) // seconds
OPTION(ms_dump_on_send, OPT_BOOL, false) // hexdump msg to log on send
OPTION(ms_dump_corrupt_message_level, OPT_INT, 1) // debug level to hexdump undecodeable messages at
OPTION(ms_async_op_threads, OPT_U64, 3) // number of worker processing threads for async messenger created on init
OPTION(ms_async_max_op_threads, OPT_U64, 5) // max number of worker processing threads for async messenger
OPTION(ms_async_set_affinity, OPT_BOOL, true)
// Create a messenger for a client. Clients live on the public network, so
// honor ms_public_type when set and fall back to the generic ms_type.
Messenger *Messenger::create_client_messenger(CephContext *cct, string lname)
{
  std::string public_msgr_type = cct->_conf->ms_public_type.empty() ?
    cct->_conf->ms_type : cct->_conf->ms_public_type;
  // Random nonce so this instance is not repeated across restarts.
  uint64_t nonce = 0;
  get_random_bytes((char*)&nonce, sizeof(nonce));
  return Messenger::create(cct, public_msgr_type, entity_name_t::CLIENT(),
                           std::move(lname), nonce, 0);
}
}
if (r == 0 || type == "simple")
return new SimpleMessenger(cct, name, std::move(lname), nonce);
- else if (r == 1 || type == "async")
- return new AsyncMessenger(cct, name, std::move(lname), nonce);
+ else if (r == 1 || type.find("async") != std::string::npos)
+ return new AsyncMessenger(cct, name, type, std::move(lname), nonce);
#ifdef HAVE_XIO
else if ((type == "xio") &&
cct->check_experimental_feature_enabled("ms-type-xio"))
struct StackSingleton {
+ CephContext *cct;
std::shared_ptr<NetworkStack> stack;
- StackSingleton(CephContext *c) {
- stack = NetworkStack::create(c, c->_conf->ms_async_transport_type);
+
+ StackSingleton(CephContext *c): cct(c) {}
+ void ready(std::string &type) {
+ if (!stack)
+ stack = NetworkStack::create(cct, type);
}
~StackSingleton() {
stack->stop();
*/
AsyncMessenger::AsyncMessenger(CephContext *cct, entity_name_t name,
- string mname, uint64_t _nonce)
+ const std::string &type, string mname, uint64_t _nonce)
: SimplePolicyMessenger(cct, name,mname, _nonce),
dispatch_queue(cct, this, mname),
lock("AsyncMessenger::lock"),
global_seq(0), deleted_lock("AsyncMessenger::deleted_lock"),
cluster_protocol(0), stopped(true)
{
+ std::string transport_type = "posix";
+ if (type.find("rdma") != std::string::npos)
+ transport_type = "rdma";
+ else if (type.find("dpdk") != std::string::npos)
+ transport_type = "dpdk";
+
ceph_spin_init(&global_seq_lock);
StackSingleton *single;
- cct->lookup_or_create_singleton_object<StackSingleton>(single, "AsyncMessenger::NetworkStack");
+ cct->lookup_or_create_singleton_object<StackSingleton>(single, "AsyncMessenger::NetworkStack::"+transport_type);
+ single->ready(transport_type);
stack = single->stack.get();
stack->start();
local_worker = stack->get_worker();
* _nonce A unique ID to use for this AsyncMessenger. It should not
* be a value that will be repeated if the daemon restarts.
*/
- AsyncMessenger(CephContext *cct, entity_name_t name,
+ AsyncMessenger(CephContext *cct, entity_name_t name, const std::string &type,
string mname, uint64_t _nonce);
/**
// the worker run messenger's cron jobs
Worker *local_worker;
+ std::string ms_type;
+
/// overall lock used for AsyncMessenger data structures
Mutex lock;
// AsyncMessenger stuff
<< " time_id=" << time_event_next_id << ").";
}
-int EventCenter::init(int n, unsigned i)
+int EventCenter::init(int n, unsigned i, std::string &t)
{
// can't init multi times
assert(nevent == 0);
idx = i;
- if (cct->_conf->ms_async_transport_type == "dpdk") {
+ if (t == "dpdk") {
#ifdef HAVE_DPDK
driver = new DPDKDriver(cct);
#endif
~EventCenter();
ostream& _event_prefix(std::ostream *_dout);
// t selects the event driver (e.g. "dpdk" enables DPDKDriver when built in).
int init(int nevent, unsigned idx, std::string &t);
void set_owner();
pthread_t get_owner() const { return owner; }
unsigned get_id() const { return idx; }
for (unsigned i = 0; i < num_workers; ++i) {
  Worker *w = create_worker(cct, type, i);
  // Each worker's event center needs the transport type to pick its driver.
  w->center.init(InitEventNumber, i, type);
  workers.push_back(w);
}
cct->register_fork_watcher(this);
virtual void SetUp() {
  cerr << __func__ << " start set up " << GetParam() << std::endl;
  if (strncmp(GetParam(), "dpdk", 4)) {
    // Non-dpdk parameters exercise the default posix transport.
    g_ceph_context->_conf->set_val("ms_type", "async+posix", false, false);
    addr = "127.0.0.1:15000";
    port_addr = "127.0.0.1:15001";
  } else {
    g_ceph_context->_conf->set_val("ms_type", "async+dpdk", false, false);
    g_ceph_context->_conf->set_val("ms_dpdk_debug_allow_loopback", "true", false, false);
    g_ceph_context->_conf->set_val("ms_async_op_threads", "2", false, false);
    g_ceph_context->_conf->set_val("ms_dpdk_coremask", "0x7", false, false);