From 135c27ec74be352416d06a9d0ad78e63cf477433 Mon Sep 17 00:00:00 2001 From: David Zafman Date: Thu, 6 Mar 2014 18:08:46 -0800 Subject: [PATCH] osd: Add hit_set_flushing to track current flushes and prevent races When flushing a HitSet track in hit_set_flushing map so that agent_load_hit_sets() doesn't try to read it too soon. Fixes: #7575 Signed-off-by: David Zafman --- src/osd/ReplicatedPG.cc | 46 ++++++++++++++++++++++++++++++++++------- src/osd/ReplicatedPG.h | 9 +++++++- 2 files changed, 46 insertions(+), 9 deletions(-) diff --git a/src/osd/ReplicatedPG.cc b/src/osd/ReplicatedPG.cc index 6c9d5ecf48415..c09156f142684 100644 --- a/src/osd/ReplicatedPG.cc +++ b/src/osd/ReplicatedPG.cc @@ -6168,6 +6168,10 @@ void ReplicatedPG::repop_all_applied(RepGather *repop) repop->all_applied = true; if (!repop->rep_aborted) { eval_repop(repop); + if (repop->on_applied) { + repop->on_applied->complete(0); + repop->on_applied = NULL; + } } } @@ -8821,6 +8825,10 @@ void ReplicatedPG::apply_and_flush_repops(bool requeue) repop_queue.pop_front(); dout(10) << " applying repop tid " << repop->rep_tid << dendl; repop->rep_aborted = true; + if (repop->on_applied) { + delete repop->on_applied; + repop->on_applied = NULL; + } if (requeue) { if (repop->ctx->op) { @@ -10200,6 +10208,7 @@ void ReplicatedPG::hit_set_clear() dout(20) << __func__ << dendl; hit_set.reset(); hit_set_start_stamp = utime_t(); + hit_set_flushing.clear(); } void ReplicatedPG::hit_set_setup() @@ -10289,6 +10298,15 @@ bool ReplicatedPG::hit_set_apply_log() return true; } +struct C_HitSetFlushing : public Context { + ReplicatedPGRef pg; + time_t hit_set_name; + C_HitSetFlushing(ReplicatedPG *p, time_t n) : pg(p), hit_set_name(n) { } + void finish(int r) { + pg->hit_set_flushing.erase(hit_set_name); + } +}; + void ReplicatedPG::hit_set_persist() { dout(10) << __func__ << dendl; @@ -10298,6 +10316,7 @@ void ReplicatedPG::hit_set_persist() RepGather *repop; hobject_t oid; bool reset = false; + time_t flush_time = 0; if (!info.hit_set.current_info.begin) info.hit_set.current_info.begin = hit_set_start_stamp; @@ -10315,6 +10334,9 @@ void ReplicatedPG::hit_set_persist() if (agent_state) agent_state->add_hit_set(info.hit_set.current_info.begin, hit_set); + // hold a ref until it is flushed to disk + hit_set_flushing[info.hit_set.current_info.begin] = hit_set; + flush_time = info.hit_set.current_info.begin; } else { // persist snapshot of current hitset ::encode(*hit_set, bl); @@ -10324,6 +10346,8 @@ void ReplicatedPG::hit_set_persist() ObjectContextRef obc = get_object_context(oid, true); repop = simple_repop_create(obc); + if (flush_time != 0) + repop->on_applied = new C_HitSetFlushing(this, flush_time); OpContext *ctx = repop->ctx; ctx->at_version = get_next_version(); @@ -10637,14 +10661,20 @@ void ReplicatedPG::agent_load_hit_sets() derr << __func__ << " on non-replicated pool" << dendl; break; } - bufferlist bl; - hobject_t oid = get_hit_set_archive_object(p->begin, p->end); - int r = osd->store->read(coll, oid, 0, 0, bl); - assert(r >= 0); - HitSetRef hs(new HitSet); - bufferlist::iterator pbl = bl.begin(); - ::decode(*hs, pbl); - agent_state->add_hit_set(p->begin.sec(), hs); + + // check if it's still in flight + if (hit_set_flushing.count(p->begin)) { + agent_state->add_hit_set(p->begin.sec(), hit_set_flushing[p->begin]); + } else { + bufferlist bl; + hobject_t oid = get_hit_set_archive_object(p->begin, p->end); + int r = osd->store->read(coll, oid, 0, 0, bl); + assert(r >= 0); + HitSetRef hs(new HitSet); + bufferlist::iterator pbl = bl.begin(); + ::decode(*hs, pbl); + agent_state->add_hit_set(p->begin.sec(), hs); + } } } } diff --git a/src/osd/ReplicatedPG.h b/src/osd/ReplicatedPG.h index c53d4b9b92ce0..c670d39eda9c4 100644 --- a/src/osd/ReplicatedPG.h +++ b/src/osd/ReplicatedPG.h @@ -595,6 +595,8 @@ public: eversion_t pg_local_last_complete; bool queue_snap_trimmer; + + Context *on_applied; RepGather(OpContext *c, ObjectContextRef pi, tid_t rt, eversion_t lc) : @@ -607,7 +609,8 @@ public: //sent_nvram(false), sent_disk(false), pg_local_last_complete(lc), - queue_snap_trimmer(false) { } + queue_snap_trimmer(false), + on_applied(NULL) { } RepGather *get() { nref++; @@ -617,6 +620,7 @@ public: assert(nref > 0); if (--nref == 0) { delete ctx; // must already be unlocked + assert(on_applied == NULL); delete this; //generic_dout(0) << "deleting " << this << dendl; } @@ -719,6 +723,8 @@ protected: HitSetRef hit_set; ///< currently accumulating HitSet utime_t hit_set_start_stamp; ///< time the current HitSet started recording + map hit_set_flushing; ///< currently being written, not yet readable + void hit_set_clear(); ///< discard any HitSet state void hit_set_setup(); ///< initialize HitSet state void hit_set_create(); ///< create a new HitSet @@ -733,6 +739,7 @@ protected: boost::scoped_ptr agent_state; friend class C_AgentFlushStartStop; + friend class C_HitSetFlushing; void agent_setup(); ///< initialize agent state void agent_work(int max); ///< entry point to do some agent work -- 2.39.5