From ccbc90dd736770cafe928d7c038ebde471c06aad Mon Sep 17 00:00:00 2001 From: Samuel Just Date: Thu, 27 Oct 2016 18:38:02 -0700 Subject: [PATCH] osd/: add projected_log to do client dup detection on not yet committed log entries Log entries don't get added to the log for ECBackend until reads are done, yet we still want any other requests with the same id to wait. ReplicatedPG::update_range should consider the projected log as well. Signed-off-by: Samuel Just --- src/osd/PG.cc | 43 +++++++++++++++++++++++---- src/osd/PG.h | 6 ++++ src/osd/PGLog.h | 15 ++++++++++ src/osd/ReplicatedPG.cc | 64 ++++++++++++++++++++--------------------- 4 files changed, 89 insertions(+), 39 deletions(-) diff --git a/src/osd/PG.cc b/src/osd/PG.cc index 4b43a141956..0ab50db7077 100644 --- a/src/osd/PG.cc +++ b/src/osd/PG.cc @@ -944,6 +944,7 @@ void PG::clear_primary_state() min_last_complete_ondisk = eversion_t(); pg_trim_to = eversion_t(); might_have_unfound.clear(); + projected_log = PGLog::IndexedLog(); last_update_ondisk = eversion_t(); @@ -2470,6 +2471,18 @@ void PG::update_heartbeat_peers() osd->need_heartbeat_peer_update(); } + +bool PG::check_in_progress_op( + const osd_reqid_t &r, + eversion_t *replay_version, + version_t *user_version, + int *return_code) const +{ + return ( + projected_log.get_request(r, replay_version, user_version, return_code) || + pg_log.get_log().get_request(r, replay_version, user_version, return_code)); +} + void PG::_update_calc_stats() { info.stats.version = info.last_update; @@ -3062,6 +3075,12 @@ void PG::append_log( pg_log.roll_forward(&handler); } } + auto last = logv.rbegin(); + if (is_primary() && last != logv.rend()) { + projected_log.skip_can_rollback_to_to_head(); + projected_log.trim(last->version, nullptr); + } + if (transaction_applied && roll_forward_to > pg_log.get_can_rollback_to()) { pg_log.roll_forward_to( roll_forward_to, @@ -4194,16 +4213,28 @@ void PG::chunky_scrub(ThreadPool::TPHandle &handle) } // walk the log to find the latest update that affects our chunk - scrubber.subset_last_update = pg_log.get_tail(); - for (list::const_reverse_iterator p = pg_log.get_log().log.rbegin(); - p != pg_log.get_log().log.rend(); - ++p) { + scrubber.subset_last_update = eversion_t(); + for (auto p = projected_log.log.rbegin(); + p != projected_log.log.rend(); + ++p) { if (cmp(p->soid, scrubber.start, get_sort_bitwise()) >= 0 && cmp(p->soid, scrubber.end, get_sort_bitwise()) < 0) { scrubber.subset_last_update = p->version; break; - } - } + } + } + if (scrubber.subset_last_update == eversion_t()) { + for (list::const_reverse_iterator p = + pg_log.get_log().log.rbegin(); + p != pg_log.get_log().log.rend(); + ++p) { + if (cmp(p->soid, scrubber.start, get_sort_bitwise()) >= 0 && + cmp(p->soid, scrubber.end, get_sort_bitwise()) < 0) { + scrubber.subset_last_update = p->version; + break; + } + } + } // ask replicas to wait until // last_update_applied >= scrubber.subset_last_update and then scan diff --git a/src/osd/PG.h b/src/osd/PG.h index 22ab92f38ab..27255f39873 100644 --- a/src/osd/PG.h +++ b/src/osd/PG.h @@ -2191,6 +2191,12 @@ public: PerfCounters *logger = NULL); void write_if_dirty(ObjectStore::Transaction& t); + PGLog::IndexedLog projected_log; + bool check_in_progress_op( + const osd_reqid_t &r, + eversion_t *replay_version, + version_t *user_version, + int *return_code) const; eversion_t projected_last_update; eversion_t get_next_version() const { eversion_t at_version( diff --git a/src/osd/PGLog.h b/src/osd/PGLog.h index eff487db2ef..837f69f450d 100644 --- a/src/osd/PGLog.h +++ b/src/osd/PGLog.h @@ -186,6 +186,21 @@ public: return divergent; } + template + void scan_log_after( + const eversion_t &bound, ///< [in] scan entries > bound + T &&f) const { + auto iter = log.rbegin(); + while (iter != log.rend() && iter->version > bound) + ++iter; + + while (true) { + if (iter == log.rbegin()) + break; + f(*(--iter)); + } + } + /****/ void claim_log_and_clear_rollback_info(const pg_log_t& o) { // we must have already trimmed the old entries diff --git a/src/osd/ReplicatedPG.cc b/src/osd/ReplicatedPG.cc index 6fdadd310d6..773dd8e7deb 100644 --- a/src/osd/ReplicatedPG.cc +++ b/src/osd/ReplicatedPG.cc @@ -1950,7 +1950,7 @@ void ReplicatedPG::do_op(OpRequestRef& op) eversion_t replay_version; version_t user_version; int return_code = 0; - bool got = pg_log.get_log().get_request( + bool got = check_in_progress_op( m->get_reqid(), &replay_version, &user_version, &return_code); if (got) { dout(3) << __func__ << " dup " << m->get_reqid() @@ -8573,6 +8573,9 @@ void ReplicatedPG::issue_repop(RepGather *repop, OpContext *ctx) assert(ctx->at_version >= projected_last_update); projected_last_update = ctx->at_version; } + for (auto &&entry: ctx->log) { + projected_log.add(entry); + } pgbackend->submit_transaction( soid, ctx->delta_stats, @@ -11358,11 +11361,11 @@ void ReplicatedPG::update_range( scan_range(local_min, local_max, bi, handle); } - if (bi->version >= info.last_update) { + if (bi->version >= projected_last_update) { dout(10) << __func__<< ": bi is current " << dendl; - assert(bi->version == info.last_update); + assert(bi->version == projected_last_update); } else if (bi->version >= info.log_tail) { - if (pg_log.get_log().empty()) { + if (pg_log.get_log().empty() && projected_log.empty()) { /* Because we don't move log_tail on split, the log might be * empty even if log_tail != last_update. However, the only * way to get here with an empty log is if log_tail is actually @@ -11372,41 +11375,36 @@ void ReplicatedPG::update_range( assert(bi->version == eversion_t()); return; } - assert(!pg_log.get_log().empty()); + dout(10) << __func__<< ": bi is old, (" << bi->version - << ") can be updated with log" << dendl; - list::const_iterator i = - pg_log.get_log().log.end(); - --i; - while (i != pg_log.get_log().log.begin() && - i->version > bi->version) { - --i; - } - if (i->version == bi->version) - ++i; + << ") can be updated with log to projected_last_update " + << projected_last_update << dendl; - assert(i != pg_log.get_log().log.end()); - dout(10) << __func__ << ": updating from version " << i->version - << dendl; - for (; i != pg_log.get_log().log.end(); ++i) { - const hobject_t &soid = i->soid; + auto func = [&](const pg_log_entry_t &e) { + dout(10) << __func__ << ": updating from version " << e.version + << dendl; + const hobject_t &soid = e.soid; if (cmp(soid, bi->begin, get_sort_bitwise()) >= 0 && cmp(soid, bi->end, get_sort_bitwise()) < 0) { - if (i->is_update()) { - dout(10) << __func__ << ": " << i->soid << " updated to version " - << i->version << dendl; - bi->objects.erase(i->soid); + if (e.is_update()) { + dout(10) << __func__ << ": " << e.soid << " updated to version " + << e.version << dendl; + bi->objects.erase(e.soid); bi->objects.insert( make_pair( - i->soid, - i->version)); - } else if (i->is_delete()) { - dout(10) << __func__ << ": " << i->soid << " removed" << dendl; - bi->objects.erase(i->soid); - } - } - } - bi->version = info.last_update; + e.soid, + e.version)); + } else if (e.is_delete()) { + dout(10) << __func__ << ": " << e.soid << " removed" << dendl; + bi->objects.erase(e.soid); + } + } + }; + dout(10) << "scanning pg log first" << dendl; + pg_log.get_log().scan_log_after(bi->version, func); + dout(10) << "scanning projected log" << dendl; + projected_log.scan_log_after(bi->version, func); + bi->version = projected_last_update; } else { assert(0 == "scan_range should have raised bi->version past log_tail"); } -- 2.39.5