const OpInfo &op_info,
ObjectContextRef obc,
const std::error_code e,
- ceph_tid_t rep_tid,
- eversion_t &version)
+ ceph_tid_t rep_tid)
{
logger().debug("{}: {} rep_tid: {} error: {}",
__func__, *m, rep_tid, e);
ceph_assert(is_primary());
ceph_assert(!log_entries.empty());
ceph_assert(log_entries.rbegin()->version >= projected_last_update);
- version = projected_last_update = log_entries.rbegin()->version;
-
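+  // record the projected version for this rep_tid so the failure path can
+  // later complete the write for the matching error-log update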
+ log_entry_version[rep_tid] = projected_last_update = log_entries.rbegin()->version;
ceph::os::Transaction t;
peering_state.merge_new_log_entries(
log_entries, t, peering_state.get_pg_trim_to(),
},
// failure_func
[m, &op_info, obc, this] (const std::error_code& e) {
- return seastar::do_with(eversion_t(), [m, &op_info, obc, e, this](auto &version) {
- logger().error("do_osd_ops_execute::failure_func {} got error: {}", *m, e);
- auto error_log_fut = seastar::now();
- epoch_t epoch = get_osdmap_epoch();
- ceph_tid_t rep_tid = shard_services.get_tid();
- auto last_complete = peering_state.get_info().last_complete;
- if (op_info.may_write()) {
- // This should be executed as OrderedExclusivePhaseT so that
- // successive ops will not reorder.
- // TODO: https://tracker.ceph.com/issues/61651
- error_log_fut = submit_error_log(m, op_info, obc, e, rep_tid, version);
- }
- return error_log_fut.then([m, e, epoch, &op_info, rep_tid, &version, last_complete, this] {
- auto fut = seastar::now();
- if (!peering_state.pg_has_reset_since(epoch) && op_info.may_write()) {
- ceph_assert(version != eversion_t());
- auto it = log_entry_update_waiting_on.find(rep_tid);
- ceph_assert(it != log_entry_update_waiting_on.end());
- auto it2 = it->second.waiting_on.find(pg_whoami);
- ceph_assert(it2 != it->second.waiting_on.end());
- it->second.waiting_on.erase(it2);
- if (it->second.waiting_on.empty()) {
- log_entry_update_waiting_on.erase(it);
- peering_state.complete_write(version, last_complete);
- } else {
- fut = it->second.all_committed.get_shared_future().then(
- [this, &version, last_complete] {
- peering_state.complete_write(version, last_complete);
- return seastar::now();
- });
- }
+ logger().error("do_osd_ops_execute::failure_func {} got error: {}", *m, e);
+ auto error_log_fut = seastar::now();
+ epoch_t epoch = get_osdmap_epoch();
+ ceph_tid_t rep_tid = shard_services.get_tid();
+ auto last_complete = peering_state.get_info().last_complete;
+  if (op_info.may_write()) {
+    // This should be executed as OrderedExclusivePhaseT so that
+    // successive ops will not reorder.
+    // TODO: https://tracker.ceph.com/issues/61651
+    error_log_fut = submit_error_log(m, op_info, obc, e, rep_tid);
+  }
+ return error_log_fut.then([m, e, epoch, &op_info, rep_tid, last_complete, this] {
+ auto fut = seastar::now();
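+    // if the PG has not reset since the error log was submitted, ack our own
+    // log-entry update and complete the write once every replica has acked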
+ if (!peering_state.pg_has_reset_since(epoch) && op_info.may_write()) {
+ ceph_assert(log_entry_version.contains(rep_tid));
+ auto it = log_entry_update_waiting_on.find(rep_tid);
+ ceph_assert(it != log_entry_update_waiting_on.end());
+ auto it2 = it->second.waiting_on.find(pg_whoami);
+ ceph_assert(it2 != it->second.waiting_on.end());
+ it->second.waiting_on.erase(it2);
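+      // no acks outstanding: the error-log update is committed everywhere,
+      // so complete the write now; otherwise wait for the remaining replicas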
+      if (it->second.waiting_on.empty()) {
+        log_entry_update_waiting_on.erase(it);
+ peering_state.complete_write(log_entry_version[rep_tid], last_complete);
+ log_entry_version.erase(rep_tid);
+      } else {
+        // the reply handler fulfills all_committed and then erases the
+        // log_entry_version entry before this continuation runs, so capture
+        // the version by value here instead of looking it up again later
+        fut = it->second.all_committed.get_shared_future().then(
+          [this, last_complete, version=log_entry_version[rep_tid]] {
+          peering_state.complete_write(version, last_complete);
+          return seastar::now();
+        });
}
- return fut.then([this, m, e] {
- return log_reply(m, e);
- });
+    }
+    return fut.then([this, m, e] {
+      return log_reply(m, e);
});
});
});
it->second.all_committed.set_value();
it->second.all_committed = {};
log_entry_update_waiting_on.erase(it);
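+      // the error-log update is fully committed; drop the recorded version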
+ log_entry_version.erase(m->get_tid());
}
} else {
logger().error("{} : {} got reply {} on unknown tid {}",