}));
}));
}
+seastar::future<> PG::submit_error_log(
+ Ref<MOSDOp> m,
+ const OpInfo &op_info,
+ ObjectContextRef obc,
+ const std::error_code e,
+ ceph_tid_t rep_tid,
+ eversion_t &version)
+{
+ const osd_reqid_t &reqid = m->get_reqid();
+ mempool::osd_pglog::list<pg_log_entry_t> log_entries;
+ log_entries.push_back(pg_log_entry_t(pg_log_entry_t::ERROR,
+ obc->obs.oi.soid,
+ next_version(),
+ eversion_t(), 0,
+ reqid, utime_t(),
+ -e.value()));
+ if (op_info.allows_returnvec()) {
+ log_entries.back().set_op_returns(m->ops);
+ }
+ ceph_assert(is_primary());
+ if (!log_entries.empty()) {
+ ceph_assert(log_entries.rbegin()->version >= projected_last_update);
+ version = projected_last_update = log_entries.rbegin()->version;
+ }
+ ceph::os::Transaction t;
+ peering_state.merge_new_log_entries(
+ log_entries, t, peering_state.get_pg_trim_to(),
+ peering_state.get_min_last_complete_ondisk());
+
+ set<pg_shard_t> waiting_on;
+ for (auto &i : get_acting_recovery_backfill()) {
+ pg_shard_t peer(i);
+ if (peer == pg_whoami) continue;
+ ceph_assert(peering_state.get_peer_missing().count(peer));
+ ceph_assert(peering_state.has_peer_info(peer));
+ auto log_m = crimson::make_message<MOSDPGUpdateLogMissing>(
+ log_entries,
+ spg_t(peering_state.get_info().pgid.pgid, i.shard),
+ pg_whoami.shard,
+ get_osdmap_epoch(),
+ get_last_peering_reset(),
+ rep_tid,
+ peering_state.get_pg_trim_to(),
+ peering_state.get_min_last_complete_ondisk());
+ send_cluster_message(peer.osd, std::move(log_m), get_osdmap_epoch());
+ waiting_on.insert(peer);
+ }
+ waiting_on.insert(pg_whoami);
+ log_entry_update_waiting_on.insert(
+ std::make_pair(rep_tid, log_update_t{std::move(waiting_on)}));
+ return shard_services.get_store().do_transaction(
+ get_collection_ref(), std::move(t))
+ .then([this] {
+ peering_state.update_trim_to();
+ return seastar::now();
+ });
+}
PG::do_osd_ops_iertr::future<PG::pg_rep_op_fut_t<MURef<MOSDOpReply>>>
PG::do_osd_ops(
return do_osd_ops_iertr::make_ready_future<MURef<MOSDOpReply>>(
std::move(reply));
},
- [m, this] (const std::error_code& e) {
- auto reply = crimson::make_message<MOSDOpReply>(
- m.get(), -e.value(), get_osdmap_epoch(), 0, false);
- if (m->ops.empty() ? 0 :
- m->ops.back().op.flags & CEPH_OSD_OP_FLAG_FAILOK) {
- reply->set_result(0);
+ [m, &op_info, obc, this] (const std::error_code& e) {
+ return seastar::do_with(eversion_t(), [m, &op_info, obc, e, this](auto &version) {
+ auto fut = seastar::now();
+ epoch_t epoch = get_osdmap_epoch();
+ ceph_tid_t rep_tid = shard_services.get_tid();
+ auto last_complete = peering_state.get_info().last_complete;
+ if (op_info.may_write()) {
+ fut = submit_error_log(m, op_info, obc, e, rep_tid, version);
}
- reply->set_enoent_reply_versions(
- peering_state.get_info().last_update,
- peering_state.get_info().last_user_version);
- return do_osd_ops_iertr::make_ready_future<MURef<MOSDOpReply>>(std::move(reply));
+ return fut.then([m, e, epoch, &op_info, rep_tid, &version, last_complete, this] {
+ auto log_reply = [m, e, this] {
+ auto reply = crimson::make_message<MOSDOpReply>(
+ m.get(), -e.value(), get_osdmap_epoch(), 0, false);
+ if (m->ops.empty() ? 0 :
+ m->ops.back().op.flags & CEPH_OSD_OP_FLAG_FAILOK) {
+ reply->set_result(0);
+ }
+ reply->set_enoent_reply_versions(
+ peering_state.get_info().last_update,
+ peering_state.get_info().last_user_version);
+ reply->add_flags(CEPH_OSD_FLAG_ACK | CEPH_OSD_FLAG_ONDISK);
+ return do_osd_ops_iertr::make_ready_future<MURef<MOSDOpReply>>(
+ std::move(reply));
+ };
+
+ if (!peering_state.pg_has_reset_since(epoch) && op_info.may_write()) {
+ auto it = log_entry_update_waiting_on.find(rep_tid);
+ ceph_assert(it != log_entry_update_waiting_on.end());
+ auto it2 = it->second.waiting_on.find(pg_whoami);
+ ceph_assert(it2 != it->second.waiting_on.end());
+ it->second.waiting_on.erase(it2);
+
+ if (it->second.waiting_on.empty()) {
+ log_entry_update_waiting_on.erase(it);
+ if (version != eversion_t()) {
+ peering_state.complete_write(version, last_complete);
+ }
+ return log_reply();
+ } else {
+ return it->second.all_committed.get_shared_future()
+ .then([this, &version, last_complete, log_reply = std::move(log_reply)] {
+ if (version != eversion_t()) {
+ peering_state.complete_write(version, last_complete);
+ }
+ return log_reply();
+ });
+ }
+ } else {
+ return log_reply();
+ }
+ });
});
+ });
}
PG::do_osd_ops_iertr::future<PG::pg_rep_op_fut_t<>>