]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
msgr: allow different public and cluster msgr type
authorHaomai Wang <haomai@xsky.com>
Wed, 16 Nov 2016 15:27:31 +0000 (23:27 +0800)
committerHaomai Wang <haomai@xsky.com>
Thu, 26 Jan 2017 05:58:42 +0000 (13:58 +0800)
Signed-off-by: Haomai Wang <haomai@xsky.com>
src/ceph_mon.cc
src/ceph_osd.cc
src/common/config_opts.h
src/msg/Messenger.cc
src/msg/async/AsyncMessenger.cc
src/msg/async/AsyncMessenger.h
src/msg/async/Event.cc
src/msg/async/Event.h
src/msg/async/Stack.cc
src/test/msgr/test_async_networkstack.cc

index 31606130378c2418444b9be5316e8f4949f148df..489cb4137a1e67409afaa5453ebb0d7b383e6961 100644 (file)
@@ -651,7 +651,8 @@ int main(int argc, const char **argv)
 
   // bind
   int rank = monmap.get_rank(g_conf->name.get_id());
-  Messenger *msgr = Messenger::create(g_ceph_context, g_conf->ms_type,
+  std::string public_msgr_type = g_conf->ms_public_type.empty() ? g_conf->ms_type : g_conf->ms_public_type;
+  Messenger *msgr = Messenger::create(g_ceph_context, public_msgr_type,
                                      entity_name_t::MON(rank), "mon",
                                      0, Messenger::HAS_MANY_CONNECTIONS);
   if (!msgr)
index 49f7b2e62505f814924cdc433595dd48952fef05..6d796056e1657dc4252854d14f01495bb7e5a283 100644 (file)
@@ -440,29 +440,31 @@ int main(int argc, const char **argv)
         << TEXT_NORMAL << dendl;
   }
 
