From 001ea293860e43d13d7eb9594f304ba7e1585495 Mon Sep 17 00:00:00 2001 From: Haomai Wang Date: Sun, 7 Dec 2014 02:14:46 +0800 Subject: [PATCH] Messenger: Create an Messenger implementation by name. Signed-off-by: Haomai Wang --- src/ceph_fuse.cc | 2 +- src/ceph_mds.cc | 2 +- src/ceph_mon.cc | 2 +- src/ceph_osd.cc | 12 ++++++------ src/ceph_syn.cc | 2 +- src/libcephfs.cc | 2 +- src/librados/RadosClient.cc | 2 +- src/mon/MonClient.cc | 4 ++-- src/msg/Messenger.cc | 13 ++++++------- src/msg/Messenger.h | 2 ++ src/test/mon/test-mon-msg.cc | 2 +- src/test/mon/test_mon_workloadgen.cc | 4 ++-- src/test/msgr/test_msgr.cc | 12 ++---------- src/tools/cephfs/MDSUtility.cc | 2 +- 14 files changed, 28 insertions(+), 35 deletions(-) diff --git a/src/ceph_fuse.cc b/src/ceph_fuse.cc index f6a421b93bc17..50406b92d1828 100644 --- a/src/ceph_fuse.cc +++ b/src/ceph_fuse.cc @@ -120,7 +120,7 @@ int main(int argc, const char **argv, const char *envp[]) { goto out_mc_start_failed; // start up network - messenger = Messenger::create(g_ceph_context, + messenger = Messenger::create(g_ceph_context, g_conf->ms_type, entity_name_t::CLIENT(), "client", getpid()); messenger->set_default_policy(Messenger::Policy::lossy_client(0, 0)); diff --git a/src/ceph_mds.cc b/src/ceph_mds.cc index fca21c40130ee..59f1dff59f36e 100644 --- a/src/ceph_mds.cc +++ b/src/ceph_mds.cc @@ -151,7 +151,7 @@ int main(int argc, const char **argv) "MDS names may not start with a numeric digit." << dendl; } - Messenger *messenger = Messenger::create(g_ceph_context, + Messenger *messenger = Messenger::create(g_ceph_context, g_conf->ms_type, entity_name_t::MDS(-1), "mds", getpid()); messenger->set_cluster_protocol(CEPH_MDS_PROTOCOL); diff --git a/src/ceph_mon.cc b/src/ceph_mon.cc index 3bd6022b81c9a..30ce2dfbca7d8 100644 --- a/src/ceph_mon.cc +++ b/src/ceph_mon.cc @@ -685,7 +685,7 @@ int main(int argc, const char **argv) // bind int rank = monmap.get_rank(g_conf->name.get_id()); - Messenger *messenger = Messenger::create(g_ceph_context, + Messenger *messenger = Messenger::create(g_ceph_context, g_conf->ms_type, entity_name_t::MON(rank), "mon", 0); diff --git a/src/ceph_osd.cc b/src/ceph_osd.cc index a6228d442a627..393a165c2ee73 100644 --- a/src/ceph_osd.cc +++ b/src/ceph_osd.cc @@ -369,22 +369,22 @@ int main(int argc, const char **argv) << TEXT_NORMAL << dendl; } - Messenger *ms_public = Messenger::create(g_ceph_context, + Messenger *ms_public = Messenger::create(g_ceph_context, g_conf->ms_type, entity_name_t::OSD(whoami), "client", getpid()); - Messenger *ms_cluster = Messenger::create(g_ceph_context, + Messenger *ms_cluster = Messenger::create(g_ceph_context, g_conf->ms_type, entity_name_t::OSD(whoami), "cluster", getpid()); - Messenger *ms_hbclient = Messenger::create(g_ceph_context, + Messenger *ms_hbclient = Messenger::create(g_ceph_context, g_conf->ms_type, entity_name_t::OSD(whoami), "hbclient", getpid()); - Messenger *ms_hb_back_server = Messenger::create(g_ceph_context, + Messenger *ms_hb_back_server = Messenger::create(g_ceph_context, g_conf->ms_type, entity_name_t::OSD(whoami), "hb_back_server", getpid()); - Messenger *ms_hb_front_server = Messenger::create(g_ceph_context, + Messenger *ms_hb_front_server = Messenger::create(g_ceph_context, g_conf->ms_type, entity_name_t::OSD(whoami), "hb_front_server", getpid()); - Messenger *ms_objecter = Messenger::create(g_ceph_context, + Messenger *ms_objecter = Messenger::create(g_ceph_context, g_conf->ms_type, entity_name_t::OSD(whoami), "ms_objecter", getpid()); ms_cluster->set_cluster_protocol(CEPH_OSD_PROTOCOL); diff --git a/src/ceph_syn.cc b/src/ceph_syn.cc index c3410aa61d413..d9954db42e55f 100644 --- a/src/ceph_syn.cc +++ b/src/ceph_syn.cc @@ -65,7 +65,7 @@ int main(int argc, const char **argv, char *envp[]) cout << "ceph-syn: starting " << g_conf->num_client << " syn client(s)" << std::endl; for (int i=0; inum_client; i++) { - messengers[i] = Messenger::create(g_ceph_context, + messengers[i] = Messenger::create(g_ceph_context, g_conf->ms_type, entity_name_t(entity_name_t::TYPE_CLIENT,-1), "synclient", i * 1000000 + getpid()); messengers[i]->bind(g_conf->public_addr); diff --git a/src/libcephfs.cc b/src/libcephfs.cc index 4ce77c7d72ef2..e3281de273484 100644 --- a/src/libcephfs.cc +++ b/src/libcephfs.cc @@ -77,7 +77,7 @@ public: goto fail; //network connection - messenger = Messenger::create(cct, entity_name_t::CLIENT(), "client", msgr_nonce); + messenger = Messenger::create(cct, cct->_conf->ms_type, entity_name_t::CLIENT(), "client", msgr_nonce); //at last the client ret = -CEPHFS_ERROR_NEW_CLIENT; //defined in libcephfs.h; diff --git a/src/librados/RadosClient.cc b/src/librados/RadosClient.cc index 56f52bb431408..81427dbcdde8d 100644 --- a/src/librados/RadosClient.cc +++ b/src/librados/RadosClient.cc @@ -209,7 +209,7 @@ int librados::RadosClient::connect() err = -ENOMEM; nonce = getpid() + (1000000 * (uint64_t)rados_instance.inc()); - messenger = Messenger::create(cct, entity_name_t::CLIENT(-1), + messenger = Messenger::create(cct, cct->_conf->ms_type, entity_name_t::CLIENT(-1), "radosclient", nonce); if (!messenger) goto out; diff --git a/src/mon/MonClient.cc b/src/mon/MonClient.cc index b8af1d28c5a17..8f42fbc150077 100644 --- a/src/mon/MonClient.cc +++ b/src/mon/MonClient.cc @@ -116,7 +116,7 @@ int MonClient::get_monmap_privately() bool temp_msgr = false; Messenger* smessenger = NULL; if (!messenger) { - messenger = smessenger = Messenger::create(cct, + messenger = smessenger = Messenger::create(cct, cct->_conf->ms_type, entity_name_t::CLIENT(-1), "temp_mon_client", getpid()); messenger->add_dispatcher_head(this); @@ -218,7 +218,7 @@ int MonClient::ping_monitor(const string &mon_id, string *result_reply) MonClientPinger *pinger = new MonClientPinger(cct, result_reply); - Messenger *smsgr = Messenger::create(cct, + Messenger *smsgr = Messenger::create(cct, cct->_conf->ms_type, entity_name_t::CLIENT(-1), "temp_ping_client", getpid()); smsgr->add_dispatcher_head(pinger); diff --git a/src/msg/Messenger.cc b/src/msg/Messenger.cc index bd48b3ddd1bc8..e079ec3c577a6 100644 --- a/src/msg/Messenger.cc +++ b/src/msg/Messenger.cc @@ -5,17 +5,16 @@ #include "msg/simple/SimpleMessenger.h" #include "msg/async/AsyncMessenger.h" -Messenger *Messenger::create(CephContext *cct, - entity_name_t name, - string lname, - uint64_t nonce) +Messenger *Messenger::create(CephContext *cct, const string &type, + entity_name_t name, string lname, + uint64_t nonce) { int r = -1; - if (cct->_conf->ms_type == "random") + if (type == "random") r = rand() % 2; - if (r == 0 || cct->_conf->ms_type == "simple") + if (r == 0 || type == "simple") return new SimpleMessenger(cct, name, lname, nonce); - else if (r == 1 || cct->_conf->ms_type == "async") + else if (r == 1 || type == "async") return new AsyncMessenger(cct, name, lname, nonce); lderr(cct) << "unrecognized ms_type '" << cct->_conf->ms_type << "'" << dendl; return NULL; diff --git a/src/msg/Messenger.h b/src/msg/Messenger.h index 89d5f0415d08c..4462d6c5dfd1f 100644 --- a/src/msg/Messenger.h +++ b/src/msg/Messenger.h @@ -139,11 +139,13 @@ public: * available or specified via the configuration in cct. * * @param cct context + * @param type name of messenger type * @param name entity name to register * @param lname logical name of the messenger in this process (e.g., "client") * @param nonce nonce value to uniquely identify this instance on the current host */ static Messenger *create(CephContext *cct, + const string &type, entity_name_t name, string lname, uint64_t nonce); diff --git a/src/test/mon/test-mon-msg.cc b/src/test/mon/test-mon-msg.cc index 69612327c7e82..a20741a99c22c 100644 --- a/src/test/mon/test-mon-msg.cc +++ b/src/test/mon/test-mon-msg.cc @@ -78,7 +78,7 @@ public: int init_messenger() { dout(1) << __func__ << dendl; - msg = Messenger::create(cct, entity_name_t::CLIENT(-1), + msg = Messenger::create(cct, cct->_conf->ms_type, entity_name_t::CLIENT(-1), "test-mon-msg", 0); assert(msg != NULL); msg->set_default_policy(Messenger::Policy::lossy_client(0,0)); diff --git a/src/test/mon/test_mon_workloadgen.cc b/src/test/mon/test_mon_workloadgen.cc index a09676ef6d05d..c33810374e6a3 100644 --- a/src/test/mon/test_mon_workloadgen.cc +++ b/src/test/mon/test_mon_workloadgen.cc @@ -244,7 +244,7 @@ class ClientStub : public TestStub return err; } - messenger.reset(Messenger::create(cct, entity_name_t::CLIENT(-1), + messenger.reset(Messenger::create(cct, cct->_conf->ms_type, entity_name_t::CLIENT(-1), "stubclient", getpid())); assert(messenger.get() != NULL); @@ -357,7 +357,7 @@ class OSDStub : public TestStub << cct->_conf->auth_supported << dendl; stringstream ss; ss << "client-osd" << whoami; - messenger.reset(Messenger::create(cct, entity_name_t::OSD(whoami), + messenger.reset(Messenger::create(cct, cct->_conf->ms_type, entity_name_t::OSD(whoami), ss.str().c_str(), getpid())); Throttle throttler(g_ceph_context, "osd_client_bytes", diff --git a/src/test/msgr/test_msgr.cc b/src/test/msgr/test_msgr.cc index 6985145ef56f3..a42943fb6a92d 100644 --- a/src/test/msgr/test_msgr.cc +++ b/src/test/msgr/test_msgr.cc @@ -25,8 +25,6 @@ #include "msg/msg_types.h" #include "msg/Message.h" #include "msg/Messenger.h" -#include "msg/simple/SimpleMessenger.h" -#include "msg/async/AsyncMessenger.h" #include "msg/Connection.h" #include "messages/MPing.h" @@ -42,14 +40,8 @@ class MessengerTest : public ::testing::TestWithParam { MessengerTest(): server_msgr(NULL), client_msgr(NULL) {} virtual void SetUp() { cerr << __func__ << " start set up " << GetParam() << std::endl; - if (strcmp(GetParam(), "simple")) { - server_msgr = new SimpleMessenger(g_ceph_context, entity_name_t::OSD(0), "server", getpid()); - client_msgr = new SimpleMessenger(g_ceph_context, entity_name_t::CLIENT(-1), "client", getpid()); - } else if (strcmp(GetParam(), "async")) { - server_msgr = new AsyncMessenger(g_ceph_context, entity_name_t::OSD(0), "server", getpid()); - client_msgr = new AsyncMessenger(g_ceph_context, entity_name_t::CLIENT(-1), "client", getpid()); - server_msgr->set_default_policy(Messenger::Policy::stateless_server(0, 0)); - } + server_msgr = Messenger::create(g_ceph_context, string(GetParam()), entity_name_t::OSD(0), "server", getpid()); + client_msgr = Messenger::create(g_ceph_context, string(GetParam()), entity_name_t::CLIENT(-1), "client", getpid()); server_msgr->set_default_policy(Messenger::Policy::stateless_server(0, 0)); client_msgr->set_default_policy(Messenger::Policy::lossy_client(0, 0)); } diff --git a/src/tools/cephfs/MDSUtility.cc b/src/tools/cephfs/MDSUtility.cc index dda915042481c..9bace76fa83ab 100644 --- a/src/tools/cephfs/MDSUtility.cc +++ b/src/tools/cephfs/MDSUtility.cc @@ -26,7 +26,7 @@ MDSUtility::MDSUtility() : waiting_for_mds_map(NULL) { monc = new MonClient(g_ceph_context); - messenger = Messenger::create(g_ceph_context, entity_name_t::CLIENT(), "mds", getpid()); + messenger = Messenger::create(g_ceph_context, g_ceph_context->_conf->ms_type, entity_name_t::CLIENT(), "mds", getpid()); mdsmap = new MDSMap(); objecter = new Objecter(g_ceph_context, messenger, monc, 0, 0); } -- 2.39.5