void PG::RecoveryState::start_handle(RecoveryCtx *new_ctx) {
assert(!rctx);
- rctx = new_ctx;
- if (rctx)
+ assert(!orig_ctx);
+ orig_ctx = new_ctx;
+ if (new_ctx) {
+ if (messages_pending_flush) {
+ rctx = RecoveryCtx(*messages_pending_flush, *new_ctx);
+ } else {
+ rctx = *new_ctx;
+ }
rctx->start_time = ceph_clock_now(pg->cct);
+ }
+}
+
+void PG::RecoveryState::begin_block_outgoing() {
+ assert(!messages_pending_flush);
+ assert(orig_ctx);
+ assert(rctx);
+ messages_pending_flush = BufferedRecoveryMessages();
+ rctx = RecoveryCtx(*messages_pending_flush, *orig_ctx);
+}
+
+void PG::RecoveryState::clear_blocked_outgoing() {
+ assert(orig_ctx);
+ assert(rctx);
+ messages_pending_flush = boost::optional<BufferedRecoveryMessages>();
+}
+
+void PG::RecoveryState::end_block_outgoing() {
+ assert(messages_pending_flush);
+ assert(orig_ctx);
+ assert(rctx);
+
+ rctx = RecoveryCtx(*orig_ctx);
+ rctx->accept_buffered_messages(*messages_pending_flush);
+ messages_pending_flush = boost::optional<BufferedRecoveryMessages>();
}
void PG::RecoveryState::end_handle() {
utime_t dur = ceph_clock_now(pg->cct) - rctx->start_time;
machine.event_time += dur;
}
+
machine.event_count++;
- rctx = 0;
+ rctx = boost::optional<RecoveryCtx>();
+ orig_ctx = NULL;
}
void intrusive_ptr_add_ref(PG *pg) { pg->get("intptr"); }
public:
+ struct BufferedRecoveryMessages {
+ map<int, map<spg_t, pg_query_t> > query_map;
+ map<int, vector<pair<pg_notify_t, pg_interval_map_t> > > info_map;
+ map<int, vector<pair<pg_notify_t, pg_interval_map_t> > > notify_list;
+ };
+
struct RecoveryCtx {
utime_t start_time;
map<int, map<spg_t, pg_query_t> > *query_map;
on_applied(on_applied),
on_safe(on_safe),
transaction(transaction) {}
+
+ RecoveryCtx(BufferedRecoveryMessages &buf, RecoveryCtx &rctx)
+ : query_map(&(buf.query_map)),
+ info_map(&(buf.info_map)),
+ notify_list(&(buf.notify_list)),
+ on_applied(rctx.on_applied),
+ on_safe(rctx.on_safe),
+ transaction(rctx.transaction) {}
+
+ void accept_buffered_messages(BufferedRecoveryMessages &m) {
+ assert(query_map);
+ assert(info_map);
+ assert(notify_list);
+ for (map<int, map<spg_t, pg_query_t> >::iterator i = m.query_map.begin();
+ i != m.query_map.end();
+ ++i) {
+ map<spg_t, pg_query_t> &omap = (*query_map)[i->first];
+ for (map<spg_t, pg_query_t>::iterator j = i->second.begin();
+ j != i->second.end();
+ ++j) {
+ omap[j->first] = j->second;
+ }
+ }
+ for (map<int, vector<pair<pg_notify_t, pg_interval_map_t> > >::iterator i
+ = m.info_map.begin();
+ i != m.info_map.end();
+ ++i) {
+ vector<pair<pg_notify_t, pg_interval_map_t> > &ovec =
+ (*info_map)[i->first];
+ ovec.reserve(ovec.size() + i->second.size());
+ ovec.insert(ovec.end(), i->second.begin(), i->second.end());
+ }
+ for (map<int, vector<pair<pg_notify_t, pg_interval_map_t> > >::iterator i
+ = m.notify_list.begin();
+ i != m.notify_list.end();
+ ++i) {
+ vector<pair<pg_notify_t, pg_interval_map_t> > &ovec =
+ (*notify_list)[i->first];
+ ovec.reserve(ovec.size() + i->second.size());
+ ovec.insert(ovec.end(), i->second.begin(), i->second.end());
+ }
+ }
};
struct NamedState {
class RecoveryState {
void start_handle(RecoveryCtx *new_ctx);
void end_handle();
+ public:
+ void begin_block_outgoing();
+ void end_block_outgoing();
+ void clear_blocked_outgoing();
+ private:
/* States */
struct Initial;
/* Accessor functions for state methods */
ObjectStore::Transaction* get_cur_transaction() {
+ assert(state->rctx);
assert(state->rctx->transaction);
return state->rctx->transaction;
}
void send_query(pg_shard_t to, const pg_query_t &query) {
+ assert(state->rctx);
assert(state->rctx->query_map);
(*state->rctx->query_map)[to.osd][spg_t(pg->info.pgid.pgid, to.shard)] =
query;
}
map<int, map<spg_t, pg_query_t> > *get_query_map() {
+ assert(state->rctx);
assert(state->rctx->query_map);
return state->rctx->query_map;
}
map<int, vector<pair<pg_notify_t, pg_interval_map_t> > > *get_info_map() {
+ assert(state->rctx);
assert(state->rctx->info_map);
return state->rctx->info_map;
}
list< Context* > *get_on_safe_context_list() {
+ assert(state->rctx);
assert(state->rctx->on_safe);
return &(state->rctx->on_safe->contexts);
}
list< Context * > *get_on_applied_context_list() {
+ assert(state->rctx);
assert(state->rctx->on_applied);
return &(state->rctx->on_applied->contexts);
}
- RecoveryCtx *get_recovery_ctx() { return state->rctx; }
+ RecoveryCtx *get_recovery_ctx() { return &*(state->rctx); }
void send_notify(pg_shard_t to,
const pg_notify_t &info, const pg_interval_map_t &pi) {
+ assert(state->rctx);
assert(state->rctx->notify_list);
(*state->rctx->notify_list)[to.osd].push_back(make_pair(info, pi));
}
RecoveryMachine machine;
PG *pg;
- RecoveryCtx *rctx;
+
+ /// context passed in by state machine caller
+ RecoveryCtx *orig_ctx;
+
+ /// populated if we are buffering messages pending a flush
+ boost::optional<BufferedRecoveryMessages> messages_pending_flush;
+
+ /**
+ * populated between start_handle() and end_handle(), points into
+ * the message lists for messages_pending_flush while blocking messages
+ * or into orig_ctx otherwise
+ */
+ boost::optional<RecoveryCtx> rctx;
public:
- RecoveryState(PG *pg) : machine(this, pg), pg(pg), rctx(0) {
+ RecoveryState(PG *pg)
+ : machine(this, pg), pg(pg), orig_ctx(0) {
machine.initiate();
}