]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
PG: add machinery to temporarily buffer outgoing recovery messages
authorSamuel Just <sam.just@inktank.com>
Wed, 21 May 2014 20:09:33 +0000 (13:09 -0700)
committerSamuel Just <sam.just@inktank.com>
Fri, 27 Jun 2014 17:51:26 +0000 (10:51 -0700)
Signed-off-by: Samuel Just <sam.just@inktank.com>
src/osd/PG.cc
src/osd/PG.h

index 2c86f3ba2d2511875be56256b2f2dedbe98f2da2..aa84ae87e8f6eb1b2e395578fd20f13154c51656 100644 (file)
@@ -7481,9 +7481,40 @@ bool PG::PriorSet::affected_by_map(const OSDMapRef osdmap, const PG *debug_pg) c
 
 void PG::RecoveryState::start_handle(RecoveryCtx *new_ctx) {
   assert(!rctx);
-  rctx = new_ctx;
-  if (rctx)
+  assert(!orig_ctx);
+  orig_ctx = new_ctx;
+  if (new_ctx) {
+    if (messages_pending_flush) {
+      rctx = RecoveryCtx(*messages_pending_flush, *new_ctx);
+    } else {
+      rctx = *new_ctx;
+    }
     rctx->start_time = ceph_clock_now(pg->cct);
+  }
+}
+
+void PG::RecoveryState::begin_block_outgoing() {
+  assert(!messages_pending_flush);
+  assert(orig_ctx);
+  assert(rctx);
+  messages_pending_flush = BufferedRecoveryMessages();
+  rctx = RecoveryCtx(*messages_pending_flush, *orig_ctx);
+}
+
+void PG::RecoveryState::clear_blocked_outgoing() {
+  assert(orig_ctx);
+  assert(rctx);
+  messages_pending_flush = boost::optional<BufferedRecoveryMessages>();
+}
+
+void PG::RecoveryState::end_block_outgoing() {
+  assert(messages_pending_flush);
+  assert(orig_ctx);
+  assert(rctx);
+
+  rctx = RecoveryCtx(*orig_ctx);
+  rctx->accept_buffered_messages(*messages_pending_flush);
+  messages_pending_flush = boost::optional<BufferedRecoveryMessages>();
 }
 
 void PG::RecoveryState::end_handle() {
@@ -7491,8 +7522,10 @@ void PG::RecoveryState::end_handle() {
     utime_t dur = ceph_clock_now(pg->cct) - rctx->start_time;
     machine.event_time += dur;
   }
+
   machine.event_count++;
-  rctx = 0;
+  rctx = boost::optional<RecoveryCtx>();
+  orig_ctx = NULL;
 }
 
 void intrusive_ptr_add_ref(PG *pg) { pg->get("intptr"); }
index 8967a56fd2d7d9b8cea8b415700f8944c84834e4..a380db42406ba8b2384a4a8992b92b773ffcd19d 100644 (file)
@@ -487,6 +487,12 @@ public:
 
 
 public:    
