});
}
+OpsExecuter::osd_op_errorator::future<>
+OpsExecuter::execute_osd_op(OSDOp& osd_op)
+{
+ // TODO: dispatch via call table?
+ // TODO: we might want to find a way to unify both input and output
+ // of each op.
+ logger().debug(
+ "handling op {} on object {}",
+ ceph_osd_op_name(osd_op.op.op),
+ get_target());
+ switch (const ceph_osd_op& op = osd_op.op; op.op) {
+ case CEPH_OSD_OP_SYNC_READ:
+ [[fallthrough]];
+ case CEPH_OSD_OP_READ:
+ return do_read_op([&osd_op] (auto& backend, const auto& os) {
+ return backend.read(os.oi,
+ osd_op.op.extent.offset,
+ osd_op.op.extent.length,
+ osd_op.op.extent.truncate_size,
+ osd_op.op.extent.truncate_seq,
+ osd_op.op.flags).safe_then(
+ [&osd_op](ceph::bufferlist&& bl) {
+ osd_op.rval = bl.length();
+ osd_op.outdata = std::move(bl);
+ return osd_op_errorator::now();
+ });
+ });
+ case CEPH_OSD_OP_SPARSE_READ:
+ return do_read_op([&osd_op] (auto& backend, const auto& os) {
+ return backend.sparse_read(os, osd_op);
+ });
+ case CEPH_OSD_OP_CHECKSUM:
+ return do_read_op([&osd_op] (auto& backend, const auto& os) {
+ return backend.checksum(os, osd_op);
+ });
+ case CEPH_OSD_OP_GETXATTR:
+ return do_read_op([&osd_op] (auto& backend, const auto& os) {
+ return backend.getxattr(os, osd_op);
+ });
+ case CEPH_OSD_OP_GETXATTRS:
+ return do_read_op([&osd_op] (auto& backend, const auto& os) {
+ return backend.get_xattrs(os, osd_op);
+ });
+ case CEPH_OSD_OP_CREATE:
+ return do_write_op([&osd_op] (auto& backend, auto& os, auto& txn) {
+ return backend.create(os, osd_op, txn);
+ }, true);
+ case CEPH_OSD_OP_WRITE:
+ return do_write_op([this, &osd_op] (auto& backend, auto& os, auto& txn) {
+ return backend.write(os, osd_op, txn, *osd_op_params);
+ }, true);
+ case CEPH_OSD_OP_WRITEFULL:
+ return do_write_op([this, &osd_op] (auto& backend, auto& os, auto& txn) {
+ return backend.writefull(os, osd_op, txn, *osd_op_params);
+ }, true);
+ case CEPH_OSD_OP_TRUNCATE:
+ return do_write_op([this, &osd_op] (auto& backend, auto& os, auto& txn) {
+ // FIXME: rework needed. Move this out to do_write_op(), introduce
+ // do_write_op_no_user_modify()...
+ return backend.truncate(os, osd_op, txn, *osd_op_params);
+ }, true);
+ case CEPH_OSD_OP_ZERO:
+ return do_write_op([this, &osd_op] (auto& backend, auto& os, auto& txn) {
+ return backend.zero(os, osd_op, txn, *osd_op_params);
+ }, true);
+ case CEPH_OSD_OP_SETALLOCHINT:
+ return osd_op_errorator::now();
+ case CEPH_OSD_OP_SETXATTR:
+ return do_write_op([&osd_op] (auto& backend, auto& os, auto& txn) {
+ return backend.setxattr(os, osd_op, txn);
+ }, true);
+ case CEPH_OSD_OP_DELETE:
+ return do_write_op([] (auto& backend, auto& os, auto& txn) {
+ return backend.remove(os, txn);
+ }, true);
+ case CEPH_OSD_OP_CALL:
+ return this->do_op_call(osd_op);
+ case CEPH_OSD_OP_STAT:
+ // note: stat does not require RD
+ return do_const_op([&osd_op] (/* const */auto& backend, const auto& os) {
+ return backend.stat(os, osd_op);
+ });
+ case CEPH_OSD_OP_TMAPUP:
+ // TODO: there was an effort to kill TMAP in ceph-osd. According to
+ // @dzafman this isn't possible yet. Maybe it could be accomplished
+ // before crimson's readiness and we'd luckily don't need to carry.
+ return dont_do_legacy_op();
+
+ // OMAP
+ case CEPH_OSD_OP_OMAPGETKEYS:
+ return do_read_op([&osd_op] (auto& backend, const auto& os) {
+ return backend.omap_get_keys(os, osd_op);
+ });
+ case CEPH_OSD_OP_OMAPGETVALS:
+ return do_read_op([&osd_op] (auto& backend, const auto& os) {
+ return backend.omap_get_vals(os, osd_op);
+ });
+ case CEPH_OSD_OP_OMAPGETHEADER:
+ return do_read_op([&osd_op] (auto& backend, const auto& os) {
+ return backend.omap_get_header(os, osd_op);
+ });
+ case CEPH_OSD_OP_OMAPGETVALSBYKEYS:
+ return do_read_op([&osd_op] (auto& backend, const auto& os) {
+ return backend.omap_get_vals_by_keys(os, osd_op);
+ });
+ case CEPH_OSD_OP_OMAPSETVALS:
+#if 0
+ if (!pg.get_pool().info.supports_omap()) {
+ return crimson::ct_error::operation_not_supported::make();
+ }
+#endif
+ return do_write_op([this, &osd_op] (auto& backend, auto& os, auto& txn) {
+ return backend.omap_set_vals(os, osd_op, txn, *osd_op_params);
+ }, true);
+ case CEPH_OSD_OP_OMAPSETHEADER:
+#if 0
+ if (!pg.get_pool().info.supports_omap()) {
+ return crimson::ct_error::operation_not_supported::make();
+ }
+#endif
+ return do_write_op([&osd_op] (auto& backend, auto& os, auto& txn) {
+ return backend.omap_set_header(os, osd_op, txn);
+ }, true);
+ case CEPH_OSD_OP_OMAPRMKEYRANGE:
+#if 0
+ if (!pg.get_pool().info.supports_omap()) {
+ return crimson::ct_error::operation_not_supported::make();
+ }
+#endif
+ return do_write_op([&osd_op] (auto& backend, auto& os, auto& txn) {
+ return backend.omap_remove_range(os, osd_op, txn);
+ }, true);
+
+ // watch/notify
+ case CEPH_OSD_OP_WATCH:
+ return do_write_op([this, &osd_op] (auto& backend, auto& os, auto& txn) {
+ return do_op_watch(osd_op, os, txn);
+ }, false);
+ case CEPH_OSD_OP_NOTIFY:
+ return do_read_op([this, &osd_op] (auto&, const auto& os) {
+ return do_op_notify(osd_op, os);
+ });
+ case CEPH_OSD_OP_NOTIFY_ACK:
+ return do_read_op([this, &osd_op] (auto&, const auto& os) {
+ return do_op_notify_ack(osd_op, os);
+ });
+
+ default:
+ logger().warn("unknown op {}", ceph_osd_op_name(op.op));
+ throw std::runtime_error(
+ fmt::format("op '{}' not supported", ceph_osd_op_name(op.op)));
+ }
+}
+
static inline std::unique_ptr<const PGLSFilter> get_pgls_filter(
const std::string& type,
bufferlist::const_iterator& iter)
});
}
-OpsExecuter::osd_op_errorator::future<>
-OpsExecuter::execute_osd_op(OSDOp& osd_op)
-{
- // TODO: dispatch via call table?
- // TODO: we might want to find a way to unify both input and output
- // of each op.
- logger().debug(
- "handling op {} on object {}",
- ceph_osd_op_name(osd_op.op.op),
- get_target());
- switch (const ceph_osd_op& op = osd_op.op; op.op) {
- case CEPH_OSD_OP_SYNC_READ:
- [[fallthrough]];
- case CEPH_OSD_OP_READ:
- return do_read_op([&osd_op] (auto& backend, const auto& os) {
- return backend.read(os.oi,
- osd_op.op.extent.offset,
- osd_op.op.extent.length,
- osd_op.op.extent.truncate_size,
- osd_op.op.extent.truncate_seq,
- osd_op.op.flags).safe_then(
- [&osd_op](ceph::bufferlist&& bl) {
- osd_op.rval = bl.length();
- osd_op.outdata = std::move(bl);
- return osd_op_errorator::now();
- });
- });
- case CEPH_OSD_OP_SPARSE_READ:
- return do_read_op([&osd_op] (auto& backend, const auto& os) {
- return backend.sparse_read(os, osd_op);
- });
- case CEPH_OSD_OP_CHECKSUM:
- return do_read_op([&osd_op] (auto& backend, const auto& os) {
- return backend.checksum(os, osd_op);
- });
- case CEPH_OSD_OP_GETXATTR:
- return do_read_op([&osd_op] (auto& backend, const auto& os) {
- return backend.getxattr(os, osd_op);
- });
- case CEPH_OSD_OP_GETXATTRS:
- return do_read_op([&osd_op] (auto& backend, const auto& os) {
- return backend.get_xattrs(os, osd_op);
- });
- case CEPH_OSD_OP_CREATE:
- return do_write_op([&osd_op] (auto& backend, auto& os, auto& txn) {
- return backend.create(os, osd_op, txn);
- }, true);
- case CEPH_OSD_OP_WRITE:
- return do_write_op([this, &osd_op] (auto& backend, auto& os, auto& txn) {
- return backend.write(os, osd_op, txn, *osd_op_params);
- }, true);
- case CEPH_OSD_OP_WRITEFULL:
- return do_write_op([this, &osd_op] (auto& backend, auto& os, auto& txn) {
- return backend.writefull(os, osd_op, txn, *osd_op_params);
- }, true);
- case CEPH_OSD_OP_TRUNCATE:
- return do_write_op([this, &osd_op] (auto& backend, auto& os, auto& txn) {
- // FIXME: rework needed. Move this out to do_write_op(), introduce
- // do_write_op_no_user_modify()...
- return backend.truncate(os, osd_op, txn, *osd_op_params);
- }, true);
- case CEPH_OSD_OP_ZERO:
- return do_write_op([this, &osd_op] (auto& backend, auto& os, auto& txn) {
- return backend.zero(os, osd_op, txn, *osd_op_params);
- }, true);
- case CEPH_OSD_OP_SETALLOCHINT:
- return osd_op_errorator::now();
- case CEPH_OSD_OP_SETXATTR:
- return do_write_op([&osd_op] (auto& backend, auto& os, auto& txn) {
- return backend.setxattr(os, osd_op, txn);
- }, true);
- case CEPH_OSD_OP_DELETE:
- return do_write_op([] (auto& backend, auto& os, auto& txn) {
- return backend.remove(os, txn);
- }, true);
- case CEPH_OSD_OP_CALL:
- return this->do_op_call(osd_op);
- case CEPH_OSD_OP_STAT:
- // note: stat does not require RD
- return do_const_op([&osd_op] (/* const */auto& backend, const auto& os) {
- return backend.stat(os, osd_op);
- });
- case CEPH_OSD_OP_TMAPUP:
- // TODO: there was an effort to kill TMAP in ceph-osd. According to
- // @dzafman this isn't possible yet. Maybe it could be accomplished
- // before crimson's readiness and we'd luckily don't need to carry.
- return dont_do_legacy_op();
-
- // OMAP
- case CEPH_OSD_OP_OMAPGETKEYS:
- return do_read_op([&osd_op] (auto& backend, const auto& os) {
- return backend.omap_get_keys(os, osd_op);
- });
- case CEPH_OSD_OP_OMAPGETVALS:
- return do_read_op([&osd_op] (auto& backend, const auto& os) {
- return backend.omap_get_vals(os, osd_op);
- });
- case CEPH_OSD_OP_OMAPGETHEADER:
- return do_read_op([&osd_op] (auto& backend, const auto& os) {
- return backend.omap_get_header(os, osd_op);
- });
- case CEPH_OSD_OP_OMAPGETVALSBYKEYS:
- return do_read_op([&osd_op] (auto& backend, const auto& os) {
- return backend.omap_get_vals_by_keys(os, osd_op);
- });
- case CEPH_OSD_OP_OMAPSETVALS:
-#if 0
- if (!pg.get_pool().info.supports_omap()) {
- return crimson::ct_error::operation_not_supported::make();
- }
-#endif
- return do_write_op([this, &osd_op] (auto& backend, auto& os, auto& txn) {
- return backend.omap_set_vals(os, osd_op, txn, *osd_op_params);
- }, true);
- case CEPH_OSD_OP_OMAPSETHEADER:
-#if 0
- if (!pg.get_pool().info.supports_omap()) {
- return crimson::ct_error::operation_not_supported::make();
- }
-#endif
- return do_write_op([&osd_op] (auto& backend, auto& os, auto& txn) {
- return backend.omap_set_header(os, osd_op, txn);
- }, true);
- case CEPH_OSD_OP_OMAPRMKEYRANGE:
-#if 0
- if (!pg.get_pool().info.supports_omap()) {
- return crimson::ct_error::operation_not_supported::make();
- }
-#endif
- return do_write_op([&osd_op] (auto& backend, auto& os, auto& txn) {
- return backend.omap_remove_range(os, osd_op, txn);
- }, true);
-
- // watch/notify
- case CEPH_OSD_OP_WATCH:
- return do_write_op([this, &osd_op] (auto& backend, auto& os, auto& txn) {
- return do_op_watch(osd_op, os, txn);
- }, false);
- case CEPH_OSD_OP_NOTIFY:
- return do_read_op([this, &osd_op] (auto&, const auto& os) {
- return do_op_notify(osd_op, os);
- });
- case CEPH_OSD_OP_NOTIFY_ACK:
- return do_read_op([this, &osd_op] (auto&, const auto& os) {
- return do_op_notify_ack(osd_op, os);
- });
-
- default:
- logger().warn("unknown op {}", ceph_osd_op_name(op.op));
- throw std::runtime_error(
- fmt::format("op '{}' not supported", ceph_osd_op_name(op.op)));
- }
-}
-
static seastar::future<ceph::bufferlist> do_pgls_common(
const hobject_t& pg_start,
const hobject_t& pg_end,