From fa27d7b64d6bad0f5fe0e61d0305fd08c3a70a62 Mon Sep 17 00:00:00 2001 From: chunmei-liu Date: Mon, 13 Apr 2020 21:56:10 -0700 Subject: [PATCH] crimson/net: remove hardcoded msgr configuration remove crimson/net/Config.h and changed corresponding unit test case. Signed-off-by: chunmei-liu --- src/crimson/net/Config.h | 26 ----------- src/crimson/net/ProtocolV1.cc | 9 ++-- src/crimson/net/ProtocolV2.cc | 12 ++--- src/crimson/net/SocketConnection.cc | 6 +-- src/crimson/thread/ThreadPool.cc | 15 +++--- src/crimson/thread/ThreadPool.h | 2 +- src/test/crimson/test_alien_echo.cc | 21 +++++++-- src/test/crimson/test_messenger.cc | 69 +++++++++++++++++----------- src/test/crimson/test_thread_pool.cc | 41 ++++++++++++----- 9 files changed, 113 insertions(+), 88 deletions(-) delete mode 100644 src/crimson/net/Config.h diff --git a/src/crimson/net/Config.h b/src/crimson/net/Config.h deleted file mode 100644 index 229b68913db01..0000000000000 --- a/src/crimson/net/Config.h +++ /dev/null @@ -1,26 +0,0 @@ -// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- -// vim: ts=8 sw=2 smarttab - -// XXX: a poor man's md_config_t -#pragma once - -#include "include/msgr.h" -#include - -namespace crimson::net { - -using namespace std::literals::chrono_literals; - -constexpr struct simple_md_config_t { - uint32_t host_type = CEPH_ENTITY_TYPE_OSD; - bool cephx_require_signatures = false; - bool cephx_cluster_require_signatures = false; - bool cephx_service_require_signatures = false; - bool ms_die_on_old_message = true; - bool ms_die_on_skipped_message = true; - double ms_initial_backoff = .2; - double ms_max_backoff = 15.0; - std::chrono::milliseconds threadpool_empty_queue_max_wait = 100ms; - size_t osd_client_message_size_cap = 500ULL << 20; -} conf; -} diff --git a/src/crimson/net/ProtocolV1.cc b/src/crimson/net/ProtocolV1.cc index d0c677df9a550..464a074b14e0a 100644 --- a/src/crimson/net/ProtocolV1.cc +++ b/src/crimson/net/ProtocolV1.cc @@ -15,7 +15,6 @@ #include "crimson/auth/AuthClient.h" #include "crimson/auth/AuthServer.h" #include "crimson/common/log.h" -#include "Config.h" #include "Dispatcher.h" #include "Errors.h" #include "Socket.h" @@ -25,6 +24,8 @@ WRITE_RAW_ENCODER(ceph_msg_connect); WRITE_RAW_ENCODER(ceph_msg_connect_reply); +using crimson::common::local_conf; + std::ostream& operator<<(std::ostream& out, const ceph_msg_connect& c) { return out << "connect{features=" << std::hex << c.features << std::dec @@ -517,14 +518,14 @@ bool ProtocolV1::require_auth_feature() const if (h.connect.authorizer_protocol != CEPH_AUTH_CEPHX) { return false; } - if (conf.cephx_require_signatures) { + if (local_conf()->cephx_require_signatures) { return true; } if (h.connect.host_type == CEPH_ENTITY_TYPE_OSD || h.connect.host_type == CEPH_ENTITY_TYPE_MDS) { - return conf.cephx_cluster_require_signatures; + return local_conf()->cephx_cluster_require_signatures; } else { - return conf.cephx_service_require_signatures; + return local_conf()->cephx_service_require_signatures; } } diff --git a/src/crimson/net/ProtocolV2.cc b/src/crimson/net/ProtocolV2.cc index 3f590e66b0d57..516a05ce6e4ad 100644 --- a/src/crimson/net/ProtocolV2.cc +++ b/src/crimson/net/ProtocolV2.cc @@ -12,7 +12,6 @@ #include "crimson/auth/AuthServer.h" #include "crimson/common/formatter.h" -#include "Config.h" #include "Dispatcher.h" #include "Errors.h" #include "Socket.h" @@ -24,6 +23,7 @@ #endif using namespace ceph::msgr::v2; +using crimson::common::local_conf; namespace { @@ -1960,14 +1960,14 @@ seastar::future<> ProtocolV2::read_message(utime_t throttle_stamp) logger().error("{} got old message {} <= {} {} {}, discarding", conn, message->get_seq(), cur_seq, message, *message); if (HAVE_FEATURE(conn.features, RECONNECT_SEQ) && - conf.ms_die_on_old_message) { + local_conf()->ms_die_on_old_message) { ceph_assert(0 == "old msgs despite reconnect_seq feature"); } return; } else if (message->get_seq() > cur_seq + 1) { logger().error("{} missed message? skipped from seq {} to {}", conn, cur_seq, message->get_seq()); - if (conf.ms_die_on_skipped_message) { + if (local_conf()->ms_die_on_skipped_message) { ceph_assert(0 == "skipped incoming seq"); } } @@ -2111,11 +2111,11 @@ void ProtocolV2::execute_wait(bool max_backoff) gated_execute("execute_wait", [this, max_backoff] { double backoff = protocol_timer.last_dur(); if (max_backoff) { - backoff = conf.ms_max_backoff; + backoff = local_conf().get_val("ms_max_backoff"); } else if (backoff > 0) { - backoff = std::min(conf.ms_max_backoff, 2 * backoff); + backoff = std::min(local_conf().get_val("ms_max_backoff"), 2 * backoff); } else { - backoff = conf.ms_initial_backoff; + backoff = local_conf().get_val("ms_initial_backoff"); } return protocol_timer.backoff(backoff).then([this] { if (unlikely(state != state_t::WAIT)) { diff --git a/src/crimson/net/SocketConnection.cc b/src/crimson/net/SocketConnection.cc index 4e80b8104d174..1595b75148de8 100644 --- a/src/crimson/net/SocketConnection.cc +++ b/src/crimson/net/SocketConnection.cc @@ -14,7 +14,6 @@ #include "SocketConnection.h" -#include "Config.h" #include "ProtocolV1.h" #include "ProtocolV2.h" #include "SocketMessenger.h" @@ -24,6 +23,7 @@ #endif using namespace crimson::net; +using crimson::common::local_conf; SocketConnection::SocketConnection(SocketMessenger& messenger, Dispatcher& dispatcher, @@ -97,12 +97,12 @@ bool SocketConnection::update_rx_seq(seq_num_t seq) { if (seq <= in_seq) { if (HAVE_FEATURE(features, RECONNECT_SEQ) && - conf.ms_die_on_old_message) { + local_conf()->ms_die_on_old_message) { ceph_abort_msg("old msgs despite reconnect_seq feature"); } return false; } else if (seq > in_seq + 1) { - if (conf.ms_die_on_skipped_message) { + if (local_conf()->ms_die_on_skipped_message) { ceph_abort_msg("skipped incoming seq"); } return false; diff --git a/src/crimson/thread/ThreadPool.cc b/src/crimson/thread/ThreadPool.cc index b4caaab2b5705..9914d8749f041 100644 --- a/src/crimson/thread/ThreadPool.cc +++ b/src/crimson/thread/ThreadPool.cc @@ -1,10 +1,13 @@ #include "ThreadPool.h" +#include #include -#include "crimson/net/Config.h" #include "include/intarith.h" #include "include/ceph_assert.h" +#include "crimson/common/config_proxy.h" + +using crimson::common::local_conf; namespace crimson::thread { @@ -14,10 +17,11 @@ ThreadPool::ThreadPool(size_t n_threads, : queue_size{round_up_to(queue_sz, seastar::smp::count)}, pending{queue_size} { + auto queue_max_wait = std::chrono::seconds(local_conf()->threadpool_empty_queue_max_wait); for (size_t i = 0; i < n_threads; i++) { - threads.emplace_back([this, cpu_id] { + threads.emplace_back([this, cpu_id, queue_max_wait] { pin(cpu_id); - loop(); + loop(queue_max_wait); }); } } @@ -39,14 +43,13 @@ void ThreadPool::pin(unsigned cpu_id) ceph_assert(r == 0); } -void ThreadPool::loop() +void ThreadPool::loop(std::chrono::milliseconds queue_max_wait) { for (;;) { WorkItem* work_item = nullptr; { std::unique_lock lock{mutex}; - cond.wait_for(lock, - crimson::net::conf.threadpool_empty_queue_max_wait, + cond.wait_for(lock, queue_max_wait, [this, &work_item] { return pending.pop(work_item) || is_stopping(); }); diff --git a/src/crimson/thread/ThreadPool.h b/src/crimson/thread/ThreadPool.h index 1f44715615a23..8cf61799aab6e 100644 --- a/src/crimson/thread/ThreadPool.h +++ b/src/crimson/thread/ThreadPool.h @@ -81,7 +81,7 @@ class ThreadPool { const size_t queue_size; boost::lockfree::queue pending; - void loop(); + void loop(std::chrono::milliseconds queue_max_wait); bool is_stopping() const { return stopping.load(std::memory_order_relaxed); } diff --git a/src/test/crimson/test_alien_echo.cc b/src/test/crimson/test_alien_echo.cc index 319f68ea93671..cefe6e8dc7929 100644 --- a/src/test/crimson/test_alien_echo.cc +++ b/src/test/crimson/test_alien_echo.cc @@ -2,11 +2,11 @@ #include "auth/Auth.h" #include "messages/MPing.h" +#include "common/ceph_argparse.h" #include "crimson/auth/DummyAuth.h" #include "crimson/net/Connection.h" #include "crimson/net/Dispatcher.h" #include "crimson/net/Messenger.h" -#include "crimson/net/Config.h" #include "crimson/thread/Throttle.h" #include @@ -16,6 +16,7 @@ #include #include +using crimson::common::local_conf; enum class echo_role { as_server, @@ -61,7 +62,7 @@ struct Server { } } dispatcher; Server(crimson::net::MessengerRef msgr) - : byte_throttler(crimson::net::conf.osd_client_message_size_cap), + : byte_throttler(local_conf()->osd_client_message_size_cap), msgr{msgr} { msgr->set_crc_header(); @@ -85,7 +86,7 @@ struct Client { } } dispatcher; Client(crimson::net::MessengerRef msgr) - : byte_throttler(crimson::net::conf.osd_client_message_size_cap), + : byte_throttler(local_conf()->osd_client_message_size_cap), msgr{msgr} { msgr->set_crc_header(); @@ -121,7 +122,17 @@ public: void run(seastar::app_template& app, int argc, char** argv) { app.run(argc, argv, [this] { - return seastar::now().then([this] { + std::vector args; + std::string cluster; + std::string conf_file_list; + auto init_params = ceph_argparse_early_args(args, + CEPH_ENTITY_TYPE_CLIENT, + &cluster, + &conf_file_list); + return crimson::common::sharded_conf().start(init_params.name, cluster) + .then([conf_file_list] { + return local_conf().parse_config_files(conf_file_list); + }).then([this] { return set_seastar_ready(); }).then([on_end = std::move(on_end)] () mutable { // seastar: let me know once i am free to leave. @@ -133,6 +144,8 @@ public: return seastar::make_ready_future<>(); }); }); + }).then([]() { + return crimson::common::sharded_conf().stop(); }).handle_exception([](auto ep) { std::cerr << "Error: " << ep << std::endl; }).finally([] { diff --git a/src/test/crimson/test_messenger.cc b/src/test/crimson/test_messenger.cc index 06a126426256a..d0eddf56e9a5b 100644 --- a/src/test/crimson/test_messenger.cc +++ b/src/test/crimson/test_messenger.cc @@ -1,3 +1,4 @@ +#include "common/ceph_argparse.h" #include "common/ceph_time.h" #include "messages/MPing.h" #include "messages/MCommand.h" @@ -7,7 +8,6 @@ #include "crimson/auth/DummyAuth.h" #include "crimson/common/log.h" #include "crimson/net/Connection.h" -#include "crimson/net/Config.h" #include "crimson/net/Dispatcher.h" #include "crimson/net/Messenger.h" #include "crimson/net/Interceptor.h" @@ -26,6 +26,7 @@ #include "test_cmds.h" namespace bpo = boost::program_options; +using crimson::common::local_conf; namespace { @@ -3541,35 +3542,49 @@ int main(int argc, char** argv) ("v2-testpeer-islocal", bpo::value()->default_value(true), "create a local crimson testpeer, or connect to a remote testpeer"); return app.run(argc, argv, [&app] { - auto&& config = app.configuration(); - verbose = config["verbose"].as(); - auto rounds = config["rounds"].as(); - auto keepalive_ratio = config["keepalive-ratio"].as(); - entity_addr_t v2_test_addr; - ceph_assert(v2_test_addr.parse( + std::vector args; + std::string cluster; + std::string conf_file_list; + auto init_params = ceph_argparse_early_args(args, + CEPH_ENTITY_TYPE_CLIENT, + &cluster, + &conf_file_list); + return crimson::common::sharded_conf().start(init_params.name, cluster) + .then([conf_file_list] { + return local_conf().parse_config_files(conf_file_list); + }).then([&app] { + auto&& config = app.configuration(); + verbose = config["verbose"].as(); + auto rounds = config["rounds"].as(); + auto keepalive_ratio = config["keepalive-ratio"].as(); + entity_addr_t v2_test_addr; + ceph_assert(v2_test_addr.parse( config["v2-test-addr"].as().c_str(), nullptr)); - entity_addr_t v2_testpeer_addr; - ceph_assert(v2_testpeer_addr.parse( + entity_addr_t v2_testpeer_addr; + ceph_assert(v2_testpeer_addr.parse( config["v2-testpeer-addr"].as().c_str(), nullptr)); - auto v2_testpeer_islocal = config["v2-testpeer-islocal"].as(); - return test_echo(rounds, keepalive_ratio, false) - .then([rounds, keepalive_ratio] { - return test_echo(rounds, keepalive_ratio, true); - }).then([] { - return test_concurrent_dispatch(false); - }).then([] { - return test_concurrent_dispatch(true); - }).then([] { - return test_preemptive_shutdown(false); - }).then([] { - return test_preemptive_shutdown(true); - }).then([v2_test_addr, v2_testpeer_addr, v2_testpeer_islocal] { - return test_v2_protocol(v2_test_addr, v2_testpeer_addr, v2_testpeer_islocal); + auto v2_testpeer_islocal = config["v2-testpeer-islocal"].as(); + return test_echo(rounds, keepalive_ratio, false) + .then([rounds, keepalive_ratio] { + return test_echo(rounds, keepalive_ratio, true); + }).then([] { + return test_concurrent_dispatch(false); + }).then([] { + return test_concurrent_dispatch(true); + }).then([] { + return test_preemptive_shutdown(false); + }).then([] { + return test_preemptive_shutdown(true); + }).then([v2_test_addr, v2_testpeer_addr, v2_testpeer_islocal] { + return test_v2_protocol(v2_test_addr, v2_testpeer_addr, v2_testpeer_islocal); + }).then([] { + std::cout << "All tests succeeded" << std::endl; + // Seastar has bugs to have events undispatched during shutdown, + // which will result in memory leak and thus fail LeakSanitizer. + return seastar::sleep(100ms); + }); }).then([] { - std::cout << "All tests succeeded" << std::endl; - // Seastar has bugs to have events undispatched during shutdown, - // which will result in memory leak and thus fail LeakSanitizer. - return seastar::sleep(100ms); + return crimson::common::sharded_conf().stop(); }).handle_exception([] (auto eptr) { std::cout << "Test failure" << std::endl; return seastar::make_exception_future<>(eptr); diff --git a/src/test/crimson/test_thread_pool.cc b/src/test/crimson/test_thread_pool.cc index b3dd719e29f2a..c3ab6fdb0e1b8 100644 --- a/src/test/crimson/test_thread_pool.cc +++ b/src/test/crimson/test_thread_pool.cc @@ -2,10 +2,14 @@ #include #include #include +#include "common/ceph_argparse.h" +#include "crimson/common/config_proxy.h" #include "crimson/thread/ThreadPool.h" +#include "include/msgr.h" using namespace std::chrono_literals; using ThreadPool = crimson::thread::ThreadPool; +using crimson::common::local_conf; seastar::future<> test_accumulate(ThreadPool& tp) { static constexpr auto N = 5; @@ -33,20 +37,35 @@ seastar::future<> test_void_return(ThreadPool& tp) { int main(int argc, char** argv) { - ThreadPool tp{2, 128, 0}; + std::unique_ptr tp; seastar::app_template app; return app.run(argc, argv, [&tp] { - return tp.start().then([&tp] { - return test_accumulate(tp); - }).then([&tp] { - return test_void_return(tp); - }).handle_exception([](auto e) { - std::cerr << "Error: " << e << std::endl; - seastar::engine().exit(1); - }).finally([&tp] { - return tp.stop(); - }); + std::vector args; + std::string cluster; + std::string conf_file_list; + auto init_params = ceph_argparse_early_args(args, + CEPH_ENTITY_TYPE_CLIENT, + &cluster, + &conf_file_list); + return crimson::common::sharded_conf().start(init_params.name, cluster) + .then([conf_file_list] { + return local_conf().parse_config_files(conf_file_list); + }).then([&tp] { + tp = std::make_unique(2, 128, 0); + return tp->start().then([&tp] { + return test_accumulate(*tp); + }).then([&tp] { + return test_void_return(*tp); + }).handle_exception([](auto e) { + std::cerr << "Error: " << e << std::endl; + seastar::engine().exit(1); + }).finally([&tp] { + return tp->stop(); }); + }).finally([] { + return crimson::common::sharded_conf().stop(); + }); + }); } /* -- 2.39.5