auto& wop = *rmw_pipeline.tid_to_op_map.at(op.tid);
if (op.committed) {
// TODO: trace.event("sub write committed");
- logger().debug("ECBackend::{} from {} pending_commit {}",
- __func__, from, wop.pending_commit);
- ceph_assert(wop.pending_commit.count(from));
- wop.pending_commit.erase(from);
+ logger().debug("ECBackend::{} from {} pending_commits {}",
+ __func__, from, wop.pending_commits);
+ ceph_assert(wop.pending_commits > 0);
+ --wop.pending_commits;
+ // update_peer_last_complete_ondisk() is called by the higher
+ // layer handler: PG::handle_rep_write_reply().
}
- if (op.applied) {
- // TODO: trace.event("sub write applied");
- ceph_assert(wop.pending_apply.count(from));
- wop.pending_apply.erase(from);
+ if (wop.pending_commits == 0) {
+ rmw_pipeline.try_finish_rmw();
}
-
- if (wop.pending_commit.empty() &&
- wop.on_all_commit &&
- // also wait for apply, to preserve ordering with luminous peers.
- wop.pending_apply.empty()) {
- logger().info("ECBackend::{}: calling on_all_commit on {}", __func__, wop);
- wop.on_all_commit->complete(0);
- wop.on_all_commit = 0;
- // TODO: wop.trace.event("ec write all committed");
- }
- rmw_pipeline.check_ops();
return write_iertr::now();
}