#include <linux/fs.h>
#include <seastar/core/byteorder.hh>
+#include <seastar/core/rwlock.hh>
#include "crimson/os/seastore/cache.h"
#include "crimson/os/seastore/segment_cleaner.h"
std::optional<bufferptr> in_buffer;
std::optional<bufferlist> out_buffer;
+ using ref = std::unique_ptr<request_context_t>;
+ static ref make_ref() {
+ return std::make_unique<request_context_t>();
+ }
+
bool check_magic() const {
// todo
return true;
auto p = buffer.get_write();
seastar::produce_be<uint32_t>(p, NBD_REPLY_MAGIC);
seastar::produce_be<uint32_t>(p, err);
+ logger().debug("write_reply writing err {}", err);
memcpy(p, handle, sizeof(handle));
return out.write(std::move(buffer)).then([this, &out] {
if (out_buffer) {
return seastar::do_for_each(
out_buffer->mut_buffers(),
[&out](bufferptr &ptr) {
+ logger().debug("write_reply writing {}", ptr.length());
return out.write(
seastar::temporary_buffer<char>(
ptr.c_str(),
}
};
+struct RequestWriter {
+ seastar::rwlock lock;
+ seastar::output_stream<char> stream;
+ bool stopped = false;
+ int pending = 0;
+
+ RequestWriter(
+ seastar::output_stream<char> &&stream) : stream(std::move(stream)) {}
+ RequestWriter(RequestWriter &&) = default;
+
+ seastar::future<> complete(request_context_t::ref &&req) {
+ if (stopped)
+ throw std::system_error(
+ std::make_error_code(
+ std::errc::operation_canceled));
+ auto &request = *req;
+ ++pending;
+ return lock.write_lock(
+ ).then([&request, this] {
+ return request.write_reply(stream);
+ }).finally([&, this, req=std::move(req)] {
+ --pending;
+ lock.write_unlock();
+ logger().debug("complete");
+ return seastar::now();
+ });
+ }
+
+ seastar::future<> close() {
+ stopped = true;
+ return lock.write_lock(
+ ).then([this] {
+ assert(pending == 0);
+ return stream.close();
+ });
+ }
+};
+
/**
* NBDHandler
*
seastar::future<> handle_command(
BlockDriver &backend,
- request_context_t &context,
- seastar::output_stream<char> &out)
+ request_context_t::ref request_ref,
+ RequestWriter &out)
{
- logger().debug("got command {}", context.get_command());
+ auto &request = *request_ref;
+ logger().debug("got command {}", request.get_command());
return ([&] {
- switch (context.get_command()) {
+ switch (request.get_command()) {
case NBD_CMD_WRITE:
return backend.write(
- context.from,
- *context.in_buffer);
+ request.from,
+ *request.in_buffer);
case NBD_CMD_READ:
return backend.read(
- context.from,
- context.len).then([&context] (auto buffer) {
- context.out_buffer = buffer;
+ request.from,
+ request.len).then([&] (auto buffer) {
+ logger().debug("read returned buffer len {}", buffer.length());
+ request.out_buffer = buffer;
});
case NBD_CMD_DISC:
throw std::system_error(std::make_error_code(std::errc::bad_message));
default:
throw std::system_error(std::make_error_code(std::errc::bad_message));
}
- })().then([&] {
- logger().debug("Writing reply");
- return context.write_reply(out);
+ })().then([&, request_ref=std::move(request_ref)]() mutable {
+ logger().debug("handle_command complete");
+ return out.complete(std::move(request_ref));
});
}
seastar::future<> handle_commands(
BlockDriver &backend,
seastar::input_stream<char>& in,
- seastar::output_stream<char>& out)
+ RequestWriter &out)
{
logger().debug("handle_commands");
return seastar::keep_doing(
[&] {
logger().debug("waiting for command");
- auto request_ref = std::make_unique<request_context_t>();
+ auto request_ref = request_context_t::make_ref();
auto &request = *request_ref;
return request.read_request(in
- ).then([&] {
- return handle_command(backend, request, out);
- }).then([req=std::move(request_ref)] {
- logger().debug("complete");
+ ).then([&, request_ref=std::move(request_ref)]() mutable {
+ static_cast<void>(handle_command(backend, std::move(request_ref), out));
+ logger().debug("handle_commands after fork");
+ return seastar::now();
});
});
}
[this](auto &conn) {
return seastar::do_with(
conn.input(),
- conn.output(),
+ RequestWriter{conn.output()},
[&, this](auto &input, auto &output) {
return send_negotiation(
backend.get_size(),
- output
+ output.stream
).then([&, this] {
return handle_commands(backend, input, output);
}).finally([&] {
logger().debug("Writing offset {}", offset);
assert(offset % segment_manager->get_block_size() == 0);
assert(ptr.length() == (size_t)segment_manager->get_block_size());
- return seastar::do_with(
- tm->create_transaction(),
- std::move(ptr),
- [this, offset](auto &t, auto &ptr) {
- return tm->dec_ref(
- *t,
- offset
- ).safe_then([](auto){}).handle_error(
- crimson::ct_error::enoent::handle([](auto) { return seastar::now(); }),
- crimson::ct_error::pass_further_all{}
- ).safe_then([=, &t, &ptr] {
- logger().debug("dec_ref complete");
- return tm->alloc_extent<TestBlock>(
- *t,
- offset,
- ptr.length());
- }).safe_then([=, &t, &ptr](auto ext) mutable {
- assert(ext->get_laddr() == (size_t)offset);
- assert(ext->get_bptr().length() == ptr.length());
- ext->get_bptr().swap(ptr);
- logger().debug("submitting transaction");
- return tm->submit_transaction(std::move(t));
- });
+ return crimson::do_until(
+ [this, offset, ptr=std::move(ptr)] {
+ return seastar::do_with(
+ tm->create_transaction(),
+ ptr,
+ [this, offset](auto &t, auto &ptr) mutable {
+ return tm->dec_ref(
+ *t,
+ offset
+ ).safe_then([](auto){}).handle_error(
+ crimson::ct_error::enoent::handle([](auto) { return seastar::now(); }),
+ crimson::ct_error::pass_further_all{}
+ ).safe_then([=, &t, &ptr] {
+ logger().debug("dec_ref complete");
+ return tm->alloc_extent<TestBlock>(
+ *t,
+ offset,
+ ptr.length());
+ }).safe_then([=, &t, &ptr](auto ext) mutable {
+ assert(ext->get_laddr() == (size_t)offset);
+ assert(ext->get_bptr().length() == ptr.length());
+ ext->get_bptr().swap(ptr);
+ logger().debug("submitting transaction");
+ return tm->submit_transaction(std::move(t));
+ }).safe_then([] {
+ return true;
+ }).handle_error(
+ [](const crimson::ct_error::eagain &e) {
+ return seastar::make_ready_future<bool>(false);
+ },
+ crimson::ct_error::pass_further_all{}
+ );
+ });
}).handle_error(
- crimson::ct_error::assert_all{}
+ crimson::ct_error::assert_all{"store-nbd write"}
);
}
logger().debug("Reading offset {}", offset);
assert(offset % segment_manager->get_block_size() == 0);
assert(size % (size_t)segment_manager->get_block_size() == 0);
- return seastar::do_with(
- tm->create_transaction(),
- [this, offset, size](auto &t) {
- return tm->read_extents<TestBlock>(*t, offset, size
- ).safe_then([=](auto ext_list) mutable {
- size_t cur = offset;
- bufferlist bl;
- for (auto &i: ext_list) {
- if (cur != i.first) {
- assert(cur < i.first);
- bl.append_zero(i.first - cur);
- cur = i.first;
- }
- bl.append(i.second->get_bptr());
- cur += i.second->get_bptr().length();
- }
- if (bl.length() != size) {
- assert(bl.length() < size);
- bl.append_zero(size - bl.length());
- }
- return seastar::make_ready_future<bufferlist>(std::move(bl));
- });
+ auto blptrret = std::make_unique<bufferlist>();
+ auto &blret = *blptrret;
+ return crimson::do_until(
+ [=, &blret] {
+ return seastar::do_with(
+ tm->create_transaction(),
+ [=, &blret](auto &t) {
+ return tm->read_extents<TestBlock>(*t, offset, size
+ ).safe_then([=, &blret](auto ext_list) mutable {
+ size_t cur = offset;
+ for (auto &i: ext_list) {
+ if (cur != i.first) {
+ assert(cur < i.first);
+ blret.append_zero(i.first - cur);
+ cur = i.first;
+ }
+ blret.append(i.second->get_bptr());
+ cur += i.second->get_bptr().length();
+ }
+ if (blret.length() != size) {
+ assert(blret.length() < size);
+ blret.append_zero(size - blret.length());
+ }
+ return seastar::make_ready_future<bool>(true);
+ }).handle_error(
+ [](const crimson::ct_error::eagain &e) {
+ return seastar::make_ready_future<bool>(false);
+ },
+ crimson::ct_error::pass_further_all{}
+ );
+ });
}).handle_error(
- crimson::ct_error::assert_all{}
- );
+ crimson::ct_error::assert_all{"store-nbd read"}
+ ).then([blptrret=std::move(blptrret)]() mutable {
+ logger().debug("read complete");
+ return std::move(*blptrret);
+ });
}
void init() {