uint64_t features = osdmap.get_features(*q, &mask);
if ((mon->messenger->get_policy(*q).features_required & mask) != features) {
dout(0) << "crush map has features " << features << ", adjusting msgr requires" << dendl;
- Messenger::Policy p = mon->messenger->get_policy(*q);
+ ceph::net::Policy p = mon->messenger->get_policy(*q);
p.features_required = (p.features_required & ~mask) | features;
mon->messenger->set_policy(*q, p);
}
#include "Message.h"
#include "Dispatcher.h"
-#include "common/Mutex.h"
+#include "Policy.h"
#include "common/Cond.h"
+#include "common/Mutex.h"
+#include "common/Throttle.h"
#include "include/Context.h"
#include "include/types.h"
#include "include/ceph_features.h"
class Timer;
-
class Messenger {
private:
list<Dispatcher*> dispatchers;
CephContext *cct;
int crcflags;
- /**
- * A Policy describes the rules of a Connection. Is there a limit on how
- * much data this Connection can have locally? When the underlying connection
- * experiences an error, does the Connection disappear? Can this Messenger
- * re-establish the underlying connection?
- */
- struct Policy {
- /// If true, the Connection is tossed out on errors.
- bool lossy;
- /// If true, the underlying connection can't be re-established from this end.
- bool server;
- /// If true, we will standby when idle
- bool standby;
- /// If true, we will try to detect session resets
- bool resetcheck;
- /**
- * The throttler is used to limit how much data is held by Messages from
- * the associated Connection(s). When reading in a new Message, the Messenger
- * will call throttler->throttle() for the size of the new Message.
- */
- Throttle *throttler_bytes;
- Throttle *throttler_messages;
-
- /// Specify features supported locally by the endpoint.
- uint64_t features_supported;
- /// Specify features any remotes must have to talk to this endpoint.
- uint64_t features_required;
-
- Policy()
- : lossy(false), server(false), standby(false), resetcheck(true),
- throttler_bytes(NULL),
- throttler_messages(NULL),
- features_supported(CEPH_FEATURES_SUPPORTED_DEFAULT),
- features_required(0) {}
- private:
- Policy(bool l, bool s, bool st, bool r, uint64_t req)
- : lossy(l), server(s), standby(st), resetcheck(r),
- throttler_bytes(NULL),
- throttler_messages(NULL),
- features_supported(CEPH_FEATURES_SUPPORTED_DEFAULT),
- features_required(req) {}
-
- public:
- static Policy stateful_server(uint64_t req) {
- return Policy(false, true, true, true, req);
- }
- static Policy stateless_server(uint64_t req) {
- return Policy(true, true, false, false, req);
- }
- static Policy lossless_peer(uint64_t req) {
- return Policy(false, false, true, false, req);
- }
- static Policy lossless_peer_reuse(uint64_t req) {
- return Policy(false, false, true, true, req);
- }
- static Policy lossy_client(uint64_t req) {
- return Policy(true, false, false, false, req);
- }
- static Policy lossless_client(uint64_t req) {
- return Policy(false, false, false, true, req);
- }
- };
-
+ using Policy = ceph::net::Policy<Throttle>;
/**
* Messenger constructor. Call this from your implementation.
* Messenger users should construct full implementations directly,
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#pragma once
+
+#include "include/ceph_features.h"
+
+namespace ceph::net {
+
+using peer_type_t = int;
+
+/**
+ * A Policy describes the rules of a Connection. Is there a limit on how
+ * much data this Connection can have locally? When the underlying connection
+ * experiences an error, does the Connection disappear? Can this Messenger
+ * re-establish the underlying connection?
+ */
+template<class ThrottleType>
+struct Policy {
+ /// If true, the Connection is tossed out on errors.
+ bool lossy;
+ /// If true, the underlying connection can't be re-established from this end.
+ bool server;
+ /// If true, we will standby when idle
+ bool standby;
+ /// If true, we will try to detect session resets
+ bool resetcheck;
+ /**
+ * The throttler is used to limit how much data is held by Messages from
+ * the associated Connection(s). When reading in a new Message, the Messenger
+ * will call throttler->throttle() for the size of the new Message.
+ */
+ ThrottleType* throttler_bytes;
+ ThrottleType* throttler_messages;
+
+ /// Specify features supported locally by the endpoint.
+ uint64_t features_supported;
+ /// Specify features any remotes must have to talk to this endpoint.
+ uint64_t features_required;
+
+ Policy()
+ : lossy(false), server(false), standby(false), resetcheck(true),
+ throttler_bytes(NULL),
+ throttler_messages(NULL),
+ features_supported(CEPH_FEATURES_SUPPORTED_DEFAULT),
+ features_required(0) {}
+private:
+ Policy(bool l, bool s, bool st, bool r, uint64_t req)
+ : lossy(l), server(s), standby(st), resetcheck(r),
+ throttler_bytes(NULL),
+ throttler_messages(NULL),
+ features_supported(CEPH_FEATURES_SUPPORTED_DEFAULT),
+ features_required(req) {}
+
+public:
+ static Policy stateful_server(uint64_t req) {
+ return Policy(false, true, true, true, req);
+ }
+ static Policy stateless_server(uint64_t req) {
+ return Policy(true, true, false, false, req);
+ }
+ static Policy lossless_peer(uint64_t req) {
+ return Policy(false, false, true, false, req);
+ }
+ static Policy lossless_peer_reuse(uint64_t req) {
+ return Policy(false, false, true, true, req);
+ }
+ static Policy lossy_client(uint64_t req) {
+ return Policy(true, false, false, false, req);
+ }
+ static Policy lossless_client(uint64_t req) {
+ return Policy(false, false, false, true, req);
+ }
+};
+
+template<class ThrottleType>
+class PolicySet {
+ using policy_t = Policy<ThrottleType> ;
+ /// the default Policy we use for Pipes
+ policy_t default_policy;
+ /// map specifying different Policies for specific peer types
+ map<int, policy_t> policy_map; // entity_name_t::type -> Policy
+
+public:
+ const policy_t& get(peer_type_t peer_type) const {
+ if (auto found = policy_map.find(peer_type); found != policy_map.end()) {
+ return found->second;
+ } else {
+ return default_policy;
+ }
+ }
+ policy_t& get(peer_type_t peer_type) {
+ if (auto found = policy_map.find(peer_type); found != policy_map.end()) {
+ return found->second;
+ } else {
+ return default_policy;
+ }
+ }
+ void set(peer_type_t peer_type, const policy_t& p) {
+ policy_map[peer_type] = p;
+ }
+ const policy_t& get_default() const {
+ return default_policy;
+ }
+ void set_default(const policy_t& p) {
+ default_policy = p;
+ }
+ void set_throttlers(peer_type_t peer_type,
+ ThrottleType* byte_throttle,
+ ThrottleType* msg_throttle) {
+ auto& policy = get(peer_type);
+ policy.throttler_bytes = byte_throttle;
+ policy.throttler_messages = msg_throttle;
+ }
+};
+
+}
#define SIMPLE_POLICY_MESSENGER_H
#include "Messenger.h"
+#include "Policy.h"
class SimplePolicyMessenger : public Messenger
{
private:
/// lock protecting policy
Mutex policy_lock;
- /// the default Policy we use for Pipes
- Policy default_policy;
- /// map specifying different Policies for specific peer types
- map<int, Policy> policy_map; // entity_name_t::type -> Policy
+ // entity_name_t::type -> Policy
+ ceph::net::PolicySet<Throttle> policy_set;
public:
*/
Policy get_policy(int t) override {
Mutex::Locker l(policy_lock);
- map<int, Policy>::iterator iter =
- policy_map.find(t);
- if (iter != policy_map.end())
- return iter->second;
- else
- return default_policy;
+ return policy_set.get(t);
}
Policy get_default_policy() override {
Mutex::Locker l(policy_lock);
- return default_policy;
+ return policy_set.get_default();
}
/**
*/
void set_default_policy(Policy p) override {
Mutex::Locker l(policy_lock);
- default_policy = p;
+ policy_set.set_default(p);
}
/**
* Set a policy which is applied to all peers of the given type.
*/
void set_policy(int type, Policy p) override {
Mutex::Locker l(policy_lock);
- policy_map[type] = p;
+ policy_set.set(type, p);
}
/**
* you destroy SimpleMessenger.
*/
void set_policy_throttlers(int type,
- Throttle *byte_throttle,
- Throttle *msg_throttle) override {
+ Throttle* byte_throttle,
+ Throttle* msg_throttle) override {
Mutex::Locker l(policy_lock);
- map<int, Policy>::iterator iter =
- policy_map.find(type);
- if (iter != policy_map.end()) {
- iter->second.throttler_bytes = byte_throttle;
- iter->second.throttler_messages = msg_throttle;
- } else {
- default_policy.throttler_bytes = byte_throttle;
- default_policy.throttler_messages = msg_throttle;
- }
+ policy_set.set_throttlers(type, byte_throttle, msg_throttle);
}
}; /* SimplePolicyMessenger */