From e7edf20fb7264bfaeeb30ac0185e9edc925878b6 Mon Sep 17 00:00:00 2001 From: Samuel Just Date: Fri, 12 Feb 2016 09:27:01 -0800 Subject: [PATCH] osd/: Use MOSDPGUpdateLogMissing to implement mark_unfound_lost_delete safely Using a MOSDPGLog was unsafe since it is not ordered with respect to repops. Instead, use a new message sent through the same paths as repops. Signed-off-by: Samuel Just --- src/osd/OSD.cc | 8 +- src/osd/PG.cc | 87 ++++++---- src/osd/PG.h | 83 +++++++++- src/osd/PGBackend.cc | 31 ++++ src/osd/PGBackend.h | 11 ++ src/osd/PGLog.cc | 16 +- src/osd/PGLog.h | 19 ++- src/osd/ReplicatedPG.cc | 353 +++++++++++++++++++++++++++++++--------- src/osd/ReplicatedPG.h | 50 +++++- src/osd/osd_types.cc | 6 + src/osd/osd_types.h | 22 ++- 11 files changed, 564 insertions(+), 122 deletions(-) diff --git a/src/osd/OSD.cc b/src/osd/OSD.cc index fc945fec2d9eb..616019785f77a 100644 --- a/src/osd/OSD.cc +++ b/src/osd/OSD.cc @@ -5292,7 +5292,12 @@ void OSD::do_command(Connection *con, ceph_tid_t tid, vector& cmd, buffe // simulate pg cmd= for pg->do-command if (prefix != "pg") cmd_putval(cct, cmdmap, "cmd", prefix); - r = pg->do_command(cmdmap, ss, data, odata); + r = pg->do_command(cmdmap, ss, data, odata, con, tid); + if (r == -EAGAIN) { + pg->unlock(); + // don't reply, pg will do so async + return; + } } else { ss << "not primary for pgid " << pgid; @@ -5592,7 +5597,6 @@ void OSD::do_command(Connection *con, ceph_tid_t tid, vector& cmd, buffe reply->set_data(odata); con->send_message(reply); } - return; } diff --git a/src/osd/PG.cc b/src/osd/PG.cc index 0b32703037f31..ac1eec876487c 100644 --- a/src/osd/PG.cc +++ b/src/osd/PG.cc @@ -4556,39 +4556,71 @@ void PG::share_pg_info() } } -/* - * Share a new segment of this PG's log with some replicas, after PG is active. - * - * Updates peer_missing and peer_info. - */ -void PG::share_pg_log() +void PG::append_log_entries_update_missing( + const list &entries, + ObjectStore::Transaction &t) { - dout(10) << __func__ << dendl; + assert(!entries.empty()); + assert(entries.begin()->version > info.last_update); + + PGLogEntryHandler rollbacker; + pg_log.append_new_log_entries( + info.last_backfill, + info.last_backfill_bitwise, + entries, + &rollbacker); + rollbacker.apply(this, &t); + info.last_update = pg_log.get_head(); + + if (pg_log.get_missing().num_missing() == 0) { + // advance last_complete since nothing else is missing! + info.last_complete = info.last_update; + } + + info.stats.stats_invalid = true; + dirty_info = true; + write_if_dirty(t); +} + + +void PG::merge_new_log_entries( + const list &entries, + ObjectStore::Transaction &t) +{ + dout(10) << __func__ << " " << entries << dendl; assert(is_primary()); - set::const_iterator a = actingbackfill.begin(); - assert(a != actingbackfill.end()); - set::const_iterator end = actingbackfill.end(); - while (a != end) { - pg_shard_t peer(*a); - ++a; + append_log_entries_update_missing(entries, t); + for (set::const_iterator i = actingbackfill.begin(); + i != actingbackfill.end(); + ++i) { + pg_shard_t peer(*i); if (peer == pg_whoami) continue; + assert(peer_missing.count(peer)); + assert(peer_info.count(peer)); pg_missing_t& pmissing(peer_missing[peer]); pg_info_t& pinfo(peer_info[peer]); - - MOSDPGLog *m = new MOSDPGLog( - peer.shard, pg_whoami.shard, - info.last_update.epoch, info); - m->log.copy_after(pg_log.get_log(), pinfo.last_update); - - for (list::const_iterator i = m->log.log.begin(); - i != m->log.log.end(); - ++i) { - pmissing.add_next_event(*i); - } - pinfo.last_update = m->log.head; - - osd->send_message_osd_cluster(peer.osd, m, get_osdmap()->get_epoch()); + PGLog::append_log_entries_update_missing( + pinfo.last_backfill, + info.last_backfill_bitwise, + entries, + NULL, + pmissing, + NULL, + this); + pinfo.last_update = info.last_update; + pinfo.stats.stats_invalid = true; + } + for (auto &&i: entries) { + missing_loc.rebuild( + i.soid, + get_sort_bitwise(), + pg_whoami, + actingbackfill, + info, + pg_log.get_missing(), + peer_missing, + peer_info); } } @@ -6640,7 +6672,6 @@ boost::statechart::result PG::RecoveryState::Active::react(const ActMap&) if (pg->cct->_conf->osd_auto_mark_unfound_lost) { pg->osd->clog->error() << pg->info.pgid.pgid << " has " << unfound << " objects unfound and apparently lost, would automatically marking lost but NOT IMPLEMENTED\n"; - //pg->mark_all_unfound_lost(*context< RecoveryMachine >().get_cur_transaction()); } else pg->osd->clog->error() << pg->info.pgid.pgid << " has " << unfound << " objects unfound and apparently lost\n"; } diff --git a/src/osd/PG.h b/src/osd/PG.h index e0543cc2a2474..ada160153fcf9 100644 --- a/src/osd/PG.h +++ b/src/osd/PG.h @@ -423,6 +423,55 @@ public: missing_loc.erase(hoid); } + /// Call to update structures for hoid after a change + void rebuild( + const hobject_t &hoid, + bool sort_bitwise, + pg_shard_t self, + const set to_recover, + const pg_info_t &info, + const pg_missing_t &missing, + const map &pmissing, + const map &pinfo) { + recovered(hoid); + boost::optional item; + set have; + auto miter = missing.missing.find(hoid); + if (miter != missing.missing.end()) { + item = miter->second; + } else { + for (auto &&i: to_recover) { + if (i == self) + continue; + auto pmiter = pmissing.find(i); + assert(pmiter != pmissing.end()); + miter = pmiter->second.missing.find(hoid); + if (miter != pmiter->second.missing.end()) { + item = miter->second; + break; + } + } + } + if (!item) + return; // recovered! + + needs_recovery_map[hoid] = *item; + auto mliter = + missing_loc.insert(make_pair(hoid, set())).first; + assert(info.last_backfill == hobject_t::get_max()); + assert(info.last_update >= item->need); + if (!missing.is_missing(hoid)) + mliter->second.insert(self); + for (auto &&i: pmissing) { + auto pinfoiter = pinfo.find(i.first); + assert(pinfoiter != pinfo.end()); + if (item->need <= pinfoiter->second.last_update && + cmp(hoid, pinfoiter->second.last_backfill, sort_bitwise) <= 0 && + !i.second.is_missing(hoid)) + mliter->second.insert(i.first); + } + } + const set &get_locations(const hobject_t &hoid) const { return missing_loc.count(hoid) ? missing_loc.find(hoid)->second : empty_set; @@ -854,7 +903,6 @@ public: bool adjust_need_up_thru(const OSDMapRef osdmap); bool all_unfound_are_queried_or_lost(const OSDMapRef osdmap) const; - virtual void mark_all_unfound_lost(int how) = 0; virtual void dump_recovery_info(Formatter *f) const = 0; bool calc_min_last_complete_ondisk() { @@ -920,11 +968,15 @@ public: list to_rollback; set to_remove; list to_trim; + list > to_stash; // LogEntryHandler void remove(const hobject_t &hoid) { to_remove.insert(hoid); } + void try_stash(const hobject_t &hoid, version_t v) { + to_stash.push_back(make_pair(hoid, v)); + } void rollback(const pg_log_entry_t &entry) { to_rollback.push_back(entry); } @@ -941,6 +993,11 @@ public: SnapRollBacker rollbacker(j->soid, pg, t); j->mod_desc.visit(&rollbacker); } + for (list >::iterator i = to_stash.begin(); + i != to_stash.end(); + ++i) { + pg->get_pgbackend()->try_stash(i->first, i->second, t); + } for (set::iterator i = to_remove.begin(); i != to_remove.end(); ++i) { @@ -2218,8 +2275,19 @@ public: /// share pg info after a pg is active void share_pg_info(); - /// share new pg log entries after a pg is active - void share_pg_log(); + + + void append_log_entries_update_missing( + const list &entries, + ObjectStore::Transaction &t); + + /** + * Merge entries updating missing as necessary on all + * actingbackfill logs and missings (also missing_loc) + */ + void merge_new_log_entries( + const list &entries, + ObjectStore::Transaction &t); void reset_interval_flush(); void start_peering_interval( @@ -2315,8 +2383,13 @@ public: virtual void do_backfill(OpRequestRef op) = 0; virtual void snap_trimmer(epoch_t epoch_queued) = 0; - virtual int do_command(cmdmap_t cmdmap, ostream& ss, - bufferlist& idata, bufferlist& odata) = 0; + virtual int do_command( + cmdmap_t cmdmap, + ostream& ss, + bufferlist& idata, + bufferlist& odata, + ConnectionRef conn, + ceph_tid_t tid) = 0; virtual void on_role_change() = 0; virtual void on_pool_change() = 0; diff --git a/src/osd/PGBackend.cc b/src/osd/PGBackend.cc index 2ffe6a3cff64b..d2e7a629680ff 100644 --- a/src/osd/PGBackend.cc +++ b/src/osd/PGBackend.cc @@ -59,6 +59,12 @@ struct RollbackVisitor : public ObjectModDesc::Visitor { temp.append(t); temp.swap(t); } + void try_rmobject(version_t old_version) { + ObjectStore::Transaction temp; + pg->rollback_try_stash(hoid, old_version, &temp); + temp.append(t); + temp.swap(t); + } void create() { ObjectStore::Transaction temp; pg->rollback_create(hoid, &temp); @@ -82,6 +88,17 @@ void PGBackend::rollback( } +void PGBackend::try_stash( + const hobject_t &hoid, + version_t v, + ObjectStore::Transaction *t) +{ + t->try_rename( + coll, + ghobject_t(hoid, ghobject_t::NO_GEN, get_parent()->whoami_shard().shard), + ghobject_t(hoid, v, get_parent()->whoami_shard().shard)); +} + void PGBackend::on_change_cleanup(ObjectStore::Transaction *t) { dout(10) << __func__ << dendl; @@ -253,6 +270,20 @@ void PGBackend::rollback_stash( ghobject_t(hoid, ghobject_t::NO_GEN, get_parent()->whoami_shard().shard)); } +void PGBackend::rollback_try_stash( + const hobject_t &hoid, + version_t old_version, + ObjectStore::Transaction *t) { + assert(!hoid.is_temp()); + t->remove( + coll, + ghobject_t(hoid, ghobject_t::NO_GEN, get_parent()->whoami_shard().shard)); + t->try_rename( + coll, + ghobject_t(hoid, old_version, get_parent()->whoami_shard().shard), + ghobject_t(hoid, ghobject_t::NO_GEN, get_parent()->whoami_shard().shard)); +} + void PGBackend::rollback_create( const hobject_t &hoid, ObjectStore::Transaction *t) { diff --git a/src/osd/PGBackend.h b/src/osd/PGBackend.h index 1af8a0c4c867a..4ba705b66f2a3 100644 --- a/src/osd/PGBackend.h +++ b/src/osd/PGBackend.h @@ -496,6 +496,11 @@ struct shard_info_wrapper; ) = 0; + void try_stash( + const hobject_t &hoid, + version_t v, + ObjectStore::Transaction *t); + void rollback( const hobject_t &hoid, const ObjectModDesc &desc, @@ -519,6 +524,12 @@ struct shard_info_wrapper; version_t old_version, ObjectStore::Transaction *t); + /// Unstash object to rollback stash + void rollback_try_stash( + const hobject_t &hoid, + version_t old_version, + ObjectStore::Transaction *t); + /// Delete object to rollback create void rollback_create( const hobject_t &hoid, diff --git a/src/osd/PGLog.cc b/src/osd/PGLog.cc index 3575b0393c8a9..e8f2a3ac49ef3 100644 --- a/src/osd/PGLog.cc +++ b/src/osd/PGLog.cc @@ -570,6 +570,7 @@ void PGLog::rewind_divergent_log(ObjectStore::Transaction& t, eversion_t newhead void PGLog::append_log_entries_update_missing( const hobject_t &last_backfill, + bool last_backfill_bitwise, const list &entries, IndexedLog *log, pg_missing_t &missing, @@ -589,12 +590,20 @@ void PGLog::append_log_entries_update_missing( ldpp_dout(dpp, 20) << "update missing, append " << ne << dendl; log->index(ne); } - if (p->soid <= last_backfill) { + if (cmp(p->soid, last_backfill, last_backfill_bitwise) <= 0) { missing.add_next_event(*p); - if (p->is_delete() && rollbacker) - rollbacker->remove(p->soid); + if (rollbacker) { + // hack to match PG::mark_all_unfound_lost + if (p->is_lost_delete() && p->mod_desc.can_rollback()) { + rollbacker->try_stash(p->soid, p->version.version); + } else if (p->is_delete()) { + rollbacker->remove(p->soid); + } + } } } + if (log) + log->reset_rollback_info_trimmed_to_riter(); } void PGLog::merge_log(ObjectStore::Transaction& t, @@ -708,6 +717,7 @@ void PGLog::merge_log(ObjectStore::Transaction& t, entries.splice(entries.end(), olog.log, from, to); append_log_entries_update_missing( info.last_backfill, + info.last_backfill_bitwise, entries, &log, missing, diff --git a/src/osd/PGLog.h b/src/osd/PGLog.h index 66adfa2ab9eec..7740b3aaa90e5 100644 --- a/src/osd/PGLog.h +++ b/src/osd/PGLog.h @@ -48,6 +48,9 @@ struct PGLog : DoutPrefixProvider { const pg_log_entry_t &entry) = 0; virtual void remove( const hobject_t &hoid) = 0; + virtual void try_stash( + const hobject_t &entry, + version_t v) = 0; virtual void trim( const pg_log_entry_t &entry) = 0; virtual ~LogEntryHandler() {} @@ -231,6 +234,13 @@ struct PGLog : DoutPrefixProvider { ++rollback_info_trimmed_to_riter; } + void reset_rollback_info_trimmed_to_riter() { + rollback_info_trimmed_to_riter = log.rbegin(); + while (rollback_info_trimmed_to_riter != log.rend() && + rollback_info_trimmed_to_riter->version > rollback_info_trimmed_to) + ++rollback_info_trimmed_to_riter; + } + // indexes objects, caller ops and extra caller ops void index() { objects.clear(); @@ -257,7 +267,7 @@ struct PGLog : DoutPrefixProvider { reset_riter(); indexed_data = PGLOG_INDEXED_ALL; - + reset_rollback_info_trimmed_to_riter(); } void index_objects() const { @@ -269,7 +279,6 @@ struct PGLog : DoutPrefixProvider { } indexed_data |= PGLOG_INDEXED_OBJECTS; - } void index_caller_ops() const { @@ -767,6 +776,7 @@ public: static void append_log_entries_update_missing( const hobject_t &last_backfill, + bool last_backfill_bitwise, const list &entries, IndexedLog *log, pg_missing_t &missing, @@ -774,15 +784,20 @@ public: const DoutPrefixProvider *dpp); void append_new_log_entries( const hobject_t &last_backfill, + bool last_backfill_bitwise, const list &entries, LogEntryHandler *rollbacker) { append_log_entries_update_missing( last_backfill, + last_backfill_bitwise, entries, &log, missing, rollbacker, this); + if (!entries.empty()) { + mark_writeout_from(entries.begin()->version); + } } void write_log(ObjectStore::Transaction& t, diff --git a/src/osd/ReplicatedPG.cc b/src/osd/ReplicatedPG.cc index 064c7726e6973..00343bcb89732 100644 --- a/src/osd/ReplicatedPG.cc +++ b/src/osd/ReplicatedPG.cc @@ -49,6 +49,7 @@ #include "messages/MOSDPGPushReply.h" #include "messages/MOSDPGUpdateLogMissing.h" #include "messages/MOSDPGUpdateLogMissingReply.h" +#include "messages/MCommandReply.h" #include "Watch.h" @@ -685,8 +686,13 @@ int ReplicatedPG::get_pgls_filter(bufferlist::iterator& iter, PGLSFilter **pfilt // ========================================================== -int ReplicatedPG::do_command(cmdmap_t cmdmap, ostream& ss, - bufferlist& idata, bufferlist& odata) +int ReplicatedPG::do_command( + cmdmap_t cmdmap, + ostream& ss, + bufferlist& idata, + bufferlist& odata, + ConnectionRef con, + ceph_tid_t tid) { const pg_missing_t &missing = pg_log.get_missing(); string prefix; @@ -791,10 +797,8 @@ int ReplicatedPG::do_command(cmdmap_t cmdmap, ostream& ss, return -EINVAL; } - ss << "pg has " << unfound - << " objects unfound and apparently lost, marking"; - mark_all_unfound_lost(mode); - return 0; + mark_all_unfound_lost(mode, con, tid); + return -EAGAIN; } else if (command == "list_missing") { hobject_t offset; @@ -8461,6 +8465,26 @@ ReplicatedPG::RepGather *ReplicatedPG::new_repop( return repop; } + +ReplicatedPG::RepGather *ReplicatedPG::new_repop( + ObcLockManager &&manager, + boost::optional > &&on_complete) +{ + RepGather *repop = new RepGather( + std::move(manager), + std::move(on_complete), + osd->get_tid(), + info.last_complete); + + repop->start = ceph_clock_now(cct); + + repop_queue.push_back(&repop->queue_item); + repop->get(); + + osd->logger->inc(l_osd_op_wip); + + return repop; +} void ReplicatedPG::remove_repop(RepGather *repop) { @@ -8500,6 +8524,123 @@ void ReplicatedPG::simple_opc_submit(OpContextUPtr ctx) repop->put(); } + +void ReplicatedPG::submit_log_entries( + const list &entries, + ObcLockManager &&manager, + boost::optional > &&on_complete) +{ + dout(10) << __func__ << entries << dendl; + assert(is_primary()); + + ObjectStore::Transaction t; + + eversion_t old_last_update = info.last_update; + merge_new_log_entries(entries, t); + + boost::intrusive_ptr repop; + set waiting_on; + if (get_osdmap()->test_flag(CEPH_OSDMAP_REQUIRE_JEWEL)) { + repop = new_repop( + std::move(manager), + std::move(on_complete)); + } + for (set::const_iterator i = actingbackfill.begin(); + i != actingbackfill.end(); + ++i) { + pg_shard_t peer(*i); + if (peer == pg_whoami) continue; + assert(peer_missing.count(peer)); + assert(peer_info.count(peer)); + if (get_osdmap()->test_flag(CEPH_OSDMAP_REQUIRE_JEWEL)) { + assert(repop); + MOSDPGUpdateLogMissing *m = new MOSDPGUpdateLogMissing( + entries, + spg_t(info.pgid.pgid, i->shard), + pg_whoami.shard, + get_osdmap()->get_epoch(), + repop->rep_tid); + osd->send_message_osd_cluster( + peer.osd, m, get_osdmap()->get_epoch()); + waiting_on.insert(peer); + } else { + MOSDPGLog *m = new MOSDPGLog( + peer.shard, pg_whoami.shard, + info.last_update.epoch, + info); + m->log.log = entries; + m->log.tail = old_last_update; + m->log.head = info.last_update; + osd->send_message_osd_cluster( + peer.osd, m, get_osdmap()->get_epoch()); + } + } + if (get_osdmap()->test_flag(CEPH_OSDMAP_REQUIRE_JEWEL)) { + ceph_tid_t rep_tid = repop->rep_tid; + waiting_on.insert(pg_whoami); + log_entry_update_waiting_on.insert( + make_pair( + rep_tid, + LogUpdateCtx{std::move(repop), std::move(waiting_on)} + )); + struct OnComplete : public Context { + ReplicatedPGRef pg; + ceph_tid_t rep_tid; + epoch_t epoch; + OnComplete( + ReplicatedPGRef pg, + ceph_tid_t rep_tid, + epoch_t epoch) + : pg(pg), rep_tid(rep_tid), epoch(epoch) {} + void finish(int) override { + pg->lock(); + if (!pg->pg_has_reset_since(epoch)) { + auto it = pg->log_entry_update_waiting_on.find(rep_tid); + assert(it != pg->log_entry_update_waiting_on.end()); + auto it2 = it->second.waiting_on.find(pg->pg_whoami); + assert(it2 != it->second.waiting_on.end()); + it->second.waiting_on.erase(it2); + if (it->second.waiting_on.empty()) { + pg->repop_all_applied(it->second.repop.get()); + pg->repop_all_committed(it->second.repop.get()); + pg->log_entry_update_waiting_on.erase(it); + } + } + pg->unlock(); + } + }; + t.register_on_complete( + new OnComplete{this, rep_tid, get_osdmap()->get_epoch()}); + } else { + if (on_complete) { + struct OnComplete : public Context { + ReplicatedPGRef pg; + std::function on_complete; + epoch_t epoch; + OnComplete( + ReplicatedPGRef pg, + std::function &&on_complete, + epoch_t epoch) + : pg(pg), + on_complete(std::move(on_complete)), + epoch(epoch) {} + void finish(int) override { + pg->lock(); + if (!pg->pg_has_reset_since(epoch)) + on_complete(); + pg->unlock(); + } + }; + t.register_on_complete( + new OnComplete{ + this, std::move(*on_complete), get_osdmap()->get_epoch() + }); + } + } + int r = osd->store->queue_transaction(osr.get(), std::move(t), NULL); + assert(r == 0); +} + // ------------------------------------------------------- void ReplicatedPG::get_watchers(list &pg_watchers) @@ -8658,11 +8799,12 @@ void ReplicatedPG::handle_watch_timeout(WatchRef watch) ctx->log.back().mod_desc.mark_unrollbackable(); } - // no ctx->delta_stats - simple_opc_submit(std::move(ctx)); // apply new object state. ctx->obc->obs = ctx->new_obs; + + // no ctx->delta_stats + simple_opc_submit(std::move(ctx)); } ObjectContextRef ReplicatedPG::create_object_context(const object_info_t& oi, @@ -9465,26 +9607,81 @@ ObjectContextRef ReplicatedPG::mark_object_lost(ObjectStore::Transaction *t, return obc; } -struct C_PG_MarkUnfoundLost : public Context { - ReplicatedPGRef pg; - list obcs; - explicit C_PG_MarkUnfoundLost(ReplicatedPG *p) : pg(p) {} - void finish(int r) { - pg->_finish_mark_all_unfound_lost(obcs); - } -}; - void ReplicatedPG::do_update_log_missing(OpRequestRef &op) { + MOSDPGUpdateLogMissing *m = static_cast( + op->get_req()); + assert(m->get_type() == MSG_OSD_PG_UPDATE_LOG_MISSING); + ObjectStore::Transaction t; + append_log_entries_update_missing(m->entries, t); + // TODO FIX + + Context *c = new FunctionContext( + [=](int) { + MOSDPGUpdateLogMissing *msg = + static_cast( + op->get_req()); + MOSDPGUpdateLogMissingReply *reply = + new MOSDPGUpdateLogMissingReply( + spg_t(info.pgid.pgid, primary_shard().shard), + pg_whoami.shard, + msg->get_epoch(), + msg->get_tid()); + reply->set_priority(CEPH_MSG_PRIO_HIGH); + msg->get_connection()->send_message(reply); + }); + + /* Hack to work around the fact that ReplicatedBackend sends + * ack+commit if commit happens first */ + if (pool.info.ec_pool()) { + t.register_on_complete(c); + } else { + t.register_on_commit(c); + } + int tr = osd->store->queue_transaction( + osr.get(), + std::move(t), + nullptr); + assert(tr == 0); } void ReplicatedPG::do_update_log_missing_reply(OpRequestRef &op) { + MOSDPGUpdateLogMissingReply *m = + static_cast( + op->get_req()); + dout(20) << __func__ << " got reply from " + << m->get_from() << dendl; + + auto it = log_entry_update_waiting_on.find(m->get_tid()); + if (it != log_entry_update_waiting_on.end()) { + if (it->second.waiting_on.count(m->get_from())) { + it->second.waiting_on.erase(m->get_from()); + } else { + osd->clog->error() + << info.pgid << " got reply " + << *m << " from shard we are not waiting for " + << m->get_from(); + } + + if (it->second.waiting_on.empty()) { + repop_all_applied(it->second.repop.get()); + repop_all_committed(it->second.repop.get()); + log_entry_update_waiting_on.erase(it); + } + } else { + osd->clog->error() + << info.pgid << " got reply " + << *m << " on unknown tid " << m->get_tid(); + } } /* Mark all unfound objects as lost. */ -void ReplicatedPG::mark_all_unfound_lost(int what) +void ReplicatedPG::mark_all_unfound_lost( + int what, + ConnectionRef con, + ceph_tid_t tid) { dout(3) << __func__ << " " << pg_log_entry_t::get_op_name(what) << dendl; @@ -9492,16 +9689,18 @@ void ReplicatedPG::mark_all_unfound_lost(int what) pg_log.get_log().print(*_dout); *_dout << dendl; - ObjectStore::Transaction t; - C_PG_MarkUnfoundLost *c = new C_PG_MarkUnfoundLost(this); + list log_entries; utime_t mtime = ceph_clock_now(cct); - info.last_update.epoch = get_osdmap()->get_epoch(); - const pg_missing_t &missing = pg_log.get_missing(); map::const_iterator m = missing_loc.get_needs_recovery().begin(); map::const_iterator mend = missing_loc.get_needs_recovery().end(); + + ObcLockManager manager; + eversion_t v = info.last_update; + v.epoch = get_osdmap()->get_epoch(); + unsigned num_unfound = missing_loc.num_unfound(); while (m != mend) { const hobject_t &oid(m->first); if (!missing_loc.is_unfound(oid)) { @@ -9515,47 +9714,43 @@ void ReplicatedPG::mark_all_unfound_lost(int what) switch (what) { case pg_log_entry_t::LOST_MARK: - obc = mark_object_lost(&t, oid, m->second.need, mtime, pg_log_entry_t::LOST_MARK); - pg_log.missing_got(m++); assert(0 == "actually, not implemented yet!"); - // we need to be careful about how this is handled on the replica! break; case pg_log_entry_t::LOST_REVERT: prev = pick_newest_available(oid); if (prev > eversion_t()) { // log it - ++info.last_update.version; + ++v.version; pg_log_entry_t e( - pg_log_entry_t::LOST_REVERT, oid, info.last_update, + pg_log_entry_t::LOST_REVERT, oid, v, m->second.need, 0, osd_reqid_t(), mtime); e.reverting_to = prev; - pg_log.add(e); + e.mod_desc.mark_unrollbackable(); + log_entries.push_back(e); dout(10) << e << dendl; // we are now missing the new version; recovery code will sort it out. ++m; - pg_log.revise_need(oid, info.last_update); - missing_loc.revise_need(oid, info.last_update); break; } - /** fall-thru **/ case pg_log_entry_t::LOST_DELETE: { - // log it - ++info.last_update.version; - pg_log_entry_t e(pg_log_entry_t::LOST_DELETE, oid, info.last_update, m->second.need, + ++v.version; + pg_log_entry_t e(pg_log_entry_t::LOST_DELETE, oid, v, m->second.need, 0, osd_reqid_t(), mtime); - pg_log.add(e); + if (get_osdmap()->test_flag(CEPH_OSDMAP_REQUIRE_JEWEL)) { + if (pool.info.require_rollback()) { + e.mod_desc.try_rmobject(v.version); + } else { + e.mod_desc.mark_unrollbackable(); + } + } // otherwise, just do what we used to do dout(10) << e << dendl; + log_entries.push_back(e); - t.remove( - coll, - ghobject_t(oid, ghobject_t::NO_GEN, pg_whoami.shard)); - pg_log.missing_add_event(e); ++m; - missing_loc.recovered(oid); } break; @@ -9563,47 +9758,51 @@ void ReplicatedPG::mark_all_unfound_lost(int what) assert(0); } - if (obc) - c->obcs.push_back(obc); + if (obc) { + bool got = manager.get_lock_type( + ObjectContext::RWState::RWEXCL, + oid, + obc, + OpRequestRef()); + if (!got) { + assert(0 == "Couldn't lock unfound object?"); + } + } } - dout(30) << __func__ << ": log after:\n"; - pg_log.get_log().print(*_dout); - *_dout << dendl; - info.stats.stats_invalid = true; - if (missing.num_missing() == 0) { - // advance last_complete since nothing else is missing! - info.last_complete = info.last_update; - } - - dirty_info = true; - write_if_dirty(t); - - - osd->store->queue_transaction(osr.get(), std::move(t), c, NULL, - new C_OSD_OndiskWriteUnlockList(&c->obcs)); - - // Send out the PG log to all replicas - // So that they know what is lost - share_pg_log(); - - // queue ourselves so that we push the (now-lost) object_infos to replicas. - osd->queue_for_recovery(this); -} - -void ReplicatedPG::_finish_mark_all_unfound_lost(list& obcs) -{ - lock(); - dout(10) << "_finish_mark_all_unfound_lost " << dendl; - - if (!deleting) - requeue_ops(waiting_for_all_missing); - waiting_for_all_missing.clear(); - - obcs.clear(); - unlock(); + struct OnComplete { + ReplicatedPG *pg; + std::function on_complete; + void operator()() { + pg->requeue_ops(pg->waiting_for_all_missing); + pg->waiting_for_all_missing.clear(); + pg->osd->queue_for_recovery(pg); + } + }; + submit_log_entries( + log_entries, + std::move(manager), + boost::optional >( + [=]() { + requeue_ops(waiting_for_all_missing); + waiting_for_all_missing.clear(); + osd->queue_for_recovery(this); + + stringstream ss; + ss << "pg has " << num_unfound + << " objects unfound and apparently lost marking"; + string rs = ss.str(); + dout(0) << "do_command r=" << 0 << " " << rs << dendl; + osd->clog->info() << rs << "\n"; + if (con) { + MCommandReply *reply = new MCommandReply(0, rs); + reply->set_tid(tid); + con->send_message(reply); + } + } + )); } void ReplicatedPG::_split_into(pg_t child_pgid, PG *child, unsigned split_bits) diff --git a/src/osd/ReplicatedPG.h b/src/osd/ReplicatedPG.h index 53cb12ecfc393..aece2f100e16d 100644 --- a/src/osd/ReplicatedPG.h +++ b/src/osd/ReplicatedPG.h @@ -737,6 +737,23 @@ public: on_success(std::move(c->on_success)), on_finish(std::move(c->on_finish)) {} + RepGather( + ObcLockManager &&manager, + boost::optional > &&on_complete, + ceph_tid_t rt, + eversion_t lc) : + queue_item(this), + nref(1), + rep_tid(rt), + rep_aborted(false), rep_done(false), + all_applied(false), all_committed(false), + pg_local_last_complete(lc), + lock_manager(std::move(manager)) { + if (on_complete) { + on_success.push_back(std::move(*on_complete)); + } + } + RepGather *get() { nref++; return this; @@ -866,11 +883,29 @@ protected: OpContext *ctx, ObjectContextRef obc, ceph_tid_t rep_tid); + RepGather *new_repop( + ObcLockManager &&manager, + boost::optional > &&on_complete); void remove_repop(RepGather *repop); OpContextUPtr simple_opc_create(ObjectContextRef obc); void simple_opc_submit(OpContextUPtr ctx); + /** + * Merge entries atomically into all actingbackfill osds + * adjusting missing and recovery state as necessary + */ + void submit_log_entries( + const list &entries, + ObcLockManager &&manager, + boost::optional > &&on_complete); + struct LogUpdateCtx { + boost::intrusive_ptr repop; + set waiting_on; + }; + map log_entry_update_waiting_on; + + // hot/cold tracking HitSetRef hit_set; ///< currently accumulating HitSet utime_t hit_set_start_stamp; ///< time the current HitSet started recording @@ -1426,8 +1461,13 @@ public: const PGPool &_pool, spg_t p); ~ReplicatedPG() {} - int do_command(cmdmap_t cmdmap, ostream& ss, bufferlist& idata, - bufferlist& odata); + int do_command( + cmdmap_t cmdmap, + ostream& ss, + bufferlist& idata, + bufferlist& odata, + ConnectionRef conn, + ceph_tid_t tid) override; void do_request( OpRequestRef& op, @@ -1583,7 +1623,10 @@ public: void wait_for_blocked_object(const hobject_t& soid, OpRequestRef op); void kick_object_context_blocked(ObjectContextRef obc); - void mark_all_unfound_lost(int what); + void mark_all_unfound_lost( + int what, + ConnectionRef con, + ceph_tid_t tid); eversion_t pick_newest_available(const hobject_t& oid); ObjectContextRef mark_object_lost(ObjectStore::Transaction *t, const hobject_t& oid, eversion_t version, @@ -1594,7 +1637,6 @@ public: void do_update_log_missing_reply( OpRequestRef &op); - void _finish_mark_all_unfound_lost(list& obcs); void on_role_change(); void on_pool_change(); diff --git a/src/osd/osd_types.cc b/src/osd/osd_types.cc index 6f26284b69c5f..eaf7e3ae9b968 100644 --- a/src/osd/osd_types.cc +++ b/src/osd/osd_types.cc @@ -3272,6 +3272,12 @@ void ObjectModDesc::visit(Visitor *visitor) const visitor->update_snaps(snaps); break; } + case TRY_DELETE: { + version_t old_version; + ::decode(old_version, bp); + visitor->try_rmobject(old_version); + break; + } default: assert(0 == "Invalid rollback code"); } diff --git a/src/osd/osd_types.h b/src/osd/osd_types.h index 39500d6b473b4..49b4e2bd8ec38 100644 --- a/src/osd/osd_types.h +++ b/src/osd/osd_types.h @@ -2360,6 +2360,15 @@ public: virtual void append(uint64_t old_offset) {} virtual void setattrs(map > &attrs) {} virtual void rmobject(version_t old_version) {} + /** + * Used to support the unfound_lost_delete log event: if the stashed + * version exists, we unstash it, otherwise, we do nothing. This way + * each replica rolls back to whatever state it had prior to the attempt + * at mark unfound lost delete + */ + virtual void try_rmobject(version_t old_version) { + rmobject(old_version); + } virtual void create() {} virtual void update_snaps(set &old_snaps) {} virtual ~Visitor() {} @@ -2371,7 +2380,8 @@ public: SETATTRS = 2, DELETE = 3, CREATE = 4, - UPDATE_SNAPS = 5 + UPDATE_SNAPS = 5, + TRY_DELETE = 6 }; ObjectModDesc() : can_local_rollback(true), rollback_info_completed(false) {} void claim(ObjectModDesc &other) { @@ -2431,6 +2441,16 @@ public: rollback_info_completed = true; return true; } + bool try_rmobject(version_t deletion_version) { + if (!can_local_rollback || rollback_info_completed) + return false; + ENCODE_START(1, 1, bl); + append_id(TRY_DELETE); + ::encode(deletion_version, bl); + ENCODE_FINISH(bl); + rollback_info_completed = true; + return true; + } void create() { if (!can_local_rollback || rollback_info_completed) return; -- 2.39.5