]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
crimson/net: remove hardcoded msgr configuration 34603/head
authorchunmei-liu <chunmei.liu@intel.com>
Tue, 14 Apr 2020 04:56:10 +0000 (21:56 -0700)
committerchunmei-liu <chunmei.liu@intel.com>
Tue, 21 Apr 2020 00:11:29 +0000 (17:11 -0700)
remove crimson/net/Config.h and changed corresponding unit test case.

Signed-off-by: chunmei-liu <chunmei.liu@intel.com>
src/crimson/net/Config.h [deleted file]
src/crimson/net/ProtocolV1.cc
src/crimson/net/ProtocolV2.cc
src/crimson/net/SocketConnection.cc
src/crimson/thread/ThreadPool.cc
src/crimson/thread/ThreadPool.h
src/test/crimson/test_alien_echo.cc
src/test/crimson/test_messenger.cc
src/test/crimson/test_thread_pool.cc

diff --git a/src/crimson/net/Config.h b/src/crimson/net/Config.h
deleted file mode 100644 (file)
index 229b689..0000000
+++ /dev/null
@@ -1,26 +0,0 @@
-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
-// vim: ts=8 sw=2 smarttab
-
-// XXX: a poor man's md_config_t
-#pragma once
-
-#include "include/msgr.h"
-#include <chrono>
-
-namespace crimson::net {
-
-using namespace std::literals::chrono_literals;
-
-constexpr struct simple_md_config_t {
-  uint32_t host_type = CEPH_ENTITY_TYPE_OSD;
-  bool cephx_require_signatures = false;
-  bool cephx_cluster_require_signatures = false;
-  bool cephx_service_require_signatures = false;
-  bool ms_die_on_old_message = true;
-  bool ms_die_on_skipped_message = true;
-  double ms_initial_backoff = .2;
-  double ms_max_backoff = 15.0;
-  std::chrono::milliseconds threadpool_empty_queue_max_wait = 100ms;
-  size_t osd_client_message_size_cap = 500ULL << 20;
-} conf;
-}
index d0c677df9a55068637761ea23767b85c0d16accf..464a074b14e0a000bf6094b0cfc236e59f058e1f 100644 (file)
@@ -15,7 +15,6 @@
 #include "crimson/auth/AuthClient.h"
 #include "crimson/auth/AuthServer.h"
 #include "crimson/common/log.h"
-#include "Config.h"
 #include "Dispatcher.h"
 #include "Errors.h"
 #include "Socket.h"
@@ -25,6 +24,8 @@
 WRITE_RAW_ENCODER(ceph_msg_connect);
 WRITE_RAW_ENCODER(ceph_msg_connect_reply);
 
