remove crimson/net/Config.h and changed corresponding unit test case.
Signed-off-by: chunmei-liu <chunmei.liu@intel.com>
+++ /dev/null
-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
-// vim: ts=8 sw=2 smarttab
-
-// XXX: a poor man's md_config_t
-#pragma once
-
-#include "include/msgr.h"
-#include <chrono>
-
-namespace crimson::net {
-
-using namespace std::literals::chrono_literals;
-
-constexpr struct simple_md_config_t {
- uint32_t host_type = CEPH_ENTITY_TYPE_OSD;
- bool cephx_require_signatures = false;
- bool cephx_cluster_require_signatures = false;
- bool cephx_service_require_signatures = false;
- bool ms_die_on_old_message = true;
- bool ms_die_on_skipped_message = true;
- double ms_initial_backoff = .2;
- double ms_max_backoff = 15.0;
- std::chrono::milliseconds threadpool_empty_queue_max_wait = 100ms;
- size_t osd_client_message_size_cap = 500ULL << 20;
-} conf;
-}
#include "crimson/auth/AuthClient.h"
#include "crimson/auth/AuthServer.h"
#include "crimson/common/log.h"
-#include "Config.h"
#include "Dispatcher.h"
#include "Errors.h"
#include "Socket.h"
WRITE_RAW_ENCODER(ceph_msg_connect);
WRITE_RAW_ENCODER(ceph_msg_connect_reply);
+using crimson::common::local_conf;
+
std::ostream& operator<<(std::ostream& out, const ceph_msg_connect& c)
{
return out << "connect{features=" << std::hex << c.features << std::dec
if (h.connect.authorizer_protocol != CEPH_AUTH_CEPHX) {
return false;
}
- if (conf.cephx_require_signatures) {
+ if (local_conf()->cephx_require_signatures) {
return true;
}
if (h.connect.host_type == CEPH_ENTITY_TYPE_OSD ||
h.connect.host_type == CEPH_ENTITY_TYPE_MDS) {
- return conf.cephx_cluster_require_signatures;
+ return local_conf()->cephx_cluster_require_signatures;
} else {
- return conf.cephx_service_require_signatures;
+ return local_conf()->cephx_service_require_signatures;
}
}
#include "crimson/auth/AuthServer.h"
#include "crimson/common/formatter.h"
-#include "Config.h"
#include "Dispatcher.h"
#include "Errors.h"
#include "Socket.h"
#endif
using namespace ceph::msgr::v2;
+using crimson::common::local_conf;
namespace {
logger().error("{} got old message {} <= {} {} {}, discarding",
conn, message->get_seq(), cur_seq, message, *message);
if (HAVE_FEATURE(conn.features, RECONNECT_SEQ) &&
- conf.ms_die_on_old_message) {
+ local_conf()->ms_die_on_old_message) {
ceph_assert(0 == "old msgs despite reconnect_seq feature");
}
return;
} else if (message->get_seq() > cur_seq + 1) {
logger().error("{} missed message? skipped from seq {} to {}",
conn, cur_seq, message->get_seq());
- if (conf.ms_die_on_skipped_message) {
+ if (local_conf()->ms_die_on_skipped_message) {
ceph_assert(0 == "skipped incoming seq");
}
}
gated_execute("execute_wait", [this, max_backoff] {
double backoff = protocol_timer.last_dur();
if (max_backoff) {
- backoff = conf.ms_max_backoff;
+ backoff = local_conf().get_val<double>("ms_max_backoff");
} else if (backoff > 0) {
- backoff = std::min(conf.ms_max_backoff, 2 * backoff);
+ backoff = std::min(local_conf().get_val<double>("ms_max_backoff"), 2 * backoff);
} else {
- backoff = conf.ms_initial_backoff;
+ backoff = local_conf().get_val<double>("ms_initial_backoff");
}
return protocol_timer.backoff(backoff).then([this] {
if (unlikely(state != state_t::WAIT)) {
#include "SocketConnection.h"
-#include "Config.h"
#include "ProtocolV1.h"
#include "ProtocolV2.h"
#include "SocketMessenger.h"
#endif
using namespace crimson::net;
+using crimson::common::local_conf;
SocketConnection::SocketConnection(SocketMessenger& messenger,
Dispatcher& dispatcher,
{
if (seq <= in_seq) {
if (HAVE_FEATURE(features, RECONNECT_SEQ) &&
- conf.ms_die_on_old_message) {
+ local_conf()->ms_die_on_old_message) {
ceph_abort_msg("old msgs despite reconnect_seq feature");
}
return false;
} else if (seq > in_seq + 1) {
- if (conf.ms_die_on_skipped_message) {
+ if (local_conf()->ms_die_on_skipped_message) {
ceph_abort_msg("skipped incoming seq");
}
return false;
#include "ThreadPool.h"
+#include <chrono>
#include <pthread.h>
-#include "crimson/net/Config.h"
#include "include/intarith.h"
#include "include/ceph_assert.h"
+#include "crimson/common/config_proxy.h"
+
+using crimson::common::local_conf;
namespace crimson::thread {
: queue_size{round_up_to(queue_sz, seastar::smp::count)},
pending{queue_size}
{
+ auto queue_max_wait = std::chrono::seconds(local_conf()->threadpool_empty_queue_max_wait);
for (size_t i = 0; i < n_threads; i++) {
- threads.emplace_back([this, cpu_id] {
+ threads.emplace_back([this, cpu_id, queue_max_wait] {
pin(cpu_id);
- loop();
+ loop(queue_max_wait);
});
}
}
ceph_assert(r == 0);
}
-void ThreadPool::loop()
+void ThreadPool::loop(std::chrono::milliseconds queue_max_wait)
{
for (;;) {
WorkItem* work_item = nullptr;
{
std::unique_lock lock{mutex};
- cond.wait_for(lock,
- crimson::net::conf.threadpool_empty_queue_max_wait,
+ cond.wait_for(lock, queue_max_wait,
[this, &work_item] {
return pending.pop(work_item) || is_stopping();
});
const size_t queue_size;
boost::lockfree::queue<WorkItem*> pending;
- void loop();
+ void loop(std::chrono::milliseconds queue_max_wait);
bool is_stopping() const {
return stopping.load(std::memory_order_relaxed);
}
#include "auth/Auth.h"
#include "messages/MPing.h"
+#include "common/ceph_argparse.h"
#include "crimson/auth/DummyAuth.h"
#include "crimson/net/Connection.h"
#include "crimson/net/Dispatcher.h"
#include "crimson/net/Messenger.h"
-#include "crimson/net/Config.h"
#include "crimson/thread/Throttle.h"
#include <seastar/core/alien.hh>
#include <seastar/core/posix.hh>
#include <seastar/core/reactor.hh>
+using crimson::common::local_conf;
enum class echo_role {
as_server,
}
} dispatcher;
Server(crimson::net::MessengerRef msgr)
- : byte_throttler(crimson::net::conf.osd_client_message_size_cap),
+ : byte_throttler(local_conf()->osd_client_message_size_cap),
msgr{msgr}
{
msgr->set_crc_header();
}
} dispatcher;
Client(crimson::net::MessengerRef msgr)
- : byte_throttler(crimson::net::conf.osd_client_message_size_cap),
+ : byte_throttler(local_conf()->osd_client_message_size_cap),
msgr{msgr}
{
msgr->set_crc_header();
void run(seastar::app_template& app, int argc, char** argv) {
app.run(argc, argv, [this] {
- return seastar::now().then([this] {
+ std::vector<const char*> args;
+ std::string cluster;
+ std::string conf_file_list;
+ auto init_params = ceph_argparse_early_args(args,
+ CEPH_ENTITY_TYPE_CLIENT,
+ &cluster,
+ &conf_file_list);
+ return crimson::common::sharded_conf().start(init_params.name, cluster)
+ .then([conf_file_list] {
+ return local_conf().parse_config_files(conf_file_list);
+ }).then([this] {
return set_seastar_ready();
}).then([on_end = std::move(on_end)] () mutable {
// seastar: let me know once i am free to leave.
return seastar::make_ready_future<>();
});
});
+ }).then([]() {
+ return crimson::common::sharded_conf().stop();
}).handle_exception([](auto ep) {
std::cerr << "Error: " << ep << std::endl;
}).finally([] {
+#include "common/ceph_argparse.h"
#include "common/ceph_time.h"
#include "messages/MPing.h"
#include "messages/MCommand.h"
#include "crimson/auth/DummyAuth.h"
#include "crimson/common/log.h"
#include "crimson/net/Connection.h"
-#include "crimson/net/Config.h"
#include "crimson/net/Dispatcher.h"
#include "crimson/net/Messenger.h"
#include "crimson/net/Interceptor.h"
#include "test_cmds.h"
namespace bpo = boost::program_options;
+using crimson::common::local_conf;
namespace {
("v2-testpeer-islocal", bpo::value<bool>()->default_value(true),
"create a local crimson testpeer, or connect to a remote testpeer");
return app.run(argc, argv, [&app] {
- auto&& config = app.configuration();
- verbose = config["verbose"].as<bool>();
- auto rounds = config["rounds"].as<unsigned>();
- auto keepalive_ratio = config["keepalive-ratio"].as<double>();
- entity_addr_t v2_test_addr;
- ceph_assert(v2_test_addr.parse(
+ std::vector<const char*> args;
+ std::string cluster;
+ std::string conf_file_list;
+ auto init_params = ceph_argparse_early_args(args,
+ CEPH_ENTITY_TYPE_CLIENT,
+ &cluster,
+ &conf_file_list);
+ return crimson::common::sharded_conf().start(init_params.name, cluster)
+ .then([conf_file_list] {
+ return local_conf().parse_config_files(conf_file_list);
+ }).then([&app] {
+ auto&& config = app.configuration();
+ verbose = config["verbose"].as<bool>();
+ auto rounds = config["rounds"].as<unsigned>();
+ auto keepalive_ratio = config["keepalive-ratio"].as<double>();
+ entity_addr_t v2_test_addr;
+ ceph_assert(v2_test_addr.parse(
config["v2-test-addr"].as<std::string>().c_str(), nullptr));
- entity_addr_t v2_testpeer_addr;
- ceph_assert(v2_testpeer_addr.parse(
+ entity_addr_t v2_testpeer_addr;
+ ceph_assert(v2_testpeer_addr.parse(
config["v2-testpeer-addr"].as<std::string>().c_str(), nullptr));
- auto v2_testpeer_islocal = config["v2-testpeer-islocal"].as<bool>();
- return test_echo(rounds, keepalive_ratio, false)
- .then([rounds, keepalive_ratio] {
- return test_echo(rounds, keepalive_ratio, true);
- }).then([] {
- return test_concurrent_dispatch(false);
- }).then([] {
- return test_concurrent_dispatch(true);
- }).then([] {
- return test_preemptive_shutdown(false);
- }).then([] {
- return test_preemptive_shutdown(true);
- }).then([v2_test_addr, v2_testpeer_addr, v2_testpeer_islocal] {
- return test_v2_protocol(v2_test_addr, v2_testpeer_addr, v2_testpeer_islocal);
+ auto v2_testpeer_islocal = config["v2-testpeer-islocal"].as<bool>();
+ return test_echo(rounds, keepalive_ratio, false)
+ .then([rounds, keepalive_ratio] {
+ return test_echo(rounds, keepalive_ratio, true);
+ }).then([] {
+ return test_concurrent_dispatch(false);
+ }).then([] {
+ return test_concurrent_dispatch(true);
+ }).then([] {
+ return test_preemptive_shutdown(false);
+ }).then([] {
+ return test_preemptive_shutdown(true);
+ }).then([v2_test_addr, v2_testpeer_addr, v2_testpeer_islocal] {
+ return test_v2_protocol(v2_test_addr, v2_testpeer_addr, v2_testpeer_islocal);
+ }).then([] {
+ std::cout << "All tests succeeded" << std::endl;
+ // Seastar has bugs to have events undispatched during shutdown,
+ // which will result in memory leak and thus fail LeakSanitizer.
+ return seastar::sleep(100ms);
+ });
}).then([] {
- std::cout << "All tests succeeded" << std::endl;
- // Seastar has bugs to have events undispatched during shutdown,
- // which will result in memory leak and thus fail LeakSanitizer.
- return seastar::sleep(100ms);
+ return crimson::common::sharded_conf().stop();
}).handle_exception([] (auto eptr) {
std::cout << "Test failure" << std::endl;
return seastar::make_exception_future<>(eptr);
#include <iostream>
#include <numeric>
#include <seastar/core/app-template.hh>
+#include "common/ceph_argparse.h"
+#include "crimson/common/config_proxy.h"
#include "crimson/thread/ThreadPool.h"
+#include "include/msgr.h"
using namespace std::chrono_literals;
using ThreadPool = crimson::thread::ThreadPool;
+using crimson::common::local_conf;
seastar::future<> test_accumulate(ThreadPool& tp) {
static constexpr auto N = 5;
int main(int argc, char** argv)
{
- ThreadPool tp{2, 128, 0};
+ std::unique_ptr<crimson::thread::ThreadPool> tp;
seastar::app_template app;
return app.run(argc, argv, [&tp] {
- return tp.start().then([&tp] {
- return test_accumulate(tp);
- }).then([&tp] {
- return test_void_return(tp);
- }).handle_exception([](auto e) {
- std::cerr << "Error: " << e << std::endl;
- seastar::engine().exit(1);
- }).finally([&tp] {
- return tp.stop();
- });
+ std::vector<const char*> args;
+ std::string cluster;
+ std::string conf_file_list;
+ auto init_params = ceph_argparse_early_args(args,
+ CEPH_ENTITY_TYPE_CLIENT,
+ &cluster,
+ &conf_file_list);
+ return crimson::common::sharded_conf().start(init_params.name, cluster)
+ .then([conf_file_list] {
+ return local_conf().parse_config_files(conf_file_list);
+ }).then([&tp] {
+ tp = std::make_unique<crimson::thread::ThreadPool>(2, 128, 0);
+ return tp->start().then([&tp] {
+ return test_accumulate(*tp);
+ }).then([&tp] {
+ return test_void_return(*tp);
+ }).handle_exception([](auto e) {
+ std::cerr << "Error: " << e << std::endl;
+ seastar::engine().exit(1);
+ }).finally([&tp] {
+ return tp->stop();
});
+ }).finally([] {
+ return crimson::common::sharded_conf().stop();
+ });
+ });
}
/*