]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
msg: make loopback Connection feature accurate all the time 11183/head
authorSage Weil <sage@redhat.com>
Mon, 10 Oct 2016 13:55:54 +0000 (09:55 -0400)
committerSage Weil <sage@redhat.com>
Mon, 10 Oct 2016 13:55:54 +0000 (09:55 -0400)
In 626360aab05545ddacb0ac28e54a70e31fd5695d we made the
OSD cluster loopback connection CEPH_FEATURES_ALL, but
all other loopback connections got features == 0.  I
can't come up with any reason we wouldn't want those
connections to have accurate feature bits, so let's just
use CEPH_FEATURES_ALL for all of them.

While we're here, make the cflags argument required.

Signed-off-by: Sage Weil <sage@redhat.com>
20 files changed:
src/ceph_mds.cc
src/ceph_mon.cc
src/ceph_osd.cc
src/mgr/DaemonServer.cc
src/msg/Messenger.cc
src/msg/Messenger.h
src/msg/async/AsyncMessenger.cc
src/msg/async/AsyncMessenger.h
src/msg/simple/SimpleMessenger.cc
src/msg/simple/SimpleMessenger.h
src/msg/xio/XioMessenger.cc
src/msg/xio/XioMessenger.h
src/test/messenger/simple_client.cc
src/test/messenger/simple_server.cc
src/test/mon/test-mon-msg.cc
src/test/mon/test_mon_workloadgen.cc
src/test/msgr/perf_msgr_client.cc
src/test/msgr/perf_msgr_server.cc
src/test/msgr/test_msgr.cc
src/test/osd/TestOSDScrub.cc

index 8daf4d7363256e765e994e03bfef32ded998b696..f22d8384df37249d59dfc569746a94fd832b81a5 100644 (file)
@@ -141,7 +141,7 @@ int main(int argc, const char **argv)
 
   Messenger *msgr = Messenger::create(g_ceph_context, g_conf->ms_type,
                                      entity_name_t::MDS(-1), "mds",
-                                     nonce, 0, Messenger::HAS_MANY_CONNECTIONS);
+                                     nonce, Messenger::HAS_MANY_CONNECTIONS);
   if (!msgr)
     exit(1);
   msgr->set_cluster_protocol(CEPH_MDS_PROTOCOL);
index de0447606c28113a9d0bc20ac5b85191c781a350..172c9e78ecdb8d80b0753be27bc98d23f599c3da 100644 (file)
@@ -642,7 +642,7 @@ int main(int argc, const char **argv)
   int rank = monmap.get_rank(g_conf->name.get_id());
   Messenger *msgr = Messenger::create(g_ceph_context, g_conf->ms_type,
                                      entity_name_t::MON(rank), "mon",
-                                     0, 0, Messenger::HAS_MANY_CONNECTIONS);
+                                     0, Messenger::HAS_MANY_CONNECTIONS);
   if (!msgr)
     exit(1);
   msgr->set_cluster_protocol(CEPH_MON_PROTOCOL);
index d19b2813fb9bca3e3a5d853a4ac26166763474bc..33cb705a0d6625d54ac81f7a5a9b3f5b79d94742 100644 (file)
@@ -430,26 +430,26 @@ int main(int argc, const char **argv)
 
   Messenger *ms_public = Messenger::create(g_ceph_context, g_conf->ms_type,
                                           entity_name_t::OSD(whoami), "client",
-                                          getpid(), 0,
+                                          getpid(),
                                           Messenger::HAS_HEAVY_TRAFFIC |
                                           Messenger::HAS_MANY_CONNECTIONS);
   Messenger *ms_cluster = Messenger::create(g_ceph_context, g_conf->ms_type,
                                            entity_name_t::OSD(whoami), "cluster",
-                                           getpid(), CEPH_FEATURES_ALL,
+                                           getpid(),
                                            Messenger::HAS_HEAVY_TRAFFIC |
                                            Messenger::HAS_MANY_CONNECTIONS);
   Messenger *ms_hbclient = Messenger::create(g_ceph_context, g_conf->ms_type,
                                             entity_name_t::OSD(whoami), "hbclient",
-                                            getpid(), 0, Messenger::HEARTBEAT);
+                                            getpid(), Messenger::HEARTBEAT);
   Messenger *ms_hb_back_server = Messenger::create(g_ceph_context, g_conf->ms_type,
                                                   entity_name_t::OSD(whoami), "hb_back_server",
-                                                  getpid(), 0, Messenger::HEARTBEAT);
+                                                  getpid(), Messenger::HEARTBEAT);
   Messenger *ms_hb_front_server = Messenger::create(g_ceph_context, g_conf->ms_type,
                                                    entity_name_t::OSD(whoami), "hb_front_server",
-                                                   getpid(), 0, Messenger::HEARTBEAT);
+                                                   getpid(), Messenger::HEARTBEAT);
   Messenger *ms_objecter = Messenger::create(g_ceph_context, g_conf->ms_type,
                                             entity_name_t::OSD(whoami), "ms_objecter",
-                                            getpid());
+                                            getpid(), 0);
   if (!ms_public || !ms_cluster || !ms_hbclient || !ms_hb_back_server || !ms_hb_front_server || !ms_objecter)
     exit(1);
   ms_cluster->set_cluster_protocol(CEPH_OSD_PROTOCOL);
