From c5146998b8da1e89debe00477a30c84bec036cd3 Mon Sep 17 00:00:00 2001 From: Matan Breizman Date: Thu, 2 Nov 2023 10:00:06 +0000 Subject: [PATCH] crimson/osd/pg: introduce PG::log_entry_version map `submit_error_log()` was returning `version` to be used later in `failure_func` call to `complete_write()`. Maintain the version returned from `submit_error_log()` in a dedicated map to avoid handling the lifetime of 'version'. Note: This change is crucial to the following change that will return 'error_fut' separately. Signed-off-by: Matan Breizman --- src/crimson/osd/pg.cc | 71 ++++++++++++++++++++----------------------- src/crimson/osd/pg.h | 4 +-- 2 files changed, 35 insertions(+), 40 deletions(-) diff --git a/src/crimson/osd/pg.cc b/src/crimson/osd/pg.cc index 9de30555c156a..0f6065a837758 100644 --- a/src/crimson/osd/pg.cc +++ b/src/crimson/osd/pg.cc @@ -929,8 +929,7 @@ seastar::future<> PG::submit_error_log( const OpInfo &op_info, ObjectContextRef obc, const std::error_code e, - ceph_tid_t rep_tid, - eversion_t &version) + ceph_tid_t rep_tid) { logger().debug("{}: {} rep_tid: {} error: {}", __func__, *m, rep_tid, e); @@ -948,8 +947,7 @@ seastar::future<> PG::submit_error_log( ceph_assert(is_primary()); ceph_assert(!log_entries.empty()); ceph_assert(log_entries.rbegin()->version >= projected_last_update); - version = projected_last_update = log_entries.rbegin()->version; - + log_entry_version[rep_tid] = projected_last_update = log_entries.rbegin()->version; ceph::os::Transaction t; peering_state.merge_new_log_entries( log_entries, t, peering_state.get_pg_trim_to(), @@ -1044,41 +1042,37 @@ PG::do_osd_ops( }, // failure_func [m, &op_info, obc, this] (const std::error_code& e) { - return seastar::do_with(eversion_t(), [m, &op_info, obc, e, this](auto &version) { - logger().error("do_osd_ops_execute::failure_func {} got error: {}", *m, e); - auto error_log_fut = seastar::now(); - epoch_t epoch = get_osdmap_epoch(); - ceph_tid_t rep_tid = shard_services.get_tid(); - auto last_complete = peering_state.get_info().last_complete; - if (op_info.may_write()) { - // This should be executed as OrderedExclusivePhaseT so that - // successive ops will not reorder. - // TODO: https://tracker.ceph.com/issues/61651 - error_log_fut = submit_error_log(m, op_info, obc, e, rep_tid, version); - } - return error_log_fut.then([m, e, epoch, &op_info, rep_tid, &version, last_complete, this] { - auto fut = seastar::now(); - if (!peering_state.pg_has_reset_since(epoch) && op_info.may_write()) { - ceph_assert(version != eversion_t()); - auto it = log_entry_update_waiting_on.find(rep_tid); - ceph_assert(it != log_entry_update_waiting_on.end()); - auto it2 = it->second.waiting_on.find(pg_whoami); - ceph_assert(it2 != it->second.waiting_on.end()); - it->second.waiting_on.erase(it2); - if (it->second.waiting_on.empty()) { - log_entry_update_waiting_on.erase(it); - peering_state.complete_write(version, last_complete); - } else { - fut = it->second.all_committed.get_shared_future().then( - [this, &version, last_complete] { - peering_state.complete_write(version, last_complete); - return seastar::now(); - }); - } + logger().error("do_osd_ops_execute::failure_func {} got error: {}", *m, e); + auto error_log_fut = seastar::now(); + epoch_t epoch = get_osdmap_epoch(); + ceph_tid_t rep_tid = shard_services.get_tid(); + auto last_complete = peering_state.get_info().last_complete; + if (op_info.may_write()) { + error_log_fut = submit_error_log(m, op_info, obc, e, rep_tid); + } + return error_log_fut.then([m, e, epoch, &op_info, rep_tid, last_complete, this] { + auto fut = seastar::now(); + if (!peering_state.pg_has_reset_since(epoch) && op_info.may_write()) { + ceph_assert(log_entry_version.contains(rep_tid)); + auto it = log_entry_update_waiting_on.find(rep_tid); + ceph_assert(it != log_entry_update_waiting_on.end()); + auto it2 = it->second.waiting_on.find(pg_whoami); + ceph_assert(it2 != it->second.waiting_on.end()); + it->second.waiting_on.erase(it2); + if (it->second.waiting_on.empty()) { + log_entry_update_waiting_on.erase(it); + peering_state.complete_write(log_entry_version[rep_tid], last_complete); + log_entry_version.erase(rep_tid); + } else { + fut = it->second.all_committed.get_shared_future().then( + [this, last_complete, rep_tid] { + peering_state.complete_write(log_entry_version[rep_tid], last_complete); + return seastar::now(); + }); } - return fut.then([this, m, e] { - return log_reply(m, e); - }); + } + return fut.then([this, m, e] { + return log_reply(m, e); }); }); }); @@ -1401,6 +1395,7 @@ PG::interruptible_future<> PG::do_update_log_missing_reply( it->second.all_committed.set_value(); it->second.all_committed = {}; log_entry_update_waiting_on.erase(it); + log_entry_version.erase(m->get_tid()); } } else { logger().error("{} : {} got reply {} on unknown tid {}", diff --git a/src/crimson/osd/pg.h b/src/crimson/osd/pg.h index e06df874b8918..93e3ae82ec6eb 100644 --- a/src/crimson/osd/pg.h +++ b/src/crimson/osd/pg.h @@ -537,8 +537,7 @@ public: const OpInfo &op_info, ObjectContextRef obc, const std::error_code e, - ceph_tid_t rep_tid, - eversion_t &version); + ceph_tid_t rep_tid); private: @@ -769,6 +768,7 @@ private: }; std::map log_entry_update_waiting_on; + std::map log_entry_version; // snap trimming interval_set snap_trimq; }; -- 2.39.5