}
}
+seastar::future<> SocketConnection::maybe_throttle()
+{
+ if (!policy.throttler_bytes) {
+ return seastar::now();
+ }
+ const auto to_read = (m.header.front_len +
+ m.header.middle_len +
+ m.header.data_len);
+ return policy.throttler_bytes->get(to_read);
+}
+
seastar::future<MessageRef> SocketConnection::read_message()
{
return on_message.get_future()
// read header
return read(sizeof(m.header));
}).then([this] (bufferlist bl) {
+ // throttle the traffic, maybe
auto p = bl.cbegin();
::decode(m.header, p);
+ return maybe_throttle();
}).then([this] {
// read front
return read(m.header.front_len);
#include "msg/Policy.h"
#include "Connection.h"
+#include "crimson/thread/Throttle.h"
class AuthSessionHandler;
/// header will follow
seastar::promise<> on_message;
+ seastar::future<> maybe_throttle();
void read_tags_until_next_message();
seastar::future<seastar::stop_iteration> handle_ack();
/// encode/write a message
seastar::future<> write_message(MessageRef msg);
- ceph::net::Policy policy;
+ ceph::net::Policy<ceph::thread::Throttle> policy;
uint64_t features;
void set_features(uint64_t new_features) {
features = new_features;
}
seastar::future<ceph::net::ConnectionRef>
-SocketMessenger::connect(const entity_addr_t& addr, entity_type_t peer_type,
- const entity_addr_t& myaddr, entity_type_t host_type)
+SocketMessenger::connect(const entity_addr_t& addr, entity_type_t peer_type)
{
if (auto found = lookup_conn(addr); found) {
return seastar::make_ready_future<ceph::net::ConnectionRef>(found);
ConnectionRef conn = new SocketConnection(this, get_myaddr(), addr,
std::move(socket));
// complete the handshake before returning to the caller
- return conn->client_handshake(peer_type, host_type)
+ return conn->client_handshake(peer_type, get_myname().type())
.handle_exception([conn] (std::exception_ptr eptr) {
// close the connection before returning errors
return seastar::make_exception_future<>(eptr)
}).finally([this] { connections.clear(); });
}
+void SocketMessenger::set_default_policy(const SocketPolicy& p)
+{
+ policy_set.set_default(p);
+}
+
+void SocketMessenger::set_policy(entity_type_t peer_type,
+ const SocketPolicy& p)
+{
+ policy_set.set(peer_type, p);
+}
+
+void SocketMessenger::set_policy_throttler(entity_type_t peer_type,
+ Throttle* throttle)
+{
+ // only byte throttler is used in OSD
+ policy_set.set_throttlers(peer_type, throttle, nullptr);
+}
+
ceph::net::ConnectionRef SocketMessenger::lookup_conn(const entity_addr_t& addr)
{
if (auto found = connections.find(addr);
#include <boost/optional.hpp>
#include <core/reactor.hh>
+#include "msg/Policy.h"
#include "Messenger.h"
+#include "crimson/thread/Throttle.h"
namespace ceph::net {
+using SocketPolicy = ceph::net::Policy<ceph::thread::Throttle>;
+
class SocketMessenger final : public Messenger {
boost::optional<seastar::server_socket> listener;
Dispatcher *dispatcher = nullptr;
uint32_t global_seq = 0;
std::map<entity_addr_t, ConnectionRef> connections;
+ using Throttle = ceph::thread::Throttle;
+ ceph::net::PolicySet<Throttle> policy_set;
seastar::future<> dispatch(ConnectionRef conn);
seastar::future<> start(Dispatcher *dispatcher) override;
seastar::future<ConnectionRef> connect(const entity_addr_t& addr,
- entity_type_t peer_type,
- const entity_addr_t& myaddr,
- entity_type_t host_type) override;
+ entity_type_t peer_type) override;
seastar::future<> shutdown() override;
+ void set_default_policy(const SocketPolicy& p);
+ void set_policy(entity_type_t peer_type, const SocketPolicy& p);
+ void set_policy_throttler(entity_type_t peer_type, Throttle* throttle);
ConnectionRef lookup_conn(const entity_addr_t& addr) override;
void unregister_conn(ConnectionRef) override;
seastar::future<msgr_tag_t, bufferlist>
--- /dev/null
+set(crimson_thread_srcs
+ Throttle.cc)
+add_library(crimson_thread_objs OBJECT ${crimson_thread_srcs})
+target_compile_definitions(crimson_thread_objs
+ PUBLIC $<TARGET_PROPERTY:Seastar::seastar,INTERFACE_COMPILE_DEFINITIONS>)
+target_include_directories(crimson_thread_objs
+ PUBLIC $<TARGET_PROPERTY:Seastar::seastar,INTERFACE_INCLUDE_DIRECTORIES>)
--- /dev/null
+#include "Throttle.h"
+
+namespace ceph::thread {
+
+int64_t Throttle::take(int64_t c)
+{
+ if (!max) {
+ return 0;
+ }
+ count += c;
+ return count;
+}
+
+int64_t Throttle::put(int64_t c)
+{
+ if (!max) {
+ return 0;
+ }
+ if (!c) {
+ return count;
+ }
+ on_free_slots.signal();
+ count -= c;
+ return count;
+}
+
+seastar::future<> Throttle::get(size_t c)
+{
+ if (!max) {
+ return seastar::now();
+ }
+ return on_free_slots.wait([this, c] {
+ return !_should_wait(c);
+ }).then([this, c] {
+ count += c;
+ return seastar::now();
+ });
+}
+
+void Throttle::reset_max(size_t m) {
+ if (max == m) {
+ return;
+ }
+
+ if (m > max) {
+ on_free_slots.signal();
+ }
+ max = m;
+}
+
+bool Throttle::_should_wait(size_t c) const {
+ if (!max) {
+ return false;
+ }
+ return ((c <= max && count + c > max) || // normally stay under max
+ (c >= max && count > max)); // except for large c
+}
+
+} // namespace ceph::thread::seastar
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:nil -*-
+// vim: ts=8 sw=2 smarttab
+
+#pragma once
+
+#include <core/condition-variable.hh>
+
+#include "common/ThrottleInterface.h"
+
+namespace ceph::thread {
+
+class Throttle final : public ThrottleInterface {
+ size_t max = 0;
+ size_t count = 0;
+ // we cannot change the "count" of seastar::semaphore after it is created,
+ // so use condition_variable instead.
+ seastar::condition_variable on_free_slots;
+public:
+ explicit Throttle(size_t m)
+ : max(m)
+ {}
+ int64_t take(int64_t c = 1) override;
+ int64_t put(int64_t c = 1) override;
+ seastar::future<> get(size_t c);
+ size_t get_current() const {
+ return count;
+ }
+ size_t get_max() const {
+ return max;
+ }
+ void reset_max(size_t m);
+private:
+ bool _should_wait(size_t c) const;
+};
+
+} // namespace ceph::thread