]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
crimson/net: use std::map for tracking connections
authorKefu Chai <kchai@redhat.com>
Wed, 13 Jun 2018 03:05:26 +0000 (11:05 +0800)
committerKefu Chai <kchai@redhat.com>
Wed, 13 Jun 2018 16:10:32 +0000 (00:10 +0800)
* s/list/map/ for better lookup performance.
* and connection management related changes

Signed-off-by: Kefu Chai <kchai@redhat.com>
src/crimson/net/Messenger.h
src/crimson/net/SocketConnection.cc
src/crimson/net/SocketMessenger.cc
src/crimson/net/SocketMessenger.h

index db01ac247a4bd1783145970741e342386824f89d..989659f7d1025e51859047eae4704dd0e0904d9d 100644 (file)
@@ -60,10 +60,8 @@ class Messenger {
     }
     return ++global_seq;
   }
-  ConnectionRef lookup_conn(const entity_addr_t&) {
-    // TODO: replace handling
-    return nullptr;
-  }
+  virtual ConnectionRef lookup_conn(const entity_addr_t&) = 0;
+  virtual void unregister_conn(ConnectionRef) = 0;
 
   // @returns a tuple of <is_valid, auth_reply, session_key>
   virtual seastar::future<msgr_tag_t,    /// tag for error, 0 if authorized
index e820d058ba7697b8e421cd187c27c7db3125a405..b0f74a594ce2e3868edf1b229b615ea6b71bcf83 100644 (file)
@@ -139,6 +139,11 @@ void SocketConnection::read_tags_until_next_message()
           return seastar::make_ready_future<seastar::stop_iteration>(
               seastar::stop_iteration::no);
         });
+    }).handle_exception_type([this] (const std::system_error& e) {
+      if (e.code() == error::read_eof) {
+        close();
+      }
+      throw e;
     }).then_wrapped([this] (auto fut) {
       // satisfy the message promise
       fut.forward_to(std::move(on_message));
@@ -296,6 +301,7 @@ seastar::future<> SocketConnection::send(MessageRef msg)
 
 seastar::future<> SocketConnection::close()
 {
+  get_messenger()->unregister_conn(this);
   return seastar::when_all(in.close(), out.close()).discard_result();
 }
 
@@ -819,6 +825,9 @@ seastar::future<> SocketConnection::server_handshake()
 
 seastar::future<> SocketConnection::fault()
 {
+  if (policy.lossy) {
+    get_messenger()->unregister_conn(this);
+  }
   if (h.backoff.count()) {
     h.backoff += h.backoff;
   } else {
index ebfc715191f915b1cf8b3b81a518078c0caab950..5a9d283eab0eccceb4b389ed3328e974384e8155 100644 (file)
@@ -12,6 +12,7 @@
  *
  */
 
+#include <tuple>
 #include "auth/Auth.h"
 #include "SocketMessenger.h"
 #include "SocketConnection.h"
@@ -40,7 +41,9 @@ void SocketMessenger::bind(const entity_addr_t& addr)
 
 seastar::future<> SocketMessenger::dispatch(ConnectionRef conn)
 {
-  connections.push_back(conn);
+  auto [i, added] = connections.emplace(conn->get_peer_addr(), conn);
+  std::ignore = i;
+  assert(added);
 
   return seastar::repeat([=] {
       return conn->read_message()
@@ -118,13 +121,8 @@ seastar::future<ceph::net::ConnectionRef>
 SocketMessenger::connect(const entity_addr_t& addr, entity_type_t peer_type,
                         const entity_addr_t& myaddr, entity_type_t host_type)
 {
-  if (auto found = std::find_if(connections.begin(),
-                               connections.end(),
-                               [&addr](auto conn) {
-                                 return conn->get_peer_addr() == addr;
-                               });
-      found != connections.end()) {
-    return seastar::make_ready_future<ceph::net::ConnectionRef>(*found);
+  if (auto found = lookup_conn(addr); found) {
+    return seastar::make_ready_future<ceph::net::ConnectionRef>(found);
   }
   return seastar::connect(addr.in4_addr())
     .then([=] (seastar::connected_socket socket) {
@@ -153,11 +151,30 @@ seastar::future<> SocketMessenger::shutdown()
     listener->abort_accept();
   }
   return seastar::parallel_for_each(connections.begin(), connections.end(),
-    [this] (ConnectionRef conn) {
-      return conn->close();
+    [this] (auto conn) {
+      return conn.second->close();
     }).finally([this] { connections.clear(); });
 }
 
+ceph::net::ConnectionRef SocketMessenger::lookup_conn(const entity_addr_t& addr)
+{
+  if (auto found = connections.find(addr);
+      found != connections.end()) {
+    return found->second;
+  } else {
+    return nullptr;
+  }
+}
+
+void SocketMessenger::unregister_conn(ConnectionRef conn)
+{
+  assert(conn);
+  auto found = connections.find(conn->get_peer_addr());
+  assert(found != connections.end());
+  assert(found->second == conn);
+  connections.erase(found);
+}
+
 seastar::future<msgr_tag_t, bufferlist>
 SocketMessenger::verify_authorizer(peer_type_t peer_type,
                                   auth_proto_t protocol,
index 34bf375f66220d51bfe7fdd52d6ce4276ba83d2e..68e5dc7382c4a0d5e935f2d93d2dc31cced838a1 100644 (file)
@@ -14,7 +14,7 @@
 
 #pragma once
 
-#include <list>
+#include <map>
 #include <boost/optional.hpp>
 #include <core/reactor.hh>
 
@@ -26,8 +26,7 @@ class SocketMessenger final : public Messenger {
   boost::optional<seastar::server_socket> listener;
   Dispatcher *dispatcher = nullptr;
   uint32_t global_seq = 0;
-
-  std::list<ConnectionRef> connections;
+  std::map<entity_addr_t, ConnectionRef> connections;
 
   seastar::future<> dispatch(ConnectionRef conn);
 
@@ -47,6 +46,8 @@ class SocketMessenger final : public Messenger {
                                         entity_type_t host_type) override;
 
   seastar::future<> shutdown() override;
+  ConnectionRef lookup_conn(const entity_addr_t& addr) override;
+  void unregister_conn(ConnectionRef) override;
   seastar::future<msgr_tag_t, bufferlist>
   verify_authorizer(peer_type_t peer_type,
                    auth_proto_t protocol,