]> git.apps.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
src/msg: extract Policy into its own header
authorKefu Chai <kchai@redhat.com>
Tue, 13 Mar 2018 02:28:55 +0000 (10:28 +0800)
committerKefu Chai <kchai@redhat.com>
Wed, 13 Jun 2018 06:09:22 +0000 (14:09 +0800)
and templaterize it. as we need to share Policy between seastar app and
non-seastar apps. and the Throttle interface for seastar is different
from that for non-seastar, so we should templaterize the Policy and
PolicySet.

Signed-off-by: Kefu Chai <kchai@redhat.com>
src/mon/OSDMonitor.cc
src/msg/Messenger.h
src/msg/Policy.h [new file with mode: 0644]
src/msg/SimplePolicyMessenger.h

index 081cbcfe2f5e5da6050fa448b556565c492adf97..21086ebb8c31a8550088b19fff6060b3193c4bfc 100644 (file)
@@ -542,7 +542,7 @@ void OSDMonitor::update_msgr_features()
     uint64_t features = osdmap.get_features(*q, &mask);
     if ((mon->messenger->get_policy(*q).features_required & mask) != features) {
       dout(0) << "crush map has features " << features << ", adjusting msgr requires" << dendl;
-      Messenger::Policy p = mon->messenger->get_policy(*q);
+      ceph::net::Policy p = mon->messenger->get_policy(*q);
       p.features_required = (p.features_required & ~mask) | features;
       mon->messenger->set_policy(*q, p);
     }
index 2c8b94c7d7fc22fd34ed847c947b19d1e8475db7..338db67b7aee514c95fcf44b4de964f41dbe411c 100644 (file)
 
 #include "Message.h"
 #include "Dispatcher.h"
-#include "common/Mutex.h"
+#include "Policy.h"
 #include "common/Cond.h"
+#include "common/Mutex.h"
+#include "common/Throttle.h"
 #include "include/Context.h"
 #include "include/types.h"
 #include "include/ceph_features.h"
@@ -36,7 +38,6 @@
 
 class Timer;
 
-
 class Messenger {
 private:
   list<Dispatcher*> dispatchers;
@@ -75,69 +76,7 @@ public:
   CephContext *cct;
   int crcflags;
 
-  /**
-   * A Policy describes the rules of a Connection. Is there a limit on how
-   * much data this Connection can have locally? When the underlying connection
-   * experiences an error, does the Connection disappear? Can this Messenger
-   * re-establish the underlying connection?
-   */
-  struct Policy {
-    /// If true, the Connection is tossed out on errors.
-    bool lossy;
-    /// If true, the underlying connection can't be re-established from this end.
-    bool server;
-    /// If true, we will standby when idle
-    bool standby;
-    /// If true, we will try to detect session resets
-    bool resetcheck;
-    /**
-     *  The throttler is used to limit how much data is held by Messages from
-     *  the associated Connection(s). When reading in a new Message, the Messenger
-     *  will call throttler->throttle() for the size of the new Message.
-     */
-    Throttle *throttler_bytes;
-    Throttle *throttler_messages;
-
-    /// Specify features supported locally by the endpoint.
-    uint64_t features_supported;
-    /// Specify features any remotes must have to talk to this endpoint.
-    uint64_t features_required;
-
-    Policy()
-      : lossy(false), server(false), standby(false), resetcheck(true),
-       throttler_bytes(NULL),
-       throttler_messages(NULL),
-       features_supported(CEPH_FEATURES_SUPPORTED_DEFAULT),
-       features_required(0) {}
-  private:
-    Policy(bool l, bool s, bool st, bool r, uint64_t req)
-      : lossy(l), server(s), standby(st), resetcheck(r),
-       throttler_bytes(NULL),
-       throttler_messages(NULL),
-       features_supported(CEPH_FEATURES_SUPPORTED_DEFAULT),
-       features_required(req) {}
-
-  public:
-    static Policy stateful_server(uint64_t req) {
-      return Policy(false, true, true, true, req);
-    }
-    static Policy stateless_server(uint64_t req) {
-      return Policy(true, true, false, false, req);
-    }
-    static Policy lossless_peer(uint64_t req) {
-      return Policy(false, false, true, false, req);
-    }
-    static Policy lossless_peer_reuse(uint64_t req) {
-      return Policy(false, false, true, true, req);
-    }
-    static Policy lossy_client(uint64_t req) {
-      return Policy(true, false, false, false, req);
-    }
-    static Policy lossless_client(uint64_t req) {
-      return Policy(false, false, false, true, req);
-    }
-  };
-
+  using Policy = ceph::net::Policy<Throttle>;
   /**
    * Messenger constructor. Call this from your implementation.
    * Messenger users should construct full implementations directly,
diff --git a/src/msg/Policy.h b/src/msg/Policy.h
new file mode 100644 (file)
index 0000000..571da8c
--- /dev/null
@@ -0,0 +1,117 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#pragma once
+
+#include "include/ceph_features.h"
+
+namespace ceph::net {
+
+using peer_type_t = int;
+
+/**
+ * A Policy describes the rules of a Connection. Is there a limit on how
+ * much data this Connection can have locally? When the underlying connection
+ * experiences an error, does the Connection disappear? Can this Messenger
+ * re-establish the underlying connection?
+ */
+template<class ThrottleType>
+struct Policy {
+  /// If true, the Connection is tossed out on errors.
+  bool lossy;
+  /// If true, the underlying connection can't be re-established from this end.
+  bool server;
+  /// If true, we will standby when idle
+  bool standby;
+  /// If true, we will try to detect session resets
+  bool resetcheck;
+  /**
+   *  The throttler is used to limit how much data is held by Messages from
+   *  the associated Connection(s). When reading in a new Message, the Messenger
+   *  will call throttler->throttle() for the size of the new Message.
+   */
+  ThrottleType* throttler_bytes;
+  ThrottleType* throttler_messages;
+  
+  /// Specify features supported locally by the endpoint.
+  uint64_t features_supported;
+  /// Specify features any remotes must have to talk to this endpoint.
+  uint64_t features_required;
+  
+  Policy()
+    : lossy(false), server(false), standby(false), resetcheck(true),
+      throttler_bytes(NULL),
+      throttler_messages(NULL),
+      features_supported(CEPH_FEATURES_SUPPORTED_DEFAULT),
+      features_required(0) {}
+private:
+  Policy(bool l, bool s, bool st, bool r, uint64_t req)
+    : lossy(l), server(s), standby(st), resetcheck(r),
+      throttler_bytes(NULL),
+      throttler_messages(NULL),
+      features_supported(CEPH_FEATURES_SUPPORTED_DEFAULT),
+      features_required(req) {}
+  
+public:
+  static Policy stateful_server(uint64_t req) {
+    return Policy(false, true, true, true, req);
+  }
+  static Policy stateless_server(uint64_t req) {
+    return Policy(true, true, false, false, req);
+  }
+  static Policy lossless_peer(uint64_t req) {
+    return Policy(false, false, true, false, req);
+  }
+  static Policy lossless_peer_reuse(uint64_t req) {
+    return Policy(false, false, true, true, req);
+  }
+  static Policy lossy_client(uint64_t req) {
+    return Policy(true, false, false, false, req);
+  }
+  static Policy lossless_client(uint64_t req) {
+    return Policy(false, false, false, true, req);
+  }
+};
+
+template<class ThrottleType>
+class PolicySet {
+  using policy_t = Policy<ThrottleType> ;
+  /// the default Policy we use for Pipes
+  policy_t default_policy;
+  /// map specifying different Policies for specific peer types
+  map<int, policy_t> policy_map; // entity_name_t::type -> Policy
+
+public:
+  const policy_t& get(peer_type_t peer_type) const {
+    if (auto found = policy_map.find(peer_type); found != policy_map.end()) {
+      return found->second;
+    } else {
+      return default_policy;
+    }
+  }
+  policy_t& get(peer_type_t peer_type) {
+    if (auto found = policy_map.find(peer_type); found != policy_map.end()) {
+      return found->second;
+    } else {
+      return default_policy;
+    }
+  }
+  void set(peer_type_t peer_type, const policy_t& p) {
+    policy_map[peer_type] = p;
+  }
+  const policy_t& get_default() const {
+    return default_policy;
+  }
+  void set_default(const policy_t& p) {
+    default_policy = p;
+  }
+  void set_throttlers(peer_type_t peer_type,
+                      ThrottleType* byte_throttle,
+                      ThrottleType* msg_throttle) {
+    auto& policy = get(peer_type);
+    policy.throttler_bytes = byte_throttle;
+    policy.throttler_messages = msg_throttle;
+  }
+};
+
+}
index 466eb1d34c44cae2f8c6bf7d3ef434b41aa8a822..2e9b84ec1ab744afb647f5fd3592a5b4d35407e9 100644 (file)
 #define SIMPLE_POLICY_MESSENGER_H
 
 #include "Messenger.h"
