#include "include/rados/librados.hpp"
#include "common/async/completion.h"
+#include "librados/AioCompletionImpl.h"
/// Defines asynchronous librados operations that satisfy all of the
/// "Requirements on asynchronous operations" imposed by the C++ Networking TS
/// argument to the handler.
template <typename Result>
struct Invoker {
- using Signature = void(boost::system::error_code, Result);
+ using Signature = void(boost::system::error_code, version_t, Result);
Result result;
template <typename Completion>
- void dispatch(Completion&& completion, boost::system::error_code ec) {
- ceph::async::dispatch(std::move(completion), ec, std::move(result));
+ void dispatch(Completion&& completion, boost::system::error_code ec, version_t ver) {
+ ceph::async::dispatch(std::move(completion), ec, ver, std::move(result));
}
};
// specialization for Result=void
template <>
struct Invoker<void> {
- using Signature = void(boost::system::error_code);
+ using Signature = void(boost::system::error_code, version_t);
template <typename Completion>
- void dispatch(Completion&& completion, boost::system::error_code ec) {
- ceph::async::dispatch(std::move(completion), ec);
+ void dispatch(Completion&& completion, boost::system::error_code ec, version_t ver) {
+ ceph::async::dispatch(std::move(completion), ec, ver);
}
};
auto p = std::unique_ptr<Completion>{static_cast<Completion*>(arg)};
// move result out of Completion memory being freed
auto op = std::move(p->user_data);
- const int ret = op.aio_completion->get_return_value();
+ // access AioCompletionImpl directly to avoid locking
+ const librados::AioCompletionImpl* pc = op.aio_completion->pc;
+ const int ret = pc->rval;
+ const version_t ver = pc->objver;
boost::system::error_code ec;
if (ret < 0) {
ec.assign(-ret, librados::detail::err_category());
}
- op.dispatch(std::move(p), ec);
+ op.dispatch(std::move(p), ec, ver);
}
template <typename Executor1, typename CompletionHandler>
/// Calls IoCtx::aio_read() and arranges for the AioCompletion to call a
-/// given handler with signature (boost::system::error_code, bufferlist).
+/// given handler with signature (error_code, version_t, bufferlist).
template <typename ExecutionContext, typename CompletionToken>
auto async_read(ExecutionContext& ctx, IoCtx& io, const std::string& oid,
size_t len, uint64_t off, CompletionToken&& token)
int ret = io.aio_read(oid, op.aio_completion.get(), &op.result, len, off);
if (ret < 0) {
auto ec = boost::system::error_code{-ret, librados::detail::err_category()};
- ceph::async::post(std::move(p), ec, bufferlist{});
+ ceph::async::post(std::move(p), ec, 0, bufferlist{});
} else {
p.release(); // release ownership until completion
}
}
/// Calls IoCtx::aio_write() and arranges for the AioCompletion to call a
-/// given handler with signature (boost::system::error_code).
+/// given handler with signature (error_code, version_t).
template <typename ExecutionContext, typename CompletionToken>
auto async_write(ExecutionContext& ctx, IoCtx& io, const std::string& oid,
- bufferlist &bl, size_t len, uint64_t off,
+ const bufferlist &bl, size_t len, uint64_t off,
CompletionToken&& token)
{
using Op = detail::AsyncOp<void>;
using Signature = typename Op::Signature;
return boost::asio::async_initiate<CompletionToken, Signature>(
[] (auto handler, auto ex, IoCtx& io, const std::string& oid,
- bufferlist &bl, size_t len, uint64_t off) {
+ const bufferlist &bl, size_t len, uint64_t off) {
auto p = Op::create(ex, std::move(handler));
auto& op = p->user_data;
int ret = io.aio_write(oid, op.aio_completion.get(), bl, len, off);
if (ret < 0) {
auto ec = boost::system::error_code{-ret, librados::detail::err_category()};
- ceph::async::post(std::move(p), ec);
+ ceph::async::post(std::move(p), ec, 0);
} else {
p.release(); // release ownership until completion
}
}
/// Calls IoCtx::aio_operate() and arranges for the AioCompletion to call a
-/// given handler with signature (boost::system::error_code, bufferlist).
+/// given handler with signature (error_code, version_t, bufferlist).
template <typename ExecutionContext, typename CompletionToken>
auto async_operate(ExecutionContext& ctx, IoCtx& io, const std::string& oid,
ObjectReadOperation *read_op, int flags,
flags, &op.result);
if (ret < 0) {
auto ec = boost::system::error_code{-ret, librados::detail::err_category()};
- ceph::async::post(std::move(p), ec, bufferlist{});
+ ceph::async::post(std::move(p), ec, 0, bufferlist{});
} else {
p.release(); // release ownership until completion
}
}
/// Calls IoCtx::aio_operate() and arranges for the AioCompletion to call a
-/// given handler with signature (boost::system::error_code).
+/// given handler with signature (error_code, version_t).
template <typename ExecutionContext, typename CompletionToken>
auto async_operate(ExecutionContext& ctx, IoCtx& io, const std::string& oid,
ObjectWriteOperation *write_op, int flags,
int ret = io.aio_operate(oid, op.aio_completion.get(), write_op, flags, trace_ctx);
if (ret < 0) {
auto ec = boost::system::error_code{-ret, librados::detail::err_category()};
- ceph::async::post(std::move(p), ec);
+ ceph::async::post(std::move(p), ec, 0);
} else {
p.release(); // release ownership until completion
}
}
/// Calls IoCtx::aio_notify() and arranges for the AioCompletion to call a
-/// given handler with signature (boost::system::error_code, bufferlist).
+/// given handler with signature (error_code, version_t, bufferlist).
template <typename ExecutionContext, typename CompletionToken>
auto async_notify(ExecutionContext& ctx, IoCtx& io, const std::string& oid,
bufferlist& bl, uint64_t timeout_ms, CompletionToken &&token)
bl, timeout_ms, &op.result);
if (ret < 0) {
auto ec = boost::system::error_code{-ret, librados::detail::err_category()};
- ceph::async::post(std::move(p), ec, bufferlist{});
+ ceph::async::post(std::move(p), ec, 0, bufferlist{});
} else {
p.release(); // release ownership until completion
}
#define dout_subsys ceph_subsys_rados
#define dout_context g_ceph_context
-using namespace std;
-
// test fixture for global setup/teardown
class AsioRados : public ::testing::Test {
static constexpr auto poolname = "ceph_test_rados_api_asio";
librados::IoCtx AsioRados::io;
librados::IoCtx AsioRados::snapio;
+using boost::system::error_code;
+using read_result = std::tuple<version_t, bufferlist>;
+
void rethrow(std::exception_ptr eptr) {
if (eptr) std::rethrow_exception(eptr);
}
{
boost::asio::io_context service;
- auto success_cb = [&] (boost::system::error_code ec, bufferlist bl) {
+ auto success_cb = [&] (error_code ec, version_t ver, bufferlist bl) {
EXPECT_FALSE(ec);
+ EXPECT_LT(0, ver);
EXPECT_EQ("hello", bl.to_str());
};
librados::async_read(service, io, "exist", 256, 0, success_cb);
- auto failure_cb = [&] (boost::system::error_code ec, bufferlist bl) {
+ auto failure_cb = [&] (error_code ec, version_t ver, bufferlist bl) {
EXPECT_EQ(boost::system::errc::no_such_file_or_directory, ec);
+ EXPECT_EQ(0, ver);
+ EXPECT_EQ(0, bl.length());
};
librados::async_read(service, io, "noexist", 256, 0, failure_cb);
{
boost::asio::io_context service;
- std::future<bufferlist> f1 = librados::async_read(service, io, "exist", 256,
- 0, boost::asio::use_future);
- std::future<bufferlist> f2 = librados::async_read(service, io, "noexist", 256,
- 0, boost::asio::use_future);
+ auto f1 = librados::async_read(service, io, "exist", 256,
+ 0, boost::asio::use_future);
+ auto f2 = librados::async_read(service, io, "noexist", 256,
+ 0, boost::asio::use_future);
service.run();
- EXPECT_NO_THROW({
- auto bl = f1.get();
- EXPECT_EQ("hello", bl.to_str());
- });
+ auto [ver, bl] = f1.get();
+ EXPECT_LT(0, ver);
+ EXPECT_EQ("hello", bl.to_str());
+
EXPECT_THROW(f2.get(), boost::system::system_error);
}
boost::asio::io_context service;
auto success_cr = [&] (boost::asio::yield_context yield) {
- boost::system::error_code ec;
- auto bl = librados::async_read(service, io, "exist", 256, 0, yield[ec]);
+ error_code ec;
+ auto [ver, bl] = librados::async_read(service, io, "exist", 256,
+ 0, yield[ec]);
EXPECT_FALSE(ec);
+ EXPECT_LT(0, ver);
EXPECT_EQ("hello", bl.to_str());
};
boost::asio::spawn(service, success_cr, rethrow);
auto failure_cr = [&] (boost::asio::yield_context yield) {
- boost::system::error_code ec;
- auto bl = librados::async_read(service, io, "noexist", 256, 0, yield[ec]);
+ error_code ec;
+ auto [ver, bl] = librados::async_read(service, io, "noexist", 256,
+ 0, yield[ec]);
EXPECT_EQ(boost::system::errc::no_such_file_or_directory, ec);
+ EXPECT_EQ(0, ver);
+ EXPECT_EQ(0, bl.length());
};
boost::asio::spawn(service, failure_cr, rethrow);
bufferlist bl;
bl.append("hello");
- auto success_cb = [&] (boost::system::error_code ec) {
+ auto success_cb = [&] (error_code ec, version_t ver) {
EXPECT_FALSE(ec);
+ EXPECT_LT(0, ver);
};
librados::async_write(service, io, "exist", bl, bl.length(), 0,
success_cb);
- auto failure_cb = [&] (boost::system::error_code ec) {
+ auto failure_cb = [&] (error_code ec, version_t ver) {
EXPECT_EQ(boost::system::errc::read_only_file_system, ec);
+ EXPECT_EQ(0, ver);
};
librados::async_write(service, snapio, "exist", bl, bl.length(), 0,
failure_cb);
service.run();
- EXPECT_NO_THROW(f1.get());
+ EXPECT_LT(0, f1.get());
EXPECT_THROW(f2.get(), boost::system::system_error);
}
bl.append("hello");
auto success_cr = [&] (boost::asio::yield_context yield) {
- boost::system::error_code ec;
- librados::async_write(service, io, "exist", bl, bl.length(), 0,
- yield[ec]);
+ error_code ec;
+ auto ver = librados::async_write(service, io, "exist", bl,
+ bl.length(), 0, yield[ec]);
EXPECT_FALSE(ec);
+ EXPECT_LT(0, ver);
EXPECT_EQ("hello", bl.to_str());
};
boost::asio::spawn(service, success_cr, rethrow);
auto failure_cr = [&] (boost::asio::yield_context yield) {
- boost::system::error_code ec;
- librados::async_write(service, snapio, "exist", bl, bl.length(), 0,
- yield[ec]);
+ error_code ec;
+ auto ver = librados::async_write(service, snapio, "exist", bl,
+ bl.length(), 0, yield[ec]);
EXPECT_EQ(boost::system::errc::read_only_file_system, ec);
+ EXPECT_EQ(0, ver);
};
boost::asio::spawn(service, failure_cr, rethrow);
{
librados::ObjectReadOperation op;
op.read(0, 0, nullptr, nullptr);
- auto success_cb = [&] (boost::system::error_code ec, bufferlist bl) {
+ auto success_cb = [&] (error_code ec, version_t ver, bufferlist bl) {
EXPECT_FALSE(ec);
+ EXPECT_LT(0, ver);
EXPECT_EQ("hello", bl.to_str());
};
librados::async_operate(service, io, "exist", &op, 0, nullptr, success_cb);
{
librados::ObjectReadOperation op;
op.read(0, 0, nullptr, nullptr);
- auto failure_cb = [&] (boost::system::error_code ec, bufferlist bl) {
+ auto failure_cb = [&] (error_code ec, version_t ver, bufferlist bl) {
EXPECT_EQ(boost::system::errc::no_such_file_or_directory, ec);
+ EXPECT_EQ(0, ver);
+ EXPECT_EQ(0, bl.length());
};
librados::async_operate(service, io, "noexist", &op, 0, nullptr, failure_cb);
}
TEST_F(AsioRados, AsyncReadOperationFuture)
{
boost::asio::io_context service;
- std::future<bufferlist> f1;
+ std::future<read_result> f1;
{
librados::ObjectReadOperation op;
op.read(0, 0, nullptr, nullptr);
f1 = librados::async_operate(service, io, "exist", &op, 0, nullptr,
boost::asio::use_future);
}
- std::future<bufferlist> f2;
+ std::future<read_result> f2;
{
librados::ObjectReadOperation op;
op.read(0, 0, nullptr, nullptr);
}
service.run();
- EXPECT_NO_THROW({
- auto bl = f1.get();
- EXPECT_EQ("hello", bl.to_str());
- });
+ auto [ver, bl] = f1.get();
+ EXPECT_LT(0, ver);
+ EXPECT_EQ("hello", bl.to_str());
+
EXPECT_THROW(f2.get(), boost::system::system_error);
}
auto success_cr = [&] (boost::asio::yield_context yield) {
librados::ObjectReadOperation op;
op.read(0, 0, nullptr, nullptr);
- boost::system::error_code ec;
- auto bl = librados::async_operate(service, io, "exist", &op, 0, nullptr,
- yield[ec]);
+ error_code ec;
+ auto [ver, bl] = librados::async_operate(service, io, "exist", &op,
+ 0, nullptr, yield[ec]);
EXPECT_FALSE(ec);
+ EXPECT_LT(0, ver);
EXPECT_EQ("hello", bl.to_str());
};
boost::asio::spawn(service, success_cr, rethrow);
auto failure_cr = [&] (boost::asio::yield_context yield) {
librados::ObjectReadOperation op;
op.read(0, 0, nullptr, nullptr);
- boost::system::error_code ec;
- auto bl = librados::async_operate(service, io, "noexist", &op, 0, nullptr,
- yield[ec]);
+ error_code ec;
+ auto [ver, bl] = librados::async_operate(service, io, "noexist", &op,
+ 0, nullptr, yield[ec]);
EXPECT_EQ(boost::system::errc::no_such_file_or_directory, ec);
+ EXPECT_EQ(0, ver);
+ EXPECT_EQ(0, bl.length());
};
boost::asio::spawn(service, failure_cr, rethrow);
{
librados::ObjectWriteOperation op;
op.write_full(bl);
- auto success_cb = [&] (boost::system::error_code ec) {
+ auto success_cb = [&] (error_code ec, version_t ver) {
EXPECT_FALSE(ec);
+ EXPECT_LT(0, ver);
};
librados::async_operate(service, io, "exist", &op, 0, nullptr, success_cb);
}
{
librados::ObjectWriteOperation op;
op.write_full(bl);
- auto failure_cb = [&] (boost::system::error_code ec) {
+ auto failure_cb = [&] (error_code ec, version_t ver) {
EXPECT_EQ(boost::system::errc::read_only_file_system, ec);
+ EXPECT_EQ(0, ver);
};
librados::async_operate(service, snapio, "exist", &op, 0, nullptr, failure_cb);
}
bufferlist bl;
bl.append("hello");
- std::future<void> f1;
+ std::future<version_t> f1;
{
librados::ObjectWriteOperation op;
op.write_full(bl);
f1 = librados::async_operate(service, io, "exist", &op, 0, nullptr,
boost::asio::use_future);
}
- std::future<void> f2;
+ std::future<version_t> f2;
{
librados::ObjectWriteOperation op;
op.write_full(bl);
}
service.run();
- EXPECT_NO_THROW(f1.get());
+ EXPECT_LT(0, f1.get());
EXPECT_THROW(f2.get(), boost::system::system_error);
}
auto success_cr = [&] (boost::asio::yield_context yield) {
librados::ObjectWriteOperation op;
op.write_full(bl);
- boost::system::error_code ec;
- librados::async_operate(service, io, "exist", &op, 0, nullptr, yield[ec]);
+ error_code ec;
+ auto ver = librados::async_operate(service, io, "exist", &op,
+ 0, nullptr, yield[ec]);
EXPECT_FALSE(ec);
+ EXPECT_LT(0, ver);
};
boost::asio::spawn(service, success_cr, rethrow);
auto failure_cr = [&] (boost::asio::yield_context yield) {
librados::ObjectWriteOperation op;
op.write_full(bl);
- boost::system::error_code ec;
- librados::async_operate(service, snapio, "exist", &op, 0, nullptr, yield[ec]);
+ error_code ec;
+ auto ver = librados::async_operate(service, snapio, "exist", &op,
+ 0, nullptr, yield[ec]);
EXPECT_EQ(boost::system::errc::read_only_file_system, ec);
+ EXPECT_EQ(0, ver);
};
boost::asio::spawn(service, failure_cr, rethrow);