#endif
logger().debug("calling method {}.{}", cname, mname);
-#if 0
- int prev_rd = ctx->num_read;
- int prev_wr = ctx->num_write;
-#endif
-
-
- return seastar::async([this, &osd_op, method, indata=std::move(indata)]() mutable {
+ return seastar::async([this, &osd_op, flags, method, indata=std::move(indata)]() mutable {
ceph::bufferlist outdata;
+ const auto prev_rd = num_read;
+ const auto prev_wr = num_write;
const auto ret = method->exec(reinterpret_cast<cls_method_context_t>(this),
indata, outdata);
+ if (num_read > prev_rd && !(flags & CLS_METHOD_RD)) {
+ logger().error("method tried to read object but is not marked RD");
+ throw ceph::osd::input_output_error{};
+ }
+ if (num_write > prev_wr && !(flags & CLS_METHOD_WR)) {
+ logger().error("method tried to update object but is not marked WR");
+ throw ceph::osd::input_output_error{};
+ }
+
+ // for write calls we never return data expect errors. For details refer
+ // to cls/cls_hello.cc.
+ if (ret < 0 || (flags & CLS_METHOD_WR) == 0) {
+ logger().debug("method called response length={}", outdata.length());
+ osd_op.op.extent.length = outdata.length();
+ osd_op.outdata.claim_append(outdata);
+ }
if (ret < 0) {
throw ceph::osd::make_error(ret);
}
-#if 0
- if (ctx->num_read > prev_rd && !(flags & CLS_METHOD_RD)) {
- derr << "method " << cname << "." << mname << " tried to read object but is not marked RD" << dendl;
- result = -EIO;
- break;
- }
- if (ctx->num_write > prev_wr && !(flags & CLS_METHOD_WR)) {
- derr << "method " << cname << "." << mname << " tried to update object but is not marked WR" << dendl;
- result = -EIO;
- break;
- }
-#endif
-
- logger().debug("method called response length={}", outdata.length());
- osd_op.op.extent.length = outdata.length();
- osd_op.outdata.claim_append(outdata);
});
}
});
}
-// TODO: split the method accordingly to os' constness needs
seastar::future<>
OpsExecuter::do_osd_op(OSDOp& osd_op)
{
case CEPH_OSD_OP_SYNC_READ:
[[fallthrough]];
case CEPH_OSD_OP_READ:
- return backend.read(os->oi,
- op.extent.offset,
- op.extent.length,
- op.extent.truncate_size,
- op.extent.truncate_seq,
- op.flags).then([&osd_op](bufferlist bl) {
- osd_op.rval = bl.length();
- osd_op.outdata = std::move(bl);
- return seastar::now();
+ return do_read_op([&osd_op] (auto& backend, const auto& os) {
+ return backend.read(os.oi,
+ osd_op.op.extent.offset,
+ osd_op.op.extent.length,
+ osd_op.op.extent.truncate_size,
+ osd_op.op.extent.truncate_seq,
+ osd_op.op.flags).then(
+ [&osd_op](bufferlist bl) {
+ osd_op.rval = bl.length();
+ osd_op.outdata = std::move(bl);
+ return seastar::now();
+ });
});
case CEPH_OSD_OP_GETXATTR:
- return backend.getxattr(os, osd_op);
+ return do_read_op([&osd_op] (auto& backend, const auto& os) {
+ return backend.getxattr(os, osd_op);
+ });
case CEPH_OSD_OP_WRITE:
- return backend.write(*os, osd_op, txn);
+ return do_write_op([&osd_op] (auto& backend, auto& os, auto& txn) {
+ return backend.write(os, osd_op, txn);
+ });
case CEPH_OSD_OP_WRITEFULL:
- // XXX: os = backend.write(std::move(os), ...) instead?
- return backend.writefull(*os, osd_op, txn);
+ return do_write_op([&osd_op] (auto& backend, auto& os, auto& txn) {
+ return backend.writefull(os, osd_op, txn);
+ });
case CEPH_OSD_OP_SETALLOCHINT:
return seastar::now();
case CEPH_OSD_OP_SETXATTR:
- return backend.setxattr(*os, osd_op, txn);
+ return do_write_op([&osd_op] (auto& backend, auto& os, auto& txn) {
+ return backend.setxattr(os, osd_op, txn);
+ });
case CEPH_OSD_OP_PGNLS:
return do_pgnls(osd_op.indata, os->oi.soid.get_namespace(), op.pgls.count)
.then([&osd_op](bufferlist bl) {
return seastar::now();
});
case CEPH_OSD_OP_DELETE:
- return backend.remove(*os, txn);
+ return do_write_op([&osd_op] (auto& backend, auto& os, auto& txn) {
+ return backend.remove(os, txn);
+ });
case CEPH_OSD_OP_CALL:
return this->do_op_call(osd_op);
case CEPH_OSD_OP_STAT:
- return backend.stat(*os, osd_op);
+ // note: stat does not require RD
+ return do_const_op([&osd_op] (/* const */auto& backend, const auto& os) {
+ return backend.stat(os, osd_op);
+ });
default:
logger().warn("unknown op {}", ceph_osd_op_name(op.op));
throw std::runtime_error(
PGBackend& backend;
ceph::os::Transaction txn;
+ size_t num_read = 0; ///< count read ops
+ size_t num_write = 0; ///< count update ops
+
seastar::future<ceph::bufferlist> do_pgnls(
ceph::bufferlist& indata,
const std::string& nspace,
uint64_t limit);
seastar::future<> do_op_call(class OSDOp& osd_op);
+ template <class Func>
+ auto do_const_op(Func&& f) {
+ // TODO: pass backend as read-only
+ return std::forward<Func>(f)(backend, const_cast<const ObjectState&>(*os));
+ }
+
+ template <class Func>
+ auto do_read_op(Func&& f) {
+ ++num_read;
+ // TODO: pass backend as read-only
+ return std::forward<Func>(f)(backend, const_cast<const ObjectState&>(*os));
+ }
+
+ template <class Func>
+ auto do_write_op(Func&& f) {
+ ++num_write;
+ return std::forward<Func>(f)(backend, *os, txn);
+ }
+
public:
OpsExecuter(PGBackend::cached_os_t os, PG& pg)
: os(std::move(os)), pg(pg), backend(pg.get_backend()) {