From f9f89be38333d3817a0eafc52a961365e21e1f3a Mon Sep 17 00:00:00 2001
From: Samuel Just
Date: Wed, 21 May 2014 13:09:33 -0700
Subject: [PATCH] PG: add machinery to temporarily buffer outgoing recovery messages

Signed-off-by: Samuel Just
---
Reviewer notes (text in this area is ignored by git-am and is not part of
the commit message):

 * RecoveryState now remembers the caller's context in orig_ctx and works on
   its own RecoveryCtx value (rctx). While messages_pending_flush is
   populated, rctx's query/info/notify pointers refer to the buffered maps
   instead of the caller's maps.
 * begin_block_outgoing() starts buffering; end_block_outgoing() rebuilds
   rctx from orig_ctx and merges the buffer into it through
   accept_buffered_messages(); clear_blocked_outgoing() resets the buffer
   without merging it.
 * NOTE(review): clear_blocked_outgoing() does not rebuild rctx, so rctx may
   still point into the buffer that was just reset until end_handle() runs.
   Confirm callers do not queue messages in between.
 * NOTE(review): this copy was re-flowed from a whitespace-collapsed
   extraction. Indentation is spaces only, blank context lines were placed
   to satisfy the hunk line counts, and stripped template argument lists
   were restored from usage (the int key type is assumed). Verify against
   the upstream commit before applying, e.g. with
   git apply --ignore-whitespace.
 * NOTE(review): the author e-mail address is missing from the From: and
   Signed-off-by: lines in this copy.

 src/osd/PG.cc | 39 +++++++++++++++++++++++--
 src/osd/PG.h  | 79 +++++++++++++++++++++++++++++++++++++++++++++++++--
 2 files changed, 112 insertions(+), 6 deletions(-)

diff --git a/src/osd/PG.cc b/src/osd/PG.cc
index 2c86f3ba2d251..aa84ae87e8f6e 100644
--- a/src/osd/PG.cc
+++ b/src/osd/PG.cc
@@ -7481,9 +7481,40 @@ bool PG::PriorSet::affected_by_map(const OSDMapRef osdmap, const PG *debug_pg) c
 
 void PG::RecoveryState::start_handle(RecoveryCtx *new_ctx) {
   assert(!rctx);
-  rctx = new_ctx;
-  if (rctx)
+  assert(!orig_ctx);
+  orig_ctx = new_ctx;
+  if (new_ctx) {
+    if (messages_pending_flush) {
+      rctx = RecoveryCtx(*messages_pending_flush, *new_ctx);
+    } else {
+      rctx = *new_ctx;
+    }
     rctx->start_time = ceph_clock_now(pg->cct);
+  }
+}
+
+void PG::RecoveryState::begin_block_outgoing() {
+  assert(!messages_pending_flush);
+  assert(orig_ctx);
+  assert(rctx);
+  messages_pending_flush = BufferedRecoveryMessages();
+  rctx = RecoveryCtx(*messages_pending_flush, *orig_ctx);
+}
+
+void PG::RecoveryState::clear_blocked_outgoing() {
+  assert(orig_ctx);
+  assert(rctx);
+  messages_pending_flush = boost::optional<BufferedRecoveryMessages>();
+}
+
+void PG::RecoveryState::end_block_outgoing() {
+  assert(messages_pending_flush);
+  assert(orig_ctx);
+  assert(rctx);
+
+  rctx = RecoveryCtx(*orig_ctx);
+  rctx->accept_buffered_messages(*messages_pending_flush);
+  messages_pending_flush = boost::optional<BufferedRecoveryMessages>();
 }
 
 void PG::RecoveryState::end_handle() {
@@ -7491,8 +7522,10 @@ void PG::RecoveryState::end_handle() {
     utime_t dur = ceph_clock_now(pg->cct) - rctx->start_time;
     machine.event_time += dur;
   }
+
   machine.event_count++;
-  rctx = 0;
+  rctx = boost::optional<RecoveryCtx>();
+  orig_ctx = NULL;
 }
 
 void intrusive_ptr_add_ref(PG *pg) { pg->get("intptr"); }
diff --git a/src/osd/PG.h b/src/osd/PG.h
index 8967a56fd2d7d..a380db42406ba 100644
--- a/src/osd/PG.h
+++ b/src/osd/PG.h
@@ -487,6 +487,12 @@ public:
 
 public:
 
