From 458c1f22a4d215418122f63942782b8a9108fc41 Mon Sep 17 00:00:00 2001 From: Sage Weil Date: Mon, 19 Feb 2018 08:54:58 -0600 Subject: [PATCH] osd/PrimaryLogPG: cancel all objecter ops atomically We want to avoid a situation like this: - start proxy op A (epoch E) - start proxy op B (epoch E) - start proxy op C (epoch E) - objecter sends none of these because target is down in epoch E - osdmap update to E+1 - pg cancels/requeues A, B - objecter updates to E+1 - objecter sends C - pg cancels/requeues C Note that the key thing is that operations on each object are canceled atomically. On the interval change we do it all at once. In the other cases, we cancel everything on the given object together. Fixes: http://tracker.ceph.com/issues/22123 Signed-off-by: Sage Weil (cherry picked from commit 93fd56ed039363c4f169259a0a560b968d1a0333) Conflicts: src/osd/PrimaryLogPG.cc: - Resolved in cancel_copy and added cancel_and_requeue_proxy_ops. - Define io_tids to FlushOp --- src/osd/PrimaryLogPG.cc | 106 +++++++++++++++++++++++++++++----------- src/osd/PrimaryLogPG.h | 17 ++++--- 2 files changed, 87 insertions(+), 36 deletions(-) diff --git a/src/osd/PrimaryLogPG.cc b/src/osd/PrimaryLogPG.cc index 7aba1df5d2d98..6442b4718fba3 100644 --- a/src/osd/PrimaryLogPG.cc +++ b/src/osd/PrimaryLogPG.cc @@ -2899,14 +2899,15 @@ void PrimaryLogPG::kick_proxy_ops_blocked(hobject_t& soid) in_progress_proxy_ops.erase(p); } -void PrimaryLogPG::cancel_proxy_read(ProxyReadOpRef prdop) +void PrimaryLogPG::cancel_proxy_read(ProxyReadOpRef prdop, + vector *tids) { dout(10) << __func__ << " " << prdop->soid << dendl; prdop->canceled = true; // cancel objecter op, if we can if (prdop->objecter_tid) { - osd->objecter->op_cancel(prdop->objecter_tid, -ECANCELED); + tids->push_back(prdop->objecter_tid); for (uint32_t i = 0; i < prdop->ops.size(); i++) { prdop->ops[i].outdata.clear(); } @@ -2915,20 +2916,20 @@ void PrimaryLogPG::cancel_proxy_read(ProxyReadOpRef prdop) } } -void
PrimaryLogPG::cancel_proxy_ops(bool requeue) +void PrimaryLogPG::cancel_proxy_ops(bool requeue, vector *tids) { dout(10) << __func__ << dendl; // cancel proxy reads map::iterator p = proxyread_ops.begin(); while (p != proxyread_ops.end()) { - cancel_proxy_read((p++)->second); + cancel_proxy_read((p++)->second, tids); } // cancel proxy writes map::iterator q = proxywrite_ops.begin(); while (q != proxywrite_ops.end()) { - cancel_proxy_write((q++)->second); + cancel_proxy_write((q++)->second, tids); } if (requeue) { @@ -3082,14 +3083,15 @@ void PrimaryLogPG::finish_proxy_write(hobject_t oid, ceph_tid_t tid, int r) pwop->ctx = NULL; } -void PrimaryLogPG::cancel_proxy_write(ProxyWriteOpRef pwop) +void PrimaryLogPG::cancel_proxy_write(ProxyWriteOpRef pwop, + vector *tids) { dout(10) << __func__ << " " << pwop->soid << dendl; pwop->canceled = true; // cancel objecter op, if we can if (pwop->objecter_tid) { - osd->objecter->op_cancel(pwop->objecter_tid, -ECANCELED); + tids->push_back(pwop->objecter_tid); delete pwop->ctx; pwop->ctx = NULL; proxywrite_ops.erase(pwop->objecter_tid); @@ -8026,7 +8028,9 @@ void PrimaryLogPG::start_copy(CopyCallback *cb, ObjectContextRef obc, // FIXME: if the src etc match, we could avoid restarting from the // beginning. 
CopyOpRef cop = copy_ops[dest]; - cancel_copy(cop, false); + vector tids; + cancel_copy(cop, false, &tids); + osd->objecter->op_cancel(tids, -ECANCELED); } CopyOpRef cop(std::make_shared(cb, obc, src, oloc, version, flags, @@ -8106,6 +8110,7 @@ void PrimaryLogPG::_copy_some(ObjectContextRef obc, CopyOpRef cop) void PrimaryLogPG::process_copy_chunk(hobject_t oid, ceph_tid_t tid, int r) { + vector tids; dout(10) << __func__ << " " << oid << " tid " << tid << " " << cpp_strerror(r) << dendl; map::iterator p = copy_ops.find(oid); @@ -8292,7 +8297,7 @@ void PrimaryLogPG::process_copy_chunk(hobject_t oid, ceph_tid_t tid, int r) for (map::iterator it = proxyread_ops.begin(); it != proxyread_ops.end();) { if (it->second->soid == cobc->obs.oi.soid) { - cancel_proxy_read((it++)->second); + cancel_proxy_read((it++)->second, &tids); } else { ++it; } @@ -8300,17 +8305,40 @@ void PrimaryLogPG::process_copy_chunk(hobject_t oid, ceph_tid_t tid, int r) for (map::iterator it = proxywrite_ops.begin(); it != proxywrite_ops.end();) { if (it->second->soid == cobc->obs.oi.soid) { - cancel_proxy_write((it++)->second); + cancel_proxy_write((it++)->second, &tids); } else { ++it; } } + osd->objecter->op_cancel(tids, -ECANCELED); kick_proxy_ops_blocked(cobc->obs.oi.soid); } kick_object_context_blocked(cobc); } +void PrimaryLogPG::cancel_and_requeue_proxy_ops(hobject_t oid) { + vector tids; + for (map::iterator it = proxyread_ops.begin(); + it != proxyread_ops.end();) { + if (it->second->soid == oid) { + cancel_proxy_read((it++)->second, &tids); + } else { + ++it; + } + } + for (map::iterator it = proxywrite_ops.begin(); + it != proxywrite_ops.end();) { + if (it->second->soid == oid) { + cancel_proxy_write((it++)->second, &tids); + } else { + ++it; + } + } + osd->objecter->op_cancel(tids, -ECANCELED); + kick_proxy_ops_blocked(oid); +} + void PrimaryLogPG::_write_copy_chunk(CopyOpRef cop, PGTransaction *t) { dout(20) << __func__ << " " << cop @@ -8645,7 +8673,8 @@ void 
PrimaryLogPG::finish_promote(int r, CopyResults *results, agent_choose_mode(); } -void PrimaryLogPG::cancel_copy(CopyOpRef cop, bool requeue) +void PrimaryLogPG::cancel_copy(CopyOpRef cop, bool requeue, + vector *tids) { dout(10) << __func__ << " " << cop->obc->obs.oi.soid << " from " << cop->src << " " << cop->oloc @@ -8653,10 +8682,10 @@ void PrimaryLogPG::cancel_copy(CopyOpRef cop, bool requeue) // cancel objecter op, if we can if (cop->objecter_tid) { - osd->objecter->op_cancel(cop->objecter_tid, -ECANCELED); + tids->push_back(cop->objecter_tid); cop->objecter_tid = 0; if (cop->objecter_tid2) { - osd->objecter->op_cancel(cop->objecter_tid2, -ECANCELED); + tids->push_back(cop->objecter_tid2); cop->objecter_tid2 = 0; } } @@ -8675,13 +8704,13 @@ void PrimaryLogPG::cancel_copy(CopyOpRef cop, bool requeue) cop->obc = ObjectContextRef(); } -void PrimaryLogPG::cancel_copy_ops(bool requeue) +void PrimaryLogPG::cancel_copy_ops(bool requeue, vector *tids) { dout(10) << __func__ << dendl; map::iterator p = copy_ops.begin(); while (p != copy_ops.end()) { // requeue this op? can I queue up all of them? 
- cancel_copy((p++)->second, requeue); + cancel_copy((p++)->second, requeue, tids); } } @@ -8817,7 +8846,9 @@ int PrimaryLogPG::start_flush( osd->reply_op_error(fop->dup_ops.front(), -EBUSY); fop->dup_ops.pop_front(); } - cancel_flush(fop, false); + vector tids; + cancel_flush(fop, false, &tids); + osd->objecter->op_cancel(tids, -ECANCELED); } /** @@ -9017,7 +9048,9 @@ int PrimaryLogPG::try_flush_mark_clean(FlushOpRef fop) return -EAGAIN; // will retry } else { osd->logger->inc(l_osd_tier_try_flush_fail); - cancel_flush(fop, false); + vector tids; + cancel_flush(fop, false, &tids); + osd->objecter->op_cancel(tids, -ECANCELED); return -ECANCELED; } } @@ -9056,7 +9089,9 @@ int PrimaryLogPG::try_flush_mark_clean(FlushOpRef fop) dout(10) << __func__ << " failed write lock, no op; failing" << dendl; close_op_ctx(ctx.release()); osd->logger->inc(l_osd_tier_try_flush_fail); - cancel_flush(fop, false); + vector tids; + cancel_flush(fop, false, &tids); + osd->objecter->op_cancel(tids, -ECANCELED); return -ECANCELED; } @@ -9096,15 +9131,22 @@ int PrimaryLogPG::try_flush_mark_clean(FlushOpRef fop) return -EINPROGRESS; } -void PrimaryLogPG::cancel_flush(FlushOpRef fop, bool requeue) +void PrimaryLogPG::cancel_flush(FlushOpRef fop, bool requeue, + vector *tids) { dout(10) << __func__ << " " << fop->obc->obs.oi.soid << " tid " << fop->objecter_tid << dendl; if (fop->objecter_tid) { - osd->objecter->op_cancel(fop->objecter_tid, -ECANCELED); + tids->push_back(fop->objecter_tid); fop->objecter_tid = 0; } - if (fop->blocking) { + if (fop->io_tids.size()) { + for (auto &p : fop->io_tids) { + tids->push_back(p.second); + p.second = 0; + } + } + if (fop->blocking && fop->obc->is_blocked()) { fop->obc->stop_block(); kick_object_context_blocked(fop->obc); } @@ -9120,12 +9162,12 @@ void PrimaryLogPG::cancel_flush(FlushOpRef fop, bool requeue) flush_ops.erase(fop->obc->obs.oi.soid); } -void PrimaryLogPG::cancel_flush_ops(bool requeue) +void PrimaryLogPG::cancel_flush_ops(bool requeue, 
vector *tids) { dout(10) << __func__ << dendl; map::iterator p = flush_ops.begin(); while (p != flush_ops.end()) { - cancel_flush((p++)->second, requeue); + cancel_flush((p++)->second, requeue, tids); } } @@ -11002,9 +11044,13 @@ void PrimaryLogPG::on_shutdown() scrub_clear_state(); unreg_next_scrub(); - cancel_copy_ops(false); - cancel_flush_ops(false); - cancel_proxy_ops(false); + + vector tids; + cancel_copy_ops(false, &tids); + cancel_flush_ops(false, &tids); + cancel_proxy_ops(false, &tids); + osd->objecter->op_cancel(tids, -ECANCELED); + apply_and_flush_repops(false); cancel_log_updates(); // we must remove PGRefs, so do this this prior to release_backoffs() callers @@ -11109,9 +11155,11 @@ void PrimaryLogPG::on_change(ObjectStore::Transaction *t) clear_scrub_reserved(); - cancel_copy_ops(is_primary()); - cancel_flush_ops(is_primary()); - cancel_proxy_ops(is_primary()); + vector tids; + cancel_copy_ops(is_primary(), &tids); + cancel_flush_ops(is_primary(), &tids); + cancel_proxy_ops(is_primary(), &tids); + osd->objecter->op_cancel(tids, -ECANCELED); // requeue object waiters for (auto& p : waiting_for_unreadable_object) { diff --git a/src/osd/PrimaryLogPG.h b/src/osd/PrimaryLogPG.h index e660466b685e0..e59f8c662daa6 100644 --- a/src/osd/PrimaryLogPG.h +++ b/src/osd/PrimaryLogPG.h @@ -228,6 +228,8 @@ public: bool blocking; ///< whether we are blocking updates bool removal; ///< we are removing the backend object boost::optional> on_flush; ///< callback, may be null + // for chunked object + map io_tids; FlushOp() : flushed_version(0), objecter_tid(0), rval(0), @@ -1288,8 +1290,8 @@ protected: void _copy_some(ObjectContextRef obc, CopyOpRef cop); void finish_copyfrom(CopyFromCallback *cb); void finish_promote(int r, CopyResults *results, ObjectContextRef obc); - void cancel_copy(CopyOpRef cop, bool requeue); - void cancel_copy_ops(bool requeue); + void cancel_copy(CopyOpRef cop, bool requeue, vector *tids); + void cancel_copy_ops(bool requeue, vector *tids); 
friend struct C_Copyfrom; @@ -1303,8 +1305,8 @@ protected: boost::optional> &&on_flush); void finish_flush(hobject_t oid, ceph_tid_t tid, int r); int try_flush_mark_clean(FlushOpRef fop); - void cancel_flush(FlushOpRef fop, bool requeue); - void cancel_flush_ops(bool requeue); + void cancel_flush(FlushOpRef fop, bool requeue, vector *tids); + void cancel_flush_ops(bool requeue, vector *tids); /// @return false if clone is has been evicted bool is_present_clone(hobject_t coid); @@ -1351,14 +1353,14 @@ protected: map> in_progress_proxy_ops; void kick_proxy_ops_blocked(hobject_t& soid); - void cancel_proxy_ops(bool requeue); + void cancel_proxy_ops(bool requeue, vector *tids); // -- proxyread -- map proxyread_ops; void do_proxy_read(OpRequestRef op, ObjectContextRef obc = NULL); void finish_proxy_read(hobject_t oid, ceph_tid_t tid, int r); - void cancel_proxy_read(ProxyReadOpRef prdop); + void cancel_proxy_read(ProxyReadOpRef prdop, vector *tids); friend struct C_ProxyRead; @@ -1366,8 +1368,9 @@ protected: map proxywrite_ops; void do_proxy_write(OpRequestRef op, const hobject_t& missing_oid, ObjectContextRef obc = NULL); + void cancel_and_requeue_proxy_ops(hobject_t oid); void finish_proxy_write(hobject_t oid, ceph_tid_t tid, int r); - void cancel_proxy_write(ProxyWriteOpRef pwop); + void cancel_proxy_write(ProxyWriteOpRef pwop, vector *tids); friend struct C_ProxyWrite_Commit; -- 2.39.5