From f7b55ec14415811d750e79838f746a91106ba81e Mon Sep 17 00:00:00 2001 From: Samuel Just Date: Fri, 21 Oct 2016 14:33:08 -0700 Subject: [PATCH] osd/: Update PGBackend users to project last_update and submit stat deltas The RMW pipeline means that we don't start committing an update immediately, so we can't update the log synchronously with submit_transaction. Thus, in order to pipeline writes, PG/ReplicatedPG will need to project last_update and abstain from updating info directly (updating info.stats was the only offender). Signed-off-by: Samuel Just --- src/osd/PG.cc | 2 ++ src/osd/PG.h | 7 +++++-- src/osd/PGBackend.h | 6 ++++++ src/osd/ReplicatedBackend.cc | 5 +++++ src/osd/ReplicatedBackend.h | 1 + src/osd/ReplicatedPG.cc | 36 +++++++++++++++++++++++------------- src/osd/ReplicatedPG.h | 8 ++++++-- 7 files changed, 48 insertions(+), 17 deletions(-) diff --git a/src/osd/PG.cc b/src/osd/PG.cc index 73c71bbdc9be4..030d4abce5301 100644 --- a/src/osd/PG.cc +++ b/src/osd/PG.cc @@ -5115,6 +5115,8 @@ void PG::start_peering_interval( // pg->on_* on_change(t); + projected_last_update = eversion_t(); + assert(!deleting); // should we tell the primary we are here? 
diff --git a/src/osd/PG.h b/src/osd/PG.h index ce79c69660fde..517970cc68f26 100644 --- a/src/osd/PG.h +++ b/src/osd/PG.h @@ -2192,11 +2192,14 @@ public: PerfCounters *logger = NULL); void write_if_dirty(ObjectStore::Transaction& t); + eversion_t projected_last_update; eversion_t get_next_version() const { - eversion_t at_version(get_osdmap()->get_epoch(), - pg_log.get_head().version+1); + eversion_t at_version( + get_osdmap()->get_epoch(), + projected_last_update.version+1); assert(at_version > info.last_update); assert(at_version > pg_log.get_head()); + assert(at_version > projected_last_update); return at_version; } diff --git a/src/osd/PGBackend.h b/src/osd/PGBackend.h index c92d44fb8558d..418c2c08a88f6 100644 --- a/src/osd/PGBackend.h +++ b/src/osd/PGBackend.h @@ -106,6 +106,11 @@ typedef ceph::shared_ptr OSDMapRef; virtual void cancel_pull(const hobject_t &soid) = 0; + virtual void apply_stats( + const hobject_t &soid, + const object_stat_sum_t &delta_stats) = 0; + + /** * Bless a context * @@ -380,6 +385,7 @@ typedef ceph::shared_ptr OSDMapRef; /// execute implementation specific transaction virtual void submit_transaction( const hobject_t &hoid, ///< [in] object + const object_stat_sum_t &delta_stats,///< [in] stat change const eversion_t &at_version, ///< [in] version PGTransactionUPtr &&t, ///< [in] trans to execute (move) const eversion_t &trim_to, ///< [in] trim log to here diff --git a/src/osd/ReplicatedBackend.cc b/src/osd/ReplicatedBackend.cc index 795e34dae3a56..1bd94c5cc523f 100644 --- a/src/osd/ReplicatedBackend.cc +++ b/src/osd/ReplicatedBackend.cc @@ -521,6 +521,7 @@ void generate_transaction( void ReplicatedBackend::submit_transaction( const hobject_t &soid, + const object_stat_sum_t &delta_stats, const eversion_t &at_version, PGTransactionUPtr &&_t, const eversion_t &trim_to, @@ -534,6 +535,10 @@ void ReplicatedBackend::submit_transaction( osd_reqid_t reqid, OpRequestRef orig_op) { + parent->apply_stats( + soid, + delta_stats); + vector 
log_entries(_log_entries); ObjectStore::Transaction op_t; PGTransactionUPtr t(std::move(_t)); diff --git a/src/osd/ReplicatedBackend.h b/src/osd/ReplicatedBackend.h index 65a4c55d688b3..6b8df0a5d3401 100644 --- a/src/osd/ReplicatedBackend.h +++ b/src/osd/ReplicatedBackend.h @@ -344,6 +344,7 @@ public: friend class C_OSD_OnOpApplied; void submit_transaction( const hobject_t &hoid, + const object_stat_sum_t &delta_stats, const eversion_t &at_version, PGTransactionUPtr &&t, const eversion_t &trim_to, diff --git a/src/osd/ReplicatedPG.cc b/src/osd/ReplicatedPG.cc index ca28a1d50ceb7..aa25002821258 100644 --- a/src/osd/ReplicatedPG.cc +++ b/src/osd/ReplicatedPG.cc @@ -6901,25 +6901,30 @@ void ReplicatedPG::finish_ctx(OpContext *ctx, int log_op_type, bool maintain_ssc ctx->obc->ssc->snapset = ctx->new_snapset; } - apply_ctx_stats(ctx, scrub_ok); + apply_ctx_scrub_stats(ctx, scrub_ok); } -void ReplicatedPG::apply_ctx_stats(OpContext *ctx, bool scrub_ok) -{ - info.stats.stats.add(ctx->delta_stats); +void ReplicatedPG::apply_stats( + const hobject_t &soid, + const object_stat_sum_t &delta_stats) { + + info.stats.stats.add(delta_stats); - const hobject_t& soid = ctx->obs->oi.soid; for (set::iterator i = backfill_targets.begin(); i != backfill_targets.end(); ++i) { pg_shard_t bt = *i; pg_info_t& pinfo = peer_info[bt]; if (cmp(soid, pinfo.last_backfill, get_sort_bitwise()) <= 0) - pinfo.stats.stats.add(ctx->delta_stats); + pinfo.stats.stats.add(delta_stats); else if (cmp(soid, last_backfill_started, get_sort_bitwise()) <= 0) - pending_backfill_updates[soid].stats.add(ctx->delta_stats); + pending_backfill_updates[soid].stats.add(delta_stats); } +} +void ReplicatedPG::apply_ctx_scrub_stats(OpContext *ctx, bool scrub_ok) +{ + const hobject_t& soid = ctx->obs->oi.soid; if (!scrub_ok && scrubber.active) { assert(cmp(soid, scrubber.start, get_sort_bitwise()) < 0 || cmp(soid, scrubber.end, get_sort_bitwise()) >= 0); @@ -8555,8 +8560,13 @@ void ReplicatedPG::issue_repop(RepGather 
*repop, OpContext *ctx) ctx->obc, ctx->clone_obc, unlock_snapset_obc ? ctx->snapset_obc : ObjectContextRef()); + if (!(ctx->log.empty())) { + assert(ctx->at_version >= projected_last_update); + projected_last_update = ctx->at_version; + } pgbackend->submit_transaction( soid, + ctx->delta_stats, ctx->at_version, std::move(ctx->op_t), pg_trim_to, @@ -9806,7 +9816,7 @@ void ReplicatedPG::mark_all_unfound_lost( missing_loc.get_needs_recovery().end(); ObcLockManager manager; - eversion_t v = info.last_update; + eversion_t v = get_next_version(); v.epoch = get_osdmap()->get_epoch(); unsigned num_unfound = missing_loc.num_unfound(); while (m != mend) { @@ -9829,7 +9839,6 @@ void ReplicatedPG::mark_all_unfound_lost( prev = pick_newest_available(oid); if (prev > eversion_t()) { // log it - ++v.version; pg_log_entry_t e( pg_log_entry_t::LOST_REVERT, oid, v, m->second.need, 0, osd_reqid_t(), mtime, 0); @@ -9839,13 +9848,13 @@ void ReplicatedPG::mark_all_unfound_lost( dout(10) << e << dendl; // we are now missing the new version; recovery code will sort it out. 
+ ++v.version; ++m; break; } case pg_log_entry_t::LOST_DELETE: { - ++v.version; pg_log_entry_t e(pg_log_entry_t::LOST_DELETE, oid, v, m->second.need, 0, osd_reqid_t(), mtime, 0); if (get_osdmap()->test_flag(CEPH_OSDMAP_REQUIRE_JEWEL)) { @@ -9858,6 +9867,7 @@ void ReplicatedPG::mark_all_unfound_lost( dout(10) << e << dendl; log_entries.push_back(e); + ++v.version; ++m; } break; @@ -11518,7 +11528,7 @@ void ReplicatedPG::hit_set_remove_all() utime_t now = ceph_clock_now(cct); ctx->mtime = now; hit_set_trim(ctx, 0); - apply_ctx_stats(ctx.get()); + apply_ctx_scrub_stats(ctx.get()); simple_opc_submit(std::move(ctx)); } @@ -11737,7 +11747,7 @@ void ReplicatedPG::hit_set_persist() hit_set_trim(ctx, max); - apply_ctx_stats(ctx.get()); + apply_ctx_scrub_stats(ctx.get()); simple_opc_submit(std::move(ctx)); } @@ -13118,7 +13128,7 @@ boost::statechart::result ReplicatedPG::TrimmingObjects::react(const SnapTrim&) pg->queue_snap_trim(); }); - pg->apply_ctx_stats(ctx.get()); + pg->apply_ctx_scrub_stats(ctx.get()); in_flight.insert(pos); pg->simple_opc_submit(std::move(ctx)); diff --git a/src/osd/ReplicatedPG.h b/src/osd/ReplicatedPG.h index 7b99a78773e38..88cf36ba2c81e 100644 --- a/src/osd/ReplicatedPG.h +++ b/src/osd/ReplicatedPG.h @@ -268,6 +268,9 @@ public: const object_stat_sum_t &stat_diff) override; void failed_push(const list &from, const hobject_t &soid) override; void cancel_pull(const hobject_t &soid) override; + void apply_stats( + const hobject_t &soid, + const object_stat_sum_t &delta_stats) override; template class BlessedGenContext; class BlessedContext; @@ -1100,8 +1103,9 @@ protected: void reply_ctx(OpContext *ctx, int err, eversion_t v, version_t uv); void make_writeable(OpContext *ctx); void log_op_stats(OpContext *ctx); - void apply_ctx_stats(OpContext *ctx, - bool scrub_ok=false); ///< true if we should skip scrub stat update + void apply_ctx_scrub_stats( + OpContext *ctx, + bool scrub_ok=false); ///< true if we should skip scrub stat update void 
write_update_size_and_usage(object_stat_sum_t& stats, object_info_t& oi, interval_set& modified, uint64_t offset, -- 2.39.5