index f0be92a1a9df4c176102c05c1cdabd6634c35a23..31f0ce999d396e31414a2b3329210a751e75bb8e 100644 (file)
@@ -46,7 +46,7 @@ int DaemonServer::init(uint64_t gid, entity_addr_t client_addr)
 {
   // Initialize Messenger
   msgr = Messenger::create(g_ceph_context, g_conf->ms_type,
-      entity_name_t::MGR(gid), "server", getpid());
+                          entity_name_t::MGR(gid), "server", getpid(), 0);
   int r = msgr->bind(g_conf->public_addr);
   if (r < 0)
     return r;
index a271b082b9ac788de677313ddb2f57c9338bbb6b..aa6f716bd9cb947a574f1535109e8e699b099078 100644 (file)
@@ -25,7 +25,7 @@ static Spinlock random_lock;
 
 Messenger *Messenger::create(CephContext *cct, const string &type,
                             entity_name_t name, string lname,
-                            uint64_t nonce, uint64_t features, uint64_t cflags)
+                            uint64_t nonce, uint64_t cflags)
 {
   int r = -1;
   if (type == "random") {
@@ -34,13 +34,13 @@ Messenger *Messenger::create(CephContext *cct, const string &type,
     r = dis(random_engine);
   }
   if (r == 0 || type == "simple")
-    return new SimpleMessenger(cct, name, lname, nonce, features);
+    return new SimpleMessenger(cct, name, lname, nonce);
   else if (r == 1 || type == "async")
-    return new AsyncMessenger(cct, name, lname, nonce, features);
+    return new AsyncMessenger(cct, name, lname, nonce);
 #ifdef HAVE_XIO
   else if ((type == "xio") &&
           cct->check_experimental_feature_enabled("ms-type-xio"))
-    return new XioMessenger(cct, name, lname, nonce, features, cflags);
+    return new XioMessenger(cct, name, lname, nonce, cflags);
 #endif
   lderr(cct) << "unrecognized ms_type '" << type << "'" << dendl;
   return nullptr;
index fa261df30c727981e0f5497e55e7860faeba42b3..bd391e07eac7d2882c108cf521904e823a3e516b 100644 (file)
@@ -166,8 +166,7 @@ public:
                            entity_name_t name,
                           string lname,
                            uint64_t nonce,
-                          uint64_t features = 0,
-                          uint64_t cflags = 0);
+                          uint64_t cflags);
 
   /**
    * create a new messenger
index 7104ddb0f09b27abb8e4cda7d7d459b314f4d8bf..19ee090a8ad79f656ad8e531bbcc262f45ca405d 100644 (file)
@@ -259,7 +259,7 @@ class C_handle_reap : public EventCallback {
  */
 
 AsyncMessenger::AsyncMessenger(CephContext *cct, entity_name_t name,
-                               string mname, uint64_t _nonce, uint64_t features)
+                               string mname, uint64_t _nonce)
   : SimplePolicyMessenger(cct, name,mname, _nonce),
     dispatch_queue(cct, this, mname),
     lock("AsyncMessenger::lock"),
