#include <linux/nbd.h>
#include <linux/fs.h>
+#include <seastar/apps/lib/stop_signal.hh>
#include <seastar/core/byteorder.hh>
+#include <seastar/util/defer.hh>
+#include <seastar/core/gate.hh>
#include <seastar/core/rwlock.hh>
#include "crimson/common/log.h"
class NBDHandler {
BlockDriver &backend;
std::string uds_path;
+ std::optional<seastar::server_socket> socket;
+ seastar::gate gate;
public:
struct config_t {
std::string uds_path;
{}
seastar::future<> run();
+ seastar::future<> stop();
};
int main(int argc, char** argv)
}
std::vector<const char*> args(argv, argv + argc);
- seastar::app_template app;
+ seastar::app_template::config app_cfg;
+ app_cfg.name = "crimson-store-nbd";
+ app_cfg.auto_handle_sigint_sigterm = false;
+ seastar::app_template app(std::move(app_cfg));
std::vector<char*> av{argv[0]};
std::transform(begin(unrecognized_options),
[](auto& s) {
return const_cast<char*>(s.c_str());
});
+ return app.run(av.size(), av.data(), [&] {
+ if (debug) {
+ seastar::global_logger_registry().set_all_loggers_level(
+ seastar::log_level::debug
+ );
+ }
+ return seastar::async([&] {
+ seastar_apps_lib::stop_signal should_stop;
+ crimson::common::sharded_conf()
+ .start(EntityName{}, string_view{"ceph"}).get();
+ auto stop_conf = seastar::defer([] {
+ crimson::common::sharded_conf().stop().get();
+ });
- SeastarRunner sc;
- sc.init(av.size(), av.data());
-
- if (debug) {
- seastar::global_logger_registry().set_all_loggers_level(
- seastar::log_level::debug
- );
- }
-
- sc.run([=] {
- return crimson::common::sharded_conf(
- ).start(EntityName{}, string_view{"ceph"}
- ).then([=] {
auto backend = get_backend(backend_config);
- return seastar::do_with(
- NBDHandler(*backend, nbd_config),
- std::move(backend),
- [](auto &nbd, auto &backend) {
- return backend->mount(
- ).then([&] {
- logger().debug("Running nbd server...");
- return nbd.run();
- }).then([&] {
- return backend->close();
- });
- });
- }).then([=] {
- return crimson::common::sharded_conf().stop();
+ NBDHandler nbd(*backend, nbd_config);
+ backend->mount().get();
+ auto close_backend = seastar::defer([&] {
+ backend->close().get();
+ });
+
+ logger().debug("Running nbd server...");
+ (void)nbd.run();
+ auto stop_nbd = seastar::defer([&] {
+ nbd.stop().get();
+ });
+ should_stop.wait().get();
+ return 0;
});
});
-
- sc.stop();
}
class nbd_oldstyle_negotiation_t {
seastar::future<> NBDHandler::run()
{
logger().debug("About to listen on {}", uds_path);
- return seastar::do_with(
- seastar::engine().listen(
+ socket = seastar::engine().listen(
seastar::socket_address{
- seastar::unix_domain_addr{uds_path}}),
- [=](auto &socket) {
- return seastar::keep_doing(
- [this, &socket] {
- return socket.accept().then([this](auto acc) {
- logger().debug("Accepted");
- return seastar::do_with(
- std::move(acc.connection),
- [this](auto &conn) {
- return seastar::do_with(
- conn.input(),
- RequestWriter{conn.output()},
- [&, this](auto &input, auto &output) {
- return send_negotiation(
- backend.get_size(),
- output.stream
- ).then([&, this] {
- return handle_commands(backend, input, output);
- }).finally([&] {
- return input.close();
- }).finally([&] {
- return output.close();
- }).handle_exception([](auto e) {
- logger().error("NBDHandler::run saw exception {}", e);
- return seastar::now();
- });
- });
- });
- });
- });
- });
+ seastar::unix_domain_addr{uds_path}});
+
+ return seastar::keep_doing([this] {
+ return seastar::try_with_gate(gate, [this] {
+ return socket->accept().then([this](auto acc) {
+ logger().debug("Accepted");
+ return seastar::do_with(
+ std::move(acc.connection),
+ [this](auto &conn) {
+ return seastar::do_with(
+ conn.input(),
+ RequestWriter{conn.output()},
+ [&, this](auto &input, auto &output) {
+ return send_negotiation(
+ backend.get_size(),
+ output.stream
+ ).then([&, this] {
+ return handle_commands(backend, input, output);
+ }).finally([&] {
+ return input.close();
+ }).finally([&] {
+ return output.close();
+ }).handle_exception([](auto e) {
+ logger().error("NBDHandler::run saw exception {}", e);
+ return seastar::now();
+ });
+ });
+ });
+ });
+ }).handle_exception_type([](const seastar::gate_closed_exception&) {});
+ });
+}
+
+seastar::future<> NBDHandler::stop()
+{
+ seastar::future<> done = gate.close();
+ if (socket) {
+ socket->abort_accept();
+ socket.reset();
+ }
+ return done;
}