git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
crimson/net: connect() return non-futurized ConnectionRef
author Yingxin Cheng <yingxin.cheng@intel.com>
Wed, 5 Feb 2020 04:55:11 +0000 (12:55 +0800)
committer Yingxin Cheng <yingxin.cheng@intel.com>
Wed, 12 Feb 2020 02:47:47 +0000 (10:47 +0800)
Signed-off-by: Yingxin Cheng <yingxin.cheng@intel.com>
12 files changed:
src/crimson/mgr/client.cc
src/crimson/mon/MonClient.cc
src/crimson/net/Messenger.h
src/crimson/net/SocketMessenger.cc
src/crimson/net/SocketMessenger.h
src/crimson/osd/heartbeat.cc
src/crimson/osd/heartbeat.h
src/crimson/osd/osd.cc
src/crimson/osd/shard_services.cc
src/test/crimson/test_alien_echo.cc
src/test/crimson/test_messenger.cc
src/tools/crimson/perf_crimson_msgr.cc

index 5243338c27790a8865274d0436129c8222577923..638525cd75804fe027009643afe78f1daa7ff9b0 100644 (file)
@@ -72,14 +72,11 @@ seastar::future<> Client::reconnect()
       return seastar::now();
     }
     auto peer = mgrmap.get_active_addrs().front();
-    return msgr.connect(peer, CEPH_ENTITY_TYPE_MGR).then(
-      [this](auto _conn) {
-        conn = _conn;
-        // ask for the mgrconfigure message
-        auto m = ceph::make_message<MMgrOpen>();
-        m->daemon_name = local_conf()->name.get_id();
-        return conn->send(std::move(m));
-      });
+    conn = msgr.connect(peer, CEPH_ENTITY_TYPE_MGR);
+    // ask for the mgrconfigure message
+    auto m = ceph::make_message<MMgrOpen>();
+    m->daemon_name = local_conf()->name.get_id();
+    return conn->send(std::move(m));
   });
 }
 
index 4c1790ce0c97264c1821b6851e70d197d1be65d2..8a2a116544c9f2155646be122050eab248a00044 100644 (file)
@@ -942,8 +942,9 @@ seastar::future<> Client::reopen_session(int rank)
 #warning fixme
     auto peer = monmap.get_addrs(rank).front();
     logger().info("connecting to mon.{}", rank);
