]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
crimson/net: set policy from peer_type
authorYingxin Cheng <yingxincheng@gmail.com>
Mon, 1 Apr 2019 14:03:03 +0000 (22:03 +0800)
committerKefu Chai <kchai@redhat.com>
Fri, 5 Apr 2019 03:21:19 +0000 (11:21 +0800)
Signed-off-by: Yingxin Cheng <yingxincheng@gmail.com>
src/crimson/net/Messenger.h
src/crimson/net/ProtocolV2.cc
src/crimson/net/SocketMessenger.cc
src/crimson/net/SocketMessenger.h
src/test/crimson/perf_crimson_msgr.cc
src/test/crimson/test_alien_echo.cc
src/test/crimson/test_messenger.cc

index ed6037ca27356de8aaa687c78a92fc91cb5d1a35..fd423482e2b1c5b27182ae6b19a35bdecc0b4677 100644 (file)
@@ -111,6 +111,10 @@ class Messenger {
 
   virtual void print(ostream& out) const = 0;
 
+  virtual SocketPolicy get_policy(entity_type_t peer_type) const = 0;
+
+  virtual SocketPolicy get_default_policy() const = 0;
+
   virtual void set_default_policy(const SocketPolicy& p) = 0;
 
   virtual void set_policy(entity_type_t peer_type, const SocketPolicy& p) = 0;
index 115f0618006af92f75c0d26189221c64b57cbdb9..6ac9caf3cc7d6b3f648c1ac037d030a4be81ade5 100644 (file)
@@ -72,8 +72,7 @@ void ProtocolV2::start_connect(const entity_addr_t& _peer_addr,
   conn.peer_addr = _peer_addr;
   conn.target_addr = _peer_addr;
   conn.peer_type = _peer_type;
-  // TODO: lossless policy
-  conn.policy = SocketPolicy::lossy_client(0);
+  conn.policy = messenger.get_policy(_peer_type);
   messenger.register_conn(
     seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this()));
   execute_connecting();
@@ -1192,8 +1191,7 @@ void ProtocolV2::execute_accepting()
           ceph_assert(conn.get_peer_type() == -1);
           conn.peer_type = _peer_type;
 
-          // TODO: lossless policy
-          conn.policy = SocketPolicy::stateless_server(0);
+          conn.policy = messenger.get_policy(_peer_type);
           logger().debug("{} accept of host type {}, lossy={} server={} standby={} resetcheck={}",
                          conn, (int)_peer_type,
                          conn.policy.lossy, conn.policy.server,
index b10b99975158f4ab92dd746f7fd0ac5c5072b2be..8c1886fef68d823035f9f96a3580c2215078c876 100644 (file)
@@ -215,6 +215,16 @@ seastar::future<> SocketMessenger::learned_addr(const entity_addr_t &peer_addr_f
   return set_myaddrs(entity_addrvec_t{addr});
 }
 
+SocketPolicy SocketMessenger::get_policy(entity_type_t peer_type) const
+{
+  return policy_set.get(peer_type);
+}
+
+SocketPolicy SocketMessenger::get_default_policy() const
+{
+  return policy_set.get_default();
+}
+
 void SocketMessenger::set_default_policy(const SocketPolicy& p)
 {
   policy_set.set_default(p);
index 729729194a6ac0b6ff8c26827b8bd39160f00d7b..6903d26dca75c47bf0a62e9b787a0c50ca7d4439 100644 (file)
@@ -91,6 +91,10 @@ class SocketMessenger final : public Messenger, public seastar::peering_sharded_
         << ") " << get_myaddr();
   }
 
+  SocketPolicy get_policy(entity_type_t peer_type) const override;
+
+  SocketPolicy get_default_policy() const override;
+
   void set_default_policy(const SocketPolicy& p) override;
 
   void set_policy(entity_type_t peer_type, const SocketPolicy& p) override;
index ac161a1206deac19d7d42659f26aff1b3b17bc4c..0d3f15e55801b847190968d16567d2e58dbc0aa7 100644 (file)
@@ -85,6 +85,7 @@ static seastar::future<> run(unsigned rounds,
           return fut.then([&server, addr](ceph::net::Messenger *messenger) {
               return server.container().invoke_on_all([messenger](auto& server) {
                   server.msgr = messenger->get_local_shard();
+                  server.msgr->set_default_policy(ceph::net::SocketPolicy::stateless_server(0));
                   server.msgr->set_auth_client(&server.dummy_auth);
                   server.msgr->set_auth_server(&server.dummy_auth);
                 }).then([messenger, addr] {
@@ -185,6 +186,7 @@ static seastar::future<> run(unsigned rounds,
             return ceph::net::Messenger::create(entity_name_t::OSD(client.sid), client.lname, client.sid, client.sid)
             .then([&client] (ceph::net::Messenger *messenger) {
               client.msgr = messenger;
+              client.msgr->set_default_policy(ceph::net::SocketPolicy::lossy_client(0));
               client.msgr->set_auth_client(&client.dummy_auth);
               client.msgr->set_auth_server(&client.dummy_auth);
               return client.msgr->start(&client);
index bcec1a8455cf1722ca754c74969e290933f16202..8e3272cab81ca8936b2f05ab988e72a8554fcba4 100644 (file)
@@ -162,6 +162,7 @@ seastar_echo(const entity_addr_t addr, echo_role role, unsigned count)
           [addr, count](auto& server) mutable {
             std::cout << "server listening at " << addr << std::endl;
             // bind the server
+            server.msgr.set_default_policy(ceph::net::SocketPolicy::stateless_server(0));
             server.msgr.set_policy_throttler(entity_name_t::TYPE_OSD,
                                              &server.byte_throttler);
             server.msgr.set_auth_client(&server.dummy_auth);
@@ -186,6 +187,7 @@ seastar_echo(const entity_addr_t addr, echo_role role, unsigned count)
         return seastar::do_with(seastar_pingpong::Client{*msgr},
           [addr, count](auto& client) {
             std::cout << "client sending to " << addr << std::endl;
+            client.msgr.set_default_policy(ceph::net::SocketPolicy::lossy_client(0));
             client.msgr.set_policy_throttler(entity_name_t::TYPE_OSD,
                                              &client.byte_throttler);
             client.msgr.set_auth_client(&client.dummy_auth);
index 0601ad63da7a2eb600985cffcb03976988a29d6b..de80376b8630a58fd542275371729126af50dafe 100644 (file)
@@ -62,6 +62,7 @@ static seastar::future<> test_echo(unsigned rounds,
         return fut.then([this, addr](ceph::net::Messenger *messenger) {
             return container().invoke_on_all([messenger](auto& server) {
                 server.msgr = messenger->get_local_shard();
+                server.msgr->set_default_policy(ceph::net::SocketPolicy::stateless_server(0));
                 server.msgr->set_auth_client(&server.dummy_auth);
                 server.msgr->set_auth_server(&server.dummy_auth);
               }).then([messenger, addr] {
@@ -151,6 +152,7 @@ static seastar::future<> test_echo(unsigned rounds,
           .then([this](ceph::net::Messenger *messenger) {
             return container().invoke_on_all([messenger](auto& client) {
                 client.msgr = messenger->get_local_shard();
+                client.msgr->set_default_policy(ceph::net::SocketPolicy::lossy_client(0));
                 client.msgr->set_auth_client(&client.dummy_auth);
                 client.msgr->set_auth_server(&client.dummy_auth);
               }).then([this, messenger] {
@@ -332,6 +334,7 @@ static seastar::future<> test_concurrent_dispatch(bool v2)
           .then([this, addr](ceph::net::Messenger *messenger) {
             return container().invoke_on_all([messenger](auto& server) {
                 server.msgr = messenger->get_local_shard();
+                server.msgr->set_default_policy(ceph::net::SocketPolicy::stateless_server(0));
                 server.msgr->set_auth_client(&server.dummy_auth);
                 server.msgr->set_auth_server(&server.dummy_auth);
               }).then([messenger, addr] {
@@ -363,6 +366,7 @@ static seastar::future<> test_concurrent_dispatch(bool v2)
           .then([this](ceph::net::Messenger *messenger) {
             return container().invoke_on_all([messenger](auto& client) {
                 client.msgr = messenger->get_local_shard();
+                client.msgr->set_default_policy(ceph::net::SocketPolicy::lossy_client(0));
                 client.msgr->set_auth_client(&client.dummy_auth);
                 client.msgr->set_auth_server(&client.dummy_auth);
               }).then([this, messenger] {