From 33b985dcfec8b9690e18f662431de41870f505ba Mon Sep 17 00:00:00 2001 From: Samuel Just Date: Mon, 18 Jun 2012 10:08:11 -0700 Subject: [PATCH] OSD,PG: added helper methods for creating and dispatching RecoveryCtxs This is simpler than having to update all of the RecoveryCtx users whenever we change the types in RecoveryCtx. Signed-off-by: Samuel Just --- src/osd/OSD.cc | 148 ++++++++++++++++++++++++------------------------- src/osd/OSD.h | 3 + src/osd/PG.cc | 6 +- src/osd/PG.h | 23 +++++--- 4 files changed, 93 insertions(+), 87 deletions(-) diff --git a/src/osd/OSD.cc b/src/osd/OSD.cc index a2972bf126a09..a65b87061ed5a 100644 --- a/src/osd/OSD.cc +++ b/src/osd/OSD.cc @@ -1364,7 +1364,7 @@ void OSD::load_pgs() int role = osdmap->calc_pg_role(whoami, pg->acting); pg->set_role(role); - PG::RecoveryCtx rctx(0, 0, 0, 0, 0); + PG::RecoveryCtx rctx(0, 0, 0, 0, 0, 0); pg->handle_loaded(&rctx); dout(10) << "load_pgs loaded " << *pg << " " << pg->log << dendl; @@ -1382,8 +1382,6 @@ PG *OSD::get_or_create_pg(const pg_info_t& info, pg_interval_map_t& pi, epoch_t epoch, int from, int& created, bool primary) { PG *pg; - ObjectStore::Transaction *t = 0; - C_Contexts *fin = 0; if (!_have_pg(info.pgid)) { // same primary? @@ -1426,17 +1424,11 @@ PG *OSD::get_or_create_pg(const pg_info_t& info, pg_interval_map_t& pi, } // ok, create PG locally using provided Info and History - t = new ObjectStore::Transaction; - fin = new C_Contexts(g_ceph_context); - map< int, vector > > notify_list; // primary -> list - map< int, map > query_map; // peer -> PG -> get_summary_since - map > > info_map; // peer -> message - PG::RecoveryCtx rctx(&query_map, &info_map, ¬ify_list, &fin->contexts, t); - pg = _create_lock_pg(info.pgid, create, false, role, up, acting, history, pi, *t); + PG::RecoveryCtx rctx = create_context(); + pg = _create_lock_pg(info.pgid, create, false, role, up, acting, history, pi, + *rctx.transaction); pg->handle_create(&rctx); - do_notifies(notify_list); - do_queries(query_map); - do_infos(info_map); + dispatch_context(rctx, pg); created++; dout(10) << *pg << " is new" << dendl; @@ -1454,17 +1446,6 @@ PG *OSD::get_or_create_pg(const pg_info_t& info, pg_interval_map_t& pi, pg->unlock(); return NULL; } - t = new ObjectStore::Transaction; - fin = new C_Contexts(g_ceph_context); - } - if (t && t->empty()) { - delete t; - delete fin; - } else if (t) { - int tr = store->queue_transaction( - &pg->osr, - t, new ObjectStore::C_DeleteTransaction(t), fin); - assert(tr == 0); } return pg; } @@ -3943,6 +3924,7 @@ void OSD::do_split(PG *parent, set& childpgids, ObjectStore::Transaction& split_pg(parent, children, t); +#if 0 // reset pg map< int, vector > > notify_list; // primary -> list map< int, map > query_map; // peer -> PG -> get_summary_since @@ -3966,6 +3948,7 @@ void OSD::do_split(PG *parent, set& childpgids, ObjectStore::Transaction& do_notifies(notify_list); do_queries(query_map); do_infos(info_map); +#endif } void OSD::split_pg(PG *parent, map& children, ObjectStore::Transaction &t) @@ -4173,23 +4156,18 @@ void OSD::handle_pg_create(OpRequestRef op) osdmap->get_epoch()); if (can_create_pg(pgid)) { - ObjectStore::Transaction *t = new ObjectStore::Transaction; - C_Contexts *fin = new C_Contexts(g_ceph_context); pg_interval_map_t pi; + PG::RecoveryCtx rctx = create_context(); PG *pg = _create_lock_pg(pgid, true, false, - 0, creating_pgs[pgid].acting, creating_pgs[pgid].acting, history, pi, - *t); + 0, creating_pgs[pgid].acting, creating_pgs[pgid].acting, + history, pi, + *rctx.transaction); creating_pgs.erase(pgid); - wake_pg_waiters(pg->info.pgid); - PG::RecoveryCtx rctx(&query_map, &info_map, 0, &fin->contexts, t); pg->handle_create(&rctx); - pg->write_if_dirty(*t); + pg->write_if_dirty(*rctx.transaction); pg->update_stats(); - - int tr = store->queue_transaction(&pg->osr, t, new ObjectStore::C_DeleteTransaction(t), fin); - assert(tr == 0); - + dispatch_context(rctx, pg); pg->unlock(); num_created++; } @@ -4205,6 +4183,57 @@ void OSD::handle_pg_create(OpRequestRef op) // ---------------------------------------- // peering and recovery +PG::RecoveryCtx OSD::create_context() +{ + ObjectStore::Transaction *t = new ObjectStore::Transaction; + C_Contexts *on_applied = new C_Contexts(g_ceph_context); + C_Contexts *on_safe = new C_Contexts(g_ceph_context); + map< int, map > *query_map = + new map >; + map > > *notify_list = + new map > >; + map > > *info_map = + new map > >; + PG::RecoveryCtx rctx(query_map, info_map, notify_list, + on_applied, on_safe, t); + return rctx; +} + +void OSD::dispatch_context_transaction(PG::RecoveryCtx &ctx, PG *pg) +{ + if (!ctx.transaction->empty()) { + ctx.on_applied->add(new ObjectStore::C_DeleteTransaction(ctx.transaction)); + int tr = store->queue_transaction( + &pg->osr, + ctx.transaction, ctx.on_applied, ctx.on_safe); + assert(tr == 0); + ctx.transaction = new ObjectStore::Transaction; + ctx.on_applied = new C_Contexts(g_ceph_context); + ctx.on_safe = new C_Contexts(g_ceph_context); + } +} + +void OSD::dispatch_context(PG::RecoveryCtx &ctx, PG *pg) +{ + do_notifies(*ctx.notify_list); + delete ctx.notify_list; + do_queries(*ctx.query_map); + delete ctx.query_map; + do_infos(*ctx.info_map); + delete ctx.info_map; + if (ctx.transaction->empty() || !pg) { + delete ctx.transaction; + delete ctx.on_applied; + delete ctx.on_safe; + } else { + ctx.on_applied->add(new ObjectStore::C_DeleteTransaction(ctx.transaction)); + int tr = store->queue_transaction( + &pg->osr, + ctx.transaction, ctx.on_applied, ctx.on_safe); + assert(tr == 0); + } +} + /** do_notifies * Send an MOSDPGNotify to a primary, with a list of PGs that I have * content for, and they are primary for. @@ -4847,15 +4876,8 @@ void OSD::do_recovery(PG *pg) dout(20) << " active was " << recovery_oids[pg->info.pgid] << dendl; #endif - ObjectStore::Transaction *t = new ObjectStore::Transaction; - C_Contexts *fin = new C_Contexts(g_ceph_context); - map< int, vector > > notify_list; // primary -> list - map< int, map > query_map; // peer -> PG -> get_summary_since - map > > info_map; // peer -> message - PG::RecoveryCtx rctx(&query_map, &info_map, 0, &fin->contexts, t); - + PG::RecoveryCtx rctx = create_context(); int started = pg->start_recovery_ops(max, &rctx); - dout(10) << "do_recovery started " << started << " (" << recovery_ops_active << "/" << g_conf->osd_recovery_max_active << " rops) on " << *pg << dendl; @@ -4867,8 +4889,8 @@ void OSD::do_recovery(PG *pg) * out while trying to pull. */ if (!started && pg->have_unfound()) { - pg->discover_all_missing(query_map); - if (!query_map.size()) { + pg->discover_all_missing(*rctx.query_map); + if (!rctx.query_map->size()) { dout(10) << "do_recovery no luck, giving up on this pg for now" << dendl; recovery_wq.lock(); pg->recovery_item.remove_myself(); // sigh... @@ -4876,19 +4898,9 @@ void OSD::do_recovery(PG *pg) } } - do_notifies(notify_list); - do_queries(query_map); - do_infos(info_map); - - pg->write_if_dirty(*t); - if (!t->empty()) { - int tr = store->queue_transaction(&pg->osr, t, new ObjectStore::C_DeleteTransaction(t), fin); - assert(tr == 0); - } else { - delete t; - delete fin; - } + pg->write_if_dirty(*rctx.transaction); + dispatch_context(rctx, pg); pg->unlock(); } pg->put(); @@ -5274,12 +5286,10 @@ void OSDService::queue_for_removal(epoch_t epoch, int osdnum, pg_t pgid) { void OSD::process_peering_event(PG *pg) { - map< int, map > query_map; - map< int, vector > > notify_list; - map > > info_map; // peer -> message bool need_up_thru = false; epoch_t same_interval_since; OSDMapRef curmap; + PG::RecoveryCtx rctx = create_context(); { map_lock.get_read(); pg->lock(); @@ -5289,25 +5299,13 @@ void OSD::process_peering_event(PG *pg) pg->unlock(); return; } - ObjectStore::Transaction *t = new ObjectStore::Transaction; - C_Contexts *pfin = new C_Contexts(g_ceph_context); - PG::RecoveryCtx rctx(&query_map, &info_map, ¬ify_list, - &pfin->contexts, t); advance_pg(curmap->get_epoch(), pg, &rctx); if (!pg->peering_queue.empty()) { PG::CephPeeringEvtRef evt = pg->peering_queue.front(); pg->peering_queue.pop_front(); pg->handle_peering_event(evt, &rctx); } - if (!t->empty()) { - int tr = store->queue_transaction( - &pg->osr, - t, new ObjectStore::C_DeleteTransaction(t), pfin); - assert(tr == 0); - } else { - delete t; - delete pfin; - } + dispatch_context_transaction(rctx, pg); need_up_thru = pg->need_up_thru; same_interval_since = pg->info.history.same_interval_since; pg->unlock(); @@ -5316,9 +5314,7 @@ void OSD::process_peering_event(PG *pg) Mutex::Locker l(osd_lock); if (need_up_thru) queue_want_up_thru(same_interval_since); - do_notifies(notify_list); - do_queries(query_map); - do_infos(info_map); + dispatch_context(rctx, 0); } service.send_pg_temp(); } diff --git a/src/osd/OSD.h b/src/osd/OSD.h index 4999878890998..03b3822536595 100644 --- a/src/osd/OSD.h +++ b/src/osd/OSD.h @@ -785,6 +785,9 @@ protected: } // -- generic pg peering -- + PG::RecoveryCtx create_context(); + void dispatch_context(PG::RecoveryCtx &ctx, PG *pg); + void dispatch_context_transaction(PG::RecoveryCtx &ctx, PG *pg); void do_notifies(map< int,vector > >& notify_list); void do_queries(map< int, map >& query_map); void do_infos(map > >& info_map); diff --git a/src/osd/PG.cc b/src/osd/PG.cc index efbe3dcdb5d29..eeb416d9036f8 100644 --- a/src/osd/PG.cc +++ b/src/osd/PG.cc @@ -4471,7 +4471,7 @@ PG::RecoveryState::Active::Active(my_context ctx) assert(pg->is_primary()); dout(10) << "In Active, about to call activate" << dendl; pg->activate(*context< RecoveryMachine >().get_cur_transaction(), - *context< RecoveryMachine >().get_context_list(), + *context< RecoveryMachine >().get_on_safe_context_list(), *context< RecoveryMachine >().get_query_map(), context< RecoveryMachine >().get_info_map()); assert(pg->is_active()); @@ -4611,7 +4611,7 @@ boost::statechart::result PG::RecoveryState::Active::react(const RecoveryComplet assert(!pg->needs_recovery()); pg->finish_recovery(*context< RecoveryMachine >().get_cur_transaction(), - *context< RecoveryMachine >().get_context_list()); + *context< RecoveryMachine >().get_on_safe_context_list()); return discard_event(); } @@ -4695,7 +4695,7 @@ PG::RecoveryState::ReplicaActive::ReplicaActive(my_context ctx) pg->last_peering_reset = pg->get_osdmap()->get_epoch(); pg->activate(*context< RecoveryMachine >().get_cur_transaction(), - *context< RecoveryMachine >().get_context_list(), + *context< RecoveryMachine >().get_on_safe_context_list(), query_map, NULL); dout(10) << "Activate Finished" << dendl; } diff --git a/src/osd/PG.h b/src/osd/PG.h index 051bd7401ca48..04c45e3129973 100644 --- a/src/osd/PG.h +++ b/src/osd/PG.h @@ -481,18 +481,20 @@ public: map< int, map > *query_map; map< int, vector > > *info_map; map< int, vector > > *notify_list; - list< Context* > *context_list; + C_Contexts *on_applied; + C_Contexts *on_safe; ObjectStore::Transaction *transaction; - RecoveryCtx() : query_map(0), info_map(0), notify_list(0), - context_list(0), transaction(0) {} RecoveryCtx(map< int, map > *query_map, map< int, vector > > *info_map, map< int, vector > > *notify_list, - list< Context* > *context_list, + C_Contexts *on_applied, + C_Contexts *on_safe, ObjectStore::Transaction *transaction) : query_map(query_map), info_map(info_map), notify_list(notify_list), - context_list(context_list), transaction(transaction) {} + on_applied(on_applied), + on_safe(on_safe), + transaction(transaction) {} }; struct NamedState { @@ -991,9 +993,14 @@ public: return state->rctx->info_map; } - list< Context* > *get_context_list() { - assert(state->rctx->context_list); - return state->rctx->context_list; + list< Context* > *get_on_safe_context_list() { + assert(state->rctx->on_safe); + return &(state->rctx->on_safe->contexts); + } + + list< Context * > *get_on_applied_context_list() { + assert(state->rctx->on_applied); + return &(state->rctx->on_applied->contexts); } void send_notify(int to, const pg_notify_t &info, const pg_interval_map_t &pi) { -- 2.39.5