-  Messenger *ms_public = Messenger::create(g_ceph_context, g_conf->ms_type,
+  std::string public_msgr_type = g_conf->ms_public_type.empty() ? g_conf->ms_type : g_conf->ms_public_type;
+  std::string cluster_msgr_type = g_conf->ms_cluster_type.empty() ? g_conf->ms_type : g_conf->ms_cluster_type;
+  Messenger *ms_public = Messenger::create(g_ceph_context, public_msgr_type,
                                           entity_name_t::OSD(whoami), "client",
                                           getpid(),
                                           Messenger::HAS_HEAVY_TRAFFIC |
                                           Messenger::HAS_MANY_CONNECTIONS);
-  Messenger *ms_cluster = Messenger::create(g_ceph_context, g_conf->ms_type,
+  Messenger *ms_cluster = Messenger::create(g_ceph_context, cluster_msgr_type,
                                            entity_name_t::OSD(whoami), "cluster",
                                            getpid(),
                                            Messenger::HAS_HEAVY_TRAFFIC |
                                            Messenger::HAS_MANY_CONNECTIONS);
-  Messenger *ms_hb_back_client = Messenger::create(g_ceph_context, g_conf->ms_type,
+  Messenger *ms_hb_back_client = Messenger::create(g_ceph_context, cluster_msgr_type,
                                             entity_name_t::OSD(whoami), "hb_back_client",
                                             getpid(), Messenger::HEARTBEAT);
-  Messenger *ms_hb_front_client = Messenger::create(g_ceph_context, g_conf->ms_type,
+  Messenger *ms_hb_front_client = Messenger::create(g_ceph_context, public_msgr_type,
                                             entity_name_t::OSD(whoami), "hb_front_client",
                                             getpid(), Messenger::HEARTBEAT);
-  Messenger *ms_hb_back_server = Messenger::create(g_ceph_context, g_conf->ms_type,
+  Messenger *ms_hb_back_server = Messenger::create(g_ceph_context, cluster_msgr_type,
                                                   entity_name_t::OSD(whoami), "hb_back_server",
                                                   getpid(), Messenger::HEARTBEAT);
-  Messenger *ms_hb_front_server = Messenger::create(g_ceph_context, g_conf->ms_type,
+  Messenger *ms_hb_front_server = Messenger::create(g_ceph_context, public_msgr_type,
                                                    entity_name_t::OSD(whoami), "hb_front_server",
                                                    getpid(), Messenger::HEARTBEAT);
-  Messenger *ms_objecter = Messenger::create(g_ceph_context, g_conf->ms_type,
+  Messenger *ms_objecter = Messenger::create(g_ceph_context, public_msgr_type,
                                             entity_name_t::OSD(whoami), "ms_objecter",
                                             getpid(), 0);
   if (!ms_public || !ms_cluster || !ms_hb_front_client || !ms_hb_back_client || !ms_hb_back_server || !ms_hb_front_server || !ms_objecter)
index f2e820924f9799a00d2602151125b2d5530a380e..826d2af92da90edc9eeb7c0c775e2429a7b3f9f7 100644 (file)
@@ -175,7 +175,9 @@ OPTION(heartbeat_file, OPT_STR, "")
 OPTION(heartbeat_inject_failure, OPT_INT, 0)    // force an unhealthy heartbeat for N seconds
 OPTION(perf, OPT_BOOL, true)       // enable internal perf counters
 
-OPTION(ms_type, OPT_STR, "async")   // messenger backend
+OPTION(ms_type, OPT_STR, "async+posix")   // messenger backend
+OPTION(ms_public_type, OPT_STR, "")   // messenger backend
+OPTION(ms_cluster_type, OPT_STR, "")   // messenger backend
 OPTION(ms_tcp_nodelay, OPT_BOOL, true)
 OPTION(ms_tcp_rcvbuf, OPT_INT, 0)
 OPTION(ms_tcp_prefetch_max_size, OPT_INT, 4096) // max prefetch size, we limit this to avoid extra memcpy
@@ -212,7 +214,6 @@ OPTION(ms_inject_delay_probability, OPT_DOUBLE, 0) // range [0, 1]
 OPTION(ms_inject_internal_delays, OPT_DOUBLE, 0)   // seconds
 OPTION(ms_dump_on_send, OPT_BOOL, false)           // hexdump msg to log on send
 OPTION(ms_dump_corrupt_message_level, OPT_INT, 1)  // debug level to hexdump undecodeable messages at
-OPTION(ms_async_transport_type, OPT_STR, "posix")
 OPTION(ms_async_op_threads, OPT_U64, 3)            // number of worker processing threads for async messenger created on init
 OPTION(ms_async_max_op_threads, OPT_U64, 5)        // max number of worker processing threads for async messenger
 OPTION(ms_async_set_affinity, OPT_BOOL, true)
index bc6509914ba5a1ad3f4c870b83abb3e881d4e1b9..ff9812e32618f113dd11184b4d1b9b970d43836e 100644 (file)
 
 Messenger *Messenger::create_client_messenger(CephContext *cct, string lname)
 {
+  std::string public_msgr_type = cct->_conf->ms_public_type.empty() ? cct->_conf->ms_type : cct->_conf->ms_public_type;
   uint64_t nonce = 0;
   get_random_bytes((char*)&nonce, sizeof(nonce));
-  return Messenger::create(cct, cct->_conf->ms_type, entity_name_t::CLIENT(),
+  return Messenger::create(cct, public_msgr_type, entity_name_t::CLIENT(),
                           std::move(lname), nonce, 0);
 }
 
@@ -36,8 +37,8 @@ Messenger *Messenger::create(CephContext *cct, const string &type,
   }
   if (r == 0 || type == "simple")
     return new SimpleMessenger(cct, name, std::move(lname), nonce);
-  else if (r == 1 || type == "async")
-    return new AsyncMessenger(cct, name, std::move(lname), nonce);
+  else if (r == 1 || type.find("async") != std::string::npos)
+    return new AsyncMessenger(cct, name, type, std::move(lname), nonce);
 #ifdef HAVE_XIO
   else if ((type == "xio") &&
           cct->check_experimental_feature_enabled("ms-type-xio"))
index 2b2eb2c0c9ef32a41bbc4066a85374a2fd0ab353..506b71be471ff06a756a6905cfa05a55dd292e7f 100644 (file)
@@ -213,9 +213,13 @@ void Processor::stop()
 
 
 struct StackSingleton {
+  CephContext *cct;
   std::shared_ptr<NetworkStack> stack;
-  StackSingleton(CephContext *c) {
-    stack = NetworkStack::create(c, c->_conf->ms_async_transport_type);
+
+  StackSingleton(CephContext *c): cct(c) {}
+  void ready(std::string &type) {
+    if (!stack)
+      stack = NetworkStack::create(cct, type);
   }
   ~StackSingleton() {
     stack->stop();
@@ -239,7 +243,7 @@ class C_handle_reap : public EventCallback {
  */
 
 AsyncMessenger::AsyncMessenger(CephContext *cct, entity_name_t name,
-                               string mname, uint64_t _nonce)
+                               const std::string &type, string mname, uint64_t _nonce)
   : SimplePolicyMessenger(cct, name,mname, _nonce),
     dispatch_queue(cct, this, mname),
     lock("AsyncMessenger::lock"),
@@ -247,9 +251,16 @@ AsyncMessenger::AsyncMessenger(CephContext *cct, entity_name_t name,
     global_seq(0), deleted_lock("AsyncMessenger::deleted_lock"),
     cluster_protocol(0), stopped(true)
 {
+  std::string transport_type = "posix";
+  if (type.find("rdma") != std::string::npos)
+    transport_type = "rdma";
+  else if (type.find("dpdk") != std::string::npos)
+    transport_type = "dpdk";
+
   ceph_spin_init(&global_seq_lock);
   StackSingleton *single;
-  cct->lookup_or_create_singleton_object<StackSingleton>(single, "AsyncMessenger::NetworkStack");
+  cct->lookup_or_create_singleton_object<StackSingleton>(single, "AsyncMessenger::NetworkStack::"+transport_type);
+  single->ready(transport_type);
   stack = single->stack.get();
   stack->start();
   local_worker = stack->get_worker();
index 384e4465a7bde7de46909ba8efc4ab43cb702f37..771dfdbe8a7b0bdee28e590fd0a1f63da91aca23 100644 (file)
@@ -82,7 +82,7 @@ public:
    * _nonce A unique ID to use for this AsyncMessenger. It should not
    * be a value that will be repeated if the daemon restarts.
    */
-  AsyncMessenger(CephContext *cct, entity_name_t name,
+  AsyncMessenger(CephContext *cct, entity_name_t name, const std::string &type,
                  string mname, uint64_t _nonce);
 
   /**
@@ -225,6 +225,8 @@ private:
   // the worker run messenger's cron jobs
   Worker *local_worker;
 
+  std::string ms_type;
+
   /// overall lock used for AsyncMessenger data structures
   Mutex lock;
   // AsyncMessenger stuff
index 5999b0d77e8f231bf1c85be1081fd3edfaf00ba7..0ce65ed23b2bdb46eeeca725fa23cd1bf5dd7a5a 100644 (file)
@@ -99,14 +99,14 @@ ostream& EventCenter::_event_prefix(std::ostream *_dout)
                 << " time_id=" << time_event_next_id << ").";
 }
 
-int EventCenter::init(int n, unsigned i)
+int EventCenter::init(int n, unsigned i, std::string &t)
 {
   // can't init multi times
   assert(nevent == 0);
 
   idx = i;
 
-  if (cct->_conf->ms_async_transport_type == "dpdk") {
+  if (t == "dpdk") {
 #ifdef HAVE_DPDK
     driver = new DPDKDriver(cct);
 #endif
index 5b210efe0684efe969c482d0e06cff29df57fdf1..f57a1cf49495e43789278735e5a0d2a7ce5af53a 100644 (file)
@@ -190,7 +190,7 @@ class EventCenter {
   ~EventCenter();
   ostream& _event_prefix(std::ostream *_dout);
 
-  int init(int nevent, unsigned idx);
+  int init(int nevent, unsigned idx, std::string &t);
   void set_owner();
   pthread_t get_owner() const { return owner; }
   unsigned get_id() const { return idx; }
index 26d119b999cdc68cd8d102647936578e8d3a5ea7..625e472d808d0deca7a430b1662cb7d854bede7c 100644 (file)
@@ -107,7 +107,7 @@ NetworkStack::NetworkStack(CephContext *c, const string &t): type(t), started(fa
 
   for (unsigned i = 0; i < num_workers; ++i) {
     Worker *w = create_worker(cct, type, i);
-    w->center.init(InitEventNumber, i);
+    w->center.init(InitEventNumber, i, type);
     workers.push_back(w);
   }
   cct->register_fork_watcher(this);
index af39f79c47c7fef6cb4c96f2ddce6ec1928b13e4..585077c764b713a81be97678485a16b4fca5c1ea 100644 (file)
@@ -41,11 +41,11 @@ class NetworkWorkerTest : public ::testing::TestWithParam<const char*> {
   virtual void SetUp() {
     cerr << __func__ << " start set up " << GetParam() << std::endl;
     if (strncmp(GetParam(), "dpdk", 4)) {
-      g_ceph_context->_conf->set_val("ms_async_transport_type", "posix", false, false);
+      g_ceph_context->_conf->set_val("ms_type", "async+posix", false, false);
       addr = "127.0.0.1:15000";
       port_addr = "127.0.0.1:15001";
     } else {
-      g_ceph_context->_conf->set_val("ms_async_transport_type", "dpdk", false, false);
+      g_ceph_context->_conf->set_val("ms_type", "async+dpdk", false, false);
       g_ceph_context->_conf->set_val("ms_dpdk_debug_allow_loopback", "true", false, false);
       g_ceph_context->_conf->set_val("ms_async_op_threads", "2", false, false);
       g_ceph_context->_conf->set_val("ms_dpdk_coremask", "0x7", false, false);