logger().debug("{}: got obc={}", *this, obc_manager.get_obc()->get_oid());
co_await enter_stage<interruptor>(client_pp().process);
+ auto all_completed = interruptor::now();
+ {
+ // as with PG::submit_executer, we need to build the pg log entries
+ // and submit the transaction atomically
+ co_await interruptor::make_interruptible(pg->submit_lock.lock());
+ auto unlocker = seastar::defer([this] {
+ pg->submit_lock.unlock();
+ });
- logger().debug("{}: processing obc={}", *this, obc_manager.get_obc()->get_oid());
-
- auto txn = co_await remove_or_update(
- obc_manager.get_obc(), obc_manager.get_head_obc());
-
- auto [submitted, all_completed] = co_await pg->submit_transaction(
- ObjectContextRef(obc_manager.get_obc()),
- nullptr,
- std::move(txn),
- std::move(osd_op_p),
- std::move(log_entries)
- );
-
- co_await std::move(submitted);
+ logger().debug("{}: calling remove_or_update obc={}",
+ *this, obc_manager.get_obc()->get_oid());
+
+ auto txn = co_await remove_or_update(
+ obc_manager.get_obc(), obc_manager.get_head_obc());
+
+ auto submitted = interruptor::now();
+ std::tie(submitted, all_completed) = co_await pg->submit_transaction(
+ ObjectContextRef(obc_manager.get_obc()),
+ nullptr,
+ std::move(txn),
+ std::move(osd_op_p),
+ std::move(log_entries)
+ );
+ co_await std::move(submitted);
+ }
co_await enter_stage<interruptor>(client_pp().wait_repop);
const std::error_code e,
ceph_tid_t rep_tid)
{
+ // as with submit_executer, need to ensure that log numbering and submission
+ // are atomic
+ co_await interruptor::make_interruptible(submit_lock.lock());
+ auto unlocker = seastar::defer([this] {
+ submit_lock.unlock();
+ });
LOG_PREFIX(PG::submit_error_log);
DEBUGDPP("{} rep_tid: {} error: {}",
*this, *m, rep_tid, e);
OpsExecuter &&ox,
const std::vector<OSDOp>& ops) {
LOG_PREFIX(PG::submit_executer);
- // transaction must commit at this point
- return std::move(
+
+ // we need to build the pg log entries and submit the transaction
+ // atomically to ensure log ordering
+ co_await interruptor::make_interruptible(submit_lock.lock());
+ auto unlocker = seastar::defer([this] {
+ submit_lock.unlock();
+ });
+
+ auto [submitted, completed] = co_await std::move(
ox
).flush_changes_n_do_ops_effects(
ops,
std::move(osd_op_p),
std::move(log_entries));
});
+
+ co_return std::make_tuple(
+ std::move(submitted).then_interruptible([unlocker=std::move(unlocker)] {}),
+ std::move(completed));
}
PG::interruptible_future<MURef<MOSDOpReply>> PG::do_pg_ops(Ref<MOSDOp> m)