From: Kefu Chai Date: Sat, 26 May 2018 05:44:23 +0000 (+0800) Subject: crimson: add throttler X-Git-Tag: v14.0.1~1115^2~2 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=243eeee9dd484a156eacbbeee3029b3862a60f1b;p=ceph.git crimson: add throttler Signed-off-by: Kefu Chai --- diff --git a/src/crimson/net/SocketConnection.cc b/src/crimson/net/SocketConnection.cc index b0f74a594ce2..7e0a4294bf29 100644 --- a/src/crimson/net/SocketConnection.cc +++ b/src/crimson/net/SocketConnection.cc @@ -180,6 +180,17 @@ void SocketConnection::requeue_sent() } } +seastar::future<> SocketConnection::maybe_throttle() +{ + if (!policy.throttler_bytes) { + return seastar::now(); + } + const auto to_read = (m.header.front_len + + m.header.middle_len + + m.header.data_len); + return policy.throttler_bytes->get(to_read); +} + seastar::future SocketConnection::read_message() { return on_message.get_future() @@ -187,8 +198,10 @@ seastar::future SocketConnection::read_message() // read header return read(sizeof(m.header)); }).then([this] (bufferlist bl) { + // throttle the traffic, maybe auto p = bl.cbegin(); ::decode(m.header, p); + return maybe_throttle(); }).then([this] { // read front return read(m.header.front_len); diff --git a/src/crimson/net/SocketConnection.h b/src/crimson/net/SocketConnection.h index 30a54dc5810e..54e794627b56 100644 --- a/src/crimson/net/SocketConnection.h +++ b/src/crimson/net/SocketConnection.h @@ -18,6 +18,7 @@ #include "msg/Policy.h" #include "Connection.h" +#include "crimson/thread/Throttle.h" class AuthSessionHandler; @@ -91,6 +92,7 @@ class SocketConnection : public Connection { /// header will follow seastar::promise<> on_message; + seastar::future<> maybe_throttle(); void read_tags_until_next_message(); seastar::future handle_ack(); @@ -102,7 +104,7 @@ class SocketConnection : public Connection { /// encode/write a message seastar::future<> write_message(MessageRef msg); - ceph::net::Policy policy; + ceph::net::Policy policy; uint64_t features; void set_features(uint64_t new_features) { features = new_features; diff --git a/src/crimson/net/SocketMessenger.cc b/src/crimson/net/SocketMessenger.cc index 5a9d283eab0e..9fc6cb008a45 100644 --- a/src/crimson/net/SocketMessenger.cc +++ b/src/crimson/net/SocketMessenger.cc @@ -118,8 +118,7 @@ seastar::future<> SocketMessenger::start(Dispatcher *disp) } seastar::future -SocketMessenger::connect(const entity_addr_t& addr, entity_type_t peer_type, - const entity_addr_t& myaddr, entity_type_t host_type) +SocketMessenger::connect(const entity_addr_t& addr, entity_type_t peer_type) { if (auto found = lookup_conn(addr); found) { return seastar::make_ready_future(found); @@ -129,7 +128,7 @@ SocketMessenger::connect(const entity_addr_t& addr, entity_type_t peer_type, ConnectionRef conn = new SocketConnection(this, get_myaddr(), addr, std::move(socket)); // complete the handshake before returning to the caller - return conn->client_handshake(peer_type, host_type) + return conn->client_handshake(peer_type, get_myname().type()) .handle_exception([conn] (std::exception_ptr eptr) { // close the connection before returning errors return seastar::make_exception_future<>(eptr) @@ -156,6 +155,24 @@ seastar::future<> SocketMessenger::shutdown() }).finally([this] { connections.clear(); }); } +void SocketMessenger::set_default_policy(const SocketPolicy& p) +{ + policy_set.set_default(p); +} + +void SocketMessenger::set_policy(entity_type_t peer_type, + const SocketPolicy& p) +{ + policy_set.set(peer_type, p); +} + +void SocketMessenger::set_policy_throttler(entity_type_t peer_type, + Throttle* throttle) +{ + // only byte throttler is used in OSD + policy_set.set_throttlers(peer_type, throttle, nullptr); +} + ceph::net::ConnectionRef SocketMessenger::lookup_conn(const entity_addr_t& addr) { if (auto found = connections.find(addr); diff --git a/src/crimson/net/SocketMessenger.h b/src/crimson/net/SocketMessenger.h index 68e5dc7382c4..4945918e5028 100644 --- a/src/crimson/net/SocketMessenger.h +++ b/src/crimson/net/SocketMessenger.h @@ -18,15 +18,21 @@ #include #include +#include "msg/Policy.h" #include "Messenger.h" +#include "crimson/thread/Throttle.h" namespace ceph::net { +using SocketPolicy = ceph::net::Policy; + class SocketMessenger final : public Messenger { boost::optional listener; Dispatcher *dispatcher = nullptr; uint32_t global_seq = 0; std::map connections; + using Throttle = ceph::thread::Throttle; + ceph::net::PolicySet policy_set; seastar::future<> dispatch(ConnectionRef conn); @@ -41,11 +47,12 @@ class SocketMessenger final : public Messenger { seastar::future<> start(Dispatcher *dispatcher) override; seastar::future connect(const entity_addr_t& addr, - entity_type_t peer_type, - const entity_addr_t& myaddr, - entity_type_t host_type) override; + entity_type_t peer_type) override; seastar::future<> shutdown() override; + void set_default_policy(const SocketPolicy& p); + void set_policy(entity_type_t peer_type, const SocketPolicy& p); + void set_policy_throttler(entity_type_t peer_type, Throttle* throttle); ConnectionRef lookup_conn(const entity_addr_t& addr) override; void unregister_conn(ConnectionRef) override; seastar::future diff --git a/src/crimson/thread/CMakeLists.txt b/src/crimson/thread/CMakeLists.txt new file mode 100644 index 000000000000..ff2e5e44fc5e --- /dev/null +++ b/src/crimson/thread/CMakeLists.txt @@ -0,0 +1,7 @@ +set(crimson_thread_srcs + Throttle.cc) +add_library(crimson_thread_objs OBJECT ${crimson_thread_srcs}) +target_compile_definitions(crimson_thread_objs + PUBLIC $) +target_include_directories(crimson_thread_objs + PUBLIC $) diff --git a/src/crimson/thread/Throttle.cc b/src/crimson/thread/Throttle.cc new file mode 100644 index 000000000000..1d67e7231743 --- /dev/null +++ b/src/crimson/thread/Throttle.cc @@ -0,0 +1,59 @@ +#include "Throttle.h" + +namespace ceph::thread { + +int64_t Throttle::take(int64_t c) +{ + if (!max) { + return 0; + } + count += c; + return count; +} + +int64_t Throttle::put(int64_t c) +{ + if (!max) { + return 0; + } + if (!c) { + return count; + } + on_free_slots.signal(); + count -= c; + return count; +} + +seastar::future<> Throttle::get(size_t c) +{ + if (!max) { + return seastar::now(); + } + return on_free_slots.wait([this, c] { + return !_should_wait(c); + }).then([this, c] { + count += c; + return seastar::now(); + }); +} + +void Throttle::reset_max(size_t m) { + if (max == m) { + return; + } + + if (m > max) { + on_free_slots.signal(); + } + max = m; +} + +bool Throttle::_should_wait(size_t c) const { + if (!max) { + return false; + } + return ((c <= max && count + c > max) || // normally stay under max + (c >= max && count > max)); // except for large c +} + +} // namespace ceph::thread::seastar diff --git a/src/crimson/thread/Throttle.h b/src/crimson/thread/Throttle.h new file mode 100644 index 000000000000..a3c6f463c6a8 --- /dev/null +++ b/src/crimson/thread/Throttle.h @@ -0,0 +1,36 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:nil -*- +// vim: ts=8 sw=2 smarttab + +#pragma once + +#include + +#include "common/ThrottleInterface.h" + +namespace ceph::thread { + +class Throttle final : public ThrottleInterface { + size_t max = 0; + size_t count = 0; + // we cannot change the "count" of seastar::semaphore after it is created, + // so use condition_variable instead. + seastar::condition_variable on_free_slots; +public: + explicit Throttle(size_t m) + : max(m) + {} + int64_t take(int64_t c = 1) override; + int64_t put(int64_t c = 1) override; + seastar::future<> get(size_t c); + size_t get_current() const { + return count; + } + size_t get_max() const { + return max; + } + void reset_max(size_t m); +private: + bool _should_wait(size_t c) const; +}; + +} // namespace ceph::thread