@@ -274,7 +274,6 @@ AsyncMessenger::AsyncMessenger(CephContext *cct, entity_name_t name,
   stack->start();
   local_worker = stack->get_worker();
   local_connection = new AsyncConnection(cct, this, &dispatch_queue, local_worker);
-  local_features = features;
   init_local_connection();
   reap_handler = new C_handle_reap(this);
   unsigned processor_num = 1;
index ee3e0285e83b6ab61c244b7296992edef07372f0..5e9877f7bc78c094ba1d7b9a5ce66197e9f39e51 100644 (file)
@@ -84,7 +84,7 @@ public:
    * be a value that will be repeated if the daemon restarts.
    */
   AsyncMessenger(CephContext *cct, entity_name_t name,
-                 string mname, uint64_t _nonce, uint64_t features);
+                 string mname, uint64_t _nonce);
 
   /**
    * Destroy the AsyncMessenger. Pretty simple since all the work is done
@@ -306,7 +306,7 @@ private:
     assert(lock.is_locked());
     local_connection->peer_addr = my_inst.addr;
     local_connection->peer_type = my_inst.name.type();
-    local_connection->set_features(local_features);
+    local_connection->set_features(CEPH_FEATURES_ALL);
     ms_deliver_handle_fast_connect(local_connection.get());
   }
 
@@ -316,7 +316,6 @@ public:
 
   /// con used for sending messages to ourselves
   ConnectionRef local_connection;
-  uint64_t local_features;
 
   /**
    * @defgroup AsyncMessenger internals
index 4ef2fe1392335eeac4ff806f596aed0e3b7f8348..d89c9d9ef557a4cd4972b0e2bc52c965a10d5fb4 100644 (file)
@@ -39,7 +39,7 @@ static ostream& _prefix(std::ostream *_dout, SimpleMessenger *msgr) {
  */
 
 SimpleMessenger::SimpleMessenger(CephContext *cct, entity_name_t name,
-                                string mname, uint64_t _nonce, uint64_t features)
+                                string mname, uint64_t _nonce)
   : SimplePolicyMessenger(cct, name,mname, _nonce),
     accepter(this, _nonce),
     dispatch_queue(cct, this, mname),
@@ -55,7 +55,6 @@ SimpleMessenger::SimpleMessenger(CephContext *cct, entity_name_t name,
   ANNOTATE_BENIGN_RACE_SIZED(&timeout, sizeof(timeout),
                              "SimpleMessenger read timeout");
   ceph_spin_init(&global_seq_lock);
-  local_features = features;
   init_local_connection();
 }
 
@@ -718,6 +717,6 @@ void SimpleMessenger::init_local_connection()
 {
   local_connection->peer_addr = my_inst.addr;
   local_connection->peer_type = my_inst.name.type();
-  local_connection->set_features(local_features);
+  local_connection->set_features(CEPH_FEATURES_ALL);
   ms_deliver_handle_fast_connect(local_connection.get());
 }
index 2f4685bedc404451e839145cb9654825a7cc49d6..ac0c8293ec632d68945bf9fb7651463c2fbe941c 100644 (file)
@@ -82,7 +82,7 @@ public:
    * features The local features bits for the local_connection
    */
   SimpleMessenger(CephContext *cct, entity_name_t name,
-                 string mname, uint64_t _nonce, uint64_t features);
+                 string mname, uint64_t _nonce);
 
   /**
    * Destroy the SimpleMessenger. Pretty simple since all the work is done
@@ -332,7 +332,6 @@ public:
 
   /// con used for sending messages to ourselves
   ConnectionRef local_connection;
-  uint64_t local_features;
 
   /**
    * @defgroup SimpleMessenger internals
index bce31d3f9d55b021940a4b83454202edaf97163e..52d22fb83a299c172f8e0d2c82cc5fcd14db5586 100644 (file)
@@ -350,7 +350,7 @@ static ostream& _prefix(std::ostream *_dout, XioMessenger *msgr) {
 }
 
 XioMessenger::XioMessenger(CephContext *cct, entity_name_t name,
-                          string mname, uint64_t _nonce, uint64_t features,
+                          string mname, uint64_t _nonce,
                           uint64_t cflags, DispatchStrategy *ds)
   : SimplePolicyMessenger(cct, name, mname, _nonce),
     XioInit(cct),
@@ -378,7 +378,6 @@ XioMessenger::XioMessenger(CephContext *cct, entity_name_t name,
   /* update class instance count */
   nInstances.inc();
 
-  local_features = features;
   loop_con->set_features(features);
 
   ldout(cct,2) << "Create msgr: " << this << " instance: "
