return pg->osdmap_gate.wait_for_map(
std::move(trigger), req->min_epoch);
}));
- co_await pg->handle_rep_op(req);
+
+ if (pg->can_discard_replica_op(*req)) {
+ co_return;
+ }
+
+ auto [commit_fut, reply] = co_await pg->handle_rep_op(req);
+
+ co_await std::move(commit_fut);
+
+ co_await interruptor::make_interruptible(
+ pg->shard_services.send_to_osd(
+ req->from.osd, std::move(reply), pg->get_osdmap_epoch())
+ );
}
seastar::future<> RepRequest::with_pg(
);
}
-PG::interruptible_future<> PG::handle_rep_op(Ref<MOSDRepOp> req)
+PG::handle_rep_op_fut PG::handle_rep_op(Ref<MOSDRepOp> req)
{
LOG_PREFIX(PG::handle_rep_op);
DEBUGDPP("{}", *this, *req);
- if (can_discard_replica_op(*req)) {
- co_return;
- }
ceph::os::Transaction txn;
auto encoded_txn = req->get_data().cbegin();
txn,
false);
DEBUGDPP("{} do_transaction", *this, *req);
- co_await interruptor::make_interruptible(
+
+ auto commit_fut = interruptor::make_interruptible(
shard_services.get_store().do_transaction(coll_ref, std::move(txn))
);
req.get(), pg_whoami, 0,
map_epoch, req->get_min_epoch(), CEPH_OSD_FLAG_ONDISK);
reply->set_last_complete_ondisk(lcod);
- co_await interruptor::make_interruptible(
- shard_services.send_to_osd(req->from.osd, std::move(reply), map_epoch)
- );
- co_return;
+ co_return handle_rep_op_ret(std::move(commit_fut), std::move(reply));
}
PG::interruptible_future<> PG::update_snap_map(
using with_obc_func_t =
std::function<load_obc_iertr::future<> (ObjectContextRef, ObjectContextRef)>;
- interruptible_future<> handle_rep_op(Ref<MOSDRepOp> m);
+ using handle_rep_op_ret = std::tuple<
+ interruptible_future<>, // resolves upon commit
+ MURef<MOSDRepOpReply> // reply message
+ >;
+ // outer future resolves upon submission
+ using handle_rep_op_fut = interruptible_future<handle_rep_op_ret>;
+ handle_rep_op_fut handle_rep_op(Ref<MOSDRepOp> m);
void update_stats(const pg_stat_t &stat);
interruptible_future<> update_snap_map(
const std::vector<pg_log_entry_t> &log_entries,