From: Josh Durgin Date: Thu, 8 Mar 2018 03:09:19 +0000 (-0500) Subject: PG, PrimaryLogPG: trim log and rollback info for error log entries X-Git-Tag: v13.1.0~587^2~1 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=b50186bfe6c8981700e33c8a62850e21779d67d5;p=ceph.git PG, PrimaryLogPG: trim log and rollback info for error log entries Regular updates piggyback some osd state for this purpose with MOSDRepOp[Reply]. Do the same thing for pure log entry updates (write errors and lost/revert additions) via MOSDPGUpdateLogMissing[Reply]. Fixes: http://tracker.ceph.com/issues/22050 Signed-off-by: Josh Durgin --- diff --git a/qa/standalone/osd/repro_long_log.sh b/qa/standalone/osd/repro_long_log.sh index 28afca6e03e..9fadc50609b 100755 --- a/qa/standalone/osd/repro_long_log.sh +++ b/qa/standalone/osd/repro_long_log.sh @@ -81,8 +81,8 @@ function do_repro_long_log() { rados -p test rm foo done - # this demonstrates the problem - it should fail - test_log_size $PGID 41 || return 1 + # log should have been trimmed down to min_entries with one extra + test_log_size $PGID 21 || return 1 if [ "$which" = "test1" ]; then @@ -92,10 +92,11 @@ function do_repro_long_log() { else PRIMARY=$(ceph pg $PGID query | jq '.info.stats.up_primary') kill_daemons $dir TERM osd.$PRIMARY || return 1 - CEPH_ARGS="--osd-max-pg-log-entries=30 --osd-pg-log-trim-max=5" ceph-objectstore-tool --data-path $dir/$PRIMARY --pgid $PGID --op trim-pg-log || return 1 + + CEPH_ARGS="--osd-max-pg-log-entries=2" ceph-objectstore-tool --data-path $dir/$PRIMARY --pgid $PGID --op trim-pg-log || return 1 run_osd $dir $PRIMARY || return 1 wait_for_clean || return 1 - test_log_size $PGID 30 || return 1 + test_log_size $PGID 2 || return 1 fi } diff --git a/src/messages/MOSDPGUpdateLogMissing.h b/src/messages/MOSDPGUpdateLogMissing.h index 652757a69aa..18ff5423664 100644 --- a/src/messages/MOSDPGUpdateLogMissing.h +++ b/src/messages/MOSDPGUpdateLogMissing.h @@ -20,7 +20,7 @@ class 
MOSDPGUpdateLogMissing : public MOSDFastDispatchOp { - static const int HEAD_VERSION = 2; + static const int HEAD_VERSION = 3; static const int COMPAT_VERSION = 1; @@ -30,6 +30,9 @@ public: shard_id_t from; ceph_tid_t rep_tid = 0; mempool::osd_pglog::list<pg_log_entry_t> entries; + // piggybacked osd/pg state + eversion_t pg_trim_to; // primary->replica: trim to here + eversion_t pg_roll_forward_to; // primary->replica: trim rollback info to here epoch_t get_epoch() const { return map_epoch; } spg_t get_pgid() const { return pgid; } @@ -55,7 +58,9 @@ public: shard_id_t from, epoch_t epoch, epoch_t min_epoch, - ceph_tid_t rep_tid) + ceph_tid_t rep_tid, + eversion_t pg_trim_to, + eversion_t pg_roll_forward_to) : MOSDFastDispatchOp(MSG_OSD_PG_UPDATE_LOG_MISSING, HEAD_VERSION, COMPAT_VERSION), map_epoch(epoch), @@ -63,7 +68,10 @@ public: pgid(pgid), from(from), rep_tid(rep_tid), - entries(entries) {} + entries(entries), + pg_trim_to(pg_trim_to), + pg_roll_forward_to(pg_roll_forward_to) + {} private: ~MOSDPGUpdateLogMissing() override {} @@ -74,7 +82,10 @@ public: out << "pg_update_log_missing(" << pgid << " epoch " << map_epoch << "/" << min_epoch << " rep_tid " << rep_tid - << " entries " << entries << ")"; + << " entries " << entries + << " trim_to " << pg_trim_to + << " roll_forward_to " << pg_roll_forward_to + << ")"; } void encode_payload(uint64_t features) override { @@ -85,6 +96,8 @@ public: encode(rep_tid, payload); encode(entries, payload); encode(min_epoch, payload); + encode(pg_trim_to, payload); + encode(pg_roll_forward_to, payload); } void decode_payload() override { bufferlist::iterator p = payload.begin(); @@ -98,6 +111,10 @@ public: } else { min_epoch = map_epoch; } + if (header.version >= 3) { + decode(pg_trim_to, p); + decode(pg_roll_forward_to, p); + } } }; diff --git a/src/messages/MOSDPGUpdateLogMissingReply.h b/src/messages/MOSDPGUpdateLogMissingReply.h index e7d088a1fda..3e5b5e3b7c2 100644 --- a/src/messages/MOSDPGUpdateLogMissingReply.h +++ 
b/src/messages/MOSDPGUpdateLogMissingReply.h @@ -20,7 +20,7 @@ class MOSDPGUpdateLogMissingReply : public MOSDFastDispatchOp { - static const int HEAD_VERSION = 2; + static const int HEAD_VERSION = 3; static const int COMPAT_VERSION = 1; @@ -29,6 +29,8 @@ public: spg_t pgid; shard_id_t from; ceph_tid_t rep_tid = 0; + // piggybacked osd state + eversion_t last_complete_ondisk; epoch_t get_epoch() const { return map_epoch; } spg_t get_pgid() const { return pgid; } @@ -58,7 +60,8 @@ public: shard_id_t from, epoch_t epoch, epoch_t min_epoch, - ceph_tid_t rep_tid) + ceph_tid_t rep_tid, + eversion_t last_complete_ondisk) : MOSDFastDispatchOp( MSG_OSD_PG_UPDATE_LOG_MISSING_REPLY, HEAD_VERSION, @@ -67,7 +70,8 @@ public: min_epoch(min_epoch), pgid(pgid), from(from), - rep_tid(rep_tid) + rep_tid(rep_tid), + last_complete_ondisk(last_complete_ondisk) {} private: @@ -78,7 +82,8 @@ public: void print(ostream& out) const override { out << "pg_update_log_missing_reply(" << pgid << " epoch " << map_epoch << "/" << min_epoch - << " rep_tid " << rep_tid << ")"; + << " rep_tid " << rep_tid + << " lcod " << last_complete_ondisk << ")"; } void encode_payload(uint64_t features) override { @@ -88,6 +93,7 @@ public: encode(from, payload); encode(rep_tid, payload); encode(min_epoch, payload); + encode(last_complete_ondisk, payload); } void decode_payload() override { bufferlist::iterator p = payload.begin(); @@ -100,6 +106,9 @@ public: } else { min_epoch = map_epoch; } + if (header.version >= 3) { + decode(last_complete_ondisk, p); + } } }; diff --git a/src/osd/PG.cc b/src/osd/PG.cc index 175bd755dc4..744cde166f2 100644 --- a/src/osd/PG.cc +++ b/src/osd/PG.cc @@ -5246,25 +5246,41 @@ void PG::share_pg_info() bool PG::append_log_entries_update_missing( const mempool::osd_pglog::list<pg_log_entry_t> &entries, - ObjectStore::Transaction &t) + ObjectStore::Transaction &t, boost::optional<eversion_t> trim_to, + boost::optional<eversion_t> roll_forward_to) { assert(!entries.empty()); assert(entries.begin()->version > info.last_update); 
PGLogEntryHandler rollbacker{this, &t}; + if (roll_forward_to) { + pg_log.roll_forward(&rollbacker); + } bool invalidate_stats = pg_log.append_new_log_entries(info.last_backfill, info.last_backfill_bitwise, entries, &rollbacker); + + if (roll_forward_to && entries.rbegin()->soid > info.last_backfill) { + pg_log.roll_forward(&rollbacker); + } + if (roll_forward_to && *roll_forward_to > pg_log.get_can_rollback_to()) { + pg_log.roll_forward_to(*roll_forward_to, &rollbacker); + last_rollback_info_trimmed_to_applied = *roll_forward_to; + } + info.last_update = pg_log.get_head(); if (pg_log.get_missing().num_missing() == 0) { // advance last_complete since nothing else is missing! info.last_complete = info.last_update; } - info.stats.stats_invalid = info.stats.stats_invalid || invalidate_stats; + + dout(20) << __func__ << "trim_to bool = " << bool(trim_to) << " trim_to = " << (trim_to ? *trim_to : eversion_t()) << dendl; + if (trim_to) + pg_log.trim(*trim_to, info); dirty_info = true; write_if_dirty(t); return invalidate_stats; @@ -5273,12 +5289,14 @@ bool PG::append_log_entries_update_missing( void PG::merge_new_log_entries( const mempool::osd_pglog::list<pg_log_entry_t> &entries, - ObjectStore::Transaction &t) + ObjectStore::Transaction &t, + boost::optional<eversion_t> trim_to, + boost::optional<eversion_t> roll_forward_to) { dout(10) << __func__ << " " << entries << dendl; assert(is_primary()); - bool rebuild_missing = append_log_entries_update_missing(entries, t); + bool rebuild_missing = append_log_entries_update_missing(entries, t, trim_to, roll_forward_to); for (set<pg_shard_t>::const_iterator i = actingbackfill.begin(); i != actingbackfill.end(); ++i) { diff --git a/src/osd/PG.h b/src/osd/PG.h index a167e75bbe0..e091e458501 100644 --- a/src/osd/PG.h +++ b/src/osd/PG.h @@ -2814,7 +2814,9 @@ protected: bool append_log_entries_update_missing( const mempool::osd_pglog::list<pg_log_entry_t> &entries, - ObjectStore::Transaction &t); + ObjectStore::Transaction &t, + boost::optional<eversion_t> trim_to, + boost::optional<eversion_t> roll_forward_to); /** * 
Merge entries updating missing as necessary on all @@ -2822,7 +2824,9 @@ protected: */ void merge_new_log_entries( const mempool::osd_pglog::list<pg_log_entry_t> &entries, - ObjectStore::Transaction &t); + ObjectStore::Transaction &t, + boost::optional<eversion_t> trim_to, + boost::optional<eversion_t> roll_forward_to); void reset_interval_flush(); void start_peering_interval( diff --git a/src/osd/PrimaryLogPG.cc b/src/osd/PrimaryLogPG.cc index dc194e03729..d60a9824a46 100644 --- a/src/osd/PrimaryLogPG.cc +++ b/src/osd/PrimaryLogPG.cc @@ -10156,7 +10156,7 @@ void PrimaryLogPG::submit_log_entries( [this, entries, repop, on_complete]() { ObjectStore::Transaction t; eversion_t old_last_update = info.last_update; - merge_new_log_entries(entries, t); + merge_new_log_entries(entries, t, pg_trim_to, min_last_complete_ondisk); set<pg_shard_t> waiting_on; @@ -10175,7 +10175,9 @@ void PrimaryLogPG::submit_log_entries( pg_whoami.shard, get_osdmap()->get_epoch(), last_peering_reset, - repop->rep_tid); + repop->rep_tid, + pg_trim_to, + min_last_complete_ondisk); osd->send_message_osd_cluster( peer.osd, m, get_osdmap()->get_epoch()); waiting_on.insert(peer); @@ -10228,6 +10230,8 @@ void PrimaryLogPG::submit_log_entries( int r = osd->store->queue_transaction(ch, std::move(t), NULL); assert(r == 0); }); + + calc_trim_to(); } void PrimaryLogPG::cancel_log_updates() @@ -11162,7 +11166,16 @@ void PrimaryLogPG::do_update_log_missing(OpRequestRef &op) op->get_req()); assert(m->get_type() == MSG_OSD_PG_UPDATE_LOG_MISSING); ObjectStore::Transaction t; - append_log_entries_update_missing(m->entries, t); + boost::optional<eversion_t> op_trim_to, op_roll_forward_to; + if (m->pg_trim_to != eversion_t()) + op_trim_to = m->pg_trim_to; + if (m->pg_roll_forward_to != eversion_t()) + op_roll_forward_to = m->pg_roll_forward_to; + + dout(20) << __func__ << " op_trim_to = " << op_trim_to << " op_roll_forward_to = " << op_roll_forward_to << dendl; + + append_log_entries_update_missing(m->entries, t, op_trim_to, op_roll_forward_to); + eversion_t new_lcod = 
info.last_complete; Context *complete = new FunctionContext( [=](int) { @@ -11170,13 +11183,15 @@ void PrimaryLogPG::do_update_log_missing(OpRequestRef &op) op->get_req()); lock(); if (!pg_has_reset_since(msg->get_epoch())) { + update_last_complete_ondisk(new_lcod); MOSDPGUpdateLogMissingReply *reply = new MOSDPGUpdateLogMissingReply( spg_t(info.pgid.pgid, primary_shard().shard), pg_whoami.shard, msg->get_epoch(), msg->min_epoch, - msg->get_tid()); + msg->get_tid(), + new_lcod); reply->set_priority(CEPH_MSG_PRIO_HIGH); msg->get_connection()->send_message(reply); } @@ -11216,6 +11231,9 @@ void PrimaryLogPG::do_update_log_missing_reply(OpRequestRef &op) if (it != log_entry_update_waiting_on.end()) { if (it->second.waiting_on.count(m->get_from())) { it->second.waiting_on.erase(m->get_from()); + if (m->last_complete_ondisk != eversion_t()) { + update_peer_last_complete_ondisk(m->get_from(), m->last_complete_ondisk); + } } else { osd->clog->error() << info.pgid << " got reply "