+  struct BufferedRecoveryMessages {
+    map<int, map<spg_t, pg_query_t> > query_map;
+    map<int, vector<pair<pg_notify_t, pg_interval_map_t> > > info_map;
+    map<int, vector<pair<pg_notify_t, pg_interval_map_t> > > notify_list;
+  };
+
   struct RecoveryCtx {
     utime_t start_time;
     map<int, map<spg_t, pg_query_t> > *query_map;
@@ -508,6 +514,48 @@ public:
        on_applied(on_applied),
        on_safe(on_safe),
        transaction(transaction) {}
+
+    RecoveryCtx(BufferedRecoveryMessages &buf, RecoveryCtx &rctx)
+      : query_map(&(buf.query_map)),
+       info_map(&(buf.info_map)),
+       notify_list(&(buf.notify_list)),
+       on_applied(rctx.on_applied),
+       on_safe(rctx.on_safe),
+       transaction(rctx.transaction) {}
+
+    void accept_buffered_messages(BufferedRecoveryMessages &m) {
+      assert(query_map);
+      assert(info_map);
+      assert(notify_list);
+      for (map<int, map<spg_t, pg_query_t> >::iterator i = m.query_map.begin();
+          i != m.query_map.end();
+          ++i) {
+       map<spg_t, pg_query_t> &omap = (*query_map)[i->first];
+       for (map<spg_t, pg_query_t>::iterator j = i->second.begin();
+            j != i->second.end();
+            ++j) {
+         omap[j->first] = j->second;
+       }
+      }
+      for (map<int, vector<pair<pg_notify_t, pg_interval_map_t> > >::iterator i
+            = m.info_map.begin();
+          i != m.info_map.end();
+          ++i) {
+       vector<pair<pg_notify_t, pg_interval_map_t> > &ovec =
+         (*info_map)[i->first];
+       ovec.reserve(ovec.size() + i->second.size());
+       ovec.insert(ovec.end(), i->second.begin(), i->second.end());
+      }
+      for (map<int, vector<pair<pg_notify_t, pg_interval_map_t> > >::iterator i
+            = m.notify_list.begin();
+          i != m.notify_list.end();
+          ++i) {
+       vector<pair<pg_notify_t, pg_interval_map_t> > &ovec =
+         (*notify_list)[i->first];
+       ovec.reserve(ovec.size() + i->second.size());
+       ovec.insert(ovec.end(), i->second.begin(), i->second.end());
+      }
+    }
   };
 
   struct NamedState {
@@ -1337,6 +1385,11 @@ public:
   class RecoveryState {
     void start_handle(RecoveryCtx *new_ctx);
     void end_handle();
+  public:
+    void begin_block_outgoing();
+    void end_block_outgoing();
+    void clear_blocked_outgoing();
+  private:
 
     /* States */
     struct Initial;
@@ -1360,40 +1413,47 @@ public:
 
       /* Accessor functions for state methods */
       ObjectStore::Transaction* get_cur_transaction() {
+       assert(state->rctx);
        assert(state->rctx->transaction);
        return state->rctx->transaction;
       }
 
       void send_query(pg_shard_t to, const pg_query_t &query) {
+       assert(state->rctx);
        assert(state->rctx->query_map);
        (*state->rctx->query_map)[to.osd][spg_t(pg->info.pgid.pgid, to.shard)] =
          query;
       }
 
       map<int, map<spg_t, pg_query_t> > *get_query_map() {
+       assert(state->rctx);
        assert(state->rctx->query_map);
        return state->rctx->query_map;
       }
 
       map<int, vector<pair<pg_notify_t, pg_interval_map_t> > > *get_info_map() {
+       assert(state->rctx);
        assert(state->rctx->info_map);
        return state->rctx->info_map;
       }
 
       list< Context* > *get_on_safe_context_list() {
+       assert(state->rctx);
        assert(state->rctx->on_safe);
        return &(state->rctx->on_safe->contexts);
       }
 
       list< Context * > *get_on_applied_context_list() {
+       assert(state->rctx);
        assert(state->rctx->on_applied);
        return &(state->rctx->on_applied->contexts);
       }
 
-      RecoveryCtx *get_recovery_ctx() { return state->rctx; }
+      RecoveryCtx *get_recovery_ctx() { return &*(state->rctx); }
 
       void send_notify(pg_shard_t to,
                       const pg_notify_t &info, const pg_interval_map_t &pi) {
+       assert(state->rctx);
        assert(state->rctx->notify_list);
        (*state->rctx->notify_list)[to.osd].push_back(make_pair(info, pi));
       }
@@ -1855,10 +1915,23 @@ public:
 
     RecoveryMachine machine;
     PG *pg;
-    RecoveryCtx *rctx;
+
+    /// context passed in by state machine caller
+    RecoveryCtx *orig_ctx;
+
+    /// populated if we are buffering messages pending a flush
+    boost::optional<BufferedRecoveryMessages> messages_pending_flush;
+
+    /**
+     * populated between start_handle() and end_handle(), points into
+     * the message lists for messages_pending_flush while blocking messages
+     * or into orig_ctx otherwise
+     */
+    boost::optional<RecoveryCtx> rctx;
 
   public:
-    RecoveryState(PG *pg) : machine(this, pg), pg(pg), rctx(0) {
+    RecoveryState(PG *pg)
+      : machine(this, pg), pg(pg), orig_ctx(0) {
       machine.initiate();
     }