@@ -775,9 +774,7 @@ static inline XioMsg* pool_alloc_xio_msg(Message *m, XioConnection *xcon,
     return NULL;
   XioMsg *xmsg = reinterpret_cast<XioMsg*>(mp_mem.addr);
   assert(!!xmsg);
-  new (xmsg) XioMsg(m, xcon, mp_mem, ex_cnt,
-                   static_cast<XioMessenger*>(
-                     xcon->get_messenger())->local_features);
+  new (xmsg) XioMsg(m, xcon, mp_mem, ex_cnt, CEPH_FEATURES_ALL);
   return xmsg;
 }
 
index 8a1bbc76fd8f45d617f4bffcc67ef62345be997e..0bfdd6e78017b417a03c964628260f5e3b53476d 100644 (file)
@@ -63,7 +63,7 @@ private:
 
 public:
   XioMessenger(CephContext *cct, entity_name_t name,
-              string mname, uint64_t nonce, uint64_t features,
+              string mname, uint64_t nonce,
               uint64_t cflags = 0,
               DispatchStrategy* ds = new QueueStrategy(1));
 
@@ -156,9 +156,6 @@ private:
 protected:
   virtual void ready()
     { }
-
-public:
-  uint64_t local_features;
 };
 
 XioCommand* pool_alloc_xio_command(XioConnection *xcon);
index a63cb2804bc65a2a78cd77070c97e960e9493fd7..b1a33bc64119483811a46f2d10dd25ffa50ca79b 100644 (file)
@@ -105,7 +105,7 @@ int main(int argc, const char **argv)
        messenger = Messenger::create(g_ceph_context, g_conf->ms_type,
                                      entity_name_t::MON(-1),
                                      "client",
-                                     getpid());
+                                     getpid(), 0);
 
        // enable timing prints
        messenger->set_magic(MSG_MAGIC_TRACE_CTR);
index 674ef3e948d6ee61e1399e92ed4961a2df1f0fdc..57115ea7fd9134241872328c36635cef8c18358f 100644 (file)
@@ -76,7 +76,8 @@ int main(int argc, const char **argv)
        messenger = Messenger::create(g_ceph_context, g_conf->ms_type,
                                      entity_name_t::MON(-1),
                                      "simple_server",
-                                     0 /* nonce */);
+                                     0 /* nonce */,
+                                     0 /* flags */);
        // enable timing prints
        messenger->set_magic(MSG_MAGIC_TRACE_CTR);
        messenger->set_default_policy(
index 58e8b7d133f23bb9b6561f44164513567e5d0952..ee50c02626f23be16eb27c597377f9c9454c4163 100644 (file)
@@ -79,7 +79,7 @@ public:
     dout(1) << __func__ << dendl;
 
     msg = Messenger::create(cct, cct->_conf->ms_type, entity_name_t::CLIENT(-1),
-                            "test-mon-msg", 0);
+                            "test-mon-msg", 0, 0);
     assert(msg != NULL);
     msg->set_default_policy(Messenger::Policy::lossy_client(0,0));
     dout(0) << __func__ << " starting messenger at "
index b12272270b59a23b4d4cc8908ac0cdd38a110559..73fbac06482e7cec916ebf2aba9003ef1579689e 100644 (file)
@@ -361,7 +361,7 @@ class OSDStub : public TestStub
     stringstream ss;
     ss << "client-osd" << whoami;
     messenger.reset(Messenger::create(cct, cct->_conf->ms_type, entity_name_t::OSD(whoami),
-                                     ss.str().c_str(), getpid()));
+                                     ss.str().c_str(), getpid(), 0));
 
     Throttle throttler(g_ceph_context, "osd_client_bytes",
        g_conf->osd_client_message_size_cap);
