]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
osd: Add hit_set_flushing to track current flushes and prevent races 1405/head
authorDavid Zafman <david.zafman@inktank.com>
Fri, 7 Mar 2014 02:08:46 +0000 (18:08 -0800)
committerDavid Zafman <david.zafman@inktank.com>
Fri, 7 Mar 2014 19:48:21 +0000 (11:48 -0800)
When flushing a HitSet track in hit_set_flushing map so that
agent_load_hit_sets() doesn't try to read it too soon.

Fixes: #7575
Signed-off-by: David Zafman <david.zafman@inktank.com>
src/osd/ReplicatedPG.cc
src/osd/ReplicatedPG.h

index 6c9d5ecf484151f1ec54861b991ef89abca6556b..c09156f1426844e11ae2da4fd550af3f5c515ed1 100644 (file)
@@ -6168,6 +6168,10 @@ void ReplicatedPG::repop_all_applied(RepGather *repop)
   repop->all_applied = true;
   if (!repop->rep_aborted) {
     eval_repop(repop);
+    if (repop->on_applied) {
+     repop->on_applied->complete(0);
+     repop->on_applied = NULL;
+    }
   }
 }
 
@@ -8821,6 +8825,10 @@ void ReplicatedPG::apply_and_flush_repops(bool requeue)
     repop_queue.pop_front();
     dout(10) << " applying repop tid " << repop->rep_tid << dendl;
     repop->rep_aborted = true;
+    if (repop->on_applied) {
+      delete repop->on_applied;
+      repop->on_applied = NULL;
+    }
 
     if (requeue) {
       if (repop->ctx->op) {
@@ -10200,6 +10208,7 @@ void ReplicatedPG::hit_set_clear()
   dout(20) << __func__ << dendl;
   hit_set.reset();
   hit_set_start_stamp = utime_t();
+  hit_set_flushing.clear();
 }
 
 void ReplicatedPG::hit_set_setup()
@@ -10289,6 +10298,15 @@ bool ReplicatedPG::hit_set_apply_log()
   return true;
 }
 
+struct C_HitSetFlushing : public Context {
+  ReplicatedPGRef pg;
+  time_t hit_set_name;
+  C_HitSetFlushing(ReplicatedPG *p, time_t n) : pg(p), hit_set_name(n) { }
+  void finish(int r) {
+    pg->hit_set_flushing.erase(hit_set_name);
+  }
+};
+
 void ReplicatedPG::hit_set_persist()
 {
   dout(10) << __func__  << dendl;
@@ -10298,6 +10316,7 @@ void ReplicatedPG::hit_set_persist()
   RepGather *repop;
   hobject_t oid;
   bool reset = false;
+  time_t flush_time = 0;
 
   if (!info.hit_set.current_info.begin)
     info.hit_set.current_info.begin = hit_set_start_stamp;
@@ -10315,6 +10334,9 @@ void ReplicatedPG::hit_set_persist()
     if (agent_state)
       agent_state->add_hit_set(info.hit_set.current_info.begin, hit_set);
 
+    // hold a ref until it is flushed to disk
+    hit_set_flushing[info.hit_set.current_info.begin] = hit_set;
+    flush_time = info.hit_set.current_info.begin;
   } else {
     // persist snapshot of current hitset
     ::encode(*hit_set, bl);
@@ -10324,6 +10346,8 @@ void ReplicatedPG::hit_set_persist()
 
   ObjectContextRef obc = get_object_context(oid, true);
   repop = simple_repop_create(obc);
+  if (flush_time != 0)
+    repop->on_applied = new C_HitSetFlushing(this, flush_time);
   OpContext *ctx = repop->ctx;
   ctx->at_version = get_next_version();
 
@@ -10637,14 +10661,20 @@ void ReplicatedPG::agent_load_hit_sets()
          derr << __func__ << " on non-replicated pool" << dendl;
          break;
        }
-       bufferlist bl;
-       hobject_t oid = get_hit_set_archive_object(p->begin, p->end);
-       int r = osd->store->read(coll, oid, 0, 0, bl);
-       assert(r >= 0);
-       HitSetRef hs(new HitSet);
-       bufferlist::iterator pbl = bl.begin();
-       ::decode(*hs, pbl);
-       agent_state->add_hit_set(p->begin.sec(), hs);
+
+       // check if it's still in flight
+       if (hit_set_flushing.count(p->begin)) {
+         agent_state->add_hit_set(p->begin.sec(), hit_set_flushing[p->begin]);
+       } else {
+         bufferlist bl;
+         hobject_t oid = get_hit_set_archive_object(p->begin, p->end);
+         int r = osd->store->read(coll, oid, 0, 0, bl);
+         assert(r >= 0);
+         HitSetRef hs(new HitSet);
+         bufferlist::iterator pbl = bl.begin();
+         ::decode(*hs, pbl);
+         agent_state->add_hit_set(p->begin.sec(), hs);
+       }
       }
     }
   }
index c53d4b9b92ce036cfe02b74af5401d448c21435c..c670d39eda9c4f84499c24a75a13140e9775a1aa 100644 (file)
@@ -595,6 +595,8 @@ public:
     eversion_t          pg_local_last_complete;
 
     bool queue_snap_trimmer;
+
+    Context *on_applied;
     
     RepGather(OpContext *c, ObjectContextRef pi, tid_t rt, 
              eversion_t lc) :
@@ -607,7 +609,8 @@ public:
       //sent_nvram(false),
       sent_disk(false),
       pg_local_last_complete(lc),
-      queue_snap_trimmer(false) { }
+      queue_snap_trimmer(false),
+      on_applied(NULL) { }
 
     RepGather *get() {
       nref++;
@@ -617,6 +620,7 @@ public:
       assert(nref > 0);
       if (--nref == 0) {
        delete ctx; // must already be unlocked
+       assert(on_applied == NULL);
        delete this;
        //generic_dout(0) << "deleting " << this << dendl;
       }
@@ -719,6 +723,8 @@ protected:
   HitSetRef hit_set;        ///< currently accumulating HitSet
   utime_t hit_set_start_stamp;    ///< time the current HitSet started recording
 
+  map<time_t,HitSetRef> hit_set_flushing; ///< currently being written, not yet readable
+
   void hit_set_clear();     ///< discard any HitSet state
   void hit_set_setup();     ///< initialize HitSet state
   void hit_set_create();    ///< create a new HitSet
@@ -733,6 +739,7 @@ protected:
   boost::scoped_ptr<TierAgentState> agent_state;
 
   friend class C_AgentFlushStartStop;
+  friend class C_HitSetFlushing;
 
   void agent_setup();       ///< initialize agent state
   void agent_work(int max); ///< entry point to do some agent work