-    return msgr.connect(peer, CEPH_ENTITY_TYPE_MON).then(
-      [this] (auto conn) -> seastar::future<Connection::AuthResult> {
+    return seastar::futurize_apply(
+        [peer, this] () -> seastar::future<Connection::AuthResult> {
+      auto conn = msgr.connect(peer, CEPH_ENTITY_TYPE_MON);
       auto& mc = pending_conns.emplace_back(
        std::make_unique<Connection>(auth_registry, conn, &keyring));
       if (conn->get_peer_addr().is_msgr2()) {
index 9f6fcf3658b3b5a1e60bd351c892dcdc46a14e64..0ad93ea35c89fca8dde43654b0440510b577dd2b 100644 (file)
@@ -76,7 +76,7 @@ public:
 
   /// either return an existing connection to the peer,
   /// or a new pending connection
-  virtual seastar::future<ConnectionRef>
+  virtual ConnectionRef
   connect(const entity_addr_t& peer_addr,
           const entity_type_t& peer_type) = 0;
 
index 75ba6bc70673996037c5f592d4abed4300654799..807511712f017617b23e265754688594a9bea6ff 100644 (file)
@@ -131,7 +131,7 @@ seastar::future<> SocketMessenger::start(Dispatcher *disp) {
   return seastar::now();
 }
 
-seastar::future<crimson::net::ConnectionRef>
+crimson::net::ConnectionRef
 SocketMessenger::connect(const entity_addr_t& peer_addr, const entity_type_t& peer_type)
 {
   assert(seastar::engine().cpu_id() == master_sid);
@@ -141,12 +141,12 @@ SocketMessenger::connect(const entity_addr_t& peer_addr, const entity_type_t& pe
   ceph_assert(peer_addr.get_port() > 0);
 
   if (auto found = lookup_conn(peer_addr); found) {
-    return seastar::make_ready_future<ConnectionRef>(found->shared_from_this());
+    return found->shared_from_this();
   }
   SocketConnectionRef conn = seastar::make_shared<SocketConnection>(
       *this, *dispatcher, peer_addr.is_msgr2());
   conn->start_connect(peer_addr, peer_type);
-  return seastar::make_ready_future<ConnectionRef>(conn->shared_from_this());
+  return conn->shared_from_this();
 }
 
 seastar::future<> SocketMessenger::shutdown()
index cf5fa7d1de20ea86f46261d537ae5dd53d6ee82e..d1b86e16a2abada7b1640e19b979922313f9bca2 100644 (file)
@@ -65,8 +65,8 @@ class SocketMessenger final : public Messenger {
 
   seastar::future<> start(Dispatcher *dispatcher) override;
 
-  seastar::future<ConnectionRef> connect(const entity_addr_t& peer_addr,
-                                         const entity_type_t& peer_type) override;
+  ConnectionRef connect(const entity_addr_t& peer_addr,
+                        const entity_type_t& peer_type) override;
   // can only wait once
   seastar::future<> wait() override {
     assert(seastar::engine().cpu_id() == master_sid);
index 92079f3c8a565aff22fb6b19edef8a2c5f4556ed..3341c0fe0397e7ee30f2515b55e6ef4287389d94 100644 (file)
@@ -93,7 +93,7 @@ void Heartbeat::set_require_authorizer(bool require_authorizer)
   }
 }
 
-seastar::future<> Heartbeat::add_peer(osd_id_t peer, epoch_t epoch)
+void Heartbeat::add_peer(osd_id_t peer, epoch_t epoch)
 {
   auto [peer_info, added] = peers.try_emplace(peer);
   auto& info = peer_info->second;
@@ -102,17 +102,10 @@ seastar::future<> Heartbeat::add_peer(osd_id_t peer, epoch_t epoch)
     logger().info("add_peer({})", peer);
     auto osdmap = service.get_osdmap_service().get_map();
     // TODO: use addrs
-    return seastar::when_all_succeed(
-        front_msgr->connect(osdmap->get_hb_front_addrs(peer).front(),
-                            CEPH_ENTITY_TYPE_OSD),
-        back_msgr->connect(osdmap->get_hb_back_addrs(peer).front(),
-                           CEPH_ENTITY_TYPE_OSD))
-      .then([&info=peer_info->second] (auto con_front, auto con_back) {
-        info.con_front = con_front;
-        info.con_back = con_back;
-      });
-  } else {
-    return seastar::now();
+    peer_info->second.con_front = front_msgr->connect(
+        osdmap->get_hb_front_addrs(peer).front(), CEPH_ENTITY_TYPE_OSD);
+    peer_info->second.con_back = back_msgr->connect(
+        osdmap->get_hb_back_addrs(peer).front(), CEPH_ENTITY_TYPE_OSD);
   }
 }
 
@@ -143,7 +136,7 @@ seastar::future<Heartbeat::osds_t> Heartbeat::remove_down_peers()
     });
 }
 
-seastar::future<> Heartbeat::add_reporter_peers(int whoami)
+void Heartbeat::add_reporter_peers(int whoami)
 {
   auto osdmap = service.get_osdmap_service().get_map();
   // include next and previous up osds to ensure we have a fully-connected set
@@ -160,20 +153,18 @@ seastar::future<> Heartbeat::add_reporter_peers(int whoami)
   auto subtree = local_conf().get_val<string>("mon_osd_reporter_subtree_level");
   osdmap->get_random_up_osds_by_subtree(
     whoami, subtree, min_down, want, &want);
-  return seastar::parallel_for_each(
-    std::move(want),
-    [epoch=osdmap->get_epoch(), this](int osd) {
-      return add_peer(osd, epoch);
-  });
+  auto epoch = osdmap->get_epoch();
+  for (int osd : want) {
+    add_peer(osd, epoch);
+  }
 }
 
 seastar::future<> Heartbeat::update_peers(int whoami)
 {
   const auto min_peers = static_cast<size_t>(
     local_conf().get_val<int64_t>("osd_heartbeat_min_peers"));
-  return add_reporter_peers(whoami).then([this] {
-    return remove_down_peers();
-  }).then([=](osds_t&& extra) {
+  add_reporter_peers(whoami);
+  return remove_down_peers().then([=](osds_t&& extra) {
     // too many?
     struct iteration_state {
       osds_t::const_iterator where;
@@ -197,11 +188,10 @@ seastar::future<> Heartbeat::update_peers(int whoami)
       next = osdmap->get_next_up_osd_after(next)) {
       want.push_back(next);
     }
-    return seastar::parallel_for_each(
-      std::move(want),
-      [epoch=osdmap->get_epoch(), this](int osd) {
-        return add_peer(osd, epoch);
-    });
+    auto epoch = osdmap->get_epoch();
+    for (int osd : want) {
+      add_peer(osd, epoch);
+    }
   });
 }
 
@@ -242,7 +232,7 @@ seastar::future<> Heartbeat::ms_handle_reset(crimson::net::ConnectionRef conn)
   const auto peer = found->first;
   const auto epoch = found->second.epoch;
   return remove_peer(peer).then([peer, epoch, this] {
-    return add_peer(peer, epoch);
+    add_peer(peer, epoch);
   });
 }
 
index b8ff3bcccc681a794590cb0ba4c4a26f487a7b39..036299f38990204586ed9ed2934b32990352fa98 100644 (file)
@@ -34,7 +34,7 @@ public:
                          entity_addrvec_t back);
   seastar::future<> stop();
 
-  seastar::future<> add_peer(osd_id_t peer, epoch_t epoch);
+  void add_peer(osd_id_t peer, epoch_t epoch);
   seastar::future<> update_peers(int whoami);
   seastar::future<> remove_peer(osd_id_t peer);
 
@@ -67,7 +67,7 @@ private:
   /// @return peers not needed in this epoch
   seastar::future<osds_t> remove_down_peers();
   /// add enough reporters for fast failure detection
-  seastar::future<> add_reporter_peers(int whoami);
+  void add_reporter_peers(int whoami);
 
   seastar::future<> start_messenger(crimson::net::Messenger& msgr,
                                    const entity_addrvec_t& addrs);
index 59fa5fbcaf4baf3f2709a92d3e1cb44f53bc3fd6..b226ab90256cfa8a35ebf7fe040c4378910596a2 100644 (file)
@@ -976,25 +976,20 @@ seastar::future<> OSD::update_heartbeat_peers()
   if (!state.is_active()) {
     return seastar::now();
   }
-  return seastar::parallel_for_each(
-    pg_map.get_pgs(),
-    [this](auto& pg) {
-      vector<int> up, acting;
-      osdmap->pg_to_up_acting_osds(pg.first.pgid,
-                                  &up, nullptr,
-                                  &acting, nullptr);
-      return seastar::parallel_for_each(
-        boost::join(up, acting),
-        [this](int osd) {
-          if (osd == CRUSH_ITEM_NONE || osd == whoami) {
-            return seastar::now();
-          } else {
-            return heartbeat->add_peer(osd, osdmap->get_epoch());
-          }
-        });
-    }).then([this] {
-      return heartbeat->update_peers(whoami);
-    });
+  for (auto& pg : pg_map.get_pgs()) {
+    vector<int> up, acting;
+    osdmap->pg_to_up_acting_osds(pg.first.pgid,
+                                 &up, nullptr,
+                                 &acting, nullptr);
+    for (int osd : boost::join(up, acting)) {
+      if (osd == CRUSH_ITEM_NONE || osd == whoami) {
+        continue;
+      } else {
+        heartbeat->add_peer(osd, osdmap->get_epoch());
+      }
+    }
+  }
+  return heartbeat->update_peers(whoami);
 }
 
 seastar::future<> OSD::handle_peering_op(
index 784fca6fa5dc6e0393a0e1531eb0249f5dce93ac..3804280b712ab0d7214171c748a0778e0a570416 100644 (file)
@@ -57,11 +57,9 @@ seastar::future<> ShardServices::send_to_osd(
                    osdmap->get_info(peer).up_from, from_epoch);
     return seastar::now();
   } else {
-    return cluster_msgr.connect(osdmap->get_cluster_addrs(peer).front(),
-      CEPH_ENTITY_TYPE_OSD)
-      .then([m, this] (auto conn) {
-             return conn->send(m);
-           });
+    auto conn = cluster_msgr.connect(
+        osdmap->get_cluster_addrs(peer).front(), CEPH_ENTITY_TYPE_OSD);
+    return conn->send(m);
   }
 }
 