+  struct BufferedRecoveryMessages {
+    map<int, map<spg_t, pg_query_t> > query_map;
+    map<int, vector<pair<pg_notify_t, pg_interval_map_t> > > info_map;
+    map<int, vector<pair<pg_notify_t, pg_interval_map_t> > > notify_list;
+  };
+
   struct RecoveryCtx {
     utime_t start_time;
     map<int, map<spg_t, pg_query_t> > *query_map;
@@ -508,6 +514,48 @@ public:
         on_applied(on_applied),
         on_safe(on_safe),
         transaction(transaction) {}
+
+    RecoveryCtx(BufferedRecoveryMessages &buf, RecoveryCtx &rctx)
+      : query_map(&(buf.query_map)),
+        info_map(&(buf.info_map)),
+        notify_list(&(buf.notify_list)),
+        on_applied(rctx.on_applied),
+        on_safe(rctx.on_safe),
+        transaction(rctx.transaction) {}
+
+    void accept_buffered_messages(BufferedRecoveryMessages &m) {
+      assert(query_map);
+      assert(info_map);
+      assert(notify_list);
+      for (map<int, map<spg_t, pg_query_t> >::iterator i = m.query_map.begin();
+           i != m.query_map.end();
+           ++i) {
+        map<spg_t, pg_query_t> &omap = (*query_map)[i->first];
+        for (map<spg_t, pg_query_t>::iterator j = i->second.begin();
+             j != i->second.end();
+             ++j) {
+          omap[j->first] = j->second;
+        }
+      }
+      for (map<int, vector<pair<pg_notify_t, pg_interval_map_t> > >::iterator i
+             = m.info_map.begin();
+           i != m.info_map.end();
+           ++i) {
+        vector<pair<pg_notify_t, pg_interval_map_t> > &ovec =
+          (*info_map)[i->first];
+        ovec.reserve(ovec.size() + i->second.size());
+        ovec.insert(ovec.end(), i->second.begin(), i->second.end());
+      }
+      for (map<int, vector<pair<pg_notify_t, pg_interval_map_t> > >::iterator i
+             = m.notify_list.begin();
+           i != m.notify_list.end();
+           ++i) {
+        vector<pair<pg_notify_t, pg_interval_map_t> > &ovec =
+          (*notify_list)[i->first];
+        ovec.reserve(ovec.size() + i->second.size());
+        ovec.insert(ovec.end(), i->second.begin(), i->second.end());
+      }
+    }
   };
 
   struct NamedState {
@@ -1337,6 +1385,11 @@ public:
   class RecoveryState {
     void start_handle(RecoveryCtx *new_ctx);
     void end_handle();
+  public:
+    void begin_block_outgoing();
+    void end_block_outgoing();
+    void clear_blocked_outgoing();
+  private:
 
     /* States */
     struct Initial;
@@ -1360,40 +1413,47 @@ public:
 
       /* Accessor functions for state methods */
       ObjectStore::Transaction* get_cur_transaction() {
+        assert(state->rctx);
         assert(state->rctx->transaction);
         return state->rctx->transaction;
       }
 
       void send_query(pg_shard_t to, const pg_query_t &query) {
+        assert(state->rctx);
         assert(state->rctx->query_map);
         (*state->rctx->query_map)[to.osd][spg_t(pg->info.pgid.pgid, to.shard)] =
           query;
       }
 
       map<int, map<spg_t, pg_query_t> > *get_query_map() {
+        assert(state->rctx);
         assert(state->rctx->query_map);
         return state->rctx->query_map;
       }
 
       map<int, vector<pair<pg_notify_t, pg_interval_map_t> > > *get_info_map() {
+        assert(state->rctx);
         assert(state->rctx->info_map);
         return state->rctx->info_map;
       }
 
       list< Context* > *get_on_safe_context_list() {
+        assert(state->rctx);
         assert(state->rctx->on_safe);
         return &(state->rctx->on_safe->contexts);
       }
 
       list< Context * > *get_on_applied_context_list() {
+        assert(state->rctx);
         assert(state->rctx->on_applied);
         return &(state->rctx->on_applied->contexts);
       }
 
-      RecoveryCtx *get_recovery_ctx() { return state->rctx; }
+      RecoveryCtx *get_recovery_ctx() { return &*(state->rctx); }
 
       void send_notify(pg_shard_t to,
                        const pg_notify_t &info, const pg_interval_map_t &pi) {
+        assert(state->rctx);
         assert(state->rctx->notify_list);
         (*state->rctx->notify_list)[to.osd].push_back(make_pair(info, pi));
       }
@@ -1855,10 +1915,23 @@ public:
 
     RecoveryMachine machine;
     PG *pg;
-    RecoveryCtx *rctx;
+
+    /// context passed in by state machine caller
+    RecoveryCtx *orig_ctx;
+
+    /// populated if we are buffering messages pending a flush
+    boost::optional<BufferedRecoveryMessages> messages_pending_flush;
+
+    /**
+     * populated between start_handle() and end_handle(), points into
+     * the message lists for messages_pending_flush while blocking messages
+     * or into orig_ctx otherwise
+     */
+    boost::optional<RecoveryCtx> rctx;
 
   public:
-    RecoveryState(PG *pg) : machine(this, pg), pg(pg), rctx(0) {
+    RecoveryState(PG *pg)
+      : machine(this, pg), pg(pg), orig_ctx(0) {
       machine.initiate();
     }
 
-- 
2.39.5