From 0dbe8fd3987db1c38bb2fbeae7789a8084dd5489 Mon Sep 17 00:00:00 2001 From: Sage Weil Date: Mon, 10 Oct 2016 09:55:54 -0400 Subject: [PATCH] msg: make loopback Connection feature accurate all the time In 626360aab05545ddacb0ac28e54a70e31fd5695d we made the OSD cluster loopback connection CEPH_FEATURES_ALL, but all other loopback connections got features == 0. I can't come up with any reason we wouldn't want those connections to have accurate feature bits, so let's just use CEPH_FEATURES_ALL for all of them. While we're here, make the cflags argument required. Signed-off-by: Sage Weil --- src/ceph_mds.cc | 2 +- src/ceph_mon.cc | 2 +- src/ceph_osd.cc | 12 ++++++------ src/mgr/DaemonServer.cc | 2 +- src/msg/Messenger.cc | 8 ++++---- src/msg/Messenger.h | 3 +-- src/msg/async/AsyncMessenger.cc | 3 +-- src/msg/async/AsyncMessenger.h | 5 ++--- src/msg/simple/SimpleMessenger.cc | 5 ++--- src/msg/simple/SimpleMessenger.h | 3 +-- src/msg/xio/XioMessenger.cc | 7 ++----- src/msg/xio/XioMessenger.h | 5 +---- src/test/messenger/simple_client.cc | 2 +- src/test/messenger/simple_server.cc | 3 ++- src/test/mon/test-mon-msg.cc | 2 +- src/test/mon/test_mon_workloadgen.cc | 2 +- src/test/msgr/perf_msgr_client.cc | 2 +- src/test/msgr/perf_msgr_server.cc | 2 +- src/test/msgr/test_msgr.cc | 10 +++++----- src/test/osd/TestOSDScrub.cc | 4 ++-- 20 files changed, 37 insertions(+), 47 deletions(-) diff --git a/src/ceph_mds.cc b/src/ceph_mds.cc index 8daf4d7363256..f22d8384df372 100644 --- a/src/ceph_mds.cc +++ b/src/ceph_mds.cc @@ -141,7 +141,7 @@ int main(int argc, const char **argv) Messenger *msgr = Messenger::create(g_ceph_context, g_conf->ms_type, entity_name_t::MDS(-1), "mds", - nonce, 0, Messenger::HAS_MANY_CONNECTIONS); + nonce, Messenger::HAS_MANY_CONNECTIONS); if (!msgr) exit(1); msgr->set_cluster_protocol(CEPH_MDS_PROTOCOL); diff --git a/src/ceph_mon.cc b/src/ceph_mon.cc index de0447606c281..172c9e78ecdb8 100644 --- a/src/ceph_mon.cc +++ b/src/ceph_mon.cc @@ -642,7 +642,7 @@ int main(int argc, const char **argv) int rank = monmap.get_rank(g_conf->name.get_id()); Messenger *msgr = Messenger::create(g_ceph_context, g_conf->ms_type, entity_name_t::MON(rank), "mon", - 0, 0, Messenger::HAS_MANY_CONNECTIONS); + 0, Messenger::HAS_MANY_CONNECTIONS); if (!msgr) exit(1); msgr->set_cluster_protocol(CEPH_MON_PROTOCOL); diff --git a/src/ceph_osd.cc b/src/ceph_osd.cc index d19b2813fb9bc..33cb705a0d662 100644 --- a/src/ceph_osd.cc +++ b/src/ceph_osd.cc @@ -430,26 +430,26 @@ int main(int argc, const char **argv) Messenger *ms_public = Messenger::create(g_ceph_context, g_conf->ms_type, entity_name_t::OSD(whoami), "client", - getpid(), 0, + getpid(), Messenger::HAS_HEAVY_TRAFFIC | Messenger::HAS_MANY_CONNECTIONS); Messenger *ms_cluster = Messenger::create(g_ceph_context, g_conf->ms_type, entity_name_t::OSD(whoami), "cluster", - getpid(), CEPH_FEATURES_ALL, + getpid(), Messenger::HAS_HEAVY_TRAFFIC | Messenger::HAS_MANY_CONNECTIONS); Messenger *ms_hbclient = Messenger::create(g_ceph_context, g_conf->ms_type, entity_name_t::OSD(whoami), "hbclient", - getpid(), 0, Messenger::HEARTBEAT); + getpid(), Messenger::HEARTBEAT); Messenger *ms_hb_back_server = Messenger::create(g_ceph_context, g_conf->ms_type, entity_name_t::OSD(whoami), "hb_back_server", - getpid(), 0, Messenger::HEARTBEAT); + getpid(), Messenger::HEARTBEAT); Messenger *ms_hb_front_server = Messenger::create(g_ceph_context, g_conf->ms_type, entity_name_t::OSD(whoami), "hb_front_server", - getpid(), 0, Messenger::HEARTBEAT); + getpid(), Messenger::HEARTBEAT); Messenger *ms_objecter = Messenger::create(g_ceph_context, g_conf->ms_type, entity_name_t::OSD(whoami), "ms_objecter", - getpid()); + getpid(), 0); if (!ms_public || !ms_cluster || !ms_hbclient || !ms_hb_back_server || !ms_hb_front_server || !ms_objecter) exit(1); ms_cluster->set_cluster_protocol(CEPH_OSD_PROTOCOL); diff --git a/src/mgr/DaemonServer.cc b/src/mgr/DaemonServer.cc index f0be92a1a9df4..31f0ce999d396 100644 --- a/src/mgr/DaemonServer.cc +++ b/src/mgr/DaemonServer.cc @@ -46,7 +46,7 @@ int DaemonServer::init(uint64_t gid, entity_addr_t client_addr) { // Initialize Messenger msgr = Messenger::create(g_ceph_context, g_conf->ms_type, - entity_name_t::MGR(gid), "server", getpid()); + entity_name_t::MGR(gid), "server", getpid(), 0); int r = msgr->bind(g_conf->public_addr); if (r < 0) return r; diff --git a/src/msg/Messenger.cc b/src/msg/Messenger.cc index a271b082b9ac7..aa6f716bd9cb9 100644 --- a/src/msg/Messenger.cc +++ b/src/msg/Messenger.cc @@ -25,7 +25,7 @@ static Spinlock random_lock; Messenger *Messenger::create(CephContext *cct, const string &type, entity_name_t name, string lname, - uint64_t nonce, uint64_t features, uint64_t cflags) + uint64_t nonce, uint64_t cflags) { int r = -1; if (type == "random") { @@ -34,13 +34,13 @@ Messenger *Messenger::create(CephContext *cct, const string &type, r = dis(random_engine); } if (r == 0 || type == "simple") - return new SimpleMessenger(cct, name, lname, nonce, features); + return new SimpleMessenger(cct, name, lname, nonce); else if (r == 1 || type == "async") - return new AsyncMessenger(cct, name, lname, nonce, features); + return new AsyncMessenger(cct, name, lname, nonce); #ifdef HAVE_XIO else if ((type == "xio") && cct->check_experimental_feature_enabled("ms-type-xio")) - return new XioMessenger(cct, name, lname, nonce, features, cflags); + return new XioMessenger(cct, name, lname, nonce, cflags); #endif lderr(cct) << "unrecognized ms_type '" << type << "'" << dendl; return nullptr; diff --git a/src/msg/Messenger.h b/src/msg/Messenger.h index fa261df30c727..bd391e07eac7d 100644 --- a/src/msg/Messenger.h +++ b/src/msg/Messenger.h @@ -166,8 +166,7 @@ public: entity_name_t name, string lname, uint64_t nonce, - uint64_t features = 0, - uint64_t cflags = 0); + uint64_t cflags); /** * create a new messenger diff --git a/src/msg/async/AsyncMessenger.cc b/src/msg/async/AsyncMessenger.cc index 7104ddb0f09b2..19ee090a8ad79 100644 --- a/src/msg/async/AsyncMessenger.cc +++ b/src/msg/async/AsyncMessenger.cc @@ -259,7 +259,7 @@ class C_handle_reap : public EventCallback { */ AsyncMessenger::AsyncMessenger(CephContext *cct, entity_name_t name, - string mname, uint64_t _nonce, uint64_t features) + string mname, uint64_t _nonce) : SimplePolicyMessenger(cct, name,mname, _nonce), dispatch_queue(cct, this, mname), lock("AsyncMessenger::lock"), @@ -274,7 +274,6 @@ AsyncMessenger::AsyncMessenger(CephContext *cct, entity_name_t name, stack->start(); local_worker = stack->get_worker(); local_connection = new AsyncConnection(cct, this, &dispatch_queue, local_worker); - local_features = features; init_local_connection(); reap_handler = new C_handle_reap(this); unsigned processor_num = 1; diff --git a/src/msg/async/AsyncMessenger.h b/src/msg/async/AsyncMessenger.h index ee3e0285e83b6..5e9877f7bc78c 100644 --- a/src/msg/async/AsyncMessenger.h +++ b/src/msg/async/AsyncMessenger.h @@ -84,7 +84,7 @@ public: * be a value that will be repeated if the daemon restarts. */ AsyncMessenger(CephContext *cct, entity_name_t name, - string mname, uint64_t _nonce, uint64_t features); + string mname, uint64_t _nonce); /** * Destroy the AsyncMessenger. Pretty simple since all the work is done @@ -306,7 +306,7 @@ private: assert(lock.is_locked()); local_connection->peer_addr = my_inst.addr; local_connection->peer_type = my_inst.name.type(); - local_connection->set_features(local_features); + local_connection->set_features(CEPH_FEATURES_ALL); ms_deliver_handle_fast_connect(local_connection.get()); } @@ -316,7 +316,6 @@ public: /// con used for sending messages to ourselves ConnectionRef local_connection; - uint64_t local_features; /** * @defgroup AsyncMessenger internals diff --git a/src/msg/simple/SimpleMessenger.cc b/src/msg/simple/SimpleMessenger.cc index 4ef2fe1392335..d89c9d9ef557a 100644 --- a/src/msg/simple/SimpleMessenger.cc +++ b/src/msg/simple/SimpleMessenger.cc @@ -39,7 +39,7 @@ static ostream& _prefix(std::ostream *_dout, SimpleMessenger *msgr) { */ SimpleMessenger::SimpleMessenger(CephContext *cct, entity_name_t name, - string mname, uint64_t _nonce, uint64_t features) + string mname, uint64_t _nonce) : SimplePolicyMessenger(cct, name,mname, _nonce), accepter(this, _nonce), dispatch_queue(cct, this, mname), @@ -55,7 +55,6 @@ SimpleMessenger::SimpleMessenger(CephContext *cct, entity_name_t name, ANNOTATE_BENIGN_RACE_SIZED(&timeout, sizeof(timeout), "SimpleMessenger read timeout"); ceph_spin_init(&global_seq_lock); - local_features = features; init_local_connection(); } @@ -718,6 +717,6 @@ void SimpleMessenger::init_local_connection() { local_connection->peer_addr = my_inst.addr; local_connection->peer_type = my_inst.name.type(); - local_connection->set_features(local_features); + local_connection->set_features(CEPH_FEATURES_ALL); ms_deliver_handle_fast_connect(local_connection.get()); } diff --git a/src/msg/simple/SimpleMessenger.h b/src/msg/simple/SimpleMessenger.h index 2f4685bedc404..ac0c8293ec632 100644 --- a/src/msg/simple/SimpleMessenger.h +++ b/src/msg/simple/SimpleMessenger.h @@ -82,7 +82,7 @@ public: * features The local features bits for the local_connection */ SimpleMessenger(CephContext *cct, entity_name_t name, - string mname, uint64_t _nonce, uint64_t features); + string mname, uint64_t _nonce); /** * Destroy the SimpleMessenger. Pretty simple since all the work is done @@ -332,7 +332,6 @@ public: /// con used for sending messages to ourselves ConnectionRef local_connection; - uint64_t local_features; /** * @defgroup SimpleMessenger internals diff --git a/src/msg/xio/XioMessenger.cc b/src/msg/xio/XioMessenger.cc index bce31d3f9d55b..52d22fb83a299 100644 --- a/src/msg/xio/XioMessenger.cc +++ b/src/msg/xio/XioMessenger.cc @@ -350,7 +350,7 @@ static ostream& _prefix(std::ostream *_dout, XioMessenger *msgr) { } XioMessenger::XioMessenger(CephContext *cct, entity_name_t name, - string mname, uint64_t _nonce, uint64_t features, + string mname, uint64_t _nonce, uint64_t cflags, DispatchStrategy *ds) : SimplePolicyMessenger(cct, name, mname, _nonce), XioInit(cct), @@ -378,7 +378,6 @@ XioMessenger::XioMessenger(CephContext *cct, entity_name_t name, /* update class instance count */ nInstances.inc(); - local_features = features; loop_con->set_features(features); ldout(cct,2) << "Create msgr: " << this << " instance: " @@ -775,9 +774,7 @@ static inline XioMsg* pool_alloc_xio_msg(Message *m, XioConnection *xcon, return NULL; XioMsg *xmsg = reinterpret_cast(mp_mem.addr); assert(!!xmsg); - new (xmsg) XioMsg(m, xcon, mp_mem, ex_cnt, - static_cast( - xcon->get_messenger())->local_features); + new (xmsg) XioMsg(m, xcon, mp_mem, ex_cnt, CEPH_FEATURES_ALL); return xmsg; } diff --git a/src/msg/xio/XioMessenger.h b/src/msg/xio/XioMessenger.h index 8a1bbc76fd8f4..0bfdd6e78017b 100644 --- a/src/msg/xio/XioMessenger.h +++ b/src/msg/xio/XioMessenger.h @@ -63,7 +63,7 @@ private: public: XioMessenger(CephContext *cct, entity_name_t name, - string mname, uint64_t nonce, uint64_t features, + string mname, uint64_t nonce, uint64_t cflags = 0, DispatchStrategy* ds = new QueueStrategy(1)); @@ -156,9 +156,6 @@ private: protected: virtual void ready() { } - -public: - uint64_t local_features; }; XioCommand* pool_alloc_xio_command(XioConnection *xcon); diff --git a/src/test/messenger/simple_client.cc b/src/test/messenger/simple_client.cc index a63cb2804bc65..b1a33bc641194 100644 --- a/src/test/messenger/simple_client.cc +++ b/src/test/messenger/simple_client.cc @@ -105,7 +105,7 @@ int main(int argc, const char **argv) messenger = Messenger::create(g_ceph_context, g_conf->ms_type, entity_name_t::MON(-1), "client", - getpid()); + getpid(), 0); // enable timing prints messenger->set_magic(MSG_MAGIC_TRACE_CTR); diff --git a/src/test/messenger/simple_server.cc b/src/test/messenger/simple_server.cc index 674ef3e948d6e..57115ea7fd913 100644 --- a/src/test/messenger/simple_server.cc +++ b/src/test/messenger/simple_server.cc @@ -76,7 +76,8 @@ int main(int argc, const char **argv) messenger = Messenger::create(g_ceph_context, g_conf->ms_type, entity_name_t::MON(-1), "simple_server", - 0 /* nonce */); + 0 /* nonce */, + 0 /* flags */); // enable timing prints messenger->set_magic(MSG_MAGIC_TRACE_CTR); messenger->set_default_policy( diff --git a/src/test/mon/test-mon-msg.cc b/src/test/mon/test-mon-msg.cc index 58e8b7d133f23..ee50c02626f23 100644 --- a/src/test/mon/test-mon-msg.cc +++ b/src/test/mon/test-mon-msg.cc @@ -79,7 +79,7 @@ public: dout(1) << __func__ << dendl; msg = Messenger::create(cct, cct->_conf->ms_type, entity_name_t::CLIENT(-1), - "test-mon-msg", 0); + "test-mon-msg", 0, 0); assert(msg != NULL); msg->set_default_policy(Messenger::Policy::lossy_client(0,0)); dout(0) << __func__ << " starting messenger at " diff --git a/src/test/mon/test_mon_workloadgen.cc b/src/test/mon/test_mon_workloadgen.cc index b12272270b59a..73fbac06482e7 100644 --- a/src/test/mon/test_mon_workloadgen.cc +++ b/src/test/mon/test_mon_workloadgen.cc @@ -361,7 +361,7 @@ class OSDStub : public TestStub stringstream ss; ss << "client-osd" << whoami; messenger.reset(Messenger::create(cct, cct->_conf->ms_type, entity_name_t::OSD(whoami), - ss.str().c_str(), getpid())); + ss.str().c_str(), getpid(), 0)); Throttle throttler(g_ceph_context, "osd_client_bytes", g_conf->osd_client_message_size_cap); diff --git a/src/test/msgr/perf_msgr_client.cc b/src/test/msgr/perf_msgr_client.cc index d502cca405b0d..b567743df8479 100644 --- a/src/test/msgr/perf_msgr_client.cc +++ b/src/test/msgr/perf_msgr_client.cc @@ -130,7 +130,7 @@ class MessengerClient { addr.parse(serveraddr.c_str()); addr.set_nonce(0); for (int i = 0; i < jobs; ++i) { - Messenger *msgr = Messenger::create(g_ceph_context, type, entity_name_t::CLIENT(0), "client", getpid()+i); + Messenger *msgr = Messenger::create(g_ceph_context, type, entity_name_t::CLIENT(0), "client", getpid()+i, 0); msgr->set_default_policy(Messenger::Policy::lossless_client(0, 0)); entity_inst_t inst(entity_name_t::OSD(0), addr); ConnectionRef conn = msgr->get_connection(inst); diff --git a/src/test/msgr/perf_msgr_server.cc b/src/test/msgr/perf_msgr_server.cc index eefaf9a292362..381cd52812b4d 100644 --- a/src/test/msgr/perf_msgr_server.cc +++ b/src/test/msgr/perf_msgr_server.cc @@ -116,7 +116,7 @@ class MessengerServer { public: MessengerServer(string t, string addr, int threads, int delay): msgr(NULL), type(t), bindaddr(addr), dispatcher(threads, delay) { - msgr = Messenger::create(g_ceph_context, type, entity_name_t::OSD(0), "server", 0); + msgr = Messenger::create(g_ceph_context, type, entity_name_t::OSD(0), "server", 0, 0); msgr->set_default_policy(Messenger::Policy::stateless_server(0, 0)); } ~MessengerServer() { diff --git a/src/test/msgr/test_msgr.cc b/src/test/msgr/test_msgr.cc index b2ac1433f4a8b..e1e7c7933394c 100644 --- a/src/test/msgr/test_msgr.cc +++ b/src/test/msgr/test_msgr.cc @@ -65,8 +65,8 @@ class MessengerTest : public ::testing::TestWithParam { MessengerTest(): server_msgr(NULL), client_msgr(NULL) {} virtual void SetUp() { lderr(g_ceph_context) << __func__ << " start set up " << GetParam() << dendl; - server_msgr = Messenger::create(g_ceph_context, string(GetParam()), entity_name_t::OSD(0), "server", getpid()); - client_msgr = Messenger::create(g_ceph_context, string(GetParam()), entity_name_t::CLIENT(-1), "client", getpid()); + server_msgr = Messenger::create(g_ceph_context, string(GetParam()), entity_name_t::OSD(0), "server", getpid(), 0); + client_msgr = Messenger::create(g_ceph_context, string(GetParam()), entity_name_t::CLIENT(-1), "client", getpid(), 0); server_msgr->set_default_policy(Messenger::Policy::stateless_server(0, 0)); client_msgr->set_default_policy(Messenger::Policy::lossy_client(0, 0)); } @@ -952,7 +952,7 @@ class SyntheticWorkload { char addr[64]; for (int i = 0; i < servers; ++i) { msgr = Messenger::create(g_ceph_context, type, entity_name_t::OSD(0), - "server", getpid()+i); + "server", getpid()+i, 0); snprintf(addr, sizeof(addr), "127.0.0.1:%d", base_port+i); bind_addr.parse(addr); msgr->bind(bind_addr); @@ -966,7 +966,7 @@ class SyntheticWorkload { for (int i = 0; i < clients; ++i) { msgr = Messenger::create(g_ceph_context, type, entity_name_t::CLIENT(-1), - "client", getpid()+i+servers); + "client", getpid()+i+servers, 0); if (cli_policy.standby) { snprintf(addr, sizeof(addr), "127.0.0.1:%d", base_port+i+servers); bind_addr.parse(addr); @@ -1431,7 +1431,7 @@ class MarkdownDispatcher : public Dispatcher { // Markdown with external lock TEST_P(MessengerTest, MarkdownTest) { - Messenger *server_msgr2 = Messenger::create(g_ceph_context, string(GetParam()), entity_name_t::OSD(0), "server", getpid()); + Messenger *server_msgr2 = Messenger::create(g_ceph_context, string(GetParam()), entity_name_t::OSD(0), "server", getpid(), 0); MarkdownDispatcher cli_dispatcher(false), srv_dispatcher(true); entity_addr_t bind_addr; bind_addr.parse("127.0.0.1:16800"); diff --git a/src/test/osd/TestOSDScrub.cc b/src/test/osd/TestOSDScrub.cc index a5da6dc6c9866..fbd82a64165e0 100644 --- a/src/test/osd/TestOSDScrub.cc +++ b/src/test/osd/TestOSDScrub.cc @@ -57,8 +57,8 @@ TEST(TestOSDScrub, scrub_time_permit) { g_conf->osd_data, g_conf->osd_journal); Messenger *ms = Messenger::create(g_ceph_context, g_conf->ms_type, - entity_name_t::OSD(0), "make_checker", - getpid()); + entity_name_t::OSD(0), "make_checker", + getpid(), 0); ms->set_cluster_protocol(CEPH_OSD_PROTOCOL); ms->set_default_policy(Messenger::Policy::stateless_server(0, 0)); ms->bind(g_conf->public_addr); -- 2.39.5