index 1a8b2039c56ab806b5d67006f8ea8d580b8e4f28..06f12f6cc39286b4c9bb3a329b98867b01cea0b8 100644 (file)
@@ -187,9 +187,9 @@ seastar_echo(const entity_addr_t addr, echo_role role, unsigned count)
       client.msgr->set_require_authorizer(false);
       client.msgr->set_auth_client(&client.dummy_auth);
       client.msgr->set_auth_server(&client.dummy_auth);
-      return client.msgr->start(&client.dispatcher).then([addr, &client] {
-        return client.msgr->connect(addr, entity_name_t::TYPE_OSD);
-      }).then([&disp=client.dispatcher, count](crimson::net::ConnectionRef conn) {
+      return client.msgr->start(&client.dispatcher).then(
+          [addr, &client, &disp=client.dispatcher, count] {
+        auto conn = client.msgr->connect(addr, entity_name_t::TYPE_OSD);
         return seastar::do_until(
           [&disp,count] { return disp.count >= count; },
           [&disp,conn] {
index ebb3910783138af4df4c188590ee93012ce51f08..68d6da744dd0684f9fa3e10ddb13acfe717268a2 100644 (file)
@@ -146,17 +146,15 @@ static seastar::future<> test_echo(unsigned rounds,
 
       seastar::future<> dispatch_pingpong(const entity_addr_t& peer_addr) {
         mono_time start_time = mono_clock::now();
-        return msgr->connect(peer_addr, entity_name_t::TYPE_OSD
-        ).then([this, start_time](auto conn) {
-          return seastar::futurize_apply([this, conn] {
-            return do_dispatch_pingpong(conn.get());
-          }).finally([this, conn, start_time] {
-            auto session = find_session(conn.get());
-            std::chrono::duration<double> dur_handshake = session->connected_time - start_time;
-            std::chrono::duration<double> dur_pingpong = session->finish_time - session->connected_time;
-            logger().info("{}: handshake {}, pingpong {}",
-                          *conn, dur_handshake.count(), dur_pingpong.count());
-          });
+        auto conn = msgr->connect(peer_addr, entity_name_t::TYPE_OSD);
+        return seastar::futurize_apply([this, conn] {
+          return do_dispatch_pingpong(conn.get());
+        }).finally([this, conn, start_time] {
+          auto session = find_session(conn.get());
+          std::chrono::duration<double> dur_handshake = session->connected_time - start_time;
+          std::chrono::duration<double> dur_pingpong = session->finish_time - session->connected_time;
+          logger().info("{}: handshake {}, pingpong {}",
+                        *conn, dur_handshake.count(), dur_pingpong.count());
         });
       }
 
@@ -326,9 +324,8 @@ static seastar::future<> test_concurrent_dispatch(bool v2)
       server->init(entity_name_t::OSD(4), "server3", 5, addr),
       client->init(entity_name_t::OSD(5), "client3", 6)
   ).then([server, client] {
-    return client->msgr->connect(server->msgr->get_myaddr(),
-                                 entity_name_t::TYPE_OSD);
-  }).then([](crimson::net::ConnectionRef conn) {
+    auto conn = client->msgr->connect(server->msgr->get_myaddr(),
+                                      entity_name_t::TYPE_OSD);
     // send two messages
     return conn->send(make_message<MPing>()).then([conn] {
       return conn->send(make_message<MPing>());
@@ -402,20 +399,18 @@ seastar::future<> test_preemptive_shutdown(bool v2) {
         msgr->set_auth_server(&dummy_auth);
         return msgr->start(this);
       }
-      seastar::future<> send_pings(const entity_addr_t& addr) {
-        return msgr->connect(addr, entity_name_t::TYPE_OSD
-        ).then([this](crimson::net::ConnectionRef conn) {
-          // forwarded to stopped_send_promise
-          (void) seastar::do_until(
-            [this] { return stop_send; },
-            [this, conn] {
-              return conn->send(make_message<MPing>()).then([] {
-                return seastar::sleep(0ms);
-              });
-            }
-          ).then_wrapped([this, conn] (auto fut) {
-            fut.forward_to(std::move(stopped_send_promise));
-          });
+      void send_pings(const entity_addr_t& addr) {
+        auto conn = msgr->connect(addr, entity_name_t::TYPE_OSD);
+        // forwarded to stopped_send_promise
+        (void) seastar::do_until(
+          [this] { return stop_send; },
+          [this, conn] {
+            return conn->send(make_message<MPing>()).then([] {
+              return seastar::sleep(0ms);
+            });
+          }
+        ).then_wrapped([this, conn] (auto fut) {
+          fut.forward_to(std::move(stopped_send_promise));
         });
       }
       seastar::future<> shutdown() {
@@ -442,8 +437,7 @@ seastar::future<> test_preemptive_shutdown(bool v2) {
     server->init(entity_name_t::OSD(6), "server4", 7, addr),
     client->init(entity_name_t::OSD(7), "client4", 8)
   ).then([server, client] {
-    return client->send_pings(server->get_addr());
-  }).then([] {
+    client->send_pings(server->get_addr());
     return seastar::sleep(100ms);
   }).then([client] {
     logger().info("client shutdown...");
@@ -1079,28 +1073,26 @@ class FailoverSuite : public Dispatcher {
  public:
   seastar::future<> connect_peer() {
     logger().info("[Test] connect_peer({})", test_peer_addr);
-    return test_msgr->connect(test_peer_addr, entity_name_t::TYPE_OSD
-    ).then([this] (auto conn) {
-      auto result = interceptor.find_result(conn);
-      ceph_assert(result != nullptr);
-
-      if (tracked_conn) {
-        if (tracked_conn->is_closed()) {
-          ceph_assert(tracked_conn != conn);
-          logger().info("[Test] this is a new session replacing an closed one");
-        } else {
-          ceph_assert(tracked_index == result->index);
-          ceph_assert(tracked_conn == conn);
-          logger().info("[Test] this is not a new session");
-        }
+    auto conn = test_msgr->connect(test_peer_addr, entity_name_t::TYPE_OSD);
+    auto result = interceptor.find_result(conn);
+    ceph_assert(result != nullptr);
+
+    if (tracked_conn) {
+      if (tracked_conn->is_closed()) {
+        ceph_assert(tracked_conn != conn);
+        logger().info("[Test] this is a new session replacing a closed one");
       } else {
-        logger().info("[Test] this is a new session");
+        ceph_assert(tracked_index == result->index);
+        ceph_assert(tracked_conn == conn);
+        logger().info("[Test] this is not a new session");
       }
-      tracked_index = result->index;
-      tracked_conn = conn;
+    } else {
+      logger().info("[Test] this is a new session");
+    }
+    tracked_index = result->index;
+    tracked_conn = conn;
 
-      return flush_pending_send();
-    });
+    return flush_pending_send();
   }
 
   seastar::future<> send_peer() {
@@ -1247,9 +1239,7 @@ class FailoverTest : public Dispatcher {
     cmd_msgr->set_auth_server(&dummy_auth);
     return cmd_msgr->start(this).then([this, cmd_peer_addr] {
       logger().info("CmdCli connect to CmdSrv({}) ...", cmd_peer_addr);
-      return cmd_msgr->connect(cmd_peer_addr, entity_name_t::TYPE_OSD);
-    }).then([this] (auto conn) {
-      cmd_conn = conn;
+      cmd_conn = cmd_msgr->connect(cmd_peer_addr, entity_name_t::TYPE_OSD);
       return pingpong();
     });
   }
@@ -1441,24 +1431,21 @@ class FailoverSuitePeer : public Dispatcher {
 
   seastar::future<> connect_peer(entity_addr_t addr) {
     logger().info("[TestPeer] connect_peer({})", addr);
-    return peer_msgr->connect(addr, entity_name_t::TYPE_OSD
-    ).then([this] (auto conn) {
-      auto new_tracked_conn = conn;
-      if (tracked_conn) {
-        if (tracked_conn->is_closed()) {
-          ceph_assert(tracked_conn != new_tracked_conn);
-          logger().info("[TestPeer] this is a new session"
-                        " replacing an closed one");
-        } else {
-          ceph_assert(tracked_conn == new_tracked_conn);
-          logger().info("[TestPeer] this is not a new session");
-        }
+    auto new_tracked_conn = peer_msgr->connect(addr, entity_name_t::TYPE_OSD);
+    if (tracked_conn) {
+      if (tracked_conn->is_closed()) {
+        ceph_assert(tracked_conn != new_tracked_conn);
+        logger().info("[TestPeer] this is a new session"
+                      " replacing a closed one");
       } else {
-        logger().info("[TestPeer] this is a new session");
+        ceph_assert(tracked_conn == new_tracked_conn);
+        logger().info("[TestPeer] this is not a new session");
       }
-      tracked_conn = new_tracked_conn;
-      return flush_pending_send();
-    });
+    } else {
+      logger().info("[TestPeer] this is a new session");
+    }
+    tracked_conn = new_tracked_conn;
+    return flush_pending_send();
   }
 
   seastar::future<> send_peer() {
index 0c24f3b24c8bb6f2c2ad66f92f4ee60f7e62de7d..a47fa874412464bb0d949522673aee364689848b 100644 (file)
@@ -366,12 +366,9 @@ static seastar::future<> run(
           // start clients in active cores (#1 ~ #jobs)
           if (client.is_active()) {
             mono_time start_time = mono_clock::now();
-            return client.msgr->connect(peer_addr, entity_name_t::TYPE_OSD
-            ).then([&client] (auto conn) {
-              client.active_conn = conn;
-              // make sure handshake won't hurt the performance
-              return seastar::sleep(1s);
-            }).then([&client, start_time] {
+            client.active_conn = client.msgr->connect(peer_addr, entity_name_t::TYPE_OSD);
+            // make sure handshake won't hurt the performance
+            return seastar::sleep(1s).then([&client, start_time] {
               if (client.conn_stats.connected_time == mono_clock::zero()) {
                 logger().error("\n{} not connected after 1s!\n", client.lname);
                 ceph_assert(false);