From: Samuel Just Date: Thu, 3 Nov 2016 00:38:13 +0000 (-0700) Subject: osd/: use PGBackend::call_write_ordered to submit log entries in commit order X-Git-Tag: v11.1.0~245^2~11 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=967764707affd458300845d4a0e77407ef00b1af;p=ceph.git osd/: use PGBackend::call_write_ordered to submit log entries in commit order Without this change, we might submit new log entries for marking objects unfound in a way that causes replicas to process them out of order relative to pending writes with lower version numbers. That would be bad. Instead, add an interface to allow an arbitrary callback to be called after any previously submitted transactions commit, but before any subsequently submitted operations commit. Signed-off-by: Samuel Just --- diff --git a/src/osd/PGBackend.h b/src/osd/PGBackend.h index 418c2c08a88..d4e0091ca73 100644 --- a/src/osd/PGBackend.h +++ b/src/osd/PGBackend.h @@ -401,6 +401,8 @@ typedef ceph::shared_ptr OSDMapRef; OpRequestRef op ///< [in] op ) = 0; + /// submit callback to be called in order with pending writes + virtual void call_write_ordered(std::function &&cb) = 0; void try_stash( const hobject_t &hoid, diff --git a/src/osd/ReplicatedBackend.h b/src/osd/ReplicatedBackend.h index 6b8df0a5d34..b9431cc0a8d 100644 --- a/src/osd/ReplicatedBackend.h +++ b/src/osd/ReplicatedBackend.h @@ -342,6 +342,13 @@ private: public: friend class C_OSD_OnOpCommit; friend class C_OSD_OnOpApplied; + + void call_write_ordered(std::function &&cb) override { + // ReplicatedBackend submits writes inline in submit_transaction, so + // we can just call the callback. 
+ cb(); + } + void submit_transaction( const hobject_t &hoid, const object_stat_sum_t &delta_stats, diff --git a/src/osd/ReplicatedPG.cc b/src/osd/ReplicatedPG.cc index aa250028212..a3d32493883 100644 --- a/src/osd/ReplicatedPG.cc +++ b/src/osd/ReplicatedPG.cc @@ -8665,121 +8665,133 @@ void ReplicatedPG::simple_opc_submit(OpContextUPtr ctx) void ReplicatedPG::submit_log_entries( const mempool::osd::list &entries, ObcLockManager &&manager, - boost::optional > &&on_complete, + boost::optional > &&_on_complete, OpRequestRef op) { dout(10) << __func__ << " " << entries << dendl; assert(is_primary()); - ObjectStore::Transaction t; - - eversion_t old_last_update = info.last_update; - merge_new_log_entries(entries, t); + if (!entries.empty()) { + assert(entries.rbegin()->version >= projected_last_update); + projected_last_update = entries.rbegin()->version; + } boost::intrusive_ptr repop; - set waiting_on; + boost::optional > on_complete; if (get_osdmap()->test_flag(CEPH_OSDMAP_REQUIRE_JEWEL)) { repop = new_repop( std::move(manager), std::move(op), - std::move(on_complete)); - } - for (set::const_iterator i = actingbackfill.begin(); - i != actingbackfill.end(); - ++i) { - pg_shard_t peer(*i); - if (peer == pg_whoami) continue; - assert(peer_missing.count(peer)); - assert(peer_info.count(peer)); - if (get_osdmap()->test_flag(CEPH_OSDMAP_REQUIRE_JEWEL)) { - assert(repop); - MOSDPGUpdateLogMissing *m = new MOSDPGUpdateLogMissing( - entries, - spg_t(info.pgid.pgid, i->shard), - pg_whoami.shard, - get_osdmap()->get_epoch(), - repop->rep_tid); - osd->send_message_osd_cluster( - peer.osd, m, get_osdmap()->get_epoch()); - waiting_on.insert(peer); - } else { - MOSDPGLog *m = new MOSDPGLog( - peer.shard, pg_whoami.shard, - info.last_update.epoch, - info); - m->log.log = entries; - m->log.tail = old_last_update; - m->log.head = info.last_update; - osd->send_message_osd_cluster( - peer.osd, m, get_osdmap()->get_epoch()); - } + std::move(_on_complete)); + } else { + on_complete = 
std::move(_on_complete); } - if (get_osdmap()->test_flag(CEPH_OSDMAP_REQUIRE_JEWEL)) { - ceph_tid_t rep_tid = repop->rep_tid; - waiting_on.insert(pg_whoami); - log_entry_update_waiting_on.insert( - make_pair( - rep_tid, - LogUpdateCtx{std::move(repop), std::move(waiting_on)} - )); - struct OnComplete : public Context { - ReplicatedPGRef pg; - ceph_tid_t rep_tid; - epoch_t epoch; - OnComplete( - ReplicatedPGRef pg, - ceph_tid_t rep_tid, - epoch_t epoch) - : pg(pg), rep_tid(rep_tid), epoch(epoch) {} - void finish(int) override { - pg->lock(); - if (!pg->pg_has_reset_since(epoch)) { - auto it = pg->log_entry_update_waiting_on.find(rep_tid); - assert(it != pg->log_entry_update_waiting_on.end()); - auto it2 = it->second.waiting_on.find(pg->pg_whoami); - assert(it2 != it->second.waiting_on.end()); - it->second.waiting_on.erase(it2); - if (it->second.waiting_on.empty()) { - pg->repop_all_applied(it->second.repop.get()); - pg->repop_all_committed(it->second.repop.get()); - pg->log_entry_update_waiting_on.erase(it); + + pgbackend->call_write_ordered( + [this, entries, repop, on_complete]() { + ObjectStore::Transaction t; + eversion_t old_last_update = info.last_update; + merge_new_log_entries(entries, t); + + + set waiting_on; + for (set::const_iterator i = actingbackfill.begin(); + i != actingbackfill.end(); + ++i) { + pg_shard_t peer(*i); + if (peer == pg_whoami) continue; + assert(peer_missing.count(peer)); + assert(peer_info.count(peer)); + if (get_osdmap()->test_flag(CEPH_OSDMAP_REQUIRE_JEWEL)) { + assert(repop); + MOSDPGUpdateLogMissing *m = new MOSDPGUpdateLogMissing( + entries, + spg_t(info.pgid.pgid, i->shard), + pg_whoami.shard, + get_osdmap()->get_epoch(), + repop->rep_tid); + osd->send_message_osd_cluster( + peer.osd, m, get_osdmap()->get_epoch()); + waiting_on.insert(peer); + } else { + MOSDPGLog *m = new MOSDPGLog( + peer.shard, pg_whoami.shard, + info.last_update.epoch, + info); + m->log.log = entries; + m->log.tail = old_last_update; + m->log.head = 
info.last_update; + osd->send_message_osd_cluster( + peer.osd, m, get_osdmap()->get_epoch()); + } + } + if (get_osdmap()->test_flag(CEPH_OSDMAP_REQUIRE_JEWEL)) { + ceph_tid_t rep_tid = repop->rep_tid; + waiting_on.insert(pg_whoami); + log_entry_update_waiting_on.insert( + make_pair( + rep_tid, + LogUpdateCtx{std::move(repop), std::move(waiting_on)} + )); + struct OnComplete : public Context { + ReplicatedPGRef pg; + ceph_tid_t rep_tid; + epoch_t epoch; + OnComplete( + ReplicatedPGRef pg, + ceph_tid_t rep_tid, + epoch_t epoch) + : pg(pg), rep_tid(rep_tid), epoch(epoch) {} + void finish(int) override { + pg->lock(); + if (!pg->pg_has_reset_since(epoch)) { + auto it = pg->log_entry_update_waiting_on.find(rep_tid); + assert(it != pg->log_entry_update_waiting_on.end()); + auto it2 = it->second.waiting_on.find(pg->pg_whoami); + assert(it2 != it->second.waiting_on.end()); + it->second.waiting_on.erase(it2); + if (it->second.waiting_on.empty()) { + pg->repop_all_applied(it->second.repop.get()); + pg->repop_all_committed(it->second.repop.get()); + pg->log_entry_update_waiting_on.erase(it); + } + } + pg->unlock(); } + }; + t.register_on_complete( + new OnComplete{this, rep_tid, get_osdmap()->get_epoch()}); + } else { + if (on_complete) { + struct OnComplete : public Context { + ReplicatedPGRef pg; + std::function on_complete; + epoch_t epoch; + OnComplete( + ReplicatedPGRef pg, + const std::function &on_complete, + epoch_t epoch) + : pg(pg), + on_complete(std::move(on_complete)), + epoch(epoch) {} + void finish(int) override { + pg->lock(); + if (!pg->pg_has_reset_since(epoch)) + on_complete(); + pg->unlock(); + } + }; + t.register_on_complete( + new OnComplete{ + this, *on_complete, get_osdmap()->get_epoch() + }); } - pg->unlock(); } - }; - t.register_on_complete( - new OnComplete{this, rep_tid, get_osdmap()->get_epoch()}); - } else { - if (on_complete) { - struct OnComplete : public Context { - ReplicatedPGRef pg; - std::function on_complete; - epoch_t epoch; - OnComplete( 
- ReplicatedPGRef pg, - std::function &&on_complete, - epoch_t epoch) - : pg(pg), - on_complete(std::move(on_complete)), - epoch(epoch) {} - void finish(int) override { - pg->lock(); - if (!pg->pg_has_reset_since(epoch)) - on_complete(); - pg->unlock(); - } - }; - t.register_on_complete( - new OnComplete{ - this, std::move(*on_complete), get_osdmap()->get_epoch() - }); - } - } - t.register_on_applied( - new C_OSD_OnApplied{this, get_osdmap()->get_epoch(), info.last_update}); - int r = osd->store->queue_transaction(osr.get(), std::move(t), NULL); - assert(r == 0); + t.register_on_applied( + new C_OSD_OnApplied{this, get_osdmap()->get_epoch(), info.last_update}); + int r = osd->store->queue_transaction(osr.get(), std::move(t), NULL); + assert(r == 0); + }); } void ReplicatedPG::cancel_log_updates() @@ -9900,8 +9912,8 @@ void ReplicatedPG::mark_all_unfound_lost( reply->set_tid(tid); con->send_message(reply); } - } - )); + }), + OpRequestRef()); } void ReplicatedPG::_split_into(pg_t child_pgid, PG *child, unsigned split_bits)