From a8e3448fef6f130a5510e660772443f949b56a25 Mon Sep 17 00:00:00 2001 From: Kefu Chai Date: Tue, 13 Mar 2018 10:28:55 +0800 Subject: [PATCH] src/msg: extract Policy into its own header and templaterize it. as we need to share Policy between seastar app and non-seastar apps. and the Throttle interface for seastar is different from that for non-seastar, so we should templaterize the Policy and PolicySet. Signed-off-by: Kefu Chai --- src/mon/OSDMonitor.cc | 2 +- src/msg/Messenger.h | 69 ++----------------- src/msg/Policy.h | 117 ++++++++++++++++++++++++++++++++ src/msg/SimplePolicyMessenger.h | 34 +++------- 4 files changed, 132 insertions(+), 90 deletions(-) create mode 100644 src/msg/Policy.h diff --git a/src/mon/OSDMonitor.cc b/src/mon/OSDMonitor.cc index 081cbcfe2f5..21086ebb8c3 100644 --- a/src/mon/OSDMonitor.cc +++ b/src/mon/OSDMonitor.cc @@ -542,7 +542,7 @@ void OSDMonitor::update_msgr_features() uint64_t features = osdmap.get_features(*q, &mask); if ((mon->messenger->get_policy(*q).features_required & mask) != features) { dout(0) << "crush map has features " << features << ", adjusting msgr requires" << dendl; - Messenger::Policy p = mon->messenger->get_policy(*q); + ceph::net::Policy p = mon->messenger->get_policy(*q); p.features_required = (p.features_required & ~mask) | features; mon->messenger->set_policy(*q, p); } diff --git a/src/msg/Messenger.h b/src/msg/Messenger.h index 2c8b94c7d7f..338db67b7ae 100644 --- a/src/msg/Messenger.h +++ b/src/msg/Messenger.h @@ -21,8 +21,10 @@ #include "Message.h" #include "Dispatcher.h" -#include "common/Mutex.h" +#include "Policy.h" #include "common/Cond.h" +#include "common/Mutex.h" +#include "common/Throttle.h" #include "include/Context.h" #include "include/types.h" #include "include/ceph_features.h" @@ -36,7 +38,6 @@ class Timer; - class Messenger { private: list dispatchers; @@ -75,69 +76,7 @@ public: CephContext *cct; int crcflags; - /** - * A Policy describes the rules of a Connection. Is there a limit on how - * much data this Connection can have locally? When the underlying connection - * experiences an error, does the Connection disappear? Can this Messenger - * re-establish the underlying connection? - */ - struct Policy { - /// If true, the Connection is tossed out on errors. - bool lossy; - /// If true, the underlying connection can't be re-established from this end. - bool server; - /// If true, we will standby when idle - bool standby; - /// If true, we will try to detect session resets - bool resetcheck; - /** - * The throttler is used to limit how much data is held by Messages from - * the associated Connection(s). When reading in a new Message, the Messenger - * will call throttler->throttle() for the size of the new Message. - */ - Throttle *throttler_bytes; - Throttle *throttler_messages; - - /// Specify features supported locally by the endpoint. - uint64_t features_supported; - /// Specify features any remotes must have to talk to this endpoint. - uint64_t features_required; - - Policy() - : lossy(false), server(false), standby(false), resetcheck(true), - throttler_bytes(NULL), - throttler_messages(NULL), - features_supported(CEPH_FEATURES_SUPPORTED_DEFAULT), - features_required(0) {} - private: - Policy(bool l, bool s, bool st, bool r, uint64_t req) - : lossy(l), server(s), standby(st), resetcheck(r), - throttler_bytes(NULL), - throttler_messages(NULL), - features_supported(CEPH_FEATURES_SUPPORTED_DEFAULT), - features_required(req) {} - - public: - static Policy stateful_server(uint64_t req) { - return Policy(false, true, true, true, req); - } - static Policy stateless_server(uint64_t req) { - return Policy(true, true, false, false, req); - } - static Policy lossless_peer(uint64_t req) { - return Policy(false, false, true, false, req); - } - static Policy lossless_peer_reuse(uint64_t req) { - return Policy(false, false, true, true, req); - } - static Policy lossy_client(uint64_t req) { - return Policy(true, false, false, false, req); - } - static Policy lossless_client(uint64_t req) { - return Policy(false, false, false, true, req); - } - }; - + using Policy = ceph::net::Policy; /** * Messenger constructor. Call this from your implementation. * Messenger users should construct full implementations directly, diff --git a/src/msg/Policy.h b/src/msg/Policy.h new file mode 100644 index 00000000000..571da8cf919 --- /dev/null +++ b/src/msg/Policy.h @@ -0,0 +1,117 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#pragma once + +#include "include/ceph_features.h" + +namespace ceph::net { + +using peer_type_t = int; + +/** + * A Policy describes the rules of a Connection. Is there a limit on how + * much data this Connection can have locally? When the underlying connection + * experiences an error, does the Connection disappear? Can this Messenger + * re-establish the underlying connection? + */ +template +struct Policy { + /// If true, the Connection is tossed out on errors. + bool lossy; + /// If true, the underlying connection can't be re-established from this end. + bool server; + /// If true, we will standby when idle + bool standby; + /// If true, we will try to detect session resets + bool resetcheck; + /** + * The throttler is used to limit how much data is held by Messages from + * the associated Connection(s). When reading in a new Message, the Messenger + * will call throttler->throttle() for the size of the new Message. + */ + ThrottleType* throttler_bytes; + ThrottleType* throttler_messages; + + /// Specify features supported locally by the endpoint. + uint64_t features_supported; + /// Specify features any remotes must have to talk to this endpoint. + uint64_t features_required; + + Policy() + : lossy(false), server(false), standby(false), resetcheck(true), + throttler_bytes(NULL), + throttler_messages(NULL), + features_supported(CEPH_FEATURES_SUPPORTED_DEFAULT), + features_required(0) {} +private: + Policy(bool l, bool s, bool st, bool r, uint64_t req) + : lossy(l), server(s), standby(st), resetcheck(r), + throttler_bytes(NULL), + throttler_messages(NULL), + features_supported(CEPH_FEATURES_SUPPORTED_DEFAULT), + features_required(req) {} + +public: + static Policy stateful_server(uint64_t req) { + return Policy(false, true, true, true, req); + } + static Policy stateless_server(uint64_t req) { + return Policy(true, true, false, false, req); + } + static Policy lossless_peer(uint64_t req) { + return Policy(false, false, true, false, req); + } + static Policy lossless_peer_reuse(uint64_t req) { + return Policy(false, false, true, true, req); + } + static Policy lossy_client(uint64_t req) { + return Policy(true, false, false, false, req); + } + static Policy lossless_client(uint64_t req) { + return Policy(false, false, false, true, req); + } +}; + +template +class PolicySet { + using policy_t = Policy ; + /// the default Policy we use for Pipes + policy_t default_policy; + /// map specifying different Policies for specific peer types + map policy_map; // entity_name_t::type -> Policy + +public: + const policy_t& get(peer_type_t peer_type) const { + if (auto found = policy_map.find(peer_type); found != policy_map.end()) { + return found->second; + } else { + return default_policy; + } + } + policy_t& get(peer_type_t peer_type) { + if (auto found = policy_map.find(peer_type); found != policy_map.end()) { + return found->second; + } else { + return default_policy; + } + } + void set(peer_type_t peer_type, const policy_t& p) { + policy_map[peer_type] = p; + } + const policy_t& get_default() const { + return default_policy; + } + void set_default(const policy_t& p) { + default_policy = p; + } + void set_throttlers(peer_type_t peer_type, + ThrottleType* byte_throttle, + ThrottleType* msg_throttle) { + auto& policy = get(peer_type); + policy.throttler_bytes = byte_throttle; + policy.throttler_messages = msg_throttle; + } +}; + +} diff --git a/src/msg/SimplePolicyMessenger.h b/src/msg/SimplePolicyMessenger.h index 466eb1d34c4..2e9b84ec1ab 100644 --- a/src/msg/SimplePolicyMessenger.h +++ b/src/msg/SimplePolicyMessenger.h @@ -17,16 +17,15 @@ #define SIMPLE_POLICY_MESSENGER_H #include "Messenger.h" +#include "Policy.h" class SimplePolicyMessenger : public Messenger { private: /// lock protecting policy Mutex policy_lock; - /// the default Policy we use for Pipes - Policy default_policy; - /// map specifying different Policies for specific peer types - map policy_map; // entity_name_t::type -> Policy + // entity_name_t::type -> Policy + ceph::net::PolicySet policy_set; public: @@ -45,17 +44,12 @@ public: */ Policy get_policy(int t) override { Mutex::Locker l(policy_lock); - map::iterator iter = - policy_map.find(t); - if (iter != policy_map.end()) - return iter->second; - else - return default_policy; + return policy_set.get(t); } Policy get_default_policy() override { Mutex::Locker l(policy_lock); - return default_policy; + return policy_set.get_default(); } /** @@ -68,7 +62,7 @@ public: */ void set_default_policy(Policy p) override { Mutex::Locker l(policy_lock); - default_policy = p; + policy_set.set_default(p); } /** * Set a policy which is applied to all peers of the given type. @@ -80,7 +74,7 @@ public: */ void set_policy(int type, Policy p) override { Mutex::Locker l(policy_lock); - policy_map[type] = p; + policy_set.set(type, p); } /** @@ -95,18 +89,10 @@ public: * you destroy SimpleMessenger. */ void set_policy_throttlers(int type, - Throttle *byte_throttle, - Throttle *msg_throttle) override { + Throttle* byte_throttle, + Throttle* msg_throttle) override { Mutex::Locker l(policy_lock); - map::iterator iter = - policy_map.find(type); - if (iter != policy_map.end()) { - iter->second.throttler_bytes = byte_throttle; - iter->second.throttler_messages = msg_throttle; - } else { - default_policy.throttler_bytes = byte_throttle; - default_policy.throttler_messages = msg_throttle; - } + policy_set.set_throttlers(type, byte_throttle, msg_throttle); } }; /* SimplePolicyMessenger */ -- 2.39.5