index d502cca405b0daedcf81e7ddcd5de51c18bd0722..b567743df847984d4b82590f7332b9894a43c0d8 100644 (file)
@@ -130,7 +130,7 @@ class MessengerClient {
     addr.parse(serveraddr.c_str());
     addr.set_nonce(0);
     for (int i = 0; i < jobs; ++i) {
-      Messenger *msgr = Messenger::create(g_ceph_context, type, entity_name_t::CLIENT(0), "client", getpid()+i);
+      Messenger *msgr = Messenger::create(g_ceph_context, type, entity_name_t::CLIENT(0), "client", getpid()+i, 0);
       msgr->set_default_policy(Messenger::Policy::lossless_client(0, 0));
       entity_inst_t inst(entity_name_t::OSD(0), addr);
       ConnectionRef conn = msgr->get_connection(inst);
index eefaf9a29236273040f5861f936d27d49feccf61..381cd52812b4dde6c8672f65e99e02ffec21f3fe 100644 (file)
@@ -116,7 +116,7 @@ class MessengerServer {
  public:
   MessengerServer(string t, string addr, int threads, int delay):
       msgr(NULL), type(t), bindaddr(addr), dispatcher(threads, delay) {
-    msgr = Messenger::create(g_ceph_context, type, entity_name_t::OSD(0), "server", 0);
+    msgr = Messenger::create(g_ceph_context, type, entity_name_t::OSD(0), "server", 0, 0);
     msgr->set_default_policy(Messenger::Policy::stateless_server(0, 0));
   }
   ~MessengerServer() {
index b2ac1433f4a8b91623e15ed0ec22cb606f76dfaf..e1e7c7933394caa12047bf3f207027141136dc27 100644 (file)
@@ -65,8 +65,8 @@ class MessengerTest : public ::testing::TestWithParam<const char*> {
   MessengerTest(): server_msgr(NULL), client_msgr(NULL) {}
   virtual void SetUp() {
     lderr(g_ceph_context) << __func__ << " start set up " << GetParam() << dendl;
-    server_msgr = Messenger::create(g_ceph_context, string(GetParam()), entity_name_t::OSD(0), "server", getpid());
-    client_msgr = Messenger::create(g_ceph_context, string(GetParam()), entity_name_t::CLIENT(-1), "client", getpid());
+    server_msgr = Messenger::create(g_ceph_context, string(GetParam()), entity_name_t::OSD(0), "server", getpid(), 0);
+    client_msgr = Messenger::create(g_ceph_context, string(GetParam()), entity_name_t::CLIENT(-1), "client", getpid(), 0);
     server_msgr->set_default_policy(Messenger::Policy::stateless_server(0, 0));
     client_msgr->set_default_policy(Messenger::Policy::lossy_client(0, 0));
   }
@@ -952,7 +952,7 @@ class SyntheticWorkload {
     char addr[64];
     for (int i = 0; i < servers; ++i) {
       msgr = Messenger::create(g_ceph_context, type, entity_name_t::OSD(0),
-                               "server", getpid()+i);
+                               "server", getpid()+i, 0);
       snprintf(addr, sizeof(addr), "127.0.0.1:%d", base_port+i);
       bind_addr.parse(addr);
       msgr->bind(bind_addr);
@@ -966,7 +966,7 @@ class SyntheticWorkload {
 
     for (int i = 0; i < clients; ++i) {
       msgr = Messenger::create(g_ceph_context, type, entity_name_t::CLIENT(-1),
-                               "client", getpid()+i+servers);
+                               "client", getpid()+i+servers, 0);
       if (cli_policy.standby) {
         snprintf(addr, sizeof(addr), "127.0.0.1:%d", base_port+i+servers);
         bind_addr.parse(addr);
@@ -1431,7 +1431,7 @@ class MarkdownDispatcher : public Dispatcher {
 
 // Markdown with external lock
 TEST_P(MessengerTest, MarkdownTest) {
-  Messenger *server_msgr2 = Messenger::create(g_ceph_context, string(GetParam()), entity_name_t::OSD(0), "server", getpid());
+  Messenger *server_msgr2 = Messenger::create(g_ceph_context, string(GetParam()), entity_name_t::OSD(0), "server", getpid(), 0);
   MarkdownDispatcher cli_dispatcher(false), srv_dispatcher(true);
   entity_addr_t bind_addr;
   bind_addr.parse("127.0.0.1:16800");
index a5da6dc6c98663d67bb869d153479a8233921b66..fbd82a64165e0275da57fd113209aa7dcf654d71 100644 (file)
@@ -57,8 +57,8 @@ TEST(TestOSDScrub, scrub_time_permit) {
              g_conf->osd_data,
              g_conf->osd_journal);
   Messenger *ms = Messenger::create(g_ceph_context, g_conf->ms_type,
-             entity_name_t::OSD(0), "make_checker",
-             getpid());
+                                   entity_name_t::OSD(0), "make_checker",
+                                   getpid(), 0);
   ms->set_cluster_protocol(CEPH_OSD_PROTOCOL);
   ms->set_default_policy(Messenger::Policy::stateless_server(0, 0));
   ms->bind(g_conf->public_addr);