From 12e22b3d44eba51a70d8babebc2684f0c46575a7 Mon Sep 17 00:00:00 2001 From: Samuel Just Date: Tue, 24 Apr 2012 16:00:49 -0700 Subject: [PATCH] OSD,PG: handle pg map advance in process_peering_event The pg map will now be advanced in process_peering_event (in advance_pg) to allow handle_osd_map to not grab pg locks in-line. handle_osd_map queues NullEvts to ensure that each pg is updated in a timely fashion. Signed-off-by: Samuel Just --- src/osd/OSD.cc | 155 ++++++++++++---------------------------- src/osd/OSD.h | 3 +- src/osd/PG.cc | 9 +++ src/osd/PG.h | 16 +++++ src/osd/ReplicatedPG.cc | 5 +- 5 files changed, 76 insertions(+), 112 deletions(-) diff --git a/src/osd/OSD.cc b/src/osd/OSD.cc index 12c3d94972a3d..d00d2a7305291 100644 --- a/src/osd/OSD.cc +++ b/src/osd/OSD.cc @@ -3250,43 +3250,6 @@ void OSD::handle_osd_map(MOSDMap *m) map_in_progress = true; } - osd_lock.Unlock(); - - op_tp.pause(); - disk_tp.pause(); - - // requeue under osd_lock to preserve ordering of _dispatch() wrt incoming messages - osd_lock.Lock(); - - op_wq.lock(); - - list rq; - while (true) { - PG *pg = op_wq._dequeue(); - if (!pg) - break; - - // op_wq is inside pg->lock - op_wq.unlock(); - pg->lock(); - op_wq.lock(); - - // we should still have something in op_queue, unless a racing - // thread did something very strange :/ - assert(!pg->op_queue.empty()); - - OpRequestRef op = pg->op_queue.front(); - pg->op_queue.pop_front(); - pg->unlock(); - pg->put(); - dout(15) << " will requeue " << *op->request << dendl; - rq.push_back(op); - } - push_waiters(rq); // requeue under osd_lock! - op_wq.unlock(); - - recovery_tp.pause(); - ObjectStore::Transaction t; // store new maps: queue for disk and put in the osdmap cache @@ -3346,6 +3309,8 @@ void OSD::handle_osd_map(MOSDMap *m) assert(0 == "MOSDMap lied about what maps it had?"); } +/* + // TODOSAM: find strategy for cluster snaps // check for cluster snapshot string cluster_snap; for (epoch_t cur = start; cur <= last && cluster_snap.length() == 0; cur++) { @@ -3368,6 +3333,7 @@ void OSD::handle_osd_map(MOSDMap *m) osd_lock.Lock(); assert(osd_lock.is_locked()); + */ if (superblock.oldest_map) { for (epoch_t e = superblock.oldest_map; e < m->oldest_map; ++e) { @@ -3383,7 +3349,6 @@ void OSD::handle_osd_map(MOSDMap *m) superblock.newest_map = last; - // finally, take map_lock _after_ we do this flush, to avoid deadlock map_lock.get_write(); C_Contexts *fin = new C_Contexts(g_ceph_context); @@ -3517,8 +3482,8 @@ void OSD::handle_osd_map(MOSDMap *m) return; } - map_lock.put_write(); clear_map_bl_cache_pins(); + map_lock.put_write(); /* * wait for this to be stable. @@ -3538,10 +3503,6 @@ void OSD::handle_osd_map(MOSDMap *m) ulock.Unlock(); osd_lock.Lock(); - op_tp.unpause(); - recovery_tp.unpause(); - disk_tp.unpause(); - if (m->newest_map && m->newest_map > last) { dout(10) << " msg say newest map is " << m->newest_map << ", requesting more" << dendl; monc->sub_want("osdmap", osdmap->get_epoch()+1, CEPH_SUBSCRIBE_ONETIME); @@ -3565,6 +3526,26 @@ void OSD::handle_osd_map(MOSDMap *m) } } +void OSD::advance_pg(epoch_t osd_epoch, PG *pg, PG::RecoveryCtx *rctx) +{ + assert(pg->is_locked()); + epoch_t next_epoch = pg->get_osdmap()->get_epoch() + 1; + OSDMapRef lastmap = pg->get_osdmap(); + + for (; + next_epoch <= osd_epoch; + ++next_epoch) { + OSDMapRef nextmap = get_map(next_epoch); + vector newup, newacting; + nextmap->pg_to_up_acting_osds(pg->info.pgid, newup, newacting); + pg->handle_advance_map(lastmap, nextmap, newup, newacting, rctx); + lastmap = nextmap; + } + pg->handle_activate_map(rctx); + if (pg->dirty_info) + pg->write_info(*rctx->transaction); +} + /** * scan placement groups, initiate any replication @@ -3655,48 +3636,6 @@ void OSD::advance_map(ObjectStore::Transaction& t, C_Contexts *tfin) } } - // do splits? - for (map::iterator p = pool_resize.begin(); p != pool_resize.end(); p++) { - dout(10) << " processing pool " << p->first << " resize" << dendl; - clog.error() << "ignoring pool " << p->first << " resize; not fully implemented\n"; - if (false) { - for (hash_map::iterator it = pg_map.begin(); - it != pg_map.end(); - it++) { - pg_t pgid = it->first; - PG *pg = it->second; - set children; - if (pgid.is_split(p->second, pg->pool->info.get_pg_num(), &children)) { - do_split(pg, children, t, tfin); - } - } - } - } - - // if we skipped a discontinuity and are the first epoch, we won't have a previous map. - OSDMapRef lastmap; - if (osdmap->get_epoch() > superblock.oldest_map) - lastmap = get_map(osdmap->get_epoch() - 1); - - // scan existing pg's - for (hash_map::iterator it = pg_map.begin(); - it != pg_map.end(); - it++) { - PG *pg = it->second; - - vector newup, newacting; - osdmap->pg_to_up_acting_osds(pg->info.pgid, newup, newacting); - - //pg->lock_with_map_lock_held(); - - // update pg's osdmap ref, assert lock is held - pg->reassert_lock_with_map_lock_held(); - - dout(10) << "Scanning pg " << *pg << dendl; - pg->handle_advance_map(osdmap, lastmap, newup, newacting, 0); - //pg->unlock(); - } - // scan pgs with waiters map >::iterator p = waiting_for_pg.begin(); while (p != waiting_for_pg.end()) { @@ -3737,8 +3676,7 @@ void OSD::activate_map(ObjectStore::Transaction& t, list& tfin) it != pg_map.end(); it++) { PG *pg = it->second; - //pg->lock_with_map_lock_held(); - + pg->lock_with_map_lock_held(); if (pg->is_primary()) num_pg_primary++; else if (pg->is_replica()) @@ -3748,26 +3686,18 @@ void OSD::activate_map(ObjectStore::Transaction& t, list& tfin) if (pg->is_primary() && pg->info.history.last_epoch_clean < oldest_last_clean) oldest_last_clean = pg->info.history.last_epoch_clean; - + if (!osdmap->have_pg_pool(pg->info.pgid.pool())) { //pool is deleted! queue_pg_for_deletion(pg); //pg->unlock(); continue; + } else { + pg->queue_null(osdmap->get_epoch(), osdmap->get_epoch()); } - - PG::RecoveryCtx rctx(&query_map, &info_map, ¬ify_list, &tfin, &t); - pg->handle_activate_map(&rctx); - - //pg->write_if_dirty(t); - - //pg->unlock(); + pg->unlock(); } - do_notifies(notify_list, osdmap->get_epoch()); // notify? (residual|replica) - do_queries(query_map); - do_infos(info_map); - logger->set(l_osd_pg, pg_map.size()); logger->set(l_osd_pg_primary, num_pg_primary); logger->set(l_osd_pg_replica, num_pg_replica); @@ -3775,8 +3705,10 @@ void OSD::activate_map(ObjectStore::Transaction& t, list& tfin) wake_all_pg_waiters(); // the pg mapping may have shifted maybe_update_heartbeat_peers(); - - send_pg_temp(); +/* + // TODOSAM: solve map_cache trimming problem + trim_map_cache(oldest_last_clean); + */ if (osdmap->test_flag(CEPH_OSDMAP_FULL)) { dout(10) << " osdmap flagged full, doing onetime osdmap subscribe" << dendl; @@ -5397,19 +5329,24 @@ void OSD::process_peering_event(PG *pg) map< int, map > query_map; map< int, vector > > notify_list; map info_map; // peer -> message + OSDMapRef curmap; + { + map_lock.get_read(); + curmap = osdmap; + map_lock.put_read(); + } { pg->lock(); - if (pg->peering_queue.empty()) { - pg->unlock(); - return; - } ObjectStore::Transaction *t = new ObjectStore::Transaction; C_Contexts *pfin = new C_Contexts(g_ceph_context); PG::RecoveryCtx rctx(&query_map, &info_map, ¬ify_list, &pfin->contexts, t); - PG::CephPeeringEvtRef evt = pg->peering_queue.front(); - pg->peering_queue.pop_front(); - pg->handle_peering_event(evt, &rctx); + advance_pg(curmap->get_epoch(), pg, &rctx); + if (!pg->peering_queue.empty()) { + PG::CephPeeringEvtRef evt = pg->peering_queue.front(); + pg->peering_queue.pop_front(); + pg->handle_peering_event(evt, &rctx); + } if (!t->empty()) { int tr = store->queue_transaction( &pg->osr, @@ -5432,7 +5369,7 @@ void OSD::process_peering_event(PG *pg) do_infos(info_map); { Mutex::Locker l(osd_lock); - maybe_update_heartbeat_peers(); + send_pg_temp(); } } diff --git a/src/osd/OSD.h b/src/osd/OSD.h index 9890dceeccf11..65aa71231c82a 100644 --- a/src/osd/OSD.h +++ b/src/osd/OSD.h @@ -445,6 +445,7 @@ private: void note_down_osd(int osd); void note_up_osd(int osd); + void advance_pg(epoch_t advance_to, PG *pg, PG::RecoveryCtx *rctx); void advance_map(ObjectStore::Transaction& t, C_Contexts *tfin); void activate_map(ObjectStore::Transaction& t, list& tfin); @@ -1134,8 +1135,8 @@ public: void handle_sub_op(OpRequestRef op); void handle_sub_op_reply(OpRequestRef op); - static bool op_is_discardable(class MOSDOp *m); /// check if we can throw out op from a disconnected client + static bool op_is_discardable(class MOSDOp *m); /// check if op has sufficient caps bool op_has_sufficient_caps(PG *pg, class MOSDOp *m); /// check if op should be (re)queued for processing diff --git a/src/osd/PG.cc b/src/osd/PG.cc index 0f7b2a6fe93bd..337a616e87281 100644 --- a/src/osd/PG.cc +++ b/src/osd/PG.cc @@ -4049,6 +4049,15 @@ void PG::queue_log(epoch_t msg_epoch, MLogRec(from, msg)))); } +void PG::queue_null(epoch_t msg_epoch, + epoch_t query_epoch) +{ + dout(10) << "handle_null" << dendl; + queue_peering_event( + CephPeeringEvtRef(new CephPeeringEvt(msg_epoch, query_epoch, + NullEvt()))); +} + void PG::queue_query(epoch_t msg_epoch, epoch_t query_epoch, int from, const pg_query_t& q) diff --git a/src/osd/PG.h b/src/osd/PG.h index 9ccbe7a21c9ce..86615926224bd 100644 --- a/src/osd/PG.h +++ b/src/osd/PG.h @@ -868,6 +868,9 @@ public: struct NeedUpThru : boost::statechart::event< NeedUpThru > { NeedUpThru() : boost::statechart::event< NeedUpThru >() {}; }; + struct NullEvt : boost::statechart::event< NullEvt > { + NullEvt() : boost::statechart::event< NullEvt >() {}; + }; /* Encapsulates PG recovery process */ class RecoveryState { @@ -966,12 +969,16 @@ public: boost::statechart::custom_reaction< MNotifyRec >, boost::statechart::custom_reaction< MInfoRec >, boost::statechart::custom_reaction< MLogRec >, + boost::statechart::custom_reaction< NullEvt >, boost::statechart::transition< boost::statechart::event_base, Crashed > > reactions; boost::statechart::result react(const MNotifyRec&); boost::statechart::result react(const MInfoRec&); boost::statechart::result react(const MLogRec&); + boost::statechart::result react(const NullEvt&) { + return discard_event(); + } }; struct Reset : boost::statechart::state< Reset, RecoveryMachine >, NamedState { @@ -982,11 +989,15 @@ public: boost::statechart::custom_reaction< QueryState >, boost::statechart::custom_reaction< AdvMap >, boost::statechart::custom_reaction< ActMap >, + boost::statechart::custom_reaction< NullEvt >, boost::statechart::transition< boost::statechart::event_base, Crashed > > reactions; boost::statechart::result react(const QueryState& q); boost::statechart::result react(const AdvMap&); boost::statechart::result react(const ActMap&); + boost::statechart::result react(const NullEvt&) { + return discard_event(); + } }; struct Start; @@ -998,10 +1009,14 @@ public: typedef boost::mpl::list < boost::statechart::custom_reaction< QueryState >, boost::statechart::custom_reaction< AdvMap >, + boost::statechart::custom_reaction< NullEvt >, boost::statechart::transition< boost::statechart::event_base, Crashed > > reactions; boost::statechart::result react(const QueryState& q); boost::statechart::result react(const AdvMap&); + boost::statechart::result react(const NullEvt&) { + return discard_event(); + } }; struct MakePrimary : boost::statechart::event< MakePrimary > { @@ -1400,6 +1415,7 @@ public: MOSDPGLog *msg); void queue_query(epoch_t msg_epoch, epoch_t query_epoch, int from, const pg_query_t& q); + void queue_null(epoch_t msg_epoch, epoch_t query_epoch); void handle_advance_map(OSDMapRef osdmap, OSDMapRef lastmap, vector& newup, vector& newacting, RecoveryCtx *rctx); diff --git a/src/osd/ReplicatedPG.cc b/src/osd/ReplicatedPG.cc index 9c8217833be5d..77287d6cad3f0 100644 --- a/src/osd/ReplicatedPG.cc +++ b/src/osd/ReplicatedPG.cc @@ -5659,8 +5659,9 @@ void ReplicatedPG::apply_and_flush_repops(bool requeue) remove_repop(repop); } - if (requeue) - osd->push_waiters(rq); + if (requeue) { + op_waiters.splice(op_waiters.end(), rq); + } } void ReplicatedPG::on_shutdown() -- 2.39.5