From: Matan Breizman Date: Thu, 7 Dec 2023 13:48:02 +0000 (+0000) Subject: crimson/osd/pg: submit_error_log cleanup X-Git-Tag: testing/wip-batrick-testing-20240411.154038~637^2~2 X-Git-Url: http://git.apps.os.sepia.ceph.com/?a=commitdiff_plain;h=b44c41f08c2c2196197a5ce535466c94bb09398a;p=ceph-ci.git crimson/osd/pg: submit_error_log cleanup * error log completion logic is moved into maybe_submit_error_log * renamed it and it2 * maybe_submit_error_log is moved outside of failure_func * failure_func no longer gets rep_tid and record_error params * log_entry_version is removed, submit_error_log returns the version instead Signed-off-by: Matan Breizman --- diff --git a/src/crimson/osd/pg.cc b/src/crimson/osd/pg.cc index a605745a3be..cd2478d58ea 100644 --- a/src/crimson/osd/pg.cc +++ b/src/crimson/osd/pg.cc @@ -323,7 +323,6 @@ void PG::on_removal(ceph::os::Transaction &t) { void PG::clear_log_entry_maps() { log_entry_update_waiting_on.clear(); - log_entry_version.clear(); } void PG::on_activate(interval_set snaps) @@ -830,7 +829,7 @@ PG::do_osd_ops_execute( return submit_error_log(m, op_info, obc, e, rep_tid); } } - return seastar::now(); + return seastar::make_ready_future>(std::nullopt); }; auto failure_func_ptr = seastar::make_lw_shared(std::move(failure_func)); return interruptor::do_for_each(ops, [ox](OSDOp& osd_op) { @@ -910,16 +909,16 @@ PG::do_osd_ops_execute( }); }); }), OpsExecuter::osd_op_errorator::all_same_way( - [this, rollbacker, failure_func_ptr] + [rollbacker, failure_func_ptr] (const std::error_code& e) mutable { // handle non-fatal errors only ceph_assert(e.value() == EDQUOT || e.value() == ENOSPC || e.value() == EAGAIN); return rollbacker.rollback_obc_if_modified(e).then_interruptible( - [this, e, failure_func_ptr] { + [e, failure_func_ptr] { // no need to record error log - return (*failure_func_ptr)(e , shard_services.get_tid(), false); + return (*failure_func_ptr)(e); }); })); @@ -935,20 +934,59 @@ PG::do_osd_ops_execute( ceph_tid_t rep_tid = shard_services.get_tid(); return rollbacker.rollback_obc_if_modified(e).then_interruptible( [maybe_submit_error_log=std::move(maybe_submit_error_log), - e, rep_tid, failure_func_ptr] { + this, e, rep_tid, failure_func_ptr] { // record error log return maybe_submit_error_log(e, rep_tid).then( - [failure_func_ptr, e, rep_tid] { + [this, failure_func_ptr, e, rep_tid] (auto version) { + auto all_completed = + [this, failure_func_ptr, e, rep_tid, version] { + if (version.has_value()) { + return complete_error_log(rep_tid, version.value()).then( + [failure_func_ptr, e] { + return (*failure_func_ptr)(e); + }); + } else { + return (*failure_func_ptr)(e); + } + }; return PG::do_osd_ops_iertr::make_ready_future>( std::move(seastar::now()), - std::move((*failure_func_ptr)(e, rep_tid, true)) + std::move(all_completed()) ); }); }); })); } -seastar::future<> PG::submit_error_log( +seastar::future<> PG::complete_error_log(const ceph_tid_t& rep_tid, + const eversion_t& version) +{ + auto result = seastar::now(); + auto last_complete = peering_state.get_info().last_complete; + ceph_assert(log_entry_update_waiting_on.contains(rep_tid)); + auto& log_update = log_entry_update_waiting_on[rep_tid]; + ceph_assert(log_update.waiting_on.contains(pg_whoami)); + log_update.waiting_on.erase(pg_whoami); + if (log_update.waiting_on.empty()) { + log_entry_update_waiting_on.erase(rep_tid); + peering_state.complete_write(version, last_complete); + logger().debug("complete_error_log: write complete," + " erasing rep_tid {}", rep_tid); + } else { + logger().debug("complete_error_log: rep_tid {} awaiting update from {}", + rep_tid, log_update.waiting_on); + result = log_update.all_committed.get_shared_future().then( + [this, last_complete, rep_tid, version] { + logger().debug("complete_error_log: rep_tid {} awaited ", rep_tid); + peering_state.complete_write(version, last_complete); + ceph_assert(!log_entry_update_waiting_on.contains(rep_tid)); + return seastar::now(); + }); + } + return result; +} + +seastar::future> PG::submit_error_log( Ref m, const OpInfo &op_info, ObjectContextRef obc, @@ -971,7 +1009,7 @@ seastar::future<> PG::submit_error_log( ceph_assert(is_primary()); ceph_assert(!log_entries.empty()); ceph_assert(log_entries.rbegin()->version >= projected_last_update); - log_entry_version[rep_tid] = projected_last_update = log_entries.rbegin()->version; + projected_last_update = log_entries.rbegin()->version; ceph::os::Transaction t; peering_state.merge_new_log_entries( log_entries, t, peering_state.get_pg_trim_to(), @@ -1014,7 +1052,7 @@ seastar::future<> PG::submit_error_log( get_collection_ref(), std::move(t) ).then([this] { peering_state.update_trim_to(); - return seastar::now(); + return seastar::make_ready_future>(projected_last_update); }); }); }); @@ -1078,42 +1116,11 @@ PG::do_osd_ops( std::move(reply)); }, // failure_func - [m, &op_info, obc, this] - (const std::error_code& e, const ceph_tid_t& rep_tid, bool record_error) { - logger().error("do_osd_ops_execute::failure_func {} got error: {} record_error: {}", - *m, e, record_error); - epoch_t epoch = get_osdmap_epoch(); - auto last_complete = peering_state.get_info().last_complete; - auto fut = seastar::now(); - if (record_error && !peering_state.pg_has_reset_since(epoch) && op_info.may_write()) { - logger().debug("do_osd_ops_execute::failure_func finding rep_tid {}", - rep_tid); - ceph_assert(log_entry_version.contains(rep_tid)); - auto it = log_entry_update_waiting_on.find(rep_tid); - ceph_assert(it != log_entry_update_waiting_on.end()); - auto it2 = it->second.waiting_on.find(pg_whoami); - ceph_assert(it2 != it->second.waiting_on.end()); - it->second.waiting_on.erase(it2); - if (it->second.waiting_on.empty()) { - log_entry_update_waiting_on.erase(it); - peering_state.complete_write(log_entry_version[rep_tid], last_complete); - log_entry_version.erase(rep_tid); - logger().debug("do_osd_ops_execute::failure_func write complete," - " erasing rep_tid {}", rep_tid); - - } else { - fut = it->second.all_committed.get_shared_future().then( - [this, last_complete, rep_tid] { - logger().debug("do_osd_ops_execute::failure_func awaited {}", rep_tid); - peering_state.complete_write(log_entry_version[rep_tid], last_complete); - ceph_assert(!log_entry_update_waiting_on.contains(rep_tid)); - return seastar::now(); - }); - } - } - return fut.then([this, m, e] { - return log_reply(m, e); - }); + [m, this] + (const std::error_code& e) { + logger().error("do_osd_ops_execute::failure_func {} got error: {}", + *m, e); + return log_reply(m, e); }); } @@ -1177,7 +1184,7 @@ PG::do_osd_ops( return do_osd_ops_iertr::now(); }, // failure_func - [] (const std::error_code& e, const ceph_tid_t& rep_tid, bool record_error) { + [] (const std::error_code& e) { return do_osd_ops_iertr::now(); }); }); @@ -1442,7 +1449,6 @@ PG::interruptible_future<> PG::do_update_log_missing_reply( logger().debug("{}: erasing rep_tid {}", __func__, m->get_tid()); log_entry_update_waiting_on.erase(it); - log_entry_version.erase(m->get_tid()); } } else { logger().error("{} : {} got reply {} on unknown tid {}", diff --git a/src/crimson/osd/pg.h b/src/crimson/osd/pg.h index 80a181f24a2..b829ea177db 100644 --- a/src/crimson/osd/pg.h +++ b/src/crimson/osd/pg.h @@ -533,7 +533,9 @@ public: void print(std::ostream& os) const; void dump_primary(Formatter*); - seastar::future<> submit_error_log( + seastar::future<> complete_error_log(const ceph_tid_t& rep_tid, + const eversion_t& version); + seastar::future> submit_error_log( Ref m, const OpInfo &op_info, ObjectContextRef obc, @@ -791,7 +793,6 @@ private: }; std::map log_entry_update_waiting_on; - std::map log_entry_version; // snap trimming interval_set snap_trimq; };