void PG::clear_log_entry_maps()
{
log_entry_update_waiting_on.clear();
- log_entry_version.clear();
}
void PG::on_activate(interval_set<snapid_t> snaps)
return submit_error_log(m, op_info, obc, e, rep_tid);
}
}
- return seastar::now();
+ return seastar::make_ready_future<std::optional<eversion_t>>(std::nullopt);
};
auto failure_func_ptr = seastar::make_lw_shared(std::move(failure_func));
return interruptor::do_for_each(ops, [ox](OSDOp& osd_op) {
});
});
}), OpsExecuter::osd_op_errorator::all_same_way(
- [this, rollbacker, failure_func_ptr]
+ [rollbacker, failure_func_ptr]
(const std::error_code& e) mutable {
// handle non-fatal errors only
ceph_assert(e.value() == EDQUOT ||
e.value() == ENOSPC ||
e.value() == EAGAIN);
return rollbacker.rollback_obc_if_modified(e).then_interruptible(
- [this, e, failure_func_ptr] {
+ [e, failure_func_ptr] {
// no need to record error log
- return (*failure_func_ptr)(e , shard_services.get_tid(), false);
+ return (*failure_func_ptr)(e);
});
}));
ceph_tid_t rep_tid = shard_services.get_tid();
return rollbacker.rollback_obc_if_modified(e).then_interruptible(
[maybe_submit_error_log=std::move(maybe_submit_error_log),
- e, rep_tid, failure_func_ptr] {
+ this, e, rep_tid, failure_func_ptr] {
// record error log
return maybe_submit_error_log(e, rep_tid).then(
- [failure_func_ptr, e, rep_tid] {
+ [this, failure_func_ptr, e, rep_tid] (auto version) {
+ auto all_completed =
+ [this, failure_func_ptr, e, rep_tid, version] {
+ if (version.has_value()) {
+ return complete_error_log(rep_tid, version.value()).then(
+ [failure_func_ptr, e] {
+ return (*failure_func_ptr)(e);
+ });
+ } else {
+ return (*failure_func_ptr)(e);
+ }
+ };
return PG::do_osd_ops_iertr::make_ready_future<pg_rep_op_fut_t<Ret>>(
std::move(seastar::now()),
- std::move((*failure_func_ptr)(e, rep_tid, true))
+ std::move(all_completed())
);
});
});
}));
}
-seastar::future<> PG::submit_error_log(
+seastar::future<> PG::complete_error_log(const ceph_tid_t& rep_tid,
+ const eversion_t& version)
+{
+ auto result = seastar::now();
+ auto last_complete = peering_state.get_info().last_complete;
+ ceph_assert(log_entry_update_waiting_on.contains(rep_tid));
+ auto& log_update = log_entry_update_waiting_on[rep_tid];
+ ceph_assert(log_update.waiting_on.contains(pg_whoami));
+ log_update.waiting_on.erase(pg_whoami);
+ if (log_update.waiting_on.empty()) {
+ log_entry_update_waiting_on.erase(rep_tid);
+ peering_state.complete_write(version, last_complete);
+ logger().debug("complete_error_log: write complete,"
+ " erasing rep_tid {}", rep_tid);
+ } else {
+ logger().debug("complete_error_log: rep_tid {} awaiting update from {}",
+ rep_tid, log_update.waiting_on);
+ result = log_update.all_committed.get_shared_future().then(
+ [this, last_complete, rep_tid, version] {
+ logger().debug("complete_error_log: rep_tid {} awaited ", rep_tid);
+ peering_state.complete_write(version, last_complete);
+ ceph_assert(!log_entry_update_waiting_on.contains(rep_tid));
+ return seastar::now();
+ });
+ }
+ return result;
+}
+
+seastar::future<std::optional<eversion_t>> PG::submit_error_log(
Ref<MOSDOp> m,
const OpInfo &op_info,
ObjectContextRef obc,
ceph_assert(is_primary());
ceph_assert(!log_entries.empty());
ceph_assert(log_entries.rbegin()->version >= projected_last_update);
- log_entry_version[rep_tid] = projected_last_update = log_entries.rbegin()->version;
+ projected_last_update = log_entries.rbegin()->version;
ceph::os::Transaction t;
peering_state.merge_new_log_entries(
log_entries, t, peering_state.get_pg_trim_to(),
get_collection_ref(), std::move(t)
).then([this] {
peering_state.update_trim_to();
- return seastar::now();
+ return seastar::make_ready_future<std::optional<eversion_t>>(projected_last_update);
});
});
});
std::move(reply));
},
// failure_func
- [m, &op_info, obc, this]
- (const std::error_code& e, const ceph_tid_t& rep_tid, bool record_error) {
- logger().error("do_osd_ops_execute::failure_func {} got error: {} record_error: {}",
- *m, e, record_error);
- epoch_t epoch = get_osdmap_epoch();
- auto last_complete = peering_state.get_info().last_complete;
- auto fut = seastar::now();
- if (record_error && !peering_state.pg_has_reset_since(epoch) && op_info.may_write()) {
- logger().debug("do_osd_ops_execute::failure_func finding rep_tid {}",
- rep_tid);
- ceph_assert(log_entry_version.contains(rep_tid));
- auto it = log_entry_update_waiting_on.find(rep_tid);
- ceph_assert(it != log_entry_update_waiting_on.end());
- auto it2 = it->second.waiting_on.find(pg_whoami);
- ceph_assert(it2 != it->second.waiting_on.end());
- it->second.waiting_on.erase(it2);
- if (it->second.waiting_on.empty()) {
- log_entry_update_waiting_on.erase(it);
- peering_state.complete_write(log_entry_version[rep_tid], last_complete);
- log_entry_version.erase(rep_tid);
- logger().debug("do_osd_ops_execute::failure_func write complete,"
- " erasing rep_tid {}", rep_tid);
-
- } else {
- fut = it->second.all_committed.get_shared_future().then(
- [this, last_complete, rep_tid] {
- logger().debug("do_osd_ops_execute::failure_func awaited {}", rep_tid);
- peering_state.complete_write(log_entry_version[rep_tid], last_complete);
- ceph_assert(!log_entry_update_waiting_on.contains(rep_tid));
- return seastar::now();
- });
- }
- }
- return fut.then([this, m, e] {
- return log_reply(m, e);
- });
+ [m, this]
+ (const std::error_code& e) {
+ logger().error("do_osd_ops_execute::failure_func {} got error: {}",
+ *m, e);
+ return log_reply(m, e);
});
}
return do_osd_ops_iertr::now();
},
// failure_func
- [] (const std::error_code& e, const ceph_tid_t& rep_tid, bool record_error) {
+ [] (const std::error_code& e) {
return do_osd_ops_iertr::now();
});
});
logger().debug("{}: erasing rep_tid {}",
__func__, m->get_tid());
log_entry_update_waiting_on.erase(it);
- log_entry_version.erase(m->get_tid());
}
} else {
logger().error("{} : {} got reply {} on unknown tid {}",