#include "crimson/net/Messenger.h"
#include "crimson/os/cyan_collection.h"
#include "crimson/os/cyan_store.h"
+#include "crimson/os/Transaction.h"
#include "crimson/osd/exceptions.h"
#include "crimson/osd/pg_meta.h"
}
seastar::future<>
-PG::do_osd_op(const ObjectState& os, OSDOp& osd_op)
+PG::do_osd_op(const ObjectState& os, OSDOp& osd_op, ceph::os::Transaction& txn)
{
- switch (const auto& op = osd_op.op; op.op) {
+ // TODO: dispatch via call table?
+ // TODO: we might want to find a way to unify both input and output
+ // of each op.
+ switch (const ceph_osd_op& op = osd_op.op; op.op) {
case CEPH_OSD_OP_SYNC_READ:
[[fallthrough]];
case CEPH_OSD_OP_READ:
osd_op.outdata = std::move(bl);
return seastar::now();
});
+ case CEPH_OSD_OP_WRITE:
+ // TODO: handle write separately. For `rados bench write` the fall-
+ // through path somehow works but this is really nasty.
+ [[fallthrough]];
+ case CEPH_OSD_OP_WRITEFULL:
+ return backend->writefull(os, osd_op, txn);
+ case CEPH_OSD_OP_SETALLOCHINT:
+ return seastar::now();
default:
throw std::runtime_error(
fmt::format("op '{}' not supported", ceph_osd_op_name(op.op)));
{
// todo: issue requests in parallel if they don't write,
// with writes being basically a synchronization barrier
- return seastar::do_with(std::move(m), [this](auto& m) {
+ return seastar::do_with(std::move(m), ceph::os::Transaction{},
+ [this](auto& m, auto& txn) {
return seastar::do_for_each(begin(m->ops), end(m->ops),
- [m,this](OSDOp& osd_op) {
+ [m,&txn,this](OSDOp& osd_op) {
const auto oid = (m->get_snapid() == CEPH_SNAPDIR ?
m->get_hobj().get_head() :
m->get_hobj());
- return backend->get_object_state(oid).then([&osd_op,this](auto os) {
- return do_osd_op(*os, osd_op);
+ return backend->get_object_state(oid).then([&osd_op,&txn,this](auto os) {
+ return do_osd_op(*os, osd_op, txn);
}).handle_exception_type([&osd_op](const object_not_found&) {
osd_op.rval = -ENOENT;
throw;
});
+ }).then([&] {
+ return txn.empty() ? seastar::now()
+ : backend->submit_transaction(std::move(txn));
}).then([=] {
auto reply = make_message<MOSDOpReply>(m.get(), 0, get_osdmap_epoch(),
0, false);
#include <seastar/core/shared_future.hh>
#include "crimson/net/Fwd.h"
+#include "crimson/os/Transaction.h"
#include "osd/osd_types.h"
#include "osd/osd_internal_types.h"
#include "recovery_state.h"
const std::vector<int>& new_acting,
int new_acting_primary);
seastar::future<Ref<MOSDOpReply>> do_osd_ops(Ref<MOSDOp> m);
- seastar::future<> do_osd_op(const ObjectState& os, OSDOp& op);
+ seastar::future<> do_osd_op(
+ const ObjectState& os,
+ OSDOp& op,
+ ceph::os::Transaction& txn);
private:
const spg_t pgid;
return store->get_attr(coll,
ghobject_t{oid, ghobject_t::NO_GEN, shard},
OI_ATTR).then_wrapped([oid, this](auto fut) {
- auto oi = std::make_unique<object_info_t>();
if (fut.failed()) {
auto ep = std::move(fut).get_exception();
if (!ceph::os::CyanStore::EnoentException::is_class_of(ep)) {
return seastar::make_ready_future<bufferlist>(std::move(bl));
});
}
+
+seastar::future<> PGBackend::writefull(
+ const ObjectState& os,
+ const OSDOp& osd_op,
+ ceph::os::Transaction& txn)
+{
+ const ceph_osd_op& op = osd_op.op;
+ if (op.extent.length != osd_op.indata.length()) {
+ throw ::invalid_argument();
+ }
+
+ if (os.exists && op.extent.length < os.oi.size) {
+ txn.truncate(coll->cid, ghobject_t{os.oi.soid}, op.extent.length);
+ }
+ if (op.extent.length) {
+ txn.write(coll->cid, ghobject_t{os.oi.soid}, 0, op.extent.length,
+ osd_op.indata, op.flags);
+ }
+ return seastar::now();
+}
+
+seastar::future<> PGBackend::submit_transaction(ceph::os::Transaction&& txn)
+{
+ logger().trace("submit_transaction: num_ops={}", txn.get_num_ops());
+ return store->do_transaction(coll, std::move(txn));
+}