From: Haomai Wang Date: Wed, 16 Nov 2016 15:27:31 +0000 (+0800) Subject: msgr: allow different public and cluster msgr type X-Git-Tag: v12.0.0~17^2~3 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=2850132f3e592cb2551fcedc94f9fe8f53973f17;p=ceph.git msgr: allow different public and cluster msgr type Signed-off-by: Haomai Wang --- diff --git a/src/ceph_mon.cc b/src/ceph_mon.cc index 31606130378c..489cb4137a1e 100644 --- a/src/ceph_mon.cc +++ b/src/ceph_mon.cc @@ -651,7 +651,8 @@ int main(int argc, const char **argv) // bind int rank = monmap.get_rank(g_conf->name.get_id()); - Messenger *msgr = Messenger::create(g_ceph_context, g_conf->ms_type, + std::string public_msgr_type = g_conf->ms_public_type.empty() ? g_conf->ms_type : g_conf->ms_public_type; + Messenger *msgr = Messenger::create(g_ceph_context, public_msgr_type, entity_name_t::MON(rank), "mon", 0, Messenger::HAS_MANY_CONNECTIONS); if (!msgr) diff --git a/src/ceph_osd.cc b/src/ceph_osd.cc index 49f7b2e62505..6d796056e165 100644 --- a/src/ceph_osd.cc +++ b/src/ceph_osd.cc @@ -440,29 +440,31 @@ int main(int argc, const char **argv) << TEXT_NORMAL << dendl; } - Messenger *ms_public = Messenger::create(g_ceph_context, g_conf->ms_type, + std::string public_msgr_type = g_conf->ms_public_type.empty() ? g_conf->ms_type : g_conf->ms_public_type; + std::string cluster_msgr_type = g_conf->ms_cluster_type.empty() ? g_conf->ms_type : g_conf->ms_cluster_type; + Messenger *ms_public = Messenger::create(g_ceph_context, public_msgr_type, entity_name_t::OSD(whoami), "client", getpid(), Messenger::HAS_HEAVY_TRAFFIC | Messenger::HAS_MANY_CONNECTIONS); - Messenger *ms_cluster = Messenger::create(g_ceph_context, g_conf->ms_type, + Messenger *ms_cluster = Messenger::create(g_ceph_context, cluster_msgr_type, entity_name_t::OSD(whoami), "cluster", getpid(), Messenger::HAS_HEAVY_TRAFFIC | Messenger::HAS_MANY_CONNECTIONS); - Messenger *ms_hb_back_client = Messenger::create(g_ceph_context, g_conf->ms_type, + Messenger *ms_hb_back_client = Messenger::create(g_ceph_context, cluster_msgr_type, entity_name_t::OSD(whoami), "hb_back_client", getpid(), Messenger::HEARTBEAT); - Messenger *ms_hb_front_client = Messenger::create(g_ceph_context, g_conf->ms_type, + Messenger *ms_hb_front_client = Messenger::create(g_ceph_context, public_msgr_type, entity_name_t::OSD(whoami), "hb_front_client", getpid(), Messenger::HEARTBEAT); - Messenger *ms_hb_back_server = Messenger::create(g_ceph_context, g_conf->ms_type, + Messenger *ms_hb_back_server = Messenger::create(g_ceph_context, cluster_msgr_type, entity_name_t::OSD(whoami), "hb_back_server", getpid(), Messenger::HEARTBEAT); - Messenger *ms_hb_front_server = Messenger::create(g_ceph_context, g_conf->ms_type, + Messenger *ms_hb_front_server = Messenger::create(g_ceph_context, public_msgr_type, entity_name_t::OSD(whoami), "hb_front_server", getpid(), Messenger::HEARTBEAT); - Messenger *ms_objecter = Messenger::create(g_ceph_context, g_conf->ms_type, + Messenger *ms_objecter = Messenger::create(g_ceph_context, public_msgr_type, entity_name_t::OSD(whoami), "ms_objecter", getpid(), 0); if (!ms_public || !ms_cluster || !ms_hb_front_client || !ms_hb_back_client || !ms_hb_back_server || !ms_hb_front_server || !ms_objecter) diff --git a/src/common/config_opts.h b/src/common/config_opts.h index f2e820924f97..826d2af92da9 100644 --- a/src/common/config_opts.h +++ b/src/common/config_opts.h @@ -175,7 +175,9 @@ OPTION(heartbeat_file, OPT_STR, "") OPTION(heartbeat_inject_failure, OPT_INT, 0) // force an unhealthy heartbeat for N seconds OPTION(perf, OPT_BOOL, true) // enable internal perf counters -OPTION(ms_type, OPT_STR, "async") // messenger backend +OPTION(ms_type, OPT_STR, "async+posix") // messenger backend +OPTION(ms_public_type, OPT_STR, "") // messenger backend +OPTION(ms_cluster_type, OPT_STR, "") // messenger backend OPTION(ms_tcp_nodelay, OPT_BOOL, true) OPTION(ms_tcp_rcvbuf, OPT_INT, 0) OPTION(ms_tcp_prefetch_max_size, OPT_INT, 4096) // max prefetch size, we limit this to avoid extra memcpy @@ -212,7 +214,6 @@ OPTION(ms_inject_delay_probability, OPT_DOUBLE, 0) // range [0, 1] OPTION(ms_inject_internal_delays, OPT_DOUBLE, 0) // seconds OPTION(ms_dump_on_send, OPT_BOOL, false) // hexdump msg to log on send OPTION(ms_dump_corrupt_message_level, OPT_INT, 1) // debug level to hexdump undecodeable messages at -OPTION(ms_async_transport_type, OPT_STR, "posix") OPTION(ms_async_op_threads, OPT_U64, 3) // number of worker processing threads for async messenger created on init OPTION(ms_async_max_op_threads, OPT_U64, 5) // max number of worker processing threads for async messenger OPTION(ms_async_set_affinity, OPT_BOOL, true) diff --git a/src/msg/Messenger.cc b/src/msg/Messenger.cc index bc6509914ba5..ff9812e32618 100644 --- a/src/msg/Messenger.cc +++ b/src/msg/Messenger.cc @@ -14,9 +14,10 @@ Messenger *Messenger::create_client_messenger(CephContext *cct, string lname) { + std::string public_msgr_type = cct->_conf->ms_public_type.empty() ? cct->_conf->ms_type : cct->_conf->ms_public_type; uint64_t nonce = 0; get_random_bytes((char*)&nonce, sizeof(nonce)); - return Messenger::create(cct, cct->_conf->ms_type, entity_name_t::CLIENT(), + return Messenger::create(cct, public_msgr_type, entity_name_t::CLIENT(), std::move(lname), nonce, 0); } @@ -36,8 +37,8 @@ Messenger *Messenger::create(CephContext *cct, const string &type, } if (r == 0 || type == "simple") return new SimpleMessenger(cct, name, std::move(lname), nonce); - else if (r == 1 || type == "async") - return new AsyncMessenger(cct, name, std::move(lname), nonce); + else if (r == 1 || type.find("async") != std::string::npos) + return new AsyncMessenger(cct, name, type, std::move(lname), nonce); #ifdef HAVE_XIO else if ((type == "xio") && cct->check_experimental_feature_enabled("ms-type-xio")) diff --git a/src/msg/async/AsyncMessenger.cc b/src/msg/async/AsyncMessenger.cc index 2b2eb2c0c9ef..506b71be471f 100644 --- a/src/msg/async/AsyncMessenger.cc +++ b/src/msg/async/AsyncMessenger.cc @@ -213,9 +213,13 @@ void Processor::stop() struct StackSingleton { + CephContext *cct; std::shared_ptr stack; - StackSingleton(CephContext *c) { - stack = NetworkStack::create(c, c->_conf->ms_async_transport_type); + + StackSingleton(CephContext *c): cct(c) {} + void ready(std::string &type) { + if (!stack) + stack = NetworkStack::create(cct, type); } ~StackSingleton() { stack->stop(); @@ -239,7 +243,7 @@ class C_handle_reap : public EventCallback { */ AsyncMessenger::AsyncMessenger(CephContext *cct, entity_name_t name, - string mname, uint64_t _nonce) + const std::string &type, string mname, uint64_t _nonce) : SimplePolicyMessenger(cct, name,mname, _nonce), dispatch_queue(cct, this, mname), lock("AsyncMessenger::lock"), @@ -247,9 +251,16 @@ AsyncMessenger::AsyncMessenger(CephContext *cct, entity_name_t name, global_seq(0), deleted_lock("AsyncMessenger::deleted_lock"), cluster_protocol(0), stopped(true) { + std::string transport_type = "posix"; + if (type.find("rdma") != std::string::npos) + transport_type = "rdma"; + else if (type.find("dpdk") != std::string::npos) + transport_type = "dpdk"; + ceph_spin_init(&global_seq_lock); StackSingleton *single; - cct->lookup_or_create_singleton_object(single, "AsyncMessenger::NetworkStack"); + cct->lookup_or_create_singleton_object(single, "AsyncMessenger::NetworkStack::"+transport_type); + single->ready(transport_type); stack = single->stack.get(); stack->start(); local_worker = stack->get_worker(); diff --git a/src/msg/async/AsyncMessenger.h b/src/msg/async/AsyncMessenger.h index 384e4465a7bd..771dfdbe8a7b 100644 --- a/src/msg/async/AsyncMessenger.h +++ b/src/msg/async/AsyncMessenger.h @@ -82,7 +82,7 @@ public: * _nonce A unique ID to use for this AsyncMessenger. It should not * be a value that will be repeated if the daemon restarts. */ - AsyncMessenger(CephContext *cct, entity_name_t name, + AsyncMessenger(CephContext *cct, entity_name_t name, const std::string &type, string mname, uint64_t _nonce); /** @@ -225,6 +225,8 @@ private: // the worker run messenger's cron jobs Worker *local_worker; + std::string ms_type; + /// overall lock used for AsyncMessenger data structures Mutex lock; // AsyncMessenger stuff diff --git a/src/msg/async/Event.cc b/src/msg/async/Event.cc index 5999b0d77e8f..0ce65ed23b2b 100644 --- a/src/msg/async/Event.cc +++ b/src/msg/async/Event.cc @@ -99,14 +99,14 @@ ostream& EventCenter::_event_prefix(std::ostream *_dout) << " time_id=" << time_event_next_id << ")."; } -int EventCenter::init(int n, unsigned i) +int EventCenter::init(int n, unsigned i, std::string &t) { // can't init multi times assert(nevent == 0); idx = i; - if (cct->_conf->ms_async_transport_type == "dpdk") { + if (t == "dpdk") { #ifdef HAVE_DPDK driver = new DPDKDriver(cct); #endif diff --git a/src/msg/async/Event.h b/src/msg/async/Event.h index 5b210efe0684..f57a1cf49495 100644 --- a/src/msg/async/Event.h +++ b/src/msg/async/Event.h @@ -190,7 +190,7 @@ class EventCenter { ~EventCenter(); ostream& _event_prefix(std::ostream *_dout); - int init(int nevent, unsigned idx); + int init(int nevent, unsigned idx, std::string &t); void set_owner(); pthread_t get_owner() const { return owner; } unsigned get_id() const { return idx; } diff --git a/src/msg/async/Stack.cc b/src/msg/async/Stack.cc index 26d119b999cd..625e472d808d 100644 --- a/src/msg/async/Stack.cc +++ b/src/msg/async/Stack.cc @@ -107,7 +107,7 @@ NetworkStack::NetworkStack(CephContext *c, const string &t): type(t), started(fa for (unsigned i = 0; i < num_workers; ++i) { Worker *w = create_worker(cct, type, i); - w->center.init(InitEventNumber, i); + w->center.init(InitEventNumber, i, type); workers.push_back(w); } cct->register_fork_watcher(this); diff --git a/src/test/msgr/test_async_networkstack.cc b/src/test/msgr/test_async_networkstack.cc index af39f79c47c7..585077c764b7 100644 --- a/src/test/msgr/test_async_networkstack.cc +++ b/src/test/msgr/test_async_networkstack.cc @@ -41,11 +41,11 @@ class NetworkWorkerTest : public ::testing::TestWithParam { virtual void SetUp() { cerr << __func__ << " start set up " << GetParam() << std::endl; if (strncmp(GetParam(), "dpdk", 4)) { - g_ceph_context->_conf->set_val("ms_async_transport_type", "posix", false, false); + g_ceph_context->_conf->set_val("ms_type", "async+posix", false, false); addr = "127.0.0.1:15000"; port_addr = "127.0.0.1:15001"; } else { - g_ceph_context->_conf->set_val("ms_async_transport_type", "dpdk", false, false); + g_ceph_context->_conf->set_val("ms_type", "async+dpdk", false, false); g_ceph_context->_conf->set_val("ms_dpdk_debug_allow_loopback", "true", false, false); g_ceph_context->_conf->set_val("ms_async_op_threads", "2", false, false); g_ceph_context->_conf->set_val("ms_dpdk_coremask", "0x7", false, false);