+using crimson::common::local_conf;
+
 std::ostream& operator<<(std::ostream& out, const ceph_msg_connect& c)
 {
   return out << "connect{features=" << std::hex << c.features << std::dec
@@ -517,14 +518,14 @@ bool ProtocolV1::require_auth_feature() const
   if (h.connect.authorizer_protocol != CEPH_AUTH_CEPHX) {
     return false;
   }
-  if (conf.cephx_require_signatures) {
+  if (local_conf()->cephx_require_signatures) {
     return true;
   }
   if (h.connect.host_type == CEPH_ENTITY_TYPE_OSD ||
       h.connect.host_type == CEPH_ENTITY_TYPE_MDS) {
-    return conf.cephx_cluster_require_signatures;
+    return local_conf()->cephx_cluster_require_signatures;
   } else {
-    return conf.cephx_service_require_signatures;
+    return local_conf()->cephx_service_require_signatures;
   }
 }
 
index 3f590e66b0d57a7113e18659fa298a992ea82cdf..516a05ce6e4ad5915c0b9ee17943795012411086 100644 (file)
@@ -12,7 +12,6 @@
 #include "crimson/auth/AuthServer.h"
 #include "crimson/common/formatter.h"
 
-#include "Config.h"
 #include "Dispatcher.h"
 #include "Errors.h"
 #include "Socket.h"
@@ -24,6 +23,7 @@
 #endif
 
 using namespace ceph::msgr::v2;
+using crimson::common::local_conf;
 
 namespace {
 
@@ -1960,14 +1960,14 @@ seastar::future<> ProtocolV2::read_message(utime_t throttle_stamp)
       logger().error("{} got old message {} <= {} {} {}, discarding",
                      conn, message->get_seq(), cur_seq, message, *message);
       if (HAVE_FEATURE(conn.features, RECONNECT_SEQ) &&
-          conf.ms_die_on_old_message) {
+          local_conf()->ms_die_on_old_message) {
         ceph_assert(0 == "old msgs despite reconnect_seq feature");
       }
       return;
     } else if (message->get_seq() > cur_seq + 1) {
       logger().error("{} missed message? skipped from seq {} to {}",
                      conn, cur_seq, message->get_seq());
-      if (conf.ms_die_on_skipped_message) {
+      if (local_conf()->ms_die_on_skipped_message) {
         ceph_assert(0 == "skipped incoming seq");
       }
     }
@@ -2111,11 +2111,11 @@ void ProtocolV2::execute_wait(bool max_backoff)
   gated_execute("execute_wait", [this, max_backoff] {
     double backoff = protocol_timer.last_dur();
     if (max_backoff) {
-      backoff = conf.ms_max_backoff;
+      backoff = local_conf().get_val<double>("ms_max_backoff");
     } else if (backoff > 0) {
-      backoff = std::min(conf.ms_max_backoff, 2 * backoff);
+      backoff = std::min(local_conf().get_val<double>("ms_max_backoff"), 2 * backoff);
     } else {
-      backoff = conf.ms_initial_backoff;
+      backoff = local_conf().get_val<double>("ms_initial_backoff");
     }
     return protocol_timer.backoff(backoff).then([this] {
       if (unlikely(state != state_t::WAIT)) {
index 4e80b8104d174520e154f961658c31eec8f968be..1595b75148de8f17e26e547283a577ed99efa5f1 100644 (file)
@@ -14,7 +14,6 @@
 
 #include "SocketConnection.h"
 
-#include "Config.h"
 #include "ProtocolV1.h"
 #include "ProtocolV2.h"
 #include "SocketMessenger.h"
@@ -24,6 +23,7 @@
 #endif
 
 using namespace crimson::net;
+using crimson::common::local_conf;
 
 SocketConnection::SocketConnection(SocketMessenger& messenger,
                                    Dispatcher& dispatcher,
@@ -97,12 +97,12 @@ bool SocketConnection::update_rx_seq(seq_num_t seq)
 {
   if (seq <= in_seq) {
     if (HAVE_FEATURE(features, RECONNECT_SEQ) &&
-        conf.ms_die_on_old_message) {
+        local_conf()->ms_die_on_old_message) {
       ceph_abort_msg("old msgs despite reconnect_seq feature");
     }
     return false;
   } else if (seq > in_seq + 1) {
-    if (conf.ms_die_on_skipped_message) {
+    if (local_conf()->ms_die_on_skipped_message) {
       ceph_abort_msg("skipped incoming seq");
     }
     return false;
index b4caaab2b570543db1d2b96eeca1a81f7eeac225..9914d8749f04142de781ece9710f0b678abc4a40 100644 (file)
@@ -1,10 +1,13 @@
 #include "ThreadPool.h"
 
+#include <chrono>
 #include <pthread.h>
-#include "crimson/net/Config.h"
 #include "include/intarith.h"
 
 #include "include/ceph_assert.h"
+#include "crimson/common/config_proxy.h"
+
+using crimson::common::local_conf;
 
 namespace crimson::thread {
 
@@ -14,10 +17,11 @@ ThreadPool::ThreadPool(size_t n_threads,
   : queue_size{round_up_to(queue_sz, seastar::smp::count)},
     pending{queue_size}
 {
+  auto queue_max_wait = std::chrono::seconds(local_conf()->threadpool_empty_queue_max_wait);
   for (size_t i = 0; i < n_threads; i++) {
-    threads.emplace_back([this, cpu_id] {
+    threads.emplace_back([this, cpu_id, queue_max_wait] {
       pin(cpu_id);
-      loop();
+      loop(queue_max_wait);
     });
   }
 }
@@ -39,14 +43,13 @@ void ThreadPool::pin(unsigned cpu_id)
   ceph_assert(r == 0);
 }
 
-void ThreadPool::loop()
+void ThreadPool::loop(std::chrono::milliseconds queue_max_wait)
 {
   for (;;) {
     WorkItem* work_item = nullptr;
     {
       std::unique_lock lock{mutex};
-      cond.wait_for(lock,
-                    crimson::net::conf.threadpool_empty_queue_max_wait,
+      cond.wait_for(lock, queue_max_wait,
                     [this, &work_item] {
         return pending.pop(work_item) || is_stopping();
       });
index 1f44715615a234dc612905e506eda7b882ec8ee7..8cf61799aab6ebaf2b416ddd7754040ed8c4e39e 100644 (file)
@@ -81,7 +81,7 @@ class ThreadPool {
   const size_t queue_size;
   boost::lockfree::queue<WorkItem*> pending;
 
-  void loop();
+  void loop(std::chrono::milliseconds queue_max_wait);
   bool is_stopping() const {
     return stopping.load(std::memory_order_relaxed);
   }
index 319f68ea9367185a8eac0c7358da1d2e9c834504..cefe6e8dc7929fe3b1169f25473de59698465c9e 100644 (file)
@@ -2,11 +2,11 @@
 
 #include "auth/Auth.h"
 #include "messages/MPing.h"
+#include "common/ceph_argparse.h"
 #include "crimson/auth/DummyAuth.h"
 #include "crimson/net/Connection.h"
 #include "crimson/net/Dispatcher.h"
 #include "crimson/net/Messenger.h"
-#include "crimson/net/Config.h"
 #include "crimson/thread/Throttle.h"
 
 #include <seastar/core/alien.hh>
@@ -16,6 +16,7 @@
 #include <seastar/core/posix.hh>
 #include <seastar/core/reactor.hh>
 
+using crimson::common::local_conf;
 
 enum class echo_role {
   as_server,
@@ -61,7 +62,7 @@ struct Server {
     }
   } dispatcher;
   Server(crimson::net::MessengerRef msgr)
-    : byte_throttler(crimson::net::conf.osd_client_message_size_cap),
+    : byte_throttler(local_conf()->osd_client_message_size_cap),
       msgr{msgr}
   {
     msgr->set_crc_header();
@@ -85,7 +86,7 @@ struct Client {
     }
   } dispatcher;
   Client(crimson::net::MessengerRef msgr)
-    : byte_throttler(crimson::net::conf.osd_client_message_size_cap),
+    : byte_throttler(local_conf()->osd_client_message_size_cap),
       msgr{msgr}
   {
     msgr->set_crc_header();
@@ -121,7 +122,17 @@ public:
 
   void run(seastar::app_template& app, int argc, char** argv) {
     app.run(argc, argv, [this] {
-      return seastar::now().then([this] {
+      std::vector<const char*> args;
+      std::string cluster;
+      std::string conf_file_list;
+      auto init_params = ceph_argparse_early_args(args,
+                                                CEPH_ENTITY_TYPE_CLIENT,
+                                                &cluster,
+                                                &conf_file_list);
+      return crimson::common::sharded_conf().start(init_params.name, cluster)
+      .then([conf_file_list] {
+        return local_conf().parse_config_files(conf_file_list);
+      }).then([this] {
         return set_seastar_ready();
       }).then([on_end = std::move(on_end)] () mutable {
         // seastar: let me know once i am free to leave.
@@ -133,6 +144,8 @@ public:
             return seastar::make_ready_future<>();
           });
         });
+      }).then([]() {
+        return crimson::common::sharded_conf().stop();
       }).handle_exception([](auto ep) {
         std::cerr << "Error: " << ep << std::endl;
       }).finally([] {
index 06a126426256a8357a0f7d91cf4560cc8a4abdbe..d0eddf56e9a5b1b0b19aed219ff2fa939928e47c 100644 (file)
@@ -1,3 +1,4 @@
+#include "common/ceph_argparse.h"
 #include "common/ceph_time.h"
 #include "messages/MPing.h"
 #include "messages/MCommand.h"
@@ -7,7 +8,6 @@
 #include "crimson/auth/DummyAuth.h"
 #include "crimson/common/log.h"
 #include "crimson/net/Connection.h"
-#include "crimson/net/Config.h"
 #include "crimson/net/Dispatcher.h"
 #include "crimson/net/Messenger.h"
 #include "crimson/net/Interceptor.h"
@@ -26,6 +26,7 @@
 #include "test_cmds.h"
 
 namespace bpo = boost::program_options;
+using crimson::common::local_conf;
 
 namespace {
 
@@ -3541,35 +3542,49 @@ int main(int argc, char** argv)
     ("v2-testpeer-islocal", bpo::value<bool>()->default_value(true),
      "create a local crimson testpeer, or connect to a remote testpeer");
   return app.run(argc, argv, [&app] {
-    auto&& config = app.configuration();
-    verbose = config["verbose"].as<bool>();
-    auto rounds = config["rounds"].as<unsigned>();
-    auto keepalive_ratio = config["keepalive-ratio"].as<double>();
-    entity_addr_t v2_test_addr;
-    ceph_assert(v2_test_addr.parse(
+    std::vector<const char*> args;
+    std::string cluster;
+    std::string conf_file_list;
+    auto init_params = ceph_argparse_early_args(args,
+                                                CEPH_ENTITY_TYPE_CLIENT,
+                                                &cluster,
+                                                &conf_file_list);
+    return crimson::common::sharded_conf().start(init_params.name, cluster)
+    .then([conf_file_list] {
+      return local_conf().parse_config_files(conf_file_list);
+    }).then([&app] {
+      auto&& config = app.configuration();
+      verbose = config["verbose"].as<bool>();
+      auto rounds = config["rounds"].as<unsigned>();
+      auto keepalive_ratio = config["keepalive-ratio"].as<double>();
+      entity_addr_t v2_test_addr;
+      ceph_assert(v2_test_addr.parse(
           config["v2-test-addr"].as<std::string>().c_str(), nullptr));
-    entity_addr_t v2_testpeer_addr;
-    ceph_assert(v2_testpeer_addr.parse(
+      entity_addr_t v2_testpeer_addr;
+      ceph_assert(v2_testpeer_addr.parse(
           config["v2-testpeer-addr"].as<std::string>().c_str(), nullptr));
-    auto v2_testpeer_islocal = config["v2-testpeer-islocal"].as<bool>();
-    return test_echo(rounds, keepalive_ratio, false)
-    .then([rounds, keepalive_ratio] {
-      return test_echo(rounds, keepalive_ratio, true);
-    }).then([] {
-      return test_concurrent_dispatch(false);
-    }).then([] {
-      return test_concurrent_dispatch(true);
-    }).then([] {
-      return test_preemptive_shutdown(false);
-    }).then([] {
-      return test_preemptive_shutdown(true);
-    }).then([v2_test_addr, v2_testpeer_addr, v2_testpeer_islocal] {
-      return test_v2_protocol(v2_test_addr, v2_testpeer_addr, v2_testpeer_islocal);
+      auto v2_testpeer_islocal = config["v2-testpeer-islocal"].as<bool>();
+      return test_echo(rounds, keepalive_ratio, false)
+      .then([rounds, keepalive_ratio] {
+        return test_echo(rounds, keepalive_ratio, true);
+      }).then([] {
+        return test_concurrent_dispatch(false);
+      }).then([] {
+        return test_concurrent_dispatch(true);
+      }).then([] {
+        return test_preemptive_shutdown(false);
+      }).then([] {
+        return test_preemptive_shutdown(true);
+      }).then([v2_test_addr, v2_testpeer_addr, v2_testpeer_islocal] {
+        return test_v2_protocol(v2_test_addr, v2_testpeer_addr, v2_testpeer_islocal);
+      }).then([] {
+        std::cout << "All tests succeeded" << std::endl;
+        // Seastar has bugs to have events undispatched during shutdown,
+        // which will result in memory leak and thus fail LeakSanitizer.
+        return seastar::sleep(100ms);
+      });
     }).then([] {
-      std::cout << "All tests succeeded" << std::endl;
-      // Seastar has bugs to have events undispatched during shutdown,
-      // which will result in memory leak and thus fail LeakSanitizer.
-      return seastar::sleep(100ms);
+      return crimson::common::sharded_conf().stop();
     }).handle_exception([] (auto eptr) {
       std::cout << "Test failure" << std::endl;
       return seastar::make_exception_future<>(eptr);
index b3dd719e29f2a3dc9c185630b5a51a4933311e5d..c3ab6fdb0e1b85b7cf301b9018c36e857038c4e8 100644 (file)
@@ -2,10 +2,14 @@
 #include <iostream>
 #include <numeric>
 #include <seastar/core/app-template.hh>
+#include "common/ceph_argparse.h"
+#include "crimson/common/config_proxy.h"
 #include "crimson/thread/ThreadPool.h"
+#include "include/msgr.h"
 
 using namespace std::chrono_literals;
 using ThreadPool = crimson::thread::ThreadPool;
+using crimson::common::local_conf;
 
 seastar::future<> test_accumulate(ThreadPool& tp) {
   static constexpr auto N = 5;
@@ -33,20 +37,35 @@ seastar::future<> test_void_return(ThreadPool& tp) {
 
 int main(int argc, char** argv)
 {
-  ThreadPool tp{2, 128, 0};
+  std::unique_ptr<crimson::thread::ThreadPool> tp;
   seastar::app_template app;
   return app.run(argc, argv, [&tp] {
-      return tp.start().then([&tp] {
-          return test_accumulate(tp);
-        }).then([&tp] {
-          return test_void_return(tp);
-        }).handle_exception([](auto e) {
-          std::cerr << "Error: " << e << std::endl;
-          seastar::engine().exit(1);
-        }).finally([&tp] {
-          return tp.stop();
-        });
+    std::vector<const char*> args;
+    std::string cluster;
+    std::string conf_file_list;
+    auto init_params = ceph_argparse_early_args(args,
+                                                CEPH_ENTITY_TYPE_CLIENT,
+                                                &cluster,
+                                                &conf_file_list);
+    return crimson::common::sharded_conf().start(init_params.name, cluster)
+    .then([conf_file_list] {
+      return local_conf().parse_config_files(conf_file_list);
+    }).then([&tp] {
+      tp = std::make_unique<crimson::thread::ThreadPool>(2, 128, 0);
+      return tp->start().then([&tp] {
+        return test_accumulate(*tp);
+      }).then([&tp] {
+        return test_void_return(*tp);
+      }).handle_exception([](auto e) {
+        std::cerr << "Error: " << e << std::endl;
+        seastar::engine().exit(1);
+      }).finally([&tp] {
+        return tp->stop();
       });
+    }).finally([] {
+      return crimson::common::sharded_conf().stop();
+    });
+  });
 }
 
 /*