}
ECBackend::write_iertr::future<>
-ECBackend::handle_rep_write_reply(Ref<MOSDECSubOpWriteReply>)
+ECBackend::handle_rep_write_reply(Ref<MOSDECSubOpWriteReply> m)
{
+ const auto& op = m->op;
+ assert(rmw_pipeline.tid_to_op_map.contains(op.tid));
+ const auto& from = op.from;
+ auto& wop = *rmw_pipeline.tid_to_op_map.at(op.tid);
+ if (op.committed) {
+ // TODO: trace.event("sub write committed");
+ ceph_assert(wop.pending_commit.count(from));
+ wop.pending_commit.erase(from);
+ }
+ if (op.applied) {
+ // TODO: trace.event("sub write applied");
+ ceph_assert(wop.pending_apply.count(from));
+ wop.pending_apply.erase(from);
+ }
+
+ if (wop.pending_commit.empty() &&
+ wop.on_all_commit &&
+ // also wait for apply, to preserve ordering with luminous peers.
+ wop.pending_apply.empty()) {
+ logger().info("{}: calling on_all_commit on {}", __func__, wop);
+ wop.on_all_commit->complete(0);
+ wop.on_all_commit = 0;
+ // TODO: wop.trace.event("ec write all committed");
+ }
+ rmw_pipeline.check_ops();
return write_iertr::now();
}
[pg] (Ref<MOSDECSubOpWrite> concrete_req) {
return pg->handle_rep_write_op(std::move(concrete_req));
},
- [ec_backend] (Ref<MOSDECSubOpWriteReply> concrete_req) {
- return ec_backend->handle_rep_write_reply(
- std::move(concrete_req)
- ).handle_error_interruptible(crimson::ct_error::assert_all{});
+ [pg] (Ref<MOSDECSubOpWriteReply> concrete_req) {
+ return pg->handle_rep_write_reply(std::move(concrete_req));
},
[pg] (Ref<MOSDECSubOpRead> concrete_req) {
return pg->handle_rep_read_op(std::move(concrete_req));
}).handle_error_interruptible(crimson::ct_error::assert_all{});
}
+PG::interruptible_future<> PG::handle_rep_write_reply(Ref<MOSDECSubOpWriteReply> m)
+{
+ if (const auto& op = m->op; op.committed) {
+ // TODO: trace.event("sub write committed");
+ if (op.from != pg_whoami) {
+ peering_state.update_peer_last_complete_ondisk(op.from, op.last_complete);
+ }
+ }
+ auto* ec_backend=dynamic_cast<::ECBackend*>(&get_backend());
+ assert(ec_backend);
+ return ec_backend->handle_rep_write_reply(
+ std::move(m)
+ ).handle_error_interruptible(crimson::ct_error::assert_all{});
+}
+
PG::interruptible_future<> PG::handle_rep_read_op(Ref<MOSDECSubOpRead> m)
{
auto* ec_backend=dynamic_cast<::ECBackend*>(&get_backend());
seastar::future<> clear_temp_objects();
interruptible_future<> handle_rep_write_op(Ref<MOSDECSubOpWrite>);
+ interruptible_future<> handle_rep_write_reply(Ref<MOSDECSubOpWriteReply>);
interruptible_future<> handle_rep_read_op(Ref<MOSDECSubOpRead>);
private: