std::move(log_entries));
co_return std::make_tuple(
std::move(submitted),
- all_completed.then_interruptible(
- [this, last_complete=peering_state.get_info().last_complete, at_version]
- (auto acked) {
- for (const auto& peer : acked) {
- peering_state.update_peer_last_complete_ondisk(
- peer.shard, peer.last_complete_ondisk);
- }
- peering_state.complete_write(at_version, last_complete);
- return seastar::now();
- })
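+ // peer bookkeeping (update_peer_last_complete_ondisk / complete_write)
+ // is now done where replies are counted: ReplicatedBackend::got_rep_op_reply
+ // and the pending==0 fast path below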
+ std::move(all_completed)
);
}
assert(0 == "impossible");
}
if (--peers->pending == 0) {
+ // no peers other than me: replication size is 1
+ pg.complete_write(peers->at_version, peers->last_complete);
peers->all_committed.set_value();
peers->all_committed = {};
return seastar::now();
}
+ // wait for all peers to ack (ReplicatedBackend::got_rep_op_reply)
return peers->all_committed.get_shared_future();
}).then_interruptible([pending_txn, this, _new_clone, &hoid,
to_push_delete=std::move(to_push_delete),
for (auto& peer : peers.acked_peers) {
if (peer.shard == reply.from) {
peer.last_complete_ondisk = reply.get_last_complete_ondisk();
+ pg.update_peer_last_complete_ondisk(
+ peer.shard, peer.last_complete_ondisk);
if (--peers.pending == 0) {
+ pg.complete_write(peers.at_version, peers.last_complete);
peers.all_committed.set_value();
peers.all_committed = {};
}
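// ---------------------------------------------------------------------------
// Minimal, self-contained sketch of the fan-in pattern this change converges
// on; NOT crimson code. FakePG, pending_ops_t, wait_all_committed() and
// on_rep_op_reply() are illustrative stand-ins for the PG interface and the
// backend's per-transaction bookkeeping. Only seastar::shared_promise and its
// set_value()/get_shared_future() usage mirror the real mechanism: the submit
// side hands out a shared future while replies are outstanding, and the reply
// side does the last_complete_ondisk/complete_write bookkeeping before waking
// every waiter.
// ---------------------------------------------------------------------------
#include <seastar/core/app-template.hh>
#include <seastar/core/future.hh>
#include <seastar/core/shared_future.hh>
#include <seastar/core/shared_ptr.hh>
#include <seastar/core/sleep.hh>
#include <chrono>
#include <iostream>
#include <map>

using namespace std::chrono_literals;

struct FakePG {                      // stand-in for the PG calls used above
  void update_peer_last_complete_ondisk(int shard, unsigned v) {
    std::cout << "peer " << shard << " last_complete_ondisk=" << v << "\n";
  }
  void complete_write(unsigned at_version, unsigned last_complete) {
    std::cout << "complete_write " << at_version << "/" << last_complete << "\n";
  }
};

struct pending_ops_t {               // per-transaction bookkeeping
  int pending = 0;                   // outstanding commits, including my own
  unsigned at_version = 0;
  unsigned last_complete = 0;
  std::map<int, unsigned> acked_peers;      // shard -> last_complete_ondisk
  seastar::shared_promise<> all_committed;  // resolved once pending hits 0
};

// Submit side: my own commit is one of the pending entries; if nothing else
// is outstanding the write is already done, otherwise wait on the promise.
seastar::future<> wait_all_committed(FakePG& pg, pending_ops_t& peers) {
  if (--peers.pending == 0) {
    pg.complete_write(peers.at_version, peers.last_complete);
    peers.all_committed.set_value();
    peers.all_committed = {};
    return seastar::make_ready_future<>();
  }
  return peers.all_committed.get_shared_future();
}

// Reply side: record the peer's position and, on the last reply, do the PG
// bookkeeping before waking every waiter of the shared future.
void on_rep_op_reply(FakePG& pg, pending_ops_t& peers,
                     int from_shard, unsigned last_complete_ondisk) {
  peers.acked_peers[from_shard] = last_complete_ondisk;
  pg.update_peer_last_complete_ondisk(from_shard, last_complete_ondisk);
  if (--peers.pending == 0) {
    pg.complete_write(peers.at_version, peers.last_complete);
    peers.all_committed.set_value();
    peers.all_committed = {};
  }
}

int main(int argc, char** argv) {
  seastar::app_template app;
  return app.run(argc, argv, [] {
    auto pg = seastar::make_lw_shared<FakePG>();
    auto peers = seastar::make_lw_shared<pending_ops_t>();
    peers->pending = 3;              // me plus two replicas
    peers->at_version = 42;
    peers->last_complete = 41;
    auto waiter = wait_all_committed(*pg, *peers);
    // simulate the two replica replies arriving a little later
    return seastar::sleep(10ms).then([pg, peers] {
      on_rep_op_reply(*pg, *peers, 1, 41);
      on_rep_op_reply(*pg, *peers, 2, 41);
    }).then([waiter = std::move(waiter)]() mutable {
      return std::move(waiter);
    }).finally([pg, peers] {});
  });
}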