int role = osdmap->calc_pg_role(whoami, pg->acting);
pg->set_role(role);
- PG::RecoveryCtx rctx(0, 0, 0, 0, 0);
+ PG::RecoveryCtx rctx(0, 0, 0, 0, 0, 0);
pg->handle_loaded(&rctx);
dout(10) << "load_pgs loaded " << *pg << " " << pg->log << dendl;
epoch_t epoch, int from, int& created, bool primary)
{
PG *pg;
- ObjectStore::Transaction *t = 0;
- C_Contexts *fin = 0;
if (!_have_pg(info.pgid)) {
// same primary?
}
// ok, create PG locally using provided Info and History
- t = new ObjectStore::Transaction;
- fin = new C_Contexts(g_ceph_context);
- map< int, vector<pair<pg_notify_t, pg_interval_map_t> > > notify_list; // primary -> list
- map< int, map<pg_t,pg_query_t> > query_map; // peer -> PG -> get_summary_since
- map<int,vector<pair<pg_notify_t, pg_interval_map_t> > > info_map; // peer -> message
- PG::RecoveryCtx rctx(&query_map, &info_map, ¬ify_list, &fin->contexts, t);
- pg = _create_lock_pg(info.pgid, create, false, role, up, acting, history, pi, *t);
+ PG::RecoveryCtx rctx = create_context();
+ pg = _create_lock_pg(info.pgid, create, false, role, up, acting, history, pi,
+ *rctx.transaction);
pg->handle_create(&rctx);
- do_notifies(notify_list);
- do_queries(query_map);
- do_infos(info_map);
+ dispatch_context(rctx, pg);
created++;
dout(10) << *pg << " is new" << dendl;
pg->unlock();
return NULL;
}
- t = new ObjectStore::Transaction;
- fin = new C_Contexts(g_ceph_context);
- }
- if (t && t->empty()) {
- delete t;
- delete fin;
- } else if (t) {
- int tr = store->queue_transaction(
- &pg->osr,
- t, new ObjectStore::C_DeleteTransaction(t), fin);
- assert(tr == 0);
}
return pg;
}
split_pg(parent, children, t);
+#if 0
// reset pg
map< int, vector<pair<pg_notify_t, pg_interval_map_t> > > notify_list; // primary -> list
map< int, map<pg_t,pg_query_t> > query_map; // peer -> PG -> get_summary_since
do_notifies(notify_list);
do_queries(query_map);
do_infos(info_map);
+#endif
}
void OSD::split_pg(PG *parent, map<pg_t,PG*>& children, ObjectStore::Transaction &t)
osdmap->get_epoch());
if (can_create_pg(pgid)) {
- ObjectStore::Transaction *t = new ObjectStore::Transaction;
- C_Contexts *fin = new C_Contexts(g_ceph_context);
pg_interval_map_t pi;
+ PG::RecoveryCtx rctx = create_context();
PG *pg = _create_lock_pg(pgid, true, false,
- 0, creating_pgs[pgid].acting, creating_pgs[pgid].acting, history, pi,
- *t);
+ 0, creating_pgs[pgid].acting, creating_pgs[pgid].acting,
+ history, pi,
+ *rctx.transaction);
creating_pgs.erase(pgid);
-
wake_pg_waiters(pg->info.pgid);
- PG::RecoveryCtx rctx(&query_map, &info_map, 0, &fin->contexts, t);
pg->handle_create(&rctx);
- pg->write_if_dirty(*t);
+ pg->write_if_dirty(*rctx.transaction);
pg->update_stats();
-
- int tr = store->queue_transaction(&pg->osr, t, new ObjectStore::C_DeleteTransaction(t), fin);
- assert(tr == 0);
-
+ dispatch_context(rctx, pg);
pg->unlock();
num_created++;
}
// ----------------------------------------
// peering and recovery
+PG::RecoveryCtx OSD::create_context()
+{
+ ObjectStore::Transaction *t = new ObjectStore::Transaction;
+ C_Contexts *on_applied = new C_Contexts(g_ceph_context);
+ C_Contexts *on_safe = new C_Contexts(g_ceph_context);
+ map< int, map<pg_t,pg_query_t> > *query_map =
+ new map<int, map<pg_t, pg_query_t> >;
+ map<int,vector<pair<pg_notify_t, pg_interval_map_t> > > *notify_list =
+ new map<int,vector<pair<pg_notify_t, pg_interval_map_t> > >;
+ map<int,vector<pair<pg_notify_t, pg_interval_map_t> > > *info_map =
+ new map<int,vector<pair<pg_notify_t, pg_interval_map_t> > >;
+ PG::RecoveryCtx rctx(query_map, info_map, notify_list,
+ on_applied, on_safe, t);
+ return rctx;
+}
+
+void OSD::dispatch_context_transaction(PG::RecoveryCtx &ctx, PG *pg)
+{
+ if (!ctx.transaction->empty()) {
+ ctx.on_applied->add(new ObjectStore::C_DeleteTransaction(ctx.transaction));
+ int tr = store->queue_transaction(
+ &pg->osr,
+ ctx.transaction, ctx.on_applied, ctx.on_safe);
+ assert(tr == 0);
+ ctx.transaction = new ObjectStore::Transaction;
+ ctx.on_applied = new C_Contexts(g_ceph_context);
+ ctx.on_safe = new C_Contexts(g_ceph_context);
+ }
+}
+
+void OSD::dispatch_context(PG::RecoveryCtx &ctx, PG *pg)
+{
+ do_notifies(*ctx.notify_list);
+ delete ctx.notify_list;
+ do_queries(*ctx.query_map);
+ delete ctx.query_map;
+ do_infos(*ctx.info_map);
+ delete ctx.info_map;
+ if (ctx.transaction->empty() || !pg) {
+ delete ctx.transaction;
+ delete ctx.on_applied;
+ delete ctx.on_safe;
+ } else {
+ ctx.on_applied->add(new ObjectStore::C_DeleteTransaction(ctx.transaction));
+ int tr = store->queue_transaction(
+ &pg->osr,
+ ctx.transaction, ctx.on_applied, ctx.on_safe);
+ assert(tr == 0);
+ }
+}
+
/** do_notifies
* Send an MOSDPGNotify to a primary, with a list of PGs that I have
* content for, and they are primary for.
dout(20) << " active was " << recovery_oids[pg->info.pgid] << dendl;
#endif
- ObjectStore::Transaction *t = new ObjectStore::Transaction;
- C_Contexts *fin = new C_Contexts(g_ceph_context);
- map< int, vector<pair<pg_notify_t, pg_interval_map_t> > > notify_list; // primary -> list
- map< int, map<pg_t,pg_query_t> > query_map; // peer -> PG -> get_summary_since
- map<int,vector<pair<pg_notify_t, pg_interval_map_t> > > info_map; // peer -> message
- PG::RecoveryCtx rctx(&query_map, &info_map, 0, &fin->contexts, t);
-
+ PG::RecoveryCtx rctx = create_context();
int started = pg->start_recovery_ops(max, &rctx);
-
dout(10) << "do_recovery started " << started
<< " (" << recovery_ops_active << "/" << g_conf->osd_recovery_max_active << " rops) on "
<< *pg << dendl;
* out while trying to pull.
*/
if (!started && pg->have_unfound()) {
- pg->discover_all_missing(query_map);
- if (!query_map.size()) {
+ pg->discover_all_missing(*rctx.query_map);
+ if (!rctx.query_map->size()) {
dout(10) << "do_recovery no luck, giving up on this pg for now" << dendl;
recovery_wq.lock();
pg->recovery_item.remove_myself(); // sigh...
}
}
- do_notifies(notify_list);
- do_queries(query_map);
- do_infos(info_map);
-
- pg->write_if_dirty(*t);
- if (!t->empty()) {
- int tr = store->queue_transaction(&pg->osr, t, new ObjectStore::C_DeleteTransaction(t), fin);
- assert(tr == 0);
- } else {
- delete t;
- delete fin;
- }
+ pg->write_if_dirty(*rctx.transaction);
+ dispatch_context(rctx, pg);
pg->unlock();
}
pg->put();
void OSD::process_peering_event(PG *pg)
{
- map< int, map<pg_t, pg_query_t> > query_map;
- map< int, vector<pair<pg_notify_t, pg_interval_map_t> > > notify_list;
- map<int, vector<pair<pg_notify_t, pg_interval_map_t> > > info_map; // peer -> message
bool need_up_thru = false;
epoch_t same_interval_since;
OSDMapRef curmap;
+ PG::RecoveryCtx rctx = create_context();
{
map_lock.get_read();
pg->lock();
pg->unlock();
return;
}
- ObjectStore::Transaction *t = new ObjectStore::Transaction;
- C_Contexts *pfin = new C_Contexts(g_ceph_context);
- PG::RecoveryCtx rctx(&query_map, &info_map, ¬ify_list,
- &pfin->contexts, t);
advance_pg(curmap->get_epoch(), pg, &rctx);
if (!pg->peering_queue.empty()) {
PG::CephPeeringEvtRef evt = pg->peering_queue.front();
pg->peering_queue.pop_front();
pg->handle_peering_event(evt, &rctx);
}
- if (!t->empty()) {
- int tr = store->queue_transaction(
- &pg->osr,
- t, new ObjectStore::C_DeleteTransaction(t), pfin);
- assert(tr == 0);
- } else {
- delete t;
- delete pfin;
- }
+ dispatch_context_transaction(rctx, pg);
need_up_thru = pg->need_up_thru;
same_interval_since = pg->info.history.same_interval_since;
pg->unlock();
Mutex::Locker l(osd_lock);
if (need_up_thru)
queue_want_up_thru(same_interval_since);
- do_notifies(notify_list);
- do_queries(query_map);
- do_infos(info_map);
+ dispatch_context(rctx, 0);
}
service.send_pg_temp();
}
map< int, map<pg_t, pg_query_t> > *query_map;
map< int, vector<pair<pg_notify_t, pg_interval_map_t> > > *info_map;
map< int, vector<pair<pg_notify_t, pg_interval_map_t> > > *notify_list;
- list< Context* > *context_list;
+ C_Contexts *on_applied;
+ C_Contexts *on_safe;
ObjectStore::Transaction *transaction;
- RecoveryCtx() : query_map(0), info_map(0), notify_list(0),
- context_list(0), transaction(0) {}
RecoveryCtx(map< int, map<pg_t, pg_query_t> > *query_map,
map< int, vector<pair<pg_notify_t, pg_interval_map_t> > > *info_map,
map< int, vector<pair<pg_notify_t, pg_interval_map_t> > > *notify_list,
- list< Context* > *context_list,
+ C_Contexts *on_applied,
+ C_Contexts *on_safe,
ObjectStore::Transaction *transaction)
: query_map(query_map), info_map(info_map),
notify_list(notify_list),
- context_list(context_list), transaction(transaction) {}
+ on_applied(on_applied),
+ on_safe(on_safe),
+ transaction(transaction) {}
};
struct NamedState {
return state->rctx->info_map;
}
- list< Context* > *get_context_list() {
- assert(state->rctx->context_list);
- return state->rctx->context_list;
+ list< Context* > *get_on_safe_context_list() {
+ assert(state->rctx->on_safe);
+ return &(state->rctx->on_safe->contexts);
+ }
+
+ list< Context * > *get_on_applied_context_list() {
+ assert(state->rctx->on_applied);
+ return &(state->rctx->on_applied->contexts);
}
void send_notify(int to, const pg_notify_t &info, const pg_interval_map_t &pi) {