+#include "Policy.h"
 
 class SimplePolicyMessenger : public Messenger
 {
 private:
   /// lock protecting policy
   Mutex policy_lock;
-  /// the default Policy we use for Pipes
-  Policy default_policy;
-  /// map specifying different Policies for specific peer types
-  map<int, Policy> policy_map; // entity_name_t::type -> Policy
+  // entity_name_t::type -> Policy
+  ceph::net::PolicySet<Throttle> policy_set;
 
 public:
 
@@ -45,17 +44,12 @@ public:
    */
   Policy get_policy(int t) override {
     Mutex::Locker l(policy_lock);
-    map<int, Policy>::iterator iter =
-      policy_map.find(t);
-    if (iter != policy_map.end())
-      return iter->second;
-    else
-      return default_policy;
+    return policy_set.get(t);
   }
 
   Policy get_default_policy() override {
     Mutex::Locker l(policy_lock);
-    return default_policy;
+    return policy_set.get_default();
   }
 
   /**
@@ -68,7 +62,7 @@ public:
    */
   void set_default_policy(Policy p) override {
     Mutex::Locker l(policy_lock);
-    default_policy = p;
+    policy_set.set_default(p);
   }
   /**
    * Set a policy which is applied to all peers of the given type.
@@ -80,7 +74,7 @@ public:
    */
   void set_policy(int type, Policy p) override {
     Mutex::Locker l(policy_lock);
-    policy_map[type] = p;
+    policy_set.set(type, p);
   }
 
   /**
@@ -95,18 +89,10 @@ public:
    * you destroy SimpleMessenger.
    */
   void set_policy_throttlers(int type,
-                            Throttle *byte_throttle,
-                            Throttle *msg_throttle) override {
+                            Throttlebyte_throttle,
+                            Throttlemsg_throttle) override {
     Mutex::Locker l(policy_lock);
-    map<int, Policy>::iterator iter =
-      policy_map.find(type);
-    if (iter != policy_map.end()) {
-      iter->second.throttler_bytes = byte_throttle;
-      iter->second.throttler_messages = msg_throttle;
-    } else {
-      default_policy.throttler_bytes = byte_throttle;
-      default_policy.throttler_messages = msg_throttle;
-    }
+    policy_set.set_throttlers(type, byte_throttle, msg_throttle);
   }
 
 }; /* SimplePolicyMessenger */