return do_write_op([&osd_op] (auto& backend, auto& os, auto& txn) {
return backend.setxattr(os, osd_op, txn);
});
- case CEPH_OSD_OP_PGNLS_FILTER:
- return do_pg_op([&osd_op] (const auto& pg, const auto& nspace) {
- return do_pgnls_filtered(pg, nspace, osd_op);
- });
- case CEPH_OSD_OP_PGNLS:
- return do_pg_op([&osd_op] (const auto& pg, const auto& nspace) {
- return do_pgnls(pg, nspace, osd_op);
- });
case CEPH_OSD_OP_DELETE:
return do_write_op([&osd_op] (auto& backend, auto& os, auto& txn) {
return backend.remove(os, txn);
}
}
+seastar::future<>
+OpsExecuter::execute_pg_op(OSDOp& osd_op)
+{
+ logger().warn("handling op {}", ceph_osd_op_name(osd_op.op.op));
+ switch (const ceph_osd_op& op = osd_op.op; op.op) {
+ case CEPH_OSD_OP_PGNLS:
+ return do_pg_op([&osd_op] (const auto& pg, const auto& nspace) {
+ return do_pgnls(pg, nspace, osd_op);
+ });
+ case CEPH_OSD_OP_PGNLS_FILTER:
+ return do_pg_op([&osd_op] (const auto& pg, const auto& nspace) {
+ return do_pgnls_filtered(pg, nspace, osd_op);
+ });
+ default:
+ logger().warn("unknown op {}", ceph_osd_op_name(op.op));
+ throw std::runtime_error(
+ fmt::format("op '{}' not supported", ceph_osd_op_name(op.op)));
+ }
+}
+
} // namespace ceph::osd
backend(pg.get_backend()),
msg(std::move(msg)) {
}
+ OpsExecuter(PG& pg, Ref<MOSDOp> msg)
+ : OpsExecuter{PGBackend::cached_os_t{}, pg, std::move(msg)}
+ {}
seastar::future<> execute_osd_op(class OSDOp& osd_op);
+ seastar::future<> execute_pg_op(class OSDOp& osd_op);
template <typename Func>
seastar::future<> submit_changes(Func&& f) &&;
});
}
+seastar::future<Ref<MOSDOpReply>> PG::do_pg_ops(Ref<MOSDOp> m)
+{
+ return seastar::do_with(OpsExecuter{*this/* as const& */, m},
+ [this, m] (auto& ox) {
+ return seastar::do_for_each(m->ops, [this, &ox](OSDOp& osd_op) {
+ logger().debug("will be handling pg op {}", ceph_osd_op_name(osd_op.op.op));
+ return ox.execute_pg_op(osd_op);
+ });
+ }).then([m, this] {
+ auto reply = make_message<MOSDOpReply>(m.get(), 0, get_osdmap_epoch(),
+ CEPH_OSD_FLAG_ACK | CEPH_OSD_FLAG_ONDISK,
+ false);
+ return seastar::make_ready_future<Ref<MOSDOpReply>>(std::move(reply));
+ }).handle_exception_type([=](const ceph::osd::error& e) {
+ auto reply = make_message<MOSDOpReply>(
+ m.get(), -e.code().value(), get_osdmap_epoch(), 0, false);
+ reply->set_enoent_reply_versions(peering_state.get_info().last_update,
+ peering_state.get_info().last_user_version);
+ return seastar::make_ready_future<Ref<MOSDOpReply>>(std::move(reply));
+ });
+}
+
seastar::future<> PG::handle_op(ceph::net::Connection* conn,
Ref<MOSDOp> m)
{
if (m->finish_decode()) {
m->clear_payload();
}
- return do_osd_ops(m);
+ if (std::any_of(begin(m->ops), end(m->ops),
+ [](auto& op) { return ceph_osd_op_type_pg(op.op.op); })) {
+ return do_pg_ops(m);
+ } else {
+ return do_osd_ops(m);
+ }
}).then([conn](Ref<MOSDOpReply> reply) {
return conn->send(reply);
});
const boost::statechart::event_base &evt,
PeeringCtx &rctx);
seastar::future<Ref<MOSDOpReply>> do_osd_ops(Ref<MOSDOp> m);
+ seastar::future<Ref<MOSDOpReply>> do_pg_ops(Ref<MOSDOp> m);
seastar::future<> do_osd_op(
ObjectState& os,
OSDOp& op,