]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
crimson: add throttler
authorKefu Chai <kchai@redhat.com>
Sat, 26 May 2018 05:44:23 +0000 (13:44 +0800)
committerKefu Chai <kchai@redhat.com>
Wed, 13 Jun 2018 16:13:58 +0000 (00:13 +0800)
Signed-off-by: Kefu Chai <kchai@redhat.com>
src/crimson/net/SocketConnection.cc
src/crimson/net/SocketConnection.h
src/crimson/net/SocketMessenger.cc
src/crimson/net/SocketMessenger.h
src/crimson/thread/CMakeLists.txt [new file with mode: 0644]
src/crimson/thread/Throttle.cc [new file with mode: 0644]
src/crimson/thread/Throttle.h [new file with mode: 0644]

index b0f74a594ce2e3868edf1b229b615ea6b71bcf83..7e0a4294bf2922454c41a2e2ebd860e726db0759 100644 (file)
@@ -180,6 +180,17 @@ void SocketConnection::requeue_sent()
   }
 }
 
+seastar::future<> SocketConnection::maybe_throttle()
+{
+  if (!policy.throttler_bytes) {
+    return seastar::now();
+  }
+  const auto to_read = (m.header.front_len +
+                        m.header.middle_len +
+                        m.header.data_len);
+  return policy.throttler_bytes->get(to_read);
+}
+
 seastar::future<MessageRef> SocketConnection::read_message()
 {
   return on_message.get_future()
@@ -187,8 +198,10 @@ seastar::future<MessageRef> SocketConnection::read_message()
       // read header
       return read(sizeof(m.header));
     }).then([this] (bufferlist bl) {
+      // throttle the traffic, maybe
       auto p = bl.cbegin();
       ::decode(m.header, p);
+      return maybe_throttle();
     }).then([this] {
       // read front
       return read(m.header.front_len);
index 30a54dc5810ebcabf626d14e08a2557671068171..54e794627b5665ab2aad389b56a333cf991342b7 100644 (file)
@@ -18,6 +18,7 @@
 
 #include "msg/Policy.h"
 #include "Connection.h"
+#include "crimson/thread/Throttle.h"
 
 class AuthSessionHandler;
 
@@ -91,6 +92,7 @@ class SocketConnection : public Connection {
   /// header will follow
   seastar::promise<> on_message;
 
+  seastar::future<> maybe_throttle();
   void read_tags_until_next_message();
   seastar::future<seastar::stop_iteration> handle_ack();
 
@@ -102,7 +104,7 @@ class SocketConnection : public Connection {
   /// encode/write a message
   seastar::future<> write_message(MessageRef msg);
 
-  ceph::net::Policy policy;
+  ceph::net::Policy<ceph::thread::Throttle> policy;
   uint64_t features;
   void set_features(uint64_t new_features) {
     features = new_features;
index 5a9d283eab0eccceb4b389ed3328e974384e8155..9fc6cb008a45afd6350be4578b5671fbb76ecf90 100644 (file)
@@ -118,8 +118,7 @@ seastar::future<> SocketMessenger::start(Dispatcher *disp)
 }
 
 seastar::future<ceph::net::ConnectionRef>
-SocketMessenger::connect(const entity_addr_t& addr, entity_type_t peer_type,
-                        const entity_addr_t& myaddr, entity_type_t host_type)
+SocketMessenger::connect(const entity_addr_t& addr, entity_type_t peer_type)
 {
   if (auto found = lookup_conn(addr); found) {
     return seastar::make_ready_future<ceph::net::ConnectionRef>(found);
@@ -129,7 +128,7 @@ SocketMessenger::connect(const entity_addr_t& addr, entity_type_t peer_type,
       ConnectionRef conn = new SocketConnection(this, get_myaddr(), addr,
                                                 std::move(socket));
       // complete the handshake before returning to the caller
-      return conn->client_handshake(peer_type, host_type)
+      return conn->client_handshake(peer_type, get_myname().type())
         .handle_exception([conn] (std::exception_ptr eptr) {
           // close the connection before returning errors
           return seastar::make_exception_future<>(eptr)
@@ -156,6 +155,24 @@ seastar::future<> SocketMessenger::shutdown()
     }).finally([this] { connections.clear(); });
 }
 
+void SocketMessenger::set_default_policy(const SocketPolicy& p)
+{
+  policy_set.set_default(p);
+}
+
+void SocketMessenger::set_policy(entity_type_t peer_type,
+                                const SocketPolicy& p)
+{
+  policy_set.set(peer_type, p);
+}
+
+void SocketMessenger::set_policy_throttler(entity_type_t peer_type,
+                                          Throttle* throttle)
+{
+  // only byte throttler is used in OSD
+  policy_set.set_throttlers(peer_type, throttle, nullptr);
+}
+
 ceph::net::ConnectionRef SocketMessenger::lookup_conn(const entity_addr_t& addr)
 {
   if (auto found = connections.find(addr);
index 68e5dc7382c4a0d5e935f2d93d2dc31cced838a1..4945918e50284f81ca50331f0f2e8191dad81c0d 100644 (file)
 #include <boost/optional.hpp>
 #include <core/reactor.hh>
 
+#include "msg/Policy.h"
 #include "Messenger.h"
+#include "crimson/thread/Throttle.h"
 
 namespace ceph::net {
 
+using SocketPolicy = ceph::net::Policy<ceph::thread::Throttle>;
+
 class SocketMessenger final : public Messenger {
   boost::optional<seastar::server_socket> listener;
   Dispatcher *dispatcher = nullptr;
   uint32_t global_seq = 0;
   std::map<entity_addr_t, ConnectionRef> connections;
+  using Throttle = ceph::thread::Throttle;
+  ceph::net::PolicySet<Throttle> policy_set;
 
   seastar::future<> dispatch(ConnectionRef conn);
 
@@ -41,11 +47,12 @@ class SocketMessenger final : public Messenger {
   seastar::future<> start(Dispatcher *dispatcher) override;
 
   seastar::future<ConnectionRef> connect(const entity_addr_t& addr,
-                                        entity_type_t peer_type,
-                                         const entity_addr_t& myaddr,
-                                        entity_type_t host_type) override;
+                                        entity_type_t peer_type) override;
 
   seastar::future<> shutdown() override;
+  void set_default_policy(const SocketPolicy& p);
+  void set_policy(entity_type_t peer_type, const SocketPolicy& p);
+  void set_policy_throttler(entity_type_t peer_type, Throttle* throttle);
   ConnectionRef lookup_conn(const entity_addr_t& addr) override;
   void unregister_conn(ConnectionRef) override;
   seastar::future<msgr_tag_t, bufferlist>
diff --git a/src/crimson/thread/CMakeLists.txt b/src/crimson/thread/CMakeLists.txt
new file mode 100644 (file)
index 0000000..ff2e5e4
--- /dev/null
@@ -0,0 +1,7 @@
+set(crimson_thread_srcs
+  Throttle.cc)
+add_library(crimson_thread_objs OBJECT ${crimson_thread_srcs})
+target_compile_definitions(crimson_thread_objs
+  PUBLIC $<TARGET_PROPERTY:Seastar::seastar,INTERFACE_COMPILE_DEFINITIONS>)
+target_include_directories(crimson_thread_objs
+  PUBLIC $<TARGET_PROPERTY:Seastar::seastar,INTERFACE_INCLUDE_DIRECTORIES>)
diff --git a/src/crimson/thread/Throttle.cc b/src/crimson/thread/Throttle.cc
new file mode 100644 (file)
index 0000000..1d67e72
--- /dev/null
@@ -0,0 +1,59 @@
+#include "Throttle.h"
+
+namespace ceph::thread {
+
+int64_t Throttle::take(int64_t c)
+{
+  if (!max) {
+    return 0;
+  }
+  count += c;
+  return count;
+}
+
+int64_t Throttle::put(int64_t c)
+{
+  if (!max) {
+    return 0;
+  }
+  if (!c) {
+    return count;
+  }
+  on_free_slots.signal();
+  count -= c;
+  return count;
+}
+
+seastar::future<> Throttle::get(size_t c)
+{
+  if (!max) {
+    return seastar::now();
+  }
+  return on_free_slots.wait([this, c] {
+    return !_should_wait(c);
+  }).then([this, c] {
+    count += c;
+    return seastar::now();
+  });
+}
+
+void Throttle::reset_max(size_t m) {
+  if (max == m) {
+    return;
+  }
+
+  if (m > max) {
+    on_free_slots.signal();
+  }
+  max = m;
+}
+
+bool Throttle::_should_wait(size_t c) const {
+  if (!max) {
+    return false;
+  }
+  return ((c <= max && count + c > max) || // normally stay under max
+          (c >= max && count > max));      // except for large c
+}
+
+} // namespace ceph::thread::seastar
diff --git a/src/crimson/thread/Throttle.h b/src/crimson/thread/Throttle.h
new file mode 100644 (file)
index 0000000..a3c6f46
--- /dev/null
@@ -0,0 +1,36 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:nil -*-
+// vim: ts=8 sw=2 smarttab
+
+#pragma once
+
+#include <core/condition-variable.hh>
+
+#include "common/ThrottleInterface.h"
+
+namespace ceph::thread {
+
+class Throttle final : public ThrottleInterface {
+  size_t max = 0;
+  size_t count = 0;
+  // we cannot change the "count" of seastar::semaphore after it is created,
+  // so use condition_variable instead.
+  seastar::condition_variable on_free_slots;
+public:
+  explicit Throttle(size_t m)
+    : max(m)
+  {}
+  int64_t take(int64_t c = 1) override;
+  int64_t put(int64_t c = 1) override;
+  seastar::future<> get(size_t c);
+  size_t get_current() const {
+    return count;
+  }
+  size_t get_max() const {
+    return max;
+  }
+  void reset_max(size_t m);
+private:
+  bool _should_wait(size_t c) const;
+};
+
+} // namespace ceph::thread